ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
dynamic_observable.hpp
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2023 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/observables/fwd.hpp>
13#include <rpp/subjects/fwd.hpp>
14
15#include <rpp/observables/observable.hpp>
16#include <rpp/observers/dynamic_observer.hpp>
17
18#include <memory>
19#include <utility>
20
21namespace rpp::details::observables
22{
23 template<typename T, typename Observable>
24 void forwarding_subscribe(const void* const ptr, dynamic_observer<T>&& obs)
25 {
26 static_cast<const Observable*>(ptr)->subscribe(std::move(obs));
27 }
28
29 template<rpp::constraint::decayed_type Type>
30 class dynamic_strategy final
31 {
32 public:
33 using value_type = Type;
34 using optimal_disposables_strategy = rpp::details::observables::default_disposables_strategy;
35
36 template<rpp::constraint::observable_strategy<Type> Strategy>
38 explicit dynamic_strategy(observable<Type, Strategy>&& obs)
39 : m_forwarder{std::make_shared<observable<Type, Strategy>>(std::move(obs))}
40 , m_vtable{vtable::template create<observable<Type, Strategy>>()}
41 {
42 }
43
44 template<rpp::constraint::observable_strategy<Type> Strategy>
46 explicit dynamic_strategy(const observable<Type, Strategy>& obs)
47 : m_forwarder{std::make_shared<observable<Type, Strategy>>(obs)}
48 , m_vtable{vtable::template create<observable<Type, Strategy>>()}
49 {
50 }
51
52 template<rpp::constraint::observer_strategy<Type> ObserverStrategy>
53 void subscribe(observer<Type, ObserverStrategy>&& observer) const
54 {
55 m_vtable->subscribe(m_forwarder.get(), std::move(observer).as_dynamic());
56 }
57
58 private:
59 struct vtable
60 {
61 void (*subscribe)(const void*, dynamic_observer<Type>&&){};
62
63 template<rpp::constraint::observable Observable>
64 static const vtable* create() noexcept
65 {
66 static vtable s_res{
67 .subscribe = forwarding_subscribe<Type, Observable>};
68 return &s_res;
69 }
70 };
71
72 private:
73 std::shared_ptr<void> m_forwarder;
74 const vtable* m_vtable;
75 };
76} // namespace rpp::details::observables
77
78namespace rpp
79{
88 template<constraint::decayed_type Type>
89 class dynamic_observable : public observable<Type, details::observables::dynamic_strategy<Type>>
90 {
91 using base = observable<Type, details::observables::dynamic_strategy<Type>>;
92
93 public:
94 using base::base;
95
96 dynamic_observable(base&& b)
97 : base{std::move(b)}
98 {
99 }
100
101 dynamic_observable(const base& b)
102 : base{b}
103 {
104 }
105 };
106
107 template<typename Subject>
108 using dynamic_connectable_observable = connectable_observable<rpp::dynamic_observable<rpp::subjects::utils::extract_subject_type_t<Subject>>, Subject>;
109} // namespace rpp
Definition constraints.hpp:19
auto subscribe(observer< Type, ObserverStrategy > &&observer)
Subscribes passed observer to emissions from this observable.
Definition subscribe.hpp:226
Definition disposables_strategy.hpp:19