diff options
author | capone212 <capone212@yandex-team.com> | 2024-05-28 22:06:54 +0300 |
---|---|---|
committer | capone212 <capone212@yandex-team.com> | 2024-05-28 22:17:04 +0300 |
commit | 79997886bc7e7221d2a4b84064625e368e13c70e (patch) | |
tree | 005c40aaba7b9de6f5354851e7ccc50f1f41bf82 | |
parent | 875f06219b7bbb7fa063e1713d06abbb38ffa789 (diff) | |
download | ydb-79997886bc7e7221d2a4b84064625e368e13c70e.tar.gz |
YT-21810: Handling i64 overflow inside TReconfigurableThroughputThrottler
YT-21810: Handling i64 overflow inside TReconfigurableThroughputThrottler
172cd87e0636e55e2a90446a1a1cc624ef66c2ed
-rw-r--r-- | yt/yt/core/concurrency/public.h | 1 | ||||
-rw-r--r-- | yt/yt/core/concurrency/throughput_throttler.cpp | 29 | ||||
-rw-r--r-- | yt/yt/core/concurrency/throughput_throttler.h | 14 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp | 22 |
4 files changed, 61 insertions, 5 deletions
diff --git a/yt/yt/core/concurrency/public.h b/yt/yt/core/concurrency/public.h index 7bcc13437f..d7533f1232 100644 --- a/yt/yt/core/concurrency/public.h +++ b/yt/yt/core/concurrency/public.h @@ -36,6 +36,7 @@ DECLARE_REFCOUNTED_CLASS(TRelativeThroughputThrottlerConfig) DECLARE_REFCOUNTED_CLASS(TPrefetchingThrottlerConfig) DECLARE_REFCOUNTED_STRUCT(IThroughputThrottler) DECLARE_REFCOUNTED_STRUCT(IReconfigurableThroughputThrottler) +DECLARE_REFCOUNTED_STRUCT(ITestableReconfigurableThroughputThrottler) DECLARE_REFCOUNTED_STRUCT(IAsyncInputStream) DECLARE_REFCOUNTED_STRUCT(IAsyncOutputStream) diff --git a/yt/yt/core/concurrency/throughput_throttler.cpp b/yt/yt/core/concurrency/throughput_throttler.cpp index aa8d7f845b..120ada9fcc 100644 --- a/yt/yt/core/concurrency/throughput_throttler.cpp +++ b/yt/yt/core/concurrency/throughput_throttler.cpp @@ -38,7 +38,7 @@ DEFINE_REFCOUNTED_TYPE(TThrottlerRequest) //////////////////////////////////////////////////////////////////////////////// class TReconfigurableThroughputThrottler - : public IReconfigurableThroughputThrottler + : public ITestableReconfigurableThroughputThrottler { public: TReconfigurableThroughputThrottler( @@ -224,6 +224,11 @@ public: return Available_.load(); } + void SetLastUpdated(TInstant lastUpdated) override + { + LastUpdated_.store(lastUpdated); + } + private: const TLogger Logger; @@ -306,6 +311,21 @@ private: return request->Promise; } + static i64 GetDeltaAvailable(TInstant current, TInstant lastUpdated, TDuration period, double limit) + { + auto timePassed = current - lastUpdated; + + if (limit > 1) { + // Preventing arithmetic overflows by reducing time interval. + timePassed = std::min(period, timePassed); + } + + auto deltaAvailable = static_cast<i64>(timePassed.MilliSeconds() * limit / 1000); + YT_VERIFY(deltaAvailable >= 0); + + return deltaAvailable; + } + void DoReconfigure(std::optional<double> limit, TDuration period) { VERIFY_THREAD_AFFINITY_ANY(); @@ -327,8 +347,8 @@ private: Available_ = maxAvailable; LastUpdated_ = now; } else { - auto millisecondsPassed = (now - lastUpdated).MilliSeconds(); - auto deltaAvailable = static_cast<i64>(millisecondsPassed * *limit / 1000); + auto deltaAvailable = GetDeltaAvailable(now, lastUpdated, period, *limit); + auto newAvailable = Available_.load() + deltaAvailable; if (newAvailable > maxAvailable) { LastUpdated_ = now; @@ -375,8 +395,7 @@ private: auto current = GetInstant(); auto lastUpdated = LastUpdated_.load(); - auto millisecondsPassed = (current - lastUpdated).MilliSeconds(); - auto deltaAvailable = static_cast<i64>(millisecondsPassed * limit / 1000); + auto deltaAvailable = GetDeltaAvailable(current, lastUpdated, period, limit); if (deltaAvailable == 0) { return; diff --git a/yt/yt/core/concurrency/throughput_throttler.h b/yt/yt/core/concurrency/throughput_throttler.h index df03931b16..18c89b3154 100644 --- a/yt/yt/core/concurrency/throughput_throttler.h +++ b/yt/yt/core/concurrency/throughput_throttler.h @@ -115,6 +115,20 @@ DEFINE_REFCOUNTED_TYPE(IReconfigurableThroughputThrottler) //////////////////////////////////////////////////////////////////////////////// +//! An interface for unit test purpose. +/*! + * Thread affinity: any + */ +struct ITestableReconfigurableThroughputThrottler + : public IReconfigurableThroughputThrottler +{ + virtual void SetLastUpdated(TInstant lastUpdated) = 0; +}; + +DEFINE_REFCOUNTED_TYPE(ITestableReconfigurableThroughputThrottler) + +//////////////////////////////////////////////////////////////////////////////// + //! Constructs a throttler from #config. IReconfigurableThroughputThrottlerPtr CreateReconfigurableThroughputThrottler( TThroughputThrottlerConfigPtr config, diff --git a/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp b/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp index f87e4140cf..d3377612db 100644 --- a/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp +++ b/yt/yt/core/concurrency/unittests/throughput_throttler_ut.cpp @@ -53,6 +53,28 @@ TEST(TReconfigurableThroughputThrottlerTest, TestLimit) EXPECT_LE(duration, 3000u); } +TEST(TReconfigurableThroughputThrottlerTest, TestNoOverflow) +{ + auto throttler = CreateReconfigurableThroughputThrottler( + TThroughputThrottlerConfig::Create(100_TB)); + + auto* testableThrottler = static_cast<ITestableReconfigurableThroughputThrottler*>(throttler.Get()); + + NProfiling::TWallTimer timer; + testableThrottler->Throttle(1).Get().ThrowOnError(); + testableThrottler->SetLastUpdated(TInstant::Now() - TDuration::Days(1)); + + std::vector<TFuture<void>> futures; + for (int i = 0; i < 2; ++i) { + futures.push_back(testableThrottler->Throttle(1)); + testableThrottler->SetLimit(5_TB); + } + + WaitFor(AllSucceeded(futures) + .WithTimeout(TDuration::Seconds(5))) + .ThrowOnError(); +} + TEST(TReconfigurableThroughputThrottlerTest, TestScheduleUpdate) { auto throttler = CreateReconfigurableThroughputThrottler( |