ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Loading...
Searching...
No Matches
with_latest_from.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12
13#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
14
15#include <rpp/defs.hpp>
16
17#include <rpp/observables/constraints.hpp>
18#include <rpp/operators/fwd/with_latest_from.hpp>
19#include <rpp/operators/merge.hpp>
20#include <rpp/subscribers/constraints.hpp>
21#include <rpp/utils/utilities.hpp>
22#include <rpp/utils/functors.hpp>
23#include <rpp/utils/spinlock.hpp>
24
25#include <mutex>
26#include <array>
27#include <tuple>
28
29IMPLEMENTATION_FILE(with_latest_from_tag);
30
31namespace rpp::details
32{
33template<typename TSelector, constraint::decayed_type... ValueTypes>
35{
36 with_latest_from_state(const TSelector& selector, const composite_subscription& subscription_of_subscriber)
37 : early_unsubscribe_state{subscription_of_subscriber}
38 , selector(selector) {}
39
40 // RPP_NO_UNIQUE_ADDRESS commented due to MSVC issue for base classes
41 /*RPP_NO_UNIQUE_ADDRESS*/ TSelector selector;
42 std::array<std::mutex, sizeof...(ValueTypes)> mutexes{};
43 std::tuple<std::optional<ValueTypes>...> vals{};
44};
45
46template<size_t I>
48{
49 void operator()(auto&& value, const constraint::subscriber auto&, const auto& state) const
50 {
51 std::lock_guard lock{state->mutexes[I]};
52 std::get<I>(state->vals) = std::forward<decltype(value)>(value);
53 }
54};
55
58
59template<size_t I, constraint::observable TObs>
60void with_latest_from_subscribe(const auto& state_ptr, const TObs& observable, const auto& subscriber)
61{
62 using Type = utils::extract_observable_type_t<TObs>;
63 observable.subscribe(create_subscriber_with_state<Type>(state_ptr->children_subscriptions.make_child(),
66 [](const auto&, const auto&) {},
67 subscriber,
68 state_ptr));
69}
70
71template<size_t...I>
72void with_latest_from_subscribe_observables(std::index_sequence<I...>,
73 const auto& state_ptr,
74 const auto& subscriber,
75 const auto& observables_tuple)
76{
77 (with_latest_from_subscribe<I>(state_ptr, std::get<I>(observables_tuple), subscriber), ...);
78}
79
80template<typename TSelector, constraint::decayed_type... ValueTypes>
82{
83 template<typename T>
84 void operator()(T&& v, const auto& sub, const auto& state) const
85 {
86 using ResultType = utils::decayed_invoke_result_t<TSelector, std::decay_t<T>, ValueTypes...>;
87
88 auto result = std::apply([&](const auto&...current_cached_vals) -> std::optional<ResultType>
89 {
90 auto lock = std::apply([](auto&...mutexes)
91 {
92 return std::scoped_lock{mutexes...};
93 },
94 state->mutexes);
95
96 if ((current_cached_vals.has_value() && ...))
97 return state->selector(utils::as_const(std::forward<T>(v)),
98 utils::as_const(current_cached_vals.value())...);
99 return std::nullopt;
100 },
101 state->vals);
102
103 if (result.has_value())
104 sub.on_next(std::move(result.value()));
105 }
106};
107
108template<typename TSelector, constraint::decayed_type... ValueTypes>
110{
111 using with_latest_from_state<TSelector, ValueTypes...>::with_latest_from_state;
112
113 // we can use spinlock there because 99.9% of time only one ever thread would send values from on_next (main observable), but we have small probability to get error from inner observables immediately
114 utils::spinlock spinlock{};
115};
116
117template<constraint::decayed_type Type, typename TSelector, constraint::observable ...TObservables>
119{
120 using ResultType = utils::decayed_invoke_result_t<
121 TSelector, Type, utils::extract_observable_type_t<TObservables>...>;
122
123 RPP_NO_UNIQUE_ADDRESS TSelector selector;
124 RPP_NO_UNIQUE_ADDRESS std::tuple<TObservables...> observables;
125
126 template<constraint::subscriber_of_type<ResultType> TSub>
127 auto operator()(TSub&& in_subscriber) const
128 {
129 auto state = std::make_shared<with_latest_from_state_with_serialized_spinlock<TSelector, utils::extract_observable_type_t<TObservables>...>>(selector, in_subscriber.get_subscription());
130 // change subscriber to serialized to avoid manual using of mutex
131 auto subscriber = make_serialized_subscriber(std::forward<TSub>(in_subscriber), std::shared_ptr<utils::spinlock>{state, &state->spinlock});
132
133 with_latest_from_subscribe_observables(std::index_sequence_for<TObservables...>{},
134 state,
135 subscriber,
136 observables);
137
138 auto sub = state->children_subscriptions.make_child();
139 return create_subscriber_with_state<Type>(std::move(sub),
140 with_latest_from_on_next_outer<TSelector, utils::extract_observable_type_t<TObservables>...>{},
143 std::move(subscriber),
144 std::move(state));
145 }
146};
147} // namespace rpp::details
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
Definition: constraints.hpp:19
Definition: constraints.hpp:19
Definition: early_unsubscribe.hpp:39
Definition: early_unsubscribe.hpp:28
Definition: early_unsubscribe.hpp:19
Definition: with_latest_from.hpp:119
Definition: with_latest_from.hpp:48
Definition: with_latest_from.hpp:82
Definition: with_latest_from.hpp:35