aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordon-dron <don-dron@yandex-team.com>2024-04-12 00:30:10 +0300
committerdon-dron <don-dron@yandex-team.com>2024-04-12 00:38:07 +0300
commit384788280f0a875070dc2590512b7eea09c0ac03 (patch)
treeadc67bb617d3d988a501718d7336ae1d551f3a14
parent00ff372db04018582b653a028eb3647b40544256 (diff)
downloadydb-384788280f0a875070dc2590512b7eea09c0ac03.tar.gz
YT-21534: Modify RPC memory tracking
76e19fad047a3929e625cf7c8411551b5eedb174
-rw-r--r--yt/yt/core/rpc/message.cpp5
-rw-r--r--yt/yt/core/rpc/message.h1
-rw-r--r--yt/yt/core/rpc/server_detail.cpp32
-rw-r--r--yt/yt/core/rpc/server_detail.h25
-rw-r--r--yt/yt/core/rpc/service.h6
-rw-r--r--yt/yt/core/rpc/service_detail.cpp52
-rw-r--r--yt/yt/core/rpc/service_detail.h33
-rw-r--r--yt/yt/core/rpc/unittests/bin/main.cpp2
-rw-r--r--yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp16
-rw-r--r--yt/yt/core/rpc/unittests/lib/common.cpp25
-rw-r--r--yt/yt/core/rpc/unittests/lib/common.h162
-rw-r--r--yt/yt/core/rpc/unittests/lib/test_service.cpp13
-rw-r--r--yt/yt/core/rpc/unittests/lib/test_service.h3
-rw-r--r--yt/yt/core/rpc/unittests/mock/service.h14
-rw-r--r--yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp4
-rw-r--r--yt/yt/core/rpc/unittests/rpc_ut.cpp160
-rw-r--r--yt/yt/core/ytree/ypath_detail.cpp4
17 files changed, 439 insertions, 118 deletions
diff --git a/yt/yt/core/rpc/message.cpp b/yt/yt/core/rpc/message.cpp
index 6a5b6b7b2b..5df17b9513 100644
--- a/yt/yt/core/rpc/message.cpp
+++ b/yt/yt/core/rpc/message.cpp
@@ -385,6 +385,11 @@ bool ParseStreamingFeedbackHeader(
////////////////////////////////////////////////////////////////////////////////
+i64 GetMessageHeaderSize(const TSharedRefArray& message)
+{
+ return message.Size() >= 1 ? static_cast<i64>(message[0].Size()) : 0;
+}
+
i64 GetMessageBodySize(const TSharedRefArray& message)
{
return message.Size() >= 2 ? static_cast<i64>(message[1].Size()) : 0;
diff --git a/yt/yt/core/rpc/message.h b/yt/yt/core/rpc/message.h
index 1b573dc576..4f8baa031a 100644
--- a/yt/yt/core/rpc/message.h
+++ b/yt/yt/core/rpc/message.h
@@ -105,6 +105,7 @@ bool ParseStreamingFeedbackHeader(
const TSharedRefArray& message,
NProto::TStreamingFeedbackHeader* header);
+i64 GetMessageHeaderSize(const TSharedRefArray& message);
i64 GetMessageBodySize(const TSharedRefArray& message);
int GetMessageAttachmentCount(const TSharedRefArray& message);
i64 GetTotalMessageAttachmentSize(const TSharedRefArray& message);
diff --git a/yt/yt/core/rpc/server_detail.cpp b/yt/yt/core/rpc/server_detail.cpp
index 018fe746cb..bf8886478b 100644
--- a/yt/yt/core/rpc/server_detail.cpp
+++ b/yt/yt/core/rpc/server_detail.cpp
@@ -27,10 +27,14 @@ using NYT::FromProto;
TServiceContextBase::TServiceContextBase(
std::unique_ptr<TRequestHeader> header,
TSharedRefArray requestMessage,
+ TMemoryUsageTrackerGuard memoryGuard,
+ IMemoryUsageTrackerPtr memoryUsageTracker,
NLogging::TLogger logger,
NLogging::ELogLevel logLevel)
: RequestHeader_(std::move(header))
, RequestMessage_(std::move(requestMessage))
+ , RequestMemoryGuard_(std::move(memoryGuard))
+ , MemoryUsageTracker_(std::move(memoryUsageTracker))
, Logger(std::move(logger))
, LogLevel_(logLevel)
{
@@ -39,10 +43,14 @@ TServiceContextBase::TServiceContextBase(
TServiceContextBase::TServiceContextBase(
TSharedRefArray requestMessage,
+ TMemoryUsageTrackerGuard memoryGuard,
+ IMemoryUsageTrackerPtr memoryUsageTracker,
NLogging::TLogger logger,
NLogging::ELogLevel logLevel)
: RequestHeader_(new TRequestHeader())
, RequestMessage_(std::move(requestMessage))
+ , RequestMemoryGuard_(std::move(memoryGuard))
+ , MemoryUsageTracker_(std::move(memoryUsageTracker))
, Logger(std::move(logger))
, LogLevel_(logLevel)
{
@@ -67,6 +75,10 @@ void TServiceContextBase::Initialize()
RequestAttachments_ = std::vector<TSharedRef>(
RequestMessage_.Begin() + 2,
RequestMessage_.End());
+ TotalSize_ = TypicalRequestSize +
+ GetMessageHeaderSize(RequestMessage_) +
+ GetMessageBodySize(RequestMessage_) +
+ GetTotalMessageAttachmentSize(RequestMessage_);
}
void TServiceContextBase::Reply(const TError& error)
@@ -299,6 +311,11 @@ TRequestId TServiceContextBase::GetRequestId() const
return RequestId_;
}
+i64 TServiceContextBase::GetTotalSize() const
+{
+ return TotalSize_;
+}
+
TBusNetworkStatistics TServiceContextBase::GetBusNetworkStatistics() const
{
return {};
@@ -450,6 +467,11 @@ void TServiceContextBase::SetRawResponseInfo(TString info, bool incremental)
}
}
+const IMemoryUsageTrackerPtr& TServiceContextBase::GetMemoryUsageTracker() const
+{
+ return MemoryUsageTracker_;
+}
+
const NLogging::TLogger& TServiceContextBase::GetLogger() const
{
return Logger;
@@ -591,6 +613,11 @@ TRealmId TServiceContextWrapper::GetRealmId() const
return UnderlyingContext_->GetRealmId();
}
+i64 TServiceContextWrapper::GetTotalSize() const
+{
+ return UnderlyingContext_->GetTotalSize();
+}
+
const TAuthenticationIdentity& TServiceContextWrapper::GetAuthenticationIdentity() const
{
return UnderlyingContext_->GetAuthenticationIdentity();
@@ -724,6 +751,11 @@ void TServiceContextWrapper::SetRawResponseInfo(TString info, bool incremental)
UnderlyingContext_->SetRawResponseInfo(std::move(info), incremental);
}
+const IMemoryUsageTrackerPtr& TServiceContextWrapper::GetMemoryUsageTracker() const
+{
+ return UnderlyingContext_->GetMemoryUsageTracker();
+}
+
const NLogging::TLogger& TServiceContextWrapper::GetLogger() const
{
return UnderlyingContext_->GetLogger();
diff --git a/yt/yt/core/rpc/server_detail.h b/yt/yt/core/rpc/server_detail.h
index aceba1f14e..64e076d1d5 100644
--- a/yt/yt/core/rpc/server_detail.h
+++ b/yt/yt/core/rpc/server_detail.h
@@ -7,6 +7,8 @@
#include <yt/yt/core/logging/log.h>
+#include <yt/yt/core/misc/memory_usage_tracker.h>
+
#include <yt/yt_proto/yt/core/rpc/proto/rpc.pb.h>
#include <library/cpp/yt/threading/rw_spin_lock.h>
@@ -16,6 +18,11 @@ namespace NYT::NRpc {
////////////////////////////////////////////////////////////////////////////////
+// Magic constant! This is the lower limit of the memory allocated for the request.
+constexpr i64 TypicalRequestSize = 4_KB;
+
+////////////////////////////////////////////////////////////////////////////////
+
//! \note Thread affinity: single-threaded (unless noted otherwise)
class TServiceContextBase
: public virtual IServiceContext
@@ -29,6 +36,8 @@ public:
const NYTree::IAttributeDictionary& GetEndpointAttributes() const override;
const TString& GetEndpointDescription() const override;
+ i64 GetTotalSize() const override;
+
std::optional<TInstant> GetStartTime() const override;
std::optional<TDuration> GetTimeout() const override;
TInstant GetArriveInstant() const override;
@@ -94,6 +103,8 @@ public:
void SuppressMissingRequestInfoCheck() override;
void SetRawResponseInfo(TString info, bool incremental) override;
+ const IMemoryUsageTrackerPtr& GetMemoryUsageTracker() const override;
+
const NLogging::TLogger& GetLogger() const override;
NLogging::ELogLevel GetLogLevel() const override;
@@ -108,6 +119,10 @@ public:
protected:
std::unique_ptr<NProto::TRequestHeader> RequestHeader_;
TSharedRefArray RequestMessage_;
+
+ TMemoryUsageTrackerGuard RequestMemoryGuard_;
+ const IMemoryUsageTrackerPtr MemoryUsageTracker_;
+
const NLogging::TLogger Logger;
const NLogging::ELogLevel LogLevel_;
@@ -124,6 +139,8 @@ protected:
std::atomic<bool> Replied_ = false;
TError Error_;
+ i64 TotalSize_ = 0;
+
TSharedRef ResponseBody_;
std::vector<TSharedRef> ResponseAttachments_;
@@ -140,10 +157,14 @@ protected:
TServiceContextBase(
std::unique_ptr<NProto::TRequestHeader> header,
TSharedRefArray requestMessage,
+ TMemoryUsageTrackerGuard memoryGuard,
+ IMemoryUsageTrackerPtr memoryUsageTracker,
NLogging::TLogger logger,
NLogging::ELogLevel logLevel);
TServiceContextBase(
TSharedRefArray requestMessage,
+ TMemoryUsageTrackerGuard memoryGuard,
+ IMemoryUsageTrackerPtr memoryUsageTracker,
NLogging::TLogger logger,
NLogging::ELogLevel logLevel);
@@ -199,6 +220,8 @@ public:
TRealmId GetRealmId() const override;
const TAuthenticationIdentity& GetAuthenticationIdentity() const override;
+ i64 GetTotalSize() const override;
+
bool IsReplied() const override;
void Reply(const TError& error) override;
void Reply(const TSharedRefArray& responseMessage) override;
@@ -241,6 +264,8 @@ public:
void SuppressMissingRequestInfoCheck() override;
void SetRawResponseInfo(TString info, bool incremental) override;
+ const IMemoryUsageTrackerPtr& GetMemoryUsageTracker() const override;
+
const NLogging::TLogger& GetLogger() const override;
NLogging::ELogLevel GetLogLevel() const override;
diff --git a/yt/yt/core/rpc/service.h b/yt/yt/core/rpc/service.h
index 1c2e721730..b92aabac5d 100644
--- a/yt/yt/core/rpc/service.h
+++ b/yt/yt/core/rpc/service.h
@@ -216,6 +216,9 @@ struct IServiceContext
*/
virtual void SetRawResponseInfo(TString info, bool incremental) = 0;
+ //! Returns the memory usage tracker for request/response messages.
+ virtual const IMemoryUsageTrackerPtr& GetMemoryUsageTracker() const = 0;
+
//! Returns the logger for request/response messages.
virtual const NLogging::TLogger& GetLogger() const = 0;
@@ -236,6 +239,9 @@ struct IServiceContext
virtual bool IsResponseBodySerializedWithCompression() const = 0;
virtual void SetResponseBodySerializedWithCompression() = 0;
+ //! Return total size of request. Sum of Header, Body, Attachments, TServiceContext, parsed RequestHeaderProto and TraceContext.
+ virtual i64 GetTotalSize() const = 0;
+
// Extension methods.
void SetRequestInfo();
diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp
index 7440d9c9c9..23c7284dab 100644
--- a/yt/yt/core/rpc/service_detail.cpp
+++ b/yt/yt/core/rpc/service_detail.cpp
@@ -345,6 +345,8 @@ public:
: TServiceContextBase(
std::move(acceptedRequest.Header),
std::move(acceptedRequest.Message),
+ std::move(acceptedRequest.MemoryGuard),
+ std::move(acceptedRequest.MemoryUsageTracker),
std::move(logger),
acceptedRequest.RuntimeInfo->LogLevel.load(std::memory_order::relaxed))
, Service_(std::move(service))
@@ -722,7 +724,6 @@ private:
TAttachmentsInputStreamPtr RequestAttachmentsStream_;
TAttachmentsOutputStreamPtr ResponseAttachmentsStream_;
-
bool IsRegistrable()
{
if (Cancelable_) {
@@ -1077,9 +1078,7 @@ private:
}
if (RequestRun_) {
- auto requestTotalSize = GetMessageBodySize(RequestMessage_) +
- GetTotalMessageAttachmentSize(RequestMessage_);
- RequestQueue_->OnRequestFinished(requestTotalSize);
+ RequestQueue_->OnRequestFinished(TotalSize_);
}
if (ActiveRequestCountIncremented_) {
@@ -1432,7 +1431,7 @@ void TRequestQueue::OnRequestArrived(const TServiceBase::TServiceContextPtr& con
}
// Slow path.
- DecrementConcurrency(GetTotalRequestSize(context));
+ DecrementConcurrency(context->GetTotalSize());
IncrementQueueSize(context);
context->BeforeEnqueued();
@@ -1526,28 +1525,22 @@ void TRequestQueue::RunRequest(TServiceBase::TServiceContextPtr context)
void TRequestQueue::IncrementQueueSize(const TServiceBase::TServiceContextPtr& context)
{
++QueueSize_;
- QueueByteSize_.fetch_add(GetTotalRequestSize(context));
+ QueueByteSize_.fetch_add(context->GetTotalSize());
}
void TRequestQueue::DecrementQueueSize(const TServiceBase::TServiceContextPtr& context)
{
auto newQueueSize = --QueueSize_;
- auto oldQueueByteSize = QueueByteSize_.fetch_sub(GetTotalRequestSize(context));
+ auto oldQueueByteSize = QueueByteSize_.fetch_sub(context->GetTotalSize());
YT_ASSERT(newQueueSize >= 0);
YT_ASSERT(oldQueueByteSize >= 0);
}
-i64 TRequestQueue::GetTotalRequestSize(const TServiceBase::TServiceContextPtr& context)
-{
- return GetMessageBodySize(context->GetRequestMessage()) +
- GetTotalMessageAttachmentSize(context->GetRequestMessage());
-}
-
bool TRequestQueue::IncrementConcurrency(const TServiceBase::TServiceContextPtr& context)
{
auto resultSize = ++Concurrency_ <= RuntimeInfo_->ConcurrencyLimit.GetDynamicLimit();
- auto resultByteSize = ConcurrencyByte_.fetch_add(GetTotalRequestSize(context)) <= RuntimeInfo_->ConcurrencyByteLimit.GetDynamicByteLimit();
+ auto resultByteSize = ConcurrencyByte_.fetch_add(context->GetTotalSize()) <= RuntimeInfo_->ConcurrencyByteLimit.GetDynamicByteLimit();
return resultSize && resultByteSize;
}
@@ -1572,7 +1565,7 @@ void TRequestQueue::AcquireThrottlers(const TServiceBase::TServiceContextPtr& co
{
if (BytesThrottler_.Specified.load(std::memory_order::acquire)) {
// Slow path.
- auto requestSize = GetTotalRequestSize(context);
+ auto requestSize = context->GetTotalSize();
BytesThrottler_.Throttler->Acquire(requestSize);
}
if (WeightThrottler_.Specified.load(std::memory_order::acquire)) {
@@ -1637,11 +1630,28 @@ TServiceBase::TServiceBase(
const NLogging::TLogger& logger,
TRealmId realmId,
IAuthenticatorPtr authenticator)
+ : TServiceBase(
+ std::move(defaultInvoker),
+ descriptor,
+ GetNullMemoryUsageTracker(),
+ logger,
+ realmId,
+ authenticator)
+{ }
+
+TServiceBase::TServiceBase(
+ IInvokerPtr defaultInvoker,
+ const TServiceDescriptor& descriptor,
+ IMemoryUsageTrackerPtr memoryUsageTracker,
+ const NLogging::TLogger& logger,
+ TRealmId realmId,
+ IAuthenticatorPtr authenticator)
: Logger(logger)
, DefaultInvoker_(std::move(defaultInvoker))
, Authenticator_(std::move(authenticator))
, ServiceDescriptor_(descriptor)
, ServiceId_(descriptor.FullServiceName, realmId)
+ , MemoryUsageTracker_(std::move(memoryUsageTracker))
, Profiler_(RpcServerProfiler.WithHot().WithTag("yt_service", TString(ServiceId_.ServiceName)))
, AuthenticationTimer_(Profiler_.Timer("/authentication_time"))
, ServiceLivenessChecker_(New<TPeriodicExecutor>(
@@ -1701,6 +1711,14 @@ void TServiceBase::HandleRequest(
return;
}
+ auto memoryGuard = TMemoryUsageTrackerGuard::Acquire(MemoryUsageTracker_, TypicalRequestSize);
+ message = TrackMemory(MemoryUsageTracker_, std::move(message));
+ if (MemoryUsageTracker_ && MemoryUsageTracker_->IsExceeded()) {
+ return replyError(TError(
+ NRpc::EErrorCode::MemoryOverflow,
+ "Request is dropped due to high memory pressure"));
+ }
+
auto tracingMode = runtimeInfo->TracingMode.load(std::memory_order::relaxed);
auto traceContext = tracingMode == ERequestTracingMode::Disable
? NTracing::TTraceContextPtr()
@@ -1747,7 +1765,9 @@ void TServiceBase::HandleRequest(
std::move(header),
std::move(message),
requestQueue,
- maybeThrottled
+ maybeThrottled,
+ std::move(memoryGuard),
+ MemoryUsageTracker_,
};
if (!IsAuthenticationNeeded(acceptedRequest)) {
diff --git a/yt/yt/core/rpc/service_detail.h b/yt/yt/core/rpc/service_detail.h
index e21e9d59c2..2c2f7f3b1c 100644
--- a/yt/yt/core/rpc/service_detail.h
+++ b/yt/yt/core/rpc/service_detail.h
@@ -20,6 +20,7 @@
#include <yt/yt/core/misc/object_pool.h>
#include <yt/yt/core/misc/protobuf_helpers.h>
#include <yt/yt/core/misc/ring_queue.h>
+#include <yt/yt/core/misc/memory_usage_tracker.h>
#include <yt/yt/core/profiling/timing.h>
@@ -195,6 +196,7 @@ public:
}
Request_->Context_ = underlyingContext.Get();
+ auto tracker = Request_->Context_->GetMemoryUsageTracker();
const auto& requestHeader = this->GetRequestHeader();
// COMPAT(danilalexeev): legacy RPC codecs
@@ -226,12 +228,12 @@ public:
formatOptionsYson = NYson::TYsonString(requestHeader.request_format_options());
}
if (format != EMessageFormat::Protobuf) {
- body = ConvertMessageFromFormat(
+ body = TrackMemory(tracker, ConvertMessageFromFormat(
body,
format,
NYson::ReflectProtobufMessageType<TRequestMessage>(),
formatOptionsYson,
- !bodyCodecId.has_value());
+ !bodyCodecId.has_value()));
}
}
@@ -247,9 +249,19 @@ public:
std::vector<TSharedRef> requestAttachments;
try {
- requestAttachments = DecompressAttachments(
- underlyingContext->RequestAttachments(),
- attachmentCodecId);
+ if (attachmentCodecId == NCompression::ECodec::None) {
+ requestAttachments = underlyingContext->RequestAttachments();
+ } else {
+ requestAttachments = DecompressAttachments(
+ underlyingContext->RequestAttachments(),
+ attachmentCodecId);
+
+ // For decompressed blocks, memory tracking must be used again,
+ // since they are allocated in a new allocation.
+ for (auto& attachment : requestAttachments) {
+ attachment = TrackMemory(tracker, attachment);
+ }
+ }
} catch (const std::exception& ex) {
underlyingContext->Reply(TError(
NRpc::EErrorCode::ProtocolError,
@@ -815,6 +827,14 @@ protected:
TRealmId realmId = NullRealmId,
IAuthenticatorPtr authenticator = nullptr);
+ TServiceBase(
+ IInvokerPtr defaultInvoker,
+ const TServiceDescriptor& descriptor,
+ IMemoryUsageTrackerPtr memoryUsageTracker,
+ const NLogging::TLogger& logger,
+ TRealmId realmId = NullRealmId,
+ IAuthenticatorPtr authenticator = nullptr);
+
//! Registers a method handler.
//! This call is must be performed prior to service registration.
virtual TRuntimeMethodInfoPtr RegisterMethod(const TMethodDescriptor& descriptor);
@@ -894,6 +914,7 @@ private:
const IAuthenticatorPtr Authenticator_;
const TServiceDescriptor ServiceDescriptor_;
const TServiceId ServiceId_;
+ const IMemoryUsageTrackerPtr MemoryUsageTracker_;
const NProfiling::TProfiler Profiler_;
@@ -977,6 +998,8 @@ private:
TSharedRefArray Message;
TRequestQueue* RequestQueue;
std::optional<TError> ThrottledError;
+ TMemoryUsageTrackerGuard MemoryGuard;
+ IMemoryUsageTrackerPtr MemoryUsageTracker;
};
void DoDeclareServerFeature(int featureId);
diff --git a/yt/yt/core/rpc/unittests/bin/main.cpp b/yt/yt/core/rpc/unittests/bin/main.cpp
index 2b3e4f2c9e..a4c90bb160 100644
--- a/yt/yt/core/rpc/unittests/bin/main.cpp
+++ b/yt/yt/core/rpc/unittests/bin/main.cpp
@@ -28,7 +28,7 @@ int main(int argc, char* argv[])
auto server = CreateBusServer(busServer);
auto workerPool = CreateThreadPool(4, "Worker");
- auto service = CreateTestService(workerPool->GetInvoker(), false, /*createChannel*/ {});
+ auto service = CreateTestService(workerPool->GetInvoker(), false, /*createChannel*/ {}, GetNullMemoryUsageTracker());
server->RegisterService(service);
server->Start();
diff --git a/yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp b/yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp
index fd9165b570..23196016bc 100644
--- a/yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp
+++ b/yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp
@@ -16,9 +16,13 @@ class THandleChannelFailureTestBase
: public ::testing::Test
{
public:
- IServerPtr CreateServer(const TTestServerHost& serverHost, IMemoryUsageTrackerPtr memoryUsageTracker)
+ IServerPtr CreateServer(
+ const TTestServerHost& serverHost,
+ IMemoryUsageTrackerPtr memoryUsageTracker)
{
- return TImpl::CreateServer(serverHost.GetPort(), memoryUsageTracker);
+ return TImpl::CreateServer(
+ serverHost.GetPort(),
+ memoryUsageTracker);
}
IChannelPtr CreateChannel(const TString& address)
@@ -50,7 +54,9 @@ TYPED_TEST(THandleChannelFailureTest, HandleChannelFailureTest)
auto workerPool = NConcurrency::CreateThreadPool(4, "Worker");
outerServer.InitializeServer(
- this->CreateServer(outerServer, New<TTestNodeMemoryTracker>(32_MB)),
+ this->CreateServer(
+ outerServer,
+ outerServer.GetMemoryUsageTracker()),
workerPool->GetInvoker(),
/*secure*/ false,
BIND([&] (const TString& address) {
@@ -58,7 +64,9 @@ TYPED_TEST(THandleChannelFailureTest, HandleChannelFailureTest)
}));
innerServer.InitializeServer(
- this->CreateServer(innerServer, New<TTestNodeMemoryTracker>(32_MB)),
+ this->CreateServer(
+ innerServer,
+ innerServer.GetMemoryUsageTracker()),
workerPool->GetInvoker(),
/*secure*/ false,
/*createChannel*/ {});
diff --git a/yt/yt/core/rpc/unittests/lib/common.cpp b/yt/yt/core/rpc/unittests/lib/common.cpp
index 089d3b90cd..a1f72f5c07 100644
--- a/yt/yt/core/rpc/unittests/lib/common.cpp
+++ b/yt/yt/core/rpc/unittests/lib/common.cpp
@@ -108,11 +108,28 @@ i64 TTestNodeMemoryTracker::GetTotalUsage() const
return TotalUsage_;
}
-TSharedRef TTestNodeMemoryTracker::Track(
- TSharedRef reference,
- bool /*keepHolder*/)
+TSharedRef TTestNodeMemoryTracker::Track(TSharedRef reference, bool keepExistingTracking)
{
- return reference;
+ if (!reference) {
+ return reference;
+ }
+
+ auto rawReference = TRef(reference);
+ const auto& holder = reference.GetHolder();
+
+ // Reference could be without a holder, e.g. empty reference.
+ if (!holder) {
+ YT_VERIFY(reference.Begin() == TRef::MakeEmpty().Begin());
+ return reference;
+ }
+
+ auto guard = TMemoryUsageTrackerGuard::Acquire(this, reference.Size());
+
+ auto underlyingHolder = holder->Clone({.KeepMemoryReferenceTracking = keepExistingTracking});
+ auto underlyingReference = TSharedRef(rawReference, std::move(underlyingHolder));
+ return TSharedRef(
+ rawReference,
+ New<TTestTrackedReferenceHolder>(std::move(underlyingReference), std::move(guard)));
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/rpc/unittests/lib/common.h b/yt/yt/core/rpc/unittests/lib/common.h
index 71fd251a80..f87c544c20 100644
--- a/yt/yt/core/rpc/unittests/lib/common.h
+++ b/yt/yt/core/rpc/unittests/lib/common.h
@@ -59,6 +59,74 @@ namespace NYT::NRpc {
////////////////////////////////////////////////////////////////////////////////
+class TTestNodeMemoryTracker
+ : public IMemoryUsageTracker
+{
+public:
+ explicit TTestNodeMemoryTracker(size_t limit);
+
+ i64 GetLimit() const override;
+ i64 GetUsed() const override;
+ i64 GetFree() const override;
+ bool IsExceeded() const override;
+
+ TError TryAcquire(i64 size) override;
+ TError TryChange(i64 size) override;
+ bool Acquire(i64 size) override;
+ void Release(i64 size) override;
+ void SetLimit(i64 size) override;
+
+ void ClearTotalUsage();
+ i64 GetTotalUsage() const;
+
+ TSharedRef Track(
+ TSharedRef reference,
+ bool keepHolder = false) override;
+private:
+ class TTestTrackedReferenceHolder
+ : public TSharedRangeHolder
+ {
+ public:
+ TTestTrackedReferenceHolder(
+ TSharedRef underlying,
+ TMemoryUsageTrackerGuard guard)
+ : Underlying_(std::move(underlying))
+ , Guard_(std::move(guard))
+ { }
+
+ TSharedRangeHolderPtr Clone(const TSharedRangeHolderCloneOptions& options) override
+ {
+ if (options.KeepMemoryReferenceTracking) {
+ return this;
+ }
+ return Underlying_.GetHolder()->Clone(options);
+ }
+
+ std::optional<size_t> GetTotalByteSize() const override
+ {
+ return Underlying_.GetHolder()->GetTotalByteSize();
+ }
+
+ private:
+ const TSharedRef Underlying_;
+ const TMemoryUsageTrackerGuard Guard_;
+ };
+
+ YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, Lock_);
+ i64 Usage_;
+ i64 Limit_;
+ i64 TotalUsage_;
+
+ TError DoTryAcquire(i64 size);
+ void DoAcquire(i64 size);
+ void DoRelease(i64 size);
+};
+
+DECLARE_REFCOUNTED_CLASS(TTestNodeMemoryTracker)
+DEFINE_REFCOUNTED_TYPE(TTestNodeMemoryTracker)
+
+////////////////////////////////////////////////////////////////////////////////
+
class TTestServerHost
{
public:
@@ -66,6 +134,7 @@ public:
{
Port_ = NTesting::GetFreePort();
Address_ = Format("localhost:%v", Port_);
+ MemoryUsageTracker_ = New<TTestNodeMemoryTracker>(32_MB);
}
void InitializeServer(
@@ -75,7 +144,7 @@ public:
TTestCreateChannelCallback createChannel)
{
Server_ = server;
- TestService_ = CreateTestService(invoker, secure, createChannel);
+ TestService_ = CreateTestService(invoker, secure, createChannel, MemoryUsageTracker_);
NoBaggageService_ = CreateNoBaggageService(invoker);
Server_->RegisterService(TestService_);
Server_->RegisterService(NoBaggageService_);
@@ -93,11 +162,26 @@ public:
return Port_;
}
+ TTestNodeMemoryTrackerPtr GetMemoryUsageTracker()
+ {
+ return MemoryUsageTracker_;
+ }
+
TString GetAddress() const
{
return Address_;
}
+ ITestServicePtr GetTestService()
+ {
+ return TestService_;
+ }
+
+ IServerPtr GetServer()
+ {
+ return Server_;
+ }
+
protected:
NTesting::TPortHolder Port_;
TString Address_;
@@ -105,64 +189,24 @@ protected:
ITestServicePtr TestService_;
IServicePtr NoBaggageService_;
IServerPtr Server_;
+ TTestNodeMemoryTrackerPtr MemoryUsageTracker_;
};
////////////////////////////////////////////////////////////////////////////////
-class TTestNodeMemoryTracker
- : public IMemoryUsageTracker
-{
-public:
- explicit TTestNodeMemoryTracker(size_t limit);
-
- i64 GetLimit() const override;
- i64 GetUsed() const override;
- i64 GetFree() const override;
- bool IsExceeded() const override;
-
- TError TryAcquire(i64 size) override;
- TError TryChange(i64 size) override;
- bool Acquire(i64 size) override;
- void Release(i64 size) override;
- void SetLimit(i64 size) override;
-
- void ClearTotalUsage();
- i64 GetTotalUsage() const;
-
- TSharedRef Track(
- TSharedRef reference,
- bool keepHolder = false) override;
-private:
- YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, Lock_);
- i64 Usage_;
- i64 Limit_;
- i64 TotalUsage_;
-
- TError DoTryAcquire(i64 size);
- void DoAcquire(i64 size);
- void DoRelease(i64 size);
-};
-
-DECLARE_REFCOUNTED_CLASS(TTestNodeMemoryTracker)
-DEFINE_REFCOUNTED_TYPE(TTestNodeMemoryTracker)
-
-////////////////////////////////////////////////////////////////////////////////
-
template <class TImpl>
class TTestBase
: public ::testing::Test
- , public TTestServerHost
{
public:
void SetUp() final
{
- TTestServerHost::InitilizeAddress();
+ Host_.InitilizeAddress();
WorkerPool_ = NConcurrency::CreateThreadPool(4, "Worker");
bool secure = TImpl::Secure;
- MemoryUsageTracker_ = New<TTestNodeMemoryTracker>(32_MB);
- TTestServerHost::InitializeServer(
- TImpl::CreateServer(Port_, MemoryUsageTracker_),
+ Host_.InitializeServer(
+ TImpl::CreateServer(Host_.GetPort(), Host_.GetMemoryUsageTracker()),
WorkerPool_->GetInvoker(),
secure,
/*createChannel*/ {});
@@ -170,12 +214,7 @@ public:
void TearDown() final
{
- TTestServerHost::TearDown();
- }
-
- TTestNodeMemoryTrackerPtr GetNodeMemoryUsageTracker()
- {
- return MemoryUsageTracker_;
+ Host_.TearDown();
}
IChannelPtr CreateChannel(
@@ -183,12 +222,27 @@ public:
THashMap<TString, NYTree::INodePtr> grpcArguments = {})
{
if (address) {
- return TImpl::CreateChannel(*address, Address_, std::move(grpcArguments));
+ return TImpl::CreateChannel(*address, Host_.GetAddress(), std::move(grpcArguments));
} else {
- return TImpl::CreateChannel(Address_, Address_, std::move(grpcArguments));
+ return TImpl::CreateChannel(Host_.GetAddress(), Host_.GetAddress(), std::move(grpcArguments));
}
}
+ TTestNodeMemoryTrackerPtr GetMemoryUsageTracker()
+ {
+ return Host_.GetMemoryUsageTracker();
+ }
+
+ ITestServicePtr GetTestService()
+ {
+ return Host_.GetTestService();
+ }
+
+ IServerPtr GetServer()
+ {
+ return Host_.GetServer();
+ }
+
static bool CheckCancelCode(TErrorCode code)
{
if (code == NYT::EErrorCode::Canceled) {
@@ -213,7 +267,7 @@ public:
private:
NConcurrency::IThreadPoolPtr WorkerPool_;
- TTestNodeMemoryTrackerPtr MemoryUsageTracker_;
+ TTestServerHost Host_;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/rpc/unittests/lib/test_service.cpp b/yt/yt/core/rpc/unittests/lib/test_service.cpp
index 0e51ffe6e7..5794a239cd 100644
--- a/yt/yt/core/rpc/unittests/lib/test_service.cpp
+++ b/yt/yt/core/rpc/unittests/lib/test_service.cpp
@@ -28,10 +28,12 @@ public:
TTestService(
IInvokerPtr invoker,
bool secure,
- TTestCreateChannelCallback createChannel)
+ TTestCreateChannelCallback createChannel,
+ IMemoryUsageTrackerPtr memoryUsageTracker)
: TServiceBase(
invoker,
TTestProxy::GetDescriptor(),
+ std::move(memoryUsageTracker),
NLogging::TLogger("Main"))
, Secure_(secure)
, CreateChannel_(createChannel)
@@ -380,9 +382,14 @@ private:
ITestServicePtr CreateTestService(
IInvokerPtr invoker,
bool secure,
- TTestCreateChannelCallback createChannel)
+ TTestCreateChannelCallback createChannel,
+ IMemoryUsageTrackerPtr memoryUsageTracker)
{
- return New<TTestService>(invoker, secure, createChannel);
+ return New<TTestService>(
+ invoker,
+ secure,
+ createChannel,
+ std::move(memoryUsageTracker));
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/rpc/unittests/lib/test_service.h b/yt/yt/core/rpc/unittests/lib/test_service.h
index da18cf132d..45d261f002 100644
--- a/yt/yt/core/rpc/unittests/lib/test_service.h
+++ b/yt/yt/core/rpc/unittests/lib/test_service.h
@@ -70,7 +70,8 @@ using TTestCreateChannelCallback = TCallback<IChannelPtr(const TString& address)
ITestServicePtr CreateTestService(
IInvokerPtr invoker,
bool secure,
- TTestCreateChannelCallback createChannel);
+ TTestCreateChannelCallback createChannel,
+ IMemoryUsageTrackerPtr memoryUsageTracker);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/rpc/unittests/mock/service.h b/yt/yt/core/rpc/unittests/mock/service.h
index 21204b12d9..d99c1a9370 100644
--- a/yt/yt/core/rpc/unittests/mock/service.h
+++ b/yt/yt/core/rpc/unittests/mock/service.h
@@ -49,6 +49,13 @@ public:
);
MOCK_METHOD(
+ i64,
+ GetTotalSize,
+ (),
+ (const, override)
+ );
+
+ MOCK_METHOD(
NYTree::IAttributeDictionary&,
GetEndpointAttributes,
(),
@@ -329,6 +336,13 @@ public:
);
MOCK_METHOD(
+ const IMemoryUsageTrackerPtr&,
+ GetMemoryUsageTracker,
+ (),
+ (const, override)
+ );
+
+ MOCK_METHOD(
const NLogging::TLogger&,
GetLogger,
(),
diff --git a/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp b/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp
index dce15890fd..f8bc40e8ed 100644
--- a/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp
+++ b/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp
@@ -29,7 +29,7 @@ TYPED_TEST_SUITE(TRpcTest, TAllTransports);
TYPED_TEST(TRpcTest, ResponseWithAllocationTags)
{
- auto memoryUsageTracker = this->GetNodeMemoryUsageTracker();
+ auto memoryUsageTracker = this->GetMemoryUsageTracker();
auto previousLimit = memoryUsageTracker->GetLimit();
memoryUsageTracker->SetLimit(2_GB);
static TMemoryTag testMemoryTag = 1 << 20;
@@ -79,7 +79,7 @@ TYPED_TEST(TRpcTest, ResponseWithAllocationTags)
}
auto memoryUsageBefore = CollectMemoryUsageSnapshot()->GetUsage(MemoryAllocationTag, ToString(testMemoryTag));
- EXPECT_LE(memoryUsageBefore, numberOfLoops * 1536_KB);
+ EXPECT_LE(memoryUsageBefore, numberOfLoops * 2048_KB);
for (const auto& rsp : responses) {
WaitFor(rsp).ValueOrThrow();
diff --git a/yt/yt/core/rpc/unittests/rpc_ut.cpp b/yt/yt/core/rpc/unittests/rpc_ut.cpp
index a28e9e0a37..eb3e393603 100644
--- a/yt/yt/core/rpc/unittests/rpc_ut.cpp
+++ b/yt/yt/core/rpc/unittests/rpc_ut.cpp
@@ -250,7 +250,7 @@ TYPED_TEST(TNotGrpcTest, ServerStreamsAborted)
auto rspOrError = WaitFor(req->Invoke());
EXPECT_EQ(NYT::EErrorCode::Timeout, rspOrError.GetCode());
- WaitFor(this->TestService_->GetServerStreamsAborted())
+ WaitFor(this->GetTestService()->GetServerStreamsAborted())
.ThrowOnError();
}
@@ -339,7 +339,7 @@ TYPED_TEST(TNotGrpcTest, ServerNotReading)
EXPECT_EQ(expectedInvokeErrorCode, rspOrError.GetCode());
}
- WaitFor(this->TestService_->GetSlowCallCanceled())
+ WaitFor(this->GetTestService()->GetSlowCallCanceled())
.ThrowOnError();
}
@@ -365,7 +365,7 @@ TYPED_TEST(TNotGrpcTest, ServerNotWriting)
EXPECT_EQ(expectedInvokeErrorCode, rspOrError.GetCode());
}
- WaitFor(this->TestService_->GetSlowCallCanceled())
+ WaitFor(this->GetTestService()->GetSlowCallCanceled())
.ThrowOnError();
}
@@ -400,7 +400,7 @@ TYPED_TEST(TNotGrpcTest, VeryLaggyStreamingRequest)
};
})");
auto config = ConvertTo<TServerConfigPtr>(TYsonString(configText));
- this->Server_->Configure(config);
+ this->GetServer()->Configure(config);
TTestProxy proxy(this->CreateChannel());
proxy.DefaultServerAttachmentsStreamingParameters().ReadTimeout = TDuration::MilliSeconds(500);
@@ -506,6 +506,36 @@ TYPED_TEST(TRpcTest, RegularAttachments)
EXPECT_EQ("TTestProxy_", StringFromSharedRef(attachments[2]));
}
+TYPED_TEST(TNotGrpcTest, TrackedRegularAttachments)
+{
+ TTestProxy proxy(this->CreateChannel());
+ auto req = proxy.RegularAttachments();
+
+ auto memoryUsageTracker = this->GetMemoryUsageTracker();
+ memoryUsageTracker->ClearTotalUsage();
+
+ req->Attachments().push_back(TSharedRef::FromString("Hello"));
+ req->Attachments().push_back(TSharedRef::FromString("from"));
+ req->Attachments().push_back(TSharedRef::FromString("TTestProxy"));
+
+ auto rspOrError = req->Invoke().Get();
+ EXPECT_TRUE(rspOrError.IsOK());
+ const auto& rsp = rspOrError.Value();
+
+ const auto& attachments = rsp->Attachments();
+ // Attachment allocator proactively allocate slice of 4 KB.
+ // See NYT::NBus::TPacketDecoder::TChunkedMemoryTrackingAllocator::Allocate.
+ // default stub = 4096.
+ // header + body = 110 bytes.
+ // attachments = 22 bytes.
+ // sum is 4228 bytes.
+ EXPECT_EQ(4228 + 32768, memoryUsageTracker->GetTotalUsage());
+ EXPECT_EQ(3u, attachments.size());
+ EXPECT_EQ("Hello_", StringFromSharedRef(attachments[0]));
+ EXPECT_EQ("from_", StringFromSharedRef(attachments[1]));
+ EXPECT_EQ("TTestProxy_", StringFromSharedRef(attachments[2]));
+}
+
TYPED_TEST(TRpcTest, NullAndEmptyAttachments)
{
TTestProxy proxy(this->CreateChannel());
@@ -530,6 +560,9 @@ TYPED_TEST(TNotGrpcTest, Compression)
const auto requestCodecId = NCompression::ECodec::Zstd_2;
const auto responseCodecId = NCompression::ECodec::Snappy;
+ auto memoryUsageTracker = this->GetMemoryUsageTracker();
+ memoryUsageTracker->ClearTotalUsage();
+
TString message("This is a message string.");
std::vector<TString> attachmentStrings({
"This is an attachment string.",
@@ -554,6 +587,15 @@ TYPED_TEST(TNotGrpcTest, Compression)
EXPECT_TRUE(rspOrError.IsOK());
auto rsp = rspOrError.Value();
+ // Attachment allocator proactively allocate slice of 4 KB.
+ // 32 KB - is read/write buffers per connection.
+ // See NYT::NBus::TPacketDecoder::TChunkedMemoryTrackingAllocator::Allocate.
+ // default stub = 4096.
+ // attachmentStrings[0].size() = 29 * 2 bytes from decoder.
+ // attachmentStrings[1].size() = 36 * 2 bytes from decoder.
+ // attachmentStrings[2].size() = 90 * 2 bytes from decoder.
+ // sum is 4591 bytes.
+ EXPECT_EQ(4591 + 32768, memoryUsageTracker->GetTotalUsage());
EXPECT_TRUE(rsp->message() == message);
EXPECT_TRUE(rsp->GetResponseMessage().Size() >= 2);
const auto& serializedResponseBody = SerializeProtoToRefWithCompression(*rsp, responseCodecId);
@@ -620,7 +662,7 @@ TYPED_TEST(TNotGrpcTest, RequestBytesThrottling)
};
})");
auto config = ConvertTo<TServerConfigPtr>(TYsonString(configText));
- this->Server_->Configure(config);
+ this->GetServer()->Configure(config);
TTestProxy proxy(this->CreateChannel());
@@ -640,8 +682,7 @@ TYPED_TEST(TNotGrpcTest, RequestBytesThrottling)
EXPECT_LE(std::abs(static_cast<i64>(timer.GetElapsedTime().MilliSeconds()) - 3000), 200);
}
-// Now test different types of errors
-
+// Now test different types of errors.
TYPED_TEST(TRpcTest, OK)
{
TTestProxy proxy(this->CreateChannel());
@@ -710,7 +751,7 @@ TYPED_TEST(TRpcTest, ServerTimeout)
auto req = proxy.SlowCanceledCall();
auto rspOrError = req->Invoke().Get();
EXPECT_TRUE(this->CheckTimeoutCode(rspOrError.GetCode()));
- WaitFor(this->TestService_->GetSlowCallCanceled())
+ WaitFor(this->GetTestService()->GetSlowCallCanceled())
.ThrowOnError();
}
@@ -726,7 +767,7 @@ TYPED_TEST(TRpcTest, ClientCancel)
EXPECT_TRUE(asyncRspOrError.IsSet());
auto rspOrError = asyncRspOrError.Get();
EXPECT_TRUE(this->CheckCancelCode(rspOrError.GetCode()));
- WaitFor(this->TestService_->GetSlowCallCanceled())
+ WaitFor(this->GetTestService()->GetSlowCallCanceled())
.ThrowOnError();
}
@@ -744,7 +785,7 @@ TYPED_TEST(TRpcTest, RequestQueueSizeLimit)
std::vector<TFuture<void>> futures;
std::vector<TTestProxy> proxies;
- // Concurrency byte limit + queue byte size limit = 10 + 20 = 30
+ // Concurrency byte limit + queue byte size limit = 10 + 20 = 30.
// First 30 requests must be successful, 31st request must be failed.
for (int i = 0; i < 30; ++i) {
proxies.push_back(TTestProxy(this->CreateChannel()));
@@ -767,11 +808,30 @@ TYPED_TEST(TRpcTest, RequestQueueSizeLimit)
EXPECT_TRUE(AllSucceeded(std::move(futures)).Get().IsOK());
}
+TYPED_TEST(TNotGrpcTest, RequestMemoryOverflowException)
+{
+ auto memoryUsageTracker = this->GetMemoryUsageTracker();
+ memoryUsageTracker->ClearTotalUsage();
+ auto memoryReferenceUsageTracker = this->GetMemoryUsageTracker();
+ memoryReferenceUsageTracker->ClearTotalUsage();
+
+ TTestProxy proxy(this->CreateChannel());
+ proxy.SetDefaultTimeout(TDuration::Seconds(10.0));
+ auto req = proxy.SomeCall();
+ req->set_a(42);
+ req->Attachments().push_back(TSharedRef::FromString(TString(34_MB, 'x')));
+ auto result = WaitFor(req->Invoke().AsVoid());
+
+ // Limit of memory is 32 MB.
+ EXPECT_EQ(NRpc::EErrorCode::MemoryOverflow, req->Invoke().Get().GetCode());
+}
+
TYPED_TEST(TNotGrpcTest, MemoryTracking)
{
TTestProxy proxy(this->CreateChannel());
- auto memoryUsageTracker = this->GetNodeMemoryUsageTracker();
+ auto memoryUsageTracker = this->GetMemoryUsageTracker();
memoryUsageTracker->ClearTotalUsage();
+
proxy.SetDefaultTimeout(TDuration::Seconds(10.0));
for (int i = 0; i < 300; ++i) {
auto req = proxy.SomeCall();
@@ -779,15 +839,20 @@ TYPED_TEST(TNotGrpcTest, MemoryTracking)
WaitFor(req->Invoke().AsVoid()).ThrowOnError();
}
+ Sleep(TDuration::MilliSeconds(200));
+
{
auto rpcUsage = memoryUsageTracker->GetTotalUsage();
- EXPECT_EQ(rpcUsage, (static_cast<i64>(32_KB)));
+
+ // 1261568 = 32768 + 1228800 = 32768 + 4096 * 300 + 300 * 110 (header + body).
+ // 32768 - socket buffers, 4096 - default size per request.
+ EXPECT_EQ(rpcUsage, 1294568);
}
}
TYPED_TEST(TNotGrpcTest, MemoryTrackingMultipleConnections)
{
- auto memoryUsageTracker = this->GetNodeMemoryUsageTracker();
+ auto memoryUsageTracker = this->GetMemoryUsageTracker();
memoryUsageTracker->ClearTotalUsage();
for (int i = 0; i < 300; ++i) {
TTestProxy proxy(this->CreateChannel());
@@ -798,15 +863,18 @@ TYPED_TEST(TNotGrpcTest, MemoryTrackingMultipleConnections)
}
{
- auto rpcUsage = memoryUsageTracker->GetTotalUsage();
- EXPECT_EQ(rpcUsage, (static_cast<i64>(32_KB) * 300));
+ // 11059200 / 300 = 36974 = 32768 + 4096 + 110 (header + body).
+ // 4 KB - stub for request.
+ // See NYT::NBus::TPacketDecoder::TChunkedMemoryTrackingAllocator::Allocate.
+ EXPECT_EQ(11092200, memoryUsageTracker->GetTotalUsage());
}
}
TYPED_TEST(TNotGrpcTest, MemoryTrackingMultipleConcurrent)
{
- auto memoryUsageTracker = this->GetNodeMemoryUsageTracker();
+ auto memoryUsageTracker = this->GetMemoryUsageTracker();
memoryUsageTracker->ClearTotalUsage();
+
std::vector<TFuture<void>> futures;
std::vector<TTestProxy> proxies;
@@ -820,15 +888,44 @@ TYPED_TEST(TNotGrpcTest, MemoryTrackingMultipleConcurrent)
futures.push_back(req->Invoke().AsVoid());
}
- Sleep(TDuration::MilliSeconds(300));
+ Sleep(TDuration::MilliSeconds(100));
+
{
auto rpcUsage = memoryUsageTracker->GetUsed();
- // 20 = concurrency (10) + queue (20)
- EXPECT_EQ(rpcUsage, (static_cast<i64>(32_KB) * 40));
+
+ // connections count - per connection size.
+ // 40 per connections, 30 per request (concurrency + queue = 10 + 20 = 30). Each request 4096 - by default + 108 (body + header).
+ EXPECT_TRUE(rpcUsage > (static_cast<i64>(32_KB) * 40));
}
+
EXPECT_TRUE(AllSet(std::move(futures)).Get().IsOK());
}
+TYPED_TEST(TNotGrpcTest, MemoryOvercommit)
+{
+ const auto requestCodecId = NCompression::ECodec::Zstd_2;
+
+ auto memoryReferenceUsageTracker = this->GetMemoryUsageTracker();
+ memoryReferenceUsageTracker->ClearTotalUsage();
+
+ TTestProxy proxy(this->CreateChannel());
+ proxy.SetDefaultTimeout(TDuration::Seconds(60.0));
+ auto req = proxy.SlowCall();
+ req->set_request_codec(static_cast<int>(requestCodecId));
+ req->Attachments().push_back(TSharedRef::FromString(TString(6_KB, 'x')));
+ WaitFor(req->Invoke()).ThrowOnError();
+ {
+ auto rpcUsage = memoryReferenceUsageTracker->GetTotalUsage();
+
+ // Attachment allocator proactively allocate slice of 4 KB.
+ // See NYT::NBus::TPacketDecoder::TChunkedMemoryTrackingAllocator::Allocate.
+ // default stub = 4096.
+ // header + body = 110 bytes.
+ // attachments = 6_KB kbytes.
+ EXPECT_EQ(rpcUsage, 32768 + 4096 + 6144 + 110);
+ }
+}
+
TYPED_TEST(TNotGrpcTest, RequestQueueByteSizeLimit)
{
const auto requestCodecId = NCompression::ECodec::Zstd_2;
@@ -837,7 +934,7 @@ TYPED_TEST(TNotGrpcTest, RequestQueueByteSizeLimit)
std::vector<TTestProxy> proxies;
// Every request contains 2 MB, 15 requests contain 30 MB.
- // Concurrency byte limit + queue byte size limit = 10 MB + 20 MB = 30 MB
+ // Concurrency byte limit + queue byte size limit = 10 MB + 20 MB = 30 MB.
// First 15 requests must be successful, 16th request must be failed.
for (int i = 0; i < 15; ++i) {
proxies.push_back(TTestProxy(this->CreateChannel()));
@@ -866,17 +963,19 @@ TYPED_TEST(TNotGrpcTest, RequestQueueByteSizeLimit)
TYPED_TEST(TRpcTest, ConcurrencyLimit)
{
- TTestProxy proxy(this->CreateChannel());
std::vector<TFuture<void>> futures;
for (int i = 0; i < 10; ++i) {
+ TTestProxy proxy(this->CreateChannel());
+ proxy.SetDefaultTimeout(TDuration::Seconds(10.0));
auto req = proxy.SlowCall();
futures.push_back(req->Invoke().AsVoid());
}
- Sleep(TDuration::MilliSeconds(100));
+ Sleep(TDuration::MilliSeconds(200));
TFuture<void> backlogFuture;
{
+ TTestProxy proxy(this->CreateChannel());
auto req = proxy.SlowCall();
backlogFuture = req->Invoke().AsVoid();
}
@@ -908,7 +1007,7 @@ TYPED_TEST(TRpcTest, CustomErrorMessage)
TYPED_TEST(TRpcTest, ServerStopped)
{
- this->Server_->Stop().Get().ThrowOnError();
+ this->GetServer()->Stop().Get().ThrowOnError();
TTestProxy proxy(this->CreateChannel());
auto req = proxy.SomeCall();
req->set_a(42);
@@ -926,14 +1025,14 @@ TYPED_TEST(TRpcTest, ConnectionLost)
Sleep(TDuration::Seconds(0.5));
EXPECT_FALSE(asyncRspOrError.IsSet());
- YT_UNUSED_FUTURE(this->Server_->Stop(false));
+ YT_UNUSED_FUTURE(this->GetServer()->Stop(false));
Sleep(TDuration::Seconds(2));
EXPECT_TRUE(asyncRspOrError.IsSet());
auto rspOrError = asyncRspOrError.Get();
EXPECT_EQ(NRpc::EErrorCode::TransportError, rspOrError.GetCode());
- WaitFor(this->TestService_->GetSlowCallCanceled())
+ WaitFor(this->GetTestService()->GetSlowCallCanceled())
.ThrowOnError();
}
@@ -988,7 +1087,7 @@ TYPED_TEST(TNotGrpcTest, RequiredClientFeatureNotSupported)
TYPED_TEST(TRpcTest, StopWithoutActiveRequests)
{
- auto stopResult = this->TestService_->Stop();
+ auto stopResult = this->GetTestService()->Stop();
EXPECT_TRUE(stopResult.IsSet());
}
@@ -997,8 +1096,11 @@ TYPED_TEST(TRpcTest, StopWithActiveRequests)
TTestProxy proxy(this->CreateChannel());
auto req = proxy.SlowCall();
auto reqResult = req->Invoke();
+
Sleep(TDuration::Seconds(0.5));
- auto stopResult = this->TestService_->Stop();
+
+ auto stopResult = this->GetTestService()->Stop();
+
EXPECT_FALSE(stopResult.IsSet());
EXPECT_TRUE(reqResult.Get().IsOK());
Sleep(TDuration::Seconds(0.5));
@@ -1007,11 +1109,13 @@ TYPED_TEST(TRpcTest, StopWithActiveRequests)
TYPED_TEST(TRpcTest, NoMoreRequestsAfterStop)
{
- auto stopResult = this->TestService_->Stop();
+ auto stopResult = this->GetTestService()->Stop();
EXPECT_TRUE(stopResult.IsSet());
+
TTestProxy proxy(this->CreateChannel());
auto req = proxy.SlowCall();
auto reqResult = req->Invoke();
+
EXPECT_FALSE(reqResult.Get().IsOK());
}
diff --git a/yt/yt/core/ytree/ypath_detail.cpp b/yt/yt/core/ytree/ypath_detail.cpp
index 1cc159b24f..0e15508993 100644
--- a/yt/yt/core/ytree/ypath_detail.cpp
+++ b/yt/yt/core/ytree/ypath_detail.cpp
@@ -1775,6 +1775,8 @@ IYPathServiceContextPtr CreateYPathContext(
return New<TYPathServiceContext>(
std::move(requestMessage),
+ TMemoryUsageTrackerGuard{},
+ GetNullMemoryUsageTracker(),
std::move(logger),
logLevel);
}
@@ -1790,6 +1792,8 @@ IYPathServiceContextPtr CreateYPathContext(
return New<TYPathServiceContext>(
std::move(requestHeader),
std::move(requestMessage),
+ TMemoryUsageTrackerGuard{},
+ GetNullMemoryUsageTracker(),
std::move(logger),
logLevel);
}