13#include <rpp/operators/fwd.hpp>
15#include <rpp/defs.hpp>
16#include <rpp/operators/details/strategy.hpp>
18namespace rpp::operators::details
20 template<rpp::constra
int::observer TObserver, rpp::constra
int::decayed_type Accumulator>
23 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
24 using Seed = rpp::utils::extract_observer_type_t<TObserver>;
26 RPP_NO_UNIQUE_ADDRESS TObserver observer;
27 RPP_NO_UNIQUE_ADDRESS
mutable Seed seed;
28 RPP_NO_UNIQUE_ADDRESS Accumulator accumulator;
31 void on_next(T&& v)
const
33 seed = accumulator(std::move(seed), std::forward<T>(v));
36 void on_error(
const std::exception_ptr& err)
const { observer.on_error(err); }
38 void on_completed()
const
40 observer.on_next(std::move(seed));
41 observer.on_completed();
46 bool is_disposed()
const {
return observer.is_disposed(); }
49 template<rpp::constra
int::decayed_type Seed, rpp::constra
int::decayed_type Accumulator>
50 struct reduce_t : lift_operator<reduce_t<Seed, Accumulator>, Seed, Accumulator>
54 template<rpp::constra
int::decayed_type T>
57 static_assert(std::is_invocable_r_v<Seed, Accumulator, Seed&&, T>,
"Accumulator is not invocable with Seed&& abnd T returning Seed");
59 using result_type = Seed;
61 template<rpp::constra
int::observer_of_type<result_type> TObserver>
65 template<rpp::details::observables::constra
int::disposables_strategy Prev>
66 using updated_optimal_disposables_strategy = Prev;
69 template<rpp::constra
int::observer TObserver, rpp::constra
int::decayed_type Accumulator>
72 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
73 using Seed = rpp::utils::extract_observer_type_t<TObserver>;
75 RPP_NO_UNIQUE_ADDRESS TObserver observer;
76 RPP_NO_UNIQUE_ADDRESS Accumulator accumulator;
77 mutable std::optional<Seed> seed{};
80 void on_next(T&& v)
const
83 seed = accumulator(std::move(seed).value(), std::forward<T>(v));
85 seed = std::forward<T>(v);
88 void on_error(
const std::exception_ptr& err)
const { observer.on_error(err); }
90 void on_completed()
const
93 observer.on_next(std::move(seed).value());
94 observer.on_completed();
99 bool is_disposed()
const {
return observer.is_disposed(); }
102 template<rpp::constra
int::decayed_type Accumulator>
107 template<rpp::constra
int::decayed_type T>
110 static_assert(std::is_invocable_r_v<T, Accumulator, T&&, T>,
"Accumulator is not invocable with T&& abnd T returning T");
112 using result_type = T;
114 template<rpp::constra
int::observer_of_type<result_type> TObserver>
118 template<rpp::details::observables::constra
int::disposables_strategy Prev>
119 using updated_optimal_disposables_strategy = Prev;
123namespace rpp::operators
145 template<
typename Seed,
typename Accumulator>
147 auto reduce(Seed&& seed, Accumulator&& accumulator)
173 template<
typename Accumulator>
174 auto reduce(Accumulator&& accumulator)
Definition strategy.hpp:28
Definition function_traits.hpp:45
auto reduce(Seed &&seed, Accumulator &&accumulator)
Apply a function to each item emitted by an Observable, sequentially, and emit the final value.
Definition reduce.hpp:143
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
Definition reduce.hpp:109
Definition reduce.hpp:104