15#include <rpp/schedulers/fwd.hpp>
16#include <rpp/schedulers/details/worker.hpp>
17#include <rpp/subscriptions/composite_subscription.hpp>
18#include <rpp/schedulers/details/queue_worker_state.hpp>
19#include <rpp/utils/utilities.hpp>
20#include <rpp/schedulers/details/utils.hpp>
28namespace rpp::schedulers
42 class current_thread_schedulable;
51 : m_sub{ subscription } {}
53 bool is_subscribed()
const
60 if (!m_sub.is_subscribed())
63 const bool someone_owns_queue = s_queue.has_value();
65 const auto drain_on_exit = utils::finally_action(!someone_owns_queue ? &drain_queue : +[]{});
67 if (!someone_owns_queue)
69 s_queue = std::priority_queue<current_thread_schedulable>{};
71 if (!details::immediate_scheduling_while_condition(time_point, fn, m_sub, []() {
return s_queue->empty(); }))
75 time_point = std::max(now(), time_point);
83 if (!m_sub.is_subscribed())
86 s_queue->emplace(time_point, std::move(fn), m_sub);
89 static time_point now() {
return clock_type::now(); }
95 static void drain_queue()
97 if (!s_queue.has_value())
100 auto reset_at_final = utils::finally_action{ [] { s_queue.reset(); } };
101 std::optional<trampoline_schedulable> function{};
103 while (!s_queue->empty())
105 const auto& top = s_queue->top();
107 wait_and_extract_executable_if_subscribed(top, function);
119 static void wait_and_extract_executable_if_subscribed(
const current_thread_schedulable& schedulable, std::optional<trampoline_schedulable>& out)
121 if (!schedulable.is_subscribed())
125 if (
const auto requested_time = schedulable.get_time_point(); details::s_last_sleep_timepoint < requested_time)
127 std::this_thread::sleep_until(requested_time);
128 details::s_last_sleep_timepoint = requested_time;
130 if (!schedulable.is_subscribed())
134 out.emplace(std::move(schedulable.extract_function()));
140 current_thread_schedulable(time_point time_point,
141 std::invocable
auto&& fn,
143 : schedulable(time_point, get_thread_local_id(), std::forward<
decltype(fn)>(fn))
144 , m_subscription{std::move(subscription)} {}
146 bool is_subscribed()
const {
return m_subscription.is_subscribed(); }
149 static size_t get_thread_local_id()
151 static thread_local size_t s_id;
162 inline static thread_local std::optional<std::priority_queue<current_thread_schedulable>> s_queue{};
165 static utils::finally_action<void (*)()> own_queue_and_drain_finally_if_not_owned()
167 const bool someone_owns_queue = s_queue.has_value();
169 if (!someone_owns_queue)
170 s_queue = std::priority_queue<current_thread_schedulable>{};
172 return {!someone_owns_queue ? &drain_queue : +[] {}};
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
Definition: queue_worker_state.hpp:25
Definition: worker.hpp:31
Schedules execution of schedulables via queueing tasks to the caller thread with priority to time_poi...
Definition: trampoline_scheduler.hpp:41
Definition: worker.hpp:60
bool is_subscribed() const
indicates current status of subscription
Definition: subscription_base.hpp:51
Definition: constraints.hpp:22
Definition: worker.hpp:23