ReactivePlusPlus
One more implementation of the ReactiveX approach in C++, written with performance and templates in mind
 
Loading...
Searching...
No Matches
concat.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/schedulers/immediate_scheduler.hpp>
14#include <rpp/observables/dynamic_observable.hpp> // dynamic_observable
15#include <rpp/operators/lift.hpp> // required due to operator uses lift
16#include <rpp/operators/merge.hpp> // merge_forwarding_on_next/merge_on_error
17#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
18#include <rpp/operators/fwd/concat.hpp> // own forwarding
19#include <rpp/sources/just.hpp>
20#include <rpp/subscribers/constraints.hpp> // constraint::subscriber_of_type
21#include <rpp/subscriptions/composite_subscription.hpp> // composite_subscription
22#include <rpp/utils/functors.hpp>
23#include <rpp/utils/spinlock.hpp>
24
25#include <memory>
26#include <mutex>
27#include <queue>
28
29
30IMPLEMENTATION_FILE(concat_tag);
31
32namespace rpp::details
33{
// Shared state of the concat operator.
// NOTE(review): the declaration line of this struct (listing line 35) is missing from this view;
// the constructor below shows the type is named concat_state and initializes early_unsubscribe_state.
// NOTE(review): std::atomic_bool is used here, but <atomic> is not among the visible includes;
// presumably it arrives transitively - confirm.
34template<constraint::decayed_type ValueType>
36{
// Forwards the subscriber's subscription to early_unsubscribe_state and creates
// source_subscription as a child of children_subscriptions.
37 concat_state(const composite_subscription& subscription_of_subscriber)
38 : early_unsubscribe_state{subscription_of_subscriber}
39 , source_subscription{children_subscriptions.make_child()} {}
40
// Subscription handed to the subscriber that receives the observables to concatenate.
41 composite_subscription source_subscription;
// Guards observables_to_subscribe; also held while inner_subscribed is re-checked or reset.
42 std::mutex queue_mutex{};
// Observables that arrived while another one was still active; consumed in FIFO order.
43 std::queue<dynamic_observable<ValueType>> observables_to_subscribe{};
// true while some inner observable is subscribed (or is about to be subscribed).
44 std::atomic_bool inner_subscribed{};
45};
46
// on_next handler passed to the subscribers of inner observables: reuses merge_forwarding_on_next.
47using concat_on_next_inner = merge_forwarding_on_next;
49
50template<constraint::decayed_type ValueType>
52{
// Handles a newly arrived observable: subscribes to it right away when no inner observable
// is active, otherwise stores it in the queue of the shared state.
//   new_observable - observable to subscribe to or to enqueue (converted with as_dynamic() when enqueued)
//   sub            - subscriber that is handed over to subscribe_inner_subscriber
//   state          - shared concat state (queue, mutex, inner_subscribed flag)
53 template<constraint::observable TObs, constraint::subscriber TSub>
54 void operator()(TObs&& new_observable,
55 const TSub& sub,
56 const std::shared_ptr<concat_state<ValueType>>& state) const
57 {
// exchange returns the previous value: false means nothing was active and this call claimed the slot
58 if (state->inner_subscribed.exchange(true, std::memory_order::acq_rel))
59 {
60 std::lock_guard lock{state->queue_mutex};
// re-check under the mutex: the completion callback of an inner subscriber resets the flag
// while holding the same mutex, so the flag may have been cleared in between
61 if (state->inner_subscribed.exchange(true, std::memory_order::relaxed))
62 {
63 state->observables_to_subscribe.push(std::forward<TObs>(new_observable).as_dynamic());
64 return;
65 }
66 }
// reached when either exchange returned false; the lock_guard (if taken) is already released here
67 subscribe_inner_subscriber(new_observable, sub, state);
68 }
69private:
// Subscribes to `observable` with a subscriber created by create_subscriber_with_state on a
// fresh child of state->children_subscriptions. Values are handled by concat_on_next_inner;
// the lambda below chains to the next queued observable or finishes the downstream subscriber.
// NOTE(review): listing line 77 (an argument between the on_next handler and the lambda,
// presumably the error handler) is missing from this view.
70 static void subscribe_inner_subscriber(const auto& observable,
71 const constraint::subscriber auto& subscriber,
72 const std::shared_ptr<concat_state<ValueType>>& state)
73 {
74 observable.subscribe(create_subscriber_with_state<ValueType>(
75 state->children_subscriptions.make_child(),
76 concat_on_next_inner{},
// presumably the on_completed handler of the inner subscriber (judging by position and body)
78 [](const constraint::subscriber auto& sub, const std::shared_ptr<concat_state<ValueType>>& state)
79 {
80 {
81 std::unique_lock lock{state->queue_mutex};
// another observable is waiting: take it and subscribe to it next
82 if (!state->observables_to_subscribe.empty())
83 {
84 auto res = std::move(state->observables_to_subscribe.front());
85 state->observables_to_subscribe.pop();
// release the mutex before subscribing so that the nested subscription does not run under the lock
86 lock.unlock();
87 subscribe_inner_subscriber(res, sub, state);
88 return;
89 }
// queue is empty but the source is still subscribed: mark that no inner observable is active and wait
90 if (state->source_subscription.is_subscribed())
91 {
92 state->inner_subscribed.store(false, std::memory_order::relaxed);
93 return;
94 }
95 }
// queue is empty and the source subscription is no longer subscribed: complete downstream (outside of the lock scope)
96 sub.on_completed();
97 },
98 subscriber,
99 state));
100 }
101};
102
103
104template<constraint::decayed_type ValueType>
106{
// Forwards on_completed to `sub` only when no inner observable is currently subscribed.
// When an inner observable is still active nothing is done here: its completion callback
// calls on_completed once the queue is empty and the source subscription is no longer subscribed.
// NOTE(review): presumably invoked when the source observable completes - the struct
// declaration (listing line 105) is missing from this view, confirm.
107 void operator()(const constraint::subscriber auto& sub,
108 const std::shared_ptr<concat_state<ValueType>>& state) const
109 {
// the flag is read under queue_mutex, the same mutex under which the inner completion callback resets it
110 std::unique_lock lock{state->queue_mutex};
111 if (!state->inner_subscribed.load(std::memory_order::relaxed))
// note: on_completed is called while the mutex is still held
112 sub.on_completed();
113 }
114};
115
116
// concat_state extended with a spinlock that is used to serialize calls to the downstream subscriber.
// NOTE(review): the declaration line (listing line 118) is missing from this view; presumably this is
// concat_state_with_serialized_spinlock deriving from concat_state<ValueType> - confirm.
117template<constraint::decayed_type ValueType>
119{
// inherit the constructors of concat_state<ValueType>
120 using concat_state<ValueType>::concat_state;
121
122 // a spinlock is enough here: 99.9% of the time only one thread sends values from on_next (only one observable is active at a time), but there is a small probability of getting an error from the main observable at the same moment
123 utils::spinlock spinlock{};
124};
125
126template<constraint::decayed_type Type>
128{
129 using ValueType = utils::extract_observable_type_t<Type>;
130
// Builds the subscriber for the source observable out of the downstream subscriber:
// creates the shared state from the subscriber's subscription, wraps the subscriber into a
// serialized one and returns a subscriber with state bound to state->source_subscription.
// NOTE(review): listing lines 140-142 (arguments between the subscription and the subscriber,
// presumably the on_next/on_error/on_completed handlers) are missing from this view.
131 template<constraint::subscriber_of_type<ValueType> TSub>
132 auto operator()(TSub&& in_subscriber) const
133 {
134 auto state = std::make_shared<concat_state_with_serialized_spinlock<ValueType>>(in_subscriber.get_subscription());
135
136 // wrap the subscriber into a serialized one so that no manual mutex handling is needed
// the shared_ptr to the spinlock uses the aliasing constructor: it points to state->spinlock but shares ownership of state
137 auto subscriber = make_serialized_subscriber(std::forward<TSub>(in_subscriber), std::shared_ptr<utils::spinlock>{state, &state->spinlock});
138
// NOTE(review): state->source_subscription is read and state is moved within the same call;
// this is fine as long as create_subscriber_with_state does not take the state by value - confirm.
139 return create_subscriber_with_state<Type>(state->source_subscription,
143 std::move(subscriber),
144 std::move(state));
145 }
146};
147
// Concatenates the provided observables: each one is converted with as_dynamic(), all of them
// are emitted via source::just using the immediate scheduler, and concat() is applied to the result.
//   Type        - value type every provided observable must emit (observable_of_type<Type>)
//   observables - observables to concatenate, in the order they are passed
148template<constraint::decayed_type Type, constraint::observable_of_type<Type> ... TObservables>
149auto concat_with_impl(TObservables&&... observables)
150{
151 return source::just(rpp::schedulers::immediate{}, std::forward<TObservables>(observables).as_dynamic()...).concat();
152}
153} // namespace rpp::details
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
immediately calls provided schedulable or waits for time_point (in the caller-thread)
Definition: immediate_scheduler.hpp:28
Definition: constraints.hpp:19
Definition: concat.hpp:128
Definition: concat.hpp:106
Definition: concat.hpp:52
Definition: concat.hpp:36
Definition: early_unsubscribe.hpp:28
Definition: early_unsubscribe.hpp:19