I highly recommend reading this article: The introduction to Reactive Programming you've been missing
Reactive programming is a design paradigm that focuses on building applications that can efficiently respond to asynchronous events.
Actually, any application or function has two core parts: input and output. Input/output can even be empty:
Input/output itself can be split into the following two types:
When dealing with input that is distributed in time, there are two ways to handle it:
Reactive programming is a powerful way to handle input that is distributed in time. Instead of constantly polling for updates or waiting for input to arrive, reactive programming allows you to register callbacks to be executed when the input becomes available.
See https://reactivex.io/intro.html for more details.
In short, Reactive Programming can be described as follows:
For example:
Here we create an observable that emits digits from console input. If the user enters anything else, that is an error for our observable (it is expected to emit ONLY digits). In this case we notify the observer about the error and simply stop. When the user enters `0`, it means "end of observable".
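As a library-independent sketch of the behaviour described above (this is not RPP code): `Observer`, `emit_digits` and `run_digits` are hypothetical names invented for this illustration, and the input is read from a `std::istream` rather than directly from the console so that the behaviour is easy to check.

```cpp
#include <cassert>
#include <cctype>
#include <functional>
#include <istream>
#include <sstream>
#include <string>

// Hypothetical observer: three callbacks mirroring on_next / on_error / on_completed.
struct Observer {
    std::function<void(int)>                on_next;
    std::function<void(const std::string&)> on_error;
    std::function<void()>                   on_completed;
};

// Hypothetical "observable": reads characters from `in` and pushes digits to the observer.
// '0' ends the sequence; any non-digit is reported as an error and stops the emission.
inline void emit_digits(std::istream& in, const Observer& observer) {
    char ch{};
    while (in >> ch) { // operator>> skips whitespace
        if (!std::isdigit(static_cast<unsigned char>(ch))) {
            observer.on_error(std::string{"not a digit: "} + ch);
            return;
        }
        if (ch == '0') {
            observer.on_completed();
            return;
        }
        observer.on_next(ch - '0');
    }
    observer.on_completed(); // assumption: exhausted input is treated as a normal end
}

// Demonstration helper: collects everything the observer sees into one string.
inline std::string run_digits(const std::string& input) {
    std::istringstream in{input};
    std::string        log;
    emit_digits(in, Observer{
        [&](int v) { log += std::to_string(v) + " "; },
        [&](const std::string&) { log += "error"; },
        [&] { log += "completed"; }});
    return log;
}
```

Passing `std::cin` to `emit_digits` gives the console variant; note that nothing after the error or after `0` ever reaches the observer.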
See https://reactivex.io/documentation/observable.html for more details.
In this form it is not powerful enough on its own, so Reactive Programming provides a list of operators.
Reactive programming has an Observable Contract. Please read it.
This contract has the following important part:
Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications
RPP follows this contract, and this part in particular. It means that:
- the `take` operator doesn't use mutexes or atomics, because the underlying observable MUST emit items serially
- when using `create`, be careful to follow this contract!

It means that, for example:
will never produce something like
only serially
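A minimal, library-independent sketch of what serial emission means in practice (plain C++ threads, not RPP API; `OverlapDetector` and `emits_serially` are hypothetical names). Two threads emit into one observer, and a mutex is assumed here as the mechanism that serializes the notifications and provides the happens-before relationship:

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>

// Hypothetical observer that records whether two on_next calls ever overlapped.
struct OverlapDetector {
    std::atomic<int>  in_flight{0};
    std::atomic<bool> overlapped{false};
    int               received = 0; // deliberately NOT atomic: safe only if calls are serial

    void on_next(int) {
        if (in_flight.fetch_add(1) != 0)
            overlapped = true; // another call was still running
        ++received;
        in_flight.fetch_sub(1);
    }
};

// Two threads emit into one observer. The mutex makes the emissions serial,
// so the observer itself needs no synchronization.
inline bool emits_serially(int per_thread) {
    OverlapDetector observer;
    std::mutex      serialize;
    auto producer = [&] {
        for (int i = 0; i < per_thread; ++i) {
            std::lock_guard<std::mutex> lock{serialize};
            observer.on_next(i);
        }
    };
    std::thread a{producer};
    std::thread b{producer};
    a.join();
    b.join();
    return !observer.overlapped && observer.received == 2 * per_thread;
}
```

Because the emitting side takes responsibility for serialization, the observer's plain `int` counter is enough; this is the same reasoning that lets an operator avoid mutexes and atomics.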
Operators are a way to modify the Observable's emissions to adapt the values for the Observer.
For example, we can create an observable that reads chars from console input, keeps doing so until the ‘0’ char, keeps only letters, and sends these letters to the observer in UPPER case. With operators it is pretty simple to do this correctly:
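A library-independent sketch of that pipeline (not RPP's operators): `CharSink`, `take_while_not`, `filter_letters`, `map_to_upper` and `upper_letters_until_zero` are hypothetical names. Each "operator" wraps the downstream callback and returns a new upstream callback; returning `false` is this sketch's own convention for "stop the stream".

```cpp
#include <cassert>
#include <cctype>
#include <functional>
#include <string>

using CharSink = std::function<bool(char)>; // returns false to stop the stream

// Stops the stream at the first `stop` character, otherwise forwards downstream.
inline CharSink take_while_not(char stop, CharSink next) {
    return [=](char c) { return c != stop && next(c); };
}

// Forwards only letters; everything else is silently dropped.
inline CharSink filter_letters(CharSink next) {
    return [=](char c) { return std::isalpha(static_cast<unsigned char>(c)) ? next(c) : true; };
}

// Converts each character to upper case before forwarding it.
inline CharSink map_to_upper(CharSink next) {
    return [=](char c) { return next(static_cast<char>(std::toupper(static_cast<unsigned char>(c)))); };
}

// Builds the chain once and pushes every input character through it.
inline std::string upper_letters_until_zero(const std::string& input) {
    std::string out;
    CharSink pipeline = take_while_not('0',
                        filter_letters(
                        map_to_upper([&out](char c) { out += c; return true; })));
    for (char c : input)
        if (!pipeline(c))
            break;
    return out;
}
```

The point of the sketch is that each step stays small and independent, and the order of composition reads the same way as the description: take until ‘0’, keep letters, convert to upper case.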
You can check the documentation for each operator on the API Reference page. Below you can find details about how operators work and how to create your own custom operator in RPP.
See https://reactivex.io/documentation/operators.html for more details about operators concept.
Let's check this example:
This example shows the following: we create an observable of `int` via the `create` operator. This observable just emits the value `1` to the observer and then completes. The type of this observable is `rpp::observable<int, ...>`, where `...` is an implementation-defined type. So it is actually an observable of `int`s. Let's say we want to convert `int` to `std::string`. We could subscribe and then convert it, or use the `map` operator (also known as `transform`) to transform the original value into another value:
Now it is an observable of strings, because its type is `rpp::observable<std::string, ...>`. But what is `rpp::operators::map` then? It is actually a functor-adaptor: just a functor that accepts an observable and returns another observable. It accepts the original observable and converts it to an observable of the "final type". The "final type" is the result of invoking the passed function on the original observable's type. In our case it is `decltype([](int v){ return std::string{v}; }(int{}))`, that is, `std::string`. So `map` can be implemented in the following way:
This is a template for such a functor-adaptor. The provided example is the simplest possible way to implement new operators: just provide a function that transforms the observable. For example, this is a fully valid example:
Here we convert the observable into a concatenation of the original observable and `just(2)`.
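To make the functor-adaptor idea concrete without relying on RPP's real types, here is a minimal sketch. `Observable`, `map_op` and `demo_map` are hypothetical stand-ins, the observable only supports `on_next`, and (unlike a real operator) the input type has to be spelled out explicitly as `map_op<int>`.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical minimal observable of T: a function that pushes values into an on_next callback.
template<typename T>
using Observable = std::function<void(const std::function<void(const T&)>&)>;

// Hypothetical map: returns a functor-adaptor that accepts Observable<In> and
// returns Observable<Out>, where Out is the result type of calling fn on In.
template<typename In, typename Fn>
auto map_op(Fn fn) {
    using Out = std::decay_t<decltype(fn(std::declval<const In&>()))>;
    return [fn](Observable<In> source) -> Observable<Out> {
        return [fn, source](const std::function<void(const Out&)>& on_next) {
            // Subscribe to the original observable and forward transformed values.
            source([&](const In& v) { on_next(fn(v)); });
        };
    };
}

inline std::vector<std::string> demo_map() {
    Observable<int> source = [](const std::function<void(const int&)>& on_next) {
        on_next(1);
        on_next(2);
    };
    auto stringify = map_op<int>([](int v) { return std::to_string(v); });
    Observable<std::string> strings = stringify(source); // observable in, observable out

    std::vector<std::string> received;
    strings([&](const std::string& s) { received.push_back(s); });
    return received;
}
```

Nothing runs until the resulting observable is subscribed to: `stringify(source)` only builds a new observable that remembers the source and the function.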
Another possible, but slightly more advanced, way to implement operators is to lift the observer. To do this, your functor-adaptor must satisfy the `rpp::constraint::operator_lift` concept. In practice, your class must have:
- `lift`, accepting the downstream observer and returning a new upstream observer
- `template<rpp::constraint::decayed_type T> struct traits`, a struct accepting the typename of the upstream and providing:
  - `using result_type =` with the typename of the new resulting type for the new observable
  - `struct requirements` with static_asserts over the passed type

Example:
In this case you provide the logic for converting the downstream observer to the upstream observer. This implementation is actually equivalent to the previous one, but without handling the observable: you express your operator in terms of observers.
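The following sketch illustrates the idea of lifting with plain standard-library types only. It does not satisfy `rpp::constraint::operator_lift` and has no `traits` struct; `Observer`, `Observable`, `int_to_string`, `apply_lift` and `demo_lift` are hypothetical names used purely for illustration.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

template<typename T>
using Observer = std::function<void(const T&)>; // only on_next, for brevity

template<typename T>
using Observable = std::function<void(const Observer<T>&)>;

// Hypothetical operator written purely in terms of observers:
// lift() receives the downstream observer (of std::string) and
// returns the upstream observer (of int) that feeds it.
struct int_to_string {
    Observer<int> lift(Observer<std::string> downstream) const {
        return [downstream](const int& v) { downstream(std::to_string(v)); };
    }
};

// Generic glue: turns any such lifting operator into an
// observable-to-observable transformation.
template<typename Up, typename Down, typename Op>
Observable<Down> apply_lift(Observable<Up> source, Op op) {
    return [source, op](const Observer<Down>& downstream) { source(op.lift(downstream)); };
}

inline std::vector<std::string> demo_lift() {
    Observable<int> source = [](const Observer<int>& o) { o(1); o(2); o(3); };
    auto strings = apply_lift<int, std::string>(source, int_to_string{});

    std::vector<std::string> received;
    strings([&](const std::string& s) { received.push_back(s); });
    return received;
}
```

The operator itself never touches the observable; the generic glue (`apply_lift` in this sketch) is the only place that subscribes to the source.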
**(Advanced)** When implementing an operator via `lift`, you can control the disposable strategy via the `updated_disposable_strategy` parameter. It accepts the disposable strategy of the upstream and returns the disposable strategy for the downstream. It is needed only for optimization and for reducing the cost of handling disposables, and it is a purely advanced feature. Not sure if anyone is going to use it on its own for now =)
Reactive programming becomes even more powerful when observables can operate across multiple threads, rather than being confined to the thread of creation and subscription. This allows for non-blocking, asynchronous operations and provides significant advantages over raw iteration or other pull-based approaches. To enable multithreading in your observables, you can use Schedulers.
By default, an Observable will perform its work in the thread where the subscribe operation occurs. However, you can change this behavior using the subscribe_on operator. This operator forces the observable to perform the subscription and any subsequent work in the specified scheduler. For example:
In this case, the subscription to `3` happens in the current thread (where `subscribe` is invoked). But during the subscription to `2`, it schedules the subscription to `1` on the provided `new_thread` scheduler. So the subscription to the final observable and its internal logic (iterating and emitting values) happen inside `new_thread`. It is actually something like this:
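As a rough, library-independent illustration of the same idea (plain `std::thread`, not RPP's schedulers): `subscribe_on_new_thread` and `emits_on_other_thread` are hypothetical names, and handing the thread back to the caller for joining is a simplification made only for this sketch.

```cpp
#include <cassert>
#include <functional>
#include <thread>

using Observer   = std::function<void(int)>;
using Observable = std::function<void(Observer)>;

// Hypothetical subscribe_on: subscribing to the result subscribes to `source`
// on a fresh thread, so the source's own logic runs there as well.
// The thread is returned so that the caller can join it.
inline std::function<std::thread(Observer)> subscribe_on_new_thread(Observable source) {
    return [source](Observer observer) {
        return std::thread{[source, observer] { source(observer); }};
    };
}

inline bool emits_on_other_thread() {
    const auto      caller = std::this_thread::get_id();
    std::thread::id emitter; // default value means "no thread"

    Observable source = [](Observer o) { o(1); };
    std::thread worker = subscribe_on_new_thread(source)(
        [&emitter](int) { emitter = std::this_thread::get_id(); });
    worker.join();

    return emitter != caller && emitter != std::thread::id{};
}
```

The call that starts everything still happens in the caller's thread; only the subscription to the source, and therefore its emissions, move to the new thread.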
The observe_on operator specifies the scheduler that will be used for emission during the processing of further operators after observe_on. For example:
In this case the whole subscription flow happens in the thread where the subscription occurs, but the emission of values is transferred to another thread. It is actually something like this:
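A library-independent sketch of that hand-off (not RPP's scheduler API): `worker_queue` and `delivers_on_worker_thread` are hypothetical names. The source emits in the caller's thread, and each value is re-posted to a single worker thread, which is assumed here as the simplest possible scheduler.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical single-thread "scheduler": runs queued tasks, in order, on its own thread.
class worker_queue {
public:
    worker_queue() : m_thread{[this] { run(); }} {}
    ~worker_queue() {
        post({}); // an empty task is the stop signal; earlier tasks run first
        m_thread.join();
    }
    void post(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock{m_mutex};
            m_tasks.push(std::move(task));
        }
        m_cv.notify_one();
    }

private:
    void run() {
        for (;;) {
            std::unique_lock<std::mutex> lock{m_mutex};
            m_cv.wait(lock, [this] { return !m_tasks.empty(); });
            auto task = std::move(m_tasks.front());
            m_tasks.pop();
            lock.unlock();
            if (!task)
                return;
            task();
        }
    }
    std::mutex                        m_mutex;
    std::condition_variable           m_cv;
    std::queue<std::function<void()>> m_tasks;
    std::thread                       m_thread; // declared last: starts after the other members exist
};

inline bool delivers_on_worker_thread() {
    const auto      caller = std::this_thread::get_id();
    std::thread::id received_on;
    {
        worker_queue worker;
        std::function<void(int)> downstream = [&received_on](int) { received_on = std::this_thread::get_id(); };
        // observe_on-like wrapper: every value is re-posted to the worker thread.
        std::function<void(int)> upstream = [&worker, downstream](int v) {
            worker.post([downstream, v] { downstream(v); });
        };
        upstream(1); // the emission itself starts in the caller's thread
    } // the destructor drains the queue and joins the worker
    return received_on != caller && received_on != std::thread::id{};
}
```

Because the queue preserves order and has a single consumer, values still reach the downstream serially, just on a different thread.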
A Scheduler is responsible for controlling the type of multithreading behavior (or lack thereof) used in the observable. For example, a scheduler can utilize a new thread, a thread pool, or a raw queue to manage its processing.
Check out the API Reference to learn more about schedulers in RPP.
See https://reactivex.io/documentation/scheduler.html for more details about scheduler concept.
In reactive programming, a disposable is an object that represents a resource that needs to be released or disposed of when it is no longer needed. This can include things like file handles, network connections, or any other resource that needs to be cleaned up after use.
The purpose of a disposable is to provide a way to manage resources in a safe and efficient manner. By using disposables, you can ensure that resources are released in a timely manner, preventing memory leaks and other issues that can arise from resource leaks.
In most cases disposables are placed in observers. RPP's observer can use two types of disposables:
In non-reactive programming, functions/modules throw an exception when something is invalid. As a result, the user can catch it and handle it somehow, while the internal state of the objects can be in any state (invalid, untouched, partly valid, and so on).
In reactive programming the exception mechanism works differently: throwing an exception as is from the original place is useless. The notification that "something went wrong" needs to reach the observer/subscriber, not the owner of the call stack. As a result, ANY exception raised while emitting items (and so on) WOULD be delivered to the subscriber/observer via the `on_error` function, and then the unsubscribe happens. As a result, no raw exceptions are thrown while using RPP. When `on_error` is emitted, the whole internal state of the observable stays valid, but it doesn't matter: the whole chain is destroyed, because `on_error` forces an unsubscribe. Reactive catching mechanisms like `catch` or `retry` re-subscribe to the observable. This means that a new chain with new state is created; the existing one is not reused.
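A minimal, library-independent sketch of routing an exception to `on_error` instead of letting it escape (not RPP's implementation; `Observer`, `emit_mapped` and `demo_errors` are hypothetical names):

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>

struct Observer {
    std::function<void(int)>                on_next;
    std::function<void(std::exception_ptr)> on_error;
    std::function<void()>                   on_completed;
};

// Hypothetical map-like source: emits fn(v) for v in [from, to].
// An exception thrown by `fn` is caught and forwarded via on_error;
// nothing escapes to the caller, and no further values are delivered.
inline void emit_mapped(int from, int to, const std::function<int(int)>& fn, const Observer& observer) {
    for (int v = from; v <= to; ++v) {
        int mapped = 0;
        try {
            mapped = fn(v);
        } catch (...) {
            observer.on_error(std::current_exception());
            return; // the chain ends here
        }
        observer.on_next(mapped);
    }
    observer.on_completed();
}

inline std::string demo_errors() {
    std::string log;
    emit_mapped(1, 5,
        [](int v) {
            if (v == 3)
                throw std::runtime_error{"bad value"};
            return v * 10;
        },
        Observer{
            [&](int v) { log += std::to_string(v) + " "; },
            [&](std::exception_ptr e) {
                try {
                    std::rethrow_exception(e);
                } catch (const std::exception& ex) {
                    log += std::string{"error: "} + ex.what();
                }
            },
            [&] { log += "completed"; }});
    return log;
}
```

The caller of `emit_mapped` needs no `try`/`catch` of its own: the observer is the only place that learns about the failure, and `on_completed` is never called after an error.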
ReactivePlusPlus has a new concept that is unique to this implementation: `rpp::memory_model`.
Some operators and sources, like `rpp::source::just` or `rpp::operators::start_with`, accept the user's variables for later use. Some of these types can be expensive to copy or move, and it would be preferable to copy them to the heap once, while other types (like POD) are cheap enough that using the heap would be overkill. But these variables have to be stored inside somehow!
So RPP provides the ability to select the strategy for "how to deal with such variables" via the `rpp::memory_model` enum.
For example, by default `rpp::source::just` uses `rpp::memory_model::use_stack`, and `my_custom_variable` would be copied and moved everywhere when needed. On the other hand, selecting the shared memory model (`rpp::memory_model::use_shared`) makes only 1 copy/move into a `shared_ptr` and then uses that instead.
As a result, users can select the preferred way of handling their types.
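The trade-off can be sketched with plain standard-library types. This is not how `rpp::memory_model` is implemented: `tracked`, `by_value_holder`, `shared_holder` and `payload_copies_when_copying` are hypothetical names, and the sketch assumes C++17 for the inline static counter.

```cpp
#include <cassert>
#include <memory>

// Hypothetical "expensive" type that counts how many times it is copied.
struct tracked {
    static inline int copies = 0;
    tracked() = default;
    tracked(const tracked&) { ++copies; }
    tracked(tracked&&) noexcept = default;
};

// Stack-like holder: the value lives inside the holder,
// so copying the holder copies the value.
template<typename T>
struct by_value_holder {
    T value;
    const T& get() const { return value; }
};

// Shared-like holder: the value is copied to the heap once;
// copying the holder only bumps a reference count.
template<typename T>
struct shared_holder {
    std::shared_ptr<const T> value;
    explicit shared_holder(const T& v) : value{std::make_shared<const T>(v)} {}
    const T& get() const { return *value; }
};

// Copies a holder `times` times and reports how many payload copies that caused.
template<typename Holder>
int payload_copies_when_copying(const Holder& holder, int times) {
    const int before = tracked::copies;
    for (int i = 0; i < times; ++i) {
        Holder copy = holder;
        (void)copy.get();
    }
    return tracked::copies - before;
}
```

Copying a `by_value_holder<tracked>` five times copies the payload five times, while copying a `shared_holder<tracked>` five times copies it zero times. The price of the shared variant is one heap allocation up front, which is why cheap types are better kept by value.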
Most of the classes inside the rpp library, including `observable`, `observer` and others, are heavily templated classes. This means they can have a lot of template parameters. In most cases you shouldn't worry about it, because it is a purely internal matter.
But in some cases you want to keep an observable or observer inside your classes or return it from a function. In most cases I strongly recommend using `auto` to deduce the type automatically. But in some cases that is not possible (for example, to keep an observable as a member variable). For such usage you can use `dynamic_observable` and `dynamic_observer`:
- any observable/observer can be converted to its dynamic version via the `as_dynamic()` member function, or by just passing it to the constructor
- they are the same as `observer<T>` and `observable<T>`, but provide an EXPLICIT definition of the `dynamic` fact
- `dynamic_` incurs some minor performance penalties due to the extra usage of `shared_ptr` to keep the internal state, plus indirect calls. It is not critical when storing it as a member variable, but it could be important when using it on hot paths like this:

`flat_map` has to convert the observable to its dynamic version via an extra heap allocation, which is unnecessary. It is better to use `auto` in this case.
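The type-erasure trade-off can be illustrated with a small sketch built on `std::function`. The `observable`, `dynamic_observable`, `make_observable`, `holder` and `demo_dynamic` types below are hypothetical stand-ins defined only for this example; they are not RPP's classes and only model `on_next`.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical statically typed observable: the callable's type is part of the
// observable's type, so the full type is hard to spell out by hand.
template<typename T, typename OnSubscribe>
struct observable {
    OnSubscribe on_subscribe;

    template<typename OnNext>
    void subscribe(const OnNext& on_next) const { on_subscribe(on_next); }
};

template<typename T, typename OnSubscribe>
observable<T, OnSubscribe> make_observable(OnSubscribe on_subscribe) {
    return {std::move(on_subscribe)};
}

// Hypothetical type-erased counterpart: one fixed type per T, at the cost of
// std::function's indirect calls (and a possible heap allocation).
template<typename T>
struct dynamic_observable {
    std::function<void(const std::function<void(const T&)>&)> on_subscribe;

    template<typename OnSubscribe>
    dynamic_observable(const observable<T, OnSubscribe>& source)
        : on_subscribe{[source](const std::function<void(const T&)>& on_next) { source.subscribe(on_next); }} {}

    void subscribe(const std::function<void(const T&)>& on_next) const { on_subscribe(on_next); }
};

// A class can now keep the observable as a member without naming the lambda type.
struct holder {
    dynamic_observable<int> values;
};

inline std::vector<int> demo_dynamic() {
    auto source = make_observable<int>([](const auto& on_next) { on_next(1); on_next(2); });
    holder h{source}; // implicit conversion to the type-erased form

    std::vector<int> received;
    h.values.subscribe([&](const int& v) { received.push_back(v); });
    return received;
}
```

With `auto source` every call is resolved statically; once the observable is stored in `holder`, each subscription and each emitted value goes through a `std::function` call, which is the kind of overhead worth avoiding on hot paths.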