diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-03-06 18:34:16 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-03-06 18:34:16 +0300 |
commit | b112ecc27f19aaa1069ccccebfd4d743152f9964 (patch) | |
tree | 7f61a7d92445ab0eef8a5fb39ceffbab830538f9 | |
parent | b78205b7cd86eb78e7be560fe47a2ff48e35a9ed (diff) | |
download | ydb-b112ecc27f19aaa1069ccccebfd4d743152f9964.tar.gz |
Implicitly update covered column
-rw-r--r-- | ydb/core/tx/datashard/change_collector_async_index.cpp | 16 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_collector_async_index.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_collector.cpp | 23 |
3 files changed, 38 insertions, 2 deletions
diff --git a/ydb/core/tx/datashard/change_collector_async_index.cpp b/ydb/core/tx/datashard/change_collector_async_index.cpp index e3fbc35ca7a..8f899d93216 100644 --- a/ydb/core/tx/datashard/change_collector_async_index.cpp +++ b/ydb/core/tx/datashard/change_collector_async_index.cpp @@ -162,9 +162,16 @@ bool TAsyncIndexChangeCollector::Collect(const TTableId& tableId, ERowOp rop, if (updatedTagToPos.contains(tag)) { needUpdate = true; FillDataFromUpdate(tag, updatedTagToPos.at(tag), updates); - } else if (rop == ERowOp::Reset) { + } else { Y_VERIFY(userTable->Columns.contains(tag)); - FillDataWithNull(tag, userTable->Columns.at(tag).Type); + const auto& column = userTable->Columns.at(tag); + + if (rop == ERowOp::Reset && !column.IsKey) { + FillDataWithNull(tag, column.Type); + } else { + Y_VERIFY(tagToPos.contains(tag)); + FillDataFromRowState(tag, tagToPos.at(tag), row, column.Type); + } } } @@ -250,6 +257,11 @@ void TAsyncIndexChangeCollector::FillKeyWithNull(TTag tag, NScheme::TTypeInfo ty TagsSeen.insert(tag); } +void TAsyncIndexChangeCollector::FillDataFromRowState(TTag tag, TPos pos, const TRowState& rowState, NScheme::TTypeInfo type) { + Y_VERIFY(pos < rowState.Size()); + IndexDataVals.emplace_back(tag, ECellOp::Set, TRawTypeValue(rowState.Get(pos).AsRef(), type)); +} + void TAsyncIndexChangeCollector::FillDataFromUpdate(TTag tag, TPos pos, TArrayRef<const TUpdateOp> updates) { Y_VERIFY(pos < updates.size()); diff --git a/ydb/core/tx/datashard/change_collector_async_index.h b/ydb/core/tx/datashard/change_collector_async_index.h index bb280b38caa..6090674814a 100644 --- a/ydb/core/tx/datashard/change_collector_async_index.h +++ b/ydb/core/tx/datashard/change_collector_async_index.h @@ -34,6 +34,7 @@ class TAsyncIndexChangeCollector: public TBaseChangeCollector { void FillKeyFromKey(NTable::TTag tag, NTable::TPos pos, TArrayRef<const TRawTypeValue> key); void FillKeyFromUpdate(NTable::TTag tag, NTable::TPos pos, TArrayRef<const NTable::TUpdateOp> updates); void FillKeyWithNull(NTable::TTag tag, NScheme::TTypeInfo type); + void FillDataFromRowState(NTable::TTag tag, NTable::TPos pos, const NTable::TRowState& rowState, NScheme::TTypeInfo type); void FillDataFromUpdate(NTable::TTag tag, NTable::TPos pos, TArrayRef<const NTable::TUpdateOp> updates); void FillDataWithNull(NTable::TTag tag, NScheme::TTypeInfo type); diff --git a/ydb/core/tx/datashard/datashard_ut_change_collector.cpp b/ydb/core/tx/datashard/datashard_ut_change_collector.cpp index b6a70d3c3aa..32753c722a9 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_collector.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_collector.cpp @@ -579,6 +579,29 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeCollector) { }); } + Y_UNIT_TEST(ImplicitlyUpdateCoveredColumn) { + const auto schema = TShardedTableOptions() + .Columns({ + {"a", "Uint32", true, false}, + {"b", "Uint32", false, false}, + {"c", "Uint32", false, false}, + }) + .Indexes({ + {"by_b", {"b"}, {"c"}, NKikimrSchemeOp::EIndexTypeGlobalAsync}, + }); + + Run("/Root/path", schema, TVector<TString>{ + "UPSERT INTO `/Root/path` (a, b, c) VALUES (1, 10, 100);", + "UPSERT INTO `/Root/path` (a, b) VALUES (1, 20);", + }, { + {"by_b", { + TStructRecord(NTable::ERowOp::Upsert, {{"b", 10}, {"a", 1}}, {{"c", 100}}), + TStructRecord(NTable::ERowOp::Erase, {{"b", 10}, {"a", 1}}), + TStructRecord(NTable::ERowOp::Upsert, {{"b", 20}, {"a", 1}}, {{"c", 100}}), + }}, + }); + } + } // AsyncIndexChangeCollector Y_UNIT_TEST_SUITE(CdcStreamChangeCollector) { |