ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Loading...
Searching...
No Matches
User Guide

Introduction

I'm highly recommend to read this article: The introduction to Reactive Programming you've been missing

What is Reactive Programming?

Reactive programming is a design paradigm that focuses on building applications that can efficiently respond to asynchronous events.

Actually, any application or function has two core parts: input and output. Input/output can even be empty:

int main()
{
return 0;
}

Input/output itself can be split into the following two types:

  • Static - Your application or function just accepts such an input and handles it somehow. For example, arguments from the command line or arguments of your function:
int sum(int a, int b) { return a + b; }
  • Distributed in time - Your application or function doesn't know exact length of input, when input (or any parts of it) would arrive, but knows what to do when it happens:
#include <iostream>
int main()
{
while(true)
{
auto ch = ::getchar();
std::cout << "Obtained char " << ch << std::endl;
}
}

When dealing with input that is distributed in time, there are two ways to handle it:

  • Pulling - You decide when you need extra data (e.g., to get something, request, iterate, etc.) and you are simply checking/requesting some data. In most cases, this is a blocking operation of requesting data and waitign to be available or periodically checking its current status. For example, if you like a blog with non-periodical posts, you may check it daily for new posts manually.
  • Pushing - You decide once that you are interested in a source of data, notify this source somehow (e.g., register, subscribe, etc.), and react when new data becomes available to you. For example, you might subscribe to a blog and react to new posts only after receiving a notification on your smartphone, rather than manually checking for updates.

Reactive programming is a powerful way to handle input that is distributed in time. Instead of constantly polling for updates or waiting for input to arrive, reactive programming allows you to register callbacks to be executed when the input becomes available.

See https://reactivex.io/intro.html for more details.

Core concepts of Reactive Programming

In short, Reactive Programming can be described as follows:

  • An Observer subscribes to an Observable.
  • The Observable automatically notifies its subscribed Observers about any new events/emissions. Observable could invoke next observer's method:
    • on_next(T) - notifies about new event/emission
    • on_error(std::exception_ptr) - notified about error during work. It is termination event (no more calls from this observable should be expected)
    • on_completed() - notified about successful completion.It is termination event (no more calls from this observable should be expected)
    • set_upstream(disposable) - observable could pass to observer it's own disposable to provide ability for observer to terminate observable's internal actions/state.
    • is_disposed() - observable could check if observer is still interested in this source data (==false) or disposed and not listening anymore (==true)
  • During subscription, the Observable can return/provide a Disposable for Observer to provide ability to check if observable is still alive or make early termination (==dispose) if needed.

For example:

#include <rpp/rpp.hpp>
#include <iostream>
int main()
{
rpp::source::create<int>([](const auto& observer)
{
while (!observer.is_disposed())
{
char ch = ::getchar();
if (!::isdigit(ch))
{
observer.on_error(std::make_exception_ptr(std::runtime_error{"Invalid symbol"}));
return;
}
int digit = ch - '0';
if (digit == 0)
{
observer.on_completed();
return;
}
observer.on_next(digit);
}
})
.subscribe([](int val)
{
std::cout << "obtained val " << val << std::endl;
},
[](std::exception_ptr err)
{
std::cout << "obtained error " << std::endl;
},
[]()
{
std::cout << "Completed" << std::endl;
});
// input: 123456d
// output: obtained val 1
// obtained val 2
// obtained val 3
// obtained val 4
// obtained val 5
// obtained val 6
// obtained error
// input: 1230
// output: obtained val 1
// obtained val 2
// obtained val 3
// Completed
return 0;
}
auto subscribe(observer< Type, ObserverStrategy > &&observer)
Subscribes passed observer to emissions from this observable.
Definition subscribe.hpp:226

There we are creating observable that emits digits from console input:In case of user promted something else it is error for our observable (it is expected to emit ONLY digits). In this case we are notifying observer about it and just stopping. When user prompts 0, it means "end of observable".

See https://reactivex.io/documentation/observable.html for more details.

In such an way it is not powerful enough, so Reactive Programming provides a list of operators.

Observable contract

Reactive programming has Observable Contract. Please, read it.

This contact has next important part:

‍Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications

RPP follows this contract and especially this part. It means, that:

  1. All implemented in RPP operators are following this contract:
    All built-in RPP observables/operators emit emissions serially
  2. Any user-provided callbacks (for operators or observers) can be not thread-safe due to thread-safety of observable is guaranteed.
    For example: internal logic of take operator doesn't use mutexes or atomics due to underlying observable MUST emit items serially
  3. When you implement your own operator via create be careful to follow this contract!
  4. It is true EXCEPT FOR subjects if they are used manually due to users can use subjects for its own purposes there is potentially place for breaking this concept. Be careful and use serialized_* instead if can't guarantee serial emissions!

