diff options
| author | aneporada <[email protected]> | 2022-04-01 00:39:21 +0300 |
|---|---|---|
| committer | aneporada <[email protected]> | 2022-04-01 00:39:21 +0300 |
| commit | bfeb38a1f3f0c4d3ed6903569421cecde06885ce (patch) | |
| tree | 5471ba1e1f1ed9de876cc51b2533d49328e76134 | |
| parent | 62c1c476044dc1568ba47e2bb8e6e491fb13ed1d (diff) | |
[YQL-10265] Support AnsiCurrentRow
ref:a6488a9cd15defe97e18715d7cc7590582d6ab5f
| -rw-r--r-- | ydb/library/yql/core/common_opt/yql_co_simple1.cpp | 45 | ||||
| -rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_list.cpp | 27 | ||||
| -rw-r--r-- | ydb/library/yql/core/yql_opt_window.cpp | 382 |
3 files changed, 360 insertions, 94 deletions
diff --git a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp index f59b3859c09..9ad79d93664 100644 --- a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp @@ -2835,43 +2835,52 @@ TExprNodeList DeduplicateAndSplitTupleCollectionByTypes(const TExprNode &collect TExprNode::TPtr MergeCalcOverWindowFrames(const TExprNode::TPtr& frames, TExprContext& ctx) { YQL_ENSURE(frames->IsList()); - - TNodeMap<size_t> uniqueFrameIndexes; - struct TWinOnRowsContent { + struct TWinOnContent { TExprNodeList Args; TPositionHandle Pos; }; - TVector<TWinOnRowsContent> winOnRows; + struct TMergedFrames { + TNodeMap<size_t> UniqIndexes; + TVector<TWinOnContent> Frames; + }; + + TMap<TStringBuf, TMergedFrames> mergeMap; + size_t uniqFrameSpecs = 0; for (auto& winOn: frames->Children()) { - YQL_ENSURE(winOn->IsCallable("WinOnRows")); + YQL_ENSURE(TCoWinOnBase::Match(winOn.Get())); if (winOn->ChildrenSize() == 1) { // skip empty frames continue; } + + TMergedFrames& merged = mergeMap[winOn->Content()]; + auto args = winOn->ChildrenList(); auto frameSpec = winOn->Child(0); - auto frameIt = uniqueFrameIndexes.find(frameSpec); - if (frameIt == uniqueFrameIndexes.end()) { - YQL_ENSURE(uniqueFrameIndexes.size() == winOnRows.size()); - uniqueFrameIndexes[frameSpec] = winOnRows.size(); - - TWinOnRowsContent content{std::move(args), winOn->Pos()}; - winOnRows.emplace_back(std::move(content)); + auto frameIt = merged.UniqIndexes.find(frameSpec); + if (frameIt == merged.UniqIndexes.end()) { + YQL_ENSURE(merged.UniqIndexes.size() == merged.Frames.size()); + merged.UniqIndexes[frameSpec] = merged.Frames.size(); + TWinOnContent content{std::move(args), winOn->Pos()}; + merged.Frames.emplace_back(std::move(content)); + ++uniqFrameSpecs; } else { - auto& combined = winOnRows[frameIt->second]; + auto& combined = merged.Frames[frameIt->second]; combined.Args.insert(combined.Args.end(), args.begin() + 1, args.end()); } } - if (uniqueFrameIndexes.size() != frames->ChildrenSize()) { - TExprNodeList winOnRowsNodes; - for (auto &item : winOnRows) { - winOnRowsNodes.emplace_back(ctx.NewCallable(item.Pos, "WinOnRows", std::move(item.Args))); + if (uniqFrameSpecs != frames->ChildrenSize()) { + TExprNodeList newFrames; + for (auto& [name, merged] : mergeMap) { + for (auto& item : merged.Frames) { + newFrames.emplace_back(ctx.NewCallable(item.Pos, name, std::move(item.Args))); + } } - return ctx.NewList(frames->Pos(), std::move(winOnRowsNodes)); + return ctx.NewList(frames->Pos(), std::move(newFrames)); } return frames; diff --git a/ydb/library/yql/core/type_ann/type_ann_list.cpp b/ydb/library/yql/core/type_ann/type_ann_list.cpp index a4f2c5b6fdc..55f177b44dc 100644 --- a/ydb/library/yql/core/type_ann/type_ann_list.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_list.cpp @@ -4940,16 +4940,6 @@ namespace { } IGraphTransformer::TStatus WinOnWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { - if (input->IsCallable("WinOnGroups")) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "GROUPS in frame specification are not supported yet")); - return IGraphTransformer::TStatus::Error; - } - if (input->IsCallable("WinOnRange")) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "RANGE in frame specification is not supported yet")); - return IGraphTransformer::TStatus::Error; - } - YQL_ENSURE(input->IsCallable("WinOnRows")); - if (!EnsureMinArgsCount(*input, 1, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } @@ -4968,10 +4958,25 @@ namespace { } } - if (!TWindowFrameSettings::TryParse(*input, ctx.Expr)) { + auto frameSettings = TWindowFrameSettings::TryParse(*input, ctx.Expr); + if (!frameSettings) { + return IGraphTransformer::TStatus::Error; + } + + auto frameType = frameSettings->GetFrameType(); + if (frameType == EFrameType::FrameByGroups) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "GROUPS in frame specification are not supported yet")); return IGraphTransformer::TStatus::Error; } + if (frameType == EFrameType::FrameByRange) { + // only UNBOUNDED PRECEDING -> CURRENT ROW is currently supported + if (!(IsUnbounded(frameSettings->GetFirst()) && IsCurrentRow(frameSettings->GetLast()))) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "RANGE in frame specification is not supported yet")); + return IGraphTransformer::TStatus::Error; + } + } + auto status = NormalizeKeyValueTuples(input, 1, output, ctx.Expr); if (status != IGraphTransformer::TStatus::Ok) { return status; diff --git a/ydb/library/yql/core/yql_opt_window.cpp b/ydb/library/yql/core/yql_opt_window.cpp index acbfe1bbdeb..bee7c930d9d 100644 --- a/ydb/library/yql/core/yql_opt_window.cpp +++ b/ydb/library/yql/core/yql_opt_window.cpp @@ -13,7 +13,7 @@ namespace { const TStringBuf SessionStartMemberName = "_yql_window_session_start"; const TStringBuf SessionParamsMemberName = "_yql_window_session_params"; -enum class EFrameType : ui8 { +enum class EFrameBoundsType : ui8 { EMPTY, LAGGING, CURRENT, @@ -22,26 +22,26 @@ enum class EFrameType : ui8 { GENERIC, }; -EFrameType FrameType(const TWindowFrameSettings& settings) { +EFrameBoundsType FrameBoundsType(const TWindowFrameSettings& settings) { auto first = settings.GetFirstOffset(); auto last = settings.GetLastOffset(); if (first.Defined() && last.Defined() && first > last) { - return EFrameType::EMPTY; + return EFrameBoundsType::EMPTY; } if (!first.Defined()) { if (!last.Defined()) { - return EFrameType::FULL; + return EFrameBoundsType::FULL; } if (*last < 0) { - return EFrameType::LAGGING; + return EFrameBoundsType::LAGGING; } - return *last > 0 ? EFrameType::LEADING : EFrameType::CURRENT; + return *last > 0 ? EFrameBoundsType::LEADING : EFrameBoundsType::CURRENT; } - return EFrameType::GENERIC; + return EFrameBoundsType::GENERIC; } TExprNode::TPtr ReplaceLastLambdaArgWithStringLiteral(const TExprNode& lambda, TStringBuf literal, TExprContext& ctx) { @@ -166,20 +166,28 @@ TCalcOverWindowTraits ExtractCalcOverWindowTraits(const TExprNode::TPtr& frames, ui64 frameOutpace = 0; ui64 frameLag = 0; - const EFrameType frameType = FrameType(frameSettings); - const auto frameFirst = frameSettings.GetFirstOffset(); - const auto frameLast = frameSettings.GetLastOffset(); - if (frameType != EFrameType::EMPTY) { - if (!frameLast.Defined() || *frameLast > 0) { - frameOutpace = frameLast.Defined() ? ui64(*frameLast) : Max<ui64>(); - } + const EFrameType ft = frameSettings.GetFrameType(); + if (ft == EFrameType::FrameByRows) { + const EFrameBoundsType frameType = FrameBoundsType(frameSettings); + const auto frameFirst = frameSettings.GetFirstOffset(); + const auto frameLast = frameSettings.GetLastOffset(); + + if (frameType != EFrameBoundsType::EMPTY) { + if (!frameLast.Defined() || *frameLast > 0) { + frameOutpace = frameLast.Defined() ? ui64(*frameLast) : Max<ui64>(); + } - if (frameFirst.Defined() && *frameFirst < 0) { - frameLag = ui64(0 - *frameFirst); + if (frameFirst.Defined() && *frameFirst < 0) { + frameLag = ui64(0 - *frameFirst); + } } + } else { + // The only frame we currently support + YQL_ENSURE(ft == EFrameType::FrameByRange); + YQL_ENSURE(IsUnbounded(frameSettings.GetFirst())); + YQL_ENSURE(IsCurrentRow(frameSettings.GetLast())); } - const auto& winOnChildren = winOn->ChildrenList(); YQL_ENSURE(winOnChildren.size() > 1); for (size_t i = 1; i < winOnChildren.size(); ++i) { @@ -227,11 +235,16 @@ TCalcOverWindowTraits ExtractCalcOverWindowTraits(const TExprNode::TPtr& frames, rawTraits.CalculateLambda = calculateLambda; rawTraits.DefaultValue = traits->ChildPtr(5); - if (frameType == EFrameType::LAGGING) { - maxUnboundedPrecedingLag = Max(maxUnboundedPrecedingLag, ui64(abs(*frameLast))); - lagQueueStructItems.push_back(ctx.MakeType<TItemExprType>(name, rawTraits.OutputType)); + if (ft == EFrameType::FrameByRows) { + const EFrameBoundsType frameType = FrameBoundsType(frameSettings); + const auto frameLast = frameSettings.GetLastOffset(); + if (frameType == EFrameBoundsType::LAGGING) { + maxUnboundedPrecedingLag = Max(maxUnboundedPrecedingLag, ui64(abs(*frameLast))); + lagQueueStructItems.push_back(ctx.MakeType<TItemExprType>(name, rawTraits.OutputType)); + } } } else if (traits->IsCallable({"Lead", "Lag"})) { + YQL_ENSURE(ft == EFrameType::FrameByRows); i64 lead = 1; if (traits->ChildrenSize() == 3) { YQL_ENSURE(traits->Child(2)->IsCallable("Int64")); @@ -1278,9 +1291,15 @@ TVector<TChain1MapTraits::TPtr> BuildFoldMapTraits(TQueueParams& queueParams, co continue; } - switch(FrameType(trait.FrameSettings)) { - case EFrameType::CURRENT: - case EFrameType::LAGGING: { + if (trait.FrameSettings.GetFrameType() == EFrameType::FrameByRange) { + result.push_back(new TChain1MapTraitsCurrentOrLagging(name, trait, {})); + continue; + } + + YQL_ENSURE(trait.FrameSettings.GetFrameType() == EFrameType::FrameByRows); + switch(FrameBoundsType(trait.FrameSettings)) { + case EFrameBoundsType::CURRENT: + case EFrameBoundsType::LAGGING: { TMaybe<ui64> lagQueueIndex; auto end = *trait.FrameSettings.GetLastOffset(); YQL_ENSURE(end <= 0); @@ -1292,18 +1311,18 @@ TVector<TChain1MapTraits::TPtr> BuildFoldMapTraits(TQueueParams& queueParams, co result.push_back(new TChain1MapTraitsCurrentOrLagging(name, trait, lagQueueIndex)); break; } - case EFrameType::LEADING: { + case EFrameBoundsType::LEADING: { auto end = *trait.FrameSettings.GetLastOffset(); YQL_ENSURE(end > 0); ui64 lastRowIndex = currentRowIndex + ui64(end); result.push_back(new TChain1MapTraitsLeading(name, trait, currentRowIndex, lastRowIndex)); break; } - case EFrameType::FULL: { + case EFrameBoundsType::FULL: { result.push_back(new TChain1MapTraitsFull(name, trait, currentRowIndex)); break; } - case EFrameType::GENERIC: { + case EFrameBoundsType::GENERIC: { queueParams.DataQueueNeeded = true; auto first = trait.FrameSettings.GetFirstOffset(); auto last = trait.FrameSettings.GetLastOffset(); @@ -1313,7 +1332,7 @@ TVector<TChain1MapTraits::TPtr> BuildFoldMapTraits(TQueueParams& queueParams, co result.push_back(new TChain1MapTraitsGeneric(name, trait, beginIndex, endIndex)); break; } - case EFrameType::EMPTY: { + case EFrameBoundsType::EMPTY: { result.push_back(new TChain1MapTraitsEmpty(name, trait)); break; } @@ -1561,11 +1580,10 @@ TExprNode::TPtr BuildChain1MapUpdateLambda(TPositionHandle pos, const TVector<TC return ctx.NewLambda(pos, ctx.NewArguments(pos, {rowArg, stateArg}), std::move(finalBody)); } -// TODO: fixme bool IsNonCompactFullFrame(const TExprNode& winOnRows, TExprContext& ctx) { - YQL_ENSURE(winOnRows.IsCallable("WinOnRows")); TWindowFrameSettings frameSettings = TWindowFrameSettings::Parse(winOnRows, ctx); - return !frameSettings.IsCompact() && !frameSettings.GetFirstOffset().Defined() && !frameSettings.GetLastOffset().Defined(); + return frameSettings.GetFrameType() == FrameByRows && + !frameSettings.IsCompact() && !frameSettings.GetFirstOffset().Defined() && !frameSettings.GetLastOffset().Defined(); } TExprNode::TPtr DeduceCompatibleSort(const TExprNode::TPtr& traitsOne, const TExprNode::TPtr& traitsTwo) { @@ -2233,6 +2251,241 @@ TExprNode::TPtr TryExpandNonCompactFullFrames(TPositionHandle pos, const TExprNo .Done().Ptr(); } +void SplitFramesByType(const TExprNode::TPtr& frames, TExprNode::TPtr& rowFrames, TExprNode::TPtr& rangeFrames, TExprNode::TPtr& groupFrames, TExprContext& ctx) { + TExprNodeList rows; + TExprNodeList range; + TExprNodeList groups; + for (auto& winOn : frames->ChildrenList()) { + if (TCoWinOnRows::Match(winOn.Get())) { + rows.push_back(std::move(winOn)); + } else if (TCoWinOnRange::Match(winOn.Get())) { + range.push_back(std::move(winOn)); + } else { + YQL_ENSURE(TCoWinOnGroups::Match(winOn.Get())); + groups.push_back(std::move(winOn)); + } + } + + rowFrames = ctx.NewList(frames->Pos(), std::move(rows)); + rangeFrames = ctx.NewList(frames->Pos(), std::move(range)); + groupFrames = ctx.NewList(frames->Pos(), std::move(groups)); +} + +TExprNode::TPtr ProcessRowsFrames(TPositionHandle pos, const TExprNode::TPtr& input, const TStructExprType& rowType, const TExprNode::TPtr& dependsOn, + const TExprNode::TPtr& frames, TExprContext& ctx) +{ + TExprNode::TPtr processed = input; + TExprNode::TPtr dataQueue; + TQueueParams queueParams; + TVector<TChain1MapTraits::TPtr> traits = BuildFoldMapTraits(queueParams, frames, ctx); + if (queueParams.DataQueueNeeded) { + ui64 queueSize = (queueParams.DataOutpace == Max<ui64>()) ? Max<ui64>() : (queueParams.DataOutpace + queueParams.DataLag + 2); + dataQueue = BuildQueue(pos, rowType, queueSize, queueParams.DataLag, dependsOn, ctx); + processed = ctx.Builder(pos) + .Callable("PreserveStream") + .Add(0, processed) + .Add(1, dataQueue) + .Add(2, BuildUint64(pos, queueParams.DataOutpace, ctx)) + .Seal() + .Build(); + } + + processed = ctx.Builder(pos) + .Callable("OrderedMap") + .Callable(0, "Chain1Map") + .Add(0, std::move(processed)) + .Add(1, BuildChain1MapInitLambda(pos, traits, dataQueue, queueParams.LagQueueSize, queueParams.LagQueueItemType, ctx)) + .Add(2, BuildChain1MapUpdateLambda(pos, traits, dataQueue, queueParams.LagQueueSize != 0, ctx)) + .Seal() + .Lambda(1) + .Param("pair") + .Callable("Nth") + .Arg(0, "pair") + .Atom(1, "0", TNodeFlags::Default) + .Seal() + .Seal() + .Seal() + .Build(); + + return processed; +} + +TExprNode::TPtr ProcessRangeFrames(TPositionHandle pos, const TExprNode::TPtr& input, const TExprNode::TPtr& sortKey, const TExprNode::TPtr& frames, TExprContext& ctx) { + if (frames->ChildrenSize() == 0) { + return input; + } + + TExprNode::TPtr processed = input; + TQueueParams queueParams; + TVector<TChain1MapTraits::TPtr> traits = BuildFoldMapTraits(queueParams, frames, ctx); + YQL_ENSURE(!queueParams.DataQueueNeeded); + YQL_ENSURE(queueParams.LagQueueSize == 0); + YQL_ENSURE(queueParams.LagQueueItemType == nullptr); + + // same processing as in WinOnRows + processed = ctx.Builder(pos) + .Callable("OrderedMap") + .Callable(0, "Chain1Map") + .Add(0, std::move(processed)) + .Add(1, BuildChain1MapInitLambda(pos, traits, nullptr, 0, nullptr, ctx)) + .Add(2, BuildChain1MapUpdateLambda(pos, traits, nullptr, false, ctx)) + .Seal() + .Lambda(1) + .Param("pair") + .Callable("Nth") + .Arg(0, "pair") + .Atom(1, "0", TNodeFlags::Default) + .Seal() + .Seal() + .Seal() + .Build(); + + // split rows by groups with equal sortKey + TExprNode::TPtr sortKeyLambda = sortKey; + if (sortKey->IsCallable("Void")) { + sortKeyLambda = ctx.Builder(sortKey->Pos()) + .Lambda() + .Param("row") + .Callable("Void") + .Seal() + .Seal() + .Build(); + } + + processed = ctx.Builder(pos) + .Callable("Condense1") + .Add(0, processed) + .Lambda(1) + .Param("row") + .List() + .Apply(0, sortKeyLambda) + .With(0, "row") + .Seal() + .Callable(1, "AsList") + .Arg(0, "row") + .Seal() + .Seal() + .Seal() + .Lambda(2) + .Param("row") + .Param("state") + .Callable(0, "AggrNotEquals") + .Apply(0, sortKeyLambda) + .With(0, "row") + .Seal() + .Callable(1, "Nth") + .Arg(0, "state") + .Atom(1, "0", TNodeFlags::Default) + .Seal() + .Seal() + .Seal() + .Lambda(3) + .Param("row") + .Param("state") + .List() + .Callable(0, "Nth") + .Arg(0, "state") + .Atom(1, "0", TNodeFlags::Default) + .Seal() + .Callable(1, "Insert") + .Callable(0, "Nth") + .Arg(0, "state") + .Atom(1, "1", TNodeFlags::Default) + .Seal() + .Arg(1, "row") + .Seal() + .Seal() + .Seal() + .Seal() + .Build(); + + processed = ctx.Builder(pos) + .Callable("OrderedMap") + .Add(0, processed) + .Lambda(1) + .Param("item") + .Callable(0, "Nth") + .Arg(0, "item") + .Atom(1, "1", TNodeFlags::Default) + .Seal() + .Seal() + .Seal() + .Build(); + + auto lastRowArg = ctx.NewArgument(pos, "lastRow"); + auto currentRowArg = ctx.NewArgument(pos, "currentRow"); + auto currentRow = currentRowArg; + + for (auto& trait : traits) { + TStringBuf name = trait->GetName(); + currentRow = ctx.Builder(pos) + .Callable("AddMember") + .Callable(0, "RemoveMember") + .Add(0, currentRow) + .Atom(1, name) + .Seal() + .Atom(1, name) + .Callable(2, "Member") + .Add(0, lastRowArg) + .Atom(1, name) + .Seal() + .Seal() + .Build(); + } + + auto overwriteWithLastRowLambda = ctx.NewLambda(pos, ctx.NewArguments(pos, { currentRowArg, lastRowArg }), std::move(currentRow)); + + // processed is currently stream of groups (=lists) with equal sort keys + processed = ctx.Builder(pos) + .Callable("OrderedFlatMap") + .Add(0, processed) + .Lambda(1) + .Param("group") + .Callable("If") + .Callable(0, "<=") + .Callable(0, "Length") + .Arg(0, "group") + .Seal() + .Callable(1, "Uint64") + .Atom(0, "1", TNodeFlags::Default) + .Seal() + .Seal() + .Arg(1, "group") + .Callable(2, "Map") + .Callable(0, "Last") + .Arg(0, "group") + .Seal() + .Lambda(1) + .Param("lastRow") + .Callable("OrderedMap") + .Arg(0, "group") + .Lambda(1) + .Param("currentRow") + .Apply(overwriteWithLastRowLambda) + .With(0, "currentRow") + .With(1, "lastRow") + .Seal() + .Seal() + .Seal() + .Seal() + .Seal() + .Seal() + .Seal() + .Seal() + .Build(); + + processed = ctx.Builder(pos) + .Callable("OrderedFlatMap") + .Add(0, processed) + .Lambda(1) + .Param("group") + .Arg("group") + .Seal() + .Seal() + .Build(); + return processed; +} + TExprNode::TPtr ExpandSingleCalcOverWindow(TPositionHandle pos, const TExprNode::TPtr& inputList, const TExprNode::TPtr& keyColumns, const TExprNode::TPtr& sortTraits, const TExprNode::TPtr& frames, const TExprNode::TPtr& sessionTraits, const TExprNode::TPtr& sessionColumns, TExprContext& ctx) @@ -2242,9 +2495,6 @@ TExprNode::TPtr ExpandSingleCalcOverWindow(TPositionHandle pos, const TExprNode: return expanded; } - TQueueParams queueParams; - TVector<TChain1MapTraits::TPtr> traits = BuildFoldMapTraits(queueParams, frames, ctx); - TExprNode::TPtr sessionKey; TExprNode::TPtr sessionSortTraits; const TTypeAnnotationNode* sessionKeyType = nullptr; @@ -2266,6 +2516,8 @@ TExprNode::TPtr ExpandSingleCalcOverWindow(TPositionHandle pos, const TExprNode: TExprNode::TPtr sortKey; TExprNode::TPtr sortOrder; + ExtractSortKeyAndOrder(pos, sortTraits, sortKey, sortOrder, ctx); + const TExprNode::TPtr originalSortKey = sortKey; TExprNode::TPtr input = inputList; const auto commonSortTraits = DeduceCompatibleSort(sortTraits, sessionSortTraits); @@ -2301,39 +2553,17 @@ TExprNode::TPtr ExpandSingleCalcOverWindow(TPositionHandle pos, const TExprNode: sessionKey = sessionInit = sessionUpdate = {}; } - auto topLevelStreamArg = ctx.NewArgument(pos, "stream"); + TExprNode::TPtr rowsFrames; + TExprNode::TPtr rangeFrames; + TExprNode::TPtr groupsFrames; + SplitFramesByType(frames, rowsFrames, rangeFrames, groupsFrames, ctx); + YQL_ENSURE(groupsFrames->ChildrenSize() == 0); + auto topLevelStreamArg = ctx.NewArgument(pos, "stream"); TExprNode::TPtr processed = topLevelStreamArg; - TExprNode::TPtr dataQueue; - if (queueParams.DataQueueNeeded) { - ui64 queueSize = (queueParams.DataOutpace == Max<ui64>()) ? Max<ui64>() : (queueParams.DataOutpace + queueParams.DataLag + 2); - dataQueue = BuildQueue(pos, *rowType, queueSize, queueParams.DataLag, topLevelStreamArg, ctx); - processed = ctx.Builder(pos) - .Callable("PreserveStream") - .Add(0, topLevelStreamArg) - .Add(1, dataQueue) - .Add(2, BuildUint64(pos, queueParams.DataOutpace, ctx)) - .Seal() - .Build(); - } - - processed = ctx.Builder(pos) - .Callable("OrderedMap") - .Callable(0, "Chain1Map") - .Add(0, std::move(processed)) - .Add(1, BuildChain1MapInitLambda(pos, traits, dataQueue, queueParams.LagQueueSize, queueParams.LagQueueItemType, ctx)) - .Add(2, BuildChain1MapUpdateLambda(pos, traits, dataQueue, queueParams.LagQueueSize != 0, ctx)) - .Seal() - .Lambda(1) - .Param("pair") - .Callable("Nth") - .Arg(0, "pair") - .Atom(1, "0", TNodeFlags::Default) - .Seal() - .Seal() - .Seal() - .Build(); + processed = ProcessRangeFrames(pos, processed, originalSortKey, rangeFrames, ctx); + processed = ProcessRowsFrames(pos, processed, *rowType, topLevelStreamArg, rowsFrames, ctx); auto topLevelStreamProcessingLambda = ctx.NewLambda(pos, ctx.NewArguments(pos, {topLevelStreamArg}), std::move(processed)); @@ -2506,6 +2736,20 @@ TExprNode::TPtr RebuildCalcOverWindowGroup(TPositionHandle pos, const TExprNode: .Done().Ptr(); } +bool IsUnbounded(const NNodes::TCoFrameBound& bound) { + if (bound.Ref().ChildrenSize() < 2) { + return false; + } + if (auto maybeAtom = bound.Bound().Maybe<TCoAtom>()) { + return maybeAtom.Cast().Value() == "unbounded"; + } + return false; +} + +bool IsCurrentRow(const NNodes::TCoFrameBound& bound) { + return bound.Setting().Value() == "currentRow"; +} + TWindowFrameSettings TWindowFrameSettings::Parse(const TExprNode& node, TExprContext& ctx) { auto maybeSettings = TryParse(node, ctx); YQL_ENSURE(maybeSettings); @@ -2515,7 +2759,14 @@ TWindowFrameSettings TWindowFrameSettings::Parse(const TExprNode& node, TExprCon TMaybe<TWindowFrameSettings> TWindowFrameSettings::TryParse(const TExprNode& node, TExprContext& ctx) { TWindowFrameSettings settings; - YQL_ENSURE(node.IsCallable({"WinOnRows", "WinOnRange", "WinOnGroups"})); + if (node.IsCallable("WinOnRows")) { + settings.Type = EFrameType::FrameByRows; + } else if (node.IsCallable("WinOnRange")) { + settings.Type = EFrameType::FrameByRange; + } else { + YQL_ENSURE(node.IsCallable("WinOnGroups")); + settings.Type = EFrameType::FrameByGroups; + } auto frameSpec = node.Child(0); if (frameSpec->Type() == TExprNode::List) { bool hasBegin = false; @@ -2553,6 +2804,7 @@ TMaybe<TWindowFrameSettings> TWindowFrameSettings::TryParse(const TExprNode& nod return {}; } + hasBound = true; TMaybe<i32>& boundOffset = (settingName == "begin") ? settings.FirstOffset : settings.LastOffset; TExprNode::TPtr& frameBound = (settingName == "begin") ? settings.First : settings.Last; @@ -2591,6 +2843,7 @@ TMaybe<TWindowFrameSettings> TWindowFrameSettings::TryParse(const TExprNode& nod auto boundValue = fb->ChildPtr(1); if (boundValue->IsAtom()) { if (boundValue->Content() == "unbounded") { + frameBound = fb; continue; } ctx.AddError(TIssue(ctx.GetPosition(fb->Pos()), TStringBuilder() << "Expecting unbounded, but got '" << boundValue->Content() << "'")); @@ -2663,7 +2916,6 @@ TMaybe<TWindowFrameSettings> TWindowFrameSettings::TryParse(const TExprNode& nod TStringBuilder() << "Invalid " << settingName << " frame bound - expecting Void or Int32 callable, but got: " << errMsg)); return {}; } - hasBound = true; } if (!hasBegin || !hasEnd) { |
