diff options
author | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-03-31 10:54:08 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-03-31 12:28:07 +0300 |
commit | fc1cffcfa7f0497a1f97b384a24bcbf23362f3be (patch) | |
tree | c15f7ab5b9e9b20fd0ef8fc07d598d28e8b32004 /contrib/libs/grpc/src | |
parent | 8a749596d40e91c896a1907afcd108d9221fbde1 (diff) | |
download | ydb-fc1cffcfa7f0497a1f97b384a24bcbf23362f3be.tar.gz |
Ydb stable 23-1-1923.1.19
x-stable-origin-commit: c5d5a396e89d0a72e0267a55e93d8404d4fb54fe
Diffstat (limited to 'contrib/libs/grpc/src')
-rw-r--r-- | contrib/libs/grpc/src/core/lib/iomgr/tcp_client_posix.cc | 4 | ||||
-rw-r--r-- | contrib/libs/grpc/src/core/lib/iomgr/work_serializer.cc | 21 | ||||
-rw-r--r-- | contrib/libs/grpc/src/cpp/server/server_cc.cc | 23 |
3 files changed, 31 insertions, 17 deletions
diff --git a/contrib/libs/grpc/src/core/lib/iomgr/tcp_client_posix.cc b/contrib/libs/grpc/src/core/lib/iomgr/tcp_client_posix.cc index b65583fcb2..a3f238220d 100644 --- a/contrib/libs/grpc/src/core/lib/iomgr/tcp_client_posix.cc +++ b/contrib/libs/grpc/src/core/lib/iomgr/tcp_client_posix.cc @@ -136,6 +136,7 @@ static void on_writable(void* acp, grpc_error_handle error) { int done; grpc_endpoint** ep = ac->ep; grpc_closure* closure = ac->closure; + std::string addr_str = ac->addr_str; grpc_fd* fd; (void)GRPC_ERROR_REF(error); @@ -221,8 +222,7 @@ finish: TString description = y_absl::StrCat("Failed to connect to remote host: ", str); error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION, description); - error = - grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, ac->addr_str); + error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, addr_str); } if (done) { // This is safe even outside the lock, because "done", the sentinel, is diff --git a/contrib/libs/grpc/src/core/lib/iomgr/work_serializer.cc b/contrib/libs/grpc/src/core/lib/iomgr/work_serializer.cc index 3af315ccb1..dab378c64d 100644 --- a/contrib/libs/grpc/src/core/lib/iomgr/work_serializer.cc +++ b/contrib/libs/grpc/src/core/lib/iomgr/work_serializer.cc @@ -121,10 +121,9 @@ void WorkSerializer::WorkSerializerImpl::Orphan() { if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) { gpr_log(GPR_INFO, "WorkSerializer::Orphan() %p", this); } - uint64_t prev_ref_pair = + const uint64_t prev_ref_pair = refs_.fetch_sub(MakeRefPair(0, 1), std::memory_order_acq_rel); - if (GetSize(prev_ref_pair) == 1) { - GPR_DEBUG_ASSERT(GetOwners(prev_ref_pair) == 0); + if (GetOwners(prev_ref_pair) == 0 && GetSize(prev_ref_pair) == 1) { if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) { gpr_log(GPR_INFO, " Destroying"); } @@ -170,13 +169,19 @@ void WorkSerializer::WorkSerializerImpl::DrainQueueOwned() { return; } if (GetSize(prev_ref_pair) == 2) { - // Queue drained. Give up ownership but only if queue remains empty. Note - // that we are using relaxed memory order semantics for the load on - // failure since we don't care about that value. + // Queue drained. Give up ownership but only if queue remains empty. uint64_t expected = MakeRefPair(1, 1); if (refs_.compare_exchange_strong(expected, MakeRefPair(0, 1), - std::memory_order_acq_rel, - std::memory_order_relaxed)) { + std::memory_order_acq_rel)) { + // Queue is drained. + return; + } + if (GetSize(expected) == 0) { + // WorkSerializer got orphaned while this was running + if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) { + gpr_log(GPR_INFO, " Queue Drained. Destroying"); + } + delete this; return; } } diff --git a/contrib/libs/grpc/src/cpp/server/server_cc.cc b/contrib/libs/grpc/src/cpp/server/server_cc.cc index edb1fe4d1d..02a52263c9 100644 --- a/contrib/libs/grpc/src/cpp/server/server_cc.cc +++ b/contrib/libs/grpc/src/cpp/server/server_cc.cc @@ -225,17 +225,24 @@ void ServerInterface::RegisteredAsyncRequest::IssueRequest( ServerInterface::GenericAsyncRequest::GenericAsyncRequest( ServerInterface* server, GenericServerContext* context, internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) + ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize, + bool delay_start) : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, delete_on_finalize) { grpc_call_details_init(&call_details_); - GPR_ASSERT(notification_cq); - GPR_ASSERT(call_cq); + if (!delay_start) { + Start(); + } +} + +void ServerInterface::GenericAsyncRequest::Start() { + GPR_ASSERT(notification_cq_); + GPR_ASSERT(call_cq_); // The following call_start_batch is internally-generated so no need for an // explanatory log on failure. - GPR_ASSERT(grpc_server_request_call(server->server(), &call_, &call_details_, - context->client_metadata_.arr(), - call_cq->cq(), notification_cq->cq(), + GPR_ASSERT(grpc_server_request_call(server_->server(), &call_, &call_details_, + context_->client_metadata_.arr(), + call_cq_->cq(), notification_cq_->cq(), this) == GRPC_CALL_OK); } @@ -303,7 +310,9 @@ class Server::UnimplementedAsyncRequest final UnimplementedAsyncRequest(ServerInterface* server, grpc::ServerCompletionQueue* cq) : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq, - nullptr, false) {} + nullptr, false, true) { + Start(); + } bool FinalizeResult(void** tag, bool* status) override; |