summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEvgeniy Ivanov <[email protected]>2022-03-30 18:30:55 +0300
committerEvgeniy Ivanov <[email protected]>2022-03-30 18:30:55 +0300
commit7e3bb182b4c0f2dac288002e3fd04d03131c36b2 (patch)
treee19e2f1761b497558fbb0f1483e8c35caf7c032a
parent543be08158636fd07db5678e23aa4a0b5df7a938 (diff)
KIKIMR-14610: properly update active subqueue when update/remove compactionQueue
ref:7a71ee8fb483bc44a4859583c336469db1cf8711
-rw-r--r--ydb/core/tx/schemeshard/CMakeLists.txt5
-rw-r--r--ydb/core/tx/schemeshard/operation_queue_timer.h77
-rw-r--r--ydb/core/tx/schemeshard/ut_compaction.cpp325
3 files changed, 337 insertions, 70 deletions
diff --git a/ydb/core/tx/schemeshard/CMakeLists.txt b/ydb/core/tx/schemeshard/CMakeLists.txt
index eff2e9ed2c7..34eaa4f2e9a 100644
--- a/ydb/core/tx/schemeshard/CMakeLists.txt
+++ b/ydb/core/tx/schemeshard/CMakeLists.txt
@@ -191,3 +191,8 @@ generate_enum_serilization(core-tx-schemeshard
INCLUDE_HEADERS
ydb/core/tx/schemeshard/schemeshard_types.h
)
+generate_enum_serilization(core-tx-schemeshard
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/operation_queue_timer.h
+ INCLUDE_HEADERS
+ ydb/core/tx/schemeshard/operation_queue_timer.h
+)
diff --git a/ydb/core/tx/schemeshard/operation_queue_timer.h b/ydb/core/tx/schemeshard/operation_queue_timer.h
index 92bf21b59f3..1ba33fc7d47 100644
--- a/ydb/core/tx/schemeshard/operation_queue_timer.h
+++ b/ydb/core/tx/schemeshard/operation_queue_timer.h
@@ -192,6 +192,13 @@ public:
TConfig() = default;
};
+ // Enumeration defines round robin order
+ enum class EActiveQueue {
+ ByLastCompaction, // must be first
+ BySearchHeight,
+ ByRowDeletes, // must be last, see PopFront()
+ };
+
private:
using TCompactionQueueLastCompaction = NOperationQueue::TQueueWithPriority<
TShardCompactionInfo,
@@ -205,13 +212,6 @@ private:
TShardCompactionInfo,
TShardCompactionInfo::TLessByRowDeletes>;
- // Enumeration defines round robin order
- enum class EActiveQueue {
- ByLastCompaction, // must be first
- BySearchHeight,
- ByRowDeletes, // must be last, see PopFront()
- };
-
private:
TConfig Config;
@@ -267,7 +267,10 @@ public:
bool Remove(const TShardCompactionInfo& info) {
QueueSearchHeight.Remove(info);
QueueRowDeletes.Remove(info);
- return QueueLastCompaction.Remove(info);
+ bool res = QueueLastCompaction.Remove(info);
+
+ UpdateActiveQueue();
+ return res;
}
bool UpdateIfFound(const TShardCompactionInfo& info) {
@@ -275,19 +278,27 @@ public:
return Remove(info);
}
+ if (!QueueLastCompaction.UpdateIfFound(info)) {
+ // this queue contains all shards, no reason to check other shards
+ return false;
+ }
+
if (info.SearchHeight >= Config.SearchHeightThreshold) {
- QueueSearchHeight.UpdateIfFound(info);
+ if (!QueueSearchHeight.UpdateIfFound(info))
+ QueueSearchHeight.Enqueue(info);
} else {
QueueSearchHeight.Remove(info);
}
if (info.RowDeletes >= Config.RowDeletesThreshold) {
- QueueRowDeletes.UpdateIfFound(info);
+ if (!QueueRowDeletes.UpdateIfFound(info))
+ QueueRowDeletes.Enqueue(info);
} else {
QueueRowDeletes.Remove(info);
}
- return QueueLastCompaction.UpdateIfFound(info);
+ UpdateActiveQueue();
+ return true;
}
void Clear() {
@@ -299,10 +310,13 @@ public:
const TShardCompactionInfo& Front() const {
switch (ActiveQueue) {
case EActiveQueue::ByLastCompaction:
+ Y_VERIFY(!QueueLastCompaction.Empty(), "QueueLastCompaction empty");
return QueueLastCompaction.Front();
case EActiveQueue::BySearchHeight:
+ Y_VERIFY(!QueueSearchHeight.Empty(), "QueueSearchHeight empty");
return QueueSearchHeight.Front();
case EActiveQueue::ByRowDeletes:
+ Y_VERIFY(!QueueRowDeletes.Empty(), "QueueRowDeletes empty");
return QueueRowDeletes.Front();
}
}
@@ -332,18 +346,32 @@ public:
switch (ActiveQueue) {
case EActiveQueue::ByLastCompaction:
- if (!QueueSearchHeight.Empty()) {
ActiveQueue = EActiveQueue::BySearchHeight;
break;
- }
- [[fallthrough]];
case EActiveQueue::BySearchHeight:
- if (!QueueRowDeletes.Empty()) {
ActiveQueue = EActiveQueue::ByRowDeletes;
break;
- }
+ case EActiveQueue::ByRowDeletes:
+ ActiveQueue = EActiveQueue::ByLastCompaction;
+ }
+ UpdateActiveQueue();
+ }
+
+ void UpdateActiveQueue() {
+ switch (ActiveQueue) {
+ case EActiveQueue::ByLastCompaction:
+ // if this queue is empty, all other queues are empty too,
+ // thus no reason to do any check
+ return;
+ case EActiveQueue::BySearchHeight:
+ if (!QueueSearchHeight.Empty())
+ return;
+ ActiveQueue = EActiveQueue::ByRowDeletes;
[[fallthrough]];
case EActiveQueue::ByRowDeletes:
+ if (!QueueRowDeletes.Empty()) {
+ return;
+ }
ActiveQueue = EActiveQueue::ByLastCompaction;
}
}
@@ -364,6 +392,23 @@ public:
return QueueRowDeletes.Size();
}
+ // for tests
+ size_t ActiveQueueSize() const {
+ switch (ActiveQueue) {
+ case EActiveQueue::ByLastCompaction:
+ return QueueLastCompaction.Size();
+ case EActiveQueue::BySearchHeight:
+ return QueueSearchHeight.Size();
+ case EActiveQueue::ByRowDeletes:
+ return QueueRowDeletes.Size();
+ }
+ }
+
+ // for tests
+ EActiveQueue ActiveQueueType() const {
+ return ActiveQueue;
+ }
+
TString DumpQueueFronts() const {
TStringStream ss;
ss << "LastCompaction: {";
diff --git a/ydb/core/tx/schemeshard/ut_compaction.cpp b/ydb/core/tx/schemeshard/ut_compaction.cpp
index e87ca69101f..bcc38c2d8ac 100644
--- a/ydb/core/tx/schemeshard/ut_compaction.cpp
+++ b/ydb/core/tx/schemeshard/ut_compaction.cpp
@@ -14,6 +14,17 @@ namespace {
using TTableInfoMap = THashMap<TString, NKikimrTxDataShard::TEvGetInfoResponse::TUserTable>;
+TShardCompactionInfo MakeCompactionInfo(ui64 idx, ui64 ts, ui64 sh = 0, ui64 d = 0) {
+ TShardIdx shardId = TShardIdx(1, idx);
+ TTableInfo::TPartitionStats stats;
+ stats.FullCompactionTs = ts;
+ stats.SearchHeight = sh;
+ stats.RowDeletes = d;
+ stats.PartCount = 100; // random number to not consider shard as empty
+ stats.RowCount = 100; // random number to not consider shard as empty
+ return TShardCompactionInfo(shardId, stats);
+}
+
std::pair<TTableInfoMap, ui64> GetTables(
TTestActorRuntime &runtime,
ui64 tabletId)
@@ -560,48 +571,272 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) {
UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL);
}
- Y_UNIT_TEST(UpdateBelowSearchHeightThreshold) {
+ Y_UNIT_TEST(RemoveLastShardFromSubQueues) {
+ // check that when last shard is removed from BySearchHeight
+ // or from ByRowDeletes, active queue is properly switched
TCompactionQueueImpl::TConfig config;
config.SearchHeightThreshold = 10;
config.RowDeletesThreshold = 10;
- TTableInfo::TPartitionStats stats;
- stats.RowCount = 10;
- stats.RowDeletes = 100;
- stats.SearchHeight = 20;
- stats.PartCount = 100; // random number to not consider shard as empty
+ std::vector<TShardCompactionInfo> shardInfos = {
+ // id, ts, sh, d
+ MakeCompactionInfo(0, 0, 0, 0),
+ MakeCompactionInfo(1, 1, 0, 0),
+ MakeCompactionInfo(2, 2, 0, 0),
+ MakeCompactionInfo(3, 3, 0, 0),
+ MakeCompactionInfo(4, 4, 100, 0),
+ MakeCompactionInfo(5, 5, 100, 0),
+ MakeCompactionInfo(6, 6, 0, 100),
+ MakeCompactionInfo(7, 7, 0, 100),
+ };
TCompactionQueueImpl queue(config);
- UNIT_ASSERT(queue.Enqueue({ShardIdx, stats}));
+ for(const auto& info: shardInfos) {
+ UNIT_ASSERT(queue.Enqueue(info));
+ }
+
+ // initial queue state
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 8UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 2UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 2UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueType(), TCompactionQueueImpl::EActiveQueue::ByLastCompaction);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 8);
+
+ // remove from LastCompaction, active queue should not change
+ UNIT_ASSERT(queue.Remove({TShardIdx(1, 0), TTableInfo::TPartitionStats()}));
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 7UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 2UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 2UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueType(), TCompactionQueueImpl::EActiveQueue::ByLastCompaction);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 7);
+
+ // pop from LastCompaction, BySearchHeight is active now
+ queue.PopFront();
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 6UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 2UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 2UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueType(), TCompactionQueueImpl::EActiveQueue::BySearchHeight);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 2);
+
+ // remove1 from BySearchHeight (active queue should not change)
+ UNIT_ASSERT(queue.Remove({TShardIdx(1, 4), TTableInfo::TPartitionStats()}));
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 5UL);
UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 1UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 2UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueType(), TCompactionQueueImpl::EActiveQueue::BySearchHeight);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 1);
- stats.SearchHeight = 1;
- UNIT_ASSERT(queue.UpdateIfFound({ShardIdx, stats}));
+ // remove2 from BySearchHeight, ByRowDeletes is active now
+ UNIT_ASSERT(queue.Remove({TShardIdx(1, 5), TTableInfo::TPartitionStats()}));
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 4UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 2UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueType(), TCompactionQueueImpl::EActiveQueue::ByRowDeletes);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 2);
+
+ // remove1 from ByRowDeletes
+ UNIT_ASSERT(queue.Remove({TShardIdx(1, 6), TTableInfo::TPartitionStats()}));
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 3UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 1UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueType(), TCompactionQueueImpl::EActiveQueue::ByRowDeletes);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 1);
+
+ // remove2 from ByRowDeletes
+ UNIT_ASSERT(queue.Remove({TShardIdx(1, 7), TTableInfo::TPartitionStats()}));
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 2UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueType(), TCompactionQueueImpl::EActiveQueue::ByLastCompaction);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 2);
+
+ // remove1 from LastCompaction
+ UNIT_ASSERT(queue.Remove({TShardIdx(1, 2), TTableInfo::TPartitionStats()}));
UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 1UL);
UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueType(), TCompactionQueueImpl::EActiveQueue::ByLastCompaction);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 1);
+
+ // remove2 from LastCompaction
+ UNIT_ASSERT(queue.Remove({TShardIdx(1, 3), TTableInfo::TPartitionStats()}));
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueType(), TCompactionQueueImpl::EActiveQueue::ByLastCompaction);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 0);
+
+ // check case BySearchHeight -> ByLastCompaction, i.e. empty ByRowDeletes
+
+ shardInfos = {
+ // id, ts, sh, d
+ MakeCompactionInfo(1, 1, 0, 0),
+ MakeCompactionInfo(2, 2, 0, 0),
+ MakeCompactionInfo(3, 3, 0, 0),
+ MakeCompactionInfo(4, 4, 100, 0),
+ };
+
+ for(const auto& info: shardInfos) {
+ UNIT_ASSERT(queue.Enqueue(info));
+ }
+
+ // pop from LastCompaction, BySearchHeight not empty
+ queue.PopFront();
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 3UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 1UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueType(), TCompactionQueueImpl::EActiveQueue::BySearchHeight);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 1);
+
+ // remove from BySearchHeight
+ UNIT_ASSERT(queue.Remove({TShardIdx(1, 4), TTableInfo::TPartitionStats()}));
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 2UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueType(), TCompactionQueueImpl::EActiveQueue::ByLastCompaction);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 2);
+
+ // check case ByLastCompaction -> ByRowDeletes, i.e. BySearchHeight is empty
+
+ while (!queue.Empty())
+ queue.PopFront();
+
+ shardInfos = {
+ // id, ts, sh, d
+ MakeCompactionInfo(1, 1, 0, 0),
+ MakeCompactionInfo(2, 2, 0, 0),
+ MakeCompactionInfo(3, 3, 0, 0),
+ MakeCompactionInfo(4, 4, 0, 100),
+ };
+
+ for(const auto& info: shardInfos) {
+ UNIT_ASSERT(queue.Enqueue(info));
+ }
+
+ // pop from LastCompaction, BySearchHeight empty, ByRowDeletes is not empty
+ queue.PopFront();
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 3UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL);
UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 1UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueType(), TCompactionQueueImpl::EActiveQueue::ByRowDeletes);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 1);
}
- Y_UNIT_TEST(UpdateBelowRowDeletesThreshold) {
+ Y_UNIT_TEST(UpdateBelowThreshold) {
+ // check that last shard is removed (via low threshold and stats update) from BySearchHeight queue
+ // while ByRowDeletes queue is not empty, thus becomes active
TCompactionQueueImpl::TConfig config;
config.SearchHeightThreshold = 10;
config.RowDeletesThreshold = 10;
- TTableInfo::TPartitionStats stats;
- stats.RowCount = 10;
- stats.RowDeletes = 1000;
- stats.SearchHeight = 20;
- stats.PartCount = 100; // random number to not consider shard as empty
+ std::vector<TShardCompactionInfo> shardInfos = {
+ // id, ts, sh, d
+ MakeCompactionInfo(1, 1, 0, 0),
+ MakeCompactionInfo(2, 2, 0, 0),
+ MakeCompactionInfo(3, 3, 0, 0),
+ MakeCompactionInfo(4, 4, 100, 0),
+ MakeCompactionInfo(5, 5, 0, 100),
+ };
TCompactionQueueImpl queue(config);
- UNIT_ASSERT(queue.Enqueue({ShardIdx, stats}));
+ for(const auto& info: shardInfos) {
+ UNIT_ASSERT(queue.Enqueue(info));
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 5UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 1UL);
UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 1UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueType(), TCompactionQueueImpl::EActiveQueue::ByLastCompaction);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 5);
- stats.RowDeletes = 1;
- UNIT_ASSERT(queue.UpdateIfFound({ShardIdx, stats}));
- UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 1UL);
+ // change to BySearchHeight queue
+ queue.PopFront();
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 4UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 1UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 1UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueType(), TCompactionQueueImpl::EActiveQueue::BySearchHeight);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 1);
+
+ TTableInfo::TPartitionStats statsBelow;
+ statsBelow.RowDeletes = 100;
+ statsBelow.FullCompactionTs = 4;
+ statsBelow.SearchHeight = 1; // below threshold
+ statsBelow.RowDeletes = 1; // below threshold
+ statsBelow.PartCount = 100; // random number to not consider shard as empty
+ statsBelow.RowCount = 100; // random number to not consider shard as empty
+
+ // remove from BySearchHeight by updating stats (note that shard remains in LastCompaction queue)
+ UNIT_ASSERT(queue.UpdateIfFound({TShardIdx(1, 4), statsBelow}));
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 4UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 1UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueType(), TCompactionQueueImpl::EActiveQueue::ByRowDeletes);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 1);
+
+ // remove from BySearchHeight by updating stats (note that shard remains in LastCompaction queue)
+ statsBelow.FullCompactionTs = 5;
+ UNIT_ASSERT(queue.UpdateIfFound({TShardIdx(1, 5), statsBelow}));
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 4UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueType(), TCompactionQueueImpl::EActiveQueue::ByLastCompaction);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 4);
+
+ // Now check transition from BySearchHeight to LastCompaction, i.e. empty RowDeletes
+
+ // step1: populate w with item
+ TTableInfo::TPartitionStats statsSh;
+ statsSh.FullCompactionTs = 4;
+ statsSh.SearchHeight = 100; // above threshold
+ statsSh.RowDeletes = 1; // below threshold
+ statsSh.PartCount = 100; // random number to not consider shard as empty
+ statsSh.RowCount = 100; // random number to not consider shard as empty
+ UNIT_ASSERT(queue.UpdateIfFound({TShardIdx(1, 4), statsSh}));
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 4UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 1UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueType(), TCompactionQueueImpl::EActiveQueue::ByLastCompaction);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 4);
+
+ // step2: change to BySearchHeight queue
+ queue.PopFront();
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 3UL);
UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 1UL);
UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueType(), TCompactionQueueImpl::EActiveQueue::BySearchHeight);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 1);
+
+ // step3: BySearchHeight -> LastCompaction
+ UNIT_ASSERT(queue.UpdateIfFound({TShardIdx(1, 4), statsBelow}));
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 3UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueType(), TCompactionQueueImpl::EActiveQueue::ByLastCompaction);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 3);
+
+ // check ByLastCompaction -> ByRowDeletes, i.e. empty BySearchHeight
+
+ // step1: populate ByRowDeletes with item
+ TTableInfo::TPartitionStats statsDel;
+ statsDel.FullCompactionTs = 5;
+ statsDel.SearchHeight = 1; // below threshold
+ statsDel.RowDeletes = 100; // above threshold
+ statsDel.PartCount = 100; // random number to not consider shard as empty
+ statsDel.RowCount = 100; // random number to not consider shard as empty
+ UNIT_ASSERT(queue.UpdateIfFound({TShardIdx(1, 5), statsDel}));
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 3UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 1UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueType(), TCompactionQueueImpl::EActiveQueue::ByLastCompaction);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 3);
+
+ // step2: change to ByRowDeletes
+ queue.PopFront();
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 2UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 1UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueType(), TCompactionQueueImpl::EActiveQueue::ByRowDeletes);
+ UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 1);
}
Y_UNIT_TEST(UpdateWithEmptyShard) {
@@ -633,20 +868,12 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) {
config.SearchHeightThreshold = 100;
config.RowDeletesThreshold = 100;
- auto makeInfo = [](ui64 idx, ui64 ts) {
- TShardIdx shardId = TShardIdx(1, idx);
- TTableInfo::TPartitionStats stats;
- stats.FullCompactionTs = ts;
- stats.PartCount = 100; // random number to not consider shard as empty
- return TShardCompactionInfo(shardId, stats);
- };
-
std::vector<TShardCompactionInfo> shardInfos = {
- // id, ts
- makeInfo(1, 1),
- makeInfo(2, 2),
- makeInfo(3, 3),
- makeInfo(4, 4)
+ // id, ts
+ MakeCompactionInfo(1, 1),
+ MakeCompactionInfo(2, 2),
+ MakeCompactionInfo(3, 3),
+ MakeCompactionInfo(4, 4)
};
auto rng = std::default_random_engine {};
@@ -676,29 +903,19 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) {
config.SearchHeightThreshold = 10;
config.RowDeletesThreshold = 10;
- auto makeInfo = [](ui64 idx, ui64 ts, ui64 sh, ui64 d) {
- TShardIdx shardId = TShardIdx(1, idx);
- TTableInfo::TPartitionStats stats;
- stats.FullCompactionTs = ts;
- stats.SearchHeight = sh;
- stats.RowDeletes = d;
- stats.PartCount = 100; // random number to not consider shard as empty
- return TShardCompactionInfo(shardId, stats);
- };
-
std::vector<TShardCompactionInfo> shardInfos = {
- // id, ts, sh, d
- makeInfo(1, 1, 100, 100), // top in TS
- makeInfo(2, 3, 100, 50), // top in SH
- makeInfo(3, 4, 50, 100), // top in D
- makeInfo(4, 2, 0, 0), // 2 in TS
- makeInfo(5, 3, 90, 0), // 2 in SH
- makeInfo(6, 4, 0, 90), // 2 in D
- makeInfo(7, 3, 0, 0), // 3 in TS
- makeInfo(8, 5, 0, 80), // 3 in D
- makeInfo(9, 5, 0, 0), // 4 in TS, since this point only TS queue contains items
- makeInfo(10, 6, 0, 0), // 5 in TS
- makeInfo(11, 7, 0, 0), // 6 in TS
+ // id, ts, sh, d
+ MakeCompactionInfo(1, 1, 100, 100), // top in TS
+ MakeCompactionInfo(2, 3, 100, 50), // top in SH
+ MakeCompactionInfo(3, 4, 50, 100), // top in D
+ MakeCompactionInfo(4, 2, 0, 0), // 2 in TS
+ MakeCompactionInfo(5, 3, 90, 0), // 2 in SH
+ MakeCompactionInfo(6, 4, 0, 90), // 2 in D
+ MakeCompactionInfo(7, 3, 0, 0), // 3 in TS
+ MakeCompactionInfo(8, 5, 0, 80), // 3 in D
+ MakeCompactionInfo(9, 5, 0, 0), // 4 in TS, since this point only TS queue contains items
+ MakeCompactionInfo(10, 6, 0, 0), // 5 in TS
+ MakeCompactionInfo(11, 7, 0, 0), // 6 in TS
};
auto rng = std::default_random_engine {};