ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
retry.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/operators/details/strategy.hpp>
17
18namespace rpp::operators::details
19{
// Shared state of one retry subscription: the remaining attempt budget, the
// drain re-entrancy flag, the downstream observer and the source observable.
// NOTE(review): this listing skips source lines 21 and 26 (the declaration line
// of the type and one constructor initializer). add(), clear() and is_disposed(),
// which the rest of this file calls on the state, are not declared here, so they
// presumably come from a base class named on the missing line - confirm against
// the original header.
20 template<rpp::constraint::observer TObserver, constraint::decayed_type Observable>
22 {
// Stores the attempt budget and takes the downstream observer by move.
23 retry_state_t(TObserver&& in_observer, const Observable& observable, std::optional<size_t> count)
24 : count{count}
25 , observer(std::move(in_observer))
27
28 {
29 }
30
// Remaining subscription attempts. drain() decrements it before every subscribe;
// an empty optional never compares equal to 0, so it acts as "no limit".
31 std::optional<size_t> count;
// Set by drain() right before it subscribes; reset to false by whichever runs
// first afterwards: on_error of the strategy or the check that follows subscribe().
32 std::atomic<bool> is_inside_drain{};
33
// Downstream observer that finally receives values, completion or the last error.
34 RPP_NO_UNIQUE_ADDRESS TObserver observer;
// Source observable that drain() subscribes to on every attempt.
35 RPP_NO_UNIQUE_ADDRESS Observable observable;
36 };
37
// Forward declaration: the observer strategy below calls drain() from on_error
// before the definition of drain() appears.
38 template<rpp::constraint::observer TObserver, typename TObservable>
39 void drain(const std::shared_ptr<retry_state_t<TObserver, TObservable>>& state);
40
// Observer strategy that drain() hands to the source observable on every attempt.
// Values and completion are forwarded to the downstream observer kept in the
// shared state; an error is either forwarded (budget exhausted) or turned into
// a new subscription attempt.
// NOTE(review): this listing skips source line 42, the declaration line of the
// type; drain() refers to it as retry_observer_strategy<TObserver, TObservable>.
41 template<rpp::constraint::observer TObserver, typename TObservable>
43 {
44 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
45
// State shared between all attempts of the same retry subscription.
46 std::shared_ptr<retry_state_t<TObserver, TObservable>> state;
// Set once this particular attempt received a terminal event. It is mutable
// because all callbacks of the strategy are const member functions.
47 mutable bool locally_disposed{};
48
// Passes every emitted value straight to the downstream observer.
49 template<typename T>
50 void on_next(T&& v) const
51 {
52 state->observer.on_next(std::forward<T>(v));
53 }
54
// Handles a failure of the current attempt.
55 void on_error(const std::exception_ptr& err) const
56 {
57 locally_disposed = true;
// No attempts left: hand the error to the downstream observer and stop.
58 if (state->count == 0)
59 {
60 state->observer.on_error(err);
61 return;
62 }
63
// NOTE(review): presumably releases the disposables registered by the failed
// attempt through set_upstream - confirm against the state's base class.
64 state->clear();
65
// The flag is still set only when this error arrives while drain() is inside
// subscribe(). In that case returning is enough: drain() sees the cleared flag
// and performs the next attempt in its own loop instead of through a nested call.
66 if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst))
67 return;
68
// subscribe() had already returned, so the loop in drain() has ended; restart it.
69 drain(state);
70 }
71
// Completion ends the retry sequence and is forwarded downstream unchanged.
72 void on_completed() const
73 {
74 locally_disposed = true;
75 state->observer.on_completed();
76 }
77
// Registers the upstream disposable of the current attempt in the shared state.
78 void set_upstream(const disposable_wrapper& d) const
79 {
80 state->add(d);
81 }
82
// Disposed when this attempt has terminated or the shared state reports disposal.
83 bool is_disposed() const { return locally_disposed || state->is_disposed(); }
84 };
85
// Subscribes to the source observable repeatedly until an attempt does not fail
// while subscribe() is still running, the state becomes disposed, or subscribe()
// itself throws.
// state: shared retry state holding the observable, the downstream observer,
//        the attempt budget and the is_inside_drain flag.
86 template<rpp::constraint::observer TObserver, typename TObservable>
87 void drain(const std::shared_ptr<retry_state_t<TObserver, TObservable>>& state)
88 {
89 while (!state->is_disposed())
90 {
// Consume one attempt from the budget; an empty optional is left untouched.
91 if (state->count)
92 --state->count.value();
// Mark that a subscribe() call is in progress so that on_error can tell a
// failure raised inside subscribe() from one that arrives after it returned.
93 state->is_inside_drain.store(true, std::memory_order::seq_cst);
94 try
95 {
96 using value_type = rpp::utils::extract_observer_type_t<TObserver>;
97 state->observable.subscribe(observer<value_type, retry_observer_strategy<TObserver, TObservable>>{state});
98
// Flag still set: on_error did not reset it during subscribe(), so leave the
// loop; a later on_error calls drain() again by itself. Flag already cleared:
// on_error ran during subscribe() and asked for another attempt, so loop on.
99 if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst))
100 return;
101 }
102 catch (...)
103 {
// An exception escaping subscribe() is reported downstream as an error and
// ends the retry sequence.
104 state->observer.on_error(std::current_exception());
105 return;
106 }
107 }
108 }
109
// Operator object returned by rpp::operators::retry(): keeps the requested
// number of retries and starts the retry loop on subscription.
// NOTE(review): this listing skips source lines 115 and 121 (a declaration line
// in front of the body that defines result_type, and the line following the
// disposables_strategy template header) - take them from the original header.
110 struct retry_t
111 {
// Requested number of retries; an empty optional is what retry() without
// arguments produces.
112 const std::optional<size_t> count{};
113
114 template<rpp::constraint::decayed_type T>
116 {
// The operator does not change the value type of the sequence.
117 using result_type = T;
118 };
119
120 template<rpp::details::observables::constraint::disposables_strategy Prev>
122
// Creates the shared state for this subscription and starts the first attempt.
// NOTE(review): the parameter name `observble` is a misspelling of `observable`.
123 template<rpp::constraint::observer TObserver, typename TObservable>
124 void subscribe(TObserver&& observer, TObservable&& observble) const
125 {
// count + 1 because drain() decrements the budget before every subscription,
// including the very first one, which is not a retry.
126 const auto d = disposable_wrapper_impl<retry_state_t<std::decay_t<TObserver>, std::decay_t<TObservable>>>::make(std::forward<TObserver>(observer), std::forward<TObservable>(observble), count ? count.value() + 1 : count);
127 auto ptr = d.lock();
128
// The downstream observer receives a weak handle to the state as its upstream.
129 ptr->observer.set_upstream(d.as_weak());
130 drain(ptr);
131 }
132 };
133} // namespace rpp::operators::details
134
135namespace rpp::operators
136{
/// @brief Creates the retry operator with a limited number of retries.
/// @param count number of retries stored in the returned operator object; on
///        subscription one extra attempt is added for the initial subscribe,
///        and an error is forwarded downstream once the attempts run out.
/// @return details::retry_t holding the given count.
156 inline auto retry(size_t count)
157 {
158 return details::retry_t{count};
159 }
160
/// @brief Creates the infinite retry operator, which keeps resubscribing to the
///        observable upon error.
/// @return details::retry_t with an empty count, which means "no limit".
178 inline auto retry()
179 {
180 return details::retry_t{};
181 }
182} // namespace rpp::operators
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
auto retry()
The infinite retry operator continuously attempts to resubscribe to the observable upon error,...
Definition retry.hpp:170
Definition disposables_strategy.hpp:29
Definition retry.hpp:111