ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
distinct_until_changed.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/defs.hpp>
16#include <rpp/operators/details/strategy.hpp>
17
18#include <type_traits>
19
20namespace rpp::operators::details
21{
// Observer-side state and callbacks of the operator: wraps a downstream observer and
// forwards a value only when it is the first one seen or when `comparator` does not
// report it equal to the previously forwarded value.
//
// Template parameters:
//   Type       - decayed type of the value remembered between emissions.
//   TObserver  - downstream observer that receives the values that are not suppressed.
//   EqualityFn - decayed callable invoked as comparator(previous, current).
//
// NOTE(review): the line that declares the struct's name (listing line 23) is missing
// from this extract; restore it from the upstream header before compiling.
22 template<rpp::constraint::decayed_type Type, rpp::constraint::observer TObserver, rpp::constraint::decayed_type EqualityFn>
24 {
// Disposables mode requested for this strategy: None.
25 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
26
// Downstream observer every callback below delegates to.
27 RPP_NO_UNIQUE_ADDRESS TObserver observer;
// Equality predicate; a `true` result for (stored value, new value) suppresses the new value.
28 RPP_NO_UNIQUE_ADDRESS EqualityFn comparator;
// Last value handed to `observer`; empty until the first on_next call.
// Declared `mutable` because on_next is a const member function and still updates it.
29 mutable std::optional<Type> last_value{};
30
// Handles the next upstream value `v`: returns without emitting when a stored value
// exists and `comparator` returns true for it and `v`; otherwise stores `v` and emits it.
31 template<typename T>
32 void on_next(T&& v) const
33 {
// has_value() is checked first, so the comparator is never called for the first value.
34 if (last_value.has_value() && comparator(utils::as_const(last_value.value()), rpp::utils::as_const(v)))
35 return;
36
// Store first (perfect-forwarding `v`), then emit the stored object via utils::as_const.
37 last_value.emplace(std::forward<T>(v));
38 observer.on_next(utils::as_const(last_value.value()));
39 }
40
// Passes the error straight through to the downstream observer.
41 void on_error(const std::exception_ptr& err) const { observer.on_error(err); }
42
// Passes completion straight through to the downstream observer.
43 void on_completed() const { observer.on_completed(); }
44
// Hands the upstream disposable `d` to the downstream observer.
45 void set_upstream(const disposable_wrapper& d) { observer.set_upstream(d); }
46
// Reports whatever the downstream observer reports as its disposed state.
47 bool is_disposed() const { return observer.is_disposed(); }
48 };
49
// Operator object returned by rpp::operators::distinct_until_changed.
// Derives from operators::details::lift_operator, passing itself and EqualityFn as
// the template arguments.
//
// NOTE(review): listing lines 53, 56 and 63 are missing from this extract (one line after
// the opening brace, the name of the nested trait template, and the alias that follows the
// `TObserver` template header). Restore them from the upstream header before compiling.
50 template<rpp::constraint::decayed_type EqualityFn>
51 struct distinct_until_changed_t : public operators::details::lift_operator<distinct_until_changed_t<EqualityFn>, EqualityFn>
52 {
54
// Per-value-type traits, parameterized on the decayed value type T.
55 template<rpp::constraint::decayed_type T>
57 {
// Compile-time check: EqualityFn must be callable with (T, T) and yield a bool-compatible result.
58 static_assert(rpp::constraint::invocable_r_v<bool, EqualityFn, T, T>, "EqualityFn is not invocable with T and T returning bool");
59
// The result type is the same T: the operator does not change the value type.
60 using result_type = T;
61
// Template header for a declaration taking an observer of `result_type`
// (the declaration itself is the missing listing line 63).
62 template<rpp::constraint::observer_of_type<result_type> TObserver>
64 };
65
// The disposables strategy is passed through unchanged: the alias is simply `Prev`.
66 template<rpp::details::observables::constraint::disposables_strategy Prev>
67 using updated_optimal_disposables_strategy = Prev;
68 };
69} // namespace rpp::operators::details
70
71namespace rpp::operators
72{
97 * @see https://reactivex.io/documentation/operators/distinct.html
98 */
// Factory for the operator: constructs details::distinct_until_changed_t parameterized on
// the decayed EqualityFn and perfect-forwards `equality_fn` into it.
//
// Constraint: the overload is viable when `utils::is_not_template_callable<EqualityFn>` is
// false, or when invoking EqualityFn with two rpp::utils::convertible_to_any arguments
// yields exactly `bool`.
// NOTE(review): the constraint is presumably meant to reject non-generic callables whose
// result is not bool — confirm against the definition of is_not_template_callable.
99 template<typename EqualityFn>
100 requires (!utils::is_not_template_callable<EqualityFn> || std::same_as<bool, std::invoke_result_t<EqualityFn, rpp::utils::convertible_to_any, rpp::utils::convertible_to_any>>)
101 auto distinct_until_changed(EqualityFn&& equality_fn)
102 {
// std::decay_t strips references/cv-qualifiers so the operator stores the callable by value.
103 return details::distinct_until_changed_t<std::decay_t<EqualityFn>>{std::forward<EqualityFn>(equality_fn)};
104 }
105} // namespace rpp::operators
Definition strategy.hpp:28
Definition constraints.hpp:50
Definition function_traits.hpp:45
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
auto distinct_until_changed(EqualityFn &&equality_fn)
Suppresses consecutive duplicate emissions from the original observable.
Definition distinct_until_changed.hpp:97
Definition distinct_until_changed.hpp:52