ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
throttle.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/operators/details/strategy.hpp>
17#include <rpp/schedulers/immediate.hpp>
18
19#include <optional>
20#include <type_traits>
21
22namespace rpp::operators::details
23{
// Observer-side state and callbacks defined in throttle.hpp: a value is passed downstream only when
// nothing has been passed yet or at least `duration` has elapsed since the last passed value.
// NOTE(review): listing line 25 (the declaration that names this type) is absent from this extract;
// presumably this is the strategy used by throttle_t below - confirm against the real header.
24 template<rpp::constraint::observer TObserver, rpp::schedulers::constraint::scheduler Scheduler>
26 {
27 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
28
// Downstream observer; every callback below delegates to it.
29 RPP_NO_UNIQUE_ADDRESS TObserver observer;
// Minimum interval required between two forwarded values.
30 rpp::schedulers::duration duration{};
// Time point of the most recently forwarded value; empty until the first value is forwarded.
// Declared mutable because it is updated from the const on_next below.
31 mutable std::optional<rpp::schedulers::time_point> last_emission_time_point{};
32
// Forwards `v` to the downstream observer if no value has been forwarded yet, or if the current
// time (taken from the Scheduler's worker type) is at least `duration` past the last forwarded
// value; in that case the current time is recorded. Otherwise `v` is dropped.
33 template<typename T>
34 void on_next(T&& v) const
35 {
36 const auto now = rpp::schedulers::utils::get_worker_t<Scheduler>::now();
37 if (!last_emission_time_point || now >= last_emission_time_point.value() + duration)
38 {
39 observer.on_next(std::forward<T>(v));
// Recorded only for forwarded values, so the interval is measured from the last value that got through.
40 last_emission_time_point = now;
41 }
42 }
43
// Errors are passed to the downstream observer unchanged (no throttling applied).
44 void on_error(const std::exception_ptr& err) const { observer.on_error(err); }
45
// Completion is passed to the downstream observer unchanged.
46 void on_completed() const { observer.on_completed(); }
47
// Hands the upstream disposable to the downstream observer.
48 void set_upstream(const disposable_wrapper& d) { observer.set_upstream(d); }
49
// Reports the downstream observer's disposed state.
50 bool is_disposed() const { return observer.is_disposed(); }
51 };
52
// Operator type for throttle, templated on the scheduler type. It derives from
// lift_operator<throttle_t<Scheduler>, rpp::schedulers::duration> and reuses its constructors.
// NOTE(review): the duration template argument is presumably the value stored by lift_operator and
// handed to the observer strategy - confirm against rpp/operators/details/strategy.hpp.
53 template<rpp::schedulers::constraint::scheduler Scheduler>
54 struct throttle_t : lift_operator<throttle_t<Scheduler>, rpp::schedulers::duration>
55 {
// Inherit the base class constructors.
56 using lift_operator<throttle_t<Scheduler>, rpp::schedulers::duration>::lift_operator;
57
// NOTE(review): listing line 59 (the declaration of the nested per-type traits struct) is absent
// from this extract.
58 template<rpp::constraint::decayed_type T>
60 {
// The result type is the same T the traits are instantiated with.
61 using result_type = T;
62
// NOTE(review): listing line 64 (the alias declared under this template header) is absent from
// this extract.
63 template<rpp::constraint::observer_of_type<result_type> TObserver>
65 };
66
// The disposables strategy is passed through as-is: the alias resolves to Prev itself.
67 template<rpp::details::observables::constraint::disposables_strategy Prev>
68 using updated_optimal_disposables_strategy = Prev;
69 };
70} // namespace rpp::operators::details
71
72namespace rpp::operators
73{
// Public entry point for the throttle operator; `period` is the throttling duration.
// The default for Scheduler is commented out and Scheduler does not appear in the parameter list,
// so callers have to spell the scheduler type explicitly.
// NOTE(review): listing line 104 (the function body) and the original doc comment (listing lines
// 74-100) are absent from this extract.
101 template<rpp::schedulers::constraint::scheduler Scheduler /* = rpp::schedulers::immediate*/>
102 auto throttle(rpp::schedulers::duration period)
103 {
105 }
106} // namespace rpp::operators
void on_next(const Type &v) const noexcept
Observable calls this method to notify observer about new value.
Definition observer.hpp:84
void set_upstream(const disposable_wrapper &d) noexcept
Observable calls this method to pass disposable. Observer disposes this disposable WHEN observer want...
Definition observer.hpp:49
void on_completed() const noexcept
Observable calls this method to notify observer about completion of emissions.
Definition observer.hpp:135
void on_error(const std::exception_ptr &err) const noexcept
Observable calls this method to notify observer about some error during generation next data.
Definition observer.hpp:120
bool is_disposed() const noexcept
Observable calls this method to check if observer interested or not in emissions.
Definition observer.hpp:74
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition strategy.hpp:28
auto throttle(rpp::schedulers::duration period)
Emits an item from the Observable, then ignores subsequent items for the given duration.
Definition throttle.hpp:98
Definition throttle.hpp:55