ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
rpp::schedulers::current_thread Class Reference

Schedules execution of schedulables via queueing tasks to the caller thread with priority to time_point and order. More...

#include <current_thread.hpp>

Classes

struct  is_queue_is_empty
 
class  worker_strategy
 

Static Public Member Functions

static details::schedulables_queue< worker_strategy > *& get_queue ()
 
static void drain_queue () noexcept
 
static own_queue_guard own_queue_and_drain_finally_if_not_owned ()
 
static rpp::schedulers::worker< worker_strategycreate_worker ()
 

Friends

class new_thread
 

Detailed Description

Schedules execution of schedulables via queueing tasks to the caller thread with priority to time_point and order.

Warning
Caller thread is thread where schedule called.

When this scheduler passed to some operators, then caller thread is thread where scheduling of some action happens. In most cases it is where on_next was called.

Why do we need it?
This scheduler used to prevent recursion calls and making planar linear execution of schedulables. For example:
auto worker = rpp::schedulers::current_thread::create_worker();
worker.schedule([&worker](const auto& handler)
{
std::cout << "Task 1 starts" << std::endl;
worker.schedule([&worker](const auto& handler)
{
std::cout << "Task 2 starts" << std::endl;
worker.schedule([](const auto&)
{
std::cout << "Task 4" << std::endl;
return rpp::schedulers::optional_delay_from_now{};
}, handler);
std::cout << "Task 2 ends" << std::endl;
return rpp::schedulers::optional_delay_from_now{};
}, handler);
worker.schedule([](const auto&)
{
std::cout << "Task 3" << std::endl;
return rpp::schedulers::optional_delay_from_now{};
}, handler);
std::cout << "Task 1 ends" << std::endl;
return rpp::schedulers::optional_delay_from_now{};
}, handler);
Definition worker.hpp:23
Would lead to:
  • "Task 1 starts"
  • "Task 1 ends"
  • "Task 2 starts"
  • "Task 2 ends"
  • "Task 3"
  • "Task 4"
How to use it properly?
To have any visible impact you need to use it at least twice during same observable. For example, rpp::source::just source uses it as default scheduler as well as rpp::operators::merge operator (which just "owns" it during subscription).

For example, this one

| rpp::operators::subscribe([](int v) { std::cout << v << " "; });
auto merge_with(TObservable &&observable, TObservables &&... observables)
Combines submissions from current observable with other observables into one.
Definition merge.hpp:252
auto just(const TScheduler &scheduler, T &&item, Ts &&... items)
Creates rpp::observable that emits a particular items and completes.
Definition from.hpp:201
auto subscribe(observer< Type, ObserverStrategy > &&observer)
Subscribes passed observer to emissions from this observable.
Definition subscribe.hpp:226

Procedes output 1 4 2 5 3 6 due to merge_with takes ownership over this scheduler during subscription, both sources schedule their first emissions into scheduler, then merge_with frees scheduler and it starts to proceed scheduled actions. As a result it continues interleaving of values. In case of usingg rpp::schedulers::immediate it would be:

| rpp::operators::subscribe([](int v) { std::cout << v << " "; });
immediately calls provided schedulable or waits for time_point (in the caller-thread)
Definition immediate.hpp:65

With output 1 2 3 4 5 6

Examples
combine_latest.cpp, debounce.cpp, from.cpp, just.cpp, retry_when.cpp, take_until.cpp, throttle.cpp, timeout.cpp, and window_toggle.cpp.

The documentation for this class was generated from the following file: