ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
Combining Operators

Combining operators are operators that combine emissions of multiple observables into a single observable by some rule. More...

Functions

template<typename TSelector, rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires (!rpp::constraint::observable<TSelector> && (!utils::is_not_template_callable<TSelector> || std::invocable<TSelector, rpp::utils::convertible_to_any, utils::extract_observable_type_t<TObservable>, utils::extract_observable_type_t<TObservables>...>))
auto rpp::operators::combine_latest (TSelector &&selector, TObservable &&observable, TObservables &&... observables)
 Combines latest emissions from observables with emission from current observable when any observable sends new value via applying selector.
 
template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
auto rpp::operators::combine_latest (TObservable &&observable, TObservables &&... observables)
 Combines latest emissions from observables with emission from current observable when any observable sends new value via making tuple.
 
auto rpp::operators::merge ()
 Converts observable of observables of items into observable of items via merging emissions.
 
template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires constraint::observables_of_same_type<std::decay_t<TObservable>, std::decay_t<TObservables>...>
auto rpp::operators::merge_with (TObservable &&observable, TObservables &&... observables)
 Combines submissions from current observable with other observables into one.
 
template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires constraint::observables_of_same_type<std::decay_t<TObservable>, std::decay_t<TObservables>...>
auto rpp::operators::start_with (TObservable &&observable, TObservables &&... observables)
 Combines submissions from current observable with other observables into one but without overlapping and starting from observables provided as arguments.
 
template<constraint::memory_model MemoryModel = memory_model::use_stack, typename T, typename... Ts>
requires (rpp::constraint::decayed_same_as<T, Ts> && ...)
auto rpp::operators::start_with_values (T &&v, Ts &&... vals)
 Combines submissions from current observable with values into one but without overlapping and starting from values provided as arguments.
 
template<constraint::memory_model MemoryModel = memory_model::use_stack, rpp::schedulers::constraint::scheduler TScheduler, typename T, typename... Ts>
requires (rpp::constraint::decayed_same_as<T, Ts> && ...)
auto rpp::operators::start_with_values (const TScheduler &scheduler, T &&v, Ts &&... vals)
 Combines submissions from current observable with values into one but without overlapping and starting from values provided as arguments.
 
auto rpp::operators::switch_on_next ()
 Converts observable of observables into observable of values which emits values from most recent underlying observable till new observable obtained.
 
template<typename TSelector, rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires (!rpp::constraint::observable<TSelector> && (!utils::is_not_template_callable<TSelector> || std::invocable<TSelector, rpp::utils::convertible_to_any, utils::extract_observable_type_t<TObservable>, utils::extract_observable_type_t<TObservables>...>))
auto rpp::operators::with_latest_from (TSelector &&selector, TObservable &&observable, TObservables &&... observables)
 Combines latest emissions from observables with emission from current observable when it sends new value via applying selector.
 
template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
auto rpp::operators::with_latest_from (TObservable &&observable, TObservables &&... observables)
 Combines latest emissions from observables with emission from current observable when it sends new value via making tuple.
 
template<typename TSelector, rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires (!rpp::constraint::observable<TSelector> && (!utils::is_not_template_callable<TSelector> || std::invocable<TSelector, rpp::utils::convertible_to_any, utils::extract_observable_type_t<TObservable>, utils::extract_observable_type_t<TObservables>...>))
auto rpp::operators::zip (TSelector &&selector, TObservable &&observable, TObservables &&... observables)
 Combines emissions from observables and emits a single item for each combination based on the result of the provided selector.
 
template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
auto rpp::operators::zip (TObservable &&observable, TObservables &&... observables)
 Combines emissions from observables and emits a tuple of items for each combination.
 

Detailed Description

Combining operators are operators that combine emissions of multiple observables into a single observable by some rule.

See also
https://reactivex.io/documentation/operators.html#combining

Function Documentation

◆ combine_latest() [1/2]

template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
auto rpp::operators::combine_latest ( TObservable && observable,
TObservables &&... observables )

