ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
fwd.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/observables/fwd.hpp>
14#include <rpp/schedulers/fwd.hpp>
15#include <rpp/subjects/fwd.hpp>
16
17#include <rpp/memory_model.hpp>
18#include <rpp/utils/constraints.hpp>
19#include <rpp/utils/utils.hpp>
20
21namespace rpp::operators
22{
23 auto as_blocking();
24
25 auto buffer(size_t count);
26
27 auto concat();
28
29 template<typename TSelector, rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
30 requires (!rpp::constraint::observable<TSelector> && (!utils::is_not_template_callable<TSelector> || std::invocable<TSelector, rpp::utils::convertible_to_any, utils::extract_observable_type_t<TObservable>, utils::extract_observable_type_t<TObservables>...>))
31 auto combine_latest(TSelector&& selector, TObservable&& observable, TObservables&&... observables);
32
33 template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
34 auto combine_latest(TObservable&& observable, TObservables&&... observables);
35
36 template<rpp::schedulers::constraint::scheduler Scheduler>
37 auto debounce(rpp::schedulers::duration period, Scheduler&& scheduler);
38
39 template<rpp::schedulers::constraint::scheduler Scheduler>
40 auto delay(rpp::schedulers::duration delay_duration, Scheduler&& scheduler);
41
42 auto distinct();
43
44 template<typename EqualityFn = rpp::utils::equal_to>
45 requires (!utils::is_not_template_callable<EqualityFn> || std::same_as<bool, std::invoke_result_t<EqualityFn, rpp::utils::convertible_to_any, rpp::utils::convertible_to_any>>)
46 auto distinct_until_changed(EqualityFn&& equality_fn = {});
47
48 auto element_at(size_t index);
49
50 auto first();
51
52 template<typename Fn>
53 requires (!utils::is_not_template_callable<Fn> || std::same_as<bool, std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
54 auto filter(Fn&& predicate);
55
56 template<rpp::constraint::is_nothrow_invocable LastFn>
57 auto finally(LastFn&& lastFn);
58
59 template<typename Fn>
60 requires (!utils::is_not_template_callable<Fn> || rpp::constraint::observable<std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
61 auto flat_map(Fn&& callable);
62
63 template<typename KeySelector,
64 typename ValueSelector = std::identity,
65 typename KeyComparator = rpp::utils::less>
66 requires (
67 (!utils::is_not_template_callable<KeySelector> || !std::same_as<void, std::invoke_result_t<KeySelector, rpp::utils::convertible_to_any>>) && (!utils::is_not_template_callable<ValueSelector> || !std::same_as<void, std::invoke_result_t<ValueSelector, rpp::utils::convertible_to_any>>) && (!utils::is_not_template_callable<KeyComparator> || std::strict_weak_order<KeyComparator, rpp::utils::convertible_to_any, rpp::utils::convertible_to_any>))
68 auto group_by(KeySelector&& key_selector, ValueSelector&& value_selector = {}, KeyComparator&& comparator = {});
69
70 auto last();
71
72 template<typename Fn>
73 requires (!utils::is_not_template_callable<Fn> || !std::same_as<void, std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
74 auto map(Fn&& callable);
75
76 template<rpp::constraint::subject Subject>
77 auto multicast(Subject&& subject);
78
79 template<template<typename> typename Subject = rpp::subjects::publish_subject>
80 auto multicast();
81
82 template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
83 requires constraint::observables_of_same_type<std::decay_t<TObservable>, std::decay_t<TObservables>...>
84 auto merge_with(TObservable&& observable, TObservables&&... observables);
85 auto merge();
86
87 template<rpp::schedulers::constraint::scheduler Scheduler>
88 auto observe_on(Scheduler&& scheduler, rpp::schedulers::duration delay_duration = {});
89
90 auto publish();
91
92 template<typename Seed, typename Accumulator>
93 requires (!utils::is_not_template_callable<Accumulator> || std::same_as<std::decay_t<Seed>, std::invoke_result_t<Accumulator, std::decay_t<Seed> &&, rpp::utils::convertible_to_any>>)
94 auto reduce(Seed&& seed, Accumulator&& accumulator);
95
96 template<typename Accumulator>
97 auto reduce(Accumulator&& accumulator);
98
99 auto ref_count();
100
101 auto repeat(size_t count);
102
103 auto repeat();
104
105 auto retry(size_t count);
106
107 auto retry();
108
109 template<typename InitialValue, typename Fn>
110 requires (!utils::is_not_template_callable<Fn> || std::same_as<std::decay_t<InitialValue>, std::invoke_result_t<Fn, std::decay_t<InitialValue> &&, rpp::utils::convertible_to_any>>)
111 auto scan(InitialValue&& initial_value, Fn&& accumulator);
112
113 template<typename Fn>
114 auto scan(Fn&& accumulator);
115
116 auto skip(size_t count);
117
118 template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
119 requires constraint::observables_of_same_type<std::decay_t<TObservable>, std::decay_t<TObservables>...>
120 auto start_with(TObservable&& observable, TObservables&&... observables);
121
122 template<constraint::memory_model MemoryModel = memory_model::use_stack, typename T, typename... Ts>
123 requires ((rpp::constraint::decayed_same_as<T, Ts> && ...) && !(rpp::constraint::observable<T> || (rpp::constraint::observable<Ts> || ...)))
124 auto start_with(T&& v, Ts&&... vals);
125
126 template<constraint::memory_model MemoryModel = memory_model::use_stack, rpp::schedulers::constraint::scheduler TScheduler, typename T, typename... Ts>
127 requires ((rpp::constraint::decayed_same_as<T, Ts> && ...) && !(rpp::constraint::observable<T> || (rpp::constraint::observable<Ts> || ...)))
128 auto start_with(const TScheduler& scheduler, T&& v, Ts&&... vals);
129
130 template<constraint::memory_model MemoryModel = memory_model::use_stack, typename T, typename... Ts>
131 requires (rpp::constraint::decayed_same_as<T, Ts> && ...)
132 auto start_with_values(T&& v, Ts&&... vals);
133
134 template<constraint::memory_model MemoryModel = memory_model::use_stack, rpp::schedulers::constraint::scheduler TScheduler, typename T, typename... Ts>
135 requires (rpp::constraint::decayed_same_as<T, Ts> && ...)
136 auto start_with_values(const TScheduler& scheduler, T&& v, Ts&&... vals);
137
138 template<rpp::schedulers::constraint::scheduler Scheduler>
139 auto subscribe_on(Scheduler&& scheduler);
140
141 auto switch_on_next();
142
143 auto take(size_t count);
144
145 auto take_last(size_t count);
146
147 template<typename Fn>
148 requires (!utils::is_not_template_callable<Fn> || std::same_as<bool, std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
149 auto take_while(Fn&& predicate);
150
151 template<rpp::constraint::observable TObservable>
152 auto take_until(TObservable&& until_observable);
153
154 template<std::invocable<const std::exception_ptr&> OnError = rpp::utils::empty_function_t<std::exception_ptr>>
155 requires utils::is_not_template_callable<OnError>
156 auto tap(OnError&& on_error);
157
158 template<std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
159 auto tap(OnCompleted&& on_completed);
160
161 template<typename OnNext,
162 std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
163 auto tap(OnNext&& on_next,
164 OnCompleted&& on_completed);
165
166 template<typename OnNext = rpp::utils::empty_function_any_t,
167 std::invocable<const std::exception_ptr&> OnError = rpp::utils::empty_function_t<std::exception_ptr>,
168 std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
169 requires utils::is_not_template_callable<OnError>
170 auto tap(OnNext&& on_next = {},
171 OnError&& on_error = {},
172 OnCompleted&& on_completed = {});
173
174 template<rpp::constraint::observable TFallbackObservable, rpp::schedulers::constraint::scheduler TScheduler>
175 auto timeout(rpp::schedulers::duration period, TFallbackObservable&& fallback_observable, const TScheduler& scheduler);
176
177 template<rpp::schedulers::constraint::scheduler TScheduler>
178 auto timeout(rpp::schedulers::duration period, const TScheduler& scheduler);
179
180 template<rpp::schedulers::constraint::scheduler Scheduler = rpp::schedulers::immediate>
181 auto throttle(rpp::schedulers::duration period);
182
183 template<typename Selector>
184 requires rpp::constraint::observable<std::invoke_result_t<Selector, std::exception_ptr>>
185 auto on_error_resume_next(Selector&& selector);
186
187 template<typename Notifier>
188 requires rpp::constraint::observable<std::invoke_result_t<Notifier, std::exception_ptr>>
189 auto retry_when(Notifier&& notifier);
190
191 template<typename TSelector, rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
192 requires (!rpp::constraint::observable<TSelector> && (!utils::is_not_template_callable<TSelector> || std::invocable<TSelector, rpp::utils::convertible_to_any, utils::extract_observable_type_t<TObservable>, utils::extract_observable_type_t<TObservables>...>))
193 auto with_latest_from(TSelector&& selector, TObservable&& observable, TObservables&&... observables);
194
195 template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
196 auto with_latest_from(TObservable&& observable, TObservables&&... observables);
197
198 auto window(size_t count);
199
200 template<rpp::constraint::observable TOpeningsObservable, typename TClosingsSelectorFn>
201 requires rpp::constraint::observable<std::invoke_result_t<TClosingsSelectorFn, rpp::utils::extract_observable_type_t<TOpeningsObservable>>>
202 auto window_toggle(TOpeningsObservable&& openings, TClosingsSelectorFn&& closings_selector);
203
204 template<typename TSelector, rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
205 requires (!rpp::constraint::observable<TSelector> && (!utils::is_not_template_callable<TSelector> || std::invocable<TSelector, rpp::utils::convertible_to_any, utils::extract_observable_type_t<TObservable>, utils::extract_observable_type_t<TObservables>...>))
206 auto zip(TSelector&& selector, TObservable&& observable, TObservables&&... observables);
207
208 template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
209 auto zip(TObservable&& observable, TObservables&&... observables);
210} // namespace rpp::operators
211
212namespace rpp
213{
214 namespace ops = operators;
215} // namespace rpp
auto reduce(Seed &&seed, Accumulator &&accumulator)
Apply a function to each item emitted by an Observable, sequentially, and emit the final value.
Definition reduce.hpp:143
auto merge_with(TObservable &&observable, TObservables &&... observables)
Combines submissions from current observable with other observables into one.
Definition merge.hpp:252
auto merge()
Converts observable of observables of items into observable of items via merging emissions.
Definition merge.hpp:221
auto start_with_values(T &&v, Ts &&... vals)
Combines submissions from current observable with values into one but without overlapping and startin...
Definition start_with.hpp:112
auto switch_on_next()
Converts observable of observables into observable of values which emits values from most recent unde...
Definition switch_on_next.hpp:187
auto with_latest_from(TSelector &&selector, TObservable &&observable, TObservables &&... observables)
Combines latest emissions from observables with emission from current observable when it sends new va...
Definition with_latest_from.hpp:197
auto combine_latest(TSelector &&selector, TObservable &&observable, TObservables &&... observables)
Combines latest emissions from observables with emission from current observable when any observable ...
Definition combine_latest.hpp:98
auto start_with(TObservable &&observable, TObservables &&... observables)
Combines submissions from current observable with other observables into one but without overlapping ...
Definition start_with.hpp:85
auto zip(TSelector &&selector, TObservable &&observable, TObservables &&... observables)
combines emissions from observables and emit single items for each combination based on the results o...
Definition zip.hpp:98
auto take_while(Fn &&predicate)
Sends items from observable while items are satisfy predicate. When condition becomes false -> sends ...
Definition take_while.hpp:91
auto take_until(TObservable &&until_observable)
Discard any items emitted by an Observable after a second Observable emits an item or terminates.
Definition take_until.hpp:142
auto publish()
Converts ordinary observable to rpp::connectable_observable with help of inline instsantiated publish...
Definition publish.hpp:22
auto ref_count()
Forces rpp::connectable_observable to behave like common observable.
Definition ref_count.hpp:29
auto multicast()
Converts ordinary observable to rpp::connectable_observable with help of inline instsantiated subject...
Definition multicast.hpp:85
auto concat()
Make observable which would merge emissions from underlying observables but without overlapping (curr...
Definition concat.hpp:239
auto on_error_resume_next(Selector &&selector)
If an error occurs, take the result from the Selector and subscribe to that instead.
Definition on_error_resume_next.hpp:157
auto retry()
The infinite retry operator continuously attempts to resubscribe to the observable upon error,...
Definition retry.hpp:167
auto element_at(size_t index)
Emit item located at specified index location in the sequence of items emitted by the source observab...
Definition element_at.hpp:97
auto filter(Fn &&predicate)
Emit only those items from an Observable that satisfies a provided predicate.
Definition filter.hpp:91
auto first()
Emit only the first item.
Definition first.hpp:87
auto last()
Emit only the last item provided before on_completed.
Definition last.hpp:95
auto distinct()
For each item from this observable, filter out repeated values and emit only items that have not alre...
Definition distinct.hpp:82
auto skip(size_t count)
Skip first count items provided by observable then send rest items as expected.
Definition skip.hpp:89
auto distinct_until_changed(EqualityFn &&equality_fn)
Suppress consecutive duplicates of emissions from original observable.
Definition distinct_until_changed.hpp:97
auto take_last(size_t count)
Emit only last count items provided by observable, then send on_completed
Definition take_last.hpp:119
auto throttle(rpp::schedulers::duration period)
Emit emission from an Observable and then ignore subsequent values during duration of time.
Definition throttle.hpp:98
auto take(size_t count)
Emit only first count items provided by observable, then send on_completed
Definition take.hpp:92
auto map(Fn &&callable)
Transforms the items emitted by an Observable via applying a function to each item and emitting resul...
Definition map.hpp:94
auto group_by(KeySelector &&key_selector, ValueSelector &&value_selector={}, KeyComparator &&comparator={})
Divide original observable into multiple observables where each new observable emits some group of va...
Definition group_by.hpp:213
auto flat_map(Fn &&callable)
Transform the items emitted by an Observable into Observables, then flatten the emissions from those ...
Definition flat_map.hpp:64
auto buffer(size_t count)
Periodically gather emissions emitted by an original Observable into bundles and emit these bundles r...
Definition buffer.hpp:107
auto window(size_t count)
Subdivide original observable into sub-observables (window observables) and emit sub-observables of i...
Definition window.hpp:150
auto scan(InitialValue &&initial_value, Fn &&accumulator)
Apply accumulator function for each emission from observable and result of accumulator from previous ...
Definition scan.hpp:151
auto window_toggle(TOpeningsObservable &&openings, TClosingsSelectorFn &&closings_selector)
Subdivide original observable into sub-observables (window observables) and emit sub-observables of i...
Definition window_toggle.hpp:236
auto tap(OnError &&on_error)
Register callbacks to inspect observable emissions and perform side-effects.
Definition tap.hpp:94
auto observe_on(Scheduler &&scheduler, rpp::schedulers::duration delay_duration={})
Specify the Scheduler on which an observer will observe this Observable.
Definition observe_on.hpp:38
auto repeat()
Repeats the Observabe's sequence of emissions infinite amount of times via re-subscribing on it durin...
Definition repeat.hpp:86
auto debounce(rpp::schedulers::duration period, Scheduler &&scheduler)
Only emit emission if specified period of time has passed without any other emission....
Definition debounce.hpp:199
auto as_blocking()
Converts rpp::observable to rpp::blocking_observable
Definition as_blocking.hpp:47
auto timeout(rpp::schedulers::duration period, TFallbackObservable &&fallback_observable, const TScheduler &scheduler)
Forwards emissions from original observable, but subscribes on fallback observable if no any events d...
Definition timeout.hpp:203
auto subscribe_on(Scheduler &&scheduler)
OnSubscribe function for this observable will be scheduled via provided scheduler.
Definition subscribe_on.hpp:75
auto delay(rpp::schedulers::duration delay_duration, Scheduler &&scheduler)
Shift the emissions from an Observable forward in time by a particular amount.
Definition delay.hpp:219