13#include <rpp/schedulers/immediate_scheduler.hpp>
14#include <rpp/observables/dynamic_observable.hpp>
15#include <rpp/operators/lift.hpp>
16#include <rpp/operators/merge.hpp>
17#include <rpp/operators/details/subscriber_with_state.hpp>
18#include <rpp/operators/fwd/concat.hpp>
19#include <rpp/sources/just.hpp>
20#include <rpp/subscribers/constraints.hpp>
21#include <rpp/subscriptions/composite_subscription.hpp>
22#include <rpp/utils/functors.hpp>
23#include <rpp/utils/spinlock.hpp>
30IMPLEMENTATION_FILE(concat_tag);
34template<constra
int::decayed_type ValueType>
39 , source_subscription{children_subscriptions.make_child()} {}
42 std::mutex queue_mutex{};
43 std::queue<dynamic_observable<ValueType>> observables_to_subscribe{};
44 std::atomic_bool inner_subscribed{};
47using concat_on_next_inner = merge_forwarding_on_next;
50template<constra
int::decayed_type ValueType>
53 template<constra
int::observable TObs, constra
int::subscriber TSub>
54 void operator()(TObs&& new_observable,
58 if (state->inner_subscribed.exchange(
true, std::memory_order::acq_rel))
60 std::lock_guard lock{state->queue_mutex};
61 if (state->inner_subscribed.exchange(
true, std::memory_order::relaxed))
63 state->observables_to_subscribe.push(std::forward<TObs>(new_observable).as_dynamic());
67 subscribe_inner_subscriber(new_observable, sub, state);
70 static void subscribe_inner_subscriber(
const auto& observable,
74 observable.subscribe(create_subscriber_with_state<ValueType>(
75 state->children_subscriptions.make_child(),
76 concat_on_next_inner{},
81 std::unique_lock lock{state->queue_mutex};
82 if (!state->observables_to_subscribe.empty())
84 auto res = std::move(state->observables_to_subscribe.front());
85 state->observables_to_subscribe.pop();
87 subscribe_inner_subscriber(res, sub, state);
90 if (state->source_subscription.is_subscribed())
92 state->inner_subscribed.store(false, std::memory_order::relaxed);
104template<constra
int::decayed_type ValueType>
110 std::unique_lock lock{state->queue_mutex};
111 if (!state->inner_subscribed.load(std::memory_order::relaxed))
117template<constra
int::decayed_type ValueType>
123 utils::spinlock spinlock{};
126template<constra
int::decayed_type Type>
129 using ValueType = utils::extract_observable_type_t<Type>;
131 template<constra
int::subscriber_of_type<ValueType> TSub>
132 auto operator()(TSub&& in_subscriber)
const
134 auto state = std::make_shared<concat_state_with_serialized_spinlock<ValueType>>(in_subscriber.get_subscription());
137 auto subscriber = make_serialized_subscriber(std::forward<TSub>(in_subscriber), std::shared_ptr<utils::spinlock>{state, &state->spinlock});
139 return create_subscriber_with_state<Type>(state->source_subscription,
143 std::move(subscriber),
148template<constra
int::decayed_type Type, constra
int::observable_of_type<Type> ... TObservables>
149auto concat_with_impl(TObservables&&... observables)
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
Definition: constraints.hpp:19
Definition: concat.hpp:128
Definition: concat.hpp:106
Definition: concat.hpp:52
Definition: concat.hpp:119
Definition: concat.hpp:36
Definition: early_unsubscribe.hpp:28
Definition: early_unsubscribe.hpp:19