ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
repeating_strategy.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/observables/fwd.hpp>
14#include <rpp/operators/fwd.hpp>
15
16#include <rpp/defs.hpp>
17#include <rpp/operators/details/strategy.hpp>
18#include <rpp/utils/constraints.hpp>
19#include <rpp/utils/utils.hpp>
20
21#include "rpp/observers/fwd.hpp"
22
23namespace rpp::operators::details
24{
// Shared state used by the repeating strategies and by drain() in this header.
// It is itself an rpp::composite_disposable, so the strategies below register
// upstream disposables on it (add), reset it (clear) and query it (is_disposed).
// TObserver   - downstream observer type (must satisfy rpp::constraint::observer).
// TObservable - type of the observable that drain() subscribes to.
// TNotifier   - type of the stored notifier; it is only stored here and is not
//               used anywhere else in this header.
25 template<rpp::constraint::observer TObserver,
26 typename TObservable,
27 typename TNotifier>
28 struct repeating_state final : public rpp::composite_disposable
29 {
// Takes ownership of the observer (moved in); the observable and the notifier are copied.
30 repeating_state(TObserver&& observer, const TObservable& observable, const TNotifier& notifier)
31 : observer(std::move(observer))
32 , observable(observable)
33 , notifier(notifier)
34 {
35 }
36
// Handshake flag between drain() and the inner strategy's on_next():
// drain() stores true right before subscribing, and whichever of the two runs
// its exchange(false) first observes true. Value-initialised to false.
37 std::atomic_bool is_inside_drain{};
38
// NOTE(review): RPP_NO_UNIQUE_ADDRESS comes from <rpp/defs.hpp>; presumably it
// expands to [[no_unique_address]] (or a compiler-specific equivalent) - confirm.
39 RPP_NO_UNIQUE_ADDRESS TObserver observer;
40 RPP_NO_UNIQUE_ADDRESS TObservable observable;
41 RPP_NO_UNIQUE_ADDRESS TNotifier notifier;
42 };
43
// Forward declaration of drain(): the inner strategy's on_next() below calls it,
// while the definition appears at the end of this header.
44 template<typename TStrategy, rpp::constraint::observer TObserver, typename TObservable, typename TNotifier>
45 void drain(const std::shared_ptr<repeating_state<TObserver, TObservable, TNotifier>>& state);
46
// Observer strategy whose on_next() triggers a (re)subscription via drain(),
// while on_error()/on_completed() are forwarded to the downstream observer.
// NOTE(review): listing lines 48 and 51 are missing from this extract, so the
// declaration of TObserver and the `struct <name>` line are not visible here -
// restore them from the original header before compiling.
// NOTE(review): presumably this observer is subscribed to the stream produced
// by the notifier - confirm against the operators that use it.
47 template<typename TOuterStrategy,
49 typename TObservable,
50 typename TNotifier>
52 {
// NOTE(review): presumably tells the observer wrapper that no extra disposable
// storage is needed because `state` already tracks upstreams - confirm.
53 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
54
55 std::shared_ptr<repeating_state<TObserver, TObservable, TNotifier>> state;
// Set by every callback below; mutable because the callbacks are const.
56 mutable bool locally_disposed{};
57
// Any emitted value (its content is ignored) requests one more subscription.
58 template<typename T>
59 void on_next(T&&) const
60 {
61 locally_disposed = true;
// NOTE(review): presumably drops the disposables registered through
// set_upstream() - confirm against composite_disposable::clear().
62 state->clear();
63
// exchange() returns the previous value. true means drain() is still inside
// its subscribe() call: the flag is reset here and the running drain() loop,
// seeing false from its own exchange, performs the next iteration itself
// instead of a nested drain() call being started from this callback.
64 if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst))
65 return;
66
// No drain() call is waiting on the flag, so start a new one from here.
67 drain<TOuterStrategy>(state);
68 }
69
// Forwards the error to the downstream observer and marks this observer disposed.
70 void on_error(const std::exception_ptr& err) const
71 {
72 locally_disposed = true;
73 state->observer.on_error(err);
74 }
75
// Forwards completion to the downstream observer and marks this observer disposed.
76 void on_completed() const
77 {
78 locally_disposed = true;
79 state->observer.on_completed();
80 }
81
// Registers the upstream disposable on the shared state.
82 void set_upstream(const disposable_wrapper& d) const { state->add(d); }
83
// Disposed once any callback above has run or the shared state is disposed.
84 bool is_disposed() const { return locally_disposed || state->is_disposed(); }
85 };
86
// Strategy part that only wires disposables to the shared repeating_state;
// no on_next/on_error/on_completed callbacks are defined in the visible body.
// NOTE(review): listing line 90 (the `struct <name>` line) is missing from this
// extract - restore it from the original header before compiling.
// NOTE(review): presumably concrete operator strategies extend this type and
// add the callbacks - confirm against its users.
87 template<rpp::constraint::observer TObserver,
88 typename TObservable,
89 typename TNotifier>
91 {
// NOTE(review): presumably tells the observer wrapper that no extra disposable
// storage is needed because `state` already tracks upstreams - confirm.
92 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
93
94 std::shared_ptr<repeating_state<TObserver, TObservable, TNotifier>> state;
95
// Registers the upstream disposable on the shared state.
96 void set_upstream(const disposable_wrapper& d) { state->add(d); }
97
// Mirrors the disposed status of the shared state.
98 bool is_disposed() const { return state->is_disposed(); }
99 };
100
// Subscribes state->observable with an observer built from TStrategy{state},
// and keeps re-subscribing for as long as a new subscription was requested
// while the previous subscribe() call was still running.
// TStrategy - observer strategy constructed from `state` for each subscription.
// state     - shared state holding the observable, the downstream observer and
//             the is_inside_drain flag.
// Exceptions thrown inside the try block are not propagated: they are forwarded
// to state->observer.on_error() and the function returns.
101 template<typename TStrategy, rpp::constraint::observer TObserver, typename TObservable, typename TNotifier>
102 void drain(const std::shared_ptr<repeating_state<TObserver, TObservable, TNotifier>>& state)
103 {
// Stop as soon as the shared state has been disposed.
104 while (!state->is_disposed())
105 {
// Announce that a subscribe() call is in progress; the inner strategy's
// on_next() checks this flag to decide whether it has to call drain() itself.
106 state->is_inside_drain.store(true, std::memory_order::seq_cst);
107 try
108 {
109 using value_type = rpp::utils::extract_observer_type_t<TObserver>;
110 state->observable.subscribe(rpp::observer<value_type, TStrategy>{state});
111
// Previous value true: nobody reset the flag during subscribe(), so no
// re-subscription was requested yet. Leave; a later on_next() will find
// the flag false and call drain() on its own.
// Previous value false: on_next() already ran and reset the flag, so go
// around the loop and subscribe again.
112 if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst))
113 return;
114 }
115 catch (...)
116 {
// Report the failure downstream instead of letting it escape.
117 state->observer.on_error(std::current_exception());
118 return;
119 }
120 }
121 }
122} // namespace rpp::operators::details
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition fwd.hpp:250
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
Definition repeating_strategy.hpp:91
Definition repeating_strategy.hpp:29