ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Loading...
Searching...
No Matches
Creational Operators

Creational operators are operators that create new observable. More...

Functions

template<constraint::decayed_type Type, constraint::on_subscribe_fn< Type > OnSubscribeFn>
requires rpp::details::is_header_included<rpp::details::create_tag, Type, OnSubscribeFn>
auto rpp::observable::create (OnSubscribeFn &&on_subscribe)
 Creates rpp::specific_observable with passed action as OnSubscribe.
 
template<constraint::decayed_type Type>
requires rpp::details::is_header_included<rpp::details::empty_tag, Type>
auto rpp::observable::empty ()
 Creates rpp::specific_observable that emits no items but terminates normally.
 
template<constraint::decayed_type Type>
requires rpp::details::is_header_included<rpp::details::error_tag, Type>
auto rpp::observable::error (const std::exception_ptr &err)
 Creates rpp::specific_observable that emits no items and terminates with an error.
 
template<memory_model memory_model, typename T , typename ... Ts>
requires (rpp::details::is_header_included<rpp::details::just_tag, T, Ts...> && (constraint::decayed_same_as<T, Ts> && ...))
auto rpp::observable::just (const schedulers::constraint::scheduler auto &scheduler, T &&item, Ts &&...items)
 Creates rpp::specific_observable that emits a particular items and completes.
 
template<memory_model memory_model, typename T , typename ... Ts>
requires (rpp::details::is_header_included<rpp::details::just_tag, T, Ts...> && (constraint::decayed_same_as<T, Ts> && ...))
auto rpp::observable::just (T &&item, Ts &&...items)
 Creates rpp::specific_observable that emits a particular items and completes.
 
template<memory_model memory_model, schedulers::constraint::scheduler TScheduler>
requires rpp::details::is_header_included<rpp::details::from_tag, TScheduler >
auto rpp::observable::from_iterable (constraint::iterable auto &&iterable, const TScheduler &scheduler)
 Creates rpp::specific_observable that emits a items from provided iterable.
 
template<memory_model memory_model>
requires rpp::details::is_header_included<rpp::details::from_tag, decltype(callable)>
auto rpp::observable::from_callable (std::invocable<> auto &&callable)
 Creates rpp::specific_observable that calls provided callable and emits resulting value of this callable.
 
template<schedulers::constraint::scheduler TScheduler = schedulers::trampoline>
requires rpp::details::is_header_included<rpp::details::interval_tag, TScheduler>
auto rpp::observable::interval (schedulers::duration period, const TScheduler &scheduler)
 Creates rpp::specific_observable which emits sequence of size_t every provided time interval.
 
template<schedulers::constraint::scheduler TScheduler = schedulers::trampoline>
requires rpp::details::is_header_included<rpp::details::interval_tag, TScheduler>
auto rpp::observable::interval (schedulers::duration first_delay, schedulers::duration period, const TScheduler &scheduler)
 Creates rpp::specific_observable which emits sequence of size_t every provided time interval with first emission after provided delay.
 
template<constraint::decayed_type Type>
requires rpp::details::is_header_included<rpp::details::never_tag, Type>
auto rpp::observable::never ()
 Creates rpp::specific_observable that emits no items and does not terminate.
 

Detailed Description

Creational operators are operators that create new observable.

See also
https://reactivex.io/documentation/operators.html#creating

Function Documentation

◆ create()

template<constraint::decayed_type Type, constraint::on_subscribe_fn< Type > OnSubscribeFn>
requires rpp::details::is_header_included<rpp::details::create_tag, Type, OnSubscribeFn>
auto rpp::observable::create ( OnSubscribeFn &&  on_subscribe)

Creates rpp::specific_observable with passed action as OnSubscribe.

Template Parameters
Typemanually specified type of value provided by this observable
Parameters
on_subscribeis action called after subscription on this observable
Returns
rpp::specific_observable with passed action
Examples:
rpp::source::create<int>([](const auto& sub)
{
sub.on_next(42);
})
.subscribe([](int v) { std::cout << v << std::endl; });
// Output: 42
int val = 42;
rpp::source::create<int>([val](const auto& sub)
{
sub.on_next(val);
})
.subscribe([](int v) { std::cout << v << std::endl; });
// Output: 42
rpp::source::create([](const rpp::dynamic_subscriber<int>& sub)
{
sub.on_next(42);
})
.subscribe([](int v) { std::cout << v << std::endl; });
// Output: 42
subscriber which uses dynamic_observer<T> to hide original callbacks
Definition: dynamic_subscriber.hpp:24
See also
https://reactivex.io/documentation/operators/create.html

