ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
Operators

Operators modify observables and extend them with custom logic. More...

Topics

 Transforming Operators
 Transforming operators are operators that transform items provided by observable.
 
 Filtering Operators
 Filtering operators are operators that emit only part of items that satisfies some condition.
 
 Conditional Operators
 Conditional operators are operators that emit items based on some condition including condition of items from other observables.
 
 Combining Operators
 Combining operators are operators that combines emissions of multiple observables into same observable by some rule.
 
 Utility Operators
 Utility operators are operators that provide some extra functionality without changing of original values, but changing of behaviour.
 
 Connectable Operators
 Connectable operators are operators that provide extra functionality for multicasting of controlling of subscription.
 
 Aggregate Operators
 Aggregate operators are operators that operate on the entire sequence of items emitted by an Observable.
 
 Error Handling Operators
 Operators that help to recover from error notifications from an Observable.
 
 Creational Operators
 Creational operators are operators that create new observable.
 

Concepts

concept  rpp::constraint::operator_subscribe
 Simple operator defining logic how to subscribe passed observer to passed observable. In most cases it means operator have some custom logic over observable too, so, you need to have access to observable, for example, subscribe to observable multiple times.
 
concept  rpp::constraint::operator_lift
 Accept downstream observer and return new upstream (of type Type) observer.
 
concept  rpp::constraint::operator_lift_with_disposables_strategy
 Same as rpp::constraint::operator_lift but with custom disposables logic. For example, if you are manually create storage for disposables and want to do it optimal.
 
concept  rpp::constraint::operator_
 

Detailed Description

Operators modify observables and extend them with custom logic.

Observables emit values based on underlying logic, such as iterating over a vector and etc. Operators allow you to enhance this stream, for example, by filtering values, transforming them, etc., resulting in a more suitable stream for specific cases.

Example: Create an observable to read characters from console input, continue until '0' is encountered, filter out non-letter characters, and send the remaining letters as uppercase to the observer:

#include <rpp/rpp.hpp>
#include <iostream>
int main()
{
| rpp::operators::take_while([](char v) { return v != '0'; })
| rpp::operators::filter(std::not_fn(&::isdigit))
| rpp::operators::map(&::toupper)
| rpp::operators::subscribe([](char v) { std::cout << v; });
// input: 12345qwer5125ttqt0
// output: QWERTTQT
return 0;
}
auto take_while(Fn &&predicate)
Sends items from observable while items are satisfy predicate. When condition becomes false -> sends ...
Definition take_while.hpp:91
auto from_callable(Callable &&callable)
Creates rpp::specific_observable that calls provided callable and emits resulting value of this calla...
Definition from.hpp:249
auto filter(Fn &&predicate)
Emit only those items from an Observable that satisfies a provided predicate.
Definition filter.hpp:91
auto map(Fn &&callable)
Transforms the items emitted by an Observable via applying a function to each item and emitting resul...
Definition map.hpp:94
auto subscribe(observer< Type, ObserverStrategy > &&observer)
Subscribes passed observer to emissions from this observable.
Definition subscribe.hpp:226
auto repeat()
Repeats the Observabe's sequence of emissions infinite amount of times via re-subscribing on it durin...
Definition repeat.hpp:86
How operators work and how to create your own?
Example:
rpp::source::create<int>([](const auto& observer){
observer.on_next(1);
observer.on_completed();
});
auto create(OnSubscribe &&on_subscribe)
Construct observable specialized with passed callback function. Most easiesest way to construct obser...
Definition create.hpp:57

This example creates an observable of int using the create operator, which emits the value 1 and then completes. The type of this observable is rpp::observable<int, ...>, where ... is an implementation-defined type. To convert int to std::string, you can use the map operator:

rpp::source::create<int>([](const auto& observer){
observer.on_next(1);
observer.on_completed();
})
| rpp::operators::map([](int v){ return std::to_string(v); });

