ReactivePlusPlus
ReactiveX implementation for C++20
Observables

Observable is the source of any Reactive Stream.

Concepts

concept  rpp::constraint::observable_strategy
 A concept that defines the requirements for an observable strategy.
 

Classes

class  rpp::blocking_observable< Type, Strategy >
 Extension of rpp::observable with a set of blocking operators that wait until the underlying observable completes.
 
class  rpp::connectable_observable< OriginalObservable, Subject >
 Extension of a raw observable that can be connected manually at any time or shared via reference counting (one underlying observable shared between multiple observers).
 
class  rpp::dynamic_observable< Type >
 Type-erased version of rpp::observable. Any observable can be converted to dynamic_observable via the rpp::observable::as_dynamic member function.
 
class  rpp::grouped_observable< KeyType, Type, Strategy >
 Extension of rpp::observable for the "subset" of values from the original observable that share a key. It provides a get_key() member function and is used by the group_by operator to represent a grouped observable.
 
class  rpp::observable< Type, Strategy >
 Base class for any observable used in RPP. It handles the core callbacks of an observable.
 
class  rpp::variant_observable< Type, Observables >
 Extension of rpp::observable that statically holds one of multiple observables.
 

Detailed Description

Observable is the source of any Reactive Stream. An observable lets an observer subscribe to a stream of events; once subscribed, the observable emits values to that observer.
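
The subscribe-then-emit relationship can be illustrated with a small standalone sketch. It deliberately does not use RPP: toy_observer, toy_observable, make_one_two_three and collect_all are hypothetical names invented for this illustration, and the real rpp::observable is considerably more elaborate (strategies, disposables, error handling).

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical toy types for illustration only; these are NOT RPP's classes.
template<typename T>
struct toy_observer
{
    std::function<void(const T&)> on_next;      // called for each emitted value
    std::function<void()>         on_completed; // called once, after the last value
};

template<typename T>
class toy_observable
{
public:
    using on_subscribe_fn = std::function<void(const toy_observer<T>&)>;

    explicit toy_observable(on_subscribe_fn on_subscribe)
        : m_on_subscribe{std::move(on_subscribe)}
    {
    }

    // Nothing is emitted until an observer subscribes.
    void subscribe(const toy_observer<T>& observer) const { m_on_subscribe(observer); }

private:
    on_subscribe_fn m_on_subscribe;
};

// Builds an observable that emits 1, 2, 3 and then completes.
inline toy_observable<int> make_one_two_three()
{
    return toy_observable<int>{[](const toy_observer<int>& observer) {
        for (int v : {1, 2, 3})
            observer.on_next(v);
        observer.on_completed();
    }};
}

// Subscribes a collecting observer and returns everything it received.
inline std::vector<int> collect_all(const toy_observable<int>& source)
{
    std::vector<int> received;
    bool             completed = false;
    source.subscribe(toy_observer<int>{
        [&](const int& v) { received.push_back(v); },
        [&] { completed = true; }});
    assert(completed); // the toy source is synchronous, so it has finished here
    return received;
}
```

Calling collect_all(make_one_two_three()) subscribes once and returns the values in emission order. Each call to subscribe re-runs the emitting function, which is the behaviour usually described as a "cold" observable.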

Reactive programming has an Observable Contract. Please read it.

This contract includes:

Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.

RPP follows this contract, meaning:

  1. All RPP operators follow this contract.
    All built-in RPP observables and operators emit items serially.
  2. User-provided callbacks do not need to be thread-safe, because the observable guarantees serial emissions.
    For example, the internal logic of the take operator uses no mutexes or atomics, because the underlying observable MUST emit items serially.
  3. When implementing your own operator via create, follow this contract!
  4. This holds EXCEPT FOR subjects used manually. Use serialized_* instead if you can't guarantee serial emissions.
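
To make "serial emissions" concrete, here is a minimal standalone sketch of one way to serialize notifications that arrive from several threads. It is an assumption-laden illustration, not RPP's serialized_* implementation: serializing_emitter, emission_stats and emit_from_two_threads are hypothetical names, and a plain std::mutex is just one possible mechanism.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical helper, NOT RPP's serialized_* implementation: it forwards
// on_next calls to a downstream callback one at a time.
template<typename T>
class serializing_emitter
{
public:
    explicit serializing_emitter(std::function<void(const T&)> downstream)
        : m_downstream{std::move(downstream)}
    {
    }

    void on_next(const T& value)
    {
        // The lock both serializes the calls and establishes the
        // happens-before relationship required by the contract.
        std::lock_guard<std::mutex> lock{m_mutex};
        m_downstream(value);
    }

private:
    std::mutex                    m_mutex;
    std::function<void(const T&)> m_downstream;
};

struct emission_stats
{
    int total_calls    = 0; // how many values reached the callback
    int max_concurrent = 0; // highest number of callbacks running at once
};

// Two threads emit through one serializing_emitter into a callback that
// uses plain (non-atomic, non-locked) state.
inline emission_stats emit_from_two_threads(int items_per_thread)
{
    assert(items_per_thread >= 0);

    emission_stats stats{};
    int            active = 0;

    serializing_emitter<int> emitter{[&](const int&) {
        ++active;
        if (active > stats.max_concurrent)
            stats.max_concurrent = active;
        ++stats.total_calls;
        --active;
    }};

    auto producer = [&] {
        for (int i = 0; i < items_per_thread; ++i)
            emitter.on_next(i);
    };

    std::thread first{producer};
    std::thread second{producer};
    first.join();
    second.join();
    return stats;
}
```

Because every call passes through the same lock, the downstream callback never runs concurrently with itself, so it can keep ordinary unsynchronized state, which is the property point 2 relies on. Without such a wrapper, emitting into a shared callback from two threads would be a data race.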

For example:

| rpp::operators::map([](int v)
  {
      std::cout << "enter " << v << std::endl;
      std::this_thread::sleep_for(std::chrono::seconds{1});
      std::cout << "exit " << v << std::endl;
      return v;
  })

This will never produce:

enter 1
enter 2
exit 2
exit 1

It only produces serial output, for example:

enter 1
exit 1
enter 1
exit 1
enter 2
exit 2
enter 2
exit 2

See also
https://reactivex.io/documentation/observable.html