ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
interval.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/sources/fwd.hpp>
13
14#include <rpp/defs.hpp>
15#include <rpp/observables/observable.hpp>
16
17namespace rpp::details
18{
20 {
// Call operator of the schedulable passed to worker.schedule() by subscribe().
//
// Emits the current value of `counter` to `observer` via on_next and then
// increments it; `counter` is taken by non-const reference, so the increment
// is visible in the caller-provided storage.
//
// observer - receiver of the emitted value (only on_next is called here).
// period   - duration wrapped into the returned value.
// counter  - value to emit; post-incremented on every call.
//
// Returns an optional_delay_from_this_timepoint built from `period`.
// NOTE(review): presumably this asks the scheduler to run the schedulable
// again after `period`, measured from the current time point - confirm
// against the scheduler documentation.
21 rpp::schedulers::optional_delay_from_this_timepoint operator()(const auto& observer, rpp::schedulers::duration period, size_t& counter) const
22 {
// Post-increment: the observer receives the value held before the increment.
23 observer.on_next(counter++);
24 return rpp::schedulers::optional_delay_from_this_timepoint{period};
25 }
26 };
27
28 template<typename TScheduler, typename TimePointOrDuration>
30 {
31 using value_type = size_t;
33
34 RPP_NO_UNIQUE_ADDRESS TScheduler scheduler;
35 TimePointOrDuration initial;
36 rpp::schedulers::duration period;
37
// Subscribes `observer` to this interval source.
//
// Creates a worker from the stored `scheduler` and schedules an
// interval_schedulable on it, using the stored `initial` (a time point or a
// duration, depending on TimePointOrDuration) as the first scheduling argument.
// The forwarded observer, the stored `period` and a zero-initialized size_t
// counter are handed to the worker as additional arguments.
//
// TObs must satisfy rpp::constraint::observer_of_type<value_type>
// (value_type is size_t in this struct).
38 template<rpp::constraint::observer_of_type<value_type> TObs>
39 void subscribe(TObs&& observer) const
40 {
41 const auto worker = scheduler.create_worker();
// size_t{} is the starting counter value (0).
42 worker.schedule(initial, interval_schedulable{}, std::forward<TObs>(observer), period, size_t{});
43 }
44 };
45} // namespace rpp::details
46
47namespace rpp
48{
49 template<schedulers::constraint::scheduler TScheduler, typename TimePointOrDuration>
51} // namespace rpp
52
53namespace rpp::source
54{
55
// Builds an interval_observable whose first scheduling argument is a duration.
//
// initial   - duration stored as the observable's `initial` value.
// period    - duration stored as the observable's `period` value.
// scheduler - scheduler to use; perfectly forwarded and stored as
//             std::decay_t<TScheduler>.
//
// Returns interval_observable<std::decay_t<TScheduler>, rpp::schedulers::duration>.
// NOTE(review): presumably `initial` is the delay before the first emission
// and `period` the delay between subsequent ones - confirm against the
// operator documentation.
74 template<schedulers::constraint::scheduler TScheduler>
75 auto interval(rpp::schedulers::duration initial, rpp::schedulers::duration period, TScheduler&& scheduler)
76 {
77 return interval_observable<std::decay_t<TScheduler>, rpp::schedulers::duration>{std::forward<TScheduler>(scheduler), initial, period};
78 }
79
// Builds an interval_observable whose first scheduling argument is a time point.
//
// initial   - time point stored as the observable's `initial` value.
// period    - duration stored as the observable's `period` value.
// scheduler - scheduler to use; perfectly forwarded and stored as
//             std::decay_t<TScheduler>.
//
// Returns interval_observable<std::decay_t<TScheduler>, rpp::schedulers::time_point>.
// NOTE(review): presumably `initial` is the absolute time of the first
// emission - confirm against the operator documentation.
90 template<schedulers::constraint::scheduler TScheduler>
91 auto interval(rpp::schedulers::time_point initial, rpp::schedulers::duration period, TScheduler&& scheduler)
92 {
93 return interval_observable<std::decay_t<TScheduler>, rpp::schedulers::time_point>{std::forward<TScheduler>(scheduler), initial, period};
94 }
95
// Convenience overload taking a single duration.
//
// Calls interval(period, period, scheduler), i.e. the same duration is passed
// as both the `initial` and the `period` argument.
//
// period    - duration used for both arguments of the delegated call.
// scheduler - scheduler to use; perfectly forwarded to the delegated call.
112 template<schedulers::constraint::scheduler TScheduler>
113 auto interval(rpp::schedulers::duration period, TScheduler&& scheduler)
114 {
115 return interval(period, period, std::forward<TScheduler>(scheduler));
116 }
117} // namespace rpp::source
void on_next(const Type &v) const noexcept
Observable calls this method to notify observer about new value.
Definition observer.hpp:84
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
auto interval(rpp::schedulers::duration initial, rpp::schedulers::duration period, TScheduler &&scheduler)
Creates rpp::observable that emits a sequential integer every specified time interval,...
Definition interval.hpp:72
Definition interval.hpp:20
Definition interval.hpp:30
Definition disposables_strategy.hpp:29