ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
forwarding_subject.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/subjects/fwd.hpp>
13
14#include <rpp/disposables/disposable_wrapper.hpp>
15#include <rpp/disposables/refcount_disposable.hpp>
16#include <rpp/observers/observer.hpp>
17#include <rpp/subjects/details/subject_on_subscribe.hpp>
18#include <rpp/subjects/details/subject_state.hpp>
19
20#include <memory>
21
22namespace rpp::operators::details
23{
24 template<rpp::constraint::decayed_type Type>
25 class forwarding_subject
26 {
27 struct observer_strategy
28 {
29 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
30
31 std::shared_ptr<subjects::details::subject_state<Type, false>> state{};
32
33 void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); }
34
35 bool is_disposed() const noexcept { return state->is_disposed(); }
36
37 void on_next(const Type& v) const { state->on_next(v); }
38
39 void on_error(const std::exception_ptr& err) const { state->on_error(err); }
40
41 void on_completed() const { state->on_completed(); }
42 };
43
44 public:
45 using optimal_disposables_strategy = typename subjects::details::subject_state<Type, false>::optimal_disposables_strategy::template add<1>;
46
47 explicit forwarding_subject(disposable_wrapper_impl<rpp::refcount_disposable> refcount)
48 : m_refcount{std::move(refcount)}
49 {
50 }
51
52 auto get_observer() const
53 {
54 return rpp::observer<Type, observer_strategy>{m_state.lock()};
55 }
56
57 auto get_observable() const
58 {
59 return subjects::details::create_subject_on_subscribe_observable<Type, optimal_disposables_strategy>([state = m_state.as_weak(), refcount = m_refcount]<rpp::constraint::observer_of_type<Type> TObs>(TObs&& observer) {
60 if (const auto locked_state = state.lock())
61 {
62 if (const auto locked = refcount.lock())
63 observer.set_upstream(locked->add_ref());
64 locked_state->on_subscribe(std::forward<TObs>(observer));
65 }
66 });
67 }
68
69 rpp::composite_disposable_wrapper get_disposable() const
70 {
71 return m_state.as_weak();
72 }
73
74 private:
77 };
78} // namespace rpp::operators::details
Main RPP wrapper over disposables.
Definition disposable_wrapper.hpp:142
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition subject_state.hpp:41
Definition fwd.hpp:253
disposable_wrapper_impl< interface_composite_disposable > composite_disposable_wrapper
Wrapper to keep "composite" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:41
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34