aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-03-06 18:34:16 +0300
committerilnaz <ilnaz@ydb.tech>2023-03-06 18:34:16 +0300
commitb112ecc27f19aaa1069ccccebfd4d743152f9964 (patch)
tree7f61a7d92445ab0eef8a5fb39ceffbab830538f9
parentb78205b7cd86eb78e7be560fe47a2ff48e35a9ed (diff)
downloadydb-b112ecc27f19aaa1069ccccebfd4d743152f9964.tar.gz
Implicitly update covered column
-rw-r--r--ydb/core/tx/datashard/change_collector_async_index.cpp16
-rw-r--r--ydb/core/tx/datashard/change_collector_async_index.h1
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_collector.cpp23
3 files changed, 38 insertions, 2 deletions
diff --git a/ydb/core/tx/datashard/change_collector_async_index.cpp b/ydb/core/tx/datashard/change_collector_async_index.cpp
index e3fbc35ca7a..8f899d93216 100644
--- a/ydb/core/tx/datashard/change_collector_async_index.cpp
+++ b/ydb/core/tx/datashard/change_collector_async_index.cpp
@@ -162,9 +162,16 @@ bool TAsyncIndexChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
if (updatedTagToPos.contains(tag)) {
needUpdate = true;
FillDataFromUpdate(tag, updatedTagToPos.at(tag), updates);
- } else if (rop == ERowOp::Reset) {
+ } else {
Y_VERIFY(userTable->Columns.contains(tag));
- FillDataWithNull(tag, userTable->Columns.at(tag).Type);
+ const auto& column = userTable->Columns.at(tag);
+
+ if (rop == ERowOp::Reset && !column.IsKey) {
+ FillDataWithNull(tag, column.Type);
+ } else {
+ Y_VERIFY(tagToPos.contains(tag));
+ FillDataFromRowState(tag, tagToPos.at(tag), row, column.Type);
+ }
}
}
@@ -250,6 +257,11 @@ void TAsyncIndexChangeCollector::FillKeyWithNull(TTag tag, NScheme::TTypeInfo ty
TagsSeen.insert(tag);
}
+void TAsyncIndexChangeCollector::FillDataFromRowState(TTag tag, TPos pos, const TRowState& rowState, NScheme::TTypeInfo type) {
+ Y_VERIFY(pos < rowState.Size());
+ IndexDataVals.emplace_back(tag, ECellOp::Set, TRawTypeValue(rowState.Get(pos).AsRef(), type));
+}
+
void TAsyncIndexChangeCollector::FillDataFromUpdate(TTag tag, TPos pos, TArrayRef<const TUpdateOp> updates) {
Y_VERIFY(pos < updates.size());
diff --git a/ydb/core/tx/datashard/change_collector_async_index.h b/ydb/core/tx/datashard/change_collector_async_index.h
index bb280b38caa..6090674814a 100644
--- a/ydb/core/tx/datashard/change_collector_async_index.h
+++ b/ydb/core/tx/datashard/change_collector_async_index.h
@@ -34,6 +34,7 @@ class TAsyncIndexChangeCollector: public TBaseChangeCollector {
void FillKeyFromKey(NTable::TTag tag, NTable::TPos pos, TArrayRef<const TRawTypeValue> key);
void FillKeyFromUpdate(NTable::TTag tag, NTable::TPos pos, TArrayRef<const NTable::TUpdateOp> updates);
void FillKeyWithNull(NTable::TTag tag, NScheme::TTypeInfo type);
+ void FillDataFromRowState(NTable::TTag tag, NTable::TPos pos, const NTable::TRowState& rowState, NScheme::TTypeInfo type);
void FillDataFromUpdate(NTable::TTag tag, NTable::TPos pos, TArrayRef<const NTable::TUpdateOp> updates);
void FillDataWithNull(NTable::TTag tag, NScheme::TTypeInfo type);
diff --git a/ydb/core/tx/datashard/datashard_ut_change_collector.cpp b/ydb/core/tx/datashard/datashard_ut_change_collector.cpp
index b6a70d3c3aa..32753c722a9 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_collector.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_collector.cpp
@@ -579,6 +579,29 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeCollector) {
});
}
+ Y_UNIT_TEST(ImplicitlyUpdateCoveredColumn) {
+ const auto schema = TShardedTableOptions()
+ .Columns({
+ {"a", "Uint32", true, false},
+ {"b", "Uint32", false, false},
+ {"c", "Uint32", false, false},
+ })
+ .Indexes({
+ {"by_b", {"b"}, {"c"}, NKikimrSchemeOp::EIndexTypeGlobalAsync},
+ });
+
+ Run("/Root/path", schema, TVector<TString>{
+ "UPSERT INTO `/Root/path` (a, b, c) VALUES (1, 10, 100);",
+ "UPSERT INTO `/Root/path` (a, b) VALUES (1, 20);",
+ }, {
+ {"by_b", {
+ TStructRecord(NTable::ERowOp::Upsert, {{"b", 10}, {"a", 1}}, {{"c", 100}}),
+ TStructRecord(NTable::ERowOp::Erase, {{"b", 10}, {"a", 1}}),
+ TStructRecord(NTable::ERowOp::Upsert, {{"b", 20}, {"a", 1}}, {{"c", 100}}),
+ }},
+ });
+ }
+
} // AsyncIndexChangeCollector
Y_UNIT_TEST_SUITE(CdcStreamChangeCollector) {