ReactivePlusPlus
Yet another implementation of the ReactiveX approach in C++, written with performance and templates in mind
 
Loading...
Searching...
No Matches
distinct_until_changed.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/defs.hpp> // RPP_NO_UNIQUE_ADDRESS
14#include <rpp/operators/lift.hpp> // required due to operator uses lift
15#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_dynamic_state
16#include <rpp/operators/fwd/distinct_until_changed.hpp> // own forwarding
17#include <rpp/subscribers/constraints.hpp> // constraint::subscriber_of_type
18#include <rpp/utils/functors.hpp> // forwarding_on_error/forwarding_on_completed
19#include <rpp/utils/utilities.hpp> // as_const
20
21#include <optional>
22
23
24IMPLEMENTATION_FILE(distinct_until_changed_tag);
25
26namespace rpp::details
27{
28template<constraint::decayed_type Type, std::equivalence_relation<Type, Type> EqualityFn>
30{
31 RPP_NO_UNIQUE_ADDRESS EqualityFn equality_comparator;
32 mutable std::optional<Type> last_value{};
33};
34
36{
37 template<constraint::decayed_type Type, std::equivalence_relation<Type, Type> EqualityFn>
38 void operator()(auto&& new_value,
39 const constraint::subscriber auto& sub,
41 {
42 if (state.last_value.has_value() &&
43 state.equality_comparator(utils::as_const(state.last_value.value()),
44 utils::as_const(new_value)))
45 return;
46
47 state.last_value.emplace(new_value);
48 sub.on_next(std::forward<decltype(new_value)>(new_value));
49 }
50};
51
52template<constraint::decayed_type Type, std::equivalence_relation<Type, Type> EqualityFn>
54{
55 RPP_NO_UNIQUE_ADDRESS EqualityFn equality_comparator;
56
57 template<constraint::subscriber_of_type<Type> TSub>
58 auto operator()(TSub&& subscriber) const
59 {
60 auto subscription = subscriber.get_subscription();
61 // dynamic_state there to make shared_ptr for observer instead of making shared_ptr for state
62 return create_subscriber_with_dynamic_state<Type>(std::move(subscription),
64 utils::forwarding_on_error{},
65 utils::forwarding_on_completed{},
66 std::forward<TSub>(subscriber),
68 }
69};
70} // namespace rpp::details
Definition: constraints.hpp:19
Definition: distinct_until_changed.hpp:54
Definition: distinct_until_changed.hpp:36
Definition: distinct_until_changed.hpp:30