13#include <rpp/operators/fwd.hpp>
15#include <rpp/defs.hpp>
16#include <rpp/disposables/refcount_disposable.hpp>
17#include <rpp/operators/details/forwarding_subject.hpp>
18#include <rpp/operators/details/strategy.hpp>
19#include <rpp/schedulers/current_thread.hpp>
20#include <rpp/utils/utils.hpp>
26 template<constraint::decayed_type Type>
27 using window_toggle_observable =
decltype(std::declval<rpp::operators::details::forwarding_subject<Type>>().get_observable());
30namespace rpp::operators::details
32 template<rpp::constraint::observer TObserver, typename TClosingsSelectorFn>
35 using Observable = rpp::utils::extract_observer_type_t<TObserver>;
36 using value_type = rpp::utils::extract_observable_type_t<Observable>;
39 static_assert(std::same_as<Observable, decltype(std::declval<Subject>().get_observable())>);
43 RPP_NO_UNIQUE_ADDRESS TObserver
observer;
44 std::list<decltype(std::declval<Subject>().get_observer())> observers{};
49 , m_closings{closings}
// Returns a rpp::utils::pointer_under_lock<state_t> constructed from the member m_state.
// NOTE(review): the name suggests the returned object keeps the state locked for its
// lifetime — confirm against pointer_under_lock's definition, which is not visible here.
53 rpp::utils::pointer_under_lock<state_t> get_state_under_lock() {
return rpp::utils::pointer_under_lock<state_t>{m_state}; }
56 auto get_closing(T&& v)
const
58 return m_closings(std::forward<T>(v));
61 auto on_new_subject(
const Subject& subject)
63 auto locked_state = get_state_under_lock();
64 const auto itr = locked_state->observers.insert(locked_state->observers.cend(), subject.get_observer());
65 locked_state->observer.on_next(subject.get_observable());
71 RPP_NO_UNIQUE_ADDRESS TClosingsSelectorFn m_closings;
74 template<rpp::constraint::decayed_type TState>
77 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
79 std::shared_ptr<rpp::refcount_disposable> disposable;
80 std::shared_ptr<TState> state;
82 decltype(std::declval<TState>().on_new_subject(std::declval<typename TState::Subject>())) itr;
84 void on_next(
const auto&)
const
89 void on_error(
const std::exception_ptr& err)
const
91 auto locked_state = state->get_state_under_lock();
92 for (
const auto& obs : locked_state->observers)
94 locked_state->observer.on_error(err);
97 void on_completed()
const
99 disposable->remove(this_disposable);
102 this_disposable.dispose();
104 auto locked_state = state->get_state_under_lock();
105 locked_state->observers.erase(itr);
// Reports the disposal status of this observer strategy by forwarding to
// this_disposable.is_disposed(); no other state is consulted.
109 bool is_disposed()
const {
return this_disposable.is_disposed(); }
112 template<rpp::constraint::decayed_type TState>
115 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto;
117 std::shared_ptr<rpp::refcount_disposable> disposable;
118 std::shared_ptr<TState> state;
121 void on_next(T&& v)
const
123 typename TState::Subject subject{disposable->wrapper_from_this()};
124 const auto itr = state->on_new_subject(subject);
126 disposable->add(subject.get_disposable());
130 void on_error(
const std::exception_ptr& err)
const
132 const auto locked_state = state->get_state_under_lock();
133 for (
const auto& obs : locked_state->observers)
135 locked_state->observer.on_error(err);
// Completion callback with an intentionally empty body: nothing is forwarded or released here.
// It is static because it touches no instance state.
138 static void on_completed() {}
// Unconditionally returns false, i.e. this strategy never reports itself as disposed.
140 static bool is_disposed() {
return false; }
143 template<rpp::constraint::observer TObserver, rpp::constraint::observable TOpeningsObservable, typename TClosingsSelectorFn>
150 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
153 : m_state{std::make_shared<TState>(std::move(
observer), closings)}
155 m_state->get_state_under_lock()->observer.set_upstream(m_disposable->add_ref());
159 void on_next(
const auto& v)
const
161 const auto locked_state = m_state->get_state_under_lock();
162 for (
const auto& obs : locked_state->observers)
166 void on_error(
const std::exception_ptr& err)
const
168 const auto locked_state = m_state->get_state_under_lock();
169 for (
const auto& obs : locked_state->observers)
171 locked_state->observer.on_error(err);
174 void on_completed()
const
176 const auto locked_state = m_state->get_state_under_lock();
177 for (
const auto& obs : locked_state->observers)
179 locked_state->observer.on_completed();
// Reports the disposal status by forwarding to m_disposable->is_disposed().
184 bool is_disposed()
const {
return m_disposable->is_disposed(); }
188 std::shared_ptr<TState> m_state;
191 template<rpp::constraint::observable TOpeningsObservable, typename TClosingsSelectorFn>
197 template<rpp::constraint::decayed_type T>
200 using result_type = rpp::window_toggle_observable<T>;
202 constexpr static bool own_current_queue =
true;
204 template<rpp::constraint::observer_of_type<result_type> TObserver>
208 template<rpp::details::observables::constraint::disposables_strategy Prev>
213namespace rpp::operators
244 template<rpp::constraint::observable TOpeningsObservable, typename TClosingsSelectorFn>
246 auto window_toggle(TOpeningsObservable&& openings, TClosingsSelectorFn&& closings_selector)
static disposable_wrapper_impl make(TArgs &&... args)
Main way to create disposable_wrapper. Passed TTarget type can be any type derived from TDisposable.
Definition disposable_wrapper.hpp:164
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition forwarding_subject.hpp:26
Definition strategy.hpp:28
Definition window_toggle.hpp:146
Definition disposables_strategy.hpp:29
Definition window_toggle.hpp:76
Definition window_toggle.hpp:114
Definition window_toggle.hpp:42
Definition window_toggle.hpp:34
Definition window_toggle.hpp:199
Definition window_toggle.hpp:194