aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/libs/grpc/src
diff options
context:
space:
mode:
authorDaniil Cherednik <dan.cherednik@gmail.com>2023-03-31 10:54:08 +0300
committerDaniil Cherednik <dan.cherednik@gmail.com>2023-03-31 12:28:07 +0300
commitfc1cffcfa7f0497a1f97b384a24bcbf23362f3be (patch)
treec15f7ab5b9e9b20fd0ef8fc07d598d28e8b32004 /contrib/libs/grpc/src
parent8a749596d40e91c896a1907afcd108d9221fbde1 (diff)
downloadydb-fc1cffcfa7f0497a1f97b384a24bcbf23362f3be.tar.gz
Ydb stable 23-1-1923.1.19
x-stable-origin-commit: c5d5a396e89d0a72e0267a55e93d8404d4fb54fe
Diffstat (limited to 'contrib/libs/grpc/src')
-rw-r--r--contrib/libs/grpc/src/core/lib/iomgr/tcp_client_posix.cc4
-rw-r--r--contrib/libs/grpc/src/core/lib/iomgr/work_serializer.cc21
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_cc.cc23
3 files changed, 31 insertions, 17 deletions
diff --git a/contrib/libs/grpc/src/core/lib/iomgr/tcp_client_posix.cc b/contrib/libs/grpc/src/core/lib/iomgr/tcp_client_posix.cc
index b65583fcb2..a3f238220d 100644
--- a/contrib/libs/grpc/src/core/lib/iomgr/tcp_client_posix.cc
+++ b/contrib/libs/grpc/src/core/lib/iomgr/tcp_client_posix.cc
@@ -136,6 +136,7 @@ static void on_writable(void* acp, grpc_error_handle error) {
int done;
grpc_endpoint** ep = ac->ep;
grpc_closure* closure = ac->closure;
+ std::string addr_str = ac->addr_str;
grpc_fd* fd;
(void)GRPC_ERROR_REF(error);
@@ -221,8 +222,7 @@ finish:
TString description =
y_absl::StrCat("Failed to connect to remote host: ", str);
error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION, description);
- error =
- grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, ac->addr_str);
+ error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, addr_str);
}
if (done) {
// This is safe even outside the lock, because "done", the sentinel, is
diff --git a/contrib/libs/grpc/src/core/lib/iomgr/work_serializer.cc b/contrib/libs/grpc/src/core/lib/iomgr/work_serializer.cc
index 3af315ccb1..dab378c64d 100644
--- a/contrib/libs/grpc/src/core/lib/iomgr/work_serializer.cc
+++ b/contrib/libs/grpc/src/core/lib/iomgr/work_serializer.cc
@@ -121,10 +121,9 @@ void WorkSerializer::WorkSerializerImpl::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer::Orphan() %p", this);
}
- uint64_t prev_ref_pair =
+ const uint64_t prev_ref_pair =
refs_.fetch_sub(MakeRefPair(0, 1), std::memory_order_acq_rel);
- if (GetSize(prev_ref_pair) == 1) {
- GPR_DEBUG_ASSERT(GetOwners(prev_ref_pair) == 0);
+ if (GetOwners(prev_ref_pair) == 0 && GetSize(prev_ref_pair) == 1) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Destroying");
}
@@ -170,13 +169,19 @@ void WorkSerializer::WorkSerializerImpl::DrainQueueOwned() {
return;
}
if (GetSize(prev_ref_pair) == 2) {
- // Queue drained. Give up ownership but only if queue remains empty. Note
- // that we are using relaxed memory order semantics for the load on
- // failure since we don't care about that value.
+ // Queue drained. Give up ownership but only if queue remains empty.
uint64_t expected = MakeRefPair(1, 1);
if (refs_.compare_exchange_strong(expected, MakeRefPair(0, 1),
- std::memory_order_acq_rel,
- std::memory_order_relaxed)) {
+ std::memory_order_acq_rel)) {
+ // Queue is drained.
+ return;
+ }
+ if (GetSize(expected) == 0) {
+ // WorkSerializer got orphaned while this was running
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
+ gpr_log(GPR_INFO, " Queue Drained. Destroying");
+ }
+ delete this;
return;
}
}
diff --git a/contrib/libs/grpc/src/cpp/server/server_cc.cc b/contrib/libs/grpc/src/cpp/server/server_cc.cc
index edb1fe4d1d..02a52263c9 100644
--- a/contrib/libs/grpc/src/cpp/server/server_cc.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_cc.cc
@@ -225,17 +225,24 @@ void ServerInterface::RegisteredAsyncRequest::IssueRequest(
ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
ServerInterface* server, GenericServerContext* context,
internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
+ ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize,
+ bool delay_start)
: BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag,
delete_on_finalize) {
grpc_call_details_init(&call_details_);
- GPR_ASSERT(notification_cq);
- GPR_ASSERT(call_cq);
+ if (!delay_start) {
+ Start();
+ }
+}
+
+void ServerInterface::GenericAsyncRequest::Start() {
+ GPR_ASSERT(notification_cq_);
+ GPR_ASSERT(call_cq_);
// The following call_start_batch is internally-generated so no need for an
// explanatory log on failure.
- GPR_ASSERT(grpc_server_request_call(server->server(), &call_, &call_details_,
- context->client_metadata_.arr(),
- call_cq->cq(), notification_cq->cq(),
+ GPR_ASSERT(grpc_server_request_call(server_->server(), &call_, &call_details_,
+ context_->client_metadata_.arr(),
+ call_cq_->cq(), notification_cq_->cq(),
this) == GRPC_CALL_OK);
}
@@ -303,7 +310,9 @@ class Server::UnimplementedAsyncRequest final
UnimplementedAsyncRequest(ServerInterface* server,
grpc::ServerCompletionQueue* cq)
: GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq,
- nullptr, false) {}
+ nullptr, false, true) {
+ Start();
+ }
bool FinalizeResult(void** tag, bool* status) override;