diff options
author | aneporada <aneporada@ydb.tech> | 2023-02-10 15:07:38 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-02-10 15:07:38 +0300 |
commit | c5c1a23f428643e0fb6675b84dc4d7f00abf1ead (patch) | |
tree | a407e42654ba1db084f30173cf46c5378ed5fe9e | |
parent | 712a7a36204f16a88f8c57a0e3cc120f3e592718 (diff) | |
download | ydb-c5c1a23f428643e0fb6675b84dc4d7f00abf1ead.tar.gz |
Pushdown filter over aggregation (single usage)
-rw-r--r-- | ydb/library/yql/core/common_opt/yql_co_flow2.cpp | 71 | ||||
-rw-r--r-- | ydb/library/yql/core/common_opt/yql_flatmap_over_join.cpp | 40 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_opt_utils.cpp | 47 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_opt_utils.h | 3 |
4 files changed, 127 insertions, 34 deletions
diff --git a/ydb/library/yql/core/common_opt/yql_co_flow2.cpp b/ydb/library/yql/core/common_opt/yql_co_flow2.cpp
index 3b868489c0a..1dd82fc012f 100644
--- a/ydb/library/yql/core/common_opt/yql_co_flow2.cpp
+++ b/ydb/library/yql/core/common_opt/yql_co_flow2.cpp
@@ -1070,8 +1070,67 @@ TExprNode::TPtr OptimizeCollect(const TExprNode::TPtr& node, TExprContext& ctx,
     return node;
 }
 
+TExprBase FilterOverAggregate(const TCoFlatMapBase& node, TExprContext& ctx, const TParentsMap& parentsMap) {
+    if (!TCoConditionalValueBase::Match(node.Lambda().Body().Raw())) {
+        return node;
+    }
+
+    TExprBase arg = node.Lambda().Args().Arg(0);
+    TCoConditionalValueBase body = node.Lambda().Body().Cast<TCoConditionalValueBase>();
+    if (HasDependsOn(body.Predicate().Ptr(), arg.Ptr())) {
+        return node;
+    }
+
+    const TCoAggregate agg = node.Input().Cast<TCoAggregate>();
+    THashSet<TStringBuf> keyColumns;
+    for (auto key : agg.Keys()) {
+        keyColumns.insert(key.Value());
+    }
+
+    TExprNodeList andComponents;
+    if (auto maybeAnd = body.Predicate().Maybe<TCoAnd>()) {
+        andComponents = maybeAnd.Cast().Ref().ChildrenList();
+    } else {
+        andComponents.push_back(body.Predicate().Ptr());
+    }
+
+    TExprNodeList pushComponents;
+    TExprNodeList restComponents;
+    for (auto& p : andComponents) {
+        TSet<TStringBuf> usedFields;
+        if (p->IsCallable("Likely") ||
+            !HaveFieldsSubset(p, arg.Ref(), usedFields, parentsMap) ||
+            !AllOf(usedFields, [&](TStringBuf field) { return keyColumns.contains(field); }) ||
+            !IsStrict(p))
+        {
+            restComponents.push_back(p);
+        } else {
+            pushComponents.push_back(p);
+        }
+    }
+
+    if (pushComponents.empty()) {
+        return node;
+    }
+
+    TExprNode::TPtr pushPred = ctx.NewCallable(body.Predicate().Pos(), "And", std::move(pushComponents));
+    TExprNode::TPtr restPred = restComponents.empty() ?
+        MakeBool<true>(body.Predicate().Pos(), ctx) :
+        ctx.NewCallable(body.Predicate().Pos(), "And", std::move(restComponents));
+
+    auto pushBody = ctx.NewCallable(body.Pos(), "OptionalIf", { pushPred, arg.Ptr() });
+    auto pushLambda = ctx.DeepCopyLambda(*ctx.ChangeChild(node.Lambda().Ref(), TCoLambda::idx_Body, std::move(pushBody)));
+
+    auto restBody = ctx.ChangeChild(body.Ref(), TCoConditionalValueBase::idx_Predicate, std::move(restPred));
+    auto restLambda = ctx.DeepCopyLambda(*ctx.ChangeChild(node.Lambda().Ref(), TCoLambda::idx_Body, std::move(restBody)));
+
+    auto newAggInput = ctx.NewCallable(agg.Input().Pos(), "FlatMap", { agg.Input().Ptr(), pushLambda });
+    auto newAgg = ctx.ChangeChild(agg.Ref(), TCoAggregate::idx_Input, std::move(newAggInput));
+    return TExprBase(ctx.NewCallable(node.Pos(), node.Ref().Content(), { newAgg, restLambda }));
 }
 
+} // namespace
+
 void RegisterCoFlowCallables2(TCallableOptimizerMap& map) {
     using namespace std::placeholders;
 
@@ -1096,6 +1155,18 @@ void RegisterCoFlowCallables2(TCallableOptimizerMap& map) {
             }
         }
 
