aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornsofya <nsofya@yandex-team.com>2023-04-05 14:17:40 +0300
committernsofya <nsofya@yandex-team.com>2023-04-05 14:17:40 +0300
commit1a13012ff58dbd96aab8a178e274cd82dda52b6f (patch)
tree59f98faa718ab2da8f6fc1af16709fc5aa543a7f
parent38d49a3490392bec1a4a5dfddf4efce5fd9a4463 (diff)
downloadydb-1a13012ff58dbd96aab8a178e274cd82dda52b6f.tar.gz
Group table schema processing in one class
Первый этап подготовки к модификации схем: группировка всех объектов, работающих с их состоянием внутри одного менеджера. 1. Объединила в одном классе работу с Tables + SchemaPreset + Ttl 2. В него же перенесла работу с PrimaryIndex, чтобы нативно выполнять все проверки (он не сразу строится, поэтому они не лишние) в нужные моменты вызывать функции, обновляющие сам PrimaryIndex. Пока не достаточно погрузилась в то, как правильно восстанавливать схемы для arrow из этого индекса, но это будет следующим шагом. 3. Местами не очень красиво выходит наружу MutablePrimaryIndex. В перспективе я бы всю работу с ним хотела спрятать в TablesManager. но пока унесла внутрь только те части, которые касаются обновления схемы, иначе погрязну. Вернусь к этому как буду больше понимать в том, что происходит. мои ожидания, что этот PR ничего не меняет в текущем поведении, но закладывает механизм будущего обновления схем, который надо реализовать уже внутри PrimaryIndex
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/columnshard.cpp22
-rw-r--r--ydb/core/tx/columnshard/columnshard__init.cpp142
-rw-r--r--ydb/core/tx/columnshard/columnshard__progress_tx.cpp3
-rw-r--r--ydb/core/tx/columnshard/columnshard__propose_transaction.cpp4
-rw-r--r--ydb/core/tx/columnshard/columnshard__read.cpp10
-rw-r--r--ydb/core/tx/columnshard/columnshard__read_base.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp14
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp10
-rw-r--r--ydb/core/tx/columnshard/columnshard__write_index.cpp6
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp303
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h46
-rw-r--r--ydb/core/tx/columnshard/tables_manager.cpp324
-rw-r--r--ydb/core/tx/columnshard/tables_manager.h221
17 files changed, 680 insertions, 431 deletions
diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
index a81d192131f..28261168eb2 100644
--- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
@@ -74,6 +74,7 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/tables_manager.cpp
)
generate_enum_serilization(core-tx-columnshard
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard.h
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
index 77e8b4de116..66d5ee15570 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
@@ -75,6 +75,7 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/tables_manager.cpp
)
generate_enum_serilization(core-tx-columnshard
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard.h
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
index 77e8b4de116..66d5ee15570 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
@@ -75,6 +75,7 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/tables_manager.cpp
)
generate_enum_serilization(core-tx-columnshard
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard.h
diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
index a81d192131f..28261168eb2 100644
--- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
@@ -74,6 +74,7 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/tables_manager.cpp
)
generate_enum_serilization(core-tx-columnshard
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard.h
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index 4fa8d2fd8fd..20033b28c87 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -29,8 +29,8 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) {
IndexingActor = ctx.Register(CreateIndexingActor(TabletID(), ctx.SelfID));
CompactionActor = ctx.Register(CreateCompactionActor(TabletID(), ctx.SelfID));
EvictionActor = ctx.Register(CreateEvictionActor(TabletID(), ctx.SelfID));
- for (auto&& i : Tables) {
- ActivateTiering(i.first, i.second.TieringUsage);
+ for (auto&& i : TablesManager.GetTables()) {
+ ActivateTiering(i.first, i.second.GetTieringUsage());
}
SignalTabletActive(ctx);
}
@@ -193,11 +193,11 @@ void TColumnShard::UpdateInsertTableCounters() {
}
void TColumnShard::UpdateIndexCounters() {
- if (!PrimaryIndex) {
+ if (!TablesManager.HasPrimaryIndex()) {
return;
}
- auto& stats = PrimaryIndex->GetTotalStats();
+ auto& stats = TablesManager.MutablePrimaryIndex().GetTotalStats();
SetCounter(COUNTER_INDEX_TABLES, stats.Tables);
SetCounter(COUNTER_INDEX_GRANULES, stats.Granules);
SetCounter(COUNTER_INDEX_EMPTY_GRANULES, stats.EmptyGranules);
@@ -243,10 +243,6 @@ void TColumnShard::UpdateIndexCounters() {
ui64 TColumnShard::MemoryUsage() const {
ui64 memory =
- Tables.size() * sizeof(TTableInfo) +
- PathsToDrop.size() * sizeof(ui64) +
- Ttl.PathsCount() * sizeof(TTtl::TDescription) +
- SchemaPresets.size() * sizeof(TSchemaPreset) +
BasicTxInfo.size() * sizeof(TBasicTxInfo) +
DeadlineQueue.size() * sizeof(TDeadlineQueueItem) +
(PlanQueue.size() + RunningQueue.size()) * sizeof(TPlanQueueItem) +
@@ -258,9 +254,7 @@ ui64 TColumnShard::MemoryUsage() const {
(WaitingReads.size() + WaitingScans.size()) * (sizeof(TRowVersion) + sizeof(void*)) +
TabletCounters->Simple()[COUNTER_PREPARED_RECORDS].Get() * sizeof(NOlap::TInsertedData) +
TabletCounters->Simple()[COUNTER_COMMITTED_RECORDS].Get() * sizeof(NOlap::TInsertedData);
- if (PrimaryIndex) {
- memory += PrimaryIndex->MemoryUsage();
- }
+ memory += TablesManager.GetMemoryUsage();
memory += BatchCache.Bytes();
return memory;
}
@@ -329,9 +323,9 @@ void TColumnShard::SendPeriodicStats() {
tabletStats->SetTxRejectedBySpace(TabletCounters->Cumulative()[COUNTER_OUT_OF_SPACE].Get());
tabletStats->SetInFlightTxCount(Executor()->GetStats().TxInFly);
- if (PrimaryIndex) {
- const auto& indexStats = PrimaryIndex->GetTotalStats();
- NOlap::TSnapshot lastIndexUpdate = PrimaryIndex->LastUpdate();
+ if (TablesManager.HasPrimaryIndex()) {
+ const auto& indexStats = TablesManager.MutablePrimaryIndex().GetTotalStats();
+ NOlap::TSnapshot lastIndexUpdate = TablesManager.GetPrimaryIndexSafe().LastUpdate();
auto activeIndexStats = indexStats.Active(); // data stats excluding inactive and evicted
tabletStats->SetRowCount(activeIndexStats.Rows);
diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp
index 23a25c8d83d..d3cc84cf09f 100644
--- a/ydb/core/tx/columnshard/columnshard__init.cpp
+++ b/ydb/core/tx/columnshard/columnshard__init.cpp
@@ -41,8 +41,7 @@ void TTxInit::SetDefaults() {
Self->PlanQueue.clear();
Self->AltersInFlight.clear();
Self->CommitsInFlight.clear();
- Self->SchemaPresets.clear();
- Self->Tables.clear();
+ Self->TablesManager.Clear();
Self->LongTxWrites.clear();
Self->LongTxWritesByUniqueId.clear();
}
@@ -137,132 +136,10 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
}
}
- // Primary index defaut schema and TTL (both are versioned)
- TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaPreset;
- TMap<NOlap::TSnapshot, NOlap::TIndexInfo> commonSchema;
- THashMap<ui64, TMap<TRowVersion, TTtl::TDescription>> ttls;
-
- { // Load schema presets
- auto rowset = db.Table<Schema::SchemaPresetInfo>().Select();
- if (!rowset.IsReady())
- return false;
-
- while (!rowset.EndOfSet()) {
- const ui32 id = rowset.GetValue<Schema::SchemaPresetInfo::Id>();
- auto& preset = Self->SchemaPresets[id];
- preset.Id = id;
- if (id) {
- preset.Name = rowset.GetValue<Schema::SchemaPresetInfo::Name>();
- }
- Y_VERIFY(!id || preset.Name == "default", "Unsupported preset at load time");
-
- if (rowset.HaveValue<Schema::SchemaPresetInfo::DropStep>() &&
- rowset.HaveValue<Schema::SchemaPresetInfo::DropTxId>())
- {
- preset.DropVersion.Step = rowset.GetValue<Schema::SchemaPresetInfo::DropStep>();
- preset.DropVersion.TxId = rowset.GetValue<Schema::SchemaPresetInfo::DropTxId>();
- }
-
- if (!rowset.Next())
- return false;
- }
- }
-
- { // Load schema preset versions
- auto rowset = db.Table<Schema::SchemaPresetVersionInfo>().Select();
- if (!rowset.IsReady())
- return false;
-
- while (!rowset.EndOfSet()) {
- const ui32 id = rowset.GetValue<Schema::SchemaPresetVersionInfo::Id>();
- Y_VERIFY(Self->SchemaPresets.contains(id));
- auto& preset = Self->SchemaPresets.at(id);
- TRowVersion version(
- rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceStep>(),
- rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceTxId>());
- auto& info = preset.Versions[version];
- Y_VERIFY(info.ParseFromString(rowset.GetValue<Schema::SchemaPresetVersionInfo::InfoProto>()));
-
- NOlap::TSnapshot snap{version.Step, version.TxId};
- if (!id) {
- commonSchema.emplace(snap, Self->ConvertSchema(info.GetSchema()));
- } else if (preset.Name == "default") {
- schemaPreset.emplace(snap, Self->ConvertSchema(info.GetSchema()));
- }
-
- if (!rowset.Next())
- return false;
- }
- }
-
- { // Load tables
- auto rowset = db.Table<Schema::TableInfo>().Select();
- if (!rowset.IsReady())
- return false;
-
- while (!rowset.EndOfSet()) {
- const ui64 pathId = rowset.GetValue<Schema::TableInfo::PathId>();
- auto& table = Self->Tables[pathId];
- table.PathId = pathId;
- table.TieringUsage = rowset.GetValue<Schema::TableInfo::TieringUsage>();
- if (rowset.HaveValue<Schema::TableInfo::DropStep>() &&
- rowset.HaveValue<Schema::TableInfo::DropTxId>())
- {
- table.DropVersion.Step = rowset.GetValue<Schema::TableInfo::DropStep>();
- table.DropVersion.TxId = rowset.GetValue<Schema::TableInfo::DropTxId>();
- Self->PathsToDrop.insert(pathId);
- }
-
- if (!rowset.Next())
- return false;
- }
- }
-
- { // Load table versions
- auto rowset = db.Table<Schema::TableVersionInfo>().Select();
- if (!rowset.IsReady())
- return false;
-
- while (!rowset.EndOfSet()) {
- const ui64 pathId = rowset.GetValue<Schema::TableVersionInfo::PathId>();
- Y_VERIFY(Self->Tables.contains(pathId));
- auto& table = Self->Tables.at(pathId);
- TRowVersion version(
- rowset.GetValue<Schema::TableVersionInfo::SinceStep>(),
- rowset.GetValue<Schema::TableVersionInfo::SinceTxId>());
- auto& info = table.Versions[version];
- Y_VERIFY(info.ParseFromString(rowset.GetValue<Schema::TableVersionInfo::InfoProto>()));
-
- if (!Self->PathsToDrop.count(pathId)) {
- auto& ttlSettings = info.GetTtlSettings();
- if (ttlSettings.HasEnabled()) {
- ttls[pathId].emplace(version, TTtl::TDescription(ttlSettings.GetEnabled()));
- }
- }
-
- if (!rowset.Next())
- return false;
- }
- }
-
- for (auto& [pathId, map] : ttls) {
- auto& description = map.rbegin()->second; // last version if many
- Self->Ttl.SetPathTtl(pathId, std::move(description));
- }
-
- Self->SetCounter(COUNTER_TABLES, Self->Tables.size());
- Self->SetCounter(COUNTER_TABLE_PRESETS, Self->SchemaPresets.size());
- Self->SetCounter(COUNTER_TABLE_TTLS, ttls.size());
-
- if (!schemaPreset.empty()) {
- Y_VERIFY(commonSchema.empty(), "Mix of schema preset and common schema");
- Self->SetPrimaryIndex(std::move(schemaPreset));
- } else if (!commonSchema.empty()) {
- Self->SetPrimaryIndex(std::move(commonSchema));
- } else {
- Y_VERIFY(Self->Tables.empty());
- Y_VERIFY(Self->SchemaPresets.empty());
- }
+ Self->TablesManager.InitFromDB(db, Self->TabletID());
+ Self->SetCounter(COUNTER_TABLES, Self->TablesManager.GetTables().size());
+ Self->SetCounter(COUNTER_TABLE_PRESETS, Self->TablesManager.GetSchemaPresets().size());
+ Self->SetCounter(COUNTER_TABLE_TTLS, Self->TablesManager.GetTtl().PathsCount());
{ // Load long tx writes
auto rowset = db.Table<Schema::LongTxWrites>().Select();
@@ -310,13 +187,8 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
}
}
- // Load primary index
- if (Self->PrimaryIndex) {
- TBlobGroupSelector dsGroupSelector(Self->Info());
- NOlap::TDbWrapper idxDB(txc.DB, &dsGroupSelector);
- if (!Self->PrimaryIndex->Load(idxDB, lostEvictions, Self->PathsToDrop)) {
- return false;
- }
+ if (!Self->TablesManager.LoadIndex(dbTable, lostEvictions)) {
+ return false;
}
// Set dropped evicting records to be erased in future cleanups
diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
index beaa2f7b770..2ab3ab2a8e8 100644
--- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
+++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
@@ -142,8 +142,7 @@ public:
}
auto pathExists = [&](ui64 pathId) {
- auto it = Self->Tables.find(pathId);
- return it != Self->Tables.end() && !it->second.IsDropped();
+ return Self->TablesManager.HasTable(pathId);
};
auto counters = Self->InsertTable->Commit(dbTable, step, txId, meta.MetaShard, meta.WriteIds,
diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
index de3bf81635b..c6a9274b20c 100644
--- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
+++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
@@ -276,13 +276,13 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex
break;
}
- if (!Self->PrimaryIndex) {
+ if (!Self->TablesManager.HasPrimaryIndex()) {
statusMessage = "No primary index for TTL";
status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR;
break;
}
- auto schema = Self->PrimaryIndex->GetIndexInfo().ArrowSchema();
+ auto schema = Self->TablesManager.GetIndexInfo().ArrowSchema();
auto ttlColumn = schema->GetFieldByName(columnName);
if (!ttlColumn) {
statusMessage = "TTL tx wrong TTL column '" + columnName + "'";
diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp
index 8fe9ba771a8..3857498fe19 100644
--- a/ydb/core/tx/columnshard/columnshard__read.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read.cpp
@@ -38,13 +38,13 @@ private:
bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) {
Y_VERIFY(Ev);
- Y_VERIFY(Self->PrimaryIndex);
+ Y_VERIFY(Self->TablesManager.HasPrimaryIndex());
Y_UNUSED(txc);
LOG_S_DEBUG("TTxRead.Execute at tablet " << Self->TabletID());
txc.DB.NoMoreReadsForTx();
- const NOlap::TIndexInfo& indexInfo = Self->PrimaryIndex->GetIndexInfo();
+ const NOlap::TIndexInfo& indexInfo = Self->TablesManager.GetIndexInfo();
auto& record = Proto(Ev->Get());
ui64 metaShard = record.GetTxInitiator();
@@ -53,7 +53,7 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) {
read.PlanStep = record.GetPlanStep();
read.TxId = record.GetTxId();
read.PathId = record.GetTableId();
- read.ReadNothing = Self->PathsToDrop.count(read.PathId);
+ read.ReadNothing = !(Self->TablesManager.HasTable(read.PathId));
read.ColumnIds = ProtoToVector<ui32>(record.GetColumnIds());
read.ColumnNames = ProtoToVector<TString>(record.GetColumnNames());
if (read.ColumnIds.empty() && read.ColumnNames.empty()) {
@@ -75,11 +75,11 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) {
}
bool parseResult = ParseProgram(ctx, record.GetOlapProgramType(), record.GetOlapProgram(), read,
- TIndexColumnResolver(Self->PrimaryIndex->GetIndexInfo()));
+ TIndexColumnResolver(Self->TablesManager.GetIndexInfo()));
std::shared_ptr<NOlap::TReadMetadata> metadata;
if (parseResult) {
- metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->PrimaryIndex, Self->BatchCache,
+ metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->TablesManager.GetPrimaryIndex(), Self->BatchCache,
ErrorDescription);
}
diff --git a/ydb/core/tx/columnshard/columnshard__read_base.cpp b/ydb/core/tx/columnshard/columnshard__read_base.cpp
index 53b659cfd45..63c8239a6e7 100644
--- a/ydb/core/tx/columnshard/columnshard__read_base.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read_base.cpp
@@ -192,7 +192,7 @@ bool TTxReadBase::ParseProgram(const TActorContext& ctx, NKikimrSchemeOp::EOlapP
ErrorDescription = TStringBuilder() << "Wrong olap program";
return false;
}
- if (!ssaProgram->Steps.empty() && Self->PrimaryIndex) {
+ if (!ssaProgram->Steps.empty() && Self->TablesManager.HasPrimaryIndex()) {
NSsa::OptimizeProgram(*ssaProgram);
}
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index da8da9ee25e..8c17c5b83b9 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -842,9 +842,9 @@ std::shared_ptr<NOlap::TReadMetadataBase> TTxScan::CreateReadMetadata(const TAct
{
std::shared_ptr<NOlap::TReadMetadataBase> metadata;
if (indexStats) {
- metadata = PrepareStatsReadMetadata(Self->TabletID(), read, Self->PrimaryIndex, ErrorDescription);
+ metadata = PrepareStatsReadMetadata(Self->TabletID(), read, Self->TablesManager.GetPrimaryIndex(), ErrorDescription);
} else {
- metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->PrimaryIndex, Self->BatchCache,
+ metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->TablesManager.GetPrimaryIndex(), Self->BatchCache,
ErrorDescription);
}
@@ -880,7 +880,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) {
read.PlanStep = snapshot.GetStep();
read.TxId = snapshot.GetTxId();
read.PathId = record.GetLocalPathId();
- read.ReadNothing = Self->PathsToDrop.count(read.PathId);
+ read.ReadNothing = !(Self->TablesManager.HasTable(read.PathId));
read.TableName = record.GetTablePath();
bool isIndexStats = read.TableName.EndsWith(NOlap::TIndexInfo::STORE_INDEX_STATS_TABLE) ||
read.TableName.EndsWith(NOlap::TIndexInfo::TABLE_INDEX_STATS_TABLE);
@@ -891,7 +891,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) {
// "SELECT COUNT(*)" requests empty column list but we need non-empty list for PrepareReadMetadata.
// So we add first PK column to the request.
if (!isIndexStats) {
- read.ColumnIds.push_back(Self->PrimaryIndex->GetIndexInfo().GetPKFirstColumnId());
+ read.ColumnIds.push_back(Self->TablesManager.GetIndexInfo().GetPKFirstColumnId());
} else {
read.ColumnIds.push_back(PrimaryIndexStatsSchema.KeyColumns.front());
}
@@ -900,7 +900,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) {
bool parseResult;
if (!isIndexStats) {
- TIndexColumnResolver columnResolver(Self->PrimaryIndex->GetIndexInfo());
+ TIndexColumnResolver columnResolver(Self->TablesManager.GetIndexInfo());
parseResult = ParseProgram(ctx, record.GetOlapProgramType(), record.GetOlapProgram(), read, columnResolver);
} else {
TStatsColumnResolver columnResolver;
@@ -926,7 +926,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) {
auto ydbKey = isIndexStats ?
NOlap::GetColumns(PrimaryIndexStatsSchema, PrimaryIndexStatsSchema.KeyColumns) :
- Self->PrimaryIndex->GetIndexInfo().GetPrimaryKey();
+ Self->TablesManager.GetIndexInfo().GetPrimaryKey();
for (auto& range: record.GetRanges()) {
FillPredicatesFromRange(read, range, ydbKey, Self->TabletID());
@@ -1023,7 +1023,7 @@ void TTxScan::Complete(const TActorContext& ctx) {
if (request.GetReverse()) {
std::reverse(rMetadataRanges.begin(), rMetadataRanges.end());
}
- NOlap::NCosts::TKeyRangesBuilder krBuilder(Self->PrimaryIndex->GetIndexInfo());
+ NOlap::NCosts::TKeyRangesBuilder krBuilder(Self->TablesManager.GetIndexInfo());
{
ui32 recordsCount = 0;
for (auto&& i : rMetadataRanges) {
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 7d42fb25c5e..e27ca500ac5 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -49,7 +49,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
Y_VERIFY(logoBlobId.IsValid());
bool ok = false;
- if (!Self->PrimaryIndex || !Self->IsTableWritable(tableId)) {
+ if (!Self->TablesManager.HasPrimaryIndex() || !Self->TablesManager.IsWritableTable(tableId)) {
status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR;
} else {
if (record.HasLongTxId()) {
@@ -138,15 +138,15 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
TString dedupId = record.GetDedupId();
auto putStatus = ev->Get()->PutStatus;
- bool isWritable = IsTableWritable(tableId);
- bool error = data.empty() || data.size() > TLimits::GetMaxBlobSize() || !PrimaryIndex || !isWritable;
+ bool isWritable = TablesManager.IsWritableTable(tableId);
+ bool error = data.empty() || data.size() > TLimits::GetMaxBlobSize() || !TablesManager.HasPrimaryIndex() || !isWritable;
bool errorReturned = (putStatus != NKikimrProto::OK) && (putStatus != NKikimrProto::UNKNOWN);
bool isOutOfSpace = IsAnyChannelYellowStop();
if (error || errorReturned) {
LOG_S_NOTICE("Write (fail) " << data.size() << " bytes into pathId " << tableId
<< ", status " << putStatus
- << (PrimaryIndex? "": ", no index") << (isWritable? "": ", ro")
+ << (TablesManager.HasPrimaryIndex()? "": ", no index") << (isWritable? "": ", ro")
<< " at tablet " << TabletID());
IncCounter(COUNTER_WRITE_FAIL);
@@ -221,7 +221,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
ev->Get()->MaxSmallBlobSize = Settings.MaxSmallBlobSize;
++WritesInFly; // write started
- ctx.Register(CreateWriteActor(TabletID(), PrimaryIndex->GetIndexInfo(), ctx.SelfID,
+ ctx.Register(CreateWriteActor(TabletID(), TablesManager.GetIndexInfo(), ctx.SelfID,
BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release()));
}
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index 61149c87fa4..8ddfd4e2965 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -43,7 +43,7 @@ private:
bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) {
Y_VERIFY(Ev);
Y_VERIFY(Self->InsertTable);
- Y_VERIFY(Self->PrimaryIndex);
+ Y_VERIFY(Self->TablesManager.HasPrimaryIndex());
txc.DB.NoMoreReadsForTx();
@@ -66,7 +66,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
TBlobGroupSelector dsGroupSelector(Self->Info());
NOlap::TDbWrapper dbWrap(txc.DB, &dsGroupSelector);
- ok = Self->PrimaryIndex->ApplyChanges(dbWrap, changes, snapshot); // update changes + apply
+ ok = Self->TablesManager.MutablePrimaryIndex().ApplyChanges(dbWrap, changes, snapshot); // update changes + apply
if (ok) {
LOG_S_DEBUG("TTxWriteIndex (" << changes->TypeString() << ") apply at tablet " << Self->TabletID());
@@ -248,7 +248,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
Schema::SaveSpecialValue(db, Schema::EValueIds::LastExportNumber, Self->LastExportNo);
}
- Self->PrimaryIndex->FreeLocks(changes);
+ Self->TablesManager.MutablePrimaryIndex().FreeLocks(changes);
if (changes->IsInsert()) {
Self->ActiveIndexingOrCompaction = false;
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 41f3d952b3e..5db1a69b302 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -395,46 +395,6 @@ void TColumnShard::ProtectSchemaSeqNo(const NKikimrTxColumnShard::TSchemaSeqNo&
}
}
-bool TColumnShard::IsTableWritable(ui64 tableId) const {
- auto it = Tables.find(tableId);
- if (it == Tables.end()) {
- return false;
- }
- return !it->second.IsDropped();
-}
-
-ui32 TColumnShard::EnsureSchemaPreset(NIceDb::TNiceDb& db, ui32 presetId, const TString& name,
- const NKikimrSchemeOp::TColumnTableSchema& schemaProto,
- const TRowVersion& version) {
- if (!SchemaPresets.contains(presetId)) {
- LOG_S_DEBUG("EnsureSchemaPreset " << presetId << " at tablet " << TabletID());
-
- auto& preset = SchemaPresets[presetId];
- preset.Id = presetId;
- preset.Name = name;
- auto& info = preset.Versions[version];
- info.SetId(preset.Id);
- info.SetSinceStep(version.Step);
- info.SetSinceTxId(version.TxId);
- *info.MutableSchema() = schemaProto;
-
- Schema::SaveSchemaPresetInfo(db, preset.Id, preset.Name);
- Schema::SaveSchemaPresetVersionInfo(db, preset.Id, version, info);
- SetCounter(COUNTER_TABLE_PRESETS, SchemaPresets.size());
- } else {
- LOG_S_DEBUG("EnsureSchemaPreset for existed preset " << presetId << " at tablet " << TabletID());
- }
-
- return presetId;
-}
-
-ui32 TColumnShard::EnsureSchemaPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto,
- const TRowVersion& version) {
- Y_VERIFY(presetProto.GetName() == "default", "Only schema preset named 'default' is supported");
-
- return EnsureSchemaPreset(db, presetProto.GetId(), presetProto.GetName(), presetProto.GetSchema(), version);
-}
-
void TColumnShard::RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body, const TRowVersion& version,
NTabletFlatExecutor::TTransactionContext& txc) {
switch (body.TxBody_case()) {
@@ -494,70 +454,53 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl
NIceDb::TNiceDb db(txc.DB);
const ui64 pathId = tableProto.GetPathId();
- if (!Tables.contains(pathId)) {
- LOG_S_DEBUG("EnsureTable for pathId: " << pathId
- << " ttl settings: " << tableProto.GetTtlSettings()
- << " at tablet " << TabletID());
-
- ui32 schemaPresetId = 0;
- if (tableProto.HasSchemaPreset()) {
- Y_VERIFY(!tableProto.HasSchema(), "Tables has either schema or preset");
+ if (TablesManager.HasTable(pathId)) {
+ LOG_S_DEBUG("EnsureTable for existed pathId: " << pathId << " at tablet " << TabletID());
+ return;
+ }
- schemaPresetId = EnsureSchemaPreset(db, tableProto.GetSchemaPreset(), version);
- Y_VERIFY(schemaPresetId);
- } else {
- Y_VERIFY(tableProto.HasSchema(), "Tables has either schema or preset");
+ LOG_S_DEBUG("EnsureTable for pathId: " << pathId
+ << " ttl settings: " << tableProto.GetTtlSettings()
+ << " at tablet " << TabletID());
- // Save first table schema as common one with schemaPresetId == 0
+ TTableInfo::TTableVersionInfo tableVerProto;
+ tableVerProto.SetPathId(pathId);
- if (SchemaPresets.count(0)) {
- LOG_S_WARN("Colocated standalone tables are not supported. "
- << "EnsureTable failed at tablet " << TabletID());
- return;
- }
+ if (tableProto.HasSchemaPreset()) {
+ Y_VERIFY(!tableProto.HasSchema(), "Tables has either schema or preset");
- schemaPresetId = EnsureSchemaPreset(db, 0, "", tableProto.GetSchema(), version);
- Y_VERIFY(!schemaPresetId);
+ TSchemaPreset preset;
+ preset.Deserialize(tableProto.GetSchemaPreset());
+ Y_VERIFY(!preset.IsStandaloneTable());
+ tableVerProto.SetSchemaPresetId(preset.GetId());
+
+ if (TablesManager.RegisterSchemaPreset(preset, db)) {
+ TablesManager.AddPresetVersion(tableProto.GetSchemaPreset().GetId(), version, tableProto.GetSchemaPreset().GetSchema(), db);
}
+ } else {
+ Y_VERIFY(tableProto.HasSchema(), "Tables has either schema or preset");
+ *tableVerProto.MutableSchema() = tableProto.GetSchema();
+ }
- auto& table = Tables[pathId];
- table.PathId = pathId;
- auto& tableVerProto = table.Versions[version];
- tableVerProto.SetPathId(pathId);
- tableVerProto.SetSchemaPresetId(schemaPresetId);
-
- if (tableProto.HasTtlSettings()) {
- *tableVerProto.MutableTtlSettings() = tableProto.GetTtlSettings();
- auto& ttlInfo = tableProto.GetTtlSettings();
- if (ttlInfo.HasEnabled()) {
- Ttl.SetPathTtl(pathId, TTtl::TDescription(ttlInfo.GetEnabled()));
- SetCounter(COUNTER_TABLE_TTLS, Ttl.PathsCount());
- }
- if (ttlInfo.HasUseTiering()) {
- table.TieringUsage = ttlInfo.GetUseTiering();
- ActivateTiering(pathId, table.TieringUsage);
- }
+ TTableInfo table(pathId);
+ if (tableProto.HasTtlSettings()) {
+ const auto& ttlSettings = tableProto.GetTtlSettings();
+ *tableVerProto.MutableTtlSettings() = ttlSettings;
+ if (ttlSettings.HasUseTiering()) {
+ table.SetTieringUsage(ttlSettings.GetUseTiering());
+ ActivateTiering(pathId, table.GetTieringUsage());
}
+ }
- if (!PrimaryIndex) {
- TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaHistory;
-
- auto& schemaPresetVerProto = SchemaPresets[schemaPresetId].Versions[version];
- schemaHistory.emplace(NOlap::TSnapshot{version.Step, version.TxId},
- ConvertSchema(schemaPresetVerProto.GetSchema()));
-
- SetPrimaryIndex(std::move(schemaHistory));
- }
+ tableVerProto.SetSchemaPresetVersionAdj(tableProto.GetSchemaPresetVersionAdj());
+ tableVerProto.SetTtlSettingsPresetVersionAdj(tableProto.GetTtlSettingsPresetVersionAdj());
- tableVerProto.SetSchemaPresetVersionAdj(tableProto.GetSchemaPresetVersionAdj());
- tableVerProto.SetTtlSettingsPresetVersionAdj(tableProto.GetTtlSettingsPresetVersionAdj());
+ TablesManager.RegisterTable(std::move(table), db);
+ TablesManager.AddTableVersion(pathId, version, tableVerProto, db);
- Schema::SaveTableInfo(db, table.PathId, table.TieringUsage);
- Schema::SaveTableVersionInfo(db, table.PathId, version, tableVerProto);
- SetCounter(COUNTER_TABLES, Tables.size());
- } else {
- LOG_S_DEBUG("EnsureTable for existed pathId: " << pathId << " at tablet " << TabletID());
- }
+ SetCounter(COUNTER_TABLES, TablesManager.GetTables().size());
+ SetCounter(COUNTER_TABLE_PRESETS, TablesManager.GetSchemaPresets().size());
+ SetCounter(COUNTER_TABLE_TTLS, TablesManager.GetTtl().PathsCount());
}
void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterProto, const TRowVersion& version,
@@ -565,39 +508,33 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP
NIceDb::TNiceDb db(txc.DB);
const ui64 pathId = alterProto.GetPathId();
- auto* tablePtr = Tables.FindPtr(pathId);
- Y_VERIFY(tablePtr && !tablePtr->IsDropped(), "AlterTable on a dropped or non-existent table");
- auto& table = *tablePtr;
- auto& ttlSettings = alterProto.GetTtlSettings();
-
+ Y_VERIFY(TablesManager.HasTable(pathId), "AlterTable on a dropped or non-existent table");
+
LOG_S_DEBUG("AlterTable for pathId: " << pathId
<< " schema: " << alterProto.GetSchema()
- << " ttl settings: " << ttlSettings
+ << " ttl settings: " << alterProto.GetTtlSettings()
<< " at tablet " << TabletID());
- auto& info = table.Versions[version];
-
+ TTableInfo::TTableVersionInfo tableVerProto;
if (alterProto.HasSchemaPreset()) {
- info.SetSchemaPresetId(EnsureSchemaPreset(db, alterProto.GetSchemaPreset(), version));
+ tableVerProto.SetSchemaPresetId(alterProto.GetSchemaPreset().GetId());
+ TablesManager.AddPresetVersion(alterProto.GetSchemaPreset().GetId(), version, alterProto.GetSchemaPreset().GetSchema(), db);
+ } else if (alterProto.HasSchema()) {
+ *tableVerProto.MutableSchema() = alterProto.GetSchema();
}
+ const auto& ttlSettings = alterProto.GetTtlSettings(); // Note: Not valid behaviour for full alter implementation
const TString& tieringUsage = ttlSettings.GetUseTiering();
- ActivateTiering(pathId, tieringUsage);
if (alterProto.HasTtlSettings()) {
- *info.MutableTtlSettings() = ttlSettings;
- if (ttlSettings.HasEnabled()) {
- Ttl.SetPathTtl(pathId, TTtl::TDescription(ttlSettings.GetEnabled()));
- } else {
- Ttl.DropPathTtl(pathId);
- }
- } else {
- Ttl.DropPathTtl(pathId);
+ const auto& ttlSettings = alterProto.GetTtlSettings();
+ *tableVerProto.MutableTtlSettings() = ttlSettings;
}
- Ttl.Repeat(); // Atler TTL triggers TTL activity
+ ActivateTiering(pathId, tieringUsage);
+ Schema::SaveTableInfo(db, pathId, tieringUsage);
- info.SetSchemaPresetVersionAdj(alterProto.GetSchemaPresetVersionAdj());
- Schema::SaveTableInfo(db, table.PathId, tieringUsage);
- Schema::SaveTableVersionInfo(db, table.PathId, version, info);
+ tableVerProto.SetSchemaPresetVersionAdj(alterProto.GetSchemaPresetVersionAdj());
+ TablesManager.AddTableVersion(pathId, version, tableVerProto, db);
+ TablesManager.OnTtlUpdate();
}
void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProto, const TRowVersion& version,
@@ -605,25 +542,20 @@ void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProt
NIceDb::TNiceDb db(txc.DB);
const ui64 pathId = dropProto.GetPathId();
- auto* table = Tables.FindPtr(pathId);
- Y_VERIFY_DEBUG(table && !table->IsDropped());
- if (table && !table->IsDropped()) {
- LOG_S_DEBUG("DropTable for pathId: " << pathId << " at tablet " << TabletID());
-
- PathsToDrop.insert(pathId);
- Ttl.DropPathTtl(pathId);
-
- // TODO: Allow to read old snapshots after DROP
- TBlobGroupSelector dsGroupSelector(Info());
- NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
- THashSet<TWriteId> writesToAbort = InsertTable->DropPath(dbTable, pathId);
- TryAbortWrites(db, dbTable, std::move(writesToAbort));
-
- table->DropVersion = version;
- Schema::SaveTableDropVersion(db, pathId, version.Step, version.TxId);
- } else {
+ if (!TablesManager.HasTable(pathId)) {
LOG_S_DEBUG("DropTable for unknown or deleted pathId: " << pathId << " at tablet " << TabletID());
+ return;
}
+
+ LOG_S_DEBUG("DropTable for pathId: " << pathId << " at tablet " << TabletID());
+ TablesManager.DropTable(pathId, version, db);
+
+ // TODO: Allow to read old snapshots after DROP
+ TBlobGroupSelector dsGroupSelector(Info());
+ NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
+ THashSet<TWriteId> writesToAbort = InsertTable->DropPath(dbTable, pathId);
+
+ TryAbortWrites(db, dbTable, std::move(writesToAbort));
}
void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, const TRowVersion& version,
@@ -635,54 +567,18 @@ void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto,
Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPathId, OwnerPathId);
}
- TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaHistory;
-
for (ui32 id : proto.GetDroppedSchemaPresets()) {
- if (!SchemaPresets.contains(id)) {
+ if (!TablesManager.HasPreset(id)) {
continue;
}
- auto& preset = SchemaPresets.at(id);
- Y_VERIFY(preset.Name != "default", "Cannot drop the default preset");
- preset.DropVersion = version;
- Schema::SaveSchemaPresetDropVersion(db, id, version);
+ TablesManager.DropPreset(id, version, db);
}
for (const auto& presetProto : proto.GetSchemaPresets()) {
- if (!SchemaPresets.contains(presetProto.GetId())) {
+ if (!TablesManager.HasPreset(presetProto.GetId())) {
continue; // we don't update presets that we don't use
}
-
- auto& preset = SchemaPresets[presetProto.GetId()];
- auto& info = preset.Versions[version];
- info.SetId(preset.Id);
- info.SetSinceStep(version.Step);
- info.SetSinceTxId(version.TxId);
- *info.MutableSchema() = presetProto.GetSchema();
-
- if (preset.Name == "default") {
- schemaHistory.emplace(NOlap::TSnapshot{version.Step, version.TxId}, ConvertSchema(info.GetSchema()));
- }
-
- Schema::SaveSchemaPresetVersionInfo(db, preset.Id, version, info);
- }
-
- if (!schemaHistory.empty()) {
- SetPrimaryIndex(std::move(schemaHistory));
- }
-}
-
-void TColumnShard::SetPrimaryIndex(TMap<NOlap::TSnapshot, NOlap::TIndexInfo>&& schemaVersions) {
- for (auto& [snap, indexInfo] : schemaVersions) {
- if (!PrimaryIndex) {
- PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(std::move(indexInfo), TabletID());
- SetCounter(COUNTER_INDEXES, 1);
- } else {
- PrimaryIndex->UpdateDefaultSchema(snap, std::move(indexInfo));
- }
- }
-
- for (auto& columnName : Ttl.TtlColumns()) {
- PrimaryIndex->GetIndexInfo().CheckTtlColumn(columnName);
+ TablesManager.AddPresetVersion(presetProto.GetId(), version, presetProto.GetSchema(), db);
}
}
@@ -764,7 +660,7 @@ std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() {
LOG_S_DEBUG("Indexing/compaction already in progress at tablet " << TabletID());
return {};
}
- if (!PrimaryIndex) {
+ if (!TablesManager.HasPrimaryIndex()) {
LOG_S_NOTICE("Indexing not started: no index at tablet " << TabletID());
return {};
}
@@ -785,7 +681,7 @@ std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() {
if (bytesToIndex && (bytesToIndex + dataSize) > (ui64)Limits.MaxInsertBytes) {
continue;
}
- if (auto* pMap = PrimaryIndex->GetOverloadedGranules(data.PathId)) {
+ if (auto* pMap = TablesManager.GetPrimaryIndexSafe().GetOverloadedGranules(data.PathId)) {
overloadedPathGranules[pathId] = pMap->size();
InsertTable->SetOverloaded(data.PathId, true);
++ignored;
@@ -833,13 +729,13 @@ std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() {
}
Y_VERIFY(data.size());
- auto indexChanges = PrimaryIndex->StartInsert(std::move(data));
+ auto indexChanges = TablesManager.MutablePrimaryIndex().StartInsert(std::move(data));
if (!indexChanges) {
LOG_S_NOTICE("Cannot prepare indexing at tablet " << TabletID());
return {};
}
- auto actualIndexInfo = PrimaryIndex->GetIndexInfo();
+ auto actualIndexInfo = TablesManager.GetIndexInfo();
if (Tiers) {
auto pathTiering = Tiers->GetTiering(); // TODO: pathIds
actualIndexInfo.UpdatePathTiering(pathTiering);
@@ -857,13 +753,13 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() {
LOG_S_DEBUG("Compaction/indexing already in progress at tablet " << TabletID());
return {};
}
- if (!PrimaryIndex) {
+ if (!TablesManager.HasPrimaryIndex()) {
LOG_S_NOTICE("Compaction not started: no index at tablet " << TabletID());
return {};
}
- PrimaryIndex->UpdateCompactionLimits(CompactionLimits.Get());
- auto compactionInfo = PrimaryIndex->Compact(LastCompactedGranule);
+ TablesManager.MutablePrimaryIndex().UpdateCompactionLimits(CompactionLimits.Get());
+ auto compactionInfo = TablesManager.MutablePrimaryIndex().Compact(LastCompactedGranule);
if (!compactionInfo || compactionInfo->Empty()) {
LOG_S_DEBUG("Compaction not started: no portions to compact at tablet " << TabletID());
return {};
@@ -874,13 +770,13 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() {
LOG_S_DEBUG("Prepare " << *compactionInfo << " at tablet " << TabletID());
ui64 ourdatedStep = GetOutdatedStep();
- auto indexChanges = PrimaryIndex->StartCompaction(std::move(compactionInfo), {ourdatedStep, 0});
+ auto indexChanges = TablesManager.MutablePrimaryIndex().StartCompaction(std::move(compactionInfo), {ourdatedStep, 0});
if (!indexChanges) {
LOG_S_DEBUG("Compaction not started: cannot prepare compaction at tablet " << TabletID());
return {};
}
- auto actualIndexInfo = PrimaryIndex->GetIndexInfo();
+ auto actualIndexInfo = TablesManager.GetIndexInfo();
if (Tiers) {
auto pathTiering = Tiers->GetTiering(); // TODO: pathIds
actualIndexInfo.UpdatePathTiering(pathTiering);
@@ -903,7 +799,7 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
LOG_S_DEBUG("Do not start TTL while eviction is in progress at tablet " << TabletID());
return {};
}
- if (!PrimaryIndex) {
+ if (!TablesManager.HasPrimaryIndex()) {
LOG_S_NOTICE("TTL not started. No index for TTL at tablet " << TabletID());
return {};
}
@@ -913,7 +809,7 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
if (Tiers) {
eviction = Tiers->GetTiering(); // TODO: pathIds
}
- Ttl.AddTtls(eviction, TInstant::Now(), force);
+ TablesManager.AddTtls(eviction, TInstant::Now(), force);
}
if (eviction.empty()) {
@@ -927,11 +823,11 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
LOG_S_DEBUG("Evicting path " << i.first << " with " << i.second.GetDebugString() << " at tablet " << TabletID());
}
- auto actualIndexInfo = PrimaryIndex->GetIndexInfo();
+ auto actualIndexInfo = TablesManager.GetIndexInfo();
actualIndexInfo.UpdatePathTiering(eviction);
std::shared_ptr<NOlap::TColumnEngineChanges> indexChanges;
- indexChanges = PrimaryIndex->StartTtl(eviction);
+ indexChanges = TablesManager.MutablePrimaryIndex().StartTtl(eviction);
actualIndexInfo.SetPathTiering(std::move(eviction));
@@ -940,7 +836,7 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
return {};
}
if (indexChanges->NeedRepeat) {
- Ttl.Repeat();
+ TablesManager.OnTtlUpdate();
}
bool needWrites = !indexChanges->PortionsToEvict.empty();
@@ -955,14 +851,14 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() {
LOG_S_DEBUG("Cleanup already in progress at tablet " << TabletID());
return {};
}
- if (!PrimaryIndex) {
+ if (!TablesManager.HasPrimaryIndex()) {
LOG_S_NOTICE("Cleanup not started. No index for cleanup at tablet " << TabletID());
return {};
}
NOlap::TSnapshot cleanupSnapshot{GetMinReadStep(), 0};
- auto changes = PrimaryIndex->StartCleanup(cleanupSnapshot, PathsToDrop, TLimits::MAX_TX_RECORDS);
+ auto changes = TablesManager.StartIndexCleanup(cleanupSnapshot, TLimits::MAX_TX_RECORDS);
if (!changes) {
LOG_S_NOTICE("Cannot prepare cleanup at tablet " << TabletID());
return {};
@@ -995,7 +891,7 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() {
return {};
}
- auto actualIndexInfo = PrimaryIndex->GetIndexInfo();
+ auto actualIndexInfo = TablesManager.GetIndexInfo();
#if 0 // No need for now
if (Tiers) {
...
@@ -1009,33 +905,6 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() {
return ev;
}
-NOlap::TIndexInfo TColumnShard::ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema) {
- Y_VERIFY(schema.GetEngine() == NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES);
-
- ui32 indexId = 0;
- NOlap::TIndexInfo indexInfo("", indexId);
-
- for (const auto& col : schema.GetColumns()) {
- const ui32 id = col.GetId();
- const TString& name = col.GetName();
- auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(col.GetTypeId(),
- col.HasTypeInfo() ? &col.GetTypeInfo() : nullptr);
- indexInfo.Columns[id] = NTable::TColumn(name, id, typeInfoMod.TypeInfo, typeInfoMod.TypeMod);
- indexInfo.ColumnNames[name] = id;
- }
-
- for (const auto& keyName : schema.GetKeyColumnNames()) {
- Y_VERIFY(indexInfo.ColumnNames.count(keyName));
- indexInfo.KeyColumns.push_back(indexInfo.ColumnNames[keyName]);
- }
-
- if (schema.HasDefaultCompression()) {
- NOlap::TCompression compression = NTiers::ConvertCompression(schema.GetDefaultCompression());
- indexInfo.SetDefaultCompression(compression);
- }
-
- return indexInfo;
-}
void TColumnShard::MapExternBlobs(const TActorContext& /*ctx*/, NOlap::TReadMetadata& metadata) {
if (!metadata.SelectInfo) {
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 3283e97440e..383d51a3592 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -5,6 +5,7 @@
#include "columnshard_ttl.h"
#include "columnshard_private_events.h"
#include "blob_manager.h"
+#include "tables_manager.h"
#include "inflight_request_tracker.h"
#include <ydb/core/tablet/tablet_counters.h>
@@ -331,31 +332,8 @@ private:
}
};
- struct TSchemaPreset {
- using TSchemaPresetVersionInfo = NKikimrTxColumnShard::TSchemaPresetVersionInfo;
-
- ui32 Id;
- TString Name;
- TMap<TRowVersion, TSchemaPresetVersionInfo> Versions;
- TRowVersion DropVersion = TRowVersion::Max();
-
- bool IsDropped() const {
- return DropVersion != TRowVersion::Max();
- }
- };
-
- struct TTableInfo {
- using TTableVersionInfo = NKikimrTxColumnShard::TTableVersionInfo;
-
- ui64 PathId;
- std::map<TRowVersion, TTableVersionInfo> Versions;
- TRowVersion DropVersion = TRowVersion::Max();
- TString TieringUsage;
-
- bool IsDropped() const {
- return DropVersion != TRowVersion::Max();
- }
- };
+ using TSchemaPreset = TSchemaPreset;
+ using TTableInfo = TTableInfo;
struct TLongTxWriteInfo {
ui64 WriteId;
@@ -364,6 +342,8 @@ private:
ui64 PreparedTxId = 0;
};
+ TTablesManager TablesManager;
+
ui64 CurrentSchemeShardId = 0;
TMessageSeqNo LastSchemaSeqNo;
std::optional<NKikimrSubDomains::TProcessingParams> ProcessingParams;
@@ -401,9 +381,7 @@ private:
TTabletCountersBase* TabletCounters;
std::unique_ptr<NTabletPipe::IClientCache> PipeClientCache;
std::unique_ptr<NOlap::TInsertTable> InsertTable;
- std::unique_ptr<NOlap::IColumnEngine> PrimaryIndex;
TBatchCache BatchCache;
- TTtl Ttl;
THashMap<ui64, TBasicTxInfo> BasicTxInfo;
TSet<TDeadlineQueueItem> DeadlineQueue;
@@ -413,14 +391,11 @@ private:
THashMap<ui64, TInstant> ScanTxInFlight;
THashMap<ui64, TAlterMeta> AltersInFlight;
THashMap<ui64, TCommitMeta> CommitsInFlight; // key is TxId from propose
- THashMap<ui32, TSchemaPreset> SchemaPresets;
- THashMap<ui64, TTableInfo> Tables;
THashMap<TWriteId, TLongTxWriteInfo> LongTxWrites;
using TPartsForLTXShard = THashMap<ui32, TLongTxWriteInfo*>;
THashMap<TULID, TPartsForLTXShard> LongTxWritesByUniqueId;
TMultiMap<TRowVersion, TEvColumnShard::TEvRead::TPtr> WaitingReads;
TMultiMap<TRowVersion, TEvColumnShard::TEvScan::TPtr> WaitingScans;
- THashSet<ui64> PathsToDrop;
bool ActiveIndexingOrCompaction = false;
bool ActiveCleanup = false;
bool ActiveTtl = false;
@@ -455,7 +430,7 @@ private:
}
bool IndexOverloaded() const {
- return PrimaryIndex && PrimaryIndex->HasOverloadedGranules();
+ return TablesManager.IndexOverloaded();
}
TWriteId HasLongTxWrite(const NLongTxService::TLongTxId& longTxId, const ui32 partId);
@@ -472,22 +447,13 @@ private:
void UpdateSchemaSeqNo(const TMessageSeqNo& seqNo, NTabletFlatExecutor::TTransactionContext& txc);
void ProtectSchemaSeqNo(const NKikimrTxColumnShard::TSchemaSeqNo& seqNoProto, NTabletFlatExecutor::TTransactionContext& txc);
- bool IsTableWritable(ui64 tableId) const;
-
- ui32 EnsureSchemaPreset(NIceDb::TNiceDb& db, ui32 presetId, const TString& name,
- const NKikimrSchemeOp::TColumnTableSchema& schemaProto, const TRowVersion& version);
- ui32 EnsureSchemaPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, const TRowVersion& version);
- //ui32 EnsureTtlSettingsPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableTtlSettingsPreset& presetProto, const TRowVersion& version);
-
void RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc);
void RunInit(const NKikimrTxColumnShard::TInitShard& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc);
void RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc);
void RunAlterTable(const NKikimrTxColumnShard::TAlterTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc);
void RunDropTable(const NKikimrTxColumnShard::TDropTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc);
void RunAlterStore(const NKikimrTxColumnShard::TAlterStore& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc);
- void SetPrimaryIndex(TMap<NOlap::TSnapshot, NOlap::TIndexInfo>&& schemaVersions);
- NOlap::TIndexInfo ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema);
void MapExternBlobs(const TActorContext& ctx, NOlap::TReadMetadata& metadata);
TActorId GetS3ActorForTier(const TString& tierId) const;
void ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName, ui64 pathId,
diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp
new file mode 100644
index 00000000000..cbcf0abe8c3
--- /dev/null
+++ b/ydb/core/tx/columnshard/tables_manager.cpp
@@ -0,0 +1,324 @@
+#include "tables_manager.h"
+#include "columnshard_schema.h"
+#include "blob_manager_db.h"
+#include "engines/column_engine_logs.h"
+#include <ydb/core/scheme/scheme_types_proto.h>
+#include <ydb/core/tx/tiering/manager.h>
+
+
+
+namespace NKikimr::NColumnShard {
+
+void TSchemaPreset::Deserialize(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto) {
+ Id = presetProto.GetId();
+ Name = presetProto.GetName();
+}
+
+bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db, const ui64 tabletId) {
+ TabletId = tabletId;
+ {
+ auto rowset = db.Table<Schema::TableInfo>().Select();
+ if (!rowset.IsReady()) {
+ return false;
+ }
+
+ while (!rowset.EndOfSet()) {
+ TTableInfo table;
+ if (!table.InitFromDB(rowset)) {
+ return false;
+ }
+ if (table.IsDropped()) {
+ PathsToDrop.insert(table.GetPathId());
+ }
+ Tables.insert_or_assign(table.GetPathId(), std::move(table));
+
+ if (!rowset.Next()) {
+ return false;
+ }
+ }
+ }
+
+ bool isFakePresetOnly = true;
+ {
+ auto rowset = db.Table<Schema::SchemaPresetInfo>().Select();
+ if (!rowset.IsReady()) {
+ return false;
+ }
+
+ while (!rowset.EndOfSet()) {
+ TSchemaPreset preset;
+ preset.InitFromDB(rowset);
+
+ if (preset.IsStandaloneTable()) {
+ Y_VERIFY_S(!preset.GetName(), "Preset name: " + preset.GetName());
+ } else {
+ Y_VERIFY_S(preset.GetName() == "default", "Preset name: " + preset.GetName());
+ isFakePresetOnly = false;
+ }
+ SchemaPresets.insert_or_assign(preset.GetId(), preset);
+ if (!rowset.Next()) {
+ return false;
+ }
+ }
+ }
+
+ {
+ auto rowset = db.Table<Schema::TableVersionInfo>().Select();
+ if (!rowset.IsReady()) {
+ return false;
+ }
+
+ THashMap<ui64, TRowVersion> lastVersion;
+ while (!rowset.EndOfSet()) {
+ const ui64 pathId = rowset.GetValue<Schema::TableVersionInfo::PathId>();
+ Y_VERIFY(Tables.contains(pathId));
+ TRowVersion version(
+ rowset.GetValue<Schema::TableVersionInfo::SinceStep>(),
+ rowset.GetValue<Schema::TableVersionInfo::SinceTxId>());
+
+ auto& table = Tables.at(pathId);
+ TTableInfo::TTableVersionInfo versionInfo;
+ Y_VERIFY(versionInfo.ParseFromString(rowset.GetValue<Schema::TableVersionInfo::InfoProto>()));
+ Y_VERIFY(SchemaPresets.contains(versionInfo.GetSchemaPresetId()));
+
+ if (!table.IsDropped()) {
+ auto& ttlSettings = versionInfo.GetTtlSettings();
+ if (ttlSettings.HasEnabled()) {
+ if (!lastVersion.contains(pathId) || lastVersion[pathId] < version) {
+ TTtl::TDescription description(ttlSettings.GetEnabled());
+ Ttl.SetPathTtl(pathId, std::move(description));
+ lastVersion[pathId] = version;
+ }
+ }
+ }
+ table.AddVersion(version, versionInfo);
+ if (!rowset.Next()) {
+ return false;
+ }
+ }
+ }
+
+ {
+ auto rowset = db.Table<Schema::SchemaPresetVersionInfo>().Select();
+ if (!rowset.IsReady()) {
+ return false;
+ }
+
+ while (!rowset.EndOfSet()) {
+ const ui32 id = rowset.GetValue<Schema::SchemaPresetVersionInfo::Id>();
+ Y_VERIFY(SchemaPresets.contains(id));
+ auto& preset = SchemaPresets.at(id);
+ TRowVersion version(
+ rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceStep>(),
+ rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceTxId>());
+
+ TSchemaPreset::TSchemaPresetVersionInfo info;
+ Y_VERIFY(info.ParseFromString(rowset.GetValue<Schema::SchemaPresetVersionInfo::InfoProto>()));
+ preset.AddVersion(version, info);
+ if (!rowset.Next()) {
+ return false;
+ }
+ }
+ }
+
+ if (isFakePresetOnly) {
+ Y_VERIFY(Tables.size() <= 1);
+ if (!Tables.empty()) {
+ for (const auto& [version, tableInfo] : Tables.begin()->second.GetVersions()) {
+ if (tableInfo.HasSchema()) {
+ IndexSchemaVersion(version, tableInfo.GetSchema());
+ }
+ }
+ }
+ } else {
+ for (const auto& [id, preset] : SchemaPresets) {
+ Y_VERIFY(!!id);
+ for (const auto& [version, schemaInfo] : preset.GetVersions()) {
+ if (schemaInfo.HasSchema()) {
+ IndexSchemaVersion(version, schemaInfo.GetSchema());
+ }
+ }
+ }
+ }
+ return true;
+}
+
+bool TTablesManager::LoadIndex(NOlap::TDbWrapper& idxDB, THashSet<NOlap::TUnifiedBlobId>& lostEvictions) {
+ if (PrimaryIndex) {
+ if (!PrimaryIndex->Load(idxDB, lostEvictions, PathsToDrop)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+void TTablesManager::Clear() {
+ Tables.clear();
+ SchemaPresets.clear();
+ PathsToDrop.clear();
+}
+
+bool TTablesManager::HasTable(const ui64 pathId) const {
+ auto it = Tables.find(pathId);
+ if (it == Tables.end() || it->second.IsDropped()) {
+ return false;
+ }
+ return true;
+}
+
+bool TTablesManager::IsWritableTable(const ui64 pathId) const {
+ return HasTable(pathId);
+}
+
+bool TTablesManager::HasPreset(const ui32 presetId) const {
+ return SchemaPresets.contains(presetId);
+}
+
+const TTableInfo& TTablesManager::GetTable(const ui64 pathId) const {
+ Y_VERIFY(HasTable(pathId));
+ return Tables.at(pathId);
+}
+
+ui64 TTablesManager::GetMemoryUsage() const {
+ ui64 memory =
+ Tables.size() * sizeof(TTableInfo) +
+ PathsToDrop.size() * sizeof(ui64) +
+ Ttl.PathsCount() * sizeof(TTtl::TDescription) +
+ SchemaPresets.size() * sizeof(TSchemaPreset);
+ if (PrimaryIndex) {
+ memory += PrimaryIndex->MemoryUsage();
+ }
+ return memory;
+}
+
+void TTablesManager::OnTtlUpdate() {
+ Ttl.Repeat();
+}
+
+void TTablesManager::DropTable(const ui64 pathId, const TRowVersion& version, NIceDb::TNiceDb& db) {
+ auto& table = Tables.at(pathId);
+ table.SetDropVersion(version);
+ PathsToDrop.insert(pathId);
+ Ttl.DropPathTtl(pathId);
+ Schema::SaveTableDropVersion(db, pathId, version.Step, version.TxId);
+}
+
+void TTablesManager::DropPreset(const ui32 presetId, const TRowVersion& version, NIceDb::TNiceDb& db) {
+ auto& preset = SchemaPresets.at(presetId);
+ Y_VERIFY(preset.GetName() != "default", "Cannot drop the default preset");
+ preset.SetDropVersion(version);
+ Schema::SaveSchemaPresetDropVersion(db, presetId, version);
+}
+
+void TTablesManager::RegisterTable(TTableInfo&& table, NIceDb::TNiceDb& db) {
+ Y_VERIFY(!HasTable(table.GetPathId()));
+ Y_VERIFY(table.IsEmpty());
+
+ Schema::SaveTableInfo(db, table.GetPathId(), table.GetTieringUsage());
+ Tables.insert_or_assign(table.GetPathId(), std::move(table));
+}
+
+bool TTablesManager::RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIceDb::TNiceDb& db) {
+ if (SchemaPresets.contains(schemaPreset.GetId())) {
+ return false;
+ }
+ Schema::SaveSchemaPresetInfo(db, schemaPreset.GetId(), schemaPreset.GetName());
+ SchemaPresets.insert_or_assign(schemaPreset.GetId(), schemaPreset);
+ return true;
+}
+
+void TTablesManager::AddPresetVersion(const ui32 presetId, const TRowVersion& version, const TTableSchema& schema, NIceDb::TNiceDb& db) {
+ Y_VERIFY(SchemaPresets.contains(presetId));
+ TSchemaPreset::TSchemaPresetVersionInfo versionInfo;
+ versionInfo.SetId(presetId);
+ versionInfo.SetSinceStep(version.Step);
+ versionInfo.SetSinceTxId(version.TxId);
+ *versionInfo.MutableSchema() = schema;
+
+ auto& schemaPreset = SchemaPresets.at(presetId);
+ Schema::SaveSchemaPresetVersionInfo(db, presetId, version, versionInfo);
+ schemaPreset.AddVersion(version, versionInfo);
+ if (presetId != 0 && versionInfo.HasSchema()) {
+ IndexSchemaVersion(version, versionInfo.GetSchema());
+ }
+}
+
+void TTablesManager::AddTableVersion(const ui64 pathId, const TRowVersion& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db) {
+ auto& table = Tables.at(pathId);
+
+ if (table.IsEmpty()) {
+ if (versionInfo.HasSchemaPresetId()) {
+ Y_VERIFY(SchemaPresets.contains(versionInfo.GetSchemaPresetId()));
+ } else {
+ Y_VERIFY(SchemaPresets.empty());
+ TSchemaPreset preset;
+ Y_VERIFY(RegisterSchemaPreset(preset, db));
+ AddPresetVersion(preset.GetId(), version, versionInfo.GetSchema(), db);
+ }
+ }
+ Y_VERIFY(SchemaPresets.contains(versionInfo.GetSchemaPresetId()));
+
+ if (versionInfo.HasTtlSettings()) {
+ const auto& ttlSettings = versionInfo.GetTtlSettings();
+ if (ttlSettings.HasEnabled()) {
+ Ttl.SetPathTtl(pathId, TTtl::TDescription(ttlSettings.GetEnabled()));
+ } else {
+ Ttl.DropPathTtl(pathId);
+ }
+ }
+
+ Schema::SaveTableVersionInfo(db, pathId, version, versionInfo);
+ table.AddVersion(version, versionInfo);
+ if (versionInfo.HasSchema()) {
+ IndexSchemaVersion(version, versionInfo.GetSchema());
+ }
+}
+
+void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const TTableSchema& schema) {
+ NOlap::TSnapshot snapshot{version.Step, version.TxId};
+ NOlap::TIndexInfo indexInfo = ConvertSchema(schema);
+
+ if (!PrimaryIndex) {
+ PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(std::move(indexInfo), TabletId);
+ } else {
+ PrimaryIndex->UpdateDefaultSchema(snapshot, std::move(indexInfo));
+ }
+
+ for (auto& columnName : Ttl.TtlColumns()) {
+ PrimaryIndex->GetIndexInfo().CheckTtlColumn(columnName);
+ }
+}
+
+std::shared_ptr<NOlap::TColumnEngineChanges> TTablesManager::StartIndexCleanup(const NOlap::TSnapshot& snapshot, ui32 maxRecords) {
+ Y_VERIFY(PrimaryIndex);
+ return PrimaryIndex->StartCleanup(snapshot, PathsToDrop, maxRecords);
+}
+
+NOlap::TIndexInfo TTablesManager::ConvertSchema(const TTableSchema& schema) {
+ Y_VERIFY(schema.GetEngine() == NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES);
+
+ ui32 indexId = 0;
+ NOlap::TIndexInfo indexInfo("", indexId);
+
+ for (const auto& col : schema.GetColumns()) {
+ const ui32 id = col.GetId();
+ const TString& name = col.GetName();
+ auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(col.GetTypeId(),
+ col.HasTypeInfo() ? &col.GetTypeInfo() : nullptr);
+ indexInfo.Columns[id] = NTable::TColumn(name, id, typeInfoMod.TypeInfo, typeInfoMod.TypeMod);
+ indexInfo.ColumnNames[name] = id;
+ }
+
+ for (const auto& keyName : schema.GetKeyColumnNames()) {
+ Y_VERIFY(indexInfo.ColumnNames.count(keyName));
+ indexInfo.KeyColumns.push_back(indexInfo.ColumnNames[keyName]);
+ }
+
+ if (schema.HasDefaultCompression()) {
+ NOlap::TCompression compression = NTiers::ConvertCompression(schema.GetDefaultCompression());
+ indexInfo.SetDefaultCompression(compression);
+ }
+
+ return indexInfo;
+}
+}
diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h
new file mode 100644
index 00000000000..423221195b7
--- /dev/null
+++ b/ydb/core/tx/columnshard/tables_manager.h
@@ -0,0 +1,221 @@
+#pragma once
+
+#include "columnshard_schema.h"
+#include "columnshard_ttl.h"
+#include "engines/column_engine.h"
+
+#include "ydb/core/base/row_version.h"
+#include "ydb/library/accessor/accessor.h"
+#include "ydb/core/protos/tx_columnshard.pb.h"
+
+
+namespace NKikimr::NColumnShard {
+
+template<class TSchemaProto>
+class TVersionedSchema {
+protected:
+ std::optional<TRowVersion> DropVersion;
+ TMap<TRowVersion, TSchemaProto> Versions;
+
+public:
+ bool IsDropped() const {
+ return DropVersion.has_value();
+ }
+
+ bool IsEmpty() const {
+ return Versions.empty();
+ }
+
+ void SetDropVersion(const TRowVersion& version) {
+ DropVersion = version;
+ }
+
+ const TMap<TRowVersion, TSchemaProto>& GetVersions() const {
+ return Versions;
+ }
+
+ const TSchemaProto& GetVersion(const TRowVersion& version) const {
+ const TSchemaProto* result = nullptr;
+ for (auto ver : Versions) {
+ if (ver.first > version) {
+ break;
+ }
+ result = &ver.second;
+ }
+ Y_VERIFY(!!result);
+ return *result;
+ }
+
+ void AddVersion(const TRowVersion& version, const TSchemaProto& versionInfo) {
+ Versions[version] = versionInfo;
+ }
+};
+
+using TTableSchema = NKikimrSchemeOp::TColumnTableSchema;
+
+class TSchemaPreset : public TVersionedSchema<NKikimrTxColumnShard::TSchemaPresetVersionInfo> {
+public:
+ using TSchemaPresetVersionInfo = NKikimrTxColumnShard::TSchemaPresetVersionInfo;
+ ui32 Id = 0;
+ TString Name;
+public:
+ bool IsStandaloneTable() const {
+ return Id == 0;
+ }
+
+ const TString& GetName() const {
+ return Name;
+ }
+
+ ui32 GetId() const {
+ return Id;
+ }
+
+ void Deserialize(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto);
+
+ template <class TRow>
+ bool InitFromDB(const TRow& rowset) {
+ Id = rowset.template GetValue<Schema::SchemaPresetInfo::Id>();
+ if (!IsStandaloneTable()) {
+ Name = rowset.template GetValue<Schema::SchemaPresetInfo::Name>();
+ }
+ Y_VERIFY(!Id || Name == "default", "Unsupported preset at load time");
+
+ if (rowset.template HaveValue<Schema::SchemaPresetInfo::DropStep>() &&
+ rowset.template HaveValue<Schema::SchemaPresetInfo::DropTxId>())
+ {
+ DropVersion.emplace();
+ DropVersion->Step = rowset.template GetValue<Schema::SchemaPresetInfo::DropStep>();
+ DropVersion->TxId = rowset.template GetValue<Schema::SchemaPresetInfo::DropTxId>();
+ }
+ return true;
+ }
+};
+
+class TTableInfo : public TVersionedSchema<NKikimrTxColumnShard::TTableVersionInfo> {
+public:
+ using TTableVersionInfo = NKikimrTxColumnShard::TTableVersionInfo;
+ ui64 PathId;
+ TString TieringUsage;
+
+public:
+ const TString& GetTieringUsage() const {
+ return TieringUsage;
+ }
+
+ TTableInfo& SetTieringUsage(const TString& data) {
+ TieringUsage = data;
+ return *this;
+ }
+
+ ui64 GetPathId() const {
+ return PathId;
+ }
+
+ TTableInfo() = default;
+
+ TTableInfo(const ui64 pathId)
+ : PathId(pathId)
+ {}
+
+ template <class TRow>
+ bool InitFromDB(const TRow& rowset) {
+ PathId = rowset.template GetValue<Schema::TableInfo::PathId>();
+ TieringUsage = rowset.template GetValue<Schema::TableInfo::TieringUsage>();
+ if (rowset.template HaveValue<Schema::TableInfo::DropStep>() && rowset.template HaveValue<Schema::TableInfo::DropTxId>()) {
+ DropVersion.emplace();
+ DropVersion->Step = rowset.template GetValue<Schema::TableInfo::DropStep>();
+ DropVersion->TxId = rowset.template GetValue<Schema::TableInfo::DropTxId>();
+ }
+ return true;
+ }
+};
+
+class TTablesManager {
+private:
+ THashMap<ui64, TTableInfo> Tables;
+ THashMap<ui32, TSchemaPreset> SchemaPresets;
+ THashSet<ui64> PathsToDrop;
+ TTtl Ttl;
+ std::unique_ptr<NOlap::IColumnEngine> PrimaryIndex;
+ ui64 TabletId;
+public:
+ const TTtl& GetTtl() const {
+ return Ttl;
+ }
+
+ void AddTtls(THashMap<ui64, NOlap::TTiering>& eviction, TInstant now, bool force) {
+ Ttl.AddTtls(eviction, now, force);
+ }
+
+ const THashSet<ui64>& GetPathsToDrop() const {
+ return PathsToDrop;
+ }
+
+ const THashMap<ui64, TTableInfo>& GetTables() const {
+ return Tables;
+ }
+
+ const THashMap<ui32, TSchemaPreset>& GetSchemaPresets() const {
+ return SchemaPresets;
+ }
+
+ bool IndexOverloaded() const {
+ return PrimaryIndex && PrimaryIndex->HasOverloadedGranules();
+ }
+
+ bool HasPrimaryIndex() const {
+ return !!PrimaryIndex;
+ }
+
+ NOlap::IColumnEngine& MutablePrimaryIndex() {
+ Y_VERIFY(!!PrimaryIndex);
+ return *PrimaryIndex;
+ }
+
+ const NOlap::TIndexInfo& GetIndexInfo(const NOlap::TSnapshot& version = {}) const {
+ Y_UNUSED(version);
+ Y_VERIFY(!!PrimaryIndex);
+ return PrimaryIndex->GetIndexInfo();
+ }
+
+ const std::unique_ptr<NOlap::IColumnEngine>& GetPrimaryIndex() const {
+ return PrimaryIndex;
+ }
+
+ const NOlap::IColumnEngine& GetPrimaryIndexSafe() const {
+ Y_VERIFY(!!PrimaryIndex);
+ return *PrimaryIndex;
+ }
+
+ bool InitFromDB(NIceDb::TNiceDb& db, const ui64 tabletId);
+ bool LoadIndex(NOlap::TDbWrapper& db, THashSet<NOlap::TUnifiedBlobId>& lostEvictions);
+
+ void Clear();
+
+ const TTableInfo& GetTable(const ui64 pathId) const;
+ ui64 GetMemoryUsage() const;
+
+ bool HasTable(const ui64 pathId) const;
+ bool IsWritableTable(const ui64 pathId) const;
+ bool HasPreset(const ui32 presetId) const;
+
+ void DropTable(const ui64 pathId, const TRowVersion& version, NIceDb::TNiceDb& db);
+ void DropPreset(const ui32 presetId, const TRowVersion& version, NIceDb::TNiceDb& db);
+
+ void RegisterTable(TTableInfo&& table, NIceDb::TNiceDb& db);
+ bool RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIceDb::TNiceDb& db);
+
+ void AddPresetVersion(const ui32 presetId, const TRowVersion& version, const TTableSchema& schema, NIceDb::TNiceDb& db);
+ void AddTableVersion(const ui64 pathId, const TRowVersion& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db);
+
+ void OnTtlUpdate();
+
+ std::shared_ptr<NOlap::TColumnEngineChanges> StartIndexCleanup(const NOlap::TSnapshot& snapshot, ui32 maxRecords);
+
+private:
+ void IndexSchemaVersion(const TRowVersion& version, const TTableSchema& schema);
+ static NOlap::TIndexInfo ConvertSchema(const TTableSchema& schema);
+};
+
+}