It means, that for example:

| rpp::operators::map([](int v)
{
std::cout << "enter " << v << std::endl;
std::this_thread::sleep_for(std::chrono::seconds{1});
std::cout << "exit " << v << std::endl;
return v;
})
Scheduler which schedules invoking of schedulables to another thread via queueing tasks with priority...
Definition new_thread.hpp:31
auto merge_with(TObservable &&observable, TObservables &&... observables)
Combines submissions from current observable with other observables into one.
Definition merge.hpp:249
auto just(const TScheduler &scheduler, T &&item, Ts &&... items)
Creates rpp::observable that emits a particular items and completes.
Definition from.hpp:206
auto map(Fn &&callable)
Transforms the items emitted by an Observable via applying a function to each item and emitting resul...
Definition map.hpp:94
auto repeat()
Repeats the Observabe's sequence of emissions infinite amount of times via re-subscribing on it durin...
Definition repeat.hpp:86
auto as_blocking()
Converts rpp::observable to rpp::blocking_observable
Definition as_blocking.hpp:47
auto subscribe_on(Scheduler &&scheduler)
OnSubscribe function for this observable will be scheduled via provided scheduler.
Definition subscribe_on.hpp:80

will never produce something like

enter 1
enter 2
exit 2
exit 1

only serially

enter 1
exit 1
enter 1
exit 1
enter 2
exit 2
enter 2
exit 2

Operators

Operators are way to modify the Observable's emissions to adapt values to the Observer.

For example, we can create observable to get chars from console input, do it till ‘0’ char, get only letters and send to observer this letters as UPPER. With operators it is pretty simple to do it in correct way:

#include <rpp/rpp.hpp>
#include <iostream>
int main()
{
| rpp::operators::take_while([](char v) { return v != '0'; })
| rpp::operators::filter(std::not_fn(&::isdigit))
| rpp::operators::map(&::toupper)
| rpp::operators::subscribe([](char v) { std::cout << v; });
// input: 12345qwer5125ttqt0
// output: QWERTTQT
return 0;
}
auto take_while(Fn &&predicate)
Sends items from observable while items are satisfy predicate. When condition becomes false -> sends ...
Definition take_while.hpp:91
auto from_callable(Callable &&callable)
Creates rpp::specific_observable that calls provided callable and emits resulting value of this calla...
Definition from.hpp:254
auto filter(Fn &&predicate)
Emit only those items from an Observable that satisfies a provided predicate.
Definition filter.hpp:91

You can check documentation for each operator on API Reference page. Below you can find details about how operator works and how to create your own custom operator in RPP.

See https://reactivex.io/documentation/operators.html for more details about operators concept.

How operator works?

Let's check this example:

rpp::source::create<int>([](const auto& observer){
observer.on_next(1);
observer.on_completed();
});

This example shows next: we create observble of int via operator create. This observable just emits to observer value 1 and then completes. Type of this observable is rpp::observable<int, ...> where ... implementation defined type. So, actually it is observable of ints. Let's say we want to convert int to std::string. We could subscribe and then convert it or use map operator (also known as transform) to transform some original value to some another value:

rpp::source::create<int>([](const auto& observer){
observer.on_next(1);
observer.on_completed();
})
| rpp::operators::map([](int v){ return std::string{v}; });

For now it is observable of strings due to it is rpp::observable<std::string, ...>. But what is rpp::operators::map then? Actually it is functor-adaptor - just functor accepting observable and returning another observable. It accepts original observable and converts it to observable of "final type". "final type" is result of invocation of passed function against original observable's type. In our case it is decltype([](int v){ return std::string{v}; }(int{})) is it is std::string. So, map can be implemented in the following way:

template<typename Fn>
struct map
{
Fn fn{};
template<typename Type, typename Internal>
auto operator()(const rpp::observable<Type, Internal>& observable) const {
using FinalType = std::invoke_result_t<Fn, Type>;
return rpp::source::create<FinalType>([observable, fn](const rpp::dynamic_observer<FinalType>& observer)
{
observable.subscribe([observer, fn](const auto& v) { observer.on_next(fn(v)); },
[observer](const std::exception_ptr& err) { observer.on_error(err); },
[observer]() { observer.on_completed(); });
};);
}
}
void on_next(const Type &v) const noexcept
Observable calls this method to notify observer about new value.
Definition observer.hpp:84
void on_completed() const noexcept
Observable calls this method to notify observer about completion of emissions.
Definition observer.hpp:135
void on_error(const std::exception_ptr &err) const noexcept
Observable calls this method to notify observer about some error during generation next data.
Definition observer.hpp:120
Type-erased version of the rpp::observer. Any observer can be converted to dynamic_observer via rpp::...
Definition dynamic_observer.hpp:110
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
void subscribe(observer< Type, ObserverStrategy > &&observer) const
Subscribes passed observer to emissions from this observable.
Definition observable.hpp:57

