aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@ydb.tech>2023-02-10 15:07:38 +0300
committeraneporada <aneporada@ydb.tech>2023-02-10 15:07:38 +0300
commitc5c1a23f428643e0fb6675b84dc4d7f00abf1ead (patch)
treea407e42654ba1db084f30173cf46c5378ed5fe9e
parent712a7a36204f16a88f8c57a0e3cc120f3e592718 (diff)
downloadydb-c5c1a23f428643e0fb6675b84dc4d7f00abf1ead.tar.gz
Pushdown filter over aggregation (single usage)
-rw-r--r--ydb/library/yql/core/common_opt/yql_co_flow2.cpp71
-rw-r--r--ydb/library/yql/core/common_opt/yql_flatmap_over_join.cpp40
-rw-r--r--ydb/library/yql/core/yql_opt_utils.cpp47
-rw-r--r--ydb/library/yql/core/yql_opt_utils.h3
4 files changed, 127 insertions, 34 deletions
diff --git a/ydb/library/yql/core/common_opt/yql_co_flow2.cpp b/ydb/library/yql/core/common_opt/yql_co_flow2.cpp
index 3b868489c0a..1dd82fc012f 100644
--- a/ydb/library/yql/core/common_opt/yql_co_flow2.cpp
+++ b/ydb/library/yql/core/common_opt/yql_co_flow2.cpp
@@ -1070,8 +1070,67 @@ TExprNode::TPtr OptimizeCollect(const TExprNode::TPtr& node, TExprContext& ctx,
return node;
}
+TExprBase FilterOverAggregate(const TCoFlatMapBase& node, TExprContext& ctx, const TParentsMap& parentsMap) {
+ if (!TCoConditionalValueBase::Match(node.Lambda().Body().Raw())) {
+ return node;
+ }
+
+ TExprBase arg = node.Lambda().Args().Arg(0);
+ TCoConditionalValueBase body = node.Lambda().Body().Cast<TCoConditionalValueBase>();
+ if (HasDependsOn(body.Predicate().Ptr(), arg.Ptr())) {
+ return node;
+ }
+
+ const TCoAggregate agg = node.Input().Cast<TCoAggregate>();
+ THashSet<TStringBuf> keyColumns;
+ for (auto key : agg.Keys()) {
+ keyColumns.insert(key.Value());
+ }
+
+ TExprNodeList andComponents;
+ if (auto maybeAnd = body.Predicate().Maybe<TCoAnd>()) {
+ andComponents = maybeAnd.Cast().Ref().ChildrenList();
+ } else {
+ andComponents.push_back(body.Predicate().Ptr());
+ }
+
+ TExprNodeList pushComponents;
+ TExprNodeList restComponents;
+ for (auto& p : andComponents) {
+ TSet<TStringBuf> usedFields;
+ if (p->IsCallable("Likely") ||
+ !HaveFieldsSubset(p, arg.Ref(), usedFields, parentsMap) ||
+ !AllOf(usedFields, [&](TStringBuf field) { return keyColumns.contains(field); }) ||
+ !IsStrict(p))
+ {
+ restComponents.push_back(p);
+ } else {
+ pushComponents.push_back(p);
+ }
+ }
+
+ if (pushComponents.empty()) {
+ return node;
+ }
+
+ TExprNode::TPtr pushPred = ctx.NewCallable(body.Predicate().Pos(), "And", std::move(pushComponents));
+ TExprNode::TPtr restPred = restComponents.empty() ?
+ MakeBool<true>(body.Predicate().Pos(), ctx) :
+ ctx.NewCallable(body.Predicate().Pos(), "And", std::move(restComponents));
+
+ auto pushBody = ctx.NewCallable(body.Pos(), "OptionalIf", { pushPred, arg.Ptr() });
+ auto pushLambda = ctx.DeepCopyLambda(*ctx.ChangeChild(node.Lambda().Ref(), TCoLambda::idx_Body, std::move(pushBody)));
+
+ auto restBody = ctx.ChangeChild(body.Ref(), TCoConditionalValueBase::idx_Predicate, std::move(restPred));
+ auto restLambda = ctx.DeepCopyLambda(*ctx.ChangeChild(node.Lambda().Ref(), TCoLambda::idx_Body, std::move(restBody)));
+
+ auto newAggInput = ctx.NewCallable(agg.Input().Pos(), "FlatMap", { agg.Input().Ptr(), pushLambda });
+ auto newAgg = ctx.ChangeChild(agg.Ref(), TCoAggregate::idx_Input, std::move(newAggInput));
+ return TExprBase(ctx.NewCallable(node.Pos(), node.Ref().Content(), { newAgg, restLambda }));
}
+} // namespace
+
void RegisterCoFlowCallables2(TCallableOptimizerMap& map) {
using namespace std::placeholders;
@@ -1096,6 +1155,18 @@ void RegisterCoFlowCallables2(TCallableOptimizerMap& map) {
}
}
+ if (self.Input().Ref().IsCallable("Aggregate")) {
+ auto ret = FilterOverAggregate(self, ctx, *optCtx.ParentsMap);
+ if (!ret.Raw()) {
+ return nullptr;
+ }
+
+ if (ret.Raw() != self.Raw()) {
+ YQL_CLOG(DEBUG, Core) << "Filter over Aggregate";
+ return ret.Ptr();
+ }
+ }
+
if (self.Input().Ref().IsCallable(TCoGroupingCore::CallableName())) {
auto groupingCore = self.Input().Cast<TCoGroupingCore>();
const TExprNode* extract = nullptr;
diff --git a/ydb/library/yql/core/common_opt/yql_flatmap_over_join.cpp b/ydb/library/yql/core/common_opt/yql_flatmap_over_join.cpp
index 34afad52542..031fc6b4028 100644
--- a/ydb/library/yql/core/common_opt/yql_flatmap_over_join.cpp
+++ b/ydb/library/yql/core/common_opt/yql_flatmap_over_join.cpp
@@ -134,41 +134,13 @@ TExprNode::TPtr SingleInputPredicatePushdownOverEquiJoin(TExprNode::TPtr equiJoi
return equiJoin;
}
- // TODO: derive strictness from constraints
- bool isStrict = true;
- {
- YQL_ENSURE(args->ChildrenSize() == 1);
- YQL_ENSURE(args->Head().IsArgument());
- bool withDependsOn = false;
- size_t insideAssumeStrict = 0;
- size_t insideDependsOn = 0;
- VisitExpr(predicate, [&](const TExprNode::TPtr& node) {
- if (node->IsCallable("AssumeStrict")) {
- ++insideAssumeStrict;
- } else if (node->IsCallable("DependsOn")) {
- ++insideDependsOn;
- } else if (isStrict && !insideAssumeStrict && node->IsCallable({"Udf", "ScriptUdf", "Unwrap", "Ensure"})) {
- if (!node->IsCallable("Udf") || !HasSetting(*node->Child(TCoUdf::idx_Settings), "strict")) {
- isStrict = false;
- }
- } else if (insideDependsOn && node.Get() == args->Child(0)) {
- withDependsOn = true;
- }
- return !withDependsOn;
- }, [&](const TExprNode::TPtr& node) {
- if (node->IsCallable("AssumeStrict")) {
- YQL_ENSURE(insideAssumeStrict > 0);
- --insideAssumeStrict;
- } else if (node->IsCallable("DependsOn")) {
- YQL_ENSURE(insideDependsOn > 0);
- --insideDependsOn;
- }
- return true;
- });
- if (withDependsOn) {
- return equiJoin;
- }
+ YQL_ENSURE(args->ChildrenSize() == 1);
+ YQL_ENSURE(args->Head().IsArgument());
+ if (HasDependsOn(predicate, args->HeadPtr())) {
+ return equiJoin;
}
+
+ const bool isStrict = IsStrict(predicate);
if (!isStrict && IsRequiredAndFilteredSide(joinTree, labels, firstCandidate)) {
return equiJoin;
}
diff --git a/ydb/library/yql/core/yql_opt_utils.cpp b/ydb/library/yql/core/yql_opt_utils.cpp
index 38b3d7e715f..91037dab188 100644
--- a/ydb/library/yql/core/yql_opt_utils.cpp
+++ b/ydb/library/yql/core/yql_opt_utils.cpp
@@ -1697,4 +1697,51 @@ bool IsYieldTransparent(const TExprNode::TPtr& root, const TTypeAnnotationContex
return !FindNonYieldTransparentNode(root, typeCtx);
}
+bool IsStrict(const TExprNode::TPtr& root) {
+ // TODO: add TExprNode::IsStrict() method (with corresponding flag). Fill it as part of type annotation pass
+ bool isStrict = true;
+ size_t insideAssumeStrict = 0;
+
+ VisitExpr(root, [&](const TExprNode::TPtr& node) {
+ if (node->IsCallable("AssumeStrict")) {
+ ++insideAssumeStrict;
+ } else if (isStrict && !insideAssumeStrict && node->IsCallable({"Udf", "ScriptUdf", "Unwrap", "Ensure"})) {
+ if (!node->IsCallable("Udf") || !HasSetting(*node->Child(TCoUdf::idx_Settings), "strict")) {
+ isStrict = false;
+ }
+ }
+ return isStrict;
+ }, [&](const TExprNode::TPtr& node) {
+ if (node->IsCallable("AssumeStrict")) {
+ YQL_ENSURE(insideAssumeStrict > 0);
+ --insideAssumeStrict;
+ }
+ return true;
+ });
+
+ return isStrict;
+}
+
+bool HasDependsOn(const TExprNode::TPtr& root, const TExprNode::TPtr& arg) {
+ bool withDependsOn = false;
+ size_t insideDependsOn = 0;
+
+ VisitExpr(root, [&](const TExprNode::TPtr& node) {
+ if (node->IsCallable("DependsOn")) {
+ ++insideDependsOn;
+ } else if (insideDependsOn && node == arg) {
+ withDependsOn = true;
+ }
+ return !withDependsOn;
+ }, [&](const TExprNode::TPtr& node) {
+ if (node->IsCallable("DependsOn")) {
+ YQL_ENSURE(insideDependsOn > 0);
+ --insideDependsOn;
+ }
+ return true;
+ });
+
+ return withDependsOn;
+}
+
}
diff --git a/ydb/library/yql/core/yql_opt_utils.h b/ydb/library/yql/core/yql_opt_utils.h
index db9ed559716..ac72e5ef666 100644
--- a/ydb/library/yql/core/yql_opt_utils.h
+++ b/ydb/library/yql/core/yql_opt_utils.h
@@ -127,4 +127,7 @@ TExprNode::TPtr MakeNarrowMap(TPositionHandle pos, const TVector<TString>& colum
TExprNode::TPtr FindNonYieldTransparentNode(const TExprNode::TPtr& root, const TTypeAnnotationContext& typeCtx);
bool IsYieldTransparent(const TExprNode::TPtr& root, const TTypeAnnotationContext& typeCtx);
+bool IsStrict(const TExprNode::TPtr& node);
+bool HasDependsOn(const TExprNode::TPtr& node, const TExprNode::TPtr& arg);
+
}