diff options
author | nsofya <nsofya@ydb.tech> | 2024-01-10 20:44:34 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-10 20:44:34 +0300 |
commit | 27420cd0545f794356b8dc14b1a435837fcdc2fa (patch) | |
tree | 0bf552b26eb265ea37c5560d48fee7866d00df4c | |
parent | e75b8c5550c5f46beebf9ddc68b9b043f48b9c28 (diff) | |
download | ydb-27420cd0545f794356b8dc14b1a435837fcdc2fa.tar.gz |
TRowVersion -> TSnapshot (#921)
-rw-r--r-- | ydb/core/tx/columnshard/columnshard.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__progress_tx.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__read.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__scan.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.cpp | 25 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.h | 19 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_schema.h | 22 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/tables_manager.cpp | 32 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/tables_manager.h | 31 |
9 files changed, 71 insertions, 74 deletions
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index ba53d51fac..8d19c31673 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -258,7 +258,7 @@ ui64 TColumnShard::MemoryUsage() const { CommitsInFlight.size() * sizeof(TCommitMeta) + LongTxWrites.size() * (sizeof(TWriteId) + sizeof(TLongTxWriteInfo)) + LongTxWritesByUniqueId.size() * (sizeof(TULID) + sizeof(void*)) + - (WaitingReads.size() + WaitingScans.size()) * (sizeof(TRowVersion) + sizeof(void*)) + + (WaitingReads.size() + WaitingScans.size()) * (sizeof(NOlap::TSnapshot) + sizeof(void*)) + TabletCounters->Simple()[COUNTER_PREPARED_RECORDS].Get() * sizeof(NOlap::TInsertedData) + TabletCounters->Simple()[COUNTER_COMMITTED_RECORDS].Get() * sizeof(NOlap::TInsertedData); memory += TablesManager.GetMemoryUsage(); diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp index 522f6a6301..d8450d124a 100644 --- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp +++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp @@ -86,7 +86,7 @@ public: case NKikimrTxColumnShard::TX_KIND_SCHEMA: { auto& meta = Self->AltersInFlight.at(txId); - Self->RunSchemaTx(meta.Body, TRowVersion(step, txId), txc); + Self->RunSchemaTx(meta.Body, NOlap::TSnapshot(step, txId), txc); Self->ProtectSchemaSeqNo(meta.Body.GetSeqNo(), txc); for (TActorId subscriber : meta.NotifySubscribers) { TxEvents.emplace_back(subscriber, 0, diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp index c2816e27e8..ae4455099c 100644 --- a/ydb/core/tx/columnshard/columnshard__read.cpp +++ b/ydb/core/tx/columnshard/columnshard__read.cpp @@ -149,13 +149,13 @@ void TColumnShard::Handle(TEvColumnShard::TEvRead::TPtr& ev, const TActorContext LastAccessTime = TAppData::TimeProvider->Now(); const auto* msg = ev->Get(); - TRowVersion readVersion(msg->Record.GetPlanStep(), msg->Record.GetTxId()); - TRowVersion maxReadVersion = GetMaxReadVersion(); + NOlap::TSnapshot readVersion(msg->Record.GetPlanStep(), msg->Record.GetTxId()); + NOlap::TSnapshot maxReadVersion = GetMaxReadVersion(); LOG_S_DEBUG("Read at tablet " << TabletID() << " version=" << readVersion << " readable=" << maxReadVersion); if (maxReadVersion < readVersion) { WaitingReads.emplace(readVersion, std::move(ev)); - WaitPlanStep(readVersion.Step); + WaitPlanStep(readVersion.GetPlanStep()); return; } diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 43e5d0115d..114881eb4e 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -878,8 +878,8 @@ void TColumnShard::Handle(TEvColumnShard::TEvScan::TPtr& ev, const TActorContext const auto& scanId = record.GetScanId(); const auto& snapshot = record.GetSnapshot(); - TRowVersion readVersion(snapshot.GetStep(), snapshot.GetTxId()); - TRowVersion maxReadVersion = GetMaxReadVersion(); + NOlap::TSnapshot readVersion(snapshot.GetStep(), snapshot.GetTxId()); + NOlap::TSnapshot maxReadVersion = GetMaxReadVersion(); LOG_S_DEBUG("EvScan txId: " << txId << " scanId: " << scanId @@ -889,7 +889,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvScan::TPtr& ev, const TActorContext if (maxReadVersion < readVersion) { WaitingScans.emplace(readVersion, std::move(ev)); - WaitPlanStep(readVersion.Step); + WaitPlanStep(readVersion.GetPlanStep()); return; } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index bd93759cc4..4098d2b00c 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -256,10 +256,10 @@ void TColumnShard::SendWaitPlanStep(ui64 step) { void TColumnShard::RescheduleWaitingReads() { ui64 minWaitingStep = Max<ui64>(); - TRowVersion maxReadVersion = GetMaxReadVersion(); + NOlap::TSnapshot maxReadVersion = GetMaxReadVersion(); for (auto it = WaitingReads.begin(); it != WaitingReads.end();) { if (maxReadVersion < it->first) { - minWaitingStep = Min(minWaitingStep, it->first.Step); + minWaitingStep = Min(minWaitingStep, it->first.GetPlanStep()); break; } TActivationContext::Send(it->second.Release()); @@ -267,7 +267,7 @@ void TColumnShard::RescheduleWaitingReads() { } for (auto it = WaitingScans.begin(); it != WaitingScans.end();) { if (maxReadVersion < it->first) { - minWaitingStep = Min(minWaitingStep, it->first.Step); + minWaitingStep = Min(minWaitingStep, it->first.GetPlanStep()); break; } TActivationContext::Send(it->second.Release()); @@ -278,18 +278,19 @@ void TColumnShard::RescheduleWaitingReads() { } } -TRowVersion TColumnShard::GetMaxReadVersion() const { +NOlap::TSnapshot TColumnShard::GetMaxReadVersion() const { auto plannedTx = ProgressTxController->GetPlannedTx(); if (plannedTx) { // We may only read just before the first transaction in the queue - return TRowVersion(plannedTx->Step, plannedTx->TxId).Prev(); + auto maxReadVersion = TRowVersion(plannedTx->Step, plannedTx->TxId).Prev(); + return NOlap::TSnapshot(maxReadVersion.Step, maxReadVersion.TxId); } ui64 step = LastPlannedStep; if (MediatorTimeCastEntry) { ui64 mediatorStep = MediatorTimeCastEntry->Get(TabletID()); step = Max(step, mediatorStep); } - return TRowVersion(step, Max<ui64>()); + return NOlap::TSnapshot(step, Max<ui64>()); } ui64 TColumnShard::GetOutdatedStep() const { @@ -450,7 +451,7 @@ void TColumnShard::ProtectSchemaSeqNo(const NKikimrTxColumnShard::TSchemaSeqNo& } } -void TColumnShard::RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body, const TRowVersion& version, +void TColumnShard::RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) { switch (body.TxBody_case()) { case NKikimrTxColumnShard::TSchemaTxBody::kInitShard: { @@ -483,7 +484,7 @@ void TColumnShard::RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body, Y_ABORT("Unsupported schema tx type"); } -void TColumnShard::RunInit(const NKikimrTxColumnShard::TInitShard& proto, const TRowVersion& version, +void TColumnShard::RunInit(const NKikimrTxColumnShard::TInitShard& proto, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) { Y_UNUSED(version); @@ -504,7 +505,7 @@ void TColumnShard::RunInit(const NKikimrTxColumnShard::TInitShard& proto, const } } -void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tableProto, const TRowVersion& version, +void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tableProto, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) { NIceDb::TNiceDb db(txc.DB); @@ -560,7 +561,7 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl SetCounter(COUNTER_TABLE_TTLS, TablesManager.GetTtl().PathsCount()); } -void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterProto, const TRowVersion& version, +void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterProto, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) { NIceDb::TNiceDb db(txc.DB); @@ -593,7 +594,7 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP TablesManager.AddTableVersion(pathId, version, tableVerProto, db); } -void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProto, const TRowVersion& version, +void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProto, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) { NIceDb::TNiceDb db(txc.DB); @@ -614,7 +615,7 @@ void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProt TryAbortWrites(db, dbTable, std::move(writesToAbort)); } -void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, const TRowVersion& version, +void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) { NIceDb::TNiceDb db(txc.DB); diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index e9c4062061..cbd33a169c 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -428,8 +428,8 @@ private: THashMap<TWriteId, TLongTxWriteInfo> LongTxWrites; using TPartsForLTXShard = THashMap<ui32, TLongTxWriteInfo*>; THashMap<TULID, TPartsForLTXShard> LongTxWritesByUniqueId; - TMultiMap<TRowVersion, TEvColumnShard::TEvRead::TPtr> WaitingReads; - TMultiMap<TRowVersion, TEvColumnShard::TEvScan::TPtr> WaitingScans; + TMultiMap<NOlap::TSnapshot, TEvColumnShard::TEvRead::TPtr> WaitingReads; + TMultiMap<NOlap::TSnapshot, TEvColumnShard::TEvScan::TPtr> WaitingScans; TBackgroundController BackgroundController; TSettings Settings; TLimits Limits; @@ -442,7 +442,7 @@ private: bool WaitPlanStep(ui64 step); void SendWaitPlanStep(ui64 step); void RescheduleWaitingReads(); - TRowVersion GetMaxReadVersion() const; + NOlap::TSnapshot GetMaxReadVersion() const; ui64 GetMinReadStep() const; ui64 GetOutdatedStep() const; @@ -465,12 +465,12 @@ private: void UpdateSchemaSeqNo(const TMessageSeqNo& seqNo, NTabletFlatExecutor::TTransactionContext& txc); void ProtectSchemaSeqNo(const NKikimrTxColumnShard::TSchemaSeqNo& seqNoProto, NTabletFlatExecutor::TTransactionContext& txc); - void RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); - void RunInit(const NKikimrTxColumnShard::TInitShard& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); - void RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); - void RunAlterTable(const NKikimrTxColumnShard::TAlterTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); - void RunDropTable(const NKikimrTxColumnShard::TDropTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); - void RunAlterStore(const NKikimrTxColumnShard::TAlterStore& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); + void RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc); + void RunInit(const NKikimrTxColumnShard::TInitShard& body, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc); + void RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& body, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc); + void RunAlterTable(const NKikimrTxColumnShard::TAlterTable& body, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc); + void RunDropTable(const NKikimrTxColumnShard::TDropTable& body, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc); + void RunAlterStore(const NKikimrTxColumnShard::TAlterStore& body, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc); void StartIndexTask(std::vector<const NOlap::TInsertedData*>&& dataToIndex, const i64 bytesToIndex); void SetupIndexation(); @@ -493,7 +493,6 @@ private: static TDuration GetControllerPeriodicWakeupActivationPeriod(); static TDuration GetControllerStatsReportInterval(); - public: const std::shared_ptr<NOlap::IStoragesManager>& GetStoragesManager() const { return StoragesManager; diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index 1dc2ede706..c55cf2ed57 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -392,23 +392,23 @@ struct Schema : NIceDb::Schema { static void SaveSchemaPresetVersionInfo( NIceDb::TNiceDb& db, - ui64 id, const TRowVersion& version, + ui64 id, const NOlap::TSnapshot& version, const NKikimrTxColumnShard::TSchemaPresetVersionInfo& info) { TString serialized; Y_ABORT_UNLESS(info.SerializeToString(&serialized)); - db.Table<SchemaPresetVersionInfo>().Key(id, version.Step, version.TxId).Update( + db.Table<SchemaPresetVersionInfo>().Key(id, version.GetPlanStep(), version.GetTxId()).Update( NIceDb::TUpdate<SchemaPresetVersionInfo::InfoProto>(serialized)); } - static void SaveSchemaPresetDropVersion(NIceDb::TNiceDb& db, ui64 id, const TRowVersion& dropVersion) { + static void SaveSchemaPresetDropVersion(NIceDb::TNiceDb& db, ui64 id, const NOlap::TSnapshot& dropVersion) { db.Table<SchemaPresetInfo>().Key(id).Update( - NIceDb::TUpdate<SchemaPresetInfo::DropStep>(dropVersion.Step), - NIceDb::TUpdate<SchemaPresetInfo::DropTxId>(dropVersion.TxId)); + NIceDb::TUpdate<SchemaPresetInfo::DropStep>(dropVersion.GetPlanStep()), + NIceDb::TUpdate<SchemaPresetInfo::DropTxId>(dropVersion.GetTxId())); } - static void EraseSchemaPresetVersionInfo(NIceDb::TNiceDb& db, ui64 id, const TRowVersion& version) { - db.Table<SchemaPresetVersionInfo>().Key(id, version.Step, version.TxId).Delete(); + static void EraseSchemaPresetVersionInfo(NIceDb::TNiceDb& db, ui64 id, const NOlap::TSnapshot& version) { + db.Table<SchemaPresetVersionInfo>().Key(id, version.GetPlanStep(), version.GetTxId()).Delete(); } static void EraseSchemaPresetInfo(NIceDb::TNiceDb& db, ui64 id) { @@ -424,12 +424,12 @@ struct Schema : NIceDb::Schema { static void SaveTableVersionInfo( NIceDb::TNiceDb& db, - ui64 pathId, const TRowVersion& version, + ui64 pathId, const NOlap::TSnapshot& version, const NKikimrTxColumnShard::TTableVersionInfo& info) { TString serialized; Y_ABORT_UNLESS(info.SerializeToString(&serialized)); - db.Table<TableVersionInfo>().Key(pathId, version.Step, version.TxId).Update( + db.Table<TableVersionInfo>().Key(pathId, version.GetPlanStep(), version.GetTxId()).Update( NIceDb::TUpdate<TableVersionInfo::InfoProto>(serialized)); } @@ -441,8 +441,8 @@ struct Schema : NIceDb::Schema { NIceDb::TUpdate<TableInfo::DropTxId>(dropTxId)); } - static void EraseTableVersionInfo(NIceDb::TNiceDb& db, ui64 pathId, const TRowVersion& version) { - db.Table<TableVersionInfo>().Key(pathId, version.Step, version.TxId).Delete(); + static void EraseTableVersionInfo(NIceDb::TNiceDb& db, ui64 pathId, const NOlap::TSnapshot& version) { + db.Table<TableVersionInfo>().Key(pathId, version.GetPlanStep(), version.GetTxId()).Delete(); } static void EraseTableInfo(NIceDb::TNiceDb& db, ui64 pathId) { diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp index d3bd5489d4..c100b8d348 100644 --- a/ydb/core/tx/columnshard/tables_manager.cpp +++ b/ydb/core/tx/columnshard/tables_manager.cpp @@ -95,13 +95,13 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) { return false; } - THashMap<ui64, TRowVersion> lastVersion; + THashMap<ui64, NOlap::TSnapshot> lastVersion; while (!rowset.EndOfSet()) { const ui64 pathId = rowset.GetValue<Schema::TableVersionInfo::PathId>(); Y_ABORT_UNLESS(Tables.contains(pathId)); - TRowVersion version( + NOlap::TSnapshot version( rowset.GetValue<Schema::TableVersionInfo::SinceStep>(), - rowset.GetValue<Schema::TableVersionInfo::SinceTxId>()); + rowset.GetValue<Schema::TableVersionInfo::SinceTxId>()); auto& table = Tables.at(pathId); TTableInfo::TTableVersionInfo versionInfo; @@ -112,10 +112,11 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) { if (!table.IsDropped()) { auto& ttlSettings = versionInfo.GetTtlSettings(); if (ttlSettings.HasEnabled()) { - if (!lastVersion.contains(pathId) || lastVersion[pathId] < version) { + auto vIt = lastVersion.find(pathId); + if (vIt == lastVersion.end() || vIt->second < version) { TTtl::TDescription description(ttlSettings.GetEnabled()); Ttl.SetPathTtl(pathId, std::move(description)); - lastVersion[pathId] = version; + lastVersion.emplace(pathId, version); } } } @@ -136,7 +137,7 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) { const ui32 id = rowset.GetValue<Schema::SchemaPresetVersionInfo::Id>(); Y_ABORT_UNLESS(SchemaPresets.contains(id)); auto& preset = SchemaPresets.at(id); - TRowVersion version( + NOlap::TSnapshot version( rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceStep>(), rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceTxId>()); @@ -208,7 +209,7 @@ ui64 TTablesManager::GetMemoryUsage() const { return memory; } -void TTablesManager::DropTable(const ui64 pathId, const TRowVersion& version, NIceDb::TNiceDb& db) { +void TTablesManager::DropTable(const ui64 pathId, const NOlap::TSnapshot& version, NIceDb::TNiceDb& db) { auto& table = Tables.at(pathId); table.SetDropVersion(version); PathsToDrop.insert(pathId); @@ -216,10 +217,10 @@ void TTablesManager::DropTable(const ui64 pathId, const TRowVersion& version, NI if (PrimaryIndex) { PrimaryIndex->OnTieringModified(nullptr, Ttl); } - Schema::SaveTableDropVersion(db, pathId, version.Step, version.TxId); + Schema::SaveTableDropVersion(db, pathId, version.GetPlanStep(), version.GetTxId()); } -void TTablesManager::DropPreset(const ui32 presetId, const TRowVersion& version, NIceDb::TNiceDb& db) { +void TTablesManager::DropPreset(const ui32 presetId, const NOlap::TSnapshot& version, NIceDb::TNiceDb& db) { auto& preset = SchemaPresets.at(presetId); Y_ABORT_UNLESS(preset.GetName() != "default", "Cannot drop the default preset"); preset.SetDropVersion(version); @@ -247,14 +248,14 @@ bool TTablesManager::RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIc return true; } -void TTablesManager::AddSchemaVersion(const ui32 presetId, const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db) { +void TTablesManager::AddSchemaVersion(const ui32 presetId, const NOlap::TSnapshot& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db) { Y_ABORT_UNLESS(SchemaPresets.contains(presetId)); auto preset = SchemaPresets.at(presetId); TSchemaPreset::TSchemaPresetVersionInfo versionInfo; versionInfo.SetId(presetId); - versionInfo.SetSinceStep(version.Step); - versionInfo.SetSinceTxId(version.TxId); + versionInfo.SetSinceStep(version.GetPlanStep()); + versionInfo.SetSinceTxId(version.GetTxId()); *versionInfo.MutableSchema() = schema; auto& schemaPreset = SchemaPresets.at(presetId); @@ -268,7 +269,7 @@ void TTablesManager::AddSchemaVersion(const ui32 presetId, const TRowVersion& ve } } -void TTablesManager::AddTableVersion(const ui64 pathId, const TRowVersion& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db) { +void TTablesManager::AddTableVersion(const ui64 pathId, const NOlap::TSnapshot& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db) { auto& table = Tables.at(pathId); if (versionInfo.HasSchemaPresetId()) { @@ -300,8 +301,7 @@ void TTablesManager::AddTableVersion(const ui64 pathId, const TRowVersion& versi table.AddVersion(version, versionInfo); } -void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema) { - NOlap::TSnapshot snapshot{version.Step, version.TxId}; +void TTablesManager::IndexSchemaVersion(const NOlap::TSnapshot& snapshot, const NKikimrSchemeOp::TColumnTableSchema& schema) { NOlap::TIndexInfo indexInfo = DeserializeIndexInfoFromProto(schema); indexInfo.SetAllKeys(); const bool isFirstPrimaryIndexInitialization = !PrimaryIndex; @@ -341,7 +341,7 @@ bool TTablesManager::TryFinalizeDropPath(NTabletFlatExecutor::TTransactionContex NIceDb::TNiceDb db(txc.DB); NColumnShard::Schema::EraseTableInfo(db, pathId); const auto& table = Tables.find(pathId); - Y_ABORT_UNLESS(table != Tables.end(), "No schema for path %lu", pathId); + Y_ABORT_UNLESS(table != Tables.end(), "No schema for path %lu", pathId); for (auto&& tableVersion : table->second.GetVersions()) { NColumnShard::Schema::EraseTableVersionInfo(db, pathId, tableVersion.first); } diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h index 33e6806a13..725bf197c2 100644 --- a/ydb/core/tx/columnshard/tables_manager.h +++ b/ydb/core/tx/columnshard/tables_manager.h @@ -16,8 +16,8 @@ namespace NKikimr::NColumnShard { template<class TSchemaProto> class TVersionedSchema { protected: - std::optional<TRowVersion> DropVersion; - TMap<TRowVersion, TSchemaProto> Versions; + std::optional<NOlap::TSnapshot> DropVersion; + TMap<NOlap::TSnapshot, TSchemaProto> Versions; public: bool IsDropped() const { @@ -28,15 +28,15 @@ public: return Versions.empty(); } - void SetDropVersion(const TRowVersion& version) { + void SetDropVersion(const NOlap::TSnapshot& version) { DropVersion = version; } - const TMap<TRowVersion, TSchemaProto>& GetVersions() const { + const TMap<NOlap::TSnapshot, TSchemaProto>& GetVersions() const { return Versions; } - const TSchemaProto& GetVersion(const TRowVersion& version) const { + const TSchemaProto& GetVersion(const NOlap::TSnapshot& version) const { const TSchemaProto* result = nullptr; for (auto ver : Versions) { if (ver.first > version) { @@ -48,7 +48,7 @@ public: return *result; } - void AddVersion(const TRowVersion& version, const TSchemaProto& versionInfo) { + void AddVersion(const NOlap::TSnapshot& version, const TSchemaProto& versionInfo) { Versions[version] = versionInfo; } }; @@ -84,9 +84,8 @@ public: if (rowset.template HaveValue<Schema::SchemaPresetInfo::DropStep>() && rowset.template HaveValue<Schema::SchemaPresetInfo::DropTxId>()) { - DropVersion.emplace(); - DropVersion->Step = rowset.template GetValue<Schema::SchemaPresetInfo::DropStep>(); - DropVersion->TxId = rowset.template GetValue<Schema::SchemaPresetInfo::DropTxId>(); + DropVersion.emplace(rowset.template GetValue<Schema::SchemaPresetInfo::DropStep>(), + rowset.template GetValue<Schema::SchemaPresetInfo::DropTxId>()); } return true; } @@ -123,9 +122,7 @@ public: PathId = rowset.template GetValue<Schema::TableInfo::PathId>(); TieringUsage = rowset.template GetValue<Schema::TableInfo::TieringUsage>(); if (rowset.template HaveValue<Schema::TableInfo::DropStep>() && rowset.template HaveValue<Schema::TableInfo::DropTxId>()) { - DropVersion.emplace(); - DropVersion->Step = rowset.template GetValue<Schema::TableInfo::DropStep>(); - DropVersion->TxId = rowset.template GetValue<Schema::TableInfo::DropTxId>(); + DropVersion.emplace(rowset.template GetValue<Schema::TableInfo::DropStep>(), rowset.template GetValue<Schema::TableInfo::DropTxId>()); } return true; } @@ -202,17 +199,17 @@ public: bool IsReadyForWrite(const ui64 pathId) const; bool HasPreset(const ui32 presetId) const; - void DropTable(const ui64 pathId, const TRowVersion& version, NIceDb::TNiceDb& db); - void DropPreset(const ui32 presetId, const TRowVersion& version, NIceDb::TNiceDb& db); + void DropTable(const ui64 pathId, const NOlap::TSnapshot& version, NIceDb::TNiceDb& db); + void DropPreset(const ui32 presetId, const NOlap::TSnapshot& version, NIceDb::TNiceDb& db); void RegisterTable(TTableInfo&& table, NIceDb::TNiceDb& db); bool RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIceDb::TNiceDb& db); - void AddSchemaVersion(const ui32 presetId, const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db); - void AddTableVersion(const ui64 pathId, const TRowVersion& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db); + void AddSchemaVersion(const ui32 presetId, const NOlap::TSnapshot& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db); + void AddTableVersion(const ui64 pathId, const NOlap::TSnapshot& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db); bool FillMonitoringReport(NTabletFlatExecutor::TTransactionContext& txc, NJson::TJsonValue& json); private: - void IndexSchemaVersion(const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema); + void IndexSchemaVersion(const NOlap::TSnapshot& version, const NKikimrSchemeOp::TColumnTableSchema& schema); static NOlap::TIndexInfo DeserializeIndexInfoFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema); }; |