ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
Manual documentation

Introduction to Reactive Programming

Important
It's highly recommended to read this article beforehand: The introduction to Reactive Programming you've been missing

Reactive programming is a design paradigm that focuses on building applications that can efficiently respond to asynchronous events.

Every application or function has two core parts: input and output. Input/output can even be empty:

void function() { }

Input/output can be categorized into two types:

  • Static - The application or function accepts input and handles it. For example, command line arguments or function arguments:
int sum(int a, int b) { return a + b; }
  • Distributed in time - The application or function doesn't know the exact length of input or when it will arrive but knows what to do when it happens:
#include <iostream>
int main()
{
while(true)
{
auto ch = ::getchar();
std::cout << "Obtained char " << ch << std::endl;
}
}

When dealing with input that is distributed in time, there are two ways to handle it:

  • Pulling - You decide when you need extra data and request it. This is often a blocking operation. For example, manually checking a blog for new posts.
  • Pushing - You register interest in a data source and react when new data becomes available. For example, subscribing to a blog and receiving notifications for new posts.

Reactive programming is a powerful way to handle input that is distributed in time. Instead of constantly polling for updates, reactive programming allows you to register callbacks to be executed when the input becomes available.

See https://reactivex.io/intro.html for more details.

Core Concepts

Reactive Programming can be described as follows:

  • An Observer subscribes to an Observable.
  • The Observable notifies its subscribed Observers about new events/emissions:
    • on_next(T) - notifies about a new event/emission
    • on_error(std::exception_ptr) - notifies about an error. This is a termination event. (no more calls from this observable should be expected)
    • on_completed() - notifies about successful completion. This is a termination event. (no more calls from this observable should be expected)
    • set_upstream(disposable) - observable could pass to observer it's own disposable to provide ability for observer to terminate observable's internal actions/state
    • is_disposed() - checks if the observer is still interested in the source data.

During subscription, the Observable can provide a Disposable for the Observer to check if the observable is still alive or to terminate early if needed.

For example:

#include <rpp/rpp.hpp>
#include <iostream>
int main()
{
rpp::source::create<int>([](const auto& observer)
{
while (!observer.is_disposed())
{
char ch = ::getchar();
if (!::isdigit(ch))
{
observer.on_error(std::make_exception_ptr(std::runtime_error{"Invalid symbol"}));
return;
}
int digit = ch - '0';
if (digit == 0)
{
observer.on_completed();
return;
}
observer.on_next(digit);
}
})
.subscribe([](int val)
{
std::cout << "obtained val " << val << std::endl;
},
[](std::exception_ptr err)
{
std::cout << "obtained error " << std::endl;
},
[]()
{
std::cout << "Completed" << std::endl;
});
// input: 123456d
// output: obtained val 1
// obtained val 2
// obtained val 3
// obtained val 4
// obtained val 5
// obtained val 6
// obtained error
// input: 1230
// output: obtained val 1
// obtained val 2
// obtained val 3
// Completed
return 0;
}
auto create(OnSubscribe &&on_subscribe)
Construct observable specialized with passed callback function. Most easiesest way to construct obser...
Definition create.hpp:57
auto subscribe(observer< Type, ObserverStrategy > &&observer)
Subscribes passed observer to emissions from this observable.
Definition subscribe.hpp:226

There we are creating observable that emits digits from console input:In case of user promted something else it is error for our observable (it is expected to emit ONLY digits). In this case we are notifying observer about it and just stopping. When user prompts 0, it means "end of observable".

See https://reactivex.io/documentation/observable.html for more details.

In such an way it is not powerful enough, so Reactive Programming provides a list of operators.

Observable contract

Observable is the source of any Reactive Stream.

Observable is the source of any Reactive Stream. Observable provides ability to subscribe observer on stream of events. After subscription observable would emit values to observer.

Reactive programming has an Observable Contract. Please read it.

This contract includes:

Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.

