ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
buffer.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/operators/details/strategy.hpp>
17
18#include <cstddef>
19
20namespace rpp::operators::details
21{
22 template<rpp::constraint::observer TObserver>
23 class buffer_observer_strategy
24 {
25 using container = rpp::utils::extract_observer_type_t<TObserver>;
26 using value_type = typename container::value_type;
27 static_assert(std::same_as<container, std::vector<value_type>>);
28
29 public:
30 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
31
32 buffer_observer_strategy(TObserver&& observer, size_t count)
33 : m_observer{std::move(observer)}
34 {
35 m_bucket.reserve(std::max(size_t{1}, count));
36 }
37
38 template<typename T>
39 void on_next(T&& v) const
40 {
41 m_bucket.push_back(std::forward<T>(v));
42 if (m_bucket.size() == m_bucket.capacity())
43 {
44 const auto capacity = m_bucket.capacity();
45 m_observer.on_next(std::move(m_bucket));
46
47 m_bucket.clear();
48 m_bucket.reserve(capacity);
49 }
50 }
51
52 void on_error(const std::exception_ptr& err) const { m_observer.on_error(err); }
53
54 void on_completed() const
55 {
56 if (!m_bucket.empty())
57 m_observer.on_next(std::move(m_bucket));
58 m_observer.on_completed();
59 }
60
61 void set_upstream(const disposable_wrapper& d) { m_observer.set_upstream(d); }
62
63 bool is_disposed() const { return m_observer.is_disposed(); }
64
65 private:
66 RPP_NO_UNIQUE_ADDRESS TObserver m_observer;
67 mutable std::vector<value_type> m_bucket;
68 };
69
70 struct buffer_t : lift_operator<buffer_t, size_t>
71 {
72 using lift_operator<buffer_t, size_t>::lift_operator;
73
74 template<rpp::constraint::decayed_type T>
76 {
77 using result_type = std::vector<T>;
78
79 template<rpp::constraint::observer_of_type<result_type> TObserver>
80 using observer_strategy = buffer_observer_strategy<TObserver>;
81 };
82
83 template<rpp::details::observables::constraint::disposables_strategy Prev>
84 using updated_optimal_disposables_strategy = Prev;
85 };
86} // namespace rpp::operators::details
87
88namespace rpp::operators
89{
108 * @ingroup transforming_operators
109 * @see https://reactivex.io/documentation/operators/buffer.html
110 */
111 inline auto buffer(size_t count)
112 {
113 return details::buffer_t{count};
114 }
115} // namespace rpp::operators
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
auto buffer(size_t count)
Periodically gather emissions emitted by an original Observable into bundles and emit these bundles r...
Definition buffer.hpp:107
Definition buffer.hpp:71