13#include <rpp/operators/fwd.hpp>
15#include <rpp/defs.hpp>
16#include <rpp/disposables/composite_disposable.hpp>
17#include <rpp/operators/details/strategy.hpp>
18#include <rpp/schedulers/current_thread.hpp>
19#include <rpp/utils/tuple.hpp>
20#include <rpp/utils/utils.hpp>
24namespace rpp::operators::details
26 template<rpp::constra
int::observer TObserver>
30 merge_disposable(TObserver&&
observer)
// Atomically adds 1 to m_on_completed_needed using sequentially-consistent ordering.
// NOTE(review): this looks like it registers one more pending completion that a later
// decrement_on_completed() call has to balance — confirm against the callers.
36 void increment_on_completed() { m_on_completed_needed.fetch_add(1, std::memory_order::seq_cst); }
// Atomically subtracts 1 from m_on_completed_needed (sequentially-consistent ordering).
// fetch_sub yields the value held *before* the subtraction, so the comparison with 1
// is true exactly when this call moved the counter from 1 to 0.
// Returns: true if this call brought the counter to zero, false otherwise.
39 bool decrement_on_completed() {
return m_on_completed_needed.fetch_sub(1, std::memory_order::seq_cst) == 1; }
// Returns m_observer converted to rpp::utils::pointer_under_lock<TObserver>.
// NOTE(review): judging by the type name, the returned object presumably keeps the
// observer locked for as long as it is alive — confirm against pointer_under_lock.
41 rpp::utils::pointer_under_lock<TObserver> get_observer_under_lock() {
return m_observer; }
// Atomic counter modified by increment_on_completed() / decrement_on_completed().
// It is initialised to 1, not 0.
// NOTE(review): the initial 1 presumably accounts for the source observable itself — confirm.
45 std::atomic_size_t m_on_completed_needed{1};
48 template<rpp::constra
int::observer TObserver>
49 struct merge_observer_base_strategy
51 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Boolean;
53 : m_disposable{std::move(disposable)}
58 : m_disposable{disposable}
65 m_disposables.push_back(d);
68 bool is_disposed()
const
70 return m_disposable->is_disposed();
73 void on_error(
const std::exception_ptr& err)
const
75 m_disposable->get_observer_under_lock()->on_error(err);
78 void on_completed()
const
80 if (m_disposable->decrement_on_completed())
82 m_disposable->get_observer_under_lock()->on_completed();
86 for (
const auto& v : m_disposables)
88 m_disposable->remove(v);
95 std::shared_ptr<merge_disposable<TObserver>> m_disposable;
96 mutable std::vector<rpp::disposable_wrapper> m_disposables{};
99 template<rpp::constra
int::observer TObserver>
102 using merge_observer_base_strategy<TObserver>::merge_observer_base_strategy;
105 void on_next(T&& v)
const
107 merge_observer_base_strategy<TObserver>::m_disposable->get_observer_under_lock()->on_next(std::forward<T>(v));
111 template<rpp::constra
int::observer TObserver>
112 class merge_observer_strategy final :
public merge_observer_base_strategy<TObserver>
115 explicit merge_observer_strategy(TObserver&&
observer)
116 : merge_observer_base_strategy<TObserver>{init_state(std::move(
observer))}
121 void on_next(T&& v)
const
123 merge_observer_base_strategy<TObserver>::m_disposable->increment_on_completed();
128 static std::shared_ptr<merge_disposable<TObserver>> init_state(TObserver&&
observer)
132 ptr->get_observer_under_lock()->set_upstream(d.as_weak());
139 using lift_operator<
merge_t>::lift_operator;
141 template<rpp::constra
int::decayed_type T>
146 using result_type = rpp::utils::extract_observable_type_t<T>;
148 constexpr static bool own_current_queue =
true;
150 template<rpp::constra
int::observer_of_type<result_type> TObserver>
154 template<rpp::details::observables::constra
int::disposables_strategy Prev>
163 template<rpp::constra
int::decayed_type T>
166 static_assert((std::same_as<T, rpp::utils::extract_observable_type_t<TObservables>> && ...),
"T is not same as values of other observables");
168 using result_type = T;
171 template<rpp::details::observables::constra
int::disposables_strategy Prev>
180 auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned();
182 strategy.on_next(observable_strategy);
183 observables.apply(&apply<std::decay_t<Observer>>, strategy);
184 strategy.on_completed();
188 template<rpp::constra
int::observer Observer>
189 static void apply(
const merge_observer_strategy<Observer>& strategy,
const TObservables&... observables)
191 (strategy.on_next(observables), ...);
196namespace rpp::operators
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
Definition chain_strategy.hpp:22
Main RPP wrapper over disposables.
Definition disposable_wrapper.hpp:142
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
auto merge_with(TObservable &&observable, TObservables &&... observables)
Combines emissions from the current observable and the other observables into a single observable.
Definition merge.hpp:252
auto merge()
Converts observable of observables of items into observable of items via merging emissions.
Definition merge.hpp:221
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
Definition disposables_strategy.hpp:29