ReactivePlusPlus
Yet another implementation of the ReactiveX approach in C++, written with performance and templates in mind
 
Loading...
Searching...
No Matches
run_loop_scheduler.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/schedulers/fwd.hpp> // own forwarding
13#include <rpp/schedulers/details/worker.hpp> // worker
14#include <rpp/subscriptions/composite_subscription.hpp> // lifetime
15#include <rpp/schedulers/details/queue_worker_state.hpp>// state
16
17
18namespace rpp::schedulers
19{
// Scheduler whose workers only enqueue schedulables into a shared queue. In the
// code shown here, queued schedulables are invoked solely by dispatch() and
// dispatch_if_ready(), i.e. the owner of the run_loop drives execution manually.
// NOTE(review): this is a rendered listing -- every line carries its original
// line number, and some original lines (20-28, 33, 38, 65, 71, 87-88, 93) are
// absent, so a few declarations referenced below are not visible here.
29class run_loop final : public details::scheduler_tag
30{
31private:
32 class worker_strategy;
// NOTE(review): original line 33 is missing; presumably it declares the
// run_loop_schedulable type used throughout this class -- confirm in the real header.
34
// Strategy handed to worker<>: holds a weak reference to the run_loop's queue
// together with the subscription passed in for this worker.
35 class worker_strategy
36 {
37 public:
// Constructor. NOTE(review): the first line of the signature (original line 38,
// which declares the `queue` parameter) is missing from this listing.
39 const composite_subscription& sub)
40 : m_queue{std::move(queue)}
41 , m_sub{sub}
42 {}
43
// Forwards to m_sub.is_subscribed().
44 bool is_subscribed() const
45 {
46 return m_sub.is_subscribed();
47 }
48
// Wraps an arbitrary schedulable callable into a run_loop_schedulable (built
// from this strategy, the target time and the forwarded callable) and hands it
// to the overload below.
49 void defer_at(time_point time_point, constraint::schedulable_fn auto&& fn) const
50 {
51 defer_at(time_point, run_loop_schedulable{*this, time_point, std::forward<decltype(fn)>(fn)});
52 }
53
// Enqueues an already wrapped schedulable for `time_point`. The schedulable is
// silently dropped when m_sub is no longer subscribed or when the weak pointer
// to the queue cannot be locked.
54 void defer_at(time_point time_point, run_loop_schedulable&& fn) const
55 {
56 if (m_sub.is_subscribed())
57 if (auto locked = m_queue.lock())
58 locked->emplace(time_point, std::move(fn));
59 }
60
// Current time as reported by clock_type::now().
61 static time_point now() { return clock_type::now(); }
62
63 private:
// Weak on purpose: locking fails once the queue's owner is gone, see defer_at above.
64 std::weak_ptr<details::queue_worker_state<run_loop_schedulable>> m_queue{};
// NOTE(review): original line 65 is missing; presumably the declaration of m_sub.
66 };
67
// Shared state of the run_loop: exposes the queue of pending schedulables and
// the run_loop's subscription. Copy and move construction are deleted.
68 class state
69 {
70 public:
// Constructor storing `sub` in m_sub. NOTE(review): the signature line
// (original line 71) is missing from this listing.
72 : m_sub{sub} { }
73
74 state(const state&) = delete;
75 state(state&&) noexcept = delete;
76
// On destruction unsubscribes the queue first and then the subscription.
77 ~state()
78 {
79 m_queue.unsubscribe();
80 m_sub.unsubscribe();
81 }
82
// Mutable access to the queue of pending schedulables.
83 details::queue_worker_state<run_loop_schedulable>& get_queue() { return m_queue; }
84
// Read-only access to the subscription stored by the constructor.
85 const composite_subscription& get_subscription() const { return m_sub; }
86 private:
// NOTE(review): original lines 87-88 are missing; presumably the declarations
// of m_sub and m_queue.
89 };
90
91public:
92
// Constructor: allocates the shared state from `sub`. NOTE(review): the
// signature line (original line 93) is missing from this listing.
94 : m_state(std::make_shared<state>(sub)) {}
95
// Creates a worker bound to this run_loop's queue.
// - `sub` is added to the state's subscription; `res` keeps what add() returned.
// - A callback is registered on `sub` which, if the state is still alive,
//   removes `res` from the state's subscription again (presumably invoked when
//   `sub` is unsubscribed -- confirm against composite_subscription).
// - The returned worker is constructed from a shared_ptr that shares ownership
//   with m_state but points at the queue inside it, plus `sub`.
96 auto create_worker(const composite_subscription& sub = composite_subscription{}) const
97 {
98 auto res = m_state->get_subscription().add(sub);
99 sub.add([weak = std::weak_ptr{m_state}, res]
100 {
101 if (const auto sh = weak.lock())
102 sh->get_subscription().remove(res);
103 });
104 return worker<worker_strategy>{std::shared_ptr<details::queue_worker_state<run_loop_schedulable>>{m_state, &m_state->get_queue()}, sub};
105 }
106
// Forwards to the queue's is_empty().
107 bool is_empty() const
108 {
109 return m_state->get_queue().is_empty();
110 }
111
// Forwards to the queue's is_any_ready_schedulable().
112 bool is_any_ready_schedulable() const
113 {
114 return m_state->get_queue().is_any_ready_schedulable();
115 }
116
// Asks the queue for one schedulable via pop_if_ready() and invokes it when the
// call reports success; otherwise does nothing.
117 void dispatch_if_ready() const
118 {
119 std::optional<run_loop_schedulable> fn{};
120 if (m_state->get_queue().pop_if_ready(fn))
121 (*fn)();
122 }
123
// Same as dispatch_if_ready(), but obtains the schedulable via pop_with_wait()
// (presumably blocks until one becomes available -- confirm in queue_worker_state).
124 void dispatch() const
125 {
126 std::optional<run_loop_schedulable> fn{};
127 if (m_state->get_queue().pop_with_wait(fn))
128 (*fn)();
129 }
130
131private:
// Shared with workers through the shared_ptr built in create_worker().
132 const std::shared_ptr<state> m_state{};
133};
134} // namespace rpp::schedulers
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
Definition: queue_worker_state.hpp:53
scheduler which schedules execution via queueing tasks, but execution of tasks should be manually dis...
Definition: run_loop_scheduler.hpp:30
Definition: worker.hpp:31
Definition: worker.hpp:60
bool is_subscribed() const
indicates current status of subscription
Definition: subscription_base.hpp:51
Definition: constraints.hpp:22
Definition: worker.hpp:23