12#include <rpp/schedulers/fwd.hpp>
13#include <rpp/schedulers/constraints.hpp>
14#include <rpp/subscriptions/subscription_guard.hpp>
17#include <condition_variable>
21namespace rpp::schedulers::details
23template<
typename SchedulableFn>
// Constructs a schedulable entry: stores the target time point in m_time_point
// and moves the callable `fn` into m_function.
// NOTE(review): the `id` parameter is not used in the initializer list visible
// here, yet the ordering comparison reads m_id; the listing jumps from line 28
// to line 30, so an `m_id` initializer is presumably missing from this view —
// confirm against the real header.
27 schedulable(time_point time_point,
size_t id, SchedulableFn&& fn)
28 : m_time_point{time_point}
30 , m_function{std::move(fn)} {}
39 return std::tie(m_time_point, m_id) >= std::tie(other.m_time_point, other.m_id);
// Returns a copy of the time point stored in this entry (m_time_point).
42 time_point get_time_point()
const {
return m_time_point; }
// Returns an rvalue reference to the stored callable so the caller can move it out.
// This is a const member that still yields a movable reference because
// m_function is declared `mutable`; callers in this file invoke it on
// m_queue.top(). After the caller moves from the result, m_function is left in
// a moved-from state.
43 SchedulableFn&& extract_function()
const {
return std::move(m_function); }
46 time_point m_time_point;
48 mutable SchedulableFn m_function;
51template<
typename SchedulableFn>
61 emplace_safe(time_point, std::forward<
decltype(fn)>(fn));
67 std::lock_guard lock{ m_mutex };
68 return m_queue.empty();
71 bool is_any_ready_schedulable()
const
73 std::lock_guard lock{ m_mutex };
74 return is_any_ready_schedulable_unsafe();
77 bool pop_if_ready(std::optional<SchedulableFn>& out)
79 std::lock_guard lock{ m_mutex };
80 if (!is_any_ready_schedulable_unsafe())
83 out.emplace(std::move(m_queue.top().extract_function()));
88 bool pop_with_wait(std::optional<SchedulableFn>& out)
92 std::unique_lock lock{m_mutex};
94 m_cv.wait(lock, [&] {
return !m_subscription->
is_subscribed() || !m_queue.empty(); });
96 if (m_queue.empty() || !m_cv.wait_until(lock,
97 m_queue.top().get_time_point(),
98 [&] { return !m_subscription->is_subscribed() || is_any_ready_schedulable_unsafe(); }))
104 out.emplace(std::move(m_queue.top().extract_function()));
111 bool is_subscribed()
const
124 std::lock_guard lock{m_mutex};
126 m_queue.emplace(time_point, ++m_current_id, std::forward<
decltype(fn)>(fn));
129 bool is_any_ready_schedulable_unsafe()
const
131 return !m_queue.empty() && m_queue.top().get_time_point() <= clock_type::now();
135 mutable std::mutex m_mutex{};
136 std::condition_variable_any m_cv{};
137 std::priority_queue<schedulable<SchedulableFn>> m_queue{};
138 size_t m_current_id{};
142 std::lock_guard lock{m_mutex};
143 m_queue = std::priority_queue<schedulable<SchedulableFn>>{};
Subscription which invokes a callable during unsubscribe.
Definition: callback_subscription.hpp:25
Definition: queue_worker_state.hpp:53
Definition: queue_worker_state.hpp:25
bool is_subscribed() const
indicates current status of subscription
Definition: subscription_base.hpp:51
void unsubscribe() const
initiates unsubscription process (if subscribed)
Definition: subscription_base.hpp:59
Guard over a subscription that automatically unsubscribes it in the destructor.
Definition: subscription_guard.hpp:21
Definition: constraints.hpp:25