aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorcoteeq <coteeq@yandex-team.com>2024-07-12 19:18:45 +0300
committercoteeq <coteeq@yandex-team.com>2024-07-12 19:37:39 +0300
commitef1c6569e8786b572ada95678ccdcfa9b61d2ec9 (patch)
tree1cfa1f3a164d2d30ab1fce12a382f869dec92e12
parentfef5d8caa053fbeb4e3858ee9e5363d4e1d3f4e6 (diff)
downloadydb-ef1c6569e8786b572ada95678ccdcfa9b61d2ec9.tar.gz
YT-22217: Fix throttler in cases where amount > limit * period
d6a10139f70f3c32bd0b28b916cecfd30e737a9e
-rw-r--r--yt/yt/core/concurrency/throughput_throttler.cpp47
-rw-r--r--yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp21
2 files changed, 57 insertions, 11 deletions
diff --git a/yt/yt/core/concurrency/throughput_throttler.cpp b/yt/yt/core/concurrency/throughput_throttler.cpp
index afa15a0154..76ae776a07 100644
--- a/yt/yt/core/concurrency/throughput_throttler.cpp
+++ b/yt/yt/core/concurrency/throughput_throttler.cpp
@@ -17,6 +17,27 @@ using namespace NLogging;
////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+bool WillOverflowMul(i64 lhs, i64 rhs)
+{
+ i64 result;
+ return __builtin_mul_overflow(lhs, rhs, &result);
+}
+
+i64 ClampingAdd(i64 lhs, i64 rhs, i64 max)
+{
+ i64 result;
+ if (__builtin_add_overflow(lhs, rhs, &result) || result > max) {
+ return max;
+ }
+ return result;
+}
+
+} // namespace
+
+////////////////////////////////////////////////////////////////////////////////
+
DECLARE_REFCOUNTED_STRUCT(TThrottlerRequest)
struct TThrottlerRequest
@@ -315,9 +336,15 @@ private:
{
auto timePassed = current - lastUpdated;
- if (limit * period.SecondsFloat() > 1) {
- // Preventing arithmetic overflows by reducing time interval.
- timePassed = std::min(period, timePassed);
+ if (limit > 1) {
+ constexpr auto maxRepresentableMilliSeconds = static_cast<double>(TDuration::Max().MilliSeconds());
+ auto maxValidMilliSecondsPassed = maxRepresentableMilliSeconds / limit;
+
+ if (timePassed.MilliSeconds() > maxValidMilliSecondsPassed) {
+ // NB(coteeq): Actual timePassed will overflow multiplication below,
+ // so we have nothing better than to just shrink this duration.
+ timePassed = TDuration::MilliSeconds(maxValidMilliSecondsPassed);
+ }
}
auto deltaAvailable = static_cast<i64>(timePassed.MilliSeconds() * limit / 1000);
@@ -340,8 +367,9 @@ private:
TDelayedExecutor::CancelAndClear(UpdateCookie_);
auto now = GetInstant();
if (limit && *limit > 0) {
+ YT_VERIFY(!WillOverflowMul(period.MilliSeconds(), *limit));
auto lastUpdated = LastUpdated_.load();
- auto maxAvailable = static_cast<i64>(Period_.load().SecondsFloat()) * *limit;
+ auto maxAvailable = period.MilliSeconds() * *limit / 1000;
if (lastUpdated == TInstant::Zero()) {
Available_ = maxAvailable;
@@ -349,10 +377,10 @@ private:
} else {
auto deltaAvailable = GetDeltaAvailable(now, lastUpdated, period, *limit);
- auto newAvailable = Available_.load() + deltaAvailable;
- if (newAvailable > maxAvailable) {
+ auto newAvailable = ClampingAdd(Available_.load(), deltaAvailable, maxAvailable);
+ YT_VERIFY(newAvailable <= maxAvailable);
+ if (newAvailable == maxAvailable) {
LastUpdated_ = now;
- newAvailable = maxAvailable;
} else {
LastUpdated_ = lastUpdated + TDuration::MilliSeconds(deltaAvailable * 1000 / *limit);
// Just in case.
@@ -408,10 +436,7 @@ private:
auto throughputPerPeriod = static_cast<i64>(period.SecondsFloat() * limit);
while (true) {
- auto newAvailable = available + deltaAvailable;
- if (newAvailable > throughputPerPeriod) {
- newAvailable = throughputPerPeriod;
- }
+ auto newAvailable = ClampingAdd(available, deltaAvailable, /*max*/ throughputPerPeriod);
if (Available_.compare_exchange_weak(available, newAvailable)) {
break;
}
diff --git a/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp b/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp
index 6bf183dc79..82f4360009 100644
--- a/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp
@@ -213,6 +213,27 @@ TEST(TReconfigurableThroughputThrottlerTest, Overdraft)
EXPECT_FALSE(throttler->IsOverdraft());
}
+TEST(TReconfigurableThroughputThrottlerTest, OverdraftSignificantly)
+{
+ auto throttler = CreateReconfigurableThroughputThrottler(
+ TThroughputThrottlerConfig::Create(100));
+
+ const auto N = 3;
+ NProfiling::TWallTimer timer;
+ for (int i = 0; i < N; ++i) {
+ throttler->Throttle(300).Get().ThrowOnError();
+ }
+
+ auto expectedElapsed = (300 * (N - 1) - 100) * 1000 / 100;
+ // ^^1^^ ^^2^^
+ // NB(coteeq):
+ // 1. The last throttle overdrafts throttler and does not wait, hence minus one.
+ // 2. The first throttle takes 100 of its 300 from initial throttler's amount.
+ auto elapsed = timer.GetElapsedTime().MilliSeconds();
+ EXPECT_GE(elapsed, expectedElapsed - 100u);
+ EXPECT_LE(elapsed, expectedElapsed + 100u);
+}
+
#if !defined(_asan_enabled_) && !defined(_msan_enabled_) && !defined(_tsan_enabled_)
TEST(TReconfigurableThroughputThrottlerTest, Stress)