13#include <rpp/operators/lift.hpp>
14#include <rpp/operators/details/subscriber_with_state.hpp>
15#include <rpp/operators/fwd/take_last.hpp>
16#include <rpp/subscribers/constraints.hpp>
17#include <rpp/utils/functors.hpp>
22IMPLEMENTATION_FILE(take_last_tag);
26template<constra
int::decayed_type Type>
32 size_t get_next_position(
size_t pos)
const
34 return ++pos >= items.size() ? 0 : pos;
37 mutable std::vector<std::optional<Type>> items;
38 mutable size_t current_end_position{};
43 template<
typename T, constra
int::decayed_type Type>
47 if (state.items.empty())
50 state.items[state.current_end_position].emplace(std::forward<T>(v));
51 state.current_end_position = state.get_next_position(state.current_end_position);
57 template<constra
int::decayed_type Type>
60 if (!state.items.empty())
62 size_t cur_pos = state.current_end_position;
66 if (
auto&& value = state.items[cur_pos])
67 subscriber.on_next(std::move(value.value()));
69 cur_pos = state.get_next_position(cur_pos);
71 }
while (cur_pos != state.current_end_position);
73 subscriber.on_completed();
78template<constra
int::decayed_type Type>
83 template<constra
int::subscriber_of_type<Type> TSub>
84 auto operator()(TSub&& subscriber)
const
86 auto subscription = subscriber.get_subscription();
89 return create_subscriber_with_dynamic_state<Type>(std::move(subscription),
91 utils::forwarding_on_error{},
93 std::forward<TSub>(subscriber),
Definition: take_last.hpp:80
Definition: take_last.hpp:56
Definition: take_last.hpp:42
Definition: take_last.hpp:28