+        if (self.Input().Ref().IsCallable("Aggregate")) {
+            auto ret = FilterOverAggregate(self, ctx, *optCtx.ParentsMap);
+            if (!ret.Raw()) {
+                return nullptr;
+            }
+
+            if (ret.Raw() != self.Raw()) {
+                YQL_CLOG(DEBUG, Core) << "Filter over Aggregate";
+                return ret.Ptr();
+            }
+        }
+
         if (self.Input().Ref().IsCallable(TCoGroupingCore::CallableName())) {
             auto groupingCore = self.Input().Cast<TCoGroupingCore>();
             const TExprNode* extract = nullptr;
diff --git a/ydb/library/yql/core/common_opt/yql_flatmap_over_join.cpp b/ydb/library/yql/core/common_opt/yql_flatmap_over_join.cpp
index 34afad52542..031fc6b4028 100644
--- a/ydb/library/yql/core/common_opt/yql_flatmap_over_join.cpp
+++ b/ydb/library/yql/core/common_opt/yql_flatmap_over_join.cpp
@@ -134,41 +134,13 @@ TExprNode::TPtr SingleInputPredicatePushdownOverEquiJoin(TExprNode::TPtr equiJoi
         return equiJoin;
     }
 
-    // TODO: derive strictness from constraints
-    bool isStrict = true;
-    {
-        YQL_ENSURE(args->ChildrenSize() == 1);
-        YQL_ENSURE(args->Head().IsArgument());
-        bool withDependsOn = false;
-        size_t insideAssumeStrict = 0;
-        size_t insideDependsOn = 0;
-        VisitExpr(predicate, [&](const TExprNode::TPtr& node) {
-            if (node->IsCallable("AssumeStrict")) {
-                ++insideAssumeStrict;
-            } else if (node->IsCallable("DependsOn")) {
-                ++insideDependsOn;
-            } else if (isStrict && !insideAssumeStrict && node->IsCallable({"Udf", "ScriptUdf", "Unwrap", "Ensure"})) {
-                if (!node->IsCallable("Udf") || !HasSetting(*node->Child(TCoUdf::idx_Settings), "strict")) {
-                    isStrict = false;
-                }
-            } else if (insideDependsOn && node.Get() == args->Child(0)) {
-                withDependsOn = true;
-            }
-            return !withDependsOn;
-        }, [&](const TExprNode::TPtr& node) {
-            if (node->IsCallable("AssumeStrict")) {
-                YQL_ENSURE(insideAssumeStrict > 0);
-                --insideAssumeStrict;
-            } else if (node->IsCallable("DependsOn")) {
-                YQL_ENSURE(insideDependsOn > 0);
-                --insideDependsOn;
-            }
-            return true;
-        });
-        if (withDependsOn) {
-            return equiJoin;
-        }
+    YQL_ENSURE(args->ChildrenSize() == 1);
+    YQL_ENSURE(args->Head().IsArgument());
+    if (HasDependsOn(predicate, args->HeadPtr())) {
+        return equiJoin;
     }
+
+    const bool isStrict = IsStrict(predicate);
     if (!isStrict && IsRequiredAndFilteredSide(joinTree, labels, firstCandidate)) {
         return equiJoin;
     }
diff --git a/ydb/library/yql/core/yql_opt_utils.cpp b/ydb/library/yql/core/yql_opt_utils.cpp
index 38b3d7e715f..91037dab188 100644
--- a/ydb/library/yql/core/yql_opt_utils.cpp
+++ b/ydb/library/yql/core/yql_opt_utils.cpp
@@ -1697,4 +1697,51 @@ bool IsYieldTransparent(const TExprNode::TPtr& root, const TTypeAnnotationContex
     return !FindNonYieldTransparentNode(root, typeCtx);
 }
 
+bool IsStrict(const TExprNode::TPtr& root) {
+    // TODO: add TExprNode::IsStrict() method (with corresponding flag). Fill it as part of type annotation pass
+    bool isStrict = true;
+    size_t insideAssumeStrict = 0;
+
+    VisitExpr(root, [&](const TExprNode::TPtr& node) {
+        if (node->IsCallable("AssumeStrict")) {
+            ++insideAssumeStrict;
+        } else if (isStrict && !insideAssumeStrict && node->IsCallable({"Udf", "ScriptUdf", "Unwrap", "Ensure"})) {
+            if (!node->IsCallable("Udf") || !HasSetting(*node->Child(TCoUdf::idx_Settings), "strict")) {
+                isStrict = false;
+            }
+        }
+        return isStrict;
+    }, [&](const TExprNode::TPtr& node) {
+        if (node->IsCallable("AssumeStrict")) {
+            YQL_ENSURE(insideAssumeStrict > 0);
+            --insideAssumeStrict;
+        }
+        return true;
+    });
+
+    return isStrict;
+}
+
+bool HasDependsOn(const TExprNode::TPtr& root, const TExprNode::TPtr& arg) {
+    bool withDependsOn = false;
+    size_t insideDependsOn = 0;
+
+    VisitExpr(root, [&](const TExprNode::TPtr& node) {
+        if (node->IsCallable("DependsOn")) {
+            ++insideDependsOn;
+        } else if (insideDependsOn && node == arg) {
+            withDependsOn = true;
+        }
+        return !withDependsOn;
+    }, [&](const TExprNode::TPtr& node) {
+        if (node->IsCallable("DependsOn")) {
+            YQL_ENSURE(insideDependsOn > 0);
+            --insideDependsOn;
+        }
+        return true;
+    });
+
+    return withDependsOn;
+}
+
 }
diff --git a/ydb/library/yql/core/yql_opt_utils.h b/ydb/library/yql/core/yql_opt_utils.h
index db9ed559716..ac72e5ef666 100644
--- a/ydb/library/yql/core/yql_opt_utils.h
+++ b/ydb/library/yql/core/yql_opt_utils.h
@@ -127,4 +127,7 @@ TExprNode::TPtr MakeNarrowMap(TPositionHandle pos, const TVector<TString>& colum
 TExprNode::TPtr FindNonYieldTransparentNode(const TExprNode::TPtr& root, const TTypeAnnotationContext& typeCtx);
 bool IsYieldTransparent(const TExprNode::TPtr& root, const TTypeAnnotationContext& typeCtx);
 
+bool IsStrict(const TExprNode::TPtr& node);
+bool HasDependsOn(const TExprNode::TPtr& node, const TExprNode::TPtr& arg);
+
 }