ReactivePlusPlus
Yet another implementation of the ReactiveX approach in C++, written with performance and templates in mind
 
Loading...
Searching...
No Matches
combine_latest.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// TC Wang 2022 - present.
5// Distributed under the Boost Software License, Version 1.0.
6// (See accompanying file LICENSE_1_0.txt or copy at
7// https://www.boost.org/LICENSE_1_0.txt)
8//
9// Project home: https://github.com/victimsnino/ReactivePlusPlus
10//
11
12#pragma once
13
14#include <rpp/defs.hpp> // RPP_NO_UNIQUE_ADDRESS
15#include <rpp/operators/lift.hpp> // required due to operator uses lift
16#include <rpp/operators/merge.hpp> // merge_state
17#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
18#include <rpp/operators/fwd/combine_latest.hpp> // own forwarding
19#include <rpp/subscribers/constraints.hpp> // constraint::subscriber_of_type
20#include <rpp/utils/spinlock.hpp> // spinlock
21
22#include <algorithm>
23
24IMPLEMENTATION_FILE(combine_latest_tag);
25
26namespace rpp::details
27{
// State shared by the inner subscribers of the "combine_latest" operator: it keeps the combiner
// and the most recent value (if any) received for each of the Types.
// NOTE(review): the declaration line between the template header and the opening brace
// (original line 34) is absent from this listing; `merge_state` in the constructor's
// initializer list suggests it is the base class - confirm against the real header.
33template<typename TCombiner, constraint::decayed_type... Types>
35{
// Copies the combiner into the state and passes the subscriber's subscription on to merge_state.
36 explicit combine_latest_state(const TCombiner& combiner, const composite_subscription& subscription_of_subscriber)
37 : merge_state(subscription_of_subscriber)
38 , combiner(combiner) {}
39
40 // NO_UNIQUE_ADDRESS is intentionally not applied here: with MSVC it makes the base class invalid
41 /*NO_UNIQUE_ADDRESS*/ TCombiner combiner;
// Locked by the on_next handler for as long as `values` is written and read.
42 std::mutex values_mutex{};
// One optional slot per type in Types; every slot starts empty and is filled when a value is assigned to it.
43 std::tuple<std::optional<Types>...> values{};
44};
45
// on_next handler used for the observable whose values are cached in slot I of the state.
// NOTE(review): the struct declaration line (original line 47) is absent from this listing.
46template<size_t I>
48{
// Stores `value` into slot I of `state->values`; when every slot holds a value, calls the
// combiner with all cached values and passes the result to `subscriber.on_next`.
49 template<typename TCombiner, constraint::decayed_type... Types>
50 void operator()(auto&& value,
51 const auto& subscriber,
52 const std::shared_ptr<combine_latest_state<TCombiner, Types...>>& state) const
53 {
54 // The mutex stays locked while the value is stored, the combined value is produced and it is sent downstream: a cached value must not be updated while an older combination is still being sent
55 std::scoped_lock lock{state->values_mutex};
56 std::get<I>(state->values) = std::forward<decltype(value)>(value);
57
// Expand the tuple of optionals into a parameter pack to check and read all the slots at once.
58 std::apply([&](const auto&...cached_values)
59 {
// Nothing is emitted until every slot has received at least one value.
60 if ((cached_values.has_value() && ...))
61 subscriber.on_next(state->combiner(cached_values.value()...));
62 },
63 state->values);
64 }
65};
66
69
// Variant of combine_latest_state that additionally owns a spinlock; the operator hands this
// spinlock to make_serialized_subscriber to serialize the downstream subscriber.
// NOTE(review): the struct declaration line (original line 71) is absent from this listing.
70template<typename TCombiner, constraint::decayed_type... Types>
72{
// Reuse the constructors of combine_latest_state.
73 using combine_latest_state<TCombiner,Types...>::combine_latest_state;
74
75 // A spinlock is enough here: 99.9% of the time only one thread sends values, because on_next calls are already serialized by values_mutex; there is only a small probability that an error arrives from another observable at the same moment
76 utils::spinlock spinlock{};
77};
78
// "combine_latest" operator (an OperatorFn used by "lift"): given a downstream subscriber it
// subscribes to the other observables and returns the subscriber for the source observable.
// NOTE(review): the struct declaration line (original line 83) is absent from this listing.
82template<constraint::decayed_type Type, typename TCombiner, constraint::observable ...TOtherObservable>
84{
// Callable copied into the shared state and invoked with the cached values of all the observables.
85 RPP_NO_UNIQUE_ADDRESS TCombiner m_combiner;
// The observables combined with the source one.
86 RPP_NO_UNIQUE_ADDRESS std::tuple<TOtherObservable...> m_other_observables;
87
88private:
// Slot index used for the source observable; `Type` is the first type in the state's type list.
89 static constexpr size_t s_index_of_source_type = 0;
90
// Subscribes to every observable stored in m_other_observables; the observable at tuple
// position I is bound to slot I + 1 of the state.
97 template<size_t...I>
98 void subscribe_other_observables(std::index_sequence<I...>,
99 // Used in compile time for variadic expansion
100 const auto& subscriber,
101 const std::shared_ptr<combine_latest_state<TCombiner, Type, utils::extract_observable_type_t<TOtherObservable>...>>& state) const
102 {
103 // +1 because the first element in tuple is the current observable, and you want to subscribe to the 'other' observables.
104 // (Use variadic expansion to iterate the observables)
105 (subscribe_observable<I + 1>(std::get<I>(m_other_observables), subscriber, state), ...);
106 }
107
// Subscribes to `observable` with an inner subscriber created for its value type and slot index I.
108 template<size_t I, constraint::observable TObservable>
109 static void subscribe_observable(const TObservable& observable, const auto& subscriber, const std::shared_ptr<combine_latest_state<TCombiner, Type, utils::extract_observable_type_t<TOtherObservable>...>>& state)
110 {
111 using ValueType = utils::extract_observable_type_t<TObservable>;
112 observable.subscribe(create_inner_subscriber<ValueType, I>(subscriber, state));
113 }
114
// Builds the subscriber for one of the combined observables: it gets its own child subscription
// taken from `state->children_subscriptions` and carries the downstream subscriber and the state.
// NOTE(review): original lines 121-123 (arguments between the subscription and the subscriber)
// are absent from this listing - presumably the on_next/on_error/on_completed handlers; confirm.
115 template<typename ValueType, size_t I>
116 static auto create_inner_subscriber(auto&& subscriber,
117 std::shared_ptr<combine_latest_state<TCombiner, Type, utils::extract_observable_type_t<TOtherObservable>...>> state)
118 {
119 auto subscription = state->children_subscriptions.make_child();
120 return create_subscriber_with_state<ValueType>(std::move(subscription),
124 std::forward<decltype(subscriber)>(subscriber),
125 std::move(state));
126 }
127
128
129public:
// Type produced by invoking the combiner with one value of each combined observable.
130 using DownstreamType = utils::decayed_invoke_result_t<TCombiner, Type, utils::extract_observable_type_t<TOtherObservable>...>;
131
// Creates the shared state, subscribes to the other observables and returns the inner
// subscriber that handles values of the source observable (slot s_index_of_source_type).
132 template<constraint::subscriber_of_type<DownstreamType> TSub>
133 auto operator()(TSub&& in_subscriber) const
134 {
135 auto state = std::make_shared<combine_latest_state_with_serialized_spinlock<TCombiner, Type, utils::extract_observable_type_t<TOtherObservable>...>>(m_combiner, in_subscriber.get_subscription());
136 // Wrap the subscriber into a serialized one so no manual locking is needed; the spinlock pointer shares ownership with the state, so the state stays alive as long as the lock is used
137 auto subscriber = make_serialized_subscriber(std::forward<TSub>(in_subscriber), std::shared_ptr<utils::spinlock>{state, &state->spinlock});
138
// One per observable: all the other observables plus the source one.
// NOTE(review): presumably the number of on_completed events merge_state waits for - confirm in merge.hpp.
139 state->count_of_on_completed_needed.store(sizeof...(TOtherObservable) + 1, std::memory_order::relaxed);
140
141 // Subscribe to other observables and redirect on_next event to state
142 subscribe_other_observables(std::index_sequence_for<TOtherObservable...>{}, subscriber, state);
143
144 // Redirect values from this observable to the state for value composition
145 return create_inner_subscriber<Type, s_index_of_source_type>(std::move(subscriber), std::move(state));
146 }
147};
148} // namespace rpp::details
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
Definition: constraints.hpp:19
"combine_latest" operator (an OperatorFn used by "lift").
Definition: combine_latest.hpp:84
Definition: combine_latest.hpp:48
The state that caches the values from all the observables and dispatches latest caches to the observe...
Definition: combine_latest.hpp:35
Definition: early_unsubscribe.hpp:28
Definition: merge.hpp:42
Definition: merge.hpp:32