ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Loading...
Searching...
No Matches
Combining Operators

Combining operators are operators that combine emissions of multiple observables into the same observable by some rule. More...

Functions

template<typename TSelector , rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires (!rpp::constraint::observable<TSelector> && (!utils::is_not_template_callable<TSelector> || std::invocable<TSelector, rpp::utils::convertible_to_any, utils::extract_observable_type_t<TObservable>, utils::extract_observable_type_t<TObservables>...>))
auto rpp::operators::combine_latest (TSelector &&selector, TObservable &&observable, TObservables &&... observables)
 Combines latest emissions from observables with emission from current observable when any observable sends new value via applying selector.
 
template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
auto rpp::operators::combine_latest (TObservable &&observable, TObservables &&... observables)
 Combines latest emissions from observables with emission from current observable when any observable sends new value via making tuple.
 
auto rpp::operators::merge ()
 Converts observable of observables of items into observable of items via merging emissions.
 
template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires constraint::observables_of_same_type<std::decay_t<TObservable>, std::decay_t<TObservables>...>
auto rpp::operators::merge_with (TObservable &&observable, TObservables &&... observables)
 Combines submissions from current observable with other observables into one.
 
template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires constraint::observables_of_same_type<std::decay_t<TObservable>, std::decay_t<TObservables>...>
auto rpp::operators::start_with (TObservable &&observable, TObservables &&... observables)
 Combines submissions from current observable with other observables into one but without overlapping and starting from observables provided as arguments.
 
template<constraint::memory_model MemoryModel = memory_model::use_stack, typename T , typename... Ts>
requires (rpp::constraint::decayed_same_as<T, Ts> && ...)
auto rpp::operators::start_with_values (T &&v, Ts &&... vals)
 Combines submissions from current observable with values into one but without overlapping and starting from values provided as arguments.
 
template<constraint::memory_model MemoryModel = memory_model::use_stack, rpp::schedulers::constraint::scheduler TScheduler, typename T , typename... Ts>
requires (rpp::constraint::decayed_same_as<T, Ts> && ...)
auto rpp::operators::start_with_values (const TScheduler &scheduler, T &&v, Ts &&... vals)
 Combines submissions from current observable with values into one but without overlapping and starting from values provided as arguments.
 
template<typename TSelector , rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires (!rpp::constraint::observable<TSelector> && (!utils::is_not_template_callable<TSelector> || std::invocable<TSelector, rpp::utils::convertible_to_any, utils::extract_observable_type_t<TObservable>, utils::extract_observable_type_t<TObservables>...>))
auto rpp::operators::with_latest_from (TSelector &&selector, TObservable &&observable, TObservables &&... observables)
 Combines latest emissions from observables with emission from current observable when it sends new value via applying selector.
 
template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
auto rpp::operators::with_latest_from (TObservable &&observable, TObservables &&... observables)
 Combines latest emissions from observables with emission from current observable when it sends new value via making tuple.
 
template<typename TSelector , rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires (!rpp::constraint::observable<TSelector> && (!utils::is_not_template_callable<TSelector> || std::invocable<TSelector, rpp::utils::convertible_to_any, utils::extract_observable_type_t<TObservable>, utils::extract_observable_type_t<TObservables>...>))
auto rpp::operators::zip (TSelector &&selector, TObservable &&observable, TObservables &&... observables)
 Combines emissions from observables and emits a single item for each combination based on the result of the provided selector.
 
template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
auto rpp::operators::zip (TObservable &&observable, TObservables &&... observables)
 Combines emissions from observables and emits a tuple of items for each combination.
 

Detailed Description

Combining operators are operators that combine emissions of multiple observables into the same observable by some rule.

See also
https://reactivex.io/documentation/operators.html#combining

Function Documentation

◆ combine_latest() [1/2]

template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
auto rpp::operators::combine_latest ( TObservable && observable,
TObservables &&... observables )

Combines latest emissions from observables with emission from current observable when any observable sends new value via making tuple.

Actually this operator subscribes to all of these observables and emits a new combined value when any of them emits a new value (each observable must emit at least once before a combined value can be provided).

Warning
Selector is just packing values to tuple in this case
Performance notes:
  • 1 heap allocation for disposable
  • each value from any observable copied/moved to internal storage
  • mutex acquired every time value obtained
Parameters
observables: the observables whose emissions would be combined when any observable sends a new value
Warning
#include <rpp/operators/combine_latest.hpp>
Examples
| rpp::ops::combine_latest(rpp::source::just(rpp::schedulers::current_thread{}, 4, 5, 6)) // source 2
| rpp::ops::subscribe([](const std::tuple<int, int>& v) { std::cout << "-" << v; },
[](const std::exception_ptr&) {},
[]() { std::cout << "-|" << std::endl; });
// source 1: -1---2---3-|
// source 2: -4---5---6-| (note that source 2 is subscribed earlier than source 1)
// Output : -{1,4}-{1,5}-{2,5}-{2,6}-{3,6}-|
See also
https://reactivex.io/documentation/operators/combinelatest.html

