ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
strand.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
#include <rpp/schedulers/current_thread.hpp>
#include <rpp/schedulers/details/worker.hpp>

#include <asio/basic_waitable_timer.hpp>
#include <asio/bind_executor.hpp>
#include <asio/strand.hpp>

#include <type_traits>
19
20namespace rppasio::schedulers
21{
31 class strand
32 {
33 private:
34 class state_t : public std::enable_shared_from_this<state_t>
35 {
36 class current_thread_queue_guard;
37
38 public:
39 state_t(const asio::io_context::executor_type& executor)
40 : m_strand{executor.context()}
41 {
42 }
43
44 template<typename Handler, typename... Args, typename Fn>
45 void defer(Fn&& fn, Handler&& handler, Args&&... args) const
46 {
47 if (handler.is_disposed())
48 return;
49
50 asio::post(asio::bind_executor(m_strand, [self = this->shared_from_this(), fn = std::forward<Fn>(fn), handler = std::forward<Handler>(handler), ... args = std::forward<Args>(args)]() mutable {
51 if (handler.is_disposed())
52 return;
53
54 current_thread_queue_guard guard{*self};
55 if (const auto new_duration = fn(handler, args...))
56 self->defer_with_time(new_duration->value, std::move(fn), std::move(handler), std::move(args)...);
57 }));
58 }
59
60 template<typename Time, typename Handler, typename... Args, typename Fn>
61 void defer_with_time(Time time, Fn&& fn, Handler&& handler, Args&&... args) const
62 {
63 if (handler.is_disposed())
64 return;
65
66 auto timer = std::make_shared<asio::basic_waitable_timer<rpp::schedulers::clock_type>>(m_strand.context(), time);
67 timer->async_wait(asio::bind_executor(m_strand, [self = this->shared_from_this(), timer, fn = std::forward<Fn>(fn), handler = std::forward<Handler>(handler), ... args = std::forward<Args>(args)](const asio::error_code& ec) mutable {
68 if (ec || handler.is_disposed())
69 return;
70
71 current_thread_queue_guard guard{*self};
72 if (const auto new_duration = fn(handler, args...))
73 self->defer_with_time(new_duration->value, std::move(fn), std::move(handler), std::move(args)...);
74 }));
75 }
76
77 private:
78 // Guard draining schedulables queued to thread local queue to schedule them back to strand queue
79 class current_thread_queue_guard
80 {
81 public:
82 current_thread_queue_guard(const state_t& state)
83 : m_process_on_destruction{!rpp::schedulers::current_thread::get_queue()}
84 , m_state(state)
85 {
86 if (m_process_on_destruction)
87 rpp::schedulers::current_thread::get_queue() = &m_queue;
88 }
89 ~current_thread_queue_guard()
90 {
91 if (m_process_on_destruction)
92 process_queue();
93 }
94 current_thread_queue_guard(const current_thread_queue_guard&) = delete;
95 current_thread_queue_guard(current_thread_queue_guard&&) = delete;
96
97 private:
98 struct handler
99 {
100 bool is_disposed() const noexcept
101 {
102 return m_schedulable->is_disposed();
103 }
104
105 void on_error(const std::exception_ptr& ep) const
106 {
107 m_schedulable->on_error(ep);
108 }
109
110 std::shared_ptr<rpp::schedulers::details::schedulable_base> m_schedulable;
111 };
112
113 private:
114 void process_queue()
115 {
116 while (!m_queue.is_empty())
117 {
118 const auto top = m_queue.pop();
119 if (top->is_disposed())
120 continue;
121
122 m_state.defer_with_time(
123 top->get_timepoint(),
124 [top](const auto&) -> rpp::schedulers::optional_delay_to {
125 if (const auto advanced_call = top->make_advanced_call())
126 {
127 const auto tp = top->handle_advanced_call(*advanced_call);
128 top->set_timepoint(tp);
129 return rpp::schedulers::delay_to{tp};
130 }
131 return std::nullopt;
132 },
133 handler{top});
134 }
135 rpp::schedulers::current_thread::get_queue() = nullptr;
136 }
137
138 private:
140 bool m_process_on_destruction;
141 const state_t& m_state;
142 };
143
144 private:
145 asio::io_context::strand m_strand;
146 };
147
148 class worker_strategy
149 {
150 public:
151 explicit worker_strategy(const asio::io_context::executor_type& executor)
152 : m_state{std::make_shared<state_t>(executor)}
153 {
154 }
155
156 template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, rpp::schedulers::constraint::schedulable_fn<Handler, Args...> Fn>
157 void defer_for(rpp::schedulers::duration duration, Fn&& fn, Handler&& handler, Args&&... args) const
158 {
159 if (duration == rpp::schedulers::duration::zero())
160 m_state->defer(std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
161 else
162 m_state->defer_with_time(duration, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
163 }
164
165 template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, rpp::schedulers::constraint::schedulable_fn<Handler, Args...> Fn>
166 void defer_to(rpp::schedulers::time_point tp, Fn&& fn, Handler&& handler, Args&&... args) const
167 {
168 m_state->defer_with_time(tp, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
169 }
170
171 static rpp::schedulers::time_point now() { return rpp::schedulers::clock_type::now(); }
172
173 private:
174 std::shared_ptr<state_t> m_state;
175 };
176
177 public:
178 explicit strand(asio::io_context::executor_type executor)
179 : m_executor{std::move(executor)}
180 {
181 }
182
183 auto create_worker() const
184 {
185 return rpp::schedulers::worker<worker_strategy>{m_executor};
186 }
187
188 asio::io_context::executor_type m_executor;
189 };
190} // namespace rppasio::schedulers
Asio based scheduler where each worker is assigned an asio strand to execute schedulables with the gu...
Definition strand.hpp:32