ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
queue.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/schedulers/fwd.hpp>
13
14#include <rpp/defs.hpp>
15#include <rpp/schedulers/details/utils.hpp>
16#include <rpp/utils/constraints.hpp>
17#include <rpp/utils/tuple.hpp>
18#include <rpp/utils/utils.hpp>
19
20#include "rpp/utils/functors.hpp"
21
22#include <condition_variable>
23#include <exception>
24#include <memory>
25#include <mutex>
26#include <optional>
27#include <utility>
28#include <variant>
29
30namespace rpp::schedulers::details
31{
33 {
34 public:
35 explicit schedulable_base(const time_point& time_point)
36 : m_time_point{time_point}
37 {
38 }
39
40 virtual ~schedulable_base() noexcept = default;
41
42 virtual std::optional<time_point> operator()() noexcept = 0;
43
45 {
46 public:
47 advanced_call(std::variant<delay_from_now, delay_from_this_timepoint, delay_to> data)
48 : m_data{data}
49 {
50 }
51
52 const std::variant<delay_from_now, delay_from_this_timepoint, delay_to>& get() const { return m_data; }
53
54 auto visit(const auto& fn) const
55 {
56 return std::visit(fn, m_data);
57 }
58
59 bool can_run_immediately() const noexcept
60 {
61 return visit(rpp::utils::overloaded{[](const delay_to&) {
62 return false;
63 },
64 [](const auto& d) {
65 return d.value == rpp::schedulers::duration::zero();
66 }});
67 }
68
69 private:
70 std::variant<delay_from_now, delay_from_this_timepoint, delay_to> m_data;
71 };
72
73 virtual std::optional<advanced_call> make_advanced_call() noexcept = 0;
74 virtual time_point handle_advanced_call(const advanced_call&) noexcept = 0;
75
76 virtual bool is_disposed() const noexcept = 0;
77
78 virtual void on_error(const std::exception_ptr& ep) const = 0;
79
80 time_point get_timepoint() const { return m_time_point; }
81
82 void set_timepoint(const time_point& timepoint) { m_time_point = timepoint; }
83
84 const std::shared_ptr<schedulable_base>& get_next() const { return m_next; }
85
86 void set_next(std::shared_ptr<schedulable_base>&& next) { m_next = std::move(next); }
87
88 void update_next(std::shared_ptr<schedulable_base>&& next)
89 {
90 if (next)
91 next->set_next(std::move(m_next));
92 m_next = std::move(next);
93 }
94
95 protected:
96 template<typename NowStrategy>
97 auto get_advanced_call_handler() const
98 {
100 [](const delay_from_now& v) {
101 return NowStrategy::now() + v.value;
102 },
103 [this](const delay_from_this_timepoint& v) {
104 return get_timepoint() + v.value;
105 },
106 [](const delay_to& v) {
107 return v.value;
108 }};
109 }
110
111 private:
112 std::shared_ptr<schedulable_base> m_next{};
113 time_point m_time_point;
114 };
115
117 requires constraint::schedulable_fn<Fn, Handler, Args...>
119 {
120 public:
121 template<rpp::constraint::decayed_same_as<Fn> TFn, typename... TArgs>
122 explicit specific_schedulable(const time_point& time_point, TFn&& in_fn, TArgs&&... in_args)
123 : schedulable_base{time_point}
124 , m_args{std::forward<TArgs>(in_args)...}
125 , m_fn{std::forward<TFn>(in_fn)}
126 {
127 }
128
129 std::optional<time_point> operator()() noexcept override
130 {
131 try
132 {
133 if (const auto res = m_args.apply(m_fn))
134 return get_advanced_call_handler<NowStrategy>()(res.value());
135 }
136 catch (...)
137 {
138 m_args.template get<0>().on_error(std::current_exception());
139 }
140 return std::nullopt;
141 }
142
143 std::optional<advanced_call> make_advanced_call() noexcept override
144 {
145 try
146 {
147 if (const auto res = m_args.apply(m_fn))
148 return advanced_call{res.value()};
149 }
150 catch (...)
151 {
152 m_args.template get<0>().on_error(std::current_exception());
153 }
154 return std::nullopt;
155 }
156
157 time_point handle_advanced_call(const advanced_call& v) noexcept override
158 {
159 return v.visit(get_advanced_call_handler<NowStrategy>());
160 }
161
162 bool is_disposed() const noexcept override { return m_args.template get<0>().is_disposed(); }
163
164 void on_error(const std::exception_ptr& ep) const override { m_args.template get<0>().on_error(ep); }
165
166 private:
167 RPP_NO_UNIQUE_ADDRESS rpp::utils::tuple<Handler, Args...> m_args;
168 RPP_NO_UNIQUE_ADDRESS Fn m_fn;
169 };
170
171 template<typename Mutex>
173 {
174 public:
175 optional_mutex() = default;
176
177 optional_mutex(Mutex* mutex)
178 : m_mutex{mutex}
179 {
180 }
181
182 void lock() const
183 {
184 if (m_mutex)
185 m_mutex->lock();
186 }
187
188 void unlock() const
189 {
190 if (m_mutex)
191 m_mutex->unlock();
192 }
193
194 private:
195 Mutex* m_mutex{};
196 };
197
199 {
200 std::condition_variable_any cv{};
201 std::recursive_mutex mutex{};
202 };
203
204 template<typename NowStrategy>
206 {
207 public:
208 schedulables_queue() = default;
209 schedulables_queue(const schedulables_queue&) = delete;
210 schedulables_queue(schedulables_queue&&) noexcept = default;
211
212 schedulables_queue& operator=(const schedulables_queue& other) = delete;
213 schedulables_queue& operator=(schedulables_queue&& other) noexcept = default;
214
215 schedulables_queue(std::weak_ptr<shared_queue_data> shared_data)
216 : m_shared_data{std::move(shared_data)}
217 {
218 }
219
220 template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
221 void emplace(const time_point& timepoint, Fn&& fn, Handler&& handler, Args&&... args)
222 {
223 using schedulable_type = specific_schedulable<NowStrategy, std::decay_t<Fn>, std::decay_t<Handler>, std::decay_t<Args>...>;
224
225 emplace_impl(std::make_shared<schedulable_type>(timepoint, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...));
226 }
227
228 void emplace(const time_point& timepoint, std::shared_ptr<schedulable_base>&& schedulable)
229 {
230 if (!schedulable)
231 return;
232
233 schedulable->set_timepoint(timepoint);
234 emplace_impl(std::move(schedulable));
235 }
236
237 bool is_empty() const { return !m_head; }
238
239 std::shared_ptr<schedulable_base> pop()
240 {
241 return std::exchange(m_head, m_head->get_next());
242 }
243
244 const std::shared_ptr<schedulable_base>& top() const
245 {
246 return m_head;
247 }
248
249 private:
250 void emplace_impl(std::shared_ptr<schedulable_base>&& schedulable)
251 {
252 // needed in case of new_thread and current_thread shares same queue
253 const auto s = m_shared_data.lock();
254 const rpp::utils::finally_action _{[&] {
255 if (s)
256 s->cv.notify_one();
257 }};
258
259 optional_mutex<std::recursive_mutex> mutex{s ? &s->mutex : nullptr};
260 std::lock_guard lock{mutex};
261
262 if (!m_head || schedulable->get_timepoint() < m_head->get_timepoint())
263 {
264 schedulable->set_next(std::move(m_head));
265 m_head = std::move(schedulable);
266 return;
267 }
268
269 schedulable_base* current = m_head.get();
270 while (const auto& next = current->get_next())
271 {
272 if (schedulable->get_timepoint() < next->get_timepoint())
273 break;
274 current = next.get();
275 }
276
277 current->update_next(std::move(schedulable));
278 }
279
280 private:
281 std::shared_ptr<schedulable_base> m_head{};
282 std::weak_ptr<shared_queue_data> m_shared_data{};
283 };
284} // namespace rpp::schedulers::details
Calls passed function during destruction.
Definition utils.hpp:120
Definition tuple.hpp:105
Definition constraints.hpp:22
Provide timepoint of next execution explicitly.
Definition fwd.hpp:70
Definition functors.hpp:20