Combines latest emissions from observables with emission from current observable when any observable sends new value via making tuple.

Actually this operator subscribes to all of these observables and emits a new combined value when any of them emits a new value (each observable must emit at least once before a combined value can be provided)

Warning
Selector is just packing values to tuple in this case
Performance notes:
  • 1 heap allocation for disposable
  • each value from any observable copied/moved to internal storage
  • mutex acquired every time value obtained
Parameters
observables - observables whose emissions would be combined when any observable sends a new value
Note
#include <rpp/operators/combine_latest.hpp>
Examples
| rpp::ops::subscribe([](const std::tuple<int, int>& v) { std::cout << "-" << v; },
[](const std::exception_ptr&) {},
[]() { std::cout << "-|" << std::endl; });
// source 1: -1---2---3-|
// source 2: -4---5---6-| (note that source 2 is subscribed earlier than source 1)
// Output : -{1,4}-{1,5}-{2,5}-{2,6}-{3,6}-|
See also
https://reactivex.io/documentation/operators/combinelatest.html

◆ combine_latest() [2/2]

template<typename TSelector, rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires (!rpp::constraint::observable<TSelector> && (!utils::is_not_template_callable<TSelector> || std::invocable<TSelector, rpp::utils::convertible_to_any, utils::extract_observable_type_t<TObservable>, utils::extract_observable_type_t<TObservables>...>))
auto rpp::operators::combine_latest ( TSelector && selector,
TObservable && observable,
TObservables &&... observables )

Combines latest emissions from observables with emission from current observable when any observable sends new value via applying selector.

Actually this operator subscribes to all of these observables and emits a new combined value when any of them emits a new value (each observable must emit at least once before a combined value can be provided)

Performance notes:
  • 1 heap allocation for disposable
  • each value from any observable copied/moved to internal storage
  • mutex acquired every time value obtained
Parameters
selector - applied to the current emission of the current observable and the latest emissions from observables
observables - observables whose emissions would be combined with the current observable
Note
#include <rpp/operators/combine_latest.hpp>
Examples
rpp::source::just(1, 2, 3) // source 1
| rpp::ops::combine_latest([](int left, int right) { return left + right; }, // custom combiner
rpp::source::just(4, 5, 6)) // source 2
| rpp::ops::subscribe([](int v) { std::cout << "-" << v; },
[](const std::exception_ptr&) {},
[]() { std::cout << "-|" << std::endl; });
// source 1: -1---2---3-|
// source 2: -4---5---6-| (note that source 2 is subscribed earlier than source 1)
// Output : -5-6-7-8-9-|
See also
https://reactivex.io/documentation/operators/combinelatest.html
Examples
combine_latest.cpp.

◆ merge()

auto rpp::operators::merge ( )
inline

Converts observable of observables of items into observable of items via merging emissions.

Invariant
According to observable contract (https://reactivex.io/documentation/contract.html) emissions from any observable should be serialized, so, resulting observable uses mutex to satisfy this requirement
Attention
During subscription the operator takes ownership of rpp::schedulers::current_thread to allow mixing of underlying emissions

Actually it subscribes to each observable received as an emission. The resulting observable completes when ALL observables complete

Performance notes:
  • 2 heap allocations (1 for state, 1 to convert observer to dynamic_observer)
  • Acquiring mutex during all observer's calls
Note
#include <rpp/operators/merge.hpp>
Example:
rpp::source::never<int>().as_dynamic(),
rpp::source::just(2).as_dynamic())
| rpp::operators::subscribe([](int v) { std::cout << v << " "; });
// Output: 1 2
See also
https://reactivex.io/documentation/operators/merge.html
Examples
merge.cpp, and start_with.cpp.

◆ merge_with()

template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires constraint::observables_of_same_type<std::decay_t<TObservable>, std::decay_t<TObservables>...>
auto rpp::operators::merge_with ( TObservable && observable,
TObservables &&... observables )

Combines submissions from current observable with other observables into one.

