ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Loading...
Searching...
No Matches
ref_count.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/lift.hpp> // required due to operator uses lift
14#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
15#include <rpp/operators/fwd/ref_count.hpp> // own forwarding
16#include <rpp/sources/create.hpp> // create observable
17#include <rpp/subscribers/constraints.hpp> // constraint::subscriber_of_type
18
19IMPLEMENTATION_FILE(ref_count_tag);
20
21namespace rpp::details
22{
24{
25 bool on_subscribe()
26 {
27 std::lock_guard lock{m_mutex};
28 if (++m_count_of_active_subs != 1)
29 return false;
30
31 m_sub = composite_subscription{};
32 return true;
33 }
34
35 void on_unsubscribe()
36 {
37 std::lock_guard lock{ m_mutex };
38 if (--m_count_of_active_subs == 0)
39 m_sub.unsubscribe();
40 }
41
42 const composite_subscription& get_subscription() const { return m_sub; }
43
44private:
45 size_t m_count_of_active_subs{};
46 composite_subscription m_sub = composite_subscription::empty();
47 std::mutex m_mutex{};
48};
49
50template<constraint::decayed_type Type, constraint::observable_of_type<Type> TObs>
52{
53 TObs observable;
54 std::shared_ptr<ref_count_state_t> state = std::make_shared<ref_count_state_t>();
55
56 template<constraint::subscriber_of_type<Type> TSub>
57 void operator()(TSub&& subscriber) const
58 {
59 const bool need_to_connect = state->on_subscribe();
60
61 subscriber.get_subscription().add([state = std::weak_ptr{state}]
62 {
63 if (const auto locked = state.lock())
64 locked->on_unsubscribe();
65 });
66
67 auto sub = subscriber.get_subscription();
68 observable.subscribe(create_subscriber_with_state<Type>(std::move(sub),
69 utils::forwarding_on_next{},
70 utils::forwarding_on_error{},
71 utils::forwarding_on_completed{},
72 std::forward<TSub>(subscriber),
73 // capture state to be sure that state is alive while ANY subscriber is alive
74 state));
75 if (need_to_connect)
76 observable.connect(state->get_subscription());
77 }
78};
79
80template<constraint::decayed_type Type, constraint::observable_of_type<Type> TObs>
81auto ref_count_impl(TObs&& observable)
82{
83 return source::create<Type>(ref_count_on_subscribe<Type, std::decay_t<TObs>>{std::forward<TObs>(observable)});
84}
85} // namespace rpp::details
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
void unsubscribe() const
initiates unsubscription process (if subscribed)
Definition: subscription_base.hpp:59
Definition: ref_count.hpp:52
Definition: ref_count.hpp:24