13#include <rpp/operators/fwd.hpp>
15#include <rpp/defs.hpp>
16#include <rpp/operators/details/combining_strategy.hpp>
17#include <rpp/operators/details/strategy.hpp>
21namespace rpp::operators::details
// Shared state object for the zip operator: extends combining_disposable<Observer>
// and additionally keeps the user-provided selector. Args... are the decayed value
// types of the zipped sources; their count is handed to the base class.
// NOTE(review): this listing omits several original lines (braces, access
// specifiers, and the declaration of m_pendings) - consult the real header.
23 template<rpp::constraint::observer Observer,
typename TSelector, rpp::constraint::decayed_type... Args>
24 class zip_disposable final :
public combining_disposable<Observer>
// Moves the observer into the base together with the number of sources
// (sizeof...(Args)) and stores a copy of the selector.
27 explicit zip_disposable(Observer&&
observer,
const TSelector& selector)
28 : combining_disposable<Observer>(std::move(
observer),
sizeof...(Args))
29 , m_selector(selector)
// Read-only access to the stored selector.
33 const auto& get_selector()
const {
return m_selector; }
// Mutable access to m_pendings.
// NOTE(review): m_pendings is not declared in the visible lines; judging by its
// use elsewhere it presumably holds one std::deque per source - confirm.
35 auto& get_pendings() {
return m_pendings; }
// The selector applied to one value from each source; RPP_NO_UNIQUE_ADDRESS is a
// project macro (presumably wrapping [[no_unique_address]] - confirm in defs.hpp).
40 RPP_NO_UNIQUE_ADDRESS TSelector m_selector;
// Handles a new value from one of the zipped sources: stores it in that source's
// pending container and then tries to emit a combined item via apply_impl.
// NOTE(review): `disposable` and the index `I` are declared outside the lines
// visible in this listing (presumably members / template parameters of the
// enclosing observer strategy) - confirm against the real header.
50 void on_next(T&& v)
const
// Obtain the observer through get_observer_under_lock(); the same handle is
// passed on to apply_impl below.
52 const auto observer = disposable->get_observer_under_lock();
// Append the forwarded value to the I-th pending container.
53 disposable->get_pendings().template get<I>().push_back(std::forward<T>(v));
// Invoke apply_impl with the disposable, the observer handle and (via apply)
// the pending containers.
55 disposable->get_pendings().apply(&apply_impl<
decltype(disposable)>, disposable,
observer);
// Emits one combined item if every pending queue has at least one value.
//   disposable - pointer-like handle exposing get_selector()
//   observer   - handle to the downstream observer that receives the result
//   values     - one std::deque per zipped source, holding not-yet-emitted values
59 template<
typename TDisposable>
60 static void apply_impl(
const TDisposable& disposable,
const rpp::utils::pointer_under_lock<Observer>&
observer, std::deque<Args>&... values)
// Fold over all queues: proceed only when none of them is empty.
62 if ((!values.empty() && ...))
// Call the selector with the front element of each queue (moved out) and
// forward its result to the observer.
64 observer->
on_next(disposable->get_selector()(std::move(values.front())...));
// Drop the consumed (moved-from) front element from every queue.
65 (values.pop_front(), ...);
76namespace rpp::operators
103 auto zip(TSelector&& selector, TObservable&&
observable, TObservables&&... observables)
107 std::forward<TSelector>(selector)};
133 auto zip(TObservable&&
observable, TObservables&&... observables)
void on_next(const Type &v) const noexcept
Observable calls this method to notify observer about new value.
Definition observer.hpp:84
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition constraints.hpp:22
Definition function_traits.hpp:45
auto zip(TSelector &&selector, TObservable &&observable, TObservables &&... observables)
Combines emissions from observables and emits a single item for each combination, based on the result of the selector.
Definition zip.hpp:98
Definition combining_strategy.hpp:51
Definition combining_strategy.hpp:81
Definition functors.hpp:82