13#include <rpp/operators/fwd.hpp>
15#include <rpp/defs.hpp>
16#include <rpp/operators/details/strategy.hpp>
18namespace rpp::operators::details
20 template<rpp::constra
int::observer TObserver, constra
int::decayed_type Observable>
23 retry_state_t(TObserver&& in_observer,
const Observable& observable, std::optional<size_t> count)
25 , observer(std::move(in_observer))
26 , observable(observable)
31 std::optional<size_t> count;
32 std::atomic<bool> is_inside_drain{};
34 RPP_NO_UNIQUE_ADDRESS TObserver observer;
35 RPP_NO_UNIQUE_ADDRESS Observable observable;
38 template<rpp::constra
int::observer TObserver,
typename TObservable>
41 template<rpp::constra
int::observer TObserver,
typename TObservable>
44 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Boolean;
46 std::shared_ptr<retry_state_t<TObserver, TObservable>> state;
49 void on_next(T&& v)
const
51 state->observer.on_next(std::forward<T>(v));
54 void on_error(
const std::exception_ptr& err)
const
56 if (state->count == 0)
58 state->observer.on_error(err);
64 if (state->is_inside_drain.exchange(
false, std::memory_order::seq_cst))
70 void on_completed()
const
72 state->observer.on_completed();
80 bool is_disposed()
const {
return state->is_disposed(); }
83 template<rpp::constra
int::observer TObserver,
typename TObservable>
86 while (!state->is_disposed())
89 --state->count.value();
90 state->is_inside_drain.store(
true, std::memory_order::seq_cst);
93 using value_type = rpp::utils::extract_observer_type_t<TObserver>;
96 if (state->is_inside_drain.exchange(
false, std::memory_order::seq_cst))
101 state->observer.on_error(std::current_exception());
109 const std::optional<size_t> count{};
111 template<rpp::constra
int::decayed_type T>
114 using result_type = T;
117 template<rpp::details::observables::constra
int::disposables_strategy Prev>
120 template<rpp::constra
int::observer TObserver,
typename TObservable>
121 void subscribe(TObserver&&
observer, TObservable&& observble)
const
126 ptr->observer.set_upstream(d.as_weak());
132namespace rpp::operators
153 inline auto retry(
size_t count)
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
Main RPP wrapper over disposables.
Definition disposable_wrapper.hpp:142
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
auto retry()
The infinite retry operator continuously attempts to resubscribe to the observable upon error,...
Definition retry.hpp:167
Definition disposables_strategy.hpp:29