ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
Error Handling Operators

Operators that help to recover from error notifications from an Observable. More...

Functions

template<typename Selector>
requires rpp::constraint::observable<std::invoke_result_t<Selector, std::exception_ptr>>
auto rpp::operators::on_error_resume_next (Selector &&selector)
 If an error occurs, take the result from the Selector and subscribe to that instead.
 
auto rpp::operators::retry (size_t count)
 The retry operator attempts to resubscribe to the observable when an error occurs, up to the specified number of retries.
 
auto rpp::operators::retry ()
 The infinite retry operator continuously attempts to resubscribe to the observable upon error, without a retry limit.
 
template<typename TNotifier>
requires rpp::constraint::observable<std::invoke_result_t<TNotifier, std::exception_ptr>>
auto rpp::operators::retry_when (TNotifier &&notifier)
 If an error occurs, invoke notifier and when returned observable emits a value resubscribe to the source observable. If the notifier throws or returns an error/empty observable, then error/completed emission is forwarded to original subscription.
 

Detailed Description

Operators that help to recover from error notifications from an Observable.

See also
https://reactivex.io/documentation/operators.html#error

Function Documentation

◆ on_error_resume_next()

template<typename Selector>
requires rpp::constraint::observable<std::invoke_result_t<Selector, std::exception_ptr>>
auto rpp::operators::on_error_resume_next ( Selector && selector)

If an error occurs, take the result from the Selector and subscribe to that instead.

Parameters
selectorcallable taking a std::exception_ptr and returning observable to continue on
Note
#include <rpp/operators/on_error_resume_next.hpp>
See also
https://reactivex.io/documentation/operators/catch.html

◆ retry() [1/2]

auto rpp::operators::retry ( )
inline

The infinite retry operator continuously attempts to resubscribe to the observable upon error, without a retry limit.

Note
#include <rpp/operators/retry.hpp>
Examples:
| rpp::operators::subscribe([](int v) { std::cout << v << " "; },
[](const std::exception_ptr&) { std::cout << "error" << std::endl; },
[]() { std::cout << "completed" << std::endl; });
// Output: 1 2 3 1 2 3 1 2 3 1 completed
See also
https://reactivex.io/documentation/operators/retry.html

◆ retry() [2/2]

auto rpp::operators::retry ( size_t count)
inline

The retry operator attempts to resubscribe to the observable when an error occurs, up to the specified number of retries.

Parameters
countis the number of retries
Note
#include <rpp/operators/retry.hpp>
Examples:
| rpp::operators::subscribe([](int v) { std::cout << v << " "; },
[](const std::exception_ptr&) { std::cout << "error" << std::endl; },
[]() { std::cout << "completed" << std::endl; });
// Output: 1 2 3 1 2 3 1 2 3 error
See also
https://reactivex.io/documentation/operators/retry.html

◆ retry_when()

template<typename TNotifier>
requires rpp::constraint::observable<std::invoke_result_t<TNotifier, std::exception_ptr>>
auto rpp::operators::retry_when ( TNotifier && notifier)

If an error occurs, invoke notifier and when returned observable emits a value resubscribe to the source observable. If the notifier throws or returns an error/empty observable, then error/completed emission is forwarded to original subscription.

Parameters
notifiercallable taking a std::exception_ptr and returning observable notifying when to resubscribe
Warning
retry_when along with other re-subscribing operators needs to be carefully used with hot observables, as re-subscribing to a hot observable can have unwanted behaviors. For example, a hot observable behind a replay subject can indefinitely yield an error on each re-subscription and using retry_when on it would lead to an infinite execution.
Note
#include <rpp/operators/retry_when.hpp>
Examples:
size_t retry_count = 0;
rpp::source::create<std::string>([&retry_count](const auto& sub) {
if (++retry_count != 4)
{
sub.on_error({});
}
else
{
sub.on_next(std::string{"success"});
sub.on_completed();
}
})
| rpp::operators::retry_when([](const std::exception_ptr&) {
return rpp::source::timer(std::chrono::seconds{5}, rpp::schedulers::current_thread{});
})
| rpp::operators::subscribe([](const std::string& v) { std::cout << v << std::endl; });
// Source observable is resubscribed after 5 seconds on each error emission
retry_count = 0;
rpp::source::create<std::string>([&retry_count](const auto& sub) {
if (++retry_count != 4)
{
sub.on_error({});
}
else
{
sub.on_next(std::string{"success"});
sub.on_completed();
}
})
| rpp::operators::retry_when([](const std::exception_ptr& ep) {
try
{
std::rethrow_exception(ep);
}
catch (const std::runtime_error&)
{
return rpp::source::timer(std::chrono::seconds{5}, rpp::schedulers::current_thread{});
}
catch (...)
{
throw;
}
})
| rpp::operators::subscribe([](const std::string& v) { std::cout << v << std::endl; });
// Source observable is resubscribed after 5 seconds only on particular error emissions
See also
https://reactivex.io/documentation/operators/retry.html