ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Loading...
Searching...
No Matches
switch_on_next.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/lift.hpp> // required due to operator uses lift
14#include <rpp/operators/merge.hpp>
15#include <rpp/operators/fwd/switch_on_next.hpp>
16#include <rpp/subscribers/constraints.hpp>
17#include <rpp/utils/functors.hpp>
18#include <rpp/utils/spinlock.hpp>
19
20
21#include <atomic>
22#include <memory>
23
24IMPLEMENTATION_FILE(switch_on_next_tag);
25
26namespace rpp::details
27{
29{
30 using merge_state::merge_state;
31
32 composite_subscription current_inner_observable = rpp::composite_subscription::empty();
33};
34
36{
37 void operator()(const constraint::subscriber auto& sub,
38 const std::shared_ptr<switch_on_next_state>& state) const
39 {
40 // 1 because decrement happens in composite_subscription_callback
41 if (state->count_of_on_completed_needed.load(std::memory_order::acquire) == 1)
42 sub.on_completed();
43 }
44};
45
46using switch_on_next_on_next_inner = merge_forwarding_on_next;
48
50{
51 template<constraint::observable TObs>
52 void operator()(const TObs& new_observable,
53 const constraint::subscriber auto& sub,
54 const std::shared_ptr<switch_on_next_state>& state) const
55 {
56 using ValueType = utils::extract_observable_type_t<TObs>;
57
58 state->current_inner_observable.unsubscribe();
59 state->current_inner_observable = state->children_subscriptions.make_child();
60 state->current_inner_observable.add([state = std::weak_ptr{state}]
61 {
62 if (const auto locked = state.lock())
63 locked->count_of_on_completed_needed.fetch_sub(1, std::memory_order::relaxed);
64 });
65
66 state->count_of_on_completed_needed.fetch_add(1, std::memory_order::relaxed);
67
68 new_observable.subscribe(create_subscriber_with_state<ValueType>(state->current_inner_observable,
69 switch_on_next_on_next_inner{},
72 sub,
73 state));
74 }
75};
76
78
80{
81 using switch_on_next_state::switch_on_next_state;
82
83 // we can use spinlock there because 99.9% of time only one ever thread would send values from on_next (only one active observable), but we have small probability to get error from main observable immediately
84 utils::spinlock spinlock{};
85};
86
87template<constraint::decayed_type Type>
89{
90 using ValueType = utils::extract_observable_type_t<Type>;
91
92 template<constraint::subscriber_of_type<ValueType> TSub>
93 auto operator()(TSub&& in_subscriber) const
94 {
95 auto state = std::make_shared<switch_on_next_state_with_serialized_spinlock>(in_subscriber.get_subscription());
96
97 // change subscriber to serialized to avoid manual using of mutex
98 auto subscriber = make_serialized_subscriber(std::forward<TSub>(in_subscriber), std::shared_ptr<utils::spinlock>{state, &state->spinlock});
99
100 state->count_of_on_completed_needed.fetch_add(1, std::memory_order::relaxed);
101
102 auto subscription = state->children_subscriptions.make_child();
103 return create_subscriber_with_state<Type>(std::move(subscription),
107 std::move(subscriber),
108 std::move(state));
109 }
110};
111} // namespace rpp::details
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
Definition: constraints.hpp:19
Definition: early_unsubscribe.hpp:28
Definition: merge.hpp:42
Definition: merge.hpp:32
Definition: switch_on_next.hpp:89
Definition: switch_on_next.hpp:36
Definition: switch_on_next.hpp:50
Definition: switch_on_next.hpp:29