21 template<rpp::constra
int::observer Observer,
typename Worker, rpp::details::disposables::constra
int::disposables_container Container>
24 template<rpp::constra
int::observer Observer,
typename Worker, rpp::details::disposables::constra
int::disposables_container Container>
34 template<rpp::constra
int::observer Observer,
typename Worker, rpp::details::disposables::constra
int::disposables_container Container>
38 using T = rpp::utils::extract_observer_type_t<Observer>;
41 debounce_disposable(Observer&& in_observer, Worker&& in_worker, rpp::schedulers::duration period)
42 : m_observer(std::move(in_observer))
43 , m_worker{std::move(in_worker)}
49 void emplace_safe(TT&& v)
51 std::lock_guard lock{m_mutex};
52 m_value_to_be_emitted.emplace(std::forward<TT>(v));
53 const bool need_to_scheduled = !m_time_when_value_should_be_emitted.has_value();
54 m_time_when_value_should_be_emitted = m_worker.now() + m_period;
55 if (need_to_scheduled)
61 std::optional<T> extract_value()
63 std::lock_guard lock{m_mutex};
64 return std::exchange(m_value_to_be_emitted, std::optional<T>{});
67 rpp::utils::pointer_under_lock<Observer> get_observer_under_lock() {
return m_observer; }
73 m_time_when_value_should_be_emitted.value(),
75 auto value_or_duration = handler.disposable->extract_value_or_time();
76 if (auto* timepoint = std::get_if<schedulers::time_point>(&value_or_duration))
77 return schedulers::optional_delay_to{*timepoint};
79 if (
auto* value = std::get_if<T>(&value_or_duration))
80 handler.disposable->get_observer_under_lock()->on_next(std::move(*value));
87 std::variant<std::monostate, T, schedulers::time_point> extract_value_or_time()
89 std::lock_guard lock{m_mutex};
90 if (!m_time_when_value_should_be_emitted.has_value() || !m_value_to_be_emitted.has_value())
91 return std::monostate{};
93 if (m_time_when_value_should_be_emitted > m_worker.now())
94 return m_time_when_value_should_be_emitted.value();
96 m_time_when_value_should_be_emitted.reset();
97 auto v = std::move(m_value_to_be_emitted).value();
98 m_value_to_be_emitted.reset();
103 RPP_NO_UNIQUE_ADDRESS Worker m_worker;
104 rpp::schedulers::duration m_period;
106 std::mutex m_mutex{};
107 std::optional<schedulers::time_point> m_time_when_value_should_be_emitted{};
108 std::optional<T> m_value_to_be_emitted{};
111 template<rpp::constra
int::observer Observer,
typename Worker, rpp::details::disposables::constra
int::disposables_container Container>
114 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
116 std::shared_ptr<debounce_disposable<Observer, Worker, Container>> disposable{};
123 bool is_disposed()
const
125 return disposable->is_disposed();
129 void on_next(T&& v)
const
131 disposable->emplace_safe(std::forward<T>(v));
134 void on_error(
const std::exception_ptr& err)
const noexcept
136 disposable->get_observer_under_lock()->on_error(err);
139 void on_completed()
const noexcept
141 const auto observer = disposable->get_observer_under_lock();
142 if (
const auto value = disposable->extract_value())
151 template<rpp::constra
int::decayed_type T>
154 using result_type = T;
157 template<rpp::details::observables::constra
int::disposables_strategy Prev>
160 rpp::schedulers::duration duration;
161 RPP_NO_UNIQUE_ADDRESS Scheduler scheduler;
163 template<rpp::constra
int::decayed_type Type, rpp::details::observables::constra
int::disposables_strategy DisposableStrategy, rpp::constra
int::observer Observer>
164 auto lift_with_disposables_strategy(Observer&&
observer)
const
166 using worker_t = rpp::schedulers::utils::get_worker_t<Scheduler>;
167 using container =
typename DisposableStrategy::disposables_container;
171 auto ptr = disposable.lock();
172 ptr->get_observer_under_lock()->set_upstream(disposable.as_weak());
auto debounce(rpp::schedulers::duration period, Scheduler &&scheduler)
Only emit emission if specified period of time has passed without any other emission....
Definition debounce.hpp:199