13#include <rpp/operators/lift.hpp>                           
   14#include <rpp/operators/merge.hpp> 
   15#include <rpp/operators/fwd/switch_on_next.hpp> 
   16#include <rpp/subscribers/constraints.hpp> 
   17#include <rpp/utils/functors.hpp> 
   18#include <rpp/utils/spinlock.hpp> 
   24IMPLEMENTATION_FILE(switch_on_next_tag);
 
   30    using merge_state::merge_state;
 
   38                    const std::shared_ptr<switch_on_next_state>& state)
 const 
   41        if (state->count_of_on_completed_needed.load(std::memory_order::acquire) == 1)
 
   46using switch_on_next_on_next_inner = merge_forwarding_on_next;
 
   51    template<constra
int::observable TObs>
 
   52    void operator()(
const TObs&                                  new_observable,
 
   54                    const std::shared_ptr<switch_on_next_state>& state)
 const 
   56        using ValueType = utils::extract_observable_type_t<TObs>;
 
   58        state->current_inner_observable.unsubscribe();
 
   59        state->current_inner_observable = state->children_subscriptions.make_child();
 
   60        state->current_inner_observable.add([state = std::weak_ptr{state}]
 
   62            if (
const auto locked = state.lock())
 
   63                locked->count_of_on_completed_needed.fetch_sub(1, std::memory_order::relaxed);
 
   66        state->count_of_on_completed_needed.fetch_add(1, std::memory_order::relaxed);
 
   68        new_observable.subscribe(create_subscriber_with_state<ValueType>(state->current_inner_observable,
 
   69                                                                         switch_on_next_on_next_inner{},
 
   81    using switch_on_next_state::switch_on_next_state;
 
   84    utils::spinlock spinlock{};
 
   87template<constra
int::decayed_type Type>
 
   90    using ValueType = utils::extract_observable_type_t<Type>;
 
   92    template<constra
int::subscriber_of_type<ValueType> TSub>
 
   93    auto operator()(TSub&& in_subscriber)
 const 
   95        auto state = std::make_shared<switch_on_next_state_with_serialized_spinlock>(in_subscriber.get_subscription());
 
   98        auto subscriber = make_serialized_subscriber(std::forward<TSub>(in_subscriber), std::shared_ptr<utils::spinlock>{state, &state->spinlock});
 
  100        state->count_of_on_completed_needed.fetch_add(1, std::memory_order::relaxed);
 
  102        auto subscription = state->children_subscriptions.make_child();
 
  103        return create_subscriber_with_state<Type>(std::move(subscription),
 
  107                                                  std::move(subscriber),
 
rpp::subscription_base with the ability to add dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
Definition: constraints.hpp:19
Definition: early_unsubscribe.hpp:28
Definition: switch_on_next.hpp:89
Definition: switch_on_next.hpp:36
Definition: switch_on_next.hpp:50
Definition: switch_on_next.hpp:80
Definition: switch_on_next.hpp:29