ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
subject_state.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/observables/fwd.hpp>
13
14#include <rpp/disposables/callback_disposable.hpp>
15#include <rpp/disposables/composite_disposable.hpp>
16#include <rpp/disposables/disposable_wrapper.hpp>
17#include <rpp/observers/dynamic_observer.hpp>
18#include <rpp/utils/constraints.hpp>
19#include <rpp/utils/functors.hpp>
20#include <rpp/utils/utils.hpp>
21
22#include <algorithm>
23#include <deque>
24#include <memory>
25#include <mutex>
26#include <variant>
27
28namespace rpp::subjects::details
29{
30 struct completed
31 {
32 };
33
34 struct disposed
35 {
36 };
37
38 template<rpp::constraint::decayed_type Type, bool Serialized>
40 , public rpp::details::enable_wrapper_from_this<subject_state<Type, Serialized>>
41 {
42 template<rpp::constraint::observer TObs>
43 class disposable_with_observer : public rpp::details::observers::type_erased_observer<TObs>
45 {
46 public:
47 disposable_with_observer(TObs&& observer, std::weak_ptr<subject_state> state)
49 , m_state{std::move(state)}
50 {
51 }
52
53 private:
54 void base_dispose_impl(interface_disposable::Mode) noexcept override
55 {
56 if (const auto shared = m_state.lock())
57 {
58 std::unique_lock lock{shared->m_mutex};
59 process_state_unsafe(shared->m_state,
60 [&](const shared_observers& observers) {
61 shared->m_state = cleanup_observers(observers, this);
62 });
63 }
64 }
65
66 std::weak_ptr<subject_state> m_state{};
67 };
68
69 using observer = std::shared_ptr<rpp::details::observers::observer_vtable<Type>>;
70 using observers = std::deque<observer>;
71 using shared_observers = std::shared_ptr<observers>;
72 using state_t = std::variant<shared_observers, std::exception_ptr, completed, disposed>;
73
74 public:
76
77 subject_state() = default;
78
79 template<rpp::constraint::observer_of_type<Type> TObs>
80 void on_subscribe(TObs&& observer)
81 {
82 std::unique_lock lock{m_mutex};
83 process_state_unsafe(
84 m_state,
85 [&](const shared_observers& observers) {
86 auto d = disposable_wrapper_impl<disposable_with_observer<std::decay_t<TObs>>>::make(std::forward<TObs>(observer), this->wrapper_from_this().lock());
87 auto ptr = d.lock();
88 if (!observers)
89 {
90 auto new_observers = std::make_shared<subject_state::observers>();
91 new_observers->emplace_back(ptr);
92 m_state = std::move(new_observers);
93 }
94 else
95 {
96 observers->emplace_back(ptr);
97 }
98
99 lock.unlock();
100 ptr->set_upstream(d.as_weak());
101 },
102 [&](const std::exception_ptr& err) {
103 lock.unlock();
104 observer.on_error(err);
105 },
106 [&](completed) {
107 lock.unlock();
108 observer.on_completed();
109 });
110 }
111
112 void on_next(const Type& v)
113 {
114 std::unique_lock observers_lock{m_mutex};
115
116 if (!std::holds_alternative<shared_observers>(m_state))
117 return;
118
119 // we are getting copy of curent deque and obtaining CURRENT begin/end of in case of some new observer would be added during on_next call
120 const auto observers = std::get<shared_observers>(m_state);
121 if (!observers)
122 return;
123
124 const auto begin = observers->cbegin();
125 const auto end = observers->cend();
126
127 observers_lock.unlock();
128
129 std::lock_guard lock{m_serialized_mutex};
130 std::for_each(begin, end, [&](const observer& obs) { obs->on_next(v); });
131 }
132
133 void on_error(const std::exception_ptr& err)
134 {
135 {
136 std::lock_guard lock{m_serialized_mutex};
137 if (const auto observers = exchange_observers_under_lock_if_there(err))
138 rpp::utils::for_each(*observers, [&](const observer& obs) { obs->on_error(err); });
139 }
140 dispose();
141 }
142
143 void on_completed()
144 {
145 {
146 std::lock_guard lock{m_serialized_mutex};
147 if (const auto observers = exchange_observers_under_lock_if_there(completed{}))
148 rpp::utils::for_each(*observers, [](const observer& obs) { obs->on_completed(); });
149 }
150 dispose();
151 }
152
153 private:
154 void composite_dispose_impl(interface_disposable::Mode) noexcept override
155 {
156 exchange_observers_under_lock_if_there(disposed{});
157 }
158
159 static shared_observers cleanup_observers(const shared_observers& current_subs, const rpp::details::observers::observer_vtable<Type>* to_delete)
160 {
161 auto subs = std::make_shared<observers>();
162 if (current_subs)
163 {
164 std::copy_if(current_subs->cbegin(),
165 current_subs->cend(),
166 std::back_inserter(*subs),
167 [&to_delete](const observer& obs) {
168 return to_delete != obs.get();
169 });
170 }
171 return subs;
172 }
173
174 static void process_state_unsafe(const state_t& state, const auto&... actions)
175 {
176 std::visit(rpp::utils::overloaded{actions..., rpp::utils::empty_function_any_t{}}, state);
177 }
178
179 shared_observers exchange_observers_under_lock_if_there(state_t&& new_val)
180 {
181 std::lock_guard lock{m_mutex};
182
183 if (!std::holds_alternative<shared_observers>(m_state))
184 return {};
185
186 return std::get<shared_observers>(std::exchange(m_state, std::move(new_val)));
187 }
188
189 private:
190 state_t m_state;
191 std::mutex m_mutex{};
192 RPP_NO_UNIQUE_ADDRESS std::conditional_t<Serialized, std::mutex, rpp::utils::none_mutex> m_serialized_mutex{};
193 };
194} // namespace rpp::subjects::details
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
Definition base_disposable.hpp:23
Definition disposable_wrapper.hpp:252
Definition dynamic_observer.hpp:24
Definition dynamic_observer.hpp:56
Main RPP wrapper over disposables.
Definition fwd.hpp:27
Definition subject_state.hpp:41
Definition disposables_strategy.hpp:29
void dispose() noexcept
Dispose disposable and free any underlying resources and etc.
Definition interface_disposable.hpp:36
Definition subject_state.hpp:31
Definition subject_state.hpp:35
Definition functors.hpp:38
Definition functors.hpp:20