diff options
author | Iuliia Sidorina <yulia@ydb.tech> | 2024-07-18 18:24:54 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-07-18 18:24:54 +0200 |
commit | 6bb1a844163ee900ee36821a9ae355ea16f4a306 (patch) | |
tree | d81351e5e3ce756e126f9f742f8a5d279ad32cec | |
parent | a22a03fda39dcde10f66b73acb1c93016c0ccc57 (diff) | |
download | ydb-6bb1a844163ee900ee36821a9ae355ea16f4a306.tar.gz |
fix(kqp): return one result row per key from stream join actor for left semi strategy (#6642)
-rw-r--r-- | ydb/core/kqp/common/kqp_yql.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_type_ann.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/query_compiler/kqp_query_compiler.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp | 9 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_worker.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp | 64 | ||||
-rw-r--r-- | ydb/core/protos/kqp_physical.proto | 1 |
8 files changed, 89 insertions, 6 deletions
diff --git a/ydb/core/kqp/common/kqp_yql.h b/ydb/core/kqp/common/kqp_yql.h index f1a52fc7a8..dfd4bdbc49 100644 --- a/ydb/core/kqp/common/kqp_yql.h +++ b/ydb/core/kqp/common/kqp_yql.h @@ -48,6 +48,7 @@ constexpr TStringBuf KqpTableSinkName = "KqpTableSinkName"; static constexpr std::string_view TKqpStreamLookupStrategyName = "LookupRows"sv; static constexpr std::string_view TKqpStreamLookupJoinStrategyName = "LookupJoinRows"sv; +static constexpr std::string_view TKqpStreamLookupSemiJoinStrategyName = "LookupSemiJoinRows"sv; struct TKqpReadTableSettings { static constexpr TStringBuf SkipNullKeysSettingName = "SkipNullKeys"; diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp index ab1c98f876..e9d17c031e 100644 --- a/ydb/core/kqp/host/kqp_type_ann.cpp +++ b/ydb/core/kqp/host/kqp_type_ann.cpp @@ -502,7 +502,9 @@ TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, cons return TStatus::Error; } - if (lookupStrategy->Content() == TKqpStreamLookupJoinStrategyName) { + if (lookupStrategy->Content() == TKqpStreamLookupJoinStrategyName + || lookupStrategy->Content() == TKqpStreamLookupSemiJoinStrategyName) { + if (!EnsureTupleType(node->Pos(), *lookupType, ctx)) { return TStatus::Error; } @@ -1682,7 +1684,9 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext node->SetTypeAnn(ctx.MakeType<TStreamExprType>(rowType)); - } else if (lookupStrategy.Value() == TKqpStreamLookupJoinStrategyName) { + } else if (lookupStrategy.Value() == TKqpStreamLookupJoinStrategyName + || lookupStrategy.Value() == TKqpStreamLookupSemiJoinStrategyName) { + if (!EnsureTupleType(node->Pos(), *inputItemType, ctx)) { return TStatus::Error; } diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp index a723be2aaa..5c46e5f816 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp @@ -395,11 +395,15 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin( } } + auto strategy = join.JoinType().Value() == "LeftSemi" + ? TKqpStreamLookupSemiJoinStrategyName + : TKqpStreamLookupJoinStrategyName; + TExprBase lookupJoin = Build<TKqlStreamLookupTable>(ctx, join.Pos()) .Table(rightLookup.MainTable) .LookupKeys(leftInput) .Columns(lookupColumns.Cast()) - .LookupStrategy().Build(TKqpStreamLookupJoinStrategyName) + .LookupStrategy().Build(strategy) .Done(); // Stream lookup join output: stream<tuple<left_row_struct, optional<right_row_struct>>> diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index 15f65fa957..b3d2cd4930 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -91,6 +91,8 @@ NKqpProto::EStreamLookupStrategy GetStreamLookupStrategy(const std::string_view lookupStrategy = NKqpProto::EStreamLookupStrategy::LOOKUP; } else if (strategy == "LookupJoinRows"sv) { lookupStrategy = NKqpProto::EStreamLookupStrategy::JOIN; + } else if (strategy == "LookupSemiJoinRows"sv) { + lookupStrategy = NKqpProto::EStreamLookupStrategy::SEMI_JOIN; } YQL_ENSURE(lookupStrategy != NKqpProto::EStreamLookupStrategy::UNSPECIFIED, @@ -1275,7 +1277,8 @@ private: break; } - case NKqpProto::EStreamLookupStrategy::JOIN: { + case NKqpProto::EStreamLookupStrategy::JOIN: + case NKqpProto::EStreamLookupStrategy::SEMI_JOIN: { YQL_ENSURE(inputItemType->GetKind() == ETypeAnnotationKind::Tuple); const auto inputTupleType = inputItemType->Cast<TTupleExprType>(); YQL_ENSURE(inputTupleType->GetSize() == 2); diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp index 4cfd3ea28c..af995daf05 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp @@ -142,7 +142,8 @@ TKqpStreamLookupWorker::TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSetti , HolderFactory(holderFactory) , InputDesc(inputDesc) , TablePath(settings.GetTable().GetPath()) - , TableId(MakeTableId(settings.GetTable())) { + , TableId(MakeTableId(settings.GetTable())) + , Strategy(settings.GetLookupStrategy()) { KeyColumns.reserve(settings.GetKeyColumns().size()); i32 keyOrder = 0; @@ -748,6 +749,11 @@ public: auto leftRowIt = PendingLeftRowsByKey.find(joinKeyCells); YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end()); + if (Strategy == NKqpProto::EStreamLookupStrategy::SEMI_JOIN && leftRowIt->second.RightRowExist) { + // Semi join should return one result row per key + continue; + } + TReadResultStats rowStats; i64 availableSpace = freeSpace - (i64)resultStats.ResultBytesCount; auto resultRow = TryBuildResultRow(leftRowIt->second, row, rowStats, availableSpace, result.ShardId); @@ -962,6 +968,7 @@ std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKq case NKqpProto::EStreamLookupStrategy::LOOKUP: return std::make_unique<TKqpLookupRows>(std::move(settings), typeEnv, holderFactory, inputDesc); case NKqpProto::EStreamLookupStrategy::JOIN: + case NKqpProto::EStreamLookupStrategy::SEMI_JOIN: return std::make_unique<TKqpJoinRows>(std::move(settings), typeEnv, holderFactory, inputDesc); default: return {}; diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h index 46b15745b3..6b9e35a107 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h @@ -71,6 +71,7 @@ protected: std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns; std::vector<TSysTables::TTableColumnInfo*> LookupKeyColumns; std::vector<TSysTables::TTableColumnInfo> Columns; + const NKqpProto::EStreamLookupStrategy Strategy; }; std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings, diff --git a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp index 55ff9b15d3..4a2e667fe7 100644 --- a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp +++ b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp @@ -1023,7 +1023,7 @@ Y_UNIT_TEST_TWIN(JoinWithComplexCondition, StreamLookupJoin) { TKikimrSettings serverSettings = TKikimrSettings().SetAppConfig(appConfig);; serverSettings.SetKqpSettings(settings); - TKikimrRunner kikimr(settings); + TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1182,6 +1182,68 @@ Y_UNIT_TEST_TWIN(JoinWithComplexCondition, StreamLookupJoin) { } } +Y_UNIT_TEST_TWIN(LeftSemiJoinWithDuplicatesInRightTable, StreamLookupJoin) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookupJoin); + auto settings = TKikimrSettings().SetAppConfig(appConfig); + TKikimrRunner kikimr(settings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + { // create tables + const TString query = R"( + CREATE TABLE `/Root/Left` ( + Key1 Int64, + Key2 Int64, + Value String, + PRIMARY KEY (Key1, Key2) + ); + + CREATE TABLE `/Root/Right` ( + Key1 Int64, + Key2 Int64, + Value String, + PRIMARY KEY (Key1, Key2) + ); + )"; + UNIT_ASSERT(session.ExecuteSchemeQuery(query).GetValueSync().IsSuccess()); + } + + { // fill tables + const TString query = R"( + REPLACE INTO `/Root/Left` (Key1, Key2, Value) VALUES + (1, 10, "value1"), + (2, 20, "value2"), + (3, 30, "value3"); + + REPLACE INTO `/Root/Right` (Key1, Key2, Value) VALUES + (10, 100, "value1"), + (10, 101, "value1"), + (10, 102, "value1"), + (20, 200, "value2"), + (20, 201, "value2"), + (30, 300, "value3"); + )"; + UNIT_ASSERT(session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync().IsSuccess()); + } + + { + const TString query = R"( + SELECT l.Key1, l.Key2, l.Value + FROM `/Root/Left` AS l + LEFT SEMI JOIN `/Root/Right` AS r + ON l.Key2 = r.Key1 ORDER BY l.Key1, l.Key2, l.Value + )"; + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + CompareYson(R"([ + [[1];[10];["value1"]]; + [[2];[20];["value2"]]; + [[3];[30];["value3"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } +} + } // suite } // namespace NKqp diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index 0ec497cca0..9683814708 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -263,6 +263,7 @@ enum EStreamLookupStrategy { UNSPECIFIED = 0; LOOKUP = 1; JOIN = 2; + SEMI_JOIN = 3; }; message TKqpPhyCnStreamLookup { |