12#include <rpp/subscribers/dynamic_subscriber.hpp>
13#include <rpp/utils/constraints.hpp>
14#include <rpp/utils/overloaded.hpp>
15#include <rpp/utils/utilities.hpp>
23namespace rpp::subjects::details
// State object of a subject for values of type T.
// It derives from std::enable_shared_from_this so that it can obtain a weak reference to itself.
// NOTE(review): this listing has lines missing (the class's opening brace and the `subscriber`
// alias used below are not visible here) - confirm against the original header.
28template<rpp::constra
int::decayed_type T>
29class subject_state :
public std::enable_shared_from_this<subject_state<T>>
// Owning and non-owning handles to a vector of subscribers.
32 using shared_subscribers = std::shared_ptr<std::vector<subscriber>>;
33 using weak_subscribers = std::weak_ptr<std::vector<subscriber>>;
// The state holds exactly one of: a subscriber list, a stored exception, `completed`, or `unsubscribed`.
34 using state_t = std::variant<shared_subscribers, std::exception_ptr, completed, unsubscribed>;
// NOTE(review): fragment - the enclosing method's signature and several body lines are missing
// from this listing; presumably this is the subscribe path - confirm.
// Hold the mutex while m_state is inspected.
43 std::unique_lock lock{m_mutex};
// Dispatch on whichever alternative m_state currently holds.
45 process_state(m_state,
46 [&](
const shared_subscribers& subs)
// The state holds a subscriber list: build a new list whose reserved capacity is one larger than the current size.
48 auto new_subs = make_copy_of_subscribed_subs(subs->size() + 1, subs);
// Point the weak handle at the newly built list.
51 m_weak_subscribers = new_subs;
// Handler selected when the state holds a stored exception.
57 [&](std::exception_ptr err)
74 void on_next(
const T& v)
76 if (
auto subs = extract_subscribers_under_lock_if_there())
77 rpp::utils::for_each(*subs, [&](
const auto& sub) { sub.on_next(v); });
80 void on_error(
const std::exception_ptr& err)
82 if (
auto subs = exchange_subscribers_under_lock_if_there(state_t{err}))
83 rpp::utils::for_each(*subs, [&](
const auto& sub) { sub.on_error(err); });
// NOTE(review): fragment - the enclosing method's signature and the body of this `if` are
// missing from this listing.
// Request that the state be replaced with `completed{}`; the returned handle is tested for
// non-null (presumably it is the subscriber list held before the swap - confirm).
88 if (
auto subs = exchange_subscribers_under_lock_if_there(
completed{}))
// NOTE(review): fragment - the enclosing method's signature and the body of this `if` are
// missing from this listing.
// Request that the state be replaced with `unsubscribed{}`; the returned handle is tested for
// non-null (presumably it is the subscriber list held before the swap - confirm).
94 if (
auto subs = exchange_subscribers_under_lock_if_there(
unsubscribed{}))
99 static void process_state(
const state_t& state,
const auto&...actions)
101 std::visit(rpp::utils::overloaded{ actions..., [](
auto) {} }, state);
// Creates a new shared vector of dynamic_subscriber<T>, reserves room for `expected_size`
// elements, and fills it from `current_subs` through std::copy_if.
// NOTE(review): the copy_if predicate and the return statement are missing from this listing;
// judging by the function name the predicate presumably keeps only still-subscribed entries - confirm.
104 static shared_subscribers make_copy_of_subscribed_subs(
size_t expected_size, shared_subscribers current_subs)
106 auto subs = std::make_shared<std::vector<dynamic_subscriber<T>>>();
// Reserve up front so the appends below do not have to regrow the vector up to expected_size.
107 subs->reserve(expected_size);
108 std::copy_if(current_subs->cbegin(),
109 current_subs->cend(),
110 std::back_inserter(*subs),
// NOTE(review): fragment - the enclosing method's signature and the lines between these
// statements are missing from this listing.
// Obtain a weak reference to this object.
117 auto weak = this->weak_from_this();
// Continue only if the object can still be locked into a shared_ptr.
120 if (
auto shared = weak.lock())
// Hold the object's mutex while its state is read and replaced.
122 std::unique_lock lock{shared->m_mutex};
123 process_state(shared->m_state,
124 [&](
const shared_subscribers& subs)
// Current size minus one; the std::max clamp makes an empty list yield 0 instead of wrapping around.
126 auto new_size = std::max(subs->size(), size_t{1}) - 1;
// Store a freshly built copy of the list, reserved for new_size elements, as the new state.
127 shared->m_state = shared->make_copy_of_subscribed_subs(new_size, subs);
// Returns a shared handle to the subscriber list.
// NOTE(review): the return statements are missing from this listing; presumably `locked` is
// returned when the weak handle is still valid, an empty handle when the state holds no
// subscriber list, and `subs` otherwise - confirm.
133 shared_subscribers extract_subscribers_under_lock_if_there()
// First attempt: lock the weak handle; this happens before the mutex is acquired below.
135 if (
auto locked = m_weak_subscribers.lock())
// Otherwise take the mutex before reading m_state.
138 std::unique_lock lock{ m_mutex };
// The state no longer holds a subscriber list.
140 if (!std::holds_alternative<shared_subscribers>(m_state))
// Read the current list and point the weak handle at it.
143 auto subs = std::get<shared_subscribers>(m_state);
144 m_weak_subscribers = subs;
// While holding the mutex, reads the subscriber list out of m_state and then overwrites
// m_state with `new_val`.
// NOTE(review): the return statements are missing from this listing; presumably an empty
// handle is returned when the state holds no subscriber list, and `subs` otherwise - confirm.
150 shared_subscribers exchange_subscribers_under_lock_if_there(state_t&& new_val)
152 std::unique_lock lock{ m_mutex };
// The state does not hold a subscriber list.
154 if (!std::holds_alternative<shared_subscribers>(m_state))
// Copy the handle out before the state is overwritten.
157 auto subs = std::get<shared_subscribers>(m_state);
158 m_state = std::move(new_val);
// Mutex that the member functions lock before reading or replacing m_state.
164 std::mutex m_mutex{};
// Current state; starts out as an empty subscriber list.
165 state_t m_state = std::make_shared<std::vector<subscriber>>();
// Weak handle to a subscriber list; initially refers to the list stored in m_state.
166 weak_subscribers m_weak_subscribers = std::get<shared_subscribers>(m_state);
std::weak_ptr< details::subscription_state > add(const TSub &sub=TSub{}) const
Add any other subscription to this as dependent.
Definition: composite_subscription.hpp:43
Subscriber that uses dynamic_observer<T> to hide the original callbacks.
Definition: dynamic_subscriber.hpp:24
Definition: subject_state.hpp:30
Definition: subject_state.hpp:25
Definition: subject_state.hpp:26