ReactivePlusPlus
Yet another implementation of the ReactiveX approach in C++, written with performance and templates in mind
 
Loading...
Searching...
No Matches
worker.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/schedulers/constraints.hpp> // schedulable_fn
14#include <rpp/schedulers/fwd.hpp> // own forwarding
15#include <rpp/utils/constraints.hpp>
16
17#include <algorithm>
18#include <functional>
19
20namespace rpp::schedulers
21{
22template<typename T>
23concept worker_strategy = std::copyable<T> && requires(const T t)
24{
25 t.defer_at(time_point{}, std::declval<optional_duration(*)()>());
26 { t.now() } -> std::same_as<time_point>;
27};
28
29template<typename Strategy>
31{
32public:
33 template<constraint::schedulable_fn Fn>
34 schedulable_wrapper(const Strategy& strategy, time_point time_point, Fn&& fn)
35 : m_strategy{strategy}
36 , m_time_point{time_point}
37 , m_fn{std::forward<Fn>(fn)} {}
38
39 void operator()()
40 {
41 if (!m_strategy.is_subscribed())
42 return;
43
44 if (const auto duration = m_fn())
45 {
46 m_time_point = std::max(m_strategy.now(), m_time_point + duration.value());
47
48 m_strategy.defer_at(m_time_point, std::move(*this));
49 }
50 }
51
52private:
53 Strategy m_strategy;
54 time_point m_time_point;
55 std::function<optional_duration()> m_fn{};
56};
57
58template<worker_strategy Strategy>
59class worker final : public details::worker_tag
60{
61public:
62 template<typename ...Args>
63 requires (!rpp::constraint::variadic_is_same_type<worker<Strategy>, Args...>)
64 worker(Args&& ...args) : m_strategy{std::forward<Args>(args)...} {}
65
66 void schedule(constraint::schedulable_fn auto&& fn) const
67 {
68 schedule(m_strategy.now(), std::forward<decltype(fn)>(fn));
69 }
70
71 void schedule(duration delay, constraint::schedulable_fn auto&& fn) const
72 {
73 schedule(m_strategy.now() + delay, std::forward<decltype(fn)>(fn));
74 }
75
76 void schedule(time_point time_point, constraint::schedulable_fn auto&& fn) const
77 {
78 m_strategy.defer_at(time_point, std::forward<decltype(fn)>(fn));
79 }
80
81 static time_point now() { return Strategy::now(); }
82
83private:
84 Strategy m_strategy;
85};
86} // namespace rpp::schedulers
Definition: worker.hpp:31
Definition: worker.hpp:60
Definition: constraints.hpp:22
Definition: worker.hpp:23