◆ empty()

template<constraint::decayed_type Type>
requires rpp::details::is_header_included<rpp::details::empty_tag, Type>
auto rpp::observable::empty ( )

Creates rpp::specific_observable that emits no items but terminates normally.

Template Parameters
Typetype of value to specify observable
See also
https://reactivex.io/documentation/operators/empty-never-throw.html

◆ error()

template<constraint::decayed_type Type>
requires rpp::details::is_header_included<rpp::details::error_tag, Type>
auto rpp::observable::error ( const std::exception_ptr &  err)

Creates rpp::specific_observable that emits no items and terminates with an error.

Template Parameters
Typetype of value to specify observable
Parameters
errexception ptr to be sent to subscriber
See also
https://reactivex.io/documentation/operators/empty-never-throw.html

◆ from_callable()

template<memory_model memory_model>
requires rpp::details::is_header_included<rpp::details::from_tag, decltype(callable)>
auto rpp::observable::from_callable ( std::invocable<> auto &&  callable)

Creates rpp::specific_observable that calls provided callable and emits resulting value of this callable.

Template Parameters
memory_modelrpp::memory_model strategy used to handle callable
Example
rpp::source::from_callable([]() {return 49; }).subscribe([](int v) {std::cout << v << " "; });
// Output: 49
See also
https://reactivex.io/documentation/operators/from.html

◆ from_iterable()

template<memory_model memory_model, schedulers::constraint::scheduler TScheduler>
requires rpp::details::is_header_included<rpp::details::from_tag, TScheduler >
auto rpp::observable::from_iterable ( constraint::iterable auto &&  iterable,
const TScheduler &  scheduler 
)

Creates rpp::specific_observable that emits a items from provided iterable.

Template Parameters
memory_modelrpp::memory_model strategy used to handle provided iterable
Parameters
scheduleris scheduler used for scheduling of submissions: next item will be submitted to scheduler when previous one is executed
iterablecontainer with values which will be flattened
Examples:
std::vector<int> vals{ 1,2,3 };
rpp::source::from_iterable(vals).subscribe([](int v) {std::cout << v << " "; });
std::vector<int> vals{ 1,2,3 };
rpp::source::from_iterable<rpp::memory_model::use_shared>(vals).subscribe([](int v) {std::cout << v << " "; });
std::vector<int> vals{ 1,2,3 };
rpp::source::from_iterable(vals, rpp::schedulers::new_thread{}).as_blocking().subscribe([](int v) {std::cout << v << " "; });
scheduler which schedules execution of schedulables via queueing tasks to another thread with priorit...
Definition: new_thread_scheduler.hpp:32
See also
https://reactivex.io/documentation/operators/from.html

◆ interval() [1/2]

template<schedulers::constraint::scheduler TScheduler = schedulers::trampoline>
requires rpp::details::is_header_included<rpp::details::interval_tag, TScheduler>
auto rpp::observable::interval ( schedulers::duration  first_delay,
schedulers::duration  period,
const TScheduler &  scheduler 
)

Creates rpp::specific_observable which emits sequence of size_t every provided time interval with first emission after provided delay.

Parameters
first_delayperiod which would be used to delay first emission
periodperiod which would be used to delay emissions between each other
schedulerused for scheduling this periodic emissions
Returns
rpp::specific_observable which emits values with provided time_interval
Examples:
auto cur_time = std::chrono::high_resolution_clock::now();
rpp::source::interval(std::chrono::seconds{ 1}, std::chrono::seconds{ 2 })
.take(5)
.subscribe([cur_time](size_t v)
{
auto diff = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::high_resolution_clock::now() - cur_time).count();
std::cout << "Seconds since start " << diff << " value " << v << std::endl;
});
// Output: Seconds since start 1 value 0
// Seconds since start 3 value 1
// Seconds since start 5 value 2
// Seconds since start 7 value 3
// Seconds since start 9 value 4
See also
https://reactivex.io/documentation/operators/interval.html

