13#include <rpp/operators/fwd.hpp>
15#include <rpp/operators/details/strategy.hpp>
16#include <rpp/sources/error.hpp>
17#include <rpp/utils/exceptions.hpp>
18#include <rpp/utils/utils.hpp>
20namespace rpp::operators::details
22 template<rpp::constra
int::observer TObserver, rpp::constra
int::observable TFallbackObservable, rpp::details::disposables::constra
int::disposables_container Container>
28 RPP_NO_UNIQUE_ADDRESS TObserver observer;
29 rpp::schedulers::time_point timeout;
32 timeout_disposable(TObserver&&
observer, rpp::schedulers::duration period,
const TFallbackObservable& fallback, rpp::schedulers::time_point
timeout)
35 , m_fallback{fallback}
38 rpp::utils::pointer_under_lock<observer_with_timeout> get_observer_with_timeout_under_lock() {
return m_observer_with_timeout; }
40 const TFallbackObservable& get_fallback()
const {
return m_fallback; }
42 rpp::schedulers::duration get_period()
const {
return m_period; }
47 const rpp::schedulers::duration m_period;
48 RPP_NO_UNIQUE_ADDRESS
const TFallbackObservable m_fallback;
51 template<rpp::constra
int::observer TObserver, rpp::constra
int::observable TFallbackObservable, rpp::details::disposables::constra
int::disposables_container Container>
54 std::shared_ptr<timeout_disposable<TObserver, TFallbackObservable, Container>> disposable;
56 bool is_disposed()
const {
return disposable->is_disposed(); }
58 void on_error(
const std::exception_ptr& err)
const
60 disposable->get_observer_with_timeout_under_lock()->observer.on_error(err);
64 template<rpp::constra
int::observer TObserver, rpp::constra
int::observable TFallbackObservable, rpp::details::disposables::constra
int::disposables_container Container, rpp::schedulers::constra
int::scheduler TScheduler>
67 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
69 std::shared_ptr<timeout_disposable<TObserver, TFallbackObservable, Container>> disposable;
76 bool is_disposed()
const
78 return disposable->is_disposed();
82 void on_next(T&& v)
const
84 auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock();
85 obs_with_timeout->observer.on_next(std::forward<T>(v));
86 obs_with_timeout->timeout = rpp::schedulers::utils::get_worker_t<TScheduler>::now() + disposable->get_period();
89 void on_error(
const std::exception_ptr& err)
const noexcept
91 const auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock();
92 if (!disposable->is_disposed())
93 obs_with_timeout->observer.on_error(err);
96 void on_completed()
const noexcept
98 const auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock();
99 if (!disposable->is_disposed())
100 obs_with_timeout->observer.on_completed();
104 template<rpp::constra
int::observable TFallbackObservable, rpp::schedulers::constra
int::scheduler TScheduler>
107 template<rpp::constra
int::decayed_type T>
112 using result_type = T;
114 constexpr static bool own_current_queue =
true;
117 template<rpp::details::observables::constra
int::disposables_strategy Prev>
120 rpp::schedulers::duration period;
121 RPP_NO_UNIQUE_ADDRESS TFallbackObservable fallback;
122 RPP_NO_UNIQUE_ADDRESS TScheduler scheduler;
124 template<rpp::constra
int::decayed_type Type, rpp::details::observables::constra
int::disposables_strategy DisposableStrategy, rpp::constra
int::observer Observer>
125 auto lift_with_disposables_strategy(Observer&&
observer)
const
127 using worker_t = rpp::schedulers::utils::get_worker_t<TScheduler>;
128 using container =
typename DisposableStrategy::disposables_container;
130 const auto timeout = worker_t::now() + period;
133 auto ptr = disposable.lock();
134 ptr->get_observer_with_timeout_under_lock()->observer.set_upstream(disposable.as_weak());
136 const auto worker = scheduler.create_worker();
140 [](wrapper& handler) -> rpp::schedulers::optional_delay_to {
141 auto locked_obs_with_timeout = handler.disposable->get_observer_with_timeout_under_lock();
142 if (rpp::schedulers::utils::get_worker_t<TScheduler>::now() < locked_obs_with_timeout->timeout)
145 if (!handler.disposable->is_disposed())
147 handler.disposable->dispose();
148 handler.disposable->get_fallback().subscribe(std::move(locked_obs_with_timeout->observer));
158 template<rpp::schedulers::constra
int::scheduler TScheduler>
161 template<rpp::constra
int::decayed_type T>
164 using result_type = T;
166 constexpr static bool own_current_queue =
true;
169 template<rpp::details::observables::constra
int::disposables_strategy Prev>
172 rpp::schedulers::duration period;
173 RPP_NO_UNIQUE_ADDRESS TScheduler scheduler;
175 template<rpp::constra
int::decayed_type Type, rpp::details::observables::constra
int::disposables_strategy DisposableStrategy, rpp::constra
int::observer Observer>
176 auto lift_with_disposables_strategy(Observer&&
observer)
const
179 .template lift_with_disposables_strategy<Type, DisposableStrategy>(std::forward<Observer>(
observer));
184namespace rpp::operators
206 template<rpp::constra
int::observable TFallbackObservable, rpp::schedulers::constra
int::scheduler TScheduler>
207 auto timeout(rpp::schedulers::duration period, TFallbackObservable&& fallback_observable,
const TScheduler& scheduler)
230 template<rpp::schedulers::constra
int::scheduler TScheduler>
231 auto timeout(rpp::schedulers::duration period,
const TScheduler& scheduler)
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:31
Main RPP wrapper over disposables.
Definition disposable_wrapper.hpp:142
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
auto error(std::exception_ptr err)
Creates rpp::observable that emits no items and terminates with an error.
Definition error.hpp:49
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
auto timeout(rpp::schedulers::duration period, TFallbackObservable &&fallback_observable, const TScheduler &scheduler)
Forwards emissions from original observable, but subscribes on fallback observable if no any events d...
Definition timeout.hpp:203
Definition disposables_strategy.hpp:29
Definition timeout.hpp:27
Definition timeout.hpp:53
Definition timeout.hpp:66
Definition timeout.hpp:109
Definition timeout.hpp:106
Definition timeout.hpp:163
Definition timeout.hpp:160
Provide timepoint of next execution explicitly.
Definition fwd.hpp:70
Definition exceptions.hpp:28