diff options
author | don-dron <don-dron@yandex-team.com> | 2024-04-12 00:30:10 +0300 |
---|---|---|
committer | don-dron <don-dron@yandex-team.com> | 2024-04-12 00:38:07 +0300 |
commit | 384788280f0a875070dc2590512b7eea09c0ac03 (patch) | |
tree | adc67bb617d3d988a501718d7336ae1d551f3a14 | |
parent | 00ff372db04018582b653a028eb3647b40544256 (diff) | |
download | ydb-384788280f0a875070dc2590512b7eea09c0ac03.tar.gz |
YT-21534: Modify RPC memory tracking
76e19fad047a3929e625cf7c8411551b5eedb174
-rw-r--r-- | yt/yt/core/rpc/message.cpp | 5 | ||||
-rw-r--r-- | yt/yt/core/rpc/message.h | 1 | ||||
-rw-r--r-- | yt/yt/core/rpc/server_detail.cpp | 32 | ||||
-rw-r--r-- | yt/yt/core/rpc/server_detail.h | 25 | ||||
-rw-r--r-- | yt/yt/core/rpc/service.h | 6 | ||||
-rw-r--r-- | yt/yt/core/rpc/service_detail.cpp | 52 | ||||
-rw-r--r-- | yt/yt/core/rpc/service_detail.h | 33 | ||||
-rw-r--r-- | yt/yt/core/rpc/unittests/bin/main.cpp | 2 | ||||
-rw-r--r-- | yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp | 16 | ||||
-rw-r--r-- | yt/yt/core/rpc/unittests/lib/common.cpp | 25 | ||||
-rw-r--r-- | yt/yt/core/rpc/unittests/lib/common.h | 162 | ||||
-rw-r--r-- | yt/yt/core/rpc/unittests/lib/test_service.cpp | 13 | ||||
-rw-r--r-- | yt/yt/core/rpc/unittests/lib/test_service.h | 3 | ||||
-rw-r--r-- | yt/yt/core/rpc/unittests/mock/service.h | 14 | ||||
-rw-r--r-- | yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp | 4 | ||||
-rw-r--r-- | yt/yt/core/rpc/unittests/rpc_ut.cpp | 160 | ||||
-rw-r--r-- | yt/yt/core/ytree/ypath_detail.cpp | 4 |
17 files changed, 439 insertions, 118 deletions
diff --git a/yt/yt/core/rpc/message.cpp b/yt/yt/core/rpc/message.cpp index 6a5b6b7b2b..5df17b9513 100644 --- a/yt/yt/core/rpc/message.cpp +++ b/yt/yt/core/rpc/message.cpp @@ -385,6 +385,11 @@ bool ParseStreamingFeedbackHeader( //////////////////////////////////////////////////////////////////////////////// +i64 GetMessageHeaderSize(const TSharedRefArray& message) +{ + return message.Size() >= 1 ? static_cast<i64>(message[0].Size()) : 0; +} + i64 GetMessageBodySize(const TSharedRefArray& message) { return message.Size() >= 2 ? static_cast<i64>(message[1].Size()) : 0; diff --git a/yt/yt/core/rpc/message.h b/yt/yt/core/rpc/message.h index 1b573dc576..4f8baa031a 100644 --- a/yt/yt/core/rpc/message.h +++ b/yt/yt/core/rpc/message.h @@ -105,6 +105,7 @@ bool ParseStreamingFeedbackHeader( const TSharedRefArray& message, NProto::TStreamingFeedbackHeader* header); +i64 GetMessageHeaderSize(const TSharedRefArray& message); i64 GetMessageBodySize(const TSharedRefArray& message); int GetMessageAttachmentCount(const TSharedRefArray& message); i64 GetTotalMessageAttachmentSize(const TSharedRefArray& message); diff --git a/yt/yt/core/rpc/server_detail.cpp b/yt/yt/core/rpc/server_detail.cpp index 018fe746cb..bf8886478b 100644 --- a/yt/yt/core/rpc/server_detail.cpp +++ b/yt/yt/core/rpc/server_detail.cpp @@ -27,10 +27,14 @@ using NYT::FromProto; TServiceContextBase::TServiceContextBase( std::unique_ptr<TRequestHeader> header, TSharedRefArray requestMessage, + TMemoryUsageTrackerGuard memoryGuard, + IMemoryUsageTrackerPtr memoryUsageTracker, NLogging::TLogger logger, NLogging::ELogLevel logLevel) : RequestHeader_(std::move(header)) , RequestMessage_(std::move(requestMessage)) + , RequestMemoryGuard_(std::move(memoryGuard)) + , MemoryUsageTracker_(std::move(memoryUsageTracker)) , Logger(std::move(logger)) , LogLevel_(logLevel) { @@ -39,10 +43,14 @@ TServiceContextBase::TServiceContextBase( TServiceContextBase::TServiceContextBase( TSharedRefArray requestMessage, + TMemoryUsageTrackerGuard memoryGuard, + IMemoryUsageTrackerPtr memoryUsageTracker, NLogging::TLogger logger, NLogging::ELogLevel logLevel) : RequestHeader_(new TRequestHeader()) , RequestMessage_(std::move(requestMessage)) + , RequestMemoryGuard_(std::move(memoryGuard)) + , MemoryUsageTracker_(std::move(memoryUsageTracker)) , Logger(std::move(logger)) , LogLevel_(logLevel) { @@ -67,6 +75,10 @@ void TServiceContextBase::Initialize() RequestAttachments_ = std::vector<TSharedRef>( RequestMessage_.Begin() + 2, RequestMessage_.End()); + TotalSize_ = TypicalRequestSize + + GetMessageHeaderSize(RequestMessage_) + + GetMessageBodySize(RequestMessage_) + + GetTotalMessageAttachmentSize(RequestMessage_); } void TServiceContextBase::Reply(const TError& error) @@ -299,6 +311,11 @@ TRequestId TServiceContextBase::GetRequestId() const return RequestId_; } +i64 TServiceContextBase::GetTotalSize() const +{ + return TotalSize_; +} + TBusNetworkStatistics TServiceContextBase::GetBusNetworkStatistics() const { return {}; @@ -450,6 +467,11 @@ void TServiceContextBase::SetRawResponseInfo(TString info, bool incremental) } } +const IMemoryUsageTrackerPtr& TServiceContextBase::GetMemoryUsageTracker() const +{ + return MemoryUsageTracker_; +} + const NLogging::TLogger& TServiceContextBase::GetLogger() const { return Logger; @@ -591,6 +613,11 @@ TRealmId TServiceContextWrapper::GetRealmId() const return UnderlyingContext_->GetRealmId(); } +i64 TServiceContextWrapper::GetTotalSize() const +{ + return UnderlyingContext_->GetTotalSize(); +} + const TAuthenticationIdentity& TServiceContextWrapper::GetAuthenticationIdentity() const { return UnderlyingContext_->GetAuthenticationIdentity(); @@ -724,6 +751,11 @@ void TServiceContextWrapper::SetRawResponseInfo(TString info, bool incremental) UnderlyingContext_->SetRawResponseInfo(std::move(info), incremental); } +const IMemoryUsageTrackerPtr& TServiceContextWrapper::GetMemoryUsageTracker() const +{ + return UnderlyingContext_->GetMemoryUsageTracker(); +} + const NLogging::TLogger& TServiceContextWrapper::GetLogger() const { return UnderlyingContext_->GetLogger(); diff --git a/yt/yt/core/rpc/server_detail.h b/yt/yt/core/rpc/server_detail.h index aceba1f14e..64e076d1d5 100644 --- a/yt/yt/core/rpc/server_detail.h +++ b/yt/yt/core/rpc/server_detail.h @@ -7,6 +7,8 @@ #include <yt/yt/core/logging/log.h> +#include <yt/yt/core/misc/memory_usage_tracker.h> + #include <yt/yt_proto/yt/core/rpc/proto/rpc.pb.h> #include <library/cpp/yt/threading/rw_spin_lock.h> @@ -16,6 +18,11 @@ namespace NYT::NRpc { //////////////////////////////////////////////////////////////////////////////// +// Magic constant! This is the lower limit of the memory allocated for the request. +constexpr i64 TypicalRequestSize = 4_KB; + +//////////////////////////////////////////////////////////////////////////////// + //! \note Thread affinity: single-threaded (unless noted otherwise) class TServiceContextBase : public virtual IServiceContext @@ -29,6 +36,8 @@ public: const NYTree::IAttributeDictionary& GetEndpointAttributes() const override; const TString& GetEndpointDescription() const override; + i64 GetTotalSize() const override; + std::optional<TInstant> GetStartTime() const override; std::optional<TDuration> GetTimeout() const override; TInstant GetArriveInstant() const override; @@ -94,6 +103,8 @@ public: void SuppressMissingRequestInfoCheck() override; void SetRawResponseInfo(TString info, bool incremental) override; + const IMemoryUsageTrackerPtr& GetMemoryUsageTracker() const override; + const NLogging::TLogger& GetLogger() const override; NLogging::ELogLevel GetLogLevel() const override; @@ -108,6 +119,10 @@ public: protected: std::unique_ptr<NProto::TRequestHeader> RequestHeader_; TSharedRefArray RequestMessage_; + + TMemoryUsageTrackerGuard RequestMemoryGuard_; + const IMemoryUsageTrackerPtr MemoryUsageTracker_; + const NLogging::TLogger Logger; const NLogging::ELogLevel LogLevel_; @@ -124,6 +139,8 @@ protected: std::atomic<bool> Replied_ = false; TError Error_; + i64 TotalSize_ = 0; + TSharedRef ResponseBody_; std::vector<TSharedRef> ResponseAttachments_; @@ -140,10 +157,14 @@ protected: TServiceContextBase( std::unique_ptr<NProto::TRequestHeader> header, TSharedRefArray requestMessage, + TMemoryUsageTrackerGuard memoryGuard, + IMemoryUsageTrackerPtr memoryUsageTracker, NLogging::TLogger logger, NLogging::ELogLevel logLevel); TServiceContextBase( TSharedRefArray requestMessage, + TMemoryUsageTrackerGuard memoryGuard, + IMemoryUsageTrackerPtr memoryUsageTracker, NLogging::TLogger logger, NLogging::ELogLevel logLevel); @@ -199,6 +220,8 @@ public: TRealmId GetRealmId() const override; const TAuthenticationIdentity& GetAuthenticationIdentity() const override; + i64 GetTotalSize() const override; + bool IsReplied() const override; void Reply(const TError& error) override; void Reply(const TSharedRefArray& responseMessage) override; @@ -241,6 +264,8 @@ public: void SuppressMissingRequestInfoCheck() override; void SetRawResponseInfo(TString info, bool incremental) override; + const IMemoryUsageTrackerPtr& GetMemoryUsageTracker() const override; + const NLogging::TLogger& GetLogger() const override; NLogging::ELogLevel GetLogLevel() const override; diff --git a/yt/yt/core/rpc/service.h b/yt/yt/core/rpc/service.h index 1c2e721730..b92aabac5d 100644 --- a/yt/yt/core/rpc/service.h +++ b/yt/yt/core/rpc/service.h @@ -216,6 +216,9 @@ struct IServiceContext */ virtual void SetRawResponseInfo(TString info, bool incremental) = 0; + //! Returns the memory usage tracker for request/response messages. + virtual const IMemoryUsageTrackerPtr& GetMemoryUsageTracker() const = 0; + //! Returns the logger for request/response messages. virtual const NLogging::TLogger& GetLogger() const = 0; @@ -236,6 +239,9 @@ struct IServiceContext virtual bool IsResponseBodySerializedWithCompression() const = 0; virtual void SetResponseBodySerializedWithCompression() = 0; + //! Return total size of request. Sum of Header, Body, Attachments, TServiceContext, parsed RequestHeaderProto and TraceContext. + virtual i64 GetTotalSize() const = 0; + // Extension methods. void SetRequestInfo(); diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp index 7440d9c9c9..23c7284dab 100644 --- a/yt/yt/core/rpc/service_detail.cpp +++ b/yt/yt/core/rpc/service_detail.cpp @@ -345,6 +345,8 @@ public: : TServiceContextBase( std::move(acceptedRequest.Header), std::move(acceptedRequest.Message), + std::move(acceptedRequest.MemoryGuard), + std::move(acceptedRequest.MemoryUsageTracker), std::move(logger), acceptedRequest.RuntimeInfo->LogLevel.load(std::memory_order::relaxed)) , Service_(std::move(service)) @@ -722,7 +724,6 @@ private: TAttachmentsInputStreamPtr RequestAttachmentsStream_; TAttachmentsOutputStreamPtr ResponseAttachmentsStream_; - bool IsRegistrable() { if (Cancelable_) { @@ -1077,9 +1078,7 @@ private: } if (RequestRun_) { - auto requestTotalSize = GetMessageBodySize(RequestMessage_) + - GetTotalMessageAttachmentSize(RequestMessage_); - RequestQueue_->OnRequestFinished(requestTotalSize); + RequestQueue_->OnRequestFinished(TotalSize_); } if (ActiveRequestCountIncremented_) { @@ -1432,7 +1431,7 @@ void TRequestQueue::OnRequestArrived(const TServiceBase::TServiceContextPtr& con } // Slow path. - DecrementConcurrency(GetTotalRequestSize(context)); + DecrementConcurrency(context->GetTotalSize()); IncrementQueueSize(context); context->BeforeEnqueued(); @@ -1526,28 +1525,22 @@ void TRequestQueue::RunRequest(TServiceBase::TServiceContextPtr context) void TRequestQueue::IncrementQueueSize(const TServiceBase::TServiceContextPtr& context) { ++QueueSize_; - QueueByteSize_.fetch_add(GetTotalRequestSize(context)); + QueueByteSize_.fetch_add(context->GetTotalSize()); } void TRequestQueue::DecrementQueueSize(const TServiceBase::TServiceContextPtr& context) { auto newQueueSize = --QueueSize_; - auto oldQueueByteSize = QueueByteSize_.fetch_sub(GetTotalRequestSize(context)); + auto oldQueueByteSize = QueueByteSize_.fetch_sub(context->GetTotalSize()); YT_ASSERT(newQueueSize >= 0); YT_ASSERT(oldQueueByteSize >= 0); } -i64 TRequestQueue::GetTotalRequestSize(const TServiceBase::TServiceContextPtr& context) -{ - return GetMessageBodySize(context->GetRequestMessage()) + - GetTotalMessageAttachmentSize(context->GetRequestMessage()); -} - bool TRequestQueue::IncrementConcurrency(const TServiceBase::TServiceContextPtr& context) { auto resultSize = ++Concurrency_ <= RuntimeInfo_->ConcurrencyLimit.GetDynamicLimit(); - auto resultByteSize = ConcurrencyByte_.fetch_add(GetTotalRequestSize(context)) <= RuntimeInfo_->ConcurrencyByteLimit.GetDynamicByteLimit(); + auto resultByteSize = ConcurrencyByte_.fetch_add(context->GetTotalSize()) <= RuntimeInfo_->ConcurrencyByteLimit.GetDynamicByteLimit(); return resultSize && resultByteSize; } @@ -1572,7 +1565,7 @@ void TRequestQueue::AcquireThrottlers(const TServiceBase::TServiceContextPtr& co { if (BytesThrottler_.Specified.load(std::memory_order::acquire)) { // Slow path. - auto requestSize = GetTotalRequestSize(context); + auto requestSize = context->GetTotalSize(); BytesThrottler_.Throttler->Acquire(requestSize); } if (WeightThrottler_.Specified.load(std::memory_order::acquire)) { @@ -1637,11 +1630,28 @@ TServiceBase::TServiceBase( const NLogging::TLogger& logger, TRealmId realmId, IAuthenticatorPtr authenticator) + : TServiceBase( + std::move(defaultInvoker), + descriptor, + GetNullMemoryUsageTracker(), + logger, + realmId, + authenticator) +{ } + +TServiceBase::TServiceBase( + IInvokerPtr defaultInvoker, + const TServiceDescriptor& descriptor, + IMemoryUsageTrackerPtr memoryUsageTracker, + const NLogging::TLogger& logger, + TRealmId realmId, + IAuthenticatorPtr authenticator) : Logger(logger) , DefaultInvoker_(std::move(defaultInvoker)) , Authenticator_(std::move(authenticator)) , ServiceDescriptor_(descriptor) , ServiceId_(descriptor.FullServiceName, realmId) + , MemoryUsageTracker_(std::move(memoryUsageTracker)) , Profiler_(RpcServerProfiler.WithHot().WithTag("yt_service", TString(ServiceId_.ServiceName))) , AuthenticationTimer_(Profiler_.Timer("/authentication_time")) , ServiceLivenessChecker_(New<TPeriodicExecutor>( @@ -1701,6 +1711,14 @@ void TServiceBase::HandleRequest( return; } + auto memoryGuard = TMemoryUsageTrackerGuard::Acquire(MemoryUsageTracker_, TypicalRequestSize); + message = TrackMemory(MemoryUsageTracker_, std::move(message)); + if (MemoryUsageTracker_ && MemoryUsageTracker_->IsExceeded()) { + return replyError(TError( + NRpc::EErrorCode::MemoryOverflow, + "Request is dropped due to high memory pressure")); + } + auto tracingMode = runtimeInfo->TracingMode.load(std::memory_order::relaxed); auto traceContext = tracingMode == ERequestTracingMode::Disable ? NTracing::TTraceContextPtr() @@ -1747,7 +1765,9 @@ void TServiceBase::HandleRequest( std::move(header), std::move(message), requestQueue, - maybeThrottled + maybeThrottled, + std::move(memoryGuard), + MemoryUsageTracker_, }; if (!IsAuthenticationNeeded(acceptedRequest)) { diff --git a/yt/yt/core/rpc/service_detail.h b/yt/yt/core/rpc/service_detail.h index e21e9d59c2..2c2f7f3b1c 100644 --- a/yt/yt/core/rpc/service_detail.h +++ b/yt/yt/core/rpc/service_detail.h @@ -20,6 +20,7 @@ #include <yt/yt/core/misc/object_pool.h> #include <yt/yt/core/misc/protobuf_helpers.h> #include <yt/yt/core/misc/ring_queue.h> +#include <yt/yt/core/misc/memory_usage_tracker.h> #include <yt/yt/core/profiling/timing.h> @@ -195,6 +196,7 @@ public: } Request_->Context_ = underlyingContext.Get(); + auto tracker = Request_->Context_->GetMemoryUsageTracker(); const auto& requestHeader = this->GetRequestHeader(); // COMPAT(danilalexeev): legacy RPC codecs @@ -226,12 +228,12 @@ public: formatOptionsYson = NYson::TYsonString(requestHeader.request_format_options()); } if (format != EMessageFormat::Protobuf) { - body = ConvertMessageFromFormat( + body = TrackMemory(tracker, ConvertMessageFromFormat( body, format, NYson::ReflectProtobufMessageType<TRequestMessage>(), formatOptionsYson, - !bodyCodecId.has_value()); + !bodyCodecId.has_value())); } } @@ -247,9 +249,19 @@ public: std::vector<TSharedRef> requestAttachments; try { - requestAttachments = DecompressAttachments( - underlyingContext->RequestAttachments(), - attachmentCodecId); + if (attachmentCodecId == NCompression::ECodec::None) { + requestAttachments = underlyingContext->RequestAttachments(); + } else { + requestAttachments = DecompressAttachments( + underlyingContext->RequestAttachments(), + attachmentCodecId); + + // For decompressed blocks, memory tracking must be used again, + // since they are allocated in a new allocation. + for (auto& attachment : requestAttachments) { + attachment = TrackMemory(tracker, attachment); + } + } } catch (const std::exception& ex) { underlyingContext->Reply(TError( NRpc::EErrorCode::ProtocolError, @@ -815,6 +827,14 @@ protected: TRealmId realmId = NullRealmId, IAuthenticatorPtr authenticator = nullptr); + TServiceBase( + IInvokerPtr defaultInvoker, + const TServiceDescriptor& descriptor, + IMemoryUsageTrackerPtr memoryUsageTracker, + const NLogging::TLogger& logger, + TRealmId realmId = NullRealmId, + IAuthenticatorPtr authenticator = nullptr); + //! Registers a method handler. //! This call is must be performed prior to service registration. virtual TRuntimeMethodInfoPtr RegisterMethod(const TMethodDescriptor& descriptor); @@ -894,6 +914,7 @@ private: const IAuthenticatorPtr Authenticator_; const TServiceDescriptor ServiceDescriptor_; const TServiceId ServiceId_; + const IMemoryUsageTrackerPtr MemoryUsageTracker_; const NProfiling::TProfiler Profiler_; @@ -977,6 +998,8 @@ private: TSharedRefArray Message; TRequestQueue* RequestQueue; std::optional<TError> ThrottledError; + TMemoryUsageTrackerGuard MemoryGuard; + IMemoryUsageTrackerPtr MemoryUsageTracker; }; void DoDeclareServerFeature(int featureId); diff --git a/yt/yt/core/rpc/unittests/bin/main.cpp b/yt/yt/core/rpc/unittests/bin/main.cpp index 2b3e4f2c9e..a4c90bb160 100644 --- a/yt/yt/core/rpc/unittests/bin/main.cpp +++ b/yt/yt/core/rpc/unittests/bin/main.cpp @@ -28,7 +28,7 @@ int main(int argc, char* argv[]) auto server = CreateBusServer(busServer); auto workerPool = CreateThreadPool(4, "Worker"); - auto service = CreateTestService(workerPool->GetInvoker(), false, /*createChannel*/ {}); + auto service = CreateTestService(workerPool->GetInvoker(), false, /*createChannel*/ {}, GetNullMemoryUsageTracker()); server->RegisterService(service); server->Start(); diff --git a/yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp b/yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp index fd9165b570..23196016bc 100644 --- a/yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp +++ b/yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp @@ -16,9 +16,13 @@ class THandleChannelFailureTestBase : public ::testing::Test { public: - IServerPtr CreateServer(const TTestServerHost& serverHost, IMemoryUsageTrackerPtr memoryUsageTracker) + IServerPtr CreateServer( + const TTestServerHost& serverHost, + IMemoryUsageTrackerPtr memoryUsageTracker) { - return TImpl::CreateServer(serverHost.GetPort(), memoryUsageTracker); + return TImpl::CreateServer( + serverHost.GetPort(), + memoryUsageTracker); } IChannelPtr CreateChannel(const TString& address) @@ -50,7 +54,9 @@ TYPED_TEST(THandleChannelFailureTest, HandleChannelFailureTest) auto workerPool = NConcurrency::CreateThreadPool(4, "Worker"); outerServer.InitializeServer( - this->CreateServer(outerServer, New<TTestNodeMemoryTracker>(32_MB)), + this->CreateServer( + outerServer, + outerServer.GetMemoryUsageTracker()), workerPool->GetInvoker(), /*secure*/ false, BIND([&] (const TString& address) { @@ -58,7 +64,9 @@ TYPED_TEST(THandleChannelFailureTest, HandleChannelFailureTest) })); innerServer.InitializeServer( - this->CreateServer(innerServer, New<TTestNodeMemoryTracker>(32_MB)), + this->CreateServer( + innerServer, + innerServer.GetMemoryUsageTracker()), workerPool->GetInvoker(), /*secure*/ false, /*createChannel*/ {}); diff --git a/yt/yt/core/rpc/unittests/lib/common.cpp b/yt/yt/core/rpc/unittests/lib/common.cpp index 089d3b90cd..a1f72f5c07 100644 --- a/yt/yt/core/rpc/unittests/lib/common.cpp +++ b/yt/yt/core/rpc/unittests/lib/common.cpp @@ -108,11 +108,28 @@ i64 TTestNodeMemoryTracker::GetTotalUsage() const return TotalUsage_; } -TSharedRef TTestNodeMemoryTracker::Track( - TSharedRef reference, - bool /*keepHolder*/) +TSharedRef TTestNodeMemoryTracker::Track(TSharedRef reference, bool keepExistingTracking) { - return reference; + if (!reference) { + return reference; + } + + auto rawReference = TRef(reference); + const auto& holder = reference.GetHolder(); + + // Reference could be without a holder, e.g. empty reference. + if (!holder) { + YT_VERIFY(reference.Begin() == TRef::MakeEmpty().Begin()); + return reference; + } + + auto guard = TMemoryUsageTrackerGuard::Acquire(this, reference.Size()); + + auto underlyingHolder = holder->Clone({.KeepMemoryReferenceTracking = keepExistingTracking}); + auto underlyingReference = TSharedRef(rawReference, std::move(underlyingHolder)); + return TSharedRef( + rawReference, + New<TTestTrackedReferenceHolder>(std::move(underlyingReference), std::move(guard))); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/rpc/unittests/lib/common.h b/yt/yt/core/rpc/unittests/lib/common.h index 71fd251a80..f87c544c20 100644 --- a/yt/yt/core/rpc/unittests/lib/common.h +++ b/yt/yt/core/rpc/unittests/lib/common.h @@ -59,6 +59,74 @@ namespace NYT::NRpc { //////////////////////////////////////////////////////////////////////////////// +class TTestNodeMemoryTracker + : public IMemoryUsageTracker +{ +public: + explicit TTestNodeMemoryTracker(size_t limit); + + i64 GetLimit() const override; + i64 GetUsed() const override; + i64 GetFree() const override; + bool IsExceeded() const override; + + TError TryAcquire(i64 size) override; + TError TryChange(i64 size) override; + bool Acquire(i64 size) override; + void Release(i64 size) override; + void SetLimit(i64 size) override; + + void ClearTotalUsage(); + i64 GetTotalUsage() const; + + TSharedRef Track( + TSharedRef reference, + bool keepHolder = false) override; +private: + class TTestTrackedReferenceHolder + : public TSharedRangeHolder + { + public: + TTestTrackedReferenceHolder( + TSharedRef underlying, + TMemoryUsageTrackerGuard guard) + : Underlying_(std::move(underlying)) + , Guard_(std::move(guard)) + { } + + TSharedRangeHolderPtr Clone(const TSharedRangeHolderCloneOptions& options) override + { + if (options.KeepMemoryReferenceTracking) { + return this; + } + return Underlying_.GetHolder()->Clone(options); + } + + std::optional<size_t> GetTotalByteSize() const override + { + return Underlying_.GetHolder()->GetTotalByteSize(); + } + + private: + const TSharedRef Underlying_; + const TMemoryUsageTrackerGuard Guard_; + }; + + YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, Lock_); + i64 Usage_; + i64 Limit_; + i64 TotalUsage_; + + TError DoTryAcquire(i64 size); + void DoAcquire(i64 size); + void DoRelease(i64 size); +}; + +DECLARE_REFCOUNTED_CLASS(TTestNodeMemoryTracker) +DEFINE_REFCOUNTED_TYPE(TTestNodeMemoryTracker) + +//////////////////////////////////////////////////////////////////////////////// + class TTestServerHost { public: @@ -66,6 +134,7 @@ public: { Port_ = NTesting::GetFreePort(); Address_ = Format("localhost:%v", Port_); + MemoryUsageTracker_ = New<TTestNodeMemoryTracker>(32_MB); } void InitializeServer( @@ -75,7 +144,7 @@ public: TTestCreateChannelCallback createChannel) { Server_ = server; - TestService_ = CreateTestService(invoker, secure, createChannel); + TestService_ = CreateTestService(invoker, secure, createChannel, MemoryUsageTracker_); NoBaggageService_ = CreateNoBaggageService(invoker); Server_->RegisterService(TestService_); Server_->RegisterService(NoBaggageService_); @@ -93,11 +162,26 @@ public: return Port_; } + TTestNodeMemoryTrackerPtr GetMemoryUsageTracker() + { + return MemoryUsageTracker_; + } + TString GetAddress() const { return Address_; } + ITestServicePtr GetTestService() + { + return TestService_; + } + + IServerPtr GetServer() + { + return Server_; + } + protected: NTesting::TPortHolder Port_; TString Address_; @@ -105,64 +189,24 @@ protected: ITestServicePtr TestService_; IServicePtr NoBaggageService_; IServerPtr Server_; + TTestNodeMemoryTrackerPtr MemoryUsageTracker_; }; //////////////////////////////////////////////////////////////////////////////// -class TTestNodeMemoryTracker - : public IMemoryUsageTracker -{ -public: - explicit TTestNodeMemoryTracker(size_t limit); - - i64 GetLimit() const override; - i64 GetUsed() const override; - i64 GetFree() const override; - bool IsExceeded() const override; - - TError TryAcquire(i64 size) override; - TError TryChange(i64 size) override; - bool Acquire(i64 size) override; - void Release(i64 size) override; - void SetLimit(i64 size) override; - - void ClearTotalUsage(); - i64 GetTotalUsage() const; - - TSharedRef Track( - TSharedRef reference, - bool keepHolder = false) override; -private: - YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, Lock_); - i64 Usage_; - i64 Limit_; - i64 TotalUsage_; - - TError DoTryAcquire(i64 size); - void DoAcquire(i64 size); - void DoRelease(i64 size); -}; - -DECLARE_REFCOUNTED_CLASS(TTestNodeMemoryTracker) -DEFINE_REFCOUNTED_TYPE(TTestNodeMemoryTracker) - -//////////////////////////////////////////////////////////////////////////////// - template <class TImpl> class TTestBase : public ::testing::Test - , public TTestServerHost { public: void SetUp() final { - TTestServerHost::InitilizeAddress(); + Host_.InitilizeAddress(); WorkerPool_ = NConcurrency::CreateThreadPool(4, "Worker"); bool secure = TImpl::Secure; - MemoryUsageTracker_ = New<TTestNodeMemoryTracker>(32_MB); - TTestServerHost::InitializeServer( - TImpl::CreateServer(Port_, MemoryUsageTracker_), + Host_.InitializeServer( + TImpl::CreateServer(Host_.GetPort(), Host_.GetMemoryUsageTracker()), WorkerPool_->GetInvoker(), secure, /*createChannel*/ {}); @@ -170,12 +214,7 @@ public: void TearDown() final { - TTestServerHost::TearDown(); - } - - TTestNodeMemoryTrackerPtr GetNodeMemoryUsageTracker() - { - return MemoryUsageTracker_; + Host_.TearDown(); } IChannelPtr CreateChannel( @@ -183,12 +222,27 @@ public: THashMap<TString, NYTree::INodePtr> grpcArguments = {}) { if (address) { - return TImpl::CreateChannel(*address, Address_, std::move(grpcArguments)); + return TImpl::CreateChannel(*address, Host_.GetAddress(), std::move(grpcArguments)); } else { - return TImpl::CreateChannel(Address_, Address_, std::move(grpcArguments)); + return TImpl::CreateChannel(Host_.GetAddress(), Host_.GetAddress(), std::move(grpcArguments)); } } + TTestNodeMemoryTrackerPtr GetMemoryUsageTracker() + { + return Host_.GetMemoryUsageTracker(); + } + + ITestServicePtr GetTestService() + { + return Host_.GetTestService(); + } + + IServerPtr GetServer() + { + return Host_.GetServer(); + } + static bool CheckCancelCode(TErrorCode code) { if (code == NYT::EErrorCode::Canceled) { @@ -213,7 +267,7 @@ public: private: NConcurrency::IThreadPoolPtr WorkerPool_; - TTestNodeMemoryTrackerPtr MemoryUsageTracker_; + TTestServerHost Host_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/rpc/unittests/lib/test_service.cpp b/yt/yt/core/rpc/unittests/lib/test_service.cpp index 0e51ffe6e7..5794a239cd 100644 --- a/yt/yt/core/rpc/unittests/lib/test_service.cpp +++ b/yt/yt/core/rpc/unittests/lib/test_service.cpp @@ -28,10 +28,12 @@ public: TTestService( IInvokerPtr invoker, bool secure, - TTestCreateChannelCallback createChannel) + TTestCreateChannelCallback createChannel, + IMemoryUsageTrackerPtr memoryUsageTracker) : TServiceBase( invoker, TTestProxy::GetDescriptor(), + std::move(memoryUsageTracker), NLogging::TLogger("Main")) , Secure_(secure) , CreateChannel_(createChannel) @@ -380,9 +382,14 @@ private: ITestServicePtr CreateTestService( IInvokerPtr invoker, bool secure, - TTestCreateChannelCallback createChannel) + TTestCreateChannelCallback createChannel, + IMemoryUsageTrackerPtr memoryUsageTracker) { - return New<TTestService>(invoker, secure, createChannel); + return New<TTestService>( + invoker, + secure, + createChannel, + std::move(memoryUsageTracker)); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/rpc/unittests/lib/test_service.h b/yt/yt/core/rpc/unittests/lib/test_service.h index da18cf132d..45d261f002 100644 --- a/yt/yt/core/rpc/unittests/lib/test_service.h +++ b/yt/yt/core/rpc/unittests/lib/test_service.h @@ -70,7 +70,8 @@ using TTestCreateChannelCallback = TCallback<IChannelPtr(const TString& address) ITestServicePtr CreateTestService( IInvokerPtr invoker, bool secure, - TTestCreateChannelCallback createChannel); + TTestCreateChannelCallback createChannel, + IMemoryUsageTrackerPtr memoryUsageTracker); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/rpc/unittests/mock/service.h b/yt/yt/core/rpc/unittests/mock/service.h index 21204b12d9..d99c1a9370 100644 --- a/yt/yt/core/rpc/unittests/mock/service.h +++ b/yt/yt/core/rpc/unittests/mock/service.h @@ -49,6 +49,13 @@ public: ); MOCK_METHOD( + i64, + GetTotalSize, + (), + (const, override) + ); + + MOCK_METHOD( NYTree::IAttributeDictionary&, GetEndpointAttributes, (), @@ -329,6 +336,13 @@ public: ); MOCK_METHOD( + const IMemoryUsageTrackerPtr&, + GetMemoryUsageTracker, + (), + (const, override) + ); + + MOCK_METHOD( const NLogging::TLogger&, GetLogger, (), diff --git a/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp b/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp index dce15890fd..f8bc40e8ed 100644 --- a/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp +++ b/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp @@ -29,7 +29,7 @@ TYPED_TEST_SUITE(TRpcTest, TAllTransports); TYPED_TEST(TRpcTest, ResponseWithAllocationTags) { - auto memoryUsageTracker = this->GetNodeMemoryUsageTracker(); + auto memoryUsageTracker = this->GetMemoryUsageTracker(); auto previousLimit = memoryUsageTracker->GetLimit(); memoryUsageTracker->SetLimit(2_GB); static TMemoryTag testMemoryTag = 1 << 20; @@ -79,7 +79,7 @@ TYPED_TEST(TRpcTest, ResponseWithAllocationTags) } auto memoryUsageBefore = CollectMemoryUsageSnapshot()->GetUsage(MemoryAllocationTag, ToString(testMemoryTag)); - EXPECT_LE(memoryUsageBefore, numberOfLoops * 1536_KB); + EXPECT_LE(memoryUsageBefore, numberOfLoops * 2048_KB); for (const auto& rsp : responses) { WaitFor(rsp).ValueOrThrow(); diff --git a/yt/yt/core/rpc/unittests/rpc_ut.cpp b/yt/yt/core/rpc/unittests/rpc_ut.cpp index a28e9e0a37..eb3e393603 100644 --- a/yt/yt/core/rpc/unittests/rpc_ut.cpp +++ b/yt/yt/core/rpc/unittests/rpc_ut.cpp @@ -250,7 +250,7 @@ TYPED_TEST(TNotGrpcTest, ServerStreamsAborted) auto rspOrError = WaitFor(req->Invoke()); EXPECT_EQ(NYT::EErrorCode::Timeout, rspOrError.GetCode()); - WaitFor(this->TestService_->GetServerStreamsAborted()) + WaitFor(this->GetTestService()->GetServerStreamsAborted()) .ThrowOnError(); } @@ -339,7 +339,7 @@ TYPED_TEST(TNotGrpcTest, ServerNotReading) EXPECT_EQ(expectedInvokeErrorCode, rspOrError.GetCode()); } - WaitFor(this->TestService_->GetSlowCallCanceled()) + WaitFor(this->GetTestService()->GetSlowCallCanceled()) .ThrowOnError(); } @@ -365,7 +365,7 @@ TYPED_TEST(TNotGrpcTest, ServerNotWriting) EXPECT_EQ(expectedInvokeErrorCode, rspOrError.GetCode()); } - WaitFor(this->TestService_->GetSlowCallCanceled()) + WaitFor(this->GetTestService()->GetSlowCallCanceled()) .ThrowOnError(); } @@ -400,7 +400,7 @@ TYPED_TEST(TNotGrpcTest, VeryLaggyStreamingRequest) }; })"); auto config = ConvertTo<TServerConfigPtr>(TYsonString(configText)); - this->Server_->Configure(config); + this->GetServer()->Configure(config); TTestProxy proxy(this->CreateChannel()); proxy.DefaultServerAttachmentsStreamingParameters().ReadTimeout = TDuration::MilliSeconds(500); @@ -506,6 +506,36 @@ TYPED_TEST(TRpcTest, RegularAttachments) EXPECT_EQ("TTestProxy_", StringFromSharedRef(attachments[2])); } +TYPED_TEST(TNotGrpcTest, TrackedRegularAttachments) +{ + TTestProxy proxy(this->CreateChannel()); + auto req = proxy.RegularAttachments(); + + auto memoryUsageTracker = this->GetMemoryUsageTracker(); + memoryUsageTracker->ClearTotalUsage(); + + req->Attachments().push_back(TSharedRef::FromString("Hello")); + req->Attachments().push_back(TSharedRef::FromString("from")); + req->Attachments().push_back(TSharedRef::FromString("TTestProxy")); + + auto rspOrError = req->Invoke().Get(); + EXPECT_TRUE(rspOrError.IsOK()); + const auto& rsp = rspOrError.Value(); + + const auto& attachments = rsp->Attachments(); + // Attachment allocator proactively allocate slice of 4 KB. + // See NYT::NBus::TPacketDecoder::TChunkedMemoryTrackingAllocator::Allocate. + // default stub = 4096. + // header + body = 110 bytes. + // attachments = 22 bytes. + // sum is 4228 bytes. + EXPECT_EQ(4228 + 32768, memoryUsageTracker->GetTotalUsage()); + EXPECT_EQ(3u, attachments.size()); + EXPECT_EQ("Hello_", StringFromSharedRef(attachments[0])); + EXPECT_EQ("from_", StringFromSharedRef(attachments[1])); + EXPECT_EQ("TTestProxy_", StringFromSharedRef(attachments[2])); +} + TYPED_TEST(TRpcTest, NullAndEmptyAttachments) { TTestProxy proxy(this->CreateChannel()); @@ -530,6 +560,9 @@ TYPED_TEST(TNotGrpcTest, Compression) const auto requestCodecId = NCompression::ECodec::Zstd_2; const auto responseCodecId = NCompression::ECodec::Snappy; + auto memoryUsageTracker = this->GetMemoryUsageTracker(); + memoryUsageTracker->ClearTotalUsage(); + TString message("This is a message string."); std::vector<TString> attachmentStrings({ "This is an attachment string.", @@ -554,6 +587,15 @@ TYPED_TEST(TNotGrpcTest, Compression) EXPECT_TRUE(rspOrError.IsOK()); auto rsp = rspOrError.Value(); + // Attachment allocator proactively allocate slice of 4 KB. + // 32 KB - is read/write buffers per connection. + // See NYT::NBus::TPacketDecoder::TChunkedMemoryTrackingAllocator::Allocate. + // default stub = 4096. + // attachmentStrings[0].size() = 29 * 2 bytes from decoder. + // attachmentStrings[1].size() = 36 * 2 bytes from decoder. + // attachmentStrings[2].size() = 90 * 2 bytes from decoder. + // sum is 4591 bytes. + EXPECT_EQ(4591 + 32768, memoryUsageTracker->GetTotalUsage()); EXPECT_TRUE(rsp->message() == message); EXPECT_TRUE(rsp->GetResponseMessage().Size() >= 2); const auto& serializedResponseBody = SerializeProtoToRefWithCompression(*rsp, responseCodecId); @@ -620,7 +662,7 @@ TYPED_TEST(TNotGrpcTest, RequestBytesThrottling) }; })"); auto config = ConvertTo<TServerConfigPtr>(TYsonString(configText)); - this->Server_->Configure(config); + this->GetServer()->Configure(config); TTestProxy proxy(this->CreateChannel()); @@ -640,8 +682,7 @@ TYPED_TEST(TNotGrpcTest, RequestBytesThrottling) EXPECT_LE(std::abs(static_cast<i64>(timer.GetElapsedTime().MilliSeconds()) - 3000), 200); } -// Now test different types of errors - +// Now test different types of errors. TYPED_TEST(TRpcTest, OK) { TTestProxy proxy(this->CreateChannel()); @@ -710,7 +751,7 @@ TYPED_TEST(TRpcTest, ServerTimeout) auto req = proxy.SlowCanceledCall(); auto rspOrError = req->Invoke().Get(); EXPECT_TRUE(this->CheckTimeoutCode(rspOrError.GetCode())); - WaitFor(this->TestService_->GetSlowCallCanceled()) + WaitFor(this->GetTestService()->GetSlowCallCanceled()) .ThrowOnError(); } @@ -726,7 +767,7 @@ TYPED_TEST(TRpcTest, ClientCancel) EXPECT_TRUE(asyncRspOrError.IsSet()); auto rspOrError = asyncRspOrError.Get(); EXPECT_TRUE(this->CheckCancelCode(rspOrError.GetCode())); - WaitFor(this->TestService_->GetSlowCallCanceled()) + WaitFor(this->GetTestService()->GetSlowCallCanceled()) .ThrowOnError(); } @@ -744,7 +785,7 @@ TYPED_TEST(TRpcTest, RequestQueueSizeLimit) std::vector<TFuture<void>> futures; std::vector<TTestProxy> proxies; - // Concurrency byte limit + queue byte size limit = 10 + 20 = 30 + // Concurrency byte limit + queue byte size limit = 10 + 20 = 30. // First 30 requests must be successful, 31st request must be failed. for (int i = 0; i < 30; ++i) { proxies.push_back(TTestProxy(this->CreateChannel())); @@ -767,11 +808,30 @@ TYPED_TEST(TRpcTest, RequestQueueSizeLimit) EXPECT_TRUE(AllSucceeded(std::move(futures)).Get().IsOK()); } +TYPED_TEST(TNotGrpcTest, RequestMemoryOverflowException) +{ + auto memoryUsageTracker = this->GetMemoryUsageTracker(); + memoryUsageTracker->ClearTotalUsage(); + auto memoryReferenceUsageTracker = this->GetMemoryUsageTracker(); + memoryReferenceUsageTracker->ClearTotalUsage(); + + TTestProxy proxy(this->CreateChannel()); + proxy.SetDefaultTimeout(TDuration::Seconds(10.0)); + auto req = proxy.SomeCall(); + req->set_a(42); + req->Attachments().push_back(TSharedRef::FromString(TString(34_MB, 'x'))); + auto result = WaitFor(req->Invoke().AsVoid()); + + // Limit of memory is 32 MB. + EXPECT_EQ(NRpc::EErrorCode::MemoryOverflow, req->Invoke().Get().GetCode()); +} + TYPED_TEST(TNotGrpcTest, MemoryTracking) { TTestProxy proxy(this->CreateChannel()); - auto memoryUsageTracker = this->GetNodeMemoryUsageTracker(); + auto memoryUsageTracker = this->GetMemoryUsageTracker(); memoryUsageTracker->ClearTotalUsage(); + proxy.SetDefaultTimeout(TDuration::Seconds(10.0)); for (int i = 0; i < 300; ++i) { auto req = proxy.SomeCall(); @@ -779,15 +839,20 @@ TYPED_TEST(TNotGrpcTest, MemoryTracking) WaitFor(req->Invoke().AsVoid()).ThrowOnError(); } + Sleep(TDuration::MilliSeconds(200)); + { auto rpcUsage = memoryUsageTracker->GetTotalUsage(); - EXPECT_EQ(rpcUsage, (static_cast<i64>(32_KB))); + + // 1261568 = 32768 + 1228800 = 32768 + 4096 * 300 + 300 * 110 (header + body). + // 32768 - socket buffers, 4096 - default size per request. + EXPECT_EQ(rpcUsage, 1294568); } } TYPED_TEST(TNotGrpcTest, MemoryTrackingMultipleConnections) { - auto memoryUsageTracker = this->GetNodeMemoryUsageTracker(); + auto memoryUsageTracker = this->GetMemoryUsageTracker(); memoryUsageTracker->ClearTotalUsage(); for (int i = 0; i < 300; ++i) { TTestProxy proxy(this->CreateChannel()); @@ -798,15 +863,18 @@ TYPED_TEST(TNotGrpcTest, MemoryTrackingMultipleConnections) } { - auto rpcUsage = memoryUsageTracker->GetTotalUsage(); - EXPECT_EQ(rpcUsage, (static_cast<i64>(32_KB) * 300)); + // 11059200 / 300 = 36974 = 32768 + 4096 + 110 (header + body). + // 4 KB - stub for request. + // See NYT::NBus::TPacketDecoder::TChunkedMemoryTrackingAllocator::Allocate. + EXPECT_EQ(11092200, memoryUsageTracker->GetTotalUsage()); } } TYPED_TEST(TNotGrpcTest, MemoryTrackingMultipleConcurrent) { - auto memoryUsageTracker = this->GetNodeMemoryUsageTracker(); + auto memoryUsageTracker = this->GetMemoryUsageTracker(); memoryUsageTracker->ClearTotalUsage(); + std::vector<TFuture<void>> futures; std::vector<TTestProxy> proxies; @@ -820,15 +888,44 @@ TYPED_TEST(TNotGrpcTest, MemoryTrackingMultipleConcurrent) futures.push_back(req->Invoke().AsVoid()); } - Sleep(TDuration::MilliSeconds(300)); + Sleep(TDuration::MilliSeconds(100)); + { auto rpcUsage = memoryUsageTracker->GetUsed(); - // 20 = concurrency (10) + queue (20) - EXPECT_EQ(rpcUsage, (static_cast<i64>(32_KB) * 40)); + + // connections count - per connection size. + // 40 per connections, 30 per request (concurrency + queue = 10 + 20 = 30). Each request 4096 - by default + 108 (body + header). + EXPECT_TRUE(rpcUsage > (static_cast<i64>(32_KB) * 40)); } + EXPECT_TRUE(AllSet(std::move(futures)).Get().IsOK()); } +TYPED_TEST(TNotGrpcTest, MemoryOvercommit) +{ + const auto requestCodecId = NCompression::ECodec::Zstd_2; + + auto memoryReferenceUsageTracker = this->GetMemoryUsageTracker(); + memoryReferenceUsageTracker->ClearTotalUsage(); + + TTestProxy proxy(this->CreateChannel()); + proxy.SetDefaultTimeout(TDuration::Seconds(60.0)); + auto req = proxy.SlowCall(); + req->set_request_codec(static_cast<int>(requestCodecId)); + req->Attachments().push_back(TSharedRef::FromString(TString(6_KB, 'x'))); + WaitFor(req->Invoke()).ThrowOnError(); + { + auto rpcUsage = memoryReferenceUsageTracker->GetTotalUsage(); + + // Attachment allocator proactively allocate slice of 4 KB. + // See NYT::NBus::TPacketDecoder::TChunkedMemoryTrackingAllocator::Allocate. + // default stub = 4096. + // header + body = 110 bytes. + // attachments = 6_KB kbytes. + EXPECT_EQ(rpcUsage, 32768 + 4096 + 6144 + 110); + } +} + TYPED_TEST(TNotGrpcTest, RequestQueueByteSizeLimit) { const auto requestCodecId = NCompression::ECodec::Zstd_2; @@ -837,7 +934,7 @@ TYPED_TEST(TNotGrpcTest, RequestQueueByteSizeLimit) std::vector<TTestProxy> proxies; // Every request contains 2 MB, 15 requests contain 30 MB. - // Concurrency byte limit + queue byte size limit = 10 MB + 20 MB = 30 MB + // Concurrency byte limit + queue byte size limit = 10 MB + 20 MB = 30 MB. // First 15 requests must be successful, 16th request must be failed. for (int i = 0; i < 15; ++i) { proxies.push_back(TTestProxy(this->CreateChannel())); @@ -866,17 +963,19 @@ TYPED_TEST(TNotGrpcTest, RequestQueueByteSizeLimit) TYPED_TEST(TRpcTest, ConcurrencyLimit) { - TTestProxy proxy(this->CreateChannel()); std::vector<TFuture<void>> futures; for (int i = 0; i < 10; ++i) { + TTestProxy proxy(this->CreateChannel()); + proxy.SetDefaultTimeout(TDuration::Seconds(10.0)); auto req = proxy.SlowCall(); futures.push_back(req->Invoke().AsVoid()); } - Sleep(TDuration::MilliSeconds(100)); + Sleep(TDuration::MilliSeconds(200)); TFuture<void> backlogFuture; { + TTestProxy proxy(this->CreateChannel()); auto req = proxy.SlowCall(); backlogFuture = req->Invoke().AsVoid(); } @@ -908,7 +1007,7 @@ TYPED_TEST(TRpcTest, CustomErrorMessage) TYPED_TEST(TRpcTest, ServerStopped) { - this->Server_->Stop().Get().ThrowOnError(); + this->GetServer()->Stop().Get().ThrowOnError(); TTestProxy proxy(this->CreateChannel()); auto req = proxy.SomeCall(); req->set_a(42); @@ -926,14 +1025,14 @@ TYPED_TEST(TRpcTest, ConnectionLost) Sleep(TDuration::Seconds(0.5)); EXPECT_FALSE(asyncRspOrError.IsSet()); - YT_UNUSED_FUTURE(this->Server_->Stop(false)); + YT_UNUSED_FUTURE(this->GetServer()->Stop(false)); Sleep(TDuration::Seconds(2)); EXPECT_TRUE(asyncRspOrError.IsSet()); auto rspOrError = asyncRspOrError.Get(); EXPECT_EQ(NRpc::EErrorCode::TransportError, rspOrError.GetCode()); - WaitFor(this->TestService_->GetSlowCallCanceled()) + WaitFor(this->GetTestService()->GetSlowCallCanceled()) .ThrowOnError(); } @@ -988,7 +1087,7 @@ TYPED_TEST(TNotGrpcTest, RequiredClientFeatureNotSupported) TYPED_TEST(TRpcTest, StopWithoutActiveRequests) { - auto stopResult = this->TestService_->Stop(); + auto stopResult = this->GetTestService()->Stop(); EXPECT_TRUE(stopResult.IsSet()); } @@ -997,8 +1096,11 @@ TYPED_TEST(TRpcTest, StopWithActiveRequests) TTestProxy proxy(this->CreateChannel()); auto req = proxy.SlowCall(); auto reqResult = req->Invoke(); + Sleep(TDuration::Seconds(0.5)); - auto stopResult = this->TestService_->Stop(); + + auto stopResult = this->GetTestService()->Stop(); + EXPECT_FALSE(stopResult.IsSet()); EXPECT_TRUE(reqResult.Get().IsOK()); Sleep(TDuration::Seconds(0.5)); @@ -1007,11 +1109,13 @@ TYPED_TEST(TRpcTest, StopWithActiveRequests) TYPED_TEST(TRpcTest, NoMoreRequestsAfterStop) { - auto stopResult = this->TestService_->Stop(); + auto stopResult = this->GetTestService()->Stop(); EXPECT_TRUE(stopResult.IsSet()); + TTestProxy proxy(this->CreateChannel()); auto req = proxy.SlowCall(); auto reqResult = req->Invoke(); + EXPECT_FALSE(reqResult.Get().IsOK()); } diff --git a/yt/yt/core/ytree/ypath_detail.cpp b/yt/yt/core/ytree/ypath_detail.cpp index 1cc159b24f..0e15508993 100644 --- a/yt/yt/core/ytree/ypath_detail.cpp +++ b/yt/yt/core/ytree/ypath_detail.cpp @@ -1775,6 +1775,8 @@ IYPathServiceContextPtr CreateYPathContext( return New<TYPathServiceContext>( std::move(requestMessage), + TMemoryUsageTrackerGuard{}, + GetNullMemoryUsageTracker(), std::move(logger), logLevel); } @@ -1790,6 +1792,8 @@ IYPathServiceContextPtr CreateYPathContext( return New<TYPathServiceContext>( std::move(requestHeader), std::move(requestMessage), + TMemoryUsageTrackerGuard{}, + GetNullMemoryUsageTracker(), std::move(logger), logLevel); } |