ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
observe_on.cpp
#include <rpp/rpp.hpp>
#include <chrono>
#include <ctime>
#include <iostream>
int main() // NOLINT(bugprone-exception-escape)
{
auto start = rpp::schedulers::clock_type::now();
rpp::source::create<int>([&start](const auto& obs) {
for (int i = 0; i < 3; ++i)
{
auto emitting_time = rpp::schedulers::clock_type::now();
std::cout << "emit " << i << " in thread{" << std::this_thread::get_id() << "} duration since start " << std::chrono::duration_cast<std::chrono::seconds>(emitting_time - start).count() << "s" << std::endl;
obs.on_next(i);
std::this_thread::sleep_for(std::chrono::seconds{1});
}
auto emitting_time = rpp::schedulers::clock_type::now();
std::cout << "emit error in thread{" << std::this_thread::get_id() << "} duration since start " << std::chrono::duration_cast<std::chrono::seconds>(emitting_time - start).count() << "s" << std::endl;
obs.on_error({});
})
auto observing_time = rpp::schedulers::clock_type::now();
std::cout << "observe " << v << " in thread{" << std::this_thread::get_id() << "} duration since start " << std::chrono::duration_cast<std::chrono::seconds>(observing_time - start).count() <<"s" << std::endl; },
[&](const std::exception_ptr&) {
auto observing_time = rpp::schedulers::clock_type::now();
std::cout << "observe error in thread{" << std::this_thread::get_id() << "} duration since start " << std::chrono::duration_cast<std::chrono::seconds>(observing_time - start).count() << "s" << std::endl;
});
// Template for output:
// emit 0 in thread{139800298538880} duration since start 0s
// emit 1 in thread{139800298538880} duration since start 1s
// emit 2 in thread{139800298538880} duration since start 2s
// observe 0 in thread{139800298534464} duration since start 3s
// emit error in thread{139800298538880} duration since start 3s
// observe error in thread{139800298538880} duration since start 3s
return 0;
}
Scheduler which schedules invoking of schedulables to another thread via queueing tasks with priority...
Definition new_thread.hpp:31
auto create(OnSubscribe &&on_subscribe)
Construct observable specialized with passed callback function. Most easiesest way to construct obser...
Definition create.hpp:57
auto subscribe(observer< Type, ObserverStrategy > &&observer)
Subscribes passed observer to emissions from this observable.
Definition subscribe.hpp:226
auto observe_on(Scheduler &&scheduler, rpp::schedulers::duration delay_duration={})
Specify the Scheduler on which an observer will observe this Observable.
Definition observe_on.hpp:38
auto as_blocking()
Converts rpp::observable to rpp::blocking_observable
Definition as_blocking.hpp:47