aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornsofya <nsofya@yandex-team.com>2023-05-08 17:17:18 +0300
committernsofya <nsofya@yandex-team.com>2023-05-08 17:17:18 +0300
commit4a5cd6c92018036c598ae098563bef861bcc5316 (patch)
tree16b99632c2da272b3e1f4348480c639c451fb6bc
parent8107c88ce7480dfc685a7227eb3b44c9e9f666b1 (diff)
downloadydb-4a5cd6c92018036c598ae098563bef861bcc5316.tar.gz
Incapsulate data in snapshot object
Accurate snapshot usage
-rw-r--r--ydb/core/tx/columnshard/columnshard.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard__index_scan.cpp10
-rw-r--r--ydb/core/tx/columnshard/columnshard__read.cpp8
-rw-r--r--ydb/core/tx/columnshard/columnshard__read_base.cpp13
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp6
-rw-r--r--ydb/core/tx/columnshard/columnshard__write_index.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard_schema.h6
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.cpp16
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h6
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp16
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h4
-rw-r--r--ydb/core/tx/columnshard/engines/columns_table.h12
-rw-r--r--ydb/core/tx/columnshard/engines/defs.h44
-rw-r--r--ydb/core/tx/columnshard/engines/filter.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/granules_table.h14
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.cpp10
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.h3
-rw-r--r--ydb/core/tx/columnshard/engines/index_logic_logs.cpp12
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.cpp8
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.h12
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table.h29
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.h4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/description.h17
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_metadata.cpp8
-rw-r--r--ydb/core/tx/columnshard/engines/ut_insert_table.cpp12
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp50
-rw-r--r--ydb/core/tx/columnshard/inflight_request_tracker.h4
-rw-r--r--ydb/core/tx/columnshard/read_actor.cpp12
-rw-r--r--ydb/core/tx/columnshard/tables_manager.h2
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp6
-rw-r--r--ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp56
33 files changed, 210 insertions, 204 deletions
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index 178384bb20..8150db60b7 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -341,7 +341,7 @@ void TColumnShard::SendPeriodicStats() {
// TODO: we need row/dataSize counters for evicted data (managed by tablet but stored outside)
//tabletStats->SetIndexSize(); // TODO: calc size of internal tables
tabletStats->SetLastAccessTime(LastAccessTime.MilliSeconds());
- tabletStats->SetLastUpdateTime(lastIndexUpdate.PlanStep);
+ tabletStats->SetLastUpdateTime(lastIndexUpdate.GetPlanStep());
}
}
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
index 7c63b8cb53..411cd360f2 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
@@ -19,16 +19,16 @@ TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstP
IndexedData.InitRead(batchNo);
// Add cached batches without read
for (auto& [blobId, batch] : ReadMetadata->CommittedBatches) {
- auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{ blobId, 0, 0 });
+ auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{ blobId, NOlap::TSnapshot::Zero() });
Y_VERIFY(!cmt.empty());
const NOlap::TCommittedBlob& cmtBlob = cmt.key();
ui32 batchNo = cmt.mapped();
- IndexedData.AddNotIndexed(batchNo, batch, cmtBlob.PlanStep, cmtBlob.TxId);
+ IndexedData.AddNotIndexed(batchNo, batch, cmtBlob.GetSnapshot());
}
// Read all remained committed blobs
for (const auto& [cmtBlob, _] : WaitCommitted) {
- auto& blobId = cmtBlob.BlobId;
+ auto& blobId = cmtBlob.GetBlobId();
FetchBlobsQueue.emplace_front(TBlobRange(blobId, 0, blobId.BlobSize()));
}
@@ -44,11 +44,11 @@ void TColumnShardScanIterator::AddData(const TBlobRange& blobRange, TString data
if (IndexedData.IsIndexedBlob(blobRange)) {
IndexedData.AddIndexed(blobRange, data);
} else {
- auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{ blobId, 0, 0 });
+ auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{ blobId, NOlap::TSnapshot::Zero() });
Y_VERIFY(!cmt.empty());
const NOlap::TCommittedBlob& cmtBlob = cmt.key();
ui32 batchNo = cmt.mapped();
- IndexedData.AddNotIndexed(batchNo, data, cmtBlob.PlanStep, cmtBlob.TxId);
+ IndexedData.AddNotIndexed(batchNo, data, cmtBlob.GetSnapshot());
}
}
diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp
index 92d1295cb3..bd7c9894ab 100644
--- a/ydb/core/tx/columnshard/columnshard__read.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read.cpp
@@ -47,13 +47,11 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) {
txc.DB.NoMoreReadsForTx();
auto& record = Proto(Ev->Get());
- const NOlap::TIndexInfo& indexInfo = Self->TablesManager.GetIndexInfo(NOlap::TSnapshot().SetPlanStep(record.GetPlanStep()).SetTxId(record.GetTxId()));
+ const NOlap::TIndexInfo& indexInfo = Self->TablesManager.GetIndexInfo(NOlap::TSnapshot(record.GetPlanStep(), record.GetTxId()));
ui64 metaShard = record.GetTxInitiator();
- NOlap::TReadDescription read(false);
- read.PlanStep = record.GetPlanStep();
- read.TxId = record.GetTxId();
+ NOlap::TReadDescription read(NOlap::TSnapshot(record.GetPlanStep(), record.GetTxId()), false);
read.PathId = record.GetTableId();
read.ReadNothing = !(Self->TablesManager.HasTable(read.PathId));
read.ColumnIds = ProtoToVector<ui32>(record.GetColumnIds());
@@ -97,7 +95,7 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) {
}
Result = std::make_unique<TEvColumnShard::TEvReadResult>(
- Self->TabletID(), metaShard, read.PlanStep, read.TxId, read.PathId, 0, true, status);
+ Self->TabletID(), metaShard, read.GetSnapshot().GetPlanStep(), read.GetSnapshot().GetTxId(), read.PathId, 0, true, status);
if (status == NKikimrTxColumnShard::EResultStatus::SUCCESS) {
Self->IncCounter(COUNTER_READ_SUCCESS);
diff --git a/ydb/core/tx/columnshard/columnshard__read_base.cpp b/ydb/core/tx/columnshard/columnshard__read_base.cpp
index 550144ad25..bf21cb1419 100644
--- a/ydb/core/tx/columnshard/columnshard__read_base.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read_base.cpp
@@ -15,20 +15,19 @@ TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const NOlap::TReadDes
Y_UNUSED(ctx);
if (!insertTable || !index) {
- return {};
+ return nullptr;
}
- if (read.PlanStep < Self->GetMinReadStep()) {
- error = Sprintf("Snapshot %" PRIu64 ":%" PRIu64 " too old", read.PlanStep, read.TxId);
- return {};
+ if (read.GetSnapshot().GetPlanStep() < Self->GetMinReadStep()) {
+ error = TStringBuilder() << "Snapshot too old: " << read.GetSnapshot();
+ return nullptr;
}
NOlap::TDataStorageAccessor dataAccessor(insertTable, index, batchCache);
- auto readSnapshot = NOlap::TSnapshot().SetPlanStep(read.PlanStep).SetTxId(read.TxId);
- auto readMetadata = std::make_shared<NOlap::TReadMetadata>(index->GetVersionedIndex(), readSnapshot, isReverse ? NOlap::TReadMetadata::ESorting::DESC : NOlap::TReadMetadata::ESorting::ASC);
+ auto readMetadata = std::make_shared<NOlap::TReadMetadata>(index->GetVersionedIndex(), read.GetSnapshot(), isReverse ? NOlap::TReadMetadata::ESorting::DESC : NOlap::TReadMetadata::ESorting::ASC);
if (!readMetadata->Init(read, dataAccessor, error)) {
- return {};
+ return nullptr;
}
return readMetadata;
}
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 279587bbda..4b30d7a5ff 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -842,9 +842,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) {
ui64 itemsLimit = record.HasItemsLimit() ? record.GetItemsLimit() : 0;
- NOlap::TReadDescription read(record.GetReverse());
- read.PlanStep = snapshot.GetStep();
- read.TxId = snapshot.GetTxId();
+ NOlap::TReadDescription read(NOlap::TSnapshot(snapshot.GetStep(), snapshot.GetTxId()), record.GetReverse());
read.PathId = record.GetLocalPathId();
read.ReadNothing = !(Self->TablesManager.HasTable(read.PathId));
read.TableName = record.GetTablePath();
@@ -854,7 +852,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) {
const NOlap::TIndexInfo* indexInfo = nullptr;
if (!isIndexStats) {
- indexInfo = &(Self->TablesManager.GetIndexInfo(NOlap::TSnapshot().SetPlanStep(snapshot.GetStep()).SetTxId(snapshot.GetTxId())));
+ indexInfo = &(Self->TablesManager.GetIndexInfo(NOlap::TSnapshot(snapshot.GetStep(), snapshot.GetTxId())));
}
// TODO: move this to CreateReadMetadata?
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index a9bffb34d7..19d7195fb8 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -61,7 +61,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
if (Ev->Get()->PutStatus == NKikimrProto::OK) {
NOlap::TSnapshot snapshot = changes->ApplySnapshot;
if (snapshot.IsZero()) {
- snapshot = {Self->LastPlannedStep, Self->LastPlannedTxId};
+ snapshot = NOlap::TSnapshot(Self->LastPlannedStep, Self->LastPlannedTxId);
}
TBlobGroupSelector dsGroupSelector(Self->Info());
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 2a8d2c77fa..271779dedf 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -778,7 +778,7 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() {
LOG_S_DEBUG("Prepare " << *compactionInfo << " at tablet " << TabletID());
ui64 ourdatedStep = GetOutdatedStep();
- auto indexChanges = TablesManager.MutablePrimaryIndex().StartCompaction(std::move(compactionInfo), {ourdatedStep, 0});
+ auto indexChanges = TablesManager.MutablePrimaryIndex().StartCompaction(std::move(compactionInfo), NOlap::TSnapshot(ourdatedStep, 0));
if (!indexChanges) {
LOG_S_DEBUG("Compaction not started: cannot prepare compaction at tablet " << TabletID());
return {};
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index 5e5d70ca01..d81b47a627 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -505,8 +505,8 @@ struct Schema : NIceDb::Schema {
const TGranuleRecord& row) {
db.Table<IndexGranules>().Key(index, row.PathId, engine.SerializeMark(row.Mark)).Update(
NIceDb::TUpdate<IndexGranules::Granule>(row.Granule),
- NIceDb::TUpdate<IndexGranules::PlanStep>(row.CreatedAt.PlanStep),
- NIceDb::TUpdate<IndexGranules::TxId>(row.CreatedAt.TxId)
+ NIceDb::TUpdate<IndexGranules::PlanStep>(row.GetCreatedAt().GetPlanStep()),
+ NIceDb::TUpdate<IndexGranules::TxId>(row.GetCreatedAt().GetTxId())
);
}
@@ -528,7 +528,7 @@ struct Schema : NIceDb::Schema {
ui64 planStep = rowset.GetValue<IndexGranules::PlanStep>();
ui64 txId = rowset.GetValue<IndexGranules::TxId>();
- callback(TGranuleRecord(pathId, granule, {planStep, txId}, engine.DeserializeMark(indexKey)));
+ callback(TGranuleRecord(pathId, granule, NOlap::TSnapshot(planStep, txId), engine.DeserializeMark(indexKey)));
if (!rowset.Next())
return false;
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
index 26e1415199..3333820f3a 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
@@ -50,27 +50,27 @@ void ProvideTieringSnapshot(TTestBasicRuntime& runtime, TActorId& sender, NMetad
bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString& txBody, NOlap::TSnapshot snap) {
auto event = std::make_unique<TEvColumnShard::TEvProposeTransaction>(
- NKikimrTxColumnShard::TX_KIND_SCHEMA, 0, sender, snap.TxId, txBody);
+ NKikimrTxColumnShard::TX_KIND_SCHEMA, 0, sender, snap.GetTxId(), txBody);
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, event.release());
auto ev = runtime.GrabEdgeEvent<TEvColumnShard::TEvProposeTransactionResult>(sender);
const auto& res = ev->Get()->Record;
- UNIT_ASSERT_EQUAL(res.GetTxId(), snap.TxId);
+ UNIT_ASSERT_EQUAL(res.GetTxId(), snap.GetTxId());
UNIT_ASSERT_EQUAL(res.GetTxKind(), NKikimrTxColumnShard::TX_KIND_SCHEMA);
return (res.GetStatus() == NKikimrTxColumnShard::PREPARED);
}
void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap) {
- auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(snap.PlanStep, 0, TTestTxConfig::TxTablet0);
+ auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(snap.GetPlanStep(), 0, TTestTxConfig::TxTablet0);
auto tx = plan->Record.AddTransactions();
- tx->SetTxId(snap.TxId);
+ tx->SetTxId(snap.GetTxId());
ActorIdToProto(sender, tx->MutableAckTo());
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, plan.release());
UNIT_ASSERT(runtime.GrabEdgeEvent<TEvTxProcessing::TEvPlanStepAck>(sender));
auto ev = runtime.GrabEdgeEvent<TEvColumnShard::TEvProposeTransactionResult>(sender);
const auto& res = ev->Get()->Record;
- UNIT_ASSERT_EQUAL(res.GetTxId(), snap.TxId);
+ UNIT_ASSERT_EQUAL(res.GetTxId(), snap.GetTxId());
UNIT_ASSERT_EQUAL(res.GetTxKind(), NKikimrTxColumnShard::TX_KIND_SCHEMA);
UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::SUCCESS);
}
@@ -120,7 +120,7 @@ void ScanIndexStats(TTestBasicRuntime& runtime, TActorId& sender, const TVector<
auto scan = std::make_unique<TEvColumnShard::TEvScan>();
auto& record = scan->Record;
- record.SetTxId(snap.PlanStep);
+ record.SetTxId(snap.GetPlanStep());
record.SetScanId(scanId);
// record.SetLocalPathId(0);
record.SetTablePath(NOlap::TIndexInfo::STORE_INDEX_STATS_TABLE);
@@ -146,8 +146,8 @@ void ScanIndexStats(TTestBasicRuntime& runtime, TActorId& sender, const TVector<
range.Serialize(*newRange);
}
- record.MutableSnapshot()->SetStep(snap.PlanStep);
- record.MutableSnapshot()->SetTxId(snap.TxId);
+ record.MutableSnapshot()->SetStep(snap.GetPlanStep());
+ record.MutableSnapshot()->SetTxId(snap.GetTxId());
record.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::ARROW);
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, scan.release());
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 9d1eb13e3d..7a8f1f6e91 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -132,8 +132,8 @@ public:
EType Type{UNSPECIFIED};
TCompactionLimits Limits;
- TSnapshot InitSnapshot;
- TSnapshot ApplySnapshot;
+ TSnapshot InitSnapshot = TSnapshot::Zero();
+ TSnapshot ApplySnapshot = TSnapshot::Zero();
std::unique_ptr<TCompactionInfo> CompactionInfo;
TVector<NOlap::TInsertedData> DataToIndex;
TVector<TPortionInfo> SwitchedPortions; // Portions that would be replaced by new ones
@@ -399,7 +399,7 @@ public:
virtual const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& GetStats() const = 0;
virtual const TColumnEngineStats& GetTotalStats() = 0;
virtual ui64 MemoryUsage() const { return 0; }
- virtual TSnapshot LastUpdate() const { return {}; }
+ virtual TSnapshot LastUpdate() const { return TSnapshot::Zero(); }
};
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 2b1cee1579..654504c61c 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -15,7 +15,7 @@ namespace {
bool InitInGranuleMerge(const TMark& granuleMark, TVector<TPortionInfo>& portions, const TCompactionLimits& limits,
const TSnapshot& snap, TColumnEngineForLogs::TMarksGranules& marksGranules) {
- ui64 oldTimePlanStep = snap.PlanStep - TDuration::Seconds(limits.InGranuleCompactSeconds).MilliSeconds();
+ ui64 oldTimePlanStep = snap.GetPlanStep() - TDuration::Seconds(limits.InGranuleCompactSeconds).MilliSeconds();
ui32 insertedCount = 0;
ui32 insertedNew = 0;
@@ -28,7 +28,7 @@ bool InitInGranuleMerge(const TMark& granuleMark, TVector<TPortionInfo>& portion
for (const auto& portionInfo : portions) {
if (portionInfo.IsInserted()) {
++insertedCount;
- if (portionInfo.Snapshot().PlanStep > oldTimePlanStep) {
+ if (portionInfo.Snapshot().GetPlanStep() > oldTimePlanStep) {
++insertedNew;
}
} else if (portionInfo.BlobsSizes().second >= limits.GoodBlobSize) {
@@ -453,10 +453,10 @@ bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) {
LastGranule = value;
break;
case LAST_PLAN_STEP:
- LastSnapshot.PlanStep = value;
+ LastSnapshot = TSnapshot(value, LastSnapshot.GetTxId());
break;
case LAST_TX_ID:
- LastSnapshot.TxId = value;
+ LastSnapshot = TSnapshot(LastSnapshot.GetPlanStep(), value);
break;
}
};
@@ -649,7 +649,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
return {};
}
- TSnapshot fakeSnapshot = {1, 1}; // TODO: better snapshot
+ TSnapshot fakeSnapshot(1, 1); // TODO: better snapshot
auto changes = std::make_shared<TChanges>(*this, TColumnEngineChanges::TTL, fakeSnapshot);
ui64 evicttionSize = 0;
bool allowEviction = true;
@@ -1082,8 +1082,8 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
if (LastSnapshot < snapshot) {
LastSnapshot = snapshot;
- CountersTable->Write(db, LAST_PLAN_STEP, LastSnapshot.PlanStep);
- CountersTable->Write(db, LAST_TX_ID, LastSnapshot.TxId);
+ CountersTable->Write(db, LAST_PLAN_STEP, LastSnapshot.GetPlanStep());
+ CountersTable->Write(db, LAST_TX_ID, LastSnapshot.GetTxId());
}
}
return true;
@@ -1222,7 +1222,7 @@ TMap<TSnapshot, TVector<ui64>> TColumnEngineForLogs::GetOrderedPortions(ui64 gra
TSnapshot recXSnapshot = portionInfo.XSnapshot();
bool visible = (recSnapshot <= snapshot);
- if (recXSnapshot.PlanStep) {
+ if (recXSnapshot.GetPlanStep()) {
visible = visible && snapshot < recXSnapshot;
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index b71585a8b8..352206ed7b 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -257,7 +257,7 @@ private:
TColumnEngineStats Counters;
ui64 LastPortion;
ui64 LastGranule;
- TSnapshot LastSnapshot;
+ TSnapshot LastSnapshot = TSnapshot::Zero();
void ClearIndex() {
Granules.clear();
@@ -272,7 +272,7 @@ private:
LastPortion = 0;
LastGranule = 0;
- LastSnapshot = {};
+ LastSnapshot = TSnapshot::Zero();
}
bool LoadGranules(IDbWrapper& db);
diff --git a/ydb/core/tx/columnshard/engines/columns_table.h b/ydb/core/tx/columnshard/engines/columns_table.h
index 951511d9ea..b98a57c0c4 100644
--- a/ydb/core/tx/columnshard/engines/columns_table.h
+++ b/ydb/core/tx/columnshard/engines/columns_table.h
@@ -50,22 +50,22 @@ struct TColumnRecord {
void SetSnapshot(const TSnapshot& snap) {
Y_VERIFY(snap.Valid());
- PlanStep = snap.PlanStep;
- TxId = snap.TxId;
+ PlanStep = snap.GetPlanStep();
+ TxId = snap.GetTxId();
}
void SetXSnapshot(const TSnapshot& snap) {
Y_VERIFY(snap.Valid());
- XPlanStep = snap.PlanStep;
- XTxId = snap.TxId;
+ XPlanStep = snap.GetPlanStep();
+ XTxId = snap.GetTxId();
}
static TColumnRecord Make(ui64 granule, ui32 columnId, const TSnapshot& minSnapshot, ui64 portion, ui16 chunk = 0) {
TColumnRecord row;
row.Granule = granule;
row.ColumnId = columnId;
- row.PlanStep = minSnapshot.PlanStep;
- row.TxId = minSnapshot.TxId;
+ row.PlanStep = minSnapshot.GetPlanStep();
+ row.TxId = minSnapshot.GetTxId();
row.Portion = portion;
row.Chunk = chunk;
//row.BlobId
diff --git a/ydb/core/tx/columnshard/engines/defs.h b/ydb/core/tx/columnshard/engines/defs.h
index 575b60a259..c45eeef0ff 100644
--- a/ydb/core/tx/columnshard/engines/defs.h
+++ b/ydb/core/tx/columnshard/engines/defs.h
@@ -12,9 +12,15 @@ enum class TWriteId : ui64 {};
inline TWriteId operator ++(TWriteId& w) noexcept { w = TWriteId{ui64(w) + 1}; return w; }
-struct TSnapshot {
- ui64 PlanStep{0};
- ui64 TxId{0};
+class TSnapshot {
+private:
+ ui64 PlanStep = 0;
+ ui64 TxId = 0;
+public:
+ constexpr explicit TSnapshot(const ui64 planStep, const ui64 txId)
+ : PlanStep(planStep)
+ , TxId(txId)
+ {}
ui64 GetPlanStep() const {
return PlanStep;
@@ -24,43 +30,29 @@ struct TSnapshot {
return TxId;
}
- TSnapshot& SetPlanStep(const ui64 value) {
- PlanStep = value;
- return *this;
- }
-
- TSnapshot& SetTxId(const ui64 value) {
- TxId = value;
- return *this;
- }
-
- constexpr bool IsZero() const noexcept {
+ bool IsZero() const noexcept {
return PlanStep == 0 && TxId == 0;
}
- constexpr bool Valid() const noexcept {
+ bool Valid() const noexcept {
return PlanStep && TxId;
}
- constexpr auto operator <=> (const TSnapshot&) const noexcept = default;
+ auto operator <=> (const TSnapshot&) const noexcept = default;
+ static constexpr TSnapshot Zero() noexcept {
+ return TSnapshot(0, 0);
+ }
+
static constexpr TSnapshot Max() noexcept {
- return TSnapshot().SetPlanStep(-1ll).SetTxId(-1ll);
+ return TSnapshot(-1ll, -1ll);
}
friend IOutputStream& operator << (IOutputStream& out, const TSnapshot& s) {
- return out << "{" << s.PlanStep << "," << s.TxId << "}";
+ return out << "{" << s.PlanStep << ':' << (s.TxId == std::numeric_limits<ui64>::max() ? "max" : ToString(s.TxId)) << "}";
}
};
-inline constexpr bool SnapLess(ui64 planStep1, ui64 txId1, ui64 planStep2, ui64 txId2) noexcept {
- return std::less<TSnapshot>()(TSnapshot{planStep1, txId1}, TSnapshot{planStep2, txId2});
-}
-
-inline constexpr bool SnapLessOrEqual(ui64 planStep1, ui64 txId1, ui64 planStep2, ui64 txId2) noexcept {
- return std::less_equal<TSnapshot>()(TSnapshot{planStep1, txId1}, TSnapshot{planStep2, txId2});
-}
-
class IBlobGroupSelector {
protected:
diff --git a/ydb/core/tx/columnshard/engines/filter.cpp b/ydb/core/tx/columnshard/engines/filter.cpp
index 2e04d3defc..2285dcd3e4 100644
--- a/ydb/core/tx/columnshard/engines/filter.cpp
+++ b/ydb/core/tx/columnshard/engines/filter.cpp
@@ -26,7 +26,7 @@ public:
}
bool operator[](const ui32 idx) const {
- return SnapLessOrEqual(RawSteps[idx], RawIds[idx], Snapshot.GetPlanStep(), Snapshot.GetTxId());
+ return std::less_equal<TSnapshot>()(TSnapshot(RawSteps[idx], RawIds[idx]), Snapshot);
}
};
diff --git a/ydb/core/tx/columnshard/engines/granules_table.h b/ydb/core/tx/columnshard/engines/granules_table.h
index e69cd1e57b..847c9dc4c3 100644
--- a/ydb/core/tx/columnshard/engines/granules_table.h
+++ b/ydb/core/tx/columnshard/engines/granules_table.h
@@ -5,20 +5,26 @@
namespace NKikimr::NOlap {
struct TGranuleRecord {
+private:
+ TSnapshot CreatedAt;
+public:
ui64 PathId;
ui64 Granule;
- TSnapshot CreatedAt;
NArrow::TReplaceKey Mark;
TGranuleRecord(ui64 pathId, ui64 granule, const TSnapshot& createdAt, const NArrow::TReplaceKey& mark)
- : PathId(pathId)
+ : CreatedAt(createdAt)
+ , PathId(pathId)
, Granule(granule)
- , CreatedAt(createdAt)
, Mark(mark)
{
Y_VERIFY(Mark.Size());
}
+ const TSnapshot& GetCreatedAt() const {
+ return CreatedAt;
+ }
+
bool operator == (const TGranuleRecord& rec) const {
return (PathId == rec.PathId) && (Mark == rec.Mark);
}
@@ -27,7 +33,7 @@ struct TGranuleRecord {
out << '{';
auto& snap = rec.CreatedAt;
out << rec.PathId << '#' << rec.Granule << ' '
- << snap.PlanStep << ':' << (snap.TxId == Max<ui64>() ? "max" : ToString(snap.TxId));
+ << snap;
out << '}';
return out;
}
diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp
index 299fd63c45..76fc2c9110 100644
--- a/ydb/core/tx/columnshard/engines/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/index_info.cpp
@@ -26,20 +26,16 @@ TIndexInfo::TIndexInfo(const TString& name, ui32 id)
, Name(name)
{}
-std::shared_ptr<arrow::RecordBatch> TIndexInfo::AddSpecialColumns(
- const std::shared_ptr<arrow::RecordBatch>& batch,
- const ui64 planStep,
- const ui64 txId)
-{
+std::shared_ptr<arrow::RecordBatch> TIndexInfo::AddSpecialColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const TSnapshot& snapshot) {
Y_VERIFY(batch);
i64 numColumns = batch->num_columns();
i64 numRows = batch->num_rows();
auto res = batch->AddColumn(numColumns, arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()),
- NArrow::MakeUI64Array(planStep, numRows));
+ NArrow::MakeUI64Array(snapshot.GetPlanStep(), numRows));
Y_VERIFY(res.ok());
res = (*res)->AddColumn(numColumns + 1, arrow::field(SPEC_COL_TX_ID, arrow::uint64()),
- NArrow::MakeUI64Array(txId, numRows));
+ NArrow::MakeUI64Array(snapshot.GetTxId(), numRows));
Y_VERIFY(res.ok());
Y_VERIFY((*res)->num_columns() == numColumns + 2);
return *res;
diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h
index ec28d2dc63..806d39bb09 100644
--- a/ydb/core/tx/columnshard/engines/index_info.h
+++ b/ydb/core/tx/columnshard/engines/index_info.h
@@ -40,8 +40,7 @@ public:
/// Appends the special columns to the batch.
static std::shared_ptr<arrow::RecordBatch> AddSpecialColumns(
const std::shared_ptr<arrow::RecordBatch>& batch,
- const ui64 platStep,
- const ui64 txId);
+ const TSnapshot& snapshot);
/// Makes schema as set of the special columns.
static std::shared_ptr<arrow::Schema> ArrowSchemaSnapshot();
diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
index eed53c1896..6522d25221 100644
--- a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
@@ -36,7 +36,7 @@ arrow::ipc::IpcWriteOptions WriteOptions(const TCompression& compression) {
std::shared_ptr<arrow::RecordBatch> TIndexationLogic::AddSpecials(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
const TIndexInfo& indexInfo, const TInsertedData& inserted) const {
- auto batch = TIndexInfo::AddSpecialColumns(srcBatch, inserted.PlanStep(), inserted.TxId());
+ auto batch = TIndexInfo::AddSpecialColumns(srcBatch, inserted.GetSnapshot());
Y_VERIFY(batch);
return NArrow::ExtractColumns(batch, indexInfo.ArrowSchemaWithSpecials());
@@ -229,7 +229,7 @@ TVector<TString> TIndexationLogic::Apply(std::shared_ptr<TColumnEngineChanges> i
TSnapshot& minSnapshot = changes->ApplySnapshot;
THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> pathBatches;
for (auto& inserted : changes->DataToIndex) {
- TSnapshot insertSnap{inserted.PlanStep(), inserted.TxId()};
+ TSnapshot insertSnap = inserted.GetSnapshot();
Y_VERIFY(insertSnap.Valid());
if (minSnapshot.IsZero() || insertSnap <= minSnapshot) {
minSnapshot = insertSnap;
@@ -325,13 +325,13 @@ TVector<TString> TCompactionLogic::CompactInGranule(std::shared_ptr<TColumnEngin
if (!slice || slice->num_rows() == 0) {
continue;
}
- auto tmp = MakeAppendedPortions(pathId, slice, granule, TSnapshot{}, blobs);
+ auto tmp = MakeAppendedPortions(pathId, slice, granule, TSnapshot::Zero(), blobs);
for (auto&& portionInfo : tmp) {
portions.emplace_back(std::move(portionInfo));
}
}
} else {
- portions = MakeAppendedPortions(pathId, batch, granule, TSnapshot{}, blobs);
+ portions = MakeAppendedPortions(pathId, batch, granule, TSnapshot::Zero(), blobs);
}
Y_VERIFY(portions.size() > 0);
@@ -664,7 +664,7 @@ TVector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr<TCo
for (const auto& batch : idBatches[id]) {
// Cannot set snapshot here. It would be set in committing transaction in ApplyChanges().
- auto newPortions = MakeAppendedPortions(pathId, batch, tmpGranule, TSnapshot{}, blobs);
+ auto newPortions = MakeAppendedPortions(pathId, batch, tmpGranule, TSnapshot::Zero(), blobs);
Y_VERIFY(newPortions.size() > 0);
for (auto& portion : newPortions) {
changes->AppendedPortions.emplace_back(std::move(portion));
@@ -680,7 +680,7 @@ TVector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr<TCo
ui64 tmpGranule = changes->SetTmpGranule(pathId, ts);
// Cannot set snapshot here. It would be set in committing transaction in ApplyChanges().
- auto portions = MakeAppendedPortions(pathId, batch, tmpGranule, TSnapshot{}, blobs);
+ auto portions = MakeAppendedPortions(pathId, batch, tmpGranule, TSnapshot::Zero(), blobs);
Y_VERIFY(portions.size() > 0);
for (auto& portion : portions) {
changes->AppendedPortions.emplace_back(std::move(portion));
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 31ebe70139..735d5ed7b2 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -183,12 +183,12 @@ void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& da
}
std::shared_ptr<arrow::RecordBatch>
-TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>& srcBatch, ui64 planStep, ui64 txId) const {
+TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>& srcBatch, const TSnapshot& snapshot) const {
Y_VERIFY(srcBatch);
// Extract columns (without check), filter, attach snapshot, extract columns with check
// (do not filter snapshot columns)
- auto loadSchema = ReadMetadata->GetLoadSchema(TSnapshot().SetPlanStep(planStep).SetTxId(txId));
+ auto loadSchema = ReadMetadata->GetLoadSchema(snapshot);
auto batch = NArrow::ExtractExistedColumns(srcBatch, loadSchema);
Y_VERIFY(batch);
@@ -199,9 +199,7 @@ TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>&
}
auto preparedBatch = batch;
- preparedBatch = TIndexInfo::AddSpecialColumns(preparedBatch, planStep, txId);
- Y_VERIFY(preparedBatch);
-
+ preparedBatch = TIndexInfo::AddSpecialColumns(preparedBatch, snapshot);
preparedBatch = NArrow::ExtractColumns(preparedBatch, loadSchema);
Y_VERIFY(preparedBatch);
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index ad22e96433..1c155767de 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -65,16 +65,16 @@ public:
/// @returns batches and corresponding last keys in correct order (i.e. sorted by by PK)
TVector<TPartialReadResult> GetReadyResults(const int64_t maxRowsInBatch);
- void AddNotIndexed(ui32 batchNo, TString blob, ui64 planStep, ui64 txId) {
- auto batch = NArrow::DeserializeBatch(blob, ReadMetadata->GetBlobSchema(TSnapshot().SetPlanStep(planStep).SetTxId(txId)));
- AddNotIndexed(batchNo, batch, planStep, txId);
+ void AddNotIndexed(ui32 batchNo, TString blob, const TSnapshot& snapshot) {
+ auto batch = NArrow::DeserializeBatch(blob, ReadMetadata->GetBlobSchema(snapshot));
+ AddNotIndexed(batchNo, batch, snapshot);
}
- void AddNotIndexed(ui32 batchNo, const std::shared_ptr<arrow::RecordBatch>& batch, ui64 planStep, ui64 txId) {
+ void AddNotIndexed(ui32 batchNo, const std::shared_ptr<arrow::RecordBatch>& batch, const TSnapshot& snapshot) {
Y_VERIFY(batchNo < NotIndexed.size());
Y_VERIFY(!NotIndexed[batchNo]);
++ReadyNotIndexed;
- NotIndexed[batchNo] = MakeNotIndexedBatch(batch, planStep, txId);
+ NotIndexed[batchNo] = MakeNotIndexedBatch(batch, snapshot);
}
void AddIndexed(const TBlobRange& blobRange, const TString& column);
@@ -94,7 +94,7 @@ public:
private:
std::shared_ptr<arrow::RecordBatch> MakeNotIndexedBatch(
- const std::shared_ptr<arrow::RecordBatch>& batch, ui64 planStep, ui64 txId) const;
+ const std::shared_ptr<arrow::RecordBatch>& batch, const TSnapshot& snapshot) const;
std::shared_ptr<arrow::RecordBatch> MergeNotIndexed(
std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches) const;
diff --git a/ydb/core/tx/columnshard/engines/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table.cpp
index 448aaf3904..b9a800f510 100644
--- a/ydb/core/tx/columnshard/engines/insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/insert_table.cpp
@@ -187,7 +187,7 @@ bool TInsertTable::Load(IDbWrapper& dbTable, const TInstant& loadTime) {
return true;
}
-std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, ui64 plan, ui64 txId) const {
+std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const TSnapshot& snapshot) const {
const auto* committed = CommittedByPathId.FindPtr(pathId);
if (!committed) {
return {};
@@ -197,8 +197,8 @@ std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, ui64 plan, ui64 txId
ret.reserve(committed->size());
for (const auto& data : *committed) {
- if (SnapLessOrEqual(data.ShardOrPlan, data.WriteTxId, plan, txId)) {
- ret.emplace_back(TCommittedBlob{data.BlobId, data.ShardOrPlan, data.WriteTxId});
+ if (std::less_equal<TSnapshot>()(data.GetSnapshot(), snapshot)) {
+ ret.emplace_back(TCommittedBlob{data.BlobId, data.GetSnapshot()});
}
}
diff --git a/ydb/core/tx/columnshard/engines/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table.h
index 7a48a3c00c..93357ffbf2 100644
--- a/ydb/core/tx/columnshard/engines/insert_table.h
+++ b/ydb/core/tx/columnshard/engines/insert_table.h
@@ -80,22 +80,37 @@ struct TInsertedData {
DedupId.clear();
}
- ui64 PlanStep() const { return ShardOrPlan; }
- ui64 TxId() const { return WriteTxId; }
+ TSnapshot GetSnapshot() const {
+ return TSnapshot(ShardOrPlan, WriteTxId);
+ }
+
ui32 BlobSize() const { return BlobId.BlobSize(); }
};
-struct TCommittedBlob {
+class TCommittedBlob {
+private:
TUnifiedBlobId BlobId;
- ui64 PlanStep{0};
- ui64 TxId{0};
+ TSnapshot Snapshot;
+public:
+ TCommittedBlob(const TUnifiedBlobId& blobId, const TSnapshot& snapshot)
+ : BlobId(blobId)
+ , Snapshot(snapshot)
+ {}
/// It uses trick then we place key wtih planStep:txId in container and find them later by BlobId only.
/// So hash() and equality should depend on BlobId only.
bool operator == (const TCommittedBlob& key) const { return BlobId == key.BlobId; }
ui64 Hash() const noexcept { return BlobId.Hash(); }
TString DebugString() const {
- return TStringBuilder() << BlobId << ";ps=" << PlanStep << ";ti=" << TxId;
+ return TStringBuilder() << BlobId << ";ps=" << Snapshot.GetPlanStep() << ";ti=" << Snapshot.GetTxId();
+ }
+
+ const TSnapshot& GetSnapshot() const {
+ return Snapshot;
+ }
+
+ const TUnifiedBlobId& GetBlobId() const {
+ return BlobId;
}
};
@@ -123,7 +138,7 @@ public:
THashSet<TWriteId> DropPath(IDbWrapper& dbTable, ui64 pathId);
void EraseCommitted(IDbWrapper& dbTable, const TInsertedData& key);
void EraseAborted(IDbWrapper& dbTable, const TInsertedData& key);
- std::vector<TCommittedBlob> Read(ui64 pathId, ui64 plan, ui64 txId) const;
+ std::vector<TCommittedBlob> Read(ui64 pathId, const TSnapshot& snapshot) const;
bool Load(IDbWrapper& dbTable, const TInstant& loadTime);
const TCounters& GetCountersPrepared() const { return StatsPrepared; }
const TCounters& GetCountersCommitted() const { return StatsCommitted; }
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index 8f364f92ec..3e12e32006 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -146,13 +146,13 @@ struct TPortionInfo {
TSnapshot Snapshot() const {
Y_VERIFY(!Empty());
auto& rec = Records[0];
- return {rec.PlanStep, rec.TxId};
+ return TSnapshot(rec.PlanStep, rec.TxId);
}
TSnapshot XSnapshot() const {
Y_VERIFY(!Empty());
auto& rec = Records[0];
- return {rec.XPlanStep, rec.XTxId};
+ return TSnapshot(rec.XPlanStep, rec.XTxId);
}
bool IsActive() const {
diff --git a/ydb/core/tx/columnshard/engines/reader/description.h b/ydb/core/tx/columnshard/engines/reader/description.h
index 38444123e0..f02c04cdc5 100644
--- a/ydb/core/tx/columnshard/engines/reader/description.h
+++ b/ydb/core/tx/columnshard/engines/reader/description.h
@@ -15,6 +15,9 @@ public:
// Describes read/scan request
struct TReadDescription {
+private:
+ TSnapshot Snapshot;
+public:
// Table
ui64 PathId = 0;
TString TableName;
@@ -31,15 +34,17 @@ struct TReadDescription {
// List of columns
TVector<ui32> ColumnIds;
TVector<TString> ColumnNames;
- // Snapshot
- ui64 PlanStep = 0;
- ui64 TxId = 0;
-
+
std::shared_ptr<NSsa::TProgram> AddProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program);
- TReadDescription(const bool isReverse)
- : PKRangesFilter(isReverse) {
+ TReadDescription(const TSnapshot& snapshot, const bool isReverse)
+ : Snapshot(snapshot)
+ , PKRangesFilter(isReverse) {
}
+
+ const TSnapshot& GetSnapshot() const {
+ return Snapshot;
+ }
};
}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
index 225bc1058c..c396b88bbb 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
@@ -19,13 +19,13 @@ std::shared_ptr<NOlap::TSelectInfo> TDataStorageAccessor::Select(const NOlap::TR
return std::make_shared<NOlap::TSelectInfo>();
}
return Index->Select(readDescription.PathId,
- {readDescription.PlanStep, readDescription.TxId},
+ readDescription.GetSnapshot(),
columnIds,
readDescription.PKRangesFilter);
}
std::vector<NOlap::TCommittedBlob> TDataStorageAccessor::GetCommitedBlobs(const NOlap::TReadDescription& readDescription) const {
- return std::move(InsertTable->Read(readDescription.PathId, readDescription.PlanStep, readDescription.TxId));
+ return std::move(InsertTable->Read(readDescription.PathId, readDescription.GetSnapshot()));
}
std::shared_ptr<arrow::RecordBatch> TDataStorageAccessor::GetCachedBatch(const TUnifiedBlobId& blobId) const {
@@ -90,8 +90,8 @@ bool TReadMetadata::Init(const TReadDescription& readDescription, const TDataSto
CommittedBlobs = dataAccessor.GetCommitedBlobs(readDescription);
for (auto& cmt : CommittedBlobs) {
- if (auto batch = dataAccessor.GetCachedBatch(cmt.BlobId)) {
- CommittedBatches.emplace(cmt.BlobId, batch);
+ if (auto batch = dataAccessor.GetCachedBatch(cmt.GetBlobId())) {
+ CommittedBatches.emplace(cmt.GetBlobId(), batch);
}
}
diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
index 0255cbc3d2..edc295a6ae 100644
--- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
@@ -67,9 +67,9 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) {
UNIT_ASSERT(!ok);
// read nothing
- auto blobs = insertTable.Read(tableId, 0, 0);
+ auto blobs = insertTable.Read(tableId, TSnapshot::Zero());
UNIT_ASSERT_EQUAL(blobs.size(), 0);
- blobs = insertTable.Read(tableId+1, 0, 0);
+ blobs = insertTable.Read(tableId+1, TSnapshot::Zero());
UNIT_ASSERT_EQUAL(blobs.size(), 0);
// commit
@@ -82,15 +82,15 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) {
UNIT_ASSERT_EQUAL(committed.begin()->second.size(), 1);
// read old snapshot
- blobs = insertTable.Read(tableId, 0, 0);
+ blobs = insertTable.Read(tableId, TSnapshot::Zero());
UNIT_ASSERT_EQUAL(blobs.size(), 0);
- blobs = insertTable.Read(tableId+1, 0, 0);
+ blobs = insertTable.Read(tableId+1, TSnapshot::Zero());
UNIT_ASSERT_EQUAL(blobs.size(), 0);
// read new snapshot
- blobs = insertTable.Read(tableId, planStep, txId);
+ blobs = insertTable.Read(tableId, TSnapshot(planStep, txId));
UNIT_ASSERT_EQUAL(blobs.size(), 1);
- blobs = insertTable.Read(tableId+1, 0, 0);
+ blobs = insertTable.Read(tableId+1, TSnapshot::Zero());
UNIT_ASSERT_EQUAL(blobs.size(), 0);
}
}
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 71c9ce8956..0043fb5709 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -292,7 +292,7 @@ bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap,
bool Insert(const TIndexInfo& tableInfo, TTestDbWrapper& db, TSnapshot snap,
TVector<TInsertedData>&& dataToIndex, THashMap<TBlobRange, TString>& blobs, ui32& step) {
TColumnEngineForLogs engine(0, TestLimits());
- engine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo));
+ engine.UpdateDefaultSchema(snap, TIndexInfo(tableInfo));
THashSet<TUnifiedBlobId> lostBlobs;
engine.Load(db, lostBlobs);
@@ -312,7 +312,7 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, T
UNIT_ASSERT_VALUES_EQUAL(compactionInfo->Granules.size(), 1);
UNIT_ASSERT(!compactionInfo->InGranule);
- std::shared_ptr<TColumnEngineChanges> changes = engine.StartCompaction(std::move(compactionInfo), {0, 0});
+ std::shared_ptr<TColumnEngineChanges> changes = engine.StartCompaction(std::move(compactionInfo), TSnapshot::Zero());
UNIT_ASSERT_VALUES_EQUAL(changes->SwitchedPortions.size(), expected.SrcPortions);
changes->SetBlobs(std::move(blobs));
@@ -330,7 +330,7 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, T
bool Compact(const TIndexInfo& tableInfo, TTestDbWrapper& db, TSnapshot snap, THashMap<TBlobRange,
TString>&& blobs, ui32& step, const TExpected& expected) {
TColumnEngineForLogs engine(0, TestLimits());
- engine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo));
+ engine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo));
THashSet<TUnifiedBlobId> lostBlobs;
engine.Load(db, lostBlobs);
return Compact(engine, db, snap, std::move(blobs), step, expected);
@@ -400,12 +400,12 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
THashMap<TBlobRange, TString> blobs;
blobs[blobRanges[0]] = testBlob;
blobs[blobRanges[1]] = testBlob;
- Insert(tableInfo, db, {1, 2}, std::move(dataToIndex), blobs, step);
+ Insert(tableInfo, db, TSnapshot(1, 2), std::move(dataToIndex), blobs, step);
// load
TColumnEngineForLogs engine(0);
- engine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo));
+ engine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo));
THashSet<TUnifiedBlobId> lostBlobs;
engine.Load(db, lostBlobs);
@@ -421,7 +421,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // select from snap before insert
ui64 planStep = 1;
ui64 txId = 0;
- auto selectInfo = engine.Select(paths[0], {planStep, txId}, columnIds, NOlap::TPKRangesFilter(false));
+ auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), columnIds, NOlap::TPKRangesFilter(false));
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 0);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0);
}
@@ -429,7 +429,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // select from snap after insert (greater txId)
ui64 planStep = 1;
ui64 txId = 2;
- auto selectInfo = engine.Select(paths[0], {planStep, txId}, columnIds, NOlap::TPKRangesFilter(false));
+ auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), columnIds, NOlap::TPKRangesFilter(false));
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions[0].NumRecords(), columnIds.size());
@@ -438,7 +438,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // select from snap after insert (greater planStep)
ui64 planStep = 2;
ui64 txId = 1;
- auto selectInfo = engine.Select(paths[0], {planStep, txId}, oneColumnId, NOlap::TPKRangesFilter(false));
+ auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions[0].NumRecords(), 1);
@@ -447,7 +447,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // select another pathId
ui64 planStep = 2;
ui64 txId = 1;
- auto selectInfo = engine.Select(paths[1], {planStep, txId}, oneColumnId, NOlap::TPKRangesFilter(false));
+ auto selectInfo = engine.Select(paths[1], TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 0);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0);
}
@@ -492,7 +492,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
dataToIndex.push_back(
TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now()});
- bool ok = Insert(tableInfo, db, {planStep, txId}, std::move(dataToIndex), blobs, step);
+ bool ok = Insert(tableInfo, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);
}
@@ -505,7 +505,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// load
TColumnEngineForLogs engine(0, TestLimits());
- engine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo));
+ engine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo));
THashSet<TUnifiedBlobId> lostBlobs;
engine.Load(db, lostBlobs);
@@ -520,7 +520,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
- auto selectInfo = engine.Select(pathId, {planStep, txId}, oneColumnId, NOlap::TPKRangesFilter(false));
+ auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 4);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 4);
}
@@ -535,7 +535,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
}
NOlap::TPKRangesFilter pkFilter(false);
Y_VERIFY(pkFilter.Add(gt10k, nullptr, nullptr));
- auto selectInfo = engine.Select(pathId, {planStep, txId}, oneColumnId, pkFilter);
+ auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, pkFilter);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 2);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 2);
}
@@ -548,7 +548,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
}
NOlap::TPKRangesFilter pkFilter(false);
Y_VERIFY(pkFilter.Add(nullptr, lt10k, nullptr));
- auto selectInfo = engine.Select(pathId, {planStep, txId}, oneColumnId, pkFilter);
+ auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, pkFilter);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 2);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 2);
}
@@ -580,7 +580,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
ui64 planStep = 1;
TColumnEngineForLogs engine(0, TestLimits());
- engine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo));
+ engine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo));
THashSet<TUnifiedBlobId> lostBlobs;
engine.Load(db, lostBlobs);
@@ -598,7 +598,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
dataToIndex.push_back(
TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now()});
- bool ok = Insert(engine, db, {planStep, txId}, std::move(dataToIndex), blobs, step);
+ bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
// first overload returns ok: it's a postcondition
if (!overload) {
UNIT_ASSERT(ok);
@@ -612,7 +612,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // check it's overloaded after reload
TColumnEngineForLogs tmpEngine(0, TestLimits());
- tmpEngine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo));
+ tmpEngine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo));
tmpEngine.Load(db, lostBlobs);
UNIT_ASSERT(tmpEngine.GetOverloadedGranules(pathId));
}
@@ -636,7 +636,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
dataToIndex.push_back(
TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now()});
- bool ok = Insert(engine, db, {planStep, txId}, std::move(dataToIndex), blobs, step);
+ bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
bool overload = engine.GetOverloadedGranules(pathId);
UNIT_ASSERT(ok);
UNIT_ASSERT(!overload);
@@ -644,7 +644,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // check it's not overloaded after reload
TColumnEngineForLogs tmpEngine(0, TestLimits());
- tmpEngine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo));
+ tmpEngine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo));
tmpEngine.Load(db, lostBlobs);
UNIT_ASSERT(!tmpEngine.GetOverloadedGranules(pathId));
}
@@ -673,7 +673,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
dataToIndex.push_back(
TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now()});
- bool ok = Insert(tableInfo, db, {planStep, txId}, std::move(dataToIndex), blobs, step);
+ bool ok = Insert(tableInfo, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);
}
@@ -686,7 +686,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// load
TColumnEngineForLogs engine(0, TestLimits());
- engine.UpdateDefaultSchema(TSnapshot(), TIndexInfo(tableInfo));
+ engine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo));
THashSet<TUnifiedBlobId> lostBlobs;
engine.Load(db, lostBlobs);
@@ -698,7 +698,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
- auto selectInfo = engine.Select(pathId, {planStep, txId}, oneColumnId, NOlap::TPKRangesFilter(false));
+ auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 4);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 4);
}
@@ -708,7 +708,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
- auto selectInfo = engine.Select(pathId, {planStep, txId}, oneColumnId, NOlap::TPKRangesFilter(false));
+ auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 4);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 4);
}
@@ -725,7 +725,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
- auto selectInfo = engine.Select(pathId, {planStep, txId}, oneColumnId, NOlap::TPKRangesFilter(false));
+ auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 2);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 2);
}
@@ -736,7 +736,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
- auto selectInfo = engine.Select(pathId, {planStep, txId}, oneColumnId, NOlap::TPKRangesFilter(false));
+ auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 2);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 2);
}
diff --git a/ydb/core/tx/columnshard/inflight_request_tracker.h b/ydb/core/tx/columnshard/inflight_request_tracker.h
index de55e0d469..c8b95d9ea5 100644
--- a/ydb/core/tx/columnshard/inflight_request_tracker.h
+++ b/ydb/core/tx/columnshard/inflight_request_tracker.h
@@ -66,7 +66,7 @@ public:
}
for (const auto& committedBlob : readMeta->CommittedBlobs) {
- blobTracker.SetBlobInUse(committedBlob.BlobId, false);
+ blobTracker.SetBlobInUse(committedBlob.GetBlobId(), false);
}
}
@@ -107,7 +107,7 @@ private:
}
for (const auto& committedBlob : readMeta->CommittedBlobs) {
- blobTracker.SetBlobInUse(committedBlob.BlobId, true);
+ blobTracker.SetBlobInUse(committedBlob.GetBlobId(), true);
}
}
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index 99f4d17485..b9aa58f2d4 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -64,13 +64,13 @@ public:
WaitIndexed.erase(event.BlobRange);
IndexedData.AddIndexed(event.BlobRange, event.Data);
} else if (CommittedBlobs.contains(blobId)) {
- auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{blobId, 0, 0});
+ auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{blobId, NOlap::TSnapshot::Zero()});
if (cmt.empty()) {
return; // ignore duplicates
}
const NOlap::TCommittedBlob& cmtBlob = cmt.key();
ui32 batchNo = cmt.mapped();
- IndexedData.AddNotIndexed(batchNo, event.Data, cmtBlob.PlanStep, cmtBlob.TxId);
+ IndexedData.AddNotIndexed(batchNo, event.Data, cmtBlob.GetSnapshot());
} else {
LOG_S_ERROR("TEvReadBlobRangeResult returned unexpected blob at tablet "
<< TabletId << " (read)");
@@ -167,7 +167,7 @@ public:
for (size_t i = 0; i < ReadMetadata->CommittedBlobs.size(); ++i, ++notIndexed) {
const auto& cmtBlob = ReadMetadata->CommittedBlobs[i];
- CommittedBlobs.emplace(cmtBlob.BlobId);
+ CommittedBlobs.emplace(cmtBlob.GetBlobId());
WaitCommitted.emplace(cmtBlob, notIndexed);
}
@@ -180,12 +180,12 @@ public:
// Add cached batches without read
for (auto& [blobId, batch] : ReadMetadata->CommittedBatches) {
- auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{blobId, 0, 0});
+ auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{blobId, NOlap::TSnapshot::Zero()});
Y_VERIFY(!cmt.empty());
const NOlap::TCommittedBlob& cmtBlob = cmt.key();
ui32 batchNo = cmt.mapped();
- IndexedData.AddNotIndexed(batchNo, batch, cmtBlob.PlanStep, cmtBlob.TxId);
+ IndexedData.AddNotIndexed(batchNo, batch, cmtBlob.GetSnapshot());
}
LOG_S_DEBUG("Starting read (" << WaitIndexed.size() << " indexed, "
@@ -209,7 +209,7 @@ public:
} else {
// TODO: Keep inflight
for (auto& [cmtBlob, batchNo] : WaitCommitted) {
- auto& blobId = cmtBlob.BlobId;
+ auto& blobId = cmtBlob.GetBlobId();
SendReadRequest(ctx, NBlobCache::TBlobRange(blobId, 0, blobId.BlobSize()));
}
for (auto&& blobRange : IndexedBlobs) {
diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h
index 423221195b..62a6917e25 100644
--- a/ydb/core/tx/columnshard/tables_manager.h
+++ b/ydb/core/tx/columnshard/tables_manager.h
@@ -173,7 +173,7 @@ public:
return *PrimaryIndex;
}
- const NOlap::TIndexInfo& GetIndexInfo(const NOlap::TSnapshot& version = {}) const {
+ const NOlap::TIndexInfo& GetIndexInfo(const NOlap::TSnapshot& version = NOlap::TSnapshot::Zero()) const {
Y_UNUSED(version);
Y_VERIFY(!!PrimaryIndex);
return PrimaryIndex->GetIndexInfo();
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index 83c252b36a..a940c5f549 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -310,7 +310,7 @@ struct TestTableDescription {
void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId,
const TestTableDescription& table, TString codec = "none") {
- NOlap::TSnapshot snap = {10, 10};
+ NOlap::TSnapshot snap(10, 10);
TString txBody;
if (table.InStore) {
txBody = TTestSchema::CreateTableTxBody(
@@ -2248,7 +2248,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
}
{ // Get index stats
- ScanIndexStats(runtime, sender, {tableId, 42}, {planStep, txId}, 0);
+ ScanIndexStats(runtime, sender, {tableId, 42}, NOlap::TSnapshot(planStep, txId), 0);
auto scanInited = runtime.GrabEdgeEvent<NKqp::TEvKqpCompute::TEvScanInitActor>(handle);
auto& msg = scanInited->Record;
auto scanActorId = ActorIdFromProto(msg.GetScanActorId());
@@ -2408,7 +2408,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
// Cerr << response << Endl;
UNIT_ASSERT_VALUES_EQUAL(response.GetStatus(), Ydb::StatusIds::BAD_REQUEST);
UNIT_ASSERT_VALUES_EQUAL(response.IssuesSize(), 1);
- UNIT_ASSERT_STRING_CONTAINS(response.GetIssues(0).message(), "Snapshot 640000:18446744073709551615 too old");
+ UNIT_ASSERT_STRING_CONTAINS(response.GetIssues(0).message(), "Snapshot too old: {640000:max}");
}
}
diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
index feab71de25..5414091dec 100644
--- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
@@ -70,12 +70,12 @@ bool TriggerTTL(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot s
ui64 tsSeconds, const TString& ttlColumnName) {
TString txBody = TTestSchema::TtlTxBody(pathIds, ttlColumnName, tsSeconds);
auto event = std::make_unique<TEvColumnShard::TEvProposeTransaction>(
- NKikimrTxColumnShard::TX_KIND_TTL, sender, snap.TxId, txBody);
+ NKikimrTxColumnShard::TX_KIND_TTL, sender, snap.GetTxId(), txBody);
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, event.release());
auto ev = runtime.GrabEdgeEvent<TEvColumnShard::TEvProposeTransactionResult>(sender);
const auto& res = ev->Get()->Record;
- UNIT_ASSERT_EQUAL(res.GetTxId(), snap.TxId);
+ UNIT_ASSERT_EQUAL(res.GetTxId(), snap.GetTxId());
UNIT_ASSERT_EQUAL(res.GetTxKind(), NKikimrTxColumnShard::TX_KIND_TTL);
return (res.GetStatus() == NKikimrTxColumnShard::SUCCESS);
}
@@ -165,7 +165,7 @@ bool TestCreateTable(const TString& txBody, ui64 planStep = 1000, ui64 txId = 10
runtime.DispatchEvents(options);
//
- return ProposeSchemaTx(runtime, sender, txBody, {++planStep, ++txId});
+ return ProposeSchemaTx(runtime, sender, txBody, NOlap::TSnapshot(++planStep, ++txId));
}
TString GetReadResult(NKikimrTxColumnShard::TEvReadResult& resRead,
@@ -237,9 +237,9 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
}
bool ok = ProposeSchemaTx(runtime, sender,
TTestSchema::CreateInitShardTxBody(tableId, ydbSchema, testYdbPk, spec, "/Root/olapStore"),
- {++planStep, ++txId});
+ NOlap::TSnapshot(++planStep, ++txId));
UNIT_ASSERT(ok);
- PlanSchemaTx(runtime, sender, {planStep, txId});
+ PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId));
if (spec.HasTiers()) {
ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(spec));
}
@@ -260,9 +260,9 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
}
if (internal) {
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.TtlColumn);
+ TriggerTTL(runtime, sender, NOlap::TSnapshot(++planStep, ++txId), {}, 0, spec.TtlColumn);
} else {
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] + ttlIncSeconds, spec.TtlColumn);
+ TriggerTTL(runtime, sender, NOlap::TSnapshot(++planStep, ++txId), {tableId}, ts[0] + ttlIncSeconds, spec.TtlColumn);
}
TAutoPtr<IEventHandle> handle;
@@ -297,17 +297,17 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
}
ok = ProposeSchemaTx(runtime, sender,
TTestSchema::AlterTableTxBody(tableId, 2, spec),
- {++planStep, ++txId});
+ NOlap::TSnapshot(++planStep, ++txId));
UNIT_ASSERT(ok);
- PlanSchemaTx(runtime, sender, {planStep, txId});
+ PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId));
if (spec.HasTiers()) {
ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(spec));
}
if (internal) {
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.TtlColumn);
+ TriggerTTL(runtime, sender, NOlap::TSnapshot(++planStep, ++txId), {}, 0, spec.TtlColumn);
} else {
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[1] + ttlIncSeconds, spec.TtlColumn);
+ TriggerTTL(runtime, sender, NOlap::TSnapshot(++planStep, ++txId), {tableId}, ts[1] + ttlIncSeconds, spec.TtlColumn);
}
{
@@ -327,21 +327,21 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
// Disable TTL
ok = ProposeSchemaTx(runtime, sender,
TTestSchema::AlterTableTxBody(tableId, 3, TTestSchema::TTableSpecials()),
- {++planStep, ++txId});
+ NOlap::TSnapshot(++planStep, ++txId));
UNIT_ASSERT(ok);
if (spec.HasTiers()) {
ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(TTestSchema::TTableSpecials()));
}
- PlanSchemaTx(runtime, sender, {planStep, txId});
+ PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId));
UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, blobs[0]));
ProposeCommit(runtime, sender, metaShard, ++txId, {writeId});
PlanCommit(runtime, sender, ++planStep, txId);
if (internal) {
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.TtlColumn);
+ TriggerTTL(runtime, sender, NOlap::TSnapshot(++planStep, ++txId), {}, 0, spec.TtlColumn);
} else {
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] - ttlIncSeconds, spec.TtlColumn);
+ TriggerTTL(runtime, sender, NOlap::TSnapshot(++planStep, ++txId), {tableId}, ts[0] - ttlIncSeconds, spec.TtlColumn);
}
{
@@ -514,10 +514,10 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt
{
const bool ok = ProposeSchemaTx(runtime, sender,
TTestSchema::CreateInitShardTxBody(tableId, testYdbSchema, testYdbPk, specs[0], "/Root/olapStore"),
- { ++planStep, ++txId });
+ NOlap::TSnapshot(++planStep, ++txId));
UNIT_ASSERT(ok);
}
- PlanSchemaTx(runtime, sender, {planStep, txId});
+ PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId));
if (specs[0].Tiers.size()) {
ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(specs[0]));
}
@@ -552,9 +552,9 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt
{
const bool ok = ProposeSchemaTx(runtime, sender,
TTestSchema::AlterTableTxBody(tableId, version, specs[i]),
- { ++planStep, ++txId });
+ NOlap::TSnapshot(++planStep, ++txId));
UNIT_ASSERT(ok);
- PlanSchemaTx(runtime, sender, { planStep, txId });
+ PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId));
}
}
if (specs[i].HasTiers()) {
@@ -573,7 +573,7 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt
// Eviction
- TriggerTTL(runtime, sender, { ++planStep, ++txId }, {}, 0, specs[i].TtlColumn);
+ TriggerTTL(runtime, sender, NOlap::TSnapshot(++planStep, ++txId), {}, 0, specs[i].TtlColumn);
Cerr << (hasColdEviction ? "Cold" : "Hot")
<< " tiering, spec " << i << ", num tiers: " << specs[i].Tiers.size() << "\n";
@@ -856,9 +856,9 @@ void TestDrop(bool reboots) {
ui64 txId = 100;
bool ok = ProposeSchemaTx(runtime, sender, TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk),
- {++planStep, ++txId});
+ NOlap::TSnapshot(++planStep, ++txId));
UNIT_ASSERT(ok);
- PlanSchemaTx(runtime, sender, {planStep, txId});
+ PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId));
//
@@ -884,9 +884,9 @@ void TestDrop(bool reboots) {
}
// Drop table
- ok = ProposeSchemaTx(runtime, sender, TTestSchema::DropTableTxBody(tableId, 2), {++planStep, ++txId});
+ ok = ProposeSchemaTx(runtime, sender, TTestSchema::DropTableTxBody(tableId, 2), NOlap::TSnapshot(++planStep, ++txId));
UNIT_ASSERT(ok);
- PlanSchemaTx(runtime, sender, {planStep, txId});
+ PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId));
if (reboots) {
RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
@@ -931,9 +931,9 @@ void TestDropWriteRace() {
UNIT_ASSERT(longTxId.ParseString("ydb://long-tx/01ezvvxjdk2hd4vdgjs68knvp8?node_id=1"));
bool ok = ProposeSchemaTx(runtime, sender, TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk),
- {++planStep, ++txId});
+ NOlap::TSnapshot(++planStep, ++txId));
UNIT_ASSERT(ok);
- PlanSchemaTx(runtime, sender, {planStep, txId});
+ PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId));
TString data = MakeTestBlob({0, 100}, testYdbSchema);
UNIT_ASSERT(data.size() < NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
@@ -945,9 +945,9 @@ void TestDropWriteRace() {
auto commitTxId = txId;
// Drop table
- ok = ProposeSchemaTx(runtime, sender, TTestSchema::DropTableTxBody(tableId, 2), {++planStep, ++txId});
+ ok = ProposeSchemaTx(runtime, sender, TTestSchema::DropTableTxBody(tableId, 2), NOlap::TSnapshot(++planStep, ++txId));
if (ok) {
- PlanSchemaTx(runtime, sender, {planStep, txId});
+ PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId));
}
// Plan commit