diff options
author | Evgeniy Ivanov <eivanov89@yandex-team.ru> | 2022-02-11 12:36:12 +0300 |
---|---|---|
committer | Evgeniy Ivanov <eivanov89@yandex-team.ru> | 2022-02-11 12:36:12 +0300 |
commit | 97d0a681d1ee69d18453ee1f85ac1efccaae6031 (patch) | |
tree | 47f475608377a8e83a5884a9ff7936f09f190c79 | |
parent | 052532eb6a2c1801fbb52dd18f9381c4c545b5d7 (diff) | |
download | ydb-97d0a681d1ee69d18453ee1f85ac1efccaae6031.tar.gz |
KIKIMR-9748: impl of background compaction by compaction ts and by deleted rows
ref:6d28c65d2d88772834f55a685f94a088467e5e40
-rw-r--r-- | ydb/core/protos/config.proto | 9 | ||||
-rw-r--r-- | ydb/core/sys_view/ut_common.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp_compute.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/operation_queue_timer.h | 264 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__compaction.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__table_stats.cpp | 11 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.cpp | 18 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.h | 7 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_info_types.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_compaction.cpp | 227 | ||||
-rw-r--r-- | ydb/core/util/operation_queue.h | 27 | ||||
-rw-r--r-- | ydb/core/util/operation_queue_priority_ut.cpp | 50 |
12 files changed, 578 insertions, 49 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 3eab6e1ac9c..2164818e8de 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1306,6 +1306,15 @@ message TCompactionConfig { optional uint64 WakeupIntervalSeconds = 4 [default = 60]; optional uint64 MinCompactionRepeatDelay = 5 [default = 600]; optional uint32 SearchHeightThreshold = 6 [default = 10]; + + // TODO: enable, when schemeshard receive proper stat + optional uint32 RowDeletesThreshold = 7 [default = 4294967295]; + + // for tests: to allow compaction requests to empty shards + // shards below this threshold are not background compacted + // at all even when searchHeight or deleted rows match + // corresponding thresholds + optional uint32 RowCountThreshold = 8 [default = 1]; } optional TBackgroundCompactionConfig BackgroundCompactionConfig = 1; diff --git a/ydb/core/sys_view/ut_common.cpp b/ydb/core/sys_view/ut_common.cpp index 6165258568c..525bc5787ae 100644 --- a/ydb/core/sys_view/ut_common.cpp +++ b/ydb/core/sys_view/ut_common.cpp @@ -41,6 +41,11 @@ TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, bool Settings->SetDynamicNodeCount(dynamicNodes); Settings->SetKqpSettings(kqpSettings); + // in some tests we check data size, which depends on compaction, + NKikimrConfig::TFeatureFlags featureFlags; + featureFlags.SetEnableBackgroundCompaction(false); + Settings->SetFeatureFlags(featureFlags); + if (enableSVP) { Settings->SetEnablePersistentQueryStats(true); Settings->SetEnableDbCounters(true); diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp index edd6ca8e194..6e227a5c98e 100644 --- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp @@ -249,7 +249,6 @@ bool TKqpDatashardComputeContext::PinPages(const TVector<IEngineFlat::TValidated columnTags.push_back(column.Column); } } - Y_VERIFY_DEBUG(!columnTags.empty()); bool ready = Database->Precharge(localTid, from, diff --git a/ydb/core/tx/schemeshard/operation_queue_timer.h b/ydb/core/tx/schemeshard/operation_queue_timer.h index 655fde7c18e..fe6483c9d89 100644 --- a/ydb/core/tx/schemeshard/operation_queue_timer.h +++ b/ydb/core/tx/schemeshard/operation_queue_timer.h @@ -1,12 +1,13 @@ #pragma once #include "schemeshard_identificators.h" +#include "schemeshard_info_types.h" #include <ydb/core/actorlib_impl/long_timer.h> #include <ydb/core/base/appdata.h> #include <ydb/core/util/operation_queue.h> -// TODO: this is a good candidate for core/util, but since +// TODO: TOperationQueueWithTimer is a good candidate for core/util, but since // it uses actorlib_impl, which depends on core/util, it // can't be part of util. No other better place yet and since // it is used in schemedard only then I put it here. @@ -39,7 +40,15 @@ public: typename TBase::IStarter& starter) : TActorBase(&TThis::StateWork) , TBase(config, starter, *this) - { } + {} + + template <typename TReadyQueueConfig> + TOperationQueueWithTimer(const typename TBase::TConfig& config, + const TReadyQueueConfig& queueConfig, + typename TBase::IStarter& starter) + : TActorBase(&TThis::StateWork) + , TBase(config, queueConfig, starter, *this) + {} void Shutdown(const TActorContext &ctx) { if (LongTimerId) @@ -82,31 +91,33 @@ namespace NSchemeShard { struct TShardCompactionInfo { TShardIdx ShardIdx; + ui64 SearchHeight = 0; + ui64 LastFullCompactionTs = 0; + ui64 RowCount = 0; + ui64 RowDeletes = 0; explicit TShardCompactionInfo(const TShardIdx& id) : ShardIdx(id) {} - TShardCompactionInfo(const TShardIdx& id, ui64 searchHeight) + TShardCompactionInfo(const TShardIdx& id, const TTableInfo::TPartitionStats& stats) : ShardIdx(id) - , SearchHeight(searchHeight) + , SearchHeight(stats.SearchHeight) + , LastFullCompactionTs(stats.FullCompactionTs) + , RowCount(stats.RowCount) + , RowDeletes(stats.RowDeletes) {} TShardCompactionInfo(const TShardCompactionInfo&) = default; + TShardCompactionInfo& operator =(const TShardCompactionInfo& rhs) = default; + bool operator ==(const TShardCompactionInfo& rhs) const { // note that only identity intentionally checked return ShardIdx == rhs.ShardIdx; } - TShardCompactionInfo& operator =(const TShardCompactionInfo& rhs) { - // TODO: assert that ID's are the same, because we - // use it as update rather than real assignment - SearchHeight = rhs.SearchHeight; - return *this; - } - size_t Hash() const { return ShardIdx.Hash(); } @@ -122,12 +133,241 @@ struct TShardCompactionInfo { } }; + struct TLessByCompactionTs { + bool operator()(const TShardCompactionInfo& lhs, const TShardCompactionInfo& rhs) const { + // on top we have items with less TS, i.e. older ones + return lhs.LastFullCompactionTs < rhs.LastFullCompactionTs; + } + }; + + struct TLessByRowDeletes { + bool operator()(const TShardCompactionInfo& lhs, const TShardCompactionInfo& rhs) const { + // note ">" is intentional to have on top items with bigger number of deleted rows + return lhs.RowDeletes > rhs.RowDeletes; + } + }; + TString ToString() const { TStringStream ss; - ss << "{" << ShardIdx << "," << SearchHeight << "}"; + ss << "{" << ShardIdx + << ", SH# " << SearchHeight + << ", Rows# " << RowCount + << ", Deletes# " << RowDeletes + << ", Compaction# " << TInstant::Seconds(LastFullCompactionTs) << "}"; + return ss.Str(); + } +}; + +// The queue contains multiple queues inside: +// * by last full compaction TS +// * by search height +// * by number of deleted rows +// Queues are active in round robin manner. Same shard might +// be in all queues. +// +// Note that in Enqueue we do some check and might skip +// the shard being enqueued depending on config. +// +// When TOperationQueue::Update() calls TCompactionQueueImpl::UpdateIfFound(), +// TCompactionQueueImpl might remove updated item depending on the config, but +// TOperationQueue will not remove the item from running/waiting items, it will +// be fully deleted only when TOperationQueue tries to TCompactionQueueImpl::Enqueue() +// the item again. +class TCompactionQueueImpl { +public: + struct TConfig { + ui32 SearchHeightThreshold = 0; + ui32 RowDeletesThreshold = 0; + ui32 RowCountThreshold = 0; + + TConfig() = default; + }; + +private: + using TCompactionQueueLastCompaction = NOperationQueue::TQueueWithPriority< + TShardCompactionInfo, + TShardCompactionInfo::TLessByCompactionTs>; + + using TCompactionQueueSearchHeight = NOperationQueue::TQueueWithPriority< + TShardCompactionInfo, + TShardCompactionInfo::TLessBySearchHeight>; + + using TCompactionQueueRowDeletes = NOperationQueue::TQueueWithPriority< + TShardCompactionInfo, + TShardCompactionInfo::TLessByRowDeletes>; + + // Enumeration defines round robin order + enum class EActiveQueue { + ByLastCompaction, // must be first + BySearchHeight, + ByRowDeletes, // must be last, see PopFront() + }; + +private: + TConfig Config; + + // all shards from other queues always go into this queue, + // i.e. if shard presents in any other queue it also presents here + TCompactionQueueLastCompaction QueueLastCompaction; + + // note that it can be empty depending on stats and SearchHeightThreshold + TCompactionQueueSearchHeight QueueSearchHeight; + + // note that it can be empty depending on stats and RowDeletesThreshold + TCompactionQueueRowDeletes QueueRowDeletes; + + EActiveQueue ActiveQueue = EActiveQueue::ByLastCompaction; + +public: + TCompactionQueueImpl() = default; + + TCompactionQueueImpl(const TConfig& config) + : Config(config) + {} + + void UpdateConfig(const TConfig& config) { + if (&Config != &config) + Config = config; + } + + bool Enqueue(const TShardCompactionInfo& info) { + if (info.RowCount < Config.RowCountThreshold) { + return false; + } + + if (info.SearchHeight >= Config.SearchHeightThreshold) + QueueSearchHeight.Enqueue(info); + + if (info.RowDeletes >= Config.RowDeletesThreshold) { + QueueRowDeletes.Enqueue(info); + } + + return QueueLastCompaction.Enqueue(info); + } + + bool Remove(const TShardCompactionInfo& info) { + QueueSearchHeight.Remove(info); + QueueRowDeletes.Remove(info); + return QueueLastCompaction.Remove(info); + } + + bool UpdateIfFound(const TShardCompactionInfo& info) { + if (info.RowCount < Config.RowCountThreshold) { + return Remove(info); + } + + if (info.SearchHeight >= Config.SearchHeightThreshold) { + QueueSearchHeight.UpdateIfFound(info); + } else { + QueueSearchHeight.Remove(info); + } + + if (info.RowDeletes >= Config.RowDeletesThreshold) { + QueueRowDeletes.UpdateIfFound(info); + } else { + QueueRowDeletes.Remove(info); + } + + return QueueLastCompaction.UpdateIfFound(info); + } + + void Clear() { + QueueSearchHeight.Clear(); + QueueRowDeletes.Clear(); + QueueLastCompaction.Clear(); + } + + const TShardCompactionInfo& Front() const { + switch (ActiveQueue) { + case EActiveQueue::ByLastCompaction: + return QueueLastCompaction.Front(); + case EActiveQueue::BySearchHeight: + return QueueSearchHeight.Front(); + case EActiveQueue::ByRowDeletes: + return QueueRowDeletes.Front(); + } + } + + void PopFront() { + const auto& front = Front(); + switch (ActiveQueue) { + case EActiveQueue::ByLastCompaction: { + QueueSearchHeight.Remove(front); + QueueRowDeletes.Remove(front); + QueueLastCompaction.PopFront(); + break; + } + case EActiveQueue::BySearchHeight: { + QueueLastCompaction.Remove(front); + QueueRowDeletes.Remove(front); + QueueSearchHeight.PopFront(); + break; + } + case EActiveQueue::ByRowDeletes: { + QueueLastCompaction.Remove(front); + QueueSearchHeight.Remove(front); + QueueRowDeletes.PopFront(); + break; + } + } + + switch (ActiveQueue) { + case EActiveQueue::ByLastCompaction: + if (!QueueSearchHeight.Empty()) { + ActiveQueue = EActiveQueue::BySearchHeight; + break; + } + [[fallthrough]]; + case EActiveQueue::BySearchHeight: + if (!QueueRowDeletes.Empty()) { + ActiveQueue = EActiveQueue::ByRowDeletes; + break; + } + [[fallthrough]]; + case EActiveQueue::ByRowDeletes: + ActiveQueue = EActiveQueue::ByLastCompaction; + } + } + + bool Empty() const { + return QueueLastCompaction.Empty(); + } + + size_t Size() const { + return QueueLastCompaction.Size(); + } + + size_t SizeBySearchHeight() const { + return QueueSearchHeight.Size(); + } + + size_t SizeByRowDeletes() const { + return QueueRowDeletes.Size(); + } + + TString DumpQueueFronts() const { + TStringStream ss; + ss << "LastCompaction: {"; + if (!QueueLastCompaction.Empty()) + ss << QueueLastCompaction.Front(); + ss << "}, SearchHeight: {"; + if (!QueueSearchHeight.Empty()) + ss << QueueSearchHeight.Front(); + ss << "}, RowDeletes: {"; + if (!QueueRowDeletes.Empty()) + ss << QueueRowDeletes.Front(); + ss << "}"; return ss.Str(); } }; } // NSchemeShard } // NKikimr + +template<> +inline void Out<NKikimr::NSchemeShard::TShardCompactionInfo>( + IOutputStream& o, + const NKikimr::NSchemeShard::TShardCompactionInfo& info) +{ + o << info.ToString(); +} diff --git a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp index faad71935cb..db6d9e329c4 100644 --- a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp @@ -46,8 +46,11 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T record.GetPathId().GetLocalId()); // it's OK to OnDone InvalidShardIdx - // note, that we set 0 search height to move shard to the end of queue - auto duration = CompactionQueue->OnDone(TShardCompactionInfo(shardIdx, 0)); + // move shard to the end of all queues + TInstant now = AppData(ctx)->TimeProvider->Now(); + TTableInfo::TPartitionStats stats; + stats.FullCompactionTs = now.Seconds(); + auto duration = CompactionQueue->OnDone(TShardCompactionInfo(shardIdx, stats)); if (shardIdx == InvalidShardIdx) { LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Finished background compaction of unknown shard " diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp index b2e7db88b79..6cb5704ad61 100644 --- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp @@ -175,6 +175,7 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte newStats.WriteIops = GetIops(tabletMetrics.GetGroupWriteIops()); newStats.PartCount = tableStats.GetPartCount(); newStats.SearchHeight = tableStats.GetSearchHeight(); + newStats.FullCompactionTs = tableStats.GetLastFullCompactionTs(); newStats.StartTime = TInstant::MilliSeconds(rec.GetStartTime()); for (ui64 tabletId : rec.GetUserTablePartOwners()) { newStats.PartOwners.insert(TTabletId(tabletId)); @@ -191,13 +192,9 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte table->UpdateShardStats(shardIdx, newStats); if (Self->CompactionQueue) { - TShardCompactionInfo compactionInfo(shardIdx, newStats.SearchHeight); - if (newStats.SearchHeight >= Self->CompactionSearchHeightThreshold) { - if (!Self->CompactionQueue->Update(compactionInfo)) - Self->CompactionQueue->Enqueue(std::move(compactionInfo)); - } else { - Self->CompactionQueue->Remove(std::move(compactionInfo)); - } + TShardCompactionInfo compactionInfo(shardIdx, newStats); + if (!Self->CompactionQueue->Update(compactionInfo)) + Self->CompactionQueue->Enqueue(std::move(compactionInfo)); } NIceDb::TNiceDb db(txc.DB); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 0715543a229..7176d01bd4c 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -5781,14 +5781,10 @@ void TSchemeShard::SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, T if (!tableInfo->IsBackup) { for (const auto& p: newPartitioning) { - ui64 searchHeight = 0; const auto& partitionStats = tableInfo->GetStats().PartitionStats; auto it = partitionStats.find(p.ShardIdx); - if (it != partitionStats.end()) - searchHeight = it->second.SearchHeight; - - if (searchHeight >= CompactionSearchHeightThreshold) { - CompactionQueue->Enqueue(TShardCompactionInfo(p.ShardIdx, searchHeight)); + if (it != partitionStats.end()) { + CompactionQueue->Enqueue(TShardCompactionInfo(p.ShardIdx, it->second)); } } } @@ -5978,7 +5974,12 @@ void TSchemeShard::ConfigureCompactionQueue( const NKikimrConfig::TCompactionConfig::TBackgroundCompactionConfig& config, const TActorContext &ctx) { - CompactionSearchHeightThreshold = config.GetSearchHeightThreshold(); + // note that we use TCompactionQueueImpl::TConfig + // instead of its base NOperationQueue::TConfig + TCompactionQueueImpl::TConfig queueConfig; + queueConfig.SearchHeightThreshold = config.GetSearchHeightThreshold(); + queueConfig.RowDeletesThreshold = config.GetRowDeletesThreshold(); + queueConfig.RowCountThreshold = config.GetRowCountThreshold(); TCompactionQueue::TConfig compactionConfig; @@ -5998,10 +5999,11 @@ void TSchemeShard::ConfigureCompactionQueue( << ", Rate# " << compactionConfig.Rate); if (CompactionQueue) { - CompactionQueue->UpdateConfig(compactionConfig); + CompactionQueue->UpdateConfig(compactionConfig, queueConfig); } else { CompactionQueue = new TCompactionQueue( compactionConfig, + queueConfig, CompactionStarter); } } diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index e1a1d08b9a6..f41a5fbc9db 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -75,13 +75,9 @@ private: TSchemeShard* Self; }; - using TCompactionBackendQueue = NOperationQueue::TQueueWithPriority< - TShardCompactionInfo, - TShardCompactionInfo::TLessBySearchHeight>; - using TCompactionQueue = NOperationQueue::TOperationQueueWithTimer< TShardCompactionInfo, - TCompactionBackendQueue, + TCompactionQueueImpl, TEvPrivate::EvRunBackgroundCompaction>; class TCompactionStarter : public TCompactionQueue::IStarter { @@ -199,7 +195,6 @@ public: TCompactionQueue* CompactionQueue = nullptr; bool EnableBackgroundCompaction = false; bool EnableBackgroundCompactionServerless = false; - ui32 CompactionSearchHeightThreshold = 0; TShardDeleter ShardDeleter; diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 2e5710c3d3e..fd0bfd5183e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -242,6 +242,7 @@ struct TTableInfo : public TSimpleRefCount<TTableInfo> { THashSet<TTabletId> PartOwners; ui64 PartCount = 0; ui64 SearchHeight = 0; + ui64 FullCompactionTs = 0; ui32 ShardState = NKikimrTxDataShard::Unknown; // True when PartOwners has parts from other tablets diff --git a/ydb/core/tx/schemeshard/ut_compaction.cpp b/ydb/core/tx/schemeshard/ut_compaction.cpp index 59f8de87094..f521a282028 100644 --- a/ydb/core/tx/schemeshard/ut_compaction.cpp +++ b/ydb/core/tx/schemeshard/ut_compaction.cpp @@ -1,8 +1,12 @@ +#include "operation_queue_timer.h" + #include <ydb/core/cms/console/console.h> #include <ydb/core/tx/schemeshard/ut_helpers/helpers.h> -#include <ydb/core/tx/schemeshard/schemeshard_billing_helpers.h> #include <ydb/core/tx/datashard/datashard.h> +#include <algorithm> +#include <random> + using namespace NKikimr; using namespace NSchemeShardUT_Private; @@ -43,6 +47,7 @@ void SetFeatures( // little hack to simplify life auto* compactionConfig = request->Record.MutableConfig()->MutableCompactionConfig(); compactionConfig->MutableBackgroundCompactionConfig()->SetSearchHeightThreshold(0); + compactionConfig->MutableBackgroundCompactionConfig()->SetRowCountThreshold(0); auto sender = runtime.AllocateEdgeActor(); @@ -76,6 +81,7 @@ void DisableBackgroundCompactionViaRestart( // little hack to simplify life auto& compactionConfig = runtime.GetAppData().CompactionConfig; compactionConfig.MutableBackgroundCompactionConfig()->SetSearchHeightThreshold(0); + compactionConfig.MutableBackgroundCompactionConfig()->SetRowCountThreshold(0); TActorId sender = runtime.AllocateEdgeActor(); RebootTablet(runtime, schemeShard, sender); @@ -94,6 +100,7 @@ void EnableBackgroundCompactionViaRestart( // little hack to simplify life auto& compactionConfig = runtime.GetAppData().CompactionConfig; compactionConfig.MutableBackgroundCompactionConfig()->SetSearchHeightThreshold(0); + compactionConfig.MutableBackgroundCompactionConfig()->SetRowCountThreshold(0); TActorId sender = runtime.AllocateEdgeActor(); RebootTablet(runtime, schemeShard, sender); @@ -425,4 +432,220 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) { CheckNoCompactions(runtime, env, schemeshardId, "/MyRoot/User/Simple"); } -} +}; + +namespace NKikimr::NSchemeShard { + +Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) { + constexpr TShardIdx ShardIdx = TShardIdx(11, 17); + + Y_UNIT_TEST(EnqueuBelowSearchHeightThreshold) { + TCompactionQueueImpl::TConfig config; + config.SearchHeightThreshold = 10; + config.RowDeletesThreshold = 10; + + TTableInfo::TPartitionStats stats; + stats.RowCount = 10; + stats.RowDeletes = 100; + stats.SearchHeight = 3; + + TCompactionQueueImpl queue(config); + UNIT_ASSERT(queue.Enqueue({ShardIdx, stats})); + UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 1UL); + UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL); + UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 1UL); + } + + Y_UNIT_TEST(EnqueueBelowRowDeletesThreshold) { + TCompactionQueueImpl::TConfig config; + config.SearchHeightThreshold = 10; + config.RowDeletesThreshold = 10; + + TTableInfo::TPartitionStats stats; + stats.RowCount = 10; + stats.RowDeletes = 1; + stats.SearchHeight = 20; + + TCompactionQueueImpl queue(config); + UNIT_ASSERT(queue.Enqueue({ShardIdx, stats})); + UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 1UL); + UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 1UL); + UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL); + } + + Y_UNIT_TEST(ShouldNotEnqueueEmptyShard) { + TCompactionQueueImpl::TConfig config; + config.SearchHeightThreshold = 10; + config.RowDeletesThreshold = 10; + config.RowCountThreshold = 1; + + TTableInfo::TPartitionStats stats; + stats.RowCount = 0; + stats.RowDeletes = 1; + stats.SearchHeight = 20; + + TCompactionQueueImpl queue(config); + UNIT_ASSERT(!queue.Enqueue({ShardIdx, stats})); + UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 0UL); + UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL); + UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL); + } + + Y_UNIT_TEST(UpdateBelowSearchHeightThreshold) { + TCompactionQueueImpl::TConfig config; + config.SearchHeightThreshold = 10; + config.RowDeletesThreshold = 10; + + TTableInfo::TPartitionStats stats; + stats.RowCount = 10; + stats.RowDeletes = 100; + stats.SearchHeight = 20; + + TCompactionQueueImpl queue(config); + UNIT_ASSERT(queue.Enqueue({ShardIdx, stats})); + UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 1UL); + + stats.SearchHeight = 1; + UNIT_ASSERT(queue.UpdateIfFound({ShardIdx, stats})); + UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 1UL); + UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL); + UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 1UL); + } + + Y_UNIT_TEST(UpdateBelowRowDeletesThreshold) { + TCompactionQueueImpl::TConfig config; + config.SearchHeightThreshold = 10; + config.RowDeletesThreshold = 10; + + TTableInfo::TPartitionStats stats; + stats.RowCount = 10; + stats.RowDeletes = 1000; + stats.SearchHeight = 20; + + TCompactionQueueImpl queue(config); + UNIT_ASSERT(queue.Enqueue({ShardIdx, stats})); + UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 1UL); + + stats.RowDeletes = 1; + UNIT_ASSERT(queue.UpdateIfFound({ShardIdx, stats})); + UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 1UL); + UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 1UL); + UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL); + } + + Y_UNIT_TEST(UpdateWithEmptyShard) { + TCompactionQueueImpl::TConfig config; + config.RowCountThreshold = 1; + config.SearchHeightThreshold = 10; + config.RowDeletesThreshold = 10; + + TTableInfo::TPartitionStats stats; + stats.RowCount = 10; + stats.RowDeletes = 1000; + stats.SearchHeight = 20; + + TCompactionQueueImpl queue(config); + UNIT_ASSERT(queue.Enqueue({ShardIdx, stats})); + + stats.RowCount = 0; + UNIT_ASSERT(queue.UpdateIfFound({ShardIdx, stats})); + UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 0UL); + UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL); + UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL); + } + + Y_UNIT_TEST(ShouldPopWhenOnlyLastCompactionQueue) { + TCompactionQueueImpl::TConfig config; + config.RowCountThreshold = 0; + config.SearchHeightThreshold = 100; + config.RowDeletesThreshold = 100; + + auto makeInfo = [](ui64 idx, ui64 ts) { + TShardIdx shardId = TShardIdx(1, idx); + TTableInfo::TPartitionStats stats; + stats.FullCompactionTs = ts; + return TShardCompactionInfo(shardId, stats); + }; + + std::vector<TShardCompactionInfo> shardInfos = { + // id, ts + makeInfo(1, 1), + makeInfo(2, 2), + makeInfo(3, 3), + makeInfo(4, 4) + }; + + auto rng = std::default_random_engine {}; + std::shuffle(shardInfos.begin(), shardInfos.end(), rng); + + TCompactionQueueImpl queue(config); + for (const auto& info: shardInfos) { + UNIT_ASSERT(queue.Enqueue(info)); + } + + UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 4UL); + UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL); + UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL); + + for (auto i: xrange(1ul, 5UL)) { + UNIT_ASSERT(!queue.Empty()); + UNIT_ASSERT_VALUES_EQUAL(queue.Front().ShardIdx.GetLocalId().GetValue(), i); + queue.PopFront(); + } + + UNIT_ASSERT(queue.Empty()); + } + + Y_UNIT_TEST(CheckOrderWhenAllQueues) { + TCompactionQueueImpl::TConfig config; + config.RowCountThreshold = 0; + config.SearchHeightThreshold = 10; + config.RowDeletesThreshold = 10; + + auto makeInfo = [](ui64 idx, ui64 ts, ui64 sh, ui64 d) { + TShardIdx shardId = TShardIdx(1, idx); + TTableInfo::TPartitionStats stats; + stats.FullCompactionTs = ts; + stats.SearchHeight = sh; + stats.RowDeletes = d; + return TShardCompactionInfo(shardId, stats); + }; + + std::vector<TShardCompactionInfo> shardInfos = { + // id, ts, sh, d + makeInfo(1, 1, 100, 100), // top in TS + makeInfo(2, 3, 100, 50), // top in SH + makeInfo(3, 4, 50, 100), // top in D + makeInfo(4, 2, 0, 0), // 2 in TS + makeInfo(5, 3, 90, 0), // 2 in SH + makeInfo(6, 4, 0, 90), // 2 in D + makeInfo(7, 3, 0, 0), // 3 in TS + makeInfo(8, 5, 0, 80), // 3 in D + makeInfo(9, 5, 0, 0), // 4 in TS, since this point only TS queue contains items + makeInfo(10, 6, 0, 0), // 5 in TS + makeInfo(11, 7, 0, 0), // 6 in TS + }; + + auto rng = std::default_random_engine {}; + std::shuffle(shardInfos.begin(), shardInfos.end(), rng); + + TCompactionQueueImpl queue(config); + for(const auto& info: shardInfos) { + UNIT_ASSERT(queue.Enqueue(info)); + } + + UNIT_ASSERT_VALUES_EQUAL(queue.Size(), shardInfos.size()); + UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 4UL); + UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 5UL); + + for (auto i: xrange(shardInfos.size())) { + UNIT_ASSERT(!queue.Empty()); + UNIT_ASSERT_VALUES_EQUAL(queue.Front().ShardIdx.GetLocalId().GetValue(), i + 1); + queue.PopFront(); + } + + UNIT_ASSERT(queue.Empty()); + } +}; + +} // NKikimr::NSchemeShard diff --git a/ydb/core/util/operation_queue.h b/ydb/core/util/operation_queue.h index bf70acc29be..59df79cef0c 100644 --- a/ydb/core/util/operation_queue.h +++ b/ydb/core/util/operation_queue.h @@ -113,6 +113,13 @@ private: TIntrusiveHeap<THeapItem, typename THeapItem::THeapIndex, typename THeapItem::THeapItemCompare> Heap; public: + TQueueWithPriority() = default; + + bool Contains(const T& item) const { + THeapItem heapItem(item); + return Items.find(heapItem) != Items.end(); + } + template<typename T2> bool Enqueue(T2&& item) { THeapItem heapItem(std::forward<T2>(item)); @@ -262,7 +269,25 @@ public: , Starter(starter) , Timer(timer) { - UpdateConfig(Config); + UpdateConfig(config); + } + + template <typename TReadyQueueConfig> + TOperationQueue(const TConfig& config, + const TReadyQueueConfig& queueConfig, + IStarter& starter, + ITimer& timer) + : Config(config) + , Starter(starter) + , Timer(timer) + { + UpdateConfig(config, queueConfig); + } + + template <typename TReadyQueueConfig> + void UpdateConfig(const TConfig& config, const TReadyQueueConfig& queueConfig) { + UpdateConfig(config); + ReadyQueue.UpdateConfig(queueConfig); } void UpdateConfig(const TConfig& config) { diff --git a/ydb/core/util/operation_queue_priority_ut.cpp b/ydb/core/util/operation_queue_priority_ut.cpp index 1639ef8c9d7..6912e0c8c81 100644 --- a/ydb/core/util/operation_queue_priority_ut.cpp +++ b/ydb/core/util/operation_queue_priority_ut.cpp @@ -15,11 +15,14 @@ struct TPriorityItem { int Id = 0; int Priority = 0; + explicit TPriorityItem(int id) + : Id(id) + {} + TPriorityItem(int id, int priority) : Id(id) , Priority(priority) - { - } + {} TPriorityItem(const TPriorityItem&) = default; @@ -28,13 +31,7 @@ struct TPriorityItem { return Id == rhs.Id; } - TPriorityItem& operator =(const TPriorityItem& rhs) { - // TODO: assert that ID's are the same, because we - // use it as update rather than real assignment - Id = rhs.Id; - Priority = rhs.Priority; - return *this; - } + TPriorityItem& operator =(const TPriorityItem& rhs) = default; size_t Hash() const { return THash<int>()(Id); @@ -114,6 +111,36 @@ struct TOperationStarter : public TPriorityQueue::IStarter, public NOperationQue } // namespace +Y_UNIT_TEST_SUITE(TPriorityQueueTest) { + Y_UNIT_TEST(TestOrder) { + TQueueWithPriority<TPriorityItem, TPriorityItem::TLessByPriority> queue; + queue.Enqueue(TPriorityItem(2, 50)); + queue.Enqueue(TPriorityItem(3, 100)); + queue.Enqueue(TPriorityItem(6, 90)); + queue.Enqueue(TPriorityItem(8, 80)); + queue.Enqueue(TPriorityItem(1, 100)); + + UNIT_ASSERT(queue.Remove(TPriorityItem(1))); + UNIT_ASSERT(queue.Remove(TPriorityItem(2))); + + UNIT_ASSERT_VALUES_EQUAL(queue.Front(), TPriorityItem(3, 100)); + queue.PopFront(); + + UNIT_ASSERT(!queue.Remove(TPriorityItem(4))); + UNIT_ASSERT(!queue.Remove(TPriorityItem(5))); + + UNIT_ASSERT_VALUES_EQUAL(queue.Front(), TPriorityItem(6, 90)); + queue.PopFront(); + + UNIT_ASSERT(!queue.Remove(TPriorityItem(7))); + + UNIT_ASSERT_VALUES_EQUAL(queue.Front(), TPriorityItem(8, 80)); + queue.PopFront(); + + UNIT_ASSERT(queue.Empty()); + } +}; + Y_UNIT_TEST_SUITE(TPriorityOperationQueueTest) { Y_UNIT_TEST(ShouldStartEmpty) { TPriorityQueue::TConfig config; @@ -335,6 +362,9 @@ void Out<TSomeQueue::TItemWithTs>(IOutputStream& o, const TSomeQueue::TItemWithT } template<> -void Out<NKikimr::NOperationQueue::TPriorityItem>(IOutputStream& o, const NKikimr::NOperationQueue::TPriorityItem& item) { +void Out<NKikimr::NOperationQueue::TPriorityItem>( + IOutputStream& o, + const NKikimr::NOperationQueue::TPriorityItem& item) +{ o << item.ToString(); } |