ReactivePlusPlus
ReactiveX implementation for C++20
Loading...
Searching...
No Matches
client_reactor.hpp
1// ReactivePlusPlus library
2//
3// Copyright Aleksey Loginov 2023 - present.
4// Distributed under the Boost Software License, Version 1.0.
5// (See accompanying file LICENSE_1_0.txt or copy at
6// https://www.boost.org/LICENSE_1_0.txt)
7//
8// Project home: https://github.com/victimsnino/ReactivePlusPlus
9//
10
11#pragma once
12
13#include <rpp/observables/fwd.hpp>
14
15#include <grpcpp/support/client_callback.h>
16#include <rppgrpc/details/base.hpp>
17#include <rppgrpc/fwd.hpp>
18#include <rppgrpc/utils/exceptions.hpp>
19
20namespace rppgrpc
21{
35 template<rpp::constraint::decayed_type Request, rpp::constraint::decayed_type Response>
36 class client_bidi_reactor final : public grpc::ClientBidiReactor<Request, Response>
37 , private details::base_writer<Request>
38 , private details::base_reader<Response>
39 {
40 using Base = grpc::ClientBidiReactor<Request, Response>;
41
42 public:
43 client_bidi_reactor() = default;
44
45 void init()
46 {
47 Base::StartCall();
48 details::base_reader<Response>::handle_read_done(true);
49 }
50
51 using details::base_writer<Request>::get_observer;
52 using details::base_reader<Response>::get_observable;
53
54 private:
55 void start_read(Response& data) override
56 {
57 Base::StartRead(&data);
58 }
59
60 void start_write(const Request& v) override
61 {
62 Base::StartWrite(&v);
63 }
64
65 void finish_writes(const grpc::Status&) override
66 {
67 Base::StartWritesDone();
68 }
69
70 using Base::StartCall;
71 using Base::StartRead;
72
73 void OnReadDone(bool ok) override
74 {
75 if (!ok)
76 return;
77
78 details::base_reader<Response>::handle_read_done();
79 }
80
81 void OnWriteDone(bool ok) override
82 {
83 if (!ok)
84 return;
85
86 details::base_writer<Request>::handle_write_done();
87 }
88
89 void OnDone(const grpc::Status& s) override
90 {
91 details::base_writer<Request>::handle_on_done();
92 details::base_reader<Response>::handle_on_done(s.ok() ? std::exception_ptr{} : std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()}));
93 delete this;
94 }
95 };
96
110 template<rpp::constraint::decayed_type Request>
111 class client_write_reactor final : public grpc::ClientWriteReactor<Request>
112 , private details::base_writer<Request>
113 , private details::base_reader<rpp::utils::none>
114 {
115 using Base = grpc::ClientWriteReactor<Request>;
116
117 public:
118 client_write_reactor() = default;
119
120 void init()
121 {
122 Base::StartCall();
123 }
124
125 using details::base_writer<Request>::get_observer;
126 using details::base_reader<rpp::utils::none>::get_observable;
127
128 private:
129 void start_read(rpp::utils::none& data) override
130 {
131 }
132
133 void start_write(const Request& v) override
134 {
135 Base::StartWrite(&v);
136 }
137
138 void finish_writes(const grpc::Status&) override
139 {
140 Base::StartWritesDone();
141 }
142
143 using Base::StartCall;
144
145 void OnWriteDone(bool ok) override
146 {
147 if (!ok)
148 return;
149
150 details::base_writer<Request>::handle_write_done();
151 }
152
153 void OnDone(const grpc::Status& s) override
154 {
155 details::base_writer<Request>::handle_on_done();
156 details::base_reader<rpp::utils::none>::handle_on_done(s.ok() ? std::exception_ptr{} : std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()}));
157
158 delete this;
159 }
160 };
161
174 template<rpp::constraint::decayed_type Response>
175 class client_read_reactor final : public grpc::ClientReadReactor<Response>
176 , private details::base_reader<Response>
177 {
178 using Base = grpc::ClientReadReactor<Response>;
179
180 public:
181 client_read_reactor() = default;
182
183 void init()
184 {
185 Base::StartCall();
186 details::base_reader<Response>::handle_read_done(true);
187 }
188
189 using details::base_reader<Response>::get_observable;
190
191 private:
192 using Base::StartCall;
193 using Base::StartRead;
194
195 void start_read(Response& data) override
196 {
197 Base::StartRead(&data);
198 }
199
200 void OnReadDone(bool ok) override
201 {
202 if (!ok)
203 return;
204
205 details::base_reader<Response>::handle_read_done();
206 }
207
208 void OnDone(const grpc::Status& s) override
209 {
210 details::base_reader<Response>::handle_on_done(s.ok() ? std::exception_ptr{} : std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()}));
211 delete this;
212 }
213 };
214} // namespace rppgrpc
Definition utils.hpp:25
Definition exceptions.hpp:18