12#include <rpp/observables/constraints.hpp>
13#include <rpp/operators/fwd/ref_count.hpp>
14#include <rpp/subjects/constraints.hpp>
15#include <rpp/subjects/type_traits.hpp>
16#include <rpp/subscriptions/composite_subscription.hpp>
17#include <rpp/defs.hpp>
19#include <rpp/operators/details/subscriber_with_state.hpp>
35template<constraint::decayed_type Type,
36 subjects::constraint::subject_of_type<Type> Subject,
37 constraint::observable_of_type<Type> OriginalObservable>
39 :
public decltype(std::declval<Subject>().get_observable())
40 ,
public details::member_overload<Type, connectable_observable<Type, Subject, OriginalObservable>, details::ref_count_tag>
42 using base =
decltype(std::declval<Subject>().get_observable());
45 : base{subject.get_observable()}
46 , m_original_observable{original_observable}
47 , m_state{std::make_shared<state_t>(subject)} {}
50 : base{subject.get_observable()}
51 , m_original_observable{std::move(original_observable)}
52 , m_state{std::make_shared<state_t>(subject)} {}
56 auto subscriber = m_state->subject.get_subscriber();
57 const auto& subscriber_subscription = subscriber.get_subscription();
60 std::lock_guard lock(m_state->mutex);
62 if (!m_state->sub.is_empty())
65 subscriber_subscription.add(subscription);
66 m_state->sub = subscription;
69 subscription.add([state = std::weak_ptr{m_state}]
71 if (
const auto locked = state.lock())
73 auto current_sub = composite_subscription::empty();
75 std::lock_guard lock(locked->mutex);
76 std::swap(current_sub, locked->sub);
78 current_sub.unsubscribe();
79 locked->subject.get_subscriber().get_subscription().remove(current_sub);
84 m_original_observable.subscribe(create_subscriber_with_state<Type>(m_state->sub,
85 utils::forwarding_on_next{},
86 utils::forwarding_on_error{},
87 utils::forwarding_on_completed{},
88 subscriber.get_observer(),
96 OriginalObservable m_original_observable;
99 state_t(
const Subject& subj) : subject{subj} {}
103 composite_subscription sub = composite_subscription::empty();
106 std::shared_ptr<state_t> m_state{};
109template<constra
int::observable OriginalObservable, subjects::constra
int::subject Subject>
110connectable_observable(
const OriginalObservable&,
const Subject&) -> connectable_observable<subjects::utils::extract_subject_type_t<Subject>, Subject, OriginalObservable>;
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
connectable alternative of observable: extends interface with extra functionality....
Definition: connectable_observable.hpp:41
Definition: member_overload.hpp:19