Filtering operators are operators that emit only part of items that satisfies some condition.
More...
auto rpp::operators::distinct ()
For each item from this observable, filter out repeated values and emit only items that have not already been emitted.
template<typename EqualityFn >
requires (!utils::is_not_template_callable<EqualityFn> || std::same_as<bool, std::invoke_result_t<EqualityFn, rpp::utils::convertible_to_any, rpp::utils::convertible_to_any>>)
auto rpp::operators::distinct_until_changed (EqualityFn &&equality_fn)
Suppress consecutive duplicates of emissions from original observable.
template<typename Fn >
requires (!utils::is_not_template_callable<Fn> || std::same_as<bool, std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
auto rpp::operators::filter (Fn &&predicate)
Emit only those items from an Observable that satisfies a provided predicate.
auto rpp::operators::first ()
Emit only the first item.
auto rpp::operators::last ()
Emit only the last item provided before on_completed.
auto rpp::operators::skip (size_t count)
Skip first count
items provided by observable then send rest items as expected.
auto rpp::operators::take (size_t count)
Emit only first count
items provided by observable, then send on_completed
auto rpp::operators::take_last (size_t count)
Emit only last count
items provided by observable, then send on_completed
template<rpp::schedulers::constraint::scheduler Scheduler = rpp::schedulers::immediate>
auto rpp::operators::throttle (rpp::schedulers::duration period)
Emit emission from an Observable and then ignore subsequent values during duration
of time.
Filtering operators are operators that emit only part of items that satisfies some condition.
See also https://reactivex.io/documentation/operators.html#filtering
◆ distinct()
auto rpp::operators::distinct
(
)
inline
For each item from this observable, filter out repeated values and emit only items that have not already been emitted.
Warning This operator keeps an std::unordered_set<T>
of past values, so std::hash<T> specialization is required.
See also https://reactivex.io/documentation/operators/distinct.html
◆ distinct_until_changed()
template<typename EqualityFn >
requires (!utils::is_not_template_callable<EqualityFn> || std::same_as<bool, std::invoke_result_t<EqualityFn, rpp::utils::convertible_to_any, rpp::utils::convertible_to_any>>)
auto rpp::operators::distinct_until_changed
(
EqualityFn && equality_fn )
◆ filter()
template<typename Fn >
requires (!utils::is_not_template_callable<Fn> || std::same_as<bool, std::invoke_result_t<Fn, rpp::utils::convertible_to_any>>)
auto rpp::operators::filter
(
Fn && predicate )
Emit only those items from an Observable that satisfies a provided predicate.
Actually this operator just checks if predicate returns true, then forwards emission
Performance notes:
No any heap allocations at all
No any copies/moves of emissions, just passing by const& to predicate and then forwarding
Parameters
predicate is predicate used to check emitted items. true -> items satisfies condition, false -> not
Warning #include <rpp/operators/filter.hpp >
Example:
See also https://reactivex.io/documentation/operators/filter.html
Examples debounce.cpp , filter.cpp , and throttle.cpp .
◆ first()
auto rpp::operators::first
(
)
inline
Emit only the first item.
Actually this operator is take(1)
with exception during on_completed
if no any emision happens. So, it just forwards first obtained emission and emits on_completed immediately
Exceptions
rpp::utils::not_enough_emissions in case of on_completed obtained without any emissions
Performance notes:
No any heap allocations
No any copies/moves just forwarding of emission
Warning #include <rpp/operators/first.hpp >
Example:
[](const auto & v) { std::cout << "-" << v; },
[](const std::exception_ptr&) {},
[]() { std::cout << "-|" << std::endl; });
rpp::source::empty<int>()
[](const auto & v) { std::cout << "-" << v; },
[](const std::exception_ptr&) { std::cout << "-x" << std::endl; },
[]() { std::cout << "-|" << std::endl; });
See also https://reactivex.io/documentation/operators/first.html
Examples first.cpp .
◆ last()
auto rpp::operators::last
(
)
inline
Emit only the last item provided before on_completed.
Actually this operator just updates std::optional
on every new emission and emits this value on_completed
Exceptions
rpp::utils::not_enough_emissions in case of on_completed obtained without any emissions
Performance notes:
No any heap allocations
No replace std::optional with each new emission and move value from optional on_completed
Warning #include <rpp/operators/last.hpp >
Example:
[](const auto & v) { std::cout << "-" << v; },
[](const std::exception_ptr&) {},
[]() { std::cout << "-|" << std::endl; });
rpp::source::empty<int>()
[](const auto & v) { std::cout << "-" << v; },
[](const std::exception_ptr&) { std::cout << "-x" ; },
[]() { std::cout << "-|" << std::endl; });
See also https://reactivex.io/documentation/operators/last.html
Examples last.cpp .
◆ skip()
auto rpp::operators::skip
(
size_t count )
inline
Skip first count
items provided by observable then send rest items as expected.
Actually this operator just decrements counter and starts to forward emissions when counter reaches zero.
Performance notes:
No any heap allocations
No any copies/moves just forwarding of emission
Just simple size_t
decrementing
Parameters
count amount of items to be skipped
Warning #include <rpp/operators/skip.hpp >
Example:
See also https://reactivex.io/documentation/operators/skip.html
Examples skip.cpp .
◆ take()
auto rpp::operators::take
(
size_t count )
inline
Emit only first count
items provided by observable, then send on_completed
Actually this operator just emits emissions while counter is not zero and decrements counter on each emission
Performance notes:
No any heap allocations
No any copies/moves just forwarding of emission
Just simple size_t
decrementing
Parameters
count amount of items to be emitted. 0 - instant complete
Warning #include <rpp/operators/take.hpp >
Example:
See also https://reactivex.io/documentation/operators/take.html
Examples interval.cpp , repeat.cpp , and take.cpp .
◆ take_last()
auto rpp::operators::take_last
(
size_t count )
inline
Emit only last count
items provided by observable, then send on_completed
Actually this operator has buffer of requested size inside, keeps last count
values and emit stored values on on_completed
Parameters
count amount of last items to be emitted
Warning #include <rpp/operators/take_last.hpp >
Example
| rpp::ops::take_last(2)
| rpp::ops::subscribe([](int v) { std::cout << v << " " ; });
See also https://reactivex.io/documentation/operators/takelast.html
◆ throttle()
auto rpp::operators::throttle
(
rpp::schedulers::duration period )
Emit emission from an Observable and then ignore subsequent values during duration
of time.
Actually this operator just keeps time of last emission, skips values until duration has passed, then forwards value, updates timepoint and etc...
Performance notes:
No any heap allocations at all
No any copies/moves of emissions, just passing by const& to predicate and then forwarding
Obtaining "now" every emission
Parameters
period is period of time to skip subsequent emissions
Template Parameters
Scheduler is type used to determine now()
. Shouldn't be used in production code
Warning #include <rpp/operators/throttle.hpp >
Example: auto start = rpp::schedulers::clock_type::now();
})
std::cout << "> Sent value " << v << " at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl;
return true ;
})
|
rpp::operators::subscribe ([&](
int v) { std::cout <<
">>> new value " << v <<
" at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl; },
[](const std::exception_ptr&) {},
[&]() { std::cout << ">>> completed at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl; });
See also https://reactivex.io/documentation/operators/debounce.html
Examples throttle.cpp .