13#include <rpp/memory_model.hpp>
14#include <rpp/schedulers/trampoline_scheduler.hpp>
15#include <rpp/sources/create.hpp>
16#include <rpp/sources/fwd.hpp>
17#include <rpp/utils/utilities.hpp>
18#include <rpp/operators/map.hpp>
19#include <rpp/utils/function_traits.hpp>
20#include <rpp/defs.hpp>
// NOTE(review): presumably these register the just_tag / from_tag markers that the
// rpp::details::is_header_included<...> constraints on just()/from_iterable()/from_callable()
// check for -- confirm against the IMPLEMENTATION_FILE macro definition (not visible here).
26IMPLEMENTATION_FILE(just_tag);
27IMPLEMENTATION_FILE(from_tag);
29namespace rpp::observable::details
// Overload taking the packed iterable as a plain `const T&`; the trailing return type
// is a deduced const reference.
// NOTE(review): the template header that declares T and the function body are not
// present in this listing; presumably the body returns `v` itself -- confirm against
// the original header.
32auto extract_iterable_from_packed(
const T & v) ->
const auto&
// Overload taking the packed iterable as a `const std::shared_ptr<T>&`; the trailing
// return type is a deduced const reference.
// NOTE(review): the template header that declares T and the function body are not
// present in this listing; presumably the body dereferences the pointer to reach the
// pointed-to iterable -- confirm against the original header.
38auto extract_iterable_from_packed(
const std::shared_ptr<T> & v) ->
const auto&
// Pushes the elements of `iterable` to `subscriber` through on_next() and then calls
// on_completed(). Two strategies are selected at compile time from the scheduler type.
//
// iterable   - packed iterable; unwrapped via extract_iterable_from_packed().
// scheduler  - scheduler used to pick the strategy and, in the worker path, to create the worker.
// subscriber - receiver of on_next / on_completed / on_error.
//
// NOTE(review): brace-only lines and several statements (the `else` branches, the
// try/catch around the scheduled body, the index update and the final return of the
// lambda) are missing from this listing -- confirm the control flow against the
// original header before relying on these notes.
43void iterate(
const auto& iterable,
44 const schedulers::constraint::scheduler
auto& scheduler,
45 constraint::subscriber
auto&& subscriber)
// Path taken when the (decayed) scheduler type is schedulers::immediate:
// a plain loop in the calling context, no worker involved.
47 if constexpr (constraint::decayed_same_as<
decltype(scheduler), schedulers::immediate>)
49 for (
const auto& v : extract_iterable_from_packed(iterable))
// Elements are delivered only while the subscriber is still subscribed.
51 if (subscriber.is_subscribed())
52 subscriber.on_next(utils::as_const(v));
56 subscriber.on_completed();
// Path for any other scheduler: a worker is created from the subscriber's subscription
// and the emission is expressed as a schedulable callable.
60 auto worker = scheduler.create_worker(subscriber.get_subscription());
// The lambda owns a copy of the packed iterable, the forwarded subscriber and a
// position counter `index` that starts at 0; it is `mutable` so that state can change
// between invocations.
61 worker.schedule([iterable=iterable,
62 subscriber=std::forward<
decltype(subscriber)>(subscriber),
63 index =
size_t{0}]() mutable-> schedulers::optional_duration
67 const auto& extracted_iterable = extract_iterable_from_packed(iterable);
68 const auto end = std::cend(extracted_iterable);
69 auto itr = std::cbegin(extracted_iterable);
// The iterator is rebuilt from cbegin() and advanced by `index` on every invocation
// instead of being stored in the lambda.
71 std::advance(itr,
static_cast<int64_t
>(index));
75 subscriber.on_next(utils::as_const(*itr));
// When the emitted element was not the last one a duration is returned;
// presumably this asks the worker to run the callable again -- TODO confirm.
76 if (std::next(itr) != end)
79 return schedulers::duration{};
83 subscriber.on_completed();
// Forwards the in-flight exception to the subscriber; presumably sits inside a
// catch handler whose line is not visible in this listing.
87 subscriber.on_error(std::current_exception());
// Builds a `Container` from `items` by brace-initialization with perfectly forwarded
// arguments. When memory_model is memory_model::use_stack the Container is returned by
// value; the other visible return hands back a std::shared_ptr<Container> owning a
// heap-allocated Container built from the same arguments.
// NOTE(review): the braces and, presumably, an `else` between the two returns are
// missing from this listing.
97template<memory_model memory_model, constraint::iterable Container,
typename ...Ts>
98auto pack_to_container(Ts&& ...items)
100 if constexpr (memory_model == memory_model::use_stack)
101 return Container{std::forward<Ts>(items)...};
104 return std::shared_ptr<Container>(
new Container{std::forward<Ts>(items)...});
// Packs a variadic list of items into a std::array<T, sizeof...(Ts)> by delegating to
// pack_to_container with the same memory_model; the items are perfectly forwarded.
107template<memory_model memory_model, constraint::decayed_type T,
typename ...Ts>
108auto pack_variadic(Ts&& ...items)
110 return pack_to_container<memory_model, std::array<T,
sizeof...(Ts)>>(std::forward<Ts>(items)...);
// Callable that stores a packed iterable together with a scheduler and, when invoked
// with a subscriber, hands all three to details::iterate().
// NOTE(review): the class/struct header and access-specifier lines are missing from
// this listing, and the word `constraint` is split across two lines in two places
// (extraction artifact) -- confirm against the original header.
113template<
typename PackedIterable, schedulers::constra
int::scheduler TScheduler>
// Constructor that copies the packed iterable.
117 iterate_impl(
const PackedIterable& iterable,
const TScheduler& scheduler)
118 : m_iterable{iterable}
119 , m_scheduler{scheduler} {}
// Constructor that moves the packed iterable in.
121 iterate_impl(PackedIterable&& iterable,
const TScheduler& scheduler)
122 : m_iterable{std::move(iterable)}
123 , m_scheduler{scheduler} {}
// Const call operator: forwards the subscriber and passes the stored members to iterate().
125 template<constra
int::subscriber TSub>
126 void operator()(TSub&& subscriber)
const
128 details::iterate(m_iterable, m_scheduler, std::forward<TSub>(subscriber));
132 PackedIterable m_iterable;
// NOTE(review): RPP_NO_UNIQUE_ADDRESS presumably expands to a no_unique_address-style
// attribute so that an empty scheduler takes no storage -- confirm in rpp/defs.hpp.
133 RPP_NO_UNIQUE_ADDRESS TScheduler m_scheduler;
137namespace rpp::observable
// Builds an observable of std::decay_t<T> via create<>() from the given items: the items
// are packed with details::pack_variadic under the chosen memory_model and wrapped,
// together with `scheduler`, in a details::iterate_impl.
//
// scheduler   - scheduler handed to iterate_impl.
// item, items - values to pack; the requires-clause demands that every Ts decays to the
//               same type as T and that is_header_included<just_tag, ...> holds.
161template<memory_model memory_model ,
typename T,
typename ...Ts>
162auto just(
const schedulers::constraint::scheduler auto& scheduler, T&& item, Ts&& ...items)
requires (rpp::details::is_header_included<rpp::details::just_tag, T, Ts...> && (constraint::decayed_same_as<T, Ts> && ...))
164 return create<std::decay_t<T>>(
details::iterate_impl{details::pack_variadic<memory_model, std::decay_t<T>>(std::forward<T>(item), std::forward<Ts>(items)...), scheduler });
// Convenience overload without an explicit scheduler: forwards the items to the
// scheduler-taking just<memory_model>() with a default-constructed schedulers::trampoline.
// Carries the same requires-clause as the scheduler overload.
189template<memory_model memory_model ,
typename T,
typename ...Ts>
190auto just(T&& item, Ts&& ...items)
requires (rpp::details::is_header_included<rpp::details::just_tag, T, Ts...> && (constraint::decayed_same_as<T, Ts> && ...))
192 return just<memory_model>(schedulers::trampoline{}, std::forward<T>(item), std::forward<Ts>(items)...);
// Creates an observable that emits the items of the provided iterable.
// The iterable is forwarded into details::pack_to_container (by value or behind a
// shared_ptr, depending on memory_model) and wrapped with `scheduler` in a
// details::iterate_impl; the observable's value type is utils::iterable_value_t of the
// decayed iterable type.
//
// iterable  - source of the items; perfectly forwarded into the packed container.
// scheduler - scheduler handed to iterate_impl.
215template<memory_model memory_model , schedulers::constraint::scheduler TScheduler >
216auto from_iterable(constraint::iterable
auto&& iterable,
const TScheduler& scheduler )
requires rpp::details::is_header_included<rpp::details::from_tag, TScheduler >
218 using Container = std::decay_t<
decltype(iterable)>;
219 return create<utils::iterable_value_t<Container>>(details::iterate_impl{ details::pack_to_container<memory_model, Container>(std::forward<
decltype(iterable)>(iterable)), scheduler });
// Creates an observable that calls the provided callable and emits its result.
// The callable itself is first wrapped in a just<memory_model>() observable and then
// mapped through a lambda that invokes it.
//
// callable - nullary invocable; perfectly forwarded into just().
// NOTE(review): the braces and, presumably, an `else` between the two returns are
// missing from this listing.
238template<memory_model memory_model >
239auto from_callable(std::invocable<>
auto&& callable)
requires rpp::details::is_header_included<rpp::details::from_tag,
decltype(callable)>
241 auto obs = just<memory_model>(std::forward<
decltype(callable)>(callable));
// A callable whose (decayed) result type is void has nothing to emit, so it is invoked
// for its effect and utils::none{} is emitted in place of a value.
243 if constexpr (std::same_as<utils::decayed_invoke_result_t<
decltype(callable)>,
void>)
244 return std::move(obs).map([](
const auto& fn) { fn();
return utils::none{};});
// Otherwise the value returned by the callable is what gets emitted.
246 return std::move(obs).map([](
const auto& fn) {
return fn(); });
Definition: constraints.hpp:31
auto from_callable(std::invocable<> auto &&callable)
Creates rpp::specific_observable that calls the provided callable and emits the resulting value of this callable.
Definition: from.hpp:227
auto from_iterable(constraint::iterable auto &&iterable, const TScheduler &scheduler)
Creates rpp::specific_observable that emits the items from the provided iterable.
Definition: from.hpp:207