ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
from.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9
10#pragma once
11
12#include <rpp/sources/fwd.hpp>
13
14#include <rpp/defs.hpp>
15#include <rpp/observables/observable.hpp>
16#include <rpp/operators/map.hpp>
17#include <rpp/schedulers/current_thread.hpp>
18#include <rpp/utils/utils.hpp>
19
20#include <array>
21#include <exception>
22#include <memory>
23#include <type_traits>
24#include <utility>
25
26namespace rpp::details
27{
// Copyable handle over a Container that is allocated with `new` and owned through a
// std::shared_ptr: the defaulted copy constructor copies the pointer, not the Container.
// NOTE(review): this listing jumps from line 28 to 30 and from 32 to 34, so the class-head
// line (the type is shared_container, per its constructors) and one line between the
// constructor's template header and its declaration are not visible here.
28 template<constraint::decayed_type Container>
30 {
31 public:
// Builds the Container in place by brace-initialising it from the forwarded items.
32 template<typename... Ts>
34 explicit shared_container(Ts&&... items)
35 // raw "new" call to avoid extra copy/moves for items
36 : m_container{new Container{std::forward<Ts>(items)...}}
37 {
38 }
39
// Copy and move are defaulted, i.e. they operate on the shared_ptr member only.
40 shared_container(const shared_container&) = default;
41 shared_container(shared_container&&) noexcept = default;
42
// Read-only iteration: both accessors go through std::cbegin/std::cend of the stored Container.
43 auto begin() const { return std::cbegin(*m_container); }
44
45 auto end() const { return std::cend(*m_container); }
46
47 private:
// Owning pointer to the Container; copies of this object point at the same instance.
48 std::shared_ptr<Container> m_container{};
49 };
50
52 {
53 template<constraint::decayed_type PackedContainer, constraint::observer_strategy<utils::iterable_value_t<PackedContainer>> Strategy>
54 rpp::schedulers::optional_delay_from_now operator()(const observer<utils::iterable_value_t<PackedContainer>, Strategy>& obs, const PackedContainer& cont, size_t& index) const
55 {
56 try
57 {
58 auto itr = std::cbegin(cont);
59 auto end = std::cend(cont);
60 std::advance(itr, static_cast<int64_t>(index));
61
62 if (itr != end)
63 {
64 obs.on_next(utils::as_const(*itr));
65 if (std::next(itr) != end) // it was not last
66 {
67 ++index;
68 return schedulers::delay_from_now{}; // re-schedule this
69 }
70 }
71
72 obs.on_completed();
73 }
74 catch (...)
75 {
76 obs.on_error(std::current_exception());
77 }
78 return std::nullopt;
79 }
80 };
81
// Observable strategy that stores a packed container together with a scheduler and emits
// the container's elements to every subscriber.
// NOTE(review): listing lines 83 and 87 are absent from this listing, so the class-head line
// (the type is from_iterable_strategy, per its constructor) and whatever followed the
// value_type alias are not visible here.
82 template<constraint::decayed_type PackedContainer, schedulers::constraint::scheduler TScheduler>
84 {
85 public:
// Element type of the packed container, as reported by rpp::utils::iterable_value_t.
86 using value_type = rpp::utils::iterable_value_t<PackedContainer>;
88
// Copies the scheduler and brace-initialises the container member from the forwarded args.
89 template<typename... Args>
90 from_iterable_strategy(const TScheduler& scheduler, Args&&... args)
91 : container{std::forward<Args>(args)...}
92 , scheduler{scheduler}
93 {
94 }
95
96
97 RPP_NO_UNIQUE_ADDRESS PackedContainer container;
98 RPP_NO_UNIQUE_ADDRESS TScheduler scheduler;
99
// Delivers the container's elements to obs.
// With schedulers::immediate the loop runs inline inside this call; for any other scheduler
// the work is handed to a worker created from the stored scheduler.
100 template<constraint::observer_strategy<utils::iterable_value_t<PackedContainer>> Strategy>
101 void subscribe(observer<utils::iterable_value_t<PackedContainer>, Strategy>&& obs) const
102 {
103 if constexpr (std::same_as<TScheduler, schedulers::immediate>)
104 {
105 try
106 {
107 for (const auto& v : container)
108 {
// Stop without emitting on_completed once the observer reports it is disposed.
109 if (obs.is_disposed())
110 return;
111
112 obs.on_next(v);
113 }
114
115 obs.on_completed();
116 }
117 catch (...)
118 {
// Anything thrown during iteration or emission is forwarded to the observer.
119 obs.on_error(std::current_exception());
120 }
121 }
122 else
123 {
// Schedule from_iterable_schedulable with the moved observer, the container and a
// value-initialised (zero) start index.
124 const auto worker = scheduler.create_worker();
125 worker.schedule(from_iterable_schedulable{}, std::move(obs), container, size_t{});
126 }
127 }
128 };
129
// Factory helper: takes a scheduler plus the arguments used to build the packed container.
// NOTE(review): listing line 133 is absent, so the beginning of the return statement is not
// visible. The remaining text shows a
// details::from_iterable_strategy<std::decay_t<PackedContainer>, TScheduler> as the last
// template argument of the constructed type, brace-initialised with the scheduler and the
// forwarded args.
130 template<typename PackedContainer, schedulers::constraint::scheduler TScheduler, typename... Args>
131 auto make_from_iterable_observable(const TScheduler& scheduler, Args&&... args)
132 {
134 details::from_iterable_strategy<std::decay_t<PackedContainer>, TScheduler>>{scheduler, std::forward<Args>(args)...};
135 }
136
138 {
139 template<typename Callable>
140 auto operator()(Callable&& fn) const
141 {
142 if constexpr (std::same_as<utils::decayed_invoke_result_t<Callable>, void>)
143 {
144 fn();
145 return rpp::utils::none{};
146 }
147 else
148 {
149 return fn();
150 }
151 }
152 };
153} // namespace rpp::details
154
155namespace rpp::source
156{
177 template<constraint::memory_model MemoryModel /* = memory_model::use_stack*/, constraint::iterable Iterable, schedulers::constraint::scheduler TScheduler /* = rpp::schedulers::defaults::iteration_scheduler*/>
178 auto from_iterable(Iterable&& iterable, const TScheduler& scheduler /* = TScheduler{}*/)
179 {
180 using container = std::conditional_t<std::same_as<MemoryModel, rpp::memory_model::use_stack>, std::decay_t<Iterable>, details::shared_container<std::decay_t<Iterable>>>;
181 return details::make_from_iterable_observable<container>(scheduler, std::forward<Iterable>(iterable));
182 }
183
205 template<constraint::memory_model MemoryModel /* = memory_model::use_stack */, schedulers::constraint::scheduler TScheduler, typename T, typename... Ts>
206 requires (constraint::decayed_same_as<T, Ts> && ...)
207 auto just(const TScheduler& scheduler, T&& item, Ts&&... items)
208 {
209 using inner_container = std::array<std::decay_t<T>, sizeof...(Ts) + 1>;
210 using container = std::conditional_t<std::same_as<MemoryModel, rpp::memory_model::use_stack>, inner_container, details::shared_container<inner_container>>;
211 return details::make_from_iterable_observable<container>(scheduler, std::forward<T>(item), std::forward<Ts>(items)...);
212 }
213
235 template<constraint::memory_model MemoryModel /* = memory_model::use_stack */, typename T, typename... Ts>
236 requires (constraint::decayed_same_as<T, Ts> && ...)
237 auto just(T&& item, Ts&&... items)
238 {
239 using inner_container = std::array<std::decay_t<T>, sizeof...(Ts) + 1>;
240 using container = std::conditional_t<std::same_as<MemoryModel, rpp::memory_model::use_stack>, inner_container, details::shared_container<inner_container>>;
241 return details::make_from_iterable_observable<container>(rpp::schedulers::defaults::iteration_scheduler{}, std::forward<T>(item), std::forward<Ts>(items)...);
242 }
243
260 template<constraint::memory_model MemoryModel /* = memory_model::use_stack */, std::invocable<> Callable>
261 auto from_callable(Callable&& callable)
262 {
263 return just<MemoryModel>(std::forward<Callable>(callable)) | rpp::operators::map(details::from_callable_invoke{});
264 }
265} // namespace rpp::source
Definition from.hpp:30
Base class for any observable used in RPP. It handles core callbacks of observable.
Definition observable.hpp:38
Base class for any observer used in RPP. It handles core callbacks of observers. Objects of this clas...
Definition observer.hpp:172
Schedules execution of schedulables via queueing tasks to the caller thread with priority to time_poi...
Definition current_thread.hpp:86
Definition constraints.hpp:37
Definition memory_model.hpp:31
Definition constraints.hpp:31
auto just(const TScheduler &scheduler, T &&item, Ts &&... items)
Creates an rpp::observable that emits the provided items and then completes.
Definition from.hpp:201
auto from_callable(Callable &&callable)
Creates rpp::specific_observable that calls provided callable and emits resulting value of this calla...
Definition from.hpp:249
auto from_iterable(Iterable &&iterable, const TScheduler &scheduler)
Creates an observable that emits the items from the provided iterable.
Definition from.hpp:175
auto map(Fn &&callable)
Transforms the items emitted by an Observable via applying a function to each item and emitting resul...
Definition map.hpp:94
Definition from.hpp:138
Definition disposables_strategy.hpp:29
Timepoint of next execution would be calculated from NOW timepoint (time of returning from schedulabl...
Definition fwd.hpp:36
Definition utils.hpp:25