ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Loading...
Searching...
No Matches
Utility Operators

Utility operators provide extra functionality that changes the behaviour of an observable without changing the values it emits. More...

Functions

auto rpp::operators::as_blocking ()
 Converts rpp::observable to rpp::blocking_observable
 
template<rpp::schedulers::constraint::scheduler Scheduler>
auto rpp::operators::debounce (rpp::schedulers::duration period, Scheduler &&scheduler)
 Emits a value only if the specified period of time has passed without any other emission. The timer is reset on each new emission.
 
template<rpp::schedulers::constraint::scheduler Scheduler>
auto rpp::operators::delay (rpp::schedulers::duration delay_duration, Scheduler &&scheduler)
 Shift the emissions from an Observable forward in time by a particular amount.
 
template<std::invocable<> LastFn>
auto rpp::operators::finally (LastFn &&last_fn)
 Register callback to be called when execution is done and disposable bound to observer is disposed.
 
template<rpp::schedulers::constraint::scheduler Scheduler>
auto rpp::operators::observe_on (Scheduler &&scheduler, rpp::schedulers::duration delay_duration)
 Specify the Scheduler on which an observer will observe this Observable.
 
auto rpp::operators::repeat (size_t count)
 Repeats the Observable's sequence of emissions count times via re-subscribing to it during the on_completed call while count is not reached.
 
auto rpp::operators::repeat ()
 Repeats the Observable's sequence of emissions an infinite number of times via re-subscribing to it during on_completed.
 
template<rpp::constraint::decayed_type Type, rpp::constraint::observer_strategy< Type > ObserverStrategy>
auto rpp::operators::subscribe (observer< Type, ObserverStrategy > &&observer)
 Subscribes passed observer to emissions from this observable.
 
template<rpp::constraint::decayed_type Type, rpp::constraint::observer_strategy< Type > ObserverStrategy>
auto rpp::operators::subscribe (rpp::composite_disposable_wrapper disposable, observer< Type, ObserverStrategy > &&observer)
 Subscribe passed observer to emissions from observable.
 
template<rpp::constraint::decayed_type Type>
auto rpp::operators::subscribe (dynamic_observer< Type > observer)
 Subscribes passed observer to emissions from this observable.
 
template<rpp::constraint::observer_strategy_base ObserverStrategy>
requires (!constraint::observer<ObserverStrategy>)
auto rpp::operators::subscribe (ObserverStrategy &&observer_strategy)
 Subscribes passed observer strategy to emissions from this observable via construction of observer.
 
template<rpp::constraint::decayed_type Type>
auto rpp::operators::subscribe (rpp::composite_disposable_wrapper disposable, dynamic_observer< Type > observer)
 Subscribe passed observer to emissions from observable.
 
template<rpp::constraint::observer_strategy_base ObserverStrategy>
requires (!constraint::observer<ObserverStrategy>)
auto rpp::operators::subscribe (rpp::composite_disposable_wrapper disposable, ObserverStrategy &&observer_strategy)
 Subscribes passed observer strategy to emissions from this observable via construction of observer.
 
template<details::on_next_like OnNext = rpp::utils::empty_function_any_t, std::invocable< const std::exception_ptr & > OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
auto rpp::operators::subscribe (OnNext &&on_next={}, OnError &&on_error={}, OnCompleted &&on_completed={})
 Construct rpp::lambda_observer on the fly and subscribe it to emissions from observable.
 
template<details::on_next_like OnNext, std::invocable<> OnCompleted>
auto rpp::operators::subscribe (OnNext &&on_next, OnCompleted &&on_completed)
 Construct rpp::lambda_observer on the fly and subscribe it to emissions from observable.
 
template<details::on_next_like OnNext = rpp::utils::empty_function_any_t, std::invocable< const std::exception_ptr & > OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
auto rpp::operators::subscribe (rpp::composite_disposable_wrapper d, OnNext &&on_next={}, OnError &&on_error={}, OnCompleted &&on_completed={})
 Construct rpp::lambda_observer on the fly and subscribe it to emissions from observable.
 
