ReactivePlusPlus
Yet another implementation of the ReactiveX approach in C++, written with performance and templates in mind
 
Loading...
Searching...
No Matches
behavior_subject.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/subjects/fwd.hpp>
13#include <rpp/utils/constraints.hpp>
14#include <rpp/subscribers/dynamic_subscriber.hpp>
15#include <rpp/subjects/details/subject_state.hpp>
16#include <rpp/subjects/details/base_subject.hpp>
17
18namespace rpp::subjects::details
19{
20template<rpp::constraint::decayed_type T>
22{
23public:
24 template<rpp::constraint::decayed_same_as<T> TT, rpp::constraint::decayed_same_as<composite_subscription> TSub>
25 behavior_strategy(TT&& v, TSub&& sub)
26 : m_state{std::make_shared<behavior_state>(std::forward<TT>(v))}
27 , m_sub{std::forward<TSub>(sub)}
28 {
29 m_sub.add([state = std::weak_ptr{m_state}]
30 {
31 if (const auto locked = state.lock())
32 locked->on_unsubscribe();
33 });
34 }
35
36 void on_subscribe(const dynamic_subscriber<T>& sub) const
37 {
38 if (m_sub.is_subscribed())
39 sub.on_next(m_state->get_value());
40
41 m_state->on_subscribe(sub);
42 }
43
44 auto get_subscriber() const
45 {
46 return rpp::make_specific_subscriber<T>(m_sub,
47 [state = m_state](const T& v)
48 {
49 state->set_value(v);
50 state->on_next(v);
51 },
52 [state = m_state](const std::exception_ptr& err)
53 {
54 state->on_error(err);
55 },
56 [state = m_state]()
57 {
58 state->on_completed();
59 });
60 }
61
62 T get_value() const
63 {
64 return m_state->get_value();
65 }
66
67private:
68 class behavior_state : public subject_state<T>
69 {
70 public:
71 behavior_state(const T& v)
73 , value{v} {}
74
75 behavior_state(T&& v)
77 , value{std::move(v)} {}
78
79 T get_value()
80 {
81 std::lock_guard lock{mutex};
82 return value;
83 }
84
85
86 void set_value(const T& v)
87 {
88 std::lock_guard lock{mutex};
89 value = v;
90 }
91
92 private:
93
94 std::mutex mutex;
95 T value;
96 };
97
98 std::shared_ptr<behavior_state> m_state;
100};
101} // namespace rpp::subjects::details
102
103namespace rpp::subjects
104{
117template<rpp::constraint::decayed_type T>
118class behavior_subject final : public details::base_subject<T, details::behavior_strategy<T>>
119{
120public:
121 behavior_subject(const T& initial_value, const composite_subscription& sub)
123
124 behavior_subject(T&& initial_value, const composite_subscription& sub)
125 : details::base_subject<T, details::behavior_strategy<T>>{std::move(initial_value), sub} {}
126
127 behavior_subject(const T& initial_value, composite_subscription&& sub = composite_subscription{})
128 : details::base_subject<T, details::behavior_strategy<T>>{initial_value, std::move(sub)} {}
129
131 : details::base_subject<T, details::behavior_strategy<T>>{std::move(initial_value), std::move(sub)} {}
132
133
134 T get_value() const
135 {
136 return details::base_subject<T, details::behavior_strategy<T>>::get_strategy().get_value();
137 }
138};
139} // namespace rpp::subjects
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
std::weak_ptr< details::subscription_state > add(const TSub &sub=TSub{}) const
Add any other subscription to this as dependent.
Definition: composite_subscription.hpp:43
subscriber which uses dynamic_observer<T> to hide original callbacks
Definition: dynamic_subscriber.hpp:24
Subject which multicasts values to observers subscribed on it and sends the last emitted value (or the initial value) to each new subscriber.
Definition: behavior_subject.hpp:119
Definition: base_subject.hpp:23
Definition: behavior_subject.hpp:22
Definition: subject_state.hpp:30
bool is_subscribed() const
indicates current status of subscription
Definition: subscription_base.hpp:51