ReactivePlusPlus
One more implementation of the ReactiveX approach in C++, written with performance and templates in mind
 
Loading...
Searching...
No Matches
last.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// TC Wang 2022 - present.
5// Distributed under the Boost Software License, Version 1.0.
6// (See accompanying file LICENSE_1_0.txt or copy at
7// https://www.boost.org/LICENSE_1_0.txt)
8//
9// Project home: https://github.com/victimsnino/ReactivePlusPlus
10//
11
12#pragma once
13
14#include <rpp/operators/lift.hpp> // required due to operator uses lift
15#include <rpp/operators/take_last.hpp> // take_last
16#include <rpp/operators/fwd/last.hpp> // own forwarding
17#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
18#include <rpp/subscribers/constraints.hpp> // constraint::subscriber
19#include <rpp/utils/exceptions.hpp> // not_enough_emissions
20#include <rpp/utils/functors.hpp> // forwarding_on_error
21
22IMPLEMENTATION_FILE(last_tag);
23
24namespace rpp::details
25{
26template<constraint::decayed_type Type>
27struct last_state : public take_last_state<Type>
28{
29 explicit last_state() : take_last_state<Type>{1} {}
30};
31
36
41{
42 template<constraint::decayed_type Type>
43 void operator()(const constraint::subscriber auto& subscriber,
44 const last_state<Type>& state) const
45 {
46 auto&& last_value = state.items.at(0);
47 if (!last_value.has_value())
48 {
49 subscriber.on_error(std::make_exception_ptr(utils::not_enough_emissions{"last() operator expects at least one emission from observable before completion"}));
50 return;
51 }
52
53 subscriber.on_next(std::move(last_value.value()));
54 subscriber.on_completed();
55 }
56};
57
58template<constraint::decayed_type Type>
60{
61public:
// Builds the subscriber used by the last() operator around the given downstream subscriber.
// Takes the subscription from `subscriber`, then returns the result of
// create_subscriber_with_dynamic_state<Type>, passing it the moved subscription,
// utils::forwarding_on_error{} and the perfectly-forwarded subscriber.
// NOTE(review): this listing omits original lines 68, 70 and 72, so the remaining
// arguments and the closing of the call are not visible here - confirm against the real header.
62 template<constraint::subscriber_of_type<Type> TSub>
63 auto operator()(TSub&& subscriber) const
64 {
// Read the subscription first: `subscriber` is forwarded (possibly moved) in the call below.
65 auto subscription = subscriber.get_subscription();
66
67 return create_subscriber_with_dynamic_state<Type>(std::move(subscription),
69 utils::forwarding_on_error{},
71 std::forward<TSub>(subscriber),
73 }
74};
75} // namespace rpp::details
Definition: constraints.hpp:19
Definition: last.hpp:60
Definition: last.hpp:41
Definition: last.hpp:28
Definition: take_last.hpp:42
Definition: take_last.hpp:28