12#include <rpp/sources/fwd.hpp>
14#include <rpp/defs.hpp>
15#include <rpp/observables/observable.hpp>
16#include <rpp/operators/map.hpp>
17#include <rpp/schedulers/current_thread.hpp>
18#include <rpp/utils/utils.hpp>
28 template<constra
int::decayed_type Container>
32 template<
typename... Ts>
36 : m_container{
new Container{std::forward<Ts>(items)...}}
43 auto begin()
const {
return std::cbegin(*m_container); }
45 auto end()
const {
return std::cend(*m_container); }
48 std::shared_ptr<Container> m_container{};
53 template<constra
int::decayed_type PackedContainer, constra
int::observer_strategy<utils::iterable_value_t<PackedContainer>> Strategy>
54 rpp::schedulers::optional_delay_from_now operator()(
const observer<utils::iterable_value_t<PackedContainer>, Strategy>& obs,
const PackedContainer& cont,
size_t& index)
const
58 auto itr = std::cbegin(cont);
59 auto end = std::cend(cont);
60 std::advance(itr,
static_cast<int64_t
>(index));
64 obs.on_next(utils::as_const(*itr));
65 if (std::next(itr) != end)
76 obs.on_error(std::current_exception());
82 template<constra
int::decayed_type PackedContainer, schedulers::constra
int::scheduler TScheduler>
86 using value_type = rpp::utils::iterable_value_t<PackedContainer>;
89 template<
typename... Args>
91 : container{std::forward<Args>(args)...}
92 , scheduler{scheduler}
97 RPP_NO_UNIQUE_ADDRESS PackedContainer container;
98 RPP_NO_UNIQUE_ADDRESS TScheduler scheduler;
100 template<constra
int::observer_strategy<utils::iterable_value_t<PackedContainer>> Strategy>
101 void subscribe(
observer<utils::iterable_value_t<PackedContainer>, Strategy>&& obs)
const
103 if constexpr (std::same_as<TScheduler, schedulers::immediate>)
107 for (
const auto& v : container)
109 if (obs.is_disposed())
119 obs.on_error(std::current_exception());
124 const auto worker = scheduler.create_worker();
131 auto make_from_iterable_observable(
const TScheduler& scheduler, Args&&... args)
139 template<
typename Callable>
140 auto operator()(Callable&& fn)
const
142 if constexpr (std::same_as<utils::decayed_invoke_result_t<Callable>,
void>)
178 auto from_iterable(Iterable&& iterable,
const TScheduler& scheduler )
181 return details::make_from_iterable_observable<container>(scheduler, std::forward<Iterable>(iterable));
205 template<constraint::memory_model MemoryModel , schedulers::constraint::scheduler TScheduler,
typename T,
typename... Ts>
206 requires (constraint::decayed_same_as<T, Ts> && ...)
207 auto just(
const TScheduler& scheduler, T&& item, Ts&&... items)
209 using inner_container = std::array<std::decay_t<T>,
sizeof...(Ts) + 1>;
210 using container = std::conditional_t<std::same_as<MemoryModel, rpp::memory_model::use_stack>, inner_container, details::shared_container<inner_container>>;
211 return details::make_from_iterable_observable<container>(scheduler, std::forward<T>(item), std::forward<Ts>(items)...);
235 template<constraint::memory_model MemoryModel ,
typename T,
typename... Ts>
236 requires (constraint::decayed_same_as<T, Ts> && ...)
237 auto just(T&& item, Ts&&... items)
239 using inner_container = std::array<std::decay_t<T>,
sizeof...(Ts) + 1>;
240 using container = std::conditional_t<std::same_as<MemoryModel, rpp::memory_model::use_stack>, inner_container, details::shared_container<inner_container>>;
260 template<constraint::memory_model MemoryModel , std::invocable<> Callable>
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Schedules execution of schedulables via queueing tasks to the caller thread with priority to time_poi...
Definition current_thread.hpp:86
Definition constraints.hpp:37
Definition memory_model.hpp:31
Definition constraints.hpp:31
auto just(const TScheduler &scheduler, T &&item, Ts &&... items)
Creates rpp::observable that emits a particular items and completes.
Definition from.hpp:201
auto from_callable(Callable &&callable)
Creates rpp::specific_observable that calls provided callable and emits resulting value of this calla...
Definition from.hpp:249
auto from_iterable(Iterable &&iterable, const TScheduler &scheduler)
Creates observable that emits a items from provided iterable.
Definition from.hpp:175
Definition disposables_strategy.hpp:29
Timepoint of next execution would be calculcated from NOW timpoint (time of returning from schedulabl...
Definition fwd.hpp:36