template<details::on_next_like OnNext, std::invocable<> OnCompleted>
auto rpp::operators::subscribe (rpp::composite_disposable_wrapper d, OnNext &&on_next, OnCompleted &&on_completed)
 Construct rpp::lambda_observer on the fly and subscribe it to emissions from observable.
 
template<rpp::constraint::decayed_type Type, rpp::constraint::observer_strategy< Type > ObserverStrategy>
auto rpp::operators::subscribe_with_disposable (observer< Type, ObserverStrategy > &&observer)
 Subscribes passed observer to emissions from this observable.
 
template<rpp::constraint::decayed_type Type>
auto rpp::operators::subscribe_with_disposable (dynamic_observer< Type > observer)
 Subscribes passed observer to emissions from this observable.
 
template<details::on_next_like OnNext = rpp::utils::empty_function_any_t, std::invocable< const std::exception_ptr & > OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
auto rpp::operators::subscribe_with_disposable (OnNext &&on_next={}, OnError &&on_error={}, OnCompleted &&on_completed={})
 Construct rpp::lambda_observer on the fly and subscribe it to emissions from observable.
 
template<details::on_next_like OnNext, std::invocable<> OnCompleted>
auto rpp::operators::subscribe_with_disposable (OnNext &&on_next, OnCompleted &&on_completed)
 Construct rpp::lambda_observer on the fly and subscribe it to emissions from observable.
 
template<rpp::schedulers::constraint::scheduler Scheduler>
auto rpp::operators::subscribe_on (Scheduler &&scheduler)
 OnSubscribe function for this observable will be scheduled via provided scheduler.
 
template<std::invocable< const std::exception_ptr & > OnError = rpp::utils::empty_function_t<std::exception_ptr>>
auto rpp::operators::tap (OnError &&on_error)
 Register callbacks to inspect observable emissions and perform side-effects.
 
template<std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
auto rpp::operators::tap (OnCompleted &&on_completed)
 Register callbacks to inspect observable emissions and perform side-effects.
 
template<typename OnNext , std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
auto rpp::operators::tap (OnNext &&on_next, OnCompleted &&on_completed)
 Register callbacks to inspect observable emissions and perform side-effects.
 
template<typename OnNext = rpp::utils::empty_function_any_t, std::invocable< const std::exception_ptr & > OnError = rpp::utils::empty_function_t<std::exception_ptr>, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
auto rpp::operators::tap (OnNext &&on_next, OnError &&on_error, OnCompleted &&on_completed)
 Register callbacks to inspect observable emissions and perform side-effects.
 
template<rpp::constraint::observable TFallbackObservable, rpp::schedulers::constraint::scheduler TScheduler>
auto rpp::operators::timeout (rpp::schedulers::duration period, TFallbackObservable &&fallback_observable, const TScheduler &scheduler)
 Forwards emissions from the original observable, but subscribes to the fallback observable if there are no events during the specified period of time (since the last emission).
 
template<rpp::schedulers::constraint::scheduler TScheduler>
auto rpp::operators::timeout (rpp::schedulers::duration period, const TScheduler &scheduler)
 Forwards emissions from the original observable, but emits an error if there are no events during the specified period of time (since the last emission).
 

Detailed Description

Utility operators provide extra functionality that changes the behaviour of an observable without changing the values it emits.

See also
https://reactivex.io/documentation/operators.html#utility

Function Documentation

◆ as_blocking()

auto rpp::operators::as_blocking ( )
inline

Converts rpp::observable to rpp::blocking_observable

rpp::blocking_observable blocks subscribe call till on_completed/on_error happens.

Example:
| rpp::operators::delay(std::chrono::seconds{1}, rpp::schedulers::new_thread{}) // <-- emit from another thread with delay
| rpp::operators::subscribe([](int) {}, []() { std::cout << "COMPLETED" << std::endl; });
std::cout << "done" << std::endl;
// output: COMPLETED done
Examples
as_blocking.cpp, delay.cpp, observe_on.cpp, subscribe_on.cpp, thread_pool.cpp, and timeout.cpp.

