ReactivePlusPlus
One more implementation of the ReactiveX approach in C++, written with performance and templates in mind
 
Loading...
Searching...
No Matches
Utility Operators

Utility operators are operators that provide some extra functionality without changing the original values, only the behaviour. More...

Functions

template<schedulers::constraint::scheduler TScheduler>
requires is_header_included<debounce_tag, TScheduler>
auto observable::debounce (schedulers::duration period, const TScheduler &scheduler=TScheduler{}) const &
 Emit an emission only if the specified period of time has passed without any other emission. The timer is reset on each new emission.
 
template<schedulers::constraint::scheduler TScheduler>
requires is_header_included<delay_tag, TScheduler>
auto observable::delay (auto &&delay_duration, TScheduler &&scheduler) const &
 Shift the emissions from an Observable forward in time by a particular amount.
 
template<constraint::observer_of_type< Type > TObs>
requires is_header_included <do_tag, TObs>
auto observable::tap (TObs &&observer) const &
 Register an observer to be called when observable provides any events (on_next/on_error/on_completed)
 
template<constraint::on_next_fn< Type > OnNextFn, constraint::on_error_fn OnErrorFn = utils::empty_function_t<std::exception_ptr>, constraint::on_completed_fn OnCompletedFn = utils::empty_function_t<>>
requires is_header_included <do_tag, OnNextFn, OnErrorFn, OnCompletedFn>
auto observable::tap (OnNextFn &&on_next, OnErrorFn &&on_error=OnErrorFn{}, OnCompletedFn &&on_completed=OnCompletedFn{}) const &
 Register a list of actions to be called when the observable provides any events (on_next/on_error/on_completed)
 
template<constraint::on_next_fn< Type > OnNextFn>
requires is_header_included <do_tag, OnNextFn>
auto observable::do_on_next (OnNextFn &&on_next) const &
 Register a callback to be called when the observable provides a new item (on_next)
 
template<constraint::on_error_fn OnErrorFn>
requires is_header_included <do_tag, OnErrorFn>
auto observable::do_on_error (OnErrorFn &&on_error) const &
 Register a callback to be called when the observable provides an error (on_error)
 
template<constraint::on_completed_fn OnCompletedFn>
requires is_header_included <do_tag, OnCompletedFn>
auto observable::do_on_completed (OnCompletedFn &&on_completed) const &
 Register a callback to be called when the observable completes (on_completed)
 
template<schedulers::constraint::scheduler TScheduler>
requires is_header_included<observe_on_tag, TScheduler>
auto observable::observe_on (TScheduler &&scheduler) const &
 Emit emissions of observable starting from this point via provided scheduler.
 
template<typename... Args>
requires is_header_included<repeat_tag, Args...>
auto observable::repeat (size_t count) const &
 Re-subscribes to the current observable the provided number of times, each time on_completed is received.
 
template<typename... Args>
requires is_header_included<repeat_tag, Args...>
auto observable::repeat () const &
 Re-subscribes to the current observable on each on_completed, infinitely.
 
template<schedulers::constraint::scheduler TScheduler>
requires is_header_included<subscribe_on_tag, TScheduler>
auto observable::subscribe_on (const TScheduler &scheduler) const &
 The OnSubscribe function for this observable will be scheduled via the provided scheduler.
 
template<constraint::observable_of_type< Type > FallbackObs, schedulers::constraint::scheduler TScheduler>
requires is_header_included<timeout_tag, FallbackObs, TScheduler>
auto observable::timeout (schedulers::duration period, FallbackObs &&fallback_obs, const TScheduler &scheduler=TScheduler{}) const &
 Forwards emissions from the original observable, but subscribes to the fallback observable if there are no events during the specified period of time (since the last emission)
 
template<schedulers::constraint::scheduler TScheduler>
requires is_header_included<timeout_tag, TScheduler>
auto observable::timeout (schedulers::duration period, const TScheduler &scheduler=TScheduler{}) const &
 Forwards emissions from the original observable, but emits an error if there are no events during the specified period of time (since the last emission)
 

Detailed Description

Utility operators are operators that provide some extra functionality without changing the original values, only the behaviour.

See also
https://reactivex.io/documentation/operators.html#utility

Function Documentation

◆ debounce()

template<constraint::decayed_type Type, typename SpecificObservable >
template<schedulers::constraint::scheduler TScheduler>
requires is_header_included<debounce_tag, TScheduler>
auto observable::debounce ( schedulers::duration  period,
const TScheduler &  scheduler = TScheduler{} 
) const &
inline

Emit an emission only if the specified period of time has passed without any other emission. The timer is reset on each new emission.

