aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortea-mur <tea-mur@yandex-team.com>2025-07-29 10:53:24 +0300
committertea-mur <tea-mur@yandex-team.com>2025-07-29 11:09:47 +0300
commit7205ec3af9a64418b46f5adf260678d2b3c24980 (patch)
treecd0b62f66cb8b601ac6b2f0cffed66fef6ece3c1
parent8e6de60fcf5d5cd206adf0e4eb104e456b52fdb8 (diff)
downloadydb-7205ec3af9a64418b46f5adf260678d2b3c24980.tar.gz
YT-25605: Limit and guarantee metrics for buckets of fair throttler
commit_hash:4ccd8df07b5ca5199beb598a6ef033784552f873
-rw-r--r--yt/yt/core/concurrency/fair_throttler.cpp53
1 files changed, 36 insertions, 17 deletions
diff --git a/yt/yt/core/concurrency/fair_throttler.cpp b/yt/yt/core/concurrency/fair_throttler.cpp
index 1d3f1edeb41..34559da8474 100644
--- a/yt/yt/core/concurrency/fair_throttler.cpp
+++ b/yt/yt/core/concurrency/fair_throttler.cpp
@@ -345,17 +345,17 @@ public:
TFairThrottlerConfigPtr config)
: Logger(logger)
, SharedBucket_(sharedBucket)
- , Value_(profiler.Counter("/value"))
- , Released_(profiler.Counter("/released"))
- , WaitTime_(profiler.Timer("/wait_time"))
- , Quota_(config->BucketAccumulationTicks, profiler.Gauge("/quota"))
- , DistributionPeriod_(config->DistributionPeriod)
+ , Profiler_(profiler)
+ , Value_(Profiler_.Counter("/value"))
+ , Released_(Profiler_.Counter("/released"))
+ , WaitTime_(Profiler_.Timer("/wait_time"))
+ , Quota_(config->BucketAccumulationTicks, Profiler_.Gauge("/quota"))
{
- profiler.AddFuncGauge("/queue_size", MakeStrong(this), [this] {
+ Profiler_.AddFuncGauge("/queue_size", MakeStrong(this), [this] {
return GetQueueTotalAmount();
});
- profiler.AddFuncGauge("/throttled", MakeStrong(this), [this] {
+ Profiler_.AddFuncGauge("/throttled", MakeStrong(this), [this] {
return IsOverdraft();
});
}
@@ -568,14 +568,30 @@ public:
}
}
- void SetDistributionPeriod(TDuration distributionPeriod)
+ void Update(
+ const TFairThrottlerConfigPtr& config,
+ const TFairThrottlerBucketConfigPtr& bucketConfig)
{
- DistributionPeriod_.store(distributionPeriod);
- }
+ Limited_ = bucketConfig->Limit || bucketConfig->RelativeLimit;
+ DistributionPeriod_.store(config->DistributionPeriod);
- void SetLimited(bool limited)
- {
- Limited_ = limited;
+ if (const auto& limit = bucketConfig->GetLimit(config->TotalLimit)) {
+ if (!Limit_) {
+ Limit_ = Profiler_.Gauge("/limit");
+ }
+ Limit_->Update(limit.value());
+ } else {
+ Limit_ = std::nullopt;
+ }
+
+ if (const auto& guarantee = bucketConfig->GetGuarantee(config->TotalLimit)) {
+ if (!Guarantee_) {
+ Guarantee_ = Profiler_.Gauge("/guarantee");
+ }
+ Guarantee_->Update(guarantee.value());
+ } else {
+ Guarantee_ = std::nullopt;
+ }
}
bool IsLimited() const
@@ -588,9 +604,12 @@ private:
TSharedBucketPtr SharedBucket_;
+ NProfiling::TProfiler Profiler_;
NProfiling::TCounter Value_;
NProfiling::TCounter Released_;
NProfiling::TEventTimer WaitTime_;
+ std::optional<NProfiling::TGauge> Limit_ = std::nullopt;
+ std::optional<NProfiling::TGauge> Guarantee_ = std::nullopt;
TLeakyCounter Quota_;
std::atomic<i64> EstimatedLimit_ = 0;
@@ -599,7 +618,7 @@ private:
std::atomic<bool> Limited_ = false;
- std::atomic<TDuration> DistributionPeriod_;
+ std::atomic<TDuration> DistributionPeriod_{};
YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, Lock_);
std::deque<TBucketThrottleRequestPtr> Queue_;
@@ -655,10 +674,10 @@ IThroughputThrottlerPtr TFairThrottler::CreateBucketThrottler(
auto guard = Guard(Lock_);
if (auto it = Buckets_.find(name); it != Buckets_.end()) {
- it->second.Throttler->SetLimited(config->Limit || config->RelativeLimit);
+ it->second.Throttler->Update(Config_, config);
it->second.Config = std::move(config);
- it->second.Throttler->SetDistributionPeriod(Config_->DistributionPeriod);
+
return it->second.Throttler;
}
@@ -668,7 +687,7 @@ IThroughputThrottlerPtr TFairThrottler::CreateBucketThrottler(
SharedBucket_,
Config_);
- throttler->SetLimited(config->Limit || config->RelativeLimit);
+ throttler->Update(Config_, config);
IIpcBucketPtr state;
if (Ipc_) {