diff options
author | vvvv <vvvv@yandex-team.ru> | 2022-03-22 14:59:31 +0300 |
---|---|---|
committer | vvvv <vvvv@yandex-team.ru> | 2022-03-22 14:59:31 +0300 |
commit | 1881e6012364ecc6a8b43812a4c48e04d615a1f2 (patch) | |
tree | eccb57100a39f398280158969cadf826567f32ff | |
parent | 3b68a4a9e530fd19cc52362235950c28cdab1b23 (diff) | |
download | ydb-1881e6012364ecc6a8b43812a4c48e04d615a1f2.tar.gz |
YQL-14534 use WithContext for PG agg funcs
ref:c833194098d135d9f2e4fb4ae9c0e38c49bb4419
-rw-r--r-- | ydb/library/yql/core/yql_opt_aggregate.cpp | 50 |
1 files changed, 41 insertions, 9 deletions
diff --git a/ydb/library/yql/core/yql_opt_aggregate.cpp b/ydb/library/yql/core/yql_opt_aggregate.cpp index 5ea30e33400..fa2898c74cf 100644 --- a/ydb/library/yql/core/yql_opt_aggregate.cpp +++ b/ydb/library/yql/core/yql_opt_aggregate.cpp @@ -2,6 +2,7 @@ #include "yql_opt_utils.h" #include "yql_opt_window.h" #include "yql_expr_type_annotation.h" +#include "yql_expr_optimize.h" namespace NYql { @@ -35,6 +36,33 @@ TExprNode::TPtr ExpandAggregate(const TExprNode::TPtr& node, TExprContext& ctx, } } + bool needCtx = false; + VisitExpr(*aggregatedColumns, [&needCtx](const TExprNode& node) { + if (node.IsCallable("PgResolvedCallCtx")) { + needCtx = true; + return false; + } + + return true; + }); + + auto contextLambda = needCtx ? + ctx.Builder(node->Pos()) + .Lambda() + .Param("stream") + .Callable("WithContext") + .Atom(0, "Agg") + .Arg(1, "stream") + .Seal() + .Seal() + .Build() : + ctx.Builder(node->Pos()) + .Lambda() + .Param("stream") + .Arg("stream") + .Seal() + .Build(); + const TStructExprType* originalRowType = GetSeqItemType(node->Head().GetTypeAnn())->Cast<TStructExprType>(); TVector<const TItemExprType*> rowItems = originalRowType->GetItems(); @@ -1512,16 +1540,20 @@ TExprNode::TPtr ExpandAggregate(const TExprNode::TPtr& node, TExprContext& ctx, .Add(3, sortKey) .Lambda(4) .Param("stream") - .Callable("Map") - .Callable(0, "Condense1") - .Apply(0, preprocessLambda) - .With(0, "stream") + .Apply(contextLambda) + .With(0) + .Callable("Map") + .Callable(0, "Condense1") + .Apply(0, preprocessLambda) + .With(0, "stream") + .Seal() + .Add(1, std::move(groupInit)) + .Add(2, condenseSwitch) + .Add(3, std::move(groupMerge)) + .Seal() + .Add(1, std::move(groupSave)) .Seal() - .Add(1, std::move(groupInit)) - .Add(2, condenseSwitch) - .Add(3, std::move(groupMerge)) - .Seal() - .Add(1, std::move(groupSave)) + .Done() .Seal() .Seal() .Seal().Build(); |