ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
window_toggle.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/disposables/refcount_disposable.hpp>
17#include <rpp/operators/details/forwarding_subject.hpp>
18#include <rpp/operators/details/strategy.hpp>
19#include <rpp/schedulers/current_thread.hpp>
20#include <rpp/utils/utils.hpp>
21
22#include <list>
23
24namespace rpp
25{
// Alias for the type returned by forwarding_subject<Type>::get_observable().
// It is used further down in this file as the operator's result_type.
26 template<constraint::decayed_type Type>
27 using window_toggle_observable = decltype(std::declval<rpp::operators::details::forwarding_subject<Type>>().get_observable());
28} // namespace rpp
29
30namespace rpp::operators::details
31{
// State shared between the observer strategies in this file.
// It stores the downstream observer together with the list of observers of the
// window subjects registered so far (both inside a value_with_mutex), plus the
// closings selector.
32 template<rpp::constraint::observer TObserver, typename TClosingsSelectorFn>
33 struct window_toggle_state
34 {
// Observable: the type the downstream observer consumes; value_type: the type that observable emits.
35 using Observable = rpp::utils::extract_observer_type_t<TObserver>;
36 using value_type = rpp::utils::extract_observable_type_t<Observable>;
37 using Subject = forwarding_subject<value_type>;
38
// The downstream observer must consume exactly the observable type produced by Subject.
39 static_assert(std::same_as<Observable, decltype(std::declval<Subject>().get_observable())>);
40
// Data guarded by m_state: the downstream observer and one observer per registered window subject.
41 struct state_t
42 {
43 RPP_NO_UNIQUE_ADDRESS TObserver observer;
44 std::list<decltype(std::declval<Subject>().get_observer())> observers{};
45 };
46
// Takes ownership of the downstream observer (moved into the guarded state) and copies the closings selector.
47 window_toggle_state(TObserver&& observer, const TClosingsSelectorFn& closings)
48 : m_state{state_t{std::move(observer)}}
49 , m_closings{closings}
50 {
51 }
52
// Returns a pointer_under_lock built from m_state.
// NOTE(review): presumably the mutex stays held for the lifetime of the returned object - confirm in rpp/utils.
53 rpp::utils::pointer_under_lock<state_t> get_state_under_lock() { return rpp::utils::pointer_under_lock<state_t>{m_state}; }
54
// Invokes the closings selector with the forwarded value and returns whatever the selector returns.
55 template<typename T>
56 auto get_closing(T&& v) const
57 {
58 return m_closings(std::forward<T>(v));
59 }
60
// Registers a new window: appends the subject's observer to the end of the list,
// emits the subject's observable to the downstream observer, and returns the
// iterator to the inserted list element. All three steps happen while the
// result of get_state_under_lock() is alive.
61 auto on_new_subject(const Subject& subject)
62 {
63 auto locked_state = get_state_under_lock();
64 const auto itr = locked_state->observers.insert(locked_state->observers.cend(), subject.get_observer());
65 locked_state->observer.on_next(subject.get_observable());
66 return itr;
67 }
68
69 private:
70 rpp::utils::value_with_mutex<state_t> m_state{};
71 RPP_NO_UNIQUE_ADDRESS TClosingsSelectorFn m_closings;
72 };
73
// Observer strategy subscribed to the "closing" observable of a single window.
// It is constructed in the opening strategy's on_next as
// window_toggle_closing_observer_strategy<TState>{disposable, state, subject.get_disposable(), itr}.
// NOTE(review): listing lines 75 (the struct declaration) and 81 (the declaration of
// `this_disposable`, which the construction site initializes from subject.get_disposable())
// are absent from this extract.
74 template<rpp::constraint::decayed_type TState>
76 {
77 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
78
79 std::shared_ptr<rpp::refcount_disposable> disposable;
80 std::shared_ptr<TState> state;
// Iterator to this window's observer inside state's observer list (the type returned by on_new_subject).
82 decltype(std::declval<TState>().on_new_subject(std::declval<typename TState::Subject>())) itr;
83
// Any value emitted by the closing observable is handled exactly like its completion.
84 void on_next(const auto&) const
85 {
86 on_completed();
87 }
88
// Forwards the error to every registered window observer, then to the downstream observer.
89 void on_error(const std::exception_ptr& err) const
90 {
91 auto locked_state = state->get_state_under_lock();
92 for (const auto& obs : locked_state->observers)
93 obs.on_error(err);
94 locked_state->observer.on_error(err);
95 }
96
// Closes this window: removes this_disposable from the shared disposable,
// completes the window's observer, disposes this_disposable, and finally
// erases the window's entry from the observer list.
97 void on_completed() const
98 {
99 disposable->remove(this_disposable);
100
101 itr->on_completed();
102 this_disposable.dispose();
103
// The state lock is acquired only here, for the list erase.
104 auto locked_state = state->get_state_under_lock();
105 locked_state->observers.erase(itr);
106 }
107
// Upstream disposables are attached to this_disposable; disposal status is read from it as well.
108 void set_upstream(const disposable_wrapper& d) const { this_disposable.add(d); }
109 bool is_disposed() const { return this_disposable.is_disposed(); }
110 };
111
// Observer strategy subscribed to the "openings" observable.
// It is constructed in window_toggle_observer_strategy's constructor as
// window_toggle_opening_observer_strategy<TState>{m_disposable, m_state}.
// NOTE(review): listing line 113 (the struct declaration) is absent from this extract.
112 template<rpp::constraint::decayed_type TState>
114 {
115 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto;
116
117 std::shared_ptr<rpp::refcount_disposable> disposable;
118 std::shared_ptr<TState> state;
119
// Opens a new window for each value of the openings observable:
//  1. creates a Subject from disposable->wrapper_from_this();
//  2. registers it in the shared state (on_new_subject also emits the window downstream);
//  3. adds the subject's disposable to the shared disposable;
//  4. obtains the closing observable for `v` from the selector and subscribes a
//     closing strategy that carries the subject's disposable and its list iterator.
120 template<typename T>
121 void on_next(T&& v) const
122 {
123 typename TState::Subject subject{disposable->wrapper_from_this()};
124 const auto itr = state->on_new_subject(subject);
125
126 disposable->add(subject.get_disposable());
127 state->get_closing(std::forward<T>(v)).subscribe(window_toggle_closing_observer_strategy<TState>{disposable, state, subject.get_disposable(), itr});
128 }
129
// Forwards the error to every registered window observer, then to the downstream observer.
130 void on_error(const std::exception_ptr& err) const
131 {
132 const auto locked_state = state->get_state_under_lock();
133 for (const auto& obs : locked_state->observers)
134 obs.on_error(err);
135 locked_state->observer.on_error(err);
136 }
137
// Completion of the openings observable is ignored, set_upstream does nothing,
// and is_disposed always reports false.
138 static void on_completed() {}
139 static void set_upstream(const disposable_wrapper&) {}
140 static bool is_disposed() { return false; }
141 };
142
// Observer strategy for the source observable: it fans source notifications out
// to the observers of the windows currently stored in the shared state.
// NOTE(review): listing lines 144 and 147 are absent from this extract. `TState` is used
// below but declared on a missing line - presumably an alias of
// window_toggle_state<TObserver, TClosingsSelectorFn>; confirm against the real header.
143 template<rpp::constraint::observer TObserver, rpp::constraint::observable TOpeningsObservable, typename TClosingsSelectorFn>
145 class window_toggle_observer_strategy
146 {
148
149 public:
150 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
151
// Moves the downstream observer into a freshly created shared state, hands
// m_disposable->add_ref() to that observer as its upstream, then subscribes an
// opening strategy to `openings` and adds the resulting disposable to m_disposable.
152 window_toggle_observer_strategy(TObserver&& observer, const TOpeningsObservable& openings, const TClosingsSelectorFn& closings)
153 : m_state{std::make_shared<TState>(std::move(observer), closings)}
154 {
155 m_state->get_state_under_lock()->observer.set_upstream(m_disposable->add_ref());
156 m_disposable->add(openings.subscribe_with_disposable(window_toggle_opening_observer_strategy<TState>{m_disposable, m_state}));
157 }
158
// Forwards the value to every registered window observer. The downstream
// observer is not called here; it only receives window observables.
159 void on_next(const auto& v) const
160 {
161 const auto locked_state = m_state->get_state_under_lock();
162 for (const auto& obs : locked_state->observers)
163 obs.on_next(v);
164 }
165
// Forwards the error to every registered window observer, then to the downstream observer.
166 void on_error(const std::exception_ptr& err) const
167 {
168 const auto locked_state = m_state->get_state_under_lock();
169 for (const auto& obs : locked_state->observers)
170 obs.on_error(err);
171 locked_state->observer.on_error(err);
172 }
173
// Completes every registered window observer, then the downstream observer.
174 void on_completed() const
175 {
176 const auto locked_state = m_state->get_state_under_lock();
177 for (const auto& obs : locked_state->observers)
178 obs.on_completed();
179 locked_state->observer.on_completed();
180 }
181
// The source's upstream disposable is added to the shared refcount disposable.
182 void set_upstream(const disposable_wrapper& d) const { m_disposable->add(d); }
183
184 bool is_disposed() const { return m_disposable->is_disposed(); }
185
186 private:
// Created by the default member initializer; shared with the opening and closing strategies.
187 std::shared_ptr<rpp::refcount_disposable> m_disposable = disposable_wrapper_impl<rpp::refcount_disposable>::make().lock();
188 std::shared_ptr<TState> m_state;
189 };
190
// Operator object returned by rpp::operators::window_toggle. It derives from
// lift_operator, passing the openings observable type and the closings selector
// type as template arguments.
// NOTE(review): listing lines 192 and 198 are absent from this extract; line 198 is
// presumably the declaration of the nested traits struct whose body follows - confirm.
191 template<rpp::constraint::observable TOpeningsObservable, typename TClosingsSelectorFn>
193 struct window_toggle_t : lift_operator<window_toggle_t<TOpeningsObservable, TClosingsSelectorFn>, TOpeningsObservable, TClosingsSelectorFn>
194 {
// Reuse lift_operator's constructors.
195 using lift_operator<window_toggle_t<TOpeningsObservable, TClosingsSelectorFn>, TOpeningsObservable, TClosingsSelectorFn>::lift_operator;
196
197 template<rpp::constraint::decayed_type T>
199 {
// For a source emitting T, the operator emits window observables of T.
200 using result_type = rpp::window_toggle_observable<T>;
201
202 constexpr static bool own_current_queue = true;
203
// Strategy type instantiated for a downstream observer that accepts result_type.
204 template<rpp::constraint::observer_of_type<result_type> TObserver>
205 using observer_strategy = window_toggle_observer_strategy<std::decay_t<TObserver>, TOpeningsObservable, TClosingsSelectorFn>;
206 };
207
// The disposables strategy is always fixed_disposables_strategy<1>, regardless of Prev.
208 template<rpp::details::observables::constraint::disposables_strategy Prev>
209 using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>;
210 };
211} // namespace rpp::operators::details
212
213namespace rpp::operators
214{
236 * @note `#include <rpp/operators/window_toggle.hpp>`
237 *
238 * @par Example
239 * @snippet window_toggle.cpp window_toggle
240 *
241 * @ingroup transforming_operators
242 * @see https://reactivex.io/documentation/operators/window.html
243 */
244 template<rpp::constraint::observable TOpeningsObservable, typename TClosingsSelectorFn>
246 auto window_toggle(TOpeningsObservable&& openings, TClosingsSelectorFn&& closings_selector)
247 {
// NOTE(review): listing line 245 (between the template header and the signature) is absent from this extract.
// Builds the operator object with decayed template arguments, perfect-forwarding both arguments into it.
248 return details::window_toggle_t<std::decay_t<TOpeningsObservable>, std::decay_t<TClosingsSelectorFn>>{std::forward<TOpeningsObservable>(openings), std::forward<TClosingsSelectorFn>(closings_selector)};
249 }
250} // namespace rpp::operators
static disposable_wrapper_impl make(TArgs &&... args)
Main way to create disposable_wrapper. Passed TTarget type can be any type derived from TDisposable.
Definition disposable_wrapper.hpp:164
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition forwarding_subject.hpp:26
Definition fwd.hpp:80
disposable_wrapper_impl< interface_composite_disposable > composite_disposable_wrapper
Wrapper to keep "composite" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:41
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
auto window_toggle(TOpeningsObservable &&openings, TClosingsSelectorFn &&closings_selector)
Subdivide original observable into sub-observables (window observables) and emit sub-observables of i...
Definition window_toggle.hpp:236
Definition disposables_strategy.hpp:29
Definition window_toggle.hpp:34
Definition window_toggle.hpp:194