◆ debounce()

auto rpp::operators::debounce ( rpp::schedulers::duration period,
Scheduler && scheduler )

Emits a value only if the specified period of time has passed without any other emission. The timer is reset on each new emission.

Actually this operator resets the time of the last emission and schedules an action to send this emission after the specified period if no new emissions arrive before that moment.

Parameters
period: the duration of time that must pass after an emission from the original observable, without any new emissions, before this emission is emitted.
scheduler: the scheduler used to run the timer for debounce.
Warning
#include <rpp/operators/debounce.hpp>
Example
auto start = rpp::schedulers::clock_type::now();
return rpp::source::just(v) | rpp::operators::delay(std::chrono::milliseconds(500) * v, rpp::schedulers::current_thread{});
})
| rpp::operators::filter([&](int v) {
std::cout << "> Sent value " << v << " at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl;
return true;
})
| rpp::operators::debounce(std::chrono::milliseconds{700}, rpp::schedulers::current_thread{})
| rpp::operators::subscribe([&](int v) { std::cout << ">>> new value " << v << " at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl; },
[](const std::exception_ptr&) {},
[&]() { std::cout << ">>> completed at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl; });
// Output:
// > Sent value 1 at 500
// > Sent value 2 at 1000
// >>> new value 2 at 1700
// > Sent value 5 at 2500
// > Sent value 6 at 3000
// >>> new value 6 at 3700
// > Sent value 9 at 4500
// > Sent value 10 at 5000
// >>> new value 10 at 5000
// >>> completed at 5000
See also
https://reactivex.io/documentation/operators/debounce.html
Examples
debounce.cpp.

◆ delay()

auto rpp::operators::delay ( rpp::schedulers::duration delay_duration,
Scheduler && scheduler )

Shift the emissions from an Observable forward in time by a particular amount.

The delay operator modifies its source Observable by pausing for a particular increment of time (that you specify) before emitting each of the source Observable’s items. This has the effect of shifting the entire sequence of items emitted by the Observable forward in time by that specified increment.

Actually this operator just schedules emissions via provided scheduler with provided delay_duration.

Warning
on_error/on_completed invocations are delayed just like any other emission, so the WHOLE observable is shifted. If you want to obtain on_error immediately, use observe_on instead.
Parameters
delay_duration: the delay duration for emitting items. The delay duration should be castable to rpp::schedulers::duration.
scheduler: provides the threading model for the delay, e.g. with a new thread scheduler, the observer sees the values in a new thread after a delay duration to the subscription.
Warning
#include <rpp/operators/delay.hpp>
Examples
auto start = rpp::schedulers::clock_type::now();
rpp::source::create<int>([&start](const auto& obs) {
for (int i = 0; i < 3; ++i)
{
auto emitting_time = rpp::schedulers::clock_type::now();
std::cout << "emit " << i << " in thread{" << std::this_thread::get_id() << "} duration since start " << std::chrono::duration_cast<std::chrono::seconds>(emitting_time - start).count() << "s" << std::endl;
obs.on_next(i);
std::this_thread::sleep_for(std::chrono::seconds{1});
}
auto emitting_time = rpp::schedulers::clock_type::now();
std::cout << "emit error in thread{" << std::this_thread::get_id() << "} duration since start " << std::chrono::duration_cast<std::chrono::seconds>(emitting_time - start).count() << "s" << std::endl;
obs.on_error({});
})
| rpp::operators::delay(std::chrono::seconds{3}, rpp::schedulers::new_thread{})
auto observing_time = rpp::schedulers::clock_type::now();
std::cout << "observe " << v << " in thread{" << std::this_thread::get_id() << "} duration since start " << std::chrono::duration_cast<std::chrono::seconds>(observing_time - start).count() <<"s" << std::endl; },
[&](const std::exception_ptr&) {
auto observing_time = rpp::schedulers::clock_type::now();
std::cout << "observe error in thread{" << std::this_thread::get_id() << "} duration since start " << std::chrono::duration_cast<std::chrono::seconds>(observing_time - start).count() << "s" << std::endl;
});
// Template for output:
// emit 0 in thread{139855196489600} duration since start 0s
// emit 1 in thread{139855196489600} duration since start 1s
// emit 2 in thread{139855196489600} duration since start 2s
// observe 0 in thread{139855196485184} duration since start 3s
// emit error in thread{139855196489600} duration since start 3s
// observe 1 in thread{139855196485184} duration since start 4s
// observe 2 in thread{139855196485184} duration since start 5s
// observe error in thread{139855196485184} duration since start 6s
See also
https://reactivex.io/documentation/operators/delay.html
Examples
as_blocking.cpp, debounce.cpp, delay.cpp, thread_pool.cpp, throttle.cpp, and timeout.cpp.

◆ finally()

template<std::invocable<> LastFn>
auto rpp::operators::finally ( LastFn && last_fn)

Register callback to be called when execution is done and disposable bound to observer is disposed.

Parameters
last_fnaction callback
Warning
#include <rpp/operators/finally.hpp>

action callback needs to be noexcept as it is called on dispose, throwing during this time could potentially break internal disposable state.

See also
https://reactivex.io/documentation/operators/do.html

◆ observe_on()

auto rpp::operators::observe_on ( Scheduler && scheduler,
rpp::schedulers::duration delay_duration )

Specify the Scheduler on which an observer will observe this Observable.

The observe_on operator modifies its source Observable by emitting all emissions via the provided scheduler, so all emissions/callbacks happen via the scheduler.

Actually this operator is just delay, but on receiving on_error it cancels all scheduled but not yet emitted emissions and forwards the error immediately. If you also need to delay on_error, use delay instead.

Parameters
scheduler: provides the threading model for the delay, e.g. with a new thread scheduler, the observer sees the values in a new thread after a delay duration to the subscription.
delay_duration: the delay duration for emitting items. The delay duration should be castable to rpp::schedulers::duration.
Warning
#include <rpp/operators/observe_on.hpp>
Examples
auto start = rpp::schedulers::clock_type::now();
rpp::source::create<int>([&start](const auto& obs) {
for (int i = 0; i < 3; ++i)
{
auto emitting_time = rpp::schedulers::clock_type::now();
std::cout << "emit " << i << " in thread{" << std::this_thread::get_id() << "} duration since start " << std::chrono::duration_cast<std::chrono::seconds>(emitting_time - start).count() << "s" << std::endl;
obs.on_next(i);
std::this_thread::sleep_for(std::chrono::seconds{1});
}
auto emitting_time = rpp::schedulers::clock_type::now();
std::cout << "emit error in thread{" << std::this_thread::get_id() << "} duration since start " << std::chrono::duration_cast<std::chrono::seconds>(emitting_time - start).count() << "s" << std::endl;
obs.on_error({});
})
auto observing_time = rpp::schedulers::clock_type::now();
std::cout << "observe " << v << " in thread{" << std::this_thread::get_id() << "} duration since start " << std::chrono::duration_cast<std::chrono::seconds>(observing_time - start).count() <<"s" << std::endl; },
[&](const std::exception_ptr&) {
auto observing_time = rpp::schedulers::clock_type::now();
std::cout << "observe error in thread{" << std::this_thread::get_id() << "} duration since start " << std::chrono::duration_cast<std::chrono::seconds>(observing_time - start).count() << "s" << std::endl;
});
// Template for output:
// emit 0 in thread{139800298538880} duration since start 0s
// emit 1 in thread{139800298538880} duration since start 1s
// emit 2 in thread{139800298538880} duration since start 2s
// observe 0 in thread{139800298534464} duration since start 3s
// emit error in thread{139800298538880} duration since start 3s
// observe error in thread{139800298538880} duration since start 3s
See also
https://reactivex.io/documentation/operators/observeon.html
Examples
observe_on.cpp.

