ReactivePlusPlus
One more implementation of the ReactiveX approach in C++, written with performance and templates in mind
 
Loading...
Searching...
No Matches
queue_worker_state.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/schedulers/fwd.hpp> // own forwarding
13#include <rpp/schedulers/constraints.hpp>
14#include <rpp/subscriptions/subscription_guard.hpp>
15
16#include <mutex>
17#include <condition_variable>
18#include <functional>
19#include <queue>
20
21namespace rpp::schedulers::details
22{
23template<typename SchedulableFn>
25{
26public:
27 schedulable(time_point time_point, size_t id, SchedulableFn&& fn)
28 : m_time_point{time_point}
29 , m_id{id}
30 , m_function{std::move(fn)} {}
31
32 schedulable(const schedulable& other) = default;
33 schedulable(schedulable&& other) noexcept = default;
34 schedulable& operator=(const schedulable& other) = default;
35 schedulable& operator=(schedulable&& other) noexcept = default;
36
37 bool operator<(const schedulable& other) const
38 {
39 return std::tie(m_time_point, m_id) >= std::tie(other.m_time_point, other.m_id);
40 }
41
42 time_point get_time_point() const { return m_time_point; }
43 SchedulableFn&& extract_function() const { return std::move(m_function); }
44
45private:
46 time_point m_time_point;
47 size_t m_id;
48 mutable SchedulableFn m_function;
49};
50
51template<typename SchedulableFn>
53{
54public:
55 queue_worker_state() = default;
58
59 void emplace(time_point time_point, constraint::inner_schedulable_fn auto&& fn)
60 {
61 emplace_safe(time_point, std::forward<decltype(fn)>(fn));
62 m_cv.notify_one();
63 }
64
65 bool is_empty() const
66 {
67 std::lock_guard lock{ m_mutex };
68 return m_queue.empty();
69 }
70
71 bool is_any_ready_schedulable() const
72 {
73 std::lock_guard lock{ m_mutex };
74 return is_any_ready_schedulable_unsafe();
75 }
76
77 bool pop_if_ready(std::optional<SchedulableFn>& out)
78 {
79 std::lock_guard lock{ m_mutex };
80 if (!is_any_ready_schedulable_unsafe())
81 return false;
82
83 out.emplace(std::move(m_queue.top().extract_function()));
84 m_queue.pop();
85 return true;
86 }
87
88 bool pop_with_wait(std::optional<SchedulableFn>& out)
89 {
90 while (m_subscription->is_subscribed())
91 {
92 std::unique_lock lock{m_mutex};
93
94 m_cv.wait(lock, [&] { return !m_subscription->is_subscribed() || !m_queue.empty(); });
95
96 if (m_queue.empty() || !m_cv.wait_until(lock,
97 m_queue.top().get_time_point(),
98 [&] { return !m_subscription->is_subscribed() || is_any_ready_schedulable_unsafe(); }))
99 continue;
100
101 if (!m_subscription->is_subscribed())
102 return false;
103
104 out.emplace(std::move(m_queue.top().extract_function()));
105 m_queue.pop();
106 return true;
107 }
108 return false;
109 }
110
111 bool is_subscribed() const
112 {
113 return m_subscription->is_subscribed();
114 }
115
116 void unsubscribe()
117 {
118 m_subscription->unsubscribe();
119 }
120
121private:
122 void emplace_safe(time_point time_point, constraint::inner_schedulable_fn auto&& fn)
123 {
124 std::lock_guard lock{m_mutex};
125 if (m_subscription->is_subscribed())
126 m_queue.emplace(time_point, ++m_current_id, std::forward<decltype(fn)>(fn));
127 }
128
129 bool is_any_ready_schedulable_unsafe() const
130 {
131 return !m_queue.empty() && m_queue.top().get_time_point() <= clock_type::now();
132 }
133
134private:
135 mutable std::mutex m_mutex{};
136 std::condition_variable_any m_cv{};
137 std::priority_queue<schedulable<SchedulableFn>> m_queue{};
138 size_t m_current_id{};
139 subscription_guard m_subscription = callback_subscription{[&]
140 {
141 {
142 std::lock_guard lock{m_mutex};
143 m_queue = std::priority_queue<schedulable<SchedulableFn>>{};
144 }
145 m_cv.notify_one();
146 }};
147};
148} // namespace rpp::schedulers::details
Subscription which invokes a callable during unsubscribe.
Definition: callback_subscription.hpp:25
Definition: queue_worker_state.hpp:53
Definition: queue_worker_state.hpp:25
bool is_subscribed() const
indicates current status of subscription
Definition: subscription_base.hpp:51
void unsubscribe() const
initiates unsubscription process (if subscribed)
Definition: subscription_base.hpp:59
guard over subscription to auto-unsubscribe during destructor
Definition: subscription_guard.hpp:21