Aggregate operators operate on the entire sequence of items emitted by an observable.  
 
template<typename ... Args>  
requires (is_header_included<concat_tag, Args...>&& rpp::constraint::observable <Type>)  
auto  observable::concat  () const & 
  Converts an observable of observables of items into an observable of items by merging their emissions without overlapping (the next observable starts emitting only after the current one completes).    
   
template<constraint::observable_of_type< Type > ... TObservables>  
requires (is_header_included<concat_tag, TObservables...> && sizeof...(TObservables) >= 1)  
auto  observable::concat_with  (TObservables &&... observables) const & 
  Combines emissions from the current observable with other observables into one, without overlapping (the next observable starts emitting only after the current one completes).    
   
template<typename Seed , reduce_accumulator< Seed, Type > AccumulatorFn, std::invocable< Seed && > ResultSelectorFn = std::identity>  
requires is_header_included<reduce_tag, Seed, AccumulatorFn, ResultSelectorFn>  
auto  observable::reduce  (Seed &&initial_seed, AccumulatorFn &&accumulator, ResultSelectorFn &&result_selector={}) const & 
  Applies the accumulator function to each emission from the observable together with the accumulator result from the previous step, and emits the final value.    
   
template<typename CastBeforeDivide  = Type, typename ... Args>  
requires (is_header_included<reduce_tag, CastBeforeDivide, Args...> && is_can_be_averaged<Type, CastBeforeDivide>)  
auto  observable::average  () const & 
  Calculates the average of emissions and emits final value.    
   
template<typename ... Args>  
requires (is_header_included<reduce_tag, Args...> && is_can_be_summed<Type>)  
auto  observable::sum  () const & 
  Calculates the sum of emissions and emits final value.    
   
template<typename ... Args>  
requires is_header_included<reduce_tag, Args...>  
auto  observable::count  () const & 
  Counts the emissions and emits the total.    
   
template<std::strict_weak_order< Type, Type > Comparator = std::less<Type>, typename ... Args>  
requires is_header_included<reduce_tag, Comparator, Args...>  
auto  observable::min  (Comparator &&comparator={}) const & 
  Emits the emission which has minimal value from the whole observable.    
   
template<std::strict_weak_order< Type, Type > Comparator = std::less<Type>, typename ... Args>  
requires is_header_included<reduce_tag, Comparator, Args...>  
auto  observable::max  (Comparator &&comparator={}) const & 
  Emits the emission which has maximal value from the whole observable.    
   
template<constraint::decayed_type Type, typename SpecificObservable > 
template<typename CastBeforeDivide  = Type, typename ... Args>  
requires (is_header_included<reduce_tag, CastBeforeDivide, Args...> && is_can_be_averaged<Type, CastBeforeDivide>)
 
Calculates the average of emissions and emits final value. 
 
Template Parameters 
  
    CastBeforeDivide cast accumulated value to this type before division   
  
   
 
Returns new specific_observable  with the average operator as most recent operator.   
Exceptions 
  
    rpp::utils::not_enough_emissions if the original observable produced no emissions  
  
   
 
Warning #include <rpp/operators/reduce.hpp>  
Example     rpp::source::just(1,2,3)
            .average()
            .subscribe([](int  v) { std::cout << v << std::endl; });
    
 
See also https://reactivex.io/documentation/operators/average.html   
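To illustrate the role of CastBeforeDivide, here is a minimal sketch in plain C++ that does not use rpp at all. It assumes the values are summed in their own type and the sum is cast only once, just before the division. The names average_sketch and the local not_enough_emissions type are hypothetical stand-ins, and the empty-input error is modelled as a thrown exception.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for rpp::utils::not_enough_emissions.
struct not_enough_emissions : std::runtime_error
{
    using std::runtime_error::runtime_error;
};

// Sums the values in their own type T, then casts the accumulated sum to
// CastBeforeDivide before dividing by the number of values (an assumption
// about the order of operations, based on the parameter description).
template<typename CastBeforeDivide, typename T>
CastBeforeDivide average_sketch(const std::vector<T>& emissions)
{
    if (emissions.empty())
        throw not_enough_emissions{"no emissions to average"};

    T sum{};
    for (const T& value : emissions)
        sum += value;

    return static_cast<CastBeforeDivide>(sum) / static_cast<CastBeforeDivide>(emissions.size());
}
```

For the int values 1 and 2, average_sketch<int> yields 1 because the division is done on integers, while average_sketch<double> yields 1.5.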
 
 
template<constraint::decayed_type Type, typename SpecificObservable > 
 
Converts an observable of observables of items into an observable of items by merging their emissions without overlapping (the next observable starts emitting only after the current one completes). 
 
The operator subscribes to the first observable it receives as an emission. When that observable completes, it subscribes to the second one, and so on.
Returns new specific_observable  with the concat operator as most recent operator.   
Warning #include <rpp/operators/concat.hpp>  
Example     rpp::source::just(rpp::source::just(1).as_dynamic(),
                      rpp::source::never<int>().as_dynamic(),
                      rpp::source::just(2).as_dynamic())
            .concat()
            .subscribe([](int  v) { std::cout << v << " " ; });
    
 
Implementation details: 
On subscribe 
Allocates one shared_ptr to store observables (== emissions) and some internal variables 
Wraps the subscriber with serialization logic to ensure that callbacks are invoked serially 
 
 
OnNext for original observable 
If there is no active observable, subscribes to the newly obtained observable; otherwise places it in the queue 
 
 
OnError 
Just forwards original on_error 
 
 
OnCompleted from original observable 
Forwards the original on_completed only if there is no active observable (otherwise the queued observables still have to be processed, and on_completed reaches the subscriber once they finish) 
 
 
OnCompleted from inner observable 
Subscribe on next observable from queue (if any) 
 
 
 
 
See also https://reactivex.io/documentation/operators/concat.html   
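The callbacks listed under the implementation details can be pictured as a small state machine. The following sketch is an illustration in plain C++, not the library's implementation: inner observables are represented only by a name, subscribing is recorded in a log, and the class concat_state_sketch with all its members is hypothetical. Serialization and error forwarding are left out.

```cpp
#include <cassert>
#include <optional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model of the concat bookkeeping.
class concat_state_sketch
{
public:
    // OnNext from the original observable: subscribe at once if nothing
    // is active, otherwise park the inner observable in the queue.
    void on_outer_next(std::string inner)
    {
        if (!m_active)
            subscribe(std::move(inner));
        else
            m_queue.push(std::move(inner));
    }

    // OnCompleted from the original observable: forward completion only
    // if no inner observable is still active.
    void on_outer_completed()
    {
        m_outer_completed = true;
        if (!m_active)
            m_log.push_back("completed");
    }

    // OnCompleted from the active inner observable: move on to the next
    // queued observable, or complete if the outer one has already finished.
    void on_inner_completed()
    {
        assert(m_active.has_value());
        m_active.reset();
        if (!m_queue.empty())
        {
            std::string next = std::move(m_queue.front());
            m_queue.pop();
            subscribe(std::move(next));
        }
        else if (m_outer_completed)
        {
            m_log.push_back("completed");
        }
    }

    const std::vector<std::string>& log() const { return m_log; }

private:
    void subscribe(std::string inner)
    {
        m_log.push_back("subscribe " + inner);
        m_active = std::move(inner);
    }

    std::optional<std::string> m_active;
    std::queue<std::string>    m_queue;
    bool                       m_outer_completed = false;
    std::vector<std::string>   m_log;
};
```

Feeding two inner observables and then completing the outer one logs only the first subscription. The second subscription and the final completion appear after the inner observables complete in turn.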
 
 
template<constraint::decayed_type Type, typename SpecificObservable > 
template<constraint::observable_of_type< Type > ... TObservables>  
requires (is_header_included<concat_tag, TObservables...> && sizeof...(TObservables) >= 1)
auto observable::concat_with(TObservables &&... observables) const &
inline
 
Combines emissions from the current observable with other observables into one, without overlapping (the next observable starts emitting only after the current one completes). 
 
This operator first subscribes to the original observable. When it completes, the operator subscribes to the first observable from the arguments, and so on.
Returns new specific_observable  with the concat operator as most recent operator.   
Warning #include <rpp/operators/concat.hpp>  
Example     rpp::source::just(1)
            .concat_with(rpp::source::just(2), rpp::source::never<int>(), rpp::source::just(3))
            .subscribe([](int  v) { std::cout << v << " " ; });
    
 
Implementation details: 
On subscribe 
Allocates one shared_ptr to store observables (== emissions) and some internal variables 
Wraps the subscriber with serialization logic to ensure that callbacks are invoked serially 
 
 
OnNext 
 
OnError 
Just forwards original on_error 
 
 
OnCompleted from original observable 
Forwards the original on_completed only if there is no active observable (otherwise the queued observables still have to be processed, and on_completed reaches the subscriber once they finish) 
 
 
OnCompleted from inner observable 
Subscribe on next observable from queue (if any) 
 
 
 
 
See also https://reactivex.io/documentation/operators/concat.html   
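The ordering rule has a practical consequence: a source that never completes blocks every source after it. The sketch below models this in plain C++ without rpp. The types source_sketch and concat_result_sketch and the function concat_with_sketch are hypothetical, and each source is reduced to a list of values plus a flag saying whether it ever completes.

```cpp
#include <cassert>
#include <vector>

// Hypothetical model of a finite source: its values and whether it completes.
struct source_sketch
{
    std::vector<int> items;
    bool             completes;
};

// What a subscriber would observe: the values seen and whether on_completed arrived.
struct concat_result_sketch
{
    std::vector<int> items;
    bool             completed;
};

inline concat_result_sketch concat_with_sketch(const source_sketch&              original,
                                               const std::vector<source_sketch>& others)
{
    concat_result_sketch result{{}, false};

    // The original observable goes first, then the arguments in order.
    std::vector<source_sketch> in_order{original};
    in_order.insert(in_order.end(), others.begin(), others.end());

    for (const source_sketch& source : in_order)
    {
        result.items.insert(result.items.end(), source.items.begin(), source.items.end());

        // A source that never completes is never left, so the
        // remaining sources are never subscribed to.
        if (!source.completes)
            return result;
    }

    result.completed = true;
    return result;
}
```

Modelling the example above as sources {1}, {2}, a never-completing empty source, and {3} gives the values 1 and 2 and no completion, since the last source is never reached.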
 
 
template<constraint::decayed_type Type, typename SpecificObservable > 
template<std::strict_weak_order< Type, Type > Comparator = std::less<Type>, typename ... Args>  
requires is_header_included<reduce_tag, Comparator, Args...>
auto observable::max(Comparator &&comparator = {}) const &
inline
 
Emits the emission which has maximal value from the whole observable. 
 
Parameters 
  
    comparator function that determines whether the left value is less than the right one   
  
   
 
Returns new specific_observable  with the max operator as most recent operator.   
Exceptions 
  
    rpp::utils::not_enough_emissions if the original observable produced no emissions  
  
   
 
Warning #include <rpp/operators/reduce.hpp>  
Example     rpp::source::just(5,1,2,3)
            .max()
            .subscribe([](int  v) { std::cout << v << std::endl; });
    
 
See also https://reactivex.io/documentation/operators/max.html   
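Because the comparator answers "is left less than right", the same predicate that drives min also drives max. The following plain C++ sketch shows one way such a fold could work. It is an assumption for illustration, not the library's code: max_sketch is a hypothetical name, std::runtime_error stands in for rpp::utils::not_enough_emissions, and keeping the earlier value on ties is a choice of this sketch.

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <stdexcept>
#include <vector>

template<typename T, typename Comparator = std::less<T>>
T max_sketch(const std::vector<T>& emissions, Comparator comparator = {})
{
    std::optional<T> best;
    for (const T& value : emissions)
    {
        // Replace the candidate only when it is strictly "less" than the new value.
        if (!best || comparator(*best, value))
            best = value;
    }

    // Stand-in for the not_enough_emissions error described above.
    if (!best)
        throw std::runtime_error{"no emissions"};

    return *best;
}
```

With the default std::less the values 5, 1, 2, 3 give 5. Passing std::greater<int>{} inverts the order, so the same call gives 1.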
 
 
template<constraint::decayed_type Type, typename SpecificObservable > 
template<std::strict_weak_order< Type, Type > Comparator = std::less<Type>, typename ... Args>  
requires is_header_included<reduce_tag, Comparator, Args...>
auto observable::min(Comparator &&comparator = {}) const &
inline
 
Emits the emission which has minimal value from the whole observable. 
 
Parameters 
  
    comparator function that determines whether the left value is less than the right one   
  
   
 
Returns new specific_observable  with the min operator as most recent operator.   
Exceptions 
  
    rpp::utils::not_enough_emissions if the original observable produced no emissions  
  
   
 
Warning #include <rpp/operators/reduce.hpp>  
Example     rpp::source::just(5,1,2,3)
            .min()
            .subscribe([](int  v) { std::cout << v << std::endl; });
    
 
See also https://reactivex.io/documentation/operators/min.html   
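A custom comparator matters mostly for types without a natural ordering. As a hedged illustration in plain C++ (no rpp involved), the sketch below picks the minimum of a list of records by one field. The names reading_sketch, colder and min_sketch are hypothetical, and std::runtime_error stands in for rpp::utils::not_enough_emissions.

```cpp
#include <algorithm>
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical record type with no built-in operator<.
struct reading_sketch
{
    std::string sensor;
    double      celsius;
};

// Strict weak order: "left is less than right" means left is colder.
inline bool colder(const reading_sketch& left, const reading_sketch& right)
{
    return left.celsius < right.celsius;
}

template<typename T, typename Comparator>
T min_sketch(const std::vector<T>& emissions, Comparator comparator)
{
    // Stand-in for the not_enough_emissions error described above.
    if (emissions.empty())
        throw std::runtime_error{"no emissions"};

    return *std::min_element(emissions.begin(), emissions.end(), comparator);
}
```

Given readings of 21.5, 19.0 and 23.0 degrees, min_sketch with colder selects the 19.0 record.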
 
 
template<constraint::decayed_type Type, typename SpecificObservable > 
template<typename Seed , reduce_accumulator< Seed, Type > AccumulatorFn, std::invocable< Seed && > ResultSelectorFn = std::identity>  
requires is_header_included<reduce_tag, Seed, AccumulatorFn, ResultSelectorFn>
auto observable::reduce(Seed &&initial_seed, AccumulatorFn &&accumulator, ResultSelectorFn &&result_selector = {}) const &
inline
 
Applies the accumulator function to each emission from the observable together with the accumulator result from the previous step, and emits the final value. 
 
This operator behaves like scan() followed by take_last(1): it accumulates the seed and emits it on on_completed.
Parameters 
  
    initial_seed initial value of the seed, used with the first value from the observable; after each step the seed is replaced by the accumulator's result.   
    accumulator function which accepts the seed value and a new value from the observable and returns the new seed value. It can accept the seed by rvalue reference.  
  
   
 
Returns new specific_observable  with the reduce operator as most recent operator.   
Warning #include <rpp/operators/reduce.hpp>  
Example     rpp::source::just(1,2,3)
            .reduce(0, std::plus<int>{})
            .subscribe([](int  v) { std::cout << v << std::endl; });
    
      rpp::source::just(1,2,3)
            .reduce(std::vector<int>{}, [](std::vector<int>&& seed, int  new_value)
            {
                seed.push_back(new_value);
                return  std::move(seed);
            })
            .subscribe([](const  std::vector<int>& v)
            {
                std::cout << "vector: " ;
                for (int  val : v)
                    std::cout << val << " " ;
                std::cout << std::endl;
            });
    
 
Implementation details: 
On subscribe 
Allocates one shared_ptr to store internal state 
 
 
OnNext 
Applies accumulator to each emission 
 
 
OnError 
Just forwards original on_error 
 
 
OnCompleted 
Emits the accumulated seed after applying result_selector to it 
 
 
 
 
See also https://reactivex.io/documentation/operators/reduce.html
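The OnNext and OnCompleted steps described above amount to a fold followed by one final transformation. The sketch below expresses that in plain C++ over a vector instead of an observable. It is an illustration under that assumption, not the library's implementation, and reduce_sketch is a hypothetical name.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

template<typename Seed, typename T, typename AccumulatorFn, typename ResultSelectorFn>
auto reduce_sketch(const std::vector<T>& emissions,
                   Seed                  seed,
                   AccumulatorFn         accumulator,
                   ResultSelectorFn      result_selector)
{
    // OnNext: hand the current seed to the accumulator (moved, so a
    // container seed is not copied on every step) and keep the result.
    for (const T& value : emissions)
        seed = accumulator(std::move(seed), value);

    // OnCompleted: pass the final seed through the result selector.
    return result_selector(std::move(seed));
}
```

Summing 1, 2, 3 from a seed of 0 with std::plus<int>{} and a pass-through selector gives 6. A selector can also change the result type, for example by returning the size of a vector seed instead of the vector itself.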