ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
run_loop.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/schedulers/fwd.hpp>
14
15#include <rpp/disposables/details/base_disposable.hpp>
16#include <rpp/disposables/disposable_wrapper.hpp>
17#include <rpp/schedulers/current_thread.hpp>
18#include <rpp/schedulers/details/queue.hpp>
19#include <rpp/schedulers/details/worker.hpp>
20#include <rpp/utils/functors.hpp>
21
22namespace rpp::schedulers
23{
30 class run_loop final
31 {
32 class worker_strategy;
33
34 class state_t final : public rpp::details::base_disposable
35 {
36 public:
37 ~state_t() noexcept override { dispose(); }
38
39 template<typename... Args>
40 void emplace_and_notify(time_point timepoint, Args&&... args)
41 {
42 if (is_disposed())
43 return;
44
45 {
46 std::lock_guard lock{m_mutex};
47 m_queue.emplace(timepoint, std::forward<Args>(args)...);
48 }
49 m_cv.notify_one();
50 }
51
52 std::shared_ptr<details::schedulable_base> pop(bool wait)
53 {
54 while (!is_disposed())
55 {
56 std::unique_lock lock{m_mutex};
57 m_cv.wait(lock, [&] { return !wait || is_disposed() || !m_queue.is_empty(); });
58
59 if (is_disposed())
60 break;
61
62 const auto now = worker_strategy::now();
63 if (is_any_ready_schedulable_unsafe(now))
64 return m_queue.pop();
65
66 if (!wait)
67 break;
68
69 m_cv.wait_for(lock, m_queue.top()->get_timepoint() - now, [&]() { return is_disposed() || !m_queue.is_empty() || m_queue.top()->get_timepoint() <= worker_strategy::now(); });
70 }
71 return {};
72 }
73
74 bool is_any_ready_schedulable()
75 {
76 std::lock_guard lock{m_mutex};
77 return is_any_ready_schedulable_unsafe();
78 }
79
80 bool is_empty()
81 {
82 std::lock_guard lock{m_mutex};
83 return m_queue.is_empty();
84 }
85
86 private:
87 bool is_any_ready_schedulable_unsafe(time_point now = worker_strategy::now()) const
88 {
89 return !m_queue.is_empty() && (m_queue.top()->is_disposed() || m_queue.top()->get_timepoint() <= now);
90 }
91
92 void base_dispose_impl(interface_disposable::Mode) noexcept override
93 {
94 {
95 std::lock_guard lock{m_mutex};
97 }
98 m_cv.notify_one();
99 }
100
101 private:
102 std::mutex m_mutex{};
104
105 std::condition_variable m_cv{};
106 };
107
108 class worker_strategy
109 {
110 public:
111 worker_strategy(const std::weak_ptr<state_t>& state)
112 : m_state{state}
113 {
114 }
115
116 template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
117 void defer_to(time_point tp, Fn&& fn, Handler&& handler, Args&&... args) const
118 {
119 if (const auto shared = m_state.lock())
120 shared->emplace_and_notify(tp, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
121 }
122
123 static rpp::schedulers::time_point now() { return details::now(); }
124
125 private:
126 std::weak_ptr<state_t> m_state;
127 };
128
129 public:
130 bool is_empty() const
131 {
132 return m_state->is_empty();
133 }
134
135 bool is_any_ready_schedulable() const
136 {
137 return m_state->is_any_ready_schedulable();
138 }
139
140 void dispatch_if_ready() const
141 {
142 dispatch_impl(false);
143 }
144
145 void dispatch() const
146 {
147 dispatch_impl(true);
148 }
149
150 rpp::schedulers::worker<worker_strategy> create_worker() const
151 {
153 }
154
155 private:
156 void dispatch_impl(bool wait) const
157 {
158 if (auto top = m_state->pop(wait))
159 {
160 if (top->is_disposed())
161 return;
162
163 if (const auto timepoint = (*top)())
164 m_state->emplace_and_notify(timepoint.value(), std::move(top));
165 }
166 }
167
168 private:
169 std::shared_ptr<state_t> m_state = std::make_shared<state_t>();
170 };
171} // namespace rpp::schedulers
Definition base_disposable.hpp:23
Scheduler which schedules execution by queueing tasks; execution of the queued tasks has to be dispatched manually.
Definition run_loop.hpp:31
Definition fwd.hpp:157