aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEvgeniy Ivanov <eivanov89@yandex-team.ru>2022-02-11 18:13:43 +0300
committerEvgeniy Ivanov <eivanov89@yandex-team.ru>2022-02-11 18:13:43 +0300
commit9239e3a134ea21e9ad69912968146743a467bcca (patch)
tree67a88636e8e455e28bd9931c9d6338fb33636c50
parent84ed4aa29f9177dedab2fc11b10f8239d16ff8de (diff)
downloadydb-9239e3a134ea21e9ad69912968146743a467bcca.tar.gz
KIKIMR-9748: calculate compaction rate based on RoundSeconds option
ref:53a8d2dcdf4f6988167b463150c23a7862a30751
-rw-r--r--ydb/core/protos/config.proto22
-rw-r--r--ydb/core/tx/schemeshard/operation_queue_timer.h6
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__compaction.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp8
-rw-r--r--ydb/core/util/operation_queue.h56
-rw-r--r--ydb/core/util/operation_queue_ut.cpp70
6 files changed, 147 insertions, 17 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 2164818e8d..b31ca1bb45 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1300,11 +1300,24 @@ message TDataShardConfig {
message TCompactionConfig {
message TBackgroundCompactionConfig {
- optional double Rate = 1 [default = 1]; // 1 compaction / s
+ optional double MaxRate = 1 [default = 1]; // 1 compaction / s
optional uint64 InflightLimit = 2 [default = 1];
+
+ // After this interval started compaction is considered as finished,
+ // retry is possible only within MinCompactionRepeatDelaySeconds
optional uint64 TimeoutSeconds = 3 [default = 600];
+
+ // How often schemeshard is waken up to check if it should
+ // start any compaction. It is used only when there are no
+ // running compactions, otherwise compaction queue logic
+ // is triggered either by finished compactions (event from DS)
+ // or by timeouts (set via TimeoutSeconds)
optional uint64 WakeupIntervalSeconds = 4 [default = 60];
- optional uint64 MinCompactionRepeatDelay = 5 [default = 600];
+
+ // When shard has been compacted, it will be considered for
+ // compaction only after this amount of time
+ optional uint64 MinCompactionRepeatDelaySeconds = 5 [default = 600];
+
optional uint32 SearchHeightThreshold = 6 [default = 10];
// TODO: enable, when schemeshard receive proper stat
@@ -1315,6 +1328,11 @@ message TCompactionConfig {
// at all even when searchHeight or deleted rows match
// corresponding thresholds
optional uint32 RowCountThreshold = 8 [default = 1];
+
+ // When this option specified, schemeshard calculates compaction rate so
+ // that the queue is compacted within this interval.
+ // CompactionRate = Min(QueueSize / RoundSeconds, MaxRate).
+ optional uint64 RoundSeconds = 9 [default = 86400];
}
optional TBackgroundCompactionConfig BackgroundCompactionConfig = 1;
diff --git a/ydb/core/tx/schemeshard/operation_queue_timer.h b/ydb/core/tx/schemeshard/operation_queue_timer.h
index fe6483c9d8..cb745fcb51 100644
--- a/ydb/core/tx/schemeshard/operation_queue_timer.h
+++ b/ydb/core/tx/schemeshard/operation_queue_timer.h
@@ -348,13 +348,13 @@ public:
TString DumpQueueFronts() const {
TStringStream ss;
ss << "LastCompaction: {";
- if (!QueueLastCompaction.Empty())
+ if (!QueueLastCompaction.Empty())
ss << QueueLastCompaction.Front();
ss << "}, SearchHeight: {";
- if (!QueueSearchHeight.Empty())
+ if (!QueueSearchHeight.Empty())
ss << QueueSearchHeight.Front();
ss << "}, RowDeletes: {";
- if (!QueueRowDeletes.Empty())
+ if (!QueueRowDeletes.Empty())
ss << QueueRowDeletes.Front();
ss << "}";
return ss.Str();
diff --git a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp
index db6d9e329c..f715e0e394 100644
--- a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp
@@ -49,7 +49,7 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T
// move shard to the end of all queues
TInstant now = AppData(ctx)->TimeProvider->Now();
TTableInfo::TPartitionStats stats;
- stats.FullCompactionTs = now.Seconds();
+ stats.FullCompactionTs = now.Seconds();
auto duration = CompactionQueue->OnDone(TShardCompactionInfo(shardIdx, stats));
if (shardIdx == InvalidShardIdx) {
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 014e36dcfc..98011ef1ff 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -5992,14 +5992,16 @@ void TSchemeShard::ConfigureCompactionQueue(
compactionConfig.Timeout = TDuration::Seconds(config.GetTimeoutSeconds());
compactionConfig.WakeupInterval = TDuration::Seconds(config.GetWakeupIntervalSeconds());
compactionConfig.InflightLimit = config.GetInflightLimit();
- compactionConfig.Rate = config.GetRate();
- compactionConfig.MinOperationRepeatDelay = TDuration::Seconds(config.GetMinCompactionRepeatDelay());
+ compactionConfig.RoundInterval = TDuration::Seconds(config.GetRoundSeconds());
+ compactionConfig.MaxRate = config.GetMaxRate();
+ compactionConfig.MinOperationRepeatDelay = TDuration::Seconds(config.GetMinCompactionRepeatDelaySeconds());
LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"CompactionQueue configured: Timeout# " << compactionConfig.Timeout
<< ", WakeupInterval# " << compactionConfig.WakeupInterval
+ << ", RoundInterval# " << compactionConfig.RoundInterval
<< ", InflightLimit# " << compactionConfig.InflightLimit
- << ", Rate# " << compactionConfig.Rate);
+ << ", MaxRate# " << compactionConfig.MaxRate);
if (CompactionQueue) {
CompactionQueue->UpdateConfig(compactionConfig, queueConfig);
diff --git a/ydb/core/util/operation_queue.h b/ydb/core/util/operation_queue.h
index 59df79cef0..ac39e222d6 100644
--- a/ydb/core/util/operation_queue.h
+++ b/ydb/core/util/operation_queue.h
@@ -53,8 +53,10 @@ struct TConfig {
// shuffle added items on Start()
bool ShuffleOnStart = false;
+ TDuration RoundInterval;
+
ui32 InflightLimit = 1;
- double Rate = 0.0; // max rate operations/s
+ double MaxRate = 0.0; // max rate operations/s
// In case of circular queue start done operation
// again only after this interval of time
@@ -155,7 +157,7 @@ public:
if (it == Items.end())
return false;
- auto& ref = const_cast<THeapItem&>(*it);
+ auto& ref = const_cast<THeapItem&>(*it);
ref.Item = item;
Heap.Update(&ref);
return true;
@@ -219,7 +221,7 @@ public:
// but in other places we want.
// Though in UpdateIfFound rhs.Timestamp should
// be always missing while in other cases it
- // always presents
+ // always presents
Timestamp = rhs.Timestamp;
}
return *this;
@@ -254,6 +256,7 @@ private:
TWaitingItems WaitingItems;
TTokenBucket TokenBucket;
+ bool HasRateLimit = false;
TInstant NextWakeup;
bool Running = false;
@@ -261,6 +264,9 @@ private:
ui64 TimeoutCount = 0;
+ // operations / s
+ double Rate = 0;
+
public:
TOperationQueue(const TConfig& config,
IStarter& starter,
@@ -294,12 +300,33 @@ public:
if (&Config != &config)
Config = config;
- if (Config.Rate) {
+ UpdateRate();
+ }
+
+ void UpdateRate() {
+ if (!Config.MaxRate && !Config.RoundInterval) {
+ HasRateLimit = false;
+ Rate = 0;
+ TokenBucket.SetUnlimited();
+ return;
+ }
+
+ Rate = Config.MaxRate;
+ if (Config.RoundInterval && TotalQueueSize() > 0) {
+ double rateByInterval = TotalQueueSize() / (double)Config.RoundInterval.Seconds();
+ if (Config.MaxRate)
+ rateByInterval = Min(rateByInterval, Config.MaxRate);
+ Rate = rateByInterval;
+ }
+
+ HasRateLimit = false;
+ if (Rate) {
// by default token bucket is unlimitted, so
// configure only when rate is limited
TokenBucket.SetCapacity(Config.InflightLimit);
- TokenBucket.SetRate(Config.Rate);
+ TokenBucket.SetRate(Rate);
TokenBucket.Fill(Timer.Now());
+ HasRateLimit = true;
}
}
@@ -354,6 +381,14 @@ public:
size_t WaitingSize() const { return WaitingItems.Size(); }
bool WaitingEmpty() const { return WaitingItems.Empty(); }
+ size_t TotalQueueSize() const {
+ if (Config.IsCircular)
+ return Size() + RunningSize();
+ return Size();
+ }
+
+ double GetRate() const { return Rate; }
+
ui64 ResetTimeoutCount() { return TimeoutCount; TimeoutCount = 0; }
// copies items, should be used in tests only
@@ -419,7 +454,11 @@ bool TOperationQueue<T, TQueue>::Enqueue(const T& item) {
template <typename T, typename TQueue>
template <typename T2>
bool TOperationQueue<T, TQueue>::EnqueueNoStart(T2&& item) {
- return ReadyQueue.Enqueue(std::forward<T2>(item));
+ bool res = ReadyQueue.Enqueue(std::forward<T2>(item));
+ if (res)
+ UpdateRate();
+
+ return res;
}
template <typename T, typename TQueue>
@@ -452,6 +491,9 @@ bool TOperationQueue<T, TQueue>::Remove(const T& item) {
}
}
+ if (removed)
+ UpdateRate();
+
return removed;
}
@@ -599,7 +641,7 @@ void TOperationQueue<T, TQueue>::ScheduleWakeup() {
return;
// no sense to wakeup earlier that rate limit allows
- if (Config.Rate) {
+ if (HasRateLimit) {
wakeup = Max(wakeup, now + TokenBucket.NextAvailableDelay());
}
diff --git a/ydb/core/util/operation_queue_ut.cpp b/ydb/core/util/operation_queue_ut.cpp
index f6093d2fb8..bd6014eb56 100644
--- a/ydb/core/util/operation_queue_ut.cpp
+++ b/ydb/core/util/operation_queue_ut.cpp
@@ -768,7 +768,7 @@ Y_UNIT_TEST_SUITE(TCircularOperationQueueTest) {
TQueue::TConfig config;
config.IsCircular = true;
config.InflightLimit = 2;
- config.Rate = 1.0;
+ config.MaxRate = 1.0;
config.Timeout = Timeout;
TOperationStarter starter;
@@ -818,6 +818,74 @@ Y_UNIT_TEST_SUITE(TCircularOperationQueueTest) {
UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 2UL);
}
+ Y_UNIT_TEST(BasicRPSCheckWithRound) {
+ TQueue::TConfig config;
+ config.IsCircular = true;
+ config.InflightLimit = 1;
+ config.MaxRate = 10.0;
+ config.RoundInterval = TDuration::Seconds(100);
+ config.Timeout = Timeout;
+ TOperationStarter starter;
+
+ TQueue queue(config, starter, starter);
+ queue.Start();
+
+ // no items yet, thus rate by MaxRate
+ UNIT_ASSERT_VALUES_EQUAL(queue.GetRate(), config.MaxRate);
+
+ queue.Enqueue(1);
+ UNIT_ASSERT_VALUES_EQUAL(queue.GetRate(), 1.0 / 100);
+
+ queue.Enqueue(2);
+ UNIT_ASSERT_VALUES_EQUAL(queue.GetRate(), 2.0 / 100);
+
+ queue.Enqueue(3);
+ queue.Enqueue(4);
+ queue.Enqueue(5);
+
+ UNIT_ASSERT_VALUES_EQUAL(queue.GetRate(), 5.0 / 100);
+
+ // Note that remove should affect rate as well
+ queue.Remove(5);
+ UNIT_ASSERT_VALUES_EQUAL(queue.GetRate(), 4.0 / 100);
+
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 3UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 1UL);
+
+ // OnDone should not affect the rate
+ queue.OnDone(1);
+ UNIT_ASSERT_VALUES_EQUAL(queue.GetRate(), 4.0 / 100);
+
+ // should start another one because RPS smoothing
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 3UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 1UL);
+
+ queue.OnDone(2);
+
+ // Queue should start items every 25 seconds,
+ // thus now no items should be running
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 4UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 0UL);
+
+ // some spurious wakeup1
+ starter.TimeProvider.Move(TDuration::Seconds(10));
+ queue.Wakeup();
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 4UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 0UL);
+
+ // some spurious wakeup1
+ starter.TimeProvider.Move(TDuration::Seconds(10));
+ queue.Wakeup();
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 4UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 0UL);
+
+ // wakeup3
+ starter.TimeProvider.Move(TDuration::Seconds(6));
+ queue.Wakeup();
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 3UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 1UL);
+ }
+
Y_UNIT_TEST(CheckWakeupAfterStop) {
TQueue::TConfig config;
config.IsCircular = true;