ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Combining Operators

Combining operators combine the emissions of multiple observables into a single observable according to some rule.

Functions

template<constraint::observable ... TOtherObservable, std::invocable< Type, utils::extract_observable_type_t< TOtherObservable >... > TCombiner>
requires is_header_included<combine_latest_tag, TOtherObservable...>
auto observable::combine_latest (TCombiner &&combiner, TOtherObservable &&...observables) const &
 Combines latest emissions from original observable and other observables when any of them emits.
 
template<constraint::observable ... TOtherObservable>
requires is_header_included<combine_latest_tag, TOtherObservable...>
auto observable::combine_latest (TOtherObservable &&...observables) const &
 Combines latest emissions from current observable and other observables when any of them emits. The combining result is std::tuple<...>.
 
template<typename ... Args>
requires (is_header_included<merge_tag, Args...> && rpp::constraint::observable<Type>)
auto observable::merge () const &
 Converts observable of observables of items into observable of items via merging emissions.
 
template<constraint::observable_of_type< Type > ... TObservables>
requires (is_header_included<merge_tag, TObservables...>&& sizeof...(TObservables) >= 1)
auto observable::merge_with (TObservables &&... observables) const &
 Combines emissions from the current observable and other observables into a single observable.
 
template<rpp::memory_model memory_model = memory_model::use_stack, constraint::decayed_same_as< Type > ... TTypes>
requires is_header_included<start_with_tag, TTypes...>
auto observable::start_with (TTypes &&...vals_to_start_with) const &
 Emits the values provided as arguments first and then the emissions from the current observable, without overlapping.
 
template<constraint::observable_of_type< Type > ... TObservables>
requires is_header_included<start_with_tag, TObservables...>
auto observable::start_with (TObservables &&...observables_to_start_with) const &
 Emits the emissions of the observables provided as arguments first and then the emissions from the current observable, without overlapping.
 
template<typename ... Args>
requires (is_header_included<switch_on_next_tag, Args...>&& rpp::constraint::observable<Type>)
auto observable::switch_on_next () const &
 Converts an observable of observables into an observable of values that emits values from the most recent inner observable until a new inner observable is obtained.
 
template<constraint::observable ... TObservables, std::invocable< Type, utils::extract_observable_type_t< TObservables >... > TSelector>
requires is_header_included<with_latest_from_tag, TObservables...>
auto observable::with_latest_from (TSelector &&selector, TObservables &&...observables) const &
 Combines each new emission from the current observable with the latest emissions from the other observables by applying a selector.
 
template<constraint::observable ... TObservables>
requires is_header_included<with_latest_from_tag, TObservables...>
auto observable::with_latest_from (TObservables &&...observables) const &
 Combines each new emission from the current observable with the latest emissions from the other observables into a std::tuple.
 

Detailed Description

Combining operators combine the emissions of multiple observables into a single observable according to some rule.

See also
https://reactivex.io/documentation/operators.html#combining

Function Documentation

◆ combine_latest() [1/2]

template<constraint::decayed_type Type, typename SpecificObservable >
template<constraint::observable ... TOtherObservable, std::invocable< Type, utils::extract_observable_type_t< TOtherObservable >... > TCombiner>
requires is_header_included<combine_latest_tag, TOtherObservable...>
auto observable::combine_latest ( TCombiner &&  combiner,
TOtherObservable &&...  observables 
) const &
inline

Combines latest emissions from original observable and other observables when any of them emits.

Warning
According to the observable contract (https://reactivex.io/documentation/contract.html), emissions from any observable must be serialized, so the resulting observable uses std::mutex to satisfy this requirement.

This operator subscribes to all of these observables and emits a new combined value whenever any of them emits. Each observable must have emitted at least once before a combined value can be produced.

Parameters
combiner: combines emissions from all the observables using custom composition.
observables: observables whose emissions are combined with the current observable's emissions.
Returns
new specific_observable with the combine_latest operator as most recent operator.
Warning
#include <rpp/operators/combine_latest.hpp>
Examples
rpp::source::just(1, 2, 3) // source 1
.combine_latest(
[](int left, int right) { return left + right; }, // custom combiner
rpp::source::just(4, 5, 6)) // source 2
.subscribe(
[](int v) { std::cout << "-" << v; },
[](const std::exception_ptr&) {},
[]() { std::cout << "-|" << std::endl; });
// source 1: -1---2---3-|
// source 2: -4---5---6-| (note that source 2 is subscribed earlier than source 1)
// Output : -7-8-9-|
Implementation details:
  • On subscribe
    • Allocates one shared_ptr to store last emissions.
    • Wraps the subscriber with serialization logic to ensure callbacks are invoked serially
  • OnNext
    • Keeps last emission from each observable
    • Applies the combiner function and emits the result once every observable has emitted at least once
  • OnError
    • Just forwards original on_error
  • OnCompleted
    • Just forwards original on_completed
See also
https://reactivex.io/documentation/operators/combinelatest.html
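
The OnNext rules listed above for combine_latest can be modelled without RPP at all. The following is only a minimal standalone sketch under stated assumptions: it handles exactly two int sources, and the names CombineLatestSketch and run_combine_latest_sketch are hypothetical and do not exist in the library. It keeps the last value per source behind a mutex and emits only once both sources have produced a value.

```cpp
#include <functional>
#include <mutex>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical standalone model of the combine_latest state for two int
// sources. It does not use RPP; every name here is illustrative.
class CombineLatestSketch
{
public:
    CombineLatestSketch(std::function<int(int, int)> combiner, std::function<void(int)> on_next)
        : m_combiner{std::move(combiner)}
        , m_on_next{std::move(on_next)}
    {
    }

    // Emission from the current ("left") observable.
    void on_next_left(int value)
    {
        std::lock_guard<std::mutex> lock{m_mutex}; // serialize callbacks
        m_left = value;
        emit_if_ready();
    }

    // Emission from the other ("right") observable.
    void on_next_right(int value)
    {
        std::lock_guard<std::mutex> lock{m_mutex}; // serialize callbacks
        m_right = value;
        emit_if_ready();
    }

private:
    // Emits only once every source has produced at least one value.
    void emit_if_ready() const
    {
        if (m_left && m_right)
            m_on_next(m_combiner(*m_left, *m_right));
    }

    std::mutex                   m_mutex;
    std::optional<int>           m_left;
    std::optional<int>           m_right;
    std::function<int(int, int)> m_combiner;
    std::function<void(int)>     m_on_next;
};

// Replays the documented example: source 2 (4, 5, 6) is drained first,
// then source 1 (1, 2, 3) emits against the last value of source 2.
inline std::vector<int> run_combine_latest_sketch()
{
    std::vector<int> out;
    CombineLatestSketch state{[](int left, int right) { return left + right; },
                              [&out](int v) { out.push_back(v); }};
    for (int v : {4, 5, 6})
        state.on_next_right(v); // nothing emitted yet: left has no value
    for (int v : {1, 2, 3})
        state.on_next_left(v);  // combined with 6, the last right value
    return out;
}
```

Calling run_combine_latest_sketch() yields 7, 8, 9, which matches the output of the example above because the second source has already delivered all of its values before the first source starts emitting.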

◆ combine_latest() [2/2]

template<constraint::decayed_type Type, typename SpecificObservable >
template<constraint::observable ... TOtherObservable>
requires is_header_included<combine_latest_tag, TOtherObservable...>
auto observable::combine_latest ( TOtherObservable &&...  observables) const &
inline

Combines latest emissions from current observable and other observables when any of them emits. The combining result is std::tuple<...>.

This operator subscribes to all of these observables and emits a std::tuple of the last emissions whenever any of them emits. Each observable must have emitted at least once before a combined value can be produced.

Parameters
observables: observables whose emissions are combined with the current observable's emissions.
Returns
new specific_observable with the combine_latest operator as most recent operator.
Warning
#include <rpp/operators/combine_latest.hpp>
Examples
rpp::source::just(1, 2, 3) // source 1
.combine_latest(rpp::source::just(4, 5, 6)) // source 2, no combiner: results are std::tuple<int, int>
.subscribe(
[](const std::tuple<int, int>& v) { std::cout << "-(" << std::get<0>(v) << "," << std::get<1>(v) << ")"; },
[](const std::exception_ptr&) {},
[]() { std::cout << "-|" << std::endl; });
// source 1: -1---2---3-|
// source 2: -4---5---6-| (note that source 2 is subscribed earlier than source 1)
// Output : -(1,6)-(2,6)-(3,6)-|
Implementation details:
  • On subscribe
    • Allocates one shared_ptr to store last emissions.
    • Wraps the subscriber with serialization logic to ensure callbacks are invoked serially
  • OnNext
    • Keeps last emission from each observable
    • Emits a std::tuple of the last emissions once every observable has emitted at least once
  • OnError
    • Just forwards original on_error
  • OnCompleted
    • Just forwards original on_completed
See also
https://reactivex.io/documentation/operators/combinelatest.html

◆ merge()

template<constraint::decayed_type Type, typename SpecificObservable >
template<typename ... Args>
requires (is_header_included<merge_tag, Args...> && rpp::constraint::observable<Type>)
auto observable::merge ( ) const &
inline

Converts observable of observables of items into observable of items via merging emissions.

Warning
According to the observable contract (https://reactivex.io/documentation/contract.html), emissions from any observable must be serialized, so the resulting observable uses a mutex to satisfy this requirement.

It subscribes to each observable emitted by the original observable. The resulting observable completes only when ALL observables have completed.

Returns
new specific_observable with the merge operator as most recent operator.
Warning
#include <rpp/operators/merge.hpp>
Example:
rpp::source::just(rpp::source::just(1).as_dynamic(),
rpp::source::never<int>().as_dynamic(),
rpp::source::just(2).as_dynamic())
.merge()
.subscribe([](int v) { std::cout << v << " "; });
// Output: 1 2
Implementation details:
  • On subscribe
    • Allocates one shared_ptr to store internal state
    • Wraps the subscriber with serialization logic to ensure callbacks are invoked serially
  • OnNext for original observable
    • Subscribes on obtained observable
  • OnNext for inner observable
    • Just forwards original on_next
  • OnError
    • Just forwards original on_error
  • OnCompleted
    • Just forwards original on_completed when all observables emit on_completed
See also
https://reactivex.io/documentation/operators/merge.html
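
The completion rule for merge ("completes when ALL observables complete") amounts to counting active sources. The sketch below is a standalone model, not RPP code: MergeCompletionSketch and merged_stream_completes are hypothetical names, and the counter-based approach is an assumption about one reasonable way to implement the rule rather than a description of the library internals.

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical model of merge completion tracking. The outer observable
// counts as one active source from the start; every inner observable
// adds one more when it is subscribed.
class MergeCompletionSketch
{
public:
    explicit MergeCompletionSketch(std::function<void()> on_completed)
        : m_on_completed{std::move(on_completed)}
    {
    }

    // Called when the outer observable emits a new inner observable.
    void on_inner_subscribed()
    {
        std::lock_guard<std::mutex> lock{m_mutex};
        ++m_active;
    }

    // Called when the outer observable or any inner observable completes.
    void on_source_completed()
    {
        std::lock_guard<std::mutex> lock{m_mutex};
        if (--m_active == 0)
            m_on_completed(); // forwarded only after the last source is done
    }

private:
    std::mutex            m_mutex;
    std::size_t           m_active{1}; // the outer observable itself
    std::function<void()> m_on_completed;
};

// Simulates an outer observable that emits inner_total observables and then
// completes, while only inner_completed of them ever complete.
// Precondition: inner_completed <= inner_total.
inline bool merged_stream_completes(std::size_t inner_total, std::size_t inner_completed)
{
    bool completed = false;
    MergeCompletionSketch state{[&completed] { completed = true; }};
    for (std::size_t i = 0; i < inner_total; ++i)
        state.on_inner_subscribed();
    state.on_source_completed(); // the outer observable completes
    for (std::size_t i = 0; i < inner_completed; ++i)
        state.on_source_completed();
    return completed;
}
```

Under this model, merged_stream_completes(3, 2) is false, which corresponds to the example above: the rpp::source::never<int>() inner observable never completes, so on_completed is never forwarded even though both values are printed.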

◆ merge_with()

template<constraint::decayed_type Type, typename SpecificObservable >
template<constraint::observable_of_type< Type > ... TObservables>
requires (is_header_included<merge_tag, TObservables...>&& sizeof...(TObservables) >= 1)
auto observable::merge_with ( TObservables &&...  observables) const &
inline

Combines emissions from the current observable and other observables into a single observable.

Warning
According to the observable contract (https://reactivex.io/documentation/contract.html), emissions from any observable must be serialized, so the resulting observable uses a mutex to satisfy this requirement.

It subscribes to each observable. The resulting observable completes only when ALL observables have completed.

Parameters
observables: observables whose emissions are merged with those of the current observable.
Returns
new specific_observable with the merge operator as most recent operator.
Warning
#include <rpp/operators/merge.hpp>
Example:
rpp::source::just(1)
.merge_with(rpp::source::just(2))
.subscribe([](int v) { std::cout << v << " "; });
// Output: 1 2
Implementation details:
  • On subscribe
    • Allocates one shared_ptr to store internal state
    • Wraps the subscriber with serialization logic to ensure callbacks are invoked serially
  • OnNext
    • Just forwards original on_next
  • OnError
    • Just forwards original on_error
  • OnCompleted
    • Just forwards original on_completed when all observables emit on_completed
See also
https://reactivex.io/documentation/operators/merge.html

◆ start_with() [1/2]

template<constraint::decayed_type Type, typename SpecificObservable >
template<constraint::observable_of_type< Type > ... TObservables>
requires is_header_included<start_with_tag, TObservables...>
auto observable::start_with ( TObservables &&...  observables_to_start_with) const &
inline

Emits the emissions of the observables provided as arguments first and then the emissions from the current observable, without overlapping.

This is equivalent to concat(observables_to_start_with..., current_observable), so the observables passed as arguments are subscribed before the current observable.

Parameters
observables: list of observables which should be used before the current observable.
Returns
new specific_observable with the start_with operator as most recent operator.
Warning
#include <rpp/operators/start_with.hpp>
Example
rpp::source::just(1, 2, 3)
.start_with(rpp::source::just(5), rpp::source::just(6))
.subscribe([](int v) { std::cout << v << " "; });
// Output: 5 6 1 2 3
See also
https://reactivex.io/documentation/operators/startwith.html
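
The concat ordering described for start_with can be illustrated with plain containers. This is a minimal sketch under a strong simplifying assumption: each source is modelled as a finite, synchronous std::vector<int>, so "subscribing" simply means copying its values in order. The names SourceSketch, concat_sketch and start_with_sketch are hypothetical and not part of RPP.

```cpp
#include <initializer_list>
#include <vector>

// Hypothetical stand-in for a finite, synchronous observable of int.
using SourceSketch = std::vector<int>;

// Drains each source completely before moving on to the next one, so the
// emissions of different sources never overlap.
inline std::vector<int> concat_sketch(std::initializer_list<SourceSketch> sources)
{
    std::vector<int> out;
    for (const auto& source : sources)
        out.insert(out.end(), source.begin(), source.end());
    return out;
}

// start_with(prefixes...) modelled as concat(prefixes..., current).
inline std::vector<int> start_with_sketch(const SourceSketch&                 current,
                                          std::initializer_list<SourceSketch> prefixes)
{
    std::vector<int> out = concat_sketch(prefixes);
    out.insert(out.end(), current.begin(), current.end());
    return out;
}
```

With current = {1, 2, 3} and prefixes {5} and {6}, start_with_sketch returns 5, 6, 1, 2, 3, the same order as in the example above.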

◆ start_with() [2/2]

template<constraint::decayed_type Type, typename SpecificObservable >
template<rpp::memory_model memory_model = memory_model::use_stack, constraint::decayed_same_as< Type > ... TTypes>
requires is_header_included<start_with_tag, TTypes...>
auto observable::start_with ( TTypes &&...  vals_to_start_with) const &
inline

Emits the values provided as arguments first and then the emissions from the current observable, without overlapping.

This is equivalent to concat(rpp::source::just(vals_to_start_with)..., current_observable), so the values passed as arguments are emitted before the current observable is subscribed.

Template Parameters
memory_model: memory_model strategy used to store the provided values.
Parameters
vals: list of values which should be emitted before the current observable.
Returns
new specific_observable with the start_with operator as most recent operator.
Warning
#include <rpp/operators/start_with.hpp>
Example
rpp::source::just(1,2,3)
.start_with(5, 6)
.subscribe([](int v) { std::cout << v << " "; });
// Output: 5 6 1 2 3
See also
https://reactivex.io/documentation/operators/startwith.html

◆ switch_on_next()

template<constraint::decayed_type Type, typename SpecificObservable >
template<typename ... Args>
requires (is_header_included<switch_on_next_tag, Args...>&& rpp::constraint::observable<Type>)
auto observable::switch_on_next ( ) const &
inline

Converts an observable of observables into an observable of values that emits values from the most recent inner observable until a new inner observable is obtained.

This operator unsubscribes from the previous inner observable and subscribes to the new one each time an observable is obtained in on_next.

Returns
new specific_observable with the switch_on_next operator as most recent operator.
Warning
#include <rpp/operators/switch_on_next.hpp>
Example:
rpp::source::just(rpp::source::just(1).as_dynamic(),
rpp::source::never<int>().as_dynamic(),
rpp::source::just(2).as_dynamic())
.switch_on_next()
.subscribe([](int v) { std::cout << v << " "; });
// Output: 1 2
Implementation details:
  • On subscribe
    • Allocates one shared_ptr to store internal state
  • OnNext
    • Unsubscribes from the previous observable (if any)
    • Subscribes to the newly emitted observable
  • OnError
    • Just forwards original on_error
  • OnCompleted
    • Forwards on_completed only once neither the original observable nor an inner observable is still active
See also
https://reactivex.io/documentation/operators/switch.html
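
The switching and completion rules of switch_on_next can be modelled with a token per inner observable. The sketch below is standalone and does not use RPP. SwitchOnNextSketch, SwitchResult and run_switch_on_next_sketch are hypothetical names, and representing "unsubscribe" as ignoring emissions that carry a stale token is an assumption made purely for illustration.

```cpp
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical model of switch_on_next: only the most recent inner
// observable, identified by its token, may forward values.
class SwitchOnNextSketch
{
public:
    using Token = std::size_t;

    SwitchOnNextSketch(std::function<void(int)> on_next, std::function<void()> on_completed)
        : m_on_next{std::move(on_next)}
        , m_on_completed{std::move(on_completed)}
    {
    }

    // The outer observable emitted a new inner observable: the previous
    // token becomes stale, which models unsubscribing from it.
    Token on_next_outer()
    {
        m_inner_active = true;
        return ++m_current;
    }

    void on_next_inner(Token token, int value)
    {
        if (token == m_current)
            m_on_next(value);
    }

    void on_completed_inner(Token token)
    {
        if (token != m_current)
            return; // a stale inner observable no longer matters
        m_inner_active = false;
        complete_if_idle();
    }

    void on_completed_outer()
    {
        m_outer_active = false;
        complete_if_idle();
    }

private:
    // Completion is forwarded only when nothing is active any more.
    void complete_if_idle()
    {
        if (!m_outer_active && !m_inner_active)
            m_on_completed();
    }

    std::function<void(int)> m_on_next;
    std::function<void()>    m_on_completed;
    Token                    m_current{0};
    bool                     m_outer_active{true};
    bool                     m_inner_active{false};
};

struct SwitchResult
{
    std::vector<int> values;
    bool             completed_after_outer{false};
    bool             completed_after_inner{false};
};

inline SwitchResult run_switch_on_next_sketch()
{
    SwitchResult result;
    bool         completed = false;
    SwitchOnNextSketch state{[&result](int v) { result.values.push_back(v); },
                             [&completed] { completed = true; }};

    const auto first = state.on_next_outer();
    state.on_next_inner(first, 1);             // forwarded
    const auto second = state.on_next_outer(); // first becomes stale
    state.on_next_inner(first, 99);            // dropped
    state.on_next_inner(second, 2);            // forwarded

    state.on_completed_outer();                // inner is still active
    result.completed_after_outer = completed;
    state.on_completed_inner(second);          // now everything is idle
    result.completed_after_inner = completed;
    return result;
}
```

In this run only 1 and 2 are forwarded, and on_completed fires after the last inner observable completes, not when the outer observable completes.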

◆ with_latest_from() [1/2]

template<constraint::decayed_type Type, typename SpecificObservable >
template<constraint::observable ... TObservables>
requires is_header_included<with_latest_from_tag, TObservables...>
auto observable::with_latest_from ( TObservables &&...  observables) const &
inline

Combines each new emission from the current observable with the latest emissions from the other observables into a std::tuple.

Parameters
observables: observables whose latest emissions are combined whenever the current observable sends a new value.
Returns
new specific_observable with the with_latest_from operator as most recent operator.
Warning
#include <rpp/operators/with_latest_from.hpp>
Examples
rpp::source::just(1, 2, 3)
.with_latest_from(rpp::source::just(3, 4, 5, 6))
.subscribe([](std::tuple<int,int> v) { std::cout << std::get<0>(v) << ":" << std::get<1>(v) << " "; });
// Output: 1:6 2:6 3:6
See also
https://reactivex.io/documentation/operators/combinelatest.html

◆ with_latest_from() [2/2]

template<constraint::decayed_type Type, typename SpecificObservable >
template<constraint::observable ... TObservables, std::invocable< Type, utils::extract_observable_type_t< TObservables >... > TSelector>
requires is_header_included<with_latest_from_tag, TObservables...>
auto observable::with_latest_from ( TSelector &&  selector,
TObservables &&...  observables 
) const &
inline

Combines each new emission from the current observable with the latest emissions from the other observables by applying a selector.

This operator keeps the last value from each of the other observables and combines them with each new emission from the original observable.

Parameters
selector: applied to the current emission of the current observable and the latest emissions from the other observables.
observables: observables whose latest emissions are combined whenever the current observable sends a new value.
Returns
new specific_observable with the with_latest_from operator as most recent operator.
Warning
#include <rpp/operators/with_latest_from.hpp>
Examples
rpp::source::just(1, 2, 3)
.with_latest_from([](int left, int right) { return left + right; },
rpp::source::just(3, 4, 5, 6))
.subscribe([](int v) { std::cout << v << " "; });
// Output: 7 8 9
Implementation details:
  • On subscribe
    • Allocates one shared_ptr to keep last values from all observables
  • OnNext for original observable
    • Applies the selector to the new emission and the saved last values from the other observables (only if every other observable has already emitted a value)
  • OnNext for other observables
    • Just updates last value for this observable
  • OnError
    • Just forwards original on_error
  • OnCompleted for original observable
    • Just forwards original on_completed
  • OnCompleted for other observables
    • None
See also
https://reactivex.io/documentation/operators/combinelatest.html
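
Unlike combine_latest, with_latest_from emits only when the current observable emits; the other observables merely update stored values. The following standalone sketch models that asymmetry for a single other source of int. It does not use RPP, and the names WithLatestFromSketch and run_with_latest_from_sketch are hypothetical.

```cpp
#include <functional>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical model of with_latest_from for one other int source.
class WithLatestFromSketch
{
public:
    WithLatestFromSketch(std::function<int(int, int)> selector, std::function<void(int)> on_next)
        : m_selector{std::move(selector)}
        , m_on_next{std::move(on_next)}
    {
    }

    // Emission from the other observable: only the stored value changes.
    void on_next_other(int value) { m_last_other = value; }

    // Emission from the current observable: emits if a value is stored,
    // otherwise the emission is dropped.
    void on_next_current(int value)
    {
        if (m_last_other)
            m_on_next(m_selector(value, *m_last_other));
    }

private:
    std::optional<int>           m_last_other;
    std::function<int(int, int)> m_selector;
    std::function<void(int)>     m_on_next;
};

inline std::vector<int> run_with_latest_from_sketch()
{
    std::vector<int> out;
    WithLatestFromSketch state{[](int left, int right) { return left + right; },
                               [&out](int v) { out.push_back(v); }};
    state.on_next_current(0);   // dropped: the other source has no value yet
    for (int v : {3, 4, 5, 6})
        state.on_next_other(v); // never emits, only updates the last value
    for (int v : {1, 2, 3})
        state.on_next_current(v); // combined with 6
    return out;
}
```

The run yields 7, 8, 9, matching the example above, and shows that an emission from the current observable is silently skipped while the other source has not emitted anything.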