ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
connectable_observable.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/subjects/fwd.hpp>
13
14#include <rpp/disposables/refcount_disposable.hpp>
15#include <rpp/observables/observable.hpp>
16
17#include <mutex>
18
19namespace rpp::details
20{
21 template<typename T>
23
24 template<rpp::constraint::observable OriginalObservable, rpp::constraint::subject Subject>
25 struct ref_count_on_subscribe_t<rpp::connectable_observable<OriginalObservable, Subject>>
26 {
28
34
35 std::shared_ptr<state_t> m_state = std::make_shared<state_t>();
36
37 using value_type = rpp::utils::extract_observable_type_t<OriginalObservable>;
39
40 template<constraint::observer_strategy<value_type> Strategy>
41 void subscribe(observer<value_type, Strategy>&& obs) const
42 {
43 auto [disposable, upstream] = on_subscribe();
44
45 obs.set_upstream(disposable);
46 original_observable.subscribe(std::move(obs));
47 if (!upstream.is_disposed())
48 original_observable.connect(std::move(upstream));
49 }
50
51 private:
52 std::pair<rpp::disposable_wrapper, rpp::composite_disposable_wrapper> on_subscribe() const
53 {
54 std::unique_lock lock(m_state->mutex);
55 if (!m_state->disposable.is_disposed())
56 return {m_state->disposable.lock()->add_ref(), composite_disposable_wrapper::empty()};
57
59 return {m_state->disposable.lock()->add_ref(), m_state->disposable};
60 }
61 };
62} // namespace rpp::details
63
64namespace rpp
65{
71 template<rpp::constraint::observable OriginalObservable, typename Subject>
72 class connectable_observable : public decltype(std::declval<Subject>().get_observable())
73 {
74 using base = decltype(std::declval<Subject>().get_observable());
75
76 public:
78
79 connectable_observable(const OriginalObservable& original_observable, const Subject& subject)
80 : base{subject.get_observable()}
81 , m_original_observable{original_observable}
82 , m_subject{subject}
83 {
84 }
85
86 connectable_observable(OriginalObservable && original_observable, const Subject& subject)
87 : base{subject.get_observable()}
88 , m_original_observable{std::move(original_observable)}
89 , m_subject{subject}
90 {
91 }
92
101 {
102 std::unique_lock lock(m_state->mutex);
103
104 if (m_subject.get_disposable().is_disposed())
106
107 if (!m_state->disposable.is_disposed())
108 return m_state->disposable;
109
110 m_state->disposable = wrapper;
111 lock.unlock();
112
113 m_original_observable.subscribe(wrapper, m_subject.get_observer());
114 return wrapper;
115 }
116
132
133 template<typename Op>
134 auto operator|(Op&& op) const &
135 {
136 if constexpr (std::invocable<std::decay_t<Op>, const connectable_observable&>)
137 {
138 static_assert(rpp::constraint::observable<std::invoke_result_t<std::decay_t<Op>, const connectable_observable&>>, "Result of Op should be observable");
139 return std::forward<Op>(op)(*this);
140 }
141 else
142 return static_cast<const base&>(*this) | std::forward<Op>(op);
143 }
144
145 template<typename Op>
146 auto operator|(Op&& op)&&
147 {
148 if constexpr (std::invocable<std::decay_t<Op>, connectable_observable&&>)
149 {
150 static_assert(rpp::constraint::observable<std::invoke_result_t<std::decay_t<Op>, connectable_observable&&>>, "Result of Op should be observable");
151 return std::forward<Op>(op)(std::move(*this));
152 }
153 else
154 return static_cast<base&&>(*this) | std::forward<Op>(op);
155 }
156
157 template<typename Op>
158 auto pipe(Op && op) const &
159 {
160 return *this | std::forward<Op>(op);
161 }
162
163 template<typename Op>
164 auto pipe(Op && op)&&
165 {
166 return std::move(*this) | std::forward<Op>(op);
167 }
168
169 auto as_dynamic_connectable() const &
170 {
171 return rpp::dynamic_connectable_observable<Subject>{m_original_observable.as_dynamic(), m_subject};
172 }
173 auto as_dynamic_connectable()&&
174 {
175 return rpp::dynamic_connectable_observable<Subject>{std::move(m_original_observable).as_dynamic(), std::move(m_subject)};
176 }
177
178 private:
179 RPP_NO_UNIQUE_ADDRESS OriginalObservable m_original_observable;
180 Subject m_subject;
181
182 struct state_t
183 {
184 std::mutex mutex{};
186 };
187
188 std::shared_ptr<state_t> m_state = std::make_shared<state_t>();
189 };
190} // namespace rpp
Extension over raw observable with ability to be manually connected at any time or ref_counting (shar...
Definition connectable_observable.hpp:73
rpp::disposable_wrapper connect(rpp::composite_disposable_wrapper wrapper=composite_disposable_wrapper::make()) const
Connects to underlying observable right-now making it hot-observable.
Definition connectable_observable.hpp:100
Main RPP wrapper over disposables.
Definition disposable_wrapper.hpp:142
static disposable_wrapper_impl make(TArgs &&... args)
Main way to create disposable_wrapper. Passed TTarget type can be any type derived from TDisposable.
Definition disposable_wrapper.hpp:164
static disposable_wrapper_impl empty()
Creates disposable_wrapper which behaves like disposed disposable.
Definition disposable_wrapper.hpp:178
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition fwd.hpp:80
Definition fwd.hpp:47
auto ref_count() const
Forces rpp::connectable_observable to behave like common observable.
Definition connectable_observable.hpp:127
disposable_wrapper_impl< interface_composite_disposable > composite_disposable_wrapper
Wrapper to keep "composite" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:41
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
Definition connectable_observable.hpp:22