ReactivePlusPlus
Yet another implementation of the ReactiveX approach in C++, written with performance and templates in mind
 
Loading...
Searching...
No Matches
reduce.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/defs.hpp> // RPP_NO_UNIQUE_ADDRESS
14#include <rpp/observables/constraints.hpp>
15#include <rpp/operators/lift.hpp> // required due to operator uses lift
16#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
17#include <rpp/operators/fwd/reduce.hpp> // own forwarding
18#include <rpp/subscribers/constraints.hpp> // constraint::subscriber
19#include <rpp/utils/functors.hpp> // forwarding_on_error
20#include <rpp/utils/utilities.hpp> // utils::as_const
21
22
23IMPLEMENTATION_FILE(reduce_tag);
24
25namespace rpp::details
26{
27template<constraint::decayed_type Seed, typename AccumulatorFn, std::invocable<Seed&&> SelectorFn = std::identity>
29{
30 mutable Seed seed;
31 RPP_NO_UNIQUE_ADDRESS AccumulatorFn accumulator;
32 RPP_NO_UNIQUE_ADDRESS SelectorFn selector{};
33};
34
36{
37 template<constraint::decayed_type Result, typename AccumulatorFn, typename SelectorFn>
38 void operator()(auto&& value,
39 const constraint::subscriber auto&,
41 {
42 state.seed = state.accumulator(std::move(state.seed), std::forward<decltype(value)>(value));
43 }
44};
45
47{
48 template<constraint::decayed_type Result, typename AccumulatorFn, typename SelectorFn>
49 void operator()(const constraint::subscriber auto& sub,
51 {
52 try
53 {
54 sub.on_next(state.selector(std::move(state.seed)));
55 }
56 catch (...)
57 {
58 sub.on_error(std::current_exception());
59 return;
60 }
61 sub.on_completed();
62 }
63};
64
65template<constraint::decayed_type Type, constraint::decayed_type Seed, reduce_accumulator<Seed, Type> AccumulatorFn, std::invocable<Seed&&> ResultSelectorFn>
67{
68 Seed initial_value;
69 RPP_NO_UNIQUE_ADDRESS AccumulatorFn accumulator;
70 RPP_NO_UNIQUE_ADDRESS ResultSelectorFn selector;
71
72 template<constraint::subscriber_of_type<utils::decayed_invoke_result_t<ResultSelectorFn, Seed>> TSub>
73 auto operator()(TSub&& subscriber) const
74 {
75 auto subscription = subscriber.get_subscription();
76 // dynamic_state there to make shared_ptr for observer instead of making shared_ptr for state
77 return create_subscriber_with_dynamic_state<Type>(std::move(subscription),
79 utils::forwarding_on_error{},
81 std::forward<TSub>(subscriber),
82 reduce_state<Seed, AccumulatorFn, ResultSelectorFn>{initial_value, accumulator, selector});
83 }
84};
85
86template<constraint::decayed_type CastBeforeDivide, constraint::observable TObs>
87auto average_impl(TObs&& observable)
88{
89 using Type = utils::extract_observable_type_t<std::decay_t<TObs>>;
90 using Pair = std::pair<std::optional<Type>, int32_t>;
91 return std::forward<TObs>(observable).reduce(Pair{},
92 [](Pair&& seed, auto&& val)
93 {
94 if (seed.first)
95 seed.first.value() += std::forward<decltype(val)>(val);
96 else
97 seed.first = std::forward<decltype(val)>(val);
98 ++seed.second;
99 return std::move(seed);
100 },
101 [](Pair&& seed)
102 {
103 if (!seed.first)
104 throw utils::not_enough_emissions{"`average` operator requires at least one emission to calculate average"};
105
106 return static_cast<CastBeforeDivide>(std::move(seed.first).value()) / seed.second;
107 });
108}
109
110template<constraint::observable TObs>
111auto sum_impl(TObs&& observable)
112{
113 using Type = utils::extract_observable_type_t<std::decay_t<TObs>>;
114 return std::forward<TObs>(observable).reduce(std::optional<Type>{},
115 [](std::optional<Type>&& seed, auto&& val)
116 {
117 if (!seed)
118 seed = std::forward<decltype(val)>(val);
119 else
120 seed.value() += std::forward<decltype(val)>(val);
121 return std::move(seed);
122 },
123 [](std::optional<Type>&& seed)
124 {
125 if (!seed)
126 throw utils::not_enough_emissions{"`sum` operator requires at least one emission to calculate sum"};
127
128 return std::move(seed.value());
129 });
130}
131
132template<constraint::observable TObs>
133auto count_impl(TObs&& observable)
134{
135 return std::forward<TObs>(observable).reduce(size_t{}, [](size_t seed, auto&&) { return ++seed; });
136}
137
138template<constraint::observable TObs, typename Comparator>
139auto min_impl(TObs&& observable, Comparator&& comparator)
140{
141 using Type = utils::extract_observable_type_t<std::decay_t<TObs>>;
142 return std::forward<TObs>(observable).reduce(std::optional<Type>{},
143 [comparator](std::optional<Type>&& seed, auto&& val)
144 {
145 if (!seed || comparator(utils::as_const(val), seed.value()))
146 seed = std::forward<decltype(val)>(val);
147 return std::move(seed);
148 },
149 [](std::optional<Type>&& seed)
150 {
151 if (!seed)
152 throw utils::not_enough_emissions{"`min` operator requires at least one emission to calculate min"};
153
154 return std::move(seed.value());
155 });
156}
157
158template<constraint::observable TObs, typename Comparator>
159auto max_impl(TObs&& observable, Comparator&& comparator)
160{
161 using Type = utils::extract_observable_type_t<std::decay_t<TObs>>;
162 return std::forward<TObs>(observable).reduce(std::optional<Type>{},
163 [comparator](std::optional<Type>&& seed, auto&& val)
164 {
165 if (!seed || comparator(seed.value(), utils::as_const(val)))
166 seed = std::forward<decltype(val)>(val);
167 return std::move(seed);
168 },
169 [](std::optional<Type>&& seed)
170 {
171 if (!seed)
172 throw utils::not_enough_emissions{"`max` operator requires at least one emission to calculate min"};
173
174 return std::move(seed.value());
175 });
176}
177} // namespace rpp::details
178
Definition: constraints.hpp:19
Definition: reduce.hpp:67
Definition: reduce.hpp:47
Definition: reduce.hpp:36
Definition: reduce.hpp:29