aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitaly Stoyan <vitstn@gmail.com>2022-06-14 18:54:45 +0300
committerVitaly Stoyan <vitstn@gmail.com>2022-06-14 18:54:45 +0300
commit8c48d1576a569a05856a8d2dde58b9d6038966fb (patch)
treeb00eb60b537ea5e9ca07321f95e612f80ea6f015
parentdb0e3b59f3d00bff597665856acd9c956d1fe71f (diff)
downloadydb-8c48d1576a569a05856a8d2dde58b9d6038966fb.tar.gz
YQL-13966 convert some filters over cross join into inner joins
ref:04ab2fb7c5f18c7997550433d3643d4e1dbb92e2
-rw-r--r--ydb/library/yql/core/common_opt/yql_co_flow2.cpp219
1 files changed, 195 insertions, 24 deletions
diff --git a/ydb/library/yql/core/common_opt/yql_co_flow2.cpp b/ydb/library/yql/core/common_opt/yql_co_flow2.cpp
index 690393bbfe4..1a7dbc7b0c3 100644
--- a/ydb/library/yql/core/common_opt/yql_co_flow2.cpp
+++ b/ydb/library/yql/core/common_opt/yql_co_flow2.cpp
@@ -9,6 +9,7 @@
#include <ydb/library/yql/core/yql_type_helpers.h>
#include <ydb/library/yql/utils/log/log.h>
+#include <util/string/type.h>
namespace NYql {
namespace {
@@ -383,6 +384,188 @@ TExprNode::TPtr SingleInputPredicatePushdownOverEquiJoin(TExprNode::TPtr equiJoi
return ret;
}
+// Determines which join inputs an expression over the joined row refers to.
+//
+// `usedFields` is reset and then filled by HaveFieldsSubset(); when that call
+// reports failure, every member of the row's struct type is treated as used.
+// Each used field is mapped back through `backRenameMap` (if it has an entry),
+// split into a table label and a column name, and the index that `labels`
+// reports for the table label is added to `inputs`.
+//
+// `inputs` is NOT cleared here. The scan stops as soon as `inputs` holds as
+// many entries as `labels` has inputs, since nothing new can be learned.
+void GatherJoinInputs(const TExprNode::TPtr& expr, const TExprNode& row,
+    const TParentsMap& parentsMap, const THashMap<TString, TString>& backRenameMap,
+    const TJoinLabels& labels, TSet<ui32>& inputs, TSet<TStringBuf>& usedFields) {
+    usedFields.clear();
+
+    const bool subsetKnown = HaveFieldsSubset(expr, row, usedFields, parentsMap, false);
+    if (!subsetKnown) {
+        // Fall back to "all fields of the row are used".
+        const auto rowStruct = RemoveOptionalType(row.GetTypeAnn())->Cast<TStructExprType>();
+        for (const auto& item : rowStruct->GetItems()) {
+            usedFields.insert(item->GetName());
+        }
+    }
+
+    for (const auto& field : usedFields) {
+        // Undo the output renaming so that the name carries the table label again.
+        TStringBuf fullName = field;
+        if (const auto original = backRenameMap.FindPtr(fullName)) {
+            fullName = *original;
+        }
+
+        TStringBuf tableLabel;
+        TStringBuf columnName;
+        SplitTableName(fullName, tableLabel, columnName);
+        inputs.insert(*labels.FindInputIndex(tableLabel));
+
+        const bool allInputsTouched = inputs.size() == labels.Inputs.size();
+        if (allInputsTouched) {
+            break;
+        }
+    }
+}
+
+// Rebuilds `joinTree` with the equality link label1.column1 == label2.column2
+// attached to the join node that has the two labels on opposite sides.
+//
+// Node layout used here: child 0 is the join kind atom, children 1 and 2 are the
+// left/right operands (either a label atom or a nested join node), children 3
+// and 4 are the left/right key lists, stored as consecutive (label, column) atoms.
+//
+// joinTree        - join (sub)tree; every node must be "Cross" or "Inner".
+// found1, found2  - out: set to the side (1 = left, 2 = right) of this node where
+//                   the respective label was found; left untouched if the label
+//                   does not occur in this subtree.
+// updated         - in/out: becomes true once the link has been added; the link
+//                   is added at most once per traversal.
+//
+// A "Cross" node that receives the link is turned into "Inner".
+//
+// The side bookkeeping is kept in per-node locals and every recursive call gets
+// fresh flags. Sharing the flags across the whole recursion would let a label
+// found in one subtree leak into its sibling subtree, so that e.g. for
+// a x (b x c) with a.k == c.k the link would be attached to the nested (b x c)
+// node, which does not contain `a` at all.
+TExprNode::TPtr AddLinkToJoinTree(TExprNode::TPtr joinTree, TStringBuf label1, TStringBuf column1, TStringBuf label2, TStringBuf column2,
+    TExprContext& ctx, TMaybe<ui32>& found1, TMaybe<ui32>& found2, bool& updated) {
+    YQL_ENSURE(joinTree->Child(0)->Content() == "Cross" || joinTree->Child(0)->Content() == "Inner");
+    auto children = joinTree->ChildrenList();
+
+    // Side of THIS node on which each label lives (if it is below this node).
+    TMaybe<ui32> side1, side2;
+    for (ui32 side = 1u; side <= 2u; ++side) {
+        auto& child = children[side];
+        if (child->IsAtom()) {
+            if (child->Content() == label1) {
+                side1 = side;
+            }
+
+            if (child->Content() == label2) {
+                side2 = side;
+            }
+        } else {
+            TMaybe<ui32> nested1, nested2;
+            child = AddLinkToJoinTree(child, label1, column1, label2, column2, ctx, nested1, nested2, updated);
+            if (nested1) {
+                side1 = side;
+            }
+
+            if (nested2) {
+                side2 = side;
+            }
+        }
+    }
+
+    // Link only at the node that separates the two labels, and only once.
+    if (side1 && side2 && *side1 != *side2 && !updated) {
+        if (joinTree->Child(0)->Content() == "Cross") {
+            children[0] = ctx.NewAtom(joinTree->Pos(), "Inner");
+        }
+
+        // Keys for the left operand go to child 3, for the right one to child 4,
+        // so make (label1, column1) the pair that belongs to the left side.
+        if (*side1 == 2u) {
+            std::swap(label1, label2);
+            std::swap(column1, column2);
+        }
+
+        auto leftKeys = children[3]->ChildrenList();
+        leftKeys.push_back(ctx.NewAtom(joinTree->Pos(), label1));
+        leftKeys.push_back(ctx.NewAtom(joinTree->Pos(), column1));
+        children[3] = ctx.ChangeChildren(*children[3], std::move(leftKeys));
+
+        auto rightKeys = children[4]->ChildrenList();
+        rightKeys.push_back(ctx.NewAtom(joinTree->Pos(), label2));
+        rightKeys.push_back(ctx.NewAtom(joinTree->Pos(), column2));
+        children[4] = ctx.ChangeChildren(*children[4], std::move(rightKeys));
+
+        updated = true;
+    }
+
+    // Report to the caller what was found in this subtree.
+    if (side1) {
+        found1 = side1;
+    }
+
+    if (side2) {
+        found2 = side2;
+    }
+
+    return ctx.ChangeChildren(*joinTree, std::move(children));
+}
+
+// Tries to turn an equality filter between two join inputs into a join key.
+//
+// equiJoin        - the join node; its child at index (ChildrenSize() - 2) is
+//                   used as the join tree.
+// predicate       - one term of the filter. Accepted shapes: `a == b`,
+//                   FromPg(PgResolvedOp("=", ..., a, b)), and either of those
+//                   wrapped as Coalesce(<term>, Bool(false)).
+// index1, index2  - the two distinct input indices the predicate depends on.
+// row             - the lambda argument standing for the joined row.
+// backRenameMap   - maps output field names back to "label.column" names.
+//
+// Returns `equiJoin` itself (same pointer) when the rewrite does not apply:
+// unsupported predicate shape, operands that do not each depend on exactly one
+// of the two inputs, an input for which IsRequiredSide() reports false, or an
+// operand that is not a plain Member(row, ...). Otherwise returns a copy of the
+// join whose tree carries the new link (see AddLinkToJoinTree).
+TExprNode::TPtr DecayCrossJoinIntoInner(TExprNode::TPtr equiJoin, TExprNode::TPtr predicate,
+    const TJoinLabels& labels, ui32 index1, ui32 index2, const TExprNode& row, const THashMap<TString, TString>& backRenameMap,
+    const TParentsMap& parentsMap, TExprContext& ctx) {
+    YQL_ENSURE(index1 != index2);
+
+    // Look through Coalesce(<term>, false); any other Coalesce is not supported.
+    if (predicate->IsCallable("Coalesce")) {
+        if (!predicate->Tail().IsCallable("Bool") || !IsFalse(predicate->Tail().Head().Content())) {
+            return equiJoin;
+        }
+
+        predicate = predicate->HeadPtr();
+    }
+
+    TExprNode::TPtr left, right;
+    if (predicate->IsCallable("==")) {
+        left = predicate->ChildPtr(0);
+        right = predicate->ChildPtr(1);
+    } else if (predicate->IsCallable("FromPg") && predicate->Head().IsCallable("PgResolvedOp") &&
+        (predicate->Head().Head().Content() == "=")) {
+        left = predicate->Head().ChildPtr(2);
+        right = predicate->Head().ChildPtr(3);
+    } else {
+        return equiJoin;
+    }
+
+    // Each operand must depend on exactly one input, and together they must
+    // cover {index1, index2}.
+    TSet<ui32> leftInputs, rightInputs;
+    TSet<TStringBuf> usedFields;
+    GatherJoinInputs(left, row, parentsMap, backRenameMap, labels, leftInputs, usedFields);
+    GatherJoinInputs(right, row, parentsMap, backRenameMap, labels, rightInputs, usedFields);
+    if (leftInputs.size() != 1 || rightInputs.size() != 1) {
+        return equiJoin;
+    }
+
+    const ui32 leftInput = *leftInputs.begin();
+    const ui32 rightInput = *rightInputs.begin();
+    const bool straight = leftInput == index1 && rightInput == index2;
+    const bool reversed = leftInput == index2 && rightInput == index1;
+    if (!straight && !reversed) {
+        return equiJoin;
+    }
+
+    auto inputsCount = equiJoin->ChildrenSize() - 2;
+    auto joinTree = equiJoin->Child(inputsCount);
+    if (!IsRequiredSide(joinTree, labels, index1).first ||
+        !IsRequiredSide(joinTree, labels, index2).first) {
+        return equiJoin;
+    }
+
+    // Extracts (label, column) from Member(row, name); false for any other shape.
+    const auto splitMember = [&](const TExprNode::TPtr& side, TStringBuf& label, TStringBuf& column) {
+        if (!side->IsCallable("Member") || side->Child(0) != &row) {
+            return false;
+        }
+
+        auto name = side->Tail().Content();
+        if (auto ptr = backRenameMap.FindPtr(name)) {
+            name = *ptr;
+        }
+
+        SplitTableName(name, label, column);
+        return true;
+    };
+
+    TStringBuf label1, column1, label2, column2;
+    if (!splitMember(left, label1, column1) || !splitMember(right, label2, column2)) {
+        return equiJoin;
+    }
+
+    TMaybe<ui32> found1, found2;
+    bool updated = false;
+    auto newJoinTree = AddLinkToJoinTree(joinTree, label1, column1, label2, column2, ctx, found1, found2, updated);
+    YQL_ENSURE(updated);
+    return ctx.ChangeChild(*equiJoin, inputsCount, std::move(newJoinTree));
+}
+
TExprNode::TPtr FlatMapOverEquiJoin(const TCoFlatMapBase& node, TExprContext& ctx, const TParentsMap& parentsMap) {
auto equiJoin = node.Input();
auto structType = equiJoin.Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType()
@@ -520,31 +703,8 @@ TExprNode::TPtr FlatMapOverEquiJoin(const TCoFlatMapBase& node, TExprContext& ct
continue;
}
- TSet<TStringBuf> usedFields;
TSet<ui32> inputs;
- if (!HaveFieldsSubset(andTerm, row, usedFields, parentsMap, false)) {
- continue;
- }
-
- for (auto x : usedFields) {
- // rename used fields
- if (auto renamed = backRenameMap.FindPtr(x)) {
- x = *renamed;
- }
-
- TStringBuf part1;
- TStringBuf part2;
- SplitTableName(x, part1, part2);
- inputs.insert(*labels.FindInputIndex(part1));
- if (inputs.size() == labels.Inputs.size()) {
- break;
- }
- }
-
- // all inputs are touched
- if (inputs.size() == labels.Inputs.size()) {
- continue;
- }
+ GatherJoinInputs(andTerm, row, parentsMap, backRenameMap, labels, inputs, usedFields);
if (inputs.size() == 0) {
YQL_CLOG(DEBUG, Core) << "ConstantPredicatePushdownOverEquiJoin";
@@ -563,6 +723,17 @@ TExprNode::TPtr FlatMapOverEquiJoin(const TCoFlatMapBase& node, TExprContext& ct
break;
}
}
+
+ if (inputs.size() == 2) {
+ auto newJoin = DecayCrossJoinIntoInner(equiJoin.Ptr(), andTerm,
+ labels, *inputs.begin(), *(++inputs.begin()), row, backRenameMap, parentsMap, ctx);
+ if (newJoin != equiJoin.Ptr()) {
+ YQL_CLOG(DEBUG, Core) << "DecayCrossJoinIntoInner";
+ ret = newJoin;
+ extraPredicate = FuseAndTerms(node.Pos(), andTerms, andTerm, ctx);
+ break;
+ }
+ }
}
if (!ret) {