ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
element_at.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/operators/details/strategy.hpp>
17#include <rpp/utils/exceptions.hpp>
18
19#include <cstddef>
20
21namespace rpp::operators::details
22{
23 template<rpp::constraint::observer TObserver>
25 {
26 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
27
28 RPP_NO_UNIQUE_ADDRESS TObserver observer;
29 mutable size_t count;
30
31 template<typename T>
32 void on_next(T&& v) const
33 {
34 if (count)
35 {
36 --count;
37 }
38
39 if (!count)
40 {
41 observer.on_next(std::forward<T>(v));
42 observer.on_completed();
43 }
44 }
45
46 void on_error(const std::exception_ptr& err) const { observer.on_error(err); }
47
48 void on_completed() const
49 {
50 if (count)
51 {
52 observer.on_error(std::make_exception_ptr(utils::out_of_range{"index is out of bounds"}));
53 }
54 else
55 {
56 observer.on_completed();
57 }
58 }
59
60 void set_upstream(const disposable_wrapper& d) { observer.set_upstream(d); }
61
62 bool is_disposed() const { return observer.is_disposed(); }
63 };
64
65 struct element_at_t : lift_operator<element_at_t, size_t>
66 {
67 using lift_operator<element_at_t, size_t>::lift_operator;
68
69 template<rpp::constraint::decayed_type T>
71 {
72 using result_type = T;
73
74 template<rpp::constraint::observer_of_type<result_type> TObserver>
75 using observer_strategy = element_at_observer_strategy<TObserver>;
76 };
77
78 template<rpp::details::observables::constraint::disposables_strategy Prev>
79 using updated_optimal_disposables_strategy = Prev;
80 };
81} // namespace rpp::operators::details
82
83namespace rpp::operators
84{
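/**
 * @brief Emit item located at specified index location in the sequence of items emitted by the source observable
 *
 * @param index position of the item to emit; the implementation passes `index + 1` as the number of items to wait for, i.e. the index is zero-based
 *
 * @ingroup filtering_operators
 * @see https://reactivex.io/documentation/operators/elementat.html
 */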
101 inline auto element_at(size_t index)
102 {
103 return details::element_at_t{index + 1};
104 }
105} // namespace rpp::operators
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
auto element_at(size_t index)
Emit item located at specified index location in the sequence of items emitted by the source observable.
Definition element_at.hpp:97
Definition element_at.hpp:66
Definition exceptions.hpp:33