// Scheduling state shared by the workers of this scheduler: it owns the asio
// strand (m_strand) and submits callables through it. It derives from
// std::enable_shared_from_this so that the asynchronous callbacks created in
// defer()/defer_with_time() can capture a shared_ptr to this object.
34        class state_t : 
public std::enable_shared_from_this<state_t>
 
                 // Forward declaration; the class itself is defined further down inside state_t.
   36            class current_thread_queue_guard;
 
                 // Builds the strand from the io_context that the given executor belongs to.
   39            state_t(
const asio::io_context::executor_type& executor)
 
   40                : m_strand{executor.context()}
 
                 // Submits `fn` for execution via asio::post, bound to m_strand.
                 //
                 // fn      - invoked as fn(handler, args...); when its result is truthy,
                 //           result->value is used as the time of the next invocation.
                 // handler - queried with is_disposed() both before submitting and again
                 //           right before `fn` is invoked.
                 // args    - forwarded into the callback and passed to `fn` on every call.
   44            template<
typename Handler, 
typename... Args, 
typename Fn>
 
   45            void defer(Fn&& fn, Handler&& handler, Args&&... args)
 const 
                     // NOTE(review): the statement guarded by this check is not visible in
                     // this excerpt - presumably an early return; confirm against the full file.
   47                if (handler.is_disposed())
 
                     // The callback owns `self` (a shared_ptr to this state) together with
                     // fn/handler/args, and is bound to m_strand through asio::bind_executor.
   50                asio::post(asio::bind_executor(m_strand, [self = this->shared_from_this(), fn = std::forward<Fn>(fn), handler = std::forward<Handler>(handler), ... args = std::forward<Args>(args)]() 
mutable {
 
                         // Re-check: the handler may have been disposed between submit and run.
   51                    if (handler.is_disposed())
 
                         // Guard object is alive for the duration of the fn call below.
   54                    current_thread_queue_guard guard{*self};
 
                         // A truthy result requests another run: hand everything over to
                         // defer_with_time using the returned value as the time.
   55                    if (
const auto new_duration = fn(handler, args...))
 
   56                        self->defer_with_time(new_duration->value, std::move(fn), std::move(handler), std::move(args)...);
 
                 // Same contract as defer(), but the invocation is triggered by a timer.
                 //
                 // time - passed as the expiry argument to the timer constructor; callers in
                 //        this file pass both durations and time points.
   60            template<
typename Time, 
typename Handler, 
typename... Args, 
typename Fn>
 
   61            void defer_with_time(Time time, Fn&& fn, Handler&& handler, Args&&... args)
 const 
                     // NOTE(review): guarded statement not visible here - presumably an early return.
   63                if (handler.is_disposed())
 
                     // The timer is held by shared_ptr and captured by the wait callback
                     // below, so the callback keeps a reference to it.
   66                auto timer = std::make_shared<asio::basic_waitable_timer<rpp::schedulers::clock_type>>(m_strand.context(), time);
 
                     // The completion callback is bound to m_strand, like the one in defer().
   67                timer->async_wait(asio::bind_executor(m_strand, [self = this->shared_from_this(), timer, fn = std::forward<Fn>(fn), handler = std::forward<Handler>(handler), ... args = std::forward<Args>(args)](
const asio::error_code& ec) 
mutable {
 
                         // Checked before calling fn: wait error or handler already disposed.
   68                    if (ec || handler.is_disposed())
 
   71                    current_thread_queue_guard guard{*self};
 
                         // Reschedule with the newly returned time while fn keeps returning
                         // a truthy result.
   72                    if (
const auto new_duration = fn(handler, args...))
 
   73                        self->defer_with_time(new_duration->value, std::move(fn), std::move(handler), std::move(args)...);
 
                 // Guard constructed around every `fn` invocation above. If
                 // rpp::schedulers::current_thread::get_queue() is empty (null) at
                 // construction, the guard points it at its own m_queue; the loop further
                 // down forwards the queued items to state_t::defer_with_time and finally
                 // resets get_queue() to nullptr.
   79            class current_thread_queue_guard
 
   82                current_thread_queue_guard(
const state_t& state)
 
                         // True only when no current-thread queue was installed yet.
   83                    : m_process_on_destruction{!rpp::schedulers::current_thread::get_queue()}
 
   86                    if (m_process_on_destruction)
 
   87                        rpp::schedulers::current_thread::get_queue() = &m_queue;
 
   89                ~current_thread_queue_guard()
 
                         // NOTE(review): the guarded statement is not visible in this excerpt -
                         // presumably it runs the queue-draining loop shown below; confirm.
   91                    if (m_process_on_destruction)
 
                     // Copy and move construction are disabled.
   94                current_thread_queue_guard(
const current_thread_queue_guard&) = 
delete;
 
   95                current_thread_queue_guard(current_thread_queue_guard&&)      = 
delete;
 
                         // NOTE(review): the header of the type that owns the next three members
                         // is not visible in this excerpt. The members forward to the wrapped
                         // schedulable, giving it the is_disposed()/on_error() interface.
  100                    bool is_disposed() 
const noexcept 
  102                        return m_schedulable->is_disposed();
 
  105                    void on_error(
const std::exception_ptr& ep)
 const 
  107                        m_schedulable->on_error(ep);
 
                         // Schedulable that the two calls above are forwarded to.
  110                    std::shared_ptr<rpp::schedulers::details::schedulable_base> m_schedulable;
 
                         // NOTE(review): the header of the function containing this loop is not
                         // visible in this excerpt. The loop pops items from m_queue until it is empty.
  116                    while (!m_queue.is_empty())
 
  118                        const auto top = m_queue.pop();
 
                             // NOTE(review): guarded statement not visible - presumably skips this item.
  119                        if (top->is_disposed())
 
                             // Forward the item to state_t at the item's own time point.
  122                        m_state.defer_with_time(
 
  123                            top->get_timepoint(),
 
                                 // The callable ignores its argument and works on the captured item.
  124                            [top](
const auto&) -> rpp::schedulers::optional_delay_to {
 
                                     // When an advanced call is produced: handle it, store the resulting
                                     // time point on the item and return it as the next delay_to.
  125                                if (const auto advanced_call = top->make_advanced_call())
 
  127                                    const auto tp = top->handle_advanced_call(*advanced_call);
 
  128                                    top->set_timepoint(tp);
 
  129                                    return rpp::schedulers::delay_to{tp};
 
                         // After the loop the current-thread queue pointer is cleared again.
  135                    rpp::schedulers::current_thread::get_queue() = 
nullptr;
 
                     // Set in the constructor: true when this guard installed its own queue.
  140                bool                                                                                           m_process_on_destruction;
 
                     // State that queued items are forwarded to; held by reference.
  141                const state_t&                                                                                 m_state;
 
                 // Strand that every posted callback and timer callback is bound to.
  145            asio::io_context::strand m_strand;
 
  // Worker strategy of this scheduler: holds a shared state_t and forwards
  // every scheduling request to it.
  148        class worker_strategy
 
                 // Creates the shared state_t for the given executor.
  151            explicit worker_strategy(
const asio::io_context::executor_type& executor)
 
  152                : m_state{std::make_shared<state_t>(executor)}
 
                 // Schedules `fn` after `duration`. A zero duration goes through
                 // state_t::defer; the defer_with_time call below receives the duration.
                 // NOTE(review): the template header and the `else` between the two calls
                 // are not visible in this excerpt.
  157            void defer_for(rpp::schedulers::duration duration, Fn&& fn, Handler&& handler, Args&&... args)
 const 
  159                if (duration == rpp::schedulers::duration::zero())
 
  160                    m_state->defer(std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
 
  162                    m_state->defer_with_time(duration, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
 
                 // Schedules `fn` at the time point `tp` via state_t::defer_with_time.
  166            void defer_to(rpp::schedulers::time_point tp, Fn&& fn, Handler&& handler, Args&&... args)
 const 
  168                m_state->defer_with_time(tp, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
 
                 // Current time according to rpp::schedulers::clock_type.
  171            static rpp::schedulers::time_point now() { 
return rpp::schedulers::clock_type::now(); }
 
                 // Shared scheduling state; all defer_* calls are forwarded to it.
  174            std::shared_ptr<state_t> m_state;
 
  // Constructs the scheduler from an io_context executor; the executor is
  // taken by value and moved into m_executor.
  178        explicit strand(asio::io_context::executor_type executor)
 
  179            : m_executor{std::move(executor)}
 
  // Returns a new rpp::schedulers::worker<worker_strategy> constructed from
  // m_executor.
  183        auto create_worker()
 const 
  185            return rpp::schedulers::worker<worker_strategy>{m_executor};
 
  // Executor given to the constructor; passed to every worker made by create_worker().
  188        asio::io_context::executor_type m_executor;