ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
Creational Operators

Creational operators are operators that create new observable. More...

Functions

auto rpp::operators::concat ()
 Make observable which would merge emissions from underlying observables but without overlapping (current observable completes THEN next started to emit its values)
 
template<constraint::memory_model MemoryModel, rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires (std::same_as<rpp::utils::extract_observable_type_t<TObservable>, rpp::utils::extract_observable_type_t<TObservables>> && ...)
auto rpp::source::concat (TObservable &&obs, TObservables &&... others)
 Make observable which would merge emissions from underlying observables but without overlapping (current observable completes THEN next started to emit its values)
 
template<constraint::memory_model MemoryModel, constraint::iterable Iterable>
requires constraint::observable<utils::iterable_value_t<Iterable>>
auto rpp::source::concat (Iterable &&iterable)
 Make observable which would merge emissions from underlying observables but without overlapping (current observable completes THEN next started to emit its values)
 
template<constraint::decayed_type Type, constraint::on_subscribe< Type > OnSubscribe>
auto rpp::source::create (OnSubscribe &&on_subscribe)
 Construct observable specialized with passed callback function. Most easiesest way to construct observable "on the fly" via lambda and etc.
 
template<std::invocable Factory>
requires rpp::constraint::observable<std::invoke_result_t<Factory>>
auto rpp::source::defer (Factory &&observable_factory)
 Creates rpp::observable that calls the specified observable factory to create an observable for each new observer that subscribes.
 
template<constraint::decayed_type Type>
auto rpp::source::empty ()
 Creates rpp::observable that emits no items but terminates normally.
 
template<constraint::decayed_type Type>
auto rpp::source::error (std::exception_ptr err)
 Creates rpp::observable that emits no items and terminates with an error.
 
template<constraint::memory_model MemoryModel, constraint::iterable Iterable, schedulers::constraint::scheduler TScheduler>
auto rpp::source::from_iterable (Iterable &&iterable, const TScheduler &scheduler)
 Creates observable that emits a items from provided iterable.
 
template<constraint::memory_model MemoryModel, schedulers::constraint::scheduler TScheduler, typename T, typename... Ts>
requires (constraint::decayed_same_as<T, Ts> && ...)
auto rpp::source::just (const TScheduler &scheduler, T &&item, Ts &&... items)
 Creates rpp::observable that emits a particular items and completes.
 
template<constraint::memory_model MemoryModel, typename T, typename... Ts>
requires (constraint::decayed_same_as<T, Ts> && ...)
auto rpp::source::just (T &&item, Ts &&... items)
 Creates rpp::observable that emits a particular items and completes.
 
template<constraint::memory_model MemoryModel, std::invocable<> Callable>
auto rpp::source::from_callable (Callable &&callable)
 Creates rpp::specific_observable that calls provided callable and emits resulting value of this callable.
 
template<schedulers::constraint::scheduler TScheduler>
auto rpp::source::interval (rpp::schedulers::duration initial, rpp::schedulers::duration period, TScheduler &&scheduler)
 Creates rpp::observable that emits a sequential integer every specified time interval, on the specified scheduler.
 
template<schedulers::constraint::scheduler TScheduler>
auto rpp::source::interval (rpp::schedulers::time_point initial, rpp::schedulers::duration period, TScheduler &&scheduler)
 Same rpp::source::interval but using a time_point as initial time instead of a duration.
 
template<schedulers::constraint::scheduler TScheduler>
auto rpp::source::interval (rpp::schedulers::duration period, TScheduler &&scheduler)
 Creates rpp::observable that emits a sequential integer every specified time interval, on the specified scheduler.
 
template<constraint::decayed_type Type>
auto rpp::source::never ()
 Creates rpp::observable that emits no items and does not terminate.
 
template<schedulers::constraint::scheduler TScheduler>
auto rpp::source::timer (rpp::schedulers::duration when, TScheduler &&scheduler)
 Creates rpp::observable that emits an integer after a given delay, on the specified scheduler.
 
template<schedulers::constraint::scheduler TScheduler>
auto rpp::source::timer (rpp::schedulers::time_point when, TScheduler &&scheduler)
 Same as rpp::source::timer but using a time_point as delay instead of a duration.
 

Detailed Description

Creational operators are operators that create new observable.

See also
https://reactivex.io/documentation/operators.html#creating

Function Documentation

◆ concat() [1/3]

auto rpp::operators::concat ( )
inline

Make observable which would merge emissions from underlying observables but without overlapping (current observable completes THEN next started to emit its values)

Actually it subscribes on first observable from emissions. When first observable completes, then it subscribes on second observable from emissions and etc...

