ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
dynamic_observer.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/disposables/fwd.hpp>
13#include <rpp/observers/fwd.hpp>
14
15#include <rpp/observers/observer.hpp>
16
17#include <memory>
18#include <utility>
19
20namespace rpp::details::observers
21{
22 template<typename Type>
24 {
25 public:
26 void set_upstream(const disposable_wrapper& d) noexcept { m_vtable.set_upstream_ptr(this, d); }
27 bool is_disposed() const noexcept { return m_vtable.is_disposed_ptr(this); }
28
29 void on_next(const Type& v) const noexcept { m_vtable.on_next_lvalue_ptr(this, v); }
30 void on_next(Type&& v) const noexcept { m_vtable.on_next_rvalue_ptr(this, std::move(v)); }
31 void on_error(const std::exception_ptr& err) const noexcept { m_vtable.on_error_ptr(this, err); }
32 void on_completed() const noexcept { m_vtable.on_completed_ptr(this); }
33
34 protected:
35 struct vtable_t
36 {
37 void (*const on_next_lvalue_ptr)(const observer_vtable*, const Type&){};
38 void (*const on_next_rvalue_ptr)(const observer_vtable*, Type&&){};
39 void (*const on_error_ptr)(const observer_vtable*, const std::exception_ptr&){};
40 void (*const on_completed_ptr)(const observer_vtable*){};
41
42 void (*const set_upstream_ptr)(observer_vtable*, const disposable_wrapper&){};
43 bool (*const is_disposed_ptr)(const observer_vtable*){};
44 };
45
47 : m_vtable{std::move(vtable)}
48 {
49 }
50
51 const vtable_t m_vtable{};
52 };
53
54 template<rpp::constraint::observer TObs>
55 class type_erased_observer : public observer_vtable<rpp::utils::extract_observer_type_t<TObs>>
56 {
57 using Type = rpp::utils::extract_observer_type_t<TObs>;
59 using Vtable = typename Base::vtable_t;
60
61 static constexpr const TObs& cast(const Base* ptr)
62 {
63 return static_cast<const type_erased_observer*>(ptr)->m_observer;
64 }
65
66 static constexpr TObs& cast(Base* ptr)
67 {
68 return static_cast<type_erased_observer*>(ptr)->m_observer;
69 }
70
71 public:
73 : Base{Vtable{
74 .on_next_lvalue_ptr = +[](const Base* b, const Type& v) { cast(b).on_next(v); },
75 .on_next_rvalue_ptr = +[](const Base* b, Type&& v) { cast(b).on_next(std::move(v)); },
76 .on_error_ptr = +[](const Base* b, const std::exception_ptr& err) { cast(b).on_error(err); },
77 .on_completed_ptr = +[](const Base* b) { cast(b).on_completed(); },
78 .set_upstream_ptr = +[](Base* b, const rpp::disposable_wrapper& d) { cast(b).set_upstream(d); },
79 .is_disposed_ptr = +[](const Base* b) {
80 return cast(b).is_disposed();
81 }}}
82 , m_observer{std::move(observer)}
83 {
84 }
85
86 private:
87 RPP_NO_UNIQUE_ADDRESS TObs m_observer;
88 };
89
90 template<rpp::constraint::decayed_type Type>
91 class dynamic_strategy final
92 {
93 public:
94 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
95
96 template<rpp::constraint::observer_strategy<Type> Strategy>
99 : m_observer{std::make_shared<type_erased_observer<observer<Type, Strategy>>>(std::move(obs))}
100 {
101 }
102
103 void set_upstream(const disposable_wrapper& d) noexcept { m_observer->set_upstream(d); }
104 bool is_disposed() const noexcept { return m_observer->is_disposed(); }
105
106 void on_next(const Type& v) const noexcept { m_observer->on_next(v); }
107 void on_next(Type&& v) const noexcept { m_observer->on_next(std::move(v)); }
108 void on_error(const std::exception_ptr& err) const noexcept { m_observer->on_error(err); }
109 void on_completed() const noexcept { m_observer->on_completed(); }
110
111 private:
112 std::shared_ptr<observer_vtable<Type>> m_observer;
113 };
114} // namespace rpp::details::observers
115
116
117namespace rpp
118{
127 template<constraint::decayed_type Type>
128 class dynamic_observer final : public observer<Type, details::observers::dynamic_strategy<Type>>
129 {
130 using base = observer<Type, details::observers::dynamic_strategy<Type>>;
131
132 public:
133 using base::base;
134
135 dynamic_observer(base&& b)
136 : base{std::move(b)}
137 {
138 }
139
140 dynamic_observer(const base& b)
141 : base{b}
142 {
143 }
144 };
145} // namespace rpp
Definition dynamic_observer.hpp:24
Definition dynamic_observer.hpp:56
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition constraints.hpp:19
Definition dynamic_observer.hpp:36