aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/libs/grpc/src/cpp/server/server_cc.cc
diff options
context:
space:
mode:
authorneksard <neksard@yandex-team.ru>2022-02-10 16:45:33 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:33 +0300
commit1d9c550e7c38e051d7961f576013a482003a70d9 (patch)
treeb2cc84ee7850122e7ccf51d0ea21e4fa7e7a5685 /contrib/libs/grpc/src/cpp/server/server_cc.cc
parent8f7cf138264e0caa318144bf8a2c950e0b0a8593 (diff)
downloadydb-1d9c550e7c38e051d7961f576013a482003a70d9.tar.gz
Restoring authorship annotation for <neksard@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/libs/grpc/src/cpp/server/server_cc.cc')
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_cc.cc612
1 files changed, 306 insertions, 306 deletions
diff --git a/contrib/libs/grpc/src/cpp/server/server_cc.cc b/contrib/libs/grpc/src/cpp/server/server_cc.cc
index 56bf75d730..c2a911c7f7 100644
--- a/contrib/libs/grpc/src/cpp/server/server_cc.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_cc.cc
@@ -30,10 +30,10 @@
#include <grpcpp/generic/async_generic_service.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/impl/codegen/byte_buffer.h>
-#include <grpcpp/impl/codegen/call.h>
+#include <grpcpp/impl/codegen/call.h>
#include <grpcpp/impl/codegen/completion_queue_tag.h>
#include <grpcpp/impl/codegen/method_handler.h>
-#include <grpcpp/impl/codegen/server_interceptor.h>
+#include <grpcpp/impl/codegen/server_interceptor.h>
#include <grpcpp/impl/grpc_library.h>
#include <grpcpp/impl/rpc_service_method.h>
#include <grpcpp/impl/server_initializer.h>
@@ -43,10 +43,10 @@
#include <grpcpp/support/time.h>
#include "src/core/ext/transport/inproc/inproc_transport.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/surface/call.h"
-#include "src/core/lib/surface/completion_queue.h"
+#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/surface/server.h"
#include "src/cpp/client/create_channel_internal.h"
#include "src/cpp/server/external_connection_acceptor_impl.h"
@@ -58,13 +58,13 @@
namespace grpc {
namespace {
-// The default value for maximum number of threads that can be created in the
-// sync server. This value of INT_MAX is chosen to match the default behavior if
-// no ResourceQuota is set. To modify the max number of threads in a sync
-// server, pass a custom ResourceQuota object (with the desired number of
-// max-threads set) to the server builder.
-#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX
-
+// The default value for maximum number of threads that can be created in the
+// sync server. This value of INT_MAX is chosen to match the default behavior if
+// no ResourceQuota is set. To modify the max number of threads in a sync
+// server, pass a custom ResourceQuota object (with the desired number of
+// max-threads set) to the server builder.
+#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX
+
class DefaultGlobalCallbacks final : public Server::GlobalCallbacks {
public:
~DefaultGlobalCallbacks() override {}
@@ -319,10 +319,10 @@ class Server::UnimplementedAsyncResponse final
grpc::internal::CallOpSendInitialMetadata,
grpc::internal::CallOpServerSendStatus>::FinalizeResult(tag,
status)) {
- delete this;
- } else {
- // The tag was swallowed due to interception. We will see it again.
- }
+ delete this;
+ } else {
+ // The tag was swallowed due to interception. We will see it again.
+ }
return false;
}
@@ -334,7 +334,7 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
public:
SyncRequest(grpc::internal::RpcServiceMethod* method, void* method_tag)
: method_(method),
- method_tag_(method_tag),
+ method_tag_(method_tag),
in_flight_(false),
has_request_payload_(method->method_type() ==
grpc::internal::RpcMethod::NORMAL_RPC ||
@@ -362,11 +362,11 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
GPR_ASSERT(cq_ && !in_flight_);
in_flight_ = true;
- if (method_tag_) {
- if (grpc_server_request_registered_call(
- server, method_tag_, &call_, &deadline_, &request_metadata_,
+ if (method_tag_) {
+ if (grpc_server_request_registered_call(
+ server, method_tag_, &call_, &deadline_, &request_metadata_,
has_request_payload_ ? &request_payload_ : nullptr, cq_,
- notify_cq, this) != GRPC_CALL_OK) {
+ notify_cq, this) != GRPC_CALL_OK) {
TeardownRequest();
return;
}
@@ -384,21 +384,21 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
}
}
- void PostShutdownCleanup() {
- if (call_) {
- grpc_call_unref(call_);
- call_ = nullptr;
- }
- if (cq_) {
- grpc_completion_queue_destroy(cq_);
- cq_ = nullptr;
- }
- }
-
+ void PostShutdownCleanup() {
+ if (call_) {
+ grpc_call_unref(call_);
+ call_ = nullptr;
+ }
+ if (cq_) {
+ grpc_completion_queue_destroy(cq_);
+ cq_ = nullptr;
+ }
+ }
+
bool FinalizeResult(void** /*tag*/, bool* status) override {
if (!*status) {
grpc_completion_queue_destroy(cq_);
- cq_ = nullptr;
+ cq_ = nullptr;
}
if (call_details_) {
deadline_ = call_details_->deadline;
@@ -408,26 +408,26 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
return true;
}
- // The CallData class represents a call that is "active" as opposed
- // to just being requested. It wraps and takes ownership of the cq from
- // the call request
+ // The CallData class represents a call that is "active" as opposed
+ // to just being requested. It wraps and takes ownership of the cq from
+ // the call request
class CallData final {
public:
explicit CallData(Server* server, SyncRequest* mrd)
: cq_(mrd->cq_),
ctx_(mrd->deadline_, &mrd->request_metadata_),
has_request_payload_(mrd->has_request_payload_),
- request_payload_(has_request_payload_ ? mrd->request_payload_
- : nullptr),
- request_(nullptr),
- method_(mrd->method_),
- call_(
- mrd->call_, server, &cq_, server->max_receive_message_size(),
- ctx_.set_server_rpc_info(method_->name(), method_->method_type(),
- server->interceptor_creators_)),
- server_(server),
- global_callbacks_(nullptr),
- resources_(false) {
+ request_payload_(has_request_payload_ ? mrd->request_payload_
+ : nullptr),
+ request_(nullptr),
+ method_(mrd->method_),
+ call_(
+ mrd->call_, server, &cq_, server->max_receive_message_size(),
+ ctx_.set_server_rpc_info(method_->name(), method_->method_type(),
+ server->interceptor_creators_)),
+ server_(server),
+ global_callbacks_(nullptr),
+ resources_(false) {
ctx_.set_call(mrd->call_);
ctx_.cq_ = &cq_;
GPR_ASSERT(mrd->in_flight_);
@@ -441,82 +441,82 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
}
}
- void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks,
- bool resources) {
- global_callbacks_ = global_callbacks;
- resources_ = resources;
+ void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks,
+ bool resources) {
+ global_callbacks_ = global_callbacks;
+ resources_ = resources;
- interceptor_methods_.SetCall(&call_);
- interceptor_methods_.SetReverse();
- // Set interception point for RECV INITIAL METADATA
- interceptor_methods_.AddInterceptionHookPoint(
+ interceptor_methods_.SetCall(&call_);
+ interceptor_methods_.SetReverse();
+ // Set interception point for RECV INITIAL METADATA
+ interceptor_methods_.AddInterceptionHookPoint(
grpc::experimental::InterceptionHookPoints::
POST_RECV_INITIAL_METADATA);
- interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_);
+ interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_);
- if (has_request_payload_) {
- // Set interception point for RECV MESSAGE
- auto* handler = resources_ ? method_->handler()
- : server_->resource_exhausted_handler_.get();
- request_ = handler->Deserialize(call_.call(), request_payload_,
+ if (has_request_payload_) {
+ // Set interception point for RECV MESSAGE
+ auto* handler = resources_ ? method_->handler()
+ : server_->resource_exhausted_handler_.get();
+ request_ = handler->Deserialize(call_.call(), request_payload_,
&request_status_, nullptr);
- request_payload_ = nullptr;
- interceptor_methods_.AddInterceptionHookPoint(
+ request_payload_ = nullptr;
+ interceptor_methods_.AddInterceptionHookPoint(
grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
- interceptor_methods_.SetRecvMessage(request_, nullptr);
- }
-
- if (interceptor_methods_.RunInterceptors(
- [this]() { ContinueRunAfterInterception(); })) {
- ContinueRunAfterInterception();
- } else {
- // There were interceptors to be run, so ContinueRunAfterInterception
- // will be run when interceptors are done.
- }
+ interceptor_methods_.SetRecvMessage(request_, nullptr);
+ }
+
+ if (interceptor_methods_.RunInterceptors(
+ [this]() { ContinueRunAfterInterception(); })) {
+ ContinueRunAfterInterception();
+ } else {
+ // There were interceptors to be run, so ContinueRunAfterInterception
+ // will be run when interceptors are done.
+ }
}
- void ContinueRunAfterInterception() {
- {
- ctx_.BeginCompletionOp(&call_, nullptr, nullptr);
- global_callbacks_->PreSynchronousRequest(&ctx_);
- auto* handler = resources_ ? method_->handler()
- : server_->resource_exhausted_handler_.get();
+ void ContinueRunAfterInterception() {
+ {
+ ctx_.BeginCompletionOp(&call_, nullptr, nullptr);
+ global_callbacks_->PreSynchronousRequest(&ctx_);
+ auto* handler = resources_ ? method_->handler()
+ : server_->resource_exhausted_handler_.get();
handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter(
&call_, &ctx_, request_, request_status_, nullptr, nullptr));
- request_ = nullptr;
- global_callbacks_->PostSynchronousRequest(&ctx_);
-
- cq_.Shutdown();
-
+ request_ = nullptr;
+ global_callbacks_->PostSynchronousRequest(&ctx_);
+
+ cq_.Shutdown();
+
grpc::internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag();
- cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
-
- /* Ensure the cq_ is shutdown */
+ cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
+
+ /* Ensure the cq_ is shutdown */
grpc::DummyTag ignored_tag;
- GPR_ASSERT(cq_.Pluck(&ignored_tag) == false);
- }
- delete this;
- }
-
+ GPR_ASSERT(cq_.Pluck(&ignored_tag) == false);
+ }
+ delete this;
+ }
+
private:
grpc::CompletionQueue cq_;
grpc::ServerContext ctx_;
const bool has_request_payload_;
grpc_byte_buffer* request_payload_;
- void* request_;
+ void* request_;
grpc::Status request_status_;
grpc::internal::RpcServiceMethod* const method_;
grpc::internal::Call call_;
- Server* server_;
- std::shared_ptr<GlobalCallbacks> global_callbacks_;
- bool resources_;
+ Server* server_;
+ std::shared_ptr<GlobalCallbacks> global_callbacks_;
+ bool resources_;
grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_;
};
private:
grpc::internal::RpcServiceMethod* const method_;
- void* const method_tag_;
+ void* const method_tag_;
bool in_flight_;
const bool has_request_payload_;
grpc_call* call_;
@@ -541,19 +541,19 @@ class Server::CallbackRequest final
CallbackRequest(Server* server, grpc::internal::RpcServiceMethod* method,
grpc::CompletionQueue* cq,
grpc_core::Server::RegisteredCallAllocation* data)
- : server_(server),
- method_(method),
+ : server_(server),
+ method_(method),
has_request_payload_(method->method_type() ==
grpc::internal::RpcMethod::NORMAL_RPC ||
method->method_type() ==
grpc::internal::RpcMethod::SERVER_STREAMING),
cq_(cq),
- tag_(this) {
+ tag_(this) {
CommonSetup(server, data);
data->deadline = &deadline_;
data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr;
- }
-
+ }
+
// For generic services, method is nullptr since these services don't have
// pre-defined methods.
CallbackRequest(Server* server, grpc::CompletionQueue* cq,
@@ -567,8 +567,8 @@ class Server::CallbackRequest final
CommonSetup(server, data);
grpc_call_details_init(call_details_);
data->details = call_details_;
- }
-
+ }
+
~CallbackRequest() {
delete call_details_;
grpc_metadata_array_destroy(&request_metadata_);
@@ -576,21 +576,21 @@ class Server::CallbackRequest final
grpc_byte_buffer_destroy(request_payload_);
}
server_->UnrefWithPossibleNotify();
- }
-
+ }
+
// Needs specialization to account for different processing of metadata
// in generic API
bool FinalizeResult(void** tag, bool* status) override;
-
- private:
+
+ private:
// method_name needs to be specialized between named method and generic
const char* method_name() const;
- class CallbackCallTag : public grpc_experimental_completion_queue_functor {
- public:
+ class CallbackCallTag : public grpc_experimental_completion_queue_functor {
+ public:
CallbackCallTag(Server::CallbackRequest<ServerContextType>* req)
: req_(req) {
- functor_run = &CallbackCallTag::StaticRun;
+ functor_run = &CallbackCallTag::StaticRun;
// Set inlineable to true since this callback is internally-controlled
// without taking any locks, and thus does not need to be run from the
// executor (which triggers a thread hop). This should only be used by
@@ -598,42 +598,42 @@ class Server::CallbackRequest final
// here is actually non-trivial, but there is no chance of having user
// locks conflict with each other so it's ok to run inlined.
inlineable = true;
- }
-
- // force_run can not be performed on a tag if operations using this tag
- // have been sent to PerformOpsOnCall. It is intended for error conditions
- // that are detected before the operations are internally processed.
- void force_run(bool ok) { Run(ok); }
-
- private:
+ }
+
+ // force_run can not be performed on a tag if operations using this tag
+ // have been sent to PerformOpsOnCall. It is intended for error conditions
+ // that are detected before the operations are internally processed.
+ void force_run(bool ok) { Run(ok); }
+
+ private:
Server::CallbackRequest<ServerContextType>* req_;
grpc::internal::Call* call_;
-
- static void StaticRun(grpc_experimental_completion_queue_functor* cb,
- int ok) {
- static_cast<CallbackCallTag*>(cb)->Run(static_cast<bool>(ok));
- }
- void Run(bool ok) {
- void* ignored = req_;
- bool new_ok = ok;
- GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok));
- GPR_ASSERT(ignored == req_);
-
- if (!ok) {
- // The call has been shutdown.
- // Delete its contents to free up the request.
- delete req_;
- return;
- }
-
- // Bind the call, deadline, and metadata from what we got
- req_->ctx_.set_call(req_->call_);
- req_->ctx_.cq_ = req_->cq_;
- req_->ctx_.BindDeadlineAndMetadata(req_->deadline_,
- &req_->request_metadata_);
- req_->request_metadata_.count = 0;
-
- // Create a C++ Call to control the underlying core call
+
+ static void StaticRun(grpc_experimental_completion_queue_functor* cb,
+ int ok) {
+ static_cast<CallbackCallTag*>(cb)->Run(static_cast<bool>(ok));
+ }
+ void Run(bool ok) {
+ void* ignored = req_;
+ bool new_ok = ok;
+ GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok));
+ GPR_ASSERT(ignored == req_);
+
+ if (!ok) {
+ // The call has been shutdown.
+ // Delete its contents to free up the request.
+ delete req_;
+ return;
+ }
+
+ // Bind the call, deadline, and metadata from what we got
+ req_->ctx_.set_call(req_->call_);
+ req_->ctx_.cq_ = req_->cq_;
+ req_->ctx_.BindDeadlineAndMetadata(req_->deadline_,
+ &req_->request_metadata_);
+ req_->request_metadata_.count = 0;
+
+ // Create a C++ Call to control the underlying core call
call_ =
new (grpc_call_arena_alloc(req_->call_, sizeof(grpc::internal::Call)))
grpc::internal::Call(
@@ -645,71 +645,71 @@ class Server::CallbackRequest final
? req_->method_->method_type()
: grpc::internal::RpcMethod::BIDI_STREAMING,
req_->server_->interceptor_creators_));
-
- req_->interceptor_methods_.SetCall(call_);
- req_->interceptor_methods_.SetReverse();
- // Set interception point for RECV INITIAL METADATA
- req_->interceptor_methods_.AddInterceptionHookPoint(
+
+ req_->interceptor_methods_.SetCall(call_);
+ req_->interceptor_methods_.SetReverse();
+ // Set interception point for RECV INITIAL METADATA
+ req_->interceptor_methods_.AddInterceptionHookPoint(
grpc::experimental::InterceptionHookPoints::
POST_RECV_INITIAL_METADATA);
- req_->interceptor_methods_.SetRecvInitialMetadata(
- &req_->ctx_.client_metadata_);
-
- if (req_->has_request_payload_) {
- // Set interception point for RECV MESSAGE
- req_->request_ = req_->method_->handler()->Deserialize(
+ req_->interceptor_methods_.SetRecvInitialMetadata(
+ &req_->ctx_.client_metadata_);
+
+ if (req_->has_request_payload_) {
+ // Set interception point for RECV MESSAGE
+ req_->request_ = req_->method_->handler()->Deserialize(
req_->call_, req_->request_payload_, &req_->request_status_,
&req_->handler_data_);
- req_->request_payload_ = nullptr;
- req_->interceptor_methods_.AddInterceptionHookPoint(
+ req_->request_payload_ = nullptr;
+ req_->interceptor_methods_.AddInterceptionHookPoint(
grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
- req_->interceptor_methods_.SetRecvMessage(req_->request_, nullptr);
- }
-
- if (req_->interceptor_methods_.RunInterceptors(
- [this] { ContinueRunAfterInterception(); })) {
- ContinueRunAfterInterception();
- } else {
- // There were interceptors to be run, so ContinueRunAfterInterception
- // will be run when interceptors are done.
- }
- }
- void ContinueRunAfterInterception() {
+ req_->interceptor_methods_.SetRecvMessage(req_->request_, nullptr);
+ }
+
+ if (req_->interceptor_methods_.RunInterceptors(
+ [this] { ContinueRunAfterInterception(); })) {
+ ContinueRunAfterInterception();
+ } else {
+ // There were interceptors to be run, so ContinueRunAfterInterception
+ // will be run when interceptors are done.
+ }
+ }
+ void ContinueRunAfterInterception() {
auto* handler = (req_->method_ != nullptr)
? req_->method_->handler()
: req_->server_->generic_handler_.get();
handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter(
call_, &req_->ctx_, req_->request_, req_->request_status_,
req_->handler_data_, [this] { delete req_; }));
- }
- };
-
+ }
+ };
+
template <class CallAllocation>
void CommonSetup(Server* server, CallAllocation* data) {
server->Ref();
- grpc_metadata_array_init(&request_metadata_);
+ grpc_metadata_array_init(&request_metadata_);
data->tag = &tag_;
data->call = &call_;
data->initial_metadata = &request_metadata_;
- }
-
- Server* const server_;
+ }
+
+ Server* const server_;
grpc::internal::RpcServiceMethod* const method_;
- const bool has_request_payload_;
+ const bool has_request_payload_;
grpc_byte_buffer* request_payload_ = nullptr;
void* request_ = nullptr;
void* handler_data_ = nullptr;
grpc::Status request_status_;
grpc_call_details* const call_details_ = nullptr;
- grpc_call* call_;
- gpr_timespec deadline_;
- grpc_metadata_array request_metadata_;
+ grpc_call* call_;
+ gpr_timespec deadline_;
+ grpc_metadata_array request_metadata_;
grpc::CompletionQueue* const cq_;
- CallbackCallTag tag_;
+ CallbackCallTag tag_;
ServerContextType ctx_;
grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_;
-};
-
+};
+
template <>
bool Server::CallbackRequest<grpc::CallbackServerContext>::FinalizeResult(
void** /*tag*/, bool* /*status*/) {
@@ -750,13 +750,13 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
public:
SyncRequestThreadManager(Server* server, grpc::CompletionQueue* server_cq,
std::shared_ptr<GlobalCallbacks> global_callbacks,
- grpc_resource_quota* rq, int min_pollers,
- int max_pollers, int cq_timeout_msec)
- : ThreadManager("SyncServer", rq, min_pollers, max_pollers),
+ grpc_resource_quota* rq, int min_pollers,
+ int max_pollers, int cq_timeout_msec)
+ : ThreadManager("SyncServer", rq, min_pollers, max_pollers),
server_(server),
server_cq_(server_cq),
cq_timeout_msec_(cq_timeout_msec),
- global_callbacks_(std::move(global_callbacks)) {}
+ global_callbacks_(std::move(global_callbacks)) {}
WorkStatus PollForWork(void** tag, bool* ok) override {
*tag = nullptr;
@@ -778,7 +778,7 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
GPR_UNREACHABLE_CODE(return TIMEOUT);
}
- void DoWork(void* tag, bool ok, bool resources) override {
+ void DoWork(void* tag, bool ok, bool resources) override {
SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
if (!sync_req) {
@@ -789,9 +789,9 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
}
if (ok) {
- // Calldata takes ownership of the completion queue and interceptors
- // inside sync_req
- auto* cd = new SyncRequest::CallData(server_, sync_req);
+ // Calldata takes ownership of the completion queue and interceptors
+ // inside sync_req
+ auto* cd = new SyncRequest::CallData(server_, sync_req);
// Prepare for the next request
if (!IsShutdown()) {
sync_req->SetupRequest(); // Create new completion queue for sync_req
@@ -799,7 +799,7 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
}
GPR_TIMER_SCOPE("cd.Run()", 0);
- cd->Run(global_callbacks_, resources);
+ cd->Run(global_callbacks_, resources);
}
// TODO (sreek) If ok is false here (which it isn't in case of
// grpc_request_registered_call), we should still re-queue the request
@@ -831,17 +831,17 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
void* tag;
bool ok;
while (server_cq_->Next(&tag, &ok)) {
- if (ok) {
- // If a request was pulled off the queue, it means that the thread
- // handling the request added it to the completion queue after shutdown
- // was called - because the thread had already started and checked the
- // shutdown flag before shutdown was called. In this case, we simply
- // clean it up here, *after* calling wait on all the worker threads, at
- // which point we are certain no in-flight requests will add more to the
- // queue. This fixes an intermittent memory leak on shutdown.
- SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
- sync_req->PostShutdownCleanup();
- }
+ if (ok) {
+ // If a request was pulled off the queue, it means that the thread
+ // handling the request added it to the completion queue after shutdown
+ // was called - because the thread had already started and checked the
+ // shutdown flag before shutdown was called. In this case, we simply
+ // clean it up here, *after* calling wait on all the worker threads, at
+ // which point we are certain no in-flight requests will add more to the
+ // queue. This fixes an intermittent memory leak on shutdown.
+ SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
+ sync_req->PostShutdownCleanup();
+ }
}
}
@@ -870,17 +870,17 @@ Server::Server(
grpc::ChannelArguments* args,
std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>>
sync_server_cqs,
- int min_pollers, int max_pollers, int sync_cq_timeout_msec,
+ int min_pollers, int max_pollers, int sync_cq_timeout_msec,
std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>>
acceptors,
- grpc_resource_quota* server_rq,
- std::vector<
+ grpc_resource_quota* server_rq,
+ std::vector<
std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>>
- interceptor_creators)
+ interceptor_creators)
: acceptors_(std::move(acceptors)),
interceptor_creators_(std::move(interceptor_creators)),
max_receive_message_size_(INT_MIN),
- sync_server_cqs_(std::move(sync_server_cqs)),
+ sync_server_cqs_(std::move(sync_server_cqs)),
started_(false),
shutdown_(false),
shutdown_notified_(false),
@@ -893,23 +893,23 @@ Server::Server(
global_callbacks_->UpdateArguments(args);
if (sync_server_cqs_ != nullptr) {
- bool default_rq_created = false;
- if (server_rq == nullptr) {
- server_rq = grpc_resource_quota_create("SyncServer-default-rq");
- grpc_resource_quota_set_max_threads(server_rq,
- DEFAULT_MAX_SYNC_SERVER_THREADS);
- default_rq_created = true;
- }
-
+ bool default_rq_created = false;
+ if (server_rq == nullptr) {
+ server_rq = grpc_resource_quota_create("SyncServer-default-rq");
+ grpc_resource_quota_set_max_threads(server_rq,
+ DEFAULT_MAX_SYNC_SERVER_THREADS);
+ default_rq_created = true;
+ }
+
for (const auto& it : *sync_server_cqs_) {
sync_req_mgrs_.emplace_back(new SyncRequestThreadManager(
- this, it.get(), global_callbacks_, server_rq, min_pollers,
- max_pollers, sync_cq_timeout_msec));
+ this, it.get(), global_callbacks_, server_rq, min_pollers,
+ max_pollers, sync_cq_timeout_msec));
+ }
+
+ if (default_rq_created) {
+ grpc_resource_quota_unref(server_rq);
}
-
- if (default_rq_created) {
- grpc_resource_quota_unref(server_rq);
- }
}
for (auto& acceptor : acceptors_) {
@@ -974,24 +974,24 @@ std::shared_ptr<grpc::Channel> Server::InProcessChannel(
const grpc::ChannelArguments& args) {
grpc_channel_args channel_args = args.c_channel_args();
return grpc::CreateChannelInternal(
- "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr),
+ "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr),
std::vector<std::unique_ptr<
grpc::experimental::ClientInterceptorFactoryInterface>>());
}
std::shared_ptr<grpc::Channel>
-Server::experimental_type::InProcessChannelWithInterceptors(
+Server::experimental_type::InProcessChannelWithInterceptors(
const grpc::ChannelArguments& args,
- std::vector<
+ std::vector<
std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
- interceptor_creators) {
- grpc_channel_args channel_args = args.c_channel_args();
+ interceptor_creators) {
+ grpc_channel_args channel_args = args.c_channel_args();
return grpc::CreateChannelInternal(
- "inproc",
- grpc_inproc_channel_create(server_->server_, &channel_args, nullptr),
- std::move(interceptor_creators));
-}
-
+ "inproc",
+ grpc_inproc_channel_create(server_->server_, &channel_args, nullptr),
+ std::move(interceptor_creators));
+}
+
static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
grpc::internal::RpcServiceMethod* method) {
switch (method->method_type()) {
@@ -1014,29 +1014,29 @@ bool Server::RegisterService(const TString* host, grpc::Service* service) {
}
const char* method_name = nullptr;
-
+
for (const auto& method : service->methods_) {
if (method.get() == nullptr) { // Handled by generic service if any.
continue;
}
- void* method_registration_tag = grpc_server_register_method(
+ void* method_registration_tag = grpc_server_register_method(
server_, method->name(), host ? host->c_str() : nullptr,
PayloadHandlingForMethod(method.get()), 0);
- if (method_registration_tag == nullptr) {
+ if (method_registration_tag == nullptr) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
method->name());
return false;
}
- if (method->handler() == nullptr) { // Async method without handler
- method->set_server_tag(method_registration_tag);
- } else if (method->api_type() ==
+ if (method->handler() == nullptr) { // Async method without handler
+ method->set_server_tag(method_registration_tag);
+ } else if (method->api_type() ==
grpc::internal::RpcServiceMethod::ApiType::SYNC) {
for (const auto& value : sync_req_mgrs_) {
value->AddSyncMethod(method.get(), method_registration_tag);
}
- } else {
+ } else {
has_callback_methods_ = true;
grpc::internal::RpcServiceMethod* method_value = method.get();
grpc::CompletionQueue* cq = CallbackCQ();
@@ -1130,23 +1130,23 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
// explicit one.
grpc::ServerCompletionQueue* health_check_cq = nullptr;
grpc::DefaultHealthCheckService::HealthCheckServiceImpl*
- default_health_check_service_impl = nullptr;
+ default_health_check_service_impl = nullptr;
if (health_check_service_ == nullptr && !health_check_service_disabled_ &&
grpc::DefaultHealthCheckServiceEnabled()) {
auto* default_hc_service = new grpc::DefaultHealthCheckService;
- health_check_service_.reset(default_hc_service);
- // We create a non-polling CQ to avoid impacting application
- // performance. This ensures that we don't introduce thread hops
- // for application requests that wind up on this CQ, which is polled
- // in its own thread.
+ health_check_service_.reset(default_hc_service);
+ // We create a non-polling CQ to avoid impacting application
+ // performance. This ensures that we don't introduce thread hops
+ // for application requests that wind up on this CQ, which is polled
+ // in its own thread.
health_check_cq = new grpc::ServerCompletionQueue(
GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr);
- grpc_server_register_completion_queue(server_, health_check_cq->cq(),
- nullptr);
- default_health_check_service_impl =
- default_hc_service->GetHealthCheckService(
+ grpc_server_register_completion_queue(server_, health_check_cq->cq(),
+ nullptr);
+ default_health_check_service_impl =
+ default_hc_service->GetHealthCheckService(
std::unique_ptr<grpc::ServerCompletionQueue>(health_check_cq));
- RegisterService(nullptr, default_health_check_service_impl);
+ RegisterService(nullptr, default_health_check_service_impl);
}
for (auto& acceptor : acceptors_) {
@@ -1179,26 +1179,26 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
new UnimplementedAsyncRequest(this, cqs[i]);
}
}
- if (health_check_cq != nullptr) {
- new UnimplementedAsyncRequest(this, health_check_cq);
- }
+ if (health_check_cq != nullptr) {
+ new UnimplementedAsyncRequest(this, health_check_cq);
+ }
}
- // If this server has any support for synchronous methods (has any sync
- // server CQs), make sure that we have a ResourceExhausted handler
- // to deal with the case of thread exhaustion
- if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) {
+ // If this server has any support for synchronous methods (has any sync
+ // server CQs), make sure that we have a ResourceExhausted handler
+ // to deal with the case of thread exhaustion
+ if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) {
resource_exhausted_handler_.reset(
new grpc::internal::ResourceExhaustedHandler);
- }
-
+ }
+
for (const auto& value : sync_req_mgrs_) {
value->Start();
}
-
- if (default_health_check_service_impl != nullptr) {
- default_health_check_service_impl->StartServingThread();
- }
+
+ if (default_health_check_service_impl != nullptr) {
+ default_health_check_service_impl->StartServingThread();
+ }
for (auto& acceptor : acceptors_) {
acceptor->Start();
@@ -1207,50 +1207,50 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
void Server::ShutdownInternal(gpr_timespec deadline) {
grpc::internal::MutexLock lock(&mu_);
- if (shutdown_) {
- return;
- }
+ if (shutdown_) {
+ return;
+ }
- shutdown_ = true;
+ shutdown_ = true;
for (auto& acceptor : acceptors_) {
acceptor->Shutdown();
}
- /// The completion queue to use for server shutdown completion notification
+ /// The completion queue to use for server shutdown completion notification
grpc::CompletionQueue shutdown_cq;
grpc::ShutdownTag shutdown_tag; // Dummy shutdown tag
- grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
+ grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
- shutdown_cq.Shutdown();
+ shutdown_cq.Shutdown();
- void* tag;
- bool ok;
+ void* tag;
+ bool ok;
grpc::CompletionQueue::NextStatus status =
- shutdown_cq.AsyncNext(&tag, &ok, deadline);
+ shutdown_cq.AsyncNext(&tag, &ok, deadline);
- // If this timed out, it means we are done with the grace period for a clean
- // shutdown. We should force a shutdown now by cancelling all inflight calls
+ // If this timed out, it means we are done with the grace period for a clean
+ // shutdown. We should force a shutdown now by cancelling all inflight calls
if (status == grpc::CompletionQueue::NextStatus::TIMEOUT) {
- grpc_server_cancel_all_calls(server_);
- }
- // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
- // successfully shutdown
+ grpc_server_cancel_all_calls(server_);
+ }
+ // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
+ // successfully shutdown
- // Shutdown all ThreadManagers. This will try to gracefully stop all the
- // threads in the ThreadManagers (once they process any inflight requests)
+ // Shutdown all ThreadManagers. This will try to gracefully stop all the
+ // threads in the ThreadManagers (once they process any inflight requests)
for (const auto& value : sync_req_mgrs_) {
value->Shutdown(); // ThreadManager's Shutdown()
- }
+ }
- // Wait for threads in all ThreadManagers to terminate
+ // Wait for threads in all ThreadManagers to terminate
for (const auto& value : sync_req_mgrs_) {
value->Wait();
- }
+ }
// Drop the shutdown ref and wait for all other refs to drop as well.
UnrefAndWaitLocked();
-
+
// Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it
// will delete itself at true shutdown.
if (callback_cq_ != nullptr) {
@@ -1258,13 +1258,13 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
callback_cq_ = nullptr;
}
- // Drain the shutdown queue (if the previous call to AsyncNext() timed out
- // and we didn't remove the tag from the queue yet)
- while (shutdown_cq.Next(&tag, &ok)) {
- // Nothing to be done here. Just ignore ok and tag values
- }
-
- shutdown_notified_ = true;
+ // Drain the shutdown queue (if the previous call to AsyncNext() timed out
+ // and we didn't remove the tag from the queue yet)
+ while (shutdown_cq.Next(&tag, &ok)) {
+ // Nothing to be done here. Just ignore ok and tag values
+ }
+
+ shutdown_notified_ = true;
shutdown_cv_.Broadcast();
#ifndef NDEBUG
@@ -1286,23 +1286,23 @@ void Server::Wait() {
void Server::PerformOpsOnCall(grpc::internal::CallOpSetInterface* ops,
grpc::internal::Call* call) {
- ops->FillOps(call);
+ ops->FillOps(call);
}
bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
bool* status) {
- if (GenericAsyncRequest::FinalizeResult(tag, status)) {
- // We either had no interceptors run or we are done intercepting
- if (*status) {
+ if (GenericAsyncRequest::FinalizeResult(tag, status)) {
+ // We either had no interceptors run or we are done intercepting
+ if (*status) {
// Create a new request/response pair using the server and CQ values
// stored in this object's base class.
new UnimplementedAsyncRequest(server_, notification_cq_);
- new UnimplementedAsyncResponse(this);
- } else {
- delete this;
- }
+ new UnimplementedAsyncResponse(this);
+ } else {
+ delete this;
+ }
} else {
- // The tag was swallowed due to interception. We will see it again.
+ // The tag was swallowed due to interception. We will see it again.
}
return false;
}
@@ -1320,8 +1320,8 @@ grpc::ServerInitializer* Server::initializer() {
}
grpc::CompletionQueue* Server::CallbackCQ() {
- // TODO(vjpai): Consider using a single global CQ for the default CQ
- // if there is no explicit per-server CQ registered
+ // TODO(vjpai): Consider using a single global CQ for the default CQ
+ // if there is no explicit per-server CQ registered
grpc::internal::MutexLock l(&mu_);
if (callback_cq_ != nullptr) {
return callback_cq_;
@@ -1330,11 +1330,11 @@ grpc::CompletionQueue* Server::CallbackCQ() {
callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{
GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
shutdown_callback});
-
+
// Transfer ownership of the new cq to its own shutdown callback
shutdown_callback->TakeCQ(callback_cq_);
- return callback_cq_;
+ return callback_cq_;
}
-
+
} // namespace grpc