diff options
author | neksard <neksard@yandex-team.ru> | 2022-02-10 16:45:33 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:33 +0300 |
commit | 1d9c550e7c38e051d7961f576013a482003a70d9 (patch) | |
tree | b2cc84ee7850122e7ccf51d0ea21e4fa7e7a5685 /contrib/libs/grpc/src/cpp/server/server_cc.cc | |
parent | 8f7cf138264e0caa318144bf8a2c950e0b0a8593 (diff) | |
download | ydb-1d9c550e7c38e051d7961f576013a482003a70d9.tar.gz |
Restoring authorship annotation for <neksard@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/libs/grpc/src/cpp/server/server_cc.cc')
-rw-r--r-- | contrib/libs/grpc/src/cpp/server/server_cc.cc | 612 |
1 files changed, 306 insertions, 306 deletions
diff --git a/contrib/libs/grpc/src/cpp/server/server_cc.cc b/contrib/libs/grpc/src/cpp/server/server_cc.cc index 56bf75d730..c2a911c7f7 100644 --- a/contrib/libs/grpc/src/cpp/server/server_cc.cc +++ b/contrib/libs/grpc/src/cpp/server/server_cc.cc @@ -30,10 +30,10 @@ #include <grpcpp/generic/async_generic_service.h> #include <grpcpp/impl/codegen/async_unary_call.h> #include <grpcpp/impl/codegen/byte_buffer.h> -#include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/codegen/call.h> #include <grpcpp/impl/codegen/completion_queue_tag.h> #include <grpcpp/impl/codegen/method_handler.h> -#include <grpcpp/impl/codegen/server_interceptor.h> +#include <grpcpp/impl/codegen/server_interceptor.h> #include <grpcpp/impl/grpc_library.h> #include <grpcpp/impl/rpc_service_method.h> #include <grpcpp/impl/server_initializer.h> @@ -43,10 +43,10 @@ #include <grpcpp/support/time.h> #include "src/core/ext/transport/inproc/inproc_transport.h" -#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/surface/call.h" -#include "src/core/lib/surface/completion_queue.h" +#include "src/core/lib/surface/completion_queue.h" #include "src/core/lib/surface/server.h" #include "src/cpp/client/create_channel_internal.h" #include "src/cpp/server/external_connection_acceptor_impl.h" @@ -58,13 +58,13 @@ namespace grpc { namespace { -// The default value for maximum number of threads that can be created in the -// sync server. This value of INT_MAX is chosen to match the default behavior if -// no ResourceQuota is set. To modify the max number of threads in a sync -// server, pass a custom ResourceQuota object (with the desired number of -// max-threads set) to the server builder. -#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX - +// The default value for maximum number of threads that can be created in the +// sync server. This value of INT_MAX is chosen to match the default behavior if +// no ResourceQuota is set. To modify the max number of threads in a sync +// server, pass a custom ResourceQuota object (with the desired number of +// max-threads set) to the server builder. +#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX + class DefaultGlobalCallbacks final : public Server::GlobalCallbacks { public: ~DefaultGlobalCallbacks() override {} @@ -319,10 +319,10 @@ class Server::UnimplementedAsyncResponse final grpc::internal::CallOpSendInitialMetadata, grpc::internal::CallOpServerSendStatus>::FinalizeResult(tag, status)) { - delete this; - } else { - // The tag was swallowed due to interception. We will see it again. - } + delete this; + } else { + // The tag was swallowed due to interception. We will see it again. + } return false; } @@ -334,7 +334,7 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { public: SyncRequest(grpc::internal::RpcServiceMethod* method, void* method_tag) : method_(method), - method_tag_(method_tag), + method_tag_(method_tag), in_flight_(false), has_request_payload_(method->method_type() == grpc::internal::RpcMethod::NORMAL_RPC || @@ -362,11 +362,11 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { void Request(grpc_server* server, grpc_completion_queue* notify_cq) { GPR_ASSERT(cq_ && !in_flight_); in_flight_ = true; - if (method_tag_) { - if (grpc_server_request_registered_call( - server, method_tag_, &call_, &deadline_, &request_metadata_, + if (method_tag_) { + if (grpc_server_request_registered_call( + server, method_tag_, &call_, &deadline_, &request_metadata_, has_request_payload_ ? &request_payload_ : nullptr, cq_, - notify_cq, this) != GRPC_CALL_OK) { + notify_cq, this) != GRPC_CALL_OK) { TeardownRequest(); return; } @@ -384,21 +384,21 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { } } - void PostShutdownCleanup() { - if (call_) { - grpc_call_unref(call_); - call_ = nullptr; - } - if (cq_) { - grpc_completion_queue_destroy(cq_); - cq_ = nullptr; - } - } - + void PostShutdownCleanup() { + if (call_) { + grpc_call_unref(call_); + call_ = nullptr; + } + if (cq_) { + grpc_completion_queue_destroy(cq_); + cq_ = nullptr; + } + } + bool FinalizeResult(void** /*tag*/, bool* status) override { if (!*status) { grpc_completion_queue_destroy(cq_); - cq_ = nullptr; + cq_ = nullptr; } if (call_details_) { deadline_ = call_details_->deadline; @@ -408,26 +408,26 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { return true; } - // The CallData class represents a call that is "active" as opposed - // to just being requested. It wraps and takes ownership of the cq from - // the call request + // The CallData class represents a call that is "active" as opposed + // to just being requested. It wraps and takes ownership of the cq from + // the call request class CallData final { public: explicit CallData(Server* server, SyncRequest* mrd) : cq_(mrd->cq_), ctx_(mrd->deadline_, &mrd->request_metadata_), has_request_payload_(mrd->has_request_payload_), - request_payload_(has_request_payload_ ? mrd->request_payload_ - : nullptr), - request_(nullptr), - method_(mrd->method_), - call_( - mrd->call_, server, &cq_, server->max_receive_message_size(), - ctx_.set_server_rpc_info(method_->name(), method_->method_type(), - server->interceptor_creators_)), - server_(server), - global_callbacks_(nullptr), - resources_(false) { + request_payload_(has_request_payload_ ? mrd->request_payload_ + : nullptr), + request_(nullptr), + method_(mrd->method_), + call_( + mrd->call_, server, &cq_, server->max_receive_message_size(), + ctx_.set_server_rpc_info(method_->name(), method_->method_type(), + server->interceptor_creators_)), + server_(server), + global_callbacks_(nullptr), + resources_(false) { ctx_.set_call(mrd->call_); ctx_.cq_ = &cq_; GPR_ASSERT(mrd->in_flight_); @@ -441,82 +441,82 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { } } - void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks, - bool resources) { - global_callbacks_ = global_callbacks; - resources_ = resources; + void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks, + bool resources) { + global_callbacks_ = global_callbacks; + resources_ = resources; - interceptor_methods_.SetCall(&call_); - interceptor_methods_.SetReverse(); - // Set interception point for RECV INITIAL METADATA - interceptor_methods_.AddInterceptionHookPoint( + interceptor_methods_.SetCall(&call_); + interceptor_methods_.SetReverse(); + // Set interception point for RECV INITIAL METADATA + interceptor_methods_.AddInterceptionHookPoint( grpc::experimental::InterceptionHookPoints:: POST_RECV_INITIAL_METADATA); - interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_); + interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_); - if (has_request_payload_) { - // Set interception point for RECV MESSAGE - auto* handler = resources_ ? method_->handler() - : server_->resource_exhausted_handler_.get(); - request_ = handler->Deserialize(call_.call(), request_payload_, + if (has_request_payload_) { + // Set interception point for RECV MESSAGE + auto* handler = resources_ ? method_->handler() + : server_->resource_exhausted_handler_.get(); + request_ = handler->Deserialize(call_.call(), request_payload_, &request_status_, nullptr); - request_payload_ = nullptr; - interceptor_methods_.AddInterceptionHookPoint( + request_payload_ = nullptr; + interceptor_methods_.AddInterceptionHookPoint( grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); - interceptor_methods_.SetRecvMessage(request_, nullptr); - } - - if (interceptor_methods_.RunInterceptors( - [this]() { ContinueRunAfterInterception(); })) { - ContinueRunAfterInterception(); - } else { - // There were interceptors to be run, so ContinueRunAfterInterception - // will be run when interceptors are done. - } + interceptor_methods_.SetRecvMessage(request_, nullptr); + } + + if (interceptor_methods_.RunInterceptors( + [this]() { ContinueRunAfterInterception(); })) { + ContinueRunAfterInterception(); + } else { + // There were interceptors to be run, so ContinueRunAfterInterception + // will be run when interceptors are done. + } } - void ContinueRunAfterInterception() { - { - ctx_.BeginCompletionOp(&call_, nullptr, nullptr); - global_callbacks_->PreSynchronousRequest(&ctx_); - auto* handler = resources_ ? method_->handler() - : server_->resource_exhausted_handler_.get(); + void ContinueRunAfterInterception() { + { + ctx_.BeginCompletionOp(&call_, nullptr, nullptr); + global_callbacks_->PreSynchronousRequest(&ctx_); + auto* handler = resources_ ? method_->handler() + : server_->resource_exhausted_handler_.get(); handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( &call_, &ctx_, request_, request_status_, nullptr, nullptr)); - request_ = nullptr; - global_callbacks_->PostSynchronousRequest(&ctx_); - - cq_.Shutdown(); - + request_ = nullptr; + global_callbacks_->PostSynchronousRequest(&ctx_); + + cq_.Shutdown(); + grpc::internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); - cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); - - /* Ensure the cq_ is shutdown */ + cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); + + /* Ensure the cq_ is shutdown */ grpc::DummyTag ignored_tag; - GPR_ASSERT(cq_.Pluck(&ignored_tag) == false); - } - delete this; - } - + GPR_ASSERT(cq_.Pluck(&ignored_tag) == false); + } + delete this; + } + private: grpc::CompletionQueue cq_; grpc::ServerContext ctx_; const bool has_request_payload_; grpc_byte_buffer* request_payload_; - void* request_; + void* request_; grpc::Status request_status_; grpc::internal::RpcServiceMethod* const method_; grpc::internal::Call call_; - Server* server_; - std::shared_ptr<GlobalCallbacks> global_callbacks_; - bool resources_; + Server* server_; + std::shared_ptr<GlobalCallbacks> global_callbacks_; + bool resources_; grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; }; private: grpc::internal::RpcServiceMethod* const method_; - void* const method_tag_; + void* const method_tag_; bool in_flight_; const bool has_request_payload_; grpc_call* call_; @@ -541,19 +541,19 @@ class Server::CallbackRequest final CallbackRequest(Server* server, grpc::internal::RpcServiceMethod* method, grpc::CompletionQueue* cq, grpc_core::Server::RegisteredCallAllocation* data) - : server_(server), - method_(method), + : server_(server), + method_(method), has_request_payload_(method->method_type() == grpc::internal::RpcMethod::NORMAL_RPC || method->method_type() == grpc::internal::RpcMethod::SERVER_STREAMING), cq_(cq), - tag_(this) { + tag_(this) { CommonSetup(server, data); data->deadline = &deadline_; data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr; - } - + } + // For generic services, method is nullptr since these services don't have // pre-defined methods. CallbackRequest(Server* server, grpc::CompletionQueue* cq, @@ -567,8 +567,8 @@ class Server::CallbackRequest final CommonSetup(server, data); grpc_call_details_init(call_details_); data->details = call_details_; - } - + } + ~CallbackRequest() { delete call_details_; grpc_metadata_array_destroy(&request_metadata_); @@ -576,21 +576,21 @@ class Server::CallbackRequest final grpc_byte_buffer_destroy(request_payload_); } server_->UnrefWithPossibleNotify(); - } - + } + // Needs specialization to account for different processing of metadata // in generic API bool FinalizeResult(void** tag, bool* status) override; - - private: + + private: // method_name needs to be specialized between named method and generic const char* method_name() const; - class CallbackCallTag : public grpc_experimental_completion_queue_functor { - public: + class CallbackCallTag : public grpc_experimental_completion_queue_functor { + public: CallbackCallTag(Server::CallbackRequest<ServerContextType>* req) : req_(req) { - functor_run = &CallbackCallTag::StaticRun; + functor_run = &CallbackCallTag::StaticRun; // Set inlineable to true since this callback is internally-controlled // without taking any locks, and thus does not need to be run from the // executor (which triggers a thread hop). This should only be used by @@ -598,42 +598,42 @@ class Server::CallbackRequest final // here is actually non-trivial, but there is no chance of having user // locks conflict with each other so it's ok to run inlined. inlineable = true; - } - - // force_run can not be performed on a tag if operations using this tag - // have been sent to PerformOpsOnCall. It is intended for error conditions - // that are detected before the operations are internally processed. - void force_run(bool ok) { Run(ok); } - - private: + } + + // force_run can not be performed on a tag if operations using this tag + // have been sent to PerformOpsOnCall. It is intended for error conditions + // that are detected before the operations are internally processed. + void force_run(bool ok) { Run(ok); } + + private: Server::CallbackRequest<ServerContextType>* req_; grpc::internal::Call* call_; - - static void StaticRun(grpc_experimental_completion_queue_functor* cb, - int ok) { - static_cast<CallbackCallTag*>(cb)->Run(static_cast<bool>(ok)); - } - void Run(bool ok) { - void* ignored = req_; - bool new_ok = ok; - GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok)); - GPR_ASSERT(ignored == req_); - - if (!ok) { - // The call has been shutdown. - // Delete its contents to free up the request. - delete req_; - return; - } - - // Bind the call, deadline, and metadata from what we got - req_->ctx_.set_call(req_->call_); - req_->ctx_.cq_ = req_->cq_; - req_->ctx_.BindDeadlineAndMetadata(req_->deadline_, - &req_->request_metadata_); - req_->request_metadata_.count = 0; - - // Create a C++ Call to control the underlying core call + + static void StaticRun(grpc_experimental_completion_queue_functor* cb, + int ok) { + static_cast<CallbackCallTag*>(cb)->Run(static_cast<bool>(ok)); + } + void Run(bool ok) { + void* ignored = req_; + bool new_ok = ok; + GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok)); + GPR_ASSERT(ignored == req_); + + if (!ok) { + // The call has been shutdown. + // Delete its contents to free up the request. + delete req_; + return; + } + + // Bind the call, deadline, and metadata from what we got + req_->ctx_.set_call(req_->call_); + req_->ctx_.cq_ = req_->cq_; + req_->ctx_.BindDeadlineAndMetadata(req_->deadline_, + &req_->request_metadata_); + req_->request_metadata_.count = 0; + + // Create a C++ Call to control the underlying core call call_ = new (grpc_call_arena_alloc(req_->call_, sizeof(grpc::internal::Call))) grpc::internal::Call( @@ -645,71 +645,71 @@ class Server::CallbackRequest final ? req_->method_->method_type() : grpc::internal::RpcMethod::BIDI_STREAMING, req_->server_->interceptor_creators_)); - - req_->interceptor_methods_.SetCall(call_); - req_->interceptor_methods_.SetReverse(); - // Set interception point for RECV INITIAL METADATA - req_->interceptor_methods_.AddInterceptionHookPoint( + + req_->interceptor_methods_.SetCall(call_); + req_->interceptor_methods_.SetReverse(); + // Set interception point for RECV INITIAL METADATA + req_->interceptor_methods_.AddInterceptionHookPoint( grpc::experimental::InterceptionHookPoints:: POST_RECV_INITIAL_METADATA); - req_->interceptor_methods_.SetRecvInitialMetadata( - &req_->ctx_.client_metadata_); - - if (req_->has_request_payload_) { - // Set interception point for RECV MESSAGE - req_->request_ = req_->method_->handler()->Deserialize( + req_->interceptor_methods_.SetRecvInitialMetadata( + &req_->ctx_.client_metadata_); + + if (req_->has_request_payload_) { + // Set interception point for RECV MESSAGE + req_->request_ = req_->method_->handler()->Deserialize( req_->call_, req_->request_payload_, &req_->request_status_, &req_->handler_data_); - req_->request_payload_ = nullptr; - req_->interceptor_methods_.AddInterceptionHookPoint( + req_->request_payload_ = nullptr; + req_->interceptor_methods_.AddInterceptionHookPoint( grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); - req_->interceptor_methods_.SetRecvMessage(req_->request_, nullptr); - } - - if (req_->interceptor_methods_.RunInterceptors( - [this] { ContinueRunAfterInterception(); })) { - ContinueRunAfterInterception(); - } else { - // There were interceptors to be run, so ContinueRunAfterInterception - // will be run when interceptors are done. - } - } - void ContinueRunAfterInterception() { + req_->interceptor_methods_.SetRecvMessage(req_->request_, nullptr); + } + + if (req_->interceptor_methods_.RunInterceptors( + [this] { ContinueRunAfterInterception(); })) { + ContinueRunAfterInterception(); + } else { + // There were interceptors to be run, so ContinueRunAfterInterception + // will be run when interceptors are done. + } + } + void ContinueRunAfterInterception() { auto* handler = (req_->method_ != nullptr) ? req_->method_->handler() : req_->server_->generic_handler_.get(); handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( call_, &req_->ctx_, req_->request_, req_->request_status_, req_->handler_data_, [this] { delete req_; })); - } - }; - + } + }; + template <class CallAllocation> void CommonSetup(Server* server, CallAllocation* data) { server->Ref(); - grpc_metadata_array_init(&request_metadata_); + grpc_metadata_array_init(&request_metadata_); data->tag = &tag_; data->call = &call_; data->initial_metadata = &request_metadata_; - } - - Server* const server_; + } + + Server* const server_; grpc::internal::RpcServiceMethod* const method_; - const bool has_request_payload_; + const bool has_request_payload_; grpc_byte_buffer* request_payload_ = nullptr; void* request_ = nullptr; void* handler_data_ = nullptr; grpc::Status request_status_; grpc_call_details* const call_details_ = nullptr; - grpc_call* call_; - gpr_timespec deadline_; - grpc_metadata_array request_metadata_; + grpc_call* call_; + gpr_timespec deadline_; + grpc_metadata_array request_metadata_; grpc::CompletionQueue* const cq_; - CallbackCallTag tag_; + CallbackCallTag tag_; ServerContextType ctx_; grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; -}; - +}; + template <> bool Server::CallbackRequest<grpc::CallbackServerContext>::FinalizeResult( void** /*tag*/, bool* /*status*/) { @@ -750,13 +750,13 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { public: SyncRequestThreadManager(Server* server, grpc::CompletionQueue* server_cq, std::shared_ptr<GlobalCallbacks> global_callbacks, - grpc_resource_quota* rq, int min_pollers, - int max_pollers, int cq_timeout_msec) - : ThreadManager("SyncServer", rq, min_pollers, max_pollers), + grpc_resource_quota* rq, int min_pollers, + int max_pollers, int cq_timeout_msec) + : ThreadManager("SyncServer", rq, min_pollers, max_pollers), server_(server), server_cq_(server_cq), cq_timeout_msec_(cq_timeout_msec), - global_callbacks_(std::move(global_callbacks)) {} + global_callbacks_(std::move(global_callbacks)) {} WorkStatus PollForWork(void** tag, bool* ok) override { *tag = nullptr; @@ -778,7 +778,7 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { GPR_UNREACHABLE_CODE(return TIMEOUT); } - void DoWork(void* tag, bool ok, bool resources) override { + void DoWork(void* tag, bool ok, bool resources) override { SyncRequest* sync_req = static_cast<SyncRequest*>(tag); if (!sync_req) { @@ -789,9 +789,9 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { } if (ok) { - // Calldata takes ownership of the completion queue and interceptors - // inside sync_req - auto* cd = new SyncRequest::CallData(server_, sync_req); + // Calldata takes ownership of the completion queue and interceptors + // inside sync_req + auto* cd = new SyncRequest::CallData(server_, sync_req); // Prepare for the next request if (!IsShutdown()) { sync_req->SetupRequest(); // Create new completion queue for sync_req @@ -799,7 +799,7 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { } GPR_TIMER_SCOPE("cd.Run()", 0); - cd->Run(global_callbacks_, resources); + cd->Run(global_callbacks_, resources); } // TODO (sreek) If ok is false here (which it isn't in case of // grpc_request_registered_call), we should still re-queue the request @@ -831,17 +831,17 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { void* tag; bool ok; while (server_cq_->Next(&tag, &ok)) { - if (ok) { - // If a request was pulled off the queue, it means that the thread - // handling the request added it to the completion queue after shutdown - // was called - because the thread had already started and checked the - // shutdown flag before shutdown was called. In this case, we simply - // clean it up here, *after* calling wait on all the worker threads, at - // which point we are certain no in-flight requests will add more to the - // queue. This fixes an intermittent memory leak on shutdown. - SyncRequest* sync_req = static_cast<SyncRequest*>(tag); - sync_req->PostShutdownCleanup(); - } + if (ok) { + // If a request was pulled off the queue, it means that the thread + // handling the request added it to the completion queue after shutdown + // was called - because the thread had already started and checked the + // shutdown flag before shutdown was called. In this case, we simply + // clean it up here, *after* calling wait on all the worker threads, at + // which point we are certain no in-flight requests will add more to the + // queue. This fixes an intermittent memory leak on shutdown. + SyncRequest* sync_req = static_cast<SyncRequest*>(tag); + sync_req->PostShutdownCleanup(); + } } } @@ -870,17 +870,17 @@ Server::Server( grpc::ChannelArguments* args, std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>> sync_server_cqs, - int min_pollers, int max_pollers, int sync_cq_timeout_msec, + int min_pollers, int max_pollers, int sync_cq_timeout_msec, std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>> acceptors, - grpc_resource_quota* server_rq, - std::vector< + grpc_resource_quota* server_rq, + std::vector< std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>> - interceptor_creators) + interceptor_creators) : acceptors_(std::move(acceptors)), interceptor_creators_(std::move(interceptor_creators)), max_receive_message_size_(INT_MIN), - sync_server_cqs_(std::move(sync_server_cqs)), + sync_server_cqs_(std::move(sync_server_cqs)), started_(false), shutdown_(false), shutdown_notified_(false), @@ -893,23 +893,23 @@ Server::Server( global_callbacks_->UpdateArguments(args); if (sync_server_cqs_ != nullptr) { - bool default_rq_created = false; - if (server_rq == nullptr) { - server_rq = grpc_resource_quota_create("SyncServer-default-rq"); - grpc_resource_quota_set_max_threads(server_rq, - DEFAULT_MAX_SYNC_SERVER_THREADS); - default_rq_created = true; - } - + bool default_rq_created = false; + if (server_rq == nullptr) { + server_rq = grpc_resource_quota_create("SyncServer-default-rq"); + grpc_resource_quota_set_max_threads(server_rq, + DEFAULT_MAX_SYNC_SERVER_THREADS); + default_rq_created = true; + } + for (const auto& it : *sync_server_cqs_) { sync_req_mgrs_.emplace_back(new SyncRequestThreadManager( - this, it.get(), global_callbacks_, server_rq, min_pollers, - max_pollers, sync_cq_timeout_msec)); + this, it.get(), global_callbacks_, server_rq, min_pollers, + max_pollers, sync_cq_timeout_msec)); + } + + if (default_rq_created) { + grpc_resource_quota_unref(server_rq); } - - if (default_rq_created) { - grpc_resource_quota_unref(server_rq); - } } for (auto& acceptor : acceptors_) { @@ -974,24 +974,24 @@ std::shared_ptr<grpc::Channel> Server::InProcessChannel( const grpc::ChannelArguments& args) { grpc_channel_args channel_args = args.c_channel_args(); return grpc::CreateChannelInternal( - "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr), + "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr), std::vector<std::unique_ptr< grpc::experimental::ClientInterceptorFactoryInterface>>()); } std::shared_ptr<grpc::Channel> -Server::experimental_type::InProcessChannelWithInterceptors( +Server::experimental_type::InProcessChannelWithInterceptors( const grpc::ChannelArguments& args, - std::vector< + std::vector< std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> - interceptor_creators) { - grpc_channel_args channel_args = args.c_channel_args(); + interceptor_creators) { + grpc_channel_args channel_args = args.c_channel_args(); return grpc::CreateChannelInternal( - "inproc", - grpc_inproc_channel_create(server_->server_, &channel_args, nullptr), - std::move(interceptor_creators)); -} - + "inproc", + grpc_inproc_channel_create(server_->server_, &channel_args, nullptr), + std::move(interceptor_creators)); +} + static grpc_server_register_method_payload_handling PayloadHandlingForMethod( grpc::internal::RpcServiceMethod* method) { switch (method->method_type()) { @@ -1014,29 +1014,29 @@ bool Server::RegisterService(const TString* host, grpc::Service* service) { } const char* method_name = nullptr; - + for (const auto& method : service->methods_) { if (method.get() == nullptr) { // Handled by generic service if any. continue; } - void* method_registration_tag = grpc_server_register_method( + void* method_registration_tag = grpc_server_register_method( server_, method->name(), host ? host->c_str() : nullptr, PayloadHandlingForMethod(method.get()), 0); - if (method_registration_tag == nullptr) { + if (method_registration_tag == nullptr) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name()); return false; } - if (method->handler() == nullptr) { // Async method without handler - method->set_server_tag(method_registration_tag); - } else if (method->api_type() == + if (method->handler() == nullptr) { // Async method without handler + method->set_server_tag(method_registration_tag); + } else if (method->api_type() == grpc::internal::RpcServiceMethod::ApiType::SYNC) { for (const auto& value : sync_req_mgrs_) { value->AddSyncMethod(method.get(), method_registration_tag); } - } else { + } else { has_callback_methods_ = true; grpc::internal::RpcServiceMethod* method_value = method.get(); grpc::CompletionQueue* cq = CallbackCQ(); @@ -1130,23 +1130,23 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { // explicit one. grpc::ServerCompletionQueue* health_check_cq = nullptr; grpc::DefaultHealthCheckService::HealthCheckServiceImpl* - default_health_check_service_impl = nullptr; + default_health_check_service_impl = nullptr; if (health_check_service_ == nullptr && !health_check_service_disabled_ && grpc::DefaultHealthCheckServiceEnabled()) { auto* default_hc_service = new grpc::DefaultHealthCheckService; - health_check_service_.reset(default_hc_service); - // We create a non-polling CQ to avoid impacting application - // performance. This ensures that we don't introduce thread hops - // for application requests that wind up on this CQ, which is polled - // in its own thread. + health_check_service_.reset(default_hc_service); + // We create a non-polling CQ to avoid impacting application + // performance. This ensures that we don't introduce thread hops + // for application requests that wind up on this CQ, which is polled + // in its own thread. health_check_cq = new grpc::ServerCompletionQueue( GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr); - grpc_server_register_completion_queue(server_, health_check_cq->cq(), - nullptr); - default_health_check_service_impl = - default_hc_service->GetHealthCheckService( + grpc_server_register_completion_queue(server_, health_check_cq->cq(), + nullptr); + default_health_check_service_impl = + default_hc_service->GetHealthCheckService( std::unique_ptr<grpc::ServerCompletionQueue>(health_check_cq)); - RegisterService(nullptr, default_health_check_service_impl); + RegisterService(nullptr, default_health_check_service_impl); } for (auto& acceptor : acceptors_) { @@ -1179,26 +1179,26 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { new UnimplementedAsyncRequest(this, cqs[i]); } } - if (health_check_cq != nullptr) { - new UnimplementedAsyncRequest(this, health_check_cq); - } + if (health_check_cq != nullptr) { + new UnimplementedAsyncRequest(this, health_check_cq); + } } - // If this server has any support for synchronous methods (has any sync - // server CQs), make sure that we have a ResourceExhausted handler - // to deal with the case of thread exhaustion - if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) { + // If this server has any support for synchronous methods (has any sync + // server CQs), make sure that we have a ResourceExhausted handler + // to deal with the case of thread exhaustion + if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) { resource_exhausted_handler_.reset( new grpc::internal::ResourceExhaustedHandler); - } - + } + for (const auto& value : sync_req_mgrs_) { value->Start(); } - - if (default_health_check_service_impl != nullptr) { - default_health_check_service_impl->StartServingThread(); - } + + if (default_health_check_service_impl != nullptr) { + default_health_check_service_impl->StartServingThread(); + } for (auto& acceptor : acceptors_) { acceptor->Start(); @@ -1207,50 +1207,50 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { void Server::ShutdownInternal(gpr_timespec deadline) { grpc::internal::MutexLock lock(&mu_); - if (shutdown_) { - return; - } + if (shutdown_) { + return; + } - shutdown_ = true; + shutdown_ = true; for (auto& acceptor : acceptors_) { acceptor->Shutdown(); } - /// The completion queue to use for server shutdown completion notification + /// The completion queue to use for server shutdown completion notification grpc::CompletionQueue shutdown_cq; grpc::ShutdownTag shutdown_tag; // Dummy shutdown tag - grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag); + grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag); - shutdown_cq.Shutdown(); + shutdown_cq.Shutdown(); - void* tag; - bool ok; + void* tag; + bool ok; grpc::CompletionQueue::NextStatus status = - shutdown_cq.AsyncNext(&tag, &ok, deadline); + shutdown_cq.AsyncNext(&tag, &ok, deadline); - // If this timed out, it means we are done with the grace period for a clean - // shutdown. We should force a shutdown now by cancelling all inflight calls + // If this timed out, it means we are done with the grace period for a clean + // shutdown. We should force a shutdown now by cancelling all inflight calls if (status == grpc::CompletionQueue::NextStatus::TIMEOUT) { - grpc_server_cancel_all_calls(server_); - } - // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has - // successfully shutdown + grpc_server_cancel_all_calls(server_); + } + // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has + // successfully shutdown - // Shutdown all ThreadManagers. This will try to gracefully stop all the - // threads in the ThreadManagers (once they process any inflight requests) + // Shutdown all ThreadManagers. This will try to gracefully stop all the + // threads in the ThreadManagers (once they process any inflight requests) for (const auto& value : sync_req_mgrs_) { value->Shutdown(); // ThreadManager's Shutdown() - } + } - // Wait for threads in all ThreadManagers to terminate + // Wait for threads in all ThreadManagers to terminate for (const auto& value : sync_req_mgrs_) { value->Wait(); - } + } // Drop the shutdown ref and wait for all other refs to drop as well. UnrefAndWaitLocked(); - + // Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it // will delete itself at true shutdown. if (callback_cq_ != nullptr) { @@ -1258,13 +1258,13 @@ void Server::ShutdownInternal(gpr_timespec deadline) { callback_cq_ = nullptr; } - // Drain the shutdown queue (if the previous call to AsyncNext() timed out - // and we didn't remove the tag from the queue yet) - while (shutdown_cq.Next(&tag, &ok)) { - // Nothing to be done here. Just ignore ok and tag values - } - - shutdown_notified_ = true; + // Drain the shutdown queue (if the previous call to AsyncNext() timed out + // and we didn't remove the tag from the queue yet) + while (shutdown_cq.Next(&tag, &ok)) { + // Nothing to be done here. Just ignore ok and tag values + } + + shutdown_notified_ = true; shutdown_cv_.Broadcast(); #ifndef NDEBUG @@ -1286,23 +1286,23 @@ void Server::Wait() { void Server::PerformOpsOnCall(grpc::internal::CallOpSetInterface* ops, grpc::internal::Call* call) { - ops->FillOps(call); + ops->FillOps(call); } bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag, bool* status) { - if (GenericAsyncRequest::FinalizeResult(tag, status)) { - // We either had no interceptors run or we are done intercepting - if (*status) { + if (GenericAsyncRequest::FinalizeResult(tag, status)) { + // We either had no interceptors run or we are done intercepting + if (*status) { // Create a new request/response pair using the server and CQ values // stored in this object's base class. new UnimplementedAsyncRequest(server_, notification_cq_); - new UnimplementedAsyncResponse(this); - } else { - delete this; - } + new UnimplementedAsyncResponse(this); + } else { + delete this; + } } else { - // The tag was swallowed due to interception. We will see it again. + // The tag was swallowed due to interception. We will see it again. } return false; } @@ -1320,8 +1320,8 @@ grpc::ServerInitializer* Server::initializer() { } grpc::CompletionQueue* Server::CallbackCQ() { - // TODO(vjpai): Consider using a single global CQ for the default CQ - // if there is no explicit per-server CQ registered + // TODO(vjpai): Consider using a single global CQ for the default CQ + // if there is no explicit per-server CQ registered grpc::internal::MutexLock l(&mu_); if (callback_cq_ != nullptr) { return callback_cq_; @@ -1330,11 +1330,11 @@ grpc::CompletionQueue* Server::CallbackCQ() { callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback}); - + // Transfer ownership of the new cq to its own shutdown callback shutdown_callback->TakeCQ(callback_cq_); - return callback_cq_; + return callback_cq_; } - + } // namespace grpc |