ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
combine_latest.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/operators/details/combining_strategy.hpp>
17#include <rpp/operators/details/strategy.hpp>
18
19namespace rpp::operators::details
20{
21 template<rpp::constraint::observer Observer, typename TSelector, rpp::constraint::decayed_type... Args>
22 class combine_latest_disposable final : public combining_disposable<Observer>
23 {
24 public:
25 explicit combine_latest_disposable(Observer&& observer, const TSelector& selector)
26 : combining_disposable<Observer>(std::move(observer), sizeof...(Args))
27 , m_selector(selector)
28 {
29 }
30
31 const auto& get_selector() const { return m_selector; }
32
33 auto& get_values() { return m_values; }
34
35 private:
37
38 RPP_NO_UNIQUE_ADDRESS TSelector m_selector;
39 };
40
41 template<size_t I, rpp::constraint::observer Observer, typename TSelector, rpp::constraint::decayed_type... Args>
43 : public combining_observer_strategy<combine_latest_disposable<Observer, TSelector, Args...>>
44 {
45 using combining_observer_strategy<combine_latest_disposable<Observer, TSelector, Args...>>::disposable;
46
47 template<typename T>
48 void on_next(T&& v) const
49 {
50 // mutex need to be locked during changing of values, generating new values and sending of new values due to we can't update value while we are sending old one
51 const auto observer = disposable->get_observer_under_lock();
52 disposable->get_values().template get<I>().emplace(std::forward<T>(v));
53
54 disposable->get_values().apply(&apply_impl<decltype(disposable)>, disposable, observer);
55 }
56
57 private:
58 template<typename TDisposable>
59 static void apply_impl(const TDisposable& disposable, const rpp::utils::pointer_under_lock<Observer>& observer, const std::optional<Args>&... vals)
60 {
61 if ((vals.has_value() && ...))
62 observer->on_next(disposable->get_selector()(vals.value()...));
63 }
64 };
65
66 template<typename TSelector, rpp::constraint::observable... TObservables>
67 struct combine_latest_t : public combining_operator_t<combine_latest_disposable, combine_latest_observer_strategy, TSelector, TObservables...>
68 {
69 };
70} // namespace rpp::operators::details
71
72namespace rpp::operators
73{
98 * @ingroup combining_operators
99 * @see https://reactivex.io/documentation/operators/combinelatest.html
100 */
101 template<typename TSelector, rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
102 requires (!rpp::constraint::observable<TSelector> && (!utils::is_not_template_callable<TSelector> || std::invocable<TSelector, rpp::utils::convertible_to_any, utils::extract_observable_type_t<TObservable>, utils::extract_observable_type_t<TObservables>...>))
103 auto combine_latest(TSelector&& selector, TObservable&& observable, TObservables&&... observables)
104 {
105 return details::combine_latest_t<std::decay_t<TSelector>, std::decay_t<TObservable>, std::decay_t<TObservables>...>{
106 rpp::utils::tuple{std::forward<TObservable>(observable), std::forward<TObservables>(observables)...},
107 std::forward<TSelector>(selector)};
108 }
109
129 * @param observables are observables whose emissions would be combined when any observable sends new value
130 * @note `#include <rpp/operators/combine_latest.hpp>`
131 *
132 * @par Examples
133 * @snippet combine_latest.cpp combine_latest
134 *
135 * @ingroup combining_operators
136 * @see https://reactivex.io/documentation/operators/combinelatest.html
137 */
138 template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
139 auto combine_latest(TObservable&& observable, TObservables&&... observables)
140 {
141 return combine_latest(rpp::utils::pack_to_tuple{}, std::forward<TObservable>(observable), std::forward<TObservables>(observables)...);
142 }
143} // namespace rpp::operators
void on_next(const Type &v) const noexcept
Observable calls this method to notify observer about new value.
Definition observer.hpp:84
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition tuple.hpp:105
Definition constraints.hpp:22
Definition fwd.hpp:80
Definition fwd.hpp:250
Definition function_traits.hpp:45
auto combine_latest(TSelector &&selector, TObservable &&observable, TObservables &&... observables)
Combines latest emissions from observables with emission from current observable when any observable ...
Definition combine_latest.hpp:98
Definition combine_latest.hpp:68
Definition combining_strategy.hpp:51
Definition combining_strategy.hpp:81
Definition functors.hpp:82