ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
Filtering Operators

Filtering operators are operators that emit only part of items that satisfies some condition. More...

Functions

auto rpp::operators::distinct ()
 For each item from this observable, filter out repeated values and emit only items that have not already been emitted.
 
template<typename EqualityFn>
requires (!utils::is_not_template_callable<EqualityFn> || std::same_as<bool, std::invoke_result_t<EqualityFn, rpp::utils::convertible_to_any, rpp::utils::convertible_to_any>>)
auto rpp::operators::distinct_until_changed (EqualityFn &&equality_fn)
 Suppress consecutive duplicates of emissions from original observable.
 
auto rpp::operators::element_at (size_t index)
 Emit item located at specified index location in the sequence of items emitted by the source observable.
 
template<typename Fn>
requires (!utils::is_not_template_callable<Fn> || std::same_as<bool, std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
auto rpp::operators::filter (Fn &&predicate)
 Emit only those items from an Observable that satisfies a provided predicate.
 
auto rpp::operators::first ()
 Emit only the first item.
 
auto rpp::operators::last ()
 Emit only the last item provided before on_completed.
 
auto rpp::operators::skip (size_t count)
 Skip first count items provided by observable then send rest items as expected.
 
auto rpp::operators::take (size_t count)
 Emit only first count items provided by observable, then send on_completed
 
auto rpp::operators::take_last (size_t count)
 Emit only last count items provided by observable, then send on_completed
 
template<rpp::schedulers::constraint::scheduler Scheduler = rpp::schedulers::immediate>
auto rpp::operators::throttle (rpp::schedulers::duration period)
 Emit emission from an Observable and then ignore subsequent values during duration of time.
 

Detailed Description

Filtering operators are operators that emit only part of items that satisfies some condition.

See also
https://reactivex.io/documentation/operators.html#filtering

Function Documentation

◆ distinct()

auto rpp::operators::distinct ( )
inline

For each item from this observable, filter out repeated values and emit only items that have not already been emitted.

Warning
This operator keeps an std::unordered_set<T> of past values, so std::hash<T> specialization is required.
See also
https://reactivex.io/documentation/operators/distinct.html

◆ distinct_until_changed()

template<typename EqualityFn>
requires (!utils::is_not_template_callable<EqualityFn> || std::same_as<bool, std::invoke_result_t<EqualityFn, rpp::utils::convertible_to_any, rpp::utils::convertible_to_any>>)
auto rpp::operators::distinct_until_changed ( EqualityFn && equality_fn)

Suppress consecutive duplicates of emissions from original observable.

Actually this operator has std::optional with last item and checks everytime where new emission is same or not.

Performance notes:
  • No any heap allocations at all
  • std::optional to keep last value
  • passing last and emitted value to predicate
Parameters
equality_fnoptional equality comparator function
Note
#include <rpp/operators/distinct_until_changed.hpp>
Example
rpp::source::just(1, 1, 2, 2, 3, 2, 1)
| rpp::operators::subscribe([](int val) { std::cout << val << " "; });
// Output: 1 2 3 2 1
rpp::source::just(1, 1, 2, 2, 3, 2, 1)
| rpp::operators::distinct_until_changed([](int left, int right) { return left != right; })
| rpp::operators::subscribe([](int val) { std::cout << val << " "; });
// Output: 1 1 1
See also
https://reactivex.io/documentation/operators/distinct.html
Examples
distinct_until_changed.cpp.

◆ element_at()

auto rpp::operators::element_at ( size_t index)
inline

Emit item located at specified index location in the sequence of items emitted by the source observable.

If source observable completes without emitting at least index + 1 items, observable emits an error

Parameters
indexindex of the item to return
Note
#include <rpp/operators/element_at.hpp>
See also
https://reactivex.io/documentation/operators/elementat.html

◆ filter()

template<typename Fn>
requires (!utils::is_not_template_callable<Fn> || std::same_as<bool, std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
auto rpp::operators::filter ( Fn && predicate)

Emit only those items from an Observable that satisfies a provided predicate.

Actually this operator just checks if predicate returns true, then forwards emission

Performance notes:
  • No any heap allocations at all
  • No any copies/moves of emissions, just passing by const& to predicate and then forwarding
Parameters
predicateis predicate used to check emitted items. true -> items satisfies condition, false -> not
Note
#include <rpp/operators/filter.hpp>
Example:
rpp::source::just(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
| rpp::operators::filter([](int v) { return v % 2 == 0; })
| rpp::operators::subscribe([](int v) { std::cout << v << " "; });
// Output: 0 2 4 6 8
See also
https://reactivex.io/documentation/operators/filter.html
Examples
filter.cpp, and window_toggle.cpp.

◆ first()

auto rpp::operators::first ( )
inline

Emit only the first item.

Actually this operator is take(1) with exception during on_completed if no any emision happens. So, it just forwards first obtained emission and emits on_completed immediately

Exceptions
rpp::utils::not_enough_emissionsin case of on_completed obtained without any emissions
Performance notes:
  • No any heap allocations
  • No any copies/moves just forwarding of emission
Note
#include <rpp/operators/first.hpp>
Example:
rpp::source::just(1, 2, 3, 4, 5)
[](const auto& v) { std::cout << "-" << v; },
[](const std::exception_ptr&) {},
[]() { std::cout << "-|" << std::endl; });
// Source: -1-2-3-4-5--|
// Output: -1-|
[](const auto& v) { std::cout << "-" << v; },
[](const std::exception_ptr&) { std::cout << "-x" << std::endl; },
[]() { std::cout << "-|" << std::endl; });
// Source: -1-2-3-4-5--|
// Output: -x
See also
https://reactivex.io/documentation/operators/first.html
Examples
first.cpp.

◆ last()

auto rpp::operators::last ( )
inline

Emit only the last item provided before on_completed.

Actually this operator just updates std::optional on every new emission and emits this value on_completed

Exceptions
rpp::utils::not_enough_emissionsin case of on_completed obtained without any emissions
Performance notes:
  • No any heap allocations
  • No replace std::optional with each new emission and move value from optional on_completed
Note
#include <rpp/operators/last.hpp>
Example:
rpp::source::just(1, 2, 3, 4, 5)
[](const auto& v) { std::cout << "-" << v; },
[](const std::exception_ptr&) {},
[]() { std::cout << "-|" << std::endl; });
// Source: -1-2-3-4-5--|
// Output: -5-|
[](const auto& v) { std::cout << "-" << v; },
[](const std::exception_ptr&) { std::cout << "-x"; },
[]() { std::cout << "-|" << std::endl; });
// Source: -1-2-3-4-5--|
// Output: -x
See also
https://reactivex.io/documentation/operators/last.html
Examples
last.cpp.

