12#include <rpp/subjects/fwd.hpp>
14#include <rpp/disposables/refcount_disposable.hpp>
15#include <rpp/observables/observable.hpp>
24 template<rpp::constra
int::observable OriginalObservable, rpp::constra
int::subject Subject>
35 std::shared_ptr<state_t> m_state = std::make_shared<state_t>();
37 using value_type = rpp::utils::extract_observable_type_t<OriginalObservable>;
40 template<constra
int::observer_strategy<value_type> Strategy>
43 auto [disposable, upstream] = on_subscribe();
45 obs.set_upstream(disposable);
46 original_observable.subscribe(std::move(obs));
47 if (!upstream.is_disposed())
48 original_observable.connect(std::move(upstream));
52 std::pair<rpp::disposable_wrapper, rpp::composite_disposable_wrapper> on_subscribe()
const
54 std::unique_lock lock(m_state->mutex);
55 if (!m_state->disposable.is_disposed())
59 return {m_state->disposable.lock()->add_ref(), m_state->disposable};
71 template<rpp::constra
int::observable OriginalObservable,
typename Subject>
72 class connectable_observable final :
public decltype(std::declval<Subject>().get_observable())
74 using base =
decltype(std::declval<Subject>().get_observable());
79 connectable_observable(
const OriginalObservable& original_observable,
const Subject& subject = Subject{})
80 : base{subject.get_observable()}
81 , m_original_observable{original_observable}
86 connectable_observable(OriginalObservable && original_observable,
const Subject& subject = Subject{})
87 : base{subject.get_observable()}
88 , m_original_observable{std::move(original_observable)}
102 std::unique_lock lock(m_state->mutex);
104 if (m_subject.get_disposable().is_disposed())
107 if (!m_state->disposable.is_disposed())
108 return m_state->disposable;
110 m_state->disposable = wrapper;
113 m_original_observable.subscribe(wrapper, m_subject.get_observer());
133 template<
typename Op>
134 auto operator|(Op&& op)
const &
139 return std::forward<Op>(op)(*this);
142 return static_cast<const base&
>(*this) | std::forward<Op>(op);
145 template<
typename Op>
146 auto operator|(Op&& op)&&
148 if constexpr (std::invocable<std::decay_t<Op>, connectable_observable&&>)
151 return std::forward<Op>(op)(std::move(*
this));
154 return static_cast<base&&
>(*this) | std::forward<Op>(op);
157 template<
typename Op>
158 auto pipe(Op && op)
const &
160 return *
this | std::forward<Op>(op);
163 template<
typename Op>
164 auto pipe(Op && op)&&
166 return std::move(*
this) | std::forward<Op>(op);
170 RPP_NO_UNIQUE_ADDRESS OriginalObservable m_original_observable;
179 std::shared_ptr<state_t> m_state = std::make_shared<state_t>();
Extension over raw observable with ability to be manually connected at any time or ref_counting (shar...
Definition fwd.hpp:86
rpp::disposable_wrapper connect(rpp::composite_disposable_wrapper wrapper=composite_disposable_wrapper::make()) const
Connects to the underlying observable right now, making it a hot observable.
Definition connectable_observable.hpp:100
Main RPP wrapper over disposables.
Definition fwd.hpp:27
static disposable_wrapper_impl make(TArgs &&... args)
Main way to create a disposable_wrapper. The passed TTarget type can be any type derived from TDisposable.
Definition disposable_wrapper.hpp:164
static disposable_wrapper_impl empty()
Definition disposable_wrapper.hpp:178
Base class for any observable used in RPP. It handles the core callbacks of an observable.
Definition observable.hpp:38
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
auto ref_count() const
Forces rpp::connectable_observable to behave like a common observable.
Definition connectable_observable.hpp:127
Definition connectable_observable.hpp:30
Definition connectable_observable.hpp:22