ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
throttle.cpp
#include <rpp/rpp.hpp>
#include <iostream>
int main()
{
auto start = rpp::schedulers::clock_type::now();
return rpp::source::just(v) | rpp::operators::delay(std::chrono::milliseconds(500) * v, rpp::schedulers::current_thread{});
})
| rpp::operators::filter([&](int v) {
std::cout << "> Sent value " << v << " at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl;
return true;
})
| rpp::operators::throttle(std::chrono::milliseconds{700})
| rpp::operators::subscribe([&](int v) { std::cout << ">>> new value " << v << " at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl; },
[](const std::exception_ptr&) {},
[&]() { std::cout << ">>> completed at " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << std::endl; });
// Output:
// > Sent value 1 at 500
// >>> new value 1 at 500
// > Sent value 2 at 1000
// > Sent value 5 at 2500
// >>> new value 5 at 2500
// > Sent value 6 at 3000
// > Sent value 9 at 4500
// >>> new value 9 at 4500
// > Sent value 10 at 5000
// >>> completed at 5000
return 0;
}
Schedules execution of schedulables via queueing tasks to the caller thread with priority to time_poi...
Definition current_thread.hpp:86
auto just(const TScheduler &scheduler, T &&item, Ts &&... items)
Creates rpp::observable that emits a particular items and completes.
Definition from.hpp:201
auto filter(Fn &&predicate)
Emit only those items from an Observable that satisfies a provided predicate.
Definition filter.hpp:91
auto throttle(rpp::schedulers::duration period)
Emit emission from an Observable and then ignore subsequent values during duration of time.
Definition throttle.hpp:98
auto flat_map(Fn &&callable)
Transform the items emitted by an Observable into Observables, then flatten the emissions from those ...
Definition flat_map.hpp:64
auto subscribe(observer< Type, ObserverStrategy > &&observer)
Subscribes passed observer to emissions from this observable.
Definition subscribe.hpp:226
auto delay(rpp::schedulers::duration delay_duration, Scheduler &&scheduler)
Shift the emissions from an Observable forward in time by a particular amount.
Definition delay.hpp:219