14#include <rpp/operators/lift.hpp>
15#include <rpp/operators/details/subscriber_with_state.hpp>
16#include <rpp/operators/fwd/buffer.hpp>
17#include <rpp/subscribers/constraints.hpp>
18#include <rpp/utils/functors.hpp>
23IMPLEMENTATION_FILE(buffer_tag);
29template<constra
int::decayed_type UpstreamType>
35 : max(std::max(size_t{1}, count))
37 clear_and_reserve_buckets();
45 void clear_and_reserve_buckets()
const
52 mutable buffer_bundle_type<UpstreamType> buckets;
57 template<constra
int::decayed_type UpstreamType>
60 state.buckets.push_back(std::forward<
decltype(value)>(value));
61 if (state.buckets.size() == state.max)
63 subscriber.on_next(std::move(state.buckets));
64 state.clear_and_reserve_buckets();
71 template<constra
int::decayed_type UpstreamType>
74 if (!state.buckets.empty())
75 subscriber.on_next(std::move(state.buckets));
76 subscriber.on_completed();
80template<constra
int::decayed_type Type>
85 template<constra
int::subscriber_of_type<buffer_bundle_type<Type>> TSub>
86 auto operator()(TSub&& subscriber)
const
88 auto subscription = subscriber.get_subscription();
91 return create_subscriber_with_dynamic_state<Type>(std::move(subscription),
93 utils::forwarding_on_error{},
95 std::forward<TSub>(subscriber),
Definition: buffer.hpp:82
Definition: buffer.hpp:70
Definition: buffer.hpp:56
Definition: buffer.hpp:31
buffer_state(size_t count)
Definition: buffer.hpp:34