13#include <rpp/operators/lift.hpp>
14#include <rpp/operators/merge.hpp>
15#include <rpp/operators/fwd/switch_on_next.hpp>
16#include <rpp/subscribers/constraints.hpp>
17#include <rpp/utils/functors.hpp>
18#include <rpp/utils/spinlock.hpp>
24IMPLEMENTATION_FILE(switch_on_next_tag);
30 using merge_state::merge_state;
38 const std::shared_ptr<switch_on_next_state>& state) const
41 if (state->count_of_on_completed_needed.load(std::memory_order::acquire) == 1)
46using switch_on_next_on_next_inner = merge_forwarding_on_next;
51 template<constraint::observable TObs>
52 void operator()(const TObs& new_observable,
54 const std::shared_ptr<switch_on_next_state>& state) const
56 using ValueType = utils::extract_observable_type_t<TObs>;
58 state->current_inner_observable.unsubscribe();
59 state->current_inner_observable = state->children_subscriptions.make_child();
60 state->current_inner_observable.add([state = std::weak_ptr{state}]
62 if (const auto locked = state.lock())
63 locked->count_of_on_completed_needed.fetch_sub(1, std::memory_order::relaxed);
66 state->count_of_on_completed_needed.fetch_add(1, std::memory_order::relaxed);
68 new_observable.subscribe(create_subscriber_with_state<ValueType>(state->current_inner_observable,
69 switch_on_next_on_next_inner{},
81 using switch_on_next_state::switch_on_next_state;
84 utils::spinlock spinlock{};
87template<constraint::decayed_type Type>
90 using ValueType = utils::extract_observable_type_t<Type>;
92 template<constraint::subscriber_of_type<ValueType> TSub>
93 auto operator()(TSub&& in_subscriber) const
95 auto state = std::make_shared<switch_on_next_state_with_serialized_spinlock>(in_subscriber.get_subscription());
98 auto subscriber = make_serialized_subscriber(std::forward<TSub>(in_subscriber), std::shared_ptr<utils::spinlock>{state, &state->spinlock});
100 state->count_of_on_completed_needed.fetch_add(1, std::memory_order::relaxed);
102 auto subscription = state->children_subscriptions.make_child();
103 return create_subscriber_with_state<Type>(std::move(subscription),
107 std::move(subscriber),
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
Definition: constraints.hpp:19
Definition: early_unsubscribe.hpp:28
Definition: switch_on_next.hpp:89
Definition: switch_on_next.hpp:36
Definition: switch_on_next.hpp:50
Definition: switch_on_next.hpp:80
Definition: switch_on_next.hpp:29