12#include <rpp/subjects/fwd.hpp>
13#include <rpp/utils/constraints.hpp>
14#include <rpp/subscribers/dynamic_subscriber.hpp>
15#include <rpp/subjects/details/subject_state.hpp>
16#include <rpp/subjects/details/base_subject.hpp>
18namespace rpp::subjects::details
20template<rpp::constra
int::decayed_type T>
24 template<rpp::constra
int::decayed_same_as<composite_subscription> TSub>
26 : m_sub{std::forward<TSub>(sub)}
28 m_sub.
add([state = std::weak_ptr{m_state}]
30 if(
const auto locked = state.lock())
31 locked->on_unsubscribe();
37 m_state->on_subscribe(sub);
40 auto get_subscriber()
const
42 return rpp::make_specific_subscriber<T>(m_sub,
43 [state = m_state](
const T& v)
47 [state = m_state](
const std::exception_ptr& err)
53 state->on_completed();
58 std::shared_ptr<subject_state<T>> m_state = std::make_shared<subject_state<T>>();
63namespace rpp::subjects
77template<rpp::constra
int::decayed_type T>
rpp::subscription_base with the ability to add dependent subscriptions as part of this one: in cas...
Definition: composite_subscription.hpp:30
std::weak_ptr< details::subscription_state > add(const TSub &sub=TSub{}) const
Add any other subscription to this one as a dependent subscription.
Definition: composite_subscription.hpp:43
Subscriber which uses dynamic_observer<T> to hide the original callbacks
Definition: dynamic_subscriber.hpp:24
Definition: base_subject.hpp:23
Definition: publish_subject.hpp:22
Subject which just multicasts values to the observers subscribed to it. It contains two parts: subscriber...
Definition: publish_subject.hpp:78