ReactivePlusPlus
One more implementation of ReactiveX approach in C++ with care about performance and templates in mind
 
Loading...
Searching...
No Matches
sample.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2022 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/operators/lift.hpp> // required due to operator uses lift
14#include <rpp/operators/details/early_unsubscribe.hpp>
15#include <rpp/operators/details/serialized_subscriber.hpp>
16#include <rpp/operators/fwd/sample.hpp>
17#include <rpp/subscribers/constraints.hpp>
18#include <rpp/utils/spinlock.hpp>
19
20#include <mutex>
21#include <optional>
22
23IMPLEMENTATION_FILE(sample_tag);
24
25namespace rpp::details
26{
27template<constraint::decayed_type Type>
29{
30 using early_unsubscribe_state::early_unsubscribe_state;
31
32 std::mutex value_mutex{};
33 std::optional<Type> value{};
34};
35
36template<constraint::decayed_type Type>
38{
39 using sample_state<Type>::sample_state;
40
41 utils::spinlock spinlock{};
42};
43
45{
46 template<typename Value>
47 void operator()(Value&& value, const auto&, const std::shared_ptr<sample_state<std::decay_t<Value>>>& state) const
48 {
49 std::lock_guard lock{state->value_mutex};
50 state->value.emplace(std::forward<Value>(value));
51 }
52};
53
55
57{
58 void operator()(const auto& subscriber, const auto& state) const
59 {
60 state->children_subscriptions.unsubscribe();
61
62 {
63 std::lock_guard lock{state->value_mutex};
64 if (state->value.has_value())
65 subscriber.on_next(std::move(state->value.value()));
66 }
67 subscriber.on_completed();
68 }
69};
70
71template<constraint::decayed_type Type, schedulers::constraint::scheduler TScheduler>
73{
74 schedulers::duration period;
75 TScheduler scheduler;
76
77 template<constraint::subscriber_of_type<Type> TSub>
78 auto operator()(TSub&& in_subscriber) const
79 {
80 auto state = std::make_shared<sample_state_with_serialized_spinlock<Type>>(in_subscriber.get_subscription());
81 // change subscriber to serialized to avoid manual using of mutex
82 auto subscriber = make_serialized_subscriber(std::forward<TSub>(in_subscriber),
83 std::shared_ptr<utils::spinlock>{state, &state->spinlock});
84
85 scheduler.create_worker(state->children_subscriptions)
86 .schedule(period,
87 [period = period, subscriber = subscriber, state]() -> rpp::schedulers::optional_duration
88 {
89 std::optional<Type> extracted{};
90 {
91 std::lock_guard lock{state->value_mutex};
92 std::swap(extracted, state->value);
93 }
94 if (extracted.has_value())
95 subscriber.on_next(std::move(extracted.value()));
96 return period;
97 });
98
99 return create_subscriber_with_state<Type>(state->children_subscriptions,
103 std::move(subscriber),
104 std::move(state));
105 }
106};
107} // namespace rpp::details
Definition: early_unsubscribe.hpp:28
Definition: early_unsubscribe.hpp:19
Definition: sample.hpp:57
Definition: sample.hpp:45
Definition: sample.hpp:29
Definition: sample.hpp:73