ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Loading...
Searching...
No Matches
Filtering Operators

Filtering operators are operators that emit only part of items that satisfies some condition. More...

Functions

template<std::equivalence_relation< Type, Type > EqualityFn = std::equal_to<Type>>
requires is_header_included<distinct_until_changed_tag, EqualityFn>
auto observable::distinct_until_changed (EqualityFn &&equality_fn=EqualityFn{}) const &
 Suppress consecutive duplicates of emissions from original observable.
 
template<std::predicate< const Type & > Predicate>
requires is_header_included<filter_tag, Predicate>
auto observable::filter (Predicate &&predicate) const &
 Emit only those items from an Observable that satisfies a provided predicate.
 
auto observable::first () const &
 emit only the first item.
 
auto observable::last () const &
 Emit only the last item provided before on_completed.
 
template<schedulers::constraint::scheduler TScheduler, typename ... Args>
requires is_header_included<sample_tag, TScheduler, Args...>
auto observable::sample_with_time (schedulers::duration period, const TScheduler &scheduler) const &
 Emit most recent emitted from original observable emission obtained during last period of time.
 
template<typename... Args>
requires is_header_included<skip_tag, Args...>
auto observable::skip (size_t count) const &
 Skip first count items provided by observable then send rest items as expected.
 
template<typename... Args>
requires is_header_included<take_tag, Args...>
auto observable::take (size_t count) const &
 Emit only first count items provided by observable, then send on_completed
 
template<typename ... Args>
requires is_header_included<take_last_tag, Args...>
auto observable::take_last (size_t count) const &
 Emit only last count items provided by observable, then send on_completed
 

Detailed Description

Filtering operators are operators that emit only part of items that satisfies some condition.

See also
https://reactivex.io/documentation/operators.html#filtering

Function Documentation

◆ distinct_until_changed()

template<constraint::decayed_type Type, typename SpecificObservable >
template<std::equivalence_relation< Type, Type > EqualityFn = std::equal_to<Type>>
requires is_header_included<distinct_until_changed_tag, EqualityFn>
auto observable::distinct_until_changed ( EqualityFn &&  equality_fn = EqualityFn{}) const &
inline

Suppress consecutive duplicates of emissions from original observable.

Actually this operator has std::optional with last item and checks everytime where new emission is same or not.

Parameters
equality_fnoptional equality comparator function
Returns
new specific_observable with the distinct_until_changed operator as most recent operator.
Warning
#include <rpp/operators/distinct_until_changed.hpp>
Example
rpp::source::just(1, 1, 2, 2, 3, 2, 1)
.distinct_until_changed()
.subscribe([](int val) { std::cout << val << " "; });
// Output: 1 2 3 2 1
rpp::source::just(1, 1, 2, 2, 3, 2, 1)
.distinct_until_changed([](int left, int right) {return left != right; })
.subscribe([](int val) { std::cout << val << " "; });
// Output: 1 1 1
Implementation details:
  • On subscribe
    • Allocates one shared_ptr to store last emission
  • OnNext
    • Checks if value in state same as new emission
    • If new emission is not same, then updates state and emit this emission
  • OnError
    • Just forwards original on_error
  • OnCompleted
    • Just forwards original on_completed
See also
https://reactivex.io/documentation/operators/distinct.html

◆ filter()

template<constraint::decayed_type Type, typename SpecificObservable >
template<std::predicate< const Type & > Predicate>
requires is_header_included<filter_tag, Predicate>
auto observable::filter ( Predicate &&  predicate) const &
inline

Emit only those items from an Observable that satisfies a provided predicate.

Actually this operator just checks if predicate returns true, then forwards emission

