ReactivePlusPlus
ReactiveX implementation for C++20
Reactive programming is a design paradigm that focuses on building applications that can efficiently respond to asynchronous events.
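Every application or function has two core parts: input and output. Either of them can even be empty.
Input/output can be categorized into two types: static, which is fully available up front (for example, command line or function arguments), and distributed in time, which arrives at moments the application cannot predict.
When dealing with input that is distributed in time, there are two ways to handle it: pulling (polling the source for updates) and pushing (registering callbacks that the source invokes).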
Reactive programming is a powerful way to handle input that is distributed in time. Instead of constantly polling for updates, reactive programming allows you to register callbacks to be executed when the input becomes available.
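The difference between the two approaches can be illustrated with a minimal sketch in plain C++. It uses only the standard library and is not RPP API: poll_all, event_source and the queue standing in for "input" are hypothetical names chosen for illustration.

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <queue>
#include <utility>
#include <vector>

// Pull style: the consumer repeatedly asks the source whether something is available.
std::vector<int> poll_all(std::queue<int>& input)
{
    std::vector<int> handled;
    auto try_get = [&input]() -> std::optional<int> {
        if (input.empty())
            return std::nullopt;
        int v = input.front();
        input.pop();
        return v;
    };
    // A real program would also have to keep spinning while nothing has arrived yet.
    while (auto v = try_get())
        handled.push_back(*v);
    return handled;
}

// Push style: the consumer registers a callback once, the source invokes it for every new value.
class event_source
{
public:
    void on_value(std::function<void(int)> callback) { m_callbacks.push_back(std::move(callback)); }

    void emit(int v) const
    {
        for (const auto& cb : m_callbacks)
            cb(v);
    }

private:
    std::vector<std::function<void(int)>> m_callbacks;
};
```

In the push variant the consumer never asks for data: it registers a callback with on_value and the source calls it from emit whenever a value appears.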
See https://reactivex.io/intro.html for more details.
Reactive Programming can be described as follows:
During subscription, the Observable can provide a Disposable for the Observer to check if the observable is still alive or to terminate early if needed.
For example, consider an observable that emits digits from console input. If the user enters anything else, it is an error for this observable (it is expected to emit ONLY digits): in this case the observable notifies the observer about the error and stops. When the user enters 0, it means "end of observable".
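A rough model of this behaviour, written with the standard library only, might look as follows. It is a sketch and not RPP code: digit_observer and emit_digits are hypothetical stand-ins for an observer and for the observable's logic, the input is taken from any std::istream so the example does not depend on a console, and treating end of input as completion is an assumption of this sketch.

```cpp
#include <cassert>
#include <cctype>
#include <exception>
#include <functional>
#include <istream>
#include <sstream>
#include <stdexcept>

// Hypothetical stand-in for an observer: three callbacks.
struct digit_observer
{
    std::function<void(int)>                on_next;
    std::function<void(std::exception_ptr)> on_error;
    std::function<void()>                   on_completed;
};

// Hypothetical stand-in for the observable's logic: reads characters and pushes digits to the observer.
void emit_digits(std::istream& in, const digit_observer& obs)
{
    char ch{};
    while (in.get(ch))
    {
        if (!std::isdigit(static_cast<unsigned char>(ch)))
        {
            // Anything except a digit is an error for this stream: notify the observer and stop.
            obs.on_error(std::make_exception_ptr(std::runtime_error{"Invalid symbol"}));
            return;
        }
        const int digit = ch - '0';
        if (digit == 0)
        {
            // 0 means "end of observable".
            obs.on_completed();
            return;
        }
        obs.on_next(digit);
    }
    // Assumption of this sketch: running out of input also counts as completion.
    obs.on_completed();
}
```

Feeding it "1230" produces on_next for 1, 2 and 3 followed by on_completed, while "12a" produces on_next for 1 and 2 followed by on_error.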
See https://reactivex.io/documentation/observable.html for more details.
On its own this is not powerful enough, so Reactive Programming provides a set of operators.
Observable is the source of any Reactive Stream. An observable provides the ability to subscribe an observer to a stream of events. After subscription the observable emits values to the observer.
Reactive programming has an Observable Contract. Please read it.
This contract includes:
Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.
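RPP follows this contract, meaning:
- internal logic such as the take operator doesn't use mutexes or atomics, because the underlying observable MUST emit items serially
- when you implement your own observable via create, follow this contract!
For example, callbacks in a chain never run concurrently: the output of two emissions is never interleaved, emissions are only processed serially.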
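What "serially" means in practice can be sketched with two threads emitting into one callback that is not thread-safe by itself. The code below uses only the standard library. serialized_callback and run_two_sources are hypothetical names, and the mutex is merely the simplest way to illustrate the guarantee, not a description of how RPP implements it.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical helper: wraps a callback so that calls coming from different threads never overlap.
class serialized_callback
{
public:
    explicit serialized_callback(std::function<void(int)> fn)
        : m_fn{std::move(fn)}
    {
    }

    void operator()(int v)
    {
        std::lock_guard lock{m_mutex};
        m_fn(v);
    }

private:
    std::mutex               m_mutex;
    std::function<void(int)> m_fn;
};

// Two sources emit from two threads into one callback.
std::vector<std::string> run_two_sources()
{
    std::vector<std::string> log;
    serialized_callback      on_next{[&log](int v) {
        // No locking here: the callback relies on being invoked serially.
        log.push_back("enter " + std::to_string(v));
        log.push_back("exit " + std::to_string(v));
    }};
    auto source = [&on_next](int value) {
        for (int i = 0; i < 100; ++i)
            on_next(value);
    };
    std::thread t1{source, 1};
    std::thread t2{source, 2};
    t1.join();
    t2.join();
    return log;
}
```

The order of values 1 and 2 in the log depends on thread timing, but every "enter N" is immediately followed by the matching "exit N".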
Observer subscribes to Observable and obtains values provided by Observable.
In fact, an observer is a kind of wrapper over 3 core functions:
- on_next(T) - callback with a new emission provided by the observable
- on_error(err) - failure termination callback with the reason of the failure (why the observable can't continue processing)
- on_completed() - successful termination callback: the observable is done, there will be no further emissions from it
Additionally, in RPP the observer handles disposables-related logic:
- set_upstream(disposable) - the observable can pass its own disposable to the observer so that the observer is able to terminate the observable's internal actions/state.
- is_disposed() - the observable can check whether the observer is still interested in emissions (false) or is done and no further calls would succeed (true)
In most cases an observer is built implicitly by passing callbacks to the subscribe function of observables. An observer can also be created via the make_lambda_observer function, but this is not recommended: it could disable some built-in optimizations and cause worse performance.

Operators modify observables and extend them with custom logic.
Observables emit values based on underlying logic, such as iterating over a vector. Operators allow you to enhance this stream, for example by filtering or transforming values, resulting in a more suitable stream for specific cases.
Example: Create an observable to read characters from console input, continue until '0' is encountered, filter out non-letter characters, and send the remaining letters as uppercase to the observer:
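The shape of such a chain can be approximated without RPP by composing plain callbacks. The sketch below is standard-library C++ only. filter_op, map_op, read_until_zero and letters_as_uppercase are hypothetical names, and an std::istream replaces the console so the example is easy to run.

```cpp
#include <cassert>
#include <cctype>
#include <functional>
#include <istream>
#include <sstream>
#include <string>
#include <utility>

using char_callback = std::function<void(char)>;

// "filter"-like step: forwards only the values accepted by the predicate.
char_callback filter_op(std::function<bool(char)> predicate, char_callback downstream)
{
    return [predicate = std::move(predicate), downstream = std::move(downstream)](char c) {
        if (predicate(c))
            downstream(c);
    };
}

// "map"-like step: transforms each value before forwarding it.
char_callback map_op(std::function<char(char)> transform, char_callback downstream)
{
    return [transform = std::move(transform), downstream = std::move(downstream)](char c) {
        downstream(transform(c));
    };
}

// Source: pushes characters downstream until '0' (or end of input) is encountered.
void read_until_zero(std::istream& in, const char_callback& downstream)
{
    char c{};
    while (in.get(c) && c != '0')
        downstream(c);
}

// Whole chain: source -> keep letters only -> uppercase -> observer collecting the result.
std::string letters_as_uppercase(std::istream& in)
{
    std::string result;
    const char_callback observer = [&result](char c) { result.push_back(c); };
    const char_callback to_upper = map_op(
        [](char c) { return static_cast<char>(std::toupper(static_cast<unsigned char>(c))); },
        observer);
    const char_callback only_letters = filter_op(
        [](char c) { return std::isalpha(static_cast<unsigned char>(c)) != 0; },
        to_upper);
    read_until_zero(in, only_letters);
    return result;
}
```

Each step knows only about the next one, which is why steps can be added, removed or reordered without touching the source or the final observer.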
Consider an observable of int created using the create operator, which emits the value 1 and then completes. The type of this observable is rpp::observable<int, ...>, where ... is an implementation-defined type. To convert int to std::string, you can use the map operator.
Now it is an observable of strings (rpp::observable<std::string, ...>). The map operator is a functor-adaptor that accepts an observable and returns another observable. It transforms the original observable's type to the "final type" by invoking the passed function. In this case, the final type is std::string. The map operator can be implemented in multiple ways:
1) call-based (function/functor or others) - the operator accepts the (old) observable and returns a new (modified) observable. Any callable with this shape works as such a functor-adaptor and is a fully valid call-based operator, for example one that converts the observable to a concatenation of the original observable and just(2).
2) type-traits based - the operator should satisfy the rpp::constraint::operator_ concept.
An operator can be implemented this way directly, but a naive implementation misses the disposables-related functionality. So it is better to implement it by providing a custom observer strategy with correct handling of disposables. Check the real rpp::operators::map implementation for it =)
Check the operators API reference for more details about operators.
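The call-based form (option 1 above) can be sketched without RPP as a callable that takes an observable and returns another one. Everything in this block is a hypothetical simplification built on the standard library: simple_observable models an observable as "a callable that pushes values into on_next", and just_value, then_emit, map_to_string and the pipe operator are illustrative names rather than RPP API.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-in for an observable of T: calling it "subscribes" the given on_next callback.
template<typename T>
using simple_observable = std::function<void(const std::function<void(T)>&)>;

// Source that emits a single value.
inline simple_observable<int> just_value(int value)
{
    return [value](const std::function<void(int)>& on_next) { on_next(value); };
}

// Call-based operator: returns a functor that accepts an observable of int
// and returns an observable of std::string.
inline auto map_to_string()
{
    return [](simple_observable<int> source) -> simple_observable<std::string> {
        return [source = std::move(source)](const std::function<void(std::string)>& on_next) {
            source([&on_next](int v) { on_next(std::to_string(v)); });
        };
    };
}

// Another call-based operator: emits everything from the source, then one extra value.
inline auto then_emit(int extra)
{
    return [extra](simple_observable<int> source) -> simple_observable<int> {
        return [source = std::move(source), extra](const std::function<void(int)>& on_next) {
            source(on_next);
            on_next(extra);
        };
    };
}

// Pipe syntax: "observable | op" simply calls the operator with the observable.
template<typename T, typename Op>
auto operator|(simple_observable<T> source, Op&& op)
{
    return std::forward<Op>(op)(std::move(source));
}
```

With these pieces, just_value(1) | then_emit(2) | map_to_string() yields an observable of strings, and no step needs to know anything about the others beyond the observable type it receives.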
Reactive programming becomes even more powerful when observables can operate across multiple threads, rather than being confined to the thread of creation and subscription. This allows for non-blocking, asynchronous operations and provides significant advantages over raw iteration or other pull-based approaches. To enable multithreading in your observables, you can use Schedulers.
By default, an Observable will perform its work in the thread where the subscribe operation occurs. However, you can change this behavior using the subscribe_on operator. This operator forces the observable to perform the subscription and any subsequent work in the specified scheduler. For example, consider a chain made of a source (1), a subscribe_on operator with the new_thread scheduler (2) and a final subscribe (3).
In this case the subscription to the last link of the chain (3) happens in the current thread (where subscribe is invoked). But during the subscription to the subscribe_on link (2), it schedules the subscription to the source (1) on the provided new_thread scheduler. So the subscription to the source observable and its internal logic (iterating and emitting values) happen inside new_thread.
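The effect can be imitated with std::thread. This is a sketch of the idea only, not RPP's implementation: int_observable, subscribe_on_new_thread and run_subscribe_on_demo are hypothetical names, and the worker thread is joined immediately purely to keep the example deterministic.

```cpp
#include <cassert>
#include <functional>
#include <initializer_list>
#include <thread>
#include <utility>

using int_observer   = std::function<void(int)>;
using int_observable = std::function<void(const int_observer&)>; // calling it means "subscribe"

// Stand-in for subscribe_on with a new-thread scheduler: the subscription to the source
// (and therefore its emission loop) is moved to a freshly created thread.
int_observable subscribe_on_new_thread(int_observable source)
{
    return [source = std::move(source)](const int_observer& observer) {
        std::thread worker{[&source, &observer] { source(observer); }};
        worker.join(); // joined right away only to keep the sketch deterministic
    };
}

struct thread_report
{
    std::thread::id subscribe_thread; // thread that called "subscribe"
    std::thread::id emission_thread;  // thread on which values were emitted
};

thread_report run_subscribe_on_demo()
{
    thread_report report{std::this_thread::get_id(), {}};
    int_observable source = [](const int_observer& obs) {
        for (int v : {1, 2, 3})
            obs(v);
    };
    subscribe_on_new_thread(source)([&report](int) { report.emission_thread = std::this_thread::get_id(); });
    return report;
}
```

The observer is handed over on the calling thread, but the source loop that invokes it runs on the worker thread, so the two recorded thread ids differ.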
The observe_on operator specifies the scheduler that will be used for emissions during the processing of further operators after observe_on.
In this case the whole subscription flow happens in the thread of subscription, but the emission of values is transferred to another thread.
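A hand-written approximation of this hand-over uses a queue between the producing thread and a worker thread. The code is a standard-library sketch, not RPP's implementation: observe_on_report and run_observe_on_demo are hypothetical names, and std::nullopt is used here as an ad hoc completion marker.

```cpp
#include <cassert>
#include <condition_variable>
#include <initializer_list>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <vector>

struct observe_on_report
{
    std::thread::id  source_thread;   // where the source produced values
    std::thread::id  observer_thread; // where the observer received them
    std::vector<int> received;
};

observe_on_report run_observe_on_demo()
{
    observe_on_report              report{};
    std::mutex                     guard;
    std::condition_variable        cv;
    std::queue<std::optional<int>> pending; // std::nullopt marks completion

    // The worker thread plays the role of the scheduler given to observe_on.
    std::thread worker{[&] {
        report.observer_thread = std::this_thread::get_id();
        while (true)
        {
            std::unique_lock lock{guard};
            cv.wait(lock, [&] { return !pending.empty(); });
            const auto item = pending.front();
            pending.pop();
            lock.unlock();
            if (!item)
                return;
            report.received.push_back(*item); // the observer's on_next runs here
        }
    }};

    // The subscription and the source's emission loop stay on the current thread.
    report.source_thread = std::this_thread::get_id();
    for (int v : {1, 2, 3})
    {
        {
            std::lock_guard lock{guard};
            pending.push(v);
        }
        cv.notify_one();
    }
    {
        std::lock_guard lock{guard};
        pending.push(std::nullopt);
    }
    cv.notify_one();

    worker.join();
    return report;
}
```

Values keep their order because a single worker drains the queue, while the producer is free to continue as soon as a value is enqueued.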
A Scheduler is responsible for controlling the type of multithreading behavior (or lack thereof) used in the observable. For example, a scheduler can utilize a new thread, a thread pool, or a raw queue to manage its processing.
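To show that the emitting code does not need to know which strategy is in use, here is a small sketch with two hand-made schedulers. The interface and the names scheduler, immediate_scheduler, run_loop_scheduler and emit_values are assumptions made for illustration and do not mirror RPP's scheduler API.

```cpp
#include <cassert>
#include <functional>
#include <initializer_list>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical minimal scheduler interface: it only decides where and when a piece of work runs.
struct scheduler
{
    virtual ~scheduler() = default;
    virtual void schedule(std::function<void()> work) = 0;
};

// Runs the work right away on the calling thread (no multithreading at all).
struct immediate_scheduler final : scheduler
{
    void schedule(std::function<void()> work) override { work(); }
};

// Stores the work in a raw queue until someone drains it, e.g. an application's main loop.
struct run_loop_scheduler final : scheduler
{
    void schedule(std::function<void()> work) override { m_queue.push(std::move(work)); }

    void drain()
    {
        while (!m_queue.empty())
        {
            auto work = std::move(m_queue.front());
            m_queue.pop();
            work();
        }
    }

private:
    std::queue<std::function<void()>> m_queue;
};

// The emitting code is identical for every scheduler.
void emit_values(scheduler& s, std::vector<int>& out)
{
    for (int v : {1, 2, 3})
        s.schedule([&out, v] { out.push_back(v); });
}
```

With immediate_scheduler the values land in out before emit_values returns; with run_loop_scheduler nothing happens until drain() is called.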
Check the API Reference for more details about schedulers.
See https://reactivex.io/documentation/scheduler.html for more details about schedulers.
Disposable is a handle/resource passed from the observable to the observer via the set_upstream method. The observer disposes this disposable when it wants to unsubscribe from the observable.
In reactive programming, a disposable is an object that represents a resource that needs to be released or disposed of when it is no longer needed. This can include things like file handles, network connections, or any other resource that needs to be cleaned up after use. The purpose of a disposable is to provide a way to manage resources in a safe and efficient manner. By using disposables, you can ensure that resources are released in a timely manner, preventing memory leaks and other issues that can arise from resource leaks.
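There are 2 main purposes of disposables:
- upstream disposable: the disposable that the observable passes to the observer (via set_upstream) so that the observer can stop the observable's work
- external disposable: a disposable that allows the subscription to be cancelled from outside. To get it, pass a disposable to the subscribe method or use the subscribe_with_disposable overload instead.
Check API reference of disposables for more details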
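The core mechanics of cancelling through a disposable fit in a few lines. The sketch below is standard-library C++ and not RPP's classes: simple_disposable and emit_until_disposed are hypothetical names used to show a producer that stops once its disposable is disposed.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical minimal disposable: a flag shared between the producer and whoever wants to cancel.
class simple_disposable
{
public:
    void dispose() noexcept { m_disposed.store(true); }
    bool is_disposed() const noexcept { return m_disposed.load(); }

private:
    std::atomic_bool m_disposed{false};
};

// The producer checks the disposable before each emission and stops as soon as it is disposed.
// Returns the number of emitted values.
int emit_until_disposed(const std::shared_ptr<simple_disposable>& d, const std::function<void(int)>& on_next)
{
    int emitted = 0;
    for (int v = 1; v <= 100 && !d->is_disposed(); ++v)
    {
        on_next(v);
        ++emitted;
    }
    return emitted;
}
```

If the callback disposes the handle when it sees the value 3, the loop ends after exactly three emissions even though the producer was prepared to emit a hundred.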
In non-reactive programming, functions/modules throw an exception when something is invalid. As a result, the user can catch and handle it, while the internal state of the objects can be in any state (invalid/untouched/partly valid).
In reactive programming exceptions work differently: throwing an exception as is from the original place is useless. The notification that something went wrong has to reach the observer/subscriber, not the owner of the call stack. As a result, ANY exception obtained while emitting items is delivered to the subscriber/observer via the on_error function, and then unsubscribe happens. Therefore no raw exceptions are thrown while using RPP. When on_error is emitted, the whole internal state of the observable stays valid, but it doesn't matter: the whole chain is destroyed because on_error forces unsubscribe. Reactive catching mechanisms like catch or retry re-subscribe to the observable. This means that a new chain with new state is created rather than the existing one being re-used.
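The routing of an exception into on_error can be sketched in plain C++ with std::exception_ptr. The names int_observer, emit_mapped and message_of are hypothetical, and stopping the loop is only a simplified model of the unsubscribe described above.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>
#include <vector>

struct int_observer
{
    std::function<void(int)>                on_next;
    std::function<void(std::exception_ptr)> on_error;
};

// Hypothetical map-like step: user code may throw, the exception is routed to on_error
// and the emission loop stops, so nothing propagates to the caller.
void emit_mapped(const std::vector<int>& values, const std::function<int(int)>& fn, const int_observer& obs)
{
    for (int v : values)
    {
        int mapped{};
        try
        {
            mapped = fn(v);
        }
        catch (...)
        {
            obs.on_error(std::current_exception());
            return;
        }
        obs.on_next(mapped);
    }
}

// Helper for the observer side: extracts the message from an exception_ptr.
std::string message_of(const std::exception_ptr& err)
{
    try
    {
        std::rethrow_exception(err);
    }
    catch (const std::exception& e)
    {
        return e.what();
    }
    catch (...)
    {
        return "unknown";
    }
}
```

The caller of emit_mapped never sees the exception: the observer receives it as a value and decides what to do with it.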
In ReactivePlusPlus there is a new concept unique to this implementation: rpp::memory_model.
Some operators and sources, like rpp::source::just or rpp::operators::start_with, accept the user's variables for usage. Some of these types can be expensive to copy or move, so it would be preferable to copy them once to the heap, while other types (like POD) are cheap enough that using the heap would be overkill. But these variables have to be stored inside somehow!
So, RPP provides the ability to select the strategy for dealing with such variables via the rpp::memory_model enum.
For example, rpp::source::just by default uses rpp::memory_model::use_stack, and my_custom_variable would be copied and moved everywhere when needed. On the other hand, rpp::memory_model::use_shared makes only 1 copy/move to shared_ptr and then uses it instead.
As a result, users can select the preferable way of handling their types.
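The trade-off can be made visible with a type that counts its own copies. This is a standard-library sketch of the two strategies and does not use RPP: tracked, by_value_holder, shared_holder and copies_made are hypothetical names.

```cpp
#include <cassert>
#include <memory>

// Counts copies so the difference between the two strategies is visible.
struct tracked
{
    static inline int copies = 0;

    tracked() = default;
    tracked(const tracked&) { ++copies; }
    tracked& operator=(const tracked&)
    {
        ++copies;
        return *this;
    }
};

// "use_stack"-like strategy: the value lives inside the holder, so copying the holder copies the value.
struct by_value_holder
{
    tracked value;
};

// "use_shared"-like strategy: one copy into shared_ptr, later copies only bump a reference counter.
struct shared_holder
{
    explicit shared_holder(const tracked& v)
        : value{std::make_shared<const tracked>(v)}
    {
    }

    std::shared_ptr<const tracked> value;
};

// Copies the holder the given number of times (as happens when an observable is passed around)
// and returns how many copies of tracked that caused.
template<typename Holder>
int copies_made(const Holder& original, int times)
{
    const int before = tracked::copies;
    for (int i = 0; i < times; ++i)
    {
        Holder copy = original;
        (void)copy;
    }
    return tracked::copies - before;
}
```

Copying the by-value holder five times copies the payload five times, while the shared holder pays for one copy at construction and none afterwards, at the price of a heap allocation.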
RPP has the following disposables-related classes:
- interface_disposable - the base interface for all disposables in RPP. The simplest disposable, with dispose() and is_disposed() methods. This is the type of disposable that an observable passes to an observer.
- callback_disposable - just a noexcept callback to be called on dispose.
- interface_composite_disposable - the base interface for disposables able to keep dependent disposables inside. The main difference is a new method add accepting another disposable inheriting from interface_disposable. Main idea: interface_composite_disposable aggregates other disposables inside, and its dispose() method calls the dispose() method of its dependents.
- composite_disposable - the concrete implementation of interface_composite_disposable
- refcount_disposable - a variant of composite_disposable that keeps a reference counter inside. This counter can be incremented with the help of the add_ref() method, which returns a new dependent composite_disposable. The idea is simple: the original refcount_disposable is disposed once the dispose() methods of all of its dependent disposables (created via add_ref()) have been called.
All disposables in RPP should be created and used via the rpp::disposable_wrapper_impl<T> wrapper. For simplicity of usage it has 2 base aliases:
- disposable_wrapper - wrapper over interface_disposable
- composite_disposable_wrapper - wrapper over interface_composite_disposable
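The composite and refcount ideas can be modelled in a few lines of standard C++. The classes below are hypothetical simplifications (simple_composite, simple_refcount), they are not thread-safe, and they are not the RPP classes: dependents are plain callbacks, and add_ref() returns a release handle instead of a dependent disposable.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Simplified analogue of a composite disposable: disposing it disposes every dependent.
class simple_composite
{
public:
    void add(std::function<void()> on_dispose)
    {
        if (m_disposed)
            on_dispose(); // already disposed: dispose the newcomer immediately
        else
            m_dependents.push_back(std::move(on_dispose));
    }

    void dispose()
    {
        if (m_disposed)
            return; // repeated dispose() calls do nothing
        m_disposed = true;
        for (auto& dependent : m_dependents)
            dependent();
        m_dependents.clear();
    }

    bool is_disposed() const { return m_disposed; }

private:
    bool                               m_disposed = false;
    std::vector<std::function<void()>> m_dependents;
};

// Simplified analogue of a refcount disposable: it is disposed only after
// every handle obtained from add_ref() has been released.
class simple_refcount
{
public:
    // Returns a handle; calling it releases one reference (repeated calls are ignored).
    // The handle must not outlive this object.
    std::function<void()> add_ref()
    {
        ++m_refs;
        auto released = std::make_shared<bool>(false);
        return [this, released] {
            if (*released)
                return;
            *released = true;
            if (--m_refs == 0)
                m_inner.dispose();
        };
    }

    void add(std::function<void()> on_dispose) { m_inner.add(std::move(on_dispose)); }
    bool is_disposed() const { return m_inner.is_disposed(); }

private:
    int              m_refs = 0;
    simple_composite m_inner;
};
```

Releasing one of two handles leaves the refcounted object alive; releasing the second one disposes it together with everything that was added to it.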
Most of the classes inside the rpp library, including observable, observer and others, are heavily templated classes. This means they can have a lot of template parameters. In most cases you shouldn't worry about it, because it is a purely internal problem.
But in some cases you want to keep an observable or observer inside your classes or return it from a function. In most cases I strongly recommend using auto to deduce the type automatically. But in some cases it is not possible (for example, to keep an observable as a member variable). For such usage you can use dynamic_observable and dynamic_observer:
- to obtain them from regular observables/observers, use the as_dynamic() member function or just pass them to the ctor
- they behave like observer<T> and observable<T> but state EXPLICITLY that they are dynamic
- dynamic_ versions come with some minor performance penalties due to the extra usage of shared_ptr to keep internal state plus indirect calls. It is not critical when storing them as member variables, but it can be important on hot paths, for example inside flat_map, which then has to convert the observable to the dynamic version via extra heap usage although this is unnecessary. It is better to use auto in this case.

RPP is a library for building reactive streams. But in general an application uses some other framework/library to build its core logic. RPP can be unified with some of them to build much better software. Below you can find a list of extensions for RPP that adapt external frameworks for much easier integration with RPP. These extensions are part of the RPP library:
RppQt is an extension of RPP which enables support of the Qt library.
RppQt is a set of wrappers to connect the Qt world with RPP.
RppGrpc is an extension of RPP which enables support of the gRPC library.
gRPC provides a way to handle requests and responses with the help of reactors. RppGrpc is a set of reactors (for both client and server side) with all possible stream modes (client, server or bidirectional stream), so that such reactors can be passed to gRPC and handled via RPP.
RppAsio is an extension of RPP which enables support of the boost-asio library.
Check API reference of rppasio for more details