This operator resets the time of the last emission and schedules an action to send this emission after the specified period, provided no new emissions have arrived by that moment.

Parameters
period - the duration of time that must pass after an emission from the original observable, without any new emissions, before that emission is emitted.
scheduler - the scheduler used to run the timer for debounce
Returns
new specific_observable with the debounce operator as most recent operator.
Warning
#include <rpp/operators/debounce.hpp>
Example
auto start = rpp::schedulers::clock_type::now();
rpp::source::just(rpp::schedulers::current_thread{}, 1, 2, 5, 6, 9, 10)
.flat_map([](int v)
{
return rpp::source::just(v)
.delay(std::chrono::milliseconds(500) * v, rpp::schedulers::current_thread{});
})
.tap([&](int v)
{
std::cout << "Sent value " << v << " at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl;
})
.debounce(std::chrono::milliseconds{700}, rpp::schedulers::current_thread{})
.subscribe([](int v) { std::cout << "new value " << v << std::endl; },
[]() { std::cout << "completed" << std::endl; });
// Output:
// Sent value 1 at 504
// Sent value 2 at 1009
// new value 2
// Sent value 5 at 2505
// Sent value 6 at 3010
// new value 6
// Sent value 9 at 4507
// Sent value 10 at 5007
// new value 10
// completed
Schedules execution of schedulables via queueing tasks to the caller thread with priority to time_poi...
Definition: trampoline_scheduler.hpp:41
auto debounce(schedulers::duration period, const TScheduler &scheduler=TScheduler{}) const &
Only emit emission if specified period of time has passed without any other emission....
Definition: debounce.hpp:62
Implementation details:
  • On subscribe
    • Allocates one shared_ptr to store last emission and time.
    • Wraps subscriber with serialization logic to prevent race-conditions
  • OnNext
    • Saves time when emission happened
    • Saves emission
    • Schedules an action to send this emission, after checking that there have been no new emissions
  • OnError
    • Just forwards original on_error
  • OnCompleted
    • Just forwards original on_completed
    • Immediately send current active emission if any
See also
https://reactivex.io/documentation/operators/debounce.html

◆ delay()

template<constraint::decayed_type Type, typename SpecificObservable >
template<schedulers::constraint::scheduler TScheduler>
requires is_header_included<delay_tag, TScheduler>
auto observable::delay ( auto &&  delay_duration,
TScheduler &&  scheduler 
) const &
inline

Shift the emissions from an Observable forward in time by a particular amount.

The delay operator modifies its source Observable by pausing for a particular increment of time (that you specify) before emitting each of the source Observable’s items. This has the effect of shifting the entire sequence of items emitted by the Observable forward in time by that specified increment.

Actually this operator just schedules emissions via provided scheduler with provided delay_duration.

Parameters
delay_duration - the delay duration for emitting items. The delay duration must be convertible to rpp::schedulers::duration.
scheduler - provides the threading model for delay, e.g. with a new thread scheduler, the observer sees the values in a new thread after a delay duration to the subscription.
Returns
new specific_observable with the delay operator as most recent operator.
Warning
#include <rpp/operators/delay.hpp>
Examples
auto start = rpp::schedulers::clock_type::now();
rpp::source::just(1, 2, 3)
.do_on_next([&](auto&& v)
{
auto emitting_time = rpp::schedulers::clock_type::now();
std::cout << "emit " << v << " in thread{" << std::this_thread::get_id() << "} duration since start " << std::chrono::duration_cast<std::chrono::seconds>(emitting_time - start).count() << "s"<< std::endl;
})
.delay(std::chrono::seconds{3}, rpp::schedulers::new_thread{})
.as_blocking()
.subscribe([&](int v)
{
auto observing_time = rpp::schedulers::clock_type::now();
std::cout << "observe " << v << " in thread{" << std::this_thread::get_id() << "} duration since start " << std::chrono::duration_cast<std::chrono::seconds>(observing_time - start).count() <<"s" << std::endl;
});
// Template for output:
// emit 1 in thread{11772} duration since start 0s
// emit 2 in thread{11772} duration since start 0s
// emit 3 in thread{11772} duration since start 0s
// observe 1 in thread{15516} duration since start 3s
// observe 2 in thread{15516} duration since start 3s
// observe 3 in thread{15516} duration since start 3s
scheduler which schedules execution of schedulables via queueing tasks to another thread with priorit...
Definition: new_thread_scheduler.hpp:32
auto delay(auto &&delay_duration, TScheduler &&scheduler) const &
Shift the emissions from an Observable forward in time by a particular amount.
Definition: delay.hpp:60
Implementation details:
  • On subscribe
    • Allocates one shared_ptr to store internal state
  • OnNext
    • Move emission to queue and schedule action to drain queue (if not yet)
  • OnError
    • Just forwards original on_error via scheduling
  • OnCompleted
    • Just forwards original on_completed via scheduling
