Conditional operators emit items depending on a condition, which may involve items emitted by other observables.
template<constraint::observable TTriggerObservable>
requires is_header_included<take_until_tag, TTriggerObservable>
auto observable::take_until (TTriggerObservable &&until_observable) const &
Discard any items emitted by an Observable after a second Observable emits an item or terminates.
template<std::predicate< const Type & > Predicate>
requires is_header_included<take_while_tag, Predicate>
auto observable::take_while (Predicate &&predicate) const &
Emits items from the observable while they satisfy the predicate. When the predicate returns false, sends on_completed.
See also https://reactivex.io/documentation/operators.html#conditional
◆ take_until()
template<constraint::decayed_type Type, typename SpecificObservable>
template<constraint::observable TTriggerObservable>
requires is_header_included<take_until_tag, TTriggerObservable>
auto observable::take_until (TTriggerObservable &&until_observable) const &
inline
Discard any items emitted by an Observable after a second Observable emits an item or terminates.
Warning: take_until subscribes to the source Observable and begins mirroring it. It also monitors a second Observable that you provide. If this second Observable emits an item or sends an on_error/on_completed notification, the Observable returned by take_until stops mirroring the source Observable and terminates.
In practice, this operator subscribes to both observables and completes the resulting observable as soon as until_observable emits any value.
Parameters
until_observable is the observable that stops the source observable from sending values once it emits a value or sends an on_error/on_completed event.
Returns new specific_observable with the take_until operator as most recent operator.
Warning #include <rpp/operators/take_until.hpp>
Examples
.subscribe([](int v) { std::cout << "-" << v; },
[](const std::exception_ptr&) { std::cout << "-x" << std::endl; },
[]() { std::cout << "-|" << std::endl; });
Implementation details:
On subscribe
Allocates one shared_ptr to store internal state
OnNext for original observable
Just forwards original on_next
OnError for original observable
Just forwards original on_error
OnCompleted for original observable
Just forwards original on_completed
OnNext for until observable
OnError for until observable
OnCompleted for until observable
Just forwards on_completed
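The forwarding rules listed above can be modelled without the library. The following is a minimal sketch, not the actual rpp implementation: take_until_state, source_observer, until_observer and demo_take_until are hypothetical names, events are recorded as strings instead of being passed to a real subscriber, and the behaviour of the until observable on on_next/on_error is assumed from the operator description (any item or notification from it terminates the stream). The sketch is single-threaded; a real implementation would presumably need synchronisation, since the two observables may emit from different threads.

```cpp
#include <memory>
#include <string>
#include <vector>

// Hypothetical model of the state shared by both inner observers (not an rpp type).
struct take_until_state
{
    std::vector<std::string> received;   // events delivered downstream
    bool                     terminated = false;

    void deliver(const std::string& event, bool terminal)
    {
        if (terminated)
            return;                      // nothing may follow a terminal event
        received.push_back(event);
        terminated = terminal;
    }
};

// Observer attached to the original observable: forwards everything.
struct source_observer
{
    std::shared_ptr<take_until_state> state;

    void on_next(int v) const { state->deliver(std::to_string(v), false); }
    void on_error() const     { state->deliver("error", true); }
    void on_completed() const { state->deliver("completed", true); }
};

// Observer attached to the until observable: any signal ends the stream.
// Assumption: an item maps to on_completed and an error maps to on_error.
struct until_observer
{
    std::shared_ptr<take_until_state> state;

    void on_next() const      { state->deliver("completed", true); }
    void on_error() const     { state->deliver("error", true); }
    void on_completed() const { state->deliver("completed", true); }
};

// Drives both observers by hand and returns what downstream would have seen.
std::vector<std::string> demo_take_until()
{
    auto state = std::make_shared<take_until_state>();   // the single shared allocation
    source_observer source{state};
    until_observer  until{state};

    source.on_next(0);
    source.on_next(1);
    until.on_next();       // trigger fires: downstream gets "completed"
    source.on_next(2);     // dropped, stream already terminated
    source.on_completed(); // dropped as well
    return state->received;
}
```

In this model demo_take_until() returns the sequence 0, 1, completed: items emitted by the source after the trigger fires never reach the downstream observer.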
See also https://reactivex.io/documentation/operators/takeuntil.html
◆ take_while()
template<constraint::decayed_type Type, typename SpecificObservable>
template<std::predicate<const Type &> Predicate>
requires is_header_included<take_while_tag, Predicate>
auto observable::take_while (Predicate &&predicate) const &
inline
Emits items from the observable while they satisfy the predicate. When the predicate returns false, sends on_completed.
In practice, this operator emits values only while the predicate returns true.
Parameters
predicate is the predicate applied to each item; items are forwarded while it returns true.
Returns new specific_observable with the take_while operator as most recent operator.
Warning #include <rpp/operators/take_while.hpp>
Example:
rpp::source::just(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
.take_while([](int v) { return v != 5; })
.subscribe([](int v) { std::cout << v << " " ; });
Implementation details:
On subscribe
OnNext
Just forwards emission if predicate returns true
Emits OnCompleted if predicate returns false
OnError
Just forwards original on_error
OnCompleted
Just forwards original on_completed
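The per-event behaviour listed above can be illustrated with a small library-free model. This is a sketch under stated assumptions rather than the rpp implementation: take_while_observer and demo_take_while are hypothetical names, items are fixed to int, and delivered events are recorded as strings instead of being forwarded to a real subscriber.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model of the observer that take_while places in front of the
// downstream subscriber (not an rpp type).
class take_while_observer
{
public:
    explicit take_while_observer(std::function<bool(int)> predicate)
        : m_predicate(std::move(predicate))
    {
    }

    void on_next(int v)
    {
        if (m_done)
            return;                              // ignore items after termination
        if (m_predicate(v))
            m_received.push_back(std::to_string(v)); // forward the emission
        else
            finish("completed");                 // predicate failed: complete early
    }

    void on_error() { finish("error"); }         // forward original on_error
    void on_completed() { finish("completed"); } // forward original on_completed

    const std::vector<std::string>& received() const { return m_received; }

private:
    void finish(const char* event)
    {
        if (m_done)
            return;                              // only one terminal event is delivered
        m_done = true;
        m_received.emplace_back(event);
    }

    std::function<bool(int)> m_predicate;
    std::vector<std::string> m_received;         // events delivered downstream
    bool                     m_done = false;
};

// Feeds 0..9 through the model with the same predicate as the example above.
std::vector<std::string> demo_take_while()
{
    take_while_observer observer{[](int v) { return v != 5; }};
    for (int v = 0; v < 10; ++v)
        observer.on_next(v);
    observer.on_completed();                     // dropped, already completed at 5
    return observer.received();
}
```

In this model demo_take_while() returns 0, 1, 2, 3, 4, completed: the item 5 fails the predicate, so it is not forwarded and the stream completes at that point.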
See also https://reactivex.io/documentation/operators/takewhile.html