14#include <rpp/defs.hpp>
15#include <rpp/operators/lift.hpp>
16#include <rpp/operators/merge.hpp>
17#include <rpp/operators/details/subscriber_with_state.hpp>
18#include <rpp/operators/fwd/combine_latest.hpp>
19#include <rpp/subscribers/constraints.hpp>
20#include <rpp/utils/spinlock.hpp>
24IMPLEMENTATION_FILE(combine_latest_tag);
33template<
typename TCombiner, constraint::decayed_type... Types>
38 , combiner(combiner) {}
42 std::mutex values_mutex{};
43 std::tuple<std::optional<Types>...> values{};
49 template<
typename TCombiner, constraint::decayed_type... Types>
50 void operator()(
auto&& value,
51 const auto& subscriber,
55 std::scoped_lock lock{state->values_mutex};
56 std::get<I>(state->values) = std::forward<decltype(value)>(value);
58 std::apply([&](
const auto&...cached_values)
60 if ((cached_values.has_value() && ...))
61 subscriber.on_next(state->combiner(cached_values.value()...));
70template<
typename TCombiner, constraint::decayed_type... Types>
76 utils::spinlock spinlock{};
85 RPP_NO_UNIQUE_ADDRESS TCombiner m_combiner;
86 RPP_NO_UNIQUE_ADDRESS std::tuple<TOtherObservable...> m_other_observables;
89 static constexpr size_t s_index_of_source_type = 0;
98 void subscribe_other_observables(std::index_sequence<I...>,
100 const auto& subscriber,
101 const std::shared_ptr<
combine_latest_state<TCombiner, Type, utils::extract_observable_type_t<TOtherObservable>...>>& state)
const
105 (subscribe_observable<I + 1>(std::get<I>(m_other_observables), subscriber, state), ...);
108 template<
size_t I, constra
int::observable TObservable>
109 static void subscribe_observable(
const TObservable& observable,
const auto& subscriber,
const std::shared_ptr<
combine_latest_state<TCombiner, Type, utils::extract_observable_type_t<TOtherObservable>...>>& state)
111 using ValueType = utils::extract_observable_type_t<TObservable>;
112 observable.subscribe(create_inner_subscriber<ValueType, I>(subscriber, state));
115 template<
typename ValueType,
size_t I>
116 static auto create_inner_subscriber(
auto&& subscriber,
117 std::shared_ptr<
combine_latest_state<TCombiner, Type, utils::extract_observable_type_t<TOtherObservable>...>> state)
119 auto subscription = state->children_subscriptions.make_child();
120 return create_subscriber_with_state<ValueType>(std::move(subscription),
124 std::forward<decltype(subscriber)>(subscriber),
130 using DownstreamType = utils::decayed_invoke_result_t<TCombiner, Type, utils::extract_observable_type_t<TOtherObservable>...>;
132 template<constra
int::subscriber_of_type<DownstreamType> TSub>
133 auto operator()(TSub&& in_subscriber)
const
135 auto state = std::make_shared<combine_latest_state_with_serialized_spinlock<TCombiner, Type, utils::extract_observable_type_t<TOtherObservable>...>>(m_combiner, in_subscriber.get_subscription());
137 auto subscriber = make_serialized_subscriber(std::forward<TSub>(in_subscriber), std::shared_ptr<utils::spinlock>{state, &state->spinlock});
139 state->count_of_on_completed_needed.store(
sizeof...(TOtherObservable) + 1, std::memory_order::relaxed);
142 subscribe_other_observables(std::index_sequence_for<TOtherObservable...>{}, subscriber, state);
145 return create_inner_subscriber<Type, s_index_of_source_type>(std::move(subscriber), std::move(state));
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
Definition: constraints.hpp:19
"combine_latest" operator (an OperatorFn used by "lift").
Definition: combine_latest.hpp:84
Definition: combine_latest.hpp:48
Definition: combine_latest.hpp:72
The state that caches the values from all the observables and dispatches latest caches to the observe...
Definition: combine_latest.hpp:35
Definition: early_unsubscribe.hpp:28