See also
https://reactivex.io/documentation/operators/delay.html

◆ do_on_completed()

template<constraint::decayed_type Type, typename SpecificObservable >
template<constraint::on_completed_fn OnCompletedFn>
requires is_header_included <do_tag, OnCompletedFn>
auto observable::do_on_completed ( OnCompletedFn &&  on_completed) const &
inline

Register a callback to be called when the observable completes (on_completed)

Note
on_completed from tap would be invoked BEFORE on_completed from subscriber
Parameters
on_completed- action in case of completion
Returns
new specific_observable with the do_on_completed operator as most recent operator.
Warning
#include <rpp/operators/do.hpp>
Example
rpp::source::empty<int>()
.do_on_completed([]() { std::cout << "(TAP) Completed" << std::endl; })
.subscribe([](int) {}, []() { std::cout << "Completed" << std::endl; });
// Output:
// (TAP) Completed
// Completed
See also
https://reactivex.io/documentation/operators/do.html

◆ do_on_error()

template<constraint::decayed_type Type, typename SpecificObservable >
template<constraint::on_error_fn OnErrorFn>
requires is_header_included <do_tag, OnErrorFn>
auto observable::do_on_error ( OnErrorFn &&  on_error) const &
inline

Register a callback to be called when the observable provides an error (on_error)

Note
on_error from tap would be invoked BEFORE on_error from subscriber
Parameters
on_error- action over std::exception_ptr in case of any error
Returns
new specific_observable with the do_on_error operator as most recent operator.
Warning
#include <rpp/operators/do.hpp>
Example
rpp::source::error<int>(std::make_exception_ptr(std::runtime_error{""}))
.do_on_error([](std::exception_ptr) { std::cout << "(TAP) NEW error" << std::endl; })
.subscribe([](int ) {}, [](std::exception_ptr) { std::cout << "NEW error" << std::endl; });
// Output:
// (TAP) NEW error
// NEW error
See also
https://reactivex.io/documentation/operators/do.html

◆ do_on_next()

template<constraint::decayed_type Type, typename SpecificObservable >
template<constraint::on_next_fn< Type > OnNextFn>
requires is_header_included <do_tag, OnNextFn>
auto observable::do_on_next ( OnNextFn &&  on_next) const &
inline

Register a callback to be called when the observable provides a new item (on_next)

Note
on_next from tap would be invoked BEFORE on_next from subscriber
Parameters
on_next- action over new emitted item
Returns
new specific_observable with the do_on_next operator as most recent operator.
Warning
#include <rpp/operators/do.hpp>
Example
rpp::source::just(1, 2)
.do_on_next([](int v) { std::cout << "(TAP) NEW item " << v << std::endl; })
.subscribe([](int v) { std::cout << "NEW item " << v << std::endl; },
[]() { std::cout << "Completed" << std::endl; });
// Output:
// (TAP) NEW item 1
// NEW item 1
// (TAP) NEW item 2
// NEW item 2
// Completed
See also
https://reactivex.io/documentation/operators/do.html

◆ observe_on()

template<constraint::decayed_type Type, typename SpecificObservable >
template<schedulers::constraint::scheduler TScheduler>
requires is_header_included<observe_on_tag, TScheduler>
auto observable::observe_on ( TScheduler &&  scheduler) const &
inline

Emit emissions of observable starting from this point via provided scheduler.

This operator just schedules emissions via the provided scheduler, so it is effectively a delay(0) operator.

Parameters
scheduler - the scheduler used for scheduling of OnNext
Returns
new specific_observable with the observe_on operator as most recent operator.
Warning
#include <rpp/operators/observe_on.hpp>
Example:
std::cout << std::this_thread::get_id() << std::endl;
rpp::source::just(10, 15, 20)
.as_blocking()
.subscribe([](int v) { std::cout << "[" << std::this_thread::get_id() << "] : " << v << "\n"; });
// Template for output:
// TH1
// [TH2]: 10
// [TH2]: 15
// [TH2]: 20
Implementation details:
  • On subscribe
    • Allocates one shared_ptr to store internal state
  • OnNext
    • Move emission to queue and schedule action to drain queue (if not yet)
  • OnError
    • Just forwards original on_error via scheduling
  • OnCompleted
    • Just forwards original on_completed via scheduling
See also
https://reactivex.io/documentation/operators/observeon.html

◆ repeat() [1/2]

