ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
tap.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/operators/details/strategy.hpp>
17
18namespace rpp::operators::details
19{
// NOTE(review): listing line 25 (the `struct <name>` header these template parameters
// belong to) is missing from this extract, so the type's name is not visible here.
// Presumably it is `tap_observer_strategy` as in upstream ReactivePlusPlus - TODO confirm.
//
// Observer strategy that bundles a downstream `observer` with three callbacks.
// For every notification the matching callback is invoked first, and the
// notification is then passed on to `observer` unchanged.
//
// Template parameters:
//   TObserver   - downstream observer type (constrained by rpp::constraint::observer).
//   OnNext      - callback invoked for each emitted value.
//   OnError     - callback invoked with the std::exception_ptr of an error.
//   OnCompleted - callback invoked (without arguments) on completion.
20 template<
21 rpp::constraint::observer TObserver,
22 rpp::constraint::decayed_type OnNext,
23 rpp::constraint::decayed_type OnError,
24 rpp::constraint::decayed_type OnCompleted>
26 {
// Disposables mode declared by this strategy: None.
27 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
28
// Wrapped downstream observer followed by the three user-supplied callbacks.
29 RPP_NO_UNIQUE_ADDRESS TObserver observer;
30 RPP_NO_UNIQUE_ADDRESS OnNext onNext;
31 RPP_NO_UNIQUE_ADDRESS OnError onError;
32 RPP_NO_UNIQUE_ADDRESS OnCompleted onCompleted;
33
// Invokes onNext with `v` passed through utils::as_const, then forwards `v` to
// observer.on_next. The callback runs before the std::forward, so it sees the value
// before it can be moved into the downstream observer.
34 template<typename T>
35 void on_next(T&& v) const
36 {
37 onNext(utils::as_const(v));
38 observer.on_next(std::forward<T>(v));
39 }
40
// Invokes onError with `err`, then passes the same `err` to observer.on_error.
41 void on_error(const std::exception_ptr& err) const
42 {
43 onError(err);
44 observer.on_error(err);
45 }
46
// Invokes onCompleted, then calls observer.on_completed.
47 void on_completed() const
48 {
49 onCompleted();
50 observer.on_completed();
51 }
52
// Pure delegation: hands the upstream disposable to the wrapped observer.
53 void set_upstream(const disposable_wrapper& d) { observer.set_upstream(d); }
54
// Pure delegation: reports whatever the wrapped observer reports.
55 bool is_disposed() const { return observer.is_disposed(); }
56 };
57
// Operator object produced by the tap(...) factory functions. It derives from
// operators::details::lift_operator, passing itself plus the three callback types as
// template arguments, and inherits that base's constructors.
//
// NOTE(review): listing lines 59-61 (the template parameter list) are missing from this
// extract. Judging by the uses of OnNext, OnError and OnCompleted below, they presumably
// declare those three type parameters - TODO confirm against upstream tap.hpp.
58 template<
62 struct tap_t : public operators::details::lift_operator<tap_t<OnNext, OnError, OnCompleted>, OnNext, OnError, OnCompleted>
63 {
// Reuse the base-class constructors.
64 using operators::details::lift_operator<tap_t<OnNext, OnError, OnCompleted>, OnNext, OnError, OnCompleted>::lift_operator;
65
// Per-value-type traits for this operator.
// NOTE(review): listing line 67 (the nested struct header for this template) is missing
// from this extract, so the nested type's name is not visible here.
66 template<rpp::constraint::decayed_type T>
68 {
// Compile-time check that OnNext can be invoked with T (as a void-returning call).
69 static_assert(rpp::constraint::invocable_r_v<void, OnNext, T>, "OnNext is not invocable with T");
70
// The resulting value type is the same as the incoming type T.
71 using result_type = T;
72
// NOTE(review): listing line 74 (the declaration this template header introduces) is
// missing from this extract; presumably an alias selecting the observer strategy for
// TObserver - TODO confirm.
73 template<rpp::constraint::observer_of_type<result_type> TObserver>
75 };
76
// The disposables strategy is passed through unchanged (alias of Prev).
77 template<rpp::details::observables::constraint::disposables_strategy Prev>
78 using updated_optimal_disposables_strategy = Prev;
79 };
80} // namespace rpp::operators::details
81
82namespace rpp::operators
83{
// tap overload taking only an error callback.
//
// Builds a details::tap_t whose on_error slot holds the supplied callable, while the
// on_next and on_completed slots are filled with default-constructed placeholder functors
// (rpp::utils::empty_function_any_t / rpp::utils::empty_function_t<>; presumably no-ops -
// see functors.hpp).
//
// @tparam OnError  callable invocable with `const std::exception_ptr&`.
// @param  on_error callback stored (after decay) in the operator's error slot.
// @return details::tap_t instantiated with the decayed callback types.
//
// NOTE(review): listing line 93 (between the template header and the signature) is absent
// from this extract; it may carry a requires-clause - confirm against upstream tap.hpp.
92 template<std::invocable<const std::exception_ptr&> OnError /* = rpp::utils::empty_function_t<std::exception_ptr> */>
94 auto tap(OnError&& on_error)
95 {
// Restored: listing line 96 was dropped by the extraction, which left OnNext undeclared
// although it is used below. The type is the OnNext default named on the three-callback
// overload of tap in this file.
96 using OnNext = rpp::utils::empty_function_any_t;
97 using OnCompleted = rpp::utils::empty_function_t<>;
98
99 return details::tap_t<std::decay_t<OnNext>, std::decay_t<OnError>, std::decay_t<OnCompleted>>{
100 OnNext{},
101 std::forward<OnError>(on_error),
102 OnCompleted{}};
103 }
104
// tap overload taking only a completion callback.
//
// Builds a details::tap_t whose on_completed slot holds the supplied callable, while the
// on_next and on_error slots are filled with default-constructed placeholder functors
// (presumably no-ops - see functors.hpp).
//
// @tparam OnCompleted  callable invocable without arguments.
// @param  on_completed callback stored (after decay) in the operator's completion slot.
// @return details::tap_t instantiated with the decayed callback types.
113 template<std::invocable<> OnCompleted /* = rpp::utils::empty_function_t<> */>
114 auto tap(OnCompleted&& on_completed)
115 {
// Restored: listing lines 116-117 were dropped by the extraction, which left OnNext and
// OnError undeclared although both are used below. The types are the defaults named on
// the three-callback overload of tap in this file.
116 using OnNext = rpp::utils::empty_function_any_t;
117 using OnError = rpp::utils::empty_function_t<std::exception_ptr>;
118
119 return details::tap_t<std::decay_t<OnNext>, std::decay_t<OnError>, std::decay_t<OnCompleted>>{
120 OnNext{},
121 OnError{},
122 std::forward<OnCompleted>(on_completed)};
123 }
124
// tap overload taking a value callback and a completion callback.
//
// Builds a details::tap_t holding the two supplied callables; the on_error slot is filled
// with a default-constructed placeholder functor (presumably a no-op - see functors.hpp).
//
// @tparam OnNext       callable for emitted values (unconstrained at this level).
// @tparam OnCompleted  callable invocable without arguments.
// @param  on_next      callback stored (after decay) in the operator's value slot.
// @param  on_completed callback stored (after decay) in the operator's completion slot.
// @return details::tap_t instantiated with the decayed callback types.
134 template<typename OnNext,
135 std::invocable<> OnCompleted /* = rpp::utils::empty_function_t<> */>
136 auto tap(OnNext&& on_next,
137 OnCompleted&& on_completed)
138 {
// Restored: listing line 139 was dropped by the extraction, which left OnError undeclared
// although it is used below. The type is the OnError default named on the three-callback
// overload of tap in this file.
139 using OnError = rpp::utils::empty_function_t<std::exception_ptr>;
140
141 return details::tap_t<std::decay_t<OnNext>, std::decay_t<OnError>, std::decay_t<OnCompleted>>{
142 std::forward<OnNext>(on_next),
143 OnError{},
144 std::forward<OnCompleted>(on_completed)};
145 }
146
// tap overload taking all three callbacks explicitly.
//
// Perfect-forwards each callable into a details::tap_t instantiated with the decayed
// callback types. The defaults mentioned in the inline comments are commented out, so at
// this declaration all three arguments must be supplied.
//
// @tparam OnNext       callable for emitted values (unconstrained in the visible header).
// @tparam OnError      callable invocable with `const std::exception_ptr&`.
// @tparam OnCompleted  callable invocable without arguments.
// @param  on_next      callback stored in the operator's value slot.
// @param  on_error     callback stored in the operator's error slot.
// @param  on_completed callback stored in the operator's completion slot.
// @return details::tap_t holding the three forwarded callbacks.
//
// NOTE(review): listing line 160 (between the template header and the signature) is absent
// from this extract; it may carry a requires-clause - confirm against upstream tap.hpp.
157 template<typename OnNext /* = rpp::utils::empty_function_any_t */,
158 std::invocable<const std::exception_ptr&> OnError /* = rpp::utils::empty_function_t<std::exception_ptr> */,
159 std::invocable<> OnCompleted /* = rpp::utils::empty_function_t<> */>
161 auto tap(OnNext&& on_next /* = {} */,
162 OnError&& on_error /* = {} */,
163 OnCompleted&& on_completed /* = {} */)
164 {
165 return details::tap_t<std::decay_t<OnNext>, std::decay_t<OnError>, std::decay_t<OnCompleted>>{
166 std::forward<OnNext>(on_next),
167 std::forward<OnError>(on_error),
168 std::forward<OnCompleted>(on_completed)};
169 }
170} // namespace rpp::operators
Definition strategy.hpp:28
Definition constraints.hpp:22
Definition constraints.hpp:50
Definition function_traits.hpp:45
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
auto tap(OnError &&on_error)
Register callbacks to inspect observable emissions and perform side-effects.
Definition tap.hpp:94
Definition tap.hpp:63
Definition functors.hpp:38
Definition functors.hpp:28