diff options
author | heretic <heretic@yandex-team.ru> | 2022-03-25 12:34:53 +0300 |
---|---|---|
committer | heretic <heretic@yandex-team.ru> | 2022-03-25 12:34:53 +0300 |
commit | a41f3739eed6fceb6f62056a7620d220958a47e7 (patch) | |
tree | 278103258b510cb4a96761ea79d6ccd397ca05a0 /contrib/libs/grpc/src/cpp/client/channel_cc.cc | |
parent | 73d3613a82e5c217fcbe0ab8bbf8120c1ed1af55 (diff) | |
download | ydb-a41f3739eed6fceb6f62056a7620d220958a47e7.tar.gz |
Update grpc to 1.43.2 DTCC-864
ref:50a492c335cda70f458797cf945e49fe739c2715
Diffstat (limited to 'contrib/libs/grpc/src/cpp/client/channel_cc.cc')
-rw-r--r-- | contrib/libs/grpc/src/cpp/client/channel_cc.cc | 38 |
1 files changed, 23 insertions, 15 deletions
diff --git a/contrib/libs/grpc/src/cpp/client/channel_cc.cc b/contrib/libs/grpc/src/cpp/client/channel_cc.cc index 1793161d6c..280537bbbd 100644 --- a/contrib/libs/grpc/src/cpp/client/channel_cc.cc +++ b/contrib/libs/grpc/src/cpp/client/channel_cc.cc @@ -16,8 +16,6 @@ * */ -#include <grpcpp/channel.h> - #include <cstring> #include <memory> @@ -27,6 +25,7 @@ #include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> +#include <grpcpp/channel.h> #include <grpcpp/client_context.h> #include <grpcpp/completion_queue.h> #include <grpcpp/impl/call.h> @@ -57,12 +56,13 @@ Channel::Channel(const TString& host, grpc_channel* channel, Channel::~Channel() { grpc_channel_destroy(c_channel_); - if (callback_cq_ != nullptr) { + CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_relaxed); + if (callback_cq != nullptr) { if (grpc_iomgr_run_in_background()) { // gRPC-core provides the backing needed for the preferred CQ type - callback_cq_->Shutdown(); + callback_cq->Shutdown(); } else { - CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq_); + CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq); } } } @@ -146,9 +146,9 @@ void ChannelResetConnectionBackoff(Channel* channel) { // ClientRpcInfo should be set before call because set_call also checks // whether the call has been cancelled, and if the call was cancelled, we // should notify the interceptors too. - auto* info = - context->set_client_rpc_info(method.name(), method.method_type(), this, - interceptor_creators_, interceptor_pos); + auto* info = context->set_client_rpc_info( + method.name(), method.suffix_for_stats(), method.method_type(), this, + interceptor_creators_, interceptor_pos); context->set_call(c_call, shared_from_this()); return ::grpc::internal::Call(c_call, this, cq, info); @@ -213,7 +213,7 @@ bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed, } namespace { -class ShutdownCallback : public grpc_experimental_completion_queue_functor { +class ShutdownCallback : public grpc_completion_queue_functor { public: ShutdownCallback() { functor_run = &ShutdownCallback::Run; @@ -229,7 +229,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor { // The Run function will get invoked by the completion queue library // when the shutdown is actually complete - static void Run(grpc_experimental_completion_queue_functor* cb, int) { + static void Run(grpc_completion_queue_functor* cb, int) { auto* callback = static_cast<ShutdownCallback*>(cb); delete callback->cq_; delete callback; @@ -243,25 +243,33 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor { ::grpc::CompletionQueue* Channel::CallbackCQ() { // TODO(vjpai): Consider using a single global CQ for the default CQ // if there is no explicit per-channel CQ registered + CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_acquire); + if (callback_cq != nullptr) { + return callback_cq; + } + // The callback_cq_ wasn't already set, so grab a lock and set it up exactly + // once for this channel. grpc::internal::MutexLock l(&mu_); - if (callback_cq_ == nullptr) { + callback_cq = callback_cq_.load(std::memory_order_relaxed); + if (callback_cq == nullptr) { if (grpc_iomgr_run_in_background()) { // gRPC-core provides the backing needed for the preferred CQ type auto* shutdown_callback = new ShutdownCallback; - callback_cq_ = + callback_cq = new ::grpc::CompletionQueue(grpc_completion_queue_attributes{ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback}); // Transfer ownership of the new cq to its own shutdown callback - shutdown_callback->TakeCQ(callback_cq_); + shutdown_callback->TakeCQ(callback_cq); } else { // Otherwise we need to use the alternative CQ variant - callback_cq_ = CompletionQueue::CallbackAlternativeCQ(); + callback_cq = CompletionQueue::CallbackAlternativeCQ(); } + callback_cq_.store(callback_cq, std::memory_order_release); } - return callback_cq_; + return callback_cq; } } // namespace grpc |