4#include <rpp/observables/grouped_observable.hpp>
5#include <rpp/operators/lift.hpp>
6#include <rpp/operators/details/subscriber_with_state.hpp>
7#include <rpp/operators/fwd/group_by.hpp>
8#include <rpp/subjects/publish_subject.hpp>
9#include <rpp/subscribers/constraints.hpp>
10#include <rpp/utils/utilities.hpp>
15IMPLEMENTATION_FILE(group_by_tag);
28 subscribers.fetch_add(1, std::memory_order::acq_rel);
29 dest.
add([state = this->weak_from_this()]
31 if (
const auto locked = state.lock())
32 if (locked->subscribers.fetch_sub(1, std::memory_order::acq_rel) == 1)
33 locked->lifetime.unsubscribe();
37 const auto& get_source_lifetime()
const {
return lifetime; }
41 std::atomic_size_t subscribers{};
44template<constra
int::decayed_type TKey, constra
int::decayed_type Type, std::strict_weak_order<TKey, TKey> KeyComparator>
49 , key_to_subject{comparator} { }
51 std::map<TKey, subjects::publish_subject<Type>, KeyComparator> key_to_subject;
53 void broadcast(
const auto& action,
const auto& subscriber)
const
55 for (
const auto& [_, subject] : key_to_subject)
56 action(subject.get_subscriber());
62template<constra
int::decayed_type Type>
66 std::weak_ptr<group_by_state_base> state;
68 void operator()(
auto&& subscriber)
const
70 if (
const auto locked = state.lock())
72 locked->on_subscribe(subscriber.get_subscription());
73 subject.get_observable().subscribe(std::forward<
decltype(subscriber)>(subscriber));
81template<constra
int::decayed_type TKey, constra
int::decayed_type ResValue>
87template<constraint::decayed_type Type,
88 constraint::decayed_type TKey,
89 std::invocable<Type> KeySelector,
90 std::invocable<Type> ValueSelector,
91 std::strict_weak_order<TKey, TKey> KeyComparator>
94 using ValueType = utils::decayed_invoke_result_t<ValueSelector, Type>;
97 RPP_NO_UNIQUE_ADDRESS KeySelector key_selector;
98 RPP_NO_UNIQUE_ADDRESS ValueSelector value_selector;
99 RPP_NO_UNIQUE_ADDRESS KeyComparator comparator;
102 struct observer_state
104 std::shared_ptr<StateType> state;
105 RPP_NO_UNIQUE_ADDRESS KeySelector key_selector;
106 RPP_NO_UNIQUE_ADDRESS ValueSelector value_selector;
111 void operator()(
auto&& val,
const auto& subscriber,
const observer_state& state)
const
113 auto key = state.key_selector(utils::as_const(val));
114 auto [itr, inserted] = state.state->key_to_subject.try_emplace(key);
119 const auto& subject_sub = itr->second.get_subscriber();
120 if (subject_sub.is_subscribed())
121 subject_sub.on_next(state.value_selector(std::forward<
decltype(val)>(val)));
127 void operator()(
const std::exception_ptr& err,
const auto& subscriber,
const observer_state& state)
const
129 state.state->broadcast([&err](
const auto& sub) { sub.on_error(err); }, subscriber);
135 void operator()(
const auto& subscriber,
const observer_state& state)
const
137 state.state->broadcast([](
const auto& sub) { sub.on_completed(); }, subscriber);
142 template<constra
int::subscriber TSub>
143 auto operator()(TSub&& subscriber)
const
145 auto state = std::make_shared<StateType>(comparator);
147 state->on_subscribe(subscriber.get_subscription());
149 return create_subscriber_with_state<Type>(state->get_source_lifetime(),
153 std::forward<TSub>(subscriber),
154 observer_state{state, key_selector, value_selector});
158template<constraint::decayed_type Type,
159 constraint::decayed_type TKey,
160 std::invocable<Type> KeySelector,
161 std::invocable<Type> ValueSelector,
162 std::strict_weak_order<TKey, TKey> KeyComparator>
163auto group_by_impl(
auto&& observable, KeySelector&& key_selector, ValueSelector&& value_selector, KeyComparator&& comparator)
168 return std::forward<decltype(observable)>(observable)
169 .template lift<Res>(Lifter{std::forward<KeySelector>(key_selector),
170 std::forward<ValueSelector>(value_selector),
171 std::forward<KeyComparator>(comparator)});
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
std::weak_ptr< details::subscription_state > add(const TSub &sub=TSub{}) const
Add any other subscription to this as dependent.
Definition: composite_subscription.hpp:43
Definition: group_by.hpp:20
Definition: grouped_observable.hpp:20
Subject which just multicasts values to observers subscribed on it. It contains two parts: subscriber...
Definition: publish_subject.hpp:78
Definition: group_by.hpp:93
Definition: group_by.hpp:64
Definition: group_by.hpp:46