12#include <rpp/observables/fwd.hpp>
14#include <rpp/defs.hpp>
15#include <rpp/disposables/composite_disposable.hpp>
16#include <rpp/disposables/disposable_wrapper.hpp>
17#include <rpp/observables/details/chain_strategy.hpp>
18#include <rpp/observers/lambda_observer.hpp>
19#include <rpp/operators/subscribe.hpp>
36 template<constra
int::decayed_type Type, constra
int::observable_strategy<Type> Strategy>
40 using value_type = Type;
41 using strategy_type = Strategy;
43 using optimal_disposables_strategy =
typename Strategy::optimal_disposables_strategy;
45 template<
typename... Args>
47 observable(Args&&... args)
48 : m_strategy{std::forward<Args>(args)...}
57 template<constra
int::observer_strategy<Type> ObserverStrategy>
61 m_strategy.subscribe(std::move(
observer));
76 template<constra
int::observer_strategy<Type> ObserverStrategy>
78 void subscribe(ObserverStrategy&& observer_strategy)
const
80 if constexpr (std::decay_t<ObserverStrategy>::preferred_disposables_mode == rpp::details::observers::disposables_mode::Auto)
83 subscribe(
rpp::observer<Type, std::decay_t<ObserverStrategy>>{std::forward<ObserverStrategy>(observer_strategy)});
109 template<constra
int::observer_strategy<Type> ObserverStrategy>
112 if (!d.is_disposed())
126 template<constra
int::observer_strategy<Type> ObserverStrategy>
130 subscribe(observer_with_external_disposable<Type, std::decay_t<ObserverStrategy>>{d, std::forward<ObserverStrategy>(observer_strategy)});
143 template<constra
int::observer_strategy<Type> ObserverStrategy>
158 template<constra
int::observer_strategy<Type> ObserverStrategy>
182 template<std::invocable<Type> OnNext,
183 std::invocable<const std::exception_ptr&> OnError = rpp::utils::rethrow_error_t,
184 std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
186 OnError&& on_error = {},
187 OnCompleted&& on_completed = {})
const
192 std::forward<OnError>(on_error),
193 std::forward<OnCompleted>(on_completed)});
199 template<std::invocable<Type> OnNext,
200 std::invocable<> OnCompleted>
202 OnCompleted&& on_completed)
const
214 template<std::invocable<Type> OnNext,
215 std::invocable<const std::exception_ptr&> OnError = rpp::utils::rethrow_error_t,
216 std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
221 std::forward<OnNext>(on_next),
222 std::forward<OnError>(on_error),
223 std::forward<OnCompleted>(on_completed)));
234 template<std::invocable<Type> OnNext,
235 std::invocable<> OnCompleted>
266 template<std::invocable<Type> OnNext,
267 std::invocable<const std::exception_ptr&> OnError = rpp::utils::rethrow_error_t,
268 std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
271 if (!d.is_disposed())
273 std::forward<OnNext>(on_next),
274 std::forward<OnError>(on_error),
275 std::forward<OnCompleted>(on_completed)));
304 template<std::invocable<Type> OnNext,
305 std::invocable<> OnCompleted>
321 template<
typename Subscribe>
323 auto operator|(Subscribe&& op)
const
325 return std::forward<Subscribe>(op)(*this);
328 template<
typename Op>
332 if constexpr (
requires {
typename std::decay_t<Op>::template operator_traits<Type>; })
334 using result_type =
typename std::decay_t<Op>::template operator_traits<Type>::result_type;
335 if constexpr (
requires {
typename std::decay_t<Op>::template operator_traits<Type>::result_type; })
336 return observable<result_type, details::observables::make_chain_t<std::decay_t<Op>, Strategy>>{std::forward<Op>(op), m_strategy};
340 return std::forward<Op>(op)(*this);
344 template<
typename Op>
345 requires (!rpp::utils::is_base_of_v<std::decay_t<Op>, rpp::operators::details::subscribe_t>)
346 rpp::constraint::observable
auto operator|(Op&& op) &&
348 if constexpr (
requires {
typename std::decay_t<Op>::template operator_traits<Type>; })
350 using result_type =
typename std::decay_t<Op>::template operator_traits<Type>::result_type;
351 if constexpr (
requires {
typename std::decay_t<Op>::template operator_traits<Type>::result_type; })
352 return observable<result_type, details::observables::make_chain_t<std::decay_t<Op>, Strategy>>{std::forward<Op>(op), std::move(m_strategy)};
356 return std::forward<Op>(op)(std::move(*
this));
360 template<
typename Op>
361 auto pipe(Op&& op)
const &
363 return *
this | std::forward<Op>(op);
366 template<
typename Op>
367 auto pipe(Op&& op) &&
369 return std::move(*
this) | std::forward<Op>(op);
373 RPP_NO_UNIQUE_ADDRESS Strategy m_strategy;
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:31
bool is_disposed() const noexcept
Observable calls this method to check if observer interested or not in emissions.
Definition observer.hpp:74
static disposable_wrapper_impl make(TArgs &&... args)
Definition disposable_wrapper.hpp:164
static disposable_wrapper_impl empty()
Definition disposable_wrapper.hpp:178
Type-erased version of the rpp::observable. Any observable can be converted to dynamic_observable via...
Definition dynamic_observable.hpp:89
Type-erased version of the rpp::observer. Any observer can be converted to dynamic_observer via rpp::...
Definition dynamic_observer.hpp:129
composite_disposable_wrapper subscribe(const composite_disposable_wrapper &d, OnNext &&on_next, OnCompleted &&on_completed) const
Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.
Definition observable.hpp:306
composite_disposable_wrapper subscribe(const composite_disposable_wrapper &d, observer< Type, ObserverStrategy > &&obs) const
Subscribe passed observer to emissions from this observable.
Definition observable.hpp:110
void subscribe(observer< Type, ObserverStrategy > &&observer) const
Subscribes passed observer to emissions from this observable.
Definition observable.hpp:58
composite_disposable_wrapper subscribe(const composite_disposable_wrapper &d, ObserverStrategy &&observer_strategy) const
Subscribes passed observer strategy to emissions from this observable via construction of observer.
Definition observable.hpp:128
auto as_dynamic() &&
Convert observable to type-erased version.
Definition observable.hpp:319
composite_disposable_wrapper subscribe_with_disposable(ObserverStrategy &&observer_strategy) const
Subscribes observer strategy to emissions from this observable.
Definition observable.hpp:160
void subscribe(dynamic_observer< Type > observer) const
Subscribe passed observer to emissions from this observable.
Definition observable.hpp:68
composite_disposable_wrapper subscribe_with_disposable(OnNext &&on_next, OnCompleted &&on_completed) const
Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.
Definition observable.hpp:236
auto as_dynamic() const &
Convert observable to type-erased version.
Definition observable.hpp:314
void subscribe(ObserverStrategy &&observer_strategy) const
Subscribes passed observer strategy to emissions from this observable via construction of observer.
Definition observable.hpp:78
void subscribe(OnNext &&on_next, OnCompleted &&on_completed) const
Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.
Definition observable.hpp:201
composite_disposable_wrapper subscribe_with_disposable(observer< Type, ObserverStrategy > &&observer) const
Subscribes passed observer to emissions from this observable.
Definition observable.hpp:144
composite_disposable_wrapper subscribe_with_disposable(OnNext &&on_next, OnError &&on_error={}, OnCompleted &&on_completed={}) const
Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.
Definition observable.hpp:217
composite_disposable_wrapper subscribe(const composite_disposable_wrapper &d, OnNext &&on_next, OnError &&on_error={}, OnCompleted &&on_completed={}) const
Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.
Definition observable.hpp:269
void subscribe(OnNext &&on_next, OnError &&on_error={}, OnCompleted &&on_completed={}) const
Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.
Definition observable.hpp:185
composite_disposable_wrapper subscribe_with_disposable(dynamic_observer< Type > observer) const
Subscribe passed observer to emissions from this observable.
Definition observable.hpp:174
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition subscribe.hpp:27
Definition constraints.hpp:43
Definition constraints.hpp:31
disposable_wrapper_impl< interface_composite_disposable > composite_disposable_wrapper
Wrapper to keep "composite" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:41
auto make_lambda_observer(OnNext &&on_next, OnError &&on_error={}, OnCompleted &&on_completed={}) -> lambda_observer< Type, std::decay_t< OnNext >, std::decay_t< OnError >, std::decay_t< OnCompleted > >
Constructs observer specialized with passed callbacks. Most easiesest way to construct observer "on t...
Definition lambda_observer.hpp:51
Definition lambda_observer.hpp:24
Definition functors.hpp:54