// Constructor (excerpt): takes the observer as a forwarding reference and a
// weak reference to a subject_state. The weak_ptr is moved into m_state; the
// remaining member initializers and the body are not visible in this excerpt.
42 template<rpp::constra
int::observer TObs>
47 disposable_with_observer(TObs&& observer, std::weak_ptr<subject_state> state)
49 , m_state{std::move(state)}
// Dispose hook. The Mode argument is unnamed and therefore unused here.
// When the referenced subject_state is still alive, its mutex is taken and the
// state is handed to process_state_unsafe together with a handler for the
// observer-list case; that handler replaces the stored state with the result
// of cleanup_observers(observers, this).
// NOTE(review): presumably process_state_unsafe invokes the handler only when
// the state currently holds an observer list - its body is not visible here.
54 void base_dispose_impl(interface_disposable::Mode)
noexcept override
// lock() returns an empty pointer when the state no longer exists, so the
// guarded work below is skipped in that case.
56 if (
const auto shared = m_state.lock())
// shared->m_state is only read and written while shared->m_mutex is held.
58 std::unique_lock lock{shared->m_mutex};
59 process_state_unsafe(shared->m_state,
60 [&](
const shared_observers& observers) {
// Assigns a new state value instead of editing the passed list in place.
61 shared->m_state = cleanup_observers(observers, this);
// Non-owning reference to the subject_state; base_dispose_impl must lock() it
// before use because the state may already be gone.
66 std::weak_ptr<subject_state> m_state{};
// A single subscriber: shared pointer to an observer vtable for values of Type.
69 using observer = std::shared_ptr<rpp::details::observers::observer_vtable<Type>>;
// The subscriber list, stored as a deque.
70 using observers = std::deque<observer>;
// Shared handle to a subscriber list, so a list can outlive the state slot it
// was read from.
71 using shared_observers = std::shared_ptr<observers>;
// Subject state: exactly one of a subscriber list, a stored error, the
// `completed` marker or the `disposed` marker.
72 using state_t = std::variant<shared_observers, std::exception_ptr, completed, disposed>;
// Subscribes `observer` to this subject (excerpt - several lines, including the
// definitions of `ptr` and `d` and the branch conditions, are not visible).
// Visible behaviour, all under m_mutex:
//  - observer-list handler: `ptr` is either placed into a freshly created list
//    that becomes the new m_state, or appended to the existing list; `ptr`
//    then receives d.as_weak() as its upstream;
//  - std::exception_ptr handler: the stored error is forwarded to the observer;
//  - a further handler calls observer.on_completed().
// NOTE(review): presumably the fresh list is used when no list exists yet and
// the last handler belongs to the `completed` state - confirm against the
// full source.
79 template<rpp::constra
int::observer_of_type<Type> TObs>
80 void on_subscribe(TObs&& observer)
// Every visible read or write of m_state in this function happens under this lock.
82 std::unique_lock lock{m_mutex};
85 [&](
const shared_observers& observers) {
// Branch 1: build a new list holding only `ptr` and install it as the state.
90 auto new_observers = std::make_shared<subject_state::observers>();
91 new_observers->emplace_back(ptr);
92 m_state = std::move(new_observers);
// Branch 2: append to the list that is already stored (in-place mutation).
96 observers->emplace_back(ptr);
// Hands `ptr` a weak handle obtained from `d` as its upstream.
100 ptr->set_upstream(d.as_weak());
102 [&](
const std::exception_ptr& err) {
// The state already holds an error: forward it to the new observer.
104 observer.on_error(err);
108 observer.on_completed();
// Delivers `v` to every observer in the current list (excerpt).
// The list handle and its begin/end iterators are captured while m_mutex is
// held; m_mutex is then released and the calls to the observers are made while
// holding m_serialized_mutex instead.
// NOTE(review): on_subscribe contains an in-place emplace_back on a shared
// std::deque, and std::deque::emplace_back invalidates iterators. Confirm that
// the iterators saved here remain usable if a subscription runs concurrently
// with the loop below.
112 void on_next(
const Type& v)
114 std::unique_lock observers_lock{m_mutex};
// State is not an observer list; the statement guarded by this check is not
// visible in this excerpt (presumably an early return).
116 if (!std::holds_alternative<shared_observers>(m_state))
// Copies the shared_ptr, so the list stays alive after the lock is released
// even if m_state is replaced meanwhile.
120 const auto observers = std::get<shared_observers>(m_state);
// Range boundaries as of this moment, taken under the lock.
124 const auto begin = observers->cbegin();
125 const auto end = observers->cend();
// Observer callbacks are invoked without holding m_mutex.
127 observers_lock.unlock();
129 std::lock_guard lock{m_serialized_mutex};
130 std::for_each(begin, end, [&](
const observer& obs) { obs->on_next(v); });
// Puts the subject into the error state and notifies the observers that were
// subscribed at that moment.
// m_serialized_mutex is held for the whole operation. The state swap itself
// happens inside exchange_observers_under_lock_if_there, which returns the
// previous observer list; observers are notified only if that result is non-null.
133 void on_error(
const std::exception_ptr& err)
136 std::lock_guard lock{m_serialized_mutex};
137 if (
const auto observers = exchange_observers_under_lock_if_there(err))
138 rpp::utils::for_each(*observers, [&](
const observer& obs) { obs->on_error(err); });
// NOTE(review): the function header is not visible in this excerpt; presumably
// this is the body of on_completed() - confirm against the full source.
// Holds m_serialized_mutex, swaps the state to `completed{}` and, if a non-null
// observer list was returned, calls on_completed() on each observer in it.
146 std::lock_guard lock{m_serialized_mutex};
147 if (
const auto observers = exchange_observers_under_lock_if_there(
completed{}))
148 rpp::utils::for_each(*observers, [](
const observer& obs) { obs->on_completed(); });
// Dispose hook. The Mode argument is unnamed and therefore unused here.
// Swaps the state to `disposed{}`; the returned observer list is discarded, so
// no observer callback is invoked from the visible code.
154 void composite_dispose_impl(interface_disposable::Mode)
noexcept override
156 exchange_observers_under_lock_if_there(
disposed{});
// NOTE(review): the function header is not visible in this excerpt; presumably
// this is the body of cleanup_observers(current_subs, to_delete), the helper
// called from base_dispose_impl - confirm against the full source.
// Builds a new observer list containing every entry of `current_subs` whose
// raw pointer differs from `to_delete`; the source list is only read.
161 auto subs = std::make_shared<observers>();
164 std::copy_if(current_subs->cbegin(),
165 current_subs->cend(),
166 std::back_inserter(*subs),
167 [&to_delete](
const observer& obs) {
// Keep the entry unless it is the observer being removed (identity comparison
// on the raw pointer).
168 return to_delete != obs.get();
// Static helper taking a state value and a pack of callables (`actions`); the
// body is not visible in this excerpt.
// NOTE(review): presumably dispatches to the action matching the alternative
// held by `state`, and "unsafe" presumably means it takes no lock itself -
// the visible caller in base_dispose_impl locks the mutex before calling.
174 static void process_state_unsafe(
const state_t& state,
const auto&... actions)
// Replaces m_state with `new_val` and returns the observer list that was stored
// before, all under m_mutex. The replacement takes place only when the state
// currently holds an observer list.
// NOTE(review): the statement guarded by the holds_alternative check is not
// visible; callers test the result for null, so it presumably returns an empty
// shared_observers.
179 shared_observers exchange_observers_under_lock_if_there(state_t&& new_val)
181 std::lock_guard lock{m_mutex};
183 if (!std::holds_alternative<shared_observers>(m_state))
// std::exchange yields the old state by value; the list handle is extracted
// from that temporary.
186 return std::get<shared_observers>(std::exchange(m_state, std::move(new_val)));
// Locked around every visible access to m_state inside this class.
191 std::mutex m_mutex{};
// Held while observers are notified. It is a real std::mutex only when
// Serialized is true; otherwise the type is rpp::utils::none_mutex.
192 RPP_NO_UNIQUE_ADDRESS std::conditional_t<Serialized, std::mutex, rpp::utils::none_mutex> m_serialized_mutex{};