13#include <rpp/operators/lift.hpp>
14#include <rpp/operators/details/subscriber_with_state.hpp>
15#include <rpp/operators/fwd/ref_count.hpp>
16#include <rpp/sources/create.hpp>
17#include <rpp/subscribers/constraints.hpp>
19IMPLEMENTATION_FILE(ref_count_tag);
27 std::lock_guard lock{m_mutex};
28 if (++m_count_of_active_subs != 1)
37 std::lock_guard lock{ m_mutex };
38 if (--m_count_of_active_subs == 0)
45 size_t m_count_of_active_subs{};
50template<constra
int::decayed_type Type, constra
int::observable_of_type<Type> TObs>
54 std::shared_ptr<ref_count_state_t> state = std::make_shared<ref_count_state_t>();
56 template<constra
int::subscriber_of_type<Type> TSub>
57 void operator()(TSub&& subscriber)
const
59 const bool need_to_connect = state->on_subscribe();
61 subscriber.get_subscription().add([state = std::weak_ptr{state}]
63 if (
const auto locked = state.lock())
64 locked->on_unsubscribe();
67 auto sub = subscriber.get_subscription();
68 observable.subscribe(create_subscriber_with_state<Type>(std::move(sub),
69 utils::forwarding_on_next{},
70 utils::forwarding_on_error{},
71 utils::forwarding_on_completed{},
72 std::forward<TSub>(subscriber),
76 observable.connect(state->get_subscription());
80template<constra
int::decayed_type Type, constra
int::observable_of_type<Type> TObs>
81auto ref_count_impl(TObs&& observable)
83 return source::create<Type>(
ref_count_on_subscribe<Type, std::decay_t<TObs>>{std::forward<TObs>(observable)});
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
void unsubscribe() const
initiates unsubscription process (if subscribed)
Definition: subscription_base.hpp:59
Definition: ref_count.hpp:52
Definition: ref_count.hpp:24