aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@yandex-team.com>2024-11-21 21:20:36 +0300
committeraneporada <aneporada@yandex-team.com>2024-11-21 21:35:07 +0300
commit00bc077e8f2272cd0206de2bca64c53300982883 (patch)
tree5b0e5b3d1958ae19dabd7974bfd1ebc7abdb5f0b
parent038c81c8f86c12a0cdd2631c89bbfee6723cc964 (diff)
downloadydb-00bc077e8f2272cd0206de2bca64c53300982883.tar.gz
Fix filter pushdown over Aggregate with session/hopping window
commit_hash:3d4989f7a92e9b330cd4b19f1850cdca544a1d64
-rw-r--r--yql/essentials/core/common_opt/yql_co_flow2.cpp39
-rw-r--r--yql/essentials/tests/sql/sql2yql/canondata/result.json14
-rw-r--r--yql/essentials/tests/sql/suites/aggregate/group_by_session_nopush.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/aggregate/group_by_session_nopush.sql14
4 files changed, 51 insertions, 18 deletions
diff --git a/yql/essentials/core/common_opt/yql_co_flow2.cpp b/yql/essentials/core/common_opt/yql_co_flow2.cpp
index bc25d612db..ace0141cb2 100644
--- a/yql/essentials/core/common_opt/yql_co_flow2.cpp
+++ b/yql/essentials/core/common_opt/yql_co_flow2.cpp
@@ -31,16 +31,7 @@ bool AllowComplexFiltersOverAggregatePushdown(const TOptimizeContext& optCtx) {
optCtx.Types->MaxAggPushdownPredicates > 0;
}
-TExprNode::TPtr AggregateSubsetFieldsAnalyzer(const TCoAggregate& node, TExprContext& ctx, const TParentsMap& parentsMap) {
- auto inputType = node.Input().Ref().GetTypeAnn();
- auto structType = inputType->GetKind() == ETypeAnnotationKind::List
- ? inputType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>()
- : inputType->Cast<TStreamExprType>()->GetItemType()->Cast<TStructExprType>();
-
- if (structType->GetSize() == 0) {
- return node.Ptr();
- }
-
+THashSet<TStringBuf> GetAggregationInputKeys(const TCoAggregate& node) {
TMaybe<TStringBuf> sessionColumn;
const auto sessionSetting = GetSetting(node.Settings().Ref(), "session");
if (sessionSetting) {
@@ -58,13 +49,28 @@ TExprNode::TPtr AggregateSubsetFieldsAnalyzer(const TCoAggregate& node, TExprCon
}
}
- TSet<TStringBuf> usedFields;
+ THashSet<TStringBuf> result;
for (const auto& x : node.Keys()) {
if (x.Value() != sessionColumn && x.Value() != hoppingColumn) {
- usedFields.insert(x.Value());
+ result.insert(x.Value());
}
}
+ return result;
+}
+
+TExprNode::TPtr AggregateSubsetFieldsAnalyzer(const TCoAggregate& node, TExprContext& ctx, const TParentsMap& parentsMap) {
+ auto inputType = node.Input().Ref().GetTypeAnn();
+ auto structType = inputType->GetKind() == ETypeAnnotationKind::List
+ ? inputType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>()
+ : inputType->Cast<TStreamExprType>()->GetItemType()->Cast<TStructExprType>();
+
+ if (structType->GetSize() == 0) {
+ return node.Ptr();
+ }
+
+ THashSet<TStringBuf> usedFields = GetAggregationInputKeys(node);
+
if (usedFields.size() == structType->GetSize()) {
return node.Ptr();
}
@@ -96,7 +102,7 @@ TExprNode::TPtr AggregateSubsetFieldsAnalyzer(const TCoAggregate& node, TExprCon
}
}
- if (hoppingSetting) {
+ if (auto hoppingSetting = GetSetting(node.Settings().Ref(), "hopping")) {
auto traitsNode = hoppingSetting->ChildPtr(1);
if (traitsNode->IsList()) {
traitsNode = traitsNode->ChildPtr(1);
@@ -120,7 +126,7 @@ TExprNode::TPtr AggregateSubsetFieldsAnalyzer(const TCoAggregate& node, TExprCon
}
}
- if (sessionSetting) {
+ if (auto sessionSetting = GetSetting(node.Settings().Ref(), "session")) {
TCoSessionWindowTraits traits(sessionSetting->Child(1)->ChildPtr(1));
auto usedType = traits.ListType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TListExprType>()->
@@ -1326,10 +1332,7 @@ TExprBase FilterOverAggregate(const TCoFlatMapBase& node, TExprContext& ctx, TOp
TCoConditionalValueBase body = node.Lambda().Body().Cast<TCoConditionalValueBase>();
const TCoAggregate agg = node.Input().Cast<TCoAggregate>();
- THashSet<TStringBuf> keyColumns;
- for (auto key : agg.Keys()) {
- keyColumns.insert(key.Value());
- }
+ const THashSet<TStringBuf> keyColumns = GetAggregationInputKeys(agg);
TExprNodeList andComponents;
if (auto maybeAnd = body.Predicate().Maybe<TCoAnd>()) {
diff --git a/yql/essentials/tests/sql/sql2yql/canondata/result.json b/yql/essentials/tests/sql/sql2yql/canondata/result.json
index ef8ca7ee6c..16c6bd5107 100644
--- a/yql/essentials/tests/sql/sql2yql/canondata/result.json
+++ b/yql/essentials/tests/sql/sql2yql/canondata/result.json
@@ -2463,6 +2463,13 @@
"uri": "https://{canondata_backend}/1924537/1ab444909086b08bd4fe21c5a43f5e183c647e0a/resource.tar.gz#test_sql2yql.test_aggregate-group_by_session_extended_tuple_/sql.yql"
}
],
+ "test_sql2yql.test[aggregate-group_by_session_nopush]": [
+ {
+ "checksum": "09f9e4a178067f6aaa81b9e9959b4cec",
+ "size": 3177,
+ "uri": "https://{canondata_backend}/212715/fe819b0081800cfcbf6e2512d273e760949a6cc7/resource.tar.gz#test_sql2yql.test_aggregate-group_by_session_nopush_/sql.yql"
+ }
+ ],
"test_sql2yql.test[aggregate-group_by_session_only]": [
{
"checksum": "0c22dd1ef887ea533c6e0621c0937ffa",
@@ -22322,6 +22329,13 @@
"uri": "https://{canondata_backend}/1880306/64654158d6bfb1289c66c626a8162239289559d0/resource.tar.gz#test_sql_format.test_aggregate-group_by_session_extended_tuple_/formatted.sql"
}
],
+ "test_sql_format.test[aggregate-group_by_session_nopush]": [
+ {
+ "checksum": "382f93f1c899dd2d1d5ea6b04575cfef",
+ "size": 372,
+ "uri": "https://{canondata_backend}/212715/fe819b0081800cfcbf6e2512d273e760949a6cc7/resource.tar.gz#test_sql_format.test_aggregate-group_by_session_nopush_/formatted.sql"
+ }
+ ],
"test_sql_format.test[aggregate-group_by_session_only]": [
{
"checksum": "531ee77369e54e2a1616411e89c86bb7",
diff --git a/yql/essentials/tests/sql/suites/aggregate/group_by_session_nopush.cfg b/yql/essentials/tests/sql/suites/aggregate/group_by_session_nopush.cfg
new file mode 100644
index 0000000000..9e0b837318
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/aggregate/group_by_session_nopush.cfg
@@ -0,0 +1,2 @@
+in Input session1.txt
+providers yt
diff --git a/yql/essentials/tests/sql/suites/aggregate/group_by_session_nopush.sql b/yql/essentials/tests/sql/suites/aggregate/group_by_session_nopush.sql
new file mode 100644
index 0000000000..fd8e46a071
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/aggregate/group_by_session_nopush.sql
@@ -0,0 +1,14 @@
+/* syntax version 1 */
+/* postgres can not */
+/* yt can not */
+
+SELECT * FROM (
+ SELECT
+ user,
+ cast(session_start as Int64) as ss,
+ ListSort(AGGREGATE_LIST(ts)) as session,
+ COUNT(1) as session_len
+ FROM plato.Input
+ GROUP BY SessionWindow(ts, 10) as session_start, user
+)
+WHERE ss != 100500; -- should not push down