aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-02-16 10:13:56 +0300
committerInnokentii Mokin <innokentii@ydb.tech>2024-02-16 18:35:24 +0000
commite3fde7188076c08ea40e941d7e4b28c76c2e28ad (patch)
treeee02e3c1619dbc6dab65b799ae2128b87175b334 /yt
parent161fd436046f9d08f484b0bfd7b264fa004ba6c5 (diff)
downloadydb-e3fde7188076c08ea40e941d7e4b28c76c2e28ad.tar.gz
Intermediate changes
Diffstat (limited to 'yt')
-rw-r--r--yt/yt/core/rpc/config.cpp3
-rw-r--r--yt/yt/core/rpc/config.h1
-rw-r--r--yt/yt/core/rpc/service_detail.cpp58
-rw-r--r--yt/yt/core/rpc/service_detail.h14
4 files changed, 64 insertions, 12 deletions
diff --git a/yt/yt/core/rpc/config.cpp b/yt/yt/core/rpc/config.cpp
index ec29c3b88c..cd0d4a28a5 100644
--- a/yt/yt/core/rpc/config.cpp
+++ b/yt/yt/core/rpc/config.cpp
@@ -98,6 +98,9 @@ void TMethodConfig::Register(TRegistrar registrar)
registrar.Parameter("queue_size_limit", &TThis::QueueSizeLimit)
.Alias("max_queue_size")
.Optional();
+ registrar.Parameter("queue_bytes_size_limit", &TThis::QueueBytesSizeLimit)
+ .Alias("max_queue_bytes_size")
+ .Optional();
registrar.Parameter("concurrency_limit", &TThis::ConcurrencyLimit)
.Alias("max_concurrency")
.Optional();
diff --git a/yt/yt/core/rpc/config.h b/yt/yt/core/rpc/config.h
index 9cdf971d18..26214ea5fc 100644
--- a/yt/yt/core/rpc/config.h
+++ b/yt/yt/core/rpc/config.h
@@ -152,6 +152,7 @@ class TMethodConfig
public:
std::optional<bool> Heavy;
std::optional<int> QueueSizeLimit;
+ std::optional<i64> QueueBytesSizeLimit;
std::optional<int> ConcurrencyLimit;
std::optional<NLogging::ELogLevel> LogLevel;
std::optional<TDuration> LoggingSuppressionTimeout;
diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp
index 92e81ea0ed..5c37360540 100644
--- a/yt/yt/core/rpc/service_detail.cpp
+++ b/yt/yt/core/rpc/service_detail.cpp
@@ -166,6 +166,13 @@ auto TServiceBase::TMethodDescriptor::SetQueueSizeLimit(int value) const -> TMet
return result;
}
+auto TServiceBase::TMethodDescriptor::SetQueueBytesSizeLimit(i64 value) const -> TMethodDescriptor
+{
+ auto result = *this;
+ result.QueueBytesSizeLimit = value;
+ return result;
+}
+
auto TServiceBase::TMethodDescriptor::SetConcurrencyLimit(int value) const -> TMethodDescriptor
{
auto result = *this;
@@ -271,6 +278,7 @@ TServiceBase::TRuntimeMethodInfo::TRuntimeMethodInfo(
, ResponseLoggingAnchor(NLogging::TLogManager::Get()->RegisterDynamicAnchor(
Format("%v.%v ->", ServiceId.ServiceName, Descriptor.Method)))
, RequestQueueSizeLimitErrorCounter(Profiler.Counter("/request_queue_size_errors"))
+ , RequestQueueBytesSizeLimitErrorCounter(Profiler.Counter("/request_queue_bytes_size_errors"))
, UnauthenticatedRequestsCounter(Profiler.Counter("/unauthenticated_requests"))
, LoggingSuppressionFailedRequestThrottler(
CreateReconfigurableThroughputThrottler(
@@ -1313,13 +1321,18 @@ void TRequestQueue::ConfigureWeightThrottler(const TThroughputThrottlerConfigPtr
WeightThrottler_.Reconfigure(config);
}
-bool TRequestQueue::IsQueueLimitSizeExceeded() const
+bool TRequestQueue::IsQueueSizeLimitExceeded() const
{
- return
- QueueSize_.load(std::memory_order::relaxed) >=
+ return QueueSize_.load(std::memory_order::relaxed) >=
RuntimeInfo_->QueueSizeLimit.load(std::memory_order::relaxed);
}
+bool TRequestQueue::IsQueueBytesSizeLimitExceeded() const
+{
+ return QueueBytesSize_.load(std::memory_order::relaxed) >=
+ RuntimeInfo_->QueueBytesSizeLimit.load(std::memory_order::relaxed);
+}
+
int TRequestQueue::GetQueueSize() const
{
return QueueSize_.load(std::memory_order::relaxed);
@@ -1343,7 +1356,7 @@ void TRequestQueue::OnRequestArrived(TServiceBase::TServiceContextPtr context)
// Slow path.
DecrementConcurrency();
- IncrementQueueSize();
+ IncrementQueueSize(context);
context->BeforeEnqueued();
YT_VERIFY(Queue_.enqueue(std::move(context)));
@@ -1401,7 +1414,7 @@ void TRequestQueue::ScheduleRequestsFromQueue()
return;
}
- DecrementQueueSize();
+ DecrementQueueSize(context);
context->AfterDequeued();
if (context->IsCanceled()) {
context.Reset();
@@ -1432,15 +1445,26 @@ void TRequestQueue::RunRequest(TServiceBase::TServiceContextPtr context)
}
}
-int TRequestQueue::IncrementQueueSize()
+void TRequestQueue::IncrementQueueSize(const TServiceBase::TServiceContextPtr& context)
{
- return ++QueueSize_;
+ ++QueueSize_;
+
+ auto requestSize =
+ GetMessageBodySize(context->GetRequestMessage()) +
+ GetTotalMessageAttachmentSize(context->GetRequestMessage());
+ QueueBytesSize_ += requestSize;
}
-void TRequestQueue::DecrementQueueSize()
+void TRequestQueue::DecrementQueueSize(const TServiceBase::TServiceContextPtr& context)
{
auto newQueueSize = --QueueSize_;
YT_ASSERT(newQueueSize >= 0);
+
+ auto requestSize =
+ GetMessageBodySize(context->GetRequestMessage()) +
+ GetTotalMessageAttachmentSize(context->GetRequestMessage());
+ auto oldQueueBytesSize = QueueBytesSize_.fetch_sub(requestSize);
+ YT_ASSERT(oldQueueBytesSize >= requestSize);
}
int TRequestQueue::IncrementConcurrency()
@@ -1610,7 +1634,7 @@ void TServiceBase::HandleRequest(
auto maybeThrottled = GetThrottledError(*header);
- if (requestQueue->IsQueueLimitSizeExceeded()) {
+ if (requestQueue->IsQueueSizeLimitExceeded()) {
runtimeInfo->RequestQueueSizeLimitErrorCounter.Increment();
replyError(TError(
NRpc::EErrorCode::RequestQueueSizeLimitExceeded,
@@ -1621,6 +1645,17 @@ void TServiceBase::HandleRequest(
return;
}
+ if (requestQueue->IsQueueBytesSizeLimitExceeded()) {
+ runtimeInfo->RequestQueueBytesSizeLimitErrorCounter.Increment();
+ replyError(TError(
+ NRpc::EErrorCode::RequestQueueSizeLimitExceeded,
+ "Request queue bytes size limit exceeded")
+ << TErrorAttribute("limit", runtimeInfo->QueueBytesSizeLimit.load())
+ << TErrorAttribute("queue", requestQueue->GetName())
+ << maybeThrottled);
+ return;
+ }
+
TCurrentTraceContextGuard traceContextGuard(traceContext);
// NOTE: Do not use replyError() after this line.
@@ -2387,6 +2422,7 @@ TServiceBase::TRuntimeMethodInfoPtr TServiceBase::RegisterMethod(const TMethodDe
runtimeInfo->Heavy.store(descriptor.Options.Heavy);
runtimeInfo->QueueSizeLimit.store(descriptor.QueueSizeLimit);
+ runtimeInfo->QueueBytesSizeLimit.store(descriptor.QueueBytesSizeLimit);
runtimeInfo->ConcurrencyLimit.Reconfigure(descriptor.ConcurrencyLimit);
runtimeInfo->LogLevel.store(descriptor.LogLevel);
runtimeInfo->LoggingSuppressionTimeout.store(descriptor.LoggingSuppressionTimeout);
@@ -2398,6 +2434,9 @@ TServiceBase::TRuntimeMethodInfoPtr TServiceBase::RegisterMethod(const TMethodDe
profiler.AddFuncGauge("/request_queue_size_limit", MakeStrong(this), [=] {
return runtimeInfo->QueueSizeLimit.load(std::memory_order::relaxed);
});
+ profiler.AddFuncGauge("/request_queue_bytes_size_limit", MakeStrong(this), [=] {
+ return runtimeInfo->QueueBytesSizeLimit.load(std::memory_order::relaxed);
+ });
profiler.AddFuncGauge("/concurrency_limit", MakeStrong(this), [=] {
return runtimeInfo->ConcurrencyLimit.GetDynamicLimit();
});
@@ -2461,6 +2500,7 @@ void TServiceBase::DoConfigure(
runtimeInfo->Heavy.store(methodConfig->Heavy.value_or(descriptor.Options.Heavy));
runtimeInfo->QueueSizeLimit.store(methodConfig->QueueSizeLimit.value_or(descriptor.QueueSizeLimit));
+ runtimeInfo->QueueBytesSizeLimit.store(methodConfig->QueueBytesSizeLimit.value_or(descriptor.QueueBytesSizeLimit));
runtimeInfo->ConcurrencyLimit.Reconfigure(methodConfig->ConcurrencyLimit.value_or(descriptor.ConcurrencyLimit));
runtimeInfo->LogLevel.store(methodConfig->LogLevel.value_or(descriptor.LogLevel));
runtimeInfo->LoggingSuppressionTimeout.store(methodConfig->LoggingSuppressionTimeout.value_or(descriptor.LoggingSuppressionTimeout));
diff --git a/yt/yt/core/rpc/service_detail.h b/yt/yt/core/rpc/service_detail.h
index ba52cf514b..87a9458431 100644
--- a/yt/yt/core/rpc/service_detail.h
+++ b/yt/yt/core/rpc/service_detail.h
@@ -560,6 +560,9 @@ protected:
//! Maximum number of requests in queue (both waiting and executing).
int QueueSizeLimit = 10'000;
+ //! Maximum total size of requests in queue (both waiting and executing).
+ i64 QueueBytesSizeLimit = 2_GB;
+
//! Maximum number of requests executing concurrently.
int ConcurrencyLimit = 10'000;
@@ -597,6 +600,7 @@ protected:
TMethodDescriptor SetHeavy(bool value) const;
TMethodDescriptor SetResponseCodec(NCompression::ECodec value) const;
TMethodDescriptor SetQueueSizeLimit(int value) const;
+ TMethodDescriptor SetQueueBytesSizeLimit(i64 value) const;
TMethodDescriptor SetConcurrencyLimit(int value) const;
TMethodDescriptor SetSystem(bool value) const;
TMethodDescriptor SetLogLevel(NLogging::ELogLevel value) const;
@@ -700,11 +704,13 @@ protected:
std::atomic<bool> Pooled = true;
std::atomic<int> QueueSizeLimit = 0;
+ std::atomic<i64> QueueBytesSizeLimit = 0;
TDynamicConcurrencyLimit ConcurrencyLimit;
std::atomic<double> WaitingTimeoutFraction = 0;
NProfiling::TCounter RequestQueueSizeLimitErrorCounter;
+ NProfiling::TCounter RequestQueueBytesSizeLimitErrorCounter;
NProfiling::TCounter UnauthenticatedRequestsCounter;
std::atomic<NLogging::ELogLevel> LogLevel = {};
@@ -1019,7 +1025,8 @@ public:
bool Register(TServiceBase* service, TServiceBase::TRuntimeMethodInfo* runtimeInfo);
void Configure(const TMethodConfigPtr& config);
- bool IsQueueLimitSizeExceeded() const;
+ bool IsQueueSizeLimitExceeded() const;
+ bool IsQueueBytesSizeLimitExceeded() const;
int GetQueueSize() const;
int GetConcurrency() const;
@@ -1054,14 +1061,15 @@ private:
std::atomic<bool> Throttled_ = false;
std::atomic<int> QueueSize_ = 0;
+ std::atomic<i64> QueueBytesSize_ = 0;
moodycamel::ConcurrentQueue<TServiceBase::TServiceContextPtr> Queue_;
void ScheduleRequestsFromQueue();
void RunRequest(TServiceBase::TServiceContextPtr context);
- int IncrementQueueSize();
- void DecrementQueueSize();
+ void IncrementQueueSize(const TServiceBase::TServiceContextPtr& context);
+ void DecrementQueueSize(const TServiceBase::TServiceContextPtr& context);
int IncrementConcurrency();
void DecrementConcurrency();