12#include <rpp/schedulers/fwd.hpp>
13#include <rpp/schedulers/details/worker.hpp>
14#include <rpp/subscriptions/composite_subscription.hpp>
15#include <rpp/schedulers/details/queue_worker_state.hpp>
18namespace rpp::schedulers
40 : m_queue{std::move(queue)}
44 bool is_subscribed()
const
56 if (m_sub.is_subscribed())
57 if (
auto locked = m_queue.lock())
58 locked->emplace(time_point, std::move(fn));
61 static time_point now() {
return clock_type::now(); }
64 std::weak_ptr<details::queue_worker_state<run_loop_schedulable>> m_queue{};
74 state(
const state&) =
delete;
75 state(state&&)
noexcept =
delete;
79 m_queue.unsubscribe();
94 : m_state(std::make_shared<state>(sub)) {}
98 auto res = m_state->get_subscription().add(sub);
99 sub.add([weak = std::weak_ptr{m_state}, res]
101 if (
const auto sh = weak.lock())
102 sh->get_subscription().remove(res);
104 return worker<worker_strategy>{std::shared_ptr<details::queue_worker_state<run_loop_schedulable>>{m_state, &m_state->get_queue()}, sub};
107 bool is_empty()
const
109 return m_state->get_queue().is_empty();
112 bool is_any_ready_schedulable()
const
114 return m_state->get_queue().is_any_ready_schedulable();
117 void dispatch_if_ready()
const
119 std::optional<run_loop_schedulable> fn{};
120 if (m_state->get_queue().pop_if_ready(fn))
124 void dispatch()
const
126 std::optional<run_loop_schedulable> fn{};
127 if (m_state->get_queue().pop_with_wait(fn))
132 const std::shared_ptr<state> m_state{};
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
Definition: queue_worker_state.hpp:53
scheduler which schedules execution via queueing tasks, but execution of tasks should be manually dis...
Definition: run_loop_scheduler.hpp:30
Definition: worker.hpp:31
Definition: worker.hpp:60
bool is_subscribed() const
indicates current status of subscription
Definition: subscription_base.hpp:51
Definition: constraints.hpp:22
Definition: worker.hpp:23