ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Loading...
Searching...
No Matches
take_last.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/lift.hpp> // required due to operator uses lift
14#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
15#include <rpp/operators/fwd/take_last.hpp>
16#include <rpp/subscribers/constraints.hpp>
17#include <rpp/utils/functors.hpp>
18
19#include <optional>
20#include <vector>
21
22IMPLEMENTATION_FILE(take_last_tag);
23
24namespace rpp::details
25{
26template<constraint::decayed_type Type>
28{
29 explicit take_last_state(size_t count)
30 : items(count) {}
31
32 size_t get_next_position(size_t pos) const
33 {
34 return ++pos >= items.size() ? 0 : pos;
35 }
36
37 mutable std::vector<std::optional<Type>> items;
38 mutable size_t current_end_position{};
39};
40
42{
43 template<typename T, constraint::decayed_type Type>
44 void operator()(T&& v, const auto&, const take_last_state<Type>& state) const
45 {
46 // handle case "count==0"
47 if (state.items.empty())
48 return;
49
50 state.items[state.current_end_position].emplace(std::forward<T>(v));
51 state.current_end_position = state.get_next_position(state.current_end_position);
52 }
53};
54
56{
57 template<constraint::decayed_type Type>
58 void operator()(const auto& subscriber, const take_last_state<Type>& state) const
59 {
60 if (!state.items.empty())
61 {
62 size_t cur_pos = state.current_end_position;
63
64 do
65 {
66 if (auto&& value = state.items[cur_pos])
67 subscriber.on_next(std::move(value.value()));
68
69 cur_pos = state.get_next_position(cur_pos);
70
71 } while (cur_pos != state.current_end_position);
72 }
73 subscriber.on_completed();
74 }
75};
76
77
78template<constraint::decayed_type Type>
80{
81 size_t count;
82
83 template<constraint::subscriber_of_type<Type> TSub>
84 auto operator()(TSub&& subscriber) const
85 {
86 auto subscription = subscriber.get_subscription();
87 // dynamic_state there to make shared_ptr for observer instead of making
88 // shared_ptr for state
89 return create_subscriber_with_dynamic_state<Type>(std::move(subscription),
91 utils::forwarding_on_error{},
93 std::forward<TSub>(subscriber),
95 }
96};
97} // namespace rpp::details
Definition: take_last.hpp:80
Definition: take_last.hpp:56
Definition: take_last.hpp:42
Definition: take_last.hpp:28