13#include <rpp/operators/details/subscriber_with_state.hpp>
15#include <rpp/defs.hpp>
17#include <rpp/observables/constraints.hpp>
18#include <rpp/operators/fwd/with_latest_from.hpp>
19#include <rpp/operators/merge.hpp>
20#include <rpp/subscribers/constraints.hpp>
21#include <rpp/utils/utilities.hpp>
22#include <rpp/utils/functors.hpp>
23#include <rpp/utils/spinlock.hpp>
29IMPLEMENTATION_FILE(with_latest_from_tag);
33template<
typename TSelector, constraint::decayed_type... ValueTypes>
38 , selector(selector) {}
42 std::array<std::mutex,
sizeof...(ValueTypes)> mutexes{};
43 std::tuple<std::optional<ValueTypes>...> vals{};
51 std::lock_guard lock{state->mutexes[I]};
52 std::get<I>(state->vals) = std::forward<decltype(value)>(value);
59template<
size_t I, constra
int::observable TObs>
60void with_latest_from_subscribe(
const auto& state_ptr,
const TObs& observable,
const auto& subscriber)
62 using Type = utils::extract_observable_type_t<TObs>;
63 observable.subscribe(create_subscriber_with_state<Type>(state_ptr->children_subscriptions.make_child(),
66 [](
const auto&,
const auto&) {},
72void with_latest_from_subscribe_observables(std::index_sequence<I...>,
73 const auto& state_ptr,
74 const auto& subscriber,
75 const auto& observables_tuple)
77 (with_latest_from_subscribe<I>(state_ptr, std::get<I>(observables_tuple), subscriber), ...);
80template<
typename TSelector, constraint::decayed_type... ValueTypes>
84 void operator()(T&& v,
const auto& sub,
const auto& state)
const
86 using ResultType = utils::decayed_invoke_result_t<TSelector, std::decay_t<T>, ValueTypes...>;
88 auto result = std::apply([&](
const auto&...current_cached_vals) -> std::optional<ResultType>
90 auto lock = std::apply([](
auto&...mutexes)
92 return std::scoped_lock{mutexes...};
96 if ((current_cached_vals.has_value() && ...))
97 return state->selector(utils::as_const(std::forward<T>(v)),
98 utils::as_const(current_cached_vals.value())...);
103 if (result.has_value())
104 sub.on_next(std::move(result.value()));
108template<
typename TSelector, constraint::decayed_type... ValueTypes>
114 utils::spinlock spinlock{};
120 using ResultType = utils::decayed_invoke_result_t<
121 TSelector, Type, utils::extract_observable_type_t<TObservables>...>;
123 RPP_NO_UNIQUE_ADDRESS TSelector selector;
124 RPP_NO_UNIQUE_ADDRESS std::tuple<TObservables...> observables;
126 template<constra
int::subscriber_of_type<ResultType> TSub>
127 auto operator()(TSub&& in_subscriber)
const
129 auto state = std::make_shared<with_latest_from_state_with_serialized_spinlock<TSelector, utils::extract_observable_type_t<TObservables>...>>(selector, in_subscriber.get_subscription());
131 auto subscriber = make_serialized_subscriber(std::forward<TSub>(in_subscriber), std::shared_ptr<utils::spinlock>{state, &state->spinlock});
133 with_latest_from_subscribe_observables(std::index_sequence_for<TObservables...>{},
138 auto sub = state->children_subscriptions.make_child();
139 return create_subscriber_with_state<Type>(std::move(sub),
143 std::move(subscriber),
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
Definition: constraints.hpp:19
Definition: constraints.hpp:19
Definition: early_unsubscribe.hpp:39
Definition: early_unsubscribe.hpp:28
Definition: early_unsubscribe.hpp:19
Definition: with_latest_from.hpp:119
Definition: with_latest_from.hpp:48
Definition: with_latest_from.hpp:82
Definition: with_latest_from.hpp:110
Definition: with_latest_from.hpp:35