◆ interval() [2/2]

template<schedulers::constraint::scheduler TScheduler = schedulers::trampoline>
requires rpp::details::is_header_included<rpp::details::interval_tag, TScheduler>
auto rpp::observable::interval ( schedulers::duration  period,
const TScheduler &  scheduler 
)

Creates rpp::specific_observable which emits sequence of size_t every provided time interval.

\warn First emission also scheduled and delayed with same interval

Parameters
periodperiod which would be used to delay emissions between each other
schedulerused for scheduling this periodic emissions
Returns
rpp::specific_observable which emits values with provided time_interval
Examples:
auto cur_time = std::chrono::high_resolution_clock::now();
rpp::source::interval(std::chrono::seconds{ 2 })
.take(5)
.subscribe([cur_time](size_t v)
{
auto diff = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::high_resolution_clock::now() - cur_time).count();
std::cout << "Seconds since start " << diff << " value " << v << std::endl;
});
// Output: Seconds since start 2 value 0
// Seconds since start 4 value 1
// Seconds since start 6 value 2
// Seconds since start 8 value 3
// Seconds since start 10 value 4
See also
https://reactivex.io/documentation/operators/interval.html

◆ just() [1/2]

template<memory_model memory_model, typename T , typename ... Ts>
requires (rpp::details::is_header_included<rpp::details::just_tag, T, Ts...> && (constraint::decayed_same_as<T, Ts> && ...))
auto rpp::observable::just ( const schedulers::constraint::scheduler auto &  scheduler,
T &&  item,
Ts &&...  items 
)

Creates rpp::specific_observable that emits a particular items and completes.

Template Parameters
memory_modelrpp::memory_model startegy used to handle provided items
Schedulertype of scheduler used for scheduling of submissions: next item will be submitted to scheduler when previous one is executed
Parameters
itemfirst value to be sent
itemsrest values to be sent
Returns
rpp::specific_observable with provided item
Examples:
rpp::source::just(42, 53, 10, 1).subscribe([](int v) { std::cout << v << std::endl; });
// Output: 42 53 10 1
std::array<int, 100> cheap_to_copy_1{};
std::array<int, 100> cheap_to_copy_2{};
rpp::source::just<rpp::memory_model::use_shared>(cheap_to_copy_1, cheap_to_copy_2).subscribe();
rpp::source::just(rpp::schedulers::new_thread{}, 42, 53).subscribe();
See also
https://reactivex.io/documentation/operators/just.html

◆ just() [2/2]

template<memory_model memory_model, typename T , typename ... Ts>
requires (rpp::details::is_header_included<rpp::details::just_tag, T, Ts...> && (constraint::decayed_same_as<T, Ts> && ...))
auto rpp::observable::just ( T &&  item,
Ts &&...  items 
)

Creates rpp::specific_observable that emits a particular items and completes.

Warning
this overloading uses trampoline scheduler as default
Template Parameters
memory_modelrpp::memory_model startegy used to handle provided items
Parameters
itemfirst value to be sent
itemsrest values to be sent
Returns
rpp::specific_observable with provided item
Examples:
rpp::source::just(42, 53, 10, 1).subscribe([](int v) { std::cout << v << std::endl; });
// Output: 42 53 10 1
std::array<int, 100> cheap_to_copy_1{};
std::array<int, 100> cheap_to_copy_2{};
rpp::source::just<rpp::memory_model::use_shared>(cheap_to_copy_1, cheap_to_copy_2).subscribe();
rpp::source::just(rpp::schedulers::new_thread{}, 42, 53).subscribe();
See also
https://reactivex.io/documentation/operators/just.html

◆ never()

template<constraint::decayed_type Type>
requires rpp::details::is_header_included<rpp::details::never_tag, Type>
auto rpp::observable::never ( )

Creates rpp::specific_observable that emits no items and does not terminate.

Template Parameters
Typetype of value to specify observable
See also
https://reactivex.io/documentation/operators/empty-never-throw.html