diff options
author | ulya-sidorina <yulia@ydb.tech> | 2023-10-30 16:21:24 +0300 |
---|---|---|
committer | ulya-sidorina <yulia@ydb.tech> | 2023-10-30 17:04:52 +0300 |
commit | d16d15ba3b08594fd1066d200c2d10b10cba1a1a (patch) | |
tree | 26f19f0a9b212c881be27cb89a540c523e00d7db | |
parent | 9c3bca047d3ede6fbf47cb27e14fe434cc224d5f (diff) | |
download | ydb-d16d15ba3b08594fd1066d200c2d10b10cba1a1a.tar.gz |
KIKIMR-19884: skip null keys for stream lookup join
fix(kqp): skip null keys for stream lookup join
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp | 45 | ||||
-rw-r--r-- | ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp | 31 |
2 files changed, 47 insertions, 29 deletions
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp index 62613dc376c..b9947b04a65 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp @@ -444,27 +444,29 @@ public: UnprocessedRows.pop_front(); - std::vector<std::pair<ui64, TOwnedTableRange>> partitions; - if (joinKey.size() < KeyColumns.size()) { - // build prefix range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf]) - std::vector<TCell> fromCells(KeyColumns.size()); - fromCells.insert(fromCells.begin(), joinKey.begin(), joinKey.end()); - bool fromInclusive = true; - bool toInclusive = false; - - partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(), - TOwnedTableRange(fromCells, fromInclusive, joinKey, toInclusive) - ); - } else { - // full pk, build point - partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(), TOwnedTableRange(joinKey)); - } - - for (auto [shardId, range] : partitions) { - if (range.Point) { - pointsPerShard[shardId].push_back(std::move(range)); + if (!joinKey.data()->IsNull()) { // don't use nulls as lookup keys, because null != null + std::vector <std::pair<ui64, TOwnedTableRange>> partitions; + if (joinKey.size() < KeyColumns.size()) { + // build prefix range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf]) + std::vector <TCell> fromCells(KeyColumns.size()); + fromCells.insert(fromCells.begin(), joinKey.begin(), joinKey.end()); + bool fromInclusive = true; + bool toInclusive = false; + + partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(), + TOwnedTableRange(fromCells, fromInclusive, joinKey, toInclusive) + ); } else { - rangesPerShard[shardId].push_back(std::move(range)); + // full pk, build point + partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(), TOwnedTableRange(joinKey)); + } + + for (auto[shardId, range] : partitions) { + if (range.Point) { + pointsPerShard[shardId].push_back(std::move(range)); + } else { + rangesPerShard[shardId].push_back(std::move(range)); + } } } @@ -519,7 +521,8 @@ public: return UnprocessedRows.empty() && UnprocessedKeys.empty() && PendingKeysByReadId.empty() - && ReadResults.empty(); + && ReadResults.empty() + && PendingLeftRowsByKey.empty(); } void ResetRowsProcessing(ui64 readId, ui32 firstUnprocessedQuery, TMaybe<TOwnedCellVec> lastProcessedKey) final { diff --git a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp index 89361b130c3..9c69b5ae257 100644 --- a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp +++ b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp @@ -52,13 +52,16 @@ void PrepareTables(TSession session) { (2, 102, "Value1"), (3, 103, "Value2"), (4, 104, "Value2"), - (5, 105, "Value3"); + (5, 105, "Value3"), + (6, NULL, "Value6"), + (7, NULL, "Value7"); REPLACE INTO `/Root/Right` (Key, Value) VALUES (100, "Value20"), (101, "Value21"), (102, "Value22"), - (103, "Value23"); + (103, "Value23"), + (NULL, "Value24"); REPLACE INTO `/Root/LaunchByProcessIdAndPinned` (idx_processId, idx_pinned, idx_launchNumber) VALUES ("eProcess", false, 4), @@ -109,7 +112,7 @@ void Test(const TString& query, const TString& answer, size_t rightTableReads, b UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 2); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Left"); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 5); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 7); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(1).name(), "/Root/Right"); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(1).reads().rows(), rightTableReads); } else { @@ -117,7 +120,7 @@ void Test(const TString& query, const TString& answer, size_t rightTableReads, b UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Left"); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 5); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 7); UNIT_ASSERT(stats.query_phases(1).table_access().empty()); // keys extraction for lookups @@ -198,7 +201,9 @@ Y_UNIT_TEST(Left) { R"([ [[3];[103];["Value2"];[103];["Value23"]]; [[4];[104];["Value2"];#;#]; - [[5];[105];["Value3"];#;#] + [[5];[105];["Value3"];#;#]; + [[6];#;["Value6"];#;#]; + [[7];#;["Value7"];#;#] ])", 1); } @@ -214,7 +219,9 @@ Y_UNIT_TEST(LeftOnly) { )", R"([ [[4];[104];["Value2"]]; - [[5];[105];["Value3"]] + [[5];[105];["Value3"]]; + [[6];#;["Value6"]]; + [[7];#;["Value7"]] ])", 1); } @@ -341,7 +348,9 @@ Y_UNIT_TEST_TWIN(SimpleLeftJoin, StreamLookup) { [[2];[102];["Value1"];[102];["Value22"]]; [[3];[103];["Value2"];[103];["Value23"]]; [[4];[104];["Value2"];#;#]; - [[5];[105];["Value3"];#;#] + [[5];[105];["Value3"];#;#]; + [[6];#;["Value6"];#;#]; + [[7];#;["Value7"];#;#] ])", 3, StreamLookup); } @@ -359,7 +368,9 @@ Y_UNIT_TEST_TWIN(LeftJoinCustomColumnOrder, StreamLookup) { [["Value22"];[2];[102];["Value1"];[102]]; [["Value23"];[3];[103];["Value2"];[103]]; [#;[4];#;["Value2"];[104]]; - [#;[5];#;["Value3"];[105]] + [#;[5];#;["Value3"];[105]]; + [#;[6];#;["Value6"];#]; + [#;[7];#;["Value7"];#] ])", 3, StreamLookup); } @@ -375,6 +386,8 @@ Y_UNIT_TEST_TWIN(LeftJoinOnlyRightColumn, StreamLookup) { R"([ [#]; [#]; + [#]; + [#]; [["Value21"]]; [["Value22"]]; [["Value23"]] @@ -391,6 +404,8 @@ Y_UNIT_TEST_TWIN(LeftJoinOnlyLeftColumn, StreamLookup) { ORDER BY l.Fk; )", R"([ + [#]; + [#]; [[101]]; [[102]]; [[103]]; |