ReactivePlusPlus
One more implementation of the ReactiveX approach in C++, designed with performance and templates in mind
 
Loading...
Searching...
No Matches
take.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/lift.hpp> // required due to operator uses lift
14#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
15#include <rpp/operators/fwd/take.hpp>
16#include <rpp/subscribers/constraints.hpp>
17#include <rpp/utils/functors.hpp>
18
19IMPLEMENTATION_FILE(take_tag);
20
21namespace rpp::details
22{
// Aggregate holding a single counter.
// NOTE(review): the declaration line (listing line 23) is missing from this extract; the
// `const take_state& state` parameter and the `take_state{count}` construction elsewhere in
// this listing suggest this body belongs to `take_state` — confirm against the repository.
24{
// `mutable` so the counter can be modified through a const reference to this struct
// (the call operator below receives the state as `const take_state&` and decrements `count`).
25 mutable size_t count;
26};
27
// Stateless functor body with a single call operator that processes one incoming value.
// NOTE(review): the declaration line (listing line 28) is missing from this extract, so the
// type's name is not visible here — confirm against the repository.
29{
// Handles one value:
//  - value:      the incoming item; passed on with std::forward, so its value category is kept.
//  - subscriber: receiver of on_next / on_completed calls.
//  - state:      counter holder; taken by const reference, its `count` member is modified here.
// While `state.count` is positive, the counter is decremented and the value is passed to
// `subscriber.on_next`. Afterwards, if the counter is zero, `subscriber.on_completed()` is called.
30 void operator()(auto&& value, const constraint::subscriber auto& subscriber, const take_state& state) const
31 {
32 if (state.count > 0)
33 {
34 --state.count;
35 subscriber.on_next(std::forward<decltype(value)>(value));
36 }
37
// Deliberately a separate `if`, not an `else`: it is evaluated after the decrement above, so
// on_completed() is called in the same invocation that forwards the last permitted value.
// It is also reached when the counter was already zero on entry (no on_next in that case).
38 if (state.count == 0)
39 subscriber.on_completed();
40 }
41};
42
// Class template parameterised on `Type` (constrained by constraint::decayed_type) that stores
// a count and, when invoked with a subscriber, wraps it in a new subscriber carrying a take_state.
// NOTE(review): the declaration line (listing line 44) is missing from this extract, so the
// type's name is not visible here — confirm against the repository.
43template<constraint::decayed_type Type>
45{
// Initial value copied into the `take_state` created for each wrapped subscriber.
46 size_t count;
47
// Wraps `subscriber` (any TSub satisfying constraint::subscriber_of_type<Type>):
// takes its subscription via get_subscription(), then returns the result of
// create_subscriber_with_dynamic_state<Type>, called with that subscription,
// utils::forwarding_on_error{}, utils::forwarding_on_completed{}, the forwarded subscriber
// and a fresh take_state{count}.
48 template<constraint::subscriber_of_type<Type> TSub>
49 auto operator()(TSub&& subscriber) const
50 {
51 auto subscription = subscriber.get_subscription();
52 // dynamic_state there to make shared_ptr for observer instead of making shared_ptr for state
// NOTE(review): listing line 54 (an argument between the subscription and
// forwarding_on_error) is missing from this extract — presumably the on_next handler; confirm.
53 return create_subscriber_with_dynamic_state<Type>(std::move(subscription),
55 utils::forwarding_on_error{},
56 utils::forwarding_on_completed{},
57 std::forward<TSub>(subscriber),
58 take_state{count});
59 }
60};
61} // namespace rpp::details
Definition: constraints.hpp:19
Definition: take.hpp:45
Definition: take.hpp:29
Definition: take.hpp:24