RPP follows this contract, meaning:

  1. All RPP operators follow this contract.
    All built-in RPP observables/operators emit emissions serially
  2. User-provided callbacks can be non-thread-safe due to the thread-safety of the observable.
    For example: internal logic of take operator doesn't use mutexes or atomics due to underlying observable MUST emit items serially
  3. When implementing your own operator via create, follow this contract!
  4. This is true EXCEPT FOR subjects if used manually. Use serialized_* instead if you can't guarantee serial emissions.

For example:

| rpp::operators::map([](int v)
{
std::cout << "enter " << v << std::endl;
std::this_thread::sleep_for(std::chrono::seconds{1});
std::cout << "exit " << v << std::endl;
return v;
})
Scheduler which schedules invoking of schedulables to another thread via queueing tasks with priority...
Definition new_thread.hpp:31
auto merge_with(TObservable &&observable, TObservables &&... observables)
Combines submissions from current observable with other observables into one.
Definition merge.hpp:252
auto just(const TScheduler &scheduler, T &&item, Ts &&... items)
Creates rpp::observable that emits a particular items and completes.
Definition from.hpp:201
auto map(Fn &&callable)
Transforms the items emitted by an Observable via applying a function to each item and emitting resul...
Definition map.hpp:94
auto repeat()
Repeats the Observabe's sequence of emissions infinite amount of times via re-subscribing on it durin...
Definition repeat.hpp:86
auto as_blocking()
Converts rpp::observable to rpp::blocking_observable
Definition as_blocking.hpp:47
auto subscribe_on(Scheduler &&scheduler)
OnSubscribe function for this observable will be scheduled via provided scheduler.
Definition subscribe_on.hpp:75

This will never produce:

enter 1
enter 2
exit 2
exit 1

Only serially:

enter 1
exit 1
enter 1
exit 1
enter 2
exit 2
enter 2
exit 2
See also
https://reactivex.io/documentation/observable.html

Observers:

Observer subscribes on Observable and obtains values provided by Observable.

In fact observer is kind of wrapper over 3 core functions:

  • on_next(T) - callback with new emission provided by observable
  • on_error(err) - failure termination callback with reason of failure of observable (why observable can't continue processing)
  • on_completed() - succeed termination callback - observable is done, no any future emissions from this

Additionally in RPP observer handles disposables related logic:

  • set_upstream(disposable) - observable could pass to observer it's own disposable to provide ability for observer to terminate observable's internal actions/state.
  • is_disposed() - observable could check if observer is still interested in emissions (false) or done and no any futher calls would be success (true)
Observer creation:
  • Observer creation inside subscribe:
    RPP expects user to create observers only inside subscribe function of observables. Something like this:
    rpp::source::just(1).subscribe([](int){}, [](const std::exception_ptr&){}, [](){});
    rpp::source::just(1) | rpp::operators::subscribe([](int){}, [](const std::exception_ptr&){}, [](){});
    Some of the callbacks (on_next/on_error/on_completed) can be omitted. Check rpp::operators::subscribe for more details.
  • Advanced observer creation:
    Technically it is possible to create custom observer via creating new class/struct which satisfies concept rpp::constraint::observer_strategy, but it is highly not-recommended for most cases
    Also technically you could create your observer via make_lambda_observer function, but it is not recommended too: it could disable some built-in optimizations and cause worse performance.
    Also it is most probably bad pattern and invalid usage of RX if you want to keep/store observers as member variables/fields. Most probably you are doing something wrong IF you are not implementing custom observable/operator.
See also
https://reactivex.io/documentation/observable.html

Operators

Operators modify observables and extend them with custom logic.

Observables emit values based on underlying logic, such as iterating over a vector and etc. Operators allow you to enhance this stream, for example, by filtering values, transforming them, etc., resulting in a more suitable stream for specific cases.

Example: Create an observable to read characters from console input, continue until '0' is encountered, filter out non-letter characters, and send the remaining letters as uppercase to the observer:

#include <rpp/rpp.hpp>
#include <iostream>
int main()
{
| rpp::operators::take_while([](char v) { return v != '0'; })
| rpp::operators::filter(std::not_fn(&::isdigit))
| rpp::operators::map(&::toupper)
| rpp::operators::subscribe([](char v) { std::cout << v; });
// input: 12345qwer5125ttqt0
// output: QWERTTQT
return 0;
}
auto take_while(Fn &&predicate)
Sends items from observable while items are satisfy predicate. When condition becomes false -> sends ...
Definition take_while.hpp:91
auto from_callable(Callable &&callable)
Creates rpp::specific_observable that calls provided callable and emits resulting value of this calla...
Definition from.hpp:249
auto filter(Fn &&predicate)
Emit only those items from an Observable that satisfies a provided predicate.
Definition filter.hpp:91
How operators work and how to create your own?
Example:
rpp::source::create<int>([](const auto& observer){
observer.on_next(1);
observer.on_completed();
});

This example creates an observable of int using the create operator, which emits the value 1 and then completes. The type of this observable is rpp::observable<int, ...>, where ... is an implementation-defined type. To convert int to std::string, you can use the map operator:

rpp::source::create<int>([](const auto& observer){
observer.on_next(1);
observer.on_completed();
})
| rpp::operators::map([](int v){ return std::to_string(v); });

Now it is an observable of strings (rpp::observable<std::string, ...>). The map operator is a functor-adaptor that accepts an observable and returns another observable. It transforms the original observable's type to the "final type" by invoking the passed function. In this case, the final type is std::string. The map operator can be implemented in multiple ways:

1) call-based (function/functor or others) - operator accepts (old) observable and returns new (modified) observable

