13#include <rpp/schedulers/immediate_scheduler.hpp>
14#include <rpp/operators/lift.hpp>
15#include <rpp/operators/details/early_unsubscribe.hpp>
16#include <rpp/operators/details/serialized_subscriber.hpp>
17#include <rpp/operators/details/subscriber_with_state.hpp>
18#include <rpp/operators/fwd/merge.hpp>
19#include <rpp/sources/just.hpp>
20#include <rpp/subscribers/constraints.hpp>
21#include <rpp/utils/functors.hpp>
27IMPLEMENTATION_FILE(merge_tag);
33 using early_unsubscribe_state::early_unsubscribe_state;
35 std::atomic_size_t count_of_on_completed_needed{};
38using merge_forwarding_on_next = utils::forwarding_on_next;
44 const std::shared_ptr<merge_state>& state) const
46 if (state->count_of_on_completed_needed.fetch_sub(1, std::memory_order::acq_rel) == 1)
53 template<constraint::observable TObs>
54 void operator()(const TObs& new_observable,
56 const std::shared_ptr<merge_state>& state) const
58 using ValueType = utils::extract_observable_type_t<TObs>;
60 state->count_of_on_completed_needed.fetch_add(1, std::memory_order::relaxed);
62 new_observable.subscribe(create_subscriber_with_state<ValueType>(state->children_subscriptions.make_child(),
63 merge_forwarding_on_next{},
73 using merge_state::merge_state;
78template<constraint::decayed_type Type>
81 using ValueType = utils::extract_observable_type_t<Type>;
83 template<constraint::subscriber_of_type<ValueType> TSub>
84 auto operator()(TSub&& in_subscriber) const
86 auto state = std::make_shared<merge_state_with_serialized_mutex>(in_subscriber.get_subscription());
88 auto subscriber = make_serialized_subscriber(std::forward<TSub>(in_subscriber), std::shared_ptr<std::mutex>{state, &state->mutex});
90 state->count_of_on_completed_needed.fetch_add(1, std::memory_order::relaxed);
92 auto subscription = state->children_subscriptions.make_child();
93 return create_subscriber_with_state<Type>(std::move(subscription),
97 std::move(subscriber),
102template<constraint::decayed_type Type, constraint::observable_of_type<Type> ... TObservables>
103auto merge_with_impl(TObservables&&... observables)
Definition: constraints.hpp:19
Definition: early_unsubscribe.hpp:28
Definition: early_unsubscribe.hpp:19