ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
rpp::observable< Type, Strategy > Class Template Reference

Base class for any observable used in RPP. It handles core callbacks of observable.

#include <observable.hpp>

Inheritance diagram for rpp::observable< Type, Strategy >:
rpp::grouped_observable< KeyType, Type, Strategy >

Public Types

using value_type = Type
 
using strategy_type = Strategy
 
using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t<Strategy>
 

Public Member Functions

template<typename... Args>
requires (!constraint::variadic_decayed_same_as<observable<Type, Strategy>, Args...> && constraint::is_constructible_from<Strategy, Args && ...>)
 observable (Args &&... args)
 
template<constraint::observer_strategy< Type > ObserverStrategy>
void subscribe (observer< Type, ObserverStrategy > &&observer) const
 Subscribes passed observer to emissions from this observable.
 
void subscribe (dynamic_observer< Type > observer) const
 Subscribes passed observer to emissions from this observable.
 
template<constraint::observer_strategy< Type > ObserverStrategy>
requires (!constraint::observer<ObserverStrategy>)
void subscribe (ObserverStrategy &&observer_strategy) const
 Subscribes passed observer strategy to emissions from this observable via construction of observer.
 
template<constraint::observer_strategy< Type > ObserverStrategy>
composite_disposable_wrapper subscribe (const composite_disposable_wrapper &d, observer< Type, ObserverStrategy > &&obs) const
 Subscribes passed observer to emissions from this observable.
 
template<constraint::observer_strategy< Type > ObserverStrategy>
requires (!constraint::observer<ObserverStrategy>)
composite_disposable_wrapper subscribe (const composite_disposable_wrapper &d, ObserverStrategy &&observer_strategy) const
 Subscribes passed observer strategy to emissions from this observable via construction of observer.
 
template<constraint::observer_strategy< Type > ObserverStrategy>
composite_disposable_wrapper subscribe_with_disposable (observer< Type, ObserverStrategy > &&observer) const
 Subscribes passed observer to emissions from this observable.
 
template<constraint::observer_strategy< Type > ObserverStrategy>
requires (!constraint::observer<ObserverStrategy>)
composite_disposable_wrapper subscribe_with_disposable (ObserverStrategy &&observer_strategy) const
 Subscribes observer strategy to emissions from this observable.
 
composite_disposable_wrapper subscribe_with_disposable (dynamic_observer< Type > observer) const
 Subscribes passed observer to emissions from this observable.
 
template<std::invocable< Type > OnNext, std::invocable< const std::exception_ptr & > OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
void subscribe (OnNext &&on_next, OnError &&on_error={}, OnCompleted &&on_completed={}) const
 Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.
 
template<std::invocable< Type > OnNext, std::invocable<> OnCompleted>
void subscribe (OnNext &&on_next, OnCompleted &&on_completed) const
 Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.
 
template<std::invocable< Type > OnNext, std::invocable< const std::exception_ptr & > OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
composite_disposable_wrapper subscribe_with_disposable (OnNext &&on_next, OnError &&on_error={}, OnCompleted &&on_completed={}) const
 Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.
 
template<std::invocable< Type > OnNext, std::invocable<> OnCompleted>
composite_disposable_wrapper subscribe_with_disposable (OnNext &&on_next, OnCompleted &&on_completed) const
 Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.
 
template<std::invocable< Type > OnNext, std::invocable< const std::exception_ptr & > OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
composite_disposable_wrapper subscribe (const composite_disposable_wrapper &d, OnNext &&on_next, OnError &&on_error={}, OnCompleted &&on_completed={}) const
 Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.
 
template<std::invocable< Type > OnNext, std::invocable<> OnCompleted>
composite_disposable_wrapper subscribe (const composite_disposable_wrapper &d, OnNext &&on_next, OnCompleted &&on_completed) const
 Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.
 
auto as_dynamic () const &
 Convert observable to type-erased version.
 
auto as_dynamic () &&
 Convert observable to type-erased version.
 
template<constraint::operator_base< Type > Op>
auto operator| (Op &&op) const &
 
template<constraint::operator_base< Type > Op>
auto operator| (Op &&op) &&
 
template<constraint::operator_observable_transform< const observable & > Op>
auto operator| (Op &&op) const &
 
template<constraint::operator_observable_transform< observable && > Op>
auto operator| (Op &&op) &&
 
template<typename... Args>
auto operator| (const rpp::operators::details::subscribe_t< Args... > &op) const
 
template<typename... Args>
auto operator| (rpp::operators::details::subscribe_t< Args... > &&op) const
 
template<typename Op >
auto pipe (Op &&op) const &
 
template<typename Op >
auto pipe (Op &&op) &&
 

Detailed Description

template<constraint::decayed_type Type, constraint::observable_strategy< Type > Strategy>
class rpp::observable< Type, Strategy >

Base class for any observable used in RPP. It handles core callbacks of observable.

An observable provides only one core function, subscribe: it accepts an observer (or anything an observer can be constructed from) and then invokes the underlying Strategy, which is responsible for producing the emissions.

Warning
Strictly speaking, the observable itself emits nothing; it only invokes the Strategy. The Strategy may emit immediately, or it may store the observer (in a queue, for example) and deliver emissions later, as subjects do.
The observable's strategy is expected to interact with the observer in a serialized way.
Note
If you need to keep an "abstract" observable of Type, you can use the type-erased version: rpp::dynamic_observable
Template Parameters
Type: type of the values this observable provides. Only observers of the same type can be subscribed to this observable.
Strategy: used to provide the logic behind the observable's callbacks.
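The split between observable and strategy described above can be illustrated with a minimal, library-free sketch. Every name in it (`mini_observable`, `immediate_strategy`, `deferred_strategy`, `recording_observer`) is hypothetical and is not part of RPP; the point is only that `subscribe` forwards to the strategy, and the strategy decides whether to emit right away or to store the observer for later.

```cpp
#include <memory>
#include <utility>
#include <vector>

// All names below are hypothetical stand-ins for illustration, not RPP types.

// Observer that records what it receives into caller-owned storage.
struct recording_observer
{
    std::vector<int>* values;
    bool*             completed;

    void on_next(int v) const { values->push_back(v); }
    void on_completed() const { *completed = true; }
};

// The observable owns a strategy and does nothing but hand the observer to it.
template<typename Type, typename Strategy>
class mini_observable
{
public:
    explicit mini_observable(Strategy strategy)
        : m_strategy(std::move(strategy))
    {
    }

    template<typename Observer>
    void subscribe(Observer&& observer) const
    {
        m_strategy.subscribe(std::forward<Observer>(observer));
    }

private:
    Strategy m_strategy;
};

// Strategy 1: emits immediately, inside subscribe.
struct immediate_strategy
{
    template<typename Observer>
    void subscribe(Observer&& observer) const
    {
        for (int v : {1, 2, 3})
            observer.on_next(v);
        observer.on_completed();
    }
};

// Strategy 2: only stores the observer; values are delivered later (subject-like).
struct deferred_strategy
{
    std::shared_ptr<std::vector<recording_observer>> stored = std::make_shared<std::vector<recording_observer>>();

    void subscribe(recording_observer observer) const { stored->push_back(observer); }
};

std::vector<int> collect_immediate()
{
    std::vector<int> values;
    bool             completed = false;
    mini_observable<int, immediate_strategy> source{immediate_strategy{}};
    source.subscribe(recording_observer{&values, &completed});
    return values;
}

bool deferred_emits_only_on_demand()
{
    std::vector<int> values;
    bool             completed = false;
    deferred_strategy strategy;
    mini_observable<int, deferred_strategy> source{strategy}; // the copy shares `stored`
    source.subscribe(recording_observer{&values, &completed});
    const bool nothing_yet = values.empty(); // subscribe alone emitted nothing
    for (const auto& observer : *strategy.stored)
        observer.on_next(42); // emission happens later, outside subscribe
    return nothing_yet && values.size() == 1 && values[0] == 42;
}
```

With the immediate strategy the observer sees all values before `subscribe` returns; with the deferred one, `subscribe` returns without any emission.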

Member Function Documentation

◆ subscribe() [1/6]

template<constraint::decayed_type Type, constraint::observable_strategy< Type > Strategy>
template<constraint::observer_strategy< Type > ObserverStrategy>
composite_disposable_wrapper rpp::observable< Type, Strategy >::subscribe ( const composite_disposable_wrapper & d,
observer< Type, ObserverStrategy > && obs ) const
inline

Subscribes passed observer to emissions from this observable.

This overload attaches the passed disposable to the observer and returns it, so the observer can be disposed/disconnected early if needed.

Warning
This overload carries a performance penalty; use it only when you really need the disposable.
Parameters
d: disposable to be attached to the observer. If the disposable is nullptr or already disposed, no subscription happens.
Returns
composite_disposable_wrapper: the disposable that can be used to dispose the observer when needed.
Example
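auto disposable = rpp::composite_disposable_wrapper::make();
rpp::source::just(1)
    | rpp::operators::repeat()
    | rpp::operators::subscribe_on(rpp::schedulers::new_thread{})
    | rpp::operators::subscribe(disposable, rpp::make_lambda_observer([](int) { std::cout << "NEW VALUE" << std::endl; }));
std::this_thread::sleep_for(std::chrono::seconds(1));
disposable.dispose();
std::this_thread::sleep_for(std::chrono::seconds(1));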
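The contract of the disposable-taking overload (return the passed disposable, skip the subscription if it is already disposed, stop delivering once it is disposed) can be sketched without the library. `mini_disposable` and `mini_source` are hypothetical names introduced for illustration only; how RPP implements this internally is not shown here.

```cpp
#include <atomic>
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for a disposable handle: copies share one "disposed" flag.
class mini_disposable
{
public:
    mini_disposable()
        : m_disposed(std::make_shared<std::atomic_bool>(false))
    {
    }

    void dispose() const { m_disposed->store(true); }
    bool is_disposed() const { return m_disposed->load(); }

private:
    std::shared_ptr<std::atomic_bool> m_disposed;
};

// Hypothetical source: remembers subscribed callbacks; emit() reaches only live ones.
class mini_source
{
public:
    mini_disposable subscribe(const mini_disposable& d, std::function<void(int)> on_next)
    {
        if (!d.is_disposed()) // an already disposed handle means no subscription at all
            m_observers.push_back({d, std::move(on_next)});
        return d; // the caller gets the same handle back
    }

    void emit(int v) const
    {
        for (const auto& entry : m_observers)
            if (!entry.disposable.is_disposed())
                entry.on_next(v);
    }

    std::size_t observer_count() const { return m_observers.size(); }

private:
    struct entry_t
    {
        mini_disposable          disposable;
        std::function<void(int)> on_next;
    };

    std::vector<entry_t> m_observers;
};

std::vector<int> demo_dispose_stops_emissions()
{
    std::vector<int> received;
    mini_source      source;
    mini_disposable  d = source.subscribe(mini_disposable{}, [&received](int v) { received.push_back(v); });
    source.emit(1); // delivered
    d.dispose();
    source.emit(2); // ignored: the observer was disposed
    return received;
}

bool demo_already_disposed_is_ignored()
{
    mini_source     source;
    mini_disposable d;
    d.dispose();
    source.subscribe(d, [](int) {});
    return source.observer_count() == 0;
}
```

The extra shared flag and the check on every emission also hint at why such an overload costs more than a plain `subscribe`.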

◆ subscribe() [2/6]

template<constraint::decayed_type Type, constraint::observable_strategy< Type > Strategy>
template<constraint::observer_strategy< Type > ObserverStrategy>
requires (!constraint::observer<ObserverStrategy>)
composite_disposable_wrapper rpp::observable< Type, Strategy >::subscribe ( const composite_disposable_wrapper & d,
ObserverStrategy && observer_strategy ) const
inline

Subscribes passed observer strategy to emissions from this observable via construction of observer.

This overload attaches the passed disposable to the observer and returns it, so the observer can be disposed/disconnected early if needed.

Warning
This overload carries a performance penalty; use it only when you really need the disposable.
Parameters
d: disposable to be attached to the observer. If the disposable is nullptr or already disposed, no subscription happens.
Returns
composite_disposable_wrapper: the disposable that can be used to dispose the observer when needed.

◆ subscribe() [3/6]

template<constraint::decayed_type Type, constraint::observable_strategy< Type > Strategy>
template<std::invocable< Type > OnNext, std::invocable<> OnCompleted>
composite_disposable_wrapper rpp::observable< Type, Strategy >::subscribe ( const composite_disposable_wrapper & d,
OnNext && on_next,
OnCompleted && on_completed ) const
inline

Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.

This overload attaches the passed disposable to the observer and returns it, so the observer can be disposed/disconnected early if needed.

Warning
This overload carries a performance penalty; use it only when you really need the disposable.
Parameters
d: disposable to be attached to the observer. If the disposable is nullptr or already disposed, no subscription happens.
Returns
composite_disposable_wrapper: the disposable that can be used to dispose the observer when needed.
Example
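auto disposable = rpp::composite_disposable_wrapper::make();
rpp::source::just(1)
    | rpp::operators::repeat()
    | rpp::operators::subscribe_on(rpp::schedulers::new_thread{})
    | rpp::operators::subscribe(disposable, [](int) { std::cout << "NEW VALUE" << std::endl; });
std::this_thread::sleep_for(std::chrono::seconds(1));
disposable.dispose();
std::this_thread::sleep_for(std::chrono::seconds(1));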

◆ subscribe() [4/6]

template<constraint::decayed_type Type, constraint::observable_strategy< Type > Strategy>
template<std::invocable< Type > OnNext, std::invocable< const std::exception_ptr & > OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
composite_disposable_wrapper rpp::observable< Type, Strategy >::subscribe ( const composite_disposable_wrapper & d,
OnNext && on_next,
OnError && on_error = {},
OnCompleted && on_completed = {} ) const
inline

Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.

This overload attaches the passed disposable to the observer and returns it, so the observer can be disposed/disconnected early if needed.

Warning
This overload carries a performance penalty; use it only when you really need the disposable.
Parameters
d: disposable to be attached to the observer. If the disposable is nullptr or already disposed, no subscription happens.
Returns
composite_disposable_wrapper: the disposable that can be used to dispose the observer when needed.
Example
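auto disposable = rpp::composite_disposable_wrapper::make();
rpp::source::just(1)
    | rpp::operators::repeat()
    | rpp::operators::subscribe_on(rpp::schedulers::new_thread{})
    | rpp::operators::subscribe(disposable, [](int) { std::cout << "NEW VALUE" << std::endl; });
std::this_thread::sleep_for(std::chrono::seconds(1));
disposable.dispose();
std::this_thread::sleep_for(std::chrono::seconds(1));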
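The callback-based overloads build an observer from up to three callables, with defaults for `on_error` and `on_completed`. The sketch below shows that pattern in isolation. `rethrow_error`, `do_nothing`, `mini_lambda_observer` and `make_mini_lambda_observer` are hypothetical names, and the sketch assumes that the default error handler rethrows the exception (as the name `rpp::utils::rethrow_error_t` suggests) and that the default completion handler does nothing.

```cpp
#include <exception>
#include <stdexcept>
#include <utility>

// Hypothetical defaults, modelled on the names in the signature above.
struct rethrow_error
{
    void operator()(const std::exception_ptr& err) const { std::rethrow_exception(err); }
};

struct do_nothing
{
    void operator()() const noexcept {}
};

// Hypothetical observer assembled from three callables.
template<typename OnNext, typename OnError = rethrow_error, typename OnCompleted = do_nothing>
struct mini_lambda_observer
{
    OnNext      on_next;
    OnError     on_error;
    OnCompleted on_completed;
};

// Omitted callbacks fall back to the default types via `= {}`.
template<typename OnNext, typename OnError = rethrow_error, typename OnCompleted = do_nothing>
auto make_mini_lambda_observer(OnNext on_next, OnError on_error = {}, OnCompleted on_completed = {})
{
    return mini_lambda_observer<OnNext, OnError, OnCompleted>{std::move(on_next), std::move(on_error), std::move(on_completed)};
}

int sum_with_defaults()
{
    int  sum      = 0;
    auto observer = make_mini_lambda_observer([&sum](int v) { sum += v; });
    observer.on_next(1);
    observer.on_next(2);
    observer.on_completed(); // default: no-op
    return sum;
}

bool default_on_error_rethrows()
{
    auto observer = make_mini_lambda_observer([](int) {});
    try
    {
        observer.on_error(std::make_exception_ptr(std::runtime_error{"boom"}));
    }
    catch (const std::runtime_error&)
    {
        return true; // the default handler propagated the error to the caller
    }
    return false;
}
```

Under that assumption, passing only `on_next` means an error in the stream surfaces as an exception rather than being silently dropped.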

◆ subscribe() [5/6]

template<constraint::decayed_type Type, constraint::observable_strategy< Type > Strategy>
void rpp::observable< Type, Strategy >::subscribe ( dynamic_observer< Type > observer) const
inline

Subscribes passed observer to emissions from this observable.

Special overload for dynamic_observer that allows the observer to be copied.
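Why a type-erased observer can be passed by value while a concrete one cannot is easiest to see in a small sketch. It assumes the type-erased wrapper keeps the real observer behind a `std::shared_ptr`, so copies are cheap and all refer to the same underlying observer; that is an assumption made for illustration, and `mini_dynamic_observer` and `counting_observer` are hypothetical names, not RPP types.

```cpp
#include <memory>
#include <utility>

// Hypothetical move-only observer: copying it is deliberately forbidden.
struct counting_observer
{
    explicit counting_observer(int* counter)
        : m_counter(counter)
    {
    }
    counting_observer(counting_observer&&)      = default;
    counting_observer(const counting_observer&) = delete;

    void on_next(const int&) { ++*m_counter; }

    int* m_counter;
};

// Hypothetical type-erased wrapper: copyable because it only copies a shared_ptr.
template<typename Type>
class mini_dynamic_observer
{
public:
    template<typename Observer>
    static mini_dynamic_observer make(Observer observer)
    {
        return mini_dynamic_observer{std::make_shared<model<Observer>>(std::move(observer))};
    }

    void on_next(const Type& v) const { m_impl->on_next(v); }

private:
    struct concept_t
    {
        virtual ~concept_t()                = default;
        virtual void on_next(const Type& v) = 0;
    };

    template<typename Observer>
    struct model final : concept_t
    {
        explicit model(Observer o)
            : observer(std::move(o))
        {
        }
        void on_next(const Type& v) override { observer.on_next(v); }

        Observer observer;
    };

    explicit mini_dynamic_observer(std::shared_ptr<concept_t> impl)
        : m_impl(std::move(impl))
    {
    }

    std::shared_ptr<concept_t> m_impl;
};

// Takes the observer by value, as the dynamic_observer overload above does.
void subscribe_by_value(mini_dynamic_observer<int> observer, int value)
{
    observer.on_next(value);
}

int demo_copies_share_observer()
{
    int  counter  = 0;
    auto observer = mini_dynamic_observer<int>::make(counting_observer{&counter});
    subscribe_by_value(observer, 1); // copy #1
    subscribe_by_value(observer, 2); // copy #2, same underlying observer
    return counter;
}
```

The price of this convenience is a heap allocation and a virtual call per emission in the sketch.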

◆ subscribe() [6/6]

template<constraint::decayed_type Type, constraint::observable_strategy< Type > Strategy>
template<constraint::observer_strategy< Type > ObserverStrategy>
void rpp::observable< Type, Strategy >::subscribe ( observer< Type, ObserverStrategy > && observer) const
inline

Subscribes passed observer to emissions from this observable.

Warning
The observer must be moved into the subscribe method. If you need to copy the observer, convert it to dynamic_observer (not recommended).
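The "must be moved" requirement follows from the parameter type `observer<Type, ObserverStrategy>&&`, which is a plain rvalue reference and not a forwarding reference. A minimal sketch with hypothetical stand-in types (`mini_observer`, `rvalue_only_source`, `noop_strategy`) shows that temporaries and `std::move`'d observers are accepted while lvalues are rejected at compile time.

```cpp
#include <type_traits>
#include <utility>

// Hypothetical stand-ins, not RPP types.
struct noop_strategy
{
    void on_next(int) const {}
};

template<typename Strategy>
struct mini_observer
{
    Strategy strategy;
};

struct rvalue_only_source
{
    // Strategy is deduced, but the parameter is not of the form `T&&`,
    // so this is an ordinary rvalue reference and lvalues do not bind to it.
    template<typename Strategy>
    void subscribe(mini_observer<Strategy>&& observer) const
    {
        mini_observer<Strategy> owned = std::move(observer); // the source takes ownership
        owned.strategy.on_next(1);
    }
};

// Detects whether `source.subscribe(arg)` compiles for an argument of type Arg.
template<typename Arg, typename = void>
struct can_subscribe : std::false_type
{
};

template<typename Arg>
struct can_subscribe<Arg, std::void_t<decltype(std::declval<const rvalue_only_source&>().subscribe(std::declval<Arg>()))>>
    : std::true_type
{
};

static_assert(can_subscribe<mini_observer<noop_strategy>>::value, "rvalue observers are accepted");
static_assert(!can_subscribe<mini_observer<noop_strategy>&>::value, "lvalue observers are rejected");

void demo_move_into_subscribe()
{
    rvalue_only_source           source;
    mini_observer<noop_strategy> observer{};
    source.subscribe(std::move(observer)); // OK; `source.subscribe(observer)` would not compile
}
```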

◆ subscribe_with_disposable() [1/5]

template<constraint::decayed_type Type, constraint::observable_strategy< Type > Strategy>
composite_disposable_wrapper rpp::observable< Type, Strategy >::subscribe_with_disposable ( dynamic_observer< Type > observer) const
inline

Subscribes passed observer to emissions from this observable.

This overload attaches a disposable to the observer and returns it, so the observer can be disposed/disconnected early if needed.

Warning
This overload carries a performance penalty; use it only when you really need the disposable.
Returns
composite_disposable_wrapper: the disposable that can be used to dispose the observer when needed.

Special overload for dynamic_observer that allows the observer to be copied.

◆ subscribe_with_disposable() [2/5]

template<constraint::decayed_type Type, constraint::observable_strategy< Type > Strategy>
template<constraint::observer_strategy< Type > ObserverStrategy>
composite_disposable_wrapper rpp::observable< Type, Strategy >::subscribe_with_disposable ( observer< Type, ObserverStrategy > && observer) const
inline

Subscribes passed observer to emissions from this observable.

This overload attaches a disposable to the observer and returns it, so the observer can be disposed/disconnected early if needed.

Warning
This overload carries a performance penalty; use it only when you really need the disposable.
Returns
composite_disposable_wrapper: the disposable that can be used to dispose the observer when needed.
Warning
The observer must be moved into the subscribe method. If you need to copy the observer, convert it to dynamic_observer (not recommended).

◆ subscribe_with_disposable() [3/5]

template<constraint::decayed_type Type, constraint::observable_strategy< Type > Strategy>
template<constraint::observer_strategy< Type > ObserverStrategy>
requires (!constraint::observer<ObserverStrategy>)
composite_disposable_wrapper rpp::observable< Type, Strategy >::subscribe_with_disposable ( ObserverStrategy && observer_strategy) const
inline

Subscribes observer strategy to emissions from this observable.

This overload attaches a disposable to the observer and returns it, so the observer can be disposed/disconnected early if needed.

Warning
This overload carries a performance penalty; use it only when you really need the disposable.
Returns
composite_disposable_wrapper: the disposable that can be used to dispose the observer when needed.

◆ subscribe_with_disposable() [4/5]

template<constraint::decayed_type Type, constraint::observable_strategy< Type > Strategy>
template<std::invocable< Type > OnNext, std::invocable<> OnCompleted>
composite_disposable_wrapper rpp::observable< Type, Strategy >::subscribe_with_disposable ( OnNext && on_next,
OnCompleted && on_completed ) const
inline

Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.

This overload attaches a disposable to the observer and returns it, so the observer can be disposed/disconnected early if needed.

Warning
This overload carries a performance penalty; use it only when you really need the disposable.
Returns
composite_disposable_wrapper: the disposable that can be used to dispose the observer when needed.

◆ subscribe_with_disposable() [5/5]

template<constraint::decayed_type Type, constraint::observable_strategy< Type > Strategy>
template<std::invocable< Type > OnNext, std::invocable< const std::exception_ptr & > OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
composite_disposable_wrapper rpp::observable< Type, Strategy >::subscribe_with_disposable ( OnNext && on_next,
OnError && on_error = {},
OnCompleted && on_completed = {} ) const
inline

Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.

This overload attaches a disposable to the observer and returns it, so the observer can be disposed/disconnected early if needed.

Warning
This overload carries a performance penalty; use it only when you really need the disposable.
Returns
composite_disposable_wrapper: the disposable that can be used to dispose the observer when needed.

The documentation for this class was generated from the following files: