| author | nsofya <nsofya@yandex-team.com> | 2023-05-23 17:17:59 +0300 |
|---|---|---|
| committer | nsofya <nsofya@yandex-team.com> | 2023-05-23 17:17:59 +0300 |
| commit | 2e7ed60ae97dc177d2c964c338ac10bfbc6977cf (patch) | |
| tree | d5acc13c0658bb89b445b7d739129bedbe61dfaf | |
| parent | edc61279de69023329a1f01c7e8b7f87d0f9ceab (diff) | |
| download | ydb-2e7ed60ae97dc177d2c964c338ac10bfbc6977cf.tar.gz | |
Correct portion snapshot
1) Store the index schema version at the moment the data is saved into InsertTable.
2) Normalize blocks to the highest-version schema when saving them into the index (see the sketch after the change summary below).
20 files changed, 335 insertions, 189 deletions
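The normalization in point 2 is implemented by ISnapshotSchema::NormalizeBatchImpl in portion_info.cpp: for every column of the newest schema, the value is taken from the incoming batch when the column already existed in the batch's (older) schema version and is filled with nulls otherwise. The sketch below restates that idea with plain Apache Arrow calls; it is illustrative only, and the helper name NormalizeToSchema is not part of the YDB codebase.

```cpp
#include <arrow/api.h>

// Illustrative sketch (not YDB code): align a batch written under an older
// schema version with the latest schema, filling columns that did not exist
// yet with nulls. Assumes the older schema only lacks columns that were added
// later (no renames or drops), which matches the commit's use case.
std::shared_ptr<arrow::RecordBatch> NormalizeToSchema(
        const std::shared_ptr<arrow::RecordBatch>& batch,
        const std::shared_ptr<arrow::Schema>& resultSchema) {
    std::vector<std::shared_ptr<arrow::Array>> columns;
    columns.reserve(resultSchema->num_fields());
    for (const auto& field : resultSchema->fields()) {
        auto column = batch->GetColumnByName(field->name());
        if (!column) {
            // Column added by a later ALTER TABLE: substitute an all-null array.
            column = arrow::MakeArrayOfNull(field->type(), batch->num_rows()).ValueOrDie();
        }
        columns.push_back(column);
    }
    return arrow::RecordBatch::Make(resultSchema, batch->num_rows(), columns);
}
```

The same idea is applied on both paths touched by this commit: on indexation, blobs deserialized with their stored schema snapshot are brought to the newest schema before merging (NormalizeFullBatch), and on read, committed batches are brought to the load schema before being returned (NormalizeBatch).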
diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index cbf27bdccf8..e26ec97adff 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -4350,7 +4350,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } - void InsertData(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates) { + void InsertData(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const std::function<void()> onBeforeCommit = {}, const EStatus opStatus = EStatus::SUCCESS) { NLongTx::TLongTxBeginResult resBeginTx = LongTxClient.BeginWriteTx().GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(resBeginTx.Status().GetStatus(), EStatus::SUCCESS, resBeginTx.Status().GetIssues().ToString()); @@ -4360,7 +4360,11 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { NLongTx::TLongTxWriteResult resWrite = LongTxClient.Write(txId, table.GetName(), txId, data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resWrite.Status().GetStatus(), EStatus::SUCCESS, resWrite.Status().GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL_C(resWrite.Status().GetStatus(), opStatus, resWrite.Status().GetIssues().ToString()); + + if (onBeforeCommit) { + onBeforeCommit(); + } NLongTx::TLongTxCommitResult resCommitTx = LongTxClient.CommitTx(txId).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString()); @@ -4404,7 +4408,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema); testHelper.CreateTable(testTable); - + { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); tableInserter.AddRow().Add(1).Add("test_res_1").AddNull(); @@ -4430,7 +4434,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { auto columns = description.GetTableColumns(); UNIT_ASSERT_VALUES_EQUAL(columns.size(), 4); } - + testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;#;[\"test_res_1\"]]]"); testHelper.ReadData("SELECT new_column FROM `/Root/ColumnTableTest` WHERE id=1", "[[#]]"); testHelper.ReadData("SELECT resource_id FROM `/Root/ColumnTableTest` WHERE id=1", "[[[\"test_res_1\"]]]"); @@ -4447,6 +4451,60 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { testHelper.ReadData("SELECT new_column FROM `/Root/ColumnTableTest`", "[[#];[#];[[200u]]]"); } + Y_UNIT_TEST(AddColumnOldScheme) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(runnerSettings); + + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int32).SetNullable(false), + TTestHelper::TColumnSchema().SetName("resource_id").SetType(NScheme::NTypeIds::Utf8), + TTestHelper::TColumnSchema().SetName("level").SetType(NScheme::NTypeIds::Int32) + }; + + TTestHelper::TColumnTable testTable; + + testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema); + testHelper.CreateTable(testTable); + { + auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "` ADD COLUMN new_column Uint64;"; + auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } + { + TTestHelper::TUpdatesBuilder 
tableInserter(testTable.GetArrowSchema(schema)); + tableInserter.AddRow().Add(1).Add("test_res_1").AddNull(); + testHelper.InsertData(testTable, tableInserter, {}, EStatus::SCHEME_ERROR); + } + } + + Y_UNIT_TEST(AddColumnOnSchemeChange) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(runnerSettings); + + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int32).SetNullable(false), + TTestHelper::TColumnSchema().SetName("resource_id").SetType(NScheme::NTypeIds::Utf8), + TTestHelper::TColumnSchema().SetName("level").SetType(NScheme::NTypeIds::Int32) + }; + + TTestHelper::TColumnTable testTable; + + testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema); + testHelper.CreateTable(testTable); + + { + TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); + tableInserter.AddRow().Add(1).Add("test_res_1").AddNull(); + testHelper.InsertData(testTable, tableInserter, [&testTable, &testHelper]() { + auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "` ADD COLUMN new_column Uint64;"; + auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync(); + }); + } + testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;#;[\"test_res_1\"]]]"); + } + Y_UNIT_TEST(AddColumnWithStore) { TKikimrSettings runnerSettings; runnerSettings.WithSampleTables = false; diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp index 411cd360f2d..913b7056ce7 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp @@ -19,12 +19,12 @@ TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstP IndexedData.InitRead(batchNo); // Add cached batches without read for (auto& [blobId, batch] : ReadMetadata->CommittedBatches) { - auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{ blobId, NOlap::TSnapshot::Zero() }); + auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob::BuildKeyBlob(blobId)); Y_VERIFY(!cmt.empty()); const NOlap::TCommittedBlob& cmtBlob = cmt.key(); ui32 batchNo = cmt.mapped(); - IndexedData.AddNotIndexed(batchNo, batch, cmtBlob.GetSnapshot()); + IndexedData.AddNotIndexed(batchNo, batch, cmtBlob); } // Read all remained committed blobs for (const auto& [cmtBlob, _] : WaitCommitted) { @@ -44,11 +44,11 @@ void TColumnShardScanIterator::AddData(const TBlobRange& blobRange, TString data if (IndexedData.IsIndexedBlob(blobRange)) { IndexedData.AddIndexed(blobRange, data); } else { - auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{ blobId, NOlap::TSnapshot::Zero() }); + auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob::BuildKeyBlob(blobId)); Y_VERIFY(!cmt.empty()); const NOlap::TCommittedBlob& cmtBlob = cmt.key(); ui32 batchNo = cmt.mapped(); - IndexedData.AddNotIndexed(batchNo, data, cmtBlob.GetSnapshot()); + IndexedData.AddNotIndexed(batchNo, data, cmtBlob); } } diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 5ccca9c758c..33ee2942e37 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -65,7 +65,8 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { TBlobGroupSelector dsGroupSelector(Self->Info()); NOlap::TDbWrapper dbTable(txc.DB, 
&dsGroupSelector); - NOlap::TInsertedData insertData(metaShard, writeId, tableId, dedupId, logoBlobId, metaStr, time); + const auto& snapshotSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema(); + NOlap::TInsertedData insertData(metaShard, writeId, tableId, dedupId, logoBlobId, metaStr, time, snapshotSchema->GetSnapshot()); ok = Self->InsertTable->Insert(dbTable, std::move(insertData)); if (ok) { THashSet<TWriteId> writesToAbort = Self->InsertTable->OldWritesToAbort(time); @@ -221,7 +222,9 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex ev->Get()->MaxSmallBlobSize = Settings.MaxSmallBlobSize; ++WritesInFly; // write started - ctx.Register(CreateWriteActor(TabletID(), TablesManager.GetIndexInfo(), ctx.SelfID, + + const auto& snapshotSchema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema(); + ctx.Register(CreateWriteActor(TabletID(), snapshotSchema->GetIndexInfo(), ctx.SelfID, BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release())); } diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index 40c3539ec55..fab45d3bdee 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -59,11 +59,8 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) bool ok = false; if (Ev->Get()->PutStatus == NKikimrProto::OK) { - NOlap::TSnapshot snapshot = changes->ApplySnapshot; - if (snapshot.IsZero()) { - snapshot = NOlap::TSnapshot(Self->LastPlannedStep, Self->LastPlannedTxId); - Y_VERIFY(Ev->Get()->IndexInfo.GetLastSchema()->GetSnapshot() <= snapshot); - } + NOlap::TSnapshot snapshot(Self->LastPlannedStep, Self->LastPlannedTxId); + Y_VERIFY(Ev->Get()->IndexInfo.GetLastSchema()->GetSnapshot() <= snapshot); TBlobGroupSelector dsGroupSelector(Self->Info()); NOlap::TDbWrapper dbWrap(txc.DB, &dsGroupSelector); diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index d81b47a627c..29ead9b7053 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -197,9 +197,11 @@ struct Schema : NIceDb::Schema { struct DedupId : Column<5, NScheme::NTypeIds::String> {}; struct BlobId : Column<6, NScheme::NTypeIds::String> {}; struct Meta : Column<7, NScheme::NTypeIds::String> {}; + struct IndexPlanStep : Column<8, NScheme::NTypeIds::Uint64> {}; + struct IndexTxId : Column<9, NScheme::NTypeIds::Uint64> {}; using TKey = TableKey<Committed, ShardOrPlan, WriteTxId, PathId, DedupId>; - using TColumns = TableColumns<Committed, ShardOrPlan, WriteTxId, PathId, DedupId, BlobId, Meta>; + using TColumns = TableColumns<Committed, ShardOrPlan, WriteTxId, PathId, DedupId, BlobId, Meta, IndexPlanStep, IndexTxId>; }; struct IndexGranules : NIceDb::Schema::Table<GranulesTableId> { @@ -416,10 +418,19 @@ struct Schema : NIceDb::Schema { // InsertTable activities static void InsertTable_Upsert(NIceDb::TNiceDb& db, EInsertTableIds recType, const TInsertedData& data) { - db.Table<InsertTable>().Key((ui8)recType, data.ShardOrPlan, data.WriteTxId, data.PathId, data.DedupId).Update( - NIceDb::TUpdate<InsertTable::BlobId>(data.BlobId.ToStringLegacy()), - NIceDb::TUpdate<InsertTable::Meta>(data.Metadata) - ); + if (data.GetSchemaSnapshot().Valid()) { + db.Table<InsertTable>().Key((ui8)recType, data.ShardOrPlan, data.WriteTxId, data.PathId, data.DedupId).Update( + 
NIceDb::TUpdate<InsertTable::BlobId>(data.BlobId.ToStringLegacy()), + NIceDb::TUpdate<InsertTable::Meta>(data.Metadata), + NIceDb::TUpdate<InsertTable::IndexPlanStep>(data.GetSchemaSnapshot().GetPlanStep()), + NIceDb::TUpdate<InsertTable::IndexTxId>(data.GetSchemaSnapshot().GetTxId()) + ); + } else { + db.Table<InsertTable>().Key((ui8)recType, data.ShardOrPlan, data.WriteTxId, data.PathId, data.DedupId).Update( + NIceDb::TUpdate<InsertTable::BlobId>(data.BlobId.ToStringLegacy()), + NIceDb::TUpdate<InsertTable::Meta>(data.Metadata) + ); + } } static void InsertTable_Erase(NIceDb::TNiceDb& db, EInsertTableIds recType, const TInsertedData& data) { @@ -469,6 +480,13 @@ struct Schema : NIceDb::Schema { TString strBlobId = rowset.GetValue<InsertTable::BlobId>(); TString metaStr = rowset.GetValue<InsertTable::Meta>(); + std::optional<NOlap::TSnapshot> indexSnapshot; + if (rowset.HaveValue<InsertTable::IndexPlanStep>()) { + ui64 indexPlanStep = rowset.GetValue<InsertTable::IndexPlanStep>(); + ui64 indexTxId = rowset.GetValue<InsertTable::IndexTxId>(); + indexSnapshot = NOlap::TSnapshot(indexPlanStep, indexTxId); + } + TString error; NOlap::TUnifiedBlobId blobId = NOlap::TUnifiedBlobId::ParseFromString(strBlobId, dsGroupSelector, error); Y_VERIFY(blobId.IsValid(), "Failied to parse blob id: %s", error.c_str()); @@ -479,7 +497,7 @@ struct Schema : NIceDb::Schema { writeTime = TInstant::Seconds(meta.GetDirtyWriteTimeSeconds()); } - TInsertedData data(shardOrPlan, writeTxId, pathId, dedupId, blobId, metaStr, writeTime); + TInsertedData data(shardOrPlan, writeTxId, pathId, dedupId, blobId, metaStr, writeTime, indexSnapshot); switch (recType) { case EInsertTableIds::Inserted: diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 5669cf59061..007c4565dac 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -169,7 +169,6 @@ public: EType Type; TCompactionLimits Limits; TSnapshot InitSnapshot = TSnapshot::Zero(); - TSnapshot ApplySnapshot = TSnapshot::Zero(); std::unique_ptr<TCompactionInfo> CompactionInfo; std::vector<NOlap::TInsertedData> DataToIndex; std::vector<TPortionInfo> SwitchedPortions; // Portions that would be replaced by new ones diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index bfad74d8dea..05670517985 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -454,11 +454,8 @@ bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) { std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartInsert(const TCompactionLimits& limits, std::vector<TInsertedData>&& dataToIndex) { Y_VERIFY(dataToIndex.size()); - auto changes = std::make_shared<TChanges>(DefaultMark(), std::move(dataToIndex), limits); + auto changes = TChanges::BuildInsertChanges(DefaultMark(), std::move(dataToIndex), LastSnapshot, limits); ui32 reserveGranules = 0; - - changes->InitSnapshot = LastSnapshot; - for (const auto& data : changes->DataToIndex) { const ui64 pathId = data.PathId; @@ -496,7 +493,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std: Y_VERIFY(info); Y_VERIFY(info->Granules.size() == 1); - auto changes = std::make_shared<TChanges>(DefaultMark(), std::move(info), limits, LastSnapshot); + auto changes = TChanges::BuildCompactionChanges(DefaultMark(), std::move(info), limits, LastSnapshot); const 
ui64 granule = *changes->CompactionInfo->Granules.begin(); const auto gi = Granules.find(granule); @@ -541,7 +538,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T const TCompactionLimits& limits, THashSet<ui64>& pathsToDrop, ui32 maxRecords) { - auto changes = std::make_shared<TChanges>(DefaultMark(), snapshot, limits); + auto changes = TChanges::BuildClenupChanges(DefaultMark(), snapshot, limits); ui32 affectedRecords = 0; // Add all portions from dropped paths @@ -627,8 +624,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash return {}; } - TSnapshot fakeSnapshot(1, 1); // TODO: better snapshot - auto changes = std::make_shared<TChanges>(DefaultMark(), TColumnEngineChanges::TTL, fakeSnapshot); + auto changes = TChanges::BuildTtlChanges(DefaultMark()); ui64 evicttionSize = 0; bool allowEviction = true; ui64 dropBlobs = 0; @@ -758,9 +754,6 @@ void TColumnEngineForLogs::UpdateOverloaded(const THashMap<ui64, std::shared_ptr bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) { auto changes = std::static_pointer_cast<TChanges>(indexChanges); - if (changes->ApplySnapshot.Valid()) { - Y_VERIFY(changes->ApplySnapshot == snapshot); - } const auto& indexInfo = GetIndexInfo(); diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 93de430962e..c16a2397a6c 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -49,6 +49,11 @@ public: }; class TChanges : public TColumnEngineChanges { + private: + TChanges(TColumnEngineChanges::EType type, const TMark& defaultMark, const TCompactionLimits& limits) + : TColumnEngineChanges(type, limits) + , DefaultMark(defaultMark) {} + public: struct TSrcGranule { ui64 PathId{0}; @@ -60,37 +65,34 @@ public: {} }; - TChanges(const TMark& defaultMark, - std::vector<NOlap::TInsertedData>&& blobsToIndex, const TCompactionLimits& limits) - : TColumnEngineChanges(TColumnEngineChanges::INSERT, limits) - , DefaultMark(defaultMark) - { - DataToIndex = std::move(blobsToIndex); + static std::shared_ptr<TChanges> BuildInsertChanges(const TMark& defaultMark, + std::vector<NOlap::TInsertedData>&& blobsToIndex, const TSnapshot& initSnapshot, + const TCompactionLimits& limits) { + std::shared_ptr<TChanges> changes(new TChanges(TColumnEngineChanges::INSERT, defaultMark, limits)); + changes->DataToIndex = blobsToIndex; + changes->InitSnapshot = initSnapshot; + return std::move(changes); } - TChanges(const TMark& defaultMark, - std::unique_ptr<TCompactionInfo>&& info, const TCompactionLimits& limits, const TSnapshot& snapshot) - : TColumnEngineChanges(TColumnEngineChanges::COMPACTION, limits) - , DefaultMark(defaultMark) - { - CompactionInfo = std::move(info); - InitSnapshot = snapshot; + static std::shared_ptr<TChanges> BuildCompactionChanges(const TMark& defaultMark, + std::unique_ptr<TCompactionInfo>&& info, + const TCompactionLimits& limits, + const TSnapshot& initSnapshot) { + std::shared_ptr<TChanges> changes(new TChanges(TColumnEngineChanges::COMPACTION, defaultMark, limits)); + changes->CompactionInfo = std::move(info); + changes->InitSnapshot = initSnapshot; + return std::move(changes); } - TChanges(const TMark& defaultMark, - const TSnapshot& snapshot, const TCompactionLimits& limits) - : TColumnEngineChanges(TColumnEngineChanges::CLEANUP, limits) - , DefaultMark(defaultMark) - { - 
InitSnapshot = snapshot; + static std::shared_ptr<TChanges> BuildClenupChanges(const TMark& defaultMark, const TSnapshot& initSnapshot, const TCompactionLimits& limits) { + std::shared_ptr<TChanges> changes(new TChanges(TColumnEngineChanges::CLEANUP, defaultMark, limits)); + changes->InitSnapshot = initSnapshot; + return std::move(changes); } - TChanges(const TMark& defaultMark, - TColumnEngineChanges::EType type, const TSnapshot& applySnapshot) - : TColumnEngineChanges(type, TCompactionLimits()) - , DefaultMark(defaultMark) - { - ApplySnapshot = applySnapshot; + static std::shared_ptr<TChanges> BuildTtlChanges(const TMark& defaultMark) { + std::shared_ptr<TChanges> changes(new TChanges(TColumnEngineChanges::TTL, defaultMark, TCompactionLimits())); + return std::move(changes); } bool AddPathIfNotExists(ui64 pathId) { diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp index d552fb3ee4e..8350be964ea 100644 --- a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp +++ b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp @@ -40,8 +40,8 @@ bool TEvictionLogic::UpdateEvictedPortion(TPortionInfo& portionInfo, TPortionInfo undo = portionInfo; - auto blobSchema = IndexInfo.GetSchema(undo.GetSnapshot()); - auto resultSchema = IndexInfo.GetLastSchema(); + auto blobSchema = SchemaVersions.GetSchema(undo.GetSnapshot()); + auto resultSchema = SchemaVersions.GetLastSchema(); auto batch = portionInfo.AssembleInBatch(*blobSchema, *resultSchema, srcBlobs); size_t undoSize = newBlobs.size(); @@ -74,11 +74,11 @@ bool TEvictionLogic::UpdateEvictedPortion(TPortionInfo& portionInfo, std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathId, const std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule, - const TSnapshot& minSnapshot, + const TSnapshot& snapshot, std::vector<TString>& blobs) const { Y_VERIFY(batch->num_rows()); - auto resultSchema = IndexInfo.GetSchema(minSnapshot); + auto resultSchema = SchemaVersions.GetSchema(snapshot); std::vector<TPortionInfo> out; TString tierName; @@ -111,7 +111,7 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI ui32 columnId = resultSchema->GetIndexInfo().GetColumnId(name); /// @warnign records are not valid cause of empty BlobId and zero Portion - TColumnRecord record = TColumnRecord::Make(granule, columnId, minSnapshot, 0); + TColumnRecord record = TColumnRecord::Make(granule, columnId, snapshot, 0); auto columnSaver = resultSchema->GetColumnSaver(name, saverContext); auto blob = portionInfo.AddOneChunkColumn(portionBatch->GetColumnByName(name), field, std::move(record), columnSaver); if (!blob.size()) { @@ -143,20 +143,24 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI return out; } -std::vector<std::shared_ptr<arrow::RecordBatch>> TCompactionLogic::PortionsToBatches(const std::vector<TPortionInfo>& portions, +std::pair<std::vector<std::shared_ptr<arrow::RecordBatch>>, TSnapshot> TCompactionLogic::PortionsToBatches(const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs, bool insertedOnly) const { std::vector<std::shared_ptr<arrow::RecordBatch>> batches; batches.reserve(portions.size()); - auto resultSchema = IndexInfo.GetLastSchema(); + auto resultSchema = SchemaVersions.GetLastSchema(); + TSnapshot maxSnapshot = resultSchema->GetSnapshot(); for (auto& portionInfo : portions) { - auto blobSchema = IndexInfo.GetSchema(portionInfo.GetSnapshot()); + auto blobSchema = 
SchemaVersions.GetSchema(portionInfo.GetSnapshot()); auto batch = portionInfo.AssembleInBatch(*blobSchema, *resultSchema, blobs); if (!insertedOnly || portionInfo.IsInserted()) { batches.push_back(batch); + if (maxSnapshot < portionInfo.GetSnapshot()) { + maxSnapshot = portionInfo.GetSnapshot(); + } } } - return batches; + return std::make_pair(batches, maxSnapshot); } THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> TIndexLogicBase::SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, @@ -208,36 +212,49 @@ std::vector<TString> TIndexationLogic::Apply(std::shared_ptr<TColumnEngineChange Y_VERIFY(changes->AppendedPortions.empty()); - TSnapshot minSnapshot = changes->ApplySnapshot; + auto maxSnapshot = TSnapshot::Zero(); for (auto& inserted : changes->DataToIndex) { TSnapshot insertSnap = inserted.GetSnapshot(); Y_VERIFY(insertSnap.Valid()); - if (minSnapshot.IsZero() || insertSnap <= minSnapshot) { - minSnapshot = insertSnap; + if (insertSnap > maxSnapshot) { + maxSnapshot = insertSnap; } } - Y_VERIFY(minSnapshot.Valid()); - auto& indexInfo = IndexInfo.GetSchema(minSnapshot)->GetIndexInfo(); - Y_VERIFY(indexInfo.IsSorted()); + Y_VERIFY(maxSnapshot.Valid()); + + auto resultSchema = SchemaVersions.GetSchema(maxSnapshot); + Y_VERIFY(resultSchema->GetIndexInfo().IsSorted()); THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> pathBatches; for (auto& inserted : changes->DataToIndex) { TBlobRange blobRange(inserted.BlobId, 0, inserted.BlobId.BlobSize()); + auto blobSchema = SchemaVersions.GetSchema(inserted.GetSchemaSnapshot()); + auto& indexInfo = blobSchema->GetIndexInfo(); + Y_VERIFY(indexInfo.IsSorted()); + std::shared_ptr<arrow::RecordBatch> batch; if (auto it = changes->CachedBlobs.find(inserted.BlobId); it != changes->CachedBlobs.end()) { batch = it->second; } else if (auto* blobData = changes->Blobs.FindPtr(blobRange)) { Y_VERIFY(!blobData->empty(), "Blob data not present"); + // Prepare batch batch = NArrow::DeserializeBatch(*blobData, indexInfo.ArrowSchema()); + if (!batch) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD) + ("event", "cannot_parse") + ("data_snapshot", TStringBuilder() << inserted.GetSnapshot()) + ("index_snapshot", TStringBuilder() << blobSchema->GetSnapshot()); + } } else { Y_VERIFY(blobData, "Data for range %s has not been read", blobRange.ToString().c_str()); } Y_VERIFY(batch); - batch = AddSpecials(batch, indexInfo, inserted); + batch = resultSchema->NormalizeFullBatch(*blobSchema, batch); + batch = AddSpecials(batch, resultSchema->GetIndexInfo(), inserted); pathBatches[inserted.PathId].push_back(batch); - Y_VERIFY_DEBUG(NArrow::IsSorted(pathBatches[inserted.PathId].back(), indexInfo.GetReplaceKey())); + Y_VERIFY_DEBUG(NArrow::IsSorted(pathBatches[inserted.PathId].back(), resultSchema->GetIndexInfo().GetReplaceKey())); } std::vector<TString> blobs; @@ -250,15 +267,15 @@ std::vector<TString> TIndexationLogic::Apply(std::shared_ptr<TColumnEngineChange Y_VERIFY(merged); Y_VERIFY_DEBUG(NArrow::IsSorted(merged, indexInfo.GetReplaceKey())); #else - auto merged = NArrow::CombineSortedBatches(batches, indexInfo.SortReplaceDescription()); + auto merged = NArrow::CombineSortedBatches(batches, resultSchema->GetIndexInfo().SortReplaceDescription()); Y_VERIFY(merged); - Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(merged, indexInfo.GetReplaceKey())); + Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(merged, resultSchema->GetIndexInfo().GetReplaceKey())); #endif - auto granuleBatches = SliceIntoGranules(merged, changes->PathToGranule[pathId], indexInfo); + auto 
granuleBatches = SliceIntoGranules(merged, changes->PathToGranule[pathId], resultSchema->GetIndexInfo()); for (auto& [granule, batch] : granuleBatches) { - auto portions = MakeAppendedPortions(pathId, batch, granule, minSnapshot, blobs); + auto portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs); Y_VERIFY(portions.size() > 0); for (auto& portion : portions) { changes->AppendedPortions.emplace_back(std::move(portion)); @@ -270,25 +287,30 @@ std::vector<TString> TIndexationLogic::Apply(std::shared_ptr<TColumnEngineChange return blobs; } -std::shared_ptr<arrow::RecordBatch> TCompactionLogic::CompactInOneGranule(ui64 granule, +std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> TCompactionLogic::CompactInOneGranule(ui64 granule, const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs) const { std::vector<std::shared_ptr<arrow::RecordBatch>> batches; batches.reserve(portions.size()); - auto resultSchema = IndexInfo.GetLastSchema(); + auto resultSchema = SchemaVersions.GetLastSchema(); + + TSnapshot maxSnapshot = resultSchema->GetSnapshot(); for (auto& portionInfo : portions) { Y_VERIFY(!portionInfo.Empty()); Y_VERIFY(portionInfo.Granule() == granule); - auto blobSchema = IndexInfo.GetSchema(portionInfo.GetSnapshot()); + auto blobSchema = SchemaVersions.GetSchema(portionInfo.GetSnapshot()); auto batch = portionInfo.AssembleInBatch(*blobSchema, *resultSchema, blobs); batches.push_back(batch); + if (portionInfo.GetSnapshot() > maxSnapshot) { + maxSnapshot = portionInfo.GetSnapshot(); + } } auto sortedBatch = NArrow::CombineSortedBatches(batches, resultSchema->GetIndexInfo().SortReplaceDescription()); Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(sortedBatch, resultSchema->GetIndexInfo().GetReplaceKey())); - return sortedBatch; + return std::make_pair(sortedBatch, maxSnapshot); } std::vector<TString> TCompactionLogic::CompactInGranule(std::shared_ptr<TColumnEngineForLogs::TChanges> changes) const { @@ -298,9 +320,9 @@ std::vector<TString> TCompactionLogic::CompactInGranule(std::shared_ptr<TColumnE Y_VERIFY(switchedProtions.size()); ui64 granule = switchedProtions[0].Granule(); - auto batch = CompactInOneGranule(granule, switchedProtions, changes->Blobs); + auto [batch, maxSnapshot] = CompactInOneGranule(granule, switchedProtions, changes->Blobs); - auto resultSchema = IndexInfo.GetLastSchema(); + auto resultSchema = SchemaVersions.GetLastSchema(); std::vector<TPortionInfo> portions; if (!changes->MergeBorders.Empty()) { Y_VERIFY(changes->MergeBorders.GetOrderedMarks().size() > 1); @@ -311,13 +333,13 @@ std::vector<TString> TCompactionLogic::CompactInGranule(std::shared_ptr<TColumnE if (!slice || slice->num_rows() == 0) { continue; } - auto tmp = MakeAppendedPortions(pathId, slice, granule, TSnapshot::Zero(), blobs); + auto tmp = MakeAppendedPortions(pathId, slice, granule, maxSnapshot, blobs); for (auto&& portionInfo : tmp) { portions.emplace_back(std::move(portionInfo)); } } } else { - portions = MakeAppendedPortions(pathId, batch, granule, TSnapshot::Zero(), blobs); + portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs); } Y_VERIFY(portions.size() > 0); @@ -558,13 +580,11 @@ std::vector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr std::vector<std::pair<TMark, ui64>> tsIds; ui64 movedRows = TryMovePortions(ts0, portions, tsIds, changes->PortionsToMove); - const auto& srcBatches = PortionsToBatches(portions, changes->Blobs, movedRows != 0); + auto [srcBatches, maxSnapshot] = 
PortionsToBatches(portions, changes->Blobs, movedRows != 0); Y_VERIFY(srcBatches.size() == portions.size()); std::vector<TString> blobs; - - auto resultSchema = IndexInfo.GetLastSchema(); - + auto resultSchema = SchemaVersions.GetLastSchema(); if (movedRows) { Y_VERIFY(changes->PortionsToMove.size() >= 2); Y_VERIFY(changes->PortionsToMove.size() == tsIds.size()); @@ -652,7 +672,7 @@ std::vector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr for (const auto& batch : idBatches[id]) { // Cannot set snapshot here. It would be set in committing transaction in ApplyChanges(). - auto newPortions = MakeAppendedPortions(pathId, batch, tmpGranule, TSnapshot::Zero(), blobs); + auto newPortions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs); Y_VERIFY(newPortions.size() > 0); for (auto& portion : newPortions) { changes->AppendedPortions.emplace_back(std::move(portion)); @@ -668,7 +688,7 @@ std::vector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr ui64 tmpGranule = changes->SetTmpGranule(pathId, ts); // Cannot set snapshot here. It would be set in committing transaction in ApplyChanges(). - auto portions = MakeAppendedPortions(pathId, batch, tmpGranule, TSnapshot::Zero(), blobs); + auto portions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs); Y_VERIFY(portions.size() > 0); for (auto& portion : portions) { changes->AppendedPortions.emplace_back(std::move(portion)); diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.h b/ydb/core/tx/columnshard/engines/index_logic_logs.h index 91471217ee3..b13a84de6d0 100644 --- a/ydb/core/tx/columnshard/engines/index_logic_logs.h +++ b/ydb/core/tx/columnshard/engines/index_logic_logs.h @@ -8,19 +8,19 @@ namespace NKikimr::NOlap { class TIndexLogicBase { protected: - const TVersionedIndex& IndexInfo; + const TVersionedIndex& SchemaVersions; private: const THashMap<ui64, NKikimr::NOlap::TTiering>* TieringMap = nullptr; public: TIndexLogicBase(const TVersionedIndex& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap) - : IndexInfo(indexInfo) + : SchemaVersions(indexInfo) , TieringMap(&tieringMap) { } TIndexLogicBase(const TVersionedIndex& indexInfo) - : IndexInfo(indexInfo) + : SchemaVersions(indexInfo) { } @@ -72,7 +72,7 @@ public: private: std::vector<TString> CompactSplitGranule(const std::shared_ptr<TColumnEngineForLogs::TChanges>& changes) const; std::vector<TString> CompactInGranule(std::shared_ptr<TColumnEngineForLogs::TChanges> changes) const; - std::shared_ptr<arrow::RecordBatch> CompactInOneGranule(ui64 granule, const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs) const; + std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> CompactInOneGranule(ui64 granule, const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs) const; /// @return vec({ts, batch}). ts0 <= ts1 <= ... <= tsN /// @note We use ts from PK for split but there could be lots PK with the same ts. 
@@ -90,7 +90,7 @@ private: std::vector<std::pair<TMark, ui64>>& tsIds, std::vector<std::pair<TPortionInfo, ui64>>& toMove) const; - std::vector<std::shared_ptr<arrow::RecordBatch>> PortionsToBatches(const std::vector<TPortionInfo>& portions, + std::pair<std::vector<std::shared_ptr<arrow::RecordBatch>>, TSnapshot> PortionsToBatches(const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs, bool insertedOnly = false) const; }; diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index edccbfcad1f..cb95e0b13d4 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -188,19 +188,20 @@ TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>& // Extract columns (without check), filter, attach snapshot, extract columns with check // (do not filter snapshot columns) - auto loadSchema = ReadMetadata->GetLoadSchema(snapshot); + auto dataSchema = ReadMetadata->GetLoadSchema(snapshot); - auto batch = NArrow::ExtractExistedColumns(srcBatch, loadSchema->GetSchema()); + auto batch = NArrow::ExtractExistedColumns(srcBatch, dataSchema->GetSchema()); Y_VERIFY(batch); - + auto filter = FilterNotIndexed(batch, *ReadMetadata); if (filter.IsTotalDenyFilter()) { return nullptr; } auto preparedBatch = batch; - preparedBatch = TIndexInfo::AddSpecialColumns(preparedBatch, snapshot); - preparedBatch = NArrow::ExtractColumns(preparedBatch, loadSchema->GetSchema()); + auto resultSchema = ReadMetadata->GetLoadSchema(); + preparedBatch = resultSchema->NormalizeBatch(*dataSchema, preparedBatch); + preparedBatch = NArrow::ExtractColumns(preparedBatch, resultSchema->GetSchema()); Y_VERIFY(preparedBatch); filter.Apply(preparedBatch); diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h index 1673d49808b..52208cfd348 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.h +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h @@ -65,16 +65,16 @@ public: /// @returns batches and corresponding last keys in correct order (i.e. 
sorted by by PK) std::vector<TPartialReadResult> GetReadyResults(const int64_t maxRowsInBatch); - void AddNotIndexed(ui32 batchNo, TString blob, const TSnapshot& snapshot) { - auto batch = NArrow::DeserializeBatch(blob, ReadMetadata->GetBlobSchema(snapshot)); - AddNotIndexed(batchNo, batch, snapshot); + void AddNotIndexed(ui32 batchNo, TString blob, const TCommittedBlob& commitedBlob) { + auto batch = NArrow::DeserializeBatch(blob, ReadMetadata->GetBlobSchema(commitedBlob.GetSchemaSnapshot())); + AddNotIndexed(batchNo, batch, commitedBlob); } - void AddNotIndexed(ui32 batchNo, const std::shared_ptr<arrow::RecordBatch>& batch, const TSnapshot& snapshot) { + void AddNotIndexed(ui32 batchNo, const std::shared_ptr<arrow::RecordBatch>& batch, const TCommittedBlob& commitedBlob) { Y_VERIFY(batchNo < NotIndexed.size()); Y_VERIFY(!NotIndexed[batchNo]); ++ReadyNotIndexed; - NotIndexed[batchNo] = MakeNotIndexedBatch(batch, snapshot); + NotIndexed[batchNo] = MakeNotIndexedBatch(batch, commitedBlob.GetSchemaSnapshot()); } void AddIndexed(const TBlobRange& blobRange, const TString& column); diff --git a/ydb/core/tx/columnshard/engines/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table.cpp index b9a800f510b..50f15ace780 100644 --- a/ydb/core/tx/columnshard/engines/insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table.cpp @@ -198,7 +198,7 @@ std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const TSnapshot& sna for (const auto& data : *committed) { if (std::less_equal<TSnapshot>()(data.GetSnapshot(), snapshot)) { - ret.emplace_back(TCommittedBlob{data.BlobId, data.GetSnapshot()}); + ret.emplace_back(TCommittedBlob(data.BlobId, data.GetSnapshot(), data.GetSchemaSnapshot())); } } diff --git a/ydb/core/tx/columnshard/engines/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table.h index 93357ffbf2e..2e9ddd5f097 100644 --- a/ydb/core/tx/columnshard/engines/insert_table.h +++ b/ydb/core/tx/columnshard/engines/insert_table.h @@ -7,6 +7,7 @@ namespace NKikimr::NOlap { struct TInsertedData { +public: ui64 ShardOrPlan = 0; ui64 WriteTxId = 0; ui64 PathId = 0; @@ -18,15 +19,19 @@ struct TInsertedData { TInsertedData() = delete; // avoid invalid TInsertedData anywhere TInsertedData(ui64 shardOrPlan, ui64 writeTxId, ui64 pathId, TString dedupId, const TUnifiedBlobId& blobId, - const TString& meta, const TInstant& writeTime) + const TString& meta, const TInstant& writeTime, const std::optional<TSnapshot>& schemaVersion) : ShardOrPlan(shardOrPlan) , WriteTxId(writeTxId) , PathId(pathId) , DedupId(dedupId) , BlobId(blobId) , Metadata(meta) - , DirtyTime(writeTime) - {} + , DirtyTime(writeTime) { + if (schemaVersion) { + SchemaVersion = *schemaVersion; + Y_VERIFY(SchemaVersion.Valid()); + } + } bool operator < (const TInsertedData& key) const { if (ShardOrPlan < key.ShardOrPlan) { @@ -84,29 +89,46 @@ struct TInsertedData { return TSnapshot(ShardOrPlan, WriteTxId); } + const TSnapshot& GetSchemaSnapshot() const { + return SchemaVersion; + } + ui32 BlobSize() const { return BlobId.BlobSize(); } + +private: + TSnapshot SchemaVersion = TSnapshot::Zero(); }; class TCommittedBlob { private: TUnifiedBlobId BlobId; - TSnapshot Snapshot; + TSnapshot CommitSnapshot; + TSnapshot SchemaSnapshot; public: - TCommittedBlob(const TUnifiedBlobId& blobId, const TSnapshot& snapshot) + TCommittedBlob(const TUnifiedBlobId& blobId, const TSnapshot& snapshot, const TSnapshot& schemaSnapshot) : BlobId(blobId) - , Snapshot(snapshot) + , CommitSnapshot(snapshot) + , SchemaSnapshot(schemaSnapshot) {} + 
static TCommittedBlob BuildKeyBlob(const TUnifiedBlobId& blobId) { + return TCommittedBlob(blobId, TSnapshot::Zero(), TSnapshot::Zero()); + } + /// It uses trick then we place key wtih planStep:txId in container and find them later by BlobId only. /// So hash() and equality should depend on BlobId only. bool operator == (const TCommittedBlob& key) const { return BlobId == key.BlobId; } ui64 Hash() const noexcept { return BlobId.Hash(); } TString DebugString() const { - return TStringBuilder() << BlobId << ";ps=" << Snapshot.GetPlanStep() << ";ti=" << Snapshot.GetTxId(); + return TStringBuilder() << BlobId << ";ps=" << CommitSnapshot.GetPlanStep() << ";ti=" << CommitSnapshot.GetTxId(); } const TSnapshot& GetSnapshot() const { - return Snapshot; + return CommitSnapshot; + } + + const TSnapshot& GetSchemaSnapshot() const { + return SchemaSnapshot; } const TUnifiedBlobId& GetBlobId() const { diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp index 4bd2230243b..2371b84e821 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portion_info.cpp @@ -4,6 +4,42 @@ namespace NKikimr::NOlap { +std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::NormalizeFullBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch) const { + auto schema = GetIndexInfo().ArrowSchema(); + return NormalizeBatchImpl(dataSchema, batch, GetIndexInfo().ArrowSchema()); +} + +std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::NormalizeBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch) const { + return NormalizeBatchImpl(dataSchema, batch, GetSchema()); +} + +std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::NormalizeBatchImpl(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch, + const std::shared_ptr<arrow::Schema>& resultArrowSchema) const { + if (dataSchema.GetSnapshot() == GetSnapshot()) { + return batch; + } + Y_VERIFY(dataSchema.GetSnapshot() < GetSnapshot()); + std::vector<std::shared_ptr<arrow::Array>> newColumns; + newColumns.resize(resultArrowSchema->num_fields(), 0); + + for (size_t i = 0; i < resultArrowSchema->fields().size(); ++i) { + auto& field = resultArrowSchema->fields()[i]; + auto columnId = GetIndexInfo().GetColumnId(field->name()); + auto oldColumnIndex = dataSchema.GetFieldIndex(columnId); + if (oldColumnIndex >= 0) { // ClumnExists + auto oldColumnInfo = dataSchema.GetField(oldColumnIndex); + Y_VERIFY(oldColumnInfo); + auto columnData = batch->GetColumnByName(oldColumnInfo->name()); + Y_VERIFY(columnData); + newColumns[i] = columnData; + } else { // AddNullColumn + auto nullColumn = NArrow::MakeEmptyBatch(arrow::schema({field}), batch->num_rows()); + newColumns[i] = nullColumn->column(0); + } + } + return arrow::RecordBatch::Make(resultArrowSchema, batch->num_rows(), newColumns); +} + TString TPortionInfo::SerializeColumn(const std::shared_ptr<arrow::Array>& array, const std::shared_ptr<arrow::Field>& field, const TColumnSaver saver) { diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index 00bb37bb169..a263be80b69 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -37,6 +37,11 @@ public: virtual const std::shared_ptr<arrow::Schema>& GetSchema() const = 0; virtual const TIndexInfo& GetIndexInfo() const = 0; virtual const TSnapshot& GetSnapshot() const = 0; + + 
std::shared_ptr<arrow::RecordBatch> NormalizeBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch) const; + std::shared_ptr<arrow::RecordBatch> NormalizeFullBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch) const; +private: + std::shared_ptr<arrow::RecordBatch> NormalizeBatchImpl(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch, const std::shared_ptr<arrow::Schema>& newSchema) const; }; class TSnapshotSchema: public ISnapshotSchema { @@ -65,7 +70,10 @@ public: } virtual int GetFieldIndex(const ui32 columnId) const override { - TString columnName = IndexInfo.GetColumnName(columnId); + TString columnName = IndexInfo.GetColumnName(columnId, false); + if (!columnName) { + return -1; + } std::string name(columnName.data(), columnName.size()); return Schema->GetFieldIndex(name); } @@ -128,7 +136,10 @@ public: if (!ColumnIds.contains(columnId)) { return -1; } - TString columnName = OriginalSnapshot->GetIndexInfo().GetColumnName(columnId); + TString columnName = OriginalSnapshot->GetIndexInfo().GetColumnName(columnId, false); + if (!columnName) { + return -1; + } std::string name(columnName.data(), columnName.size()); return Schema->GetFieldIndex(name); } @@ -290,11 +301,9 @@ struct TPortionInfo { } void UpdateRecords(ui64 portion, const THashMap<ui64, ui64>& granuleRemap, const TSnapshot& snapshot) { + Y_UNUSED(snapshot);; for (auto& rec : Records) { rec.Portion = portion; - if (!rec.ValidSnapshot()) { - rec.SetSnapshot(snapshot); - } } if (!granuleRemap.empty()) { for (auto& rec : Records) { diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp index edc295a6ae4..3688a52e2cd 100644 --- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp @@ -51,19 +51,20 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) { TTestInsertTableDB dbTable; TInsertTable insertTable; + TSnapshot indexSnapshot(1, 1); // insert, not commited TInstant time = TInstant::Now(); - bool ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId1, {}, time)); + bool ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId1, {}, time, indexSnapshot)); UNIT_ASSERT(ok); // insert the same blobId1 again - ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId1, {}, time)); + ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId1, {}, time, indexSnapshot)); UNIT_ASSERT(!ok); // insert different blodId with the same writeId and dedupId TUnifiedBlobId blobId2(2222, 1, 2, 100, 1); - ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId2, {}, time)); + ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId2, {}, time, indexSnapshot)); UNIT_ASSERT(!ok); // read nothing diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 155a8292ea4..e85dd848ec7 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -289,16 +289,6 @@ bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, return engine.ApplyChanges(db, changes, snap); } -bool Insert(const TIndexInfo& tableInfo, TTestDbWrapper& db, TSnapshot snap, - std::vector<TInsertedData>&& 
dataToIndex, THashMap<TBlobRange, TString>& blobs, ui32& step) { - TColumnEngineForLogs engine(0, TestLimits()); - engine.UpdateDefaultSchema(snap, TIndexInfo(tableInfo)); - THashSet<TUnifiedBlobId> lostBlobs; - engine.Load(db, lostBlobs); - - return Insert(engine, db, snap, std::move(dataToIndex), blobs, step); -} - struct TExpected { ui32 SrcPortions; ui32 NewPortions; @@ -327,15 +317,6 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, T return engine.ApplyChanges(db, changes, snap); } -bool Compact(const TIndexInfo& tableInfo, TTestDbWrapper& db, TSnapshot snap, THashMap<TBlobRange, - TString>&& blobs, ui32& step, const TExpected& expected) { - TColumnEngineForLogs engine(0, TestLimits()); - engine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo)); - THashSet<TUnifiedBlobId> lostBlobs; - engine.Load(db, lostBlobs); - return Compact(engine, db, snap, std::move(blobs), step, expected); -} - bool Cleanup(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, ui32 expectedToDrop) { THashSet<ui64> pathsToDrop; std::shared_ptr<TColumnEngineChanges> changes = engine.StartCleanup(snap, TestLimits(), pathsToDrop, 1000); @@ -351,7 +332,7 @@ bool Ttl(TColumnEngineForLogs& engine, TTestDbWrapper& db, UNIT_ASSERT(changes); UNIT_ASSERT_VALUES_EQUAL(changes->PortionsToDrop.size(), expectedToDrop); - return engine.ApplyChanges(db, changes, changes->ApplySnapshot); + return engine.ApplyChanges(db, changes, TSnapshot(1,0)); } std::shared_ptr<TPredicate> MakePredicate(int64_t ts, NArrow::EOperation op) { @@ -389,9 +370,17 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] TInstant writeTime = TInstant::Now(); + + // load + TColumnEngineForLogs engine(0); + TSnapshot indexSnaphot(1, 1); + engine.UpdateDefaultSchema(indexSnaphot, TIndexInfo(tableInfo)); + THashSet<TUnifiedBlobId> lostBlobs; + engine.Load(db, lostBlobs); + std::vector<TInsertedData> dataToIndex = { - {1, 2, paths[0], "", blobRanges[0].BlobId, "", writeTime}, - {2, 1, paths[0], "", blobRanges[1].BlobId, "", writeTime} + {1, 2, paths[0], "", blobRanges[0].BlobId, "", writeTime, indexSnaphot}, + {2, 1, paths[0], "", blobRanges[1].BlobId, "", writeTime, indexSnaphot} }; // write @@ -400,18 +389,13 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { THashMap<TBlobRange, TString> blobs; blobs[blobRanges[0]] = testBlob; blobs[blobRanges[1]] = testBlob; - Insert(tableInfo, db, TSnapshot(1, 2), std::move(dataToIndex), blobs, step); - - // load - - TColumnEngineForLogs engine(0); - engine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo)); - THashSet<TUnifiedBlobId> lostBlobs; - engine.Load(db, lostBlobs); - + Insert(engine, db, TSnapshot(1, 2), std::move(dataToIndex), blobs, step); + // selects - const TIndexInfo& indexInfo = engine.GetIndexInfo(); + auto lastSchema = engine.GetVersionedIndex().GetLastSchema(); + UNIT_ASSERT_EQUAL(lastSchema->GetSnapshot(), indexSnaphot); + const TIndexInfo& indexInfo = lastSchema->GetIndexInfo(); THashSet<ui32> oneColumnId = { indexInfo.GetColumnId(testColumns[0].first) }; THashSet<ui32> columnIds; for (auto& [column, typeId] : testColumns) { @@ -426,13 +410,12 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0); } - { // select from snap after insert (greater txId) + { // select from snap between insert (greater txId) ui64 planStep = 1; ui64 txId = 2; auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), columnIds, 
NOlap::TPKRangesFilter(false)); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions[0].NumRecords(), columnIds.size()); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0); } { // select from snap after insert (greater planStep) @@ -473,6 +456,12 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { TTestDbWrapper db; TIndexInfo tableInfo = TestTableInfo(ydbSchema, key); + TSnapshot indexSnapshot(1, 1); + TColumnEngineForLogs engine(0, TestLimits()); + engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo)); + THashSet<TUnifiedBlobId> lostBlobs; + engine.Load(db, lostBlobs); + ui64 pathId = 1; ui32 step = 1000; @@ -490,25 +479,18 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] std::vector<TInsertedData> dataToIndex; dataToIndex.push_back( - TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now()}); + TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now(), indexSnapshot}); - bool ok = Insert(tableInfo, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); + bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); } // compact planStep = 2; - bool ok = Compact(tableInfo, db, TSnapshot(planStep, 1), std::move(blobs), step, {20, 4, 4}); + bool ok = Compact(engine, db, TSnapshot(planStep, 1), std::move(blobs), step, {20, 4, 4}); UNIT_ASSERT(ok); - // load - - TColumnEngineForLogs engine(0, TestLimits()); - engine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo)); - THashSet<TUnifiedBlobId> lostBlobs; - engine.Load(db, lostBlobs); - // read // TODO: read old snapshot @@ -580,7 +562,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { ui64 planStep = 1; TColumnEngineForLogs engine(0, TestLimits()); - engine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo)); + TSnapshot indexSnapshot(1, 1); + engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo)); THashSet<TUnifiedBlobId> lostBlobs; engine.Load(db, lostBlobs); @@ -596,7 +579,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] std::vector<TInsertedData> dataToIndex; dataToIndex.push_back( - TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now()}); + TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now(), indexSnapshot}); bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); // first overload returns ok: it's a postcondition @@ -634,7 +617,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] std::vector<TInsertedData> dataToIndex; dataToIndex.push_back( - TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now()}); + TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now(), indexSnapshot}); bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); bool overload = engine.GetOverloadedGranules(pathId); @@ -660,6 +643,12 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // insert ui64 planStep = 1; + TSnapshot indexSnapshot(1, 1); + TColumnEngineForLogs engine(0, TestLimits()); + engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo)); + THashSet<TUnifiedBlobId> lostBlobs; + 
engine.Load(db, lostBlobs); + THashMap<TBlobRange, TString> blobs; ui64 numRows = 1000; ui64 rowPos = 0; @@ -671,25 +660,18 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] std::vector<TInsertedData> dataToIndex; dataToIndex.push_back( - TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now()}); + TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now(), indexSnapshot}); - bool ok = Insert(tableInfo, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); + bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); } // compact planStep = 2; - bool ok = Compact(tableInfo, db, TSnapshot(planStep, 1), std::move(blobs), step, {20, 4, 4}); + bool ok = Compact(engine, db, TSnapshot(planStep, 1), std::move(blobs), step, {20, 4, 4}); UNIT_ASSERT(ok); - // load - - TColumnEngineForLogs engine(0, TestLimits()); - engine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo)); - THashSet<TUnifiedBlobId> lostBlobs; - engine.Load(db, lostBlobs); - // read planStep = 3; diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp index f3bb215b8dc..99a5fa59bdb 100644 --- a/ydb/core/tx/columnshard/read_actor.cpp +++ b/ydb/core/tx/columnshard/read_actor.cpp @@ -65,13 +65,13 @@ public: WaitIndexed.erase(event.BlobRange); IndexedData.AddIndexed(event.BlobRange, event.Data); } else if (CommittedBlobs.contains(blobId)) { - auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{blobId, NOlap::TSnapshot::Zero()}); + auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob::BuildKeyBlob(blobId)); if (cmt.empty()) { return; // ignore duplicates } const NOlap::TCommittedBlob& cmtBlob = cmt.key(); ui32 batchNo = cmt.mapped(); - IndexedData.AddNotIndexed(batchNo, event.Data, cmtBlob.GetSnapshot()); + IndexedData.AddNotIndexed(batchNo, event.Data, cmtBlob); } else { LOG_S_ERROR("TEvReadBlobRangeResult returned unexpected blob at tablet " << TabletId << " (read)"); @@ -181,12 +181,12 @@ public: // Add cached batches without read for (auto& [blobId, batch] : ReadMetadata->CommittedBatches) { - auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{blobId, NOlap::TSnapshot::Zero()}); + auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob::BuildKeyBlob(blobId)); Y_VERIFY(!cmt.empty()); const NOlap::TCommittedBlob& cmtBlob = cmt.key(); ui32 batchNo = cmt.mapped(); - IndexedData.AddNotIndexed(batchNo, batch, cmtBlob.GetSnapshot()); + IndexedData.AddNotIndexed(batchNo, batch, cmtBlob); } LOG_S_DEBUG("Starting read (" << WaitIndexed.size() << " indexed, " diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index 12673e9bdbe..299020b6bba 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -2568,7 +2568,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { } if (msg->IndexChanges->CompactionInfo) { ++compactionsHappened; - Cerr << "Compaction at snaphsot "<< msg->IndexChanges->ApplySnapshot + Cerr << "Compaction at snaphsot "<< msg->IndexChanges->InitSnapshot << " old portions:"; ui64 srcGranule{0}; for (const auto& portionInfo : msg->IndexChanges->SwitchedPortions) { @@ -2647,6 +2647,11 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { runtime.GetAppData().Icb->SetValue("ColumnShardControls.BlobCountToTriggerGC", 1, unusedPrev); } + { + TAtomic 
unusedPrev; + runtime.GetAppData().Icb->SetValue("ColumnShardControls.MaxPortionsInGranule", 10, unusedPrev); + } + // Write different keys: grow on compaction static const ui32 triggerPortionSize = 75 * 1000; |