ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
rpp::connectable_observable< OriginalObservable, Subject > Class Template Reference

An extension of a raw observable that can be connected manually at any time or ref-counted (sharing the same observable between multiple observers). More...

#include <connectable_observable.hpp>

Public Member Functions

 connectable_observable (const OriginalObservable &original_observable, const Subject &subject=Subject{})
 
 connectable_observable (OriginalObservable &&original_observable, const Subject &subject=Subject{})
 
rpp::disposable_wrapper connect (rpp::composite_disposable_wrapper wrapper=composite_disposable_wrapper::make()) const
 Connects to the underlying observable right now, making it a hot observable.
 
auto ref_count () const
 Forces rpp::connectable_observable to behave like a regular observable.
 
template<typename Op >
auto operator| (Op &&op) const &
 
template<typename Op >
auto operator| (Op &&op) &&
 
template<typename Op >
auto pipe (Op &&op) const &
 
template<typename Op >
auto pipe (Op &&op) &&
 

Detailed Description

template<rpp::constraint::observable OriginalObservable, typename Subject>
class rpp::connectable_observable< OriginalObservable, Subject >

An extension of a raw observable that can be connected manually at any time or ref-counted (sharing the same observable between multiple observers).

Member Function Documentation

◆ connect()

template<rpp::constraint::observable OriginalObservable, typename Subject >
rpp::disposable_wrapper rpp::connectable_observable< OriginalObservable, Subject >::connect ( rpp::composite_disposable_wrapper wrapper = composite_disposable_wrapper::make()) const
inline

Connects to the underlying observable right now, making it a hot observable.

Example:
const auto observable = rpp::source::interval(std::chrono::milliseconds{50}, rpp::schedulers::new_thread{})
| rpp::ops::map([](int v) {
std::cout << "value in map" << v << std::endl;
return v;
})
| rpp::ops::publish();
std::cout << "CONNECT" << std::endl;
auto d = observable.connect(); // subscribe happens right now
std::this_thread::sleep_for(std::chrono::milliseconds{150});
std::cout << "SUBSCRIBE" << std::endl;
observable.subscribe([](int v) { std::cout << "observer value " << v << std::endl; });
std::this_thread::sleep_for(std::chrono::milliseconds{150});
d.dispose();
std::cout << "DISPOSE" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds{150});
// possible output:
// CONNECT
// value in map0
// value in map1
// value in map2
// SUBSCRIBE
// value in map3
// observer value 3
// value in map4
// observer value 4
// value in map5
// observer value 5
// DISPOSE

The documentation for this class was generated from the following files: