12#include <rpp/sources/fwd.hpp>
14#include <rpp/memory_model.hpp>
15#include <rpp/observables/observable.hpp>
16#include <rpp/observables/variant_observable.hpp>
17#include <rpp/operators/details/strategy.hpp>
18#include <rpp/sources/from.hpp>
24 template<rpp::constra
int::observer TObserver, constra
int::decayed_type PackedContainer>
27 concat_state_t(TObserver&& in_observer,
const PackedContainer& in_container)
29 , container(in_container)
33 itr = std::cbegin(container);
41 RPP_NO_UNIQUE_ADDRESS TObserver
observer;
42 RPP_NO_UNIQUE_ADDRESS PackedContainer container;
43 std::optional<
decltype(std::cbegin(container))> itr{};
44 std::atomic<bool> is_inside_drain{};
47 template<rpp::constra
int::observer TObserver,
typename PackedContainer>
48 void drain(TObserver&&
observer, PackedContainer&& container,
size_t index);
50 template<rpp::constra
int::observer TObserver, constra
int::decayed_type PackedContainer>
53 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
55 std::shared_ptr<concat_state_t<TObserver, PackedContainer>> state{};
56 mutable bool locally_disposed{};
59 void on_next(T&& v)
const
61 state->observer.on_next(std::forward<T>(v));
64 void on_error(
const std::exception_ptr& err)
const
66 locally_disposed =
true;
67 state->observer.on_error(err);
72 bool is_disposed()
const {
return locally_disposed || state->is_disposed(); }
74 void on_completed()
const
76 locally_disposed =
true;
79 if (state->is_inside_drain.exchange(
false, std::memory_order::seq_cst))
86 template<rpp::constra
int::observer TObserver,
typename PackedContainer>
89 while (!state->is_disposed())
91 if (state->itr.value() == std::cend(state->container))
93 state->observer.on_completed();
97 using value_type = rpp::utils::extract_observable_type_t<utils::iterable_value_t<PackedContainer>>;
98 state->is_inside_drain.store(
true, std::memory_order::seq_cst);
101 (*(state->itr.value()++)).subscribe(
observer<value_type, concat_source_observer_strategy<std::decay_t<TObserver>, std::decay_t<PackedContainer>>>{state});
103 if (state->is_inside_drain.exchange(
false, std::memory_order::seq_cst))
108 state->observer.on_error(std::current_exception());
114 template<constra
int::decayed_type PackedContainer>
117 template<
typename... Args>
120 : container{std::forward<Args>(args)...}
124 RPP_NO_UNIQUE_ADDRESS PackedContainer container;
126 using value_type = rpp::utils::extract_observable_type_t<utils::iterable_value_t<PackedContainer>>;
131 template<constra
int::observer_strategy<value_type> Strategy>
135 const auto state = d.lock();
136 state->observer.set_upstream(d.as_weak());
141 template<
typename PackedContainer,
typename... Args>
142 auto make_concat_from_iterable(Args&&... args)
178 requires (std::same_as<rpp::utils::extract_observable_type_t<TObservable>, rpp::utils::extract_observable_type_t<TObservables>> && ...)
179 auto concat(TObservable&& obs, TObservables&&... others)
183 using inner_container = std::array<std::decay_t<TObservable>,
sizeof...(TObservables) + 1>;
184 using container = std::conditional_t<std::same_as<MemoryModel, rpp::memory_model::use_stack>, inner_container, details::shared_container<inner_container>>;
185 return rpp::details::make_concat_from_iterable<container>(std::forward<TObservable>(obs), std::forward<TObservables>(others)...);
190 return concat<MemoryModel>(variant_observable_t{std::forward<TObservable>(obs)}, variant_observable_t{std::forward<TObservables>(others)}...);
220 template<constraint::memory_model MemoryModel , constraint::iterable Iterable>
221 requires constraint::observable<utils::iterable_value_t<Iterable>>
222 auto concat(Iterable&& iterable)
224 using Container = std::conditional_t<std::same_as<MemoryModel, rpp::memory_model::use_stack>, std::decay_t<Iterable>, details::shared_container<std::decay_t<Iterable>>>;
225 return rpp::details::make_concat_from_iterable<Container>(std::forward<Iterable>(iterable));
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
void on_error(const std::exception_ptr &err) const noexcept
The observable calls this method to notify the observer about an error that occurred while generating the next data.
Definition observer.hpp:120
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Extension over rpp::observable that provides the ability to statically keep one of multiple observables.
Definition variant_observable.hpp:54
Definition constraints.hpp:19
Definition constraints.hpp:31
auto concat(TObservable &&obs, TObservables &&... others)
Makes an observable that merges emissions from the underlying observables without overlapping (the current observable completes, then the next one is subscribed and starts emitting its values).
Definition concat.hpp:171
Definition concat.hpp:116
Definition disposables_strategy.hpp:19