diff options
author | eshcherbin <eshcherbin@yandex-team.com> | 2024-12-05 08:59:43 +0300 |
---|---|---|
committer | eshcherbin <eshcherbin@yandex-team.com> | 2024-12-05 09:16:50 +0300 |
commit | d2daa783976633567d093f5c5cb8051c4640f680 (patch) | |
tree | faad55bfe327bced5340648e210c00f493162bdb | |
parent | f26fc9e45457ec31e54690217edb214bec682fc9 (diff) | |
download | ydb-d2daa783976633567d093f5c5cb8051c4640f680.tar.gz |
Polish concurrency byte limit in RPC service
commit_hash:a43c8d9b79d59636f590dafcc4dda7e69ee4759d
-rw-r--r-- | yt/yt/core/rpc/service_detail-inl.h | 32 | ||||
-rw-r--r-- | yt/yt/core/rpc/service_detail.cpp | 94 | ||||
-rw-r--r-- | yt/yt/core/rpc/service_detail.h | 43 |
3 files changed, 64 insertions, 105 deletions
diff --git a/yt/yt/core/rpc/service_detail-inl.h b/yt/yt/core/rpc/service_detail-inl.h index 2566351616..7083179260 100644 --- a/yt/yt/core/rpc/service_detail-inl.h +++ b/yt/yt/core/rpc/service_detail-inl.h @@ -10,6 +10,38 @@ namespace NYT::NRpc { //////////////////////////////////////////////////////////////////////////////// +template <class TValue> +void TDynamicConcurrencyLimit<TValue>::Reconfigure(TValue limit) +{ + ConfigLimit_.store(limit, std::memory_order::relaxed); + SetDynamicLimit(limit); +} + +template <class TValue> +TValue TDynamicConcurrencyLimit<TValue>::GetLimitFromConfiguration() const +{ + return ConfigLimit_.load(std::memory_order::relaxed); +} + +template <class TValue> +TValue TDynamicConcurrencyLimit<TValue>::GetDynamicLimit() const +{ + return DynamicLimit_.load(std::memory_order::relaxed); +} + +template <class TValue> +void TDynamicConcurrencyLimit<TValue>::SetDynamicLimit(std::optional<TValue> dynamicLimit) +{ + auto limit = dynamicLimit.has_value() ? *dynamicLimit : ConfigLimit_.load(std::memory_order::relaxed); + auto oldLimit = DynamicLimit_.exchange(limit, std::memory_order::relaxed); + + if (oldLimit != limit) { + Updated_.Fire(); + } +} + +//////////////////////////////////////////////////////////////////////////////// + template <class E> void TServiceBase::DeclareServerFeature(E featureId) { diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp index 0fa6cdab4b..eaad118052 100644 --- a/yt/yt/core/rpc/service_detail.cpp +++ b/yt/yt/core/rpc/service_detail.cpp @@ -113,62 +113,6 @@ THandlerInvocationOptions THandlerInvocationOptions::SetResponseCodec(NCompressi //////////////////////////////////////////////////////////////////////////////// -void TDynamicConcurrencyLimit::Reconfigure(int limit) -{ - ConfigLimit_.store(limit, std::memory_order::relaxed); - SetDynamicLimit(limit); -} - -int TDynamicConcurrencyLimit::GetLimitFromConfiguration() const -{ - return ConfigLimit_.load(std::memory_order::relaxed); -} - -int TDynamicConcurrencyLimit::GetDynamicLimit() const -{ - return DynamicLimit_.load(std::memory_order::relaxed); -} - -void TDynamicConcurrencyLimit::SetDynamicLimit(std::optional<int> dynamicLimit) -{ - auto limit = dynamicLimit.has_value() ? *dynamicLimit : ConfigLimit_.load(std::memory_order::relaxed); - auto oldLimit = DynamicLimit_.exchange(limit, std::memory_order::relaxed); - - if (oldLimit != limit) { - Updated_.Fire(); - } -} - -//////////////////////////////////////////////////////////////////////////////// - -void TDynamicConcurrencyByteLimit::Reconfigure(i64 limit) -{ - ConfigByteLimit_.store(limit, std::memory_order::relaxed); - SetDynamicByteLimit(limit); -} - -i64 TDynamicConcurrencyByteLimit::GetByteLimitFromConfiguration() const -{ - return ConfigByteLimit_.load(std::memory_order::relaxed); -} - -i64 TDynamicConcurrencyByteLimit::GetDynamicByteLimit() const -{ - return DynamicByteLimit_.load(std::memory_order::relaxed); -} - -void TDynamicConcurrencyByteLimit::SetDynamicByteLimit(std::optional<i64> dynamicLimit) -{ - auto limit = dynamicLimit.has_value() ? *dynamicLimit : ConfigByteLimit_.load(std::memory_order::relaxed); - auto oldLimit = DynamicByteLimit_.exchange(limit, std::memory_order::relaxed); - - if (oldLimit != limit) { - Updated_.Fire(); - } -} - -//////////////////////////////////////////////////////////////////////////////// - TServiceBase::TMethodDescriptor::TMethodDescriptor( TString method, TLiteHandler liteHandler, @@ -1127,7 +1071,7 @@ private: } if (RequestRun_) { - RequestQueue_->OnRequestFinished(TotalSize_); + RequestQueue_->OnRequestFinished(GetTotalSize()); } if (ActiveRequestCountIncremented_) { @@ -1474,20 +1418,17 @@ i64 TRequestQueue::GetConcurrencyByte() const return ConcurrencyByte_.load(std::memory_order::relaxed); } -void TRequestQueue::OnRequestArrived(const TServiceBase::TServiceContextPtr& context) +void TRequestQueue::OnRequestArrived(TServiceBase::TServiceContextPtr context) { // Fast path. - auto concurrencyExceeded = IncrementConcurrency(context); - if (concurrencyExceeded && - !AreThrottlersOverdrafted()) - { + if (IncrementConcurrency(context->GetTotalSize()) && !AreThrottlersOverdrafted()) { RunRequest(std::move(context)); return; } // Slow path. DecrementConcurrency(context->GetTotalSize()); - IncrementQueueSize(context); + IncrementQueueSize(context->GetTotalSize()); context->BeforeEnqueued(); YT_VERIFY(Queue_.enqueue(std::move(context))); @@ -1529,7 +1470,7 @@ void TRequestQueue::ScheduleRequestsFromQueue() // NB: Racy, may lead to overcommit in concurrency semaphore and request bytes throttler. auto concurrencyLimit = RuntimeInfo_->ConcurrencyLimit.GetDynamicLimit(); - auto concurrencyByteLimit = RuntimeInfo_->ConcurrencyByteLimit.GetDynamicByteLimit(); + auto concurrencyByteLimit = RuntimeInfo_->ConcurrencyByteLimit.GetDynamicLimit(); while (QueueSize_.load() > 0 && Concurrency_.load() < concurrencyLimit && ConcurrencyByte_.load() < concurrencyByteLimit) { if (AreThrottlersOverdrafted()) { SubscribeToThrottlers(); @@ -1546,14 +1487,14 @@ void TRequestQueue::ScheduleRequestsFromQueue() return; } - DecrementQueueSize(context); + DecrementQueueSize(context->GetTotalSize()); context->AfterDequeued(); if (context->IsCanceled()) { context.Reset(); } } - IncrementConcurrency(context); + IncrementConcurrency(context->GetTotalSize()); RunRequest(std::move(context)); } } @@ -1579,26 +1520,29 @@ void TRequestQueue::RunRequest(TServiceBase::TServiceContextPtr context) } } -void TRequestQueue::IncrementQueueSize(const TServiceBase::TServiceContextPtr& context) +void TRequestQueue::IncrementQueueSize(i64 requestTotalSize) { ++QueueSize_; - QueueByteSize_.fetch_add(context->GetTotalSize()); + QueueByteSize_.fetch_add(requestTotalSize); } -void TRequestQueue::DecrementQueueSize(const TServiceBase::TServiceContextPtr& context) +void TRequestQueue::DecrementQueueSize(i64 requestTotalSize) { auto newQueueSize = --QueueSize_; - auto oldQueueByteSize = QueueByteSize_.fetch_sub(context->GetTotalSize()); + auto oldQueueByteSize = QueueByteSize_.fetch_sub(requestTotalSize); YT_ASSERT(newQueueSize >= 0); YT_ASSERT(oldQueueByteSize >= 0); } -bool TRequestQueue::IncrementConcurrency(const TServiceBase::TServiceContextPtr& context) +// Returns true if concurrency limits are not exceeded. +bool TRequestQueue::IncrementConcurrency(i64 requestTotalSize) { - auto resultSize = ++Concurrency_ <= RuntimeInfo_->ConcurrencyLimit.GetDynamicLimit(); - auto resultByteSize = ConcurrencyByte_.fetch_add(context->GetTotalSize()) <= RuntimeInfo_->ConcurrencyByteLimit.GetDynamicByteLimit(); - return resultSize && resultByteSize; + auto newConcurrencySemaphore = ++Concurrency_; + auto newConcurrencyByteSemaphore = ConcurrencyByte_.fetch_add(requestTotalSize); + + return newConcurrencySemaphore <= RuntimeInfo_->ConcurrencyLimit.GetDynamicLimit() && + newConcurrencyByteSemaphore <= RuntimeInfo_->ConcurrencyByteLimit.GetDynamicLimit(); } void TRequestQueue::DecrementConcurrency(i64 requestTotalSize) @@ -2626,7 +2570,7 @@ TServiceBase::TRuntimeMethodInfoPtr TServiceBase::RegisterMethod(const TMethodDe return runtimeInfo->ConcurrencyLimit.GetDynamicLimit(); }); profiler.AddFuncGauge("/concurrency_byte_limit", MakeStrong(this), [=] { - return runtimeInfo->ConcurrencyByteLimit.GetDynamicByteLimit(); + return runtimeInfo->ConcurrencyByteLimit.GetDynamicLimit(); }); return runtimeInfo; diff --git a/yt/yt/core/rpc/service_detail.h b/yt/yt/core/rpc/service_detail.h index bba163d581..eeb5f137ba 100644 --- a/yt/yt/core/rpc/service_detail.h +++ b/yt/yt/core/rpc/service_detail.h @@ -491,38 +491,21 @@ TRequestQueuePtr CreateRequestQueue( //////////////////////////////////////////////////////////////////////////////// +template <class TValue> class TDynamicConcurrencyLimit { public: DEFINE_SIGNAL(void(), Updated); - void Reconfigure(int limit); - int GetLimitFromConfiguration() const; + void Reconfigure(TValue limit); + TValue GetLimitFromConfiguration() const; - int GetDynamicLimit() const; - void SetDynamicLimit(std::optional<int> dynamicLimit); + TValue GetDynamicLimit() const; + void SetDynamicLimit(std::optional<TValue> dynamicLimit); private: - std::atomic<int> ConfigLimit_ = 0; - std::atomic<int> DynamicLimit_ = 0; -}; - -//////////////////////////////////////////////////////////////////////////////// - -class TDynamicConcurrencyByteLimit -{ -public: - DEFINE_SIGNAL(void(), Updated); - - void Reconfigure(i64 limit); - i64 GetByteLimitFromConfiguration() const; - - i64 GetDynamicByteLimit() const; - void SetDynamicByteLimit(std::optional<i64> dynamicLimit); - -private: - std::atomic<i64> ConfigByteLimit_ = 0; - std::atomic<i64> DynamicByteLimit_ = 0; + std::atomic<TValue> ConfigLimit_{}; + std::atomic<TValue> DynamicLimit_{}; }; //////////////////////////////////////////////////////////////////////////////// @@ -771,8 +754,8 @@ protected: std::atomic<int> QueueSizeLimit = 0; std::atomic<i64> QueueByteSizeLimit = 0; - TDynamicConcurrencyLimit ConcurrencyLimit; - TDynamicConcurrencyByteLimit ConcurrencyByteLimit; + TDynamicConcurrencyLimit<int> ConcurrencyLimit; + TDynamicConcurrencyLimit<i64> ConcurrencyByteLimit; std::atomic<double> WaitingTimeoutFraction = 0; NProfiling::TCounter RequestQueueSizeLimitErrorCounter; @@ -1127,7 +1110,7 @@ public: int GetConcurrency() const; i64 GetConcurrencyByte() const; - void OnRequestArrived(const TServiceBase::TServiceContextPtr& context); + void OnRequestArrived(TServiceBase::TServiceContextPtr context); void OnRequestFinished(i64 requestTotalSize); void ConfigureWeightThrottler(const NConcurrency::TThroughputThrottlerConfigPtr& config); @@ -1168,10 +1151,10 @@ private: void ScheduleRequestsFromQueue(); void RunRequest(TServiceBase::TServiceContextPtr context); - void IncrementQueueSize(const TServiceBase::TServiceContextPtr& context); - void DecrementQueueSize(const TServiceBase::TServiceContextPtr& context); + void IncrementQueueSize(i64 requestTotalSize); + void DecrementQueueSize(i64 requestTotalSize); - bool IncrementConcurrency(const TServiceBase::TServiceContextPtr& context); + bool IncrementConcurrency(i64 requestTotalSize); void DecrementConcurrency(i64 requestTotalSize); bool AreThrottlersOverdrafted() const; |