It is template for such an functor-adaptor. Provided example - is simplest possible way to implement new operators - just provide function for transformation of observable. For example, it is fully valid example:

| [](const auto& observable) { return rpp::source::concat(observable, rpp::source::just(2)); };
auto concat(TObservable &&obs, TObservables &&... others)
Make observable which would merge emissions from underlying observables but without overlapping (curr...
Definition concat.hpp:169

There we convert observable to concatenation of original observable and just(2).

One more posible but a bit more advanced way to implement operators - is to lift observer. To do this, your functor-adapter must to satisfy rpp::constraint::operator_lift concept. Actually, your class must to have:

  • member function lift accepting downstream observer and returning new upstream observer
  • inner template<rpp::constraint::decayed_type T> struct traits struct accepting typename of upstream and providing:
    • using result_type = with typename of new resulting type for new observable
    • (optionally) struct requirements with static_asserts over passed type

Example:

template<typename Fn>
struct map
{
template<rpp::constraint::decayed_type T>
struct traits
{
struct requirements
{
static_assert(std::invocable<Fn, T>, "Fn is not invocable with T");
};
using result_type = std::invoke_result_t<Fn, T>;
};
Fn fn{};
template<typename Upstream, typename Downstream>
auto lift(const rpp::dynamic_observer<Downstream>& observer) const
{
return rpp::make_lambda_observer<Upstream>([observer, fn](const auto& v){ observer.on_next(fn(v)); },
[observer](const std::exception_ptr& err) { observer.on_error(err); },
[observer]() { observer.on_completed(); });
}
}

In this case you providing logic how to convert downstream observer to upstream observer. Actually this implementation is equal to previous one, but without handling of observable - you are expressing your operator in terms of observers

**(Advanced)** In case of implementing operator via lift you can control disposable strategy via updated_disposable_strategy parameter. It accepts disposable strategy of upstream and returns disposable strategy for downstream. It needed only for optimization and reducing disposables handling cost and it is purely advanced thing. Not sure if anyone is going to use it by its own for now =)

Schedulers

Reactive programming becomes even more powerful when observables can operate across multiple threads, rather than being confined to the thread of creation and subscription. This allows for non-blocking, asynchronous operations and provides significant advantages over raw iteration or other pull-based approaches. To enable multithreading in your observables, you can use Schedulers.

By default, an Observable will perform its work in the thread where the subscribe operation occurs. However, you can change this behavior using the subscribe_on operator. This operator forces the observable to perform the subscription and any subsequent work in the specified scheduler. For example:

In this case subscribe to 3 happens in current thread (where subscribe invoked). But during subscription to 2 it schedules subscription to 1 to provided new_thread scheduler. So, subscription to final observable and it's internal logic (iterating and emitting of values) happens inside new_thread. Actually it is something like this:

rpp::source::create<int>([](const auto& observer)
{
rpp::schedulers::new_thread{}.create_worker([](...) {
rpp::source::just(1,2,3,4,5).subscribe(observer);
})
}).subscribe(...);

The observe_on operator specifies the scheduler that will be used for emission during the processing of further operators after observe_on. For example

auto observe_on(Scheduler &&scheduler, rpp::schedulers::duration delay_duration={})
Specify the Scheduler on which an observer will observe this Observable.
Definition observe_on.hpp:38

In this case whole subscription flow happens in thread of subscription, but emission of values transfers to another thread. Actually it is something like this:

