ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
subscribe.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/observables/fwd.hpp>
14#include <rpp/observers/fwd.hpp>
15#include <rpp/operators/fwd.hpp>
16
17#include <rpp/disposables/composite_disposable.hpp>
18#include <rpp/disposables/disposable_wrapper.hpp>
19#include <rpp/observers/observer.hpp>
20#include <rpp/utils/functors.hpp>
21
22#include <utility>
23
24namespace rpp::operators::details
25{
26 template<typename... Args>
28
// Specialization of subscribe_t that carries a ready-made observer<Type, ObserverStrategy>.
// NOTE(review): the declaration of m_observer (listing line 45) is not visible in this
// listing; it is initialized in the constructor and consumed in operator() below.
29 template<rpp::constraint::decayed_type Type, rpp::constraint::observer_strategy<Type> ObserverStrategy>
30 class subscribe_t<observer<Type, ObserverStrategy>>
31 {
32 public:
// Takes the observer by rvalue reference and moves it into the stored member.
33 explicit subscribe_t(observer<Type, ObserverStrategy>&& observer)
34 : m_observer{std::move(observer)}
35 {
36 }
37
// Subscribes the stored observer to `observable`.
// Only an rvalue-qualified overload is visible here: the stored observer is moved out,
// so the call consumes this object's observer.
38 template<rpp::constraint::observable_strategy<Type> Strategy>
39 void operator()(const rpp::observable<Type, Strategy>& observable) &&
40 {
41 observable.subscribe(std::move(m_observer));
42 }
43
44 private:
46 };
47
// Specialization of subscribe_t that stores a bare observer strategy by value and
// hands it to observable.subscribe(...) when invoked with an observable.
48 template<rpp::constraint::observer_strategy_base ObserverStrategy>
49 class subscribe_t<ObserverStrategy>
50 {
51 public:
// Move-constructs the stored strategy from an rvalue.
52 explicit subscribe_t(ObserverStrategy&& observer_strategy)
53 : m_observer_strategy{std::move(observer_strategy)}
54 {
55 }
56
// Copy-constructs the stored strategy from a const lvalue.
57 explicit subscribe_t(const ObserverStrategy& observer_strategy)
58 : m_observer_strategy{observer_strategy}
59 {
60 }
61
// Lvalue call: passes the stored strategy as a const lvalue (no move), so this
// object keeps its strategy after the call.
// The static_assert rejects, at compile time, a strategy that does not satisfy
// observer_strategy for the observable's value type.
62 template<rpp::constraint::observable Observable>
63 void operator()(const Observable& observable) const &
64 {
65 static_assert(rpp::constraint::observer_strategy<ObserverStrategy, rpp::utils::extract_observable_type_t<Observable>>, "observer and observable should be of same type");
66 observable.subscribe(m_observer_strategy);
67 }
68
// Rvalue call: same compile-time check, but the stored strategy is moved into
// observable.subscribe(...).
69 template<rpp::constraint::observable Observable>
70 void operator()(const Observable& observable) &&
71 {
72 static_assert(rpp::constraint::observer_strategy<ObserverStrategy, rpp::utils::extract_observable_type_t<Observable>>, "observer and observable should be of same type");
73 observable.subscribe(std::move(m_observer_strategy));
74 }
75
76 private:
// Strategy owned by this object.
77 ObserverStrategy m_observer_strategy;
78 };
79
// Specialization of subscribe_t for a composite_disposable_wrapper paired with a
// ready-made observer<Type, ObserverStrategy>.
// NOTE(review): the constructor signature (listing line 84), the call-operator signature
// (line 91) and the member declarations (lines 98-99) are not visible in this listing;
// the comments below describe only what the visible bodies do.
80 template<rpp::constraint::decayed_type Type, rpp::constraint::observer_strategy<Type> ObserverStrategy>
81 class subscribe_t<rpp::composite_disposable_wrapper, observer<Type, ObserverStrategy>>
82 {
83 public:
// Constructor initializer list: moves `d` into m_disposable and `observer` into m_observer.
85 : m_disposable{std::move(d)}
86 , m_observer{std::move(observer)}
87 {
88 }
89
// Call operator body: wraps the stored observer (moved) together with m_disposable in
// observer_with_external_disposable, subscribes that wrapper to `observable`,
// then returns m_disposable.
90 template<rpp::constraint::observable_strategy<Type> Strategy>
92 {
93 observable.subscribe(observer_with_external_disposable<Type, observer<Type, ObserverStrategy>>{m_disposable, std::move(m_observer)});
94 return m_disposable;
95 }
96
97 private:
100 };
101
// Specialization of subscribe_t for a composite_disposable_wrapper paired with a bare
// observer strategy. Invoking it subscribes with both and returns the disposable.
// NOTE(review): the declaration of m_disposable (listing line 135) is not visible here.
102 template<rpp::constraint::observer_strategy_base ObserverStrategy>
103 class subscribe_t<rpp::composite_disposable_wrapper, ObserverStrategy>
104 {
105 public:
// Moves both the disposable and the strategy into the stored members.
106 explicit subscribe_t(rpp::composite_disposable_wrapper&& d, ObserverStrategy&& observer_strategy)
107 : m_disposable{std::move(d)}
108 , m_observer_strategy{std::move(observer_strategy)}
109 {
110 }
111
// Moves the disposable, copies the strategy from a const lvalue.
112 explicit subscribe_t(rpp::composite_disposable_wrapper&& d, const ObserverStrategy& observer_strategy)
113 : m_disposable{std::move(d)}
114 , m_observer_strategy{observer_strategy}
115 {
116 }
117
// Lvalue call: passes the stored disposable and strategy without moving them and
// returns a copy of m_disposable. The static_assert rejects a strategy that does not
// satisfy observer_strategy for the observable's value type.
118 template<rpp::constraint::observable Observable>
119 rpp::composite_disposable_wrapper operator()(const Observable& observable) const &
120 {
121 static_assert(rpp::constraint::observer_strategy<ObserverStrategy, rpp::utils::extract_observable_type_t<Observable>>, "observer and observable should be of same type");
122 observable.subscribe(m_disposable, m_observer_strategy);
123 return m_disposable;
124 }
125
// Rvalue call: same check; the strategy is moved into subscribe(...), while
// m_disposable is passed and returned without std::move.
126 template<rpp::constraint::observable Observable>
127 rpp::composite_disposable_wrapper operator()(const Observable& observable) &&
128 {
129 static_assert(rpp::constraint::observer_strategy<ObserverStrategy, rpp::utils::extract_observable_type_t<Observable>>, "observer and observable should be of same type");
130 observable.subscribe(m_disposable, std::move(m_observer_strategy));
131 return m_disposable;
132 }
133
134 private:
// Strategy owned by this object.
136 ObserverStrategy m_observer_strategy;
137 };
138
// Specialization of subscribe_t that stores three callbacks (on_next, on_error,
// on_completed) and passes them to observable.subscribe(...) when invoked.
// OnError must be invocable with const std::exception_ptr&, OnCompleted with no arguments.
139 template<typename OnNext, std::invocable<const std::exception_ptr&> OnError, std::invocable<> OnCompleted>
140 class subscribe_t<OnNext, OnError, OnCompleted>
141 {
142 public:
// Perfect-forwards each callback into its member; each T* parameter is constrained
// to decay to the corresponding class template argument.
// NOTE(review): listing line 144 (between this template header and the constructor
// signature) is not visible in this listing.
143 template<rpp::constraint::decayed_same_as<OnNext> TOnNext, rpp::constraint::decayed_same_as<OnError> TOnError, rpp::constraint::decayed_same_as<OnCompleted> TOnCompleted>
145 explicit subscribe_t(TOnNext&& on_next, TOnError&& on_error, TOnCompleted&& on_completed)
146 : m_on_next{std::forward<TOnNext>(on_next)}
147 , m_on_error{std::forward<TOnError>(on_error)}
148 , m_on_completed{std::forward<TOnCompleted>(on_completed)}
149 {
150 }
151
// Rvalue call: moves all three callbacks into observable.subscribe(...).
// The static_assert requires OnNext to be invocable with the observable's value type.
152 template<rpp::constraint::decayed_type Type, rpp::constraint::observable_strategy<Type> Strategy>
153 void operator()(const rpp::observable<Type, Strategy>& observable) &&
154 {
155 static_assert(std::invocable<OnNext, Type>, "OnNext should be suitable for type of observable");
156 observable.subscribe(std::move(m_on_next), std::move(m_on_error), std::move(m_on_completed));
157 }
158
// Lvalue call: same check; the callbacks are passed as const lvalues, so this object
// keeps them after the call.
159 template<rpp::constraint::decayed_type Type, rpp::constraint::observable_strategy<Type> Strategy>
160 void operator()(const rpp::observable<Type, Strategy>& observable) const &
161 {
162 static_assert(std::invocable<OnNext, Type>, "OnNext should be suitable for type of observable");
163 observable.subscribe(m_on_next, m_on_error, m_on_completed);
164 }
165
166 private:
// RPP_NO_UNIQUE_ADDRESS is a macro defined outside this file; presumably it expands to
// a no_unique_address attribute so stateless callbacks take no storage — TODO confirm.
167 RPP_NO_UNIQUE_ADDRESS OnNext m_on_next;
168 RPP_NO_UNIQUE_ADDRESS OnError m_on_error;
169 RPP_NO_UNIQUE_ADDRESS OnCompleted m_on_completed;
170 };
171
// Specialization of subscribe_t that stores a composite_disposable_wrapper together with
// three callbacks (on_next, on_error, on_completed).
// NOTE(review): the two call-operator signature lines (listing lines 186 and 194) and the
// m_disposable declaration (line 202) are not visible in this listing.
172 template<typename OnNext, std::invocable<const std::exception_ptr&> OnError, std::invocable<> OnCompleted>
173 class subscribe_t<rpp::composite_disposable_wrapper, OnNext, OnError, OnCompleted>
174 {
175 public:
// Takes the disposable by value and moves it into the member; perfect-forwards each
// callback, each constrained to decay to the corresponding class template argument.
176 template<rpp::constraint::decayed_same_as<OnNext> TOnNext, rpp::constraint::decayed_same_as<OnError> TOnError, rpp::constraint::decayed_same_as<OnCompleted> TOnCompleted>
177 explicit subscribe_t(rpp::composite_disposable_wrapper d, TOnNext&& on_next, TOnError&& on_error, TOnCompleted&& on_completed)
178 : m_disposable{std::move(d)}
179 , m_on_next{std::forward<TOnNext>(on_next)}
180 , m_on_error{std::forward<TOnError>(on_error)}
181 , m_on_completed{std::forward<TOnCompleted>(on_completed)}
182 {
183 }
184
// First call-operator body: moves the callbacks into subscribe(...) and returns the
// disposable by move. Presumably this is the rvalue-qualified overload — TODO confirm
// against the missing signature line.
// The static_assert requires OnNext to be invocable with the observable's value type.
185 template<rpp::constraint::decayed_type Type, rpp::constraint::observable_strategy<Type> Strategy>
187 {
188 static_assert(std::invocable<OnNext, Type>, "OnNext should be suitable for type of observable");
189 observable.subscribe(m_disposable, std::move(m_on_next), std::move(m_on_error), std::move(m_on_completed));
190 return std::move(m_disposable);
191 }
192
// Second call-operator body: passes the disposable and callbacks without moving them
// and returns a copy of m_disposable. Presumably the const-lvalue-qualified overload —
// TODO confirm against the missing signature line.
193 template<rpp::constraint::decayed_type Type, rpp::constraint::observable_strategy<Type> Strategy>
195 {
196 static_assert(std::invocable<OnNext, Type>, "OnNext should be suitable for type of observable");
197 observable.subscribe(m_disposable, m_on_next, m_on_error, m_on_completed);
198 return m_disposable;
199 }
200
201 private:
// RPP_NO_UNIQUE_ADDRESS is a macro defined outside this file; presumably it expands to
// a no_unique_address attribute — TODO confirm.
203 RPP_NO_UNIQUE_ADDRESS OnNext m_on_next;
204 RPP_NO_UNIQUE_ADDRESS OnError m_on_error;
205 RPP_NO_UNIQUE_ADDRESS OnCompleted m_on_completed;
206 };
207
// Deduction guide: `subscribe_t{a, b, ...}` deduces subscribe_t<Args...>. Because the
// arguments are matched against `const Args&`, each deduced Args is a non-reference type
// without top-level const, which is how the partial specializations in this namespace are keyed.
208 template<typename... Args>
209 subscribe_t(const Args&...) -> subscribe_t<Args...>;
210
211 template<typename OnNext>
213} // namespace rpp::operators::details
214
215namespace rpp::operators
216{
225 template<rpp::constraint::decayed_type Type, rpp::constraint::observer_strategy<Type> ObserverStrategy>
230
240 template<rpp::constraint::decayed_type Type, rpp::constraint::observer_strategy<Type> ObserverStrategy>
242 {
243 return details::subscribe_t{std::move(disposable), std::move(observer)};
244 }
245
251 template<rpp::constraint::decayed_type Type>
253 {
254 return details::subscribe_t{std::move(observer)};
255 }
256
// Builds a details::subscribe_t from an observer strategy, perfect-forwarding the argument;
// the concrete subscribe_t specialization is chosen by class template argument deduction.
// NOTE(review): listing line 263 (between the template header and the signature) is not
// visible in this listing.
262 template<rpp::constraint::observer_strategy_base ObserverStrategy>
264 auto subscribe(ObserverStrategy&& observer_strategy)
265 {
266 return details::subscribe_t{std::forward<ObserverStrategy>(observer_strategy)};
267 }
268
278 template<rpp::constraint::decayed_type Type>
280 {
281 return details::subscribe_t{std::move(disposable), std::move(observer)};
282 }
283
// Builds a details::subscribe_t from a disposable (taken by value, then moved) and a
// perfect-forwarded observer strategy.
// NOTE(review): listing line 294 (between the template header and the signature) is not
// visible in this listing.
293 template<rpp::constraint::observer_strategy_base ObserverStrategy>
295 auto subscribe(rpp::composite_disposable_wrapper disposable, ObserverStrategy&& observer_strategy)
296 {
297 return details::subscribe_t{std::move(disposable), std::forward<ObserverStrategy>(observer_strategy)};
298 }
299
// Builds a details::subscribe_t from up to three callbacks, each perfect-forwarded.
// Every parameter is optional: an omitted callback is a value-initialized object of the
// default type named in the template header (rpp::utils::empty_function_any_t,
// rpp::utils::rethrow_error_t, rpp::utils::empty_function_t<>).
// details::on_next_like is defined outside the visible part of this listing.
305 template<details::on_next_like OnNext = rpp::utils::empty_function_any_t, std::invocable<const std::exception_ptr&> OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
306 auto subscribe(OnNext&& on_next = {}, OnError&& on_error = {}, OnCompleted&& on_completed = {})
307 {
308 return details::subscribe_t{std::forward<OnNext>(on_next), std::forward<OnError>(on_error), std::forward<OnCompleted>(on_completed)};
309 }
310
// Two-callback overload (on_next + on_completed): the error handler slot is filled with a
// default-constructed rpp::utils::rethrow_error_t.
316 template<details::on_next_like OnNext, std::invocable<> OnCompleted>
317 auto subscribe(OnNext&& on_next, OnCompleted&& on_completed)
318 {
319 return details::subscribe_t{std::forward<OnNext>(on_next), rpp::utils::rethrow_error_t{}, std::forward<OnCompleted>(on_completed)};
320 }
321
// Builds a details::subscribe_t from a caller-supplied disposable (taken by value, then
// moved) plus up to three perfect-forwarded callbacks. An omitted callback is a
// value-initialized object of the default type named in the template header.
331 template<details::on_next_like OnNext = rpp::utils::empty_function_any_t, std::invocable<const std::exception_ptr&> OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
332 auto subscribe(rpp::composite_disposable_wrapper d, OnNext&& on_next = {}, OnError&& on_error = {}, OnCompleted&& on_completed = {})
333 {
334 return details::subscribe_t{std::move(d), std::forward<OnNext>(on_next), std::forward<OnError>(on_error), std::forward<OnCompleted>(on_completed)};
335 }
336
// Disposable + two-callback overload (on_next + on_completed): the error handler slot is
// filled with a default-constructed rpp::utils::rethrow_error_t.
346 template<details::on_next_like OnNext, std::invocable<> OnCompleted>
347 auto subscribe(rpp::composite_disposable_wrapper d, OnNext&& on_next, OnCompleted&& on_completed)
348 {
349 return details::subscribe_t{std::move(d), std::forward<OnNext>(on_next), rpp::utils::rethrow_error_t{}, std::forward<OnCompleted>(on_completed)};
350 }
351
362 template<rpp::constraint::decayed_type Type, rpp::constraint::observer_strategy<Type> ObserverStrategy>
367
375 template<rpp::constraint::decayed_type Type>
380
// Convenience wrapper: creates a new disposable via composite_disposable_wrapper::make()
// and delegates to the disposable-taking subscribe(...) with the forwarded callbacks.
// An omitted callback is a value-initialized object of the default type named in the
// template header.
388 template<details::on_next_like OnNext = rpp::utils::empty_function_any_t, std::invocable<const std::exception_ptr&> OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
389 auto subscribe_with_disposable(OnNext&& on_next = {}, OnError&& on_error = {}, OnCompleted&& on_completed = {})
390 {
391 return subscribe(composite_disposable_wrapper::make(), std::forward<OnNext>(on_next), std::forward<OnError>(on_error), std::forward<OnCompleted>(on_completed));
392 }
393
// Two-callback variant: creates a new disposable via composite_disposable_wrapper::make()
// and delegates to subscribe(...), passing a default-constructed
// rpp::utils::rethrow_error_t as the error handler.
401 template<details::on_next_like OnNext, std::invocable<> OnCompleted>
402 auto subscribe_with_disposable(OnNext&& on_next, OnCompleted&& on_completed)
403 {
404 return subscribe(composite_disposable_wrapper::make(), std::forward<OnNext>(on_next), rpp::utils::rethrow_error_t{}, std::forward<OnCompleted>(on_completed));
405 }
406} // namespace rpp::operators
static disposable_wrapper_impl make(TArgs &&... args)
Definition disposable_wrapper.hpp:164
Type-erased version of the rpp::observer. Any observer can be converted to dynamic_observer via rpp::...
Definition dynamic_observer.hpp:129
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
void subscribe(observer< Type, ObserverStrategy > &&observer) const
Subscribes passed observer to emissions from this observable.
Definition observable.hpp:58
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition subscribe.hpp:27
Definition constraints.hpp:19
Concept defines requirements for a user-defined observer strategy.
Definition fwd.hpp:56
Definition fwd.hpp:250
Definition subscribe.hpp:212
Definition function_traits.hpp:45
disposable_wrapper_impl< interface_composite_disposable > composite_disposable_wrapper
Wrapper to keep "composite" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:41
auto subscribe(observer< Type, ObserverStrategy > &&observer)
Subscribes passed observer to emissions from this observable.
Definition subscribe.hpp:226
auto subscribe_with_disposable(observer< Type, ObserverStrategy > &&observer)
Subscribes passed observer to emissions from this observable.
Definition subscribe.hpp:363
Definition functors.hpp:54