ReactivePlusPlus
ReactiveX implementation for C++20
|
Utility operators are operators that provide some extra functionality without changing of original values, but changing of behaviour. More...
Functions | |
auto | rpp::operators::as_blocking () |
Converts rpp::observable to rpp::blocking_observable | |
template<rpp::schedulers::constraint::scheduler Scheduler> | |
auto | rpp::operators::debounce (rpp::schedulers::duration period, Scheduler &&scheduler) |
Only emit emission if specified period of time has passed without any other emission. On each new emission timer reset. | |
template<rpp::schedulers::constraint::scheduler Scheduler> | |
auto | rpp::operators::delay (rpp::schedulers::duration delay_duration, Scheduler &&scheduler) |
Shift the emissions from an Observable forward in time by a particular amount. | |
template<rpp::constraint::is_nothrow_invocable LastFn> | |
auto | rpp::operators::finally (LastFn &&last_fn) |
Register callback to be called when execution is done and disposable bound to observer is disposed. | |
template<rpp::schedulers::constraint::scheduler Scheduler> | |
auto | rpp::operators::observe_on (Scheduler &&scheduler, rpp::schedulers::duration delay_duration) |
Specify the Scheduler on which an observer will observe this Observable. | |
auto | rpp::operators::repeat (size_t count) |
Repeats the Observabe's sequence of emissions count times via re-subscribing on it during on_completed call while count not reached. | |
auto | rpp::operators::repeat () |
Repeats the Observabe's sequence of emissions infinite amount of times via re-subscribing on it during on_completed . | |
template<rpp::constraint::decayed_type Type, rpp::constraint::observer_strategy< Type > ObserverStrategy> | |
auto | rpp::operators::subscribe (observer< Type, ObserverStrategy > &&observer) |
Subscribes passed observer to emissions from this observable. | |
template<rpp::constraint::decayed_type Type, rpp::constraint::observer_strategy< Type > ObserverStrategy> | |
auto | rpp::operators::subscribe (rpp::composite_disposable_wrapper disposable, observer< Type, ObserverStrategy > &&observer) |
Subscribe passed observer to emissions from observable. | |
template<rpp::constraint::decayed_type Type> | |
auto | rpp::operators::subscribe (dynamic_observer< Type > observer) |
Subscribes passed observer to emissions from this observable. | |
template<rpp::constraint::observer_strategy_base ObserverStrategy> requires (!constraint::observer<ObserverStrategy>) | |
auto | rpp::operators::subscribe (ObserverStrategy &&observer_strategy) |
Subscribes passed observer strategy to emissions from this observable via construction of observer. | |
template<rpp::constraint::decayed_type Type> | |
auto | rpp::operators::subscribe (rpp::composite_disposable_wrapper disposable, dynamic_observer< Type > observer) |
Subscribe passed observer to emissions from observable. | |
template<rpp::constraint::observer_strategy_base ObserverStrategy> requires (!constraint::observer<ObserverStrategy>) | |
auto | rpp::operators::subscribe (rpp::composite_disposable_wrapper disposable, ObserverStrategy &&observer_strategy) |
Subscribes passed observer strategy to emissions from this observable via construction of observer. | |
template<details::on_next_like OnNext = rpp::utils::empty_function_any_t, std::invocable< const std::exception_ptr & > OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>> | |
auto | rpp::operators::subscribe (OnNext &&on_next={}, OnError &&on_error={}, OnCompleted &&on_completed={}) |
Construct rpp::lambda_observer on the fly and subscribe it to emissions from observable. | |
template<details::on_next_like OnNext, std::invocable<> OnCompleted> | |
auto | rpp::operators::subscribe (OnNext &&on_next, OnCompleted &&on_completed) |
Construct rpp::lambda_observer on the fly and subscribe it to emissions from observable. | |
template<details::on_next_like OnNext = rpp::utils::empty_function_any_t, std::invocable< const std::exception_ptr & > OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>> | |
auto | rpp::operators::subscribe (rpp::composite_disposable_wrapper d, OnNext &&on_next={}, OnError &&on_error={}, OnCompleted &&on_completed={}) |
Construct rpp::lambda_observer on the fly and subscribe it to emissions from observable. | |
template<details::on_next_like OnNext, std::invocable<> OnCompleted> | |
auto | rpp::operators::subscribe (rpp::composite_disposable_wrapper d, OnNext &&on_next, OnCompleted &&on_completed) |
Construct rpp::lambda_observer on the fly and subscribe it to emissions from observable. | |
template<rpp::constraint::decayed_type Type, rpp::constraint::observer_strategy< Type > ObserverStrategy> | |
auto | rpp::operators::subscribe_with_disposable (observer< Type, ObserverStrategy > &&observer) |
Subscribes passed observer to emissions from this observable. | |
template<rpp::constraint::decayed_type Type> | |
auto | rpp::operators::subscribe_with_disposable (dynamic_observer< Type > observer) |
Subscribes passed observer to emissions from this observable. | |
template<details::on_next_like OnNext = rpp::utils::empty_function_any_t, std::invocable< const std::exception_ptr & > OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>> | |
auto | rpp::operators::subscribe_with_disposable (OnNext &&on_next={}, OnError &&on_error={}, OnCompleted &&on_completed={}) |
Construct rpp::lambda_observer on the fly and subscribe it to emissions from observable. | |
template<details::on_next_like OnNext, std::invocable<> OnCompleted> | |
auto | rpp::operators::subscribe_with_disposable (OnNext &&on_next, OnCompleted &&on_completed) |
Construct rpp::lambda_observer on the fly and subscribe it to emissions from observable. | |
template<rpp::schedulers::constraint::scheduler Scheduler> | |
auto | rpp::operators::subscribe_on (Scheduler &&scheduler) |
OnSubscribe function for this observable will be scheduled via provided scheduler. | |
template<std::invocable< const std::exception_ptr & > OnError = rpp::utils::empty_function_t<std::exception_ptr>> requires utils::is_not_template_callable<OnError> | |
auto | rpp::operators::tap (OnError &&on_error) |
Register callbacks to inspect observable emissions and perform side-effects. | |
template<std::invocable<> OnCompleted = rpp::utils::empty_function_t<>> | |
auto | rpp::operators::tap (OnCompleted &&on_completed) |
Register callbacks to inspect observable emissions and perform side-effects. | |
template<typename OnNext, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>> | |
auto | rpp::operators::tap (OnNext &&on_next, OnCompleted &&on_completed) |
Register callbacks to inspect observable emissions and perform side-effects. | |
template<typename OnNext = rpp::utils::empty_function_any_t, std::invocable< const std::exception_ptr & > OnError = rpp::utils::empty_function_t<std::exception_ptr>, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>> requires utils::is_not_template_callable<OnError> | |
auto | rpp::operators::tap (OnNext &&on_next, OnError &&on_error, OnCompleted &&on_completed) |
Register callbacks to inspect observable emissions and perform side-effects. | |
template<rpp::constraint::observable TFallbackObservable, rpp::schedulers::constraint::scheduler TScheduler> | |
auto | rpp::operators::timeout (rpp::schedulers::duration period, TFallbackObservable &&fallback_observable, const TScheduler &scheduler) |
Forwards emissions from original observable, but subscribes on fallback observable if no any events during specified period of time (since last emission) | |
template<rpp::schedulers::constraint::scheduler TScheduler> | |
auto | rpp::operators::timeout (rpp::schedulers::duration period, const TScheduler &scheduler) |
Forwards emissions from original observable, but emit error if no any events during specified period of time (since last emission) | |
Utility operators are operators that provide some extra functionality without changing of original values, but changing of behaviour.
|
inline |
Converts rpp::observable
to rpp::blocking_observable
rpp::blocking_observable
blocks subscribe
call till on_completed/on_error happens.
auto rpp::operators::debounce | ( | rpp::schedulers::duration | period, |
Scheduler && | scheduler ) |
Only emit emission if specified period of time has passed without any other emission. On each new emission timer reset.
Actually this operator resets time of last emission, schedules action to send this emission after specified period if no any new emissions till this moment.
period | is duration of time should be passed since emission from original observable without any new emissions to emit this emission. |
scheduler | is scheduler used to run timer for debounce |
#include <rpp/operators/debounce.hpp>
auto rpp::operators::delay | ( | rpp::schedulers::duration | delay_duration, |
Scheduler && | scheduler ) |
Shift the emissions from an Observable forward in time by a particular amount.
The delay operator modifies its source Observable by pausing for a particular increment of time (that you specify) before emitting each of the source Observable’s items. This has the effect of shifting the entire sequence of items emitted by the Observable forward in time by that specified increment.
Actually this operator just schedules emissions via provided scheduler with provided delay_duration.
on_error
immediately, use observe_on
instead.delay_duration | is the delay duration for emitting items. Delay duration should be able to cast to rpp::schedulers::duration. |
scheduler | provides the threading model for delay. e.g. With a new thread scheduler, the observer sees the values in a new thread after a delay duration to the subscription. |
#include <rpp/operators/delay.hpp>
auto rpp::operators::finally | ( | LastFn && | last_fn | ) |
Register callback to be called when execution is done and disposable bound to observer is disposed.
last_fn | action callback |
#include <rpp/operators/finally.hpp>
action callback needs to be noexcept as it is called on dispose, throwing during this time could potentially break internal disposable state.
auto rpp::operators::observe_on | ( | Scheduler && | scheduler, |
rpp::schedulers::duration | delay_duration ) |
Specify the Scheduler on which an observer will observe this Observable.
The observe_on operator modifies its source Observable by emitting all emissions via provided scheduler, so, all emissions/callbacks happens via scheduler.
Actually this operator is just delay
, but in case of obtaining on_error
this operator cancels all scheduled but not emited emissions and forward error immediately. In case of you need to delay also on_error
, use delay
instead.
scheduler | provides the threading model for delay. e.g. With a new thread scheduler, the observer sees the values in a new thread after a delay duration to the subscription. |
delay_duration | is the delay duration for emitting items. Delay duration should be able to cast to rpp::schedulers::duration. |
#include <rpp/operators/observe_on.hpp>
|
inline |
Repeats the Observabe's sequence of emissions infinite amount of times via re-subscribing on it during on_completed
.
Actually this operator is kind of concat(obs, obs...)
#include <rpp/operators/repeat.hpp>
|
inline |
Repeats the Observabe's sequence of emissions count
times via re-subscribing on it during on_completed
call while count
not reached.
Actually this operator is kind of concat(obs, obs...)
where obs repeated count
times
count | total amount of times subscription happens. For example:
|
#include <rpp/operators/repeat.hpp>
auto rpp::operators::subscribe | ( | observer< Type, ObserverStrategy > && | observer | ) |
Subscribes passed observer to emissions from this observable.
auto rpp::operators::subscribe | ( | rpp::composite_disposable_wrapper | d, |
OnNext && | on_next, | ||
OnCompleted && | on_completed ) |
Construct rpp::lambda_observer on the fly and subscribe it to emissions from observable.
This overloading attaches passed disposable to observer and return it to provide ability to dispose observer early if needed.
d | is disposable to be attached to observer. If disposable is nullptr or disposed -> no any subscription happens |
auto rpp::operators::subscribe | ( | rpp::composite_disposable_wrapper | d, |
OnNext && | on_next = {}, | ||
OnError && | on_error = {}, | ||
OnCompleted && | on_completed = {} ) |
Construct rpp::lambda_observer on the fly and subscribe it to emissions from observable.
This overloading attaches passed disposable to observer and return it to provide ability to dispose observer early if needed.
d | is disposable to be attached to observer. If disposable is nullptr or disposed -> no any subscription happens |
auto rpp::operators::subscribe | ( | rpp::composite_disposable_wrapper | disposable, |
dynamic_observer< Type > | observer ) |
Subscribe passed observer to emissions from observable.
This overloading attaches passed disposable to observer and return it to provide ability to dispose observer early if needed.
disposable | is disposable to be attached to observer. If disposable is nullptr or disposed -> no any subscription happens |
auto rpp::operators::subscribe | ( | rpp::composite_disposable_wrapper | disposable, |
observer< Type, ObserverStrategy > && | observer ) |
Subscribe passed observer to emissions from observable.
This overloading attaches passed disposable to observer and return it to provide ability to dispose observer early if needed.
disposable | is disposable to be attached to observer. If disposable is nullptr or disposed -> no any subscription happens |
auto rpp::operators::subscribe | ( | rpp::composite_disposable_wrapper | disposable, |
ObserverStrategy && | observer_strategy ) |
Subscribes passed observer strategy to emissions from this observable via construction of observer.
This overloading attaches passed disposable to observer and return it to provide ability to dispose observer early if needed.
disposable | is disposable to be attached to observer. If disposable is nullptr or disposed -> no any subscription happens |
auto rpp::operators::subscribe_on | ( | Scheduler && | scheduler | ) |
OnSubscribe function for this observable will be scheduled via provided scheduler.
Actually this operator just schedules subscription on original observable to provided scheduler
scheduler | is scheduler used for scheduling of OnSubscribe |
#include <rpp/operators/subscribe_on.hpp>
auto rpp::operators::subscribe_with_disposable | ( | dynamic_observer< Type > | observer | ) |
Subscribes passed observer to emissions from this observable.
This overloading attaches disposable to observer and return it to provide ability to dispose/disconnect observer early if needed.
auto rpp::operators::subscribe_with_disposable | ( | observer< Type, ObserverStrategy > && | observer | ) |
Subscribes passed observer to emissions from this observable.
This overloading attaches disposable to observer and return it to provide ability to dispose/disconnect observer early if needed.
auto rpp::operators::subscribe_with_disposable | ( | OnNext && | on_next, |
OnCompleted && | on_completed ) |
Construct rpp::lambda_observer on the fly and subscribe it to emissions from observable.
This overloading attaches disposable to observer and return it to provide ability to dispose/disconnect observer early if needed.
auto rpp::operators::subscribe_with_disposable | ( | OnNext && | on_next = {}, |
OnError && | on_error = {}, | ||
OnCompleted && | on_completed = {} ) |
Construct rpp::lambda_observer on the fly and subscribe it to emissions from observable.
This overloading attaches disposable to observer and return it to provide ability to dispose/disconnect observer early if needed.
auto rpp::operators::tap | ( | OnCompleted && | on_completed | ) |
Register callbacks to inspect observable emissions and perform side-effects.
on_completed | completion handler |
auto rpp::operators::tap | ( | OnError && | on_error | ) |
Register callbacks to inspect observable emissions and perform side-effects.
on_error | error handler |
auto rpp::operators::tap | ( | OnNext && | on_next, |
OnCompleted && | on_completed ) |
Register callbacks to inspect observable emissions and perform side-effects.
on_next | next handler |
on_completed | completion handler |
auto rpp::operators::tap | ( | OnNext && | on_next, |
OnError && | on_error, | ||
OnCompleted && | on_completed ) |
Register callbacks to inspect observable emissions and perform side-effects.
on_next | next handler |
on_error | error handler |
on_completed | completion handler |
auto rpp::operators::timeout | ( | rpp::schedulers::duration | period, |
const TScheduler & | scheduler ) |
Forwards emissions from original observable, but emit error if no any events during specified period of time (since last emission)
period | is maximum duration between emitted items before a timeout occurs |
scheduler | is scheduler used to run timer for timeout |
#include <rpp/operators/timeout.hpp>
auto rpp::operators::timeout | ( | rpp::schedulers::duration | period, |
TFallbackObservable && | fallback_observable, | ||
const TScheduler & | scheduler ) |
Forwards emissions from original observable, but subscribes on fallback observable if no any events during specified period of time (since last emission)
period | is maximum duration between emitted items before a timeout occurs |
fallback_observable | is observable to subscribe on when timeout reached |
scheduler | is scheduler used to run timer for timeout |
#include <rpp/operators/timeout.hpp>