12#include <rpp/observables/fwd.hpp>
14#include <rpp/defs.hpp>
15#include <rpp/operators/details/strategy.hpp>
17#include <condition_variable>
20namespace rpp::details::observables
27 std::unique_lock lock{m_mutex};
28 m_cv.wait(lock, [
this] {
return m_completed; });
31 void base_dispose_impl(interface_disposable::Mode)
noexcept override
34 std::lock_guard lock{m_mutex};
42 std::condition_variable m_cv{};
46 template<rpp::constra
int::decayed_type Type, rpp::constra
int::observable_strategy<Type> Strategy>
47 class blocking_strategy
50 using value_type = Type;
51 using optimal_disposables_strategy =
typename Strategy::optimal_disposables_strategy::template add<1>;
63 template<rpp::constra
int::observer_strategy<Type> ObserverStrategy>
68 m_original.subscribe(std::move(obs));
71 if (
const auto locked = d.lock())
90 template<constra
int::decayed_type Type, constra
int::observable_strategy<Type> Strategy>
91 class blocking_observable :
public observable<Type, details::observables::blocking_strategy<Type, Strategy>>
Extension of rpp::observable with a set of blocking operators - it waits until completion of the underlying...
Definition blocking_observable.hpp:92
Definition blocking_observable.hpp:23
Definition blocking_observable.hpp:48
static disposable_wrapper_impl make(TArgs &&... args)
The main way to create a disposable_wrapper. The passed TTarget type can be any type derived from TDisposable.
Definition disposable_wrapper.hpp:164
Base class for any observable used in RPP. It handles the core callbacks of observables.
Definition observable.hpp:38
Base class for any observer used in RPP. It handles the core callbacks of observers. Objects of this class...
Definition observer.hpp:172