ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
merge.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/disposables/composite_disposable.hpp>
17#include <rpp/operators/details/strategy.hpp>
18#include <rpp/schedulers/current_thread.hpp>
19#include <rpp/utils/tuple.hpp>
20#include <rpp/utils/utils.hpp>
21
22#include <atomic>
23
24namespace rpp::operators::details
25{
26 template<rpp::constraint::observer TObserver>
27 class merge_disposable final : public composite_disposable
28 {
29 public:
30 merge_disposable(TObserver&& observer)
31 : m_observer(std::move(observer))
32 {
33 }
34
35 // just need atomicity, not guarding anything
36 void increment_on_completed() { m_on_completed_needed.fetch_add(1, std::memory_order::seq_cst); }
37
38 // just need atomicity, not guarding anything
39 bool decrement_on_completed() { return m_on_completed_needed.fetch_sub(1, std::memory_order::seq_cst) == 1; }
40
41 rpp::utils::pointer_under_lock<TObserver> get_observer_under_lock() { return m_observer; }
42
43 private:
45 std::atomic_size_t m_on_completed_needed{1};
46 };
47
48 template<rpp::constraint::observer TObserver>
49 struct merge_observer_base_strategy
50 {
51 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Boolean;
52 merge_observer_base_strategy(std::shared_ptr<merge_disposable<TObserver>>&& disposable)
53 : m_disposable{std::move(disposable)}
54 {
55 }
56
57 merge_observer_base_strategy(const std::shared_ptr<merge_disposable<TObserver>>& disposable)
58 : m_disposable{disposable}
59 {
60 }
61
62 void set_upstream(const rpp::disposable_wrapper& d) const
63 {
64 m_disposable->add(d);
65 m_disposables.push_back(d);
66 }
67
68 bool is_disposed() const
69 {
70 return m_disposable->is_disposed();
71 }
72
73 void on_error(const std::exception_ptr& err) const
74 {
75 m_disposable->get_observer_under_lock()->on_error(err);
76 }
77
78 void on_completed() const
79 {
80 if (m_disposable->decrement_on_completed())
81 {
82 m_disposable->get_observer_under_lock()->on_completed();
83 }
84 else
85 {
86 for (const auto& v : m_disposables)
87 {
88 m_disposable->remove(v);
89 v.dispose();
90 }
91 }
92 }
93
94 protected:
95 std::shared_ptr<merge_disposable<TObserver>> m_disposable;
96 mutable std::vector<rpp::disposable_wrapper> m_disposables{};
97 };
98
99 template<rpp::constraint::observer TObserver>
100 struct merge_observer_inner_strategy final : public merge_observer_base_strategy<TObserver>
101 {
102 using merge_observer_base_strategy<TObserver>::merge_observer_base_strategy;
103
104 template<typename T>
105 void on_next(T&& v) const
106 {
107 merge_observer_base_strategy<TObserver>::m_disposable->get_observer_under_lock()->on_next(std::forward<T>(v));
108 }
109 };
110
111 template<rpp::constraint::observer TObserver>
112 class merge_observer_strategy final : public merge_observer_base_strategy<TObserver>
113 {
114 public:
115 explicit merge_observer_strategy(TObserver&& observer)
116 : merge_observer_base_strategy<TObserver>{init_state(std::move(observer))}
117 {
118 }
119
120 template<typename T>
121 void on_next(T&& v) const
122 {
123 merge_observer_base_strategy<TObserver>::m_disposable->increment_on_completed();
124 std::forward<T>(v).subscribe(rpp::observer<rpp::utils::extract_observer_type_t<TObserver>, merge_observer_inner_strategy<TObserver>>{merge_observer_inner_strategy<TObserver>{merge_observer_base_strategy<TObserver>::m_disposable}});
125 }
126
127 private:
128 static std::shared_ptr<merge_disposable<TObserver>> init_state(TObserver&& observer)
129 {
130 const auto d = disposable_wrapper_impl<merge_disposable<TObserver>>::make(std::move(observer));
131 auto ptr = d.lock();
132 ptr->get_observer_under_lock()->set_upstream(d.as_weak());
133 return ptr;
134 }
135 };
136
137 struct merge_t : lift_operator<merge_t>
138 {
139 using lift_operator<merge_t>::lift_operator;
140
141 template<rpp::constraint::decayed_type T>
143 {
144 static_assert(rpp::constraint::observable<T>, "T is not observable");
145
146 using result_type = rpp::utils::extract_observable_type_t<T>;
147
148 constexpr static bool own_current_queue = true;
149
150 template<rpp::constraint::observer_of_type<result_type> TObserver>
151 using observer_strategy = merge_observer_strategy<std::decay_t<TObserver>>;
152 };
153
154 template<rpp::details::observables::constraint::disposables_strategy Prev>
155 using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>;
156 };
157
158 template<rpp::constraint::observable... TObservables>
160 {
161 RPP_NO_UNIQUE_ADDRESS rpp::utils::tuple<TObservables...> observables{};
162
163 template<rpp::constraint::decayed_type T>
165 {
166 static_assert((std::same_as<T, rpp::utils::extract_observable_type_t<TObservables>> && ...), "T is not same as values of other observables");
167
168 using result_type = T;
169 };
170
171 template<rpp::details::observables::constraint::disposables_strategy Prev>
172 using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>;
173
174 template<rpp::constraint::observer Observer, typename... Strategies>
175 void subscribe(Observer&& observer, const rpp::details::observables::chain<Strategies...>& observable_strategy) const
176 {
177 merge_observer_strategy<std::decay_t<Observer>> strategy{std::forward<Observer>(observer)};
178
179 // Need to take ownership over current_thread in case of inner-observables also using it
180 auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned();
181
182 strategy.on_next(observable_strategy);
183 observables.apply(&apply<std::decay_t<Observer>>, strategy);
184 strategy.on_completed();
185 }
186
187 private:
188 template<rpp::constraint::observer Observer>
189 static void apply(const merge_observer_strategy<Observer>& strategy, const TObservables&... observables)
190 {
191 (strategy.on_next(observables), ...);
192 }
193 };
194} // namespace rpp::operators::details
195
196namespace rpp::operators
197{
221 * @note `#include <rpp/operators/merge.hpp>`
222 *
223 * @par Example:
224 * @snippet merge.cpp merge
225 *
226 * @ingroup combining_operators
227 * @see https://reactivex.io/documentation/operators/merge.html
228 */
229 inline auto merge()
230 {
231 return details::merge_t{};
232 }
233
252 * - Acquiring mutex during all observer's calls
253 *
254 * @param observables are observables whose emissions would be merged with current observable
255 * @note `#include <rpp/operators/merge.hpp>`
256 *
257 * @par Example:
258 * @snippet merge.cpp merge_with
259 *
260 * @ingroup combining_operators
261 * @see https://reactivex.io/documentation/operators/merge.html
262 */
263 template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
264 requires constraint::observables_of_same_type<std::decay_t<TObservable>, std::decay_t<TObservables>...>
265 auto merge_with(TObservable&& observable, TObservables&&... observables)
266 {
267 return details::merge_with_t<std::decay_t<TObservable>, std::decay_t<TObservables>...>{
268 rpp::utils::tuple{std::forward<TObservable>(observable), std::forward<TObservables>(observables)...}};
269 }
270} // namespace rpp::operators
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
Definition chain_strategy.hpp:22
Main RPP wrapper over disposables.
Definition disposable_wrapper.hpp:142
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition tuple.hpp:105
Definition utils.hpp:260
Definition fwd.hpp:80
Definition fwd.hpp:250
auto merge_with(TObservable &&observable, TObservables &&... observables)
Combines submissions from current observable with other observables into one.
Definition merge.hpp:252
auto merge()
Converts observable of observables of items into observable of items via merging emissions.
Definition merge.hpp:221
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
Definition disposables_strategy.hpp:29
Definition merge.hpp:138