ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Conditional Operators

Conditional operators emit items based on some condition, including conditions on items from other observables.

Functions

template<rpp::constraint::observable TObservable>
auto rpp::operators::take_until (TObservable &&until_observable)
 Discard any items emitted by an Observable after a second Observable emits an item or terminates.
 
template<typename Fn >
requires (!utils::is_not_template_callable<Fn> || std::same_as<bool, std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
auto rpp::operators::take_while (Fn &&predicate)
 Sends items from the observable while they satisfy the predicate. When the predicate returns false, sends on_completed.
 

Detailed Description

Conditional operators emit items based on some condition, including conditions on items from other observables.

See also
https://reactivex.io/documentation/operators.html#conditional

Function Documentation

◆ take_until()

template<rpp::constraint::observable TObservable>
auto rpp::operators::take_until ( TObservable && until_observable)

Discard any items emitted by an Observable after a second Observable emits an item or terminates.

Warning
take_until subscribes to the source Observable and begins mirroring it. It also monitors a second Observable that you provide. If this second Observable emits an item or sends an on_error/on_completed notification, the Observable returned by take_until stops mirroring the source Observable and terminates.

In practice, this operator subscribes to both observables and completes the original one when until_observable emits any value.

Parameters
until_observable is the observable that stops the source observable from sending values when it emits one value or sends an on_error/on_completed event.
Warning
#include <rpp/operators/take_until.hpp>
Examples
| rpp::ops::take_until(rpp::source::interval(std::chrono::seconds{5}, rpp::schedulers::current_thread{}))
| rpp::ops::subscribe([](int v) { std::cout << "-" << v; },
                      [](const std::exception_ptr&) { std::cout << "-x" << std::endl; },
                      []() { std::cout << "-|" << std::endl; });
// source 1: -0-1-2-3-4-5-6-7- --
// source 2: ---------0---------1- --
// Output : -0-1-2-3-|
See also
https://reactivex.io/documentation/operators/takeuntil.html
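
The marble diagram in the example can be reproduced with a small standard-library model. This is only a sketch of the semantics described above, not rpp code: timed_value and take_until_model are hypothetical names, time is measured in arbitrary ticks, and the model assumes that when both observables emit on the same tick, the until observable wins (which is what the diagram shows for the value 4).

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical timeline entry: a value emitted at a given tick.
// Not an rpp type; used only to model the marble diagram.
struct timed_value
{
    int tick;  // time of emission, in arbitrary ticks
    int value; // emitted value
};

// Models take_until on a pre-recorded timeline: forwards source values
// emitted strictly before `until_tick` (the tick at which the second
// observable emits its first item or terminates), then completes.
// Assumes `source` is sorted by tick and that the source itself
// completes after its last value.
std::string take_until_model(const std::vector<timed_value>& source, int until_tick)
{
    assert(until_tick >= 0 && "ticks are non-negative in this model");

    std::string out;
    for (const timed_value& item : source)
    {
        if (item.tick >= until_tick)
            break; // the second observable fired: stop mirroring the source
        out += "-" + std::to_string(item.value);
    }
    out += "-|"; // on_completed, written the same way as in the example
    return out;
}
```

With source values 0..7 placed on ticks 1, 3, 5, ... and until_tick set to 9, the model yields the string -0-1-2-3-|, matching the diagram.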

◆ take_while()

template<typename Fn >
requires (!utils::is_not_template_callable<Fn> || std::same_as<bool, std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
auto rpp::operators::take_while ( Fn && predicate)

Sends items from the observable while they satisfy the predicate. When the predicate returns false, sends on_completed.

In practice, this operator emits values for as long as the predicate returns true.

Performance notes:
  • No heap allocations at all
  • No copies/moves of emissions: each value is passed to the predicate by const& and then forwarded
Parameters
predicate is the predicate used to check items. It accepts a value from the observable and returns true if the value should be forwarded, or false if emissions should stop and the observable should terminate.
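
As the requires clause in the signature reads, a callable that the library classifies as non-template must return exactly bool when invoked with a placeholder argument. The sketch below illustrates that kind of check with the standard library only. any_arg is a hypothetical stand-in for rpp::utils::convertible_to_any (whose real definition is not shown on this page), and returns_exact_bool, not_five and as_int are illustrative names.

```cpp
#include <type_traits>

// Hypothetical stand-in for rpp::utils::convertible_to_any: a placeholder
// argument that converts to whatever parameter type the callable declares.
// The conversion operator is declared only, because it is used solely in
// unevaluated context (inside std::invoke_result_t).
struct any_arg
{
    template<typename T>
    operator T() const;
};

// True when calling Fn with the placeholder argument yields exactly bool.
template<typename Fn>
constexpr bool returns_exact_bool =
    std::is_same_v<bool, std::invoke_result_t<Fn, any_arg>>;

// Predicate in the spirit of the take_while example: the result type is bool.
struct not_five
{
    bool operator()(int v) const { return v != 5; }
};

// Returns int, which is convertible to bool but is not exactly bool.
struct as_int
{
    int operator()(int v) const { return v; }
};

static_assert(returns_exact_bool<not_five>);
static_assert(!returns_exact_bool<as_int>);
```

Under this reading, a predicate such as as_int would need an explicit comparison (for example v != 0) to have a bool result.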
Warning
#include <rpp/operators/take_while.hpp>
Example:
rpp::source::just(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    | rpp::operators::take_while([](int v) { return v != 5; })
    | rpp::operators::subscribe([](int v) { std::cout << v << " "; });
// Output: 0 1 2 3 4
See also
https://reactivex.io/documentation/operators/takewhile.html
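
The forwarding behaviour from the performance notes can be sketched without the library. The code below is a simplified model, not rpp's actual implementation: take_while_stage, recording_observer and run_take_while_demo are hypothetical names. The stage stores the predicate by value and the downstream observer by reference, inspects each value by const&, and forwards it without copying it inside the stage.

```cpp
#include <cassert>
#include <utility>
#include <vector>

// Hypothetical downstream observer that records what it receives.
struct recording_observer
{
    std::vector<int> values;
    bool             completed = false;

    void on_next(const int& v) { values.push_back(v); }

    void on_completed()
    {
        assert(!completed && "on_completed must be delivered at most once");
        completed = true;
    }
};

// Simplified stand-in for a take_while stage. It owns only the predicate
// and a flag, so the stage itself performs no heap allocation.
template<typename Observer, typename Fn>
class take_while_stage
{
public:
    take_while_stage(Observer& downstream, Fn predicate)
        : m_downstream(downstream)
        , m_predicate(std::move(predicate))
    {
    }

    // The value is checked by const& and forwarded as-is.
    template<typename T>
    void on_next(const T& v)
    {
        if (m_done)
            return; // already terminated: ignore further emissions
        if (m_predicate(v))
        {
            m_downstream.on_next(v);
        }
        else
        {
            m_done = true;
            m_downstream.on_completed(); // predicate failed: complete
        }
    }

    void on_completed()
    {
        if (m_done)
            return;
        m_done = true;
        m_downstream.on_completed();
    }

private:
    Observer& m_downstream;
    Fn        m_predicate;
    bool      m_done = false;
};

// Feeds 0..9 through the stage with the predicate v != 5,
// mirroring the example above, and returns what was recorded.
recording_observer run_take_while_demo()
{
    recording_observer sink;
    auto               pred = [](const int& v) { return v != 5; };

    take_while_stage<recording_observer, decltype(pred)> stage{sink, pred};
    for (int v = 0; v < 10; ++v)
        stage.on_next(v);
    stage.on_completed();
    return sink;
}
```

In this model the recorded values are 0 1 2 3 4 and completion is delivered once, when the predicate first returns false for 5.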
Examples
take_while.cpp.