30 class window_observer_strategy
32 using Observable = rpp::utils::extract_observer_type_t<TObserver>;
33 using value_type = rpp::utils::extract_observable_type_t<Observable>;
36 static_assert(std::same_as<Observable, decltype(std::declval<Subject>().get_observable())>);
39 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
41 window_observer_strategy(TObserver&&
observer,
size_t count)
43 , m_window_size{std::max(
size_t{1}, count)}
45 m_observer.set_upstream(m_disposable->add_ref());
49 void on_next(T&& v)
const
52 if (m_items_in_current_window == m_window_size)
54 Subject subject{m_disposable->wrapper_from_this()};
55 m_subject_data.emplace(subject.get_observer(), subject.get_disposable());
56 m_disposable->add(m_subject_data->disposable);
57 m_observer.on_next(subject.get_observable());
58 m_items_in_current_window = 0;
61 m_subject_data->observer.on_next(std::forward<T>(v));
64 if (++m_items_in_current_window == m_window_size)
66 m_subject_data->observer.on_completed();
67 m_disposable->remove(m_subject_data->disposable);
68 m_subject_data.reset();
72 void on_error(
const std::exception_ptr& err)
const
75 m_subject_data->observer.on_error(err);
76 m_observer.on_error(err);
79 void on_completed()
const
82 m_subject_data->observer.on_completed();
83 m_observer.on_completed();
88 bool is_disposed()
const {
return m_disposable->is_disposed(); }
92 RPP_NO_UNIQUE_ADDRESS TObserver m_observer;
96 using TObs =
decltype(std::declval<Subject>().get_observer());
100 , disposable{std::move(d)}
108 mutable std::optional<subject_data> m_subject_data;
109 const size_t m_window_size;
110 mutable size_t m_items_in_current_window = m_window_size;
static disposable_wrapper_impl make(TArgs &&... args)
Main way to create disposable_wrapper. Passed TTarget type can be any type derived from TDisposable.
Definition disposable_wrapper.hpp:164