ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
replay_subject.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/schedulers/fwd.hpp>
13#include <rpp/subjects/fwd.hpp>
14
15#include <rpp/disposables/disposable_wrapper.hpp>
16#include <rpp/observers/observer.hpp>
17#include <rpp/subjects/details/subject_on_subscribe.hpp>
18#include <rpp/subjects/details/subject_state.hpp>
19
20#include <deque>
21#include <utility>
22
23namespace rpp::subjects::details
24{
25 template<rpp::constraint::decayed_type Type, bool Serialized>
27 {
28 struct replay_state final : public subject_state<Type, Serialized>
29 {
30 replay_state(size_t limit = std::numeric_limits<size_t>::max(), rpp::schedulers::duration duration_limit = std::numeric_limits<rpp::schedulers::duration>::max())
31 : m_limit(limit)
32 , m_duration_limit(duration_limit)
33 {
34 }
35
36 void add_value(const Type& v)
37 {
38 std::unique_lock lock{m_values_mutex};
39 while (m_values.size() >= m_limit)
40 m_values.pop_front();
41
42 m_values.emplace_back(v, deduce_timepoint());
43 }
44
46 {
47 value_with_time(const Type& v, rpp::schedulers::clock_type::time_point timepoint)
48 : value{v}
49 , timepoint{timepoint}
50 {
51 }
52
53 Type value;
54 rpp::schedulers::clock_type::time_point timepoint;
55 };
56
57
58 std::deque<value_with_time> get_actual_values()
59 {
60 std::unique_lock lock{m_values_mutex};
61 deduce_timepoint();
62 return m_values;
63 }
64
65 private:
66 rpp::schedulers::clock_type::time_point deduce_timepoint()
67 {
68 if (std::numeric_limits<rpp::schedulers::duration>::max() == m_duration_limit)
69 return rpp::schedulers::clock_type::time_point{};
70
71 auto now = rpp::schedulers::clock_type::now();
72 while (!m_values.empty() && (now - m_values.front().timepoint > m_duration_limit))
73 m_values.pop_front();
74 return now;
75 }
76
77 private:
78 std::mutex m_values_mutex{};
79 std::deque<value_with_time> m_values{};
80
81 const size_t m_limit;
82 const rpp::schedulers::duration m_duration_limit;
83 };
84
85 struct observer_strategy
86 {
87 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
88
89 std::shared_ptr<replay_state> state;
90
91 void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); }
92
93 bool is_disposed() const noexcept { return state->is_disposed(); }
94
95 void on_next(const Type& v) const
96 {
97 state->add_value(v);
98 state->on_next(v);
99 }
100
101 void on_error(const std::exception_ptr& err) const { state->on_error(err); }
102
103 void on_completed() const { state->on_completed(); }
104 };
105
106 public:
107 using optimal_disposables_strategy = typename details::subject_state<Type, Serialized>::optimal_disposables_strategy;
108
109 replay_subject_base()
110 : m_state{disposable_wrapper_impl<replay_state>::make()}
111 {
112 }
113
114 replay_subject_base(size_t count)
115 : m_state{disposable_wrapper_impl<replay_state>::make(std::max<size_t>(1, count))}
116 {
117 }
118
119 replay_subject_base(size_t count, rpp::schedulers::duration duration)
120 : m_state{disposable_wrapper_impl<replay_state>::make(std::max<size_t>(1, count), duration)}
121 {
122 }
123
124 auto get_observer() const
125 {
126 return rpp::observer<Type, observer_strategy>{m_state.lock()};
127 }
128
129 auto get_observable() const
130 {
131 return create_subject_on_subscribe_observable<Type, optimal_disposables_strategy>([state = m_state]<rpp::constraint::observer_of_type<Type> TObs>(TObs&& observer) {
132 const auto locked = state.lock();
133 for (auto&& value : locked->get_actual_values())
134 observer.on_next(std::move(value.value));
135 locked->on_subscribe(std::forward<TObs>(observer));
136 });
137 }
138
139 rpp::disposable_wrapper get_disposable() const
140 {
141 return m_state;
142 }
143
144 private:
145 disposable_wrapper_impl<replay_state> m_state;
146 };
147} // namespace rpp::subjects::details
148
149namespace rpp::subjects
150{
162 template<rpp::constraint::decayed_type Type>
163 class replay_subject final : public details::replay_subject_base<Type, false>
164 {
165 public:
166 using details::replay_subject_base<Type, false>::replay_subject_base;
167 };
168
176 template<rpp::constraint::decayed_type Type>
178 {
179 public:
180 using details::replay_subject_base<Type, true>::replay_subject_base;
181 };
182} // namespace rpp::subjects
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition replay_subject.hpp:27
Definition subject_state.hpp:41
Same as rpp::subjects::publish_subject but send all earlier emitted values to any new observers.
Definition replay_subject.hpp:164
Same as rpp::subjects::replay_subject but on_next/on_error/on_completed calls are serialized via mute...
Definition replay_subject.hpp:178
Definition fwd.hpp:253
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34