ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
retry_when.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/observables/fwd.hpp>
14#include <rpp/operators/fwd.hpp>
15
16#include <rpp/defs.hpp>
17#include <rpp/operators/details/strategy.hpp>
18#include <rpp/utils/constraints.hpp>
19#include <rpp/utils/utils.hpp>
20
21namespace rpp::operators::details
22{
23 template<rpp::constraint::observer TObserver,
24 typename TObservable,
25 typename TNotifier>
26 struct retry_when_state final : public rpp::composite_disposable
27 {
28 retry_when_state(TObserver&& observer, const TObservable& observable, const TNotifier& notifier)
29 : observer(std::move(observer))
30 , observable(observable)
31 , notifier(notifier)
32 {
33 }
34
35 std::atomic_bool is_inside_drain{};
36
37 RPP_NO_UNIQUE_ADDRESS TObserver observer;
38 RPP_NO_UNIQUE_ADDRESS TObservable observable;
39 RPP_NO_UNIQUE_ADDRESS TNotifier notifier;
40 };
41
42 template<rpp::constraint::observer TObserver, typename TObservable, typename TNotifier>
43 void drain(const std::shared_ptr<retry_when_state<TObserver, TObservable, TNotifier>>& state);
44
45 template<rpp::constraint::observer TObserver,
46 typename TObservable,
47 typename TNotifier>
49 {
50 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
51
52 std::shared_ptr<retry_when_state<TObserver, TObservable, TNotifier>> state;
53 mutable bool locally_disposed{};
54
55 template<typename T>
56 void on_next(T&&) const
57 {
58 locally_disposed = true;
59 state->clear();
60
61 if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst))
62 return;
63
64 drain<TObserver, TObservable, TNotifier>(state);
65 }
66
67 void on_error(const std::exception_ptr& err) const
68 {
69 locally_disposed = true;
70 state->observer.on_error(err);
71 }
72
73 void on_completed() const
74 {
75 locally_disposed = true;
76 state->observer.on_completed();
77 }
78
79 void set_upstream(const disposable_wrapper& d) const { state->add(d); }
80
81 bool is_disposed() const { return locally_disposed || state->is_disposed(); }
82 };
83
84 template<rpp::constraint::observer TObserver,
85 typename TObservable,
86 typename TNotifier>
88 {
89 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
90
91 std::shared_ptr<retry_when_state<TObserver, TObservable, TNotifier>> state;
92
93 template<typename T>
94 void on_next(T&& v) const
95 {
96 state->observer.on_next(std::forward<T>(v));
97 }
98
99 void on_error(const std::exception_ptr& err) const
100 {
101 try
102 {
103 state->notifier(err).subscribe(retry_when_impl_inner_strategy<TObserver, TObservable, TNotifier>{state});
104 }
105 catch (...)
106 {
107 state->observer.on_error(std::current_exception());
108 }
109 }
110
111 void on_completed() const
112 {
113 state->observer.on_completed();
114 }
115
116 void set_upstream(const disposable_wrapper& d) { state->add(d); }
117
118 bool is_disposed() const { return state->is_disposed(); }
119 };
120
121 template<rpp::constraint::observer TObserver, typename TObservable, typename TNotifier>
122 void drain(const std::shared_ptr<retry_when_state<TObserver, TObservable, TNotifier>>& state)
123 {
124 while (!state->is_disposed())
125 {
126 state->is_inside_drain.store(true, std::memory_order::seq_cst);
127 try
128 {
129 using value_type = rpp::utils::extract_observer_type_t<TObserver>;
130 state->observable.subscribe(rpp::observer<value_type, retry_when_impl_strategy<TObserver, TObservable, TNotifier>>{state});
131
132 if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst))
133 return;
134 }
135 catch (...)
136 {
137 state->observer.on_error(std::current_exception());
138 return;
139 }
140 }
141 }
142
143 template<rpp::constraint::decayed_type TNotifier>
145 {
146 RPP_NO_UNIQUE_ADDRESS TNotifier notifier;
147
148 template<rpp::constraint::decayed_type T>
150 {
151 using result_type = T;
152 };
153
154 template<rpp::details::observables::constraint::disposables_strategy Prev>
155 using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>;
156
157 template<rpp::constraint::observer TObserver, typename TObservable>
158 void subscribe(TObserver&& observer, TObservable&& observable) const
159 {
160 const auto d = disposable_wrapper_impl<retry_when_state<std::decay_t<TObserver>, std::decay_t<TObservable>, std::decay_t<TNotifier>>>::make(std::forward<TObserver>(observer), std::forward<TObservable>(observable), notifier);
161 auto ptr = d.lock();
162
163 ptr->observer.set_upstream(d.as_weak());
164 drain(ptr);
165 }
166 };
167} // namespace rpp::operators::details
168
169namespace rpp::operators
170{
192 template<typename TNotifier>
194 auto retry_when(TNotifier&& notifier)
195 {
196 return details::retry_when_t<std::decay_t<TNotifier>>{std::forward<TNotifier>(notifier)};
197 }
198} // namespace rpp::operators
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
Main RPP wrapper over disposables.
Definition disposable_wrapper.hpp:142
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition fwd.hpp:80
Definition fwd.hpp:250
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
Definition disposables_strategy.hpp:29
Definition retry_when.hpp:27
Definition retry_when.hpp:145