13#include <rpp/operators/fwd.hpp>
15#include <rpp/disposables/refcount_disposable.hpp>
16#include <rpp/observables/grouped_observable.hpp>
17#include <rpp/operators/details/strategy.hpp>
18#include <rpp/subjects/publish_subject.hpp>
19#include <rpp/utils/function_traits.hpp>
24namespace rpp::operators::details
26 template<rpp::constra
int::decayed_type T>
32 template<constra
int::decayed_type TKey, constra
int::decayed_type ResValue>
36namespace rpp::operators::details
38 template<rpp::constra
int::observer TObserver>
41 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
43 RPP_NO_UNIQUE_ADDRESS TObserver observer;
47 void on_next(T&& v)
const
49 observer.on_next(std::forward<T>(v));
52 void on_error(
const std::exception_ptr& err)
const { observer.on_error(err); }
54 void on_completed()
const { observer.on_completed(); }
56 bool is_disposed()
const {
return observer.is_disposed(); }
61 template<rpp::constra
int::decayed_type T, rpp::constra
int::observer TObserver, rpp::constra
int::decayed_type KeySelector, rpp::constra
int::decayed_type ValueSelector, rpp::constra
int::decayed_type KeyComparator>
64 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
66 using TKey = rpp::utils::decayed_invoke_result_t<KeySelector, T>;
67 using Type = rpp::utils::decayed_invoke_result_t<ValueSelector, T>;
69 RPP_NO_UNIQUE_ADDRESS TObserver observer;
70 RPP_NO_UNIQUE_ADDRESS KeySelector key_selector;
71 RPP_NO_UNIQUE_ADDRESS ValueSelector value_selector;
72 RPP_NO_UNIQUE_ADDRESS KeyComparator comparator;
74 using subject_observer =
decltype(std::declval<subjects::publish_subject<Type>>().get_observer());
76 mutable std::map<TKey, subject_observer, KeyComparator> key_to_observer{};
77 std::shared_ptr<refcount_disposable> disposable = [&] {
79 observer.set_upstream(ptr->add_ref());
88 bool is_disposed()
const
90 return disposable->is_disposed();
93 template<rpp::constra
int::decayed_same_as<T> TT>
94 void on_next(TT&& val)
const
96 const auto subject_observer = deduce_observer(observer, val);
97 if (subject_observer && !subject_observer->is_disposed())
98 subject_observer->on_next(value_selector(std::forward<TT>(val)));
101 void on_error(
const std::exception_ptr& err)
const
103 for (
const auto& [key, subject_observer] : key_to_observer)
104 subject_observer.on_error(err);
106 observer.on_error(err);
109 void on_completed()
const
111 for (
const auto& [key, subject_observer] : key_to_observer)
112 subject_observer.on_completed();
114 observer.on_completed();
118 template<rpp::constra
int::decayed_same_as<T> TT>
121 const auto key = key_selector(utils::as_const(val));
123 if (
const auto itr = key_to_observer.find(key); itr != key_to_observer.cend())
126 if (obs.is_disposed())
131 disposable->add(subj.get_disposable().as_weak());
132 obs.on_next(rpp::grouped_observable_group_by<TKey, Type>{
136 return &key_to_observer.emplace(key, subj.get_observer()).first->second;
140 template<rpp::constra
int::decayed_type T>
143 using value_type = T;
144 using optimal_disposables_strategy =
typename rpp::subjects::publish_subject<T>::optimal_disposables_strategy;
147 std::weak_ptr<refcount_disposable> disposable;
149 template<rpp::constra
int::observer_strategy<T> Strategy>
152 if (
const auto locked = disposable.lock())
154 auto d = locked->add_ref();
156 subj.get_observable()
162 template<rpp::constra
int::decayed_type KeySelector, rpp::constra
int::decayed_type ValueSelector, rpp::constra
int::decayed_type KeyComparator>
163 struct group_by_t : lift_operator<group_by_t<KeySelector, ValueSelector, KeyComparator>, KeySelector, ValueSelector, KeyComparator>
167 template<rpp::constra
int::decayed_type T>
170 static_assert(!std::same_as<void, std::invoke_result_t<KeySelector, T>>,
"KeySelector is not invocacble with T");
171 static_assert(!std::same_as<void, std::invoke_result_t<ValueSelector, T>>,
"ValueSelector is not invocable with T");
172 static_assert(std::strict_weak_order<KeyComparator, rpp::utils::decayed_invoke_result_t<KeySelector, T>, rpp::utils::decayed_invoke_result_t<KeySelector, T>>,
"KeyComparator is not invocable with result of KeySelector");
176 template<rpp::constra
int::observer_of_type<result_type> TObserver>
180 template<rpp::details::observables::constra
int::disposables_strategy Prev>
185namespace rpp::operators
216 template<
typename KeySelector,
217 typename ValueSelector,
218 typename KeyComparator>
221 auto group_by(KeySelector&& key_selector, ValueSelector&& value_selector, KeyComparator&& comparator)
224 std::forward<KeySelector>(key_selector),
225 std::forward<ValueSelector>(value_selector),
226 std::forward<KeyComparator>(comparator)};
static disposable_wrapper_impl make(TArgs &&... args)
Main way to create disposable_wrapper. Passed TTarget type can be any type derived from TDisposable.
Definition disposable_wrapper.hpp:164
Extension over rpp::observable for some "subset" of values from original observable grouped by some k...
Definition grouped_observable.hpp:28
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition strategy.hpp:28
Subject which just multicasts values to observers subscribed on it. It contains two parts: observer a...
Definition publish_subject.hpp:81
Definition function_traits.hpp:45
disposable_wrapper_impl< interface_composite_disposable > composite_disposable_wrapper
Wrapper to keep "composite" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:41
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34
Definition disposables_strategy.hpp:29
Definition group_by.hpp:40
Definition group_by.hpp:142
Definition group_by.hpp:63
Definition group_by.hpp:169
Definition group_by.hpp:164