aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordon-dron <don-dron@yandex-team.com>2024-07-15 09:41:20 +0300
committerdon-dron <don-dron@yandex-team.com>2024-07-15 10:19:12 +0300
commit853998ef91546e36757de9e51dd79bda129cbaa3 (patch)
treebee9c887dcdbea0dad89484da3e93c9e9f286cf1
parent91ebedebadd578973f56f3575227bd29504770ff (diff)
downloadydb-853998ef91546e36757de9e51dd79bda129cbaa3.tar.gz
YT-22233: Fix client memory tracking
1797ea8791ffda5095eab7d2002799c80e565f8c
-rw-r--r--yt/yt/core/rpc/bus/channel.cpp5
-rw-r--r--yt/yt/core/rpc/channel.h2
-rw-r--r--yt/yt/core/rpc/channel_detail.cpp5
-rw-r--r--yt/yt/core/rpc/channel_detail.h2
-rw-r--r--yt/yt/core/rpc/client-inl.h1
-rw-r--r--yt/yt/core/rpc/client.cpp23
-rw-r--r--yt/yt/core/rpc/client.h8
-rw-r--r--yt/yt/core/rpc/grpc/channel.cpp5
-rw-r--r--yt/yt/core/rpc/hedging_channel.cpp5
-rw-r--r--yt/yt/core/rpc/http/channel.cpp5
-rw-r--r--yt/yt/core/rpc/local_channel.cpp5
-rw-r--r--yt/yt/core/rpc/null_channel.cpp6
-rw-r--r--yt/yt/core/rpc/roaming_channel.cpp5
-rw-r--r--yt/yt/core/rpc/unittests/viable_peer_registry_ut.cpp5
-rw-r--r--yt/yt/core/test_framework/test_proxy_service.cpp5
-rw-r--r--yt/yt/core/test_framework/test_proxy_service.h2
16 files changed, 78 insertions, 11 deletions
diff --git a/yt/yt/core/rpc/bus/channel.cpp b/yt/yt/core/rpc/bus/channel.cpp
index 92ac4a8b9e..70809661c0 100644
--- a/yt/yt/core/rpc/bus/channel.cpp
+++ b/yt/yt/core/rpc/bus/channel.cpp
@@ -150,6 +150,11 @@ public:
return requestCount;
}
+ IMemoryUsageTrackerPtr GetChannelMemoryTracker() override
+ {
+ return MemoryUsageTracker_;
+ }
+
private:
class TSession;
using TSessionPtr = TIntrusivePtr<TSession>;
diff --git a/yt/yt/core/rpc/channel.h b/yt/yt/core/rpc/channel.h
index 8248a938d5..8affb32b47 100644
--- a/yt/yt/core/rpc/channel.h
+++ b/yt/yt/core/rpc/channel.h
@@ -123,6 +123,8 @@ struct IChannel
DECLARE_INTERFACE_SIGNAL(void(const TError&), Terminated);
virtual int GetInflightRequestCount() = 0;
+
+ virtual IMemoryUsageTrackerPtr GetChannelMemoryTracker() = 0;
};
DEFINE_REFCOUNTED_TYPE(IChannel)
diff --git a/yt/yt/core/rpc/channel_detail.cpp b/yt/yt/core/rpc/channel_detail.cpp
index 181a779f48..fd4233826f 100644
--- a/yt/yt/core/rpc/channel_detail.cpp
+++ b/yt/yt/core/rpc/channel_detail.cpp
@@ -62,6 +62,11 @@ int TChannelWrapper::GetInflightRequestCount()
return UnderlyingChannel_->GetInflightRequestCount();
}
+IMemoryUsageTrackerPtr TChannelWrapper::GetChannelMemoryTracker()
+{
+ return UnderlyingChannel_->GetChannelMemoryTracker();
+}
+
////////////////////////////////////////////////////////////////////////////////
void TClientRequestControlThunk::SetUnderlying(IClientRequestControlPtr underlying)
diff --git a/yt/yt/core/rpc/channel_detail.h b/yt/yt/core/rpc/channel_detail.h
index a3ca87a5d4..e01579f361 100644
--- a/yt/yt/core/rpc/channel_detail.h
+++ b/yt/yt/core/rpc/channel_detail.h
@@ -29,6 +29,8 @@ public:
int GetInflightRequestCount() override;
+ IMemoryUsageTrackerPtr GetChannelMemoryTracker() override;
+
protected:
const IChannelPtr UnderlyingChannel_;
};
diff --git a/yt/yt/core/rpc/client-inl.h b/yt/yt/core/rpc/client-inl.h
index c57f63d1bd..fb844d4a23 100644
--- a/yt/yt/core/rpc/client-inl.h
+++ b/yt/yt/core/rpc/client-inl.h
@@ -149,6 +149,7 @@ TIntrusivePtr<T> TProxyBase::CreateRequest(const TMethodDescriptor& methodDescri
request->SetAcknowledgementTimeout(DefaultAcknowledgementTimeout_);
request->SetRequestCodec(DefaultRequestCodec_);
request->SetResponseCodec(DefaultResponseCodec_);
+ request->SetMemoryUsageTracker(DefaultMemoryUsageTracker_);
request->SetEnableLegacyRpcCodecs(DefaultEnableLegacyRpcCodecs_);
request->SetMultiplexingBand(methodDescriptor.MultiplexingBand);
diff --git a/yt/yt/core/rpc/client.cpp b/yt/yt/core/rpc/client.cpp
index cbeef69214..acd67934f3 100644
--- a/yt/yt/core/rpc/client.cpp
+++ b/yt/yt/core/rpc/client.cpp
@@ -40,7 +40,8 @@ TClientContext::TClientContext(
TFeatureIdFormatter featureIdFormatter,
bool responseIsHeavy,
TAttachmentsOutputStreamPtr requestAttachmentsStream,
- TAttachmentsInputStreamPtr responseAttachmentsStream)
+ TAttachmentsInputStreamPtr responseAttachmentsStream,
+ IMemoryUsageTrackerPtr memoryUsageTracker)
: RequestId_(requestId)
, TraceContext_(std::move(traceContext))
, Service_(std::move(service))
@@ -49,6 +50,7 @@ TClientContext::TClientContext(
, ResponseHeavy_(responseIsHeavy)
, RequestAttachmentsStream_(std::move(requestAttachmentsStream))
, ResponseAttachmentsStream_(std::move(responseAttachmentsStream))
+ , MemoryUsageTracker_(std::move(memoryUsageTracker))
{ }
////////////////////////////////////////////////////////////////////////////////
@@ -81,6 +83,7 @@ TClientRequest::TClientRequest(const TClientRequest& other)
, RequestCodec_(other.RequestCodec_)
, ResponseCodec_(other.ResponseCodec_)
, GenerateAttachmentChecksums_(other.GenerateAttachmentChecksums_)
+ , MemoryUsageTracker_(other.MemoryUsageTracker_)
, Channel_(other.Channel_)
, StreamingEnabled_(other.StreamingEnabled_)
, SendBaggage_(other.SendBaggage_)
@@ -347,7 +350,8 @@ TClientContextPtr TClientRequest::CreateClientContext()
FeatureIdFormatter_,
ResponseHeavy_,
RequestAttachmentsStream_,
- ResponseAttachmentsStream_);
+ ResponseAttachmentsStream_,
+ MemoryUsageTracker_ ? MemoryUsageTracker_ : Channel_->GetChannelMemoryTracker());
}
void TClientRequest::OnPullRequestAttachmentsStream()
@@ -616,18 +620,17 @@ void TClientResponse::Deserialize(TSharedRefArray responseMessage)
}
auto compressedAttachments = MakeRange(ResponseMessage_.Begin() + 2, ResponseMessage_.End());
+ auto memoryUsageTracker = ClientContext_->GetMemoryUsageTracker();
+
if (attachmentCodecId == NCompression::ECodec::None) {
- Attachments_.clear();
- Attachments_.reserve(compressedAttachments.Size());
- for (auto& attachment : compressedAttachments) {
- struct TCopiedAttachmentTag
- { };
- auto copiedAttachment = TSharedMutableRef::MakeCopy<TCopiedAttachmentTag>(attachment);
- Attachments_.push_back(std::move(copiedAttachment));
- }
+ Attachments_ = compressedAttachments.ToVector();
} else {
Attachments_ = DecompressAttachments(compressedAttachments, attachmentCodecId);
}
+
+ for (auto& attachment : Attachments_) {
+ attachment = TrackMemory(memoryUsageTracker, attachment);
+ }
}
void TClientResponse::HandleAcknowledgement()
diff --git a/yt/yt/core/rpc/client.h b/yt/yt/core/rpc/client.h
index 96e5c27db7..c67980d0d5 100644
--- a/yt/yt/core/rpc/client.h
+++ b/yt/yt/core/rpc/client.h
@@ -12,6 +12,8 @@
#include <yt/yt/core/misc/property.h>
#include <yt/yt/core/misc/protobuf_helpers.h>
+#include <yt/yt/core/misc/memory_usage_tracker.h>
+
#include <yt/yt/core/rpc/helpers.h>
#include <yt/yt/core/rpc/message.h>
#include <yt/yt_proto/yt/core/rpc/proto/rpc.pb.h>
@@ -104,6 +106,7 @@ public:
DEFINE_BYVAL_RO_PROPERTY(bool, ResponseHeavy);
DEFINE_BYVAL_RO_PROPERTY(TAttachmentsOutputStreamPtr, RequestAttachmentsStream);
DEFINE_BYVAL_RO_PROPERTY(TAttachmentsInputStreamPtr, ResponseAttachmentsStream);
+ DEFINE_BYVAL_RO_PROPERTY(IMemoryUsageTrackerPtr, MemoryUsageTracker);
public:
TClientContext(
@@ -114,7 +117,8 @@ public:
TFeatureIdFormatter featureIdFormatter,
bool heavy,
TAttachmentsOutputStreamPtr requestAttachmentsStream,
- TAttachmentsInputStreamPtr responseAttachmentsStream);
+ TAttachmentsInputStreamPtr responseAttachmentsStream,
+ IMemoryUsageTrackerPtr memoryUsageTracker);
};
DEFINE_REFCOUNTED_TYPE(TClientContext)
@@ -137,6 +141,7 @@ public:
DEFINE_BYVAL_RW_PROPERTY(NCompression::ECodec, ResponseCodec, NCompression::ECodec::None);
DEFINE_BYVAL_RW_PROPERTY(bool, EnableLegacyRpcCodecs, true);
DEFINE_BYVAL_RW_PROPERTY(bool, GenerateAttachmentChecksums, true);
+ DEFINE_BYVAL_RW_PROPERTY(IMemoryUsageTrackerPtr, MemoryUsageTracker);
// Field is used on client side only. So it is never serialized.
DEFINE_BYREF_RW_PROPERTY(NTracing::TTraceContext::TTagList, TracingTags);
// For testing purposes only.
@@ -467,6 +472,7 @@ public:
DEFINE_BYVAL_RW_PROPERTY(std::optional<TDuration>, DefaultAcknowledgementTimeout);
DEFINE_BYVAL_RW_PROPERTY(NCompression::ECodec, DefaultRequestCodec, NCompression::ECodec::None);
DEFINE_BYVAL_RW_PROPERTY(NCompression::ECodec, DefaultResponseCodec, NCompression::ECodec::None);
+ DEFINE_BYVAL_RW_PROPERTY(IMemoryUsageTrackerPtr, DefaultMemoryUsageTracker);
DEFINE_BYVAL_RW_PROPERTY(bool, DefaultEnableLegacyRpcCodecs, true);
DEFINE_BYREF_RW_PROPERTY(TStreamingParameters, DefaultClientAttachmentsStreamingParameters);
diff --git a/yt/yt/core/rpc/grpc/channel.cpp b/yt/yt/core/rpc/grpc/channel.cpp
index f20763f549..0c0ed1fc88 100644
--- a/yt/yt/core/rpc/grpc/channel.cpp
+++ b/yt/yt/core/rpc/grpc/channel.cpp
@@ -216,6 +216,11 @@ public:
YT_UNIMPLEMENTED();
}
+ IMemoryUsageTrackerPtr GetChannelMemoryTracker() override
+ {
+ return GetNullMemoryUsageTracker();
+ }
+
private:
const TChannelConfigPtr Config_;
const TString EndpointAddress_;
diff --git a/yt/yt/core/rpc/hedging_channel.cpp b/yt/yt/core/rpc/hedging_channel.cpp
index f5fb0dcbae..b24e797b4e 100644
--- a/yt/yt/core/rpc/hedging_channel.cpp
+++ b/yt/yt/core/rpc/hedging_channel.cpp
@@ -392,6 +392,11 @@ public:
YT_UNIMPLEMENTED();
}
+ IMemoryUsageTrackerPtr GetChannelMemoryTracker() override
+ {
+ return PrimaryChannel_->GetChannelMemoryTracker();
+ }
+
private:
const IChannelPtr PrimaryChannel_;
const IChannelPtr BackupChannel_;
diff --git a/yt/yt/core/rpc/http/channel.cpp b/yt/yt/core/rpc/http/channel.cpp
index ebbebf2254..fc58a04b76 100644
--- a/yt/yt/core/rpc/http/channel.cpp
+++ b/yt/yt/core/rpc/http/channel.cpp
@@ -121,6 +121,11 @@ public:
YT_UNIMPLEMENTED();
}
+ IMemoryUsageTrackerPtr GetChannelMemoryTracker() override
+ {
+ return GetNullMemoryUsageTracker();
+ }
+
private:
IClientPtr Client_;
std::optional<TDuration> ClientTimeout_;
diff --git a/yt/yt/core/rpc/local_channel.cpp b/yt/yt/core/rpc/local_channel.cpp
index ccfafd9655..6c83590794 100644
--- a/yt/yt/core/rpc/local_channel.cpp
+++ b/yt/yt/core/rpc/local_channel.cpp
@@ -133,6 +133,11 @@ public:
return 0;
}
+ IMemoryUsageTrackerPtr GetChannelMemoryTracker() override
+ {
+ return GetNullMemoryUsageTracker();
+ }
+
private:
class TSession;
using TSessionPtr = TIntrusivePtr<TSession>;
diff --git a/yt/yt/core/rpc/null_channel.cpp b/yt/yt/core/rpc/null_channel.cpp
index 46f0c82520..a0b391dd74 100644
--- a/yt/yt/core/rpc/null_channel.cpp
+++ b/yt/yt/core/rpc/null_channel.cpp
@@ -4,6 +4,7 @@
#include <yt/yt/core/ytree/helpers.h>
+#include <yt/yt/core/misc/memory_usage_tracker.h>
#include <yt/yt/core/misc/singleton.h>
namespace NYT::NRpc {
@@ -48,6 +49,11 @@ public:
return 0;
}
+ IMemoryUsageTrackerPtr GetChannelMemoryTracker() override
+ {
+ return GetNullMemoryUsageTracker();
+ }
+
private:
const TString Address_;
};
diff --git a/yt/yt/core/rpc/roaming_channel.cpp b/yt/yt/core/rpc/roaming_channel.cpp
index 2c2bccc733..af4e689061 100644
--- a/yt/yt/core/rpc/roaming_channel.cpp
+++ b/yt/yt/core/rpc/roaming_channel.cpp
@@ -183,6 +183,11 @@ public:
return 0;
}
+ IMemoryUsageTrackerPtr GetChannelMemoryTracker() override
+ {
+ return GetNullMemoryUsageTracker();
+ }
+
private:
const IRoamingChannelProviderPtr Provider_;
};
diff --git a/yt/yt/core/rpc/unittests/viable_peer_registry_ut.cpp b/yt/yt/core/rpc/unittests/viable_peer_registry_ut.cpp
index 20055b71e9..8096c20e08 100644
--- a/yt/yt/core/rpc/unittests/viable_peer_registry_ut.cpp
+++ b/yt/yt/core/rpc/unittests/viable_peer_registry_ut.cpp
@@ -129,6 +129,11 @@ public:
return 0;
}
+ IMemoryUsageTrackerPtr GetChannelMemoryTracker() override
+ {
+ return GetNullMemoryUsageTracker();
+ }
+
DEFINE_SIGNAL_OVERRIDE(void(const TError&), Terminated);
private:
TString Address_;
diff --git a/yt/yt/core/test_framework/test_proxy_service.cpp b/yt/yt/core/test_framework/test_proxy_service.cpp
index ab891501bc..8eb5b8da3d 100644
--- a/yt/yt/core/test_framework/test_proxy_service.cpp
+++ b/yt/yt/core/test_framework/test_proxy_service.cpp
@@ -167,6 +167,11 @@ int TTestChannel::GetInflightRequestCount()
return 0;
}
+IMemoryUsageTrackerPtr TTestChannel::GetChannelMemoryTracker()
+{
+ return GetNullMemoryUsageTracker();
+}
+
////////////////////////////////////////////////////////////////////////////////
TTestBus::TTestBus(TString address)
diff --git a/yt/yt/core/test_framework/test_proxy_service.h b/yt/yt/core/test_framework/test_proxy_service.h
index d1e710c559..389d1eb344 100644
--- a/yt/yt/core/test_framework/test_proxy_service.h
+++ b/yt/yt/core/test_framework/test_proxy_service.h
@@ -110,6 +110,8 @@ public:
int GetInflightRequestCount() override;
+ IMemoryUsageTrackerPtr GetChannelMemoryTracker() override;
+
void SubscribeTerminated(const TCallback<void(const TError&)>& callback) override;
void UnsubscribeTerminated(const TCallback<void(const TError&)>& callback) override;