◆ repeat() [1/2]

auto rpp::operators::repeat ( )
inline

Repeats the Observable's sequence of emissions an infinite number of times via re-subscribing to it during on_completed.

Actually this operator is kind of concat(obs, obs...)

Warning
#include <rpp/operators/repeat.hpp>
Examples:
| rpp::operators::subscribe([](int v) { std::cout << v << " "; },
[](const std::exception_ptr&) {},
[]() {
std::cout << "completed" << std::endl;
});
// Output: 1 2 3 1 2 3 1 2 3 1 completed
See also
https://reactivex.io/documentation/operators/repeat.html
Examples
repeat.cpp.

◆ repeat() [2/2]

auto rpp::operators::repeat ( size_t count)
inline

Repeats the Observable's sequence of emissions count times via re-subscribing to it during the on_completed call while count is not reached.

Actually this operator is kind of concat(obs, obs...) where obs repeated count times

Parameters
count: the total number of times subscription happens. For example:
  • repeat(0) - means no subscription at all
  • repeat(1) - behaves like an ordinary observable
  • repeat(10) - 1 normal subscription and 9 re-subscriptions during on_completed
Warning
#include <rpp/operators/repeat.hpp>
Examples:
| rpp::operators::subscribe([](int v) { std::cout << v << " "; },
[](const std::exception_ptr&) {},
[]() { std::cout << "completed" << std::endl; });
// Output: 1 2 3 1 2 3 completed
See also
https://reactivex.io/documentation/operators/repeat.html

