14#include <rpp/operators/lift.hpp>
15#include <rpp/operators/merge.hpp>
16#include <rpp/operators/details/subscriber_with_state.hpp>
17#include <rpp/operators/fwd/take_until.hpp>
18#include <rpp/subscribers/constraints.hpp>
19#include <rpp/utils/functors.hpp>
20#include <rpp/utils/spinlock.hpp>
22IMPLEMENTATION_FILE(take_until_tag);
26using take_until_state = early_unsubscribe_state;
28using take_until_on_next = merge_forwarding_on_next;
29using take_until_on_error = merge_on_error;
30using take_until_on_completed = early_unsubscribe_on_completed;
38 void operator()(
auto&&,
const auto& subscriber,
const std::shared_ptr<take_until_state>& state)
const
41 state->children_subscriptions.unsubscribe();
42 subscriber.on_completed();
51 using take_until_state::take_until_state;
54 utils::spinlock spinlock{};
59template<constra
int::decayed_type Type, constra
int::observable TTriggerObservable>
62 using TriggerType = utils::extract_observable_type_t<TTriggerObservable>;
64 TTriggerObservable m_until_observable;
66 template<constra
int::subscriber_of_type<Type> TSub>
67 auto operator()(TSub&& in_subscriber)
const
69 auto state = std::make_shared<take_until_state_with_serialized_spinlock>(in_subscriber.get_subscription());
71 auto subscriber = make_serialized_subscriber(std::forward<TSub>(in_subscriber), std::shared_ptr<utils::spinlock>{state, &state->spinlock});
74 m_until_observable.subscribe(create_subscriber_with_state<TriggerType>(state->children_subscriptions.make_child(),
81 auto subscription = state->children_subscriptions.make_child();
82 return create_subscriber_with_state<Type>(std::move(subscription),
86 std::move(subscriber),
Definition: early_unsubscribe.hpp:39
Definition: early_unsubscribe.hpp:28
Definition: early_unsubscribe.hpp:19
"combine_latest" operator (an OperatorFn used by "lift").
Definition: take_until.hpp:61
Definition: take_until.hpp:50
Definition: take_until.hpp:37