ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Conditional Operators

Conditional operators emit items based on a condition, which may depend on items emitted by other observables.

Functions

template<constraint::observable TTriggerObservable>
requires is_header_included<take_until_tag, TTriggerObservable>
auto observable::take_until (TTriggerObservable &&until_observable) const &
 Discard any items emitted by an Observable after a second Observable emits an item or terminates.
 
template<std::predicate< const Type & > Predicate>
requires is_header_included<take_while_tag, Predicate>
auto observable::take_while (Predicate &&predicate) const &
 Emits items from the observable while they satisfy the predicate. When the predicate first returns false, sends on_completed.
 

Detailed Description

Conditional operators emit items based on a condition, which may depend on items emitted by other observables.

See also
https://reactivex.io/documentation/operators.html#conditional

Function Documentation

◆ take_until()

template<constraint::decayed_type Type, typename SpecificObservable >
template<constraint::observable TTriggerObservable>
requires is_header_included<take_until_tag, TTriggerObservable>
auto observable::take_until ( TTriggerObservable &&  until_observable) const &
inline

Discard any items emitted by an Observable after a second Observable emits an item or terminates.

Warning
take_until subscribes to the source Observable and begins mirroring it. It also monitors a second Observable that you provide. If this second Observable emits an item or sends an on_error/on_completed notification, the Observable returned by take_until stops mirroring the source Observable and terminates.

In practice, this operator subscribes to both observables and completes the resulting observable when until_observable emits any value.

Parameters
until_observable: the observable that stops the source observable from sending values when it emits a value or sends an on_error/on_completed event.
Returns
new specific_observable with the take_until operator as most recent operator.
Warning
#include <rpp/operators/take_until.hpp>
Examples
rpp::source::interval(std::chrono::seconds{1}, rpp::schedulers::trampoline{})
    .take_until(rpp::source::interval(std::chrono::seconds{5}, rpp::schedulers::trampoline{}))
    .subscribe([](int v) { std::cout << "-" << v; },
               [](const std::exception_ptr&) { std::cout << "-x" << std::endl; },
               []() { std::cout << "-|" << std::endl; });
// source 1: -0-1-2-3-4-5-6-7- --
// source 2: ---------0---------1- --
// Output : -0-1-2-3-|
Implementation details:
  • On subscribe
    • Allocates one shared_ptr to store internal state
  • OnNext for original observable
    • Just forwards original on_next
  • OnError for original observable
    • Just forwards original on_error
  • OnCompleted for original observable
    • Just forwards original on_completed
  • OnNext for until observable
    • Emits on_completed
  • OnError for until observable
    • Just forwards on_error
  • OnCompleted for until observable
    • Just forwards on_completed
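The event routing listed above can be modeled in plain C++ without rpp. This is a minimal sketch, not the library's implementation: Observer, TakeUntilState, TakeUntilObservers, make_take_until and demo_take_until are hypothetical names, errors are modeled as strings instead of std::exception_ptr, and the terminated flag is an assumption added so that nothing is delivered after termination.

```cpp
#include <functional>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for an observer; these are not the rpp types.
struct Observer {
    std::function<void(int)> on_next;
    std::function<void(const std::string&)> on_error; // error modeled as a string
    std::function<void()> on_completed;
};

// State shared by both subscriptions (the single shared_ptr allocation).
struct TakeUntilState {
    Observer downstream;
    bool terminated = false; // assumption: guards against events after termination
};

// Observers that the source and until_observable would be subscribed with.
struct TakeUntilObservers {
    Observer for_source;
    Observer for_until;
};

TakeUntilObservers make_take_until(Observer downstream) {
    auto state = std::make_shared<TakeUntilState>();
    state->downstream = std::move(downstream);

    auto complete = [state]() {
        if (!state->terminated) {
            state->terminated = true;
            state->downstream.on_completed();
        }
    };
    auto fail = [state](const std::string& error) {
        if (!state->terminated) {
            state->terminated = true;
            state->downstream.on_error(error);
        }
    };
    auto forward = [state](int value) {
        if (!state->terminated) state->downstream.on_next(value);
    };

    // Source: forwards on_next, on_error and on_completed.
    Observer for_source{forward, fail, complete};
    // Until observable: on_next becomes on_completed; the rest is forwarded.
    Observer for_until{[complete](int) { complete(); }, fail, complete};
    return TakeUntilObservers{for_source, for_until};
}

// Replays the timeline from the example: four source items, then the trigger.
std::string demo_take_until() {
    std::string out;
    Observer printer{
        [&out](int v) { out += "-" + std::to_string(v); },
        [&out](const std::string&) { out += "-x"; },
        [&out]() { out += "-|"; }};

    auto observers = make_take_until(printer);
    for (int v = 0; v < 4; ++v) observers.for_source.on_next(v);
    observers.for_until.on_next(0);  // trigger emits: downstream completes
    observers.for_source.on_next(4); // dropped: already terminated
    return out;
}
```

Calling demo_take_until() returns the string "-0-1-2-3-|", which matches the output line of the example above.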
See also
https://reactivex.io/documentation/operators/takeuntil.html

◆ take_while()

template<constraint::decayed_type Type, typename SpecificObservable >
template<std::predicate< const Type & > Predicate>
requires is_header_included<take_while_tag, Predicate>
auto observable::take_while ( Predicate &&  predicate) const &
inline

Emits items from the observable while they satisfy the predicate. When the predicate first returns false, sends on_completed.

In practice, this operator emits values for as long as the predicate returns true.

Parameters
predicate: the predicate used to check each item
Returns
new specific_observable with the take_while operator as most recent operator.
Warning
#include <rpp/operators/take_while.hpp>
Example:
rpp::source::just(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    .take_while([](int v) { return v != 5; })
    .subscribe([](int v) { std::cout << v << " "; });
// Output: 0 1 2 3 4
Implementation details:
  • On subscribe
    • None
  • OnNext
    • Just forwards emission if predicate returns true
    • Emits OnCompleted if predicate returns false
  • OnError
    • Just forwards original on_error
  • OnCompleted
    • Just forwards original on_completed
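The OnNext rule above can be illustrated on a finite sequence in plain C++, without rpp. This is only a sketch of the semantics: demo_take_while is a hypothetical helper, the source is modeled as a std::vector<int>, and completion is rendered as a trailing "|" so that it shows up in the result.

```cpp
#include <string>
#include <vector>

// Hypothetical helper (not part of rpp): walks a finite sequence with
// take_while semantics and records what a subscriber would observe.
template <typename Predicate>
std::string demo_take_while(const std::vector<int>& items, Predicate predicate) {
    std::string out;
    for (int v : items) {
        if (!predicate(v)) {
            out += "|"; // predicate returned false: emit on_completed and stop
            return out;
        }
        out += std::to_string(v) + " "; // predicate returned true: forward the item
    }
    out += "|"; // source ran out of items: forward its on_completed
    return out;
}
```

With the inputs from the example, demo_take_while({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, [](int v) { return v != 5; }) returns "0 1 2 3 4 |": the item 5 and everything after it are never forwarded.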
See also
https://reactivex.io/documentation/operators/takewhile.html