ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
rpp::observable< Type, Strategy > Class Template Reference

Base class for any observable used in RPP. It handles core callbacks of observable. More...

#include <observable.hpp>

Public Types

using value_type = Type
 
using strategy_type = Strategy
 
using optimal_disposables_strategy = typename Strategy::optimal_disposables_strategy
 

Public Member Functions

template<typename... Args>
requires (!constraint::variadic_decayed_same_as<observable<Type, Strategy>, Args...> && constraint::is_constructible_from<Strategy, Args && ...>)
 observable (Args &&... args)
 
template<constraint::observer_strategy< Type > ObserverStrategy>
void subscribe (observer< Type, ObserverStrategy > &&observer) const
 Subscribes passed observer to emissions from this observable.
 
void subscribe (dynamic_observer< Type > observer) const
 Subscribe passed observer to emissions from this observable.
 
template<constraint::observer_strategy< Type > ObserverStrategy>
requires (!constraint::observer<ObserverStrategy>)
void subscribe (ObserverStrategy &&observer_strategy) const
 Subscribes passed observer strategy to emissions from this observable via construction of observer.
 
template<constraint::observer_strategy< Type > ObserverStrategy>
composite_disposable_wrapper subscribe (const composite_disposable_wrapper &d, observer< Type, ObserverStrategy > &&obs) const
 Subscribe passed observer to emissions from this observable.
 
template<constraint::observer_strategy< Type > ObserverStrategy>
requires (!constraint::observer<ObserverStrategy>)
composite_disposable_wrapper subscribe (const composite_disposable_wrapper &d, ObserverStrategy &&observer_strategy) const
 Subscribes passed observer strategy to emissions from this observable via construction of observer.
 
template<constraint::observer_strategy< Type > ObserverStrategy>
composite_disposable_wrapper subscribe_with_disposable (observer< Type, ObserverStrategy > &&observer) const
 Subscribes passed observer to emissions from this observable.
 
template<constraint::observer_strategy< Type > ObserverStrategy>
requires (!constraint::observer<ObserverStrategy>)
composite_disposable_wrapper subscribe_with_disposable (ObserverStrategy &&observer_strategy) const
 Subscribes observer strategy to emissions from this observable.
 
composite_disposable_wrapper subscribe_with_disposable (dynamic_observer< Type > observer) const
 Subscribe passed observer to emissions from this observable.
 
template<std::invocable< Type > OnNext, std::invocable< const std::exception_ptr & > OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
void subscribe (OnNext &&on_next, OnError &&on_error={}, OnCompleted &&on_completed={}) const
 Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.
 
template<std::invocable< Type > OnNext, std::invocable<> OnCompleted>
void subscribe (OnNext &&on_next, OnCompleted &&on_completed) const
 Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.
 
template<std::invocable< Type > OnNext, std::invocable< const std::exception_ptr & > OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
composite_disposable_wrapper subscribe_with_disposable (OnNext &&on_next, OnError &&on_error={}, OnCompleted &&on_completed={}) const
 Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.
 
template<std::invocable< Type > OnNext, std::invocable<> OnCompleted>
composite_disposable_wrapper subscribe_with_disposable (OnNext &&on_next, OnCompleted &&on_completed) const
 Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.
 
template<std::invocable< Type > OnNext, std::invocable< const std::exception_ptr & > OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
composite_disposable_wrapper subscribe (const composite_disposable_wrapper &d, OnNext &&on_next, OnError &&on_error={}, OnCompleted &&on_completed={}) const
 Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.
 
template<std::invocable< Type > OnNext, std::invocable<> OnCompleted>
composite_disposable_wrapper subscribe (const composite_disposable_wrapper &d, OnNext &&on_next, OnCompleted &&on_completed) const
 Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.
 
auto as_dynamic () const &
 Convert observable to type-erased version.
 
auto as_dynamic () &&
 Convert observable to type-erased version.
 
