28 emission(TT&& item, schedulers::time_point time)
29 : value{std::forward<TT>(item)}
34 std::variant<T, std::exception_ptr, rpp::utils::none> value{};
35 rpp::schedulers::time_point time_point{};
38 template<rpp::constra
int::observer Observer,
typename Worker, rpp::details::disposables::constra
int::disposables_container Container>
41 using T = rpp::utils::extract_observer_type_t<Observer>;
43 delay_disposable(Observer&& in_observer, Worker&& in_worker, rpp::schedulers::duration delay)
44 : observer(std::move(in_observer))
45 , worker{std::move(in_worker)}
50 RPP_NO_UNIQUE_ADDRESS Observer observer;
51 RPP_NO_UNIQUE_ADDRESS Worker worker;
52 rpp::schedulers::duration delay;
55 std::queue<emission<T>> queue;
59 template<rpp::constra
int::observer Observer,
typename Worker, rpp::details::disposables::constra
int::disposables_container Container>
69 template<rpp::constra
int::observer Observer,
typename Worker, rpp::details::disposables::constra
int::disposables_container Container,
bool ClearOnError>
72 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Boolean;
73 std::shared_ptr<delay_disposable<Observer, Worker, Container>> disposable{};
80 bool is_disposed()
const
82 return disposable->is_disposed();
86 void on_next(T&& v)
const
88 emplace(std::forward<T>(v));
91 void on_error(
const std::exception_ptr& err)
const noexcept
97 void on_completed()
const noexcept
104 template<
typename TT>
105 void emplace(TT&& value)
const
107 if (
const auto tp = emplace_safe(std::forward<TT>(value)))
109 disposable->worker.schedule(
116 template<
typename TT>
117 std::optional<rpp::schedulers::time_point> emplace_safe(TT&& item)
const
119 std::lock_guard lock{disposable->mutex};
122 disposable->queue = std::queue<emission<rpp::utils::extract_observer_type_t<Observer>>>{};
123 disposable->observer.on_error(std::forward<TT>(item));
128 const auto tp = disposable->worker.now() + disposable->delay;
129 disposable->queue.emplace(std::forward<TT>(item), tp);
130 if (!disposable->is_active)
132 disposable->is_active =
true;
143 std::unique_lock lock{disposable->mutex};
144 if (disposable->queue.empty())
146 disposable->is_active =
false;
150 auto& top = disposable->queue.front();
151 if (top.time_point > disposable->worker.now())
152 return schedulers::optional_delay_to{top.time_point};
154 auto item = std::move(top.value);
155 disposable->queue.pop();
158 std::visit(
rpp::utils::overloaded{[&](rpp::utils::extract_observer_type_t<Observer>&& v) { disposable->observer.on_next(std::move(v)); },
159 [&](
const std::exception_ptr& err) { disposable->observer.on_error(err); },
161 disposable->observer.on_completed();
171 template<rpp::constra
int::decayed_type T>
174 using result_type = T;
177 template<rpp::details::observables::constra
int::disposables_strategy Prev>
180 rpp::schedulers::duration duration;
181 RPP_NO_UNIQUE_ADDRESS Scheduler scheduler;
183 template<rpp::constra
int::decayed_type Type, rpp::details::observables::constra
int::disposables_strategy DisposableStrategy, rpp::constra
int::observer Observer>
184 auto lift_with_disposables_strategy(Observer&&
observer)
const
186 using worker_t = rpp::schedulers::utils::get_worker_t<Scheduler>;
187 using container =
typename DisposableStrategy::disposables_container;
190 auto ptr = disposable.lock();
191 ptr->observer.set_upstream(disposable.as_weak());
auto delay(rpp::schedulers::duration delay_duration, Scheduler &&scheduler)
Shift the emissions from an Observable forward in time by a particular amount.
Definition delay.hpp:219