13#include "rpp/utils/utilities.hpp"
15#include <rpp/subscriptions/subscription_base.hpp>
16#include <rpp/subscriptions/callback_subscription.hpp>
17#include <rpp/utils/constraints.hpp>
18#include <rpp/subscriptions/constraints.hpp>
42 template<constra
int::subscription TSub = subscription_base>
43 std::weak_ptr<details::subscription_state>
add(
const TSub &sub = TSub{})
const
46 return sub.get_state();
48 if (
const auto pstate = std::static_pointer_cast<state>(get_state()))
49 pstate->add(sub.get_state());
52 return sub.get_state();
60 return add<callback_subscription>(sub);
66 ret.
add([weak_handle =
add(ret), state = std::weak_ptr{std::static_pointer_cast<state>(get_state())}]
69 if (
const auto locked_state = state.lock())
70 if (
const auto locked_handle = weak_handle.lock())
71 locked_state->remove(locked_handle);
76 void remove(
const subscription_base &sub)
const
78 if (
const auto pstate = std::static_pointer_cast<state>(get_state()))
79 pstate->remove(sub.get_state());
82 void remove(
const std::weak_ptr<details::subscription_state>& sub)
const
84 if (
const auto locked = sub.lock())
85 if (
const auto pstate = std::static_pointer_cast<state>(get_state()))
86 pstate->remove(locked);
94 static composite_subscription empty()
96 return composite_subscription{empty_tag{}};
102 composite_subscription(
const empty_tag&)
103 : subscription_base{std::shared_ptr<details::subscription_state>{}} {}
105 class state final :
public details::subscription_state
110 void add(std::shared_ptr<details::subscription_state> sub)
112 if (!sub || !sub->is_subscribed())
117 DepsState expected{DepsState::None};
118 if (m_state.compare_exchange_strong(expected, DepsState::Edit, std::memory_order::acq_rel))
120 m_deps.push_back(std::move(sub));
122 m_state.store(DepsState::None, std::memory_order::release);
126 if (expected == DepsState::Unsubscribed)
134 void remove(
const std::shared_ptr<details::subscription_state>& sub)
138 DepsState expected{DepsState::None};
139 if (m_state.compare_exchange_strong(expected, DepsState::Edit, std::memory_order::acq_rel))
141 std::erase(m_deps, sub);
143 m_state.store(DepsState::None, std::memory_order::release);
147 if (expected == DepsState::Unsubscribed)
153 void on_unsubscribe()
override
157 DepsState expected{DepsState::None};
158 if (m_state.compare_exchange_strong(expected, DepsState::Unsubscribed, std::memory_order::acq_rel))
160 rpp::utils::for_each(m_deps, std::mem_fn(&details::subscription_state::unsubscribe));
168 enum class DepsState : uint8_t
175 std::atomic<DepsState> m_state{DepsState::None};
176 std::vector<std::shared_ptr<details::subscription_state>> m_deps{};
Subscription which invoke callbable during unsubscribe.
Definition: callback_subscription.hpp:25
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
std::weak_ptr< details::subscription_state > add(const callback_subscription &sub) const
Add callback/function subscription to this as dependent.
Definition: composite_subscription.hpp:58
std::weak_ptr< details::subscription_state > add(const TSub &sub=TSub{}) const
Add any other subscription to this as dependent.
Definition: composite_subscription.hpp:43
Base subscription implementation used as base class/interface and core implementation for derrived su...
Definition: subscription_base.hpp:25