ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
delay.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/disposables/composite_disposable.hpp>
17#include <rpp/operators/details/strategy.hpp>
18
19#include <mutex>
20#include <queue>
21
22namespace rpp::operators::details
23{
24 template<typename T>
25 struct emission
26 {
27 template<typename TT>
28 emission(TT&& item, schedulers::time_point time)
29 : value{std::forward<TT>(item)}
30 , time_point{time}
31 {
32 }
33
34 std::variant<T, std::exception_ptr, rpp::utils::none> value{};
35 rpp::schedulers::time_point time_point{};
36 };
37
38 template<rpp::constraint::observer Observer, typename Worker, rpp::details::disposables::constraint::disposables_container Container>
39 struct delay_disposable final : public rpp::composite_disposable_impl<Container>
40 {
41 using T = rpp::utils::extract_observer_type_t<Observer>;
42
43 delay_disposable(Observer&& in_observer, Worker&& in_worker, rpp::schedulers::duration delay)
44 : observer(std::move(in_observer))
45 , worker{std::move(in_worker)}
46 , delay{delay}
47 {
48 }
49
50 RPP_NO_UNIQUE_ADDRESS Observer observer;
51 RPP_NO_UNIQUE_ADDRESS Worker worker;
52 rpp::schedulers::duration delay;
53
54 std::mutex mutex{};
55 std::queue<emission<T>> queue;
56 bool is_active{};
57 };
58
// Thin handle around the shared delay_disposable state. It exposes only
// is_disposed() and on_error(), both forwarded to the shared state.
// NOTE(review): the declaration line between the template header and the
// opening brace (listing line 60) is absent from this text. It is presumably
// `struct delay_disposable_wrapper` - that name is used with these same three
// template arguments, and with a `.disposable` member, by the lambda passed
// to worker.schedule(). Confirm against the real header.
59 template<rpp::constraint::observer Observer, typename Worker, rpp::details::disposables::constraint::disposables_container Container>
61 {
// Shared state of the delayed subscription.
62 std::shared_ptr<delay_disposable<Observer, Worker, Container>> disposable{};
63
// Reports the disposed state of the shared disposable.
64 bool is_disposed() const { return disposable->is_disposed(); }
65
// Forwards the error straight to the downstream observer stored in the shared state.
66 void on_error(const std::exception_ptr& err) const { disposable->observer.on_error(err); }
67 };
68
// Observer strategy that queues every incoming notification together with a
// due time and delivers it later through the worker stored in the shared
// delay_disposable.
// NOTE(review): three listing lines are absent from this text:
//   - line 70, the declaration itself (presumably `struct delay_observer_strategy`;
//     that name is instantiated with these four template arguments in
//     delay_t::lift_with_disposables_strategy),
//   - line 112, the remaining argument(s) and closing `);` of worker.schedule(...),
//   - line 120, the condition that opens the first branch of emplace_safe().
// Confirm all three against the real header before relying on this block.
69 template<rpp::constraint::observer Observer, typename Worker, rpp::details::disposables::constraint::disposables_container Container, bool ClearOnError>
71 {
72 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Boolean;
// Shared state: downstream observer, worker, delay, mutex, queue and is_active flag.
73 std::shared_ptr<delay_disposable<Observer, Worker, Container>> disposable{};
74
// Registers the upstream disposable inside the shared composite disposable.
75 void set_upstream(const rpp::disposable_wrapper& d) const
76 {
77 disposable->add(d);
78 }
79
// Reports the disposed state of the shared disposable.
80 bool is_disposed() const
81 {
82 return disposable->is_disposed();
83 }
84
// Queues an item for delayed delivery.
85 template<typename T>
86 void on_next(T&& v) const
87 {
88 emplace(std::forward<T>(v));
89 }
90
// Hands the error to emplace(), then calls clear() on the shared disposable.
// NOTE(review): clear() presumably drops the registered upstream disposables - confirm.
// NOTE(review): declared noexcept although emplace() is not; verify nothing on that path can throw.
91 void on_error(const std::exception_ptr& err) const noexcept
92 {
93 emplace(err);
94 disposable->clear();
95 }
96
// Queues a completion marker (rpp::utils::none), then calls clear() on the shared disposable.
97 void on_completed() const noexcept
98 {
99 emplace(rpp::utils::none{});
100 disposable->clear();
101 }
102
103 private:
// Stores the notification via emplace_safe(); when that returns a time point
// (no drain was active), schedules drain_queue on the worker for that time point.
104 template<typename TT>
105 void emplace(TT&& value) const
106 {
107 if (const auto tp = emplace_safe(std::forward<TT>(value)))
108 {
109 disposable->worker.schedule(
110 tp.value(),
111 [](const delay_disposable_wrapper<Observer, Worker, Container>& wrapper) { return drain_queue(wrapper.disposable); },
// NOTE(review): the rest of this call (listing line 112) is missing from this text.
113 }
114 }
115
// Runs under the shared mutex. Returns the due time only when the caller must
// schedule a drain; returns std::nullopt otherwise.
116 template<typename TT>
117 std::optional<rpp::schedulers::time_point> emplace_safe(TT&& item) const
118 {
119 std::lock_guard lock{disposable->mutex};
// NOTE(review): the condition guarding the next branch (listing line 120) is
// missing; presumably it involves ClearOnError and whether TT is an error - confirm.
121 {
// Visible branch body: replace the queue with an empty one, pass the item to the
// downstream observer's on_error() right away (while the lock is held), and
// request no scheduling.
122 disposable->queue = std::queue<emission<rpp::utils::extract_observer_type_t<Observer>>>{};
123 disposable->observer.on_error(std::forward<TT>(item));
124 return std::nullopt;
125 }
126 else
127 {
// Due time is the worker's current time plus the configured delay.
128 const auto tp = disposable->worker.now() + disposable->delay;
129 disposable->queue.emplace(std::forward<TT>(item), tp);
// Only the caller that flips is_active from false to true gets a time point back.
130 if (!disposable->is_active)
131 {
132 disposable->is_active = true;
133 return tp;
134 }
135 return std::nullopt;
136 }
137 }
138
// Delivers queued notifications whose due time has been reached.
// Returns std::nullopt once the queue is empty (after resetting is_active), or
// the due time of the front element when it is still in the future.
// NOTE(review): a returned optional_delay_to presumably asks the scheduler to
// invoke this again at that time - confirm against the scheduler contract.
139 static schedulers::optional_delay_to drain_queue(const std::shared_ptr<delay_disposable<Observer, Worker, Container>>& disposable)
140 {
141 while (true)
142 {
143 std::unique_lock lock{disposable->mutex};
144 if (disposable->queue.empty())
145 {
146 disposable->is_active = false;
147 return std::nullopt;
148 }
149
150 auto& top = disposable->queue.front();
151 if (top.time_point > disposable->worker.now())
152 return schedulers::optional_delay_to{top.time_point};
153
// Move the payload out and pop while still locked, then release the lock so the
// downstream observer is invoked without the mutex held.
154 auto item = std::move(top.value);
155 disposable->queue.pop();
156 lock.unlock();
157
// Dispatch on the stored alternative: item -> on_next, exception_ptr -> on_error,
// none -> on_completed.
158 std::visit(rpp::utils::overloaded{[&](rpp::utils::extract_observer_type_t<Observer>&& v) { disposable->observer.on_next(std::move(v)); },
159 [&](const std::exception_ptr& err) { disposable->observer.on_error(err); },
160 [&](rpp::utils::none) {
161 disposable->observer.on_completed();
162 }},
163 std::move(item));
164 }
165 }
166 };
167
// Operator object produced by rpp::operators::delay(): stores the delay duration
// and the scheduler, and builds the delaying observer around a downstream observer.
168 template<rpp::schedulers::constraint::scheduler Scheduler, bool ClearOnError>
169 struct delay_t
170 {
// NOTE(review): the declaration line of this nested template (listing line 172)
// is missing from this text; only its body is visible. The body states that the
// result type equals the input type T.
171 template<rpp::constraint::decayed_type T>
173 {
174 using result_type = T;
175 };
176
// The disposables strategy is a fixed_disposables_strategy<1> regardless of Prev.
177 template<rpp::details::observables::constraint::disposables_strategy Prev>
178 using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>;
179
// Delay handed to every delay_disposable created by this operator.
180 rpp::schedulers::duration duration;
// Scheduler from which a worker is created per lifted observer.
181 RPP_NO_UNIQUE_ADDRESS Scheduler scheduler;
182
// Wraps `observer` into an observer that delays its notifications:
//  1. creates the shared delay_disposable from the observer, a fresh worker
//     (scheduler.create_worker()) and the stored duration;
//  2. registers that disposable (as a weak wrapper) as the upstream of the
//     downstream observer;
//  3. returns an rpp::observer using delay_observer_strategy that holds the
//     shared state.
183 template<rpp::constraint::decayed_type Type, rpp::details::observables::constraint::disposables_strategy DisposableStrategy, rpp::constraint::observer Observer>
184 auto lift_with_disposables_strategy(Observer&& observer) const
185 {
186 using worker_t = rpp::schedulers::utils::get_worker_t<Scheduler>;
187 using container = typename DisposableStrategy::disposables_container;
188
189 const auto disposable = disposable_wrapper_impl<delay_disposable<std::decay_t<Observer>, worker_t, container>>::make(std::forward<Observer>(observer), scheduler.create_worker(), duration);
// NOTE(review): lock() presumably yields a shared_ptr to the state (it is moved
// into the strategy's shared_ptr member below) - confirm.
190 auto ptr = disposable.lock();
191 ptr->observer.set_upstream(disposable.as_weak());
192 return rpp::observer<Type, delay_observer_strategy<std::decay_t<Observer>, worker_t, container, ClearOnError>>{std::move(ptr)};
193 }
194 };
195} // namespace rpp::operators::details
196
197namespace rpp::operators
198{
219 * @ingroup utility_operators
220 * @see https://reactivex.io/documentation/operators/delay.html
221 */
222 template<rpp::schedulers::constraint::scheduler Scheduler>
223 auto delay(rpp::schedulers::duration delay_duration, Scheduler&& scheduler)
224 {
225 return details::delay_t<std::decay_t<Scheduler>, false>{delay_duration, std::forward<Scheduler>(scheduler)};
226 }
227} // namespace rpp::operators
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:31
Main RPP wrapper over disposables.
Definition disposable_wrapper.hpp:142
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition constraints.hpp:19
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
auto delay(rpp::schedulers::duration delay_duration, Scheduler &&scheduler)
Shift the emissions from an Observable forward in time by a particular amount.
Definition delay.hpp:219
Definition disposables_strategy.hpp:29
Definition delay.hpp:170
Definition utils.hpp:25
Definition functors.hpp:20