diff options
author | Vitaly Stoyan <vitstn@gmail.com> | 2022-06-14 18:54:45 +0300 |
---|---|---|
committer | Vitaly Stoyan <vitstn@gmail.com> | 2022-06-14 18:54:45 +0300 |
commit | 8c48d1576a569a05856a8d2dde58b9d6038966fb (patch) | |
tree | b00eb60b537ea5e9ca07321f95e612f80ea6f015 | |
parent | db0e3b59f3d00bff597665856acd9c956d1fe71f (diff) | |
download | ydb-8c48d1576a569a05856a8d2dde58b9d6038966fb.tar.gz |
YQL-13966 convert some filters over cross join into inner joins
ref:04ab2fb7c5f18c7997550433d3643d4e1dbb92e2
-rw-r--r-- | ydb/library/yql/core/common_opt/yql_co_flow2.cpp | 219 |
1 files changed, 195 insertions, 24 deletions
diff --git a/ydb/library/yql/core/common_opt/yql_co_flow2.cpp b/ydb/library/yql/core/common_opt/yql_co_flow2.cpp index 690393bbfe4..1a7dbc7b0c3 100644 --- a/ydb/library/yql/core/common_opt/yql_co_flow2.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_flow2.cpp @@ -9,6 +9,7 @@ #include <ydb/library/yql/core/yql_type_helpers.h> #include <ydb/library/yql/utils/log/log.h> +#include <util/string/type.h> namespace NYql { namespace { @@ -383,6 +384,188 @@ TExprNode::TPtr SingleInputPredicatePushdownOverEquiJoin(TExprNode::TPtr equiJoi return ret; } +void GatherJoinInputs(const TExprNode::TPtr& expr, const TExprNode& row, + const TParentsMap& parentsMap, const THashMap<TString, TString>& backRenameMap, + const TJoinLabels& labels, TSet<ui32>& inputs, TSet<TStringBuf>& usedFields) { + usedFields.clear(); + + if (!HaveFieldsSubset(expr, row, usedFields, parentsMap, false)) { + const auto inputStructType = RemoveOptionalType(row.GetTypeAnn())->Cast<TStructExprType>(); + for (const auto& i : inputStructType->GetItems()) { + usedFields.insert(i->GetName()); + } + } + + for (auto x : usedFields) { + // rename used fields + if (auto renamed = backRenameMap.FindPtr(x)) { + x = *renamed; + } + + TStringBuf part1; + TStringBuf part2; + SplitTableName(x, part1, part2); + inputs.insert(*labels.FindInputIndex(part1)); + if (inputs.size() == labels.Inputs.size()) { + break; + } + } +} + +TExprNode::TPtr AddLinkToJoinTree(TExprNode::TPtr joinTree, TStringBuf label1, TStringBuf column1, TStringBuf label2, TStringBuf column2, + TExprContext& ctx, TMaybe<ui32>& found1, TMaybe<ui32>& found2, bool& updated) { + YQL_ENSURE(joinTree->Child(0)->Content() == "Cross" || joinTree->Child(0)->Content() == "Inner"); + auto children = joinTree->ChildrenList(); + + auto& left = children[1]; + if (!left->IsAtom()) { + left = AddLinkToJoinTree(left, label1, column1, label2, column2, ctx, found1, found2, updated); + if (found1) { + found1 = 1u; + } + + if (found2) { + found2 = 1u; + } + } else { + if (left->Content() == label1) { + found1 = 1u; + } + + if (left->Content() == label2) { + found2 = 1u; + } + } + + auto& right = children[2]; + if (!right->IsAtom()) { + right = AddLinkToJoinTree(right, label1, column1, label2, column2, ctx, found1, found2, updated); + if (found1) { + found1 = 2u; + } + + if (found2) { + found2 = 2u; + } + } else { + if (right->Content() == label1) { + found1 = 2u; + } + + if (right->Content() == label2) { + found2 = 2u; + } + } + + if (found1 && found2) { + if (!updated) { + if (joinTree->Child(0)->Content() == "Cross") { + children[0] = ctx.NewAtom(joinTree->Pos(), "Inner"); + } + + if (*found1 == 2u) { + std::swap(label1, label2); + std::swap(column1, column2); + } + + auto link1 = children[3]->ChildrenList(); + link1.push_back(ctx.NewAtom(joinTree->Pos(), label1)); + link1.push_back(ctx.NewAtom(joinTree->Pos(), column1)); + children[3] = ctx.ChangeChildren(*children[3], std::move(link1)); + + auto link2 = children[4]->ChildrenList(); + link2.push_back(ctx.NewAtom(joinTree->Pos(), label2)); + link2.push_back(ctx.NewAtom(joinTree->Pos(), column2)); + children[4] = ctx.ChangeChildren(*children[4], std::move(link2)); + + updated = true; + } + } + + return ctx.ChangeChildren(*joinTree, std::move(children)); +} + +TExprNode::TPtr DecayCrossJoinIntoInner(TExprNode::TPtr equiJoin, TExprNode::TPtr predicate, + const TJoinLabels& labels, ui32 index1, ui32 index2, const TExprNode& row, const THashMap<TString, TString>& backRenameMap, + const TParentsMap& parentsMap, TExprContext& ctx) { + YQL_ENSURE(index1 != index2); + bool withCoalesce = false; + if (predicate->IsCallable("Coalesce")) { + if (predicate->Tail().IsCallable("Bool") && IsFalse(predicate->Tail().Head().Content())) { + withCoalesce = true; + predicate = predicate->HeadPtr(); + } else { + return equiJoin; + } + } + + + TExprNode::TPtr left, right; + if (predicate->IsCallable("==")) { + left = predicate->ChildPtr(0); + right = predicate->ChildPtr(1); + } else if (predicate->IsCallable("FromPg") && predicate->Head().IsCallable("PgResolvedOp") && + (predicate->Head().Head().Content() == "=")) { + left = predicate->Head().ChildPtr(2); + right = predicate->Head().ChildPtr(3); + } else { + return equiJoin; + } + + TSet<ui32> leftInputs, rightInputs; + TSet<TStringBuf> usedFields; + GatherJoinInputs(left, row, parentsMap, backRenameMap, labels, leftInputs, usedFields); + GatherJoinInputs(right, row, parentsMap, backRenameMap, labels, rightInputs, usedFields); + bool good = false; + if (leftInputs.size() == 1 && rightInputs.size() == 1) { + if (*leftInputs.begin() == index1 && *rightInputs.begin() == index2) { + good = true; + } else if (*leftInputs.begin() == index2 && *rightInputs.begin() == index1) { + good = true; + } + } + + if (!good) { + return equiJoin; + } + + auto inputsCount = equiJoin->ChildrenSize() - 2; + auto joinTree = equiJoin->Child(inputsCount); + if (!IsRequiredSide(joinTree, labels, index1).first || + !IsRequiredSide(joinTree, labels, index2).first) { + return equiJoin; + } + + TStringBuf label1, column1, label2, column2; + if (left->IsCallable("Member") && left->Child(0) == &row) { + auto x = left->Tail().Content(); + if (auto ptr = backRenameMap.FindPtr(x)) { + x = *ptr; + } + + SplitTableName(x, label1, column1); + } else { + return equiJoin; + } + + if (right->IsCallable("Member") && right->Child(0) == &row) { + auto x = right->Tail().Content(); + if (auto ptr = backRenameMap.FindPtr(x)) { + x = *ptr; + } + + SplitTableName(x, label2, column2); + } else { + return equiJoin; + } + + TMaybe<ui32> found1, found2; + bool updated = false; + auto newJoinTree = AddLinkToJoinTree(joinTree, label1, column1, label2, column2, ctx, found1, found2, updated); + YQL_ENSURE(updated); + return ctx.ChangeChild(*equiJoin, inputsCount, std::move(newJoinTree)); +} + TExprNode::TPtr FlatMapOverEquiJoin(const TCoFlatMapBase& node, TExprContext& ctx, const TParentsMap& parentsMap) { auto equiJoin = node.Input(); auto structType = equiJoin.Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType() @@ -520,31 +703,8 @@ TExprNode::TPtr FlatMapOverEquiJoin(const TCoFlatMapBase& node, TExprContext& ct continue; } - TSet<TStringBuf> usedFields; TSet<ui32> inputs; - if (!HaveFieldsSubset(andTerm, row, usedFields, parentsMap, false)) { - continue; - } - - for (auto x : usedFields) { - // rename used fields - if (auto renamed = backRenameMap.FindPtr(x)) { - x = *renamed; - } - - TStringBuf part1; - TStringBuf part2; - SplitTableName(x, part1, part2); - inputs.insert(*labels.FindInputIndex(part1)); - if (inputs.size() == labels.Inputs.size()) { - break; - } - } - - // all inputs are touched - if (inputs.size() == labels.Inputs.size()) { - continue; - } + GatherJoinInputs(andTerm, row, parentsMap, backRenameMap, labels, inputs, usedFields); if (inputs.size() == 0) { YQL_CLOG(DEBUG, Core) << "ConstantPredicatePushdownOverEquiJoin"; @@ -563,6 +723,17 @@ TExprNode::TPtr FlatMapOverEquiJoin(const TCoFlatMapBase& node, TExprContext& ct break; } } + + if (inputs.size() == 2) { + auto newJoin = DecayCrossJoinIntoInner(equiJoin.Ptr(), andTerm, + labels, *inputs.begin(), *(++inputs.begin()), row, backRenameMap, parentsMap, ctx); + if (newJoin != equiJoin.Ptr()) { + YQL_CLOG(DEBUG, Core) << "DecayCrossJoinIntoInner"; + ret = newJoin; + extraPredicate = FuseAndTerms(node.Pos(), andTerms, andTerm, ctx); + break; + } + } } if (!ret) { |