ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Loading...
Searching...
No Matches
from.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/memory_model.hpp>
14#include <rpp/schedulers/trampoline_scheduler.hpp>
15#include <rpp/sources/create.hpp>
16#include <rpp/sources/fwd.hpp>
17#include <rpp/utils/utilities.hpp>
18#include <rpp/operators/map.hpp>
19#include <rpp/utils/function_traits.hpp>
20#include <rpp/defs.hpp>
21
22
23#include <array>
24#include <type_traits>
25
26IMPLEMENTATION_FILE(just_tag);
27IMPLEMENTATION_FILE(from_tag);
28
29namespace rpp::observable::details
30{
31template<typename T>
32auto extract_iterable_from_packed(const T & v) -> const auto&
33{
34 return v;
35}
36
37template<typename T>
38auto extract_iterable_from_packed(const std::shared_ptr<T> & v) -> const auto&
39{
40 return *v;
41}
42
43void iterate(const auto& iterable,
44 const schedulers::constraint::scheduler auto& scheduler,
45 constraint::subscriber auto&& subscriber)
46{
47 if constexpr (constraint::decayed_same_as<decltype(scheduler), schedulers::immediate>)
48 {
49 for (const auto& v : extract_iterable_from_packed(iterable))
50 {
51 if (subscriber.is_subscribed())
52 subscriber.on_next(utils::as_const(v));
53 else
54 return;
55 }
56 subscriber.on_completed();
57 }
58 else
59 {
60 auto worker = scheduler.create_worker(subscriber.get_subscription());
61 worker.schedule([iterable=iterable,
62 subscriber=std::forward<decltype(subscriber)>(subscriber),
63 index = size_t{0}]() mutable-> schedulers::optional_duration
64 {
65 try
66 {
67 const auto& extracted_iterable = extract_iterable_from_packed(iterable);
68 const auto end = std::cend(extracted_iterable);
69 auto itr = std::cbegin(extracted_iterable);
70
71 std::advance(itr, static_cast<int64_t>(index));
72
73 if (itr != end)
74 {
75 subscriber.on_next(utils::as_const(*itr));
76 if (std::next(itr) != end) // it was not last
77 {
78 ++index;
79 return schedulers::duration{}; // re-schedule this
80 }
81 }
82
83 subscriber.on_completed();
84 }
85 catch(...)
86 {
87 subscriber.on_error(std::current_exception());
88 }
89 return std::nullopt;
90
91 });
92 }
93}
94
95
96
97template<memory_model memory_model, constraint::iterable Container, typename ...Ts>
98auto pack_to_container(Ts&& ...items)
99{
100 if constexpr (memory_model == memory_model::use_stack)
101 return Container{std::forward<Ts>(items)...};
102 else
103 // raw new call to avoid extra copy/moves for items
104 return std::shared_ptr<Container>(new Container{std::forward<Ts>(items)...});
105}
106
107template<memory_model memory_model, constraint::decayed_type T, typename ...Ts>
108auto pack_variadic(Ts&& ...items)
109{
110 return pack_to_container<memory_model, std::array<T, sizeof...(Ts)>>(std::forward<Ts>(items)...);
111}
112
113template<typename PackedIterable, schedulers::constraint::scheduler TScheduler>
115{
116public:
117 iterate_impl(const PackedIterable& iterable, const TScheduler& scheduler)
118 : m_iterable{iterable}
119 , m_scheduler{scheduler} {}
120
121 iterate_impl(PackedIterable&& iterable, const TScheduler& scheduler)
122 : m_iterable{std::move(iterable)}
123 , m_scheduler{scheduler} {}
124
125 template<constraint::subscriber TSub>
126 void operator()(TSub&& subscriber) const
127 {
128 details::iterate(m_iterable, m_scheduler, std::forward<TSub>(subscriber));
129 }
130
131private:
132 PackedIterable m_iterable;
133 RPP_NO_UNIQUE_ADDRESS TScheduler m_scheduler;
134};
135} // namespace rpp::observable::details
136
137namespace rpp::observable
138{
161template<memory_model memory_model /* = memory_model::use_stack */, typename T, typename ...Ts>
162auto just(const schedulers::constraint::scheduler auto& scheduler, T&& item, Ts&& ...items) requires (rpp::details::is_header_included<rpp::details::just_tag, T, Ts...> && (constraint::decayed_same_as<T, Ts> && ...))
163{
164 return create<std::decay_t<T>>(details::iterate_impl{details::pack_variadic<memory_model, std::decay_t<T>>(std::forward<T>(item), std::forward<Ts>(items)...), scheduler });
165}
166
189template<memory_model memory_model /* = memory_model::use_stack */, typename T, typename ...Ts>
190auto just(T&& item, Ts&& ...items) requires (rpp::details::is_header_included<rpp::details::just_tag, T, Ts...> && (constraint::decayed_same_as<T, Ts> && ...))
191{
192 return just<memory_model>(schedulers::trampoline{}, std::forward<T>(item), std::forward<Ts>(items)...);
193}
194
215template<memory_model memory_model /* = memory_model::use_stack */, schedulers::constraint::scheduler TScheduler /* = schedulers::trampoline */>
216auto from_iterable(constraint::iterable auto&& iterable, const TScheduler& scheduler /* = TScheduler{} */) requires rpp::details::is_header_included<rpp::details::from_tag, TScheduler >
217{
218 using Container = std::decay_t<decltype(iterable)>;
219 return create<utils::iterable_value_t<Container>>(details::iterate_impl{ details::pack_to_container<memory_model, Container>(std::forward<decltype(iterable)>(iterable)), scheduler });
220}
221
238template<memory_model memory_model /* = memory_model::use_stack */>
239auto from_callable(std::invocable<> auto&& callable) requires rpp::details::is_header_included<rpp::details::from_tag, decltype(callable)>
240{
241 auto obs = just<memory_model>(std::forward<decltype(callable)>(callable));
242
243 if constexpr (std::same_as<utils::decayed_invoke_result_t<decltype(callable)>, void>)
244 return std::move(obs).map([](const auto& fn) { fn(); return utils::none{};});
245 else
246 return std::move(obs).map([](const auto& fn) { return fn(); });
247}
248} // namespace rpp::observable
Definition: constraints.hpp:31
auto from_callable(std::invocable<> auto &&callable)
Creates rpp::specific_observable that calls provided callable and emits resulting value of this calla...
Definition: from.hpp:227
auto from_iterable(constraint::iterable auto &&iterable, const TScheduler &scheduler)
Creates rpp::specific_observable that emits a items from provided iterable.
Definition: from.hpp:207