diff options
author | aneporada <aneporada@yandex-team.ru> | 2022-04-02 01:30:12 +0300 |
---|---|---|
committer | aneporada <aneporada@yandex-team.ru> | 2022-04-02 01:30:12 +0300 |
commit | 70dd205a265bca7ee326e1b524ad8b1c64240c7b (patch) | |
tree | 96c994b66c2ef4c8a71bde2e12ee08249a6aad1e | |
parent | 6754a6a379c0522676071b52818018c46b0a8e58 (diff) | |
download | ydb-70dd205a265bca7ee326e1b524ad8b1c64240c7b.tar.gz |
[YQL-14534] Insert WithContext in window functions
ref:3b759f9a81f889f87c04937418fbb6fb4b4d9373
-rw-r--r-- | ydb/library/yql/core/yql_expr_type_annotation.cpp | 3 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_opt_window.cpp | 83 |
2 files changed, 66 insertions, 20 deletions
diff --git a/ydb/library/yql/core/yql_expr_type_annotation.cpp b/ydb/library/yql/core/yql_expr_type_annotation.cpp index 3dfbd35331..ca825f41d4 100644 --- a/ydb/library/yql/core/yql_expr_type_annotation.cpp +++ b/ydb/library/yql/core/yql_expr_type_annotation.cpp @@ -4996,6 +4996,9 @@ bool ExtractPgType(const TTypeAnnotationNode* type, ui32& pgType, bool& convertT bool HasContextFuncs(const TExprNode& input) { bool needCtx = false; VisitExpr(input, [&needCtx](const TExprNode& node) { + if (needCtx || node.IsCallable("WithContext")) { + return false; + } if (node.IsCallable("PgResolvedCallCtx")) { needCtx = true; return false; diff --git a/ydb/library/yql/core/yql_opt_window.cpp b/ydb/library/yql/core/yql_opt_window.cpp index bee7c930d9..556d893f85 100644 --- a/ydb/library/yql/core/yql_opt_window.cpp +++ b/ydb/library/yql/core/yql_opt_window.cpp @@ -387,6 +387,18 @@ TExprNode::TPtr CoalesceQueueOutput(TPositionHandle pos, const TExprNode::TPtr& .Build(); } +TExprNode::TPtr WrapWithWinContext(const TExprNode::TPtr& input, TExprContext& ctx) { + if (HasContextFuncs(*input)) { + return ctx.Builder(input->Pos()) + .Callable("WithContext") + .Add(0, input) + .Atom(1, "WinAgg", TNodeFlags::Default) + .Seal() + .Build(); + } + return input; +} + TExprNode::TPtr BuildInitLambdaForChain1Map(TPositionHandle pos, const TExprNode::TPtr& initStateLambda, const TExprNode::TPtr& calculateLambda, TExprContext& ctx) { @@ -961,6 +973,8 @@ public: .Seal() .Build(); + state = WrapWithWinContext(state, ctx); + auto initBody = ctx.Builder(GetPos()) .List() .Apply(0, calculate) @@ -996,6 +1010,8 @@ public: .Seal() .Build(); + state = WrapWithWinContext(state, ctx); + auto updateBody = ctx.Builder(GetPos()) .List() .Apply(0, calculate) @@ -1043,6 +1059,8 @@ public: .Seal() .Build(); + state = WrapWithWinContext(state, ctx); + auto initBody = ctx.Builder(GetPos()) .List() .Apply(0, calculate) @@ -1121,13 +1139,19 @@ private: auto originalUpdate = GetUpdateLambda(); auto calculate = GetCalculateLambda(); + auto fold1 = ctx.Builder(GetPos()) + .Callable("Fold1") + .Add(0, BuildQueueRange(GetPos(), dataQueue, QueueBegin, QueueEnd, rowArg, ctx)) + .Add(1, ctx.DeepCopyLambda(*originalInit)) + .Add(2, ctx.DeepCopyLambda(*originalUpdate)) + .Seal() + .Build(); + + fold1 = WrapWithWinContext(fold1, ctx); + auto output = ctx.Builder(GetPos()) .Callable("Map") - .Callable(0, "Fold1") - .Add(0, BuildQueueRange(GetPos(), dataQueue, QueueBegin, QueueEnd, rowArg, ctx)) - .Add(1, ctx.DeepCopyLambda(*originalInit)) - .Add(2, ctx.DeepCopyLambda(*originalUpdate)) - .Seal() + .Add(0, fold1) .Add(1, ctx.DeepCopyLambda(*calculate)) .Seal() .Build(); @@ -2014,26 +2038,44 @@ TExprNode::TPtr ExpandNonCompactFullFrames(TPositionHandle pos, const TExprNode: .Build(); } + auto partitionByKeysLambda = ctx.Builder(pos) + .Lambda() + .Param("stream") + .Callable("Map") + .Callable(0, "Condense1") + .Apply(0, preprocessLambda) + .With(0, "stream") + .Seal() + .Add(1, BuildFold1Lambda(pos, frames, EFold1LambdaKind::INIT, keyColumns, ctx)) + .Add(2, condenseSwitch) + .Add(3, BuildFold1Lambda(pos, frames, EFold1LambdaKind::UPDATE, keyColumns, ctx)) + .Seal() + .Add(1, BuildFold1Lambda(pos, frames, EFold1LambdaKind::CALCULATE, keyColumns, ctx)) + .Seal() + .Seal() + .Build(); + + if (HasContextFuncs(*partitionByKeysLambda)) { + partitionByKeysLambda = ctx.Builder(pos) + .Lambda() + .Param("stream") + .Callable("WithContext") + .Apply(0, partitionByKeysLambda) + .With(0, "stream") + .Seal() + .Atom(1, "WinAgg", TNodeFlags::Default) + .Seal() + .Seal() + .Build(); + } + auto aggregated = ctx.Builder(pos) .Callable("PartitionsByKeys") .Add(0, input) .Add(1, keySelector) .Add(2, sortOrder) .Add(3, sortKey) - .Lambda(4) - .Param("stream") - .Callable("Map") - .Callable(0, "Condense1") - .Apply(0, preprocessLambda) - .With(0, "stream") - .Seal() - .Add(1, BuildFold1Lambda(pos, frames, EFold1LambdaKind::INIT, keyColumns, ctx)) - .Add(2, condenseSwitch) - .Add(3, BuildFold1Lambda(pos, frames, EFold1LambdaKind::UPDATE, keyColumns, ctx)) - .Seal() - .Add(1, BuildFold1Lambda(pos, frames, EFold1LambdaKind::CALCULATE, keyColumns, ctx)) - .Seal() - .Seal() + .Add(4, partitionByKeysLambda) .Seal().Build(); if (sessionUpdate) { @@ -2307,7 +2349,7 @@ TExprNode::TPtr ProcessRowsFrames(TPositionHandle pos, const TExprNode::TPtr& in .Seal() .Build(); - return processed; + return WrapWithWinContext(processed, ctx); } TExprNode::TPtr ProcessRangeFrames(TPositionHandle pos, const TExprNode::TPtr& input, const TExprNode::TPtr& sortKey, const TExprNode::TPtr& frames, TExprContext& ctx) { @@ -2339,6 +2381,7 @@ TExprNode::TPtr ProcessRangeFrames(TPositionHandle pos, const TExprNode::TPtr& i .Seal() .Seal() .Build(); + processed = WrapWithWinContext(processed, ctx); // split rows by groups with equal sortKey TExprNode::TPtr sortKeyLambda = sortKey; |