ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Loading...
Searching...
No Matches
specific_subscriber.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/defs.hpp>
14#include <rpp/observers/constraints.hpp>
15#include <rpp/observers/specific_observer.hpp>
16#include <rpp/subscribers/fwd.hpp>
17#include <rpp/subscribers/details/subscriber_base.hpp>
18#include <rpp/subscriptions/composite_subscription.hpp>
19#include <rpp/utils/function_traits.hpp>
20
21namespace rpp
22{
28template<constraint::decayed_type Type, constraint::decayed_observer Observer>
31{
32public:
33 template<typename ...Types>
34 specific_subscriber(Types&&...vals) requires std::constructible_from<Observer, Types...>
35 : subscriber_base{}
36 , m_observer{ std::forward<Types>(vals)... } {}
37
38 template<typename ...Types>
39 specific_subscriber(composite_subscription sub, Types&&...vals) requires std::constructible_from<Observer, Types...>
40 : subscriber_base{ std::move(sub)}
41 , m_observer{ std::forward<Types>(vals)... } {}
42
43 const Observer& get_observer() const
44 {
45 return m_observer;
46 }
47
48 void on_next(const Type& val) const
49 {
50 on_next_impl(val);
51 }
52
53 void on_next(Type&& val) const
54 {
55 on_next_impl(std::move(val));
56 }
57
58 void on_error(const std::exception_ptr& err) const
59 {
60 do_if_subscribed_and_unsubscribe([&err, this] { m_observer.on_error(err); });
61 }
62
63 void on_completed() const
64 {
65 do_if_subscribed_and_unsubscribe([this] { m_observer.on_completed(); });
66 }
67
68 auto as_dynamic() const & { return dynamic_subscriber<Type>{*this}; }
69 auto as_dynamic() && { return dynamic_subscriber<Type>{this->get_subscription(), std::move(m_observer)}; }
70
71private:
72 void on_next_impl(auto&& val) const
73 {
74 if (!is_subscribed())
75 return;
76
77 try
78 {
79 m_observer.on_next(std::forward<decltype(val)>(val));
80 }
81 catch (...)
82 {
83 on_error(std::current_exception());
84 }
85 }
86
87 RPP_NO_UNIQUE_ADDRESS Observer m_observer{};
88};
89
90template<constraint::observer TObs>
92
93template<constraint::observer TObs>
95
96template<typename OnNext,
97 typename ...Args,
98 typename Type = std::decay_t<utils::function_argument_t<OnNext>>>
99specific_subscriber(composite_subscription, OnNext, Args ...) -> specific_subscriber<Type, details::deduce_specific_observer_type_t<Type, OnNext, Args...>>;
100
101template<typename OnNext,
102 typename ...Args,
103 typename Type = std::decay_t<utils::function_argument_t<OnNext>>>
104specific_subscriber(OnNext, Args ...) -> specific_subscriber<Type, details::deduce_specific_observer_type_t<Type, OnNext, Args...>>;
105
106
110template<typename Type, typename ...Args>
111auto make_specific_subscriber(Args&& ...args) -> specific_subscriber<Type, details::deduce_specific_observer_type_t<Type, Args...>>
112{
113 return {std::forward<Args>(args)...};
114}
115
116template<typename Type, typename ...Args>
117auto make_specific_subscriber(composite_subscription sub, Args&& ...args) -> specific_subscriber<Type, details::deduce_specific_observer_type_t<Type, Args...>>
118{
119 return {std::move(sub), std::forward<Args>(args)...};
120}
121
122template<typename Type, constraint::observer_of_type<Type> TObs, typename ...Args>
123auto make_specific_subscriber(composite_subscription sub, Args&& ...args) -> specific_subscriber<Type, TObs>
124{
125 return {std::move(sub), std::forward<Args>(args)...};
126}
127} // namespace rpp
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
base implementation of subscriber with possibility to obtain observer's callbacks,...
Definition: subscriber_base.hpp:23
subscriber which uses dynamic_observer<T> to hide original callbacks
Definition: dynamic_subscriber.hpp:24
specific version of subscriber which stores type of observer used inside to prevent extra allocations
Definition: specific_subscriber.hpp:31