ReactivePlusPlus
Yet another implementation of the ReactiveX approach in C++, written with performance and templates in mind
 
Loading...
Searching...
No Matches
window.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd/window.hpp>
14#include <rpp/subjects/publish_subject.hpp>
15#include <rpp/subscribers/constraints.hpp>
16#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
17
18
19IMPLEMENTATION_FILE(window_tag);
20
21namespace rpp
22{
// Type of the observable returned by `subjects::publish_subject<Type>::get_observable()`.
// The window operator delivers every window to the downstream subscriber as a value of this type
// (see `lift<windowed_observable<Type>>` in `window_impl`).
23template<constraint::decayed_type Type>
24using windowed_observable = decltype(std::declval<subjects::publish_subject<Type>>().get_observable());
25}
26
27namespace rpp::details
28{
// State of the window operator; a fresh instance is built for each lifted subscriber as
// `window_state<Type>{window_size}`.
// NOTE(review): the declaration line (window.hpp:30) is missing from this listing; the name
// `window_state` is taken from the usages further down the file.
// Counter and subject are `mutable` because the handlers receive the state by const reference.
29template<constraint::decayed_type Type>
31{
// Number of items after which the current window is completed.
32 const size_t window_size{};
// Items already pushed into the current window. Starts equal to `window_size`, so the very first
// incoming value satisfies the "window is full" check and triggers emission of the first window.
33 mutable size_t items_in_current_window = window_size;
// Subject backing the current window: its observable is handed downstream and incoming values
// are pushed through its subscriber.
34 mutable subjects::publish_subject<Type> subject{};
35};
36
// Functor called with (value, downstream subscriber, window state).
// NOTE(review): its declaration line (window.hpp:37) is missing from this listing; it appears to be
// the value (on_next) handler of the window operator - confirm against the original header.
38{
// Opens a new window when required, pushes `value` into the current window and completes the
// window once it holds `window_size` items.
39 template<constraint::decayed_type Type>
40 void operator()(auto&& value, const auto& subscriber, const window_state<Type>& state) const
41 {
42 // A value arrived while the counter equals window_size (initial state or previous window filled):
// emit the window's observable downstream now. Windows are emitted lazily, only when a value
// actually arrives, so no window is sent if there are no further items.
43 if (state.items_in_current_window == state.window_size)
44 {
45 subscriber.on_next(state.subject.get_observable());
46 state.items_in_current_window = 0;
47 }
48
49 ++state.items_in_current_window;
50 state.subject.get_subscriber().on_next(std::forward<decltype(value)>(value));
51
52 // The window is full: complete the current subject right away, but do not emit the next window
// yet - that is postponed until another value arrives.
53 if (state.items_in_current_window == state.window_size)
54 {
55 state.subject.get_subscriber().on_completed();
// NOTE(review): source line 56 is missing from this listing; presumably it replaces
// `state.subject` with a fresh subject for the next window - confirm against the original header.
57 }
58 }
59};
60
// Functor called with (error, downstream subscriber, window state).
// NOTE(review): its declaration line (window.hpp:61) is missing from this listing; it appears to be
// the error handler of the window operator - confirm against the original header.
62{
// Forwards `err` first to the subscriber of the current window's subject and then to the
// downstream subscriber.
63 template<constraint::decayed_type Type>
64 void operator()(const std::exception_ptr& err, const auto& subscriber, const window_state<Type>& state) const
65 {
66 state.subject.get_subscriber().on_error(err);
67 subscriber.on_error(err);
68 }
69};
70
// Functor called with (downstream subscriber, window state).
// NOTE(review): its declaration line (window.hpp:71) is missing from this listing; it appears to be
// the completion handler of the window operator - confirm against the original header.
72{
// Completes the current window's subject first and then the downstream subscriber.
73 template<constraint::decayed_type Type>
74 void operator()(const auto& subscriber, const window_state<Type>& state) const
75 {
76 state.subject.get_subscriber().on_completed();
77 subscriber.on_completed();
78 }
79};
80
// Callable passed to `lift` by `window_impl`: given a downstream subscriber of
// `windowed_observable<Type>`, it builds the upstream subscriber of `Type`.
// NOTE(review): the declaration line (window.hpp:82) is missing from this listing; the name
// `window_lift_impl` is taken from its use in `window_impl`.
81template<constraint::decayed_type Type>
83{
// Number of items per window; copied into every `window_state` created below.
84 size_t window_size{};
85
// Wraps `subscriber` into a subscriber with state, reusing the downstream subscriber's
// subscription and attaching a new `window_state<Type>{window_size}`.
86 template<constraint::subscriber_of_type<windowed_observable<Type>> TSub>
87 auto operator()(TSub&& subscriber) const
88 {
// Taken before `subscriber` is forwarded into the call below.
89 auto subscription = subscriber.get_subscription();
90
91 // The dynamic_state variant is used here so that a shared_ptr is made for the observer instead of making a shared_ptr for the state
// NOTE(review): source lines 93-95 (arguments between the subscription and the subscriber) are
// missing from this listing; presumably they pass the value/error/completion handlers - confirm.
92 return create_subscriber_with_dynamic_state<Type>(std::move(subscription),
96 std::forward<TSub>(subscriber),
97 window_state<Type>{window_size});
98 }
99};
100
// Applies the window operator to `obs`.
// Calls `obs.lift<windowed_observable<Type>>` with a `window_lift_impl<Type>` configured for
// `window_size` and returns whatever `lift` returns.
// @param obs          source observable of `Type`; perfectly forwarded into the `lift` call.
// @param window_size  number of items per window, stored in the `window_lift_impl`.
101template<constraint::decayed_type Type, constraint::observable_of_type<Type> TObs>
102auto window_impl(TObs&& obs, size_t window_size)
103{
104 return std::forward<TObs>(obs).template lift<windowed_observable<Type>>(window_lift_impl<Type>{window_size});
105}
106} // namespace rpp::details
Subject which just multicasts values to the observers subscribed to it. It contains two parts: subscriber...
Definition: publish_subject.hpp:78
Definition: window.hpp:83
Definition: window.hpp:72
Definition: window.hpp:62
Definition: window.hpp:38
Definition: window.hpp:31