// Compile-time constant naming the disposables mode for this strategy; fixed to `Auto`.
// NOTE(review): what `Auto` implies is defined inside rpp and is not visible here — confirm against rpp.
68 static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto;
// Reference to the `base_writer` whose members (`m_write_mutex`, `m_write`, `m_finished`,
// `start_write`, `finish_writes`) the callbacks of this strategy operate on.
// NOTE(review): initialised with `{}` here; presumably always replaced at construction — confirm.
70 std::reference_wrapper<base_writer> owner{};
72 template<rpp::constra
int::decayed_same_as<TData> T>
73 void on_next(T&& message)
const
75 std::lock_guard lock{owner.get().m_write_mutex};
76 owner.get().m_write.push_back(std::forward<T>(message));
77 if (owner.get().m_write.size() == 1)
78 owner.get().start_write(owner.get().m_write.front());
81 void on_error(
const std::exception_ptr&)
const
83 std::lock_guard lock{owner.get().m_write_mutex};
84 owner.get().m_finished =
true;
86 if (owner.get().m_write.size() == 0)
87 owner.get().finish_writes(grpc::Status{grpc::StatusCode::INTERNAL,
"Internal error happens"});
89 void on_completed()
const
91 std::lock_guard lock{owner.get().m_write_mutex};
92 owner.get().m_finished =
true;
94 if (owner.get().m_write.size() == 0)
95 owner.get().finish_writes(grpc::Status::OK);
/// Disposal query for this strategy.
///
/// @return always `false`; the result is a compile-time constant.
static constexpr bool is_disposed()
{
    return false;
}