ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
current_thread.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/schedulers/fwd.hpp>
14
15#include <rpp/schedulers/details/queue.hpp>
16#include <rpp/schedulers/details/utils.hpp>
17#include <rpp/schedulers/details/worker.hpp>
18#include <rpp/utils/functors.hpp>
19
20namespace rpp::schedulers
21{
86 {
87 public:
88 friend class new_thread;
89 class worker_strategy;
90
92 {
94 return s_queue;
95 }
96
98 {
100
101 bool operator()() const { return queue.is_empty(); }
102 };
103
104
105 static void drain_queue() noexcept
106 {
107 while (get_queue() && !get_queue()->is_empty())
108 {
109 auto top = get_queue()->pop();
110 if (top->is_disposed())
111 continue;
112
113 details::sleep_until(top->get_timepoint());
114
115 while (true)
116 {
117 if (const auto res = top->make_advanced_call())
118 {
119 if (!top->is_disposed())
120 {
121 if (get_queue()->is_empty())
122 {
123 if (const auto d = std::get_if<delay_from_now>(&res->get()))
124 {
125 std::this_thread::sleep_for(d->value);
126 }
127 else
128 {
129 details::sleep_until(top->handle_advanced_call(res.value()));
130 }
131 continue;
132 }
133 const auto tp = top->handle_advanced_call(res.value());
134 get_queue()->emplace(tp, std::move(top));
135 }
136 }
137 break;
138 }
139 }
140
141 get_queue() = nullptr;
142 }
143
145 {
146 public:
147 template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
148 static void defer_for(duration duration, Fn&& fn, Handler&& handler, Args&&... args)
149 {
150 if (handler.is_disposed())
151 return;
152
153 if (!get_queue())
154 {
156 get_queue() = &queue;
157
158 const auto timepoint = details::immediate_scheduling_while_condition<worker_strategy>(duration, is_queue_is_empty{queue}, fn, handler, args...);
159 if (!timepoint || handler.is_disposed())
160 return drain_queue();
161
162 get_queue()->emplace(timepoint.value(), std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
163 return drain_queue();
164 }
165
166 get_queue()->emplace(now() + duration, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
167 }
168
169 template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
170 static void defer_to(time_point tp, Fn&& fn, Handler&& handler, Args&&... args)
171 {
172 if (handler.is_disposed())
173 return;
174
175 if (get_queue())
176 {
177 get_queue()->emplace(tp, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
178 }
179 else
180 {
181 defer_for(tp - now(), std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
182 }
183 }
184
185 static rpp::schedulers::time_point now() { return details::now(); }
186 };
187
188 private:
189 class own_queue_guard
190 {
191 public:
192 own_queue_guard()
193 : m_clear_on_destruction{!get_queue()}
194 {
195 if (m_clear_on_destruction)
196 get_queue() = &m_queue;
197 }
198 ~own_queue_guard()
199 {
200 if (m_clear_on_destruction)
201 drain_queue();
202 }
203 own_queue_guard(const own_queue_guard&) = delete;
204 own_queue_guard(own_queue_guard&&) = delete;
205
206 private:
207 details::schedulables_queue<worker_strategy> m_queue{};
208 bool m_clear_on_destruction{};
209 };
210
211 public:
212 static own_queue_guard own_queue_and_drain_finally_if_not_owned()
213 {
214 return own_queue_guard{};
215 }
216
217 static rpp::schedulers::worker<worker_strategy> create_worker()
218 {
219 return rpp::schedulers::worker<worker_strategy>{};
220 }
221 };
222} // namespace rpp::schedulers
Definition current_thread.hpp:145
Schedules execution of schedulables via queueing tasks to the caller thread with priority to time_poi...
Definition current_thread.hpp:86