◆ combine_latest() [2/2]

template<typename TSelector , rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires (!rpp::constraint::observable<TSelector> && (!utils::is_not_template_callable<TSelector> || std::invocable<TSelector, rpp::utils::convertible_to_any, utils::extract_observable_type_t<TObservable>, utils::extract_observable_type_t<TObservables>...>))
auto rpp::operators::combine_latest ( TSelector && selector,
TObservable && observable,
TObservables &&... observables )

Combines latest emissions from observables with emission from current observable when any observable sends new value via applying selector.

Actually this operator subscribes to all of these observables and emits a new combined value when any of them emits a new value (each observable must emit at least once before a combined value can be provided).

Performance notes:
  • 1 heap allocation for disposable
  • each value from any observable copied/moved to internal storage
  • mutex acquired every time value obtained
Parameters
selector: is applied to the current emission of the current observable and the latest emissions from observables
observables: the observables whose emissions would be combined with the current observable
Warning
#include <rpp/operators/combine_latest.hpp>
Examples
See also
https://reactivex.io/documentation/operators/combinelatest.html

◆ merge()

auto rpp::operators::merge ( )
inline

Converts observable of observables of items into observable of items via merging emissions.

Warning
According to observable contract (https://reactivex.io/documentation/contract.html) emissions from any observable should be serialized, so, resulting observable uses mutex to satisfy this requirement
During subscription the operator takes ownership over rpp::schedulers::current_thread to allow mixing of underlying emissions

Actually it subscribes to each observable obtained from emissions. The resulting observable completes when ALL observables complete.

Performance notes:
  • 2 heap allocations (1 for state, 1 to convert observer to dynamic_observer)
  • Acquiring mutex during all observer's calls
Warning
#include <rpp/operators/merge.hpp>
Example:
rpp::source::never<int>().as_dynamic(),
rpp::source::just(2).as_dynamic())
| rpp::operators::subscribe([](int v) { std::cout << v << " "; });
// Output: 1 2
See also
https://reactivex.io/documentation/operators/merge.html
Examples
merge.cpp.

◆ merge_with()

template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires constraint::observables_of_same_type<std::decay_t<TObservable>, std::decay_t<TObservables>...>
auto rpp::operators::merge_with ( TObservable && observable,
TObservables &&... observables )

Combines submissions from current observable with other observables into one.

