diff options
author | tea-mur <tea-mur@yandex-team.com> | 2025-07-29 10:53:24 +0300 |
---|---|---|
committer | tea-mur <tea-mur@yandex-team.com> | 2025-07-29 11:09:47 +0300 |
commit | 7205ec3af9a64418b46f5adf260678d2b3c24980 (patch) | |
tree | cd0b62f66cb8b601ac6b2f0cffed66fef6ece3c1 | |
parent | 8e6de60fcf5d5cd206adf0e4eb104e456b52fdb8 (diff) | |
download | ydb-7205ec3af9a64418b46f5adf260678d2b3c24980.tar.gz |
YT-25605: Limit and guarantee metrics for buckets of fair throttler
commit_hash:4ccd8df07b5ca5199beb598a6ef033784552f873
-rw-r--r-- | yt/yt/core/concurrency/fair_throttler.cpp | 53 |
1 files changed, 36 insertions, 17 deletions
diff --git a/yt/yt/core/concurrency/fair_throttler.cpp b/yt/yt/core/concurrency/fair_throttler.cpp index 1d3f1edeb41..34559da8474 100644 --- a/yt/yt/core/concurrency/fair_throttler.cpp +++ b/yt/yt/core/concurrency/fair_throttler.cpp @@ -345,17 +345,17 @@ public: TFairThrottlerConfigPtr config) : Logger(logger) , SharedBucket_(sharedBucket) - , Value_(profiler.Counter("/value")) - , Released_(profiler.Counter("/released")) - , WaitTime_(profiler.Timer("/wait_time")) - , Quota_(config->BucketAccumulationTicks, profiler.Gauge("/quota")) - , DistributionPeriod_(config->DistributionPeriod) + , Profiler_(profiler) + , Value_(Profiler_.Counter("/value")) + , Released_(Profiler_.Counter("/released")) + , WaitTime_(Profiler_.Timer("/wait_time")) + , Quota_(config->BucketAccumulationTicks, Profiler_.Gauge("/quota")) { - profiler.AddFuncGauge("/queue_size", MakeStrong(this), [this] { + Profiler_.AddFuncGauge("/queue_size", MakeStrong(this), [this] { return GetQueueTotalAmount(); }); - profiler.AddFuncGauge("/throttled", MakeStrong(this), [this] { + Profiler_.AddFuncGauge("/throttled", MakeStrong(this), [this] { return IsOverdraft(); }); } @@ -568,14 +568,30 @@ public: } } - void SetDistributionPeriod(TDuration distributionPeriod) + void Update( + const TFairThrottlerConfigPtr& config, + const TFairThrottlerBucketConfigPtr& bucketConfig) { - DistributionPeriod_.store(distributionPeriod); - } + Limited_ = bucketConfig->Limit || bucketConfig->RelativeLimit; + DistributionPeriod_.store(config->DistributionPeriod); - void SetLimited(bool limited) - { - Limited_ = limited; + if (const auto& limit = bucketConfig->GetLimit(config->TotalLimit)) { + if (!Limit_) { + Limit_ = Profiler_.Gauge("/limit"); + } + Limit_->Update(limit.value()); + } else { + Limit_ = std::nullopt; + } + + if (const auto& guarantee = bucketConfig->GetGuarantee(config->TotalLimit)) { + if (!Guarantee_) { + Guarantee_ = Profiler_.Gauge("/guarantee"); + } + Guarantee_->Update(guarantee.value()); + } else { + Guarantee_ = std::nullopt; + } } bool IsLimited() const @@ -588,9 +604,12 @@ private: TSharedBucketPtr SharedBucket_; + NProfiling::TProfiler Profiler_; NProfiling::TCounter Value_; NProfiling::TCounter Released_; NProfiling::TEventTimer WaitTime_; + std::optional<NProfiling::TGauge> Limit_ = std::nullopt; + std::optional<NProfiling::TGauge> Guarantee_ = std::nullopt; TLeakyCounter Quota_; std::atomic<i64> EstimatedLimit_ = 0; @@ -599,7 +618,7 @@ private: std::atomic<bool> Limited_ = false; - std::atomic<TDuration> DistributionPeriod_; + std::atomic<TDuration> DistributionPeriod_{}; YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, Lock_); std::deque<TBucketThrottleRequestPtr> Queue_; @@ -655,10 +674,10 @@ IThroughputThrottlerPtr TFairThrottler::CreateBucketThrottler( auto guard = Guard(Lock_); if (auto it = Buckets_.find(name); it != Buckets_.end()) { - it->second.Throttler->SetLimited(config->Limit || config->RelativeLimit); + it->second.Throttler->Update(Config_, config); it->second.Config = std::move(config); - it->second.Throttler->SetDistributionPeriod(Config_->DistributionPeriod); + return it->second.Throttler; } @@ -668,7 +687,7 @@ IThroughputThrottlerPtr TFairThrottler::CreateBucketThrottler( SharedBucket_, Config_); - throttler->SetLimited(config->Limit || config->RelativeLimit); + throttler->Update(Config_, config); IIpcBucketPtr state; if (Ipc_) { |