13#include <rpp/schedulers/immediate_scheduler.hpp> 
   14#include <rpp/operators/lift.hpp>                            
   15#include <rpp/operators/details/early_unsubscribe.hpp>       
   16#include <rpp/operators/details/serialized_subscriber.hpp>   
   17#include <rpp/operators/details/subscriber_with_state.hpp>   
   18#include <rpp/operators/fwd/merge.hpp>                       
   19#include <rpp/sources/just.hpp>                              
   20#include <rpp/subscribers/constraints.hpp>                   
   21#include <rpp/utils/functors.hpp>                            
   27IMPLEMENTATION_FILE(merge_tag);
 
   33    using early_unsubscribe_state::early_unsubscribe_state;
 
   35    std::atomic_size_t count_of_on_completed_needed{};
 
   38using merge_forwarding_on_next = utils::forwarding_on_next;
 
   44                    const std::shared_ptr<merge_state>& state)
 const 
   46        if (state->count_of_on_completed_needed.fetch_sub(1, std::memory_order::acq_rel) == 1)
 
   53    template<constra
int::observable TObs>
 
   54    void operator()(
const TObs&                         new_observable,
 
   56                    const std::shared_ptr<merge_state>& state)
 const 
   58        using ValueType = utils::extract_observable_type_t<TObs>;
 
   60        state->count_of_on_completed_needed.fetch_add(1, std::memory_order::relaxed);
 
   62        new_observable.subscribe(create_subscriber_with_state<ValueType>(state->children_subscriptions.make_child(),
 
   63                                                                         merge_forwarding_on_next{},
 
   73    using merge_state::merge_state;
 
   78template<constra
int::decayed_type Type>
 
   81    using ValueType = utils::extract_observable_type_t<Type>;
 
   83    template<constra
int::subscriber_of_type<ValueType> TSub>
 
   84    auto operator()(TSub&& in_subscriber)
 const 
   86        auto state = std::make_shared<merge_state_with_serialized_mutex>(in_subscriber.get_subscription());
 
   88        auto subscriber = make_serialized_subscriber(std::forward<TSub>(in_subscriber), std::shared_ptr<std::mutex>{state, &state->mutex});
 
   90        state->count_of_on_completed_needed.fetch_add(1, std::memory_order::relaxed);
 
   92        auto subscription = state->children_subscriptions.make_child();
 
   93        return create_subscriber_with_state<Type>(std::move(subscription),
 
   97                                                  std::move(subscriber),
 
  102template<constra
int::decayed_type Type, constra
int::observable_of_type<Type> ... TObservables>
 
  103auto merge_with_impl(TObservables&&... observables)
 
Definition: constraints.hpp:19
Definition: early_unsubscribe.hpp:28
Definition: early_unsubscribe.hpp:19