ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
observable.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/observables/fwd.hpp>
13
14#include <rpp/defs.hpp>
15#include <rpp/disposables/composite_disposable.hpp>
16#include <rpp/disposables/disposable_wrapper.hpp>
17#include <rpp/observables/details/chain_strategy.hpp>
18#include <rpp/observers/lambda_observer.hpp>
19#include <rpp/operators/subscribe.hpp>
20
21namespace rpp
22{
36 template<constraint::decayed_type Type, constraint::observable_strategy<Type> Strategy>
38 {
39 public:
40 using value_type = Type;
41 using strategy_type = Strategy;
42
43 using optimal_disposables_strategy = typename Strategy::optimal_disposables_strategy;
44
45 template<typename... Args>
47 observable(Args&&... args)
48 : m_strategy{std::forward<Args>(args)...}
49 {
50 }
51
57 template<constraint::observer_strategy<Type> ObserverStrategy>
59 {
60 if (!observer.is_disposed())
61 m_strategy.subscribe(std::move(observer));
62 }
63
72
76 template<constraint::observer_strategy<Type> ObserverStrategy>
78 void subscribe(ObserverStrategy&& observer_strategy) const
79 {
80 if constexpr (std::decay_t<ObserverStrategy>::preferred_disposables_mode == rpp::details::observers::disposables_mode::Auto)
81 subscribe(rpp::observer<Type, rpp::details::observers::override_disposables_strategy<std::decay_t<ObserverStrategy>, typename optimal_disposables_strategy::observer_disposables_strategy>>{std::forward<ObserverStrategy>(observer_strategy)});
82 else
83 subscribe(rpp::observer<Type, std::decay_t<ObserverStrategy>>{std::forward<ObserverStrategy>(observer_strategy)});
84 }
85
109 template<constraint::observer_strategy<Type> ObserverStrategy>
111 {
112 if (!d.is_disposed())
113 m_strategy.subscribe(observer_with_external_disposable<Type, observer<Type, ObserverStrategy>>{d, std::move(obs)});
114 return d;
115 }
116
126 template<constraint::observer_strategy<Type> ObserverStrategy>
128 composite_disposable_wrapper subscribe(const composite_disposable_wrapper& d, ObserverStrategy&& observer_strategy) const
129 {
130 subscribe(observer_with_external_disposable<Type, std::decay_t<ObserverStrategy>>{d, std::forward<ObserverStrategy>(observer_strategy)});
131 return d;
132 }
133
143 template<constraint::observer_strategy<Type> ObserverStrategy>
150
158 template<constraint::observer_strategy<Type> ObserverStrategy>
160 [[nodiscard("Use returned disposable or use subscribe(observer) instead")]] composite_disposable_wrapper subscribe_with_disposable(ObserverStrategy&& observer_strategy) const
161 {
163 }
164
178
182 template<std::invocable<Type> OnNext,
183 std::invocable<const std::exception_ptr&> OnError = rpp::utils::rethrow_error_t,
184 std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
185 void subscribe(OnNext&& on_next,
186 OnError&& on_error = {},
187 OnCompleted&& on_completed = {}) const
188 {
189 using strategy = rpp::details::observers::lambda_strategy<Type, std::decay_t<OnNext>, std::decay_t<OnError>, std::decay_t<OnCompleted>>;
190
192 std::forward<OnError>(on_error),
193 std::forward<OnCompleted>(on_completed)});
194 }
195
199 template<std::invocable<Type> OnNext,
200 std::invocable<> OnCompleted>
201 void subscribe(OnNext&& on_next,
202 OnCompleted&& on_completed) const
203 {
204 subscribe(std::forward<OnNext>(on_next), rpp::utils::rethrow_error_t{}, std::forward<OnCompleted>(on_completed));
205 }
206
214 template<std::invocable<Type> OnNext,
215 std::invocable<const std::exception_ptr&> OnError = rpp::utils::rethrow_error_t,
216 std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
217 [[nodiscard("Use returned disposable or use subscribe(on_next, on_error, on_completed) instead")]] composite_disposable_wrapper subscribe_with_disposable(OnNext&& on_next, OnError&& on_error = {}, OnCompleted&& on_completed = {}) const
218 {
221 std::forward<OnNext>(on_next),
222 std::forward<OnError>(on_error),
223 std::forward<OnCompleted>(on_completed)));
224 return res;
225 }
226
234 template<std::invocable<Type> OnNext,
235 std::invocable<> OnCompleted>
236 [[nodiscard("Use returned disposable or use subscribe(on_next, on_error, on_completed) instead")]] composite_disposable_wrapper subscribe_with_disposable(OnNext&& on_next, OnCompleted&& on_completed) const
237 {
238 return subscribe_with_disposable(std::forward<OnNext>(on_next), rpp::utils::rethrow_error_t{}, std::forward<OnCompleted>(on_completed));
239 }
240
266 template<std::invocable<Type> OnNext,
267 std::invocable<const std::exception_ptr&> OnError = rpp::utils::rethrow_error_t,
268 std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
269 composite_disposable_wrapper subscribe(const composite_disposable_wrapper& d, OnNext&& on_next, OnError&& on_error = {}, OnCompleted&& on_completed = {}) const
270 {
271 if (!d.is_disposed())
273 std::forward<OnNext>(on_next),
274 std::forward<OnError>(on_error),
275 std::forward<OnCompleted>(on_completed)));
276 return d;
277 }
278
304 template<std::invocable<Type> OnNext,
305 std::invocable<> OnCompleted>
306 composite_disposable_wrapper subscribe(const composite_disposable_wrapper& d, OnNext&& on_next, OnCompleted&& on_completed) const
307 {
308 return subscribe(d, std::forward<OnNext>(on_next), rpp::utils::rethrow_error_t{}, std::forward<OnCompleted>(on_completed));
309 }
310
314 auto as_dynamic() const & { return rpp::dynamic_observable<Type>{*this}; }
315
319 auto as_dynamic() && { return rpp::dynamic_observable<Type>{std::move(*this)}; }
320
321 template<typename Subscribe>
323 auto operator|(Subscribe&& op) const
324 {
325 return std::forward<Subscribe>(op)(*this);
326 }
327
328 template<typename Op>
330 rpp::constraint::observable auto operator|(Op&& op) const &
331 {
332 if constexpr (requires { typename std::decay_t<Op>::template operator_traits<Type>; })
333 {
334 using result_type = typename std::decay_t<Op>::template operator_traits<Type>::result_type;
335 if constexpr (requires { typename std::decay_t<Op>::template operator_traits<Type>::result_type; }) // narrow compilataion error a bit
336 return observable<result_type, details::observables::make_chain_t<std::decay_t<Op>, Strategy>>{std::forward<Op>(op), m_strategy};
337 }
338 else
339 {
340 return std::forward<Op>(op)(*this);
341 }
342 }
343
344 template<typename Op>
346 rpp::constraint::observable auto operator|(Op&& op) &&
347 {
348 if constexpr (requires { typename std::decay_t<Op>::template operator_traits<Type>; })
349 {
350 using result_type = typename std::decay_t<Op>::template operator_traits<Type>::result_type;
351 if constexpr (requires { typename std::decay_t<Op>::template operator_traits<Type>::result_type; }) // narrow compilataion error a bit
352 return observable<result_type, details::observables::make_chain_t<std::decay_t<Op>, Strategy>>{std::forward<Op>(op), std::move(m_strategy)};
353 }
354 else
355 {
356 return std::forward<Op>(op)(std::move(*this));
357 }
358 }
359
360 template<typename Op>
361 auto pipe(Op&& op) const &
362 {
363 return *this | std::forward<Op>(op);
364 }
365
366 template<typename Op>
367 auto pipe(Op&& op) &&
368 {
369 return std::move(*this) | std::forward<Op>(op);
370 }
371
372 private:
373 RPP_NO_UNIQUE_ADDRESS Strategy m_strategy;
374 };
375} // namespace rpp
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:31
bool is_disposed() const noexcept
Observable calls this method to check if observer interested or not in emissions.
Definition observer.hpp:74
static disposable_wrapper_impl make(TArgs &&... args)
Definition disposable_wrapper.hpp:164
static disposable_wrapper_impl empty()
Definition disposable_wrapper.hpp:178
Type-erased version of the rpp::observable. Any observable can be converted to dynamic_observable via...
Definition fwd.hpp:173
Type-erased version of the rpp::observer. Any observer can be converted to dynamic_observer via rpp::...
Definition fwd.hpp:106
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
composite_disposable_wrapper subscribe(const composite_disposable_wrapper &d, OnNext &&on_next, OnCompleted &&on_completed) const
Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.
Definition observable.hpp:306
composite_disposable_wrapper subscribe(const composite_disposable_wrapper &d, observer< Type, ObserverStrategy > &&obs) const
Subscribe passed observer to emissions from this observable.
Definition observable.hpp:110
void subscribe(observer< Type, ObserverStrategy > &&observer) const
Subscribes passed observer to emissions from this observable.
Definition observable.hpp:58
composite_disposable_wrapper subscribe(const composite_disposable_wrapper &d, ObserverStrategy &&observer_strategy) const
Subscribes passed observer strategy to emissions from this observable via construction of observer.
Definition observable.hpp:128
auto as_dynamic() &&
Convert observable to type-erased version.
Definition observable.hpp:319
composite_disposable_wrapper subscribe_with_disposable(ObserverStrategy &&observer_strategy) const
Subscribes observer strategy to emissions from this observable.
Definition observable.hpp:160
void subscribe(dynamic_observer< Type > observer) const
Subscribe passed observer to emissions from this observable.
Definition observable.hpp:68
composite_disposable_wrapper subscribe_with_disposable(OnNext &&on_next, OnCompleted &&on_completed) const
Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.
Definition observable.hpp:236
auto as_dynamic() const &
Convert observable to type-erased version.
Definition observable.hpp:314
void subscribe(ObserverStrategy &&observer_strategy) const
Subscribes passed observer strategy to emissions from this observable via construction of observer.
Definition observable.hpp:78
void subscribe(OnNext &&on_next, OnCompleted &&on_completed) const
Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.
Definition observable.hpp:201
composite_disposable_wrapper subscribe_with_disposable(observer< Type, ObserverStrategy > &&observer) const
Subscribes passed observer to emissions from this observable.
Definition observable.hpp:144
composite_disposable_wrapper subscribe_with_disposable(OnNext &&on_next, OnError &&on_error={}, OnCompleted &&on_completed={}) const
Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.
Definition observable.hpp:217
composite_disposable_wrapper subscribe(const composite_disposable_wrapper &d, OnNext &&on_next, OnError &&on_error={}, OnCompleted &&on_completed={}) const
Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.
Definition observable.hpp:269
void subscribe(OnNext &&on_next, OnError &&on_error={}, OnCompleted &&on_completed={}) const
Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.
Definition observable.hpp:185
composite_disposable_wrapper subscribe_with_disposable(dynamic_observer< Type > observer) const
Subscribe passed observer to emissions from this observable.
Definition observable.hpp:174
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition subscribe.hpp:27
Definition constraints.hpp:43
Definition fwd.hpp:80
Definition fwd.hpp:250
Definition constraints.hpp:31
Definition utils.hpp:48
auto make_lambda_observer(OnNext &&on_next, OnError &&on_error={}, OnCompleted &&on_completed={}) -> lambda_observer< Type, std::decay_t< OnNext >, std::decay_t< OnError >, std::decay_t< OnCompleted > >
Constructs observer specialized with passed callbacks. Most easiesest way to construct observer "on t...
Definition lambda_observer.hpp:51
Definition lambda_observer.hpp:24
Definition functors.hpp:54