ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
concat.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/disposables/composite_disposable.hpp>
16#include <rpp/operators/details/strategy.hpp>
17#include <rpp/utils/utils.hpp>
18
19#include <array>
20#include <cassert>
21#include <queue>
22
23
24namespace rpp::operators::details
25{
26 template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
28
29 enum class ConcatStage : uint8_t
30 {
31 None = 0,
32 Draining = 1,
33 CompletedWhileDraining = 2,
34 Processing = 3,
35 };
36
37 template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
38 class concat_disposable final : public rpp::details::base_disposable
39 , public rpp::details::enable_wrapper_from_this<concat_disposable<TObservable, TObserver>>
40 {
41 public:
42 concat_disposable(TObserver&& observer)
43 : m_observer{std::move(observer)}
44 {
45 }
46
47 rpp::utils::pointer_under_lock<TObserver> get_observer() { return m_observer; }
48 rpp::utils::pointer_under_lock<std::queue<TObservable>> get_queue() { return m_queue; }
49
50 std::atomic<ConcatStage>& stage() { return m_stage; }
51
52 void drain()
53 {
54 while (!is_disposed())
55 {
56 const auto observable = get_observable();
57 if (!observable)
58 {
59 stage().store(ConcatStage::None, std::memory_order::relaxed);
60 if (get_base_child_disposable().is_disposed())
61 get_observer()->on_completed();
62 return;
63 }
64
65 if (handle_observable_impl(observable.value()))
66 return;
67 }
68 }
69
70 void handle_observable(const rpp::constraint::decayed_same_as<TObservable> auto& observable)
71 {
72 if (handle_observable_impl(observable))
73 return;
74
75 drain();
76 }
77
78 rpp::composite_disposable& get_base_child_disposable() { return m_child_disposables[0]; }
79 rpp::composite_disposable& get_inner_child_disposable() { return m_child_disposables[1]; }
80
81 private:
82 bool handle_observable_impl(const rpp::constraint::decayed_same_as<TObservable> auto& observable)
83 {
84 stage().store(ConcatStage::Draining, std::memory_order::relaxed);
85 observable.subscribe(concat_inner_observer_strategy<TObservable, TObserver>{disposable_wrapper_impl<concat_disposable>{this->wrapper_from_this()}.lock()});
86
87 ConcatStage current = ConcatStage::Draining;
88 return stage().compare_exchange_strong(current, ConcatStage::Processing, std::memory_order::seq_cst);
89 }
90
91 void base_dispose_impl(interface_disposable::Mode) noexcept override
92 {
93 for (auto& d : m_child_disposables)
94 d.dispose();
95 }
96
97 std::optional<TObservable> get_observable()
98 {
99 auto queue = get_queue();
100 if (queue->empty())
101 return std::nullopt;
102 auto observable = queue->front();
103 queue->pop();
104 return observable;
105 }
106
107 private:
110 std::atomic<ConcatStage> m_stage{};
111
112 std::array<rpp::composite_disposable, 2> m_child_disposables{};
113 };
114
115 template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
117 {
118 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Boolean;
119
120 std::shared_ptr<concat_disposable<TObservable, TObserver>> disposable{};
121
122 template<typename T>
123 void on_next(T&& v) const
124 {
125 disposable->get_observer()->on_next(std::forward<T>(v));
126 }
127
128 void on_error(const std::exception_ptr& err) const
129 {
130 disposable->get_observer()->on_error(err);
131 }
132
133 void on_completed() const
134 {
135 disposable->get_inner_child_disposable().clear();
136
137 ConcatStage current{ConcatStage::Draining};
138 if (disposable->stage().compare_exchange_strong(current, ConcatStage::CompletedWhileDraining, std::memory_order::seq_cst))
139 return;
140
141 assert(current == ConcatStage::Processing);
142
143 disposable->drain();
144 }
145
146 void set_upstream(const disposable_wrapper& d) const { disposable->get_inner_child_disposable().add(d); }
147
148 bool is_disposed() const { return disposable->get_inner_child_disposable().is_disposed(); }
149 };
150
151 template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
152 struct concat_observer_strategy
153 {
154 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
155
156 std::shared_ptr<concat_disposable<TObservable, TObserver>> disposable;
157
158 concat_observer_strategy(TObserver&& observer)
159 : disposable{init_state(std::move(observer))}
160 {
161 }
162
163 template<typename T>
164 void on_next(T&& v) const
165 {
166 ConcatStage current = ConcatStage::None;
167 if (disposable->stage().compare_exchange_strong(current, ConcatStage::Draining, std::memory_order::seq_cst))
168 disposable->handle_observable(std::forward<T>(v));
169 else
170 disposable->get_queue()->push(std::forward<T>(v));
171 }
172
173 void on_error(const std::exception_ptr& err) const
174 {
175 disposable->get_observer()->on_error(err);
176 }
177
178 void on_completed() const
179 {
180 disposable->get_base_child_disposable().dispose();
181 if (disposable->stage() == ConcatStage::None)
182 disposable->get_observer()->on_completed();
183 }
184
185 void set_upstream(const disposable_wrapper& d) const { disposable->get_base_child_disposable().add(d); }
186
187 bool is_disposed() const { return disposable->get_base_child_disposable().is_disposed(); }
188
189 private:
190 static std::shared_ptr<concat_disposable<TObservable, TObserver>> init_state(TObserver&& observer)
191 {
193 auto ptr = d.lock();
194 ptr->get_observer()->set_upstream(d.as_weak());
195 return ptr;
196 }
197 };
198
199 struct concat_t : lift_operator<concat_t>
200 {
201 using lift_operator<concat_t>::lift_operator;
202
203 template<rpp::constraint::decayed_type T>
205 {
206 static_assert(rpp::constraint::observable<T>, "T is not observable");
207
208 using result_type = rpp::utils::extract_observable_type_t<T>;
209
210 template<rpp::constraint::observer_of_type<result_type> TObserver>
211 using observer_strategy = concat_observer_strategy<T, TObserver>;
212 };
213
214 template<rpp::details::observables::constraint::disposables_strategy Prev>
215 using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>;
216 };
217} // namespace rpp::operators::details
218
219
220namespace rpp::operators
221{
239 * @note `#include <rpp/operators/concat.hpp>`
240 *
241 * @par Example
242 * @snippet concat.cpp concat_as_operator
243 *
244 * @ingroup creational_operators
245 * @see https://reactivex.io/documentation/operators/concat.html
246 */
247 inline auto concat()
248 {
249 return details::concat_t{};
250 }
251} // namespace rpp::operators
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
Definition disposable_wrapper.hpp:252
Main RPP wrapper over disposables.
Definition disposable_wrapper.hpp:142
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
void subscribe(observer< Type, ObserverStrategy > &&observer) const
Subscribes passed observer to emissions from this observable.
Definition observable.hpp:58
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition utils.hpp:260
Definition constraints.hpp:19
Definition fwd.hpp:80
auto concat()
Make observable which would merge emissions from underlying observables but without overlapping (curr...
Definition concat.hpp:239
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
Definition disposables_strategy.hpp:29
Definition concat.hpp:200