◆ subscribe() [1/6]

template<rpp::constraint::decayed_type Type, rpp::constraint::observer_strategy< Type > ObserverStrategy>
auto rpp::operators::subscribe ( observer< Type, ObserverStrategy > && observer)

Subscribes passed observer to emissions from this observable.

Warning
The observer must be moved into the subscribe method. If you need to copy the observer (not recommended), convert it to dynamic_observer.
Examples
as_blocking.cpp, concat.cpp, debounce.cpp, delay.cpp, distinct_until_changed.cpp, filter.cpp, first.cpp, group_by.cpp, interval.cpp, last.cpp, map.cpp, merge.cpp, observe_on.cpp, reduce.cpp, repeat.cpp, scan.cpp, skip.cpp, subscribe_on.cpp, take.cpp, take_while.cpp, thread_pool.cpp, throttle.cpp, timeout.cpp, window.cpp, window_toggle.cpp, and with_latest_from.cpp.

◆ subscribe() [2/6]

template<details::on_next_like OnNext, std::invocable<> OnCompleted>
auto rpp::operators::subscribe ( rpp::composite_disposable_wrapper d,
OnNext && on_next,
OnCompleted && on_completed )

Construct rpp::lambda_observer on the fly and subscribe it to emissions from observable.

This overload attaches the passed disposable to the observer and returns it, making it possible to dispose the observer early if needed.

Warning
This overload has some performance penalties; use it only when you really need a disposable.
Parameters
d: the disposable to be attached to the observer. If the disposable is nullptr or already disposed, no subscription happens.

◆ subscribe() [3/6]

template<details::on_next_like OnNext = rpp::utils::empty_function_any_t, std::invocable< const std::exception_ptr & > OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
auto rpp::operators::subscribe ( rpp::composite_disposable_wrapper d,
OnNext && on_next = {},
OnError && on_error = {},
OnCompleted && on_completed = {} )

Construct rpp::lambda_observer on the fly and subscribe it to emissions from observable.

This overload attaches the passed disposable to the observer and returns it, making it possible to dispose the observer early if needed.

Warning
This overload has some performance penalties; use it only when you really need a disposable.
Parameters
d: the disposable to be attached to the observer. If the disposable is nullptr or already disposed, no subscription happens.

◆ subscribe() [4/6]

template<rpp::constraint::decayed_type Type>
auto rpp::operators::subscribe ( rpp::composite_disposable_wrapper disposable,
dynamic_observer< Type > observer )

Subscribe passed observer to emissions from observable.

This overload attaches the passed disposable to the observer and returns it, making it possible to dispose the observer early if needed.

Warning
This overload has some performance penalties; use it only when you really need a disposable.
Parameters
disposable: the disposable to be attached to the observer. If the disposable is nullptr or already disposed, no subscription happens.

