diff options
author | aneporada <aneporada@yandex-team.com> | 2024-11-21 21:20:36 +0300 |
---|---|---|
committer | aneporada <aneporada@yandex-team.com> | 2024-11-21 21:35:07 +0300 |
commit | 00bc077e8f2272cd0206de2bca64c53300982883 (patch) | |
tree | 5b0e5b3d1958ae19dabd7974bfd1ebc7abdb5f0b | |
parent | 038c81c8f86c12a0cdd2631c89bbfee6723cc964 (diff) | |
download | ydb-00bc077e8f2272cd0206de2bca64c53300982883.tar.gz |
Fix filter pushdown over Aggregate with session/hopping window
commit_hash:3d4989f7a92e9b330cd4b19f1850cdca544a1d64
4 files changed, 51 insertions, 18 deletions
diff --git a/yql/essentials/core/common_opt/yql_co_flow2.cpp b/yql/essentials/core/common_opt/yql_co_flow2.cpp index bc25d612db..ace0141cb2 100644 --- a/yql/essentials/core/common_opt/yql_co_flow2.cpp +++ b/yql/essentials/core/common_opt/yql_co_flow2.cpp @@ -31,16 +31,7 @@ bool AllowComplexFiltersOverAggregatePushdown(const TOptimizeContext& optCtx) { optCtx.Types->MaxAggPushdownPredicates > 0; } -TExprNode::TPtr AggregateSubsetFieldsAnalyzer(const TCoAggregate& node, TExprContext& ctx, const TParentsMap& parentsMap) { - auto inputType = node.Input().Ref().GetTypeAnn(); - auto structType = inputType->GetKind() == ETypeAnnotationKind::List - ? inputType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>() - : inputType->Cast<TStreamExprType>()->GetItemType()->Cast<TStructExprType>(); - - if (structType->GetSize() == 0) { - return node.Ptr(); - } - +THashSet<TStringBuf> GetAggregationInputKeys(const TCoAggregate& node) { TMaybe<TStringBuf> sessionColumn; const auto sessionSetting = GetSetting(node.Settings().Ref(), "session"); if (sessionSetting) { @@ -58,13 +49,28 @@ TExprNode::TPtr AggregateSubsetFieldsAnalyzer(const TCoAggregate& node, TExprCon } } - TSet<TStringBuf> usedFields; + THashSet<TStringBuf> result; for (const auto& x : node.Keys()) { if (x.Value() != sessionColumn && x.Value() != hoppingColumn) { - usedFields.insert(x.Value()); + result.insert(x.Value()); } } + return result; +} + +TExprNode::TPtr AggregateSubsetFieldsAnalyzer(const TCoAggregate& node, TExprContext& ctx, const TParentsMap& parentsMap) { + auto inputType = node.Input().Ref().GetTypeAnn(); + auto structType = inputType->GetKind() == ETypeAnnotationKind::List + ? inputType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>() + : inputType->Cast<TStreamExprType>()->GetItemType()->Cast<TStructExprType>(); + + if (structType->GetSize() == 0) { + return node.Ptr(); + } + + THashSet<TStringBuf> usedFields = GetAggregationInputKeys(node); + if (usedFields.size() == structType->GetSize()) { return node.Ptr(); } @@ -96,7 +102,7 @@ TExprNode::TPtr AggregateSubsetFieldsAnalyzer(const TCoAggregate& node, TExprCon } } - if (hoppingSetting) { + if (auto hoppingSetting = GetSetting(node.Settings().Ref(), "hopping")) { auto traitsNode = hoppingSetting->ChildPtr(1); if (traitsNode->IsList()) { traitsNode = traitsNode->ChildPtr(1); @@ -120,7 +126,7 @@ TExprNode::TPtr AggregateSubsetFieldsAnalyzer(const TCoAggregate& node, TExprCon } } - if (sessionSetting) { + if (auto sessionSetting = GetSetting(node.Settings().Ref(), "session")) { TCoSessionWindowTraits traits(sessionSetting->Child(1)->ChildPtr(1)); auto usedType = traits.ListType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TListExprType>()-> @@ -1326,10 +1332,7 @@ TExprBase FilterOverAggregate(const TCoFlatMapBase& node, TExprContext& ctx, TOp TCoConditionalValueBase body = node.Lambda().Body().Cast<TCoConditionalValueBase>(); const TCoAggregate agg = node.Input().Cast<TCoAggregate>(); - THashSet<TStringBuf> keyColumns; - for (auto key : agg.Keys()) { - keyColumns.insert(key.Value()); - } + const THashSet<TStringBuf> keyColumns = GetAggregationInputKeys(agg); TExprNodeList andComponents; if (auto maybeAnd = body.Predicate().Maybe<TCoAnd>()) { diff --git a/yql/essentials/tests/sql/sql2yql/canondata/result.json b/yql/essentials/tests/sql/sql2yql/canondata/result.json index ef8ca7ee6c..16c6bd5107 100644 --- a/yql/essentials/tests/sql/sql2yql/canondata/result.json +++ b/yql/essentials/tests/sql/sql2yql/canondata/result.json @@ -2463,6 +2463,13 @@ "uri": "https://{canondata_backend}/1924537/1ab444909086b08bd4fe21c5a43f5e183c647e0a/resource.tar.gz#test_sql2yql.test_aggregate-group_by_session_extended_tuple_/sql.yql" } ], + "test_sql2yql.test[aggregate-group_by_session_nopush]": [ + { + "checksum": "09f9e4a178067f6aaa81b9e9959b4cec", + "size": 3177, + "uri": "https://{canondata_backend}/212715/fe819b0081800cfcbf6e2512d273e760949a6cc7/resource.tar.gz#test_sql2yql.test_aggregate-group_by_session_nopush_/sql.yql" + } + ], "test_sql2yql.test[aggregate-group_by_session_only]": [ { "checksum": "0c22dd1ef887ea533c6e0621c0937ffa", @@ -22322,6 +22329,13 @@ "uri": "https://{canondata_backend}/1880306/64654158d6bfb1289c66c626a8162239289559d0/resource.tar.gz#test_sql_format.test_aggregate-group_by_session_extended_tuple_/formatted.sql" } ], + "test_sql_format.test[aggregate-group_by_session_nopush]": [ + { + "checksum": "382f93f1c899dd2d1d5ea6b04575cfef", + "size": 372, + "uri": "https://{canondata_backend}/212715/fe819b0081800cfcbf6e2512d273e760949a6cc7/resource.tar.gz#test_sql_format.test_aggregate-group_by_session_nopush_/formatted.sql" + } + ], "test_sql_format.test[aggregate-group_by_session_only]": [ { "checksum": "531ee77369e54e2a1616411e89c86bb7", diff --git a/yql/essentials/tests/sql/suites/aggregate/group_by_session_nopush.cfg b/yql/essentials/tests/sql/suites/aggregate/group_by_session_nopush.cfg new file mode 100644 index 0000000000..9e0b837318 --- /dev/null +++ b/yql/essentials/tests/sql/suites/aggregate/group_by_session_nopush.cfg @@ -0,0 +1,2 @@ +in Input session1.txt +providers yt diff --git a/yql/essentials/tests/sql/suites/aggregate/group_by_session_nopush.sql b/yql/essentials/tests/sql/suites/aggregate/group_by_session_nopush.sql new file mode 100644 index 0000000000..fd8e46a071 --- /dev/null +++ b/yql/essentials/tests/sql/suites/aggregate/group_by_session_nopush.sql @@ -0,0 +1,14 @@ +/* syntax version 1 */ +/* postgres can not */ +/* yt can not */ + +SELECT * FROM ( + SELECT + user, + cast(session_start as Int64) as ss, + ListSort(AGGREGATE_LIST(ts)) as session, + COUNT(1) as session_len + FROM plato.Input + GROUP BY SessionWindow(ts, 10) as session_start, user +) +WHERE ss != 100500; -- should not push down |