template<typename Subscribe>
requires rpp::utils::is_base_of_v<std::decay_t<Subscribe>, rpp::operators::details::subscribe_t>
auto operator| (Subscribe &&op) const
 
template<typename Op>
requires (!rpp::utils::is_base_of_v<std::decay_t<Op>, rpp::operators::details::subscribe_t>)
rpp::constraint::observable auto operator| (Op &&op) const &
 
template<typename Op>
requires (!rpp::utils::is_base_of_v<std::decay_t<Op>, rpp::operators::details::subscribe_t>)
rpp::constraint::observable auto operator| (Op &&op) &&
 
template<typename Op>
auto pipe (Op &&op) const &
 
template<typename Op>
auto pipe (Op &&op) &&
 

Detailed Description

template<constraint::decayed_type Type, constraint::observable_strategy< Type > Strategy>
class rpp::observable< Type, Strategy >

Base class for any observable used in RPP. It handles core callbacks of observable.

Observable provides only one core function: subscribe - it accepts observer (or any way to construct it) and then invokes underlying Strategy to emit emissions somehow.

Attention
Actually observable "doesn't emit nothing", it only invokes Strategy! Strategy COULD emit emissions immediately OR place observer to some queue or something like this to obtain emissions later (for example subjects)
Expected that observable's strategy would work with observer in serialized way
Note
In case of you are need to keep some "abstract" observable of Type, you can use type-erased version: rpp::dynamic_observable
Template Parameters
Typeof value this observable would provide. Only observers of same type can be subscribed to this observable.
Strategyused to provide logic over observable's callbacks.

Member Function Documentation

◆ subscribe() [1/6]

template<constraint::decayed_type Type, constraint::observable_strategy< Type > Strategy>
template<constraint::observer_strategy< Type > ObserverStrategy>
composite_disposable_wrapper rpp::observable< Type, Strategy >::subscribe ( const composite_disposable_wrapper & d,
observer< Type, ObserverStrategy > && obs ) const
inline

Subscribe passed observer to emissions from this observable.

This overloading attaches passed disposable to observer and return it to provide ability to dispose/disconnect observer early if needed.

Warning
This overloading has some performance penalties, use it only when you really need to use disposable
Parameters
dis disposable to be attached to observer. If disposable is nullptr or disposed -> no any subscription happens
obsis observer to subscribe to this observable
Returns
composite_disposable_wrapper is disposable to be able to dispose observer when it needed
Example
| rpp::operators::subscribe(disposable, rpp::make_lambda_observer([](int) { std::cout << "NEW VALUE" << std::endl; }));
std::this_thread::sleep_for(std::chrono::seconds(1));
disposable.dispose();
std::this_thread::sleep_for(std::chrono::seconds(1));
static disposable_wrapper_impl make(TArgs &&... args)
Definition disposable_wrapper.hpp:164
Scheduler which schedules invoking of schedulables to another thread via queueing tasks with priority...
Definition new_thread.hpp:31
auto just(const TScheduler &scheduler, T &&item, Ts &&... items)
Creates rpp::observable that emits a particular items and completes.
Definition from.hpp:201
auto make_lambda_observer(OnNext &&on_next, OnError &&on_error={}, OnCompleted &&on_completed={}) -> lambda_observer< Type, std::decay_t< OnNext >, std::decay_t< OnError >, std::decay_t< OnCompleted > >
Constructs observer specialized with passed callbacks. Most easiesest way to construct observer "on t...
Definition lambda_observer.hpp:51
auto subscribe(observer< Type, ObserverStrategy > &&observer)
Subscribes passed observer to emissions from this observable.
Definition subscribe.hpp:226
auto repeat()
Repeats the Observabe's sequence of emissions infinite amount of times via re-subscribing on it durin...
Definition repeat.hpp:86
auto subscribe_on(Scheduler &&scheduler)
OnSubscribe function for this observable will be scheduled via provided scheduler.
Definition subscribe_on.hpp:75