rpp::source::create<int>([](const auto& observer)
{
auto worker = rpp::schedulers::new_thread{}.create_worker();
rpp::source::just(1,2,3,4,5).subscribe([](int v) {
worker.scheduler([](...) {
observer.on_next(v);
}
})
}).subscribe(...);

A Scheduler is responsible for controlling the type of multithreading behavior (or lack thereof) used in the observable. For example, a scheduler can utilize a new thread, a thread pool, or a raw queue to manage its processing.

Checkout API Reference to learn more about schedulers in RPP.

See https://reactivex.io/documentation/scheduler.html for more details about scheduler concept.

Disposable

In reactive programming, a disposable is an object that represents a resource that needs to be released or disposed of when it is no longer needed. This can include things like file handles, network connections, or any other resource that needs to be cleaned up after use.

The purpose of a disposable is to provide a way to manage resources in a safe and efficient manner. By using disposables, you can ensure that resources are released in a timely manner, preventing memory leaks and other issues that can arise from resource leaks.

In most cases disposables are placed in observers. RPP's observer can use two types of disposables:

  1. Upstream disposable - This is a disposable that the observable puts into the observer. The upstream disposable keeps some state or callback that should be disposed of when the observer is disposed. This ensures that any resources used by the observable are properly cleaned up when the observer obtains on_error/on_completed or disposed in any other way.
  2. External disposable - This is a disposable that allows the observer to be disposed of from outside the observer itself. This can be useful in situations where you need to cancel an ongoing operation or release resources before the observable has completed its work.

Exception guarantee

In non-reactive programming functions/modules throws exception in case of something invalid. As a result, user can catch it and handle it somehow while internal state of objects can be in some state (invalid/untouched/partly valid) and etc.

In reactive programming there is another way of exception mechanism: throwing exception as is from original place is useless. Notification about "something goes wrong" need to receive observer/subscriber, not owner of callstack. As a result, ANY exception obtained during emitting items and etc WOULD be delivered to subscriber/observer via on_error function and then unsubscribe happens. As a result, no any raw exceptions would be throws during using RPP. In case of emitting on_error whole internal state of observable keeps valid but it doesn't matter - whole chain would be destroyed due to on_error forces unsubscribe. Reactive catching mechanisms like catch or retry re-subscribes on observable. it means, that new chain with new states would be created, not re-used existing one.

Memory Model

In ReactivePlusPlus there is new concept unique for this implementation: rpp::memory_model:

Some of the operators and sources like rpp::source::just or rpp::operators::start_with accepts user's variables for usage. Some of this types can be such an expensive to copy or move and it would be preferable to copy it once to heap, but some other types (like POD) are cheap enough and usage of heap would be overkill. But these variables should be saved inside somehow!

So, RPP provides ability to select strategy "how to deal with such a variables" via rpp::memory_model enum.

Examples

For example, rpp::source::just

rpp::source::just(my_custom_variable);

by default just uses rpp::memory_model::use_stack and my_custom_variable would be copied and moved everywhere when needed. On the other hand

rpp::source::just<rpp::memory_model::use_shared>(my_custom_variable);

makes only 1 copy/move to shared_ptr and then uses it instead.

As a a result, users can select preferable way of handling of their types.

ReactivePlusPlus specific

dynamic_* versions to keep classes as variables

Most of the classes inside rpp library including observable, observer and others are heavy-templated classes. It means, it could has a lot of template params. In most cases you shouldn't worry about it due to it is purely internal problem.

But in some cases you want to keep observable or observer inside your classes or return it from function. In most cases I strongly recommend you to use auto to deduce type automatically. But in some cases it is not possible (for example, to keep observable as member variable). For such an usage you could use dynamic_observable and dynamic_observer:

  • they are type-erased wrappers over regular observable/observer with goal to hide all unnecessary stuff from user's code. For example, you can easily use it as:
    #include <rpp/rpp.hpp>
    #include <iostream>
    struct some_data
    {
    };
    int main() {
    some_data v{rpp::source::just(1,2,3),
    std::cout << value << std::endl;
    })};
    v.observable.subscribe(v.observer);
    }
    Type-erased version of the rpp::observable. Any observable can be converted to dynamic_observable via...
    Definition dynamic_observable.hpp:88
    auto make_lambda_observer(OnNext &&on_next, OnError &&on_error={}, OnCompleted &&on_completed={}) -> lambda_observer< Type, std::decay_t< OnNext >, std::decay_t< OnError >, std::decay_t< OnCompleted > >
    Constructs observer specialized with passed callbacks. Most easiesest way to construct observer "on t...
    Definition lambda_observer.hpp:49
  • to convert observable/observer to dynamic_* version you could manually call as_dynamic() member function or just pass them to ctor
  • actually they are similar to rxcpp's observer<T> and observable<T> but provides EXPLICIT definition of dynamic fact
  • due to type-erasure mechanism dynamic_ provides some minor performance penalties due to extra usage of shared_ptr to keep internal state + indirect calls. It is not critical in case of storing it as member function, but could be important in case of using it on hot paths like this:
    | rpp::ops::map([](int v) { return rpp::source::just(v); })
    | rpp::ops::flat_map([](rpp::dynamic_observable<int> observable) {
    return observable | rpp::ops::filter([](int v){ return v % 2 == 0;});
    });
    ^^^ while it is fully valid code, flat_map have to convert observable to dynamic version via extra heap, but it is unnecessary. It is better to use auto in this case.
    | rpp::ops::map([](int v) { return rpp::source::just(v); })
    | rpp::ops::flat_map([](const rpp::constraint::observable_of_type<int> auto& observable) { // or just `const auto& observable`
    return observable | rpp::ops::filter([](int v){ return v % 2 == 0;});
    });