diff options
author | coteeq <coteeq@yandex-team.com> | 2024-07-12 19:18:45 +0300 |
---|---|---|
committer | coteeq <coteeq@yandex-team.com> | 2024-07-12 19:37:39 +0300 |
commit | ef1c6569e8786b572ada95678ccdcfa9b61d2ec9 (patch) | |
tree | 1cfa1f3a164d2d30ab1fce12a382f869dec92e12 | |
parent | fef5d8caa053fbeb4e3858ee9e5363d4e1d3f4e6 (diff) | |
download | ydb-ef1c6569e8786b572ada95678ccdcfa9b61d2ec9.tar.gz |
YT-22217: Fix throttler in cases where amount > limit * period
d6a10139f70f3c32bd0b28b916cecfd30e737a9e
-rw-r--r-- | yt/yt/core/concurrency/throughput_throttler.cpp | 47 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp | 21 |
2 files changed, 57 insertions, 11 deletions
diff --git a/yt/yt/core/concurrency/throughput_throttler.cpp b/yt/yt/core/concurrency/throughput_throttler.cpp index afa15a0154..76ae776a07 100644 --- a/yt/yt/core/concurrency/throughput_throttler.cpp +++ b/yt/yt/core/concurrency/throughput_throttler.cpp @@ -17,6 +17,27 @@ using namespace NLogging; //////////////////////////////////////////////////////////////////////////////// +namespace { + +bool WillOverflowMul(i64 lhs, i64 rhs) +{ + i64 result; + return __builtin_mul_overflow(lhs, rhs, &result); +} + +i64 ClampingAdd(i64 lhs, i64 rhs, i64 max) +{ + i64 result; + if (__builtin_add_overflow(lhs, rhs, &result) || result > max) { + return max; + } + return result; +} + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + DECLARE_REFCOUNTED_STRUCT(TThrottlerRequest) struct TThrottlerRequest @@ -315,9 +336,15 @@ private: { auto timePassed = current - lastUpdated; - if (limit * period.SecondsFloat() > 1) { - // Preventing arithmetic overflows by reducing time interval. - timePassed = std::min(period, timePassed); + if (limit > 1) { + constexpr auto maxRepresentableMilliSeconds = static_cast<double>(TDuration::Max().MilliSeconds()); + auto maxValidMilliSecondsPassed = maxRepresentableMilliSeconds / limit; + + if (timePassed.MilliSeconds() > maxValidMilliSecondsPassed) { + // NB(coteeq): Actual timePassed will overflow multiplication below, + // so we have nothing better than to just shrink this duration. + timePassed = TDuration::MilliSeconds(maxValidMilliSecondsPassed); + } } auto deltaAvailable = static_cast<i64>(timePassed.MilliSeconds() * limit / 1000); @@ -340,8 +367,9 @@ private: TDelayedExecutor::CancelAndClear(UpdateCookie_); auto now = GetInstant(); if (limit && *limit > 0) { + YT_VERIFY(!WillOverflowMul(period.MilliSeconds(), *limit)); auto lastUpdated = LastUpdated_.load(); - auto maxAvailable = static_cast<i64>(Period_.load().SecondsFloat()) * *limit; + auto maxAvailable = period.MilliSeconds() * *limit / 1000; if (lastUpdated == TInstant::Zero()) { Available_ = maxAvailable; @@ -349,10 +377,10 @@ private: } else { auto deltaAvailable = GetDeltaAvailable(now, lastUpdated, period, *limit); - auto newAvailable = Available_.load() + deltaAvailable; - if (newAvailable > maxAvailable) { + auto newAvailable = ClampingAdd(Available_.load(), deltaAvailable, maxAvailable); + YT_VERIFY(newAvailable <= maxAvailable); + if (newAvailable == maxAvailable) { LastUpdated_ = now; - newAvailable = maxAvailable; } else { LastUpdated_ = lastUpdated + TDuration::MilliSeconds(deltaAvailable * 1000 / *limit); // Just in case. @@ -408,10 +436,7 @@ private: auto throughputPerPeriod = static_cast<i64>(period.SecondsFloat() * limit); while (true) { - auto newAvailable = available + deltaAvailable; - if (newAvailable > throughputPerPeriod) { - newAvailable = throughputPerPeriod; - } + auto newAvailable = ClampingAdd(available, deltaAvailable, /*max*/ throughputPerPeriod); if (Available_.compare_exchange_weak(available, newAvailable)) { break; } diff --git a/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp b/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp index 6bf183dc79..82f4360009 100644 --- a/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp +++ b/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp @@ -213,6 +213,27 @@ TEST(TReconfigurableThroughputThrottlerTest, Overdraft) EXPECT_FALSE(throttler->IsOverdraft()); } +TEST(TReconfigurableThroughputThrottlerTest, OverdraftSignificantly) +{ + auto throttler = CreateReconfigurableThroughputThrottler( + TThroughputThrottlerConfig::Create(100)); + + const auto N = 3; + NProfiling::TWallTimer timer; + for (int i = 0; i < N; ++i) { + throttler->Throttle(300).Get().ThrowOnError(); + } + + auto expectedElapsed = (300 * (N - 1) - 100) * 1000 / 100; + // ^^1^^ ^^2^^ + // NB(coteeq): + // 1. The last throttle overdrafts throttler and does not wait, hence minus one. + // 2. The first throttle takes 100 of its 300 from initial throttler's amount. + auto elapsed = timer.GetElapsedTime().MilliSeconds(); + EXPECT_GE(elapsed, expectedElapsed - 100u); + EXPECT_LE(elapsed, expectedElapsed + 100u); +} + #if !defined(_asan_enabled_) && !defined(_msan_enabled_) && !defined(_tsan_enabled_) TEST(TReconfigurableThroughputThrottlerTest, Stress) |