42 template<rpp::constra
int::observer TObs>
44 ,
public rpp::details::base_disposable
47 disposable_with_observer(TObs&& observer, std::weak_ptr<subject_state> state)
49 , m_state{std::move(state)}
54 void base_dispose_impl(interface_disposable::Mode)
noexcept override
56 if (
const auto shared = m_state.lock())
58 std::unique_lock lock{shared->m_mutex};
59 process_state_unsafe(shared->m_state,
60 [&](
const shared_observers& observers) {
61 shared->m_state = cleanup_observers(observers, this);
66 std::weak_ptr<subject_state> m_state{};
69 using observer = std::shared_ptr<rpp::details::observers::observer_vtable<Type>>;
70 using observers = std::list<observer>;
71 using shared_observers = std::shared_ptr<observers>;
72 using state_t = std::variant<shared_observers, std::exception_ptr, completed, disposed>;
77 subject_state() =
default;
79 template<rpp::constra
int::observer_of_type<Type> TObs>
80 void on_subscribe(TObs&& observer)
82 std::unique_lock lock{m_mutex};
85 [&](
const shared_observers& observers) {
86 auto d = disposable_wrapper_impl<disposable_with_observer<std::decay_t<TObs>>>::make(std::forward<TObs>(observer), this->wrapper_from_this().lock());
90 auto new_observers = std::make_shared<subject_state::observers>();
91 new_observers->emplace_back(ptr);
92 m_state = std::move(new_observers);
96 observers->emplace_back(ptr);
100 ptr->set_upstream(d.as_weak());
102 [&](
const std::exception_ptr& err) {
104 observer.on_error(err);
108 observer.on_completed();
112 void on_next(
const Type& v)
114 std::unique_lock observers_lock{m_mutex};
115 process_state_unsafe(m_state, [&](shared_observers observers) {
119 auto itr = observers->cbegin();
120 const auto size = observers->size();
122 observers_lock.unlock();
124 std::lock_guard lock{m_serialized_mutex};
125 for (
size_t i = 0; i < size; ++i)
127 (*(itr++))->on_next(v);
132 void on_error(
const std::exception_ptr& err)
135 std::lock_guard lock{m_serialized_mutex};
136 if (
const auto observers = exchange_observers_under_lock_if_there(err))
137 rpp::utils::for_each(*observers, [&](
const observer& obs) { obs->on_error(err); });
145 std::lock_guard lock{m_serialized_mutex};
146 if (
const auto observers = exchange_observers_under_lock_if_there(
completed{}))
147 rpp::utils::for_each(*observers, [](
const observer& obs) { obs->on_completed(); });
153 void composite_dispose_impl(interface_disposable::Mode)
noexcept override
155 exchange_observers_under_lock_if_there(
disposed{});
160 auto subs = std::make_shared<observers>();
163 std::copy_if(current_subs->cbegin(),
164 current_subs->cend(),
165 std::back_inserter(*subs),
166 [&to_delete](
const observer& obs) {
167 return to_delete != obs.get();
173 static auto process_state_unsafe(
const state_t& state,
const auto&... actions)
178 shared_observers exchange_observers_under_lock_if_there(state_t&& new_val)
180 std::lock_guard lock{m_mutex};
182 return process_state_unsafe(m_state, [&](shared_observers observers) {
183 m_state = std::move(new_val);
184 return observers; }, [](
auto) {
return shared_observers{}; });
189 std::mutex m_mutex{};
190 RPP_NO_UNIQUE_ADDRESS std::conditional_t<Serialized, std::mutex, rpp::utils::none_mutex> m_serialized_mutex{};