aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornsofya <nsofya@yandex-team.com>2023-04-06 13:36:33 +0300
committernsofya <nsofya@yandex-team.com>2023-04-06 13:36:33 +0300
commit1081f842b292b5ac2aad05bb887c47956bbcc45c (patch)
treecca95d9800e0a68987de5bb708a4553d05b37bf7
parent4065b7ff9ca3eb0fde79193508385c0afc42f2c4 (diff)
downloadydb-1081f842b292b5ac2aad05bb887c47956bbcc45c.tar.gz
temporary revert rXXXXXX commit due to standalone tables scheme problem
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/columnshard.cpp22
-rw-r--r--ydb/core/tx/columnshard/columnshard__init.cpp142
-rw-r--r--ydb/core/tx/columnshard/columnshard__progress_tx.cpp3
-rw-r--r--ydb/core/tx/columnshard/columnshard__propose_transaction.cpp4
-rw-r--r--ydb/core/tx/columnshard/columnshard__read.cpp10
-rw-r--r--ydb/core/tx/columnshard/columnshard__read_base.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp14
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp10
-rw-r--r--ydb/core/tx/columnshard/columnshard__write_index.cpp6
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp303
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h46
-rw-r--r--ydb/core/tx/columnshard/tables_manager.cpp324
-rw-r--r--ydb/core/tx/columnshard/tables_manager.h221
17 files changed, 431 insertions, 680 deletions
diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
index 28261168eb2..a81d192131f 100644
--- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
@@ -74,7 +74,6 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/tables_manager.cpp
)
generate_enum_serilization(core-tx-columnshard
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard.h
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
index 66d5ee15570..77e8b4de116 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
@@ -75,7 +75,6 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/tables_manager.cpp
)
generate_enum_serilization(core-tx-columnshard
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard.h
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
index 66d5ee15570..77e8b4de116 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
@@ -75,7 +75,6 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/tables_manager.cpp
)
generate_enum_serilization(core-tx-columnshard
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard.h
diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
index 28261168eb2..a81d192131f 100644
--- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
@@ -74,7 +74,6 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/tables_manager.cpp
)
generate_enum_serilization(core-tx-columnshard
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard.h
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index 20033b28c87..4fa8d2fd8fd 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -29,8 +29,8 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) {
IndexingActor = ctx.Register(CreateIndexingActor(TabletID(), ctx.SelfID));
CompactionActor = ctx.Register(CreateCompactionActor(TabletID(), ctx.SelfID));
EvictionActor = ctx.Register(CreateEvictionActor(TabletID(), ctx.SelfID));
- for (auto&& i : TablesManager.GetTables()) {
- ActivateTiering(i.first, i.second.GetTieringUsage());
+ for (auto&& i : Tables) {
+ ActivateTiering(i.first, i.second.TieringUsage);
}
SignalTabletActive(ctx);
}
@@ -193,11 +193,11 @@ void TColumnShard::UpdateInsertTableCounters() {
}
void TColumnShard::UpdateIndexCounters() {
- if (!TablesManager.HasPrimaryIndex()) {
+ if (!PrimaryIndex) {
return;
}
- auto& stats = TablesManager.MutablePrimaryIndex().GetTotalStats();
+ auto& stats = PrimaryIndex->GetTotalStats();
SetCounter(COUNTER_INDEX_TABLES, stats.Tables);
SetCounter(COUNTER_INDEX_GRANULES, stats.Granules);
SetCounter(COUNTER_INDEX_EMPTY_GRANULES, stats.EmptyGranules);
@@ -243,6 +243,10 @@ void TColumnShard::UpdateIndexCounters() {
ui64 TColumnShard::MemoryUsage() const {
ui64 memory =
+ Tables.size() * sizeof(TTableInfo) +
+ PathsToDrop.size() * sizeof(ui64) +
+ Ttl.PathsCount() * sizeof(TTtl::TDescription) +
+ SchemaPresets.size() * sizeof(TSchemaPreset) +
BasicTxInfo.size() * sizeof(TBasicTxInfo) +
DeadlineQueue.size() * sizeof(TDeadlineQueueItem) +
(PlanQueue.size() + RunningQueue.size()) * sizeof(TPlanQueueItem) +
@@ -254,7 +258,9 @@ ui64 TColumnShard::MemoryUsage() const {
(WaitingReads.size() + WaitingScans.size()) * (sizeof(TRowVersion) + sizeof(void*)) +
TabletCounters->Simple()[COUNTER_PREPARED_RECORDS].Get() * sizeof(NOlap::TInsertedData) +
TabletCounters->Simple()[COUNTER_COMMITTED_RECORDS].Get() * sizeof(NOlap::TInsertedData);
- memory += TablesManager.GetMemoryUsage();
+ if (PrimaryIndex) {
+ memory += PrimaryIndex->MemoryUsage();
+ }
memory += BatchCache.Bytes();
return memory;
}
@@ -323,9 +329,9 @@ void TColumnShard::SendPeriodicStats() {
tabletStats->SetTxRejectedBySpace(TabletCounters->Cumulative()[COUNTER_OUT_OF_SPACE].Get());
tabletStats->SetInFlightTxCount(Executor()->GetStats().TxInFly);
- if (TablesManager.HasPrimaryIndex()) {
- const auto& indexStats = TablesManager.MutablePrimaryIndex().GetTotalStats();
- NOlap::TSnapshot lastIndexUpdate = TablesManager.GetPrimaryIndexSafe().LastUpdate();
+ if (PrimaryIndex) {
+ const auto& indexStats = PrimaryIndex->GetTotalStats();
+ NOlap::TSnapshot lastIndexUpdate = PrimaryIndex->LastUpdate();
auto activeIndexStats = indexStats.Active(); // data stats excluding inactive and evicted
tabletStats->SetRowCount(activeIndexStats.Rows);
diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp
index d3cc84cf09f..23a25c8d83d 100644
--- a/ydb/core/tx/columnshard/columnshard__init.cpp
+++ b/ydb/core/tx/columnshard/columnshard__init.cpp
@@ -41,7 +41,8 @@ void TTxInit::SetDefaults() {
Self->PlanQueue.clear();
Self->AltersInFlight.clear();
Self->CommitsInFlight.clear();
- Self->TablesManager.Clear();
+ Self->SchemaPresets.clear();
+ Self->Tables.clear();
Self->LongTxWrites.clear();
Self->LongTxWritesByUniqueId.clear();
}
@@ -136,10 +137,132 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
}
}
- Self->TablesManager.InitFromDB(db, Self->TabletID());
- Self->SetCounter(COUNTER_TABLES, Self->TablesManager.GetTables().size());
- Self->SetCounter(COUNTER_TABLE_PRESETS, Self->TablesManager.GetSchemaPresets().size());
- Self->SetCounter(COUNTER_TABLE_TTLS, Self->TablesManager.GetTtl().PathsCount());
+ // Primary index defaut schema and TTL (both are versioned)
+ TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaPreset;
+ TMap<NOlap::TSnapshot, NOlap::TIndexInfo> commonSchema;
+ THashMap<ui64, TMap<TRowVersion, TTtl::TDescription>> ttls;
+
+ { // Load schema presets
+ auto rowset = db.Table<Schema::SchemaPresetInfo>().Select();
+ if (!rowset.IsReady())
+ return false;
+
+ while (!rowset.EndOfSet()) {
+ const ui32 id = rowset.GetValue<Schema::SchemaPresetInfo::Id>();
+ auto& preset = Self->SchemaPresets[id];
+ preset.Id = id;
+ if (id) {
+ preset.Name = rowset.GetValue<Schema::SchemaPresetInfo::Name>();
+ }
+ Y_VERIFY(!id || preset.Name == "default", "Unsupported preset at load time");
+
+ if (rowset.HaveValue<Schema::SchemaPresetInfo::DropStep>() &&
+ rowset.HaveValue<Schema::SchemaPresetInfo::DropTxId>())
+ {
+ preset.DropVersion.Step = rowset.GetValue<Schema::SchemaPresetInfo::DropStep>();
+ preset.DropVersion.TxId = rowset.GetValue<Schema::SchemaPresetInfo::DropTxId>();
+ }
+
+ if (!rowset.Next())
+ return false;
+ }
+ }
+
+ { // Load schema preset versions
+ auto rowset = db.Table<Schema::SchemaPresetVersionInfo>().Select();
+ if (!rowset.IsReady())
+ return false;
+
+ while (!rowset.EndOfSet()) {
+ const ui32 id = rowset.GetValue<Schema::SchemaPresetVersionInfo::Id>();
+ Y_VERIFY(Self->SchemaPresets.contains(id));
+ auto& preset = Self->SchemaPresets.at(id);
+ TRowVersion version(
+ rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceStep>(),
+ rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceTxId>());
+ auto& info = preset.Versions[version];
+ Y_VERIFY(info.ParseFromString(rowset.GetValue<Schema::SchemaPresetVersionInfo::InfoProto>()));
+
+ NOlap::TSnapshot snap{version.Step, version.TxId};
+ if (!id) {
+ commonSchema.emplace(snap, Self->ConvertSchema(info.GetSchema()));
+ } else if (preset.Name == "default") {
+ schemaPreset.emplace(snap, Self->ConvertSchema(info.GetSchema()));
+ }
+
+ if (!rowset.Next())
+ return false;
+ }
+ }
+
+ { // Load tables
+ auto rowset = db.Table<Schema::TableInfo>().Select();
+ if (!rowset.IsReady())
+ return false;
+
+ while (!rowset.EndOfSet()) {
+ const ui64 pathId = rowset.GetValue<Schema::TableInfo::PathId>();
+ auto& table = Self->Tables[pathId];
+ table.PathId = pathId;
+ table.TieringUsage = rowset.GetValue<Schema::TableInfo::TieringUsage>();
+ if (rowset.HaveValue<Schema::TableInfo::DropStep>() &&
+ rowset.HaveValue<Schema::TableInfo::DropTxId>())
+ {
+ table.DropVersion.Step = rowset.GetValue<Schema::TableInfo::DropStep>();
+ table.DropVersion.TxId = rowset.GetValue<Schema::TableInfo::DropTxId>();
+ Self->PathsToDrop.insert(pathId);
+ }
+
+ if (!rowset.Next())
+ return false;
+ }
+ }
+
+ { // Load table versions
+ auto rowset = db.Table<Schema::TableVersionInfo>().Select();
+ if (!rowset.IsReady())
+ return false;
+
+ while (!rowset.EndOfSet()) {
+ const ui64 pathId = rowset.GetValue<Schema::TableVersionInfo::PathId>();
+ Y_VERIFY(Self->Tables.contains(pathId));
+ auto& table = Self->Tables.at(pathId);
+ TRowVersion version(
+ rowset.GetValue<Schema::TableVersionInfo::SinceStep>(),
+ rowset.GetValue<Schema::TableVersionInfo::SinceTxId>());
+ auto& info = table.Versions[version];
+ Y_VERIFY(info.ParseFromString(rowset.GetValue<Schema::TableVersionInfo::InfoProto>()));
+
+ if (!Self->PathsToDrop.count(pathId)) {
+ auto& ttlSettings = info.GetTtlSettings();
+ if (ttlSettings.HasEnabled()) {
+ ttls[pathId].emplace(version, TTtl::TDescription(ttlSettings.GetEnabled()));
+ }
+ }
+
+ if (!rowset.Next())
+ return false;
+ }
+ }
+
+ for (auto& [pathId, map] : ttls) {
+ auto& description = map.rbegin()->second; // last version if many
+ Self->Ttl.SetPathTtl(pathId, std::move(description));
+ }
+
+ Self->SetCounter(COUNTER_TABLES, Self->Tables.size());
+ Self->SetCounter(COUNTER_TABLE_PRESETS, Self->SchemaPresets.size());
+ Self->SetCounter(COUNTER_TABLE_TTLS, ttls.size());
+
+ if (!schemaPreset.empty()) {
+ Y_VERIFY(commonSchema.empty(), "Mix of schema preset and common schema");
+ Self->SetPrimaryIndex(std::move(schemaPreset));
+ } else if (!commonSchema.empty()) {
+ Self->SetPrimaryIndex(std::move(commonSchema));
+ } else {
+ Y_VERIFY(Self->Tables.empty());
+ Y_VERIFY(Self->SchemaPresets.empty());
+ }
{ // Load long tx writes
auto rowset = db.Table<Schema::LongTxWrites>().Select();
@@ -187,8 +310,13 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
}
}
- if (!Self->TablesManager.LoadIndex(dbTable, lostEvictions)) {
- return false;
+ // Load primary index
+ if (Self->PrimaryIndex) {
+ TBlobGroupSelector dsGroupSelector(Self->Info());
+ NOlap::TDbWrapper idxDB(txc.DB, &dsGroupSelector);
+ if (!Self->PrimaryIndex->Load(idxDB, lostEvictions, Self->PathsToDrop)) {
+ return false;
+ }
}
// Set dropped evicting records to be erased in future cleanups
diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
index 2ab3ab2a8e8..beaa2f7b770 100644
--- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
+++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
@@ -142,7 +142,8 @@ public:
}
auto pathExists = [&](ui64 pathId) {
- return Self->TablesManager.HasTable(pathId);
+ auto it = Self->Tables.find(pathId);
+ return it != Self->Tables.end() && !it->second.IsDropped();
};
auto counters = Self->InsertTable->Commit(dbTable, step, txId, meta.MetaShard, meta.WriteIds,
diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
index c6a9274b20c..de3bf81635b 100644
--- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
+++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
@@ -276,13 +276,13 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex
break;
}
- if (!Self->TablesManager.HasPrimaryIndex()) {
+ if (!Self->PrimaryIndex) {
statusMessage = "No primary index for TTL";
status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR;
break;
}
- auto schema = Self->TablesManager.GetIndexInfo().ArrowSchema();
+ auto schema = Self->PrimaryIndex->GetIndexInfo().ArrowSchema();
auto ttlColumn = schema->GetFieldByName(columnName);
if (!ttlColumn) {
statusMessage = "TTL tx wrong TTL column '" + columnName + "'";
diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp
index 3857498fe19..8fe9ba771a8 100644
--- a/ydb/core/tx/columnshard/columnshard__read.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read.cpp
@@ -38,13 +38,13 @@ private:
bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) {
Y_VERIFY(Ev);
- Y_VERIFY(Self->TablesManager.HasPrimaryIndex());
+ Y_VERIFY(Self->PrimaryIndex);
Y_UNUSED(txc);
LOG_S_DEBUG("TTxRead.Execute at tablet " << Self->TabletID());
txc.DB.NoMoreReadsForTx();
- const NOlap::TIndexInfo& indexInfo = Self->TablesManager.GetIndexInfo();
+ const NOlap::TIndexInfo& indexInfo = Self->PrimaryIndex->GetIndexInfo();
auto& record = Proto(Ev->Get());
ui64 metaShard = record.GetTxInitiator();
@@ -53,7 +53,7 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) {
read.PlanStep = record.GetPlanStep();
read.TxId = record.GetTxId();
read.PathId = record.GetTableId();
- read.ReadNothing = !(Self->TablesManager.HasTable(read.PathId));
+ read.ReadNothing = Self->PathsToDrop.count(read.PathId);
read.ColumnIds = ProtoToVector<ui32>(record.GetColumnIds());
read.ColumnNames = ProtoToVector<TString>(record.GetColumnNames());
if (read.ColumnIds.empty() && read.ColumnNames.empty()) {
@@ -75,11 +75,11 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) {
}
bool parseResult = ParseProgram(ctx, record.GetOlapProgramType(), record.GetOlapProgram(), read,
- TIndexColumnResolver(Self->TablesManager.GetIndexInfo()));
+ TIndexColumnResolver(Self->PrimaryIndex->GetIndexInfo()));
std::shared_ptr<NOlap::TReadMetadata> metadata;
if (parseResult) {
- metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->TablesManager.GetPrimaryIndex(), Self->BatchCache,
+ metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->PrimaryIndex, Self->BatchCache,
ErrorDescription);
}
diff --git a/ydb/core/tx/columnshard/columnshard__read_base.cpp b/ydb/core/tx/columnshard/columnshard__read_base.cpp
index 63c8239a6e7..53b659cfd45 100644
--- a/ydb/core/tx/columnshard/columnshard__read_base.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read_base.cpp
@@ -192,7 +192,7 @@ bool TTxReadBase::ParseProgram(const TActorContext& ctx, NKikimrSchemeOp::EOlapP
ErrorDescription = TStringBuilder() << "Wrong olap program";
return false;
}
- if (!ssaProgram->Steps.empty() && Self->TablesManager.HasPrimaryIndex()) {
+ if (!ssaProgram->Steps.empty() && Self->PrimaryIndex) {
NSsa::OptimizeProgram(*ssaProgram);
}
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 8c17c5b83b9..da8da9ee25e 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -842,9 +842,9 @@ std::shared_ptr<NOlap::TReadMetadataBase> TTxScan::CreateReadMetadata(const TAct
{
std::shared_ptr<NOlap::TReadMetadataBase> metadata;
if (indexStats) {
- metadata = PrepareStatsReadMetadata(Self->TabletID(), read, Self->TablesManager.GetPrimaryIndex(), ErrorDescription);
+ metadata = PrepareStatsReadMetadata(Self->TabletID(), read, Self->PrimaryIndex, ErrorDescription);
} else {
- metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->TablesManager.GetPrimaryIndex(), Self->BatchCache,
+ metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->PrimaryIndex, Self->BatchCache,
ErrorDescription);
}
@@ -880,7 +880,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) {
read.PlanStep = snapshot.GetStep();
read.TxId = snapshot.GetTxId();
read.PathId = record.GetLocalPathId();
- read.ReadNothing = !(Self->TablesManager.HasTable(read.PathId));
+ read.ReadNothing = Self->PathsToDrop.count(read.PathId);
read.TableName = record.GetTablePath();
bool isIndexStats = read.TableName.EndsWith(NOlap::TIndexInfo::STORE_INDEX_STATS_TABLE) ||
read.TableName.EndsWith(NOlap::TIndexInfo::TABLE_INDEX_STATS_TABLE);
@@ -891,7 +891,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) {
// "SELECT COUNT(*)" requests empty column list but we need non-empty list for PrepareReadMetadata.
// So we add first PK column to the request.
if (!isIndexStats) {
- read.ColumnIds.push_back(Self->TablesManager.GetIndexInfo().GetPKFirstColumnId());
+ read.ColumnIds.push_back(Self->PrimaryIndex->GetIndexInfo().GetPKFirstColumnId());
} else {
read.ColumnIds.push_back(PrimaryIndexStatsSchema.KeyColumns.front());
}
@@ -900,7 +900,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) {
bool parseResult;
if (!isIndexStats) {
- TIndexColumnResolver columnResolver(Self->TablesManager.GetIndexInfo());
+ TIndexColumnResolver columnResolver(Self->PrimaryIndex->GetIndexInfo());
parseResult = ParseProgram(ctx, record.GetOlapProgramType(), record.GetOlapProgram(), read, columnResolver);
} else {
TStatsColumnResolver columnResolver;
@@ -926,7 +926,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) {
auto ydbKey = isIndexStats ?
NOlap::GetColumns(PrimaryIndexStatsSchema, PrimaryIndexStatsSchema.KeyColumns) :
- Self->TablesManager.GetIndexInfo().GetPrimaryKey();
+ Self->PrimaryIndex->GetIndexInfo().GetPrimaryKey();
for (auto& range: record.GetRanges()) {
FillPredicatesFromRange(read, range, ydbKey, Self->TabletID());
@@ -1023,7 +1023,7 @@ void TTxScan::Complete(const TActorContext& ctx) {
if (request.GetReverse()) {
std::reverse(rMetadataRanges.begin(), rMetadataRanges.end());
}
- NOlap::NCosts::TKeyRangesBuilder krBuilder(Self->TablesManager.GetIndexInfo());
+ NOlap::NCosts::TKeyRangesBuilder krBuilder(Self->PrimaryIndex->GetIndexInfo());
{
ui32 recordsCount = 0;
for (auto&& i : rMetadataRanges) {
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index e27ca500ac5..7d42fb25c5e 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -49,7 +49,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
Y_VERIFY(logoBlobId.IsValid());
bool ok = false;
- if (!Self->TablesManager.HasPrimaryIndex() || !Self->TablesManager.IsWritableTable(tableId)) {
+ if (!Self->PrimaryIndex || !Self->IsTableWritable(tableId)) {
status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR;
} else {
if (record.HasLongTxId()) {
@@ -138,15 +138,15 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
TString dedupId = record.GetDedupId();
auto putStatus = ev->Get()->PutStatus;
- bool isWritable = TablesManager.IsWritableTable(tableId);
- bool error = data.empty() || data.size() > TLimits::GetMaxBlobSize() || !TablesManager.HasPrimaryIndex() || !isWritable;
+ bool isWritable = IsTableWritable(tableId);
+ bool error = data.empty() || data.size() > TLimits::GetMaxBlobSize() || !PrimaryIndex || !isWritable;
bool errorReturned = (putStatus != NKikimrProto::OK) && (putStatus != NKikimrProto::UNKNOWN);
bool isOutOfSpace = IsAnyChannelYellowStop();
if (error || errorReturned) {
LOG_S_NOTICE("Write (fail) " << data.size() << " bytes into pathId " << tableId
<< ", status " << putStatus
- << (TablesManager.HasPrimaryIndex()? "": ", no index") << (isWritable? "": ", ro")
+ << (PrimaryIndex? "": ", no index") << (isWritable? "": ", ro")
<< " at tablet " << TabletID());
IncCounter(COUNTER_WRITE_FAIL);
@@ -221,7 +221,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
ev->Get()->MaxSmallBlobSize = Settings.MaxSmallBlobSize;
++WritesInFly; // write started
- ctx.Register(CreateWriteActor(TabletID(), TablesManager.GetIndexInfo(), ctx.SelfID,
+ ctx.Register(CreateWriteActor(TabletID(), PrimaryIndex->GetIndexInfo(), ctx.SelfID,
BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release()));
}
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index 8ddfd4e2965..61149c87fa4 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -43,7 +43,7 @@ private:
bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) {
Y_VERIFY(Ev);
Y_VERIFY(Self->InsertTable);
- Y_VERIFY(Self->TablesManager.HasPrimaryIndex());
+ Y_VERIFY(Self->PrimaryIndex);
txc.DB.NoMoreReadsForTx();
@@ -66,7 +66,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
TBlobGroupSelector dsGroupSelector(Self->Info());
NOlap::TDbWrapper dbWrap(txc.DB, &dsGroupSelector);
- ok = Self->TablesManager.MutablePrimaryIndex().ApplyChanges(dbWrap, changes, snapshot); // update changes + apply
+ ok = Self->PrimaryIndex->ApplyChanges(dbWrap, changes, snapshot); // update changes + apply
if (ok) {
LOG_S_DEBUG("TTxWriteIndex (" << changes->TypeString() << ") apply at tablet " << Self->TabletID());
@@ -248,7 +248,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
Schema::SaveSpecialValue(db, Schema::EValueIds::LastExportNumber, Self->LastExportNo);
}
- Self->TablesManager.MutablePrimaryIndex().FreeLocks(changes);
+ Self->PrimaryIndex->FreeLocks(changes);
if (changes->IsInsert()) {
Self->ActiveIndexingOrCompaction = false;
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 5db1a69b302..41f3d952b3e 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -395,6 +395,46 @@ void TColumnShard::ProtectSchemaSeqNo(const NKikimrTxColumnShard::TSchemaSeqNo&
}
}
+bool TColumnShard::IsTableWritable(ui64 tableId) const {
+ auto it = Tables.find(tableId);
+ if (it == Tables.end()) {
+ return false;
+ }
+ return !it->second.IsDropped();
+}
+
+ui32 TColumnShard::EnsureSchemaPreset(NIceDb::TNiceDb& db, ui32 presetId, const TString& name,
+ const NKikimrSchemeOp::TColumnTableSchema& schemaProto,
+ const TRowVersion& version) {
+ if (!SchemaPresets.contains(presetId)) {
+ LOG_S_DEBUG("EnsureSchemaPreset " << presetId << " at tablet " << TabletID());
+
+ auto& preset = SchemaPresets[presetId];
+ preset.Id = presetId;
+ preset.Name = name;
+ auto& info = preset.Versions[version];
+ info.SetId(preset.Id);
+ info.SetSinceStep(version.Step);
+ info.SetSinceTxId(version.TxId);
+ *info.MutableSchema() = schemaProto;
+
+ Schema::SaveSchemaPresetInfo(db, preset.Id, preset.Name);
+ Schema::SaveSchemaPresetVersionInfo(db, preset.Id, version, info);
+ SetCounter(COUNTER_TABLE_PRESETS, SchemaPresets.size());
+ } else {
+ LOG_S_DEBUG("EnsureSchemaPreset for existed preset " << presetId << " at tablet " << TabletID());
+ }
+
+ return presetId;
+}
+
+ui32 TColumnShard::EnsureSchemaPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto,
+ const TRowVersion& version) {
+ Y_VERIFY(presetProto.GetName() == "default", "Only schema preset named 'default' is supported");
+
+ return EnsureSchemaPreset(db, presetProto.GetId(), presetProto.GetName(), presetProto.GetSchema(), version);
+}
+
void TColumnShard::RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body, const TRowVersion& version,
NTabletFlatExecutor::TTransactionContext& txc) {
switch (body.TxBody_case()) {
@@ -454,53 +494,70 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl
NIceDb::TNiceDb db(txc.DB);
const ui64 pathId = tableProto.GetPathId();
- if (TablesManager.HasTable(pathId)) {
- LOG_S_DEBUG("EnsureTable for existed pathId: " << pathId << " at tablet " << TabletID());
- return;
- }
+ if (!Tables.contains(pathId)) {
+ LOG_S_DEBUG("EnsureTable for pathId: " << pathId
+ << " ttl settings: " << tableProto.GetTtlSettings()
+ << " at tablet " << TabletID());
- LOG_S_DEBUG("EnsureTable for pathId: " << pathId
- << " ttl settings: " << tableProto.GetTtlSettings()
- << " at tablet " << TabletID());
+ ui32 schemaPresetId = 0;
+ if (tableProto.HasSchemaPreset()) {
+ Y_VERIFY(!tableProto.HasSchema(), "Tables has either schema or preset");
- TTableInfo::TTableVersionInfo tableVerProto;
- tableVerProto.SetPathId(pathId);
+ schemaPresetId = EnsureSchemaPreset(db, tableProto.GetSchemaPreset(), version);
+ Y_VERIFY(schemaPresetId);
+ } else {
+ Y_VERIFY(tableProto.HasSchema(), "Tables has either schema or preset");
- if (tableProto.HasSchemaPreset()) {
- Y_VERIFY(!tableProto.HasSchema(), "Tables has either schema or preset");
+ // Save first table schema as common one with schemaPresetId == 0
- TSchemaPreset preset;
- preset.Deserialize(tableProto.GetSchemaPreset());
- Y_VERIFY(!preset.IsStandaloneTable());
- tableVerProto.SetSchemaPresetId(preset.GetId());
-
- if (TablesManager.RegisterSchemaPreset(preset, db)) {
- TablesManager.AddPresetVersion(tableProto.GetSchemaPreset().GetId(), version, tableProto.GetSchemaPreset().GetSchema(), db);
+ if (SchemaPresets.count(0)) {
+ LOG_S_WARN("Colocated standalone tables are not supported. "
+ << "EnsureTable failed at tablet " << TabletID());
+ return;
+ }
+
+ schemaPresetId = EnsureSchemaPreset(db, 0, "", tableProto.GetSchema(), version);
+ Y_VERIFY(!schemaPresetId);
}
- } else {
- Y_VERIFY(tableProto.HasSchema(), "Tables has either schema or preset");
- *tableVerProto.MutableSchema() = tableProto.GetSchema();
- }
- TTableInfo table(pathId);
- if (tableProto.HasTtlSettings()) {
- const auto& ttlSettings = tableProto.GetTtlSettings();
- *tableVerProto.MutableTtlSettings() = ttlSettings;
- if (ttlSettings.HasUseTiering()) {
- table.SetTieringUsage(ttlSettings.GetUseTiering());
- ActivateTiering(pathId, table.GetTieringUsage());
+ auto& table = Tables[pathId];
+ table.PathId = pathId;
+ auto& tableVerProto = table.Versions[version];
+ tableVerProto.SetPathId(pathId);
+ tableVerProto.SetSchemaPresetId(schemaPresetId);
+
+ if (tableProto.HasTtlSettings()) {
+ *tableVerProto.MutableTtlSettings() = tableProto.GetTtlSettings();
+ auto& ttlInfo = tableProto.GetTtlSettings();
+ if (ttlInfo.HasEnabled()) {
+ Ttl.SetPathTtl(pathId, TTtl::TDescription(ttlInfo.GetEnabled()));
+ SetCounter(COUNTER_TABLE_TTLS, Ttl.PathsCount());
+ }
+ if (ttlInfo.HasUseTiering()) {
+ table.TieringUsage = ttlInfo.GetUseTiering();
+ ActivateTiering(pathId, table.TieringUsage);
+ }
}
- }
- tableVerProto.SetSchemaPresetVersionAdj(tableProto.GetSchemaPresetVersionAdj());
- tableVerProto.SetTtlSettingsPresetVersionAdj(tableProto.GetTtlSettingsPresetVersionAdj());
+ if (!PrimaryIndex) {
+ TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaHistory;
+
+ auto& schemaPresetVerProto = SchemaPresets[schemaPresetId].Versions[version];
+ schemaHistory.emplace(NOlap::TSnapshot{version.Step, version.TxId},
+ ConvertSchema(schemaPresetVerProto.GetSchema()));
- TablesManager.RegisterTable(std::move(table), db);
- TablesManager.AddTableVersion(pathId, version, tableVerProto, db);
+ SetPrimaryIndex(std::move(schemaHistory));
+ }
+
+ tableVerProto.SetSchemaPresetVersionAdj(tableProto.GetSchemaPresetVersionAdj());
+ tableVerProto.SetTtlSettingsPresetVersionAdj(tableProto.GetTtlSettingsPresetVersionAdj());
- SetCounter(COUNTER_TABLES, TablesManager.GetTables().size());
- SetCounter(COUNTER_TABLE_PRESETS, TablesManager.GetSchemaPresets().size());
- SetCounter(COUNTER_TABLE_TTLS, TablesManager.GetTtl().PathsCount());
+ Schema::SaveTableInfo(db, table.PathId, table.TieringUsage);
+ Schema::SaveTableVersionInfo(db, table.PathId, version, tableVerProto);
+ SetCounter(COUNTER_TABLES, Tables.size());
+ } else {
+ LOG_S_DEBUG("EnsureTable for existed pathId: " << pathId << " at tablet " << TabletID());
+ }
}
void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterProto, const TRowVersion& version,
@@ -508,33 +565,39 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP
NIceDb::TNiceDb db(txc.DB);
const ui64 pathId = alterProto.GetPathId();
- Y_VERIFY(TablesManager.HasTable(pathId), "AlterTable on a dropped or non-existent table");
-
+ auto* tablePtr = Tables.FindPtr(pathId);
+ Y_VERIFY(tablePtr && !tablePtr->IsDropped(), "AlterTable on a dropped or non-existent table");
+ auto& table = *tablePtr;
+ auto& ttlSettings = alterProto.GetTtlSettings();
+
LOG_S_DEBUG("AlterTable for pathId: " << pathId
<< " schema: " << alterProto.GetSchema()
- << " ttl settings: " << alterProto.GetTtlSettings()
+ << " ttl settings: " << ttlSettings
<< " at tablet " << TabletID());
- TTableInfo::TTableVersionInfo tableVerProto;
+ auto& info = table.Versions[version];
+
if (alterProto.HasSchemaPreset()) {
- tableVerProto.SetSchemaPresetId(alterProto.GetSchemaPreset().GetId());
- TablesManager.AddPresetVersion(alterProto.GetSchemaPreset().GetId(), version, alterProto.GetSchemaPreset().GetSchema(), db);
- } else if (alterProto.HasSchema()) {
- *tableVerProto.MutableSchema() = alterProto.GetSchema();
+ info.SetSchemaPresetId(EnsureSchemaPreset(db, alterProto.GetSchemaPreset(), version));
}
- const auto& ttlSettings = alterProto.GetTtlSettings(); // Note: Not valid behaviour for full alter implementation
const TString& tieringUsage = ttlSettings.GetUseTiering();
+ ActivateTiering(pathId, tieringUsage);
if (alterProto.HasTtlSettings()) {
- const auto& ttlSettings = alterProto.GetTtlSettings();
- *tableVerProto.MutableTtlSettings() = ttlSettings;
+ *info.MutableTtlSettings() = ttlSettings;
+ if (ttlSettings.HasEnabled()) {
+ Ttl.SetPathTtl(pathId, TTtl::TDescription(ttlSettings.GetEnabled()));
+ } else {
+ Ttl.DropPathTtl(pathId);
+ }
+ } else {
+ Ttl.DropPathTtl(pathId);
}
- ActivateTiering(pathId, tieringUsage);
- Schema::SaveTableInfo(db, pathId, tieringUsage);
+ Ttl.Repeat(); // Atler TTL triggers TTL activity
- tableVerProto.SetSchemaPresetVersionAdj(alterProto.GetSchemaPresetVersionAdj());
- TablesManager.AddTableVersion(pathId, version, tableVerProto, db);
- TablesManager.OnTtlUpdate();
+ info.SetSchemaPresetVersionAdj(alterProto.GetSchemaPresetVersionAdj());
+ Schema::SaveTableInfo(db, table.PathId, tieringUsage);
+ Schema::SaveTableVersionInfo(db, table.PathId, version, info);
}
void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProto, const TRowVersion& version,
@@ -542,20 +605,25 @@ void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProt
NIceDb::TNiceDb db(txc.DB);
const ui64 pathId = dropProto.GetPathId();
- if (!TablesManager.HasTable(pathId)) {
+ auto* table = Tables.FindPtr(pathId);
+ Y_VERIFY_DEBUG(table && !table->IsDropped());
+ if (table && !table->IsDropped()) {
+ LOG_S_DEBUG("DropTable for pathId: " << pathId << " at tablet " << TabletID());
+
+ PathsToDrop.insert(pathId);
+ Ttl.DropPathTtl(pathId);
+
+ // TODO: Allow to read old snapshots after DROP
+ TBlobGroupSelector dsGroupSelector(Info());
+ NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
+ THashSet<TWriteId> writesToAbort = InsertTable->DropPath(dbTable, pathId);
+ TryAbortWrites(db, dbTable, std::move(writesToAbort));
+
+ table->DropVersion = version;
+ Schema::SaveTableDropVersion(db, pathId, version.Step, version.TxId);
+ } else {
LOG_S_DEBUG("DropTable for unknown or deleted pathId: " << pathId << " at tablet " << TabletID());
- return;
}
-
- LOG_S_DEBUG("DropTable for pathId: " << pathId << " at tablet " << TabletID());
- TablesManager.DropTable(pathId, version, db);
-
- // TODO: Allow to read old snapshots after DROP
- TBlobGroupSelector dsGroupSelector(Info());
- NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
- THashSet<TWriteId> writesToAbort = InsertTable->DropPath(dbTable, pathId);
-
- TryAbortWrites(db, dbTable, std::move(writesToAbort));
}
void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, const TRowVersion& version,
@@ -567,18 +635,54 @@ void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto,
Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPathId, OwnerPathId);
}
+ TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaHistory;
+
for (ui32 id : proto.GetDroppedSchemaPresets()) {
- if (!TablesManager.HasPreset(id)) {
+ if (!SchemaPresets.contains(id)) {
continue;
}
- TablesManager.DropPreset(id, version, db);
+ auto& preset = SchemaPresets.at(id);
+ Y_VERIFY(preset.Name != "default", "Cannot drop the default preset");
+ preset.DropVersion = version;
+ Schema::SaveSchemaPresetDropVersion(db, id, version);
}
for (const auto& presetProto : proto.GetSchemaPresets()) {
- if (!TablesManager.HasPreset(presetProto.GetId())) {
+ if (!SchemaPresets.contains(presetProto.GetId())) {
continue; // we don't update presets that we don't use
}
- TablesManager.AddPresetVersion(presetProto.GetId(), version, presetProto.GetSchema(), db);
+
+ auto& preset = SchemaPresets[presetProto.GetId()];
+ auto& info = preset.Versions[version];
+ info.SetId(preset.Id);
+ info.SetSinceStep(version.Step);
+ info.SetSinceTxId(version.TxId);
+ *info.MutableSchema() = presetProto.GetSchema();
+
+ if (preset.Name == "default") {
+ schemaHistory.emplace(NOlap::TSnapshot{version.Step, version.TxId}, ConvertSchema(info.GetSchema()));
+ }
+
+ Schema::SaveSchemaPresetVersionInfo(db, preset.Id, version, info);
+ }
+
+ if (!schemaHistory.empty()) {
+ SetPrimaryIndex(std::move(schemaHistory));
+ }
+}
+
+void TColumnShard::SetPrimaryIndex(TMap<NOlap::TSnapshot, NOlap::TIndexInfo>&& schemaVersions) {
+ for (auto& [snap, indexInfo] : schemaVersions) {
+ if (!PrimaryIndex) {
+ PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(std::move(indexInfo), TabletID());
+ SetCounter(COUNTER_INDEXES, 1);
+ } else {
+ PrimaryIndex->UpdateDefaultSchema(snap, std::move(indexInfo));
+ }
+ }
+
+ for (auto& columnName : Ttl.TtlColumns()) {
+ PrimaryIndex->GetIndexInfo().CheckTtlColumn(columnName);
}
}
@@ -660,7 +764,7 @@ std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() {
LOG_S_DEBUG("Indexing/compaction already in progress at tablet " << TabletID());
return {};
}
- if (!TablesManager.HasPrimaryIndex()) {
+ if (!PrimaryIndex) {
LOG_S_NOTICE("Indexing not started: no index at tablet " << TabletID());
return {};
}
@@ -681,7 +785,7 @@ std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() {
if (bytesToIndex && (bytesToIndex + dataSize) > (ui64)Limits.MaxInsertBytes) {
continue;
}
- if (auto* pMap = TablesManager.GetPrimaryIndexSafe().GetOverloadedGranules(data.PathId)) {
+ if (auto* pMap = PrimaryIndex->GetOverloadedGranules(data.PathId)) {
overloadedPathGranules[pathId] = pMap->size();
InsertTable->SetOverloaded(data.PathId, true);
++ignored;
@@ -729,13 +833,13 @@ std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() {
}
Y_VERIFY(data.size());
- auto indexChanges = TablesManager.MutablePrimaryIndex().StartInsert(std::move(data));
+ auto indexChanges = PrimaryIndex->StartInsert(std::move(data));
if (!indexChanges) {
LOG_S_NOTICE("Cannot prepare indexing at tablet " << TabletID());
return {};
}
- auto actualIndexInfo = TablesManager.GetIndexInfo();
+ auto actualIndexInfo = PrimaryIndex->GetIndexInfo();
if (Tiers) {
auto pathTiering = Tiers->GetTiering(); // TODO: pathIds
actualIndexInfo.UpdatePathTiering(pathTiering);
@@ -753,13 +857,13 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() {
LOG_S_DEBUG("Compaction/indexing already in progress at tablet " << TabletID());
return {};
}
- if (!TablesManager.HasPrimaryIndex()) {
+ if (!PrimaryIndex) {
LOG_S_NOTICE("Compaction not started: no index at tablet " << TabletID());
return {};
}
- TablesManager.MutablePrimaryIndex().UpdateCompactionLimits(CompactionLimits.Get());
- auto compactionInfo = TablesManager.MutablePrimaryIndex().Compact(LastCompactedGranule);
+ PrimaryIndex->UpdateCompactionLimits(CompactionLimits.Get());
+ auto compactionInfo = PrimaryIndex->Compact(LastCompactedGranule);
if (!compactionInfo || compactionInfo->Empty()) {
LOG_S_DEBUG("Compaction not started: no portions to compact at tablet " << TabletID());
return {};
@@ -770,13 +874,13 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() {
LOG_S_DEBUG("Prepare " << *compactionInfo << " at tablet " << TabletID());
ui64 ourdatedStep = GetOutdatedStep();
- auto indexChanges = TablesManager.MutablePrimaryIndex().StartCompaction(std::move(compactionInfo), {ourdatedStep, 0});
+ auto indexChanges = PrimaryIndex->StartCompaction(std::move(compactionInfo), {ourdatedStep, 0});
if (!indexChanges) {
LOG_S_DEBUG("Compaction not started: cannot prepare compaction at tablet " << TabletID());
return {};
}
- auto actualIndexInfo = TablesManager.GetIndexInfo();
+ auto actualIndexInfo = PrimaryIndex->GetIndexInfo();
if (Tiers) {
auto pathTiering = Tiers->GetTiering(); // TODO: pathIds
actualIndexInfo.UpdatePathTiering(pathTiering);
@@ -799,7 +903,7 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
LOG_S_DEBUG("Do not start TTL while eviction is in progress at tablet " << TabletID());
return {};
}
- if (!TablesManager.HasPrimaryIndex()) {
+ if (!PrimaryIndex) {
LOG_S_NOTICE("TTL not started. No index for TTL at tablet " << TabletID());
return {};
}
@@ -809,7 +913,7 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
if (Tiers) {
eviction = Tiers->GetTiering(); // TODO: pathIds
}
- TablesManager.AddTtls(eviction, TInstant::Now(), force);
+ Ttl.AddTtls(eviction, TInstant::Now(), force);
}
if (eviction.empty()) {
@@ -823,11 +927,11 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
LOG_S_DEBUG("Evicting path " << i.first << " with " << i.second.GetDebugString() << " at tablet " << TabletID());
}
- auto actualIndexInfo = TablesManager.GetIndexInfo();
+ auto actualIndexInfo = PrimaryIndex->GetIndexInfo();
actualIndexInfo.UpdatePathTiering(eviction);
std::shared_ptr<NOlap::TColumnEngineChanges> indexChanges;
- indexChanges = TablesManager.MutablePrimaryIndex().StartTtl(eviction);
+ indexChanges = PrimaryIndex->StartTtl(eviction);
actualIndexInfo.SetPathTiering(std::move(eviction));
@@ -836,7 +940,7 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
return {};
}
if (indexChanges->NeedRepeat) {
- TablesManager.OnTtlUpdate();
+ Ttl.Repeat();
}
bool needWrites = !indexChanges->PortionsToEvict.empty();
@@ -851,14 +955,14 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() {
LOG_S_DEBUG("Cleanup already in progress at tablet " << TabletID());
return {};
}
- if (!TablesManager.HasPrimaryIndex()) {
+ if (!PrimaryIndex) {
LOG_S_NOTICE("Cleanup not started. No index for cleanup at tablet " << TabletID());
return {};
}
NOlap::TSnapshot cleanupSnapshot{GetMinReadStep(), 0};
- auto changes = TablesManager.StartIndexCleanup(cleanupSnapshot, TLimits::MAX_TX_RECORDS);
+ auto changes = PrimaryIndex->StartCleanup(cleanupSnapshot, PathsToDrop, TLimits::MAX_TX_RECORDS);
if (!changes) {
LOG_S_NOTICE("Cannot prepare cleanup at tablet " << TabletID());
return {};
@@ -891,7 +995,7 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() {
return {};
}
- auto actualIndexInfo = TablesManager.GetIndexInfo();
+ auto actualIndexInfo = PrimaryIndex->GetIndexInfo();
#if 0 // No need for now
if (Tiers) {
...
@@ -905,6 +1009,33 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() {
return ev;
}
+NOlap::TIndexInfo TColumnShard::ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema) {
+ Y_VERIFY(schema.GetEngine() == NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES);
+
+ ui32 indexId = 0;
+ NOlap::TIndexInfo indexInfo("", indexId);
+
+ for (const auto& col : schema.GetColumns()) {
+ const ui32 id = col.GetId();
+ const TString& name = col.GetName();
+ auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(col.GetTypeId(),
+ col.HasTypeInfo() ? &col.GetTypeInfo() : nullptr);
+ indexInfo.Columns[id] = NTable::TColumn(name, id, typeInfoMod.TypeInfo, typeInfoMod.TypeMod);
+ indexInfo.ColumnNames[name] = id;
+ }
+
+ for (const auto& keyName : schema.GetKeyColumnNames()) {
+ Y_VERIFY(indexInfo.ColumnNames.count(keyName));
+ indexInfo.KeyColumns.push_back(indexInfo.ColumnNames[keyName]);
+ }
+
+ if (schema.HasDefaultCompression()) {
+ NOlap::TCompression compression = NTiers::ConvertCompression(schema.GetDefaultCompression());
+ indexInfo.SetDefaultCompression(compression);
+ }
+
+ return indexInfo;
+}
void TColumnShard::MapExternBlobs(const TActorContext& /*ctx*/, NOlap::TReadMetadata& metadata) {
if (!metadata.SelectInfo) {
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 383d51a3592..3283e97440e 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -5,7 +5,6 @@
#include "columnshard_ttl.h"
#include "columnshard_private_events.h"
#include "blob_manager.h"
-#include "tables_manager.h"
#include "inflight_request_tracker.h"
#include <ydb/core/tablet/tablet_counters.h>
@@ -332,8 +331,31 @@ private:
}
};
- using TSchemaPreset = TSchemaPreset;
- using TTableInfo = TTableInfo;
+ struct TSchemaPreset {
+ using TSchemaPresetVersionInfo = NKikimrTxColumnShard::TSchemaPresetVersionInfo;
+
+ ui32 Id;
+ TString Name;
+ TMap<TRowVersion, TSchemaPresetVersionInfo> Versions;
+ TRowVersion DropVersion = TRowVersion::Max();
+
+ bool IsDropped() const {
+ return DropVersion != TRowVersion::Max();
+ }
+ };
+
+ struct TTableInfo {
+ using TTableVersionInfo = NKikimrTxColumnShard::TTableVersionInfo;
+
+ ui64 PathId;
+ std::map<TRowVersion, TTableVersionInfo> Versions;
+ TRowVersion DropVersion = TRowVersion::Max();
+ TString TieringUsage;
+
+ bool IsDropped() const {
+ return DropVersion != TRowVersion::Max();
+ }
+ };
struct TLongTxWriteInfo {
ui64 WriteId;
@@ -342,8 +364,6 @@ private:
ui64 PreparedTxId = 0;
};
- TTablesManager TablesManager;
-
ui64 CurrentSchemeShardId = 0;
TMessageSeqNo LastSchemaSeqNo;
std::optional<NKikimrSubDomains::TProcessingParams> ProcessingParams;
@@ -381,7 +401,9 @@ private:
TTabletCountersBase* TabletCounters;
std::unique_ptr<NTabletPipe::IClientCache> PipeClientCache;
std::unique_ptr<NOlap::TInsertTable> InsertTable;
+ std::unique_ptr<NOlap::IColumnEngine> PrimaryIndex;
TBatchCache BatchCache;
+ TTtl Ttl;
THashMap<ui64, TBasicTxInfo> BasicTxInfo;
TSet<TDeadlineQueueItem> DeadlineQueue;
@@ -391,11 +413,14 @@ private:
THashMap<ui64, TInstant> ScanTxInFlight;
THashMap<ui64, TAlterMeta> AltersInFlight;
THashMap<ui64, TCommitMeta> CommitsInFlight; // key is TxId from propose
+ THashMap<ui32, TSchemaPreset> SchemaPresets;
+ THashMap<ui64, TTableInfo> Tables;
THashMap<TWriteId, TLongTxWriteInfo> LongTxWrites;
using TPartsForLTXShard = THashMap<ui32, TLongTxWriteInfo*>;
THashMap<TULID, TPartsForLTXShard> LongTxWritesByUniqueId;
TMultiMap<TRowVersion, TEvColumnShard::TEvRead::TPtr> WaitingReads;
TMultiMap<TRowVersion, TEvColumnShard::TEvScan::TPtr> WaitingScans;
+ THashSet<ui64> PathsToDrop;
bool ActiveIndexingOrCompaction = false;
bool ActiveCleanup = false;
bool ActiveTtl = false;
@@ -430,7 +455,7 @@ private:
}
bool IndexOverloaded() const {
- return TablesManager.IndexOverloaded();
+ return PrimaryIndex && PrimaryIndex->HasOverloadedGranules();
}
TWriteId HasLongTxWrite(const NLongTxService::TLongTxId& longTxId, const ui32 partId);
@@ -447,13 +472,22 @@ private:
void UpdateSchemaSeqNo(const TMessageSeqNo& seqNo, NTabletFlatExecutor::TTransactionContext& txc);
void ProtectSchemaSeqNo(const NKikimrTxColumnShard::TSchemaSeqNo& seqNoProto, NTabletFlatExecutor::TTransactionContext& txc);
+ bool IsTableWritable(ui64 tableId) const;
+
+ ui32 EnsureSchemaPreset(NIceDb::TNiceDb& db, ui32 presetId, const TString& name,
+ const NKikimrSchemeOp::TColumnTableSchema& schemaProto, const TRowVersion& version);
+ ui32 EnsureSchemaPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, const TRowVersion& version);
+ //ui32 EnsureTtlSettingsPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableTtlSettingsPreset& presetProto, const TRowVersion& version);
+
void RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc);
void RunInit(const NKikimrTxColumnShard::TInitShard& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc);
void RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc);
void RunAlterTable(const NKikimrTxColumnShard::TAlterTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc);
void RunDropTable(const NKikimrTxColumnShard::TDropTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc);
void RunAlterStore(const NKikimrTxColumnShard::TAlterStore& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc);
+ void SetPrimaryIndex(TMap<NOlap::TSnapshot, NOlap::TIndexInfo>&& schemaVersions);
+ NOlap::TIndexInfo ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema);
void MapExternBlobs(const TActorContext& ctx, NOlap::TReadMetadata& metadata);
TActorId GetS3ActorForTier(const TString& tierId) const;
void ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName, ui64 pathId,
diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp
deleted file mode 100644
index cbcf0abe8c3..00000000000
--- a/ydb/core/tx/columnshard/tables_manager.cpp
+++ /dev/null
@@ -1,324 +0,0 @@
-#include "tables_manager.h"
-#include "columnshard_schema.h"
-#include "blob_manager_db.h"
-#include "engines/column_engine_logs.h"
-#include <ydb/core/scheme/scheme_types_proto.h>
-#include <ydb/core/tx/tiering/manager.h>
-
-
-
-namespace NKikimr::NColumnShard {
-
-void TSchemaPreset::Deserialize(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto) {
- Id = presetProto.GetId();
- Name = presetProto.GetName();
-}
-
-bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db, const ui64 tabletId) {
- TabletId = tabletId;
- {
- auto rowset = db.Table<Schema::TableInfo>().Select();
- if (!rowset.IsReady()) {
- return false;
- }
-
- while (!rowset.EndOfSet()) {
- TTableInfo table;
- if (!table.InitFromDB(rowset)) {
- return false;
- }
- if (table.IsDropped()) {
- PathsToDrop.insert(table.GetPathId());
- }
- Tables.insert_or_assign(table.GetPathId(), std::move(table));
-
- if (!rowset.Next()) {
- return false;
- }
- }
- }
-
- bool isFakePresetOnly = true;
- {
- auto rowset = db.Table<Schema::SchemaPresetInfo>().Select();
- if (!rowset.IsReady()) {
- return false;
- }
-
- while (!rowset.EndOfSet()) {
- TSchemaPreset preset;
- preset.InitFromDB(rowset);
-
- if (preset.IsStandaloneTable()) {
- Y_VERIFY_S(!preset.GetName(), "Preset name: " + preset.GetName());
- } else {
- Y_VERIFY_S(preset.GetName() == "default", "Preset name: " + preset.GetName());
- isFakePresetOnly = false;
- }
- SchemaPresets.insert_or_assign(preset.GetId(), preset);
- if (!rowset.Next()) {
- return false;
- }
- }
- }
-
- {
- auto rowset = db.Table<Schema::TableVersionInfo>().Select();
- if (!rowset.IsReady()) {
- return false;
- }
-
- THashMap<ui64, TRowVersion> lastVersion;
- while (!rowset.EndOfSet()) {
- const ui64 pathId = rowset.GetValue<Schema::TableVersionInfo::PathId>();
- Y_VERIFY(Tables.contains(pathId));
- TRowVersion version(
- rowset.GetValue<Schema::TableVersionInfo::SinceStep>(),
- rowset.GetValue<Schema::TableVersionInfo::SinceTxId>());
-
- auto& table = Tables.at(pathId);
- TTableInfo::TTableVersionInfo versionInfo;
- Y_VERIFY(versionInfo.ParseFromString(rowset.GetValue<Schema::TableVersionInfo::InfoProto>()));
- Y_VERIFY(SchemaPresets.contains(versionInfo.GetSchemaPresetId()));
-
- if (!table.IsDropped()) {
- auto& ttlSettings = versionInfo.GetTtlSettings();
- if (ttlSettings.HasEnabled()) {
- if (!lastVersion.contains(pathId) || lastVersion[pathId] < version) {
- TTtl::TDescription description(ttlSettings.GetEnabled());
- Ttl.SetPathTtl(pathId, std::move(description));
- lastVersion[pathId] = version;
- }
- }
- }
- table.AddVersion(version, versionInfo);
- if (!rowset.Next()) {
- return false;
- }
- }
- }
-
- {
- auto rowset = db.Table<Schema::SchemaPresetVersionInfo>().Select();
- if (!rowset.IsReady()) {
- return false;
- }
-
- while (!rowset.EndOfSet()) {
- const ui32 id = rowset.GetValue<Schema::SchemaPresetVersionInfo::Id>();
- Y_VERIFY(SchemaPresets.contains(id));
- auto& preset = SchemaPresets.at(id);
- TRowVersion version(
- rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceStep>(),
- rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceTxId>());
-
- TSchemaPreset::TSchemaPresetVersionInfo info;
- Y_VERIFY(info.ParseFromString(rowset.GetValue<Schema::SchemaPresetVersionInfo::InfoProto>()));
- preset.AddVersion(version, info);
- if (!rowset.Next()) {
- return false;
- }
- }
- }
-
- if (isFakePresetOnly) {
- Y_VERIFY(Tables.size() <= 1);
- if (!Tables.empty()) {
- for (const auto& [version, tableInfo] : Tables.begin()->second.GetVersions()) {
- if (tableInfo.HasSchema()) {
- IndexSchemaVersion(version, tableInfo.GetSchema());
- }
- }
- }
- } else {
- for (const auto& [id, preset] : SchemaPresets) {
- Y_VERIFY(!!id);
- for (const auto& [version, schemaInfo] : preset.GetVersions()) {
- if (schemaInfo.HasSchema()) {
- IndexSchemaVersion(version, schemaInfo.GetSchema());
- }
- }
- }
- }
- return true;
-}
-
-bool TTablesManager::LoadIndex(NOlap::TDbWrapper& idxDB, THashSet<NOlap::TUnifiedBlobId>& lostEvictions) {
- if (PrimaryIndex) {
- if (!PrimaryIndex->Load(idxDB, lostEvictions, PathsToDrop)) {
- return false;
- }
- }
- return true;
-}
-
-void TTablesManager::Clear() {
- Tables.clear();
- SchemaPresets.clear();
- PathsToDrop.clear();
-}
-
-bool TTablesManager::HasTable(const ui64 pathId) const {
- auto it = Tables.find(pathId);
- if (it == Tables.end() || it->second.IsDropped()) {
- return false;
- }
- return true;
-}
-
-bool TTablesManager::IsWritableTable(const ui64 pathId) const {
- return HasTable(pathId);
-}
-
-bool TTablesManager::HasPreset(const ui32 presetId) const {
- return SchemaPresets.contains(presetId);
-}
-
-const TTableInfo& TTablesManager::GetTable(const ui64 pathId) const {
- Y_VERIFY(HasTable(pathId));
- return Tables.at(pathId);
-}
-
-ui64 TTablesManager::GetMemoryUsage() const {
- ui64 memory =
- Tables.size() * sizeof(TTableInfo) +
- PathsToDrop.size() * sizeof(ui64) +
- Ttl.PathsCount() * sizeof(TTtl::TDescription) +
- SchemaPresets.size() * sizeof(TSchemaPreset);
- if (PrimaryIndex) {
- memory += PrimaryIndex->MemoryUsage();
- }
- return memory;
-}
-
-void TTablesManager::OnTtlUpdate() {
- Ttl.Repeat();
-}
-
-void TTablesManager::DropTable(const ui64 pathId, const TRowVersion& version, NIceDb::TNiceDb& db) {
- auto& table = Tables.at(pathId);
- table.SetDropVersion(version);
- PathsToDrop.insert(pathId);
- Ttl.DropPathTtl(pathId);
- Schema::SaveTableDropVersion(db, pathId, version.Step, version.TxId);
-}
-
-void TTablesManager::DropPreset(const ui32 presetId, const TRowVersion& version, NIceDb::TNiceDb& db) {
- auto& preset = SchemaPresets.at(presetId);
- Y_VERIFY(preset.GetName() != "default", "Cannot drop the default preset");
- preset.SetDropVersion(version);
- Schema::SaveSchemaPresetDropVersion(db, presetId, version);
-}
-
-void TTablesManager::RegisterTable(TTableInfo&& table, NIceDb::TNiceDb& db) {
- Y_VERIFY(!HasTable(table.GetPathId()));
- Y_VERIFY(table.IsEmpty());
-
- Schema::SaveTableInfo(db, table.GetPathId(), table.GetTieringUsage());
- Tables.insert_or_assign(table.GetPathId(), std::move(table));
-}
-
-bool TTablesManager::RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIceDb::TNiceDb& db) {
- if (SchemaPresets.contains(schemaPreset.GetId())) {
- return false;
- }
- Schema::SaveSchemaPresetInfo(db, schemaPreset.GetId(), schemaPreset.GetName());
- SchemaPresets.insert_or_assign(schemaPreset.GetId(), schemaPreset);
- return true;
-}
-
-void TTablesManager::AddPresetVersion(const ui32 presetId, const TRowVersion& version, const TTableSchema& schema, NIceDb::TNiceDb& db) {
- Y_VERIFY(SchemaPresets.contains(presetId));
- TSchemaPreset::TSchemaPresetVersionInfo versionInfo;
- versionInfo.SetId(presetId);
- versionInfo.SetSinceStep(version.Step);
- versionInfo.SetSinceTxId(version.TxId);
- *versionInfo.MutableSchema() = schema;
-
- auto& schemaPreset = SchemaPresets.at(presetId);
- Schema::SaveSchemaPresetVersionInfo(db, presetId, version, versionInfo);
- schemaPreset.AddVersion(version, versionInfo);
- if (presetId != 0 && versionInfo.HasSchema()) {
- IndexSchemaVersion(version, versionInfo.GetSchema());
- }
-}
-
-void TTablesManager::AddTableVersion(const ui64 pathId, const TRowVersion& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db) {
- auto& table = Tables.at(pathId);
-
- if (table.IsEmpty()) {
- if (versionInfo.HasSchemaPresetId()) {
- Y_VERIFY(SchemaPresets.contains(versionInfo.GetSchemaPresetId()));
- } else {
- Y_VERIFY(SchemaPresets.empty());
- TSchemaPreset preset;
- Y_VERIFY(RegisterSchemaPreset(preset, db));
- AddPresetVersion(preset.GetId(), version, versionInfo.GetSchema(), db);
- }
- }
- Y_VERIFY(SchemaPresets.contains(versionInfo.GetSchemaPresetId()));
-
- if (versionInfo.HasTtlSettings()) {
- const auto& ttlSettings = versionInfo.GetTtlSettings();
- if (ttlSettings.HasEnabled()) {
- Ttl.SetPathTtl(pathId, TTtl::TDescription(ttlSettings.GetEnabled()));
- } else {
- Ttl.DropPathTtl(pathId);
- }
- }
-
- Schema::SaveTableVersionInfo(db, pathId, version, versionInfo);
- table.AddVersion(version, versionInfo);
- if (versionInfo.HasSchema()) {
- IndexSchemaVersion(version, versionInfo.GetSchema());
- }
-}
-
-void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const TTableSchema& schema) {
- NOlap::TSnapshot snapshot{version.Step, version.TxId};
- NOlap::TIndexInfo indexInfo = ConvertSchema(schema);
-
- if (!PrimaryIndex) {
- PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(std::move(indexInfo), TabletId);
- } else {
- PrimaryIndex->UpdateDefaultSchema(snapshot, std::move(indexInfo));
- }
-
- for (auto& columnName : Ttl.TtlColumns()) {
- PrimaryIndex->GetIndexInfo().CheckTtlColumn(columnName);
- }
-}
-
-std::shared_ptr<NOlap::TColumnEngineChanges> TTablesManager::StartIndexCleanup(const NOlap::TSnapshot& snapshot, ui32 maxRecords) {
- Y_VERIFY(PrimaryIndex);
- return PrimaryIndex->StartCleanup(snapshot, PathsToDrop, maxRecords);
-}
-
-NOlap::TIndexInfo TTablesManager::ConvertSchema(const TTableSchema& schema) {
- Y_VERIFY(schema.GetEngine() == NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES);
-
- ui32 indexId = 0;
- NOlap::TIndexInfo indexInfo("", indexId);
-
- for (const auto& col : schema.GetColumns()) {
- const ui32 id = col.GetId();
- const TString& name = col.GetName();
- auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(col.GetTypeId(),
- col.HasTypeInfo() ? &col.GetTypeInfo() : nullptr);
- indexInfo.Columns[id] = NTable::TColumn(name, id, typeInfoMod.TypeInfo, typeInfoMod.TypeMod);
- indexInfo.ColumnNames[name] = id;
- }
-
- for (const auto& keyName : schema.GetKeyColumnNames()) {
- Y_VERIFY(indexInfo.ColumnNames.count(keyName));
- indexInfo.KeyColumns.push_back(indexInfo.ColumnNames[keyName]);
- }
-
- if (schema.HasDefaultCompression()) {
- NOlap::TCompression compression = NTiers::ConvertCompression(schema.GetDefaultCompression());
- indexInfo.SetDefaultCompression(compression);
- }
-
- return indexInfo;
-}
-}
diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h
deleted file mode 100644
index 423221195b7..00000000000
--- a/ydb/core/tx/columnshard/tables_manager.h
+++ /dev/null
@@ -1,221 +0,0 @@
-#pragma once
-
-#include "columnshard_schema.h"
-#include "columnshard_ttl.h"
-#include "engines/column_engine.h"
-
-#include "ydb/core/base/row_version.h"
-#include "ydb/library/accessor/accessor.h"
-#include "ydb/core/protos/tx_columnshard.pb.h"
-
-
-namespace NKikimr::NColumnShard {
-
-template<class TSchemaProto>
-class TVersionedSchema {
-protected:
- std::optional<TRowVersion> DropVersion;
- TMap<TRowVersion, TSchemaProto> Versions;
-
-public:
- bool IsDropped() const {
- return DropVersion.has_value();
- }
-
- bool IsEmpty() const {
- return Versions.empty();
- }
-
- void SetDropVersion(const TRowVersion& version) {
- DropVersion = version;
- }
-
- const TMap<TRowVersion, TSchemaProto>& GetVersions() const {
- return Versions;
- }
-
- const TSchemaProto& GetVersion(const TRowVersion& version) const {
- const TSchemaProto* result = nullptr;
- for (auto ver : Versions) {
- if (ver.first > version) {
- break;
- }
- result = &ver.second;
- }
- Y_VERIFY(!!result);
- return *result;
- }
-
- void AddVersion(const TRowVersion& version, const TSchemaProto& versionInfo) {
- Versions[version] = versionInfo;
- }
-};
-
-using TTableSchema = NKikimrSchemeOp::TColumnTableSchema;
-
-class TSchemaPreset : public TVersionedSchema<NKikimrTxColumnShard::TSchemaPresetVersionInfo> {
-public:
- using TSchemaPresetVersionInfo = NKikimrTxColumnShard::TSchemaPresetVersionInfo;
- ui32 Id = 0;
- TString Name;
-public:
- bool IsStandaloneTable() const {
- return Id == 0;
- }
-
- const TString& GetName() const {
- return Name;
- }
-
- ui32 GetId() const {
- return Id;
- }
-
- void Deserialize(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto);
-
- template <class TRow>
- bool InitFromDB(const TRow& rowset) {
- Id = rowset.template GetValue<Schema::SchemaPresetInfo::Id>();
- if (!IsStandaloneTable()) {
- Name = rowset.template GetValue<Schema::SchemaPresetInfo::Name>();
- }
- Y_VERIFY(!Id || Name == "default", "Unsupported preset at load time");
-
- if (rowset.template HaveValue<Schema::SchemaPresetInfo::DropStep>() &&
- rowset.template HaveValue<Schema::SchemaPresetInfo::DropTxId>())
- {
- DropVersion.emplace();
- DropVersion->Step = rowset.template GetValue<Schema::SchemaPresetInfo::DropStep>();
- DropVersion->TxId = rowset.template GetValue<Schema::SchemaPresetInfo::DropTxId>();
- }
- return true;
- }
-};
-
-class TTableInfo : public TVersionedSchema<NKikimrTxColumnShard::TTableVersionInfo> {
-public:
- using TTableVersionInfo = NKikimrTxColumnShard::TTableVersionInfo;
- ui64 PathId;
- TString TieringUsage;
-
-public:
- const TString& GetTieringUsage() const {
- return TieringUsage;
- }
-
- TTableInfo& SetTieringUsage(const TString& data) {
- TieringUsage = data;
- return *this;
- }
-
- ui64 GetPathId() const {
- return PathId;
- }
-
- TTableInfo() = default;
-
- TTableInfo(const ui64 pathId)
- : PathId(pathId)
- {}
-
- template <class TRow>
- bool InitFromDB(const TRow& rowset) {
- PathId = rowset.template GetValue<Schema::TableInfo::PathId>();
- TieringUsage = rowset.template GetValue<Schema::TableInfo::TieringUsage>();
- if (rowset.template HaveValue<Schema::TableInfo::DropStep>() && rowset.template HaveValue<Schema::TableInfo::DropTxId>()) {
- DropVersion.emplace();
- DropVersion->Step = rowset.template GetValue<Schema::TableInfo::DropStep>();
- DropVersion->TxId = rowset.template GetValue<Schema::TableInfo::DropTxId>();
- }
- return true;
- }
-};
-
-class TTablesManager {
-private:
- THashMap<ui64, TTableInfo> Tables;
- THashMap<ui32, TSchemaPreset> SchemaPresets;
- THashSet<ui64> PathsToDrop;
- TTtl Ttl;
- std::unique_ptr<NOlap::IColumnEngine> PrimaryIndex;
- ui64 TabletId;
-public:
- const TTtl& GetTtl() const {
- return Ttl;
- }
-
- void AddTtls(THashMap<ui64, NOlap::TTiering>& eviction, TInstant now, bool force) {
- Ttl.AddTtls(eviction, now, force);
- }
-
- const THashSet<ui64>& GetPathsToDrop() const {
- return PathsToDrop;
- }
-
- const THashMap<ui64, TTableInfo>& GetTables() const {
- return Tables;
- }
-
- const THashMap<ui32, TSchemaPreset>& GetSchemaPresets() const {
- return SchemaPresets;
- }
-
- bool IndexOverloaded() const {
- return PrimaryIndex && PrimaryIndex->HasOverloadedGranules();
- }
-
- bool HasPrimaryIndex() const {
- return !!PrimaryIndex;
- }
-
- NOlap::IColumnEngine& MutablePrimaryIndex() {
- Y_VERIFY(!!PrimaryIndex);
- return *PrimaryIndex;
- }
-
- const NOlap::TIndexInfo& GetIndexInfo(const NOlap::TSnapshot& version = {}) const {
- Y_UNUSED(version);
- Y_VERIFY(!!PrimaryIndex);
- return PrimaryIndex->GetIndexInfo();
- }
-
- const std::unique_ptr<NOlap::IColumnEngine>& GetPrimaryIndex() const {
- return PrimaryIndex;
- }
-
- const NOlap::IColumnEngine& GetPrimaryIndexSafe() const {
- Y_VERIFY(!!PrimaryIndex);
- return *PrimaryIndex;
- }
-
- bool InitFromDB(NIceDb::TNiceDb& db, const ui64 tabletId);
- bool LoadIndex(NOlap::TDbWrapper& db, THashSet<NOlap::TUnifiedBlobId>& lostEvictions);
-
- void Clear();
-
- const TTableInfo& GetTable(const ui64 pathId) const;
- ui64 GetMemoryUsage() const;
-
- bool HasTable(const ui64 pathId) const;
- bool IsWritableTable(const ui64 pathId) const;
- bool HasPreset(const ui32 presetId) const;
-
- void DropTable(const ui64 pathId, const TRowVersion& version, NIceDb::TNiceDb& db);
- void DropPreset(const ui32 presetId, const TRowVersion& version, NIceDb::TNiceDb& db);
-
- void RegisterTable(TTableInfo&& table, NIceDb::TNiceDb& db);
- bool RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIceDb::TNiceDb& db);
-
- void AddPresetVersion(const ui32 presetId, const TRowVersion& version, const TTableSchema& schema, NIceDb::TNiceDb& db);
- void AddTableVersion(const ui64 pathId, const TRowVersion& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db);
-
- void OnTtlUpdate();
-
- std::shared_ptr<NOlap::TColumnEngineChanges> StartIndexCleanup(const NOlap::TSnapshot& snapshot, ui32 maxRecords);
-
-private:
- void IndexSchemaVersion(const TRowVersion& version, const TTableSchema& schema);
- static NOlap::TIndexInfo ConvertSchema(const TTableSchema& schema);
-};
-
-}