ReactivePlusPlus
Yet another implementation of the ReactiveX approach in C++, written with performance and templates in mind
 
Loading...
Searching...
No Matches
timeout.cpp
#include <rpp/rpp.hpp>
#include <iostream>
int main()
{
{
subj.get_observable()
.timeout(std::chrono::milliseconds{450}, rpp::schedulers::new_thread{})
.subscribe([](int v) { std::cout << "new value " << v << std::endl; },
[](std::exception_ptr err)
{
try
{
std::rethrow_exception(err);
}
catch (const std::exception& exc)
{
std::cout << "ERR: " << exc.what() << std::endl;
}
},
[]() { std::cout << "completed" << std::endl; });
for (int i = 0; i < 10; ++i)
{
std::this_thread::sleep_for(std::chrono::milliseconds{i * 100});
subj.get_subscriber().on_next(i);
}
// Output:
// new value 0
// new value 1
// new value 2
// new value 3
// new value 4
// ERR : Timeout reached
}
{
subj.get_observable()
.timeout(std::chrono::milliseconds{450}, rpp::source::just(100), rpp::schedulers::new_thread{})
.subscribe([](int v) { std::cout << "new value " << v << std::endl; },
[]() { std::cout << "completed" << std::endl; });
for (int i = 0; i < 10; ++i)
{
std::this_thread::sleep_for(std::chrono::milliseconds{i * 100});
subj.get_subscriber().on_next(i);
}
// Output:
//new value 0
//new value 1
//new value 2
//new value 3
//new value 4
//new value 100
//completed
}
return 0;
}
Scheduler which schedules execution of schedulables by queueing tasks to another thread with priorit...
Definition: new_thread_scheduler.hpp:32
Subject which just multicasts values to observers subscribed on it. It contains two parts: subscriber...
Definition: publish_subject.hpp:78