diff options
author | Evgeniy Ivanov <eivanov89@yandex-team.ru> | 2022-03-14 11:48:13 +0300 |
---|---|---|
committer | Evgeniy Ivanov <eivanov89@yandex-team.ru> | 2022-03-14 11:48:13 +0300 |
commit | 65240f91f3d36daefcc470fc92240868d6e7cd0a (patch) | |
tree | 38b4d088659bf1b231487b27daf2dfbb8f5dbb19 | |
parent | 957ddeab73bf6c1f1dfeecfb3f8a49ba5218d25b (diff) | |
download | ydb-65240f91f3d36daefcc470fc92240868d6e7cd0a.tar.gz |
KIKIMR-9748: fix wakeup calculation when RPS exhausted
ref:581e77a573a0b6f0aad765cfa0ae0f87856a5e65
-rw-r--r-- | ydb/core/util/operation_queue.h | 9 | ||||
-rw-r--r-- | ydb/core/util/operation_queue_ut.cpp | 32 |
2 files changed, 39 insertions, 2 deletions
diff --git a/ydb/core/util/operation_queue.h b/ydb/core/util/operation_queue.h index 7c69681a93..e4075e0515 100644 --- a/ydb/core/util/operation_queue.h +++ b/ydb/core/util/operation_queue.h @@ -619,8 +619,13 @@ void TOperationQueue<T, TQueue>::ScheduleWakeup() { auto now = Timer.Now(); if (RunningItems.Empty() && !ReadyQueue.Empty()) { - // special case when we failed to start anything - if (!NextWakeup || NextWakeup <= now) { + if (TokenBucket.Available() <= 0) { + // we didn't start anything because of RPS limit + NextWakeup = now + TokenBucket.NextAvailableDelay(); + Timer.SetWakeupTimer(NextWakeup); + return; + } else if (!NextWakeup || NextWakeup <= now) { + // special case when we failed to start anything NextWakeup = now + Config.WakeupInterval; Timer.SetWakeupTimer(NextWakeup); return; diff --git a/ydb/core/util/operation_queue_ut.cpp b/ydb/core/util/operation_queue_ut.cpp index bd6014eb56..3c88f45a6b 100644 --- a/ydb/core/util/operation_queue_ut.cpp +++ b/ydb/core/util/operation_queue_ut.cpp @@ -919,6 +919,38 @@ Y_UNIT_TEST_SUITE(TCircularOperationQueueTest) { UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 5UL); } + Y_UNIT_TEST(CheckWakeupWhenRPSExhausted) { + // regression case for the following case: + // Config: Inglight = 1, RPS = 1/s, MinOperationRepeatDelay = 60s + // 1. Enqueue multiple operations, 1 is running + // 3. OnDone1, OnDone2 within same second, RPS is now exhausted. + // 4. Wakeup should be +1 second, not +MinOperationRepeatDelay=60s as caused by bug + + TQueue::TConfig config; + config.IsCircular = true; + config.InflightLimit = 1; + config.MaxRate = 1.0; + config.Timeout = Timeout; + config.MinOperationRepeatDelay = TDuration::Seconds(60); + TOperationStarter starter; + + TQueue queue(config, starter, starter); + queue.Start(); + + queue.Enqueue(1); + queue.Enqueue(2); + queue.Enqueue(3); + + UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 1UL); + queue.OnDone(1); + queue.OnDone(2); + + UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 0UL); + + auto goldWakeup = starter.TimeProvider.Now() + TDuration::Seconds(1); + UNIT_ASSERT_VALUES_EQUAL(starter.WakeupHistory.back(), goldWakeup); + } + Y_UNIT_TEST(CheckStartAfterStop) { TQueue::TConfig config; config.IsCircular = true; |