ReactivePlusPlus
Yet another implementation of the ReactiveX approach in C++, written with performance and templates in mind
 
Loading...
Searching...
No Matches
group_by.hpp
1#pragma once
2
3#include <rpp/defs.hpp> // RPP_NO_UNIQUE_ADDRESS
4#include <rpp/observables/grouped_observable.hpp> // grouped_observable
5#include <rpp/operators/lift.hpp> // required due to operator uses lift
6#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
7#include <rpp/operators/fwd/group_by.hpp> // own forwarding
8#include <rpp/subjects/publish_subject.hpp> // publish_subject
9#include <rpp/subscribers/constraints.hpp> // constraint::subscriber
10#include <rpp/utils/utilities.hpp> // utils::as_const
11
12#include <atomic>
13#include <map>
14
15IMPLEMENTATION_FILE(group_by_tag);
16
17namespace rpp::details
18{
// Base part of the group_by state: counts the subscriptions registered through on_subscribe()
// and owns the `lifetime` subscription that is unsubscribed once the last of them is released.
// Derives from std::enable_shared_from_this so on_subscribe() can hand out weak references to itself.
19class group_by_state_base : public std::enable_shared_from_this<group_by_state_base>
20{
21public:
22 group_by_state_base() = default;
23
// Virtual destructor: the class is used as a base (see the weak_ptr<group_by_state_base> held by group_by_on_subscribe).
24 virtual ~group_by_state_base() noexcept = default;
25
// Registers one more subscriber tied to `dest`.
// Increments the counter first, then attaches a callback to `dest` that decrements it again.
// NOTE(review): presumably `dest` invokes the added callback when it is unsubscribed - confirm against composite_subscription::add.
26 void on_subscribe(const composite_subscription& dest)
27 {
28 subscribers.fetch_add(1, std::memory_order::acq_rel);
// Only a weak reference is captured, so the callback itself does not keep the state alive.
29 dest.add([state = this->weak_from_this()]
30 {
// If the state is already destroyed there is nothing to release.
31 if (const auto locked = state.lock())
// fetch_sub returns the previous value: 1 means this was the last registered subscriber.
32 if (locked->subscribers.fetch_sub(1, std::memory_order::acq_rel) == 1)
33 locked->lifetime.unsubscribe();
34 });
35 }
36
// Read-only access to the subscription that is unsubscribed when the subscriber count drops to zero.
37 const auto& get_source_lifetime() const { return lifetime; }
38
39private:
// Subscription unsubscribed by the last released subscriber (see on_subscribe).
40 composite_subscription lifetime{};
// Number of subscriptions currently registered via on_subscribe().
41 std::atomic_size_t subscribers{};
42};
43
// State that maps each key to the subject of its group.
// NOTE(review): listing lines 45 and 48 are absent from this extract (numbering jumps 44 -> 46 and 47 -> 49),
// so the declaration line and the first member initializer are not visible here - restore them from the original header.
44template<constraint::decayed_type TKey, constraint::decayed_type Type, std::strict_weak_order<TKey, TKey> KeyComparator>
46{
// Builds the key -> subject map with the supplied comparator.
47 group_by_state(const KeyComparator& comparator)
49 , key_to_subject{comparator} { }
50
// One publish_subject per key, ordered by KeyComparator.
51 std::map<TKey, subjects::publish_subject<Type>, KeyComparator> key_to_subject;
52
// Applies `action` to the subscriber of every stored subject (in map order) and finally to `subscriber`.
53 void broadcast(const auto& action, const auto& subscriber) const
54 {
55 for (const auto& [_, subject] : key_to_subject)
56 action(subject.get_subscriber());
57
58 action(subscriber);
59 }
60};
61
// Callable that subscribes a subscriber to one group's subject.
// NOTE(review): listing lines 63 and 65 are absent from this extract; the declaration line and the `subject`
// member used in operator() are not visible here - restore them from the original header.
62template<constraint::decayed_type Type>
64{
// Weak reference to the shared group_by state; it does not keep that state alive.
66 std::weak_ptr<group_by_state_base> state;
67
// Subscribes `subscriber` to the group's subject, but only while the shared state is still alive.
// If the state has expired, the call does nothing.
68 void operator()(auto&& subscriber) const
69 {
70 if (const auto locked = state.lock())
71 {
// Count this subscriber in the shared state before subscribing it to the subject.
72 locked->on_subscribe(subscriber.get_subscription());
73 subject.get_observable().subscribe(std::forward<decltype(subscriber)>(subscriber));
74 }
75 }
76};
77} // namespace rpp::details
78
// NOTE(review): listing line 82 is absent from this extract, so the declaration this template header introduces is not visible.
// Presumably it declares grouped_observable_group_by<TKey, ResValue>, which is used at listing lines 117 - confirm against the original header.
79namespace rpp
80{
81template<constraint::decayed_type TKey, constraint::decayed_type ResValue>
83}
84
85namespace rpp::details
86{
// Lifter for group_by: called with a downstream subscriber, it returns a subscriber that
// routes each incoming value to a per-key subject and emits a grouped observable for every new key.
// NOTE(review): listing lines 92 and 95 are absent from this extract; the declaration line and the
// definition of `StateType` are not visible here - restore them from the original header.
87template<constraint::decayed_type Type,
88 constraint::decayed_type TKey,
89 std::invocable<Type> KeySelector,
90 std::invocable<Type> ValueSelector,
91 std::strict_weak_order<TKey, TKey> KeyComparator>
93{
// Type of the values pushed into each group: the decayed result of ValueSelector applied to Type.
94 using ValueType = utils::decayed_invoke_result_t<ValueSelector, Type>;
96
97 RPP_NO_UNIQUE_ADDRESS KeySelector key_selector;
98 RPP_NO_UNIQUE_ADDRESS ValueSelector value_selector;
99 RPP_NO_UNIQUE_ADDRESS KeyComparator comparator;
100
101private:
// Data passed to the callbacks below: the shared state plus copies of both selectors.
102 struct observer_state
103 {
104 std::shared_ptr<StateType> state;
105 RPP_NO_UNIQUE_ADDRESS KeySelector key_selector;
106 RPP_NO_UNIQUE_ADDRESS ValueSelector value_selector;
107 };
108
// Handles one incoming value: finds or creates the group for its key and forwards the mapped value to it.
109 struct on_next
110 {
111 void operator()(auto&& val, const auto& subscriber, const observer_state& state) const
112 {
// The key is computed from a const view of the value; the value itself is forwarded to value_selector below.
113 auto key = state.key_selector(utils::as_const(val));
// try_emplace inserts a default-constructed subject only if the key is not present yet.
114 auto [itr, inserted] = state.state->key_to_subject.try_emplace(key);
115
// New key: emit the grouped observable downstream before the first value is pushed into the group.
116 if (inserted)
117 subscriber.on_next(grouped_observable_group_by<TKey, ValueType>{key, group_by_on_subscribe<ValueType>{itr->second, state.state}});
118
// The value is forwarded only while the group's subscriber is still subscribed.
119 const auto& subject_sub = itr->second.get_subscriber();
120 if (subject_sub.is_subscribed())
121 subject_sub.on_next(state.value_selector(std::forward<decltype(val)>(val)));
122 }
123 };
124
// Forwards the error to every group's subscriber and then to the downstream subscriber.
125 struct on_error
126 {
127 void operator()(const std::exception_ptr& err, const auto& subscriber, const observer_state& state) const
128 {
129 state.state->broadcast([&err](const auto& sub) { sub.on_error(err); }, subscriber);
130 }
131 };
132
// Forwards completion to every group's subscriber and then to the downstream subscriber.
133 struct on_completed
134 {
135 void operator()(const auto& subscriber, const observer_state& state) const
136 {
137 state.state->broadcast([](const auto& sub) { sub.on_completed(); }, subscriber);
138 }
139 };
140
141public:
// Creates a fresh state for this subscription, registers the downstream subscriber in it and
// returns the subscriber built by create_subscriber_with_state on the state's source lifetime.
142 template<constraint::subscriber TSub>
143 auto operator()(TSub&& subscriber) const
144 {
145 auto state = std::make_shared<StateType>(comparator);
146
// The downstream subscriber counts as one subscriber of the shared state.
147 state->on_subscribe(subscriber.get_subscription());
148
149 return create_subscriber_with_state<Type>(state->get_source_lifetime(),
150 on_next{},
151 on_error{},
152 on_completed{},
153 std::forward<TSub>(subscriber),
154 observer_state{state, key_selector, value_selector});
155 }
156};
157
// Applies group_by to `observable` by lifting it with a group_by_lift_impl built from the given
// key selector, value selector and key comparator (each stored as its decayed type).
// NOTE(review): listing line 165 is absent from this extract, so the definition of `Res` used in
// lift<Res> is not visible here - restore it from the original header.
158template<constraint::decayed_type Type,
159 constraint::decayed_type TKey,
160 std::invocable<Type> KeySelector,
161 std::invocable<Type> ValueSelector,
162 std::strict_weak_order<TKey, TKey> KeyComparator>
163auto group_by_impl(auto&& observable, KeySelector&& key_selector, ValueSelector&& value_selector, KeyComparator&& comparator)
164{
166 using Lifter = group_by_lift_impl<Type, TKey, std::decay_t<KeySelector>, std::decay_t<ValueSelector>, std::decay_t<KeyComparator>>;
167
// The observable and all three callables are perfectly forwarded into the lifter.
168 return std::forward<decltype(observable)>(observable)
169 .template lift<Res>(Lifter{std::forward<KeySelector>(key_selector),
170 std::forward<ValueSelector>(value_selector),
171 std::forward<KeyComparator>(comparator)});
172}
173} // namespace rpp::details
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
std::weak_ptr< details::subscription_state > add(const TSub &sub=TSub{}) const
Add any other subscription to this as dependent.
Definition: composite_subscription.hpp:43
Definition: group_by.hpp:20
Definition: grouped_observable.hpp:20
Subject which just multicasts values to observers subscribed on it. It contains two parts: subscriber...
Definition: publish_subject.hpp:78
Definition: group_by.hpp:93
Definition: group_by.hpp:64
Definition: group_by.hpp:46