ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
observer.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/observers/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/disposables/composite_disposable.hpp>
17#include <rpp/disposables/disposable_wrapper.hpp>
18#include <rpp/observers/details/disposables_strategy.hpp>
19#include <rpp/utils/exceptions.hpp>
20#include <rpp/utils/functors.hpp>
21#include <rpp/utils/utils.hpp>
22
23#include <exception>
24
25namespace rpp::details
26{
// Shared implementation behind every rpp::observer specialization. It pairs a
// Strategy (the object that actually receives the callbacks) with a
// DisposablesStrategy (the object add()/dispose()/is_disposed() are called on).
//
// Type                - value type delivered through on_next.
// Strategy            - receives the forwarded set_upstream/on_next/on_error/on_completed/is_disposed calls.
// DisposablesStrategy - stores upstream disposables and reports/sets the disposed state.
//
// NOTE(review): the listing numbers in this dump are not contiguous (e.g. 44 -> 49,
// 67 -> 74); whatever occupied the skipped lines is not visible here.
27 template<constraint::decayed_type Type, constraint::observer_strategy<Type> Strategy, observers::constraint::disposables_strategy DisposablesStrategy>
28 class observer_impl
29 {
30 protected:
// Protected constructor: only derived classes can create an observer_impl.
// strategy - disposables strategy; taken by value and moved into m_disposable.
// args     - forwarded to Strategy's constructor (the requires-clause checks that
//            Strategy is constructible from them).
31 template<typename... Args>
32 requires constraint::is_constructible_from<Strategy, Args&&...>
33 explicit observer_impl(DisposablesStrategy strategy, Args&&... args)
34 : m_strategy{std::forward<Args>(args)...}
35 , m_disposable{std::move(strategy)}
36 {
37 }
38
39 public:
// Disposables mode advertised by this type. rpp::observer reads the same-named
// constant from its Strategy; presumably this one is consulted when an observer is
// itself used as a strategy - confirm.
40 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
41
// Pointer-to-member types matching the two on_next overloads below (e.g. to pick
// one overload explicitly when taking the address of on_next).
42 using on_next_lvalue = void (observer_impl::*)(const Type&) const noexcept;
43 using on_next_rvalue = void (observer_impl::*)(Type&&) const noexcept;
44
// Receives the upstream disposable `d`.
// - If this observer is already disposed, `d` is disposed immediately and nothing
//   else happens.
// - Otherwise `d` is added to m_disposable and then forwarded to the strategy.
// Any exception thrown while adding/forwarding is reported through on_error
// instead of escaping this noexcept method.
49 void set_upstream(const disposable_wrapper& d) noexcept
50 {
51 if (is_disposed())
52 {
53 d.dispose();
54 return;
55 }
56
57 try
58 {
59 m_disposable.add(d);
60 m_strategy.set_upstream(d);
61 }
62 catch (...)
63 {
64 on_error(std::current_exception());
65 }
66 }
67
// Returns true when either the disposables strategy or the wrapped strategy
// reports itself as disposed.
74 bool is_disposed() const noexcept
75 {
76 return m_disposable.is_disposed() || m_strategy.is_disposed();
77 }
78
// Delivers a value by const reference. The value is forwarded to the strategy only
// while the observer is not disposed; an exception thrown by the strategy is
// converted into an on_error call.
84 void on_next(const Type& v) const noexcept
85 {
86 try
87 {
88 if (!is_disposed())
89 m_strategy.on_next(v);
90 }
91 catch (...)
92 {
93 on_error(std::current_exception());
94 }
95 }
96
// Rvalue overload of on_next: same flow as above, but the value is moved into the
// strategy.
102 void on_next(Type&& v) const noexcept
103 {
104 try
105 {
106 if (!is_disposed())
107 m_strategy.on_next(std::move(v));
108 }
109 catch (...)
110 {
111 on_error(std::current_exception());
112 }
113 }
114
// Delivers an error. Does nothing if the observer is already disposed. Otherwise
// forwards `err` to the strategy; the finally_action guard calls
// m_disposable.dispose() when it goes out of scope, i.e. after the strategy call.
// Unlike on_next there is no try/catch here, so an exception thrown by the strategy
// would escape a noexcept function.
120 void on_error(const std::exception_ptr& err) const noexcept
121 {
122 if (!is_disposed())
123 {
124 rpp::utils::finally_action finally{[&] {
125 m_disposable.dispose();
126 }};
127 m_strategy.on_error(err);
128 }
129 }
130
// Delivers completion. Same shape as on_error: no-op when disposed, otherwise the
// strategy is notified and m_disposable is disposed when the guard is destroyed.
135 void on_completed() const noexcept
136 {
137 if (!is_disposed())
138 {
139 rpp::utils::finally_action finally{[&] {
140 m_disposable.dispose();
141 }};
142 m_strategy.on_completed();
143 }
144 }
145
146 private:
// RPP_NO_UNIQUE_ADDRESS is defined outside this file; presumably it expands to
// [[no_unique_address]] so empty strategies take no space - confirm in rpp/defs.hpp.
147 RPP_NO_UNIQUE_ADDRESS Strategy m_strategy;
// mutable: the const methods on_error/on_completed call m_disposable.dispose().
148 RPP_NO_UNIQUE_ADDRESS mutable DisposablesStrategy m_disposable;
149 };
150} // namespace rpp::details
151
152namespace rpp
153{
// Forward declaration of the primary rpp::observer template.
// Type     - value type the observer receives.
// Strategy - strategy type that must satisfy constraint::observer_strategy<Type>.
167 template<constraint::decayed_type Type, constraint::observer_strategy<Type> Strategy>
168 class observer;
169
// Primary rpp::observer template: a final, non-copyable, move-constructible wrapper
// over details::observer_impl whose disposables strategy is deduced from
// Strategy::preferred_disposables_mode.
170 template<constraint::decayed_type Type, constraint::observer_strategy<Type> Strategy>
171 class observer final : public details::observer_impl<Type, Strategy, details::observers::deduce_optimal_disposables_strategy_t<Strategy::preferred_disposables_mode>>
172 {
173 public:
// Disposables strategy type deduced for this Strategy (the same type that is passed
// to the observer_impl base above).
174 using DisposableStrategy = details::observers::deduce_optimal_disposables_strategy_t<Strategy::preferred_disposables_mode>;
// NOTE(review): listing line 175 is not visible. `Base`, used by the constructors
// below, is presumably declared there as an alias of the observer_impl base - confirm.
176
// Constructs from an explicitly provided disposables strategy plus the arguments
// that are forwarded to Strategy's constructor.
177 template<typename... Args>
178 requires constraint::is_constructible_from<Strategy, Args&&...>
179 explicit observer(DisposableStrategy strategy, Args&&... args)
180 : Base{std::move(strategy), std::forward<Args>(args)...}
181 {
182 }
183
// Constructs with a default-constructed DisposableStrategy; all arguments go to
// Strategy's constructor. The second half of the requires-clause rejects an argument
// pack that is just `observer` itself, presumably so this template does not compete
// with the copy/move constructors - confirm.
184 template<typename... Args>
185 requires (constraint::is_constructible_from<Strategy, Args && ...> && !rpp::constraint::variadic_decayed_same_as<observer, Args...>)
186 explicit observer(Args&&... args)
187 : Base{DisposableStrategy{}, std::forward<Args>(args)...}
188 {
189 }
190
// Copying is disabled; move construction is allowed.
191 observer(const observer&) = delete;
192 observer(observer&&) noexcept = default;
193
// NOTE(review): the signature of this member (listing lines 193-197) is not visible;
// the page's tooltip index lists `dynamic_observer< Type > as_dynamic() &&` at
// observer.hpp:197, which presumably belongs here. The body moves *this into a
// dynamic_observer<Type> and returns it.
198 {
199 return dynamic_observer<Type>{std::move(*this)};
200 }
201 };
202
// Partial specialization selected when the strategy argument is
// details::observers::override_disposables_strategy<Strategy, DisposableStrategy>.
// Here the disposables strategy is the explicitly supplied DisposableStrategy rather
// than one deduced from Strategy, and the base class is instantiated with the
// unwrapped Strategy.
203 template<constraint::decayed_type Type, constraint::observer_strategy<Type> Strategy, rpp::details::observers::constraint::disposables_strategy DisposableStrategy>
204 class observer<Type, details::observers::override_disposables_strategy<Strategy, DisposableStrategy>> final : public details::observer_impl<Type, Strategy, DisposableStrategy>
205 {
206 public:
// NOTE(review): listing line 207 is not visible. `Base`, used by the constructors
// below, is presumably declared there as an alias of the observer_impl base - confirm.
208
// Constructs from an explicitly provided disposables strategy plus the arguments
// that are forwarded to Strategy's constructor.
209 template<typename... Args>
210 requires constraint::is_constructible_from<Strategy, Args&&...>
211 explicit observer(DisposableStrategy strategy, Args&&... args)
212 : Base{std::move(strategy), std::forward<Args>(args)...}
213 {
214 }
215
// Constructs with a default-constructed DisposableStrategy; all arguments go to
// Strategy's constructor. The requires-clause also rejects an argument pack that is
// just `observer` itself, presumably so this template does not compete with the
// copy/move constructors - confirm.
216 template<typename... Args>
217 requires (constraint::is_constructible_from<Strategy, Args && ...> && !rpp::constraint::variadic_decayed_same_as<observer, Args...>)
218 explicit observer(Args&&... args)
219 : Base{DisposableStrategy{}, std::forward<Args>(args)...}
220 {
221 }
222
// Copying is disabled; move construction is allowed.
223 observer(const observer&) = delete;
224 observer(observer&&) noexcept = default;
225
// NOTE(review): the signature of this member (listing lines 225-229) is not visible;
// the page's tooltip index lists `dynamic_observer< Type > as_dynamic() &&` at
// observer.hpp:229, which presumably belongs here. The body moves *this into a
// dynamic_observer<Type> and returns it.
230 {
231 return dynamic_observer<Type>{std::move(*this)};
232 }
233 };
234
// Full specialization on the strategy argument for
// rpp::details::observers::dynamic_strategy<Type>. Its base is observer_impl with
// none_disposables_strategy. Unlike the two class definitions above, this one is not
// marked final and no deleted copy constructor is visible in it.
235 template<constraint::decayed_type Type>
236 class observer<Type, rpp::details::observers::dynamic_strategy<Type>>
237 : public details::observer_impl<Type, rpp::details::observers::dynamic_strategy<Type>, details::observers::none_disposables_strategy>
238 {
239 public:
// Non-explicit converting constructor from an rvalue observer with any strategy
// other than dynamic_strategy<Type> itself.
// NOTE(review): the member-initializer list (listing line 243) is not visible, so
// how `other` is consumed cannot be confirmed from here; the body itself is empty.
240 template<constraint::observer_strategy<Type> TStrategy>
241 requires (!std::same_as<TStrategy, rpp::details::observers::dynamic_strategy<Type>>)
242 observer(observer<Type, TStrategy>&& other)
244 {
245 }
246
// NOTE(review): signature (listing line 247) is not visible. The body returns a
// dynamic_observer<Type> built by moving from *this; presumably this is the
// rvalue-qualified as_dynamic() overload - confirm.
248 {
249 return dynamic_observer<Type>{std::move(*this)};
250 }
251
// NOTE(review): signature (listing line 252) is not visible. The body returns a
// dynamic_observer<Type> built by copying *this; presumably this is the
// lvalue-qualified as_dynamic() overload - confirm.
253 {
254 return dynamic_observer<Type>{*this};
255 }
256 };
257
258
259} // namespace rpp
Definition observer.hpp:29
void on_next(const Type &v) const noexcept
Observable calls this method to notify the observer about a new value.
Definition observer.hpp:84
void set_upstream(const disposable_wrapper &d) noexcept
Observable calls this method to pass disposable. Observer disposes this disposable WHEN observer want...
Definition observer.hpp:49
void on_next(Type &&v) const noexcept
Observable calls this method to notify the observer about a new value.
Definition observer.hpp:102
void on_completed() const noexcept
Observable calls this method to notify observer about completion of emissions.
Definition observer.hpp:135
void on_error(const std::exception_ptr &err) const noexcept
Observable calls this method to notify the observer about an error that occurred while generating the next value.
Definition observer.hpp:120
bool is_disposed() const noexcept
Observable calls this method to check whether the observer is still interested in emissions.
Definition observer.hpp:74
Definition dynamic_observer.hpp:92
Type-erased version of the rpp::observer. Any observer can be converted to dynamic_observer via rpp::...
Definition dynamic_observer.hpp:129
dynamic_observer< Type > as_dynamic() &&
Convert current observer to type-erased version. Useful if you need to COPY your observer or to store...
Definition observer.hpp:229
dynamic_observer< Type > as_dynamic() &&
Convert current observer to type-erased version. Useful if you need to COPY your observer or to store...
Definition observer.hpp:197
Calls passed function during destruction.
Definition utils.hpp:120
Definition constraints.hpp:43
Definition constraints.hpp:31
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
Definition disposables_strategy.hpp:51