diff options
author | neksard <neksard@yandex-team.ru> | 2022-02-10 16:45:23 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:23 +0300 |
commit | 8f7cf138264e0caa318144bf8a2c950e0b0a8593 (patch) | |
tree | 83bf5c8c8047c42d8475e6095df90ccdc3d1b57f /contrib/libs/grpc/src/cpp/server/server_context.cc | |
parent | d3a398281c6fd1d3672036cb2d63f842d2cb28c5 (diff) | |
download | ydb-8f7cf138264e0caa318144bf8a2c950e0b0a8593.tar.gz |
Restoring authorship annotation for <neksard@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/libs/grpc/src/cpp/server/server_context.cc')
-rw-r--r-- | contrib/libs/grpc/src/cpp/server/server_context.cc | 216 |
1 files changed, 108 insertions, 108 deletions
diff --git a/contrib/libs/grpc/src/cpp/server/server_context.cc b/contrib/libs/grpc/src/cpp/server/server_context.cc index 458ac20d87..fea258e6e7 100644 --- a/contrib/libs/grpc/src/cpp/server/server_context.cc +++ b/contrib/libs/grpc/src/cpp/server/server_context.cc @@ -43,50 +43,50 @@ class ServerContextBase::CompletionOp final : public internal::CallOpSetInterface { public: // initial refs: one in the server context, one in the cq - // must ref the call before calling constructor and after deleting this + // must ref the call before calling constructor and after deleting this CompletionOp(internal::Call* call, ::grpc::internal::ServerCallbackCall* callback_controller) - : call_(*call), + : call_(*call), callback_controller_(callback_controller), - has_tag_(false), + has_tag_(false), tag_(nullptr), - core_cq_tag_(this), + core_cq_tag_(this), refs_(2), finalized_(false), - cancelled_(0), - done_intercepting_(false) {} - - // CompletionOp isn't copyable or movable - CompletionOp(const CompletionOp&) = delete; - CompletionOp& operator=(const CompletionOp&) = delete; - CompletionOp(CompletionOp&&) = delete; - CompletionOp& operator=(CompletionOp&&) = delete; - - ~CompletionOp() { - if (call_.server_rpc_info()) { - call_.server_rpc_info()->Unref(); - } - } - + cancelled_(0), + done_intercepting_(false) {} + + // CompletionOp isn't copyable or movable + CompletionOp(const CompletionOp&) = delete; + CompletionOp& operator=(const CompletionOp&) = delete; + CompletionOp(CompletionOp&&) = delete; + CompletionOp& operator=(CompletionOp&&) = delete; + + ~CompletionOp() { + if (call_.server_rpc_info()) { + call_.server_rpc_info()->Unref(); + } + } + void FillOps(internal::Call* call) override; - - // This should always be arena allocated in the call, so override delete. - // But this class is not trivially destructible, so must actually call delete - // before allowing the arena to be freed + + // This should always be arena allocated in the call, so override delete. + // But this class is not trivially destructible, so must actually call delete + // before allowing the arena to be freed static void operator delete(void* /*ptr*/, std::size_t size) { // Use size to avoid unused-parameter warning since assert seems to be // compiled out and treated as unused in some gcc optimized versions. (void)size; - assert(size == sizeof(CompletionOp)); - } - - // This operator should never be called as the memory should be freed as part - // of the arena destruction. It only exists to provide a matching operator - // delete to the operator new so that some compilers will not complain (see - // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this - // there are no tests catching the compiler warning. - static void operator delete(void*, void*) { assert(0); } - + assert(size == sizeof(CompletionOp)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + bool FinalizeResult(void** tag, bool* status) override; bool CheckCancelled(CompletionQueue* cq) { @@ -100,36 +100,36 @@ class ServerContextBase::CompletionOp final tag_ = tag; } - void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; } - - void* core_cq_tag() override { return core_cq_tag_; } - + void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; } + + void* core_cq_tag() override { return core_cq_tag_; } + void Unref(); - // This will be called while interceptors are run if the RPC is a hijacked - // RPC. This should set hijacking state for each of the ops. - void SetHijackingState() override { - /* Servers don't allow hijacking */ + // This will be called while interceptors are run if the RPC is a hijacked + // RPC. This should set hijacking state for each of the ops. + void SetHijackingState() override { + /* Servers don't allow hijacking */ GPR_ASSERT(false); - } - - /* Should be called after interceptors are done running */ - void ContinueFillOpsAfterInterception() override {} - - /* Should be called after interceptors are done running on the finalize result - * path */ - void ContinueFinalizeResultAfterInterception() override { - done_intercepting_ = true; - if (!has_tag_) { - /* We don't have a tag to return. */ + } + + /* Should be called after interceptors are done running */ + void ContinueFillOpsAfterInterception() override {} + + /* Should be called after interceptors are done running on the finalize result + * path */ + void ContinueFinalizeResultAfterInterception() override { + done_intercepting_ = true; + if (!has_tag_) { + /* We don't have a tag to return. */ Unref(); - return; - } - /* Start a dummy op so that we can return the tag */ + return; + } + /* Start a dummy op so that we can return the tag */ GPR_ASSERT(grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_, nullptr) == GRPC_CALL_OK); - } - + } + private: bool CheckCancelledNoPluck() { grpc_core::MutexLock lock(&mu_); @@ -140,37 +140,37 @@ class ServerContextBase::CompletionOp final ::grpc::internal::ServerCallbackCall* const callback_controller_; bool has_tag_; void* tag_; - void* core_cq_tag_; + void* core_cq_tag_; grpc_core::RefCount refs_; grpc_core::Mutex mu_; bool finalized_; - int cancelled_; // This is an int (not bool) because it is passed to core - bool done_intercepting_; + int cancelled_; // This is an int (not bool) because it is passed to core + bool done_intercepting_; internal::InterceptorBatchMethodsImpl interceptor_methods_; }; void ServerContextBase::CompletionOp::Unref() { if (refs_.Unref()) { - grpc_call* call = call_.call(); + grpc_call* call = call_.call(); delete this; - grpc_call_unref(call); + grpc_call_unref(call); } } void ServerContextBase::CompletionOp::FillOps(internal::Call* call) { - grpc_op ops; - ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER; - ops.data.recv_close_on_server.cancelled = &cancelled_; - ops.flags = 0; - ops.reserved = nullptr; - interceptor_methods_.SetCall(&call_); - interceptor_methods_.SetReverse(); - interceptor_methods_.SetCallOpSetInterface(this); + grpc_op ops; + ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER; + ops.data.recv_close_on_server.cancelled = &cancelled_; + ops.flags = 0; + ops.reserved = nullptr; + interceptor_methods_.SetCall(&call_); + interceptor_methods_.SetReverse(); + interceptor_methods_.SetCallOpSetInterface(this); // The following call_start_batch is internally-generated so no need for an // explanatory log on failure. GPR_ASSERT(grpc_call_start_batch(call->call(), &ops, 1, core_cq_tag_, nullptr) == GRPC_CALL_OK); - /* No interceptors to run here */ + /* No interceptors to run here */ } bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) { @@ -187,9 +187,9 @@ bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) { } Unref(); return has_tag; - } + } finalized_ = true; - + // If for some reason the incoming status is false, mark that as a // cancellation. // TODO(vjpai): does this ever happen? @@ -200,24 +200,24 @@ bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) { call_cancel = (cancelled_ != 0); // Release the lock since we may call a callback and interceptors. } - + if (call_cancel && callback_controller_ != nullptr) { callback_controller_->MaybeCallOnCancel(); } - /* Add interception point and run through interceptors */ - interceptor_methods_.AddInterceptionHookPoint( + /* Add interception point and run through interceptors */ + interceptor_methods_.AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_CLOSE); - if (interceptor_methods_.RunInterceptors()) { + if (interceptor_methods_.RunInterceptors()) { // No interceptors were run bool has_tag = has_tag_; if (has_tag) { - *tag = tag_; - } + *tag = tag_; + } Unref(); return has_tag; - } + } // There are interceptors to be run. Return false for now. - return false; + return false; } // ServerContextBase body @@ -233,17 +233,17 @@ ServerContextBase::ServerContextBase(gpr_timespec deadline, void ServerContextBase::BindDeadlineAndMetadata(gpr_timespec deadline, grpc_metadata_array* arr) { - deadline_ = deadline; - std::swap(*client_metadata_.arr(), *arr); -} - + deadline_ = deadline; + std::swap(*client_metadata_.arr(), *arr); +} + ServerContextBase::~ServerContextBase() { if (completion_op_) { completion_op_->Unref(); } - if (rpc_info_) { - rpc_info_->Unref(); - } + if (rpc_info_) { + rpc_info_->Unref(); + } if (default_reactor_used_.load(std::memory_order_relaxed)) { reinterpret_cast<Reactor*>(&default_reactor_)->~Reactor(); } @@ -261,19 +261,19 @@ void ServerContextBase::BeginCompletionOp( internal::Call* call, std::function<void(bool)> callback, ::grpc::internal::ServerCallbackCall* callback_controller) { GPR_ASSERT(!completion_op_); - if (rpc_info_) { - rpc_info_->Ref(); - } - grpc_call_ref(call->call()); - completion_op_ = - new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp))) + if (rpc_info_) { + rpc_info_->Ref(); + } + grpc_call_ref(call->call()); + completion_op_ = + new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp))) CompletionOp(call, callback_controller); if (callback_controller != nullptr) { completion_tag_.Set(call->call(), std::move(callback), completion_op_, true); - completion_op_->set_core_cq_tag(&completion_tag_); - completion_op_->set_tag(completion_op_); - } else if (has_notify_when_done_tag_) { + completion_op_->set_core_cq_tag(&completion_tag_); + completion_op_->set_tag(completion_op_); + } else if (has_notify_when_done_tag_) { completion_op_->set_tag(async_notify_when_done_tag_); } call->PerformOps(completion_op_); @@ -295,11 +295,11 @@ void ServerContextBase::AddTrailingMetadata(const TString& key, void ServerContextBase::TryCancel() const { internal::CancelInterceptorBatchMethods cancel_methods; - if (rpc_info_) { - for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) { - rpc_info_->RunInterceptor(&cancel_methods, i); - } - } + if (rpc_info_) { + for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) { + rpc_info_->RunInterceptor(&cancel_methods, i); + } + } grpc_call_error err = grpc_call_cancel_with_status(call_.call, GRPC_STATUS_CANCELLED, "Cancelled on the server side", nullptr); @@ -309,15 +309,15 @@ void ServerContextBase::TryCancel() const { } bool ServerContextBase::IsCancelled() const { - if (completion_tag_) { - // When using callback API, this result is always valid. - return completion_op_->CheckCancelledAsync(); - } else if (has_notify_when_done_tag_) { - // When using async API, the result is only valid + if (completion_tag_) { + // When using callback API, this result is always valid. + return completion_op_->CheckCancelledAsync(); + } else if (has_notify_when_done_tag_) { + // When using async API, the result is only valid // if the tag has already been delivered at the completion queue return completion_op_ && completion_op_->CheckCancelledAsync(); } else { - // when using sync API, the result is always valid + // when using sync API, the result is always valid return completion_op_ && completion_op_->CheckCancelled(cq_); } } |