aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorcapone212 <capone212@yandex-team.com>2024-05-28 22:06:54 +0300
committercapone212 <capone212@yandex-team.com>2024-05-28 22:17:04 +0300
commit79997886bc7e7221d2a4b84064625e368e13c70e (patch)
tree005c40aaba7b9de6f5354851e7ccc50f1f41bf82
parent875f06219b7bbb7fa063e1713d06abbb38ffa789 (diff)
downloadydb-79997886bc7e7221d2a4b84064625e368e13c70e.tar.gz
YT-21810: Handling i64 overflow inside TReconfigurableThroughputThrottler
YT-21810: Handling i64 overflow inside TReconfigurableThroughputThrottler 172cd87e0636e55e2a90446a1a1cc624ef66c2ed
-rw-r--r--yt/yt/core/concurrency/public.h1
-rw-r--r--yt/yt/core/concurrency/throughput_throttler.cpp29
-rw-r--r--yt/yt/core/concurrency/throughput_throttler.h14
-rw-r--r--yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp22
4 files changed, 61 insertions, 5 deletions
diff --git a/yt/yt/core/concurrency/public.h b/yt/yt/core/concurrency/public.h
index 7bcc13437f..d7533f1232 100644
--- a/yt/yt/core/concurrency/public.h
+++ b/yt/yt/core/concurrency/public.h
@@ -36,6 +36,7 @@ DECLARE_REFCOUNTED_CLASS(TRelativeThroughputThrottlerConfig)
DECLARE_REFCOUNTED_CLASS(TPrefetchingThrottlerConfig)
DECLARE_REFCOUNTED_STRUCT(IThroughputThrottler)
DECLARE_REFCOUNTED_STRUCT(IReconfigurableThroughputThrottler)
+DECLARE_REFCOUNTED_STRUCT(ITestableReconfigurableThroughputThrottler)
DECLARE_REFCOUNTED_STRUCT(IAsyncInputStream)
DECLARE_REFCOUNTED_STRUCT(IAsyncOutputStream)
diff --git a/yt/yt/core/concurrency/throughput_throttler.cpp b/yt/yt/core/concurrency/throughput_throttler.cpp
index aa8d7f845b..120ada9fcc 100644
--- a/yt/yt/core/concurrency/throughput_throttler.cpp
+++ b/yt/yt/core/concurrency/throughput_throttler.cpp
@@ -38,7 +38,7 @@ DEFINE_REFCOUNTED_TYPE(TThrottlerRequest)
////////////////////////////////////////////////////////////////////////////////
class TReconfigurableThroughputThrottler
- : public IReconfigurableThroughputThrottler
+ : public ITestableReconfigurableThroughputThrottler
{
public:
TReconfigurableThroughputThrottler(
@@ -224,6 +224,11 @@ public:
return Available_.load();
}
+ void SetLastUpdated(TInstant lastUpdated) override
+ {
+ LastUpdated_.store(lastUpdated);
+ }
+
private:
const TLogger Logger;
@@ -306,6 +311,21 @@ private:
return request->Promise;
}
+ static i64 GetDeltaAvailable(TInstant current, TInstant lastUpdated, TDuration period, double limit)
+ {
+ auto timePassed = current - lastUpdated;
+
+ if (limit > 1) {
+ // Preventing arithmetic overflows by reducing time interval.
+ timePassed = std::min(period, timePassed);
+ }
+
+ auto deltaAvailable = static_cast<i64>(timePassed.MilliSeconds() * limit / 1000);
+ YT_VERIFY(deltaAvailable >= 0);
+
+ return deltaAvailable;
+ }
+
void DoReconfigure(std::optional<double> limit, TDuration period)
{
VERIFY_THREAD_AFFINITY_ANY();
@@ -327,8 +347,8 @@ private:
Available_ = maxAvailable;
LastUpdated_ = now;
} else {
- auto millisecondsPassed = (now - lastUpdated).MilliSeconds();
- auto deltaAvailable = static_cast<i64>(millisecondsPassed * *limit / 1000);
+ auto deltaAvailable = GetDeltaAvailable(now, lastUpdated, period, *limit);
+
auto newAvailable = Available_.load() + deltaAvailable;
if (newAvailable > maxAvailable) {
LastUpdated_ = now;
@@ -375,8 +395,7 @@ private:
auto current = GetInstant();
auto lastUpdated = LastUpdated_.load();
- auto millisecondsPassed = (current - lastUpdated).MilliSeconds();
- auto deltaAvailable = static_cast<i64>(millisecondsPassed * limit / 1000);
+ auto deltaAvailable = GetDeltaAvailable(current, lastUpdated, period, limit);
if (deltaAvailable == 0) {
return;
diff --git a/yt/yt/core/concurrency/throughput_throttler.h b/yt/yt/core/concurrency/throughput_throttler.h
index df03931b16..18c89b3154 100644
--- a/yt/yt/core/concurrency/throughput_throttler.h
+++ b/yt/yt/core/concurrency/throughput_throttler.h
@@ -115,6 +115,20 @@ DEFINE_REFCOUNTED_TYPE(IReconfigurableThroughputThrottler)
////////////////////////////////////////////////////////////////////////////////
+//! An interface for unit test purpose.
+/*!
+ * Thread affinity: any
+ */
+struct ITestableReconfigurableThroughputThrottler
+ : public IReconfigurableThroughputThrottler
+{
+ virtual void SetLastUpdated(TInstant lastUpdated) = 0;
+};
+
+DEFINE_REFCOUNTED_TYPE(ITestableReconfigurableThroughputThrottler)
+
+////////////////////////////////////////////////////////////////////////////////
+
//! Constructs a throttler from #config.
IReconfigurableThroughputThrottlerPtr CreateReconfigurableThroughputThrottler(
TThroughputThrottlerConfigPtr config,
diff --git a/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp b/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp
index f87e4140cf..d3377612db 100644
--- a/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp
@@ -53,6 +53,28 @@ TEST(TReconfigurableThroughputThrottlerTest, TestLimit)
EXPECT_LE(duration, 3000u);
}
+TEST(TReconfigurableThroughputThrottlerTest, TestNoOverflow)
+{
+ auto throttler = CreateReconfigurableThroughputThrottler(
+ TThroughputThrottlerConfig::Create(100_TB));
+
+ auto* testableThrottler = static_cast<ITestableReconfigurableThroughputThrottler*>(throttler.Get());
+
+ NProfiling::TWallTimer timer;
+ testableThrottler->Throttle(1).Get().ThrowOnError();
+ testableThrottler->SetLastUpdated(TInstant::Now() - TDuration::Days(1));
+
+ std::vector<TFuture<void>> futures;
+ for (int i = 0; i < 2; ++i) {
+ futures.push_back(testableThrottler->Throttle(1));
+ testableThrottler->SetLimit(5_TB);
+ }
+
+ WaitFor(AllSucceeded(futures)
+ .WithTimeout(TDuration::Seconds(5)))
+ .ThrowOnError();
+}
+
TEST(TReconfigurableThroughputThrottlerTest, TestScheduleUpdate)
{
auto throttler = CreateReconfigurableThroughputThrottler(