◆ subscribe() [5/6]

template<rpp::constraint::decayed_type Type, rpp::constraint::observer_strategy< Type > ObserverStrategy>
auto rpp::operators::subscribe ( rpp::composite_disposable_wrapper disposable,
observer< Type, ObserverStrategy > && observer )

Subscribe passed observer to emissions from observable.

This overload attaches the passed disposable to the observer and returns it, making it possible to dispose the observer early if needed.

Warning
This overload has some performance penalties; use it only when you really need a disposable.
Parameters
disposable: the disposable to be attached to the observer. If the disposable is nullptr or already disposed, no subscription happens.

◆ subscribe() [6/6]

template<rpp::constraint::observer_strategy_base ObserverStrategy>
requires (!constraint::observer<ObserverStrategy>)
auto rpp::operators::subscribe ( rpp::composite_disposable_wrapper disposable,
ObserverStrategy && observer_strategy )

Subscribes passed observer strategy to emissions from this observable via construction of observer.

This overload attaches the passed disposable to the observer and returns it, making it possible to dispose the observer early if needed.

Warning
This overload has some performance penalties; use it only when you really need a disposable.
Parameters
disposable: the disposable to be attached to the observer. If the disposable is nullptr or already disposed, no subscription happens.

◆ subscribe_on()

auto rpp::operators::subscribe_on ( Scheduler && scheduler)

OnSubscribe function for this observable will be scheduled via provided scheduler.

Actually this operator just schedules subscription on original observable to provided scheduler

Parameters
scheduler: the scheduler used for scheduling of OnSubscribe.
Warning
#include <rpp/operators/subscribe_on.hpp>
Example:
std::cout << std::this_thread::get_id() << std::endl;
rpp::source::create<int>([](const auto& sub) {
std::cout << "on_subscribe thread " << std::this_thread::get_id() << std::endl;
sub.on_next(1);
sub.on_completed();
})
| rpp::operators::subscribe([](int v) { std::cout << "[" << std::this_thread::get_id() << "] : " << v << "\n"; });
std::cout << std::this_thread::get_id() << std::endl;
// Template for output:
// TH1
// on_subscribe thread TH2
// [TH2]: 1
// TH1
See also
https://reactivex.io/documentation/operators/subscribeon.html
Examples
subscribe_on.cpp.

◆ subscribe_with_disposable() [1/4]

template<rpp::constraint::decayed_type Type>
auto rpp::operators::subscribe_with_disposable ( dynamic_observer< Type > observer)

Subscribes passed observer to emissions from this observable.

This overload attaches a disposable to the observer and returns it, making it possible to dispose/disconnect the observer early if needed.

Warning
This overload has some performance penalties; use it only when you really need a disposable.

◆ subscribe_with_disposable() [2/4]

template<rpp::constraint::decayed_type Type, rpp::constraint::observer_strategy< Type > ObserverStrategy>
auto rpp::operators::subscribe_with_disposable ( observer< Type, ObserverStrategy > && observer)

Subscribes passed observer to emissions from this observable.

This overload attaches a disposable to the observer and returns it, making it possible to dispose/disconnect the observer early if needed.

Warning
This overload has some performance penalties; use it only when you really need a disposable.
The observer must be moved into the subscribe method. If you need to copy the observer (not recommended), convert it to dynamic_observer.

◆ subscribe_with_disposable() [3/4]

template<details::on_next_like OnNext, std::invocable<> OnCompleted>
auto rpp::operators::subscribe_with_disposable ( OnNext && on_next,
OnCompleted && on_completed )

Construct rpp::lambda_observer on the fly and subscribe it to emissions from observable.

This overload attaches a disposable to the observer and returns it, making it possible to dispose/disconnect the observer early if needed.

Warning
This overload has some performance penalties; use it only when you really need a disposable.

◆ subscribe_with_disposable() [4/4]

