13#include <rpp/operators/fwd/window.hpp>
14#include <rpp/subjects/publish_subject.hpp>
15#include <rpp/subscribers/constraints.hpp>
16#include <rpp/operators/details/subscriber_with_state.hpp>
19IMPLEMENTATION_FILE(window_tag);
23template<constra
int::decayed_type Type>
24using windowed_observable =
decltype(std::declval<subjects::publish_subject<Type>>().get_observable());
29template<constra
int::decayed_type Type>
32 const size_t window_size{};
33 mutable size_t items_in_current_window = window_size;
39 template<constra
int::decayed_type Type>
40 void operator()(
auto&& value,
const auto& subscriber,
const window_state<Type>& state)
const
43 if (state.items_in_current_window == state.window_size)
45 subscriber.on_next(state.subject.get_observable());
46 state.items_in_current_window = 0;
49 ++state.items_in_current_window;
50 state.subject.get_subscriber().on_next(std::forward<
decltype(value)>(value));
53 if (state.items_in_current_window == state.window_size)
55 state.subject.get_subscriber().on_completed();
63 template<constra
int::decayed_type Type>
64 void operator()(
const std::exception_ptr& err,
const auto& subscriber,
const window_state<Type>& state)
const
66 state.subject.get_subscriber().on_error(err);
67 subscriber.on_error(err);
73 template<constra
int::decayed_type Type>
76 state.subject.get_subscriber().on_completed();
77 subscriber.on_completed();
81template<constra
int::decayed_type Type>
86 template<constra
int::subscriber_of_type<windowed_observable<Type>> TSub>
87 auto operator()(TSub&& subscriber)
const
89 auto subscription = subscriber.get_subscription();
92 return create_subscriber_with_dynamic_state<Type>(std::move(subscription),
96 std::forward<TSub>(subscriber),
101template<constra
int::decayed_type Type, constra
int::observable_of_type<Type> TObs>
102auto window_impl(TObs&& obs,
size_t window_size)
104 return std::forward<TObs>(obs).template lift<windowed_observable<Type>>(
window_lift_impl<Type>{window_size});
Subject which just multicasts values to observers subscribed on it. It contains two parts: subscriber...
Definition: publish_subject.hpp:78
Definition: window.hpp:83
Definition: window.hpp:72
Definition: window.hpp:62
Definition: window.hpp:38
Definition: window.hpp:31