23 class buffer_observer_strategy
25 using container = rpp::utils::extract_observer_type_t<TObserver>;
26 using value_type =
typename container::value_type;
27 static_assert(std::same_as<container, std::vector<value_type>>);
30 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
32 buffer_observer_strategy(TObserver&&
observer,
size_t count)
35 m_bucket.reserve(std::max(
size_t{1}, count));
39 void on_next(T&& v)
const
41 m_bucket.push_back(std::forward<T>(v));
42 if (m_bucket.size() == m_bucket.capacity())
44 const auto capacity = m_bucket.capacity();
45 m_observer.on_next(std::move(m_bucket));
48 m_bucket.reserve(capacity);
52 void on_error(
const std::exception_ptr& err)
const { m_observer.on_error(err); }
54 void on_completed()
const
56 if (!m_bucket.empty())
57 m_observer.on_next(std::move(m_bucket));
58 m_observer.on_completed();
63 bool is_disposed()
const {
return m_observer.is_disposed(); }
66 RPP_NO_UNIQUE_ADDRESS TObserver m_observer;
67 mutable std::vector<value_type> m_bucket;