12#include <rpp/subjects/fwd.hpp>
13#include <rpp/utils/constraints.hpp>
14#include <rpp/subscribers/dynamic_subscriber.hpp>
15#include <rpp/subjects/details/subject_state.hpp>
16#include <rpp/subjects/details/base_subject.hpp>
18namespace rpp::subjects::details
20template<rpp::constra
int::decayed_type T>
24 template<rpp::constra
int::decayed_same_as<T> TT, rpp::constra
int::decayed_same_as<composite_subscription> TSub>
26 : m_state{std::make_shared<behavior_state>(std::forward<TT>(v))}
27 , m_sub{std::forward<TSub>(sub)}
29 m_sub.
add([state = std::weak_ptr{m_state}]
31 if (
const auto locked = state.lock())
32 locked->on_unsubscribe();
39 sub.on_next(m_state->get_value());
41 m_state->on_subscribe(sub);
44 auto get_subscriber()
const
46 return rpp::make_specific_subscriber<T>(m_sub,
47 [state = m_state](
const T& v)
52 [state = m_state](
const std::exception_ptr& err)
58 state->on_completed();
64 return m_state->get_value();
71 behavior_state(
const T& v)
77 , value{std::move(v)} {}
81 std::lock_guard lock{mutex};
86 void set_value(
const T& v)
88 std::lock_guard lock{mutex};
98 std::shared_ptr<behavior_state> m_state;
103namespace rpp::subjects
117template<rpp::constra
int::decayed_type T>
rpp::subscription_base with the ability to add dependent subscriptions as a part of this one: in case this subscription is unsubscribed, its dependent subscriptions are unsubscribed too.
Definition: composite_subscription.hpp:30
std::weak_ptr< details::subscription_state > add(const TSub &sub=TSub{}) const
Add any other subscription to this as dependent.
Definition: composite_subscription.hpp:43
Subscriber which uses dynamic_observer<T> to hide the original callbacks.
Definition: dynamic_subscriber.hpp:24
Subject which multicasts values to the observers subscribed to it and sends the last emitted value (or the initial value, if nothing has been emitted yet) to each observer when it subscribes.
Definition: behavior_subject.hpp:119
Definition: base_subject.hpp:23
Definition: behavior_subject.hpp:22
Definition: subject_state.hpp:30
bool is_subscribed() const
Indicates the current status of the subscription.
Definition: subscription_base.hpp:51