diff options
author | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-06-06 14:52:33 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-06-06 14:52:33 +0300 |
commit | bac265decc94a9ea586d82dad5ddaccad9f7c8eb (patch) | |
tree | 1d5020061cc851233137050a9ad1e4c5ef605993 | |
parent | 54177ae9aa8e3734fb098d2ad23a17ec7306279a (diff) | |
download | ydb-bac265decc94a9ea586d82dad5ddaccad9f7c8eb.tar.gz |
22-2: Fixed async index collector KIKIMR-14851
merge from trunk: r9425658
REVIEW: 2526786
x-ydb-stable-ref: 1d02322b31b9982b35be4d0907d65ef203735d17
-rw-r--r-- | ydb/core/tx/datashard/change_collector_async_index.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_collector.cpp | 63 |
2 files changed, 57 insertions, 10 deletions
diff --git a/ydb/core/tx/datashard/change_collector_async_index.cpp b/ydb/core/tx/datashard/change_collector_async_index.cpp index 992014280e..cc00c433e8 100644 --- a/ydb/core/tx/datashard/change_collector_async_index.cpp +++ b/ydb/core/tx/datashard/change_collector_async_index.cpp @@ -142,7 +142,7 @@ bool TAsyncIndexChangeCollector::Collect(const TTableId& tableId, ERowOp rop, } if (generateUpdates) { - bool needUpdate = false; + bool needUpdate = !generateDeletions || rop == ERowOp::Reset; for (const auto tag : index.KeyColumnIds) { if (updatedTagToPos.contains(tag)) { @@ -153,7 +153,6 @@ bool TAsyncIndexChangeCollector::Collect(const TTableId& tableId, ERowOp rop, const auto& column = userTable->Columns.at(tag); if (rop == ERowOp::Reset && !column.IsKey) { - needUpdate = true; FillKeyWithNull(tag, column.Type); } else { Y_VERIFY(tagToPos.contains(tag)); @@ -171,7 +170,6 @@ bool TAsyncIndexChangeCollector::Collect(const TTableId& tableId, ERowOp rop, needUpdate = true; FillDataFromUpdate(tag, updatedTagToPos.at(tag), updates); } else if (rop == ERowOp::Reset) { - needUpdate = true; Y_VERIFY(userTable->Columns.contains(tag)); FillDataWithNull(tag, userTable->Columns.at(tag).Type); } diff --git a/ydb/core/tx/datashard/datashard_ut_change_collector.cpp b/ydb/core/tx/datashard/datashard_ut_change_collector.cpp index 1a48ef0e44..59aef80e46 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_collector.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_collector.cpp @@ -290,22 +290,23 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeCollector) { tagToName.emplace(tag, column.Name); } - THashMap<TPathId, TString> indexPathIdToName; + THashMap<TString, TPathId> indexNameToPathId; for (const auto& index : entry.Indexes) { const auto& name = index.GetName(); const auto pathId = TPathId(index.GetPathOwnerId(), index.GetLocalPathId()); - indexPathIdToName.emplace(pathId, name); + indexNameToPathId.emplace(name, pathId); } const auto tabletIds = GetTableShards(server, sender, path); UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1); - for (const auto& [pathId, actual] : GetChangeRecordsWithDetails(runtime, sender, tabletIds[0])) { - UNIT_ASSERT(indexPathIdToName.contains(pathId)); - const auto& name = indexPathIdToName.at(pathId); + const auto actualRecords = GetChangeRecordsWithDetails(runtime, sender, tabletIds[0]); + for (const auto& [name, expected] : expectedRecords) { + UNIT_ASSERT(indexNameToPathId.contains(name)); + const auto& pathId = indexNameToPathId.at(name); - UNIT_ASSERT(expectedRecords.contains(name)); - const auto& expected = expectedRecords.at(name); + UNIT_ASSERT(actualRecords.contains(pathId)); + const auto& actual = actualRecords.at(pathId); UNIT_ASSERT_VALUES_EQUAL(expected.size(), actual.size()); for (size_t i = 0; i < expected.size(); ++i) { @@ -506,6 +507,54 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeCollector) { }); } + Y_UNIT_TEST(AllColumnsInPk) { + const auto schema = TShardedTableOptions() + .Columns({ + {"a", "Uint32", true, false}, + {"b", "Uint32", true, false}, + }) + .Indexes({ + {"by_b", {"b"}, {}, NKikimrSchemeOp::EIndexTypeGlobalAsync}, + }); + + Run("/Root/path", schema, TVector<TString>{ + "UPSERT INTO `/Root/path` (a, b) VALUES (1, 10);", + "UPSERT INTO `/Root/path` (a, b) VALUES (1, 20);", + "UPSERT INTO `/Root/path` (a, b) VALUES (1, 10);", + "UPSERT INTO `/Root/path` (a, b) VALUES (2, 10);", + }, { + {"by_b", { + TStructRecord(NTable::ERowOp::Upsert, {{"b", 10}, {"a", 1}}), + TStructRecord(NTable::ERowOp::Upsert, {{"b", 20}, {"a", 1}}), + TStructRecord(NTable::ERowOp::Upsert, {{"b", 10}, {"a", 2}}), + }}, + }); + } + + Y_UNIT_TEST(UpsertWithoutIndexedValue) { + const auto schema = TShardedTableOptions() + .Columns({ + {"a", "Uint32", true, false}, + {"b", "Uint32", true, false}, + {"c", "Uint32", false, false}, + {"d", "Uint32", false, false}, + }) + .Indexes({ + {"by_c", {"c"}, {}, NKikimrSchemeOp::EIndexTypeGlobalAsync}, + }); + + Run("/Root/path", schema, TVector<TString>{ + "UPSERT INTO `/Root/path` (a, b, d) VALUES (1, 10, 10000);", + "UPSERT INTO `/Root/path` (a, b, c) VALUES (1, 10, 1000);", + }, { + {"by_c", { + TStructRecord(NTable::ERowOp::Upsert, {{"c", Null}, {"a", 1}, {"b", 10}}), + TStructRecord(NTable::ERowOp::Erase, {{"c", Null}, {"a", 1}, {"b", 10}}), + TStructRecord(NTable::ERowOp::Upsert, {{"c", 1000}, {"a", 1}, {"b", 10}}), + }}, + }); + } + } // AsyncIndexChangeCollector Y_UNIT_TEST_SUITE(CdcStreamChangeCollector) { |