ReactivePlusPlus
One more implementation of the ReactiveX approach in C++, written with performance and templates in mind
 
Loading...
Searching...
No Matches
scan.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/defs.hpp> // RPP_NO_UNIQUE_ADDRESS
14#include <rpp/operators/lift.hpp> // required due to operator uses lift
15#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
16#include <rpp/operators/fwd/scan.hpp> // own forwarding
17#include <rpp/operators/reduce.hpp> // reduce to re-use
18#include <rpp/subscribers/constraints.hpp> // constraint::subscriber
19#include <rpp/utils/functors.hpp> // forwarding_on_error
20#include <rpp/utils/utilities.hpp> // utils::as_const
21
22
23IMPLEMENTATION_FILE(scan_tag);
24
25namespace rpp::details
26{
// Body of the function object whose call operator handles each incoming item.
// NOTE(review): the declaration line that names this type (listing line 27) is
// absent from this listing; the qualified call to reduce_on_next::operator()
// below suggests the type derives from reduce_on_next -- confirm against the
// real header.
28{
   // Handles a single incoming item.
   //   value - the incoming item; forwarded unchanged to reduce_on_next
   //   sub   - subscriber that receives the emission made by this handler
   //   state - reduce_state<Result, AccumulatorFn>; its `seed` member is what gets emitted
29 template<constraint::decayed_type Result, typename AccumulatorFn>
30 void operator()(auto&& value,
31 const constraint::subscriber auto& sub,
32 const reduce_state<Result, AccumulatorFn>& state) const
33 {
   // Step 1: delegate to reduce_on_next with the same three arguments.
   // Presumably this folds `value` into state.seed via the accumulator
   // (reduce_on_next is not defined in this listing) -- TODO confirm in reduce.hpp.
34 reduce_on_next::operator()(std::forward<decltype(value)>(value), sub, state);
   // Step 2: emit state.seed to the subscriber on every item.
   // utils::as_const presumably yields a const view so the subscriber cannot
   // modify or move from the stored seed -- TODO confirm in utils/utilities.hpp.
35 sub.on_next(utils::as_const(state.seed));
36 }
37};
38
// Function object that, given a subscriber of Result, returns a new subscriber
// built by create_subscriber_with_dynamic_state<Type>.
//   Type          - decayed type passed as the template argument of the created subscriber
//   Result        - decayed type of initial_value and of the values the wrapped subscriber accepts
//   AccumulatorFn - callable constrained by scan_accumulator<Result, Type>
// NOTE(review): the declaration line that names this type (listing line 40) is
// absent from this listing.
39template<constraint::decayed_type Type, constraint::decayed_type Result, scan_accumulator<Result, Type> AccumulatorFn>
41{
   // Value handed to reduce_state as its first initializer on every call.
42 Result initial_value;
   // Accumulator handed to reduce_state as its second initializer.
   // RPP_NO_UNIQUE_ADDRESS comes from rpp/defs.hpp; presumably it expands to
   // [[no_unique_address]] so an empty functor takes no storage -- TODO confirm.
43 RPP_NO_UNIQUE_ADDRESS AccumulatorFn accumulator;
44
   // Wraps `subscriber` into a new subscriber that carries its own reduce_state.
   //   subscriber - forwarding reference to a subscriber of Result; it is forwarded
   //                into the created subscriber
   // Returns whatever create_subscriber_with_dynamic_state<Type> returns.
   // The operator is const, so initial_value and accumulator are only read here
   // and a fresh reduce_state is constructed from them on each call.
45 template<constraint::subscriber_of_type<Result> TSub>
46 auto operator()(TSub&& subscriber) const
47 {
   // Taken before `subscriber` is forwarded below, so it is not read after a potential move.
48 auto subscription = subscriber.get_subscription();
49 // dynamic_state is used here so that the shared_ptr is created for the observer rather than for the state
   // NOTE(review): the listing jumps from line 50 to line 52, so one argument
   // line of this call is absent from this listing -- restore it from the real header.
50 return create_subscriber_with_dynamic_state<Type>(std::move(subscription),
52 utils::forwarding_on_error{},
53 utils::forwarding_on_completed{},
54 std::forward<TSub>(subscriber),
55 reduce_state<Result, AccumulatorFn>{initial_value, accumulator});
56 }
57};
58} // namespace rpp::details
59
Definition: constraints.hpp:19
Definition: reduce.hpp:36
Definition: reduce.hpp:29
Definition: scan.hpp:41
Definition: scan.hpp:28