author | kungasc <kungasc@yandex-team.com> | 2023-09-25 18:55:27 +0300
---|---|---
committer | kungasc <kungasc@yandex-team.com> | 2023-09-25 19:58:41 +0300
commit | 06a588f5d3bd9429cc0ed1822cd4d629f3326f82 (patch) |
tree | 94a8d66c3ace7f61624a560aafa4e2b50737af12 |
parent | eb381404ff8196114ef1bc0d45419bce79936dfd (diff) |
download | ydb-06a588f5d3bd9429cc0ed1822cd4d629f3326f82.tar.gz |
KIKIMR-19139 Delete legacy Comp Shards
28 files changed, 7 insertions, 5411 deletions
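The semantic core of the change is the removal of the `KIKIMR_ALLOW_SHARDED_COMPACTION` / `KIKIMR_DEFAULT_SHARDED_COMPACTION` compile-time gates; everything else deletes the now-unreachable strategy implementation and its build references. A minimal sketch of the gating pattern being deleted, using simplified stand-in types rather than the real `TCompactionPolicy` from `ydb/core/base/localdb.h`:

```cpp
// Sketch of the removed compile-time gate, with simplified stand-in types;
// the real definitions live in ydb/core/base/localdb.h and flat_scheme_op.proto.
#include <memory>

enum ECompactionStrategy {
    CompactionStrategyUnset = 0,
    CompactionStrategyGenerational = 1,
    CompactionStrategySharded = 2, // deprecated by this commit
};

struct TCompactionPolicy {
    ECompactionStrategy CompactionStrategy = CompactionStrategyGenerational;
};

std::unique_ptr<TCompactionPolicy> CreateDefaultTablePolicy() {
    auto policy = std::make_unique<TCompactionPolicy>();
#if KIKIMR_DEFAULT_SHARDED_COMPACTION
    // Before this commit, builds compiled with -DKIKIMR_DEFAULT_SHARDED_COMPACTION=1
    // flipped every table to the sharded strategy here; that path is now gone.
    policy->CompactionStrategy = CompactionStrategySharded;
#endif
    return policy;
}
```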
diff --git a/ydb/core/base/compile_time_flags.h b/ydb/core/base/compile_time_flags.h
index 4259148ee3d..d7adae14681 100644
--- a/ydb/core/base/compile_time_flags.h
+++ b/ydb/core/base/compile_time_flags.h
@@ -3,16 +3,6 @@
 // This file is used for enabling or disabling dangerous kikimr features
 // These flags may only be changed at compile time
 
-// This feature allows schemeshard to accept sharded compaction for tables
-#ifndef KIKIMR_ALLOW_SHARDED_COMPACTION
-#define KIKIMR_ALLOW_SHARDED_COMPACTION 0
-#endif
-
-// This feature switches default to sharded compaction for all tables
-#ifndef KIKIMR_DEFAULT_SHARDED_COMPACTION
-#define KIKIMR_DEFAULT_SHARDED_COMPACTION 0
-#endif
-
 // This feature enables tablet to serialize bundle deltas in part switches
 // It may only be enabled after 19-2 or newer is deployed everywhere
 #ifndef KIKIMR_TABLET_WRITE_BUNDLE_DELTAS
diff --git a/ydb/core/base/localdb.cpp b/ydb/core/base/localdb.cpp
index d07ee26fc1f..c3ed995d157 100644
--- a/ydb/core/base/localdb.cpp
+++ b/ydb/core/base/localdb.cpp
@@ -196,10 +196,6 @@ void TCompactionPolicy::Serialize(NKikimrSchemeOp::TCompactionPolicy& policyPb)
 
 TCompactionPolicyPtr CreateDefaultTablePolicy() {
     TCompactionPolicyPtr policy = new TCompactionPolicy;
-#if KIKIMR_DEFAULT_SHARDED_COMPACTION
-    policy->CompactionStrategy = NKikimrSchemeOp::CompactionStrategySharded;
-    policy->ShardPolicy.SetTaskPriorityBase(100);
-#endif
     return policy;
 }
 
@@ -212,9 +208,6 @@ TCompactionPolicyPtr CreateDefaultUserTablePolicy() {
                                        LegacyQueueIdToTaskName(2), false});
     userPolicy->Generations.push_back({400 * 1024 * 1024, 5, 16, 16ull * 1024 * 1024 * 1024,
                                        LegacyQueueIdToTaskName(3), false});
-#if KIKIMR_DEFAULT_SHARDED_COMPACTION
-    userPolicy->CompactionStrategy = NKikimrSchemeOp::CompactionStrategySharded;
-#endif
     return userPolicy;
 }
 
diff --git a/ydb/core/base/ya.make b/ydb/core/base/ya.make
index 7e65496bc43..a9c5c9934f9 100644
--- a/ydb/core/base/ya.make
+++ b/ydb/core/base/ya.make
@@ -1,12 +1,5 @@
 LIBRARY()
 
-IF (KIKIMR_DEFAULT_SHARDED_COMPACTION)
-    # Makes it easier to test sharded compaction
-    CFLAGS(
-        -DKIKIMR_DEFAULT_SHARDED_COMPACTION=1
-    )
-ENDIF()
-
 SRCS(
     actor_activity_names.cpp
     appdata.h
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 0f68d97e4ec..631840c53a1 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -122,7 +122,7 @@ message TFamilyDescription {
 enum ECompactionStrategy {
     CompactionStrategyUnset = 0;
     CompactionStrategyGenerational = 1;
-    CompactionStrategySharded = 2;
+    CompactionStrategySharded = 2; // DEPRECATED: use CompactionStrategyGenerational
 }
 
 message TCompactionPolicy {
diff --git a/ydb/core/tablet_flat/CMakeLists.darwin-x86_64.txt b/ydb/core/tablet_flat/CMakeLists.darwin-x86_64.txt
index 7d4d14ebb92..9f009108f05 100644
--- a/ydb/core/tablet_flat/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tablet_flat/CMakeLists.darwin-x86_64.txt
@@ -38,12 +38,6 @@ get_built_tool_path(
   enum_parser
 )
 get_built_tool_path(
-  TOOL_enum_parser_bin
-  TOOL_enum_parser_dependency
-  tools/enum_parser/enum_parser
-  enum_parser
-)
-get_built_tool_path(
   TOOL_protoc_bin
   TOOL_protoc_dependency
   contrib/tools/protoc/bin
@@ -91,7 +85,6 @@ target_sources(ydb-core-tablet_flat PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_create.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_gen.cpp
-  ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_shard.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_database.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_dbase_scheme.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_dbase_apply.cpp
@@ -123,7 +116,6 @@ target_sources(ydb-core-tablet_flat PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_part_slice.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_range_cache.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_row_versions.cpp
-  ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_stat_part.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_stat_table.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_store_hotdog.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table.cpp
@@ -141,11 +133,6 @@ generate_enum_serilization(ydb-core-tablet_flat
   ydb/core/tablet_flat/flat_comp_gen.h
 )
 generate_enum_serilization(ydb-core-tablet_flat
-  ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_shard.h
-  INCLUDE_HEADERS
-  ydb/core/tablet_flat/flat_comp_shard.h
-)
-generate_enum_serilization(ydb-core-tablet_flat
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_part_loader.h
   INCLUDE_HEADERS
   ydb/core/tablet_flat/flat_part_loader.h
diff --git a/ydb/core/tablet_flat/CMakeLists.linux-aarch64.txt b/ydb/core/tablet_flat/CMakeLists.linux-aarch64.txt
index 5c9e2af153f..7621ab1675b 100644
--- a/ydb/core/tablet_flat/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tablet_flat/CMakeLists.linux-aarch64.txt
@@ -38,12 +38,6 @@ get_built_tool_path(
   enum_parser
 )
 get_built_tool_path(
-  TOOL_enum_parser_bin
-  TOOL_enum_parser_dependency
-  tools/enum_parser/enum_parser
-  enum_parser
-)
-get_built_tool_path(
   TOOL_protoc_bin
   TOOL_protoc_dependency
   contrib/tools/protoc/bin
@@ -92,7 +86,6 @@ target_sources(ydb-core-tablet_flat PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_create.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_gen.cpp
-  ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_shard.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_database.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_dbase_scheme.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_dbase_apply.cpp
@@ -124,7 +117,6 @@ target_sources(ydb-core-tablet_flat PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_part_slice.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_range_cache.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_row_versions.cpp
-  ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_stat_part.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_stat_table.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_store_hotdog.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table.cpp
@@ -142,11 +134,6 @@ generate_enum_serilization(ydb-core-tablet_flat
   ydb/core/tablet_flat/flat_comp_gen.h
 )
 generate_enum_serilization(ydb-core-tablet_flat
-  ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_shard.h
-  INCLUDE_HEADERS
-  ydb/core/tablet_flat/flat_comp_shard.h
-)
-generate_enum_serilization(ydb-core-tablet_flat
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_part_loader.h
   INCLUDE_HEADERS
   ydb/core/tablet_flat/flat_part_loader.h
diff --git a/ydb/core/tablet_flat/CMakeLists.linux-x86_64.txt b/ydb/core/tablet_flat/CMakeLists.linux-x86_64.txt
index 5c9e2af153f..7621ab1675b 100644
--- a/ydb/core/tablet_flat/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tablet_flat/CMakeLists.linux-x86_64.txt
@@ -38,12 +38,6 @@ get_built_tool_path(
   enum_parser
 )
 get_built_tool_path(
-  TOOL_enum_parser_bin
-  TOOL_enum_parser_dependency
-  tools/enum_parser/enum_parser
-  enum_parser
-)
-get_built_tool_path(
   TOOL_protoc_bin
   TOOL_protoc_dependency
   contrib/tools/protoc/bin
@@ -92,7 +86,6 @@ target_sources(ydb-core-tablet_flat PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_create.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_gen.cpp
-  ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_shard.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_database.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_dbase_scheme.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_dbase_apply.cpp
@@ -124,7 +117,6 @@ target_sources(ydb-core-tablet_flat PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_part_slice.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_range_cache.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_row_versions.cpp
-  ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_stat_part.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_stat_table.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_store_hotdog.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table.cpp
@@ -142,11 +134,6 @@ generate_enum_serilization(ydb-core-tablet_flat
   ydb/core/tablet_flat/flat_comp_gen.h
 )
 generate_enum_serilization(ydb-core-tablet_flat
-  ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_shard.h
-  INCLUDE_HEADERS
-  ydb/core/tablet_flat/flat_comp_shard.h
-)
-generate_enum_serilization(ydb-core-tablet_flat
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_part_loader.h
   INCLUDE_HEADERS
   ydb/core/tablet_flat/flat_part_loader.h
diff --git a/ydb/core/tablet_flat/CMakeLists.windows-x86_64.txt b/ydb/core/tablet_flat/CMakeLists.windows-x86_64.txt
index 7d4d14ebb92..9f009108f05 100644
--- a/ydb/core/tablet_flat/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tablet_flat/CMakeLists.windows-x86_64.txt
@@ -38,12 +38,6 @@ get_built_tool_path(
   enum_parser
 )
 get_built_tool_path(
-  TOOL_enum_parser_bin
-  TOOL_enum_parser_dependency
-  tools/enum_parser/enum_parser
-  enum_parser
-)
-get_built_tool_path(
   TOOL_protoc_bin
   TOOL_protoc_dependency
   contrib/tools/protoc/bin
@@ -91,7 +85,6 @@ target_sources(ydb-core-tablet_flat PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_create.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_gen.cpp
-  ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_shard.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_database.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_dbase_scheme.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_dbase_apply.cpp
@@ -123,7 +116,6 @@ target_sources(ydb-core-tablet_flat PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_part_slice.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_range_cache.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_row_versions.cpp
-  ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_stat_part.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_stat_table.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_store_hotdog.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table.cpp
@@ -141,11 +133,6 @@ generate_enum_serilization(ydb-core-tablet_flat
   ydb/core/tablet_flat/flat_comp_gen.h
 )
 generate_enum_serilization(ydb-core-tablet_flat
-  ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_shard.h
-  INCLUDE_HEADERS
-  ydb/core/tablet_flat/flat_comp_shard.h
-)
-generate_enum_serilization(ydb-core-tablet_flat
   ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_part_loader.h
   INCLUDE_HEADERS
   ydb/core/tablet_flat/flat_part_loader.h
diff --git a/ydb/core/tablet_flat/flat_comp_create.cpp b/ydb/core/tablet_flat/flat_comp_create.cpp
index d46a1f63e7c..8426b981406 100644
--- a/ydb/core/tablet_flat/flat_comp_create.cpp
+++ b/ydb/core/tablet_flat/flat_comp_create.cpp
@@ -1,6 +1,5 @@
 #include "flat_comp_create.h"
 #include "flat_comp_gen.h"
-#include "flat_comp_shard.h"
 
 namespace NKikimr {
 namespace NTable{
@@ -16,17 +15,5 @@ namespace NTable{
             table, backend, broker, time, std::move(taskNameSuffix));
     }
 
-    THolder<ICompactionStrategy> CreateShardedCompactionStrategy(
-        ui32 table,
-        ICompactionBackend* backend,
-        IResourceBroker* broker,
-        NUtil::ILogger* logger,
-        ITimeProvider* time,
-        TString taskNameSuffix)
-    {
-        return MakeHolder<NCompShard::TShardedCompactionStrategy>(
-            table, backend, broker, logger, time, std::move(taskNameSuffix));
-    }
-
 }
 }
diff --git a/ydb/core/tablet_flat/flat_comp_create.h b/ydb/core/tablet_flat/flat_comp_create.h
index 5fa047203ea..266f89d9a32 100644
--- a/ydb/core/tablet_flat/flat_comp_create.h
+++ b/ydb/core/tablet_flat/flat_comp_create.h
@@ -15,13 +15,5 @@ namespace NTable {
         ITimeProvider* time,
         TString taskNameSuffix);
 
-    THolder<ICompactionStrategy> CreateShardedCompactionStrategy(
-        ui32 table,
-        ICompactionBackend* backend,
-        IResourceBroker* broker,
-        NUtil::ILogger* logger,
-        ITimeProvider* time,
-        TString taskNameSuffix);
-
 }
 }
diff --git a/ydb/core/tablet_flat/flat_comp_shard.cpp b/ydb/core/tablet_flat/flat_comp_shard.cpp
deleted file mode 100644
index efb8fe189f9..00000000000
--- a/ydb/core/tablet_flat/flat_comp_shard.cpp
+++ /dev/null
@@ -1,2656 +0,0 @@
-#include "flat_comp_shard.h"
-#include "flat_part_charge.h"
-#include "flat_part_iter_multi.h"
-#include "flat_stat_part.h"
-#include "util_fmt_cell.h"
-
-#include <ydb/core/tablet_flat/protos/flat_table_shard.pb.h>
-#include <ydb/core/util/pb.h>
-
-#include <library/cpp/monlib/service/pages/templates.h>
-#include <util/generic/cast.h>
-
-#include <optional>
-
-namespace NKikimr {
-namespace NTable {
-namespace NCompShard {
-
-    using NKikimr::NTable::NProto::TShardedStrategyStateInfo;
-
-    namespace {
-
-        static constexpr ui32 PRIORITY_UPDATE_FACTOR = 20;
-
-        using NFmt::TPrintableTypedCells;
-
-        class TPrintableShardRange {
-        public:
-            TPrintableShardRange(const TTableShard& shard, const TTableInfo& table)
-                : Shard(shard)
-                , Table(table)
-            { }
-
-            friend IOutputStream& operator<<(IOutputStream& out, const TPrintableShardRange& v) {
-                if (v.Shard.LeftKey == 0 && v.Shard.RightKey == 0) {
-                    out << "all";
-                    return out;
-                }
-                out << '[';
-                if (v.Shard.LeftKey != 0) {
-                    out << TPrintableTypedCells(v.Table.SplitKeys.at(v.Shard.LeftKey).GetCells(), v.Table.RowScheme->Keys->BasicTypes());
-                } else {
-                    out << "-inf";
-                }
-                out << ", ";
-                if (v.Shard.RightKey != 0) {
-                    out << TPrintableTypedCells(v.Table.SplitKeys.at(v.Shard.RightKey).GetCells(), v.Table.RowScheme->Keys->BasicTypes());
-                } else {
-                    out << "+inf";
-                }
-                out << ')';
-                return out;
-            }
-
-        private:
-            const TTableShard& Shard;
-            const TTableInfo& Table;
-        };
-
-        TPartView MakePartView(TIntrusiveConstPtr<TPart> part, TIntrusiveConstPtr<TSlices> slices) noexcept {
-            TPartView partView{ std::move(part), nullptr, std::move(slices) };
-            partView.Screen = partView.Slices->ToScreen(); // TODO: remove screen from TPartView
-            return partView;
-        }
-
-    }
-
-    bool TSplitStatIterator::TCmpHeapByFirstKey::operator()(const TItemState* b, const TItemState* a) const noexcept {
-        TCellsRef left = a->Slice.FirstKey.GetCells();
-        TCellsRef right = b->Slice.FirstKey.GetCells();
-        if (int cmp = ComparePartKeys(left, right, KeyCellDefaults)) {
-            return cmp < 0;
-        }
-        return a->Slice.FirstInclusive && !b->Slice.FirstInclusive;
-    }
-
-    bool TSplitStatIterator::TCmpHeapByNextKey::operator()(const TItemState* b, const TItemState* a) const noexcept {
-        TCellsRef left = a->NextKey;
-        TCellsRef right = b->NextKey;
-        return ComparePartKeys(left, right, KeyCellDefaults) < 0;
-    }
-
-    bool TSplitStatIterator::TCmpHeapByLastKey::operator()(const TItemState* b, const TItemState* a) const noexcept {
-        TCellsRef left = a->Slice.LastKey.GetCells();
-        TCellsRef right = b->Slice.LastKey.GetCells();
-        if (int cmp = ComparePartKeys(left, right, KeyCellDefaults)) {
-            return cmp < 0;
-        }
-        return !a->Slice.LastInclusive && b->Slice.LastInclusive;
-    }
-
-    bool TSplitStatIterator::HasStarted(const TItemState* item) const noexcept {
-        TCellsRef left = item->Slice.FirstKey.GetCells();
-        TCellsRef right = Key;
-        return ComparePartKeys(left, right, KeyCellDefaults) < 0;
-    }
-
-    bool TSplitStatIterator::HasStopped(const TItemState* item) const noexcept {
-        TCellsRef left = item->Slice.LastKey.GetCells();
-        TCellsRef right = Key;
-        if (int cmp = ComparePartKeys(left, right, KeyCellDefaults)) {
-            return cmp < 0;
-        }
-        return !item->Slice.LastInclusive;
-    }
-
-    void TSplitStatIterator::TItemState::InitNextKey() noexcept {
-        auto& columns = Part->Scheme->Groups[0].ColsKeyIdx;
-        NextKey.resize(columns.size());
-        for (size_t pos = 0; pos < columns.size(); ++pos) {
-            NextKey[pos] = NextPage->Cell(columns[pos]);
-        }
-    }
-
-    void TSplitStatIterator::TItemState::InitPageSize() noexcept {
-        TRowId endRowId = NextPage ? NextPage->GetRowId() : Slice.EndRowId();
-        LastPageSize = Helper.CalcSize(LastRowId, endRowId);
-        LastPageRows = endRowId - LastRowId;
-        LastRowId = endRowId;
-    }
-
-    void TSplitStatIterator::AddSlice(const TPart* part, const TSlice& slice, ui64 size) noexcept {
-        auto* item = &Items.emplace_back(part, slice);
-        InitQueue.push(item);
-        RightSize_ += size;
-        RightRows_ += slice.Rows();
-    }
-
-    bool TSplitStatIterator::Next() noexcept {
-        if (!InitQueue && !NextQueue) {
-            return false;
-        }
-
-        // Select the next split key
-        TCellsRef selected;
-        if (InitQueue && NextQueue) {
-            TCellsRef left = InitQueue.top()->Slice.FirstKey.GetCells();
-            TCellsRef right = NextQueue.top()->NextKey;
-            if (ComparePartKeys(left, right, KeyCellDefaults) <= 0) {
-                selected = left;
-            } else {
-                selected = right;
-            }
-        } else if (InitQueue) {
-            selected = InitQueue.top()->Slice.FirstKey.GetCells();
-        } else {
-            selected = NextQueue.top()->NextKey;
-        }
-
-        Key.reserve(selected.size());
-        Key.assign(selected.begin(), selected.end());
-        TCellsRef splitKey = Key;
-
-        // Take all matching items from init queue and initialize them
-        while (InitQueue && ComparePartKeys(InitQueue.top()->Slice.FirstKey.GetCells(), splitKey, KeyCellDefaults) <= 0) {
-            auto* item = InitQueue.top();
-            InitQueue.pop();
-            StartQueue.push(item);
-
-            // Find the first page that is after the first slice row
-            item->NextPage = item->Part->Index.LookupRow(item->Slice.BeginRowId());
-            if (++item->NextPage && item->NextPage->GetRowId() < item->Slice.EndRowId()) {
-                // That first page has a matching row, use it for key enumeration
-                item->InitNextKey();
-                NextQueue.push(item);
-            }
-        }
-
-        // Take all slices from start queue that have crossed the boundary
-        while (StartQueue && HasStarted(StartQueue.top())) {
-            auto* item = StartQueue.top();
-            StartQueue.pop();
-            ActivationQueue.push_back(item);
-        }
-
-        // Everything in activation queue has crossed the boundary and now belongs on the left side
-        for (auto* item : ActivationQueue) {
-            item->InitPageSize();
-            LeftSize_ += item->LastPageSize;
-            LeftRows_ += item->LastPageRows;
-
-            // Check if the last page is only partially on the left side
-            if (item->LastPageRows <= 1) {
-                item->IsPartial = false;
-            } else if (item->NextPage) {
-                item->IsPartial = ComparePartKeys(splitKey, item->NextKey, KeyCellDefaults) < 0;
-            } else if (HasStopped(item)) {
-                item->IsPartial = false;
-            } else {
-                item->IsPartial = true;
-                StopQueue.push(item);
-            }
-
-            // The last page may not belong on the right side any more
-            if (!item->IsPartial) {
-                RightSize_ -= item->LastPageSize;
-                RightRows_ -= item->LastPageRows;
-            }
-        }
-        ActivationQueue.clear();
-
-        // Take all slices that have next key matching the new boundary
-        while (NextQueue && ComparePartKeys(NextQueue.top()->NextKey, splitKey, KeyCellDefaults) <= 0) {
-            auto* item = NextQueue.top();
-            NextQueue.pop();
-
-            // The last page no longer belongs on the right side
-            if (item->IsPartial) {
-                RightSize_ -= item->LastPageSize;
-                RightRows_ -= item->LastPageRows;
-                item->IsPartial = false;
-            }
-
-            // Try moving to the next page
-            if (++item->NextPage && item->NextPage->GetRowId() < item->Slice.EndRowId()) {
-                item->InitNextKey();
-                NextQueue.push(item);
-            }
-
-            // If there are any rows left the new page will activate on the next key
-            if (item->LastRowId < item->Slice.EndRowId()) {
-                ActivationQueue.push_back(item);
-            }
-        }
-
-        // Take all slices that have their last row beyond the new boundary
-        while (StopQueue && HasStopped(StopQueue.top())) {
-            auto* item = StopQueue.top();
-            StopQueue.pop();
-
-            // Everything up to the end no longer belongs on the right side
-            if (item->IsPartial) {
-                RightSize_ -= item->LastPageSize;
-                RightRows_ -= item->LastPageRows;
-                item->IsPartial = false;
-            }
-        }
-
-        return true;
-    }
-
-    bool TPageReuseBuilder::TItemState::InitNextKey() noexcept {
-        if (NextPage && NextPage->GetRowId() < Slice.EndRowId()) {
-            auto& columns = Part->Scheme->Groups[0].ColsKeyIdx;
-            NextKey.resize(columns.size());
-            for (size_t pos = 0; pos < columns.size(); ++pos) {
-                NextKey[pos] = NextPage->Cell(columns[pos]);
-            }
-            First.Key = NextKey;
-            First.RowId = NextPage->GetRowId();
-            First.Inclusive = true;
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    TRowId TPageReuseBuilder::TItemState::GetNextRowId() const noexcept {
-        return NextPage ? NextPage->GetRowId() : Part->Index.GetEndRowId();
-    }
-
-    void TPageReuseBuilder::AddSlice(const TPart* part, const TSlice& slice, bool reusable) noexcept {
-        Items.emplace_back(part, slice, reusable);
-    }
-
-    TPageReuseBuilder::TResults TPageReuseBuilder::Build() noexcept {
-        TResults results;
-        TItemState* lastItem = nullptr;
-
-        // Produces a min-heap by first key
-        auto heapByFirstKeyMin = [this](const TItemState* b, const TItemState* a) noexcept -> bool {
-            TCellsRef left = a->First.Key;
-            TCellsRef right = b->First.Key;
-            if (int cmp = ComparePartKeys(left, right, KeyCellDefaults)) {
-                return cmp < 0;
-            }
-            return a->First.Inclusive && !b->First.Inclusive;
-        };
-
-        auto noIntersection = [this](const TRightBoundary& a, const TLeftBoundary& b) noexcept -> bool {
-            TCellsRef left = a.Key;
-            TCellsRef right = b.Key;
-            if (int cmp = ComparePartKeys(left, right, KeyCellDefaults)) {
-                return cmp < 0;
-            }
-            return !a.Inclusive || !b.Inclusive;
-        };
-
-        auto lessBoundary = [this](const TRightBoundary& a, const TRightBoundary& b) noexcept -> bool {
-            TCellsRef left = a.Key;
-            TCellsRef right = b.Key;
-            if (int cmp = ComparePartKeys(left, right, KeyCellDefaults)) {
-                return cmp < 0;
-            }
-            return !a.Inclusive && b.Inclusive;
-        };
-
-        TVector<TItemState*> enterQueue(Reserve(Items.size()));
-        for (TItemState& item : Items) {
-            item.NextPage = item.Part->Index.LookupRow(item.Slice.BeginRowId());
-            Y_VERIFY(item.NextPage,
-                "Cannot find row %" PRIu64 " in the index for part %s",
-                item.Slice.BeginRowId(),
-                item.Part->Label.ToString().c_str());
-            if (item.NextPage->GetRowId() != item.Slice.BeginRowId()) {
-                ++item.NextPage;
-            }
-            item.First.Key = item.Slice.FirstKey.GetCells();
-            item.First.RowId = item.Slice.FirstRowId;
-            item.First.Inclusive = item.Slice.FirstInclusive;
-            enterQueue.push_back(&item);
-        }
-
-        std::make_heap(enterQueue.begin(), enterQueue.end(), heapByFirstKeyMin);
-
-        TVector<TCell> tmpStart;
-        TVector<TCell> tmpEnd;
-
-        bool reuseActive = true;
-        while (enterQueue) {
-            TRightBoundary end;
-
-            // a separate scope to keep local variables
-            {
-                auto* item = enterQueue.front();
-                std::pop_heap(enterQueue.begin(), enterQueue.end(), heapByFirstKeyMin);
-                enterQueue.pop_back();
-
-                TLeftBoundary start = item->First;
-                bool viable = item->Reusable;
-
-                // Protect start boundary against future overwrites
-                tmpStart.assign(start.Key.begin(), start.Key.end());
-                start.Key = tmpStart;
-
-                if (item->NextPage && item->NextPage->GetRowId() == start.BeginRowId()) {
-                    ++item->NextPage;
-                } else {
-                    // Sub-slice is not viable unless it starts on a page boundary
-                    viable = false;
-                }
-
-                if (item->InitNextKey()) {
-                    enterQueue.push_back(item);
-                    std::push_heap(enterQueue.begin(), enterQueue.end(), heapByFirstKeyMin);
-
-                    end.Key = item->NextKey;
-                    end.RowId = item->NextPage->GetRowId();
-                    end.Inclusive = false;
-                } else {
-                    end.Key = item->Slice.LastKey.GetCells();
-                    end.RowId = item->Slice.LastRowId;
-                    end.Inclusive = item->Slice.LastInclusive;
-                }
-
-                // Protect end boundary against future overwrites
-                tmpEnd.assign(end.Key.begin(), end.Key.end());
-                end.Key = tmpEnd;
-
-                if (viable && end.EndRowId() != item->GetNextRowId()) {
-                    // Sub-slice is not viable unless it ends on a page boundary
-                    viable = false;
-                }
-
-                // Move on if current page has no intersection with others
-                if (!enterQueue || item == enterQueue.front() || noIntersection(end, enterQueue.front()->First)) {
-                    if (viable) {
-                        if (reuseActive && lastItem == item && results.Reusable.back().Slice.EndRowId() == start.BeginRowId()) {
-                            // Merge adjacent pages as long as it's from the same item
-                            results.Reusable.back().Slice.LastKey = TSerializedCellVec(end.Key);
-                            results.Reusable.back().Slice.LastRowId = end.RowId;
-                            results.Reusable.back().Slice.LastInclusive = end.Inclusive;
-                        } else {
-                            auto& result = results.Reusable.emplace_back();
-                            result.Part = item->Part;
-                            result.Slice.FirstKey = TSerializedCellVec(start.Key);
-                            result.Slice.FirstRowId = start.RowId;
-                            result.Slice.FirstInclusive = start.Inclusive;
-                            result.Slice.LastKey = TSerializedCellVec(end.Key);
-                            result.Slice.LastRowId = end.RowId;
-                            result.Slice.LastInclusive = end.Inclusive;
-                            ++results.ExpectedSlices;
-                            reuseActive = true;
-                            lastItem = item;
-                        }
-                    } else if (reuseActive) {
-                        ++results.ExpectedSlices;
-                        reuseActive = false;
-                        lastItem = nullptr;
-                    }
-                    continue;
-                }
-            }
-
-            if (reuseActive) {
-                ++results.ExpectedSlices;
-                reuseActive = false;
-                lastItem = nullptr;
-            }
-
-            // Now flush all items that have potential intersections, those cannot be reused
-            while (enterQueue && !noIntersection(end, enterQueue.front()->First)) {
-                auto* item = enterQueue.front();
-                std::pop_heap(enterQueue.begin(), enterQueue.end(), heapByFirstKeyMin);
-                enterQueue.pop_back();
-
-                if (item->NextPage && item->NextPage->GetRowId() == item->First.BeginRowId()) {
-                    ++item->NextPage;
-                }
-
-                TRightBoundary candidate;
-
-                if (item->InitNextKey()) {
-                    enterQueue.push_back(item);
-                    std::push_heap(enterQueue.begin(), enterQueue.end(), heapByFirstKeyMin);
-
-                    candidate.Key = item->NextKey;
-                    candidate.RowId = item->NextPage->GetRowId();
-                    candidate.Inclusive = false;
-                } else {
-                    candidate.Key = item->Slice.LastKey.GetCells();
-                    candidate.RowId = item->Slice.LastRowId;
-                    candidate.Inclusive = item->Slice.LastInclusive;
-                }
-
-                if (lessBoundary(end, candidate)) {
-                    end = candidate;
-
-                    // Protect end boundary against future overwrites
-                    tmpEnd.assign(end.Key.begin(), end.Key.end());
-                    end.Key = tmpEnd;
-                }
-            }
-        }
-
-        return results;
-    }
-
-    void TTableShard::RegisterItem(const TTablePart& info, TTablePart::TItem& item, bool isGarbage) noexcept {
-        auto r = Levels->Add(info.Part, item.Slice);
-        item.Level = r.Level;
-        item.Position = r.Position;
-
-        // register stats for the specific level
-        if (PerLevelStats.size() <= item.Level->Index) {
-            PerLevelStats.resize(item.Level->Index + 1);
-        }
-        PerLevelStats[item.Level->Index] += item.Stats;
-
-        if (PerLevelGarbage.size() <= item.Level->Index) {
-            PerLevelGarbage.resize(item.Level->Index + 1);
-        }
-        if (isGarbage) {
-            PerLevelGarbage[item.Level->Index] += item.Stats.Size;
-        }
-    }
-
-    bool TTableShard::FindSplitKey(TSerializedCellVec& foundKey, const TKeyCellDefaults& keyDefaults) const noexcept {
-        TSplitStatIterator it(keyDefaults);
-
-        for (const auto& kvInfo : Parts) {
-            for (const auto& kvSlice : kvInfo.second.Slices) {
-                it.AddSlice(kvInfo.second.Part.Get(), kvSlice.second.Slice, kvSlice.second.Stats.Size);
-            }
-        }
-
-        TVector<TCell> bestSplitKey;
-        ui64 bestLeftSize = it.LeftSize();
-        ui64 bestRightSize = it.RightSize();
-        Y_VERIFY(bestLeftSize == 0);
-        Y_VERIFY(bestRightSize == Stats.Size, "Full stats size is out of sync!");
-
-        bool stop = false;
-        while (!stop && it.Next()) {
-            TCellsRef splitKey = it.CurrentKey();
-            auto leftSize = it.LeftSize();
-            auto rightSize = it.RightSize();
-
-            // left size increases, right size decreases
-            // as soon as left becomes bigger than right it won't get any better
-            if (leftSize > rightSize) {
-                stop = true;
-                auto currentDiff = bestRightSize - bestLeftSize;
-                if (currentDiff <= leftSize - rightSize) {
-                    // previous iteration has given us a better difference
-                    break;
-                }
-            }
-
-            bestSplitKey.assign(splitKey.begin(), splitKey.end());
-            bestLeftSize = leftSize;
-            bestRightSize = rightSize;
-        }
-
-        if (bestLeftSize < Stats.Size && bestRightSize < Stats.Size) {
-            // We found a key that splits current shard somewhat evenly and
-            // the size estimate of individual subshards actually decreses
-            foundKey = TSerializedCellVec(bestSplitKey);
-            return true;
-        }
-
-        // We cannot find split key for some reason
-        // For example there's a single row-page that is way too big, or there
-        // are lots of slices stacked on top of each other with no natural way
-        // to divide them on a page boundary.
-        return false;
-    }
-
-    bool TSliceSplitOp::Execute(IPages* env) {
-        const TIntrusiveConstPtr<TKeyCellDefaults> keyDefaults = Table->RowScheme->Keys;
-
-        TVector<TCell> keyCellsBuffer(keyDefaults->Size());
-        auto getKeyCells = [this, &keyDefaults, &keyCellsBuffer](ui64 keyId) {
-            auto* key = Table->SplitKeys.FindPtr(keyId);
-            Y_VERIFY(key, "Cannot find split key %" PRIu64, keyId);
-
-            auto keyCells = key->GetCells();
-            if (keyCells.size() < keyDefaults->Size()) {
-                for (size_t i = 0; i < keyCells.size(); ++i) {
-                    keyCellsBuffer[i] = keyCells[i];
-                }
-                for (size_t i = keyCells.size(); i < keyDefaults->Size(); ++i) {
-                    keyCellsBuffer[i] = keyDefaults->Defs[i];
-                }
-                keyCells = keyCellsBuffer;
-            }
-            Y_VERIFY_DEBUG(keyCells.size() == keyDefaults->Size());
-
-            return keyCells;
-        };
-
-        bool ok = true;
-        TCharge charge(env, *Part, TTagsRef{ });
-        for (size_t idx = 1; idx < Shards.size(); ++idx) {
-            auto keyCells = getKeyCells(Shards[idx]->LeftKey);
-            ok &= charge.SplitKey(keyCells, *keyDefaults, Slice.BeginRowId(), Slice.EndRowId());
-        }
-
-        if (!ok) {
-            // Some pages are missing
-            return false;
-        }
-
-        TSlice current = Slice;
-        TVector<TSlice> results(Reserve(Shards.size()));
-        TPartSimpleIt it(Part.Get(), { }, keyDefaults, env);
-        for (size_t idx = 1; idx < Shards.size(); ++idx) {
-            const TRowId currentBegin = current.BeginRowId();
-            const TRowId currentEnd = current.EndRowId();
-            if (currentBegin >= currentEnd) {
-                break;
-            }
-
-            it.SetBounds(currentBegin, currentEnd);
-
-            auto keyCells = getKeyCells(Shards[idx]->LeftKey);
-            auto ready = it.Seek(keyCells, ESeek::Lower);
-            Y_VERIFY(ready != EReady::Page, "Unexpected failure, precharge logic may be faulty");
-
-            // This is row id of the first row >= split key
-            const TRowId rowId = ready == EReady::Data ? it.GetRowId() : currentEnd;
-            Y_VERIFY_DEBUG(currentBegin <= rowId);
-            Y_VERIFY_DEBUG(rowId <= currentEnd);
-
-            // This is the first key >= split key
-            TSerializedCellVec rowKey;
-            if (rowId != currentEnd) {
-                rowKey = TSerializedCellVec(it.GetRawKey());
-            } else if (Y_UNLIKELY(ComparePartKeys(current.LastKey.GetCells(), keyCells, *keyDefaults) < 0)) {
-                // This shouldn't normally happen, but better safe than sorry
-                // Current split key is actually out of bounds for the slice
-                // Since there's no real intersection leave it as is
-                break;
-            }
-
-            // Do we have any rows to the left of the split?
-            if (currentBegin < rowId) {
-                auto& left = results.emplace_back();
-                left.FirstKey = std::move(current.FirstKey);
-                left.FirstRowId = current.FirstRowId;
-                left.FirstInclusive = current.FirstInclusive;
-                if ((rowId - currentBegin) == 1 && left.FirstInclusive) {
-                    // Single row, just reuse the exact same key
-                    left.LastKey = left.FirstKey;
-                    left.LastRowId = left.FirstRowId;
-                    left.LastInclusive = true;
-                } else {
-                    // Seek to previous row
-                    ready = it.Seek(rowId - 1);
-                    Y_VERIFY(ready == EReady::Data, "Unexpected failure, precharge logic may be faulty");
-                    left.LastKey = TSerializedCellVec(it.GetRawKey());
-                    left.LastRowId = it.GetRowId();
-                    left.LastInclusive = true;
-                    if ((rowId - currentBegin) == 1) {
-                        // Single row, just reuse the exact same key
-                        Y_VERIFY_DEBUG(!left.FirstInclusive);
-                        left.FirstKey = left.LastKey;
-                        left.FirstRowId = left.LastRowId;
-                        left.FirstInclusive = true;
-                    }
-                }
-
-                Y_VERIFY_DEBUG(left.BeginRowId() == currentBegin);
-                Y_VERIFY_DEBUG(left.EndRowId() == rowId);
-            }
-
-            current.FirstKey = std::move(rowKey);
-            current.FirstRowId = rowId;
-            current.FirstInclusive = true;
-        }
-
-        if (current.BeginRowId() < current.EndRowId()) {
-            auto& left = results.emplace_back();
-            if (current.Rows() == 1 && (current.FirstInclusive ^ current.LastInclusive)) {
-                if (current.FirstInclusive) {
-                    left.FirstKey = current.FirstKey;
-                    left.FirstRowId = current.FirstRowId;
-                    left.FirstInclusive = true;
-                    left.LastKey = current.FirstKey;
-                    left.LastRowId = current.FirstRowId;
-                    left.LastInclusive = true;
-                } else {
-                    Y_VERIFY_DEBUG(current.LastInclusive);
-                    left.FirstKey = current.LastKey;
-                    left.FirstRowId = current.LastRowId;
-                    left.FirstInclusive = true;
-                    left.LastKey = current.LastKey;
-                    left.LastRowId = current.LastRowId;
-                    left.LastInclusive = true;
-                }
-            } else {
-                left = std::move(current);
-            }
-        }
-
-        TSliceSplitResult result;
-        result.Part = Part;
-        result.OldSlice = Slice;
-        result.NewSlices = std::move(results);
-        result.Shards = std::move(Shards);
-        Consumer->OnSliceSplitResult(std::move(result));
-
-        return true;
-    }
-
-    bool TUnderlayMask::HasKey(TCellsRef key) noexcept {
-        while (Position != Bounds.end()) {
-            // TODO: we could use binary search, but it's probably useless
-            const auto& current = *Position;
-            auto cmp = ComparePartKeys(key, current.LastKey.GetCells(), *RowScheme->Keys);
-            if (cmp < 0 || (cmp == 0 && current.LastInclusive)) {
-                break;
-            }
-            ++Position;
-        }
-        Y_VERIFY(ValidatePosition(key), "TUnderlayMask::HasKey called with out of order keys");
-        if (Position != Bounds.end()) {
-            const auto& current = *Position;
-            auto cmp = ComparePartKeys(current.FirstKey.GetCells(), key, *RowScheme->Keys);
-            return cmp < 0 || (cmp == 0 && current.FirstInclusive);
-        }
-        return false;
-    }
-
-    bool TUnderlayMask::ValidateOrder() const noexcept {
-        auto it = Bounds.begin();
-        if (it == Bounds.end()) {
-            return true;
-        }
-        auto last = it++;
-        while (it != Bounds.end()) {
-            if (!TBounds::LessByKey(*last, *it, *RowScheme->Keys)) {
-                return false;
-            }
-            last = it++;
-        }
-        return true;
-    }
-
-    bool TUnderlayMask::ValidatePosition(TCellsRef key) const noexcept {
-        auto pos = Position;
-        if (pos == Bounds.begin()) {
-            return true;
-        }
-        auto& prev = *--pos;
-        auto cmp = ComparePartKeys(prev.LastKey.GetCells(), key, *RowScheme->Keys);
-        return cmp < 0 || (cmp == 0 && !prev.LastInclusive);
-    }
-
-    THolder<TUnderlayMask> TUnderlayMask::Build(
-            TIntrusiveConstPtr<TRowScheme> rowScheme,
-            TVector<const TBounds*>& input) noexcept
-    {
-        const TKeyCellDefaults& keyDefaults = *rowScheme->Keys;
-
-        // Sorts heap by the first key (returns true when a->FirstKey < b->FirstKey)
-        auto heapByFirstKeyMin = [&keyDefaults](const TBounds* b, const TBounds* a) noexcept -> bool {
-            TCellsRef left = a->FirstKey.GetCells();
-            TCellsRef right = b->FirstKey.GetCells();
-            if (int cmp = ComparePartKeys(left, right, keyDefaults)) {
-                return cmp < 0;
-            }
-            return a->FirstInclusive && !b->FirstInclusive;
-        };
-
-        // Returns true when a->LastKey < b->LastKey
-        auto lastKeyLess = [&keyDefaults](const TBounds* a, const TBounds* b) noexcept -> bool {
-            TCellsRef left = a->LastKey.GetCells();
-            TCellsRef right = b->LastKey.GetCells();
-            if (int cmp = ComparePartKeys(left, right, keyDefaults)) {
-                return cmp < 0;
-            }
-            return !a->LastInclusive && b->LastInclusive;
-        };
-
-        // Returns true when a->LastKey may be stitched with b->FirstKey
-        auto mayStitch = [&keyDefaults](const TBounds* a, const TBounds* b) noexcept -> bool {
-            TCellsRef left = a->LastKey.GetCells();
-            TCellsRef right = b->FirstKey.GetCells();
-            int cmp = ComparePartKeys(left, right, keyDefaults);
-            if (cmp < 0) {
-                return false;
-            }
-            if (cmp == 0) {
-                return a->LastInclusive || b->FirstInclusive;
-            }
-            return true;
-        };
-
-        TVector<TBounds> results(Reserve(input.size()));
-
-        std::make_heap(input.begin(), input.end(), heapByFirstKeyMin);
-
-        while (input) {
-            const TBounds* first = input.front();
-            std::pop_heap(input.begin(), input.end(), heapByFirstKeyMin);
-            input.pop_back();
-
-            const TBounds* last = first;
-
-            while (input && mayStitch(last, input.front())) {
-                auto* current = input.front();
-                std::pop_heap(input.begin(), input.end(), heapByFirstKeyMin);
-                input.pop_back();
-
-                if (lastKeyLess(last, current)) {
-                    last = current;
-                }
-            }
-
-            results.emplace_back(
-                first->FirstKey,
-                last->LastKey,
-                first->FirstInclusive,
-                last->LastInclusive);
-        }
-
-        return MakeHolder<TUnderlayMask>(std::move(rowScheme), std::move(results));
-    }
-
-    TSplitKeys::TSplitKeys(TIntrusiveConstPtr<TRowScheme> rowScheme, TSplitKeys::TKeysVec keys)
-        : RowScheme(std::move(rowScheme))
-        , Keys(std::move(keys))
-    {
-        Y_VERIFY(ValidateOrder(), "TSplitKeys got keys in an invalid order");
-        Reset();
-    }
-
-    TSplitKeys::TSplitKeys(TIntrusiveConstPtr<TRowScheme> rowScheme, TVector<const TBounds*> bounds)
-        : RowScheme(std::move(rowScheme))
-    {
-        auto boundsLess = [this](const TBounds* a, const TBounds* b) noexcept -> bool {
-            if (auto cmp = ComparePartKeys(a->FirstKey.GetCells(), b->FirstKey.GetCells(), *RowScheme->Keys)) {
-                return cmp < 0;
-            }
-            return a->FirstInclusive && !b->FirstInclusive;
-        };
-        std::sort(bounds.begin(), bounds.end(), boundsLess);
-
-        Keys.reserve(bounds.size());
-        Inclusive.reserve(bounds.size());
-
-        const TBounds* last = nullptr;
-        for (const TBounds* edge : bounds) {
-            if (!last || boundsLess(last, edge)) {
-                Keys.emplace_back(edge->FirstKey);
-                Inclusive.push_back(edge->FirstInclusive);
-            }
-            last = edge;
-        }
-
-        Y_VERIFY(ValidateOrder(), "TSplitKeys generated keys in an invalid order");
-        Reset();
-    }
-
-    bool TSplitKeys::ShouldSplit(TCellsRef key) noexcept {
-        bool splitFound = false;
-        while (Position != Keys.end()) {
-            // TODO: we could use binary search, but it's probably useless
-            const auto& current = *Position;
-            auto cmp = ComparePartKeys(key, current.GetCells(), *RowScheme->Keys);
-            if (cmp < 0 || (cmp == 0 && !IsInclusive(Position))) {
-                break;
-            }
-            splitFound = true;
-            ++Position;
-        }
-        Y_VERIFY(ValidatePosition(key), "TSplitKeys::ShouldSplit called with out of order keys");
-        return splitFound;
-    }
-
-    bool TSplitKeys::ValidateOrder() const noexcept {
-        auto it = Keys.begin();
-        if (it == Keys.end()) {
-            return true;
-        }
-        auto posLess = [this](const TKeysVec::const_iterator& a, const TKeysVec::const_iterator& b) noexcept -> bool {
-            if (auto cmp = ComparePartKeys(a->GetCells(), b->GetCells(), *RowScheme->Keys)) {
-                return cmp < 0;
-            }
-            return IsInclusive(a) && !IsInclusive(b);
-        };
-        auto last = it++;
-        while (it != Keys.end()) {
-            if (!posLess(last, it)) {
-                return false;
-            }
-            last = it++;
-        }
-        return true;
-    }
-
-    bool TSplitKeys::ValidatePosition(TCellsRef key) const noexcept {
-        auto prev = Position;
-        if (prev == Keys.begin()) {
-            return true;
-        }
-        --prev;
-        if (auto cmp = ComparePartKeys(prev->GetCells(), key, *RowScheme->Keys)) {
-            return cmp < 0;
-        }
-        return IsInclusive(prev);
-    }
-
-    bool TSplitKeys::IsInclusive(TVector<TSerializedCellVec>::const_iterator pos) const noexcept {
-        size_t index = pos - Keys.begin();
-        return index >= Inclusive.size() || Inclusive[index];
-    }
-
-    void TShardedCompactionParams::Describe(IOutputStream& out) const noexcept {
-        // TODO: display some additional info
-        TCompactionParams::Describe(out);
-    }
-
-    void TShardedCompactionStrategy::Start(TCompactionState state) {
-        Y_VERIFY(!Policy, "Strategy has already been started");
-
-        const auto* scheme = Backend->TableScheme(Table);
-        Policy = scheme->CompactionPolicy;
-        TableInfo.RowScheme = Backend->RowScheme(Table);
-
-        TShardedStrategyStateInfo header;
-        TVector<ui64> splitKeyIds;
-        TVector<ui64> shardIds;
-
-        if (auto* pRawHeader = state.StateSnapshot.FindPtr(0)) {
-            bool ok = ParseFromStringNoSizeLimit(header, *pRawHeader);
-            Y_VERIFY(ok);
-
-            TableInfo.LastSplitKey = header.GetLastSplitKey();
-            TableInfo.LastShardId = header.GetLastShardId();
-
-            for (ui64 keyId : header.GetSplitKeys()) {
-                auto* pRawKey = state.StateSnapshot.FindPtr(keyId);
-                Y_VERIFY(pRawKey);
-                TableInfo.SplitKeys[keyId] = TSerializedCellVec(*pRawKey);
-                splitKeyIds.emplace_back(keyId);
-            }
-
-            for (ui64 shardId : header.GetShards()) {
-                shardIds.emplace_back(shardId);
-            }
-        }
-
-        const auto& keyDefaults = *TableInfo.RowScheme->Keys;
-        std::sort(splitKeyIds.begin(), splitKeyIds.end(), [&](ui64 a, ui64 b) -> bool {
-            auto left = TableInfo.SplitKeys.at(a).GetCells();
-            auto right = TableInfo.SplitKeys.at(b).GetCells();
-            return ComparePartKeys(left, right, keyDefaults) < 0;
-        });
-
-        Shards.PushBack(new TTableShard());
-        for (ui64 keyId : splitKeyIds) {
-            auto* last = Shards.Back();
-            Shards.PushBack(new TTableShard());
-            auto* shard = Shards.Back();
-            last->RightKey = keyId;
-            shard->LeftKey = keyId;
-        }
-
-        if (shardIds) {
-            Y_VERIFY(shardIds.size() == splitKeyIds.size() + 1,
-                "Have %" PRISZT " shards with %" PRISZT " split keys",
-                shardIds.size(), splitKeyIds.size());
-
-            auto it = shardIds.begin();
-            for (auto& shard : Shards) {
-                Y_VERIFY(it != shardIds.end());
-                shard.Id = *it++;
-            }
-            Y_VERIFY(it == shardIds.end());
-        } else if (splitKeyIds) {
-            // Assume we upgraded from some old dev code, renumber from 1
-            for (auto& shard : Shards) {
-                shard.Id = ++TableInfo.LastShardId;
-            }
-        }
-
-        for (auto& shard : Shards) {
-            // Initialize all shards while empty
-            shard.Levels.Reset(new TLevels(TableInfo.RowScheme->Keys));
-        }
-
-        for (auto& part : Backend->TableColdParts(Table)) {
-            ColdParts.emplace_back(std::move(part));
-            Y_FAIL("Sharded compaction does not support cold parts");
-        }
-
-        auto parts = Backend->TableParts(Table);
-        std::sort(parts.begin(), parts.end(),
-            [](const TPartView& a, const TPartView& b) -> bool {
-                if (a->Epoch != b->Epoch) {
-                    return a->Epoch < b->Epoch;
-                }
-                return a->Label < b->Label;
-            });
-
-        while (parts) {
-            const auto& partView = parts.back();
-            TPartDataSizeHelper helper(partView.Part.Get());
-            ui64 fullSize = helper.CalcSize(0, Max<TRowId>());
-            if (fullSize >= Policy->ShardPolicy.GetMinSliceSize()) {
-                break;
-            }
-            AllBackingSize += partView.Part->BackingSize();
-            NurseryDataSize += fullSize;
-            auto& nurseryItem = Nursery.emplace_back();
-            nurseryItem.PartView = partView;
-            nurseryItem.DataSize = fullSize;
-            parts.pop_back();
-        }
-
-        for (const auto& partView : parts) {
-            Y_VERIFY(partView.Slices && !partView.Slices->empty());
-            EnsureGlobalPart(partView.Part, partView.Slices);
-        }
-
-        AddParts(std::move(parts));
-
-        CheckCompactions();
-
-        // TODO: schedule split/merge if necessary (performed in ApplyChanges)
-    }
-
-    void TShardedCompactionStrategy::Stop() {
-        while (PendingSliceSplits) {
-            auto* opWeak = PendingSliceSplits.Front();
-            auto readId = opWeak->GetReadId();
-            bool ok = Backend->CancelRead(readId);
-            Y_VERIFY(ok, "Failed to cancel read with id %" PRIu64, readId);
-        }
-
-        CancelTask(ForcedCompactionTask);
-        ForcedCompactionPending = false;
-
-        for (auto& shard : Shards) {
-            CancelTask(shard.Task);
-        }
-
-        if (MemCompactionId != 0) {
-            Backend->CancelCompaction(MemCompactionId);
-            MemCompactionId = 0;
-            MemCompactionForced = false;
-            NurseryTaken = 0;
-        }
-
-        // Make it possible to Start again
-        TableInfo = { };
-        Policy = nullptr;
-        Shards.Clear();
-        SliceSplitResults.clear();
-        AllParts.clear();
-        AllBackingSize = 0;
-        Nursery.clear();
-        NurseryDataSize = 0;
-        ColdParts.clear();
-    }
-
-    void TShardedCompactionStrategy::ReflectSchema() {
-        const auto* scheme = Backend->TableScheme(Table);
-
-        TString err;
-        bool ok = NLocalDb::ValidateCompactionPolicyChange(*Policy, *scheme->CompactionPolicy, err);
-        Y_VERIFY(ok, "table %s id %u: %s", scheme->Name.c_str(), scheme->Id, err.c_str());
-
-        Policy = scheme->CompactionPolicy;
-        TableInfo.RowScheme = Backend->RowScheme(Table);
-
-        CheckCompactions();
-    }
-
-    void TShardedCompactionStrategy::ReflectRemovedRowVersions() {
-        // nothing
-    }
-
-    void TShardedCompactionStrategy::UpdateCompactions() {
-        CheckCompactions();
-    }
-
-    float TShardedCompactionStrategy::GetOverloadFactor() {
-        // TODO: implement overload factor
-        return 0.0f;
-    }
-
-    ui64 TShardedCompactionStrategy::GetBackingSize() {
-        return AllBackingSize;
-    }
-
-    ui64 TShardedCompactionStrategy::GetBackingSize(ui64 ownerTabletId) {
-        // FIXME: maybe implement some day
-        Y_UNUSED(ownerTabletId);
-        return AllBackingSize;
-    }
-
-    ui64 TShardedCompactionStrategy::BeginMemCompaction(TTaskId taskId, TSnapEdge edge, ui64 forcedCompactionId) {
-        auto params = MakeHolder<TShardedCompactionParams>();
-        params->Table = Table;
-        params->TaskId = taskId;
-        params->Edge = edge;
-        params->KeepInCache = true;
-
-        NurseryTaken = 0;
-        if (edge.Head == TEpoch::Max() && Nursery && forcedCompactionId == 0) {
-            ui64 expectedSize = Backend->TableMemSize(Table, edge.Head);
-            if (expectedSize > 0) {
-                bool takeAll = (
-                    (expectedSize + NurseryDataSize) >= (2 * Policy->ShardPolicy.GetMinSliceSize()) ||
-                    (Nursery.size() >= Policy->ShardPolicy.GetMaxSlicesPerLevel()));
-                while (NurseryTaken < Nursery.size()) {
-                    auto& next = Nursery[NurseryTaken];
-                    if (next.DataSize > expectedSize && !takeAll) {
-                        break;
-                    }
-                    params->Parts.push_back(next.PartView);
-                    expectedSize += next.DataSize;
-                    ++NurseryTaken;
-                }
-            }
-        }
-
-        if (!Policy->KeepEraseMarkers) {
-            params->IsFinal = NurseryTaken == Nursery.size() && AllParts.empty();
-        } else {
-            params->IsFinal = false;
-        }
-
-        if (!params->IsFinal && !Policy->KeepEraseMarkers) {
-            TVector<const TBounds*> allBounds;
-            for (size_t pos = NurseryTaken; pos < Nursery.size(); ++pos) {
-                auto& item = Nursery[pos];
-                for (const auto& slice : *item.PartView.Slices) {
-                    allBounds.push_back(&slice);
-                }
-            }
-            for (auto& kv : AllParts) {
-                for (auto& slice : *kv.second.Slices) {
-                    allBounds.push_back(&slice);
-                }
-            }
-            Y_VERIFY(allBounds, "Unexpected lack of slice bounds");
-            params->UnderlayMask = TUnderlayMask::Build(TableInfo.RowScheme, allBounds);
-        }
-
-        if (TableInfo.SplitKeys) {
-            TVector<TSerializedCellVec> splitKeys;
-            auto* last = Shards.Back();
-            auto* shard = Shards.Front();
-            while (shard != last) {
-                splitKeys.push_back(TableInfo.SplitKeys.at(shard->RightKey));
-                shard = shard->Next()->Node();
-            }
-            Y_VERIFY(splitKeys, "Unexpected lack of split keys");
-            params->SplitKeys = MakeHolder<TSplitKeys>(TableInfo.RowScheme, std::move(splitKeys));
-        }
-
-        MemCompactionId = Backend->BeginCompaction(std::move(params));
-        MemCompactionForced = forcedCompactionId != 0;
-        return MemCompactionId;
-    }
-
-    TCompactionChanges TShardedCompactionStrategy::CompactionFinished(
-            ui64 compactionId,
-            THolder<TCompactionParams> paramsRaw,
-            THolder<TCompactionResult> result)
-    {
-        auto* params = CheckedCast<TShardedCompactionParams*>(paramsRaw.Get());
-        bool processNursery = false;
-        bool doNurseryFlush = false;
-
-        if (auto* shard = params->InputShard) {
-            Y_VERIFY(shard->Task.State == EState::Compacting);
-            Y_VERIFY(shard->Task.CompactionId == compactionId);
-            shard->Task.State = EState::Free;
-            shard->Task.TaskId = 0;
-            shard->Task.CompactionId = 0;
-            shard->FailingLevel = Max<size_t>();
-
-            // Every compacted part should have the original from our state
-            Y_VERIFY(params->Parts.size() == params->Original.size());
-        } else if (compactionId == ForcedCompactionTask.CompactionId) {
-            Y_VERIFY(ForcedCompactionTask.State == EState::Compacting);
-            ForcedCompactionTask.State = EState::Free;
-            ForcedCompactionTask.TaskId = 0;
-            ForcedCompactionTask.CompactionId = 0;
-        } else {
-            Y_VERIFY(MemCompactionId == compactionId);
-            MemCompactionId = 0;
-
-            if (MemCompactionForced) {
-                doNurseryFlush = true;
-                MemCompactionForced = false;
-                ForcedCompactionPending = true;
-            } else {
-                processNursery = true;
-            }
-
-            // Mem compaction only uses nursery, no reused or original parts expected
-            Y_VERIFY(!params->Original);
-            Y_VERIFY(!params->Reused);
-
-            for (const auto& partView : params->Parts) {
-                Y_VERIFY(NurseryTaken > 0);
-                auto& item = Nursery.front();
-                Y_VERIFY(partView.Part == item.PartView.Part);
-                AllBackingSize -= partView.Part->BackingSize();
-                NurseryDataSize -= item.DataSize;
-                Nursery.pop_front();
-                --NurseryTaken;
-            }
-            Y_VERIFY(NurseryTaken == 0);
-            params->Parts.clear();
-        }
-
-        // These parts may have become garbage after compaction
-        THashSet<TGlobalPart*> updateGarbageQueue;
-
-        // Remove compacted slices from global parts
-        for (const auto& partView : params->Parts) {
-            auto label = partView->Label;
-            auto* allInfo = AllParts.FindPtr(label);
-            Y_VERIFY(allInfo, "Compacted part %s is not registered", label.ToString().c_str());
-
-            allInfo->Slices = TSlices::Subtract(allInfo->Slices, partView.Slices);
-            updateGarbageQueue.emplace(allInfo);
-        }
-
-        // Remove originally chosen slices from our state
-        for (auto& partView : params->Original) {
-            auto label = partView->Label;
-            auto* allInfo = AllParts.FindPtr(label);
-            Y_VERIFY(allInfo, "Compacted part %s is not registered", label.ToString().c_str());
-
-            size_t removedCount = 0;
-
-            // TODO: make it more efficient
-            auto writePos = allInfo->Shards.begin();
-            auto readPos = allInfo->Shards.begin();
-            while (readPos != allInfo->Shards.end()) {
-                auto* partShard = *readPos;
-                auto* info = partShard->Parts.FindPtr(label);
-                Y_VERIFY(info, "Compacted part %s cannot be found in a shard", label.ToString().c_str());
-                for (const auto& input : *partView.Slices) {
-                    auto pos = info->Slices.find(input);
-                    if (pos != info->Slices.end()) {
-                        auto& item = pos->second;
-                        partShard->Levels.Reset();
-                        partShard->PerLevelStats.clear();
-                        partShard->PerLevelGarbage.clear();
-                        partShard->Stats -= item.Stats;
-                        info->Slices.erase(pos);
-                        ++removedCount;
-                    }
-                }
-                if (info->Slices.empty()) {
-                    // This part no longer contributes to this shard
-                    partShard->Parts.erase(label);
-                    ++readPos;
-                    continue;
-                }
-                if (writePos != readPos) {
-                    *writePos = *readPos;
-                }
-                ++writePos;
-                ++readPos;
-            }
-            if (writePos != allInfo->Shards.end()) {
-                allInfo->Shards.erase(writePos, allInfo->Shards.end());
-            }
-
-            Y_VERIFY(removedCount == partView.Slices->size(), "Not all slices have been removed");
-
-            if (allInfo->Shards.empty() && allInfo->SplitBlocks == 0 && allInfo->Slices->empty()) {
-                updateGarbageQueue.erase(allInfo);
-                TableInfo.GarbageParts.erase(label);
-                AllBackingSize -= allInfo->Part->BackingSize();
-                AllParts.erase(label);
-            }
-        }
-
-        for (auto* allInfo : updateGarbageQueue) {
-            UpdateGarbageStats(allInfo);
-        }
-
-        TVector<TPartView> parts;
-
-        auto flushNursery = [this, &parts]() {
-            while (Nursery) {
-                // Go from oldest to newest
-                auto& item = Nursery.back();
-                auto& partView = item.PartView;
-                AllBackingSize -= partView.Part->BackingSize();
-                NurseryDataSize -= item.DataSize;
-                parts.push_back(std::move(partView));
-                Nursery.pop_back();
-            }
-        };
-
-        if (doNurseryFlush) {
-            flushNursery();
-        }
-
-        if (processNursery) {
-            // We expect parts to be from the same epoch and in correct order
-            for (auto& partView : result->Parts) {
-                TPartDataSizeHelper helper(partView.Part.Get());
-                ui64 fullSize = helper.CalcSize(0, Max<TRowId>());
-                if (fullSize >= Policy->ShardPolicy.GetMinSliceSize()) {
-                    flushNursery();
-                    parts.push_back(std::move(partView));
-                    continue;
-                }
-                // Add new item to front (it's the newest)
-                AllBackingSize += partView.Part->BackingSize();
-                NurseryDataSize += fullSize;
-                auto& nurseryItem = Nursery.emplace_front();
-                nurseryItem.PartView = std::move(partView);
-                nurseryItem.DataSize = fullSize;
-            }
-        } else {
-            parts.reserve(parts.size() + result->Parts.size());
-            for (auto& partView : result->Parts) {
-                parts.push_back(std::move(partView));
-            }
-        }
-        result->Parts.clear();
-
-        // Register all "new" parts in the global state
-        for (const auto& partView : parts) {
-            Y_VERIFY(partView.Slices && !partView.Slices->empty());
-            EnsureGlobalPart(partView.Part, partView.Slices);
-        }
-
-        // Add any reused slices back to our state
-        if (params->Reused) {
-            parts.reserve(parts.size() + params->Reused.size());
-            for (auto& partView : params->Reused) {
-                parts.emplace_back(std::move(partView));
-            }
-            params->Reused.clear();
-        }
-
-        AddParts(std::move(parts));
-
-        return ApplyChanges();
-    }
-
-    void TShardedCompactionStrategy::PartMerged(TPartView part, ui32 level) {
-        Y_VERIFY(level == 255, "Unexpected level of the merged part");
-
-        Y_VERIFY(part.Slices && !part.Slices->empty());
-        EnsureGlobalPart(part.Part, part.Slices);
-
-        TVector<TPartView> parts(Reserve(1));
-        parts.emplace_back(std::move(part));
-        AddParts(std::move(parts));
-    }
-
-    void TShardedCompactionStrategy::PartMerged(TIntrusiveConstPtr<TColdPart> part, ui32 level) {
-        Y_VERIFY(level == 255, "Unexpected level of the merged part");
-
-        ColdParts.emplace_back(std::move(part));
-        Y_FAIL("Sharded compaction does not support cold parts");
-    }
-
-    TCompactionChanges TShardedCompactionStrategy::PartsRemoved(TArrayRef<const TLogoBlobID> parts) {
-        Y_UNUSED(parts);
-
-        // For simplicity just stop and start again
-        auto state = SnapshotState();
-
-        Stop();
-
-        Start(std::move(state));
-
-        // We don't keep per-part state, so no changes
-        return { };
-    }
-
-    TCompactionChanges TShardedCompactionStrategy::ApplyChanges() {
-        RequestChangesPending = false;
-
-        TCompactionChanges changes{ };
-
-        struct TState : public TIntrusiveListItem<TState> {
-            size_t Index = Max<size_t>();
-        };
-
-        TIntrusiveList<TState> needSort;
-        THashMap<TLogoBlobID, TState> byLabel;
-
-        TVector<TPartView> parts(Reserve(SliceSplitResults.size()));
-
-        for (auto& result : SliceSplitResults) {
-            Y_VERIFY(result.Part, "Unexpected result without a part");
-
-            TChangeSlices* change;
-
-            auto& state = byLabel[result.Part->Label];
-            if (state.Index != Max<size_t>()) {
-                needSort.PushBack(&state);
-                change = &changes.SliceChanges[state.Index];
-            } else {
-                state.Index = changes.SliceChanges.size();
-                change = &changes.SliceChanges.emplace_back();
-                change->Label = result.Part->Label;
-            }
-
-            // New slices are guaranteed to be already sorted
-            change->NewSlices.insert(change->NewSlices.end(),
-                result.NewSlices.begin(), result.NewSlices.end());
-
-            auto* allInfo = AllParts.FindPtr(result.Part->Label);
-            Y_VERIFY(allInfo, "Cannot find a globally registered part %s", result.Part->Label.ToString().c_str());
-            allInfo->SplitBlocks--;
-
-            // Construct a fake TPartView so we may reuse AddParts method
-            parts.emplace_back(TPartView{ std::move(result.Part), nullptr, new TSlices(std::move(result.NewSlices)) });
-
-            // All affected shards are no longer blocked by this op
-            for (auto* affectedShard : result.Shards) {
-                affectedShard->SplitBlocks--;
-            }
-        }
-        SliceSplitResults.clear();
-
-        // We may need to sort some changes
-        for (const auto& state : needSort) {
-            auto cmpByRowId = [](const TSlice& a, const TSlice& b) noexcept -> bool {
-                return a.EndRowId() <= b.BeginRowId();
-            };
-            auto& change = changes.SliceChanges[state.Index];
-            std::sort(change.NewSlices.begin(), change.NewSlices.end(), cmpByRowId);
-        }
-
-        // Apply changes to our global state (matches the state after method returns)
-        for (const auto& change : changes.SliceChanges) {
-            auto* allInfo = AllParts.FindPtr(change.Label);
-            Y_VERIFY(allInfo, "Generated changes to unregistered part %s", change.Label.ToString().c_str());
-            allInfo->Slices = TSlices::Replace(allInfo->Slices, change.NewSlices);
-        }
-
-        // We couldn't see these slices during split, add them back
-        AddParts(std::move(parts));
-
-        // Check for split/merge unless forced compaction is currently running
-        if (ForcedCompactionTask.State != EState::Compacting) {
-            const auto& policy = Policy->ShardPolicy;
-
-            auto isFreeShard = [](TTableShard* shard) -> bool {
-                if (shard->SplitBlocks) {
-                    return false;
-                }
-                return shard->Task.State == EState::Free || shard->Task.State == EState::Pending;
-            };
-
-            bool stateChanged = false;
-            const auto minShardSize = policy.GetMinShardSize();
-            const auto maxShardSize = policy.GetMaxShardSize();
-
-            // Split shards that are too big
-            if (maxShardSize > 0) {
-                TVector<TTableShard*> bigShards;
-                for (auto& shard : Shards) {
-                    if (isFreeShard(&shard) && shard.Stats.Size > maxShardSize) {
-                        if (auto logl = Logger->Log(NUtil::ELnLev::Debug)) {
-                            logl << "Table " << Table << " has a big shard " << shard.Id << " with "
-                                << shard.Stats.Size << " > " << maxShardSize << " bytes";
-                        }
-
-                        bigShards.push_back(&shard);
-                    }
-                }
-
-                parts.clear();
-                for (auto* shard : bigShards) {
-                    TSerializedCellVec splitKey;
-                    if (shard->FindSplitKey(splitKey, *TableInfo.RowScheme->Keys)) {
-                        ui64 newKeyId = ++TableInfo.LastSplitKey;
-
-                        if (auto logl = Logger->Log(NUtil::ELnLev::Debug)) {
-                            logl << "Table " << Table << " adding shard key " << newKeyId
-                                << ": " << TPrintableTypedCells(splitKey.GetCells(), TableInfo.RowScheme->Keys->BasicTypes());
-                        }
-
-                        changes.StateChanges[newKeyId] = splitKey.GetBuffer();
-                        TableInfo.SplitKeys[newKeyId] = splitKey;
-
-                        auto left = MakeHolder<TTableShard>();
-                        auto right = MakeHolder<TTableShard>();
-                        left->Id = ++TableInfo.LastShardId;
-                        left->LeftKey = shard->LeftKey;
-                        left->RightKey = newKeyId;
-                        right->Id = ++TableInfo.LastShardId;
-                        right->LeftKey = newKeyId;
-                        right->RightKey = shard->RightKey;
-
-                        for (auto& kvPart : shard->Parts) {
-                            TIntrusiveConstPtr<TPart> part = kvPart.second.Part;
-                            TVector<TSlice> slices(Reserve(kvPart.second.Slices.size()));
-                            for (auto& kvSlice : kvPart.second.Slices) {
-                                slices.emplace_back(kvSlice.second.Slice);
-                            }
-
-                            TGlobalPart* allInfo = AllParts.FindPtr(part->Label);
-                            Y_VERIFY(allInfo);
-                            auto end = std::remove(allInfo->Shards.begin(), allInfo->Shards.end(), shard);
-                            allInfo->Shards.erase(end, allInfo->Shards.end());
-
-                            // Construct a fake TPartView so we may reuse AddParts method
-                            parts.emplace_back(TPartView{std::move(part), nullptr, new TSlices(std::move(slices))});
-                        }
-
-                        left.Release()->LinkBefore(shard);
-                        right.Release()->LinkBefore(shard);
-
-                        CancelTask(shard->Task);
-                        delete shard;
-
-                        stateChanged = true;
-                    }
-                }
-
-                if (parts) {
-                    // Add back all the slices we have taken from splits
-                    AddParts(std::move(parts));
-                }
-            }
-
-            // Merge shards that are too small
-            if (Shards.Size() > 1) {
-                using TShardList = TList<TTableShard*>;
-                using TShardHash = THashMap<TTableShard*, TShardList::iterator>;
-                TShardList smallShardList;
-                TShardHash smallShardHash;
-                for (auto& shard : Shards) {
-                    if (isFreeShard(&shard) && shard.Stats.Size <= minShardSize) {
-                        if (auto logl = Logger->Log(NUtil::ELnLev::Debug)) {
-                            logl << "Table " << Table << " has a small shard " << shard.Id << " with "
-                                << shard.Stats.Size << " > " << minShardSize << " bytes";
-                        }
-
-                        smallShardHash[&shard] = smallShardList.insert(smallShardList.end(), &shard);
-                    }
-                }
-
-                // sort small shards by their data size
-                smallShardList.sort([](TTableShard* a, TTableShard* b) -> bool {
-                    return a->Stats.Size < b->Stats.Size;
-                });
-
-                parts.clear();
-                while (!smallShardList.empty()) {
-                    auto* first = smallShardList.front();
-                    smallShardHash.erase(first);
-                    smallShardList.pop_front();
-
-                    // Empty shards are a special case, it's ok for them to just disappear
-                    // This also doesn't change the state of their neighbors in any way
-                    // The only side effect would be for those shards to become wider
-                    if (first->Parts.empty()) {
-                        auto* shard = first;
-                        Y_VERIFY(!shard->SplitBlocks);
-
-                        ui64 removedKey = 0;
-                        if (shard != Shards.Back()) {
-                            removedKey = shard->RightKey;
-                            Y_VERIFY(removedKey != 0);
-                            auto* target = shard->Next()->Node();
-                            target->LeftKey = shard->LeftKey;
-                        } else if (shard != Shards.Front()) {
-                            removedKey = shard->LeftKey;
-                            Y_VERIFY(removedKey != 0);
-                            auto* target = shard->Prev()->Node();
-                            target->RightKey = shard->RightKey;
-                        }
-
-                        if (removedKey) {
-                            auto it = TableInfo.SplitKeys.find(removedKey);
-                            Y_VERIFY(it != TableInfo.SplitKeys.end());
-
-                            if (auto logl = Logger->Log(NUtil::ELnLev::Debug)) {
-                                logl << "Table " << Table << " removing shard key " << it->first
-                                    << ": " << TPrintableTypedCells(it->second.GetCells(), TableInfo.RowScheme->Keys->BasicTypes());
-                            }
-
-                            changes.StateChanges[removedKey] = { };
-                            TableInfo.SplitKeys.erase(it);
-
-                            CancelTask(shard->Task);
-                            delete shard;
-
-                            stateChanged = true;
-                        }
-
-                        continue;
-                    }
-
-                    auto* last = first;
-                    ui64 mergedSize = first->Stats.Size;
-
-                    while (true) {
-                        auto* left = first != Shards.Front() ? first->Prev()->Node() : nullptr;
-                        auto* right = last != Shards.Back() ? last->Next()->Node() : nullptr;
-                        if (left && !smallShardHash.contains(left)) {
-                            left = nullptr;
-                        }
-                        if (right && !smallShardHash.contains(right)) {
-                            right = nullptr;
-                        }
-                        if (left && right) {
-                            if (left->Stats.Size < right->Stats.Size) {
-                                right = nullptr;
-                            } else {
-                                left = nullptr;
-                            }
-                        }
-                        if (left) {
-                            if (mergedSize + left->Stats.Size > maxShardSize && maxShardSize > 0) {
-                                break;
-                            }
-                            first = left;
-                            smallShardList.erase(smallShardHash.at(left));
-                            smallShardHash.erase(left);
-                            mergedSize += left->Stats.Size;
-                        } else if (right) {
-                            if (mergedSize + right->Stats.Size > maxShardSize && maxShardSize > 0) {
-                                break;
-                            }
-                            last = right;
-                            smallShardList.erase(smallShardHash.at(right));
-                            smallShardHash.erase(right);
-                            mergedSize += right->Stats.Size;
-                        } else {
-                            break;
-                        }
-                    }
-
-                    if (first != last) {
-                        auto merged = MakeHolder<TTableShard>();
-                        merged->Id = ++TableInfo.LastShardId;
-                        merged->LeftKey = first->LeftKey;
-                        merged->RightKey = last->RightKey;
-                        merged.Release()->LinkBefore(first);
-
-                        TTableShard* next = first;
-                        while (next) {
-                            auto* shard = next;
-                            next = (shard != last) ? shard->Next()->Node() : nullptr;
-
-                            for (auto& kvPart : shard->Parts) {
-                                TIntrusiveConstPtr<TPart> part = kvPart.second.Part;
-                                TVector<TSlice> slices(Reserve(kvPart.second.Slices.size()));
-                                for (auto& kvSlice : kvPart.second.Slices) {
-                                    slices.emplace_back(kvSlice.second.Slice);
-                                }
-
-                                TGlobalPart* allInfo = AllParts.FindPtr(part->Label);
-                                Y_VERIFY(allInfo);
-                                auto end = std::remove(allInfo->Shards.begin(), allInfo->Shards.end(), shard);
-                                allInfo->Shards.erase(end, allInfo->Shards.end());
-
-                                // Construct a fake TPartView so we may reuse AddParts method
-                                parts.emplace_back(TPartView{std::move(part), nullptr, new TSlices(std::move(slices))});
-                            }
-
-                            if (shard != first) {
-                                auto it = TableInfo.SplitKeys.find(shard->LeftKey);
-                                Y_VERIFY(it != TableInfo.SplitKeys.end());
-
-                                if (auto logl = Logger->Log(NUtil::ELnLev::Debug)) {
-                                    logl << "Table " << Table << " removing shard key " << it->first
-                                        << ": " << TPrintableTypedCells(it->second.GetCells(), TableInfo.RowScheme->Keys->BasicTypes());
-                                }
-
-                                changes.StateChanges[shard->LeftKey] = { };
-                                TableInfo.SplitKeys.erase(it);
-                            }
-
-                            CancelTask(shard->Task);
-                            delete shard;
-
-                            stateChanged = true;
-                        }
-                    }
-                }
-
-                if (parts) {
-                    // Add back all the slices we have taken from merges
-                    AddParts(std::move(parts));
-                }
-            }
-
-            if (stateChanged) {
-                SerializeStateInfo(&changes.StateChanges[0]);
-            }
-        }
-
-        CheckCompactions();
-
-        return changes;
-    }
-
-    TCompactionState TShardedCompactionStrategy::SnapshotState() {
-        TCompactionState state;
-
-        SerializeStateInfo(&state.StateSnapshot[0]);
-        for (auto& kv : TableInfo.SplitKeys) {
-            state.StateSnapshot[kv.first] = kv.second.GetBuffer();
-        }
-
-        return state;
-    }
-
-    bool TShardedCompactionStrategy::AllowForcedCompaction() {
-        return !MemCompactionForced && !ForcedCompactionPending && ForcedCompactionTask.State == EState::Free;
-    }
-
-    void TShardedCompactionStrategy::OutputHtml(IOutputStream &out) {
-        HTML(out) {
-            if (ForcedCompactionPending || ForcedCompactionTask.State != EState::Free) {
-                DIV_CLASS("row") {
-                    out << "Forced compaction";
-                    if (ForcedCompactionPending) {
-                        out << " waiting";
-                    }
-                    out << ", " << ForcedCompactionTask.State;
-                    if (ForcedCompactionTask.State != EState::Free) {
-                        out << ", Task #" << ForcedCompactionTask.TaskId;
-                        out << " (priority " << ForcedCompactionTask.Priority << ")";
-                        out << " submitted " << ForcedCompactionTask.SubmissionTimestamp.ToStringLocal();
-                    }
-                }
-            }
-
-            for (auto& shard : Shards) {
-                DIV_CLASS("row") {
-                    out << "Shard " << shard.Id << ' ';
-                    out << TPrintableShardRange(shard, TableInfo);
-                    out << ", " << shard.Parts.size() << " parts";
-                    if (shard.Levels) {
-                        out << ", " << shard.Levels->size() << " levels";
-                    } else if (shard.SplitBlocks) {
-                        out << ", " << shard.SplitBlocks << " slice splits";
-                    }
-                    out << ", " << shard.Stats.Size << " bytes";
-                    out << ", " << shard.Stats.Rows << " rows";
-                    out << ", " << shard.Task.State;
-                    if (shard.Task.State != EState::Free) {
-                        out << ", Task #" << shard.Task.TaskId;
-                        out << " (priority " << shard.Task.Priority << ")";
-                        out << " submitted " << shard.Task.SubmissionTimestamp.ToStringLocal();
-                    }
-                }
-            }
-
-            PRE() {
-                for (const auto& item : Nursery) {
-                    const auto* part = item.PartView.Part.Get();
-                    const auto& label = part->Label;
-                    out << "Genstep: " << label.Generation() << ":" << label.Step();
-                    out << " epoch " << part->Epoch;
-                    out << ", Backing size: " << part->BackingSize();
-                    out << ", Base: " << label;
-                    out << ", nursery";
-                    out << Endl;
-                }
-
-                TVector<const TPart*>
parts(Reserve(AllParts.size())); - for (auto& kv : AllParts) { - parts.push_back(kv.second.Part.Get()); - } - std::sort(parts.begin(), parts.end(), [](const TPart* a, const TPart* b) -> bool { - if (a->Epoch != b->Epoch) { - return a->Epoch > b->Epoch; - } - return a->Label > b->Label; - }); - for (const TPart* part : parts) { - auto& label = part->Label; - out << "Genstep: " << label.Generation() << ":" << label.Step(); - out << " epoch " << part->Epoch; - out << ", Backing size: " << part->BackingSize(); - out << ", Base: " << label; - auto& info = AllParts.at(label); - out << ", " << info.Slices->size() << " slices"; - if (info.Shards) { - out << ", " << info.Shards.size() << " shards"; - } - if (info.SplitBlocks) { - out << ", " << info.SplitBlocks << " slice splits"; - } - out << Endl; - } - } - } - } - - void TShardedCompactionStrategy::OnSliceSplitResult(TSliceSplitResult result) { - SliceSplitResults.emplace_back(std::move(result)); - RequestChanges(); - } - - void TShardedCompactionStrategy::RequestChanges() noexcept { - if (!RequestChangesPending) { - RequestChangesPending = true; - Backend->RequestChanges(Table); - } - } - - void TShardedCompactionStrategy::CheckCompactions() noexcept { - CheckForcedCompaction(); - - for (auto& shard : Shards) { - CheckShardCompaction(&shard); - } - } - - void TShardedCompactionStrategy::AddParts(TVector<TPartView> parts) { - // Types and default values for current table keys - const auto& keyDefaults = *TableInfo.RowScheme->Keys; - - // A part/slice combination we scheduled to add - struct TItem { - TIntrusiveConstPtr<TPart> Part; - TSlice Slice; - - TItem(TIntrusiveConstPtr<TPart> part, TSlice slice) - : Part(std::move(part)) - , Slice(std::move(slice)) - { } - }; - - // Sorts items by their first key - auto cmpByFirstKey = [&keyDefaults](const TItem& a, const TItem& b) noexcept -> bool { - auto left = a.Slice.FirstKey.GetCells(); - auto right = b.Slice.FirstKey.GetCells(); - if (int cmp = ComparePartKeys(left, right, keyDefaults)) { - return cmp < 0; - } - return a.Slice.FirstInclusive && !b.Slice.FirstInclusive; - }; - - // Returns true if the item is completely to the left of boundary - auto cmpFullyInside = [&keyDefaults](const TItem& a, const TSerializedCellVec& boundary) noexcept -> bool { - auto left = a.Slice.LastKey.GetCells(); - if (Y_UNLIKELY(!left)) { - return false; // +inf - } - if (int cmp = ComparePartKeys(left, boundary.GetCells(), keyDefaults)) { - return cmp < 0; - } - return !a.Slice.LastInclusive; - }; - - // Returns true if the item is completely to the right of boundary - auto cmpFullyOutside = [&keyDefaults](const TItem& a, const TSerializedCellVec& boundary) noexcept -> bool { - auto right = a.Slice.FirstKey.GetCells(); - if (Y_UNLIKELY(!right)) { - return false; // -inf - } - return ComparePartKeys(boundary.GetCells(), right, keyDefaults) <= 0; - }; - - // A part/slice combination that crosses at least one shard boundary - struct TSplitItem { - TIntrusiveConstPtr<TPart> Part; - TSlice Slice; - TVector<TTableShard*> Shards; - - TSplitItem(TIntrusiveConstPtr<TPart> part, TSlice slice, TTableShard* firstShard) - : Part(std::move(part)) - , Slice(std::move(slice)) - { - Shards.push_back(firstShard); - } - }; - - // Heap order of split items by their last key - auto cmpHeapByLastKey = [&keyDefaults](const TSplitItem& a, const TSplitItem& b) noexcept -> bool { - auto left = b.Slice.LastKey.GetCells(); - auto right = a.Slice.LastKey.GetCells(); - if (int cmp = ComparePartKeys(left, right, keyDefaults)) { - return cmp < 0; - 
} - return !b.Slice.LastInclusive && a.Slice.LastInclusive; - }; - - // Returns true if item is completely to the left of boundary - auto cmpSplitFullyInside = [&keyDefaults](const TSplitItem& a, const TSerializedCellVec& boundary) noexcept -> bool { - auto left = a.Slice.LastKey.GetCells(); - if (Y_UNLIKELY(!left)) { - return false; // +inf - } - if (int cmp = ComparePartKeys(left, boundary.GetCells(), keyDefaults)) { - return cmp < 0; - } - return !a.Slice.LastInclusive; - }; - - struct TPendingState { - TTableShard* const Shard; - TTablePart* const Info; - TPartDataSizeHelper SizeHelper; - const bool IsGarbage; - - TPendingState(TTableShard* shard, TTablePart* info, bool isGarbage) noexcept - : Shard(shard) - , Info(info) - , SizeHelper(info->Part.Get()) - , IsGarbage(isGarbage) - { - } - - void AddSlice(TSlice slice) noexcept { - TTablePart::TSliceId sliceId(slice); - auto r = Info->Slices.emplace( - std::piecewise_construct, - std::forward_as_tuple(sliceId), - std::forward_as_tuple()); - - Y_VERIFY(r.second, "Duplicate slices for rows [%" PRIu64 ",%" PRIu64 ")", - sliceId.Begin, sliceId.End); - - auto& item = r.first->second; - item.Slice = std::move(slice); - item.Stats.Init(Info->Part.Get(), item.Slice, SizeHelper); - - Shard->Stats += item.Stats; - - if (Shard->Levels) { - if (Info->Part->Epoch >= Shard->Levels->GetMaxEpoch()) { - Shard->RegisterItem(*Info, item, IsGarbage); - } else { - // Adding new slices is no longer trivial - Shard->Levels.Reset(); - } - } - } - }; - - size_t slicesCount = 0; - for (auto& partView : parts) { - Y_VERIFY(!partView.Slices->empty(), "Attempt to add part without slices"); - - slicesCount += partView.Slices->size(); - - auto* allInfo = AllParts.FindPtr(partView->Label); - Y_VERIFY(allInfo, "Added part %s is not globally registered", partView->Label.ToString().c_str()); - } - - // We may need to split slices over multiple shards - if (Shards.Size() > 1) { - - TVector<TItem> queue(Reserve(slicesCount)); - for (auto& partView : parts) { - for (const auto& slice : *partView.Slices) { - queue.emplace_back(partView.Part, slice); - } - } - std::sort(queue.begin(), queue.end(), cmpByFirstKey); - - TTableShard* shard = Shards.Front(); - TSerializedCellVec* shardRightKey = TableInfo.SplitKeys.FindPtr(shard->RightKey); - Y_VERIFY(shardRightKey); - - TTableShard* lastShard = Shards.Back(); - - TVector<TItem*> pending; - auto flushPending = [&]() noexcept -> void { - std::sort(pending.begin(), pending.end(), [](const TItem* a, const TItem* b) noexcept -> bool { - if (a->Part->Epoch != b->Part->Epoch) { - // Prefer older epochs first - return a->Part->Epoch < b->Part->Epoch; - } - if (a->Part.Get() != b->Part.Get()) { - // Prefer same-epoch parts in their natural order - return a->Part->Label < b->Part->Label; - } - // Prefer same-part slices in their natural order - return a->Slice.BeginRowId() < b->Slice.BeginRowId(); - }); - - std::optional<TPendingState> state; - - for (auto* pItem : pending) { - if (!state || state->Info->Part.Get() != pItem->Part.Get()) { - state.reset(); - auto label = pItem->Part->Label; - state.emplace(shard, EnsurePart(shard, std::move(pItem->Part)), TableInfo.GarbageParts.contains(label)); - } - state->AddSlice(std::move(pItem->Slice)); - } - - state.reset(); - - pending.clear(); - }; - - TVector<TSplitItem> splitQueue; - auto flushShard = [&]() noexcept -> void { - Y_VERIFY(shard != lastShard); - - // Move to the next shard - shard = shard->Next()->Node(); - - if (shard != lastShard) { - shardRightKey = 
TableInfo.SplitKeys.FindPtr(shard->RightKey); - Y_VERIFY(shardRightKey); - } else { - shardRightKey = nullptr; - } - - if (splitQueue) { - // Everything left in split queue crosses into the next shard - for (auto& splitItem : splitQueue) { - splitItem.Shards.push_back(shard); - } - shard->SplitBlocks += splitQueue.size(); - - // Don't even try building levels for this shard - shard->Levels.Reset(); - shard->PerLevelStats.clear(); - } - - while (!splitQueue.empty() && (shard == lastShard || cmpSplitFullyInside(splitQueue.front(), *shardRightKey))) { - std::pop_heap(splitQueue.begin(), splitQueue.end(), cmpHeapByLastKey); - auto& splitItem = splitQueue.back(); - - AllParts.at(splitItem.Part->Label).SplitBlocks++; - - auto op = MakeHolder<TSliceSplitOp>( - this, - &TableInfo, - std::move(splitItem.Shards), - std::move(splitItem.Part), - std::move(splitItem.Slice)); - - auto* opWeak = op.Get(); - PendingSliceSplits.PushBack(opWeak); - - if (auto readId = Backend->BeginRead(std::move(op))) { - // The read operation is currently pending - // Remember its id so we may cancel it later - opWeak->SetReadId(readId); - } - - splitQueue.pop_back(); - } - }; - - auto pos = queue.begin(); - while (pos != queue.end()) { - auto& item = *pos; - - if (shard == lastShard || cmpFullyInside(item, *shardRightKey)) { - pending.emplace_back(&item); - ++pos; - continue; - } - - if (cmpFullyOutside(item, *shardRightKey)) { - flushPending(); - flushShard(); - continue; - } - - // This is the first shard where slice crosses into the next shard - splitQueue.emplace_back(std::move(item.Part), std::move(item.Slice), shard); - std::push_heap(splitQueue.begin(), splitQueue.end(), cmpHeapByLastKey); - shard->SplitBlocks++; - ++pos; - - // Don't even try building levels for this shard - shard->Levels.Reset(); - shard->PerLevelStats.clear(); - } - - // Flush any pending slices for the last shard - flushPending(); - - while (splitQueue && shard != lastShard) { - flushShard(); - } - - Y_VERIFY(pending.empty()); - Y_VERIFY(pos == queue.end()); - Y_VERIFY(splitQueue.empty()); - - } else { - - // Avoid complex logic when there's a single shard - auto* shard = Shards.Front(); - - // Make sure new parts are sorted by epoch - std::sort(parts.begin(), parts.end(), [](const TPartView& a, const TPartView& b) -> bool { - if (a->Epoch != b->Epoch) { - return a->Epoch < b->Epoch; - } - return a->Label < b->Label; - }); - - for (auto& partView : parts) { - auto label = partView->Label; - - TPendingState state(shard, EnsurePart(shard, std::move(partView.Part)), TableInfo.GarbageParts.contains(label)); - - for (const auto& slice : *partView.Slices) { - state.AddSlice(slice); - } - } - } - } - - TShardedCompactionStrategy::TGlobalPart* TShardedCompactionStrategy::EnsureGlobalPart( - const TIntrusiveConstPtr<TPart>& part, - const TIntrusiveConstPtr<TSlices>& slices) noexcept - { - auto* allInfo = AllParts.FindPtr(part->Label); - if (!allInfo) { - TPartDataSizeHelper helper(part.Get()); - allInfo = &AllParts[part->Label]; - allInfo->Part = part; - allInfo->TotalSize = helper.CalcSize(0, Max<TRowId>()); - allInfo->GarbageSize = 0; - AllBackingSize += part->BackingSize(); - } - allInfo->Slices = TSlices::Merge(allInfo->Slices, slices); - UpdateGarbageStats(allInfo); - return allInfo; - } - - void TShardedCompactionStrategy::UpdateGarbageStats(TShardedCompactionStrategy::TGlobalPart* allInfo) noexcept { - const auto& label = allInfo->Part->Label; - - TRowId last = 0; - allInfo->GarbageSize = 0; - TPartDataSizeHelper helper(allInfo->Part.Get()); - 
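// The loop below charges as garbage every row range of the part that is
// not covered by any remaining slice: the gaps between consecutive slices
// plus the tail after the last one. Illustrative numbers only: with slices
// covering rows [0,10) and [25,40) in a part whose rows end at 50,
// GarbageSize = CalcSize(10,25) + CalcSize(40,50).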
for (const auto& slice : *allInfo->Slices) { - TRowId next = slice.BeginRowId(); - if (last < next) { - allInfo->GarbageSize += helper.CalcSize(last, next); - } - last = slice.EndRowId(); - } - allInfo->GarbageSize += helper.CalcSize(last, Max<TRowId>()); - - ui32 garbagePercent = allInfo->GarbageSize * 100 / allInfo->TotalSize; - if (garbagePercent >= Policy->ShardPolicy.GetMaxGarbagePercentToReuse()) { - if (TableInfo.GarbageParts.emplace(label).second) { - // This part just turned into garbage, update all shards - for (auto* shard : allInfo->Shards) { - if (!shard->Levels) { - continue; - } - if (auto* partInfo = shard->Parts.FindPtr(label)) { - for (auto& kv : partInfo->Slices) { - size_t index = kv.second.Level->Index; - if (index < shard->PerLevelGarbage.size()) { - shard->PerLevelGarbage[index] += kv.second.Stats.Size; - } - } - } - } - } - } else { - if (TableInfo.GarbageParts.erase(label)) { - // This part just stopped being garbage, update all shards - for (auto* shard : allInfo->Shards) { - if (!shard->Levels) { - continue; - } - if (auto* partInfo = shard->Parts.FindPtr(label)) { - for (auto& kv : partInfo->Slices) { - size_t index = kv.second.Level->Index; - if (index < shard->PerLevelGarbage.size()) { - shard->PerLevelGarbage[index] -= kv.second.Stats.Size; - } - } - } - } - } - } - } - - TTablePart* TShardedCompactionStrategy::EnsurePart(TTableShard* shard, TIntrusiveConstPtr<TPart> part) noexcept { - auto* info = shard->Parts.FindPtr(part->Label); - if (!info) { - AllParts.at(part->Label).Shards.emplace_back(shard); - - info = &shard->Parts[part->Label]; - info->Part = std::move(part); - } else { - Y_VERIFY(info->Part.Get() == part.Get(), - "Multiple parts with the same label %s", - part->Label.ToString().c_str()); - } - return info; - } - - void TShardedCompactionStrategy::RebuildLevels(TTableShard* shard) noexcept { - Y_VERIFY(!shard->SplitBlocks, "Unexpected RebuildLevels while blocked by split"); - - shard->Levels.Reset(new TLevels(TableInfo.RowScheme->Keys)); - shard->PerLevelStats.clear(); - - TVector<TTablePart*> parts(Reserve(shard->Parts.size())); - for (auto& kv : shard->Parts) { - parts.emplace_back(&kv.second); - } - std::sort(parts.begin(), parts.end(), [](TTablePart* a, TTablePart* b) noexcept -> bool { - if (a->Part->Epoch != b->Part->Epoch) { - return a->Part->Epoch < b->Part->Epoch; - } - return a->Part->Label < b->Part->Label; - }); - - for (TTablePart* info : parts) { - Y_VERIFY(info->Part->Epoch >= shard->Levels->GetMaxEpoch()); - - bool isGarbage = TableInfo.GarbageParts.contains(info->Part->Label); - - for (auto& kv : info->Slices) { - auto& item = kv.second; - shard->RegisterItem(*info, item, isGarbage); - } - } - } - - void TShardedCompactionStrategy::CancelTask(TCompactionTask& task) noexcept { - switch (task.State) { - case EState::Free: { - // nothing - break; - } - case EState::Pending: { - Broker->CancelTask(task.TaskId); - break; - } - case EState::Compacting: { - Backend->CancelCompaction(task.CompactionId); - break; - } - } - task.State = EState::Free; - task.TaskId = 0; - task.CompactionId = 0; - } - - bool TShardedCompactionStrategy::CheckShardCompaction(TTableShard* shard, bool schedule) noexcept { - if (shard->SplitBlocks || shard->Task.State == EState::Compacting) { - // cannot compact until we have the whole picture - return false; - } - - bool garbage = false; - bool critical = false; - shard->FailingLevel = Max<size_t>(); - - if (!shard->Levels) { - // build the overall picture about this shard - RebuildLevels(shard); - } - - const 
auto& policy = Policy->ShardPolicy; - - TVector<TStats> cumulative; - cumulative.resize(shard->Levels->size()); - - size_t idx = shard->Levels->size(); - for (auto level = shard->Levels->begin(); level != shard->Levels->end(); ++level) { - bool failed = false; - size_t nextIdx = idx--; - auto& current = cumulative[idx]; - current += shard->PerLevelStats[idx]; - - if (shard->PerLevelGarbage[idx] > 0) { - garbage = true; - failed = true; - } - - if (nextIdx < cumulative.size()) { - auto& next = cumulative[nextIdx]; - if (!failed && - current.Size > 0 && - policy.GetNewDataPercentToCompact() > 0 && - next.Size * 100 > current.Size * policy.GetNewDataPercentToCompact()) - { - failed = true; - } - if (!failed && - current.Rows > 0 && - policy.GetNewRowsPercentToCompact() > 0 && - next.Rows * 100 > current.Rows * policy.GetNewRowsPercentToCompact()) - { - failed = true; - } - current += next; - } - if (!failed && - !Policy->KeepEraseMarkers && - Policy->DroppedRowsPercentToCompact > 0 && - current.DroppedPercent() >= Policy->DroppedRowsPercentToCompact) - { - failed = true; - } - if (policy.GetMaxSlicesPerLevel() > 0 && - level->size() > policy.GetMaxSlicesPerLevel()) - { - // We want to compact this even if it's just a single level - critical = true; - failed = true; - } - if (failed) { - shard->FailingLevel = idx; - } - } - - if (critical && shard->FailingLevel > 0) { - // Take an extra level below so we eventually escape from the - // critical (e.g. by degenerating into full compaction of all - // existing levels) - shard->FailingLevel--; - } - - if (shard->Levels->size() > policy.GetMaxTotalLevels()) { - // We want MaxTotalLevels, compact everything above - size_t level = policy.GetMaxTotalLevels() - 1; - shard->FailingLevel = Min(shard->FailingLevel, level); - } - - if (!critical && !garbage && - shard->FailingLevel != Max<size_t>() && - shard->Levels->size() - shard->FailingLevel < policy.GetMinLevelsToCompact()) - { - // We don't have enough levels to compact - shard->FailingLevel = Max<size_t>(); - } - - if (shard->FailingLevel == Max<size_t>()) { - // We don't want to compact this shard at this time - return false; - } - - if (ForcedCompactionTask.State == EState::Compacting) { - // No shard compactions while forced compaction is in progress - return false; - } - - if (schedule) { - TString type = policy.GetResourceBrokerTask(); - - // TODO: figure out how to assign priorities - ui32 newPriority = policy.GetTaskPriorityBase(); - - // For every N levels over min levels we increase priority - if (policy.GetTaskPriorityLevelsBoost() > 0) { - ui32 levelsBoost = shard->Levels->size() - shard->FailingLevel; - levelsBoost -= Min(levelsBoost, policy.GetMinLevelsToCompact()); - levelsBoost /= policy.GetTaskPriorityLevelsBoost(); - newPriority -= Min(newPriority, levelsBoost); - } - - // For every N bytes of input data we decrease priority - if (policy.GetTaskPrioritySizePenalty() > 0) { - ui32 sizePenalty = cumulative[shard->FailingLevel].Size; - sizePenalty /= policy.GetTaskPrioritySizePenalty(); - newPriority += Min(sizePenalty, ui32(1000)); - } - - switch (shard->Task.State) { - case EState::Free: { - // submit a new task - TString name; - { - TStringOutput out(name); - out << "shard-" << shard->Id; - out << "-table-" << Table; - out << "-" << TaskNameSuffix; - } - shard->Task.SubmissionTimestamp = Time->Now(); - shard->Task.Priority = newPriority; - shard->Task.TaskId = Broker->SubmitTask( - std::move(name), - TResourceParams(std::move(type)) - .WithCPU(1) - 
.WithPriority(shard->Task.Priority), - [this, shard](TTaskId taskId) { - BeginShardCompaction(shard, taskId); - }); - shard->Task.State = EState::Pending; - break; - } - case EState::Pending: { - // update task priority - if (newPriority < shard->Task.Priority && - (shard->Task.Priority - newPriority) >= shard->Task.Priority / PRIORITY_UPDATE_FACTOR) { - shard->Task.Priority = newPriority; - Broker->UpdateTask( - shard->Task.TaskId, - TResourceParams(std::move(type)) - .WithCPU(1) - .WithPriority(shard->Task.Priority)); - } - break; - } - default: { - Y_FAIL("Unexpected shard state"); - } - } - } - - return true; - } - - bool TShardedCompactionStrategy::CheckForcedCompaction(bool schedule) noexcept { - if (!ForcedCompactionPending || - ForcedCompactionTask.State == EState::Compacting || - PendingSliceSplits || - SliceSplitResults) - { - // Forced compaction cannot run at this time - return false; - } - - if (MemCompactionForced) { - // Don't start big compaction during another forced mem compaction - return false; - } - - if (schedule) { - const auto& policy = Policy->ShardPolicy; - - TString type = policy.GetResourceBrokerTask(); - - // TODO: figure out how to assign priorities - ui32 newPriority = policy.GetTaskPriorityBase(); - - switch (ForcedCompactionTask.State) { - case EState::Free: { - // submit a new task - TString name; - { - TStringOutput out(name); - out << "forced-table-" << Table; - out << "-" << TaskNameSuffix; - } - ForcedCompactionTask.SubmissionTimestamp = Time->Now(); - ForcedCompactionTask.Priority = newPriority; - ForcedCompactionTask.TaskId = Broker->SubmitTask( - std::move(name), - TResourceParams(std::move(type)) - .WithCPU(1) - .WithPriority(ForcedCompactionTask.Priority), - [this](TTaskId taskId) { - BeginForcedCompaction(taskId); - }); - ForcedCompactionTask.State = EState::Pending; - break; - } - case EState::Pending: { - // update task priority - if (newPriority < ForcedCompactionTask.Priority && - (ForcedCompactionTask.Priority - newPriority) >= ForcedCompactionTask.Priority / PRIORITY_UPDATE_FACTOR) { - ForcedCompactionTask.Priority = newPriority; - Broker->UpdateTask( - ForcedCompactionTask.TaskId, - TResourceParams(std::move(type)) - .WithCPU(1) - .WithPriority(ForcedCompactionTask.Priority)); - } - break; - } - default: { - Y_FAIL("Unexpected forced compaction state"); - } - } - } - - return true; - } - - void TShardedCompactionStrategy::BeginShardCompaction(TTableShard* shard, TTaskId taskId) noexcept { - Y_VERIFY(shard->Task.State == EState::Pending); - Y_VERIFY(shard->Task.TaskId == taskId); - if (!CheckShardCompaction(shard, false)) { - // We no longer want to compact this shard for some reason - Broker->FinishTask(taskId, EResourceStatus::Cancelled); - shard->Task.State = EState::Free; - shard->Task.TaskId = 0; - return; - } - - auto params = MakeHolder<TShardedCompactionParams>(); - params->Table = Table; - params->TaskId = shard->Task.TaskId; - params->InputShard = shard; - - struct TInput { - TIntrusiveConstPtr<TPart> Part; - TVector<TSlice> Slices; - TVector<TSlice> Reused; - TIntrusiveConstPtr<TSlices> ReusedRef; - }; - - TEpoch maxEpoch = TEpoch::Min(); - TVector<const TBounds*> edges; - THashMap<const TPart*, TInput> inputs; - - // First gather all inputs - for (auto& level : *shard->Levels) { - if (level.Index < shard->FailingLevel) { - continue; - } - for (auto& item : level) { - auto& input = inputs[item.Part.Get()]; - if (!input.Part) { - maxEpoch = Max(maxEpoch, item.Part->Epoch); - input.Part = item.Part; - } - 
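// (Each level item contributes exactly one slice to its part's input below;
//  maxEpoch above ends up as the newest epoch among the compacted parts and
//  is later used to pin newer disjoint slices in place via edge keys.)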
input.Slices.emplace_back(item.Slice); - } - } - - // Try to find slices that may be reused - TPageReuseBuilder reuseBuilder(*TableInfo.RowScheme->Keys); - for (auto& kv : inputs) { - auto& input = kv.second; - bool reusable = true; - // Part is not reusable if it has any drops - if (reusable && input.Part->Stat.Drops > 0) { - reusable = false; - } - // Avoid reusing parts that are already considered garbage - if (reusable && TableInfo.GarbageParts.contains(input.Part->Label)) { - reusable = false; - } - for (const auto& slice : input.Slices) { - reuseBuilder.AddSlice(kv.first, slice, reusable); - } - } - auto reuseResults = reuseBuilder.Build(); - - // Reuse all we can if not too many output slices are expected - if (reuseResults.ExpectedSlices < Policy->ShardPolicy.GetMaxSlicesPerLevel()) { - for (auto& reusable : reuseResults.Reusable) { - TPartDataSizeHelper helper(reusable.Part); - ui64 dataSize = helper.CalcSize(reusable.Slice.BeginRowId(), reusable.Slice.EndRowId()); - if (dataSize < Policy->ShardPolicy.GetMinSliceSizeToReuse()) { - // Avoid reusing slices that are too small, better to recompact - continue; - } - auto* input = inputs.FindPtr(reusable.Part); - Y_VERIFY(input, "Cannot find reusable part %s in our inputs", - reusable.Part->Label.ToString().c_str()); - input->Reused.emplace_back(std::move(reusable.Slice)); - } - } - - // Now turn those inputs into TPartView structures - for (auto& kv : inputs) { - auto& input = kv.second; - std::sort(input.Slices.begin(), input.Slices.end(), [](const TSlice& a, const TSlice& b) noexcept -> bool { - return a.BeginRowId() < b.BeginRowId(); - }); - TIntrusiveConstPtr<TSlices> original = new TSlices(std::move(input.Slices)); - TIntrusiveConstPtr<TSlices> reused = new TSlices(std::move(input.Reused)); - TIntrusiveConstPtr<TSlices> compacted = TSlices::Subtract(original, reused); - - // Everything we're compacting will become garbage soon - if (reused && !reused->empty() && compacted && !compacted->empty()) { - auto* allInfo = AllParts.FindPtr(input.Part->Label); - Y_VERIFY(allInfo, "Reused part %s is not registered", input.Part->Label.ToString().c_str()); - - TIntrusiveConstPtr<TSlices> afterCompaction = TSlices::Subtract(allInfo->Slices, compacted); - - // Calculate how much garbage this part would have after compaction - TRowId last = 0; - ui64 newGarbage = 0; - TPartDataSizeHelper helper(input.Part.Get()); - for (const auto& slice : *afterCompaction) { - TRowId next = slice.BeginRowId(); - if (last < next) { - newGarbage += helper.CalcSize(last, next); - } - last = slice.EndRowId(); - } - newGarbage += helper.CalcSize(last, Max<TRowId>()); - ui64 newGarbagePercent = newGarbage * 100 / allInfo->TotalSize; - - if (newGarbagePercent >= Policy->ShardPolicy.GetMaxGarbagePercentToReuse()) { - compacted = original; - reused = nullptr; - } - } - - // Make sure compacted data would not interfere with reused slices - if (reused && !reused->empty()) { - input.ReusedRef = reused; // keep pointers alive - for (const auto& slice : *reused) { - edges.emplace_back(&slice); - } - } - - // Create necessary records if there is anything to compact - if (compacted && !compacted->empty()) { - params->Parts.emplace_back(MakePartView(input.Part, std::move(compacted))); - params->Original.emplace_back(MakePartView(input.Part, std::move(original))); - if (reused && !reused->empty()) { - params->Reused.emplace_back(MakePartView(input.Part, std::move(reused))); - } - } - } - - if (!Policy->KeepEraseMarkers) { - params->IsFinal = shard->FailingLevel == 0; - } else { 
- params->IsFinal = false; - } - - if (!params->IsFinal) { - TVector<const TBounds*> allBounds; - for (auto& level : *shard->Levels) { - if (level.Index < shard->FailingLevel) { - for (auto& item : level) { - if (!Policy->KeepEraseMarkers) { - allBounds.push_back(&item.Slice); - } - if (item.Part->Epoch >= maxEpoch) { - // Prevent newer slices from bubbling up just because - // we have compacted older disjoint slices that happened - // to be higher in the hierarchy. - edges.push_back(&item.Slice); - } - } - } - } - - if (!Policy->KeepEraseMarkers) { - Y_VERIFY(allBounds, "Unexpected lack of underlay slices"); - params->UnderlayMask = TUnderlayMask::Build(TableInfo.RowScheme, allBounds); - } - } - - if (edges) { - params->SplitKeys = MakeHolder<TSplitKeys>(TableInfo.RowScheme, std::move(edges)); - } - - shard->Task.CompactionId = Backend->BeginCompaction(std::move(params)); - shard->Task.State = EState::Compacting; - } - - void TShardedCompactionStrategy::BeginForcedCompaction(TTaskId taskId) noexcept { - Y_VERIFY(ForcedCompactionTask.State == EState::Pending); - Y_VERIFY(ForcedCompactionTask.TaskId == taskId); - Y_VERIFY(ForcedCompactionPending); - - if (!CheckForcedCompaction(false)) { - Broker->FinishTask(taskId, EResourceStatus::Cancelled); - ForcedCompactionTask.State = EState::Free; - ForcedCompactionTask.TaskId = 0; - return; - } - - // Avoid active splits changing state under us - Y_VERIFY(!PendingSliceSplits); - Y_VERIFY(!SliceSplitResults); - - // Cancel all running shard compactions - for (auto& shard : Shards) { - CancelTask(shard.Task); - } - - auto params = MakeHolder<TShardedCompactionParams>(); - params->Table = Table; - params->TaskId = taskId; - params->IsFinal = !Policy->KeepEraseMarkers; - - // Take all non-nursery parts into a new giant compaction - for (auto& kv : AllParts) { - params->Parts.push_back(MakePartView(kv.second.Part, kv.second.Slices)); - params->Original.push_back(params->Parts.back()); - } - - if (TableInfo.SplitKeys) { - TVector<TSerializedCellVec> splitKeys; - auto* last = Shards.Back(); - auto* shard = Shards.Front(); - while (shard != last) { - splitKeys.push_back(TableInfo.SplitKeys.at(shard->RightKey)); - shard = shard->Next()->Node(); - } - Y_VERIFY(splitKeys, "Unexpected lack of split keys"); - params->SplitKeys = MakeHolder<TSplitKeys>(TableInfo.RowScheme, std::move(splitKeys)); - } - - ForcedCompactionTask.CompactionId = Backend->BeginCompaction(std::move(params)); - ForcedCompactionTask.State = EState::Compacting; - ForcedCompactionPending = false; - } - - void TShardedCompactionStrategy::SerializeStateInfo(TString* out) const noexcept { - TShardedStrategyStateInfo header; - header.SetLastSplitKey(TableInfo.LastSplitKey); - header.SetLastShardId(TableInfo.LastShardId); - for (auto& kv : TableInfo.SplitKeys) { - header.AddSplitKeys(kv.first); - } - for (auto& shard : Shards) { - header.AddShards(shard.Id); - } - Y_PROTOBUF_SUPPRESS_NODISCARD header.SerializeToString(out); - } - -} -} -} diff --git a/ydb/core/tablet_flat/flat_comp_shard.h b/ydb/core/tablet_flat/flat_comp_shard.h deleted file mode 100644 index 073a5f731ea..00000000000 --- a/ydb/core/tablet_flat/flat_comp_shard.h +++ /dev/null @@ -1,694 +0,0 @@ -#pragma once - -#include "flat_comp.h" -#include "flat_stat_part.h" -#include "util_fmt_line.h" - -#include <library/cpp/time_provider/time_provider.h> - -#include <util/generic/queue.h> - -namespace NKikimr { -namespace NTable { -namespace NCompShard { - - // Forward declarations - struct TTableInfo; - struct TTablePart; - struct 
TTableShard; - - /** - * Stats used for data estimation - */ - struct TStats { - // Size of data we may potentially read during compaction - ui64 Size = 0; - - // Number of rows we may potentially read during compaction (including any deletion markers) - ui64 Rows = 0; - - // Estimated number of deletion markers in the [0, Rows] range - double Drops = 0.0; - - void Reset() { - Size = 0; - Rows = 0; - Drops = 0.0; - } - - void Init(const TPart* part, const TSlice& slice) { - TPartDataSizeHelper helper(part); - Init(part, slice, helper); - } - - void Init(const TPart* part, const TSlice& slice, TPartDataSizeHelper& helper) { - Size = helper.CalcSize(slice.BeginRowId(), slice.EndRowId()); - Rows = slice.Rows(); - if (part->Stat.Rows > 0) { - ui64 partRows = part->Stat.Rows; - ui64 partDrops = part->Stat.Drops; - - // Assume worst case: every drop has a hidden row that won't go away - partDrops -= Min(partDrops, part->Stat.HiddenRows); - - if (Rows < partRows) { - Drops = double(partDrops) * Rows / partRows; - } else { - Drops = partDrops; - } - } else { - Drops = 0.0; - } - } - - TStats& operator+=(const TStats& o) { - Size += o.Size; - Rows += o.Rows; - Drops += o.Drops; - if (Drops > Rows) { - Drops = Rows; - } - return *this; - } - - TStats& operator-=(const TStats& o) { - Size -= o.Size; - Rows -= o.Rows; - Drops -= o.Drops; - if (Rows == 0) { - Drops = 0.0; - } - return *this; - } - - ui32 DroppedPercent() const { - ui64 normalRows = Rows > Drops ? ui64(Rows - Drops) : 0; - if (Drops > 0) { - if (Drops >= normalRows) { - return 100; - } - if (normalRows > 0) { - return Min(ui32(Drops * 100 / normalRows), ui32(100)); - } - } - return 0; - } - }; - - /** - * A helper class used to find keys that evenly split slices - */ - class TSplitStatIterator { - public: - TSplitStatIterator(const TKeyCellDefaults& keyDefaults) - : KeyCellDefaults(keyDefaults) - , InitQueue(TCmpHeapByFirstKey{KeyCellDefaults}) - , NextQueue(TCmpHeapByNextKey{KeyCellDefaults}) - , StartQueue(TCmpHeapByFirstKey{KeyCellDefaults}) - , StopQueue(TCmpHeapByLastKey{KeyCellDefaults}) - { } - - void AddSlice(const TPart* part, const TSlice& slice, ui64 size) noexcept; - - bool Next() noexcept; - - TCellsRef CurrentKey() const noexcept { return Key; } - - ui64 LeftSize() const noexcept { return LeftSize_; } - ui64 RightSize() const noexcept { return RightSize_; } - ui64 LeftRows() const noexcept { return LeftRows_; } - ui64 RightRows() const noexcept { return RightRows_; } - - private: - struct TItemState { - const TPart* const Part; - const TSlice Slice; - TPartDataSizeHelper Helper; - NPage::TIndex::TIter NextPage; - TVector<TCell> NextKey; - - TRowId LastRowId; - ui64 LastPageSize = 0; - ui64 LastPageRows = 0; - bool IsPartial = false; - - TItemState(const TPart* part, const TSlice& slice) - : Part(part) - , Slice(slice) - , Helper(Part) - , LastRowId(Slice.BeginRowId()) - { } - - // Make sure this structure is stable in memory - TItemState(const TItemState&) = delete; - TItemState(TItemState&&) = delete; - - void InitNextKey() noexcept; - void InitPageSize() noexcept; - }; - - struct TCmpHeapByFirstKey { - const TKeyCellDefaults& KeyCellDefaults; - - bool operator()(const TItemState* b, const TItemState* a) const noexcept; - }; - - struct TCmpHeapByNextKey { - const TKeyCellDefaults& KeyCellDefaults; - - bool operator()(const TItemState* b, const TItemState* a) const noexcept; - }; - - struct TCmpHeapByLastKey { - const TKeyCellDefaults& KeyCellDefaults; - - bool operator()(const TItemState* b, const TItemState* a) const 
noexcept; - }; - - template<class TCmp> - using TItemQueue = TPriorityQueue<TItemState*, TVector<TItemState*>, TCmp>; - - private: - bool HasStarted(const TItemState* item) const noexcept; - bool HasStopped(const TItemState* item) const noexcept; - - void InitNextKey(TItemState* item) const noexcept; - void InitPageSize(TItemState* item) const noexcept; - - private: - const TKeyCellDefaults& KeyCellDefaults; - TVector<TCell> Key; - TDeque<TItemState> Items; - TItemQueue<TCmpHeapByFirstKey> InitQueue; - TItemQueue<TCmpHeapByNextKey> NextQueue; - TItemQueue<TCmpHeapByFirstKey> StartQueue; - TItemQueue<TCmpHeapByLastKey> StopQueue; - TVector<TItemState*> ActivationQueue; - ui64 LeftSize_ = 0; - ui64 RightSize_ = 0; - ui64 LeftRows_ = 0; - ui64 RightRows_ = 0; - }; - - /** - * A helper class used to find pages that may be reused during compaction - */ - class TPageReuseBuilder { - public: - TPageReuseBuilder(const TKeyCellDefaults& keyDefaults) - : KeyCellDefaults(keyDefaults) - { } - - void AddSlice(const TPart* part, const TSlice& slice, bool reusable) noexcept; - - public: - struct TReusable { - const TPart* Part; - TSlice Slice; - }; - - struct TResults { - TDeque<TReusable> Reusable; - size_t ExpectedSlices = 0; - }; - - TResults Build() noexcept; - - private: - // A lightweight version of either left or right slice boundary - struct TBoundary { - TCellsRef Key; - TRowId RowId; - bool Inclusive; - }; - - struct TLeftBoundary : public TBoundary { - TRowId BeginRowId() const { - return RowId + !Inclusive; - } - }; - - struct TRightBoundary : public TBoundary { - TRowId EndRowId() const { - return RowId + Inclusive; - } - }; - - struct TItemState { - const TPart* const Part; - const TSlice Slice; - NPage::TIndex::TIter NextPage; - TVector<TCell> NextKey; - TLeftBoundary First; - bool Reusable; - - TItemState(const TPart* part, const TSlice& slice, bool reusable) - : Part(part) - , Slice(slice) - , Reusable(reusable) - { } - - // Make sure this structure is stable in memory - TItemState(const TItemState&) = delete; - TItemState(TItemState&&) = delete; - - bool InitNextKey() noexcept; - TRowId GetNextRowId() const noexcept; - }; - - private: - const TKeyCellDefaults& KeyCellDefaults; - TDeque<TItemState> Items; - }; - - enum class EState { - Free, - Pending, - Compacting, - }; - - struct TCompactionTask { - EState State = EState::Free; - ui32 Priority = 0; - ui64 TaskId = 0; - TInstant SubmissionTimestamp; - ui64 CompactionId = 0; - }; - - /** - * This structure is pinned in memory as long as strategy is alive - */ - struct TTableInfo { - // Current row scheme of the table, updated on schema changes - TIntrusiveConstPtr<TRowScheme> RowScheme; - - // Current known shard keys, changed on split/merge - THashMap<ui64, TSerializedCellVec> SplitKeys; - - // Last used key id (incremented for each new key) - ui64 LastSplitKey = 0; - - // Last used shard id (incremented for each new shard) - ui64 LastShardId = 0; - - // A set of undesirable part labels we would like to compact - THashSet<TLogoBlobID> GarbageParts; - }; - - /** - * This structure describes everything known about a part within a shard - */ - struct TTablePart { - struct TSliceId { - const TRowId Begin; - const TRowId End; - - TSliceId(TRowId begin, TRowId end) - : Begin(begin) - , End(end) - { } - - TSliceId(const TSlice& slice) - : Begin(slice.BeginRowId()) - , End(slice.EndRowId()) - { } - - friend bool operator<(const TSliceId& a, const TSliceId& b) { - return a.End <= b.Begin; - } - }; - - struct TItem { - // A single slice 
description - TSlice Slice; - - // Stats for the above slice (upper bound) - TStats Stats; - - // A stable iterator into the level where slice is currently placed - TLevels::iterator Level; - - // A stable iterator into the run where slice is currently placed - TRun::iterator Position; - }; - - // Reference to the part - TIntrusiveConstPtr<TPart> Part; - - // A sorted run of used slices - TMap<TSliceId, TItem> Slices; - }; - - /** - * This structure describes a single shard of the table - */ - struct TTableShard - : public TIntrusiveListItem<TTableShard> - { - // Unique shard id, primarily used in html and broker task names - ui64 Id = 0; - - // Unique per-table id of left and right boundary keys - // The special value 0 is used as a marker for +/- infinity - ui64 LeftKey = 0; - ui64 RightKey = 0; - - // Full map of part slices that are fully inside this shard - THashMap<TLogoBlobID, TTablePart> Parts; - - // Full stats of the shard, this includes every active part - TStats Stats; - - // Number of split operations that are currently blocking this shard - size_t SplitBlocks = 0; - - // Non-null when dynamic levels are fully formed - // TODO: may be better if this is not recomputed every time - THolder<TLevels> Levels; - - // Valid when Levels are non-null, cached stats for each level - TVector<TStats> PerLevelStats; - - // Valid when Levels are non-null, counts number of undesirable bytes - // on each level, which may be freed after compaction. - TVector<size_t> PerLevelGarbage; - - // The first level where invariants no longer hold - size_t FailingLevel = Max<size_t>(); - - // Current compaction task - TCompactionTask Task; - - void RegisterItem(const TTablePart& info, TTablePart::TItem& item, bool isGarbage) noexcept; - - bool FindSplitKey(TSerializedCellVec& foundKey, const TKeyCellDefaults& keyDefaults) const noexcept; - }; - - /** - * A result of a split of a single slice over a number of shards - */ - struct TSliceSplitResult { - TIntrusiveConstPtr<TPart> Part; - TSlice OldSlice; - TVector<TSlice> NewSlices; - TVector<TTableShard*> Shards; - }; - - /** - * Backend interface for consuming split results - */ - class ISliceSplitResultConsumer { - public: - virtual ~ISliceSplitResultConsumer() = default; - - /** - * Called when split completes successfully - */ - virtual void OnSliceSplitResult(TSliceSplitResult result) = 0; - }; - - /** - * A running split operation of a slice over a number of shards - * - * This object is only valid until operation has completed. The intrusive - * list is used while it's still pending so it may be cancelled. 
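* 
* The flow elsewhere in this file: the strategy queues the op via
* Backend->BeginRead(op), stores the returned id with SetReadId() so a
* still-pending read can be cancelled, and eventually receives the
* computed slices back through ISliceSplitResultConsumer::OnSliceSplitResult().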
- */ - class TSliceSplitOp - : public ICompactionRead - , public TIntrusiveListItem<TSliceSplitOp> - { - public: - TSliceSplitOp( - ISliceSplitResultConsumer* consumer, - const TTableInfo* table, - TVector<TTableShard*> shards, - TIntrusiveConstPtr<TPart> part, - TSlice slice) - : Consumer(consumer) - , Table(table) - , Shards(std::move(shards)) - , Part(std::move(part)) - , Slice(std::move(slice)) - { - Y_VERIFY(Shards.size() >= 2, "You need at least two shards for a split"); - for (size_t idx = 1; idx < Shards.size(); ++idx) { - Y_VERIFY(Shards[idx-1]->RightKey == Shards[idx]->LeftKey, "Adjacent shards must have matching keys"); - Y_VERIFY(Shards[idx]->LeftKey != 0, "Adjacent shards must have a valid key"); - } - } - - bool Execute(IPages* env) override; - - void SetReadId(ui64 readId) { - Y_VERIFY(ReadId == 0, "Read id must be set exactly once"); - Y_VERIFY(readId != 0, "Attempting to assign an invalid read id"); - ReadId = readId; - } - - ui64 GetReadId() const { - Y_VERIFY(ReadId != 0, "Read id has not been set"); - return ReadId; - } - - public: - ISliceSplitResultConsumer* const Consumer; - const TTableInfo* const Table; - const TVector<TTableShard*> Shards; - const TIntrusiveConstPtr<TPart> Part; - const TSlice Slice; - - private: - ui64 ReadId = 0; - }; - - class TUnderlayMask final : public NPage::IKeySpace { - public: - TUnderlayMask(TIntrusiveConstPtr<TRowScheme> rowScheme, TVector<TBounds> bounds) - : RowScheme(std::move(rowScheme)) - , Bounds(std::move(bounds)) - { - Y_VERIFY(ValidateOrder(), "TUnderlayMask got bounds in an invalid order"); - Reset(); - } - - const TVector<TBounds>& GetBounds() const { - return Bounds; - } - - void Reset() noexcept override { - Position = Bounds.begin(); - } - - bool HasKey(TCellsRef key) noexcept override; - - public: - static THolder<TUnderlayMask> Build( - TIntrusiveConstPtr<TRowScheme> rowScheme, - TVector<const TBounds*>& input) noexcept; - - private: - bool ValidateOrder() const noexcept; - bool ValidatePosition(TCellsRef key) const noexcept; - - private: - TIntrusiveConstPtr<TRowScheme> RowScheme; - TVector<TBounds> Bounds; - TVector<TBounds>::const_iterator Position; - }; - - class TSplitKeys final : public NPage::ISplitKeys { - public: - using TKeysVec = TVector<TSerializedCellVec>; - - TSplitKeys(TIntrusiveConstPtr<TRowScheme> rowScheme, TKeysVec keys); - TSplitKeys(TIntrusiveConstPtr<TRowScheme> rowScheme, TVector<const TBounds*> bounds); - - const TKeysVec& GetKeys() const { - return Keys; - } - - void Reset() noexcept override { - Position = Keys.begin(); - } - - bool ShouldSplit(TCellsRef key) noexcept override; - - private: - bool ValidateOrder() const noexcept; - bool ValidatePosition(TCellsRef key) const noexcept; - bool IsInclusive(TKeysVec::const_iterator pos) const noexcept; - - private: - TIntrusiveConstPtr<TRowScheme> RowScheme; - TKeysVec Keys; - TKeysVec::const_iterator Position; - TVector<bool> Inclusive; - }; - - class TShardedCompactionParams final : public TCompactionParams { - public: - void Describe(IOutputStream& out) const noexcept override; - - public: - TTableShard* InputShard = nullptr; - TVector<TPartView> Original; - TVector<TPartView> Reused; - }; - - class TShardedCompactionStrategy final - : public ICompactionStrategy - , public ISliceSplitResultConsumer - { - public: - TShardedCompactionStrategy( - ui32 table, - ICompactionBackend* backend, - IResourceBroker* broker, - NUtil::ILogger* logger, - ITimeProvider* time, - TString taskNameSuffix) - : Table(table) - , Backend(backend) - , Broker(broker) - , 
Logger(logger) - , Time(time) - , TaskNameSuffix(std::move(taskNameSuffix)) - { - } - - void Start(TCompactionState state) override; - void Stop() override; - void ReflectSchema() override; - void ReflectRemovedRowVersions() override; - void UpdateCompactions() override; - float GetOverloadFactor() override; - ui64 GetBackingSize() override; - ui64 GetBackingSize(ui64 ownerTabletId) override; - ui64 BeginMemCompaction(TTaskId taskId, TSnapEdge edge, ui64 forcedCompactionId) override; - ui64 GetLastFinishedForcedCompactionId() const override { return 0; } // TODO! - TInstant GetLastFinishedForcedCompactionTs() const override { return TInstant::Zero(); } // TODO! - TCompactionChanges CompactionFinished( - ui64 compactionId, - THolder<TCompactionParams> params, - THolder<TCompactionResult> result) override; - void PartMerged(TPartView part, ui32 level) override; - void PartMerged(TIntrusiveConstPtr<TColdPart> part, ui32 level) override; - TCompactionChanges PartsRemoved(TArrayRef<const TLogoBlobID> parts) override; - TCompactionChanges ApplyChanges() override; - TCompactionState SnapshotState() override; - bool AllowForcedCompaction() override; - void OutputHtml(IOutputStream &out) override; - - void OnSliceSplitResult(TSliceSplitResult result) override; - - private: - /** - * Schedules an eventual ApplyChanges call - */ - void RequestChanges() noexcept; - - /** - * Checks and schedules pending compactions - */ - void CheckCompactions() noexcept; - - /** - * Adds parts to current compaction state - * - * These parts don't necessarily have to be new parts, but it's - * important that during replacements old data is removed first. - */ - void AddParts(TVector<TPartView> parts); - - /** - * Returns existing or creates a new TTablePart for the specified part - */ - TTablePart* EnsurePart(TTableShard* shard, TIntrusiveConstPtr<TPart> part) noexcept; - - /** - * Reinitializes dynamic levels and related structures - */ - void RebuildLevels(TTableShard* shard) noexcept; - - /** - * Common helper for cancelling compaction tasks - */ - void CancelTask(TCompactionTask& task) noexcept; - - /** - * Checks current shard state and schedules compaction when necessary - */ - bool CheckShardCompaction(TTableShard* shard, bool schedule = true) noexcept; - - /** - * Checks the table and schedules forced compaction when necessary - */ - bool CheckForcedCompaction(bool schedule = true) noexcept; - - /** - * Called when shard compaction is ready to start - */ - void BeginShardCompaction(TTableShard* shard, TTaskId taskId) noexcept; - - /** - * Called when forced compaction is ready to start - */ - void BeginForcedCompaction(TTaskId taskId) noexcept; - - /** - * Serializes current state info (header) into the specified string - */ - void SerializeStateInfo(TString* out) const noexcept; - - private: - struct TNurseryItem { - TPartView PartView; - ui64 DataSize = 0; - }; - - struct TGlobalPart { - TIntrusiveConstPtr<TPart> Part; - TIntrusiveConstPtr<TSlices> Slices; - TVector<TTableShard*> Shards; - ui64 TotalSize = 0; - ui64 GarbageSize = 0; - size_t SplitBlocks = 0; - }; - - TGlobalPart* EnsureGlobalPart(const TIntrusiveConstPtr<TPart>& part, const TIntrusiveConstPtr<TSlices>& slices) noexcept; - void UpdateGarbageStats(TGlobalPart* allInfo) noexcept; - - private: - const ui32 Table; - ICompactionBackend* const Backend; - IResourceBroker* const Broker; - NUtil::ILogger* const Logger; - ITimeProvider* const Time; - const TString TaskNameSuffix; - - TTableInfo TableInfo; - TIntrusiveConstPtr<TCompactionPolicy> Policy; - 
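// For reference, the executor obtained this strategy through the factory
// NTable::CreateShardedCompactionStrategy(tableId, Backend, Broker,
// Logger, Time, TaskNameSuffix), whose call site is removed from
// flat_executor_compaction_logic.cpp later in this diff.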
TIntrusiveListWithAutoDelete<TTableShard, TDelete> Shards; - TIntrusiveList<TSliceSplitOp> PendingSliceSplits; - TVector<TSliceSplitResult> SliceSplitResults; - THashMap<TLogoBlobID, TGlobalPart> AllParts; // does not include nursery - ui64 AllBackingSize = 0; - - TVector<TIntrusiveConstPtr<TColdPart>> ColdParts; - - ui64 MemCompactionId = 0; - bool MemCompactionForced = false; - - TDeque<TNurseryItem> Nursery; - ui64 NurseryDataSize = 0; - size_t NurseryTaken = 0; - - TCompactionTask ForcedCompactionTask; - bool ForcedCompactionPending = false; - - bool RequestChangesPending = false; - }; - -} -} -} diff --git a/ydb/core/tablet_flat/flat_dbase_scheme.h b/ydb/core/tablet_flat/flat_dbase_scheme.h index 521ca85803c..f821b0f998a 100644 --- a/ydb/core/tablet_flat/flat_dbase_scheme.h +++ b/ydb/core/tablet_flat/flat_dbase_scheme.h @@ -180,8 +180,8 @@ public: if (auto *table = GetTableInfo(id)) { auto strategy = table->CompactionPolicy->CompactionStrategy; if (strategy != NKikimrSchemeOp::CompactionStrategyUnset) { - if (table->ColdBorrow && strategy == NKikimrSchemeOp::CompactionStrategySharded) { - // Sharded strategy does not support cold borrow + if (strategy == NKikimrSchemeOp::CompactionStrategySharded) { + // Sharded strategy doesn't exist anymore // Use the safe generational strategy instead strategy = NKikimrSchemeOp::CompactionStrategyGenerational; } diff --git a/ydb/core/tablet_flat/flat_executor_compaction_logic.cpp b/ydb/core/tablet_flat/flat_executor_compaction_logic.cpp index 407d460d1b3..fa5c0f211bc 100644 --- a/ydb/core/tablet_flat/flat_executor_compaction_logic.cpp +++ b/ydb/core/tablet_flat/flat_executor_compaction_logic.cpp @@ -331,15 +331,13 @@ THolder<NTable::ICompactionStrategy> TCompactionLogic::CreateStrategy( ui32 tableId, NKikimrSchemeOp::ECompactionStrategy strategy) { + Y_UNUSED(Logger); + switch (strategy) { case NKikimrSchemeOp::CompactionStrategyGenerational: return NTable::CreateGenCompactionStrategy( tableId, Backend, Broker, Time, TaskNameSuffix); - case NKikimrSchemeOp::CompactionStrategySharded: - return NTable::CreateShardedCompactionStrategy( - tableId, Backend, Broker, Logger, Time, TaskNameSuffix); - default: Y_FAIL("Unsupported strategy %s", NKikimrSchemeOp::ECompactionStrategy_Name(strategy).c_str()); } diff --git a/ydb/core/tablet_flat/flat_executor_ut.cpp b/ydb/core/tablet_flat/flat_executor_ut.cpp index 8b5d8b97e0c..b009c258d38 100644 --- a/ydb/core/tablet_flat/flat_executor_ut.cpp +++ b/ydb/core/tablet_flat/flat_executor_ut.cpp @@ -1809,116 +1809,6 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorSliceOverlapScan) { } -Y_UNIT_TEST_SUITE(TFlatTableExecutorShardedCompaction) { - - Y_UNIT_TEST(TestAutoSplit) { - TMyEnvBase env; - TRowsModel rows; - - //env->SetLogPriority(NKikimrServices::RESOURCE_BROKER, NActors::NLog::PRI_DEBUG); - //env->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG); - - env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp)); - - TIntrusivePtr<TCompactionPolicy> policy = new TCompactionPolicy; - policy->InMemForceSizeToSnapshot = 1024 * 1024; - policy->CompactionStrategy = NKikimrSchemeOp::CompactionStrategySharded; - policy->ShardPolicy.SetMinSliceSize(0); - policy->ShardPolicy.SetMinShardSize(512 * 1024); - policy->ShardPolicy.SetMaxShardSize(5 * 1024 * 1024 / 2); - policy->ShardPolicy.SetNewDataPercentToCompact(99); - - env.SendSync(rows.MakeScheme(std::move(policy))); - - // Insert ~32MB of data, this will generate a 32MB slice, which - // will split into ~2MB pieces over multiple rounds. 
There should be - // at least 16 shards after this compaction settles. - env.SendSync(rows.RowTo(1000000).MakeRows(32768, 1024, 32768)); - env.WaitFor<NFake::TEvCompacted>(); - - // Insert 32MB more data, slices will lay on top and cause data to be - // resharded and then recompacted in parallel. Since it briefly has - // twice as much data, there should be 32 shards compacted in parallel. - env.SendSync(rows.RowTo(1000000).MakeRows(32768, 1024, 32768)); - env.WaitFor<NFake::TEvCompacted>(/* memtable */ 1 + /* shards */ 32); - - // If we didn't crash, then assume the test succeeded - env.SendSync(new TEvents::TEvPoison, false, true); - } - - Y_UNIT_TEST(TestAutoMerge) { - TMyEnvBase env; - TRowsModel rows; - - //env->SetLogPriority(NKikimrServices::RESOURCE_BROKER, NActors::NLog::PRI_DEBUG); - //env->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG); - - env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp)); - - TIntrusivePtr<TCompactionPolicy> policy = new TCompactionPolicy; - policy->InMemForceSizeToSnapshot = 1024; - policy->CompactionStrategy = NKikimrSchemeOp::CompactionStrategySharded; - policy->ShardPolicy.SetMinSliceSize(0); - policy->ShardPolicy.SetMinShardSize(512 * 1024); - policy->ShardPolicy.SetMaxShardSize(5 * 1024 * 1024 / 2); - policy->ShardPolicy.SetNewRowsPercentToCompact(99); - - env.SendSync(rows.MakeScheme(std::move(policy))); - - // Insert ~32MB of data, this will generate a 32MB slice, which - // will split into ~2MB pieces over multiple rounds. There should be - // at least 16 shards after this compaction settles. - env.SendSync(rows.RowTo(1000000).MakeRows(32768, 1024, 32768)); - env.WaitFor<NFake::TEvCompacted>(); - - // Erase all previously inserted rows, this will force every shard to - // be compacted, after which all of them will merge back into a single - // shard. 
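// (With MinShardSize = 512 KB, fully erased shards fall below the merge
//  threshold; the WaitFor below expects one memtable compaction plus one
//  compaction for each of the 16 shards before they merge back together.)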
- env.SendSync(rows.RowTo(1000000).MakeErase(32768, 32768)); - env.WaitFor<NFake::TEvCompacted>(/* memtable */ 1 + /* shards */ 16); - - // If we didn't crash, then assume the test succeeded - env.SendSync(new TEvents::TEvPoison, false, true); - } - - Y_UNIT_TEST(TestSnapshotEraseMarkers) { - TMyEnvBase env; - TRowsModel rows; - - //env->SetLogPriority(NKikimrServices::RESOURCE_BROKER, NActors::NLog::PRI_DEBUG); - //env->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG); - - env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp)); - - TIntrusivePtr<TCompactionPolicy> policy = new TCompactionPolicy; - policy->InMemForceSizeToSnapshot = 1024 * 1024; - policy->CompactionStrategy = NKikimrSchemeOp::CompactionStrategySharded; - policy->ShardPolicy.SetMinSliceSize(0); - policy->ShardPolicy.SetMinShardSize(512 * 1024); - policy->ShardPolicy.SetMaxShardSize(5 * 1024 * 1024 / 2); - policy->ShardPolicy.SetNewRowsPercentToCompact(99); - - env.SendSync(rows.MakeScheme(std::move(policy))); - - // Insert some rows - env.SendSync(rows.RowTo(1000000).MakeRows(16, 32)); - { - auto snapshot = TSnapshotModel::Create(); - env.SendSync(snapshot->Start()); - UNIT_ASSERT_VALUES_EQUAL(snapshot->Result().Flatten.size(), 1u); - } - - // Erase some rows - env.SendSync(rows.RowTo(1000000).MakeErase(16)); - { - auto snapshot = TSnapshotModel::Create(); - env.SendSync(snapshot->Start()); - UNIT_ASSERT_VALUES_EQUAL(snapshot->Result().Flatten.size(), 2u); - } - } - -} - Y_UNIT_TEST_SUITE(TFlatTableExecutorColumnGroups) { struct TTxSelectRows : public ITransaction { diff --git a/ydb/core/tablet_flat/flat_part_charge.h b/ydb/core/tablet_flat/flat_part_charge.h index 4ed618db6e0..ddc743f9c77 100644 --- a/ydb/core/tablet_flat/flat_part_charge.h +++ b/ydb/core/tablet_flat/flat_part_charge.h @@ -168,65 +168,6 @@ namespace NTable { } /** - * Precharges data around the specified splitKey - * - * This method will ensure pages with the first key >= splitKey and the - * last key < splitKey are precharged. This method will not try to - * load pages outside of [beginRowId, endRowId) range. - */ - bool SplitKey(const TCells splitKey, const TKeyCellDefaults& keyDefaults, - const TRowId beginRowId, const TRowId endRowId) const noexcept - { - Y_VERIFY_DEBUG(beginRowId < endRowId, "Unexpected empty row range"); - Y_VERIFY_DEBUG(!Groups, "Unexpected column groups during SplitKey precharge"); - - auto index = Index.TryLoadRaw(); - if (!index) { - return false; - } - - bool ready = true; - - // The first page that may contain splitKey - auto found = index->LookupKey(splitKey, Scheme.Groups[0], ESeek::Lower, &keyDefaults); - - // Note: as we may have cut index key we may both need prev and next pages - - if (auto prev = found; prev.Off() && --prev) { - TRowId pageBegin = prev->GetRowId(); - TRowId pageEnd = found ? 
found->GetRowId() : index->GetEndRowId(); - if (pageBegin < endRowId && beginRowId < pageEnd) { - ready &= bool(Env->TryGetPage(Part, prev->GetPageId())); - } - } - - if (found && found->GetRowId() < endRowId) { - bool needNext = true; - if (found->GetRowId() < beginRowId) { - // iterator may re-seek to the first page that's in range - auto adjusted = index->LookupRow(beginRowId, found); - if (found != adjusted) { - found = adjusted; - needNext = false; - } - } - if (found) { - ready &= bool(Env->TryGetPage(Part, found->GetPageId())); - } - if (needNext) { - // splitKey may be on the next page - if (auto next = found; ++next) { - if (next->GetRowId() < endRowId) { - ready &= bool(Env->TryGetPage(Part, next->GetPageId())); - } - } - } - } - - return ready; - } - - /** * Precharges data for rows between row1 and row2 inclusive * * Important caveat: assumes iteration won't touch any row > row2 diff --git a/ydb/core/tablet_flat/flat_stat_part.cpp b/ydb/core/tablet_flat/flat_stat_part.cpp deleted file mode 100644 index 5fff09a577e..00000000000 --- a/ydb/core/tablet_flat/flat_stat_part.cpp +++ /dev/null @@ -1,98 +0,0 @@ -#include "flat_stat_part.h" - -namespace NKikimr { -namespace NTable { - -ui64 TPartSmallSizeHelper::CalcSize(TRowId begin, TRowId end) -{ - if (!Small || end <= begin) { - return 0; - } - if (EndRow <= begin) { - // Resynchronize so we start from an already known end - BeginRow = EndRow; - BeginPage = EndPage; - Size = 0; - } - if (BeginRow != begin) { - if (begin > BeginRow) { - // Move starting position forward - BeginPage = Small->Lower(begin, BeginPage, Max<ui32>()); - } else { - // Move starting position backwards (shouldn't happen normally) - BeginPage = Small->Lower(begin, 0, BeginPage); - } - BeginRow = begin; - EndRow = begin; - EndPage = BeginPage; - Size = 0; - } else if (end < EndRow) { - // We seem to have some extra rows, dial back - EndRow = BeginRow; - EndPage = BeginPage; - Size = 0; - } - Y_VERIFY(EndRow <= end); - if (EndRow < end) { - while (auto& rel = Small->Relation(EndPage)) { - if (rel.Row < end) { - Size += rel.Size; - ++EndPage; - } else { - Y_VERIFY(rel.IsHead(), "Got unaligned NPage::TFrames head record"); - break; - } - } - EndRow = end; - } - return Size; -} - -ui64 TPartGroupSizeHelper::CalcSize(TRowId begin, TRowId end) -{ - if (end <= begin) { - return 0; - } - if (EndRow <= begin) { - // Start searching from an already known end - BeginRow = EndRow = begin; - Begin = End = Index.LookupRow(BeginRow, End); - Size = 0; - Y_VERIFY(Begin, "Unexpected failure to find an index record"); - } else if (BeginRow != begin) { - // Start searching from a previous start - BeginRow = EndRow = begin; - Begin = End = Index.LookupRow(BeginRow, Begin); - Size = 0; - Y_VERIFY(Begin, "Unexpected failure to find an index record"); - } else if (EndRow > end) { - // We seem to have some extra rows, dial back - EndRow = BeginRow; - End = Begin; - Size = 0; - } - Y_VERIFY(EndRow <= end); - if (EndRow < end) { - while (End && End->GetRowId() < end) { - Size += Part->GetPageSize(End->GetPageId(), GroupId); - ++End; - } - EndRow = end; - } - return Size; -} - -ui64 TPartDataSizeHelper::CalcSize(TRowId begin, TRowId end) -{ - if (begin >= PartEndRowId || end <= begin) { - return 0; - } - ui64 size = SmallHelper.CalcSize(begin, end); - for (auto& g : GroupHelpers) { - size += g.CalcSize(begin, end); - } - return size; -} - -} -} diff --git a/ydb/core/tablet_flat/flat_stat_part.h b/ydb/core/tablet_flat/flat_stat_part.h index 212101611f2..b0e036768b6 100644 --- 
a/ydb/core/tablet_flat/flat_stat_part.h +++ b/ydb/core/tablet_flat/flat_stat_part.h @@ -30,87 +30,6 @@ struct TPartDataStats { TPartDataSize DataSize = { }; }; -/** - * Helper for calculating small blobs size between a pair of rows - */ -class TPartSmallSizeHelper { -public: - TPartSmallSizeHelper(const TPart* part) - : Small(part->Small.Get()) - { - } - - /** - * Returns size of small blobs in bytes between begin and end (not inclusive) - */ - ui64 CalcSize(TRowId begin, TRowId end); - -private: - const NPage::TFrames* Small; - TRowId BeginRow = 0; - TRowId EndRow = 0; - ui32 BeginPage = 0; - ui32 EndPage = 0; - ui64 Size = 0; -}; - -/** - * Helper for calculating column group size between a pair of rows - */ -class TPartGroupSizeHelper { -public: - TPartGroupSizeHelper(const TPart* part, NPage::TGroupId groupId) - : Part(part) - , GroupId(groupId) - , Index(Part->GetGroupIndex(groupId)) - , Begin(Index->Begin()) - , End(Begin) - { - Y_VERIFY(Begin, "Cannot find the first index page"); - } - - /** - * Returns size of group in bytes between begin and end (not inclusive) - */ - ui64 CalcSize(TRowId begin, TRowId end); - -private: - const TPart* Part; - const NPage::TGroupId GroupId; - const NPage::TIndex& Index; - NPage::TIndex::TIter Begin; - NPage::TIndex::TIter End; - TRowId BeginRow = 0; - TRowId EndRow = 0; - ui64 Size = 0; -}; - -/** - * Helper for calculating upper bound size of part between a pair of rows - */ -class TPartDataSizeHelper { -public: - TPartDataSizeHelper(const TPart* part) - : PartEndRowId(part->Index.GetEndRowId()) - , SmallHelper(part) - { - GroupHelpers.reserve(part->GroupsCount); - for (ui32 group : xrange(part->GroupsCount)) { - GroupHelpers.emplace_back(part, NPage::TGroupId(group)); - } - } - - /** - * Returns size of part in bytes between begin and end (not inclusive) - */ - ui64 CalcSize(TRowId begin, TRowId end); - -private: - const TRowId PartEndRowId; - TPartSmallSizeHelper SmallHelper; - TSmallVec<TPartGroupSizeHelper> GroupHelpers; -}; - // Iterates over part index and calculates total row count and data size // NOTE: we don't know row count for the last page so we also ignore its size // This shouldn't be a problem for big parts with many pages diff --git a/ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt index 9b99d9f774b..98947f97796 100644 --- a/ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt @@ -58,7 +58,6 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_sausage.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_stat.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_comp_gen.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_comp_shard.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_compaction.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_compaction_multi.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_charge.cpp diff --git a/ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt b/ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt index 4b6443427c2..42bf30b3bad 100644 --- a/ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt @@ -61,7 +61,6 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_sausage.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_stat.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_comp_gen.cpp - 
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_comp_shard.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_compaction.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_compaction_multi.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_charge.cpp diff --git a/ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt b/ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt index 4323c2f2a36..af845e1051f 100644 --- a/ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt @@ -62,7 +62,6 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_sausage.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_stat.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_comp_gen.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_comp_shard.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_compaction.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_compaction_multi.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_charge.cpp diff --git a/ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt b/ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt index 3252f6dd46f..ff33e86aa7a 100644 --- a/ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt @@ -51,7 +51,6 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_sausage.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_stat.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_comp_gen.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_comp_shard.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_compaction.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_compaction_multi.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_charge.cpp diff --git a/ydb/core/tablet_flat/ut/ut_comp_shard.cpp b/ydb/core/tablet_flat/ut/ut_comp_shard.cpp deleted file mode 100644 index aa69db2495e..00000000000 --- a/ydb/core/tablet_flat/ut/ut_comp_shard.cpp +++ /dev/null @@ -1,1556 +0,0 @@ -#include "flat_comp_ut_common.h" - -#include <ydb/core/tablet_flat/flat_comp_shard.h> - -#include <ydb/core/tablet_flat/test/libs/table/test_envs.h> -#include <ydb/core/tablet_flat/test/libs/rows/cook.h> -#include <ydb/core/tablet_flat/test/libs/rows/layout.h> -#include <ydb/core/tablet_flat/test/libs/rows/heap.h> -#include <ydb/core/tablet_flat/test/libs/rows/rows.h> -#include <ydb/core/tablet_flat/test/libs/rows/tool.h> -#include <ydb/core/tablet_flat/test/libs/table/wrap_part.h> -#include <ydb/core/tablet_flat/test/libs/table/test_comp.h> -#include <ydb/core/tablet_flat/test/libs/table/test_writer.h> - -#include <ydb/core/tablet_flat/protos/flat_table_shard.pb.h> -#include <ydb/core/tablet_flat/flat_cxx_database.h> -#include <ydb/core/util/pb.h> - -#include <library/cpp/testing/unittest/registar.h> - -#include <util/generic/hash.h> -#include <util/generic/hash_set.h> - -#include <optional> - -namespace NKikimr { -namespace NTable { -namespace NCompShard { - -using namespace NTest; - -namespace { - - /** - * A special kind of TTestEnv that will fail loading every page until - * Load() call, and then would only pass previously requested pages - */ - class TStrictEnv : public TTestEnv { - public: - const TSharedData *TryGetPage(const TPart *part, TPageId ref, TGroupId groupId) override { - ui64 token = ref | (ui64(groupId.Raw()) << 32); - auto& info = Parts[part]; - - info.Touched.insert(token); - - if (info.Loaded.contains(token)) { - return TTestEnv::TryGetPage(part, ref, groupId); - } - - return 
nullptr; - } - - void Load() { - for (auto &p: Parts) { - p.second.Loaded = std::move(p.second.Touched); - } - } - - private: - struct TPartInfo { - THashSet<ui64> Touched; - THashSet<ui64> Loaded; - }; - - private: - THashMap<const TPart*, TPartInfo> Parts; - }; - - struct TSimpleConsumer : public ISliceSplitResultConsumer { - std::optional<TSliceSplitResult> Result; - - void OnSliceSplitResult(TSliceSplitResult result) override { - Y_VERIFY(!Result, "Cannot store multiple results"); - Result.emplace(std::move(result)); - } - }; - - NPage::TConf CreateConf(ui32 rowsPerPage) { - NPage::TConf conf(false, 8192); - conf.Group(0).PageRows = rowsPerPage; - return conf; - } - - NPage::TConf CreateConf(ui32 rowsPerPage, NPage::IKeySpace* underlayMask) { - NPage::TConf conf = CreateConf(rowsPerPage); - conf.UnderlayMask = underlayMask; - return conf; - } - - NPage::TConf CreateConf(ui32 rowsPerPage, NPage::ISplitKeys* splitKeys) { - NPage::TConf conf = CreateConf(rowsPerPage); - conf.SplitKeys = splitKeys; - return conf; - } - - TPartView CreatePart(const TLayoutCook& lay, const TRowsHeap& rows, ui32 rowsPerPage) { - return TPartCook(lay, CreateConf(rowsPerPage)) - .Add(rows.begin(), rows.end()) - .Finish() - .ToPartView(); - } - - struct TSizeChange { - ui64 AddLeftSize = 0; - ui64 AddLeftRows = 0; - ui64 SubRightSize = 0; - ui64 SubRightRows = 0; - - TSizeChange& AddLeft(ui64 leftSize, ui64 leftRows) { - AddLeftSize += leftSize; - AddLeftRows += leftRows; - return *this; - } - - TSizeChange& SubRight(ui64 rightSize, ui64 rightRows) { - SubRightSize += rightSize; - SubRightRows += rightRows; - return *this; - } - - TSizeChange& Expect() { - return *this; - } - }; - - using TSizeChanges = TMap<ui64, TSizeChange>; - - struct TSizeStat { - ui64 Key; - ui64 LeftSize; - ui64 LeftRows; - ui64 RightSize; - ui64 RightRows; - - bool operator==(const TSizeStat& o) const { - return ( - Key == o.Key && - LeftSize == o.LeftSize && - LeftRows == o.LeftRows && - RightSize == o.RightSize && - RightRows == o.RightRows); - } - - friend IOutputStream& operator<<(IOutputStream& out, const TSizeStat& v) { - out << "key=" << v.Key - << " left=" << v.LeftSize << "b/" << v.LeftRows << "r" - << " right=" << v.RightSize << "b/" << v.RightRows << "r"; - return out; - } - - friend IOutputStream& operator<<(IOutputStream& out, const TVector<TSizeStat>& v) { - out << '{'; - bool first = true; - for (auto& value : v) { - if (first) { - first = false; - } else { - out << ','; - } - out << ' '; - out << value; - } - out << " }"; - return out; - } - }; - - void VerifySizeChanges(TSplitStatIterator& it, const TSizeChanges& sizeChanges) { - TVector<TSizeStat> expected; - { - ui64 leftSize = 0; - ui64 leftRows = 0; - ui64 rightSize = it.RightSize(); - ui64 rightRows = it.RightRows(); - for (auto& kv : sizeChanges) { - leftSize += kv.second.AddLeftSize; - leftRows += kv.second.AddLeftRows; - rightSize -= kv.second.SubRightSize; - rightRows -= kv.second.SubRightRows; - expected.push_back({ kv.first, leftSize, leftRows, rightSize, rightRows }); - } - } - - TVector<TSizeStat> actual; - while (it.Next()) { - auto keyCells = it.CurrentKey(); - UNIT_ASSERT_VALUES_EQUAL(keyCells.size(), 1u); - auto key = keyCells[0].AsValue<ui64>(); - actual.push_back({ key, it.LeftSize(), it.LeftRows(), it.RightSize(), it.RightRows() }); - } - - auto getFirstDiffKey = [&]() -> ui64 { - auto a = actual.begin(); - auto b = expected.begin(); - while (a != actual.end() && b != expected.end()) { - if (!(*a == *b)) { - return Min(a->Key, b->Key); - } - ++a; - 
++b; - } - if (a != actual.end()) { - return a->Key; - } - if (b != expected.end()) { - return b->Key; - } - return Max<ui64>(); - }; - - UNIT_ASSERT_C(actual == expected, - "Diff at " << getFirstDiffKey() - << ": got " << actual - << ", expected " << expected); - } - -} - -Y_UNIT_TEST_SUITE(TShardedCompaction) { - - Y_UNIT_TEST(SplitSingleKey) { - TLayoutCook lay; - lay - .Col(0, 0, NScheme::NTypeIds::Uint64) - .Col(0, 1, NScheme::NTypeIds::Uint32) - .Col(0, 8, NScheme::NTypeIds::Uint32) - .Key({ 0, 1 }); - - TRowTool tool(*lay); - TRowsHeap rows(64 * 1024); - for (ui64 seq = 0; seq < 4*4; ++seq) { - rows.Put(*TSchemedCookRow(*lay).Col(seq, 500_u32, 42_u32)); - } - - auto partView = CreatePart(lay, rows, 4); - - for (size_t beginRow = 0; beginRow < rows.Size(); ++beginRow) { - for (size_t endRow = beginRow + 1; endRow <= rows.Size(); ++endRow) { - for (ui64 flags = 0; flags < 4; ++flags) { - TSlice slice; - slice.FirstInclusive = (flags & 1) == 1; - slice.LastInclusive = (flags & 2) == 2; - if (!slice.FirstInclusive && beginRow == 0) { - continue; - } - if (!slice.LastInclusive && endRow == rows.Size()) { - continue; - } - if (!slice.FirstInclusive && !slice.LastInclusive && endRow - beginRow <= 1) { - continue; - } - slice.FirstRowId = beginRow - !slice.FirstInclusive; - slice.LastRowId = endRow - slice.LastInclusive; - slice.FirstKey = TSerializedCellVec(tool.KeyCells(rows[slice.FirstRowId])); - slice.LastKey = TSerializedCellVec(tool.KeyCells(rows[slice.LastRowId])); - - for (ui64 splitRow = slice.FirstRowId; splitRow <= slice.LastRowId; ++splitRow) { - for (ui64 splitFlags = 0; splitFlags < 2; ++splitFlags) { - bool moveLeft = (splitFlags & 1) == 1; - if (splitRow == slice.FirstRowId && (moveLeft || !slice.FirstInclusive)) { - continue; - } - - auto splitCells = tool.KeyCells(rows[splitRow]); - if (moveLeft) { - splitCells[1] = Cimple(0_u32); - } - - TTableInfo table; - table.RowScheme = lay.RowScheme(); - table.SplitKeys[1] = TSerializedCellVec(splitCells); - TTableShard left; - TTableShard right; - left.RightKey = 1; - right.LeftKey = 1; - TVector<TTableShard*> pshards; - pshards.push_back(&left); - pshards.push_back(&right); - - TStrictEnv env; - TSimpleConsumer consumer; - TSliceSplitOp op(&consumer, &table, pshards, partView.Part, slice); - - // load index - bool ok1 = op.Execute(&env); - UNIT_ASSERT_VALUES_EQUAL(ok1, false); - env.Load(); - - // load data - bool ok2 = op.Execute(&env); - UNIT_ASSERT_VALUES_EQUAL(ok2, false); - env.Load(); - - bool ok3 = op.Execute(&env); - UNIT_ASSERT_VALUES_EQUAL(ok3, true); - - auto& result = consumer.Result.value(); - size_t pos = 0; - if (beginRow < splitRow) { - TSlice output = result.NewSlices.at(pos++); - UNIT_ASSERT_VALUES_EQUAL(output.BeginRowId(), beginRow); - UNIT_ASSERT_VALUES_EQUAL(output.EndRowId(), splitRow); - UNIT_ASSERT(output.LastInclusive); - if (output.Rows() == 1) { - UNIT_ASSERT(output.FirstInclusive); - } - } - if (splitRow < endRow) { - TSlice output = result.NewSlices.at(pos++); - UNIT_ASSERT_VALUES_EQUAL(output.BeginRowId(), splitRow); - UNIT_ASSERT_VALUES_EQUAL(output.EndRowId(), endRow); - UNIT_ASSERT(output.FirstInclusive); - if (output.Rows() == 1) { - UNIT_ASSERT(output.LastInclusive); - } - } - UNIT_ASSERT_VALUES_EQUAL(result.NewSlices.size(), pos); - } - } - } - } - } - } - - Y_UNIT_TEST(SplitMiddleEdgeCase) { - TLayoutCook lay; - lay - .Col(0, 0, NScheme::NTypeIds::Uint64) - .Col(0, 1, NScheme::NTypeIds::Uint32) - .Col(0, 8, NScheme::NTypeIds::Uint32) - .Key({ 0, 1 }); - - TRowTool tool(*lay); - TRowsHeap 
rows(64 * 1024); - for (ui64 seq = 0; seq < 4*4+2; ++seq) { - rows.Put(*TSchemedCookRow(*lay).Col(seq, 500_u32, 42_u32)); - } - - auto partView = CreatePart(lay, rows, 4); - - TSlice slice; - slice.FirstRowId = 0; - slice.FirstInclusive = true; - slice.LastRowId = rows.Size() - 1; - slice.LastInclusive = true; - slice.FirstKey = TSerializedCellVec(tool.KeyCells(rows[slice.FirstRowId])); - slice.LastKey = TSerializedCellVec(tool.KeyCells(rows[slice.LastRowId])); - - TRowId splitRow = rows.Size() / 2; - - TTableInfo table; - table.RowScheme = lay.RowScheme(); - TIntrusiveListWithAutoDelete<TTableShard, TDelete> shards; - shards.PushBack(new TTableShard); - for (ui32 k = 1; k <= 5; ++k) { - auto cells = tool.KeyCells(rows[splitRow]); - cells[1] = Cimple(k); - table.SplitKeys[k] = TSerializedCellVec(cells); - auto* left = shards.Back(); - shards.PushBack(new TTableShard); - auto* right = shards.Back(); - left->RightKey = k; - right->LeftKey = k; - } - TVector<TTableShard*> pshards; - for (auto& shard : shards) { - pshards.push_back(&shard); - } - - TStrictEnv env; - TSimpleConsumer consumer; - TSliceSplitOp op(&consumer, &table, pshards, partView.Part, slice); - - // load index - bool ok1 = op.Execute(&env); - UNIT_ASSERT_VALUES_EQUAL(ok1, false); - env.Load(); - - // load data - bool ok2 = op.Execute(&env); - UNIT_ASSERT_VALUES_EQUAL(ok2, false); - env.Load(); - - bool ok3 = op.Execute(&env); - UNIT_ASSERT_VALUES_EQUAL(ok3, true); - - auto& result = consumer.Result.value(); - UNIT_ASSERT_VALUES_EQUAL(result.NewSlices.size(), 2u); - UNIT_ASSERT_VALUES_EQUAL(result.NewSlices[0].FirstRowId, 0u); - UNIT_ASSERT_VALUES_EQUAL(result.NewSlices[0].LastRowId, splitRow-1); - UNIT_ASSERT_VALUES_EQUAL(result.NewSlices[1].FirstRowId, splitRow); - UNIT_ASSERT_VALUES_EQUAL(result.NewSlices[1].LastRowId, rows.Size()-1); - } - - Y_UNIT_TEST(SplitOutOfBoundsKeys) { - TLayoutCook lay; - lay - .Col(0, 0, NScheme::NTypeIds::Uint64) - .Col(0, 8, NScheme::NTypeIds::Uint32) - .Key({ 0 }); - - TRowTool tool(*lay); - TRowsHeap rows(64 * 1024); - for (ui64 seq = 0; seq < 10*4+3; ++seq) { - rows.Put(*TSchemedCookRow(*lay).Col(1000 + seq, 42_u32)); - } - - auto partView = CreatePart(lay, rows, 4); - - TSlice slice; - slice.FirstRowId = 3; // the first 4 rows are not included - slice.FirstInclusive = false; - slice.LastRowId = rows.Size() - 3; // the last 3 rows are not included - slice.LastInclusive = false; - slice.FirstKey = TSerializedCellVec(tool.KeyCells(rows[slice.FirstRowId])); - slice.LastKey = TSerializedCellVec(tool.KeyCells(rows[slice.LastRowId])); - - TRowId splitRow = rows.Size() / 2; - TVector<TSerializedCellVec> splitKeys; - splitKeys.emplace_back(tool.KeyCells(*TSchemedCookRow(*lay).Col(500_u64, 42_u32))); - splitKeys.emplace_back(tool.KeyCells(rows[splitRow])); - splitKeys.emplace_back(tool.KeyCells(*TSchemedCookRow(*lay).Col(5000_u64, 42_u32))); - - TTableInfo table; - table.RowScheme = lay.RowScheme(); - TIntrusiveListWithAutoDelete<TTableShard, TDelete> shards; - shards.PushBack(new TTableShard); - for (size_t k = 0; k < splitKeys.size(); ++k) { - table.SplitKeys[k+1] = splitKeys[k]; - auto* left = shards.Back(); - shards.PushBack(new TTableShard); - auto* right = shards.Back(); - left->RightKey = k + 1; - right->LeftKey = k + 1; - } - - TVector<TTableShard*> pshards; - for (auto& shard : shards) { - pshards.push_back(&shard); - } - - TStrictEnv env; - TSimpleConsumer consumer; - TSliceSplitOp op(&consumer, &table, pshards, partView.Part, slice); - - // load index - bool ok1 = op.Execute(&env); - 
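// (Hedged note on the pattern used throughout these tests: TStrictEnv
// returns nullptr from TryGetPage for any page not loaded on a previous
// pass, so this first Execute() is expected to fail while merely
// recording which index pages it touched; env.Load() then promotes the
// touched set to loaded for the next pass.)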
UNIT_ASSERT_VALUES_EQUAL(ok1, false); - env.Load(); - - // load data - bool ok2 = op.Execute(&env); - UNIT_ASSERT_VALUES_EQUAL(ok2, false); - env.Load(); - - bool ok3 = op.Execute(&env); - UNIT_ASSERT_VALUES_EQUAL(ok3, true); - - auto& result = consumer.Result.value(); - UNIT_ASSERT_VALUES_EQUAL(result.NewSlices.size(), 2u); - - // The first row inclusion is currently a side effect - UNIT_ASSERT_VALUES_EQUAL(result.NewSlices[0].FirstRowId, 4u); - UNIT_ASSERT_VALUES_EQUAL(result.NewSlices[0].FirstInclusive, true); - UNIT_ASSERT_VALUES_EQUAL(result.NewSlices[0].LastRowId, splitRow - 1); - UNIT_ASSERT_VALUES_EQUAL(result.NewSlices[0].LastInclusive, true); - - // The last row is not included just like in the original - UNIT_ASSERT_VALUES_EQUAL(result.NewSlices[1].FirstRowId, splitRow); - UNIT_ASSERT_VALUES_EQUAL(result.NewSlices[1].FirstInclusive, true); - UNIT_ASSERT_VALUES_EQUAL(result.NewSlices[1].LastRowId, rows.Size() - 3); - UNIT_ASSERT_VALUES_EQUAL(result.NewSlices[1].LastInclusive, false); - } - - Y_UNIT_TEST(SplitStatSimple) { - TLayoutCook lay; - lay - .Col(0, 0, NScheme::NTypeIds::Uint64) - .Col(0, 8, NScheme::NTypeIds::Uint32) - .Key({ 0 }); - - const ui64 pageCount = 16; - const ui64 rowsPerPage = 4; - - TRowTool tool(*lay); - TRowsHeap rows(64 * 1024); - for (ui64 seq = 0; seq < rowsPerPage * pageCount; ++seq) { - rows.Put(*TSchemedCookRow(*lay).Col(1000 + seq, 42_u32)); - } - - auto partView = CreatePart(lay, rows, rowsPerPage); - UNIT_ASSERT_VALUES_EQUAL(partView.Slices->size(), 1u); - - const auto* part = partView.Part.Get(); - const auto slice = partView.Slices->front(); - - // All pages are expected to have the same size - auto pageSize = part->GetPageSize(part->Index->Begin()->GetPageId()); - - for (int count = 1; count <= 3; ++count) { - TSplitStatIterator it(*(*lay).Keys); - - // It shouldn't matter that we're adding the same slice - // What matters is they all produce the exact same key - for (int i = 0; i < count; ++i) { - it.AddSlice(part, slice, pageSize * pageCount); - } - - ui64 keyIndex = 0; - ui64 leftSize = 0; - ui64 leftRows = 0; - ui64 rightSize = pageSize * pageCount * count; - ui64 rightRows = rowsPerPage * pageCount * count; - - while (it.Next()) { - // The first key is expected to be the first slice key - auto key = it.CurrentKey(); - UNIT_ASSERT_VALUES_EQUAL(key.size(), 1u); - auto expected = tool.KeyCells(rows[keyIndex]); - UNIT_ASSERT_VALUES_EQUAL(expected.size(), 1u); - - UNIT_ASSERT_VALUES_EQUAL(key[0].AsValue<ui64>(), expected[0].AsValue<ui64>()); - UNIT_ASSERT_VALUES_EQUAL(it.LeftSize(), leftSize); - UNIT_ASSERT_VALUES_EQUAL(it.RightSize(), rightSize); - UNIT_ASSERT_VALUES_EQUAL(it.LeftRows(), leftRows); - UNIT_ASSERT_VALUES_EQUAL(it.RightRows(), rightRows); - - keyIndex += rowsPerPage; - leftSize += pageSize * count; - leftRows += rowsPerPage * count; - rightSize -= pageSize * count; - rightRows -= rowsPerPage * count; - } - - UNIT_ASSERT_VALUES_EQUAL(keyIndex, rowsPerPage * pageCount); - } - } - - Y_UNIT_TEST(SplitStatComplex) { - TLayoutCook lay; - lay - .Col(0, 0, NScheme::NTypeIds::Uint64) - .Col(0, 8, NScheme::NTypeIds::Uint32) - .Key({ 0 }); - - const ui64 pageCount = 16; - - TRowsHeap rows1(64 * 1024); - for (ui64 seq = 0; seq < 4 * pageCount; ++seq) { - rows1.Put(*TSchemedCookRow(*lay).Col(1000 + seq, 42_u32)); - } - - TRowsHeap rows2(64 * 1024); - for (ui64 seq = 0; seq < 2 * pageCount; ++seq) { - rows2.Put(*TSchemedCookRow(*lay).Col(1002 + seq * 2, 42_u32)); - } - - TRowsHeap rows3(64 * 1024); - for (ui64 seq = 0; seq < 1 * pageCount; 
++seq) { - rows3.Put(*TSchemedCookRow(*lay).Col(1003 + seq * 4, 42_u32)); - } - - auto partView1 = CreatePart(lay, rows1, 4); - auto partView2 = CreatePart(lay, rows2, 2); - auto partView3 = CreatePart(lay, rows3, 1); - for (const auto& partView : { partView1, partView2, partView3 }) { - UNIT_ASSERT_VALUES_EQUAL(partView.Slices->size(), 1u); - } - - auto pageSize1 = partView1.Part->GetPageSize(partView1.Part->Index->Begin()->GetPageId()); - auto pageSize2 = partView2.Part->GetPageSize(partView2.Part->Index->Begin()->GetPageId()); - auto pageSize3 = partView3.Part->GetPageSize(partView3.Part->Index->Begin()->GetPageId()); - - // The expected keys are: - // 1000, 1002, 1003, 1004, 1006, 1007, 1008, ... - TSizeChanges sizeChanges; - - for (ui64 page = 0; page < pageCount; ++page) { - // 1000, 1004, 1008, ... - sizeChanges[1000 + page * 4] - .Expect(); - - // 1002, 1006, 1010, ... - sizeChanges[1002 + page * 4] - .AddLeft(pageSize1, 4); - - // 1003, 1007, 1011, ... - sizeChanges[1003 + page * 4] - .AddLeft(pageSize2, 2); - - if (page > 0) { - // 1004, 1008, ... - sizeChanges[1000 + page * 4] - .SubRight(pageSize1, 4) - .AddLeft(pageSize3, 1) - .SubRight(pageSize3, 1); - - // 1006, 1010, ... - sizeChanges[1002 + page * 4] - .SubRight(pageSize2, 2); - } - } - - TSplitStatIterator it(*(*lay).Keys); - it.AddSlice(partView1.Part.Get(), partView1.Slices->front(), pageSize1 * pageCount); - it.AddSlice(partView2.Part.Get(), partView2.Slices->front(), pageSize2 * pageCount); - it.AddSlice(partView3.Part.Get(), partView3.Slices->front(), pageSize3 * pageCount); - VerifySizeChanges(it, sizeChanges); - } - - Y_UNIT_TEST(SplitStatNextVersusStartStop) { - TLayoutCook lay; - lay - .Col(0, 0, NScheme::NTypeIds::Uint64) - .Col(0, 8, NScheme::NTypeIds::Uint32) - .Key({ 0 }); - - const ui64 pageCount = 4; - - TRowsHeap rows1(64 * 1024); - for (ui64 seq = 0; seq < 4 * pageCount; ++seq) { - rows1.Put(*TSchemedCookRow(*lay).Col(1000 + seq, 42_u32)); - } - - TRowsHeap rows2(64 * 1024); - for (ui64 seq = 0; seq < 2 * pageCount; ++seq) { - rows2.Put(*TSchemedCookRow(*lay).Col(1006 + seq * 2, 42_u32)); - } - - auto partView1 = CreatePart(lay, rows1, 4); - auto partView2 = CreatePart(lay, rows2, 2); - for (const auto& partView : { partView1, partView2 }) { - UNIT_ASSERT_VALUES_EQUAL(partView.Slices->size(), 1u); - } - - auto pageSize1 = partView1.Part->GetPageSize(partView1.Part->Index->Begin()->GetPageId()); - auto pageSize2 = partView2.Part->GetPageSize(partView2.Part->Index->Begin()->GetPageId()); - - // Expected keys are 1000, 1004, 1006, 1008, 1010, 1012, ... - TSizeChanges sizeChanges; - - for (ui64 page = 0; page < pageCount; ++page) { - // 1000, 1004, 1008, ... - sizeChanges[1000 + 4 * page].Expect(); - - // 1006, 1010, 1014, ... - sizeChanges[1006 + 4 * page].Expect(); - - if (page > 0) { - // 1004, 1008, ... - sizeChanges[1000 + 4 * page].SubRight(pageSize1, 4); - - // 1010, 1014, ... - sizeChanges[1006 + 4 * page].SubRight(pageSize2, 2); - } - - if (page > 1) { - // 1008, 1012, ... - sizeChanges[1000 + 4 * page].AddLeft(pageSize2, 2); - } - - if (page < pageCount - 1) { - // 1006, 1010, ... 
- sizeChanges[1006 + 4 * page].AddLeft(pageSize1, 4); - } - } - - sizeChanges[1004].AddLeft(pageSize1, 4); - sizeChanges[1006 + (pageCount - 1) * 4].SubRight(pageSize1, 4); - sizeChanges[1006 + (pageCount - 1) * 4].AddLeft(pageSize2, 2); - - TSplitStatIterator it(*(*lay).Keys); - it.AddSlice(partView1.Part.Get(), partView1.Slices->front(), pageSize1 * pageCount); - it.AddSlice(partView2.Part.Get(), partView2.Slices->front(), pageSize2 * pageCount); - VerifySizeChanges(it, sizeChanges); - } - - Y_UNIT_TEST(SplitStatPageInsidePage) { - TLayoutCook lay; - lay - .Col(0, 0, NScheme::NTypeIds::Uint64) - .Col(0, 8, NScheme::NTypeIds::Uint32) - .Key({ 0 }); - - const ui64 pageCount = 3; - - TRowsHeap rows1(64 * 1024); - for (ui64 seq = 0; seq < 4 * pageCount; ++seq) { - rows1.Put(*TSchemedCookRow(*lay).Col(1000 + seq, 42_u32)); - } - - TRowsHeap rows2(64 * 1024); - rows2.Put(*TSchemedCookRow(*lay).Col(1005_u64, 42_u32)); - rows2.Put(*TSchemedCookRow(*lay).Col(1006_u64, 42_u32)); - - auto partView1 = CreatePart(lay, rows1, 4); - auto partView2 = CreatePart(lay, rows2, 4); - for (const auto& partView : { partView1, partView2 }) { - UNIT_ASSERT_VALUES_EQUAL(partView.Slices->size(), 1u); - } - - auto pageSize1 = partView1.Part->GetPageSize(partView1.Part->Index->Begin()->GetPageId()); - auto pageSize2 = partView2.Part->GetPageSize(partView2.Part->Index->Begin()->GetPageId()); - - // Expected keys are 1000, 1004, 1005, 1008 - TSizeChanges sizeChanges; - - sizeChanges[1000].Expect(); - sizeChanges[1004] - .AddLeft(pageSize1, 4) - .SubRight(pageSize1, 4); - sizeChanges[1005] - .AddLeft(pageSize1, 4); - sizeChanges[1008] - .AddLeft(pageSize2, 2) - .SubRight(pageSize2, 2) - .SubRight(pageSize1, 4); - - TSplitStatIterator it(*(*lay).Keys); - it.AddSlice(partView1.Part.Get(), partView1.Slices->front(), pageSize1 * pageCount); - it.AddSlice(partView2.Part.Get(), partView2.Slices->front(), pageSize2 * pageCount); - VerifySizeChanges(it, sizeChanges); - } - - Y_UNIT_TEST(PageReuseTest) { - TLayoutCook lay; - lay - .Col(0, 0, NScheme::NTypeIds::Uint64) - .Col(0, 8, NScheme::NTypeIds::Uint32) - .Key({ 0 }); - - const ui64 pageCount = 16; - TRowTool tool(*lay); - - TRowsHeap rows1(64 * 1024); - for (ui64 seq = 0; seq < 4 * pageCount; ++seq) { - rows1.Put(*TSchemedCookRow(*lay).Col(1000 + seq, 42_u32)); - } - - TRowsHeap rows2(64 * 1024); - for (ui64 seq = 0; seq < 5; ++seq) { - rows2.Put(*TSchemedCookRow(*lay).Col(1009 + seq, 42_u32)); - } - - auto partView1 = CreatePart(lay, rows1, 4); - auto partView2 = CreatePart(lay, rows2, 4); - for (const auto& partView : { partView1, partView2 }) { - UNIT_ASSERT_VALUES_EQUAL(partView.Slices->size(), 1u); - } - - TPageReuseBuilder builder(*lay.RowScheme()->Keys); - - { - TSlice slice = partView1.Slices->at(0); - slice.FirstRowId += 2; - slice.LastRowId -= 2; - slice.FirstKey = TSerializedCellVec(tool.KeyCells(rows1[slice.FirstRowId])); - slice.LastKey = TSerializedCellVec(tool.KeyCells(rows1[slice.LastRowId])); - builder.AddSlice(partView1.Part.Get(), slice, true); - } - - builder.AddSlice(partView2.Part.Get(), partView2.Slices->at(0), true); - - auto results = builder.Build(); - - UNIT_ASSERT_VALUES_EQUAL(results.Reusable.size(), 2u); - - auto& reusable1 = results.Reusable.at(0); - UNIT_ASSERT_VALUES_EQUAL(reusable1.Slice.FirstRowId, 4u); - UNIT_ASSERT_VALUES_EQUAL(reusable1.Slice.FirstInclusive, true); - UNIT_ASSERT_VALUES_EQUAL(reusable1.Slice.LastRowId, 8u); - UNIT_ASSERT_VALUES_EQUAL(reusable1.Slice.LastInclusive, false); - - auto& reusable2 = results.Reusable.at(1); - 
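// (A hedged sketch of where these bounds come from: part1 packs 4 rows
// per page and its slice was trimmed to rows [2, 61], while part2 covers
// keys 1009..1013, i.e. part1 rows 9..13. Only whole pages that neither
// cross the trimmed edges nor overlap part2 can be reused: page [4, 8)
// before the overlap, and pages [16, 60) after it, where 60 is
// EndRowId - 4 for the 64-row part.)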
UNIT_ASSERT_VALUES_EQUAL(reusable2.Slice.FirstRowId, 16u); - UNIT_ASSERT_VALUES_EQUAL(reusable2.Slice.FirstInclusive, true); - UNIT_ASSERT_VALUES_EQUAL(reusable2.Slice.LastRowId, partView1->Index.GetEndRowId() - 4); - UNIT_ASSERT_VALUES_EQUAL(reusable2.Slice.LastInclusive, false); - - UNIT_ASSERT_VALUES_EQUAL(results.ExpectedSlices, 5u); - } - - Y_UNIT_TEST(CompactWithUnderlayMask) { - TLayoutCook lay; - lay - .Col(0, 0, NScheme::NTypeIds::Uint64) - .Col(0, 8, NScheme::NTypeIds::Uint32) - .Key({ 0 }); - - const ui64 rowsCount = 20; - - TRowTool tool(*lay); - TRowsHeap rows(64 * 1024); - TPartCook cook(lay, { false, 4096 }); - for (ui64 seq = 0; seq < rowsCount; ++seq) { - rows.Put(*TSchemedCookRow(*lay).Col(1000 + seq, 42_u32)); - cook.AddOpN(ERowOp::Erase, 1000 + seq); - } - - auto source = cook.Finish(); - - { - auto partView = source.ToPartView(); - UNIT_ASSERT_VALUES_EQUAL(partView.Slices->size(), 1u); - UNIT_ASSERT_VALUES_EQUAL(partView.Slices->at(0).Rows(), rowsCount); - } - - TVector<TBounds> underlayMaskValues; - - { - auto& bounds = underlayMaskValues.emplace_back(); - bounds.FirstKey = TSerializedCellVec(tool.KeyCells(rows[3])); - bounds.LastKey = TSerializedCellVec(tool.KeyCells(rows[7])); - bounds.FirstInclusive = false; - bounds.LastInclusive = true; - } - - { - auto& bounds = underlayMaskValues.emplace_back(); - bounds.FirstKey = TSerializedCellVec(tool.KeyCells(rows[11])); - bounds.LastKey = TSerializedCellVec(tool.KeyCells(rows[14])); - bounds.FirstInclusive = true; - bounds.LastInclusive = false; - } - - TVector<const TBounds*> underlayMaskPointers; - underlayMaskPointers.emplace_back(&underlayMaskValues.at(0)); - underlayMaskPointers.emplace_back(&underlayMaskValues.at(1)); - auto underlayMask = TUnderlayMask::Build(lay.RowScheme(), underlayMaskPointers); - - auto born = TCompaction(nullptr, CreateConf(16, underlayMask.Get())) - .Do(lay.RowScheme(), { &source }); - - // Check only erase markers under the mask are kept intact - TCheckIt(born, { nullptr, 0 }, nullptr, true /* expand defaults */) - .To(10).Seek({ }, ESeek::Lower).Is(EReady::Data) - .To(21).IsOpN(ERowOp::Erase, 1004_u64, ECellOp::Empty).Next() - .To(22).IsOpN(ERowOp::Erase, 1005_u64, ECellOp::Empty).Next() - .To(23).IsOpN(ERowOp::Erase, 1006_u64, ECellOp::Empty).Next() - .To(24).IsOpN(ERowOp::Erase, 1007_u64, ECellOp::Empty).Next() - .To(25).IsOpN(ERowOp::Erase, 1011_u64, ECellOp::Empty).Next() - .To(26).IsOpN(ERowOp::Erase, 1012_u64, ECellOp::Empty).Next() - .To(27).IsOpN(ERowOp::Erase, 1013_u64, ECellOp::Empty).Next() - .To(30).Is(EReady::Gone); - } - - Y_UNIT_TEST(CompactWithSplitKeys) { - TLayoutCook lay; - lay - .Col(0, 0, NScheme::NTypeIds::Uint64) - .Col(0, 8, NScheme::NTypeIds::Uint32) - .Key({ 0 }); - - const ui64 pageCount = 5; - - TRowTool tool(*lay); - TRowsHeap rows(64 * 1024); - for (ui64 seq = 0; seq < 4 * pageCount; ++seq) { - rows.Put(*TSchemedCookRow(*lay).Col(1000 + seq, 42_u32)); - } - - auto source = TPartCook(lay, CreateConf(4)) - .Add(rows.begin(), rows.end()) - .Finish(); - - { - auto partView = source.ToPartView(); - UNIT_ASSERT_VALUES_EQUAL(partView.Slices->size(), 1u); - UNIT_ASSERT_VALUES_EQUAL(partView->Index->Count, pageCount); - } - - TVector<TSerializedCellVec> splitKeyValues; - splitKeyValues.emplace_back(tool.KeyCells(rows[6])); - splitKeyValues.emplace_back(tool.KeyCells(rows[14])); - TSplitKeys splitKeys(lay.RowScheme(), std::move(splitKeyValues)); - - auto born = TCompaction(nullptr, CreateConf(4, &splitKeys)) - .Do(lay.RowScheme(), { &source }); - - { - auto partView = 
born.ToPartView(); - UNIT_ASSERT_VALUES_EQUAL(partView.Slices->size(), 3u); - UNIT_ASSERT_VALUES_EQUAL(partView->Index->Count, 6u); - - UNIT_ASSERT_VALUES_EQUAL(partView->Index->Record(0)->GetRowId(), 0u); - UNIT_ASSERT_VALUES_EQUAL(partView->Index->Record(1)->GetRowId(), 4u); - UNIT_ASSERT_VALUES_EQUAL(partView->Index->Record(2)->GetRowId(), 6u); - UNIT_ASSERT_VALUES_EQUAL(partView->Index->Record(3)->GetRowId(), 10u); - UNIT_ASSERT_VALUES_EQUAL(partView->Index->Record(4)->GetRowId(), 14u); - UNIT_ASSERT_VALUES_EQUAL(partView->Index->Record(5)->GetRowId(), 18u); - - auto key0 = TSerializedCellVec::Serialize(tool.KeyCells(rows[0])); - auto key5 = TSerializedCellVec::Serialize(tool.KeyCells(rows[5])); - auto& slice0 = partView.Slices->at(0); - UNIT_ASSERT_VALUES_EQUAL(slice0.FirstRowId, 0u); - UNIT_ASSERT_VALUES_EQUAL(slice0.LastRowId, 5u); - UNIT_ASSERT_VALUES_EQUAL(slice0.FirstKey.GetBuffer(), key0); - UNIT_ASSERT_VALUES_EQUAL(slice0.LastKey.GetBuffer(), key5); - - auto key6 = TSerializedCellVec::Serialize(tool.KeyCells(rows[6])); - auto key13 = TSerializedCellVec::Serialize(tool.KeyCells(rows[13])); - auto& slice1 = partView.Slices->at(1); - UNIT_ASSERT_VALUES_EQUAL(slice1.FirstRowId, 6u); - UNIT_ASSERT_VALUES_EQUAL(slice1.LastRowId, 13u); - UNIT_ASSERT_VALUES_EQUAL(slice1.FirstKey.GetBuffer(), key6); - UNIT_ASSERT_VALUES_EQUAL(slice1.LastKey.GetBuffer(), key13); - - auto key14 = TSerializedCellVec::Serialize(tool.KeyCells(rows[14])); - auto key19 = TSerializedCellVec::Serialize(tool.KeyCells(rows[19])); - auto& slice2 = partView.Slices->at(2); - UNIT_ASSERT_VALUES_EQUAL(slice2.FirstRowId, 14u); - UNIT_ASSERT_VALUES_EQUAL(slice2.LastRowId, 19u); - UNIT_ASSERT_VALUES_EQUAL(slice2.FirstKey.GetBuffer(), key14); - UNIT_ASSERT_VALUES_EQUAL(slice2.LastKey.GetBuffer(), key19); - } - } - -} - -Y_UNIT_TEST_SUITE(TShardedCompactionScenarios) { - - struct Schema : NIceDb::Schema { - struct Data : Table<1> { - struct Key : Column<1, NScheme::NTypeIds::Uint64> { }; - struct Value : Column<2, NScheme::NTypeIds::Uint32> { }; - - using TKey = TableKey<Key>; - using TColumns = TableColumns<Key, Value>; - }; - - using TTables = SchemaTables<Data>; - }; - - Y_UNIT_TEST(SimpleNoResharding) { - TSimpleBackend backend; - TSimpleBroker broker; - TSimpleLogger logger; - TSimpleTime time; - - // Initialize the schema - { - auto db = backend.Begin(); - db.Materialize<Schema>(); - - // special policy with disabled nursery - TCompactionPolicy policy; - policy.ShardPolicy.SetMinSliceSize(0); - backend.DB.Alter().SetCompactionPolicy(1, policy); - - backend.Commit(); - } - - // Insert some initial rows and compact them outside of strategy - { - auto db = backend.Begin(); - for (ui64 seq = 0; seq < 64; ++seq) { - db.Table<Schema::Data>().Key(1000 + seq).Update<Schema::Data::Value>(42); - } - backend.Commit(); - backend.SimpleMemCompaction(1); - - UNIT_ASSERT_VALUES_EQUAL(backend.TableParts(1).size(), 1u); - } - - TShardedCompactionStrategy strategy(1, &backend, &broker, &logger, &time, "suffix"); - - { - TCompactionState initialState; - strategy.Start(initialState); - } - - // Don't expect any tasks or change requests - UNIT_ASSERT(!broker.HasPending()); - UNIT_ASSERT(!backend.PendingReads); - UNIT_ASSERT(!backend.StartedCompactions); - UNIT_ASSERT(!backend.CheckChangesFlag()); - - // Erase and insert some more rows - { - auto db = backend.Begin(); - for (ui64 seq = 0; seq < 256; ++seq) { - if (seq < 128) { - db.Table<Schema::Data>().Key(1000 - 64 + seq).Delete(); - } else { - db.Table<Schema::Data>().Key(1000 - 64 + 
seq).Update<Schema::Data::Value>(43); - } - } - backend.Commit(); - } - - // Start a memtable compaction using this strategy - { - auto memCompactionId = strategy.BeginMemCompaction(0, { 0, TEpoch::Max() }, 0); - UNIT_ASSERT(memCompactionId != 0); - auto outcome = backend.RunCompaction(memCompactionId); - - UNIT_ASSERT(outcome.Params); - UNIT_ASSERT(!outcome.Params->Parts); - UNIT_ASSERT(!outcome.Params->IsFinal); - - // We expect that only 64 out of 128 drops survive - UNIT_ASSERT(outcome.Result); - UNIT_ASSERT_VALUES_EQUAL(outcome.Result->Parts.size(), 1u); - UNIT_ASSERT(outcome.Result->Parts[0]); - UNIT_ASSERT_VALUES_EQUAL(outcome.Result->Parts[0]->Stat.Drops, 64u); - - auto changes = strategy.CompactionFinished( - memCompactionId, std::move(outcome.Params), std::move(outcome.Result)); - - // Don't expect any slice changes at this time - UNIT_ASSERT(changes.SliceChanges.empty()); - } - - // There should be a compaction task pending right now - UNIT_ASSERT(broker.RunPending()); - UNIT_ASSERT(!broker.HasPending()); - - // There should be compaction started right now - UNIT_ASSERT_VALUES_EQUAL(backend.StartedCompactions.size(), 1u); - - // Perform this compaction - { - auto outcome = backend.RunCompaction(); - UNIT_ASSERT(outcome.Params->Parts); - UNIT_ASSERT(outcome.Params->IsFinal); - - // We expect that none of the original drops survive - UNIT_ASSERT_VALUES_EQUAL(outcome.Result->Parts.size(), 1u); - UNIT_ASSERT_VALUES_EQUAL(outcome.Result->Parts[0]->Stat.Drops, 0u); - - auto changes = strategy.CompactionFinished( - outcome.CompactionId, std::move(outcome.Params), std::move(outcome.Result)); - - // Don't expect any slice changes at this time - UNIT_ASSERT(changes.SliceChanges.empty()); - } - - UNIT_ASSERT(!broker.HasPending()); - UNIT_ASSERT(!backend.StartedCompactions); - } - - Y_UNIT_TEST(NurserySequential) { - TSimpleBackend backend; - TSimpleBroker broker; - TSimpleLogger logger; - TSimpleTime time; - - const ui64 triggerSize = 16 * 1024; - - // Initialize the schema - { - auto db = backend.Begin(); - db.Materialize<Schema>(); - - // special policy with a small nursery - TCompactionPolicy policy; - policy.ShardPolicy.SetMinSliceSize(triggerSize); - backend.DB.Alter().SetCompactionPolicy(1, policy); - - backend.Commit(); - } - - TShardedCompactionStrategy strategy(1, &backend, &broker, &logger, &time, "suffix"); - strategy.Start({ }); - - size_t memCompactions = 0; - for (ui64 base = 0; base < 16 * 1024; base += 128) { - auto db = backend.Begin(); - for (ui64 seq = 0; seq < 128; ++seq) { - db.Table<Schema::Data>().Key(base + seq).Update<Schema::Data::Value>(42); - } - backend.Commit(); - backend.SimpleMemCompaction(&strategy); - ++memCompactions; - - UNIT_ASSERT_C(!broker.HasPending(), - "Strategy shouldn't request tasks in fully sequential case"); - } - - UNIT_ASSERT(!broker.HasPending()); - UNIT_ASSERT(!broker.HasRunning()); - UNIT_ASSERT(!backend.StartedCompactions); - - size_t countAll = 0; - size_t countBig = 0; - size_t countSmall = 0; - for (auto& part : backend.TableParts(1)) { - ++countAll; - if (part->BackingSize() >= triggerSize) { - ++countBig; - } else { - ++countSmall; - } - } - - UNIT_ASSERT_C(countAll < memCompactions && countBig > countSmall, - "Produced " << countAll << " parts after " << memCompactions << " compactions (" - << countBig << " big and " << countSmall << " small)"); - } - - Y_UNIT_TEST(SequentialOverlap) { - TSimpleBackend backend; - TSimpleBroker broker; - TSimpleLogger logger; - TSimpleTime time; - - // Initialize the schema - { - auto db = 
backend.Begin(); - db.Materialize<Schema>(); - - // special policy without nursery and extremely reusable slices - TCompactionPolicy policy; - policy.ShardPolicy.SetMinSliceSize(0); - policy.ShardPolicy.SetMinSliceSizeToReuse(1); - policy.ShardPolicy.SetNewDataPercentToCompact(50); - backend.DB.Alter().SetCompactionPolicy(1, policy); - - backend.Commit(); - } - - TShardedCompactionStrategy strategy(1, &backend, &broker, &logger, &time, "suffix"); - strategy.Start({ }); - - const ui64 rowsPerTx = 16 * 1024; - for (ui64 attempt = 0; attempt < 2; ++attempt) { - const ui64 base = (rowsPerTx - 1) * attempt; - auto db = backend.Begin(); - for (ui64 seq = 0; seq < rowsPerTx; ++seq) { - db.Table<Schema::Data>().Key(base + seq).Update<Schema::Data::Value>(42); - } - backend.Commit(); - backend.SimpleMemCompaction(&strategy); - - // Run all pending tasks and compactions - while (broker.RunPending()) { - while (backend.SimpleTableCompaction(1, &broker, &strategy)) { - // nothing - } - } - } - - UNIT_ASSERT(!broker.HasPending()); - UNIT_ASSERT(!broker.HasRunning()); - UNIT_ASSERT(!backend.StartedCompactions); - - // We expect overlap to be compacted with the rest reused - UNIT_ASSERT_VALUES_EQUAL( - backend.DumpKeyRanges(1, true), - "[{0}, {16168})@1/4 [{16168}, {16758}]@2/8 [{16759}, {32766}]@2/7"); - } - - Y_UNIT_TEST(SequentialSplit) { - TSimpleBackend backend; - TSimpleBroker broker; - TSimpleLogger logger; - TSimpleTime time; - - // Initialize the schema - { - auto db = backend.Begin(); - db.Materialize<Schema>(); - - // special policy without nursery and small shard size - TCompactionPolicy policy; - policy.ShardPolicy.SetMinSliceSize(0); - policy.ShardPolicy.SetMaxShardSize(512 * 1024); - backend.DB.Alter().SetCompactionPolicy(1, policy); - - backend.Commit(); - } - - TShardedCompactionStrategy strategy(1, &backend, &broker, &logger, &time, "suffix"); - strategy.Start({ }); - - const ui64 rowsPerTx = 16 * 1024; - for (ui64 index = 0; index < 2; ++index) { - const ui64 base = rowsPerTx * index; - auto db = backend.Begin(); - for (ui64 seq = 0; seq < rowsPerTx; ++seq) { - db.Table<Schema::Data>().Key(base + seq).Update<Schema::Data::Value>(42); - } - backend.Commit(); - backend.SimpleMemCompaction(&strategy); - - UNIT_ASSERT_C(!broker.HasPending(), "Pending task at index " << index); - UNIT_ASSERT_C(!backend.StartedCompactions, "Started compaction at index " << index); - - UNIT_ASSERT_C(!backend.PendingReads, "Pending read at index " << index); - } - - auto& state = backend.TableState[1]; - - NProto::TShardedStrategyStateInfo header; - UNIT_ASSERT(state.contains(0)); - UNIT_ASSERT(ParseFromStringNoSizeLimit(header, state[0])); - - // We expect two shards - UNIT_ASSERT_VALUES_EQUAL(header.GetShards().size(), 2u); - - // We expect a single split key - UNIT_ASSERT_VALUES_EQUAL(header.GetSplitKeys().size(), 1u); - - // We expect split key to be the start of the second part - TSerializedCellVec splitKey(state.at(header.GetSplitKeys(0))); - UNIT_ASSERT(splitKey.GetCells()); - UNIT_ASSERT_VALUES_EQUAL(splitKey.GetCells().at(0).AsValue<ui64>(), rowsPerTx); - } - - Y_UNIT_TEST(NormalSplit) { - TSimpleBackend backend; - TSimpleBroker broker; - TSimpleLogger logger; - TSimpleTime time; - - // Initialize the schema - { - auto db = backend.Begin(); - db.Materialize<Schema>(); - - // special policy without nursery, small shard size and large compaction trigger - TCompactionPolicy policy; - policy.ShardPolicy.SetMinSliceSize(0); - policy.ShardPolicy.SetMaxShardSize(512 * 1024); - 
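// (Hedged reading of the next knob: NewDataPercentToCompact(300) appears
// to defer shard compaction until fresh data reaches ~3x the already
// compacted data, which is why this test expects no started compactions
// while it accumulates slices.)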
policy.ShardPolicy.SetNewDataPercentToCompact(300); - backend.DB.Alter().SetCompactionPolicy(1, policy); - - backend.Commit(); - } - - TShardedCompactionStrategy strategy(1, &backend, &broker, &logger, &time, "suffix"); - strategy.Start({ }); - - const ui64 rowsPerTx = 16 * 1024; - for (ui64 index = 0; index < 2; ++index) { - const ui64 base = index; - auto db = backend.Begin(); - for (ui64 seq = 0; seq < rowsPerTx; ++seq) { - db.Table<Schema::Data>().Key(base + seq * 3).Update<Schema::Data::Value>(42); - } - backend.Commit(); - backend.SimpleMemCompaction(&strategy); - - UNIT_ASSERT_C(!broker.HasPending(), "Pending task at index " << index); - UNIT_ASSERT_C(!backend.StartedCompactions, "Started compaction at index " << index); - - if (index == 0) { - UNIT_ASSERT_C(!backend.PendingReads, "Pending read at index " << index); - } - } - - UNIT_ASSERT_VALUES_EQUAL(backend.PendingReads.size(), 2u); - while (backend.PendingReads) { - TStrictEnv env; - // load index - auto first = backend.RunRead(&env); - UNIT_ASSERT(!first.Completed); - env.Load(); - // load data - auto second = backend.RunRead(&env); - UNIT_ASSERT(!second.Completed); - env.Load(); - auto third = backend.RunRead(first.ReadId, &env); - UNIT_ASSERT(third.Completed); - } - - UNIT_ASSERT(backend.CheckChangesFlag()); - auto changes = strategy.ApplyChanges(); - UNIT_ASSERT(changes.SliceChanges); - backend.ApplyChanges(1, std::move(changes)); - - auto& state = backend.TableState[1]; - - NProto::TShardedStrategyStateInfo header; - UNIT_ASSERT(state.contains(0)); - UNIT_ASSERT(ParseFromStringNoSizeLimit(header, state[0])); - - // We expect two shards - UNIT_ASSERT_VALUES_EQUAL(header.GetShards().size(), 2u); - - // We expect a single split key - UNIT_ASSERT_VALUES_EQUAL(header.GetSplitKeys().size(), 1u); - - // Stop current strategy - strategy.Stop(); - - // Start a new strategy instance from the same initial state - TShardedCompactionStrategy reloaded(1, &backend, &broker, &logger, &time, "suffix"); - { - TCompactionState initialState; - initialState.StateSnapshot = backend.TableState[1]; - reloaded.Start(std::move(initialState)); - } - - // Strategy must accept current slices without unexpected actions - UNIT_ASSERT(!broker.HasPending()); - UNIT_ASSERT(!backend.PendingReads); - UNIT_ASSERT(!backend.StartedCompactions); - - for (ui64 index = 2; index < 3; ++index) { - const ui64 base = index; - auto db = backend.Begin(); - for (ui64 seq = 0; seq < rowsPerTx; ++seq) { - db.Table<Schema::Data>().Key(base + seq * 3).Update<Schema::Data::Value>(42); - } - backend.Commit(); - backend.SimpleMemCompaction(&reloaded); - - UNIT_ASSERT_C(!broker.HasPending(), "Pending task at index " << index); - UNIT_ASSERT_C(!backend.StartedCompactions, "Started compaction at index " << index); - - // The part should be generated without additional reads - UNIT_ASSERT(!backend.PendingReads); - } - - // There should be 3 parts by now - UNIT_ASSERT_VALUES_EQUAL(backend.TableParts(1).size(), 3u); - - for (auto& partView : backend.TableParts(1)) { - // All parts are expected to have 2 slices - UNIT_ASSERT_VALUES_EQUAL(partView.Slices->size(), 2u); - } - } - - Y_UNIT_TEST(CompactionHoles) { - TSimpleBackend backend; - TSimpleBroker broker; - TSimpleLogger logger; - TSimpleTime time; - - // Initialize the schema - { - auto db = backend.Begin(); - db.Materialize<Schema>(); - - // special policy without nursery and 99% compaction trigger - TCompactionPolicy policy; - policy.ShardPolicy.SetMinSliceSize(0); - policy.ShardPolicy.SetNewDataPercentToCompact(99); - 
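// (Hedged note: these 99% data/row thresholds effectively suppress
// automatic compaction while the test lays out the epoch pattern drawn
// below -- matching the "We ignore compaction requests" remark further
// down.)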
policy.ShardPolicy.SetNewRowsPercentToCompact(99); - backend.DB.Alter().SetCompactionPolicy(1, policy); - - backend.Commit(); - } - - auto strategy = MakeHolder<TShardedCompactionStrategy>(1, &backend, &broker, &logger, &time, "suffix"); - strategy->Start({ }); - - // Arrange parts (epochs) like this in key space: - // . 5 . 6 . - // . 3 . 4 . - // 7 1 8 2 9 - // We ignore compaction requests so it's not messed up until we're done. - const ui64 rowsPerTx = 16 * 1024; - for (ui64 level = 0; level < 3; ++level) { - for (ui64 index = 0; index < 2; ++index) { - const ui64 base = rowsPerTx + index * 2 * rowsPerTx; - auto db = backend.Begin(); - for (ui64 seq = 0; seq < rowsPerTx; ++seq) { - db.Table<Schema::Data>().Key(base + seq).Update<Schema::Data::Value>(42); - } - backend.Commit(); - backend.SimpleMemCompaction(strategy.Get()); - } - } - for (ui64 index = 0; index < 3; ++index) { - const ui64 base = index * 2 * rowsPerTx; - auto db = backend.Begin(); - for (ui64 seq = 0; seq < rowsPerTx; ++seq) { - db.Table<Schema::Data>().Key(base + seq).Update<Schema::Data::Value>(42); - } - backend.Commit(); - backend.SimpleMemCompaction(strategy.Get()); - } - - UNIT_ASSERT_VALUES_EQUAL(backend.DumpKeyRanges(1), - "[{0}, {16383}]@7 [{16384}, {32767}]@1 [{16384}, {32767}]@3 [{16384}, {32767}]@5 [{32768}, {49151}]@8 [{49152}, {65535}]@2 [{49152}, {65535}]@4 [{49152}, {65535}]@6 [{65536}, {81919}]@9"); - - auto savedState = strategy->SnapshotState(); - strategy->Stop(); - strategy.Reset(); - - // Strategy must discard all its current tasks when stopping - UNIT_ASSERT(!broker.HasPending()); - UNIT_ASSERT(!backend.PendingReads); - UNIT_ASSERT(!backend.StartedCompactions); - - strategy = MakeHolder<TShardedCompactionStrategy>(1, &backend, &broker, &logger, &time, "suffix"); - strategy->Start(std::move(savedState)); - - // We expect a single compaction of two upper levels - UNIT_ASSERT(broker.RunPending()); - UNIT_ASSERT(!broker.HasPending()); - UNIT_ASSERT_VALUES_EQUAL(backend.StartedCompactions.size(), 1u); - UNIT_ASSERT(backend.SimpleTableCompaction(1, &broker, strategy.Get())); - - // We expect the following at the end of compaction: - // . 6 . 6 . 
- // 7 1 8 2 9 - UNIT_ASSERT_VALUES_EQUAL(backend.DumpKeyRanges(1), - "[{0}, {16383}]@7 [{16384}, {32767}]@1 [{16384}, {32767}]@6 [{32768}, {49151}]@8 [{49152}, {65535}]@2 [{49152}, {65535}]@6 [{65536}, {81919}]@9"); - } - - Y_UNIT_TEST(CompactionGarbage) { - TSimpleBackend backend; - TSimpleBroker broker; - TSimpleLogger logger; - TSimpleTime time; - - // Initialize the schema - { - auto db = backend.Begin(); - db.Materialize<Schema>(); - - // special policy without nursery and 25% compaction triggers - TCompactionPolicy policy; - policy.ShardPolicy.SetMinSliceSize(0); - policy.ShardPolicy.SetMinShardSize(0); - policy.ShardPolicy.SetNewDataPercentToCompact(25); - policy.ShardPolicy.SetNewRowsPercentToCompact(25); - policy.ShardPolicy.SetMaxGarbagePercentToReuse(25); - backend.DB.Alter().SetCompactionPolicy(1, policy); - - backend.Commit(); - } - - const ui64 rowsPerShard = 16 * 1024; - - // Create initial state with 2 shards - TCompactionState initialState; - { - const ui64 splitKey = rowsPerShard; - initialState.StateSnapshot[1] = TSerializedCellVec::Serialize({ - TCell::Make(splitKey) - }); - NProto::TShardedStrategyStateInfo header; - header.SetLastSplitKey(1); - header.AddSplitKeys(1); - header.SetLastShardId(2); - header.AddShards(1); - header.AddShards(2); - Y_PROTOBUF_SUPPRESS_NODISCARD header.SerializeToString(&initialState.StateSnapshot[0]); - } - - TShardedCompactionStrategy strategy(1, &backend, &broker, &logger, &time, "suffix"); - strategy.Start(std::move(initialState)); - - // Create a large slice over both shards - { - auto db = backend.Begin(); - for (ui64 seq = 0; seq < rowsPerShard * 2; ++seq) { - db.Table<Schema::Data>().Key(seq).Update<Schema::Data::Value>(42); - } - backend.Commit(); - backend.SimpleMemCompaction(&strategy); - } - - UNIT_ASSERT_VALUES_EQUAL( - backend.DumpKeyRanges(1), - "[{0}, {16383}]@1 [{16384}, {32767}]@1"); - - // Create a large slice in the first shard, to trigger its compaction - { - auto db = backend.Begin(); - for (ui64 seq = 0; seq < rowsPerShard; ++seq) { - db.Table<Schema::Data>().Key(seq).Update<Schema::Data::Value>(42); - } - backend.Commit(); - backend.SimpleMemCompaction(&strategy); - } - - UNIT_ASSERT_VALUES_EQUAL( - backend.DumpKeyRanges(1, true), - "[{0}, {16383}]@1/4 [{0}, {16383}]@2/7 [{16384}, {32767}]@1/4"); - - // New slice should have triggered compaction in the left shard - UNIT_ASSERT(broker.RunPending()); - UNIT_ASSERT(!broker.HasPending()); - UNIT_ASSERT(backend.SimpleTableCompaction(1, &broker, &strategy)); - - UNIT_ASSERT_VALUES_EQUAL( - backend.DumpKeyRanges(1, true), - "[{0}, {16383}]@2/8 [{16384}, {32767}]@1/4"); - - // Now we should have triggered garbage compaction in the right shard - UNIT_ASSERT(broker.RunPending()); - UNIT_ASSERT(!broker.HasPending()); - UNIT_ASSERT(backend.SimpleTableCompaction(1, &broker, &strategy)); - - UNIT_ASSERT_VALUES_EQUAL( - backend.DumpKeyRanges(1, true), - "[{0}, {16383}]@2/8 [{16384}, {32767}]@1/9"); - - UNIT_ASSERT(!broker.HasPending()); - } - - Y_UNIT_TEST(CompactionGarbageNoReuse) { - TSimpleBackend backend; - TSimpleBroker broker; - TSimpleLogger logger; - TSimpleTime time; - - // Initialize the schema - { - auto db = backend.Begin(); - db.Materialize<Schema>(); - - // special policy without nursery, 25% compaction triggers - TCompactionPolicy policy; - policy.ShardPolicy.SetMinSliceSize(0); - policy.ShardPolicy.SetMinShardSize(0); - policy.ShardPolicy.SetMinSliceSizeToReuse(1); - policy.ShardPolicy.SetNewDataPercentToCompact(25); - 
policy.ShardPolicy.SetNewRowsPercentToCompact(25); - policy.ShardPolicy.SetMaxGarbagePercentToReuse(25); - backend.DB.Alter().SetCompactionPolicy(1, policy); - - backend.Commit(); - } - - TShardedCompactionStrategy strategy(1, &backend, &broker, &logger, &time, "suffix"); - strategy.Start({ }); - - const ui64 rowsPerSlice = 16 * 1024; - - // Create two slices that largely overlap - for (ui64 index = 0; index < 2; ++index) { - const ui64 base = index * rowsPerSlice / 2; - auto db = backend.Begin(); - for (ui64 seq = 0; seq < rowsPerSlice; ++seq) { - db.Table<Schema::Data>().Key(base + seq).Update<Schema::Data::Value>(42); - } - backend.Commit(); - backend.SimpleMemCompaction(&strategy); - } - - UNIT_ASSERT_VALUES_EQUAL( - backend.DumpKeyRanges(1, true), - "[{0}, {16383}]@1/4 [{8192}, {24575}]@2/7"); - - // Run the triggered compaction - UNIT_ASSERT(broker.RunPending()); - UNIT_ASSERT(!broker.HasPending()); - UNIT_ASSERT(backend.SimpleTableCompaction(1, &broker, &strategy)); - - // We expect the middle to not be reused, as it would produce too much garbage - UNIT_ASSERT_VALUES_EQUAL( - backend.DumpKeyRanges(1, true), - "[{0}, {24575}]@2/8"); - } - -} - -} -} -} diff --git a/ydb/core/tablet_flat/ut/ya.make b/ydb/core/tablet_flat/ut/ya.make index d1b143bc1a6..29789db2786 100644 --- a/ydb/core/tablet_flat/ut/ya.make +++ b/ydb/core/tablet_flat/ut/ya.make @@ -34,7 +34,6 @@ SRCS( ut_sausage.cpp ut_stat.cpp ut_comp_gen.cpp - ut_comp_shard.cpp ut_compaction.cpp ut_compaction_multi.cpp ut_charge.cpp diff --git a/ydb/core/tablet_flat/ya.make b/ydb/core/tablet_flat/ya.make index f6d499884da..12d88cc87e3 100644 --- a/ydb/core/tablet_flat/ya.make +++ b/ydb/core/tablet_flat/ya.make @@ -7,7 +7,6 @@ SRCS( flat_comp.cpp flat_comp_create.cpp flat_comp_gen.cpp - flat_comp_shard.cpp flat_cxx_database.h flat_database.cpp flat_database.h @@ -51,7 +50,6 @@ SRCS( flat_part_slice.cpp flat_range_cache.cpp flat_row_versions.cpp - flat_stat_part.cpp flat_stat_part.h flat_stat_table.h flat_stat_table.cpp @@ -76,8 +74,6 @@ SRCS( GENERATE_ENUM_SERIALIZATION(flat_comp_gen.h) -GENERATE_ENUM_SERIALIZATION(flat_comp_shard.h) - GENERATE_ENUM_SERIALIZATION(flat_part_loader.h) GENERATE_ENUM_SERIALIZATION(flat_executor_compaction_logic.h) diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp index ebb5a4f6cc7..ff64a0c82e5 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp @@ -1167,12 +1167,6 @@ bool TPartitionConfigMerger::VerifyCompactionPolicy(const NKikimrSchemeOp::TComp case NKikimrSchemeOp::CompactionStrategyGenerational: break; case NKikimrSchemeOp::CompactionStrategySharded: - if (!KIKIMR_ALLOW_SHARDED_COMPACTION) { - err = TStringBuilder() - << "Unsupported compaction strategy."; - return false; - } - break; default: err = TStringBuilder() << "Unsupported compaction strategy."; diff --git a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp index 943bdcff4be..d33a669908a 100644 --- a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp +++ b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp @@ -9337,22 +9337,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { } } )", - { KIKIMR_ALLOW_SHARDED_COMPACTION ? 
NKikimrScheme::StatusAccepted : - NKikimrScheme::StatusInvalidParameter }); - - if (KIKIMR_ALLOW_SHARDED_COMPACTION) { - env.TestWaitNotification(runtime, txId); - TestDescribeResult(DescribePath(runtime, "/MyRoot/Table2"), - { [](const auto& result) { - auto strategy = result - .GetPathDescription() - .GetTable() - .GetPartitionConfig() - .GetCompactionPolicy() - .GetCompactionStrategy(); - UNIT_ASSERT(strategy == NKikimrSchemeOp::CompactionStrategySharded);}}); - } + { NKikimrScheme::StatusInvalidParameter }); } Y_UNIT_TEST(AlterTableWithCompactionStrategies) { //+ @@ -9391,22 +9376,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { } } )", - { KIKIMR_ALLOW_SHARDED_COMPACTION ? - NKikimrScheme::StatusAccepted : - NKikimrScheme::StatusInvalidParameter }); - - if (KIKIMR_ALLOW_SHARDED_COMPACTION) { - env.TestWaitNotification(runtime, txId); - TestDescribeResult(DescribePath(runtime, "/MyRoot/Table1"), - { [](const auto& result) { - auto strategy = result - .GetPathDescription() - .GetTable() - .GetPartitionConfig() - .GetCompactionPolicy() - .GetCompactionStrategy(); - UNIT_ASSERT(strategy == NKikimrSchemeOp::CompactionStrategySharded);}}); - } + { NKikimrScheme::StatusInvalidParameter }); } Y_UNIT_TEST(SimultaneousDropForceDrop) { //+ diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index c662e6b6ff1..8d91105781f 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -49,12 +49,6 @@ RECURSE_FOR_TESTS( LIBRARY() -IF (KIKIMR_ALLOW_SHARDED_COMPACTION) - CFLAGS( - -DKIKIMR_ALLOW_SHARDED_COMPACTION=1 - ) -ENDIF() - SRCS( defs.h schemeshard.cpp
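The net effect on schemeshard validation, shown here as a hedged sketch reconstructed from the schemeshard_info_types.cpp hunk above (the switch cases and the err/TStringBuilder lines are taken from that hunk; the rest of the surrounding function is assumed): with the KIKIMR_ALLOW_SHARDED_COMPACTION guard removed, CompactionStrategySharded simply falls through to the default error branch:

    case NKikimrSchemeOp::CompactionStrategyGenerational:
        break;
    case NKikimrSchemeOp::CompactionStrategySharded:
        // No special case anymore: sharded falls through and is rejected
        // unconditionally, which is what the ut_base.cpp tests above now
        // assert via StatusInvalidParameter.
    default:
        err = TStringBuilder()
            << "Unsupported compaction strategy.";
        return false;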