ReactivePlusPlus
Yet another implementation of the ReactiveX approach in C++, designed with performance and templates in mind
 
Loading...
Searching...
No Matches
serialized_subscriber.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/subscribers/constraints.hpp>
13#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
14#include <rpp/subscriptions/composite_subscription.hpp>
15
16#include <memory>
17#include <mutex>
18
19namespace rpp::details
20{
/**
 * \brief Locks the primitive held by a shared pointer and keeps it locked while the returned guard is alive.
 *
 * \param ptr shared pointer to the lockable primitive. It is dereferenced unconditionally, so it must not be empty.
 * \return std::lock_guard owning the lock; the primitive is unlocked when the guard is destroyed.
 */
template<typename T>
auto lock(const std::shared_ptr<T>& ptr)
{
    T& primitive = *ptr;
    // std::lock_guard is neither copyable nor movable: returning it by value
    // relies on C++17 guaranteed copy elision.
    return std::lock_guard<T>{primitive};
}
26
/**
 * \brief Locks the primitive referred to by a reference wrapper and keeps it locked while the returned guard is alive.
 *
 * \param ref wrapper around the lockable primitive; the primitive itself is not owned by the wrapper.
 * \return std::lock_guard owning the lock; the primitive is unlocked when the guard is destroyed.
 */
template<typename T>
auto lock(const std::reference_wrapper<T>& ref)
{
    T& primitive = ref.get();
    // std::lock_guard is neither copyable nor movable: returning it by value
    // relies on C++17 guaranteed copy elision.
    return std::lock_guard<T>{primitive};
}
32
/**
 * \brief Function object that forwards an emitted value to `subscriber.on_next` while `primitive` is locked.
 *
 * Call arguments:
 *  - `v`          emitted value, perfectly forwarded to the subscriber;
 *  - `subscriber` target of the call, must provide a const-callable `on_next`;
 *  - `primitive`  anything accepted by `lock(...)` (a `std::shared_ptr` or `std::reference_wrapper`
 *                 of a lockable object).
 */
struct forwarding_on_next_under_lock
{
    template<typename T>
    void operator()(T&& v, const auto& subscriber, const auto& primitive) const
    {
        // The guard lives until the end of the call, so the primitive stays locked
        // for the whole duration of on_next (including when on_next throws).
        auto lock_guard = lock(primitive);
        subscriber.on_next(std::forward<T>(v));
    }
};
42
/**
 * \brief Function object that forwards an error to `subscriber.on_error` while `primitive` is locked.
 *
 * Call arguments:
 *  - `err`        exception pointer describing the error, passed to the subscriber unchanged;
 *  - `subscriber` target of the call, must provide a const-callable `on_error`;
 *  - `primitive`  anything accepted by `lock(...)` (a `std::shared_ptr` or `std::reference_wrapper`
 *                 of a lockable object).
 */
struct forwarding_on_error_under_lock
{
    void operator()(const std::exception_ptr& err,
                    const auto&               subscriber,
                    const auto&               primitive) const
    {
        // The guard lives until the end of the call, so the primitive stays locked
        // for the whole duration of on_error.
        auto lock_guard = lock(primitive);
        subscriber.on_error(err);
    }
};
53
/**
 * \brief Function object that forwards completion to `subscriber.on_completed` while `primitive` is locked.
 *
 * Call arguments:
 *  - `subscriber` target of the call, must provide a const-callable `on_completed`;
 *  - `primitive`  anything accepted by `lock(...)` (a `std::shared_ptr` or `std::reference_wrapper`
 *                 of a lockable object).
 */
struct forwarding_on_completed_under_lock
{
    void operator()(const auto& subscriber, const auto& primitive) const
    {
        // The guard lives until the end of the call, so the primitive stays locked
        // for the whole duration of on_completed.
        auto lock_guard = lock(primitive);
        subscriber.on_completed();
    }
};
62
63template<typename TSerializationPrimitive, constraint::subscriber TSub>
64auto make_serialized_subscriber(TSub&& subscriber,
65 const std::shared_ptr<TSerializationPrimitive>& primitive)
66{
67 auto sub = subscriber.get_subscription();
68 return create_subscriber_with_state<utils::extract_subscriber_type_t<std::decay_t<TSub>>>(std::move(sub),
70 forwarding_on_error_under_lock{},
71 forwarding_on_completed_under_lock{},
72 std::forward<TSub>(subscriber),
73 primitive);
74}
75
76template<typename TSerializationPrimitive, constraint::subscriber TSub>
77auto make_serialized_subscriber(TSub&& subscriber,
78 std::reference_wrapper<TSerializationPrimitive> primitive)
79{
80 auto sub = subscriber.get_subscription();
81 return create_subscriber_with_state<utils::extract_subscriber_type_t<std::decay_t<TSub>>>(std::move(sub),
82 forwarding_on_next_under_lock{},
83 forwarding_on_error_under_lock{},
84 forwarding_on_completed_under_lock{},
85 std::forward<TSub>(subscriber),
86 primitive);
87}
88} // namespace rpp::details
Definition: serialized_subscriber.hpp:55
Definition: serialized_subscriber.hpp:44
Definition: serialized_subscriber.hpp:34