ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
concat.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/sources/fwd.hpp>
13
14#include <rpp/memory_model.hpp>
15#include <rpp/observables/observable.hpp>
16#include <rpp/observables/variant_observable.hpp>
17#include <rpp/operators/details/strategy.hpp>
18#include <rpp/sources/from.hpp>
19
20#include <exception>
21
22namespace rpp::details
23{
24 template<rpp::constraint::observer TObserver, constraint::decayed_type PackedContainer>
25 struct concat_state_t : public rpp::composite_disposable
26 {
27 concat_state_t(TObserver&& in_observer, const PackedContainer& in_container)
28 : observer(std::move(in_observer))
29 , container(in_container)
30 {
31 try
32 {
33 itr = std::cbegin(container);
34 }
35 catch (...)
36 {
37 this->observer.on_error(std::current_exception());
38 }
39 }
40
41 RPP_NO_UNIQUE_ADDRESS TObserver observer;
42 RPP_NO_UNIQUE_ADDRESS PackedContainer container;
43 std::optional<decltype(std::cbegin(container))> itr{};
44 std::atomic<bool> is_inside_drain{};
45 };
46
47 template<rpp::constraint::observer TObserver, typename PackedContainer>
48 void drain(TObserver&& observer, PackedContainer&& container, size_t index);
49
50 template<rpp::constraint::observer TObserver, constraint::decayed_type PackedContainer>
52 {
53 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Boolean;
54
55 std::shared_ptr<concat_state_t<TObserver, PackedContainer>> state{};
56
57 template<typename T>
58 void on_next(T&& v) const
59 {
60 state->observer.on_next(std::forward<T>(v));
61 }
62
63 void on_error(const std::exception_ptr& err) const
64 {
65 state->observer.on_error(err);
66 }
67
68 void set_upstream(const disposable_wrapper& d) { state->add(d); }
69
70 bool is_disposed() const { return state->is_disposed(); }
71
72 void on_completed() const
73 {
74 state->clear();
75
76 if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst))
77 return;
78
79 drain(state);
80 }
81 };
82
83 template<rpp::constraint::observer TObserver, typename PackedContainer>
84 void drain(const std::shared_ptr<concat_state_t<TObserver, PackedContainer>>& state)
85 {
86 while (!state->is_disposed())
87 {
88 if (state->itr.value() == std::cend(state->container))
89 {
90 state->observer.on_completed();
91 return;
92 }
93
94 using value_type = rpp::utils::extract_observable_type_t<utils::iterable_value_t<PackedContainer>>;
95 state->is_inside_drain.store(true, std::memory_order::seq_cst);
96 try
97 {
98 (*(state->itr.value()++)).subscribe(observer<value_type, concat_source_observer_strategy<std::decay_t<TObserver>, std::decay_t<PackedContainer>>>{state});
99
100 if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst))
101 return;
102 }
103 catch (...)
104 {
105 state->observer.on_error(std::current_exception());
106 return;
107 }
108 }
109 }
110
111 template<constraint::decayed_type PackedContainer>
112 struct concat_strategy
113 {
114 template<typename... Args>
116 concat_strategy(Args&&... args)
117 : container{std::forward<Args>(args)...}
118 {
119 }
120
121 RPP_NO_UNIQUE_ADDRESS PackedContainer container;
122
123 using value_type = rpp::utils::extract_observable_type_t<utils::iterable_value_t<PackedContainer>>;
124
125 using optimal_disposables_strategy = rpp::details::observables::default_disposables_strategy;
126
127
128 template<constraint::observer_strategy<value_type> Strategy>
129 void subscribe(observer<value_type, Strategy>&& obs) const
130 {
131 const auto d = disposable_wrapper_impl<concat_state_t<observer<value_type, Strategy>, PackedContainer>>::make(std::move(obs), container);
132 const auto state = d.lock();
133 state->observer.set_upstream(d.as_weak());
134 drain(state);
135 }
136 };
137
138 template<typename PackedContainer, typename... Args>
139 auto make_concat_from_iterable(Args&&... args)
140 {
142 }
143} // namespace rpp::details
144
145namespace rpp::source
146{
168 * @par Example
169 * @snippet concat.cpp concat_as_source
170 *
171 * @ingroup creational_operators
172 * @see https://reactivex.io/documentation/operators/concat.html
173 */
174 template<constraint::memory_model MemoryModel /*= memory_model::use_stack*/, rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
175 requires (std::same_as<rpp::utils::extract_observable_type_t<TObservable>, rpp::utils::extract_observable_type_t<TObservables>> && ...)
176 auto concat(TObservable&& obs, TObservables&&... others)
177 {
179 {
180 using inner_container = std::array<std::decay_t<TObservable>, sizeof...(TObservables) + 1>;
181 using container = std::conditional_t<std::same_as<MemoryModel, rpp::memory_model::use_stack>, inner_container, details::shared_container<inner_container>>;
182 return rpp::details::make_concat_from_iterable<container>(std::forward<TObservable>(obs), std::forward<TObservables>(others)...);
183 }
184 else
185 {
186 using variant_observable_t = rpp::variant_observable<rpp::utils::extract_observable_type_t<TObservable>, std::decay_t<TObservable>, std::decay_t<TObservables>...>;
187 return concat<MemoryModel>(variant_observable_t{std::forward<TObservable>(obs)}, variant_observable_t{std::forward<TObservables>(others)}...);
188 }
189 }
190
204 * @details Actually it subscribes on first observable from emissions. When first observable completes, then it subscribes on second observable from emissions and etc...
205 *
206 * @param iterable is container with observables to subscribe on
207 *
208 * @tparam MemoryModel rpp::memory_model strategy used to handle provided observables
209 * @note `#include <rpp/operators/concat.hpp>`
210 *
211 * @par Example
212 * @snippet concat.cpp concat_as_source_vector
213 *
214 * @ingroup creational_operators
215 * @see https://reactivex.io/documentation/operators/concat.html
216 */
217 template<constraint::memory_model MemoryModel /*= memory_model::use_stack*/, constraint::iterable Iterable>
219 auto concat(Iterable&& iterable)
220 {
221 using Container = std::conditional_t<std::same_as<MemoryModel, rpp::memory_model::use_stack>, std::decay_t<Iterable>, details::shared_container<std::decay_t<Iterable>>>;
222 return rpp::details::make_concat_from_iterable<Container>(std::forward<Iterable>(iterable));
223 }
224} // namespace rpp::source
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
Definition from.hpp:30
Main RPP wrapper over disposables.
Definition disposable_wrapper.hpp:142
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Extension over rpp::observable to provide ability statically keep one of multiple observables.
Definition variant_observable.hpp:54
Definition constraints.hpp:19
Definition constraints.hpp:37
Definition memory_model.hpp:31
Definition fwd.hpp:80
Definition constraints.hpp:31
auto concat(TObservable &&obs, TObservables &&... others)
Make observable which would merge emissions from underlying observables but without overlapping (curr...
Definition concat.hpp:168
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
Definition concat.hpp:26
Definition concat.hpp:113