13#include <rpp/operators/fwd.hpp>
15#include <rpp/defs.hpp>
16#include <rpp/operators/details/strategy.hpp>
18namespace rpp::operators::details
20 template<rpp::constra
int::observer TObserver, rpp::constra
int::decayed_type Accumulator>
23 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
24 using Seed = rpp::utils::extract_observer_type_t<TObserver>;
26 RPP_NO_UNIQUE_ADDRESS TObserver
observer;
27 RPP_NO_UNIQUE_ADDRESS
mutable Seed seed;
28 RPP_NO_UNIQUE_ADDRESS Accumulator accumulator;
31 void on_next(T&& v)
const
33 seed = accumulator(std::move(seed), std::forward<T>(v));
36 void on_error(
const std::exception_ptr& err)
const {
observer.
on_error(err); }
38 void on_completed()
const
49 template<rpp::constra
int::decayed_type Seed, rpp::constra
int::decayed_type Accumulator>
54 template<rpp::constra
int::decayed_type T>
57 static_assert(std::is_invocable_r_v<Seed, Accumulator, Seed&&, T>,
"Accumulator is not invocable with Seed&& abnd T returning Seed");
59 using result_type = Seed;
61 template<rpp::constra
int::observer_of_type<result_type> TObserver>
65 template<rpp::details::observables::constra
int::disposables_strategy Prev>
66 using updated_optimal_disposables_strategy = Prev;
69 template<rpp::constra
int::observer TObserver, rpp::constra
int::decayed_type Accumulator>
72 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
73 using Seed = rpp::utils::extract_observer_type_t<TObserver>;
75 RPP_NO_UNIQUE_ADDRESS TObserver
observer;
76 RPP_NO_UNIQUE_ADDRESS Accumulator accumulator;
77 mutable std::optional<Seed> seed{};
80 void on_next(T&& v)
const
83 seed = accumulator(std::move(seed).value(), std::forward<T>(v));
85 seed = std::forward<T>(v);
88 void on_error(
const std::exception_ptr& err)
const {
observer.
on_error(err); }
90 void on_completed()
const
102 template<rpp::constra
int::decayed_type Accumulator>
107 template<rpp::constra
int::decayed_type T>
110 static_assert(std::is_invocable_r_v<T, Accumulator, T&&, T>,
"Accumulator is not invocable with T&& abnd T returning T");
112 using result_type = T;
114 template<rpp::constra
int::observer_of_type<result_type> TObserver>
118 template<rpp::details::observables::constra
int::disposables_strategy Prev>
119 using updated_optimal_disposables_strategy = Prev;
123namespace rpp::operators
145 template<
typename Seed,
typename Accumulator>
147 auto reduce(Seed&& seed, Accumulator&& accumulator)
173 template<
typename Accumulator>
174 auto reduce(Accumulator&& accumulator)
176 return details::reduce_no_seed_t<std::decay_t<Accumulator>>{std::forward<Accumulator>(accumulator)};
void on_next(const Type &v) const noexcept
Observable calls this method to notify observer about new value.
Definition observer.hpp:84
void set_upstream(const disposable_wrapper &d) noexcept
Observable calls this method to pass disposable. Observer disposes this disposable WHEN observer want...
Definition observer.hpp:49
void on_completed() const noexcept
Observable calls this method to notify observer about completion of emissions.
Definition observer.hpp:135
void on_error(const std::exception_ptr &err) const noexcept
Observable calls this method to notify observer about some error during generation next data.
Definition observer.hpp:120
bool is_disposed() const noexcept
Observable calls this method to check if observer interested or not in emissions.
Definition observer.hpp:74
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition strategy.hpp:28
Definition function_traits.hpp:45
auto reduce(Seed &&seed, Accumulator &&accumulator)
Apply a function to each item emitted by an Observable, sequentially, and emit the final value.
Definition reduce.hpp:143
Definition reduce.hpp:109
Definition reduce.hpp:104