about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Vitaly Stoyan <vitstn@gmail.com> 2022-06-03 23:15:11 +0300
committer Vitaly Stoyan <vitstn@gmail.com> 2022-06-03 23:15:11 +0300
commit 7a1636325742350f65dddd15efdc67395830171f (patch)
tree 61b84df6cfe9e2a5c27185a991811db964cdb820
parent 9406dbd646356a468b7f5f43afa0d194bd11e87e (diff)
download ydb-7a1636325742350f65dddd15efdc67395830171f.tar.gz
YQL-14004 [pg] distinct inside agg functions
ref:0237a37f5c77bd616cf4b25a56215a418b6c3723
-rw-r--r-- ydb/library/yql/core/common_opt/yql_co_pgselect.cpp 125
-rw-r--r-- ydb/library/yql/core/type_ann/type_ann_pg.cpp 13
2 files changed, 116 insertions, 22 deletions
diff --git a/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp b/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp
index 2b664b130be..b86e44bbf4f 100644
--- a/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp
+++ b/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp
@@ -1084,33 +1084,109 @@ std::tuple<TAggs, TNodeMap<ui32>> GatherAggregations(const TExprNode::TPtr& proj
return { aggs, aggId };
}
-TExprNode::TPtr BuildAggregationTraits(TPositionHandle pos, bool onWindow,
+TExprNode::TPtr BuildAggregationTraits(TPositionHandle pos, bool onWindow, const TString& distinctColumnName,
const std::pair<TExprNode::TPtr, TExprNode::TPtr>& agg,
const TExprNode::TPtr& listTypeNode, TExprContext& ctx) {
- auto arg = ctx.NewArgument(pos, "row");
- auto arguments = ctx.NewArguments(pos, { arg });
auto func = agg.first->Head().Content();
- TExprNode::TListType aggFuncArgs;
- for (ui32 j = onWindow ? 3 : 2; j < agg.first->ChildrenSize(); ++j) {
- aggFuncArgs.push_back(ctx.ReplaceNode(agg.first->ChildPtr(j), *agg.second, arg));
- }
+ TExprNode::TPtr type = ctx.Builder(pos)
+ .Callable("ListItemType")
+ .Add(0, listTypeNode)
+ .Seal()
+ .Build();
- auto extractor = ctx.NewLambda(pos, std::move(arguments), std::move(aggFuncArgs));
+ TExprNode::TPtr extractor;
+ if (distinctColumnName) {
+ type = ctx.Builder(pos)
+ .Callable("StructMemberType")
+ .Add(0, type)
+ .Atom(1, distinctColumnName)
+ .Seal()
+ .Build();
+
+ extractor = ctx.Builder(pos)
+ .Lambda()
+ .Param("value")
+ .Arg("value")
+ .Seal()
+ .Build();
+ } else {
+ auto arg = ctx.NewArgument(pos, "row");
+ auto arguments = ctx.NewArguments(pos, { arg });
+ TExprNode::TListType aggFuncArgs;
+ for (ui32 j = onWindow ? 3 : 2; j < agg.first->ChildrenSize(); ++j) {
+ aggFuncArgs.push_back(ctx.ReplaceNode(agg.first->ChildPtr(j), *agg.second, arg));
+ }
+
+ extractor = ctx.NewLambda(pos, std::move(arguments), std::move(aggFuncArgs));
+ }
return ctx.Builder(pos)
- .Callable(onWindow ? "PgWindowTraits" : "PgAggregationTraits")
+ .Callable(TString(onWindow ? "PgWindowTraits" : "PgAggregationTraits") + (distinctColumnName ? "Tuple" : ""))
.Atom(0, func)
- .Callable(1, "ListItemType")
- .Add(0, listTypeNode)
- .Seal()
+ .Add(1, type)
.Add(2, extractor)
.Seal()
.Build();
}
-TExprNode::TPtr BuildGroupByAndHaving(TPositionHandle pos, const TExprNode::TPtr& list, const TAggs& aggs, const TNodeMap<ui32>& aggId,
+TExprNode::TPtr BuildGroupByAndHaving(TPositionHandle pos, TExprNode::TPtr list, const TAggs& aggs, const TNodeMap<ui32>& aggId,
const TExprNode::TPtr& groupBy, const TExprNode::TPtr& having, TExprNode::TPtr& projectionLambda,
const TExprNode::TPtr& finalExtTypes, TExprContext& ctx, TOptimizeContext& optCtx) {
+ bool needRemapForDistinct = false;
+ for (ui32 i = 0; i < aggs.size(); ++i) {
+ if (GetSetting(*aggs[i].first->Child(1), "distinct")) {
+ needRemapForDistinct = true;
+ break;
+ }
+ }
+
+ if (needRemapForDistinct) {
+ auto arg = ctx.NewArgument(pos, "row");
+ auto arguments = ctx.NewArguments(pos, { arg });
+ TExprNode::TListType newColumns;
+ for (ui32 i = 0; i < aggs.size(); ++i) {
+ if (!GetSetting(*aggs[i].first->Child(1), "distinct")) {
+ continue;
+ }
+
+ TExprNode::TListType tupleArgs;
+ for (ui32 j = 2; j < aggs[i].first->ChildrenSize(); ++j) {
+ tupleArgs.push_back(ctx.ReplaceNode(aggs[i].first->ChildPtr(j), *aggs[i].second, arg));
+ }
+
+ auto tuple = ctx.NewList(pos, std::move(tupleArgs));
+ newColumns.push_back(ctx.Builder(pos)
+ .List()
+ .Atom(0, "_yql_distinct_" + ToString(i))
+ .Add(1, tuple)
+ .Seal()
+ .Build());
+ }
+
+ auto newColumnsNode = ctx.NewCallable(pos, "AsStruct", std::move(newColumns));
+ auto root = ctx.Builder(pos)
+ .Callable("FlattenMembers")
+ .List(0)
+ .Atom(0, "")
+ .Add(1, arg)
+ .Seal()
+ .List(1)
+ .Atom(0, "")
+ .Add(1, newColumnsNode)
+ .Seal()
+ .Seal()
+ .Build();
+
+ auto distinctLambda = ctx.NewLambda(pos, std::move(arguments), std::move(root));
+
+ list = ctx.Builder(pos)
+ .Callable("Map")
+ .Add(0, list)
+ .Add(1, distinctLambda)
+ .Seal()
+ .Build();
+ }
+
auto listTypeNode = ctx.Builder(pos)
.Callable("TypeOf")
.Add(0, list)
@@ -1119,13 +1195,24 @@ TExprNode::TPtr BuildGroupByAndHaving(TPositionHandle pos, const TExprNode::TPtr
TExprNode::TListType payloadItems;
for (ui32 i = 0; i < aggs.size(); ++i) {
- auto traits = BuildAggregationTraits(pos, false, aggs[i], listTypeNode, ctx);
- payloadItems.push_back(ctx.Builder(pos)
- .List()
+ const bool distinct = GetSetting(*aggs[i].first->Child(1), "distinct") != nullptr;
+ auto traits = BuildAggregationTraits(pos, false, distinct ? "_yql_distinct_" + ToString(i) : "", aggs[i], listTypeNode, ctx);
+ if (distinct) {
+ payloadItems.push_back(ctx.Builder(pos)
+ .List()
.Atom(0, "_yql_agg_" + ToString(i))
.Add(1, traits)
- .Seal()
- .Build());
+ .Atom(2, "_yql_distinct_" + ToString(i))
+ .Seal()
+ .Build());
+ } else {
+ payloadItems.push_back(ctx.Builder(pos)
+ .List()
+ .Atom(0, "_yql_agg_" + ToString(i))
+ .Add(1, traits)
+ .Seal()
+ .Build());
+ }
}
auto payloadsNode = ctx.NewList(pos, std::move(payloadItems));
@@ -1380,7 +1467,7 @@ TExprNode::TPtr BuildWindows(TPositionHandle pos, const TExprNode::TPtr& list, c
bool isAgg = p.first->IsCallable("PgAggWindowCall");
TExprNode::TPtr value;
if (isAgg) {
- value = BuildAggregationTraits(pos, true, p, listTypeNode, ctx);
+ value = BuildAggregationTraits(pos, true, "", p, listTypeNode, ctx);
} else {
if (name == "row_number") {
value = ctx.Builder(pos)
diff --git a/ydb/library/yql/core/type_ann/type_ann_pg.cpp b/ydb/library/yql/core/type_ann/type_ann_pg.cpp
index d5064eee1b5..0a7c565ed58 100644
--- a/ydb/library/yql/core/type_ann/type_ann_pg.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_pg.cpp
@@ -423,9 +423,16 @@ IGraphTransformer::TStatus PgAggWrapper(const TExprNode::TPtr& input, TExprNode:
}
auto content = setting->Head().Content();
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- TStringBuilder() << "Unexpected setting " << content << " in aggregate function " << name));
- return IGraphTransformer::TStatus::Error;
+ if (content == "distinct") {
+ if (overWindow) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "distinct over window is not supported"));
+ return IGraphTransformer::TStatus::Error;
+ }
+ } else {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ TStringBuilder() << "Unexpected setting " << content << " in aggregate function " << name));
+ return IGraphTransformer::TStatus::Error;
+ }
}
TVector<ui32> argTypes;