// Subject implementation that buffers emitted values so they can be delivered
// to observers that subscribe later (see replay_state::m_values and
// get_observable in this class).
// NOTE(review): the template header and the braces of this class are not
// visible in this excerpt.
26 class replay_subject_base
// State object of the subject: derives from subject_state<Type, Serialized>
// and adds the buffer of emitted values together with its size and age limits.
28 struct replay_state final :
public subject_state<Type, Serialized>
// Builds the state from a maximum number of buffered values (`limit`) and a
// maximum age of a buffered value (`duration_limit`).
// `limit` defaults to the largest size_t. `duration_limit` defaults to
// std::numeric_limits<rpp::schedulers::duration>::max(), the same expression
// deduce_timepoint() compares against before deciding whether to read the clock.
// NOTE(review): std::numeric_limits is only specialized for arithmetic types;
// unless a specialization for rpp::schedulers::duration exists elsewhere, its
// max() yields a value-initialized (zero) duration rather than the largest one.
// The sentinel comparison still matches for the default argument, but an
// explicitly passed zero duration would then be treated the same way - confirm,
// and consider rpp::schedulers::duration::max() instead.
30 replay_state(
size_t limit = std::numeric_limits<size_t>::max(), rpp::schedulers::duration duration_limit = std::numeric_limits<rpp::schedulers::duration>::max())
// Stores the age limit; the initializer for the count limit is not visible in
// this excerpt (presumably m_limit(limit) - confirm).
32 , m_duration_limit(duration_limit)
// Appends a copy of `v` to the buffer while holding m_values_mutex.
36 void add_value(
const Type& v)
38 std::unique_lock lock{m_values_mutex};
// Runs while the buffer already holds m_limit or more entries; the loop body
// is not visible in this excerpt (presumably it drops the oldest entry - confirm).
39 while (m_values.size() >= m_limit)
// The new entry is stamped with whatever deduce_timepoint() returns.
42 m_values.emplace_back(v, deduce_timepoint());
// Buffer entry: an emitted value paired with the time point assigned to it
// when it was added (see add_value).
45 struct value_with_time
// Constructs an entry from the value `v` and its `timepoint`.
47 value_with_time(
const Type& v, rpp::schedulers::clock_type::time_point timepoint)
49 , timepoint{timepoint}
// Time point recorded for this entry; deduce_timepoint() compares it against
// the current time and m_duration_limit.
54 rpp::schedulers::clock_type::time_point timepoint;
// Returns buffered entries by value (a std::deque copy, not a reference into
// the state). m_values_mutex is acquired first; the rest of the body is not
// visible in this excerpt.
58 std::deque<value_with_time> get_actual_values()
60 std::unique_lock lock{m_values_mutex};
// Produces the time point used to stamp a new buffer entry.
// It does not lock m_values_mutex itself; add_value calls it with the lock
// already held.
66 rpp::schedulers::clock_type::time_point deduce_timepoint()
// When the age limit equals the sentinel used as the constructor default, the
// clock is not read and a default-constructed time point is returned.
68 if (std::numeric_limits<rpp::schedulers::duration>::max() == m_duration_limit)
69 return rpp::schedulers::clock_type::time_point{};
71 auto now = rpp::schedulers::clock_type::now();
// Runs while the front (oldest) entry is older than m_duration_limit relative
// to `now`; the loop body and the final return are not visible in this excerpt
// (presumably the entry is removed and `now` is returned - confirm).
72 while (!m_values.empty() && (now - m_values.front().timepoint > m_duration_limit))
// Locked by add_value and get_actual_values before they touch m_values.
78 std::mutex m_values_mutex{};
// Buffered entries; new ones are appended at the back, and front() is the one
// checked against the age limit.
79 std::deque<value_with_time> m_values{};
// Maximum age of a buffered entry; fixed at construction.
82 const rpp::schedulers::duration m_duration_limit;
// Observer strategy handed out by get_observer(): it holds a shared pointer to
// the replay_state and forwards observer calls to it.
85 struct observer_strategy
// Declares disposables_mode::None as the preferred disposables mode.
87 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
// Shared ownership of the state every call below is forwarded to.
89 std::shared_ptr<replay_state> state;
// Reports the disposed status of the underlying state.
93 bool is_disposed() const noexcept {
return state->is_disposed(); }
// Receives an emitted value; the body is not visible in this excerpt.
95 void on_next(
const Type& v)
const
// Forwards the error to the state.
101 void on_error(
const std::exception_ptr& err)
const { state->on_error(err); }
// Forwards completion to the state.
103 void on_completed()
const { state->on_completed(); }
// Re-exports the disposables strategy chosen by subject_state<Type, Serialized>;
// get_observable() passes it to create_subject_on_subscribe_observable.
107 using optimal_disposables_strategy =
typename details::subject_state<Type, Serialized>::optimal_disposables_strategy;
// Creates the subject with a replay_state built from its default arguments
// (default count limit and default duration limit).
109 replay_subject_base()
110 : m_state{disposable_wrapper_impl<replay_state>::make()}
// Creates the subject with a limit on the number of buffered values.
// `count` is raised to at least 1 before it is passed to replay_state, so a
// requested count of 0 behaves like 1.
114 replay_subject_base(
size_t count)
115 : m_state{disposable_wrapper_impl<replay_state>::make(std::max<size_t>(1, count))}
// Creates the subject with both a count limit and an age limit for buffered
// values. `count` is raised to at least 1; `duration` is passed through
// unchanged as replay_state's duration limit.
119 replay_subject_base(
size_t count, rpp::schedulers::duration duration)
120 : m_state{disposable_wrapper_impl<replay_state>::make(std::max<size_t>(1, count), duration)}
// Returns an rpp::observer<Type, observer_strategy> whose strategy is
// initialized with the result of m_state.lock().
124 auto get_observer()
const
126 return rpp::observer<Type, observer_strategy>{m_state.lock()};
// Returns an observable built by create_subject_on_subscribe_observable.
// The subscribe callback captures its own copy of m_state; when invoked with an
// observer it first delivers the entries returned by get_actual_values() and
// then hands the observer to the state's on_subscribe.
129 auto get_observable()
const
131 return create_subject_on_subscribe_observable<Type, optimal_disposables_strategy>([state = m_state]<rpp::constraint::observer_of_type<Type> TObs>(TObs&& observer) {
// NOTE(review): `locked` is dereferenced without a check - confirm lock()
// cannot yield an empty pointer at this point.
132 const auto locked = state.lock();
// get_actual_values() returns a deque by value, so moving out of its elements
// does not modify the buffer kept in the state.
133 for (
auto&& value : locked->get_actual_values())
134 observer.on_next(std::move(value.value));
135 locked->on_subscribe(std::forward<TObs>(observer));
// Wrapper around the replay_state; get_observer() locks it directly and
// get_observable() captures a copy of it in the subscribe callback.
145 disposable_wrapper_impl<replay_state> m_state;