ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
thread_pool.cpp
#include <rpp/rpp.hpp>
#include <iostream>
/// Example: distributing inner observables of a flat_map over a thread_pool scheduler.
///
/// Each source value is turned into its own inner observable created via
/// `rpp::source::just(scheduler, value)`, so the emission of that value is scheduled
/// through the shared `thread_pool` scheduler. The printed thread id shows which
/// thread delivered each value.
///
/// @return 0 always.
int main() // NOLINT(bugprone-exception-escape)
{
    // Presumably the argument is the number of pool threads (the expected output
    // below shows four distinct threads) - confirm against thread_pool.hpp.
    const auto scheduler = rpp::schedulers::thread_pool{4};

    rpp::source::just(1, 2, 3, 4, 5, 6, 7, 8)
        // The scheduler is captured by copy so every inner observable uses the same pool.
        | rpp::operators::flat_map([scheduler](int value) {
              return rpp::source::just(scheduler, value)
                   // immediate{} performs the delay in the calling thread, i.e. the
                   // thread the inner observable already emits on.
                   | rpp::operators::delay(std::chrono::nanoseconds{500}, rpp::schedulers::immediate{});
          })
        // Convert to a blocking observable so that (presumably) subscribe does not
        // return before the pool threads have delivered everything; otherwise main
        // could reach `return 0` with emissions still pending.
        | rpp::operators::as_blocking()
        | rpp::operators::subscribe([](int v) { std::cout << "[" << std::this_thread::get_id() << "] " << v << std::endl; });
    // Output: (can be in any order but same correlation between thread and values)
    // [thread_1] 1
    // [thread_2] 2
    // [thread_3] 3
    // [thread_4] 4
    // [thread_1] 5
    // [thread_2] 6
    // [thread_3] 7
    // [thread_4] 8
    return 0;
}
Scheduler owning a static thread pool of workers and using "some" thread from this pool on create_worker...
Definition computational.hpp:30
Immediately calls the provided schedulable or waits for the time_point (in the caller thread).
Definition immediate.hpp:65
Scheduler owning a static thread pool of workers and using "some" thread from this pool on create_worker...
Definition thread_pool.hpp:31
auto just(const TScheduler &scheduler, T &&item, Ts &&... items)
Creates an rpp::observable that emits the given items and then completes.
Definition from.hpp:201
auto flat_map(Fn &&callable)
Transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable.
Definition flat_map.hpp:64
auto subscribe(observer< Type, ObserverStrategy > &&observer)
Subscribes passed observer to emissions from this observable.
Definition subscribe.hpp:226
auto as_blocking()
Converts rpp::observable to rpp::blocking_observable
Definition as_blocking.hpp:47
auto delay(rpp::schedulers::duration delay_duration, Scheduler &&scheduler)
Shift the emissions from an Observable forward in time by a particular amount.
Definition delay.hpp:219