ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
test_scheduler.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/schedulers.hpp>
13
14namespace rpp::schedulers
15{
16
17
18 class test_scheduler final
19 {
20 public:
21 static inline rpp::schedulers::time_point s_current_time{std::chrono::seconds{10}};
22
23 class worker_strategy;
24
25 struct state
26 {
27 state() = default;
28
29 template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, rpp::schedulers::constraint::schedulable_fn<Handler, Args...> Fn>
30 void schedule(rpp::schedulers::time_point time_point, Fn&& fn, Handler&& handler, Args&&... args)
31 {
32 schedulings.push_back(time_point);
33 queue.emplace(time_point, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
34 }
35
36 void drain()
37 {
38 while (!queue.is_empty())
39 {
40 if (queue.top()->get_timepoint() > s_current_time)
41 return;
42
43 auto fn = queue.top();
44 queue.pop();
45
46 if (fn->is_disposed())
47 continue;
48
49 executions.push_back(s_current_time);
50 if (auto new_timepoint = (*fn)())
51 {
52 if (fn->is_disposed())
53 continue;
54
55 schedulings.push_back(std::max(s_current_time, new_timepoint.value()));
56 queue.emplace(schedulings.back(), std::move(fn));
57 }
58 }
59 }
60
61 std::vector<rpp::schedulers::time_point> schedulings{};
62 std::vector<rpp::schedulers::time_point> executions{};
64 };
65
66 class worker_strategy
67 {
68 public:
69 worker_strategy(std::weak_ptr<state> state)
70 : m_state{std::move(state)}
71 {
72 }
73
74 template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, rpp::schedulers::constraint::schedulable_fn<Handler, Args...> Fn>
75 void defer_for(rpp::schedulers::duration duration, Fn&& fn, Handler&& handler, Args&&... args) const
76 {
77 if (auto locked = m_state.lock())
78 {
79 locked->schedule(now() + duration, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
80 locked->drain();
81 }
82 }
83
84 static rpp::schedulers::time_point now() { return s_current_time; }
85
86 private:
87 std::weak_ptr<state> m_state;
88 };
89
90 test_scheduler() = default;
91
93 {
95 }
96
97 const auto& get_schedulings() const { return m_state->schedulings; }
98 const auto& get_executions() const { return m_state->executions; }
99
100 static rpp::schedulers::time_point now() { return s_current_time; }
101
102 void time_advance(rpp::schedulers::duration dur) const
103 {
104 s_current_time += dur;
105 m_state->drain();
106 }
107
108 private:
109 std::shared_ptr<state> m_state = std::make_shared<state>();
110 };
111} // namespace rpp::schedulers
Definition test_scheduler.hpp:67
Definition worker.hpp:23
Definition test_scheduler.hpp:26