13#include <rpp/operators/fwd.hpp>
15#include <rpp/disposables/composite_disposable.hpp>
16#include <rpp/operators/details/strategy.hpp>
17#include <rpp/utils/utils.hpp>
24namespace rpp::operators::details
26 template<rpp::constra
int::observable TObservable, rpp::constra
int::observer TObserver>
27 struct concat_inner_observer_strategy;
29 enum class ConcatStage : uint8_t
33 CompletedWhileDraining = 2,
37 template<rpp::constra
int::observable TObservable, rpp::constra
int::observer TObserver>
// Accessor for the downstream observer: hands out m_observer as a
// rpp::utils::pointer_under_lock<TObserver>, which callers dereference with `->`
// to forward on_next / on_error / on_completed.
// NOTE(review): presumably the returned wrapper keeps a lock held for its own lifetime — confirm in rpp/utils/utils.hpp.
47 rpp::utils::pointer_under_lock<TObserver> get_observer() {
return m_observer; }
// Accessor for the queue of TObservable values: hands out m_queue as a
// rpp::utils::pointer_under_lock<std::queue<TObservable>>. The outer observer's
// on_next pushes into this queue, and get_observable() reads from it.
// NOTE(review): presumably the returned wrapper keeps a lock held for its own lifetime — confirm in rpp/utils/utils.hpp.
48 rpp::utils::pointer_under_lock<std::queue<TObservable>> get_queue() {
return m_queue; }
// Exposes the atomic ConcatStage flag (m_stage) by reference so that callers can
// load, store and compare_exchange_strong on it directly.
50 std::atomic<ConcatStage>& stage() {
return m_stage; }
54 while (!is_disposed())
59 stage().store(ConcatStage::None, std::memory_order::relaxed);
60 if (get_base_child_disposable().is_disposed())
61 get_observer()->on_completed();
65 if (handle_observable_impl(
observable.value()))
84 stage().store(ConcatStage::Draining, std::memory_order::relaxed);
87 ConcatStage current = ConcatStage::Draining;
88 return stage().compare_exchange_strong(current, ConcatStage::Processing, std::memory_order::seq_cst);
91 void base_dispose_impl(interface_disposable::Mode)
noexcept override
93 for (
auto& d : m_child_disposables)
97 std::optional<TObservable> get_observable()
99 auto queue = get_queue();
110 std::atomic<ConcatStage> m_stage{};
112 std::array<rpp::composite_disposable, 2> m_child_disposables{};
115 template<rpp::constra
int::observable TObservable, rpp::constra
int::observer TObserver>
118 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
120 std::shared_ptr<concat_disposable<TObservable, TObserver>> disposable{};
121 mutable bool locally_disposed{};
124 void on_next(T&& v)
const
126 disposable->get_observer()->on_next(std::forward<T>(v));
129 void on_error(
const std::exception_ptr& err)
const
131 locally_disposed =
true;
132 disposable->get_observer()->on_error(err);
135 void on_completed()
const
137 locally_disposed =
true;
138 disposable->get_inner_child_disposable().clear();
140 ConcatStage current{ConcatStage::Draining};
141 if (disposable->stage().compare_exchange_strong(current, ConcatStage::CompletedWhileDraining, std::memory_order::seq_cst))
144 assert(current == ConcatStage::Processing);
// Registers the upstream disposable `d` of the currently subscribed inner observable
// by adding it to the inner child disposable of the concat state object.
149 void set_upstream(
const disposable_wrapper& d)
const { disposable->get_inner_child_disposable().add(d); }
// Reports true once this observer has set its own locally_disposed flag (done in
// on_error / on_completed), or otherwise when the inner child disposable of the
// concat state reports itself disposed. The flag is checked first, so the
// disposable is not queried once the flag is set.
151 bool is_disposed()
const {
return locally_disposed || disposable->get_inner_child_disposable().is_disposed(); }
154 template<rpp::constra
int::observable TObservable, rpp::constra
int::observer TObserver>
157 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
159 std::shared_ptr<concat_disposable<TObservable, TObserver>> disposable;
162 : disposable{init_state(std::move(
observer))}
167 void on_next(T&& v)
const
169 ConcatStage current = ConcatStage::None;
170 if (disposable->stage().compare_exchange_strong(current, ConcatStage::Draining, std::memory_order::seq_cst))
171 disposable->handle_observable(std::forward<T>(v));
173 disposable->get_queue()->push(std::forward<T>(v));
176 void on_error(
const std::exception_ptr& err)
const
178 disposable->get_observer()->on_error(err);
181 void on_completed()
const
183 disposable->get_base_child_disposable().dispose();
184 if (disposable->stage() == ConcatStage::None)
185 disposable->get_observer()->on_completed();
// Registers the upstream disposable `d` of the outer (source) subscription by
// adding it to the base child disposable of the concat state object.
188 void set_upstream(
const disposable_wrapper& d)
const { disposable->get_base_child_disposable().add(d); }
// Reports whether the base child disposable of the concat state is disposed.
// This observer's own on_completed disposes that same disposable.
190 bool is_disposed()
const {
return disposable->get_base_child_disposable().is_disposed(); }
193 static std::shared_ptr<concat_disposable<TObservable, TObserver>> init_state(TObserver&&
observer)
197 ptr->get_observer()->set_upstream(d.as_weak());
206 template<rpp::constra
int::decayed_type T>
211 using result_type = rpp::utils::extract_observable_type_t<T>;
213 template<rpp::constra
int::observer_of_type<result_type> TObserver>
217 template<rpp::details::observables::constra
int::disposables_strategy Prev>
223namespace rpp::operators
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
Definition base_disposable.hpp:23
Definition disposable_wrapper.hpp:252
Main RPP wrapper over disposables.
Definition fwd.hpp:27
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
void subscribe(observer< Type, ObserverStrategy > &&observer) const
Subscribes passed observer to emissions from this observable.
Definition observable.hpp:58
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition strategy.hpp:28
Definition constraints.hpp:19
auto concat()
Make observable which would merge emissions from underlying observables but without overlapping (curr...
Definition concat.hpp:242
Definition disposables_strategy.hpp:29
Definition concat.hpp:117
Definition concat.hpp:156
Definition concat.hpp:208
Definition concat.hpp:203