ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Transforming Operators

Transforming operators are operators that transform items provided by observable.

Functions

template<typename ... Args>
requires is_header_included<buffer_tag, Args...>
auto observable::buffer (size_t count) const &
 Periodically gather emissions emitted by an original Observable into bundles and emit these bundles rather than emitting the items one at a time.
 
template<flat_map_callable< Type > Callable>
requires is_header_included<flat_map_tag, Callable>
auto observable::flat_map (Callable &&callable) const &
 Transform emissions into observables via the provided function, then merge the emissions from those observables.
 
template<std::invocable< Type > KeySelector, std::invocable< Type > ValueSelector = std::identity, typename TKey = rpp::utils::decayed_invoke_result_t<KeySelector, Type>, std::strict_weak_order< TKey, TKey > KeyComparator = std::less<TKey>>
requires is_header_included<group_by_tag, KeySelector, ValueSelector, TKey, KeyComparator>
auto observable::group_by (KeySelector &&key_selector, ValueSelector &&value_selector={}, KeyComparator &&comparator={}) const &
 Divide original observable into multiple observables where each new observable emits some group of values from original observable.
 
template<std::invocable< Type > Callable>
requires is_header_included<map_tag, Callable>
auto observable::map (Callable &&callable) const &
 Transform the items emitted by an Observable via applying a function to each item and emitting result.
 
template<typename Result , scan_accumulator< Result, Type > AccumulatorFn>
requires is_header_included<scan_tag, Result, AccumulatorFn>
auto observable::scan (Result &&initial_value, AccumulatorFn &&accumulator) const &
 Apply accumulator function for each emission from observable and result of accumulator from previous step and emit (and cache) resulting value.
 
template<switch_map_callable< Type > Callable>
requires is_header_included<switch_map_tag, Callable>
auto observable::switch_map (Callable &&callable) const &
 Transform emissions into observables via the provided function and emit only the items from the most recently emitted of those observables.
 
template<typename ... Args>
requires is_header_included<window_tag, Args...>
auto observable::window (size_t window_size) const &
 Subdivide original observable into sub-observables (windowed observables) and emit sub-observables of items instead of original items.
 

Detailed Description

Transforming operators are operators that transform items provided by observable.

See also
https://reactivex.io/documentation/operators.html#transforming

Function Documentation

◆ buffer()

template<constraint::decayed_type Type, typename SpecificObservable >
template<typename ... Args>
requires is_header_included<buffer_tag, Args...>
auto observable::buffer ( size_t  count) const &
inline

Periodically gather emissions emitted by an original Observable into bundles and emit these bundles rather than emitting the items one at a time.

Each bundle is a std::vector<Type> holding up to count items; the last bundle may be smaller if the source completes early. The operator is similar to window(), but it emits vectors instead of observables.

Parameters
count: number of items being bundled.
Returns
new specific_observable with the buffer operator as most recent operator.
Warning
#include <rpp/operators/buffer.hpp>
Example:
// The stream that uses rvalue overloads for operators
rpp::source::just(1, 2, 3, 4, 5)
.buffer(2)
.subscribe(
[](const std::vector<int>& v) { std::cout << v << "-"; },
[](const std::exception_ptr&) {},
[]() { std::cout << "|" << std::endl; });
// Source: -1-2-3-4-5--|
// Output: {1,2}-{3,4}-{5}-|
Implementation details:
  • On subscribe
    • Allocates one shared_ptr to store std::vector<Type> of requested size.
  • OnNext
    • Accumulates emissions inside the current bundle, emits the bundle when the requested count is reached, and starts a new bundle.
  • OnError
    • Just forwards original on_error
  • OnCompleted
    • Emits current active bundle (if any) and just forwards on_completed
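
The OnNext and OnCompleted behaviour listed above can be sketched in plain C++ without the library. This is a minimal illustration, not RPP's implementation: `buffer_sketch` and its `bundle_handler` callback are hypothetical names, and the sketch keeps its state in a member instead of a shared_ptr.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical helper (not part of RPP): models the buffer(count) state machine.
template <typename T>
class buffer_sketch
{
public:
    using bundle_handler = std::function<void(const std::vector<T>&)>;

    buffer_sketch(std::size_t count, bundle_handler on_bundle)
        : m_count{count}
        , m_on_bundle{std::move(on_bundle)}
    {
        assert(count > 0 && "bundle size must be positive");
        m_bundle.reserve(count);
    }

    // OnNext: accumulate the item; emit and restart once the bundle is full.
    void on_next(const T& value)
    {
        m_bundle.push_back(value);
        if (m_bundle.size() == m_count)
            flush();
    }

    // OnCompleted: emit the partially filled bundle, if any.
    void on_completed()
    {
        if (!m_bundle.empty())
            flush();
    }

private:
    void flush()
    {
        m_on_bundle(m_bundle);
        m_bundle.clear(); // start a new bundle, keeping the reserved capacity
    }

    std::size_t    m_count;
    bundle_handler m_on_bundle;
    std::vector<T> m_bundle;
};
```

Feeding 1 to 5 through `on_next` with a count of 2 and then calling `on_completed` hands the callback {1,2}, {3,4} and {5}, matching the example output above.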
See also
https://reactivex.io/documentation/operators/buffer.html

◆ flat_map()

template<constraint::decayed_type Type, typename SpecificObservable >
template<flat_map_callable< Type > Callable>
requires is_header_included<flat_map_tag, Callable>
auto observable::flat_map ( Callable &&  callable) const &
inline

Transform emissions into observables via the provided function, then merge the emissions from those observables.

Warning
According to the observable contract (https://reactivex.io/documentation/contract.html), emissions from any observable must be serialized, so the resulting observable uses a mutex to satisfy this requirement.

In effect, it applies map(callable) and then merge.
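
The map-then-merge idea can be sketched with std::vector standing in for observables that emit synchronously. This is an illustration only: `flat_map_sketch` is a hypothetical helper, not an RPP function, and because everything runs on one thread it needs no mutex and the merge degenerates into concatenation.

```cpp
#include <type_traits>
#include <vector>

// Hypothetical helper (not part of RPP): vectors stand in for synchronous observables.
template <typename T, typename Callable>
auto flat_map_sketch(const std::vector<T>& source, Callable callable)
{
    // Type of the inner "observable" returned by the callable.
    using inner_t = std::decay_t<std::invoke_result_t<Callable&, const T&>>;

    inner_t merged;
    for (const T& item : source)
    {
        inner_t inner = callable(item);                          // map step
        merged.insert(merged.end(), inner.begin(), inner.end()); // merge step
    }
    return merged;
}
```

With real asynchronous inner observables the emissions may interleave; the sketch only shows which values reach the subscriber.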

Parameters
callable: function to transform an item into an observable
Returns
new specific_observable with the flat_map operator as most recent operator.
Warning
#include <rpp/operators/flat_map.hpp>
Example:
rpp::source::just(1, 2, 3)
.flat_map([](int val) { return rpp::source::from_iterable(std::vector(val, val)); })
.subscribe([](int v) { std::cout << v << " "; });
// Output: 1 2 2 3 3 3
See also
https://reactivex.io/documentation/operators/flatmap.html

◆ group_by()

template<constraint::decayed_type Type, typename SpecificObservable >
template<std::invocable< Type > KeySelector, std::invocable< Type > ValueSelector = std::identity, typename TKey = rpp::utils::decayed_invoke_result_t<KeySelector, Type>, std::strict_weak_order< TKey, TKey > KeyComparator = std::less<TKey>>
requires is_header_included<group_by_tag, KeySelector, ValueSelector, TKey, KeyComparator>
auto observable::group_by ( KeySelector &&  key_selector,
ValueSelector &&  value_selector = {},
KeyComparator &&  comparator = {} 
) const &
inline

Divide original observable into multiple observables where each new observable emits some group of values from original observable.

This operator applies key_selector to each emission to obtain a key, stores an rpp::grouped_observable in a map under that key, and emits that observable the first time the key is seen. The values themselves are then emitted via these grouped observables.

Parameters
key_selector: function which determines the key for a provided item
value_selector: function which determines the value to be emitted to the grouped observable
comparator: function to provide strict_weak_order between key types
Returns
new specific_observable with the group_by operator as most recent operator.
Warning
#include <rpp/operators/group_by.hpp>
Example:
rpp::source::just(1, 2, 3, 4, 5, 6, 7, 8)
.group_by([](int v) { return v % 2 == 0; })
.subscribe([](auto grouped_observable)
{
auto key = grouped_observable.get_key();
std::cout << "new grouped observable " << key << std::endl;
grouped_observable.subscribe([key](int val)
{
std::cout << "key [" << key << "] Val: " << val << std::endl;
});
});
// Output: new grouped observable 0
// key [0] Val: 1
// new grouped observable 1
// key [1] Val: 2
// key [0] Val: 3
// key [1] Val: 4
// key [0] Val: 5
// key [1] Val: 6
// key [0] Val: 7
// key [1] Val: 8
struct Person
{
std::string name;
int age;
};
rpp::source::just(Person{"Kate", 18},
Person{"Alex", 25},
Person{"Nick", 18},
Person{"Jack", 25},
Person{"Tom", 30},
Person{"Vanda", 18})
.group_by([](const Person& v) { return v.age; }, [](const Person& v) { return v.name; })
.subscribe([](auto grouped_observable)
{
grouped_observable.subscribe([age = grouped_observable.get_key()](const std::string& name)
{
std::cout << "Age [" << age << "] Name: " << name << std::endl;
});
});
// Output: Age [18] Name: Kate
// Age [25] Name: Alex
// Age [18] Name: Nick
// Age [25] Name: Jack
// Age [30] Name: Tom
// Age [18] Name: Vanda
Implementation details:
  • On subscribe
    • Allocates one shared_ptr to keep map<key, grouped_observable>
  • OnNext
    • Applies key_selector to obtained emission
    • Creates a new entry in the map for the calculated key (if not present yet)
    • Emits the value via the grouped_observable stored in the map for the corresponding key
  • OnError
    • Just forwards original on_error to both subscribers of observable of grouped observables and grouped observables
  • OnCompleted
    • Just forwards original on_completed to both subscribers of observable of grouped observables and grouped observables
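
The OnNext steps above can be sketched in plain C++. This is a simplified model, not RPP's implementation: `group_by_sketch` is a hypothetical helper, a std::vector per key stands in for each grouped observable, and std::map with its default std::less ordering plays the role of the shared map.

```cpp
#include <map>
#include <type_traits>
#include <vector>

// Hypothetical helper (not part of RPP): collects values per key instead of
// emitting them through grouped observables.
template <typename T, typename KeySelector, typename ValueSelector>
auto group_by_sketch(const std::vector<T>& source,
                     KeySelector           key_selector,
                     ValueSelector         value_selector)
{
    using key_t   = std::decay_t<std::invoke_result_t<KeySelector&, const T&>>;
    using value_t = std::decay_t<std::invoke_result_t<ValueSelector&, const T&>>;

    std::map<key_t, std::vector<value_t>> groups;
    for (const T& item : source)
    {
        // operator[] creates the entry when the key is seen for the first time,
        // then the selected value is appended to that key's group.
        groups[key_selector(item)].push_back(value_selector(item));
    }
    return groups;
}
```

Unlike the real operator, the sketch loses the interleaving between groups: it reports only which values end up in which group, in arrival order within each group.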
See also
https://reactivex.io/documentation/operators/groupby.html

◆ map()

template<constraint::decayed_type Type, typename SpecificObservable >
template<std::invocable< Type > Callable>
requires is_header_included<map_tag, Callable>
auto observable::map ( Callable &&  callable) const &
inline

Transform the items emitted by an Observable via applying a function to each item and emitting result.

Note
The map operator can keep the same value type or change it to another type.

This operator applies callable to each obtained emission and emits the resulting value.

Parameters
callable: callable used to provide this transformation. Should accept the Type of the original observable and return the type for the new observable
Returns
new specific_observable with the map operator as most recent operator.
Warning
#include <rpp/operators/map.hpp>
Example with same type:
rpp::source::just(42)
.map([](int value)
{
return value + 10;
})
.subscribe([](int v) { std::cout << v << std::endl; });
// Output: 52
Example with changed type:
rpp::source::just(42)
.map([](int value)
{
return std::to_string(value) + " VAL";
})
.subscribe([](std::string v) { std::cout << v << std::endl; });
// Output: 42 VAL
Implementation details:
  • On subscribe
    • None
  • OnNext
    • Just forwards result of applying callable to emissions
  • OnError
    • Just forwards original on_error
  • OnCompleted
    • Just forwards original on_completed
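
For a source that emits synchronously, the OnNext behaviour above amounts to std::transform. A minimal sketch, assuming a std::vector as a stand-in for the observable; `map_sketch` is a hypothetical name, not an RPP function.

```cpp
#include <algorithm>
#include <iterator>
#include <type_traits>
#include <vector>

// Hypothetical helper (not part of RPP): applies callable to every item and
// collects the results; the result type may differ from the source type.
template <typename T, typename Callable>
auto map_sketch(const std::vector<T>& source, Callable callable)
{
    using result_t = std::decay_t<std::invoke_result_t<Callable&, const T&>>;

    std::vector<result_t> out;
    out.reserve(source.size());
    std::transform(source.begin(), source.end(), std::back_inserter(out), callable);
    return out;
}
```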
See also
https://reactivex.io/documentation/operators/map.html

◆ scan()

template<constraint::decayed_type Type, typename SpecificObservable >
template<typename Result , scan_accumulator< Result, Type > AccumulatorFn>
requires is_header_included<scan_tag, Result, AccumulatorFn>
auto observable::scan ( Result &&  initial_value,
AccumulatorFn &&  accumulator 
) const &
inline

Apply accumulator function for each emission from observable and result of accumulator from previous step and emit (and cache) resulting value.

This operator applies the provided accumulator function to the seed and each new emission, emits the resulting value, and stores it as the seed for the next emission.

Parameters
initial_value: initial value for the seed, which is combined with the first value from the observable (instead of being emitted as the first value). It is then replaced with each accumulator result.
accumulator: function which accepts the seed value and a new value from the observable and returns the new seed value. Can accept the seed by rvalue reference.
Returns
new specific_observable with the scan operator as most recent operator.
Warning
#include <rpp/operators/scan.hpp>
Example
rpp::source::just(1,2,3)
.scan(0, std::plus<int>{})
.subscribe([](int v) { std::cout << v << std::endl; });
// Output: 1 3 6
rpp::source::just(1,2,3)
.scan(std::vector<int>{}, [](std::vector<int>&& seed, int new_value)
{
seed.push_back(new_value);
return std::move(seed);
})
.subscribe([](const std::vector<int>& v)
{
std::cout << "vector: ";
for(int val : v)
std::cout << val << " ";
std::cout << std::endl;
});
// Output: vector: 1
// vector: 1 2
// vector: 1 2 3
Implementation details:
  • On subscribe
    • Allocates one shared_ptr to store seed
  • OnNext
    • Applies accumulator to each emission
    • Updates seed value
    • Emits new seed value
  • OnError
    • Just forwards original on_error
  • OnCompleted
    • Just forwards original on_completed
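
The seed handling above can be sketched as a loop over a std::vector standing in for the source. This is an illustration under that assumption, not RPP's code; `scan_sketch` is a hypothetical name.

```cpp
#include <utility>
#include <vector>

// Hypothetical helper (not part of RPP): returns every intermediate seed value.
template <typename T, typename Result, typename AccumulatorFn>
std::vector<Result> scan_sketch(const std::vector<T>& source,
                                Result                seed,
                                AccumulatorFn         accumulator)
{
    std::vector<Result> emitted;
    emitted.reserve(source.size());
    for (const T& item : source)
    {
        // Apply the accumulator and update the cached seed; moving the seed in
        // lets accumulators take it by rvalue reference and reuse its storage.
        seed = accumulator(std::move(seed), item);
        emitted.push_back(seed); // emit a copy of the new seed
    }
    return emitted;
}
```

The initial seed itself is never emitted, which is why three source items produce exactly three results.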
See also
https://reactivex.io/documentation/operators/scan.html

◆ switch_map()

template<constraint::decayed_type Type, typename SpecificObservable >
template<switch_map_callable< Type > Callable>
requires is_header_included<switch_map_tag, Callable>
auto observable::switch_map ( Callable &&  callable) const &
inline

Transform emissions into observables via the provided function and emit only the items from the most recently emitted of those observables.

In effect, it applies map and then switch_on_next.
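
Switching only shows up when emissions are spread over time, so the sketch below models time explicitly. Everything in it is an assumption made for illustration: `timed_value` and `switch_map_sketch` are hypothetical names, times are plain integers, and an inner emission that coincides with the arrival of the next outer item is treated as dropped.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical model (not part of RPP): a value tagged with its emission time.
struct timed_value
{
    int time;
    int value;
};

// outer:      outer emissions, sorted by time.
// make_inner: returns the inner emissions for an outer value, with times
//             relative to the moment the inner observable is subscribed.
template <typename Callable>
std::vector<int> switch_map_sketch(const std::vector<timed_value>& outer, Callable make_inner)
{
    std::vector<int> out;
    for (std::size_t i = 0; i < outer.size(); ++i)
    {
        const bool has_next = i + 1 < outer.size();
        assert(!has_next || outer[i].time <= outer[i + 1].time);

        for (const timed_value& inner : make_inner(outer[i].value))
        {
            const int absolute_time = outer[i].time + inner.time;
            // The previous inner observable is unsubscribed as soon as the
            // next outer item arrives, so its later emissions are dropped.
            if (has_next && absolute_time >= outer[i + 1].time)
                continue;
            out.push_back(inner.value);
        }
    }
    return out;
}
```

Only the last inner observable is guaranteed to deliver all of its items; every earlier one is cut off by its successor.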

Parameters
callable: function to transform an item into an observable
Returns
new specific_observable with the switch_map operator as most recent operator.
Warning
#include <rpp/operators/switch_map.hpp>
Example:
rpp::source::just(1, 2, 3)
.switch_map([](int val) {
if (val == 1)
return rpp::source::never<int>()
// assumption: the lift call on the next line was missing from the listing and is restored from context
.lift<int>([](rpp::dynamic_subscriber<int> sub) {
sub.get_subscription().add([&]() {
std::cout << "x-"; // x is notation for unsubscribed
});
return sub;
})
.as_dynamic();
return rpp::source::from_iterable(std::vector{val, val})
.as_dynamic();
})
.subscribe([](int v) { std::cout << v << "-"; });
// Output: x-2-2-3-3-
See also
https://reactivex.io/documentation/operators/switchmap.html

◆ window()

template<constraint::decayed_type Type, typename SpecificObservable >
template<typename ... Args>
requires is_header_included<window_tag, Args...>
auto observable::window ( size_t  window_size) const &
inline

Subdivide original observable into sub-observables (windowed observables) and emit sub-observables of items instead of original items.

It is similar to buffer(), but it emits observables instead of containers.

Parameters
window_size: number of items which every observable would have
Returns
new specific_observable with the window operator as most recent operator.
Warning
#include <rpp/operators/window.hpp>
Example
rpp::source::just(1,2,3,4,5)
.window(3)
.subscribe([](const rpp::windowed_observable<int>& window)
{
std::cout << "\nNew observable " << std::endl;
window.subscribe([](int v) { std::cout << v << " "; });
});
// Output: New observable
// 1 2 3
// New observable
// 4 5
Implementation details:
  • On subscribe
    • Allocates one shared_ptr to keep internal state
  • OnNext
    • Emits a new window-observable if the previous one has emitted the requested number of emissions
    • Emits the emission via the active window-observable
    • Completes the window-observable once the requested number of emissions is reached
  • OnError
    • Just forwards original on_error
  • OnCompleted
    • Just forwards original on_completed
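
The difference from buffer() is that items are forwarded immediately through the active window rather than collected first. The sketch below records that sequence as an event log. It is a plain C++ illustration, not RPP's implementation: `window_sketch` is a hypothetical name, and closing the open window when the source completes is an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper (not part of RPP): logs "open", the forwarded items and
// "close" in the order a subscriber would observe them.
inline std::vector<std::string> window_sketch(const std::vector<int>& source,
                                              std::size_t             window_size)
{
    assert(window_size > 0 && "window size must be positive");

    std::vector<std::string> events;
    std::size_t              items_in_window = 0;
    bool                     window_active   = false;

    for (int item : source)
    {
        if (!window_active)
        {
            events.push_back("open"); // emit a new window-observable
            window_active   = true;
            items_in_window = 0;
        }
        events.push_back(std::to_string(item)); // forward via the active window
        if (++items_in_window == window_size)
        {
            events.push_back("close"); // complete the full window
            window_active = false;
        }
    }
    if (window_active)
        events.push_back("close"); // assumption: completion also closes the open window
    return events;
}
```

For the source 1 to 5 with a window size of 3, the log reads open, 1, 2, 3, close, open, 4, 5, close.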
See also
https://reactivex.io/documentation/operators/window.html