ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
timeout.cpp
#include <rpp/rpp.hpp>
#include <chrono>
#include <ctime>
#include <iostream>
int main() // NOLINT(bugprone-exception-escape)
{
{
auto start = rpp::schedulers::clock_type::now();
rpp::source::just(10, 30, 90, 110)
return rpp::source::just(v) | rpp::operators::delay(std::chrono::milliseconds{v}, rpp::schedulers::current_thread{});
})
| rpp::operators::subscribe([start](int v) { std::cout << "received " << v << " at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl; },
[start](const std::exception_ptr&) {
std::cout << "received error at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl;
});
}
{
auto start = rpp::schedulers::clock_type::now();
rpp::source::just(10, 30, 90, 110)
return rpp::source::just(v) | rpp::operators::delay(std::chrono::milliseconds{v}, rpp::schedulers::current_thread{});
})
| rpp::operators::timeout(std::chrono::milliseconds{35}, rpp::schedulers::new_thread{})
| rpp::operators::subscribe([start](int v) { std::cout << "received " << v << " at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl; },
[start](const std::exception_ptr&) {
std::cout << "received error at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl;
});
}
}
Schedules execution of schedulables via queueing tasks to the caller thread with priority to time_poi...
Definition current_thread.hpp:86
immediately calls provided schedulable or waits for time_point (in the caller-thread)
Definition immediate.hpp:65
Scheduler which schedules invoking of schedulables to another thread via queueing tasks with priority...
Definition new_thread.hpp:31
auto just(const TScheduler &scheduler, T &&item, Ts &&... items)
Creates rpp::observable that emits a particular items and completes.
Definition from.hpp:201
auto flat_map(Fn &&callable)
Transform the items emitted by an Observable into Observables, then flatten the emissions from those ...
Definition flat_map.hpp:64
auto subscribe(observer< Type, ObserverStrategy > &&observer)
Subscribes passed observer to emissions from this observable.
Definition subscribe.hpp:226
auto as_blocking()
Converts rpp::observable to rpp::blocking_observable
Definition as_blocking.hpp:47
auto timeout(rpp::schedulers::duration period, TFallbackObservable &&fallback_observable, const TScheduler &scheduler)
Forwards emissions from original observable, but subscribes on fallback observable if no any events d...
Definition timeout.hpp:203
auto delay(rpp::schedulers::duration delay_duration, Scheduler &&scheduler)
Shift the emissions from an Observable forward in time by a particular amount.
Definition delay.hpp:219