13#include <rpp/operators/fwd/window.hpp> 
   14#include <rpp/subjects/publish_subject.hpp> 
   15#include <rpp/subscribers/constraints.hpp> 
   16#include <rpp/operators/details/subscriber_with_state.hpp>  
   19IMPLEMENTATION_FILE(window_tag);
 
   23template<constra
int::decayed_type Type>
 
   24using windowed_observable = 
decltype(std::declval<subjects::publish_subject<Type>>().get_observable());
 
   29template<constra
int::decayed_type Type>
 
   32    const size_t                            window_size{};
 
   33    mutable size_t                          items_in_current_window = window_size;
 
   39    template<constra
int::decayed_type Type>
 
   40    void operator()(
auto&& value, 
const auto& subscriber, 
const window_state<Type>& state)
 const 
   43        if (state.items_in_current_window == state.window_size)
 
   45            subscriber.on_next(state.subject.get_observable());
 
   46            state.items_in_current_window = 0;
 
   49        ++state.items_in_current_window;
 
   50        state.subject.get_subscriber().on_next(std::forward<
decltype(value)>(value));
 
   53        if (state.items_in_current_window == state.window_size)
 
   55            state.subject.get_subscriber().on_completed();
 
   63    template<constra
int::decayed_type Type>
 
   64    void operator()(
const std::exception_ptr& err, 
const auto& subscriber, 
const window_state<Type>& state)
 const 
   66        state.subject.get_subscriber().on_error(err);
 
   67        subscriber.on_error(err);
 
   73    template<constra
int::decayed_type Type>
 
   76        state.subject.get_subscriber().on_completed();
 
   77        subscriber.on_completed();
 
   81template<constra
int::decayed_type Type>
 
   86    template<constra
int::subscriber_of_type<windowed_observable<Type>> TSub>
 
   87    auto operator()(TSub&& subscriber)
 const 
   89        auto subscription = subscriber.get_subscription();
 
   92        return create_subscriber_with_dynamic_state<Type>(std::move(subscription),
 
   96                                                          std::forward<TSub>(subscriber),
 
  101template<constra
int::decayed_type Type, constra
int::observable_of_type<Type> TObs>
 
  102auto window_impl(TObs&& obs, 
size_t window_size)
 
  104    return std::forward<TObs>(obs).template lift<windowed_observable<Type>>(
window_lift_impl<Type>{window_size});
 
Subject which just multicasts values to observers subscribed on it. It contains two parts: subscriber...
Definition: publish_subject.hpp:78
Definition: window.hpp:83
Definition: window.hpp:72
Definition: window.hpp:62
Definition: window.hpp:38
Definition: window.hpp:31