ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Loading...
Searching...
No Matches
publish_subject.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/subjects/fwd.hpp>
13#include <rpp/utils/constraints.hpp>
14#include <rpp/subscribers/dynamic_subscriber.hpp>
15#include <rpp/subjects/details/subject_state.hpp>
16#include <rpp/subjects/details/base_subject.hpp>
17
18namespace rpp::subjects::details
19{
20template<rpp::constraint::decayed_type T>
22{
23public:
24 template<rpp::constraint::decayed_same_as<composite_subscription> TSub>
25 publish_strategy(TSub&& sub)
26 : m_sub{std::forward<TSub>(sub)}
27 {
28 m_sub.add([state = std::weak_ptr{m_state}]
29 {
30 if(const auto locked = state.lock())
31 locked->on_unsubscribe();
32 });
33 }
34
35 void on_subscribe(const dynamic_subscriber<T>& sub) const
36 {
37 m_state->on_subscribe(sub);
38 }
39
40 auto get_subscriber() const
41 {
42 return rpp::make_specific_subscriber<T>(m_sub,
43 [state = m_state](const T& v)
44 {
45 state->on_next(v);
46 },
47 [state = m_state](const std::exception_ptr& err)
48 {
49 state->on_error(err);
50 },
51 [state = m_state]()
52 {
53 state->on_completed();
54 });
55 }
56
57private:
58 std::shared_ptr<subject_state<T>> m_state = std::make_shared<subject_state<T>>();
60};
61} // namespace rpp::subjects::details
62
63namespace rpp::subjects
64{
77template<rpp::constraint::decayed_type T>
78class publish_subject final : public details::base_subject<T, details::publish_strategy<T>>{
79public:
82
85};
86} // namespace rpp::subjects
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
std::weak_ptr< details::subscription_state > add(const TSub &sub=TSub{}) const
Add any other subscription to this as dependent.
Definition: composite_subscription.hpp:43
subscriber which uses dynamic_observer<T> to hide original callbacks
Definition: dynamic_subscriber.hpp:24
Definition: base_subject.hpp:23
Definition: publish_subject.hpp:22
Subject which just multicasts values to observers subscribed on it. It contains two parts: subscriber...
Definition: publish_subject.hpp:78