Transforming operators are operators that transform items provided by observable.
More...
template<typename ... Args>
requires is_header_included<buffer_tag, Args...>
auto observable::buffer (size_t count) const &
Periodically gather emissions emitted by an original Observable into bundles and emit these bundles rather than emitting the items one at a time.
template<flat_map_callable< Type > Callable>
requires is_header_included<flat_map_tag, Callable>
auto observable::flat_map (Callable &&callable) const &
Transform emissions to observables via provided function and then merge emissions from such an observables.
template<std::invocable< Type > KeySelector, std::invocable< Type > ValueSelector = std::identity, typename TKey = rpp::utils::decayed_invoke_result_t<KeySelector, Type>, std::strict_weak_order< TKey, TKey > KeyComparator = std::less<TKey>>
requires is_header_included<group_by_tag, KeySelector, ValueSelector, TKey, KeyComparator>
auto observable::group_by (KeySelector &&key_selector, ValueSelector &&value_selector={}, KeyComparator &&comparator={}) const &
Divide original observable into multiple observables where each new observable emits some group of values from original observable.
template<std::invocable< Type > Callable>
requires is_header_included<map_tag, Callable>
auto observable::map (Callable &&callable) const &
Transform the items emitted by an Observable via applying a function to each item and emitting result.
template<typename Result , scan_accumulator< Result, Type > AccumulatorFn>
requires is_header_included<scan_tag, Result, AccumulatorFn>
auto observable::scan (Result &&initial_value, AccumulatorFn &&accumulator) const &
Apply accumulator function for each emission from observable and result of accumulator from previous step and emit (and cache) resulting value.
template<switch_map_callable< Type > Callable>
requires is_header_included<switch_map_tag, Callable>
auto observable::switch_map (Callable &&callable) const &
convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables
template<typename ... Args>
requires is_header_included<window_tag, Args...>
auto observable::window (size_t window_size) const &
Subdivide original observable into sub-observables (windowed observables) and emit sub-observables of items instead of original items.
Transforming operators are operators that transform items provided by observable.
template<constraint::decayed_type Type, typename SpecificObservable >
template<typename ... Args>
requires is_header_included<buffer_tag, Args...>
Periodically gather emissions emitted by an original Observable into bundles and emit these bundles rather than emitting the items one at a time.
The resulting bundle is std::vector<Type>
of requested size. Actually it is similar to window()
operator, but it emits vectors instead of observables.
Parameters
count number of items being bundled.
Returns new specific_observable with the buffer operator as most recent operator.
Warning #include <rpp/operators/buffer.hpp >
Example:
rpp::source::just(1, 2, 3, 4, 5)
.buffer(2)
.subscribe(
[](const std::vector<int>& v) { std::cout << v << "-" ; },
[](const std::exception_ptr&) {},
[]() { std::cout << "|" << std::endl; });
Implementation details:
On subscribe
Allocates one shared_ptr
to store std::vector<Type>
of requested size.
OnNext
Accumulates emissions inside current bundle and emits this bundle when requested cound reached and starts new bundle.
OnError
Just forwards original on_error
OnCompleted
Emits current active bundle (if any) and just forwards on_completed
See also https://reactivex.io/documentation/operators/buffer.html
template<constraint::decayed_type Type, typename SpecificObservable >
template<std::invocable< Type > KeySelector, std::invocable< Type > ValueSelector = std::identity, typename TKey = rpp::utils::decayed_invoke_result_t<KeySelector, Type>, std::strict_weak_order< TKey, TKey > KeyComparator = std::less<TKey>>
requires is_header_included<group_by_tag, KeySelector, ValueSelector, TKey, KeyComparator>
auto observable ::group_by
(
KeySelector &&
key_selector ,
ValueSelector &&
value_selector = {}
,
KeyComparator &&
comparator = {}
)
const &
inline
Divide original observable into multiple observables where each new observable emits some group of values from original observable.
Actually this operator applies key_selector
to emission to obtain key, place rpp::grouped_observable to map with corresponding map and then send observable with this key (if not yet). Original values emitted via this grouped_observables
Parameters
key_selector Function which determines key for provided item
value_selector Function which determines value to be emitted to grouped observable
comparator Function to provide strict_weak_order between key types
Returns new specific_observable with the group_by operator as most recent operator.
Warning #include <rpp/operators/group_by.hpp >
Example: rpp::source::just(1, 2, 3, 4, 5, 6, 7, 8)
.group_by([](int v) { return v % 2 == 0; })
{
std::cout << "new grouped observable " << key << std::endl;
{
std::cout << "key [" << key << "] Val: " << val << std::endl;
});
});
Definition: grouped_observable.hpp:20
struct Person
{
std::string name;
int age;
};
rpp::source::just(Person{"Kate" , 18},
Person{"Alex" , 25},
Person{"Nick" , 18},
Person{"Jack" , 25},
Person{"Tom" , 30},
Person{"Vanda" , 18})
.
group_by ([](
const Person& v) {
return v.age; }, [](
const Person& v) {
return v.name; })
.subscribe([](auto grouped_observable)
{
grouped_observable.subscribe([age = grouped_observable.get_key()](const std::string& name)
{
std::cout << "Age [" << age << "] Name: " << name << std::endl;
});
});
Implementation details:
On subscribe
Allocates one shared_ptr
to keep map<key, grouped_observable>
OnNext
Applies key_selector to obtained emission
For calculated key create new entry in map (if not yet)
Emit value via grouped_observable from map for corresponding key
OnError
Just forwards original on_error to both subscribers of observable of grouped observables and grouped observables
OnCompleted
Just forwards original on_completed to both subscribers of observable of grouped observables and grouped observables
See also https://reactivex.io/documentation/operators/groupby.html
template<constraint::decayed_type Type, typename SpecificObservable >
template<std::invocable< Type > Callable>
requires is_header_included<map_tag, Callable>
auto observable ::map
(
Callable &&
callable )
const &
inline
Transform the items emitted by an Observable via applying a function to each item and emitting result.
Note The Map operator can keep same type of value or change it to some another type.
Actually this operator just applies callable to each obtained emission and emit resulting value
Parameters
callable is callable used to provide this transformation. Should accept Type of original observable and return type for new observable
Returns new specific_observable with the Map operator as most recent operator.
Warning #include <rpp/operators/map.hpp >
Example with same type: rpp::source::just(42)
.map([](int value)
{
return value + 10;
})
.subscribe([](int v) { std::cout << v << std::endl; });
Example with changed type: rpp::source::just(42)
.map([](int value)
{
return std::to_string(value) + " VAL" ;
})
.subscribe([](std::string v) { std::cout << v << std::endl; });
Implementation details:
On subscribe
OnNext
Just forwards result of applying callable to emissions
OnError
Just forwards original on_error
OnCompleted
Just forwards original on_completed
See also https://reactivex.io/documentation/operators/map.html
template<constraint::decayed_type Type, typename SpecificObservable >
template<typename Result , scan_accumulator< Result, Type > AccumulatorFn>
requires is_header_included<scan_tag, Result, AccumulatorFn>
auto observable ::scan
(
Result &&
initial_value ,
AccumulatorFn &&
accumulator
)
const &
inline
Apply accumulator function for each emission from observable and result of accumulator from previous step and emit (and cache) resulting value.
Acttually this operator applies provided accumulator function to seed and new emission, emits resulting value and updates seed value for next emission
Parameters
initial_value initial value for seed which will be applied for first value from observable (instead of emitting this as first value). Then it will be replaced with result and etc.
accumulator function which accepts seed value and new value from observable and return new value of seed. Can accept seed by move-reference.
Returns new specific_observable with the scan operator as most recent operator.
Warning #include <rpp/operators/scan.hpp >
Example rpp::source::just(1,2,3)
.scan(0, std::plus<int>{})
.subscribe([](int v) { std::cout << v << std::endl; });
rpp::source::just(1,2,3)
.scan(std::vector<int>{}, [](std::vector<int>&& seed, int new_value)
{
seed.push_back(new_value);
return std::move(seed);
})
.subscribe([](const std::vector<int>& v)
{
std::cout << "vector: " ;
for (int val : v)
std::cout << val << " " ;
std::cout << std::endl;
});
Implementation details:
On subscribe
Allocates one shared_ptr
to store seed
OnNext
Applies accumulator to each emission
Updates seed value
Emits new seed value
OnError
Just forwards original on_error
OnCompleted
Just forwards original on_completed
See also https://reactivex.io/documentation/operators/scan.html
template<constraint::decayed_type Type, typename SpecificObservable >
template<switch_map_callable< Type > Callable>
requires is_header_included<switch_map_tag, Callable>
auto observable ::switch_map
(
Callable &&
callable )
const &
inline
convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables
Actually it makes map
and then switch_on_next
.
Parameters
callable Function to transform item to observable
Returns new specific_observable with the switch_map operator as most recent operator.
Warning #include <rpp/operators/switch_map.hpp >
Example: rpp::source::just(1, 2, 3)
.switch_map([](int val) {
if (val == 1)
return rpp::source::never<int>()
sub.get_subscription().add([&]() {
std::cout << "x-" ;
});
return sub;
})
.as_dynamic();
return rpp::source::from_iterable(std::vector{val, val})
.as_dynamic();
})
.subscribe([](int v) { std::cout << v << "-" ; });
subscriber which uses dynamic_observer<T> to hide original callbacks
Definition: dynamic_subscriber.hpp:24
See also https://reactivex.io/documentation/operators/switchmap.html
template<constraint::decayed_type Type, typename SpecificObservable >
template<typename ... Args>
requires is_header_included<window_tag, Args...>
auto observable ::window
(
size_t
window_size )
const &
inline
Subdivide original observable into sub-observables (windowed observables) and emit sub-observables of items instead of original items.
Actually it is similar to buffer
but it emits observable instead of container.
Parameters
window_size amount of items which every observable would have
Returns new specific_observable with the window operator as most recent operator.
Warning #include <rpp/operators/window.hpp >
Example rpp::source::just(1,2,3,4,5)
.window(3)
.subscribe([](const rpp::windowed_observable<int>& v) { std::cout << "\nNew observable " << std::endl; v.subscribe([](int v) {std::cout << v << " " ; }); });
Implementation details:
On subscribe
Allocates one shared_ptr
to keep internal state
OnNext
Emits new window-observable if previous observable emitted requested amound of emisions
Emits emission via active window-observable
Completes window-observable if requested amound of emisions reached
OnError
Just forwards original on_error
OnCompleted
Just forwards original on_completed
See also https://reactivex.io/documentation/operators/window.html