13#include <rpp/operators/fwd.hpp>
15#include <rpp/disposables/composite_disposable.hpp>
16#include <rpp/operators/details/strategy.hpp>
17#include <rpp/utils/utils.hpp>
24namespace rpp::operators::details
26 template<rpp::constra
int::observable TObservable, rpp::constra
int::observer TObserver>
29 enum class ConcatStage : uint8_t
33 CompletedWhileDraining = 2,
37 template<rpp::constra
int::observable TObservable, rpp::constra
int::observer TObserver>
38 class concat_disposable final :
public rpp::details::base_disposable
42 concat_disposable(TObserver&&
observer)
47 rpp::utils::pointer_under_lock<TObserver> get_observer() {
return m_observer; }
48 rpp::utils::pointer_under_lock<std::queue<TObservable>> get_queue() {
return m_queue; }
50 std::atomic<ConcatStage>& stage() {
return m_stage; }
54 while (!is_disposed())
59 stage().store(ConcatStage::None, std::memory_order::relaxed);
60 if (get_base_child_disposable().is_disposed())
61 get_observer()->on_completed();
65 if (handle_observable_impl(
observable.value()))
84 stage().store(ConcatStage::Draining, std::memory_order::relaxed);
87 ConcatStage current = ConcatStage::Draining;
88 return stage().compare_exchange_strong(current, ConcatStage::Processing, std::memory_order::seq_cst);
91 void base_dispose_impl(interface_disposable::Mode)
noexcept override
93 for (
auto& d : m_child_disposables)
97 std::optional<TObservable> get_observable()
99 auto queue = get_queue();
110 std::atomic<ConcatStage> m_stage{};
112 std::array<rpp::composite_disposable, 2> m_child_disposables{};
115 template<rpp::constra
int::observable TObservable, rpp::constra
int::observer TObserver>
118 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Boolean;
120 std::shared_ptr<concat_disposable<TObservable, TObserver>> disposable{};
123 void on_next(T&& v)
const
125 disposable->get_observer()->on_next(std::forward<T>(v));
128 void on_error(
const std::exception_ptr& err)
const
130 disposable->get_observer()->on_error(err);
133 void on_completed()
const
135 disposable->get_inner_child_disposable().clear();
137 ConcatStage current{ConcatStage::Draining};
138 if (disposable->stage().compare_exchange_strong(current, ConcatStage::CompletedWhileDraining, std::memory_order::seq_cst))
141 assert(current == ConcatStage::Processing);
146 void set_upstream(
const disposable_wrapper& d)
const { disposable->get_inner_child_disposable().add(d); }
148 bool is_disposed()
const {
return disposable->get_inner_child_disposable().is_disposed(); }
151 template<rpp::constra
int::observable TObservable, rpp::constra
int::observer TObserver>
152 struct concat_observer_strategy
154 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
156 std::shared_ptr<concat_disposable<TObservable, TObserver>> disposable;
158 concat_observer_strategy(TObserver&&
observer)
159 : disposable{init_state(std::move(
observer))}
164 void on_next(T&& v)
const
166 ConcatStage current = ConcatStage::None;
167 if (disposable->stage().compare_exchange_strong(current, ConcatStage::Draining, std::memory_order::seq_cst))
168 disposable->handle_observable(std::forward<T>(v));
170 disposable->get_queue()->push(std::forward<T>(v));
173 void on_error(
const std::exception_ptr& err)
const
175 disposable->get_observer()->on_error(err);
178 void on_completed()
const
180 disposable->get_base_child_disposable().dispose();
181 if (disposable->stage() == ConcatStage::None)
182 disposable->get_observer()->on_completed();
185 void set_upstream(
const disposable_wrapper& d)
const { disposable->get_base_child_disposable().add(d); }
187 bool is_disposed()
const {
return disposable->get_base_child_disposable().is_disposed(); }
190 static std::shared_ptr<concat_disposable<TObservable, TObserver>> init_state(TObserver&&
observer)
194 ptr->get_observer()->set_upstream(d.as_weak());
201 using lift_operator<
concat_t>::lift_operator;
203 template<rpp::constra
int::decayed_type T>
208 using result_type = rpp::utils::extract_observable_type_t<T>;
210 template<rpp::constra
int::observer_of_type<result_type> TObserver>
214 template<rpp::details::observables::constra
int::disposables_strategy Prev>
220namespace rpp::operators
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
Definition disposable_wrapper.hpp:252
Main RPP wrapper over disposables.
Definition disposable_wrapper.hpp:142
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
void subscribe(observer< Type, ObserverStrategy > &&observer) const
Subscribes passed observer to emissions from this observable.
Definition observable.hpp:58
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition constraints.hpp:19
auto concat()
Make observable which would merge emissions from underlying observables but without overlapping (curr...
Definition concat.hpp:239
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
Definition disposables_strategy.hpp:29
Definition concat.hpp:117
Definition concat.hpp:153
Definition concat.hpp:205
Definition concat.hpp:200