12#include <rpp/subscribers/constraints.hpp>
13#include <rpp/operators/details/subscriber_with_state.hpp>
14#include <rpp/subscriptions/composite_subscription.hpp>
/// @brief Locks the lockable object owned by @p ptr for the lifetime of the returned guard.
///
/// @tparam T lockable type; it must provide `lock()` and `unlock()` as required by `std::lock_guard`.
/// @param ptr shared pointer to the lockable object. It is dereferenced unconditionally,
///            so it must not be null.
/// @return a `std::lock_guard<T>` holding the lock; the lock is released when the guard is destroyed.
///
/// The result is marked `[[nodiscard]]` because dropping the guard would release the lock
/// immediately and leave the following code unprotected.
template<typename T>
[[nodiscard]] auto lock(const std::shared_ptr<T>& ptr)
{
    // std::lock_guard is neither copyable nor movable: returning the prvalue relies on
    // guaranteed copy elision (C++17).
    return std::lock_guard{*ptr};
}
/// @brief Locks the lockable object referenced by @p ref for the lifetime of the returned guard.
///
/// @tparam T lockable type; it must provide `lock()` and `unlock()` as required by `std::lock_guard`.
/// @param ref reference wrapper around the lockable object. The referenced object must outlive
///            the returned guard.
/// @return a `std::lock_guard<T>` holding the lock; the lock is released when the guard is destroyed.
///
/// The result is marked `[[nodiscard]]` because dropping the guard would release the lock
/// immediately and leave the following code unprotected.
template<typename T>
[[nodiscard]] auto lock(const std::reference_wrapper<T>& ref)
{
    // std::lock_guard is neither copyable nor movable: returning the prvalue relies on
    // guaranteed copy elision (C++17).
    return std::lock_guard{ref.get()};
}
/// @brief Stateless callable that forwards an emitted value to `subscriber.on_next`
///        while a lock on the serialization primitive is held.
struct forwarding_on_next_under_lock
{
    /// @param v          value to deliver; perfectly forwarded to `on_next`.
    /// @param subscriber downstream subscriber that receives the value.
    /// @param primitive  handle to the lockable object; it must be accepted by one of the
    ///                   `lock()` overloads declared in this header.
    template<typename T>
    void operator()(T&& v, const auto& subscriber, const auto& primitive) const
    {
        // The guard lives until the end of the scope, so on_next runs entirely under the lock.
        auto lock_guard = lock(primitive);
        subscriber.on_next(std::forward<T>(v));
    }
};
/// @brief Stateless callable that forwards an error to `subscriber.on_error`
///        while a lock on the serialization primitive is held.
struct forwarding_on_error_under_lock
{
    /// @param err        exception to deliver to the subscriber.
    /// @param subscriber downstream subscriber that receives the error.
    /// @param primitive  handle to the lockable object; it must be accepted by one of the
    ///                   `lock()` overloads declared in this header.
    void operator()(const std::exception_ptr& err,
                    const auto&               subscriber,
                    const auto&               primitive) const
    {
        // The guard lives until the end of the scope, so on_error runs entirely under the lock.
        auto lock_guard = lock(primitive);
        subscriber.on_error(err);
    }
};
/// @brief Stateless callable that forwards completion to `subscriber.on_completed`
///        while a lock on the serialization primitive is held.
struct forwarding_on_completed_under_lock
{
    /// @param subscriber downstream subscriber that is notified about completion.
    /// @param primitive  handle to the lockable object; it must be accepted by one of the
    ///                   `lock()` overloads declared in this header.
    void operator()(const auto& subscriber, const auto& primitive) const
    {
        // The guard lives until the end of the scope, so on_completed runs entirely under the lock.
        auto lock_guard = lock(primitive);
        subscriber.on_completed();
    }
};
63template<
typename TSerializationPrimitive, constra
int::subscriber TSub>
64auto make_serialized_subscriber(TSub&& subscriber,
65 const std::shared_ptr<TSerializationPrimitive>& primitive)
67 auto sub = subscriber.get_subscription();
68 return create_subscriber_with_state<utils::extract_subscriber_type_t<std::decay_t<TSub>>>(std::move(sub),
70 forwarding_on_error_under_lock{},
71 forwarding_on_completed_under_lock{},
72 std::forward<TSub>(subscriber),
76template<
typename TSerializationPrimitive, constra
int::subscriber TSub>
77auto make_serialized_subscriber(TSub&& subscriber,
78 std::reference_wrapper<TSerializationPrimitive> primitive)
80 auto sub = subscriber.get_subscription();
81 return create_subscriber_with_state<utils::extract_subscriber_type_t<std::decay_t<TSub>>>(std::move(sub),
82 forwarding_on_next_under_lock{},
83 forwarding_on_error_under_lock{},
84 forwarding_on_completed_under_lock{},
85 std::forward<TSub>(subscriber),
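// Doxygen cross-reference residue from the scraped listing. The targets are presumably
// forwarding_on_completed_under_lock, forwarding_on_error_under_lock and
// forwarding_on_next_under_lock, in that order - confirm against the original page.
// Definition: serialized_subscriber.hpp:55
// Definition: serialized_subscriber.hpp:44
// Definition: serialized_subscriber.hpp:34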