diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-03-01 20:44:25 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-03-01 20:52:43 +0300 |
commit | e3bebc56da615c637dc555747e3f407b6084e393 (patch) | |
tree | 802960e65746957d1a831ae64d6dc8805f06860a | |
parent | a388d4a51a6512436615d1006ddb75a623852ce2 (diff) | |
download | ydb-e3bebc56da615c637dc555747e3f407b6084e393.tar.gz |
Intermediate changes
-rw-r--r-- | yt/yt/core/rpc/config.cpp | 3 | ||||
-rw-r--r-- | yt/yt/core/rpc/config.h | 1 | ||||
-rw-r--r-- | yt/yt/core/rpc/service_detail.cpp | 111 | ||||
-rw-r--r-- | yt/yt/core/rpc/service_detail.h | 36 |
4 files changed, 122 insertions, 29 deletions
diff --git a/yt/yt/core/rpc/config.cpp b/yt/yt/core/rpc/config.cpp index 01e27c3649..e715e9b4e4 100644 --- a/yt/yt/core/rpc/config.cpp +++ b/yt/yt/core/rpc/config.cpp @@ -104,6 +104,9 @@ void TMethodConfig::Register(TRegistrar registrar) registrar.Parameter("concurrency_limit", &TThis::ConcurrencyLimit) .Alias("max_concurrency") .Optional(); + registrar.Parameter("concurrency_byte_limit", &TThis::ConcurrencyByteLimit) + .Alias("max_concurrency_byte") + .Optional(); registrar.Parameter("log_level", &TThis::LogLevel) .Optional(); registrar.Parameter("request_bytes_throttler", &TThis::RequestBytesThrottler) diff --git a/yt/yt/core/rpc/config.h b/yt/yt/core/rpc/config.h index 8a5a14375d..707604e6f5 100644 --- a/yt/yt/core/rpc/config.h +++ b/yt/yt/core/rpc/config.h @@ -154,6 +154,7 @@ public: std::optional<int> QueueSizeLimit; std::optional<i64> QueueByteSizeLimit; std::optional<int> ConcurrencyLimit; + std::optional<i64> ConcurrencyByteLimit; std::optional<NLogging::ELogLevel> LogLevel; std::optional<TDuration> LoggingSuppressionTimeout; NConcurrency::TThroughputThrottlerConfigPtr RequestBytesThrottler; diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp index f99333ac73..0b896c8bf4 100644 --- a/yt/yt/core/rpc/service_detail.cpp +++ b/yt/yt/core/rpc/service_detail.cpp @@ -115,6 +115,34 @@ void TDynamicConcurrencyLimit::SetDynamicLimit(std::optional<int> dynamicLimit) //////////////////////////////////////////////////////////////////////////////// +void TDynamicConcurrencyByteLimit::Reconfigure(i64 limit) +{ + ConfigByteLimit_.store(limit, std::memory_order::relaxed); + SetDynamicByteLimit(limit); +} + +i64 TDynamicConcurrencyByteLimit::GetByteLimitFromConfiguration() const +{ + return ConfigByteLimit_.load(std::memory_order::relaxed); +} + +i64 TDynamicConcurrencyByteLimit::GetDynamicByteLimit() const +{ + return DynamicByteLimit_.load(std::memory_order::relaxed); +} + +void TDynamicConcurrencyByteLimit::SetDynamicByteLimit(std::optional<i64> dynamicLimit) +{ + auto limit = dynamicLimit.has_value() ? *dynamicLimit : ConfigByteLimit_.load(std::memory_order::relaxed); + auto oldLimit = DynamicByteLimit_.exchange(limit, std::memory_order::relaxed); + + if (oldLimit != limit) { + Updated_.Fire(); + } +} + +//////////////////////////////////////////////////////////////////////////////// + TServiceBase::TMethodDescriptor::TMethodDescriptor( TString method, TLiteHandler liteHandler, @@ -180,6 +208,13 @@ auto TServiceBase::TMethodDescriptor::SetConcurrencyLimit(int value) const -> TM return result; } +auto TServiceBase::TMethodDescriptor::SetConcurrencyByteLimit(i64 value) const -> TMethodDescriptor +{ + auto result = *this; + result.ConcurrencyByteLimit = value; + return result; +} + auto TServiceBase::TMethodDescriptor::SetSystem(bool value) const -> TMethodDescriptor { auto result = *this; @@ -1042,16 +1077,16 @@ private: } if (RequestRun_) { - RequestQueue_->OnRequestFinished(); + auto requestTotalSize = GetMessageBodySize(RequestMessage_) + + GetTotalMessageAttachmentSize(RequestMessage_); + RequestQueue_->OnRequestFinished(requestTotalSize); } - if (ActiveRequestCountIncremented_) { Service_->DecrementActiveRequestCount(); } } - void LogRequest() override { TStringBuilder builder; @@ -1298,6 +1333,10 @@ bool TRequestQueue::Register(TServiceBase* service, TServiceBase::TRuntimeMethod RuntimeInfo_->ConcurrencyLimit.SubscribeUpdated(BIND( &TRequestQueue::OnConcurrencyLimitChanged, MakeWeak(this))); + + RuntimeInfo_->ConcurrencyByteLimit.SubscribeUpdated(BIND( + &TRequestQueue::OnConcurrencyByteLimitChanged, + MakeWeak(this))); } Registered_.store(true, std::memory_order::release); } @@ -1312,6 +1351,13 @@ void TRequestQueue::OnConcurrencyLimitChanged() } } +void TRequestQueue::OnConcurrencyByteLimitChanged() +{ + if (QueueByteSize_.load() > 0) { + ScheduleRequestsFromQueue(); + } +} + void TRequestQueue::TRequestThrottler::Reconfigure(const TThroughputThrottlerConfigPtr& config) { Throttler->Reconfigure(config ? config : New<TThroughputThrottlerConfig>()); @@ -1369,11 +1415,16 @@ int TRequestQueue::GetConcurrency() const return Concurrency_.load(std::memory_order::relaxed); } -void TRequestQueue::OnRequestArrived(TServiceBase::TServiceContextPtr context) +i64 TRequestQueue::GetConcurrencyByte() const +{ + return ConcurrencyByte_.load(std::memory_order::relaxed); +} + +void TRequestQueue::OnRequestArrived(const TServiceBase::TServiceContextPtr& context) { // Fast path. - auto newConcurrencySemaphore = IncrementConcurrency(); - if (newConcurrencySemaphore <= RuntimeInfo_->ConcurrencyLimit.GetDynamicLimit() && + auto concurrencyExceeded = IncrementConcurrency(context); + if (concurrencyExceeded && !AreThrottlersOverdrafted()) { RunRequest(std::move(context)); @@ -1381,7 +1432,7 @@ void TRequestQueue::OnRequestArrived(TServiceBase::TServiceContextPtr context) } // Slow path. - DecrementConcurrency(); + DecrementConcurrency(GetTotalRequestSize(context)); IncrementQueueSize(context); context->BeforeEnqueued(); @@ -1390,9 +1441,9 @@ void TRequestQueue::OnRequestArrived(TServiceBase::TServiceContextPtr context) ScheduleRequestsFromQueue(); } -void TRequestQueue::OnRequestFinished() +void TRequestQueue::OnRequestFinished(i64 requestTotalSize) { - DecrementConcurrency(); + DecrementConcurrency(requestTotalSize); if (QueueSize_.load() > 0) { // Slow path. @@ -1447,7 +1498,7 @@ void TRequestQueue::ScheduleRequestsFromQueue() } } - IncrementConcurrency(); + IncrementConcurrency(context); RunRequest(std::move(context)); } } @@ -1474,34 +1525,38 @@ void TRequestQueue::RunRequest(TServiceBase::TServiceContextPtr context) void TRequestQueue::IncrementQueueSize(const TServiceBase::TServiceContextPtr& context) { ++QueueSize_; - - auto requestSize = - GetMessageBodySize(context->GetRequestMessage()) + - GetTotalMessageAttachmentSize(context->GetRequestMessage()); - QueueByteSize_ += requestSize; + QueueByteSize_.fetch_add(GetTotalRequestSize(context)); } void TRequestQueue::DecrementQueueSize(const TServiceBase::TServiceContextPtr& context) { auto newQueueSize = --QueueSize_; + auto oldQueueBytesSize = QueueByteSize_.fetch_sub(GetTotalRequestSize(context)); + YT_ASSERT(newQueueSize >= 0); + YT_ASSERT(oldQueueBytesSize >= 0); +} - auto requestSize = - GetMessageBodySize(context->GetRequestMessage()) + +i64 TRequestQueue::GetTotalRequestSize(const TServiceBase::TServiceContextPtr& context) +{ + return GetMessageBodySize(context->GetRequestMessage()) + GetTotalMessageAttachmentSize(context->GetRequestMessage()); - auto oldQueueBytesSize = QueueByteSize_.fetch_sub(requestSize); - YT_ASSERT(oldQueueBytesSize >= requestSize); } -int TRequestQueue::IncrementConcurrency() +bool TRequestQueue::IncrementConcurrency(const TServiceBase::TServiceContextPtr& context) { - return ++Concurrency_; + auto resultSize = ++Concurrency_ <= RuntimeInfo_->ConcurrencyLimit.GetDynamicLimit(); + auto resultByteSize = ConcurrencyByte_.fetch_add(GetTotalRequestSize(context)) <= RuntimeInfo_->ConcurrencyByteLimit.GetDynamicByteLimit(); + return resultSize && resultByteSize; } -void TRequestQueue::DecrementConcurrency() +void TRequestQueue::DecrementConcurrency(i64 requestTotalSize) { auto newConcurrencySemaphore = --Concurrency_; + auto newConcurrencyByteSemaphore = ConcurrencyByte_.fetch_sub(requestTotalSize); + YT_ASSERT(newConcurrencySemaphore >= 0); + YT_ASSERT(newConcurrencyByteSemaphore >= 0); } bool TRequestQueue::AreThrottlersOverdrafted() const @@ -1516,9 +1571,7 @@ void TRequestQueue::AcquireThrottlers(const TServiceBase::TServiceContextPtr& co { if (BytesThrottler_.Specified.load(std::memory_order::acquire)) { // Slow path. - auto requestSize = - GetMessageBodySize(context->GetRequestMessage()) + - GetTotalMessageAttachmentSize(context->GetRequestMessage()); + auto requestSize = GetTotalRequestSize(context); BytesThrottler_.Throttler->Acquire(requestSize); } if (WeightThrottler_.Specified.load(std::memory_order::acquire)) { @@ -1880,6 +1933,9 @@ void TServiceBase::RegisterRequestQueue( profiler.AddFuncGauge("/concurrency", MakeStrong(this), [=] { return requestQueue->GetConcurrency(); }); + profiler.AddFuncGauge("/concurrency_byte", MakeStrong(this), [=] { + return requestQueue->GetConcurrencyByte(); + }); TMethodConfigPtr methodConfig; if (auto config = Config_.Acquire()) { @@ -2459,6 +2515,7 @@ TServiceBase::TRuntimeMethodInfoPtr TServiceBase::RegisterMethod(const TMethodDe runtimeInfo->QueueSizeLimit.store(descriptor.QueueSizeLimit); runtimeInfo->QueueByteSizeLimit.store(descriptor.QueueByteSizeLimit); runtimeInfo->ConcurrencyLimit.Reconfigure(descriptor.ConcurrencyLimit); + runtimeInfo->ConcurrencyByteLimit.Reconfigure(descriptor.ConcurrencyByteLimit); runtimeInfo->LogLevel.store(descriptor.LogLevel); runtimeInfo->LoggingSuppressionTimeout.store(descriptor.LoggingSuppressionTimeout); @@ -2475,6 +2532,9 @@ TServiceBase::TRuntimeMethodInfoPtr TServiceBase::RegisterMethod(const TMethodDe profiler.AddFuncGauge("/concurrency_limit", MakeStrong(this), [=] { return runtimeInfo->ConcurrencyLimit.GetDynamicLimit(); }); + profiler.AddFuncGauge("/concurrency_byte_limit", MakeStrong(this), [=] { + return runtimeInfo->ConcurrencyByteLimit.GetDynamicByteLimit(); + }); return runtimeInfo; } @@ -2537,6 +2597,7 @@ void TServiceBase::DoConfigure( runtimeInfo->QueueSizeLimit.store(methodConfig->QueueSizeLimit.value_or(descriptor.QueueSizeLimit)); runtimeInfo->QueueByteSizeLimit.store(methodConfig->QueueByteSizeLimit.value_or(descriptor.QueueByteSizeLimit)); runtimeInfo->ConcurrencyLimit.Reconfigure(methodConfig->ConcurrencyLimit.value_or(descriptor.ConcurrencyLimit)); + runtimeInfo->ConcurrencyByteLimit.Reconfigure(methodConfig->ConcurrencyByteLimit.value_or(descriptor.ConcurrencyByteLimit)); runtimeInfo->LogLevel.store(methodConfig->LogLevel.value_or(descriptor.LogLevel)); runtimeInfo->LoggingSuppressionTimeout.store(methodConfig->LoggingSuppressionTimeout.value_or(descriptor.LoggingSuppressionTimeout)); runtimeInfo->Pooled.store(methodConfig->Pooled.value_or(config->Pooled.value_or(descriptor.Pooled))); diff --git a/yt/yt/core/rpc/service_detail.h b/yt/yt/core/rpc/service_detail.h index dae71d0c1e..a388116d67 100644 --- a/yt/yt/core/rpc/service_detail.h +++ b/yt/yt/core/rpc/service_detail.h @@ -476,6 +476,24 @@ private: //////////////////////////////////////////////////////////////////////////////// +class TDynamicConcurrencyByteLimit +{ +public: + DEFINE_SIGNAL(void(), Updated); + + void Reconfigure(i64 limit); + i64 GetByteLimitFromConfiguration() const; + + i64 GetDynamicByteLimit() const; + void SetDynamicByteLimit(std::optional<i64> dynamicLimit); + +private: + std::atomic<i64> ConfigByteLimit_ = 0; + std::atomic<i64> DynamicByteLimit_ = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + //! Provides a base for implementing IService. class TServiceBase : public virtual IService @@ -566,6 +584,9 @@ protected: //! Maximum number of requests executing concurrently. int ConcurrencyLimit = 10'000; + //! Maximum total size of requests executing concurrently. + i64 ConcurrencyByteLimit = 4_GB; + //! System requests are completely transparent to derived classes; //! in particular, |BeforeInvoke| is not called. //! Also system methods do not require authentication. @@ -605,6 +626,7 @@ protected: TMethodDescriptor SetQueueSizeLimit(int value) const; TMethodDescriptor SetQueueByteSizeLimit(i64 value) const; TMethodDescriptor SetConcurrencyLimit(int value) const; + TMethodDescriptor SetConcurrencyByteLimit(i64 value) const; TMethodDescriptor SetSystem(bool value) const; TMethodDescriptor SetLogLevel(NLogging::ELogLevel value) const; TMethodDescriptor SetLoggingSuppressionTimeout(TDuration value) const; @@ -711,6 +733,7 @@ protected: std::atomic<i64> QueueByteSizeLimit = 0; TDynamicConcurrencyLimit ConcurrencyLimit; + TDynamicConcurrencyByteLimit ConcurrencyByteLimit; std::atomic<double> WaitingTimeoutFraction = 0; NProfiling::TCounter RequestQueueSizeLimitErrorCounter; @@ -1037,9 +1060,10 @@ public: int GetQueueSize() const; i64 GetQueueByteSize() const; int GetConcurrency() const; + i64 GetConcurrencyByte() const; - void OnRequestArrived(TServiceBase::TServiceContextPtr context); - void OnRequestFinished(); + void OnRequestArrived(const TServiceBase::TServiceContextPtr& context); + void OnRequestFinished(i64 requestTotalSize); void ConfigureWeightThrottler(const NConcurrency::TThroughputThrottlerConfigPtr& config); void ConfigureBytesThrottler(const NConcurrency::TThroughputThrottlerConfigPtr& config); @@ -1054,6 +1078,7 @@ private: TServiceBase::TRuntimeMethodInfo* RuntimeInfo_ = nullptr; std::atomic<int> Concurrency_ = 0; + std::atomic<i64> ConcurrencyByte_ = 0; struct TRequestThrottler { @@ -1075,17 +1100,20 @@ private: void ScheduleRequestsFromQueue(); void RunRequest(TServiceBase::TServiceContextPtr context); + i64 GetTotalRequestSize(const TServiceBase::TServiceContextPtr& context); + void IncrementQueueSize(const TServiceBase::TServiceContextPtr& context); void DecrementQueueSize(const TServiceBase::TServiceContextPtr& context); - int IncrementConcurrency(); - void DecrementConcurrency(); + bool IncrementConcurrency(const TServiceBase::TServiceContextPtr& context); + void DecrementConcurrency(i64 requestTotalSize); bool AreThrottlersOverdrafted() const; void AcquireThrottlers(const TServiceBase::TServiceContextPtr& context); void SubscribeToThrottlers(); void OnConcurrencyLimitChanged(); + void OnConcurrencyByteLimitChanged(); }; DEFINE_REFCOUNTED_TYPE(TRequestQueue) |