diff options
author | don-dron <don-dron@yandex-team.com> | 2024-07-15 09:41:20 +0300 |
---|---|---|
committer | don-dron <don-dron@yandex-team.com> | 2024-07-15 10:19:12 +0300 |
commit | 853998ef91546e36757de9e51dd79bda129cbaa3 (patch) | |
tree | bee9c887dcdbea0dad89484da3e93c9e9f286cf1 | |
parent | 91ebedebadd578973f56f3575227bd29504770ff (diff) | |
download | ydb-853998ef91546e36757de9e51dd79bda129cbaa3.tar.gz |
YT-22233: Fix client memory tracking
1797ea8791ffda5095eab7d2002799c80e565f8c
-rw-r--r-- | yt/yt/core/rpc/bus/channel.cpp | 5 | ||||
-rw-r--r-- | yt/yt/core/rpc/channel.h | 2 | ||||
-rw-r--r-- | yt/yt/core/rpc/channel_detail.cpp | 5 | ||||
-rw-r--r-- | yt/yt/core/rpc/channel_detail.h | 2 | ||||
-rw-r--r-- | yt/yt/core/rpc/client-inl.h | 1 | ||||
-rw-r--r-- | yt/yt/core/rpc/client.cpp | 23 | ||||
-rw-r--r-- | yt/yt/core/rpc/client.h | 8 | ||||
-rw-r--r-- | yt/yt/core/rpc/grpc/channel.cpp | 5 | ||||
-rw-r--r-- | yt/yt/core/rpc/hedging_channel.cpp | 5 | ||||
-rw-r--r-- | yt/yt/core/rpc/http/channel.cpp | 5 | ||||
-rw-r--r-- | yt/yt/core/rpc/local_channel.cpp | 5 | ||||
-rw-r--r-- | yt/yt/core/rpc/null_channel.cpp | 6 | ||||
-rw-r--r-- | yt/yt/core/rpc/roaming_channel.cpp | 5 | ||||
-rw-r--r-- | yt/yt/core/rpc/unittests/viable_peer_registry_ut.cpp | 5 | ||||
-rw-r--r-- | yt/yt/core/test_framework/test_proxy_service.cpp | 5 | ||||
-rw-r--r-- | yt/yt/core/test_framework/test_proxy_service.h | 2 |
16 files changed, 78 insertions, 11 deletions
diff --git a/yt/yt/core/rpc/bus/channel.cpp b/yt/yt/core/rpc/bus/channel.cpp index 92ac4a8b9e..70809661c0 100644 --- a/yt/yt/core/rpc/bus/channel.cpp +++ b/yt/yt/core/rpc/bus/channel.cpp @@ -150,6 +150,11 @@ public: return requestCount; } + IMemoryUsageTrackerPtr GetChannelMemoryTracker() override + { + return MemoryUsageTracker_; + } + private: class TSession; using TSessionPtr = TIntrusivePtr<TSession>; diff --git a/yt/yt/core/rpc/channel.h b/yt/yt/core/rpc/channel.h index 8248a938d5..8affb32b47 100644 --- a/yt/yt/core/rpc/channel.h +++ b/yt/yt/core/rpc/channel.h @@ -123,6 +123,8 @@ struct IChannel DECLARE_INTERFACE_SIGNAL(void(const TError&), Terminated); virtual int GetInflightRequestCount() = 0; + + virtual IMemoryUsageTrackerPtr GetChannelMemoryTracker() = 0; }; DEFINE_REFCOUNTED_TYPE(IChannel) diff --git a/yt/yt/core/rpc/channel_detail.cpp b/yt/yt/core/rpc/channel_detail.cpp index 181a779f48..fd4233826f 100644 --- a/yt/yt/core/rpc/channel_detail.cpp +++ b/yt/yt/core/rpc/channel_detail.cpp @@ -62,6 +62,11 @@ int TChannelWrapper::GetInflightRequestCount() return UnderlyingChannel_->GetInflightRequestCount(); } +IMemoryUsageTrackerPtr TChannelWrapper::GetChannelMemoryTracker() +{ + return UnderlyingChannel_->GetChannelMemoryTracker(); +} + //////////////////////////////////////////////////////////////////////////////// void TClientRequestControlThunk::SetUnderlying(IClientRequestControlPtr underlying) diff --git a/yt/yt/core/rpc/channel_detail.h b/yt/yt/core/rpc/channel_detail.h index a3ca87a5d4..e01579f361 100644 --- a/yt/yt/core/rpc/channel_detail.h +++ b/yt/yt/core/rpc/channel_detail.h @@ -29,6 +29,8 @@ public: int GetInflightRequestCount() override; + IMemoryUsageTrackerPtr GetChannelMemoryTracker() override; + protected: const IChannelPtr UnderlyingChannel_; }; diff --git a/yt/yt/core/rpc/client-inl.h b/yt/yt/core/rpc/client-inl.h index c57f63d1bd..fb844d4a23 100644 --- a/yt/yt/core/rpc/client-inl.h +++ b/yt/yt/core/rpc/client-inl.h @@ -149,6 +149,7 @@ TIntrusivePtr<T> TProxyBase::CreateRequest(const TMethodDescriptor& methodDescri request->SetAcknowledgementTimeout(DefaultAcknowledgementTimeout_); request->SetRequestCodec(DefaultRequestCodec_); request->SetResponseCodec(DefaultResponseCodec_); + request->SetMemoryUsageTracker(DefaultMemoryUsageTracker_); request->SetEnableLegacyRpcCodecs(DefaultEnableLegacyRpcCodecs_); request->SetMultiplexingBand(methodDescriptor.MultiplexingBand); diff --git a/yt/yt/core/rpc/client.cpp b/yt/yt/core/rpc/client.cpp index cbeef69214..acd67934f3 100644 --- a/yt/yt/core/rpc/client.cpp +++ b/yt/yt/core/rpc/client.cpp @@ -40,7 +40,8 @@ TClientContext::TClientContext( TFeatureIdFormatter featureIdFormatter, bool responseIsHeavy, TAttachmentsOutputStreamPtr requestAttachmentsStream, - TAttachmentsInputStreamPtr responseAttachmentsStream) + TAttachmentsInputStreamPtr responseAttachmentsStream, + IMemoryUsageTrackerPtr memoryUsageTracker) : RequestId_(requestId) , TraceContext_(std::move(traceContext)) , Service_(std::move(service)) @@ -49,6 +50,7 @@ TClientContext::TClientContext( , ResponseHeavy_(responseIsHeavy) , RequestAttachmentsStream_(std::move(requestAttachmentsStream)) , ResponseAttachmentsStream_(std::move(responseAttachmentsStream)) + , MemoryUsageTracker_(std::move(memoryUsageTracker)) { } //////////////////////////////////////////////////////////////////////////////// @@ -81,6 +83,7 @@ TClientRequest::TClientRequest(const TClientRequest& other) , RequestCodec_(other.RequestCodec_) , ResponseCodec_(other.ResponseCodec_) , GenerateAttachmentChecksums_(other.GenerateAttachmentChecksums_) + , MemoryUsageTracker_(other.MemoryUsageTracker_) , Channel_(other.Channel_) , StreamingEnabled_(other.StreamingEnabled_) , SendBaggage_(other.SendBaggage_) @@ -347,7 +350,8 @@ TClientContextPtr TClientRequest::CreateClientContext() FeatureIdFormatter_, ResponseHeavy_, RequestAttachmentsStream_, - ResponseAttachmentsStream_); + ResponseAttachmentsStream_, + MemoryUsageTracker_ ? MemoryUsageTracker_ : Channel_->GetChannelMemoryTracker()); } void TClientRequest::OnPullRequestAttachmentsStream() @@ -616,18 +620,17 @@ void TClientResponse::Deserialize(TSharedRefArray responseMessage) } auto compressedAttachments = MakeRange(ResponseMessage_.Begin() + 2, ResponseMessage_.End()); + auto memoryUsageTracker = ClientContext_->GetMemoryUsageTracker(); + if (attachmentCodecId == NCompression::ECodec::None) { - Attachments_.clear(); - Attachments_.reserve(compressedAttachments.Size()); - for (auto& attachment : compressedAttachments) { - struct TCopiedAttachmentTag - { }; - auto copiedAttachment = TSharedMutableRef::MakeCopy<TCopiedAttachmentTag>(attachment); - Attachments_.push_back(std::move(copiedAttachment)); - } + Attachments_ = compressedAttachments.ToVector(); } else { Attachments_ = DecompressAttachments(compressedAttachments, attachmentCodecId); } + + for (auto& attachment : Attachments_) { + attachment = TrackMemory(memoryUsageTracker, attachment); + } } void TClientResponse::HandleAcknowledgement() diff --git a/yt/yt/core/rpc/client.h b/yt/yt/core/rpc/client.h index 96e5c27db7..c67980d0d5 100644 --- a/yt/yt/core/rpc/client.h +++ b/yt/yt/core/rpc/client.h @@ -12,6 +12,8 @@ #include <yt/yt/core/misc/property.h> #include <yt/yt/core/misc/protobuf_helpers.h> +#include <yt/yt/core/misc/memory_usage_tracker.h> + #include <yt/yt/core/rpc/helpers.h> #include <yt/yt/core/rpc/message.h> #include <yt/yt_proto/yt/core/rpc/proto/rpc.pb.h> @@ -104,6 +106,7 @@ public: DEFINE_BYVAL_RO_PROPERTY(bool, ResponseHeavy); DEFINE_BYVAL_RO_PROPERTY(TAttachmentsOutputStreamPtr, RequestAttachmentsStream); DEFINE_BYVAL_RO_PROPERTY(TAttachmentsInputStreamPtr, ResponseAttachmentsStream); + DEFINE_BYVAL_RO_PROPERTY(IMemoryUsageTrackerPtr, MemoryUsageTracker); public: TClientContext( @@ -114,7 +117,8 @@ public: TFeatureIdFormatter featureIdFormatter, bool heavy, TAttachmentsOutputStreamPtr requestAttachmentsStream, - TAttachmentsInputStreamPtr responseAttachmentsStream); + TAttachmentsInputStreamPtr responseAttachmentsStream, + IMemoryUsageTrackerPtr memoryUsageTracker); }; DEFINE_REFCOUNTED_TYPE(TClientContext) @@ -137,6 +141,7 @@ public: DEFINE_BYVAL_RW_PROPERTY(NCompression::ECodec, ResponseCodec, NCompression::ECodec::None); DEFINE_BYVAL_RW_PROPERTY(bool, EnableLegacyRpcCodecs, true); DEFINE_BYVAL_RW_PROPERTY(bool, GenerateAttachmentChecksums, true); + DEFINE_BYVAL_RW_PROPERTY(IMemoryUsageTrackerPtr, MemoryUsageTracker); // Field is used on client side only. So it is never serialized. DEFINE_BYREF_RW_PROPERTY(NTracing::TTraceContext::TTagList, TracingTags); // For testing purposes only. @@ -467,6 +472,7 @@ public: DEFINE_BYVAL_RW_PROPERTY(std::optional<TDuration>, DefaultAcknowledgementTimeout); DEFINE_BYVAL_RW_PROPERTY(NCompression::ECodec, DefaultRequestCodec, NCompression::ECodec::None); DEFINE_BYVAL_RW_PROPERTY(NCompression::ECodec, DefaultResponseCodec, NCompression::ECodec::None); + DEFINE_BYVAL_RW_PROPERTY(IMemoryUsageTrackerPtr, DefaultMemoryUsageTracker); DEFINE_BYVAL_RW_PROPERTY(bool, DefaultEnableLegacyRpcCodecs, true); DEFINE_BYREF_RW_PROPERTY(TStreamingParameters, DefaultClientAttachmentsStreamingParameters); diff --git a/yt/yt/core/rpc/grpc/channel.cpp b/yt/yt/core/rpc/grpc/channel.cpp index f20763f549..0c0ed1fc88 100644 --- a/yt/yt/core/rpc/grpc/channel.cpp +++ b/yt/yt/core/rpc/grpc/channel.cpp @@ -216,6 +216,11 @@ public: YT_UNIMPLEMENTED(); } + IMemoryUsageTrackerPtr GetChannelMemoryTracker() override + { + return GetNullMemoryUsageTracker(); + } + private: const TChannelConfigPtr Config_; const TString EndpointAddress_; diff --git a/yt/yt/core/rpc/hedging_channel.cpp b/yt/yt/core/rpc/hedging_channel.cpp index f5fb0dcbae..b24e797b4e 100644 --- a/yt/yt/core/rpc/hedging_channel.cpp +++ b/yt/yt/core/rpc/hedging_channel.cpp @@ -392,6 +392,11 @@ public: YT_UNIMPLEMENTED(); } + IMemoryUsageTrackerPtr GetChannelMemoryTracker() override + { + return PrimaryChannel_->GetChannelMemoryTracker(); + } + private: const IChannelPtr PrimaryChannel_; const IChannelPtr BackupChannel_; diff --git a/yt/yt/core/rpc/http/channel.cpp b/yt/yt/core/rpc/http/channel.cpp index ebbebf2254..fc58a04b76 100644 --- a/yt/yt/core/rpc/http/channel.cpp +++ b/yt/yt/core/rpc/http/channel.cpp @@ -121,6 +121,11 @@ public: YT_UNIMPLEMENTED(); } + IMemoryUsageTrackerPtr GetChannelMemoryTracker() override + { + return GetNullMemoryUsageTracker(); + } + private: IClientPtr Client_; std::optional<TDuration> ClientTimeout_; diff --git a/yt/yt/core/rpc/local_channel.cpp b/yt/yt/core/rpc/local_channel.cpp index ccfafd9655..6c83590794 100644 --- a/yt/yt/core/rpc/local_channel.cpp +++ b/yt/yt/core/rpc/local_channel.cpp @@ -133,6 +133,11 @@ public: return 0; } + IMemoryUsageTrackerPtr GetChannelMemoryTracker() override + { + return GetNullMemoryUsageTracker(); + } + private: class TSession; using TSessionPtr = TIntrusivePtr<TSession>; diff --git a/yt/yt/core/rpc/null_channel.cpp b/yt/yt/core/rpc/null_channel.cpp index 46f0c82520..a0b391dd74 100644 --- a/yt/yt/core/rpc/null_channel.cpp +++ b/yt/yt/core/rpc/null_channel.cpp @@ -4,6 +4,7 @@ #include <yt/yt/core/ytree/helpers.h> +#include <yt/yt/core/misc/memory_usage_tracker.h> #include <yt/yt/core/misc/singleton.h> namespace NYT::NRpc { @@ -48,6 +49,11 @@ public: return 0; } + IMemoryUsageTrackerPtr GetChannelMemoryTracker() override + { + return GetNullMemoryUsageTracker(); + } + private: const TString Address_; }; diff --git a/yt/yt/core/rpc/roaming_channel.cpp b/yt/yt/core/rpc/roaming_channel.cpp index 2c2bccc733..af4e689061 100644 --- a/yt/yt/core/rpc/roaming_channel.cpp +++ b/yt/yt/core/rpc/roaming_channel.cpp @@ -183,6 +183,11 @@ public: return 0; } + IMemoryUsageTrackerPtr GetChannelMemoryTracker() override + { + return GetNullMemoryUsageTracker(); + } + private: const IRoamingChannelProviderPtr Provider_; }; diff --git a/yt/yt/core/rpc/unittests/viable_peer_registry_ut.cpp b/yt/yt/core/rpc/unittests/viable_peer_registry_ut.cpp index 20055b71e9..8096c20e08 100644 --- a/yt/yt/core/rpc/unittests/viable_peer_registry_ut.cpp +++ b/yt/yt/core/rpc/unittests/viable_peer_registry_ut.cpp @@ -129,6 +129,11 @@ public: return 0; } + IMemoryUsageTrackerPtr GetChannelMemoryTracker() override + { + return GetNullMemoryUsageTracker(); + } + DEFINE_SIGNAL_OVERRIDE(void(const TError&), Terminated); private: TString Address_; diff --git a/yt/yt/core/test_framework/test_proxy_service.cpp b/yt/yt/core/test_framework/test_proxy_service.cpp index ab891501bc..8eb5b8da3d 100644 --- a/yt/yt/core/test_framework/test_proxy_service.cpp +++ b/yt/yt/core/test_framework/test_proxy_service.cpp @@ -167,6 +167,11 @@ int TTestChannel::GetInflightRequestCount() return 0; } +IMemoryUsageTrackerPtr TTestChannel::GetChannelMemoryTracker() +{ + return GetNullMemoryUsageTracker(); +} + //////////////////////////////////////////////////////////////////////////////// TTestBus::TTestBus(TString address) diff --git a/yt/yt/core/test_framework/test_proxy_service.h b/yt/yt/core/test_framework/test_proxy_service.h index d1e710c559..389d1eb344 100644 --- a/yt/yt/core/test_framework/test_proxy_service.h +++ b/yt/yt/core/test_framework/test_proxy_service.h @@ -110,6 +110,8 @@ public: int GetInflightRequestCount() override; + IMemoryUsageTrackerPtr GetChannelMemoryTracker() override; + void SubscribeTerminated(const TCallback<void(const TError&)>& callback) override; void UnsubscribeTerminated(const TCallback<void(const TError&)>& callback) override; |