ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
on_error_resume_next.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/operators/details/strategy.hpp>
17
18namespace rpp::operators::details
19{
20 template<rpp::constraint::observer TObserver>
22 {
25 , observer(std::move(observer))
26 {
27 }
28
29 RPP_NO_UNIQUE_ADDRESS TObserver observer;
30 };
31
32 template<rpp::constraint::observer TObserver>
34 {
35 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
36
37 std::shared_ptr<TObserver> observer;
38
// Forwards an emitted value to the shared observer, preserving the
// value category of the argument through std::forward.
39 template<typename T>
40 void on_next(T&& v) const
41 {
42 observer->on_next(std::forward<T>(v));
43 }
44
// Passes the received error on to the shared observer unchanged.
45 void on_error(const std::exception_ptr& err) const
46 {
47 observer->on_error(err);
48 }
49
// Completion callback of the inner strategy.
// NOTE(review): this listing jumps from line 51 to line 53, so the body
// statement is not visible here. Presumably it forwards to
// observer->on_completed() like the sibling callbacks — confirm against
// the real header before relying on this listing.
50 void on_completed() const
51 {
53 }
54
// Hands the upstream disposable over to the shared observer.
55 void set_upstream(const disposable_wrapper& d) { observer->set_upstream(d); }
56
// Reports whatever the shared observer reports as its disposed state.
57 bool is_disposed() const { return observer->is_disposed(); }
58 };
59
60
61 template<rpp::constraint::observer TObserver, rpp::constraint::decayed_type Selector>
63 {
64 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
65
// Constructs the strategy: the observer is moved into init_state() to build
// the shared state, and the selector is copied into this strategy.
66 on_error_resume_next_observer_strategy(TObserver&& observer, const Selector& selector)
67 : state{init_state(std::move(observer))}
68 , selector{selector}
69 {
70 }
71
72 std::shared_ptr<on_error_resume_next_disposable<TObserver>> state;
73 RPP_NO_UNIQUE_ADDRESS Selector selector;
74
// Forwards an emitted value to the observer held inside the shared state,
// preserving the value category of the argument through std::forward.
75 template<typename T>
76 void on_next(T&& v) const
77 {
78 state->observer.on_next(std::forward<T>(v));
79 }
80
// Error callback: instead of forwarding the error, calls the selector with
// it and subscribes to the object the selector returns.
81 void on_error(const std::exception_ptr& err) const
82 {
83 try
84 {
// The inner strategy receives a shared_ptr built with the aliasing
// constructor: it shares ownership with `state` but points at
// `state->observer`.
85 selector(err).subscribe(on_error_resume_next_inner_observer_strategy<TObserver>{std::shared_ptr<TObserver>(state, &state->observer)});
86 }
87 catch (...)
88 {
// Anything thrown by the selector call or by subscribe() is delivered
// to the observer as an error.
89 state->observer.on_error(std::current_exception());
90 }
// Executed on both paths, after the subscribe attempt.
91 state->dispose();
92 }
93
// Forwards completion to the observer held inside the shared state.
94 void on_completed() const
95 {
96 state->observer.on_completed();
97 }
98
// Passes the upstream disposable to the shared state via add(), rather
// than handing it to the observer directly.
99 void set_upstream(const disposable_wrapper& d) const
100 {
101 state->add(d);
102 }
103
// Reports the disposed state of the shared state object.
104 bool is_disposed() const { return state->is_disposed(); }
105
// Builds the shared state used by this strategy and returns it.
// NOTE(review): this listing jumps from line 107 to line 109, so the
// statement that defines `d` (presumably from `observer`) is not visible
// here — confirm against the real header.
106 static std::shared_ptr<on_error_resume_next_disposable<TObserver>> init_state(TObserver&& observer)
107 {
109 auto ptr = d.lock();
// The stored observer is given d.as_weak() as its upstream.
110 ptr->observer.set_upstream(d.as_weak());
111 return ptr;
112 }
113 };
114
// Operator type for on_error_resume_next. It derives from lift_operator,
// parameterised with itself and the Selector, and reuses the base class
// constructors.
// NOTE(review): lines 121, 127, 133 and 137 of the original file are absent
// from this listing (nested declaration headers and the static_assert
// condition), so only the visible parts are described below.
115 template<rpp::constraint::decayed_type Selector>
116 struct on_error_resume_next_t : lift_operator<on_error_resume_next_t<Selector>, Selector>
117 {
118 using lift_operator<on_error_resume_next_t<Selector>, Selector>::lift_operator;
119
120 template<rpp::constraint::decayed_type T>
122 {
// Derived from the type Selector returns when invoked with a
// std::exception_ptr, passed through extract_observable_type_t.
123 using selector_observable_result_type =
124 rpp::utils::extract_observable_type_t<std::invoke_result_t<Selector, std::exception_ptr>>;
125
// Compile-time check; its condition (line 127) is not visible here, only
// the diagnostic message.
126 static_assert(
128 "Selector observable result type is not the same as T");
129
// The result type is the same T this nested template is instantiated with.
130 using result_type = T;
131
132 template<rpp::constraint::observer_of_type<result_type> TObserver>
134 };
135
136 template<rpp::details::observables::constraint::disposables_strategy Prev>
138 };
139} // namespace rpp::operators::details
140
141namespace rpp::operators
142{
// Creates the on_error_resume_next operator: if an error occurs, the result
// of the selector is taken and subscribed to instead.
// The selector is perfectly forwarded into details::on_error_resume_next_t,
// which is instantiated with the decayed selector type.
// NOTE(review): line 160 of the original file (between the template header
// and the signature) is absent from this listing — confirm against the real
// header.
159 template<typename Selector>
161 auto on_error_resume_next(Selector&& selector)
162 {
163 return details::on_error_resume_next_t<std::decay_t<Selector>>{std::forward<Selector>(selector)};
164 }
165} // namespace rpp::operators
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
void on_next(const Type &v) const noexcept
Observable calls this method to notify observer about new value.
Definition observer.hpp:84
void set_upstream(const disposable_wrapper &d) noexcept
Observable calls this method to pass disposable. Observer disposes this disposable WHEN observer want...
Definition observer.hpp:49
void on_completed() const noexcept
Observable calls this method to notify observer about completion of emissions.
Definition observer.hpp:135
void on_error(const std::exception_ptr &err) const noexcept
Observable calls this method to notify observer about some error during generation next data.
Definition observer.hpp:120
bool is_disposed() const noexcept
Observable calls this method to check if observer interested or not in emissions.
Definition observer.hpp:74
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition strategy.hpp:28
Definition constraints.hpp:19
Definition fwd.hpp:80
auto on_error_resume_next(Selector &&selector)
If an error occurs, take the result from the Selector and subscribe to that instead.
Definition on_error_resume_next.hpp:157
Definition disposables_strategy.hpp:19
Definition on_error_resume_next.hpp:22
Definition on_error_resume_next.hpp:117