13#include <rpp/observables/fwd.hpp>
14#include <rpp/observers/fwd.hpp>
15#include <rpp/operators/fwd.hpp>
17#include <rpp/disposables/composite_disposable.hpp>
18#include <rpp/disposables/disposable_wrapper.hpp>
19#include <rpp/observers/observer.hpp>
20#include <rpp/utils/functors.hpp>
24namespace rpp::operators::details
26 template<
typename... Args>
29 template<rpp::constra
int::decayed_type Type, rpp::constra
int::observer_strategy<Type> ObserverStrategy>
30 class subscribe_t<
observer<Type, ObserverStrategy>>
38 template<rpp::constra
int::observable_strategy<Type> Strategy>
48 template<rpp::constra
int::observer_strategy_base ObserverStrategy>
49 class subscribe_t<ObserverStrategy>
52 explicit subscribe_t(ObserverStrategy&& observer_strategy)
53 : m_observer_strategy{std::move(observer_strategy)}
57 explicit subscribe_t(
const ObserverStrategy& observer_strategy)
58 : m_observer_strategy{observer_strategy}
62 template<rpp::constra
int::observable Observable>
63 void operator()(
const Observable&
observable)
const &
69 template<rpp::constra
int::observable Observable>
70 void operator()(
const Observable&
observable) &&
77 ObserverStrategy m_observer_strategy;
80 template<rpp::constra
int::decayed_type Type, rpp::constra
int::observer_strategy<Type> ObserverStrategy>
85 : m_disposable{std::move(d)}
90 template<rpp::constra
int::observable_strategy<Type> Strategy>
102 template<rpp::constra
int::observer_strategy_base ObserverStrategy>
107 : m_disposable{std::move(d)}
108 , m_observer_strategy{std::move(observer_strategy)}
113 : m_disposable{std::move(d)}
114 , m_observer_strategy{observer_strategy}
118 template<rpp::constra
int::observable Observable>
126 template<rpp::constra
int::observable Observable>
136 ObserverStrategy m_observer_strategy;
139 template<
typename OnNext, std::invocable<const std::exception_ptr&> OnError, std::invocable<> OnCompleted>
140 class subscribe_t<OnNext, OnError, OnCompleted>
143 template<rpp::constra
int::decayed_same_as<OnNext> TOnNext, rpp::constra
int::decayed_same_as<OnError> TOnError, rpp::constra
int::decayed_same_as<OnCompleted> TOnCompleted>
145 explicit subscribe_t(TOnNext&& on_next, TOnError&& on_error, TOnCompleted&& on_completed)
146 : m_on_next{std::forward<TOnNext>(on_next)}
147 , m_on_error{std::forward<TOnError>(on_error)}
148 , m_on_completed{std::forward<TOnCompleted>(on_completed)}
152 template<rpp::constra
int::decayed_type Type, rpp::constra
int::observable_strategy<Type> Strategy>
155 static_assert(std::invocable<OnNext, Type>,
"OnNext should be suitable for type of observable");
156 observable.
subscribe(std::move(m_on_next), std::move(m_on_error), std::move(m_on_completed));
159 template<rpp::constra
int::decayed_type Type, rpp::constra
int::observable_strategy<Type> Strategy>
162 static_assert(std::invocable<OnNext, Type>,
"OnNext should be suitable for type of observable");
167 RPP_NO_UNIQUE_ADDRESS OnNext m_on_next;
168 RPP_NO_UNIQUE_ADDRESS OnError m_on_error;
169 RPP_NO_UNIQUE_ADDRESS OnCompleted m_on_completed;
172 template<
typename OnNext, std::invocable<const std::exception_ptr&> OnError, std::invocable<> OnCompleted>
176 template<rpp::constra
int::decayed_same_as<OnNext> TOnNext, rpp::constra
int::decayed_same_as<OnError> TOnError, rpp::constra
int::decayed_same_as<OnCompleted> TOnCompleted>
178 : m_disposable{std::move(d)}
179 , m_on_next{std::forward<TOnNext>(on_next)}
180 , m_on_error{std::forward<TOnError>(on_error)}
181 , m_on_completed{std::forward<TOnCompleted>(on_completed)}
185 template<rpp::constra
int::decayed_type Type, rpp::constra
int::observable_strategy<Type> Strategy>
188 static_assert(std::invocable<OnNext, Type>,
"OnNext should be suitable for type of observable");
189 observable.
subscribe(m_disposable, std::move(m_on_next), std::move(m_on_error), std::move(m_on_completed));
190 return std::move(m_disposable);
193 template<rpp::constra
int::decayed_type Type, rpp::constra
int::observable_strategy<Type> Strategy>
196 static_assert(std::invocable<OnNext, Type>,
"OnNext should be suitable for type of observable");
203 RPP_NO_UNIQUE_ADDRESS OnNext m_on_next;
204 RPP_NO_UNIQUE_ADDRESS OnError m_on_error;
205 RPP_NO_UNIQUE_ADDRESS OnCompleted m_on_completed;
208 template<
typename... Args>
211 template<
typename OnNext>
215namespace rpp::operators
225 template<rpp::constra
int::decayed_type Type, rpp::constra
int::observer_strategy<Type> ObserverStrategy>
240 template<rpp::constra
int::decayed_type Type, rpp::constra
int::observer_strategy<Type> ObserverStrategy>
251 template<rpp::constra
int::decayed_type Type>
262 template<rpp::constra
int::observer_strategy_base ObserverStrategy>
278 template<rpp::constra
int::decayed_type Type>
293 template<rpp::constra
int::observer_strategy_base ObserverStrategy>
297 return details::subscribe_t{std::move(disposable), std::forward<ObserverStrategy>(observer_strategy)};
305 template<details::on_next_like OnNext = rpp::utils::empty_function_any_t, std::invocable<const std::exception_ptr&> OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
306 auto subscribe(OnNext&& on_next = {}, OnError&& on_error = {}, OnCompleted&& on_completed = {})
308 return details::subscribe_t{std::forward<OnNext>(on_next), std::forward<OnError>(on_error), std::forward<OnCompleted>(on_completed)};
316 template<details::on_next_like OnNext, std::invocable<> OnCompleted>
317 auto subscribe(OnNext&& on_next, OnCompleted&& on_completed)
331 template<details::on_next_like OnNext = rpp::utils::empty_function_any_t, std::invocable<const std::exception_ptr&> OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
334 return details::subscribe_t{std::move(d), std::forward<OnNext>(on_next), std::forward<OnError>(on_error), std::forward<OnCompleted>(on_completed)};
346 template<details::on_next_like OnNext, std::invocable<> OnCompleted>
362 template<rpp::constra
int::decayed_type Type, rpp::constra
int::observer_strategy<Type> ObserverStrategy>
375 template<rpp::constra
int::decayed_type Type>
388 template<details::on_next_like OnNext = rpp::utils::empty_function_any_t, std::invocable<const std::exception_ptr&> OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
401 template<details::on_next_like OnNext, std::invocable<> OnCompleted>
static disposable_wrapper_impl make(TArgs &&... args)
Definition disposable_wrapper.hpp:164
Type-erased version of the rpp::observer. Any observer can be converted to dynamic_observer via rpp::...
Definition dynamic_observer.hpp:129
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
void subscribe(observer< Type, ObserverStrategy > &&observer) const
Subscribes passed observer to emissions from this observable.
Definition observable.hpp:58
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition subscribe.hpp:27
Definition constraints.hpp:19
Concept defines requirements for an user-defined observer strategy.
Definition fwd.hpp:56
Definition subscribe.hpp:212
Definition function_traits.hpp:45
disposable_wrapper_impl< interface_composite_disposable > composite_disposable_wrapper
Wrapper to keep "composite" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:41
auto subscribe(observer< Type, ObserverStrategy > &&observer)
Subscribes passed observer to emissions from this observable.
Definition subscribe.hpp:226
auto subscribe_with_disposable(observer< Type, ObserverStrategy > &&observer)
Subscribes passed observer to emissions from this observable.
Definition subscribe.hpp:363
Definition functors.hpp:54