Template Parameters
MemoryModelrpp::memory_model strategy used to handle provided observables
Note
#include <rpp/operators/concat.hpp>
Example
rpp::source::just(2).as_dynamic(),
rpp::source::just(1, 2, 3).as_dynamic())
| rpp::operators::subscribe([](int v) { std::cout << v << ", "; }, [](const std::exception_ptr&) {}, []() { std::cout << "completed\n"; });
// Output: 1, 2, 1, 2, 3, completed
See also
https://reactivex.io/documentation/operators/concat.html
Examples
concat.cpp.

◆ concat() [2/3]

template<constraint::memory_model MemoryModel, constraint::iterable Iterable>
requires constraint::observable<utils::iterable_value_t<Iterable>>
auto rpp::source::concat ( Iterable && iterable)

Make observable which would merge emissions from underlying observables but without overlapping (current observable completes THEN next started to emit its values)

Actually it subscribes on first observable from emissions. When first observable completes, then it subscribes on second observable from emissions and etc...

Parameters
iterableis container with observables to subscribe on
Template Parameters
MemoryModelrpp::memory_model strategy used to handle provided observables
Note
#include <rpp/operators/concat.hpp>
Example
auto observables = std::vector{rpp::source::just(1), rpp::source::just(2)};
rpp::source::concat<rpp::memory_model::use_shared>(observables).subscribe([](int v) { std::cout << v << ", "; }, [](const std::exception_ptr&) {}, []() { std::cout << "completed\n"; });
// Output: 1, 2, completed
See also
https://reactivex.io/documentation/operators/concat.html

◆ concat() [3/3]

template<constraint::memory_model MemoryModel, rpp::constraint::observable TObservable, rpp::constraint::observable... TObservables>
requires (std::same_as<rpp::utils::extract_observable_type_t<TObservable>, rpp::utils::extract_observable_type_t<TObservables>> && ...)
auto rpp::source::concat ( TObservable && obs,
TObservables &&... others )

Make observable which would merge emissions from underlying observables but without overlapping (current observable completes THEN next started to emit its values)

Actually it subscribes on first observable from emissions. When first observable completes, then it subscribes on second observable from emissions and etc...

Parameters
obsfirst observalbe to subscribe on
othersrest list of observables to subscribe on
Template Parameters
MemoryModelrpp::memory_model strategy used to handle provided observables
Note
#include <rpp/operators/concat.hpp>
Example
rpp::source::concat(rpp::source::just(1), rpp::source::just(2), rpp::source::just(1, 2, 3)).subscribe([](int v) { std::cout << v << ", "; }, [](const std::exception_ptr&) {}, []() { std::cout << "completed\n"; });
// Output: 1, 2, 1, 2, 3, completed
See also
https://reactivex.io/documentation/operators/concat.html
Examples
concat.cpp, and retry.cpp.

◆ create()

template<constraint::decayed_type Type, constraint::on_subscribe< Type > OnSubscribe>
auto rpp::source::create ( OnSubscribe && on_subscribe)

Construct observable specialized with passed callback function. Most easiesest way to construct observable "on the fly" via lambda and etc.

Warning
Be sure, that your callback doesn't violates observable rules: 1) observable must to emit emissions in serial way 2) observable must not to call any callbacks after termination events - on_error/on_completed
Keep in mind, obtained observer is non-copyable, but movable by default. So, prefer perfect-forwarding. In case of you need to copy observer, cast it it dynamic_observer via passing it as argument type or via as_dynamic() member function
Template Parameters
Typeis type of values observable would emit
OnSubscribeis callback function to implement core logic of observable
Examples:
rpp::source::create<int>([](const auto& sub) {
sub.on_next(42);
})
.subscribe([](int v) { std::cout << v << std::endl; });
// Output: 42
int val = 42;
rpp::source::create<int>([val](const auto& sub) {
sub.on_next(val);
})
.subscribe([](int v) { std::cout << v << std::endl; });
// Output: 42
See also
https://reactivex.io/documentation/operators/create.html
Examples
create.cpp, defer.cpp, delay.cpp, observe_on.cpp, ref_count.cpp, retry_when.cpp, and subscribe_on.cpp.

◆ defer()

template<std::invocable Factory>
requires rpp::constraint::observable<std::invoke_result_t<Factory>>
auto rpp::source::defer ( Factory && observable_factory)

Creates rpp::observable that calls the specified observable factory to create an observable for each new observer that subscribes.

