32 class worker_strategy;
34 class state_t final :
public rpp::details::base_disposable
37 ~state_t()
noexcept override {
dispose(); }
39 template<
typename... Args>
40 void emplace_and_notify(time_point timepoint, Args&&... args)
46 std::lock_guard lock{m_mutex};
47 m_queue.emplace(timepoint, std::forward<Args>(args)...);
52 std::shared_ptr<details::schedulable_base> pop(
bool wait)
54 while (!is_disposed())
56 std::unique_lock lock{m_mutex};
57 m_cv.wait(lock, [&] {
return !wait || is_disposed() || !m_queue.is_empty(); });
62 const auto now = worker_strategy::now();
63 if (is_any_ready_schedulable_unsafe(now))
69 m_cv.wait_for(lock, m_queue.top()->get_timepoint() - now, [&]() { return is_disposed() || !m_queue.is_empty() || m_queue.top()->get_timepoint() <= worker_strategy::now(); });
74 bool is_any_ready_schedulable()
76 std::lock_guard lock{m_mutex};
77 return is_any_ready_schedulable_unsafe();
82 std::lock_guard lock{m_mutex};
83 return m_queue.is_empty();
87 bool is_any_ready_schedulable_unsafe(time_point now = worker_strategy::now())
const
89 return !m_queue.is_empty() && (m_queue.top()->is_disposed() || m_queue.top()->get_timepoint() <= now);
92 void base_dispose_impl(interface_disposable::Mode)
noexcept override
95 std::lock_guard lock{m_mutex};
102 std::mutex m_mutex{};
105 std::condition_variable m_cv{};
108 class worker_strategy
111 worker_strategy(
const std::weak_ptr<state_t>& state)
117 void defer_to(time_point tp, Fn&& fn, Handler&& handler, Args&&... args)
const
119 if (
const auto shared = m_state.lock())
120 shared->emplace_and_notify(tp, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
123 static rpp::schedulers::time_point now() {
return details::now(); }
126 std::weak_ptr<state_t> m_state;
130 bool is_empty()
const
132 return m_state->is_empty();
135 bool is_any_ready_schedulable()
const
137 return m_state->is_any_ready_schedulable();
140 void dispatch_if_ready()
const
142 dispatch_impl(
false);
145 void dispatch()
const
156 void dispatch_impl(
bool wait)
const
158 if (
auto top = m_state->pop(wait))
160 if (top->is_disposed())
163 if (
const auto timepoint = (*top)())
164 m_state->emplace_and_notify(timepoint.value(), std::move(top));
169 std::shared_ptr<state_t> m_state = std::make_shared<state_t>();