aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreshcherbin <eshcherbin@yandex-team.com>2024-12-05 08:59:43 +0300
committereshcherbin <eshcherbin@yandex-team.com>2024-12-05 09:16:50 +0300
commitd2daa783976633567d093f5c5cb8051c4640f680 (patch)
treefaad55bfe327bced5340648e210c00f493162bdb
parentf26fc9e45457ec31e54690217edb214bec682fc9 (diff)
downloadydb-d2daa783976633567d093f5c5cb8051c4640f680.tar.gz
Polish concurrency byte limit in RPC service
commit_hash:a43c8d9b79d59636f590dafcc4dda7e69ee4759d
-rw-r--r--yt/yt/core/rpc/service_detail-inl.h32
-rw-r--r--yt/yt/core/rpc/service_detail.cpp94
-rw-r--r--yt/yt/core/rpc/service_detail.h43
3 files changed, 64 insertions, 105 deletions
diff --git a/yt/yt/core/rpc/service_detail-inl.h b/yt/yt/core/rpc/service_detail-inl.h
index 2566351616..7083179260 100644
--- a/yt/yt/core/rpc/service_detail-inl.h
+++ b/yt/yt/core/rpc/service_detail-inl.h
@@ -10,6 +10,38 @@ namespace NYT::NRpc {
////////////////////////////////////////////////////////////////////////////////
+template <class TValue>
+void TDynamicConcurrencyLimit<TValue>::Reconfigure(TValue limit)
+{
+ ConfigLimit_.store(limit, std::memory_order::relaxed);
+ SetDynamicLimit(limit);
+}
+
+template <class TValue>
+TValue TDynamicConcurrencyLimit<TValue>::GetLimitFromConfiguration() const
+{
+ return ConfigLimit_.load(std::memory_order::relaxed);
+}
+
+template <class TValue>
+TValue TDynamicConcurrencyLimit<TValue>::GetDynamicLimit() const
+{
+ return DynamicLimit_.load(std::memory_order::relaxed);
+}
+
+template <class TValue>
+void TDynamicConcurrencyLimit<TValue>::SetDynamicLimit(std::optional<TValue> dynamicLimit)
+{
+ auto limit = dynamicLimit.has_value() ? *dynamicLimit : ConfigLimit_.load(std::memory_order::relaxed);
+ auto oldLimit = DynamicLimit_.exchange(limit, std::memory_order::relaxed);
+
+ if (oldLimit != limit) {
+ Updated_.Fire();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
template <class E>
void TServiceBase::DeclareServerFeature(E featureId)
{
diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp
index 0fa6cdab4b..eaad118052 100644
--- a/yt/yt/core/rpc/service_detail.cpp
+++ b/yt/yt/core/rpc/service_detail.cpp
@@ -113,62 +113,6 @@ THandlerInvocationOptions THandlerInvocationOptions::SetResponseCodec(NCompressi
////////////////////////////////////////////////////////////////////////////////
-void TDynamicConcurrencyLimit::Reconfigure(int limit)
-{
- ConfigLimit_.store(limit, std::memory_order::relaxed);
- SetDynamicLimit(limit);
-}
-
-int TDynamicConcurrencyLimit::GetLimitFromConfiguration() const
-{
- return ConfigLimit_.load(std::memory_order::relaxed);
-}
-
-int TDynamicConcurrencyLimit::GetDynamicLimit() const
-{
- return DynamicLimit_.load(std::memory_order::relaxed);
-}
-
-void TDynamicConcurrencyLimit::SetDynamicLimit(std::optional<int> dynamicLimit)
-{
- auto limit = dynamicLimit.has_value() ? *dynamicLimit : ConfigLimit_.load(std::memory_order::relaxed);
- auto oldLimit = DynamicLimit_.exchange(limit, std::memory_order::relaxed);
-
- if (oldLimit != limit) {
- Updated_.Fire();
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-void TDynamicConcurrencyByteLimit::Reconfigure(i64 limit)
-{
- ConfigByteLimit_.store(limit, std::memory_order::relaxed);
- SetDynamicByteLimit(limit);
-}
-
-i64 TDynamicConcurrencyByteLimit::GetByteLimitFromConfiguration() const
-{
- return ConfigByteLimit_.load(std::memory_order::relaxed);
-}
-
-i64 TDynamicConcurrencyByteLimit::GetDynamicByteLimit() const
-{
- return DynamicByteLimit_.load(std::memory_order::relaxed);
-}
-
-void TDynamicConcurrencyByteLimit::SetDynamicByteLimit(std::optional<i64> dynamicLimit)
-{
- auto limit = dynamicLimit.has_value() ? *dynamicLimit : ConfigByteLimit_.load(std::memory_order::relaxed);
- auto oldLimit = DynamicByteLimit_.exchange(limit, std::memory_order::relaxed);
-
- if (oldLimit != limit) {
- Updated_.Fire();
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
TServiceBase::TMethodDescriptor::TMethodDescriptor(
TString method,
TLiteHandler liteHandler,
@@ -1127,7 +1071,7 @@ private:
}
if (RequestRun_) {
- RequestQueue_->OnRequestFinished(TotalSize_);
+ RequestQueue_->OnRequestFinished(GetTotalSize());
}
if (ActiveRequestCountIncremented_) {
@@ -1474,20 +1418,17 @@ i64 TRequestQueue::GetConcurrencyByte() const
return ConcurrencyByte_.load(std::memory_order::relaxed);
}
-void TRequestQueue::OnRequestArrived(const TServiceBase::TServiceContextPtr& context)
+void TRequestQueue::OnRequestArrived(TServiceBase::TServiceContextPtr context)
{
// Fast path.
- auto concurrencyExceeded = IncrementConcurrency(context);
- if (concurrencyExceeded &&
- !AreThrottlersOverdrafted())
- {
+ if (IncrementConcurrency(context->GetTotalSize()) && !AreThrottlersOverdrafted()) {
RunRequest(std::move(context));
return;
}
// Slow path.
DecrementConcurrency(context->GetTotalSize());
- IncrementQueueSize(context);
+ IncrementQueueSize(context->GetTotalSize());
context->BeforeEnqueued();
YT_VERIFY(Queue_.enqueue(std::move(context)));
@@ -1529,7 +1470,7 @@ void TRequestQueue::ScheduleRequestsFromQueue()
// NB: Racy, may lead to overcommit in concurrency semaphore and request bytes throttler.
auto concurrencyLimit = RuntimeInfo_->ConcurrencyLimit.GetDynamicLimit();
- auto concurrencyByteLimit = RuntimeInfo_->ConcurrencyByteLimit.GetDynamicByteLimit();
+ auto concurrencyByteLimit = RuntimeInfo_->ConcurrencyByteLimit.GetDynamicLimit();
while (QueueSize_.load() > 0 && Concurrency_.load() < concurrencyLimit && ConcurrencyByte_.load() < concurrencyByteLimit) {
if (AreThrottlersOverdrafted()) {
SubscribeToThrottlers();
@@ -1546,14 +1487,14 @@ void TRequestQueue::ScheduleRequestsFromQueue()
return;
}
- DecrementQueueSize(context);
+ DecrementQueueSize(context->GetTotalSize());
context->AfterDequeued();
if (context->IsCanceled()) {
context.Reset();
}
}
- IncrementConcurrency(context);
+ IncrementConcurrency(context->GetTotalSize());
RunRequest(std::move(context));
}
}
@@ -1579,26 +1520,29 @@ void TRequestQueue::RunRequest(TServiceBase::TServiceContextPtr context)
}
}
-void TRequestQueue::IncrementQueueSize(const TServiceBase::TServiceContextPtr& context)
+void TRequestQueue::IncrementQueueSize(i64 requestTotalSize)
{
++QueueSize_;
- QueueByteSize_.fetch_add(context->GetTotalSize());
+ QueueByteSize_.fetch_add(requestTotalSize);
}
-void TRequestQueue::DecrementQueueSize(const TServiceBase::TServiceContextPtr& context)
+void TRequestQueue::DecrementQueueSize(i64 requestTotalSize)
{
auto newQueueSize = --QueueSize_;
- auto oldQueueByteSize = QueueByteSize_.fetch_sub(context->GetTotalSize());
+ auto oldQueueByteSize = QueueByteSize_.fetch_sub(requestTotalSize);
YT_ASSERT(newQueueSize >= 0);
YT_ASSERT(oldQueueByteSize >= 0);
}
-bool TRequestQueue::IncrementConcurrency(const TServiceBase::TServiceContextPtr& context)
+// Returns true if concurrency limits are not exceeded.
+bool TRequestQueue::IncrementConcurrency(i64 requestTotalSize)
{
- auto resultSize = ++Concurrency_ <= RuntimeInfo_->ConcurrencyLimit.GetDynamicLimit();
- auto resultByteSize = ConcurrencyByte_.fetch_add(context->GetTotalSize()) <= RuntimeInfo_->ConcurrencyByteLimit.GetDynamicByteLimit();
- return resultSize && resultByteSize;
+ auto newConcurrencySemaphore = ++Concurrency_;
+ auto newConcurrencyByteSemaphore = ConcurrencyByte_.fetch_add(requestTotalSize);
+
+ return newConcurrencySemaphore <= RuntimeInfo_->ConcurrencyLimit.GetDynamicLimit() &&
+ newConcurrencyByteSemaphore <= RuntimeInfo_->ConcurrencyByteLimit.GetDynamicLimit();
}
void TRequestQueue::DecrementConcurrency(i64 requestTotalSize)
@@ -2626,7 +2570,7 @@ TServiceBase::TRuntimeMethodInfoPtr TServiceBase::RegisterMethod(const TMethodDe
return runtimeInfo->ConcurrencyLimit.GetDynamicLimit();
});
profiler.AddFuncGauge("/concurrency_byte_limit", MakeStrong(this), [=] {
- return runtimeInfo->ConcurrencyByteLimit.GetDynamicByteLimit();
+ return runtimeInfo->ConcurrencyByteLimit.GetDynamicLimit();
});
return runtimeInfo;
diff --git a/yt/yt/core/rpc/service_detail.h b/yt/yt/core/rpc/service_detail.h
index bba163d581..eeb5f137ba 100644
--- a/yt/yt/core/rpc/service_detail.h
+++ b/yt/yt/core/rpc/service_detail.h
@@ -491,38 +491,21 @@ TRequestQueuePtr CreateRequestQueue(
////////////////////////////////////////////////////////////////////////////////
+template <class TValue>
class TDynamicConcurrencyLimit
{
public:
DEFINE_SIGNAL(void(), Updated);
- void Reconfigure(int limit);
- int GetLimitFromConfiguration() const;
+ void Reconfigure(TValue limit);
+ TValue GetLimitFromConfiguration() const;
- int GetDynamicLimit() const;
- void SetDynamicLimit(std::optional<int> dynamicLimit);
+ TValue GetDynamicLimit() const;
+ void SetDynamicLimit(std::optional<TValue> dynamicLimit);
private:
- std::atomic<int> ConfigLimit_ = 0;
- std::atomic<int> DynamicLimit_ = 0;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TDynamicConcurrencyByteLimit
-{
-public:
- DEFINE_SIGNAL(void(), Updated);
-
- void Reconfigure(i64 limit);
- i64 GetByteLimitFromConfiguration() const;
-
- i64 GetDynamicByteLimit() const;
- void SetDynamicByteLimit(std::optional<i64> dynamicLimit);
-
-private:
- std::atomic<i64> ConfigByteLimit_ = 0;
- std::atomic<i64> DynamicByteLimit_ = 0;
+ std::atomic<TValue> ConfigLimit_{};
+ std::atomic<TValue> DynamicLimit_{};
};
////////////////////////////////////////////////////////////////////////////////
@@ -771,8 +754,8 @@ protected:
std::atomic<int> QueueSizeLimit = 0;
std::atomic<i64> QueueByteSizeLimit = 0;
- TDynamicConcurrencyLimit ConcurrencyLimit;
- TDynamicConcurrencyByteLimit ConcurrencyByteLimit;
+ TDynamicConcurrencyLimit<int> ConcurrencyLimit;
+ TDynamicConcurrencyLimit<i64> ConcurrencyByteLimit;
std::atomic<double> WaitingTimeoutFraction = 0;
NProfiling::TCounter RequestQueueSizeLimitErrorCounter;
@@ -1127,7 +1110,7 @@ public:
int GetConcurrency() const;
i64 GetConcurrencyByte() const;
- void OnRequestArrived(const TServiceBase::TServiceContextPtr& context);
+ void OnRequestArrived(TServiceBase::TServiceContextPtr context);
void OnRequestFinished(i64 requestTotalSize);
void ConfigureWeightThrottler(const NConcurrency::TThroughputThrottlerConfigPtr& config);
@@ -1168,10 +1151,10 @@ private:
void ScheduleRequestsFromQueue();
void RunRequest(TServiceBase::TServiceContextPtr context);
- void IncrementQueueSize(const TServiceBase::TServiceContextPtr& context);
- void DecrementQueueSize(const TServiceBase::TServiceContextPtr& context);
+ void IncrementQueueSize(i64 requestTotalSize);
+ void DecrementQueueSize(i64 requestTotalSize);
- bool IncrementConcurrency(const TServiceBase::TServiceContextPtr& context);
+ bool IncrementConcurrency(i64 requestTotalSize);
void DecrementConcurrency(i64 requestTotalSize);
bool AreThrottlersOverdrafted() const;