diff options
author | Vitaly Stoyan <vitstn@gmail.com> | 2022-06-29 19:12:03 +0300 |
---|---|---|
committer | Vitaly Stoyan <vitstn@gmail.com> | 2022-06-29 19:12:03 +0300 |
commit | bd2ec263a24c74d94ea5f058e03c0579c42c7994 (patch) | |
tree | cfcadbcefff980456ba9eff7dc325f1248423611 | |
parent | cb187723f78136217a1a593ac791d47ff89a7e22 (diff) | |
download | ydb-bd2ec263a24c74d94ea5f058e03c0579c42c7994.tar.gz |
YQL-13966 equijoin with const/single predicates
ref:d86ca22ea33fe977e3fec8939d8d62607dee0b75
-rw-r--r-- | ydb/library/yql/core/common_opt/yql_co_pgselect.cpp | 147 |
1 files changed, 125 insertions, 22 deletions
diff --git a/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp b/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp index d479160001f..daf628ce666 100644 --- a/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp @@ -23,6 +23,47 @@ TNodeMap<ui32> GatherSubLinks(const TExprNode::TPtr& lambda) { return subLinks; } +TExprNode::TPtr AsFilterPredicate(TPositionHandle pos, const TExprNode::TPtr& expr, TExprContext& ctx) { + return ctx.Builder(pos) + .Callable("Coalesce") + .Callable(0, "FromPg") + .Add(0, expr) + .Seal() + .Callable(1, "Bool") + .Atom(0, "0") + .Seal() + .Seal() + .Build(); +} + +std::pair<TExprNode::TPtr, TExprNode::TPtr> SplitByPredicate(TPositionHandle pos, const TExprNode::TPtr& input, + const TExprNode::TPtr& predicate, const TExprNode::TPtr& args, TExprContext& ctx) { + auto coalescedPredicate = AsFilterPredicate(pos, predicate, ctx); + auto inversePredicate = ctx.NewCallable(pos, "Not", { coalescedPredicate }); + auto lambda = ctx.Builder(pos) + .Lambda() + .Param("row") + .ApplyPartial(args, coalescedPredicate) + .With(0, "row") + .Seal() + .Seal() + .Build(); + + auto inverseLambda = ctx.Builder(pos) + .Lambda() + .Param("row") + .ApplyPartial(args, inversePredicate) + .With(0, "row") + .Seal() + .Seal() + .Build(); + + return { + ctx.NewCallable(pos, "Filter", { input, lambda }), + ctx.NewCallable(pos, "Filter", { input, inverseLambda }) + }; +} + TSet<TString> ExtractExternalColumns(const TExprNode& select) { TSet<TString> res; const auto& option = select.Head(); @@ -253,17 +294,7 @@ std::pair<TExprNode::TPtr, TExprNode::TPtr> RewriteSubLinks(TPositionHandle pos, .Build(); } - filterExpr = ctx.Builder(node->Pos()) - .Callable("Coalesce") - .Callable(0, "FromPg") - .Add(0, filterExpr) - .Seal() - .Callable(1, "Bool") - .Atom(0, "0") - .Seal() - .Seal() - .Build(); - + filterExpr = AsFilterPredicate(node->Pos(), filterExpr, ctx); auto filterLambda = ctx.NewLambda(node->Pos(), std::move(filterArgs), std::move(filterExpr)); auto filtered = ctx.Builder(node->Pos()) @@ -938,16 +969,7 @@ TExprNode::TPtr BuildScalarMinus(TPositionHandle pos, const TExprNode::TPtr& lef TExprNode::TPtr BuildConstPredicateJoin(TPositionHandle pos, TStringBuf joinType, const TExprNode::TPtr& predicate, const TExprNode::TPtr& cartesian, const TExprNode::TPtr& left, const TExprNode::TPtr& right, TExprContext& ctx) { - auto coalescedPredicate = ctx.Builder(pos) - .Callable("Coalesce") - .Callable(0, "FromPg") - .Add(0, predicate->TailPtr()) - .Seal() - .Callable(1, "Bool") - .Atom(0, "0") - .Seal() - .Seal() - .Build(); + auto coalescedPredicate = AsFilterPredicate(pos, predicate->TailPtr(), ctx); auto main = ctx.Builder(pos) .Callable("If") @@ -1223,7 +1245,31 @@ std::tuple<TVector<ui32>, TExprNode::TListType> BuildJoinGroups(TPositionHandle bool bad = false; TExprNode::TListType leftColumns; TExprNode::TListType rightColumns; + TExprNode::TListType constPredicates; + TExprNode::TListType leftInputPredicates; + TExprNode::TListType rightInputPredicates; for (auto& andTerm : andTerms) { + if (!IsDepended(*andTerm, predicate->Head().Head())) { + constPredicates.push_back(andTerm); + continue; + } + + YQL_ENSURE(GatherJoinInputs(*andTerm, predicate->Head().Head(), inputIndex - 1, memberToInput, hasLeftInput, hasRightInput)); + if (!hasLeftInput && !hasRightInput) { + bad = true; + break; + } + + if (hasLeftInput && !hasRightInput) { + leftInputPredicates.push_back(andTerm); + continue; + } + + if (!hasLeftInput && hasRightInput) { + rightInputPredicates.push_back(andTerm); + continue; + } + TExprNode::TPtr left, right; if (!IsEquality(andTerm, left, right)) { bad = true; @@ -1259,7 +1305,64 @@ std::tuple<TVector<ui32>, TExprNode::TListType> BuildJoinGroups(TPositionHandle } if (!bad) { - current = BuildEquiJoin(pos, joinType, current, with, leftColumns, rightColumns, ctx); + TExprNode::TPtr constPredicate; + if (!constPredicates.empty()) { + constPredicate = ctx.NewCallable(pos, "And", std::move(constPredicates)); + } + + TExprNode::TPtr leftInputPredicate; + if (!leftInputPredicates.empty()) { + leftInputPredicate = ctx.NewCallable(pos, "And", std::move(leftInputPredicates)); + } + + TExprNode::TPtr rightInputPredicate; + if (!rightInputPredicates.empty()) { + rightInputPredicate = ctx.NewCallable(pos, "And", std::move(rightInputPredicates)); + } + + auto left = current; + auto right = with; + TExprNode::TPtr notLeft; + TExprNode::TPtr notRight; + if (leftInputPredicate) { + std::tie(left, notLeft) = SplitByPredicate(pos, left, leftInputPredicate, predicate->HeadPtr(), ctx); + } + + if (rightInputPredicate) { + std::tie(right, notRight) = SplitByPredicate(pos, right, rightInputPredicate, predicate->HeadPtr(), ctx); + } + + auto joined = BuildEquiJoin(pos, joinType, left, right, leftColumns, rightColumns, ctx); + if (notLeft && (joinType == "left" || joinType == "full")) { + joined = ctx.NewCallable(pos, "UnionAll", { joined, notLeft }); + } + + if (notRight && (joinType == "right" || joinType == "full")) { + joined = ctx.NewCallable(pos, "UnionAll", { joined, notRight }); + } + + if (constPredicate) { + TExprNode::TPtr elseValue; + if (joinType == "inner") { + elseValue = ctx.NewCallable(pos, "EmptyList", {}); + } else if (joinType == "left") { + elseValue = current; + } else if (joinType == "right") { + elseValue = with; + } else { + elseValue = ctx.NewCallable(pos, "UnionAll", { current, with }); + } + + joined = ctx.Builder(pos) + .Callable("If") + .Add(0, AsFilterPredicate(pos, constPredicate, ctx)) + .Add(1, joined) + .Add(2, elseValue) + .Seal() + .Build(); + } + + current = joined; continue; } } |