ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Loading...
Searching...
No Matches
timeout.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/lift.hpp> // required due to operator uses lift
14#include <rpp/operators/details/early_unsubscribe.hpp>
15#include <rpp/operators/details/serialized_subscriber.hpp>
16#include <rpp/operators/details/subscriber_with_state.hpp>
17#include <rpp/operators/fwd/timeout.hpp>
18#include <rpp/subscribers/constraints.hpp>
19#include <rpp/utils/exceptions.hpp>
20#include <rpp/sources/error.hpp>
21
22#include <rpp/utils/spinlock.hpp>
23
24#include <atomic>
25
26IMPLEMENTATION_FILE(timeout_tag);
27
28namespace rpp::details
29{
30template<constraint::observable FallbackObs>
32{
33 timeout_state(const FallbackObs& fallback_obs, const composite_subscription& subscription_of_subscriber)
34 : early_unsubscribe_state(subscription_of_subscriber)
35 , fallback_obs{fallback_obs} {}
36
37 FallbackObs fallback_obs;
38 std::atomic<schedulers::time_point> last_emission_time{};
39
40 static constexpr schedulers::time_point s_timeout_reached = schedulers::time_point::min();
41};
42
43template<constraint::observable FallbackObs, typename Worker>
45{
46 template<typename Value>
47 void operator()(Value&& v, const auto& subscriber, const std::shared_ptr<timeout_state<FallbackObs>>& state) const
48 {
49 if (state->last_emission_time.exchange(Worker::now(), std::memory_order_acq_rel) != timeout_state<FallbackObs>::s_timeout_reached)
50 subscriber.on_next(std::forward<Value>(v));
51 }
52};
53
56
57template<constraint::observable FallbackObs>
59{
60 using timeout_state<FallbackObs>::timeout_state;
61
62 // spinlock because most part of time there is only one thread would be active
63 utils::spinlock spinlock{};
64};
65
66template<constraint::decayed_type Type, constraint::observable_of_type<Type> FallbackObs, schedulers::constraint::scheduler TScheduler>
68{
69 schedulers::duration period;
70 FallbackObs fallback_obs;
71 TScheduler scheduler;
72
73 template<constraint::subscriber_of_type<Type> TSub>
74 auto operator()(TSub&& in_subscriber) const
75 {
76 auto state = std::make_shared<timeout_state_with_serialized_spinlock<FallbackObs>>(fallback_obs, in_subscriber.get_subscription());
77 // change subscriber to serialized to avoid manual using of mutex
78 auto subscriber = make_serialized_subscriber(std::forward<TSub>(in_subscriber),
79 std::shared_ptr<utils::spinlock>{state, &state->spinlock});
80
81 const auto worker = scheduler.create_worker(state->children_subscriptions);
82 state->last_emission_time.store(worker.now(), std::memory_order_relaxed);
83
84 const auto last_emission_time = state->last_emission_time.load(std::memory_order_relaxed);
85 worker.schedule(last_emission_time + period,
86 [period = period, prev_emission_time = last_emission_time, subscriber, state]() mutable -> schedulers::optional_duration
87 {
88 while (true)
89 {
90 // last emission time still same value -> timeout reached, else -> prev_emission_time
91 // would be update to actual emission time
92 if (state->last_emission_time.compare_exchange_strong(prev_emission_time,
94 std::memory_order_acq_rel))
95 return time_is_out(state, subscriber);
96
97 // if we still need to wait a bit more -> let's wait
98 if (const auto diff_to_schedule = (prev_emission_time + period) - decltype(worker)::now();
99 diff_to_schedule > rpp::schedulers::duration{0})
100 return diff_to_schedule;
101
102 // okay, we here because:
103 // 1) last_emission_time was not equal to prev_emission_time
104 // 2) last_emission_time + period before now -> we are still in timeout state
105 // 3) prev_emission_time updated to last_emission_time
106 // So we can return to begin
107 }
108 });
109
110 return create_subscriber_with_state<Type>(state->children_subscriptions,
111 timeout_on_next<FallbackObs, decltype(worker)>{},
114 std::move(subscriber),
115 std::move(state));
116 }
117
118private:
119 static schedulers::optional_duration time_is_out(const auto& state, const auto& subscriber)
120 {
121 state->children_subscriptions.unsubscribe();
122 state->fallback_obs.subscribe(subscriber);
123 return std::nullopt;
124 }
125};
126} // namespace rpp::details
rpp::subscription_base with ability to add some dependent subscriptions as a part of this one: in cas...
Definition: composite_subscription.hpp:30
Definition: early_unsubscribe.hpp:39
Definition: early_unsubscribe.hpp:28
Definition: early_unsubscribe.hpp:19
Definition: timeout.hpp:68
Definition: timeout.hpp:45
Definition: timeout.hpp:32