diff options
| -rw-r--r-- | yt/yt/core/concurrency/throughput_throttler.cpp | 16 | ||||
| -rw-r--r-- | yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp | 13 |
2 files changed, 26 insertions, 3 deletions
diff --git a/yt/yt/core/concurrency/throughput_throttler.cpp b/yt/yt/core/concurrency/throughput_throttler.cpp index 70a41ff07f3..15320a59156 100644 --- a/yt/yt/core/concurrency/throughput_throttler.cpp +++ b/yt/yt/core/concurrency/throughput_throttler.cpp @@ -106,11 +106,12 @@ public: return true; } - if (Limit_.load() >= 0) { + auto limit = Limit_.load(); + if (limit >= 0) { while (true) { TryUpdateAvailable(); auto available = Available_.load(); - if (available < 0) { + if ((limit > 0 && available < 0) || (limit == 0 && available <= 0)) { return false; } if (Available_.compare_exchange_weak(available, available - amount)) { @@ -472,7 +473,16 @@ private: std::vector<TThrottlerRequestPtr> readyList; auto limit = Limit_.load(); - while (!Requests_.empty() && (limit < 0 || Available_ >= 0)) { + auto canSpend = [&] { + auto available = Available_.load(); + return + limit < 0 || + // NB(coteeq): Do not spend tokens if limit is zero. + (limit == 0 && available > 0) || + (limit > 0 && available >= 0); + }; + + while (!Requests_.empty() && canSpend()) { const auto& request = Requests_.front(); if (!request->Set.test_and_set()) { NTracing::TTraceContextGuard traceGuard(std::move(request->TraceContext)); diff --git a/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp b/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp index 19a61583379..ada4b11c6ca 100644 --- a/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp +++ b/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp @@ -35,6 +35,19 @@ TEST(TReconfigurableThroughputThrottlerTest, NoLimit) EXPECT_LE(timer.GetElapsedTime().MilliSeconds(), 100u); } +TEST(TReconfigurableThroughputThrottlerTest, CannotBeAbusedViaReconfigure) +{ + auto throttler = CreateReconfigurableThroughputThrottler( + TThroughputThrottlerConfig::Create(0)); + + auto future1 = throttler->Throttle(1); + auto future2 = throttler->Throttle(1); + throttler->SetLimit(0); + + EXPECT_FALSE(future1.IsSet()); + EXPECT_FALSE(future2.IsSet()); +} + TEST(TReconfigurableThroughputThrottlerTest, Limit) { auto throttler = CreateReconfigurableThroughputThrottler( |
