diff options
author | Evgeniy Ivanov <i@eivanov.com> | 2022-05-20 14:41:35 +0300 |
---|---|---|
committer | Evgeniy Ivanov <i@eivanov.com> | 2022-05-20 14:41:35 +0300 |
commit | 7370cdad1bcf0dacc21edefba942d7ace94f374e (patch) | |
tree | f9de0b715453ed9805e3bc1a1a1740b9335374ae | |
parent | 5b4ed9e881a9fc8cb3aa1d0e1daed0c1d5f06294 (diff) | |
download | ydb-7370cdad1bcf0dacc21edefba942d7ace94f374e.tar.gz |
KIKIMR-14761: fix bug to always wakeup compaction queue
ref:529e94ef1c3510dae5a0239d85ae08f5870c60ca
-rw-r--r-- | library/cpp/actors/core/monotonic.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/operation_queue_timer.h | 21 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__compaction.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_compaction.cpp | 55 | ||||
-rw-r--r-- | ydb/core/util/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/util/operation_queue.h | 36 | ||||
-rw-r--r-- | ydb/core/util/operation_queue_priority_ut.cpp | 17 | ||||
-rw-r--r-- | ydb/core/util/operation_queue_ut.cpp | 53 | ||||
-rw-r--r-- | ydb/core/util/token_bucket.h | 15 |
9 files changed, 157 insertions, 57 deletions
diff --git a/library/cpp/actors/core/monotonic.cpp b/library/cpp/actors/core/monotonic.cpp index 3465149dbe..cff9111ee3 100644 --- a/library/cpp/actors/core/monotonic.cpp +++ b/library/cpp/actors/core/monotonic.cpp @@ -21,3 +21,11 @@ namespace NActors { } } // namespace NActors + +template<> +void Out<NActors::TMonotonic>( + IOutputStream& o, + NActors::TMonotonic t) +{ + o << t - NActors::TMonotonic::Zero(); +} diff --git a/ydb/core/tx/schemeshard/operation_queue_timer.h b/ydb/core/tx/schemeshard/operation_queue_timer.h index f4fa7ace57..14d5530153 100644 --- a/ydb/core/tx/schemeshard/operation_queue_timer.h +++ b/ydb/core/tx/schemeshard/operation_queue_timer.h @@ -36,7 +36,7 @@ class TOperationQueueWithTimer private: NKikimrServices::EServiceKikimr ServiceId = NKikimrServices::EServiceKikimr(LogServiceId); TActorId LongTimerId; - TInstant When; + TMonotonic When; public: TOperationQueueWithTimer(const typename TBase::TConfig& config, @@ -60,31 +60,32 @@ public: TActorBase::PassAway(); } - TInstant GetWakeupTime() const { return When; } + TDuration GetWakeupDelta() const { return When - const_cast<TThis*>(this)->Now(); } private: // ITimer, note that it is made private, // since it should be called only from TBase - void SetWakeupTimer(TInstant t) override { - if (When > t) + void SetWakeupTimer(TDuration delta) override { + if (LongTimerId) this->Send(LongTimerId, new TEvents::TEvPoison); - When = t; - auto delta = t - this->Now(); + When = this->Now() + delta; auto ctx = TActivationContext::ActorContextFor(TActorBase::SelfId()); LongTimerId = CreateLongTimer(ctx, delta, new IEventHandle(TActorBase::SelfId(), TActorBase::SelfId(), new TEvWakeupQueue)); LOG_DEBUG_S(ctx, ServiceId, - "Operation queue set NextWakeup# " << When << ", delta# " << delta.Seconds() << " seconds"); + "Operation queue set wakeup after delta# " << delta.Seconds() << " seconds"); } - TInstant Now() override { - return AppData()->TimeProvider->Now(); + TMonotonic Now() override { + return AppData()->MonotonicTimeProvider->Now(); } void HandleWakeup(const TActorContext &ctx) { - LOG_DEBUG_S(ctx, ServiceId, "Operation queue wakeup# " << this->Now()); + LOG_DEBUG_S(ctx, ServiceId, "Operation queue wakeup"); + When = {}; + LongTimerId = {}; TBase::Wakeup(); } diff --git a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp index d16e5b0ae8..913ed27115 100644 --- a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp @@ -24,7 +24,7 @@ NOperationQueue::EStartStatus TSchemeShard::StartBackgroundCompaction(const TSha LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "RunBackgroundCompaction " "for pathId# " << pathId << ", datashard# " << datashardId << ", compactionInfo# " << info - << ", next wakeup# " << CompactionQueue->GetWakeupTime() + << ", next wakeup in# " << CompactionQueue->GetWakeupDelta() << ", rate# " << CompactionQueue->GetRate() << ", in queue# " << CompactionQueue->Size() << " shards" << ", waiting after compaction# " << CompactionQueue->WaitingSize() << " shards" @@ -65,7 +65,7 @@ void TSchemeShard::OnBackgroundCompactionTimeout(const TShardCompactionInfo& inf LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Background compaction timeout " "for pathId# " << pathId << ", datashard# " << datashardId << ", compactionInfo# " << info - << ", next wakeup# " << CompactionQueue->GetWakeupTime() + << ", next wakeup in# " << CompactionQueue->GetWakeupDelta() << ", rate# " << CompactionQueue->GetRate() << ", in queue# " << CompactionQueue->Size() << " shards" << ", waiting after compaction# " << CompactionQueue->WaitingSize() << " shards" @@ -94,7 +94,7 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Finished background compaction of unknown shard " "for pathId# " << pathId << ", datashard# " << tabletId << " in# " << duration.MilliSeconds() << " ms, with status# " << (int)record.GetStatus() - << ", next wakeup# " << CompactionQueue->GetWakeupTime() + << ", next wakeup in# " << CompactionQueue->GetWakeupDelta() << ", rate# " << CompactionQueue->GetRate() << ", in queue# " << CompactionQueue->Size() << " shards" << ", waiting after compaction# " << CompactionQueue->WaitingSize() << " shards" @@ -105,7 +105,7 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T "for pathId# " << pathId << ", datashard# " << tabletId << ", shardIdx# " << shardIdx << " in# " << duration.MilliSeconds() << " ms, with status# " << (int)record.GetStatus() - << ", next wakeup# " << CompactionQueue->GetWakeupTime() + << ", next wakeup in# " << CompactionQueue->GetWakeupDelta() << ", rate# " << CompactionQueue->GetRate() << ", in queue# " << CompactionQueue->Size() << " shards" << ", waiting after compaction# " << CompactionQueue->WaitingSize() << " shards" diff --git a/ydb/core/tx/schemeshard/ut_compaction.cpp b/ydb/core/tx/schemeshard/ut_compaction.cpp index 8d768d3aa0..b5e1fac3fe 100644 --- a/ydb/core/tx/schemeshard/ut_compaction.cpp +++ b/ydb/core/tx/schemeshard/ut_compaction.cpp @@ -12,6 +12,9 @@ using namespace NSchemeShardUT_Private; namespace { +constexpr TDuration DefaultTimeout = TDuration::Seconds(30); +constexpr TDuration RetryDelay = TDuration::Seconds(1); + using TTableInfoMap = THashMap<TString, NKikimrTxDataShard::TEvGetInfoResponse::TUserTable>; TShardCompactionInfo MakeCompactionInfo(ui64 idx, ui64 ts, ui64 sh = 0, ui64 d = 0) { @@ -123,6 +126,8 @@ void SetFeatures( compactionConfig->MutableBackgroundCompactionConfig()->SetSearchHeightThreshold(0); compactionConfig->MutableBackgroundCompactionConfig()->SetRowCountThreshold(0); compactionConfig->MutableBackgroundCompactionConfig()->SetCompactSinglePartedShards(true); + compactionConfig->MutableBackgroundCompactionConfig()->SetTimeoutSeconds(DefaultTimeout.Seconds()); + compactionConfig->MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(RetryDelay.Seconds()); // 1 compaction / second compactionConfig->MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(0); @@ -163,6 +168,8 @@ void DisableBackgroundCompactionViaRestart( compactionConfig.MutableBackgroundCompactionConfig()->SetSearchHeightThreshold(0); compactionConfig.MutableBackgroundCompactionConfig()->SetRowCountThreshold(0); compactionConfig.MutableBackgroundCompactionConfig()->SetCompactSinglePartedShards(true); + compactionConfig.MutableBackgroundCompactionConfig()->SetTimeoutSeconds(DefaultTimeout.Seconds()); + compactionConfig.MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(RetryDelay.Seconds()); // 1 compaction / second compactionConfig.MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(0); @@ -188,6 +195,8 @@ void EnableBackgroundCompactionViaRestart( compactionConfig.MutableBackgroundCompactionConfig()->SetSearchHeightThreshold(0); compactionConfig.MutableBackgroundCompactionConfig()->SetRowCountThreshold(0); compactionConfig.MutableBackgroundCompactionConfig()->SetCompactSinglePartedShards(true); + compactionConfig.MutableBackgroundCompactionConfig()->SetTimeoutSeconds(DefaultTimeout.Seconds()); + compactionConfig.MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(RetryDelay.Seconds()); // 1 compaction / second compactionConfig.MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(0); @@ -566,9 +575,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) { TTestEnv env(runtime); runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); - //runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); - //runtime.SetLogPriority(NKikimrServices::BOOTSTRAPPER, NActors::NLog::PRI_TRACE); // disable for the case, when compaction is enabled by default SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, false); @@ -610,6 +617,50 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) { // original table should not be compacted as well CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/Simple"); } + + Y_UNIT_TEST(SchemeshardShouldHandleCompactionTimeouts) { + // note that this test is good to test TOperationQueueWithTimer + + TTestBasicRuntime runtime; + TTestEnv env(runtime); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + + SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, true); + + size_t compactionResultCount = 0; + + // capture original observer func by setting dummy one + auto originalObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>&) { + return TTestActorRuntime::EEventAction::PROCESS; + }); + // now set our observer backed up by original + runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case TEvDataShard::EvCompactTableResult: { + Y_UNUSED(ev.Release()); + ++compactionResultCount; + return TTestActorRuntime::EEventAction::DROP; + } + default: + return originalObserver(runtime, ev); + } + }); + ui64 txId = 1000; + + // note that we create 1-sharded table to avoid complications + CreateTableWithData(runtime, env, "/MyRoot", "Simple", 1, txId); + + env.SimulateSleep(runtime, DefaultTimeout + RetryDelay + TDuration::Seconds(1)); + UNIT_ASSERT_VALUES_EQUAL(compactionResultCount, 1UL); + + env.SimulateSleep(runtime, DefaultTimeout + RetryDelay + TDuration::Seconds(1)); + UNIT_ASSERT_VALUES_EQUAL(compactionResultCount, 2UL); + + env.SimulateSleep(runtime, DefaultTimeout + RetryDelay + TDuration::Seconds(1)); + UNIT_ASSERT_VALUES_EQUAL(compactionResultCount, 3UL); + } }; namespace NKikimr::NSchemeShard { diff --git a/ydb/core/util/CMakeLists.txt b/ydb/core/util/CMakeLists.txt index ae874ee1bf..65da8d7dee 100644 --- a/ydb/core/util/CMakeLists.txt +++ b/ydb/core/util/CMakeLists.txt @@ -11,6 +11,7 @@ add_library(ydb-core-util) target_link_libraries(ydb-core-util PUBLIC contrib-libs-cxxsupp yutil + cpp-actors-core actors-interconnect-mock cpp-actors-util cpp-containers-stack_vector diff --git a/ydb/core/util/operation_queue.h b/ydb/core/util/operation_queue.h index 19c8750b49..514574d160 100644 --- a/ydb/core/util/operation_queue.h +++ b/ydb/core/util/operation_queue.h @@ -4,7 +4,9 @@ #include "intrusive_heap.h" #include "token_bucket.h" -#include <library/cpp/time_provider/time_provider.h> +#include <ydb/core/base/defs.h> + +#include <library/cpp/actors/core/monotonic.h> #include <util/datetime/base.h> #include <util/generic/algorithm.h> @@ -26,9 +28,9 @@ enum class EStartStatus { class ITimer { public: // asks to call TOperationQueue::Wakeup() - virtual void SetWakeupTimer(TInstant t) = 0; + virtual void SetWakeupTimer(TDuration delta) = 0; - virtual TInstant Now() = 0; + virtual TMonotonic Now() = 0; }; template <typename T> @@ -195,18 +197,18 @@ public: struct TItemWithTs { T Item; - TInstant Timestamp; + TMonotonic Timestamp; explicit TItemWithTs(const T& item) : Item(item) { } - TItemWithTs(const T& item, TInstant s) + TItemWithTs(const T& item, TMonotonic s) : Item(item) , Timestamp(s) { } - TItemWithTs(T&& item, TInstant s) + TItemWithTs(T&& item, TMonotonic s) : Item(std::move(item)) , Timestamp(s) { } @@ -259,10 +261,10 @@ private: TRunningItems RunningItems; TWaitingItems WaitingItems; - TTokenBucket TokenBucket; + TTokenBucketBase<TMonotonic> TokenBucket; bool HasRateLimit = false; - TInstant NextWakeup; + TMonotonic NextWakeup; bool Running = false; bool WasRunning = false; @@ -527,8 +529,10 @@ TDuration TOperationQueue<T, TQueue>::OnDone(const T& item) { template <typename T, typename TQueue> void TOperationQueue<T, TQueue>::Wakeup() { + NextWakeup = {}; StartOperations(); - ScheduleWakeup(); + if (!NextWakeup) + ScheduleWakeup(); } template <typename T, typename TQueue> @@ -622,17 +626,17 @@ void TOperationQueue<T, TQueue>::ScheduleWakeup() { if (TokenBucket.Available() <= 0) { // we didn't start anything because of RPS limit NextWakeup = now + TokenBucket.NextAvailableDelay(); - Timer.SetWakeupTimer(NextWakeup); + Timer.SetWakeupTimer(TokenBucket.NextAvailableDelay()); return; - } else if (!NextWakeup || NextWakeup <= now) { + } else if (!NextWakeup) { // special case when we failed to start anything NextWakeup = now + Config.WakeupInterval; - Timer.SetWakeupTimer(NextWakeup); + Timer.SetWakeupTimer(Config.WakeupInterval); return; } } - auto wakeup = TInstant::Max(); + auto wakeup = TMonotonic::Max(); if (Config.Timeout && !RunningItems.Empty()) { const auto& item = RunningItems.Front(); @@ -644,7 +648,7 @@ void TOperationQueue<T, TQueue>::ScheduleWakeup() { wakeup = Min(wakeup, item.Timestamp + Config.MinOperationRepeatDelay); } - if (wakeup == TInstant::Max()) + if (wakeup == TMonotonic::Max()) return; // no sense to wakeup earlier that rate limit allows @@ -652,9 +656,9 @@ void TOperationQueue<T, TQueue>::ScheduleWakeup() { wakeup = Max(wakeup, now + TokenBucket.NextAvailableDelay()); } - if (!NextWakeup || NextWakeup > wakeup || NextWakeup <= now) { + if (!NextWakeup || NextWakeup > wakeup) { NextWakeup = wakeup; - Timer.SetWakeupTimer(NextWakeup); + Timer.SetWakeupTimer(wakeup - now); } } diff --git a/ydb/core/util/operation_queue_priority_ut.cpp b/ydb/core/util/operation_queue_priority_ut.cpp index 32bd274fe6..687350cc3b 100644 --- a/ydb/core/util/operation_queue_priority_ut.cpp +++ b/ydb/core/util/operation_queue_priority_ut.cpp @@ -2,6 +2,7 @@ #include "circular_queue.h" +#include <library/cpp/actors/core/monotonic_provider.h> #include <library/cpp/testing/unittest/registar.h> #include <util/generic/ptr.h> @@ -65,9 +66,9 @@ namespace { TDuration Timeout = TDuration::Minutes(10); -class TSimpleTimeProvider : public ITimeProvider { +class TSimpleTimeProvider : public NActors::IMonotonicTimeProvider { public: - TInstant Now() override { + TMonotonic Now() override { return Now_; } @@ -75,12 +76,12 @@ public: Now_ += delta; } - void Move(TInstant now) { + void Move(TMonotonic now) { Now_ = now; } private: - TInstant Now_; + TMonotonic Now_; }; @@ -88,7 +89,7 @@ struct TOperationStarter : public TPriorityQueue::IStarter, public NOperationQue TSimpleTimeProvider TimeProvider; TVector<TPriorityItem> StartHistory; - TVector<TInstant> WakeupHistory; + TVector<TMonotonic> WakeupHistory; NOperationQueue::EStartStatus StartResult = NOperationQueue::EStartStatus::EOperationRunning; @@ -98,15 +99,15 @@ struct TOperationStarter : public TPriorityQueue::IStarter, public NOperationQue return StartResult; } - void SetWakeupTimer(TInstant t) override + void SetWakeupTimer(TDuration delta) override { - WakeupHistory.push_back(t); + WakeupHistory.push_back(this->Now() + delta); } void OnTimeout(const TPriorityItem&) override {} - TInstant Now() override + TMonotonic Now() override { return TimeProvider.Now(); } diff --git a/ydb/core/util/operation_queue_ut.cpp b/ydb/core/util/operation_queue_ut.cpp index 1015cb217e..7daa22ba67 100644 --- a/ydb/core/util/operation_queue_ut.cpp +++ b/ydb/core/util/operation_queue_ut.cpp @@ -2,6 +2,7 @@ #include "circular_queue.h" +#include <library/cpp/actors/core/monotonic_provider.h> #include <library/cpp/testing/unittest/registar.h> #include <util/generic/ptr.h> @@ -14,9 +15,9 @@ namespace { TDuration Timeout = TDuration::Minutes(10); -class TSimpleTimeProvider : public ITimeProvider { +class TSimpleTimeProvider : public NActors::IMonotonicTimeProvider { public: - TInstant Now() override { + TMonotonic Now() override { return Now_; } @@ -24,12 +25,12 @@ public: Now_ += delta; } - void Move(TInstant now) { + void Move(TMonotonic now) { Now_ = now; } private: - TInstant Now_; + TMonotonic Now_; }; using TQueue = TOperationQueue<int, TFifoQueue<int>>; @@ -38,7 +39,7 @@ struct TOperationStarter : public TQueue::IStarter, public NOperationQueue::ITim TSimpleTimeProvider TimeProvider; TVector<int> StartHistory; - TVector<TInstant> WakeupHistory; + TVector<TMonotonic> WakeupHistory; NOperationQueue::EStartStatus StartResult = NOperationQueue::EStartStatus::EOperationRunning; @@ -48,15 +49,15 @@ struct TOperationStarter : public TQueue::IStarter, public NOperationQueue::ITim return StartResult; } - void SetWakeupTimer(TInstant t) override + void SetWakeupTimer(TDuration delta) override { - WakeupHistory.push_back(t); + WakeupHistory.push_back(this->Now() + delta); } void OnTimeout(const int&) override {} - TInstant Now() override + TMonotonic Now() override { return TimeProvider.Now(); } @@ -68,7 +69,7 @@ void CheckQueue( TVector<TQueue::TItemWithTs> runningGold, TVector<int> inQueueGold, TVector<int> startHistory, - TVector<TInstant> wakeupHistory) + TVector<TMonotonic> wakeupHistory) { auto running = queue.GetRunning(); auto inQueue = queue.GetQueue(); @@ -107,7 +108,7 @@ void TestStartInflightBeforeStart(int inflight, int pushN = 10) { runningGold.push_back({i, now}); } - TVector<TInstant> wakeupsGold = + TVector<TMonotonic> wakeupsGold = { starter.TimeProvider.Now() + config.Timeout }; CheckQueue( @@ -131,7 +132,7 @@ void TestInflightWithEnqueue(int inflight, int pushN = 10) { TVector<TQueue::TItemWithTs> runningGold; TVector<int> queuedGold; - TVector<TInstant> wakeupsGold = + TVector<TMonotonic> wakeupsGold = { starter.TimeProvider.Now() + TDuration::Seconds(1) + config.Timeout }; int lastStarted = 0; @@ -988,6 +989,36 @@ Y_UNIT_TEST_SUITE(TCircularOperationQueueTest) { UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 2UL); UNIT_ASSERT(starter.WakeupHistory.back() > starter.TimeProvider.Now()); } + + Y_UNIT_TEST(ShouldTolerateInaccurateTimer) { + // should properly work when wokeup earlier than requested (i.e. properly set new timer) + // regression test: woke up earlier and didn't set new wakeup + + TQueue::TConfig config; + config.IsCircular = true; + config.InflightLimit = 1; + config.MaxRate = 0.0; + config.Timeout = Timeout; + TOperationStarter starter; + + TQueue queue(config, starter, starter); + queue.Start(); + + UNIT_ASSERT_VALUES_EQUAL(starter.WakeupHistory.size(), 0UL); + + queue.Enqueue(1); + + UNIT_ASSERT_VALUES_EQUAL(starter.WakeupHistory.size(), 1UL); + + // expect to wakeup on Timeout, but wakeup earlier + starter.TimeProvider.Move(Timeout - TDuration::Seconds(1)); + queue.Wakeup(); + + UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 0UL); + UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 1UL); + + UNIT_ASSERT_VALUES_EQUAL(starter.WakeupHistory.size(), 2UL); + } }; } // NOperationQueue diff --git a/ydb/core/util/token_bucket.h b/ydb/core/util/token_bucket.h index 060f8c75a6..8ed8475e53 100644 --- a/ydb/core/util/token_bucket.h +++ b/ydb/core/util/token_bucket.h @@ -6,16 +6,17 @@ namespace NKikimr { -class TTokenBucket { +template<typename TTime> +class TTokenBucketBase { double Tokens = 0.0; // tokens currenty in bucket double Rate = 0.0; // tokens filling rate [tokens/sec] double Capacity = 0.0; // maximum amount of tokens allowed in bucket - TInstant LastFill = TInstant::Zero(); + TTime LastFill; public: // Create unlimited bucket // NOTE: any bucket is created fully filled - TTokenBucket() { + TTokenBucketBase() { SetUnlimited(); } @@ -46,7 +47,7 @@ public: } // Fill bucket with tokens, should be done just before Take() - void Fill(TInstant now) { + void Fill(TTime now) { // NOTE: LastFill is allowed to be zero, the following code will work OK TDuration elapsed = now - LastFill; Tokens += elapsed.SecondsFloat() * Rate; @@ -62,7 +63,7 @@ public: } // Fill and take if available, returns taken amount - double FillAndTryTake(TInstant now, double amount) { + double FillAndTryTake(TTime now, double amount) { Fill(now); amount = Min(amount, Tokens); Take(amount); @@ -78,7 +79,7 @@ public: return TDuration::MicroSeconds(std::ceil(Available() * -1000000.0 / Rate)); } - TDuration FillAndNextAvailableDelay(TInstant now) { + TDuration FillAndNextAvailableDelay(TTime now) { Fill(now); return NextAvailableDelay(); } @@ -101,4 +102,6 @@ public: } }; +using TTokenBucket = TTokenBucketBase<TInstant>; + } // namespace NKikimr |