ReactivePlusPlus
Yet another implementation of the ReactiveX approach in C++, built with performance and templates in mind
 
Loading...
Searching...
No Matches
trampoline_scheduler.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// TC Wang 2022 - present.
5// Distributed under the Boost Software License, Version 1.0.
6// (See accompanying file LICENSE_1_0.txt or copy at
7// https://www.boost.org/LICENSE_1_0.txt)
8//
9// Project home: https://github.com/victimsnino/ReactivePlusPlus
10//
11
12#pragma once
13
14
15#include <rpp/schedulers/fwd.hpp> // own forwarding
16#include <rpp/schedulers/details/worker.hpp> // worker
17#include <rpp/subscriptions/composite_subscription.hpp> // lifetime
18#include <rpp/schedulers/details/queue_worker_state.hpp>// state
19#include <rpp/utils/utilities.hpp>
20#include <rpp/schedulers/details/utils.hpp>
21
22#include <concepts>
23#include <chrono>
24#include <functional>
25#include <thread>
26
27
28namespace rpp::schedulers
29{
41{
42 class current_thread_schedulable;
43 class worker_strategy;
44
46
47 class worker_strategy
48 {
49 public:
50 explicit worker_strategy(const rpp::composite_subscription& subscription)
51 : m_sub{ subscription } {}
52
53 bool is_subscribed() const
54 {
55 return m_sub.is_subscribed();
56 }
57
58 void defer_at(time_point time_point, constraint::schedulable_fn auto&& fn) const
59 {
60 if (!m_sub.is_subscribed())
61 return;
62
63 const bool someone_owns_queue = s_queue.has_value();
64
65 const auto drain_on_exit = utils::finally_action(!someone_owns_queue ? &drain_queue : +[]{});
66
67 if (!someone_owns_queue)
68 {
69 s_queue = std::priority_queue<current_thread_schedulable>{};
70
71 if (!details::immediate_scheduling_while_condition(time_point, fn, m_sub, []() { return s_queue->empty(); }))
72 return;
73
74 // update time to make it more accurate due to we are going to push it to queue
75 time_point = std::max(now(), time_point);
76 }
77
78 defer_at(time_point, trampoline_schedulable{ *this, time_point, std::forward<decltype(fn)>(fn) });
79 }
80
81 void defer_at(time_point time_point, trampoline_schedulable&& fn) const
82 {
83 if (!m_sub.is_subscribed())
84 return;
85
86 s_queue->emplace(time_point, std::move(fn), m_sub);
87 }
88
89 static time_point now() { return clock_type::now(); }
90
91 private:
93 };
94
95 static void drain_queue()
96 {
97 if (!s_queue.has_value())
98 return;
99
100 auto reset_at_final = utils::finally_action{ [] { s_queue.reset(); } };
101 std::optional<trampoline_schedulable> function{};
102
103 while (!s_queue->empty())
104 {
105 const auto& top = s_queue->top();
106
107 wait_and_extract_executable_if_subscribed(top, function);
108
109 // firstly we need to pop schedulable from queue due to execution of function can add new schedulable
110 s_queue->pop();
111
112 if (function)
113 (*function)();
114
115 function.reset();
116 }
117 }
118
119 static void wait_and_extract_executable_if_subscribed(const current_thread_schedulable& schedulable, std::optional<trampoline_schedulable>& out)
120 {
121 if (!schedulable.is_subscribed())
122 return;
123
124 // wait only if needed!
125 if (const auto requested_time = schedulable.get_time_point(); details::s_last_sleep_timepoint < requested_time)
126 {
127 std::this_thread::sleep_until(requested_time);
128 details::s_last_sleep_timepoint = requested_time;
129
130 if (!schedulable.is_subscribed())
131 return;
132 }
133
134 out.emplace(std::move(schedulable.extract_function()));
135 }
136
137 class current_thread_schedulable : public details::schedulable<trampoline_schedulable>
138 {
139 public:
140 current_thread_schedulable(time_point time_point,
141 std::invocable auto&& fn,
142 rpp::composite_subscription subscription)
143 : schedulable(time_point, get_thread_local_id(), std::forward<decltype(fn)>(fn))
144 , m_subscription{std::move(subscription)} {}
145
146 bool is_subscribed() const { return m_subscription.is_subscribed(); }
147
148 private:
149 static size_t get_thread_local_id()
150 {
151 static thread_local size_t s_id;
152 return s_id++;
153 }
154
155 private:
156 rpp::composite_subscription m_subscription{};
157 };
158
162 inline static thread_local std::optional<std::priority_queue<current_thread_schedulable>> s_queue{};
163
164public:
165 static utils::finally_action<void (*)()> own_queue_and_drain_finally_if_not_owned()
166 {
167 const bool someone_owns_queue = s_queue.has_value();
168
169 if (!someone_owns_queue)
170 s_queue = std::priority_queue<current_thread_schedulable>{};
171
172 return {!someone_owns_queue ? &drain_queue : +[] {}};
173 }
174
175 static auto create_worker(const rpp::composite_subscription& sub = composite_subscription{})
176 {
177 return worker<worker_strategy>{sub};
178 }
179};
180} // namespace rpp::schedulers
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
Definition: queue_worker_state.hpp:25
Definition: worker.hpp:31
Schedules execution of schedulables via queueing tasks to the caller thread with priority to time_poi...
Definition: trampoline_scheduler.hpp:41
Definition: worker.hpp:60
bool is_subscribed() const
indicates current status of subscription
Definition: subscription_base.hpp:51
Definition: constraints.hpp:22
Definition: worker.hpp:23