13#include <rpp/operators/lift.hpp>
14#include <rpp/operators/details/early_unsubscribe.hpp>
15#include <rpp/operators/details/serialized_subscriber.hpp>
16#include <rpp/operators/details/subscriber_with_state.hpp>
17#include <rpp/operators/fwd/timeout.hpp>
18#include <rpp/subscribers/constraints.hpp>
19#include <rpp/utils/exceptions.hpp>
20#include <rpp/sources/error.hpp>
22#include <rpp/utils/spinlock.hpp>
26IMPLEMENTATION_FILE(timeout_tag);
30template<constra
int::observable FallbackObs>
35 , fallback_obs{fallback_obs} {}
37 FallbackObs fallback_obs;
38 std::atomic<schedulers::time_point> last_emission_time{};
40 static constexpr schedulers::time_point s_timeout_reached = schedulers::time_point::min();
43template<constra
int::observable FallbackObs,
typename Worker>
46 template<
typename Value>
50 subscriber.on_next(std::forward<Value>(v));
57template<constra
int::observable FallbackObs>
63 utils::spinlock spinlock{};
66template<constra
int::decayed_type Type, constra
int::observable_of_type<Type> FallbackObs, schedulers::constra
int::scheduler TScheduler>
69 schedulers::duration period;
70 FallbackObs fallback_obs;
73 template<constra
int::subscriber_of_type<Type> TSub>
74 auto operator()(TSub&& in_subscriber)
const
76 auto state = std::make_shared<timeout_state_with_serialized_spinlock<FallbackObs>>(fallback_obs, in_subscriber.get_subscription());
78 auto subscriber = make_serialized_subscriber(std::forward<TSub>(in_subscriber),
79 std::shared_ptr<utils::spinlock>{state, &state->spinlock});
81 const auto worker = scheduler.create_worker(state->children_subscriptions);
82 state->last_emission_time.store(worker.now(), std::memory_order_relaxed);
84 const auto last_emission_time = state->last_emission_time.load(std::memory_order_relaxed);
85 worker.schedule(last_emission_time + period,
86 [period = period, prev_emission_time = last_emission_time, subscriber, state]()
mutable -> schedulers::optional_duration
92 if (state->last_emission_time.compare_exchange_strong(prev_emission_time,
94 std::memory_order_acq_rel))
95 return time_is_out(state, subscriber);
98 if (
const auto diff_to_schedule = (prev_emission_time + period) -
decltype(worker)::now();
99 diff_to_schedule > rpp::schedulers::duration{0})
100 return diff_to_schedule;
110 return create_subscriber_with_state<Type>(state->children_subscriptions,
114 std::move(subscriber),
119 static schedulers::optional_duration time_is_out(
const auto& state,
const auto& subscriber)
121 state->children_subscriptions.unsubscribe();
122 state->fallback_obs.subscribe(subscriber);
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
Definition: early_unsubscribe.hpp:39
Definition: early_unsubscribe.hpp:28
Definition: early_unsubscribe.hpp:19
Definition: timeout.hpp:68
Definition: timeout.hpp:45
Definition: timeout.hpp:59
Definition: timeout.hpp:32