diff options
author | nsofya <nsofya@yandex-team.com> | 2023-04-06 13:36:33 +0300 |
---|---|---|
committer | nsofya <nsofya@yandex-team.com> | 2023-04-06 13:36:33 +0300 |
commit | 1081f842b292b5ac2aad05bb887c47956bbcc45c (patch) | |
tree | cca95d9800e0a68987de5bb708a4553d05b37bf7 | |
parent | 4065b7ff9ca3eb0fde79193508385c0afc42f2c4 (diff) | |
download | ydb-1081f842b292b5ac2aad05bb887c47956bbcc45c.tar.gz |
temporary revert rXXXXXX commit due to standalone tables scheme problem
17 files changed, 431 insertions, 680 deletions
diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt index 28261168eb2..a81d192131f 100644 --- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt @@ -74,7 +74,6 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/tables_manager.cpp ) generate_enum_serilization(core-tx-columnshard ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard.h diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt index 66d5ee15570..77e8b4de116 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt @@ -75,7 +75,6 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/tables_manager.cpp ) generate_enum_serilization(core-tx-columnshard ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard.h diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt index 66d5ee15570..77e8b4de116 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt @@ -75,7 +75,6 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/tables_manager.cpp ) generate_enum_serilization(core-tx-columnshard ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard.h diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt index 28261168eb2..a81d192131f 100644 --- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt @@ -74,7 +74,6 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/tables_manager.cpp ) generate_enum_serilization(core-tx-columnshard ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard.h diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index 20033b28c87..4fa8d2fd8fd 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -29,8 +29,8 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) { IndexingActor = ctx.Register(CreateIndexingActor(TabletID(), ctx.SelfID)); CompactionActor = ctx.Register(CreateCompactionActor(TabletID(), ctx.SelfID)); EvictionActor = ctx.Register(CreateEvictionActor(TabletID(), ctx.SelfID)); - for (auto&& i : TablesManager.GetTables()) { - ActivateTiering(i.first, i.second.GetTieringUsage()); + for (auto&& i : Tables) { + ActivateTiering(i.first, i.second.TieringUsage); } SignalTabletActive(ctx); } @@ -193,11 +193,11 @@ void TColumnShard::UpdateInsertTableCounters() { } void TColumnShard::UpdateIndexCounters() { - if (!TablesManager.HasPrimaryIndex()) { + if (!PrimaryIndex) { return; } - auto& stats = TablesManager.MutablePrimaryIndex().GetTotalStats(); + auto& stats = PrimaryIndex->GetTotalStats(); SetCounter(COUNTER_INDEX_TABLES, stats.Tables); SetCounter(COUNTER_INDEX_GRANULES, stats.Granules); SetCounter(COUNTER_INDEX_EMPTY_GRANULES, stats.EmptyGranules); @@ -243,6 +243,10 @@ void TColumnShard::UpdateIndexCounters() { ui64 TColumnShard::MemoryUsage() const { ui64 memory = + Tables.size() * sizeof(TTableInfo) + + PathsToDrop.size() * sizeof(ui64) + + Ttl.PathsCount() * sizeof(TTtl::TDescription) + + SchemaPresets.size() * sizeof(TSchemaPreset) + BasicTxInfo.size() * sizeof(TBasicTxInfo) + DeadlineQueue.size() * sizeof(TDeadlineQueueItem) + (PlanQueue.size() + RunningQueue.size()) * sizeof(TPlanQueueItem) + @@ -254,7 +258,9 @@ ui64 TColumnShard::MemoryUsage() const { (WaitingReads.size() + WaitingScans.size()) * (sizeof(TRowVersion) + sizeof(void*)) + TabletCounters->Simple()[COUNTER_PREPARED_RECORDS].Get() * sizeof(NOlap::TInsertedData) + TabletCounters->Simple()[COUNTER_COMMITTED_RECORDS].Get() * sizeof(NOlap::TInsertedData); - memory += TablesManager.GetMemoryUsage(); + if (PrimaryIndex) { + memory += PrimaryIndex->MemoryUsage(); + } memory += BatchCache.Bytes(); return memory; } @@ -323,9 +329,9 @@ void TColumnShard::SendPeriodicStats() { tabletStats->SetTxRejectedBySpace(TabletCounters->Cumulative()[COUNTER_OUT_OF_SPACE].Get()); tabletStats->SetInFlightTxCount(Executor()->GetStats().TxInFly); - if (TablesManager.HasPrimaryIndex()) { - const auto& indexStats = TablesManager.MutablePrimaryIndex().GetTotalStats(); - NOlap::TSnapshot lastIndexUpdate = TablesManager.GetPrimaryIndexSafe().LastUpdate(); + if (PrimaryIndex) { + const auto& indexStats = PrimaryIndex->GetTotalStats(); + NOlap::TSnapshot lastIndexUpdate = PrimaryIndex->LastUpdate(); auto activeIndexStats = indexStats.Active(); // data stats excluding inactive and evicted tabletStats->SetRowCount(activeIndexStats.Rows); diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp index d3cc84cf09f..23a25c8d83d 100644 --- a/ydb/core/tx/columnshard/columnshard__init.cpp +++ b/ydb/core/tx/columnshard/columnshard__init.cpp @@ -41,7 +41,8 @@ void TTxInit::SetDefaults() { Self->PlanQueue.clear(); Self->AltersInFlight.clear(); Self->CommitsInFlight.clear(); - Self->TablesManager.Clear(); + Self->SchemaPresets.clear(); + Self->Tables.clear(); Self->LongTxWrites.clear(); Self->LongTxWritesByUniqueId.clear(); } @@ -136,10 +137,132 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) } } - Self->TablesManager.InitFromDB(db, Self->TabletID()); - Self->SetCounter(COUNTER_TABLES, Self->TablesManager.GetTables().size()); - Self->SetCounter(COUNTER_TABLE_PRESETS, Self->TablesManager.GetSchemaPresets().size()); - Self->SetCounter(COUNTER_TABLE_TTLS, Self->TablesManager.GetTtl().PathsCount()); + // Primary index defaut schema and TTL (both are versioned) + TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaPreset; + TMap<NOlap::TSnapshot, NOlap::TIndexInfo> commonSchema; + THashMap<ui64, TMap<TRowVersion, TTtl::TDescription>> ttls; + + { // Load schema presets + auto rowset = db.Table<Schema::SchemaPresetInfo>().Select(); + if (!rowset.IsReady()) + return false; + + while (!rowset.EndOfSet()) { + const ui32 id = rowset.GetValue<Schema::SchemaPresetInfo::Id>(); + auto& preset = Self->SchemaPresets[id]; + preset.Id = id; + if (id) { + preset.Name = rowset.GetValue<Schema::SchemaPresetInfo::Name>(); + } + Y_VERIFY(!id || preset.Name == "default", "Unsupported preset at load time"); + + if (rowset.HaveValue<Schema::SchemaPresetInfo::DropStep>() && + rowset.HaveValue<Schema::SchemaPresetInfo::DropTxId>()) + { + preset.DropVersion.Step = rowset.GetValue<Schema::SchemaPresetInfo::DropStep>(); + preset.DropVersion.TxId = rowset.GetValue<Schema::SchemaPresetInfo::DropTxId>(); + } + + if (!rowset.Next()) + return false; + } + } + + { // Load schema preset versions + auto rowset = db.Table<Schema::SchemaPresetVersionInfo>().Select(); + if (!rowset.IsReady()) + return false; + + while (!rowset.EndOfSet()) { + const ui32 id = rowset.GetValue<Schema::SchemaPresetVersionInfo::Id>(); + Y_VERIFY(Self->SchemaPresets.contains(id)); + auto& preset = Self->SchemaPresets.at(id); + TRowVersion version( + rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceStep>(), + rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceTxId>()); + auto& info = preset.Versions[version]; + Y_VERIFY(info.ParseFromString(rowset.GetValue<Schema::SchemaPresetVersionInfo::InfoProto>())); + + NOlap::TSnapshot snap{version.Step, version.TxId}; + if (!id) { + commonSchema.emplace(snap, Self->ConvertSchema(info.GetSchema())); + } else if (preset.Name == "default") { + schemaPreset.emplace(snap, Self->ConvertSchema(info.GetSchema())); + } + + if (!rowset.Next()) + return false; + } + } + + { // Load tables + auto rowset = db.Table<Schema::TableInfo>().Select(); + if (!rowset.IsReady()) + return false; + + while (!rowset.EndOfSet()) { + const ui64 pathId = rowset.GetValue<Schema::TableInfo::PathId>(); + auto& table = Self->Tables[pathId]; + table.PathId = pathId; + table.TieringUsage = rowset.GetValue<Schema::TableInfo::TieringUsage>(); + if (rowset.HaveValue<Schema::TableInfo::DropStep>() && + rowset.HaveValue<Schema::TableInfo::DropTxId>()) + { + table.DropVersion.Step = rowset.GetValue<Schema::TableInfo::DropStep>(); + table.DropVersion.TxId = rowset.GetValue<Schema::TableInfo::DropTxId>(); + Self->PathsToDrop.insert(pathId); + } + + if (!rowset.Next()) + return false; + } + } + + { // Load table versions + auto rowset = db.Table<Schema::TableVersionInfo>().Select(); + if (!rowset.IsReady()) + return false; + + while (!rowset.EndOfSet()) { + const ui64 pathId = rowset.GetValue<Schema::TableVersionInfo::PathId>(); + Y_VERIFY(Self->Tables.contains(pathId)); + auto& table = Self->Tables.at(pathId); + TRowVersion version( + rowset.GetValue<Schema::TableVersionInfo::SinceStep>(), + rowset.GetValue<Schema::TableVersionInfo::SinceTxId>()); + auto& info = table.Versions[version]; + Y_VERIFY(info.ParseFromString(rowset.GetValue<Schema::TableVersionInfo::InfoProto>())); + + if (!Self->PathsToDrop.count(pathId)) { + auto& ttlSettings = info.GetTtlSettings(); + if (ttlSettings.HasEnabled()) { + ttls[pathId].emplace(version, TTtl::TDescription(ttlSettings.GetEnabled())); + } + } + + if (!rowset.Next()) + return false; + } + } + + for (auto& [pathId, map] : ttls) { + auto& description = map.rbegin()->second; // last version if many + Self->Ttl.SetPathTtl(pathId, std::move(description)); + } + + Self->SetCounter(COUNTER_TABLES, Self->Tables.size()); + Self->SetCounter(COUNTER_TABLE_PRESETS, Self->SchemaPresets.size()); + Self->SetCounter(COUNTER_TABLE_TTLS, ttls.size()); + + if (!schemaPreset.empty()) { + Y_VERIFY(commonSchema.empty(), "Mix of schema preset and common schema"); + Self->SetPrimaryIndex(std::move(schemaPreset)); + } else if (!commonSchema.empty()) { + Self->SetPrimaryIndex(std::move(commonSchema)); + } else { + Y_VERIFY(Self->Tables.empty()); + Y_VERIFY(Self->SchemaPresets.empty()); + } { // Load long tx writes auto rowset = db.Table<Schema::LongTxWrites>().Select(); @@ -187,8 +310,13 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) } } - if (!Self->TablesManager.LoadIndex(dbTable, lostEvictions)) { - return false; + // Load primary index + if (Self->PrimaryIndex) { + TBlobGroupSelector dsGroupSelector(Self->Info()); + NOlap::TDbWrapper idxDB(txc.DB, &dsGroupSelector); + if (!Self->PrimaryIndex->Load(idxDB, lostEvictions, Self->PathsToDrop)) { + return false; + } } // Set dropped evicting records to be erased in future cleanups diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp index 2ab3ab2a8e8..beaa2f7b770 100644 --- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp +++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp @@ -142,7 +142,8 @@ public: } auto pathExists = [&](ui64 pathId) { - return Self->TablesManager.HasTable(pathId); + auto it = Self->Tables.find(pathId); + return it != Self->Tables.end() && !it->second.IsDropped(); }; auto counters = Self->InsertTable->Commit(dbTable, step, txId, meta.MetaShard, meta.WriteIds, diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp index c6a9274b20c..de3bf81635b 100644 --- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp +++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp @@ -276,13 +276,13 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex break; } - if (!Self->TablesManager.HasPrimaryIndex()) { + if (!Self->PrimaryIndex) { statusMessage = "No primary index for TTL"; status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR; break; } - auto schema = Self->TablesManager.GetIndexInfo().ArrowSchema(); + auto schema = Self->PrimaryIndex->GetIndexInfo().ArrowSchema(); auto ttlColumn = schema->GetFieldByName(columnName); if (!ttlColumn) { statusMessage = "TTL tx wrong TTL column '" + columnName + "'"; diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp index 3857498fe19..8fe9ba771a8 100644 --- a/ydb/core/tx/columnshard/columnshard__read.cpp +++ b/ydb/core/tx/columnshard/columnshard__read.cpp @@ -38,13 +38,13 @@ private: bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) { Y_VERIFY(Ev); - Y_VERIFY(Self->TablesManager.HasPrimaryIndex()); + Y_VERIFY(Self->PrimaryIndex); Y_UNUSED(txc); LOG_S_DEBUG("TTxRead.Execute at tablet " << Self->TabletID()); txc.DB.NoMoreReadsForTx(); - const NOlap::TIndexInfo& indexInfo = Self->TablesManager.GetIndexInfo(); + const NOlap::TIndexInfo& indexInfo = Self->PrimaryIndex->GetIndexInfo(); auto& record = Proto(Ev->Get()); ui64 metaShard = record.GetTxInitiator(); @@ -53,7 +53,7 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) { read.PlanStep = record.GetPlanStep(); read.TxId = record.GetTxId(); read.PathId = record.GetTableId(); - read.ReadNothing = !(Self->TablesManager.HasTable(read.PathId)); + read.ReadNothing = Self->PathsToDrop.count(read.PathId); read.ColumnIds = ProtoToVector<ui32>(record.GetColumnIds()); read.ColumnNames = ProtoToVector<TString>(record.GetColumnNames()); if (read.ColumnIds.empty() && read.ColumnNames.empty()) { @@ -75,11 +75,11 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) { } bool parseResult = ParseProgram(ctx, record.GetOlapProgramType(), record.GetOlapProgram(), read, - TIndexColumnResolver(Self->TablesManager.GetIndexInfo())); + TIndexColumnResolver(Self->PrimaryIndex->GetIndexInfo())); std::shared_ptr<NOlap::TReadMetadata> metadata; if (parseResult) { - metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->TablesManager.GetPrimaryIndex(), Self->BatchCache, + metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->PrimaryIndex, Self->BatchCache, ErrorDescription); } diff --git a/ydb/core/tx/columnshard/columnshard__read_base.cpp b/ydb/core/tx/columnshard/columnshard__read_base.cpp index 63c8239a6e7..53b659cfd45 100644 --- a/ydb/core/tx/columnshard/columnshard__read_base.cpp +++ b/ydb/core/tx/columnshard/columnshard__read_base.cpp @@ -192,7 +192,7 @@ bool TTxReadBase::ParseProgram(const TActorContext& ctx, NKikimrSchemeOp::EOlapP ErrorDescription = TStringBuilder() << "Wrong olap program"; return false; } - if (!ssaProgram->Steps.empty() && Self->TablesManager.HasPrimaryIndex()) { + if (!ssaProgram->Steps.empty() && Self->PrimaryIndex) { NSsa::OptimizeProgram(*ssaProgram); } diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 8c17c5b83b9..da8da9ee25e 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -842,9 +842,9 @@ std::shared_ptr<NOlap::TReadMetadataBase> TTxScan::CreateReadMetadata(const TAct { std::shared_ptr<NOlap::TReadMetadataBase> metadata; if (indexStats) { - metadata = PrepareStatsReadMetadata(Self->TabletID(), read, Self->TablesManager.GetPrimaryIndex(), ErrorDescription); + metadata = PrepareStatsReadMetadata(Self->TabletID(), read, Self->PrimaryIndex, ErrorDescription); } else { - metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->TablesManager.GetPrimaryIndex(), Self->BatchCache, + metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->PrimaryIndex, Self->BatchCache, ErrorDescription); } @@ -880,7 +880,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) { read.PlanStep = snapshot.GetStep(); read.TxId = snapshot.GetTxId(); read.PathId = record.GetLocalPathId(); - read.ReadNothing = !(Self->TablesManager.HasTable(read.PathId)); + read.ReadNothing = Self->PathsToDrop.count(read.PathId); read.TableName = record.GetTablePath(); bool isIndexStats = read.TableName.EndsWith(NOlap::TIndexInfo::STORE_INDEX_STATS_TABLE) || read.TableName.EndsWith(NOlap::TIndexInfo::TABLE_INDEX_STATS_TABLE); @@ -891,7 +891,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) { // "SELECT COUNT(*)" requests empty column list but we need non-empty list for PrepareReadMetadata. // So we add first PK column to the request. if (!isIndexStats) { - read.ColumnIds.push_back(Self->TablesManager.GetIndexInfo().GetPKFirstColumnId()); + read.ColumnIds.push_back(Self->PrimaryIndex->GetIndexInfo().GetPKFirstColumnId()); } else { read.ColumnIds.push_back(PrimaryIndexStatsSchema.KeyColumns.front()); } @@ -900,7 +900,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) { bool parseResult; if (!isIndexStats) { - TIndexColumnResolver columnResolver(Self->TablesManager.GetIndexInfo()); + TIndexColumnResolver columnResolver(Self->PrimaryIndex->GetIndexInfo()); parseResult = ParseProgram(ctx, record.GetOlapProgramType(), record.GetOlapProgram(), read, columnResolver); } else { TStatsColumnResolver columnResolver; @@ -926,7 +926,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) { auto ydbKey = isIndexStats ? NOlap::GetColumns(PrimaryIndexStatsSchema, PrimaryIndexStatsSchema.KeyColumns) : - Self->TablesManager.GetIndexInfo().GetPrimaryKey(); + Self->PrimaryIndex->GetIndexInfo().GetPrimaryKey(); for (auto& range: record.GetRanges()) { FillPredicatesFromRange(read, range, ydbKey, Self->TabletID()); @@ -1023,7 +1023,7 @@ void TTxScan::Complete(const TActorContext& ctx) { if (request.GetReverse()) { std::reverse(rMetadataRanges.begin(), rMetadataRanges.end()); } - NOlap::NCosts::TKeyRangesBuilder krBuilder(Self->TablesManager.GetIndexInfo()); + NOlap::NCosts::TKeyRangesBuilder krBuilder(Self->PrimaryIndex->GetIndexInfo()); { ui32 recordsCount = 0; for (auto&& i : rMetadataRanges) { diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index e27ca500ac5..7d42fb25c5e 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -49,7 +49,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { Y_VERIFY(logoBlobId.IsValid()); bool ok = false; - if (!Self->TablesManager.HasPrimaryIndex() || !Self->TablesManager.IsWritableTable(tableId)) { + if (!Self->PrimaryIndex || !Self->IsTableWritable(tableId)) { status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR; } else { if (record.HasLongTxId()) { @@ -138,15 +138,15 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex TString dedupId = record.GetDedupId(); auto putStatus = ev->Get()->PutStatus; - bool isWritable = TablesManager.IsWritableTable(tableId); - bool error = data.empty() || data.size() > TLimits::GetMaxBlobSize() || !TablesManager.HasPrimaryIndex() || !isWritable; + bool isWritable = IsTableWritable(tableId); + bool error = data.empty() || data.size() > TLimits::GetMaxBlobSize() || !PrimaryIndex || !isWritable; bool errorReturned = (putStatus != NKikimrProto::OK) && (putStatus != NKikimrProto::UNKNOWN); bool isOutOfSpace = IsAnyChannelYellowStop(); if (error || errorReturned) { LOG_S_NOTICE("Write (fail) " << data.size() << " bytes into pathId " << tableId << ", status " << putStatus - << (TablesManager.HasPrimaryIndex()? "": ", no index") << (isWritable? "": ", ro") + << (PrimaryIndex? "": ", no index") << (isWritable? "": ", ro") << " at tablet " << TabletID()); IncCounter(COUNTER_WRITE_FAIL); @@ -221,7 +221,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex ev->Get()->MaxSmallBlobSize = Settings.MaxSmallBlobSize; ++WritesInFly; // write started - ctx.Register(CreateWriteActor(TabletID(), TablesManager.GetIndexInfo(), ctx.SelfID, + ctx.Register(CreateWriteActor(TabletID(), PrimaryIndex->GetIndexInfo(), ctx.SelfID, BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release())); } diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index 8ddfd4e2965..61149c87fa4 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -43,7 +43,7 @@ private: bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) { Y_VERIFY(Ev); Y_VERIFY(Self->InsertTable); - Y_VERIFY(Self->TablesManager.HasPrimaryIndex()); + Y_VERIFY(Self->PrimaryIndex); txc.DB.NoMoreReadsForTx(); @@ -66,7 +66,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) TBlobGroupSelector dsGroupSelector(Self->Info()); NOlap::TDbWrapper dbWrap(txc.DB, &dsGroupSelector); - ok = Self->TablesManager.MutablePrimaryIndex().ApplyChanges(dbWrap, changes, snapshot); // update changes + apply + ok = Self->PrimaryIndex->ApplyChanges(dbWrap, changes, snapshot); // update changes + apply if (ok) { LOG_S_DEBUG("TTxWriteIndex (" << changes->TypeString() << ") apply at tablet " << Self->TabletID()); @@ -248,7 +248,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) Schema::SaveSpecialValue(db, Schema::EValueIds::LastExportNumber, Self->LastExportNo); } - Self->TablesManager.MutablePrimaryIndex().FreeLocks(changes); + Self->PrimaryIndex->FreeLocks(changes); if (changes->IsInsert()) { Self->ActiveIndexingOrCompaction = false; diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 5db1a69b302..41f3d952b3e 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -395,6 +395,46 @@ void TColumnShard::ProtectSchemaSeqNo(const NKikimrTxColumnShard::TSchemaSeqNo& } } +bool TColumnShard::IsTableWritable(ui64 tableId) const { + auto it = Tables.find(tableId); + if (it == Tables.end()) { + return false; + } + return !it->second.IsDropped(); +} + +ui32 TColumnShard::EnsureSchemaPreset(NIceDb::TNiceDb& db, ui32 presetId, const TString& name, + const NKikimrSchemeOp::TColumnTableSchema& schemaProto, + const TRowVersion& version) { + if (!SchemaPresets.contains(presetId)) { + LOG_S_DEBUG("EnsureSchemaPreset " << presetId << " at tablet " << TabletID()); + + auto& preset = SchemaPresets[presetId]; + preset.Id = presetId; + preset.Name = name; + auto& info = preset.Versions[version]; + info.SetId(preset.Id); + info.SetSinceStep(version.Step); + info.SetSinceTxId(version.TxId); + *info.MutableSchema() = schemaProto; + + Schema::SaveSchemaPresetInfo(db, preset.Id, preset.Name); + Schema::SaveSchemaPresetVersionInfo(db, preset.Id, version, info); + SetCounter(COUNTER_TABLE_PRESETS, SchemaPresets.size()); + } else { + LOG_S_DEBUG("EnsureSchemaPreset for existed preset " << presetId << " at tablet " << TabletID()); + } + + return presetId; +} + +ui32 TColumnShard::EnsureSchemaPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, + const TRowVersion& version) { + Y_VERIFY(presetProto.GetName() == "default", "Only schema preset named 'default' is supported"); + + return EnsureSchemaPreset(db, presetProto.GetId(), presetProto.GetName(), presetProto.GetSchema(), version); +} + void TColumnShard::RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc) { switch (body.TxBody_case()) { @@ -454,53 +494,70 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl NIceDb::TNiceDb db(txc.DB); const ui64 pathId = tableProto.GetPathId(); - if (TablesManager.HasTable(pathId)) { - LOG_S_DEBUG("EnsureTable for existed pathId: " << pathId << " at tablet " << TabletID()); - return; - } + if (!Tables.contains(pathId)) { + LOG_S_DEBUG("EnsureTable for pathId: " << pathId + << " ttl settings: " << tableProto.GetTtlSettings() + << " at tablet " << TabletID()); - LOG_S_DEBUG("EnsureTable for pathId: " << pathId - << " ttl settings: " << tableProto.GetTtlSettings() - << " at tablet " << TabletID()); + ui32 schemaPresetId = 0; + if (tableProto.HasSchemaPreset()) { + Y_VERIFY(!tableProto.HasSchema(), "Tables has either schema or preset"); - TTableInfo::TTableVersionInfo tableVerProto; - tableVerProto.SetPathId(pathId); + schemaPresetId = EnsureSchemaPreset(db, tableProto.GetSchemaPreset(), version); + Y_VERIFY(schemaPresetId); + } else { + Y_VERIFY(tableProto.HasSchema(), "Tables has either schema or preset"); - if (tableProto.HasSchemaPreset()) { - Y_VERIFY(!tableProto.HasSchema(), "Tables has either schema or preset"); + // Save first table schema as common one with schemaPresetId == 0 - TSchemaPreset preset; - preset.Deserialize(tableProto.GetSchemaPreset()); - Y_VERIFY(!preset.IsStandaloneTable()); - tableVerProto.SetSchemaPresetId(preset.GetId()); - - if (TablesManager.RegisterSchemaPreset(preset, db)) { - TablesManager.AddPresetVersion(tableProto.GetSchemaPreset().GetId(), version, tableProto.GetSchemaPreset().GetSchema(), db); + if (SchemaPresets.count(0)) { + LOG_S_WARN("Colocated standalone tables are not supported. " + << "EnsureTable failed at tablet " << TabletID()); + return; + } + + schemaPresetId = EnsureSchemaPreset(db, 0, "", tableProto.GetSchema(), version); + Y_VERIFY(!schemaPresetId); } - } else { - Y_VERIFY(tableProto.HasSchema(), "Tables has either schema or preset"); - *tableVerProto.MutableSchema() = tableProto.GetSchema(); - } - TTableInfo table(pathId); - if (tableProto.HasTtlSettings()) { - const auto& ttlSettings = tableProto.GetTtlSettings(); - *tableVerProto.MutableTtlSettings() = ttlSettings; - if (ttlSettings.HasUseTiering()) { - table.SetTieringUsage(ttlSettings.GetUseTiering()); - ActivateTiering(pathId, table.GetTieringUsage()); + auto& table = Tables[pathId]; + table.PathId = pathId; + auto& tableVerProto = table.Versions[version]; + tableVerProto.SetPathId(pathId); + tableVerProto.SetSchemaPresetId(schemaPresetId); + + if (tableProto.HasTtlSettings()) { + *tableVerProto.MutableTtlSettings() = tableProto.GetTtlSettings(); + auto& ttlInfo = tableProto.GetTtlSettings(); + if (ttlInfo.HasEnabled()) { + Ttl.SetPathTtl(pathId, TTtl::TDescription(ttlInfo.GetEnabled())); + SetCounter(COUNTER_TABLE_TTLS, Ttl.PathsCount()); + } + if (ttlInfo.HasUseTiering()) { + table.TieringUsage = ttlInfo.GetUseTiering(); + ActivateTiering(pathId, table.TieringUsage); + } } - } - tableVerProto.SetSchemaPresetVersionAdj(tableProto.GetSchemaPresetVersionAdj()); - tableVerProto.SetTtlSettingsPresetVersionAdj(tableProto.GetTtlSettingsPresetVersionAdj()); + if (!PrimaryIndex) { + TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaHistory; + + auto& schemaPresetVerProto = SchemaPresets[schemaPresetId].Versions[version]; + schemaHistory.emplace(NOlap::TSnapshot{version.Step, version.TxId}, + ConvertSchema(schemaPresetVerProto.GetSchema())); - TablesManager.RegisterTable(std::move(table), db); - TablesManager.AddTableVersion(pathId, version, tableVerProto, db); + SetPrimaryIndex(std::move(schemaHistory)); + } + + tableVerProto.SetSchemaPresetVersionAdj(tableProto.GetSchemaPresetVersionAdj()); + tableVerProto.SetTtlSettingsPresetVersionAdj(tableProto.GetTtlSettingsPresetVersionAdj()); - SetCounter(COUNTER_TABLES, TablesManager.GetTables().size()); - SetCounter(COUNTER_TABLE_PRESETS, TablesManager.GetSchemaPresets().size()); - SetCounter(COUNTER_TABLE_TTLS, TablesManager.GetTtl().PathsCount()); + Schema::SaveTableInfo(db, table.PathId, table.TieringUsage); + Schema::SaveTableVersionInfo(db, table.PathId, version, tableVerProto); + SetCounter(COUNTER_TABLES, Tables.size()); + } else { + LOG_S_DEBUG("EnsureTable for existed pathId: " << pathId << " at tablet " << TabletID()); + } } void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterProto, const TRowVersion& version, @@ -508,33 +565,39 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP NIceDb::TNiceDb db(txc.DB); const ui64 pathId = alterProto.GetPathId(); - Y_VERIFY(TablesManager.HasTable(pathId), "AlterTable on a dropped or non-existent table"); - + auto* tablePtr = Tables.FindPtr(pathId); + Y_VERIFY(tablePtr && !tablePtr->IsDropped(), "AlterTable on a dropped or non-existent table"); + auto& table = *tablePtr; + auto& ttlSettings = alterProto.GetTtlSettings(); + LOG_S_DEBUG("AlterTable for pathId: " << pathId << " schema: " << alterProto.GetSchema() - << " ttl settings: " << alterProto.GetTtlSettings() + << " ttl settings: " << ttlSettings << " at tablet " << TabletID()); - TTableInfo::TTableVersionInfo tableVerProto; + auto& info = table.Versions[version]; + if (alterProto.HasSchemaPreset()) { - tableVerProto.SetSchemaPresetId(alterProto.GetSchemaPreset().GetId()); - TablesManager.AddPresetVersion(alterProto.GetSchemaPreset().GetId(), version, alterProto.GetSchemaPreset().GetSchema(), db); - } else if (alterProto.HasSchema()) { - *tableVerProto.MutableSchema() = alterProto.GetSchema(); + info.SetSchemaPresetId(EnsureSchemaPreset(db, alterProto.GetSchemaPreset(), version)); } - const auto& ttlSettings = alterProto.GetTtlSettings(); // Note: Not valid behaviour for full alter implementation const TString& tieringUsage = ttlSettings.GetUseTiering(); + ActivateTiering(pathId, tieringUsage); if (alterProto.HasTtlSettings()) { - const auto& ttlSettings = alterProto.GetTtlSettings(); - *tableVerProto.MutableTtlSettings() = ttlSettings; + *info.MutableTtlSettings() = ttlSettings; + if (ttlSettings.HasEnabled()) { + Ttl.SetPathTtl(pathId, TTtl::TDescription(ttlSettings.GetEnabled())); + } else { + Ttl.DropPathTtl(pathId); + } + } else { + Ttl.DropPathTtl(pathId); } - ActivateTiering(pathId, tieringUsage); - Schema::SaveTableInfo(db, pathId, tieringUsage); + Ttl.Repeat(); // Atler TTL triggers TTL activity - tableVerProto.SetSchemaPresetVersionAdj(alterProto.GetSchemaPresetVersionAdj()); - TablesManager.AddTableVersion(pathId, version, tableVerProto, db); - TablesManager.OnTtlUpdate(); + info.SetSchemaPresetVersionAdj(alterProto.GetSchemaPresetVersionAdj()); + Schema::SaveTableInfo(db, table.PathId, tieringUsage); + Schema::SaveTableVersionInfo(db, table.PathId, version, info); } void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProto, const TRowVersion& version, @@ -542,20 +605,25 @@ void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProt NIceDb::TNiceDb db(txc.DB); const ui64 pathId = dropProto.GetPathId(); - if (!TablesManager.HasTable(pathId)) { + auto* table = Tables.FindPtr(pathId); + Y_VERIFY_DEBUG(table && !table->IsDropped()); + if (table && !table->IsDropped()) { + LOG_S_DEBUG("DropTable for pathId: " << pathId << " at tablet " << TabletID()); + + PathsToDrop.insert(pathId); + Ttl.DropPathTtl(pathId); + + // TODO: Allow to read old snapshots after DROP + TBlobGroupSelector dsGroupSelector(Info()); + NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); + THashSet<TWriteId> writesToAbort = InsertTable->DropPath(dbTable, pathId); + TryAbortWrites(db, dbTable, std::move(writesToAbort)); + + table->DropVersion = version; + Schema::SaveTableDropVersion(db, pathId, version.Step, version.TxId); + } else { LOG_S_DEBUG("DropTable for unknown or deleted pathId: " << pathId << " at tablet " << TabletID()); - return; } - - LOG_S_DEBUG("DropTable for pathId: " << pathId << " at tablet " << TabletID()); - TablesManager.DropTable(pathId, version, db); - - // TODO: Allow to read old snapshots after DROP - TBlobGroupSelector dsGroupSelector(Info()); - NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); - THashSet<TWriteId> writesToAbort = InsertTable->DropPath(dbTable, pathId); - - TryAbortWrites(db, dbTable, std::move(writesToAbort)); } void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, const TRowVersion& version, @@ -567,18 +635,54 @@ void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPathId, OwnerPathId); } + TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaHistory; + for (ui32 id : proto.GetDroppedSchemaPresets()) { - if (!TablesManager.HasPreset(id)) { + if (!SchemaPresets.contains(id)) { continue; } - TablesManager.DropPreset(id, version, db); + auto& preset = SchemaPresets.at(id); + Y_VERIFY(preset.Name != "default", "Cannot drop the default preset"); + preset.DropVersion = version; + Schema::SaveSchemaPresetDropVersion(db, id, version); } for (const auto& presetProto : proto.GetSchemaPresets()) { - if (!TablesManager.HasPreset(presetProto.GetId())) { + if (!SchemaPresets.contains(presetProto.GetId())) { continue; // we don't update presets that we don't use } - TablesManager.AddPresetVersion(presetProto.GetId(), version, presetProto.GetSchema(), db); + + auto& preset = SchemaPresets[presetProto.GetId()]; + auto& info = preset.Versions[version]; + info.SetId(preset.Id); + info.SetSinceStep(version.Step); + info.SetSinceTxId(version.TxId); + *info.MutableSchema() = presetProto.GetSchema(); + + if (preset.Name == "default") { + schemaHistory.emplace(NOlap::TSnapshot{version.Step, version.TxId}, ConvertSchema(info.GetSchema())); + } + + Schema::SaveSchemaPresetVersionInfo(db, preset.Id, version, info); + } + + if (!schemaHistory.empty()) { + SetPrimaryIndex(std::move(schemaHistory)); + } +} + +void TColumnShard::SetPrimaryIndex(TMap<NOlap::TSnapshot, NOlap::TIndexInfo>&& schemaVersions) { + for (auto& [snap, indexInfo] : schemaVersions) { + if (!PrimaryIndex) { + PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(std::move(indexInfo), TabletID()); + SetCounter(COUNTER_INDEXES, 1); + } else { + PrimaryIndex->UpdateDefaultSchema(snap, std::move(indexInfo)); + } + } + + for (auto& columnName : Ttl.TtlColumns()) { + PrimaryIndex->GetIndexInfo().CheckTtlColumn(columnName); } } @@ -660,7 +764,7 @@ std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() { LOG_S_DEBUG("Indexing/compaction already in progress at tablet " << TabletID()); return {}; } - if (!TablesManager.HasPrimaryIndex()) { + if (!PrimaryIndex) { LOG_S_NOTICE("Indexing not started: no index at tablet " << TabletID()); return {}; } @@ -681,7 +785,7 @@ std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() { if (bytesToIndex && (bytesToIndex + dataSize) > (ui64)Limits.MaxInsertBytes) { continue; } - if (auto* pMap = TablesManager.GetPrimaryIndexSafe().GetOverloadedGranules(data.PathId)) { + if (auto* pMap = PrimaryIndex->GetOverloadedGranules(data.PathId)) { overloadedPathGranules[pathId] = pMap->size(); InsertTable->SetOverloaded(data.PathId, true); ++ignored; @@ -729,13 +833,13 @@ std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() { } Y_VERIFY(data.size()); - auto indexChanges = TablesManager.MutablePrimaryIndex().StartInsert(std::move(data)); + auto indexChanges = PrimaryIndex->StartInsert(std::move(data)); if (!indexChanges) { LOG_S_NOTICE("Cannot prepare indexing at tablet " << TabletID()); return {}; } - auto actualIndexInfo = TablesManager.GetIndexInfo(); + auto actualIndexInfo = PrimaryIndex->GetIndexInfo(); if (Tiers) { auto pathTiering = Tiers->GetTiering(); // TODO: pathIds actualIndexInfo.UpdatePathTiering(pathTiering); @@ -753,13 +857,13 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() { LOG_S_DEBUG("Compaction/indexing already in progress at tablet " << TabletID()); return {}; } - if (!TablesManager.HasPrimaryIndex()) { + if (!PrimaryIndex) { LOG_S_NOTICE("Compaction not started: no index at tablet " << TabletID()); return {}; } - TablesManager.MutablePrimaryIndex().UpdateCompactionLimits(CompactionLimits.Get()); - auto compactionInfo = TablesManager.MutablePrimaryIndex().Compact(LastCompactedGranule); + PrimaryIndex->UpdateCompactionLimits(CompactionLimits.Get()); + auto compactionInfo = PrimaryIndex->Compact(LastCompactedGranule); if (!compactionInfo || compactionInfo->Empty()) { LOG_S_DEBUG("Compaction not started: no portions to compact at tablet " << TabletID()); return {}; @@ -770,13 +874,13 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() { LOG_S_DEBUG("Prepare " << *compactionInfo << " at tablet " << TabletID()); ui64 ourdatedStep = GetOutdatedStep(); - auto indexChanges = TablesManager.MutablePrimaryIndex().StartCompaction(std::move(compactionInfo), {ourdatedStep, 0}); + auto indexChanges = PrimaryIndex->StartCompaction(std::move(compactionInfo), {ourdatedStep, 0}); if (!indexChanges) { LOG_S_DEBUG("Compaction not started: cannot prepare compaction at tablet " << TabletID()); return {}; } - auto actualIndexInfo = TablesManager.GetIndexInfo(); + auto actualIndexInfo = PrimaryIndex->GetIndexInfo(); if (Tiers) { auto pathTiering = Tiers->GetTiering(); // TODO: pathIds actualIndexInfo.UpdatePathTiering(pathTiering); @@ -799,7 +903,7 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u LOG_S_DEBUG("Do not start TTL while eviction is in progress at tablet " << TabletID()); return {}; } - if (!TablesManager.HasPrimaryIndex()) { + if (!PrimaryIndex) { LOG_S_NOTICE("TTL not started. No index for TTL at tablet " << TabletID()); return {}; } @@ -809,7 +913,7 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u if (Tiers) { eviction = Tiers->GetTiering(); // TODO: pathIds } - TablesManager.AddTtls(eviction, TInstant::Now(), force); + Ttl.AddTtls(eviction, TInstant::Now(), force); } if (eviction.empty()) { @@ -823,11 +927,11 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u LOG_S_DEBUG("Evicting path " << i.first << " with " << i.second.GetDebugString() << " at tablet " << TabletID()); } - auto actualIndexInfo = TablesManager.GetIndexInfo(); + auto actualIndexInfo = PrimaryIndex->GetIndexInfo(); actualIndexInfo.UpdatePathTiering(eviction); std::shared_ptr<NOlap::TColumnEngineChanges> indexChanges; - indexChanges = TablesManager.MutablePrimaryIndex().StartTtl(eviction); + indexChanges = PrimaryIndex->StartTtl(eviction); actualIndexInfo.SetPathTiering(std::move(eviction)); @@ -836,7 +940,7 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u return {}; } if (indexChanges->NeedRepeat) { - TablesManager.OnTtlUpdate(); + Ttl.Repeat(); } bool needWrites = !indexChanges->PortionsToEvict.empty(); @@ -851,14 +955,14 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() { LOG_S_DEBUG("Cleanup already in progress at tablet " << TabletID()); return {}; } - if (!TablesManager.HasPrimaryIndex()) { + if (!PrimaryIndex) { LOG_S_NOTICE("Cleanup not started. No index for cleanup at tablet " << TabletID()); return {}; } NOlap::TSnapshot cleanupSnapshot{GetMinReadStep(), 0}; - auto changes = TablesManager.StartIndexCleanup(cleanupSnapshot, TLimits::MAX_TX_RECORDS); + auto changes = PrimaryIndex->StartCleanup(cleanupSnapshot, PathsToDrop, TLimits::MAX_TX_RECORDS); if (!changes) { LOG_S_NOTICE("Cannot prepare cleanup at tablet " << TabletID()); return {}; @@ -891,7 +995,7 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() { return {}; } - auto actualIndexInfo = TablesManager.GetIndexInfo(); + auto actualIndexInfo = PrimaryIndex->GetIndexInfo(); #if 0 // No need for now if (Tiers) { ... @@ -905,6 +1009,33 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() { return ev; } +NOlap::TIndexInfo TColumnShard::ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema) { + Y_VERIFY(schema.GetEngine() == NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES); + + ui32 indexId = 0; + NOlap::TIndexInfo indexInfo("", indexId); + + for (const auto& col : schema.GetColumns()) { + const ui32 id = col.GetId(); + const TString& name = col.GetName(); + auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(col.GetTypeId(), + col.HasTypeInfo() ? &col.GetTypeInfo() : nullptr); + indexInfo.Columns[id] = NTable::TColumn(name, id, typeInfoMod.TypeInfo, typeInfoMod.TypeMod); + indexInfo.ColumnNames[name] = id; + } + + for (const auto& keyName : schema.GetKeyColumnNames()) { + Y_VERIFY(indexInfo.ColumnNames.count(keyName)); + indexInfo.KeyColumns.push_back(indexInfo.ColumnNames[keyName]); + } + + if (schema.HasDefaultCompression()) { + NOlap::TCompression compression = NTiers::ConvertCompression(schema.GetDefaultCompression()); + indexInfo.SetDefaultCompression(compression); + } + + return indexInfo; +} void TColumnShard::MapExternBlobs(const TActorContext& /*ctx*/, NOlap::TReadMetadata& metadata) { if (!metadata.SelectInfo) { diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 383d51a3592..3283e97440e 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -5,7 +5,6 @@ #include "columnshard_ttl.h" #include "columnshard_private_events.h" #include "blob_manager.h" -#include "tables_manager.h" #include "inflight_request_tracker.h" #include <ydb/core/tablet/tablet_counters.h> @@ -332,8 +331,31 @@ private: } }; - using TSchemaPreset = TSchemaPreset; - using TTableInfo = TTableInfo; + struct TSchemaPreset { + using TSchemaPresetVersionInfo = NKikimrTxColumnShard::TSchemaPresetVersionInfo; + + ui32 Id; + TString Name; + TMap<TRowVersion, TSchemaPresetVersionInfo> Versions; + TRowVersion DropVersion = TRowVersion::Max(); + + bool IsDropped() const { + return DropVersion != TRowVersion::Max(); + } + }; + + struct TTableInfo { + using TTableVersionInfo = NKikimrTxColumnShard::TTableVersionInfo; + + ui64 PathId; + std::map<TRowVersion, TTableVersionInfo> Versions; + TRowVersion DropVersion = TRowVersion::Max(); + TString TieringUsage; + + bool IsDropped() const { + return DropVersion != TRowVersion::Max(); + } + }; struct TLongTxWriteInfo { ui64 WriteId; @@ -342,8 +364,6 @@ private: ui64 PreparedTxId = 0; }; - TTablesManager TablesManager; - ui64 CurrentSchemeShardId = 0; TMessageSeqNo LastSchemaSeqNo; std::optional<NKikimrSubDomains::TProcessingParams> ProcessingParams; @@ -381,7 +401,9 @@ private: TTabletCountersBase* TabletCounters; std::unique_ptr<NTabletPipe::IClientCache> PipeClientCache; std::unique_ptr<NOlap::TInsertTable> InsertTable; + std::unique_ptr<NOlap::IColumnEngine> PrimaryIndex; TBatchCache BatchCache; + TTtl Ttl; THashMap<ui64, TBasicTxInfo> BasicTxInfo; TSet<TDeadlineQueueItem> DeadlineQueue; @@ -391,11 +413,14 @@ private: THashMap<ui64, TInstant> ScanTxInFlight; THashMap<ui64, TAlterMeta> AltersInFlight; THashMap<ui64, TCommitMeta> CommitsInFlight; // key is TxId from propose + THashMap<ui32, TSchemaPreset> SchemaPresets; + THashMap<ui64, TTableInfo> Tables; THashMap<TWriteId, TLongTxWriteInfo> LongTxWrites; using TPartsForLTXShard = THashMap<ui32, TLongTxWriteInfo*>; THashMap<TULID, TPartsForLTXShard> LongTxWritesByUniqueId; TMultiMap<TRowVersion, TEvColumnShard::TEvRead::TPtr> WaitingReads; TMultiMap<TRowVersion, TEvColumnShard::TEvScan::TPtr> WaitingScans; + THashSet<ui64> PathsToDrop; bool ActiveIndexingOrCompaction = false; bool ActiveCleanup = false; bool ActiveTtl = false; @@ -430,7 +455,7 @@ private: } bool IndexOverloaded() const { - return TablesManager.IndexOverloaded(); + return PrimaryIndex && PrimaryIndex->HasOverloadedGranules(); } TWriteId HasLongTxWrite(const NLongTxService::TLongTxId& longTxId, const ui32 partId); @@ -447,13 +472,22 @@ private: void UpdateSchemaSeqNo(const TMessageSeqNo& seqNo, NTabletFlatExecutor::TTransactionContext& txc); void ProtectSchemaSeqNo(const NKikimrTxColumnShard::TSchemaSeqNo& seqNoProto, NTabletFlatExecutor::TTransactionContext& txc); + bool IsTableWritable(ui64 tableId) const; + + ui32 EnsureSchemaPreset(NIceDb::TNiceDb& db, ui32 presetId, const TString& name, + const NKikimrSchemeOp::TColumnTableSchema& schemaProto, const TRowVersion& version); + ui32 EnsureSchemaPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, const TRowVersion& version); + //ui32 EnsureTtlSettingsPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableTtlSettingsPreset& presetProto, const TRowVersion& version); + void RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); void RunInit(const NKikimrTxColumnShard::TInitShard& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); void RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); void RunAlterTable(const NKikimrTxColumnShard::TAlterTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); void RunDropTable(const NKikimrTxColumnShard::TDropTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); void RunAlterStore(const NKikimrTxColumnShard::TAlterStore& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); + void SetPrimaryIndex(TMap<NOlap::TSnapshot, NOlap::TIndexInfo>&& schemaVersions); + NOlap::TIndexInfo ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema); void MapExternBlobs(const TActorContext& ctx, NOlap::TReadMetadata& metadata); TActorId GetS3ActorForTier(const TString& tierId) const; void ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName, ui64 pathId, diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp deleted file mode 100644 index cbcf0abe8c3..00000000000 --- a/ydb/core/tx/columnshard/tables_manager.cpp +++ /dev/null @@ -1,324 +0,0 @@ -#include "tables_manager.h" -#include "columnshard_schema.h" -#include "blob_manager_db.h" -#include "engines/column_engine_logs.h" -#include <ydb/core/scheme/scheme_types_proto.h> -#include <ydb/core/tx/tiering/manager.h> - - - -namespace NKikimr::NColumnShard { - -void TSchemaPreset::Deserialize(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto) { - Id = presetProto.GetId(); - Name = presetProto.GetName(); -} - -bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db, const ui64 tabletId) { - TabletId = tabletId; - { - auto rowset = db.Table<Schema::TableInfo>().Select(); - if (!rowset.IsReady()) { - return false; - } - - while (!rowset.EndOfSet()) { - TTableInfo table; - if (!table.InitFromDB(rowset)) { - return false; - } - if (table.IsDropped()) { - PathsToDrop.insert(table.GetPathId()); - } - Tables.insert_or_assign(table.GetPathId(), std::move(table)); - - if (!rowset.Next()) { - return false; - } - } - } - - bool isFakePresetOnly = true; - { - auto rowset = db.Table<Schema::SchemaPresetInfo>().Select(); - if (!rowset.IsReady()) { - return false; - } - - while (!rowset.EndOfSet()) { - TSchemaPreset preset; - preset.InitFromDB(rowset); - - if (preset.IsStandaloneTable()) { - Y_VERIFY_S(!preset.GetName(), "Preset name: " + preset.GetName()); - } else { - Y_VERIFY_S(preset.GetName() == "default", "Preset name: " + preset.GetName()); - isFakePresetOnly = false; - } - SchemaPresets.insert_or_assign(preset.GetId(), preset); - if (!rowset.Next()) { - return false; - } - } - } - - { - auto rowset = db.Table<Schema::TableVersionInfo>().Select(); - if (!rowset.IsReady()) { - return false; - } - - THashMap<ui64, TRowVersion> lastVersion; - while (!rowset.EndOfSet()) { - const ui64 pathId = rowset.GetValue<Schema::TableVersionInfo::PathId>(); - Y_VERIFY(Tables.contains(pathId)); - TRowVersion version( - rowset.GetValue<Schema::TableVersionInfo::SinceStep>(), - rowset.GetValue<Schema::TableVersionInfo::SinceTxId>()); - - auto& table = Tables.at(pathId); - TTableInfo::TTableVersionInfo versionInfo; - Y_VERIFY(versionInfo.ParseFromString(rowset.GetValue<Schema::TableVersionInfo::InfoProto>())); - Y_VERIFY(SchemaPresets.contains(versionInfo.GetSchemaPresetId())); - - if (!table.IsDropped()) { - auto& ttlSettings = versionInfo.GetTtlSettings(); - if (ttlSettings.HasEnabled()) { - if (!lastVersion.contains(pathId) || lastVersion[pathId] < version) { - TTtl::TDescription description(ttlSettings.GetEnabled()); - Ttl.SetPathTtl(pathId, std::move(description)); - lastVersion[pathId] = version; - } - } - } - table.AddVersion(version, versionInfo); - if (!rowset.Next()) { - return false; - } - } - } - - { - auto rowset = db.Table<Schema::SchemaPresetVersionInfo>().Select(); - if (!rowset.IsReady()) { - return false; - } - - while (!rowset.EndOfSet()) { - const ui32 id = rowset.GetValue<Schema::SchemaPresetVersionInfo::Id>(); - Y_VERIFY(SchemaPresets.contains(id)); - auto& preset = SchemaPresets.at(id); - TRowVersion version( - rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceStep>(), - rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceTxId>()); - - TSchemaPreset::TSchemaPresetVersionInfo info; - Y_VERIFY(info.ParseFromString(rowset.GetValue<Schema::SchemaPresetVersionInfo::InfoProto>())); - preset.AddVersion(version, info); - if (!rowset.Next()) { - return false; - } - } - } - - if (isFakePresetOnly) { - Y_VERIFY(Tables.size() <= 1); - if (!Tables.empty()) { - for (const auto& [version, tableInfo] : Tables.begin()->second.GetVersions()) { - if (tableInfo.HasSchema()) { - IndexSchemaVersion(version, tableInfo.GetSchema()); - } - } - } - } else { - for (const auto& [id, preset] : SchemaPresets) { - Y_VERIFY(!!id); - for (const auto& [version, schemaInfo] : preset.GetVersions()) { - if (schemaInfo.HasSchema()) { - IndexSchemaVersion(version, schemaInfo.GetSchema()); - } - } - } - } - return true; -} - -bool TTablesManager::LoadIndex(NOlap::TDbWrapper& idxDB, THashSet<NOlap::TUnifiedBlobId>& lostEvictions) { - if (PrimaryIndex) { - if (!PrimaryIndex->Load(idxDB, lostEvictions, PathsToDrop)) { - return false; - } - } - return true; -} - -void TTablesManager::Clear() { - Tables.clear(); - SchemaPresets.clear(); - PathsToDrop.clear(); -} - -bool TTablesManager::HasTable(const ui64 pathId) const { - auto it = Tables.find(pathId); - if (it == Tables.end() || it->second.IsDropped()) { - return false; - } - return true; -} - -bool TTablesManager::IsWritableTable(const ui64 pathId) const { - return HasTable(pathId); -} - -bool TTablesManager::HasPreset(const ui32 presetId) const { - return SchemaPresets.contains(presetId); -} - -const TTableInfo& TTablesManager::GetTable(const ui64 pathId) const { - Y_VERIFY(HasTable(pathId)); - return Tables.at(pathId); -} - -ui64 TTablesManager::GetMemoryUsage() const { - ui64 memory = - Tables.size() * sizeof(TTableInfo) + - PathsToDrop.size() * sizeof(ui64) + - Ttl.PathsCount() * sizeof(TTtl::TDescription) + - SchemaPresets.size() * sizeof(TSchemaPreset); - if (PrimaryIndex) { - memory += PrimaryIndex->MemoryUsage(); - } - return memory; -} - -void TTablesManager::OnTtlUpdate() { - Ttl.Repeat(); -} - -void TTablesManager::DropTable(const ui64 pathId, const TRowVersion& version, NIceDb::TNiceDb& db) { - auto& table = Tables.at(pathId); - table.SetDropVersion(version); - PathsToDrop.insert(pathId); - Ttl.DropPathTtl(pathId); - Schema::SaveTableDropVersion(db, pathId, version.Step, version.TxId); -} - -void TTablesManager::DropPreset(const ui32 presetId, const TRowVersion& version, NIceDb::TNiceDb& db) { - auto& preset = SchemaPresets.at(presetId); - Y_VERIFY(preset.GetName() != "default", "Cannot drop the default preset"); - preset.SetDropVersion(version); - Schema::SaveSchemaPresetDropVersion(db, presetId, version); -} - -void TTablesManager::RegisterTable(TTableInfo&& table, NIceDb::TNiceDb& db) { - Y_VERIFY(!HasTable(table.GetPathId())); - Y_VERIFY(table.IsEmpty()); - - Schema::SaveTableInfo(db, table.GetPathId(), table.GetTieringUsage()); - Tables.insert_or_assign(table.GetPathId(), std::move(table)); -} - -bool TTablesManager::RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIceDb::TNiceDb& db) { - if (SchemaPresets.contains(schemaPreset.GetId())) { - return false; - } - Schema::SaveSchemaPresetInfo(db, schemaPreset.GetId(), schemaPreset.GetName()); - SchemaPresets.insert_or_assign(schemaPreset.GetId(), schemaPreset); - return true; -} - -void TTablesManager::AddPresetVersion(const ui32 presetId, const TRowVersion& version, const TTableSchema& schema, NIceDb::TNiceDb& db) { - Y_VERIFY(SchemaPresets.contains(presetId)); - TSchemaPreset::TSchemaPresetVersionInfo versionInfo; - versionInfo.SetId(presetId); - versionInfo.SetSinceStep(version.Step); - versionInfo.SetSinceTxId(version.TxId); - *versionInfo.MutableSchema() = schema; - - auto& schemaPreset = SchemaPresets.at(presetId); - Schema::SaveSchemaPresetVersionInfo(db, presetId, version, versionInfo); - schemaPreset.AddVersion(version, versionInfo); - if (presetId != 0 && versionInfo.HasSchema()) { - IndexSchemaVersion(version, versionInfo.GetSchema()); - } -} - -void TTablesManager::AddTableVersion(const ui64 pathId, const TRowVersion& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db) { - auto& table = Tables.at(pathId); - - if (table.IsEmpty()) { - if (versionInfo.HasSchemaPresetId()) { - Y_VERIFY(SchemaPresets.contains(versionInfo.GetSchemaPresetId())); - } else { - Y_VERIFY(SchemaPresets.empty()); - TSchemaPreset preset; - Y_VERIFY(RegisterSchemaPreset(preset, db)); - AddPresetVersion(preset.GetId(), version, versionInfo.GetSchema(), db); - } - } - Y_VERIFY(SchemaPresets.contains(versionInfo.GetSchemaPresetId())); - - if (versionInfo.HasTtlSettings()) { - const auto& ttlSettings = versionInfo.GetTtlSettings(); - if (ttlSettings.HasEnabled()) { - Ttl.SetPathTtl(pathId, TTtl::TDescription(ttlSettings.GetEnabled())); - } else { - Ttl.DropPathTtl(pathId); - } - } - - Schema::SaveTableVersionInfo(db, pathId, version, versionInfo); - table.AddVersion(version, versionInfo); - if (versionInfo.HasSchema()) { - IndexSchemaVersion(version, versionInfo.GetSchema()); - } -} - -void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const TTableSchema& schema) { - NOlap::TSnapshot snapshot{version.Step, version.TxId}; - NOlap::TIndexInfo indexInfo = ConvertSchema(schema); - - if (!PrimaryIndex) { - PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(std::move(indexInfo), TabletId); - } else { - PrimaryIndex->UpdateDefaultSchema(snapshot, std::move(indexInfo)); - } - - for (auto& columnName : Ttl.TtlColumns()) { - PrimaryIndex->GetIndexInfo().CheckTtlColumn(columnName); - } -} - -std::shared_ptr<NOlap::TColumnEngineChanges> TTablesManager::StartIndexCleanup(const NOlap::TSnapshot& snapshot, ui32 maxRecords) { - Y_VERIFY(PrimaryIndex); - return PrimaryIndex->StartCleanup(snapshot, PathsToDrop, maxRecords); -} - -NOlap::TIndexInfo TTablesManager::ConvertSchema(const TTableSchema& schema) { - Y_VERIFY(schema.GetEngine() == NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES); - - ui32 indexId = 0; - NOlap::TIndexInfo indexInfo("", indexId); - - for (const auto& col : schema.GetColumns()) { - const ui32 id = col.GetId(); - const TString& name = col.GetName(); - auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(col.GetTypeId(), - col.HasTypeInfo() ? &col.GetTypeInfo() : nullptr); - indexInfo.Columns[id] = NTable::TColumn(name, id, typeInfoMod.TypeInfo, typeInfoMod.TypeMod); - indexInfo.ColumnNames[name] = id; - } - - for (const auto& keyName : schema.GetKeyColumnNames()) { - Y_VERIFY(indexInfo.ColumnNames.count(keyName)); - indexInfo.KeyColumns.push_back(indexInfo.ColumnNames[keyName]); - } - - if (schema.HasDefaultCompression()) { - NOlap::TCompression compression = NTiers::ConvertCompression(schema.GetDefaultCompression()); - indexInfo.SetDefaultCompression(compression); - } - - return indexInfo; -} -} diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h deleted file mode 100644 index 423221195b7..00000000000 --- a/ydb/core/tx/columnshard/tables_manager.h +++ /dev/null @@ -1,221 +0,0 @@ -#pragma once - -#include "columnshard_schema.h" -#include "columnshard_ttl.h" -#include "engines/column_engine.h" - -#include "ydb/core/base/row_version.h" -#include "ydb/library/accessor/accessor.h" -#include "ydb/core/protos/tx_columnshard.pb.h" - - -namespace NKikimr::NColumnShard { - -template<class TSchemaProto> -class TVersionedSchema { -protected: - std::optional<TRowVersion> DropVersion; - TMap<TRowVersion, TSchemaProto> Versions; - -public: - bool IsDropped() const { - return DropVersion.has_value(); - } - - bool IsEmpty() const { - return Versions.empty(); - } - - void SetDropVersion(const TRowVersion& version) { - DropVersion = version; - } - - const TMap<TRowVersion, TSchemaProto>& GetVersions() const { - return Versions; - } - - const TSchemaProto& GetVersion(const TRowVersion& version) const { - const TSchemaProto* result = nullptr; - for (auto ver : Versions) { - if (ver.first > version) { - break; - } - result = &ver.second; - } - Y_VERIFY(!!result); - return *result; - } - - void AddVersion(const TRowVersion& version, const TSchemaProto& versionInfo) { - Versions[version] = versionInfo; - } -}; - -using TTableSchema = NKikimrSchemeOp::TColumnTableSchema; - -class TSchemaPreset : public TVersionedSchema<NKikimrTxColumnShard::TSchemaPresetVersionInfo> { -public: - using TSchemaPresetVersionInfo = NKikimrTxColumnShard::TSchemaPresetVersionInfo; - ui32 Id = 0; - TString Name; -public: - bool IsStandaloneTable() const { - return Id == 0; - } - - const TString& GetName() const { - return Name; - } - - ui32 GetId() const { - return Id; - } - - void Deserialize(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto); - - template <class TRow> - bool InitFromDB(const TRow& rowset) { - Id = rowset.template GetValue<Schema::SchemaPresetInfo::Id>(); - if (!IsStandaloneTable()) { - Name = rowset.template GetValue<Schema::SchemaPresetInfo::Name>(); - } - Y_VERIFY(!Id || Name == "default", "Unsupported preset at load time"); - - if (rowset.template HaveValue<Schema::SchemaPresetInfo::DropStep>() && - rowset.template HaveValue<Schema::SchemaPresetInfo::DropTxId>()) - { - DropVersion.emplace(); - DropVersion->Step = rowset.template GetValue<Schema::SchemaPresetInfo::DropStep>(); - DropVersion->TxId = rowset.template GetValue<Schema::SchemaPresetInfo::DropTxId>(); - } - return true; - } -}; - -class TTableInfo : public TVersionedSchema<NKikimrTxColumnShard::TTableVersionInfo> { -public: - using TTableVersionInfo = NKikimrTxColumnShard::TTableVersionInfo; - ui64 PathId; - TString TieringUsage; - -public: - const TString& GetTieringUsage() const { - return TieringUsage; - } - - TTableInfo& SetTieringUsage(const TString& data) { - TieringUsage = data; - return *this; - } - - ui64 GetPathId() const { - return PathId; - } - - TTableInfo() = default; - - TTableInfo(const ui64 pathId) - : PathId(pathId) - {} - - template <class TRow> - bool InitFromDB(const TRow& rowset) { - PathId = rowset.template GetValue<Schema::TableInfo::PathId>(); - TieringUsage = rowset.template GetValue<Schema::TableInfo::TieringUsage>(); - if (rowset.template HaveValue<Schema::TableInfo::DropStep>() && rowset.template HaveValue<Schema::TableInfo::DropTxId>()) { - DropVersion.emplace(); - DropVersion->Step = rowset.template GetValue<Schema::TableInfo::DropStep>(); - DropVersion->TxId = rowset.template GetValue<Schema::TableInfo::DropTxId>(); - } - return true; - } -}; - -class TTablesManager { -private: - THashMap<ui64, TTableInfo> Tables; - THashMap<ui32, TSchemaPreset> SchemaPresets; - THashSet<ui64> PathsToDrop; - TTtl Ttl; - std::unique_ptr<NOlap::IColumnEngine> PrimaryIndex; - ui64 TabletId; -public: - const TTtl& GetTtl() const { - return Ttl; - } - - void AddTtls(THashMap<ui64, NOlap::TTiering>& eviction, TInstant now, bool force) { - Ttl.AddTtls(eviction, now, force); - } - - const THashSet<ui64>& GetPathsToDrop() const { - return PathsToDrop; - } - - const THashMap<ui64, TTableInfo>& GetTables() const { - return Tables; - } - - const THashMap<ui32, TSchemaPreset>& GetSchemaPresets() const { - return SchemaPresets; - } - - bool IndexOverloaded() const { - return PrimaryIndex && PrimaryIndex->HasOverloadedGranules(); - } - - bool HasPrimaryIndex() const { - return !!PrimaryIndex; - } - - NOlap::IColumnEngine& MutablePrimaryIndex() { - Y_VERIFY(!!PrimaryIndex); - return *PrimaryIndex; - } - - const NOlap::TIndexInfo& GetIndexInfo(const NOlap::TSnapshot& version = {}) const { - Y_UNUSED(version); - Y_VERIFY(!!PrimaryIndex); - return PrimaryIndex->GetIndexInfo(); - } - - const std::unique_ptr<NOlap::IColumnEngine>& GetPrimaryIndex() const { - return PrimaryIndex; - } - - const NOlap::IColumnEngine& GetPrimaryIndexSafe() const { - Y_VERIFY(!!PrimaryIndex); - return *PrimaryIndex; - } - - bool InitFromDB(NIceDb::TNiceDb& db, const ui64 tabletId); - bool LoadIndex(NOlap::TDbWrapper& db, THashSet<NOlap::TUnifiedBlobId>& lostEvictions); - - void Clear(); - - const TTableInfo& GetTable(const ui64 pathId) const; - ui64 GetMemoryUsage() const; - - bool HasTable(const ui64 pathId) const; - bool IsWritableTable(const ui64 pathId) const; - bool HasPreset(const ui32 presetId) const; - - void DropTable(const ui64 pathId, const TRowVersion& version, NIceDb::TNiceDb& db); - void DropPreset(const ui32 presetId, const TRowVersion& version, NIceDb::TNiceDb& db); - - void RegisterTable(TTableInfo&& table, NIceDb::TNiceDb& db); - bool RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIceDb::TNiceDb& db); - - void AddPresetVersion(const ui32 presetId, const TRowVersion& version, const TTableSchema& schema, NIceDb::TNiceDb& db); - void AddTableVersion(const ui64 pathId, const TRowVersion& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db); - - void OnTtlUpdate(); - - std::shared_ptr<NOlap::TColumnEngineChanges> StartIndexCleanup(const NOlap::TSnapshot& snapshot, ui32 maxRecords); - -private: - void IndexSchemaVersion(const TRowVersion& version, const TTableSchema& schema); - static NOlap::TIndexInfo ConvertSchema(const TTableSchema& schema); -}; - -} |