aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/libs/grpc/include/grpcpp/impl/codegen/server_callback_handlers.h
diff options
context:
space:
mode:
authordvshkurko <dvshkurko@yandex-team.ru>2022-02-10 16:45:52 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:52 +0300
commitc768a99151e47c3a4bb7b92c514d256abd301c4d (patch)
tree1a2c5ffcf89eb53ecd79dbc9bc0a195c27404d0c /contrib/libs/grpc/include/grpcpp/impl/codegen/server_callback_handlers.h
parent321ee9bce31ec6e238be26dbcbe539cffa2c3309 (diff)
downloadydb-c768a99151e47c3a4bb7b92c514d256abd301c4d.tar.gz
Restoring authorship annotation for <dvshkurko@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/libs/grpc/include/grpcpp/impl/codegen/server_callback_handlers.h')
-rw-r--r--contrib/libs/grpc/include/grpcpp/impl/codegen/server_callback_handlers.h1330
1 files changed, 665 insertions, 665 deletions
diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/server_callback_handlers.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/server_callback_handlers.h
index 9aa12fcc81..8120fcaf85 100644
--- a/contrib/libs/grpc/include/grpcpp/impl/codegen/server_callback_handlers.h
+++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/server_callback_handlers.h
@@ -1,209 +1,209 @@
-/*
- *
- * Copyright 2019 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
-#define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
-
-#include <grpcpp/impl/codegen/message_allocator.h>
-#include <grpcpp/impl/codegen/rpc_service_method.h>
+/*
+ *
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
+#define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
+
+#include <grpcpp/impl/codegen/message_allocator.h>
+#include <grpcpp/impl/codegen/rpc_service_method.h>
#include <grpcpp/impl/codegen/server_callback.h>
#include <grpcpp/impl/codegen/server_context.h>
-#include <grpcpp/impl/codegen/status.h>
-
+#include <grpcpp/impl/codegen/status.h>
+
namespace grpc {
-namespace internal {
-
-template <class RequestType, class ResponseType>
-class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
- public:
- explicit CallbackUnaryHandler(
+namespace internal {
+
+template <class RequestType, class ResponseType>
+class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
+ public:
+ explicit CallbackUnaryHandler(
std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*,
- const RequestType*, ResponseType*)>
- get_reactor)
- : get_reactor_(std::move(get_reactor)) {}
-
- void SetMessageAllocator(
- ::grpc::experimental::MessageAllocator<RequestType, ResponseType>*
- allocator) {
- allocator_ = allocator;
- }
-
- void RunHandler(const HandlerParameter& param) final {
- // Arena allocate a controller structure (that includes request/response)
- ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
- auto* allocator_state = static_cast<
- ::grpc::experimental::MessageHolder<RequestType, ResponseType>*>(
- param.internal_data);
-
- auto* call = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
- param.call->call(), sizeof(ServerCallbackUnaryImpl)))
- ServerCallbackUnaryImpl(
+ const RequestType*, ResponseType*)>
+ get_reactor)
+ : get_reactor_(std::move(get_reactor)) {}
+
+ void SetMessageAllocator(
+ ::grpc::experimental::MessageAllocator<RequestType, ResponseType>*
+ allocator) {
+ allocator_ = allocator;
+ }
+
+ void RunHandler(const HandlerParameter& param) final {
+ // Arena allocate a controller structure (that includes request/response)
+ ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
+ auto* allocator_state = static_cast<
+ ::grpc::experimental::MessageHolder<RequestType, ResponseType>*>(
+ param.internal_data);
+
+ auto* call = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
+ param.call->call(), sizeof(ServerCallbackUnaryImpl)))
+ ServerCallbackUnaryImpl(
static_cast<::grpc::CallbackServerContext*>(param.server_context),
- param.call, allocator_state, std::move(param.call_requester));
- param.server_context->BeginCompletionOp(
- param.call, [call](bool) { call->MaybeDone(); }, call);
-
- ServerUnaryReactor* reactor = nullptr;
- if (param.status.ok()) {
- reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
- get_reactor_,
+ param.call, allocator_state, std::move(param.call_requester));
+ param.server_context->BeginCompletionOp(
+ param.call, [call](bool) { call->MaybeDone(); }, call);
+
+ ServerUnaryReactor* reactor = nullptr;
+ if (param.status.ok()) {
+ reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
+ get_reactor_,
static_cast<::grpc::CallbackServerContext*>(param.server_context),
- call->request(), call->response());
- }
-
- if (reactor == nullptr) {
- // if deserialization or reactor creator failed, we need to fail the call
- reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
- param.call->call(), sizeof(UnimplementedUnaryReactor)))
- UnimplementedUnaryReactor(
- ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
- }
-
- /// Invoke SetupReactor as the last part of the handler
- call->SetupReactor(reactor);
- }
-
- void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
- ::grpc::Status* status, void** handler_data) final {
- ::grpc::ByteBuffer buf;
- buf.set_buffer(req);
- RequestType* request = nullptr;
- ::grpc::experimental::MessageHolder<RequestType, ResponseType>*
- allocator_state = nullptr;
- if (allocator_ != nullptr) {
- allocator_state = allocator_->AllocateMessages();
- } else {
- allocator_state =
- new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
- call, sizeof(DefaultMessageHolder<RequestType, ResponseType>)))
- DefaultMessageHolder<RequestType, ResponseType>();
- }
- *handler_data = allocator_state;
- request = allocator_state->request();
- *status =
- ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
- buf.Release();
- if (status->ok()) {
- return request;
- }
- // Clean up on deserialization failure.
- allocator_state->Release();
- return nullptr;
- }
-
- private:
+ call->request(), call->response());
+ }
+
+ if (reactor == nullptr) {
+ // if deserialization or reactor creator failed, we need to fail the call
+ reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
+ param.call->call(), sizeof(UnimplementedUnaryReactor)))
+ UnimplementedUnaryReactor(
+ ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
+ }
+
+ /// Invoke SetupReactor as the last part of the handler
+ call->SetupReactor(reactor);
+ }
+
+ void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
+ ::grpc::Status* status, void** handler_data) final {
+ ::grpc::ByteBuffer buf;
+ buf.set_buffer(req);
+ RequestType* request = nullptr;
+ ::grpc::experimental::MessageHolder<RequestType, ResponseType>*
+ allocator_state = nullptr;
+ if (allocator_ != nullptr) {
+ allocator_state = allocator_->AllocateMessages();
+ } else {
+ allocator_state =
+ new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
+ call, sizeof(DefaultMessageHolder<RequestType, ResponseType>)))
+ DefaultMessageHolder<RequestType, ResponseType>();
+ }
+ *handler_data = allocator_state;
+ request = allocator_state->request();
+ *status =
+ ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
+ buf.Release();
+ if (status->ok()) {
+ return request;
+ }
+ // Clean up on deserialization failure.
+ allocator_state->Release();
+ return nullptr;
+ }
+
+ private:
std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*,
- const RequestType*, ResponseType*)>
- get_reactor_;
- ::grpc::experimental::MessageAllocator<RequestType, ResponseType>*
- allocator_ = nullptr;
-
- class ServerCallbackUnaryImpl : public ServerCallbackUnary {
- public:
- void Finish(::grpc::Status s) override {
+ const RequestType*, ResponseType*)>
+ get_reactor_;
+ ::grpc::experimental::MessageAllocator<RequestType, ResponseType>*
+ allocator_ = nullptr;
+
+ class ServerCallbackUnaryImpl : public ServerCallbackUnary {
+ public:
+ void Finish(::grpc::Status s) override {
// A callback that only contains a call to MaybeDone can be run as an
// inline callback regardless of whether or not OnDone is inlineable
// because if the actual OnDone callback needs to be scheduled, MaybeDone
// is responsible for dispatching to an executor thread if needed. Thus,
// when setting up the finish_tag_, we can set its own callback to
// inlineable.
- finish_tag_.Set(
+ finish_tag_.Set(
call_.call(),
[this](bool) {
this->MaybeDone(
reactor_.load(std::memory_order_relaxed)->InternalInlineable());
},
&finish_ops_, /*can_inline=*/true);
- finish_ops_.set_core_cq_tag(&finish_tag_);
-
- if (!ctx_->sent_initial_metadata_) {
- finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- finish_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- }
- // The response is dropped if the status is not OK.
- if (s.ok()) {
- finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
- finish_ops_.SendMessagePtr(response()));
- } else {
- finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
- }
- finish_ops_.set_core_cq_tag(&finish_tag_);
- call_.PerformOps(&finish_ops_);
- }
-
- void SendInitialMetadata() override {
- GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
- this->Ref();
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+
+ if (!ctx_->sent_initial_metadata_) {
+ finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ finish_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ // The response is dropped if the status is not OK.
+ if (s.ok()) {
+ finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
+ finish_ops_.SendMessagePtr(response()));
+ } else {
+ finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
+ }
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+ call_.PerformOps(&finish_ops_);
+ }
+
+ void SendInitialMetadata() override {
+ GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
+ this->Ref();
// The callback for this function should not be marked inline because it
// is directly invoking a user-controlled reaction
// (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
// thread. However, any OnDone needed after that can be inlined because it
// is already running on an executor thread.
- meta_tag_.Set(call_.call(),
- [this](bool ok) {
+ meta_tag_.Set(call_.call(),
+ [this](bool ok) {
ServerUnaryReactor* reactor =
reactor_.load(std::memory_order_relaxed);
reactor->OnSendInitialMetadataDone(ok);
this->MaybeDone(/*inlineable_ondone=*/true);
- },
+ },
&meta_ops_, /*can_inline=*/false);
- meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- meta_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- meta_ops_.set_core_cq_tag(&meta_tag_);
- call_.PerformOps(&meta_ops_);
- }
-
- private:
- friend class CallbackUnaryHandler<RequestType, ResponseType>;
-
- ServerCallbackUnaryImpl(
+ meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ meta_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ meta_ops_.set_core_cq_tag(&meta_tag_);
+ call_.PerformOps(&meta_ops_);
+ }
+
+ private:
+ friend class CallbackUnaryHandler<RequestType, ResponseType>;
+
+ ServerCallbackUnaryImpl(
::grpc::CallbackServerContext* ctx, ::grpc::internal::Call* call,
- ::grpc::experimental::MessageHolder<RequestType, ResponseType>*
- allocator_state,
- std::function<void()> call_requester)
- : ctx_(ctx),
- call_(*call),
- allocator_state_(allocator_state),
- call_requester_(std::move(call_requester)) {
- ctx_->set_message_allocator_state(allocator_state);
- }
-
- /// SetupReactor binds the reactor (which also releases any queued
- /// operations), maybe calls OnCancel if possible/needed, and maybe marks
- /// the completion of the RPC. This should be the last component of the
- /// handler.
- void SetupReactor(ServerUnaryReactor* reactor) {
- reactor_.store(reactor, std::memory_order_relaxed);
- this->BindReactor(reactor);
- this->MaybeCallOnCancel(reactor);
+ ::grpc::experimental::MessageHolder<RequestType, ResponseType>*
+ allocator_state,
+ std::function<void()> call_requester)
+ : ctx_(ctx),
+ call_(*call),
+ allocator_state_(allocator_state),
+ call_requester_(std::move(call_requester)) {
+ ctx_->set_message_allocator_state(allocator_state);
+ }
+
+ /// SetupReactor binds the reactor (which also releases any queued
+ /// operations), maybe calls OnCancel if possible/needed, and maybe marks
+ /// the completion of the RPC. This should be the last component of the
+ /// handler.
+ void SetupReactor(ServerUnaryReactor* reactor) {
+ reactor_.store(reactor, std::memory_order_relaxed);
+ this->BindReactor(reactor);
+ this->MaybeCallOnCancel(reactor);
this->MaybeDone(reactor->InternalInlineable());
- }
-
- const RequestType* request() { return allocator_state_->request(); }
- ResponseType* response() { return allocator_state_->response(); }
-
+ }
+
+ const RequestType* request() { return allocator_state_->request(); }
+ ResponseType* response() { return allocator_state_->response(); }
+
void CallOnDone() override {
reactor_.load(std::memory_order_relaxed)->OnDone();
grpc_call* call = call_.call();
@@ -212,96 +212,96 @@ class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
this->~ServerCallbackUnaryImpl(); // explicitly call destructor
::grpc::g_core_codegen_interface->grpc_call_unref(call);
call_requester();
- }
-
- ServerReactor* reactor() override {
- return reactor_.load(std::memory_order_relaxed);
- }
-
- ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
- meta_ops_;
- ::grpc::internal::CallbackWithSuccessTag meta_tag_;
- ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
- ::grpc::internal::CallOpSendMessage,
- ::grpc::internal::CallOpServerSendStatus>
- finish_ops_;
- ::grpc::internal::CallbackWithSuccessTag finish_tag_;
-
+ }
+
+ ServerReactor* reactor() override {
+ return reactor_.load(std::memory_order_relaxed);
+ }
+
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
+ meta_ops_;
+ ::grpc::internal::CallbackWithSuccessTag meta_tag_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpServerSendStatus>
+ finish_ops_;
+ ::grpc::internal::CallbackWithSuccessTag finish_tag_;
+
::grpc::CallbackServerContext* const ctx_;
- ::grpc::internal::Call call_;
- ::grpc::experimental::MessageHolder<RequestType, ResponseType>* const
- allocator_state_;
- std::function<void()> call_requester_;
- // reactor_ can always be loaded/stored with relaxed memory ordering because
- // its value is only set once, independently of other data in the object,
- // and the loads that use it will always actually come provably later even
- // though they are from different threads since they are triggered by
- // actions initiated only by the setting up of the reactor_ variable. In
- // a sense, it's a delayed "const": it gets its value from the SetupReactor
- // method (not the constructor, so it's not a true const), but it doesn't
- // change after that and it only gets used by actions caused, directly or
- // indirectly, by that setup. This comment also applies to the reactor_
- // variables of the other streaming objects in this file.
- std::atomic<ServerUnaryReactor*> reactor_;
- // callbacks_outstanding_ follows a refcount pattern
- std::atomic<intptr_t> callbacks_outstanding_{
- 3}; // reserve for start, Finish, and CompletionOp
- };
-};
-
-template <class RequestType, class ResponseType>
-class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
- public:
- explicit CallbackClientStreamingHandler(
- std::function<ServerReadReactor<RequestType>*(
+ ::grpc::internal::Call call_;
+ ::grpc::experimental::MessageHolder<RequestType, ResponseType>* const
+ allocator_state_;
+ std::function<void()> call_requester_;
+ // reactor_ can always be loaded/stored with relaxed memory ordering because
+ // its value is only set once, independently of other data in the object,
+ // and the loads that use it will always actually come provably later even
+ // though they are from different threads since they are triggered by
+ // actions initiated only by the setting up of the reactor_ variable. In
+ // a sense, it's a delayed "const": it gets its value from the SetupReactor
+ // method (not the constructor, so it's not a true const), but it doesn't
+ // change after that and it only gets used by actions caused, directly or
+ // indirectly, by that setup. This comment also applies to the reactor_
+ // variables of the other streaming objects in this file.
+ std::atomic<ServerUnaryReactor*> reactor_;
+ // callbacks_outstanding_ follows a refcount pattern
+ std::atomic<intptr_t> callbacks_outstanding_{
+ 3}; // reserve for start, Finish, and CompletionOp
+ };
+};
+
+template <class RequestType, class ResponseType>
+class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
+ public:
+ explicit CallbackClientStreamingHandler(
+ std::function<ServerReadReactor<RequestType>*(
::grpc::CallbackServerContext*, ResponseType*)>
- get_reactor)
- : get_reactor_(std::move(get_reactor)) {}
- void RunHandler(const HandlerParameter& param) final {
- // Arena allocate a reader structure (that includes response)
- ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
-
- auto* reader = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
- param.call->call(), sizeof(ServerCallbackReaderImpl)))
- ServerCallbackReaderImpl(
+ get_reactor)
+ : get_reactor_(std::move(get_reactor)) {}
+ void RunHandler(const HandlerParameter& param) final {
+ // Arena allocate a reader structure (that includes response)
+ ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
+
+ auto* reader = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
+ param.call->call(), sizeof(ServerCallbackReaderImpl)))
+ ServerCallbackReaderImpl(
static_cast<::grpc::CallbackServerContext*>(param.server_context),
- param.call, std::move(param.call_requester));
+ param.call, std::move(param.call_requester));
// Inlineable OnDone can be false in the CompletionOp callback because there
// is no read reactor that has an inlineable OnDone; this only applies to
// the DefaultReactor (which is unary).
- param.server_context->BeginCompletionOp(
+ param.server_context->BeginCompletionOp(
param.call,
[reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
reader);
-
- ServerReadReactor<RequestType>* reactor = nullptr;
- if (param.status.ok()) {
- reactor = ::grpc::internal::CatchingReactorGetter<
- ServerReadReactor<RequestType>>(
- get_reactor_,
+
+ ServerReadReactor<RequestType>* reactor = nullptr;
+ if (param.status.ok()) {
+ reactor = ::grpc::internal::CatchingReactorGetter<
+ ServerReadReactor<RequestType>>(
+ get_reactor_,
static_cast<::grpc::CallbackServerContext*>(param.server_context),
- reader->response());
- }
-
- if (reactor == nullptr) {
- // if deserialization or reactor creator failed, we need to fail the call
- reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
- param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
- UnimplementedReadReactor<RequestType>(
- ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
- }
-
- reader->SetupReactor(reactor);
- }
-
- private:
+ reader->response());
+ }
+
+ if (reactor == nullptr) {
+ // if deserialization or reactor creator failed, we need to fail the call
+ reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
+ param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
+ UnimplementedReadReactor<RequestType>(
+ ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
+ }
+
+ reader->SetupReactor(reactor);
+ }
+
+ private:
std::function<ServerReadReactor<RequestType>*(::grpc::CallbackServerContext*,
ResponseType*)>
- get_reactor_;
-
- class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
- public:
- void Finish(::grpc::Status s) override {
+ get_reactor_;
+
+ class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
+ public:
+ void Finish(::grpc::Status s) override {
// A finish tag with only MaybeDone can have its callback inlined
// regardless even if OnDone is not inlineable because this callback just
// checks a ref and then decides whether or not to dispatch OnDone.
@@ -313,87 +313,87 @@ class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
this->MaybeDone(/*inlineable_ondone=*/false);
},
&finish_ops_, /*can_inline=*/true);
- if (!ctx_->sent_initial_metadata_) {
- finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- finish_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- }
- // The response is dropped if the status is not OK.
- if (s.ok()) {
- finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
- finish_ops_.SendMessagePtr(&resp_));
- } else {
- finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
- }
- finish_ops_.set_core_cq_tag(&finish_tag_);
- call_.PerformOps(&finish_ops_);
- }
-
- void SendInitialMetadata() override {
- GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
- this->Ref();
+ if (!ctx_->sent_initial_metadata_) {
+ finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ finish_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ // The response is dropped if the status is not OK.
+ if (s.ok()) {
+ finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
+ finish_ops_.SendMessagePtr(&resp_));
+ } else {
+ finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
+ }
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+ call_.PerformOps(&finish_ops_);
+ }
+
+ void SendInitialMetadata() override {
+ GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
+ this->Ref();
// The callback for this function should not be inlined because it invokes
// a user-controlled reaction, but any resulting OnDone can be inlined in
// the executor to which this callback is dispatched.
- meta_tag_.Set(call_.call(),
- [this](bool ok) {
+ meta_tag_.Set(call_.call(),
+ [this](bool ok) {
ServerReadReactor<RequestType>* reactor =
reactor_.load(std::memory_order_relaxed);
reactor->OnSendInitialMetadataDone(ok);
this->MaybeDone(/*inlineable_ondone=*/true);
- },
+ },
&meta_ops_, /*can_inline=*/false);
- meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- meta_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- meta_ops_.set_core_cq_tag(&meta_tag_);
- call_.PerformOps(&meta_ops_);
- }
-
- void Read(RequestType* req) override {
- this->Ref();
- read_ops_.RecvMessage(req);
- call_.PerformOps(&read_ops_);
- }
-
- private:
- friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
-
+ meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ meta_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ meta_ops_.set_core_cq_tag(&meta_tag_);
+ call_.PerformOps(&meta_ops_);
+ }
+
+ void Read(RequestType* req) override {
+ this->Ref();
+ read_ops_.RecvMessage(req);
+ call_.PerformOps(&read_ops_);
+ }
+
+ private:
+ friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
+
ServerCallbackReaderImpl(::grpc::CallbackServerContext* ctx,
- ::grpc::internal::Call* call,
- std::function<void()> call_requester)
- : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
-
- void SetupReactor(ServerReadReactor<RequestType>* reactor) {
- reactor_.store(reactor, std::memory_order_relaxed);
+ ::grpc::internal::Call* call,
+ std::function<void()> call_requester)
+ : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
+
+ void SetupReactor(ServerReadReactor<RequestType>* reactor) {
+ reactor_.store(reactor, std::memory_order_relaxed);
// The callback for this function should not be inlined because it invokes
// a user-controlled reaction, but any resulting OnDone can be inlined in
// the executor to which this callback is dispatched.
- read_tag_.Set(call_.call(),
+ read_tag_.Set(call_.call(),
[this, reactor](bool ok) {
reactor->OnReadDone(ok);
this->MaybeDone(/*inlineable_ondone=*/true);
- },
+ },
&read_ops_, /*can_inline=*/false);
- read_ops_.set_core_cq_tag(&read_tag_);
- this->BindReactor(reactor);
- this->MaybeCallOnCancel(reactor);
+ read_ops_.set_core_cq_tag(&read_tag_);
+ this->BindReactor(reactor);
+ this->MaybeCallOnCancel(reactor);
// Inlineable OnDone can be false here because there is no read
// reactor that has an inlineable OnDone; this only applies to the
// DefaultReactor (which is unary).
this->MaybeDone(/*inlineable_ondone=*/false);
- }
-
- ~ServerCallbackReaderImpl() {}
-
- ResponseType* response() { return &resp_; }
-
+ }
+
+ ~ServerCallbackReaderImpl() {}
+
+ ResponseType* response() { return &resp_; }
+
void CallOnDone() override {
reactor_.load(std::memory_order_relaxed)->OnDone();
grpc_call* call = call_.call();
@@ -401,107 +401,107 @@ class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
this->~ServerCallbackReaderImpl(); // explicitly call destructor
::grpc::g_core_codegen_interface->grpc_call_unref(call);
call_requester();
- }
-
- ServerReactor* reactor() override {
- return reactor_.load(std::memory_order_relaxed);
- }
-
- ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
- meta_ops_;
- ::grpc::internal::CallbackWithSuccessTag meta_tag_;
- ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
- ::grpc::internal::CallOpSendMessage,
- ::grpc::internal::CallOpServerSendStatus>
- finish_ops_;
- ::grpc::internal::CallbackWithSuccessTag finish_tag_;
- ::grpc::internal::CallOpSet<
- ::grpc::internal::CallOpRecvMessage<RequestType>>
- read_ops_;
- ::grpc::internal::CallbackWithSuccessTag read_tag_;
-
+ }
+
+ ServerReactor* reactor() override {
+ return reactor_.load(std::memory_order_relaxed);
+ }
+
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
+ meta_ops_;
+ ::grpc::internal::CallbackWithSuccessTag meta_tag_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpServerSendStatus>
+ finish_ops_;
+ ::grpc::internal::CallbackWithSuccessTag finish_tag_;
+ ::grpc::internal::CallOpSet<
+ ::grpc::internal::CallOpRecvMessage<RequestType>>
+ read_ops_;
+ ::grpc::internal::CallbackWithSuccessTag read_tag_;
+
::grpc::CallbackServerContext* const ctx_;
- ::grpc::internal::Call call_;
- ResponseType resp_;
- std::function<void()> call_requester_;
- // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
- std::atomic<ServerReadReactor<RequestType>*> reactor_;
- // callbacks_outstanding_ follows a refcount pattern
- std::atomic<intptr_t> callbacks_outstanding_{
- 3}; // reserve for OnStarted, Finish, and CompletionOp
- };
-};
-
-template <class RequestType, class ResponseType>
-class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
- public:
- explicit CallbackServerStreamingHandler(
- std::function<ServerWriteReactor<ResponseType>*(
+ ::grpc::internal::Call call_;
+ ResponseType resp_;
+ std::function<void()> call_requester_;
+ // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
+ std::atomic<ServerReadReactor<RequestType>*> reactor_;
+ // callbacks_outstanding_ follows a refcount pattern
+ std::atomic<intptr_t> callbacks_outstanding_{
+ 3}; // reserve for OnStarted, Finish, and CompletionOp
+ };
+};
+
+template <class RequestType, class ResponseType>
+class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
+ public:
+ explicit CallbackServerStreamingHandler(
+ std::function<ServerWriteReactor<ResponseType>*(
::grpc::CallbackServerContext*, const RequestType*)>
- get_reactor)
- : get_reactor_(std::move(get_reactor)) {}
- void RunHandler(const HandlerParameter& param) final {
- // Arena allocate a writer structure
- ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
-
- auto* writer = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
- param.call->call(), sizeof(ServerCallbackWriterImpl)))
- ServerCallbackWriterImpl(
+ get_reactor)
+ : get_reactor_(std::move(get_reactor)) {}
+ void RunHandler(const HandlerParameter& param) final {
+ // Arena allocate a writer structure
+ ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
+
+ auto* writer = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
+ param.call->call(), sizeof(ServerCallbackWriterImpl)))
+ ServerCallbackWriterImpl(
static_cast<::grpc::CallbackServerContext*>(param.server_context),
- param.call, static_cast<RequestType*>(param.request),
- std::move(param.call_requester));
+ param.call, static_cast<RequestType*>(param.request),
+ std::move(param.call_requester));
// Inlineable OnDone can be false in the CompletionOp callback because there
// is no write reactor that has an inlineable OnDone; this only applies to
// the DefaultReactor (which is unary).
- param.server_context->BeginCompletionOp(
+ param.server_context->BeginCompletionOp(
param.call,
[writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
writer);
-
- ServerWriteReactor<ResponseType>* reactor = nullptr;
- if (param.status.ok()) {
- reactor = ::grpc::internal::CatchingReactorGetter<
- ServerWriteReactor<ResponseType>>(
- get_reactor_,
+
+ ServerWriteReactor<ResponseType>* reactor = nullptr;
+ if (param.status.ok()) {
+ reactor = ::grpc::internal::CatchingReactorGetter<
+ ServerWriteReactor<ResponseType>>(
+ get_reactor_,
static_cast<::grpc::CallbackServerContext*>(param.server_context),
- writer->request());
- }
- if (reactor == nullptr) {
- // if deserialization or reactor creator failed, we need to fail the call
- reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
- param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
- UnimplementedWriteReactor<ResponseType>(
- ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
- }
-
- writer->SetupReactor(reactor);
- }
-
- void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
- ::grpc::Status* status, void** /*handler_data*/) final {
- ::grpc::ByteBuffer buf;
- buf.set_buffer(req);
- auto* request =
- new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
- call, sizeof(RequestType))) RequestType();
- *status =
- ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
- buf.Release();
- if (status->ok()) {
- return request;
- }
- request->~RequestType();
- return nullptr;
- }
-
- private:
- std::function<ServerWriteReactor<ResponseType>*(
+ writer->request());
+ }
+ if (reactor == nullptr) {
+ // if deserialization or reactor creator failed, we need to fail the call
+ reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
+ param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
+ UnimplementedWriteReactor<ResponseType>(
+ ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
+ }
+
+ writer->SetupReactor(reactor);
+ }
+
+ void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
+ ::grpc::Status* status, void** /*handler_data*/) final {
+ ::grpc::ByteBuffer buf;
+ buf.set_buffer(req);
+ auto* request =
+ new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
+ call, sizeof(RequestType))) RequestType();
+ *status =
+ ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
+ buf.Release();
+ if (status->ok()) {
+ return request;
+ }
+ request->~RequestType();
+ return nullptr;
+ }
+
+ private:
+ std::function<ServerWriteReactor<ResponseType>*(
::grpc::CallbackServerContext*, const RequestType*)>
- get_reactor_;
-
- class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
- public:
- void Finish(::grpc::Status s) override {
+ get_reactor_;
+
+ class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
+ public:
+ void Finish(::grpc::Status s) override {
// A finish tag with only MaybeDone can have its callback inlined
// regardless even if OnDone is not inlineable because this callback just
// checks a ref and then decides whether or not to dispatch OnDone.
@@ -513,85 +513,85 @@ class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
this->MaybeDone(/*inlineable_ondone=*/false);
},
&finish_ops_, /*can_inline=*/true);
- finish_ops_.set_core_cq_tag(&finish_tag_);
-
- if (!ctx_->sent_initial_metadata_) {
- finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- finish_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- }
- finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
- call_.PerformOps(&finish_ops_);
- }
-
- void SendInitialMetadata() override {
- GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
- this->Ref();
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+
+ if (!ctx_->sent_initial_metadata_) {
+ finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ finish_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
+ call_.PerformOps(&finish_ops_);
+ }
+
+ void SendInitialMetadata() override {
+ GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
+ this->Ref();
// The callback for this function should not be inlined because it invokes
// a user-controlled reaction, but any resulting OnDone can be inlined in
// the executor to which this callback is dispatched.
- meta_tag_.Set(call_.call(),
- [this](bool ok) {
+ meta_tag_.Set(call_.call(),
+ [this](bool ok) {
ServerWriteReactor<ResponseType>* reactor =
reactor_.load(std::memory_order_relaxed);
reactor->OnSendInitialMetadataDone(ok);
this->MaybeDone(/*inlineable_ondone=*/true);
- },
+ },
&meta_ops_, /*can_inline=*/false);
- meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- meta_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- meta_ops_.set_core_cq_tag(&meta_tag_);
- call_.PerformOps(&meta_ops_);
- }
-
- void Write(const ResponseType* resp,
- ::grpc::WriteOptions options) override {
- this->Ref();
- if (options.is_last_message()) {
- options.set_buffer_hint();
- }
- if (!ctx_->sent_initial_metadata_) {
- write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- write_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- }
- // TODO(vjpai): don't assert
- GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
- call_.PerformOps(&write_ops_);
- }
-
- void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
- ::grpc::Status s) override {
- // This combines the write into the finish callback
+ meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ meta_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ meta_ops_.set_core_cq_tag(&meta_tag_);
+ call_.PerformOps(&meta_ops_);
+ }
+
+ void Write(const ResponseType* resp,
+ ::grpc::WriteOptions options) override {
+ this->Ref();
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ }
+ if (!ctx_->sent_initial_metadata_) {
+ write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ write_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ // TODO(vjpai): don't assert
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
+ call_.PerformOps(&write_ops_);
+ }
+
+ void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
+ ::grpc::Status s) override {
+ // This combines the write into the finish callback
// TODO(vjpai): don't assert
GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
- Finish(std::move(s));
- }
-
- private:
- friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
-
+ Finish(std::move(s));
+ }
+
+ private:
+ friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
+
ServerCallbackWriterImpl(::grpc::CallbackServerContext* ctx,
- ::grpc::internal::Call* call,
- const RequestType* req,
- std::function<void()> call_requester)
- : ctx_(ctx),
- call_(*call),
- req_(req),
- call_requester_(std::move(call_requester)) {}
-
- void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
- reactor_.store(reactor, std::memory_order_relaxed);
+ ::grpc::internal::Call* call,
+ const RequestType* req,
+ std::function<void()> call_requester)
+ : ctx_(ctx),
+ call_(*call),
+ req_(req),
+ call_requester_(std::move(call_requester)) {}
+
+ void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
+ reactor_.store(reactor, std::memory_order_relaxed);
// The callback for this function should not be inlined because it invokes
// a user-controlled reaction, but any resulting OnDone can be inlined in
// the executor to which this callback is dispatched.
@@ -601,18 +601,18 @@ class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
this->MaybeDone(/*inlineable_ondone=*/true);
},
&write_ops_, /*can_inline=*/false);
- write_ops_.set_core_cq_tag(&write_tag_);
- this->BindReactor(reactor);
- this->MaybeCallOnCancel(reactor);
+ write_ops_.set_core_cq_tag(&write_tag_);
+ this->BindReactor(reactor);
+ this->MaybeCallOnCancel(reactor);
// Inlineable OnDone can be false here because there is no write
// reactor that has an inlineable OnDone; this only applies to the
// DefaultReactor (which is unary).
this->MaybeDone(/*inlineable_ondone=*/false);
- }
- ~ServerCallbackWriterImpl() { req_->~RequestType(); }
-
- const RequestType* request() { return req_; }
-
+ }
+ ~ServerCallbackWriterImpl() { req_->~RequestType(); }
+
+ const RequestType* request() { return req_; }
+
void CallOnDone() override {
reactor_.load(std::memory_order_relaxed)->OnDone();
grpc_call* call = call_.call();
@@ -620,90 +620,90 @@ class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
this->~ServerCallbackWriterImpl(); // explicitly call destructor
::grpc::g_core_codegen_interface->grpc_call_unref(call);
call_requester();
- }
-
- ServerReactor* reactor() override {
- return reactor_.load(std::memory_order_relaxed);
- }
-
- ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
- meta_ops_;
- ::grpc::internal::CallbackWithSuccessTag meta_tag_;
- ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
- ::grpc::internal::CallOpSendMessage,
- ::grpc::internal::CallOpServerSendStatus>
- finish_ops_;
- ::grpc::internal::CallbackWithSuccessTag finish_tag_;
- ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
- ::grpc::internal::CallOpSendMessage>
- write_ops_;
- ::grpc::internal::CallbackWithSuccessTag write_tag_;
-
+ }
+
+ ServerReactor* reactor() override {
+ return reactor_.load(std::memory_order_relaxed);
+ }
+
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
+ meta_ops_;
+ ::grpc::internal::CallbackWithSuccessTag meta_tag_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpServerSendStatus>
+ finish_ops_;
+ ::grpc::internal::CallbackWithSuccessTag finish_tag_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage>
+ write_ops_;
+ ::grpc::internal::CallbackWithSuccessTag write_tag_;
+
::grpc::CallbackServerContext* const ctx_;
- ::grpc::internal::Call call_;
- const RequestType* req_;
- std::function<void()> call_requester_;
- // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
- std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
- // callbacks_outstanding_ follows a refcount pattern
- std::atomic<intptr_t> callbacks_outstanding_{
- 3}; // reserve for OnStarted, Finish, and CompletionOp
- };
-};
-
-template <class RequestType, class ResponseType>
-class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
- public:
- explicit CallbackBidiHandler(
- std::function<ServerBidiReactor<RequestType, ResponseType>*(
+ ::grpc::internal::Call call_;
+ const RequestType* req_;
+ std::function<void()> call_requester_;
+ // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
+ std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
+ // callbacks_outstanding_ follows a refcount pattern
+ std::atomic<intptr_t> callbacks_outstanding_{
+ 3}; // reserve for OnStarted, Finish, and CompletionOp
+ };
+};
+
+template <class RequestType, class ResponseType>
+class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
+ public:
+ explicit CallbackBidiHandler(
+ std::function<ServerBidiReactor<RequestType, ResponseType>*(
::grpc::CallbackServerContext*)>
- get_reactor)
- : get_reactor_(std::move(get_reactor)) {}
- void RunHandler(const HandlerParameter& param) final {
- ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
-
- auto* stream = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
- param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
- ServerCallbackReaderWriterImpl(
+ get_reactor)
+ : get_reactor_(std::move(get_reactor)) {}
+ void RunHandler(const HandlerParameter& param) final {
+ ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
+
+ auto* stream = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
+ param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
+ ServerCallbackReaderWriterImpl(
static_cast<::grpc::CallbackServerContext*>(param.server_context),
- param.call, std::move(param.call_requester));
+ param.call, std::move(param.call_requester));
// Inlineable OnDone can be false in the CompletionOp callback because there
// is no bidi reactor that has an inlineable OnDone; this only applies to
// the DefaultReactor (which is unary).
- param.server_context->BeginCompletionOp(
+ param.server_context->BeginCompletionOp(
param.call,
[stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
stream);
-
- ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr;
- if (param.status.ok()) {
- reactor = ::grpc::internal::CatchingReactorGetter<
- ServerBidiReactor<RequestType, ResponseType>>(
+
+ ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr;
+ if (param.status.ok()) {
+ reactor = ::grpc::internal::CatchingReactorGetter<
+ ServerBidiReactor<RequestType, ResponseType>>(
get_reactor_,
static_cast<::grpc::CallbackServerContext*>(param.server_context));
- }
-
- if (reactor == nullptr) {
- // if deserialization or reactor creator failed, we need to fail the call
- reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
- param.call->call(),
- sizeof(UnimplementedBidiReactor<RequestType, ResponseType>)))
- UnimplementedBidiReactor<RequestType, ResponseType>(
- ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
- }
-
- stream->SetupReactor(reactor);
- }
-
- private:
- std::function<ServerBidiReactor<RequestType, ResponseType>*(
+ }
+
+ if (reactor == nullptr) {
+ // if deserialization or reactor creator failed, we need to fail the call
+ reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
+ param.call->call(),
+ sizeof(UnimplementedBidiReactor<RequestType, ResponseType>)))
+ UnimplementedBidiReactor<RequestType, ResponseType>(
+ ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
+ }
+
+ stream->SetupReactor(reactor);
+ }
+
+ private:
+ std::function<ServerBidiReactor<RequestType, ResponseType>*(
::grpc::CallbackServerContext*)>
- get_reactor_;
-
- class ServerCallbackReaderWriterImpl
- : public ServerCallbackReaderWriter<RequestType, ResponseType> {
- public:
- void Finish(::grpc::Status s) override {
+ get_reactor_;
+
+ class ServerCallbackReaderWriterImpl
+ : public ServerCallbackReaderWriter<RequestType, ResponseType> {
+ public:
+ void Finish(::grpc::Status s) override {
// A finish tag with only MaybeDone can have its callback inlined
// regardless even if OnDone is not inlineable because this callback just
// checks a ref and then decides whether or not to dispatch OnDone.
@@ -715,86 +715,86 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
this->MaybeDone(/*inlineable_ondone=*/false);
},
&finish_ops_, /*can_inline=*/true);
- finish_ops_.set_core_cq_tag(&finish_tag_);
-
- if (!ctx_->sent_initial_metadata_) {
- finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- finish_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- }
- finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
- call_.PerformOps(&finish_ops_);
- }
-
- void SendInitialMetadata() override {
- GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
- this->Ref();
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+
+ if (!ctx_->sent_initial_metadata_) {
+ finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ finish_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
+ call_.PerformOps(&finish_ops_);
+ }
+
+ void SendInitialMetadata() override {
+ GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
+ this->Ref();
// The callback for this function should not be inlined because it invokes
// a user-controlled reaction, but any resulting OnDone can be inlined in
// the executor to which this callback is dispatched.
- meta_tag_.Set(call_.call(),
- [this](bool ok) {
+ meta_tag_.Set(call_.call(),
+ [this](bool ok) {
ServerBidiReactor<RequestType, ResponseType>* reactor =
reactor_.load(std::memory_order_relaxed);
reactor->OnSendInitialMetadataDone(ok);
this->MaybeDone(/*inlineable_ondone=*/true);
- },
+ },
&meta_ops_, /*can_inline=*/false);
- meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- meta_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- meta_ops_.set_core_cq_tag(&meta_tag_);
- call_.PerformOps(&meta_ops_);
- }
-
- void Write(const ResponseType* resp,
- ::grpc::WriteOptions options) override {
- this->Ref();
- if (options.is_last_message()) {
- options.set_buffer_hint();
- }
- if (!ctx_->sent_initial_metadata_) {
- write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
- ctx_->initial_metadata_flags());
- if (ctx_->compression_level_set()) {
- write_ops_.set_compression_level(ctx_->compression_level());
- }
- ctx_->sent_initial_metadata_ = true;
- }
- // TODO(vjpai): don't assert
- GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
- call_.PerformOps(&write_ops_);
- }
-
- void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
- ::grpc::Status s) override {
+ meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ meta_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ meta_ops_.set_core_cq_tag(&meta_tag_);
+ call_.PerformOps(&meta_ops_);
+ }
+
+ void Write(const ResponseType* resp,
+ ::grpc::WriteOptions options) override {
+ this->Ref();
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ }
+ if (!ctx_->sent_initial_metadata_) {
+ write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ write_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ // TODO(vjpai): don't assert
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
+ call_.PerformOps(&write_ops_);
+ }
+
+ void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
+ ::grpc::Status s) override {
// TODO(vjpai): don't assert
GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
- Finish(std::move(s));
- }
-
- void Read(RequestType* req) override {
- this->Ref();
- read_ops_.RecvMessage(req);
- call_.PerformOps(&read_ops_);
- }
-
- private:
- friend class CallbackBidiHandler<RequestType, ResponseType>;
-
+ Finish(std::move(s));
+ }
+
+ void Read(RequestType* req) override {
+ this->Ref();
+ read_ops_.RecvMessage(req);
+ call_.PerformOps(&read_ops_);
+ }
+
+ private:
+ friend class CallbackBidiHandler<RequestType, ResponseType>;
+
ServerCallbackReaderWriterImpl(::grpc::CallbackServerContext* ctx,
- ::grpc::internal::Call* call,
- std::function<void()> call_requester)
- : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
-
- void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
- reactor_.store(reactor, std::memory_order_relaxed);
+ ::grpc::internal::Call* call,
+ std::function<void()> call_requester)
+ : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
+
+ void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
+ reactor_.store(reactor, std::memory_order_relaxed);
// The callbacks for these functions should not be inlined because they
// invoke user-controlled reactions, but any resulting OnDones can be
// inlined in the executor to which a callback is dispatched.
@@ -804,22 +804,22 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
this->MaybeDone(/*inlineable_ondone=*/true);
},
&write_ops_, /*can_inline=*/false);
- write_ops_.set_core_cq_tag(&write_tag_);
- read_tag_.Set(call_.call(),
+ write_ops_.set_core_cq_tag(&write_tag_);
+ read_tag_.Set(call_.call(),
[this, reactor](bool ok) {
reactor->OnReadDone(ok);
this->MaybeDone(/*inlineable_ondone=*/true);
- },
+ },
&read_ops_, /*can_inline=*/false);
- read_ops_.set_core_cq_tag(&read_tag_);
- this->BindReactor(reactor);
- this->MaybeCallOnCancel(reactor);
+ read_ops_.set_core_cq_tag(&read_tag_);
+ this->BindReactor(reactor);
+ this->MaybeCallOnCancel(reactor);
// Inlineable OnDone can be false here because there is no bidi
// reactor that has an inlineable OnDone; this only applies to the
// DefaultReactor (which is unary).
this->MaybeDone(/*inlineable_ondone=*/false);
- }
-
+ }
+
void CallOnDone() override {
reactor_.load(std::memory_order_relaxed)->OnDone();
grpc_call* call = call_.call();
@@ -827,41 +827,41 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
::grpc::g_core_codegen_interface->grpc_call_unref(call);
call_requester();
- }
-
- ServerReactor* reactor() override {
- return reactor_.load(std::memory_order_relaxed);
- }
-
- ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
- meta_ops_;
- ::grpc::internal::CallbackWithSuccessTag meta_tag_;
- ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
- ::grpc::internal::CallOpSendMessage,
- ::grpc::internal::CallOpServerSendStatus>
- finish_ops_;
- ::grpc::internal::CallbackWithSuccessTag finish_tag_;
- ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
- ::grpc::internal::CallOpSendMessage>
- write_ops_;
- ::grpc::internal::CallbackWithSuccessTag write_tag_;
- ::grpc::internal::CallOpSet<
- ::grpc::internal::CallOpRecvMessage<RequestType>>
- read_ops_;
- ::grpc::internal::CallbackWithSuccessTag read_tag_;
-
+ }
+
+ ServerReactor* reactor() override {
+ return reactor_.load(std::memory_order_relaxed);
+ }
+
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
+ meta_ops_;
+ ::grpc::internal::CallbackWithSuccessTag meta_tag_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpServerSendStatus>
+ finish_ops_;
+ ::grpc::internal::CallbackWithSuccessTag finish_tag_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage>
+ write_ops_;
+ ::grpc::internal::CallbackWithSuccessTag write_tag_;
+ ::grpc::internal::CallOpSet<
+ ::grpc::internal::CallOpRecvMessage<RequestType>>
+ read_ops_;
+ ::grpc::internal::CallbackWithSuccessTag read_tag_;
+
::grpc::CallbackServerContext* const ctx_;
- ::grpc::internal::Call call_;
- std::function<void()> call_requester_;
- // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
- std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
- // callbacks_outstanding_ follows a refcount pattern
- std::atomic<intptr_t> callbacks_outstanding_{
- 3}; // reserve for OnStarted, Finish, and CompletionOp
- };
-};
-
-} // namespace internal
+ ::grpc::internal::Call call_;
+ std::function<void()> call_requester_;
+ // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
+ std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
+ // callbacks_outstanding_ follows a refcount pattern
+ std::atomic<intptr_t> callbacks_outstanding_{
+ 3}; // reserve for OnStarted, Finish, and CompletionOp
+ };
+};
+
+} // namespace internal
} // namespace grpc
-
-#endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
+
+#endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H