diff options
author | aneporada <aneporada@yandex-team.ru> | 2022-04-08 20:18:35 +0300 |
---|---|---|
committer | aneporada <aneporada@yandex-team.ru> | 2022-04-08 20:18:35 +0300 |
commit | 396f7364e550cf4c90d8dfa7f06d6388ab8c8a23 (patch) | |
tree | 6813018cf3bdb0456233eb06ba8b3fa5431998a5 | |
parent | 47cc8de3deeb1d66d6bff11b9367ff7aefde9943 (diff) | |
download | ydb-396f7364e550cf4c90d8dfa7f06d6388ab8c8a23.tar.gz |
[YQL-10265] Fix handling of mixed WinOnRows/WinOnRange frames
ref:77b70e55080d7fccf904c68c4533307a95e07fe5
-rw-r--r-- | ydb/library/yql/core/yql_opt_window.cpp | 30 |
1 files changed, 27 insertions, 3 deletions
diff --git a/ydb/library/yql/core/yql_opt_window.cpp b/ydb/library/yql/core/yql_opt_window.cpp index 229b6d3fa51..d40033fe837 100644 --- a/ydb/library/yql/core/yql_opt_window.cpp +++ b/ydb/library/yql/core/yql_opt_window.cpp @@ -2304,6 +2304,25 @@ void SplitFramesByType(const TExprNode::TPtr& frames, TExprNode::TPtr& rowFrames groupFrames = ctx.NewList(frames->Pos(), std::move(groups)); } +const TStructExprType* ApplyFramesToType(const TStructExprType& inputType, const TStructExprType& finalOutputType, const TExprNode& frames, TExprContext& ctx) { + TVector<const TItemExprType*> resultItems = inputType.GetItems(); + for (auto& frame : frames.ChildrenList()) { + YQL_ENSURE(TCoWinOnBase::Match(frame.Get())); + for (size_t i = 1; i < frame->ChildrenSize(); ++i) { + YQL_ENSURE(frame->Child(i)->IsList()); + YQL_ENSURE(frame->Child(i)->Head().IsAtom()); + TStringBuf column = frame->Child(i)->Head().Content(); + + const TTypeAnnotationNode* type = finalOutputType.FindItemType(column); + YQL_ENSURE(type); + + resultItems.push_back(ctx.MakeType<TItemExprType>(column, type)); + } + } + + return ctx.MakeType<TStructExprType>(resultItems); +} + TExprNode::TPtr ProcessRowsFrames(TPositionHandle pos, const TExprNode::TPtr& input, const TStructExprType& rowType, const TExprNode::TPtr& dependsOn, const TExprNode::TPtr& frames, TExprContext& ctx) { @@ -2573,7 +2592,7 @@ TExprNode::TPtr ProcessRangeFrames(TPositionHandle pos, const TExprNode::TPtr& i TExprNode::TPtr ExpandSingleCalcOverWindow(TPositionHandle pos, const TExprNode::TPtr& inputList, const TExprNode::TPtr& keyColumns, const TExprNode::TPtr& sortTraits, const TExprNode::TPtr& frames, const TExprNode::TPtr& sessionTraits, - const TExprNode::TPtr& sessionColumns, TExprContext& ctx) + const TExprNode::TPtr& sessionColumns, const TStructExprType& outputRowType, TExprContext& ctx) { if (auto expanded = TryExpandNonCompactFullFrames(pos, inputList, keyColumns, sortTraits, frames, sessionTraits, sessionColumns, ctx)) { YQL_CLOG(INFO, Core) << "Expanded non-compact CalcOverWindow"; @@ -2595,7 +2614,7 @@ TExprNode::TPtr ExpandSingleCalcOverWindow(TPositionHandle pos, const TExprNode: rowItems.push_back(ctx.MakeType<TItemExprType>(SessionStartMemberName, sessionKeyType)); rowItems.push_back(ctx.MakeType<TItemExprType>(SessionParamsMemberName, sessionParamsType)); } - const auto rowType = ctx.MakeType<TStructExprType>(rowItems); + auto rowType = ctx.MakeType<TStructExprType>(rowItems); auto keySelector = BuildKeySelector(pos, *rowType->Cast<TStructExprType>(), keyColumns, ctx); @@ -2647,7 +2666,11 @@ TExprNode::TPtr ExpandSingleCalcOverWindow(TPositionHandle pos, const TExprNode: auto topLevelStreamArg = ctx.NewArgument(pos, "stream"); TExprNode::TPtr processed = topLevelStreamArg; + // All RANGE frames (even simplest RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) + // will require additional memory to store TableRow()'s - so we want to start with minimum size of row + // (i.e. process range frames first) processed = ProcessRangeFrames(pos, processed, originalSortKey, rangeFrames, ctx); + rowType = ApplyFramesToType(*rowType, outputRowType, *rangeFrames, ctx); processed = ProcessRowsFrames(pos, processed, *rowType, topLevelStreamArg, rowsFrames, ctx); auto topLevelStreamProcessingLambda = ctx.NewLambda(pos, ctx.NewArguments(pos, {topLevelStreamArg}), std::move(processed)); @@ -2670,8 +2693,9 @@ TExprNode::TPtr ExpandCalcOverWindow(const TExprNode::TPtr& node, TExprContext& TCoCalcOverWindowTuple calc(calcs.front()); if (calc.Frames().Size() != 0 || calc.SessionColumns().Size() != 0) { + const TStructExprType& outputRowType = *node->GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(); input = ExpandSingleCalcOverWindow(node->Pos(), input, calc.Keys().Ptr(), calc.SortSpec().Ptr(), calc.Frames().Ptr(), - calc.SessionSpec().Ptr(), calc.SessionColumns().Ptr(), ctx); + calc.SessionSpec().Ptr(), calc.SessionColumns().Ptr(), outputRowType, ctx); } calcs.erase(calcs.begin()); |