diff options
author | dvshkurko <dvshkurko@yandex-team.ru> | 2022-02-10 16:45:52 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:52 +0300 |
commit | c768a99151e47c3a4bb7b92c514d256abd301c4d (patch) | |
tree | 1a2c5ffcf89eb53ecd79dbc9bc0a195c27404d0c /contrib/libs/grpc/include/grpcpp/impl/codegen/server_callback_handlers.h | |
parent | 321ee9bce31ec6e238be26dbcbe539cffa2c3309 (diff) | |
download | ydb-c768a99151e47c3a4bb7b92c514d256abd301c4d.tar.gz |
Restoring authorship annotation for <dvshkurko@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/libs/grpc/include/grpcpp/impl/codegen/server_callback_handlers.h')
-rw-r--r-- | contrib/libs/grpc/include/grpcpp/impl/codegen/server_callback_handlers.h | 1330 |
1 files changed, 665 insertions, 665 deletions
diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/server_callback_handlers.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/server_callback_handlers.h index 9aa12fcc81..8120fcaf85 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/server_callback_handlers.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/server_callback_handlers.h @@ -1,209 +1,209 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H -#define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H - -#include <grpcpp/impl/codegen/message_allocator.h> -#include <grpcpp/impl/codegen/rpc_service_method.h> +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H +#define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H + +#include <grpcpp/impl/codegen/message_allocator.h> +#include <grpcpp/impl/codegen/rpc_service_method.h> #include <grpcpp/impl/codegen/server_callback.h> #include <grpcpp/impl/codegen/server_context.h> -#include <grpcpp/impl/codegen/status.h> - +#include <grpcpp/impl/codegen/status.h> + namespace grpc { -namespace internal { - -template <class RequestType, class ResponseType> -class CallbackUnaryHandler : public ::grpc::internal::MethodHandler { - public: - explicit CallbackUnaryHandler( +namespace internal { + +template <class RequestType, class ResponseType> +class CallbackUnaryHandler : public ::grpc::internal::MethodHandler { + public: + explicit CallbackUnaryHandler( std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*, - const RequestType*, ResponseType*)> - get_reactor) - : get_reactor_(std::move(get_reactor)) {} - - void SetMessageAllocator( - ::grpc::experimental::MessageAllocator<RequestType, ResponseType>* - allocator) { - allocator_ = allocator; - } - - void RunHandler(const HandlerParameter& param) final { - // Arena allocate a controller structure (that includes request/response) - ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call()); - auto* allocator_state = static_cast< - ::grpc::experimental::MessageHolder<RequestType, ResponseType>*>( - param.internal_data); - - auto* call = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( - param.call->call(), sizeof(ServerCallbackUnaryImpl))) - ServerCallbackUnaryImpl( + const RequestType*, ResponseType*)> + get_reactor) + : get_reactor_(std::move(get_reactor)) {} + + void SetMessageAllocator( + ::grpc::experimental::MessageAllocator<RequestType, ResponseType>* + allocator) { + allocator_ = allocator; + } + + void RunHandler(const HandlerParameter& param) final { + // Arena allocate a controller structure (that includes request/response) + ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call()); + auto* allocator_state = static_cast< + ::grpc::experimental::MessageHolder<RequestType, ResponseType>*>( + param.internal_data); + + auto* call = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( + param.call->call(), sizeof(ServerCallbackUnaryImpl))) + ServerCallbackUnaryImpl( static_cast<::grpc::CallbackServerContext*>(param.server_context), - param.call, allocator_state, std::move(param.call_requester)); - param.server_context->BeginCompletionOp( - param.call, [call](bool) { call->MaybeDone(); }, call); - - ServerUnaryReactor* reactor = nullptr; - if (param.status.ok()) { - reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>( - get_reactor_, + param.call, allocator_state, std::move(param.call_requester)); + param.server_context->BeginCompletionOp( + param.call, [call](bool) { call->MaybeDone(); }, call); + + ServerUnaryReactor* reactor = nullptr; + if (param.status.ok()) { + reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>( + get_reactor_, static_cast<::grpc::CallbackServerContext*>(param.server_context), - call->request(), call->response()); - } - - if (reactor == nullptr) { - // if deserialization or reactor creator failed, we need to fail the call - reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( - param.call->call(), sizeof(UnimplementedUnaryReactor))) - UnimplementedUnaryReactor( - ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); - } - - /// Invoke SetupReactor as the last part of the handler - call->SetupReactor(reactor); - } - - void* Deserialize(grpc_call* call, grpc_byte_buffer* req, - ::grpc::Status* status, void** handler_data) final { - ::grpc::ByteBuffer buf; - buf.set_buffer(req); - RequestType* request = nullptr; - ::grpc::experimental::MessageHolder<RequestType, ResponseType>* - allocator_state = nullptr; - if (allocator_ != nullptr) { - allocator_state = allocator_->AllocateMessages(); - } else { - allocator_state = - new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( - call, sizeof(DefaultMessageHolder<RequestType, ResponseType>))) - DefaultMessageHolder<RequestType, ResponseType>(); - } - *handler_data = allocator_state; - request = allocator_state->request(); - *status = - ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request); - buf.Release(); - if (status->ok()) { - return request; - } - // Clean up on deserialization failure. - allocator_state->Release(); - return nullptr; - } - - private: + call->request(), call->response()); + } + + if (reactor == nullptr) { + // if deserialization or reactor creator failed, we need to fail the call + reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( + param.call->call(), sizeof(UnimplementedUnaryReactor))) + UnimplementedUnaryReactor( + ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); + } + + /// Invoke SetupReactor as the last part of the handler + call->SetupReactor(reactor); + } + + void* Deserialize(grpc_call* call, grpc_byte_buffer* req, + ::grpc::Status* status, void** handler_data) final { + ::grpc::ByteBuffer buf; + buf.set_buffer(req); + RequestType* request = nullptr; + ::grpc::experimental::MessageHolder<RequestType, ResponseType>* + allocator_state = nullptr; + if (allocator_ != nullptr) { + allocator_state = allocator_->AllocateMessages(); + } else { + allocator_state = + new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( + call, sizeof(DefaultMessageHolder<RequestType, ResponseType>))) + DefaultMessageHolder<RequestType, ResponseType>(); + } + *handler_data = allocator_state; + request = allocator_state->request(); + *status = + ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request); + buf.Release(); + if (status->ok()) { + return request; + } + // Clean up on deserialization failure. + allocator_state->Release(); + return nullptr; + } + + private: std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*, - const RequestType*, ResponseType*)> - get_reactor_; - ::grpc::experimental::MessageAllocator<RequestType, ResponseType>* - allocator_ = nullptr; - - class ServerCallbackUnaryImpl : public ServerCallbackUnary { - public: - void Finish(::grpc::Status s) override { + const RequestType*, ResponseType*)> + get_reactor_; + ::grpc::experimental::MessageAllocator<RequestType, ResponseType>* + allocator_ = nullptr; + + class ServerCallbackUnaryImpl : public ServerCallbackUnary { + public: + void Finish(::grpc::Status s) override { // A callback that only contains a call to MaybeDone can be run as an // inline callback regardless of whether or not OnDone is inlineable // because if the actual OnDone callback needs to be scheduled, MaybeDone // is responsible for dispatching to an executor thread if needed. Thus, // when setting up the finish_tag_, we can set its own callback to // inlineable. - finish_tag_.Set( + finish_tag_.Set( call_.call(), [this](bool) { this->MaybeDone( reactor_.load(std::memory_order_relaxed)->InternalInlineable()); }, &finish_ops_, /*can_inline=*/true); - finish_ops_.set_core_cq_tag(&finish_tag_); - - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - finish_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } - // The response is dropped if the status is not OK. - if (s.ok()) { - finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, - finish_ops_.SendMessagePtr(response())); - } else { - finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); - } - finish_ops_.set_core_cq_tag(&finish_tag_); - call_.PerformOps(&finish_ops_); - } - - void SendInitialMetadata() override { - GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - this->Ref(); + finish_ops_.set_core_cq_tag(&finish_tag_); + + if (!ctx_->sent_initial_metadata_) { + finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + // The response is dropped if the status is not OK. + if (s.ok()) { + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, + finish_ops_.SendMessagePtr(response())); + } else { + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); + } + finish_ops_.set_core_cq_tag(&finish_tag_); + call_.PerformOps(&finish_ops_); + } + + void SendInitialMetadata() override { + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + this->Ref(); // The callback for this function should not be marked inline because it // is directly invoking a user-controlled reaction // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor // thread. However, any OnDone needed after that can be inlined because it // is already running on an executor thread. - meta_tag_.Set(call_.call(), - [this](bool ok) { + meta_tag_.Set(call_.call(), + [this](bool ok) { ServerUnaryReactor* reactor = reactor_.load(std::memory_order_relaxed); reactor->OnSendInitialMetadataDone(ok); this->MaybeDone(/*inlineable_ondone=*/true); - }, + }, &meta_ops_, /*can_inline=*/false); - meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - meta_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - meta_ops_.set_core_cq_tag(&meta_tag_); - call_.PerformOps(&meta_ops_); - } - - private: - friend class CallbackUnaryHandler<RequestType, ResponseType>; - - ServerCallbackUnaryImpl( + meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + meta_ops_.set_core_cq_tag(&meta_tag_); + call_.PerformOps(&meta_ops_); + } + + private: + friend class CallbackUnaryHandler<RequestType, ResponseType>; + + ServerCallbackUnaryImpl( ::grpc::CallbackServerContext* ctx, ::grpc::internal::Call* call, - ::grpc::experimental::MessageHolder<RequestType, ResponseType>* - allocator_state, - std::function<void()> call_requester) - : ctx_(ctx), - call_(*call), - allocator_state_(allocator_state), - call_requester_(std::move(call_requester)) { - ctx_->set_message_allocator_state(allocator_state); - } - - /// SetupReactor binds the reactor (which also releases any queued - /// operations), maybe calls OnCancel if possible/needed, and maybe marks - /// the completion of the RPC. This should be the last component of the - /// handler. - void SetupReactor(ServerUnaryReactor* reactor) { - reactor_.store(reactor, std::memory_order_relaxed); - this->BindReactor(reactor); - this->MaybeCallOnCancel(reactor); + ::grpc::experimental::MessageHolder<RequestType, ResponseType>* + allocator_state, + std::function<void()> call_requester) + : ctx_(ctx), + call_(*call), + allocator_state_(allocator_state), + call_requester_(std::move(call_requester)) { + ctx_->set_message_allocator_state(allocator_state); + } + + /// SetupReactor binds the reactor (which also releases any queued + /// operations), maybe calls OnCancel if possible/needed, and maybe marks + /// the completion of the RPC. This should be the last component of the + /// handler. + void SetupReactor(ServerUnaryReactor* reactor) { + reactor_.store(reactor, std::memory_order_relaxed); + this->BindReactor(reactor); + this->MaybeCallOnCancel(reactor); this->MaybeDone(reactor->InternalInlineable()); - } - - const RequestType* request() { return allocator_state_->request(); } - ResponseType* response() { return allocator_state_->response(); } - + } + + const RequestType* request() { return allocator_state_->request(); } + ResponseType* response() { return allocator_state_->response(); } + void CallOnDone() override { reactor_.load(std::memory_order_relaxed)->OnDone(); grpc_call* call = call_.call(); @@ -212,96 +212,96 @@ class CallbackUnaryHandler : public ::grpc::internal::MethodHandler { this->~ServerCallbackUnaryImpl(); // explicitly call destructor ::grpc::g_core_codegen_interface->grpc_call_unref(call); call_requester(); - } - - ServerReactor* reactor() override { - return reactor_.load(std::memory_order_relaxed); - } - - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> - meta_ops_; - ::grpc::internal::CallbackWithSuccessTag meta_tag_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpSendMessage, - ::grpc::internal::CallOpServerSendStatus> - finish_ops_; - ::grpc::internal::CallbackWithSuccessTag finish_tag_; - + } + + ServerReactor* reactor() override { + return reactor_.load(std::memory_order_relaxed); + } + + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + meta_ops_; + ::grpc::internal::CallbackWithSuccessTag meta_tag_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpServerSendStatus> + finish_ops_; + ::grpc::internal::CallbackWithSuccessTag finish_tag_; + ::grpc::CallbackServerContext* const ctx_; - ::grpc::internal::Call call_; - ::grpc::experimental::MessageHolder<RequestType, ResponseType>* const - allocator_state_; - std::function<void()> call_requester_; - // reactor_ can always be loaded/stored with relaxed memory ordering because - // its value is only set once, independently of other data in the object, - // and the loads that use it will always actually come provably later even - // though they are from different threads since they are triggered by - // actions initiated only by the setting up of the reactor_ variable. In - // a sense, it's a delayed "const": it gets its value from the SetupReactor - // method (not the constructor, so it's not a true const), but it doesn't - // change after that and it only gets used by actions caused, directly or - // indirectly, by that setup. This comment also applies to the reactor_ - // variables of the other streaming objects in this file. - std::atomic<ServerUnaryReactor*> reactor_; - // callbacks_outstanding_ follows a refcount pattern - std::atomic<intptr_t> callbacks_outstanding_{ - 3}; // reserve for start, Finish, and CompletionOp - }; -}; - -template <class RequestType, class ResponseType> -class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler { - public: - explicit CallbackClientStreamingHandler( - std::function<ServerReadReactor<RequestType>*( + ::grpc::internal::Call call_; + ::grpc::experimental::MessageHolder<RequestType, ResponseType>* const + allocator_state_; + std::function<void()> call_requester_; + // reactor_ can always be loaded/stored with relaxed memory ordering because + // its value is only set once, independently of other data in the object, + // and the loads that use it will always actually come provably later even + // though they are from different threads since they are triggered by + // actions initiated only by the setting up of the reactor_ variable. In + // a sense, it's a delayed "const": it gets its value from the SetupReactor + // method (not the constructor, so it's not a true const), but it doesn't + // change after that and it only gets used by actions caused, directly or + // indirectly, by that setup. This comment also applies to the reactor_ + // variables of the other streaming objects in this file. + std::atomic<ServerUnaryReactor*> reactor_; + // callbacks_outstanding_ follows a refcount pattern + std::atomic<intptr_t> callbacks_outstanding_{ + 3}; // reserve for start, Finish, and CompletionOp + }; +}; + +template <class RequestType, class ResponseType> +class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler { + public: + explicit CallbackClientStreamingHandler( + std::function<ServerReadReactor<RequestType>*( ::grpc::CallbackServerContext*, ResponseType*)> - get_reactor) - : get_reactor_(std::move(get_reactor)) {} - void RunHandler(const HandlerParameter& param) final { - // Arena allocate a reader structure (that includes response) - ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call()); - - auto* reader = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( - param.call->call(), sizeof(ServerCallbackReaderImpl))) - ServerCallbackReaderImpl( + get_reactor) + : get_reactor_(std::move(get_reactor)) {} + void RunHandler(const HandlerParameter& param) final { + // Arena allocate a reader structure (that includes response) + ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call()); + + auto* reader = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( + param.call->call(), sizeof(ServerCallbackReaderImpl))) + ServerCallbackReaderImpl( static_cast<::grpc::CallbackServerContext*>(param.server_context), - param.call, std::move(param.call_requester)); + param.call, std::move(param.call_requester)); // Inlineable OnDone can be false in the CompletionOp callback because there // is no read reactor that has an inlineable OnDone; this only applies to // the DefaultReactor (which is unary). - param.server_context->BeginCompletionOp( + param.server_context->BeginCompletionOp( param.call, [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); }, reader); - - ServerReadReactor<RequestType>* reactor = nullptr; - if (param.status.ok()) { - reactor = ::grpc::internal::CatchingReactorGetter< - ServerReadReactor<RequestType>>( - get_reactor_, + + ServerReadReactor<RequestType>* reactor = nullptr; + if (param.status.ok()) { + reactor = ::grpc::internal::CatchingReactorGetter< + ServerReadReactor<RequestType>>( + get_reactor_, static_cast<::grpc::CallbackServerContext*>(param.server_context), - reader->response()); - } - - if (reactor == nullptr) { - // if deserialization or reactor creator failed, we need to fail the call - reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( - param.call->call(), sizeof(UnimplementedReadReactor<RequestType>))) - UnimplementedReadReactor<RequestType>( - ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); - } - - reader->SetupReactor(reactor); - } - - private: + reader->response()); + } + + if (reactor == nullptr) { + // if deserialization or reactor creator failed, we need to fail the call + reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( + param.call->call(), sizeof(UnimplementedReadReactor<RequestType>))) + UnimplementedReadReactor<RequestType>( + ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); + } + + reader->SetupReactor(reactor); + } + + private: std::function<ServerReadReactor<RequestType>*(::grpc::CallbackServerContext*, ResponseType*)> - get_reactor_; - - class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> { - public: - void Finish(::grpc::Status s) override { + get_reactor_; + + class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> { + public: + void Finish(::grpc::Status s) override { // A finish tag with only MaybeDone can have its callback inlined // regardless even if OnDone is not inlineable because this callback just // checks a ref and then decides whether or not to dispatch OnDone. @@ -313,87 +313,87 @@ class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler { this->MaybeDone(/*inlineable_ondone=*/false); }, &finish_ops_, /*can_inline=*/true); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - finish_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } - // The response is dropped if the status is not OK. - if (s.ok()) { - finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, - finish_ops_.SendMessagePtr(&resp_)); - } else { - finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); - } - finish_ops_.set_core_cq_tag(&finish_tag_); - call_.PerformOps(&finish_ops_); - } - - void SendInitialMetadata() override { - GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - this->Ref(); + if (!ctx_->sent_initial_metadata_) { + finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + // The response is dropped if the status is not OK. + if (s.ok()) { + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, + finish_ops_.SendMessagePtr(&resp_)); + } else { + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); + } + finish_ops_.set_core_cq_tag(&finish_tag_); + call_.PerformOps(&finish_ops_); + } + + void SendInitialMetadata() override { + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + this->Ref(); // The callback for this function should not be inlined because it invokes // a user-controlled reaction, but any resulting OnDone can be inlined in // the executor to which this callback is dispatched. - meta_tag_.Set(call_.call(), - [this](bool ok) { + meta_tag_.Set(call_.call(), + [this](bool ok) { ServerReadReactor<RequestType>* reactor = reactor_.load(std::memory_order_relaxed); reactor->OnSendInitialMetadataDone(ok); this->MaybeDone(/*inlineable_ondone=*/true); - }, + }, &meta_ops_, /*can_inline=*/false); - meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - meta_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - meta_ops_.set_core_cq_tag(&meta_tag_); - call_.PerformOps(&meta_ops_); - } - - void Read(RequestType* req) override { - this->Ref(); - read_ops_.RecvMessage(req); - call_.PerformOps(&read_ops_); - } - - private: - friend class CallbackClientStreamingHandler<RequestType, ResponseType>; - + meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + meta_ops_.set_core_cq_tag(&meta_tag_); + call_.PerformOps(&meta_ops_); + } + + void Read(RequestType* req) override { + this->Ref(); + read_ops_.RecvMessage(req); + call_.PerformOps(&read_ops_); + } + + private: + friend class CallbackClientStreamingHandler<RequestType, ResponseType>; + ServerCallbackReaderImpl(::grpc::CallbackServerContext* ctx, - ::grpc::internal::Call* call, - std::function<void()> call_requester) - : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {} - - void SetupReactor(ServerReadReactor<RequestType>* reactor) { - reactor_.store(reactor, std::memory_order_relaxed); + ::grpc::internal::Call* call, + std::function<void()> call_requester) + : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {} + + void SetupReactor(ServerReadReactor<RequestType>* reactor) { + reactor_.store(reactor, std::memory_order_relaxed); // The callback for this function should not be inlined because it invokes // a user-controlled reaction, but any resulting OnDone can be inlined in // the executor to which this callback is dispatched. - read_tag_.Set(call_.call(), + read_tag_.Set(call_.call(), [this, reactor](bool ok) { reactor->OnReadDone(ok); this->MaybeDone(/*inlineable_ondone=*/true); - }, + }, &read_ops_, /*can_inline=*/false); - read_ops_.set_core_cq_tag(&read_tag_); - this->BindReactor(reactor); - this->MaybeCallOnCancel(reactor); + read_ops_.set_core_cq_tag(&read_tag_); + this->BindReactor(reactor); + this->MaybeCallOnCancel(reactor); // Inlineable OnDone can be false here because there is no read // reactor that has an inlineable OnDone; this only applies to the // DefaultReactor (which is unary). this->MaybeDone(/*inlineable_ondone=*/false); - } - - ~ServerCallbackReaderImpl() {} - - ResponseType* response() { return &resp_; } - + } + + ~ServerCallbackReaderImpl() {} + + ResponseType* response() { return &resp_; } + void CallOnDone() override { reactor_.load(std::memory_order_relaxed)->OnDone(); grpc_call* call = call_.call(); @@ -401,107 +401,107 @@ class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler { this->~ServerCallbackReaderImpl(); // explicitly call destructor ::grpc::g_core_codegen_interface->grpc_call_unref(call); call_requester(); - } - - ServerReactor* reactor() override { - return reactor_.load(std::memory_order_relaxed); - } - - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> - meta_ops_; - ::grpc::internal::CallbackWithSuccessTag meta_tag_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpSendMessage, - ::grpc::internal::CallOpServerSendStatus> - finish_ops_; - ::grpc::internal::CallbackWithSuccessTag finish_tag_; - ::grpc::internal::CallOpSet< - ::grpc::internal::CallOpRecvMessage<RequestType>> - read_ops_; - ::grpc::internal::CallbackWithSuccessTag read_tag_; - + } + + ServerReactor* reactor() override { + return reactor_.load(std::memory_order_relaxed); + } + + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + meta_ops_; + ::grpc::internal::CallbackWithSuccessTag meta_tag_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpServerSendStatus> + finish_ops_; + ::grpc::internal::CallbackWithSuccessTag finish_tag_; + ::grpc::internal::CallOpSet< + ::grpc::internal::CallOpRecvMessage<RequestType>> + read_ops_; + ::grpc::internal::CallbackWithSuccessTag read_tag_; + ::grpc::CallbackServerContext* const ctx_; - ::grpc::internal::Call call_; - ResponseType resp_; - std::function<void()> call_requester_; - // The memory ordering of reactor_ follows ServerCallbackUnaryImpl. - std::atomic<ServerReadReactor<RequestType>*> reactor_; - // callbacks_outstanding_ follows a refcount pattern - std::atomic<intptr_t> callbacks_outstanding_{ - 3}; // reserve for OnStarted, Finish, and CompletionOp - }; -}; - -template <class RequestType, class ResponseType> -class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler { - public: - explicit CallbackServerStreamingHandler( - std::function<ServerWriteReactor<ResponseType>*( + ::grpc::internal::Call call_; + ResponseType resp_; + std::function<void()> call_requester_; + // The memory ordering of reactor_ follows ServerCallbackUnaryImpl. + std::atomic<ServerReadReactor<RequestType>*> reactor_; + // callbacks_outstanding_ follows a refcount pattern + std::atomic<intptr_t> callbacks_outstanding_{ + 3}; // reserve for OnStarted, Finish, and CompletionOp + }; +}; + +template <class RequestType, class ResponseType> +class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler { + public: + explicit CallbackServerStreamingHandler( + std::function<ServerWriteReactor<ResponseType>*( ::grpc::CallbackServerContext*, const RequestType*)> - get_reactor) - : get_reactor_(std::move(get_reactor)) {} - void RunHandler(const HandlerParameter& param) final { - // Arena allocate a writer structure - ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call()); - - auto* writer = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( - param.call->call(), sizeof(ServerCallbackWriterImpl))) - ServerCallbackWriterImpl( + get_reactor) + : get_reactor_(std::move(get_reactor)) {} + void RunHandler(const HandlerParameter& param) final { + // Arena allocate a writer structure + ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call()); + + auto* writer = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( + param.call->call(), sizeof(ServerCallbackWriterImpl))) + ServerCallbackWriterImpl( static_cast<::grpc::CallbackServerContext*>(param.server_context), - param.call, static_cast<RequestType*>(param.request), - std::move(param.call_requester)); + param.call, static_cast<RequestType*>(param.request), + std::move(param.call_requester)); // Inlineable OnDone can be false in the CompletionOp callback because there // is no write reactor that has an inlineable OnDone; this only applies to // the DefaultReactor (which is unary). - param.server_context->BeginCompletionOp( + param.server_context->BeginCompletionOp( param.call, [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); }, writer); - - ServerWriteReactor<ResponseType>* reactor = nullptr; - if (param.status.ok()) { - reactor = ::grpc::internal::CatchingReactorGetter< - ServerWriteReactor<ResponseType>>( - get_reactor_, + + ServerWriteReactor<ResponseType>* reactor = nullptr; + if (param.status.ok()) { + reactor = ::grpc::internal::CatchingReactorGetter< + ServerWriteReactor<ResponseType>>( + get_reactor_, static_cast<::grpc::CallbackServerContext*>(param.server_context), - writer->request()); - } - if (reactor == nullptr) { - // if deserialization or reactor creator failed, we need to fail the call - reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( - param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>))) - UnimplementedWriteReactor<ResponseType>( - ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); - } - - writer->SetupReactor(reactor); - } - - void* Deserialize(grpc_call* call, grpc_byte_buffer* req, - ::grpc::Status* status, void** /*handler_data*/) final { - ::grpc::ByteBuffer buf; - buf.set_buffer(req); - auto* request = - new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( - call, sizeof(RequestType))) RequestType(); - *status = - ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request); - buf.Release(); - if (status->ok()) { - return request; - } - request->~RequestType(); - return nullptr; - } - - private: - std::function<ServerWriteReactor<ResponseType>*( + writer->request()); + } + if (reactor == nullptr) { + // if deserialization or reactor creator failed, we need to fail the call + reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( + param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>))) + UnimplementedWriteReactor<ResponseType>( + ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); + } + + writer->SetupReactor(reactor); + } + + void* Deserialize(grpc_call* call, grpc_byte_buffer* req, + ::grpc::Status* status, void** /*handler_data*/) final { + ::grpc::ByteBuffer buf; + buf.set_buffer(req); + auto* request = + new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( + call, sizeof(RequestType))) RequestType(); + *status = + ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request); + buf.Release(); + if (status->ok()) { + return request; + } + request->~RequestType(); + return nullptr; + } + + private: + std::function<ServerWriteReactor<ResponseType>*( ::grpc::CallbackServerContext*, const RequestType*)> - get_reactor_; - - class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> { - public: - void Finish(::grpc::Status s) override { + get_reactor_; + + class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> { + public: + void Finish(::grpc::Status s) override { // A finish tag with only MaybeDone can have its callback inlined // regardless even if OnDone is not inlineable because this callback just // checks a ref and then decides whether or not to dispatch OnDone. @@ -513,85 +513,85 @@ class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler { this->MaybeDone(/*inlineable_ondone=*/false); }, &finish_ops_, /*can_inline=*/true); - finish_ops_.set_core_cq_tag(&finish_tag_); - - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - finish_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } - finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); - call_.PerformOps(&finish_ops_); - } - - void SendInitialMetadata() override { - GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - this->Ref(); + finish_ops_.set_core_cq_tag(&finish_tag_); + + if (!ctx_->sent_initial_metadata_) { + finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); + call_.PerformOps(&finish_ops_); + } + + void SendInitialMetadata() override { + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + this->Ref(); // The callback for this function should not be inlined because it invokes // a user-controlled reaction, but any resulting OnDone can be inlined in // the executor to which this callback is dispatched. - meta_tag_.Set(call_.call(), - [this](bool ok) { + meta_tag_.Set(call_.call(), + [this](bool ok) { ServerWriteReactor<ResponseType>* reactor = reactor_.load(std::memory_order_relaxed); reactor->OnSendInitialMetadataDone(ok); this->MaybeDone(/*inlineable_ondone=*/true); - }, + }, &meta_ops_, /*can_inline=*/false); - meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - meta_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - meta_ops_.set_core_cq_tag(&meta_tag_); - call_.PerformOps(&meta_ops_); - } - - void Write(const ResponseType* resp, - ::grpc::WriteOptions options) override { - this->Ref(); - if (options.is_last_message()) { - options.set_buffer_hint(); - } - if (!ctx_->sent_initial_metadata_) { - write_ops_.SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - write_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } - // TODO(vjpai): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok()); - call_.PerformOps(&write_ops_); - } - - void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options, - ::grpc::Status s) override { - // This combines the write into the finish callback + meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + meta_ops_.set_core_cq_tag(&meta_tag_); + call_.PerformOps(&meta_ops_); + } + + void Write(const ResponseType* resp, + ::grpc::WriteOptions options) override { + this->Ref(); + if (options.is_last_message()) { + options.set_buffer_hint(); + } + if (!ctx_->sent_initial_metadata_) { + write_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + write_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + // TODO(vjpai): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok()); + call_.PerformOps(&write_ops_); + } + + void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options, + ::grpc::Status s) override { + // This combines the write into the finish callback // TODO(vjpai): don't assert GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok()); - Finish(std::move(s)); - } - - private: - friend class CallbackServerStreamingHandler<RequestType, ResponseType>; - + Finish(std::move(s)); + } + + private: + friend class CallbackServerStreamingHandler<RequestType, ResponseType>; + ServerCallbackWriterImpl(::grpc::CallbackServerContext* ctx, - ::grpc::internal::Call* call, - const RequestType* req, - std::function<void()> call_requester) - : ctx_(ctx), - call_(*call), - req_(req), - call_requester_(std::move(call_requester)) {} - - void SetupReactor(ServerWriteReactor<ResponseType>* reactor) { - reactor_.store(reactor, std::memory_order_relaxed); + ::grpc::internal::Call* call, + const RequestType* req, + std::function<void()> call_requester) + : ctx_(ctx), + call_(*call), + req_(req), + call_requester_(std::move(call_requester)) {} + + void SetupReactor(ServerWriteReactor<ResponseType>* reactor) { + reactor_.store(reactor, std::memory_order_relaxed); // The callback for this function should not be inlined because it invokes // a user-controlled reaction, but any resulting OnDone can be inlined in // the executor to which this callback is dispatched. @@ -601,18 +601,18 @@ class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler { this->MaybeDone(/*inlineable_ondone=*/true); }, &write_ops_, /*can_inline=*/false); - write_ops_.set_core_cq_tag(&write_tag_); - this->BindReactor(reactor); - this->MaybeCallOnCancel(reactor); + write_ops_.set_core_cq_tag(&write_tag_); + this->BindReactor(reactor); + this->MaybeCallOnCancel(reactor); // Inlineable OnDone can be false here because there is no write // reactor that has an inlineable OnDone; this only applies to the // DefaultReactor (which is unary). this->MaybeDone(/*inlineable_ondone=*/false); - } - ~ServerCallbackWriterImpl() { req_->~RequestType(); } - - const RequestType* request() { return req_; } - + } + ~ServerCallbackWriterImpl() { req_->~RequestType(); } + + const RequestType* request() { return req_; } + void CallOnDone() override { reactor_.load(std::memory_order_relaxed)->OnDone(); grpc_call* call = call_.call(); @@ -620,90 +620,90 @@ class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler { this->~ServerCallbackWriterImpl(); // explicitly call destructor ::grpc::g_core_codegen_interface->grpc_call_unref(call); call_requester(); - } - - ServerReactor* reactor() override { - return reactor_.load(std::memory_order_relaxed); - } - - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> - meta_ops_; - ::grpc::internal::CallbackWithSuccessTag meta_tag_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpSendMessage, - ::grpc::internal::CallOpServerSendStatus> - finish_ops_; - ::grpc::internal::CallbackWithSuccessTag finish_tag_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpSendMessage> - write_ops_; - ::grpc::internal::CallbackWithSuccessTag write_tag_; - + } + + ServerReactor* reactor() override { + return reactor_.load(std::memory_order_relaxed); + } + + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + meta_ops_; + ::grpc::internal::CallbackWithSuccessTag meta_tag_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpServerSendStatus> + finish_ops_; + ::grpc::internal::CallbackWithSuccessTag finish_tag_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage> + write_ops_; + ::grpc::internal::CallbackWithSuccessTag write_tag_; + ::grpc::CallbackServerContext* const ctx_; - ::grpc::internal::Call call_; - const RequestType* req_; - std::function<void()> call_requester_; - // The memory ordering of reactor_ follows ServerCallbackUnaryImpl. - std::atomic<ServerWriteReactor<ResponseType>*> reactor_; - // callbacks_outstanding_ follows a refcount pattern - std::atomic<intptr_t> callbacks_outstanding_{ - 3}; // reserve for OnStarted, Finish, and CompletionOp - }; -}; - -template <class RequestType, class ResponseType> -class CallbackBidiHandler : public ::grpc::internal::MethodHandler { - public: - explicit CallbackBidiHandler( - std::function<ServerBidiReactor<RequestType, ResponseType>*( + ::grpc::internal::Call call_; + const RequestType* req_; + std::function<void()> call_requester_; + // The memory ordering of reactor_ follows ServerCallbackUnaryImpl. + std::atomic<ServerWriteReactor<ResponseType>*> reactor_; + // callbacks_outstanding_ follows a refcount pattern + std::atomic<intptr_t> callbacks_outstanding_{ + 3}; // reserve for OnStarted, Finish, and CompletionOp + }; +}; + +template <class RequestType, class ResponseType> +class CallbackBidiHandler : public ::grpc::internal::MethodHandler { + public: + explicit CallbackBidiHandler( + std::function<ServerBidiReactor<RequestType, ResponseType>*( ::grpc::CallbackServerContext*)> - get_reactor) - : get_reactor_(std::move(get_reactor)) {} - void RunHandler(const HandlerParameter& param) final { - ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call()); - - auto* stream = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( - param.call->call(), sizeof(ServerCallbackReaderWriterImpl))) - ServerCallbackReaderWriterImpl( + get_reactor) + : get_reactor_(std::move(get_reactor)) {} + void RunHandler(const HandlerParameter& param) final { + ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call()); + + auto* stream = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( + param.call->call(), sizeof(ServerCallbackReaderWriterImpl))) + ServerCallbackReaderWriterImpl( static_cast<::grpc::CallbackServerContext*>(param.server_context), - param.call, std::move(param.call_requester)); + param.call, std::move(param.call_requester)); // Inlineable OnDone can be false in the CompletionOp callback because there // is no bidi reactor that has an inlineable OnDone; this only applies to // the DefaultReactor (which is unary). - param.server_context->BeginCompletionOp( + param.server_context->BeginCompletionOp( param.call, [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); }, stream); - - ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr; - if (param.status.ok()) { - reactor = ::grpc::internal::CatchingReactorGetter< - ServerBidiReactor<RequestType, ResponseType>>( + + ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr; + if (param.status.ok()) { + reactor = ::grpc::internal::CatchingReactorGetter< + ServerBidiReactor<RequestType, ResponseType>>( get_reactor_, static_cast<::grpc::CallbackServerContext*>(param.server_context)); - } - - if (reactor == nullptr) { - // if deserialization or reactor creator failed, we need to fail the call - reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( - param.call->call(), - sizeof(UnimplementedBidiReactor<RequestType, ResponseType>))) - UnimplementedBidiReactor<RequestType, ResponseType>( - ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); - } - - stream->SetupReactor(reactor); - } - - private: - std::function<ServerBidiReactor<RequestType, ResponseType>*( + } + + if (reactor == nullptr) { + // if deserialization or reactor creator failed, we need to fail the call + reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( + param.call->call(), + sizeof(UnimplementedBidiReactor<RequestType, ResponseType>))) + UnimplementedBidiReactor<RequestType, ResponseType>( + ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); + } + + stream->SetupReactor(reactor); + } + + private: + std::function<ServerBidiReactor<RequestType, ResponseType>*( ::grpc::CallbackServerContext*)> - get_reactor_; - - class ServerCallbackReaderWriterImpl - : public ServerCallbackReaderWriter<RequestType, ResponseType> { - public: - void Finish(::grpc::Status s) override { + get_reactor_; + + class ServerCallbackReaderWriterImpl + : public ServerCallbackReaderWriter<RequestType, ResponseType> { + public: + void Finish(::grpc::Status s) override { // A finish tag with only MaybeDone can have its callback inlined // regardless even if OnDone is not inlineable because this callback just // checks a ref and then decides whether or not to dispatch OnDone. @@ -715,86 +715,86 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler { this->MaybeDone(/*inlineable_ondone=*/false); }, &finish_ops_, /*can_inline=*/true); - finish_ops_.set_core_cq_tag(&finish_tag_); - - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - finish_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } - finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); - call_.PerformOps(&finish_ops_); - } - - void SendInitialMetadata() override { - GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - this->Ref(); + finish_ops_.set_core_cq_tag(&finish_tag_); + + if (!ctx_->sent_initial_metadata_) { + finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); + call_.PerformOps(&finish_ops_); + } + + void SendInitialMetadata() override { + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + this->Ref(); // The callback for this function should not be inlined because it invokes // a user-controlled reaction, but any resulting OnDone can be inlined in // the executor to which this callback is dispatched. - meta_tag_.Set(call_.call(), - [this](bool ok) { + meta_tag_.Set(call_.call(), + [this](bool ok) { ServerBidiReactor<RequestType, ResponseType>* reactor = reactor_.load(std::memory_order_relaxed); reactor->OnSendInitialMetadataDone(ok); this->MaybeDone(/*inlineable_ondone=*/true); - }, + }, &meta_ops_, /*can_inline=*/false); - meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - meta_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - meta_ops_.set_core_cq_tag(&meta_tag_); - call_.PerformOps(&meta_ops_); - } - - void Write(const ResponseType* resp, - ::grpc::WriteOptions options) override { - this->Ref(); - if (options.is_last_message()) { - options.set_buffer_hint(); - } - if (!ctx_->sent_initial_metadata_) { - write_ops_.SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - write_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } - // TODO(vjpai): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok()); - call_.PerformOps(&write_ops_); - } - - void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options, - ::grpc::Status s) override { + meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + meta_ops_.set_core_cq_tag(&meta_tag_); + call_.PerformOps(&meta_ops_); + } + + void Write(const ResponseType* resp, + ::grpc::WriteOptions options) override { + this->Ref(); + if (options.is_last_message()) { + options.set_buffer_hint(); + } + if (!ctx_->sent_initial_metadata_) { + write_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + write_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + // TODO(vjpai): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok()); + call_.PerformOps(&write_ops_); + } + + void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options, + ::grpc::Status s) override { // TODO(vjpai): don't assert GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok()); - Finish(std::move(s)); - } - - void Read(RequestType* req) override { - this->Ref(); - read_ops_.RecvMessage(req); - call_.PerformOps(&read_ops_); - } - - private: - friend class CallbackBidiHandler<RequestType, ResponseType>; - + Finish(std::move(s)); + } + + void Read(RequestType* req) override { + this->Ref(); + read_ops_.RecvMessage(req); + call_.PerformOps(&read_ops_); + } + + private: + friend class CallbackBidiHandler<RequestType, ResponseType>; + ServerCallbackReaderWriterImpl(::grpc::CallbackServerContext* ctx, - ::grpc::internal::Call* call, - std::function<void()> call_requester) - : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {} - - void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) { - reactor_.store(reactor, std::memory_order_relaxed); + ::grpc::internal::Call* call, + std::function<void()> call_requester) + : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {} + + void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) { + reactor_.store(reactor, std::memory_order_relaxed); // The callbacks for these functions should not be inlined because they // invoke user-controlled reactions, but any resulting OnDones can be // inlined in the executor to which a callback is dispatched. @@ -804,22 +804,22 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler { this->MaybeDone(/*inlineable_ondone=*/true); }, &write_ops_, /*can_inline=*/false); - write_ops_.set_core_cq_tag(&write_tag_); - read_tag_.Set(call_.call(), + write_ops_.set_core_cq_tag(&write_tag_); + read_tag_.Set(call_.call(), [this, reactor](bool ok) { reactor->OnReadDone(ok); this->MaybeDone(/*inlineable_ondone=*/true); - }, + }, &read_ops_, /*can_inline=*/false); - read_ops_.set_core_cq_tag(&read_tag_); - this->BindReactor(reactor); - this->MaybeCallOnCancel(reactor); + read_ops_.set_core_cq_tag(&read_tag_); + this->BindReactor(reactor); + this->MaybeCallOnCancel(reactor); // Inlineable OnDone can be false here because there is no bidi // reactor that has an inlineable OnDone; this only applies to the // DefaultReactor (which is unary). this->MaybeDone(/*inlineable_ondone=*/false); - } - + } + void CallOnDone() override { reactor_.load(std::memory_order_relaxed)->OnDone(); grpc_call* call = call_.call(); @@ -827,41 +827,41 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler { this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor ::grpc::g_core_codegen_interface->grpc_call_unref(call); call_requester(); - } - - ServerReactor* reactor() override { - return reactor_.load(std::memory_order_relaxed); - } - - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> - meta_ops_; - ::grpc::internal::CallbackWithSuccessTag meta_tag_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpSendMessage, - ::grpc::internal::CallOpServerSendStatus> - finish_ops_; - ::grpc::internal::CallbackWithSuccessTag finish_tag_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpSendMessage> - write_ops_; - ::grpc::internal::CallbackWithSuccessTag write_tag_; - ::grpc::internal::CallOpSet< - ::grpc::internal::CallOpRecvMessage<RequestType>> - read_ops_; - ::grpc::internal::CallbackWithSuccessTag read_tag_; - + } + + ServerReactor* reactor() override { + return reactor_.load(std::memory_order_relaxed); + } + + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + meta_ops_; + ::grpc::internal::CallbackWithSuccessTag meta_tag_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpServerSendStatus> + finish_ops_; + ::grpc::internal::CallbackWithSuccessTag finish_tag_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage> + write_ops_; + ::grpc::internal::CallbackWithSuccessTag write_tag_; + ::grpc::internal::CallOpSet< + ::grpc::internal::CallOpRecvMessage<RequestType>> + read_ops_; + ::grpc::internal::CallbackWithSuccessTag read_tag_; + ::grpc::CallbackServerContext* const ctx_; - ::grpc::internal::Call call_; - std::function<void()> call_requester_; - // The memory ordering of reactor_ follows ServerCallbackUnaryImpl. - std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_; - // callbacks_outstanding_ follows a refcount pattern - std::atomic<intptr_t> callbacks_outstanding_{ - 3}; // reserve for OnStarted, Finish, and CompletionOp - }; -}; - -} // namespace internal + ::grpc::internal::Call call_; + std::function<void()> call_requester_; + // The memory ordering of reactor_ follows ServerCallbackUnaryImpl. + std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_; + // callbacks_outstanding_ follows a refcount pattern + std::atomic<intptr_t> callbacks_outstanding_{ + 3}; // reserve for OnStarted, Finish, and CompletionOp + }; +}; + +} // namespace internal } // namespace grpc - -#endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H + +#endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H |