ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
retry.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/operators/details/strategy.hpp>
17
18namespace rpp::operators::details
19{
20 template<rpp::constraint::observer TObserver, constraint::decayed_type Observable>
21 struct retry_state_t final : public rpp::composite_disposable
22 {
23 retry_state_t(TObserver&& in_observer, const Observable& observable, std::optional<size_t> count)
24 : count{count}
25 , observer(std::move(in_observer))
26 , observable(observable)
27
28 {
29 }
30
31 std::optional<size_t> count;
32 std::atomic<bool> is_inside_drain{};
33
34 RPP_NO_UNIQUE_ADDRESS TObserver observer;
35 RPP_NO_UNIQUE_ADDRESS Observable observable;
36 };
37
38 template<rpp::constraint::observer TObserver, typename TObservable>
39 void drain(const std::shared_ptr<retry_state_t<TObserver, TObservable>>& state);
40
41 template<rpp::constraint::observer TObserver, typename TObservable>
43 {
44 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Boolean;
45
46 std::shared_ptr<retry_state_t<TObserver, TObservable>> state;
47
48 template<typename T>
49 void on_next(T&& v) const
50 {
51 state->observer.on_next(std::forward<T>(v));
52 }
53
54 void on_error(const std::exception_ptr& err) const
55 {
56 if (state->count == 0)
57 {
58 state->observer.on_error(err);
59 return;
60 }
61
62 state->clear();
63
64 if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst))
65 return;
66
67 drain(state);
68 }
69
70 void on_completed() const
71 {
72 state->observer.on_completed();
73 }
74
75 void set_upstream(const disposable_wrapper& d) const
76 {
77 state->add(d);
78 }
79
80 bool is_disposed() const { return state->is_disposed(); }
81 };
82
83 template<rpp::constraint::observer TObserver, typename TObservable>
84 void drain(const std::shared_ptr<retry_state_t<TObserver, TObservable>>& state)
85 {
86 while (!state->is_disposed())
87 {
88 if (state->count)
89 --state->count.value();
90 state->is_inside_drain.store(true, std::memory_order::seq_cst);
91 try
92 {
93 using value_type = rpp::utils::extract_observer_type_t<TObserver>;
94 state->observable.subscribe(observer<value_type, retry_observer_strategy<TObserver, TObservable>>{state});
95
96 if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst))
97 return;
98 }
99 catch (...)
100 {
101 state->observer.on_error(std::current_exception());
102 return;
103 }
104 }
105 }
106
107 struct retry_t
108 {
109 const std::optional<size_t> count{};
110
111 template<rpp::constraint::decayed_type T>
113 {
114 using result_type = T;
115 };
116
117 template<rpp::details::observables::constraint::disposables_strategy Prev>
118 using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>;
119
120 template<rpp::constraint::observer TObserver, typename TObservable>
121 void subscribe(TObserver&& observer, TObservable&& observble) const
122 {
123 const auto d = disposable_wrapper_impl<retry_state_t<std::decay_t<TObserver>, std::decay_t<TObservable>>>::make(std::forward<TObserver>(observer), std::forward<TObservable>(observble), count ? count.value() + 1 : count);
124 auto ptr = d.lock();
125
126 ptr->observer.set_upstream(d.as_weak());
127 drain(ptr);
128 }
129 };
130} // namespace rpp::operators::details
131
132namespace rpp::operators
133{
150 * @ingroup error_handling_operators
151 * @see https://reactivex.io/documentation/operators/retry.html
152 */
153 inline auto retry(size_t count)
154 {
155 return details::retry_t{count};
156 }
157
167 * @note `#include <rpp/operators/retry.hpp>`
168 *
169 * @par Examples:
170 * @snippet retry.cpp retry_infinitely
171 *
172 * @ingroup error_handling_operators
173 * @see https://reactivex.io/documentation/operators/retry.html
174 */
175 inline auto retry()
176 {
177 return details::retry_t{};
178 }
179} // namespace rpp::operators
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
Main RPP wrapper over disposables.
Definition disposable_wrapper.hpp:142
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
auto retry()
The infinite retry operator continuously attempts to resubscribe to the observable upon error,...
Definition retry.hpp:167
Definition disposables_strategy.hpp:29
Definition retry.hpp:108