◆ subscribe() [2/6]

template<constraint::decayed_type Type, constraint::observable_strategy< Type > Strategy>
template<constraint::observer_strategy< Type > ObserverStrategy>
requires (!constraint::observer<ObserverStrategy>)
composite_disposable_wrapper rpp::observable< Type, Strategy >::subscribe ( const composite_disposable_wrapper & d,
ObserverStrategy && observer_strategy ) const
inline

Subscribes passed observer strategy to emissions from this observable via construction of observer.

This overloading attaches passed disposable to observer and return it to provide ability to dispose/disconnect observer early if needed.

Warning
This overloading has some performance penalties, use it only when you really need to use disposable
Parameters
dis disposable to be attached to observer. If disposable is nullptr or disposed -> no any subscription happens
observer_strategyis strategy to create observer to subscribe to this observable
Returns
composite_disposable_wrapper is disposable to be able to dispose observer when it needed

◆ subscribe() [3/6]

template<constraint::decayed_type Type, constraint::observable_strategy< Type > Strategy>
template<std::invocable< Type > OnNext, std::invocable<> OnCompleted>
composite_disposable_wrapper rpp::observable< Type, Strategy >::subscribe ( const composite_disposable_wrapper & d,
OnNext && on_next,
OnCompleted && on_completed ) const
inline

Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.

This overloading attaches passed disposable to observer and return it to provide ability to dispose/disconnect observer early if needed.

Warning
This overloading has some performance penalties, use it only when you really need to use disposable
Parameters
dis disposable to be attached to observer. If disposable is nullptr or disposed -> no any subscription happens
on_nextis callback to handle values from this observable
on_completedis callback to handle completion of this observable
Returns
composite_disposable_wrapper is disposable to be able to dispose observer when it needed
Example
| rpp::operators::subscribe(disposable, [](int) { std::cout << "NEW VALUE" << std::endl; });
std::this_thread::sleep_for(std::chrono::seconds(1));
disposable.dispose();
std::this_thread::sleep_for(std::chrono::seconds(1));

◆ subscribe() [4/6]

template<constraint::decayed_type Type, constraint::observable_strategy< Type > Strategy>
template<std::invocable< Type > OnNext, std::invocable< const std::exception_ptr & > OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
composite_disposable_wrapper rpp::observable< Type, Strategy >::subscribe ( const composite_disposable_wrapper & d,
OnNext && on_next,
OnError && on_error = {},
OnCompleted && on_completed = {} ) const
inline

Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.

This overloading attaches passed disposable to observer and return it to provide ability to dispose/disconnect observer early if needed.

Warning
This overloading has some performance penalties, use it only when you really need to use disposable
Parameters
dis disposable to be attached to observer. If disposable is nullptr or disposed -> no any subscription happens
on_nextis callback to handle values from this observable
on_erroris callback to handle error from this observable
on_completedis callback to handle completion of this observable
Returns
composite_disposable_wrapper is disposable to be able to dispose observer when it needed
Example
| rpp::operators::subscribe(disposable, [](int) { std::cout << "NEW VALUE" << std::endl; });
std::this_thread::sleep_for(std::chrono::seconds(1));
disposable.dispose();
std::this_thread::sleep_for(std::chrono::seconds(1));

◆ subscribe() [5/6]

template<constraint::decayed_type Type, constraint::observable_strategy< Type > Strategy>
void rpp::observable< Type, Strategy >::subscribe ( dynamic_observer< Type > observer) const
inline

Subscribe passed observer to emissions from this observable.

Special overloading for dynamic observer to enable copy of observer

◆ subscribe() [6/6]

template<constraint::decayed_type Type, constraint::observable_strategy< Type > Strategy>
template<constraint::observer_strategy< Type > ObserverStrategy>
void rpp::observable< Type, Strategy >::subscribe ( observer< Type, ObserverStrategy > && observer) const
inline

Subscribes passed observer to emissions from this observable.