Warning
According to observable contract (https://reactivex.io/documentation/contract.html) emissions from any observable should be serialized, so, resulting observable uses mutex to satisfy this requirement
During subscription the operator takes ownership of rpp::schedulers::current_thread to allow mixing of underlying emissions

Actually it subscribes to each observable. The resulting observable completes when ALL observables complete

Performance notes:
  • 2 heap allocations (1 for state, 1 to convert observer to dynamic_observer)
  • Acquiring mutex during all observer's calls
Parameters
observables - observables whose emissions would be merged with the current observable
Note
#include <rpp/operators/merge.hpp>
Example:
| rpp::operators::subscribe([](int v) { std::cout << v << " "; });
// Output: 1 2
See also
https://reactivex.io/documentation/operators/merge.html
Examples
merge.cpp.

◆ start_with()

template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires constraint::observables_of_same_type<std::decay_t<TObservable>, std::decay_t<TObservables>...>
auto rpp::operators::start_with ( TObservable && observable,
TObservables &&... observables )

Combines submissions from current observable with other observables into one but without overlapping and starting from observables provided as arguments.

Warning
If for some reason you need to interpret observables as "values", not sources of data, then use start_with_values instead

Actually it makes concat(observables_to_start_with..., current_observable), so the observables passed as arguments are subscribed before the current observable

Parameters
observables - list of observables which should be used before the current observable
Note
#include <rpp/operators/start_with.hpp>
Example
| rpp::ops::subscribe([](int v) { std::cout << v << " "; });
// Output: 5 6 1 2 3
See also
https://reactivex.io/documentation/operators/startwith.html
Examples
start_with.cpp.

◆ start_with_values() [1/2]

template<constraint::memory_model MemoryModel = memory_model::use_stack, rpp::schedulers::constraint::scheduler TScheduler, typename T, typename... Ts>
requires (rpp::constraint::decayed_same_as<T, Ts> && ...)
auto rpp::operators::start_with_values ( const TScheduler & scheduler,
T && v,
Ts &&... vals )

Combines submissions from current observable with values into one but without overlapping and starting from values provided as arguments.

Actually it makes concat(rpp::source::just(vals_to_start_with)..., current_observable), so the values passed as arguments are emitted before the current observable is subscribed

Template Parameters
MemoryModel - memory_model strategy used to store provided values
Parameters
vals - list of values which should be emitted before the current observable
Note
#include <rpp/operators/start_with.hpp>
Example
| rpp::ops::subscribe([](int v) { std::cout << v << " "; });
// Output: 5 6 1 2 3
See also
https://reactivex.io/documentation/operators/startwith.html

◆ start_with_values() [2/2]

template<constraint::memory_model MemoryModel = memory_model::use_stack, typename T, typename... Ts>
requires (rpp::constraint::decayed_same_as<T, Ts> && ...)
auto rpp::operators::start_with_values ( T && v,
Ts &&... vals )

Combines submissions from current observable with values into one but without overlapping and starting from values provided as arguments.

Actually it makes concat(rpp::source::just(vals_to_start_with)..., current_observable), so the values passed as arguments are emitted before the current observable is subscribed

This overload operates on rpp::schedulers::current_thread by default

Template Parameters
MemoryModel - memory_model strategy used to store provided values
Parameters
vals - list of values which should be emitted before the current observable
Note
#include <rpp/operators/start_with.hpp>
Example
| rpp::ops::subscribe([](int v) { std::cout << v << " "; });
// Output: 5 6 1 2 3
See also
https://reactivex.io/documentation/operators/startwith.html
Examples
start_with.cpp.

◆ switch_on_next()

auto rpp::operators::switch_on_next ( )
inline

Converts observable of observables into observable of values which emits values from most recent underlying observable till new observable obtained.

Actually this operator just unsubscribes from previous observable and subscribes on new observable when obtained in on_next

Note
#include <rpp/operators/switch_on_next.hpp>
Example:
rpp::source::never<int>().as_dynamic(),
rpp::source::just(2).as_dynamic())
| rpp::operators::subscribe([](int v) { std::cout << v << " "; });
// Output: 1 2
See also
https://reactivex.io/documentation/operators/switch.html
Examples
switch_on_next.cpp.

◆ with_latest_from() [1/2]

template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
auto rpp::operators::with_latest_from ( TObservable && observable,
TObservables &&... observables )

Combines latest emissions from observables with emission from current observable when it sends new value via making tuple.

Warning
Selector is just packing values to tuple in this case
Performance notes:
  • 1 heap allocation for disposable
  • each value from "others" copied/moved to internal storage
  • mutex acquired every time value obtained
Parameters
observables - observables whose emissions would be combined when the current observable sends a new value
Note
#include <rpp/operators/with_latest_from.hpp>
Examples
rpp::source::just(1, 2, 3, 4, 5, 6)
| rpp::operators::subscribe([](std::tuple<int, int> v) { std::cout << std::get<0>(v) << ":" << std::get<1>(v) << " "; });
// Output: 1:3 2:4 3:5 4:5 5:5 6:5
std::cout << std::endl;
| rpp::operators::subscribe([](std::tuple<int, int> v) { std::cout << std::get<0>(v) << ":" << std::get<1>(v) << " "; });
// Output: 1:3 2:4 3:5
See also
https://reactivex.io/documentation/operators/combinelatest.html

◆ with_latest_from() [2/2]

template<typename TSelector, rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires (!rpp::constraint::observable<TSelector> && (!utils::is_not_template_callable<TSelector> || std::invocable<TSelector, rpp::utils::convertible_to_any, utils::extract_observable_type_t<TObservable>, utils::extract_observable_type_t<TObservables>...>))
auto rpp::operators::with_latest_from ( TSelector && selector,
TObservable && observable,
TObservables &&... observables )

Combines latest emissions from observables with emission from current observable when it sends new value via applying selector.

Actually this operator just keeps last values from all other observables and combines them together with each new emission from original observable

Performance notes:
  • 1 heap allocation for disposable
  • each value from "others" copied/moved to internal storage
  • mutex acquired every time value obtained
Parameters
selector - applied to the current emission of the current observable and the latest emissions from observables
observables - observables whose emissions would be combined when the current observable sends a new value
Note
#include <rpp/operators/with_latest_from.hpp>
Examples
rpp::source::just(1, 2, 3, 4)
| rpp::operators::with_latest_from([](int left, int right) { return left + right; },
| rpp::operators::subscribe([](int v) { std::cout << v << " "; });
// Output: 4 6 8 9
See also
https://reactivex.io/documentation/operators/combinelatest.html
Examples
with_latest_from.cpp.

◆ zip() [1/2]

template<rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
auto rpp::operators::zip ( TObservable && observable,
TObservables &&... observables )

Combines emissions from observables and emits a tuple of items for each combination.

Performance notes:
  • 1 heap allocation for disposable
  • each value from any observable copied/moved to internal storage
  • mutex acquired every time value obtained
Parameters
observables - observables whose emissions would be zipped with the current observable
Note
#include <rpp/operators/zip.hpp>
See also
https://reactivex.io/documentation/operators/zip.html

◆ zip() [2/2]

template<typename TSelector, rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires (!rpp::constraint::observable<TSelector> && (!utils::is_not_template_callable<TSelector> || std::invocable<TSelector, rpp::utils::convertible_to_any, utils::extract_observable_type_t<TObservable>, utils::extract_observable_type_t<TObservables>...>))
auto rpp::operators::zip ( TSelector && selector,
TObservable && observable,
TObservables &&... observables )

Combines emissions from observables and emits a single item for each combination based on the result of the provided selector.

Performance notes:
  • 1 heap allocation for disposable
  • each value from any observable copied/moved to internal storage
  • mutex acquired every time value obtained
Parameters
selector - applied to the current emission of the current observable and the latest emissions from observables
observables - observables whose emissions would be zipped with the current observable
Note
#include <rpp/operators/zip.hpp>
See also
https://reactivex.io/documentation/operators/zip.html