diff options
author | nsofya <nsofya@yandex-team.com> | 2023-05-08 17:17:18 +0300 |
---|---|---|
committer | nsofya <nsofya@yandex-team.com> | 2023-05-08 17:17:18 +0300 |
commit | 4a5cd6c92018036c598ae098563bef861bcc5316 (patch) | |
tree | 16b99632c2da272b3e1f4348480c639c451fb6bc | |
parent | 8107c88ce7480dfc685a7227eb3b44c9e9f666b1 (diff) | |
download | ydb-4a5cd6c92018036c598ae098563bef861bcc5316.tar.gz |
Incapsulate data in snapshot object
Accurate snapshot usage
33 files changed, 210 insertions, 204 deletions
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index 178384bb20..8150db60b7 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -341,7 +341,7 @@ void TColumnShard::SendPeriodicStats() { // TODO: we need row/dataSize counters for evicted data (managed by tablet but stored outside) //tabletStats->SetIndexSize(); // TODO: calc size of internal tables tabletStats->SetLastAccessTime(LastAccessTime.MilliSeconds()); - tabletStats->SetLastUpdateTime(lastIndexUpdate.PlanStep); + tabletStats->SetLastUpdateTime(lastIndexUpdate.GetPlanStep()); } } diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp index 7c63b8cb53..411cd360f2 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp @@ -19,16 +19,16 @@ TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstP IndexedData.InitRead(batchNo); // Add cached batches without read for (auto& [blobId, batch] : ReadMetadata->CommittedBatches) { - auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{ blobId, 0, 0 }); + auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{ blobId, NOlap::TSnapshot::Zero() }); Y_VERIFY(!cmt.empty()); const NOlap::TCommittedBlob& cmtBlob = cmt.key(); ui32 batchNo = cmt.mapped(); - IndexedData.AddNotIndexed(batchNo, batch, cmtBlob.PlanStep, cmtBlob.TxId); + IndexedData.AddNotIndexed(batchNo, batch, cmtBlob.GetSnapshot()); } // Read all remained committed blobs for (const auto& [cmtBlob, _] : WaitCommitted) { - auto& blobId = cmtBlob.BlobId; + auto& blobId = cmtBlob.GetBlobId(); FetchBlobsQueue.emplace_front(TBlobRange(blobId, 0, blobId.BlobSize())); } @@ -44,11 +44,11 @@ void TColumnShardScanIterator::AddData(const TBlobRange& blobRange, TString data if (IndexedData.IsIndexedBlob(blobRange)) { IndexedData.AddIndexed(blobRange, data); } else { - auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{ blobId, 0, 0 }); + auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{ blobId, NOlap::TSnapshot::Zero() }); Y_VERIFY(!cmt.empty()); const NOlap::TCommittedBlob& cmtBlob = cmt.key(); ui32 batchNo = cmt.mapped(); - IndexedData.AddNotIndexed(batchNo, data, cmtBlob.PlanStep, cmtBlob.TxId); + IndexedData.AddNotIndexed(batchNo, data, cmtBlob.GetSnapshot()); } } diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp index 92d1295cb3..bd7c9894ab 100644 --- a/ydb/core/tx/columnshard/columnshard__read.cpp +++ b/ydb/core/tx/columnshard/columnshard__read.cpp @@ -47,13 +47,11 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) { txc.DB.NoMoreReadsForTx(); auto& record = Proto(Ev->Get()); - const NOlap::TIndexInfo& indexInfo = Self->TablesManager.GetIndexInfo(NOlap::TSnapshot().SetPlanStep(record.GetPlanStep()).SetTxId(record.GetTxId())); + const NOlap::TIndexInfo& indexInfo = Self->TablesManager.GetIndexInfo(NOlap::TSnapshot(record.GetPlanStep(), record.GetTxId())); ui64 metaShard = record.GetTxInitiator(); - NOlap::TReadDescription read(false); - read.PlanStep = record.GetPlanStep(); - read.TxId = record.GetTxId(); + NOlap::TReadDescription read(NOlap::TSnapshot(record.GetPlanStep(), record.GetTxId()), false); read.PathId = record.GetTableId(); read.ReadNothing = !(Self->TablesManager.HasTable(read.PathId)); read.ColumnIds = ProtoToVector<ui32>(record.GetColumnIds()); @@ -97,7 +95,7 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) { } Result = std::make_unique<TEvColumnShard::TEvReadResult>( - Self->TabletID(), metaShard, read.PlanStep, read.TxId, read.PathId, 0, true, status); + Self->TabletID(), metaShard, read.GetSnapshot().GetPlanStep(), read.GetSnapshot().GetTxId(), read.PathId, 0, true, status); if (status == NKikimrTxColumnShard::EResultStatus::SUCCESS) { Self->IncCounter(COUNTER_READ_SUCCESS); diff --git a/ydb/core/tx/columnshard/columnshard__read_base.cpp b/ydb/core/tx/columnshard/columnshard__read_base.cpp index 550144ad25..bf21cb1419 100644 --- a/ydb/core/tx/columnshard/columnshard__read_base.cpp +++ b/ydb/core/tx/columnshard/columnshard__read_base.cpp @@ -15,20 +15,19 @@ TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const NOlap::TReadDes Y_UNUSED(ctx); if (!insertTable || !index) { - return {}; + return nullptr; } - if (read.PlanStep < Self->GetMinReadStep()) { - error = Sprintf("Snapshot %" PRIu64 ":%" PRIu64 " too old", read.PlanStep, read.TxId); - return {}; + if (read.GetSnapshot().GetPlanStep() < Self->GetMinReadStep()) { + error = TStringBuilder() << "Snapshot too old: " << read.GetSnapshot(); + return nullptr; } NOlap::TDataStorageAccessor dataAccessor(insertTable, index, batchCache); - auto readSnapshot = NOlap::TSnapshot().SetPlanStep(read.PlanStep).SetTxId(read.TxId); - auto readMetadata = std::make_shared<NOlap::TReadMetadata>(index->GetVersionedIndex(), readSnapshot, isReverse ? NOlap::TReadMetadata::ESorting::DESC : NOlap::TReadMetadata::ESorting::ASC); + auto readMetadata = std::make_shared<NOlap::TReadMetadata>(index->GetVersionedIndex(), read.GetSnapshot(), isReverse ? NOlap::TReadMetadata::ESorting::DESC : NOlap::TReadMetadata::ESorting::ASC); if (!readMetadata->Init(read, dataAccessor, error)) { - return {}; + return nullptr; } return readMetadata; } diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 279587bbda..4b30d7a5ff 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -842,9 +842,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) { ui64 itemsLimit = record.HasItemsLimit() ? record.GetItemsLimit() : 0; - NOlap::TReadDescription read(record.GetReverse()); - read.PlanStep = snapshot.GetStep(); - read.TxId = snapshot.GetTxId(); + NOlap::TReadDescription read(NOlap::TSnapshot(snapshot.GetStep(), snapshot.GetTxId()), record.GetReverse()); read.PathId = record.GetLocalPathId(); read.ReadNothing = !(Self->TablesManager.HasTable(read.PathId)); read.TableName = record.GetTablePath(); @@ -854,7 +852,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) { const NOlap::TIndexInfo* indexInfo = nullptr; if (!isIndexStats) { - indexInfo = &(Self->TablesManager.GetIndexInfo(NOlap::TSnapshot().SetPlanStep(snapshot.GetStep()).SetTxId(snapshot.GetTxId()))); + indexInfo = &(Self->TablesManager.GetIndexInfo(NOlap::TSnapshot(snapshot.GetStep(), snapshot.GetTxId()))); } // TODO: move this to CreateReadMetadata? diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index a9bffb34d7..19d7195fb8 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -61,7 +61,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) if (Ev->Get()->PutStatus == NKikimrProto::OK) { NOlap::TSnapshot snapshot = changes->ApplySnapshot; if (snapshot.IsZero()) { - snapshot = {Self->LastPlannedStep, Self->LastPlannedTxId}; + snapshot = NOlap::TSnapshot(Self->LastPlannedStep, Self->LastPlannedTxId); } TBlobGroupSelector dsGroupSelector(Self->Info()); diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 2a8d2c77fa..271779dedf 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -778,7 +778,7 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() { LOG_S_DEBUG("Prepare " << *compactionInfo << " at tablet " << TabletID()); ui64 ourdatedStep = GetOutdatedStep(); - auto indexChanges = TablesManager.MutablePrimaryIndex().StartCompaction(std::move(compactionInfo), {ourdatedStep, 0}); + auto indexChanges = TablesManager.MutablePrimaryIndex().StartCompaction(std::move(compactionInfo), NOlap::TSnapshot(ourdatedStep, 0)); if (!indexChanges) { LOG_S_DEBUG("Compaction not started: cannot prepare compaction at tablet " << TabletID()); return {}; diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index 5e5d70ca01..d81b47a627 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -505,8 +505,8 @@ struct Schema : NIceDb::Schema { const TGranuleRecord& row) { db.Table<IndexGranules>().Key(index, row.PathId, engine.SerializeMark(row.Mark)).Update( NIceDb::TUpdate<IndexGranules::Granule>(row.Granule), - NIceDb::TUpdate<IndexGranules::PlanStep>(row.CreatedAt.PlanStep), - NIceDb::TUpdate<IndexGranules::TxId>(row.CreatedAt.TxId) + NIceDb::TUpdate<IndexGranules::PlanStep>(row.GetCreatedAt().GetPlanStep()), + NIceDb::TUpdate<IndexGranules::TxId>(row.GetCreatedAt().GetTxId()) ); } @@ -528,7 +528,7 @@ struct Schema : NIceDb::Schema { ui64 planStep = rowset.GetValue<IndexGranules::PlanStep>(); ui64 txId = rowset.GetValue<IndexGranules::TxId>(); - callback(TGranuleRecord(pathId, granule, {planStep, txId}, engine.DeserializeMark(indexKey))); + callback(TGranuleRecord(pathId, granule, NOlap::TSnapshot(planStep, txId), engine.DeserializeMark(indexKey))); if (!rowset.Next()) return false; diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp index 26e1415199..3333820f3a 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp +++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp @@ -50,27 +50,27 @@ void ProvideTieringSnapshot(TTestBasicRuntime& runtime, TActorId& sender, NMetad bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString& txBody, NOlap::TSnapshot snap) { auto event = std::make_unique<TEvColumnShard::TEvProposeTransaction>( - NKikimrTxColumnShard::TX_KIND_SCHEMA, 0, sender, snap.TxId, txBody); + NKikimrTxColumnShard::TX_KIND_SCHEMA, 0, sender, snap.GetTxId(), txBody); ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, event.release()); auto ev = runtime.GrabEdgeEvent<TEvColumnShard::TEvProposeTransactionResult>(sender); const auto& res = ev->Get()->Record; - UNIT_ASSERT_EQUAL(res.GetTxId(), snap.TxId); + UNIT_ASSERT_EQUAL(res.GetTxId(), snap.GetTxId()); UNIT_ASSERT_EQUAL(res.GetTxKind(), NKikimrTxColumnShard::TX_KIND_SCHEMA); return (res.GetStatus() == NKikimrTxColumnShard::PREPARED); } void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap) { - auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(snap.PlanStep, 0, TTestTxConfig::TxTablet0); + auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(snap.GetPlanStep(), 0, TTestTxConfig::TxTablet0); auto tx = plan->Record.AddTransactions(); - tx->SetTxId(snap.TxId); + tx->SetTxId(snap.GetTxId()); ActorIdToProto(sender, tx->MutableAckTo()); ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, plan.release()); UNIT_ASSERT(runtime.GrabEdgeEvent<TEvTxProcessing::TEvPlanStepAck>(sender)); auto ev = runtime.GrabEdgeEvent<TEvColumnShard::TEvProposeTransactionResult>(sender); const auto& res = ev->Get()->Record; - UNIT_ASSERT_EQUAL(res.GetTxId(), snap.TxId); + UNIT_ASSERT_EQUAL(res.GetTxId(), snap.GetTxId()); UNIT_ASSERT_EQUAL(res.GetTxKind(), NKikimrTxColumnShard::TX_KIND_SCHEMA); UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::SUCCESS); } @@ -120,7 +120,7 @@ void ScanIndexStats(TTestBasicRuntime& runtime, TActorId& sender, const TVector< auto scan = std::make_unique<TEvColumnShard::TEvScan>(); auto& record = scan->Record; - record.SetTxId(snap.PlanStep); + record.SetTxId(snap.GetPlanStep()); record.SetScanId(scanId); // record.SetLocalPathId(0); record.SetTablePath(NOlap::TIndexInfo::STORE_INDEX_STATS_TABLE); @@ -146,8 +146,8 @@ void ScanIndexStats(TTestBasicRuntime& runtime, TActorId& sender, const TVector< range.Serialize(*newRange); } - record.MutableSnapshot()->SetStep(snap.PlanStep); - record.MutableSnapshot()->SetTxId(snap.TxId); + record.MutableSnapshot()->SetStep(snap.GetPlanStep()); + record.MutableSnapshot()->SetTxId(snap.GetTxId()); record.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::ARROW); ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, scan.release()); diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 9d1eb13e3d..7a8f1f6e91 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -132,8 +132,8 @@ public: EType Type{UNSPECIFIED}; TCompactionLimits Limits; - TSnapshot InitSnapshot; - TSnapshot ApplySnapshot; + TSnapshot InitSnapshot = TSnapshot::Zero(); + TSnapshot ApplySnapshot = TSnapshot::Zero(); std::unique_ptr<TCompactionInfo> CompactionInfo; TVector<NOlap::TInsertedData> DataToIndex; TVector<TPortionInfo> SwitchedPortions; // Portions that would be replaced by new ones @@ -399,7 +399,7 @@ public: virtual const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& GetStats() const = 0; virtual const TColumnEngineStats& GetTotalStats() = 0; virtual ui64 MemoryUsage() const { return 0; } - virtual TSnapshot LastUpdate() const { return {}; } + virtual TSnapshot LastUpdate() const { return TSnapshot::Zero(); } }; } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 2b1cee1579..654504c61c 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -15,7 +15,7 @@ namespace { bool InitInGranuleMerge(const TMark& granuleMark, TVector<TPortionInfo>& portions, const TCompactionLimits& limits, const TSnapshot& snap, TColumnEngineForLogs::TMarksGranules& marksGranules) { - ui64 oldTimePlanStep = snap.PlanStep - TDuration::Seconds(limits.InGranuleCompactSeconds).MilliSeconds(); + ui64 oldTimePlanStep = snap.GetPlanStep() - TDuration::Seconds(limits.InGranuleCompactSeconds).MilliSeconds(); ui32 insertedCount = 0; ui32 insertedNew = 0; @@ -28,7 +28,7 @@ bool InitInGranuleMerge(const TMark& granuleMark, TVector<TPortionInfo>& portion for (const auto& portionInfo : portions) { if (portionInfo.IsInserted()) { ++insertedCount; - if (portionInfo.Snapshot().PlanStep > oldTimePlanStep) { + if (portionInfo.Snapshot().GetPlanStep() > oldTimePlanStep) { ++insertedNew; } } else if (portionInfo.BlobsSizes().second >= limits.GoodBlobSize) { @@ -453,10 +453,10 @@ bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) { LastGranule = value; break; case LAST_PLAN_STEP: - LastSnapshot.PlanStep = value; + LastSnapshot = TSnapshot(value, LastSnapshot.GetTxId()); break; case LAST_TX_ID: - LastSnapshot.TxId = value; + LastSnapshot = TSnapshot(LastSnapshot.GetPlanStep(), value); break; } }; @@ -649,7 +649,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash return {}; } - TSnapshot fakeSnapshot = {1, 1}; // TODO: better snapshot + TSnapshot fakeSnapshot(1, 1); // TODO: better snapshot auto changes = std::make_shared<TChanges>(*this, TColumnEngineChanges::TTL, fakeSnapshot); ui64 evicttionSize = 0; bool allowEviction = true; @@ -1082,8 +1082,8 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, if (LastSnapshot < snapshot) { LastSnapshot = snapshot; - CountersTable->Write(db, LAST_PLAN_STEP, LastSnapshot.PlanStep); - CountersTable->Write(db, LAST_TX_ID, LastSnapshot.TxId); + CountersTable->Write(db, LAST_PLAN_STEP, LastSnapshot.GetPlanStep()); + CountersTable->Write(db, LAST_TX_ID, LastSnapshot.GetTxId()); } } return true; @@ -1222,7 +1222,7 @@ TMap<TSnapshot, TVector<ui64>> TColumnEngineForLogs::GetOrderedPortions(ui64 gra TSnapshot recXSnapshot = portionInfo.XSnapshot(); bool visible = (recSnapshot <= snapshot); - if (recXSnapshot.PlanStep) { + if (recXSnapshot.GetPlanStep()) { visible = visible && snapshot < recXSnapshot; } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index b71585a8b8..352206ed7b 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -257,7 +257,7 @@ private: TColumnEngineStats Counters; ui64 LastPortion; ui64 LastGranule; - TSnapshot LastSnapshot; + TSnapshot LastSnapshot = TSnapshot::Zero(); void ClearIndex() { Granules.clear(); @@ -272,7 +272,7 @@ private: LastPortion = 0; LastGranule = 0; - LastSnapshot = {}; + LastSnapshot = TSnapshot::Zero(); } bool LoadGranules(IDbWrapper& db); diff --git a/ydb/core/tx/columnshard/engines/columns_table.h b/ydb/core/tx/columnshard/engines/columns_table.h index 951511d9ea..b98a57c0c4 100644 --- a/ydb/core/tx/columnshard/engines/columns_table.h +++ b/ydb/core/tx/columnshard/engines/columns_table.h @@ -50,22 +50,22 @@ struct TColumnRecord { void SetSnapshot(const TSnapshot& snap) { Y_VERIFY(snap.Valid()); - PlanStep = snap.PlanStep; - TxId = snap.TxId; + PlanStep = snap.GetPlanStep(); + TxId = snap.GetTxId(); } void SetXSnapshot(const TSnapshot& snap) { Y_VERIFY(snap.Valid()); - XPlanStep = snap.PlanStep; - XTxId = snap.TxId; + XPlanStep = snap.GetPlanStep(); + XTxId = snap.GetTxId(); } static TColumnRecord Make(ui64 granule, ui32 columnId, const TSnapshot& minSnapshot, ui64 portion, ui16 chunk = 0) { TColumnRecord row; row.Granule = granule; row.ColumnId = columnId; - row.PlanStep = minSnapshot.PlanStep; - row.TxId = minSnapshot.TxId; + row.PlanStep = minSnapshot.GetPlanStep(); + row.TxId = minSnapshot.GetTxId(); row.Portion = portion; row.Chunk = chunk; //row.BlobId diff --git a/ydb/core/tx/columnshard/engines/defs.h b/ydb/core/tx/columnshard/engines/defs.h index 575b60a259..c45eeef0ff 100644 --- a/ydb/core/tx/columnshard/engines/defs.h +++ b/ydb/core/tx/columnshard/engines/defs.h @@ -12,9 +12,15 @@ enum class TWriteId : ui64 {}; inline TWriteId operator ++(TWriteId& w) noexcept { w = TWriteId{ui64(w) + 1}; return w; } -struct TSnapshot { - ui64 PlanStep{0}; - ui64 TxId{0}; +class TSnapshot { +private: + ui64 PlanStep = 0; + ui64 TxId = 0; +public: + constexpr explicit TSnapshot(const ui64 planStep, const ui64 txId) + : PlanStep(planStep) + , TxId(txId) + {} ui64 GetPlanStep() const { return PlanStep; @@ -24,43 +30,29 @@ struct TSnapshot { return TxId; } - TSnapshot& SetPlanStep(const ui64 value) { - PlanStep = value; - return *this; - } - - TSnapshot& SetTxId(const ui64 value) { - TxId = value; - return *this; - } - - constexpr bool IsZero() const noexcept { + bool IsZero() const noexcept { return PlanStep == 0 && TxId == 0; } - constexpr bool Valid() const noexcept { + bool Valid() const noexcept { return PlanStep && TxId; } - constexpr auto operator <=> (const TSnapshot&) const noexcept = default; + auto operator <=> (const TSnapshot&) const noexcept = default; + static constexpr TSnapshot Zero() noexcept { + return TSnapshot(0, 0); + } + static constexpr TSnapshot Max() noexcept { - return TSnapshot().SetPlanStep(-1ll).SetTxId(-1ll); + return TSnapshot(-1ll, -1ll); } friend IOutputStream& operator << (IOutputStream& out, const TSnapshot& s) { - return out << "{" << s.PlanStep << "," << s.TxId << "}"; + return out << "{" << s.PlanStep << ':' << (s.TxId == std::numeric_limits<ui64>::max() ? "max" : ToString(s.TxId)) << "}"; } }; -inline constexpr bool SnapLess(ui64 planStep1, ui64 txId1, ui64 planStep2, ui64 txId2) noexcept { - return std::less<TSnapshot>()(TSnapshot{planStep1, txId1}, TSnapshot{planStep2, txId2}); -} - -inline constexpr bool SnapLessOrEqual(ui64 planStep1, ui64 txId1, ui64 planStep2, ui64 txId2) noexcept { - return std::less_equal<TSnapshot>()(TSnapshot{planStep1, txId1}, TSnapshot{planStep2, txId2}); -} - class IBlobGroupSelector { protected: diff --git a/ydb/core/tx/columnshard/engines/filter.cpp b/ydb/core/tx/columnshard/engines/filter.cpp index 2e04d3defc..2285dcd3e4 100644 --- a/ydb/core/tx/columnshard/engines/filter.cpp +++ b/ydb/core/tx/columnshard/engines/filter.cpp @@ -26,7 +26,7 @@ public: } bool operator[](const ui32 idx) const { - return SnapLessOrEqual(RawSteps[idx], RawIds[idx], Snapshot.GetPlanStep(), Snapshot.GetTxId()); + return std::less_equal<TSnapshot>()(TSnapshot(RawSteps[idx], RawIds[idx]), Snapshot); } }; diff --git a/ydb/core/tx/columnshard/engines/granules_table.h b/ydb/core/tx/columnshard/engines/granules_table.h index e69cd1e57b..847c9dc4c3 100644 --- a/ydb/core/tx/columnshard/engines/granules_table.h +++ b/ydb/core/tx/columnshard/engines/granules_table.h @@ -5,20 +5,26 @@ namespace NKikimr::NOlap { struct TGranuleRecord { +private: + TSnapshot CreatedAt; +public: ui64 PathId; ui64 Granule; - TSnapshot CreatedAt; NArrow::TReplaceKey Mark; TGranuleRecord(ui64 pathId, ui64 granule, const TSnapshot& createdAt, const NArrow::TReplaceKey& mark) - : PathId(pathId) + : CreatedAt(createdAt) + , PathId(pathId) , Granule(granule) - , CreatedAt(createdAt) , Mark(mark) { Y_VERIFY(Mark.Size()); } + const TSnapshot& GetCreatedAt() const { + return CreatedAt; + } + bool operator == (const TGranuleRecord& rec) const { return (PathId == rec.PathId) && (Mark == rec.Mark); } @@ -27,7 +33,7 @@ struct TGranuleRecord { out << '{'; auto& snap = rec.CreatedAt; out << rec.PathId << '#' << rec.Granule << ' ' - << snap.PlanStep << ':' << (snap.TxId == Max<ui64>() ? "max" : ToString(snap.TxId)); + << snap; out << '}'; return out; } diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp index 299fd63c45..76fc2c9110 100644 --- a/ydb/core/tx/columnshard/engines/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/index_info.cpp @@ -26,20 +26,16 @@ TIndexInfo::TIndexInfo(const TString& name, ui32 id) , Name(name) {} -std::shared_ptr<arrow::RecordBatch> TIndexInfo::AddSpecialColumns( - const std::shared_ptr<arrow::RecordBatch>& batch, - const ui64 planStep, - const ui64 txId) -{ +std::shared_ptr<arrow::RecordBatch> TIndexInfo::AddSpecialColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const TSnapshot& snapshot) { Y_VERIFY(batch); i64 numColumns = batch->num_columns(); i64 numRows = batch->num_rows(); auto res = batch->AddColumn(numColumns, arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()), - NArrow::MakeUI64Array(planStep, numRows)); + NArrow::MakeUI64Array(snapshot.GetPlanStep(), numRows)); Y_VERIFY(res.ok()); res = (*res)->AddColumn(numColumns + 1, arrow::field(SPEC_COL_TX_ID, arrow::uint64()), - NArrow::MakeUI64Array(txId, numRows)); + NArrow::MakeUI64Array(snapshot.GetTxId(), numRows)); Y_VERIFY(res.ok()); Y_VERIFY((*res)->num_columns() == numColumns + 2); return *res; diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h index ec28d2dc63..806d39bb09 100644 --- a/ydb/core/tx/columnshard/engines/index_info.h +++ b/ydb/core/tx/columnshard/engines/index_info.h @@ -40,8 +40,7 @@ public: /// Appends the special columns to the batch. static std::shared_ptr<arrow::RecordBatch> AddSpecialColumns( const std::shared_ptr<arrow::RecordBatch>& batch, - const ui64 platStep, - const ui64 txId); + const TSnapshot& snapshot); /// Makes schema as set of the special columns. static std::shared_ptr<arrow::Schema> ArrowSchemaSnapshot(); diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp index eed53c1896..6522d25221 100644 --- a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp +++ b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp @@ -36,7 +36,7 @@ arrow::ipc::IpcWriteOptions WriteOptions(const TCompression& compression) { std::shared_ptr<arrow::RecordBatch> TIndexationLogic::AddSpecials(const std::shared_ptr<arrow::RecordBatch>& srcBatch, const TIndexInfo& indexInfo, const TInsertedData& inserted) const { - auto batch = TIndexInfo::AddSpecialColumns(srcBatch, inserted.PlanStep(), inserted.TxId()); + auto batch = TIndexInfo::AddSpecialColumns(srcBatch, inserted.GetSnapshot()); Y_VERIFY(batch); return NArrow::ExtractColumns(batch, indexInfo.ArrowSchemaWithSpecials()); @@ -229,7 +229,7 @@ TVector<TString> TIndexationLogic::Apply(std::shared_ptr<TColumnEngineChanges> i TSnapshot& minSnapshot = changes->ApplySnapshot; THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> pathBatches; for (auto& inserted : changes->DataToIndex) { - TSnapshot insertSnap{inserted.PlanStep(), inserted.TxId()}; + TSnapshot insertSnap = inserted.GetSnapshot(); Y_VERIFY(insertSnap.Valid()); if (minSnapshot.IsZero() || insertSnap <= minSnapshot) { minSnapshot = insertSnap; @@ -325,13 +325,13 @@ TVector<TString> TCompactionLogic::CompactInGranule(std::shared_ptr<TColumnEngin if (!slice || slice->num_rows() == 0) { continue; } - auto tmp = MakeAppendedPortions(pathId, slice, granule, TSnapshot{}, blobs); + auto tmp = MakeAppendedPortions(pathId, slice, granule, TSnapshot::Zero(), blobs); for (auto&& portionInfo : tmp) { portions.emplace_back(std::move(portionInfo)); } } } else { - portions = MakeAppendedPortions(pathId, batch, granule, TSnapshot{}, blobs); + portions = MakeAppendedPortions(pathId, batch, granule, TSnapshot::Zero(), blobs); } Y_VERIFY(portions.size() > 0); @@ -664,7 +664,7 @@ TVector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr<TCo for (const auto& batch : idBatches[id]) { // Cannot set snapshot here. It would be set in committing transaction in ApplyChanges(). - auto newPortions = MakeAppendedPortions(pathId, batch, tmpGranule, TSnapshot{}, blobs); + auto newPortions = MakeAppendedPortions(pathId, batch, tmpGranule, TSnapshot::Zero(), blobs); Y_VERIFY(newPortions.size() > 0); for (auto& portion : newPortions) { changes->AppendedPortions.emplace_back(std::move(portion)); @@ -680,7 +680,7 @@ TVector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr<TCo ui64 tmpGranule = changes->SetTmpGranule(pathId, ts); // Cannot set snapshot here. It would be set in committing transaction in ApplyChanges(). - auto portions = MakeAppendedPortions(pathId, batch, tmpGranule, TSnapshot{}, blobs); + auto portions = MakeAppendedPortions(pathId, batch, tmpGranule, TSnapshot::Zero(), blobs); Y_VERIFY(portions.size() > 0); for (auto& portion : portions) { changes->AppendedPortions.emplace_back(std::move(portion)); diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index 31ebe70139..735d5ed7b2 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -183,12 +183,12 @@ void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& da } std::shared_ptr<arrow::RecordBatch> -TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>& srcBatch, ui64 planStep, ui64 txId) const { +TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>& srcBatch, const TSnapshot& snapshot) const { Y_VERIFY(srcBatch); // Extract columns (without check), filter, attach snapshot, extract columns with check // (do not filter snapshot columns) - auto loadSchema = ReadMetadata->GetLoadSchema(TSnapshot().SetPlanStep(planStep).SetTxId(txId)); + auto loadSchema = ReadMetadata->GetLoadSchema(snapshot); auto batch = NArrow::ExtractExistedColumns(srcBatch, loadSchema); Y_VERIFY(batch); @@ -199,9 +199,7 @@ TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>& } auto preparedBatch = batch; - preparedBatch = TIndexInfo::AddSpecialColumns(preparedBatch, planStep, txId); - Y_VERIFY(preparedBatch); - + preparedBatch = TIndexInfo::AddSpecialColumns(preparedBatch, snapshot); preparedBatch = NArrow::ExtractColumns(preparedBatch, loadSchema); Y_VERIFY(preparedBatch); diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h index ad22e96433..1c155767de 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.h +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h @@ -65,16 +65,16 @@ public: /// @returns batches and corresponding last keys in correct order (i.e. sorted by by PK) TVector<TPartialReadResult> GetReadyResults(const int64_t maxRowsInBatch); - void AddNotIndexed(ui32 batchNo, TString blob, ui64 planStep, ui64 txId) { - auto batch = NArrow::DeserializeBatch(blob, ReadMetadata->GetBlobSchema(TSnapshot().SetPlanStep(planStep).SetTxId(txId))); - AddNotIndexed(batchNo, batch, planStep, txId); + void AddNotIndexed(ui32 batchNo, TString blob, const TSnapshot& snapshot) { + auto batch = NArrow::DeserializeBatch(blob, ReadMetadata->GetBlobSchema(snapshot)); + AddNotIndexed(batchNo, batch, snapshot); } - void AddNotIndexed(ui32 batchNo, const std::shared_ptr<arrow::RecordBatch>& batch, ui64 planStep, ui64 txId) { + void AddNotIndexed(ui32 batchNo, const std::shared_ptr<arrow::RecordBatch>& batch, const TSnapshot& snapshot) { Y_VERIFY(batchNo < NotIndexed.size()); Y_VERIFY(!NotIndexed[batchNo]); ++ReadyNotIndexed; - NotIndexed[batchNo] = MakeNotIndexedBatch(batch, planStep, txId); + NotIndexed[batchNo] = MakeNotIndexedBatch(batch, snapshot); } void AddIndexed(const TBlobRange& blobRange, const TString& column); @@ -94,7 +94,7 @@ public: private: std::shared_ptr<arrow::RecordBatch> MakeNotIndexedBatch( - const std::shared_ptr<arrow::RecordBatch>& batch, ui64 planStep, ui64 txId) const; + const std::shared_ptr<arrow::RecordBatch>& batch, const TSnapshot& snapshot) const; std::shared_ptr<arrow::RecordBatch> MergeNotIndexed( std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches) const; diff --git a/ydb/core/tx/columnshard/engines/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table.cpp index 448aaf3904..b9a800f510 100644 --- a/ydb/core/tx/columnshard/engines/insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table.cpp @@ -187,7 +187,7 @@ bool TInsertTable::Load(IDbWrapper& dbTable, const TInstant& loadTime) { return true; } -std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, ui64 plan, ui64 txId) const { +std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const TSnapshot& snapshot) const { const auto* committed = CommittedByPathId.FindPtr(pathId); if (!committed) { return {}; @@ -197,8 +197,8 @@ std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, ui64 plan, ui64 txId ret.reserve(committed->size()); for (const auto& data : *committed) { - if (SnapLessOrEqual(data.ShardOrPlan, data.WriteTxId, plan, txId)) { - ret.emplace_back(TCommittedBlob{data.BlobId, data.ShardOrPlan, data.WriteTxId}); + if (std::less_equal<TSnapshot>()(data.GetSnapshot(), snapshot)) { + ret.emplace_back(TCommittedBlob{data.BlobId, data.GetSnapshot()}); } } diff --git a/ydb/core/tx/columnshard/engines/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table.h index 7a48a3c00c..93357ffbf2 100644 --- a/ydb/core/tx/columnshard/engines/insert_table.h +++ b/ydb/core/tx/columnshard/engines/insert_table.h @@ -80,22 +80,37 @@ struct TInsertedData { DedupId.clear(); } - ui64 PlanStep() const { return ShardOrPlan; } - ui64 TxId() const { return WriteTxId; } + TSnapshot GetSnapshot() const { + return TSnapshot(ShardOrPlan, WriteTxId); + } + ui32 BlobSize() const { return BlobId.BlobSize(); } }; -struct TCommittedBlob { +class TCommittedBlob { +private: TUnifiedBlobId BlobId; - ui64 PlanStep{0}; - ui64 TxId{0}; + TSnapshot Snapshot; +public: + TCommittedBlob(const TUnifiedBlobId& blobId, const TSnapshot& snapshot) + : BlobId(blobId) + , Snapshot(snapshot) + {} /// It uses trick then we place key wtih planStep:txId in container and find them later by BlobId only. /// So hash() and equality should depend on BlobId only. bool operator == (const TCommittedBlob& key) const { return BlobId == key.BlobId; } ui64 Hash() const noexcept { return BlobId.Hash(); } TString DebugString() const { - return TStringBuilder() << BlobId << ";ps=" << PlanStep << ";ti=" << TxId; + return TStringBuilder() << BlobId << ";ps=" << Snapshot.GetPlanStep() << ";ti=" << Snapshot.GetTxId(); + } + + const TSnapshot& GetSnapshot() const { + return Snapshot; + } + + const TUnifiedBlobId& GetBlobId() const { + return BlobId; } }; @@ -123,7 +138,7 @@ public: THashSet<TWriteId> DropPath(IDbWrapper& dbTable, ui64 pathId); void EraseCommitted(IDbWrapper& dbTable, const TInsertedData& key); void EraseAborted(IDbWrapper& dbTable, const TInsertedData& key); - std::vector<TCommittedBlob> Read(ui64 pathId, ui64 plan, ui64 txId) const; + std::vector<TCommittedBlob> Read(ui64 pathId, const TSnapshot& snapshot) const; bool Load(IDbWrapper& dbTable, const TInstant& loadTime); const TCounters& GetCountersPrepared() const { return StatsPrepared; } const TCounters& GetCountersCommitted() const { return StatsCommitted; } diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index 8f364f92ec..3e12e32006 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -146,13 +146,13 @@ struct TPortionInfo { TSnapshot Snapshot() const { Y_VERIFY(!Empty()); auto& rec = Records[0]; - return {rec.PlanStep, rec.TxId}; + return TSnapshot(rec.PlanStep, rec.TxId); } TSnapshot XSnapshot() const { Y_VERIFY(!Empty()); auto& rec = Records[0]; - return {rec.XPlanStep, rec.XTxId}; + return TSnapshot(rec.XPlanStep, rec.XTxId); } bool IsActive() const { diff --git a/ydb/core/tx/columnshard/engines/reader/description.h b/ydb/core/tx/columnshard/engines/reader/description.h index 38444123e0..f02c04cdc5 100644 --- a/ydb/core/tx/columnshard/engines/reader/description.h +++ b/ydb/core/tx/columnshard/engines/reader/description.h @@ -15,6 +15,9 @@ public: // Describes read/scan request struct TReadDescription { +private: + TSnapshot Snapshot; +public: // Table ui64 PathId = 0; TString TableName; @@ -31,15 +34,17 @@ struct TReadDescription { // List of columns TVector<ui32> ColumnIds; TVector<TString> ColumnNames; - // Snapshot - ui64 PlanStep = 0; - ui64 TxId = 0; - + std::shared_ptr<NSsa::TProgram> AddProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program); - TReadDescription(const bool isReverse) - : PKRangesFilter(isReverse) { + TReadDescription(const TSnapshot& snapshot, const bool isReverse) + : Snapshot(snapshot) + , PKRangesFilter(isReverse) { } + + const TSnapshot& GetSnapshot() const { + return Snapshot; + } }; } diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp index 225bc1058c..c396b88bbb 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp @@ -19,13 +19,13 @@ std::shared_ptr<NOlap::TSelectInfo> TDataStorageAccessor::Select(const NOlap::TR return std::make_shared<NOlap::TSelectInfo>(); } return Index->Select(readDescription.PathId, - {readDescription.PlanStep, readDescription.TxId}, + readDescription.GetSnapshot(), columnIds, readDescription.PKRangesFilter); } std::vector<NOlap::TCommittedBlob> TDataStorageAccessor::GetCommitedBlobs(const NOlap::TReadDescription& readDescription) const { - return std::move(InsertTable->Read(readDescription.PathId, readDescription.PlanStep, readDescription.TxId)); + return std::move(InsertTable->Read(readDescription.PathId, readDescription.GetSnapshot())); } std::shared_ptr<arrow::RecordBatch> TDataStorageAccessor::GetCachedBatch(const TUnifiedBlobId& blobId) const { @@ -90,8 +90,8 @@ bool TReadMetadata::Init(const TReadDescription& readDescription, const TDataSto CommittedBlobs = dataAccessor.GetCommitedBlobs(readDescription); for (auto& cmt : CommittedBlobs) { - if (auto batch = dataAccessor.GetCachedBatch(cmt.BlobId)) { - CommittedBatches.emplace(cmt.BlobId, batch); + if (auto batch = dataAccessor.GetCachedBatch(cmt.GetBlobId())) { + CommittedBatches.emplace(cmt.GetBlobId(), batch); } } diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp index 0255cbc3d2..edc295a6ae 100644 --- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp @@ -67,9 +67,9 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) { UNIT_ASSERT(!ok); // read nothing - auto blobs = insertTable.Read(tableId, 0, 0); + auto blobs = insertTable.Read(tableId, TSnapshot::Zero()); UNIT_ASSERT_EQUAL(blobs.size(), 0); - blobs = insertTable.Read(tableId+1, 0, 0); + blobs = insertTable.Read(tableId+1, TSnapshot::Zero()); UNIT_ASSERT_EQUAL(blobs.size(), 0); // commit @@ -82,15 +82,15 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) { UNIT_ASSERT_EQUAL(committed.begin()->second.size(), 1); // read old snapshot - blobs = insertTable.Read(tableId, 0, 0); + blobs = insertTable.Read(tableId, TSnapshot::Zero()); UNIT_ASSERT_EQUAL(blobs.size(), 0); - blobs = insertTable.Read(tableId+1, 0, 0); + blobs = insertTable.Read(tableId+1, TSnapshot::Zero()); UNIT_ASSERT_EQUAL(blobs.size(), 0); // read new snapshot - blobs = insertTable.Read(tableId, planStep, txId); + blobs = insertTable.Read(tableId, TSnapshot(planStep, txId)); UNIT_ASSERT_EQUAL(blobs.size(), 1); - blobs = insertTable.Read(tableId+1, 0, 0); + blobs = insertTable.Read(tableId+1, TSnapshot::Zero()); UNIT_ASSERT_EQUAL(blobs.size(), 0); } } diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 71c9ce8956..0043fb5709 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -292,7 +292,7 @@ bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, bool Insert(const TIndexInfo& tableInfo, TTestDbWrapper& db, TSnapshot snap, TVector<TInsertedData>&& dataToIndex, THashMap<TBlobRange, TString>& blobs, ui32& step) { TColumnEngineForLogs engine(0, TestLimits()); - engine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo)); + engine.UpdateDefaultSchema(snap, TIndexInfo(tableInfo)); THashSet<TUnifiedBlobId> lostBlobs; engine.Load(db, lostBlobs); @@ -312,7 +312,7 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, T UNIT_ASSERT_VALUES_EQUAL(compactionInfo->Granules.size(), 1); UNIT_ASSERT(!compactionInfo->InGranule); - std::shared_ptr<TColumnEngineChanges> changes = engine.StartCompaction(std::move(compactionInfo), {0, 0}); + std::shared_ptr<TColumnEngineChanges> changes = engine.StartCompaction(std::move(compactionInfo), TSnapshot::Zero()); UNIT_ASSERT_VALUES_EQUAL(changes->SwitchedPortions.size(), expected.SrcPortions); changes->SetBlobs(std::move(blobs)); @@ -330,7 +330,7 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, T bool Compact(const TIndexInfo& tableInfo, TTestDbWrapper& db, TSnapshot snap, THashMap<TBlobRange, TString>&& blobs, ui32& step, const TExpected& expected) { TColumnEngineForLogs engine(0, TestLimits()); - engine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo)); + engine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo)); THashSet<TUnifiedBlobId> lostBlobs; engine.Load(db, lostBlobs); return Compact(engine, db, snap, std::move(blobs), step, expected); @@ -400,12 +400,12 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { THashMap<TBlobRange, TString> blobs; blobs[blobRanges[0]] = testBlob; blobs[blobRanges[1]] = testBlob; - Insert(tableInfo, db, {1, 2}, std::move(dataToIndex), blobs, step); + Insert(tableInfo, db, TSnapshot(1, 2), std::move(dataToIndex), blobs, step); // load TColumnEngineForLogs engine(0); - engine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo)); + engine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo)); THashSet<TUnifiedBlobId> lostBlobs; engine.Load(db, lostBlobs); @@ -421,7 +421,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // select from snap before insert ui64 planStep = 1; ui64 txId = 0; - auto selectInfo = engine.Select(paths[0], {planStep, txId}, columnIds, NOlap::TPKRangesFilter(false)); + auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), columnIds, NOlap::TPKRangesFilter(false)); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 0); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0); } @@ -429,7 +429,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // select from snap after insert (greater txId) ui64 planStep = 1; ui64 txId = 2; - auto selectInfo = engine.Select(paths[0], {planStep, txId}, columnIds, NOlap::TPKRangesFilter(false)); + auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), columnIds, NOlap::TPKRangesFilter(false)); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 1); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 1); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions[0].NumRecords(), columnIds.size()); @@ -438,7 +438,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // select from snap after insert (greater planStep) ui64 planStep = 2; ui64 txId = 1; - auto selectInfo = engine.Select(paths[0], {planStep, txId}, oneColumnId, NOlap::TPKRangesFilter(false)); + auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false)); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 1); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 1); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions[0].NumRecords(), 1); @@ -447,7 +447,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // select another pathId ui64 planStep = 2; ui64 txId = 1; - auto selectInfo = engine.Select(paths[1], {planStep, txId}, oneColumnId, NOlap::TPKRangesFilter(false)); + auto selectInfo = engine.Select(paths[1], TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false)); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 0); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0); } @@ -492,7 +492,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { dataToIndex.push_back( TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now()}); - bool ok = Insert(tableInfo, db, {planStep, txId}, std::move(dataToIndex), blobs, step); + bool ok = Insert(tableInfo, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); } @@ -505,7 +505,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // load TColumnEngineForLogs engine(0, TestLimits()); - engine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo)); + engine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo)); THashSet<TUnifiedBlobId> lostBlobs; engine.Load(db, lostBlobs); @@ -520,7 +520,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // full scan ui64 txId = 1; - auto selectInfo = engine.Select(pathId, {planStep, txId}, oneColumnId, NOlap::TPKRangesFilter(false)); + auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false)); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 4); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 4); } @@ -535,7 +535,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { } NOlap::TPKRangesFilter pkFilter(false); Y_VERIFY(pkFilter.Add(gt10k, nullptr, nullptr)); - auto selectInfo = engine.Select(pathId, {planStep, txId}, oneColumnId, pkFilter); + auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, pkFilter); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 2); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 2); } @@ -548,7 +548,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { } NOlap::TPKRangesFilter pkFilter(false); Y_VERIFY(pkFilter.Add(nullptr, lt10k, nullptr)); - auto selectInfo = engine.Select(pathId, {planStep, txId}, oneColumnId, pkFilter); + auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, pkFilter); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 2); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 2); } @@ -580,7 +580,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { ui64 planStep = 1; TColumnEngineForLogs engine(0, TestLimits()); - engine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo)); + engine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo)); THashSet<TUnifiedBlobId> lostBlobs; engine.Load(db, lostBlobs); @@ -598,7 +598,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { dataToIndex.push_back( TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now()}); - bool ok = Insert(engine, db, {planStep, txId}, std::move(dataToIndex), blobs, step); + bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); // first overload returns ok: it's a postcondition if (!overload) { UNIT_ASSERT(ok); @@ -612,7 +612,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // check it's overloaded after reload TColumnEngineForLogs tmpEngine(0, TestLimits()); - tmpEngine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo)); + tmpEngine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo)); tmpEngine.Load(db, lostBlobs); UNIT_ASSERT(tmpEngine.GetOverloadedGranules(pathId)); } @@ -636,7 +636,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { dataToIndex.push_back( TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now()}); - bool ok = Insert(engine, db, {planStep, txId}, std::move(dataToIndex), blobs, step); + bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); bool overload = engine.GetOverloadedGranules(pathId); UNIT_ASSERT(ok); UNIT_ASSERT(!overload); @@ -644,7 +644,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // check it's not overloaded after reload TColumnEngineForLogs tmpEngine(0, TestLimits()); - tmpEngine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo)); + tmpEngine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo)); tmpEngine.Load(db, lostBlobs); UNIT_ASSERT(!tmpEngine.GetOverloadedGranules(pathId)); } @@ -673,7 +673,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { dataToIndex.push_back( TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now()}); - bool ok = Insert(tableInfo, db, {planStep, txId}, std::move(dataToIndex), blobs, step); + bool ok = Insert(tableInfo, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); } @@ -686,7 +686,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // load TColumnEngineForLogs engine(0, TestLimits()); - engine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo)); + engine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo)); THashSet<TUnifiedBlobId> lostBlobs; engine.Load(db, lostBlobs); @@ -698,7 +698,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // full scan ui64 txId = 1; - auto selectInfo = engine.Select(pathId, {planStep, txId}, oneColumnId, NOlap::TPKRangesFilter(false)); + auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false)); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 4); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 4); } @@ -708,7 +708,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // full scan ui64 txId = 1; - auto selectInfo = engine.Select(pathId, {planStep, txId}, oneColumnId, NOlap::TPKRangesFilter(false)); + auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false)); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 4); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 4); } @@ -725,7 +725,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // full scan ui64 txId = 1; - auto selectInfo = engine.Select(pathId, {planStep, txId}, oneColumnId, NOlap::TPKRangesFilter(false)); + auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false)); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 2); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 2); } @@ -736,7 +736,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // full scan ui64 txId = 1; - auto selectInfo = engine.Select(pathId, {planStep, txId}, oneColumnId, NOlap::TPKRangesFilter(false)); + auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false)); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 2); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 2); } diff --git a/ydb/core/tx/columnshard/inflight_request_tracker.h b/ydb/core/tx/columnshard/inflight_request_tracker.h index de55e0d469..c8b95d9ea5 100644 --- a/ydb/core/tx/columnshard/inflight_request_tracker.h +++ b/ydb/core/tx/columnshard/inflight_request_tracker.h @@ -66,7 +66,7 @@ public: } for (const auto& committedBlob : readMeta->CommittedBlobs) { - blobTracker.SetBlobInUse(committedBlob.BlobId, false); + blobTracker.SetBlobInUse(committedBlob.GetBlobId(), false); } } @@ -107,7 +107,7 @@ private: } for (const auto& committedBlob : readMeta->CommittedBlobs) { - blobTracker.SetBlobInUse(committedBlob.BlobId, true); + blobTracker.SetBlobInUse(committedBlob.GetBlobId(), true); } } diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp index 99f4d17485..b9aa58f2d4 100644 --- a/ydb/core/tx/columnshard/read_actor.cpp +++ b/ydb/core/tx/columnshard/read_actor.cpp @@ -64,13 +64,13 @@ public: WaitIndexed.erase(event.BlobRange); IndexedData.AddIndexed(event.BlobRange, event.Data); } else if (CommittedBlobs.contains(blobId)) { - auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{blobId, 0, 0}); + auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{blobId, NOlap::TSnapshot::Zero()}); if (cmt.empty()) { return; // ignore duplicates } const NOlap::TCommittedBlob& cmtBlob = cmt.key(); ui32 batchNo = cmt.mapped(); - IndexedData.AddNotIndexed(batchNo, event.Data, cmtBlob.PlanStep, cmtBlob.TxId); + IndexedData.AddNotIndexed(batchNo, event.Data, cmtBlob.GetSnapshot()); } else { LOG_S_ERROR("TEvReadBlobRangeResult returned unexpected blob at tablet " << TabletId << " (read)"); @@ -167,7 +167,7 @@ public: for (size_t i = 0; i < ReadMetadata->CommittedBlobs.size(); ++i, ++notIndexed) { const auto& cmtBlob = ReadMetadata->CommittedBlobs[i]; - CommittedBlobs.emplace(cmtBlob.BlobId); + CommittedBlobs.emplace(cmtBlob.GetBlobId()); WaitCommitted.emplace(cmtBlob, notIndexed); } @@ -180,12 +180,12 @@ public: // Add cached batches without read for (auto& [blobId, batch] : ReadMetadata->CommittedBatches) { - auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{blobId, 0, 0}); + auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{blobId, NOlap::TSnapshot::Zero()}); Y_VERIFY(!cmt.empty()); const NOlap::TCommittedBlob& cmtBlob = cmt.key(); ui32 batchNo = cmt.mapped(); - IndexedData.AddNotIndexed(batchNo, batch, cmtBlob.PlanStep, cmtBlob.TxId); + IndexedData.AddNotIndexed(batchNo, batch, cmtBlob.GetSnapshot()); } LOG_S_DEBUG("Starting read (" << WaitIndexed.size() << " indexed, " @@ -209,7 +209,7 @@ public: } else { // TODO: Keep inflight for (auto& [cmtBlob, batchNo] : WaitCommitted) { - auto& blobId = cmtBlob.BlobId; + auto& blobId = cmtBlob.GetBlobId(); SendReadRequest(ctx, NBlobCache::TBlobRange(blobId, 0, blobId.BlobSize())); } for (auto&& blobRange : IndexedBlobs) { diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h index 423221195b..62a6917e25 100644 --- a/ydb/core/tx/columnshard/tables_manager.h +++ b/ydb/core/tx/columnshard/tables_manager.h @@ -173,7 +173,7 @@ public: return *PrimaryIndex; } - const NOlap::TIndexInfo& GetIndexInfo(const NOlap::TSnapshot& version = {}) const { + const NOlap::TIndexInfo& GetIndexInfo(const NOlap::TSnapshot& version = NOlap::TSnapshot::Zero()) const { Y_UNUSED(version); Y_VERIFY(!!PrimaryIndex); return PrimaryIndex->GetIndexInfo(); diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index 83c252b36a..a940c5f549 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -310,7 +310,7 @@ struct TestTableDescription { void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId, const TestTableDescription& table, TString codec = "none") { - NOlap::TSnapshot snap = {10, 10}; + NOlap::TSnapshot snap(10, 10); TString txBody; if (table.InStore) { txBody = TTestSchema::CreateTableTxBody( @@ -2248,7 +2248,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { } { // Get index stats - ScanIndexStats(runtime, sender, {tableId, 42}, {planStep, txId}, 0); + ScanIndexStats(runtime, sender, {tableId, 42}, NOlap::TSnapshot(planStep, txId), 0); auto scanInited = runtime.GrabEdgeEvent<NKqp::TEvKqpCompute::TEvScanInitActor>(handle); auto& msg = scanInited->Record; auto scanActorId = ActorIdFromProto(msg.GetScanActorId()); @@ -2408,7 +2408,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { // Cerr << response << Endl; UNIT_ASSERT_VALUES_EQUAL(response.GetStatus(), Ydb::StatusIds::BAD_REQUEST); UNIT_ASSERT_VALUES_EQUAL(response.IssuesSize(), 1); - UNIT_ASSERT_STRING_CONTAINS(response.GetIssues(0).message(), "Snapshot 640000:18446744073709551615 too old"); + UNIT_ASSERT_STRING_CONTAINS(response.GetIssues(0).message(), "Snapshot too old: {640000:max}"); } } diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp index feab71de25..5414091dec 100644 --- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp @@ -70,12 +70,12 @@ bool TriggerTTL(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot s ui64 tsSeconds, const TString& ttlColumnName) { TString txBody = TTestSchema::TtlTxBody(pathIds, ttlColumnName, tsSeconds); auto event = std::make_unique<TEvColumnShard::TEvProposeTransaction>( - NKikimrTxColumnShard::TX_KIND_TTL, sender, snap.TxId, txBody); + NKikimrTxColumnShard::TX_KIND_TTL, sender, snap.GetTxId(), txBody); ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, event.release()); auto ev = runtime.GrabEdgeEvent<TEvColumnShard::TEvProposeTransactionResult>(sender); const auto& res = ev->Get()->Record; - UNIT_ASSERT_EQUAL(res.GetTxId(), snap.TxId); + UNIT_ASSERT_EQUAL(res.GetTxId(), snap.GetTxId()); UNIT_ASSERT_EQUAL(res.GetTxKind(), NKikimrTxColumnShard::TX_KIND_TTL); return (res.GetStatus() == NKikimrTxColumnShard::SUCCESS); } @@ -165,7 +165,7 @@ bool TestCreateTable(const TString& txBody, ui64 planStep = 1000, ui64 txId = 10 runtime.DispatchEvents(options); // - return ProposeSchemaTx(runtime, sender, txBody, {++planStep, ++txId}); + return ProposeSchemaTx(runtime, sender, txBody, NOlap::TSnapshot(++planStep, ++txId)); } TString GetReadResult(NKikimrTxColumnShard::TEvReadResult& resRead, @@ -237,9 +237,9 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, } bool ok = ProposeSchemaTx(runtime, sender, TTestSchema::CreateInitShardTxBody(tableId, ydbSchema, testYdbPk, spec, "/Root/olapStore"), - {++planStep, ++txId}); + NOlap::TSnapshot(++planStep, ++txId)); UNIT_ASSERT(ok); - PlanSchemaTx(runtime, sender, {planStep, txId}); + PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId)); if (spec.HasTiers()) { ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(spec)); } @@ -260,9 +260,9 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, } if (internal) { - TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.TtlColumn); + TriggerTTL(runtime, sender, NOlap::TSnapshot(++planStep, ++txId), {}, 0, spec.TtlColumn); } else { - TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] + ttlIncSeconds, spec.TtlColumn); + TriggerTTL(runtime, sender, NOlap::TSnapshot(++planStep, ++txId), {tableId}, ts[0] + ttlIncSeconds, spec.TtlColumn); } TAutoPtr<IEventHandle> handle; @@ -297,17 +297,17 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, } ok = ProposeSchemaTx(runtime, sender, TTestSchema::AlterTableTxBody(tableId, 2, spec), - {++planStep, ++txId}); + NOlap::TSnapshot(++planStep, ++txId)); UNIT_ASSERT(ok); - PlanSchemaTx(runtime, sender, {planStep, txId}); + PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId)); if (spec.HasTiers()) { ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(spec)); } if (internal) { - TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.TtlColumn); + TriggerTTL(runtime, sender, NOlap::TSnapshot(++planStep, ++txId), {}, 0, spec.TtlColumn); } else { - TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[1] + ttlIncSeconds, spec.TtlColumn); + TriggerTTL(runtime, sender, NOlap::TSnapshot(++planStep, ++txId), {tableId}, ts[1] + ttlIncSeconds, spec.TtlColumn); } { @@ -327,21 +327,21 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, // Disable TTL ok = ProposeSchemaTx(runtime, sender, TTestSchema::AlterTableTxBody(tableId, 3, TTestSchema::TTableSpecials()), - {++planStep, ++txId}); + NOlap::TSnapshot(++planStep, ++txId)); UNIT_ASSERT(ok); if (spec.HasTiers()) { ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(TTestSchema::TTableSpecials())); } - PlanSchemaTx(runtime, sender, {planStep, txId}); + PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId)); UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, blobs[0])); ProposeCommit(runtime, sender, metaShard, ++txId, {writeId}); PlanCommit(runtime, sender, ++planStep, txId); if (internal) { - TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.TtlColumn); + TriggerTTL(runtime, sender, NOlap::TSnapshot(++planStep, ++txId), {}, 0, spec.TtlColumn); } else { - TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] - ttlIncSeconds, spec.TtlColumn); + TriggerTTL(runtime, sender, NOlap::TSnapshot(++planStep, ++txId), {tableId}, ts[0] - ttlIncSeconds, spec.TtlColumn); } { @@ -514,10 +514,10 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt { const bool ok = ProposeSchemaTx(runtime, sender, TTestSchema::CreateInitShardTxBody(tableId, testYdbSchema, testYdbPk, specs[0], "/Root/olapStore"), - { ++planStep, ++txId }); + NOlap::TSnapshot(++planStep, ++txId)); UNIT_ASSERT(ok); } - PlanSchemaTx(runtime, sender, {planStep, txId}); + PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId)); if (specs[0].Tiers.size()) { ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(specs[0])); } @@ -552,9 +552,9 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt { const bool ok = ProposeSchemaTx(runtime, sender, TTestSchema::AlterTableTxBody(tableId, version, specs[i]), - { ++planStep, ++txId }); + NOlap::TSnapshot(++planStep, ++txId)); UNIT_ASSERT(ok); - PlanSchemaTx(runtime, sender, { planStep, txId }); + PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId)); } } if (specs[i].HasTiers()) { @@ -573,7 +573,7 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt // Eviction - TriggerTTL(runtime, sender, { ++planStep, ++txId }, {}, 0, specs[i].TtlColumn); + TriggerTTL(runtime, sender, NOlap::TSnapshot(++planStep, ++txId), {}, 0, specs[i].TtlColumn); Cerr << (hasColdEviction ? "Cold" : "Hot") << " tiering, spec " << i << ", num tiers: " << specs[i].Tiers.size() << "\n"; @@ -856,9 +856,9 @@ void TestDrop(bool reboots) { ui64 txId = 100; bool ok = ProposeSchemaTx(runtime, sender, TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk), - {++planStep, ++txId}); + NOlap::TSnapshot(++planStep, ++txId)); UNIT_ASSERT(ok); - PlanSchemaTx(runtime, sender, {planStep, txId}); + PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId)); // @@ -884,9 +884,9 @@ void TestDrop(bool reboots) { } // Drop table - ok = ProposeSchemaTx(runtime, sender, TTestSchema::DropTableTxBody(tableId, 2), {++planStep, ++txId}); + ok = ProposeSchemaTx(runtime, sender, TTestSchema::DropTableTxBody(tableId, 2), NOlap::TSnapshot(++planStep, ++txId)); UNIT_ASSERT(ok); - PlanSchemaTx(runtime, sender, {planStep, txId}); + PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId)); if (reboots) { RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); @@ -931,9 +931,9 @@ void TestDropWriteRace() { UNIT_ASSERT(longTxId.ParseString("ydb://long-tx/01ezvvxjdk2hd4vdgjs68knvp8?node_id=1")); bool ok = ProposeSchemaTx(runtime, sender, TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk), - {++planStep, ++txId}); + NOlap::TSnapshot(++planStep, ++txId)); UNIT_ASSERT(ok); - PlanSchemaTx(runtime, sender, {planStep, txId}); + PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId)); TString data = MakeTestBlob({0, 100}, testYdbSchema); UNIT_ASSERT(data.size() < NColumnShard::TLimits::MIN_BYTES_TO_INSERT); @@ -945,9 +945,9 @@ void TestDropWriteRace() { auto commitTxId = txId; // Drop table - ok = ProposeSchemaTx(runtime, sender, TTestSchema::DropTableTxBody(tableId, 2), {++planStep, ++txId}); + ok = ProposeSchemaTx(runtime, sender, TTestSchema::DropTableTxBody(tableId, 2), NOlap::TSnapshot(++planStep, ++txId)); if (ok) { - PlanSchemaTx(runtime, sender, {planStep, txId}); + PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId)); } // Plan commit |