ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
take_until.cpp
#include <rpp/rpp.hpp>
#include <chrono>
#include <iostream>
int main()
{
| rpp::ops::subscribe([](int v) { std::cout << "-" << v; },
[](const std::exception_ptr&) { std::cout << "-x" << std::endl; },
[]() { std::cout << "-|" << std::endl; });
// source 1: -0-1-2-3-4-5-6-7- --
// source 2: ---------0---------1- --
// Output : -0-1-2-3-|
| rpp::ops::take_until(rpp::source::error<bool>(std::make_exception_ptr(std::runtime_error{""})))
| rpp::ops::subscribe([](int v) { std::cout << "-" << v; },
[](const std::exception_ptr&) { std::cout << "-x" << std::endl; },
[]() { std::cout << "-|" << std::endl; });
// source 1: -
// source 2: -x
// Output : -x
return 0;
}
rpp::schedulers::current_thread
Schedules execution of schedulables via queueing tasks to the caller thread with priority to time_poi...
Definition current_thread.hpp:86
auto take_until(TObservable &&until_observable)
Discard any items emitted by an Observable after a second Observable emits an item or terminates.
Definition take_until.hpp:142
auto never()
Creates rpp::observable that emits no items and does not terminate.
Definition never.hpp:46
auto interval(rpp::schedulers::duration initial, rpp::schedulers::duration period, TScheduler &&scheduler)
Creates rpp::observable that emits a sequential integer every specified time interval,...
Definition interval.hpp:72
auto error(std::exception_ptr err)
Creates rpp::observable that emits no items and terminates with an error.
Definition error.hpp:49
auto subscribe(observer< Type, ObserverStrategy > &&observer)
Subscribes passed observer to emissions from this observable.
Definition subscribe.hpp:226