ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
retry_when.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/observables/fwd.hpp>
14#include <rpp/operators/fwd.hpp>
15
16#include <rpp/operators/details/repeating_strategy.hpp>
17
18namespace rpp::operators::details
19{
// Observer strategy used by the retry_when operator. It derives from
// repeating_observer_strategy<TObserver, TObservable, TNotifier> and works on the
// `state` member inherited from that base.
// NOTE(review): the line numbers embedded in this listing skip 25 and 39, so at least
// two lines of the real header are absent from this extract - confirm against the
// original retry_when.hpp before relying on what is shown here.
20 template<rpp::constraint::observer TObserver,
21 typename TObservable,
22 typename TNotifier>
23 struct retry_when_impl_strategy final : public repeating_observer_strategy<TObserver, TObservable, TNotifier>
24 {
26
// Make the dependent base-class member `state` visible by its unqualified name.
27 using repeating_observer_strategy<TObserver, TObservable, TNotifier>::state;
28
// Passes every emitted value straight through to the downstream observer,
// preserving the value category of `v`.
29 template<typename T>
30 void on_next(T&& v) const
31 {
32 state->observer.on_next(std::forward<T>(v));
33 }
34
// Error handler. Anything thrown inside the try block is captured with
// std::current_exception() and delivered to the downstream observer's on_error.
35 void on_error(const std::exception_ptr& err) const
36 {
37 try
38 {
// NOTE(review): listing line 39 is missing here, so the try body looks empty and `err`
// looks unused. Presumably the missing statement hands `err` to the notifier stored in
// `state` to decide about re-subscription - TODO confirm against the original header.
40 }
41 catch (...)
42 {
43 state->observer.on_error(std::current_exception());
44 }
45 }
46
// Completion is forwarded to the downstream observer unchanged.
47 void on_completed() const
48 {
49 state->observer.on_completed();
50 }
51 };
52
// Operator object that stores the user-supplied notifier and performs the subscription.
// NOTE(review): listing line 54 (the declaration that this template header introduces)
// is missing from this extract. The factory further down constructs
// details::retry_when_t<...>{notifier}, so this is presumably `struct retry_when_t` -
// confirm against the original header.
53 template<rpp::constraint::decayed_type TNotifier>
55 {
// The stored notifier; subscribe() passes it on when building the shared state.
56 RPP_NO_UNIQUE_ADDRESS TNotifier notifier;
57
// Nested traits template: its result_type is the same as the incoming type T.
// NOTE(review): listing line 59 (the nested declaration's name) is missing from this extract.
58 template<rpp::constraint::decayed_type T>
60 {
61 using result_type = T;
62 };
63
// Whatever the previous disposables strategy `Prev` is, the updated strategy is always
// fixed_disposables_strategy<1>.
64 template<rpp::details::observables::constraint::disposables_strategy Prev>
65 using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>;
66
// Subscribes `observer` to `observable` through the retry_when strategy:
//  1. build a repeating_state (decayed observer/observable/notifier types) via
//     disposable_wrapper_impl<...>::make, forwarding observer and observable and
//     passing the stored `notifier`;
//  2. register d.as_weak() as the upstream of the observer held inside that state;
//  3. call drain<retry_when_impl_strategy<...>> with the state obtained from d.lock().
67 template<rpp::constraint::observer TObserver, typename TObservable>
68 void subscribe(TObserver&& observer, TObservable&& observable) const
69 {
70 const auto d = disposable_wrapper_impl<repeating_state<std::decay_t<TObserver>, std::decay_t<TObservable>, std::decay_t<TNotifier>>>::make(std::forward<TObserver>(observer), std::forward<TObservable>(observable), notifier);
71 auto ptr = d.lock();
72
73 ptr->observer.set_upstream(d.as_weak());
74
75 drain<retry_when_impl_strategy<std::decay_t<TObserver>, std::decay_t<TObservable>, std::decay_t<TNotifier>>>(ptr);
76 }
77 };
78} // namespace rpp::operators::details
79
80namespace rpp::operators
81{
// Factory for the retry_when operator: decays the notifier type and forwards `notifier`
// into a brace-initialized details::retry_when_t, which is returned by value.
// NOTE(review): the embedded listing numbers jump from 81 to 103 and skip 104, so the
// lines that preceded this function and the line between the template header and the
// signature (possibly a constraint on TNotifier) are missing from this extract - confirm
// against the original header.
103 template<typename TNotifier>
105 auto retry_when(TNotifier&& notifier)
106 {
107 return details::retry_when_t<std::decay_t<TNotifier>>{std::forward<TNotifier>(notifier)};
108 }
109} // namespace rpp::operators
Main RPP wrapper over disposables.
Definition disposable_wrapper.hpp:142
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition fwd.hpp:80
Definition disposables_strategy.hpp:29
Definition repeating_strategy.hpp:91
Definition retry_when.hpp:55