ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
take_while.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/operators/details/strategy.hpp>
17
18namespace rpp::operators::details
19{
20 template<rpp::constraint::observer TObserver, rpp::constraint::decayed_type Fn>
22 {
23 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
24
25 RPP_NO_UNIQUE_ADDRESS TObserver observer;
26 RPP_NO_UNIQUE_ADDRESS Fn fn;
27
28 template<typename T>
29 void on_next(T&& v) const
30 {
31 if (fn(rpp::utils::as_const(v)))
32 observer.on_next(std::forward<T>(v));
33 else
35 }
36
37 void on_error(const std::exception_ptr& err) const { observer.on_error(err); }
38
39 void on_completed() const { observer.on_completed(); }
40
41 void set_upstream(const disposable_wrapper& d) { observer.set_upstream(d); }
42
43 bool is_disposed() const { return observer.is_disposed(); }
44 };
45
46 template<rpp::constraint::decayed_type Fn>
47 struct take_while_t : lift_operator<take_while_t<Fn>, Fn>
48 {
49 using lift_operator<take_while_t<Fn>, Fn>::lift_operator;
50
51 template<rpp::constraint::decayed_type T>
53 {
54 static_assert(std::is_invocable_r_v<bool, Fn, T>, "Fn is not invocable with T returning bool");
55
56 using result_type = T;
57
58 template<rpp::constraint::observer_of_type<result_type> TObserver>
60 };
61
62 template<rpp::details::observables::constraint::disposables_strategy Prev>
63 using updated_optimal_disposables_strategy = Prev;
64 };
65} // namespace rpp::operators::details
66
67namespace rpp::operators
68{
93 template<typename Fn>
94 requires (!utils::is_not_template_callable<Fn> || std::same_as<bool, std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
95 auto take_while(Fn&& predicate)
96 {
97 return details::take_while_t<std::decay_t<Fn>>{std::forward<Fn>(predicate)};
98 }
99} // namespace rpp::operators
void on_next(const Type &v) const noexcept
Observable calls this method to notify observer about new value.
Definition observer.hpp:84
void set_upstream(const disposable_wrapper &d) noexcept
Observable calls this method to pass disposable. Observer disposes this disposable WHEN observer want...
Definition observer.hpp:49
void on_completed() const noexcept
Observable calls this method to notify observer about completion of emissions.
Definition observer.hpp:135
void on_error(const std::exception_ptr &err) const noexcept
Observable calls this method to notify observer about some error during generation next data.
Definition observer.hpp:120
bool is_disposed() const noexcept
Observable calls this method to check if observer interested or not in emissions.
Definition observer.hpp:74
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition strategy.hpp:28
Definition function_traits.hpp:45
auto take_while(Fn &&predicate)
Sends items from observable while items are satisfy predicate. When condition becomes false -> sends ...
Definition take_while.hpp:91
Definition take_while.hpp:48