aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@yandex-team.ru>2022-03-03 00:47:23 +0300
committeraneporada <aneporada@yandex-team.ru>2022-03-03 00:47:23 +0300
commit4cfc23ae371d1fb8e426470c5ec4e7a2962de9e4 (patch)
treea758db5cfe3f4f8dab3b2778222ad86d53ff1e4d
parentb65a9faeed938ec73573d4b0d7635987d6eef099 (diff)
downloadydb-4cfc23ae371d1fb8e426470c5ec4e7a2962de9e4.tar.gz
[YQL-10265] Refactor TWindowFrameSettings to support range/groups
ref:781d0d04edfe56877a9df1df5efd9032f9bd598c
-rw-r--r--ydb/library/yql/core/expr_nodes/yql_expr_nodes.json33
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_list.cpp10
-rw-r--r--ydb/library/yql/core/yql_opt_window.cpp117
-rw-r--r--ydb/library/yql/core/yql_opt_window.h40
4 files changed, 147 insertions, 53 deletions
diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
index 96ed88efbbc..74326b7339b 100644
--- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
+++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
@@ -1294,6 +1294,39 @@
]
},
{
+ "Name": "TCoFrameBound",
+ "Base": "TExprBase",
+ "Match": {"Type": "Tuple"},
+ "Children": [
+ {"Index": 0, "Name": "Setting", "Type": "TCoAtom"},
+ {"Index": 1, "Name": "Bound", "Type": "TExprBase", "Optional": true}
+ ]
+ },
+ {
+ "Name": "TCoWinOnBase",
+ "VarArgBase": "TExprBase",
+ "Match": {"Type": "CallableBase"},
+ "Builder": {"Generate": "None"},
+ "Children": [
+ {"Index": 0, "Name": "FrameSpec", "Type": "TExprBase"}
+ ]
+ },
+ {
+ "Name": "TCoWinOnRows",
+ "Base": "TCoWinOnBase",
+ "Match": {"Type": "Callable", "Name": "WinOnRows"}
+ },
+ {
+ "Name": "TCoWinOnRange",
+ "Base": "TCoWinOnBase",
+ "Match": {"Type": "Callable", "Name": "WinOnRange"}
+ },
+ {
+ "Name": "TCoWinOnGroups",
+ "Base": "TCoWinOnBase",
+ "Match": {"Type": "Callable", "Name": "WinOnGroups"}
+ },
+ {
"Name": "TCoCalcOverWindowBase",
"Base": "TCoInputBase",
"Match": {"Type": "CallableBase"},
diff --git a/ydb/library/yql/core/type_ann/type_ann_list.cpp b/ydb/library/yql/core/type_ann/type_ann_list.cpp
index 4724ac20b5a..d39b41ecafa 100644
--- a/ydb/library/yql/core/type_ann/type_ann_list.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_list.cpp
@@ -518,12 +518,7 @@ namespace {
return IGraphTransformer::TStatus::Error;
}
- bool frameCanBeEmpty = false;
- {
- TWindowFrameSettings frameSettings;
- YQL_ENSURE(ParseWindowFrameSettings(*winOn, frameSettings, ctx));
- frameCanBeEmpty = !frameSettings.NeverEmpty;
- }
+ bool frameCanBeEmpty = !TWindowFrameSettings::Parse(*winOn, ctx).IsNonEmpty();
for (auto iterFunc = winOn->Children().begin() + 1; iterFunc != winOn->Children().end(); ++iterFunc) {
auto func = *iterFunc;
@@ -4970,8 +4965,7 @@ namespace {
}
}
- TWindowFrameSettings frameSettings;
- if (!ParseWindowFrameSettings(*input, frameSettings, ctx.Expr)) {
+ if (!TWindowFrameSettings::TryParse(*input, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
diff --git a/ydb/library/yql/core/yql_opt_window.cpp b/ydb/library/yql/core/yql_opt_window.cpp
index 8eaa0ca3fb3..cc9804b7cdb 100644
--- a/ydb/library/yql/core/yql_opt_window.cpp
+++ b/ydb/library/yql/core/yql_opt_window.cpp
@@ -23,8 +23,8 @@ enum class EFrameType : ui8 {
};
EFrameType FrameType(const TWindowFrameSettings& settings) {
- auto first = settings.First;
- auto last = settings.Last;
+ auto first = settings.GetFirstOffset();
+ auto last = settings.GetLastOffset();
if (first.Defined() && last.Defined() && first > last) {
return EFrameType::EMPTY;
@@ -164,14 +164,13 @@ TCalcOverWindowTraits ExtractCalcOverWindowTraits(const TExprNode::TPtr& frames,
for (auto& winOn : frames->ChildrenList()) {
YQL_ENSURE(winOn->IsCallable("WinOnRows"));
- TWindowFrameSettings frameSettings;
- YQL_ENSURE(ParseWindowFrameSettings(*winOn, frameSettings, ctx));
+ TWindowFrameSettings frameSettings = TWindowFrameSettings::Parse(*winOn, ctx);
ui64 frameOutpace = 0;
ui64 frameLag = 0;
const EFrameType frameType = FrameType(frameSettings);
- const auto frameFirst = frameSettings.First;
- const auto frameLast = frameSettings.Last;
+ const auto frameFirst = frameSettings.GetFirstOffset();
+ const auto frameLast = frameSettings.GetLastOffset();
if (frameType != EFrameType::EMPTY) {
if (!frameLast.Defined() || *frameLast > 0) {
@@ -1073,7 +1072,7 @@ public:
: TChain1MapTraitsStateBase(name, raw)
, QueueBegin(queueBegin)
, QueueEnd(queueEnd)
- , FrameNeverEmpty(raw.FrameSettings.NeverEmpty)
+ , FrameNeverEmpty(raw.FrameSettings.IsNonEmpty())
, OutputIsOptional(raw.OutputType->GetKind() == ETypeAnnotationKind::Optional)
{
}
@@ -1285,7 +1284,7 @@ TVector<TChain1MapTraits::TPtr> BuildFoldMapTraits(TQueueParams& queueParams, co
case EFrameType::CURRENT:
case EFrameType::LAGGING: {
TMaybe<ui64> lagQueueIndex;
- auto end = *trait.FrameSettings.Last;
+ auto end = *trait.FrameSettings.GetLastOffset();
YQL_ENSURE(end <= 0);
if (end < 0) {
YQL_ENSURE(queueParams.LagQueueSize >= ui64(0 - end));
@@ -1296,7 +1295,7 @@ TVector<TChain1MapTraits::TPtr> BuildFoldMapTraits(TQueueParams& queueParams, co
break;
}
case EFrameType::LEADING: {
- auto end = *trait.FrameSettings.Last;
+ auto end = *trait.FrameSettings.GetLastOffset();
YQL_ENSURE(end > 0);
ui64 lastRowIndex = currentRowIndex + ui64(end);
result.push_back(new TChain1MapTraitsLeading(name, trait, currentRowIndex, lastRowIndex));
@@ -1308,8 +1307,8 @@ TVector<TChain1MapTraits::TPtr> BuildFoldMapTraits(TQueueParams& queueParams, co
}
case EFrameType::GENERIC: {
queueParams.DataQueueNeeded = true;
- auto first = trait.FrameSettings.First;
- auto last = trait.FrameSettings.Last;
+ auto first = trait.FrameSettings.GetFirstOffset();
+ auto last = trait.FrameSettings.GetLastOffset();
YQL_ENSURE(first.Defined());
ui64 beginIndex = currentRowIndex + *first;
ui64 endIndex = last.Defined() ? (currentRowIndex + *last + 1) : Max<ui64>();
@@ -1564,13 +1563,11 @@ TExprNode::TPtr BuildChain1MapUpdateLambda(TPositionHandle pos, const TVector<TC
return ctx.NewLambda(pos, ctx.NewArguments(pos, {rowArg, stateArg}), std::move(finalBody));
}
+// TODO: fixme
bool IsNonCompactFullFrame(const TExprNode& winOnRows, TExprContext& ctx) {
YQL_ENSURE(winOnRows.IsCallable("WinOnRows"));
-
- TWindowFrameSettings frameSettings;
- YQL_ENSURE(ParseWindowFrameSettings(winOnRows, frameSettings, ctx));
-
- return !frameSettings.IsCompact && !frameSettings.First.Defined() && !frameSettings.Last.Defined();
+ TWindowFrameSettings frameSettings = TWindowFrameSettings::Parse(winOnRows, ctx);
+ return !frameSettings.IsCompact() && !frameSettings.GetFirstOffset().Defined() && !frameSettings.GetLastOffset().Defined();
}
TExprNode::TPtr DeduceCompatibleSort(const TExprNode::TPtr& traitsOne, const TExprNode::TPtr& traitsTwo) {
@@ -2509,10 +2506,16 @@ TExprNode::TPtr RebuildCalcOverWindowGroup(TPositionHandle pos, const TExprNode:
.Done().Ptr();
}
-bool ParseWindowFrameSettings(const TExprNode& node, TWindowFrameSettings& settings, TExprContext& ctx) {
- settings = {};
+TWindowFrameSettings TWindowFrameSettings::Parse(const TExprNode& node, TExprContext& ctx) {
+ auto maybeSettings = TryParse(node, ctx);
+ YQL_ENSURE(maybeSettings);
+ return *maybeSettings;
+}
- YQL_ENSURE(node.IsCallable("WinOnRows"));
+TMaybe<TWindowFrameSettings> TWindowFrameSettings::TryParse(const TExprNode& node, TExprContext& ctx) {
+ TWindowFrameSettings settings;
+
+ YQL_ENSURE(node.IsCallable({"WinOnRows", "WinOnRange", "WinOnGroups"}));
auto frameSpec = node.Child(0);
if (frameSpec->Type() == TExprNode::List) {
bool hasBegin = false;
@@ -2520,43 +2523,69 @@ bool ParseWindowFrameSettings(const TExprNode& node, TWindowFrameSettings& setti
for (const auto& setting : frameSpec->Children()) {
if (!EnsureTupleMinSize(*setting, 1, ctx)) {
- return false;
+ return {};
}
if (!EnsureAtom(setting->Head(), ctx)) {
- return false;
+ return {};
}
const auto settingName = setting->Head().Content();
if (settingName != "begin" && settingName != "end" && settingName != "compact") {
ctx.AddError(
TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Invalid frame bound '" << settingName << "'"));
- return false;
+ return {};
}
if (settingName == "compact") {
- settings.IsCompact = true;
+ settings.Compact = true;
continue;
}
if (!EnsureTupleSize(*setting, 2, ctx)) {
- return false;
+ return {};
}
bool& hasBound = (settingName == "begin") ? hasBegin : hasEnd;
if (hasBound) {
ctx.AddError(
TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Duplicate " << settingName << " frame bound detected"));
- return false;
+ return {};
}
- TMaybe<i32>& boundValue = (settingName == "begin") ? settings.First : settings.Last;
- if (setting->Tail().IsCallable("Int32")) {
+ TMaybe<i32>& boundOffset = (settingName == "begin") ? settings.FirstOffset : settings.LastOffset;
+ TExprNode::TPtr& frameBound = (settingName == "begin") ? settings.First : settings.Last;
+
+ if (setting->Tail().IsList()) {
+ TExprNode::TPtr fb = setting->TailPtr();
+ if (!EnsureTupleMinSize(*fb, 1, ctx)) {
+ return {};
+ }
+ if (!EnsureAtom(fb->Head(), ctx)) {
+ return {};
+ }
+
+ auto type = fb->Head().Content();
+ if (type == "currentRow") {
+ if (fb->ChildrenSize() == 1) {
+ continue;
+ }
+ ctx.AddError(TIssue(ctx.GetPosition(fb->Pos()), TStringBuilder() << "Expecting no value for '" << type << "'"));
+ return {};
+ }
+
+ if (!(type == "preceding" || type == "following")) {
+ ctx.AddError(TIssue(ctx.GetPosition(fb->Pos()), TStringBuilder() << "Expecting preceding or follwing, but got '" << type << "'"));
+ return {};
+ }
+
+ frameBound = fb;
+ } else if (setting->Tail().IsCallable("Int32")) {
auto& valNode = setting->Tail().Head();
YQL_ENSURE(valNode.IsAtom());
i32 value;
YQL_ENSURE(TryFromString(valNode.Content(), value));
- boundValue = value;
+ boundOffset = value;
} else if (!setting->Tail().IsCallable("Void")) {
const TTypeAnnotationNode* type = setting->Tail().GetTypeAnn();
TStringBuilder errMsg;
@@ -2570,7 +2599,7 @@ bool ParseWindowFrameSettings(const TExprNode& node, TWindowFrameSettings& setti
ctx.AddError(TIssue(ctx.GetPosition(setting->Tail().Pos()),
TStringBuilder() << "Invalid " << settingName << " frame bound - expecting Void or Int32 callable, but got: " << errMsg));
- return false;
+ return {};
}
hasBound = true;
}
@@ -2578,28 +2607,38 @@ bool ParseWindowFrameSettings(const TExprNode& node, TWindowFrameSettings& setti
if (!hasBegin || !hasEnd) {
ctx.AddError(TIssue(ctx.GetPosition(frameSpec->Pos()),
TStringBuilder() << "Missing " << (!hasBegin ? "begin" : "end") << " bound in frame definition"));
- return false;
+ return {};
}
} else if (frameSpec->IsCallable("Void")) {
- settings.First = {};
- settings.Last = 0;
+ settings.FirstOffset = {};
+ settings.LastOffset = 0;
} else {
const TTypeAnnotationNode* type = frameSpec->GetTypeAnn();
ctx.AddError(TIssue(ctx.GetPosition(frameSpec->Pos()),
TStringBuilder() << "Invalid window frame - expecting Tuple or Void, but got: " << (type ? FormatType(type) : "lambda")));
- return false;
+ return {};
}
// frame will always contain rows if it includes current row
- if (!settings.First.Defined()) {
- settings.NeverEmpty = !settings.Last.Defined() || *settings.Last >= 0;
- } else if (!settings.Last.Defined()) {
- settings.NeverEmpty = !settings.First.Defined() || *settings.First <= 0;
+ if (!settings.FirstOffset) {
+ settings.NeverEmpty = !settings.LastOffset.Defined() || *settings.LastOffset >= 0;
+ } else if (!settings.LastOffset.Defined()) {
+ settings.NeverEmpty = !settings.FirstOffset.Defined() || *settings.FirstOffset <= 0;
} else {
- settings.NeverEmpty = *settings.First <= *settings.Last && *settings.First <= 0 && *settings.Last >= 0;
+ settings.NeverEmpty = *settings.FirstOffset <= *settings.LastOffset && *settings.FirstOffset <= 0 && *settings.LastOffset >= 0;
}
- return true;
+ return settings;
+}
+
+TMaybe<i32> TWindowFrameSettings::GetFirstOffset() const {
+ YQL_ENSURE(Type == FrameByRows);
+ return FirstOffset;
+}
+
+TMaybe<i32> TWindowFrameSettings::GetLastOffset() const {
+ YQL_ENSURE(Type == FrameByRows);
+ return LastOffset;
}
TExprNode::TPtr ZipWithSessionParamsLambda(TPositionHandle pos, const TExprNode::TPtr& partitionKeySelector,
diff --git a/ydb/library/yql/core/yql_opt_window.h b/ydb/library/yql/core/yql_opt_window.h
index 9664840581c..c0c7d8fa69a 100644
--- a/ydb/library/yql/core/yql_opt_window.h
+++ b/ydb/library/yql/core/yql_opt_window.h
@@ -8,14 +8,42 @@ TExprNode::TPtr ExpandCalcOverWindow(const TExprNode::TPtr& node, TExprContext&
TExprNodeList ExtractCalcsOverWindow(const TExprNode::TPtr& node, TExprContext& ctx);
TExprNode::TPtr RebuildCalcOverWindowGroup(TPositionHandle pos, const TExprNode::TPtr& input, const TExprNodeList& calcs, TExprContext& ctx);
-struct TWindowFrameSettings {
- TMaybe<i32> First;
- TMaybe<i32> Last;
- bool NeverEmpty = true;
- bool IsCompact = false;
+enum EFrameType {
+ FrameByRows,
+ FrameByRange,
+ FrameByGroups,
};
-bool ParseWindowFrameSettings(const TExprNode& node, TWindowFrameSettings& settings, TExprContext& ctx);
+using NNodes::TCoWinOnBase;
+using NNodes::TCoFrameBound;
+
+bool IsUnbounded(const NNodes::TCoFrameBound& bound);
+bool IsCurrentRow(const NNodes::TCoFrameBound& bound);
+
+class TWindowFrameSettings {
+public:
+ static TWindowFrameSettings Parse(const TExprNode& node, TExprContext& ctx);
+ static TMaybe<TWindowFrameSettings> TryParse(const TExprNode& node, TExprContext& ctx);
+
+ // These two functions can only be used for FrameByRows or FrameByGroups
+ TMaybe<i32> GetFirstOffset() const;
+ TMaybe<i32> GetLastOffset() const;
+
+ TCoFrameBound GetFirst(bool& isPreceding) const;
+ TCoFrameBound GetLast(bool& isPreceding) const;
+
+ bool IsNonEmpty() const { return NeverEmpty; }
+ bool IsCompact() const { return Compact; }
+ EFrameType GetFrameType() const { return Type; }
+private:
+ EFrameType Type = FrameByRows;
+ TExprNode::TPtr First;
+ TMaybe<i32> FirstOffset;
+ TExprNode::TPtr Last;
+ TMaybe<i32> LastOffset;
+ bool NeverEmpty = true;
+ bool Compact = false;
+};
// Lambda(input: Stream/List<T>) -> Stream/List<Tuple<T, SessionKey, SessionState, ....>>
// input is assumed to be partitioned by partitionKeySelector