template<typename Fn>
struct map
{
Fn fn{};
template<typename Type, typename Internal>
auto operator()(const rpp::observable<Type, Internal>& observable) const {
using FinalType = std::invoke_result_t<Fn, Type>;
return rpp::source::create<FinalType>([observable, fn](const rpp::dynamic_observer<FinalType>& observer)
{
observable.subscribe([observer, fn](const auto& v) { observer.on_next(fn(v)); },
[observer](const std::exception_ptr& err) { observer.on_error(err); },
[observer]() { observer.on_completed(); });
});
}
}
void on_next(const Type &v) const noexcept
Observable calls this method to notify observer about new value.
Definition observer.hpp:84
void on_completed() const noexcept
Observable calls this method to notify observer about completion of emissions.
Definition observer.hpp:135
void on_error(const std::exception_ptr &err) const noexcept
Observable calls this method to notify observer about some error during generation next data.
Definition observer.hpp:120
Type-erased version of the rpp::observer. Any observer can be converted to dynamic_observer via rpp::...
Definition dynamic_observer.hpp:129
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
void subscribe(observer< Type, ObserverStrategy > &&observer) const
Subscribes passed observer to emissions from this observable.
Definition observable.hpp:58

It is template for such an functor-adaptor. It is also fully valid example of call-based operator:

| [](const auto& observable) { return rpp::source::concat(observable, rpp::source::just(2)); };
auto concat(TObservable &&obs, TObservables &&... others)
Make observable which would merge emissions from underlying observables but without overlapping (curr...
Definition concat.hpp:168

This converts the observable to a concatenation of the original observable and just(2).

2) type-traits based - should satisfy rpp::constraint::operator_ concept.
For example, you can implement such an operator like this:

template<typename Fn>
struct simple_map
{
simple_map(const Fn& fn)
: fn(fn)
{
}
Fn fn{};
// 1: define traits for the operator with upstream (previous type) type
template<rpp::constraint::decayed_type T>
{
// 1.1: it could have static asserts to be sure T is applicable for this operator
static_assert(std::invocable<Fn, T>, "Fn is not invocable with T");
// 1.2: it should have `result_type` is type of new observable after applying this operator
using result_type = std::invoke_result_t<Fn, T>;
};
// 2: define updated optimal disposables strategy. Set to `rpp::details::observables::default_disposables_strategy` if you don't know what is that.
template<rpp::details::observables::constraint::disposables_strategy Prev>
using updated_optimal_disposables_strategy = Prev;
// 3: implement core logic of operator: accept downstream observer (of result_type) and convert it to upstream observer (of T).
template<typename Upstream, rpp::constraint::observer Observer>
auto lift(Observer&& observer) const
{
const auto dynamic_observer = std::forward<Observer>(observer).as_dynamic();
return rpp::make_lambda_observer<Upstream>([dynamic_observer, fn = fn](const auto& v) { dynamic_observer.on_next(fn(v)); },
[dynamic_observer](const std::exception_ptr& err) { dynamic_observer.on_error(err); },
[dynamic_observer]() { dynamic_observer.on_completed(); });
}
};
template<typename Fn>
simple_map(const Fn& fn) -> simple_map<Fn>;
void test()
{
rpp::source::just(1) | simple_map([](int v) { return std::to_string(v); }) | rpp::ops::subscribe();
}

