ReactivePlusPlus
Yet another implementation of the ReactiveX approach in C++, written with performance and templates in mind
 
Loading...
Searching...
No Matches
connectable_observable.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/observables/constraints.hpp> // OriginalObservable type
13#include <rpp/operators/fwd/ref_count.hpp> // include forwarding for member_overload
14#include <rpp/subjects/constraints.hpp> // type of subject used
15#include <rpp/subjects/type_traits.hpp> // deduce observable type by subject type
16#include <rpp/subscriptions/composite_subscription.hpp> // lifetime
17#include <rpp/defs.hpp> // RPP_EMPTY_BASES
18
19#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
20
21
22#include <memory>
23#include <mutex>
24
25namespace rpp
26{
/**
 * \brief Observable wrapper that forwards an original observable into a subject only after connect() is called.
 *
 * Observers that subscribe to this object subscribe to the subject's observable (the first base class).
 * The original observable itself is subscribed only inside connect().
 *
 * \tparam Type               type of the emitted values
 * \tparam Subject            subject type (constrained to be a subject of Type) used as the bridge
 * \tparam OriginalObservable type of the wrapped observable (constrained to be an observable of Type)
 *
 * The second base, details::member_overload<..., details::ref_count_tag>, is declared elsewhere;
 * presumably it contributes the ref_count() member - confirm in rpp/operators/fwd/ref_count.hpp.
 */
35template<constraint::decayed_type Type,
36 subjects::constraint::subject_of_type<Type> Subject,
37 constraint::observable_of_type<Type> OriginalObservable>
38class RPP_EMPTY_BASES connectable_observable
39 : public decltype(std::declval<Subject>().get_observable())
40 , public details::member_overload<Type, connectable_observable<Type, Subject, OriginalObservable>, details::ref_count_tag>
41{
   // Type of the observable exposed by the subject; this class derives from it.
42 using base = decltype(std::declval<Subject>().get_observable());
43public:
   /**
    * \brief Builds the connectable observable from a copy of the original observable.
    * \param original_observable observable that connect() will subscribe to
    * \param subject subject used as the bridge; a copy of it is stored in the shared state
    *        and its observable initializes the base class
    */
44 connectable_observable(const OriginalObservable& original_observable, const Subject& subject = Subject{})
45 : base{subject.get_observable()}
46 , m_original_observable{original_observable}
47 , m_state{std::make_shared<state_t>(subject)} {}
48
   /**
    * \brief Same as the overload above, but moves the original observable into this object.
    */
49 connectable_observable(OriginalObservable&& original_observable, const Subject& subject = Subject{})
50 : base{subject.get_observable()}
51 , m_original_observable{std::move(original_observable)}
52 , m_state{std::make_shared<state_t>(subject)} {}
53
   /**
    * \brief Subscribes the stored subject to the original observable, unless a connection is already recorded.
    *
    * \param subscription subscription that represents the connection
    * \return the passed subscription, on both paths
    *
    * NOTE(review): on the "already connected" path the passed subscription is returned without being
    * attached to anything in this function, so it looks like the caller cannot use it to tear down
    * the active connection - confirm this is intended.
    */
54 composite_subscription connect(const composite_subscription& subscription = composite_subscription{}) const
55 {
56 auto subscriber = m_state->subject.get_subscriber();
57 const auto& subscriber_subscription = subscriber.get_subscription();
58
59 {
   // The check for an existing connection and the recording of the new one happen under the same lock.
60 std::lock_guard lock(m_state->mutex);
61
   // A non-empty stored subscription means a connection was already recorded: do nothing.
62 if (!m_state->sub.is_empty())
63 return subscription;
64
   // Attach the connection to the subject subscriber's subscription and remember it in the shared state.
65 subscriber_subscription.add(subscription);
66 m_state->sub = subscription;
67 }
68
   // Register a teardown callback on the connection. It holds only a weak_ptr to the state,
   // so the callback by itself does not keep the state alive.
   // Presumably composite_subscription runs added callables on unsubscribe - confirm in composite_subscription.hpp.
69 subscription.add([state = std::weak_ptr{m_state}]
70 {
71 if (const auto locked = state.lock())
72 {
   // Take the stored subscription out of the state under the lock, leaving an empty one behind
   // (which makes the is_empty() check in connect() pass again); act on it after the lock is released.
73 auto current_sub = composite_subscription::empty();
74 {
75 std::lock_guard lock(locked->mutex);
76 std::swap(current_sub, locked->sub);
77 }
78 current_sub.unsubscribe();
79 locked->subject.get_subscriber().get_subscription().remove(current_sub);
80 }
81 });
82
83
   // Subscribe to the original observable with a subscriber built from the forwarding callbacks
   // and the subject's observer.
   // NOTE(review): m_state->sub is read here without holding m_state->mutex, while the callback above
   // swaps it under the lock - confirm this cannot race with a concurrent unsubscribe.
84 m_original_observable.subscribe(create_subscriber_with_state<Type>(m_state->sub,
85 utils::forwarding_on_next{},
86 utils::forwarding_on_error{},
87 utils::forwarding_on_completed{},
88 subscriber.get_observer(),
89 // capture state to be sure that state is alive while ANY subscriber is alive
90 m_state));
91
92 return subscription;
93 }
94
95private:
   // Observable that connect() subscribes to.
96 OriginalObservable m_original_observable;
   // State held through a shared_ptr: the subject, the stored connection subscription and the mutex guarding it.
97 struct state_t
98 {
99 state_t(const Subject& subj) : subject{subj} {}
100
101 Subject subject;
102 std::mutex mutex{};
   // Empty while no connection is recorded.
103 composite_subscription sub = composite_subscription::empty();
104 };
105
106 std::shared_ptr<state_t> m_state{};
107};
108
// Deduction guide: for construction from an observable and a subject, the value type is
// taken from the subject via subjects::utils::extract_subject_type_t<Subject>.
109template<constraint::observable OriginalObservable, subjects::constraint::subject Subject>
110connectable_observable(const OriginalObservable&, const Subject&) -> connectable_observable<subjects::utils::extract_subject_type_t<Subject>, Subject, OriginalObservable>;
111} // namespace rpp
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
connectable alternative of observable: extends interface with extra functionality....
Definition: connectable_observable.hpp:41
Definition: member_overload.hpp:19