ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
composite_disposable.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/disposables/fwd.hpp>
14
15#include <rpp/disposables/details/container.hpp>
16#include <rpp/disposables/disposable_wrapper.hpp>
17#include <rpp/disposables/interface_composite_disposable.hpp>
18
19#include <atomic>
20
21namespace rpp
22{
29 template<details::disposables::constraint::disposables_container Container>
30 class composite_disposable_impl : public interface_composite_disposable
31 {
32 public:
33 composite_disposable_impl() = default;
34 composite_disposable_impl(const composite_disposable_impl&) = delete;
35 composite_disposable_impl(composite_disposable_impl&& other) noexcept = delete;
36
37 bool is_disposed() const noexcept final
38 {
39 // just need atomicity, not guarding anything
40 return m_current_state.load(std::memory_order::seq_cst) == State::Disposed;
41 }
42
43 void dispose_impl(interface_disposable::Mode mode) noexcept final
44 {
45 while (true)
46 {
47 State expected{State::None};
48 // need to acquire possible state changing from `add`
49 if (m_current_state.compare_exchange_strong(expected, State::Disposed, std::memory_order::seq_cst))
50 {
51 composite_dispose_impl(mode);
52
53 m_disposables.dispose();
54 m_disposables.clear();
55 return;
56 }
57
58 if (expected == State::Disposed)
59 return;
60 }
61 }
62
63 using interface_composite_disposable::add;
64
65 void add(disposable_wrapper disposable) override
66 {
67 if (disposable.is_disposed() || disposable.lock().get() == this)
68 return;
69
70 while (true)
71 {
72 State expected{State::None};
73 // need to acquire possible disposables state changing from other `add`
74 if (m_current_state.compare_exchange_strong(expected, State::Edit, std::memory_order::seq_cst))
75 {
76 try
77 {
78 m_disposables.push_back(std::move(disposable));
79 }
80 catch (...)
81 {
82 m_current_state.store(State::None, std::memory_order::seq_cst);
83 throw;
84 }
85 // need to propogate disposables state changing to others
86 m_current_state.store(State::None, std::memory_order::seq_cst);
87 return;
88 }
89
90 if (expected == State::Disposed)
91 {
92 disposable.dispose();
93 return;
94 }
95 }
96 }
97
98 void remove(const disposable_wrapper& disposable) override
99 {
100 while (true)
101 {
102 State expected{State::None};
103 // need to acquire possible disposables state changing from other `add` or `remove`
104 if (m_current_state.compare_exchange_strong(expected, State::Edit, std::memory_order::seq_cst))
105 {
106 try
107 {
108 m_disposables.remove(disposable);
109 }
110 catch (...)
111 {
112 m_current_state.store(State::None, std::memory_order::seq_cst);
113 throw;
114 }
115 // need to propogate disposables state changing to others
116 m_current_state.store(State::None, std::memory_order::seq_cst);
117 return;
118 }
119
120 if (expected == State::Disposed)
121 return;
122 }
123 }
124
125 void clear() override
126 {
127 while (true)
128 {
129 State expected{State::None};
130 // need to acquire possible disposables state changing from other `add` or `remove`
131 if (m_current_state.compare_exchange_strong(expected, State::Edit, std::memory_order::seq_cst))
132 {
133 try
134 {
135 m_disposables.dispose();
136 m_disposables.clear();
137 }
138 catch (...)
139 {
140 m_current_state.store(State::None, std::memory_order::seq_cst);
141 throw;
142 }
143 // need to propogate disposables state changing to others
144 m_current_state.store(State::None, std::memory_order::seq_cst);
145 return;
146 }
147
148 if (expected == State::Disposed)
149 return;
150 }
151 }
152
153 protected:
154 virtual void composite_dispose_impl(interface_disposable::Mode) noexcept {}
155
156 private:
157 enum class State : uint8_t
158 {
159 None, // default state
160 Edit, // set it during adding new element into deps or removing. After success -> back to None
161 Disposed // permanent state after dispose
162 };
163
164 Container m_disposables{};
165 std::atomic<State> m_current_state{};
166 };
167
174 class composite_disposable : public composite_disposable_impl<rpp::details::disposables::default_disposables_container>
175 {
176 };
177} // namespace rpp
bool is_disposed() const noexcept final
Check if this disposable is just disposed.
Definition composite_disposable.hpp:37
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
Definition interface_composite_disposable.hpp:19