aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitaly Stoyan <vitstn@gmail.com>2022-06-29 19:12:03 +0300
committerVitaly Stoyan <vitstn@gmail.com>2022-06-29 19:12:03 +0300
commitbd2ec263a24c74d94ea5f058e03c0579c42c7994 (patch)
treecfcadbcefff980456ba9eff7dc325f1248423611
parentcb187723f78136217a1a593ac791d47ff89a7e22 (diff)
downloadydb-bd2ec263a24c74d94ea5f058e03c0579c42c7994.tar.gz
YQL-13966 equijoin with const/single predicates
ref:d86ca22ea33fe977e3fec8939d8d62607dee0b75
-rw-r--r--ydb/library/yql/core/common_opt/yql_co_pgselect.cpp147
1 files changed, 125 insertions, 22 deletions
diff --git a/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp b/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp
index d479160001f..daf628ce666 100644
--- a/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp
+++ b/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp
@@ -23,6 +23,47 @@ TNodeMap<ui32> GatherSubLinks(const TExprNode::TPtr& lambda) {
return subLinks;
}
+TExprNode::TPtr AsFilterPredicate(TPositionHandle pos, const TExprNode::TPtr& expr, TExprContext& ctx) {
+ return ctx.Builder(pos)
+ .Callable("Coalesce")
+ .Callable(0, "FromPg")
+ .Add(0, expr)
+ .Seal()
+ .Callable(1, "Bool")
+ .Atom(0, "0")
+ .Seal()
+ .Seal()
+ .Build();
+}
+
+std::pair<TExprNode::TPtr, TExprNode::TPtr> SplitByPredicate(TPositionHandle pos, const TExprNode::TPtr& input,
+ const TExprNode::TPtr& predicate, const TExprNode::TPtr& args, TExprContext& ctx) {
+ auto coalescedPredicate = AsFilterPredicate(pos, predicate, ctx);
+ auto inversePredicate = ctx.NewCallable(pos, "Not", { coalescedPredicate });
+ auto lambda = ctx.Builder(pos)
+ .Lambda()
+ .Param("row")
+ .ApplyPartial(args, coalescedPredicate)
+ .With(0, "row")
+ .Seal()
+ .Seal()
+ .Build();
+
+ auto inverseLambda = ctx.Builder(pos)
+ .Lambda()
+ .Param("row")
+ .ApplyPartial(args, inversePredicate)
+ .With(0, "row")
+ .Seal()
+ .Seal()
+ .Build();
+
+ return {
+ ctx.NewCallable(pos, "Filter", { input, lambda }),
+ ctx.NewCallable(pos, "Filter", { input, inverseLambda })
+ };
+}
+
TSet<TString> ExtractExternalColumns(const TExprNode& select) {
TSet<TString> res;
const auto& option = select.Head();
@@ -253,17 +294,7 @@ std::pair<TExprNode::TPtr, TExprNode::TPtr> RewriteSubLinks(TPositionHandle pos,
.Build();
}
- filterExpr = ctx.Builder(node->Pos())
- .Callable("Coalesce")
- .Callable(0, "FromPg")
- .Add(0, filterExpr)
- .Seal()
- .Callable(1, "Bool")
- .Atom(0, "0")
- .Seal()
- .Seal()
- .Build();
-
+ filterExpr = AsFilterPredicate(node->Pos(), filterExpr, ctx);
auto filterLambda = ctx.NewLambda(node->Pos(), std::move(filterArgs), std::move(filterExpr));
auto filtered = ctx.Builder(node->Pos())
@@ -938,16 +969,7 @@ TExprNode::TPtr BuildScalarMinus(TPositionHandle pos, const TExprNode::TPtr& lef
TExprNode::TPtr BuildConstPredicateJoin(TPositionHandle pos, TStringBuf joinType, const TExprNode::TPtr& predicate,
const TExprNode::TPtr& cartesian, const TExprNode::TPtr& left, const TExprNode::TPtr& right, TExprContext& ctx) {
- auto coalescedPredicate = ctx.Builder(pos)
- .Callable("Coalesce")
- .Callable(0, "FromPg")
- .Add(0, predicate->TailPtr())
- .Seal()
- .Callable(1, "Bool")
- .Atom(0, "0")
- .Seal()
- .Seal()
- .Build();
+ auto coalescedPredicate = AsFilterPredicate(pos, predicate->TailPtr(), ctx);
auto main = ctx.Builder(pos)
.Callable("If")
@@ -1223,7 +1245,31 @@ std::tuple<TVector<ui32>, TExprNode::TListType> BuildJoinGroups(TPositionHandle
bool bad = false;
TExprNode::TListType leftColumns;
TExprNode::TListType rightColumns;
+ TExprNode::TListType constPredicates;
+ TExprNode::TListType leftInputPredicates;
+ TExprNode::TListType rightInputPredicates;
for (auto& andTerm : andTerms) {
+ if (!IsDepended(*andTerm, predicate->Head().Head())) {
+ constPredicates.push_back(andTerm);
+ continue;
+ }
+
+ YQL_ENSURE(GatherJoinInputs(*andTerm, predicate->Head().Head(), inputIndex - 1, memberToInput, hasLeftInput, hasRightInput));
+ if (!hasLeftInput && !hasRightInput) {
+ bad = true;
+ break;
+ }
+
+ if (hasLeftInput && !hasRightInput) {
+ leftInputPredicates.push_back(andTerm);
+ continue;
+ }
+
+ if (!hasLeftInput && hasRightInput) {
+ rightInputPredicates.push_back(andTerm);
+ continue;
+ }
+
TExprNode::TPtr left, right;
if (!IsEquality(andTerm, left, right)) {
bad = true;
@@ -1259,7 +1305,64 @@ std::tuple<TVector<ui32>, TExprNode::TListType> BuildJoinGroups(TPositionHandle
}
if (!bad) {
- current = BuildEquiJoin(pos, joinType, current, with, leftColumns, rightColumns, ctx);
+ TExprNode::TPtr constPredicate;
+ if (!constPredicates.empty()) {
+ constPredicate = ctx.NewCallable(pos, "And", std::move(constPredicates));
+ }
+
+ TExprNode::TPtr leftInputPredicate;
+ if (!leftInputPredicates.empty()) {
+ leftInputPredicate = ctx.NewCallable(pos, "And", std::move(leftInputPredicates));
+ }
+
+ TExprNode::TPtr rightInputPredicate;
+ if (!rightInputPredicates.empty()) {
+ rightInputPredicate = ctx.NewCallable(pos, "And", std::move(rightInputPredicates));
+ }
+
+ auto left = current;
+ auto right = with;
+ TExprNode::TPtr notLeft;
+ TExprNode::TPtr notRight;
+ if (leftInputPredicate) {
+ std::tie(left, notLeft) = SplitByPredicate(pos, left, leftInputPredicate, predicate->HeadPtr(), ctx);
+ }
+
+ if (rightInputPredicate) {
+ std::tie(right, notRight) = SplitByPredicate(pos, right, rightInputPredicate, predicate->HeadPtr(), ctx);
+ }
+
+ auto joined = BuildEquiJoin(pos, joinType, left, right, leftColumns, rightColumns, ctx);
+ if (notLeft && (joinType == "left" || joinType == "full")) {
+ joined = ctx.NewCallable(pos, "UnionAll", { joined, notLeft });
+ }
+
+ if (notRight && (joinType == "right" || joinType == "full")) {
+ joined = ctx.NewCallable(pos, "UnionAll", { joined, notRight });
+ }
+
+ if (constPredicate) {
+ TExprNode::TPtr elseValue;
+ if (joinType == "inner") {
+ elseValue = ctx.NewCallable(pos, "EmptyList", {});
+ } else if (joinType == "left") {
+ elseValue = current;
+ } else if (joinType == "right") {
+ elseValue = with;
+ } else {
+ elseValue = ctx.NewCallable(pos, "UnionAll", { current, with });
+ }
+
+ joined = ctx.Builder(pos)
+ .Callable("If")
+ .Add(0, AsFilterPredicate(pos, constPredicate, ctx))
+ .Add(1, joined)
+ .Add(2, elseValue)
+ .Seal()
+ .Build();
+ }
+
+ current = joined;
continue;
}
}