13#include <rpp/observables/fwd.hpp>
14#include <rpp/operators/fwd.hpp>
16#include <rpp/defs.hpp>
17#include <rpp/operators/details/strategy.hpp>
18#include <rpp/utils/constraints.hpp>
19#include <rpp/utils/utils.hpp>
21#include "rpp/observers/fwd.hpp"
23namespace rpp::operators::details
// NOTE(review): this listing is a partial extraction. The number at the start of a line is the
// line number in the original header; the struct header, braces and every line whose number is
// skipped are not visible here.
// Shared state for the repeating strategy: holds the downstream observer, the source observable
// and the notifier, plus the atomic flag used by the drain loop.
25 template<rpp::constraint::observer TObserver,
// Constructor: the observer arrives as an rvalue reference and is moved into the member; the
// observable is copied from the const reference. The initializer for `notifier` is on a line
// that is not shown.
30 repeating_state(TObserver&& observer,
const TObservable& observable,
const TNotifier& notifier)
31 : observer(std::move(observer))
32 , observable(observable)
// Value-initialized atomic flag (starts as false). Elsewhere in this listing it is stored to
// `true` after the `while (!state->is_disposed())` line and read-and-cleared with
// exchange(false) in both the on_next fragment and the loop fragment.
37 std::atomic_bool is_inside_drain{};
// Stored collaborators. RPP_NO_UNIQUE_ADDRESS is a project macro whose definition is not visible
// here (presumably wraps [[no_unique_address]] so empty types take no storage — TODO confirm in
// rpp/defs.hpp).
39 RPP_NO_UNIQUE_ADDRESS TObserver observer;
40 RPP_NO_UNIQUE_ADDRESS TObservable observable;
41 RPP_NO_UNIQUE_ADDRESS TNotifier notifier;
// Template header for a declaration whose own line is not visible in this listing
// (presumably the forward declaration of the `drain` function that on_next calls — TODO confirm).
// The concept name had been split across a line break ("rpp::constra" / "int::observer"); it is
// rejoined here to match the intact spelling used on the repeating_state template header.
44 template<
typename TStrategy, rpp::constraint::observer TObserver,
typename TObservable,
typename TNotifier>
// Observer strategy fragment (the class header and braces are not visible in this listing).
// It reacts to a notification value by marking itself disposed and calling drain, and forwards
// error/completion to the downstream observer held in the shared state.
47 template<
typename TOuterStrategy,
// This strategy asks for disposables mode `None`.
53 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
// State shared through a std::shared_ptr.
55 std::shared_ptr<repeating_state<TObserver, TObservable, TNotifier>> state;
// `mutable` because the const-qualified callbacks below assign to it.
56 mutable bool locally_disposed{};
// on_next: the emitted value itself is ignored (the parameter is unnamed); receiving any value
// marks this observer as locally disposed.
59 void on_next(T&&)
const
61 locally_disposed =
true;
// Atomically reads and clears is_inside_drain. The statement guarded by this `if` is on a line
// that is not shown. NOTE(review): presumably an early return when a drain loop is already
// running, so the loop is not re-entered recursively — confirm against the full header.
64 if (state->is_inside_drain.exchange(
false, std::memory_order::seq_cst))
// Calls drain with the shared state.
67 drain<TOuterStrategy>(state);
// on_error: marks this observer as locally disposed and forwards the error to the downstream
// observer.
70 void on_error(
const std::exception_ptr& err)
const
72 locally_disposed =
true;
73 state->observer.on_error(err);
// on_completed: marks this observer as locally disposed and forwards completion to the
// downstream observer.
76 void on_completed()
const
78 locally_disposed =
true;
79 state->observer.on_completed();
// Disposed when this observer has already received a callback (locally_disposed) or when the
// shared state reports itself disposed.
84 bool is_disposed()
const {
return locally_disposed || state->is_disposed(); }
// Second observer strategy fragment (class header, callbacks and braces are not visible in this
// listing). Unlike the strategy above it keeps no local disposed flag.
// This strategy asks for disposables mode `None`.
92 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
// State shared through a std::shared_ptr.
94 std::shared_ptr<repeating_state<TObserver, TObservable, TNotifier>> state;
// Disposal status is delegated entirely to the shared state.
98 bool is_disposed()
const {
return state->is_disposed(); }
// Loop fragment (the function signature, braces and try/catch lines are not visible in this
// listing; presumably this is the `drain` function called from on_next — TODO confirm).
// The concept name had been split across a line break ("rpp::constra" / "int::observer"); it is
// rejoined here to match the intact spelling used on the repeating_state template header.
101 template<
typename TStrategy, rpp::constraint::observer TObserver,
typename TObservable,
typename TNotifier>
// Keep iterating for as long as the shared state is not disposed.
104 while (!state->is_disposed())
// Raise the flag before the step whose lines are not shown; a callback that runs while the flag
// is set will observe `true` from its exchange(false).
106 state->is_inside_drain.store(
true, std::memory_order::seq_cst);
// Element type taken from the downstream observer type.
109 using value_type = rpp::utils::extract_observer_type_t<TObserver>;
// Atomically reads and clears the flag. The statement guarded by this `if` is not shown.
// NOTE(review): presumably exits the loop when nothing cleared the flag in the meantime —
// confirm against the full header.
112 if (state->is_inside_drain.exchange(
false, std::memory_order::seq_cst))
// Forwards the exception currently being handled to the downstream observer (the enclosing
// catch handler is not visible here).
117 state->observer.on_error(std::current_exception());
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
Definition repeating_strategy.hpp:52
Definition repeating_strategy.hpp:91
Definition repeating_strategy.hpp:29