ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
Transforming Operators

Transforming operators are operators that transform items provided by observable. More...

Functions

auto rpp::operators::buffer (size_t count)
 Periodically gather emissions emitted by an original Observable into bundles and emit these bundles rather than emitting the items one at a time.
 
template<typename Fn>
requires (!utils::is_not_template_callable<Fn> || rpp::constraint::observable<std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
auto rpp::operators::flat_map (Fn &&callable)
 Transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable.
 
template<typename KeySelector, typename ValueSelector = std::identity, typename KeyComparator = rpp::utils::less>
requires ( (!utils::is_not_template_callable<KeySelector> || !std::same_as<void, std::invoke_result_t<KeySelector, rpp::utils::convertible_to_any>>) && (!utils::is_not_template_callable<ValueSelector> || !std::same_as<void, std::invoke_result_t<ValueSelector, rpp::utils::convertible_to_any>>) && (!utils::is_not_template_callable<KeyComparator> || std::strict_weak_order<KeyComparator, rpp::utils::convertible_to_any, rpp::utils::convertible_to_any>))
auto rpp::operators::group_by (KeySelector &&key_selector, ValueSelector &&value_selector, KeyComparator &&comparator)
 Divide original observable into multiple observables where each new observable emits some group of values from original observable.
 
template<typename Fn>
requires (!utils::is_not_template_callable<Fn> || !std::same_as<void, std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
auto rpp::operators::map (Fn &&callable)
 Transforms the items emitted by an Observable via applying a function to each item and emitting result.
 
template<typename InitialValue, typename Fn>
requires (!utils::is_not_template_callable<Fn> || std::same_as<std::decay_t<InitialValue>, std::invoke_result_t<Fn, std::decay_t<InitialValue> &&, rpp::utils::convertible_to_any>>)
auto rpp::operators::scan (InitialValue &&initial_value, Fn &&accumulator)
 Apply accumulator function for each emission from observable and result of accumulator from previous step and emit (and cache) resulting value.
 
template<typename Fn>
auto rpp::operators::scan (Fn &&accumulator)
 Apply accumulator function for each emission from observable and result of accumulator from previous step and emit (and cache) resulting value.
 
auto rpp::operators::window (size_t count)
 Subdivide original observable into sub-observables (window observables) and emit sub-observables of items instead of original items.
 
template<rpp::constraint::observable TOpeningsObservable, typename TClosingsSelectorFn>
requires rpp::constraint::observable<std::invoke_result_t<TClosingsSelectorFn, rpp::utils::extract_observable_type_t<TOpeningsObservable>>>
auto rpp::operators::window_toggle (TOpeningsObservable &&openings, TClosingsSelectorFn &&closings_selector)
 Subdivide original observable into sub-observables (window observables) and emit sub-observables of items instead of original items.
 

Detailed Description

Transforming operators are operators that transform items provided by observable.

See also
https://reactivex.io/documentation/operators.html#transforming

Function Documentation

◆ buffer()

auto rpp::operators::buffer ( size_t count)
inline

Periodically gather emissions emitted by an original Observable into bundles and emit these bundles rather than emitting the items one at a time.

The resulting bundle is std::vector<Type> of requested size. Actually it is similar to window() operator, but it emits vectors instead of observables.

Parameters
countnumber of items being bundled.
Note
#include <rpp/operators/buffer.hpp>
Example:
// The stream that uses rvalue overloads for operators
rpp::source::just(1, 2, 3, 4, 5)
[](const std::vector<int>& v) { std::cout << v << "-"; },
[](const std::exception_ptr&) {},
[]() { std::cout << "|" << std::endl; });
// Source: -1-2-3-4-5--|
// Output: {1,2}-{3,4}-{5}-|
See also
https://reactivex.io/documentation/operators/buffer.html
Examples
buffer.cpp.

◆ flat_map()

template<typename Fn>
requires (!utils::is_not_template_callable<Fn> || rpp::constraint::observable<std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
auto rpp::operators::flat_map ( Fn && callable)

Transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable.

Actually it makes map(callable) and then merge.

Note that flat_map merges the emissions of these Observables, so that they may interleave.

Parameters
callablefunction that returns an observable for each item emitted by the source observable.
Note
#include <rpp/operators/flat_map.hpp>
See also
https://reactivex.io/documentation/operators/flatmap.html
Examples
thread_pool.cpp, and timeout.cpp.

◆ group_by()

template<typename KeySelector, typename ValueSelector = std::identity, typename KeyComparator = rpp::utils::less>
requires ( (!utils::is_not_template_callable<KeySelector> || !std::same_as<void, std::invoke_result_t<KeySelector, rpp::utils::convertible_to_any>>) && (!utils::is_not_template_callable<ValueSelector> || !std::same_as<void, std::invoke_result_t<ValueSelector, rpp::utils::convertible_to_any>>) && (!utils::is_not_template_callable<KeyComparator> || std::strict_weak_order<KeyComparator, rpp::utils::convertible_to_any, rpp::utils::convertible_to_any>))
auto rpp::operators::group_by ( KeySelector && key_selector,
ValueSelector && value_selector,
KeyComparator && comparator )

Divide original observable into multiple observables where each new observable emits some group of values from original observable.

Actually this operator applies key_selector to emission to obtain key, place rpp::grouped_observable to map with corresponding map and then send observable with this key (if not yet). Original values emitted via this grouped_observables

Parameters
key_selectorFunction which determines key for provided item
value_selectorFunction which determines value to be emitted to grouped observable
comparatorFunction to provide strict_weak_order between key types
Note
#include <rpp/operators/group_by.hpp>
Example:
rpp::source::just(1, 2, 3, 4, 5, 6, 7, 8)
| rpp::operators::group_by([](int v) { return v % 2 == 0; })
| rpp::operators::subscribe([](auto grouped_observable) {
auto key = grouped_observable.get_key();
std::cout << "new grouped observable " << key << std::endl;
grouped_observable.subscribe([key](int val) {
std::cout << "key [" << key << "] Val: " << val << std::endl;
});
});
// Output: new grouped observable 0
// key [0] Val: 1
// new grouped observable 1
// key [1] Val: 2
// key [0] Val: 3
// key [1] Val: 4
// key [0] Val: 5
// key [1] Val: 6
// key [0] Val: 7
// key [1] Val: 8
struct person
{
std::string name;
int age;
};
rpp::source::just(person{"Kate", 18},
person{"Alex", 25},
person{"Nick", 18},
person{"Jack", 25},
person{"Tom", 30},
person{"Vanda", 18})
| rpp::operators::group_by([](const person& v) { return v.age; }, [](const person& v) { return v.name; })
| rpp::operators::subscribe([](auto grouped_observable) {
grouped_observable.subscribe([age = grouped_observable.get_key()](const std::string& name) {
std::cout << "Age [" << age << "] Name: " << name << std::endl;
});
});
// Output: Age [18] Name: Kate
// Age [25] Name: Alex
// Age [18] Name: Nick
// Age [25] Name: Jack
// Age [30] Name: Tom
// Age [18] Name: Vanda
See also
https://reactivex.io/documentation/operators/groupby.html
Examples
group_by.cpp.

◆ map()

template<typename Fn>
requires (!utils::is_not_template_callable<Fn> || !std::same_as<void, std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
auto rpp::operators::map ( Fn && callable)

Transforms the items emitted by an Observable via applying a function to each item and emitting result.

Note
The Map operator can keep same type of value or change it to some another type.

Actually this operator just applies callable to each obtained emission and emit resulting value

Performance notes:
  • No any heap allocations at all
  • No any copies/moves of emissions, just forwarding to callable
Parameters
callableis callable used to provide this transformation. Should accept Type of original observable and return type for new observable
Note
#include <rpp/operators/map.hpp>
Example with same type:
| rpp::operators::map([](int value) { return value + 10; })
| rpp::operators::subscribe([](int v) { std::cout << v << std::endl; });
// Output: 52
Example with changed type:
| rpp::operators::map([](int value) { return std::to_string(value) + " VAL"; })
| rpp::operators::subscribe([](const std::string& v) { std::cout << v << std::endl; });
// Output: 42 VAL
See also
https://reactivex.io/documentation/operators/map.html
Examples
from_signal.cpp, map.cpp, and readme.cpp.

◆ scan() [1/2]

template<typename Fn>
auto rpp::operators::scan ( Fn && accumulator)

Apply accumulator function for each emission from observable and result of accumulator from previous step and emit (and cache) resulting value.

Actually this operator applies provided accumulator function to seed and new emission, emits resulting value and updates seed value for next emission

Warning
There is no initial value for seed, so, first value would be used as seed value and forwarded as is.
Performance notes:
  • No any heap allocations at all
  • Keep actual seed/cache inside observable and updating it every emission
Parameters
accumulatorfunction which accepts seed value and new value from observable and return new value of seed. Can accept seed by move-reference.
Note
#include <rpp/operators/scan.hpp>
Example
| rpp::operators::scan(std::plus<int>{})
| rpp::operators::subscribe([](int v) { std::cout << v << std::endl; });
// Output: 1 3 6
See also
https://reactivex.io/documentation/operators/scan.html

◆ scan() [2/2]

template<typename InitialValue, typename Fn>
requires (!utils::is_not_template_callable<Fn> || std::same_as<std::decay_t<InitialValue>, std::invoke_result_t<Fn, std::decay_t<InitialValue> &&, rpp::utils::convertible_to_any>>)
auto rpp::operators::scan ( InitialValue && initial_value,
Fn && accumulator )

Apply accumulator function for each emission from observable and result of accumulator from previous step and emit (and cache) resulting value.

Actually this operator applies provided accumulator function to seed and new emission, emits resulting value and updates seed value for next emission

Warning
Initial value would be used as first value from this observable (would be sent during subscription) and initial value for cache
Performance notes:
  • No any heap allocations at all
  • Keep actual seed/cache inside observable and updating it every emission
Parameters
initial_valueinitial value for seed which would be sent during subscription and applied for first value from observable. Then it will be replaced with result and etc.
accumulatorfunction which accepts seed value and new value from observable and return new value of seed. Can accept seed by move-reference.
Note
#include <rpp/operators/scan.hpp>
Example
| rpp::operators::scan(10, std::plus<int>{})
| rpp::operators::subscribe([](int v) { std::cout << v << std::endl; });
// Output: 10 11 13 16
| rpp::operators::scan(std::vector<int>{}, [](std::vector<int>&& seed, int new_value) {
seed.push_back(new_value);
return std::move(seed);
})
| rpp::operators::subscribe([](const std::vector<int>& v) {
std::cout << "vector: ";
for (int val : v)
std::cout << val << " ";
std::cout << std::endl;
});
// Output: vector:
// vector: 1
// vector: 1 2
// vector: 1 2 3
See also
https://reactivex.io/documentation/operators/scan.html
Examples
scan.cpp.

◆ window()

auto rpp::operators::window ( size_t count)
inline

Subdivide original observable into sub-observables (window observables) and emit sub-observables of items instead of original items.

Actually it is similar to buffer but it emits observable instead of container.

Parameters
countamount of items which every observable would have
Note
#include <rpp/operators/window.hpp>
Example
rpp::source::just(1, 2, 3, 4, 5)
| rpp::operators::subscribe([](const rpp::window_observable<int>& v) {
std::cout << "\nNew observable " << std::endl;
v.subscribe([](int v) { std::cout << v << " "; });
});
// Output: New observable
// 1 2 3
// New observable
// 4 5
See also
https://reactivex.io/documentation/operators/window.html
Examples
window.cpp.

◆ window_toggle()

template<rpp::constraint::observable TOpeningsObservable, typename TClosingsSelectorFn>
requires rpp::constraint::observable<std::invoke_result_t<TClosingsSelectorFn, rpp::utils::extract_observable_type_t<TOpeningsObservable>>>
auto rpp::operators::window_toggle ( TOpeningsObservable && openings,
TClosingsSelectorFn && closings_selector )

Subdivide original observable into sub-observables (window observables) and emit sub-observables of items instead of original items.

Values from openings observable used to specify moment when new window will be opened. closings_selector is used to obtain observable to specify moment when new window will be closed.

Actually it is similar to buffer but it emits observable instead of container.

Parameters
openingsis observable which emissions used to start new window
closings_selectoris function which returns observable which emission/completion means closing of opened window
Note
#include <rpp/operators/window.hpp>
Example
size_t counter{};
source
| rpp::operators::window_toggle(source, [source](int) { return source | rpp::ops::filter([](int v) { return v % 2 == 0; }); })
| rpp::operators::subscribe([&counter](const rpp::window_toggle_observable<int>& obs) {
std::cout << "New observable " << ++counter << std::endl;
obs.subscribe([counter](int v) { std::cout << counter << ": " << v << " " << std::endl; }, [counter]() { std::cout << "closing " << counter << std::endl; });
});
// Output:
// New observable 1
// 1: 1
// New observable 2
// 1: 2
// 2: 2
// closing 1
// New observable 3
// 2: 3
// 3: 3
// New observable 4
// 2: 4
// 3: 4
// 4: 4
// closing 2
// closing 3
// New observable 5
// 4: 5
// 5: 5
// closing 4
// closing 5
See also
https://reactivex.io/documentation/operators/window.html
Examples
window_toggle.cpp.