diff options
author | ivanmorozov333 <[email protected]> | 2024-09-23 12:09:29 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2024-09-23 12:09:29 +0300 |
commit | 6862d3c0e8769366a7221ccabe621a18ff0549bf (patch) | |
tree | b1b8c848f05f608cbbdc565998d62fcb4a4b93dd | |
parent | b47a8b8f9c631d8bc6a79dc4c5cb839d1b8356e0 (diff) |
fix mvcc tests. use write id as row feature for conflicts resolving (#9598)
17 files changed, 156 insertions, 122 deletions
diff --git a/ydb/core/formats/arrow/arrow_filter.cpp b/ydb/core/formats/arrow/arrow_filter.cpp index c404a016f4b..62b47a66e24 100644 --- a/ydb/core/formats/arrow/arrow_filter.cpp +++ b/ydb/core/formats/arrow/arrow_filter.cpp @@ -575,8 +575,10 @@ TColumnFilter TColumnFilter::CombineSequentialAnd(const TColumnFilter& extFilter } TColumnFilter::TIterator TColumnFilter::GetIterator(const bool reverse, const ui32 expectedSize) const { - if ((IsTotalAllowFilter() || IsTotalDenyFilter()) && !Filter.size()) { - return TIterator(reverse, expectedSize, LastValue); + if (IsTotalAllowFilter()) { + return TIterator(reverse, expectedSize, true); + } else if (IsTotalDenyFilter()) { + return TIterator(reverse, expectedSize, false); } else { AFL_VERIFY(expectedSize == Size())("expected", expectedSize)("size", Size())("reverse", reverse); return TIterator(reverse, Filter, GetStartValue(reverse)); diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 62e345ba825..ec81be006de 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -7903,10 +7903,10 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { testHelper.ReadData("SELECT * FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[3;\"-321\";\"-3.14\";[\"test_res_3\"]]]"); testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[\"-3.14\"]]"); testHelper.ReadData("SELECT resource_id FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[[\"test_res_3\"]]]"); - testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest`", "[[#];[#];[\"-3.14\"]]"); + testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest` ORDER BY new_column", "[[#];[#];[\"-3.14\"]]"); testHelper.RebootTablets(testTable.GetName()); - testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest`", "[[#];[#];[\"-3.14\"]]"); + testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest` ORDER BY new_column", "[[#];[#];[\"-3.14\"]]"); } Y_UNIT_TEST(AddColumnErrors) { diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto index 52b2398d7f0..f8c4d1c49aa 100644 --- a/ydb/core/protos/feature_flags.proto +++ b/ydb/core/protos/feature_flags.proto @@ -162,5 +162,6 @@ message TFeatureFlags { optional bool EnableExternalDataSourcesOnServerless = 143 [default = true]; optional bool EnableSparsedColumns = 144 [default = false]; optional bool EnableParameterizedDecimal = 145 [default = false]; - optional bool EnableImmediateWritingOnBulkUpsert = 146 [default = false]; + optional bool EnableImmediateWritingOnBulkUpsert = 146 [default = false]; + optional bool EnableInsertWriteIdSpecialColumnCompatibility = 147 [default = false]; } diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index 8b104b9dcd5..4f08426ddd7 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -306,9 +306,10 @@ struct Schema : NIceDb::Schema { struct BlobRangeOffset: Column<11, NScheme::NTypeIds::Uint64> {}; struct BlobRangeSize: Column<12, NScheme::NTypeIds::Uint64> {}; + struct InsertWriteId: Column<13, NScheme::NTypeIds::Uint64> {}; using TKey = TableKey<Committed, PlanStep, WriteTxId, PathId, DedupId>; - using TColumns = TableColumns<Committed, PlanStep, WriteTxId, PathId, DedupId, BlobId, Meta, IndexPlanStep, IndexTxId, SchemaVersion, BlobRangeOffset, BlobRangeSize>; + using TColumns = TableColumns<Committed, PlanStep, WriteTxId, PathId, DedupId, BlobId, Meta, IndexPlanStep, IndexTxId, SchemaVersion, BlobRangeOffset, BlobRangeSize, InsertWriteId>; }; struct IndexGranules : NIceDb::Schema::Table<GranulesTableId> { @@ -808,6 +809,7 @@ struct Schema : NIceDb::Schema { .Key((ui8)recType, 0, (ui64)data.GetInsertWriteId(), data.GetPathId(), "") .Update(NIceDb::TUpdate<InsertTable::BlobId>(data.GetBlobRange().GetBlobId().ToStringLegacy()), NIceDb::TUpdate<InsertTable::BlobRangeOffset>(data.GetBlobRange().Offset), + NIceDb::TUpdate<InsertTable::InsertWriteId>((ui64)data.GetInsertWriteId()), NIceDb::TUpdate<InsertTable::BlobRangeSize>(data.GetBlobRange().Size), NIceDb::TUpdate<InsertTable::Meta>(data.GetMeta().SerializeToProto().SerializeAsString()), NIceDb::TUpdate<InsertTable::SchemaVersion>(data.GetSchemaVersion())); @@ -818,6 +820,7 @@ struct Schema : NIceDb::Schema { .Key((ui8)EInsertTableIds::Committed, data.GetSnapshot().GetPlanStep(), data.GetSnapshot().GetTxId(), data.GetPathId(), data.GetDedupId()) .Update(NIceDb::TUpdate<InsertTable::BlobId>(data.GetBlobRange().GetBlobId().ToStringLegacy()), + NIceDb::TUpdate<InsertTable::InsertWriteId>((ui64)data.GetInsertWriteId()), NIceDb::TUpdate<InsertTable::BlobRangeOffset>(data.GetBlobRange().Offset), NIceDb::TUpdate<InsertTable::BlobRangeSize>(data.GetBlobRange().Size), NIceDb::TUpdate<InsertTable::Meta>(data.GetMeta().SerializeToProto().SerializeAsString()), @@ -982,6 +985,7 @@ private: NColumnShard::Schema::EInsertTableIds RecType; ui64 PlanStep; ui64 WriteTxId; + TInsertWriteId InsertWriteId; ui64 PathId; YDB_ACCESSOR_DEF(TString, DedupId); ui64 SchemaVersion; @@ -989,8 +993,8 @@ private: std::optional<NOlap::TUnifiedBlobId> BlobId; TString MetadataString; std::optional<NKikimrTxColumnShard::TLogicalMetadata> Metadata; - std::optional<ui64> RangeOffset; - std::optional<ui64> RangeSize; + ui64 RangeOffset; + ui64 RangeSize; void Prepare(const IBlobGroupSelector* dsGroupSelector) { AFL_VERIFY(!PreparedFlag); @@ -1004,7 +1008,6 @@ private: AFL_VERIFY(MetadataString); Y_ABORT_UNLESS(meta.ParseFromString(MetadataString)); Metadata = std::move(meta); - AFL_VERIFY(!!RangeOffset == !!RangeSize); } bool PreparedFlag = false; @@ -1013,8 +1016,13 @@ private: public: TInsertWriteId GetInsertWriteId() const { AFL_VERIFY(ParsedFlag); - AFL_VERIFY(RecType != NColumnShard::Schema::EInsertTableIds::Committed); - return (TInsertWriteId)WriteTxId; + return InsertWriteId; + } + + ui64 GetTxId() const { + AFL_VERIFY(ParsedFlag); + AFL_VERIFY(RecType == NColumnShard::Schema::EInsertTableIds::Committed); + return WriteTxId; } NColumnShard::Schema::EInsertTableIds GetRecType() const { @@ -1024,6 +1032,7 @@ public: ui64 GetPlanStep() const { AFL_VERIFY(ParsedFlag); + AFL_VERIFY(RecType == NColumnShard::Schema::EInsertTableIds::Committed); return PlanStep; } @@ -1035,19 +1044,12 @@ public: void Upsert(NIceDb::TNiceDb& db) const { AFL_VERIFY(ParsedFlag); using namespace NColumnShard; - if (RangeOffset) { - db.Table<Schema::InsertTable>() - .Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId) - .Update(NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString), - NIceDb::TUpdate<Schema::InsertTable::BlobRangeOffset>(*RangeOffset), - NIceDb::TUpdate<Schema::InsertTable::BlobRangeSize>(*RangeSize), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString), - NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion)); - } else { - db.Table<Schema::InsertTable>() - .Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId) - .Update(NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString), - NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion)); - } + db.Table<Schema::InsertTable>() + .Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId) + .Update(NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString), + NIceDb::TUpdate<Schema::InsertTable::BlobRangeOffset>(RangeOffset), + NIceDb::TUpdate<Schema::InsertTable::BlobRangeSize>(RangeSize), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString), + NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion)); } template <class TRowset> @@ -1059,41 +1061,40 @@ public: PlanStep = rowset.template GetValue<Schema::InsertTable::PlanStep>(); WriteTxId = rowset.template GetValueOrDefault<Schema::InsertTable::WriteTxId>(); AFL_VERIFY(WriteTxId); + InsertWriteId = (TInsertWriteId)rowset.template GetValueOrDefault<Schema::InsertTable::InsertWriteId>(WriteTxId); PathId = rowset.template GetValue<Schema::InsertTable::PathId>(); DedupId = rowset.template GetValue<Schema::InsertTable::DedupId>(); - SchemaVersion = - rowset.template HaveValue<Schema::InsertTable::SchemaVersion>() ? rowset.template GetValue<Schema::InsertTable::SchemaVersion>() : 0; + SchemaVersion = rowset.template GetValueOrDefault<Schema::InsertTable::SchemaVersion>(0); BlobIdString = rowset.template GetValue<Schema::InsertTable::BlobId>(); MetadataString = rowset.template GetValue<Schema::InsertTable::Meta>(); - if (rowset.template HaveValue<Schema::InsertTable::BlobRangeOffset>()) { - RangeOffset = rowset.template GetValue<Schema::InsertTable::BlobRangeOffset>(); - } - if (rowset.template HaveValue<Schema::InsertTable::BlobRangeSize>()) { - RangeSize = rowset.template GetValue<Schema::InsertTable::BlobRangeSize>(); - } + AFL_VERIFY(rowset.template HaveValue<Schema::InsertTable::BlobRangeOffset>()); + AFL_VERIFY(rowset.template HaveValue<Schema::InsertTable::BlobRangeSize>()); + RangeOffset = rowset.template GetValue<Schema::InsertTable::BlobRangeOffset>(); + RangeSize = rowset.template GetValue<Schema::InsertTable::BlobRangeSize>(); } NOlap::TCommittedData BuildCommitted(const IBlobGroupSelector* dsGroupSelector) { Prepare(dsGroupSelector); using namespace NColumnShard; AFL_VERIFY(RecType == Schema::EInsertTableIds::Committed); - auto userData = std::make_shared<NOlap::TUserData>(PathId, - NOlap::TBlobRange(*BlobId, RangeOffset.value_or(0), RangeSize.value_or(BlobId->BlobSize())), *Metadata, SchemaVersion, std::nullopt); + auto userData = std::make_shared<NOlap::TUserData>( + PathId, NOlap::TBlobRange(*BlobId, RangeOffset, RangeSize), *Metadata, SchemaVersion, std::nullopt); AFL_VERIFY(!!DedupId); AFL_VERIFY(PlanStep); - return NOlap::TCommittedData(userData, PlanStep, WriteTxId, DedupId); + return NOlap::TCommittedData(userData, PlanStep, WriteTxId, InsertWriteId, DedupId); } NOlap::TInsertedData BuildInsertedOrAborted(const IBlobGroupSelector* dsGroupSelector) { Prepare(dsGroupSelector); using namespace NColumnShard; + AFL_VERIFY(InsertWriteId == (TInsertWriteId)WriteTxId)("insert", InsertWriteId)("write", WriteTxId); AFL_VERIFY(RecType != Schema::EInsertTableIds::Committed); - auto userData = std::make_shared<NOlap::TUserData>(PathId, - NOlap::TBlobRange(*BlobId, RangeOffset.value_or(0), RangeSize.value_or(BlobId->BlobSize())), *Metadata, SchemaVersion, std::nullopt); + auto userData = std::make_shared<NOlap::TUserData>( + PathId, NOlap::TBlobRange(*BlobId, RangeOffset, RangeSize), *Metadata, SchemaVersion, std::nullopt); AFL_VERIFY(!DedupId); AFL_VERIFY(!PlanStep); - return NOlap::TInsertedData((TInsertWriteId)WriteTxId, userData); + return NOlap::TInsertedData(InsertWriteId, userData); } }; diff --git a/ydb/core/tx/columnshard/common/portion.h b/ydb/core/tx/columnshard/common/portion.h index d8497d5174a..311cfa23269 100644 --- a/ydb/core/tx/columnshard/common/portion.h +++ b/ydb/core/tx/columnshard/common/portion.h @@ -17,10 +17,12 @@ class TSpecialColumns { public: static constexpr const char* SPEC_COL_PLAN_STEP = "_yql_plan_step"; static constexpr const char* SPEC_COL_TX_ID = "_yql_tx_id"; + static constexpr const char* SPEC_COL_WRITE_ID = "_yql_write_id"; static constexpr const char* SPEC_COL_DELETE_FLAG = "_yql_delete_flag"; static const ui32 SPEC_COL_PLAN_STEP_INDEX = 0xffffff00; static const ui32 SPEC_COL_TX_ID_INDEX = SPEC_COL_PLAN_STEP_INDEX + 1; - static const ui32 SPEC_COL_DELETE_FLAG_INDEX = SPEC_COL_PLAN_STEP_INDEX + 2; + static const ui32 SPEC_COL_WRITE_ID_INDEX = SPEC_COL_PLAN_STEP_INDEX + 2; + static const ui32 SPEC_COL_DELETE_FLAG_INDEX = SPEC_COL_PLAN_STEP_INDEX + 3; }; } diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp index 43942643986..825f65f8010 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp @@ -76,6 +76,10 @@ std::vector<TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared TMergingContext mergingContext(batchResults, Batches); for (auto&& [columnId, columnData] : columnsData) { + if (columnId == (ui32)IIndexInfo::ESpecialColumn::WRITE_ID && + (!HasAppData() || !AppDataVerified().FeatureFlags.GetEnableInsertWriteIdSpecialColumnCompatibility())) { + continue; + } const TString& columnName = resultFiltered->GetIndexInfo().GetColumnName(columnId); NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("field_name", columnName)); auto columnInfo = stats->GetColumnInfo(columnId); @@ -125,13 +129,6 @@ std::vector<TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared AFL_VERIFY(i.second.size() == columnChunks.begin()->second.size())("first", columnChunks.begin()->second.size())( "current", i.second.size())("first_name", columnChunks.begin()->first)("current_name", i.first); } - auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP); - auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID); - Y_ABORT_UNLESS(columnSnapshotPlanStepIdx); - Y_ABORT_UNLESS(columnSnapshotTxIdx); - Y_ABORT_UNLESS(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id); - Y_ABORT_UNLESS(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id); - std::vector<TGeneralSerializedSlice> batchSlices; std::shared_ptr<TDefaultSchemaDetails> schemaDetails(new TDefaultSchemaDetails(resultFiltered, stats)); diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index ea7b6ddc2eb..2f76ab4b177 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -114,6 +114,7 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks( if (dataColumnIds.contains((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG)) { pkColumnIds.emplace((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG); } + dataColumnIds.emplace((ui32)IIndexInfo::ESpecialColumn::WRITE_ID); } resultFiltered = std::make_shared<TFilteredSnapshotSchema>(resultSchema, dataColumnIds); { diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index 22ca7fd2c73..edce92470ad 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -244,7 +244,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont batch = std::make_shared<NArrow::TGeneralContainer>(NArrow::DeserializeBatch(blobData, batchSchema)); blobSchema->AdaptBatchToSchema(*batch, resultSchema); } - IIndexInfo::AddSnapshotColumns(*batch, inserted.GetSnapshot()); + IIndexInfo::AddSnapshotColumns(*batch, inserted.GetSnapshot(), (ui64)inserted.GetInsertWriteId()); auto& pathInfo = pathBatches.GetPathInfo(inserted.GetPathId()); diff --git a/ydb/core/tx/columnshard/engines/insert_table/committed.h b/ydb/core/tx/columnshard/engines/insert_table/committed.h index b075d9b8a39..141ca4cb562 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/committed.h +++ b/ydb/core/tx/columnshard/engines/insert_table/committed.h @@ -9,6 +9,7 @@ class TCommittedData: public TUserDataContainer { private: using TBase = TUserDataContainer; YDB_READONLY(TSnapshot, Snapshot, NOlap::TSnapshot::Zero()); + YDB_READONLY(TInsertWriteId, InsertWriteId, (TInsertWriteId)0); YDB_READONLY_DEF(TString, DedupId); YDB_READONLY(bool, Remove, false); @@ -16,19 +17,23 @@ public: TCommittedData(const std::shared_ptr<TUserData>& userData, const ui64 planStep, const ui64 txId, const TInsertWriteId insertWriteId) : TBase(userData) , Snapshot(planStep, txId) + , InsertWriteId(insertWriteId) , DedupId(ToString(planStep) + ":" + ToString((ui64)insertWriteId)) { } - TCommittedData(const std::shared_ptr<TUserData>& userData, const ui64 planStep, const ui64 txId, const TString& dedupId) + TCommittedData(const std::shared_ptr<TUserData>& userData, const ui64 planStep, const ui64 txId, const TInsertWriteId insertWriteId, + const TString& dedupId) : TBase(userData) , Snapshot(planStep, txId) + , InsertWriteId(insertWriteId) , DedupId(dedupId) { } TCommittedData(const std::shared_ptr<TUserData>& userData, const TSnapshot& ss, const ui64 generation, const TInsertWriteId ephemeralWriteId) : TBase(userData) , Snapshot(ss) - , DedupId(ToString(generation) + ":" + ToString(ephemeralWriteId)) { + , InsertWriteId(ephemeralWriteId) + , DedupId(ToString(generation) + ":" + ToString((ui64)ephemeralWriteId)) { } void SetRemove() { @@ -52,7 +57,8 @@ public: class TCommittedBlob { private: TBlobRange BlobRange; - std::variant<TSnapshot, TInsertWriteId> WriteInfo; + std::optional<TSnapshot> CommittedSnapshot; + const TInsertWriteId InsertWriteId; YDB_READONLY(ui64, SchemaVersion, 0); YDB_READONLY(ui64, RecordsCount, 0); YDB_READONLY(bool, IsDelete, false); @@ -61,6 +67,31 @@ private: YDB_READONLY_DEF(NArrow::TSchemaSubset, SchemaSubset); public: + const std::optional<TSnapshot>& GetCommittedSnapshot() const { + return CommittedSnapshot; + } + + const TSnapshot& GetCommittedSnapshotDef(const TSnapshot& def) const { + if (CommittedSnapshot) { + return *CommittedSnapshot; + } else { + return def; + } + } + + const TSnapshot& GetCommittedSnapshotVerified() const { + AFL_VERIFY(!!CommittedSnapshot); + return *CommittedSnapshot; + } + + bool IsCommitted() const { + return !!CommittedSnapshot; + } + + TInsertWriteId GetInsertWriteId() const { + return InsertWriteId; + } + const NArrow::TReplaceKey& GetFirst() const { return First; } @@ -72,11 +103,12 @@ public: return BlobRange.Size; } - TCommittedBlob(const TBlobRange& blobRange, const TSnapshot& snapshot, const ui64 schemaVersion, const ui64 recordsCount, + TCommittedBlob(const TBlobRange& blobRange, const TSnapshot& snapshot, const TInsertWriteId insertWriteId, const ui64 schemaVersion, const ui64 recordsCount, const NArrow::TReplaceKey& first, const NArrow::TReplaceKey& last, const bool isDelete, const NArrow::TSchemaSubset& subset) : BlobRange(blobRange) - , WriteInfo(snapshot) + , CommittedSnapshot(snapshot) + , InsertWriteId(insertWriteId) , SchemaVersion(schemaVersion) , RecordsCount(recordsCount) , IsDelete(isDelete) @@ -85,11 +117,11 @@ public: , SchemaSubset(subset) { } - TCommittedBlob(const TBlobRange& blobRange, const TInsertWriteId writeId, const ui64 schemaVersion, const ui64 recordsCount, + TCommittedBlob(const TBlobRange& blobRange, const TInsertWriteId insertWriteId, const ui64 schemaVersion, const ui64 recordsCount, const NArrow::TReplaceKey& first, const NArrow::TReplaceKey& last, const bool isDelete, const NArrow::TSchemaSubset& subset) : BlobRange(blobRange) - , WriteInfo(writeId) + , InsertWriteId(insertWriteId) , SchemaVersion(schemaVersion) , RecordsCount(recordsCount) , IsDelete(isDelete) @@ -107,43 +139,13 @@ public: return BlobRange.Hash(); } TString DebugString() const { - if (auto* ss = GetSnapshotOptional()) { - return TStringBuilder() << BlobRange << ";snapshot=" << ss->DebugString(); - } else { - return TStringBuilder() << BlobRange << ";write_id=" << (ui64)GetWriteIdVerified(); + TStringBuilder sb; + sb << BlobRange; + if (CommittedSnapshot) { + sb << ";snapshot=" << CommittedSnapshot->DebugString(); } - } - - bool HasSnapshot() const { - return GetSnapshotOptional(); - } - - const TSnapshot& GetSnapshotDef(const TSnapshot& def) const { - if (auto* snapshot = GetSnapshotOptional()) { - return *snapshot; - } else { - return def; - } - } - - const TSnapshot* GetSnapshotOptional() const { - return std::get_if<TSnapshot>(&WriteInfo); - } - - const TSnapshot& GetSnapshotVerified() const { - auto* result = GetSnapshotOptional(); - AFL_VERIFY(result); - return *result; - } - - const TInsertWriteId* GetWriteIdOptional() const { - return std::get_if<TInsertWriteId>(&WriteInfo); - } - - TInsertWriteId GetWriteIdVerified() const { - auto* result = GetWriteIdOptional(); - AFL_VERIFY(result); - return *result; + sb << ";write_id=" << GetInsertWriteId(); + return sb; } const TBlobRange& GetBlobRange() const { diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp index 7b8bd9334ac..56a4f730a42 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp @@ -148,7 +148,7 @@ std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const std::optional< if (pkRangesFilter && pkRangesFilter->IsPortionInPartialUsage(start, finish) == TPKRangeFilter::EUsageClass::DontUsage) { continue; } - result.emplace_back(TCommittedBlob(data.GetBlobRange(), data.GetSnapshot(), data.GetSchemaVersion(), data.GetMeta().GetNumRows(), + result.emplace_back(TCommittedBlob(data.GetBlobRange(), data.GetSnapshot(), data.GetInsertWriteId(), data.GetSchemaVersion(), data.GetMeta().GetNumRows(), start, finish, data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset())); } } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp index c24fbe0577a..5d93272c07b 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp @@ -34,11 +34,11 @@ TConclusionStatus TReadMetadata::Init( if (LockId) { for (auto&& i : CommittedBlobs) { - if (auto writeId = i.GetWriteIdOptional()) { - if (owner->HasLongTxWrites(*writeId)) { + if (!i.IsCommitted()) { + if (owner->HasLongTxWrites(i.GetInsertWriteId())) { } else { - auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(*writeId); - AddWriteIdToCheck(*writeId, op->GetLockId()); + auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(i.GetInsertWriteId()); + AddWriteIdToCheck(i.GetInsertWriteId(), op->GetLockId()); } } } @@ -125,7 +125,7 @@ void TReadMetadata::DoOnReplyConstruction(const ui64 tabletId, NKqp::NInternalIm bool TReadMetadata::IsMyUncommitted(const TInsertWriteId writeId) const { AFL_VERIFY(LockSharingInfo); auto it = ConflictedWriteIds.find(writeId); - AFL_VERIFY(it != ConflictedWriteIds.end()); + AFL_VERIFY(it != ConflictedWriteIds.end())("write_id", writeId)("write_ids_count", ConflictedWriteIds.size()); return it->second.GetLockId() == LockSharingInfo->GetLockId(); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp index 04ed0d1c6f2..5780b321918 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp @@ -22,21 +22,21 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context) sources.emplace_back(std::make_shared<TPortionDataSource>(sourceIdx++, i, SpecialReadContext)); } for (auto&& i : committed) { - if (i.HasSnapshot()) { + if (i.IsCommitted()) { continue; } - if (GetReadMetadata()->IsMyUncommitted(i.GetWriteIdVerified())) { + if (GetReadMetadata()->IsMyUncommitted(i.GetInsertWriteId())) { continue; } if (GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetFirst()) || GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetLast())) { - GetReadMetadata()->SetConflictedWriteId(i.GetWriteIdVerified()); + GetReadMetadata()->SetConflictedWriteId(i.GetInsertWriteId()); } } for (auto&& i : committed) { - if (!i.HasSnapshot()) { - if (GetReadMetadata()->IsWriteConflictable(i.GetWriteIdVerified())) { + if (!i.IsCommitted()) { + if (GetReadMetadata()->IsWriteConflictable(i.GetInsertWriteId())) { continue; } } else if (GetReadMetadata()->GetPKRangesFilter().IsPortionInPartialUsage(i.GetFirst(), i.GetLast()) == diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp index bef10d38f6b..5e4d80fbfe4 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp @@ -225,9 +225,20 @@ void TCommittedDataSource::DoAssembleColumns(const std::shared_ptr<TColumnsSet>& AFL_VERIFY(rBatch)("schema", schema->ToString()); auto batch = std::make_shared<NArrow::TGeneralContainer>(rBatch); batchSchema->AdaptBatchToSchema(*batch, resultSchema); - GetContext()->GetReadMetadata()->GetIndexInfo().AddSnapshotColumns(*batch, CommittedBlob.GetSnapshotDef(TSnapshot::Zero())); + TSnapshot ss = TSnapshot::Zero(); + if (CommittedBlob.IsCommitted()) { + ss = CommittedBlob.GetCommittedSnapshotVerified(); + } else { + ss = GetContext()->GetReadMetadata()->IsMyUncommitted(CommittedBlob.GetInsertWriteId()) + ? GetContext()->GetReadMetadata()->GetRequestSnapshot() + : TSnapshot::Zero(); + } + GetContext()->GetReadMetadata()->GetIndexInfo().AddSnapshotColumns(*batch, ss, (ui64)CommittedBlob.GetInsertWriteId()); GetContext()->GetReadMetadata()->GetIndexInfo().AddDeleteFlagsColumn(*batch, CommittedBlob.GetIsDelete()); MutableStageData().AddBatch(batch); + if (CommittedBlob.GetIsDelete()) { + MutableStageData().AddFilter(NArrow::TColumnFilter::BuildDenyFilter()); + } } MutableStageData().SyncTableColumns(columns->GetSchema()->fields(), *resultSchema); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h index 889f9fe5e7d..69b39059bff 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h @@ -420,11 +420,11 @@ private: } virtual bool DoAddTxConflict() override { - if (CommittedBlob.HasSnapshot()) { + if (CommittedBlob.IsCommitted()) { GetContext()->GetReadMetadata()->SetBrokenWithCommitted(); return true; - } else if (!GetContext()->GetReadMetadata()->IsMyUncommitted(CommittedBlob.GetWriteIdVerified())) { - GetContext()->GetReadMetadata()->SetConflictedWriteId(CommittedBlob.GetWriteIdVerified()); + } else if (!GetContext()->GetReadMetadata()->IsMyUncommitted(CommittedBlob.GetInsertWriteId())) { + GetContext()->GetReadMetadata()->SetConflictedWriteId(CommittedBlob.GetInsertWriteId()); return true; } return false; @@ -467,8 +467,8 @@ public: } TCommittedDataSource(const ui32 sourceIdx, const TCommittedBlob& committed, const std::shared_ptr<TSpecialReadContext>& context) - : TBase(sourceIdx, context, committed.GetFirst(), committed.GetLast(), committed.GetSnapshotDef(TSnapshot::Zero()), - committed.GetSnapshotDef(TSnapshot::Zero()), committed.GetRecordsCount(), {}, committed.GetIsDelete()) + : TBase(sourceIdx, context, committed.GetFirst(), committed.GetLast(), committed.GetCommittedSnapshotDef(TSnapshot::Zero()), + committed.GetCommittedSnapshotDef(TSnapshot::Zero()), committed.GetRecordsCount(), {}, committed.GetIsDelete()) , CommittedBlob(committed) { } }; diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.cpp index 78fae44fb6d..c1c31cb5487 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.cpp @@ -21,11 +21,12 @@ void IIndexInfo::AddDeleteFlagsColumn(NArrow::TGeneralContainer& batch, const bo NArrow::TThreadSimpleArraysCache::GetConst(arrow::boolean(), std::make_shared<arrow::BooleanScalar>(isDelete), numRows)).Validate(); } -void IIndexInfo::AddSnapshotColumns(NArrow::TGeneralContainer& batch, const TSnapshot& snapshot) { +void IIndexInfo::AddSnapshotColumns(NArrow::TGeneralContainer& batch, const TSnapshot& snapshot, const ui64 insertWriteId) { const i64 numRows = batch.num_rows(); batch.AddField(arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()), NArrow::MakeUI64Array(snapshot.GetPlanStep(), numRows)).Validate(); batch.AddField(arrow::field(SPEC_COL_TX_ID, arrow::uint64()), NArrow::MakeUI64Array(snapshot.GetTxId(), numRows)).Validate(); + batch.AddField(arrow::field(SPEC_COL_WRITE_ID, arrow::uint64()), NArrow::MakeUI64Array(insertWriteId, numRows)).Validate(); } void IIndexInfo::NormalizeDeletionColumn(NArrow::TGeneralContainer& batch) { @@ -40,6 +41,8 @@ std::optional<ui32> IIndexInfo::GetColumnIdOptional(const std::string& name) con return ui32(ESpecialColumn::PLAN_STEP); } else if (name == SPEC_COL_TX_ID) { return ui32(ESpecialColumn::TX_ID); + } else if (name == SPEC_COL_WRITE_ID) { + return ui32(ESpecialColumn::WRITE_ID); } else if (name == SPEC_COL_DELETE_FLAG) { return ui32(ESpecialColumn::DELETE_FLAG); } @@ -51,8 +54,10 @@ std::optional<ui32> IIndexInfo::GetColumnIndexOptional(const std::string& name, return shift + 0; } else if (name == SPEC_COL_TX_ID) { return shift + 1; - } else if (name == SPEC_COL_DELETE_FLAG) { + } else if (name == SPEC_COL_WRITE_ID) { return shift + 2; + } else if (name == SPEC_COL_DELETE_FLAG) { + return shift + 3; } return {}; } @@ -62,6 +67,8 @@ TString IIndexInfo::GetColumnName(const ui32 id, const bool required) const { return SPEC_COL_PLAN_STEP; } else if (ESpecialColumn(id) == ESpecialColumn::TX_ID) { return SPEC_COL_TX_ID; + } else if (ESpecialColumn(id) == ESpecialColumn::WRITE_ID) { + return SPEC_COL_WRITE_ID; } else if (ESpecialColumn(id) == ESpecialColumn::DELETE_FLAG) { return SPEC_COL_DELETE_FLAG; } else { @@ -88,6 +95,8 @@ std::shared_ptr<arrow::Field> IIndexInfo::GetColumnFieldOptional(const ui32 colu return ArrowSchemaSnapshot()->field(0); } else if (ESpecialColumn(columnId) == ESpecialColumn::TX_ID) { return ArrowSchemaSnapshot()->field(1); + } else if (ESpecialColumn(columnId) == ESpecialColumn::WRITE_ID) { + return ArrowSchemaSnapshot()->field(2); } else if (ESpecialColumn(columnId) == ESpecialColumn::DELETE_FLAG) { return ArrowSchemaDeletion()->field(0); } else { @@ -106,6 +115,8 @@ std::shared_ptr<arrow::Scalar> IIndexInfo::DefaultColumnValue(const ui32 colId) return nullptr; } else if (colId == (ui32)ESpecialColumn::TX_ID) { return nullptr; + } else if (colId == (ui32)ESpecialColumn::WRITE_ID) { + return nullptr; } else if (colId == (ui32)ESpecialColumn::DELETE_FLAG) { static const std::shared_ptr<arrow::Scalar> deleteDefault(new arrow::BooleanScalar(false)); return deleteDefault; diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h b/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h index c986fe6b2aa..19fbe2267a7 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h @@ -16,7 +16,8 @@ public: enum class ESpecialColumn : ui32 { PLAN_STEP = NOlap::NPortion::TSpecialColumns::SPEC_COL_PLAN_STEP_INDEX, TX_ID = NOlap::NPortion::TSpecialColumns::SPEC_COL_TX_ID_INDEX, - DELETE_FLAG = NOlap::NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG_INDEX + WRITE_ID = NOlap::NPortion::TSpecialColumns::SPEC_COL_WRITE_ID_INDEX, + DELETE_FLAG = NOlap::NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG_INDEX, }; using TSystemColumnsSet = ui64; @@ -28,6 +29,7 @@ public: static constexpr const char* SPEC_COL_PLAN_STEP = NOlap::NPortion::TSpecialColumns::SPEC_COL_PLAN_STEP; static constexpr const char* SPEC_COL_TX_ID = NOlap::NPortion::TSpecialColumns::SPEC_COL_TX_ID; + static constexpr const char* SPEC_COL_WRITE_ID = NOlap::NPortion::TSpecialColumns::SPEC_COL_WRITE_ID; static constexpr const char* SPEC_COL_DELETE_FLAG = NOlap::NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG; static const char* GetDeleteFlagColumnName() { @@ -35,17 +37,18 @@ public: } static const std::set<ui32>& GetNecessarySystemColumnIdsSet() { - static const std::set<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID }; + static const std::set<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID, (ui32)ESpecialColumn::WRITE_ID }; return result; } static const std::vector<std::string>& GetSnapshotColumnNames() { - static const std::vector<std::string> result = { std::string(SPEC_COL_PLAN_STEP), std::string(SPEC_COL_TX_ID) }; + static const std::vector<std::string> result = { std::string(SPEC_COL_PLAN_STEP), std::string(SPEC_COL_TX_ID), + std::string(SPEC_COL_WRITE_ID) }; return result; } static const std::vector<ui32>& GetSnapshotColumnIds() { - static const std::vector<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID }; + static const std::vector<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID, (ui32)ESpecialColumn::WRITE_ID }; return result; } @@ -85,8 +88,10 @@ public: static void AddSnapshotFields(std::vector<std::shared_ptr<arrow::Field>>& fields) { static const std::shared_ptr<arrow::Field> ps = arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()); static const std::shared_ptr<arrow::Field> txid = arrow::field(SPEC_COL_TX_ID, arrow::uint64()); + static const std::shared_ptr<arrow::Field> writeId = arrow::field(SPEC_COL_WRITE_ID, arrow::uint64()); fields.push_back(ps); fields.push_back(txid); + fields.push_back(writeId); } static void AddDeleteFields(std::vector<std::shared_ptr<arrow::Field>>& fields) { @@ -94,18 +99,18 @@ public: } static const std::set<ui32>& GetSnapshotColumnIdsSet() { - static const std::set<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID }; + static const std::set<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID, (ui32)ESpecialColumn::WRITE_ID }; return result; } static const std::vector<std::string>& GetSystemColumnNames() { static const std::vector<std::string> result = { std::string(SPEC_COL_PLAN_STEP), std::string(SPEC_COL_TX_ID), - std::string(SPEC_COL_DELETE_FLAG) }; + std::string(SPEC_COL_WRITE_ID), std::string(SPEC_COL_DELETE_FLAG) }; return result; } static const std::vector<ui32>& GetSystemColumnIds() { - static const std::vector<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID, + static const std::vector<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID, (ui32)ESpecialColumn::WRITE_ID, (ui32)ESpecialColumn::DELETE_FLAG }; return result; } @@ -143,7 +148,7 @@ public: static void NormalizeDeletionColumn(NArrow::TGeneralContainer& batch); - static void AddSnapshotColumns(NArrow::TGeneralContainer& batch, const TSnapshot& snapshot); + static void AddSnapshotColumns(NArrow::TGeneralContainer& batch, const TSnapshot& snapshot, const ui64 insertWriteId); static void AddDeleteFlagsColumn(NArrow::TGeneralContainer& batch, const bool isDelete); static ui64 GetSpecialColumnsRecordSize() { @@ -151,8 +156,8 @@ public: } static std::shared_ptr<arrow::Schema> ArrowSchemaSnapshot() { - static std::shared_ptr<arrow::Schema> result = std::make_shared<arrow::Schema>( - arrow::FieldVector{ arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()), arrow::field(SPEC_COL_TX_ID, arrow::uint64()) }); + static std::shared_ptr<arrow::Schema> result = std::make_shared<arrow::Schema>(arrow::FieldVector{ arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()), + arrow::field(SPEC_COL_TX_ID, arrow::uint64()), arrow::field(SPEC_COL_WRITE_ID, arrow::uint64()) }); return result; } @@ -167,12 +172,13 @@ public: } static bool IsSpecialColumn(const std::string& fieldName) { - return fieldName == SPEC_COL_PLAN_STEP || fieldName == SPEC_COL_TX_ID || fieldName == SPEC_COL_DELETE_FLAG; + return fieldName == SPEC_COL_PLAN_STEP || fieldName == SPEC_COL_TX_ID || fieldName == SPEC_COL_WRITE_ID || + fieldName == SPEC_COL_DELETE_FLAG; } static bool IsSpecialColumn(const ui32 fieldId) { return fieldId == (ui32)ESpecialColumn::PLAN_STEP || fieldId == (ui32)ESpecialColumn::TX_ID || - fieldId == (ui32)ESpecialColumn::DELETE_FLAG; + fieldId == (ui32)ESpecialColumn::WRITE_ID || fieldId == (ui32)ESpecialColumn::DELETE_FLAG; } static bool IsNullableVerified(const ui32 /*fieldId*/) { diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp index 622c022a168..38dc6ffa044 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp @@ -497,7 +497,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { ui64 txId = 1; auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false)); UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK[0]->NumChunks(), columnIds.size() + TIndexInfo::GetSnapshotColumnIdsSet().size()); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK[0]->NumChunks(), columnIds.size() + TIndexInfo::GetSnapshotColumnIdsSet().size() - 1); } { // select another pathId |