ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
thread_pool.hpp
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2023 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//
10
11#pragma once
12
#include <rpp/schedulers/fwd.hpp>

#include <rpp/schedulers/new_thread.hpp>

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <memory>
#include <thread>
#include <utility>
#include <vector>
18
19namespace rpp::schedulers
20{
30 class thread_pool final
31 {
32 using original_worker = decltype(new_thread::create_worker());
33
34 class worker_strategy
35 {
36 public:
37 worker_strategy(const original_worker& original_worker)
38 : m_original_worker{original_worker}
39 {
40 }
41
42 template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
43 void defer_for(duration duration, Fn&& fn, Handler&& handler, Args&&... args) const
44 {
45 m_original_worker.schedule(duration, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
46 }
47
48 template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
49 void defer_to(time_point tp, Fn&& fn, Handler&& handler, Args&&... args) const
50 {
51 m_original_worker.schedule(tp, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
52 }
53
54 static rpp::schedulers::time_point now() { return original_worker::now(); }
55
56 private:
57 original_worker m_original_worker;
58 };
59
60 public:
61 explicit thread_pool(size_t threads_count = std::thread::hardware_concurrency())
62 : m_state{std::make_shared<state>(threads_count)}
63 {
64 }
65
67 {
68 return rpp::schedulers::worker<worker_strategy>{m_state->get()};
69 }
70
71 private:
72 class state
73 {
74 public:
75 explicit state(size_t threads_count)
76 {
77 threads_count = std::max(size_t{1}, threads_count);
78 m_workers.reserve(threads_count);
79 for (size_t i = 0; i < threads_count; ++i)
80 m_workers.emplace_back(new_thread::create_worker());
81 }
82
83 const original_worker& get() { return m_workers[m_index++ % m_workers.size()]; }
84
85 private:
86 std::vector<original_worker> m_workers{};
87 size_t m_index{};
88 };
89
90 std::shared_ptr<state> m_state{};
91 };
92} // namespace rpp::schedulers
Scheduler owning static thread pool of workers and using "some" thread from this pool on create_worker call.
Definition thread_pool.hpp:31
Definition fwd.hpp:157