ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
window.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/disposables/refcount_disposable.hpp>
17#include <rpp/operators/details/forwarding_subject.hpp>
18#include <rpp/operators/details/strategy.hpp>
19
20#include <cstddef>
21
22namespace rpp
23{
24 template<constraint::decayed_type Type>
25 using window_observable = decltype(std::declval<rpp::operators::details::forwarding_subject<Type>>().get_observable());
26} // namespace rpp
27namespace rpp::operators::details
28{
29 template<rpp::constraint::observer TObserver>
30 class window_observer_strategy
31 {
32 using Observable = rpp::utils::extract_observer_type_t<TObserver>;
33 using value_type = rpp::utils::extract_observable_type_t<Observable>;
34 using Subject = forwarding_subject<value_type>;
35
36 static_assert(std::same_as<Observable, decltype(std::declval<Subject>().get_observable())>);
37
38 public:
39 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
40
41 window_observer_strategy(TObserver&& observer, size_t count)
42 : m_observer{std::move(observer)}
43 , m_window_size{std::max(size_t{1}, count)}
44 {
45 m_observer.set_upstream(m_disposable->add_ref());
46 }
47
48 template<typename T>
49 void on_next(T&& v) const
50 {
51 // need to send new subject due to NEW item appeared (we avoid sending new subjects if no any new items)
52 if (m_items_in_current_window == m_window_size)
53 {
54 Subject subject{m_disposable->wrapper_from_this()};
55 m_subject_data.emplace(subject.get_observer(), subject.get_disposable());
56 m_disposable->add(m_subject_data->disposable);
57 m_observer.on_next(subject.get_observable());
58 m_items_in_current_window = 0;
59 }
60
61 m_subject_data->observer.on_next(std::forward<T>(v));
62
63 // cleanup current subject, but don't send due to wait for new value
64 if (++m_items_in_current_window == m_window_size)
65 {
66 m_subject_data->observer.on_completed();
67 m_disposable->remove(m_subject_data->disposable);
68 m_subject_data.reset();
69 }
70 }
71
72 void on_error(const std::exception_ptr& err) const
73 {
74 if (m_subject_data)
75 m_subject_data->observer.on_error(err);
76 m_observer.on_error(err);
77 }
78
79 void on_completed() const
80 {
81 if (m_subject_data)
82 m_subject_data->observer.on_completed();
83 m_observer.on_completed();
84 }
85
86 void set_upstream(const disposable_wrapper& d) const { m_disposable->add(d); }
87
88 bool is_disposed() const { return m_disposable->is_disposed(); }
89
90 private:
91 std::shared_ptr<refcount_disposable> m_disposable = disposable_wrapper_impl<refcount_disposable>::make().lock();
92 RPP_NO_UNIQUE_ADDRESS TObserver m_observer;
93
94 struct subject_data
95 {
96 using TObs = decltype(std::declval<Subject>().get_observer());
97
98 subject_data(TObs&& obs, rpp::disposable_wrapper&& d)
99 : observer{std::move(obs)}
100 , disposable{std::move(d)}
101 {
102 }
103
104 TObs observer;
105 rpp::disposable_wrapper disposable;
106 };
107
108 mutable std::optional<subject_data> m_subject_data;
109 const size_t m_window_size;
110 mutable size_t m_items_in_current_window = m_window_size;
111 };
112
113 struct window_t : lift_operator<window_t, size_t>
114 {
115 using lift_operator<window_t, size_t>::lift_operator;
116
117 template<rpp::constraint::decayed_type T>
119 {
120 using result_type = window_observable<T>;
121
122 template<rpp::constraint::observer_of_type<result_type> TObserver>
123 using observer_strategy = window_observer_strategy<TObserver>;
124 };
125
126 template<rpp::details::observables::constraint::disposables_strategy Prev>
127 using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>;
128 };
129} // namespace rpp::operators::details
130
131namespace rpp::operators
132{
/**
 * @brief Subdivide original observable into sub-observables (window observables) and emit these sub-observables instead of the original items.
 *
 * @param count number of items that each emitted window observable contains
 *
 * @note `#include <rpp/operators/window.hpp>`
 *
 * @par Example
 * @snippet window.cpp window
 *
 * @ingroup transforming_operators
 * @see https://reactivex.io/documentation/operators/window.html
 */
160 inline auto window(size_t count)
161 {
162 return details::window_t{count};
163 }
164} // namespace rpp::operators
static disposable_wrapper_impl make(TArgs &&... args)
Main way to create disposable_wrapper. Passed TTarget type can be any type derived from TDisposable.
Definition disposable_wrapper.hpp:164
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition forwarding_subject.hpp:26
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
auto window(size_t count)
Subdivide original observable into sub-observables (window observables) and emit sub-observables of i...
Definition window.hpp:150
Definition disposables_strategy.hpp:29
Definition window.hpp:114