◆ skip()

auto rpp::operators::skip ( size_t count)
inline

Skip first count items provided by observable then send rest items as expected.

Actually this operator just decrements counter and starts to forward emissions when counter reaches zero.

Performance notes:
  • No any heap allocations
  • No any copies/moves just forwarding of emission
  • Just simple size_t decrementing
Parameters
countamount of items to be skipped
Note
#include <rpp/operators/skip.hpp>
Example:
rpp::source::just(0, 1, 2, 3, 4, 5)
| rpp::operators::subscribe([](int v) { std::cout << v << " "; });
// Output: 2 3 4 5
See also
https://reactivex.io/documentation/operators/skip.html
Examples
skip.cpp.

◆ take()

auto rpp::operators::take ( size_t count)
inline

Emit only first count items provided by observable, then send on_completed

Actually this operator just emits emissions while counter is not zero and decrements counter on each emission

Performance notes:
  • No any heap allocations
  • No any copies/moves just forwarding of emission
  • Just simple size_t decrementing
Parameters
countamount of items to be emitted. 0 - instant complete
Note
#include <rpp/operators/take.hpp>
Example:
rpp::source::from_iterable(std::vector{0, 1, 2, 3, 4})
| rpp::operators::subscribe([](int v) { std::cout << v << " "; });
// Output: 0 1
See also
https://reactivex.io/documentation/operators/take.html
Examples
repeat.cpp, and retry.cpp.

◆ take_last()

auto rpp::operators::take_last ( size_t count)
inline

Emit only last count items provided by observable, then send on_completed

Actually this operator has buffer of requested size inside, keeps last count values and emit stored values on on_completed

Parameters
countamount of last items to be emitted
Note
#include <rpp/operators/take_last.hpp>
Example
rpp::source::just(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
| rpp::ops::subscribe([](int v) { std::cout << v << " "; });
// Output: 8 9
See also
https://reactivex.io/documentation/operators/takelast.html
Examples
take_last.cpp.

◆ throttle()

template<rpp::schedulers::constraint::scheduler Scheduler = rpp::schedulers::immediate>
auto rpp::operators::throttle ( rpp::schedulers::duration period)

Emit emission from an Observable and then ignore subsequent values during duration of time.

Actually this operator just keeps time of last emission, skips values ​​until duration has passed, then forwards value, updates timepoint and etc...

Performance notes:
  • No any heap allocations at all
  • No any copies/moves of emissions, just passing by const& to predicate and then forwarding
  • Obtaining "now" every emission
Parameters
periodis period of time to skip subsequent emissions
Template Parameters
Scheduleris type used to determine now(). Shouldn't be used in production code
Note
#include <rpp/operators/throttle.hpp>
Example:
auto start = rpp::schedulers::clock_type::now();
return rpp::source::just(v) | rpp::operators::delay(std::chrono::milliseconds(500) * v, rpp::schedulers::current_thread{});
})
| rpp::operators::filter([&](int v) {
std::cout << "> Sent value " << v << " at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl;
return true;
})
| rpp::operators::throttle(std::chrono::milliseconds{700})
| rpp::operators::subscribe([&](int v) { std::cout << ">>> new value " << v << " at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl; },
[](const std::exception_ptr&) {},
[&]() { std::cout << ">>> completed at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl; });
// Output:
// > Sent value 1 at 500
// >>> new value 1 at 500
// > Sent value 2 at 1000
// > Sent value 5 at 2500
// >>> new value 5 at 2500
// > Sent value 6 at 3000
// > Sent value 9 at 4500
// >>> new value 9 at 4500
// > Sent value 10 at 5000
// >>> completed at 5000
See also
https://reactivex.io/documentation/operators/debounce.html