diff options
author | akozhikhov <[email protected]> | 2023-08-16 18:58:55 +0300 |
---|---|---|
committer | akozhikhov <[email protected]> | 2023-08-16 21:03:12 +0300 |
commit | 9622542f7f4c0319a0aff3ffb089d974a1cdeaf2 (patch) | |
tree | 323d547693c2634d777f1e276c8cacf6606e6a0c | |
parent | e831a80e472c89fd300c90a88a90b19c55b98a11 (diff) |
Add release method for bandwidth throttler and use it in replication reader
-rw-r--r-- | yt/yt/core/concurrency/fair_throttler.cpp | 16 | ||||
-rw-r--r-- | yt/yt/core/concurrency/throughput_throttler.cpp | 60 | ||||
-rw-r--r-- | yt/yt/core/concurrency/throughput_throttler.h | 8 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/fair_throttler_ut.cpp | 14 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp | 37 |
5 files changed, 134 insertions, 1 deletions
diff --git a/yt/yt/core/concurrency/fair_throttler.cpp b/yt/yt/core/concurrency/fair_throttler.cpp index a3b1d67578f..4d95b4a85ce 100644 --- a/yt/yt/core/concurrency/fair_throttler.cpp +++ b/yt/yt/core/concurrency/fair_throttler.cpp @@ -345,6 +345,7 @@ public: : Logger(logger) , SharedBucket_(sharedBucket) , Value_(profiler.Counter("/value")) + , Released_(profiler.Counter("/released")) , WaitTime_(profiler.Timer("/wait_time")) , Quota_(config->BucketAccumulationTicks, profiler.Gauge("/quota")) , DistributionPeriod_(config->DistributionPeriod) @@ -432,6 +433,20 @@ public: Usage_ += amount; } + void Release(i64 amount) override + { + YT_VERIFY(amount >= 0); + + if (amount == 0) { + return; + } + + *Quota_.Value += amount; + Usage_ -= amount; + + Released_.Increment(amount); + } + bool IsOverdraft() override { return GetQueueTotalAmount() > 0; @@ -560,6 +575,7 @@ private: TSharedBucketPtr SharedBucket_; NProfiling::TCounter Value_; + NProfiling::TCounter Released_; NProfiling::TEventTimer WaitTime_; TLeakyCounter Quota_; diff --git a/yt/yt/core/concurrency/throughput_throttler.cpp b/yt/yt/core/concurrency/throughput_throttler.cpp index eaa996fc8fb..55153a2acda 100644 --- a/yt/yt/core/concurrency/throughput_throttler.cpp +++ b/yt/yt/core/concurrency/throughput_throttler.cpp @@ -47,6 +47,7 @@ public: const NProfiling::TProfiler& profiler) : Logger(logger) , ValueCounter_(profiler.Counter("/value")) + , ReleaseCounter_(profiler.Counter("/released")) , QueueSizeCounter_(profiler.Gauge("/queue_size")) , WaitTimer_(profiler.Timer("/wait_time")) { @@ -147,6 +148,19 @@ public: ValueCounter_.Increment(amount); } + void Release(i64 amount) override + { + VERIFY_THREAD_AFFINITY_ANY(); + YT_VERIFY(amount >= 0); + + if (amount == 0) { + return; + } + + Available_ += amount; + ReleaseCounter_.Increment(amount); + } + bool IsOverdraft() override { VERIFY_THREAD_AFFINITY_ANY(); @@ -213,6 +227,7 @@ private: const TLogger Logger; NProfiling::TCounter ValueCounter_; + NProfiling::TCounter ReleaseCounter_; NProfiling::TGauge QueueSizeCounter_; NProfiling::TEventTimer WaitTimer_; @@ -430,7 +445,6 @@ private: request->Promise.Set(); } } - }; IReconfigurableThroughputThrottlerPtr CreateReconfigurableThroughputThrottler( @@ -465,6 +479,7 @@ public: explicit TUnlimitedThroughputThrottler( const NProfiling::TProfiler& profiler = {}) : ValueCounter_(profiler.Counter("/value")) + , ReleaseCounter_(profiler.Counter("/released")) { } TFuture<void> Throttle(i64 amount) override @@ -502,6 +517,14 @@ public: ValueCounter_.Increment(amount); } + void Release(i64 amount) override + { + VERIFY_THREAD_AFFINITY_ANY(); + YT_VERIFY(amount >= 0); + + ReleaseCounter_.Increment(amount); + } + bool IsOverdraft() override { VERIFY_THREAD_AFFINITY_ANY(); @@ -527,6 +550,7 @@ public: private: NProfiling::TCounter ValueCounter_; + NProfiling::TCounter ReleaseCounter_; }; IThroughputThrottlerPtr GetUnlimitedThrottler() @@ -591,6 +615,16 @@ public: } } + void Release(i64 amount) override + { + VERIFY_THREAD_AFFINITY_ANY(); + YT_VERIFY(amount >= 0); + + for (const auto& throttler : Throttlers_) { + throttler->Release(amount); + } + } + bool IsOverdraft() override { VERIFY_THREAD_AFFINITY_ANY(); @@ -689,6 +723,12 @@ public: Underlying_->Acquire(amount); } + void Release(i64 amount) override + { + Stealer_->Release(amount); + Underlying_->Release(amount); + } + bool IsOverdraft() override { return Stealer_->IsOverdraft(); @@ -830,6 +870,24 @@ public: } } + void Release(i64 amount) override + { + VERIFY_THREAD_AFFINITY_ANY(); + YT_VERIFY(amount >= 0); + + if (amount == 0) { + return; + } + + { + auto guard = Guard(Lock_); + Available_ += amount; + } + + YT_LOG_DEBUG("Released from prefetching throttler (Amount: %v)", + amount); + } + bool IsOverdraft() override { VERIFY_THREAD_AFFINITY_ANY(); diff --git a/yt/yt/core/concurrency/throughput_throttler.h b/yt/yt/core/concurrency/throughput_throttler.h index c0ef6e269ce..c4dd476a97a 100644 --- a/yt/yt/core/concurrency/throughput_throttler.h +++ b/yt/yt/core/concurrency/throughput_throttler.h @@ -53,6 +53,14 @@ struct IThroughputThrottler */ virtual void Acquire(i64 amount) = 0; + //! Releases #amount units back under control of the throttler. + //! This method should be used cautiously as in current implementation + //! it may locally disrupt fifo ordering or fairness of throttling requests. + /*! + * \note Thread affinity: any + */ + virtual void Release(i64 amount) = 0; + //! Returns |true| if the throttling limit has been exceeded. /*! * \note Thread affinity: any diff --git a/yt/yt/core/concurrency/unittests/fair_throttler_ut.cpp b/yt/yt/core/concurrency/unittests/fair_throttler_ut.cpp index c5cae32ee6a..5646e3360ea 100644 --- a/yt/yt/core/concurrency/unittests/fair_throttler_ut.cpp +++ b/yt/yt/core/concurrency/unittests/fair_throttler_ut.cpp @@ -258,6 +258,20 @@ TEST_F(TFairThrottlerTest, TryAcquireAvailable) ASSERT_GE(bucket->GetAvailable(), 0); } +TEST_F(TFairThrottlerTest, Release) +{ + auto bucket = FairThrottler->CreateBucketThrottler("main", BucketConfig); + + Sleep(TDuration::Seconds(5)); + + ASSERT_EQ(bucket->GetAvailable(), 100); + + ASSERT_TRUE(bucket->TryAcquire(100)); + bucket->Release(100); + ASSERT_TRUE(bucket->TryAcquire(100)); + ASSERT_FALSE(bucket->TryAcquire(100)); +} + //////////////////////////////////////////////////////////////////////////////// struct TFairThrottlerIPCTest diff --git a/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp b/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp index 3364e553e8e..d75f2459033 100644 --- a/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp +++ b/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp @@ -241,6 +241,22 @@ TEST(TReconfigurableThroughputThrottlerTest, TestZeroLimit) EXPECT_LE(timer.GetElapsedTime().MilliSeconds(), 1000u); } +TEST(TReconfigurableThroughputThrottlerTest, TestRelease) +{ + auto throttler = CreateReconfigurableThroughputThrottler( + TThroughputThrottlerConfig::Create(100)); + + auto future = throttler->Throttle(100); + EXPECT_EQ(future, VoidFuture); + + throttler->Release(100); + future = throttler->Throttle(100); + EXPECT_EQ(future, VoidFuture); + + future = throttler->Throttle(100); + EXPECT_FALSE(future.IsSet()); +} + //////////////////////////////////////////////////////////////////////////////// DECLARE_REFCOUNTED_CLASS(TMockThrottler) @@ -266,6 +282,11 @@ public: YT_UNIMPLEMENTED(); } + void Release(i64 /*amount*/) override + { + YT_UNIMPLEMENTED(); + } + bool IsOverdraft() override { YT_UNIMPLEMENTED(); @@ -392,6 +413,22 @@ TEST_F(TPrefetchingThrottlerExponentialGrowthTest, DoNotHangUpAfterAnError) Throttler_->Throttle(1); } +TEST_F(TPrefetchingThrottlerExponentialGrowthTest, Release) +{ + EXPECT_CALL(*Underlying_, Throttle(_)) + .Times(1) + .WillRepeatedly(Return(VoidFuture)); + + EXPECT_TRUE(Throttler_->Throttle(1).Get().IsOK()); + EXPECT_TRUE(Throttler_->IsOverdraft()); + + Throttler_->Release(1); + EXPECT_FALSE(Throttler_->IsOverdraft()); + + EXPECT_EQ(Throttler_->Throttle(1), VoidFuture); + EXPECT_TRUE(Throttler_->IsOverdraft()); +} + //////////////////////////////////////////////////////////////////////////////// struct TStressParameters |