diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-02-16 10:13:56 +0300 |
---|---|---|
committer | Innokentii Mokin <innokentii@ydb.tech> | 2024-02-16 18:35:24 +0000 |
commit | e3fde7188076c08ea40e941d7e4b28c76c2e28ad (patch) | |
tree | ee02e3c1619dbc6dab65b799ae2128b87175b334 /yt | |
parent | 161fd436046f9d08f484b0bfd7b264fa004ba6c5 (diff) | |
download | ydb-e3fde7188076c08ea40e941d7e4b28c76c2e28ad.tar.gz |
Intermediate changes
Diffstat (limited to 'yt')
-rw-r--r-- | yt/yt/core/rpc/config.cpp | 3 | ||||
-rw-r--r-- | yt/yt/core/rpc/config.h | 1 | ||||
-rw-r--r-- | yt/yt/core/rpc/service_detail.cpp | 58 | ||||
-rw-r--r-- | yt/yt/core/rpc/service_detail.h | 14 |
4 files changed, 64 insertions, 12 deletions
diff --git a/yt/yt/core/rpc/config.cpp b/yt/yt/core/rpc/config.cpp index ec29c3b88c..cd0d4a28a5 100644 --- a/yt/yt/core/rpc/config.cpp +++ b/yt/yt/core/rpc/config.cpp @@ -98,6 +98,9 @@ void TMethodConfig::Register(TRegistrar registrar) registrar.Parameter("queue_size_limit", &TThis::QueueSizeLimit) .Alias("max_queue_size") .Optional(); + registrar.Parameter("queue_bytes_size_limit", &TThis::QueueBytesSizeLimit) + .Alias("max_queue_bytes_size") + .Optional(); registrar.Parameter("concurrency_limit", &TThis::ConcurrencyLimit) .Alias("max_concurrency") .Optional(); diff --git a/yt/yt/core/rpc/config.h b/yt/yt/core/rpc/config.h index 9cdf971d18..26214ea5fc 100644 --- a/yt/yt/core/rpc/config.h +++ b/yt/yt/core/rpc/config.h @@ -152,6 +152,7 @@ class TMethodConfig public: std::optional<bool> Heavy; std::optional<int> QueueSizeLimit; + std::optional<i64> QueueBytesSizeLimit; std::optional<int> ConcurrencyLimit; std::optional<NLogging::ELogLevel> LogLevel; std::optional<TDuration> LoggingSuppressionTimeout; diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp index 92e81ea0ed..5c37360540 100644 --- a/yt/yt/core/rpc/service_detail.cpp +++ b/yt/yt/core/rpc/service_detail.cpp @@ -166,6 +166,13 @@ auto TServiceBase::TMethodDescriptor::SetQueueSizeLimit(int value) const -> TMet return result; } +auto TServiceBase::TMethodDescriptor::SetQueueBytesSizeLimit(i64 value) const -> TMethodDescriptor +{ + auto result = *this; + result.QueueBytesSizeLimit = value; + return result; +} + auto TServiceBase::TMethodDescriptor::SetConcurrencyLimit(int value) const -> TMethodDescriptor { auto result = *this; @@ -271,6 +278,7 @@ TServiceBase::TRuntimeMethodInfo::TRuntimeMethodInfo( , ResponseLoggingAnchor(NLogging::TLogManager::Get()->RegisterDynamicAnchor( Format("%v.%v ->", ServiceId.ServiceName, Descriptor.Method))) , RequestQueueSizeLimitErrorCounter(Profiler.Counter("/request_queue_size_errors")) + , RequestQueueBytesSizeLimitErrorCounter(Profiler.Counter("/request_queue_bytes_size_errors")) , UnauthenticatedRequestsCounter(Profiler.Counter("/unauthenticated_requests")) , LoggingSuppressionFailedRequestThrottler( CreateReconfigurableThroughputThrottler( @@ -1313,13 +1321,18 @@ void TRequestQueue::ConfigureWeightThrottler(const TThroughputThrottlerConfigPtr WeightThrottler_.Reconfigure(config); } -bool TRequestQueue::IsQueueLimitSizeExceeded() const +bool TRequestQueue::IsQueueSizeLimitExceeded() const { - return - QueueSize_.load(std::memory_order::relaxed) >= + return QueueSize_.load(std::memory_order::relaxed) >= RuntimeInfo_->QueueSizeLimit.load(std::memory_order::relaxed); } +bool TRequestQueue::IsQueueBytesSizeLimitExceeded() const +{ + return QueueBytesSize_.load(std::memory_order::relaxed) >= + RuntimeInfo_->QueueBytesSizeLimit.load(std::memory_order::relaxed); +} + int TRequestQueue::GetQueueSize() const { return QueueSize_.load(std::memory_order::relaxed); @@ -1343,7 +1356,7 @@ void TRequestQueue::OnRequestArrived(TServiceBase::TServiceContextPtr context) // Slow path. DecrementConcurrency(); - IncrementQueueSize(); + IncrementQueueSize(context); context->BeforeEnqueued(); YT_VERIFY(Queue_.enqueue(std::move(context))); @@ -1401,7 +1414,7 @@ void TRequestQueue::ScheduleRequestsFromQueue() return; } - DecrementQueueSize(); + DecrementQueueSize(context); context->AfterDequeued(); if (context->IsCanceled()) { context.Reset(); @@ -1432,15 +1445,26 @@ void TRequestQueue::RunRequest(TServiceBase::TServiceContextPtr context) } } -int TRequestQueue::IncrementQueueSize() +void TRequestQueue::IncrementQueueSize(const TServiceBase::TServiceContextPtr& context) { - return ++QueueSize_; + ++QueueSize_; + + auto requestSize = + GetMessageBodySize(context->GetRequestMessage()) + + GetTotalMessageAttachmentSize(context->GetRequestMessage()); + QueueBytesSize_ += requestSize; } -void TRequestQueue::DecrementQueueSize() +void TRequestQueue::DecrementQueueSize(const TServiceBase::TServiceContextPtr& context) { auto newQueueSize = --QueueSize_; YT_ASSERT(newQueueSize >= 0); + + auto requestSize = + GetMessageBodySize(context->GetRequestMessage()) + + GetTotalMessageAttachmentSize(context->GetRequestMessage()); + auto oldQueueBytesSize = QueueBytesSize_.fetch_sub(requestSize); + YT_ASSERT(oldQueueBytesSize >= requestSize); } int TRequestQueue::IncrementConcurrency() @@ -1610,7 +1634,7 @@ void TServiceBase::HandleRequest( auto maybeThrottled = GetThrottledError(*header); - if (requestQueue->IsQueueLimitSizeExceeded()) { + if (requestQueue->IsQueueSizeLimitExceeded()) { runtimeInfo->RequestQueueSizeLimitErrorCounter.Increment(); replyError(TError( NRpc::EErrorCode::RequestQueueSizeLimitExceeded, @@ -1621,6 +1645,17 @@ void TServiceBase::HandleRequest( return; } + if (requestQueue->IsQueueBytesSizeLimitExceeded()) { + runtimeInfo->RequestQueueBytesSizeLimitErrorCounter.Increment(); + replyError(TError( + NRpc::EErrorCode::RequestQueueSizeLimitExceeded, + "Request queue bytes size limit exceeded") + << TErrorAttribute("limit", runtimeInfo->QueueBytesSizeLimit.load()) + << TErrorAttribute("queue", requestQueue->GetName()) + << maybeThrottled); + return; + } + TCurrentTraceContextGuard traceContextGuard(traceContext); // NOTE: Do not use replyError() after this line. @@ -2387,6 +2422,7 @@ TServiceBase::TRuntimeMethodInfoPtr TServiceBase::RegisterMethod(const TMethodDe runtimeInfo->Heavy.store(descriptor.Options.Heavy); runtimeInfo->QueueSizeLimit.store(descriptor.QueueSizeLimit); + runtimeInfo->QueueBytesSizeLimit.store(descriptor.QueueBytesSizeLimit); runtimeInfo->ConcurrencyLimit.Reconfigure(descriptor.ConcurrencyLimit); runtimeInfo->LogLevel.store(descriptor.LogLevel); runtimeInfo->LoggingSuppressionTimeout.store(descriptor.LoggingSuppressionTimeout); @@ -2398,6 +2434,9 @@ TServiceBase::TRuntimeMethodInfoPtr TServiceBase::RegisterMethod(const TMethodDe profiler.AddFuncGauge("/request_queue_size_limit", MakeStrong(this), [=] { return runtimeInfo->QueueSizeLimit.load(std::memory_order::relaxed); }); + profiler.AddFuncGauge("/request_queue_bytes_size_limit", MakeStrong(this), [=] { + return runtimeInfo->QueueBytesSizeLimit.load(std::memory_order::relaxed); + }); profiler.AddFuncGauge("/concurrency_limit", MakeStrong(this), [=] { return runtimeInfo->ConcurrencyLimit.GetDynamicLimit(); }); @@ -2461,6 +2500,7 @@ void TServiceBase::DoConfigure( runtimeInfo->Heavy.store(methodConfig->Heavy.value_or(descriptor.Options.Heavy)); runtimeInfo->QueueSizeLimit.store(methodConfig->QueueSizeLimit.value_or(descriptor.QueueSizeLimit)); + runtimeInfo->QueueBytesSizeLimit.store(methodConfig->QueueBytesSizeLimit.value_or(descriptor.QueueBytesSizeLimit)); runtimeInfo->ConcurrencyLimit.Reconfigure(methodConfig->ConcurrencyLimit.value_or(descriptor.ConcurrencyLimit)); runtimeInfo->LogLevel.store(methodConfig->LogLevel.value_or(descriptor.LogLevel)); runtimeInfo->LoggingSuppressionTimeout.store(methodConfig->LoggingSuppressionTimeout.value_or(descriptor.LoggingSuppressionTimeout)); diff --git a/yt/yt/core/rpc/service_detail.h b/yt/yt/core/rpc/service_detail.h index ba52cf514b..87a9458431 100644 --- a/yt/yt/core/rpc/service_detail.h +++ b/yt/yt/core/rpc/service_detail.h @@ -560,6 +560,9 @@ protected: //! Maximum number of requests in queue (both waiting and executing). int QueueSizeLimit = 10'000; + //! Maximum total size of requests in queue (both waiting and executing). + i64 QueueBytesSizeLimit = 2_GB; + //! Maximum number of requests executing concurrently. int ConcurrencyLimit = 10'000; @@ -597,6 +600,7 @@ protected: TMethodDescriptor SetHeavy(bool value) const; TMethodDescriptor SetResponseCodec(NCompression::ECodec value) const; TMethodDescriptor SetQueueSizeLimit(int value) const; + TMethodDescriptor SetQueueBytesSizeLimit(i64 value) const; TMethodDescriptor SetConcurrencyLimit(int value) const; TMethodDescriptor SetSystem(bool value) const; TMethodDescriptor SetLogLevel(NLogging::ELogLevel value) const; @@ -700,11 +704,13 @@ protected: std::atomic<bool> Pooled = true; std::atomic<int> QueueSizeLimit = 0; + std::atomic<i64> QueueBytesSizeLimit = 0; TDynamicConcurrencyLimit ConcurrencyLimit; std::atomic<double> WaitingTimeoutFraction = 0; NProfiling::TCounter RequestQueueSizeLimitErrorCounter; + NProfiling::TCounter RequestQueueBytesSizeLimitErrorCounter; NProfiling::TCounter UnauthenticatedRequestsCounter; std::atomic<NLogging::ELogLevel> LogLevel = {}; @@ -1019,7 +1025,8 @@ public: bool Register(TServiceBase* service, TServiceBase::TRuntimeMethodInfo* runtimeInfo); void Configure(const TMethodConfigPtr& config); - bool IsQueueLimitSizeExceeded() const; + bool IsQueueSizeLimitExceeded() const; + bool IsQueueBytesSizeLimitExceeded() const; int GetQueueSize() const; int GetConcurrency() const; @@ -1054,14 +1061,15 @@ private: std::atomic<bool> Throttled_ = false; std::atomic<int> QueueSize_ = 0; + std::atomic<i64> QueueBytesSize_ = 0; moodycamel::ConcurrentQueue<TServiceBase::TServiceContextPtr> Queue_; void ScheduleRequestsFromQueue(); void RunRequest(TServiceBase::TServiceContextPtr context); - int IncrementQueueSize(); - void DecrementQueueSize(); + void IncrementQueueSize(const TServiceBase::TServiceContextPtr& context); + void DecrementQueueSize(const TServiceBase::TServiceContextPtr& context); int IncrementConcurrency(); void DecrementConcurrency(); |