13#include <rpp/operators/lift.hpp>
14#include <rpp/operators/details/early_unsubscribe.hpp>
15#include <rpp/operators/details/serialized_subscriber.hpp>
16#include <rpp/operators/fwd/sample.hpp>
17#include <rpp/subscribers/constraints.hpp>
18#include <rpp/utils/spinlock.hpp>
23IMPLEMENTATION_FILE(sample_tag);
27template<constra
int::decayed_type Type>
30 using early_unsubscribe_state::early_unsubscribe_state;
32 std::mutex value_mutex{};
33 std::optional<Type> value{};
36template<constra
int::decayed_type Type>
41 utils::spinlock spinlock{};
46 template<
typename Value>
47 void operator()(Value&& value,
const auto&,
const std::shared_ptr<
sample_state<std::decay_t<Value>>>& state)
const
49 std::lock_guard lock{state->value_mutex};
50 state->value.emplace(std::forward<Value>(value));
58 void operator()(
const auto& subscriber,
const auto& state)
const
60 state->children_subscriptions.unsubscribe();
63 std::lock_guard lock{state->value_mutex};
64 if (state->value.has_value())
65 subscriber.on_next(std::move(state->value.value()));
67 subscriber.on_completed();
71template<constra
int::decayed_type Type, schedulers::constra
int::scheduler TScheduler>
74 schedulers::duration period;
77 template<constra
int::subscriber_of_type<Type> TSub>
78 auto operator()(TSub&& in_subscriber)
const
80 auto state = std::make_shared<sample_state_with_serialized_spinlock<Type>>(in_subscriber.get_subscription());
82 auto subscriber = make_serialized_subscriber(std::forward<TSub>(in_subscriber),
83 std::shared_ptr<utils::spinlock>{state, &state->spinlock});
85 scheduler.create_worker(state->children_subscriptions)
87 [period = period, subscriber = subscriber, state]() -> rpp::schedulers::optional_duration
89 std::optional<Type> extracted{};
91 std::lock_guard lock{state->value_mutex};
92 std::swap(extracted, state->value);
94 if (extracted.has_value())
95 subscriber.on_next(std::move(extracted.value()));
99 return create_subscriber_with_state<Type>(state->children_subscriptions,
103 std::move(subscriber),
Definition: early_unsubscribe.hpp:28
Definition: early_unsubscribe.hpp:19
Definition: sample.hpp:57
Definition: sample.hpp:45
Definition: sample.hpp:38
Definition: sample.hpp:29
Definition: sample.hpp:73