ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
repeat_when.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/observables/fwd.hpp>
14#include <rpp/operators/fwd.hpp>
15
16#include <rpp/operators/details/repeating_strategy.hpp>
17
18namespace rpp::operators::details
19{
// Observer strategy for the repeat_when operator.
// Derives from repeating_observer_strategy<TObserver, TObservable, TNotifier> and
// works on the shared `state` object exposed by that base.
// Template parameters:
//   TObserver   - downstream observer type (constrained by rpp::constraint::observer)
//   TObservable - source observable type
//   TNotifier   - notifier type stored alongside the state
20 template<rpp::constraint::observer TObserver,
21 typename TObservable,
22 typename TNotifier>
23 struct repeat_when_impl_strategy final : public repeating_observer_strategy<TObserver, TObservable, TNotifier>
24 {
// NOTE(review): the listing's embedded numbering jumps from 24 to 26 here, so one
// declaration appears to be missing from this extraction - verify against the
// original header.
26
// Bring the base class's `state` member into scope so it can be used unqualified below.
27 using repeating_observer_strategy<TObserver, TObservable, TNotifier>::state;
28
// Emissions are passed straight through to the downstream observer, preserving
// the value category of `v` via perfect forwarding.
29 template<typename T>
30 void on_next(T&& v) const
31 {
32 state->observer.on_next(std::forward<T>(v));
33 }
34
// Errors are passed straight through to the downstream observer.
35 void on_error(const std::exception_ptr& err) const
36 {
37 state->observer.on_error(err);
38 }
39
// Completion handler. Anything thrown inside the try block is captured with
// std::current_exception() and reported to the downstream observer as an error
// instead of propagating to the caller.
40 void on_completed() const
41 {
42 try
43 {
// NOTE(review): the try body is empty in this listing (embedded numbering skips 44).
// Presumably the statement that reacts to completion was lost in extraction -
// TODO confirm against the original header.
45 }
46 catch (...)
47 {
48 state->observer.on_error(std::current_exception());
49 }
50 }
51 };
52
// Operator object for repeat_when: stores the notifier and performs the subscription.
// NOTE(review): the declaration line between the template header and the opening
// brace is missing from this listing (embedded numbering skips 54). The type is
// referred to as details::repeat_when_t by the repeat_when() factory - confirm the
// exact declaration against the original header.
53 template<rpp::constraint::decayed_type TNotifier>
55 {
// Notifier supplied when the operator was created; handed to the repeating state in subscribe().
56 RPP_NO_UNIQUE_ADDRESS TNotifier notifier;
57
// Nested traits template: result_type is the same type T it is instantiated with.
// NOTE(review): the nested declaration line is missing from this listing
// (embedded numbering skips 59).
58 template<rpp::constraint::decayed_type T>
60 {
61 using result_type = T;
62 };
63
// Disposables strategy reported by this operator: always fixed_disposables_strategy<1>,
// independent of the previous strategy `Prev`.
64 template<rpp::details::observables::constraint::disposables_strategy Prev>
65 using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>;
66
// Subscribes `observer` to `observable` through the repeating machinery.
//   observer   - downstream observer, forwarded into the newly created state
//   observable - source observable, forwarded into the newly created state
67 template<rpp::constraint::observer TObserver, typename TObservable>
68 void subscribe(TObserver&& observer, TObservable&& observable) const
69 {
// Build the repeating_state inside a disposable wrapper. `notifier` is passed as an
// lvalue (this function is const), so it is not moved out of the operator.
70 const auto d = disposable_wrapper_impl<repeating_state<std::decay_t<TObserver>, std::decay_t<TObservable>, std::decay_t<TNotifier>>>::make(std::forward<TObserver>(observer), std::forward<TObservable>(observable), notifier);
// Obtain a pointer to the state from the wrapper.
71 auto ptr = d.lock();
72
// Register the state's disposable (in its weak form) as the upstream of the observer held in the state.
73 ptr->observer.set_upstream(d.as_weak());
74
// Start processing using repeat_when_impl_strategy as the observer strategy.
// `drain` is not defined in this file; presumably it comes from repeating_strategy.hpp - verify.
75 drain<repeat_when_impl_strategy<std::decay_t<TObserver>, std::decay_t<TObservable>, std::decay_t<TNotifier>>>(ptr);
76 }
77 };
78} // namespace rpp::operators::details
79
80namespace rpp::operators
81{
// Factory for the repeat_when operator.
// Takes any notifier by forwarding reference and returns a
// details::repeat_when_t<std::decay_t<TNotifier>> that stores the decayed notifier,
// constructed from the forwarded argument.
// NOTE(review): the listing's embedded numbering skips 82-93 and 95, so the original
// documentation comment and one line between the template header and the signature
// (possibly a constraint on TNotifier) appear to be missing - TODO confirm against
// the original header.
94 template<typename TNotifier>
96 auto repeat_when(TNotifier&& notifier)
97 {
98 return details::repeat_when_t<std::decay_t<TNotifier>>{std::forward<TNotifier>(notifier)};
99 }
100} // namespace rpp::operators
Main RPP wrapper over disposables.
Definition disposable_wrapper.hpp:142
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition fwd.hpp:80
Definition disposables_strategy.hpp:29
Definition repeat_when.hpp:55
Definition repeating_strategy.hpp:91