13#include <rpp/schedulers/fwd.hpp>
14#include <rpp/schedulers/details/worker.hpp>
15#include <rpp/subscriptions/composite_subscription.hpp>
16#include <rpp/subscriptions/subscription_guard.hpp>
17#include <rpp/schedulers/details/queue_worker_state.hpp>
24namespace rpp::schedulers
44 auto shared = std::make_shared<state>();
46 shared->init_thread(sub);
50 bool is_subscribed()
const
52 if (
const auto locked = m_state.lock())
53 return locked->is_subscribed();
64 if (
auto locked = m_state.lock())
65 locked->defer_at(time_point, std::move(fn));
68 static time_point now() {
return clock_type::now(); }
71 class state :
public std::enable_shared_from_this<state>
75 state(
const state&) =
delete;
76 state(state&&)
noexcept =
delete;
78 bool is_subscribed()
const {
return m_sub->is_subscribed(); }
82 if (m_sub->is_subscribed())
83 m_queue.emplace(time_point, std::move(fn));
88 m_thread = std::thread{[state = shared_from_this()]()
94 const auto locked = state.lock();
100 if (locked->m_thread.joinable() && locked->m_thread.get_id() != std::this_thread::get_id())
101 locked->m_thread.join();
103 locked->m_thread.detach();
106 m_sub.reset(callback);
112 std::optional<new_thread_schedulable> fn{};
113 while (m_queue.is_subscribed())
115 if (m_queue.pop_with_wait(fn))
123 m_queue.unsubscribe();
127 std::thread m_thread{};
132 std::weak_ptr<state> m_state{};
Subscription which invokes a callable during unsubscribe.
Definition: callback_subscription.hpp:25
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
std::weak_ptr< details::subscription_state > add(const TSub &sub=TSub{}) const
Add any other subscription to this as dependent.
Definition: composite_subscription.hpp:43
Definition: queue_worker_state.hpp:53
scheduler which schedules execution of schedulables via queueing tasks to another thread with priorit...
Definition: new_thread_scheduler.hpp:32
Definition: worker.hpp:31
Definition: worker.hpp:60
bool is_subscribed() const
indicates current status of subscription
Definition: subscription_base.hpp:51
void unsubscribe() const
initiates unsubscription process (if subscribed)
Definition: subscription_base.hpp:59
guard over a subscription that automatically unsubscribes it in the destructor
Definition: subscription_guard.hpp:21
Definition: constraints.hpp:22
Definition: worker.hpp:23