ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
with_latest_from.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/disposables/composite_disposable.hpp>
17#include <rpp/operators/details/strategy.hpp>
18#include <rpp/schedulers/current_thread.hpp>
19#include <rpp/utils/utils.hpp>
20
21#include <memory>
22
23namespace rpp::operators::details
24{
// Shared state of a single with_latest_from subscription.
// It is a composite_disposable that additionally stores the downstream observer
// (wrapped in value_with_mutex) and a copy of the selector.
// NOTE(review): this listing skips original lines 37 and 43. The observer strategies in this
// file call get_values() on this class, so the accessor and the storage for the latest values
// of RestArgs are presumably declared on those lines - confirm against the real header.
25 template<rpp::constraint::observer Observer, typename TSelector, rpp::constraint::decayed_type... RestArgs>
26 class with_latest_from_disposable final : public composite_disposable
27 {
28 public:
// Moves the downstream observer into the mutex-guarded holder and copies the selector.
29 explicit with_latest_from_disposable(Observer&& observer, const TSelector& selector)
30 : m_observer_with_mutex{std::move(observer)}
31 , m_selector{selector}
32 {
33 }
34
// Access to the downstream observer through pointer_under_lock.
// NOTE(review): presumably the returned object keeps the observer's mutex locked while it is alive - confirm.
35 rpp::utils::pointer_under_lock<Observer> get_observer_under_lock() { return m_observer_with_mutex; }
36
38
// Read-only access to the stored selector.
39 const TSelector& get_selector() const { return m_selector; }
40
41 private:
// Downstream observer; it is reached from several observer strategies, hence the mutex wrapper.
42 rpp::utils::value_with_mutex<Observer> m_observer_with_mutex{};
// Selector combining the source value with the latest values of the other observables.
44 RPP_NO_UNIQUE_ADDRESS TSelector m_selector;
45 };
46
// Observer strategy attached to the I-th "other" observable (see the subscribe() helper of
// with_latest_from_t, which instantiates with_latest_from_inner_observer_strategy<I, ...>).
// It only records the most recent value of that observable in the shared state.
// NOTE(review): the struct declaration (original line 48) is not present in this listing.
47 template<size_t I, rpp::constraint::observer Observer, typename TSelector, rpp::constraint::decayed_type... RestArgs>
49 {
50 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto;
// Shared state of the subscription; the same pointer is handed to every strategy.
51 std::shared_ptr<with_latest_from_disposable<Observer, TSelector, RestArgs...>> disposable{};
52
// Registers the upstream disposable inside the shared composite disposable.
53 void set_upstream(const rpp::disposable_wrapper& d) const
54 {
55 disposable->add(d);
56 }
57
// Disposal state is the state of the shared disposable.
58 bool is_disposed() const
59 {
60 return disposable->is_disposed();
61 }
62
// Stores the received value in slot I of the shared values; nothing is emitted downstream here.
63 template<typename T>
64 void on_next(T&& v) const
65 {
// lock() yields a handle to slot I; emplace() then replaces the stored value with the new one.
66 auto locked_value = disposable->get_values().template get<I>().lock();
67 locked_value->emplace(std::forward<T>(v));
68 }
69
// An error from any of the other observables is forwarded to the downstream observer.
70 void on_error(const std::exception_ptr& err) const
71 {
72 disposable->get_observer_under_lock()->on_error(err);
73 }
74
// NOTE(review): empty_function_t is presumably a no-op callable, i.e. completion of an
// "other" observable is not forwarded downstream - confirm.
75 static constexpr rpp::utils::empty_function_t<> on_completed{};
76 };
77
// Observer strategy attached to the source observable (lift() of with_latest_from_t returns an
// observer built on with_latest_from_observer_strategy). On each source value it combines that
// value with the latest stored values of the other observables and emits the selector's result.
// NOTE(review): the struct declaration (original line 80) is not present in this listing.
78 template<rpp::constraint::observer Observer, typename TSelector, typename OriginalValue, rpp::constraint::decayed_type... RestArgs>
79 requires std::invocable<TSelector, OriginalValue, RestArgs...>
81 {
82 using Disposable = with_latest_from_disposable<Observer, TSelector, RestArgs...>;
// Type produced by the selector and emitted downstream.
83 using Result = std::invoke_result_t<TSelector, OriginalValue, RestArgs...>;
84 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
85
// Shared state of the subscription (downstream observer, selector, latest values).
86 std::shared_ptr<Disposable> disposable{};
87
// Registers the upstream disposable inside the shared composite disposable.
88 void set_upstream(const rpp::disposable_wrapper& d) const
89 {
90 disposable->add(d);
91 }
92
// Disposal state is the state of the shared disposable.
93 bool is_disposed() const
94 {
95 return disposable->is_disposed();
96 }
97
// Emits selector(v, latest...) downstream, but only once every other observable has produced a value.
98 template<typename T>
99 void on_next(T&& v) const
100 {
// The selector is evaluated inside the lambda while all value slots are locked;
// the result is returned by value so it can be emitted after those locks are released.
101 auto result = disposable->get_values().apply([&d = this->disposable, &v](rpp::utils::value_with_mutex<std::optional<RestArgs>>&... vals) -> std::optional<Result> {
// A single scoped_lock over every slot's mutex gives a consistent snapshot of all latest values.
102 auto lock = std::scoped_lock{vals.get_mutex()...};
103
// Proceed only if each slot already holds a value; arguments are passed to the selector through as_const.
104 if ((vals.get_value_unsafe().has_value() && ...))
105 return d->get_selector()(rpp::utils::as_const(std::forward<T>(v)), rpp::utils::as_const(vals.get_value_unsafe().value())...);
106 return std::nullopt;
107 });
108
// No result means at least one other observable has not emitted yet, so the source value is dropped.
109 if (result.has_value())
110 disposable->get_observer_under_lock()->on_next(std::move(result).value());
111 }
112
// Errors of the source observable are forwarded to the downstream observer.
113 void on_error(const std::exception_ptr& err) const
114 {
115 disposable->get_observer_under_lock()->on_error(err);
116 }
117
// Completion of the source observable completes the downstream observer.
118 void on_completed() const
119 {
120 disposable->get_observer_under_lock()->on_completed();
121 }
122 };
123
// Operator object created by rpp::operators::with_latest_from (it constructs
// details::with_latest_from_t). Stores the other observables and the selector.
// NOTE(review): original lines 125, 131 and 153 are not present in this listing: the struct
// declaration, the declaration of the nested traits type, and presumably the `Disposable`
// alias used inside subscribe_impl - confirm against the real header.
124 template<typename TSelector, rpp::constraint::observable... TObservables>
126 {
// Observables whose latest values are combined with each value of the source observable.
127 RPP_NO_UNIQUE_ADDRESS rpp::utils::tuple<TObservables...> observables;
// Callable combining the source value with the latest values of `observables`.
128 RPP_NO_UNIQUE_ADDRESS TSelector selector;
129
// Nested per-source-type traits: T is the value type of the source observable.
130 template<rpp::constraint::decayed_type T>
132 {
133 static_assert(std::invocable<TSelector, T, rpp::utils::extract_observable_type_t<TObservables>...>, "TSelector is not invocable with T and types of rest observables");
134
// Value type of the resulting observable: whatever the selector returns.
135 using result_type = std::invoke_result_t<TSelector, T, rpp::utils::extract_observable_type_t<TObservables>...>;
136
137 constexpr static bool own_current_queue = true;
138 };
139
140 template<rpp::details::observables::constraint::disposables_strategy Prev>
141 using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>;
142
// Wraps the downstream observer into an observer for the source observable.
// tuple::apply passes the observer and the selector first, followed by the stored observables.
143 template<rpp::constraint::decayed_type Type, rpp::constraint::observer Observer>
144 auto lift(Observer&& observer) const
145 {
146 return observables.apply(&subscribe_impl<Type, Observer>, std::forward<Observer>(observer), selector);
147 }
148
149 private:
// Creates the shared state, subscribes to every other observable and returns the observer
// that must be subscribed to the source observable.
150 template<rpp::constraint::decayed_type Type, rpp::constraint::observer Observer>
151 static auto subscribe_impl(Observer&& observer, const TSelector& selector, const TObservables&... observables)
152 {
154
// The shared state owns the downstream observer and a copy of the selector.
155 const auto disposable = disposable_wrapper_impl<Disposable>::make(std::forward<Observer>(observer), selector);
156 auto ptr = disposable.lock();
// The downstream observer receives only a weak reference to the shared state as its upstream.
157 ptr->get_observer_under_lock()->set_upstream(disposable.as_weak());
// The other observables are subscribed before the source observer is handed back to the caller.
158 subscribe(ptr, std::index_sequence_for<TObservables...>{}, observables...);
159
160 return rpp::observer<Type, with_latest_from_observer_strategy<std::decay_t<Observer>, TSelector, Type, rpp::utils::extract_observable_type_t<TObservables>...>>{std::move(ptr)};
161 }
162
// Subscribes each other observable with an inner strategy bound to its index I in the pack.
163 template<rpp::constraint::observer Observer, size_t... I>
164 static void subscribe(const std::shared_ptr<with_latest_from_disposable<Observer, TSelector, rpp::utils::extract_observable_type_t<TObservables>...>>& disposable, std::index_sequence<I...>, const TObservables&... observables)
165 {
// Comma fold: one subscribe call per observable, each sharing the same disposable.
166 (..., observables.subscribe(rpp::observer<rpp::utils::extract_observable_type_t<TObservables>, with_latest_from_inner_observer_strategy<I, Observer, TSelector, rpp::utils::extract_observable_type_t<TObservables>...>>{disposable}));
167 }
168 };
169} // namespace rpp::operators::details
170
171namespace rpp::operators
172{
197 * @ingroup combining_operators
198 * @see https://reactivex.io/documentation/operators/combinelatest.html
199 */
// Overload taking an explicit selector followed by at least one observable.
// The requires-clause rejects an observable passed in the selector position; in addition the
// selector must either not satisfy is_not_template_callable, or be invocable with
// convertible_to_any (standing in for the source value) and the value types of the given observables.
200 template<typename TSelector, rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
201 requires (!rpp::constraint::observable<TSelector> && (!utils::is_not_template_callable<TSelector> || std::invocable<TSelector, rpp::utils::convertible_to_any, utils::extract_observable_type_t<TObservable>, utils::extract_observable_type_t<TObservables>...>))
202 auto with_latest_from(TSelector&& selector, TObservable&& observable, TObservables&&... observables)
203 {
// The operator object stores decayed copies: the observables packed into rpp::utils::tuple, then the selector.
204 return details::with_latest_from_t<std::decay_t<TSelector>, std::decay_t<TObservable>, std::decay_t<TObservables>...>{
205 rpp::utils::tuple{std::forward<TObservable>(observable), std::forward<TObservables>(observables)...},
206 std::forward<TSelector>(selector)};
207 }
208
226 * @param observables are the observables whose latest emissions are combined each time the current observable sends a new value
227 * @note `#include <rpp/operators/with_latest_from.hpp>`
228 *
229 * @par Examples
230 * @snippet with_latest_from.cpp with_latest_from
231 *
232 * @ingroup combining_operators
233 * @see https://reactivex.io/documentation/operators/combinelatest.html
234 */
// Overload without a selector: delegates to the selector overload with rpp::utils::pack_to_tuple{}.
// NOTE(review): pack_to_tuple presumably combines the source value and the latest values into a tuple - confirm.
235 template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
236 auto with_latest_from(TObservable&& observable, TObservables&&... observables)
237 {
238 return with_latest_from(rpp::utils::pack_to_tuple{}, std::forward<TObservable>(observable), std::forward<TObservables>(observables)...);
239 }
240} // namespace rpp::operators
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
static disposable_wrapper_impl make(TArgs &&... args)
Main way to create disposable_wrapper. Passed TTarget type can be any type derived from TDisposable.
Definition disposable_wrapper.hpp:164
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition tuple.hpp:105
Definition utils.hpp:260
Definition constraints.hpp:22
Definition fwd.hpp:80
Definition fwd.hpp:250
Definition function_traits.hpp:45
auto with_latest_from(TSelector &&selector, TObservable &&observable, TObservables &&... observables)
Combines latest emissions from observables with emission from current observable when it sends a new value.
Definition with_latest_from.hpp:197
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
auto subscribe(observer< Type, ObserverStrategy > &&observer)
Subscribes passed observer to emissions from this observable.
Definition subscribe.hpp:226
Definition disposables_strategy.hpp:29
Definition with_latest_from.hpp:126
Definition functors.hpp:28
Definition functors.hpp:82