But in this case you are missing disposables-related functionality. So, it is better to implement it via providing custom observer's strategy with correct handling of disposables. Check real rpp::operators::map implementation for it =)

See also
https://reactivex.io/documentation/operators.html

Check the operators for more details about operators.

Schedulers

Reactive programming becomes even more powerful when observables can operate across multiple threads, rather than being confined to the thread of creation and subscription. This allows for non-blocking, asynchronous operations and provides significant advantages over raw iteration or other pull-based approaches. To enable multithreading in your observables, you can use Schedulers.

By default, an Observable will perform its work in the thread where the subscribe operation occurs. However, you can change this behavior using the subscribe_on operator. This operator forces the observable to perform the subscription and any subsequent work in the specified scheduler. For example:

In this case subscribe to 3 happens in current thread (where subscribe invoked). But during subscription to 2 it schedules subscription to 1 to provided new_thread scheduler. So, subscription to final observable and it's internal logic (iterating and emitting of values) happens inside new_thread. Actually it is something like this:

rpp::source::create<int>([](const auto& observer)
{
rpp::schedulers::new_thread{}.create_worker([](...) {
rpp::source::just(1,2,3,4,5).subscribe(observer);
})
}).subscribe(...);

The observe_on operator specifies the scheduler that will be used for emission during the processing of further operators after observe_on. For example

auto observe_on(Scheduler &&scheduler, rpp::schedulers::duration delay_duration={})
Specify the Scheduler on which an observer will observe this Observable.
Definition observe_on.hpp:38

In this case whole subscription flow happens in thread of subscription, but emission of values transfers to another thread. Actually it is something like this:

