ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
switch_on_next.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/disposables/composite_disposable.hpp>
17#include <rpp/operators/details/strategy.hpp>
18#include <rpp/utils/utils.hpp>
19
20#include <array>
21
22namespace rpp::operators::details
23{
24 template<rpp::constraint::observer TObserver>
26 {
27 public:
28 template<rpp::constraint::decayed_same_as<TObserver> TObs>
30 switch_on_next_state_t(TObs&& obs)
31 : m_observer_with_mutex{std::forward<TObs>(obs)}
32 {
33 }
34
37
// Accessors for the shared state used by both observer strategies below.
// get_observer(): returns a pointer_under_lock built from m_observer_with_mutex (a value_with_mutex<TObserver>).
//   NOTE(review): presumably the returned object keeps the mutex locked while it is alive - confirm in rpp/utils/utils.hpp.
38 rpp::utils::pointer_under_lock<TObserver> get_observer() { return m_observer_with_mutex; }
// get_base_child_disposable(): returns a reference to the m_base_child_disposable member (no lock wrapper involved).
39 rpp::composite_disposable& get_base_child_disposable() { return m_base_child_disposable; }
// get_inner_child_disposable(): returns a pointer_under_lock built from m_inner_child_disposable.
40 rpp::utils::pointer_under_lock<rpp::composite_disposable_wrapper> get_inner_child_disposable() { return m_inner_child_disposable; }
41
42 private:
// Dispose hook overridden from the base class (the base class declaration is not visible in this listing).
// Disposes the base child disposable first and then the current inner child disposable.
// The interface_disposable::Mode argument is unnamed and not used.
43 void base_dispose_impl(interface_disposable::Mode) noexcept override
44 {
45 get_base_child_disposable().dispose();
46 get_inner_child_disposable()->dispose();
47 }
48
49 private:
50 rpp::utils::value_with_mutex<TObserver> m_observer_with_mutex{};
51 rpp::composite_disposable m_base_child_disposable{};
53 };
54
55 template<rpp::constraint::observer TObserver>
57 {
58 public:
59 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
60
62 : m_state{state}
63 , m_refcounted{std::move(refcounted)}
64 {
65 }
66
// Forwards a value received by this inner strategy to the observer stored in the shared state.
// The observer is reached through m_state->get_observer(); the value is perfectly forwarded.
67 template<typename T>
68 void on_next(T&& v) const
69 {
70 m_state->get_observer()->on_next(std::forward<T>(v));
71 }
72
// Forwards an error received by this inner strategy to the observer stored in the shared state.
// No disposal is performed here; only the observer's on_error is called.
73 void on_error(const std::exception_ptr& err) const
74 {
75 m_state->get_observer()->on_error(err);
76 }
77
// Completion handler of the inner strategy.
// Disposes m_refcounted first, then calls on_completed on the shared observer only when the
// state's base child disposable is already disposed (the outer strategy's on_completed disposes it).
// Otherwise nothing is emitted from here.
78 void on_completed() const
79 {
80 m_refcounted.dispose();
81 if (m_state->get_base_child_disposable().is_disposed())
82 m_state->get_observer()->on_completed();
83 }
84
// set_upstream(): adds the upstream disposable to m_refcounted.
85 void set_upstream(const disposable_wrapper& d) const { m_refcounted.add(d); }
// is_disposed(): reports the disposed state of m_refcounted.
86 bool is_disposed() const { return m_refcounted.is_disposed(); }
87
88 private:
89 std::shared_ptr<switch_on_next_state_t<TObserver>> m_state;
91 };
92
93 template<rpp::constraint::observer TObserver>
95 {
96 public:
97 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
98
100 : m_state{init_state(std::move(obs))}
101 {
102 }
103
106
107 template<typename T>
108 void on_next(T&& v) const
109 {
111 {
112 auto inner = m_state->get_inner_child_disposable();
113 inner->dispose();
114 if (m_state->is_disposed())
115 return;
116
117 *inner = new_inner;
118 }
119 std::forward<T>(v).subscribe(switch_on_next_inner_observer_strategy<TObserver>{m_state, std::move(new_inner)});
120 }
121
// Forwards an error received by the outer strategy to the observer stored in the shared state.
// No disposal is performed here; only the observer's on_error is called.
122 void on_error(const std::exception_ptr& err) const
123 {
124 m_state->get_observer()->on_error(err);
125 }
126
// Completion handler of the outer strategy.
// Disposes the state's base child disposable, then calls on_completed on the shared observer
// only when the current inner child disposable is already disposed.
// Otherwise nothing is emitted from here.
127 void on_completed() const
128 {
129 m_state->get_base_child_disposable().dispose();
130 if (m_state->get_inner_child_disposable()->is_disposed())
131 m_state->get_observer()->on_completed();
132 }
133
// set_upstream(): adds the upstream disposable to the state's base child disposable.
134 void set_upstream(const disposable_wrapper& d) const { m_state->get_base_child_disposable().add(d); }
// is_disposed(): reports the disposed state of the state's base child disposable.
135 bool is_disposed() const { return m_state->get_base_child_disposable().is_disposed(); }
136
137 private:
138 static std::shared_ptr<switch_on_next_state_t<TObserver>> init_state(TObserver&& observer)
139 {
141 auto ptr = d.lock();
142 ptr->get_observer()->set_upstream(d.as_weak());
143 return ptr;
144 }
145
146 private:
147 std::shared_ptr<switch_on_next_state_t<TObserver>> m_state;
148 };
149
150 struct switch_on_next_t : lift_operator<switch_on_next_t>
151 {
152 using lift_operator<switch_on_next_t>::lift_operator;
153
154 template<rpp::constraint::decayed_type T>
156 {
157 static_assert(rpp::constraint::observable<T>, "T is not observable");
158
159 using result_type = rpp::utils::extract_observable_type_t<T>;
160
161 template<rpp::constraint::observer_of_type<result_type> TObserver>
163 };
164
165 template<rpp::details::observables::constraint::disposables_strategy Prev>
167 };
168} // namespace rpp::operators::details
169
170namespace rpp::operators
171{
196 inline auto switch_on_next()
197 {
199 }
200} // namespace rpp::operators
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
Definition base_disposable.hpp:23
static disposable_wrapper_impl make(TArgs &&... args)
Definition disposable_wrapper.hpp:164
static disposable_wrapper_impl empty()
Definition disposable_wrapper.hpp:178
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this class...
Definition observer.hpp:172
Definition strategy.hpp:28
Definition switch_on_next.hpp:26
Definition utils.hpp:260
Definition constraints.hpp:19
Definition fwd.hpp:80
auto switch_on_next()
Converts an observable of observables into an observable of values which emits values from the most recent underlying observable...
Definition switch_on_next.hpp:187
Definition disposables_strategy.hpp:29
void dispose() noexcept
Dispose the disposable and free any underlying resources.
Definition interface_disposable.hpp:36
Definition switch_on_next.hpp:151