ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
connect.cpp
#include <rpp/rpp.hpp>
#include "rpp/sources/fwd.hpp"
#include <iostream>
/**
 * Example: using connect() on a connectable observable.
 *
 * The pipeline is turned into a connectable observable via rpp::ops::publish(),
 * so the upstream (interval + map) is started by connect() rather than by
 * subscribe(). An observer subscribed afterwards only receives the values
 * emitted from that point on, and disposing the disposable returned by
 * connect() stops the emissions.
 */
int main() // NOLINT(bugprone-exception-escape)
{
    // Emits a sequential integer every 50ms from a separate thread; the map stage
    // only logs each value so that upstream activity is visible in the output.
    // publish() is required here: it converts the ordinary observable into a
    // connectable one, which is what provides connect() below.
    const auto observable = rpp::source::interval(std::chrono::milliseconds{50}, rpp::schedulers::new_thread{})
                          | rpp::ops::map([](int v) {
                                std::cout << "value in map" << v << std::endl;
                                return v;
                            })
                          | rpp::ops::publish();

    std::cout << "CONNECT" << std::endl;
    auto d = observable.connect(); // subscribe happens right now

    // Let the upstream run for a while with no observers attached.
    std::this_thread::sleep_for(std::chrono::milliseconds{150});

    std::cout << "SUBSCRIBE" << std::endl;
    // A late observer: it sees only the values emitted after this point.
    observable.subscribe([](int v) { std::cout << "observer value " << v << std::endl; });
    std::this_thread::sleep_for(std::chrono::milliseconds{150});

    // Disposing the connection stops the upstream.
    d.dispose();
    std::cout << "DISPOSE" << std::endl;

    // Wait a little longer to show that nothing is emitted after dispose.
    std::this_thread::sleep_for(std::chrono::milliseconds{150});

    // possible output:
    // CONNECT
    // value in map0
    // value in map1
    // value in map2
    // SUBSCRIBE
    // value in map3
    // observer value 3
    // value in map4
    // observer value 4
    // value in map5
    // observer value 5
    // DISPOSE
}
Scheduler which schedules invoking of schedulables to another thread via queueing tasks with priority...
Definition new_thread.hpp:31
auto publish()
Converts an ordinary observable to rpp::connectable_observable with the help of an inline instantiated publish...
Definition publish.hpp:22
auto interval(rpp::schedulers::duration initial, rpp::schedulers::duration period, TScheduler &&scheduler)
Creates rpp::observable that emits a sequential integer every specified time interval,...
Definition interval.hpp:72
auto map(Fn &&callable)
Transforms the items emitted by an Observable via applying a function to each item and emitting resul...
Definition map.hpp:94