summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorakozhikhov <[email protected]>2023-08-16 18:58:55 +0300
committerakozhikhov <[email protected]>2023-08-16 21:03:12 +0300
commit9622542f7f4c0319a0aff3ffb089d974a1cdeaf2 (patch)
tree323d547693c2634d777f1e276c8cacf6606e6a0c
parente831a80e472c89fd300c90a88a90b19c55b98a11 (diff)
Add release method for bandwidth throttler and use it in replication reader
-rw-r--r--yt/yt/core/concurrency/fair_throttler.cpp16
-rw-r--r--yt/yt/core/concurrency/throughput_throttler.cpp60
-rw-r--r--yt/yt/core/concurrency/throughput_throttler.h8
-rw-r--r--yt/yt/core/concurrency/unittests/fair_throttler_ut.cpp14
-rw-r--r--yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp37
5 files changed, 134 insertions, 1 deletions
diff --git a/yt/yt/core/concurrency/fair_throttler.cpp b/yt/yt/core/concurrency/fair_throttler.cpp
index a3b1d67578f..4d95b4a85ce 100644
--- a/yt/yt/core/concurrency/fair_throttler.cpp
+++ b/yt/yt/core/concurrency/fair_throttler.cpp
@@ -345,6 +345,7 @@ public:
: Logger(logger)
, SharedBucket_(sharedBucket)
, Value_(profiler.Counter("/value"))
+ , Released_(profiler.Counter("/released"))
, WaitTime_(profiler.Timer("/wait_time"))
, Quota_(config->BucketAccumulationTicks, profiler.Gauge("/quota"))
, DistributionPeriod_(config->DistributionPeriod)
@@ -432,6 +433,20 @@ public:
Usage_ += amount;
}
+ void Release(i64 amount) override
+ {
+ YT_VERIFY(amount >= 0);
+
+ if (amount == 0) {
+ return;
+ }
+
+ *Quota_.Value += amount;
+ Usage_ -= amount;
+
+ Released_.Increment(amount);
+ }
+
bool IsOverdraft() override
{
return GetQueueTotalAmount() > 0;
@@ -560,6 +575,7 @@ private:
TSharedBucketPtr SharedBucket_;
NProfiling::TCounter Value_;
+ NProfiling::TCounter Released_;
NProfiling::TEventTimer WaitTime_;
TLeakyCounter Quota_;
diff --git a/yt/yt/core/concurrency/throughput_throttler.cpp b/yt/yt/core/concurrency/throughput_throttler.cpp
index eaa996fc8fb..55153a2acda 100644
--- a/yt/yt/core/concurrency/throughput_throttler.cpp
+++ b/yt/yt/core/concurrency/throughput_throttler.cpp
@@ -47,6 +47,7 @@ public:
const NProfiling::TProfiler& profiler)
: Logger(logger)
, ValueCounter_(profiler.Counter("/value"))
+ , ReleaseCounter_(profiler.Counter("/released"))
, QueueSizeCounter_(profiler.Gauge("/queue_size"))
, WaitTimer_(profiler.Timer("/wait_time"))
{
@@ -147,6 +148,19 @@ public:
ValueCounter_.Increment(amount);
}
+ void Release(i64 amount) override
+ {
+ VERIFY_THREAD_AFFINITY_ANY();
+ YT_VERIFY(amount >= 0);
+
+ if (amount == 0) {
+ return;
+ }
+
+ Available_ += amount;
+ ReleaseCounter_.Increment(amount);
+ }
+
bool IsOverdraft() override
{
VERIFY_THREAD_AFFINITY_ANY();
@@ -213,6 +227,7 @@ private:
const TLogger Logger;
NProfiling::TCounter ValueCounter_;
+ NProfiling::TCounter ReleaseCounter_;
NProfiling::TGauge QueueSizeCounter_;
NProfiling::TEventTimer WaitTimer_;
@@ -430,7 +445,6 @@ private:
request->Promise.Set();
}
}
-
};
IReconfigurableThroughputThrottlerPtr CreateReconfigurableThroughputThrottler(
@@ -465,6 +479,7 @@ public:
explicit TUnlimitedThroughputThrottler(
const NProfiling::TProfiler& profiler = {})
: ValueCounter_(profiler.Counter("/value"))
+ , ReleaseCounter_(profiler.Counter("/released"))
{ }
TFuture<void> Throttle(i64 amount) override
@@ -502,6 +517,14 @@ public:
ValueCounter_.Increment(amount);
}
+ void Release(i64 amount) override
+ {
+ VERIFY_THREAD_AFFINITY_ANY();
+ YT_VERIFY(amount >= 0);
+
+ ReleaseCounter_.Increment(amount);
+ }
+
bool IsOverdraft() override
{
VERIFY_THREAD_AFFINITY_ANY();
@@ -527,6 +550,7 @@ public:
private:
NProfiling::TCounter ValueCounter_;
+ NProfiling::TCounter ReleaseCounter_;
};
IThroughputThrottlerPtr GetUnlimitedThrottler()
@@ -591,6 +615,16 @@ public:
}
}
+ void Release(i64 amount) override
+ {
+ VERIFY_THREAD_AFFINITY_ANY();
+ YT_VERIFY(amount >= 0);
+
+ for (const auto& throttler : Throttlers_) {
+ throttler->Release(amount);
+ }
+ }
+
bool IsOverdraft() override
{
VERIFY_THREAD_AFFINITY_ANY();
@@ -689,6 +723,12 @@ public:
Underlying_->Acquire(amount);
}
+ void Release(i64 amount) override
+ {
+ Stealer_->Release(amount);
+ Underlying_->Release(amount);
+ }
+
bool IsOverdraft() override
{
return Stealer_->IsOverdraft();
@@ -830,6 +870,24 @@ public:
}
}
+ void Release(i64 amount) override
+ {
+ VERIFY_THREAD_AFFINITY_ANY();
+ YT_VERIFY(amount >= 0);
+
+ if (amount == 0) {
+ return;
+ }
+
+ {
+ auto guard = Guard(Lock_);
+ Available_ += amount;
+ }
+
+ YT_LOG_DEBUG("Released from prefetching throttler (Amount: %v)",
+ amount);
+ }
+
bool IsOverdraft() override
{
VERIFY_THREAD_AFFINITY_ANY();
diff --git a/yt/yt/core/concurrency/throughput_throttler.h b/yt/yt/core/concurrency/throughput_throttler.h
index c0ef6e269ce..c4dd476a97a 100644
--- a/yt/yt/core/concurrency/throughput_throttler.h
+++ b/yt/yt/core/concurrency/throughput_throttler.h
@@ -53,6 +53,14 @@ struct IThroughputThrottler
*/
virtual void Acquire(i64 amount) = 0;
+ //! Releases #amount units back under control of the throttler.
+ //! This method should be used cautiously as in current implementation
+ //! it may locally disrupt fifo ordering or fairness of throttling requests.
+ /*!
+ * \note Thread affinity: any
+ */
+ virtual void Release(i64 amount) = 0;
+
//! Returns |true| if the throttling limit has been exceeded.
/*!
* \note Thread affinity: any
diff --git a/yt/yt/core/concurrency/unittests/fair_throttler_ut.cpp b/yt/yt/core/concurrency/unittests/fair_throttler_ut.cpp
index c5cae32ee6a..5646e3360ea 100644
--- a/yt/yt/core/concurrency/unittests/fair_throttler_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/fair_throttler_ut.cpp
@@ -258,6 +258,20 @@ TEST_F(TFairThrottlerTest, TryAcquireAvailable)
ASSERT_GE(bucket->GetAvailable(), 0);
}
+TEST_F(TFairThrottlerTest, Release)
+{
+ auto bucket = FairThrottler->CreateBucketThrottler("main", BucketConfig);
+
+ Sleep(TDuration::Seconds(5));
+
+ ASSERT_EQ(bucket->GetAvailable(), 100);
+
+ ASSERT_TRUE(bucket->TryAcquire(100));
+ bucket->Release(100);
+ ASSERT_TRUE(bucket->TryAcquire(100));
+ ASSERT_FALSE(bucket->TryAcquire(100));
+}
+
////////////////////////////////////////////////////////////////////////////////
struct TFairThrottlerIPCTest
diff --git a/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp b/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp
index 3364e553e8e..d75f2459033 100644
--- a/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp
@@ -241,6 +241,22 @@ TEST(TReconfigurableThroughputThrottlerTest, TestZeroLimit)
EXPECT_LE(timer.GetElapsedTime().MilliSeconds(), 1000u);
}
+TEST(TReconfigurableThroughputThrottlerTest, TestRelease)
+{
+ auto throttler = CreateReconfigurableThroughputThrottler(
+ TThroughputThrottlerConfig::Create(100));
+
+ auto future = throttler->Throttle(100);
+ EXPECT_EQ(future, VoidFuture);
+
+ throttler->Release(100);
+ future = throttler->Throttle(100);
+ EXPECT_EQ(future, VoidFuture);
+
+ future = throttler->Throttle(100);
+ EXPECT_FALSE(future.IsSet());
+}
+
////////////////////////////////////////////////////////////////////////////////
DECLARE_REFCOUNTED_CLASS(TMockThrottler)
@@ -266,6 +282,11 @@ public:
YT_UNIMPLEMENTED();
}
+ void Release(i64 /*amount*/) override
+ {
+ YT_UNIMPLEMENTED();
+ }
+
bool IsOverdraft() override
{
YT_UNIMPLEMENTED();
@@ -392,6 +413,22 @@ TEST_F(TPrefetchingThrottlerExponentialGrowthTest, DoNotHangUpAfterAnError)
Throttler_->Throttle(1);
}
+TEST_F(TPrefetchingThrottlerExponentialGrowthTest, Release)
+{
+ EXPECT_CALL(*Underlying_, Throttle(_))
+ .Times(1)
+ .WillRepeatedly(Return(VoidFuture));
+
+ EXPECT_TRUE(Throttler_->Throttle(1).Get().IsOK());
+ EXPECT_TRUE(Throttler_->IsOverdraft());
+
+ Throttler_->Release(1);
+ EXPECT_FALSE(Throttler_->IsOverdraft());
+
+ EXPECT_EQ(Throttler_->Throttle(1), VoidFuture);
+ EXPECT_TRUE(Throttler_->IsOverdraft());
+}
+
////////////////////////////////////////////////////////////////////////////////
struct TStressParameters