template<constraint::decayed_type Type, typename SpecificObservable >
template<typename... Args>
requires is_header_included<repeat_tag, Args...>
auto observable::repeat ( ) const &
inline

Re-subscribes to the current observable on each on_completed, infinitely.

Returns
new specific_observable with the repeat operator as most recent operator.
Warning
#include <rpp/operators/repeat.hpp>
Examples:
rpp::source::just(1, 2, 3)
.repeat()
.take(10)
.subscribe([](int v) { std::cout << v << " "; },
[]()
{
std::cout << "completed" << std::endl;
});
// Output: 1 2 3 1 2 3 1 2 3 1 completed
See also
https://reactivex.io/documentation/operators/repeat.html

◆ repeat() [2/2]

template<constraint::decayed_type Type, typename SpecificObservable >
template<typename... Args>
requires is_header_included<repeat_tag, Args...>
auto observable::repeat ( size_t  count) const &
inline

Re-subscribes to the current observable the provided number of times, each time on_completed is received.

This operator re-subscribes to the same observable when on_completed is received, as long as the counter has not reached zero.

Parameters
count - the total number of times the subscription happens. For example:
  • count(0) - means no subscription at all
  • count(1) - behaves like an ordinary observable
  • count(10) - 1 normal subscription and 9 re-subscriptions during on_completed
Returns
new specific_observable with the repeat operator as most recent operator.
Warning
#include <rpp/operators/repeat.hpp>
Examples:
rpp::source::just(1, 2, 3)
.repeat(2)
.subscribe([](int v) { std::cout << v << " "; },
[]()
{
std::cout << "completed" << std::endl;
});
// Output: 1 2 3 1 2 3 completed
Implementation details:
  • On subscribe
    • Allocates one shared_ptr to store counter
  • OnNext
    • Just forwards original on_next
  • OnError
    • Just forwards original on_error
  • OnCompleted
    • Decrements counter
    • If counter not zero, then re-subscribes on the same observable
See also
https://reactivex.io/documentation/operators/repeat.html

◆ subscribe_on()

template<constraint::decayed_type Type, typename SpecificObservable >
template<schedulers::constraint::scheduler TScheduler>
requires is_header_included<subscribe_on_tag, TScheduler>
auto observable::subscribe_on ( const TScheduler &  scheduler) const &
inline

The OnSubscribe function for this observable will be scheduled via the provided scheduler.

This operator just schedules the subscription to the original observable on the provided scheduler.

Parameters
scheduler - the scheduler used for scheduling of OnSubscribe
Returns
new specific_observable with the subscribe_on operator as most recent operator.
Warning
#include <rpp/operators/subscribe_on.hpp>
Example:
std::cout << std::this_thread::get_id() << std::endl;
rpp::source::create<int>([](const auto& sub)
{
std::cout << "on_subscribe thread " << std::this_thread::get_id() << std::endl;
sub.on_next(1);
sub.on_completed();
})
.as_blocking()
.subscribe([](int v) { std::cout << "[" << std::this_thread::get_id() << "] : " << v << "\n"; });
// Template for output:
// TH1
// on_subscribe thread TH2
// [TH2]: 1
auto subscribe_on(const TScheduler &scheduler) const &
OnSubscribe function for this observable will be scheduled via provided scheduler.
Definition: subscribe_on.hpp:46
See also
https://reactivex.io/documentation/operators/subscribeon.html

◆ tap() [1/2]

template<constraint::decayed_type Type, typename SpecificObservable >
template<constraint::on_next_fn< Type > OnNextFn, constraint::on_error_fn OnErrorFn = utils::empty_function_t<std::exception_ptr>, constraint::on_completed_fn OnCompletedFn = utils::empty_function_t<>>
requires is_header_included <do_tag, OnNextFn, OnErrorFn, OnCompletedFn>
auto observable::tap ( OnNextFn &&  on_next,
OnErrorFn &&  on_error = OnErrorFn{},
OnCompletedFn &&  on_completed = OnCompletedFn{} 
) const &
inline

Register a list of actions to be called when the observable provides any events (on_next/on_error/on_completed)

Note
Callbacks from tap would be invoked BEFORE subscribed subscriber
Parameters
on_next- action over new emitted item
on_error- action over std::exception_ptr in case of any error
on_completed- action in case of completion
Returns
new specific_observable with the tap operator as most recent operator.
Warning
#include <rpp/operators/do.hpp>
Example
rpp::source::just(1, 2)
.tap([](int v) { std::cout << "(TAP) NEW item " << v << std::endl; },
[](std::exception_ptr) {},
[]() { std::cout << "(TAP) Completed" << std::endl; })
.subscribe([](int v) { std::cout << "NEW item " << v << std::endl; },
[]() { std::cout << "Completed" << std::endl; });
// Output:
// (TAP) NEW item 1
// NEW item 1
// (TAP) NEW item 2
// NEW item 2
// (TAP) Completed
// Completed
See also
https://reactivex.io/documentation/operators/do.html

