ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
timeout.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/operators/details/strategy.hpp>
16#include <rpp/sources/error.hpp>
17#include <rpp/utils/exceptions.hpp>
18#include <rpp/utils/utils.hpp>
19
20namespace rpp::operators::details
21{
// Shared state of the timeout operator. It is a composite disposable (derives from
// rpp::composite_disposable_impl<Container>) that additionally stores:
//  - the downstream observer together with the current deadline (`timeout`),
//  - the timeout period,
//  - the fallback observable.
22 template<rpp::constraint::observer TObserver, rpp::constraint::observable TFallbackObservable, rpp::details::disposables::constraint::disposables_container Container>
23 class timeout_disposable final : public rpp::composite_disposable_impl<Container>
24 {
25 public:
// NOTE(review): listing line 26 (the declaration that opens the braces below) is absent from this
// extract; the constructor and the getter refer to this type as `observer_with_timeout`.
27 {
// Downstream observer that receives the forwarded events.
28 RPP_NO_UNIQUE_ADDRESS TObserver observer;
// Point in time that the scheduled check compares against `now()`; refreshed by on_next of the observer strategy.
29 rpp::schedulers::time_point timeout;
30 };
31
// Takes ownership of `observer` (moved in), copies `fallback`, and stores the period and the initial deadline.
32 timeout_disposable(TObserver&& observer, rpp::schedulers::duration period, const TFallbackObservable& fallback, rpp::schedulers::time_point timeout)
33 : m_observer_with_timeout{observer_with_timeout{std::move(observer), timeout}}
34 , m_period{period}
35 , m_fallback{fallback}
36 {
37 }
// Gives access to the observer/deadline pair through rpp::utils::pointer_under_lock.
// Presumably the lock is held for as long as the returned object is alive — confirm in rpp/utils/utils.hpp.
38 rpp::utils::pointer_under_lock<observer_with_timeout> get_observer_with_timeout_under_lock() { return m_observer_with_timeout; }
39
// Observable that is subscribed to instead of the original one when the deadline passes.
40 const TFallbackObservable& get_fallback() const { return m_fallback; }
41
// Duration added to `now()` to compute the next deadline.
42 rpp::schedulers::duration get_period() const { return m_period; }
43
44 private:
// NOTE(review): listing line 45 (presumably the declaration of `m_observer_with_timeout`) is absent from this extract.
46
47 const rpp::schedulers::duration m_period;
48 RPP_NO_UNIQUE_ADDRESS const TFallbackObservable m_fallback;
49 };
50
// Handler object given to `worker.schedule(...)` together with the timeout-check callback
// (constructed as `wrapper{ptr}` in timeout_t::lift_with_disposables_strategy).
// It only delegates to the shared timeout_disposable.
// NOTE(review): listing line 52 (the struct header, referred to elsewhere as `timeout_disposable_wrapper`)
// is absent from this extract.
51 template<rpp::constraint::observer TObserver, rpp::constraint::observable TFallbackObservable, rpp::details::disposables::constraint::disposables_container Container>
53 {
// Shared operator state; also kept alive by the observer strategy.
54 std::shared_ptr<timeout_disposable<TObserver, TFallbackObservable, Container>> disposable;
55
// Reports whether the shared disposable has already been disposed.
56 bool is_disposed() const { return disposable->is_disposed(); }
57
// Forwards an error to the downstream observer, accessed through the locked pointer.
// Presumably called by the scheduler when the scheduled callable throws — confirm against the scheduler's handler contract.
58 void on_error(const std::exception_ptr& err) const
59 {
60 disposable->get_observer_with_timeout_under_lock()->observer.on_error(err);
61 }
62 };
63
// Observer strategy placed between the upstream observable and the downstream observer.
// Every callback goes through the locked observer/deadline pair of the shared timeout_disposable.
// NOTE(review): listing line 65 (the struct header, referred to elsewhere as `timeout_observer_strategy`)
// is absent from this extract.
64 template<rpp::constraint::observer TObserver, rpp::constraint::observable TFallbackObservable, rpp::details::disposables::constraint::disposables_container Container, rpp::schedulers::constraint::scheduler TScheduler>
66 {
// Presumably tells rpp::observer not to keep its own disposables, because upstream handles are
// added to the shared timeout_disposable below — confirm against rpp::details::observers::disposables_mode.
67 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
68
69 std::shared_ptr<timeout_disposable<TObserver, TFallbackObservable, Container>> disposable;
70
// Registers the upstream disposable inside the shared composite disposable.
71 void set_upstream(const rpp::disposable_wrapper& d) const
72 {
73 disposable->add(d);
74 }
75
76 bool is_disposed() const
77 {
78 return disposable->is_disposed();
79 }
80
// Forwards the value downstream and then moves the deadline to `now() + period`.
// NOTE(review): unlike on_error/on_completed below, this does not re-check is_disposed() after
// obtaining the locked pointer, while the scheduled timeout check moves the observer out when it
// fires — confirm that a late on_next cannot reach the moved-from observer.
81 template<typename T>
82 void on_next(T&& v) const
83 {
84 auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock();
85 obs_with_timeout->observer.on_next(std::forward<T>(v));
86 obs_with_timeout->timeout = rpp::schedulers::utils::get_worker_t<TScheduler>::now() + disposable->get_period();
87 }
88
// Forwards the error downstream only if the shared disposable is not disposed yet;
// the check is made after the locked pointer has been obtained.
89 void on_error(const std::exception_ptr& err) const noexcept
90 {
91 const auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock();
92 if (!disposable->is_disposed())
93 obs_with_timeout->observer.on_error(err);
94 }
95
// Forwards completion downstream only if the shared disposable is not disposed yet;
// the check is made after the locked pointer has been obtained.
96 void on_completed() const noexcept
97 {
98 const auto obs_with_timeout = disposable->get_observer_with_timeout_under_lock();
99 if (!disposable->is_disposed())
100 obs_with_timeout->observer.on_completed();
101 }
102 };
103
// Operator object for timeout with a fallback observable: stores the period, the fallback and the
// scheduler, and builds the observer plus the scheduled deadline check on subscription.
// NOTE(review): listing lines 105 (the struct header, referred to elsewhere as `timeout_t`) and 108
// (the header of the nested traits struct) are absent from this extract.
104 template<rpp::constraint::observable TFallbackObservable, rpp::schedulers::constraint::scheduler TScheduler>
106 {
107 template<rpp::constraint::decayed_type T>
109 {
// The fallback has to emit the same value type as the original observable.
110 static_assert(rpp::constraint::observable_of_type<TFallbackObservable, T>, "TFallbackObservable should be the same type as T");
111
// The operator does not change the emitted value type.
112 using result_type = T;
113
114 constexpr static bool own_current_queue = true;
115 };
116
// Always a fixed strategy of size 1, regardless of the previous strategy `Prev`.
117 template<rpp::details::observables::constraint::disposables_strategy Prev>
118 using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>;
119
120 rpp::schedulers::duration period;
121 RPP_NO_UNIQUE_ADDRESS TFallbackObservable fallback;
122 RPP_NO_UNIQUE_ADDRESS TScheduler scheduler;
123
// Wraps `observer` into the timeout machinery:
//  1. computes the first deadline as `now() + period`,
//  2. creates the shared timeout_disposable and hands it to the downstream observer as upstream,
//  3. schedules the deadline check on a worker of `scheduler`,
//  4. returns an observer driven by timeout_observer_strategy that shares the same state.
124 template<rpp::constraint::decayed_type Type, rpp::details::observables::constraint::disposables_strategy DisposableStrategy, rpp::constraint::observer Observer>
125 auto lift_with_disposables_strategy(Observer&& observer) const
126 {
127 using worker_t = rpp::schedulers::utils::get_worker_t<TScheduler>;
128 using container = typename DisposableStrategy::disposables_container;
129
130 const auto timeout = worker_t::now() + period;
131
132 const auto disposable = disposable_wrapper_impl<timeout_disposable<std::decay_t<Observer>, TFallbackObservable, container>>::make(std::forward<Observer>(observer), period, fallback, timeout);
133 auto ptr = disposable.lock();
// The downstream observer receives only a weak handle to the shared state.
134 ptr->get_observer_with_timeout_under_lock()->observer.set_upstream(disposable.as_weak());
135
136 const auto worker = scheduler.create_worker();
137 using wrapper = timeout_disposable_wrapper<std::decay_t<Observer>, TFallbackObservable, container>;
138 worker.schedule(
139 timeout,
140 [](wrapper& handler) -> rpp::schedulers::optional_delay_to {
141 auto locked_obs_with_timeout = handler.disposable->get_observer_with_timeout_under_lock();
// Deadline was pushed forward by on_next in the meantime: run this check again at the new deadline.
142 if (rpp::schedulers::utils::get_worker_t<TScheduler>::now() < locked_obs_with_timeout->timeout)
143 return rpp::schedulers::delay_to(locked_obs_with_timeout->timeout);
144
// Deadline reached: dispose the shared state (which holds the upstream disposables added via
// set_upstream) and move the downstream observer into a subscription on the fallback observable.
145 if (!handler.disposable->is_disposed())
146 {
147 handler.disposable->dispose();
148 handler.disposable->get_fallback().subscribe(std::move(locked_obs_with_timeout->observer));
149 }
// No further rescheduling.
150 return std::nullopt;
151 },
152 wrapper{ptr});
153
154 return rpp::observer<Type, timeout_observer_strategy<std::decay_t<Observer>, TFallbackObservable, container, TScheduler>>{std::move(ptr)};
155 }
156 };
157
// Operator object for timeout without a user-provided fallback: on subscription it delegates to
// timeout_t with an error observable that emits rpp::utils::timeout_reached.
// NOTE(review): listing lines 159 (the struct header, referred to elsewhere as `timeout_with_error_t`)
// and 162 (the header of the nested traits struct) are absent from this extract.
158 template<rpp::schedulers::constraint::scheduler TScheduler>
160 {
161 template<rpp::constraint::decayed_type T>
163 {
// The operator does not change the emitted value type.
164 using result_type = T;
165
166 constexpr static bool own_current_queue = true;
167 };
168
// Reuses the alias from timeout_t. `rpp::error_observable<int>` only fills the template parameter;
// the alias in timeout_t does not depend on the fallback type.
169 template<rpp::details::observables::constraint::disposables_strategy Prev>
170 using updated_optimal_disposables_strategy = typename timeout_t<rpp::error_observable<int>, TScheduler>::template updated_optimal_disposables_strategy<Prev>;
171
172 rpp::schedulers::duration period;
173 RPP_NO_UNIQUE_ADDRESS TScheduler scheduler;
174
// Builds a timeout_t whose fallback is `rpp::source::error` carrying
// rpp::utils::timeout_reached{"Timeout reached"} and forwards the lift call to it.
175 template<rpp::constraint::decayed_type Type, rpp::details::observables::constraint::disposables_strategy DisposableStrategy, rpp::constraint::observer Observer>
176 auto lift_with_disposables_strategy(Observer&& observer) const
177 {
178 return timeout_t<rpp::error_observable<Type>, TScheduler>{period, rpp::source::error<rpp::utils::extract_observer_type_t<Observer>>(std::make_exception_ptr(rpp::utils::timeout_reached{"Timeout reached"})), scheduler}
179 .template lift_with_disposables_strategy<Type, DisposableStrategy>(std::forward<Observer>(observer));
180 }
181 };
182} // namespace rpp::operators::details
183
184namespace rpp::operators
185{
203 * @ingroup utility_operators
204 * @see https://reactivex.io/documentation/operators/timeout.html
205 */
// Factory for the fallback variant of the operator.
// period: duration used to compute each deadline.
// fallback_observable: observable subscribed to when a deadline passes; stored decayed, forwarded as given.
// scheduler: scheduler whose worker runs the deadline check; copied into the operator object.
// Returns a details::timeout_t holding these three values.
206 template<rpp::constraint::observable TFallbackObservable, rpp::schedulers::constraint::scheduler TScheduler>
207 auto timeout(rpp::schedulers::duration period, TFallbackObservable&& fallback_observable, const TScheduler& scheduler)
208 {
209 return details::timeout_t<std::decay_t<TFallbackObservable>, TScheduler>{period, std::forward<TFallbackObservable>(fallback_observable), scheduler};
210 }
211
224 * @par Example
225 * @snippet timeout.cpp default
226 *
227 * @ingroup utility_operators
228 * @see https://reactivex.io/documentation/operators/timeout.html
229 */
// Factory for the variant of the operator without a user-provided fallback.
// period: duration used to compute each deadline.
// scheduler: scheduler whose worker runs the deadline check; copied into the operator object.
// Returns a details::timeout_with_error_t, which uses an error observable carrying
// rpp::utils::timeout_reached as the fallback.
230 template<rpp::schedulers::constraint::scheduler TScheduler>
231 auto timeout(rpp::schedulers::duration period, const TScheduler& scheduler)
232 {
233 return details::timeout_with_error_t<TScheduler>{period, scheduler};
234 }
235} // namespace rpp::operators
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:31
Main RPP wrapper over disposables.
Definition disposable_wrapper.hpp:142
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition utils.hpp:260
auto error(std::exception_ptr err)
Creates rpp::observable that emits no items and terminates with an error.
Definition error.hpp:49
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
auto timeout(rpp::schedulers::duration period, TFallbackObservable &&fallback_observable, const TScheduler &scheduler)
Forwards emissions from the original observable, but subscribes to the fallback observable if no events d...
Definition timeout.hpp:203
Definition disposables_strategy.hpp:29
Definition timeout.hpp:106
Provide timepoint of next execution explicitly.
Definition fwd.hpp:70
Definition exceptions.hpp:28