ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
reduce.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/operators/details/strategy.hpp>
17
18namespace rpp::operators::details
19{
// Observer strategy used for reduce with an explicit seed: it folds every incoming value into
// `seed` via `accumulator` and only talks to the wrapped `observer` on error or completion.
// NOTE(review): listing line 21 (the declaration this template header belongs to) is absent
// from this extract - restore it from the upstream header before compiling.
20 template<rpp::constraint::observer TObserver, rpp::constraint::decayed_type Accumulator>
22 {
23 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
// The accumulated type is the value type of the wrapped observer.
24 using Seed = rpp::utils::extract_observer_type_t<TObserver>;
25
// Member order is observer, seed, accumulator.
// `seed` is mutable because the const-qualified on_next() below reassigns it.
26 RPP_NO_UNIQUE_ADDRESS TObserver observer;
27 RPP_NO_UNIQUE_ADDRESS mutable Seed seed;
28 RPP_NO_UNIQUE_ADDRESS Accumulator accumulator;
29
// Replaces the stored seed with accumulator(previous seed, v); nothing is emitted here.
30 template<typename T>
31 void on_next(T&& v) const
32 {
33 seed = accumulator(std::move(seed), std::forward<T>(v));
34 }
35
// Errors are passed straight to the wrapped observer; the seed is not emitted.
36 void on_error(const std::exception_ptr& err) const { observer.on_error(err); }
37
// Emits the accumulated seed (moved out) as a single value, then completes the wrapped observer.
38 void on_completed() const
39 {
40 observer.on_next(std::move(seed));
41 observer.on_completed();
42 }
43
// Upstream registration and disposal queries are delegated to the wrapped observer.
44 void set_upstream(const disposable_wrapper& d) { observer.set_upstream(d); }
45
46 bool is_disposed() const { return observer.is_disposed(); }
47 };
48
// Operator object created by rpp::operators::reduce(seed, accumulator).
// Seed and Accumulator are handed to the lift_operator base, whose constructors are inherited.
49 template<rpp::constraint::decayed_type Seed, rpp::constraint::decayed_type Accumulator>
50 struct reduce_t : lift_operator<reduce_t<Seed, Accumulator>, Seed, Accumulator>
51 {
52 using operators::details::lift_operator<reduce_t<Seed, Accumulator>, Seed, Accumulator>::lift_operator;
53
// Traits parameterized on T (presumably the upstream value type - confirm against lift_operator).
// NOTE(review): listing line 55 (the declaration this template header belongs to) is absent
// from this extract - restore it from the upstream header before compiling.
54 template<rpp::constraint::decayed_type T>
56 {
// Compile-time check: Accumulator must be invocable with (Seed&&, T) and its result must be
// convertible to Seed.
57 static_assert(std::is_invocable_r_v<Seed, Accumulator, Seed&&, T>, "Accumulator is not invocable with Seed&& and T returning Seed");
58
// The resulting value type is the seed type.
59 using result_type = Seed;
60
// NOTE(review): listing line 62 (the declaration following this template header) is absent
// from this extract - restore it from the upstream header before compiling.
61 template<rpp::constraint::observer_of_type<result_type> TObserver>
63 };
64
// The disposables strategy received as Prev is passed through unchanged.
65 template<rpp::details::observables::constraint::disposables_strategy Prev>
66 using updated_optimal_disposables_strategy = Prev;
67 };
68
// Observer strategy used for reduce without an explicit seed: the first received value becomes
// the seed, later values are folded into it via `accumulator`, and the result (if any) is
// emitted on completion.
// NOTE(review): listing line 70 (the declaration this template header belongs to) is absent
// from this extract - restore it from the upstream header before compiling.
69 template<rpp::constraint::observer TObserver, rpp::constraint::decayed_type Accumulator>
71 {
72 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
// The accumulated type is the value type of the wrapped observer.
73 using Seed = rpp::utils::extract_observer_type_t<TObserver>;
74
// Member order is observer, accumulator, seed.
// `seed` starts empty and is mutable because the const-qualified on_next() below assigns it.
75 RPP_NO_UNIQUE_ADDRESS TObserver observer;
76 RPP_NO_UNIQUE_ADDRESS Accumulator accumulator;
77 mutable std::optional<Seed> seed{};
78
// First value: stored as the seed. Subsequent values: seed = accumulator(previous seed, v).
// Nothing is emitted here.
79 template<typename T>
80 void on_next(T&& v) const
81 {
82 if (seed.has_value())
83 seed = accumulator(std::move(seed).value(), std::forward<T>(v));
84 else
85 seed = std::forward<T>(v);
86 }
87
// Errors are passed straight to the wrapped observer; any accumulated value is not emitted.
88 void on_error(const std::exception_ptr& err) const { observer.on_error(err); }
89
// Emits the accumulated value only if at least one value was received, then completes the
// wrapped observer. With no values received, only on_completed() is forwarded.
90 void on_completed() const
91 {
92 if (seed.has_value())
93 observer.on_next(std::move(seed).value());
94 observer.on_completed();
95 }
96
// Upstream registration and disposal queries are delegated to the wrapped observer.
97 void set_upstream(const disposable_wrapper& d) { observer.set_upstream(d); }
98
99 bool is_disposed() const { return observer.is_disposed(); }
100 };
101
// Operator object created by rpp::operators::reduce(accumulator) (the overload without a seed).
// Accumulator is handed to the lift_operator base, whose constructors are inherited.
102 template<rpp::constraint::decayed_type Accumulator>
103 struct reduce_no_seed_t : lift_operator<reduce_no_seed_t<Accumulator>, Accumulator>
104 {
105 using lift_operator<reduce_no_seed_t<Accumulator>, Accumulator>::lift_operator;
106
// Traits parameterized on T (presumably the upstream value type - confirm against lift_operator).
// NOTE(review): listing line 108 (the declaration this template header belongs to) is absent
// from this extract - restore it from the upstream header before compiling.
107 template<rpp::constraint::decayed_type T>
109 {
// Compile-time check: Accumulator must be invocable with (T&&, T) and its result must be
// convertible to T.
110 static_assert(std::is_invocable_r_v<T, Accumulator, T&&, T>, "Accumulator is not invocable with T&& and T returning T");
111
// The resulting value type is the same as T.
112 using result_type = T;
113
// NOTE(review): listing line 115 (the declaration following this template header) is absent
// from this extract - restore it from the upstream header before compiling.
114 template<rpp::constraint::observer_of_type<result_type> TObserver>
116 };
117
// The disposables strategy received as Prev is passed through unchanged.
118 template<rpp::details::observables::constraint::disposables_strategy Prev>
119 using updated_optimal_disposables_strategy = Prev;
120 };
121} // namespace rpp::operators::details
122
123namespace rpp::operators
124{
143 * @see https://reactivex.io/documentation/operators/reduce.html
144 */
// Factory for the seeded reduce operator.
// Returns a details::reduce_t parameterized on the decayed Seed and Accumulator types, with
// both arguments perfectly forwarded into it.
// Constraint: when utils::is_not_template_callable<Accumulator> holds, invoking Accumulator with
// (std::decay_t<Seed>&&, rpp::utils::convertible_to_any) must yield exactly std::decay_t<Seed>;
// when it does not hold, this overload imposes no check here.
145 template<typename Seed, typename Accumulator>
146 requires (!utils::is_not_template_callable<Accumulator> || std::same_as<std::decay_t<Seed>, std::invoke_result_t<Accumulator, std::decay_t<Seed> &&, rpp::utils::convertible_to_any>>)
147 auto reduce(Seed&& seed, Accumulator&& accumulator)
148 {
149 return details::reduce_t<std::decay_t<Seed>, std::decay_t<Accumulator>>{std::forward<Seed>(seed), std::forward<Accumulator>(accumulator)};
150 }
151
167 * @par Example
168 * @snippet reduce.cpp reduce_no_seed
169 *
170 * @ingroup aggregate_operators
171 * @see https://reactivex.io/documentation/operators/reduce.html
172 */
// Factory for the reduce operator without an explicit seed.
// Returns a details::reduce_no_seed_t parameterized on the decayed Accumulator type, with the
// accumulator perfectly forwarded into it. No constraint is applied to Accumulator here.
173 template<typename Accumulator>
174 auto reduce(Accumulator&& accumulator)
175 {
176 return details::reduce_no_seed_t<std::decay_t<Accumulator>>{std::forward<Accumulator>(accumulator)};
177 }
178} // namespace rpp::operators
Definition strategy.hpp:28
Definition function_traits.hpp:45
auto reduce(Seed &&seed, Accumulator &&accumulator)
Apply a function to each item emitted by an Observable, sequentially, and emit the final value.
Definition reduce.hpp:143
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
Definition reduce.hpp:51
Definition utils.hpp:68