diff options
author | Evgeniy Ivanov <eivanov89@yandex-team.ru> | 2022-02-11 18:13:43 +0300 |
---|---|---|
committer | Evgeniy Ivanov <eivanov89@yandex-team.ru> | 2022-02-11 18:13:43 +0300 |
commit | 9239e3a134ea21e9ad69912968146743a467bcca (patch) | |
tree | 67a88636e8e455e28bd9931c9d6338fb33636c50 | |
parent | 84ed4aa29f9177dedab2fc11b10f8239d16ff8de (diff) | |
download | ydb-9239e3a134ea21e9ad69912968146743a467bcca.tar.gz |
KIKIMR-9748: calculate compaction rate based on RoundSeconds option
ref:53a8d2dcdf4f6988167b463150c23a7862a30751
-rw-r--r-- | ydb/core/protos/config.proto | 22 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/operation_queue_timer.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__compaction.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.cpp | 8 | ||||
-rw-r--r-- | ydb/core/util/operation_queue.h | 56 | ||||
-rw-r--r-- | ydb/core/util/operation_queue_ut.cpp | 70 |
6 files changed, 147 insertions, 17 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 2164818e8d..b31ca1bb45 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1300,11 +1300,24 @@ message TDataShardConfig { message TCompactionConfig { message TBackgroundCompactionConfig { - optional double Rate = 1 [default = 1]; // 1 compaction / s + optional double MaxRate = 1 [default = 1]; // 1 compaction / s optional uint64 InflightLimit = 2 [default = 1]; + + // After this interval started compaction is considered as finished, + // retry is possible only within MinCompactionRepeatDelaySeconds optional uint64 TimeoutSeconds = 3 [default = 600]; + + // How often schemeshard is waken up to check if it should + // start any compaction. It is used only when there are no + // running compactions, otherwise compaction queue logic + // is triggered either by finished compactions (event from DS) + // or by timeouts (set via TimeoutSeconds) optional uint64 WakeupIntervalSeconds = 4 [default = 60]; - optional uint64 MinCompactionRepeatDelay = 5 [default = 600]; + + // When shard has been compacted, it will be considered for + // compaction only after this amount of time + optional uint64 MinCompactionRepeatDelaySeconds = 5 [default = 600]; + optional uint32 SearchHeightThreshold = 6 [default = 10]; // TODO: enable, when schemeshard receive proper stat @@ -1315,6 +1328,11 @@ message TCompactionConfig { // at all even when searchHeight or deleted rows match // corresponding thresholds optional uint32 RowCountThreshold = 8 [default = 1]; + + // When this option specified, schemeshard calculates compaction rate so + // that the queue is compacted within this interval. + // CompactionRate = Min(QueueSize / RoundSeconds, MaxRate). + optional uint64 RoundSeconds = 9 [default = 86400]; } optional TBackgroundCompactionConfig BackgroundCompactionConfig = 1; diff --git a/ydb/core/tx/schemeshard/operation_queue_timer.h b/ydb/core/tx/schemeshard/operation_queue_timer.h index fe6483c9d8..cb745fcb51 100644 --- a/ydb/core/tx/schemeshard/operation_queue_timer.h +++ b/ydb/core/tx/schemeshard/operation_queue_timer.h @@ -348,13 +348,13 @@ public: TString DumpQueueFronts() const { TStringStream ss; ss << "LastCompaction: {"; - if (!QueueLastCompaction.Empty()) + if (!QueueLastCompaction.Empty()) ss << QueueLastCompaction.Front(); ss << "}, SearchHeight: {"; - if (!QueueSearchHeight.Empty()) + if (!QueueSearchHeight.Empty()) ss << QueueSearchHeight.Front(); ss << "}, RowDeletes: {"; - if (!QueueRowDeletes.Empty()) + if (!QueueRowDeletes.Empty()) ss << QueueRowDeletes.Front(); ss << "}"; return ss.Str(); diff --git a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp index db6d9e329c..f715e0e394 100644 --- a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp @@ -49,7 +49,7 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T // move shard to the end of all queues TInstant now = AppData(ctx)->TimeProvider->Now(); TTableInfo::TPartitionStats stats; - stats.FullCompactionTs = now.Seconds(); + stats.FullCompactionTs = now.Seconds(); auto duration = CompactionQueue->OnDone(TShardCompactionInfo(shardIdx, stats)); if (shardIdx == InvalidShardIdx) { diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 014e36dcfc..98011ef1ff 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -5992,14 +5992,16 @@ void TSchemeShard::ConfigureCompactionQueue( compactionConfig.Timeout = TDuration::Seconds(config.GetTimeoutSeconds()); compactionConfig.WakeupInterval = TDuration::Seconds(config.GetWakeupIntervalSeconds()); compactionConfig.InflightLimit = config.GetInflightLimit(); - compactionConfig.Rate = config.GetRate(); - compactionConfig.MinOperationRepeatDelay = TDuration::Seconds(config.GetMinCompactionRepeatDelay()); + compactionConfig.RoundInterval = TDuration::Seconds(config.GetRoundSeconds()); + compactionConfig.MaxRate = config.GetMaxRate(); + compactionConfig.MinOperationRepeatDelay = TDuration::Seconds(config.GetMinCompactionRepeatDelaySeconds()); LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "CompactionQueue configured: Timeout# " << compactionConfig.Timeout << ", WakeupInterval# " << compactionConfig.WakeupInterval + << ", RoundInterval# " << compactionConfig.RoundInterval << ", InflightLimit# " << compactionConfig.InflightLimit - << ", Rate# " << compactionConfig.Rate); + << ", MaxRate# " << compactionConfig.MaxRate); if (CompactionQueue) { CompactionQueue->UpdateConfig(compactionConfig, queueConfig); diff --git a/ydb/core/util/operation_queue.h b/ydb/core/util/operation_queue.h index 59df79cef0..ac39e222d6 100644 --- a/ydb/core/util/operation_queue.h +++ b/ydb/core/util/operation_queue.h @@ -53,8 +53,10 @@ struct TConfig { // shuffle added items on Start() bool ShuffleOnStart = false; + TDuration RoundInterval; + ui32 InflightLimit = 1; - double Rate = 0.0; // max rate operations/s + double MaxRate = 0.0; // max rate operations/s // In case of circular queue start done operation // again only after this interval of time @@ -155,7 +157,7 @@ public: if (it == Items.end()) return false; - auto& ref = const_cast<THeapItem&>(*it); + auto& ref = const_cast<THeapItem&>(*it); ref.Item = item; Heap.Update(&ref); return true; @@ -219,7 +221,7 @@ public: // but in other places we want. // Though in UpdateIfFound rhs.Timestamp should // be always missing while in other cases it - // always presents + // always presents Timestamp = rhs.Timestamp; } return *this; @@ -254,6 +256,7 @@ private: TWaitingItems WaitingItems; TTokenBucket TokenBucket; + bool HasRateLimit = false; TInstant NextWakeup; bool Running = false; @@ -261,6 +264,9 @@ private: ui64 TimeoutCount = 0; + // operations / s + double Rate = 0; + public: TOperationQueue(const TConfig& config, IStarter& starter, @@ -294,12 +300,33 @@ public: if (&Config != &config) Config = config; - if (Config.Rate) { + UpdateRate(); + } + + void UpdateRate() { + if (!Config.MaxRate && !Config.RoundInterval) { + HasRateLimit = false; + Rate = 0; + TokenBucket.SetUnlimited(); + return; + } + + Rate = Config.MaxRate; + if (Config.RoundInterval && TotalQueueSize() > 0) { + double rateByInterval = TotalQueueSize() / (double)Config.RoundInterval.Seconds(); + if (Config.MaxRate) + rateByInterval = Min(rateByInterval, Config.MaxRate); + Rate = rateByInterval; + } + + HasRateLimit = false; + if (Rate) { // by default token bucket is unlimitted, so // configure only when rate is limited TokenBucket.SetCapacity(Config.InflightLimit); - TokenBucket.SetRate(Config.Rate); + TokenBucket.SetRate(Rate); TokenBucket.Fill(Timer.Now()); + HasRateLimit = true; } } @@ -354,6 +381,14 @@ public: size_t WaitingSize() const { return WaitingItems.Size(); } bool WaitingEmpty() const { return WaitingItems.Empty(); } + size_t TotalQueueSize() const { + if (Config.IsCircular) + return Size() + RunningSize(); + return Size(); + } + + double GetRate() const { return Rate; } + ui64 ResetTimeoutCount() { return TimeoutCount; TimeoutCount = 0; } // copies items, should be used in tests only @@ -419,7 +454,11 @@ bool TOperationQueue<T, TQueue>::Enqueue(const T& item) { template <typename T, typename TQueue> template <typename T2> bool TOperationQueue<T, TQueue>::EnqueueNoStart(T2&& item) { - return ReadyQueue.Enqueue(std::forward<T2>(item)); + bool res = ReadyQueue.Enqueue(std::forward<T2>(item)); + if (res) + UpdateRate(); + + return res; } template <typename T, typename TQueue> @@ -452,6 +491,9 @@ bool TOperationQueue<T, TQueue>::Remove(const T& item) { } } + if (removed) + UpdateRate(); + return removed; } @@ -599,7 +641,7 @@ void TOperationQueue<T, TQueue>::ScheduleWakeup() { return; // no sense to wakeup earlier that rate limit allows - if (Config.Rate) { + if (HasRateLimit) { wakeup = Max(wakeup, now + TokenBucket.NextAvailableDelay()); } diff --git a/ydb/core/util/operation_queue_ut.cpp b/ydb/core/util/operation_queue_ut.cpp index f6093d2fb8..bd6014eb56 100644 --- a/ydb/core/util/operation_queue_ut.cpp +++ b/ydb/core/util/operation_queue_ut.cpp @@ -768,7 +768,7 @@ Y_UNIT_TEST_SUITE(TCircularOperationQueueTest) { TQueue::TConfig config; config.IsCircular = true; config.InflightLimit = 2; - config.Rate = 1.0; + config.MaxRate = 1.0; config.Timeout = Timeout; TOperationStarter starter; @@ -818,6 +818,74 @@ Y_UNIT_TEST_SUITE(TCircularOperationQueueTest) { UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 2UL); } + Y_UNIT_TEST(BasicRPSCheckWithRound) { + TQueue::TConfig config; + config.IsCircular = true; + config.InflightLimit = 1; + config.MaxRate = 10.0; + config.RoundInterval = TDuration::Seconds(100); + config.Timeout = Timeout; + TOperationStarter starter; + + TQueue queue(config, starter, starter); + queue.Start(); + + // no items yet, thus rate by MaxRate + UNIT_ASSERT_VALUES_EQUAL(queue.GetRate(), config.MaxRate); + + queue.Enqueue(1); + UNIT_ASSERT_VALUES_EQUAL(queue.GetRate(), 1.0 / 100); + + queue.Enqueue(2); + UNIT_ASSERT_VALUES_EQUAL(queue.GetRate(), 2.0 / 100); + + queue.Enqueue(3); + queue.Enqueue(4); + queue.Enqueue(5); + + UNIT_ASSERT_VALUES_EQUAL(queue.GetRate(), 5.0 / 100); + + // Note that remove should affect rate as well + queue.Remove(5); + UNIT_ASSERT_VALUES_EQUAL(queue.GetRate(), 4.0 / 100); + + UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 3UL); + UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 1UL); + + // OnDone should not affect the rate + queue.OnDone(1); + UNIT_ASSERT_VALUES_EQUAL(queue.GetRate(), 4.0 / 100); + + // should start another one because RPS smoothing + UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 3UL); + UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 1UL); + + queue.OnDone(2); + + // Queue should start items every 25 seconds, + // thus now no items should be running + UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 4UL); + UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 0UL); + + // some spurious wakeup1 + starter.TimeProvider.Move(TDuration::Seconds(10)); + queue.Wakeup(); + UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 4UL); + UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 0UL); + + // some spurious wakeup1 + starter.TimeProvider.Move(TDuration::Seconds(10)); + queue.Wakeup(); + UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 4UL); + UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 0UL); + + // wakeup3 + starter.TimeProvider.Move(TDuration::Seconds(6)); + queue.Wakeup(); + UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 3UL); + UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 1UL); + } + Y_UNIT_TEST(CheckWakeupAfterStop) { TQueue::TConfig config; config.IsCircular = true; |