aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEvgeniy Ivanov <i@eivanov.com>2022-05-20 14:41:35 +0300
committerEvgeniy Ivanov <i@eivanov.com>2022-05-20 14:41:35 +0300
commit7370cdad1bcf0dacc21edefba942d7ace94f374e (patch)
treef9de0b715453ed9805e3bc1a1a1740b9335374ae
parent5b4ed9e881a9fc8cb3aa1d0e1daed0c1d5f06294 (diff)
downloadydb-7370cdad1bcf0dacc21edefba942d7ace94f374e.tar.gz
KIKIMR-14761: fix bug to always wakeup compaction queue
ref:529e94ef1c3510dae5a0239d85ae08f5870c60ca
-rw-r--r--library/cpp/actors/core/monotonic.cpp8
-rw-r--r--ydb/core/tx/schemeshard/operation_queue_timer.h21
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__compaction.cpp8
-rw-r--r--ydb/core/tx/schemeshard/ut_compaction.cpp55
-rw-r--r--ydb/core/util/CMakeLists.txt1
-rw-r--r--ydb/core/util/operation_queue.h36
-rw-r--r--ydb/core/util/operation_queue_priority_ut.cpp17
-rw-r--r--ydb/core/util/operation_queue_ut.cpp53
-rw-r--r--ydb/core/util/token_bucket.h15
9 files changed, 157 insertions, 57 deletions
diff --git a/library/cpp/actors/core/monotonic.cpp b/library/cpp/actors/core/monotonic.cpp
index 3465149dbe..cff9111ee3 100644
--- a/library/cpp/actors/core/monotonic.cpp
+++ b/library/cpp/actors/core/monotonic.cpp
@@ -21,3 +21,11 @@ namespace NActors {
}
} // namespace NActors
+
+template<>
+void Out<NActors::TMonotonic>(
+ IOutputStream& o,
+ NActors::TMonotonic t)
+{
+ o << t - NActors::TMonotonic::Zero();
+}
diff --git a/ydb/core/tx/schemeshard/operation_queue_timer.h b/ydb/core/tx/schemeshard/operation_queue_timer.h
index f4fa7ace57..14d5530153 100644
--- a/ydb/core/tx/schemeshard/operation_queue_timer.h
+++ b/ydb/core/tx/schemeshard/operation_queue_timer.h
@@ -36,7 +36,7 @@ class TOperationQueueWithTimer
private:
NKikimrServices::EServiceKikimr ServiceId = NKikimrServices::EServiceKikimr(LogServiceId);
TActorId LongTimerId;
- TInstant When;
+ TMonotonic When;
public:
TOperationQueueWithTimer(const typename TBase::TConfig& config,
@@ -60,31 +60,32 @@ public:
TActorBase::PassAway();
}
- TInstant GetWakeupTime() const { return When; }
+ TDuration GetWakeupDelta() const { return When - const_cast<TThis*>(this)->Now(); }
private:
// ITimer, note that it is made private,
// since it should be called only from TBase
- void SetWakeupTimer(TInstant t) override {
- if (When > t)
+ void SetWakeupTimer(TDuration delta) override {
+ if (LongTimerId)
this->Send(LongTimerId, new TEvents::TEvPoison);
- When = t;
- auto delta = t - this->Now();
+ When = this->Now() + delta;
auto ctx = TActivationContext::ActorContextFor(TActorBase::SelfId());
LongTimerId = CreateLongTimer(ctx, delta,
new IEventHandle(TActorBase::SelfId(), TActorBase::SelfId(), new TEvWakeupQueue));
LOG_DEBUG_S(ctx, ServiceId,
- "Operation queue set NextWakeup# " << When << ", delta# " << delta.Seconds() << " seconds");
+ "Operation queue set wakeup after delta# " << delta.Seconds() << " seconds");
}
- TInstant Now() override {
- return AppData()->TimeProvider->Now();
+ TMonotonic Now() override {
+ return AppData()->MonotonicTimeProvider->Now();
}
void HandleWakeup(const TActorContext &ctx) {
- LOG_DEBUG_S(ctx, ServiceId, "Operation queue wakeup# " << this->Now());
+ LOG_DEBUG_S(ctx, ServiceId, "Operation queue wakeup");
+ When = {};
+ LongTimerId = {};
TBase::Wakeup();
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp
index d16e5b0ae8..913ed27115 100644
--- a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp
@@ -24,7 +24,7 @@ NOperationQueue::EStartStatus TSchemeShard::StartBackgroundCompaction(const TSha
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "RunBackgroundCompaction "
"for pathId# " << pathId << ", datashard# " << datashardId
<< ", compactionInfo# " << info
- << ", next wakeup# " << CompactionQueue->GetWakeupTime()
+ << ", next wakeup in# " << CompactionQueue->GetWakeupDelta()
<< ", rate# " << CompactionQueue->GetRate()
<< ", in queue# " << CompactionQueue->Size() << " shards"
<< ", waiting after compaction# " << CompactionQueue->WaitingSize() << " shards"
@@ -65,7 +65,7 @@ void TSchemeShard::OnBackgroundCompactionTimeout(const TShardCompactionInfo& inf
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Background compaction timeout "
"for pathId# " << pathId << ", datashard# " << datashardId
<< ", compactionInfo# " << info
- << ", next wakeup# " << CompactionQueue->GetWakeupTime()
+ << ", next wakeup in# " << CompactionQueue->GetWakeupDelta()
<< ", rate# " << CompactionQueue->GetRate()
<< ", in queue# " << CompactionQueue->Size() << " shards"
<< ", waiting after compaction# " << CompactionQueue->WaitingSize() << " shards"
@@ -94,7 +94,7 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T
LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Finished background compaction of unknown shard "
"for pathId# " << pathId << ", datashard# " << tabletId
<< " in# " << duration.MilliSeconds() << " ms, with status# " << (int)record.GetStatus()
- << ", next wakeup# " << CompactionQueue->GetWakeupTime()
+ << ", next wakeup in# " << CompactionQueue->GetWakeupDelta()
<< ", rate# " << CompactionQueue->GetRate()
<< ", in queue# " << CompactionQueue->Size() << " shards"
<< ", waiting after compaction# " << CompactionQueue->WaitingSize() << " shards"
@@ -105,7 +105,7 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T
"for pathId# " << pathId << ", datashard# " << tabletId
<< ", shardIdx# " << shardIdx
<< " in# " << duration.MilliSeconds() << " ms, with status# " << (int)record.GetStatus()
- << ", next wakeup# " << CompactionQueue->GetWakeupTime()
+ << ", next wakeup in# " << CompactionQueue->GetWakeupDelta()
<< ", rate# " << CompactionQueue->GetRate()
<< ", in queue# " << CompactionQueue->Size() << " shards"
<< ", waiting after compaction# " << CompactionQueue->WaitingSize() << " shards"
diff --git a/ydb/core/tx/schemeshard/ut_compaction.cpp b/ydb/core/tx/schemeshard/ut_compaction.cpp
index 8d768d3aa0..b5e1fac3fe 100644
--- a/ydb/core/tx/schemeshard/ut_compaction.cpp
+++ b/ydb/core/tx/schemeshard/ut_compaction.cpp
@@ -12,6 +12,9 @@ using namespace NSchemeShardUT_Private;
namespace {
+constexpr TDuration DefaultTimeout = TDuration::Seconds(30);
+constexpr TDuration RetryDelay = TDuration::Seconds(1);
+
using TTableInfoMap = THashMap<TString, NKikimrTxDataShard::TEvGetInfoResponse::TUserTable>;
TShardCompactionInfo MakeCompactionInfo(ui64 idx, ui64 ts, ui64 sh = 0, ui64 d = 0) {
@@ -123,6 +126,8 @@ void SetFeatures(
compactionConfig->MutableBackgroundCompactionConfig()->SetSearchHeightThreshold(0);
compactionConfig->MutableBackgroundCompactionConfig()->SetRowCountThreshold(0);
compactionConfig->MutableBackgroundCompactionConfig()->SetCompactSinglePartedShards(true);
+ compactionConfig->MutableBackgroundCompactionConfig()->SetTimeoutSeconds(DefaultTimeout.Seconds());
+ compactionConfig->MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(RetryDelay.Seconds());
// 1 compaction / second
compactionConfig->MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(0);
@@ -163,6 +168,8 @@ void DisableBackgroundCompactionViaRestart(
compactionConfig.MutableBackgroundCompactionConfig()->SetSearchHeightThreshold(0);
compactionConfig.MutableBackgroundCompactionConfig()->SetRowCountThreshold(0);
compactionConfig.MutableBackgroundCompactionConfig()->SetCompactSinglePartedShards(true);
+ compactionConfig.MutableBackgroundCompactionConfig()->SetTimeoutSeconds(DefaultTimeout.Seconds());
+ compactionConfig.MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(RetryDelay.Seconds());
// 1 compaction / second
compactionConfig.MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(0);
@@ -188,6 +195,8 @@ void EnableBackgroundCompactionViaRestart(
compactionConfig.MutableBackgroundCompactionConfig()->SetSearchHeightThreshold(0);
compactionConfig.MutableBackgroundCompactionConfig()->SetRowCountThreshold(0);
compactionConfig.MutableBackgroundCompactionConfig()->SetCompactSinglePartedShards(true);
+ compactionConfig.MutableBackgroundCompactionConfig()->SetTimeoutSeconds(DefaultTimeout.Seconds());
+ compactionConfig.MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(RetryDelay.Seconds());
// 1 compaction / second
compactionConfig.MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(0);
@@ -566,9 +575,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) {
TTestEnv env(runtime);
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
- //runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
- //runtime.SetLogPriority(NKikimrServices::BOOTSTRAPPER, NActors::NLog::PRI_TRACE);
// disable for the case, when compaction is enabled by default
SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, false);
@@ -610,6 +617,50 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) {
// original table should not be compacted as well
CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/Simple");
}
+
+ Y_UNIT_TEST(SchemeshardShouldHandleCompactionTimeouts) {
+ // note that this test is good to test TOperationQueueWithTimer
+
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
+
+ SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, true);
+
+ size_t compactionResultCount = 0;
+
+ // capture original observer func by setting dummy one
+ auto originalObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>&) {
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+ // now set our observer backed up by original
+ runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::EvCompactTableResult: {
+ Y_UNUSED(ev.Release());
+ ++compactionResultCount;
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ default:
+ return originalObserver(runtime, ev);
+ }
+ });
+ ui64 txId = 1000;
+
+ // note that we create 1-sharded table to avoid complications
+ CreateTableWithData(runtime, env, "/MyRoot", "Simple", 1, txId);
+
+ env.SimulateSleep(runtime, DefaultTimeout + RetryDelay + TDuration::Seconds(1));
+ UNIT_ASSERT_VALUES_EQUAL(compactionResultCount, 1UL);
+
+ env.SimulateSleep(runtime, DefaultTimeout + RetryDelay + TDuration::Seconds(1));
+ UNIT_ASSERT_VALUES_EQUAL(compactionResultCount, 2UL);
+
+ env.SimulateSleep(runtime, DefaultTimeout + RetryDelay + TDuration::Seconds(1));
+ UNIT_ASSERT_VALUES_EQUAL(compactionResultCount, 3UL);
+ }
};
namespace NKikimr::NSchemeShard {
diff --git a/ydb/core/util/CMakeLists.txt b/ydb/core/util/CMakeLists.txt
index ae874ee1bf..65da8d7dee 100644
--- a/ydb/core/util/CMakeLists.txt
+++ b/ydb/core/util/CMakeLists.txt
@@ -11,6 +11,7 @@ add_library(ydb-core-util)
target_link_libraries(ydb-core-util PUBLIC
contrib-libs-cxxsupp
yutil
+ cpp-actors-core
actors-interconnect-mock
cpp-actors-util
cpp-containers-stack_vector
diff --git a/ydb/core/util/operation_queue.h b/ydb/core/util/operation_queue.h
index 19c8750b49..514574d160 100644
--- a/ydb/core/util/operation_queue.h
+++ b/ydb/core/util/operation_queue.h
@@ -4,7 +4,9 @@
#include "intrusive_heap.h"
#include "token_bucket.h"
-#include <library/cpp/time_provider/time_provider.h>
+#include <ydb/core/base/defs.h>
+
+#include <library/cpp/actors/core/monotonic.h>
#include <util/datetime/base.h>
#include <util/generic/algorithm.h>
@@ -26,9 +28,9 @@ enum class EStartStatus {
class ITimer {
public:
// asks to call TOperationQueue::Wakeup()
- virtual void SetWakeupTimer(TInstant t) = 0;
+ virtual void SetWakeupTimer(TDuration delta) = 0;
- virtual TInstant Now() = 0;
+ virtual TMonotonic Now() = 0;
};
template <typename T>
@@ -195,18 +197,18 @@ public:
struct TItemWithTs {
T Item;
- TInstant Timestamp;
+ TMonotonic Timestamp;
explicit TItemWithTs(const T& item)
: Item(item)
{ }
- TItemWithTs(const T& item, TInstant s)
+ TItemWithTs(const T& item, TMonotonic s)
: Item(item)
, Timestamp(s)
{ }
- TItemWithTs(T&& item, TInstant s)
+ TItemWithTs(T&& item, TMonotonic s)
: Item(std::move(item))
, Timestamp(s)
{ }
@@ -259,10 +261,10 @@ private:
TRunningItems RunningItems;
TWaitingItems WaitingItems;
- TTokenBucket TokenBucket;
+ TTokenBucketBase<TMonotonic> TokenBucket;
bool HasRateLimit = false;
- TInstant NextWakeup;
+ TMonotonic NextWakeup;
bool Running = false;
bool WasRunning = false;
@@ -527,8 +529,10 @@ TDuration TOperationQueue<T, TQueue>::OnDone(const T& item) {
template <typename T, typename TQueue>
void TOperationQueue<T, TQueue>::Wakeup() {
+ NextWakeup = {};
StartOperations();
- ScheduleWakeup();
+ if (!NextWakeup)
+ ScheduleWakeup();
}
template <typename T, typename TQueue>
@@ -622,17 +626,17 @@ void TOperationQueue<T, TQueue>::ScheduleWakeup() {
if (TokenBucket.Available() <= 0) {
// we didn't start anything because of RPS limit
NextWakeup = now + TokenBucket.NextAvailableDelay();
- Timer.SetWakeupTimer(NextWakeup);
+ Timer.SetWakeupTimer(TokenBucket.NextAvailableDelay());
return;
- } else if (!NextWakeup || NextWakeup <= now) {
+ } else if (!NextWakeup) {
// special case when we failed to start anything
NextWakeup = now + Config.WakeupInterval;
- Timer.SetWakeupTimer(NextWakeup);
+ Timer.SetWakeupTimer(Config.WakeupInterval);
return;
}
}
- auto wakeup = TInstant::Max();
+ auto wakeup = TMonotonic::Max();
if (Config.Timeout && !RunningItems.Empty()) {
const auto& item = RunningItems.Front();
@@ -644,7 +648,7 @@ void TOperationQueue<T, TQueue>::ScheduleWakeup() {
wakeup = Min(wakeup, item.Timestamp + Config.MinOperationRepeatDelay);
}
- if (wakeup == TInstant::Max())
+ if (wakeup == TMonotonic::Max())
return;
// no sense to wakeup earlier that rate limit allows
@@ -652,9 +656,9 @@ void TOperationQueue<T, TQueue>::ScheduleWakeup() {
wakeup = Max(wakeup, now + TokenBucket.NextAvailableDelay());
}
- if (!NextWakeup || NextWakeup > wakeup || NextWakeup <= now) {
+ if (!NextWakeup || NextWakeup > wakeup) {
NextWakeup = wakeup;
- Timer.SetWakeupTimer(NextWakeup);
+ Timer.SetWakeupTimer(wakeup - now);
}
}
diff --git a/ydb/core/util/operation_queue_priority_ut.cpp b/ydb/core/util/operation_queue_priority_ut.cpp
index 32bd274fe6..687350cc3b 100644
--- a/ydb/core/util/operation_queue_priority_ut.cpp
+++ b/ydb/core/util/operation_queue_priority_ut.cpp
@@ -2,6 +2,7 @@
#include "circular_queue.h"
+#include <library/cpp/actors/core/monotonic_provider.h>
#include <library/cpp/testing/unittest/registar.h>
#include <util/generic/ptr.h>
@@ -65,9 +66,9 @@ namespace {
TDuration Timeout = TDuration::Minutes(10);
-class TSimpleTimeProvider : public ITimeProvider {
+class TSimpleTimeProvider : public NActors::IMonotonicTimeProvider {
public:
- TInstant Now() override {
+ TMonotonic Now() override {
return Now_;
}
@@ -75,12 +76,12 @@ public:
Now_ += delta;
}
- void Move(TInstant now) {
+ void Move(TMonotonic now) {
Now_ = now;
}
private:
- TInstant Now_;
+ TMonotonic Now_;
};
@@ -88,7 +89,7 @@ struct TOperationStarter : public TPriorityQueue::IStarter, public NOperationQue
TSimpleTimeProvider TimeProvider;
TVector<TPriorityItem> StartHistory;
- TVector<TInstant> WakeupHistory;
+ TVector<TMonotonic> WakeupHistory;
NOperationQueue::EStartStatus StartResult = NOperationQueue::EStartStatus::EOperationRunning;
@@ -98,15 +99,15 @@ struct TOperationStarter : public TPriorityQueue::IStarter, public NOperationQue
return StartResult;
}
- void SetWakeupTimer(TInstant t) override
+ void SetWakeupTimer(TDuration delta) override
{
- WakeupHistory.push_back(t);
+ WakeupHistory.push_back(this->Now() + delta);
}
void OnTimeout(const TPriorityItem&) override
{}
- TInstant Now() override
+ TMonotonic Now() override
{
return TimeProvider.Now();
}
diff --git a/ydb/core/util/operation_queue_ut.cpp b/ydb/core/util/operation_queue_ut.cpp
index 1015cb217e..7daa22ba67 100644
--- a/ydb/core/util/operation_queue_ut.cpp
+++ b/ydb/core/util/operation_queue_ut.cpp
@@ -2,6 +2,7 @@
#include "circular_queue.h"
+#include <library/cpp/actors/core/monotonic_provider.h>
#include <library/cpp/testing/unittest/registar.h>
#include <util/generic/ptr.h>
@@ -14,9 +15,9 @@ namespace {
TDuration Timeout = TDuration::Minutes(10);
-class TSimpleTimeProvider : public ITimeProvider {
+class TSimpleTimeProvider : public NActors::IMonotonicTimeProvider {
public:
- TInstant Now() override {
+ TMonotonic Now() override {
return Now_;
}
@@ -24,12 +25,12 @@ public:
Now_ += delta;
}
- void Move(TInstant now) {
+ void Move(TMonotonic now) {
Now_ = now;
}
private:
- TInstant Now_;
+ TMonotonic Now_;
};
using TQueue = TOperationQueue<int, TFifoQueue<int>>;
@@ -38,7 +39,7 @@ struct TOperationStarter : public TQueue::IStarter, public NOperationQueue::ITim
TSimpleTimeProvider TimeProvider;
TVector<int> StartHistory;
- TVector<TInstant> WakeupHistory;
+ TVector<TMonotonic> WakeupHistory;
NOperationQueue::EStartStatus StartResult = NOperationQueue::EStartStatus::EOperationRunning;
@@ -48,15 +49,15 @@ struct TOperationStarter : public TQueue::IStarter, public NOperationQueue::ITim
return StartResult;
}
- void SetWakeupTimer(TInstant t) override
+ void SetWakeupTimer(TDuration delta) override
{
- WakeupHistory.push_back(t);
+ WakeupHistory.push_back(this->Now() + delta);
}
void OnTimeout(const int&) override
{}
- TInstant Now() override
+ TMonotonic Now() override
{
return TimeProvider.Now();
}
@@ -68,7 +69,7 @@ void CheckQueue(
TVector<TQueue::TItemWithTs> runningGold,
TVector<int> inQueueGold,
TVector<int> startHistory,
- TVector<TInstant> wakeupHistory)
+ TVector<TMonotonic> wakeupHistory)
{
auto running = queue.GetRunning();
auto inQueue = queue.GetQueue();
@@ -107,7 +108,7 @@ void TestStartInflightBeforeStart(int inflight, int pushN = 10) {
runningGold.push_back({i, now});
}
- TVector<TInstant> wakeupsGold =
+ TVector<TMonotonic> wakeupsGold =
{ starter.TimeProvider.Now() + config.Timeout };
CheckQueue(
@@ -131,7 +132,7 @@ void TestInflightWithEnqueue(int inflight, int pushN = 10) {
TVector<TQueue::TItemWithTs> runningGold;
TVector<int> queuedGold;
- TVector<TInstant> wakeupsGold =
+ TVector<TMonotonic> wakeupsGold =
{ starter.TimeProvider.Now() + TDuration::Seconds(1) + config.Timeout };
int lastStarted = 0;
@@ -988,6 +989,36 @@ Y_UNIT_TEST_SUITE(TCircularOperationQueueTest) {
UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 2UL);
UNIT_ASSERT(starter.WakeupHistory.back() > starter.TimeProvider.Now());
}
+
+ Y_UNIT_TEST(ShouldTolerateInaccurateTimer) {
+ // should properly work when wokeup earlier than requested (i.e. properly set new timer)
+ // regression test: woke up earlier and didn't set new wakeup
+
+ TQueue::TConfig config;
+ config.IsCircular = true;
+ config.InflightLimit = 1;
+ config.MaxRate = 0.0;
+ config.Timeout = Timeout;
+ TOperationStarter starter;
+
+ TQueue queue(config, starter, starter);
+ queue.Start();
+
+ UNIT_ASSERT_VALUES_EQUAL(starter.WakeupHistory.size(), 0UL);
+
+ queue.Enqueue(1);
+
+ UNIT_ASSERT_VALUES_EQUAL(starter.WakeupHistory.size(), 1UL);
+
+ // expect to wakeup on Timeout, but wakeup earlier
+ starter.TimeProvider.Move(Timeout - TDuration::Seconds(1));
+ queue.Wakeup();
+
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 1UL);
+
+ UNIT_ASSERT_VALUES_EQUAL(starter.WakeupHistory.size(), 2UL);
+ }
};
} // NOperationQueue
diff --git a/ydb/core/util/token_bucket.h b/ydb/core/util/token_bucket.h
index 060f8c75a6..8ed8475e53 100644
--- a/ydb/core/util/token_bucket.h
+++ b/ydb/core/util/token_bucket.h
@@ -6,16 +6,17 @@
namespace NKikimr {
-class TTokenBucket {
+template<typename TTime>
+class TTokenBucketBase {
double Tokens = 0.0; // tokens currenty in bucket
double Rate = 0.0; // tokens filling rate [tokens/sec]
double Capacity = 0.0; // maximum amount of tokens allowed in bucket
- TInstant LastFill = TInstant::Zero();
+ TTime LastFill;
public:
// Create unlimited bucket
// NOTE: any bucket is created fully filled
- TTokenBucket() {
+ TTokenBucketBase() {
SetUnlimited();
}
@@ -46,7 +47,7 @@ public:
}
// Fill bucket with tokens, should be done just before Take()
- void Fill(TInstant now) {
+ void Fill(TTime now) {
// NOTE: LastFill is allowed to be zero, the following code will work OK
TDuration elapsed = now - LastFill;
Tokens += elapsed.SecondsFloat() * Rate;
@@ -62,7 +63,7 @@ public:
}
// Fill and take if available, returns taken amount
- double FillAndTryTake(TInstant now, double amount) {
+ double FillAndTryTake(TTime now, double amount) {
Fill(now);
amount = Min(amount, Tokens);
Take(amount);
@@ -78,7 +79,7 @@ public:
return TDuration::MicroSeconds(std::ceil(Available() * -1000000.0 / Rate));
}
- TDuration FillAndNextAvailableDelay(TInstant now) {
+ TDuration FillAndNextAvailableDelay(TTime now) {
Fill(now);
return NextAvailableDelay();
}
@@ -101,4 +102,6 @@ public:
}
};
+using TTokenBucket = TTokenBucketBase<TInstant>;
+
} // namespace NKikimr