diff options
author | nsofya <nsofya@yandex-team.com> | 2023-04-05 14:17:40 +0300 |
---|---|---|
committer | nsofya <nsofya@yandex-team.com> | 2023-04-05 14:17:40 +0300 |
commit | 1a13012ff58dbd96aab8a178e274cd82dda52b6f (patch) | |
tree | 59f98faa718ab2da8f6fc1af16709fc5aa543a7f | |
parent | 38d49a3490392bec1a4a5dfddf4efce5fd9a4463 (diff) | |
download | ydb-1a13012ff58dbd96aab8a178e274cd82dda52b6f.tar.gz |
Group table schema processing in one class
Первый этап подготовки к модификации схем: группировка всех объектов, работающих с их состоянием внутри одного менеджера.
1. Объединила в одном классе работу с Tables + SchemaPreset + Ttl
2. В него же перенесла работу с PrimaryIndex, чтобы нативно выполнять все проверки (он не сразу строится, поэтому они не лишние) в нужные моменты вызывать функции, обновляющие сам PrimaryIndex. Пока не достаточно погрузилась в то, как правильно восстанавливать схемы для arrow из этого индекса, но это будет следующим шагом.
3. Местами не очень красиво выходит наружу MutablePrimaryIndex. В перспективе я бы всю работу с ним хотела спрятать в TablesManager. но пока унесла внутрь только те части, которые касаются обновления схемы, иначе погрязну. Вернусь к этому как буду больше понимать в том, что происходит.
мои ожидания, что этот PR ничего не меняет в текущем поведении, но закладывает механизм будущего обновления схем, который надо реализовать уже внутри PrimaryIndex
17 files changed, 680 insertions, 431 deletions
diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt index a81d192131f..28261168eb2 100644 --- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt @@ -74,6 +74,7 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/tables_manager.cpp ) generate_enum_serilization(core-tx-columnshard ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard.h diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt index 77e8b4de116..66d5ee15570 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt @@ -75,6 +75,7 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/tables_manager.cpp ) generate_enum_serilization(core-tx-columnshard ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard.h diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt index 77e8b4de116..66d5ee15570 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt @@ -75,6 +75,7 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/tables_manager.cpp ) generate_enum_serilization(core-tx-columnshard ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard.h diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt index a81d192131f..28261168eb2 100644 --- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt @@ -74,6 +74,7 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/tables_manager.cpp ) generate_enum_serilization(core-tx-columnshard ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard.h diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index 4fa8d2fd8fd..20033b28c87 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -29,8 +29,8 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) { IndexingActor = ctx.Register(CreateIndexingActor(TabletID(), ctx.SelfID)); CompactionActor = ctx.Register(CreateCompactionActor(TabletID(), ctx.SelfID)); EvictionActor = ctx.Register(CreateEvictionActor(TabletID(), ctx.SelfID)); - for (auto&& i : Tables) { - ActivateTiering(i.first, i.second.TieringUsage); + for (auto&& i : TablesManager.GetTables()) { + ActivateTiering(i.first, i.second.GetTieringUsage()); } SignalTabletActive(ctx); } @@ -193,11 +193,11 @@ void TColumnShard::UpdateInsertTableCounters() { } void TColumnShard::UpdateIndexCounters() { - if (!PrimaryIndex) { + if (!TablesManager.HasPrimaryIndex()) { return; } - auto& stats = PrimaryIndex->GetTotalStats(); + auto& stats = TablesManager.MutablePrimaryIndex().GetTotalStats(); SetCounter(COUNTER_INDEX_TABLES, stats.Tables); SetCounter(COUNTER_INDEX_GRANULES, stats.Granules); SetCounter(COUNTER_INDEX_EMPTY_GRANULES, stats.EmptyGranules); @@ -243,10 +243,6 @@ void TColumnShard::UpdateIndexCounters() { ui64 TColumnShard::MemoryUsage() const { ui64 memory = - Tables.size() * sizeof(TTableInfo) + - PathsToDrop.size() * sizeof(ui64) + - Ttl.PathsCount() * sizeof(TTtl::TDescription) + - SchemaPresets.size() * sizeof(TSchemaPreset) + BasicTxInfo.size() * sizeof(TBasicTxInfo) + DeadlineQueue.size() * sizeof(TDeadlineQueueItem) + (PlanQueue.size() + RunningQueue.size()) * sizeof(TPlanQueueItem) + @@ -258,9 +254,7 @@ ui64 TColumnShard::MemoryUsage() const { (WaitingReads.size() + WaitingScans.size()) * (sizeof(TRowVersion) + sizeof(void*)) + TabletCounters->Simple()[COUNTER_PREPARED_RECORDS].Get() * sizeof(NOlap::TInsertedData) + TabletCounters->Simple()[COUNTER_COMMITTED_RECORDS].Get() * sizeof(NOlap::TInsertedData); - if (PrimaryIndex) { - memory += PrimaryIndex->MemoryUsage(); - } + memory += TablesManager.GetMemoryUsage(); memory += BatchCache.Bytes(); return memory; } @@ -329,9 +323,9 @@ void TColumnShard::SendPeriodicStats() { tabletStats->SetTxRejectedBySpace(TabletCounters->Cumulative()[COUNTER_OUT_OF_SPACE].Get()); tabletStats->SetInFlightTxCount(Executor()->GetStats().TxInFly); - if (PrimaryIndex) { - const auto& indexStats = PrimaryIndex->GetTotalStats(); - NOlap::TSnapshot lastIndexUpdate = PrimaryIndex->LastUpdate(); + if (TablesManager.HasPrimaryIndex()) { + const auto& indexStats = TablesManager.MutablePrimaryIndex().GetTotalStats(); + NOlap::TSnapshot lastIndexUpdate = TablesManager.GetPrimaryIndexSafe().LastUpdate(); auto activeIndexStats = indexStats.Active(); // data stats excluding inactive and evicted tabletStats->SetRowCount(activeIndexStats.Rows); diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp index 23a25c8d83d..d3cc84cf09f 100644 --- a/ydb/core/tx/columnshard/columnshard__init.cpp +++ b/ydb/core/tx/columnshard/columnshard__init.cpp @@ -41,8 +41,7 @@ void TTxInit::SetDefaults() { Self->PlanQueue.clear(); Self->AltersInFlight.clear(); Self->CommitsInFlight.clear(); - Self->SchemaPresets.clear(); - Self->Tables.clear(); + Self->TablesManager.Clear(); Self->LongTxWrites.clear(); Self->LongTxWritesByUniqueId.clear(); } @@ -137,132 +136,10 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) } } - // Primary index defaut schema and TTL (both are versioned) - TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaPreset; - TMap<NOlap::TSnapshot, NOlap::TIndexInfo> commonSchema; - THashMap<ui64, TMap<TRowVersion, TTtl::TDescription>> ttls; - - { // Load schema presets - auto rowset = db.Table<Schema::SchemaPresetInfo>().Select(); - if (!rowset.IsReady()) - return false; - - while (!rowset.EndOfSet()) { - const ui32 id = rowset.GetValue<Schema::SchemaPresetInfo::Id>(); - auto& preset = Self->SchemaPresets[id]; - preset.Id = id; - if (id) { - preset.Name = rowset.GetValue<Schema::SchemaPresetInfo::Name>(); - } - Y_VERIFY(!id || preset.Name == "default", "Unsupported preset at load time"); - - if (rowset.HaveValue<Schema::SchemaPresetInfo::DropStep>() && - rowset.HaveValue<Schema::SchemaPresetInfo::DropTxId>()) - { - preset.DropVersion.Step = rowset.GetValue<Schema::SchemaPresetInfo::DropStep>(); - preset.DropVersion.TxId = rowset.GetValue<Schema::SchemaPresetInfo::DropTxId>(); - } - - if (!rowset.Next()) - return false; - } - } - - { // Load schema preset versions - auto rowset = db.Table<Schema::SchemaPresetVersionInfo>().Select(); - if (!rowset.IsReady()) - return false; - - while (!rowset.EndOfSet()) { - const ui32 id = rowset.GetValue<Schema::SchemaPresetVersionInfo::Id>(); - Y_VERIFY(Self->SchemaPresets.contains(id)); - auto& preset = Self->SchemaPresets.at(id); - TRowVersion version( - rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceStep>(), - rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceTxId>()); - auto& info = preset.Versions[version]; - Y_VERIFY(info.ParseFromString(rowset.GetValue<Schema::SchemaPresetVersionInfo::InfoProto>())); - - NOlap::TSnapshot snap{version.Step, version.TxId}; - if (!id) { - commonSchema.emplace(snap, Self->ConvertSchema(info.GetSchema())); - } else if (preset.Name == "default") { - schemaPreset.emplace(snap, Self->ConvertSchema(info.GetSchema())); - } - - if (!rowset.Next()) - return false; - } - } - - { // Load tables - auto rowset = db.Table<Schema::TableInfo>().Select(); - if (!rowset.IsReady()) - return false; - - while (!rowset.EndOfSet()) { - const ui64 pathId = rowset.GetValue<Schema::TableInfo::PathId>(); - auto& table = Self->Tables[pathId]; - table.PathId = pathId; - table.TieringUsage = rowset.GetValue<Schema::TableInfo::TieringUsage>(); - if (rowset.HaveValue<Schema::TableInfo::DropStep>() && - rowset.HaveValue<Schema::TableInfo::DropTxId>()) - { - table.DropVersion.Step = rowset.GetValue<Schema::TableInfo::DropStep>(); - table.DropVersion.TxId = rowset.GetValue<Schema::TableInfo::DropTxId>(); - Self->PathsToDrop.insert(pathId); - } - - if (!rowset.Next()) - return false; - } - } - - { // Load table versions - auto rowset = db.Table<Schema::TableVersionInfo>().Select(); - if (!rowset.IsReady()) - return false; - - while (!rowset.EndOfSet()) { - const ui64 pathId = rowset.GetValue<Schema::TableVersionInfo::PathId>(); - Y_VERIFY(Self->Tables.contains(pathId)); - auto& table = Self->Tables.at(pathId); - TRowVersion version( - rowset.GetValue<Schema::TableVersionInfo::SinceStep>(), - rowset.GetValue<Schema::TableVersionInfo::SinceTxId>()); - auto& info = table.Versions[version]; - Y_VERIFY(info.ParseFromString(rowset.GetValue<Schema::TableVersionInfo::InfoProto>())); - - if (!Self->PathsToDrop.count(pathId)) { - auto& ttlSettings = info.GetTtlSettings(); - if (ttlSettings.HasEnabled()) { - ttls[pathId].emplace(version, TTtl::TDescription(ttlSettings.GetEnabled())); - } - } - - if (!rowset.Next()) - return false; - } - } - - for (auto& [pathId, map] : ttls) { - auto& description = map.rbegin()->second; // last version if many - Self->Ttl.SetPathTtl(pathId, std::move(description)); - } - - Self->SetCounter(COUNTER_TABLES, Self->Tables.size()); - Self->SetCounter(COUNTER_TABLE_PRESETS, Self->SchemaPresets.size()); - Self->SetCounter(COUNTER_TABLE_TTLS, ttls.size()); - - if (!schemaPreset.empty()) { - Y_VERIFY(commonSchema.empty(), "Mix of schema preset and common schema"); - Self->SetPrimaryIndex(std::move(schemaPreset)); - } else if (!commonSchema.empty()) { - Self->SetPrimaryIndex(std::move(commonSchema)); - } else { - Y_VERIFY(Self->Tables.empty()); - Y_VERIFY(Self->SchemaPresets.empty()); - } + Self->TablesManager.InitFromDB(db, Self->TabletID()); + Self->SetCounter(COUNTER_TABLES, Self->TablesManager.GetTables().size()); + Self->SetCounter(COUNTER_TABLE_PRESETS, Self->TablesManager.GetSchemaPresets().size()); + Self->SetCounter(COUNTER_TABLE_TTLS, Self->TablesManager.GetTtl().PathsCount()); { // Load long tx writes auto rowset = db.Table<Schema::LongTxWrites>().Select(); @@ -310,13 +187,8 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) } } - // Load primary index - if (Self->PrimaryIndex) { - TBlobGroupSelector dsGroupSelector(Self->Info()); - NOlap::TDbWrapper idxDB(txc.DB, &dsGroupSelector); - if (!Self->PrimaryIndex->Load(idxDB, lostEvictions, Self->PathsToDrop)) { - return false; - } + if (!Self->TablesManager.LoadIndex(dbTable, lostEvictions)) { + return false; } // Set dropped evicting records to be erased in future cleanups diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp index beaa2f7b770..2ab3ab2a8e8 100644 --- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp +++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp @@ -142,8 +142,7 @@ public: } auto pathExists = [&](ui64 pathId) { - auto it = Self->Tables.find(pathId); - return it != Self->Tables.end() && !it->second.IsDropped(); + return Self->TablesManager.HasTable(pathId); }; auto counters = Self->InsertTable->Commit(dbTable, step, txId, meta.MetaShard, meta.WriteIds, diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp index de3bf81635b..c6a9274b20c 100644 --- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp +++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp @@ -276,13 +276,13 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex break; } - if (!Self->PrimaryIndex) { + if (!Self->TablesManager.HasPrimaryIndex()) { statusMessage = "No primary index for TTL"; status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR; break; } - auto schema = Self->PrimaryIndex->GetIndexInfo().ArrowSchema(); + auto schema = Self->TablesManager.GetIndexInfo().ArrowSchema(); auto ttlColumn = schema->GetFieldByName(columnName); if (!ttlColumn) { statusMessage = "TTL tx wrong TTL column '" + columnName + "'"; diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp index 8fe9ba771a8..3857498fe19 100644 --- a/ydb/core/tx/columnshard/columnshard__read.cpp +++ b/ydb/core/tx/columnshard/columnshard__read.cpp @@ -38,13 +38,13 @@ private: bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) { Y_VERIFY(Ev); - Y_VERIFY(Self->PrimaryIndex); + Y_VERIFY(Self->TablesManager.HasPrimaryIndex()); Y_UNUSED(txc); LOG_S_DEBUG("TTxRead.Execute at tablet " << Self->TabletID()); txc.DB.NoMoreReadsForTx(); - const NOlap::TIndexInfo& indexInfo = Self->PrimaryIndex->GetIndexInfo(); + const NOlap::TIndexInfo& indexInfo = Self->TablesManager.GetIndexInfo(); auto& record = Proto(Ev->Get()); ui64 metaShard = record.GetTxInitiator(); @@ -53,7 +53,7 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) { read.PlanStep = record.GetPlanStep(); read.TxId = record.GetTxId(); read.PathId = record.GetTableId(); - read.ReadNothing = Self->PathsToDrop.count(read.PathId); + read.ReadNothing = !(Self->TablesManager.HasTable(read.PathId)); read.ColumnIds = ProtoToVector<ui32>(record.GetColumnIds()); read.ColumnNames = ProtoToVector<TString>(record.GetColumnNames()); if (read.ColumnIds.empty() && read.ColumnNames.empty()) { @@ -75,11 +75,11 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) { } bool parseResult = ParseProgram(ctx, record.GetOlapProgramType(), record.GetOlapProgram(), read, - TIndexColumnResolver(Self->PrimaryIndex->GetIndexInfo())); + TIndexColumnResolver(Self->TablesManager.GetIndexInfo())); std::shared_ptr<NOlap::TReadMetadata> metadata; if (parseResult) { - metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->PrimaryIndex, Self->BatchCache, + metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->TablesManager.GetPrimaryIndex(), Self->BatchCache, ErrorDescription); } diff --git a/ydb/core/tx/columnshard/columnshard__read_base.cpp b/ydb/core/tx/columnshard/columnshard__read_base.cpp index 53b659cfd45..63c8239a6e7 100644 --- a/ydb/core/tx/columnshard/columnshard__read_base.cpp +++ b/ydb/core/tx/columnshard/columnshard__read_base.cpp @@ -192,7 +192,7 @@ bool TTxReadBase::ParseProgram(const TActorContext& ctx, NKikimrSchemeOp::EOlapP ErrorDescription = TStringBuilder() << "Wrong olap program"; return false; } - if (!ssaProgram->Steps.empty() && Self->PrimaryIndex) { + if (!ssaProgram->Steps.empty() && Self->TablesManager.HasPrimaryIndex()) { NSsa::OptimizeProgram(*ssaProgram); } diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index da8da9ee25e..8c17c5b83b9 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -842,9 +842,9 @@ std::shared_ptr<NOlap::TReadMetadataBase> TTxScan::CreateReadMetadata(const TAct { std::shared_ptr<NOlap::TReadMetadataBase> metadata; if (indexStats) { - metadata = PrepareStatsReadMetadata(Self->TabletID(), read, Self->PrimaryIndex, ErrorDescription); + metadata = PrepareStatsReadMetadata(Self->TabletID(), read, Self->TablesManager.GetPrimaryIndex(), ErrorDescription); } else { - metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->PrimaryIndex, Self->BatchCache, + metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->TablesManager.GetPrimaryIndex(), Self->BatchCache, ErrorDescription); } @@ -880,7 +880,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) { read.PlanStep = snapshot.GetStep(); read.TxId = snapshot.GetTxId(); read.PathId = record.GetLocalPathId(); - read.ReadNothing = Self->PathsToDrop.count(read.PathId); + read.ReadNothing = !(Self->TablesManager.HasTable(read.PathId)); read.TableName = record.GetTablePath(); bool isIndexStats = read.TableName.EndsWith(NOlap::TIndexInfo::STORE_INDEX_STATS_TABLE) || read.TableName.EndsWith(NOlap::TIndexInfo::TABLE_INDEX_STATS_TABLE); @@ -891,7 +891,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) { // "SELECT COUNT(*)" requests empty column list but we need non-empty list for PrepareReadMetadata. // So we add first PK column to the request. if (!isIndexStats) { - read.ColumnIds.push_back(Self->PrimaryIndex->GetIndexInfo().GetPKFirstColumnId()); + read.ColumnIds.push_back(Self->TablesManager.GetIndexInfo().GetPKFirstColumnId()); } else { read.ColumnIds.push_back(PrimaryIndexStatsSchema.KeyColumns.front()); } @@ -900,7 +900,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) { bool parseResult; if (!isIndexStats) { - TIndexColumnResolver columnResolver(Self->PrimaryIndex->GetIndexInfo()); + TIndexColumnResolver columnResolver(Self->TablesManager.GetIndexInfo()); parseResult = ParseProgram(ctx, record.GetOlapProgramType(), record.GetOlapProgram(), read, columnResolver); } else { TStatsColumnResolver columnResolver; @@ -926,7 +926,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) { auto ydbKey = isIndexStats ? NOlap::GetColumns(PrimaryIndexStatsSchema, PrimaryIndexStatsSchema.KeyColumns) : - Self->PrimaryIndex->GetIndexInfo().GetPrimaryKey(); + Self->TablesManager.GetIndexInfo().GetPrimaryKey(); for (auto& range: record.GetRanges()) { FillPredicatesFromRange(read, range, ydbKey, Self->TabletID()); @@ -1023,7 +1023,7 @@ void TTxScan::Complete(const TActorContext& ctx) { if (request.GetReverse()) { std::reverse(rMetadataRanges.begin(), rMetadataRanges.end()); } - NOlap::NCosts::TKeyRangesBuilder krBuilder(Self->PrimaryIndex->GetIndexInfo()); + NOlap::NCosts::TKeyRangesBuilder krBuilder(Self->TablesManager.GetIndexInfo()); { ui32 recordsCount = 0; for (auto&& i : rMetadataRanges) { diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 7d42fb25c5e..e27ca500ac5 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -49,7 +49,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { Y_VERIFY(logoBlobId.IsValid()); bool ok = false; - if (!Self->PrimaryIndex || !Self->IsTableWritable(tableId)) { + if (!Self->TablesManager.HasPrimaryIndex() || !Self->TablesManager.IsWritableTable(tableId)) { status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR; } else { if (record.HasLongTxId()) { @@ -138,15 +138,15 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex TString dedupId = record.GetDedupId(); auto putStatus = ev->Get()->PutStatus; - bool isWritable = IsTableWritable(tableId); - bool error = data.empty() || data.size() > TLimits::GetMaxBlobSize() || !PrimaryIndex || !isWritable; + bool isWritable = TablesManager.IsWritableTable(tableId); + bool error = data.empty() || data.size() > TLimits::GetMaxBlobSize() || !TablesManager.HasPrimaryIndex() || !isWritable; bool errorReturned = (putStatus != NKikimrProto::OK) && (putStatus != NKikimrProto::UNKNOWN); bool isOutOfSpace = IsAnyChannelYellowStop(); if (error || errorReturned) { LOG_S_NOTICE("Write (fail) " << data.size() << " bytes into pathId " << tableId << ", status " << putStatus - << (PrimaryIndex? "": ", no index") << (isWritable? "": ", ro") + << (TablesManager.HasPrimaryIndex()? "": ", no index") << (isWritable? "": ", ro") << " at tablet " << TabletID()); IncCounter(COUNTER_WRITE_FAIL); @@ -221,7 +221,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex ev->Get()->MaxSmallBlobSize = Settings.MaxSmallBlobSize; ++WritesInFly; // write started - ctx.Register(CreateWriteActor(TabletID(), PrimaryIndex->GetIndexInfo(), ctx.SelfID, + ctx.Register(CreateWriteActor(TabletID(), TablesManager.GetIndexInfo(), ctx.SelfID, BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release())); } diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index 61149c87fa4..8ddfd4e2965 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -43,7 +43,7 @@ private: bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) { Y_VERIFY(Ev); Y_VERIFY(Self->InsertTable); - Y_VERIFY(Self->PrimaryIndex); + Y_VERIFY(Self->TablesManager.HasPrimaryIndex()); txc.DB.NoMoreReadsForTx(); @@ -66,7 +66,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) TBlobGroupSelector dsGroupSelector(Self->Info()); NOlap::TDbWrapper dbWrap(txc.DB, &dsGroupSelector); - ok = Self->PrimaryIndex->ApplyChanges(dbWrap, changes, snapshot); // update changes + apply + ok = Self->TablesManager.MutablePrimaryIndex().ApplyChanges(dbWrap, changes, snapshot); // update changes + apply if (ok) { LOG_S_DEBUG("TTxWriteIndex (" << changes->TypeString() << ") apply at tablet " << Self->TabletID()); @@ -248,7 +248,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) Schema::SaveSpecialValue(db, Schema::EValueIds::LastExportNumber, Self->LastExportNo); } - Self->PrimaryIndex->FreeLocks(changes); + Self->TablesManager.MutablePrimaryIndex().FreeLocks(changes); if (changes->IsInsert()) { Self->ActiveIndexingOrCompaction = false; diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 41f3d952b3e..5db1a69b302 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -395,46 +395,6 @@ void TColumnShard::ProtectSchemaSeqNo(const NKikimrTxColumnShard::TSchemaSeqNo& } } -bool TColumnShard::IsTableWritable(ui64 tableId) const { - auto it = Tables.find(tableId); - if (it == Tables.end()) { - return false; - } - return !it->second.IsDropped(); -} - -ui32 TColumnShard::EnsureSchemaPreset(NIceDb::TNiceDb& db, ui32 presetId, const TString& name, - const NKikimrSchemeOp::TColumnTableSchema& schemaProto, - const TRowVersion& version) { - if (!SchemaPresets.contains(presetId)) { - LOG_S_DEBUG("EnsureSchemaPreset " << presetId << " at tablet " << TabletID()); - - auto& preset = SchemaPresets[presetId]; - preset.Id = presetId; - preset.Name = name; - auto& info = preset.Versions[version]; - info.SetId(preset.Id); - info.SetSinceStep(version.Step); - info.SetSinceTxId(version.TxId); - *info.MutableSchema() = schemaProto; - - Schema::SaveSchemaPresetInfo(db, preset.Id, preset.Name); - Schema::SaveSchemaPresetVersionInfo(db, preset.Id, version, info); - SetCounter(COUNTER_TABLE_PRESETS, SchemaPresets.size()); - } else { - LOG_S_DEBUG("EnsureSchemaPreset for existed preset " << presetId << " at tablet " << TabletID()); - } - - return presetId; -} - -ui32 TColumnShard::EnsureSchemaPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, - const TRowVersion& version) { - Y_VERIFY(presetProto.GetName() == "default", "Only schema preset named 'default' is supported"); - - return EnsureSchemaPreset(db, presetProto.GetId(), presetProto.GetName(), presetProto.GetSchema(), version); -} - void TColumnShard::RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc) { switch (body.TxBody_case()) { @@ -494,70 +454,53 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl NIceDb::TNiceDb db(txc.DB); const ui64 pathId = tableProto.GetPathId(); - if (!Tables.contains(pathId)) { - LOG_S_DEBUG("EnsureTable for pathId: " << pathId - << " ttl settings: " << tableProto.GetTtlSettings() - << " at tablet " << TabletID()); - - ui32 schemaPresetId = 0; - if (tableProto.HasSchemaPreset()) { - Y_VERIFY(!tableProto.HasSchema(), "Tables has either schema or preset"); + if (TablesManager.HasTable(pathId)) { + LOG_S_DEBUG("EnsureTable for existed pathId: " << pathId << " at tablet " << TabletID()); + return; + } - schemaPresetId = EnsureSchemaPreset(db, tableProto.GetSchemaPreset(), version); - Y_VERIFY(schemaPresetId); - } else { - Y_VERIFY(tableProto.HasSchema(), "Tables has either schema or preset"); + LOG_S_DEBUG("EnsureTable for pathId: " << pathId + << " ttl settings: " << tableProto.GetTtlSettings() + << " at tablet " << TabletID()); - // Save first table schema as common one with schemaPresetId == 0 + TTableInfo::TTableVersionInfo tableVerProto; + tableVerProto.SetPathId(pathId); - if (SchemaPresets.count(0)) { - LOG_S_WARN("Colocated standalone tables are not supported. " - << "EnsureTable failed at tablet " << TabletID()); - return; - } + if (tableProto.HasSchemaPreset()) { + Y_VERIFY(!tableProto.HasSchema(), "Tables has either schema or preset"); - schemaPresetId = EnsureSchemaPreset(db, 0, "", tableProto.GetSchema(), version); - Y_VERIFY(!schemaPresetId); + TSchemaPreset preset; + preset.Deserialize(tableProto.GetSchemaPreset()); + Y_VERIFY(!preset.IsStandaloneTable()); + tableVerProto.SetSchemaPresetId(preset.GetId()); + + if (TablesManager.RegisterSchemaPreset(preset, db)) { + TablesManager.AddPresetVersion(tableProto.GetSchemaPreset().GetId(), version, tableProto.GetSchemaPreset().GetSchema(), db); } + } else { + Y_VERIFY(tableProto.HasSchema(), "Tables has either schema or preset"); + *tableVerProto.MutableSchema() = tableProto.GetSchema(); + } - auto& table = Tables[pathId]; - table.PathId = pathId; - auto& tableVerProto = table.Versions[version]; - tableVerProto.SetPathId(pathId); - tableVerProto.SetSchemaPresetId(schemaPresetId); - - if (tableProto.HasTtlSettings()) { - *tableVerProto.MutableTtlSettings() = tableProto.GetTtlSettings(); - auto& ttlInfo = tableProto.GetTtlSettings(); - if (ttlInfo.HasEnabled()) { - Ttl.SetPathTtl(pathId, TTtl::TDescription(ttlInfo.GetEnabled())); - SetCounter(COUNTER_TABLE_TTLS, Ttl.PathsCount()); - } - if (ttlInfo.HasUseTiering()) { - table.TieringUsage = ttlInfo.GetUseTiering(); - ActivateTiering(pathId, table.TieringUsage); - } + TTableInfo table(pathId); + if (tableProto.HasTtlSettings()) { + const auto& ttlSettings = tableProto.GetTtlSettings(); + *tableVerProto.MutableTtlSettings() = ttlSettings; + if (ttlSettings.HasUseTiering()) { + table.SetTieringUsage(ttlSettings.GetUseTiering()); + ActivateTiering(pathId, table.GetTieringUsage()); } + } - if (!PrimaryIndex) { - TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaHistory; - - auto& schemaPresetVerProto = SchemaPresets[schemaPresetId].Versions[version]; - schemaHistory.emplace(NOlap::TSnapshot{version.Step, version.TxId}, - ConvertSchema(schemaPresetVerProto.GetSchema())); - - SetPrimaryIndex(std::move(schemaHistory)); - } + tableVerProto.SetSchemaPresetVersionAdj(tableProto.GetSchemaPresetVersionAdj()); + tableVerProto.SetTtlSettingsPresetVersionAdj(tableProto.GetTtlSettingsPresetVersionAdj()); - tableVerProto.SetSchemaPresetVersionAdj(tableProto.GetSchemaPresetVersionAdj()); - tableVerProto.SetTtlSettingsPresetVersionAdj(tableProto.GetTtlSettingsPresetVersionAdj()); + TablesManager.RegisterTable(std::move(table), db); + TablesManager.AddTableVersion(pathId, version, tableVerProto, db); - Schema::SaveTableInfo(db, table.PathId, table.TieringUsage); - Schema::SaveTableVersionInfo(db, table.PathId, version, tableVerProto); - SetCounter(COUNTER_TABLES, Tables.size()); - } else { - LOG_S_DEBUG("EnsureTable for existed pathId: " << pathId << " at tablet " << TabletID()); - } + SetCounter(COUNTER_TABLES, TablesManager.GetTables().size()); + SetCounter(COUNTER_TABLE_PRESETS, TablesManager.GetSchemaPresets().size()); + SetCounter(COUNTER_TABLE_TTLS, TablesManager.GetTtl().PathsCount()); } void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterProto, const TRowVersion& version, @@ -565,39 +508,33 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP NIceDb::TNiceDb db(txc.DB); const ui64 pathId = alterProto.GetPathId(); - auto* tablePtr = Tables.FindPtr(pathId); - Y_VERIFY(tablePtr && !tablePtr->IsDropped(), "AlterTable on a dropped or non-existent table"); - auto& table = *tablePtr; - auto& ttlSettings = alterProto.GetTtlSettings(); - + Y_VERIFY(TablesManager.HasTable(pathId), "AlterTable on a dropped or non-existent table"); + LOG_S_DEBUG("AlterTable for pathId: " << pathId << " schema: " << alterProto.GetSchema() - << " ttl settings: " << ttlSettings + << " ttl settings: " << alterProto.GetTtlSettings() << " at tablet " << TabletID()); - auto& info = table.Versions[version]; - + TTableInfo::TTableVersionInfo tableVerProto; if (alterProto.HasSchemaPreset()) { - info.SetSchemaPresetId(EnsureSchemaPreset(db, alterProto.GetSchemaPreset(), version)); + tableVerProto.SetSchemaPresetId(alterProto.GetSchemaPreset().GetId()); + TablesManager.AddPresetVersion(alterProto.GetSchemaPreset().GetId(), version, alterProto.GetSchemaPreset().GetSchema(), db); + } else if (alterProto.HasSchema()) { + *tableVerProto.MutableSchema() = alterProto.GetSchema(); } + const auto& ttlSettings = alterProto.GetTtlSettings(); // Note: Not valid behaviour for full alter implementation const TString& tieringUsage = ttlSettings.GetUseTiering(); - ActivateTiering(pathId, tieringUsage); if (alterProto.HasTtlSettings()) { - *info.MutableTtlSettings() = ttlSettings; - if (ttlSettings.HasEnabled()) { - Ttl.SetPathTtl(pathId, TTtl::TDescription(ttlSettings.GetEnabled())); - } else { - Ttl.DropPathTtl(pathId); - } - } else { - Ttl.DropPathTtl(pathId); + const auto& ttlSettings = alterProto.GetTtlSettings(); + *tableVerProto.MutableTtlSettings() = ttlSettings; } - Ttl.Repeat(); // Atler TTL triggers TTL activity + ActivateTiering(pathId, tieringUsage); + Schema::SaveTableInfo(db, pathId, tieringUsage); - info.SetSchemaPresetVersionAdj(alterProto.GetSchemaPresetVersionAdj()); - Schema::SaveTableInfo(db, table.PathId, tieringUsage); - Schema::SaveTableVersionInfo(db, table.PathId, version, info); + tableVerProto.SetSchemaPresetVersionAdj(alterProto.GetSchemaPresetVersionAdj()); + TablesManager.AddTableVersion(pathId, version, tableVerProto, db); + TablesManager.OnTtlUpdate(); } void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProto, const TRowVersion& version, @@ -605,25 +542,20 @@ void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProt NIceDb::TNiceDb db(txc.DB); const ui64 pathId = dropProto.GetPathId(); - auto* table = Tables.FindPtr(pathId); - Y_VERIFY_DEBUG(table && !table->IsDropped()); - if (table && !table->IsDropped()) { - LOG_S_DEBUG("DropTable for pathId: " << pathId << " at tablet " << TabletID()); - - PathsToDrop.insert(pathId); - Ttl.DropPathTtl(pathId); - - // TODO: Allow to read old snapshots after DROP - TBlobGroupSelector dsGroupSelector(Info()); - NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); - THashSet<TWriteId> writesToAbort = InsertTable->DropPath(dbTable, pathId); - TryAbortWrites(db, dbTable, std::move(writesToAbort)); - - table->DropVersion = version; - Schema::SaveTableDropVersion(db, pathId, version.Step, version.TxId); - } else { + if (!TablesManager.HasTable(pathId)) { LOG_S_DEBUG("DropTable for unknown or deleted pathId: " << pathId << " at tablet " << TabletID()); + return; } + + LOG_S_DEBUG("DropTable for pathId: " << pathId << " at tablet " << TabletID()); + TablesManager.DropTable(pathId, version, db); + + // TODO: Allow to read old snapshots after DROP + TBlobGroupSelector dsGroupSelector(Info()); + NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); + THashSet<TWriteId> writesToAbort = InsertTable->DropPath(dbTable, pathId); + + TryAbortWrites(db, dbTable, std::move(writesToAbort)); } void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, const TRowVersion& version, @@ -635,54 +567,18 @@ void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPathId, OwnerPathId); } - TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaHistory; - for (ui32 id : proto.GetDroppedSchemaPresets()) { - if (!SchemaPresets.contains(id)) { + if (!TablesManager.HasPreset(id)) { continue; } - auto& preset = SchemaPresets.at(id); - Y_VERIFY(preset.Name != "default", "Cannot drop the default preset"); - preset.DropVersion = version; - Schema::SaveSchemaPresetDropVersion(db, id, version); + TablesManager.DropPreset(id, version, db); } for (const auto& presetProto : proto.GetSchemaPresets()) { - if (!SchemaPresets.contains(presetProto.GetId())) { + if (!TablesManager.HasPreset(presetProto.GetId())) { continue; // we don't update presets that we don't use } - - auto& preset = SchemaPresets[presetProto.GetId()]; - auto& info = preset.Versions[version]; - info.SetId(preset.Id); - info.SetSinceStep(version.Step); - info.SetSinceTxId(version.TxId); - *info.MutableSchema() = presetProto.GetSchema(); - - if (preset.Name == "default") { - schemaHistory.emplace(NOlap::TSnapshot{version.Step, version.TxId}, ConvertSchema(info.GetSchema())); - } - - Schema::SaveSchemaPresetVersionInfo(db, preset.Id, version, info); - } - - if (!schemaHistory.empty()) { - SetPrimaryIndex(std::move(schemaHistory)); - } -} - -void TColumnShard::SetPrimaryIndex(TMap<NOlap::TSnapshot, NOlap::TIndexInfo>&& schemaVersions) { - for (auto& [snap, indexInfo] : schemaVersions) { - if (!PrimaryIndex) { - PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(std::move(indexInfo), TabletID()); - SetCounter(COUNTER_INDEXES, 1); - } else { - PrimaryIndex->UpdateDefaultSchema(snap, std::move(indexInfo)); - } - } - - for (auto& columnName : Ttl.TtlColumns()) { - PrimaryIndex->GetIndexInfo().CheckTtlColumn(columnName); + TablesManager.AddPresetVersion(presetProto.GetId(), version, presetProto.GetSchema(), db); } } @@ -764,7 +660,7 @@ std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() { LOG_S_DEBUG("Indexing/compaction already in progress at tablet " << TabletID()); return {}; } - if (!PrimaryIndex) { + if (!TablesManager.HasPrimaryIndex()) { LOG_S_NOTICE("Indexing not started: no index at tablet " << TabletID()); return {}; } @@ -785,7 +681,7 @@ std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() { if (bytesToIndex && (bytesToIndex + dataSize) > (ui64)Limits.MaxInsertBytes) { continue; } - if (auto* pMap = PrimaryIndex->GetOverloadedGranules(data.PathId)) { + if (auto* pMap = TablesManager.GetPrimaryIndexSafe().GetOverloadedGranules(data.PathId)) { overloadedPathGranules[pathId] = pMap->size(); InsertTable->SetOverloaded(data.PathId, true); ++ignored; @@ -833,13 +729,13 @@ std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() { } Y_VERIFY(data.size()); - auto indexChanges = PrimaryIndex->StartInsert(std::move(data)); + auto indexChanges = TablesManager.MutablePrimaryIndex().StartInsert(std::move(data)); if (!indexChanges) { LOG_S_NOTICE("Cannot prepare indexing at tablet " << TabletID()); return {}; } - auto actualIndexInfo = PrimaryIndex->GetIndexInfo(); + auto actualIndexInfo = TablesManager.GetIndexInfo(); if (Tiers) { auto pathTiering = Tiers->GetTiering(); // TODO: pathIds actualIndexInfo.UpdatePathTiering(pathTiering); @@ -857,13 +753,13 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() { LOG_S_DEBUG("Compaction/indexing already in progress at tablet " << TabletID()); return {}; } - if (!PrimaryIndex) { + if (!TablesManager.HasPrimaryIndex()) { LOG_S_NOTICE("Compaction not started: no index at tablet " << TabletID()); return {}; } - PrimaryIndex->UpdateCompactionLimits(CompactionLimits.Get()); - auto compactionInfo = PrimaryIndex->Compact(LastCompactedGranule); + TablesManager.MutablePrimaryIndex().UpdateCompactionLimits(CompactionLimits.Get()); + auto compactionInfo = TablesManager.MutablePrimaryIndex().Compact(LastCompactedGranule); if (!compactionInfo || compactionInfo->Empty()) { LOG_S_DEBUG("Compaction not started: no portions to compact at tablet " << TabletID()); return {}; @@ -874,13 +770,13 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() { LOG_S_DEBUG("Prepare " << *compactionInfo << " at tablet " << TabletID()); ui64 ourdatedStep = GetOutdatedStep(); - auto indexChanges = PrimaryIndex->StartCompaction(std::move(compactionInfo), {ourdatedStep, 0}); + auto indexChanges = TablesManager.MutablePrimaryIndex().StartCompaction(std::move(compactionInfo), {ourdatedStep, 0}); if (!indexChanges) { LOG_S_DEBUG("Compaction not started: cannot prepare compaction at tablet " << TabletID()); return {}; } - auto actualIndexInfo = PrimaryIndex->GetIndexInfo(); + auto actualIndexInfo = TablesManager.GetIndexInfo(); if (Tiers) { auto pathTiering = Tiers->GetTiering(); // TODO: pathIds actualIndexInfo.UpdatePathTiering(pathTiering); @@ -903,7 +799,7 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u LOG_S_DEBUG("Do not start TTL while eviction is in progress at tablet " << TabletID()); return {}; } - if (!PrimaryIndex) { + if (!TablesManager.HasPrimaryIndex()) { LOG_S_NOTICE("TTL not started. No index for TTL at tablet " << TabletID()); return {}; } @@ -913,7 +809,7 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u if (Tiers) { eviction = Tiers->GetTiering(); // TODO: pathIds } - Ttl.AddTtls(eviction, TInstant::Now(), force); + TablesManager.AddTtls(eviction, TInstant::Now(), force); } if (eviction.empty()) { @@ -927,11 +823,11 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u LOG_S_DEBUG("Evicting path " << i.first << " with " << i.second.GetDebugString() << " at tablet " << TabletID()); } - auto actualIndexInfo = PrimaryIndex->GetIndexInfo(); + auto actualIndexInfo = TablesManager.GetIndexInfo(); actualIndexInfo.UpdatePathTiering(eviction); std::shared_ptr<NOlap::TColumnEngineChanges> indexChanges; - indexChanges = PrimaryIndex->StartTtl(eviction); + indexChanges = TablesManager.MutablePrimaryIndex().StartTtl(eviction); actualIndexInfo.SetPathTiering(std::move(eviction)); @@ -940,7 +836,7 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u return {}; } if (indexChanges->NeedRepeat) { - Ttl.Repeat(); + TablesManager.OnTtlUpdate(); } bool needWrites = !indexChanges->PortionsToEvict.empty(); @@ -955,14 +851,14 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() { LOG_S_DEBUG("Cleanup already in progress at tablet " << TabletID()); return {}; } - if (!PrimaryIndex) { + if (!TablesManager.HasPrimaryIndex()) { LOG_S_NOTICE("Cleanup not started. No index for cleanup at tablet " << TabletID()); return {}; } NOlap::TSnapshot cleanupSnapshot{GetMinReadStep(), 0}; - auto changes = PrimaryIndex->StartCleanup(cleanupSnapshot, PathsToDrop, TLimits::MAX_TX_RECORDS); + auto changes = TablesManager.StartIndexCleanup(cleanupSnapshot, TLimits::MAX_TX_RECORDS); if (!changes) { LOG_S_NOTICE("Cannot prepare cleanup at tablet " << TabletID()); return {}; @@ -995,7 +891,7 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() { return {}; } - auto actualIndexInfo = PrimaryIndex->GetIndexInfo(); + auto actualIndexInfo = TablesManager.GetIndexInfo(); #if 0 // No need for now if (Tiers) { ... @@ -1009,33 +905,6 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() { return ev; } -NOlap::TIndexInfo TColumnShard::ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema) { - Y_VERIFY(schema.GetEngine() == NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES); - - ui32 indexId = 0; - NOlap::TIndexInfo indexInfo("", indexId); - - for (const auto& col : schema.GetColumns()) { - const ui32 id = col.GetId(); - const TString& name = col.GetName(); - auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(col.GetTypeId(), - col.HasTypeInfo() ? &col.GetTypeInfo() : nullptr); - indexInfo.Columns[id] = NTable::TColumn(name, id, typeInfoMod.TypeInfo, typeInfoMod.TypeMod); - indexInfo.ColumnNames[name] = id; - } - - for (const auto& keyName : schema.GetKeyColumnNames()) { - Y_VERIFY(indexInfo.ColumnNames.count(keyName)); - indexInfo.KeyColumns.push_back(indexInfo.ColumnNames[keyName]); - } - - if (schema.HasDefaultCompression()) { - NOlap::TCompression compression = NTiers::ConvertCompression(schema.GetDefaultCompression()); - indexInfo.SetDefaultCompression(compression); - } - - return indexInfo; -} void TColumnShard::MapExternBlobs(const TActorContext& /*ctx*/, NOlap::TReadMetadata& metadata) { if (!metadata.SelectInfo) { diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 3283e97440e..383d51a3592 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -5,6 +5,7 @@ #include "columnshard_ttl.h" #include "columnshard_private_events.h" #include "blob_manager.h" +#include "tables_manager.h" #include "inflight_request_tracker.h" #include <ydb/core/tablet/tablet_counters.h> @@ -331,31 +332,8 @@ private: } }; - struct TSchemaPreset { - using TSchemaPresetVersionInfo = NKikimrTxColumnShard::TSchemaPresetVersionInfo; - - ui32 Id; - TString Name; - TMap<TRowVersion, TSchemaPresetVersionInfo> Versions; - TRowVersion DropVersion = TRowVersion::Max(); - - bool IsDropped() const { - return DropVersion != TRowVersion::Max(); - } - }; - - struct TTableInfo { - using TTableVersionInfo = NKikimrTxColumnShard::TTableVersionInfo; - - ui64 PathId; - std::map<TRowVersion, TTableVersionInfo> Versions; - TRowVersion DropVersion = TRowVersion::Max(); - TString TieringUsage; - - bool IsDropped() const { - return DropVersion != TRowVersion::Max(); - } - }; + using TSchemaPreset = TSchemaPreset; + using TTableInfo = TTableInfo; struct TLongTxWriteInfo { ui64 WriteId; @@ -364,6 +342,8 @@ private: ui64 PreparedTxId = 0; }; + TTablesManager TablesManager; + ui64 CurrentSchemeShardId = 0; TMessageSeqNo LastSchemaSeqNo; std::optional<NKikimrSubDomains::TProcessingParams> ProcessingParams; @@ -401,9 +381,7 @@ private: TTabletCountersBase* TabletCounters; std::unique_ptr<NTabletPipe::IClientCache> PipeClientCache; std::unique_ptr<NOlap::TInsertTable> InsertTable; - std::unique_ptr<NOlap::IColumnEngine> PrimaryIndex; TBatchCache BatchCache; - TTtl Ttl; THashMap<ui64, TBasicTxInfo> BasicTxInfo; TSet<TDeadlineQueueItem> DeadlineQueue; @@ -413,14 +391,11 @@ private: THashMap<ui64, TInstant> ScanTxInFlight; THashMap<ui64, TAlterMeta> AltersInFlight; THashMap<ui64, TCommitMeta> CommitsInFlight; // key is TxId from propose - THashMap<ui32, TSchemaPreset> SchemaPresets; - THashMap<ui64, TTableInfo> Tables; THashMap<TWriteId, TLongTxWriteInfo> LongTxWrites; using TPartsForLTXShard = THashMap<ui32, TLongTxWriteInfo*>; THashMap<TULID, TPartsForLTXShard> LongTxWritesByUniqueId; TMultiMap<TRowVersion, TEvColumnShard::TEvRead::TPtr> WaitingReads; TMultiMap<TRowVersion, TEvColumnShard::TEvScan::TPtr> WaitingScans; - THashSet<ui64> PathsToDrop; bool ActiveIndexingOrCompaction = false; bool ActiveCleanup = false; bool ActiveTtl = false; @@ -455,7 +430,7 @@ private: } bool IndexOverloaded() const { - return PrimaryIndex && PrimaryIndex->HasOverloadedGranules(); + return TablesManager.IndexOverloaded(); } TWriteId HasLongTxWrite(const NLongTxService::TLongTxId& longTxId, const ui32 partId); @@ -472,22 +447,13 @@ private: void UpdateSchemaSeqNo(const TMessageSeqNo& seqNo, NTabletFlatExecutor::TTransactionContext& txc); void ProtectSchemaSeqNo(const NKikimrTxColumnShard::TSchemaSeqNo& seqNoProto, NTabletFlatExecutor::TTransactionContext& txc); - bool IsTableWritable(ui64 tableId) const; - - ui32 EnsureSchemaPreset(NIceDb::TNiceDb& db, ui32 presetId, const TString& name, - const NKikimrSchemeOp::TColumnTableSchema& schemaProto, const TRowVersion& version); - ui32 EnsureSchemaPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, const TRowVersion& version); - //ui32 EnsureTtlSettingsPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableTtlSettingsPreset& presetProto, const TRowVersion& version); - void RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); void RunInit(const NKikimrTxColumnShard::TInitShard& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); void RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); void RunAlterTable(const NKikimrTxColumnShard::TAlterTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); void RunDropTable(const NKikimrTxColumnShard::TDropTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); void RunAlterStore(const NKikimrTxColumnShard::TAlterStore& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); - void SetPrimaryIndex(TMap<NOlap::TSnapshot, NOlap::TIndexInfo>&& schemaVersions); - NOlap::TIndexInfo ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema); void MapExternBlobs(const TActorContext& ctx, NOlap::TReadMetadata& metadata); TActorId GetS3ActorForTier(const TString& tierId) const; void ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName, ui64 pathId, diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp new file mode 100644 index 00000000000..cbcf0abe8c3 --- /dev/null +++ b/ydb/core/tx/columnshard/tables_manager.cpp @@ -0,0 +1,324 @@ +#include "tables_manager.h" +#include "columnshard_schema.h" +#include "blob_manager_db.h" +#include "engines/column_engine_logs.h" +#include <ydb/core/scheme/scheme_types_proto.h> +#include <ydb/core/tx/tiering/manager.h> + + + +namespace NKikimr::NColumnShard { + +void TSchemaPreset::Deserialize(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto) { + Id = presetProto.GetId(); + Name = presetProto.GetName(); +} + +bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db, const ui64 tabletId) { + TabletId = tabletId; + { + auto rowset = db.Table<Schema::TableInfo>().Select(); + if (!rowset.IsReady()) { + return false; + } + + while (!rowset.EndOfSet()) { + TTableInfo table; + if (!table.InitFromDB(rowset)) { + return false; + } + if (table.IsDropped()) { + PathsToDrop.insert(table.GetPathId()); + } + Tables.insert_or_assign(table.GetPathId(), std::move(table)); + + if (!rowset.Next()) { + return false; + } + } + } + + bool isFakePresetOnly = true; + { + auto rowset = db.Table<Schema::SchemaPresetInfo>().Select(); + if (!rowset.IsReady()) { + return false; + } + + while (!rowset.EndOfSet()) { + TSchemaPreset preset; + preset.InitFromDB(rowset); + + if (preset.IsStandaloneTable()) { + Y_VERIFY_S(!preset.GetName(), "Preset name: " + preset.GetName()); + } else { + Y_VERIFY_S(preset.GetName() == "default", "Preset name: " + preset.GetName()); + isFakePresetOnly = false; + } + SchemaPresets.insert_or_assign(preset.GetId(), preset); + if (!rowset.Next()) { + return false; + } + } + } + + { + auto rowset = db.Table<Schema::TableVersionInfo>().Select(); + if (!rowset.IsReady()) { + return false; + } + + THashMap<ui64, TRowVersion> lastVersion; + while (!rowset.EndOfSet()) { + const ui64 pathId = rowset.GetValue<Schema::TableVersionInfo::PathId>(); + Y_VERIFY(Tables.contains(pathId)); + TRowVersion version( + rowset.GetValue<Schema::TableVersionInfo::SinceStep>(), + rowset.GetValue<Schema::TableVersionInfo::SinceTxId>()); + + auto& table = Tables.at(pathId); + TTableInfo::TTableVersionInfo versionInfo; + Y_VERIFY(versionInfo.ParseFromString(rowset.GetValue<Schema::TableVersionInfo::InfoProto>())); + Y_VERIFY(SchemaPresets.contains(versionInfo.GetSchemaPresetId())); + + if (!table.IsDropped()) { + auto& ttlSettings = versionInfo.GetTtlSettings(); + if (ttlSettings.HasEnabled()) { + if (!lastVersion.contains(pathId) || lastVersion[pathId] < version) { + TTtl::TDescription description(ttlSettings.GetEnabled()); + Ttl.SetPathTtl(pathId, std::move(description)); + lastVersion[pathId] = version; + } + } + } + table.AddVersion(version, versionInfo); + if (!rowset.Next()) { + return false; + } + } + } + + { + auto rowset = db.Table<Schema::SchemaPresetVersionInfo>().Select(); + if (!rowset.IsReady()) { + return false; + } + + while (!rowset.EndOfSet()) { + const ui32 id = rowset.GetValue<Schema::SchemaPresetVersionInfo::Id>(); + Y_VERIFY(SchemaPresets.contains(id)); + auto& preset = SchemaPresets.at(id); + TRowVersion version( + rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceStep>(), + rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceTxId>()); + + TSchemaPreset::TSchemaPresetVersionInfo info; + Y_VERIFY(info.ParseFromString(rowset.GetValue<Schema::SchemaPresetVersionInfo::InfoProto>())); + preset.AddVersion(version, info); + if (!rowset.Next()) { + return false; + } + } + } + + if (isFakePresetOnly) { + Y_VERIFY(Tables.size() <= 1); + if (!Tables.empty()) { + for (const auto& [version, tableInfo] : Tables.begin()->second.GetVersions()) { + if (tableInfo.HasSchema()) { + IndexSchemaVersion(version, tableInfo.GetSchema()); + } + } + } + } else { + for (const auto& [id, preset] : SchemaPresets) { + Y_VERIFY(!!id); + for (const auto& [version, schemaInfo] : preset.GetVersions()) { + if (schemaInfo.HasSchema()) { + IndexSchemaVersion(version, schemaInfo.GetSchema()); + } + } + } + } + return true; +} + +bool TTablesManager::LoadIndex(NOlap::TDbWrapper& idxDB, THashSet<NOlap::TUnifiedBlobId>& lostEvictions) { + if (PrimaryIndex) { + if (!PrimaryIndex->Load(idxDB, lostEvictions, PathsToDrop)) { + return false; + } + } + return true; +} + +void TTablesManager::Clear() { + Tables.clear(); + SchemaPresets.clear(); + PathsToDrop.clear(); +} + +bool TTablesManager::HasTable(const ui64 pathId) const { + auto it = Tables.find(pathId); + if (it == Tables.end() || it->second.IsDropped()) { + return false; + } + return true; +} + +bool TTablesManager::IsWritableTable(const ui64 pathId) const { + return HasTable(pathId); +} + +bool TTablesManager::HasPreset(const ui32 presetId) const { + return SchemaPresets.contains(presetId); +} + +const TTableInfo& TTablesManager::GetTable(const ui64 pathId) const { + Y_VERIFY(HasTable(pathId)); + return Tables.at(pathId); +} + +ui64 TTablesManager::GetMemoryUsage() const { + ui64 memory = + Tables.size() * sizeof(TTableInfo) + + PathsToDrop.size() * sizeof(ui64) + + Ttl.PathsCount() * sizeof(TTtl::TDescription) + + SchemaPresets.size() * sizeof(TSchemaPreset); + if (PrimaryIndex) { + memory += PrimaryIndex->MemoryUsage(); + } + return memory; +} + +void TTablesManager::OnTtlUpdate() { + Ttl.Repeat(); +} + +void TTablesManager::DropTable(const ui64 pathId, const TRowVersion& version, NIceDb::TNiceDb& db) { + auto& table = Tables.at(pathId); + table.SetDropVersion(version); + PathsToDrop.insert(pathId); + Ttl.DropPathTtl(pathId); + Schema::SaveTableDropVersion(db, pathId, version.Step, version.TxId); +} + +void TTablesManager::DropPreset(const ui32 presetId, const TRowVersion& version, NIceDb::TNiceDb& db) { + auto& preset = SchemaPresets.at(presetId); + Y_VERIFY(preset.GetName() != "default", "Cannot drop the default preset"); + preset.SetDropVersion(version); + Schema::SaveSchemaPresetDropVersion(db, presetId, version); +} + +void TTablesManager::RegisterTable(TTableInfo&& table, NIceDb::TNiceDb& db) { + Y_VERIFY(!HasTable(table.GetPathId())); + Y_VERIFY(table.IsEmpty()); + + Schema::SaveTableInfo(db, table.GetPathId(), table.GetTieringUsage()); + Tables.insert_or_assign(table.GetPathId(), std::move(table)); +} + +bool TTablesManager::RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIceDb::TNiceDb& db) { + if (SchemaPresets.contains(schemaPreset.GetId())) { + return false; + } + Schema::SaveSchemaPresetInfo(db, schemaPreset.GetId(), schemaPreset.GetName()); + SchemaPresets.insert_or_assign(schemaPreset.GetId(), schemaPreset); + return true; +} + +void TTablesManager::AddPresetVersion(const ui32 presetId, const TRowVersion& version, const TTableSchema& schema, NIceDb::TNiceDb& db) { + Y_VERIFY(SchemaPresets.contains(presetId)); + TSchemaPreset::TSchemaPresetVersionInfo versionInfo; + versionInfo.SetId(presetId); + versionInfo.SetSinceStep(version.Step); + versionInfo.SetSinceTxId(version.TxId); + *versionInfo.MutableSchema() = schema; + + auto& schemaPreset = SchemaPresets.at(presetId); + Schema::SaveSchemaPresetVersionInfo(db, presetId, version, versionInfo); + schemaPreset.AddVersion(version, versionInfo); + if (presetId != 0 && versionInfo.HasSchema()) { + IndexSchemaVersion(version, versionInfo.GetSchema()); + } +} + +void TTablesManager::AddTableVersion(const ui64 pathId, const TRowVersion& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db) { + auto& table = Tables.at(pathId); + + if (table.IsEmpty()) { + if (versionInfo.HasSchemaPresetId()) { + Y_VERIFY(SchemaPresets.contains(versionInfo.GetSchemaPresetId())); + } else { + Y_VERIFY(SchemaPresets.empty()); + TSchemaPreset preset; + Y_VERIFY(RegisterSchemaPreset(preset, db)); + AddPresetVersion(preset.GetId(), version, versionInfo.GetSchema(), db); + } + } + Y_VERIFY(SchemaPresets.contains(versionInfo.GetSchemaPresetId())); + + if (versionInfo.HasTtlSettings()) { + const auto& ttlSettings = versionInfo.GetTtlSettings(); + if (ttlSettings.HasEnabled()) { + Ttl.SetPathTtl(pathId, TTtl::TDescription(ttlSettings.GetEnabled())); + } else { + Ttl.DropPathTtl(pathId); + } + } + + Schema::SaveTableVersionInfo(db, pathId, version, versionInfo); + table.AddVersion(version, versionInfo); + if (versionInfo.HasSchema()) { + IndexSchemaVersion(version, versionInfo.GetSchema()); + } +} + +void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const TTableSchema& schema) { + NOlap::TSnapshot snapshot{version.Step, version.TxId}; + NOlap::TIndexInfo indexInfo = ConvertSchema(schema); + + if (!PrimaryIndex) { + PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(std::move(indexInfo), TabletId); + } else { + PrimaryIndex->UpdateDefaultSchema(snapshot, std::move(indexInfo)); + } + + for (auto& columnName : Ttl.TtlColumns()) { + PrimaryIndex->GetIndexInfo().CheckTtlColumn(columnName); + } +} + +std::shared_ptr<NOlap::TColumnEngineChanges> TTablesManager::StartIndexCleanup(const NOlap::TSnapshot& snapshot, ui32 maxRecords) { + Y_VERIFY(PrimaryIndex); + return PrimaryIndex->StartCleanup(snapshot, PathsToDrop, maxRecords); +} + +NOlap::TIndexInfo TTablesManager::ConvertSchema(const TTableSchema& schema) { + Y_VERIFY(schema.GetEngine() == NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES); + + ui32 indexId = 0; + NOlap::TIndexInfo indexInfo("", indexId); + + for (const auto& col : schema.GetColumns()) { + const ui32 id = col.GetId(); + const TString& name = col.GetName(); + auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(col.GetTypeId(), + col.HasTypeInfo() ? &col.GetTypeInfo() : nullptr); + indexInfo.Columns[id] = NTable::TColumn(name, id, typeInfoMod.TypeInfo, typeInfoMod.TypeMod); + indexInfo.ColumnNames[name] = id; + } + + for (const auto& keyName : schema.GetKeyColumnNames()) { + Y_VERIFY(indexInfo.ColumnNames.count(keyName)); + indexInfo.KeyColumns.push_back(indexInfo.ColumnNames[keyName]); + } + + if (schema.HasDefaultCompression()) { + NOlap::TCompression compression = NTiers::ConvertCompression(schema.GetDefaultCompression()); + indexInfo.SetDefaultCompression(compression); + } + + return indexInfo; +} +} diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h new file mode 100644 index 00000000000..423221195b7 --- /dev/null +++ b/ydb/core/tx/columnshard/tables_manager.h @@ -0,0 +1,221 @@ +#pragma once + +#include "columnshard_schema.h" +#include "columnshard_ttl.h" +#include "engines/column_engine.h" + +#include "ydb/core/base/row_version.h" +#include "ydb/library/accessor/accessor.h" +#include "ydb/core/protos/tx_columnshard.pb.h" + + +namespace NKikimr::NColumnShard { + +template<class TSchemaProto> +class TVersionedSchema { +protected: + std::optional<TRowVersion> DropVersion; + TMap<TRowVersion, TSchemaProto> Versions; + +public: + bool IsDropped() const { + return DropVersion.has_value(); + } + + bool IsEmpty() const { + return Versions.empty(); + } + + void SetDropVersion(const TRowVersion& version) { + DropVersion = version; + } + + const TMap<TRowVersion, TSchemaProto>& GetVersions() const { + return Versions; + } + + const TSchemaProto& GetVersion(const TRowVersion& version) const { + const TSchemaProto* result = nullptr; + for (auto ver : Versions) { + if (ver.first > version) { + break; + } + result = &ver.second; + } + Y_VERIFY(!!result); + return *result; + } + + void AddVersion(const TRowVersion& version, const TSchemaProto& versionInfo) { + Versions[version] = versionInfo; + } +}; + +using TTableSchema = NKikimrSchemeOp::TColumnTableSchema; + +class TSchemaPreset : public TVersionedSchema<NKikimrTxColumnShard::TSchemaPresetVersionInfo> { +public: + using TSchemaPresetVersionInfo = NKikimrTxColumnShard::TSchemaPresetVersionInfo; + ui32 Id = 0; + TString Name; +public: + bool IsStandaloneTable() const { + return Id == 0; + } + + const TString& GetName() const { + return Name; + } + + ui32 GetId() const { + return Id; + } + + void Deserialize(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto); + + template <class TRow> + bool InitFromDB(const TRow& rowset) { + Id = rowset.template GetValue<Schema::SchemaPresetInfo::Id>(); + if (!IsStandaloneTable()) { + Name = rowset.template GetValue<Schema::SchemaPresetInfo::Name>(); + } + Y_VERIFY(!Id || Name == "default", "Unsupported preset at load time"); + + if (rowset.template HaveValue<Schema::SchemaPresetInfo::DropStep>() && + rowset.template HaveValue<Schema::SchemaPresetInfo::DropTxId>()) + { + DropVersion.emplace(); + DropVersion->Step = rowset.template GetValue<Schema::SchemaPresetInfo::DropStep>(); + DropVersion->TxId = rowset.template GetValue<Schema::SchemaPresetInfo::DropTxId>(); + } + return true; + } +}; + +class TTableInfo : public TVersionedSchema<NKikimrTxColumnShard::TTableVersionInfo> { +public: + using TTableVersionInfo = NKikimrTxColumnShard::TTableVersionInfo; + ui64 PathId; + TString TieringUsage; + +public: + const TString& GetTieringUsage() const { + return TieringUsage; + } + + TTableInfo& SetTieringUsage(const TString& data) { + TieringUsage = data; + return *this; + } + + ui64 GetPathId() const { + return PathId; + } + + TTableInfo() = default; + + TTableInfo(const ui64 pathId) + : PathId(pathId) + {} + + template <class TRow> + bool InitFromDB(const TRow& rowset) { + PathId = rowset.template GetValue<Schema::TableInfo::PathId>(); + TieringUsage = rowset.template GetValue<Schema::TableInfo::TieringUsage>(); + if (rowset.template HaveValue<Schema::TableInfo::DropStep>() && rowset.template HaveValue<Schema::TableInfo::DropTxId>()) { + DropVersion.emplace(); + DropVersion->Step = rowset.template GetValue<Schema::TableInfo::DropStep>(); + DropVersion->TxId = rowset.template GetValue<Schema::TableInfo::DropTxId>(); + } + return true; + } +}; + +class TTablesManager { +private: + THashMap<ui64, TTableInfo> Tables; + THashMap<ui32, TSchemaPreset> SchemaPresets; + THashSet<ui64> PathsToDrop; + TTtl Ttl; + std::unique_ptr<NOlap::IColumnEngine> PrimaryIndex; + ui64 TabletId; +public: + const TTtl& GetTtl() const { + return Ttl; + } + + void AddTtls(THashMap<ui64, NOlap::TTiering>& eviction, TInstant now, bool force) { + Ttl.AddTtls(eviction, now, force); + } + + const THashSet<ui64>& GetPathsToDrop() const { + return PathsToDrop; + } + + const THashMap<ui64, TTableInfo>& GetTables() const { + return Tables; + } + + const THashMap<ui32, TSchemaPreset>& GetSchemaPresets() const { + return SchemaPresets; + } + + bool IndexOverloaded() const { + return PrimaryIndex && PrimaryIndex->HasOverloadedGranules(); + } + + bool HasPrimaryIndex() const { + return !!PrimaryIndex; + } + + NOlap::IColumnEngine& MutablePrimaryIndex() { + Y_VERIFY(!!PrimaryIndex); + return *PrimaryIndex; + } + + const NOlap::TIndexInfo& GetIndexInfo(const NOlap::TSnapshot& version = {}) const { + Y_UNUSED(version); + Y_VERIFY(!!PrimaryIndex); + return PrimaryIndex->GetIndexInfo(); + } + + const std::unique_ptr<NOlap::IColumnEngine>& GetPrimaryIndex() const { + return PrimaryIndex; + } + + const NOlap::IColumnEngine& GetPrimaryIndexSafe() const { + Y_VERIFY(!!PrimaryIndex); + return *PrimaryIndex; + } + + bool InitFromDB(NIceDb::TNiceDb& db, const ui64 tabletId); + bool LoadIndex(NOlap::TDbWrapper& db, THashSet<NOlap::TUnifiedBlobId>& lostEvictions); + + void Clear(); + + const TTableInfo& GetTable(const ui64 pathId) const; + ui64 GetMemoryUsage() const; + + bool HasTable(const ui64 pathId) const; + bool IsWritableTable(const ui64 pathId) const; + bool HasPreset(const ui32 presetId) const; + + void DropTable(const ui64 pathId, const TRowVersion& version, NIceDb::TNiceDb& db); + void DropPreset(const ui32 presetId, const TRowVersion& version, NIceDb::TNiceDb& db); + + void RegisterTable(TTableInfo&& table, NIceDb::TNiceDb& db); + bool RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIceDb::TNiceDb& db); + + void AddPresetVersion(const ui32 presetId, const TRowVersion& version, const TTableSchema& schema, NIceDb::TNiceDb& db); + void AddTableVersion(const ui64 pathId, const TRowVersion& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db); + + void OnTtlUpdate(); + + std::shared_ptr<NOlap::TColumnEngineChanges> StartIndexCleanup(const NOlap::TSnapshot& snapshot, ui32 maxRecords); + +private: + void IndexSchemaVersion(const TRowVersion& version, const TTableSchema& schema); + static NOlap::TIndexInfo ConvertSchema(const TTableSchema& schema); +}; + +} |