// Scheduler state owned through std::shared_ptr. Deriving from
// std::enable_shared_from_this lets the member functions obtain an owning
// pointer to this object via shared_from_this() for capture in callbacks.
34 class state_t :
public std::enable_shared_from_this<state_t>
// Forward declaration of the nested guard type; instances of it are created
// inside the callbacks built by defer() and defer_with_time().
36 class current_thread_queue_guard;
// Constructs the state from an io_context executor.
// @param executor  executor whose context() is used to build m_strand.
39 state_t(
const asio::io_context::executor_type& executor)
// m_strand is initialised from executor.context().
40 : m_strand{executor.context()}
// Posts `fn` for execution through m_strand, without a timer.
//
// Template parameters:
//   Handler  type providing is_disposed(); consulted before posting and again
//            inside the posted callback.
//   Args     extra values stored in the closure and passed to fn on each call.
//   Fn       callable invoked as fn(handler, args...). Its result is tested in
//            a boolean context; when truthy, result->value is used as the time
//            argument for re-scheduling via defer_with_time().
44 template<
typename Handler,
typename... Args,
typename Fn>
45 void defer(Fn&& fn, Handler&& handler, Args&&... args)
const
// Disposed check before any work is queued.
// NOTE(review): the statement guarded by this `if` is not visible in this
// excerpt - presumably an early return; confirm.
47 if (handler.is_disposed())
// The closure captures `self` (a shared_ptr obtained from shared_from_this())
// together with fn, handler and args, and is bound to m_strand via
// asio::bind_executor before being handed to asio::post.
50 asio::post(asio::bind_executor(m_strand, [self = this->shared_from_this(), fn = std::forward<Fn>(fn), handler = std::forward<Handler>(handler), ... args = std::forward<Args>(args)]()
// `mutable` because the captured fn/handler/args are moved from below.
mutable {
// Second disposed check, performed when the posted callback actually runs.
// NOTE(review): guarded statement not visible here - presumably `return;`.
51 if (handler.is_disposed())
// The guard is constructed before fn is invoked and lives until the end of
// the callback scope.
54 current_thread_queue_guard guard{*self};
55 if (
const auto new_duration = fn(handler, args...))
// fn asked to run again: hand everything over to the timer-based path.
56 self->defer_with_time(new_duration->value, std::move(fn), std::move(handler), std::move(args)...);
// Schedules `fn` to run through m_strand once a timer constructed from `time`
// completes its wait.
//
// Template parameters:
//   Time     value passed as the second constructor argument of
//            asio::basic_waitable_timer<rpp::schedulers::clock_type>; callers
//            in this file pass both durations and time points.
//   Handler  type providing is_disposed().
//   Args     extra values stored in the closure and passed to fn on each call.
//   Fn       callable invoked as fn(handler, args...); a truthy result causes
//            re-scheduling with result->value as the new time.
60 template<
typename Time,
typename Handler,
typename... Args,
typename Fn>
61 void defer_with_time(Time time, Fn&& fn, Handler&& handler, Args&&... args)
const
// Disposed check before a timer is allocated.
// NOTE(review): the guarded statement is not visible in this excerpt -
// presumably an early return; confirm.
63 if (handler.is_disposed())
// The timer is heap-allocated and held by shared_ptr so that a copy can be
// captured by the completion handler below.
66 auto timer = std::make_shared<asio::basic_waitable_timer<rpp::schedulers::clock_type>>(m_strand.context(), time);
// The completion handler is bound to m_strand and captures `self` (owning
// pointer to this state), the timer, and the forwarded fn/handler/args.
67 timer->async_wait(asio::bind_executor(m_strand, [self = this->shared_from_this(), timer, fn = std::forward<Fn>(fn), handler = std::forward<Handler>(handler), ... args = std::forward<Args>(args)](
const asio::error_code& ec)
// `mutable` because the captured fn/handler/args are moved from below.
mutable {
// Checks for a wait error or a handler disposed in the meantime.
// NOTE(review): guarded statement not visible here - presumably `return;`.
68 if (ec || handler.is_disposed())
// The guard is constructed before fn is invoked and lives until the end of
// the callback scope.
71 current_thread_queue_guard guard{*self};
72 if (
const auto new_duration = fn(handler, args...))
// fn asked to run again: recurse with the new time and the same payload.
73 self->defer_with_time(new_duration->value, std::move(fn), std::move(handler), std::move(args)...);
// Scope guard around rpp::schedulers::current_thread::get_queue().
79 class current_thread_queue_guard
// @param state  the owning state_t.
// NOTE(review): the initialiser that stores `state` is not visible in this
// excerpt - presumably it initialises m_state; confirm.
82 current_thread_queue_guard(
const state_t& state)
// The flag is true exactly when get_queue() evaluates to false at
// construction time.
83 : m_process_on_destruction{!rpp::schedulers::current_thread::get_queue()}
// Only in that case is get_queue() pointed at this guard's own m_queue.
86 if (m_process_on_destruction)
87 rpp::schedulers::current_thread::get_queue() = &m_queue;
// Destructor: acts only when the constructor set m_process_on_destruction.
// NOTE(review): the statement guarded by the `if` is not visible in this
// excerpt - presumably it processes the entries collected in m_queue; confirm.
89 ~current_thread_queue_guard()
91 if (m_process_on_destruction)
// The guard is neither copy-constructible nor move-constructible.
94 current_thread_queue_guard(
const current_thread_queue_guard&) =
delete;
95 current_thread_queue_guard(current_thread_queue_guard&&) =
delete;
// Reports whether the wrapped schedulable is disposed.
// @return the result of m_schedulable->is_disposed().
100 bool is_disposed()
const noexcept
102 return m_schedulable->is_disposed();
// Forwards an error to the wrapped schedulable.
// @param ep  exception pointer passed unchanged to m_schedulable->on_error().
105 void on_error(
const std::exception_ptr& ep)
const
107 m_schedulable->on_error(ep);
// Schedulable to which is_disposed() and on_error() delegate.
110 std::shared_ptr<rpp::schedulers::details::schedulable_base> m_schedulable;
// Drains m_queue, handing each entry to m_state.defer_with_time().
// NOTE(review): the header of the enclosing function is not visible in this
// excerpt.
116 while (!m_queue.is_empty())
118 const auto top = m_queue.pop();
// NOTE(review): the statement guarded by this check is not visible here -
// presumably `continue;` so that disposed entries are skipped; confirm.
119 if (top->is_disposed())
// Schedule the entry at its own time point. The callable captures `top` by
// copy and ignores its single argument.
// NOTE(review): the remaining arguments of this call (after the lambda) are
// not visible in this excerpt.
122 m_state.defer_with_time(
123 top->get_timepoint(),
124 [top](
const auto&) -> rpp::schedulers::optional_delay_to {
// When make_advanced_call() yields a value, it is passed to
// handle_advanced_call(); the returned time point is stored back on the entry
// and returned wrapped in delay_to.
// NOTE(review): the return for the other path is not visible in this excerpt.
125 if (const auto advanced_call = top->make_advanced_call())
127 const auto tp = top->handle_advanced_call(*advanced_call);
128 top->set_timepoint(tp);
129 return rpp::schedulers::delay_to{tp};
// Clears the current-thread queue pointer.
// NOTE(review): brace structure is not visible; by position this appears to
// run after the loop has finished - confirm.
135 rpp::schedulers::current_thread::get_queue() =
nullptr;
// Set in the constructor: true when no current-thread queue was present at
// construction time, i.e. when this guard pointed get_queue() at m_queue.
140 bool m_process_on_destruction;
// State used to re-schedule queued entries through defer_with_time().
141 const state_t& m_state;
// Strand that every posted callback and timer completion handler is bound to
// via asio::bind_executor; also supplies the context for new timers.
145 asio::io_context::strand m_strand;
// Strategy object that forwards defer_for()/defer_to() requests to a shared
// state_t.
148 class worker_strategy
// @param executor  forwarded to the state_t constructor; the state is created
//                  with std::make_shared and held in m_state.
151 explicit worker_strategy(
const asio::io_context::executor_type& executor)
152 : m_state{std::make_shared<state_t>(executor)}
// Schedules fn after a relative delay.
// NOTE(review): the template header introducing Fn/Handler/Args is not
// visible in this excerpt.
// @param duration  relative delay; a zero duration selects the timer-free
//                  defer() path.
// @param fn, handler, args  forwarded unchanged to the state.
157 void defer_for(rpp::schedulers::duration duration, Fn&& fn, Handler&& handler, Args&&... args)
const
159 if (duration == rpp::schedulers::duration::zero())
160 m_state->defer(std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
// NOTE(review): an `else` separating the two calls is not visible in this
// excerpt - presumably present, since both calls forward the same arguments;
// confirm.
162 m_state->defer_with_time(duration, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
// Schedules fn at an absolute time point by forwarding everything to
// state_t::defer_with_time().
// NOTE(review): the template header introducing Fn/Handler/Args is not
// visible in this excerpt.
// @param tp  time point passed as the timer's time argument.
166 void defer_to(rpp::schedulers::time_point tp, Fn&& fn, Handler&& handler, Args&&... args)
const
168 m_state->defer_with_time(tp, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
// Returns the current time as reported by rpp::schedulers::clock_type::now().
171 static rpp::schedulers::time_point now() {
return rpp::schedulers::clock_type::now(); }
// Shared state created in the constructor; target of defer_for()/defer_to().
174 std::shared_ptr<state_t> m_state;
// @param executor  io_context executor, taken by value and moved into
//                  m_executor.
178 explicit strand(asio::io_context::executor_type executor)
179 : m_executor{std::move(executor)}
// NOTE(review): the body is not visible in this excerpt - presumably it
// builds a worker around worker_strategy using m_executor; confirm against
// the full file.
183 auto create_worker()
const
// Executor supplied at construction.
188 asio::io_context::executor_type m_executor;