ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
concat.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/sources/fwd.hpp>
13
14#include <rpp/memory_model.hpp>
15#include <rpp/observables/observable.hpp>
16#include <rpp/observables/variant_observable.hpp>
17#include <rpp/operators/details/strategy.hpp>
18#include <rpp/sources/from.hpp>
19
20#include <exception>
21
22namespace rpp::details
23{
// State shared by everything taking part in one concat subscription: the
// downstream observer, the container of source observables, the iterator to
// the next source to subscribe to, and a flag used as a handshake between
// drain() and the per-source observer strategy's on_completed().
//
// NOTE(review): listing line 25 (the struct head, including any base class) is
// absent from this extract. add()/clear()/is_disposed() are called on this
// state elsewhere in the file but are not declared here, so they presumably
// come from a base class named on that missing line - confirm against the
// original header.
24 template<rpp::constraint::observer TObserver, constraint::decayed_type PackedContainer>
26 {
// Moves the observer in and copies the container, then positions `itr` at the
// container's first element.
27 concat_state_t(TObserver&& in_observer, const PackedContainer& in_container)
28 : observer(std::move(in_observer))
29 , container(in_container)
30 {
31 try
32 {
33 itr = std::cbegin(container);
34 }
35 catch (...)
36 {
// std::cbegin() threw: forward the exception downstream. `itr` is left empty
// in this case.
37 this->observer.on_error(std::current_exception());
38 }
39 }
40
// Downstream observer that receives the concatenated emissions.
41 RPP_NO_UNIQUE_ADDRESS TObserver observer;
// Private copy of the sources to subscribe to, one after another.
42 RPP_NO_UNIQUE_ADDRESS PackedContainer container;
// Next source to subscribe to; empty only if std::cbegin() threw in the constructor.
43 std::optional<decltype(std::cbegin(container))> itr{};
// Set by drain() right before it subscribes to a source; whichever of drain()
// and the source's on_completed() finds it still set clears it.
44 std::atomic<bool> is_inside_drain{};
45 };
46
// Forward declaration of drain(), so the observer strategy declared next can
// name it in on_completed() before the definition appears.
// The signature mirrors the definition further down in this file (a shared
// pointer to the concat state); the previous declaration took
// (observer, container, index) and matched no definition in this file.
47 template<rpp::constraint::observer TObserver, typename PackedContainer>
48 void drain(const std::shared_ptr<concat_state_t<TObserver, PackedContainer>>& state);
49
// Observer strategy attached to each individual source of a concat: it relays
// values and errors to the shared downstream observer and, on completion,
// hands control back to drain() so the next source gets subscribed.
//
// NOTE(review): listing line 51 (the struct head) is absent from this extract;
// this is presumably `concat_source_observer_strategy`, the name drain()
// instantiates with the shared state - confirm against the original header.
50 template<rpp::constraint::observer TObserver, constraint::decayed_type PackedContainer>
52 {
53 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
54
// State shared with drain() and with the strategies of the other sources.
55 std::shared_ptr<concat_state_t<TObserver, PackedContainer>> state{};
// Set once this particular source has terminated (error or completion);
// mutable because the callbacks below are const.
56 mutable bool locally_disposed{};
57
// Forwards a value from the current source straight to the downstream observer.
58 template<typename T>
59 void on_next(T&& v) const
60 {
61 state->observer.on_next(std::forward<T>(v));
62 }
63
// Marks this source as finished and forwards the error downstream; drain() is
// not called, so no further source is subscribed from here.
64 void on_error(const std::exception_ptr& err) const
65 {
66 locally_disposed = true;
67 state->observer.on_error(err);
68 }
69
// Registers the source's disposable on the shared state.
70 void set_upstream(const disposable_wrapper& d) { state->add(d); }
71
// Disposed when this source has terminated or the shared state is disposed.
72 bool is_disposed() const { return locally_disposed || state->is_disposed(); }
73
// Current source completed: advance to the next one.
74 void on_completed() const
75 {
76 locally_disposed = true;
// presumably drops the disposables registered through set_upstream() - confirm
77 state->clear();
78
// drain() sets is_inside_drain immediately before subscribing and re-checks it
// right after. If it is still set here, clear it and return: drain() will see
// it cleared and continue its loop, so no nested drain() call is made.
79 if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst))
80 return;
81
// Flag already cleared (drain() has returned for this source): resume draining here.
82 drain(state);
83 }
84 };
85
// Subscribes to the sources held in `state->container` one at a time, starting
// at `state->itr`, until the state is disposed, a source has not yet completed
// by the time subscribe() returns, or the container is exhausted (in which
// case the downstream observer is completed).
//
// @param state shared concat state; its iterator is advanced in place.
86 template<rpp::constraint::observer TObserver, typename PackedContainer>
87 void drain(const std::shared_ptr<concat_state_t<TObserver, PackedContainer>>& state)
88 {
89 while (!state->is_disposed())
90 {
// NOTE(review): itr.value() throws std::bad_optional_access if itr is empty,
// and this check sits outside the try block below.
91 if (state->itr.value() == std::cend(state->container))
92 {
// No sources left: complete downstream.
93 state->observer.on_completed();
94 return;
95 }
96
97 using value_type = rpp::utils::extract_observable_type_t<utils::iterable_value_t<PackedContainer>>;
// Raise the flag before subscribing so that an on_completed() arriving during
// subscribe() can tell that this loop is still running.
98 state->is_inside_drain.store(true, std::memory_order::seq_cst);
99 try
100 {
// Subscribe to the current source and post-increment the iterator so the next
// pass (or a later drain() call) picks up the following source.
101 (*(state->itr.value()++)).subscribe(observer<value_type, concat_source_observer_strategy<std::decay_t<TObserver>, std::decay_t<PackedContainer>>>{state});
102
// Flag still set: the source has not completed yet. Clear it and return; the
// source's on_completed() will call drain() again. If the flag was already
// cleared, the source completed during subscribe() and the loop continues.
103 if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst))
104 return;
105 }
106 catch (...)
107 {
// Anything thrown while dereferencing/advancing the iterator or subscribing is
// reported downstream, and draining stops.
108 state->observer.on_error(std::current_exception());
109 return;
110 }
111 }
112 }
113
// Observable strategy behind concat: owns the container of source observables
// and, per subscription, creates a fresh concat_state_t and starts draining it.
//
// NOTE(review): listing lines 115 (struct head), 118 (between the template
// header and the constructor) and 128 are absent from this extract - confirm
// their content against the original header.
114 template<constraint::decayed_type PackedContainer>
116 {
// Builds the container in place from the forwarded arguments (brace-initialized).
117 template<typename... Args>
119 concat_strategy(Args&&... args)
120 : container{std::forward<Args>(args)...}
121 {
122 }
123
// Sources to concatenate; handed to concat_state_t on every subscribe(), whose
// constructor takes it by const reference and copies it.
124 RPP_NO_UNIQUE_ADDRESS PackedContainer container;
125
// Value type emitted by the observables stored in the container.
126 using value_type = rpp::utils::extract_observable_type_t<utils::iterable_value_t<PackedContainer>>;
127
129
130
// Creates the per-subscription state (taking ownership of `obs`), registers
// that state as the observer's upstream via a weak handle, and starts
// subscribing to the sources.
131 template<constraint::observer_strategy<value_type> Strategy>
132 void subscribe(observer<value_type, Strategy>&& obs) const
133 {
134 const auto d = disposable_wrapper_impl<concat_state_t<observer<value_type, Strategy>, PackedContainer>>::make(std::move(obs), container);
135 const auto state = d.lock();
136 state->observer.set_upstream(d.as_weak());
137 drain(state);
138 }
139 };
140
// Factory used by rpp::source::concat: takes the container type explicitly as
// PackedContainer and the arguments to construct it from as `args`.
//
// NOTE(review): listing line 144 (the function's only statement, presumably
// the return of the concat observable built from `args`) is absent from this
// extract - confirm against the original header.
141 template<typename PackedContainer, typename... Args>
142 auto make_concat_from_iterable(Args&&... args)
143 {
145 }
146} // namespace rpp::details
147
148namespace rpp::source
149{
// concat overload taking the source observables as separate arguments. The
// requires-clause restricts it to observables that all share one value type.
//
// NOTE(review): listing line 181 (the condition selecting between the two
// branches below) is absent from this extract; presumably an `if constexpr`
// testing whether all arguments have the same decayed type - confirm against
// the original header.
177 template<constraint::memory_model MemoryModel /*= memory_model::use_stack*/, rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
178 requires (std::same_as<rpp::utils::extract_observable_type_t<TObservable>, rpp::utils::extract_observable_type_t<TObservables>> && ...)
179 auto concat(TObservable&& obs, TObservables&&... others)
180 {
182 {
// Pack the observables into a std::array of the decayed TObservable type; with
// memory_model::use_stack the array is used directly, otherwise it is wrapped
// in details::shared_container.
183 using inner_container = std::array<std::decay_t<TObservable>, sizeof...(TObservables) + 1>;
184 using container = std::conditional_t<std::same_as<MemoryModel, rpp::memory_model::use_stack>, inner_container, details::shared_container<inner_container>>;
185 return rpp::details::make_concat_from_iterable<container>(std::forward<TObservable>(obs), std::forward<TObservables>(others)...);
186 }
187 else
188 {
// Wrap every argument in the same variant_observable type and call concat
// again with the wrapped observables.
189 using variant_observable_t = rpp::variant_observable<rpp::utils::extract_observable_type_t<TObservable>, std::decay_t<TObservable>, std::decay_t<TObservables>...>;
190 return concat<MemoryModel>(variant_observable_t{std::forward<TObservable>(obs)}, variant_observable_t{std::forward<TObservables>(others)}...);
191 }
192 }
193
// concat overload taking an iterable whose elements are observables (enforced
// by the requires-clause).
//
// @tparam MemoryModel with memory_model::use_stack the decayed iterable type
//         itself is the container; otherwise it is wrapped in
//         details::shared_container.
// @param iterable forwarded to make_concat_from_iterable to construct the container.
220 template<constraint::memory_model MemoryModel /*= memory_model::use_stack*/, constraint::iterable Iterable>
221 requires constraint::observable<utils::iterable_value_t<Iterable>>
222 auto concat(Iterable&& iterable)
223 {
224 using Container = std::conditional_t<std::same_as<MemoryModel, rpp::memory_model::use_stack>, std::decay_t<Iterable>, details::shared_container<std::decay_t<Iterable>>>;
225 return rpp::details::make_concat_from_iterable<Container>(std::forward<Iterable>(iterable));
226 }
227} // namespace rpp::source
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
void on_error(const std::exception_ptr &err) const noexcept
Observable calls this method to notify observer about some error during generation next data.
Definition observer.hpp:120
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Extension over rpp::observable to provide ability statically keep one of multiple observables.
Definition variant_observable.hpp:54
Definition constraints.hpp:19
Definition fwd.hpp:80
Definition constraints.hpp:31
auto concat(TObservable &&obs, TObservables &&... others)
Make observable which would merge emissions from underlying observables but without overlapping (curr...
Definition concat.hpp:171
Definition concat.hpp:26
Definition concat.hpp:116
Definition disposables_strategy.hpp:19