ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
group_by.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/fwd.hpp>
14
15#include <rpp/disposables/refcount_disposable.hpp>
16#include <rpp/observables/grouped_observable.hpp>
17#include <rpp/operators/details/strategy.hpp>
18#include <rpp/subjects/publish_subject.hpp>
19#include <rpp/utils/function_traits.hpp>
20
21#include <map>
22#include <type_traits>
23
24namespace rpp::operators::details
25{
// Forward declaration of the strategy type. It is declared ahead of the
// rpp::grouped_observable_group_by alias, which names
// group_by_observable_strategy<ResValue> as the strategy of grouped_observable.
26 template<rpp::constraint::decayed_type T>
27 struct group_by_observable_strategy;
28} // namespace rpp::operators::details
29
30namespace rpp
31{
// Convenience alias: the grouped_observable specialization whose strategy is
// operators::details::group_by_observable_strategy<ResValue>.
//   TKey     - first template argument of grouped_observable (presumably the group key type).
//   ResValue - second template argument of grouped_observable; also the value type handed to the strategy.
32 template<constraint::decayed_type TKey, constraint::decayed_type ResValue>
33 using grouped_observable_group_by = grouped_observable<TKey, ResValue, operators::details::group_by_observable_strategy<ResValue>>;
34} // namespace rpp
35
36namespace rpp::operators::details
37{
// Observer strategy that wraps another observer and passes every notification through to it.
// NOTE(review): listing lines 39 and 44 are absent from this rendering. Presumably line 39 is
// the struct header (subscribe() further down instantiates group_by_inner_observer_strategy
// with an observer and a disposable) and line 44 declares the `disposable` member that
// set_upstream() uses - confirm against the repository file.
38 template<rpp::constraint::observer TObserver>
40 {
41 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
42
// The wrapped observer that receives all forwarded notifications.
43 RPP_NO_UNIQUE_ADDRESS TObserver observer;
45
// Forwards the value to the wrapped observer, preserving its value category.
46 template<typename T>
47 void on_next(T&& v) const
48 {
49 observer.on_next(std::forward<T>(v));
50 }
51
// Forwards the error to the wrapped observer.
52 void on_error(const std::exception_ptr& err) const { observer.on_error(err); }
53
// Forwards completion to the wrapped observer.
54 void on_completed() const { observer.on_completed(); }
55
// Reports the disposed state of the wrapped observer.
56 bool is_disposed() const { return observer.is_disposed(); }
57
// Adds the upstream disposable to `disposable` instead of handing it to the wrapped observer.
58 void set_upstream(const disposable_wrapper& d) const { disposable.add(d); }
59 };
60
// Observer strategy behind group_by: routes each incoming value to a per-key subject observer.
//   T             - type of the incoming values
//   TObserver     - downstream observer
//   KeySelector   - callable invoked on a value to obtain its key (TKey)
//   ValueSelector - callable invoked on a value to obtain what is emitted into the group (Type)
//   KeyComparator - ordering used by the key -> observer map
// NOTE(review): listing lines 62, 78, 114, 129 and 132 are absent from this rendering (the
// embedded numbering skips them). Presumably they hold the struct header, the declaration of
// `ptr`, the final call in on_completed(), the declaration of `subj`, and the start of the call
// that receives `key` and the group strategy - confirm against the repository file.
61 template<rpp::constraint::decayed_type T, rpp::constraint::observer TObserver, rpp::constraint::decayed_type KeySelector, rpp::constraint::decayed_type ValueSelector, rpp::constraint::decayed_type KeyComparator>
63 {
64 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
65
66 using TKey = rpp::utils::decayed_invoke_result_t<KeySelector, T>;
67 using Type = rpp::utils::decayed_invoke_result_t<ValueSelector, T>;
68
69 RPP_NO_UNIQUE_ADDRESS TObserver observer;
70 RPP_NO_UNIQUE_ADDRESS KeySelector key_selector;
71 RPP_NO_UNIQUE_ADDRESS ValueSelector value_selector;
72 RPP_NO_UNIQUE_ADDRESS KeyComparator comparator;
73
74 using subject_observer = decltype(std::declval<subjects::publish_subject<Type>>().get_observer());
75
// Build the map from the stored `comparator` so that the comparator supplied by the caller is
// the one that orders keys. A default-constructed KeyComparator would ignore any state in the
// caller's comparator and would not compile for non-default-constructible comparators.
76 mutable std::map<TKey, subject_observer, KeyComparator> key_to_observer{comparator};
// Initialised by an immediately-invoked lambda that also registers a reference
// (ptr->add_ref()) as the upstream of the downstream observer.
77 std::shared_ptr<refcount_disposable> disposable = [&] {
79 observer.set_upstream(ptr->add_ref());
80 return ptr;
81 }();
82
// Adds the upstream disposable to the shared refcount disposable.
83 void set_upstream(const rpp::disposable_wrapper& d) const
84 {
85 disposable->add(d);
86 }
87
// Disposed state is taken from the shared refcount disposable, not from `observer`.
88 bool is_disposed() const
89 {
90 return disposable->is_disposed();
91 }
92
// Finds (or creates) the subject observer for the key of `val` and, when it exists and is
// not disposed, emits value_selector(val) into it.
93 template<rpp::constraint::decayed_same_as<T> TT>
94 void on_next(TT&& val) const
95 {
96 const auto subject_observer = deduce_observer(observer, val);
97 if (subject_observer && !subject_observer->is_disposed())
98 subject_observer->on_next(value_selector(std::forward<TT>(val)));
99 }
100
// Propagates the error to every known group first, then to the downstream observer.
101 void on_error(const std::exception_ptr& err) const
102 {
103 for (const auto& [key, subject_observer] : key_to_observer)
104 subject_observer.on_error(err);
105
106 observer.on_error(err);
107 }
108
// Completes every known group. Listing line 114 is absent here.
109 void on_completed() const
110 {
111 for (const auto& [key, subject_observer] : key_to_observer)
112 subject_observer.on_completed();
113
115 }
116
117 private:
// Returns a pointer to the subject observer registered for the key of `val`.
// An existing entry is returned as-is. Otherwise nullptr is returned when `obs` is
// disposed; else a new entry is created from `subj` and a pointer to it is returned.
118 template<rpp::constraint::decayed_same_as<T> TT>
119 const subject_observer* deduce_observer(const rpp::constraint::observer auto& obs, const TT& val) const
120 {
// The key selector only ever sees the value as const.
121 const auto key = key_selector(utils::as_const(val));
122
123 if (const auto itr = key_to_observer.find(key); itr != key_to_observer.cend())
124 return &itr->second;
125
126 if (obs.is_disposed())
127 return nullptr;
128
130
// The new subject's disposable is added to the shared refcount disposable as a weak reference.
131 disposable->add(subj.get_disposable().as_weak());
133 key,
134 group_by_observable_strategy<Type>{subj, disposable}});
135
136 return &key_to_observer.emplace(key, subj.get_observer()).first->second;
137 }
138 };
139
// Observable strategy for a single group: subscribing attaches the observer to `subj`.
// NOTE(review): listing lines 141 and 146 are absent from this rendering. Presumably line 141
// is the struct header (deduce_observer() builds group_by_observable_strategy<Type>{subj, disposable})
// and line 146 declares the `subj` member used in subscribe() - confirm against the repository file.
140 template<rpp::constraint::decayed_type T>
142 {
143 using value_type = T;
144 using optimal_disposables_strategy = typename rpp::subjects::publish_subject<T>::optimal_disposables_strategy;
145
// Held weakly, so this strategy does not keep the refcount disposable alive by itself.
147 std::weak_ptr<refcount_disposable> disposable;
148
// Subscribes `obs` to the group's subject. If the weak pointer can no longer be locked,
// nothing happens and `obs` receives no notification from this call.
149 template<rpp::constraint::observer_strategy<T> Strategy>
150 void subscribe(observer<T, Strategy>&& obs) const
151 {
152 if (const auto locked = disposable.lock())
153 {
// Take one more reference for this subscription; it is handed to `obs` as its upstream
// and then moved into the wrapping inner observer strategy.
154 auto d = locked->add_ref();
155 obs.set_upstream(d);
156 subj.get_observable()
157 .subscribe(rpp::observer<T, group_by_inner_observer_strategy<observer<T, Strategy>>>{std::move(obs), std::move(d)});
158 }
159 }
160 };
161
// Operator object created by rpp::operators::group_by. It stores the key selector, value
// selector and key comparator through the inherited lift_operator constructor.
// NOTE(review): listing lines 168, 174, 177 and 181 are absent from this rendering (the
// embedded numbering skips them). Presumably they hold the nested traits struct header, the
// `result_type` alias referenced below, and the alias bodies that follow the two trailing
// template heads - confirm against the repository file.
162 template<rpp::constraint::decayed_type KeySelector, rpp::constraint::decayed_type ValueSelector, rpp::constraint::decayed_type KeyComparator>
163 struct group_by_t : lift_operator<group_by_t<KeySelector, ValueSelector, KeyComparator>, KeySelector, ValueSelector, KeyComparator>
164 {
165 using operators::details::lift_operator<group_by_t<KeySelector, ValueSelector, KeyComparator>, KeySelector, ValueSelector, KeyComparator>::lift_operator;
166
// Per-input-type traits: validates the callables against the upstream value type T.
167 template<rpp::constraint::decayed_type T>
169 {
// Both selectors must return a non-void result when invoked with T.
170 static_assert(!std::same_as<void, std::invoke_result_t<KeySelector, T>>, "KeySelector is not invocable with T");
171 static_assert(!std::same_as<void, std::invoke_result_t<ValueSelector, T>>, "ValueSelector is not invocable with T");
// The comparator must be a strict weak order over the decayed result of KeySelector.
172 static_assert(std::strict_weak_order<KeyComparator, rpp::utils::decayed_invoke_result_t<KeySelector, T>, rpp::utils::decayed_invoke_result_t<KeySelector, T>>, "KeyComparator is not invocable with result of KeySelector");
173
175
176 template<rpp::constraint::observer_of_type<result_type> TObserver>
178 };
179
180 template<rpp::details::observables::constraint::disposables_strategy Prev>
182 };
183} // namespace rpp::operators::details
184
185namespace rpp::operators
186{
// Factory for the group_by operator: returns a details::group_by_t holding decayed copies of
// the three callables, each perfectly forwarded into the operator object.
//   key_selector   - callable producing the key of a value
//   value_selector - callable producing the value emitted inside a group
//   comparator     - callable used to order keys
// The requires-clause checks each callable only when utils::is_not_template_callable holds for
// it (presumably: its call operator is not a template, so it can be probed with
// rpp::utils::convertible_to_any - confirm in function_traits.hpp):
//   - the selectors must not return void when invoked with convertible_to_any;
//   - the comparator must satisfy std::strict_weak_order over convertible_to_any.
// Callables for which the trait does not hold pass this clause unchecked.
216 template<typename KeySelector,
217 typename ValueSelector,
218 typename KeyComparator>
219 requires (
220 (!utils::is_not_template_callable<KeySelector> || !std::same_as<void, std::invoke_result_t<KeySelector, rpp::utils::convertible_to_any>>) && (!utils::is_not_template_callable<ValueSelector> || !std::same_as<void, std::invoke_result_t<ValueSelector, rpp::utils::convertible_to_any>>) && (!utils::is_not_template_callable<KeyComparator> || std::strict_weak_order<KeyComparator, rpp::utils::convertible_to_any, rpp::utils::convertible_to_any>))
221 auto group_by(KeySelector&& key_selector, ValueSelector&& value_selector, KeyComparator&& comparator)
222 {
223 return details::group_by_t<std::decay_t<KeySelector>, std::decay_t<ValueSelector>, std::decay_t<KeyComparator>>{
224 std::forward<KeySelector>(key_selector),
225 std::forward<ValueSelector>(value_selector),
226 std::forward<KeyComparator>(comparator)};
227 }
228} // namespace rpp::operators
void on_next(const Type &v) const noexcept
Observable calls this method to notify the observer about a new value.
Definition observer.hpp:84
void set_upstream(const disposable_wrapper &d) noexcept
Observable calls this method to pass disposable. Observer disposes this disposable WHEN observer want...
Definition observer.hpp:49
void on_completed() const noexcept
Observable calls this method to notify observer about completion of emissions.
Definition observer.hpp:135
void on_error(const std::exception_ptr &err) const noexcept
Observable calls this method to notify the observer about an error that occurred while generating the next data.
Definition observer.hpp:120
bool is_disposed() const noexcept
Observable calls this method to check if observer interested or not in emissions.
Definition observer.hpp:74
static disposable_wrapper_impl make(TArgs &&... args)
Main way to create disposable_wrapper. Passed TTarget type can be any type derived from TDisposable.
Definition disposable_wrapper.hpp:164
Extension over rpp::observable for some "subset" of values from original observable grouped by some k...
Definition grouped_observable.hpp:28
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition strategy.hpp:28
Subject which just multicasts values to observers subscribed on it. It contains two parts: observer a...
Definition publish_subject.hpp:81
Definition fwd.hpp:250
Definition function_traits.hpp:45
auto group_by(KeySelector &&key_selector, ValueSelector &&value_selector={}, KeyComparator &&comparator={})
Divide original observable into multiple observables where each new observable emits some group of va...
Definition group_by.hpp:213
Definition disposables_strategy.hpp:29
Definition group_by.hpp:164