ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Filtering Operators

Filtering operators emit only those items that satisfy some condition.

Functions

auto rpp::operators::distinct ()
 For each item from this observable, filter out repeated values and emit only items that have not already been emitted.
 
template<typename EqualityFn >
requires (!utils::is_not_template_callable<EqualityFn> || std::same_as<bool, std::invoke_result_t<EqualityFn, rpp::utils::convertible_to_any, rpp::utils::convertible_to_any>>)
auto rpp::operators::distinct_until_changed (EqualityFn &&equality_fn)
 Suppress consecutive duplicates of emissions from the original observable.
 
template<typename Fn >
requires (!utils::is_not_template_callable<Fn> || std::same_as<bool, std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
auto rpp::operators::filter (Fn &&predicate)
 Emit only those items from an Observable that satisfy the provided predicate.
 
auto rpp::operators::first ()
 Emit only the first item.
 
auto rpp::operators::last ()
 Emit only the last item provided before on_completed.
 
auto rpp::operators::skip (size_t count)
 Skip the first count items provided by the observable, then forward the remaining items as usual.
 
auto rpp::operators::take (size_t count)
 Emit only the first count items provided by the observable, then send on_completed.
 
auto rpp::operators::take_last (size_t count)
 Emit only the last count items provided by the observable, then send on_completed.
 
template<rpp::schedulers::constraint::scheduler Scheduler = rpp::schedulers::immediate>
auto rpp::operators::throttle (rpp::schedulers::duration period)
 Emit an item from the Observable, then ignore subsequent values for the given period of time.
 

Detailed Description

Filtering operators emit only those items that satisfy some condition.

See also
https://reactivex.io/documentation/operators.html#filtering

Function Documentation

◆ distinct()

auto rpp::operators::distinct ( )
inline

For each item from this observable, filter out repeated values and emit only items that have not already been emitted.

Warning
This operator keeps a std::unordered_set<T> of past values, so a std::hash<T> specialization is required.
See also
https://reactivex.io/documentation/operators/distinct.html
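
The warning above can be illustrated with a minimal standalone sketch. It does not use rpp; `distinct_sketch` is a hypothetical helper that applies the same idea (a std::unordered_set of already seen values) to a plain std::vector, which is why T needs both std::hash<T> and operator==.

```cpp
#include <unordered_set>
#include <vector>

// Hypothetical helper, not part of rpp: returns the values of `input`
// in their original order, dropping every value that was seen before.
template<typename T>
std::vector<T> distinct_sketch(const std::vector<T>& input)
{
    std::unordered_set<T> seen; // requires std::hash<T> and operator==
    std::vector<T> output;
    for (const T& value : input)
    {
        // insert() returns {iterator, bool}; the bool is true only for new values
        if (seen.insert(value).second)
            output.push_back(value);
    }
    return output;
}
```

In this sketch the set grows with the number of distinct values, so memory use is unbounded for a long stream with many unique items.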

◆ distinct_until_changed()

template<typename EqualityFn >
requires (!utils::is_not_template_callable<EqualityFn> || std::same_as<bool, std::invoke_result_t<EqualityFn, rpp::utils::convertible_to_any, rpp::utils::convertible_to_any>>)
auto rpp::operators::distinct_until_changed ( EqualityFn && equality_fn)

Suppress consecutive duplicates of emissions from the original observable.

This operator stores the last emitted item in a std::optional and compares each new emission against it; the new item is forwarded only when the comparator reports that the two are not equal.

Performance notes:
  • No heap allocations
  • The last value is kept in a std::optional
  • The last value and the newly emitted value are passed to the predicate
Parameters
equality_fn - optional equality comparator function
Warning
#include <rpp/operators/distinct_until_changed.hpp>
Example
rpp::source::just(1, 1, 2, 2, 3, 2, 1)
    | rpp::operators::distinct_until_changed()
    | rpp::operators::subscribe([](int val) { std::cout << val << " "; });
// Output: 1 2 3 2 1

// With a custom comparator: items for which it returns true count as duplicates
rpp::source::just(1, 1, 2, 2, 3, 2, 1)
    | rpp::operators::distinct_until_changed([](int left, int right) { return left != right; })
    | rpp::operators::subscribe([](int val) { std::cout << val << " "; });
// Output: 1 1 1
See also
https://reactivex.io/documentation/operators/distinct.html
Examples
distinct_until_changed.cpp.
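
The std::optional mechanism described for this operator can be sketched without rpp. `distinct_until_changed_sketch` is a hypothetical helper over a std::vector; it assumes, as the performance notes state, that the comparator receives the last stored value first and the new value second.

```cpp
#include <functional>
#include <optional>
#include <vector>

// Hypothetical helper, not part of rpp: keeps the last forwarded value in a
// std::optional and drops a new value when equality_fn(last, value) is true.
template<typename T, typename EqualityFn = std::equal_to<T>>
std::vector<T> distinct_until_changed_sketch(const std::vector<T>& input,
                                             EqualityFn equality_fn = {})
{
    std::optional<T> last; // empty until the first value arrives
    std::vector<T> output;
    for (const T& value : input)
    {
        if (last.has_value() && equality_fn(*last, value))
            continue; // treated as a consecutive duplicate
        last = value;
        output.push_back(value);
    }
    return output;
}
```

With the default comparator the input 1, 1, 2, 2, 3, 2, 1 yields 1, 2, 3, 2, 1; with the inverted comparator `left != right` it yields 1, 1, 1, because every value that differs from the stored one is treated as a duplicate.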

◆ filter()

template<typename Fn >
requires (!utils::is_not_template_callable<Fn> || std::same_as<bool, std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
auto rpp::operators::filter ( Fn && predicate)

Emit only those items from an Observable that satisfy the provided predicate.

This operator checks whether the predicate returns true for an item and, if so, forwards the emission.

Performance notes:
  • No heap allocations
  • No copies/moves of emissions: each item is passed by const& to the predicate and then forwarded
Parameters
predicate - predicate used to check emitted items: true means the item satisfies the condition, false means it does not
Warning
#include <rpp/operators/filter.hpp>
Example:
rpp::source::just(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
| rpp::operators::filter([](int v) { return v % 2 == 0; })
| rpp::operators::subscribe([](int v) { std::cout << v << " "; });
// Output: 0 2 4 6 8
See also
https://reactivex.io/documentation/operators/filter.html
Examples
debounce.cpp, filter.cpp, and throttle.cpp.
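
As a rough illustration of the behaviour described for filter, the following standalone sketch applies a predicate to each value by const& and keeps only the values for which it returns true. `filter_sketch` is a hypothetical helper over a std::vector, not the rpp implementation.

```cpp
#include <vector>

// Hypothetical helper, not part of rpp: forwards only the values for which
// predicate(value) returns true; values are handed to the predicate by const&.
template<typename T, typename Fn>
std::vector<T> filter_sketch(const std::vector<T>& input, Fn predicate)
{
    std::vector<T> output;
    for (const T& value : input)
    {
        if (predicate(value))
            output.push_back(value);
    }
    return output;
}
```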

◆ first()

auto rpp::operators::first ( )
inline

Emit only the first item.

This operator behaves like take(1), except that it reports an error if on_completed arrives before any emission. It forwards the first emission it receives and then emits on_completed immediately.

Exceptions
rpp::utils::not_enough_emissions - if on_completed is received without any emissions
Performance notes:
  • No heap allocations
  • No copies/moves, the emission is just forwarded
Warning
#include <rpp/operators/first.hpp>
Example:
rpp::source::just(1, 2, 3, 4, 5)
    | rpp::operators::first()
    | rpp::operators::subscribe(
        [](const auto& v) { std::cout << "-" << v; },
        [](const std::exception_ptr&) {},
        []() { std::cout << "-|" << std::endl; });
// Source: -1-2-3-4-5--|
// Output: -1-|

rpp::source::empty<int>()
    | rpp::operators::first()
    | rpp::operators::subscribe(
        [](const auto& v) { std::cout << "-" << v; },
        [](const std::exception_ptr&) { std::cout << "-x" << std::endl; },
        []() { std::cout << "-|" << std::endl; });
// Source: -|
// Output: -x
See also
https://reactivex.io/documentation/operators/first.html
Examples
first.cpp.
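
The "take(1) plus an error on empty input" behaviour can be sketched in plain C++. Both `first_sketch` and `not_enough_emissions_sketch` are hypothetical stand-ins introduced for illustration; the real operator reports rpp::utils::not_enough_emissions through the error callback rather than by throwing to the caller.

```cpp
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for rpp::utils::not_enough_emissions.
struct not_enough_emissions_sketch : std::runtime_error
{
    using std::runtime_error::runtime_error;
};

// Hypothetical helper, not part of rpp: returns the first value of `input`,
// or throws when the sequence completes without any value.
template<typename T>
T first_sketch(const std::vector<T>& input)
{
    if (input.empty())
        throw not_enough_emissions_sketch("completed without any emissions");
    return input.front(); // everything after the first value is ignored
}
```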

◆ last()

auto rpp::operators::last ( )
inline

Emit only the last item provided before on_completed.

This operator stores each new emission in a std::optional and emits the stored value when on_completed arrives.

Exceptions
rpp::utils::not_enough_emissions - if on_completed is received without any emissions
Performance notes:
  • No heap allocations
  • The std::optional is replaced with each new emission, and the value is moved out of the optional on on_completed
Warning
#include <rpp/operators/last.hpp>
Example:
rpp::source::just(1, 2, 3, 4, 5)
    | rpp::operators::last()
    | rpp::operators::subscribe(
        [](const auto& v) { std::cout << "-" << v; },
        [](const std::exception_ptr&) {},
        []() { std::cout << "-|" << std::endl; });
// Source: -1-2-3-4-5--|
// Output: -5-|

rpp::source::empty<int>()
    | rpp::operators::last()
    | rpp::operators::subscribe(
        [](const auto& v) { std::cout << "-" << v; },
        [](const std::exception_ptr&) { std::cout << "-x"; },
        []() { std::cout << "-|" << std::endl; });
// Source: -|
// Output: -x
See also
https://reactivex.io/documentation/operators/last.html
Examples
last.cpp.
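
The std::optional bookkeeping described for last can be sketched as follows. `last_sketch` and `not_enough_emissions_sketch` are hypothetical names used only for this illustration; the sketch throws directly, whereas the operator delivers the error to the subscriber.

```cpp
#include <optional>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical stand-in for rpp::utils::not_enough_emissions.
struct not_enough_emissions_sketch : std::runtime_error
{
    using std::runtime_error::runtime_error;
};

// Hypothetical helper, not part of rpp: overwrites a std::optional on every
// value and moves the stored value out once the input is exhausted.
template<typename T>
T last_sketch(const std::vector<T>& input)
{
    std::optional<T> last;
    for (const T& value : input)
        last = value; // replace the stored value on every emission
    if (!last.has_value())
        throw not_enough_emissions_sketch("completed without any emissions");
    return std::move(*last); // move the value out on completion
}
```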

◆ skip()

auto rpp::operators::skip ( size_t count)
inline

Skip the first count items provided by the observable, then forward the remaining items as usual.

This operator decrements a counter on each emission and starts forwarding emissions once the counter reaches zero.

Performance notes:
  • No heap allocations
  • No copies/moves, the emission is just forwarded
  • Only a simple size_t decrement
Parameters
count - number of items to skip
Warning
#include <rpp/operators/skip.hpp>
Example:
rpp::source::just(0, 1, 2, 3, 4, 5)
    | rpp::operators::skip(2)
    | rpp::operators::subscribe([](int v) { std::cout << v << " "; });
// Output: 2 3 4 5
See also
https://reactivex.io/documentation/operators/skip.html
Examples
skip.cpp.
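
The counter logic described for skip can be sketched in a few lines of standalone C++. `skip_sketch` is a hypothetical helper over a std::vector and only illustrates the decrement-then-forward idea.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical helper, not part of rpp: drops values while the counter is
// non-zero, decrementing it each time, then forwards everything else.
template<typename T>
std::vector<T> skip_sketch(const std::vector<T>& input, std::size_t count)
{
    std::vector<T> output;
    for (const T& value : input)
    {
        if (count > 0)
        {
            --count; // still skipping
            continue;
        }
        output.push_back(value);
    }
    return output;
}
```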

◆ take()

auto rpp::operators::take ( size_t count)
inline

Emit only the first count items provided by the observable, then send on_completed.

This operator forwards emissions while the counter is not zero and decrements the counter on each emission.

Performance notes:
  • No heap allocations
  • No copies/moves, the emission is just forwarded
  • Only a simple size_t decrement
Parameters
count - number of items to emit; 0 means complete immediately
Warning
#include <rpp/operators/take.hpp>
Example:
rpp::source::from_iterable(std::vector{0, 1, 2, 3, 4})
    | rpp::operators::take(2)
    | rpp::operators::subscribe([](int v) { std::cout << v << " "; });
// Output: 0 1
See also
https://reactivex.io/documentation/operators/take.html
Examples
interval.cpp, repeat.cpp, and take.cpp.
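
A minimal sketch of the counter described for take, written against a std::vector instead of an observable. `take_sketch` is a hypothetical helper; leaving the loop stands in for sending on_completed.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical helper, not part of rpp: forwards values while the counter is
// non-zero and stops as soon as it reaches zero.
template<typename T>
std::vector<T> take_sketch(const std::vector<T>& input, std::size_t count)
{
    std::vector<T> output;
    for (const T& value : input)
    {
        if (count == 0)
            break; // corresponds to completing the sequence
        output.push_back(value);
        --count;
    }
    return output;
}
```

With count equal to 0 the loop exits before forwarding anything, which matches the "instant complete" case in the parameter description.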

◆ take_last()

auto rpp::operators::take_last ( size_t count)
inline

Emit only the last count items provided by the observable, then send on_completed.

This operator holds a buffer of the requested size, keeps the last count values in it, and emits the stored values on on_completed.

Parameters
count - number of last items to emit
Warning
#include <rpp/operators/take_last.hpp>
Example
rpp::source::just(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
| rpp::ops::take_last(2)
| rpp::ops::subscribe([](int v) { std::cout << v << " "; });
// Output: 8 9
See also
https://reactivex.io/documentation/operators/takelast.html
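
One way to keep "the last count values" in a buffer of fixed size is a ring buffer. The sketch below is an assumption about how such a buffer could work, not the rpp implementation; `take_last_sketch` is a hypothetical helper over a std::vector.

```cpp
#include <cstddef>
#include <optional>
#include <vector>

// Hypothetical helper, not part of rpp: stores values in a ring buffer of
// `count` slots and returns the stored values, oldest first, at the end.
template<typename T>
std::vector<T> take_last_sketch(const std::vector<T>& input, std::size_t count)
{
    std::vector<T> output;
    if (count == 0)
        return output;

    std::vector<std::optional<T>> buffer(count);
    std::size_t next   = 0; // slot that the next value overwrites
    std::size_t stored = 0; // number of filled slots, at most `count`
    for (const T& value : input)
    {
        buffer[next] = value;
        next         = (next + 1) % count;
        if (stored < count)
            ++stored;
    }

    // When the buffer is full, the oldest value sits at `next`; otherwise at 0.
    const std::size_t start = (stored == count) ? next : 0;
    for (std::size_t i = 0; i < stored; ++i)
        output.push_back(*buffer[(start + i) % count]);
    return output;
}
```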

◆ throttle()

template<rpp::schedulers::constraint::scheduler Scheduler = rpp::schedulers::immediate>
auto rpp::operators::throttle ( rpp::schedulers::duration period)

Emit an item from the Observable, then ignore subsequent values for the given period of time.

This operator records the time of the last forwarded emission, skips values until the period has elapsed, then forwards the next value, updates the time point, and so on.

Performance notes:
  • No heap allocations
  • No copies/moves of emissions, just forwarding
  • Obtains "now" on every emission
Parameters
period - period of time during which subsequent emissions are skipped
Template Parameters
Scheduler - type used to determine now(). Shouldn't be used in production code
Warning
#include <rpp/operators/throttle.hpp>
Example:
auto start = rpp::schedulers::clock_type::now();

// The first two lines of this pipeline are assumed; the source values are taken from the output below
rpp::source::just(1, 2, 5, 6, 9, 10)
    | rpp::operators::flat_map([](int v) {
          return rpp::source::just(v) | rpp::operators::delay(std::chrono::milliseconds(500) * v, rpp::schedulers::current_thread{});
      })
    | rpp::operators::filter([&](int v) {
          std::cout << "> Sent value " << v << " at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl;
          return true;
      })
    | rpp::operators::throttle(std::chrono::milliseconds{700})
    | rpp::operators::subscribe(
        [&](int v) { std::cout << ">>> new value " << v << " at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl; },
        [](const std::exception_ptr&) {},
        [&]() { std::cout << ">>> completed at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl; });
// Output:
// > Sent value 1 at 500
// >>> new value 1 at 500
// > Sent value 2 at 1000
// > Sent value 5 at 2500
// >>> new value 5 at 2500
// > Sent value 6 at 3000
// > Sent value 9 at 4500
// >>> new value 9 at 4500
// > Sent value 10 at 5000
// >>> completed at 5000
See also
https://reactivex.io/documentation/operators/debounce.html
Examples
throttle.cpp.
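
The time-point bookkeeping described for throttle can be sketched deterministically by passing explicit timestamps instead of reading a clock. `throttle_sketch` and the `millis` alias are hypothetical names for this illustration. The sketch also assumes that a value arriving exactly when the period ends is forwarded, which the description above does not specify.

```cpp
#include <chrono>
#include <optional>
#include <utility>
#include <vector>

using millis = std::chrono::milliseconds;

// Hypothetical helper, not part of rpp: each input pair is (arrival time, value).
// A value is forwarded only if at least `period` has passed since the last
// forwarded value; the stored time point is then updated.
template<typename T>
std::vector<T> throttle_sketch(const std::vector<std::pair<millis, T>>& timed_input,
                               millis period)
{
    std::optional<millis> last_emission; // empty until the first value is forwarded
    std::vector<T> output;
    for (const auto& [now, value] : timed_input)
    {
        if (last_emission.has_value() && now < *last_emission + period)
            continue; // still inside the throttling window
        last_emission = now;
        output.push_back(value);
    }
    return output;
}
```

Fed with the arrival times from the example output above (500, 1000, 2500, 3000, 4500, 5000 ms) and a period of 700 ms, the sketch forwards the values 1, 5 and 9.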