about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author ulya-sidorina <yulia@ydb.tech> 2023-12-12 14:17:51 +0300
committer ulya-sidorina <yulia@ydb.tech> 2023-12-12 15:13:50 +0300
commit bd909725025e014433eacc67d889c222e89a9c5f (patch)
tree 965c41e8f0d5024445902f80e2417a34540c0530
parent 33f6499949b1ffbda1a47a9bc18286c698d997cd (diff)
download ydb-bd909725025e014433eacc67d889c222e89a9c5f.tar.gz
KIKIMR-19831: exclude right columns from left only stream join result
fix(kqp): exclude right columns from left only join result
-rw-r--r-- ydb/core/kqp/host/kqp_type_ann.cpp 26
-rw-r--r-- ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp 2
-rw-r--r-- ydb/core/kqp/runtime/kqp_program_builder.cpp 14
-rw-r--r-- ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp 3
-rw-r--r-- ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp 34
5 files changed, 65 insertions, 14 deletions
diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp
index 3a5946e3ebd..a5b919a8400 100644
--- a/ydb/core/kqp/host/kqp_type_ann.cpp
+++ b/ydb/core/kqp/host/kqp_type_ann.cpp
@@ -23,6 +23,10 @@ using TStatus = IGraphTransformer::TStatus;
namespace {
+// Tells whether the right input's columns belong in the join result row type.
+// Returns true for every join type except "LeftOnly".
+// TStringBuf is a cheap non-owning view, so it is taken by value rather than
+// by const reference; existing callers passing a TStringBuf are unaffected.
+bool RightJoinSideAllowed(TStringBuf joinType) {
+    return joinType != "LeftOnly";
+}
+
// Returns the TResourceExprType annotation tagged with KqpEffectTag, created through ctx.
const TTypeAnnotationNode* MakeKqpEffectType(TExprContext& ctx) {
return ctx.MakeType<TResourceExprType>(KqpEffectTag);
}
@@ -1363,6 +1367,7 @@ TStatus AnnotateStreamIdxLookupJoin(const TExprNode::TPtr& node, TExprContext& c
}
TCoAtom rightLabel(node->Child(TKqlStreamIdxLookupJoin::idx_RightLabel));
+ TCoAtom joinType(node->Child(TKqlStreamIdxLookupJoin::idx_JoinType));
const TStructExprType* leftDataType = leftInputTupleType->GetItems()[1]->Cast<TStructExprType>();
TVector<const TItemExprType*> resultStructItems;
@@ -1372,10 +1377,12 @@ TStatus AnnotateStreamIdxLookupJoin(const TExprNode::TPtr& node, TExprContext& c
);
}
- for (const auto& member : rightDataType->Cast<TStructExprType>()->GetItems()) {
- resultStructItems.emplace_back(
- ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", member->GetName()), member->GetItemType())
- );
+ if (RightJoinSideAllowed(joinType.Value())) {
+ for (const auto& member : rightDataType->Cast<TStructExprType>()->GetItems()) {
+ resultStructItems.emplace_back(
+ ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", member->GetName()), member->GetItemType())
+ );
+ }
}
auto rowType = ctx.MakeType<TStructExprType>(resultStructItems);
@@ -1686,6 +1693,7 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx)
}
TCoAtom rightLabel(node->Child(TKqpIndexLookupJoin::idx_RightLabel));
+ TCoAtom joinType(node->Child(TKqpIndexLookupJoin::idx_JoinType));
TVector<const TItemExprType*> resultStructItems;
for (const auto& item : leftRowType->GetItems()) {
@@ -1694,10 +1702,12 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx)
);
}
- for (const auto& item : rightRowType->Cast<TStructExprType>()->GetItems()) {
- resultStructItems.emplace_back(
- ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", item->GetName()), item->GetItemType())
- );
+ if (RightJoinSideAllowed(joinType.Value())) {
+ for (const auto& item : rightRowType->Cast<TStructExprType>()->GetItems()) {
+ resultStructItems.emplace_back(
+ ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", item->GetName()), item->GetItemType())
+ );
+ }
}
auto outputRowType = ctx.MakeType<TStructExprType>(resultStructItems);
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
index 85939d28575..e2bfacaa300 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
@@ -362,7 +362,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
}
static THashSet<TStringBuf> supportedJoinKinds = {"Inner", "Left", "LeftOnly", "LeftSemi", "RightSemi"};
- static THashSet<TStringBuf> supportedStreamJoinKinds = {"Inner", "Left"};
+ static THashSet<TStringBuf> supportedStreamJoinKinds = {"Inner", "Left", "LeftOnly"};
if (!supportedJoinKinds.contains(join.JoinType().Value())) {
return {};
}
diff --git a/ydb/core/kqp/runtime/kqp_program_builder.cpp b/ydb/core/kqp/runtime/kqp_program_builder.cpp
index dd928eb55df..f240afb4b41 100644
--- a/ydb/core/kqp/runtime/kqp_program_builder.cpp
+++ b/ydb/core/kqp/runtime/kqp_program_builder.cpp
@@ -159,6 +159,10 @@ EJoinKind GetIndexLookupJoinKind(const TString& joinKind) {
}
}
+// Reports whether the right input's columns are part of the join output row:
+// true for every join type other than "LeftOnly".
+bool RightJoinSideAllowed(const TString& joinType) {
+    const bool leftOnlyJoin = (joinType == "LeftOnly");
+    return !leftOnlyJoin;
+}
+
} // namespace
TKqpProgramBuilder::TKqpProgramBuilder(const TTypeEnvironment& env, const IFunctionRegistry& functionRegistry)
@@ -347,10 +351,12 @@ TRuntimeNode TKqpProgramBuilder::KqpIndexLookupJoin(const TRuntimeNode& input, c
rowTypeBuilder.Add(newMemberName, leftRowType->GetMemberType(i));
}
- for (ui32 i = 0; i < rightRowType->GetMembersCount(); ++i) {
- TString newMemberName = rightLabel.empty() ? TString(rightRowType->GetMemberName(i))
- : TString::Join(rightLabel, ".", rightRowType->GetMemberName(i));
- rowTypeBuilder.Add(newMemberName, rightRowType->GetMemberType(i));
+ if (RightJoinSideAllowed(joinType)) {
+ for (ui32 i = 0; i < rightRowType->GetMembersCount(); ++i) {
+ TString newMemberName = rightLabel.empty() ? TString(rightRowType->GetMemberName(i))
+ : TString::Join(rightLabel, ".", rightRowType->GetMemberName(i));
+ rowTypeBuilder.Add(newMemberName, rightRowType->GetMemberType(i));
+ }
}
auto returnType = NewStreamType(rowTypeBuilder.Build());
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
index ee07f67d163..78dd2ab8e01 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
@@ -779,7 +779,8 @@ private:
resultRowItems[1] = NUdf::TUnboxedValuePod();
}
- rowStats.ReadRowsCount += (rightRowSize > 0 ? 1 : 0);
+ rowStats.ReadRowsCount += (leftRowInfo.RightRowExist ? 1 : 0);
+ // TODO: use datashard statistics KIKIMR-16924
rowStats.ReadBytesCount += rightRowSize;
rowStats.ResultRowsCount += 1;
rowStats.ResultBytesCount += leftRowSize + rightRowSize;
diff --git a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
index 9c69b5ae257..8bcad00b36d 100644
--- a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
+++ b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
@@ -414,6 +414,40 @@ Y_UNIT_TEST_TWIN(LeftJoinOnlyLeftColumn, StreamLookup) {
])", 3, StreamLookup);
}
+Y_UNIT_TEST_TWIN(SimpleLeftOnlyJoin, StreamLookup) { // LEFT ONLY JOIN case returning l.Key, l.Fk and l.Value; StreamLookup is forwarded to Test() below.
+ Test( // First argument is the query: /Root/Left LEFT ONLY JOIN /Root/Right on l.Fk = r.Key, ordered by l.Key.
+ R"(
+ SELECT l.Key, l.Fk, l.Value
+ FROM `/Root/Left` AS l
+ LEFT ONLY JOIN `/Root/Right` AS r
+ ON l.Fk = r.Key
+ ORDER BY l.Key
+ )",
+ R"([
+ [[4];[104];["Value2"]];
+ [[5];[105];["Value3"]];
+ [[6];#;["Value6"]];
+ [[7];#;["Value7"]]
+ ])", 3, StreamLookup); // Second argument is the expected result rows. NOTE(review): the meaning of the literal 3 is not visible in this hunk; confirm against Test().
+}
+
+Y_UNIT_TEST_TWIN(LeftOnlyJoinValueColumn, StreamLookup) { // LEFT ONLY JOIN case that selects only l.Value; StreamLookup is forwarded to Test() below.
+ Test( // First argument is the query: /Root/Left LEFT ONLY JOIN /Root/Right on l.Fk = r.Key, ordered by l.Value.
+ R"(
+ SELECT l.Value
+ FROM `/Root/Left` AS l
+ LEFT ONLY JOIN `/Root/Right` AS r
+ ON l.Fk = r.Key
+ ORDER BY l.Value
+ )",
+ R"([
+ [["Value2"]];
+ [["Value3"]];
+ [["Value6"]];
+ [["Value7"]]
+ ])", 3, StreamLookup); // Second argument is the expected result rows. NOTE(review): the meaning of the literal 3 is not visible in this hunk; confirm against Test().
+}
+
void CreateSimpleTableWithKeyType(TSession session, const TString& columnType) {
using namespace fmt::literals;