summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--yt/yt/core/concurrency/throughput_throttler.cpp16
-rw-r--r--yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp13
2 files changed, 26 insertions, 3 deletions
diff --git a/yt/yt/core/concurrency/throughput_throttler.cpp b/yt/yt/core/concurrency/throughput_throttler.cpp
index 70a41ff07f3..15320a59156 100644
--- a/yt/yt/core/concurrency/throughput_throttler.cpp
+++ b/yt/yt/core/concurrency/throughput_throttler.cpp
@@ -106,11 +106,12 @@ public:
return true;
}
- if (Limit_.load() >= 0) {
+ auto limit = Limit_.load();
+ if (limit >= 0) {
while (true) {
TryUpdateAvailable();
auto available = Available_.load();
- if (available < 0) {
+ if ((limit > 0 && available < 0) || (limit == 0 && available <= 0)) {
return false;
}
if (Available_.compare_exchange_weak(available, available - amount)) {
@@ -472,7 +473,16 @@ private:
std::vector<TThrottlerRequestPtr> readyList;
auto limit = Limit_.load();
- while (!Requests_.empty() && (limit < 0 || Available_ >= 0)) {
+ auto canSpend = [&] {
+ auto available = Available_.load();
+ return
+ limit < 0 ||
+ // NB(coteeq): Do not spend tokens if limit is zero.
+ (limit == 0 && available > 0) ||
+ (limit > 0 && available >= 0);
+ };
+
+ while (!Requests_.empty() && canSpend()) {
const auto& request = Requests_.front();
if (!request->Set.test_and_set()) {
NTracing::TTraceContextGuard traceGuard(std::move(request->TraceContext));
diff --git a/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp b/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp
index 19a61583379..ada4b11c6ca 100644
--- a/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp
@@ -35,6 +35,19 @@ TEST(TReconfigurableThroughputThrottlerTest, NoLimit)
EXPECT_LE(timer.GetElapsedTime().MilliSeconds(), 100u);
}
+TEST(TReconfigurableThroughputThrottlerTest, CannotBeAbusedViaReconfigure)
+{
+ auto throttler = CreateReconfigurableThroughputThrottler(
+ TThroughputThrottlerConfig::Create(0));
+
+ auto future1 = throttler->Throttle(1);
+ auto future2 = throttler->Throttle(1);
+ throttler->SetLimit(0);
+
+ EXPECT_FALSE(future1.IsSet());
+ EXPECT_FALSE(future2.IsSet());
+}
+
TEST(TReconfigurableThroughputThrottlerTest, Limit)
{
auto throttler = CreateReconfigurableThroughputThrottler(