aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/libs/grpc/src/cpp/client/channel_cc.cc
diff options
context:
space:
mode:
authorheretic <heretic@yandex-team.ru>2022-03-25 12:34:53 +0300
committerheretic <heretic@yandex-team.ru>2022-03-25 12:34:53 +0300
commita41f3739eed6fceb6f62056a7620d220958a47e7 (patch)
tree278103258b510cb4a96761ea79d6ccd397ca05a0 /contrib/libs/grpc/src/cpp/client/channel_cc.cc
parent73d3613a82e5c217fcbe0ab8bbf8120c1ed1af55 (diff)
downloadydb-a41f3739eed6fceb6f62056a7620d220958a47e7.tar.gz
Update grpc to 1.43.2 DTCC-864
ref:50a492c335cda70f458797cf945e49fe739c2715
Diffstat (limited to 'contrib/libs/grpc/src/cpp/client/channel_cc.cc')
-rw-r--r--contrib/libs/grpc/src/cpp/client/channel_cc.cc38
1 files changed, 23 insertions, 15 deletions
diff --git a/contrib/libs/grpc/src/cpp/client/channel_cc.cc b/contrib/libs/grpc/src/cpp/client/channel_cc.cc
index 1793161d6c..280537bbbd 100644
--- a/contrib/libs/grpc/src/cpp/client/channel_cc.cc
+++ b/contrib/libs/grpc/src/cpp/client/channel_cc.cc
@@ -16,8 +16,6 @@
*
*/
-#include <grpcpp/channel.h>
-
#include <cstring>
#include <memory>
@@ -27,6 +25,7 @@
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
+#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/completion_queue.h>
#include <grpcpp/impl/call.h>
@@ -57,12 +56,13 @@ Channel::Channel(const TString& host, grpc_channel* channel,
Channel::~Channel() {
grpc_channel_destroy(c_channel_);
- if (callback_cq_ != nullptr) {
+ CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_relaxed);
+ if (callback_cq != nullptr) {
if (grpc_iomgr_run_in_background()) {
// gRPC-core provides the backing needed for the preferred CQ type
- callback_cq_->Shutdown();
+ callback_cq->Shutdown();
} else {
- CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq_);
+ CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq);
}
}
}
@@ -146,9 +146,9 @@ void ChannelResetConnectionBackoff(Channel* channel) {
// ClientRpcInfo should be set before call because set_call also checks
// whether the call has been cancelled, and if the call was cancelled, we
// should notify the interceptors too.
- auto* info =
- context->set_client_rpc_info(method.name(), method.method_type(), this,
- interceptor_creators_, interceptor_pos);
+ auto* info = context->set_client_rpc_info(
+ method.name(), method.suffix_for_stats(), method.method_type(), this,
+ interceptor_creators_, interceptor_pos);
context->set_call(c_call, shared_from_this());
return ::grpc::internal::Call(c_call, this, cq, info);
@@ -213,7 +213,7 @@ bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed,
}
namespace {
-class ShutdownCallback : public grpc_experimental_completion_queue_functor {
+class ShutdownCallback : public grpc_completion_queue_functor {
public:
ShutdownCallback() {
functor_run = &ShutdownCallback::Run;
@@ -229,7 +229,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
// The Run function will get invoked by the completion queue library
// when the shutdown is actually complete
- static void Run(grpc_experimental_completion_queue_functor* cb, int) {
+ static void Run(grpc_completion_queue_functor* cb, int) {
auto* callback = static_cast<ShutdownCallback*>(cb);
delete callback->cq_;
delete callback;
@@ -243,25 +243,33 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
::grpc::CompletionQueue* Channel::CallbackCQ() {
// TODO(vjpai): Consider using a single global CQ for the default CQ
// if there is no explicit per-channel CQ registered
+ CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_acquire);
+ if (callback_cq != nullptr) {
+ return callback_cq;
+ }
+ // The callback_cq_ wasn't already set, so grab a lock and set it up exactly
+ // once for this channel.
grpc::internal::MutexLock l(&mu_);
- if (callback_cq_ == nullptr) {
+ callback_cq = callback_cq_.load(std::memory_order_relaxed);
+ if (callback_cq == nullptr) {
if (grpc_iomgr_run_in_background()) {
// gRPC-core provides the backing needed for the preferred CQ type
auto* shutdown_callback = new ShutdownCallback;
- callback_cq_ =
+ callback_cq =
new ::grpc::CompletionQueue(grpc_completion_queue_attributes{
GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK,
GRPC_CQ_DEFAULT_POLLING, shutdown_callback});
// Transfer ownership of the new cq to its own shutdown callback
- shutdown_callback->TakeCQ(callback_cq_);
+ shutdown_callback->TakeCQ(callback_cq);
} else {
// Otherwise we need to use the alternative CQ variant
- callback_cq_ = CompletionQueue::CallbackAlternativeCQ();
+ callback_cq = CompletionQueue::CallbackAlternativeCQ();
}
+ callback_cq_.store(callback_cq, std::memory_order_release);
}
- return callback_cq_;
+ return callback_cq;
}
} // namespace grpc