// Abstract base for one schedulable entry. It stores a time point
// (m_time_point) and a shared_ptr link to another schedulable_base (m_next),
// and declares the pure-virtual hooks that concrete schedulables implement.
// NOTE(review): this listing appears to have lost lines (braces, access
// specifiers, parts of some bodies); comments describe only what is visible.
32 class schedulable_base
// Stores the supplied time point in m_time_point.
35 explicit schedulable_base(
const time_point& time_point)
36 : m_time_point{time_point}
// Virtual, defaulted, noexcept destructor.
40 virtual ~schedulable_base()
noexcept =
default;
// Pure-virtual call operator returning an optional time_point.
// NOTE(review): presumably an empty optional means "do not run again" — confirm.
42 virtual std::optional<time_point> operator()()
noexcept = 0;
// advanced_call: wrapper around a std::variant holding one of delay_from_now,
// delay_from_this_timepoint or delay_to (member m_data).
// The constructor takes the variant by value; its initializer is not visible
// here (presumably it is stored into m_data — confirm).
47 advanced_call(std::variant<delay_from_now, delay_from_this_timepoint, delay_to> data)
// Read-only access to the wrapped variant.
52 const std::variant<delay_from_now, delay_from_this_timepoint, delay_to>& get()
const {
return m_data; }
// Applies the callable `fn` to the active alternative of m_data via std::visit
// and returns whatever std::visit returns.
54 auto visit(
const auto& fn)
const
56 return std::visit(fn, m_data);
// Only part of this body is visible: the visible branch reports whether
// d.value equals rpp::schedulers::duration::zero().
59 bool can_run_immediately()
const noexcept
65 return d.value == rpp::schedulers::duration::zero();
// The delay description wrapped by advanced_call.
70 std::variant<delay_from_now, delay_from_this_timepoint, delay_to> m_data;
// Pure-virtual hooks working with advanced_call: one produces an optional
// advanced_call, the other turns an advanced_call into a time_point.
73 virtual std::optional<advanced_call> make_advanced_call() noexcept = 0;
74 virtual time_point handle_advanced_call(const
advanced_call&) noexcept = 0;
// Pure-virtual disposal query.
76 virtual
bool is_disposed() const noexcept = 0;
// Pure-virtual error hook receiving an exception_ptr; note that, unlike the
// other hooks, it is not declared noexcept.
78 virtual
void on_error(const std::exception_ptr& ep) const = 0;
// Returns a copy of the stored time point.
80 time_point get_timepoint()
const {
return m_time_point; }
// Overwrites the stored time point.
82 void set_timepoint(
const time_point& timepoint) { m_time_point = timepoint; }
// Read-only access to the link to the following node.
84 const std::shared_ptr<schedulable_base>& get_next()
const {
return m_next; }
// Replaces the link to the following node by moving from `next`.
86 void set_next(std::shared_ptr<schedulable_base>&& next) { m_next = std::move(next); }
// Splices `next` in directly after this node: `next` first takes over the
// current m_next, then becomes this node's m_next.
// NOTE(review): a guard line before the set_next call may be missing from
// this listing — confirm against the full file.
88 void update_next(std::shared_ptr<schedulable_base>&& next)
91 next->set_next(std::move(m_next));
92 m_next = std::move(next);
// Builds an rpp::utils::overloaded visitor with one lambda per delay type;
// NowStrategy supplies the static now() used for delay_from_now.
96 template<
typename NowStrategy>
97 auto get_advanced_call_handler()
const
99 return rpp::utils::overloaded{
// delay_from_now: offset from NowStrategy::now().
100 [](
const delay_from_now& v) {
101 return NowStrategy::now() + v.value;
// delay_from_this_timepoint: offset from this object's stored time point
// (hence the `this` capture).
103 [
this](
const delay_from_this_timepoint& v) {
104 return get_timepoint() + v.value;
// delay_to: the lambda body is not visible in this listing.
106 [](
const delay_to& v) {
// Link to the following node; empty by default.
112 std::shared_ptr<schedulable_base> m_next{};
// Time point associated with this schedulable.
113 time_point m_time_point;
// Concrete (final) schedulable_base parameterised on a NowStrategy, a callable
// type Fn, a Handler type and additional Args.
// NOTE(review): the member m_args is used below but its declaration is not
// visible in this listing, and several brace / try-catch lines appear to be
// missing; comments describe only the visible statements.
116 template<
typename NowStrategy, rpp::constraint::decayed_type Fn, rpp::schedulers::constraint::schedulable_handler Handler, rpp::constraint::decayed_type... Args>
118 class specific_schedulable final :
public schedulable_base
// Forwards the time point to the base class, perfect-forwards in_args into
// m_args and in_fn into m_fn.
121 template<rpp::constra
int::decayed_same_as<Fn> TFn,
typename... TArgs>
122 explicit specific_schedulable(
const time_point& time_point, TFn&& in_fn, TArgs&&... in_args)
123 : schedulable_base{time_point}
124 , m_args{std::forward<TArgs>(in_args)...}
125 , m_fn{std::forward<TFn>(in_fn)}
// Invokes m_fn through m_args.apply. When the result is truthy, res.value()
// is passed to the NowStrategy-based advanced-call handler and that result is
// returned. The visible on_error line hands std::current_exception() to
// element 0 of m_args — presumably inside a catch block whose lines are
// missing here; the fallback return is not visible either.
129 std::optional<time_point> operator()()
noexcept override
133 if (
const auto res = m_args.apply(m_fn))
134 return get_advanced_call_handler<NowStrategy>()(res.value());
138 m_args.template get<0>().on_error(std::current_exception());
// Also invokes m_fn through m_args.apply; what is done with a truthy result
// is not visible in this listing. As above, the on_error line forwards
// std::current_exception() to element 0 of m_args.
143 std::optional<advanced_call> make_advanced_call()
noexcept override
147 if (
const auto res = m_args.apply(m_fn))
152 m_args.template get<0>().on_error(std::current_exception());
// Resolves an advanced_call to a time_point by visiting it with the
// NowStrategy-based handler from the base class.
157 time_point handle_advanced_call(
const advanced_call& v)
noexcept override
159 return v.visit(get_advanced_call_handler<NowStrategy>());
// Delegates the disposal query to element 0 of m_args.
162 bool is_disposed()
const noexcept override {
return m_args.template get<0>().is_disposed(); }
// Delegates error reporting to element 0 of m_args.
164 void on_error(
const std::exception_ptr& ep)
const override { m_args.template get<0>().on_error(ep); }
// The stored callable, annotated with the project's RPP_NO_UNIQUE_ADDRESS macro.
168 RPP_NO_UNIQUE_ADDRESS Fn m_fn;
// Queue of schedulables kept as a singly linked list that starts at m_head and
// is chained through schedulable_base::get_next(); it also holds a weak_ptr to
// a shared_queue_data object.
// NOTE(review): this listing appears to have lost lines (braces, template
// headers, parts of bodies); comments describe only what is visible.
205 class schedulables_queue
// Move-only type: default and move operations are defaulted, copy operations
// are deleted.
208 schedulables_queue() =
default;
209 schedulables_queue(
const schedulables_queue&) =
delete;
210 schedulables_queue(schedulables_queue&&)
noexcept =
default;
212 schedulables_queue& operator=(
const schedulables_queue& other) =
delete;
213 schedulables_queue& operator=(schedulables_queue&& other)
noexcept =
default;
// Constructs a queue bound to the given shared data (moved into m_shared_data).
215 schedulables_queue(std::weak_ptr<shared_queue_data> shared_data)
216 : m_shared_data{std::move(shared_data)}
// Creates a new schedulable_type via std::make_shared from the forwarded
// arguments and inserts it with emplace_impl. The template header and the
// definition of schedulable_type are not visible in this listing.
221 void emplace(
const time_point& timepoint, Fn&& fn, Handler&& handler, Args&&... args)
225 emplace_impl(std::make_shared<schedulable_type>(timepoint, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...));
// Inserts an already existing schedulable: its time point is overwritten with
// `timepoint` first, then it is handed to emplace_impl.
228 void emplace(
const time_point& timepoint, std::shared_ptr<schedulable_base>&& schedulable)
233 schedulable->set_timepoint(timepoint);
234 emplace_impl(std::move(schedulable));
// True when there is no head node.
237 bool is_empty()
const {
return !m_head; }
// Replaces m_head with a copy of m_head->get_next() and returns the previous
// head; the returned node's own next link is not cleared by this call.
// NOTE(review): m_head is dereferenced with no visible null check —
// presumably callers check is_empty() first; confirm.
239 std::shared_ptr<schedulable_base> pop()
241 return std::exchange(m_head, m_head->get_next());
// Body not visible in this listing.
244 const std::shared_ptr<schedulable_base>& top()
const
// Inserts `schedulable` into the linked list based on time-point comparisons.
250 void emplace_impl(std::shared_ptr<schedulable_base>&& schedulable)
// Try to obtain a strong reference to the shared queue data.
253 const auto s = m_shared_data.lock();
// Hold a std::lock_guard on `mutex` for the remainder of the enclosing scope.
// NOTE(review): where `mutex` comes from is not visible here (presumably from
// the locked shared data `s`) — confirm.
260 std::lock_guard lock{mutex};
// Empty list, or the new item is strictly earlier than the head: the new item
// takes the old head as its next and becomes the head.
262 if (!m_head || schedulable->get_timepoint() < m_head->get_timepoint())
264 schedulable->set_next(std::move(m_head));
265 m_head = std::move(schedulable);
// Otherwise walk the list while a next node exists. `current` is declared in
// a line not visible here. The statement controlled by the comparison below
// is also missing (presumably a break, so the walk stops before the first
// node with a strictly later time point — confirm).
270 while (
const auto& next = current->get_next())
272 if (schedulable->get_timepoint() < next->get_timepoint())
274 current = next.get();
// Splice the new item in directly after `current`.
277 current->update_next(std::move(schedulable));
// First node of the list; empty when the queue has no elements.
281 std::shared_ptr<schedulable_base> m_head{};
// Weak reference to the shared queue data, locked in emplace_impl.
282 std::weak_ptr<shared_queue_data> m_shared_data{};