ReactivePlusPlus
Yet another implementation of the ReactiveX approach in C++, written with performance and templates in mind
 
Loading...
Searching...
No Matches
buffer.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// TC Wang 2022 - present.
5// Distributed under the Boost Software License, Version 1.0.
6// (See accompanying file LICENSE_1_0.txt or copy at
7// https://www.boost.org/LICENSE_1_0.txt)
8//
9// Project home: https://github.com/victimsnino/ReactivePlusPlus
10//
11
12#pragma once
13
14#include <rpp/operators/lift.hpp> // required due to operator uses lift
15#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_dynamic_state
16#include <rpp/operators/fwd/buffer.hpp> // own forwarding
17#include <rpp/subscribers/constraints.hpp> // subscriber_of_type
18#include <rpp/utils/functors.hpp> // forwarding_on_error
19
20#include <algorithm>
21
22
23IMPLEMENTATION_FILE(buffer_tag);
24
25namespace rpp::details
26{
29template<constraint::decayed_type UpstreamType>
31{
// Constructs the state for buffers of `count` values.
// `max` is initialised with std::max(size_t{1}, count), so a requested count of 0 is treated as 1.
34 explicit buffer_state(size_t count)
35 : max(std::max(size_t{1}, count))
36 {
// Empty `buckets` and reserve room for `max` elements before the first value arrives.
37 clear_and_reserve_buckets();
38 }
39
// Copy construction and copy assignment are deleted; the move operations are declared as defaulted.
// NOTE(review): the member `max` is declared `const size_t`, so the defaulted move assignment
// is implicitly defined as deleted by the language rules; only move construction is actually usable.
// Confirm that move assignment is never required by callers.
40 buffer_state(const buffer_state& other) = delete;
41 buffer_state(buffer_state&&) noexcept = default;
42 buffer_state& operator=(const buffer_state&) = delete;
43 buffer_state& operator=(buffer_state&&) noexcept = default;
44
// Empties `buckets` and then reserves capacity for `max` elements.
// The function is const-qualified; it can still modify `buckets` because that member is declared `mutable`.
45 void clear_and_reserve_buckets() const
46 {
47 buckets.clear();
48 buckets.reserve(max);
49 }
50
// Number of values a buffer must hold before it is emitted; the constructor guarantees it is at least 1.
51 const size_t max;
// Values collected so far. Declared `mutable` so it can be modified through a const buffer_state
// (const member functions and const references both write to it).
52 mutable buffer_bundle_type<UpstreamType> buckets;
53};
54
56{
// Handles one incoming value: appends it to `state.buckets` and, once the buffer holds
// `state.max` values, passes the whole buffer to `subscriber.on_next` and starts a new one.
//
// value      - incoming value, perfectly forwarded into the buffer
// subscriber - receives each filled buffer through on_next
// state      - buffer state; taken by const reference and modified through its mutable `buckets`
57 template<constraint::decayed_type UpstreamType>
58 void operator()(auto&& value, const auto& subscriber, const buffer_state<UpstreamType>& state) const
59 {
60 state.buckets.push_back(std::forward<decltype(value)>(value));
61 if (state.buckets.size() == state.max)
62 {
// Hand the filled buffer over as an rvalue so the subscriber may take its contents.
63 subscriber.on_next(std::move(state.buckets));
// `buckets` was passed through std::move above; clear it and reserve capacity again before reuse.
64 state.clear_and_reserve_buckets();
65 }
66 }
67};
68
70{
// Handles completion: if any values are still buffered they are passed to `subscriber.on_next`
// as a final buffer, then `subscriber.on_completed()` is called. An empty buffer is not emitted.
//
// subscriber - receives the remaining buffer (if any) and the completion signal
// state      - buffer state whose mutable `buckets` holds the pending values
71 template<constraint::decayed_type UpstreamType>
72 void operator()(const auto& subscriber, const buffer_state<UpstreamType>& state) const
73 {
74 if (!state.buckets.empty())
75 subscriber.on_next(std::move(state.buckets));
76 subscriber.on_completed();
77 }
78};
79
80template<constraint::decayed_type Type>
82{
// Requested buffer size; passed to buffer_state's constructor when a subscriber is wrapped.
83 const size_t count;
84
// Wraps `subscriber` (constrained to accept buffer_bundle_type<Type>) in the subscriber returned by
// create_subscriber_with_dynamic_state<Type>. The visible arguments are the subscriber's subscription,
// utils::forwarding_on_error{}, the forwarded subscriber and a fresh buffer_state<Type>{count}.
// NOTE(review): listing lines 92 and 94 are absent from this extract, so two further arguments
// of the call are not visible here; check the original header before editing the call.
85 template<constraint::subscriber_of_type<buffer_bundle_type<Type>> TSub>
86 auto operator()(TSub&& subscriber) const
87 {
// Read the subscription first, before `subscriber` is forwarded (and possibly moved from) below.
88 auto subscription = subscriber.get_subscription();
89
90 // dynamic_state is used here so that a shared_ptr is made for the observer instead of for the state
91 return create_subscriber_with_dynamic_state<Type>(std::move(subscription),
93 utils::forwarding_on_error{},
95 std::forward<TSub>(subscriber),
96 buffer_state<Type>{count});
97 }
98};
99
100} // namespace rpp::details
Definition: buffer.hpp:82
Definition: buffer.hpp:70
Definition: buffer.hpp:56
Definition: buffer.hpp:31
buffer_state(size_t count)
Definition: buffer.hpp:34