ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
retry_when.cpp
#include <rpp/rpp.hpp>

#include <chrono>
#include <exception>
#include <iostream>
#include <stdexcept>
#include <string>
/**
 * Example program for rpp::operators::retry_when.
 *
 * Two pipelines are built from a source that reports an error on its first
 * three subscriptions and emits "success" followed by completion on the
 * fourth. Each pipeline passes a different notifier to retry_when.
 *
 * The source emits a real std::runtime_error rather than a default-constructed
 * (null) std::exception_ptr: the second notifier inspects the error with
 * std::rethrow_exception, whose behaviour is undefined for a null pointer.
 *
 * @return 0 once both pipelines have been set up and run.
 */
int main()
{
    // Counts subscriptions to the source; captured by reference by both sources.
    size_t retry_count = 0;

    // Source observable is resubscribed after 5 seconds on each error emission
    rpp::source::create<std::string>([&retry_count](const auto& sub) {
        if (++retry_count != 4)
        {
            // Subscriptions 1..3 fail with a concrete, non-null error.
            sub.on_error(std::make_exception_ptr(std::runtime_error{"transient failure"}));
        }
        else
        {
            sub.on_next(std::string{"success"});
            sub.on_completed();
        }
    })
        | rpp::operators::retry_when([](const std::exception_ptr&) {
              // The error itself is ignored: every error yields the same 5 second timer.
              return rpp::source::timer(std::chrono::seconds{5}, rpp::schedulers::current_thread{});
          })
        | rpp::operators::subscribe([](const std::string& v) { std::cout << v << std::endl; });

    // Start the second example from a fresh subscription count.
    retry_count = 0;

    // Source observable is resubscribed after 5 seconds only on particular error emissions
    rpp::source::create<std::string>([&retry_count](const auto& sub) {
        if (++retry_count != 4)
        {
            // Must be a non-null std::runtime_error so the notifier below can
            // rethrow it safely and match it in its catch clause.
            sub.on_error(std::make_exception_ptr(std::runtime_error{"transient failure"}));
        }
        else
        {
            sub.on_next(std::string{"success"});
            sub.on_completed();
        }
    })
        | rpp::operators::retry_when([](const std::exception_ptr& ep) {
              try
              {
                  // Rethrow to recover the dynamic type of the error.
                  std::rethrow_exception(ep);
              }
              catch (const std::runtime_error&)
              {
                  // Retryable error: return the 5 second timer.
                  return rpp::source::timer(std::chrono::seconds{5}, rpp::schedulers::current_thread{});
              }
              catch (...)
              {
                  // Any other error type is rethrown out of the notifier.
                  // NOTE(review): how retry_when reacts to a throwing notifier is
                  // defined by rpp and not visible here - confirm against its docs.
                  throw;
              }
          })
        | rpp::operators::subscribe([](const std::string& v) { std::cout << v << std::endl; });

    return 0;
}
Schedules execution of schedulables via queueing tasks to the caller thread with priority to time_poi...
Definition current_thread.hpp:86
auto create(OnSubscribe &&on_subscribe)
Constructs an observable specialized with the passed callback function. The easiest way to construct an obser...
Definition create.hpp:57
auto timer(rpp::schedulers::duration when, TScheduler &&scheduler)
Creates rpp::observable that emits an integer after a given delay, on the specified scheduler.
Definition timer.hpp:22
auto subscribe(observer< Type, ObserverStrategy > &&observer)
Subscribes passed observer to emissions from this observable.
Definition subscribe.hpp:226