ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Loading...
Searching...
No Matches
composite_subscription.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include "rpp/utils/utilities.hpp"
14
15#include <rpp/subscriptions/subscription_base.hpp>
16#include <rpp/subscriptions/callback_subscription.hpp>
17#include <rpp/utils/constraints.hpp>
18#include <rpp/subscriptions/constraints.hpp>
19
20#include <algorithm>
21#include <functional>
22#include <vector>
23
24namespace rpp
25{
30{
31public:
32 composite_subscription() : subscription_base{std::make_shared<state>()} {}
33
36 composite_subscription& operator=(const composite_subscription& other) = default;
37 composite_subscription& operator=(composite_subscription&& other) noexcept = default;
38
42 template<constraint::subscription TSub = subscription_base>
43 std::weak_ptr<details::subscription_state> add(const TSub &sub = TSub{}) const
44 {
45 if (static_cast<const subscription_base *>(&sub) == static_cast<const subscription_base *>(this))
46 return sub.get_state();
47
48 if (const auto pstate = std::static_pointer_cast<state>(get_state()))
49 pstate->add(sub.get_state());
50 else
51 sub.unsubscribe();
52 return sub.get_state();
53 }
54
58 std::weak_ptr<details::subscription_state> add(const callback_subscription &sub) const
59 {
60 return add<callback_subscription>(sub);
61 }
62
63 composite_subscription make_child() const
64 {
66 ret.add([weak_handle = add(ret), state = std::weak_ptr{std::static_pointer_cast<state>(get_state())}]
67 {
68 // add cleanup
69 if (const auto locked_state = state.lock())
70 if (const auto locked_handle = weak_handle.lock())
71 locked_state->remove(locked_handle);
72 });
73 return ret;
74 }
75
76 void remove(const subscription_base &sub) const
77 {
78 if (const auto pstate = std::static_pointer_cast<state>(get_state()))
79 pstate->remove(sub.get_state());
80 }
81
82 void remove(const std::weak_ptr<details::subscription_state>& sub) const
83 {
84 if (const auto locked = sub.lock())
85 if (const auto pstate = std::static_pointer_cast<state>(get_state()))
86 pstate->remove(locked);
87 }
88
89 bool is_empty() const
90 {
91 return !get_state();
92 }
93
94 static composite_subscription empty()
95 {
96 return composite_subscription{empty_tag{}};
97 }
98
99private:
100 struct empty_tag{};
101
102 composite_subscription(const empty_tag&)
103 : subscription_base{std::shared_ptr<details::subscription_state>{}} {}
104
105 class state final : public details::subscription_state
106 {
107 public:
108 state() = default;
109
110 void add(std::shared_ptr<details::subscription_state> sub)
111 {
112 if (!sub || !sub->is_subscribed())
113 return;
114
115 while (true)
116 {
117 DepsState expected{DepsState::None};
118 if (m_state.compare_exchange_strong(expected, DepsState::Edit, std::memory_order::acq_rel))
119 {
120 m_deps.push_back(std::move(sub));
121
122 m_state.store(DepsState::None, std::memory_order::release);
123 return;
124 }
125
126 if (expected == DepsState::Unsubscribed)
127 {
128 sub->unsubscribe();
129 return;
130 }
131 }
132 }
133
134 void remove(const std::shared_ptr<details::subscription_state>& sub)
135 {
136 while (true)
137 {
138 DepsState expected{DepsState::None};
139 if (m_state.compare_exchange_strong(expected, DepsState::Edit, std::memory_order::acq_rel))
140 {
141 std::erase(m_deps, sub);
142
143 m_state.store(DepsState::None, std::memory_order::release);
144 return;
145 }
146
147 if (expected == DepsState::Unsubscribed)
148 return;
149 }
150 }
151
152 private:
153 void on_unsubscribe() override
154 {
155 while (true)
156 {
157 DepsState expected{DepsState::None};
158 if (m_state.compare_exchange_strong(expected, DepsState::Unsubscribed, std::memory_order::acq_rel))
159 {
160 rpp::utils::for_each(m_deps, std::mem_fn(&details::subscription_state::unsubscribe));
161 m_deps.clear();
162 return;
163 }
164 }
165 }
166
167 private:
168 enum class DepsState : uint8_t
169 {
170 None, //< default state
171 Edit, //< set it during adding new element into deps or removing. After success -> FallBack to None
172 Unsubscribed //< permanent state after unsubscribe
173 };
174
175 std::atomic<DepsState> m_state{DepsState::None};
176 std::vector<std::shared_ptr<details::subscription_state>> m_deps{};
177 };
178};
179} // namespace rpp
Subscription which invoke callbable during unsubscribe.
Definition: callback_subscription.hpp:25
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
std::weak_ptr< details::subscription_state > add(const callback_subscription &sub) const
Add callback/function subscription to this as dependent.
Definition: composite_subscription.hpp:58
std::weak_ptr< details::subscription_state > add(const TSub &sub=TSub{}) const
Add any other subscription to this as dependent.
Definition: composite_subscription.hpp:43
Base subscription implementation used as base class/interface and core implementation for derrived su...
Definition: subscription_base.hpp:25