aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <i.nizametdinov@gmail.com>2022-06-06 14:52:33 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-06-06 14:52:33 +0300
commitbac265decc94a9ea586d82dad5ddaccad9f7c8eb (patch)
tree1d5020061cc851233137050a9ad1e4c5ef605993
parent54177ae9aa8e3734fb098d2ad23a17ec7306279a (diff)
downloadydb-bac265decc94a9ea586d82dad5ddaccad9f7c8eb.tar.gz
22-2: Fixed async index collector KIKIMR-14851
merge from trunk: r9425658 REVIEW: 2526786 x-ydb-stable-ref: 1d02322b31b9982b35be4d0907d65ef203735d17
-rw-r--r--ydb/core/tx/datashard/change_collector_async_index.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_collector.cpp63
2 files changed, 57 insertions, 10 deletions
diff --git a/ydb/core/tx/datashard/change_collector_async_index.cpp b/ydb/core/tx/datashard/change_collector_async_index.cpp
index 992014280e..cc00c433e8 100644
--- a/ydb/core/tx/datashard/change_collector_async_index.cpp
+++ b/ydb/core/tx/datashard/change_collector_async_index.cpp
@@ -142,7 +142,7 @@ bool TAsyncIndexChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
}
if (generateUpdates) {
- bool needUpdate = false;
+ bool needUpdate = !generateDeletions || rop == ERowOp::Reset;
for (const auto tag : index.KeyColumnIds) {
if (updatedTagToPos.contains(tag)) {
@@ -153,7 +153,6 @@ bool TAsyncIndexChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
const auto& column = userTable->Columns.at(tag);
if (rop == ERowOp::Reset && !column.IsKey) {
- needUpdate = true;
FillKeyWithNull(tag, column.Type);
} else {
Y_VERIFY(tagToPos.contains(tag));
@@ -171,7 +170,6 @@ bool TAsyncIndexChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
needUpdate = true;
FillDataFromUpdate(tag, updatedTagToPos.at(tag), updates);
} else if (rop == ERowOp::Reset) {
- needUpdate = true;
Y_VERIFY(userTable->Columns.contains(tag));
FillDataWithNull(tag, userTable->Columns.at(tag).Type);
}
diff --git a/ydb/core/tx/datashard/datashard_ut_change_collector.cpp b/ydb/core/tx/datashard/datashard_ut_change_collector.cpp
index 1a48ef0e44..59aef80e46 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_collector.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_collector.cpp
@@ -290,22 +290,23 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeCollector) {
tagToName.emplace(tag, column.Name);
}
- THashMap<TPathId, TString> indexPathIdToName;
+ THashMap<TString, TPathId> indexNameToPathId;
for (const auto& index : entry.Indexes) {
const auto& name = index.GetName();
const auto pathId = TPathId(index.GetPathOwnerId(), index.GetLocalPathId());
- indexPathIdToName.emplace(pathId, name);
+ indexNameToPathId.emplace(name, pathId);
}
const auto tabletIds = GetTableShards(server, sender, path);
UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1);
- for (const auto& [pathId, actual] : GetChangeRecordsWithDetails(runtime, sender, tabletIds[0])) {
- UNIT_ASSERT(indexPathIdToName.contains(pathId));
- const auto& name = indexPathIdToName.at(pathId);
+ const auto actualRecords = GetChangeRecordsWithDetails(runtime, sender, tabletIds[0]);
+ for (const auto& [name, expected] : expectedRecords) {
+ UNIT_ASSERT(indexNameToPathId.contains(name));
+ const auto& pathId = indexNameToPathId.at(name);
- UNIT_ASSERT(expectedRecords.contains(name));
- const auto& expected = expectedRecords.at(name);
+ UNIT_ASSERT(actualRecords.contains(pathId));
+ const auto& actual = actualRecords.at(pathId);
UNIT_ASSERT_VALUES_EQUAL(expected.size(), actual.size());
for (size_t i = 0; i < expected.size(); ++i) {
@@ -506,6 +507,54 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeCollector) {
});
}
+ Y_UNIT_TEST(AllColumnsInPk) {
+ const auto schema = TShardedTableOptions()
+ .Columns({
+ {"a", "Uint32", true, false},
+ {"b", "Uint32", true, false},
+ })
+ .Indexes({
+ {"by_b", {"b"}, {}, NKikimrSchemeOp::EIndexTypeGlobalAsync},
+ });
+
+ Run("/Root/path", schema, TVector<TString>{
+ "UPSERT INTO `/Root/path` (a, b) VALUES (1, 10);",
+ "UPSERT INTO `/Root/path` (a, b) VALUES (1, 20);",
+ "UPSERT INTO `/Root/path` (a, b) VALUES (1, 10);",
+ "UPSERT INTO `/Root/path` (a, b) VALUES (2, 10);",
+ }, {
+ {"by_b", {
+ TStructRecord(NTable::ERowOp::Upsert, {{"b", 10}, {"a", 1}}),
+ TStructRecord(NTable::ERowOp::Upsert, {{"b", 20}, {"a", 1}}),
+ TStructRecord(NTable::ERowOp::Upsert, {{"b", 10}, {"a", 2}}),
+ }},
+ });
+ }
+
+ Y_UNIT_TEST(UpsertWithoutIndexedValue) {
+ const auto schema = TShardedTableOptions()
+ .Columns({
+ {"a", "Uint32", true, false},
+ {"b", "Uint32", true, false},
+ {"c", "Uint32", false, false},
+ {"d", "Uint32", false, false},
+ })
+ .Indexes({
+ {"by_c", {"c"}, {}, NKikimrSchemeOp::EIndexTypeGlobalAsync},
+ });
+
+ Run("/Root/path", schema, TVector<TString>{
+ "UPSERT INTO `/Root/path` (a, b, d) VALUES (1, 10, 10000);",
+ "UPSERT INTO `/Root/path` (a, b, c) VALUES (1, 10, 1000);",
+ }, {
+ {"by_c", {
+ TStructRecord(NTable::ERowOp::Upsert, {{"c", Null}, {"a", 1}, {"b", 10}}),
+ TStructRecord(NTable::ERowOp::Erase, {{"c", Null}, {"a", 1}, {"b", 10}}),
+ TStructRecord(NTable::ERowOp::Upsert, {{"c", 1000}, {"a", 1}, {"b", 10}}),
+ }},
+ });
+ }
+
} // AsyncIndexChangeCollector
Y_UNIT_TEST_SUITE(CdcStreamChangeCollector) {