13#include <rpp/observables/fwd.hpp>
14#include <rpp/schedulers/fwd.hpp>
15#include <rpp/subjects/fwd.hpp>
17#include <rpp/memory_model.hpp>
18#include <rpp/utils/constraints.hpp>
19#include <rpp/utils/utils.hpp>
21namespace rpp::operators
29 template<
typename TSelector, rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
30 requires (!rpp::constraint::observable<TSelector> && (!utils::is_not_template_callable<TSelector> || std::invocable<TSelector, rpp::utils::convertible_to_any, utils::extract_observable_type_t<TObservable>, utils::extract_observable_type_t<TObservables>...>))
31 auto combine_latest(TSelector&& selector, TObservable&& observable, TObservables&&... observables);
33 template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
34 auto combine_latest(TObservable&& observable, TObservables&&... observables);
36 template<rpp::schedulers::constra
int::scheduler Scheduler>
37 auto debounce(rpp::schedulers::duration period, Scheduler&& scheduler);
39 template<rpp::schedulers::constra
int::scheduler Scheduler>
40 auto delay(rpp::schedulers::duration delay_duration, Scheduler&& scheduler);
44 template<
typename EqualityFn = rpp::utils::equal_to>
45 requires (!utils::is_not_template_callable<EqualityFn> || std::same_as<bool, std::invoke_result_t<EqualityFn, rpp::utils::convertible_to_any, rpp::utils::convertible_to_any>>)
53 requires (!utils::is_not_template_callable<Fn> || std::same_as<bool, std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
54 auto filter(Fn&& predicate);
56 template<rpp::constra
int::is_nothrow_invocable LastFn>
57 auto finally(LastFn&& lastFn);
60 requires (!utils::is_not_template_callable<Fn> || rpp::constraint::observable<std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
63 template<
typename KeySelector,
64 typename ValueSelector = std::identity,
65 typename KeyComparator = rpp::utils::less>
67 (!utils::is_not_template_callable<KeySelector> || !std::same_as<void, std::invoke_result_t<KeySelector, rpp::utils::convertible_to_any>>) && (!utils::is_not_template_callable<ValueSelector> || !std::same_as<void, std::invoke_result_t<ValueSelector, rpp::utils::convertible_to_any>>) && (!utils::is_not_template_callable<KeyComparator> || std::strict_weak_order<KeyComparator, rpp::utils::convertible_to_any, rpp::utils::convertible_to_any>))
68 auto group_by(KeySelector&& key_selector, ValueSelector&& value_selector = {}, KeyComparator&& comparator = {});
73 requires (!utils::is_not_template_callable<Fn> || !std::same_as<void, std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
74 auto map(Fn&& callable);
76 template<rpp::constra
int::subject Subject>
79 template<
template<
typename>
typename Subject = rpp::subjects::publish_subject>
82 template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
83 requires constraint::observables_of_same_type<std::decay_t<TObservable>, std::decay_t<TObservables>...>
84 auto merge_with(TObservable&& observable, TObservables&&... observables);
87 template<rpp::schedulers::constra
int::scheduler Scheduler>
88 auto observe_on(Scheduler&& scheduler, rpp::schedulers::duration delay_duration = {});
92 template<
typename Seed,
typename Accumulator>
93 requires (!utils::is_not_template_callable<Accumulator> || std::same_as<std::decay_t<Seed>, std::invoke_result_t<Accumulator, std::decay_t<Seed> &&, rpp::utils::convertible_to_any>>)
94 auto reduce(Seed&& seed, Accumulator&& accumulator);
/**
 * @brief Forward declaration of the reduce overload that takes only an accumulator.
 *
 * No constraints are placed on Accumulator in this declaration.
 * NOTE(review): presumably the first emitted item serves as the seed - confirm against reduce.hpp.
 */
template<typename Accumulator>
auto reduce(Accumulator&& accumulator);
/**
 * @brief Forward declaration of the repeat overload taking a count.
 *
 * NOTE(review): the listing only summarizes the argument-less repeat() (infinite re-subscription);
 * presumably `count` bounds the number of repetitions - confirm against repeat.hpp.
 */
auto repeat(size_t count);
/**
 * @brief Forward declaration of the retry overload taking a count.
 *
 * NOTE(review): the listing only summarizes the argument-less retry() (infinite resubscription on
 * error); presumably `count` bounds the number of retries - confirm against retry.hpp.
 */
auto retry(size_t count);
109 template<
typename InitialValue,
typename Fn>
110 requires (!utils::is_not_template_callable<Fn> || std::same_as<std::decay_t<InitialValue>, std::invoke_result_t<Fn, std::decay_t<InitialValue> &&, rpp::utils::convertible_to_any>>)
111 auto scan(InitialValue&& initial_value, Fn&& accumulator);
/**
 * @brief Forward declaration of the scan overload that takes only an accumulator.
 *
 * No constraints are placed on Fn in this declaration.
 * NOTE(review): presumably the first emitted item serves as the initial value - confirm against
 * the operator's definition.
 */
template<typename Fn>
auto scan(Fn&& accumulator);
/**
 * @brief Forward declaration of the skip operator.
 *
 * Listing summary: skip first `count` items provided by observable, then send the rest of the
 * items as expected.
 */
auto skip(size_t count);
118 template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
119 requires constraint::observables_of_same_type<std::decay_t<TObservable>, std::decay_t<TObservables>...>
120 auto start_with(TObservable&& observable, TObservables&&... observables);
122 template<constraint::memory_model MemoryModel = memory_model::use_stack,
typename T,
typename... Ts>
123 requires ((rpp::constraint::decayed_same_as<T, Ts> && ...) && !(rpp::constraint::observable<T> || (rpp::constraint::observable<Ts> || ...)))
126 template<constraint::memory_model MemoryModel = memory_model::use_stack, rpp::schedulers::constraint::scheduler TScheduler,
typename T,
typename... Ts>
127 requires ((rpp::constraint::decayed_same_as<T, Ts> && ...) && !(rpp::constraint::observable<T> || (rpp::constraint::observable<Ts> || ...)))
128 auto start_with(
const TScheduler& scheduler, T&& v, Ts&&... vals);
130 template<constraint::memory_model MemoryModel = memory_model::use_stack,
typename T,
typename... Ts>
131 requires (rpp::constraint::decayed_same_as<T, Ts> && ...)
134 template<constraint::memory_model MemoryModel = memory_model::use_stack, rpp::schedulers::constraint::scheduler TScheduler,
typename T,
typename... Ts>
135 requires (rpp::constraint::decayed_same_as<T, Ts> && ...)
138 template<rpp::schedulers::constra
int::scheduler Scheduler>
/**
 * @brief Forward declaration of the take operator.
 *
 * Listing summary: emit only first `count` items provided by observable, then send on_completed.
 */
auto take(size_t count);
147 template<
typename Fn>
148 requires (!utils::is_not_template_callable<Fn> || std::same_as<bool, std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
151 template<rpp::constra
int::observable TObservable>
152 auto take_until(TObservable&& until_observable);
154 template<std::invocable<const std::exception_ptr&> OnError = rpp::utils::empty_function_t<std::exception_ptr>>
155 requires utils::is_not_template_callable<OnError>
156 auto tap(OnError&& on_error);
158 template<std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
159 auto tap(OnCompleted&& on_completed);
161 template<
typename OnNext,
162 std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
163 auto tap(OnNext&& on_next,
164 OnCompleted&& on_completed);
166 template<
typename OnNext = rpp::utils::empty_function_any_t,
167 std::invocable<const std::exception_ptr&> OnError = rpp::utils::empty_function_t<std::exception_ptr>,
168 std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
169 requires utils::is_not_template_callable<OnError>
170 auto tap(OnNext&& on_next = {},
171 OnError&& on_error = {},
172 OnCompleted&& on_completed = {});
174 template<rpp::constra
int::observable TFallbackObservable, rpp::schedulers::constra
int::scheduler TScheduler>
175 auto timeout(rpp::schedulers::duration period, TFallbackObservable&& fallback_observable,
const TScheduler& scheduler);
177 template<rpp::schedulers::constra
int::scheduler TScheduler>
178 auto timeout(rpp::schedulers::duration period,
const TScheduler& scheduler);
180 template<rpp::schedulers::constra
int::scheduler Scheduler = rpp::schedulers::immediate>
181 auto throttle(rpp::schedulers::duration period);
183 template<
typename Selector>
184 requires rpp::constraint::observable<std::invoke_result_t<Selector, std::exception_ptr>>
187 template<
typename Notifier>
188 requires rpp::constraint::observable<std::invoke_result_t<Notifier, std::exception_ptr>>
189 auto retry_when(Notifier&& notifier);
191 template<
typename TSelector, rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
192 requires (!rpp::constraint::observable<TSelector> && (!utils::is_not_template_callable<TSelector> || std::invocable<TSelector, rpp::utils::convertible_to_any, utils::extract_observable_type_t<TObservable>, utils::extract_observable_type_t<TObservables>...>))
193 auto with_latest_from(TSelector&& selector, TObservable&& observable, TObservables&&... observables);
195 template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
196 auto with_latest_from(TObservable&& observable, TObservables&&... observables);
/**
 * @brief Forward declaration of the window operator.
 *
 * NOTE(review): presumably `count` is the number of items per window - confirm against the
 * operator's definition, which is not part of this listing.
 */
auto window(size_t count);
200 template<rpp::constra
int::observable TOpeningsObservable,
typename TClosingsSelectorFn>
201 requires rpp::constraint::observable<std::invoke_result_t<TClosingsSelectorFn, rpp::utils::extract_observable_type_t<TOpeningsObservable>>>
202 auto window_toggle(TOpeningsObservable&& openings, TClosingsSelectorFn&& closings_selector);
204 template<
typename TSelector, rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
205 requires (!rpp::constraint::observable<TSelector> && (!utils::is_not_template_callable<TSelector> || std::invocable<TSelector, rpp::utils::convertible_to_any, utils::extract_observable_type_t<TObservable>, utils::extract_observable_type_t<TObservables>...>))
206 auto zip(TSelector&& selector, TObservable&& observable, TObservables&&... observables);
208 template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
209 auto zip(TObservable&& observable, TObservables&&... observables);
214 namespace ops = operators;
auto reduce(Seed &&seed, Accumulator &&accumulator)
Apply a function to each item emitted by an Observable, sequentially, and emit the final value.
Definition reduce.hpp:143
auto merge_with(TObservable &&observable, TObservables &&... observables)
Combines submissions from current observable with other observables into one.
Definition merge.hpp:252
auto merge()
Converts observable of observables of items into observable of items via merging emissions.
Definition merge.hpp:221
auto start_with_values(T &&v, Ts &&... vals)
Combines submissions from current observable with values into one but without overlapping and startin...
Definition start_with.hpp:112
auto switch_on_next()
Converts observable of observables into observable of values which emits values from most recent unde...
Definition switch_on_next.hpp:187
auto with_latest_from(TSelector &&selector, TObservable &&observable, TObservables &&... observables)
Combines latest emissions from observables with emission from current observable when it sends new va...
Definition with_latest_from.hpp:197
auto combine_latest(TSelector &&selector, TObservable &&observable, TObservables &&... observables)
Combines latest emissions from observables with emission from current observable when any observable ...
Definition combine_latest.hpp:98
auto start_with(TObservable &&observable, TObservables &&... observables)
Combines submissions from current observable with other observables into one but without overlapping ...
Definition start_with.hpp:85
auto zip(TSelector &&selector, TObservable &&observable, TObservables &&... observables)
Combines emissions from observables and emits single items for each combination based on the results o...
Definition zip.hpp:98
auto take_while(Fn &&predicate)
Sends items from observable while items satisfy the predicate. When condition becomes false -> sends ...
Definition take_while.hpp:91
auto take_until(TObservable &&until_observable)
Discard any items emitted by an Observable after a second Observable emits an item or terminates.
Definition take_until.hpp:142
auto publish()
Converts ordinary observable to rpp::connectable_observable with help of inline instantiated publish...
Definition publish.hpp:22
auto ref_count()
Forces rpp::connectable_observable to behave like common observable.
Definition ref_count.hpp:29
auto multicast()
Converts ordinary observable to rpp::connectable_observable with help of inline instantiated subject...
Definition multicast.hpp:85
auto concat()
Make observable which would merge emissions from underlying observables but without overlapping (curr...
Definition concat.hpp:239
auto on_error_resume_next(Selector &&selector)
If an error occurs, take the result from the Selector and subscribe to that instead.
Definition on_error_resume_next.hpp:157
auto retry()
The infinite retry operator continuously attempts to resubscribe to the observable upon error,...
Definition retry.hpp:167
auto element_at(size_t index)
Emit item located at specified index location in the sequence of items emitted by the source observab...
Definition element_at.hpp:97
auto filter(Fn &&predicate)
Emit only those items from an Observable that satisfies a provided predicate.
Definition filter.hpp:91
auto first()
Emit only the first item.
Definition first.hpp:87
auto last()
Emit only the last item provided before on_completed.
Definition last.hpp:95
auto distinct()
For each item from this observable, filter out repeated values and emit only items that have not alre...
Definition distinct.hpp:82
auto skip(size_t count)
Skip the first count items provided by the observable, then send the remaining items as expected.
Definition skip.hpp:89
auto distinct_until_changed(EqualityFn &&equality_fn)
Suppress consecutive duplicates of emissions from original observable.
Definition distinct_until_changed.hpp:97
auto take_last(size_t count)
Emit only last count items provided by observable, then send on_completed
Definition take_last.hpp:119
auto throttle(rpp::schedulers::duration period)
Emit emission from an Observable and then ignore subsequent values during duration of time.
Definition throttle.hpp:98
auto take(size_t count)
Emit only first count items provided by observable, then send on_completed
Definition take.hpp:92
auto tap(OnError &&on_error)
Register callbacks to inspect observable emissions and perform side-effects.
Definition tap.hpp:94
auto observe_on(Scheduler &&scheduler, rpp::schedulers::duration delay_duration={})
Specify the Scheduler on which an observer will observe this Observable.
Definition observe_on.hpp:38
auto repeat()
Repeats the Observable's sequence of emissions an infinite number of times via re-subscribing on it durin...
Definition repeat.hpp:86
auto debounce(rpp::schedulers::duration period, Scheduler &&scheduler)
Only emit emission if specified period of time has passed without any other emission....
Definition debounce.hpp:199
auto as_blocking()
Converts rpp::observable to rpp::blocking_observable
Definition as_blocking.hpp:47
auto timeout(rpp::schedulers::duration period, TFallbackObservable &&fallback_observable, const TScheduler &scheduler)
Forwards emissions from original observable, but subscribes to the fallback observable if no events d...
Definition timeout.hpp:203
auto subscribe_on(Scheduler &&scheduler)
OnSubscribe function for this observable will be scheduled via provided scheduler.
Definition subscribe_on.hpp:75
auto delay(rpp::schedulers::duration delay_duration, Scheduler &&scheduler)
Shift the emissions from an Observable forward in time by a particular amount.
Definition delay.hpp:219