ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
new_thread.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/disposables/details/base_disposable.hpp>
14#include <rpp/schedulers/current_thread.hpp>
15
16#include <atomic>
17#include <condition_variable>
18#include <memory>
19#include <mutex>
20#include <thread>
21
22namespace rpp::schedulers
23{
31 {
32 class state_t final
33 {
34 public:
35 state_t() = default;
36
37 ~state_t() noexcept
38 {
39 if (!m_thread.joinable())
40 return;
41
42 {
43 std::lock_guard lock{m_state->mutex};
44 m_state->is_stoping = true;
45 }
46 m_state->cv.notify_all();
47 m_thread.detach();
48 }
49
50 template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
51 void defer_to(time_point time_point, Fn&& fn, Handler&& handler, Args&&... args)
52 {
53 m_state->queue.emplace(time_point, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
54 m_state->has_fresh_data.store(true);
55 }
56
57 private:
58 struct queue_data : public details::shared_queue_data
59 {
61 bool is_stoping{};
62 std::atomic_bool has_fresh_data{false};
63 };
64
65 static void data_thread(std::shared_ptr<queue_data> state)
66 {
67 current_thread::get_queue() = &state->queue;
68
69 while (true)
70 {
71 std::unique_lock lock{state->mutex};
72 if (state->queue.is_empty() && state->is_stoping)
73 break;
74
75 state->cv.wait(lock, [&] { return !state->queue.is_empty() || state->is_stoping; });
76
77 if (state->queue.is_empty())
78 break;
79
80 if (state->queue.top()->is_disposed())
81 {
82 state->queue.pop();
83 continue;
84 }
85
86 if (details::s_last_now_time < state->queue.top()->get_timepoint())
87 {
88 if (const auto now = worker_strategy::now(); now < state->queue.top()->get_timepoint())
89 {
90 state->cv.wait_for(lock, state->queue.top()->get_timepoint() - now, [&] { return state->queue.top()->is_disposed() || worker_strategy::now() >= state->queue.top()->get_timepoint(); });
91 continue;
92 }
93 }
94
95 auto top = state->queue.pop();
96 state->has_fresh_data.store(!state->queue.is_empty());
97 lock.unlock();
98
99 while (true)
100 {
101 if (const auto res = top->make_advanced_call())
102 {
103 if (!top->is_disposed())
104 {
105 if (res->can_run_immediately() && !state->has_fresh_data.load())
106 continue;
107
108 const auto tp = top->handle_advanced_call(res.value());
109 state->queue.emplace(tp, std::move(top));
110 }
111 }
112 break;
113 }
114 }
115
116 current_thread::get_queue() = nullptr;
117 }
118
119 private:
120 std::shared_ptr<queue_data> m_state = std::make_shared<queue_data>();
121
122 RPP_CALL_DURING_CONSTRUCTION(m_state->queue = details::schedulables_queue<current_thread::worker_strategy>(m_state));
123
124 std::thread m_thread{&data_thread, m_state};
125 };
126
127 public:
128 class worker_strategy
129 {
130 public:
131 worker_strategy() = default;
132
133 template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
134 void defer_to(time_point tp, Fn&& fn, Handler&& handler, Args&&... args) const
135 {
136 m_state->defer_to(tp, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
137 }
138
139 static rpp::schedulers::time_point now() { return details::now(); }
140
141 private:
142 std::shared_ptr<state_t> m_state = std::make_shared<state_t>();
143 };
144
145 static rpp::schedulers::worker<worker_strategy> create_worker()
146 {
148 }
149 };
150} // namespace rpp::schedulers
Scheduler which schedules invoking of schedulables to another thread via queueing tasks with priority...
Definition new_thread.hpp:31
Definition worker.hpp:23