12#include <rpp/sources/fwd.hpp>
14#include <rpp/memory_model.hpp>
15#include <rpp/observables/observable.hpp>
16#include <rpp/observables/variant_observable.hpp>
17#include <rpp/operators/details/strategy.hpp>
18#include <rpp/sources/from.hpp>
24 template<rpp::constra
int::observer TObserver, constra
int::decayed_type PackedContainer>
27 concat_state_t(TObserver&& in_observer,
const PackedContainer& in_container)
28 : observer(std::move(in_observer))
29 , container(in_container)
33 itr = std::cbegin(container);
37 this->observer.on_error(std::current_exception());
41 RPP_NO_UNIQUE_ADDRESS TObserver observer;
42 RPP_NO_UNIQUE_ADDRESS PackedContainer container;
43 std::optional<
decltype(std::cbegin(container))> itr{};
44 std::atomic<bool> is_inside_drain{};
47 template<rpp::constra
int::observer TObserver,
typename PackedContainer>
48 void drain(TObserver&&
observer, PackedContainer&& container,
size_t index);
50 template<rpp::constra
int::observer TObserver, constra
int::decayed_type PackedContainer>
53 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Boolean;
55 std::shared_ptr<concat_state_t<TObserver, PackedContainer>> state{};
58 void on_next(T&& v)
const
60 state->observer.on_next(std::forward<T>(v));
63 void on_error(
const std::exception_ptr& err)
const
65 state->observer.on_error(err);
70 bool is_disposed()
const {
return state->is_disposed(); }
72 void on_completed()
const
76 if (state->is_inside_drain.exchange(
false, std::memory_order::seq_cst))
83 template<rpp::constra
int::observer TObserver,
typename PackedContainer>
86 while (!state->is_disposed())
88 if (state->itr.value() == std::cend(state->container))
90 state->observer.on_completed();
94 using value_type = rpp::utils::extract_observable_type_t<utils::iterable_value_t<PackedContainer>>;
95 state->is_inside_drain.store(
true, std::memory_order::seq_cst);
98 (*(state->itr.value()++)).subscribe(
observer<value_type, concat_source_observer_strategy<std::decay_t<TObserver>, std::decay_t<PackedContainer>>>{state});
100 if (state->is_inside_drain.exchange(
false, std::memory_order::seq_cst))
105 state->observer.on_error(std::current_exception());
111 template<constra
int::decayed_type PackedContainer>
112 struct concat_strategy
114 template<
typename... Args>
116 concat_strategy(Args&&... args)
117 : container{std::forward<Args>(args)...}
121 RPP_NO_UNIQUE_ADDRESS PackedContainer container;
123 using value_type = rpp::utils::extract_observable_type_t<utils::iterable_value_t<PackedContainer>>;
125 using optimal_disposables_strategy = rpp::details::observables::default_disposables_strategy;
128 template<constra
int::observer_strategy<value_type> Strategy>
132 const auto state = d.lock();
133 state->observer.set_upstream(d.as_weak());
138 template<
typename PackedContainer,
typename... Args>
139 auto make_concat_from_iterable(Args&&... args)
175 requires (std::same_as<rpp::utils::extract_observable_type_t<TObservable>, rpp::utils::extract_observable_type_t<TObservables>> && ...)
176 auto concat(TObservable&& obs, TObservables&&... others)
180 using inner_container = std::array<std::decay_t<TObservable>,
sizeof...(TObservables) + 1>;
182 return rpp::details::make_concat_from_iterable<container>(std::forward<TObservable>(obs), std::forward<TObservables>(others)...);
187 return concat<MemoryModel>(variant_observable_t{std::forward<TObservable>(obs)}, variant_observable_t{std::forward<TObservables>(others)}...);
219 auto concat(Iterable&& iterable)
222 return rpp::details::make_concat_from_iterable<Container>(std::forward<Iterable>(iterable));
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
Main RPP wrapper over disposables.
Definition disposable_wrapper.hpp:142
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Extension over rpp::observable to provide ability statically keep one of multiple observables.
Definition variant_observable.hpp:54
Definition constraints.hpp:19
Definition constraints.hpp:37
Definition memory_model.hpp:31
Definition constraints.hpp:31
auto concat(TObservable &&obs, TObservables &&... others)
Make observable which would merge emissions from underlying observables but without overlapping (curr...
Definition concat.hpp:168
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
Definition concat.hpp:113