12#include <rpp/subjects/fwd.hpp> 
   14#include <rpp/disposables/refcount_disposable.hpp> 
   15#include <rpp/observables/observable.hpp> 
   24    template<rpp::constra
int::observable OriginalObservable, rpp::constra
int::subject Subject>
 
   35        std::shared_ptr<state_t> m_state = std::make_shared<state_t>();
 
   37        using value_type                   = rpp::utils::extract_observable_type_t<OriginalObservable>;
 
   40        template<constra
int::observer_strategy<value_type> Strategy>
 
   43            auto [disposable, upstream] = on_subscribe();
 
   45            obs.set_upstream(disposable);
 
   46            original_observable.subscribe(std::move(obs));
 
   47            if (!upstream.is_disposed())
 
   48                original_observable.connect(std::move(upstream));
 
   52        std::pair<rpp::disposable_wrapper, rpp::composite_disposable_wrapper> on_subscribe()
 const 
   54            std::unique_lock lock(m_state->mutex);
 
   55            if (!m_state->disposable.is_disposed())
 
   59            return {m_state->disposable.lock()->add_ref(), m_state->disposable};
 
 
   71    template<rpp::constra
int::observable OriginalObservable, 
typename Subject>
 
   72    class connectable_observable : 
public decltype(std::declval<Subject>().get_observable())
 
   74        using base = 
decltype(std::declval<Subject>().get_observable());
 
   79        connectable_observable(
const OriginalObservable& original_observable, 
const Subject& subject)
 
   80            : base{subject.get_observable()}
 
   81            , m_original_observable{original_observable}
 
   86        connectable_observable(OriginalObservable && original_observable, 
const Subject& subject)
 
   87            : base{subject.get_observable()}
 
   88            , m_original_observable{std::move(original_observable)}
 
  102            std::unique_lock lock(m_state->mutex);
 
  104            if (m_subject.get_disposable().is_disposed())
 
  107            if (!m_state->disposable.is_disposed())
 
  108                return m_state->disposable;
 
  110            m_state->disposable = wrapper;
 
  113            m_original_observable.subscribe(wrapper, m_subject.get_observer());
 
 
  133        template<
typename Op>
 
  134        auto operator|(Op&& op) 
const &
 
  139                return std::forward<Op>(op)(*this);
 
  142                return static_cast<const base&
>(*this) | std::forward<Op>(op);
 
  145        template<
typename Op>
 
  146        auto operator|(Op&& op) &&
 
  148            if constexpr (std::invocable<std::decay_t<Op>, connectable_observable&&>)
 
  150                static_assert(rpp::constraint::observable<std::invoke_result_t<std::decay_t<Op>, connectable_observable&&>>, 
"Result of Op should be observable");
 
  151                return std::forward<Op>(op)(std::move(*
this));
 
  154                return static_cast<base&&
>(*this) | std::forward<Op>(op);
 
  157        template<
typename Op>
 
  158        auto pipe(Op && op) 
const &
 
  160            return *
this | std::forward<Op>(op);
 
  163        template<
typename Op>
 
  164        auto pipe(Op && op) &&
 
  166            return std::move(*
this) | std::forward<Op>(op);
 
  169        auto as_dynamic_connectable() const &
 
  171            return rpp::dynamic_connectable_observable<Subject>{m_original_observable.as_dynamic(), m_subject};
 
  173        auto as_dynamic_connectable() &&
 
  175            return rpp::dynamic_connectable_observable<Subject>{std::move(m_original_observable).as_dynamic(), std::move(m_subject)};
 
  179        RPP_NO_UNIQUE_ADDRESS OriginalObservable m_original_observable;
 
  188        std::shared_ptr<state_t> m_state = std::make_shared<state_t>();
 
 
Extension over a raw observable that can be manually connected at any time or ref-counted (sharing the same underlying observable between multiple observers).
Definition connectable_observable.hpp:73
 
rpp::disposable_wrapper connect(rpp::composite_disposable_wrapper wrapper=composite_disposable_wrapper::make()) const
Connects to the underlying observable right now, making it a hot observable.
Definition connectable_observable.hpp:100
 
Main RPP wrapper over disposables.
Definition disposable_wrapper.hpp:142
 
static disposable_wrapper_impl make(TArgs &&... args)
The main way to create a disposable_wrapper. The passed TTarget type can be any type derived from TDisposable.
Definition disposable_wrapper.hpp:164
 
static disposable_wrapper_impl empty()
Creates a disposable_wrapper that behaves like an already-disposed disposable.
Definition disposable_wrapper.hpp:178
 
Base class for any observable used in RPP. It handles the core callbacks of observables.
Definition observable.hpp:38
 
Base class for any observer used in RPP. It handles the core callbacks of observers. Objects of this class are passed to the subscribe method of an observable.
Definition observer.hpp:172
 
auto ref_count() const
Forces rpp::connectable_observable to behave like an ordinary observable.
Definition connectable_observable.hpp:127
 
disposable_wrapper_impl< interface_composite_disposable > composite_disposable_wrapper
Wrapper that holds a "composite" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:41
 
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper that holds a "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
 
Definition connectable_observable.hpp:30
 
Definition connectable_observable.hpp:22