aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-03-01 20:44:25 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-03-01 20:52:43 +0300
commite3bebc56da615c637dc555747e3f407b6084e393 (patch)
tree802960e65746957d1a831ae64d6dc8805f06860a
parenta388d4a51a6512436615d1006ddb75a623852ce2 (diff)
downloadydb-e3bebc56da615c637dc555747e3f407b6084e393.tar.gz
Intermediate changes
-rw-r--r--yt/yt/core/rpc/config.cpp3
-rw-r--r--yt/yt/core/rpc/config.h1
-rw-r--r--yt/yt/core/rpc/service_detail.cpp111
-rw-r--r--yt/yt/core/rpc/service_detail.h36
4 files changed, 122 insertions, 29 deletions
diff --git a/yt/yt/core/rpc/config.cpp b/yt/yt/core/rpc/config.cpp
index 01e27c3649..e715e9b4e4 100644
--- a/yt/yt/core/rpc/config.cpp
+++ b/yt/yt/core/rpc/config.cpp
@@ -104,6 +104,9 @@ void TMethodConfig::Register(TRegistrar registrar)
registrar.Parameter("concurrency_limit", &TThis::ConcurrencyLimit)
.Alias("max_concurrency")
.Optional();
+ registrar.Parameter("concurrency_byte_limit", &TThis::ConcurrencyByteLimit)
+ .Alias("max_concurrency_byte")
+ .Optional();
registrar.Parameter("log_level", &TThis::LogLevel)
.Optional();
registrar.Parameter("request_bytes_throttler", &TThis::RequestBytesThrottler)
diff --git a/yt/yt/core/rpc/config.h b/yt/yt/core/rpc/config.h
index 8a5a14375d..707604e6f5 100644
--- a/yt/yt/core/rpc/config.h
+++ b/yt/yt/core/rpc/config.h
@@ -154,6 +154,7 @@ public:
std::optional<int> QueueSizeLimit;
std::optional<i64> QueueByteSizeLimit;
std::optional<int> ConcurrencyLimit;
+ std::optional<i64> ConcurrencyByteLimit;
std::optional<NLogging::ELogLevel> LogLevel;
std::optional<TDuration> LoggingSuppressionTimeout;
NConcurrency::TThroughputThrottlerConfigPtr RequestBytesThrottler;
diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp
index f99333ac73..0b896c8bf4 100644
--- a/yt/yt/core/rpc/service_detail.cpp
+++ b/yt/yt/core/rpc/service_detail.cpp
@@ -115,6 +115,34 @@ void TDynamicConcurrencyLimit::SetDynamicLimit(std::optional<int> dynamicLimit)
////////////////////////////////////////////////////////////////////////////////
+void TDynamicConcurrencyByteLimit::Reconfigure(i64 limit)
+{
+ ConfigByteLimit_.store(limit, std::memory_order::relaxed);
+ SetDynamicByteLimit(limit);
+}
+
+i64 TDynamicConcurrencyByteLimit::GetByteLimitFromConfiguration() const
+{
+ return ConfigByteLimit_.load(std::memory_order::relaxed);
+}
+
+i64 TDynamicConcurrencyByteLimit::GetDynamicByteLimit() const
+{
+ return DynamicByteLimit_.load(std::memory_order::relaxed);
+}
+
+void TDynamicConcurrencyByteLimit::SetDynamicByteLimit(std::optional<i64> dynamicLimit)
+{
+ auto limit = dynamicLimit.has_value() ? *dynamicLimit : ConfigByteLimit_.load(std::memory_order::relaxed);
+ auto oldLimit = DynamicByteLimit_.exchange(limit, std::memory_order::relaxed);
+
+ if (oldLimit != limit) {
+ Updated_.Fire();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
TServiceBase::TMethodDescriptor::TMethodDescriptor(
TString method,
TLiteHandler liteHandler,
@@ -180,6 +208,13 @@ auto TServiceBase::TMethodDescriptor::SetConcurrencyLimit(int value) const -> TM
return result;
}
+auto TServiceBase::TMethodDescriptor::SetConcurrencyByteLimit(i64 value) const -> TMethodDescriptor
+{
+ auto result = *this;
+ result.ConcurrencyByteLimit = value;
+ return result;
+}
+
auto TServiceBase::TMethodDescriptor::SetSystem(bool value) const -> TMethodDescriptor
{
auto result = *this;
@@ -1042,16 +1077,16 @@ private:
}
if (RequestRun_) {
- RequestQueue_->OnRequestFinished();
+ auto requestTotalSize = GetMessageBodySize(RequestMessage_) +
+ GetTotalMessageAttachmentSize(RequestMessage_);
+ RequestQueue_->OnRequestFinished(requestTotalSize);
}
-
if (ActiveRequestCountIncremented_) {
Service_->DecrementActiveRequestCount();
}
}
-
void LogRequest() override
{
TStringBuilder builder;
@@ -1298,6 +1333,10 @@ bool TRequestQueue::Register(TServiceBase* service, TServiceBase::TRuntimeMethod
RuntimeInfo_->ConcurrencyLimit.SubscribeUpdated(BIND(
&TRequestQueue::OnConcurrencyLimitChanged,
MakeWeak(this)));
+
+ RuntimeInfo_->ConcurrencyByteLimit.SubscribeUpdated(BIND(
+ &TRequestQueue::OnConcurrencyByteLimitChanged,
+ MakeWeak(this)));
}
Registered_.store(true, std::memory_order::release);
}
@@ -1312,6 +1351,13 @@ void TRequestQueue::OnConcurrencyLimitChanged()
}
}
+void TRequestQueue::OnConcurrencyByteLimitChanged()
+{
+ if (QueueByteSize_.load() > 0) {
+ ScheduleRequestsFromQueue();
+ }
+}
+
void TRequestQueue::TRequestThrottler::Reconfigure(const TThroughputThrottlerConfigPtr& config)
{
Throttler->Reconfigure(config ? config : New<TThroughputThrottlerConfig>());
@@ -1369,11 +1415,16 @@ int TRequestQueue::GetConcurrency() const
return Concurrency_.load(std::memory_order::relaxed);
}
-void TRequestQueue::OnRequestArrived(TServiceBase::TServiceContextPtr context)
+i64 TRequestQueue::GetConcurrencyByte() const
+{
+ return ConcurrencyByte_.load(std::memory_order::relaxed);
+}
+
+void TRequestQueue::OnRequestArrived(const TServiceBase::TServiceContextPtr& context)
{
// Fast path.
- auto newConcurrencySemaphore = IncrementConcurrency();
- if (newConcurrencySemaphore <= RuntimeInfo_->ConcurrencyLimit.GetDynamicLimit() &&
+ auto concurrencyExceeded = IncrementConcurrency(context);
+ if (concurrencyExceeded &&
!AreThrottlersOverdrafted())
{
RunRequest(std::move(context));
@@ -1381,7 +1432,7 @@ void TRequestQueue::OnRequestArrived(TServiceBase::TServiceContextPtr context)
}
// Slow path.
- DecrementConcurrency();
+ DecrementConcurrency(GetTotalRequestSize(context));
IncrementQueueSize(context);
context->BeforeEnqueued();
@@ -1390,9 +1441,9 @@ void TRequestQueue::OnRequestArrived(TServiceBase::TServiceContextPtr context)
ScheduleRequestsFromQueue();
}
-void TRequestQueue::OnRequestFinished()
+void TRequestQueue::OnRequestFinished(i64 requestTotalSize)
{
- DecrementConcurrency();
+ DecrementConcurrency(requestTotalSize);
if (QueueSize_.load() > 0) {
// Slow path.
@@ -1447,7 +1498,7 @@ void TRequestQueue::ScheduleRequestsFromQueue()
}
}
- IncrementConcurrency();
+ IncrementConcurrency(context);
RunRequest(std::move(context));
}
}
@@ -1474,34 +1525,38 @@ void TRequestQueue::RunRequest(TServiceBase::TServiceContextPtr context)
void TRequestQueue::IncrementQueueSize(const TServiceBase::TServiceContextPtr& context)
{
++QueueSize_;
-
- auto requestSize =
- GetMessageBodySize(context->GetRequestMessage()) +
- GetTotalMessageAttachmentSize(context->GetRequestMessage());
- QueueByteSize_ += requestSize;
+ QueueByteSize_.fetch_add(GetTotalRequestSize(context));
}
void TRequestQueue::DecrementQueueSize(const TServiceBase::TServiceContextPtr& context)
{
auto newQueueSize = --QueueSize_;
+ auto oldQueueBytesSize = QueueByteSize_.fetch_sub(GetTotalRequestSize(context));
+
YT_ASSERT(newQueueSize >= 0);
+ YT_ASSERT(oldQueueBytesSize >= 0);
+}
- auto requestSize =
- GetMessageBodySize(context->GetRequestMessage()) +
+i64 TRequestQueue::GetTotalRequestSize(const TServiceBase::TServiceContextPtr& context)
+{
+ return GetMessageBodySize(context->GetRequestMessage()) +
GetTotalMessageAttachmentSize(context->GetRequestMessage());
- auto oldQueueBytesSize = QueueByteSize_.fetch_sub(requestSize);
- YT_ASSERT(oldQueueBytesSize >= requestSize);
}
-int TRequestQueue::IncrementConcurrency()
+bool TRequestQueue::IncrementConcurrency(const TServiceBase::TServiceContextPtr& context)
{
- return ++Concurrency_;
+ auto resultSize = ++Concurrency_ <= RuntimeInfo_->ConcurrencyLimit.GetDynamicLimit();
+ auto resultByteSize = ConcurrencyByte_.fetch_add(GetTotalRequestSize(context)) <= RuntimeInfo_->ConcurrencyByteLimit.GetDynamicByteLimit();
+ return resultSize && resultByteSize;
}
-void TRequestQueue::DecrementConcurrency()
+void TRequestQueue::DecrementConcurrency(i64 requestTotalSize)
{
auto newConcurrencySemaphore = --Concurrency_;
+ auto newConcurrencyByteSemaphore = ConcurrencyByte_.fetch_sub(requestTotalSize);
+
YT_ASSERT(newConcurrencySemaphore >= 0);
+ YT_ASSERT(newConcurrencyByteSemaphore >= 0);
}
bool TRequestQueue::AreThrottlersOverdrafted() const
@@ -1516,9 +1571,7 @@ void TRequestQueue::AcquireThrottlers(const TServiceBase::TServiceContextPtr& co
{
if (BytesThrottler_.Specified.load(std::memory_order::acquire)) {
// Slow path.
- auto requestSize =
- GetMessageBodySize(context->GetRequestMessage()) +
- GetTotalMessageAttachmentSize(context->GetRequestMessage());
+ auto requestSize = GetTotalRequestSize(context);
BytesThrottler_.Throttler->Acquire(requestSize);
}
if (WeightThrottler_.Specified.load(std::memory_order::acquire)) {
@@ -1880,6 +1933,9 @@ void TServiceBase::RegisterRequestQueue(
profiler.AddFuncGauge("/concurrency", MakeStrong(this), [=] {
return requestQueue->GetConcurrency();
});
+ profiler.AddFuncGauge("/concurrency_byte", MakeStrong(this), [=] {
+ return requestQueue->GetConcurrencyByte();
+ });
TMethodConfigPtr methodConfig;
if (auto config = Config_.Acquire()) {
@@ -2459,6 +2515,7 @@ TServiceBase::TRuntimeMethodInfoPtr TServiceBase::RegisterMethod(const TMethodDe
runtimeInfo->QueueSizeLimit.store(descriptor.QueueSizeLimit);
runtimeInfo->QueueByteSizeLimit.store(descriptor.QueueByteSizeLimit);
runtimeInfo->ConcurrencyLimit.Reconfigure(descriptor.ConcurrencyLimit);
+ runtimeInfo->ConcurrencyByteLimit.Reconfigure(descriptor.ConcurrencyByteLimit);
runtimeInfo->LogLevel.store(descriptor.LogLevel);
runtimeInfo->LoggingSuppressionTimeout.store(descriptor.LoggingSuppressionTimeout);
@@ -2475,6 +2532,9 @@ TServiceBase::TRuntimeMethodInfoPtr TServiceBase::RegisterMethod(const TMethodDe
profiler.AddFuncGauge("/concurrency_limit", MakeStrong(this), [=] {
return runtimeInfo->ConcurrencyLimit.GetDynamicLimit();
});
+ profiler.AddFuncGauge("/concurrency_byte_limit", MakeStrong(this), [=] {
+ return runtimeInfo->ConcurrencyByteLimit.GetDynamicByteLimit();
+ });
return runtimeInfo;
}
@@ -2537,6 +2597,7 @@ void TServiceBase::DoConfigure(
runtimeInfo->QueueSizeLimit.store(methodConfig->QueueSizeLimit.value_or(descriptor.QueueSizeLimit));
runtimeInfo->QueueByteSizeLimit.store(methodConfig->QueueByteSizeLimit.value_or(descriptor.QueueByteSizeLimit));
runtimeInfo->ConcurrencyLimit.Reconfigure(methodConfig->ConcurrencyLimit.value_or(descriptor.ConcurrencyLimit));
+ runtimeInfo->ConcurrencyByteLimit.Reconfigure(methodConfig->ConcurrencyByteLimit.value_or(descriptor.ConcurrencyByteLimit));
runtimeInfo->LogLevel.store(methodConfig->LogLevel.value_or(descriptor.LogLevel));
runtimeInfo->LoggingSuppressionTimeout.store(methodConfig->LoggingSuppressionTimeout.value_or(descriptor.LoggingSuppressionTimeout));
runtimeInfo->Pooled.store(methodConfig->Pooled.value_or(config->Pooled.value_or(descriptor.Pooled)));
diff --git a/yt/yt/core/rpc/service_detail.h b/yt/yt/core/rpc/service_detail.h
index dae71d0c1e..a388116d67 100644
--- a/yt/yt/core/rpc/service_detail.h
+++ b/yt/yt/core/rpc/service_detail.h
@@ -476,6 +476,24 @@ private:
////////////////////////////////////////////////////////////////////////////////
+class TDynamicConcurrencyByteLimit
+{
+public:
+ DEFINE_SIGNAL(void(), Updated);
+
+ void Reconfigure(i64 limit);
+ i64 GetByteLimitFromConfiguration() const;
+
+ i64 GetDynamicByteLimit() const;
+ void SetDynamicByteLimit(std::optional<i64> dynamicLimit);
+
+private:
+ std::atomic<i64> ConfigByteLimit_ = 0;
+ std::atomic<i64> DynamicByteLimit_ = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
//! Provides a base for implementing IService.
class TServiceBase
: public virtual IService
@@ -566,6 +584,9 @@ protected:
//! Maximum number of requests executing concurrently.
int ConcurrencyLimit = 10'000;
+ //! Maximum total size of requests executing concurrently.
+ i64 ConcurrencyByteLimit = 4_GB;
+
//! System requests are completely transparent to derived classes;
//! in particular, |BeforeInvoke| is not called.
//! Also system methods do not require authentication.
@@ -605,6 +626,7 @@ protected:
TMethodDescriptor SetQueueSizeLimit(int value) const;
TMethodDescriptor SetQueueByteSizeLimit(i64 value) const;
TMethodDescriptor SetConcurrencyLimit(int value) const;
+ TMethodDescriptor SetConcurrencyByteLimit(i64 value) const;
TMethodDescriptor SetSystem(bool value) const;
TMethodDescriptor SetLogLevel(NLogging::ELogLevel value) const;
TMethodDescriptor SetLoggingSuppressionTimeout(TDuration value) const;
@@ -711,6 +733,7 @@ protected:
std::atomic<i64> QueueByteSizeLimit = 0;
TDynamicConcurrencyLimit ConcurrencyLimit;
+ TDynamicConcurrencyByteLimit ConcurrencyByteLimit;
std::atomic<double> WaitingTimeoutFraction = 0;
NProfiling::TCounter RequestQueueSizeLimitErrorCounter;
@@ -1037,9 +1060,10 @@ public:
int GetQueueSize() const;
i64 GetQueueByteSize() const;
int GetConcurrency() const;
+ i64 GetConcurrencyByte() const;
- void OnRequestArrived(TServiceBase::TServiceContextPtr context);
- void OnRequestFinished();
+ void OnRequestArrived(const TServiceBase::TServiceContextPtr& context);
+ void OnRequestFinished(i64 requestTotalSize);
void ConfigureWeightThrottler(const NConcurrency::TThroughputThrottlerConfigPtr& config);
void ConfigureBytesThrottler(const NConcurrency::TThroughputThrottlerConfigPtr& config);
@@ -1054,6 +1078,7 @@ private:
TServiceBase::TRuntimeMethodInfo* RuntimeInfo_ = nullptr;
std::atomic<int> Concurrency_ = 0;
+ std::atomic<i64> ConcurrencyByte_ = 0;
struct TRequestThrottler
{
@@ -1075,17 +1100,20 @@ private:
void ScheduleRequestsFromQueue();
void RunRequest(TServiceBase::TServiceContextPtr context);
+ i64 GetTotalRequestSize(const TServiceBase::TServiceContextPtr& context);
+
void IncrementQueueSize(const TServiceBase::TServiceContextPtr& context);
void DecrementQueueSize(const TServiceBase::TServiceContextPtr& context);
- int IncrementConcurrency();
- void DecrementConcurrency();
+ bool IncrementConcurrency(const TServiceBase::TServiceContextPtr& context);
+ void DecrementConcurrency(i64 requestTotalSize);
bool AreThrottlersOverdrafted() const;
void AcquireThrottlers(const TServiceBase::TServiceContextPtr& context);
void SubscribeToThrottlers();
void OnConcurrencyLimitChanged();
+ void OnConcurrencyByteLimitChanged();
};
DEFINE_REFCOUNTED_TYPE(TRequestQueue)