aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEvgeniy Ivanov <eivanov89@yandex-team.ru>2022-03-14 11:48:13 +0300
committerEvgeniy Ivanov <eivanov89@yandex-team.ru>2022-03-14 11:48:13 +0300
commit65240f91f3d36daefcc470fc92240868d6e7cd0a (patch)
tree38b4d088659bf1b231487b27daf2dfbb8f5dbb19
parent957ddeab73bf6c1f1dfeecfb3f8a49ba5218d25b (diff)
downloadydb-65240f91f3d36daefcc470fc92240868d6e7cd0a.tar.gz
KIKIMR-9748: fix wakeup calculation when RPS exhausted
ref:581e77a573a0b6f0aad765cfa0ae0f87856a5e65
-rw-r--r--ydb/core/util/operation_queue.h9
-rw-r--r--ydb/core/util/operation_queue_ut.cpp32
2 files changed, 39 insertions, 2 deletions
diff --git a/ydb/core/util/operation_queue.h b/ydb/core/util/operation_queue.h
index 7c69681a93..e4075e0515 100644
--- a/ydb/core/util/operation_queue.h
+++ b/ydb/core/util/operation_queue.h
@@ -619,8 +619,13 @@ void TOperationQueue<T, TQueue>::ScheduleWakeup() {
auto now = Timer.Now();
if (RunningItems.Empty() && !ReadyQueue.Empty()) {
- // special case when we failed to start anything
- if (!NextWakeup || NextWakeup <= now) {
+ if (TokenBucket.Available() <= 0) {
+ // we didn't start anything because of RPS limit
+ NextWakeup = now + TokenBucket.NextAvailableDelay();
+ Timer.SetWakeupTimer(NextWakeup);
+ return;
+ } else if (!NextWakeup || NextWakeup <= now) {
+ // special case when we failed to start anything
NextWakeup = now + Config.WakeupInterval;
Timer.SetWakeupTimer(NextWakeup);
return;
diff --git a/ydb/core/util/operation_queue_ut.cpp b/ydb/core/util/operation_queue_ut.cpp
index bd6014eb56..3c88f45a6b 100644
--- a/ydb/core/util/operation_queue_ut.cpp
+++ b/ydb/core/util/operation_queue_ut.cpp
@@ -919,6 +919,38 @@ Y_UNIT_TEST_SUITE(TCircularOperationQueueTest) {
UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 5UL);
}
+ Y_UNIT_TEST(CheckWakeupWhenRPSExhausted) {
+ // regression case for the following case:
+ // Config: Inglight = 1, RPS = 1/s, MinOperationRepeatDelay = 60s
+ // 1. Enqueue multiple operations, 1 is running
+ // 3. OnDone1, OnDone2 within same second, RPS is now exhausted.
+ // 4. Wakeup should be +1 second, not +MinOperationRepeatDelay=60s as caused by bug
+
+ TQueue::TConfig config;
+ config.IsCircular = true;
+ config.InflightLimit = 1;
+ config.MaxRate = 1.0;
+ config.Timeout = Timeout;
+ config.MinOperationRepeatDelay = TDuration::Seconds(60);
+ TOperationStarter starter;
+
+ TQueue queue(config, starter, starter);
+ queue.Start();
+
+ queue.Enqueue(1);
+ queue.Enqueue(2);
+ queue.Enqueue(3);
+
+ UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 1UL);
+ queue.OnDone(1);
+ queue.OnDone(2);
+
+ UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 0UL);
+
+ auto goldWakeup = starter.TimeProvider.Now() + TDuration::Seconds(1);
+ UNIT_ASSERT_VALUES_EQUAL(starter.WakeupHistory.back(), goldWakeup);
+ }
+
Y_UNIT_TEST(CheckStartAfterStop) {
TQueue::TConfig config;
config.IsCircular = true;