Parameters
observable_factoryis function to create observable to subscribe on.
Example:
std::cout << "Observable factory called\n";
return rpp::source::from_iterable(std::array{ 1,2,3 }); })
.subscribe([](int v) { std::cout << v << "\n"; }, rpp::utils::rethrow_error_t{}, []() { std::cout << "On complete\n"; });
// Output: Observable factory called
// 1
// 2
// 3
// On complete
See also
https://reactivex.io/documentation/operators/defer.html
Examples
defer.cpp.

◆ empty()

template<constraint::decayed_type Type>
auto rpp::source::empty ( )

Creates rpp::observable that emits no items but terminates normally.

Template Parameters
Typetype of value to specify observable
See also
https://reactivex.io/documentation/operators/empty-never-throw.html
Examples
first.cpp, and last.cpp.

◆ error()

template<constraint::decayed_type Type>
auto rpp::source::error ( std::exception_ptr err)

Creates rpp::observable that emits no items and terminates with an error.

Template Parameters
Typetype of value to specify observable
Parameters
errexception ptr to be sent to subscriber
See also
https://reactivex.io/documentation/operators/empty-never-throw.html

◆ from_callable()

template<constraint::memory_model MemoryModel, std::invocable<> Callable>
auto rpp::source::from_callable ( Callable && callable)

Creates rpp::specific_observable that calls provided callable and emits resulting value of this callable.

Template Parameters
memory_modelrpp::memory_model strategy used to handle callable
Example
rpp::source::from_callable([]() { return 49; }).subscribe([](int v) { std::cout << v << " "; });
// Output: 49
See also
https://reactivex.io/documentation/operators/from.html
Examples
from.cpp, and readme.cpp.

◆ from_iterable()

template<constraint::memory_model MemoryModel, constraint::iterable Iterable, schedulers::constraint::scheduler TScheduler>
auto rpp::source::from_iterable ( Iterable && iterable,
const TScheduler & scheduler )

Creates observable that emits a items from provided iterable.

Template Parameters
memory_modelrpp::memory_model strategy used to handle provided iterable
Parameters
scheduleris scheduler used for scheduling of submissions: next item will be submitted to scheduler when previous one is executed
iterablecontainer with values which will be flattened
Examples:
std::vector<int> vals{1, 2, 3};
rpp::source::from_iterable(vals).subscribe([](int v) { std::cout << v << " "; });
std::vector<int> vals{1, 2, 3};
rpp::source::from_iterable<rpp::memory_model::use_shared>(vals).subscribe([](int v) { std::cout << v << " "; });
std::vector<int> vals{1, 2, 3};
rpp::source::from_iterable(vals, rpp::schedulers::immediate{}).subscribe([](int v) { std::cout << v << " "; });
rpp::source::from_iterable(vals, rpp::schedulers::current_thread{}).subscribe([](int v) { std::cout << v << " "; });
See also
https://reactivex.io/documentation/operators/from.html
Examples
defer.cpp, from.cpp, and take.cpp.

◆ interval() [1/3]

template<schedulers::constraint::scheduler TScheduler>
auto rpp::source::interval ( rpp::schedulers::duration initial,
rpp::schedulers::duration period,
TScheduler && scheduler )

Creates rpp::observable that emits a sequential integer every specified time interval, on the specified scheduler.

Parameters
initialduration before first emission
periodperiod between emitted values
schedulerthe scheduler to use for scheduling the items
Example:
rpp::source::interval(std::chrono::milliseconds(10), rpp::schedulers::immediate{})
[start = rpp::schedulers::clock_type::now()](size_t v) { std::cout << "emit " << v << " duration since start " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << "ms\n"; },
[]() { std::cout << "On complete\n"; });
// Output: Observable factory called
// emit 1 duration since start 0ms
// emit 2 duration since start 10ms
// emit 3 duration since start 20ms
// On complete
See also
https://reactivex.io/documentation/operators/interval.html
Examples
connect.cpp, interval.cpp, and take_until.cpp.

◆ interval() [2/3]

template<schedulers::constraint::scheduler TScheduler>
auto rpp::source::interval ( rpp::schedulers::duration period,
TScheduler && scheduler )

Creates rpp::observable that emits a sequential integer every specified time interval, on the specified scheduler.

Parameters
periodperiod between emitted values
schedulerthe scheduler to use for scheduling the items
Example:
rpp::source::interval(std::chrono::milliseconds(10), rpp::schedulers::immediate{})
[start = rpp::schedulers::clock_type::now()](size_t v) { std::cout << "emit " << v << " duration since start " << std::chrono::duration_cast<std::chrono::milliseconds>(rpp::schedulers::clock_type::now() - start).count() << "ms\n"; },
[]() { std::cout << "On complete\n"; });
// Output: Observable factory called
// emit 1 duration since start 0ms
// emit 2 duration since start 10ms
// emit 3 duration since start 20ms
// On complete
See also
https://reactivex.io/documentation/operators/interval.html

