14#include <rpp/defs.hpp>
15#include <rpp/operators/lift.hpp>
16#include <rpp/operators/details/subscriber_with_state.hpp>
17#include <rpp/operators/fwd/delay.hpp>
18#include <rpp/subscribers/constraints.hpp>
19#include <rpp/utils/overloaded.hpp>
23IMPLEMENTATION_FILE(delay_tag);
29template<
typename T,
typename Subscriber,
typename Worker>
30class queue_based_worker final :
public std::enable_shared_from_this<queue_based_worker<T, Subscriber, Worker>>
33 queue_based_worker(schedulers::duration delay, Worker&& worker,
const Subscriber& subscriber)
35 , m_worker{std::move(worker)}
36 , m_subscriber{subscriber} {}
38 queue_based_worker(schedulers::duration delay, Worker&& worker, Subscriber&& subscriber)
40 , m_worker{std::move(worker)}
41 , m_subscriber{std::move(subscriber)} {}
47 state->emplace(std::forward<
decltype(value)>(value));
69 void emplace(TT&& item)
71 if (
const auto timepoint = emplace_safe(std::forward<TT>(item)))
73 m_worker.schedule(timepoint.value(),
74 [state = this->shared_from_this()]()-> schedulers::optional_duration
76 return state->drain_queue();
82 std::optional<schedulers::time_point> emplace_safe(TT&& item)
84 std::lock_guard lock{m_mutex};
85 const auto delay = std::is_same_v<std::exception_ptr, std::decay_t<TT>> ? schedulers::duration{0} : m_delay;
86 m_queue.emplace(++m_current_id, m_worker.now()+delay, std::forward<TT>(item));
87 if (!m_active && m_queue.size() == 1)
90 return m_queue.top().time;
95 schedulers::optional_duration drain_queue()
99 std::unique_lock lock{m_mutex};
106 auto& top = m_queue.top();
107 const auto now = m_worker.now();
109 return top.time - now;
111 auto item = std::move(top.item);
115 std::visit(utils::overloaded
117 [&](T&& v) { m_subscriber.on_next(std::move(v)); },
118 [&](
const std::exception_ptr& err) { m_subscriber.on_error(err); },
119 [&](completion) { m_subscriber.on_completed(); }
128 template<
typename TT>
129 emission(
size_t id, schedulers::time_point time, TT&& item)
131 , time{std::move(time)}
132 , item{std::forward<TT>(item)} {}
135 schedulers::time_point time{};
136 std::variant<T, std::exception_ptr, completion> item{};
138 bool operator<(
const emission& other)
const {
return std::tie(time,
id) >= std::tie(other.time, other.id); }
141 schedulers::duration m_delay;
143 Subscriber m_subscriber;
144 std::mutex m_mutex{};
145 size_t m_current_id{};
146 std::priority_queue<emission> m_queue{};
151template<constra
int::decayed_type Type, schedulers::constra
int::scheduler TScheduler>
154 RPP_NO_UNIQUE_ADDRESS TScheduler scheduler;
155 schedulers::duration delay;
157 template<constra
int::subscriber_of_type<Type> TSub>
158 auto operator()(TSub&& subscriber)
const
160 auto worker = scheduler.create_worker(subscriber.get_subscription());
161 auto subscription = subscriber.get_subscription().make_child();
164 auto state = std::make_shared<state_t>(delay, std::move(worker), std::forward<TSub>(subscriber));
166 return create_subscriber_with_state<Type>(std::move(subscription),
167 typename state_t::on_next{},
168 typename state_t::on_error{},
169 typename state_t::on_completed{},
Definition: delay.hpp:153