summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <[email protected]>2022-04-01 00:39:21 +0300
committeraneporada <[email protected]>2022-04-01 00:39:21 +0300
commitbfeb38a1f3f0c4d3ed6903569421cecde06885ce (patch)
tree5471ba1e1f1ed9de876cc51b2533d49328e76134
parent62c1c476044dc1568ba47e2bb8e6e491fb13ed1d (diff)
[YQL-10265] Support AnsiCurrentRow
ref:a6488a9cd15defe97e18715d7cc7590582d6ab5f
-rw-r--r--ydb/library/yql/core/common_opt/yql_co_simple1.cpp45
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_list.cpp27
-rw-r--r--ydb/library/yql/core/yql_opt_window.cpp382
3 files changed, 360 insertions, 94 deletions
diff --git a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp
index f59b3859c09..9ad79d93664 100644
--- a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp
+++ b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp
@@ -2835,43 +2835,52 @@ TExprNodeList DeduplicateAndSplitTupleCollectionByTypes(const TExprNode &collect
TExprNode::TPtr MergeCalcOverWindowFrames(const TExprNode::TPtr& frames, TExprContext& ctx) {
YQL_ENSURE(frames->IsList());
-
- TNodeMap<size_t> uniqueFrameIndexes;
- struct TWinOnRowsContent {
+ struct TWinOnContent {
TExprNodeList Args;
TPositionHandle Pos;
};
- TVector<TWinOnRowsContent> winOnRows;
+ struct TMergedFrames {
+ TNodeMap<size_t> UniqIndexes;
+ TVector<TWinOnContent> Frames;
+ };
+
+ TMap<TStringBuf, TMergedFrames> mergeMap;
+ size_t uniqFrameSpecs = 0;
for (auto& winOn: frames->Children()) {
- YQL_ENSURE(winOn->IsCallable("WinOnRows"));
+ YQL_ENSURE(TCoWinOnBase::Match(winOn.Get()));
if (winOn->ChildrenSize() == 1) {
// skip empty frames
continue;
}
+
+ TMergedFrames& merged = mergeMap[winOn->Content()];
+
auto args = winOn->ChildrenList();
auto frameSpec = winOn->Child(0);
- auto frameIt = uniqueFrameIndexes.find(frameSpec);
- if (frameIt == uniqueFrameIndexes.end()) {
- YQL_ENSURE(uniqueFrameIndexes.size() == winOnRows.size());
- uniqueFrameIndexes[frameSpec] = winOnRows.size();
-
- TWinOnRowsContent content{std::move(args), winOn->Pos()};
- winOnRows.emplace_back(std::move(content));
+ auto frameIt = merged.UniqIndexes.find(frameSpec);
+ if (frameIt == merged.UniqIndexes.end()) {
+ YQL_ENSURE(merged.UniqIndexes.size() == merged.Frames.size());
+ merged.UniqIndexes[frameSpec] = merged.Frames.size();
+ TWinOnContent content{std::move(args), winOn->Pos()};
+ merged.Frames.emplace_back(std::move(content));
+ ++uniqFrameSpecs;
} else {
- auto& combined = winOnRows[frameIt->second];
+ auto& combined = merged.Frames[frameIt->second];
combined.Args.insert(combined.Args.end(), args.begin() + 1, args.end());
}
}
- if (uniqueFrameIndexes.size() != frames->ChildrenSize()) {
- TExprNodeList winOnRowsNodes;
- for (auto &item : winOnRows) {
- winOnRowsNodes.emplace_back(ctx.NewCallable(item.Pos, "WinOnRows", std::move(item.Args)));
+ if (uniqFrameSpecs != frames->ChildrenSize()) {
+ TExprNodeList newFrames;
+ for (auto& [name, merged] : mergeMap) {
+ for (auto& item : merged.Frames) {
+ newFrames.emplace_back(ctx.NewCallable(item.Pos, name, std::move(item.Args)));
+ }
}
- return ctx.NewList(frames->Pos(), std::move(winOnRowsNodes));
+ return ctx.NewList(frames->Pos(), std::move(newFrames));
}
return frames;
diff --git a/ydb/library/yql/core/type_ann/type_ann_list.cpp b/ydb/library/yql/core/type_ann/type_ann_list.cpp
index a4f2c5b6fdc..55f177b44dc 100644
--- a/ydb/library/yql/core/type_ann/type_ann_list.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_list.cpp
@@ -4940,16 +4940,6 @@ namespace {
}
IGraphTransformer::TStatus WinOnWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- if (input->IsCallable("WinOnGroups")) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "GROUPS in frame specification are not supported yet"));
- return IGraphTransformer::TStatus::Error;
- }
- if (input->IsCallable("WinOnRange")) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "RANGE in frame specification is not supported yet"));
- return IGraphTransformer::TStatus::Error;
- }
- YQL_ENSURE(input->IsCallable("WinOnRows"));
-
if (!EnsureMinArgsCount(*input, 1, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
@@ -4968,10 +4958,25 @@ namespace {
}
}
- if (!TWindowFrameSettings::TryParse(*input, ctx.Expr)) {
+ auto frameSettings = TWindowFrameSettings::TryParse(*input, ctx.Expr);
+ if (!frameSettings) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto frameType = frameSettings->GetFrameType();
+ if (frameType == EFrameType::FrameByGroups) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "GROUPS in frame specification are not supported yet"));
return IGraphTransformer::TStatus::Error;
}
+ if (frameType == EFrameType::FrameByRange) {
+ // only UNBOUNDED PRECEDING -> CURRENT ROW is currently supported
+ if (!(IsUnbounded(frameSettings->GetFirst()) && IsCurrentRow(frameSettings->GetLast()))) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "RANGE in frame specification is not supported yet"));
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+
auto status = NormalizeKeyValueTuples(input, 1, output, ctx.Expr);
if (status != IGraphTransformer::TStatus::Ok) {
return status;
diff --git a/ydb/library/yql/core/yql_opt_window.cpp b/ydb/library/yql/core/yql_opt_window.cpp
index acbfe1bbdeb..bee7c930d9d 100644
--- a/ydb/library/yql/core/yql_opt_window.cpp
+++ b/ydb/library/yql/core/yql_opt_window.cpp
@@ -13,7 +13,7 @@ namespace {
const TStringBuf SessionStartMemberName = "_yql_window_session_start";
const TStringBuf SessionParamsMemberName = "_yql_window_session_params";
-enum class EFrameType : ui8 {
+enum class EFrameBoundsType : ui8 {
EMPTY,
LAGGING,
CURRENT,
@@ -22,26 +22,26 @@ enum class EFrameType : ui8 {
GENERIC,
};
-EFrameType FrameType(const TWindowFrameSettings& settings) {
+EFrameBoundsType FrameBoundsType(const TWindowFrameSettings& settings) {
auto first = settings.GetFirstOffset();
auto last = settings.GetLastOffset();
if (first.Defined() && last.Defined() && first > last) {
- return EFrameType::EMPTY;
+ return EFrameBoundsType::EMPTY;
}
if (!first.Defined()) {
if (!last.Defined()) {
- return EFrameType::FULL;
+ return EFrameBoundsType::FULL;
}
if (*last < 0) {
- return EFrameType::LAGGING;
+ return EFrameBoundsType::LAGGING;
}
- return *last > 0 ? EFrameType::LEADING : EFrameType::CURRENT;
+ return *last > 0 ? EFrameBoundsType::LEADING : EFrameBoundsType::CURRENT;
}
- return EFrameType::GENERIC;
+ return EFrameBoundsType::GENERIC;
}
TExprNode::TPtr ReplaceLastLambdaArgWithStringLiteral(const TExprNode& lambda, TStringBuf literal, TExprContext& ctx) {
@@ -166,20 +166,28 @@ TCalcOverWindowTraits ExtractCalcOverWindowTraits(const TExprNode::TPtr& frames,
ui64 frameOutpace = 0;
ui64 frameLag = 0;
- const EFrameType frameType = FrameType(frameSettings);
- const auto frameFirst = frameSettings.GetFirstOffset();
- const auto frameLast = frameSettings.GetLastOffset();
- if (frameType != EFrameType::EMPTY) {
- if (!frameLast.Defined() || *frameLast > 0) {
- frameOutpace = frameLast.Defined() ? ui64(*frameLast) : Max<ui64>();
- }
+ const EFrameType ft = frameSettings.GetFrameType();
+ if (ft == EFrameType::FrameByRows) {
+ const EFrameBoundsType frameType = FrameBoundsType(frameSettings);
+ const auto frameFirst = frameSettings.GetFirstOffset();
+ const auto frameLast = frameSettings.GetLastOffset();
+
+ if (frameType != EFrameBoundsType::EMPTY) {
+ if (!frameLast.Defined() || *frameLast > 0) {
+ frameOutpace = frameLast.Defined() ? ui64(*frameLast) : Max<ui64>();
+ }
- if (frameFirst.Defined() && *frameFirst < 0) {
- frameLag = ui64(0 - *frameFirst);
+ if (frameFirst.Defined() && *frameFirst < 0) {
+ frameLag = ui64(0 - *frameFirst);
+ }
}
+ } else {
+ // The only frame we currently support
+ YQL_ENSURE(ft == EFrameType::FrameByRange);
+ YQL_ENSURE(IsUnbounded(frameSettings.GetFirst()));
+ YQL_ENSURE(IsCurrentRow(frameSettings.GetLast()));
}
-
const auto& winOnChildren = winOn->ChildrenList();
YQL_ENSURE(winOnChildren.size() > 1);
for (size_t i = 1; i < winOnChildren.size(); ++i) {
@@ -227,11 +235,16 @@ TCalcOverWindowTraits ExtractCalcOverWindowTraits(const TExprNode::TPtr& frames,
rawTraits.CalculateLambda = calculateLambda;
rawTraits.DefaultValue = traits->ChildPtr(5);
- if (frameType == EFrameType::LAGGING) {
- maxUnboundedPrecedingLag = Max(maxUnboundedPrecedingLag, ui64(abs(*frameLast)));
- lagQueueStructItems.push_back(ctx.MakeType<TItemExprType>(name, rawTraits.OutputType));
+ if (ft == EFrameType::FrameByRows) {
+ const EFrameBoundsType frameType = FrameBoundsType(frameSettings);
+ const auto frameLast = frameSettings.GetLastOffset();
+ if (frameType == EFrameBoundsType::LAGGING) {
+ maxUnboundedPrecedingLag = Max(maxUnboundedPrecedingLag, ui64(abs(*frameLast)));
+ lagQueueStructItems.push_back(ctx.MakeType<TItemExprType>(name, rawTraits.OutputType));
+ }
}
} else if (traits->IsCallable({"Lead", "Lag"})) {
+ YQL_ENSURE(ft == EFrameType::FrameByRows);
i64 lead = 1;
if (traits->ChildrenSize() == 3) {
YQL_ENSURE(traits->Child(2)->IsCallable("Int64"));
@@ -1278,9 +1291,15 @@ TVector<TChain1MapTraits::TPtr> BuildFoldMapTraits(TQueueParams& queueParams, co
continue;
}
- switch(FrameType(trait.FrameSettings)) {
- case EFrameType::CURRENT:
- case EFrameType::LAGGING: {
+ if (trait.FrameSettings.GetFrameType() == EFrameType::FrameByRange) {
+ result.push_back(new TChain1MapTraitsCurrentOrLagging(name, trait, {}));
+ continue;
+ }
+
+ YQL_ENSURE(trait.FrameSettings.GetFrameType() == EFrameType::FrameByRows);
+ switch(FrameBoundsType(trait.FrameSettings)) {
+ case EFrameBoundsType::CURRENT:
+ case EFrameBoundsType::LAGGING: {
TMaybe<ui64> lagQueueIndex;
auto end = *trait.FrameSettings.GetLastOffset();
YQL_ENSURE(end <= 0);
@@ -1292,18 +1311,18 @@ TVector<TChain1MapTraits::TPtr> BuildFoldMapTraits(TQueueParams& queueParams, co
result.push_back(new TChain1MapTraitsCurrentOrLagging(name, trait, lagQueueIndex));
break;
}
- case EFrameType::LEADING: {
+ case EFrameBoundsType::LEADING: {
auto end = *trait.FrameSettings.GetLastOffset();
YQL_ENSURE(end > 0);
ui64 lastRowIndex = currentRowIndex + ui64(end);
result.push_back(new TChain1MapTraitsLeading(name, trait, currentRowIndex, lastRowIndex));
break;
}
- case EFrameType::FULL: {
+ case EFrameBoundsType::FULL: {
result.push_back(new TChain1MapTraitsFull(name, trait, currentRowIndex));
break;
}
- case EFrameType::GENERIC: {
+ case EFrameBoundsType::GENERIC: {
queueParams.DataQueueNeeded = true;
auto first = trait.FrameSettings.GetFirstOffset();
auto last = trait.FrameSettings.GetLastOffset();
@@ -1313,7 +1332,7 @@ TVector<TChain1MapTraits::TPtr> BuildFoldMapTraits(TQueueParams& queueParams, co
result.push_back(new TChain1MapTraitsGeneric(name, trait, beginIndex, endIndex));
break;
}
- case EFrameType::EMPTY: {
+ case EFrameBoundsType::EMPTY: {
result.push_back(new TChain1MapTraitsEmpty(name, trait));
break;
}
@@ -1561,11 +1580,10 @@ TExprNode::TPtr BuildChain1MapUpdateLambda(TPositionHandle pos, const TVector<TC
return ctx.NewLambda(pos, ctx.NewArguments(pos, {rowArg, stateArg}), std::move(finalBody));
}
-// TODO: fixme
bool IsNonCompactFullFrame(const TExprNode& winOnRows, TExprContext& ctx) {
- YQL_ENSURE(winOnRows.IsCallable("WinOnRows"));
TWindowFrameSettings frameSettings = TWindowFrameSettings::Parse(winOnRows, ctx);
- return !frameSettings.IsCompact() && !frameSettings.GetFirstOffset().Defined() && !frameSettings.GetLastOffset().Defined();
+ return frameSettings.GetFrameType() == FrameByRows &&
+ !frameSettings.IsCompact() && !frameSettings.GetFirstOffset().Defined() && !frameSettings.GetLastOffset().Defined();
}
TExprNode::TPtr DeduceCompatibleSort(const TExprNode::TPtr& traitsOne, const TExprNode::TPtr& traitsTwo) {
@@ -2233,6 +2251,241 @@ TExprNode::TPtr TryExpandNonCompactFullFrames(TPositionHandle pos, const TExprNo
.Done().Ptr();
}
+void SplitFramesByType(const TExprNode::TPtr& frames, TExprNode::TPtr& rowFrames, TExprNode::TPtr& rangeFrames, TExprNode::TPtr& groupFrames, TExprContext& ctx) {
+ TExprNodeList rows;
+ TExprNodeList range;
+ TExprNodeList groups;
+ for (auto& winOn : frames->ChildrenList()) {
+ if (TCoWinOnRows::Match(winOn.Get())) {
+ rows.push_back(std::move(winOn));
+ } else if (TCoWinOnRange::Match(winOn.Get())) {
+ range.push_back(std::move(winOn));
+ } else {
+ YQL_ENSURE(TCoWinOnGroups::Match(winOn.Get()));
+ groups.push_back(std::move(winOn));
+ }
+ }
+
+ rowFrames = ctx.NewList(frames->Pos(), std::move(rows));
+ rangeFrames = ctx.NewList(frames->Pos(), std::move(range));
+ groupFrames = ctx.NewList(frames->Pos(), std::move(groups));
+}
+
+TExprNode::TPtr ProcessRowsFrames(TPositionHandle pos, const TExprNode::TPtr& input, const TStructExprType& rowType, const TExprNode::TPtr& dependsOn,
+ const TExprNode::TPtr& frames, TExprContext& ctx)
+{
+ TExprNode::TPtr processed = input;
+ TExprNode::TPtr dataQueue;
+ TQueueParams queueParams;
+ TVector<TChain1MapTraits::TPtr> traits = BuildFoldMapTraits(queueParams, frames, ctx);
+ if (queueParams.DataQueueNeeded) {
+ ui64 queueSize = (queueParams.DataOutpace == Max<ui64>()) ? Max<ui64>() : (queueParams.DataOutpace + queueParams.DataLag + 2);
+ dataQueue = BuildQueue(pos, rowType, queueSize, queueParams.DataLag, dependsOn, ctx);
+ processed = ctx.Builder(pos)
+ .Callable("PreserveStream")
+ .Add(0, processed)
+ .Add(1, dataQueue)
+ .Add(2, BuildUint64(pos, queueParams.DataOutpace, ctx))
+ .Seal()
+ .Build();
+ }
+
+ processed = ctx.Builder(pos)
+ .Callable("OrderedMap")
+ .Callable(0, "Chain1Map")
+ .Add(0, std::move(processed))
+ .Add(1, BuildChain1MapInitLambda(pos, traits, dataQueue, queueParams.LagQueueSize, queueParams.LagQueueItemType, ctx))
+ .Add(2, BuildChain1MapUpdateLambda(pos, traits, dataQueue, queueParams.LagQueueSize != 0, ctx))
+ .Seal()
+ .Lambda(1)
+ .Param("pair")
+ .Callable("Nth")
+ .Arg(0, "pair")
+ .Atom(1, "0", TNodeFlags::Default)
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+
+ return processed;
+}
+
+TExprNode::TPtr ProcessRangeFrames(TPositionHandle pos, const TExprNode::TPtr& input, const TExprNode::TPtr& sortKey, const TExprNode::TPtr& frames, TExprContext& ctx) {
+ if (frames->ChildrenSize() == 0) {
+ return input;
+ }
+
+ TExprNode::TPtr processed = input;
+ TQueueParams queueParams;
+ TVector<TChain1MapTraits::TPtr> traits = BuildFoldMapTraits(queueParams, frames, ctx);
+ YQL_ENSURE(!queueParams.DataQueueNeeded);
+ YQL_ENSURE(queueParams.LagQueueSize == 0);
+ YQL_ENSURE(queueParams.LagQueueItemType == nullptr);
+
+ // same processing as in WinOnRows
+ processed = ctx.Builder(pos)
+ .Callable("OrderedMap")
+ .Callable(0, "Chain1Map")
+ .Add(0, std::move(processed))
+ .Add(1, BuildChain1MapInitLambda(pos, traits, nullptr, 0, nullptr, ctx))
+ .Add(2, BuildChain1MapUpdateLambda(pos, traits, nullptr, false, ctx))
+ .Seal()
+ .Lambda(1)
+ .Param("pair")
+ .Callable("Nth")
+ .Arg(0, "pair")
+ .Atom(1, "0", TNodeFlags::Default)
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+
+ // split rows by groups with equal sortKey
+ TExprNode::TPtr sortKeyLambda = sortKey;
+ if (sortKey->IsCallable("Void")) {
+ sortKeyLambda = ctx.Builder(sortKey->Pos())
+ .Lambda()
+ .Param("row")
+ .Callable("Void")
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
+ processed = ctx.Builder(pos)
+ .Callable("Condense1")
+ .Add(0, processed)
+ .Lambda(1)
+ .Param("row")
+ .List()
+ .Apply(0, sortKeyLambda)
+ .With(0, "row")
+ .Seal()
+ .Callable(1, "AsList")
+ .Arg(0, "row")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Lambda(2)
+ .Param("row")
+ .Param("state")
+ .Callable(0, "AggrNotEquals")
+ .Apply(0, sortKeyLambda)
+ .With(0, "row")
+ .Seal()
+ .Callable(1, "Nth")
+ .Arg(0, "state")
+ .Atom(1, "0", TNodeFlags::Default)
+ .Seal()
+ .Seal()
+ .Seal()
+ .Lambda(3)
+ .Param("row")
+ .Param("state")
+ .List()
+ .Callable(0, "Nth")
+ .Arg(0, "state")
+ .Atom(1, "0", TNodeFlags::Default)
+ .Seal()
+ .Callable(1, "Insert")
+ .Callable(0, "Nth")
+ .Arg(0, "state")
+ .Atom(1, "1", TNodeFlags::Default)
+ .Seal()
+ .Arg(1, "row")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+
+ processed = ctx.Builder(pos)
+ .Callable("OrderedMap")
+ .Add(0, processed)
+ .Lambda(1)
+ .Param("item")
+ .Callable(0, "Nth")
+ .Arg(0, "item")
+ .Atom(1, "1", TNodeFlags::Default)
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+
+ auto lastRowArg = ctx.NewArgument(pos, "lastRow");
+ auto currentRowArg = ctx.NewArgument(pos, "currentRow");
+ auto currentRow = currentRowArg;
+
+ for (auto& trait : traits) {
+ TStringBuf name = trait->GetName();
+ currentRow = ctx.Builder(pos)
+ .Callable("AddMember")
+ .Callable(0, "RemoveMember")
+ .Add(0, currentRow)
+ .Atom(1, name)
+ .Seal()
+ .Atom(1, name)
+ .Callable(2, "Member")
+ .Add(0, lastRowArg)
+ .Atom(1, name)
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
+ auto overwriteWithLastRowLambda = ctx.NewLambda(pos, ctx.NewArguments(pos, { currentRowArg, lastRowArg }), std::move(currentRow));
+
+ // processed is currently stream of groups (=lists) with equal sort keys
+ processed = ctx.Builder(pos)
+ .Callable("OrderedFlatMap")
+ .Add(0, processed)
+ .Lambda(1)
+ .Param("group")
+ .Callable("If")
+ .Callable(0, "<=")
+ .Callable(0, "Length")
+ .Arg(0, "group")
+ .Seal()
+ .Callable(1, "Uint64")
+ .Atom(0, "1", TNodeFlags::Default)
+ .Seal()
+ .Seal()
+ .Arg(1, "group")
+ .Callable(2, "Map")
+ .Callable(0, "Last")
+ .Arg(0, "group")
+ .Seal()
+ .Lambda(1)
+ .Param("lastRow")
+ .Callable("OrderedMap")
+ .Arg(0, "group")
+ .Lambda(1)
+ .Param("currentRow")
+ .Apply(overwriteWithLastRowLambda)
+ .With(0, "currentRow")
+ .With(1, "lastRow")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+
+ processed = ctx.Builder(pos)
+ .Callable("OrderedFlatMap")
+ .Add(0, processed)
+ .Lambda(1)
+ .Param("group")
+ .Arg("group")
+ .Seal()
+ .Seal()
+ .Build();
+ return processed;
+}
+
TExprNode::TPtr ExpandSingleCalcOverWindow(TPositionHandle pos, const TExprNode::TPtr& inputList, const TExprNode::TPtr& keyColumns,
const TExprNode::TPtr& sortTraits, const TExprNode::TPtr& frames, const TExprNode::TPtr& sessionTraits,
const TExprNode::TPtr& sessionColumns, TExprContext& ctx)
@@ -2242,9 +2495,6 @@ TExprNode::TPtr ExpandSingleCalcOverWindow(TPositionHandle pos, const TExprNode:
return expanded;
}
- TQueueParams queueParams;
- TVector<TChain1MapTraits::TPtr> traits = BuildFoldMapTraits(queueParams, frames, ctx);
-
TExprNode::TPtr sessionKey;
TExprNode::TPtr sessionSortTraits;
const TTypeAnnotationNode* sessionKeyType = nullptr;
@@ -2266,6 +2516,8 @@ TExprNode::TPtr ExpandSingleCalcOverWindow(TPositionHandle pos, const TExprNode:
TExprNode::TPtr sortKey;
TExprNode::TPtr sortOrder;
+ ExtractSortKeyAndOrder(pos, sortTraits, sortKey, sortOrder, ctx);
+ const TExprNode::TPtr originalSortKey = sortKey;
TExprNode::TPtr input = inputList;
const auto commonSortTraits = DeduceCompatibleSort(sortTraits, sessionSortTraits);
@@ -2301,39 +2553,17 @@ TExprNode::TPtr ExpandSingleCalcOverWindow(TPositionHandle pos, const TExprNode:
sessionKey = sessionInit = sessionUpdate = {};
}
- auto topLevelStreamArg = ctx.NewArgument(pos, "stream");
+ TExprNode::TPtr rowsFrames;
+ TExprNode::TPtr rangeFrames;
+ TExprNode::TPtr groupsFrames;
+ SplitFramesByType(frames, rowsFrames, rangeFrames, groupsFrames, ctx);
+ YQL_ENSURE(groupsFrames->ChildrenSize() == 0);
+ auto topLevelStreamArg = ctx.NewArgument(pos, "stream");
TExprNode::TPtr processed = topLevelStreamArg;
- TExprNode::TPtr dataQueue;
- if (queueParams.DataQueueNeeded) {
- ui64 queueSize = (queueParams.DataOutpace == Max<ui64>()) ? Max<ui64>() : (queueParams.DataOutpace + queueParams.DataLag + 2);
- dataQueue = BuildQueue(pos, *rowType, queueSize, queueParams.DataLag, topLevelStreamArg, ctx);
- processed = ctx.Builder(pos)
- .Callable("PreserveStream")
- .Add(0, topLevelStreamArg)
- .Add(1, dataQueue)
- .Add(2, BuildUint64(pos, queueParams.DataOutpace, ctx))
- .Seal()
- .Build();
- }
-
- processed = ctx.Builder(pos)
- .Callable("OrderedMap")
- .Callable(0, "Chain1Map")
- .Add(0, std::move(processed))
- .Add(1, BuildChain1MapInitLambda(pos, traits, dataQueue, queueParams.LagQueueSize, queueParams.LagQueueItemType, ctx))
- .Add(2, BuildChain1MapUpdateLambda(pos, traits, dataQueue, queueParams.LagQueueSize != 0, ctx))
- .Seal()
- .Lambda(1)
- .Param("pair")
- .Callable("Nth")
- .Arg(0, "pair")
- .Atom(1, "0", TNodeFlags::Default)
- .Seal()
- .Seal()
- .Seal()
- .Build();
+ processed = ProcessRangeFrames(pos, processed, originalSortKey, rangeFrames, ctx);
+ processed = ProcessRowsFrames(pos, processed, *rowType, topLevelStreamArg, rowsFrames, ctx);
auto topLevelStreamProcessingLambda = ctx.NewLambda(pos, ctx.NewArguments(pos, {topLevelStreamArg}), std::move(processed));
@@ -2506,6 +2736,20 @@ TExprNode::TPtr RebuildCalcOverWindowGroup(TPositionHandle pos, const TExprNode:
.Done().Ptr();
}
+bool IsUnbounded(const NNodes::TCoFrameBound& bound) {
+ if (bound.Ref().ChildrenSize() < 2) {
+ return false;
+ }
+ if (auto maybeAtom = bound.Bound().Maybe<TCoAtom>()) {
+ return maybeAtom.Cast().Value() == "unbounded";
+ }
+ return false;
+}
+
+bool IsCurrentRow(const NNodes::TCoFrameBound& bound) {
+ return bound.Setting().Value() == "currentRow";
+}
+
TWindowFrameSettings TWindowFrameSettings::Parse(const TExprNode& node, TExprContext& ctx) {
auto maybeSettings = TryParse(node, ctx);
YQL_ENSURE(maybeSettings);
@@ -2515,7 +2759,14 @@ TWindowFrameSettings TWindowFrameSettings::Parse(const TExprNode& node, TExprCon
TMaybe<TWindowFrameSettings> TWindowFrameSettings::TryParse(const TExprNode& node, TExprContext& ctx) {
TWindowFrameSettings settings;
- YQL_ENSURE(node.IsCallable({"WinOnRows", "WinOnRange", "WinOnGroups"}));
+ if (node.IsCallable("WinOnRows")) {
+ settings.Type = EFrameType::FrameByRows;
+ } else if (node.IsCallable("WinOnRange")) {
+ settings.Type = EFrameType::FrameByRange;
+ } else {
+ YQL_ENSURE(node.IsCallable("WinOnGroups"));
+ settings.Type = EFrameType::FrameByGroups;
+ }
auto frameSpec = node.Child(0);
if (frameSpec->Type() == TExprNode::List) {
bool hasBegin = false;
@@ -2553,6 +2804,7 @@ TMaybe<TWindowFrameSettings> TWindowFrameSettings::TryParse(const TExprNode& nod
return {};
}
+ hasBound = true;
TMaybe<i32>& boundOffset = (settingName == "begin") ? settings.FirstOffset : settings.LastOffset;
TExprNode::TPtr& frameBound = (settingName == "begin") ? settings.First : settings.Last;
@@ -2591,6 +2843,7 @@ TMaybe<TWindowFrameSettings> TWindowFrameSettings::TryParse(const TExprNode& nod
auto boundValue = fb->ChildPtr(1);
if (boundValue->IsAtom()) {
if (boundValue->Content() == "unbounded") {
+ frameBound = fb;
continue;
}
ctx.AddError(TIssue(ctx.GetPosition(fb->Pos()), TStringBuilder() << "Expecting unbounded, but got '" << boundValue->Content() << "'"));
@@ -2663,7 +2916,6 @@ TMaybe<TWindowFrameSettings> TWindowFrameSettings::TryParse(const TExprNode& nod
TStringBuilder() << "Invalid " << settingName << " frame bound - expecting Void or Int32 callable, but got: " << errMsg));
return {};
}
- hasBound = true;
}
if (!hasBegin || !hasEnd) {