39 if (!m_thread.joinable())
43 std::lock_guard lock{m_state->mutex};
44 m_state->is_stoping =
true;
46 m_state->cv.notify_all();
// Enqueues a new entry into the state's queue: `time_point` is passed by value, while
// `fn`, `handler` and `args` are perfectly forwarded to queue.emplace.
// NOTE(review): no condition-variable notification is visible in this excerpt —
// presumably the queue (or an omitted line) wakes the worker thread; confirm.
51 void defer_to(time_point time_point, Fn&& fn, Handler&& handler, Args&&... args)
53 m_state->queue.emplace(time_point, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
// Record that the queue has just received an entry; data_thread consults this flag
// before it decides what to do with a result that reports can_run_immediately().
54 m_state->has_fresh_data.store(
true);
// Atomic flag, initially false. defer_to stores true after enqueuing an entry, and
// data_thread overwrites it after each pop with "queue is still non-empty".
62 std::atomic_bool has_fresh_data{
false};
// Thread entry point that consumes the entries stored in `state->queue`.
// `state` is taken by value, so this function holds its own shared_ptr to the queue data.
// NOTE(review): this excerpt omits lines (loop header, braces, branch bodies), so the
// comments below describe only the statements that are actually visible.
65 static void data_thread(std::shared_ptr<queue_data> state)
// Expose this queue through current_thread::get_queue(); it is reset to nullptr at the end.
67 current_thread::get_queue() = &state->queue;
// The queue checks and the condition-variable waits below are done under state->mutex.
71 std::unique_lock lock{state->mutex};
// Queue is drained and the stop flag is set.
72 if (state->queue.is_empty() && state->is_stoping)
// Block until the queue has an entry or the stop flag is set.
75 state->cv.wait(lock, [&] {
return !state->queue.is_empty() || state->is_stoping; });
// The wait ended with an empty queue, which the predicate above allows only when is_stoping is set.
77 if (state->queue.is_empty())
// The entry at the top of the queue has already been disposed.
80 if (state->queue.top()->is_disposed())
// First compare against details::s_last_now_time (presumably a cached clock reading — confirm),
// and call worker_strategy::now() only if that comparison says the top entry is not due yet.
86 if (details::s_last_now_time < state->queue.top()->get_timepoint())
88 if (
const auto now = worker_strategy::now(); now < state->queue.top()->get_timepoint())
// Timed wait for the remaining duration; the predicate is satisfied once the top entry
// is disposed or its time point has been reached.
90 state->cv.wait_for(lock, state->queue.top()->get_timepoint() - now, [&] { return state->queue.top()->is_disposed() || worker_strategy::now() >= state->queue.top()->get_timepoint(); });
// Take the top entry out of the queue and refresh the flag: true only if entries remain.
95 auto top = state->queue.pop();
96 state->has_fresh_data.store(!state->queue.is_empty());
// `res` is tested for truthiness and later unwrapped with value(); presumably
// make_advanced_call executes the entry and returns an optional follow-up request — confirm.
101 if (
const auto res = top->make_advanced_call())
// Only continue with the follow-up if the entry was not disposed in the meantime.
103 if (!top->is_disposed())
// The result allows an immediate run and has_fresh_data is false at this point.
105 if (res->can_run_immediately() && !state->has_fresh_data.load())
// Obtain the next time point from the result and put the same entry back into the queue.
108 const auto tp = top->handle_advanced_call(res.value());
109 state->queue.emplace(tp, std::move(top));
// Clear the pointer set at the start so it no longer refers to state->queue.
116 current_thread::get_queue() =
nullptr;
// Queue data held through a shared_ptr; a copy of this pointer is handed to the thread below.
120 std::shared_ptr<queue_data> m_state = std::make_shared<queue_data>();
// Thread that runs data_thread with a copy of m_state. m_state is declared first, so
// (assuming both are members of the same class) it is initialised before the thread starts.
124 std::thread m_thread{&data_thread, m_state};
// Strategy type that forwards scheduling requests to a state_t instance held via shared_ptr.
// NOTE(review): state_t is not defined in this excerpt, and the class braces and access
// specifiers are omitted from it.
128 class worker_strategy
131 worker_strategy() =
default;
// Forwards `tp`, `fn`, `handler` and `args` unchanged to m_state->defer_to. The method is
// const-qualified: it calls through the pointer and does not reassign m_state itself.
134 void defer_to(time_point tp, Fn&& fn, Handler&& handler, Args&&... args)
const
136 m_state->defer_to(tp, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
// Returns the current time as reported by details::now().
139 static rpp::schedulers::time_point now() {
return details::now(); }
// State object created by the default member initializer when the strategy is constructed.
142 std::shared_ptr<state_t> m_state = std::make_shared<state_t>();