Aggregate operators operate on the entire sequence of items emitted by an observable.
template<typename ... Args>
requires (is_header_included<concat_tag, Args...>&& rpp::constraint::observable <Type>)
auto observable::concat () const &
Converts an observable of observables of items into an observable of items by merging emissions without overlapping: the next inner observable starts emitting only after the current one completes.
template<constraint::observable_of_type< Type > ... TObservables>
requires (is_header_included<concat_tag, TObservables...> && sizeof...(TObservables) >= 1)
auto observable::concat_with (TObservables &&... observables) const &
Combines emissions from the current observable with those of other observables into one observable without overlapping: the next observable starts emitting only after the current one completes.
template<typename Seed , reduce_accumulator< Seed, Type > AccumulatorFn, std::invocable< Seed && > ResultSelectorFn = std::identity>
requires is_header_included<reduce_tag, Seed, AccumulatorFn, ResultSelectorFn>
auto observable::reduce (Seed &&initial_seed, AccumulatorFn &&accumulator, ResultSelectorFn &&result_selector={}) const &
Applies the accumulator function to each emission together with the accumulator's result from the previous step, and emits the final value.
template<typename CastBeforeDivide = Type, typename ... Args>
requires (is_header_included<reduce_tag, CastBeforeDivide, Args...> && is_can_be_averaged<Type, CastBeforeDivide>)
auto observable::average () const &
Calculates the average of all emissions and emits it as the final value.
template<typename ... Args>
requires (is_header_included<reduce_tag, Args...> && is_can_be_summed<Type>)
auto observable::sum () const &
Calculates the sum of all emissions and emits it as the final value.
template<typename ... Args>
requires is_header_included<reduce_tag, Args...>
auto observable::count () const &
Counts the emissions and emits this count as the final value.
template<std::strict_weak_order< Type, Type > Comparator = std::less<Type>, typename ... Args>
requires is_header_included<reduce_tag, Comparator, Args...>
auto observable::min (Comparator &&comparator={}) const &
Emits the emission with the minimal value across the whole observable.
template<std::strict_weak_order< Type, Type > Comparator = std::less<Type>, typename ... Args>
requires is_header_included<reduce_tag, Comparator, Args...>
auto observable::max (Comparator &&comparator={}) const &
Emits the emission with the maximal value across the whole observable.
template<constraint::decayed_type Type, typename SpecificObservable >
template<typename CastBeforeDivide = Type, typename ... Args>
requires (is_header_included<reduce_tag, CastBeforeDivide, Args...> && is_can_be_averaged<Type, CastBeforeDivide>)
Calculates the average of all emissions and emits it as the final value.
Template Parameters
CastBeforeDivide type to which the accumulated value is cast before division
Returns a new specific_observable with the average operator as the most recent operator.
Exceptions
rpp::utils::not_enough_emissions if the original observable produces no emissions
Warning #include <rpp/operators/reduce.hpp>
Example
rpp::source::just(1,2,3)
    .average()
    .subscribe([](int v) { std::cout << v << std::endl; });
See also https://reactivex.io/documentation/operators/average.html
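The effect of CastBeforeDivide can be illustrated without the library. The following is a minimal sketch, assuming that the emissions are summed as Type and that the cast happens only right before the division, as the parameter description suggests. It uses a plain std::vector in place of an observable; average_of, average_usage and the local not_enough_emissions type are hypothetical stand-ins, not part of rpp.

```cpp
#include <cassert>
#include <numeric>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for rpp::utils::not_enough_emissions.
struct not_enough_emissions : std::runtime_error
{
    using std::runtime_error::runtime_error;
};

// Sums the values as Type, casts the sum to CastBeforeDivide,
// then divides by the number of values.
template<typename CastBeforeDivide, typename Type>
CastBeforeDivide average_of(const std::vector<Type>& emissions)
{
    if (emissions.empty())
        throw not_enough_emissions{"average requires at least one emission"};

    const Type sum = std::accumulate(emissions.begin(), emissions.end(), Type{});
    return static_cast<CastBeforeDivide>(sum) / static_cast<CastBeforeDivide>(emissions.size());
}

// Usage sketch: the same input gives different results depending on the cast type.
inline void average_usage()
{
    const std::vector<int> emissions{1, 2};
    assert(average_of<int>(emissions) == 1);      // integer division truncates
    assert(average_of<double>(emissions) == 1.5); // cast to double keeps the fraction
}
```

With the default CastBeforeDivide = Type, an observable of int therefore averages with integer division; passing a floating-point type as CastBeforeDivide avoids the truncation.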
template<constraint::decayed_type Type, typename SpecificObservable >
Converts an observable of observables of items into an observable of items by merging emissions without overlapping: the next inner observable starts emitting only after the current one completes.
It subscribes to the first emitted observable. When that observable completes, it subscribes to the second emitted observable, and so on.
Returns a new specific_observable with the concat operator as the most recent operator.
Warning #include <rpp/operators/concat.hpp>
Example
rpp::source::just(rpp::source::just(1).as_dynamic(),
                  rpp::source::never<int>().as_dynamic(),
                  rpp::source::just(2).as_dynamic())
    .concat()
    .subscribe([](int v) { std::cout << v << " "; });
Implementation details:
On subscribe
Allocates one shared_ptr to store the observables (== emissions) and some internal variables
Wraps the subscriber with serialization logic to ensure that callbacks are invoked serially
OnNext for original observable
If there is no active observable, subscribes to the newly obtained observable; otherwise places it in the queue
OnError
Just forwards the original on_error
OnCompleted from original observable
Just forwards the original on_completed if there is no active observable (otherwise all observables in the queue must be processed first, and they will deliver on_completed to the subscriber)
OnCompleted from inner observable
Subscribes to the next observable from the queue (if any)
See also https://reactivex.io/documentation/operators/concat.html
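The queueing rules listed under the implementation details can be sketched as a small state machine. This is a simplified single-threaded model under stated assumptions, not the library's code: it omits the shared_ptr state and the serialization wrapper, and inner_source, concat_model, complete_active and concat_usage are hypothetical names introduced only for illustration.

```cpp
#include <cassert>
#include <deque>
#include <utility>
#include <vector>

// Hypothetical model of an inner observable: the values it emits on
// subscription and whether it completes right after emitting them.
struct inner_source
{
    std::vector<int> values;
    bool             completes = true; // false models a source like never()
};

class concat_model
{
public:
    // OnNext for the original observable: subscribe now or enqueue.
    void on_next(inner_source source)
    {
        if (has_active_)
            queue_.push_back(std::move(source));
        else
            subscribe(std::move(source));
    }

    // OnCompleted from the original observable.
    void on_completed()
    {
        outer_completed_ = true;
        if (!has_active_)
            downstream_completed = true;
    }

    // Models a late completion of the currently active inner observable.
    void complete_active()
    {
        if (has_active_)
            on_inner_completed();
    }

    std::vector<int> output;                       // values forwarded downstream
    bool             downstream_completed = false; // on_completed forwarded downstream

private:
    void subscribe(inner_source source)
    {
        has_active_ = true;
        output.insert(output.end(), source.values.begin(), source.values.end());
        if (source.completes)
            on_inner_completed();
    }

    // OnCompleted from an inner observable: take the next one from the queue.
    void on_inner_completed()
    {
        has_active_ = false;
        if (!queue_.empty())
        {
            inner_source next = std::move(queue_.front());
            queue_.pop_front();
            subscribe(std::move(next));
        }
        else if (outer_completed_)
        {
            downstream_completed = true;
        }
    }

    std::deque<inner_source> queue_;
    bool                     has_active_      = false;
    bool                     outer_completed_ = false;
};

// Usage sketch mirroring the example above: just(1), never(), just(2).
inline void concat_usage()
{
    concat_model model;
    model.on_next(inner_source{{1}, true});
    model.on_next(inner_source{{}, false}); // never completes, blocks the queue
    model.on_next(inner_source{{2}, true});
    model.on_completed();
    assert((model.output == std::vector<int>{1}));
    assert(!model.downstream_completed);

    model.complete_active(); // only now does the queued source get its turn
    assert((model.output == std::vector<int>{1, 2}));
    assert(model.downstream_completed);
}
```

In this model the never-like source keeps the second just-like source waiting in the queue, which is why only the first value reaches the subscriber until the active source completes.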
template<constraint::decayed_type Type, typename SpecificObservable >
template<constraint::observable_of_type< Type > ... TObservables>
requires (is_header_included<concat_tag, TObservables...> && sizeof...(TObservables) >= 1)
auto observable::concat_with (TObservables &&... observables) const &
inline
Combines emissions from the current observable with those of other observables into one observable without overlapping: the next observable starts emitting only after the current one completes.
This operator subscribes to the original observable. When the original observable completes, it subscribes to the first observable from the arguments, and so on.
Returns a new specific_observable with the concat operator as the most recent operator.
Warning #include <rpp/operators/concat.hpp>
Example
rpp::source::just(1)
    .concat_with(rpp::source::just(2), rpp::source::never<int>(), rpp::source::just(3))
    .subscribe([](int v) { std::cout << v << " "; });
Implementation details:
On subscribe
Allocates one shared_ptr to store the observables (== emissions) and some internal variables
Wraps the subscriber with serialization logic to ensure that callbacks are invoked serially
OnNext
OnError
Just forwards the original on_error
OnCompleted from original observable
Just forwards the original on_completed if there is no active observable (otherwise all observables in the queue must be processed first, and they will deliver on_completed to the subscriber)
OnCompleted from inner observable
Subscribes to the next observable from the queue (if any)
See also https://reactivex.io/documentation/operators/concat.html
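The ordering guarantee of concat_with can be modelled with plain functions. This is a minimal sketch under the assumption that every source runs synchronously; value_source, just_values, never_completes, concat_with_model and concat_with_usage are hypothetical helpers and not the rpp API.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical model: a source pushes its values into `sink` and
// returns true if it completed, false if it never completes.
using value_source = std::function<bool(std::vector<int>& sink)>;

inline value_source just_values(std::vector<int> values)
{
    return [values = std::move(values)](std::vector<int>& sink) {
        sink.insert(sink.end(), values.begin(), values.end());
        return true;
    };
}

inline value_source never_completes()
{
    return [](std::vector<int>&) { return false; };
}

// Runs `first`, then each of `others` in order, moving on to the next
// source only after the current one has completed.
template<typename... Sources>
value_source concat_with_model(value_source first, Sources... others)
{
    std::vector<value_source> all{std::move(first), value_source(std::move(others))...};
    return [all = std::move(all)](std::vector<int>& sink) {
        for (const auto& source : all)
            if (!source(sink))
                return false; // the active source never completed, later ones never start
        return true;
    };
}

// Usage sketch mirroring the example above.
inline void concat_with_usage()
{
    std::vector<int> seen;
    const bool completed = concat_with_model(just_values({1}),
                                             just_values({2}),
                                             never_completes(),
                                             just_values({3}))(seen);
    assert((seen == std::vector<int>{1, 2}));
    assert(!completed);
}
```

The value 3 is never observed in this model because the source in front of it never completes, which matches the sequential behaviour described above.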
template<constraint::decayed_type Type, typename SpecificObservable >
template<std::strict_weak_order< Type, Type > Comparator = std::less<Type>, typename ... Args>
requires is_header_included<reduce_tag, Comparator, Args...>
auto observable::max (Comparator &&comparator={}) const &
inline
Emits the emission with the maximal value across the whole observable.
Parameters
comparator function that determines whether the left value is less than the right one
Returns a new specific_observable with the max operator as the most recent operator.
Exceptions
rpp::utils::not_enough_emissions if the original observable produces no emissions
Warning #include <rpp/operators/reduce.hpp>
Example
rpp::source::just(5,1,2,3)
    .max()
    .subscribe([](int v) { std::cout << v << std::endl; });
See also https://reactivex.io/documentation/operators/max.html
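How a "less than" comparator selects the maximum can be sketched over a plain container. This is an illustration under the assumption that the operator keeps a running best value; max_of, max_usage and the local not_enough_emissions type are hypothetical stand-ins, not rpp code.

```cpp
#include <cassert>
#include <cstdlib>
#include <functional>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for rpp::utils::not_enough_emissions.
struct not_enough_emissions : std::runtime_error
{
    using std::runtime_error::runtime_error;
};

// Keeps a running maximum; comparator(lhs, rhs) answers "is lhs less than rhs?".
template<typename Type, typename Comparator = std::less<Type>>
Type max_of(const std::vector<Type>& emissions, Comparator comparator = {})
{
    if (emissions.empty())
        throw not_enough_emissions{"max requires at least one emission"};

    Type best = emissions.front();
    for (const Type& value : emissions)
        if (comparator(best, value)) // current best is less than the new value
            best = value;
    return best;
}

// Usage sketch: default ordering, then ordering by absolute value.
inline void max_usage()
{
    assert(max_of(std::vector<int>{5, 1, 2, 3}) == 5);

    const auto by_magnitude = [](int lhs, int rhs) { return std::abs(lhs) < std::abs(rhs); };
    assert(max_of(std::vector<int>{-7, 3}, by_magnitude) == -7);
}
```

Supplying a custom comparator changes what "maximal" means without changing the selection logic.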
template<constraint::decayed_type Type, typename SpecificObservable >
template<std::strict_weak_order< Type, Type > Comparator = std::less<Type>, typename ... Args>
requires is_header_included<reduce_tag, Comparator, Args...>
auto observable::min (Comparator &&comparator={}) const &
inline
Emits the emission with the minimal value across the whole observable.
Parameters
comparator function that determines whether the left value is less than the right one
Returns a new specific_observable with the min operator as the most recent operator.
Exceptions
rpp::utils::not_enough_emissions if the original observable produces no emissions
Warning #include <rpp/operators/reduce.hpp>
Example
rpp::source::just(5,1,2,3)
    .min()
    .subscribe([](int v) { std::cout << v << std::endl; });
See also https://reactivex.io/documentation/operators/min.html
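The constraint on min names reduce_tag and the required header is reduce.hpp, which suggests that the operator is layered on reduce. One plausible way to express that, shown here only as an assumption about the general idea, is a fold over an optional seed whose final step reports the empty case. min_via_fold, min_usage and the local not_enough_emissions type are hypothetical.

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for rpp::utils::not_enough_emissions.
struct not_enough_emissions : std::runtime_error
{
    using std::runtime_error::runtime_error;
};

template<typename Type, typename Comparator = std::less<Type>>
Type min_via_fold(const std::vector<Type>& emissions, Comparator comparator = {})
{
    std::optional<Type> seed; // empty until the first emission arrives

    // Accumulator step: keep the new value if it is less than the current seed.
    for (const Type& value : emissions)
        if (!seed || comparator(value, *seed))
            seed = value;

    // Final step: an empty seed means there was nothing to compare.
    if (!seed)
        throw not_enough_emissions{"min requires at least one emission"};
    return *seed;
}

// Usage sketch: default ordering, then a reversed comparator.
inline void min_usage()
{
    assert(min_via_fold(std::vector<int>{5, 1, 2, 3}) == 1);
    // With std::greater the "minimal" element is the largest one.
    assert(min_via_fold(std::vector<int>{5, 1, 2, 3}, std::greater<int>{}) == 5);
}
```

The optional seed is what makes the empty-sequence case detectable, which is where the not_enough_emissions error would originate in such a design.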
template<constraint::decayed_type Type, typename SpecificObservable >
template<typename Seed , reduce_accumulator< Seed, Type > AccumulatorFn, std::invocable< Seed && > ResultSelectorFn = std::identity>
requires is_header_included<reduce_tag, Seed, AccumulatorFn, ResultSelectorFn>
auto observable::reduce (Seed &&initial_seed, AccumulatorFn &&accumulator, ResultSelectorFn &&result_selector={}) const &
inline
Applies the accumulator function to each emission together with the accumulator's result from the previous step, and emits the final value.
This operator behaves like scan() + take_last(1): it accumulates the seed and emits it on on_completed.
Parameters
initial_seed initial value of the seed, which is applied to the first value from the observable. It is then replaced with the accumulator's result, and so on.
accumulator function which accepts the seed value and a new value from the observable and returns the new value of the seed. Can accept the seed by rvalue reference.
result_selector function applied to the accumulated seed before it is emitted on completion (defaults to std::identity).
Returns a new specific_observable with the reduce operator as the most recent operator.
Warning #include <rpp/operators/reduce.hpp>
Example
rpp::source::just(1,2,3)
    .reduce(0, std::plus<int>{})
    .subscribe([](int v) { std::cout << v << std::endl; });

rpp::source::just(1,2,3)
    .reduce(std::vector<int>{}, [](std::vector<int>&& seed, int new_value)
    {
        seed.push_back(new_value);
        return std::move(seed);
    })
    .subscribe([](const std::vector<int>& v)
    {
        std::cout << "vector: ";
        for (int val : v)
            std::cout << val << " ";
        std::cout << std::endl;
    });
Implementation details:
On subscribe
Allocates one shared_ptr to store the internal state
OnNext
Applies the accumulator to each emission
OnError
Just forwards the original on_error
OnCompleted
Emits the accumulated seed after applying result_selector
See also https://reactivex.io/documentation/operators/reduce.html
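The relation to scan() + take_last(1) and the role of result_selector can be shown over a plain container. This is a minimal sketch under the assumption of a non-empty, synchronous input. scan_of, reduce_of, identity_selector and reduce_usage are hypothetical names, and identity_selector stands in for std::identity so the sketch does not depend on C++20.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Stand-in for std::identity: returns its argument unchanged.
struct identity_selector
{
    template<typename T>
    T&& operator()(T&& value) const { return std::forward<T>(value); }
};

// scan-like helper: records the seed after every accumulation step.
template<typename Seed, typename Type, typename AccumulatorFn>
std::vector<Seed> scan_of(Seed seed, const std::vector<Type>& emissions, AccumulatorFn accumulator)
{
    std::vector<Seed> steps;
    for (const Type& value : emissions)
    {
        seed = accumulator(std::move(seed), value);
        steps.push_back(seed);
    }
    return steps;
}

// reduce-like helper: keeps only the final seed and passes it through result_selector.
template<typename Seed, typename Type, typename AccumulatorFn, typename ResultSelectorFn = identity_selector>
auto reduce_of(Seed seed, const std::vector<Type>& emissions, AccumulatorFn accumulator, ResultSelectorFn result_selector = {})
{
    for (const Type& value : emissions)
        seed = accumulator(std::move(seed), value);
    return result_selector(std::move(seed));
}

// Usage sketch: the reduced value equals the last scan step.
inline void reduce_usage()
{
    const std::vector<int> emissions{1, 2, 3};

    const auto steps = scan_of(0, emissions, std::plus<int>{});
    assert((steps == std::vector<int>{1, 3, 6}));
    assert(reduce_of(0, emissions, std::plus<int>{}) == steps.back());

    // The seed is taken by rvalue reference; the selector maps the final seed to its size.
    const auto collected = reduce_of(
        std::vector<int>{},
        emissions,
        [](std::vector<int>&& seed, int value) { seed.push_back(value); return std::move(seed); },
        [](std::vector<int>&& seed) { return seed.size(); });
    assert(collected == 3u);
}
```

Taking the seed by rvalue reference, as in the second call, lets the accumulator reuse the same vector on every step instead of copying it.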