diff options
author | dvshkurko <dvshkurko@yandex-team.ru> | 2022-02-10 16:45:51 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:51 +0300 |
commit | 321ee9bce31ec6e238be26dbcbe539cffa2c3309 (patch) | |
tree | 14407a2757cbf29eb97e266b7f07e851f971000c /contrib/libs/grpc/src/cpp/server/server_cc.cc | |
parent | 2f6ca198245aeffd5e2d82b65927c2465b68b4f5 (diff) | |
download | ydb-321ee9bce31ec6e238be26dbcbe539cffa2c3309.tar.gz |
Restoring authorship annotation for <dvshkurko@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/libs/grpc/src/cpp/server/server_cc.cc')
-rw-r--r-- | contrib/libs/grpc/src/cpp/server/server_cc.cc | 842 |
1 files changed, 421 insertions, 421 deletions
diff --git a/contrib/libs/grpc/src/cpp/server/server_cc.cc b/contrib/libs/grpc/src/cpp/server/server_cc.cc index c2a911c7f7..5cf50d9266 100644 --- a/contrib/libs/grpc/src/cpp/server/server_cc.cc +++ b/contrib/libs/grpc/src/cpp/server/server_cc.cc @@ -19,7 +19,7 @@ #include <cstdlib> #include <sstream> -#include <type_traits> +#include <type_traits> #include <utility> #include <grpc/grpc.h> @@ -29,10 +29,10 @@ #include <grpcpp/completion_queue.h> #include <grpcpp/generic/async_generic_service.h> #include <grpcpp/impl/codegen/async_unary_call.h> -#include <grpcpp/impl/codegen/byte_buffer.h> +#include <grpcpp/impl/codegen/byte_buffer.h> #include <grpcpp/impl/codegen/call.h> #include <grpcpp/impl/codegen/completion_queue_tag.h> -#include <grpcpp/impl/codegen/method_handler.h> +#include <grpcpp/impl/codegen/method_handler.h> #include <grpcpp/impl/codegen/server_interceptor.h> #include <grpcpp/impl/grpc_library.h> #include <grpcpp/impl/rpc_service_method.h> @@ -49,7 +49,7 @@ #include "src/core/lib/surface/completion_queue.h" #include "src/core/lib/surface/server.h" #include "src/cpp/client/create_channel_internal.h" -#include "src/cpp/server/external_connection_acceptor_impl.h" +#include "src/cpp/server/external_connection_acceptor_impl.h" #include "src/cpp/server/health/default_health_check_service.h" #include "src/cpp/thread_manager/thread_manager.h" @@ -68,8 +68,8 @@ namespace { class DefaultGlobalCallbacks final : public Server::GlobalCallbacks { public: ~DefaultGlobalCallbacks() override {} - void PreSynchronousRequest(ServerContext* /*context*/) override {} - void PostSynchronousRequest(ServerContext* /*context*/) override {} + void PreSynchronousRequest(ServerContext* /*context*/) override {} + void PostSynchronousRequest(ServerContext* /*context*/) override {} }; std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr; @@ -83,12 +83,12 @@ void InitGlobalCallbacks() { class ShutdownTag : public internal::CompletionQueueTag { public: - bool FinalizeResult(void** /*tag*/, bool* /*status*/) { return false; } + bool FinalizeResult(void** /*tag*/, bool* /*status*/) { return false; } }; class DummyTag : public internal::CompletionQueueTag { public: - bool FinalizeResult(void** /*tag*/, bool* /*status*/) { return true; } + bool FinalizeResult(void** /*tag*/, bool* /*status*/) { return true; } }; class UnimplementedAsyncRequestContext { @@ -110,187 +110,187 @@ using ::grpc::experimental::GenericCallbackServerContext; } // namespace -ServerInterface::BaseAsyncRequest::BaseAsyncRequest( - ServerInterface* server, ServerContext* context, - internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) - : server_(server), - context_(context), - stream_(stream), - call_cq_(call_cq), - notification_cq_(notification_cq), - tag_(tag), - delete_on_finalize_(delete_on_finalize), - call_(nullptr), - done_intercepting_(false) { - /* Set up interception state partially for the receive ops. call_wrapper_ is - * not filled at this point, but it will be filled before the interceptors are - * run. */ - interceptor_methods_.SetCall(&call_wrapper_); - interceptor_methods_.SetReverse(); - call_cq_->RegisterAvalanching(); // This op will trigger more ops -} - -ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() { - call_cq_->CompleteAvalanching(); -} - -bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, - bool* status) { - if (done_intercepting_) { - *tag = tag_; - if (delete_on_finalize_) { - delete this; - } - return true; - } - context_->set_call(call_); - context_->cq_ = call_cq_; - if (call_wrapper_.call() == nullptr) { - // Fill it since it is empty. - call_wrapper_ = internal::Call( - call_, server_, call_cq_, server_->max_receive_message_size(), nullptr); - } - - // just the pointers inside call are copied here - stream_->BindCall(&call_wrapper_); - - if (*status && call_ && call_wrapper_.server_rpc_info()) { - done_intercepting_ = true; - // Set interception point for RECV INITIAL METADATA - interceptor_methods_.AddInterceptionHookPoint( - experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); - interceptor_methods_.SetRecvInitialMetadata(&context_->client_metadata_); - if (interceptor_methods_.RunInterceptors( - [this]() { ContinueFinalizeResultAfterInterception(); })) { - // There are no interceptors to run. Continue - } else { - // There were interceptors to be run, so - // ContinueFinalizeResultAfterInterception will be run when interceptors - // are done. - return false; - } - } - if (*status && call_) { - context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr); - } - *tag = tag_; - if (delete_on_finalize_) { - delete this; - } - return true; -} - -void ServerInterface::BaseAsyncRequest:: - ContinueFinalizeResultAfterInterception() { - context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr); - // Queue a tag which will be returned immediately - grpc_core::ExecCtx exec_ctx; - grpc_cq_begin_op(notification_cq_->cq(), this); - grpc_cq_end_op( - notification_cq_->cq(), this, GRPC_ERROR_NONE, - [](void* /*arg*/, grpc_cq_completion* completion) { delete completion; }, - nullptr, new grpc_cq_completion()); -} - -ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( - ServerInterface* server, ServerContext* context, - internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag, const char* name, - internal::RpcMethod::RpcType type) - : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, - true), - name_(name), - type_(type) {} - -void ServerInterface::RegisteredAsyncRequest::IssueRequest( - void* registered_method, grpc_byte_buffer** payload, - ServerCompletionQueue* notification_cq) { - // The following call_start_batch is internally-generated so no need for an - // explanatory log on failure. - GPR_ASSERT(grpc_server_request_registered_call( - server_->server(), registered_method, &call_, - &context_->deadline_, context_->client_metadata_.arr(), - payload, call_cq_->cq(), notification_cq->cq(), - this) == GRPC_CALL_OK); -} - -ServerInterface::GenericAsyncRequest::GenericAsyncRequest( - ServerInterface* server, GenericServerContext* context, - internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) - : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, - delete_on_finalize) { - grpc_call_details_init(&call_details_); - GPR_ASSERT(notification_cq); - GPR_ASSERT(call_cq); - // The following call_start_batch is internally-generated so no need for an - // explanatory log on failure. - GPR_ASSERT(grpc_server_request_call(server->server(), &call_, &call_details_, - context->client_metadata_.arr(), - call_cq->cq(), notification_cq->cq(), - this) == GRPC_CALL_OK); -} - -bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, - bool* status) { - // If we are done intercepting, there is nothing more for us to do - if (done_intercepting_) { - return BaseAsyncRequest::FinalizeResult(tag, status); - } - // TODO(yangg) remove the copy here. - if (*status) { - static_cast<GenericServerContext*>(context_)->method_ = - StringFromCopiedSlice(call_details_.method); - static_cast<GenericServerContext*>(context_)->host_ = - StringFromCopiedSlice(call_details_.host); - context_->deadline_ = call_details_.deadline; - } - grpc_slice_unref(call_details_.method); - grpc_slice_unref(call_details_.host); - call_wrapper_ = internal::Call( - call_, server_, call_cq_, server_->max_receive_message_size(), - context_->set_server_rpc_info( - static_cast<GenericServerContext*>(context_)->method_.c_str(), - internal::RpcMethod::BIDI_STREAMING, - *server_->interceptor_creators())); - return BaseAsyncRequest::FinalizeResult(tag, status); -} - -namespace { -class ShutdownCallback : public grpc_experimental_completion_queue_functor { - public: - ShutdownCallback() { - functor_run = &ShutdownCallback::Run; - // Set inlineable to true since this callback is trivial and thus does not - // need to be run from the executor (triggering a thread hop). This should - // only be used by internal callbacks like this and not by user application - // code. - inlineable = true; - } - // TakeCQ takes ownership of the cq into the shutdown callback - // so that the shutdown callback will be responsible for destroying it - void TakeCQ(CompletionQueue* cq) { cq_ = cq; } - - // The Run function will get invoked by the completion queue library - // when the shutdown is actually complete - static void Run(grpc_experimental_completion_queue_functor* cb, int) { - auto* callback = static_cast<ShutdownCallback*>(cb); - delete callback->cq_; - delete callback; - } - - private: - CompletionQueue* cq_ = nullptr; -}; -} // namespace - +ServerInterface::BaseAsyncRequest::BaseAsyncRequest( + ServerInterface* server, ServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) + : server_(server), + context_(context), + stream_(stream), + call_cq_(call_cq), + notification_cq_(notification_cq), + tag_(tag), + delete_on_finalize_(delete_on_finalize), + call_(nullptr), + done_intercepting_(false) { + /* Set up interception state partially for the receive ops. call_wrapper_ is + * not filled at this point, but it will be filled before the interceptors are + * run. */ + interceptor_methods_.SetCall(&call_wrapper_); + interceptor_methods_.SetReverse(); + call_cq_->RegisterAvalanching(); // This op will trigger more ops +} + +ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() { + call_cq_->CompleteAvalanching(); +} + +bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, + bool* status) { + if (done_intercepting_) { + *tag = tag_; + if (delete_on_finalize_) { + delete this; + } + return true; + } + context_->set_call(call_); + context_->cq_ = call_cq_; + if (call_wrapper_.call() == nullptr) { + // Fill it since it is empty. + call_wrapper_ = internal::Call( + call_, server_, call_cq_, server_->max_receive_message_size(), nullptr); + } + + // just the pointers inside call are copied here + stream_->BindCall(&call_wrapper_); + + if (*status && call_ && call_wrapper_.server_rpc_info()) { + done_intercepting_ = true; + // Set interception point for RECV INITIAL METADATA + interceptor_methods_.AddInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); + interceptor_methods_.SetRecvInitialMetadata(&context_->client_metadata_); + if (interceptor_methods_.RunInterceptors( + [this]() { ContinueFinalizeResultAfterInterception(); })) { + // There are no interceptors to run. Continue + } else { + // There were interceptors to be run, so + // ContinueFinalizeResultAfterInterception will be run when interceptors + // are done. + return false; + } + } + if (*status && call_) { + context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr); + } + *tag = tag_; + if (delete_on_finalize_) { + delete this; + } + return true; +} + +void ServerInterface::BaseAsyncRequest:: + ContinueFinalizeResultAfterInterception() { + context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr); + // Queue a tag which will be returned immediately + grpc_core::ExecCtx exec_ctx; + grpc_cq_begin_op(notification_cq_->cq(), this); + grpc_cq_end_op( + notification_cq_->cq(), this, GRPC_ERROR_NONE, + [](void* /*arg*/, grpc_cq_completion* completion) { delete completion; }, + nullptr, new grpc_cq_completion()); +} + +ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( + ServerInterface* server, ServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag, const char* name, + internal::RpcMethod::RpcType type) + : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, + true), + name_(name), + type_(type) {} + +void ServerInterface::RegisteredAsyncRequest::IssueRequest( + void* registered_method, grpc_byte_buffer** payload, + ServerCompletionQueue* notification_cq) { + // The following call_start_batch is internally-generated so no need for an + // explanatory log on failure. + GPR_ASSERT(grpc_server_request_registered_call( + server_->server(), registered_method, &call_, + &context_->deadline_, context_->client_metadata_.arr(), + payload, call_cq_->cq(), notification_cq->cq(), + this) == GRPC_CALL_OK); +} + +ServerInterface::GenericAsyncRequest::GenericAsyncRequest( + ServerInterface* server, GenericServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) + : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, + delete_on_finalize) { + grpc_call_details_init(&call_details_); + GPR_ASSERT(notification_cq); + GPR_ASSERT(call_cq); + // The following call_start_batch is internally-generated so no need for an + // explanatory log on failure. + GPR_ASSERT(grpc_server_request_call(server->server(), &call_, &call_details_, + context->client_metadata_.arr(), + call_cq->cq(), notification_cq->cq(), + this) == GRPC_CALL_OK); +} + +bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, + bool* status) { + // If we are done intercepting, there is nothing more for us to do + if (done_intercepting_) { + return BaseAsyncRequest::FinalizeResult(tag, status); + } + // TODO(yangg) remove the copy here. + if (*status) { + static_cast<GenericServerContext*>(context_)->method_ = + StringFromCopiedSlice(call_details_.method); + static_cast<GenericServerContext*>(context_)->host_ = + StringFromCopiedSlice(call_details_.host); + context_->deadline_ = call_details_.deadline; + } + grpc_slice_unref(call_details_.method); + grpc_slice_unref(call_details_.host); + call_wrapper_ = internal::Call( + call_, server_, call_cq_, server_->max_receive_message_size(), + context_->set_server_rpc_info( + static_cast<GenericServerContext*>(context_)->method_.c_str(), + internal::RpcMethod::BIDI_STREAMING, + *server_->interceptor_creators())); + return BaseAsyncRequest::FinalizeResult(tag, status); +} + +namespace { +class ShutdownCallback : public grpc_experimental_completion_queue_functor { + public: + ShutdownCallback() { + functor_run = &ShutdownCallback::Run; + // Set inlineable to true since this callback is trivial and thus does not + // need to be run from the executor (triggering a thread hop). This should + // only be used by internal callbacks like this and not by user application + // code. + inlineable = true; + } + // TakeCQ takes ownership of the cq into the shutdown callback + // so that the shutdown callback will be responsible for destroying it + void TakeCQ(CompletionQueue* cq) { cq_ = cq; } + + // The Run function will get invoked by the completion queue library + // when the shutdown is actually complete + static void Run(grpc_experimental_completion_queue_functor* cb, int) { + auto* callback = static_cast<ShutdownCallback*>(cb); + delete callback->cq_; + delete callback; + } + + private: + CompletionQueue* cq_ = nullptr; +}; +} // namespace + /// Use private inheritance rather than composition only to establish order /// of construction, since the public base class should be constructed after the /// elements belonging to the private base class are constructed. This is not /// possible using true composition. class Server::UnimplementedAsyncRequest final - : private grpc::UnimplementedAsyncRequestContext, + : private grpc::UnimplementedAsyncRequestContext, public GenericAsyncRequest { public: UnimplementedAsyncRequest(ServerInterface* server, @@ -300,25 +300,25 @@ class Server::UnimplementedAsyncRequest final bool FinalizeResult(void** tag, bool* status) override; - grpc::ServerContext* context() { return &server_context_; } - grpc::GenericServerAsyncReaderWriter* stream() { return &generic_stream_; } + grpc::ServerContext* context() { return &server_context_; } + grpc::GenericServerAsyncReaderWriter* stream() { return &generic_stream_; } }; /// UnimplementedAsyncResponse should not post user-visible completions to the /// C++ completion queue, but is generated as a CQ event by the core class Server::UnimplementedAsyncResponse final - : public grpc::internal::CallOpSet< - grpc::internal::CallOpSendInitialMetadata, - grpc::internal::CallOpServerSendStatus> { + : public grpc::internal::CallOpSet< + grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpServerSendStatus> { public: UnimplementedAsyncResponse(UnimplementedAsyncRequest* request); ~UnimplementedAsyncResponse() { delete request_; } bool FinalizeResult(void** tag, bool* status) override { - if (grpc::internal::CallOpSet< - grpc::internal::CallOpSendInitialMetadata, - grpc::internal::CallOpServerSendStatus>::FinalizeResult(tag, - status)) { + if (grpc::internal::CallOpSet< + grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpServerSendStatus>::FinalizeResult(tag, + status)) { delete this; } else { // The tag was swallowed due to interception. We will see it again. @@ -330,16 +330,16 @@ class Server::UnimplementedAsyncResponse final UnimplementedAsyncRequest* const request_; }; -class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { +class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { public: - SyncRequest(grpc::internal::RpcServiceMethod* method, void* method_tag) + SyncRequest(grpc::internal::RpcServiceMethod* method, void* method_tag) : method_(method), method_tag_(method_tag), in_flight_(false), - has_request_payload_(method->method_type() == - grpc::internal::RpcMethod::NORMAL_RPC || - method->method_type() == - grpc::internal::RpcMethod::SERVER_STREAMING), + has_request_payload_(method->method_type() == + grpc::internal::RpcMethod::NORMAL_RPC || + method->method_type() == + grpc::internal::RpcMethod::SERVER_STREAMING), call_details_(nullptr), cq_(nullptr) { grpc_metadata_array_init(&request_metadata_); @@ -395,7 +395,7 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { } } - bool FinalizeResult(void** /*tag*/, bool* status) override { + bool FinalizeResult(void** /*tag*/, bool* status) override { if (!*status) { grpc_completion_queue_destroy(cq_); cq_ = nullptr; @@ -450,8 +450,8 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { interceptor_methods_.SetReverse(); // Set interception point for RECV INITIAL METADATA interceptor_methods_.AddInterceptionHookPoint( - grpc::experimental::InterceptionHookPoints:: - POST_RECV_INITIAL_METADATA); + grpc::experimental::InterceptionHookPoints:: + POST_RECV_INITIAL_METADATA); interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_); if (has_request_payload_) { @@ -459,11 +459,11 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { auto* handler = resources_ ? method_->handler() : server_->resource_exhausted_handler_.get(); request_ = handler->Deserialize(call_.call(), request_payload_, - &request_status_, nullptr); + &request_status_, nullptr); request_payload_ = nullptr; interceptor_methods_.AddInterceptionHookPoint( - grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); + grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); interceptor_methods_.SetRecvMessage(request_, nullptr); } @@ -482,40 +482,40 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { global_callbacks_->PreSynchronousRequest(&ctx_); auto* handler = resources_ ? method_->handler() : server_->resource_exhausted_handler_.get(); - handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( - &call_, &ctx_, request_, request_status_, nullptr, nullptr)); + handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( + &call_, &ctx_, request_, request_status_, nullptr, nullptr)); request_ = nullptr; global_callbacks_->PostSynchronousRequest(&ctx_); cq_.Shutdown(); - grpc::internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); + grpc::internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); /* Ensure the cq_ is shutdown */ - grpc::DummyTag ignored_tag; + grpc::DummyTag ignored_tag; GPR_ASSERT(cq_.Pluck(&ignored_tag) == false); } delete this; } private: - grpc::CompletionQueue cq_; - grpc::ServerContext ctx_; + grpc::CompletionQueue cq_; + grpc::ServerContext ctx_; const bool has_request_payload_; grpc_byte_buffer* request_payload_; void* request_; - grpc::Status request_status_; - grpc::internal::RpcServiceMethod* const method_; - grpc::internal::Call call_; + grpc::Status request_status_; + grpc::internal::RpcServiceMethod* const method_; + grpc::internal::Call call_; Server* server_; std::shared_ptr<GlobalCallbacks> global_callbacks_; bool resources_; - grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; + grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; }; private: - grpc::internal::RpcServiceMethod* const method_; + grpc::internal::RpcServiceMethod* const method_; void* const method_tag_; bool in_flight_; const bool has_request_payload_; @@ -527,14 +527,14 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { grpc_completion_queue* cq_; }; -template <class ServerContextType> +template <class ServerContextType> class Server::CallbackRequest final : public grpc::internal::CompletionQueueTag { - public: + public: static_assert( std::is_base_of<grpc::CallbackServerContext, ServerContextType>::value, "ServerContextType must be derived from CallbackServerContext"); - + // For codegen services, the value of method represents the defined // characteristics of the method being requested. For generic services, method // is nullptr since these services don't have pre-defined methods. @@ -578,26 +578,26 @@ class Server::CallbackRequest final server_->UnrefWithPossibleNotify(); } - // Needs specialization to account for different processing of metadata - // in generic API - bool FinalizeResult(void** tag, bool* status) override; + // Needs specialization to account for different processing of metadata + // in generic API + bool FinalizeResult(void** tag, bool* status) override; private: - // method_name needs to be specialized between named method and generic - const char* method_name() const; - + // method_name needs to be specialized between named method and generic + const char* method_name() const; + class CallbackCallTag : public grpc_experimental_completion_queue_functor { public: - CallbackCallTag(Server::CallbackRequest<ServerContextType>* req) - : req_(req) { + CallbackCallTag(Server::CallbackRequest<ServerContextType>* req) + : req_(req) { functor_run = &CallbackCallTag::StaticRun; - // Set inlineable to true since this callback is internally-controlled - // without taking any locks, and thus does not need to be run from the - // executor (which triggers a thread hop). This should only be used by - // internal callbacks like this and not by user application code. The work - // here is actually non-trivial, but there is no chance of having user - // locks conflict with each other so it's ok to run inlined. - inlineable = true; + // Set inlineable to true since this callback is internally-controlled + // without taking any locks, and thus does not need to be run from the + // executor (which triggers a thread hop). This should only be used by + // internal callbacks like this and not by user application code. The work + // here is actually non-trivial, but there is no chance of having user + // locks conflict with each other so it's ok to run inlined. + inlineable = true; } // force_run can not be performed on a tag if operations using this tag @@ -606,8 +606,8 @@ class Server::CallbackRequest final void force_run(bool ok) { Run(ok); } private: - Server::CallbackRequest<ServerContextType>* req_; - grpc::internal::Call* call_; + Server::CallbackRequest<ServerContextType>* req_; + grpc::internal::Call* call_; static void StaticRun(grpc_experimental_completion_queue_functor* cb, int ok) { @@ -634,35 +634,35 @@ class Server::CallbackRequest final req_->request_metadata_.count = 0; // Create a C++ Call to control the underlying core call - call_ = - new (grpc_call_arena_alloc(req_->call_, sizeof(grpc::internal::Call))) - grpc::internal::Call( - req_->call_, req_->server_, req_->cq_, - req_->server_->max_receive_message_size(), - req_->ctx_.set_server_rpc_info( - req_->method_name(), - (req_->method_ != nullptr) - ? req_->method_->method_type() - : grpc::internal::RpcMethod::BIDI_STREAMING, - req_->server_->interceptor_creators_)); + call_ = + new (grpc_call_arena_alloc(req_->call_, sizeof(grpc::internal::Call))) + grpc::internal::Call( + req_->call_, req_->server_, req_->cq_, + req_->server_->max_receive_message_size(), + req_->ctx_.set_server_rpc_info( + req_->method_name(), + (req_->method_ != nullptr) + ? req_->method_->method_type() + : grpc::internal::RpcMethod::BIDI_STREAMING, + req_->server_->interceptor_creators_)); req_->interceptor_methods_.SetCall(call_); req_->interceptor_methods_.SetReverse(); // Set interception point for RECV INITIAL METADATA req_->interceptor_methods_.AddInterceptionHookPoint( - grpc::experimental::InterceptionHookPoints:: - POST_RECV_INITIAL_METADATA); + grpc::experimental::InterceptionHookPoints:: + POST_RECV_INITIAL_METADATA); req_->interceptor_methods_.SetRecvInitialMetadata( &req_->ctx_.client_metadata_); if (req_->has_request_payload_) { // Set interception point for RECV MESSAGE req_->request_ = req_->method_->handler()->Deserialize( - req_->call_, req_->request_payload_, &req_->request_status_, - &req_->handler_data_); + req_->call_, req_->request_payload_, &req_->request_status_, + &req_->handler_data_); req_->request_payload_ = nullptr; req_->interceptor_methods_.AddInterceptionHookPoint( - grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); + grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); req_->interceptor_methods_.SetRecvMessage(req_->request_, nullptr); } @@ -675,11 +675,11 @@ class Server::CallbackRequest final } } void ContinueRunAfterInterception() { - auto* handler = (req_->method_ != nullptr) - ? req_->method_->handler() - : req_->server_->generic_handler_.get(); - handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( - call_, &req_->ctx_, req_->request_, req_->request_status_, + auto* handler = (req_->method_ != nullptr) + ? req_->method_->handler() + : req_->server_->generic_handler_.get(); + handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( + call_, &req_->ctx_, req_->request_, req_->request_status_, req_->handler_data_, [this] { delete req_; })); } }; @@ -694,61 +694,61 @@ class Server::CallbackRequest final } Server* const server_; - grpc::internal::RpcServiceMethod* const method_; + grpc::internal::RpcServiceMethod* const method_; const bool has_request_payload_; grpc_byte_buffer* request_payload_ = nullptr; void* request_ = nullptr; void* handler_data_ = nullptr; - grpc::Status request_status_; + grpc::Status request_status_; grpc_call_details* const call_details_ = nullptr; grpc_call* call_; gpr_timespec deadline_; grpc_metadata_array request_metadata_; grpc::CompletionQueue* const cq_; CallbackCallTag tag_; - ServerContextType ctx_; - grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; + ServerContextType ctx_; + grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; }; -template <> +template <> bool Server::CallbackRequest<grpc::CallbackServerContext>::FinalizeResult( void** /*tag*/, bool* /*status*/) { - return false; -} - -template <> + return false; +} + +template <> bool Server::CallbackRequest< grpc::GenericCallbackServerContext>::FinalizeResult(void** /*tag*/, bool* status) { - if (*status) { + if (*status) { deadline_ = call_details_->deadline; - // TODO(yangg) remove the copy here - ctx_.method_ = grpc::StringFromCopiedSlice(call_details_->method); - ctx_.host_ = grpc::StringFromCopiedSlice(call_details_->host); - } - grpc_slice_unref(call_details_->method); - grpc_slice_unref(call_details_->host); - return false; -} - -template <> + // TODO(yangg) remove the copy here + ctx_.method_ = grpc::StringFromCopiedSlice(call_details_->method); + ctx_.host_ = grpc::StringFromCopiedSlice(call_details_->host); + } + grpc_slice_unref(call_details_->method); + grpc_slice_unref(call_details_->host); + return false; +} + +template <> const char* Server::CallbackRequest<grpc::CallbackServerContext>::method_name() const { - return method_->name(); -} - -template <> -const char* Server::CallbackRequest< + return method_->name(); +} + +template <> +const char* Server::CallbackRequest< grpc::GenericCallbackServerContext>::method_name() const { - return ctx_.method().c_str(); -} - + return ctx_.method().c_str(); +} + // Implementation of ThreadManager. Each instance of SyncRequestThreadManager // manages a pool of threads that poll for incoming Sync RPCs and call the // appropriate RPC handlers -class Server::SyncRequestThreadManager : public grpc::ThreadManager { +class Server::SyncRequestThreadManager : public grpc::ThreadManager { public: - SyncRequestThreadManager(Server* server, grpc::CompletionQueue* server_cq, + SyncRequestThreadManager(Server* server, grpc::CompletionQueue* server_cq, std::shared_ptr<GlobalCallbacks> global_callbacks, grpc_resource_quota* rq, int min_pollers, int max_pollers, int cq_timeout_msec) @@ -767,11 +767,11 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN)); switch (server_cq_->AsyncNext(tag, ok, deadline)) { - case grpc::CompletionQueue::TIMEOUT: + case grpc::CompletionQueue::TIMEOUT: return TIMEOUT; - case grpc::CompletionQueue::SHUTDOWN: + case grpc::CompletionQueue::SHUTDOWN: return SHUTDOWN; - case grpc::CompletionQueue::GOT_EVENT: + case grpc::CompletionQueue::GOT_EVENT: return WORK_FOUND; } @@ -806,15 +806,15 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { // object } - void AddSyncMethod(grpc::internal::RpcServiceMethod* method, void* tag) { + void AddSyncMethod(grpc::internal::RpcServiceMethod* method, void* tag) { sync_requests_.emplace_back(new SyncRequest(method, tag)); } void AddUnknownSyncMethod() { if (!sync_requests_.empty()) { - unknown_method_.reset(new grpc::internal::RpcServiceMethod( - "unknown", grpc::internal::RpcMethod::BIDI_STREAMING, - new grpc::internal::UnknownMethodHandler)); + unknown_method_.reset(new grpc::internal::RpcServiceMethod( + "unknown", grpc::internal::RpcMethod::BIDI_STREAMING, + new grpc::internal::UnknownMethodHandler)); sync_requests_.emplace_back( new SyncRequest(unknown_method_.get(), nullptr)); } @@ -847,9 +847,9 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { void Start() { if (!sync_requests_.empty()) { - for (const auto& value : sync_requests_) { - value->SetupRequest(); - value->Request(server_->c_server(), server_cq_->cq()); + for (const auto& value : sync_requests_) { + value->SetupRequest(); + value->Request(server_->c_server(), server_cq_->cq()); } Initialize(); // ThreadManager's Initialize() @@ -858,27 +858,27 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { private: Server* server_; - grpc::CompletionQueue* server_cq_; + grpc::CompletionQueue* server_cq_; int cq_timeout_msec_; std::vector<std::unique_ptr<SyncRequest>> sync_requests_; - std::unique_ptr<grpc::internal::RpcServiceMethod> unknown_method_; + std::unique_ptr<grpc::internal::RpcServiceMethod> unknown_method_; std::shared_ptr<Server::GlobalCallbacks> global_callbacks_; }; -static grpc::internal::GrpcLibraryInitializer g_gli_initializer; +static grpc::internal::GrpcLibraryInitializer g_gli_initializer; Server::Server( grpc::ChannelArguments* args, - std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>> + std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>> sync_server_cqs, int min_pollers, int max_pollers, int sync_cq_timeout_msec, - std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>> - acceptors, + std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>> + acceptors, grpc_resource_quota* server_rq, std::vector< - std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>> + std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>> interceptor_creators) - : acceptors_(std::move(acceptors)), - interceptor_creators_(std::move(interceptor_creators)), + : acceptors_(std::move(acceptors)), + interceptor_creators_(std::move(interceptor_creators)), max_receive_message_size_(INT_MIN), sync_server_cqs_(std::move(sync_server_cqs)), started_(false), @@ -888,8 +888,8 @@ Server::Server( server_initializer_(new ServerInitializer(this)), health_check_service_disabled_(false) { g_gli_initializer.summon(); - gpr_once_init(&grpc::g_once_init_callbacks, grpc::InitGlobalCallbacks); - global_callbacks_ = grpc::g_callbacks; + gpr_once_init(&grpc::g_once_init_callbacks, grpc::InitGlobalCallbacks); + global_callbacks_ = grpc::g_callbacks; global_callbacks_->UpdateArguments(args); if (sync_server_cqs_ != nullptr) { @@ -912,22 +912,22 @@ Server::Server( } } - for (auto& acceptor : acceptors_) { - acceptor->SetToChannelArgs(args); - } - + for (auto& acceptor : acceptors_) { + acceptor->SetToChannelArgs(args); + } + grpc_channel_args channel_args; args->SetChannelArgs(&channel_args); for (size_t i = 0; i < channel_args.num_args; i++) { - if (0 == strcmp(channel_args.args[i].key, - grpc::kHealthCheckServiceInterfaceArg)) { + if (0 == strcmp(channel_args.args[i].key, + grpc::kHealthCheckServiceInterfaceArg)) { if (channel_args.args[i].value.pointer.p == nullptr) { health_check_service_disabled_ = true; } else { - health_check_service_.reset( - static_cast<grpc::HealthCheckServiceInterface*>( - channel_args.args[i].value.pointer.p)); + health_check_service_.reset( + static_cast<grpc::HealthCheckServiceInterface*>( + channel_args.args[i].value.pointer.p)); } } if (0 == @@ -940,19 +940,19 @@ Server::Server( Server::~Server() { { - grpc::internal::ReleasableMutexLock lock(&mu_); + grpc::internal::ReleasableMutexLock lock(&mu_); if (started_ && !shutdown_) { - lock.Unlock(); + lock.Unlock(); Shutdown(); } else if (!started_) { // Shutdown the completion queues - for (const auto& value : sync_req_mgrs_) { - value->Shutdown(); - } - if (callback_cq_ != nullptr) { - callback_cq_->Shutdown(); - callback_cq_ = nullptr; + for (const auto& value : sync_req_mgrs_) { + value->Shutdown(); } + if (callback_cq_ != nullptr) { + callback_cq_->Shutdown(); + callback_cq_ = nullptr; + } } } // Destroy health check service before we destroy the C server so that @@ -963,43 +963,43 @@ Server::~Server() { } void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) { - GPR_ASSERT(!grpc::g_callbacks); + GPR_ASSERT(!grpc::g_callbacks); GPR_ASSERT(callbacks); - grpc::g_callbacks.reset(callbacks); + grpc::g_callbacks.reset(callbacks); } grpc_server* Server::c_server() { return server_; } -std::shared_ptr<grpc::Channel> Server::InProcessChannel( - const grpc::ChannelArguments& args) { +std::shared_ptr<grpc::Channel> Server::InProcessChannel( + const grpc::ChannelArguments& args) { grpc_channel_args channel_args = args.c_channel_args(); - return grpc::CreateChannelInternal( + return grpc::CreateChannelInternal( "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr), - std::vector<std::unique_ptr< - grpc::experimental::ClientInterceptorFactoryInterface>>()); + std::vector<std::unique_ptr< + grpc::experimental::ClientInterceptorFactoryInterface>>()); } -std::shared_ptr<grpc::Channel> +std::shared_ptr<grpc::Channel> Server::experimental_type::InProcessChannelWithInterceptors( - const grpc::ChannelArguments& args, + const grpc::ChannelArguments& args, std::vector< - std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> + std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_creators) { grpc_channel_args channel_args = args.c_channel_args(); - return grpc::CreateChannelInternal( + return grpc::CreateChannelInternal( "inproc", grpc_inproc_channel_create(server_->server_, &channel_args, nullptr), std::move(interceptor_creators)); } static grpc_server_register_method_payload_handling PayloadHandlingForMethod( - grpc::internal::RpcServiceMethod* method) { + grpc::internal::RpcServiceMethod* method) { switch (method->method_type()) { - case grpc::internal::RpcMethod::NORMAL_RPC: - case grpc::internal::RpcMethod::SERVER_STREAMING: + case grpc::internal::RpcMethod::NORMAL_RPC: + case grpc::internal::RpcMethod::SERVER_STREAMING: return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER; - case grpc::internal::RpcMethod::CLIENT_STREAMING: - case grpc::internal::RpcMethod::BIDI_STREAMING: + case grpc::internal::RpcMethod::CLIENT_STREAMING: + case grpc::internal::RpcMethod::BIDI_STREAMING: return GRPC_SRM_PAYLOAD_NONE; } GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;); @@ -1015,14 +1015,14 @@ bool Server::RegisterService(const TString* host, grpc::Service* service) { const char* method_name = nullptr; - for (const auto& method : service->methods_) { - if (method.get() == nullptr) { // Handled by generic service if any. + for (const auto& method : service->methods_) { + if (method.get() == nullptr) { // Handled by generic service if any. continue; } void* method_registration_tag = grpc_server_register_method( server_, method->name(), host ? host->c_str() : nullptr, - PayloadHandlingForMethod(method.get()), 0); + PayloadHandlingForMethod(method.get()), 0); if (method_registration_tag == nullptr) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name()); @@ -1032,9 +1032,9 @@ bool Server::RegisterService(const TString* host, grpc::Service* service) { if (method->handler() == nullptr) { // Async method without handler method->set_server_tag(method_registration_tag); } else if (method->api_type() == - grpc::internal::RpcServiceMethod::ApiType::SYNC) { - for (const auto& value : sync_req_mgrs_) { - value->AddSyncMethod(method.get(), method_registration_tag); + grpc::internal::RpcServiceMethod::ApiType::SYNC) { + for (const auto& value : sync_req_mgrs_) { + value->AddSyncMethod(method.get(), method_registration_tag); } } else { has_callback_methods_ = true; @@ -1064,32 +1064,32 @@ bool Server::RegisterService(const TString* host, grpc::Service* service) { return true; } -void Server::RegisterAsyncGenericService(grpc::AsyncGenericService* service) { +void Server::RegisterAsyncGenericService(grpc::AsyncGenericService* service) { GPR_ASSERT(service->server_ == nullptr && "Can only register an async generic service against one server."); service->server_ = this; - has_async_generic_service_ = true; + has_async_generic_service_ = true; } -void Server::RegisterCallbackGenericService( +void Server::RegisterCallbackGenericService( grpc::CallbackGenericService* service) { - GPR_ASSERT( - service->server_ == nullptr && - "Can only register a callback generic service against one server."); - service->server_ = this; - has_callback_generic_service_ = true; - generic_handler_.reset(service->Handler()); - + GPR_ASSERT( + service->server_ == nullptr && + "Can only register a callback generic service against one server."); + service->server_ = this; + has_callback_generic_service_ = true; + generic_handler_.reset(service->Handler()); + grpc::CompletionQueue* cq = CallbackCQ(); server_->core_server->SetBatchMethodAllocator(cq->cq(), [this, cq] { grpc_core::Server::BatchCallAllocation result; new CallbackRequest<grpc::GenericCallbackServerContext>(this, cq, &result); return result; }); -} - +} + int Server::AddListeningPort(const TString& addr, - grpc::ServerCredentials* creds) { + grpc::ServerCredentials* creds) { GPR_ASSERT(!started_); int port = creds->AddPortToServer(addr, server_); global_callbacks_->AddPort(this, addr, creds, port); @@ -1121,46 +1121,46 @@ void Server::UnrefAndWaitLocked() { shutdown_done_cv_.WaitUntil(&mu_, [this] { return shutdown_done_; }); } -void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { +void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { GPR_ASSERT(!started_); global_callbacks_->PreServerStart(this); started_ = true; // Only create default health check service when user did not provide an // explicit one. - grpc::ServerCompletionQueue* health_check_cq = nullptr; - grpc::DefaultHealthCheckService::HealthCheckServiceImpl* + grpc::ServerCompletionQueue* health_check_cq = nullptr; + grpc::DefaultHealthCheckService::HealthCheckServiceImpl* default_health_check_service_impl = nullptr; if (health_check_service_ == nullptr && !health_check_service_disabled_ && - grpc::DefaultHealthCheckServiceEnabled()) { - auto* default_hc_service = new grpc::DefaultHealthCheckService; + grpc::DefaultHealthCheckServiceEnabled()) { + auto* default_hc_service = new grpc::DefaultHealthCheckService; health_check_service_.reset(default_hc_service); // We create a non-polling CQ to avoid impacting application // performance. This ensures that we don't introduce thread hops // for application requests that wind up on this CQ, which is polled // in its own thread. - health_check_cq = new grpc::ServerCompletionQueue( - GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr); + health_check_cq = new grpc::ServerCompletionQueue( + GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr); grpc_server_register_completion_queue(server_, health_check_cq->cq(), nullptr); default_health_check_service_impl = default_hc_service->GetHealthCheckService( - std::unique_ptr<grpc::ServerCompletionQueue>(health_check_cq)); + std::unique_ptr<grpc::ServerCompletionQueue>(health_check_cq)); RegisterService(nullptr, default_health_check_service_impl); } - for (auto& acceptor : acceptors_) { - acceptor->GetCredentials()->AddPortToServer(acceptor->name(), server_); - } - - // If this server uses callback methods, then create a callback generic - // service to handle any unimplemented methods using the default reactor - // creator + for (auto& acceptor : acceptors_) { + acceptor->GetCredentials()->AddPortToServer(acceptor->name(), server_); + } + + // If this server uses callback methods, then create a callback generic + // service to handle any unimplemented methods using the default reactor + // creator if (has_callback_methods_ && !has_callback_generic_service_) { unimplemented_service_.reset(new grpc::CallbackGenericService); - RegisterCallbackGenericService(unimplemented_service_.get()); - } - + RegisterCallbackGenericService(unimplemented_service_.get()); + } + #ifndef NDEBUG for (size_t i = 0; i < num_cqs; i++) { cq_list_.push_back(cqs[i]); @@ -1169,9 +1169,9 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { grpc_server_start(server_); - if (!has_async_generic_service_ && !has_callback_generic_service_) { - for (const auto& value : sync_req_mgrs_) { - value->AddUnknownSyncMethod(); + if (!has_async_generic_service_ && !has_callback_generic_service_) { + for (const auto& value : sync_req_mgrs_) { + value->AddUnknownSyncMethod(); } for (size_t i = 0; i < num_cqs; i++) { @@ -1188,50 +1188,50 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { // server CQs), make sure that we have a ResourceExhausted handler // to deal with the case of thread exhaustion if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) { - resource_exhausted_handler_.reset( - new grpc::internal::ResourceExhaustedHandler); + resource_exhausted_handler_.reset( + new grpc::internal::ResourceExhaustedHandler); } - for (const auto& value : sync_req_mgrs_) { - value->Start(); + for (const auto& value : sync_req_mgrs_) { + value->Start(); } if (default_health_check_service_impl != nullptr) { default_health_check_service_impl->StartServingThread(); } - - for (auto& acceptor : acceptors_) { - acceptor->Start(); - } + + for (auto& acceptor : acceptors_) { + acceptor->Start(); + } } void Server::ShutdownInternal(gpr_timespec deadline) { - grpc::internal::MutexLock lock(&mu_); + grpc::internal::MutexLock lock(&mu_); if (shutdown_) { return; } shutdown_ = true; - for (auto& acceptor : acceptors_) { - acceptor->Shutdown(); - } - + for (auto& acceptor : acceptors_) { + acceptor->Shutdown(); + } + /// The completion queue to use for server shutdown completion notification - grpc::CompletionQueue shutdown_cq; - grpc::ShutdownTag shutdown_tag; // Dummy shutdown tag + grpc::CompletionQueue shutdown_cq; + grpc::ShutdownTag shutdown_tag; // Dummy shutdown tag grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag); shutdown_cq.Shutdown(); void* tag; bool ok; - grpc::CompletionQueue::NextStatus status = + grpc::CompletionQueue::NextStatus status = shutdown_cq.AsyncNext(&tag, &ok, deadline); // If this timed out, it means we are done with the grace period for a clean // shutdown. We should force a shutdown now by cancelling all inflight calls - if (status == grpc::CompletionQueue::NextStatus::TIMEOUT) { + if (status == grpc::CompletionQueue::NextStatus::TIMEOUT) { grpc_server_cancel_all_calls(server_); } // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has @@ -1239,25 +1239,25 @@ void Server::ShutdownInternal(gpr_timespec deadline) { // Shutdown all ThreadManagers. This will try to gracefully stop all the // threads in the ThreadManagers (once they process any inflight requests) - for (const auto& value : sync_req_mgrs_) { - value->Shutdown(); // ThreadManager's Shutdown() + for (const auto& value : sync_req_mgrs_) { + value->Shutdown(); // ThreadManager's Shutdown() } // Wait for threads in all ThreadManagers to terminate - for (const auto& value : sync_req_mgrs_) { - value->Wait(); + for (const auto& value : sync_req_mgrs_) { + value->Wait(); } // Drop the shutdown ref and wait for all other refs to drop as well. UnrefAndWaitLocked(); - // Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it - // will delete itself at true shutdown. - if (callback_cq_ != nullptr) { - callback_cq_->Shutdown(); - callback_cq_ = nullptr; - } - + // Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it + // will delete itself at true shutdown. + if (callback_cq_ != nullptr) { + callback_cq_->Shutdown(); + callback_cq_ = nullptr; + } + // Drain the shutdown queue (if the previous call to AsyncNext() timed out // and we didn't remove the tag from the queue yet) while (shutdown_cq.Next(&tag, &ok)) { @@ -1265,7 +1265,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) { } shutdown_notified_ = true; - shutdown_cv_.Broadcast(); + shutdown_cv_.Broadcast(); #ifndef NDEBUG // Unregister this server with the CQs passed into it by the user so that @@ -1278,14 +1278,14 @@ void Server::ShutdownInternal(gpr_timespec deadline) { } void Server::Wait() { - grpc::internal::MutexLock lock(&mu_); + grpc::internal::MutexLock lock(&mu_); while (started_ && !shutdown_notified_) { - shutdown_cv_.Wait(&mu_); + shutdown_cv_.Wait(&mu_); } } -void Server::PerformOpsOnCall(grpc::internal::CallOpSetInterface* ops, - grpc::internal::Call* call) { +void Server::PerformOpsOnCall(grpc::internal::CallOpSetInterface* ops, + grpc::internal::Call* call) { ops->FillOps(call); } @@ -1310,19 +1310,19 @@ bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag, Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( UnimplementedAsyncRequest* request) : request_(request) { - grpc::Status status(grpc::StatusCode::UNIMPLEMENTED, ""); - grpc::internal::UnknownMethodHandler::FillOps(request_->context(), this); + grpc::Status status(grpc::StatusCode::UNIMPLEMENTED, ""); + grpc::internal::UnknownMethodHandler::FillOps(request_->context(), this); request_->stream()->call_.PerformOps(this); } -grpc::ServerInitializer* Server::initializer() { - return server_initializer_.get(); -} +grpc::ServerInitializer* Server::initializer() { + return server_initializer_.get(); +} -grpc::CompletionQueue* Server::CallbackCQ() { +grpc::CompletionQueue* Server::CallbackCQ() { // TODO(vjpai): Consider using a single global CQ for the default CQ // if there is no explicit per-server CQ registered - grpc::internal::MutexLock l(&mu_); + grpc::internal::MutexLock l(&mu_); if (callback_cq_ != nullptr) { return callback_cq_; } @@ -1335,6 +1335,6 @@ grpc::CompletionQueue* Server::CallbackCQ() { shutdown_callback->TakeCQ(callback_cq_); return callback_cq_; -} +} } // namespace grpc |