ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
utils.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/schedulers/fwd.hpp>
13
14#include <exception>
15#include <optional>
16#include <thread>
17
18namespace rpp::schedulers::details
19{
20 inline thread_local time_point s_last_now_time{};
21
22 inline rpp::schedulers::time_point now()
23 {
24 return s_last_now_time = clock_type::now();
25 }
26
27 inline bool sleep_until(const time_point timepoint)
28 {
29 if (timepoint <= details::s_last_now_time)
30 return false;
31
32 const auto now = clock_type::now();
33 std::this_thread::sleep_for(timepoint - now);
34 details::s_last_now_time = std::max(now, timepoint);
35 return timepoint > now;
36 }
37
42 template<typename NowStrategy, rpp::schedulers::constraint::schedulable_handler Handler, typename... Args>
43 std::optional<time_point> immediate_scheduling_while_condition(duration duration,
44 const std::predicate auto& condition,
45 constraint::schedulable_delay_from_this_timepoint_fn<Handler, Args...> auto&& fn,
46 Handler&& handler,
47 Args&&... args) noexcept
48 {
49 auto timepoint = NowStrategy::now() + duration;
50 while (condition())
51 {
52 if (handler.is_disposed())
53 return std::nullopt;
54
55 if (sleep_until(timepoint) && handler.is_disposed())
56 return std::nullopt;
57
58 try
59 {
60 if (const auto duration_from_timepoint = fn(handler, args...))
61 timepoint += duration_from_timepoint->value;
62 else
63 return std::nullopt;
64 }
65 catch (...)
66 {
67 handler.on_error(std::current_exception());
68 return std::nullopt;
69 }
70 }
71
72 return timepoint;
73 }
74
79 template<typename NowStrategy, rpp::schedulers::constraint::schedulable_handler Handler, typename... Args>
80 std::optional<time_point> immediate_scheduling_while_condition(duration duration,
81 const std::predicate auto& condition,
82 constraint::schedulable_delay_from_now_fn<Handler, Args...> auto&& fn,
83 Handler&& handler,
84 Args&&... args) noexcept
85 {
86 while (condition())
87 {
88 if (handler.is_disposed())
89 return std::nullopt;
90
91 if (duration > duration::zero())
92 {
93 std::this_thread::sleep_for(duration);
94
95 if (handler.is_disposed())
96 return std::nullopt;
97 }
98
99 try
100 {
101 if (const auto new_duration = fn(handler, args...))
102 duration = new_duration->value;
103 else
104 return std::nullopt;
105 }
106 catch (...)
107 {
108 handler.on_error(std::current_exception());
109 return std::nullopt;
110 }
111 }
112
113 return NowStrategy::now() + duration;
114 }
115
120 template<typename NowStrategy, rpp::schedulers::constraint::schedulable_handler Handler, typename... Args>
121 std::optional<time_point> immediate_scheduling_while_condition(duration duration,
122 const std::predicate auto& condition,
123 constraint::schedulable_delay_to_fn<Handler, Args...> auto&& fn,
124 Handler&& handler,
125 Args&&... args) noexcept
126 {
127 std::optional<time_point> timepoint{};
128 while (condition())
129 {
130 if (handler.is_disposed())
131 return std::nullopt;
132
133 if (!timepoint.has_value())
134 {
135 if (duration > duration::zero())
136 {
137 std::this_thread::sleep_for(duration);
138
139 if (handler.is_disposed())
140 return std::nullopt;
141 }
142 }
143 else if (sleep_until(timepoint.value()) && handler.is_disposed())
144 return std::nullopt;
145
146 try
147 {
148 if (const auto new_timepoint = fn(handler, args...))
149 timepoint = new_timepoint->value;
150 else
151 return std::nullopt;
152 }
153 catch (...)
154 {
155 handler.on_error(std::current_exception());
156 return std::nullopt;
157 }
158 }
159
160 return timepoint;
161 }
162} // namespace rpp::schedulers::details