ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Loading...
Searching...
No Matches
new_thread_scheduler.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/schedulers/fwd.hpp> // own forwarding
14#include <rpp/schedulers/details/worker.hpp> // worker
15#include <rpp/subscriptions/composite_subscription.hpp> // lifetime
16#include <rpp/subscriptions/subscription_guard.hpp> // lifetime
17#include <rpp/schedulers/details/queue_worker_state.hpp>// state
18
19#include <concepts>
20#include <chrono>
21#include <functional>
22#include <thread>
23
24namespace rpp::schedulers
25{
32{
33public:
35 {
36 public:
38
40 {
41 if (!sub.is_subscribed())
42 return;
43
44 auto shared = std::make_shared<state>();
45 // init while it is alive as shared
46 shared->init_thread(sub);
47 m_state = shared;
48 }
49
50 bool is_subscribed() const
51 {
52 if (const auto locked = m_state.lock())
53 return locked->is_subscribed();
54 return false;
55 }
56
57 void defer_at(time_point time_point, constraint::schedulable_fn auto&& fn) const
58 {
59 defer_at(time_point, new_thread_schedulable{*this, time_point, std::forward<decltype(fn)>(fn)});
60 }
61
62 void defer_at(time_point time_point, new_thread_schedulable&& fn) const
63 {
64 if (auto locked = m_state.lock())
65 locked->defer_at(time_point, std::move(fn));
66 }
67
68 static time_point now() { return clock_type::now(); }
69
70 private:
71 class state : public std::enable_shared_from_this<state>
72 {
73 public:
74 state() = default;
75 state(const state&) = delete;
76 state(state&&) noexcept = delete;
77
78 bool is_subscribed() const { return m_sub->is_subscribed(); }
79
80 void defer_at(time_point time_point, new_thread_schedulable&& fn)
81 {
82 if (m_sub->is_subscribed())
83 m_queue.emplace(time_point, std::move(fn));
84 }
85
86 void init_thread(const rpp::composite_subscription& sub)
87 {
88 m_thread = std::thread{[state = shared_from_this()]()
89 {
90 state->data_thread();
91 }};
92 const auto callback = rpp::callback_subscription{[state = weak_from_this()]
93 {
94 const auto locked = state.lock();
95 if (!locked)
96 return;
97
98 locked->m_queue.unsubscribe();
99
100 if (locked->m_thread.joinable() && locked->m_thread.get_id() != std::this_thread::get_id())
101 locked->m_thread.join();
102 else
103 locked->m_thread.detach();
104 }};
105 sub.add(callback);
106 m_sub.reset(callback);
107 }
108
109 private:
110 void data_thread()
111 {
112 std::optional<new_thread_schedulable> fn{};
113 while (m_queue.is_subscribed())
114 {
115 if (m_queue.pop_with_wait(fn))
116 {
117 (*fn)();
118 fn.reset();
119 }
120 }
121
122 // clear
123 m_queue.unsubscribe();
124 }
125
127 std::thread m_thread{};
128 rpp::subscription_guard m_sub = rpp::subscription_base::empty();
129 };
130
131 // original shared would alive in thread!
132 std::weak_ptr<state> m_state{};
133 };
134
135 static auto create_worker(const rpp::composite_subscription& sub = composite_subscription{})
136 {
137 return worker<worker_strategy>{sub};
138 }
139};
140} // namespace rpp::schedulers
Subscription which invoke callbable during unsubscribe.
Definition: callback_subscription.hpp:25
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
std::weak_ptr< details::subscription_state > add(const TSub &sub=TSub{}) const
Add any other subscription to this as dependent.
Definition: composite_subscription.hpp:43
Definition: queue_worker_state.hpp:53
scheduler which schedules execution of schedulables via queueing tasks to another thread with priorit...
Definition: new_thread_scheduler.hpp:32
Definition: worker.hpp:31
Definition: worker.hpp:60
bool is_subscribed() const
indicates current status of subscription
Definition: subscription_base.hpp:51
void unsubscribe() const
initiates unsubscription process (if subscribed)
Definition: subscription_base.hpp:59
guard over subscription to auto-unsubscribe during destructor
Definition: subscription_guard.hpp:21
Definition: constraints.hpp:22
Definition: worker.hpp:23