29 explicit combining_disposable(Observer&&
observer,
size_t on_completed_needed)
30 : m_observer_with_mutex{std::move(
observer)}
31 , m_on_completed_needed{on_completed_needed}
// Accessor for the stored observer: returns m_observer_with_mutex, converted to the
// declared return type rpp::utils::pointer_under_lock<Observer>.
// NOTE(review): presumably the returned object keeps the observer's mutex held for
// as long as it lives - confirm against the definition of pointer_under_lock.
35 rpp::utils::pointer_under_lock<Observer> get_observer_under_lock() {
return m_observer_with_mutex; }
37 bool decrement_on_completed()
40 return m_on_completed_needed.fetch_sub(1, std::memory_order::seq_cst) == 1;
46 std::atomic_size_t m_on_completed_needed;
53 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto;
55 std::shared_ptr<TDisposable> disposable{};
62 bool is_disposed()
const
64 return disposable->is_disposed();
67 void on_error(
const std::exception_ptr& err)
const
69 disposable->get_observer_under_lock()->on_error(err);
72 void on_completed()
const
74 if (disposable->decrement_on_completed())
75 disposable->get_observer_under_lock()->on_completed();
83 RPP_NO_UNIQUE_ADDRESS TSelector selector;
85 template<rpp::constraint::decayed_type T>
88 static_assert(std::invocable<TSelector, T, rpp::utils::extract_observable_type_t<TObservables>...>,
"Selector is not callable with passed T type");
90 using result_type = std::invoke_result_t<TSelector, T, rpp::utils::extract_observable_type_t<TObservables>...>;
92 constexpr static bool own_current_queue =
true;
95 template<rpp::details::observables::constraint::disposables_strategy Prev>
98 template<rpp::constraint::decayed_type Type, rpp::constraint::observer Observer>
101 return observables.apply(&subscribe_impl<Type, Observer>, std::forward<Observer>(
observer), selector);
105 template<rpp::constraint::decayed_type Type, rpp::constraint::observer Observer>
106 static auto subscribe_impl(Observer&&
observer,
const TSelector& selector,
const TObservables&... observables)
108 using Disposable = TDisposable<Observer, TSelector, Type, rpp::utils::extract_observable_type_t<TObservables>...>;
111 auto locked = disposable.lock();
112 locked->get_observer_under_lock()->set_upstream(disposable.as_weak());
120 static void subscribe(
const std::shared_ptr<TDisposable<Observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t<TObservables>...>>& disposable, std::index_sequence<I...>,
const TObservables&... observables)
122 (..., observables.subscribe(
rpp::observer<rpp::utils::extract_observable_type_t<TObservables>, TStrategy<I + 1, Observer, TSelector, ExpectedValue, rpp::utils::extract_observable_type_t<TObservables>...>>{disposable}));
static disposable_wrapper_impl make(TArgs &&... args)
Main way to create a disposable_wrapper. The passed TTarget type can be any type derived from TDisposable.
Definition disposable_wrapper.hpp:164
auto subscribe(observer< Type, ObserverStrategy > &&observer)
Subscribes passed observer to emissions from this observable.
Definition subscribe.hpp:226