Parameters
predicateis predicate used to check emitted items. true -> items satisfies condition, false -> not
Returns
new specific_observable with the Filter operator as most recent operator.
Warning
#include <rpp/operators/filter.hpp>
Example:
rpp::source::just(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
.filter([](int v) { return v % 2 == 0; })
.subscribe([](int v) { std::cout << v << " "; });
// Output: 0 2 4 6 8
Implementation details:
  • On subscribe
    • None
  • OnNext
    • Just forwards emission when predicate returns true
  • OnError
    • Just forwards original on_error
  • OnCompleted
    • Just forwards original on_completed
See also
https://reactivex.io/documentation/operators/filter.html

◆ first()

template<constraint::decayed_type Type, typename SpecificObservable >
auto observable::first ( ) const &
inline

emit only the first item.

Actually this operator is take(1) with exception during on_completed if no any emision happens. So, it just forwards first obtained emission and emits on_completed immediately

Exceptions
rpp::utils::not_enough_emissionsin case of on_completed obtained without any emissions
Returns
new specific_observable with the first operator as most recent operator.
Warning
#include <rpp/operators/first.hpp>
Example:
rpp::source::just(1, 2, 3, 4, 5)
.first()
.subscribe(
[](const auto &v) { std::cout << "-" << v; },
[](const std::exception_ptr&) {},
[]() { std::cout << "-|" << std::endl; });
// Source: -1-2-3-4-5--|
// Output: -1-|
rpp::source::empty<int>()
.first()
.subscribe(
[](const auto &v) { std::cout << "-" << v; },
[](const std::exception_ptr&) { std::cout << "-x" << std::endl; },
[]() { std::cout << "-|" << std::endl; });
// Source: -1-2-3-4-5--|
// Output: -x
Implementation details:
  • On subscribe
    • Allocates one shared_ptr to keep internal state
  • OnNext
    • Just forwards 1 emission and emit on_completed
  • OnError
    • Just forwards original on_error
  • OnCompleted
    • throws exception if no any emission before
See also
https://reactivex.io/documentation/operators/first.html

◆ last()

template<constraint::decayed_type Type, typename SpecificObservable >
auto observable::last ( ) const &
inline

Emit only the last item provided before on_completed.

Actually this operator just updates std::optional on every new emission and emits this value on_completed

Exceptions
rpp::utils::not_enough_emissionsin case of on_completed obtained without any emissions
Returns
new specific_observable with the last operator as most recent operator.
Warning
#include <rpp/operators/last.hpp>
Example:
rpp::source::just(1, 2, 3, 4, 5)
.last()
.subscribe(
[](const auto &v) { std::cout << "-" << v; },
[](const std::exception_ptr &) {},
[]() { std::cout << "-|" << std::endl; });
// Source: -1-2-3-4-5--|
// Output: -5-|
rpp::source::empty<int>()
.last()
.subscribe(
[](const auto &v) { std::cout << "-" << v; },
[](const std::exception_ptr &) { std::cout << "-x"; },
[]() { std::cout << "-|" << std::endl; });
// Source: -1-2-3-4-5--|
// Output: -x
Implementation details:
  • On subscribe
    • Allocates one shared_ptr to keep std::optional
  • OnNext
    • Just saves emission to std::optional
  • OnError
    • Just forwards original on_error
  • OnCompleted
    • Emits saved emission or throws exception if no any emissions
See also
https://reactivex.io/documentation/operators/last.html

◆ sample_with_time()

template<constraint::decayed_type Type, typename SpecificObservable >
template<schedulers::constraint::scheduler TScheduler, typename ... Args>
requires is_header_included<sample_tag, TScheduler, Args...>
auto observable::sample_with_time ( schedulers::duration  period,
const TScheduler &  scheduler 
) const &
inline

Emit most recent emitted from original observable emission obtained during last period of time.

Emit item immediately in case of completion of the original observable

Actually operator just schedules periodical action and on each schedulable execution just emits last emitted emission (if any)

Parameters
periodsampling period \scheduler scheduler to use to schedule emissions with provided sampling period
Returns
new specific_observable with the sample_with_time operator as most recent operator.
Warning
#include <rpp/operators/sample.hpp>
Example
rpp::source::interval(std::chrono::milliseconds{100}, rpp::schedulers::trampoline{})
.sample_with_time(std::chrono::milliseconds{300}, rpp::schedulers::trampoline{})
.take(5)
.subscribe([](int v) { std::cout << v << " "; });
// Output: 1 4 7 10 13
Schedules execution of schedulables via queueing tasks to the caller thread with priority to time_poi...
Definition: trampoline_scheduler.hpp:41
auto sample_with_time(schedulers::duration period, const TScheduler &scheduler) const &
Emit most recent emitted from original observable emission obtained during last period of time.
Definition: sample.hpp:62
Implementation details:
  • On subscribe
    • Allocates one shared_ptr to store last emitted value
    • Schedules periodical action to emit stored value (if any)
  • OnNext
    • Updates stored value
  • OnError
    • Just forwards original on_error
  • OnCompleted
    • Just forwards original on_completed
    • Emit last emitted value (if any)
See also
https://reactivex.io/documentation/operators/sample.htmlhttps://reactivex.io/documentation/operators/sample.html

◆ skip()

template<constraint::decayed_type Type, typename SpecificObservable >
template<typename... Args>
requires is_header_included<skip_tag, Args...>
auto observable::skip ( size_t  count) const &
inline

Skip first count items provided by observable then send rest items as expected.

Actually this operator just decrements counter and starts to forward emissions when counter reaches zero.

Parameters
countamount of items to be skipped
Returns
new specific_observable with the skip operator as most recent operator.
Warning
#include <rpp/operators/skip.hpp>
Example:
rpp::source::just(0,1,2,3,4,5)
.skip(2)
.subscribe([](int v) { std::cout << v << " "; });
// Output: 2 3 4 5
Implementation details:
  • On subscribe
    • Allocates one shared_ptr to store counter
  • OnNext
    • Forwards emission if counter is zero
    • Decrements counter if not zero
  • OnError
    • Just forwards original on_error
  • OnCompleted
    • Just forwards original on_completed
See also
https://reactivex.io/documentation/operators/skip.html

◆ take()

template<constraint::decayed_type Type, typename SpecificObservable >
template<typename... Args>
requires is_header_included<take_tag, Args...>
auto observable::take ( size_t  count) const &
inline

Emit only first count items provided by observable, then send on_completed

Actually this operator just emits emissions while counter is not zero and decrements counter on each emission

Parameters
countamount of items to be emitted. 0 - instant complete
Returns
new specific_observable with the Take operator as most recent operator.
Warning
#include <rpp/operators/take.hpp>
Example:
rpp::source::just(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
.take(2)
.subscribe([](int v) { std::cout << v << " "; });
// Output: 0 1
Implementation details:
  • On subscribe
    • Allocate one shared_ptr to store counter
  • OnNext
    • Just forwards emission if counter is not zero
    • Decrements counter if not zero
    • If counter reached zero, then emits OnCompleted
  • OnError
    • Just forwards original on_error
  • OnCompleted
    • Just forwards original on_completed
See also
https://reactivex.io/documentation/operators/take.html

◆ take_last()

template<constraint::decayed_type Type, typename SpecificObservable >
template<typename ... Args>
requires is_header_included<take_last_tag, Args...>
auto observable::take_last ( size_t  count) const &
inline

Emit only last count items provided by observable, then send on_completed

Actually this operator has buffer of requested size inside, keeps last count values and emit stored values on on_completed

Parameters
countamount of last items to be emitted
Returns
new specific_observable with the take_last operator as most recent operator.
Warning
#include <rpp/operators/take_last.hpp>
Example
rpp::source::just(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
.take_last(2)
.subscribe([](int v) { std::cout << v << " "; });
// Output: 8 9
Implementation details:
  • On subscribe
    • Allocates one shared_ptr to store internal buffer
  • OnNext
    • Place obtained value into queue
    • If queue contains more values than expected - remove oldest one
  • OnError
    • Just forwards original on_error
  • OnCompleted
    • Emits values stored in queue
See also
https://reactivex.io/documentation/operators/takelast.html