ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
behavior_subject.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/schedulers/fwd.hpp>
13#include <rpp/subjects/fwd.hpp>
14
15#include <rpp/disposables/disposable_wrapper.hpp>
16#include <rpp/observers/observer.hpp>
17#include <rpp/subjects/details/subject_on_subscribe.hpp>
18#include <rpp/subjects/details/subject_state.hpp>
19
20#include <utility>
21
22namespace rpp::subjects::details
23{
24 template<rpp::constraint::decayed_type Type, bool Serialized>
25 class behavior_subject_base
26 {
27 class behavior_state final : public subject_state<Type, Serialized>
28 {
29 public:
30 behavior_state(const Type& v)
31 : m_value{v}
32 {
33 }
34 behavior_state(Type&& v)
35 : m_value{std::move(v)}
36 {
37 }
38
39 rpp::utils::pointer_under_lock<Type> get_value() { return rpp::utils::pointer_under_lock<Type>{m_value}; }
40
41 private:
43 };
44
45 struct observer_strategy
46 {
47 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;
48
49 std::shared_ptr<behavior_state> state;
50
51 void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); }
52
53 bool is_disposed() const noexcept { return state->is_disposed(); }
54
55 void on_next(const Type& v) const
56 {
57 *state->get_value() = v;
58 state->on_next(v);
59 }
60
61 void on_error(const std::exception_ptr& err) const { state->on_error(err); }
62
63 void on_completed() const { state->on_completed(); }
64 };
65
66 public:
67 using optimal_disposables_strategy = typename details::subject_state<Type, Serialized>::optimal_disposables_strategy;
68
69 explicit behavior_subject_base(const Type& value)
71 {
72 }
73
74 explicit behavior_subject_base(Type&& value)
75 : m_state{disposable_wrapper_impl<behavior_state>::make(std::move(value))}
76 {
77 }
78
79 auto get_observer() const
80 {
81 return rpp::observer<Type, observer_strategy>{m_state.lock()};
82 }
83
84 auto get_observable() const
85 {
86 return create_subject_on_subscribe_observable<Type, optimal_disposables_strategy>([state = m_state]<rpp::constraint::observer_of_type<Type> TObs>(TObs&& observer) {
87 const auto locked = state.lock();
88 if (!locked->is_disposed())
89 {
90 auto v = *locked->get_value();
91 observer.on_next(std::move(v));
92 }
93 locked->on_subscribe(std::forward<TObs>(observer));
94 });
95 }
96
97 rpp::disposable_wrapper get_disposable() const
98 {
99 return m_state;
100 }
101
102 Type get_value() const
103 {
104 return *m_state.lock()->get_value();
105 }
106
107
108 private:
110 };
111} // namespace rpp::subjects::details
112
113namespace rpp::subjects
114{
123 template<rpp::constraint::decayed_type Type>
124 class behavior_subject final : public details::behavior_subject_base<Type, false>
125 {
126 public:
127 using details::behavior_subject_base<Type, false>::behavior_subject_base;
128 };
129
137 template<rpp::constraint::decayed_type Type>
139 {
140 public:
141 using details::behavior_subject_base<Type, true>::behavior_subject_base;
142 };
143} // namespace rpp::subjects
void on_next(const Type &v) const noexcept
Observable calls this method to notify observer about new value.
Definition observer.hpp:84
Main RPP wrapper over disposables.
Definition disposable_wrapper.hpp:142
static disposable_wrapper_impl make(TArgs &&... args)
Main way to create disposable_wrapper. Passed TTarget type can be any type derived from TDisposable.
Definition disposable_wrapper.hpp:164
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Same as rpp::subjects::publish_subject but keeps last value (or default) and emits it to newly subscr...
Definition behavior_subject.hpp:125
Definition behavior_subject.hpp:26
Definition subject_state.hpp:41
Same as rpp::subjects::behavior_subject but on_next/on_error/on_completed calls are serialized via mu...
Definition behavior_subject.hpp:139
Definition utils.hpp:260
Definition fwd.hpp:253
disposable_wrapper_impl< interface_disposable > disposable_wrapper
Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl.
Definition fwd.hpp:34