// Observer strategy that buffers incoming values in m_data and forwards them
// to the wrapped observer only when on_completed is called.
// NOTE(review): this listing is a line-numbered extract; the template header,
// braces, access specifiers and several statements are not visible here.
23 class take_last_observer_strategy
// Declares disposables_mode::None as the preferred disposables mode of this strategy.
26 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
// Constructs the strategy from the downstream observer and the number of
// trailing items to keep.
// NOTE(review): the member-initializer list is not visible in this extract;
// presumably observer is moved into m_observer - confirm.
28 take_last_observer_strategy(TObserver&&
observer,
size_t count)
// Reserve storage up front: on_next compares size() against capacity() to
// decide whether the buffer is full, so the reserved capacity acts as the limit.
// NOTE(review): std::vector::reserve only guarantees capacity() >= count, not
// equality - confirm that relying on capacity() as the exact limit is intended.
31 m_data.reserve(count);
// Stores an incoming value in the buffer instead of forwarding it downstream.
// NOTE(review): T is not declared in the visible lines; the use of
// std::forward<T> suggests a forwarding reference from a template header that
// is missing from this extract - confirm.
35 void on_next(T&& v)
const
// Zero-capacity check.
// NOTE(review): the guarded statement is not visible here; presumably an early
// return so that nothing is stored when capacity is zero - confirm.
38 if (!m_data.capacity())
// Buffer not yet full: append the value at the end.
42 if (m_data.size() < m_data.capacity())
44 m_data.push_back(std::forward<T>(v));
// NOTE(review): presumably the else branch (its keyword and braces are not
// visible). Buffer full: overwrite the slot at m_current_end, then advance the
// index with wrap-around via get_next.
48 m_data[m_current_end] = std::forward<T>(v);
49 m_current_end = get_next(m_current_end);
// Forwards the error to the wrapped observer; buffered values are not emitted here.
53 void on_error(
const std::exception_ptr& err)
const { m_observer.on_error(err); }
// Emits every buffered value to the wrapped observer, then signals completion.
55 void on_completed()
const
// Exactly m_data.size() values are emitted, starting at m_current_end and
// advancing with wrap-around. Once on_next has started overwriting, that index
// is the next slot to be overwritten; before that it still holds its initial 0.
57 for (
size_t i = 0; i < m_data.size(); ++i)
// Values are moved out of the buffer rather than copied.
59 m_observer.on_next(std::move(m_data[m_current_end]));
60 m_current_end = get_next(m_current_end);
63 m_observer.on_completed();
// Returns whatever the wrapped observer reports from its own is_disposed().
68 bool is_disposed()
const {
return m_observer.is_disposed(); }
// Returns the index that follows pos in the buffer, wrapping to 0 once the
// incremented index reaches m_data.size().
71 size_t get_next(
size_t pos)
const
73 return ++pos >= m_data.size() ? 0 : pos;
// Wrapped observer that receives the buffered values, errors and completion.
// NOTE(review): RPP_NO_UNIQUE_ADDRESS is a project macro not defined in this
// extract; presumably it maps to the no_unique_address attribute - confirm.
78 RPP_NO_UNIQUE_ADDRESS TObserver m_observer;
// Buffer of stored values; mutable because the const-qualified on_next and
// on_completed modify it.
79 mutable std::vector<rpp::utils::extract_observer_type_t<TObserver>> m_data{};
// Index of the slot on_next overwrites next once the buffer is full, and the
// index from which on_completed starts emitting. Value-initialized to 0.
80 mutable size_t m_current_end{};
auto take_last(size_t count)
Emits only the last `count` items provided by the observable, then sends on_completed.
Definition take_last.hpp:119