rpp::source::create<int>([](const auto& observer)
{
auto worker = rpp::schedulers::new_thread{}.create_worker();
rpp::source::just(1,2,3,4,5).subscribe([](int v) {
worker.scheduler([](...) {
observer.on_next(v);
}
})
}).subscribe(...);

A Scheduler is responsible for controlling the type of multithreading behavior (or lack thereof) used in the observable. For example, a scheduler can utilize a new thread, a thread pool, or a raw queue to manage its processing.

Check the API Reference for more details about schedulers.

See https://reactivex.io/documentation/scheduler.html for more details about schedulers.

Disposable

Disposable is handle/resource passed from observable to observer via the set_upstream method. Observer disposes this disposable when it wants to unsubscribe from observable.

In reactive programming, a disposable is an object that represents a resource that needs to be released or disposed of when it is no longer needed. This can include things like file handles, network connections, or any other resource that needs to be cleaned up after use. The purpose of a disposable is to provide a way to manage resources in a safe and efficient manner. By using disposables, you can ensure that resources are released in a timely manner, preventing memory leaks and other issues that can arise from resource leaks.

There are 2 main purposes of disposables:

  1. Upstream disposable
    This is a disposable that the observable puts into the observer. The upstream disposable keeps some state or callback that should be disposed of when the observer is disposed (== no longer wants to receive emissions, for example, was completed/errored or just unsubscribed) This ensures that any resources used by the observable are properly cleaned up when the observer obtains on_error/on_completed or disposed in any other way.
  2. External disposable
    This is a disposable that allows the observer to be disposed of from outside the observer itself. This can be useful in situations where you need to cancel an ongoing operation or release resources before the observable has completed its work. To achieve this in rpp you can pass disposable to subscribe method or use subscribe_with_disposable overload instead.
Note
In rpp all disposables should be created via rpp::disposable_wrapper_impl instead of manually.
Warning
From user of rpp library it is not really expected to handle disposables manually somehow except of case where user want to control lifetime of observable-observer connection manually.

Check API reference of disposables for more details

Exception Guarantee

In non-reactive programming functions/modules throws exception in case of something invalid. As a result, user can catch it and handle it somehow while internal state of objects can be in some state (invalid/untouched/partly valid) and etc.

In reactive programming there is another way of exception mechanism: throwing exception as is from original place is useless. Notification about "something goes wrong" need to receive observer/subscriber, not owner of callstack. As a result, ANY exception obtained during emitting items and etc WOULD be delivered to subscriber/observer via on_error function and then unsubscribe happens. As a result, no any raw exceptions would be throws during using RPP. In case of emitting on_error whole internal state of observable keeps valid but it doesn't matter - whole chain would be destroyed due to on_error forces unsubscribe. Reactive catching mechanisms like catch or retry re-subscribes on observable. it means, that new chain with new states would be created, not re-used existing one.

Memory Model

In ReactivePlusPlus there is new concept unique for this implementation: rpp::memory_model:

Some of the operators and sources like rpp::source::just or rpp::operators::start_with accepts user's variables for usage. Some of this types can be such an expensive to copy or move and it would be preferable to copy it once to heap, but some other types (like POD) are cheap enough and usage of heap would be overkill. But these variables should be saved inside somehow!

So, RPP provides ability to select strategy "how to deal with such a variables" via rpp::memory_model enum.

Examples

For example, rpp::source::just

rpp::source::just(my_custom_variable);

by default just uses rpp::memory_model::use_stack and my_custom_variable would be copied and moved everywhere when needed. On the other hand

makes only 1 copy/move to shared_ptr and then uses it instead.

As a a result, users can select preferable way of handling of their types.

Advanced details

Disposable

Rpp has following disposables related classes:

  • interface_disposable - is base inerface for all disposables in RPP. Simplest ever disposable with dispose() and is_disposed() method. This type of disposable observable is passing to observer.
    • callback_disposable - is just noexcept to be called on dispose. Can be constructed like this:
      auto d = rpp::make_callback_disposable([]() noexcept { std::cout << "DISPOSED! " << std::endl; });
  • interface_composite_disposable - is base interface for disposables able to keep dependent disposables inside: main difference - new method add accepting another dispoable inhereting from interface_disposable. Main idea: interface_composite_disposable is aggregating other disposables inside and during dispose() method calling dispose() method of its dependents.
    • composite_disposable - is concrete realization of interface_composite_disposable
    • refcount_disposable - is variant of composite_disposable but it keeps refcounter inside. This counter can be incremented with help of add_ref() method returning new dependent composite_disposable. Idea is simple: original refcount_disposable would be disposed IF all of its dependents disposables (created via add_ref() ) dispose() methods were called.

All disposable in RPP should be created and used via rpp::disposable_wrapper_impl<T> wrapper. For simplicity usage it has 2 base aliases:

  • disposable_wrapper - wrapper over interface_disposable
  • composite_disposable_wrapper - wrapper over interface_composite_disposable

dynamic_* versions to keep classes as variables

Most of the classes inside rpp library including observable, observer and others are heavy-templated classes. It means, it could has a lot of template params. In most cases you shouldn't worry about it due to it is purely internal problem.

But in some cases you want to keep observable or observer inside your classes or return it from function. In most cases I strongly recommend you to use auto to deduce type automatically. But in some cases it is not possible (for example, to keep observable as member variable). For such an usage you could use dynamic_observable and dynamic_observer:

  • they are type-erased wrappers over regular observable/observer with goal to hide all unnecessary stuff from user's code. For example, you can easily use it as:
    #include <rpp/rpp.hpp>
    #include <iostream>
    struct some_data
    {
    };
    int main() {
    some_data v{rpp::source::just(1,2,3),
    std::cout << value << std::endl;
    })};
    v.observable.subscribe(v.observer);
    }
    Type-erased version of the rpp::observable. Any observable can be converted to dynamic_observable via...
    Definition dynamic_observable.hpp:89
    auto make_lambda_observer(OnNext &&on_next, OnError &&on_error={}, OnCompleted &&on_completed={}) -> lambda_observer< Type, std::decay_t< OnNext >, std::decay_t< OnError >, std::decay_t< OnCompleted > >
    Constructs observer specialized with passed callbacks. Most easiesest way to construct observer "on t...
    Definition lambda_observer.hpp:51
  • to convert observable/observer to dynamic_* version you could manually call as_dynamic() member function or just pass them to ctor
  • actually they are similar to rxcpp's observer<T> and observable<T> but provides EXPLICIT definition of dynamic fact
  • due to type-erasure mechanism dynamic_ provides some minor performance penalties due to extra usage of shared_ptr to keep internal state + indirect calls. It is not critical in case of storing it as member function, but could be important in case of using it on hot paths like this:
    | rpp::ops::map([](int v) { return rpp::source::just(v); })
    return observable | rpp::ops::filter([](int v){ return v % 2 == 0;});
    });
    auto flat_map(Fn &&callable)
    Transform the items emitted by an Observable into Observables, then flatten the emissions from those ...
    Definition flat_map.hpp:64
    ^^^ while it is fully valid code, flat_map have to convert observable to dynamic version via extra heap, but it is unnecessary. It is better to use auto in this case.
    | rpp::ops::map([](int v) { return rpp::source::just(v); })
    | rpp::ops::flat_map([](const rpp::constraint::observable_of_type<int> auto& observable) { // or just `const auto& observable`
    return observable | rpp::ops::filter([](int v){ return v % 2 == 0;});
    });

