aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorbulatman <bulatman@yandex-team.com>2023-11-17 12:31:07 +0300
committerbulatman <bulatman@yandex-team.com>2023-11-17 13:00:10 +0300
commitc69b72330818bdd7616f4361574d68a82cc937ab (patch)
tree3e2901a75d77e0601367511473864b42cc1d5077 /yt
parent2355cf165be67dc04b5ee921427c82d2266ecdee (diff)
downloadydb-c69b72330818bdd7616f4361574d68a82cc937ab.tar.gz
YT: Fix memory leak when enable requests cancelation for grpc
Diffstat (limited to 'yt')
-rw-r--r--yt/yt/core/rpc/grpc/server.cpp33
-rw-r--r--yt/yt/core/rpc/service_detail.cpp11
2 files changed, 28 insertions, 16 deletions
diff --git a/yt/yt/core/rpc/grpc/server.cpp b/yt/yt/core/rpc/grpc/server.cpp
index 0d371fa157..da0cb100c4 100644
--- a/yt/yt/core/rpc/grpc/server.cpp
+++ b/yt/yt/core/rpc/grpc/server.cpp
@@ -288,20 +288,28 @@ private:
void SetTosLevel(TTosLevel /*tosLevel*/) override
{ }
- void Terminate(const TError& /*error*/) override
- { }
+ void Terminate(const TError& error) override
+ {
+ TerminatedList_.Fire(error);
+ }
- void SubscribeTerminated(const TCallback<void(const TError&)>& /*callback*/) override
- { }
+ void SubscribeTerminated(const TCallback<void(const TError&)>& callback) override
+ {
+ TerminatedList_.Subscribe(callback);
+ }
- void UnsubscribeTerminated(const TCallback<void(const TError&)>& /*callback*/) override
- { }
+ void UnsubscribeTerminated(const TCallback<void(const TError&)>& callback) override
+ {
+ TerminatedList_.Unsubscribe(callback);
+ }
private:
const TWeakPtr<TCallHandler> Handler_;
const TNetworkAddress PeerAddress_;
const TString PeerAddressString_;
const IAttributeDictionaryPtr EndpointAttributes_;
+
+ TSingleShotCallbackList<void(const TError&)> TerminatedList_;
};
class TCallHandler
@@ -330,6 +338,9 @@ private:
~TCallHandler()
{
+ if (ReplyBus_) {
+ ReplyBus_->Terminate(TError(NYT::EErrorCode::Canceled, "GRPC call completed"));
+ }
Owner_->OnCallHandlerDestroyed();
}
@@ -423,6 +434,8 @@ private:
TGrpcSlice ErrorMessageSlice_;
int RawCanceled_ = 0;
+ IBusPtr ReplyBus_;
+
YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, TraceContextSpinLock_);
NTracing::TTraceContextHandler TraceContextHandler_;
@@ -931,14 +944,16 @@ private:
StartBatch(ops, EServerCallCookie::Close);
}
- auto replyBus = New<TReplyBus>(this);
+ YT_VERIFY(!ReplyBus_);
+ ReplyBus_ = New<TReplyBus>(this);
+
if (Service_) {
auto requestMessage = CreateRequestMessage(
*header,
messageWithAttachments.Message,
messageWithAttachments.Attachments);
- Service_->HandleRequest(std::move(header), std::move(requestMessage), std::move(replyBus));
+ Service_->HandleRequest(std::move(header), std::move(requestMessage), ReplyBus_);
} else {
auto error = TError(
NRpc::EErrorCode::NoSuchService,
@@ -947,7 +962,7 @@ private:
YT_LOG_WARNING(error);
auto responseMessage = CreateErrorResponseMessage(RequestId_, error);
- YT_UNUSED_FUTURE(replyBus->Send(std::move(responseMessage), NBus::TSendOptions(EDeliveryTrackingLevel::None)));
+ YT_UNUSED_FUTURE(ReplyBus_->Send(std::move(responseMessage), NBus::TSendOptions(EDeliveryTrackingLevel::None)));
}
}
diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp
index c137e35452..2fd6f483ad 100644
--- a/yt/yt/core/rpc/service_detail.cpp
+++ b/yt/yt/core/rpc/service_detail.cpp
@@ -297,13 +297,13 @@ public:
std::move(acceptedRequest.Message),
std::move(logger),
acceptedRequest.RuntimeInfo->LogLevel.load(std::memory_order::relaxed))
- , Service_(service)
+ , Service_(std::move(service))
, RequestId_(acceptedRequest.RequestId)
, ReplyBus_(std::move(acceptedRequest.ReplyBus))
, RuntimeInfo_(acceptedRequest.RuntimeInfo)
, TraceContext_(std::move(acceptedRequest.TraceContext))
, RequestQueue_(acceptedRequest.RequestQueue)
- , ThrottledError_(acceptedRequest.ThrottledError)
+ , ThrottledError_(std::move(acceptedRequest.ThrottledError))
, MethodPerformanceCounters_(Service_->GetMethodPerformanceCounters(
RuntimeInfo_,
{GetAuthenticationIdentity().UserTag, RequestQueue_}))
@@ -2039,11 +2039,8 @@ void TServiceBase::RegisterRequest(TServiceContext* context)
{
auto* bucket = GetReplyBusBucket(replyBus);
auto guard = Guard(bucket->Lock);
- auto it = bucket->ReplyBusToContexts.find(context->GetReplyBus());
- if (it == bucket->ReplyBusToContexts.end()) {
- subscribe = true;
- it = bucket->ReplyBusToContexts.emplace(replyBus, THashSet<TServiceContext*>()).first;
- }
+ auto [it, inserted] = bucket->ReplyBusToContexts.try_emplace(replyBus);
+ subscribe = inserted;
auto& contexts = it->second;
contexts.insert(context);
}