diff options
author | aneporada <aneporada@yandex-team.ru> | 2022-03-03 00:47:23 +0300 |
---|---|---|
committer | aneporada <aneporada@yandex-team.ru> | 2022-03-03 00:47:23 +0300 |
commit | 4cfc23ae371d1fb8e426470c5ec4e7a2962de9e4 (patch) | |
tree | a758db5cfe3f4f8dab3b2778222ad86d53ff1e4d | |
parent | b65a9faeed938ec73573d4b0d7635987d6eef099 (diff) | |
download | ydb-4cfc23ae371d1fb8e426470c5ec4e7a2962de9e4.tar.gz |
[YQL-10265] Refactor TWindowFrameSettings to support range/groups
ref:781d0d04edfe56877a9df1df5efd9032f9bd598c
-rw-r--r-- | ydb/library/yql/core/expr_nodes/yql_expr_nodes.json | 33 | ||||
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_list.cpp | 10 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_opt_window.cpp | 117 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_opt_window.h | 40 |
4 files changed, 147 insertions, 53 deletions
diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json index 96ed88efbbc..74326b7339b 100644 --- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json +++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json @@ -1294,6 +1294,39 @@ ] }, { + "Name": "TCoFrameBound", + "Base": "TExprBase", + "Match": {"Type": "Tuple"}, + "Children": [ + {"Index": 0, "Name": "Setting", "Type": "TCoAtom"}, + {"Index": 1, "Name": "Bound", "Type": "TExprBase", "Optional": true} + ] + }, + { + "Name": "TCoWinOnBase", + "VarArgBase": "TExprBase", + "Match": {"Type": "CallableBase"}, + "Builder": {"Generate": "None"}, + "Children": [ + {"Index": 0, "Name": "FrameSpec", "Type": "TExprBase"} + ] + }, + { + "Name": "TCoWinOnRows", + "Base": "TCoWinOnBase", + "Match": {"Type": "Callable", "Name": "WinOnRows"} + }, + { + "Name": "TCoWinOnRange", + "Base": "TCoWinOnBase", + "Match": {"Type": "Callable", "Name": "WinOnRange"} + }, + { + "Name": "TCoWinOnGroups", + "Base": "TCoWinOnBase", + "Match": {"Type": "Callable", "Name": "WinOnGroups"} + }, + { "Name": "TCoCalcOverWindowBase", "Base": "TCoInputBase", "Match": {"Type": "CallableBase"}, diff --git a/ydb/library/yql/core/type_ann/type_ann_list.cpp b/ydb/library/yql/core/type_ann/type_ann_list.cpp index 4724ac20b5a..d39b41ecafa 100644 --- a/ydb/library/yql/core/type_ann/type_ann_list.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_list.cpp @@ -518,12 +518,7 @@ namespace { return IGraphTransformer::TStatus::Error; } - bool frameCanBeEmpty = false; - { - TWindowFrameSettings frameSettings; - YQL_ENSURE(ParseWindowFrameSettings(*winOn, frameSettings, ctx)); - frameCanBeEmpty = !frameSettings.NeverEmpty; - } + bool frameCanBeEmpty = !TWindowFrameSettings::Parse(*winOn, ctx).IsNonEmpty(); for (auto iterFunc = winOn->Children().begin() + 1; iterFunc != winOn->Children().end(); ++iterFunc) { auto func = *iterFunc; @@ -4970,8 +4965,7 @@ namespace { } } - TWindowFrameSettings frameSettings; 
- if (!ParseWindowFrameSettings(*input, frameSettings, ctx.Expr)) { + if (!TWindowFrameSettings::TryParse(*input, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } diff --git a/ydb/library/yql/core/yql_opt_window.cpp b/ydb/library/yql/core/yql_opt_window.cpp index 8eaa0ca3fb3..cc9804b7cdb 100644 --- a/ydb/library/yql/core/yql_opt_window.cpp +++ b/ydb/library/yql/core/yql_opt_window.cpp @@ -23,8 +23,8 @@ enum class EFrameType : ui8 { }; EFrameType FrameType(const TWindowFrameSettings& settings) { - auto first = settings.First; - auto last = settings.Last; + auto first = settings.GetFirstOffset(); + auto last = settings.GetLastOffset(); if (first.Defined() && last.Defined() && first > last) { return EFrameType::EMPTY; @@ -164,14 +164,13 @@ TCalcOverWindowTraits ExtractCalcOverWindowTraits(const TExprNode::TPtr& frames, for (auto& winOn : frames->ChildrenList()) { YQL_ENSURE(winOn->IsCallable("WinOnRows")); - TWindowFrameSettings frameSettings; - YQL_ENSURE(ParseWindowFrameSettings(*winOn, frameSettings, ctx)); + TWindowFrameSettings frameSettings = TWindowFrameSettings::Parse(*winOn, ctx); ui64 frameOutpace = 0; ui64 frameLag = 0; const EFrameType frameType = FrameType(frameSettings); - const auto frameFirst = frameSettings.First; - const auto frameLast = frameSettings.Last; + const auto frameFirst = frameSettings.GetFirstOffset(); + const auto frameLast = frameSettings.GetLastOffset(); if (frameType != EFrameType::EMPTY) { if (!frameLast.Defined() || *frameLast > 0) { @@ -1073,7 +1072,7 @@ public: : TChain1MapTraitsStateBase(name, raw) , QueueBegin(queueBegin) , QueueEnd(queueEnd) - , FrameNeverEmpty(raw.FrameSettings.NeverEmpty) + , FrameNeverEmpty(raw.FrameSettings.IsNonEmpty()) , OutputIsOptional(raw.OutputType->GetKind() == ETypeAnnotationKind::Optional) { } @@ -1285,7 +1284,7 @@ TVector<TChain1MapTraits::TPtr> BuildFoldMapTraits(TQueueParams& queueParams, co case EFrameType::CURRENT: case EFrameType::LAGGING: { TMaybe<ui64> lagQueueIndex; - auto end = 
*trait.FrameSettings.Last; + auto end = *trait.FrameSettings.GetLastOffset(); YQL_ENSURE(end <= 0); if (end < 0) { YQL_ENSURE(queueParams.LagQueueSize >= ui64(0 - end)); @@ -1296,7 +1295,7 @@ TVector<TChain1MapTraits::TPtr> BuildFoldMapTraits(TQueueParams& queueParams, co break; } case EFrameType::LEADING: { - auto end = *trait.FrameSettings.Last; + auto end = *trait.FrameSettings.GetLastOffset(); YQL_ENSURE(end > 0); ui64 lastRowIndex = currentRowIndex + ui64(end); result.push_back(new TChain1MapTraitsLeading(name, trait, currentRowIndex, lastRowIndex)); @@ -1308,8 +1307,8 @@ TVector<TChain1MapTraits::TPtr> BuildFoldMapTraits(TQueueParams& queueParams, co } case EFrameType::GENERIC: { queueParams.DataQueueNeeded = true; - auto first = trait.FrameSettings.First; - auto last = trait.FrameSettings.Last; + auto first = trait.FrameSettings.GetFirstOffset(); + auto last = trait.FrameSettings.GetLastOffset(); YQL_ENSURE(first.Defined()); ui64 beginIndex = currentRowIndex + *first; ui64 endIndex = last.Defined() ? 
(currentRowIndex + *last + 1) : Max<ui64>(); @@ -1564,13 +1563,11 @@ TExprNode::TPtr BuildChain1MapUpdateLambda(TPositionHandle pos, const TVector<TC return ctx.NewLambda(pos, ctx.NewArguments(pos, {rowArg, stateArg}), std::move(finalBody)); } +// TODO: fixme bool IsNonCompactFullFrame(const TExprNode& winOnRows, TExprContext& ctx) { YQL_ENSURE(winOnRows.IsCallable("WinOnRows")); - - TWindowFrameSettings frameSettings; - YQL_ENSURE(ParseWindowFrameSettings(winOnRows, frameSettings, ctx)); - - return !frameSettings.IsCompact && !frameSettings.First.Defined() && !frameSettings.Last.Defined(); + TWindowFrameSettings frameSettings = TWindowFrameSettings::Parse(winOnRows, ctx); + return !frameSettings.IsCompact() && !frameSettings.GetFirstOffset().Defined() && !frameSettings.GetLastOffset().Defined(); } TExprNode::TPtr DeduceCompatibleSort(const TExprNode::TPtr& traitsOne, const TExprNode::TPtr& traitsTwo) { @@ -2509,10 +2506,16 @@ TExprNode::TPtr RebuildCalcOverWindowGroup(TPositionHandle pos, const TExprNode: .Done().Ptr(); } -bool ParseWindowFrameSettings(const TExprNode& node, TWindowFrameSettings& settings, TExprContext& ctx) { - settings = {}; +TWindowFrameSettings TWindowFrameSettings::Parse(const TExprNode& node, TExprContext& ctx) { + auto maybeSettings = TryParse(node, ctx); + YQL_ENSURE(maybeSettings); + return *maybeSettings; +} - YQL_ENSURE(node.IsCallable("WinOnRows")); +TMaybe<TWindowFrameSettings> TWindowFrameSettings::TryParse(const TExprNode& node, TExprContext& ctx) { + TWindowFrameSettings settings; + + YQL_ENSURE(node.IsCallable({"WinOnRows", "WinOnRange", "WinOnGroups"})); auto frameSpec = node.Child(0); if (frameSpec->Type() == TExprNode::List) { bool hasBegin = false; @@ -2520,43 +2523,69 @@ bool ParseWindowFrameSettings(const TExprNode& node, TWindowFrameSettings& setti for (const auto& setting : frameSpec->Children()) { if (!EnsureTupleMinSize(*setting, 1, ctx)) { - return false; + return {}; } if (!EnsureAtom(setting->Head(), ctx)) { - return 
false; + return {}; } const auto settingName = setting->Head().Content(); if (settingName != "begin" && settingName != "end" && settingName != "compact") { ctx.AddError( TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Invalid frame bound '" << settingName << "'")); - return false; + return {}; } if (settingName == "compact") { - settings.IsCompact = true; + settings.Compact = true; continue; } if (!EnsureTupleSize(*setting, 2, ctx)) { - return false; + return {}; } bool& hasBound = (settingName == "begin") ? hasBegin : hasEnd; if (hasBound) { ctx.AddError( TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Duplicate " << settingName << " frame bound detected")); - return false; + return {}; } - TMaybe<i32>& boundValue = (settingName == "begin") ? settings.First : settings.Last; - if (setting->Tail().IsCallable("Int32")) { + TMaybe<i32>& boundOffset = (settingName == "begin") ? settings.FirstOffset : settings.LastOffset; + TExprNode::TPtr& frameBound = (settingName == "begin") ? 
settings.First : settings.Last; + + if (setting->Tail().IsList()) { + TExprNode::TPtr fb = setting->TailPtr(); + if (!EnsureTupleMinSize(*fb, 1, ctx)) { + return {}; + } + if (!EnsureAtom(fb->Head(), ctx)) { + return {}; + } + + auto type = fb->Head().Content(); + if (type == "currentRow") { + if (fb->ChildrenSize() == 1) { + continue; + } + ctx.AddError(TIssue(ctx.GetPosition(fb->Pos()), TStringBuilder() << "Expecting no value for '" << type << "'")); + return {}; + } + + if (!(type == "preceding" || type == "following")) { + ctx.AddError(TIssue(ctx.GetPosition(fb->Pos()), TStringBuilder() << "Expecting preceding or follwing, but got '" << type << "'")); + return {}; + } + + frameBound = fb; + } else if (setting->Tail().IsCallable("Int32")) { auto& valNode = setting->Tail().Head(); YQL_ENSURE(valNode.IsAtom()); i32 value; YQL_ENSURE(TryFromString(valNode.Content(), value)); - boundValue = value; + boundOffset = value; } else if (!setting->Tail().IsCallable("Void")) { const TTypeAnnotationNode* type = setting->Tail().GetTypeAnn(); TStringBuilder errMsg; @@ -2570,7 +2599,7 @@ bool ParseWindowFrameSettings(const TExprNode& node, TWindowFrameSettings& setti ctx.AddError(TIssue(ctx.GetPosition(setting->Tail().Pos()), TStringBuilder() << "Invalid " << settingName << " frame bound - expecting Void or Int32 callable, but got: " << errMsg)); - return false; + return {}; } hasBound = true; } @@ -2578,28 +2607,38 @@ bool ParseWindowFrameSettings(const TExprNode& node, TWindowFrameSettings& setti if (!hasBegin || !hasEnd) { ctx.AddError(TIssue(ctx.GetPosition(frameSpec->Pos()), TStringBuilder() << "Missing " << (!hasBegin ? 
"begin" : "end") << " bound in frame definition")); - return false; + return {}; } } else if (frameSpec->IsCallable("Void")) { - settings.First = {}; - settings.Last = 0; + settings.FirstOffset = {}; + settings.LastOffset = 0; } else { const TTypeAnnotationNode* type = frameSpec->GetTypeAnn(); ctx.AddError(TIssue(ctx.GetPosition(frameSpec->Pos()), TStringBuilder() << "Invalid window frame - expecting Tuple or Void, but got: " << (type ? FormatType(type) : "lambda"))); - return false; + return {}; } // frame will always contain rows if it includes current row - if (!settings.First.Defined()) { - settings.NeverEmpty = !settings.Last.Defined() || *settings.Last >= 0; - } else if (!settings.Last.Defined()) { - settings.NeverEmpty = !settings.First.Defined() || *settings.First <= 0; + if (!settings.FirstOffset) { + settings.NeverEmpty = !settings.LastOffset.Defined() || *settings.LastOffset >= 0; + } else if (!settings.LastOffset.Defined()) { + settings.NeverEmpty = !settings.FirstOffset.Defined() || *settings.FirstOffset <= 0; } else { - settings.NeverEmpty = *settings.First <= *settings.Last && *settings.First <= 0 && *settings.Last >= 0; + settings.NeverEmpty = *settings.FirstOffset <= *settings.LastOffset && *settings.FirstOffset <= 0 && *settings.LastOffset >= 0; } - return true; + return settings; +} + +TMaybe<i32> TWindowFrameSettings::GetFirstOffset() const { + YQL_ENSURE(Type == FrameByRows); + return FirstOffset; +} + +TMaybe<i32> TWindowFrameSettings::GetLastOffset() const { + YQL_ENSURE(Type == FrameByRows); + return LastOffset; } TExprNode::TPtr ZipWithSessionParamsLambda(TPositionHandle pos, const TExprNode::TPtr& partitionKeySelector, diff --git a/ydb/library/yql/core/yql_opt_window.h b/ydb/library/yql/core/yql_opt_window.h index 9664840581c..c0c7d8fa69a 100644 --- a/ydb/library/yql/core/yql_opt_window.h +++ b/ydb/library/yql/core/yql_opt_window.h @@ -8,14 +8,42 @@ TExprNode::TPtr ExpandCalcOverWindow(const TExprNode::TPtr& node, TExprContext& 
TExprNodeList ExtractCalcsOverWindow(const TExprNode::TPtr& node, TExprContext& ctx); TExprNode::TPtr RebuildCalcOverWindowGroup(TPositionHandle pos, const TExprNode::TPtr& input, const TExprNodeList& calcs, TExprContext& ctx); -struct TWindowFrameSettings { - TMaybe<i32> First; - TMaybe<i32> Last; - bool NeverEmpty = true; - bool IsCompact = false; +enum EFrameType { + FrameByRows, + FrameByRange, + FrameByGroups, }; -bool ParseWindowFrameSettings(const TExprNode& node, TWindowFrameSettings& settings, TExprContext& ctx); +using NNodes::TCoWinOnBase; +using NNodes::TCoFrameBound; + +bool IsUnbounded(const NNodes::TCoFrameBound& bound); +bool IsCurrentRow(const NNodes::TCoFrameBound& bound); + +class TWindowFrameSettings { +public: + static TWindowFrameSettings Parse(const TExprNode& node, TExprContext& ctx); + static TMaybe<TWindowFrameSettings> TryParse(const TExprNode& node, TExprContext& ctx); + + // These two functions can only be used for FrameByRows or FrameByGroups + TMaybe<i32> GetFirstOffset() const; + TMaybe<i32> GetLastOffset() const; + + TCoFrameBound GetFirst(bool& isPreceding) const; + TCoFrameBound GetLast(bool& isPreceding) const; + + bool IsNonEmpty() const { return NeverEmpty; } + bool IsCompact() const { return Compact; } + EFrameType GetFrameType() const { return Type; } +private: + EFrameType Type = FrameByRows; + TExprNode::TPtr First; + TMaybe<i32> FirstOffset; + TExprNode::TPtr Last; + TMaybe<i32> LastOffset; + bool NeverEmpty = true; + bool Compact = false; +}; // Lambda(input: Stream/List<T>) -> Stream/List<Tuple<T, SessionKey, SessionState, ....>> // input is assumed to be partitioned by partitionKeySelector |