ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
zip.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/operators/details/combining_strategy.hpp>
17#include <rpp/operators/details/strategy.hpp>
18
19#include <deque>
20
21namespace rpp::operators::details
22{
23 template<rpp::constraint::observer Observer, typename TSelector, rpp::constraint::decayed_type... Args>
24 class zip_disposable final : public combining_disposable<Observer>
25 {
26 public:
27 explicit zip_disposable(Observer&& observer, const TSelector& selector)
28 : combining_disposable<Observer>(std::move(observer), sizeof...(Args))
29 , m_selector(selector)
30 {
31 }
32
33 const auto& get_selector() const { return m_selector; }
34
35 auto& get_pendings() { return m_pendings; }
36
37 private:
38 utils::tuple<std::deque<Args>...> m_pendings{};
39
40 RPP_NO_UNIQUE_ADDRESS TSelector m_selector;
41 };
42
43 template<size_t I, rpp::constraint::observer Observer, typename TSelector, rpp::constraint::decayed_type... Args>
45 : public combining_observer_strategy<zip_disposable<Observer, TSelector, Args...>>
46 {
47 using combining_observer_strategy<zip_disposable<Observer, TSelector, Args...>>::disposable;
48
49 template<typename T>
50 void on_next(T&& v) const
51 {
52 const auto observer = disposable->get_observer_under_lock();
53 disposable->get_pendings().template get<I>().push_back(std::forward<T>(v));
54
55 disposable->get_pendings().apply(&apply_impl<decltype(disposable)>, disposable, observer);
56 }
57
58 private:
59 template<typename TDisposable>
60 static void apply_impl(const TDisposable& disposable, const rpp::utils::pointer_under_lock<Observer>& observer, std::deque<Args>&... values)
61 {
62 if ((!values.empty() && ...))
63 {
64 observer->on_next(disposable->get_selector()(std::move(values.front())...));
65 (values.pop_front(), ...);
66 }
67 }
68 };
69
70 template<typename TSelector, rpp::constraint::observable... TObservables>
71 struct zip_t : public combining_operator_t<zip_disposable, zip_observer_strategy, TSelector, TObservables...>
72 {
73 };
74} // namespace rpp::operators::details
75
76namespace rpp::operators
77{
98 * @ingroup combining_operators
99 * @see https://reactivex.io/documentation/operators/zip.html
100 */
101 template<typename TSelector, rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
102 requires (!rpp::constraint::observable<TSelector> && (!utils::is_not_template_callable<TSelector> || std::invocable<TSelector, rpp::utils::convertible_to_any, utils::extract_observable_type_t<TObservable>, utils::extract_observable_type_t<TObservables>...>))
103 auto zip(TSelector&& selector, TObservable&& observable, TObservables&&... observables)
104 {
105 return details::zip_t<std::decay_t<TSelector>, std::decay_t<TObservable>, std::decay_t<TObservables>...>{
106 rpp::utils::tuple{std::forward<TObservable>(observable), std::forward<TObservables>(observables)...},
107 std::forward<TSelector>(selector)};
108 }
109
123 * - each value from any observable copied/moved to internal storage
124 * - mutex acquired every time value obtained
125 *
126 * @param observables are observables whose emissions would be zipped with current observable
127 * @note `#include <rpp/operators/zip.hpp>`
128 *
129 * @ingroup combining_operators
130 * @see https://reactivex.io/documentation/operators/zip.html
131 */
132 template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
133 auto zip(TObservable&& observable, TObservables&&... observables)
134 {
135 return zip(rpp::utils::pack_to_tuple{}, std::forward<TObservable>(observable), std::forward<TObservables>(observables)...);
136 }
137} // namespace rpp::operators
void on_next(const Type &v) const noexcept
Observable calls this method to notify observer about new value.
Definition observer.hpp:84
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition tuple.hpp:105
Definition constraints.hpp:22
Definition fwd.hpp:80
Definition fwd.hpp:250
Definition function_traits.hpp:45
auto zip(TSelector &&selector, TObservable &&observable, TObservables &&... observables)
combines emissions from observables and emit single items for each combination based on the results o...
Definition zip.hpp:98
Definition combining_strategy.hpp:51
Definition combining_strategy.hpp:81
Definition zip.hpp:72
Definition functors.hpp:82