ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
base.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/disposables/fwd.hpp>
14
15#include <rpp/utils/constraints.hpp>
16
17#include <grpcpp/support/status.h>
18
19#include "rpp/subjects/publish_subject.hpp"
20
21#include <deque>
22#include <mutex>
23
24namespace rppgrpc::details
25{
26 template<rpp::constraint::decayed_type TData>
28 {
29 public:
31 {
32 m_subject.get_observable().subscribe(typename details::base_writer<TData>::observer_strategy{*this});
33 }
34
35 virtual ~base_writer() noexcept = default;
36
37 auto get_observer() const
38 {
39 return m_subject.get_observer();
40 }
41
42 protected:
43 virtual void start_write(const TData& v) = 0;
44 virtual void finish_writes(const grpc::Status& status) = 0;
45
46 void handle_on_done()
47 {
48 m_subject.get_disposable().dispose();
49 }
50
51 void handle_write_done()
52 {
53 std::lock_guard lock{m_write_mutex};
54 m_write.pop_front();
55
56 if (!m_write.empty())
57 {
58 start_write(m_write.front());
59 }
60 else if (m_finished)
61 {
62 finish_writes(grpc::Status::OK);
63 }
64 }
65
67 {
68 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto;
69
70 std::reference_wrapper<base_writer> owner{};
71
72 template<rpp::constraint::decayed_same_as<TData> T>
73 void on_next(T&& message) const
74 {
75 std::lock_guard lock{owner.get().m_write_mutex};
76 owner.get().m_write.push_back(std::forward<T>(message));
77 if (owner.get().m_write.size() == 1)
78 owner.get().start_write(owner.get().m_write.front());
79 }
80
81 void on_error(const std::exception_ptr&) const
82 {
83 std::lock_guard lock{owner.get().m_write_mutex};
84 owner.get().m_finished = true;
85
86 if (owner.get().m_write.size() == 0)
87 owner.get().finish_writes(grpc::Status{grpc::StatusCode::INTERNAL, "Internal error happens"});
88 }
89 void on_completed() const
90 {
91 std::lock_guard lock{owner.get().m_write_mutex};
92 owner.get().m_finished = true;
93
94 if (owner.get().m_write.size() == 0)
95 owner.get().finish_writes(grpc::Status::OK);
96 }
97
98 static constexpr bool is_disposed() { return false; }
99 static constexpr void set_upstream(const rpp::disposable_wrapper&) {}
100 };
101
102 private:
104
105 std::mutex m_write_mutex{};
106 std::deque<TData> m_write{};
107 bool m_finished{};
108 };
109
110 template<rpp::constraint::decayed_type TData>
112 {
113 public:
114 base_reader() = default;
115 virtual ~base_reader() = default;
116
117 auto get_observable()
118 {
119 return m_observer.get_observable();
120 }
121
122 protected:
123 virtual void start_read(TData& data) = 0;
124
125 void handle_read_done(bool initial = false)
126 {
127 if (!initial)
128 m_observer.get_observer().on_next(m_data);
129 start_read(m_data);
130 }
131
132 void handle_on_done(std::exception_ptr err)
133 {
134 if (err)
135 m_observer.get_observer().on_error(err);
136 else
137 m_observer.get_observer().on_completed();
138 }
139
140 private:
142 RPP_NO_UNIQUE_ADDRESS TData m_data{};
143 };
144
145} // namespace rppgrpc::details
Subject which just multicasts values to observers subscribed on it. It contains two parts: observer a...
Definition publish_subject.hpp:81
Serialized version of rpp::subjects::publish_subject.
Definition publish_subject.hpp:95
Definition base.hpp:112
Definition base.hpp:28