13#include <rpp/operators/lift.hpp>
14#include <rpp/operators/details/early_unsubscribe.hpp>
15#include <rpp/operators/details/serialized_subscriber.hpp>
16#include <rpp/operators/details/subscriber_with_state.hpp>
17#include <rpp/operators/fwd/debounce.hpp>
18#include <rpp/subscribers/constraints.hpp>
20#include <rpp/utils/spinlock.hpp>
25IMPLEMENTATION_FILE(debounce_tag);
29template<
typename T,
typename Scheduler>
36 , m_worker{scheduler.create_worker(children_subscriptions)} {}
38 std::optional<schedulers::time_point> emplace_safe(
auto&& v)
40 std::lock_guard lock{m_mutex};
41 m_value_to_be_emitted.emplace(std::forward<
decltype(v)>(v));
42 const bool need_to_scheduled = !m_time_when_value_should_be_emitted.has_value() || !m_value_to_be_emitted.has_value();
43 m_time_when_value_should_be_emitted = m_worker.now() + m_period;
44 return need_to_scheduled ? m_time_when_value_should_be_emitted : std::optional<schedulers::time_point>{};
// Decides, while holding m_mutex, what an emission attempt should do:
//  - std::monostate when either the deadline or the pending value is absent;
//  - the remaining duration (deadline - now) when the deadline is still ahead of m_worker.now();
//  - otherwise the deadline is cleared and the pending value is moved out into `v`.
// NOTE(review): the statement that hands `v` back to the caller is not visible in this excerpt;
// presumably `v` is returned as the T alternative of the variant - confirm against the full file.
47 std::variant<std::monostate, T, schedulers::duration> extract_value_or_time()
49 std::lock_guard lock{m_mutex};
50 if (!m_time_when_value_should_be_emitted.has_value() || !m_value_to_be_emitted.has_value())
51 return std::monostate{};
53 const auto now = m_worker.now();
// The deadline is known to hold a value here (checked above), so this compares the stored time point with `now`.
54 if (m_time_when_value_should_be_emitted > now)
55 return m_time_when_value_should_be_emitted.value() - now;
// Deadline reached: clear both the deadline and the stored value, keeping the value in the local `v`.
57 m_time_when_value_should_be_emitted.reset();
58 auto v = std::move(m_value_to_be_emitted).value();
59 m_value_to_be_emitted.reset();
// Takes the pending value (if any) out of the state while holding m_mutex.
// m_value_to_be_emitted is swapped with an empty optional, so `res` receives the previous
// content and the member is left empty. The deadline member is not touched here.
// NOTE(review): the return statement is not visible in this excerpt; presumably `res` is returned - confirm.
63 std::optional<T> extract_value()
65 std::lock_guard lock{m_mutex};
66 std::optional<T> res{};
67 m_value_to_be_emitted.swap(res);
71 using Worker =
decltype(std::declval<Scheduler>().create_worker(std::declval<composite_subscription>()));
72 const Worker& get_worker()
const {
return m_worker; }
// Duration added to the worker's current time to compute the emission deadline.
75 schedulers::duration m_period;
// Deadline at which the pending value should be emitted; empty until a value is emplaced
// and again after the value has been extracted for emission.
78 std::optional<schedulers::time_point> m_time_when_value_should_be_emitted{};
// Most recently stored value that has not been emitted yet; empty when nothing is pending.
79 std::optional<T> m_value_to_be_emitted{};
// on_next handler: stores the incoming value in the state via emplace_safe and, when that call
// yields a time point, schedules a callable on the state's worker for that time point.
// NOTE(review): several lines of the scheduled callable (what happens with `duration`, and its
// final return) are not visible in this excerpt; the comments below cover only what is shown.
84 template<
typename Value>
85 void operator()(Value&& v,
const auto& state_ptr)
const
87 if (
const auto time_to_schedule = state_ptr->emplace_safe(std::forward<Value>(v)))
89 state_ptr->get_worker().schedule(time_to_schedule.value(),
90 [state_ptr]() mutable -> schedulers::optional_duration
// Ask the state whether a value is due now, a wait is still required, or nothing is pending.
92 auto value_or_duration = state_ptr->extract_value_or_time();
93 if (auto* duration = std::get_if<schedulers::duration>(&value_or_duration))
// The value alternative is looked up by std::decay_t<Value>.
// NOTE(review): presumably this must be the same type as the state's T - confirm.
96 if (auto* value = std::get_if<std::decay_t<Value>>(&value_or_duration))
97 state_ptr->subscriber.on_next(std::move(*value));
107 void operator()(
const std::exception_ptr& err,
const auto& state)
const
109 state->children_subscriptions.unsubscribe();
110 state->subscriber.on_error(err);
116 void operator()(
const auto& state_ptr)
const
118 state_ptr->children_subscriptions.unsubscribe();
120 if (
auto v = state_ptr->extract_value())
121 state_ptr->subscriber.on_next(std::move(v.value()));
123 state_ptr->subscriber.on_completed();
// State flavour that carries its own spinlock together with a subscriber wrapped by
// make_serialized_subscriber around that spinlock.
// NOTE(review): the struct header, the start of the constructor and the base/first member
// initializer are not visible in this excerpt.
127template<
typename T,
typename Scheduler,
typename TSub>
131 schedulers::duration period,
132 const Scheduler& scheduler)
// The wrapped subscriber receives a std::reference_wrapper to the `spinlock` member declared below.
134 , subscriber(make_serialized_subscriber(std::forward<
decltype(sub)>(sub), std::ref(spinlock))) {}
// Lock handed (by reference) to the serialized subscriber.
137 utils::spinlock spinlock{};
// Type produced by make_serialized_subscriber for a TSub and a reference_wrapper<utils::spinlock>.
139 using InnerSub =
decltype(make_serialized_subscriber(std::declval<TSub>(), std::declval<std::reference_wrapper<utils::spinlock>>()));
// Holds the debounce parameters (period and scheduler) and, when invoked with a subscriber,
// builds the per-subscription state.
// NOTE(review): the struct header and the tail of the return statement are not visible in this excerpt.
143template<constra
int::decayed_type Type,schedulers::constra
int::scheduler TScheduler>
146 schedulers::duration period;
147 TScheduler scheduler;
149 template<constra
int::subscriber_of_type<Type> TSub>
150 auto operator()(TSub&& in_subscriber)
const
// A new shared state is created on every call from the forwarded subscriber, the period and the scheduler.
152 auto state = std::make_shared<debounce_state_with_serialized_spinlock<Type, TScheduler, std::decay_t<TSub>>>(std::forward<TSub>(in_subscriber), period, scheduler);
// The state's children_subscriptions is passed as the first argument of create_subscriber_with_state.
154 return create_subscriber_with_state<Type>(state->children_subscriptions,
rpp::subscription_base with the ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
Definition: debounce.hpp:31
Definition: debounce.hpp:145
Definition: debounce.hpp:115
Definition: debounce.hpp:106
Definition: debounce.hpp:83
Definition: debounce.hpp:129
Definition: early_unsubscribe.hpp:19