aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@yandex-team.ru>2022-04-02 01:30:12 +0300
committeraneporada <aneporada@yandex-team.ru>2022-04-02 01:30:12 +0300
commit70dd205a265bca7ee326e1b524ad8b1c64240c7b (patch)
tree96c994b66c2ef4c8a71bde2e12ee08249a6aad1e
parent6754a6a379c0522676071b52818018c46b0a8e58 (diff)
downloadydb-70dd205a265bca7ee326e1b524ad8b1c64240c7b.tar.gz
[YQL-14534] Insert WithContext in window functions
ref:3b759f9a81f889f87c04937418fbb6fb4b4d9373
-rw-r--r--ydb/library/yql/core/yql_expr_type_annotation.cpp3
-rw-r--r--ydb/library/yql/core/yql_opt_window.cpp83
2 files changed, 66 insertions, 20 deletions
diff --git a/ydb/library/yql/core/yql_expr_type_annotation.cpp b/ydb/library/yql/core/yql_expr_type_annotation.cpp
index 3dfbd35331..ca825f41d4 100644
--- a/ydb/library/yql/core/yql_expr_type_annotation.cpp
+++ b/ydb/library/yql/core/yql_expr_type_annotation.cpp
@@ -4996,6 +4996,9 @@ bool ExtractPgType(const TTypeAnnotationNode* type, ui32& pgType, bool& convertT
bool HasContextFuncs(const TExprNode& input) {
bool needCtx = false;
VisitExpr(input, [&needCtx](const TExprNode& node) {
+ if (needCtx || node.IsCallable("WithContext")) {
+ return false;
+ }
if (node.IsCallable("PgResolvedCallCtx")) {
needCtx = true;
return false;
diff --git a/ydb/library/yql/core/yql_opt_window.cpp b/ydb/library/yql/core/yql_opt_window.cpp
index bee7c930d9..556d893f85 100644
--- a/ydb/library/yql/core/yql_opt_window.cpp
+++ b/ydb/library/yql/core/yql_opt_window.cpp
@@ -387,6 +387,18 @@ TExprNode::TPtr CoalesceQueueOutput(TPositionHandle pos, const TExprNode::TPtr&
.Build();
}
+TExprNode::TPtr WrapWithWinContext(const TExprNode::TPtr& input, TExprContext& ctx) {
+ if (HasContextFuncs(*input)) {
+ return ctx.Builder(input->Pos())
+ .Callable("WithContext")
+ .Add(0, input)
+ .Atom(1, "WinAgg", TNodeFlags::Default)
+ .Seal()
+ .Build();
+ }
+ return input;
+}
+
TExprNode::TPtr BuildInitLambdaForChain1Map(TPositionHandle pos, const TExprNode::TPtr& initStateLambda,
const TExprNode::TPtr& calculateLambda, TExprContext& ctx)
{
@@ -961,6 +973,8 @@ public:
.Seal()
.Build();
+ state = WrapWithWinContext(state, ctx);
+
auto initBody = ctx.Builder(GetPos())
.List()
.Apply(0, calculate)
@@ -996,6 +1010,8 @@ public:
.Seal()
.Build();
+ state = WrapWithWinContext(state, ctx);
+
auto updateBody = ctx.Builder(GetPos())
.List()
.Apply(0, calculate)
@@ -1043,6 +1059,8 @@ public:
.Seal()
.Build();
+ state = WrapWithWinContext(state, ctx);
+
auto initBody = ctx.Builder(GetPos())
.List()
.Apply(0, calculate)
@@ -1121,13 +1139,19 @@ private:
auto originalUpdate = GetUpdateLambda();
auto calculate = GetCalculateLambda();
+ auto fold1 = ctx.Builder(GetPos())
+ .Callable("Fold1")
+ .Add(0, BuildQueueRange(GetPos(), dataQueue, QueueBegin, QueueEnd, rowArg, ctx))
+ .Add(1, ctx.DeepCopyLambda(*originalInit))
+ .Add(2, ctx.DeepCopyLambda(*originalUpdate))
+ .Seal()
+ .Build();
+
+ fold1 = WrapWithWinContext(fold1, ctx);
+
auto output = ctx.Builder(GetPos())
.Callable("Map")
- .Callable(0, "Fold1")
- .Add(0, BuildQueueRange(GetPos(), dataQueue, QueueBegin, QueueEnd, rowArg, ctx))
- .Add(1, ctx.DeepCopyLambda(*originalInit))
- .Add(2, ctx.DeepCopyLambda(*originalUpdate))
- .Seal()
+ .Add(0, fold1)
.Add(1, ctx.DeepCopyLambda(*calculate))
.Seal()
.Build();
@@ -2014,26 +2038,44 @@ TExprNode::TPtr ExpandNonCompactFullFrames(TPositionHandle pos, const TExprNode:
.Build();
}
+ auto partitionByKeysLambda = ctx.Builder(pos)
+ .Lambda()
+ .Param("stream")
+ .Callable("Map")
+ .Callable(0, "Condense1")
+ .Apply(0, preprocessLambda)
+ .With(0, "stream")
+ .Seal()
+ .Add(1, BuildFold1Lambda(pos, frames, EFold1LambdaKind::INIT, keyColumns, ctx))
+ .Add(2, condenseSwitch)
+ .Add(3, BuildFold1Lambda(pos, frames, EFold1LambdaKind::UPDATE, keyColumns, ctx))
+ .Seal()
+ .Add(1, BuildFold1Lambda(pos, frames, EFold1LambdaKind::CALCULATE, keyColumns, ctx))
+ .Seal()
+ .Seal()
+ .Build();
+
+ if (HasContextFuncs(*partitionByKeysLambda)) {
+ partitionByKeysLambda = ctx.Builder(pos)
+ .Lambda()
+ .Param("stream")
+ .Callable("WithContext")
+ .Apply(0, partitionByKeysLambda)
+ .With(0, "stream")
+ .Seal()
+ .Atom(1, "WinAgg", TNodeFlags::Default)
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
auto aggregated = ctx.Builder(pos)
.Callable("PartitionsByKeys")
.Add(0, input)
.Add(1, keySelector)
.Add(2, sortOrder)
.Add(3, sortKey)
- .Lambda(4)
- .Param("stream")
- .Callable("Map")
- .Callable(0, "Condense1")
- .Apply(0, preprocessLambda)
- .With(0, "stream")
- .Seal()
- .Add(1, BuildFold1Lambda(pos, frames, EFold1LambdaKind::INIT, keyColumns, ctx))
- .Add(2, condenseSwitch)
- .Add(3, BuildFold1Lambda(pos, frames, EFold1LambdaKind::UPDATE, keyColumns, ctx))
- .Seal()
- .Add(1, BuildFold1Lambda(pos, frames, EFold1LambdaKind::CALCULATE, keyColumns, ctx))
- .Seal()
- .Seal()
+ .Add(4, partitionByKeysLambda)
.Seal().Build();
if (sessionUpdate) {
@@ -2307,7 +2349,7 @@ TExprNode::TPtr ProcessRowsFrames(TPositionHandle pos, const TExprNode::TPtr& in
.Seal()
.Build();
- return processed;
+ return WrapWithWinContext(processed, ctx);
}
TExprNode::TPtr ProcessRangeFrames(TPositionHandle pos, const TExprNode::TPtr& input, const TExprNode::TPtr& sortKey, const TExprNode::TPtr& frames, TExprContext& ctx) {
@@ -2339,6 +2381,7 @@ TExprNode::TPtr ProcessRangeFrames(TPositionHandle pos, const TExprNode::TPtr& i
.Seal()
.Seal()
.Build();
+ processed = WrapWithWinContext(processed, ctx);
// split rows by groups with equal sortKey
TExprNode::TPtr sortKeyLambda = sortKey;