Extensions:

RPP is library to build reactive streams. But in general applicaton uses some another framework/library to build core logic of application. With some of them RPP can be unified to build much more better software. Below you can find list of extensions for RPP with adaption to external frameworks for much more easiser integeration with RPP. These extensions are part of RPP library:

rppqt

RppQt is extension of RPP which enables support of Qt library.

RppQt is set of wrappers to connect QT world with RPP.

Example:
auto button = new QPushButton("Click me!");
auto label = new QLabel();
rppqt::source::from_signal(*button, &QPushButton::clicked) // <------ react on signals
| rpp::ops::tap([](int) { std::this_thread::sleep_for(std::chrono::milliseconds{500}); }) // some heavy job
| rpp::operators::scan(0, [](int seed, auto) { return ++seed; })
| rpp::operators::observe_on(rppqt::schedulers::main_thread_scheduler{}) // <--- go back to main QT scheduler
| rpp::operators::subscribe([&label](int clicks) {
label->setText(QString{"Clicked %1 times in total!"}.arg(clicks));
});
Check API reference of rppqt for more details

rppgrpc

RppGrpc is extension of RPP which enables support of grpc library.

gRPC provides way to handle requests and responses with help of reactors. RppGrpc is set of reactors (for both: client and server side) with all possible stream modes (client, server or bidirectional stream) to pass such an reactors to gRPC and handle them via RPP.

Server-side example:
grpc::ServerBidiReactor<Request, Response>* Bidirectional(grpc::CallbackServerContext* /*context*/) override
{
const auto reactor = new rppgrpc::server_bidi_reactor<Request, Response>();
reactor->get_observable().subscribe([](const Request&) {}, []() { std::cout << "DONE" << std::endl; });
reactor->get_observer().on_next(Response{});
return reactor;
}
Client-side example:
auto channel = grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials());
auto stub = TestService::NewStub(channel);
grpc::ClientContext ctx{};
stub->async()->Bidirectional(&ctx, reactor);
reactor->get_observable().subscribe([](const Response&) {});
reactor->init();
reactor->get_observer().on_next(Request{});
Check API reference of rppgrpc for more details

rppasio

RppAsio is extension of RPP which enables support of boost-asio library.

Check API reference of rppasio for more details