13#include <rpp/operators/fwd.hpp>
15#include <rpp/defs.hpp>
16#include <rpp/operators/details/strategy.hpp>
17#include <rpp/utils/utils.hpp>
21namespace rpp::operators::details
23 template<rpp::constraint::observer TObserver, rpp::constraint::decayed_type Seed, rpp::constraint::decayed_type Fn>
26 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
28 RPP_NO_UNIQUE_ADDRESS TObserver
observer;
29 RPP_NO_UNIQUE_ADDRESS
mutable Seed seed;
30 RPP_NO_UNIQUE_ADDRESS Fn fn;
32 RPP_CALL_DURING_CONSTRUCTION(
36 void on_next(T&& v)
const
38 seed = fn(std::move(seed), std::forward<T>(v));
42 void on_error(
const std::exception_ptr& err)
const {
observer.
on_error(err); }
51 template<rpp::constraint::decayed_type InitialValue, rpp::constraint::decayed_type Fn>
56 template<rpp::constraint::decayed_type T>
59 static_assert(std::is_invocable_r_v<InitialValue, Fn, InitialValue&&, T>,
"Accumulator is not invocable with Seed&& and T returning Seed");
61 using result_type = InitialValue;
63 template<rpp::constraint::observer_of_type<result_type> TObserver>
67 template<rpp::details::observables::constraint::disposables_strategy Prev>
68 using updated_optimal_disposables_strategy = Prev;
71 template<rpp::constraint::observer TObserver, rpp::constraint::decayed_type Fn>
74 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
76 using Seed = rpp::utils::extract_observer_type_t<TObserver>;
78 RPP_NO_UNIQUE_ADDRESS TObserver
observer;
79 RPP_NO_UNIQUE_ADDRESS Fn fn;
80 mutable std::optional<Seed> seed{};
82 template<rpp::constraint::decayed_same_as<Seed> T>
83 void on_next(T&& v)
const
86 seed = fn(std::move(seed).value(), std::forward<T>(v));
88 seed = std::forward<T>(v);
93 void on_error(
const std::exception_ptr& err)
const {
observer.
on_error(err); }
102 template<rpp::constraint::decayed_type Fn>
107 template<rpp::constraint::decayed_type T>
110 static_assert(std::is_invocable_r_v<T, Fn, T&&, T>,
"Accumulator is not invocable with T&& and T returning T");
112 using result_type = T;
114 template<rpp::constraint::observer_of_type<result_type> TObserver>
118 template<rpp::details::observables::constraint::disposables_strategy Prev>
119 using updated_optimal_disposables_strategy = Prev;
123namespace rpp::operators
153 template<
typename InitialValue,
typename Fn>
155 auto scan(InitialValue&& initial_value, Fn&& accumulator)
186 template<
typename Fn>
187 auto scan(Fn&& accumulator)
189 return details::scan_no_seed_t<std::decay_t<Fn>>{std::forward<Fn>(accumulator)};
void on_next(const Type &v) const noexcept
Observable calls this method to notify the observer about a new value.
Definition observer.hpp:84
void set_upstream(const disposable_wrapper &d) noexcept
Observable calls this method to pass disposable. Observer disposes this disposable WHEN observer want...
Definition observer.hpp:49
void on_completed() const noexcept
Observable calls this method to notify observer about completion of emissions.
Definition observer.hpp:135
void on_error(const std::exception_ptr &err) const noexcept
Observable calls this method to notify the observer about an error encountered while generating the next data.
Definition observer.hpp:120
bool is_disposed() const noexcept
Observable calls this method to check whether the observer is interested in emissions or not.
Definition observer.hpp:74
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition strategy.hpp:28
Definition function_traits.hpp:45