diff options
author | ulya-sidorina <yulia@ydb.tech> | 2023-12-12 14:17:51 +0300 |
---|---|---|
committer | ulya-sidorina <yulia@ydb.tech> | 2023-12-12 15:13:50 +0300 |
commit | bd909725025e014433eacc67d889c222e89a9c5f (patch) | |
tree | 965c41e8f0d5024445902f80e2417a34540c0530 | |
parent | 33f6499949b1ffbda1a47a9bc18286c698d997cd (diff) | |
download | ydb-bd909725025e014433eacc67d889c222e89a9c5f.tar.gz |
KIKIMR-19831: exclude right columns from left only stream join result
fix(kqp): exclude right columns from left only join result
-rw-r--r-- | ydb/core/kqp/host/kqp_type_ann.cpp | 26 | ||||
-rw-r--r-- | ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_program_builder.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp | 34 |
5 files changed, 65 insertions, 14 deletions
diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp index 3a5946e3ebd..a5b919a8400 100644 --- a/ydb/core/kqp/host/kqp_type_ann.cpp +++ b/ydb/core/kqp/host/kqp_type_ann.cpp @@ -23,6 +23,10 @@ using TStatus = IGraphTransformer::TStatus; namespace { +bool RightJoinSideAllowed(const TStringBuf& joinType) { + return joinType != "LeftOnly"; +} + const TTypeAnnotationNode* MakeKqpEffectType(TExprContext& ctx) { return ctx.MakeType<TResourceExprType>(KqpEffectTag); } @@ -1363,6 +1367,7 @@ TStatus AnnotateStreamIdxLookupJoin(const TExprNode::TPtr& node, TExprContext& c } TCoAtom rightLabel(node->Child(TKqlStreamIdxLookupJoin::idx_RightLabel)); + TCoAtom joinType(node->Child(TKqlStreamIdxLookupJoin::idx_JoinType)); const TStructExprType* leftDataType = leftInputTupleType->GetItems()[1]->Cast<TStructExprType>(); TVector<const TItemExprType*> resultStructItems; @@ -1372,10 +1377,12 @@ TStatus AnnotateStreamIdxLookupJoin(const TExprNode::TPtr& node, TExprContext& c ); } - for (const auto& member : rightDataType->Cast<TStructExprType>()->GetItems()) { - resultStructItems.emplace_back( - ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", member->GetName()), member->GetItemType()) - ); + if (RightJoinSideAllowed(joinType.Value())) { + for (const auto& member : rightDataType->Cast<TStructExprType>()->GetItems()) { + resultStructItems.emplace_back( + ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", member->GetName()), member->GetItemType()) + ); + } } auto rowType = ctx.MakeType<TStructExprType>(resultStructItems); @@ -1686,6 +1693,7 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx) } TCoAtom rightLabel(node->Child(TKqpIndexLookupJoin::idx_RightLabel)); + TCoAtom joinType(node->Child(TKqpIndexLookupJoin::idx_JoinType)); TVector<const TItemExprType*> resultStructItems; for (const auto& item : leftRowType->GetItems()) { @@ -1694,10 +1702,12 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx) ); } - for (const auto& item : rightRowType->Cast<TStructExprType>()->GetItems()) { - resultStructItems.emplace_back( - ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", item->GetName()), item->GetItemType()) - ); + if (RightJoinSideAllowed(joinType.Value())) { + for (const auto& item : rightRowType->Cast<TStructExprType>()->GetItems()) { + resultStructItems.emplace_back( + ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", item->GetName()), item->GetItemType()) + ); + } } auto outputRowType = ctx.MakeType<TStructExprType>(resultStructItems); diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp index 85939d28575..e2bfacaa300 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp @@ -362,7 +362,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext } static THashSet<TStringBuf> supportedJoinKinds = {"Inner", "Left", "LeftOnly", "LeftSemi", "RightSemi"}; - static THashSet<TStringBuf> supportedStreamJoinKinds = {"Inner", "Left"}; + static THashSet<TStringBuf> supportedStreamJoinKinds = {"Inner", "Left", "LeftOnly"}; if (!supportedJoinKinds.contains(join.JoinType().Value())) { return {}; } diff --git a/ydb/core/kqp/runtime/kqp_program_builder.cpp b/ydb/core/kqp/runtime/kqp_program_builder.cpp index dd928eb55df..f240afb4b41 100644 --- a/ydb/core/kqp/runtime/kqp_program_builder.cpp +++ b/ydb/core/kqp/runtime/kqp_program_builder.cpp @@ -159,6 +159,10 @@ EJoinKind GetIndexLookupJoinKind(const TString& joinKind) { } } +bool RightJoinSideAllowed(const TString& joinType) { + return joinType != "LeftOnly"; +} + } // namespace TKqpProgramBuilder::TKqpProgramBuilder(const TTypeEnvironment& env, const IFunctionRegistry& functionRegistry) @@ -347,10 +351,12 @@ TRuntimeNode TKqpProgramBuilder::KqpIndexLookupJoin(const TRuntimeNode& input, c rowTypeBuilder.Add(newMemberName, leftRowType->GetMemberType(i)); } - for (ui32 i = 0; i < rightRowType->GetMembersCount(); ++i) { - TString newMemberName = rightLabel.empty() ? TString(rightRowType->GetMemberName(i)) - : TString::Join(rightLabel, ".", rightRowType->GetMemberName(i)); - rowTypeBuilder.Add(newMemberName, rightRowType->GetMemberType(i)); + if (RightJoinSideAllowed(joinType)) { + for (ui32 i = 0; i < rightRowType->GetMembersCount(); ++i) { + TString newMemberName = rightLabel.empty() ? TString(rightRowType->GetMemberName(i)) + : TString::Join(rightLabel, ".", rightRowType->GetMemberName(i)); + rowTypeBuilder.Add(newMemberName, rightRowType->GetMemberType(i)); + } } auto returnType = NewStreamType(rowTypeBuilder.Build()); diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp index ee07f67d163..78dd2ab8e01 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp @@ -779,7 +779,8 @@ private: resultRowItems[1] = NUdf::TUnboxedValuePod(); } - rowStats.ReadRowsCount += (rightRowSize > 0 ? 1 : 0); + rowStats.ReadRowsCount += (leftRowInfo.RightRowExist ? 1 : 0); + // TODO: use datashard statistics KIKIMR-16924 rowStats.ReadBytesCount += rightRowSize; rowStats.ResultRowsCount += 1; rowStats.ResultBytesCount += leftRowSize + rightRowSize; diff --git a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp index 9c69b5ae257..8bcad00b36d 100644 --- a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp +++ b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp @@ -414,6 +414,40 @@ Y_UNIT_TEST_TWIN(LeftJoinOnlyLeftColumn, StreamLookup) { ])", 3, StreamLookup); } +Y_UNIT_TEST_TWIN(SimpleLeftOnlyJoin, StreamLookup) { + Test( + R"( + SELECT l.Key, l.Fk, l.Value + FROM `/Root/Left` AS l + LEFT ONLY JOIN `/Root/Right` AS r + ON l.Fk = r.Key + ORDER BY l.Key + )", + R"([ + [[4];[104];["Value2"]]; + [[5];[105];["Value3"]]; + [[6];#;["Value6"]]; + [[7];#;["Value7"]] + ])", 3, StreamLookup); +} + +Y_UNIT_TEST_TWIN(LeftOnlyJoinValueColumn, StreamLookup) { + Test( + R"( + SELECT l.Value + FROM `/Root/Left` AS l + LEFT ONLY JOIN `/Root/Right` AS r + ON l.Fk = r.Key + ORDER BY l.Value + )", + R"([ + [["Value2"]]; + [["Value3"]]; + [["Value6"]]; + [["Value7"]] + ])", 3, StreamLookup); +} + void CreateSimpleTableWithKeyType(TSession session, const TString& columnType) { using namespace fmt::literals; |