ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
concat.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/disposables/composite_disposable.hpp>
16#include <rpp/operators/details/strategy.hpp>
17#include <rpp/utils/utils.hpp>
18
19#include <array>
20#include <cassert>
21#include <queue>
22
23
24namespace rpp::operators::details
25{
26 template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
27 struct concat_inner_observer_strategy;
28
29 enum class ConcatStage : uint8_t
30 {
31 None = 0,
32 Draining = 1,
33 CompletedWhileDraining = 2,
34 Processing = 3,
35 };
36
37 template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
39 , public rpp::details::enable_wrapper_from_this<concat_disposable<TObservable, TObserver>>
40 {
41 public:
42 concat_disposable(TObserver&& observer)
43 : m_observer{std::move(observer)}
44 {
45 }
46
47 rpp::utils::pointer_under_lock<TObserver> get_observer() { return m_observer; }
48 rpp::utils::pointer_under_lock<std::queue<TObservable>> get_queue() { return m_queue; }
49
50 std::atomic<ConcatStage>& stage() { return m_stage; }
51
52 void drain()
53 {
54 while (!is_disposed())
55 {
56 const auto observable = get_observable();
57 if (!observable)
58 {
59 stage().store(ConcatStage::None, std::memory_order::relaxed);
60 if (get_base_child_disposable().is_disposed())
61 get_observer()->on_completed();
62 return;
63 }
64
65 if (handle_observable_impl(observable.value()))
66 return;
67 }
68 }
69
70 void handle_observable(const rpp::constraint::decayed_same_as<TObservable> auto& observable)
71 {
72 if (handle_observable_impl(observable))
73 return;
74
75 drain();
76 }
77
78 rpp::composite_disposable& get_base_child_disposable() { return m_child_disposables[0]; }
79 rpp::composite_disposable& get_inner_child_disposable() { return m_child_disposables[1]; }
80
81 private:
82 bool handle_observable_impl(const rpp::constraint::decayed_same_as<TObservable> auto& observable)
83 {
84 stage().store(ConcatStage::Draining, std::memory_order::relaxed);
86
87 ConcatStage current = ConcatStage::Draining;
88 return stage().compare_exchange_strong(current, ConcatStage::Processing, std::memory_order::seq_cst);
89 }
90
91 void base_dispose_impl(interface_disposable::Mode) noexcept override
92 {
93 for (auto& d : m_child_disposables)
94 d.dispose();
95 }
96
97 std::optional<TObservable> get_observable()
98 {
99 auto queue = get_queue();
100 if (queue->empty())
101 return std::nullopt;
102 auto observable = queue->front();
103 queue->pop();
104 return observable;
105 }
106
107 private:
110 std::atomic<ConcatStage> m_stage{};
111
112 std::array<rpp::composite_disposable, 2> m_child_disposables{};
113 };
114
115 template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
117 {
118 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
119
120 std::shared_ptr<concat_disposable<TObservable, TObserver>> disposable{};
121 mutable bool locally_disposed{};
122
123 template<typename T>
124 void on_next(T&& v) const
125 {
126 disposable->get_observer()->on_next(std::forward<T>(v));
127 }
128
129 void on_error(const std::exception_ptr& err) const
130 {
131 locally_disposed = true;
132 disposable->get_observer()->on_error(err);
133 }
134
135 void on_completed() const
136 {
137 locally_disposed = true;
138 disposable->get_inner_child_disposable().clear();
139
140 ConcatStage current{ConcatStage::Draining};
141 if (disposable->stage().compare_exchange_strong(current, ConcatStage::CompletedWhileDraining, std::memory_order::seq_cst))
142 return;
143
144 assert(current == ConcatStage::Processing);
145
146 disposable->drain();
147 }
148
149 void set_upstream(const disposable_wrapper& d) const { disposable->get_inner_child_disposable().add(d); }
150
151 bool is_disposed() const { return locally_disposed || disposable->get_inner_child_disposable().is_disposed(); }
152 };
153
154 template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
156 {
157 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
158
159 std::shared_ptr<concat_disposable<TObservable, TObserver>> disposable;
160
162 : disposable{init_state(std::move(observer))}
163 {
164 }
165
166 template<typename T>
167 void on_next(T&& v) const
168 {
169 ConcatStage current = ConcatStage::None;
170 if (disposable->stage().compare_exchange_strong(current, ConcatStage::Draining, std::memory_order::seq_cst))
171 disposable->handle_observable(std::forward<T>(v));
172 else
173 disposable->get_queue()->push(std::forward<T>(v));
174 }
175
176 void on_error(const std::exception_ptr& err) const
177 {
178 disposable->get_observer()->on_error(err);
179 }
180
181 void on_completed() const
182 {
183 disposable->get_base_child_disposable().dispose();
184 if (disposable->stage() == ConcatStage::None)
185 disposable->get_observer()->on_completed();
186 }
187
188 void set_upstream(const disposable_wrapper& d) const { disposable->get_base_child_disposable().add(d); }
189
190 bool is_disposed() const { return disposable->get_base_child_disposable().is_disposed(); }
191
192 private:
193 static std::shared_ptr<concat_disposable<TObservable, TObserver>> init_state(TObserver&& observer)
194 {
196 auto ptr = d.lock();
197 ptr->get_observer()->set_upstream(d.as_weak());
198 return ptr;
199 }
200 };
201
202 struct concat_t : lift_operator<concat_t>
203 {
204 using lift_operator<concat_t>::lift_operator;
205
206 template<rpp::constraint::decayed_type T>
208 {
209 static_assert(rpp::constraint::observable<T>, "T is not observable");
210
211 using result_type = rpp::utils::extract_observable_type_t<T>;
212
213 template<rpp::constraint::observer_of_type<result_type> TObserver>
215 };
216
217 template<rpp::details::observables::constraint::disposables_strategy Prev>
219 };
220} // namespace rpp::operators::details
221
222
223namespace rpp::operators
224{
250 inline auto concat()
251 {
252 return details::concat_t{};
253 }
254} // namespace rpp::operators
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
Definition base_disposable.hpp:23
Definition disposable_wrapper.hpp:252
Main RPP wrapper over disposables.
Definition fwd.hpp:27
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
void subscribe(observer< Type, ObserverStrategy > &&observer) const
Subscribes passed observer to emissions from this observable.
Definition observable.hpp:58
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition strategy.hpp:28
Definition utils.hpp:260
Definition constraints.hpp:19
Definition fwd.hpp:80
auto concat()
Make observable which would merge emissions from underlying observables but without overlapping (curr...
Definition concat.hpp:242
Definition disposables_strategy.hpp:29
Definition concat.hpp:203