aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEvgeniy Ivanov <eivanov89@yandex-team.ru>2022-02-11 12:36:12 +0300
committerEvgeniy Ivanov <eivanov89@yandex-team.ru>2022-02-11 12:36:12 +0300
commit97d0a681d1ee69d18453ee1f85ac1efccaae6031 (patch)
tree47f475608377a8e83a5884a9ff7936f09f190c79
parent052532eb6a2c1801fbb52dd18f9381c4c545b5d7 (diff)
downloadydb-97d0a681d1ee69d18453ee1f85ac1efccaae6031.tar.gz
KIKIMR-9748: impl of background compaction by compaction ts and by deleted rows
ref:6d28c65d2d88772834f55a685f94a088467e5e40
-rw-r--r--ydb/core/protos/config.proto9
-rw-r--r--ydb/core/sys_view/ut_common.cpp5
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_compute.cpp1
-rw-r--r--ydb/core/tx/schemeshard/operation_queue_timer.h264
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__compaction.cpp7
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__table_stats.cpp11
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp18
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h7
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h1
-rw-r--r--ydb/core/tx/schemeshard/ut_compaction.cpp227
-rw-r--r--ydb/core/util/operation_queue.h27
-rw-r--r--ydb/core/util/operation_queue_priority_ut.cpp50
12 files changed, 578 insertions, 49 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 3eab6e1ac9c..2164818e8de 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1306,6 +1306,15 @@ message TCompactionConfig {
optional uint64 WakeupIntervalSeconds = 4 [default = 60];
optional uint64 MinCompactionRepeatDelay = 5 [default = 600];
optional uint32 SearchHeightThreshold = 6 [default = 10];
+
+ // TODO: enable, when schemeshard receive proper stat
+ optional uint32 RowDeletesThreshold = 7 [default = 4294967295];
+
+ // for tests: to allow compaction requests to empty shards
+ // shards below this threshold are not background compacted
+ // at all even when searchHeight or deleted rows match
+ // corresponding thresholds
+ optional uint32 RowCountThreshold = 8 [default = 1];
}
optional TBackgroundCompactionConfig BackgroundCompactionConfig = 1;
diff --git a/ydb/core/sys_view/ut_common.cpp b/ydb/core/sys_view/ut_common.cpp
index 6165258568c..525bc5787ae 100644
--- a/ydb/core/sys_view/ut_common.cpp
+++ b/ydb/core/sys_view/ut_common.cpp
@@ -41,6 +41,11 @@ TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, bool
Settings->SetDynamicNodeCount(dynamicNodes);
Settings->SetKqpSettings(kqpSettings);
+ // in some tests we check data size, which depends on compaction,
+ NKikimrConfig::TFeatureFlags featureFlags;
+ featureFlags.SetEnableBackgroundCompaction(false);
+ Settings->SetFeatureFlags(featureFlags);
+
if (enableSVP) {
Settings->SetEnablePersistentQueryStats(true);
Settings->SetEnableDbCounters(true);
diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
index edd6ca8e194..6e227a5c98e 100644
--- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
@@ -249,7 +249,6 @@ bool TKqpDatashardComputeContext::PinPages(const TVector<IEngineFlat::TValidated
columnTags.push_back(column.Column);
}
}
- Y_VERIFY_DEBUG(!columnTags.empty());
bool ready = Database->Precharge(localTid,
from,
diff --git a/ydb/core/tx/schemeshard/operation_queue_timer.h b/ydb/core/tx/schemeshard/operation_queue_timer.h
index 655fde7c18e..fe6483c9d89 100644
--- a/ydb/core/tx/schemeshard/operation_queue_timer.h
+++ b/ydb/core/tx/schemeshard/operation_queue_timer.h
@@ -1,12 +1,13 @@
#pragma once
#include "schemeshard_identificators.h"
+#include "schemeshard_info_types.h"
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/util/operation_queue.h>
-// TODO: this is a good candidate for core/util, but since
+// TODO: TOperationQueueWithTimer is a good candidate for core/util, but since
// it uses actorlib_impl, which depends on core/util, it
// can't be part of util. No other better place yet and since
// it is used in schemedard only then I put it here.
@@ -39,7 +40,15 @@ public:
typename TBase::IStarter& starter)
: TActorBase(&TThis::StateWork)
, TBase(config, starter, *this)
- { }
+ {}
+
+ template <typename TReadyQueueConfig>
+ TOperationQueueWithTimer(const typename TBase::TConfig& config,
+ const TReadyQueueConfig& queueConfig,
+ typename TBase::IStarter& starter)
+ : TActorBase(&TThis::StateWork)
+ , TBase(config, queueConfig, starter, *this)
+ {}
void Shutdown(const TActorContext &ctx) {
if (LongTimerId)
@@ -82,31 +91,33 @@ namespace NSchemeShard {
struct TShardCompactionInfo {
TShardIdx ShardIdx;
+
ui64 SearchHeight = 0;
+ ui64 LastFullCompactionTs = 0;
+ ui64 RowCount = 0;
+ ui64 RowDeletes = 0;
explicit TShardCompactionInfo(const TShardIdx& id)
: ShardIdx(id)
{}
- TShardCompactionInfo(const TShardIdx& id, ui64 searchHeight)
+ TShardCompactionInfo(const TShardIdx& id, const TTableInfo::TPartitionStats& stats)
: ShardIdx(id)
- , SearchHeight(searchHeight)
+ , SearchHeight(stats.SearchHeight)
+ , LastFullCompactionTs(stats.FullCompactionTs)
+ , RowCount(stats.RowCount)
+ , RowDeletes(stats.RowDeletes)
{}
TShardCompactionInfo(const TShardCompactionInfo&) = default;
+ TShardCompactionInfo& operator =(const TShardCompactionInfo& rhs) = default;
+
bool operator ==(const TShardCompactionInfo& rhs) const {
// note that only identity intentionally checked
return ShardIdx == rhs.ShardIdx;
}
- TShardCompactionInfo& operator =(const TShardCompactionInfo& rhs) {
- // TODO: assert that ID's are the same, because we
- // use it as update rather than real assignment
- SearchHeight = rhs.SearchHeight;
- return *this;
- }
-
size_t Hash() const {
return ShardIdx.Hash();
}
@@ -122,12 +133,241 @@ struct TShardCompactionInfo {
}
};
+ struct TLessByCompactionTs {
+ bool operator()(const TShardCompactionInfo& lhs, const TShardCompactionInfo& rhs) const {
+ // on top we have items with less TS, i.e. older ones
+ return lhs.LastFullCompactionTs < rhs.LastFullCompactionTs;
+ }
+ };
+
+ struct TLessByRowDeletes {
+ bool operator()(const TShardCompactionInfo& lhs, const TShardCompactionInfo& rhs) const {
+ // note ">" is intentional to have on top items with bigger number of deleted rows
+ return lhs.RowDeletes > rhs.RowDeletes;
+ }
+ };
+
TString ToString() const {
TStringStream ss;
- ss << "{" << ShardIdx << "," << SearchHeight << "}";
+ ss << "{" << ShardIdx
+ << ", SH# " << SearchHeight
+ << ", Rows# " << RowCount
+ << ", Deletes# " << RowDeletes
+ << ", Compaction# " << TInstant::Seconds(LastFullCompactionTs) << "}";
+ return ss.Str();
+ }
+};
+
+// The queue contains multiple queues inside:
+// * by last full compaction TS
+// * by search height
+// * by number of deleted rows
+// Queues are active in round robin manner. Same shard might
+// be in all queues.
+//
+// Note that in Enqueue we do some check and might skip
+// the shard being enqueued depending on config.
+//
+// When TOperationQueue::Update() calls TCompactionQueueImpl::UpdateIfFound(),
+// TCompactionQueueImpl might remove updated item depending on the config, but
+// TOperationQueue will not remove the item from running/waiting items, it will
+// be fully deleted only when TOperationQueue tries to TCompactionQueueImpl::Enqueue()
+// the item again.
+class TCompactionQueueImpl {
+public:
+ struct TConfig {
+ ui32 SearchHeightThreshold = 0;
+ ui32 RowDeletesThreshold = 0;
+ ui32 RowCountThreshold = 0;
+
+ TConfig() = default;
+ };
+
+private:
+ using TCompactionQueueLastCompaction = NOperationQueue::TQueueWithPriority<
+ TShardCompactionInfo,
+ TShardCompactionInfo::TLessByCompactionTs>;
+
+ using TCompactionQueueSearchHeight = NOperationQueue::TQueueWithPriority<
+ TShardCompactionInfo,
+ TShardCompactionInfo::TLessBySearchHeight>;
+
+ using TCompactionQueueRowDeletes = NOperationQueue::TQueueWithPriority<
+ TShardCompactionInfo,
+ TShardCompactionInfo::TLessByRowDeletes>;
+
+ // Enumeration defines round robin order
+ enum class EActiveQueue {
+ ByLastCompaction, // must be first
+ BySearchHeight,
+ ByRowDeletes, // must be last, see PopFront()
+ };
+
+private:
+ TConfig Config;
+
+ // all shards from other queues always go into this queue,
+ // i.e. if shard presents in any other queue it also presents here
+ TCompactionQueueLastCompaction QueueLastCompaction;
+
+ // note that it can be empty depending on stats and SearchHeightThreshold
+ TCompactionQueueSearchHeight QueueSearchHeight;
+
+ // note that it can be empty depending on stats and RowDeletesThreshold
+ TCompactionQueueRowDeletes QueueRowDeletes;
+
+ EActiveQueue ActiveQueue = EActiveQueue::ByLastCompaction;
+
+public:
+ TCompactionQueueImpl() = default;
+
+ TCompactionQueueImpl(const TConfig& config)
+ : Config(config)
+ {}
+
+ void UpdateConfig(const TConfig& config) {
+ if (&Config != &config)
+ Config = config;
+ }
+
+ bool Enqueue(const TShardCompactionInfo& info) {
+ if (info.RowCount < Config.RowCountThreshold) {
+ return false;
+ }
+
+ if (info.SearchHeight >= Config.SearchHeightThreshold)
+ QueueSearchHeight.Enqueue(info);
+
+ if (info.RowDeletes >= Config.RowDeletesThreshold) {
+ QueueRowDeletes.Enqueue(info);
+ }
+
+ return QueueLastCompaction.Enqueue(info);
+ }
+
+ bool Remove(const TShardCompactionInfo& info) {
+ QueueSearchHeight.Remove(info);
+ QueueRowDeletes.Remove(info);
+ return QueueLastCompaction.Remove(info);
+ }
+
+ bool UpdateIfFound(const TShardCompactionInfo& info) {
+ if (info.RowCount < Config.RowCountThreshold) {
+ return Remove(info);
+ }
+
+ if (info.SearchHeight >= Config.SearchHeightThreshold) {
+ QueueSearchHeight.UpdateIfFound(info);
+ } else {
+ QueueSearchHeight.Remove(info);
+ }
+
+ if (info.RowDeletes >= Config.RowDeletesThreshold) {
+ QueueRowDeletes.UpdateIfFound(info);
+ } else {
+ QueueRowDeletes.Remove(info);
+ }
+
+ return QueueLastCompaction.UpdateIfFound(info);
+ }
+
+ void Clear() {
+ QueueSearchHeight.Clear();
+ QueueRowDeletes.Clear();
+ QueueLastCompaction.Clear();
+ }
+
+ const TShardCompactionInfo& Front() const {
+ switch (ActiveQueue) {
+ case EActiveQueue::ByLastCompaction:
+ return QueueLastCompaction.Front();
+ case EActiveQueue::BySearchHeight:
+ return QueueSearchHeight.Front();
+ case EActiveQueue::ByRowDeletes:
+ return QueueRowDeletes.Front();
+ }
+ }
+
+ void PopFront() {
+ const auto& front = Front();
+ switch (ActiveQueue) {
+ case EActiveQueue::ByLastCompaction: {
+ QueueSearchHeight.Remove(front);
+ QueueRowDeletes.Remove(front);
+ QueueLastCompaction.PopFront();
+ break;
+ }
+ case EActiveQueue::BySearchHeight: {
+ QueueLastCompaction.Remove(front);
+ QueueRowDeletes.Remove(front);
+ QueueSearchHeight.PopFront();
+ break;
+ }
+ case EActiveQueue::ByRowDeletes: {
+ QueueLastCompaction.Remove(front);
+ QueueSearchHeight.Remove(front);
+ QueueRowDeletes.PopFront();
+ break;
+ }
+ }
+
+ switch (ActiveQueue) {
+ case EActiveQueue::ByLastCompaction:
+ if (!QueueSearchHeight.Empty()) {
+ ActiveQueue = EActiveQueue::BySearchHeight;
+ break;
+ }
+ [[fallthrough]];
+ case EActiveQueue::BySearchHeight:
+ if (!QueueRowDeletes.Empty()) {
+ ActiveQueue = EActiveQueue::ByRowDeletes;
+ break;
+ }
+ [[fallthrough]];
+ case EActiveQueue::ByRowDeletes:
+ ActiveQueue = EActiveQueue::ByLastCompaction;
+ }
+ }
+
+ bool Empty() const {
+ return QueueLastCompaction.Empty();
+ }
+
+ size_t Size() const {
+ return QueueLastCompaction.Size();
+ }
+
+ size_t SizeBySearchHeight() const {
+ return QueueSearchHeight.Size();
+ }
+
+ size_t SizeByRowDeletes() const {
+ return QueueRowDeletes.Size();
+ }
+
+ TString DumpQueueFronts() const {
+ TStringStream ss;
+ ss << "LastCompaction: {";
+ if (!QueueLastCompaction.Empty())
+ ss << QueueLastCompaction.Front();
+ ss << "}, SearchHeight: {";
+ if (!QueueSearchHeight.Empty())
+ ss << QueueSearchHeight.Front();
+ ss << "}, RowDeletes: {";
+ if (!QueueRowDeletes.Empty())
+ ss << QueueRowDeletes.Front();
+ ss << "}";
return ss.Str();
}
};
} // NSchemeShard
} // NKikimr
+
+template<>
+inline void Out<NKikimr::NSchemeShard::TShardCompactionInfo>(
+ IOutputStream& o,
+ const NKikimr::NSchemeShard::TShardCompactionInfo& info)
+{
+ o << info.ToString();
+}
diff --git a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp
index faad71935cb..db6d9e329c4 100644
--- a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp
@@ -46,8 +46,11 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T
record.GetPathId().GetLocalId());
// it's OK to OnDone InvalidShardIdx
- // note, that we set 0 search height to move shard to the end of queue
- auto duration = CompactionQueue->OnDone(TShardCompactionInfo(shardIdx, 0));
+ // move shard to the end of all queues
+ TInstant now = AppData(ctx)->TimeProvider->Now();
+ TTableInfo::TPartitionStats stats;
+ stats.FullCompactionTs = now.Seconds();
+ auto duration = CompactionQueue->OnDone(TShardCompactionInfo(shardIdx, stats));
if (shardIdx == InvalidShardIdx) {
LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Finished background compaction of unknown shard "
diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
index b2e7db88b79..6cb5704ad61 100644
--- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
@@ -175,6 +175,7 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte
newStats.WriteIops = GetIops(tabletMetrics.GetGroupWriteIops());
newStats.PartCount = tableStats.GetPartCount();
newStats.SearchHeight = tableStats.GetSearchHeight();
+ newStats.FullCompactionTs = tableStats.GetLastFullCompactionTs();
newStats.StartTime = TInstant::MilliSeconds(rec.GetStartTime());
for (ui64 tabletId : rec.GetUserTablePartOwners()) {
newStats.PartOwners.insert(TTabletId(tabletId));
@@ -191,13 +192,9 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte
table->UpdateShardStats(shardIdx, newStats);
if (Self->CompactionQueue) {
- TShardCompactionInfo compactionInfo(shardIdx, newStats.SearchHeight);
- if (newStats.SearchHeight >= Self->CompactionSearchHeightThreshold) {
- if (!Self->CompactionQueue->Update(compactionInfo))
- Self->CompactionQueue->Enqueue(std::move(compactionInfo));
- } else {
- Self->CompactionQueue->Remove(std::move(compactionInfo));
- }
+ TShardCompactionInfo compactionInfo(shardIdx, newStats);
+ if (!Self->CompactionQueue->Update(compactionInfo))
+ Self->CompactionQueue->Enqueue(std::move(compactionInfo));
}
NIceDb::TNiceDb db(txc.DB);
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 0715543a229..7176d01bd4c 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -5781,14 +5781,10 @@ void TSchemeShard::SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, T
if (!tableInfo->IsBackup) {
for (const auto& p: newPartitioning) {
- ui64 searchHeight = 0;
const auto& partitionStats = tableInfo->GetStats().PartitionStats;
auto it = partitionStats.find(p.ShardIdx);
- if (it != partitionStats.end())
- searchHeight = it->second.SearchHeight;
-
- if (searchHeight >= CompactionSearchHeightThreshold) {
- CompactionQueue->Enqueue(TShardCompactionInfo(p.ShardIdx, searchHeight));
+ if (it != partitionStats.end()) {
+ CompactionQueue->Enqueue(TShardCompactionInfo(p.ShardIdx, it->second));
}
}
}
@@ -5978,7 +5974,12 @@ void TSchemeShard::ConfigureCompactionQueue(
const NKikimrConfig::TCompactionConfig::TBackgroundCompactionConfig& config,
const TActorContext &ctx)
{
- CompactionSearchHeightThreshold = config.GetSearchHeightThreshold();
+ // note that we use TCompactionQueueImpl::TConfig
+ // instead of its base NOperationQueue::TConfig
+ TCompactionQueueImpl::TConfig queueConfig;
+ queueConfig.SearchHeightThreshold = config.GetSearchHeightThreshold();
+ queueConfig.RowDeletesThreshold = config.GetRowDeletesThreshold();
+ queueConfig.RowCountThreshold = config.GetRowCountThreshold();
TCompactionQueue::TConfig compactionConfig;
@@ -5998,10 +5999,11 @@ void TSchemeShard::ConfigureCompactionQueue(
<< ", Rate# " << compactionConfig.Rate);
if (CompactionQueue) {
- CompactionQueue->UpdateConfig(compactionConfig);
+ CompactionQueue->UpdateConfig(compactionConfig, queueConfig);
} else {
CompactionQueue = new TCompactionQueue(
compactionConfig,
+ queueConfig,
CompactionStarter);
}
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index e1a1d08b9a6..f41a5fbc9db 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -75,13 +75,9 @@ private:
TSchemeShard* Self;
};
- using TCompactionBackendQueue = NOperationQueue::TQueueWithPriority<
- TShardCompactionInfo,
- TShardCompactionInfo::TLessBySearchHeight>;
-
using TCompactionQueue = NOperationQueue::TOperationQueueWithTimer<
TShardCompactionInfo,
- TCompactionBackendQueue,
+ TCompactionQueueImpl,
TEvPrivate::EvRunBackgroundCompaction>;
class TCompactionStarter : public TCompactionQueue::IStarter {
@@ -199,7 +195,6 @@ public:
TCompactionQueue* CompactionQueue = nullptr;
bool EnableBackgroundCompaction = false;
bool EnableBackgroundCompactionServerless = false;
- ui32 CompactionSearchHeightThreshold = 0;
TShardDeleter ShardDeleter;
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index 2e5710c3d3e..fd0bfd5183e 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -242,6 +242,7 @@ struct TTableInfo : public TSimpleRefCount<TTableInfo> {
THashSet<TTabletId> PartOwners;
ui64 PartCount = 0;
ui64 SearchHeight = 0;
+ ui64 FullCompactionTs = 0;
ui32 ShardState = NKikimrTxDataShard::Unknown;
// True when PartOwners has parts from other tablets
diff --git a/ydb/core/tx/schemeshard/ut_compaction.cpp b/ydb/core/tx/schemeshard/ut_compaction.cpp
index 59f8de87094..f521a282028 100644
--- a/ydb/core/tx/schemeshard/ut_compaction.cpp
+++ b/ydb/core/tx/schemeshard/ut_compaction.cpp
@@ -1,8 +1,12 @@
+#include "operation_queue_timer.h"
+
#include <ydb/core/cms/console/console.h>
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
-#include <ydb/core/tx/schemeshard/schemeshard_billing_helpers.h>
#include <ydb/core/tx/datashard/datashard.h>
+#include <algorithm>
+#include <random>
+
using namespace NKikimr;
using namespace NSchemeShardUT_Private;
@@ -43,6 +47,7 @@ void SetFeatures(
// little hack to simplify life
auto* compactionConfig = request->Record.MutableConfig()->MutableCompactionConfig();
compactionConfig->MutableBackgroundCompactionConfig()->SetSearchHeightThreshold(0);
+ compactionConfig->MutableBackgroundCompactionConfig()->SetRowCountThreshold(0);
auto sender = runtime.AllocateEdgeActor();
@@ -76,6 +81,7 @@ void DisableBackgroundCompactionViaRestart(
// little hack to simplify life
auto& compactionConfig = runtime.GetAppData().CompactionConfig;
compactionConfig.MutableBackgroundCompactionConfig()->SetSearchHeightThreshold(0);
+ compactionConfig.MutableBackgroundCompactionConfig()->SetRowCountThreshold(0);
TActorId sender = runtime.AllocateEdgeActor();
RebootTablet(runtime, schemeShard, sender);
@@ -94,6 +100,7 @@ void EnableBackgroundCompactionViaRestart(
// little hack to simplify life
auto& compactionConfig = runtime.GetAppData().CompactionConfig;
compactionConfig.MutableBackgroundCompactionConfig()->SetSearchHeightThreshold(0);
+ compactionConfig.MutableBackgroundCompactionConfig()->SetRowCountThreshold(0);
TActorId sender = runtime.AllocateEdgeActor();
RebootTablet(runtime, schemeShard, sender);
@@ -425,4 +432,220 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) {
CheckNoCompactions(runtime, env, schemeshardId, "/MyRoot/User/Simple");
}
-}
+};
+
+namespace NKikimr::NSchemeShard {
+
+Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) {
+ constexpr TShardIdx ShardIdx = TShardIdx(11, 17);
+
+ Y_UNIT_TEST(EnqueuBelowSearchHeightThreshold) {
+ TCompactionQueueImpl::TConfig config;
+ config.SearchHeightThreshold = 10;
+ config.RowDeletesThreshold = 10;
+
+ TTableInfo::TPartitionStats stats;
+ stats.RowCount = 10;
+ stats.RowDeletes = 100;
+ stats.SearchHeight = 3;
+
+ TCompactionQueueImpl queue(config);
+ UNIT_ASSERT(queue.Enqueue({ShardIdx, stats}));
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 1UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 1UL);
+ }
+
+ Y_UNIT_TEST(EnqueueBelowRowDeletesThreshold) {
+ TCompactionQueueImpl::TConfig config;
+ config.SearchHeightThreshold = 10;
+ config.RowDeletesThreshold = 10;
+
+ TTableInfo::TPartitionStats stats;
+ stats.RowCount = 10;
+ stats.RowDeletes = 1;
+ stats.SearchHeight = 20;
+
+ TCompactionQueueImpl queue(config);
+ UNIT_ASSERT(queue.Enqueue({ShardIdx, stats}));
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 1UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 1UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL);
+ }
+
+ Y_UNIT_TEST(ShouldNotEnqueueEmptyShard) {
+ TCompactionQueueImpl::TConfig config;
+ config.SearchHeightThreshold = 10;
+ config.RowDeletesThreshold = 10;
+ config.RowCountThreshold = 1;
+
+ TTableInfo::TPartitionStats stats;
+ stats.RowCount = 0;
+ stats.RowDeletes = 1;
+ stats.SearchHeight = 20;
+
+ TCompactionQueueImpl queue(config);
+ UNIT_ASSERT(!queue.Enqueue({ShardIdx, stats}));
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL);
+ }
+
+ Y_UNIT_TEST(UpdateBelowSearchHeightThreshold) {
+ TCompactionQueueImpl::TConfig config;
+ config.SearchHeightThreshold = 10;
+ config.RowDeletesThreshold = 10;
+
+ TTableInfo::TPartitionStats stats;
+ stats.RowCount = 10;
+ stats.RowDeletes = 100;
+ stats.SearchHeight = 20;
+
+ TCompactionQueueImpl queue(config);
+ UNIT_ASSERT(queue.Enqueue({ShardIdx, stats}));
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 1UL);
+
+ stats.SearchHeight = 1;
+ UNIT_ASSERT(queue.UpdateIfFound({ShardIdx, stats}));
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 1UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 1UL);
+ }
+
+ Y_UNIT_TEST(UpdateBelowRowDeletesThreshold) {
+ TCompactionQueueImpl::TConfig config;
+ config.SearchHeightThreshold = 10;
+ config.RowDeletesThreshold = 10;
+
+ TTableInfo::TPartitionStats stats;
+ stats.RowCount = 10;
+ stats.RowDeletes = 1000;
+ stats.SearchHeight = 20;
+
+ TCompactionQueueImpl queue(config);
+ UNIT_ASSERT(queue.Enqueue({ShardIdx, stats}));
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 1UL);
+
+ stats.RowDeletes = 1;
+ UNIT_ASSERT(queue.UpdateIfFound({ShardIdx, stats}));
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 1UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 1UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL);
+ }
+
+ Y_UNIT_TEST(UpdateWithEmptyShard) {
+ TCompactionQueueImpl::TConfig config;
+ config.RowCountThreshold = 1;
+ config.SearchHeightThreshold = 10;
+ config.RowDeletesThreshold = 10;
+
+ TTableInfo::TPartitionStats stats;
+ stats.RowCount = 10;
+ stats.RowDeletes = 1000;
+ stats.SearchHeight = 20;
+
+ TCompactionQueueImpl queue(config);
+ UNIT_ASSERT(queue.Enqueue({ShardIdx, stats}));
+
+ stats.RowCount = 0;
+ UNIT_ASSERT(queue.UpdateIfFound({ShardIdx, stats}));
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL);
+ }
+
+ Y_UNIT_TEST(ShouldPopWhenOnlyLastCompactionQueue) {
+ TCompactionQueueImpl::TConfig config;
+ config.RowCountThreshold = 0;
+ config.SearchHeightThreshold = 100;
+ config.RowDeletesThreshold = 100;
+
+ auto makeInfo = [](ui64 idx, ui64 ts) {
+ TShardIdx shardId = TShardIdx(1, idx);
+ TTableInfo::TPartitionStats stats;
+ stats.FullCompactionTs = ts;
+ return TShardCompactionInfo(shardId, stats);
+ };
+
+ std::vector<TShardCompactionInfo> shardInfos = {
+ // id, ts
+ makeInfo(1, 1),
+ makeInfo(2, 2),
+ makeInfo(3, 3),
+ makeInfo(4, 4)
+ };
+
+ auto rng = std::default_random_engine {};
+ std::shuffle(shardInfos.begin(), shardInfos.end(), rng);
+
+ TCompactionQueueImpl queue(config);
+ for (const auto& info: shardInfos) {
+ UNIT_ASSERT(queue.Enqueue(info));
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 4UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL);
+
+ for (auto i: xrange(1ul, 5UL)) {
+ UNIT_ASSERT(!queue.Empty());
+ UNIT_ASSERT_VALUES_EQUAL(queue.Front().ShardIdx.GetLocalId().GetValue(), i);
+ queue.PopFront();
+ }
+
+ UNIT_ASSERT(queue.Empty());
+ }
+
+ Y_UNIT_TEST(CheckOrderWhenAllQueues) {
+ TCompactionQueueImpl::TConfig config;
+ config.RowCountThreshold = 0;
+ config.SearchHeightThreshold = 10;
+ config.RowDeletesThreshold = 10;
+
+ auto makeInfo = [](ui64 idx, ui64 ts, ui64 sh, ui64 d) {
+ TShardIdx shardId = TShardIdx(1, idx);
+ TTableInfo::TPartitionStats stats;
+ stats.FullCompactionTs = ts;
+ stats.SearchHeight = sh;
+ stats.RowDeletes = d;
+ return TShardCompactionInfo(shardId, stats);
+ };
+
+ std::vector<TShardCompactionInfo> shardInfos = {
+ // id, ts, sh, d
+ makeInfo(1, 1, 100, 100), // top in TS
+ makeInfo(2, 3, 100, 50), // top in SH
+ makeInfo(3, 4, 50, 100), // top in D
+ makeInfo(4, 2, 0, 0), // 2 in TS
+ makeInfo(5, 3, 90, 0), // 2 in SH
+ makeInfo(6, 4, 0, 90), // 2 in D
+ makeInfo(7, 3, 0, 0), // 3 in TS
+ makeInfo(8, 5, 0, 80), // 3 in D
+ makeInfo(9, 5, 0, 0), // 4 in TS, since this point only TS queue contains items
+ makeInfo(10, 6, 0, 0), // 5 in TS
+ makeInfo(11, 7, 0, 0), // 6 in TS
+ };
+
+ auto rng = std::default_random_engine {};
+ std::shuffle(shardInfos.begin(), shardInfos.end(), rng);
+
+ TCompactionQueueImpl queue(config);
+ for(const auto& info: shardInfos) {
+ UNIT_ASSERT(queue.Enqueue(info));
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), shardInfos.size());
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 4UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 5UL);
+
+ for (auto i: xrange(shardInfos.size())) {
+ UNIT_ASSERT(!queue.Empty());
+ UNIT_ASSERT_VALUES_EQUAL(queue.Front().ShardIdx.GetLocalId().GetValue(), i + 1);
+ queue.PopFront();
+ }
+
+ UNIT_ASSERT(queue.Empty());
+ }
+};
+
+} // NKikimr::NSchemeShard
diff --git a/ydb/core/util/operation_queue.h b/ydb/core/util/operation_queue.h
index bf70acc29be..59df79cef0c 100644
--- a/ydb/core/util/operation_queue.h
+++ b/ydb/core/util/operation_queue.h
@@ -113,6 +113,13 @@ private:
TIntrusiveHeap<THeapItem, typename THeapItem::THeapIndex, typename THeapItem::THeapItemCompare> Heap;
public:
+ TQueueWithPriority() = default;
+
+ bool Contains(const T& item) const {
+ THeapItem heapItem(item);
+ return Items.find(heapItem) != Items.end();
+ }
+
template<typename T2>
bool Enqueue(T2&& item) {
THeapItem heapItem(std::forward<T2>(item));
@@ -262,7 +269,25 @@ public:
, Starter(starter)
, Timer(timer)
{
- UpdateConfig(Config);
+ UpdateConfig(config);
+ }
+
+ template <typename TReadyQueueConfig>
+ TOperationQueue(const TConfig& config,
+ const TReadyQueueConfig& queueConfig,
+ IStarter& starter,
+ ITimer& timer)
+ : Config(config)
+ , Starter(starter)
+ , Timer(timer)
+ {
+ UpdateConfig(config, queueConfig);
+ }
+
+ template <typename TReadyQueueConfig>
+ void UpdateConfig(const TConfig& config, const TReadyQueueConfig& queueConfig) {
+ UpdateConfig(config);
+ ReadyQueue.UpdateConfig(queueConfig);
}
void UpdateConfig(const TConfig& config) {
diff --git a/ydb/core/util/operation_queue_priority_ut.cpp b/ydb/core/util/operation_queue_priority_ut.cpp
index 1639ef8c9d7..6912e0c8c81 100644
--- a/ydb/core/util/operation_queue_priority_ut.cpp
+++ b/ydb/core/util/operation_queue_priority_ut.cpp
@@ -15,11 +15,14 @@ struct TPriorityItem {
int Id = 0;
int Priority = 0;
+ explicit TPriorityItem(int id)
+ : Id(id)
+ {}
+
TPriorityItem(int id, int priority)
: Id(id)
, Priority(priority)
- {
- }
+ {}
TPriorityItem(const TPriorityItem&) = default;
@@ -28,13 +31,7 @@ struct TPriorityItem {
return Id == rhs.Id;
}
- TPriorityItem& operator =(const TPriorityItem& rhs) {
- // TODO: assert that ID's are the same, because we
- // use it as update rather than real assignment
- Id = rhs.Id;
- Priority = rhs.Priority;
- return *this;
- }
+ TPriorityItem& operator =(const TPriorityItem& rhs) = default;
size_t Hash() const {
return THash<int>()(Id);
@@ -114,6 +111,36 @@ struct TOperationStarter : public TPriorityQueue::IStarter, public NOperationQue
} // namespace
+Y_UNIT_TEST_SUITE(TPriorityQueueTest) {
+ Y_UNIT_TEST(TestOrder) {
+ TQueueWithPriority<TPriorityItem, TPriorityItem::TLessByPriority> queue;
+ queue.Enqueue(TPriorityItem(2, 50));
+ queue.Enqueue(TPriorityItem(3, 100));
+ queue.Enqueue(TPriorityItem(6, 90));
+ queue.Enqueue(TPriorityItem(8, 80));
+ queue.Enqueue(TPriorityItem(1, 100));
+
+ UNIT_ASSERT(queue.Remove(TPriorityItem(1)));
+ UNIT_ASSERT(queue.Remove(TPriorityItem(2)));
+
+ UNIT_ASSERT_VALUES_EQUAL(queue.Front(), TPriorityItem(3, 100));
+ queue.PopFront();
+
+ UNIT_ASSERT(!queue.Remove(TPriorityItem(4)));
+ UNIT_ASSERT(!queue.Remove(TPriorityItem(5)));
+
+ UNIT_ASSERT_VALUES_EQUAL(queue.Front(), TPriorityItem(6, 90));
+ queue.PopFront();
+
+ UNIT_ASSERT(!queue.Remove(TPriorityItem(7)));
+
+ UNIT_ASSERT_VALUES_EQUAL(queue.Front(), TPriorityItem(8, 80));
+ queue.PopFront();
+
+ UNIT_ASSERT(queue.Empty());
+ }
+};
+
Y_UNIT_TEST_SUITE(TPriorityOperationQueueTest) {
Y_UNIT_TEST(ShouldStartEmpty) {
TPriorityQueue::TConfig config;
@@ -335,6 +362,9 @@ void Out<TSomeQueue::TItemWithTs>(IOutputStream& o, const TSomeQueue::TItemWithT
}
template<>
-void Out<NKikimr::NOperationQueue::TPriorityItem>(IOutputStream& o, const NKikimr::NOperationQueue::TPriorityItem& item) {
+void Out<NKikimr::NOperationQueue::TPriorityItem>(
+ IOutputStream& o,
+ const NKikimr::NOperationQueue::TPriorityItem& item)
+{
o << item.ToString();
}