◆ tap() [2/2]

template<constraint::decayed_type Type, typename SpecificObservable >
template<constraint::observer_of_type< Type > TObs>
requires is_header_included <do_tag, TObs>
auto observable::tap ( TObs &&  observer) const &
inline

Register an observer to be called when the observable provides any events (on_next/on_error/on_completed)

Note
Callbacks from tap would be invoked BEFORE subscribed subscriber
Parameters
observer- observer which would accept callbacks
Returns
new specific_observable with the tap operator as most recent operator.
Warning
#include <rpp/operators/do.hpp>
Example
rpp::source::just(1, 2)
.tap(rpp::make_specific_observer<int>([](int v) { std::cout << "(TAP) NEW item " << v << std::endl; },
[]() { std::cout << "(TAP) Completed" << std::endl; }))
.subscribe([](int v) { std::cout << "NEW item " << v << std::endl; },
[]() { std::cout << "Completed" << std::endl; });
// Output:
// (TAP) NEW item 1
// NEW item 1
// (TAP) NEW item 2
// NEW item 2
// (TAP) Completed
// Completed
See also
https://reactivex.io/documentation/operators/do.html

◆ timeout() [1/2]

template<constraint::decayed_type Type, typename SpecificObservable >
template<schedulers::constraint::scheduler TScheduler>
requires is_header_included<timeout_tag, TScheduler>
auto observable::timeout ( schedulers::duration  period,
const TScheduler &  scheduler = TScheduler{} 
) const &
inline

Forwards emissions from the original observable, but emits an error if there are no events during the specified period of time (since the last emission)

Parameters
period - the maximum duration between emitted items before a timeout occurs
scheduler - the scheduler used to run the timer for timeout
Returns
new specific_observable with the timeout operator as most recent operator.
Warning
#include <rpp/operators/timeout.hpp>
Example
subj.get_observable()
.timeout(std::chrono::milliseconds{450}, rpp::schedulers::new_thread{})
.subscribe([](int v) { std::cout << "new value " << v << std::endl; },
[](std::exception_ptr err)
{
try
{
std::rethrow_exception(err);
}
catch (const std::exception& exc)
{
std::cout << "ERR: " << exc.what() << std::endl;
}
},
[]() { std::cout << "completed" << std::endl; });
for (int i = 0; i < 10; ++i)
{
std::this_thread::sleep_for(std::chrono::milliseconds{i * 100});
subj.get_subscriber().on_next(i);
}
// Output:
// new value 0
// new value 1
// new value 2
// new value 3
// new value 4
// ERR : Timeout reached
Subject which just multicasts values to observers subscribed on it. It contains two parts: subscriber...
Definition: publish_subject.hpp:78
See also
https://reactivex.io/documentation/operators/timeout.html

◆ timeout() [2/2]

template<constraint::decayed_type Type, typename SpecificObservable >
template<constraint::observable_of_type< Type > FallbackObs, schedulers::constraint::scheduler TScheduler>
requires is_header_included<timeout_tag, FallbackObs, TScheduler>
auto observable::timeout ( schedulers::duration  period,
FallbackObs &&  fallback_obs,
const TScheduler &  scheduler = TScheduler{} 
) const &
inline

Forwards emissions from the original observable, but subscribes to the fallback observable if there are no events during the specified period of time (since the last emission)

Parameters
period - the maximum duration between emitted items before a timeout occurs
fallback_obs - the observable to subscribe to when the timeout is reached
scheduler - the scheduler used to run the timer for timeout
Returns
new specific_observable with the timeout operator as most recent operator.
Warning
#include <rpp/operators/timeout.hpp>
Example
subj.get_observable()
.timeout(std::chrono::milliseconds{450}, rpp::source::just(100), rpp::schedulers::new_thread{})
.subscribe([](int v) { std::cout << "new value " << v << std::endl; },
[]() { std::cout << "completed" << std::endl; });
for (int i = 0; i < 10; ++i)
{
std::this_thread::sleep_for(std::chrono::milliseconds{i * 100});
subj.get_subscriber().on_next(i);
}
// Output:
//new value 0
//new value 1
//new value 2
//new value 3
//new value 4
//new value 100
//completed
See also
https://reactivex.io/documentation/operators/timeout.html