ReactivePlusPlus
Yet another implementation of the ReactiveX approach in C++, written with performance and templates in mind
 
Loading...
Searching...
No Matches
delay.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// TC Wang 2022 - present.
5// Distributed under the Boost Software License, Version 1.0.
6// (See accompanying file LICENSE_1_0.txt or copy at
7// https://www.boost.org/LICENSE_1_0.txt)
8//
9// Project home: https://github.com/victimsnino/ReactivePlusPlus
10//
11
12#pragma once
13
14#include <rpp/defs.hpp> // RPP_NO_UNIQUE_ADDRESS
15#include <rpp/operators/lift.hpp> // required due to operator uses lift
16#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
17#include <rpp/operators/fwd/delay.hpp> // own forwarding
18#include <rpp/subscribers/constraints.hpp> // constraint::subscriber_of_type
19#include <rpp/utils/overloaded.hpp>
20
21#include <variant>
22
23IMPLEMENTATION_FILE(delay_tag);
24
25namespace rpp::details
26{
// Empty tag type: stored as one alternative of the queued emission variant to represent
// the on_completed signal, which carries no payload.
27struct completion {};
28
29template<typename T, typename Subscriber, typename Worker>
30class queue_based_worker final : public std::enable_shared_from_this<queue_based_worker<T, Subscriber, Worker>>
31{
32public:
33 queue_based_worker(schedulers::duration delay, Worker&& worker, const Subscriber& subscriber)
34 : m_delay{delay}
35 , m_worker{std::move(worker)}
36 , m_subscriber{subscriber} {}
37
38 queue_based_worker(schedulers::duration delay, Worker&& worker, Subscriber&& subscriber)
39 : m_delay{delay}
40 , m_worker{std::move(worker)}
41 , m_subscriber{std::move(subscriber)} {}
42
43 struct on_next
44 {
45 void operator()(auto&& value, const std::shared_ptr<queue_based_worker<T, Subscriber, Worker>>& state) const
46 {
47 state->emplace(std::forward<decltype(value)>(value));
48 }
49 };
50
51 struct on_error
52 {
53 void operator()(const std::exception_ptr& err, const std::shared_ptr<queue_based_worker<T, Subscriber, Worker>>& state) const
54 {
55 state->emplace(err);
56 }
57 };
58
60 {
61 void operator()(const std::shared_ptr<queue_based_worker<T, Subscriber, Worker>>& state) const
62 {
63 state->emplace(completion{});
64 }
65 };
66
67private:
68 template<typename TT>
69 void emplace(TT&& item)
70 {
71 if (const auto timepoint = emplace_safe(std::forward<TT>(item)))
72 {
73 m_worker.schedule(timepoint.value(),
74 [state = this->shared_from_this()]()-> schedulers::optional_duration
75 {
76 return state->drain_queue();
77 });
78 }
79 }
80
81 template<typename TT>
82 std::optional<schedulers::time_point> emplace_safe(TT&& item)
83 {
84 std::lock_guard lock{m_mutex};
85 const auto delay = std::is_same_v<std::exception_ptr, std::decay_t<TT>> ? schedulers::duration{0} : m_delay;
86 m_queue.emplace(++m_current_id, m_worker.now()+delay, std::forward<TT>(item));
87 if (!m_active && m_queue.size() == 1)
88 {
89 m_active = true;
90 return m_queue.top().time;
91 }
92 return {};
93 }
94
95 schedulers::optional_duration drain_queue()
96 {
97 while (true)
98 {
99 std::unique_lock lock{m_mutex};
100 if (m_queue.empty())
101 {
102 m_active = false;
103 return {};
104 }
105
106 auto& top = m_queue.top();
107 const auto now = m_worker.now();
108 if (top.time > now)
109 return top.time - now;
110
111 auto item = std::move(top.item);
112 m_queue.pop();
113 lock.unlock();
114
115 std::visit(utils::overloaded
116 {
117 [&](T&& v) { m_subscriber.on_next(std::move(v)); },
118 [&](const std::exception_ptr& err) { m_subscriber.on_error(err); },
119 [&](completion) { m_subscriber.on_completed(); }
120 },
121 std::move(item));
122 }
123 }
124
125private:
126 struct emission
127 {
128 template<typename TT>
129 emission(size_t id, schedulers::time_point time, TT&& item)
130 : id{id}
131 , time{std::move(time)}
132 , item{std::forward<TT>(item)} {}
133
134 size_t id{};
135 schedulers::time_point time{};
136 std::variant<T, std::exception_ptr, completion> item{};
137
138 bool operator<(const emission& other) const { return std::tie(time, id) >= std::tie(other.time, other.id); }
139 };
140
141 schedulers::duration m_delay;
142 Worker m_worker;
143 Subscriber m_subscriber;
144 std::mutex m_mutex{};
145 size_t m_current_id{};
146 std::priority_queue<emission> m_queue{};
147 bool m_active{};
148};
149
150
// Functor that wraps a downstream subscriber into a subscriber whose signals are routed through
// a queue_based_worker created with the stored delay.
// NOTE(review): the declaration line naming this template (listing line 152) is missing from
// this extract - restore it from the original header before compiling.
151template<constraint::decayed_type Type, schedulers::constraint::scheduler TScheduler>
153{
    // Scheduler used to create the worker for each wrapped subscriber.
154 RPP_NO_UNIQUE_ADDRESS TScheduler scheduler;
    // Delay handed over to the queue_based_worker state.
155 schedulers::duration delay;
156
    // Builds the shared queue_based_worker state for `subscriber` and returns a new subscriber
    // (created via create_subscriber_with_state) whose on_next / on_error / on_completed handlers
    // forward into that state.
157 template<constraint::subscriber_of_type<Type> TSub>
158 auto operator()(TSub&& subscriber) const
159 {
    // The worker is created from the downstream subscription; the returned subscriber gets a
    // child of that subscription.
160 auto worker = scheduler.create_worker(subscriber.get_subscription());
161 auto subscription = subscriber.get_subscription().make_child();
162
    // The state owns the worker and the (forwarded) downstream subscriber.
163 using state_t = queue_based_worker<Type, std::decay_t<TSub>, std::decay_t<decltype(worker)>>;
164 auto state = std::make_shared<state_t>(delay, std::move(worker), std::forward<TSub>(subscriber));
165
166 return create_subscriber_with_state<Type>(std::move(subscription),
167 typename state_t::on_next{},
168 typename state_t::on_error{},
169 typename state_t::on_completed{},
170 std::move(state));
171 }
172};
173} // namespace rpp::details
Definition: delay.hpp:31
Definition: delay.hpp:27
Definition: delay.hpp:153