◆ interval() [3/3]

template<schedulers::constraint::scheduler TScheduler>
auto rpp::source::interval ( rpp::schedulers::time_point initial,
rpp::schedulers::duration period,
TScheduler && scheduler )

Same rpp::source::interval but using a time_point as initial time instead of a duration.

Parameters
initialtime_point before first emission
periodperiod between emitted values
schedulerthe scheduler to use for scheduling the items
See also
https://reactivex.io/documentation/operators/interval.html

◆ just() [1/2]

template<constraint::memory_model MemoryModel, schedulers::constraint::scheduler TScheduler, typename T, typename... Ts>
requires (constraint::decayed_same_as<T, Ts> && ...)
auto rpp::source::just ( const TScheduler & scheduler,
T && item,
Ts &&... items )

Creates rpp::observable that emits a particular items and completes.

Template Parameters
memory_modelrpp::memory_model startegy used to handle provided items
Schedulertype of scheduler used for scheduling of submissions: next item will be submitted to scheduler when previous one is executed
Parameters
itemfirst value to be sent
itemsrest values to be sent
Examples:
rpp::source::just(42, 53, 10, 1).subscribe([](int v) { std::cout << v << std::endl; });
// Output: 42 53 10 1
std::array<int, 100> expensive_to_copy_1{};
std::array<int, 100> expensive_to_copy_2{};
rpp::source::just<rpp::memory_model::use_shared>(expensive_to_copy_1, expensive_to_copy_2).subscribe([](const auto&) {});
See also
https://reactivex.io/documentation/operators/just.html
Examples
as_blocking.cpp, buffer.cpp, combine_latest.cpp, concat.cpp, debounce.cpp, distinct_until_changed.cpp, filter.cpp, first.cpp, group_by.cpp, just.cpp, last.cpp, map.cpp, merge.cpp, multicast.cpp, readme.cpp, reduce.cpp, ref_count.cpp, repeat.cpp, scan.cpp, skip.cpp, start_with.cpp, switch_on_next.cpp, take_last.cpp, take_while.cpp, thread_pool.cpp, throttle.cpp, timeout.cpp, window.cpp, window_toggle.cpp, and with_latest_from.cpp.

◆ just() [2/2]

template<constraint::memory_model MemoryModel, typename T, typename... Ts>
requires (constraint::decayed_same_as<T, Ts> && ...)
auto rpp::source::just ( T && item,
Ts &&... items )

Creates rpp::observable that emits a particular items and completes.

Warning
this overloading uses trampoline scheduler as default
Template Parameters
memory_modelrpp::memory_model strategy used to handle provided items
Parameters
itemfirst value to be sent
itemsrest values to be sent
Examples:
rpp::source::just(42, 53, 10, 1).subscribe([](int v) { std::cout << v << std::endl; });
// Output: 42 53 10 1
std::array<int, 100> expensive_to_copy_1{};
std::array<int, 100> expensive_to_copy_2{};
rpp::source::just<rpp::memory_model::use_shared>(expensive_to_copy_1, expensive_to_copy_2).subscribe([](const auto&) {});
See also
https://reactivex.io/documentation/operators/just.html

◆ never()

template<constraint::decayed_type Type>
auto rpp::source::never ( )

Creates rpp::observable that emits no items and does not terminate.

Template Parameters
Typetype of value to specify observable
See also
https://reactivex.io/documentation/operators/empty-never-throw.html
Examples
take_until.cpp.

◆ timer() [1/2]

template<schedulers::constraint::scheduler TScheduler>
auto rpp::source::timer ( rpp::schedulers::duration when,
TScheduler && scheduler )

Creates rpp::observable that emits an integer after a given delay, on the specified scheduler.

Parameters
whenduration from now when the value is emitted
schedulerthe scheduler to use for scheduling the items
See also
https://reactivex.io/documentation/operators/timer.html
Examples
retry_when.cpp.

◆ timer() [2/2]

template<schedulers::constraint::scheduler TScheduler>
auto rpp::source::timer ( rpp::schedulers::time_point when,
TScheduler && scheduler )

Same as rpp::source::timer but using a time_point as delay instead of a duration.

Parameters
whentime point when the value is emitted
schedulerthe scheduler to use for scheduling the items
See also
https://reactivex.io/documentation/operators/timer.html