ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
take_until.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/disposables/composite_disposable.hpp>
17#include <rpp/schedulers/current_thread.hpp>
18#include <rpp/utils/utils.hpp>
19
20namespace rpp::operators::details
21{
22 template<rpp::constraint::observer TObserver>
23 class take_until_disposable final : public rpp::composite_disposable
24 {
25 public:
26 take_until_disposable(TObserver&& observer)
27 : m_observer_with_mutex(std::move(observer))
28 {
29 }
30
31 take_until_disposable(const TObserver& observer)
32 : m_observer_with_mutex(observer)
33 {
34 }
35
36 bool is_stopped() const { return m_stopped; }
37 bool stop_return_was_stopped() { return m_stopped.exchange(true); }
38
39 rpp::utils::pointer_under_lock<TObserver> get_observer() { return m_observer_with_mutex; }
40
41 private:
42 rpp::utils::value_with_mutex<TObserver> m_observer_with_mutex{};
43 std::atomic_bool m_stopped{};
44 };
45
46 template<rpp::constraint::observer TObserver>
48 {
49 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto;
50
51 std::shared_ptr<take_until_disposable<TObserver>> state;
52
53 void on_error(const std::exception_ptr& err) const
54 {
55 if (!state->stop_return_was_stopped())
56 state->get_observer()->on_error(err);
57 }
58
59 void on_completed() const
60 {
61 if (!state->stop_return_was_stopped())
62 state->get_observer()->on_completed();
63 }
64
65 void set_upstream(const disposable_wrapper& d) { state->add(d); }
66
67 bool is_disposed() const { return state->is_disposed(); }
68 };
69
70 template<rpp::constraint::observer TObserver>
72 {
73 template<typename T>
74 void on_next(const T&) const
75 {
76 if (!take_until_observer_strategy_base<TObserver>::state->stop_return_was_stopped())
77 take_until_observer_strategy_base<TObserver>::state->get_observer()->on_completed();
78 }
79 };
80
81 template<rpp::constraint::observer TObserver>
83 {
84 template<typename T>
85 void on_next(T&& v) const
86 {
87 if (!take_until_observer_strategy_base<TObserver>::state->is_stopped())
88 take_until_observer_strategy_base<TObserver>::state->get_observer()->on_next(std::forward<T>(v));
89 }
90 };
91
92 template<rpp::constraint::observable TObservable>
94 {
95 RPP_NO_UNIQUE_ADDRESS TObservable observable{};
96
97 template<rpp::constraint::decayed_type T>
99 {
100 using result_type = T;
101
102 constexpr static bool own_current_queue = true;
103 };
104
105 template<rpp::details::observables::constraint::disposables_strategy Prev>
106 using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>;
107
108 template<rpp::constraint::decayed_type Type, rpp::constraint::observer Observer>
109 auto lift(Observer&& observer) const
110 {
111 const auto d = disposable_wrapper_impl<take_until_disposable<std::decay_t<Observer>>>::make(std::forward<Observer>(observer));
112 auto ptr = d.lock();
113 ptr->get_observer()->set_upstream(d.as_weak());
114
115 observable.subscribe(take_until_throttle_observer_strategy<std::decay_t<Observer>>{ptr});
117 }
118 };
119} // namespace rpp::operators::details
120
121namespace rpp::operators
122{
143 * @ingroup conditional_operators
144 * @see https://reactivex.io/documentation/operators/takeuntil.html
145 */
146 template<rpp::constraint::observable TObservable>
147 auto take_until(TObservable&& until_observable)
148 {
149 return details::take_until_t<std::decay_t<TObservable>>{std::forward<TObservable>(until_observable)};
150 }
151} // namespace rpp::operators
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
Main RPP wrapper over disposables.
Definition disposable_wrapper.hpp:142
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition utils.hpp:260
auto take_until(TObservable &&until_observable)
Discard any items emitted by an Observable after a second Observable emits an item or terminates.
Definition take_until.hpp:142
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
Definition disposables_strategy.hpp:29
Definition take_until.hpp:94