ReactivePlusPlus
Yet another implementation of the ReactiveX approach in C++, written with performance and templates in mind
 
Loading...
Searching...
No Matches
subject_state.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/subscribers/dynamic_subscriber.hpp>
13#include <rpp/utils/constraints.hpp>
14#include <rpp/utils/overloaded.hpp>
15#include <rpp/utils/utilities.hpp>
16
17#include <functional>
18#include <memory>
19#include <mutex>
20#include <variant>
21#include <vector>
22
23namespace rpp::subjects::details
24{
25struct completed {};
26struct unsubscribed {};
27
28template<rpp::constraint::decayed_type T>
29class subject_state : public std::enable_shared_from_this<subject_state<T>>
30{
32 using shared_subscribers = std::shared_ptr<std::vector<subscriber>>;
33 using weak_subscribers = std::weak_ptr<std::vector<subscriber>>;
34 using state_t = std::variant<shared_subscribers, std::exception_ptr, completed, unsubscribed>;
35
36public:
37 subject_state() = default;
38 subject_state(const subject_state&) = delete;
39 subject_state(subject_state&&) noexcept = delete;
40
41 void on_subscribe(const subscriber& subscriber)
42 {
43 std::unique_lock lock{m_mutex};
44
45 process_state(m_state,
46 [&](const shared_subscribers& subs)
47 {
48 auto new_subs = make_copy_of_subscribed_subs(subs->size() + 1, subs);
49 new_subs->push_back(subscriber);
50 m_state = new_subs;
51 m_weak_subscribers = new_subs;
52
53 lock.unlock();
54
55 add_callback_on_unsubscribe(subscriber);
56 },
57 [&](std::exception_ptr err)
58 {
59 lock.unlock();
60 subscriber.on_error(err);
61 },
62 [&](completed)
63 {
64 lock.unlock();
65 subscriber.on_completed();
66 },
67 [&](unsubscribed)
68 {
69 lock.unlock();
70 subscriber.unsubscribe();
71 });
72 }
73
74 void on_next(const T& v)
75 {
76 if (auto subs = extract_subscribers_under_lock_if_there())
77 rpp::utils::for_each(*subs, [&](const auto& sub) { sub.on_next(v); });
78 }
79
80 void on_error(const std::exception_ptr& err)
81 {
82 if (auto subs = exchange_subscribers_under_lock_if_there(state_t{err}))
83 rpp::utils::for_each(*subs, [&](const auto& sub) { sub.on_error(err); });
84 }
85
86 void on_completed()
87 {
88 if (auto subs = exchange_subscribers_under_lock_if_there(completed{}))
89 rpp::utils::for_each(*subs, std::mem_fn(&dynamic_subscriber<T>::on_completed));
90 }
91
92 void on_unsubscribe()
93 {
94 if (auto subs = exchange_subscribers_under_lock_if_there(unsubscribed{}))
95 rpp::utils::for_each(*subs, std::mem_fn(&dynamic_subscriber<T>::unsubscribe));
96 }
97
98private:
99 static void process_state(const state_t& state, const auto&...actions)
100 {
101 std::visit(rpp::utils::overloaded{ actions..., [](auto) {} }, state);
102 }
103
104 static shared_subscribers make_copy_of_subscribed_subs(size_t expected_size, shared_subscribers current_subs)
105 {
106 auto subs = std::make_shared<std::vector<dynamic_subscriber<T>>>();
107 subs->reserve(expected_size);
108 std::copy_if(current_subs->cbegin(),
109 current_subs->cend(),
110 std::back_inserter(*subs),
112 return subs;
113 }
114
115 void add_callback_on_unsubscribe(const dynamic_subscriber<T>& subscriber)
116 {
117 auto weak = this->weak_from_this();
118 subscriber.get_subscription().add([weak]
119 {
120 if (auto shared = weak.lock())
121 {
122 std::unique_lock lock{shared->m_mutex};
123 process_state(shared->m_state,
124 [&](const shared_subscribers& subs)
125 {
126 auto new_size = std::max(subs->size(), size_t{1}) - 1;
127 shared->m_state = shared->make_copy_of_subscribed_subs(new_size, subs);
128 });
129 }
130 });
131 }
132
133 shared_subscribers extract_subscribers_under_lock_if_there()
134 {
135 if (auto locked = m_weak_subscribers.lock())
136 return locked;
137
138 std::unique_lock lock{ m_mutex };
139
140 if (!std::holds_alternative<shared_subscribers>(m_state))
141 return {};
142
143 auto subs = std::get<shared_subscribers>(m_state);
144 m_weak_subscribers = subs;
145
146 lock.unlock();
147 return subs;
148 }
149
150 shared_subscribers exchange_subscribers_under_lock_if_there(state_t&& new_val)
151 {
152 std::unique_lock lock{ m_mutex };
153
154 if (!std::holds_alternative<shared_subscribers>(m_state))
155 return {};
156
157 auto subs = std::get<shared_subscribers>(m_state);
158 m_state = std::move(new_val);
159
160 lock.unlock();
161 return subs;
162 }
163private:
164 std::mutex m_mutex{};
165 state_t m_state = std::make_shared<std::vector<subscriber>>();
166 weak_subscribers m_weak_subscribers = std::get<shared_subscribers>(m_state);
167};
168} // namespace rpp::subjects::details
std::weak_ptr< details::subscription_state > add(const TSub &sub=TSub{}) const
Adds another subscription to this one as a dependent.
Definition: composite_subscription.hpp:43
Subscriber that uses dynamic_observer<T> to hide the original callbacks.
Definition: dynamic_subscriber.hpp:24
Definition: subject_state.hpp:30
Definition: subject_state.hpp:25
Definition: subject_state.hpp:26