Now it is an observable of strings (rpp::observable<std::string, ...>). The map operator is a functor-adaptor that accepts an observable and returns another observable. It transforms the original observable's type to the "final type" by invoking the passed function. In this case, the final type is std::string. The map operator can be implemented in multiple ways:

1) call-based (function/functor or others) - operator accepts (old) observable and returns new (modified) observable

template<typename Fn>
struct map
{
Fn fn{};
template<typename Type, typename Internal>
auto operator()(const rpp::observable<Type, Internal>& observable) const {
using FinalType = std::invoke_result_t<Fn, Type>;
return rpp::source::create<FinalType>([observable, fn](const rpp::dynamic_observer<FinalType>& observer)
{
observable.subscribe([observer, fn](const auto& v) { observer.on_next(fn(v)); },
[observer](const std::exception_ptr& err) { observer.on_error(err); },
[observer]() { observer.on_completed(); });
});
}
}
void on_next(const Type &v) const noexcept
Observable calls this method to notify observer about new value.
Definition observer.hpp:84
void on_completed() const noexcept
Observable calls this method to notify observer about completion of emissions.
Definition observer.hpp:135
void on_error(const std::exception_ptr &err) const noexcept
Observable calls this method to notify observer about some error during generation next data.
Definition observer.hpp:120
Type-erased version of the rpp::observer. Any observer can be converted to dynamic_observer via rpp::...
Definition dynamic_observer.hpp:129
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
void subscribe(observer< Type, ObserverStrategy > &&observer) const
Subscribes passed observer to emissions from this observable.
Definition observable.hpp:58

It is template for such an functor-adaptor. It is also fully valid example of call-based operator:

| [](const auto& observable) { return rpp::source::concat(observable, rpp::source::just(2)); };
auto just(const TScheduler &scheduler, T &&item, Ts &&... items)
Creates rpp::observable that emits a particular items and completes.
Definition from.hpp:201
auto concat(TObservable &&obs, TObservables &&... others)
Make observable which would merge emissions from underlying observables but without overlapping (curr...
Definition concat.hpp:168

This converts the observable to a concatenation of the original observable and just(2).

2) type-traits based - should satisfy rpp::constraint::operator_ concept.
For example, you can implement such an operator like this:

template<typename Fn>
struct simple_map
{
simple_map(const Fn& fn)
: fn(fn)
{
}
Fn fn{};
// 1: define traits for the operator with upstream (previous type) type
template<rpp::constraint::decayed_type T>
{
// 1.1: it could have static asserts to be sure T is applicable for this operator
static_assert(std::invocable<Fn, T>, "Fn is not invocable with T");
// 1.2: it should have `result_type` is type of new observable after applying this operator
using result_type = std::invoke_result_t<Fn, T>;
};
// 2: define updated optimal disposables strategy. Set to `rpp::details::observables::default_disposables_strategy` if you don't know what is that.
template<rpp::details::observables::constraint::disposables_strategy Prev>
using updated_optimal_disposables_strategy = Prev;
// 3: implement core logic of operator: accept downstream observer (of result_type) and convert it to upstream observer (of T).
template<typename Upstream, rpp::constraint::observer Observer>
auto lift(Observer&& observer) const
{
const auto dynamic_observer = std::forward<Observer>(observer).as_dynamic();
return rpp::make_lambda_observer<Upstream>([dynamic_observer, fn = fn](const auto& v) { dynamic_observer.on_next(fn(v)); },
[dynamic_observer](const std::exception_ptr& err) { dynamic_observer.on_error(err); },
[dynamic_observer]() { dynamic_observer.on_completed(); });
}
};
template<typename Fn>
simple_map(const Fn& fn) -> simple_map<Fn>;
void test()
{
rpp::source::just(1) | simple_map([](int v) { return std::to_string(v); }) | rpp::ops::subscribe();
}

But in this case you are missing disposables-related functionality. So, it is better to implement it via providing custom observer's strategy with correct handling of disposables. Check real rpp::operators::map implementation for it =)

See also
https://reactivex.io/documentation/operators.html