template<details::on_next_like OnNext = rpp::utils::empty_function_any_t, std::invocable< const std::exception_ptr & > OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
auto rpp::operators::subscribe_with_disposable ( OnNext && on_next = {},
OnError && on_error = {},
OnCompleted && on_completed = {} )

Construct rpp::lambda_observer on the fly and subscribe it to emissions from observable.

This overload attaches a disposable to the observer and returns it, making it possible to dispose/disconnect the observer early if needed.

Warning
This overload has some performance penalties; use it only when you really need a disposable.

◆ tap() [1/4]

template<std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
auto rpp::operators::tap ( OnCompleted && on_completed)

Register callbacks to inspect observable emissions and perform side-effects.

Parameters
on_completed: completion handler
See also
https://reactivex.io/documentation/operators/do.html

◆ tap() [2/4]

template<std::invocable< const std::exception_ptr & > OnError = rpp::utils::empty_function_t<std::exception_ptr>>
auto rpp::operators::tap ( OnError && on_error)

Register callbacks to inspect observable emissions and perform side-effects.

Parameters
on_error: error handler
See also
https://reactivex.io/documentation/operators/do.html

◆ tap() [3/4]

template<typename OnNext , std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
auto rpp::operators::tap ( OnNext && on_next,
OnCompleted && on_completed )

Register callbacks to inspect observable emissions and perform side-effects.

Parameters
on_next: next handler
on_completed: completion handler
See also
https://reactivex.io/documentation/operators/do.html

◆ tap() [4/4]

template<typename OnNext = rpp::utils::empty_function_any_t, std::invocable< const std::exception_ptr & > OnError = rpp::utils::empty_function_t<std::exception_ptr>, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
auto rpp::operators::tap ( OnNext && on_next,
OnError && on_error,
OnCompleted && on_completed )

Register callbacks to inspect observable emissions and perform side-effects.

Parameters
on_next: next handler
on_error: error handler
on_completed: completion handler
See also
https://reactivex.io/documentation/operators/do.html

◆ timeout() [1/2]

auto rpp::operators::timeout ( rpp::schedulers::duration period,
const TScheduler & scheduler )

Forwards emissions from the original observable, but emits an error if there are no events during the specified period of time (since the last emission).

Parameters
period: the maximum duration between emitted items before a timeout occurs.
scheduler: the scheduler used to run the timer for timeout.
Warning
#include <rpp/operators/timeout.hpp>
Example
auto start = rpp::schedulers::clock_type::now();
rpp::source::just(10, 30, 90, 110)
return rpp::source::just(v) | rpp::operators::delay(std::chrono::milliseconds{v}, rpp::schedulers::current_thread{});
})
| rpp::operators::timeout(std::chrono::milliseconds{35}, rpp::schedulers::new_thread{})
| rpp::operators::subscribe([start](int v) { std::cout << "received " << v << " at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl; },
[start](const std::exception_ptr&) {
std::cout << "received error at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl;
});
See also
https://reactivex.io/documentation/operators/timeout.html

◆ timeout() [2/2]

template<rpp::constraint::observable TFallbackObservable, rpp::schedulers::constraint::scheduler TScheduler>
auto rpp::operators::timeout ( rpp::schedulers::duration period,
TFallbackObservable && fallback_observable,
const TScheduler & scheduler )

Forwards emissions from the original observable, but subscribes to the fallback observable if there are no events during the specified period of time (since the last emission).

Parameters
period: the maximum duration between emitted items before a timeout occurs.
fallback_observable: the observable to subscribe to when the timeout is reached.
scheduler: the scheduler used to run the timer for timeout.
Warning
#include <rpp/operators/timeout.hpp>
Example
auto start = rpp::schedulers::clock_type::now();
rpp::source::just(10, 30, 90, 110)
return rpp::source::just(v) | rpp::operators::delay(std::chrono::milliseconds{v}, rpp::schedulers::current_thread{});
})
| rpp::operators::subscribe([start](int v) { std::cout << "received " << v << " at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl; },
[start](const std::exception_ptr&) {
std::cout << "received error at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl;
});
See also
https://reactivex.io/documentation/operators/timeout.html
Examples
timeout.cpp.