aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorulya-sidorina <yulia@ydb.tech>2023-10-30 16:21:24 +0300
committerulya-sidorina <yulia@ydb.tech>2023-10-30 17:04:52 +0300
commitd16d15ba3b08594fd1066d200c2d10b10cba1a1a (patch)
tree26f19f0a9b212c881be27cb89a540c523e00d7db
parent9c3bca047d3ede6fbf47cb27e14fe434cc224d5f (diff)
downloadydb-d16d15ba3b08594fd1066d200c2d10b10cba1a1a.tar.gz
KIKIMR-19884: skip null keys for stream lookup join
fix(kqp): skip null keys for stream lookup join
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp45
-rw-r--r--ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp31
2 files changed, 47 insertions, 29 deletions
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
index 62613dc376c..b9947b04a65 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
@@ -444,27 +444,29 @@ public:
UnprocessedRows.pop_front();
- std::vector<std::pair<ui64, TOwnedTableRange>> partitions;
- if (joinKey.size() < KeyColumns.size()) {
- // build prefix range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf])
- std::vector<TCell> fromCells(KeyColumns.size());
- fromCells.insert(fromCells.begin(), joinKey.begin(), joinKey.end());
- bool fromInclusive = true;
- bool toInclusive = false;
-
- partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(),
- TOwnedTableRange(fromCells, fromInclusive, joinKey, toInclusive)
- );
- } else {
- // full pk, build point
- partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(), TOwnedTableRange(joinKey));
- }
-
- for (auto [shardId, range] : partitions) {
- if (range.Point) {
- pointsPerShard[shardId].push_back(std::move(range));
+ if (!joinKey.data()->IsNull()) { // don't use nulls as lookup keys, because null != null
+ std::vector <std::pair<ui64, TOwnedTableRange>> partitions;
+ if (joinKey.size() < KeyColumns.size()) {
+ // build prefix range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf])
+ std::vector <TCell> fromCells(KeyColumns.size());
+ fromCells.insert(fromCells.begin(), joinKey.begin(), joinKey.end());
+ bool fromInclusive = true;
+ bool toInclusive = false;
+
+ partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(),
+ TOwnedTableRange(fromCells, fromInclusive, joinKey, toInclusive)
+ );
} else {
- rangesPerShard[shardId].push_back(std::move(range));
+ // full pk, build point
+ partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(), TOwnedTableRange(joinKey));
+ }
+
+ for (auto[shardId, range] : partitions) {
+ if (range.Point) {
+ pointsPerShard[shardId].push_back(std::move(range));
+ } else {
+ rangesPerShard[shardId].push_back(std::move(range));
+ }
}
}
@@ -519,7 +521,8 @@ public:
return UnprocessedRows.empty()
&& UnprocessedKeys.empty()
&& PendingKeysByReadId.empty()
- && ReadResults.empty();
+ && ReadResults.empty()
+ && PendingLeftRowsByKey.empty();
}
void ResetRowsProcessing(ui64 readId, ui32 firstUnprocessedQuery, TMaybe<TOwnedCellVec> lastProcessedKey) final {
diff --git a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
index 89361b130c3..9c69b5ae257 100644
--- a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
+++ b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
@@ -52,13 +52,16 @@ void PrepareTables(TSession session) {
(2, 102, "Value1"),
(3, 103, "Value2"),
(4, 104, "Value2"),
- (5, 105, "Value3");
+ (5, 105, "Value3"),
+ (6, NULL, "Value6"),
+ (7, NULL, "Value7");
REPLACE INTO `/Root/Right` (Key, Value) VALUES
(100, "Value20"),
(101, "Value21"),
(102, "Value22"),
- (103, "Value23");
+ (103, "Value23"),
+ (NULL, "Value24");
REPLACE INTO `/Root/LaunchByProcessIdAndPinned` (idx_processId, idx_pinned, idx_launchNumber) VALUES
("eProcess", false, 4),
@@ -109,7 +112,7 @@ void Test(const TString& query, const TString& answer, size_t rightTableReads, b
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 2);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Left");
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 5);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 7);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(1).name(), "/Root/Right");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(1).reads().rows(), rightTableReads);
} else {
@@ -117,7 +120,7 @@ void Test(const TString& query, const TString& answer, size_t rightTableReads, b
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Left");
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 5);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 7);
UNIT_ASSERT(stats.query_phases(1).table_access().empty()); // keys extraction for lookups
@@ -198,7 +201,9 @@ Y_UNIT_TEST(Left) {
R"([
[[3];[103];["Value2"];[103];["Value23"]];
[[4];[104];["Value2"];#;#];
- [[5];[105];["Value3"];#;#]
+ [[5];[105];["Value3"];#;#];
+ [[6];#;["Value6"];#;#];
+ [[7];#;["Value7"];#;#]
])", 1);
}
@@ -214,7 +219,9 @@ Y_UNIT_TEST(LeftOnly) {
)",
R"([
[[4];[104];["Value2"]];
- [[5];[105];["Value3"]]
+ [[5];[105];["Value3"]];
+ [[6];#;["Value6"]];
+ [[7];#;["Value7"]]
])", 1);
}
@@ -341,7 +348,9 @@ Y_UNIT_TEST_TWIN(SimpleLeftJoin, StreamLookup) {
[[2];[102];["Value1"];[102];["Value22"]];
[[3];[103];["Value2"];[103];["Value23"]];
[[4];[104];["Value2"];#;#];
- [[5];[105];["Value3"];#;#]
+ [[5];[105];["Value3"];#;#];
+ [[6];#;["Value6"];#;#];
+ [[7];#;["Value7"];#;#]
])", 3, StreamLookup);
}
@@ -359,7 +368,9 @@ Y_UNIT_TEST_TWIN(LeftJoinCustomColumnOrder, StreamLookup) {
[["Value22"];[2];[102];["Value1"];[102]];
[["Value23"];[3];[103];["Value2"];[103]];
[#;[4];#;["Value2"];[104]];
- [#;[5];#;["Value3"];[105]]
+ [#;[5];#;["Value3"];[105]];
+ [#;[6];#;["Value6"];#];
+ [#;[7];#;["Value7"];#]
])", 3, StreamLookup);
}
@@ -375,6 +386,8 @@ Y_UNIT_TEST_TWIN(LeftJoinOnlyRightColumn, StreamLookup) {
R"([
[#];
[#];
+ [#];
+ [#];
[["Value21"]];
[["Value22"]];
[["Value23"]]
@@ -391,6 +404,8 @@ Y_UNIT_TEST_TWIN(LeftJoinOnlyLeftColumn, StreamLookup) {
ORDER BY l.Fk;
)",
R"([
+ [#];
+ [#];
[[101]];
[[102]];
[[103]];