ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
publish_subject.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/subjects/fwd.hpp>
13
14#include <rpp/disposables/disposable_wrapper.hpp>
15#include <rpp/observers/observer.hpp>
16#include <rpp/subjects/details/subject_on_subscribe.hpp>
17#include <rpp/subjects/details/subject_state.hpp>
18
19namespace rpp::subjects::details
20{
21 template<rpp::constraint::decayed_type Type, bool Serialized>
22 class publish_subject_base
23 {
24 struct observer_strategy
25 {
26 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
27
28 std::shared_ptr<details::subject_state<Type, Serialized>> state{};
29
30 void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); }
31
32 bool is_disposed() const noexcept { return state->is_disposed(); }
33
34 void on_next(const Type& v) const { state->on_next(v); }
35
36 void on_error(const std::exception_ptr& err) const { state->on_error(err); }
37
38 void on_completed() const { state->on_completed(); }
39 };
40
41 public:
42 using optimal_disposables_strategy = typename details::subject_state<Type, Serialized>::optimal_disposables_strategy;
43
44 publish_subject_base() = default;
45
46 auto get_observer() const
47 {
48 return rpp::observer<Type, observer_strategy>{m_state.lock()};
49 }
50
51 auto get_observable() const
52 {
53 return create_subject_on_subscribe_observable<Type, optimal_disposables_strategy>([state = m_state]<rpp::constraint::observer_of_type<Type> TObs>(TObs&& observer) { state.lock()->on_subscribe(std::forward<TObs>(observer)); });
54 }
55
56 rpp::disposable_wrapper get_disposable() const
57 {
58 return m_state;
59 }
60
61 private:
63 };
64} // namespace rpp::subjects::details
65namespace rpp::subjects
66{
79 template<rpp::constraint::decayed_type Type>
80 class publish_subject final : public details::publish_subject_base<Type, false>
81 {
82 public:
83 using details::publish_subject_base<Type, false>::publish_subject_base;
84 };
85
93 template<rpp::constraint::decayed_type Type>
95 {
96 public:
97 using details::publish_subject_base<Type, true>::publish_subject_base;
98 };
99} // namespace rpp::subjects
Main RPP wrapper over disposables.
Definition disposable_wrapper.hpp:142
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Definition publish_subject.hpp:23
Subject which just multicasts values to observers subscribed on it. It contains two parts: an observer and an observable at the same time.
Definition publish_subject.hpp:81
Serialized version of rpp::subjects::publish_subject.
Definition publish_subject.hpp:95
Definition fwd.hpp:253
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34