ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
server_reactor.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/observables/fwd.hpp>
14
15#include <grpcpp/support/server_callback.h>
16#include <rppgrpc/details/base.hpp>
17#include <rppgrpc/fwd.hpp>
18#include <rppgrpc/utils/exceptions.hpp>
19
20namespace rppgrpc
21{
36 template<rpp::constraint::decayed_type Request, rpp::constraint::decayed_type Response>
37 class server_bidi_reactor final : public grpc::ServerBidiReactor<Request, Response>
38 , private details::base_writer<Response>
39 , private details::base_reader<Request>
40 {
41 using Base = grpc::ServerBidiReactor<Request, Response>;
42
43 public:
44 server_bidi_reactor()
45 {
46 Base::StartSendInitialMetadata();
47 details::base_reader<Request>::handle_read_done(true);
48 }
49
50 using details::base_writer<Response>::get_observer;
51 using details::base_reader<Request>::get_observable;
52
53 private:
54 void start_write(const Response& v) override
55 {
56 Base::StartWrite(&v);
57 }
58
59 void start_read(Request& data) override
60 {
61 Base::StartRead(&data);
62 }
63
64 void finish_writes(const grpc::Status& status) override
65 {
66 Base::Finish(status);
67 }
68
69 void OnReadDone(bool ok) override
70 {
71 if (!ok)
72 return;
73
74 details::base_reader<Request>::handle_read_done();
75 }
76
77 void OnWriteDone(bool ok) override
78 {
79 if (!ok)
80 return;
81
82 details::base_writer<Response>::handle_write_done();
83 }
84
85 void OnDone() override
86 {
87 details::base_writer<Response>::handle_on_done();
88 details::base_reader<Request>::handle_on_done({});
89
90 delete this;
91 }
92
93 void OnCancel() override
94 {
95 details::base_writer<Response>::handle_on_done();
96 details::base_reader<Request>::handle_on_done(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnCancel called"}));
97
98 Base::Finish(grpc::Status::CANCELLED);
99 }
100 };
101
114 template<rpp::constraint::decayed_type Response>
115 class server_write_reactor final : public grpc::ServerWriteReactor<Response>
116 , private details::base_writer<Response>
117 , private details::base_reader<rpp::utils::none>
118 {
119 using Base = grpc::ServerWriteReactor<Response>;
120
121 public:
122 server_write_reactor()
123 {
124 Base::StartSendInitialMetadata();
125 }
126
127 using details::base_writer<Response>::get_observer;
128 using details::base_reader<rpp::utils::none>::get_observable;
129
130 private:
131 void start_write(const Response& v) override
132 {
133 Base::StartWrite(&v);
134 }
135
136 void start_read(rpp::utils::none& data) override {}
137
138 void finish_writes(const grpc::Status& status) override
139 {
140 Base::Finish(status);
141 }
142
143 void OnWriteDone(bool ok) override
144 {
145 if (!ok)
146 {
147 Base::Finish(grpc::Status::OK);
148 return;
149 }
150
151 details::base_writer<Response>::handle_write_done();
152 }
153
154 void OnDone() override
155 {
156 details::base_writer<Response>::handle_on_done();
157 details::base_reader<rpp::utils::none>::handle_on_done({});
158
159 delete this;
160 }
161
162 void OnCancel() override
163 {
164 details::base_writer<Response>::handle_on_done();
165 details::base_reader<rpp::utils::none>::handle_on_done(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnCancel called"}));
166
167 Base::Finish(grpc::Status::CANCELLED);
168 }
169 };
170
182 template<rpp::constraint::decayed_type Request>
183 class server_read_reactor final : public grpc::ServerReadReactor<Request>
184 , private details::base_reader<Request>
185 {
186 using Base = grpc::ServerReadReactor<Request>;
187
188 public:
189 server_read_reactor()
190 {
191 Base::StartSendInitialMetadata();
192 details::base_reader<Request>::handle_read_done(true);
193 }
194
195 using details::base_reader<Request>::get_observable;
196
197 private:
198 void start_read(Request& data) override
199 {
200 Base::StartRead(&data);
201 }
202
203 void OnReadDone(bool ok) override
204 {
205 if (!ok)
206 {
207 Base::Finish(grpc::Status::OK);
208 return;
209 }
210
211 details::base_reader<Request>::handle_read_done();
212 }
213
214 void OnDone() override
215 {
216 details::base_reader<Request>::handle_on_done({});
217
218 delete this;
219 }
220
221 void OnCancel() override
222 {
223 details::base_reader<Request>::handle_on_done(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnCancel called"}));
224 }
225 };
226} // namespace rppgrpc
Definition base.hpp:112
Definition base.hpp:28
Definition utils.hpp:25
Definition exceptions.hpp:18