13#include <rpp/operators/fwd.hpp>
15#include <rpp/defs.hpp>
16#include <rpp/disposables/composite_disposable.hpp>
17#include <rpp/schedulers/current_thread.hpp>
18#include <rpp/utils/utils.hpp>
20namespace rpp::operators::details
22 template<rpp::constra
int::observer TObserver>
26 take_until_disposable(TObserver&&
observer)
27 : m_observer_with_mutex(std::move(
observer))
31 take_until_disposable(
const TObserver&
observer)
36 bool is_stopped()
const {
return m_stopped; }
37 bool stop_return_was_stopped() {
return m_stopped.exchange(
true); }
39 rpp::utils::pointer_under_lock<TObserver> get_observer() {
return m_observer_with_mutex; }
43 std::atomic_bool m_stopped{};
46 template<rpp::constra
int::observer TObserver>
49 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto;
51 std::shared_ptr<take_until_disposable<TObserver>> state;
53 void on_error(
const std::exception_ptr& err)
const
55 if (!state->stop_return_was_stopped())
56 state->get_observer()->on_error(err);
59 void on_completed()
const
61 if (!state->stop_return_was_stopped())
62 state->get_observer()->on_completed();
67 bool is_disposed()
const {
return state->is_disposed(); }
70 template<rpp::constra
int::observer TObserver>
74 void on_next(
const T&)
const
76 if (!take_until_observer_strategy_base<TObserver>::state->stop_return_was_stopped())
77 take_until_observer_strategy_base<TObserver>::state->get_observer()->on_completed();
81 template<rpp::constra
int::observer TObserver>
85 void on_next(T&& v)
const
87 if (!take_until_observer_strategy_base<TObserver>::state->is_stopped())
88 take_until_observer_strategy_base<TObserver>::state->get_observer()->on_next(std::forward<T>(v));
92 template<rpp::constra
int::observable TObservable>
95 RPP_NO_UNIQUE_ADDRESS TObservable observable{};
97 template<rpp::constra
int::decayed_type T>
100 using result_type = T;
102 constexpr static bool own_current_queue =
true;
105 template<rpp::details::observables::constra
int::disposables_strategy Prev>
108 template<rpp::constra
int::decayed_type Type, rpp::constra
int::observer Observer>
109 auto lift(Observer&&
observer)
const
113 ptr->get_observer()->set_upstream(d.as_weak());
121namespace rpp::operators
146 template<rpp::constra
int::observable TObservable>
147 auto take_until(TObservable&& until_observable)
Disposable which can keep some other sub-disposables. When this root disposable is disposed,...
Definition composite_disposable.hpp:175
Main RPP wrapper over disposables.
Definition disposable_wrapper.hpp:142
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
auto take_until(TObservable &&until_observable)
Discard any items emitted by an Observable after a second Observable emits an item or terminates.
Definition take_until.hpp:142
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
Definition disposables_strategy.hpp:29
Definition take_until.hpp:48
Definition take_until.hpp:83
Definition take_until.hpp:99
Definition take_until.hpp:94
Definition take_until.hpp:72