ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
debounce.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/disposables/composite_disposable.hpp>
16#include <rpp/operators/details/strategy.hpp>
17#include <rpp/utils/utils.hpp>
18
19namespace rpp::operators::details
20{
21 template<rpp::constraint::observer Observer, typename Worker, rpp::details::disposables::constraint::disposables_container Container>
23
24 template<rpp::constraint::observer Observer, typename Worker, rpp::details::disposables::constraint::disposables_container Container>
26 {
27 std::shared_ptr<debounce_disposable<Observer, Worker, Container>> disposable{};
28
29 bool is_disposed() const { return disposable->is_disposed(); }
30
31 void on_error(const std::exception_ptr& err) const { disposable->get_observer_under_lock()->on_error(err); }
32 };
33
34 template<rpp::constraint::observer Observer, typename Worker, rpp::details::disposables::constraint::disposables_container Container>
35 class debounce_disposable final : public rpp::composite_disposable_impl<Container>
36 , public rpp::details::enable_wrapper_from_this<debounce_disposable<Observer, Worker, Container>>
37 {
38 using T = rpp::utils::extract_observer_type_t<Observer>;
39
40 public:
41 debounce_disposable(Observer&& in_observer, Worker&& in_worker, rpp::schedulers::duration period)
42 : m_observer(std::move(in_observer))
43 , m_worker{std::move(in_worker)}
44 , m_period{period}
45 {
46 }
47
48 template<typename TT>
49 void emplace_safe(TT&& v)
50 {
51 std::lock_guard lock{m_mutex};
52 m_value_to_be_emitted.emplace(std::forward<TT>(v));
53 const bool need_to_scheduled = !m_time_when_value_should_be_emitted.has_value();
54 m_time_when_value_should_be_emitted = m_worker.now() + m_period;
55 if (need_to_scheduled)
56 {
57 schedule();
58 }
59 }
60
61 std::optional<T> extract_value()
62 {
63 std::lock_guard lock{m_mutex};
64 return std::exchange(m_value_to_be_emitted, std::optional<T>{});
65 }
66
67 rpp::utils::pointer_under_lock<Observer> get_observer_under_lock() { return m_observer; }
68
69 private:
70 void schedule()
71 {
72 m_worker.schedule(
73 m_time_when_value_should_be_emitted.value(),
74 [](const debounce_disposable_wrapper<Observer, Worker, Container>& handler) -> schedulers::optional_delay_to {
75 auto value_or_duration = handler.disposable->extract_value_or_time();
76 if (auto* timepoint = std::get_if<schedulers::time_point>(&value_or_duration))
77 return schedulers::optional_delay_to{*timepoint};
78
79 if (auto* value = std::get_if<T>(&value_or_duration))
80 handler.disposable->get_observer_under_lock()->on_next(std::move(*value));
81
82 return std::nullopt;
83 },
84 debounce_disposable_wrapper<Observer, Worker, Container>{this->wrapper_from_this().lock()});
85 }
86
87 std::variant<std::monostate, T, schedulers::time_point> extract_value_or_time()
88 {
89 std::lock_guard lock{m_mutex};
90 if (!m_time_when_value_should_be_emitted.has_value() || !m_value_to_be_emitted.has_value())
91 return std::monostate{};
92
93 if (m_time_when_value_should_be_emitted > m_worker.now())
94 return m_time_when_value_should_be_emitted.value();
95
96 m_time_when_value_should_be_emitted.reset();
97 auto v = std::move(m_value_to_be_emitted).value();
98 m_value_to_be_emitted.reset();
99 return v;
100 }
101
103 RPP_NO_UNIQUE_ADDRESS Worker m_worker;
104 rpp::schedulers::duration m_period;
105
106 std::mutex m_mutex{};
107 std::optional<schedulers::time_point> m_time_when_value_should_be_emitted{};
108 std::optional<T> m_value_to_be_emitted{};
109 };
110
111 template<rpp::constraint::observer Observer, typename Worker, rpp::details::disposables::constraint::disposables_container Container>
113 {
114 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
115
116 std::shared_ptr<debounce_disposable<Observer, Worker, Container>> disposable{};
117
118 void set_upstream(const rpp::disposable_wrapper& d) const
119 {
120 disposable->add(d);
121 }
122
123 bool is_disposed() const
124 {
125 return disposable->is_disposed();
126 }
127
128 template<typename T>
129 void on_next(T&& v) const
130 {
131 disposable->emplace_safe(std::forward<T>(v));
132 }
133
134 void on_error(const std::exception_ptr& err) const noexcept
135 {
136 disposable->get_observer_under_lock()->on_error(err);
137 }
138
139 void on_completed() const noexcept
140 {
141 const auto observer = disposable->get_observer_under_lock();
142 if (const auto value = disposable->extract_value())
143 observer->on_next(std::move(value).value());
145 }
146 };
147
148 template<rpp::schedulers::constraint::scheduler Scheduler>
150 {
151 template<rpp::constraint::decayed_type T>
153 {
154 using result_type = T;
155 };
156
157 template<rpp::details::observables::constraint::disposables_strategy Prev>
158 using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>;
159
160 rpp::schedulers::duration duration;
161 RPP_NO_UNIQUE_ADDRESS Scheduler scheduler;
162
163 template<rpp::constraint::decayed_type Type, rpp::details::observables::constraint::disposables_strategy DisposableStrategy, rpp::constraint::observer Observer>
164 auto lift_with_disposables_strategy(Observer&& observer) const
165 {
166 using worker_t = rpp::schedulers::utils::get_worker_t<Scheduler>;
167 using container = typename DisposableStrategy::disposables_container;
168
169
170 const auto disposable = disposable_wrapper_impl<debounce_disposable<std::decay_t<Observer>, worker_t, container>>::make(std::forward<Observer>(observer), scheduler.create_worker(), duration);
171 auto ptr = disposable.lock();
172 ptr->get_observer_under_lock()->set_upstream(disposable.as_weak());
173 return rpp::observer<Type, debounce_observer_strategy<std::decay_t<Observer>, worker_t, container>>{std::move(ptr)};
174 }
175 };
176} // namespace rpp::operators::details
177
178namespace rpp::operators
179{
/**
 * @brief Only emit emission if specified period of time has passed without any other emission.
 *
 * @ingroup utility_operators
 * @see https://reactivex.io/documentation/operators/debounce.html
 */
202 template<rpp::schedulers::constraint::scheduler Scheduler>
203 auto debounce(rpp::schedulers::duration period, Scheduler&& scheduler)
204 {
205 return details::debounce_t<std::decay_t<Scheduler>>{period, std::forward<Scheduler>(scheduler)};
206 }
207} // namespace rpp::operators
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:31
Definition disposable_wrapper.hpp:252
void on_next(const Type &v) const noexcept
Observable calls this method to notify observer about new value.
Definition observer.hpp:84
void on_completed() const noexcept
Observable calls this method to notify observer about completion of emissions.
Definition observer.hpp:135
Main RPP wrapper over disposables.
Definition disposable_wrapper.hpp:142
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition utils.hpp:260
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
auto debounce(rpp::schedulers::duration period, Scheduler &&scheduler)
Only emit emission if specified period of time has passed without any other emission....
Definition debounce.hpp:199
Definition disposables_strategy.hpp:29
Definition debounce.hpp:150