13#include <rpp/defs.hpp>
14#include <rpp/observables/constraints.hpp>
15#include <rpp/operators/lift.hpp>
16#include <rpp/operators/details/subscriber_with_state.hpp>
17#include <rpp/operators/fwd/reduce.hpp>
18#include <rpp/subscribers/constraints.hpp>
19#include <rpp/utils/functors.hpp>
20#include <rpp/utils/utilities.hpp>
23IMPLEMENTATION_FILE(reduce_tag);
27template<constra
int::decayed_type Seed,
typename AccumulatorFn, std::invocable<Seed&&> SelectorFn = std::
identity>
31 RPP_NO_UNIQUE_ADDRESS AccumulatorFn accumulator;
32 RPP_NO_UNIQUE_ADDRESS SelectorFn selector{};
37 template<constra
int::decayed_type Result,
typename AccumulatorFn,
typename SelectorFn>
38 void operator()(
auto&& value,
42 state.seed = state.accumulator(std::move(state.seed), std::forward<
decltype(value)>(value));
48 template<constra
int::decayed_type Result,
typename AccumulatorFn,
typename SelectorFn>
54 sub.on_next(state.selector(std::move(state.seed)));
58 sub.on_error(std::current_exception());
65template<constra
int::decayed_type Type, constra
int::decayed_type Seed, reduce_accumulator<Seed, Type> AccumulatorFn, std::invocable<Seed&&> ResultSelectorFn>
69 RPP_NO_UNIQUE_ADDRESS AccumulatorFn accumulator;
70 RPP_NO_UNIQUE_ADDRESS ResultSelectorFn selector;
72 template<constra
int::subscriber_of_type<utils::decayed_invoke_result_t<ResultSelectorFn, Seed>> TSub>
73 auto operator()(TSub&& subscriber)
const
75 auto subscription = subscriber.get_subscription();
77 return create_subscriber_with_dynamic_state<Type>(std::move(subscription),
79 utils::forwarding_on_error{},
81 std::forward<TSub>(subscriber),
86template<constra
int::decayed_type CastBeforeDiv
ide, constra
int::observable TObs>
87auto average_impl(TObs&& observable)
89 using Type = utils::extract_observable_type_t<std::decay_t<TObs>>;
90 using Pair = std::pair<std::optional<Type>, int32_t>;
91 return std::forward<TObs>(observable).reduce(Pair{},
92 [](Pair&& seed,
auto&& val)
95 seed.first.value() += std::forward<decltype(val)>(val);
97 seed.first = std::forward<decltype(val)>(val);
99 return std::move(seed);
104 throw utils::not_enough_emissions{
"`average` operator requires at least one emission to calculate average"};
106 return static_cast<CastBeforeDivide
>(std::move(seed.first).value()) / seed.second;
110template<constra
int::observable TObs>
111auto sum_impl(TObs&& observable)
113 using Type = utils::extract_observable_type_t<std::decay_t<TObs>>;
114 return std::forward<TObs>(observable).reduce(std::optional<Type>{},
115 [](std::optional<Type>&& seed,
auto&& val)
118 seed = std::forward<decltype(val)>(val);
120 seed.value() += std::forward<decltype(val)>(val);
121 return std::move(seed);
123 [](std::optional<Type>&& seed)
126 throw utils::not_enough_emissions{
"`sum` operator requires at least one emission to calculate sum"};
128 return std::move(seed.value());
132template<constra
int::observable TObs>
133auto count_impl(TObs&& observable)
135 return std::forward<TObs>(observable).reduce(
size_t{}, [](
size_t seed,
auto&&) {
return ++seed; });
138template<constra
int::observable TObs,
typename Comparator>
139auto min_impl(TObs&& observable, Comparator&& comparator)
141 using Type = utils::extract_observable_type_t<std::decay_t<TObs>>;
142 return std::forward<TObs>(observable).reduce(std::optional<Type>{},
143 [comparator](std::optional<Type>&& seed,
auto&& val)
145 if (!seed || comparator(utils::as_const(val), seed.value()))
146 seed = std::forward<decltype(val)>(val);
147 return std::move(seed);
149 [](std::optional<Type>&& seed)
152 throw utils::not_enough_emissions{
"`min` operator requires at least one emission to calculate min"};
154 return std::move(seed.value());
158template<constra
int::observable TObs,
typename Comparator>
159auto max_impl(TObs&& observable, Comparator&& comparator)
161 using Type = utils::extract_observable_type_t<std::decay_t<TObs>>;
162 return std::forward<TObs>(observable).reduce(std::optional<Type>{},
163 [comparator](std::optional<Type>&& seed,
auto&& val)
165 if (!seed || comparator(seed.value(), utils::as_const(val)))
166 seed = std::forward<decltype(val)>(val);
167 return std::move(seed);
169 [](std::optional<Type>&& seed)
172 throw utils::not_enough_emissions{
"`max` operator requires at least one emission to calculate min"};
174 return std::move(seed.value());
Definition: constraints.hpp:19
Definition: reduce.hpp:67
Definition: reduce.hpp:47
Definition: reduce.hpp:36
Definition: reduce.hpp:29