summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <[email protected]>2024-09-23 12:09:29 +0300
committerGitHub <[email protected]>2024-09-23 12:09:29 +0300
commit6862d3c0e8769366a7221ccabe621a18ff0549bf (patch)
treeb1b8c848f05f608cbbdc565998d62fcb4a4b93dd
parentb47a8b8f9c631d8bc6a79dc4c5cb839d1b8356e0 (diff)
fix mvcc tests. use write id as row feature for conflicts resolving (#9598)
-rw-r--r--ydb/core/formats/arrow/arrow_filter.cpp6
-rw-r--r--ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp4
-rw-r--r--ydb/core/protos/feature_flags.proto3
-rw-r--r--ydb/core/tx/columnshard/columnshard_schema.h67
-rw-r--r--ydb/core/tx/columnshard/common/portion.h4
-rw-r--r--ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp11
-rw-r--r--ydb/core/tx/columnshard/engines/changes/general_compaction.cpp1
-rw-r--r--ydb/core/tx/columnshard/engines/changes/indexation.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/committed.h88
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp10
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp10
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp13
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h10
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/abstract/index_info.cpp15
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h30
-rw-r--r--ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp2
17 files changed, 156 insertions, 122 deletions
diff --git a/ydb/core/formats/arrow/arrow_filter.cpp b/ydb/core/formats/arrow/arrow_filter.cpp
index c404a016f4b..62b47a66e24 100644
--- a/ydb/core/formats/arrow/arrow_filter.cpp
+++ b/ydb/core/formats/arrow/arrow_filter.cpp
@@ -575,8 +575,10 @@ TColumnFilter TColumnFilter::CombineSequentialAnd(const TColumnFilter& extFilter
}
TColumnFilter::TIterator TColumnFilter::GetIterator(const bool reverse, const ui32 expectedSize) const {
- if ((IsTotalAllowFilter() || IsTotalDenyFilter()) && !Filter.size()) {
- return TIterator(reverse, expectedSize, LastValue);
+ if (IsTotalAllowFilter()) {
+ return TIterator(reverse, expectedSize, true);
+ } else if (IsTotalDenyFilter()) {
+ return TIterator(reverse, expectedSize, false);
} else {
AFL_VERIFY(expectedSize == Size())("expected", expectedSize)("size", Size())("reverse", reverse);
return TIterator(reverse, Filter, GetStartValue(reverse));
diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
index 62e345ba825..ec81be006de 100644
--- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
@@ -7903,10 +7903,10 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
testHelper.ReadData("SELECT * FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[3;\"-321\";\"-3.14\";[\"test_res_3\"]]]");
testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[\"-3.14\"]]");
testHelper.ReadData("SELECT resource_id FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[[\"test_res_3\"]]]");
- testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest`", "[[#];[#];[\"-3.14\"]]");
+ testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest` ORDER BY new_column", "[[#];[#];[\"-3.14\"]]");
testHelper.RebootTablets(testTable.GetName());
- testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest`", "[[#];[#];[\"-3.14\"]]");
+ testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest` ORDER BY new_column", "[[#];[#];[\"-3.14\"]]");
}
Y_UNIT_TEST(AddColumnErrors) {
diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto
index 52b2398d7f0..f8c4d1c49aa 100644
--- a/ydb/core/protos/feature_flags.proto
+++ b/ydb/core/protos/feature_flags.proto
@@ -162,5 +162,6 @@ message TFeatureFlags {
optional bool EnableExternalDataSourcesOnServerless = 143 [default = true];
optional bool EnableSparsedColumns = 144 [default = false];
optional bool EnableParameterizedDecimal = 145 [default = false];
- optional bool EnableImmediateWritingOnBulkUpsert = 146 [default = false];
+ optional bool EnableImmediateWritingOnBulkUpsert = 146 [default = false];
+ optional bool EnableInsertWriteIdSpecialColumnCompatibility = 147 [default = false];
}
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index 8b104b9dcd5..4f08426ddd7 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -306,9 +306,10 @@ struct Schema : NIceDb::Schema {
struct BlobRangeOffset: Column<11, NScheme::NTypeIds::Uint64> {};
struct BlobRangeSize: Column<12, NScheme::NTypeIds::Uint64> {};
+ struct InsertWriteId: Column<13, NScheme::NTypeIds::Uint64> {};
using TKey = TableKey<Committed, PlanStep, WriteTxId, PathId, DedupId>;
- using TColumns = TableColumns<Committed, PlanStep, WriteTxId, PathId, DedupId, BlobId, Meta, IndexPlanStep, IndexTxId, SchemaVersion, BlobRangeOffset, BlobRangeSize>;
+ using TColumns = TableColumns<Committed, PlanStep, WriteTxId, PathId, DedupId, BlobId, Meta, IndexPlanStep, IndexTxId, SchemaVersion, BlobRangeOffset, BlobRangeSize, InsertWriteId>;
};
struct IndexGranules : NIceDb::Schema::Table<GranulesTableId> {
@@ -808,6 +809,7 @@ struct Schema : NIceDb::Schema {
.Key((ui8)recType, 0, (ui64)data.GetInsertWriteId(), data.GetPathId(), "")
.Update(NIceDb::TUpdate<InsertTable::BlobId>(data.GetBlobRange().GetBlobId().ToStringLegacy()),
NIceDb::TUpdate<InsertTable::BlobRangeOffset>(data.GetBlobRange().Offset),
+ NIceDb::TUpdate<InsertTable::InsertWriteId>((ui64)data.GetInsertWriteId()),
NIceDb::TUpdate<InsertTable::BlobRangeSize>(data.GetBlobRange().Size),
NIceDb::TUpdate<InsertTable::Meta>(data.GetMeta().SerializeToProto().SerializeAsString()),
NIceDb::TUpdate<InsertTable::SchemaVersion>(data.GetSchemaVersion()));
@@ -818,6 +820,7 @@ struct Schema : NIceDb::Schema {
.Key((ui8)EInsertTableIds::Committed, data.GetSnapshot().GetPlanStep(), data.GetSnapshot().GetTxId(), data.GetPathId(),
data.GetDedupId())
.Update(NIceDb::TUpdate<InsertTable::BlobId>(data.GetBlobRange().GetBlobId().ToStringLegacy()),
+ NIceDb::TUpdate<InsertTable::InsertWriteId>((ui64)data.GetInsertWriteId()),
NIceDb::TUpdate<InsertTable::BlobRangeOffset>(data.GetBlobRange().Offset),
NIceDb::TUpdate<InsertTable::BlobRangeSize>(data.GetBlobRange().Size),
NIceDb::TUpdate<InsertTable::Meta>(data.GetMeta().SerializeToProto().SerializeAsString()),
@@ -982,6 +985,7 @@ private:
NColumnShard::Schema::EInsertTableIds RecType;
ui64 PlanStep;
ui64 WriteTxId;
+ TInsertWriteId InsertWriteId;
ui64 PathId;
YDB_ACCESSOR_DEF(TString, DedupId);
ui64 SchemaVersion;
@@ -989,8 +993,8 @@ private:
std::optional<NOlap::TUnifiedBlobId> BlobId;
TString MetadataString;
std::optional<NKikimrTxColumnShard::TLogicalMetadata> Metadata;
- std::optional<ui64> RangeOffset;
- std::optional<ui64> RangeSize;
+ ui64 RangeOffset;
+ ui64 RangeSize;
void Prepare(const IBlobGroupSelector* dsGroupSelector) {
AFL_VERIFY(!PreparedFlag);
@@ -1004,7 +1008,6 @@ private:
AFL_VERIFY(MetadataString);
Y_ABORT_UNLESS(meta.ParseFromString(MetadataString));
Metadata = std::move(meta);
- AFL_VERIFY(!!RangeOffset == !!RangeSize);
}
bool PreparedFlag = false;
@@ -1013,8 +1016,13 @@ private:
public:
TInsertWriteId GetInsertWriteId() const {
AFL_VERIFY(ParsedFlag);
- AFL_VERIFY(RecType != NColumnShard::Schema::EInsertTableIds::Committed);
- return (TInsertWriteId)WriteTxId;
+ return InsertWriteId;
+ }
+
+ ui64 GetTxId() const {
+ AFL_VERIFY(ParsedFlag);
+ AFL_VERIFY(RecType == NColumnShard::Schema::EInsertTableIds::Committed);
+ return WriteTxId;
}
NColumnShard::Schema::EInsertTableIds GetRecType() const {
@@ -1024,6 +1032,7 @@ public:
ui64 GetPlanStep() const {
AFL_VERIFY(ParsedFlag);
+ AFL_VERIFY(RecType == NColumnShard::Schema::EInsertTableIds::Committed);
return PlanStep;
}
@@ -1035,19 +1044,12 @@ public:
void Upsert(NIceDb::TNiceDb& db) const {
AFL_VERIFY(ParsedFlag);
using namespace NColumnShard;
- if (RangeOffset) {
- db.Table<Schema::InsertTable>()
- .Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId)
- .Update(NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString),
- NIceDb::TUpdate<Schema::InsertTable::BlobRangeOffset>(*RangeOffset),
- NIceDb::TUpdate<Schema::InsertTable::BlobRangeSize>(*RangeSize), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString),
- NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion));
- } else {
- db.Table<Schema::InsertTable>()
- .Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId)
- .Update(NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString),
- NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion));
- }
+ db.Table<Schema::InsertTable>()
+ .Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId)
+ .Update(NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString),
+ NIceDb::TUpdate<Schema::InsertTable::BlobRangeOffset>(RangeOffset),
+ NIceDb::TUpdate<Schema::InsertTable::BlobRangeSize>(RangeSize), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString),
+ NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion));
}
template <class TRowset>
@@ -1059,41 +1061,40 @@ public:
PlanStep = rowset.template GetValue<Schema::InsertTable::PlanStep>();
WriteTxId = rowset.template GetValueOrDefault<Schema::InsertTable::WriteTxId>();
AFL_VERIFY(WriteTxId);
+ InsertWriteId = (TInsertWriteId)rowset.template GetValueOrDefault<Schema::InsertTable::InsertWriteId>(WriteTxId);
PathId = rowset.template GetValue<Schema::InsertTable::PathId>();
DedupId = rowset.template GetValue<Schema::InsertTable::DedupId>();
- SchemaVersion =
- rowset.template HaveValue<Schema::InsertTable::SchemaVersion>() ? rowset.template GetValue<Schema::InsertTable::SchemaVersion>() : 0;
+ SchemaVersion = rowset.template GetValueOrDefault<Schema::InsertTable::SchemaVersion>(0);
BlobIdString = rowset.template GetValue<Schema::InsertTable::BlobId>();
MetadataString = rowset.template GetValue<Schema::InsertTable::Meta>();
- if (rowset.template HaveValue<Schema::InsertTable::BlobRangeOffset>()) {
- RangeOffset = rowset.template GetValue<Schema::InsertTable::BlobRangeOffset>();
- }
- if (rowset.template HaveValue<Schema::InsertTable::BlobRangeSize>()) {
- RangeSize = rowset.template GetValue<Schema::InsertTable::BlobRangeSize>();
- }
+ AFL_VERIFY(rowset.template HaveValue<Schema::InsertTable::BlobRangeOffset>());
+ AFL_VERIFY(rowset.template HaveValue<Schema::InsertTable::BlobRangeSize>());
+ RangeOffset = rowset.template GetValue<Schema::InsertTable::BlobRangeOffset>();
+ RangeSize = rowset.template GetValue<Schema::InsertTable::BlobRangeSize>();
}
NOlap::TCommittedData BuildCommitted(const IBlobGroupSelector* dsGroupSelector) {
Prepare(dsGroupSelector);
using namespace NColumnShard;
AFL_VERIFY(RecType == Schema::EInsertTableIds::Committed);
- auto userData = std::make_shared<NOlap::TUserData>(PathId,
- NOlap::TBlobRange(*BlobId, RangeOffset.value_or(0), RangeSize.value_or(BlobId->BlobSize())), *Metadata, SchemaVersion, std::nullopt);
+ auto userData = std::make_shared<NOlap::TUserData>(
+ PathId, NOlap::TBlobRange(*BlobId, RangeOffset, RangeSize), *Metadata, SchemaVersion, std::nullopt);
AFL_VERIFY(!!DedupId);
AFL_VERIFY(PlanStep);
- return NOlap::TCommittedData(userData, PlanStep, WriteTxId, DedupId);
+ return NOlap::TCommittedData(userData, PlanStep, WriteTxId, InsertWriteId, DedupId);
}
NOlap::TInsertedData BuildInsertedOrAborted(const IBlobGroupSelector* dsGroupSelector) {
Prepare(dsGroupSelector);
using namespace NColumnShard;
+ AFL_VERIFY(InsertWriteId == (TInsertWriteId)WriteTxId)("insert", InsertWriteId)("write", WriteTxId);
AFL_VERIFY(RecType != Schema::EInsertTableIds::Committed);
- auto userData = std::make_shared<NOlap::TUserData>(PathId,
- NOlap::TBlobRange(*BlobId, RangeOffset.value_or(0), RangeSize.value_or(BlobId->BlobSize())), *Metadata, SchemaVersion, std::nullopt);
+ auto userData = std::make_shared<NOlap::TUserData>(
+ PathId, NOlap::TBlobRange(*BlobId, RangeOffset, RangeSize), *Metadata, SchemaVersion, std::nullopt);
AFL_VERIFY(!DedupId);
AFL_VERIFY(!PlanStep);
- return NOlap::TInsertedData((TInsertWriteId)WriteTxId, userData);
+ return NOlap::TInsertedData(InsertWriteId, userData);
}
};
diff --git a/ydb/core/tx/columnshard/common/portion.h b/ydb/core/tx/columnshard/common/portion.h
index d8497d5174a..311cfa23269 100644
--- a/ydb/core/tx/columnshard/common/portion.h
+++ b/ydb/core/tx/columnshard/common/portion.h
@@ -17,10 +17,12 @@ class TSpecialColumns {
public:
static constexpr const char* SPEC_COL_PLAN_STEP = "_yql_plan_step";
static constexpr const char* SPEC_COL_TX_ID = "_yql_tx_id";
+ static constexpr const char* SPEC_COL_WRITE_ID = "_yql_write_id";
static constexpr const char* SPEC_COL_DELETE_FLAG = "_yql_delete_flag";
static const ui32 SPEC_COL_PLAN_STEP_INDEX = 0xffffff00;
static const ui32 SPEC_COL_TX_ID_INDEX = SPEC_COL_PLAN_STEP_INDEX + 1;
- static const ui32 SPEC_COL_DELETE_FLAG_INDEX = SPEC_COL_PLAN_STEP_INDEX + 2;
+ static const ui32 SPEC_COL_WRITE_ID_INDEX = SPEC_COL_PLAN_STEP_INDEX + 2;
+ static const ui32 SPEC_COL_DELETE_FLAG_INDEX = SPEC_COL_PLAN_STEP_INDEX + 3;
};
}
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
index 43942643986..825f65f8010 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
@@ -76,6 +76,10 @@ std::vector<TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared
TMergingContext mergingContext(batchResults, Batches);
for (auto&& [columnId, columnData] : columnsData) {
+ if (columnId == (ui32)IIndexInfo::ESpecialColumn::WRITE_ID &&
+ (!HasAppData() || !AppDataVerified().FeatureFlags.GetEnableInsertWriteIdSpecialColumnCompatibility())) {
+ continue;
+ }
const TString& columnName = resultFiltered->GetIndexInfo().GetColumnName(columnId);
NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("field_name", columnName));
auto columnInfo = stats->GetColumnInfo(columnId);
@@ -125,13 +129,6 @@ std::vector<TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared
AFL_VERIFY(i.second.size() == columnChunks.begin()->second.size())("first", columnChunks.begin()->second.size())(
"current", i.second.size())("first_name", columnChunks.begin()->first)("current_name", i.first);
}
- auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP);
- auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID);
- Y_ABORT_UNLESS(columnSnapshotPlanStepIdx);
- Y_ABORT_UNLESS(columnSnapshotTxIdx);
- Y_ABORT_UNLESS(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id);
- Y_ABORT_UNLESS(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id);
-
std::vector<TGeneralSerializedSlice> batchSlices;
std::shared_ptr<TDefaultSchemaDetails> schemaDetails(new TDefaultSchemaDetails(resultFiltered, stats));
diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
index ea7b6ddc2eb..2f76ab4b177 100644
--- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
@@ -114,6 +114,7 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(
if (dataColumnIds.contains((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG)) {
pkColumnIds.emplace((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG);
}
+ dataColumnIds.emplace((ui32)IIndexInfo::ESpecialColumn::WRITE_ID);
}
resultFiltered = std::make_shared<TFilteredSnapshotSchema>(resultSchema, dataColumnIds);
{
diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
index 22ca7fd2c73..edce92470ad 100644
--- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
@@ -244,7 +244,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
batch = std::make_shared<NArrow::TGeneralContainer>(NArrow::DeserializeBatch(blobData, batchSchema));
blobSchema->AdaptBatchToSchema(*batch, resultSchema);
}
- IIndexInfo::AddSnapshotColumns(*batch, inserted.GetSnapshot());
+ IIndexInfo::AddSnapshotColumns(*batch, inserted.GetSnapshot(), (ui64)inserted.GetInsertWriteId());
auto& pathInfo = pathBatches.GetPathInfo(inserted.GetPathId());
diff --git a/ydb/core/tx/columnshard/engines/insert_table/committed.h b/ydb/core/tx/columnshard/engines/insert_table/committed.h
index b075d9b8a39..141ca4cb562 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/committed.h
+++ b/ydb/core/tx/columnshard/engines/insert_table/committed.h
@@ -9,6 +9,7 @@ class TCommittedData: public TUserDataContainer {
private:
using TBase = TUserDataContainer;
YDB_READONLY(TSnapshot, Snapshot, NOlap::TSnapshot::Zero());
+ YDB_READONLY(TInsertWriteId, InsertWriteId, (TInsertWriteId)0);
YDB_READONLY_DEF(TString, DedupId);
YDB_READONLY(bool, Remove, false);
@@ -16,19 +17,23 @@ public:
TCommittedData(const std::shared_ptr<TUserData>& userData, const ui64 planStep, const ui64 txId, const TInsertWriteId insertWriteId)
: TBase(userData)
, Snapshot(planStep, txId)
+ , InsertWriteId(insertWriteId)
, DedupId(ToString(planStep) + ":" + ToString((ui64)insertWriteId)) {
}
- TCommittedData(const std::shared_ptr<TUserData>& userData, const ui64 planStep, const ui64 txId, const TString& dedupId)
+ TCommittedData(const std::shared_ptr<TUserData>& userData, const ui64 planStep, const ui64 txId, const TInsertWriteId insertWriteId,
+ const TString& dedupId)
: TBase(userData)
, Snapshot(planStep, txId)
+ , InsertWriteId(insertWriteId)
, DedupId(dedupId) {
}
TCommittedData(const std::shared_ptr<TUserData>& userData, const TSnapshot& ss, const ui64 generation, const TInsertWriteId ephemeralWriteId)
: TBase(userData)
, Snapshot(ss)
- , DedupId(ToString(generation) + ":" + ToString(ephemeralWriteId)) {
+ , InsertWriteId(ephemeralWriteId)
+ , DedupId(ToString(generation) + ":" + ToString((ui64)ephemeralWriteId)) {
}
void SetRemove() {
@@ -52,7 +57,8 @@ public:
class TCommittedBlob {
private:
TBlobRange BlobRange;
- std::variant<TSnapshot, TInsertWriteId> WriteInfo;
+ std::optional<TSnapshot> CommittedSnapshot;
+ const TInsertWriteId InsertWriteId;
YDB_READONLY(ui64, SchemaVersion, 0);
YDB_READONLY(ui64, RecordsCount, 0);
YDB_READONLY(bool, IsDelete, false);
@@ -61,6 +67,31 @@ private:
YDB_READONLY_DEF(NArrow::TSchemaSubset, SchemaSubset);
public:
+ const std::optional<TSnapshot>& GetCommittedSnapshot() const {
+ return CommittedSnapshot;
+ }
+
+ const TSnapshot& GetCommittedSnapshotDef(const TSnapshot& def) const {
+ if (CommittedSnapshot) {
+ return *CommittedSnapshot;
+ } else {
+ return def;
+ }
+ }
+
+ const TSnapshot& GetCommittedSnapshotVerified() const {
+ AFL_VERIFY(!!CommittedSnapshot);
+ return *CommittedSnapshot;
+ }
+
+ bool IsCommitted() const {
+ return !!CommittedSnapshot;
+ }
+
+ TInsertWriteId GetInsertWriteId() const {
+ return InsertWriteId;
+ }
+
const NArrow::TReplaceKey& GetFirst() const {
return First;
}
@@ -72,11 +103,12 @@ public:
return BlobRange.Size;
}
- TCommittedBlob(const TBlobRange& blobRange, const TSnapshot& snapshot, const ui64 schemaVersion, const ui64 recordsCount,
+ TCommittedBlob(const TBlobRange& blobRange, const TSnapshot& snapshot, const TInsertWriteId insertWriteId, const ui64 schemaVersion, const ui64 recordsCount,
const NArrow::TReplaceKey& first, const NArrow::TReplaceKey& last, const bool isDelete,
const NArrow::TSchemaSubset& subset)
: BlobRange(blobRange)
- , WriteInfo(snapshot)
+ , CommittedSnapshot(snapshot)
+ , InsertWriteId(insertWriteId)
, SchemaVersion(schemaVersion)
, RecordsCount(recordsCount)
, IsDelete(isDelete)
@@ -85,11 +117,11 @@ public:
, SchemaSubset(subset) {
}
- TCommittedBlob(const TBlobRange& blobRange, const TInsertWriteId writeId, const ui64 schemaVersion, const ui64 recordsCount,
+ TCommittedBlob(const TBlobRange& blobRange, const TInsertWriteId insertWriteId, const ui64 schemaVersion, const ui64 recordsCount,
const NArrow::TReplaceKey& first, const NArrow::TReplaceKey& last, const bool isDelete,
const NArrow::TSchemaSubset& subset)
: BlobRange(blobRange)
- , WriteInfo(writeId)
+ , InsertWriteId(insertWriteId)
, SchemaVersion(schemaVersion)
, RecordsCount(recordsCount)
, IsDelete(isDelete)
@@ -107,43 +139,13 @@ public:
return BlobRange.Hash();
}
TString DebugString() const {
- if (auto* ss = GetSnapshotOptional()) {
- return TStringBuilder() << BlobRange << ";snapshot=" << ss->DebugString();
- } else {
- return TStringBuilder() << BlobRange << ";write_id=" << (ui64)GetWriteIdVerified();
+ TStringBuilder sb;
+ sb << BlobRange;
+ if (CommittedSnapshot) {
+ sb << ";snapshot=" << CommittedSnapshot->DebugString();
}
- }
-
- bool HasSnapshot() const {
- return GetSnapshotOptional();
- }
-
- const TSnapshot& GetSnapshotDef(const TSnapshot& def) const {
- if (auto* snapshot = GetSnapshotOptional()) {
- return *snapshot;
- } else {
- return def;
- }
- }
-
- const TSnapshot* GetSnapshotOptional() const {
- return std::get_if<TSnapshot>(&WriteInfo);
- }
-
- const TSnapshot& GetSnapshotVerified() const {
- auto* result = GetSnapshotOptional();
- AFL_VERIFY(result);
- return *result;
- }
-
- const TInsertWriteId* GetWriteIdOptional() const {
- return std::get_if<TInsertWriteId>(&WriteInfo);
- }
-
- TInsertWriteId GetWriteIdVerified() const {
- auto* result = GetWriteIdOptional();
- AFL_VERIFY(result);
- return *result;
+ sb << ";write_id=" << GetInsertWriteId();
+ return sb;
}
const TBlobRange& GetBlobRange() const {
diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
index 7b8bd9334ac..56a4f730a42 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
@@ -148,7 +148,7 @@ std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const std::optional<
if (pkRangesFilter && pkRangesFilter->IsPortionInPartialUsage(start, finish) == TPKRangeFilter::EUsageClass::DontUsage) {
continue;
}
- result.emplace_back(TCommittedBlob(data.GetBlobRange(), data.GetSnapshot(), data.GetSchemaVersion(), data.GetMeta().GetNumRows(),
+ result.emplace_back(TCommittedBlob(data.GetBlobRange(), data.GetSnapshot(), data.GetInsertWriteId(), data.GetSchemaVersion(), data.GetMeta().GetNumRows(),
start, finish, data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset()));
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp
index c24fbe0577a..5d93272c07b 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp
@@ -34,11 +34,11 @@ TConclusionStatus TReadMetadata::Init(
if (LockId) {
for (auto&& i : CommittedBlobs) {
- if (auto writeId = i.GetWriteIdOptional()) {
- if (owner->HasLongTxWrites(*writeId)) {
+ if (!i.IsCommitted()) {
+ if (owner->HasLongTxWrites(i.GetInsertWriteId())) {
} else {
- auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(*writeId);
- AddWriteIdToCheck(*writeId, op->GetLockId());
+ auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(i.GetInsertWriteId());
+ AddWriteIdToCheck(i.GetInsertWriteId(), op->GetLockId());
}
}
}
@@ -125,7 +125,7 @@ void TReadMetadata::DoOnReplyConstruction(const ui64 tabletId, NKqp::NInternalIm
bool TReadMetadata::IsMyUncommitted(const TInsertWriteId writeId) const {
AFL_VERIFY(LockSharingInfo);
auto it = ConflictedWriteIds.find(writeId);
- AFL_VERIFY(it != ConflictedWriteIds.end());
+ AFL_VERIFY(it != ConflictedWriteIds.end())("write_id", writeId)("write_ids_count", ConflictedWriteIds.size());
return it->second.GetLockId() == LockSharingInfo->GetLockId();
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp
index 04ed0d1c6f2..5780b321918 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp
@@ -22,21 +22,21 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
sources.emplace_back(std::make_shared<TPortionDataSource>(sourceIdx++, i, SpecialReadContext));
}
for (auto&& i : committed) {
- if (i.HasSnapshot()) {
+ if (i.IsCommitted()) {
continue;
}
- if (GetReadMetadata()->IsMyUncommitted(i.GetWriteIdVerified())) {
+ if (GetReadMetadata()->IsMyUncommitted(i.GetInsertWriteId())) {
continue;
}
if (GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetFirst()) ||
GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetLast())) {
- GetReadMetadata()->SetConflictedWriteId(i.GetWriteIdVerified());
+ GetReadMetadata()->SetConflictedWriteId(i.GetInsertWriteId());
}
}
for (auto&& i : committed) {
- if (!i.HasSnapshot()) {
- if (GetReadMetadata()->IsWriteConflictable(i.GetWriteIdVerified())) {
+ if (!i.IsCommitted()) {
+ if (GetReadMetadata()->IsWriteConflictable(i.GetInsertWriteId())) {
continue;
}
} else if (GetReadMetadata()->GetPKRangesFilter().IsPortionInPartialUsage(i.GetFirst(), i.GetLast()) ==
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp
index bef10d38f6b..5e4d80fbfe4 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp
@@ -225,9 +225,20 @@ void TCommittedDataSource::DoAssembleColumns(const std::shared_ptr<TColumnsSet>&
AFL_VERIFY(rBatch)("schema", schema->ToString());
auto batch = std::make_shared<NArrow::TGeneralContainer>(rBatch);
batchSchema->AdaptBatchToSchema(*batch, resultSchema);
- GetContext()->GetReadMetadata()->GetIndexInfo().AddSnapshotColumns(*batch, CommittedBlob.GetSnapshotDef(TSnapshot::Zero()));
+ TSnapshot ss = TSnapshot::Zero();
+ if (CommittedBlob.IsCommitted()) {
+ ss = CommittedBlob.GetCommittedSnapshotVerified();
+ } else {
+ ss = GetContext()->GetReadMetadata()->IsMyUncommitted(CommittedBlob.GetInsertWriteId())
+ ? GetContext()->GetReadMetadata()->GetRequestSnapshot()
+ : TSnapshot::Zero();
+ }
+ GetContext()->GetReadMetadata()->GetIndexInfo().AddSnapshotColumns(*batch, ss, (ui64)CommittedBlob.GetInsertWriteId());
GetContext()->GetReadMetadata()->GetIndexInfo().AddDeleteFlagsColumn(*batch, CommittedBlob.GetIsDelete());
MutableStageData().AddBatch(batch);
+ if (CommittedBlob.GetIsDelete()) {
+ MutableStageData().AddFilter(NArrow::TColumnFilter::BuildDenyFilter());
+ }
}
MutableStageData().SyncTableColumns(columns->GetSchema()->fields(), *resultSchema);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h
index 889f9fe5e7d..69b39059bff 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h
@@ -420,11 +420,11 @@ private:
}
virtual bool DoAddTxConflict() override {
- if (CommittedBlob.HasSnapshot()) {
+ if (CommittedBlob.IsCommitted()) {
GetContext()->GetReadMetadata()->SetBrokenWithCommitted();
return true;
- } else if (!GetContext()->GetReadMetadata()->IsMyUncommitted(CommittedBlob.GetWriteIdVerified())) {
- GetContext()->GetReadMetadata()->SetConflictedWriteId(CommittedBlob.GetWriteIdVerified());
+ } else if (!GetContext()->GetReadMetadata()->IsMyUncommitted(CommittedBlob.GetInsertWriteId())) {
+ GetContext()->GetReadMetadata()->SetConflictedWriteId(CommittedBlob.GetInsertWriteId());
return true;
}
return false;
@@ -467,8 +467,8 @@ public:
}
TCommittedDataSource(const ui32 sourceIdx, const TCommittedBlob& committed, const std::shared_ptr<TSpecialReadContext>& context)
- : TBase(sourceIdx, context, committed.GetFirst(), committed.GetLast(), committed.GetSnapshotDef(TSnapshot::Zero()),
- committed.GetSnapshotDef(TSnapshot::Zero()), committed.GetRecordsCount(), {}, committed.GetIsDelete())
+ : TBase(sourceIdx, context, committed.GetFirst(), committed.GetLast(), committed.GetCommittedSnapshotDef(TSnapshot::Zero()),
+ committed.GetCommittedSnapshotDef(TSnapshot::Zero()), committed.GetRecordsCount(), {}, committed.GetIsDelete())
, CommittedBlob(committed) {
}
};
diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.cpp
index 78fae44fb6d..c1c31cb5487 100644
--- a/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.cpp
@@ -21,11 +21,12 @@ void IIndexInfo::AddDeleteFlagsColumn(NArrow::TGeneralContainer& batch, const bo
NArrow::TThreadSimpleArraysCache::GetConst(arrow::boolean(), std::make_shared<arrow::BooleanScalar>(isDelete), numRows)).Validate();
}
-void IIndexInfo::AddSnapshotColumns(NArrow::TGeneralContainer& batch, const TSnapshot& snapshot) {
+void IIndexInfo::AddSnapshotColumns(NArrow::TGeneralContainer& batch, const TSnapshot& snapshot, const ui64 insertWriteId) {
const i64 numRows = batch.num_rows();
batch.AddField(arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()), NArrow::MakeUI64Array(snapshot.GetPlanStep(), numRows)).Validate();
batch.AddField(arrow::field(SPEC_COL_TX_ID, arrow::uint64()), NArrow::MakeUI64Array(snapshot.GetTxId(), numRows)).Validate();
+ batch.AddField(arrow::field(SPEC_COL_WRITE_ID, arrow::uint64()), NArrow::MakeUI64Array(insertWriteId, numRows)).Validate();
}
void IIndexInfo::NormalizeDeletionColumn(NArrow::TGeneralContainer& batch) {
@@ -40,6 +41,8 @@ std::optional<ui32> IIndexInfo::GetColumnIdOptional(const std::string& name) con
return ui32(ESpecialColumn::PLAN_STEP);
} else if (name == SPEC_COL_TX_ID) {
return ui32(ESpecialColumn::TX_ID);
+ } else if (name == SPEC_COL_WRITE_ID) {
+ return ui32(ESpecialColumn::WRITE_ID);
} else if (name == SPEC_COL_DELETE_FLAG) {
return ui32(ESpecialColumn::DELETE_FLAG);
}
@@ -51,8 +54,10 @@ std::optional<ui32> IIndexInfo::GetColumnIndexOptional(const std::string& name,
return shift + 0;
} else if (name == SPEC_COL_TX_ID) {
return shift + 1;
- } else if (name == SPEC_COL_DELETE_FLAG) {
+ } else if (name == SPEC_COL_WRITE_ID) {
return shift + 2;
+ } else if (name == SPEC_COL_DELETE_FLAG) {
+ return shift + 3;
}
return {};
}
@@ -62,6 +67,8 @@ TString IIndexInfo::GetColumnName(const ui32 id, const bool required) const {
return SPEC_COL_PLAN_STEP;
} else if (ESpecialColumn(id) == ESpecialColumn::TX_ID) {
return SPEC_COL_TX_ID;
+ } else if (ESpecialColumn(id) == ESpecialColumn::WRITE_ID) {
+ return SPEC_COL_WRITE_ID;
} else if (ESpecialColumn(id) == ESpecialColumn::DELETE_FLAG) {
return SPEC_COL_DELETE_FLAG;
} else {
@@ -88,6 +95,8 @@ std::shared_ptr<arrow::Field> IIndexInfo::GetColumnFieldOptional(const ui32 colu
return ArrowSchemaSnapshot()->field(0);
} else if (ESpecialColumn(columnId) == ESpecialColumn::TX_ID) {
return ArrowSchemaSnapshot()->field(1);
+ } else if (ESpecialColumn(columnId) == ESpecialColumn::WRITE_ID) {
+ return ArrowSchemaSnapshot()->field(2);
} else if (ESpecialColumn(columnId) == ESpecialColumn::DELETE_FLAG) {
return ArrowSchemaDeletion()->field(0);
} else {
@@ -106,6 +115,8 @@ std::shared_ptr<arrow::Scalar> IIndexInfo::DefaultColumnValue(const ui32 colId)
return nullptr;
} else if (colId == (ui32)ESpecialColumn::TX_ID) {
return nullptr;
+ } else if (colId == (ui32)ESpecialColumn::WRITE_ID) {
+ return nullptr;
} else if (colId == (ui32)ESpecialColumn::DELETE_FLAG) {
static const std::shared_ptr<arrow::Scalar> deleteDefault(new arrow::BooleanScalar(false));
return deleteDefault;
diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h b/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h
index c986fe6b2aa..19fbe2267a7 100644
--- a/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h
+++ b/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h
@@ -16,7 +16,8 @@ public:
enum class ESpecialColumn : ui32 {
PLAN_STEP = NOlap::NPortion::TSpecialColumns::SPEC_COL_PLAN_STEP_INDEX,
TX_ID = NOlap::NPortion::TSpecialColumns::SPEC_COL_TX_ID_INDEX,
- DELETE_FLAG = NOlap::NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG_INDEX
+ WRITE_ID = NOlap::NPortion::TSpecialColumns::SPEC_COL_WRITE_ID_INDEX,
+ DELETE_FLAG = NOlap::NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG_INDEX,
};
using TSystemColumnsSet = ui64;
@@ -28,6 +29,7 @@ public:
static constexpr const char* SPEC_COL_PLAN_STEP = NOlap::NPortion::TSpecialColumns::SPEC_COL_PLAN_STEP;
static constexpr const char* SPEC_COL_TX_ID = NOlap::NPortion::TSpecialColumns::SPEC_COL_TX_ID;
+ static constexpr const char* SPEC_COL_WRITE_ID = NOlap::NPortion::TSpecialColumns::SPEC_COL_WRITE_ID;
static constexpr const char* SPEC_COL_DELETE_FLAG = NOlap::NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG;
static const char* GetDeleteFlagColumnName() {
@@ -35,17 +37,18 @@ public:
}
static const std::set<ui32>& GetNecessarySystemColumnIdsSet() {
- static const std::set<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID };
+ static const std::set<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID, (ui32)ESpecialColumn::WRITE_ID };
return result;
}
static const std::vector<std::string>& GetSnapshotColumnNames() {
- static const std::vector<std::string> result = { std::string(SPEC_COL_PLAN_STEP), std::string(SPEC_COL_TX_ID) };
+ static const std::vector<std::string> result = { std::string(SPEC_COL_PLAN_STEP), std::string(SPEC_COL_TX_ID),
+ std::string(SPEC_COL_WRITE_ID) };
return result;
}
static const std::vector<ui32>& GetSnapshotColumnIds() {
- static const std::vector<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID };
+ static const std::vector<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID, (ui32)ESpecialColumn::WRITE_ID };
return result;
}
@@ -85,8 +88,10 @@ public:
static void AddSnapshotFields(std::vector<std::shared_ptr<arrow::Field>>& fields) {
static const std::shared_ptr<arrow::Field> ps = arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64());
static const std::shared_ptr<arrow::Field> txid = arrow::field(SPEC_COL_TX_ID, arrow::uint64());
+ static const std::shared_ptr<arrow::Field> writeId = arrow::field(SPEC_COL_WRITE_ID, arrow::uint64());
fields.push_back(ps);
fields.push_back(txid);
+ fields.push_back(writeId);
}
static void AddDeleteFields(std::vector<std::shared_ptr<arrow::Field>>& fields) {
@@ -94,18 +99,18 @@ public:
}
static const std::set<ui32>& GetSnapshotColumnIdsSet() {
- static const std::set<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID };
+ static const std::set<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID, (ui32)ESpecialColumn::WRITE_ID };
return result;
}
static const std::vector<std::string>& GetSystemColumnNames() {
static const std::vector<std::string> result = { std::string(SPEC_COL_PLAN_STEP), std::string(SPEC_COL_TX_ID),
- std::string(SPEC_COL_DELETE_FLAG) };
+ std::string(SPEC_COL_WRITE_ID), std::string(SPEC_COL_DELETE_FLAG) };
return result;
}
static const std::vector<ui32>& GetSystemColumnIds() {
- static const std::vector<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID,
+ static const std::vector<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID, (ui32)ESpecialColumn::WRITE_ID,
(ui32)ESpecialColumn::DELETE_FLAG };
return result;
}
@@ -143,7 +148,7 @@ public:
static void NormalizeDeletionColumn(NArrow::TGeneralContainer& batch);
- static void AddSnapshotColumns(NArrow::TGeneralContainer& batch, const TSnapshot& snapshot);
+ static void AddSnapshotColumns(NArrow::TGeneralContainer& batch, const TSnapshot& snapshot, const ui64 insertWriteId);
static void AddDeleteFlagsColumn(NArrow::TGeneralContainer& batch, const bool isDelete);
static ui64 GetSpecialColumnsRecordSize() {
@@ -151,8 +156,8 @@ public:
}
static std::shared_ptr<arrow::Schema> ArrowSchemaSnapshot() {
- static std::shared_ptr<arrow::Schema> result = std::make_shared<arrow::Schema>(
- arrow::FieldVector{ arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()), arrow::field(SPEC_COL_TX_ID, arrow::uint64()) });
+ static std::shared_ptr<arrow::Schema> result = std::make_shared<arrow::Schema>(arrow::FieldVector{ arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()),
+ arrow::field(SPEC_COL_TX_ID, arrow::uint64()), arrow::field(SPEC_COL_WRITE_ID, arrow::uint64()) });
return result;
}
@@ -167,12 +172,13 @@ public:
}
static bool IsSpecialColumn(const std::string& fieldName) {
- return fieldName == SPEC_COL_PLAN_STEP || fieldName == SPEC_COL_TX_ID || fieldName == SPEC_COL_DELETE_FLAG;
+ return fieldName == SPEC_COL_PLAN_STEP || fieldName == SPEC_COL_TX_ID || fieldName == SPEC_COL_WRITE_ID ||
+ fieldName == SPEC_COL_DELETE_FLAG;
}
static bool IsSpecialColumn(const ui32 fieldId) {
return fieldId == (ui32)ESpecialColumn::PLAN_STEP || fieldId == (ui32)ESpecialColumn::TX_ID ||
- fieldId == (ui32)ESpecialColumn::DELETE_FLAG;
+ fieldId == (ui32)ESpecialColumn::WRITE_ID || fieldId == (ui32)ESpecialColumn::DELETE_FLAG;
}
static bool IsNullableVerified(const ui32 /*fieldId*/) {
diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
index 622c022a168..38dc6ffa044 100644
--- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
@@ -497,7 +497,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
ui64 txId = 1;
auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false));
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK[0]->NumChunks(), columnIds.size() + TIndexInfo::GetSnapshotColumnIdsSet().size());
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK[0]->NumChunks(), columnIds.size() + TIndexInfo::GetSnapshotColumnIdsSet().size() - 1);
}
{ // select another pathId