13#include <rpp/observables/fwd.hpp>
14#include <rpp/operators/fwd.hpp>
16#include <rpp/defs.hpp>
17#include <rpp/operators/details/strategy.hpp>
18#include <rpp/utils/constraints.hpp>
19#include <rpp/utils/utils.hpp>
21namespace rpp::operators::details
23 template<rpp::constraint::observer TObserver,
28 retry_when_state(TObserver&& observer,
const TObservable& observable,
const TNotifier& notifier)
29 : observer(std::move(observer))
30 , observable(observable)
35 std::atomic_bool is_inside_drain{};
37 RPP_NO_UNIQUE_ADDRESS TObserver observer;
38 RPP_NO_UNIQUE_ADDRESS TObservable observable;
39 RPP_NO_UNIQUE_ADDRESS TNotifier notifier;
42 template<rpp::constra
int::observer TObserver,
typename TObservable,
typename TNotifier>
50 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
52 std::shared_ptr<retry_when_state<TObserver, TObservable, TNotifier>> state;
53 mutable bool locally_disposed{};
56 void on_next(T&&)
const
58 locally_disposed =
true;
61 if (state->is_inside_drain.exchange(
false, std::memory_order::seq_cst))
64 drain<TObserver, TObservable, TNotifier>(state);
// Error callback of this observer.
// Sets locally_disposed and then hands the received exception pointer,
// unchanged, to state->observer.on_error().
67 void on_error(
const std::exception_ptr& err)
const
// The method is const; the assignment below relies on locally_disposed being a `mutable` member.
69 locally_disposed =
true;
70 state->observer.on_error(err);
// Completion callback of this observer.
// Sets locally_disposed and then signals completion to state->observer.
73 void on_completed()
const
// The method is const; the assignment below relies on locally_disposed being a `mutable` member.
75 locally_disposed =
true;
76 state->observer.on_completed();
// Reports whether this observer should be treated as disposed.
// Returns true when locally_disposed has been set; otherwise returns whatever
// state->is_disposed() reports (the shared state is only queried when the
// local flag is false, because of `||` short-circuiting).
81 bool is_disposed()
const {
return locally_disposed || state->is_disposed(); }
89 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
91 std::shared_ptr<retry_when_state<TObserver, TObservable, TNotifier>> state;
// Value callback of this observer.
// Passes the received value straight through to state->observer.on_next(),
// using std::forward<T> so the value category of `v` is preserved.
94 void on_next(T&& v)
const
96 state->observer.on_next(std::forward<T>(v));
99 void on_error(
const std::exception_ptr& err)
const
107 state->observer.on_error(std::current_exception());
// Completion callback of this observer.
// Signals completion to state->observer without touching any other state.
111 void on_completed()
const
113 state->observer.on_completed();
// Reports whether this observer should be treated as disposed.
// Delegates entirely to the shared state: returns state->is_disposed().
118 bool is_disposed()
const {
return state->is_disposed(); }
121 template<rpp::constra
int::observer TObserver,
typename TObservable,
typename TNotifier>
124 while (!state->is_disposed())
126 state->is_inside_drain.store(
true, std::memory_order::seq_cst);
129 using value_type = rpp::utils::extract_observer_type_t<TObserver>;
132 if (state->is_inside_drain.exchange(
false, std::memory_order::seq_cst))
137 state->observer.on_error(std::current_exception());
143 template<rpp::constra
int::decayed_type TNotifier>
146 RPP_NO_UNIQUE_ADDRESS TNotifier notifier;
148 template<rpp::constra
int::decayed_type T>
151 using result_type = T;
154 template<rpp::details::observables::constra
int::disposables_strategy Prev>
157 template<rpp::constra
int::observer TObserver,
typename TObservable>
163 ptr->observer.set_upstream(d.as_weak());
169namespace rpp::operators
192 template<
typename TNotifier>
194 auto retry_when(TNotifier&& notifier)
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
Main RPP wrapper over disposables.
Definition disposable_wrapper.hpp:142
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
Definition disposables_strategy.hpp:29
Definition retry_when.hpp:49
Definition retry_when.hpp:88
Definition retry_when.hpp:27
Definition retry_when.hpp:150
Definition retry_when.hpp:145