diff options
author | bulatman <bulatman@yandex-team.com> | 2023-11-17 12:31:07 +0300 |
---|---|---|
committer | bulatman <bulatman@yandex-team.com> | 2023-11-17 13:00:10 +0300 |
commit | c69b72330818bdd7616f4361574d68a82cc937ab (patch) | |
tree | 3e2901a75d77e0601367511473864b42cc1d5077 | |
parent | 2355cf165be67dc04b5ee921427c82d2266ecdee (diff) | |
download | ydb-c69b72330818bdd7616f4361574d68a82cc937ab.tar.gz |
YT: Fix memory leak when enable requests cancelation for grpc
-rw-r--r-- | yt/yt/core/rpc/grpc/server.cpp | 33 | ||||
-rw-r--r-- | yt/yt/core/rpc/service_detail.cpp | 11 |
2 files changed, 28 insertions, 16 deletions
diff --git a/yt/yt/core/rpc/grpc/server.cpp b/yt/yt/core/rpc/grpc/server.cpp index 0d371fa157..da0cb100c4 100644 --- a/yt/yt/core/rpc/grpc/server.cpp +++ b/yt/yt/core/rpc/grpc/server.cpp @@ -288,20 +288,28 @@ private: void SetTosLevel(TTosLevel /*tosLevel*/) override { } - void Terminate(const TError& /*error*/) override - { } + void Terminate(const TError& error) override + { + TerminatedList_.Fire(error); + } - void SubscribeTerminated(const TCallback<void(const TError&)>& /*callback*/) override - { } + void SubscribeTerminated(const TCallback<void(const TError&)>& callback) override + { + TerminatedList_.Subscribe(callback); + } - void UnsubscribeTerminated(const TCallback<void(const TError&)>& /*callback*/) override - { } + void UnsubscribeTerminated(const TCallback<void(const TError&)>& callback) override + { + TerminatedList_.Unsubscribe(callback); + } private: const TWeakPtr<TCallHandler> Handler_; const TNetworkAddress PeerAddress_; const TString PeerAddressString_; const IAttributeDictionaryPtr EndpointAttributes_; + + TSingleShotCallbackList<void(const TError&)> TerminatedList_; }; class TCallHandler @@ -330,6 +338,9 @@ private: ~TCallHandler() { + if (ReplyBus_) { + ReplyBus_->Terminate(TError(NYT::EErrorCode::Canceled, "GRPC call completed")); + } Owner_->OnCallHandlerDestroyed(); } @@ -423,6 +434,8 @@ private: TGrpcSlice ErrorMessageSlice_; int RawCanceled_ = 0; + IBusPtr ReplyBus_; + YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, TraceContextSpinLock_); NTracing::TTraceContextHandler TraceContextHandler_; @@ -931,14 +944,16 @@ private: StartBatch(ops, EServerCallCookie::Close); } - auto replyBus = New<TReplyBus>(this); + YT_VERIFY(!ReplyBus_); + ReplyBus_ = New<TReplyBus>(this); + if (Service_) { auto requestMessage = CreateRequestMessage( *header, messageWithAttachments.Message, messageWithAttachments.Attachments); - Service_->HandleRequest(std::move(header), std::move(requestMessage), std::move(replyBus)); + Service_->HandleRequest(std::move(header), std::move(requestMessage), ReplyBus_); } else { auto error = TError( NRpc::EErrorCode::NoSuchService, @@ -947,7 +962,7 @@ private: YT_LOG_WARNING(error); auto responseMessage = CreateErrorResponseMessage(RequestId_, error); - YT_UNUSED_FUTURE(replyBus->Send(std::move(responseMessage), NBus::TSendOptions(EDeliveryTrackingLevel::None))); + YT_UNUSED_FUTURE(ReplyBus_->Send(std::move(responseMessage), NBus::TSendOptions(EDeliveryTrackingLevel::None))); } } diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp index c137e35452..2fd6f483ad 100644 --- a/yt/yt/core/rpc/service_detail.cpp +++ b/yt/yt/core/rpc/service_detail.cpp @@ -297,13 +297,13 @@ public: std::move(acceptedRequest.Message), std::move(logger), acceptedRequest.RuntimeInfo->LogLevel.load(std::memory_order::relaxed)) - , Service_(service) + , Service_(std::move(service)) , RequestId_(acceptedRequest.RequestId) , ReplyBus_(std::move(acceptedRequest.ReplyBus)) , RuntimeInfo_(acceptedRequest.RuntimeInfo) , TraceContext_(std::move(acceptedRequest.TraceContext)) , RequestQueue_(acceptedRequest.RequestQueue) - , ThrottledError_(acceptedRequest.ThrottledError) + , ThrottledError_(std::move(acceptedRequest.ThrottledError)) , MethodPerformanceCounters_(Service_->GetMethodPerformanceCounters( RuntimeInfo_, {GetAuthenticationIdentity().UserTag, RequestQueue_})) @@ -2039,11 +2039,8 @@ void TServiceBase::RegisterRequest(TServiceContext* context) { auto* bucket = GetReplyBusBucket(replyBus); auto guard = Guard(bucket->Lock); - auto it = bucket->ReplyBusToContexts.find(context->GetReplyBus()); - if (it == bucket->ReplyBusToContexts.end()) { - subscribe = true; - it = bucket->ReplyBusToContexts.emplace(replyBus, THashSet<TServiceContext*>()).first; - } + auto [it, inserted] = bucket->ReplyBusToContexts.try_emplace(replyBus); + subscribe = inserted; auto& contexts = it->second; contexts.insert(context); } |