13#include <rpp/operators/fwd.hpp>
15#include <rpp/defs.hpp>
16#include <rpp/disposables/composite_disposable.hpp>
17#include <rpp/operators/details/strategy.hpp>
18#include <rpp/utils/utils.hpp>
22namespace rpp::operators::details
24 template<rpp::constra
int::observer TObserver>
28 template<rpp::constra
int::decayed_same_as<TObserver> TObs>
31 : m_observer_with_mutex{std::forward<TObs>(obs)}
38 rpp::utils::pointer_under_lock<TObserver> get_observer() {
return m_observer_with_mutex; }
40 rpp::utils::pointer_under_lock<rpp::composite_disposable_wrapper> get_inner_child_disposable() {
return m_inner_child_disposable; }
43 void base_dispose_impl(interface_disposable::Mode)
noexcept override
45 get_base_child_disposable().
dispose();
46 get_inner_child_disposable()->dispose();
55 template<rpp::constra
int::observer TObserver>
59 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
63 , m_refcounted{std::move(refcounted)}
68 void on_next(T&& v)
const
70 m_state->get_observer()->on_next(std::forward<T>(v));
73 void on_error(
const std::exception_ptr& err)
const
75 m_state->get_observer()->on_error(err);
78 void on_completed()
const
80 m_refcounted.dispose();
81 if (m_state->get_base_child_disposable().is_disposed())
82 m_state->get_observer()->on_completed();
86 bool is_disposed()
const {
return m_refcounted.is_disposed(); }
89 std::shared_ptr<switch_on_next_state_t<TObserver>> m_state;
93 template<rpp::constra
int::observer TObserver>
97 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
100 : m_state{init_state(std::move(obs))}
108 void on_next(T&& v)
const
112 auto inner = m_state->get_inner_child_disposable();
114 if (m_state->is_disposed())
122 void on_error(
const std::exception_ptr& err)
const
124 m_state->get_observer()->on_error(err);
127 void on_completed()
const
129 m_state->get_base_child_disposable().dispose();
130 if (m_state->get_inner_child_disposable()->is_disposed())
131 m_state->get_observer()->on_completed();
134 void set_upstream(
const disposable_wrapper& d)
const { m_state->get_base_child_disposable().add(d); }
135 bool is_disposed()
const {
return m_state->get_base_child_disposable().is_disposed(); }
138 static std::shared_ptr<switch_on_next_state_t<TObserver>> init_state(TObserver&&
observer)
142 ptr->get_observer()->set_upstream(d.as_weak());
147 std::shared_ptr<switch_on_next_state_t<TObserver>> m_state;
154 template<rpp::constra
int::decayed_type T>
159 using result_type = rpp::utils::extract_observable_type_t<T>;
161 template<rpp::constra
int::observer_of_type<result_type> TObserver>
165 template<rpp::details::observables::constra
int::disposables_strategy Prev>
170namespace rpp::operators
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
Definition base_disposable.hpp:23
static disposable_wrapper_impl make(TArgs &&... args)
Definition disposable_wrapper.hpp:164
static disposable_wrapper_impl empty()
Definition disposable_wrapper.hpp:178
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition strategy.hpp:28
Definition switch_on_next.hpp:57
Definition switch_on_next.hpp:95
Definition switch_on_next.hpp:26
Definition constraints.hpp:19
auto switch_on_next()
Converts observable of observables into observable of values which emits values from most recent unde...
Definition switch_on_next.hpp:187
Definition disposables_strategy.hpp:29
void dispose() noexcept
Dispose disposable and free any underlying resources and etc.
Definition interface_disposable.hpp:36
Definition switch_on_next.hpp:156
Definition switch_on_next.hpp:151