ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Loading...
Searching...
No Matches
debounce.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/lift.hpp> // required due to operator uses lift
14#include <rpp/operators/details/early_unsubscribe.hpp>
15#include <rpp/operators/details/serialized_subscriber.hpp>
16#include <rpp/operators/details/subscriber_with_state.hpp>
17#include <rpp/operators/fwd/debounce.hpp>
18#include <rpp/subscribers/constraints.hpp>
19
20#include <rpp/utils/spinlock.hpp>
21
22#include <type_traits>
23#include <variant>
24
25IMPLEMENTATION_FILE(debounce_tag);
26
27namespace rpp::details
28{
29template<typename T, typename Scheduler>
31{
32public:
33 debounce_state(schedulers::duration period, const Scheduler& scheduler, const composite_subscription& subscription_of_subscriber)
34 : early_unsubscribe_state(subscription_of_subscriber)
35 , m_period{period}
36 , m_worker{scheduler.create_worker(children_subscriptions)} {}
37
38 std::optional<schedulers::time_point> emplace_safe(auto&& v)
39 {
40 std::lock_guard lock{m_mutex};
41 m_value_to_be_emitted.emplace(std::forward<decltype(v)>(v));
42 const bool need_to_scheduled = !m_time_when_value_should_be_emitted.has_value() || !m_value_to_be_emitted.has_value();
43 m_time_when_value_should_be_emitted = m_worker.now() + m_period;
44 return need_to_scheduled ? m_time_when_value_should_be_emitted : std::optional<schedulers::time_point>{};
45 }
46
47 std::variant<std::monostate, T, schedulers::duration> extract_value_or_time()
48 {
49 std::lock_guard lock{m_mutex};
50 if (!m_time_when_value_should_be_emitted.has_value() || !m_value_to_be_emitted.has_value())
51 return std::monostate{};
52
53 const auto now = m_worker.now();
54 if (m_time_when_value_should_be_emitted > now)
55 return m_time_when_value_should_be_emitted.value() - now;
56
57 m_time_when_value_should_be_emitted.reset();
58 auto v = std::move(m_value_to_be_emitted).value();
59 m_value_to_be_emitted.reset();
60 return v;
61 }
62
63 std::optional<T> extract_value()
64 {
65 std::lock_guard lock{m_mutex};
66 std::optional<T> res{};
67 m_value_to_be_emitted.swap(res);
68 return res;
69 }
70
71 using Worker = decltype(std::declval<Scheduler>().create_worker(std::declval<composite_subscription>()));
72 const Worker& get_worker() const { return m_worker; }
73
74private:
75 schedulers::duration m_period;
76 Worker m_worker;
77 std::mutex m_mutex{};
78 std::optional<schedulers::time_point> m_time_when_value_should_be_emitted{};
79 std::optional<T> m_value_to_be_emitted{};
80};
81
83{
84 template<typename Value>
85 void operator()(Value&& v, const auto& state_ptr) const
86 {
87 if (const auto time_to_schedule = state_ptr->emplace_safe(std::forward<Value>(v)))
88 {
89 state_ptr->get_worker().schedule(time_to_schedule.value(),
90 [state_ptr]() mutable -> schedulers::optional_duration
91 {
92 auto value_or_duration = state_ptr->extract_value_or_time();
93 if (auto* duration = std::get_if<schedulers::duration>(&value_or_duration))
94 return *duration;
95
96 if (auto* value = std::get_if<std::decay_t<Value>>(&value_or_duration))
97 state_ptr->subscriber.on_next(std::move(*value));
98
99 return std::nullopt;
100 });
101 }
102 }
103};
104
106{
107 void operator()(const std::exception_ptr& err, const auto& state) const
108 {
109 state->children_subscriptions.unsubscribe();
110 state->subscriber.on_error(err);
111 }
112};
113
115{
116 void operator()(const auto& state_ptr) const
117 {
118 state_ptr->children_subscriptions.unsubscribe();
119
120 if (auto v = state_ptr->extract_value())
121 state_ptr->subscriber.on_next(std::move(v.value()));
122
123 state_ptr->subscriber.on_completed();
124 }
125};
126
127template<typename T, typename Scheduler, typename TSub>
129{
131 schedulers::duration period,
132 const Scheduler& scheduler)
133 : debounce_state<T, Scheduler>{std::move(period), scheduler, sub.get_subscription()}
134 , subscriber(make_serialized_subscriber(std::forward<decltype(sub)>(sub), std::ref(spinlock))) {}
135
136 // spinlock because most part of time there is only one thread would be active
137 utils::spinlock spinlock{};
138
139 using InnerSub = decltype(make_serialized_subscriber(std::declval<TSub>(), std::declval<std::reference_wrapper<utils::spinlock>>()));
140 InnerSub subscriber;
141};
142
143template<constraint::decayed_type Type,schedulers::constraint::scheduler TScheduler>
145{
146 schedulers::duration period;
147 TScheduler scheduler;
148
149 template<constraint::subscriber_of_type<Type> TSub>
150 auto operator()(TSub&& in_subscriber) const
151 {
152 auto state = std::make_shared<debounce_state_with_serialized_spinlock<Type, TScheduler, std::decay_t<TSub>>>(std::forward<TSub>(in_subscriber), period, scheduler);
153
154 return create_subscriber_with_state<Type>(state->children_subscriptions,
158 std::move(state));
159 }
160};
161} // namespace rpp::details
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
Definition: debounce.hpp:31
Definition: debounce.hpp:145
Definition: debounce.hpp:115
Definition: debounce.hpp:106
Definition: debounce.hpp:83
Definition: early_unsubscribe.hpp:19