Attention
Observer must be moved in to subscribe method. (Not recommended) If you need to copy observer, convert it to dynamic_observer

◆ subscribe_with_disposable() [1/5]

template<constraint::decayed_type Type, constraint::observable_strategy< Type > Strategy>
composite_disposable_wrapper rpp::observable< Type, Strategy >::subscribe_with_disposable ( dynamic_observer< Type > observer) const
inlinenodiscard

Subscribe passed observer to emissions from this observable.

This overloading attaches disposable to observer and return it to provide ability to dispose/disconnect observer early if needed.

Warning
This overloading has some performance penalties, use it only when you really need to use disposable
Returns
composite_disposable_wrapper is disposable to be able to dispose observer when it needed

Special overloading for dynamic observer to enable copy of observer

◆ subscribe_with_disposable() [2/5]

template<constraint::decayed_type Type, constraint::observable_strategy< Type > Strategy>
template<constraint::observer_strategy< Type > ObserverStrategy>
composite_disposable_wrapper rpp::observable< Type, Strategy >::subscribe_with_disposable ( observer< Type, ObserverStrategy > && observer) const
inlinenodiscard

Subscribes passed observer to emissions from this observable.

This overloading attaches disposable to observer and return it to provide ability to dispose/disconnect observer early if needed.

Warning
This overloading has some performance penalties, use it only when you really need to use disposable
Returns
composite_disposable_wrapper is disposable to be able to dispose observer when it needed
Attention
Observer must be moved in to subscribe method. (Not recommended) If you need to copy observer, convert it to dynamic_observer

◆ subscribe_with_disposable() [3/5]

template<constraint::decayed_type Type, constraint::observable_strategy< Type > Strategy>
template<constraint::observer_strategy< Type > ObserverStrategy>
requires (!constraint::observer<ObserverStrategy>)
composite_disposable_wrapper rpp::observable< Type, Strategy >::subscribe_with_disposable ( ObserverStrategy && observer_strategy) const
inlinenodiscard

Subscribes observer strategy to emissions from this observable.

This overloading attaches disposable to observer and return it to provide ability to dispose/disconnect observer early if needed.

Warning
This overloading has some performance penalties, use it only when you really need to use disposable
Returns
composite_disposable_wrapper is disposable to be able to dispose observer when it needed

◆ subscribe_with_disposable() [4/5]

template<constraint::decayed_type Type, constraint::observable_strategy< Type > Strategy>
template<std::invocable< Type > OnNext, std::invocable<> OnCompleted>
composite_disposable_wrapper rpp::observable< Type, Strategy >::subscribe_with_disposable ( OnNext && on_next,
OnCompleted && on_completed ) const
inlinenodiscard

Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.

This overloading attaches disposable to observer and return it to provide ability to dispose/disconnect observer early if needed.

Warning
This overloading has some performance penalties, use it only when you really need to use disposable
Returns
composite_disposable_wrapper is disposable to be able to dispose observer when it needed

◆ subscribe_with_disposable() [5/5]

template<constraint::decayed_type Type, constraint::observable_strategy< Type > Strategy>
template<std::invocable< Type > OnNext, std::invocable< const std::exception_ptr & > OnError = rpp::utils::rethrow_error_t, std::invocable<> OnCompleted = rpp::utils::empty_function_t<>>
composite_disposable_wrapper rpp::observable< Type, Strategy >::subscribe_with_disposable ( OnNext && on_next,
OnError && on_error = {},
OnCompleted && on_completed = {} ) const
inlinenodiscard

Construct rpp::lambda_observer on the fly and subscribe it to emissions from this observable.

This overloading attaches disposable to observer and return it to provide ability to dispose/disconnect observer early if needed.

Warning
This overloading has some performance penalties, use it only when you really need to use disposable
Returns
composite_disposable_wrapper is disposable to be able to dispose observer when it needed

The documentation for this class was generated from the following files: