aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkungasc <kungasc@yandex-team.com>2023-09-25 18:55:27 +0300
committerkungasc <kungasc@yandex-team.com>2023-09-25 19:58:41 +0300
commit06a588f5d3bd9429cc0ed1822cd4d629f3326f82 (patch)
tree94a8d66c3ace7f61624a560aafa4e2b50737af12
parenteb381404ff8196114ef1bc0d45419bce79936dfd (diff)
downloadydb-06a588f5d3bd9429cc0ed1822cd4d629f3326f82.tar.gz
KIKIMR-19139 Delete legacy Comp Shards
-rw-r--r--ydb/core/base/compile_time_flags.h10
-rw-r--r--ydb/core/base/localdb.cpp7
-rw-r--r--ydb/core/base/ya.make7
-rw-r--r--ydb/core/protos/flat_scheme_op.proto2
-rw-r--r--ydb/core/tablet_flat/CMakeLists.darwin-x86_64.txt13
-rw-r--r--ydb/core/tablet_flat/CMakeLists.linux-aarch64.txt13
-rw-r--r--ydb/core/tablet_flat/CMakeLists.linux-x86_64.txt13
-rw-r--r--ydb/core/tablet_flat/CMakeLists.windows-x86_64.txt13
-rw-r--r--ydb/core/tablet_flat/flat_comp_create.cpp13
-rw-r--r--ydb/core/tablet_flat/flat_comp_create.h8
-rw-r--r--ydb/core/tablet_flat/flat_comp_shard.cpp2656
-rw-r--r--ydb/core/tablet_flat/flat_comp_shard.h694
-rw-r--r--ydb/core/tablet_flat/flat_dbase_scheme.h4
-rw-r--r--ydb/core/tablet_flat/flat_executor_compaction_logic.cpp6
-rw-r--r--ydb/core/tablet_flat/flat_executor_ut.cpp110
-rw-r--r--ydb/core/tablet_flat/flat_part_charge.h59
-rw-r--r--ydb/core/tablet_flat/flat_stat_part.cpp98
-rw-r--r--ydb/core/tablet_flat/flat_stat_part.h81
-rw-r--r--ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tablet_flat/ut/ut_comp_shard.cpp1556
-rw-r--r--ydb/core/tablet_flat/ut/ya.make1
-rw-r--r--ydb/core/tablet_flat/ya.make4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.cpp6
-rw-r--r--ydb/core/tx/schemeshard/ut_base/ut_base.cpp34
-rw-r--r--ydb/core/tx/schemeshard/ya.make6
28 files changed, 7 insertions, 5411 deletions
diff --git a/ydb/core/base/compile_time_flags.h b/ydb/core/base/compile_time_flags.h
index 4259148ee3d..d7adae14681 100644
--- a/ydb/core/base/compile_time_flags.h
+++ b/ydb/core/base/compile_time_flags.h
@@ -3,16 +3,6 @@
// This file is used for enabling or disabling dangerous kikimr features
// These flags may only be changed at compile time
-// This feature allows schemeshard to accept sharded compaction for tables
-#ifndef KIKIMR_ALLOW_SHARDED_COMPACTION
-#define KIKIMR_ALLOW_SHARDED_COMPACTION 0
-#endif
-
-// This feature switches default to sharded compaction for all tables
-#ifndef KIKIMR_DEFAULT_SHARDED_COMPACTION
-#define KIKIMR_DEFAULT_SHARDED_COMPACTION 0
-#endif
-
// This feature enables tablet to serialize bundle deltas in part switches
// It may only be enabled after 19-2 or newer is deployed everywhere
#ifndef KIKIMR_TABLET_WRITE_BUNDLE_DELTAS
diff --git a/ydb/core/base/localdb.cpp b/ydb/core/base/localdb.cpp
index d07ee26fc1f..c3ed995d157 100644
--- a/ydb/core/base/localdb.cpp
+++ b/ydb/core/base/localdb.cpp
@@ -196,10 +196,6 @@ void TCompactionPolicy::Serialize(NKikimrSchemeOp::TCompactionPolicy& policyPb)
TCompactionPolicyPtr CreateDefaultTablePolicy() {
TCompactionPolicyPtr policy = new TCompactionPolicy;
-#if KIKIMR_DEFAULT_SHARDED_COMPACTION
- policy->CompactionStrategy = NKikimrSchemeOp::CompactionStrategySharded;
- policy->ShardPolicy.SetTaskPriorityBase(100);
-#endif
return policy;
}
@@ -212,9 +208,6 @@ TCompactionPolicyPtr CreateDefaultUserTablePolicy() {
LegacyQueueIdToTaskName(2), false});
userPolicy->Generations.push_back({400 * 1024 * 1024, 5, 16, 16ull * 1024 * 1024 * 1024,
LegacyQueueIdToTaskName(3), false});
-#if KIKIMR_DEFAULT_SHARDED_COMPACTION
- userPolicy->CompactionStrategy = NKikimrSchemeOp::CompactionStrategySharded;
-#endif
return userPolicy;
}
diff --git a/ydb/core/base/ya.make b/ydb/core/base/ya.make
index 7e65496bc43..a9c5c9934f9 100644
--- a/ydb/core/base/ya.make
+++ b/ydb/core/base/ya.make
@@ -1,12 +1,5 @@
LIBRARY()
-IF (KIKIMR_DEFAULT_SHARDED_COMPACTION)
- # Makes it easier to test sharded compaction
- CFLAGS(
- -DKIKIMR_DEFAULT_SHARDED_COMPACTION=1
- )
-ENDIF()
-
SRCS(
actor_activity_names.cpp
appdata.h
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 0f68d97e4ec..631840c53a1 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -122,7 +122,7 @@ message TFamilyDescription {
enum ECompactionStrategy {
CompactionStrategyUnset = 0;
CompactionStrategyGenerational = 1;
- CompactionStrategySharded = 2;
+ CompactionStrategySharded = 2; // DEPRECATED: use CompactionStrategyGenerational
}
message TCompactionPolicy {
diff --git a/ydb/core/tablet_flat/CMakeLists.darwin-x86_64.txt b/ydb/core/tablet_flat/CMakeLists.darwin-x86_64.txt
index 7d4d14ebb92..9f009108f05 100644
--- a/ydb/core/tablet_flat/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tablet_flat/CMakeLists.darwin-x86_64.txt
@@ -38,12 +38,6 @@ get_built_tool_path(
enum_parser
)
get_built_tool_path(
- TOOL_enum_parser_bin
- TOOL_enum_parser_dependency
- tools/enum_parser/enum_parser
- enum_parser
-)
-get_built_tool_path(
TOOL_protoc_bin
TOOL_protoc_dependency
contrib/tools/protoc/bin
@@ -91,7 +85,6 @@ target_sources(ydb-core-tablet_flat PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_create.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_gen.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_shard.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_database.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_dbase_scheme.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_dbase_apply.cpp
@@ -123,7 +116,6 @@ target_sources(ydb-core-tablet_flat PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_part_slice.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_range_cache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_row_versions.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_stat_part.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_stat_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_store_hotdog.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table.cpp
@@ -141,11 +133,6 @@ generate_enum_serilization(ydb-core-tablet_flat
ydb/core/tablet_flat/flat_comp_gen.h
)
generate_enum_serilization(ydb-core-tablet_flat
- ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_shard.h
- INCLUDE_HEADERS
- ydb/core/tablet_flat/flat_comp_shard.h
-)
-generate_enum_serilization(ydb-core-tablet_flat
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_part_loader.h
INCLUDE_HEADERS
ydb/core/tablet_flat/flat_part_loader.h
diff --git a/ydb/core/tablet_flat/CMakeLists.linux-aarch64.txt b/ydb/core/tablet_flat/CMakeLists.linux-aarch64.txt
index 5c9e2af153f..7621ab1675b 100644
--- a/ydb/core/tablet_flat/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tablet_flat/CMakeLists.linux-aarch64.txt
@@ -38,12 +38,6 @@ get_built_tool_path(
enum_parser
)
get_built_tool_path(
- TOOL_enum_parser_bin
- TOOL_enum_parser_dependency
- tools/enum_parser/enum_parser
- enum_parser
-)
-get_built_tool_path(
TOOL_protoc_bin
TOOL_protoc_dependency
contrib/tools/protoc/bin
@@ -92,7 +86,6 @@ target_sources(ydb-core-tablet_flat PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_create.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_gen.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_shard.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_database.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_dbase_scheme.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_dbase_apply.cpp
@@ -124,7 +117,6 @@ target_sources(ydb-core-tablet_flat PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_part_slice.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_range_cache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_row_versions.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_stat_part.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_stat_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_store_hotdog.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table.cpp
@@ -142,11 +134,6 @@ generate_enum_serilization(ydb-core-tablet_flat
ydb/core/tablet_flat/flat_comp_gen.h
)
generate_enum_serilization(ydb-core-tablet_flat
- ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_shard.h
- INCLUDE_HEADERS
- ydb/core/tablet_flat/flat_comp_shard.h
-)
-generate_enum_serilization(ydb-core-tablet_flat
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_part_loader.h
INCLUDE_HEADERS
ydb/core/tablet_flat/flat_part_loader.h
diff --git a/ydb/core/tablet_flat/CMakeLists.linux-x86_64.txt b/ydb/core/tablet_flat/CMakeLists.linux-x86_64.txt
index 5c9e2af153f..7621ab1675b 100644
--- a/ydb/core/tablet_flat/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tablet_flat/CMakeLists.linux-x86_64.txt
@@ -38,12 +38,6 @@ get_built_tool_path(
enum_parser
)
get_built_tool_path(
- TOOL_enum_parser_bin
- TOOL_enum_parser_dependency
- tools/enum_parser/enum_parser
- enum_parser
-)
-get_built_tool_path(
TOOL_protoc_bin
TOOL_protoc_dependency
contrib/tools/protoc/bin
@@ -92,7 +86,6 @@ target_sources(ydb-core-tablet_flat PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_create.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_gen.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_shard.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_database.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_dbase_scheme.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_dbase_apply.cpp
@@ -124,7 +117,6 @@ target_sources(ydb-core-tablet_flat PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_part_slice.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_range_cache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_row_versions.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_stat_part.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_stat_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_store_hotdog.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table.cpp
@@ -142,11 +134,6 @@ generate_enum_serilization(ydb-core-tablet_flat
ydb/core/tablet_flat/flat_comp_gen.h
)
generate_enum_serilization(ydb-core-tablet_flat
- ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_shard.h
- INCLUDE_HEADERS
- ydb/core/tablet_flat/flat_comp_shard.h
-)
-generate_enum_serilization(ydb-core-tablet_flat
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_part_loader.h
INCLUDE_HEADERS
ydb/core/tablet_flat/flat_part_loader.h
diff --git a/ydb/core/tablet_flat/CMakeLists.windows-x86_64.txt b/ydb/core/tablet_flat/CMakeLists.windows-x86_64.txt
index 7d4d14ebb92..9f009108f05 100644
--- a/ydb/core/tablet_flat/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tablet_flat/CMakeLists.windows-x86_64.txt
@@ -38,12 +38,6 @@ get_built_tool_path(
enum_parser
)
get_built_tool_path(
- TOOL_enum_parser_bin
- TOOL_enum_parser_dependency
- tools/enum_parser/enum_parser
- enum_parser
-)
-get_built_tool_path(
TOOL_protoc_bin
TOOL_protoc_dependency
contrib/tools/protoc/bin
@@ -91,7 +85,6 @@ target_sources(ydb-core-tablet_flat PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_create.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_gen.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_shard.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_database.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_dbase_scheme.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_dbase_apply.cpp
@@ -123,7 +116,6 @@ target_sources(ydb-core-tablet_flat PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_part_slice.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_range_cache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_row_versions.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_stat_part.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_stat_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_store_hotdog.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table.cpp
@@ -141,11 +133,6 @@ generate_enum_serilization(ydb-core-tablet_flat
ydb/core/tablet_flat/flat_comp_gen.h
)
generate_enum_serilization(ydb-core-tablet_flat
- ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_comp_shard.h
- INCLUDE_HEADERS
- ydb/core/tablet_flat/flat_comp_shard.h
-)
-generate_enum_serilization(ydb-core-tablet_flat
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_part_loader.h
INCLUDE_HEADERS
ydb/core/tablet_flat/flat_part_loader.h
diff --git a/ydb/core/tablet_flat/flat_comp_create.cpp b/ydb/core/tablet_flat/flat_comp_create.cpp
index d46a1f63e7c..8426b981406 100644
--- a/ydb/core/tablet_flat/flat_comp_create.cpp
+++ b/ydb/core/tablet_flat/flat_comp_create.cpp
@@ -1,6 +1,5 @@
#include "flat_comp_create.h"
#include "flat_comp_gen.h"
-#include "flat_comp_shard.h"
namespace NKikimr {
namespace NTable{
@@ -16,17 +15,5 @@ namespace NTable{
table, backend, broker, time, std::move(taskNameSuffix));
}
- THolder<ICompactionStrategy> CreateShardedCompactionStrategy(
- ui32 table,
- ICompactionBackend* backend,
- IResourceBroker* broker,
- NUtil::ILogger* logger,
- ITimeProvider* time,
- TString taskNameSuffix)
- {
- return MakeHolder<NCompShard::TShardedCompactionStrategy>(
- table, backend, broker, logger, time, std::move(taskNameSuffix));
- }
-
}
}
diff --git a/ydb/core/tablet_flat/flat_comp_create.h b/ydb/core/tablet_flat/flat_comp_create.h
index 5fa047203ea..266f89d9a32 100644
--- a/ydb/core/tablet_flat/flat_comp_create.h
+++ b/ydb/core/tablet_flat/flat_comp_create.h
@@ -15,13 +15,5 @@ namespace NTable {
ITimeProvider* time,
TString taskNameSuffix);
- THolder<ICompactionStrategy> CreateShardedCompactionStrategy(
- ui32 table,
- ICompactionBackend* backend,
- IResourceBroker* broker,
- NUtil::ILogger* logger,
- ITimeProvider* time,
- TString taskNameSuffix);
-
}
}
diff --git a/ydb/core/tablet_flat/flat_comp_shard.cpp b/ydb/core/tablet_flat/flat_comp_shard.cpp
deleted file mode 100644
index efb8fe189f9..00000000000
--- a/ydb/core/tablet_flat/flat_comp_shard.cpp
+++ /dev/null
@@ -1,2656 +0,0 @@
-#include "flat_comp_shard.h"
-#include "flat_part_charge.h"
-#include "flat_part_iter_multi.h"
-#include "flat_stat_part.h"
-#include "util_fmt_cell.h"
-
-#include <ydb/core/tablet_flat/protos/flat_table_shard.pb.h>
-#include <ydb/core/util/pb.h>
-
-#include <library/cpp/monlib/service/pages/templates.h>
-#include <util/generic/cast.h>
-
-#include <optional>
-
-namespace NKikimr {
-namespace NTable {
-namespace NCompShard {
-
- using NKikimr::NTable::NProto::TShardedStrategyStateInfo;
-
- namespace {
-
- static constexpr ui32 PRIORITY_UPDATE_FACTOR = 20;
-
- using NFmt::TPrintableTypedCells;
-
- class TPrintableShardRange {
- public:
- TPrintableShardRange(const TTableShard& shard, const TTableInfo& table)
- : Shard(shard)
- , Table(table)
- { }
-
- friend IOutputStream& operator<<(IOutputStream& out, const TPrintableShardRange& v) {
- if (v.Shard.LeftKey == 0 && v.Shard.RightKey == 0) {
- out << "all";
- return out;
- }
- out << '[';
- if (v.Shard.LeftKey != 0) {
- out << TPrintableTypedCells(v.Table.SplitKeys.at(v.Shard.LeftKey).GetCells(), v.Table.RowScheme->Keys->BasicTypes());
- } else {
- out << "-inf";
- }
- out << ", ";
- if (v.Shard.RightKey != 0) {
- out << TPrintableTypedCells(v.Table.SplitKeys.at(v.Shard.RightKey).GetCells(), v.Table.RowScheme->Keys->BasicTypes());
- } else {
- out << "+inf";
- }
- out << ')';
- return out;
- }
-
- private:
- const TTableShard& Shard;
- const TTableInfo& Table;
- };
-
- TPartView MakePartView(TIntrusiveConstPtr<TPart> part, TIntrusiveConstPtr<TSlices> slices) noexcept {
- TPartView partView{ std::move(part), nullptr, std::move(slices) };
- partView.Screen = partView.Slices->ToScreen(); // TODO: remove screen from TPartView
- return partView;
- }
-
- }
-
- bool TSplitStatIterator::TCmpHeapByFirstKey::operator()(const TItemState* b, const TItemState* a) const noexcept {
- TCellsRef left = a->Slice.FirstKey.GetCells();
- TCellsRef right = b->Slice.FirstKey.GetCells();
- if (int cmp = ComparePartKeys(left, right, KeyCellDefaults)) {
- return cmp < 0;
- }
- return a->Slice.FirstInclusive && !b->Slice.FirstInclusive;
- }
-
- bool TSplitStatIterator::TCmpHeapByNextKey::operator()(const TItemState* b, const TItemState* a) const noexcept {
- TCellsRef left = a->NextKey;
- TCellsRef right = b->NextKey;
- return ComparePartKeys(left, right, KeyCellDefaults) < 0;
- }
-
- bool TSplitStatIterator::TCmpHeapByLastKey::operator()(const TItemState* b, const TItemState* a) const noexcept {
- TCellsRef left = a->Slice.LastKey.GetCells();
- TCellsRef right = b->Slice.LastKey.GetCells();
- if (int cmp = ComparePartKeys(left, right, KeyCellDefaults)) {
- return cmp < 0;
- }
- return !a->Slice.LastInclusive && b->Slice.LastInclusive;
- }
-
- bool TSplitStatIterator::HasStarted(const TItemState* item) const noexcept {
- TCellsRef left = item->Slice.FirstKey.GetCells();
- TCellsRef right = Key;
- return ComparePartKeys(left, right, KeyCellDefaults) < 0;
- }
-
- bool TSplitStatIterator::HasStopped(const TItemState* item) const noexcept {
- TCellsRef left = item->Slice.LastKey.GetCells();
- TCellsRef right = Key;
- if (int cmp = ComparePartKeys(left, right, KeyCellDefaults)) {
- return cmp < 0;
- }
- return !item->Slice.LastInclusive;
- }
-
- void TSplitStatIterator::TItemState::InitNextKey() noexcept {
- auto& columns = Part->Scheme->Groups[0].ColsKeyIdx;
- NextKey.resize(columns.size());
- for (size_t pos = 0; pos < columns.size(); ++pos) {
- NextKey[pos] = NextPage->Cell(columns[pos]);
- }
- }
-
- void TSplitStatIterator::TItemState::InitPageSize() noexcept {
- TRowId endRowId = NextPage ? NextPage->GetRowId() : Slice.EndRowId();
- LastPageSize = Helper.CalcSize(LastRowId, endRowId);
- LastPageRows = endRowId - LastRowId;
- LastRowId = endRowId;
- }
-
- void TSplitStatIterator::AddSlice(const TPart* part, const TSlice& slice, ui64 size) noexcept {
- auto* item = &Items.emplace_back(part, slice);
- InitQueue.push(item);
- RightSize_ += size;
- RightRows_ += slice.Rows();
- }
-
- bool TSplitStatIterator::Next() noexcept {
- if (!InitQueue && !NextQueue) {
- return false;
- }
-
- // Select the next split key
- TCellsRef selected;
- if (InitQueue && NextQueue) {
- TCellsRef left = InitQueue.top()->Slice.FirstKey.GetCells();
- TCellsRef right = NextQueue.top()->NextKey;
- if (ComparePartKeys(left, right, KeyCellDefaults) <= 0) {
- selected = left;
- } else {
- selected = right;
- }
- } else if (InitQueue) {
- selected = InitQueue.top()->Slice.FirstKey.GetCells();
- } else {
- selected = NextQueue.top()->NextKey;
- }
-
- Key.reserve(selected.size());
- Key.assign(selected.begin(), selected.end());
- TCellsRef splitKey = Key;
-
- // Take all matching items from init queue and initialize them
- while (InitQueue && ComparePartKeys(InitQueue.top()->Slice.FirstKey.GetCells(), splitKey, KeyCellDefaults) <= 0) {
- auto* item = InitQueue.top();
- InitQueue.pop();
- StartQueue.push(item);
-
- // Find the first page that is after the first slice row
- item->NextPage = item->Part->Index.LookupRow(item->Slice.BeginRowId());
- if (++item->NextPage && item->NextPage->GetRowId() < item->Slice.EndRowId()) {
- // That first page has a matching row, use it for key enumeration
- item->InitNextKey();
- NextQueue.push(item);
- }
- }
-
- // Take all slices from start queue that have crossed the boundary
- while (StartQueue && HasStarted(StartQueue.top())) {
- auto* item = StartQueue.top();
- StartQueue.pop();
- ActivationQueue.push_back(item);
- }
-
- // Everything in activation queue has crossed the boundary and now belongs on the left side
- for (auto* item : ActivationQueue) {
- item->InitPageSize();
- LeftSize_ += item->LastPageSize;
- LeftRows_ += item->LastPageRows;
-
- // Check if the last page is only partially on the left side
- if (item->LastPageRows <= 1) {
- item->IsPartial = false;
- } else if (item->NextPage) {
- item->IsPartial = ComparePartKeys(splitKey, item->NextKey, KeyCellDefaults) < 0;
- } else if (HasStopped(item)) {
- item->IsPartial = false;
- } else {
- item->IsPartial = true;
- StopQueue.push(item);
- }
-
- // The last page may not belong on the right side any more
- if (!item->IsPartial) {
- RightSize_ -= item->LastPageSize;
- RightRows_ -= item->LastPageRows;
- }
- }
- ActivationQueue.clear();
-
- // Take all slices that have next key matching the new boundary
- while (NextQueue && ComparePartKeys(NextQueue.top()->NextKey, splitKey, KeyCellDefaults) <= 0) {
- auto* item = NextQueue.top();
- NextQueue.pop();
-
- // The last page no longer belongs on the right side
- if (item->IsPartial) {
- RightSize_ -= item->LastPageSize;
- RightRows_ -= item->LastPageRows;
- item->IsPartial = false;
- }
-
- // Try moving to the next page
- if (++item->NextPage && item->NextPage->GetRowId() < item->Slice.EndRowId()) {
- item->InitNextKey();
- NextQueue.push(item);
- }
-
- // If there are any rows left the new page will activate on the next key
- if (item->LastRowId < item->Slice.EndRowId()) {
- ActivationQueue.push_back(item);
- }
- }
-
- // Take all slices that have their last row beyond the new boundary
- while (StopQueue && HasStopped(StopQueue.top())) {
- auto* item = StopQueue.top();
- StopQueue.pop();
-
- // Everything up to the end no longer belongs on the right side
- if (item->IsPartial) {
- RightSize_ -= item->LastPageSize;
- RightRows_ -= item->LastPageRows;
- item->IsPartial = false;
- }
- }
-
- return true;
- }
-
- bool TPageReuseBuilder::TItemState::InitNextKey() noexcept {
- if (NextPage && NextPage->GetRowId() < Slice.EndRowId()) {
- auto& columns = Part->Scheme->Groups[0].ColsKeyIdx;
- NextKey.resize(columns.size());
- for (size_t pos = 0; pos < columns.size(); ++pos) {
- NextKey[pos] = NextPage->Cell(columns[pos]);
- }
- First.Key = NextKey;
- First.RowId = NextPage->GetRowId();
- First.Inclusive = true;
- return true;
- } else {
- return false;
- }
- }
-
- TRowId TPageReuseBuilder::TItemState::GetNextRowId() const noexcept {
- return NextPage ? NextPage->GetRowId() : Part->Index.GetEndRowId();
- }
-
- void TPageReuseBuilder::AddSlice(const TPart* part, const TSlice& slice, bool reusable) noexcept {
- Items.emplace_back(part, slice, reusable);
- }
-
- TPageReuseBuilder::TResults TPageReuseBuilder::Build() noexcept {
- TResults results;
- TItemState* lastItem = nullptr;
-
- // Produces a min-heap by first key
- auto heapByFirstKeyMin = [this](const TItemState* b, const TItemState* a) noexcept -> bool {
- TCellsRef left = a->First.Key;
- TCellsRef right = b->First.Key;
- if (int cmp = ComparePartKeys(left, right, KeyCellDefaults)) {
- return cmp < 0;
- }
- return a->First.Inclusive && !b->First.Inclusive;
- };
-
- auto noIntersection = [this](const TRightBoundary& a, const TLeftBoundary& b) noexcept -> bool {
- TCellsRef left = a.Key;
- TCellsRef right = b.Key;
- if (int cmp = ComparePartKeys(left, right, KeyCellDefaults)) {
- return cmp < 0;
- }
- return !a.Inclusive || !b.Inclusive;
- };
-
- auto lessBoundary = [this](const TRightBoundary& a, const TRightBoundary& b) noexcept -> bool {
- TCellsRef left = a.Key;
- TCellsRef right = b.Key;
- if (int cmp = ComparePartKeys(left, right, KeyCellDefaults)) {
- return cmp < 0;
- }
- return !a.Inclusive && b.Inclusive;
- };
-
- TVector<TItemState*> enterQueue(Reserve(Items.size()));
- for (TItemState& item : Items) {
- item.NextPage = item.Part->Index.LookupRow(item.Slice.BeginRowId());
- Y_VERIFY(item.NextPage,
- "Cannot find row %" PRIu64 " in the index for part %s",
- item.Slice.BeginRowId(),
- item.Part->Label.ToString().c_str());
- if (item.NextPage->GetRowId() != item.Slice.BeginRowId()) {
- ++item.NextPage;
- }
- item.First.Key = item.Slice.FirstKey.GetCells();
- item.First.RowId = item.Slice.FirstRowId;
- item.First.Inclusive = item.Slice.FirstInclusive;
- enterQueue.push_back(&item);
- }
-
- std::make_heap(enterQueue.begin(), enterQueue.end(), heapByFirstKeyMin);
-
- TVector<TCell> tmpStart;
- TVector<TCell> tmpEnd;
-
- bool reuseActive = true;
- while (enterQueue) {
- TRightBoundary end;
-
- // a separate scope to keep local variables
- {
- auto* item = enterQueue.front();
- std::pop_heap(enterQueue.begin(), enterQueue.end(), heapByFirstKeyMin);
- enterQueue.pop_back();
-
- TLeftBoundary start = item->First;
- bool viable = item->Reusable;
-
- // Protect start boundary against future overwrites
- tmpStart.assign(start.Key.begin(), start.Key.end());
- start.Key = tmpStart;
-
- if (item->NextPage && item->NextPage->GetRowId() == start.BeginRowId()) {
- ++item->NextPage;
- } else {
- // Sub-slice is not viable unless it starts on a page boundary
- viable = false;
- }
-
- if (item->InitNextKey()) {
- enterQueue.push_back(item);
- std::push_heap(enterQueue.begin(), enterQueue.end(), heapByFirstKeyMin);
-
- end.Key = item->NextKey;
- end.RowId = item->NextPage->GetRowId();
- end.Inclusive = false;
- } else {
- end.Key = item->Slice.LastKey.GetCells();
- end.RowId = item->Slice.LastRowId;
- end.Inclusive = item->Slice.LastInclusive;
- }
-
- // Protect end boundary against future overwrites
- tmpEnd.assign(end.Key.begin(), end.Key.end());
- end.Key = tmpEnd;
-
- if (viable && end.EndRowId() != item->GetNextRowId()) {
- // Sub-slice is not viable unless it ends on a page boundary
- viable = false;
- }
-
- // Move on if current page has no intersection with others
- if (!enterQueue || item == enterQueue.front() || noIntersection(end, enterQueue.front()->First)) {
- if (viable) {
- if (reuseActive && lastItem == item && results.Reusable.back().Slice.EndRowId() == start.BeginRowId()) {
- // Merge adjacent pages as long as it's from the same item
- results.Reusable.back().Slice.LastKey = TSerializedCellVec(end.Key);
- results.Reusable.back().Slice.LastRowId = end.RowId;
- results.Reusable.back().Slice.LastInclusive = end.Inclusive;
- } else {
- auto& result = results.Reusable.emplace_back();
- result.Part = item->Part;
- result.Slice.FirstKey = TSerializedCellVec(start.Key);
- result.Slice.FirstRowId = start.RowId;
- result.Slice.FirstInclusive = start.Inclusive;
- result.Slice.LastKey = TSerializedCellVec(end.Key);
- result.Slice.LastRowId = end.RowId;
- result.Slice.LastInclusive = end.Inclusive;
- ++results.ExpectedSlices;
- reuseActive = true;
- lastItem = item;
- }
- } else if (reuseActive) {
- ++results.ExpectedSlices;
- reuseActive = false;
- lastItem = nullptr;
- }
- continue;
- }
- }
-
- if (reuseActive) {
- ++results.ExpectedSlices;
- reuseActive = false;
- lastItem = nullptr;
- }
-
- // Now flush all items that have potential intersections, those cannot be reused
- while (enterQueue && !noIntersection(end, enterQueue.front()->First)) {
- auto* item = enterQueue.front();
- std::pop_heap(enterQueue.begin(), enterQueue.end(), heapByFirstKeyMin);
- enterQueue.pop_back();
-
- if (item->NextPage && item->NextPage->GetRowId() == item->First.BeginRowId()) {
- ++item->NextPage;
- }
-
- TRightBoundary candidate;
-
- if (item->InitNextKey()) {
- enterQueue.push_back(item);
- std::push_heap(enterQueue.begin(), enterQueue.end(), heapByFirstKeyMin);
-
- candidate.Key = item->NextKey;
- candidate.RowId = item->NextPage->GetRowId();
- candidate.Inclusive = false;
- } else {
- candidate.Key = item->Slice.LastKey.GetCells();
- candidate.RowId = item->Slice.LastRowId;
- candidate.Inclusive = item->Slice.LastInclusive;
- }
-
- if (lessBoundary(end, candidate)) {
- end = candidate;
-
- // Protect end boundary against future overwrites
- tmpEnd.assign(end.Key.begin(), end.Key.end());
- end.Key = tmpEnd;
- }
- }
- }
-
- return results;
- }
-
- void TTableShard::RegisterItem(const TTablePart& info, TTablePart::TItem& item, bool isGarbage) noexcept {
- auto r = Levels->Add(info.Part, item.Slice);
- item.Level = r.Level;
- item.Position = r.Position;
-
- // register stats for the specific level
- if (PerLevelStats.size() <= item.Level->Index) {
- PerLevelStats.resize(item.Level->Index + 1);
- }
- PerLevelStats[item.Level->Index] += item.Stats;
-
- if (PerLevelGarbage.size() <= item.Level->Index) {
- PerLevelGarbage.resize(item.Level->Index + 1);
- }
- if (isGarbage) {
- PerLevelGarbage[item.Level->Index] += item.Stats.Size;
- }
- }
-
- bool TTableShard::FindSplitKey(TSerializedCellVec& foundKey, const TKeyCellDefaults& keyDefaults) const noexcept {
- TSplitStatIterator it(keyDefaults);
-
- for (const auto& kvInfo : Parts) {
- for (const auto& kvSlice : kvInfo.second.Slices) {
- it.AddSlice(kvInfo.second.Part.Get(), kvSlice.second.Slice, kvSlice.second.Stats.Size);
- }
- }
-
- TVector<TCell> bestSplitKey;
- ui64 bestLeftSize = it.LeftSize();
- ui64 bestRightSize = it.RightSize();
- Y_VERIFY(bestLeftSize == 0);
- Y_VERIFY(bestRightSize == Stats.Size, "Full stats size is out of sync!");
-
- bool stop = false;
- while (!stop && it.Next()) {
- TCellsRef splitKey = it.CurrentKey();
- auto leftSize = it.LeftSize();
- auto rightSize = it.RightSize();
-
- // left size increases, right size decreases
- // as soon as left becomes bigger than right it won't get any better
- if (leftSize > rightSize) {
- stop = true;
- auto currentDiff = bestRightSize - bestLeftSize;
- if (currentDiff <= leftSize - rightSize) {
- // previous iteration has given us a better difference
- break;
- }
- }
-
- bestSplitKey.assign(splitKey.begin(), splitKey.end());
- bestLeftSize = leftSize;
- bestRightSize = rightSize;
- }
-
- if (bestLeftSize < Stats.Size && bestRightSize < Stats.Size) {
- // We found a key that splits current shard somewhat evenly and
- // the size estimate of individual subshards actually decreses
- foundKey = TSerializedCellVec(bestSplitKey);
- return true;
- }
-
- // We cannot find split key for some reason
- // For example there's a single row-page that is way too big, or there
- // are lots of slices stacked on top of each other with no natural way
- // to divide them on a page boundary.
- return false;
- }
-
- bool TSliceSplitOp::Execute(IPages* env) {
- const TIntrusiveConstPtr<TKeyCellDefaults> keyDefaults = Table->RowScheme->Keys;
-
- TVector<TCell> keyCellsBuffer(keyDefaults->Size());
- auto getKeyCells = [this, &keyDefaults, &keyCellsBuffer](ui64 keyId) {
- auto* key = Table->SplitKeys.FindPtr(keyId);
- Y_VERIFY(key, "Cannot find split key %" PRIu64, keyId);
-
- auto keyCells = key->GetCells();
- if (keyCells.size() < keyDefaults->Size()) {
- for (size_t i = 0; i < keyCells.size(); ++i) {
- keyCellsBuffer[i] = keyCells[i];
- }
- for (size_t i = keyCells.size(); i < keyDefaults->Size(); ++i) {
- keyCellsBuffer[i] = keyDefaults->Defs[i];
- }
- keyCells = keyCellsBuffer;
- }
- Y_VERIFY_DEBUG(keyCells.size() == keyDefaults->Size());
-
- return keyCells;
- };
-
- bool ok = true;
- TCharge charge(env, *Part, TTagsRef{ });
- for (size_t idx = 1; idx < Shards.size(); ++idx) {
- auto keyCells = getKeyCells(Shards[idx]->LeftKey);
- ok &= charge.SplitKey(keyCells, *keyDefaults, Slice.BeginRowId(), Slice.EndRowId());
- }
-
- if (!ok) {
- // Some pages are missing
- return false;
- }
-
- TSlice current = Slice;
- TVector<TSlice> results(Reserve(Shards.size()));
- TPartSimpleIt it(Part.Get(), { }, keyDefaults, env);
- for (size_t idx = 1; idx < Shards.size(); ++idx) {
- const TRowId currentBegin = current.BeginRowId();
- const TRowId currentEnd = current.EndRowId();
- if (currentBegin >= currentEnd) {
- break;
- }
-
- it.SetBounds(currentBegin, currentEnd);
-
- auto keyCells = getKeyCells(Shards[idx]->LeftKey);
- auto ready = it.Seek(keyCells, ESeek::Lower);
- Y_VERIFY(ready != EReady::Page, "Unexpected failure, precharge logic may be faulty");
-
- // This is row id of the first row >= split key
- const TRowId rowId = ready == EReady::Data ? it.GetRowId() : currentEnd;
- Y_VERIFY_DEBUG(currentBegin <= rowId);
- Y_VERIFY_DEBUG(rowId <= currentEnd);
-
- // This is the first key >= split key
- TSerializedCellVec rowKey;
- if (rowId != currentEnd) {
- rowKey = TSerializedCellVec(it.GetRawKey());
- } else if (Y_UNLIKELY(ComparePartKeys(current.LastKey.GetCells(), keyCells, *keyDefaults) < 0)) {
- // This shouldn't normally happen, but better safe than sorry
- // Current split key is actually out of bounds for the slice
- // Since there's no real intersection leave it as is
- break;
- }
-
- // Do we have any rows to the left of the split?
- if (currentBegin < rowId) {
- auto& left = results.emplace_back();
- left.FirstKey = std::move(current.FirstKey);
- left.FirstRowId = current.FirstRowId;
- left.FirstInclusive = current.FirstInclusive;
- if ((rowId - currentBegin) == 1 && left.FirstInclusive) {
- // Single row, just reuse the exact same key
- left.LastKey = left.FirstKey;
- left.LastRowId = left.FirstRowId;
- left.LastInclusive = true;
- } else {
- // Seek to previous row
- ready = it.Seek(rowId - 1);
- Y_VERIFY(ready == EReady::Data, "Unexpected failure, precharge logic may be faulty");
- left.LastKey = TSerializedCellVec(it.GetRawKey());
- left.LastRowId = it.GetRowId();
- left.LastInclusive = true;
- if ((rowId - currentBegin) == 1) {
- // Single row, just reuse the exact same key
- Y_VERIFY_DEBUG(!left.FirstInclusive);
- left.FirstKey = left.LastKey;
- left.FirstRowId = left.LastRowId;
- left.FirstInclusive = true;
- }
- }
-
- Y_VERIFY_DEBUG(left.BeginRowId() == currentBegin);
- Y_VERIFY_DEBUG(left.EndRowId() == rowId);
- }
-
- current.FirstKey = std::move(rowKey);
- current.FirstRowId = rowId;
- current.FirstInclusive = true;
- }
-
- if (current.BeginRowId() < current.EndRowId()) {
- auto& left = results.emplace_back();
- if (current.Rows() == 1 && (current.FirstInclusive ^ current.LastInclusive)) {
- if (current.FirstInclusive) {
- left.FirstKey = current.FirstKey;
- left.FirstRowId = current.FirstRowId;
- left.FirstInclusive = true;
- left.LastKey = current.FirstKey;
- left.LastRowId = current.FirstRowId;
- left.LastInclusive = true;
- } else {
- Y_VERIFY_DEBUG(current.LastInclusive);
- left.FirstKey = current.LastKey;
- left.FirstRowId = current.LastRowId;
- left.FirstInclusive = true;
- left.LastKey = current.LastKey;
- left.LastRowId = current.LastRowId;
- left.LastInclusive = true;
- }
- } else {
- left = std::move(current);
- }
- }
-
- TSliceSplitResult result;
- result.Part = Part;
- result.OldSlice = Slice;
- result.NewSlices = std::move(results);
- result.Shards = std::move(Shards);
- Consumer->OnSliceSplitResult(std::move(result));
-
- return true;
- }
-
- bool TUnderlayMask::HasKey(TCellsRef key) noexcept {
- while (Position != Bounds.end()) {
- // TODO: we could use binary search, but it's probably useless
- const auto& current = *Position;
- auto cmp = ComparePartKeys(key, current.LastKey.GetCells(), *RowScheme->Keys);
- if (cmp < 0 || (cmp == 0 && current.LastInclusive)) {
- break;
- }
- ++Position;
- }
- Y_VERIFY(ValidatePosition(key), "TUnderlayMask::HasKey called with out of order keys");
- if (Position != Bounds.end()) {
- const auto& current = *Position;
- auto cmp = ComparePartKeys(current.FirstKey.GetCells(), key, *RowScheme->Keys);
- return cmp < 0 || (cmp == 0 && current.FirstInclusive);
- }
- return false;
- }
-
- bool TUnderlayMask::ValidateOrder() const noexcept {
- auto it = Bounds.begin();
- if (it == Bounds.end()) {
- return true;
- }
- auto last = it++;
- while (it != Bounds.end()) {
- if (!TBounds::LessByKey(*last, *it, *RowScheme->Keys)) {
- return false;
- }
- last = it++;
- }
- return true;
- }
-
- bool TUnderlayMask::ValidatePosition(TCellsRef key) const noexcept {
- auto pos = Position;
- if (pos == Bounds.begin()) {
- return true;
- }
- auto& prev = *--pos;
- auto cmp = ComparePartKeys(prev.LastKey.GetCells(), key, *RowScheme->Keys);
- return cmp < 0 || (cmp == 0 && !prev.LastInclusive);
- }
-
- THolder<TUnderlayMask> TUnderlayMask::Build(
- TIntrusiveConstPtr<TRowScheme> rowScheme,
- TVector<const TBounds*>& input) noexcept
- {
- const TKeyCellDefaults& keyDefaults = *rowScheme->Keys;
-
- // Sorts heap by the first key (returns true when a->FirstKey < b->FirstKey)
- auto heapByFirstKeyMin = [&keyDefaults](const TBounds* b, const TBounds* a) noexcept -> bool {
- TCellsRef left = a->FirstKey.GetCells();
- TCellsRef right = b->FirstKey.GetCells();
- if (int cmp = ComparePartKeys(left, right, keyDefaults)) {
- return cmp < 0;
- }
- return a->FirstInclusive && !b->FirstInclusive;
- };
-
- // Returns true when a->LastKey < b->LastKey
- auto lastKeyLess = [&keyDefaults](const TBounds* a, const TBounds* b) noexcept -> bool {
- TCellsRef left = a->LastKey.GetCells();
- TCellsRef right = b->LastKey.GetCells();
- if (int cmp = ComparePartKeys(left, right, keyDefaults)) {
- return cmp < 0;
- }
- return !a->LastInclusive && b->LastInclusive;
- };
-
- // Returns true when a->LastKey may be stitched with b->FirstKey
- auto mayStitch = [&keyDefaults](const TBounds* a, const TBounds* b) noexcept -> bool {
- TCellsRef left = a->LastKey.GetCells();
- TCellsRef right = b->FirstKey.GetCells();
- int cmp = ComparePartKeys(left, right, keyDefaults);
- if (cmp < 0) {
- return false;
- }
- if (cmp == 0) {
- return a->LastInclusive || b->FirstInclusive;
- }
- return true;
- };
-
- TVector<TBounds> results(Reserve(input.size()));
-
- std::make_heap(input.begin(), input.end(), heapByFirstKeyMin);
-
- while (input) {
- const TBounds* first = input.front();
- std::pop_heap(input.begin(), input.end(), heapByFirstKeyMin);
- input.pop_back();
-
- const TBounds* last = first;
-
- while (input && mayStitch(last, input.front())) {
- auto* current = input.front();
- std::pop_heap(input.begin(), input.end(), heapByFirstKeyMin);
- input.pop_back();
-
- if (lastKeyLess(last, current)) {
- last = current;
- }
- }
-
- results.emplace_back(
- first->FirstKey,
- last->LastKey,
- first->FirstInclusive,
- last->LastInclusive);
- }
-
- return MakeHolder<TUnderlayMask>(std::move(rowScheme), std::move(results));
- }
-
- TSplitKeys::TSplitKeys(TIntrusiveConstPtr<TRowScheme> rowScheme, TSplitKeys::TKeysVec keys)
- : RowScheme(std::move(rowScheme))
- , Keys(std::move(keys))
- {
- Y_VERIFY(ValidateOrder(), "TSplitKeys got keys in an invalid order");
- Reset();
- }
-
- TSplitKeys::TSplitKeys(TIntrusiveConstPtr<TRowScheme> rowScheme, TVector<const TBounds*> bounds)
- : RowScheme(std::move(rowScheme))
- {
- auto boundsLess = [this](const TBounds* a, const TBounds* b) noexcept -> bool {
- if (auto cmp = ComparePartKeys(a->FirstKey.GetCells(), b->FirstKey.GetCells(), *RowScheme->Keys)) {
- return cmp < 0;
- }
- return a->FirstInclusive && !b->FirstInclusive;
- };
- std::sort(bounds.begin(), bounds.end(), boundsLess);
-
- Keys.reserve(bounds.size());
- Inclusive.reserve(bounds.size());
-
- const TBounds* last = nullptr;
- for (const TBounds* edge : bounds) {
- if (!last || boundsLess(last, edge)) {
- Keys.emplace_back(edge->FirstKey);
- Inclusive.push_back(edge->FirstInclusive);
- }
- last = edge;
- }
-
- Y_VERIFY(ValidateOrder(), "TSplitKeys generated keys in an invalid order");
- Reset();
- }
-
- bool TSplitKeys::ShouldSplit(TCellsRef key) noexcept {
- bool splitFound = false;
- while (Position != Keys.end()) {
- // TODO: we could use binary search, but it's probably useless
- const auto& current = *Position;
- auto cmp = ComparePartKeys(key, current.GetCells(), *RowScheme->Keys);
- if (cmp < 0 || (cmp == 0 && !IsInclusive(Position))) {
- break;
- }
- splitFound = true;
- ++Position;
- }
- Y_VERIFY(ValidatePosition(key), "TSplitKeys::ShouldSplit called with out of order keys");
- return splitFound;
- }
-
- bool TSplitKeys::ValidateOrder() const noexcept {
- auto it = Keys.begin();
- if (it == Keys.end()) {
- return true;
- }
- auto posLess = [this](const TKeysVec::const_iterator& a, const TKeysVec::const_iterator& b) noexcept -> bool {
- if (auto cmp = ComparePartKeys(a->GetCells(), b->GetCells(), *RowScheme->Keys)) {
- return cmp < 0;
- }
- return IsInclusive(a) && !IsInclusive(b);
- };
- auto last = it++;
- while (it != Keys.end()) {
- if (!posLess(last, it)) {
- return false;
- }
- last = it++;
- }
- return true;
- }
-
- bool TSplitKeys::ValidatePosition(TCellsRef key) const noexcept {
- auto prev = Position;
- if (prev == Keys.begin()) {
- return true;
- }
- --prev;
- if (auto cmp = ComparePartKeys(prev->GetCells(), key, *RowScheme->Keys)) {
- return cmp < 0;
- }
- return IsInclusive(prev);
- }
-
- bool TSplitKeys::IsInclusive(TVector<TSerializedCellVec>::const_iterator pos) const noexcept {
- size_t index = pos - Keys.begin();
- return index >= Inclusive.size() || Inclusive[index];
- }
-
- void TShardedCompactionParams::Describe(IOutputStream& out) const noexcept {
- // TODO: display some additional info
- TCompactionParams::Describe(out);
- }
-
- void TShardedCompactionStrategy::Start(TCompactionState state) {
- Y_VERIFY(!Policy, "Strategy has already been started");
-
- const auto* scheme = Backend->TableScheme(Table);
- Policy = scheme->CompactionPolicy;
- TableInfo.RowScheme = Backend->RowScheme(Table);
-
- TShardedStrategyStateInfo header;
- TVector<ui64> splitKeyIds;
- TVector<ui64> shardIds;
-
- if (auto* pRawHeader = state.StateSnapshot.FindPtr(0)) {
- bool ok = ParseFromStringNoSizeLimit(header, *pRawHeader);
- Y_VERIFY(ok);
-
- TableInfo.LastSplitKey = header.GetLastSplitKey();
- TableInfo.LastShardId = header.GetLastShardId();
-
- for (ui64 keyId : header.GetSplitKeys()) {
- auto* pRawKey = state.StateSnapshot.FindPtr(keyId);
- Y_VERIFY(pRawKey);
- TableInfo.SplitKeys[keyId] = TSerializedCellVec(*pRawKey);
- splitKeyIds.emplace_back(keyId);
- }
-
- for (ui64 shardId : header.GetShards()) {
- shardIds.emplace_back(shardId);
- }
- }
-
- const auto& keyDefaults = *TableInfo.RowScheme->Keys;
- std::sort(splitKeyIds.begin(), splitKeyIds.end(), [&](ui64 a, ui64 b) -> bool {
- auto left = TableInfo.SplitKeys.at(a).GetCells();
- auto right = TableInfo.SplitKeys.at(b).GetCells();
- return ComparePartKeys(left, right, keyDefaults) < 0;
- });
-
- Shards.PushBack(new TTableShard());
- for (ui64 keyId : splitKeyIds) {
- auto* last = Shards.Back();
- Shards.PushBack(new TTableShard());
- auto* shard = Shards.Back();
- last->RightKey = keyId;
- shard->LeftKey = keyId;
- }
-
- if (shardIds) {
- Y_VERIFY(shardIds.size() == splitKeyIds.size() + 1,
- "Have %" PRISZT " shards with %" PRISZT " split keys",
- shardIds.size(), splitKeyIds.size());
-
- auto it = shardIds.begin();
- for (auto& shard : Shards) {
- Y_VERIFY(it != shardIds.end());
- shard.Id = *it++;
- }
- Y_VERIFY(it == shardIds.end());
- } else if (splitKeyIds) {
- // Assume we upgraded from some old dev code, renumber from 1
- for (auto& shard : Shards) {
- shard.Id = ++TableInfo.LastShardId;
- }
- }
-
- for (auto& shard : Shards) {
- // Initialize all shards while empty
- shard.Levels.Reset(new TLevels(TableInfo.RowScheme->Keys));
- }
-
- for (auto& part : Backend->TableColdParts(Table)) {
- ColdParts.emplace_back(std::move(part));
- Y_FAIL("Sharded compaction does not support cold parts");
- }
-
- auto parts = Backend->TableParts(Table);
- std::sort(parts.begin(), parts.end(),
- [](const TPartView& a, const TPartView& b) -> bool {
- if (a->Epoch != b->Epoch) {
- return a->Epoch < b->Epoch;
- }
- return a->Label < b->Label;
- });
-
- while (parts) {
- const auto& partView = parts.back();
- TPartDataSizeHelper helper(partView.Part.Get());
- ui64 fullSize = helper.CalcSize(0, Max<TRowId>());
- if (fullSize >= Policy->ShardPolicy.GetMinSliceSize()) {
- break;
- }
- AllBackingSize += partView.Part->BackingSize();
- NurseryDataSize += fullSize;
- auto& nurseryItem = Nursery.emplace_back();
- nurseryItem.PartView = partView;
- nurseryItem.DataSize = fullSize;
- parts.pop_back();
- }
-
- for (const auto& partView : parts) {
- Y_VERIFY(partView.Slices && !partView.Slices->empty());
- EnsureGlobalPart(partView.Part, partView.Slices);
- }
-
- AddParts(std::move(parts));
-
- CheckCompactions();
-
- // TODO: schedule split/merge if necessary (performed in ApplyChanges)
- }
-
- void TShardedCompactionStrategy::Stop() {
- while (PendingSliceSplits) {
- auto* opWeak = PendingSliceSplits.Front();
- auto readId = opWeak->GetReadId();
- bool ok = Backend->CancelRead(readId);
- Y_VERIFY(ok, "Failed to cancel read with id %" PRIu64, readId);
- }
-
- CancelTask(ForcedCompactionTask);
- ForcedCompactionPending = false;
-
- for (auto& shard : Shards) {
- CancelTask(shard.Task);
- }
-
- if (MemCompactionId != 0) {
- Backend->CancelCompaction(MemCompactionId);
- MemCompactionId = 0;
- MemCompactionForced = false;
- NurseryTaken = 0;
- }
-
- // Make it possible to Start again
- TableInfo = { };
- Policy = nullptr;
- Shards.Clear();
- SliceSplitResults.clear();
- AllParts.clear();
- AllBackingSize = 0;
- Nursery.clear();
- NurseryDataSize = 0;
- ColdParts.clear();
- }
-
- void TShardedCompactionStrategy::ReflectSchema() {
- const auto* scheme = Backend->TableScheme(Table);
-
- TString err;
- bool ok = NLocalDb::ValidateCompactionPolicyChange(*Policy, *scheme->CompactionPolicy, err);
- Y_VERIFY(ok, "table %s id %u: %s", scheme->Name.c_str(), scheme->Id, err.c_str());
-
- Policy = scheme->CompactionPolicy;
- TableInfo.RowScheme = Backend->RowScheme(Table);
-
- CheckCompactions();
- }
-
- void TShardedCompactionStrategy::ReflectRemovedRowVersions() {
- // nothing
- }
-
- void TShardedCompactionStrategy::UpdateCompactions() {
- CheckCompactions();
- }
-
- float TShardedCompactionStrategy::GetOverloadFactor() {
- // TODO: implement overload factor
- return 0.0f;
- }
-
- ui64 TShardedCompactionStrategy::GetBackingSize() {
- return AllBackingSize;
- }
-
- ui64 TShardedCompactionStrategy::GetBackingSize(ui64 ownerTabletId) {
- // FIXME: maybe implement some day
- Y_UNUSED(ownerTabletId);
- return AllBackingSize;
- }
-
- ui64 TShardedCompactionStrategy::BeginMemCompaction(TTaskId taskId, TSnapEdge edge, ui64 forcedCompactionId) {
- auto params = MakeHolder<TShardedCompactionParams>();
- params->Table = Table;
- params->TaskId = taskId;
- params->Edge = edge;
- params->KeepInCache = true;
-
- NurseryTaken = 0;
- if (edge.Head == TEpoch::Max() && Nursery && forcedCompactionId == 0) {
- ui64 expectedSize = Backend->TableMemSize(Table, edge.Head);
- if (expectedSize > 0) {
- bool takeAll = (
- (expectedSize + NurseryDataSize) >= (2 * Policy->ShardPolicy.GetMinSliceSize()) ||
- (Nursery.size() >= Policy->ShardPolicy.GetMaxSlicesPerLevel()));
- while (NurseryTaken < Nursery.size()) {
- auto& next = Nursery[NurseryTaken];
- if (next.DataSize > expectedSize && !takeAll) {
- break;
- }
- params->Parts.push_back(next.PartView);
- expectedSize += next.DataSize;
- ++NurseryTaken;
- }
- }
- }
-
- if (!Policy->KeepEraseMarkers) {
- params->IsFinal = NurseryTaken == Nursery.size() && AllParts.empty();
- } else {
- params->IsFinal = false;
- }
-
- if (!params->IsFinal && !Policy->KeepEraseMarkers) {
- TVector<const TBounds*> allBounds;
- for (size_t pos = NurseryTaken; pos < Nursery.size(); ++pos) {
- auto& item = Nursery[pos];
- for (const auto& slice : *item.PartView.Slices) {
- allBounds.push_back(&slice);
- }
- }
- for (auto& kv : AllParts) {
- for (auto& slice : *kv.second.Slices) {
- allBounds.push_back(&slice);
- }
- }
- Y_VERIFY(allBounds, "Unexpected lack of slice bounds");
- params->UnderlayMask = TUnderlayMask::Build(TableInfo.RowScheme, allBounds);
- }
-
- if (TableInfo.SplitKeys) {
- TVector<TSerializedCellVec> splitKeys;
- auto* last = Shards.Back();
- auto* shard = Shards.Front();
- while (shard != last) {
- splitKeys.push_back(TableInfo.SplitKeys.at(shard->RightKey));
- shard = shard->Next()->Node();
- }
- Y_VERIFY(splitKeys, "Unexpected lack of split keys");
- params->SplitKeys = MakeHolder<TSplitKeys>(TableInfo.RowScheme, std::move(splitKeys));
- }
-
- MemCompactionId = Backend->BeginCompaction(std::move(params));
- MemCompactionForced = forcedCompactionId != 0;
- return MemCompactionId;
- }
-
- TCompactionChanges TShardedCompactionStrategy::CompactionFinished(
- ui64 compactionId,
- THolder<TCompactionParams> paramsRaw,
- THolder<TCompactionResult> result)
- {
- auto* params = CheckedCast<TShardedCompactionParams*>(paramsRaw.Get());
- bool processNursery = false;
- bool doNurseryFlush = false;
-
- if (auto* shard = params->InputShard) {
- Y_VERIFY(shard->Task.State == EState::Compacting);
- Y_VERIFY(shard->Task.CompactionId == compactionId);
- shard->Task.State = EState::Free;
- shard->Task.TaskId = 0;
- shard->Task.CompactionId = 0;
- shard->FailingLevel = Max<size_t>();
-
- // Every compacted part should have the original from our state
- Y_VERIFY(params->Parts.size() == params->Original.size());
- } else if (compactionId == ForcedCompactionTask.CompactionId) {
- Y_VERIFY(ForcedCompactionTask.State == EState::Compacting);
- ForcedCompactionTask.State = EState::Free;
- ForcedCompactionTask.TaskId = 0;
- ForcedCompactionTask.CompactionId = 0;
- } else {
- Y_VERIFY(MemCompactionId == compactionId);
- MemCompactionId = 0;
-
- if (MemCompactionForced) {
- doNurseryFlush = true;
- MemCompactionForced = false;
- ForcedCompactionPending = true;
- } else {
- processNursery = true;
- }
-
- // Mem compaction only uses nursery, no reused or original parts expected
- Y_VERIFY(!params->Original);
- Y_VERIFY(!params->Reused);
-
- for (const auto& partView : params->Parts) {
- Y_VERIFY(NurseryTaken > 0);
- auto& item = Nursery.front();
- Y_VERIFY(partView.Part == item.PartView.Part);
- AllBackingSize -= partView.Part->BackingSize();
- NurseryDataSize -= item.DataSize;
- Nursery.pop_front();
- --NurseryTaken;
- }
- Y_VERIFY(NurseryTaken == 0);
- params->Parts.clear();
- }
-
- // These parts may have become garbage after compaction
- THashSet<TGlobalPart*> updateGarbageQueue;
-
- // Remove compacted slices from global parts
- for (const auto& partView : params->Parts) {
- auto label = partView->Label;
- auto* allInfo = AllParts.FindPtr(label);
- Y_VERIFY(allInfo, "Compacted part %s is not registered", label.ToString().c_str());
-
- allInfo->Slices = TSlices::Subtract(allInfo->Slices, partView.Slices);
- updateGarbageQueue.emplace(allInfo);
- }
-
- // Remove originally chosen slices from our state
- for (auto& partView : params->Original) {
- auto label = partView->Label;
- auto* allInfo = AllParts.FindPtr(label);
- Y_VERIFY(allInfo, "Compacted part %s is not registered", label.ToString().c_str());
-
- size_t removedCount = 0;
-
- // TODO: make it more efficient
- auto writePos = allInfo->Shards.begin();
- auto readPos = allInfo->Shards.begin();
- while (readPos != allInfo->Shards.end()) {
- auto* partShard = *readPos;
- auto* info = partShard->Parts.FindPtr(label);
- Y_VERIFY(info, "Compacted part %s cannot be found in a shard", label.ToString().c_str());
- for (const auto& input : *partView.Slices) {
- auto pos = info->Slices.find(input);
- if (pos != info->Slices.end()) {
- auto& item = pos->second;
- partShard->Levels.Reset();
- partShard->PerLevelStats.clear();
- partShard->PerLevelGarbage.clear();
- partShard->Stats -= item.Stats;
- info->Slices.erase(pos);
- ++removedCount;
- }
- }
- if (info->Slices.empty()) {
- // This part no longer contributes to this shard
- partShard->Parts.erase(label);
- ++readPos;
- continue;
- }
- if (writePos != readPos) {
- *writePos = *readPos;
- }
- ++writePos;
- ++readPos;
- }
- if (writePos != allInfo->Shards.end()) {
- allInfo->Shards.erase(writePos, allInfo->Shards.end());
- }
-
- Y_VERIFY(removedCount == partView.Slices->size(), "Not all slices have been removed");
-
- if (allInfo->Shards.empty() && allInfo->SplitBlocks == 0 && allInfo->Slices->empty()) {
- updateGarbageQueue.erase(allInfo);
- TableInfo.GarbageParts.erase(label);
- AllBackingSize -= allInfo->Part->BackingSize();
- AllParts.erase(label);
- }
- }
-
- for (auto* allInfo : updateGarbageQueue) {
- UpdateGarbageStats(allInfo);
- }
-
- TVector<TPartView> parts;
-
- auto flushNursery = [this, &parts]() {
- while (Nursery) {
- // Go from oldest to newest
- auto& item = Nursery.back();
- auto& partView = item.PartView;
- AllBackingSize -= partView.Part->BackingSize();
- NurseryDataSize -= item.DataSize;
- parts.push_back(std::move(partView));
- Nursery.pop_back();
- }
- };
-
- if (doNurseryFlush) {
- flushNursery();
- }
-
- if (processNursery) {
- // We expect parts to be from the same epoch and in correct order
- for (auto& partView : result->Parts) {
- TPartDataSizeHelper helper(partView.Part.Get());
- ui64 fullSize = helper.CalcSize(0, Max<TRowId>());
- if (fullSize >= Policy->ShardPolicy.GetMinSliceSize()) {
- flushNursery();
- parts.push_back(std::move(partView));
- continue;
- }
- // Add new item to front (it's the newest)
- AllBackingSize += partView.Part->BackingSize();
- NurseryDataSize += fullSize;
- auto& nurseryItem = Nursery.emplace_front();
- nurseryItem.PartView = std::move(partView);
- nurseryItem.DataSize = fullSize;
- }
- } else {
- parts.reserve(parts.size() + result->Parts.size());
- for (auto& partView : result->Parts) {
- parts.push_back(std::move(partView));
- }
- }
- result->Parts.clear();
-
- // Register all "new" parts in the global state
- for (const auto& partView : parts) {
- Y_VERIFY(partView.Slices && !partView.Slices->empty());
- EnsureGlobalPart(partView.Part, partView.Slices);
- }
-
- // Add any reused slices back to our state
- if (params->Reused) {
- parts.reserve(parts.size() + params->Reused.size());
- for (auto& partView : params->Reused) {
- parts.emplace_back(std::move(partView));
- }
- params->Reused.clear();
- }
-
- AddParts(std::move(parts));
-
- return ApplyChanges();
- }
-
- void TShardedCompactionStrategy::PartMerged(TPartView part, ui32 level) {
- Y_VERIFY(level == 255, "Unexpected level of the merged part");
-
- Y_VERIFY(part.Slices && !part.Slices->empty());
- EnsureGlobalPart(part.Part, part.Slices);
-
- TVector<TPartView> parts(Reserve(1));
- parts.emplace_back(std::move(part));
- AddParts(std::move(parts));
- }
-
- void TShardedCompactionStrategy::PartMerged(TIntrusiveConstPtr<TColdPart> part, ui32 level) {
- Y_VERIFY(level == 255, "Unexpected level of the merged part");
-
- ColdParts.emplace_back(std::move(part));
- Y_FAIL("Sharded compaction does not support cold parts");
- }
-
- TCompactionChanges TShardedCompactionStrategy::PartsRemoved(TArrayRef<const TLogoBlobID> parts) {
- Y_UNUSED(parts);
-
- // For simplicity just stop and start again
- auto state = SnapshotState();
-
- Stop();
-
- Start(std::move(state));
-
- // We don't keep per-part state, so no changes
- return { };
- }
-
- TCompactionChanges TShardedCompactionStrategy::ApplyChanges() {
- RequestChangesPending = false;
-
- TCompactionChanges changes{ };
-
- struct TState : public TIntrusiveListItem<TState> {
- size_t Index = Max<size_t>();
- };
-
- TIntrusiveList<TState> needSort;
- THashMap<TLogoBlobID, TState> byLabel;
-
- TVector<TPartView> parts(Reserve(SliceSplitResults.size()));
-
- for (auto& result : SliceSplitResults) {
- Y_VERIFY(result.Part, "Unexpected result without a part");
-
- TChangeSlices* change;
-
- auto& state = byLabel[result.Part->Label];
- if (state.Index != Max<size_t>()) {
- needSort.PushBack(&state);
- change = &changes.SliceChanges[state.Index];
- } else {
- state.Index = changes.SliceChanges.size();
- change = &changes.SliceChanges.emplace_back();
- change->Label = result.Part->Label;
- }
-
- // New slices are guaranteed to be already sorted
- change->NewSlices.insert(change->NewSlices.end(),
- result.NewSlices.begin(), result.NewSlices.end());
-
- auto* allInfo = AllParts.FindPtr(result.Part->Label);
- Y_VERIFY(allInfo, "Cannot find a globally registered part %s", result.Part->Label.ToString().c_str());
- allInfo->SplitBlocks--;
-
- // Construct a fake TPartView so we may reuse AddParts method
- parts.emplace_back(TPartView{ std::move(result.Part), nullptr, new TSlices(std::move(result.NewSlices)) });
-
- // All affected shards are no longer blocked by this op
- for (auto* affectedShard : result.Shards) {
- affectedShard->SplitBlocks--;
- }
- }
- SliceSplitResults.clear();
-
- // We may need to sort some changes
- for (const auto& state : needSort) {
- auto cmpByRowId = [](const TSlice& a, const TSlice& b) noexcept -> bool {
- return a.EndRowId() <= b.BeginRowId();
- };
- auto& change = changes.SliceChanges[state.Index];
- std::sort(change.NewSlices.begin(), change.NewSlices.end(), cmpByRowId);
- }
-
- // Apply changes to our global state (matches the state after method returns)
- for (const auto& change : changes.SliceChanges) {
- auto* allInfo = AllParts.FindPtr(change.Label);
- Y_VERIFY(allInfo, "Generated changes to unregistered part %s", change.Label.ToString().c_str());
- allInfo->Slices = TSlices::Replace(allInfo->Slices, change.NewSlices);
- }
-
- // We couldn't see these slices during split, add them back
- AddParts(std::move(parts));
-
- // Check for split/merge unless forced compaction is currently running
- if (ForcedCompactionTask.State != EState::Compacting) {
- const auto& policy = Policy->ShardPolicy;
-
- auto isFreeShard = [](TTableShard* shard) -> bool {
- if (shard->SplitBlocks) {
- return false;
- }
- return shard->Task.State == EState::Free || shard->Task.State == EState::Pending;
- };
-
- bool stateChanged = false;
- const auto minShardSize = policy.GetMinShardSize();
- const auto maxShardSize = policy.GetMaxShardSize();
-
- // Split shards that are too big
- if (maxShardSize > 0) {
- TVector<TTableShard*> bigShards;
- for (auto& shard : Shards) {
- if (isFreeShard(&shard) && shard.Stats.Size > maxShardSize) {
- if (auto logl = Logger->Log(NUtil::ELnLev::Debug)) {
- logl << "Table " << Table << " has a big shard " << shard.Id << " with "
- << shard.Stats.Size << " > " << maxShardSize << " bytes";
- }
-
- bigShards.push_back(&shard);
- }
- }
-
- parts.clear();
- for (auto* shard : bigShards) {
- TSerializedCellVec splitKey;
- if (shard->FindSplitKey(splitKey, *TableInfo.RowScheme->Keys)) {
- ui64 newKeyId = ++TableInfo.LastSplitKey;
-
- if (auto logl = Logger->Log(NUtil::ELnLev::Debug)) {
- logl << "Table " << Table << " adding shard key " << newKeyId
- << ": " << TPrintableTypedCells(splitKey.GetCells(), TableInfo.RowScheme->Keys->BasicTypes());
- }
-
- changes.StateChanges[newKeyId] = splitKey.GetBuffer();
- TableInfo.SplitKeys[newKeyId] = splitKey;
-
- auto left = MakeHolder<TTableShard>();
- auto right = MakeHolder<TTableShard>();
- left->Id = ++TableInfo.LastShardId;
- left->LeftKey = shard->LeftKey;
- left->RightKey = newKeyId;
- right->Id = ++TableInfo.LastShardId;
- right->LeftKey = newKeyId;
- right->RightKey = shard->RightKey;
-
- for (auto& kvPart : shard->Parts) {
- TIntrusiveConstPtr<TPart> part = kvPart.second.Part;
- TVector<TSlice> slices(Reserve(kvPart.second.Slices.size()));
- for (auto& kvSlice : kvPart.second.Slices) {
- slices.emplace_back(kvSlice.second.Slice);
- }
-
- TGlobalPart* allInfo = AllParts.FindPtr(part->Label);
- Y_VERIFY(allInfo);
- auto end = std::remove(allInfo->Shards.begin(), allInfo->Shards.end(), shard);
- allInfo->Shards.erase(end, allInfo->Shards.end());
-
- // Construct a fake TPartView so we may reuse AddParts method
- parts.emplace_back(TPartView{std::move(part), nullptr, new TSlices(std::move(slices))});
- }
-
- left.Release()->LinkBefore(shard);
- right.Release()->LinkBefore(shard);
-
- CancelTask(shard->Task);
- delete shard;
-
- stateChanged = true;
- }
- }
-
- if (parts) {
- // Add back all the slices we have taken from splits
- AddParts(std::move(parts));
- }
- }
-
- // Merge shards that are too small
- if (Shards.Size() > 1) {
- using TShardList = TList<TTableShard*>;
- using TShardHash = THashMap<TTableShard*, TShardList::iterator>;
- TShardList smallShardList;
- TShardHash smallShardHash;
- for (auto& shard : Shards) {
- if (isFreeShard(&shard) && shard.Stats.Size <= minShardSize) {
- if (auto logl = Logger->Log(NUtil::ELnLev::Debug)) {
- logl << "Table " << Table << " has a small shard " << shard.Id << " with "
- << shard.Stats.Size << " > " << minShardSize << " bytes";
- }
-
- smallShardHash[&shard] = smallShardList.insert(smallShardList.end(), &shard);
- }
- }
-
- // sort small shards by their data size
- smallShardList.sort([](TTableShard* a, TTableShard* b) -> bool {
- return a->Stats.Size < b->Stats.Size;
- });
-
- parts.clear();
- while (!smallShardList.empty()) {
- auto* first = smallShardList.front();
- smallShardHash.erase(first);
- smallShardList.pop_front();
-
- // Empty shards are a special case, it's ok for them to just disappear
- // This also doesn't change the state of their neighbors in any way
- // The only side effect would be for those shards to become wider
- if (first->Parts.empty()) {
- auto* shard = first;
- Y_VERIFY(!shard->SplitBlocks);
-
- ui64 removedKey = 0;
- if (shard != Shards.Back()) {
- removedKey = shard->RightKey;
- Y_VERIFY(removedKey != 0);
- auto* target = shard->Next()->Node();
- target->LeftKey = shard->LeftKey;
- } else if (shard != Shards.Front()) {
- removedKey = shard->LeftKey;
- Y_VERIFY(removedKey != 0);
- auto* target = shard->Prev()->Node();
- target->RightKey = shard->RightKey;
- }
-
- if (removedKey) {
- auto it = TableInfo.SplitKeys.find(removedKey);
- Y_VERIFY(it != TableInfo.SplitKeys.end());
-
- if (auto logl = Logger->Log(NUtil::ELnLev::Debug)) {
- logl << "Table " << Table << " removing shard key " << it->first
- << ": " << TPrintableTypedCells(it->second.GetCells(), TableInfo.RowScheme->Keys->BasicTypes());
- }
-
- changes.StateChanges[removedKey] = { };
- TableInfo.SplitKeys.erase(it);
-
- CancelTask(shard->Task);
- delete shard;
-
- stateChanged = true;
- }
-
- continue;
- }
-
- auto* last = first;
- ui64 mergedSize = first->Stats.Size;
-
- while (true) {
- auto* left = first != Shards.Front() ? first->Prev()->Node() : nullptr;
- auto* right = last != Shards.Back() ? last->Next()->Node() : nullptr;
- if (left && !smallShardHash.contains(left)) {
- left = nullptr;
- }
- if (right && !smallShardHash.contains(right)) {
- right = nullptr;
- }
- if (left && right) {
- if (left->Stats.Size < right->Stats.Size) {
- right = nullptr;
- } else {
- left = nullptr;
- }
- }
- if (left) {
- if (mergedSize + left->Stats.Size > maxShardSize && maxShardSize > 0) {
- break;
- }
- first = left;
- smallShardList.erase(smallShardHash.at(left));
- smallShardHash.erase(left);
- mergedSize += left->Stats.Size;
- } else if (right) {
- if (mergedSize + right->Stats.Size > maxShardSize && maxShardSize > 0) {
- break;
- }
- last = right;
- smallShardList.erase(smallShardHash.at(right));
- smallShardHash.erase(right);
- mergedSize += right->Stats.Size;
- } else {
- break;
- }
- }
-
- if (first != last) {
- auto merged = MakeHolder<TTableShard>();
- merged->Id = ++TableInfo.LastShardId;
- merged->LeftKey = first->LeftKey;
- merged->RightKey = last->RightKey;
- merged.Release()->LinkBefore(first);
-
- TTableShard* next = first;
- while (next) {
- auto* shard = next;
- next = (shard != last) ? shard->Next()->Node() : nullptr;
-
- for (auto& kvPart : shard->Parts) {
- TIntrusiveConstPtr<TPart> part = kvPart.second.Part;
- TVector<TSlice> slices(Reserve(kvPart.second.Slices.size()));
- for (auto& kvSlice : kvPart.second.Slices) {
- slices.emplace_back(kvSlice.second.Slice);
- }
-
- TGlobalPart* allInfo = AllParts.FindPtr(part->Label);
- Y_VERIFY(allInfo);
- auto end = std::remove(allInfo->Shards.begin(), allInfo->Shards.end(), shard);
- allInfo->Shards.erase(end, allInfo->Shards.end());
-
- // Construct a fake TPartView so we may reuse AddParts method
- parts.emplace_back(TPartView{std::move(part), nullptr, new TSlices(std::move(slices))});
- }
-
- if (shard != first) {
- auto it = TableInfo.SplitKeys.find(shard->LeftKey);
- Y_VERIFY(it != TableInfo.SplitKeys.end());
-
- if (auto logl = Logger->Log(NUtil::ELnLev::Debug)) {
- logl << "Table " << Table << " removing shard key " << it->first
- << ": " << TPrintableTypedCells(it->second.GetCells(), TableInfo.RowScheme->Keys->BasicTypes());
- }
-
- changes.StateChanges[shard->LeftKey] = { };
- TableInfo.SplitKeys.erase(it);
- }
-
- CancelTask(shard->Task);
- delete shard;
-
- stateChanged = true;
- }
- }
- }
-
- if (parts) {
- // Add back all the slices we have taken from merges
- AddParts(std::move(parts));
- }
- }
-
- if (stateChanged) {
- SerializeStateInfo(&changes.StateChanges[0]);
- }
- }
-
- CheckCompactions();
-
- return changes;
- }
-
- TCompactionState TShardedCompactionStrategy::SnapshotState() {
- TCompactionState state;
-
- SerializeStateInfo(&state.StateSnapshot[0]);
- for (auto& kv : TableInfo.SplitKeys) {
- state.StateSnapshot[kv.first] = kv.second.GetBuffer();
- }
-
- return state;
- }
-
- bool TShardedCompactionStrategy::AllowForcedCompaction() {
- return !MemCompactionForced && !ForcedCompactionPending && ForcedCompactionTask.State == EState::Free;
- }
-
- void TShardedCompactionStrategy::OutputHtml(IOutputStream &out) {
- HTML(out) {
- if (ForcedCompactionPending || ForcedCompactionTask.State != EState::Free) {
- DIV_CLASS("row") {
- out << "Forced compaction";
- if (ForcedCompactionPending) {
- out << " waiting";
- }
- out << ", " << ForcedCompactionTask.State;
- if (ForcedCompactionTask.State != EState::Free) {
- out << ", Task #" << ForcedCompactionTask.TaskId;
- out << " (priority " << ForcedCompactionTask.Priority << ")";
- out << " submitted " << ForcedCompactionTask.SubmissionTimestamp.ToStringLocal();
- }
- }
- }
-
- for (auto& shard : Shards) {
- DIV_CLASS("row") {
- out << "Shard " << shard.Id << ' ';
- out << TPrintableShardRange(shard, TableInfo);
- out << ", " << shard.Parts.size() << " parts";
- if (shard.Levels) {
- out << ", " << shard.Levels->size() << " levels";
- } else if (shard.SplitBlocks) {
- out << ", " << shard.SplitBlocks << " slice splits";
- }
- out << ", " << shard.Stats.Size << " bytes";
- out << ", " << shard.Stats.Rows << " rows";
- out << ", " << shard.Task.State;
- if (shard.Task.State != EState::Free) {
- out << ", Task #" << shard.Task.TaskId;
- out << " (priority " << shard.Task.Priority << ")";
- out << " submitted " << shard.Task.SubmissionTimestamp.ToStringLocal();
- }
- }
- }
-
- PRE() {
- for (const auto& item : Nursery) {
- const auto* part = item.PartView.Part.Get();
- const auto& label = part->Label;
- out << "Genstep: " << label.Generation() << ":" << label.Step();
- out << " epoch " << part->Epoch;
- out << ", Backing size: " << part->BackingSize();
- out << ", Base: " << label;
- out << ", nursery";
- out << Endl;
- }
-
- TVector<const TPart*> parts(Reserve(AllParts.size()));
- for (auto& kv : AllParts) {
- parts.push_back(kv.second.Part.Get());
- }
- std::sort(parts.begin(), parts.end(), [](const TPart* a, const TPart* b) -> bool {
- if (a->Epoch != b->Epoch) {
- return a->Epoch > b->Epoch;
- }
- return a->Label > b->Label;
- });
- for (const TPart* part : parts) {
- auto& label = part->Label;
- out << "Genstep: " << label.Generation() << ":" << label.Step();
- out << " epoch " << part->Epoch;
- out << ", Backing size: " << part->BackingSize();
- out << ", Base: " << label;
- auto& info = AllParts.at(label);
- out << ", " << info.Slices->size() << " slices";
- if (info.Shards) {
- out << ", " << info.Shards.size() << " shards";
- }
- if (info.SplitBlocks) {
- out << ", " << info.SplitBlocks << " slice splits";
- }
- out << Endl;
- }
- }
- }
- }
-
- void TShardedCompactionStrategy::OnSliceSplitResult(TSliceSplitResult result) {
- SliceSplitResults.emplace_back(std::move(result));
- RequestChanges();
- }
-
- void TShardedCompactionStrategy::RequestChanges() noexcept {
- if (!RequestChangesPending) {
- RequestChangesPending = true;
- Backend->RequestChanges(Table);
- }
- }
-
- void TShardedCompactionStrategy::CheckCompactions() noexcept {
- CheckForcedCompaction();
-
- for (auto& shard : Shards) {
- CheckShardCompaction(&shard);
- }
- }
-
- void TShardedCompactionStrategy::AddParts(TVector<TPartView> parts) {
- // Types and default values for current table keys
- const auto& keyDefaults = *TableInfo.RowScheme->Keys;
-
- // A part/slice combination we scheduled to add
- struct TItem {
- TIntrusiveConstPtr<TPart> Part;
- TSlice Slice;
-
- TItem(TIntrusiveConstPtr<TPart> part, TSlice slice)
- : Part(std::move(part))
- , Slice(std::move(slice))
- { }
- };
-
- // Sorts items by their first key
- auto cmpByFirstKey = [&keyDefaults](const TItem& a, const TItem& b) noexcept -> bool {
- auto left = a.Slice.FirstKey.GetCells();
- auto right = b.Slice.FirstKey.GetCells();
- if (int cmp = ComparePartKeys(left, right, keyDefaults)) {
- return cmp < 0;
- }
- return a.Slice.FirstInclusive && !b.Slice.FirstInclusive;
- };
-
- // Returns true if the item is completely to the left of boundary
- auto cmpFullyInside = [&keyDefaults](const TItem& a, const TSerializedCellVec& boundary) noexcept -> bool {
- auto left = a.Slice.LastKey.GetCells();
- if (Y_UNLIKELY(!left)) {
- return false; // +inf
- }
- if (int cmp = ComparePartKeys(left, boundary.GetCells(), keyDefaults)) {
- return cmp < 0;
- }
- return !a.Slice.LastInclusive;
- };
-
- // Returns true if the item is completely to the right of boundary
- auto cmpFullyOutside = [&keyDefaults](const TItem& a, const TSerializedCellVec& boundary) noexcept -> bool {
- auto right = a.Slice.FirstKey.GetCells();
- if (Y_UNLIKELY(!right)) {
- return false; // -inf
- }
- return ComparePartKeys(boundary.GetCells(), right, keyDefaults) <= 0;
- };
-
- // A part/slice combination that crosses at least one shard boundary
- struct TSplitItem {
- TIntrusiveConstPtr<TPart> Part;
- TSlice Slice;
- TVector<TTableShard*> Shards;
-
- TSplitItem(TIntrusiveConstPtr<TPart> part, TSlice slice, TTableShard* firstShard)
- : Part(std::move(part))
- , Slice(std::move(slice))
- {
- Shards.push_back(firstShard);
- }
- };
-
- // Heap order of split items by their last key
- auto cmpHeapByLastKey = [&keyDefaults](const TSplitItem& a, const TSplitItem& b) noexcept -> bool {
- auto left = b.Slice.LastKey.GetCells();
- auto right = a.Slice.LastKey.GetCells();
- if (int cmp = ComparePartKeys(left, right, keyDefaults)) {
- return cmp < 0;
- }
- return !b.Slice.LastInclusive && a.Slice.LastInclusive;
- };
-
- // Returns true if item is completely to the left of boundary
- auto cmpSplitFullyInside = [&keyDefaults](const TSplitItem& a, const TSerializedCellVec& boundary) noexcept -> bool {
- auto left = a.Slice.LastKey.GetCells();
- if (Y_UNLIKELY(!left)) {
- return false; // +inf
- }
- if (int cmp = ComparePartKeys(left, boundary.GetCells(), keyDefaults)) {
- return cmp < 0;
- }
- return !a.Slice.LastInclusive;
- };
-
- struct TPendingState {
- TTableShard* const Shard;
- TTablePart* const Info;
- TPartDataSizeHelper SizeHelper;
- const bool IsGarbage;
-
- TPendingState(TTableShard* shard, TTablePart* info, bool isGarbage) noexcept
- : Shard(shard)
- , Info(info)
- , SizeHelper(info->Part.Get())
- , IsGarbage(isGarbage)
- {
- }
-
- void AddSlice(TSlice slice) noexcept {
- TTablePart::TSliceId sliceId(slice);
- auto r = Info->Slices.emplace(
- std::piecewise_construct,
- std::forward_as_tuple(sliceId),
- std::forward_as_tuple());
-
- Y_VERIFY(r.second, "Duplicate slices for rows [%" PRIu64 ",%" PRIu64 ")",
- sliceId.Begin, sliceId.End);
-
- auto& item = r.first->second;
- item.Slice = std::move(slice);
- item.Stats.Init(Info->Part.Get(), item.Slice, SizeHelper);
-
- Shard->Stats += item.Stats;
-
- if (Shard->Levels) {
- if (Info->Part->Epoch >= Shard->Levels->GetMaxEpoch()) {
- Shard->RegisterItem(*Info, item, IsGarbage);
- } else {
- // Adding new slices is no longer trivial
- Shard->Levels.Reset();
- }
- }
- }
- };
-
- size_t slicesCount = 0;
- for (auto& partView : parts) {
- Y_VERIFY(!partView.Slices->empty(), "Attempt to add part without slices");
-
- slicesCount += partView.Slices->size();
-
- auto* allInfo = AllParts.FindPtr(partView->Label);
- Y_VERIFY(allInfo, "Added part %s is not globally registered", partView->Label.ToString().c_str());
- }
-
- // We may need to split slices over multiple shards
- if (Shards.Size() > 1) {
-
- TVector<TItem> queue(Reserve(slicesCount));
- for (auto& partView : parts) {
- for (const auto& slice : *partView.Slices) {
- queue.emplace_back(partView.Part, slice);
- }
- }
- std::sort(queue.begin(), queue.end(), cmpByFirstKey);
-
- TTableShard* shard = Shards.Front();
- TSerializedCellVec* shardRightKey = TableInfo.SplitKeys.FindPtr(shard->RightKey);
- Y_VERIFY(shardRightKey);
-
- TTableShard* lastShard = Shards.Back();
-
- TVector<TItem*> pending;
- auto flushPending = [&]() noexcept -> void {
- std::sort(pending.begin(), pending.end(), [](const TItem* a, const TItem* b) noexcept -> bool {
- if (a->Part->Epoch != b->Part->Epoch) {
- // Prefer older epochs first
- return a->Part->Epoch < b->Part->Epoch;
- }
- if (a->Part.Get() != b->Part.Get()) {
- // Prefer same-epoch parts in their natural order
- return a->Part->Label < b->Part->Label;
- }
- // Prefer same-part slices in their natural order
- return a->Slice.BeginRowId() < b->Slice.BeginRowId();
- });
-
- std::optional<TPendingState> state;
-
- for (auto* pItem : pending) {
- if (!state || state->Info->Part.Get() != pItem->Part.Get()) {
- state.reset();
- auto label = pItem->Part->Label;
- state.emplace(shard, EnsurePart(shard, std::move(pItem->Part)), TableInfo.GarbageParts.contains(label));
- }
- state->AddSlice(std::move(pItem->Slice));
- }
-
- state.reset();
-
- pending.clear();
- };
-
- TVector<TSplitItem> splitQueue;
- auto flushShard = [&]() noexcept -> void {
- Y_VERIFY(shard != lastShard);
-
- // Move to the next shard
- shard = shard->Next()->Node();
-
- if (shard != lastShard) {
- shardRightKey = TableInfo.SplitKeys.FindPtr(shard->RightKey);
- Y_VERIFY(shardRightKey);
- } else {
- shardRightKey = nullptr;
- }
-
- if (splitQueue) {
- // Everything left in split queue crosses into the next shard
- for (auto& splitItem : splitQueue) {
- splitItem.Shards.push_back(shard);
- }
- shard->SplitBlocks += splitQueue.size();
-
- // Don't even try building levels for this shard
- shard->Levels.Reset();
- shard->PerLevelStats.clear();
- }
-
- while (!splitQueue.empty() && (shard == lastShard || cmpSplitFullyInside(splitQueue.front(), *shardRightKey))) {
- std::pop_heap(splitQueue.begin(), splitQueue.end(), cmpHeapByLastKey);
- auto& splitItem = splitQueue.back();
-
- AllParts.at(splitItem.Part->Label).SplitBlocks++;
-
- auto op = MakeHolder<TSliceSplitOp>(
- this,
- &TableInfo,
- std::move(splitItem.Shards),
- std::move(splitItem.Part),
- std::move(splitItem.Slice));
-
- auto* opWeak = op.Get();
- PendingSliceSplits.PushBack(opWeak);
-
- if (auto readId = Backend->BeginRead(std::move(op))) {
- // The read operation is currently pending
- // Remember its id so we may cancel it later
- opWeak->SetReadId(readId);
- }
-
- splitQueue.pop_back();
- }
- };
-
- auto pos = queue.begin();
- while (pos != queue.end()) {
- auto& item = *pos;
-
- if (shard == lastShard || cmpFullyInside(item, *shardRightKey)) {
- pending.emplace_back(&item);
- ++pos;
- continue;
- }
-
- if (cmpFullyOutside(item, *shardRightKey)) {
- flushPending();
- flushShard();
- continue;
- }
-
- // This is the first shard where slice crosses into the next shard
- splitQueue.emplace_back(std::move(item.Part), std::move(item.Slice), shard);
- std::push_heap(splitQueue.begin(), splitQueue.end(), cmpHeapByLastKey);
- shard->SplitBlocks++;
- ++pos;
-
- // Don't even try building levels for this shard
- shard->Levels.Reset();
- shard->PerLevelStats.clear();
- }
-
- // Flush any pending slices for the last shard
- flushPending();
-
- while (splitQueue && shard != lastShard) {
- flushShard();
- }
-
- Y_VERIFY(pending.empty());
- Y_VERIFY(pos == queue.end());
- Y_VERIFY(splitQueue.empty());
-
- } else {
-
- // Avoid complex logic when there's a single shard
- auto* shard = Shards.Front();
-
- // Make sure new parts are sorted by epoch
- std::sort(parts.begin(), parts.end(), [](const TPartView& a, const TPartView& b) -> bool {
- if (a->Epoch != b->Epoch) {
- return a->Epoch < b->Epoch;
- }
- return a->Label < b->Label;
- });
-
- for (auto& partView : parts) {
- auto label = partView->Label;
-
- TPendingState state(shard, EnsurePart(shard, std::move(partView.Part)), TableInfo.GarbageParts.contains(label));
-
- for (const auto& slice : *partView.Slices) {
- state.AddSlice(slice);
- }
- }
- }
- }
-
- TShardedCompactionStrategy::TGlobalPart* TShardedCompactionStrategy::EnsureGlobalPart(
- const TIntrusiveConstPtr<TPart>& part,
- const TIntrusiveConstPtr<TSlices>& slices) noexcept
- {
- auto* allInfo = AllParts.FindPtr(part->Label);
- if (!allInfo) {
- TPartDataSizeHelper helper(part.Get());
- allInfo = &AllParts[part->Label];
- allInfo->Part = part;
- allInfo->TotalSize = helper.CalcSize(0, Max<TRowId>());
- allInfo->GarbageSize = 0;
- AllBackingSize += part->BackingSize();
- }
- allInfo->Slices = TSlices::Merge(allInfo->Slices, slices);
- UpdateGarbageStats(allInfo);
- return allInfo;
- }
-
- void TShardedCompactionStrategy::UpdateGarbageStats(TShardedCompactionStrategy::TGlobalPart* allInfo) noexcept {
- const auto& label = allInfo->Part->Label;
-
- TRowId last = 0;
- allInfo->GarbageSize = 0;
- TPartDataSizeHelper helper(allInfo->Part.Get());
- for (const auto& slice : *allInfo->Slices) {
- TRowId next = slice.BeginRowId();
- if (last < next) {
- allInfo->GarbageSize += helper.CalcSize(last, next);
- }
- last = slice.EndRowId();
- }
- allInfo->GarbageSize += helper.CalcSize(last, Max<TRowId>());
-
- ui32 garbagePercent = allInfo->GarbageSize * 100 / allInfo->TotalSize;
- if (garbagePercent >= Policy->ShardPolicy.GetMaxGarbagePercentToReuse()) {
- if (TableInfo.GarbageParts.emplace(label).second) {
- // This part just turned into garbage, update all shards
- for (auto* shard : allInfo->Shards) {
- if (!shard->Levels) {
- continue;
- }
- if (auto* partInfo = shard->Parts.FindPtr(label)) {
- for (auto& kv : partInfo->Slices) {
- size_t index = kv.second.Level->Index;
- if (index < shard->PerLevelGarbage.size()) {
- shard->PerLevelGarbage[index] += kv.second.Stats.Size;
- }
- }
- }
- }
- }
- } else {
- if (TableInfo.GarbageParts.erase(label)) {
- // This part just stopped being garbage, update all shards
- for (auto* shard : allInfo->Shards) {
- if (!shard->Levels) {
- continue;
- }
- if (auto* partInfo = shard->Parts.FindPtr(label)) {
- for (auto& kv : partInfo->Slices) {
- size_t index = kv.second.Level->Index;
- if (index < shard->PerLevelGarbage.size()) {
- shard->PerLevelGarbage[index] -= kv.second.Stats.Size;
- }
- }
- }
- }
- }
- }
- }
-
- TTablePart* TShardedCompactionStrategy::EnsurePart(TTableShard* shard, TIntrusiveConstPtr<TPart> part) noexcept {
- auto* info = shard->Parts.FindPtr(part->Label);
- if (!info) {
- AllParts.at(part->Label).Shards.emplace_back(shard);
-
- info = &shard->Parts[part->Label];
- info->Part = std::move(part);
- } else {
- Y_VERIFY(info->Part.Get() == part.Get(),
- "Multiple parts with the same label %s",
- part->Label.ToString().c_str());
- }
- return info;
- }
-
- void TShardedCompactionStrategy::RebuildLevels(TTableShard* shard) noexcept {
- Y_VERIFY(!shard->SplitBlocks, "Unexpected RebuildLevels while blocked by split");
-
- shard->Levels.Reset(new TLevels(TableInfo.RowScheme->Keys));
- shard->PerLevelStats.clear();
-
- TVector<TTablePart*> parts(Reserve(shard->Parts.size()));
- for (auto& kv : shard->Parts) {
- parts.emplace_back(&kv.second);
- }
- std::sort(parts.begin(), parts.end(), [](TTablePart* a, TTablePart* b) noexcept -> bool {
- if (a->Part->Epoch != b->Part->Epoch) {
- return a->Part->Epoch < b->Part->Epoch;
- }
- return a->Part->Label < b->Part->Label;
- });
-
- for (TTablePart* info : parts) {
- Y_VERIFY(info->Part->Epoch >= shard->Levels->GetMaxEpoch());
-
- bool isGarbage = TableInfo.GarbageParts.contains(info->Part->Label);
-
- for (auto& kv : info->Slices) {
- auto& item = kv.second;
- shard->RegisterItem(*info, item, isGarbage);
- }
- }
- }
-
- void TShardedCompactionStrategy::CancelTask(TCompactionTask& task) noexcept {
- switch (task.State) {
- case EState::Free: {
- // nothing
- break;
- }
- case EState::Pending: {
- Broker->CancelTask(task.TaskId);
- break;
- }
- case EState::Compacting: {
- Backend->CancelCompaction(task.CompactionId);
- break;
- }
- }
- task.State = EState::Free;
- task.TaskId = 0;
- task.CompactionId = 0;
- }
-
- bool TShardedCompactionStrategy::CheckShardCompaction(TTableShard* shard, bool schedule) noexcept {
- if (shard->SplitBlocks || shard->Task.State == EState::Compacting) {
- // cannot compact until we have the whole picture
- return false;
- }
-
- bool garbage = false;
- bool critical = false;
- shard->FailingLevel = Max<size_t>();
-
- if (!shard->Levels) {
- // build the overall picture about this shard
- RebuildLevels(shard);
- }
-
- const auto& policy = Policy->ShardPolicy;
-
- TVector<TStats> cumulative;
- cumulative.resize(shard->Levels->size());
-
- size_t idx = shard->Levels->size();
- for (auto level = shard->Levels->begin(); level != shard->Levels->end(); ++level) {
- bool failed = false;
- size_t nextIdx = idx--;
- auto& current = cumulative[idx];
- current += shard->PerLevelStats[idx];
-
- if (shard->PerLevelGarbage[idx] > 0) {
- garbage = true;
- failed = true;
- }
-
- if (nextIdx < cumulative.size()) {
- auto& next = cumulative[nextIdx];
- if (!failed &&
- current.Size > 0 &&
- policy.GetNewDataPercentToCompact() > 0 &&
- next.Size * 100 > current.Size * policy.GetNewDataPercentToCompact())
- {
- failed = true;
- }
- if (!failed &&
- current.Rows > 0 &&
- policy.GetNewRowsPercentToCompact() > 0 &&
- next.Rows * 100 > current.Rows * policy.GetNewRowsPercentToCompact())
- {
- failed = true;
- }
- current += next;
- }
- if (!failed &&
- !Policy->KeepEraseMarkers &&
- Policy->DroppedRowsPercentToCompact > 0 &&
- current.DroppedPercent() >= Policy->DroppedRowsPercentToCompact)
- {
- failed = true;
- }
- if (policy.GetMaxSlicesPerLevel() > 0 &&
- level->size() > policy.GetMaxSlicesPerLevel())
- {
- // We want to compact this even if it's just a single level
- critical = true;
- failed = true;
- }
- if (failed) {
- shard->FailingLevel = idx;
- }
- }
-
- if (critical && shard->FailingLevel > 0) {
- // Take an extra level below so we eventually escape from the
- // critical (e.g. by degenerating into full compaction of all
- // existing levels)
- shard->FailingLevel--;
- }
-
- if (shard->Levels->size() > policy.GetMaxTotalLevels()) {
- // We want MaxTotalLevels, compact everything above
- size_t level = policy.GetMaxTotalLevels() - 1;
- shard->FailingLevel = Min(shard->FailingLevel, level);
- }
-
- if (!critical && !garbage &&
- shard->FailingLevel != Max<size_t>() &&
- shard->Levels->size() - shard->FailingLevel < policy.GetMinLevelsToCompact())
- {
- // We don't have enough levels to compact
- shard->FailingLevel = Max<size_t>();
- }
-
- if (shard->FailingLevel == Max<size_t>()) {
- // We don't want to compact this shard at this time
- return false;
- }
-
- if (ForcedCompactionTask.State == EState::Compacting) {
- // No shard compactions while forced compaction is in progress
- return false;
- }
-
- if (schedule) {
- TString type = policy.GetResourceBrokerTask();
-
- // TODO: figure out how to assign priorities
- ui32 newPriority = policy.GetTaskPriorityBase();
-
- // For every N levels over min levels we increase priority
- if (policy.GetTaskPriorityLevelsBoost() > 0) {
- ui32 levelsBoost = shard->Levels->size() - shard->FailingLevel;
- levelsBoost -= Min(levelsBoost, policy.GetMinLevelsToCompact());
- levelsBoost /= policy.GetTaskPriorityLevelsBoost();
- newPriority -= Min(newPriority, levelsBoost);
- }
-
- // For every N bytes of input data we decrease priority
- if (policy.GetTaskPrioritySizePenalty() > 0) {
- ui32 sizePenalty = cumulative[shard->FailingLevel].Size;
- sizePenalty /= policy.GetTaskPrioritySizePenalty();
- newPriority += Min(sizePenalty, ui32(1000));
- }
-
- switch (shard->Task.State) {
- case EState::Free: {
- // submit a new task
- TString name;
- {
- TStringOutput out(name);
- out << "shard-" << shard->Id;
- out << "-table-" << Table;
- out << "-" << TaskNameSuffix;
- }
- shard->Task.SubmissionTimestamp = Time->Now();
- shard->Task.Priority = newPriority;
- shard->Task.TaskId = Broker->SubmitTask(
- std::move(name),
- TResourceParams(std::move(type))
- .WithCPU(1)
- .WithPriority(shard->Task.Priority),
- [this, shard](TTaskId taskId) {
- BeginShardCompaction(shard, taskId);
- });
- shard->Task.State = EState::Pending;
- break;
- }
- case EState::Pending: {
- // update task priority
- if (newPriority < shard->Task.Priority &&
- (shard->Task.Priority - newPriority) >= shard->Task.Priority / PRIORITY_UPDATE_FACTOR) {
- shard->Task.Priority = newPriority;
- Broker->UpdateTask(
- shard->Task.TaskId,
- TResourceParams(std::move(type))
- .WithCPU(1)
- .WithPriority(shard->Task.Priority));
- }
- break;
- }
- default: {
- Y_FAIL("Unexpected shard state");
- }
- }
- }
-
- return true;
- }
-
- bool TShardedCompactionStrategy::CheckForcedCompaction(bool schedule) noexcept {
- if (!ForcedCompactionPending ||
- ForcedCompactionTask.State == EState::Compacting ||
- PendingSliceSplits ||
- SliceSplitResults)
- {
- // Forced compaction cannot run at this time
- return false;
- }
-
- if (MemCompactionForced) {
- // Don't start big compaction during another forced mem compaction
- return false;
- }
-
- if (schedule) {
- const auto& policy = Policy->ShardPolicy;
-
- TString type = policy.GetResourceBrokerTask();
-
- // TODO: figure out how to assign priorities
- ui32 newPriority = policy.GetTaskPriorityBase();
-
- switch (ForcedCompactionTask.State) {
- case EState::Free: {
- // submit a new task
- TString name;
- {
- TStringOutput out(name);
- out << "forced-table-" << Table;
- out << "-" << TaskNameSuffix;
- }
- ForcedCompactionTask.SubmissionTimestamp = Time->Now();
- ForcedCompactionTask.Priority = newPriority;
- ForcedCompactionTask.TaskId = Broker->SubmitTask(
- std::move(name),
- TResourceParams(std::move(type))
- .WithCPU(1)
- .WithPriority(ForcedCompactionTask.Priority),
- [this](TTaskId taskId) {
- BeginForcedCompaction(taskId);
- });
- ForcedCompactionTask.State = EState::Pending;
- break;
- }
- case EState::Pending: {
- // update task priority
- if (newPriority < ForcedCompactionTask.Priority &&
- (ForcedCompactionTask.Priority - newPriority) >= ForcedCompactionTask.Priority / PRIORITY_UPDATE_FACTOR) {
- ForcedCompactionTask.Priority = newPriority;
- Broker->UpdateTask(
- ForcedCompactionTask.TaskId,
- TResourceParams(std::move(type))
- .WithCPU(1)
- .WithPriority(ForcedCompactionTask.Priority));
- }
- break;
- }
- default: {
- Y_FAIL("Unexpected forced compaction state");
- }
- }
- }
-
- return true;
- }
-
- void TShardedCompactionStrategy::BeginShardCompaction(TTableShard* shard, TTaskId taskId) noexcept {
- Y_VERIFY(shard->Task.State == EState::Pending);
- Y_VERIFY(shard->Task.TaskId == taskId);
- if (!CheckShardCompaction(shard, false)) {
- // We no longer want to compact this shard for some reason
- Broker->FinishTask(taskId, EResourceStatus::Cancelled);
- shard->Task.State = EState::Free;
- shard->Task.TaskId = 0;
- return;
- }
-
- auto params = MakeHolder<TShardedCompactionParams>();
- params->Table = Table;
- params->TaskId = shard->Task.TaskId;
- params->InputShard = shard;
-
- struct TInput {
- TIntrusiveConstPtr<TPart> Part;
- TVector<TSlice> Slices;
- TVector<TSlice> Reused;
- TIntrusiveConstPtr<TSlices> ReusedRef;
- };
-
- TEpoch maxEpoch = TEpoch::Min();
- TVector<const TBounds*> edges;
- THashMap<const TPart*, TInput> inputs;
-
- // First gather all inputs
- for (auto& level : *shard->Levels) {
- if (level.Index < shard->FailingLevel) {
- continue;
- }
- for (auto& item : level) {
- auto& input = inputs[item.Part.Get()];
- if (!input.Part) {
- maxEpoch = Max(maxEpoch, item.Part->Epoch);
- input.Part = item.Part;
- }
- input.Slices.emplace_back(item.Slice);
- }
- }
-
- // Try to find slices that may be reused
- TPageReuseBuilder reuseBuilder(*TableInfo.RowScheme->Keys);
- for (auto& kv : inputs) {
- auto& input = kv.second;
- bool reusable = true;
- // Part is not reusable if it has any drops
- if (reusable && input.Part->Stat.Drops > 0) {
- reusable = false;
- }
- // Avoid reusing parts that are already considered garbage
- if (reusable && TableInfo.GarbageParts.contains(input.Part->Label)) {
- reusable = false;
- }
- for (const auto& slice : input.Slices) {
- reuseBuilder.AddSlice(kv.first, slice, reusable);
- }
- }
- auto reuseResults = reuseBuilder.Build();
-
- // Reuse all we can if not too many output slices are expected
- if (reuseResults.ExpectedSlices < Policy->ShardPolicy.GetMaxSlicesPerLevel()) {
- for (auto& reusable : reuseResults.Reusable) {
- TPartDataSizeHelper helper(reusable.Part);
- ui64 dataSize = helper.CalcSize(reusable.Slice.BeginRowId(), reusable.Slice.EndRowId());
- if (dataSize < Policy->ShardPolicy.GetMinSliceSizeToReuse()) {
- // Avoid reusing slices that are too small, better to recompact
- continue;
- }
- auto* input = inputs.FindPtr(reusable.Part);
- Y_VERIFY(input, "Cannot find reusable part %s in our inputs",
- reusable.Part->Label.ToString().c_str());
- input->Reused.emplace_back(std::move(reusable.Slice));
- }
- }
-
- // Now turn those inputs into TPartView structures
- for (auto& kv : inputs) {
- auto& input = kv.second;
- std::sort(input.Slices.begin(), input.Slices.end(), [](const TSlice& a, const TSlice& b) noexcept -> bool {
- return a.BeginRowId() < b.BeginRowId();
- });
- TIntrusiveConstPtr<TSlices> original = new TSlices(std::move(input.Slices));
- TIntrusiveConstPtr<TSlices> reused = new TSlices(std::move(input.Reused));
- TIntrusiveConstPtr<TSlices> compacted = TSlices::Subtract(original, reused);
-
- // Everything we're compacting will become garbage soon
- if (reused && !reused->empty() && compacted && !compacted->empty()) {
- auto* allInfo = AllParts.FindPtr(input.Part->Label);
- Y_VERIFY(allInfo, "Reused part %s is not registered", input.Part->Label.ToString().c_str());
-
- TIntrusiveConstPtr<TSlices> afterCompaction = TSlices::Subtract(allInfo->Slices, compacted);
-
- // Calculate how much garbage this part would have after compaction
- TRowId last = 0;
- ui64 newGarbage = 0;
- TPartDataSizeHelper helper(input.Part.Get());
- for (const auto& slice : *afterCompaction) {
- TRowId next = slice.BeginRowId();
- if (last < next) {
- newGarbage += helper.CalcSize(last, next);
- }
- last = slice.EndRowId();
- }
- newGarbage += helper.CalcSize(last, Max<TRowId>());
- ui64 newGarbagePercent = newGarbage * 100 / allInfo->TotalSize;
-
- if (newGarbagePercent >= Policy->ShardPolicy.GetMaxGarbagePercentToReuse()) {
- compacted = original;
- reused = nullptr;
- }
- }
-
- // Make sure compacted data would not interfere with reused slices
- if (reused && !reused->empty()) {
- input.ReusedRef = reused; // keep pointers alive
- for (const auto& slice : *reused) {
- edges.emplace_back(&slice);
- }
- }
-
- // Create necessary records if there is anything to compact
- if (compacted && !compacted->empty()) {
- params->Parts.emplace_back(MakePartView(input.Part, std::move(compacted)));
- params->Original.emplace_back(MakePartView(input.Part, std::move(original)));
- if (reused && !reused->empty()) {
- params->Reused.emplace_back(MakePartView(input.Part, std::move(reused)));
- }
- }
- }
-
- if (!Policy->KeepEraseMarkers) {
- params->IsFinal = shard->FailingLevel == 0;
- } else {
- params->IsFinal = false;
- }
-
- if (!params->IsFinal) {
- TVector<const TBounds*> allBounds;
- for (auto& level : *shard->Levels) {
- if (level.Index < shard->FailingLevel) {
- for (auto& item : level) {
- if (!Policy->KeepEraseMarkers) {
- allBounds.push_back(&item.Slice);
- }
- if (item.Part->Epoch >= maxEpoch) {
- // Prevent newer slices from bubbling up just because
- // we have compacted older disjoint slices that happened
- // to be higher in the hierarchy.
- edges.push_back(&item.Slice);
- }
- }
- }
- }
-
- if (!Policy->KeepEraseMarkers) {
- Y_VERIFY(allBounds, "Unexpected lack of underlay slices");
- params->UnderlayMask = TUnderlayMask::Build(TableInfo.RowScheme, allBounds);
- }
- }
-
- if (edges) {
- params->SplitKeys = MakeHolder<TSplitKeys>(TableInfo.RowScheme, std::move(edges));
- }
-
- shard->Task.CompactionId = Backend->BeginCompaction(std::move(params));
- shard->Task.State = EState::Compacting;
- }
-
- void TShardedCompactionStrategy::BeginForcedCompaction(TTaskId taskId) noexcept {
- Y_VERIFY(ForcedCompactionTask.State == EState::Pending);
- Y_VERIFY(ForcedCompactionTask.TaskId == taskId);
- Y_VERIFY(ForcedCompactionPending);
-
- if (!CheckForcedCompaction(false)) {
- Broker->FinishTask(taskId, EResourceStatus::Cancelled);
- ForcedCompactionTask.State = EState::Free;
- ForcedCompactionTask.TaskId = 0;
- return;
- }
-
- // Avoid active splits changing state under us
- Y_VERIFY(!PendingSliceSplits);
- Y_VERIFY(!SliceSplitResults);
-
- // Cancel all running shard compactions
- for (auto& shard : Shards) {
- CancelTask(shard.Task);
- }
-
- auto params = MakeHolder<TShardedCompactionParams>();
- params->Table = Table;
- params->TaskId = taskId;
- params->IsFinal = !Policy->KeepEraseMarkers;
-
- // Take all non-nursery parts into a new giant compaction
- for (auto& kv : AllParts) {
- params->Parts.push_back(MakePartView(kv.second.Part, kv.second.Slices));
- params->Original.push_back(params->Parts.back());
- }
-
- if (TableInfo.SplitKeys) {
- TVector<TSerializedCellVec> splitKeys;
- auto* last = Shards.Back();
- auto* shard = Shards.Front();
- while (shard != last) {
- splitKeys.push_back(TableInfo.SplitKeys.at(shard->RightKey));
- shard = shard->Next()->Node();
- }
- Y_VERIFY(splitKeys, "Unexpected lack of split keys");
- params->SplitKeys = MakeHolder<TSplitKeys>(TableInfo.RowScheme, std::move(splitKeys));
- }
-
- ForcedCompactionTask.CompactionId = Backend->BeginCompaction(std::move(params));
- ForcedCompactionTask.State = EState::Compacting;
- ForcedCompactionPending = false;
- }
-
- void TShardedCompactionStrategy::SerializeStateInfo(TString* out) const noexcept {
- TShardedStrategyStateInfo header;
- header.SetLastSplitKey(TableInfo.LastSplitKey);
- header.SetLastShardId(TableInfo.LastShardId);
- for (auto& kv : TableInfo.SplitKeys) {
- header.AddSplitKeys(kv.first);
- }
- for (auto& shard : Shards) {
- header.AddShards(shard.Id);
- }
- Y_PROTOBUF_SUPPRESS_NODISCARD header.SerializeToString(out);
- }
-
-}
-}
-}
diff --git a/ydb/core/tablet_flat/flat_comp_shard.h b/ydb/core/tablet_flat/flat_comp_shard.h
deleted file mode 100644
index 073a5f731ea..00000000000
--- a/ydb/core/tablet_flat/flat_comp_shard.h
+++ /dev/null
@@ -1,694 +0,0 @@
-#pragma once
-
-#include "flat_comp.h"
-#include "flat_stat_part.h"
-#include "util_fmt_line.h"
-
-#include <library/cpp/time_provider/time_provider.h>
-
-#include <util/generic/queue.h>
-
-namespace NKikimr {
-namespace NTable {
-namespace NCompShard {
-
- // Forward declarations
- struct TTableInfo;
- struct TTablePart;
- struct TTableShard;
-
- /**
- * Stats used for data estimation
- */
- struct TStats {
- // Size of data we may potentially read during compaction
- ui64 Size = 0;
-
- // Number of rows we may potentially read during compaction (including any deletion markers)
- ui64 Rows = 0;
-
- // Estimated number of deletion markers in the [0, Rows] range
- double Drops = 0.0;
-
- void Reset() {
- Size = 0;
- Rows = 0;
- Drops = 0.0;
- }
-
- void Init(const TPart* part, const TSlice& slice) {
- TPartDataSizeHelper helper(part);
- Init(part, slice, helper);
- }
-
- void Init(const TPart* part, const TSlice& slice, TPartDataSizeHelper& helper) {
- Size = helper.CalcSize(slice.BeginRowId(), slice.EndRowId());
- Rows = slice.Rows();
- if (part->Stat.Rows > 0) {
- ui64 partRows = part->Stat.Rows;
- ui64 partDrops = part->Stat.Drops;
-
- // Assume worst case: every drop has a hidden row that won't go away
- partDrops -= Min(partDrops, part->Stat.HiddenRows);
-
- if (Rows < partRows) {
- Drops = double(partDrops) * Rows / partRows;
- } else {
- Drops = partDrops;
- }
- } else {
- Drops = 0.0;
- }
- }
-
- TStats& operator+=(const TStats& o) {
- Size += o.Size;
- Rows += o.Rows;
- Drops += o.Drops;
- if (Drops > Rows) {
- Drops = Rows;
- }
- return *this;
- }
-
- TStats& operator-=(const TStats& o) {
- Size -= o.Size;
- Rows -= o.Rows;
- Drops -= o.Drops;
- if (Rows == 0) {
- Drops = 0.0;
- }
- return *this;
- }
-
- ui32 DroppedPercent() const {
- ui64 normalRows = Rows > Drops ? ui64(Rows - Drops) : 0;
- if (Drops > 0) {
- if (Drops >= normalRows) {
- return 100;
- }
- if (normalRows > 0) {
- return Min(ui32(Drops * 100 / normalRows), ui32(100));
- }
- }
- return 0;
- }
- };
-
- /**
- * A helper class used to find keys that evenly split slices
- */
- class TSplitStatIterator {
- public:
- TSplitStatIterator(const TKeyCellDefaults& keyDefaults)
- : KeyCellDefaults(keyDefaults)
- , InitQueue(TCmpHeapByFirstKey{KeyCellDefaults})
- , NextQueue(TCmpHeapByNextKey{KeyCellDefaults})
- , StartQueue(TCmpHeapByFirstKey{KeyCellDefaults})
- , StopQueue(TCmpHeapByLastKey{KeyCellDefaults})
- { }
-
- void AddSlice(const TPart* part, const TSlice& slice, ui64 size) noexcept;
-
- bool Next() noexcept;
-
- TCellsRef CurrentKey() const noexcept { return Key; }
-
- ui64 LeftSize() const noexcept { return LeftSize_; }
- ui64 RightSize() const noexcept { return RightSize_; }
- ui64 LeftRows() const noexcept { return LeftRows_; }
- ui64 RightRows() const noexcept { return RightRows_; }
-
- private:
- struct TItemState {
- const TPart* const Part;
- const TSlice Slice;
- TPartDataSizeHelper Helper;
- NPage::TIndex::TIter NextPage;
- TVector<TCell> NextKey;
-
- TRowId LastRowId;
- ui64 LastPageSize = 0;
- ui64 LastPageRows = 0;
- bool IsPartial = false;
-
- TItemState(const TPart* part, const TSlice& slice)
- : Part(part)
- , Slice(slice)
- , Helper(Part)
- , LastRowId(Slice.BeginRowId())
- { }
-
- // Make sure this structure is stable in memory
- TItemState(const TItemState&) = delete;
- TItemState(TItemState&&) = delete;
-
- void InitNextKey() noexcept;
- void InitPageSize() noexcept;
- };
-
- struct TCmpHeapByFirstKey {
- const TKeyCellDefaults& KeyCellDefaults;
-
- bool operator()(const TItemState* b, const TItemState* a) const noexcept;
- };
-
- struct TCmpHeapByNextKey {
- const TKeyCellDefaults& KeyCellDefaults;
-
- bool operator()(const TItemState* b, const TItemState* a) const noexcept;
- };
-
- struct TCmpHeapByLastKey {
- const TKeyCellDefaults& KeyCellDefaults;
-
- bool operator()(const TItemState* b, const TItemState* a) const noexcept;
- };
-
- template<class TCmp>
- using TItemQueue = TPriorityQueue<TItemState*, TVector<TItemState*>, TCmp>;
-
- private:
- bool HasStarted(const TItemState* item) const noexcept;
- bool HasStopped(const TItemState* item) const noexcept;
-
- void InitNextKey(TItemState* item) const noexcept;
- void InitPageSize(TItemState* item) const noexcept;
-
- private:
- const TKeyCellDefaults& KeyCellDefaults;
- TVector<TCell> Key;
- TDeque<TItemState> Items;
- TItemQueue<TCmpHeapByFirstKey> InitQueue;
- TItemQueue<TCmpHeapByNextKey> NextQueue;
- TItemQueue<TCmpHeapByFirstKey> StartQueue;
- TItemQueue<TCmpHeapByLastKey> StopQueue;
- TVector<TItemState*> ActivationQueue;
- ui64 LeftSize_ = 0;
- ui64 RightSize_ = 0;
- ui64 LeftRows_ = 0;
- ui64 RightRows_ = 0;
- };
-
- /**
- * A helper class used to find pages that may be reused during compaction
- */
- class TPageReuseBuilder {
- public:
- TPageReuseBuilder(const TKeyCellDefaults& keyDefaults)
- : KeyCellDefaults(keyDefaults)
- { }
-
- void AddSlice(const TPart* part, const TSlice& slice, bool reusable) noexcept;
-
- public:
- struct TReusable {
- const TPart* Part;
- TSlice Slice;
- };
-
- struct TResults {
- TDeque<TReusable> Reusable;
- size_t ExpectedSlices = 0;
- };
-
- TResults Build() noexcept;
-
- private:
- // A lightweight version of either left or right slice boundary
- struct TBoundary {
- TCellsRef Key;
- TRowId RowId;
- bool Inclusive;
- };
-
- struct TLeftBoundary : public TBoundary {
- TRowId BeginRowId() const {
- return RowId + !Inclusive;
- }
- };
-
- struct TRightBoundary : public TBoundary {
- TRowId EndRowId() const {
- return RowId + Inclusive;
- }
- };
-
- struct TItemState {
- const TPart* const Part;
- const TSlice Slice;
- NPage::TIndex::TIter NextPage;
- TVector<TCell> NextKey;
- TLeftBoundary First;
- bool Reusable;
-
- TItemState(const TPart* part, const TSlice& slice, bool reusable)
- : Part(part)
- , Slice(slice)
- , Reusable(reusable)
- { }
-
- // Make sure this structure is stable in memory
- TItemState(const TItemState&) = delete;
- TItemState(TItemState&&) = delete;
-
- bool InitNextKey() noexcept;
- TRowId GetNextRowId() const noexcept;
- };
-
- private:
- const TKeyCellDefaults& KeyCellDefaults;
- TDeque<TItemState> Items;
- };
-
- enum class EState {
- Free,
- Pending,
- Compacting,
- };
-
- struct TCompactionTask {
- EState State = EState::Free;
- ui32 Priority = 0;
- ui64 TaskId = 0;
- TInstant SubmissionTimestamp;
- ui64 CompactionId = 0;
- };
-
- /**
- * This structure is pinned in memory as long as strategy is alive
- */
- struct TTableInfo {
- // Current row scheme of the table, updated on schema changes
- TIntrusiveConstPtr<TRowScheme> RowScheme;
-
- // Current known shard keys, changed on split/merge
- THashMap<ui64, TSerializedCellVec> SplitKeys;
-
- // Last used key id (incremented for each new key)
- ui64 LastSplitKey = 0;
-
- // Last used shard id (incremented for each new shard)
- ui64 LastShardId = 0;
-
- // A set of undesirable part labels we would like to compact
- THashSet<TLogoBlobID> GarbageParts;
- };
-
- /**
- * This structure describes everything known about a part within a shard
- */
- struct TTablePart {
- struct TSliceId {
- const TRowId Begin;
- const TRowId End;
-
- TSliceId(TRowId begin, TRowId end)
- : Begin(begin)
- , End(end)
- { }
-
- TSliceId(const TSlice& slice)
- : Begin(slice.BeginRowId())
- , End(slice.EndRowId())
- { }
-
- friend bool operator<(const TSliceId& a, const TSliceId& b) {
- return a.End <= b.Begin;
- }
- };
-
- struct TItem {
- // A single slice description
- TSlice Slice;
-
- // Stats for the above slice (upper bound)
- TStats Stats;
-
- // A stable iterator into the level where slice is currently placed
- TLevels::iterator Level;
-
- // A stable iterator into the run where slice is currently placed
- TRun::iterator Position;
- };
-
- // Reference to the part
- TIntrusiveConstPtr<TPart> Part;
-
- // A sorted run of used slices
- TMap<TSliceId, TItem> Slices;
- };
-
- /**
- * This structure describes a single shard of the table
- */
- struct TTableShard
- : public TIntrusiveListItem<TTableShard>
- {
- // Unique shard id, primarily used in html and broker task names
- ui64 Id = 0;
-
- // Unique per-table id of left and right boundary keys
- // The special value 0 is used as a marker for +/- infinity
- ui64 LeftKey = 0;
- ui64 RightKey = 0;
-
- // Full map of part slices that are fully inside this shard
- THashMap<TLogoBlobID, TTablePart> Parts;
-
- // Full stats of the shard, this includes every active part
- TStats Stats;
-
- // Number of split operations that are currently blocking this shard
- size_t SplitBlocks = 0;
-
- // Non-null when dynamic levels are fully formed
- // TODO: may be better if this is not recomputed every time
- THolder<TLevels> Levels;
-
- // Valid when Levels are non-null, cached stats for each level
- TVector<TStats> PerLevelStats;
-
- // Valid when Levels are non-null, counts number of undesirable bytes
- // on each level, which may be freed after compaction.
- TVector<size_t> PerLevelGarbage;
-
- // The first level where invariants no longer hold
- size_t FailingLevel = Max<size_t>();
-
- // Current compaction task
- TCompactionTask Task;
-
- void RegisterItem(const TTablePart& info, TTablePart::TItem& item, bool isGarbage) noexcept;
-
- bool FindSplitKey(TSerializedCellVec& foundKey, const TKeyCellDefaults& keyDefaults) const noexcept;
- };
-
- /**
- * A result of a split of a single slice over a number of shards
- */
- struct TSliceSplitResult {
- TIntrusiveConstPtr<TPart> Part;
- TSlice OldSlice;
- TVector<TSlice> NewSlices;
- TVector<TTableShard*> Shards;
- };
-
- /**
- * Backend interface for consuming split results
- */
- class ISliceSplitResultConsumer {
- public:
- virtual ~ISliceSplitResultConsumer() = default;
-
- /**
- * Called when split completes successfully
- */
- virtual void OnSliceSplitResult(TSliceSplitResult result) = 0;
- };
-
- /**
- * A running split operation of a slice over a number of shards
- *
- * This object is only valid until operation has completed. The intrusive
- * list is used while it's still pending so it may be cancelled.
- */
- class TSliceSplitOp
- : public ICompactionRead
- , public TIntrusiveListItem<TSliceSplitOp>
- {
- public:
- TSliceSplitOp(
- ISliceSplitResultConsumer* consumer,
- const TTableInfo* table,
- TVector<TTableShard*> shards,
- TIntrusiveConstPtr<TPart> part,
- TSlice slice)
- : Consumer(consumer)
- , Table(table)
- , Shards(std::move(shards))
- , Part(std::move(part))
- , Slice(std::move(slice))
- {
- Y_VERIFY(Shards.size() >= 2, "You need at least two shards for a split");
- for (size_t idx = 1; idx < Shards.size(); ++idx) {
- Y_VERIFY(Shards[idx-1]->RightKey == Shards[idx]->LeftKey, "Adjacent shards must have matching keys");
- Y_VERIFY(Shards[idx]->LeftKey != 0, "Adjacent shards must have a valid key");
- }
- }
-
- bool Execute(IPages* env) override;
-
- void SetReadId(ui64 readId) {
- Y_VERIFY(ReadId == 0, "Read id must be set exactly once");
- Y_VERIFY(readId != 0, "Attempting to assign an invalid read id");
- ReadId = readId;
- }
-
- ui64 GetReadId() const {
- Y_VERIFY(ReadId != 0, "Read id has not been set");
- return ReadId;
- }
-
- public:
- ISliceSplitResultConsumer* const Consumer;
- const TTableInfo* const Table;
- const TVector<TTableShard*> Shards;
- const TIntrusiveConstPtr<TPart> Part;
- const TSlice Slice;
-
- private:
- ui64 ReadId = 0;
- };
-
- class TUnderlayMask final : public NPage::IKeySpace {
- public:
- TUnderlayMask(TIntrusiveConstPtr<TRowScheme> rowScheme, TVector<TBounds> bounds)
- : RowScheme(std::move(rowScheme))
- , Bounds(std::move(bounds))
- {
- Y_VERIFY(ValidateOrder(), "TUnderlayMask got bounds in an invalid order");
- Reset();
- }
-
- const TVector<TBounds>& GetBounds() const {
- return Bounds;
- }
-
- void Reset() noexcept override {
- Position = Bounds.begin();
- }
-
- bool HasKey(TCellsRef key) noexcept override;
-
- public:
- static THolder<TUnderlayMask> Build(
- TIntrusiveConstPtr<TRowScheme> rowScheme,
- TVector<const TBounds*>& input) noexcept;
-
- private:
- bool ValidateOrder() const noexcept;
- bool ValidatePosition(TCellsRef key) const noexcept;
-
- private:
- TIntrusiveConstPtr<TRowScheme> RowScheme;
- TVector<TBounds> Bounds;
- TVector<TBounds>::const_iterator Position;
- };
-
- class TSplitKeys final : public NPage::ISplitKeys {
- public:
- using TKeysVec = TVector<TSerializedCellVec>;
-
- TSplitKeys(TIntrusiveConstPtr<TRowScheme> rowScheme, TKeysVec keys);
- TSplitKeys(TIntrusiveConstPtr<TRowScheme> rowScheme, TVector<const TBounds*> bounds);
-
- const TKeysVec& GetKeys() const {
- return Keys;
- }
-
- void Reset() noexcept override {
- Position = Keys.begin();
- }
-
- bool ShouldSplit(TCellsRef key) noexcept override;
-
- private:
- bool ValidateOrder() const noexcept;
- bool ValidatePosition(TCellsRef key) const noexcept;
- bool IsInclusive(TKeysVec::const_iterator pos) const noexcept;
-
- private:
- TIntrusiveConstPtr<TRowScheme> RowScheme;
- TKeysVec Keys;
- TKeysVec::const_iterator Position;
- TVector<bool> Inclusive;
- };
-
- class TShardedCompactionParams final : public TCompactionParams {
- public:
- void Describe(IOutputStream& out) const noexcept override;
-
- public:
- TTableShard* InputShard = nullptr;
- TVector<TPartView> Original;
- TVector<TPartView> Reused;
- };
-
- class TShardedCompactionStrategy final
- : public ICompactionStrategy
- , public ISliceSplitResultConsumer
- {
- public:
- TShardedCompactionStrategy(
- ui32 table,
- ICompactionBackend* backend,
- IResourceBroker* broker,
- NUtil::ILogger* logger,
- ITimeProvider* time,
- TString taskNameSuffix)
- : Table(table)
- , Backend(backend)
- , Broker(broker)
- , Logger(logger)
- , Time(time)
- , TaskNameSuffix(std::move(taskNameSuffix))
- {
- }
-
- void Start(TCompactionState state) override;
- void Stop() override;
- void ReflectSchema() override;
- void ReflectRemovedRowVersions() override;
- void UpdateCompactions() override;
- float GetOverloadFactor() override;
- ui64 GetBackingSize() override;
- ui64 GetBackingSize(ui64 ownerTabletId) override;
- ui64 BeginMemCompaction(TTaskId taskId, TSnapEdge edge, ui64 forcedCompactionId) override;
- ui64 GetLastFinishedForcedCompactionId() const override { return 0; } // TODO!
- TInstant GetLastFinishedForcedCompactionTs() const override { return TInstant::Zero(); } // TODO!
- TCompactionChanges CompactionFinished(
- ui64 compactionId,
- THolder<TCompactionParams> params,
- THolder<TCompactionResult> result) override;
- void PartMerged(TPartView part, ui32 level) override;
- void PartMerged(TIntrusiveConstPtr<TColdPart> part, ui32 level) override;
- TCompactionChanges PartsRemoved(TArrayRef<const TLogoBlobID> parts) override;
- TCompactionChanges ApplyChanges() override;
- TCompactionState SnapshotState() override;
- bool AllowForcedCompaction() override;
- void OutputHtml(IOutputStream &out) override;
-
- void OnSliceSplitResult(TSliceSplitResult result) override;
-
- private:
- /**
- * Schedules an eventual ApplyChanges call
- */
- void RequestChanges() noexcept;
-
- /**
- * Check and schedule pending compactions
- */
- void CheckCompactions() noexcept;
-
- /**
- * Adds parts to current compaction state
- *
- * These parts don't necessarily have to be new parts, but it's
- * important that during replacements old data is removed first.
- */
- void AddParts(TVector<TPartView> parts);
-
- /**
- * Returns existing or creates a new TTablePart for the specified part
- */
- TTablePart* EnsurePart(TTableShard* shard, TIntrusiveConstPtr<TPart> part) noexcept;
-
- /**
- * Reinitializes dynamic levels and related structures
- */
- void RebuildLevels(TTableShard* shard) noexcept;
-
- /**
- * Common helper for cancelling compaction tasks
- */
- void CancelTask(TCompactionTask& task) noexcept;
-
- /**
- * Checks current shard state and schedules compaction when necessary
- */
- bool CheckShardCompaction(TTableShard* shard, bool schedule = true) noexcept;
-
- /**
- * Checks table and shcedules forced compaction when necessary
- */
- bool CheckForcedCompaction(bool schedule = true) noexcept;
-
- /**
- * Called when shard compaction is ready to start
- */
- void BeginShardCompaction(TTableShard* shard, TTaskId taskId) noexcept;
-
- /**
- * Called when forced compaction is ready to start
- */
- void BeginForcedCompaction(TTaskId taskId) noexcept;
-
- /**
- * Serializes current state info (header) in a specified string
- */
- void SerializeStateInfo(TString* out) const noexcept;
-
- private:
- struct TNurseryItem {
- TPartView PartView;
- ui64 DataSize = 0;
- };
-
- struct TGlobalPart {
- TIntrusiveConstPtr<TPart> Part;
- TIntrusiveConstPtr<TSlices> Slices;
- TVector<TTableShard*> Shards;
- ui64 TotalSize = 0;
- ui64 GarbageSize = 0;
- size_t SplitBlocks = 0;
- };
-
- TGlobalPart* EnsureGlobalPart(const TIntrusiveConstPtr<TPart>& part, const TIntrusiveConstPtr<TSlices>& slices) noexcept;
- void UpdateGarbageStats(TGlobalPart* allInfo) noexcept;
-
- private:
- const ui32 Table;
- ICompactionBackend* const Backend;
- IResourceBroker* const Broker;
- NUtil::ILogger* const Logger;
- ITimeProvider* const Time;
- const TString TaskNameSuffix;
-
- TTableInfo TableInfo;
- TIntrusiveConstPtr<TCompactionPolicy> Policy;
- TIntrusiveListWithAutoDelete<TTableShard, TDelete> Shards;
- TIntrusiveList<TSliceSplitOp> PendingSliceSplits;
- TVector<TSliceSplitResult> SliceSplitResults;
- THashMap<TLogoBlobID, TGlobalPart> AllParts; // does not include nursery
- ui64 AllBackingSize = 0;
-
- TVector<TIntrusiveConstPtr<TColdPart>> ColdParts;
-
- ui64 MemCompactionId = 0;
- bool MemCompactionForced = false;
-
- TDeque<TNurseryItem> Nursery;
- ui64 NurseryDataSize = 0;
- size_t NurseryTaken = 0;
-
- TCompactionTask ForcedCompactionTask;
- bool ForcedCompactionPending = false;
-
- bool RequestChangesPending = false;
- };
-
-}
-}
-}
diff --git a/ydb/core/tablet_flat/flat_dbase_scheme.h b/ydb/core/tablet_flat/flat_dbase_scheme.h
index 521ca85803c..f821b0f998a 100644
--- a/ydb/core/tablet_flat/flat_dbase_scheme.h
+++ b/ydb/core/tablet_flat/flat_dbase_scheme.h
@@ -180,8 +180,8 @@ public:
if (auto *table = GetTableInfo(id)) {
auto strategy = table->CompactionPolicy->CompactionStrategy;
if (strategy != NKikimrSchemeOp::CompactionStrategyUnset) {
- if (table->ColdBorrow && strategy == NKikimrSchemeOp::CompactionStrategySharded) {
- // Sharded strategy does not support cold borrow
+ if (strategy == NKikimrSchemeOp::CompactionStrategySharded) {
+ // Sharded strategy doesn't exist anymore
// Use the safe generational strategy instead
strategy = NKikimrSchemeOp::CompactionStrategyGenerational;
}
diff --git a/ydb/core/tablet_flat/flat_executor_compaction_logic.cpp b/ydb/core/tablet_flat/flat_executor_compaction_logic.cpp
index 407d460d1b3..fa5c0f211bc 100644
--- a/ydb/core/tablet_flat/flat_executor_compaction_logic.cpp
+++ b/ydb/core/tablet_flat/flat_executor_compaction_logic.cpp
@@ -331,15 +331,13 @@ THolder<NTable::ICompactionStrategy> TCompactionLogic::CreateStrategy(
ui32 tableId,
NKikimrSchemeOp::ECompactionStrategy strategy)
{
+ Y_UNUSED(Logger);
+
switch (strategy) {
case NKikimrSchemeOp::CompactionStrategyGenerational:
return NTable::CreateGenCompactionStrategy(
tableId, Backend, Broker, Time, TaskNameSuffix);
- case NKikimrSchemeOp::CompactionStrategySharded:
- return NTable::CreateShardedCompactionStrategy(
- tableId, Backend, Broker, Logger, Time, TaskNameSuffix);
-
default:
Y_FAIL("Unsupported strategy %s", NKikimrSchemeOp::ECompactionStrategy_Name(strategy).c_str());
}
diff --git a/ydb/core/tablet_flat/flat_executor_ut.cpp b/ydb/core/tablet_flat/flat_executor_ut.cpp
index 8b5d8b97e0c..b009c258d38 100644
--- a/ydb/core/tablet_flat/flat_executor_ut.cpp
+++ b/ydb/core/tablet_flat/flat_executor_ut.cpp
@@ -1809,116 +1809,6 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorSliceOverlapScan) {
}
-Y_UNIT_TEST_SUITE(TFlatTableExecutorShardedCompaction) {
-
- Y_UNIT_TEST(TestAutoSplit) {
- TMyEnvBase env;
- TRowsModel rows;
-
- //env->SetLogPriority(NKikimrServices::RESOURCE_BROKER, NActors::NLog::PRI_DEBUG);
- //env->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG);
-
- env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp));
-
- TIntrusivePtr<TCompactionPolicy> policy = new TCompactionPolicy;
- policy->InMemForceSizeToSnapshot = 1024 * 1024;
- policy->CompactionStrategy = NKikimrSchemeOp::CompactionStrategySharded;
- policy->ShardPolicy.SetMinSliceSize(0);
- policy->ShardPolicy.SetMinShardSize(512 * 1024);
- policy->ShardPolicy.SetMaxShardSize(5 * 1024 * 1024 / 2);
- policy->ShardPolicy.SetNewDataPercentToCompact(99);
-
- env.SendSync(rows.MakeScheme(std::move(policy)));
-
- // Insert ~32MB of data, this will generate a 32MB slice, which
- // will split into ~2MB pieces over multiple rounds. There should be
- // at least 16 shards after this compaction settles.
- env.SendSync(rows.RowTo(1000000).MakeRows(32768, 1024, 32768));
- env.WaitFor<NFake::TEvCompacted>();
-
- // Insert 32MB more data, slices will lay on top and cause data to be
- // resharded and then recompacted in parallel. Since it briefly has
- // twice as much data, there should be 32 shards compacted in parallel.
- env.SendSync(rows.RowTo(1000000).MakeRows(32768, 1024, 32768));
- env.WaitFor<NFake::TEvCompacted>(/* memtable */ 1 + /* shards */ 32);
-
- // If we didn't crash, then assume the test succeeded
- env.SendSync(new TEvents::TEvPoison, false, true);
- }
-
- Y_UNIT_TEST(TestAutoMerge) {
- TMyEnvBase env;
- TRowsModel rows;
-
- //env->SetLogPriority(NKikimrServices::RESOURCE_BROKER, NActors::NLog::PRI_DEBUG);
- //env->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG);
-
- env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp));
-
- TIntrusivePtr<TCompactionPolicy> policy = new TCompactionPolicy;
- policy->InMemForceSizeToSnapshot = 1024;
- policy->CompactionStrategy = NKikimrSchemeOp::CompactionStrategySharded;
- policy->ShardPolicy.SetMinSliceSize(0);
- policy->ShardPolicy.SetMinShardSize(512 * 1024);
- policy->ShardPolicy.SetMaxShardSize(5 * 1024 * 1024 / 2);
- policy->ShardPolicy.SetNewRowsPercentToCompact(99);
-
- env.SendSync(rows.MakeScheme(std::move(policy)));
-
- // Insert ~32MB of data, this will generate a 32MB slice, which
- // will split into ~2MB pieces over multiple rounds. There should be
- // at least 16 shards after this compaction settles.
- env.SendSync(rows.RowTo(1000000).MakeRows(32768, 1024, 32768));
- env.WaitFor<NFake::TEvCompacted>();
-
- // Erase all previously inserted rows, this will force every shard to
- // be compacted, after which all of them will merge back into a single
- // shard.
- env.SendSync(rows.RowTo(1000000).MakeErase(32768, 32768));
- env.WaitFor<NFake::TEvCompacted>(/* memtable */ 1 + /* shards */ 16);
-
- // If we didn't crash, then assume the test succeeded
- env.SendSync(new TEvents::TEvPoison, false, true);
- }
-
- Y_UNIT_TEST(TestSnapshotEraseMarkers) {
- TMyEnvBase env;
- TRowsModel rows;
-
- //env->SetLogPriority(NKikimrServices::RESOURCE_BROKER, NActors::NLog::PRI_DEBUG);
- //env->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG);
-
- env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp));
-
- TIntrusivePtr<TCompactionPolicy> policy = new TCompactionPolicy;
- policy->InMemForceSizeToSnapshot = 1024 * 1024;
- policy->CompactionStrategy = NKikimrSchemeOp::CompactionStrategySharded;
- policy->ShardPolicy.SetMinSliceSize(0);
- policy->ShardPolicy.SetMinShardSize(512 * 1024);
- policy->ShardPolicy.SetMaxShardSize(5 * 1024 * 1024 / 2);
- policy->ShardPolicy.SetNewRowsPercentToCompact(99);
-
- env.SendSync(rows.MakeScheme(std::move(policy)));
-
- // Insert some rows
- env.SendSync(rows.RowTo(1000000).MakeRows(16, 32));
- {
- auto snapshot = TSnapshotModel::Create();
- env.SendSync(snapshot->Start());
- UNIT_ASSERT_VALUES_EQUAL(snapshot->Result().Flatten.size(), 1u);
- }
-
- // Erase some rows
- env.SendSync(rows.RowTo(1000000).MakeErase(16));
- {
- auto snapshot = TSnapshotModel::Create();
- env.SendSync(snapshot->Start());
- UNIT_ASSERT_VALUES_EQUAL(snapshot->Result().Flatten.size(), 2u);
- }
- }
-
-}
-
Y_UNIT_TEST_SUITE(TFlatTableExecutorColumnGroups) {
struct TTxSelectRows : public ITransaction {
diff --git a/ydb/core/tablet_flat/flat_part_charge.h b/ydb/core/tablet_flat/flat_part_charge.h
index 4ed618db6e0..ddc743f9c77 100644
--- a/ydb/core/tablet_flat/flat_part_charge.h
+++ b/ydb/core/tablet_flat/flat_part_charge.h
@@ -168,65 +168,6 @@ namespace NTable {
}
/**
- * Precharges data around the specified splitKey
- *
- * This method will ensure pages with the first key >= splitKey and the
- * last key < splitKey are precharged. This method will not try to
- * load pages outside of [beginRowId, endRowId) range.
- */
- bool SplitKey(const TCells splitKey, const TKeyCellDefaults& keyDefaults,
- const TRowId beginRowId, const TRowId endRowId) const noexcept
- {
- Y_VERIFY_DEBUG(beginRowId < endRowId, "Unexpected empty row range");
- Y_VERIFY_DEBUG(!Groups, "Unexpected column groups during SplitKey precharge");
-
- auto index = Index.TryLoadRaw();
- if (!index) {
- return false;
- }
-
- bool ready = true;
-
- // The first page that may contain splitKey
- auto found = index->LookupKey(splitKey, Scheme.Groups[0], ESeek::Lower, &keyDefaults);
-
- // Note: as we may have cut index key we may both need prev and next pages
-
- if (auto prev = found; prev.Off() && --prev) {
- TRowId pageBegin = prev->GetRowId();
- TRowId pageEnd = found ? found->GetRowId() : index->GetEndRowId();
- if (pageBegin < endRowId && beginRowId < pageEnd) {
- ready &= bool(Env->TryGetPage(Part, prev->GetPageId()));
- }
- }
-
- if (found && found->GetRowId() < endRowId) {
- bool needNext = true;
- if (found->GetRowId() < beginRowId) {
- // iterator may re-seek to the first page that's in range
- auto adjusted = index->LookupRow(beginRowId, found);
- if (found != adjusted) {
- found = adjusted;
- needNext = false;
- }
- }
- if (found) {
- ready &= bool(Env->TryGetPage(Part, found->GetPageId()));
- }
- if (needNext) {
- // splitKey may be on the next page
- if (auto next = found; ++next) {
- if (next->GetRowId() < endRowId) {
- ready &= bool(Env->TryGetPage(Part, next->GetPageId()));
- }
- }
- }
- }
-
- return ready;
- }
-
- /**
* Precharges data for rows between row1 and row2 inclusive
*
* Important caveat: assumes iteration won't touch any row > row2
diff --git a/ydb/core/tablet_flat/flat_stat_part.cpp b/ydb/core/tablet_flat/flat_stat_part.cpp
deleted file mode 100644
index 5fff09a577e..00000000000
--- a/ydb/core/tablet_flat/flat_stat_part.cpp
+++ /dev/null
@@ -1,98 +0,0 @@
-#include "flat_stat_part.h"
-
-namespace NKikimr {
-namespace NTable {
-
-ui64 TPartSmallSizeHelper::CalcSize(TRowId begin, TRowId end)
-{
- if (!Small || end <= begin) {
- return 0;
- }
- if (EndRow <= begin) {
- // Resynchronize so we start from an already known end
- BeginRow = EndRow;
- BeginPage = EndPage;
- Size = 0;
- }
- if (BeginRow != begin) {
- if (begin > BeginRow) {
- // Move starting position forward
- BeginPage = Small->Lower(begin, BeginPage, Max<ui32>());
- } else {
- // Move starting position backwards (shouldn't happen normally)
- BeginPage = Small->Lower(begin, 0, BeginPage);
- }
- BeginRow = begin;
- EndRow = begin;
- EndPage = BeginPage;
- Size = 0;
- } else if (end < EndRow) {
- // We seem to have some extra rows, dial back
- EndRow = BeginRow;
- EndPage = BeginPage;
- Size = 0;
- }
- Y_VERIFY(EndRow <= end);
- if (EndRow < end) {
- while (auto& rel = Small->Relation(EndPage)) {
- if (rel.Row < end) {
- Size += rel.Size;
- ++EndPage;
- } else {
- Y_VERIFY(rel.IsHead(), "Got unaligned NPage::TFrames head record");
- break;
- }
- }
- EndRow = end;
- }
- return Size;
-}
-
-ui64 TPartGroupSizeHelper::CalcSize(TRowId begin, TRowId end)
-{
- if (end <= begin) {
- return 0;
- }
- if (EndRow <= begin) {
- // Start searching from an already known end
- BeginRow = EndRow = begin;
- Begin = End = Index.LookupRow(BeginRow, End);
- Size = 0;
- Y_VERIFY(Begin, "Unexpected failure to find an index record");
- } else if (BeginRow != begin) {
- // Start searching from a previous start
- BeginRow = EndRow = begin;
- Begin = End = Index.LookupRow(BeginRow, Begin);
- Size = 0;
- Y_VERIFY(Begin, "Unexpected failure to find an index record");
- } else if (EndRow > end) {
- // We seem to have some extra rows, dial back
- EndRow = BeginRow;
- End = Begin;
- Size = 0;
- }
- Y_VERIFY(EndRow <= end);
- if (EndRow < end) {
- while (End && End->GetRowId() < end) {
- Size += Part->GetPageSize(End->GetPageId(), GroupId);
- ++End;
- }
- EndRow = end;
- }
- return Size;
-}
-
-ui64 TPartDataSizeHelper::CalcSize(TRowId begin, TRowId end)
-{
- if (begin >= PartEndRowId || end <= begin) {
- return 0;
- }
- ui64 size = SmallHelper.CalcSize(begin, end);
- for (auto& g : GroupHelpers) {
- size += g.CalcSize(begin, end);
- }
- return size;
-}
-
-}
-}
diff --git a/ydb/core/tablet_flat/flat_stat_part.h b/ydb/core/tablet_flat/flat_stat_part.h
index 212101611f2..b0e036768b6 100644
--- a/ydb/core/tablet_flat/flat_stat_part.h
+++ b/ydb/core/tablet_flat/flat_stat_part.h
@@ -30,87 +30,6 @@ struct TPartDataStats {
TPartDataSize DataSize = { };
};
-/**
- * Helper for calculating small blobs size between a pair of rows
- */
-class TPartSmallSizeHelper {
-public:
- TPartSmallSizeHelper(const TPart* part)
- : Small(part->Small.Get())
- {
- }
-
- /**
- * Returns size of small blobs in bytes between begin and end (not inclusive)
- */
- ui64 CalcSize(TRowId begin, TRowId end);
-
-private:
- const NPage::TFrames* Small;
- TRowId BeginRow = 0;
- TRowId EndRow = 0;
- ui32 BeginPage = 0;
- ui32 EndPage = 0;
- ui64 Size = 0;
-};
-
-/**
- * Helper for calculating column group size between a pair of rows
- */
-class TPartGroupSizeHelper {
-public:
- TPartGroupSizeHelper(const TPart* part, NPage::TGroupId groupId)
- : Part(part)
- , GroupId(groupId)
- , Index(Part->GetGroupIndex(groupId))
- , Begin(Index->Begin())
- , End(Begin)
- {
- Y_VERIFY(Begin, "Cannot find the first index page");
- }
-
- /**
- * Returns size of group in bytes between begin and end (not inclusive)
- */
- ui64 CalcSize(TRowId begin, TRowId end);
-
-private:
- const TPart* Part;
- const NPage::TGroupId GroupId;
- const NPage::TIndex& Index;
- NPage::TIndex::TIter Begin;
- NPage::TIndex::TIter End;
- TRowId BeginRow = 0;
- TRowId EndRow = 0;
- ui64 Size = 0;
-};
-
-/**
- * Helper for calculating upper bound size of part between a pair of rows
- */
-class TPartDataSizeHelper {
-public:
- TPartDataSizeHelper(const TPart* part)
- : PartEndRowId(part->Index.GetEndRowId())
- , SmallHelper(part)
- {
- GroupHelpers.reserve(part->GroupsCount);
- for (ui32 group : xrange(part->GroupsCount)) {
- GroupHelpers.emplace_back(part, NPage::TGroupId(group));
- }
- }
-
- /**
- * Returns size of part in bytes between begin and end (not inclusive)
- */
- ui64 CalcSize(TRowId begin, TRowId end);
-
-private:
- const TRowId PartEndRowId;
- TPartSmallSizeHelper SmallHelper;
- TSmallVec<TPartGroupSizeHelper> GroupHelpers;
-};
-
// Iterates over part index and calculates total row count and data size
// NOTE: we don't know row count for the last page so we also ignore its size
// This shouldn't be a problem for big parts with many pages
diff --git a/ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt
index 9b99d9f774b..98947f97796 100644
--- a/ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tablet_flat/ut/CMakeLists.darwin-x86_64.txt
@@ -58,7 +58,6 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_sausage.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_stat.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_comp_gen.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_comp_shard.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_compaction.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_compaction_multi.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_charge.cpp
diff --git a/ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt b/ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt
index 4b6443427c2..42bf30b3bad 100644
--- a/ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tablet_flat/ut/CMakeLists.linux-aarch64.txt
@@ -61,7 +61,6 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_sausage.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_stat.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_comp_gen.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_comp_shard.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_compaction.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_compaction_multi.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_charge.cpp
diff --git a/ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt b/ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt
index 4323c2f2a36..af845e1051f 100644
--- a/ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tablet_flat/ut/CMakeLists.linux-x86_64.txt
@@ -62,7 +62,6 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_sausage.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_stat.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_comp_gen.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_comp_shard.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_compaction.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_compaction_multi.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_charge.cpp
diff --git a/ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt b/ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt
index 3252f6dd46f..ff33e86aa7a 100644
--- a/ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tablet_flat/ut/CMakeLists.windows-x86_64.txt
@@ -51,7 +51,6 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_sausage.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_stat.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_comp_gen.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_comp_shard.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_compaction.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_compaction_multi.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_charge.cpp
diff --git a/ydb/core/tablet_flat/ut/ut_comp_shard.cpp b/ydb/core/tablet_flat/ut/ut_comp_shard.cpp
deleted file mode 100644
index aa69db2495e..00000000000
--- a/ydb/core/tablet_flat/ut/ut_comp_shard.cpp
+++ /dev/null
@@ -1,1556 +0,0 @@
-#include "flat_comp_ut_common.h"
-
-#include <ydb/core/tablet_flat/flat_comp_shard.h>
-
-#include <ydb/core/tablet_flat/test/libs/table/test_envs.h>
-#include <ydb/core/tablet_flat/test/libs/rows/cook.h>
-#include <ydb/core/tablet_flat/test/libs/rows/layout.h>
-#include <ydb/core/tablet_flat/test/libs/rows/heap.h>
-#include <ydb/core/tablet_flat/test/libs/rows/rows.h>
-#include <ydb/core/tablet_flat/test/libs/rows/tool.h>
-#include <ydb/core/tablet_flat/test/libs/table/wrap_part.h>
-#include <ydb/core/tablet_flat/test/libs/table/test_comp.h>
-#include <ydb/core/tablet_flat/test/libs/table/test_writer.h>
-
-#include <ydb/core/tablet_flat/protos/flat_table_shard.pb.h>
-#include <ydb/core/tablet_flat/flat_cxx_database.h>
-#include <ydb/core/util/pb.h>
-
-#include <library/cpp/testing/unittest/registar.h>
-
-#include <util/generic/hash.h>
-#include <util/generic/hash_set.h>
-
-#include <optional>
-
-namespace NKikimr {
-namespace NTable {
-namespace NCompShard {
-
-using namespace NTest;
-
-namespace {
-
- /**
- * A special kind of TTestEnv that will fail loading every page until
- * Load() call, and then would only pass previously requested pages
- */
- class TStrictEnv : public TTestEnv {
- public:
- const TSharedData *TryGetPage(const TPart *part, TPageId ref, TGroupId groupId) override {
- ui64 token = ref | (ui64(groupId.Raw()) << 32);
- auto& info = Parts[part];
-
- info.Touched.insert(token);
-
- if (info.Loaded.contains(token)) {
- return TTestEnv::TryGetPage(part, ref, groupId);
- }
-
- return nullptr;
- }
-
- void Load() {
- for (auto &p: Parts) {
- p.second.Loaded = std::move(p.second.Touched);
- }
- }
-
- private:
- struct TPartInfo {
- THashSet<ui64> Touched;
- THashSet<ui64> Loaded;
- };
-
- private:
- THashMap<const TPart*, TPartInfo> Parts;
- };
-
- struct TSimpleConsumer : public ISliceSplitResultConsumer {
- std::optional<TSliceSplitResult> Result;
-
- void OnSliceSplitResult(TSliceSplitResult result) override {
- Y_VERIFY(!Result, "Cannot store multiple results");
- Result.emplace(std::move(result));
- }
- };
-
- NPage::TConf CreateConf(ui32 rowsPerPage) {
- NPage::TConf conf(false, 8192);
- conf.Group(0).PageRows = rowsPerPage;
- return conf;
- }
-
- NPage::TConf CreateConf(ui32 rowsPerPage, NPage::IKeySpace* underlayMask) {
- NPage::TConf conf = CreateConf(rowsPerPage);
- conf.UnderlayMask = underlayMask;
- return conf;
- }
-
- NPage::TConf CreateConf(ui32 rowsPerPage, NPage::ISplitKeys* splitKeys) {
- NPage::TConf conf = CreateConf(rowsPerPage);
- conf.SplitKeys = splitKeys;
- return conf;
- }
-
- TPartView CreatePart(const TLayoutCook& lay, const TRowsHeap& rows, ui32 rowsPerPage) {
- return TPartCook(lay, CreateConf(rowsPerPage))
- .Add(rows.begin(), rows.end())
- .Finish()
- .ToPartView();
- }
-
- struct TSizeChange {
- ui64 AddLeftSize = 0;
- ui64 AddLeftRows = 0;
- ui64 SubRightSize = 0;
- ui64 SubRightRows = 0;
-
- TSizeChange& AddLeft(ui64 leftSize, ui64 leftRows) {
- AddLeftSize += leftSize;
- AddLeftRows += leftRows;
- return *this;
- }
-
- TSizeChange& SubRight(ui64 rightSize, ui64 rightRows) {
- SubRightSize += rightSize;
- SubRightRows += rightRows;
- return *this;
- }
-
- TSizeChange& Expect() {
- return *this;
- }
- };
-
- using TSizeChanges = TMap<ui64, TSizeChange>;
-
- struct TSizeStat {
- ui64 Key;
- ui64 LeftSize;
- ui64 LeftRows;
- ui64 RightSize;
- ui64 RightRows;
-
- bool operator==(const TSizeStat& o) const {
- return (
- Key == o.Key &&
- LeftSize == o.LeftSize &&
- LeftRows == o.LeftRows &&
- RightSize == o.RightSize &&
- RightRows == o.RightRows);
- }
-
- friend IOutputStream& operator<<(IOutputStream& out, const TSizeStat& v) {
- out << "key=" << v.Key
- << " left=" << v.LeftSize << "b/" << v.LeftRows << "r"
- << " right=" << v.RightSize << "b/" << v.RightRows << "r";
- return out;
- }
-
- friend IOutputStream& operator<<(IOutputStream& out, const TVector<TSizeStat>& v) {
- out << '{';
- bool first = true;
- for (auto& value : v) {
- if (first) {
- first = false;
- } else {
- out << ',';
- }
- out << ' ';
- out << value;
- }
- out << " }";
- return out;
- }
- };
-
- void VerifySizeChanges(TSplitStatIterator& it, const TSizeChanges& sizeChanges) {
- TVector<TSizeStat> expected;
- {
- ui64 leftSize = 0;
- ui64 leftRows = 0;
- ui64 rightSize = it.RightSize();
- ui64 rightRows = it.RightRows();
- for (auto& kv : sizeChanges) {
- leftSize += kv.second.AddLeftSize;
- leftRows += kv.second.AddLeftRows;
- rightSize -= kv.second.SubRightSize;
- rightRows -= kv.second.SubRightRows;
- expected.push_back({ kv.first, leftSize, leftRows, rightSize, rightRows });
- }
- }
-
- TVector<TSizeStat> actual;
- while (it.Next()) {
- auto keyCells = it.CurrentKey();
- UNIT_ASSERT_VALUES_EQUAL(keyCells.size(), 1u);
- auto key = keyCells[0].AsValue<ui64>();
- actual.push_back({ key, it.LeftSize(), it.LeftRows(), it.RightSize(), it.RightRows() });
- }
-
- auto getFirstDiffKey = [&]() -> ui64 {
- auto a = actual.begin();
- auto b = expected.begin();
- while (a != actual.end() && b != expected.end()) {
- if (!(*a == *b)) {
- return Min(a->Key, b->Key);
- }
- ++a;
- ++b;
- }
- if (a != actual.end()) {
- return a->Key;
- }
- if (b != expected.end()) {
- return b->Key;
- }
- return Max<ui64>();
- };
-
- UNIT_ASSERT_C(actual == expected,
- "Diff at " << getFirstDiffKey()
- << ": got " << actual
- << ", expected " << expected);
- }
-
-}
-
-Y_UNIT_TEST_SUITE(TShardedCompaction) {
-
- Y_UNIT_TEST(SplitSingleKey) {
- TLayoutCook lay;
- lay
- .Col(0, 0, NScheme::NTypeIds::Uint64)
- .Col(0, 1, NScheme::NTypeIds::Uint32)
- .Col(0, 8, NScheme::NTypeIds::Uint32)
- .Key({ 0, 1 });
-
- TRowTool tool(*lay);
- TRowsHeap rows(64 * 1024);
- for (ui64 seq = 0; seq < 4*4; ++seq) {
- rows.Put(*TSchemedCookRow(*lay).Col(seq, 500_u32, 42_u32));
- }
-
- auto partView = CreatePart(lay, rows, 4);
-
- for (size_t beginRow = 0; beginRow < rows.Size(); ++beginRow) {
- for (size_t endRow = beginRow + 1; endRow <= rows.Size(); ++endRow) {
- for (ui64 flags = 0; flags < 4; ++flags) {
- TSlice slice;
- slice.FirstInclusive = (flags & 1) == 1;
- slice.LastInclusive = (flags & 2) == 2;
- if (!slice.FirstInclusive && beginRow == 0) {
- continue;
- }
- if (!slice.LastInclusive && endRow == rows.Size()) {
- continue;
- }
- if (!slice.FirstInclusive && !slice.LastInclusive && endRow - beginRow <= 1) {
- continue;
- }
- slice.FirstRowId = beginRow - !slice.FirstInclusive;
- slice.LastRowId = endRow - slice.LastInclusive;
- slice.FirstKey = TSerializedCellVec(tool.KeyCells(rows[slice.FirstRowId]));
- slice.LastKey = TSerializedCellVec(tool.KeyCells(rows[slice.LastRowId]));
-
- for (ui64 splitRow = slice.FirstRowId; splitRow <= slice.LastRowId; ++splitRow) {
- for (ui64 splitFlags = 0; splitFlags < 2; ++splitFlags) {
- bool moveLeft = (splitFlags & 1) == 1;
- if (splitRow == slice.FirstRowId && (moveLeft || !slice.FirstInclusive)) {
- continue;
- }
-
- auto splitCells = tool.KeyCells(rows[splitRow]);
- if (moveLeft) {
- splitCells[1] = Cimple(0_u32);
- }
-
- TTableInfo table;
- table.RowScheme = lay.RowScheme();
- table.SplitKeys[1] = TSerializedCellVec(splitCells);
- TTableShard left;
- TTableShard right;
- left.RightKey = 1;
- right.LeftKey = 1;
- TVector<TTableShard*> pshards;
- pshards.push_back(&left);
- pshards.push_back(&right);
-
- TStrictEnv env;
- TSimpleConsumer consumer;
- TSliceSplitOp op(&consumer, &table, pshards, partView.Part, slice);
-
- // load index
- bool ok1 = op.Execute(&env);
- UNIT_ASSERT_VALUES_EQUAL(ok1, false);
- env.Load();
-
- // load data
- bool ok2 = op.Execute(&env);
- UNIT_ASSERT_VALUES_EQUAL(ok2, false);
- env.Load();
-
- bool ok3 = op.Execute(&env);
- UNIT_ASSERT_VALUES_EQUAL(ok3, true);
-
- auto& result = consumer.Result.value();
- size_t pos = 0;
- if (beginRow < splitRow) {
- TSlice output = result.NewSlices.at(pos++);
- UNIT_ASSERT_VALUES_EQUAL(output.BeginRowId(), beginRow);
- UNIT_ASSERT_VALUES_EQUAL(output.EndRowId(), splitRow);
- UNIT_ASSERT(output.LastInclusive);
- if (output.Rows() == 1) {
- UNIT_ASSERT(output.FirstInclusive);
- }
- }
- if (splitRow < endRow) {
- TSlice output = result.NewSlices.at(pos++);
- UNIT_ASSERT_VALUES_EQUAL(output.BeginRowId(), splitRow);
- UNIT_ASSERT_VALUES_EQUAL(output.EndRowId(), endRow);
- UNIT_ASSERT(output.FirstInclusive);
- if (output.Rows() == 1) {
- UNIT_ASSERT(output.LastInclusive);
- }
- }
- UNIT_ASSERT_VALUES_EQUAL(result.NewSlices.size(), pos);
- }
- }
- }
- }
- }
- }
-
- Y_UNIT_TEST(SplitMiddleEdgeCase) {
- TLayoutCook lay;
- lay
- .Col(0, 0, NScheme::NTypeIds::Uint64)
- .Col(0, 1, NScheme::NTypeIds::Uint32)
- .Col(0, 8, NScheme::NTypeIds::Uint32)
- .Key({ 0, 1 });
-
- TRowTool tool(*lay);
- TRowsHeap rows(64 * 1024);
- for (ui64 seq = 0; seq < 4*4+2; ++seq) {
- rows.Put(*TSchemedCookRow(*lay).Col(seq, 500_u32, 42_u32));
- }
-
- auto partView = CreatePart(lay, rows, 4);
-
- TSlice slice;
- slice.FirstRowId = 0;
- slice.FirstInclusive = true;
- slice.LastRowId = rows.Size() - 1;
- slice.LastInclusive = true;
- slice.FirstKey = TSerializedCellVec(tool.KeyCells(rows[slice.FirstRowId]));
- slice.LastKey = TSerializedCellVec(tool.KeyCells(rows[slice.LastRowId]));
-
- TRowId splitRow = rows.Size() / 2;
-
- TTableInfo table;
- table.RowScheme = lay.RowScheme();
- TIntrusiveListWithAutoDelete<TTableShard, TDelete> shards;
- shards.PushBack(new TTableShard);
- for (ui32 k = 1; k <= 5; ++k) {
- auto cells = tool.KeyCells(rows[splitRow]);
- cells[1] = Cimple(k);
- table.SplitKeys[k] = TSerializedCellVec(cells);
- auto* left = shards.Back();
- shards.PushBack(new TTableShard);
- auto* right = shards.Back();
- left->RightKey = k;
- right->LeftKey = k;
- }
- TVector<TTableShard*> pshards;
- for (auto& shard : shards) {
- pshards.push_back(&shard);
- }
-
- TStrictEnv env;
- TSimpleConsumer consumer;
- TSliceSplitOp op(&consumer, &table, pshards, partView.Part, slice);
-
- // load index
- bool ok1 = op.Execute(&env);
- UNIT_ASSERT_VALUES_EQUAL(ok1, false);
- env.Load();
-
- // load data
- bool ok2 = op.Execute(&env);
- UNIT_ASSERT_VALUES_EQUAL(ok2, false);
- env.Load();
-
- bool ok3 = op.Execute(&env);
- UNIT_ASSERT_VALUES_EQUAL(ok3, true);
-
- auto& result = consumer.Result.value();
- UNIT_ASSERT_VALUES_EQUAL(result.NewSlices.size(), 2u);
- UNIT_ASSERT_VALUES_EQUAL(result.NewSlices[0].FirstRowId, 0u);
- UNIT_ASSERT_VALUES_EQUAL(result.NewSlices[0].LastRowId, splitRow-1);
- UNIT_ASSERT_VALUES_EQUAL(result.NewSlices[1].FirstRowId, splitRow);
- UNIT_ASSERT_VALUES_EQUAL(result.NewSlices[1].LastRowId, rows.Size()-1);
- }
-
- Y_UNIT_TEST(SplitOutOfBoundsKeys) {
- TLayoutCook lay;
- lay
- .Col(0, 0, NScheme::NTypeIds::Uint64)
- .Col(0, 8, NScheme::NTypeIds::Uint32)
- .Key({ 0 });
-
- TRowTool tool(*lay);
- TRowsHeap rows(64 * 1024);
- for (ui64 seq = 0; seq < 10*4+3; ++seq) {
- rows.Put(*TSchemedCookRow(*lay).Col(1000 + seq, 42_u32));
- }
-
- auto partView = CreatePart(lay, rows, 4);
-
- TSlice slice;
- slice.FirstRowId = 3; // the first 4 rows are not included
- slice.FirstInclusive = false;
- slice.LastRowId = rows.Size() - 3; // the last 3 rows are not included
- slice.LastInclusive = false;
- slice.FirstKey = TSerializedCellVec(tool.KeyCells(rows[slice.FirstRowId]));
- slice.LastKey = TSerializedCellVec(tool.KeyCells(rows[slice.LastRowId]));
-
- TRowId splitRow = rows.Size() / 2;
- TVector<TSerializedCellVec> splitKeys;
- splitKeys.emplace_back(tool.KeyCells(*TSchemedCookRow(*lay).Col(500_u64, 42_u32)));
- splitKeys.emplace_back(tool.KeyCells(rows[splitRow]));
- splitKeys.emplace_back(tool.KeyCells(*TSchemedCookRow(*lay).Col(5000_u64, 42_u32)));
-
- TTableInfo table;
- table.RowScheme = lay.RowScheme();
- TIntrusiveListWithAutoDelete<TTableShard, TDelete> shards;
- shards.PushBack(new TTableShard);
- for (size_t k = 0; k < splitKeys.size(); ++k) {
- table.SplitKeys[k+1] = splitKeys[k];
- auto* left = shards.Back();
- shards.PushBack(new TTableShard);
- auto* right = shards.Back();
- left->RightKey = k + 1;
- right->LeftKey = k + 1;
- }
-
- TVector<TTableShard*> pshards;
- for (auto& shard : shards) {
- pshards.push_back(&shard);
- }
-
- TStrictEnv env;
- TSimpleConsumer consumer;
- TSliceSplitOp op(&consumer, &table, pshards, partView.Part, slice);
-
- // load index
- bool ok1 = op.Execute(&env);
- UNIT_ASSERT_VALUES_EQUAL(ok1, false);
- env.Load();
-
- // load data
- bool ok2 = op.Execute(&env);
- UNIT_ASSERT_VALUES_EQUAL(ok2, false);
- env.Load();
-
- bool ok3 = op.Execute(&env);
- UNIT_ASSERT_VALUES_EQUAL(ok3, true);
-
- auto& result = consumer.Result.value();
- UNIT_ASSERT_VALUES_EQUAL(result.NewSlices.size(), 2u);
-
- // The first row inclusion is currently a side effect
- UNIT_ASSERT_VALUES_EQUAL(result.NewSlices[0].FirstRowId, 4u);
- UNIT_ASSERT_VALUES_EQUAL(result.NewSlices[0].FirstInclusive, true);
- UNIT_ASSERT_VALUES_EQUAL(result.NewSlices[0].LastRowId, splitRow - 1);
- UNIT_ASSERT_VALUES_EQUAL(result.NewSlices[0].LastInclusive, true);
-
- // The last row is not included just like in the original
- UNIT_ASSERT_VALUES_EQUAL(result.NewSlices[1].FirstRowId, splitRow);
- UNIT_ASSERT_VALUES_EQUAL(result.NewSlices[1].FirstInclusive, true);
- UNIT_ASSERT_VALUES_EQUAL(result.NewSlices[1].LastRowId, rows.Size() - 3);
- UNIT_ASSERT_VALUES_EQUAL(result.NewSlices[1].LastInclusive, false);
- }
-
- Y_UNIT_TEST(SplitStatSimple) {
- TLayoutCook lay;
- lay
- .Col(0, 0, NScheme::NTypeIds::Uint64)
- .Col(0, 8, NScheme::NTypeIds::Uint32)
- .Key({ 0 });
-
- const ui64 pageCount = 16;
- const ui64 rowsPerPage = 4;
-
- TRowTool tool(*lay);
- TRowsHeap rows(64 * 1024);
- for (ui64 seq = 0; seq < rowsPerPage * pageCount; ++seq) {
- rows.Put(*TSchemedCookRow(*lay).Col(1000 + seq, 42_u32));
- }
-
- auto partView = CreatePart(lay, rows, rowsPerPage);
- UNIT_ASSERT_VALUES_EQUAL(partView.Slices->size(), 1u);
-
- const auto* part = partView.Part.Get();
- const auto slice = partView.Slices->front();
-
- // All pages are expected to have the same size
- auto pageSize = part->GetPageSize(part->Index->Begin()->GetPageId());
-
- for (int count = 1; count <= 3; ++count) {
- TSplitStatIterator it(*(*lay).Keys);
-
- // It shouldn't matter that we're adding the same slice
- // What matters is they all produce the exact same key
- for (int i = 0; i < count; ++i) {
- it.AddSlice(part, slice, pageSize * pageCount);
- }
-
- ui64 keyIndex = 0;
- ui64 leftSize = 0;
- ui64 leftRows = 0;
- ui64 rightSize = pageSize * pageCount * count;
- ui64 rightRows = rowsPerPage * pageCount * count;
-
- while (it.Next()) {
- // The first key is expected to be the first slice key
- auto key = it.CurrentKey();
- UNIT_ASSERT_VALUES_EQUAL(key.size(), 1u);
- auto expected = tool.KeyCells(rows[keyIndex]);
- UNIT_ASSERT_VALUES_EQUAL(expected.size(), 1u);
-
- UNIT_ASSERT_VALUES_EQUAL(key[0].AsValue<ui64>(), expected[0].AsValue<ui64>());
- UNIT_ASSERT_VALUES_EQUAL(it.LeftSize(), leftSize);
- UNIT_ASSERT_VALUES_EQUAL(it.RightSize(), rightSize);
- UNIT_ASSERT_VALUES_EQUAL(it.LeftRows(), leftRows);
- UNIT_ASSERT_VALUES_EQUAL(it.RightRows(), rightRows);
-
- keyIndex += rowsPerPage;
- leftSize += pageSize * count;
- leftRows += rowsPerPage * count;
- rightSize -= pageSize * count;
- rightRows -= rowsPerPage * count;
- }
-
- UNIT_ASSERT_VALUES_EQUAL(keyIndex, rowsPerPage * pageCount);
- }
- }
-
- Y_UNIT_TEST(SplitStatComplex) {
- TLayoutCook lay;
- lay
- .Col(0, 0, NScheme::NTypeIds::Uint64)
- .Col(0, 8, NScheme::NTypeIds::Uint32)
- .Key({ 0 });
-
- const ui64 pageCount = 16;
-
- TRowsHeap rows1(64 * 1024);
- for (ui64 seq = 0; seq < 4 * pageCount; ++seq) {
- rows1.Put(*TSchemedCookRow(*lay).Col(1000 + seq, 42_u32));
- }
-
- TRowsHeap rows2(64 * 1024);
- for (ui64 seq = 0; seq < 2 * pageCount; ++seq) {
- rows2.Put(*TSchemedCookRow(*lay).Col(1002 + seq * 2, 42_u32));
- }
-
- TRowsHeap rows3(64 * 1024);
- for (ui64 seq = 0; seq < 1 * pageCount; ++seq) {
- rows3.Put(*TSchemedCookRow(*lay).Col(1003 + seq * 4, 42_u32));
- }
-
- auto partView1 = CreatePart(lay, rows1, 4);
- auto partView2 = CreatePart(lay, rows2, 2);
- auto partView3 = CreatePart(lay, rows3, 1);
- for (const auto& partView : { partView1, partView2, partView3 }) {
- UNIT_ASSERT_VALUES_EQUAL(partView.Slices->size(), 1u);
- }
-
- auto pageSize1 = partView1.Part->GetPageSize(partView1.Part->Index->Begin()->GetPageId());
- auto pageSize2 = partView2.Part->GetPageSize(partView2.Part->Index->Begin()->GetPageId());
- auto pageSize3 = partView3.Part->GetPageSize(partView3.Part->Index->Begin()->GetPageId());
-
- // The expected keys are:
- // 1000, 1002, 1003, 1004, 1006, 1007, 1008, ...
- TSizeChanges sizeChanges;
-
- for (ui64 page = 0; page < pageCount; ++page) {
- // 1000, 1004, 1008, ...
- sizeChanges[1000 + page * 4]
- .Expect();
-
- // 1002, 1006, 1010, ...
- sizeChanges[1002 + page * 4]
- .AddLeft(pageSize1, 4);
-
- // 1003, 1007, 1011, ...
- sizeChanges[1003 + page * 4]
- .AddLeft(pageSize2, 2);
-
- if (page > 0) {
- // 1004, 1008, ...
- sizeChanges[1000 + page * 4]
- .SubRight(pageSize1, 4)
- .AddLeft(pageSize3, 1)
- .SubRight(pageSize3, 1);
-
- // 1006, 1010, ...
- sizeChanges[1002 + page * 4]
- .SubRight(pageSize2, 2);
- }
- }
-
- TSplitStatIterator it(*(*lay).Keys);
- it.AddSlice(partView1.Part.Get(), partView1.Slices->front(), pageSize1 * pageCount);
- it.AddSlice(partView2.Part.Get(), partView2.Slices->front(), pageSize2 * pageCount);
- it.AddSlice(partView3.Part.Get(), partView3.Slices->front(), pageSize3 * pageCount);
- VerifySizeChanges(it, sizeChanges);
- }
-
- Y_UNIT_TEST(SplitStatNextVersusStartStop) {
- TLayoutCook lay;
- lay
- .Col(0, 0, NScheme::NTypeIds::Uint64)
- .Col(0, 8, NScheme::NTypeIds::Uint32)
- .Key({ 0 });
-
- const ui64 pageCount = 4;
-
- TRowsHeap rows1(64 * 1024);
- for (ui64 seq = 0; seq < 4 * pageCount; ++seq) {
- rows1.Put(*TSchemedCookRow(*lay).Col(1000 + seq, 42_u32));
- }
-
- TRowsHeap rows2(64 * 1024);
- for (ui64 seq = 0; seq < 2 * pageCount; ++seq) {
- rows2.Put(*TSchemedCookRow(*lay).Col(1006 + seq * 2, 42_u32));
- }
-
- auto partView1 = CreatePart(lay, rows1, 4);
- auto partView2 = CreatePart(lay, rows2, 2);
- for (const auto& partView : { partView1, partView2 }) {
- UNIT_ASSERT_VALUES_EQUAL(partView.Slices->size(), 1u);
- }
-
- auto pageSize1 = partView1.Part->GetPageSize(partView1.Part->Index->Begin()->GetPageId());
- auto pageSize2 = partView2.Part->GetPageSize(partView2.Part->Index->Begin()->GetPageId());
-
- // Expected keys are 1000, 1004, 1006, 1008, 1010, 1012, ...
- TSizeChanges sizeChanges;
-
- for (ui64 page = 0; page < pageCount; ++page) {
- // 1000, 1004, 1008, ...
- sizeChanges[1000 + 4 * page].Expect();
-
- // 1006, 1010, 1014, ...
- sizeChanges[1006 + 4 * page].Expect();
-
- if (page > 0) {
- // 1004, 1008, ...
- sizeChanges[1000 + 4 * page].SubRight(pageSize1, 4);
-
- // 1010, 1014, ...
- sizeChanges[1006 + 4 * page].SubRight(pageSize2, 2);
- }
-
- if (page > 1) {
- // 1008, 1012, ...
- sizeChanges[1000 + 4 * page].AddLeft(pageSize2, 2);
- }
-
- if (page < pageCount - 1) {
- // 1006, 1010, ...
- sizeChanges[1006 + 4 * page].AddLeft(pageSize1, 4);
- }
- }
-
- sizeChanges[1004].AddLeft(pageSize1, 4);
- sizeChanges[1006 + (pageCount - 1) * 4].SubRight(pageSize1, 4);
- sizeChanges[1006 + (pageCount - 1) * 4].AddLeft(pageSize2, 2);
-
- TSplitStatIterator it(*(*lay).Keys);
- it.AddSlice(partView1.Part.Get(), partView1.Slices->front(), pageSize1 * pageCount);
- it.AddSlice(partView2.Part.Get(), partView2.Slices->front(), pageSize2 * pageCount);
- VerifySizeChanges(it, sizeChanges);
- }
-
- Y_UNIT_TEST(SplitStatPageInsidePage) {
- TLayoutCook lay;
- lay
- .Col(0, 0, NScheme::NTypeIds::Uint64)
- .Col(0, 8, NScheme::NTypeIds::Uint32)
- .Key({ 0 });
-
- const ui64 pageCount = 3;
-
- TRowsHeap rows1(64 * 1024);
- for (ui64 seq = 0; seq < 4 * pageCount; ++seq) {
- rows1.Put(*TSchemedCookRow(*lay).Col(1000 + seq, 42_u32));
- }
-
- TRowsHeap rows2(64 * 1024);
- rows2.Put(*TSchemedCookRow(*lay).Col(1005_u64, 42_u32));
- rows2.Put(*TSchemedCookRow(*lay).Col(1006_u64, 42_u32));
-
- auto partView1 = CreatePart(lay, rows1, 4);
- auto partView2 = CreatePart(lay, rows2, 4);
- for (const auto& partView : { partView1, partView2 }) {
- UNIT_ASSERT_VALUES_EQUAL(partView.Slices->size(), 1u);
- }
-
- auto pageSize1 = partView1.Part->GetPageSize(partView1.Part->Index->Begin()->GetPageId());
- auto pageSize2 = partView2.Part->GetPageSize(partView2.Part->Index->Begin()->GetPageId());
-
- // Expected keys are 1000, 1004, 1005, 1008
- TSizeChanges sizeChanges;
-
- sizeChanges[1000].Expect();
- sizeChanges[1004]
- .AddLeft(pageSize1, 4)
- .SubRight(pageSize1, 4);
- sizeChanges[1005]
- .AddLeft(pageSize1, 4);
- sizeChanges[1008]
- .AddLeft(pageSize2, 2)
- .SubRight(pageSize2, 2)
- .SubRight(pageSize1, 4);
-
- TSplitStatIterator it(*(*lay).Keys);
- it.AddSlice(partView1.Part.Get(), partView1.Slices->front(), pageSize1 * pageCount);
- it.AddSlice(partView2.Part.Get(), partView2.Slices->front(), pageSize2 * pageCount);
- VerifySizeChanges(it, sizeChanges);
- }
-
- Y_UNIT_TEST(PageReuseTest) {
- TLayoutCook lay;
- lay
- .Col(0, 0, NScheme::NTypeIds::Uint64)
- .Col(0, 8, NScheme::NTypeIds::Uint32)
- .Key({ 0 });
-
- const ui64 pageCount = 16;
- TRowTool tool(*lay);
-
- TRowsHeap rows1(64 * 1024);
- for (ui64 seq = 0; seq < 4 * pageCount; ++seq) {
- rows1.Put(*TSchemedCookRow(*lay).Col(1000 + seq, 42_u32));
- }
-
- TRowsHeap rows2(64 * 1024);
- for (ui64 seq = 0; seq < 5; ++seq) {
- rows2.Put(*TSchemedCookRow(*lay).Col(1009 + seq, 42_u32));
- }
-
- auto partView1 = CreatePart(lay, rows1, 4);
- auto partView2 = CreatePart(lay, rows2, 4);
- for (const auto& partView : { partView1, partView2 }) {
- UNIT_ASSERT_VALUES_EQUAL(partView.Slices->size(), 1u);
- }
-
- TPageReuseBuilder builder(*lay.RowScheme()->Keys);
-
- {
- TSlice slice = partView1.Slices->at(0);
- slice.FirstRowId += 2;
- slice.LastRowId -= 2;
- slice.FirstKey = TSerializedCellVec(tool.KeyCells(rows1[slice.FirstRowId]));
- slice.LastKey = TSerializedCellVec(tool.KeyCells(rows1[slice.LastRowId]));
- builder.AddSlice(partView1.Part.Get(), slice, true);
- }
-
- builder.AddSlice(partView2.Part.Get(), partView2.Slices->at(0), true);
-
- auto results = builder.Build();
-
- UNIT_ASSERT_VALUES_EQUAL(results.Reusable.size(), 2u);
-
- auto& reusable1 = results.Reusable.at(0);
- UNIT_ASSERT_VALUES_EQUAL(reusable1.Slice.FirstRowId, 4u);
- UNIT_ASSERT_VALUES_EQUAL(reusable1.Slice.FirstInclusive, true);
- UNIT_ASSERT_VALUES_EQUAL(reusable1.Slice.LastRowId, 8u);
- UNIT_ASSERT_VALUES_EQUAL(reusable1.Slice.LastInclusive, false);
-
- auto& reusable2 = results.Reusable.at(1);
- UNIT_ASSERT_VALUES_EQUAL(reusable2.Slice.FirstRowId, 16u);
- UNIT_ASSERT_VALUES_EQUAL(reusable2.Slice.FirstInclusive, true);
- UNIT_ASSERT_VALUES_EQUAL(reusable2.Slice.LastRowId, partView1->Index.GetEndRowId() - 4);
- UNIT_ASSERT_VALUES_EQUAL(reusable2.Slice.LastInclusive, false);
-
- UNIT_ASSERT_VALUES_EQUAL(results.ExpectedSlices, 5u);
- }
-
- Y_UNIT_TEST(CompactWithUnderlayMask) {
- TLayoutCook lay;
- lay
- .Col(0, 0, NScheme::NTypeIds::Uint64)
- .Col(0, 8, NScheme::NTypeIds::Uint32)
- .Key({ 0 });
-
- const ui64 rowsCount = 20;
-
- TRowTool tool(*lay);
- TRowsHeap rows(64 * 1024);
- TPartCook cook(lay, { false, 4096 });
- for (ui64 seq = 0; seq < rowsCount; ++seq) {
- rows.Put(*TSchemedCookRow(*lay).Col(1000 + seq, 42_u32));
- cook.AddOpN(ERowOp::Erase, 1000 + seq);
- }
-
- auto source = cook.Finish();
-
- {
- auto partView = source.ToPartView();
- UNIT_ASSERT_VALUES_EQUAL(partView.Slices->size(), 1u);
- UNIT_ASSERT_VALUES_EQUAL(partView.Slices->at(0).Rows(), rowsCount);
- }
-
- TVector<TBounds> underlayMaskValues;
-
- {
- auto& bounds = underlayMaskValues.emplace_back();
- bounds.FirstKey = TSerializedCellVec(tool.KeyCells(rows[3]));
- bounds.LastKey = TSerializedCellVec(tool.KeyCells(rows[7]));
- bounds.FirstInclusive = false;
- bounds.LastInclusive = true;
- }
-
- {
- auto& bounds = underlayMaskValues.emplace_back();
- bounds.FirstKey = TSerializedCellVec(tool.KeyCells(rows[11]));
- bounds.LastKey = TSerializedCellVec(tool.KeyCells(rows[14]));
- bounds.FirstInclusive = true;
- bounds.LastInclusive = false;
- }
-
- TVector<const TBounds*> underlayMaskPointers;
- underlayMaskPointers.emplace_back(&underlayMaskValues.at(0));
- underlayMaskPointers.emplace_back(&underlayMaskValues.at(1));
- auto underlayMask = TUnderlayMask::Build(lay.RowScheme(), underlayMaskPointers);
-
- auto born = TCompaction(nullptr, CreateConf(16, underlayMask.Get()))
- .Do(lay.RowScheme(), { &source });
-
- // Check only erase markers under the mask are kept intact
- TCheckIt(born, { nullptr, 0 }, nullptr, true /* expand defaults */)
- .To(10).Seek({ }, ESeek::Lower).Is(EReady::Data)
- .To(21).IsOpN(ERowOp::Erase, 1004_u64, ECellOp::Empty).Next()
- .To(22).IsOpN(ERowOp::Erase, 1005_u64, ECellOp::Empty).Next()
- .To(23).IsOpN(ERowOp::Erase, 1006_u64, ECellOp::Empty).Next()
- .To(24).IsOpN(ERowOp::Erase, 1007_u64, ECellOp::Empty).Next()
- .To(25).IsOpN(ERowOp::Erase, 1011_u64, ECellOp::Empty).Next()
- .To(26).IsOpN(ERowOp::Erase, 1012_u64, ECellOp::Empty).Next()
- .To(27).IsOpN(ERowOp::Erase, 1013_u64, ECellOp::Empty).Next()
- .To(30).Is(EReady::Gone);
- }
-
- Y_UNIT_TEST(CompactWithSplitKeys) {
- TLayoutCook lay;
- lay
- .Col(0, 0, NScheme::NTypeIds::Uint64)
- .Col(0, 8, NScheme::NTypeIds::Uint32)
- .Key({ 0 });
-
- const ui64 pageCount = 5;
-
- TRowTool tool(*lay);
- TRowsHeap rows(64 * 1024);
- for (ui64 seq = 0; seq < 4 * pageCount; ++seq) {
- rows.Put(*TSchemedCookRow(*lay).Col(1000 + seq, 42_u32));
- }
-
- auto source = TPartCook(lay, CreateConf(4))
- .Add(rows.begin(), rows.end())
- .Finish();
-
- {
- auto partView = source.ToPartView();
- UNIT_ASSERT_VALUES_EQUAL(partView.Slices->size(), 1u);
- UNIT_ASSERT_VALUES_EQUAL(partView->Index->Count, pageCount);
- }
-
- TVector<TSerializedCellVec> splitKeyValues;
- splitKeyValues.emplace_back(tool.KeyCells(rows[6]));
- splitKeyValues.emplace_back(tool.KeyCells(rows[14]));
- TSplitKeys splitKeys(lay.RowScheme(), std::move(splitKeyValues));
-
- auto born = TCompaction(nullptr, CreateConf(4, &splitKeys))
- .Do(lay.RowScheme(), { &source });
-
- {
- auto partView = born.ToPartView();
- UNIT_ASSERT_VALUES_EQUAL(partView.Slices->size(), 3u);
- UNIT_ASSERT_VALUES_EQUAL(partView->Index->Count, 6u);
-
- UNIT_ASSERT_VALUES_EQUAL(partView->Index->Record(0)->GetRowId(), 0u);
- UNIT_ASSERT_VALUES_EQUAL(partView->Index->Record(1)->GetRowId(), 4u);
- UNIT_ASSERT_VALUES_EQUAL(partView->Index->Record(2)->GetRowId(), 6u);
- UNIT_ASSERT_VALUES_EQUAL(partView->Index->Record(3)->GetRowId(), 10u);
- UNIT_ASSERT_VALUES_EQUAL(partView->Index->Record(4)->GetRowId(), 14u);
- UNIT_ASSERT_VALUES_EQUAL(partView->Index->Record(5)->GetRowId(), 18u);
-
- auto key0 = TSerializedCellVec::Serialize(tool.KeyCells(rows[0]));
- auto key5 = TSerializedCellVec::Serialize(tool.KeyCells(rows[5]));
- auto& slice0 = partView.Slices->at(0);
- UNIT_ASSERT_VALUES_EQUAL(slice0.FirstRowId, 0u);
- UNIT_ASSERT_VALUES_EQUAL(slice0.LastRowId, 5u);
- UNIT_ASSERT_VALUES_EQUAL(slice0.FirstKey.GetBuffer(), key0);
- UNIT_ASSERT_VALUES_EQUAL(slice0.LastKey.GetBuffer(), key5);
-
- auto key6 = TSerializedCellVec::Serialize(tool.KeyCells(rows[6]));
- auto key13 = TSerializedCellVec::Serialize(tool.KeyCells(rows[13]));
- auto& slice1 = partView.Slices->at(1);
- UNIT_ASSERT_VALUES_EQUAL(slice1.FirstRowId, 6u);
- UNIT_ASSERT_VALUES_EQUAL(slice1.LastRowId, 13u);
- UNIT_ASSERT_VALUES_EQUAL(slice1.FirstKey.GetBuffer(), key6);
- UNIT_ASSERT_VALUES_EQUAL(slice1.LastKey.GetBuffer(), key13);
-
- auto key14 = TSerializedCellVec::Serialize(tool.KeyCells(rows[14]));
- auto key19 = TSerializedCellVec::Serialize(tool.KeyCells(rows[19]));
- auto& slice2 = partView.Slices->at(2);
- UNIT_ASSERT_VALUES_EQUAL(slice2.FirstRowId, 14u);
- UNIT_ASSERT_VALUES_EQUAL(slice2.LastRowId, 19u);
- UNIT_ASSERT_VALUES_EQUAL(slice2.FirstKey.GetBuffer(), key14);
- UNIT_ASSERT_VALUES_EQUAL(slice2.LastKey.GetBuffer(), key19);
- }
- }
-
-}
-
-Y_UNIT_TEST_SUITE(TShardedCompactionScenarios) {
-
- struct Schema : NIceDb::Schema {
- struct Data : Table<1> {
- struct Key : Column<1, NScheme::NTypeIds::Uint64> { };
- struct Value : Column<2, NScheme::NTypeIds::Uint32> { };
-
- using TKey = TableKey<Key>;
- using TColumns = TableColumns<Key, Value>;
- };
-
- using TTables = SchemaTables<Data>;
- };
-
- Y_UNIT_TEST(SimpleNoResharding) {
- TSimpleBackend backend;
- TSimpleBroker broker;
- TSimpleLogger logger;
- TSimpleTime time;
-
- // Initialize the schema
- {
- auto db = backend.Begin();
- db.Materialize<Schema>();
-
- // special policy with disabled nursery
- TCompactionPolicy policy;
- policy.ShardPolicy.SetMinSliceSize(0);
- backend.DB.Alter().SetCompactionPolicy(1, policy);
-
- backend.Commit();
- }
-
- // Insert some initial rows and compact them outside of strategy
- {
- auto db = backend.Begin();
- for (ui64 seq = 0; seq < 64; ++seq) {
- db.Table<Schema::Data>().Key(1000 + seq).Update<Schema::Data::Value>(42);
- }
- backend.Commit();
- backend.SimpleMemCompaction(1);
-
- UNIT_ASSERT_VALUES_EQUAL(backend.TableParts(1).size(), 1u);
- }
-
- TShardedCompactionStrategy strategy(1, &backend, &broker, &logger, &time, "suffix");
-
- {
- TCompactionState initialState;
- strategy.Start(initialState);
- }
-
- // Don't expect any tasks or change requests
- UNIT_ASSERT(!broker.HasPending());
- UNIT_ASSERT(!backend.PendingReads);
- UNIT_ASSERT(!backend.StartedCompactions);
- UNIT_ASSERT(!backend.CheckChangesFlag());
-
- // Erase and insert some more rows
- {
- auto db = backend.Begin();
- for (ui64 seq = 0; seq < 256; ++seq) {
- if (seq < 128) {
- db.Table<Schema::Data>().Key(1000 - 64 + seq).Delete();
- } else {
- db.Table<Schema::Data>().Key(1000 - 64 + seq).Update<Schema::Data::Value>(43);
- }
- }
- backend.Commit();
- }
-
- // Start a memtable compaction using this strategy
- {
- auto memCompactionId = strategy.BeginMemCompaction(0, { 0, TEpoch::Max() }, 0);
- UNIT_ASSERT(memCompactionId != 0);
- auto outcome = backend.RunCompaction(memCompactionId);
-
- UNIT_ASSERT(outcome.Params);
- UNIT_ASSERT(!outcome.Params->Parts);
- UNIT_ASSERT(!outcome.Params->IsFinal);
-
- // We expect that only 64 out of 128 drops survive
- UNIT_ASSERT(outcome.Result);
- UNIT_ASSERT_VALUES_EQUAL(outcome.Result->Parts.size(), 1u);
- UNIT_ASSERT(outcome.Result->Parts[0]);
- UNIT_ASSERT_VALUES_EQUAL(outcome.Result->Parts[0]->Stat.Drops, 64u);
-
- auto changes = strategy.CompactionFinished(
- memCompactionId, std::move(outcome.Params), std::move(outcome.Result));
-
- // Don't expect any slice changes at this time
- UNIT_ASSERT(changes.SliceChanges.empty());
- }
-
- // There should be a compaction task pending right now
- UNIT_ASSERT(broker.RunPending());
- UNIT_ASSERT(!broker.HasPending());
-
- // There should be compaction started right now
- UNIT_ASSERT_VALUES_EQUAL(backend.StartedCompactions.size(), 1u);
-
- // Perform this compaction
- {
- auto outcome = backend.RunCompaction();
- UNIT_ASSERT(outcome.Params->Parts);
- UNIT_ASSERT(outcome.Params->IsFinal);
-
- // We expect that none of the original drops survive
- UNIT_ASSERT_VALUES_EQUAL(outcome.Result->Parts.size(), 1u);
- UNIT_ASSERT_VALUES_EQUAL(outcome.Result->Parts[0]->Stat.Drops, 0u);
-
- auto changes = strategy.CompactionFinished(
- outcome.CompactionId, std::move(outcome.Params), std::move(outcome.Result));
-
- // Don't expect any slice changes at this time
- UNIT_ASSERT(changes.SliceChanges.empty());
- }
-
- UNIT_ASSERT(!broker.HasPending());
- UNIT_ASSERT(!backend.StartedCompactions);
- }
-
- Y_UNIT_TEST(NurserySequential) {
- TSimpleBackend backend;
- TSimpleBroker broker;
- TSimpleLogger logger;
- TSimpleTime time;
-
- const ui64 triggerSize = 16 * 1024;
-
- // Initialize the schema
- {
- auto db = backend.Begin();
- db.Materialize<Schema>();
-
- // special policy with a small nursery
- TCompactionPolicy policy;
- policy.ShardPolicy.SetMinSliceSize(triggerSize);
- backend.DB.Alter().SetCompactionPolicy(1, policy);
-
- backend.Commit();
- }
-
- TShardedCompactionStrategy strategy(1, &backend, &broker, &logger, &time, "suffix");
- strategy.Start({ });
-
- size_t memCompactions = 0;
- for (ui64 base = 0; base < 16 * 1024; base += 128) {
- auto db = backend.Begin();
- for (ui64 seq = 0; seq < 128; ++seq) {
- db.Table<Schema::Data>().Key(base + seq).Update<Schema::Data::Value>(42);
- }
- backend.Commit();
- backend.SimpleMemCompaction(&strategy);
- ++memCompactions;
-
- UNIT_ASSERT_C(!broker.HasPending(),
- "Strategy shouldn't request tasks in fully sequential case");
- }
-
- UNIT_ASSERT(!broker.HasPending());
- UNIT_ASSERT(!broker.HasRunning());
- UNIT_ASSERT(!backend.StartedCompactions);
-
- size_t countAll = 0;
- size_t countBig = 0;
- size_t countSmall = 0;
- for (auto& part : backend.TableParts(1)) {
- ++countAll;
- if (part->BackingSize() >= triggerSize) {
- ++countBig;
- } else {
- ++countSmall;
- }
- }
-
- UNIT_ASSERT_C(countAll < memCompactions && countBig > countSmall,
- "Produced " << countAll << " parts after " << memCompactions << " compactions ("
- << countBig << " big and " << countSmall << " small)");
- }
-
- Y_UNIT_TEST(SequentialOverlap) {
- TSimpleBackend backend;
- TSimpleBroker broker;
- TSimpleLogger logger;
- TSimpleTime time;
-
- // Initialize the schema
- {
- auto db = backend.Begin();
- db.Materialize<Schema>();
-
- // special policy without nursery and extremely reusable slices
- TCompactionPolicy policy;
- policy.ShardPolicy.SetMinSliceSize(0);
- policy.ShardPolicy.SetMinSliceSizeToReuse(1);
- policy.ShardPolicy.SetNewDataPercentToCompact(50);
- backend.DB.Alter().SetCompactionPolicy(1, policy);
-
- backend.Commit();
- }
-
- TShardedCompactionStrategy strategy(1, &backend, &broker, &logger, &time, "suffix");
- strategy.Start({ });
-
- const ui64 rowsPerTx = 16 * 1024;
- for (ui64 attempt = 0; attempt < 2; ++attempt) {
- const ui64 base = (rowsPerTx - 1) * attempt;
- auto db = backend.Begin();
- for (ui64 seq = 0; seq < rowsPerTx; ++seq) {
- db.Table<Schema::Data>().Key(base + seq).Update<Schema::Data::Value>(42);
- }
- backend.Commit();
- backend.SimpleMemCompaction(&strategy);
-
- // Run all pending tasks and compactions
- while (broker.RunPending()) {
- while (backend.SimpleTableCompaction(1, &broker, &strategy)) {
- // nothing
- }
- }
- }
-
- UNIT_ASSERT(!broker.HasPending());
- UNIT_ASSERT(!broker.HasRunning());
- UNIT_ASSERT(!backend.StartedCompactions);
-
- // We expect overlap to be compacted with the rest reused
- UNIT_ASSERT_VALUES_EQUAL(
- backend.DumpKeyRanges(1, true),
- "[{0}, {16168})@1/4 [{16168}, {16758}]@2/8 [{16759}, {32766}]@2/7");
- }
-
- Y_UNIT_TEST(SequentialSplit) {
- TSimpleBackend backend;
- TSimpleBroker broker;
- TSimpleLogger logger;
- TSimpleTime time;
-
- // Initialize the schema
- {
- auto db = backend.Begin();
- db.Materialize<Schema>();
-
- // special policy without nursery and small shard size
- TCompactionPolicy policy;
- policy.ShardPolicy.SetMinSliceSize(0);
- policy.ShardPolicy.SetMaxShardSize(512 * 1024);
- backend.DB.Alter().SetCompactionPolicy(1, policy);
-
- backend.Commit();
- }
-
- TShardedCompactionStrategy strategy(1, &backend, &broker, &logger, &time, "suffix");
- strategy.Start({ });
-
- const ui64 rowsPerTx = 16 * 1024;
- for (ui64 index = 0; index < 2; ++index) {
- const ui64 base = rowsPerTx * index;
- auto db = backend.Begin();
- for (ui64 seq = 0; seq < rowsPerTx; ++seq) {
- db.Table<Schema::Data>().Key(base + seq).Update<Schema::Data::Value>(42);
- }
- backend.Commit();
- backend.SimpleMemCompaction(&strategy);
-
- UNIT_ASSERT_C(!broker.HasPending(), "Pending task at index " << index);
- UNIT_ASSERT_C(!backend.StartedCompactions, "Started compaction at index " << index);
-
- UNIT_ASSERT_C(!backend.PendingReads, "Pending read at index " << index);
- }
-
- auto& state = backend.TableState[1];
-
- NProto::TShardedStrategyStateInfo header;
- UNIT_ASSERT(state.contains(0));
- UNIT_ASSERT(ParseFromStringNoSizeLimit(header, state[0]));
-
- // We expect two shards
- UNIT_ASSERT_VALUES_EQUAL(header.GetShards().size(), 2u);
-
- // We expect a single split key
- UNIT_ASSERT_VALUES_EQUAL(header.GetSplitKeys().size(), 1u);
-
- // We expect split key to be the start of the second part
- TSerializedCellVec splitKey(state.at(header.GetSplitKeys(0)));
- UNIT_ASSERT(splitKey.GetCells());
- UNIT_ASSERT_VALUES_EQUAL(splitKey.GetCells().at(0).AsValue<ui64>(), rowsPerTx);
- }
-
- Y_UNIT_TEST(NormalSplit) {
- TSimpleBackend backend;
- TSimpleBroker broker;
- TSimpleLogger logger;
- TSimpleTime time;
-
- // Initialize the schema
- {
- auto db = backend.Begin();
- db.Materialize<Schema>();
-
- // special policy without nursery, small shard size and large compaction trigger
- TCompactionPolicy policy;
- policy.ShardPolicy.SetMinSliceSize(0);
- policy.ShardPolicy.SetMaxShardSize(512 * 1024);
- policy.ShardPolicy.SetNewDataPercentToCompact(300);
- backend.DB.Alter().SetCompactionPolicy(1, policy);
-
- backend.Commit();
- }
-
- TShardedCompactionStrategy strategy(1, &backend, &broker, &logger, &time, "suffix");
- strategy.Start({ });
-
- const ui64 rowsPerTx = 16 * 1024;
- for (ui64 index = 0; index < 2; ++index) {
- const ui64 base = index;
- auto db = backend.Begin();
- for (ui64 seq = 0; seq < rowsPerTx; ++seq) {
- db.Table<Schema::Data>().Key(base + seq * 3).Update<Schema::Data::Value>(42);
- }
- backend.Commit();
- backend.SimpleMemCompaction(&strategy);
-
- UNIT_ASSERT_C(!broker.HasPending(), "Pending task at index " << index);
- UNIT_ASSERT_C(!backend.StartedCompactions, "Started compaction at index " << index);
-
- if (index == 0) {
- UNIT_ASSERT_C(!backend.PendingReads, "Pending read at index " << index);
- }
- }
-
- UNIT_ASSERT_VALUES_EQUAL(backend.PendingReads.size(), 2u);
- while (backend.PendingReads) {
- TStrictEnv env;
- // load index
- auto first = backend.RunRead(&env);
- UNIT_ASSERT(!first.Completed);
- env.Load();
- // load data
- auto second = backend.RunRead(&env);
- UNIT_ASSERT(!second.Completed);
- env.Load();
- auto third = backend.RunRead(first.ReadId, &env);
- UNIT_ASSERT(third.Completed);
- }
-
- UNIT_ASSERT(backend.CheckChangesFlag());
- auto changes = strategy.ApplyChanges();
- UNIT_ASSERT(changes.SliceChanges);
- backend.ApplyChanges(1, std::move(changes));
-
- auto& state = backend.TableState[1];
-
- NProto::TShardedStrategyStateInfo header;
- UNIT_ASSERT(state.contains(0));
- UNIT_ASSERT(ParseFromStringNoSizeLimit(header, state[0]));
-
- // We expect two shards
- UNIT_ASSERT_VALUES_EQUAL(header.GetShards().size(), 2u);
-
- // We expect a single split key
- UNIT_ASSERT_VALUES_EQUAL(header.GetSplitKeys().size(), 1u);
-
- // Stop current strategy
- strategy.Stop();
-
- // Start a new strategy instance from the same initial state
- TShardedCompactionStrategy reloaded(1, &backend, &broker, &logger, &time, "suffix");
- {
- TCompactionState initialState;
- initialState.StateSnapshot = backend.TableState[1];
- reloaded.Start(std::move(initialState));
- }
-
- // Strategy must accept current slices without unexpected actions
- UNIT_ASSERT(!broker.HasPending());
- UNIT_ASSERT(!backend.PendingReads);
- UNIT_ASSERT(!backend.StartedCompactions);
-
- for (ui64 index = 2; index < 3; ++index) {
- const ui64 base = index;
- auto db = backend.Begin();
- for (ui64 seq = 0; seq < rowsPerTx; ++seq) {
- db.Table<Schema::Data>().Key(base + seq * 3).Update<Schema::Data::Value>(42);
- }
- backend.Commit();
- backend.SimpleMemCompaction(&reloaded);
-
- UNIT_ASSERT_C(!broker.HasPending(), "Pending task at index " << index);
- UNIT_ASSERT_C(!backend.StartedCompactions, "Started compaction at index " << index);
-
- // The part should be generated without additional reads
- UNIT_ASSERT(!backend.PendingReads);
- }
-
- // There should be 3 parts by now
- UNIT_ASSERT_VALUES_EQUAL(backend.TableParts(1).size(), 3u);
-
- for (auto& partView : backend.TableParts(1)) {
- // All parts are expected to have 2 slices
- UNIT_ASSERT_VALUES_EQUAL(partView.Slices->size(), 2u);
- }
- }
-
- Y_UNIT_TEST(CompactionHoles) {
- TSimpleBackend backend;
- TSimpleBroker broker;
- TSimpleLogger logger;
- TSimpleTime time;
-
- // Initialize the schema
- {
- auto db = backend.Begin();
- db.Materialize<Schema>();
-
- // special policy without nursery and 99% compaction trigger
- TCompactionPolicy policy;
- policy.ShardPolicy.SetMinSliceSize(0);
- policy.ShardPolicy.SetNewDataPercentToCompact(99);
- policy.ShardPolicy.SetNewRowsPercentToCompact(99);
- backend.DB.Alter().SetCompactionPolicy(1, policy);
-
- backend.Commit();
- }
-
- auto strategy = MakeHolder<TShardedCompactionStrategy>(1, &backend, &broker, &logger, &time, "suffix");
- strategy->Start({ });
-
- // Arrange parts (epochs) like this in key space:
- // . 5 . 6 .
- // . 3 . 4 .
- // 7 1 8 2 9
- // We ignore compaction requests so it's not messed up until we're done.
- const ui64 rowsPerTx = 16 * 1024;
- for (ui64 level = 0; level < 3; ++level) {
- for (ui64 index = 0; index < 2; ++index) {
- const ui64 base = rowsPerTx + index * 2 * rowsPerTx;
- auto db = backend.Begin();
- for (ui64 seq = 0; seq < rowsPerTx; ++seq) {
- db.Table<Schema::Data>().Key(base + seq).Update<Schema::Data::Value>(42);
- }
- backend.Commit();
- backend.SimpleMemCompaction(strategy.Get());
- }
- }
- for (ui64 index = 0; index < 3; ++index) {
- const ui64 base = index * 2 * rowsPerTx;
- auto db = backend.Begin();
- for (ui64 seq = 0; seq < rowsPerTx; ++seq) {
- db.Table<Schema::Data>().Key(base + seq).Update<Schema::Data::Value>(42);
- }
- backend.Commit();
- backend.SimpleMemCompaction(strategy.Get());
- }
-
- UNIT_ASSERT_VALUES_EQUAL(backend.DumpKeyRanges(1),
- "[{0}, {16383}]@7 [{16384}, {32767}]@1 [{16384}, {32767}]@3 [{16384}, {32767}]@5 [{32768}, {49151}]@8 [{49152}, {65535}]@2 [{49152}, {65535}]@4 [{49152}, {65535}]@6 [{65536}, {81919}]@9");
-
- auto savedState = strategy->SnapshotState();
- strategy->Stop();
- strategy.Reset();
-
- // Strategy must discard all its current tasks when stopping
- UNIT_ASSERT(!broker.HasPending());
- UNIT_ASSERT(!backend.PendingReads);
- UNIT_ASSERT(!backend.StartedCompactions);
-
- strategy = MakeHolder<TShardedCompactionStrategy>(1, &backend, &broker, &logger, &time, "suffix");
- strategy->Start(std::move(savedState));
-
- // We expect a single compaction of two upper levels
- UNIT_ASSERT(broker.RunPending());
- UNIT_ASSERT(!broker.HasPending());
- UNIT_ASSERT_VALUES_EQUAL(backend.StartedCompactions.size(), 1u);
- UNIT_ASSERT(backend.SimpleTableCompaction(1, &broker, strategy.Get()));
-
- // We expect the following at the end of compaction:
- // . 6 . 6 .
- // 7 1 8 2 9
- UNIT_ASSERT_VALUES_EQUAL(backend.DumpKeyRanges(1),
- "[{0}, {16383}]@7 [{16384}, {32767}]@1 [{16384}, {32767}]@6 [{32768}, {49151}]@8 [{49152}, {65535}]@2 [{49152}, {65535}]@6 [{65536}, {81919}]@9");
- }
-
- Y_UNIT_TEST(CompactionGarbage) {
- TSimpleBackend backend;
- TSimpleBroker broker;
- TSimpleLogger logger;
- TSimpleTime time;
-
- // Initialize the schema
- {
- auto db = backend.Begin();
- db.Materialize<Schema>();
-
- // special policy without nursery and 25% compaction triggers
- TCompactionPolicy policy;
- policy.ShardPolicy.SetMinSliceSize(0);
- policy.ShardPolicy.SetMinShardSize(0);
- policy.ShardPolicy.SetNewDataPercentToCompact(25);
- policy.ShardPolicy.SetNewRowsPercentToCompact(25);
- policy.ShardPolicy.SetMaxGarbagePercentToReuse(25);
- backend.DB.Alter().SetCompactionPolicy(1, policy);
-
- backend.Commit();
- }
-
- const ui64 rowsPerShard = 16 * 1024;
-
- // Create initial state with 2 shards
- TCompactionState initialState;
- {
- const ui64 splitKey = rowsPerShard;
- initialState.StateSnapshot[1] = TSerializedCellVec::Serialize({
- TCell::Make(splitKey)
- });
- NProto::TShardedStrategyStateInfo header;
- header.SetLastSplitKey(1);
- header.AddSplitKeys(1);
- header.SetLastShardId(2);
- header.AddShards(1);
- header.AddShards(2);
- Y_PROTOBUF_SUPPRESS_NODISCARD header.SerializeToString(&initialState.StateSnapshot[0]);
- }
-
- TShardedCompactionStrategy strategy(1, &backend, &broker, &logger, &time, "suffix");
- strategy.Start(std::move(initialState));
-
- // Create a large slice over both shards
- {
- auto db = backend.Begin();
- for (ui64 seq = 0; seq < rowsPerShard * 2; ++seq) {
- db.Table<Schema::Data>().Key(seq).Update<Schema::Data::Value>(42);
- }
- backend.Commit();
- backend.SimpleMemCompaction(&strategy);
- }
-
- UNIT_ASSERT_VALUES_EQUAL(
- backend.DumpKeyRanges(1),
- "[{0}, {16383}]@1 [{16384}, {32767}]@1");
-
- // Create a large slice in the first shard, to trigger its compaction
- {
- auto db = backend.Begin();
- for (ui64 seq = 0; seq < rowsPerShard; ++seq) {
- db.Table<Schema::Data>().Key(seq).Update<Schema::Data::Value>(42);
- }
- backend.Commit();
- backend.SimpleMemCompaction(&strategy);
- }
-
- UNIT_ASSERT_VALUES_EQUAL(
- backend.DumpKeyRanges(1, true),
- "[{0}, {16383}]@1/4 [{0}, {16383}]@2/7 [{16384}, {32767}]@1/4");
-
- // New slice should have triggered compaction in the left shard
- UNIT_ASSERT(broker.RunPending());
- UNIT_ASSERT(!broker.HasPending());
- UNIT_ASSERT(backend.SimpleTableCompaction(1, &broker, &strategy));
-
- UNIT_ASSERT_VALUES_EQUAL(
- backend.DumpKeyRanges(1, true),
- "[{0}, {16383}]@2/8 [{16384}, {32767}]@1/4");
-
- // Now we should have triggered garbage compaction in the right shard
- UNIT_ASSERT(broker.RunPending());
- UNIT_ASSERT(!broker.HasPending());
- UNIT_ASSERT(backend.SimpleTableCompaction(1, &broker, &strategy));
-
- UNIT_ASSERT_VALUES_EQUAL(
- backend.DumpKeyRanges(1, true),
- "[{0}, {16383}]@2/8 [{16384}, {32767}]@1/9");
-
- UNIT_ASSERT(!broker.HasPending());
- }
-
- Y_UNIT_TEST(CompactionGarbageNoReuse) {
- TSimpleBackend backend;
- TSimpleBroker broker;
- TSimpleLogger logger;
- TSimpleTime time;
-
- // Initialize the schema
- {
- auto db = backend.Begin();
- db.Materialize<Schema>();
-
- // special policy without nursery, 25% compaction triggers
- TCompactionPolicy policy;
- policy.ShardPolicy.SetMinSliceSize(0);
- policy.ShardPolicy.SetMinShardSize(0);
- policy.ShardPolicy.SetMinSliceSizeToReuse(1);
- policy.ShardPolicy.SetNewDataPercentToCompact(25);
- policy.ShardPolicy.SetNewRowsPercentToCompact(25);
- policy.ShardPolicy.SetMaxGarbagePercentToReuse(25);
- backend.DB.Alter().SetCompactionPolicy(1, policy);
-
- backend.Commit();
- }
-
- TShardedCompactionStrategy strategy(1, &backend, &broker, &logger, &time, "suffix");
- strategy.Start({ });
-
- const ui64 rowsPerSlice = 16 * 1024;
-
- // Create two slices that largely overlap
- for (ui64 index = 0; index < 2; ++index) {
- const ui64 base = index * rowsPerSlice / 2;
- auto db = backend.Begin();
- for (ui64 seq = 0; seq < rowsPerSlice; ++seq) {
- db.Table<Schema::Data>().Key(base + seq).Update<Schema::Data::Value>(42);
- }
- backend.Commit();
- backend.SimpleMemCompaction(&strategy);
- }
-
- UNIT_ASSERT_VALUES_EQUAL(
- backend.DumpKeyRanges(1, true),
- "[{0}, {16383}]@1/4 [{8192}, {24575}]@2/7");
-
- // Run the triggered compaction
- UNIT_ASSERT(broker.RunPending());
- UNIT_ASSERT(!broker.HasPending());
- UNIT_ASSERT(backend.SimpleTableCompaction(1, &broker, &strategy));
-
- // We expect the middle to not be reused, as it would produce too much garbage
- UNIT_ASSERT_VALUES_EQUAL(
- backend.DumpKeyRanges(1, true),
- "[{0}, {24575}]@2/8");
- }
-
-}
-
-}
-}
-}
diff --git a/ydb/core/tablet_flat/ut/ya.make b/ydb/core/tablet_flat/ut/ya.make
index d1b143bc1a6..29789db2786 100644
--- a/ydb/core/tablet_flat/ut/ya.make
+++ b/ydb/core/tablet_flat/ut/ya.make
@@ -34,7 +34,6 @@ SRCS(
ut_sausage.cpp
ut_stat.cpp
ut_comp_gen.cpp
- ut_comp_shard.cpp
ut_compaction.cpp
ut_compaction_multi.cpp
ut_charge.cpp
diff --git a/ydb/core/tablet_flat/ya.make b/ydb/core/tablet_flat/ya.make
index f6d499884da..12d88cc87e3 100644
--- a/ydb/core/tablet_flat/ya.make
+++ b/ydb/core/tablet_flat/ya.make
@@ -7,7 +7,6 @@ SRCS(
flat_comp.cpp
flat_comp_create.cpp
flat_comp_gen.cpp
- flat_comp_shard.cpp
flat_cxx_database.h
flat_database.cpp
flat_database.h
@@ -51,7 +50,6 @@ SRCS(
flat_part_slice.cpp
flat_range_cache.cpp
flat_row_versions.cpp
- flat_stat_part.cpp
flat_stat_part.h
flat_stat_table.h
flat_stat_table.cpp
@@ -76,8 +74,6 @@ SRCS(
GENERATE_ENUM_SERIALIZATION(flat_comp_gen.h)
-GENERATE_ENUM_SERIALIZATION(flat_comp_shard.h)
-
GENERATE_ENUM_SERIALIZATION(flat_part_loader.h)
GENERATE_ENUM_SERIALIZATION(flat_executor_compaction_logic.h)
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
index ebb5a4f6cc7..ff64a0c82e5 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
@@ -1167,12 +1167,6 @@ bool TPartitionConfigMerger::VerifyCompactionPolicy(const NKikimrSchemeOp::TComp
case NKikimrSchemeOp::CompactionStrategyGenerational:
break;
case NKikimrSchemeOp::CompactionStrategySharded:
- if (!KIKIMR_ALLOW_SHARDED_COMPACTION) {
- err = TStringBuilder()
- << "Unsupported compaction strategy.";
- return false;
- }
- break;
default:
err = TStringBuilder()
<< "Unsupported compaction strategy.";
diff --git a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp
index 943bdcff4be..d33a669908a 100644
--- a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp
+++ b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp
@@ -9337,22 +9337,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
}
}
)",
- { KIKIMR_ALLOW_SHARDED_COMPACTION ?
- NKikimrScheme::StatusAccepted :
- NKikimrScheme::StatusInvalidParameter });
-
- if (KIKIMR_ALLOW_SHARDED_COMPACTION) {
- env.TestWaitNotification(runtime, txId);
- TestDescribeResult(DescribePath(runtime, "/MyRoot/Table2"),
- { [](const auto& result) {
- auto strategy = result
- .GetPathDescription()
- .GetTable()
- .GetPartitionConfig()
- .GetCompactionPolicy()
- .GetCompactionStrategy();
- UNIT_ASSERT(strategy == NKikimrSchemeOp::CompactionStrategySharded);}});
- }
+ { NKikimrScheme::StatusInvalidParameter });
}
Y_UNIT_TEST(AlterTableWithCompactionStrategies) { //+
@@ -9391,22 +9376,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
}
}
)",
- { KIKIMR_ALLOW_SHARDED_COMPACTION ?
- NKikimrScheme::StatusAccepted :
- NKikimrScheme::StatusInvalidParameter });
-
- if (KIKIMR_ALLOW_SHARDED_COMPACTION) {
- env.TestWaitNotification(runtime, txId);
- TestDescribeResult(DescribePath(runtime, "/MyRoot/Table1"),
- { [](const auto& result) {
- auto strategy = result
- .GetPathDescription()
- .GetTable()
- .GetPartitionConfig()
- .GetCompactionPolicy()
- .GetCompactionStrategy();
- UNIT_ASSERT(strategy == NKikimrSchemeOp::CompactionStrategySharded);}});
- }
+ { NKikimrScheme::StatusInvalidParameter });
}
Y_UNIT_TEST(SimultaneousDropForceDrop) { //+
diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make
index c662e6b6ff1..8d91105781f 100644
--- a/ydb/core/tx/schemeshard/ya.make
+++ b/ydb/core/tx/schemeshard/ya.make
@@ -49,12 +49,6 @@ RECURSE_FOR_TESTS(
LIBRARY()
-IF (KIKIMR_ALLOW_SHARDED_COMPACTION)
- CFLAGS(
- -DKIKIMR_ALLOW_SHARDED_COMPACTION=1
- )
-ENDIF()
-
SRCS(
defs.h
schemeshard.cpp