ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
take_last.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/operators/details/strategy.hpp>
17
18#include <cstddef>
19
20namespace rpp::operators::details
21{
22 template<rpp::constraint::observer TObserver>
23 class take_last_observer_strategy
24 {
25 public:
26 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
27
28 take_last_observer_strategy(TObserver&& observer, size_t count)
29 : m_observer{std::move(observer)}
30 {
31 m_data.reserve(count);
32 }
33
34 template<typename T>
35 void on_next(T&& v) const
36 {
37 // handle case "count==0"
38 if (!m_data.capacity())
39 return;
40
41 // handle case "count==0"
42 if (m_data.size() < m_data.capacity())
43 {
44 m_data.push_back(std::forward<T>(v));
45 }
46 else
47 {
48 m_data[m_current_end] = std::forward<T>(v);
49 m_current_end = get_next(m_current_end);
50 }
51 }
52
53 void on_error(const std::exception_ptr& err) const { m_observer.on_error(err); }
54
55 void on_completed() const
56 {
57 for (size_t i = 0; i < m_data.size(); ++i)
58 {
59 m_observer.on_next(std::move(m_data[m_current_end]));
60 m_current_end = get_next(m_current_end);
61 }
62
63 m_observer.on_completed();
64 }
65
66 void set_upstream(const disposable_wrapper& d) { m_observer.set_upstream(d); }
67
68 bool is_disposed() const { return m_observer.is_disposed(); }
69
70 private:
71 size_t get_next(size_t pos) const
72 {
73 return ++pos >= m_data.size() ? 0 : pos;
74 }
75
76
77 private:
78 RPP_NO_UNIQUE_ADDRESS TObserver m_observer;
79 mutable std::vector<rpp::utils::extract_observer_type_t<TObserver>> m_data{};
80 mutable size_t m_current_end{};
81 };
82
83 struct take_last_t : lift_operator<take_last_t, size_t>
84 {
85 using lift_operator<take_last_t, size_t>::lift_operator;
86
87 template<rpp::constraint::decayed_type T>
89 {
90 using result_type = T;
91
92 template<rpp::constraint::observer_of_type<result_type> TObserver>
93 using observer_strategy = take_last_observer_strategy<TObserver>;
94 };
95
96 template<rpp::details::observables::constraint::disposables_strategy Prev>
97 using updated_optimal_disposables_strategy = Prev;
98 };
99} // namespace rpp::operators::details
100
101namespace rpp::operators
102{
120 * @ingroup filtering_operators
121 * @see https://reactivex.io/documentation/operators/takelast.html
122 */
123 inline auto take_last(size_t count)
124 {
125 return details::take_last_t{count};
126 }
127} // namespace rpp::operators
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
auto take_last(size_t count)
Emit only last count items provided by observable, then send on_completed
Definition take_last.hpp:119
Definition take_last.hpp:84