13#include <rpp/operators/fwd.hpp>
15#include <rpp/defs.hpp>
16#include <rpp/disposables/composite_disposable.hpp>
17#include <rpp/operators/details/strategy.hpp>
18#include <rpp/schedulers/current_thread.hpp>
19#include <rpp/utils/tuple.hpp>
20#include <rpp/utils/utils.hpp>
24namespace rpp::operators::details
26 template<rpp::constra
int::observer TObserver>
36 void increment_on_completed() { m_on_completed_needed.fetch_add(1, std::memory_order::seq_cst); }
39 bool decrement_on_completed() {
return m_on_completed_needed.fetch_sub(1, std::memory_order::seq_cst) == 1; }
41 rpp::utils::pointer_under_lock<TObserver> get_observer_under_lock() {
return m_observer; }
45 std::atomic_size_t m_on_completed_needed{1};
48 template<rpp::constra
int::observer TObserver>
51 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
53 : m_disposable{std::move(disposable)}
58 : m_disposable{disposable}
65 m_disposables.push_back(d);
68 bool is_disposed()
const
70 return m_disposable->is_disposed();
73 void on_error(
const std::exception_ptr& err)
const
75 m_disposable->get_observer_under_lock()->on_error(err);
78 void on_completed()
const
80 if (m_disposable->decrement_on_completed())
82 m_disposable->get_observer_under_lock()->on_completed();
86 for (
const auto& v : m_disposables)
88 m_disposable->remove(v);
95 std::shared_ptr<merge_disposable<TObserver>> m_disposable;
96 mutable std::vector<rpp::disposable_wrapper> m_disposables{};
99 template<rpp::constra
int::observer TObserver>
105 void on_next(T&& v)
const
111 template<rpp::constra
int::observer TObserver>
121 void on_next(T&& v)
const
128 static std::shared_ptr<merge_disposable<TObserver>> init_state(TObserver&&
observer)
132 ptr->get_observer_under_lock()->set_upstream(d.as_weak());
141 template<rpp::constra
int::decayed_type T>
146 using result_type = rpp::utils::extract_observable_type_t<T>;
148 constexpr static bool own_current_queue =
true;
150 template<rpp::constra
int::observer_of_type<result_type> TObserver>
154 template<rpp::details::observables::constra
int::disposables_strategy Prev>
163 template<rpp::constra
int::decayed_type T>
166 static_assert((std::same_as<T, rpp::utils::extract_observable_type_t<TObservables>> && ...),
"T is not same as values of other observables");
168 using result_type = T;
171 template<rpp::details::observables::constra
int::disposables_strategy Prev>
180 auto drain_on_exit = rpp::schedulers::current_thread::own_queue_and_drain_finally_if_not_owned();
182 strategy.on_next(observable_strategy);
183 observables.apply(&apply<std::decay_t<Observer>>, strategy);
184 strategy.on_completed();
188 template<rpp::constra
int::observer Observer>
189 static void apply(
const merge_observer_strategy<Observer>& strategy,
const TObservables&... observables)
191 (strategy.on_next(observables), ...);
196namespace rpp::operators
231 return details::merge_t{};
264 requires constraint::observables_of_same_type<std::decay_t<TObservable>, std::decay_t<TObservables>...>
265 auto merge_with(TObservable&& observable, TObservables&&... observables)
267 return details::merge_with_t<std::decay_t<TObservable>, std::decay_t<TObservables>...>{
268 rpp::utils::tuple{std::forward<TObservable>(observable), std::forward<TObservables>(observables)...}};
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition strategy.hpp:28
auto merge_with(TObservable &&observable, TObservables &&... observables)
Combines emissions from the current observable with those of other observables into one.
Definition merge.hpp:252
auto merge()
Converts an observable of observables of items into an observable of items by merging their emissions.
Definition merge.hpp:221
Definition disposables_strategy.hpp:29