13#include <rpp/operators/fwd.hpp>
15#include <rpp/defs.hpp>
16#include <rpp/disposables/composite_disposable.hpp>
17#include <rpp/operators/details/strategy.hpp>
18#include <rpp/schedulers/current_thread.hpp>
19#include <rpp/utils/utils.hpp>
23namespace rpp::operators::details
25 template<rpp::constraint::observer Observer,
typename TSelector, rpp::constraint::decayed_type... RestArgs>
29 explicit with_latest_from_disposable(Observer&&
observer,
const TSelector& selector)
30 : m_observer_with_mutex{std::move(
observer)}
31 , m_selector{selector}
35 rpp::utils::pointer_under_lock<Observer> get_observer_under_lock() {
return m_observer_with_mutex; }
39 const TSelector& get_selector()
const {
return m_selector; }
44 RPP_NO_UNIQUE_ADDRESS TSelector m_selector;
50 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto;
58 bool is_disposed()
const
60 return disposable->is_disposed();
64 void on_next(T&& v)
const
66 auto locked_value = disposable->get_values().template get<I>().lock();
67 locked_value->emplace(std::forward<T>(v));
70 void on_error(
const std::exception_ptr& err)
const
72 disposable->get_observer_under_lock()->on_error(err);
79 requires std::invocable<TSelector, OriginalValue, RestArgs...>
83 using Result = std::invoke_result_t<TSelector, OriginalValue, RestArgs...>;
84 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
86 std::shared_ptr<Disposable> disposable{};
93 bool is_disposed()
const
95 return disposable->is_disposed();
99 void on_next(T&& v)
const
101 auto result = disposable->get_values().apply([&d = this->disposable, &v](
rpp::utils::value_with_mutex<std::optional<RestArgs>>&... vals) -> std::optional<Result> {
102 auto lock = std::scoped_lock{vals.get_mutex()...};
104 if ((vals.get_value_unsafe().has_value() && ...))
105 return d->get_selector()(rpp::utils::as_const(std::forward<T>(v)), rpp::utils::as_const(vals.get_value_unsafe().value())...);
109 if (result.has_value())
110 disposable->get_observer_under_lock()->on_next(std::move(result).value());
113 void on_error(
const std::exception_ptr& err)
const
115 disposable->get_observer_under_lock()->on_error(err);
118 void on_completed()
const
120 disposable->get_observer_under_lock()->on_completed();
128 RPP_NO_UNIQUE_ADDRESS TSelector selector;
130 template<rpp::constra
int::decayed_type T>
133 static_assert(std::invocable<TSelector, T, rpp::utils::extract_observable_type_t<TObservables>...>,
"TSelector is not invocable with T and types of rest observables");
135 using result_type = std::invoke_result_t<TSelector, T, rpp::utils::extract_observable_type_t<TObservables>...>;
137 constexpr static bool own_current_queue =
true;
140 template<rpp::details::observables::constra
int::disposables_strategy Prev>
143 template<rpp::constra
int::decayed_type Type, rpp::constra
int::observer Observer>
144 auto lift(Observer&&
observer)
const
146 return observables.apply(&subscribe_impl<Type, Observer>, std::forward<Observer>(
observer), selector);
150 template<rpp::constra
int::decayed_type Type, rpp::constra
int::observer Observer>
151 static auto subscribe_impl(Observer&&
observer,
const TSelector& selector,
const TObservables&... observables)
156 auto ptr = disposable.lock();
157 ptr->get_observer_under_lock()->set_upstream(disposable.as_weak());
158 subscribe(ptr, std::index_sequence_for<TObservables...>{}, observables...);
164 static void subscribe(
const std::shared_ptr<with_latest_from_disposable<Observer, TSelector, rpp::utils::extract_observable_type_t<TObservables>...>>& disposable, std::index_sequence<I...>,
const TObservables&... observables)
166 (..., observables.subscribe(
rpp::observer<rpp::utils::extract_observable_type_t<TObservables>, with_latest_from_inner_observer_strategy<I, Observer, TSelector, rpp::utils::extract_observable_type_t<TObservables>...>>{disposable}));
171namespace rpp::operators
206 std::forward<TSelector>(selector)};
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
static disposable_wrapper_impl make(TArgs &&... args)
Main way to create disposable_wrapper. Passed TTarget type can be any type derived from TDisposable.
Definition disposable_wrapper.hpp:164
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition with_latest_from.hpp:27
Definition constraints.hpp:22
Definition function_traits.hpp:45
auto with_latest_from(TSelector &&selector, TObservable &&observable, TObservables &&... observables)
Combines latest emissions from observables with emission from current observable when it sends new va...
Definition with_latest_from.hpp:197
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
auto subscribe(observer< Type, ObserverStrategy > &&observer)
Subscribes passed observer to emissions from this observable.
Definition subscribe.hpp:226
Definition disposables_strategy.hpp:29
Definition with_latest_from.hpp:49
Definition with_latest_from.hpp:81
Definition with_latest_from.hpp:132
Definition with_latest_from.hpp:126
Definition functors.hpp:28
Definition functors.hpp:82