Warning
According to observable contract (https://reactivex.io/documentation/contract.html) emissions from any observable should be serialized, so, resulting observable uses mutex to satisfy this requirement
During subscription the operator takes ownership over rpp::schedulers::current_thread to allow mixing of underlying emissions

Actually it subscribes to each observable. The resulting observable completes when ALL observables complete.

Performance notes:
  • 2 heap allocations (1 for state, 1 to convert observer to dynamic_observer)
  • Acquiring mutex during all observer's calls
Parameters
observables: the observables whose emissions would be merged with the current observable
Warning
#include <rpp/operators/merge.hpp>
Example:
| rpp::operators::subscribe([](int v) { std::cout << v << " "; });
// Output: 1 2
See also
https://reactivex.io/documentation/operators/merge.html
Examples
merge.cpp.

◆ start_with()

template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires constraint::observables_of_same_type<std::decay_t<TObservable>, std::decay_t<TObservables>...>
auto rpp::operators::start_with ( TObservable && observable,
TObservables &&... observables )

Combines submissions from current observable with other observables into one but without overlapping and starting from observables provided as arguments.

Warning
If for some reason you need to interpret observables as "values", not sources of data, then use start_with_values instead

Actually it makes concat(observables_to_start_with..., current_observable), so the observables passed as arguments are subscribed before the current observable

Parameters
observables: list of observables which should be used before the current observable
Warning
#include <rpp/operators/start_with.hpp>
Example
See also
https://reactivex.io/documentation/operators/startwith.html

◆ start_with_values() [1/2]

template<constraint::memory_model MemoryModel = memory_model::use_stack, rpp::schedulers::constraint::scheduler TScheduler, typename T , typename... Ts>
requires (rpp::constraint::decayed_same_as<T, Ts> && ...)
auto rpp::operators::start_with_values ( const TScheduler & scheduler,
T && v,
Ts &&... vals )

Combines submissions from current observable with values into one but without overlapping and starting from values provided as arguments.

Actually it makes concat(rpp::source::just(vals_to_start_with)..., current_observable), so the observables built from the arguments are subscribed before the current observable

Template Parameters
memory_model: memory_model strategy used to store provided values
Parameters
vals: list of values which should be emitted before the current observable
Warning
#include <rpp/operators/start_with.hpp>
Example
See also
https://reactivex.io/documentation/operators/startwith.html

◆ start_with_values() [2/2]

template<constraint::memory_model MemoryModel = memory_model::use_stack, typename T , typename... Ts>
requires (rpp::constraint::decayed_same_as<T, Ts> && ...)
auto rpp::operators::start_with_values ( T && v,
Ts &&... vals )

Combines submissions from current observable with values into one but without overlapping and starting from values provided as arguments.

Actually it makes concat(rpp::source::just(vals_to_start_with)..., current_observable), so the observables built from the arguments are subscribed before the current observable

This overload operates on rpp::schedulers::current_thread by default

Template Parameters
memory_model: memory_model strategy used to store provided values
Parameters
vals: list of values which should be emitted before the current observable
Warning
#include <rpp/operators/start_with.hpp>
Example
See also
https://reactivex.io/documentation/operators/startwith.html

◆ with_latest_from() [1/2]

template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
auto rpp::operators::with_latest_from ( TObservable && observable,
TObservables &&... observables )

Combines latest emissions from observables with emission from current observable when it sends new value via making tuple.

Warning
Selector is just packing values to tuple in this case
Performance notes:
  • 1 heap allocation for disposable
  • each value from "others" copied/moved to internal storage
  • mutex acquired every time value obtained
Parameters
observables: the observables whose emissions would be combined when the current observable sends a new value
Warning
#include <rpp/operators/with_latest_from.hpp>
Examples
rpp::source::just(1, 2, 3, 4, 5, 6)
| rpp::operators::subscribe([](std::tuple<int, int> v) { std::cout << std::get<0>(v) << ":" << std::get<1>(v) << " "; });
// Output: 1:3 2:4 3:5 4:5 5:5 6:5
std::cout << std::endl;
| rpp::operators::subscribe([](std::tuple<int, int> v) { std::cout << std::get<0>(v) << ":" << std::get<1>(v) << " "; });
// Output: 1:3 2:4 3:5
See also
https://reactivex.io/documentation/operators/combinelatest.html

◆ with_latest_from() [2/2]

template<typename TSelector , rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires (!rpp::constraint::observable<TSelector> && (!utils::is_not_template_callable<TSelector> || std::invocable<TSelector, rpp::utils::convertible_to_any, utils::extract_observable_type_t<TObservable>, utils::extract_observable_type_t<TObservables>...>))
auto rpp::operators::with_latest_from ( TSelector && selector,
TObservable && observable,
TObservables &&... observables )

Combines latest emissions from observables with emission from current observable when it sends new value via applying selector.

Actually this operator just keeps last values from all other observables and combines them together with each new emission from original observable

Performance notes:
  • 1 heap allocation for disposable
  • each value from "others" copied/moved to internal storage
  • mutex acquired every time value obtained
Parameters
selector: is applied to the current emission of the current observable and the latest emissions from observables
observables: the observables whose emissions would be combined when the current observable sends a new value
Warning
#include <rpp/operators/with_latest_from.hpp>
Examples
rpp::source::just(1, 2, 3, 4)
| rpp::operators::with_latest_from([](int left, int right) { return left + right; },
| rpp::operators::subscribe([](int v) { std::cout << v << " "; });
// Output: 4 6 8 9
See also
https://reactivex.io/documentation/operators/combinelatest.html
Examples
with_latest_from.cpp.

◆ zip() [1/2]

template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
auto rpp::operators::zip ( TObservable && observable,
TObservables &&... observables )

Combines emissions from observables and emits a tuple of items for each combination.

Performance notes:
  • 1 heap allocation for disposable
  • each value from any observable copied/moved to internal storage
  • mutex acquired every time value obtained
Parameters
observables: the observables whose emissions would be zipped with the current observable
Warning
#include <rpp/operators/zip.hpp>
See also
https://reactivex.io/documentation/operators/zip.html

◆ zip() [2/2]

template<typename TSelector , rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires (!rpp::constraint::observable<TSelector> && (!utils::is_not_template_callable<TSelector> || std::invocable<TSelector, rpp::utils::convertible_to_any, utils::extract_observable_type_t<TObservable>, utils::extract_observable_type_t<TObservables>...>))
auto rpp::operators::zip ( TSelector && selector,
TObservable && observable,
TObservables &&... observables )

Combines emissions from observables and emits a single item for each combination based on the result of the provided selector.

Performance notes:
  • 1 heap allocation for disposable
  • each value from any observable copied/moved to internal storage
  • mutex acquired every time value obtained
Parameters
selector: is applied to the current emission of the current observable and the latest emissions from observables
observables: the observables whose emissions would be zipped with the current observable
Warning
#include <rpp/operators/zip.hpp>
See also
https://reactivex.io/documentation/operators/zip.html