aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@yandex-team.ru>2022-03-22 14:59:31 +0300
committervvvv <vvvv@yandex-team.ru>2022-03-22 14:59:31 +0300
commit1881e6012364ecc6a8b43812a4c48e04d615a1f2 (patch)
treeeccb57100a39f398280158969cadf826567f32ff
parent3b68a4a9e530fd19cc52362235950c28cdab1b23 (diff)
downloadydb-1881e6012364ecc6a8b43812a4c48e04d615a1f2.tar.gz
YQL-14534 use WithContext for PG agg funcs
ref:c833194098d135d9f2e4fb4ae9c0e38c49bb4419
-rw-r--r--ydb/library/yql/core/yql_opt_aggregate.cpp50
1 files changed, 41 insertions, 9 deletions
diff --git a/ydb/library/yql/core/yql_opt_aggregate.cpp b/ydb/library/yql/core/yql_opt_aggregate.cpp
index 5ea30e33400..fa2898c74cf 100644
--- a/ydb/library/yql/core/yql_opt_aggregate.cpp
+++ b/ydb/library/yql/core/yql_opt_aggregate.cpp
@@ -2,6 +2,7 @@
#include "yql_opt_utils.h"
#include "yql_opt_window.h"
#include "yql_expr_type_annotation.h"
+#include "yql_expr_optimize.h"
namespace NYql {
@@ -35,6 +36,33 @@ TExprNode::TPtr ExpandAggregate(const TExprNode::TPtr& node, TExprContext& ctx,
}
}
+ bool needCtx = false;
+ VisitExpr(*aggregatedColumns, [&needCtx](const TExprNode& node) {
+ if (node.IsCallable("PgResolvedCallCtx")) {
+ needCtx = true;
+ return false;
+ }
+
+ return true;
+ });
+
+ auto contextLambda = needCtx ?
+ ctx.Builder(node->Pos())
+ .Lambda()
+ .Param("stream")
+ .Callable("WithContext")
+ .Atom(0, "Agg")
+ .Arg(1, "stream")
+ .Seal()
+ .Seal()
+ .Build() :
+ ctx.Builder(node->Pos())
+ .Lambda()
+ .Param("stream")
+ .Arg("stream")
+ .Seal()
+ .Build();
+
const TStructExprType* originalRowType = GetSeqItemType(node->Head().GetTypeAnn())->Cast<TStructExprType>();
TVector<const TItemExprType*> rowItems = originalRowType->GetItems();
@@ -1512,16 +1540,20 @@ TExprNode::TPtr ExpandAggregate(const TExprNode::TPtr& node, TExprContext& ctx,
.Add(3, sortKey)
.Lambda(4)
.Param("stream")
- .Callable("Map")
- .Callable(0, "Condense1")
- .Apply(0, preprocessLambda)
- .With(0, "stream")
+ .Apply(contextLambda)
+ .With(0)
+ .Callable("Map")
+ .Callable(0, "Condense1")
+ .Apply(0, preprocessLambda)
+ .With(0, "stream")
+ .Seal()
+ .Add(1, std::move(groupInit))
+ .Add(2, condenseSwitch)
+ .Add(3, std::move(groupMerge))
+ .Seal()
+ .Add(1, std::move(groupSave))
.Seal()
- .Add(1, std::move(groupInit))
- .Add(2, condenseSwitch)
- .Add(3, std::move(groupMerge))
- .Seal()
- .Add(1, std::move(groupSave))
+ .Done()
.Seal()
.Seal()
.Seal().Build();