ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
scan.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/operators/details/strategy.hpp>
17#include <rpp/utils/utils.hpp>
18
19#include <optional>
20
21namespace rpp::operators::details
22{
23 template<rpp::constraint::observer TObserver, rpp::constraint::decayed_type Seed, rpp::constraint::decayed_type Fn>
25 {
26 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
27
28 RPP_NO_UNIQUE_ADDRESS TObserver observer;
29 RPP_NO_UNIQUE_ADDRESS mutable Seed seed;
30 RPP_NO_UNIQUE_ADDRESS Fn fn;
31
32 RPP_CALL_DURING_CONSTRUCTION(
33 observer.on_next(utils::as_const(seed)););
34
35 template<typename T>
36 void on_next(T&& v) const
37 {
38 seed = fn(std::move(seed), std::forward<T>(v));
39 observer.on_next(utils::as_const(seed));
40 }
41
42 void on_error(const std::exception_ptr& err) const { observer.on_error(err); }
43
44 void on_completed() const { observer.on_completed(); }
45
46 void set_upstream(const disposable_wrapper& d) { observer.set_upstream(d); }
47
48 bool is_disposed() const { return observer.is_disposed(); }
49 };
50
51 template<rpp::constraint::decayed_type InitialValue, rpp::constraint::decayed_type Fn>
52 struct scan_t : lift_operator<scan_t<InitialValue, Fn>, InitialValue, Fn>
53 {
54 using operators::details::lift_operator<scan_t<InitialValue, Fn>, InitialValue, Fn>::lift_operator;
55
56 template<rpp::constraint::decayed_type T>
58 {
59 static_assert(std::is_invocable_r_v<InitialValue, Fn, InitialValue&&, T>, "Accumulator is not invocable with Seed&& abnd T returning Seed");
60
61 using result_type = InitialValue;
62
63 template<rpp::constraint::observer_of_type<result_type> TObserver>
65 };
66
67 template<rpp::details::observables::constraint::disposables_strategy Prev>
68 using updated_optimal_disposables_strategy = Prev;
69 };
70
71 template<rpp::constraint::observer TObserver, rpp::constraint::decayed_type Fn>
73 {
74 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
75
76 using Seed = rpp::utils::extract_observer_type_t<TObserver>;
77
78 RPP_NO_UNIQUE_ADDRESS TObserver observer;
79 RPP_NO_UNIQUE_ADDRESS Fn fn;
80 mutable std::optional<Seed> seed{};
81
82 template<rpp::constraint::decayed_same_as<Seed> T>
83 void on_next(T&& v) const
84 {
85 if (seed)
86 seed = fn(std::move(seed).value(), std::forward<T>(v));
87 else
88 seed = std::forward<T>(v);
89
90 observer.on_next(utils::as_const(seed.value()));
91 }
92
93 void on_error(const std::exception_ptr& err) const { observer.on_error(err); }
94
95 void on_completed() const { observer.on_completed(); }
96
97 void set_upstream(const disposable_wrapper& d) { observer.set_upstream(d); }
98
99 bool is_disposed() const { return observer.is_disposed(); }
100 };
101
102 template<rpp::constraint::decayed_type Fn>
103 struct scan_no_seed_t : lift_operator<scan_no_seed_t<Fn>, Fn>
104 {
105 using lift_operator<scan_no_seed_t<Fn>, Fn>::lift_operator;
106
107 template<rpp::constraint::decayed_type T>
109 {
110 static_assert(std::is_invocable_r_v<T, Fn, T&&, T>, "Accumulator is not invocable with T&& abnd T returning T");
111
112 using result_type = T;
113
114 template<rpp::constraint::observer_of_type<result_type> TObserver>
116 };
117
118 template<rpp::details::observables::constraint::disposables_strategy Prev>
119 using updated_optimal_disposables_strategy = Prev;
120 };
121} // namespace rpp::operators::details
122
123namespace rpp::operators
124{
153 template<typename InitialValue, typename Fn>
154 requires (!utils::is_not_template_callable<Fn> || std::same_as<std::decay_t<InitialValue>, std::invoke_result_t<Fn, std::decay_t<InitialValue> &&, rpp::utils::convertible_to_any>>)
155 auto scan(InitialValue&& initial_value, Fn&& accumulator)
156 {
157 return details::scan_t<std::decay_t<InitialValue>, std::decay_t<Fn>>{std::forward<InitialValue>(initial_value), std::forward<Fn>(accumulator)};
158 }
159
186 template<typename Fn>
187 auto scan(Fn&& accumulator)
188 {
189 return details::scan_no_seed_t<std::decay_t<Fn>>{std::forward<Fn>(accumulator)};
190 }
191} // namespace rpp::operators
void on_next(const Type &v) const noexcept
Observable calls this method to notify observer about new value.
Definition observer.hpp:84
void set_upstream(const disposable_wrapper &d) noexcept
Observable calls this method to pass disposable. Observer disposes this disposable WHEN observer want...
Definition observer.hpp:49
void on_completed() const noexcept
Observable calls this method to notify observer about completion of emissions.
Definition observer.hpp:135
void on_error(const std::exception_ptr &err) const noexcept
Observable calls this method to notify observer about some error during generation next data.
Definition observer.hpp:120
bool is_disposed() const noexcept
Observable calls this method to check if observer interested or not in emissions.
Definition observer.hpp:74
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition strategy.hpp:28
Definition function_traits.hpp:45
auto scan(InitialValue &&initial_value, Fn &&accumulator)
Apply accumulator function for each emission from observable and result of accumulator from previous ...
Definition scan.hpp:151
Definition scan.hpp:53
Definition utils.hpp:68