ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Aggregate Operators

Aggregate operators operate on the entire sequence of items emitted by an observable.

Functions

template<typename ... Args>
requires (is_header_included<concat_tag, Args...>&& rpp::constraint::observable<Type>)
auto observable::concat () const &
 Converts an observable of observables of items into an observable of items by merging their emissions without overlapping (the current observable completes, THEN the next one starts emitting its values).
 
template<constraint::observable_of_type< Type > ... TObservables>
requires (is_header_included<concat_tag, TObservables...> && sizeof...(TObservables) >= 1)
auto observable::concat_with (TObservables &&... observables) const &
 Combines emissions from the current observable with other observables into one, without overlapping (the current observable completes, THEN the next one starts emitting its values).
 
template<typename Seed , reduce_accumulator< Seed, Type > AccumulatorFn, std::invocable< Seed && > ResultSelectorFn = std::identity>
requires is_header_included<reduce_tag, Seed, AccumulatorFn, ResultSelectorFn>
auto observable::reduce (Seed &&initial_seed, AccumulatorFn &&accumulator, ResultSelectorFn &&result_selector={}) const &
 Applies the accumulator function to each emission from the observable together with the accumulator's result from the previous step, and emits the final value.
 
template<typename CastBeforeDivide = Type, typename ... Args>
requires (is_header_included<reduce_tag, CastBeforeDivide, Args...> && is_can_be_averaged<Type, CastBeforeDivide>)
auto observable::average () const &
 Calculates the average of the emissions and emits the final value.
 
template<typename ... Args>
requires (is_header_included<reduce_tag, Args...> && is_can_be_summed<Type>)
auto observable::sum () const &
 Calculates the sum of the emissions and emits the final value.
 
template<typename ... Args>
requires is_header_included<reduce_tag, Args...>
auto observable::count () const &
 Counts the emissions and emits the total.
 
template<std::strict_weak_order< Type, Type > Comparator = std::less<Type>, typename ... Args>
requires is_header_included<reduce_tag, Comparator, Args...>
auto observable::min (Comparator &&comparator={}) const &
 Emits the emission with the minimal value across the whole observable.
 
template<std::strict_weak_order< Type, Type > Comparator = std::less<Type>, typename ... Args>
requires is_header_included<reduce_tag, Comparator, Args...>
auto observable::max (Comparator &&comparator={}) const &
 Emits the emission with the maximal value across the whole observable.
 

Detailed Description

Aggregate operators operate on the entire sequence of items emitted by an observable.

See also
https://reactivex.io/documentation/operators.html#mathematical

Function Documentation

◆ average()

template<constraint::decayed_type Type, typename SpecificObservable >
template<typename CastBeforeDivide = Type, typename ... Args>
requires (is_header_included<reduce_tag, CastBeforeDivide, Args...> && is_can_be_averaged<Type, CastBeforeDivide>)
auto observable::average ( ) const &
inline

Calculates the average of the emissions and emits the final value.

Template Parameters
CastBeforeDivide  type to which the accumulated value is cast before division
Returns
new specific_observable with the average operator as most recent operator.
Exceptions
rpp::utils::not_enough_emissions  if the original observable produces no emissions
Warning
#include <rpp/operators/reduce.hpp>
Example
rpp::source::just(1,2,3)
.average()
.subscribe([](int v) { std::cout << v << std::endl; });
// Output: 2
See also
https://reactivex.io/documentation/operators/average.html
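
The effect of CastBeforeDivide is easiest to see with integer input. The following is a minimal standalone sketch in plain C++ that does not use RPP: the name average_sketch is hypothetical, std::runtime_error is only a stand-in for rpp::utils::not_enough_emissions, and the cast-then-divide order is an assumption based on the parameter description above.

```cpp
#include <numeric>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for the average operator over a finished sequence.
// T plays the role of Type; Cast plays the role of CastBeforeDivide.
template<typename T, typename Cast = T>
auto average_sketch(const std::vector<T>& values)
{
    if (values.empty())
        throw std::runtime_error("not enough emissions"); // stand-in for rpp::utils::not_enough_emissions

    const T sum = std::accumulate(values.begin(), values.end(), T{});

    // The cast is applied before the division, so Cast decides which arithmetic is used.
    return static_cast<Cast>(sum) / static_cast<Cast>(values.size());
}
```

Under these assumptions, average_sketch(std::vector<int>{1, 2, 4}) divides as integers and yields 2, while average_sketch<int, double>(std::vector<int>{1, 2, 4}) divides as doubles and yields roughly 2.33.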

◆ concat()

template<constraint::decayed_type Type, typename SpecificObservable >
template<typename ... Args>
requires (is_header_included<concat_tag, Args...>&& rpp::constraint::observable<Type>)
auto observable::concat ( ) const &
inline

Converts an observable of observables of items into an observable of items by merging their emissions without overlapping (the current observable completes, THEN the next one starts emitting its values).

In practice, it subscribes to the first observable from the emissions. When that observable completes, it subscribes to the second observable from the emissions, and so on.

Returns
new specific_observable with the concat operator as most recent operator.
Warning
#include <rpp/operators/concat.hpp>
Example
rpp::source::just(rpp::source::just(1).as_dynamic(),
rpp::source::never<int>().as_dynamic(),
rpp::source::just(2).as_dynamic())
.concat()
.subscribe([](int v) { std::cout << v << " "; });
// Output: 1
Implementation details:
  • On subscribe
    • Allocates one shared_ptr to store the observables (== emissions) and some internal variables
    • Wraps the subscriber with serialization logic to ensure callbacks are called serially
  • OnNext for original observable
    • If there is no active observable, subscribes to the newly obtained observable; otherwise places it in the queue
  • OnError
    • Just forwards the original on_error
  • OnCompleted from original observable
    • Just forwards the original on_completed if there is no active observable (otherwise all observables from the queue must be processed first, and they would emit on_completed for the subscriber)
  • OnCompleted from inner observable
    • Subscribes to the next observable from the queue (if any)
See also
https://reactivex.io/documentation/operators/concat.html
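
The queue handling described in the implementation details explains why the example prints only 1: an inner observable that never completes blocks everything queued behind it. Below is a simplified synchronous model in plain C++ that does not use RPP. The type inner_source and the function concat_sketch are hypothetical names, and the shared_ptr state and subscriber serialization are deliberately left out.

```cpp
#include <deque>
#include <utility>
#include <vector>

// Hypothetical model of an inner observable: the values it emits and
// whether it ever signals on_completed (rpp::source::never does not).
struct inner_source
{
    std::vector<int> values;
    bool             completes;
};

// Synchronous model of the queue handling described above.
std::vector<int> concat_sketch(std::deque<inner_source> queue)
{
    std::vector<int> received;
    while (!queue.empty())
    {
        // "Subscribe" to the next queued inner source.
        inner_source active = std::move(queue.front());
        queue.pop_front();

        // Forward its emissions to the subscriber.
        received.insert(received.end(), active.values.begin(), active.values.end());

        // Without on_completed, the sources still in the queue are never subscribed to.
        if (!active.completes)
            break;
    }
    return received;
}
```

Feeding the model a source emitting 1, a source that never completes, and a source emitting 2 returns only {1}, which matches the output of the example above.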

◆ concat_with()

template<constraint::decayed_type Type, typename SpecificObservable >
template<constraint::observable_of_type< Type > ... TObservables>
requires (is_header_included<concat_tag, TObservables...> && sizeof...(TObservables) >= 1)
auto observable::concat_with ( TObservables &&...  observables) const &
inline

Combines emissions from the current observable with other observables into one, without overlapping (the current observable completes, THEN the next one starts emitting its values).

In practice, this operator subscribes to the original observable. When the original observable completes, it subscribes to the first observable from the arguments, and so on.

Returns
new specific_observable with the concat operator as most recent operator.
Warning
#include <rpp/operators/concat.hpp>
Example
rpp::source::just(1)
.concat_with(rpp::source::just(2), rpp::source::never<int>(), rpp::source::just(3))
.subscribe([](int v) { std::cout << v << " "; });
// Output: 1 2
Implementation details:
  • On subscribe
    • Allocates one shared_ptr to store the observables (== emissions) and some internal variables
    • Wraps the subscriber with serialization logic to ensure callbacks are called serially
  • OnNext
    • Just forwards on_next
  • OnError
    • Just forwards the original on_error
  • OnCompleted from original observable
    • Just forwards the original on_completed if there is no active observable (otherwise all observables from the queue must be processed first, and they would emit on_completed for the subscriber)
  • OnCompleted from inner observable
    • Subscribes to the next observable from the queue (if any)
See also
https://reactivex.io/documentation/operators/concat.html

◆ count()

template<constraint::decayed_type Type, typename SpecificObservable >
template<typename ... Args>
requires is_header_included<reduce_tag, Args...>
auto observable::count ( ) const &
inline

Counts the emissions and emits the total.

Returns
new specific_observable with the count operator as most recent operator.
Warning
#include <rpp/operators/reduce.hpp>
Example
rpp::source::just(1,2,3)
.count()
.subscribe([](int v) { std::cout << v << std::endl; });
// Output: 3
See also
https://reactivex.io/documentation/operators/count.html
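
count is gated on the same reduce_tag header as reduce, which suggests it can be read as a fold that ignores each value and increments a counter. The following standalone sketch shows that reading in plain C++. It is not RPP's implementation: count_sketch is a hypothetical name, std::accumulate stands in for the operator, and returning 0 for an empty sequence is an assumption based on the absence of a documented exception.

```cpp
#include <cstddef>
#include <numeric>
#include <vector>

// Hypothetical stand-in for the count operator over a finished sequence.
template<typename T>
std::size_t count_sketch(const std::vector<T>& values)
{
    // The accumulator ignores the value itself and only advances the counter.
    return std::accumulate(values.begin(), values.end(), std::size_t{0},
                           [](std::size_t seen, const T&) { return seen + 1; });
}
```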

◆ max()

template<constraint::decayed_type Type, typename SpecificObservable >
template<std::strict_weak_order< Type, Type > Comparator = std::less<Type>, typename ... Args>
requires is_header_included<reduce_tag, Comparator, Args...>
auto observable::max ( Comparator &&  comparator = {}) const &
inline

Emits the emission with the maximal value across the whole observable.

Parameters
comparator  function that determines whether the left value is less than the right one
Returns
new specific_observable with the max operator as most recent operator.
Exceptions
rpp::utils::not_enough_emissions  if the original observable produces no emissions
Warning
#include <rpp/operators/reduce.hpp>
Example
rpp::source::just(5,1,2,3)
.max()
.subscribe([](int v) { std::cout << v << std::endl; });
// Output: 5
See also
https://reactivex.io/documentation/operators/max.html
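
Because the comparator answers "is the left value less than the right one", it fully defines what "maximal" means. The following is a standalone sketch of that selection rule in plain C++ without RPP. The name max_sketch is hypothetical, std::runtime_error is only a stand-in for rpp::utils::not_enough_emissions, and the exact argument order used by the real operator is an assumption.

```cpp
#include <functional>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for the max operator over a finished sequence.
template<typename T, typename Comparator = std::less<T>>
T max_sketch(const std::vector<T>& values, Comparator comparator = Comparator{})
{
    if (values.empty())
        throw std::runtime_error("not enough emissions"); // stand-in for rpp::utils::not_enough_emissions

    T best = values.front();
    for (const T& candidate : values)
    {
        // Replace the current best only when it is "less than" the candidate.
        if (comparator(best, candidate))
            best = candidate;
    }
    return best;
}
```

With the default comparator, max_sketch(std::vector<int>{5, 1, 2, 3}) returns 5. Passing std::greater<int>{} reverses the order, so the same call returns 1. A min sketch would mirror this by testing comparator(candidate, best) instead.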

◆ min()

template<constraint::decayed_type Type, typename SpecificObservable >
template<std::strict_weak_order< Type, Type > Comparator = std::less<Type>, typename ... Args>
requires is_header_included<reduce_tag, Comparator, Args...>
auto observable::min ( Comparator &&  comparator = {}) const &
inline

Emits the emission with the minimal value across the whole observable.

Parameters
comparator  function that determines whether the left value is less than the right one
Returns
new specific_observable with the min operator as most recent operator.
Exceptions
rpp::utils::not_enough_emissions  if the original observable produces no emissions
Warning
#include <rpp/operators/reduce.hpp>
Example
rpp::source::just(5,1,2,3)
.min()
.subscribe([](int v) { std::cout << v << std::endl; });
// Output: 1
See also
https://reactivex.io/documentation/operators/min.html

◆ reduce()

template<constraint::decayed_type Type, typename SpecificObservable >
template<typename Seed , reduce_accumulator< Seed, Type > AccumulatorFn, std::invocable< Seed && > ResultSelectorFn = std::identity>
requires is_header_included<reduce_tag, Seed, AccumulatorFn, ResultSelectorFn>
auto observable::reduce ( Seed &&  initial_seed,
AccumulatorFn &&  accumulator,
ResultSelectorFn &&  result_selector = {} 
) const &
inline

Applies the accumulator function to each emission from the observable together with the accumulator's result from the previous step, and emits the final value.

In practice, this operator behaves like scan() + take_last(1): it accumulates the seed and emits it on on_completed.

Parameters
initial_seed  initial value of the seed, applied to the first value from the observable; afterwards it is replaced with each accumulator result.
accumulator  function that accepts the seed value and a new value from the observable and returns the new seed value. It can accept the seed by rvalue reference.
Returns
new specific_observable with the reduce operator as most recent operator.
Warning
#include <rpp/operators/reduce.hpp>
Example
rpp::source::just(1,2,3)
.reduce(0, std::plus<int>{})
.subscribe([](int v) { std::cout << v << std::endl; });
// Output: 6
rpp::source::just(1,2,3)
.reduce(std::vector<int>{}, [](std::vector<int>&& seed, int new_value)
{
    seed.push_back(new_value);
    return std::move(seed);
})
.subscribe([](const std::vector<int>& v)
{
    std::cout << "vector: ";
    for(int val : v)
        std::cout << val << " ";
    std::cout << std::endl;
});
// Output: vector: 1 2 3
Implementation details:
  • On subscribe
    • Allocates one shared_ptr to store internal state
  • OnNext
    • Applies accumulator to each emission
  • OnError
    • Just forwards original on_error
  • OnCompleted
    • Emits the accumulated seed after applying result_selector
See also
https://reactivex.io/documentation/operators/reduce.html
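
The OnNext and OnCompleted steps above can be sketched as a plain fold over an already finished sequence. This standalone C++ sketch does not use RPP: reduce_sketch and identity_selector are hypothetical names, the std::vector stands in for the stream of emissions, and the shared_ptr state and error forwarding are omitted.

```cpp
#include <utility>
#include <vector>

// Hypothetical default result selector: hands the seed back unchanged.
struct identity_selector
{
    template<typename T>
    T&& operator()(T&& value) const { return std::forward<T>(value); }
};

// Hypothetical stand-in for the reduce operator over a finished sequence.
template<typename Seed, typename Value, typename AccumulatorFn, typename ResultSelectorFn = identity_selector>
auto reduce_sketch(Seed seed, const std::vector<Value>& values, AccumulatorFn accumulator,
                   ResultSelectorFn result_selector = ResultSelectorFn{})
{
    // OnNext: fold each value into the seed, handing the seed over by move.
    for (const Value& value : values)
        seed = accumulator(std::move(seed), value);

    // OnCompleted: only the last accumulated seed is emitted, passed through result_selector.
    return result_selector(std::move(seed));
}
```

Called with a seed of 0, the values 1, 2, 3 and an addition function, the running seed goes through 1, 3 and 6, and only the final 6 is returned. Supplying a result selector such as a function that doubles its argument turns that final value into 12.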

◆ sum()

template<constraint::decayed_type Type, typename SpecificObservable >
template<typename ... Args>
requires (is_header_included<reduce_tag, Args...> && is_can_be_summed<Type>)
auto observable::sum ( ) const &
inline

Calculates the sum of the emissions and emits the final value.

Returns
new specific_observable with the sum operator as most recent operator.
Exceptions
rpp::utils::not_enough_emissions  if the original observable produces no emissions
Warning
#include <rpp/operators/reduce.hpp>
Example
rpp::source::just(1,2,3)
.sum()
.subscribe([](int v) { std::cout << v << std::endl; });
// Output: 6
See also
https://reactivex.io/documentation/operators/sum.html