ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
subject_state.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/observables/fwd.hpp>
13
14#include <rpp/disposables/callback_disposable.hpp>
15#include <rpp/disposables/composite_disposable.hpp>
16#include <rpp/disposables/disposable_wrapper.hpp>
17#include <rpp/observers/dynamic_observer.hpp>
18#include <rpp/utils/constraints.hpp>
19#include <rpp/utils/functors.hpp>
20#include <rpp/utils/utils.hpp>
21
22#include <algorithm>
23#include <list>
24#include <memory>
25#include <mutex>
26#include <variant>
27
28namespace rpp::subjects::details
29{
/// Tag type: the alternative stored in subject_state's state variant after
/// on_completed() has been processed. Carries no data.
struct completed
{
};
33
/// Tag type: the alternative stored in subject_state's state variant once the
/// state itself has been disposed (see composite_dispose_impl). Carries no data.
struct disposed
{
};
37
38 template<rpp::constraint::decayed_type Type, bool Serialized>
39 class subject_state : public composite_disposable
40 , public rpp::details::enable_wrapper_from_this<subject_state<Type, Serialized>>
41 {
42 template<rpp::constraint::observer TObs>
43 class disposable_with_observer : public rpp::details::observers::type_erased_observer<TObs>
44 , public rpp::details::base_disposable
45 {
46 public:
47 disposable_with_observer(TObs&& observer, std::weak_ptr<subject_state> state)
49 , m_state{std::move(state)}
50 {
51 }
52
53 private:
54 void base_dispose_impl(interface_disposable::Mode) noexcept override
55 {
56 if (const auto shared = m_state.lock())
57 {
58 std::unique_lock lock{shared->m_mutex};
59 process_state_unsafe(shared->m_state,
60 [&](const shared_observers& observers) {
61 shared->m_state = cleanup_observers(observers, this);
62 });
63 }
64 }
65
66 std::weak_ptr<subject_state> m_state{};
67 };
68
69 using observer = std::shared_ptr<rpp::details::observers::observer_vtable<Type>>;
70 using observers = std::list<observer>;
71 using shared_observers = std::shared_ptr<observers>;
72 using state_t = std::variant<shared_observers, std::exception_ptr, completed, disposed>;
73
74 public:
75 using optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>;
76
77 subject_state() = default;
78
79 template<rpp::constraint::observer_of_type<Type> TObs>
80 void on_subscribe(TObs&& observer)
81 {
82 std::unique_lock lock{m_mutex};
83 process_state_unsafe(
84 m_state,
85 [&](const shared_observers& observers) {
86 auto d = disposable_wrapper_impl<disposable_with_observer<std::decay_t<TObs>>>::make(std::forward<TObs>(observer), this->wrapper_from_this().lock());
87 auto ptr = d.lock();
88 if (!observers)
89 {
90 auto new_observers = std::make_shared<subject_state::observers>();
91 new_observers->emplace_back(ptr);
92 m_state = std::move(new_observers);
93 }
94 else
95 {
96 observers->emplace_back(ptr);
97 }
98
99 lock.unlock();
100 ptr->set_upstream(d.as_weak());
101 },
102 [&](const std::exception_ptr& err) {
103 lock.unlock();
104 observer.on_error(err);
105 },
106 [&](completed) {
107 lock.unlock();
108 observer.on_completed();
109 });
110 }
111
112 void on_next(const Type& v)
113 {
114 std::unique_lock observers_lock{m_mutex};
115 process_state_unsafe(m_state, [&](shared_observers observers) {
116 if (!observers)
117 return;
118
119 auto itr = observers->cbegin();
120 const auto size = observers->size();
121
122 observers_lock.unlock();
123
124 std::lock_guard lock{m_serialized_mutex};
125 for (size_t i = 0; i < size; ++i)
126 {
127 (*(itr++))->on_next(v);
128 }
129 });
130 }
131
132 void on_error(const std::exception_ptr& err)
133 {
134 {
135 std::lock_guard lock{m_serialized_mutex};
136 if (const auto observers = exchange_observers_under_lock_if_there(err))
137 rpp::utils::for_each(*observers, [&](const observer& obs) { obs->on_error(err); });
138 }
139 dispose();
140 }
141
142 void on_completed()
143 {
144 {
145 std::lock_guard lock{m_serialized_mutex};
146 if (const auto observers = exchange_observers_under_lock_if_there(completed{}))
147 rpp::utils::for_each(*observers, [](const observer& obs) { obs->on_completed(); });
148 }
149 dispose();
150 }
151
152 private:
153 void composite_dispose_impl(interface_disposable::Mode) noexcept override
154 {
155 exchange_observers_under_lock_if_there(disposed{});
156 }
157
158 static shared_observers cleanup_observers(const shared_observers& current_subs, const rpp::details::observers::observer_vtable<Type>* to_delete)
159 {
160 auto subs = std::make_shared<observers>();
161 if (current_subs)
162 {
163 std::copy_if(current_subs->cbegin(),
164 current_subs->cend(),
165 std::back_inserter(*subs),
166 [&to_delete](const observer& obs) {
167 return to_delete != obs.get();
168 });
169 }
170 return subs;
171 }
172
173 static auto process_state_unsafe(const state_t& state, const auto&... actions)
174 {
175 return std::visit(rpp::utils::overloaded{actions..., rpp::utils::empty_function_any_t{}}, state);
176 }
177
178 shared_observers exchange_observers_under_lock_if_there(state_t&& new_val)
179 {
180 std::lock_guard lock{m_mutex};
181
182 return process_state_unsafe(m_state, [&](shared_observers observers) {
183 m_state = std::move(new_val);
184 return observers; }, [](auto) { return shared_observers{}; });
185 }
186
187 private:
188 state_t m_state;
189 std::mutex m_mutex{};
190 RPP_NO_UNIQUE_ADDRESS std::conditional_t<Serialized, std::mutex, rpp::utils::none_mutex> m_serialized_mutex{};
191 };
192} // namespace rpp::subjects::details
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
Definition disposable_wrapper.hpp:252
Definition dynamic_observer.hpp:24
Definition dynamic_observer.hpp:56
Definition disposables_strategy.hpp:29
void dispose() noexcept
Dispose disposable and free any underlying resources and etc.
Definition interface_disposable.hpp:36
Definition subject_state.hpp:31
Definition subject_state.hpp:35
Definition functors.hpp:38
Definition functors.hpp:20