aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@ydb.tech>2023-10-14 09:09:36 +0300
committeraneporada <aneporada@ydb.tech>2023-10-14 09:30:46 +0300
commit82c487106cdf6fa8ae9a18967e53de52fb52e4e8 (patch)
tree63af0be9f2b3f7fd329e00aa2cfd6636138d1618
parent15eb57cc7fd28d7d66d2606e30905951bb64bf7a (diff)
downloadydb-82c487106cdf6fa8ae9a18967e53de52fb52e4e8.tar.gz
Improve handling of ExtractMembers over Aggregate
-rw-r--r--ydb/library/yql/core/common_opt/yql_co_extr_members.cpp107
-rw-r--r--ydb/library/yql/core/common_opt/yql_co_simple1.cpp14
2 files changed, 90 insertions, 31 deletions
diff --git a/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp b/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp
index f4155d060c9..ef6a42beda2 100644
--- a/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp
+++ b/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp
@@ -672,6 +672,86 @@ TExprNode::TPtr ApplyExtractMembersToAggregate(const TExprNode::TPtr& node, cons
outMembers.insert(x->Content());
}
+ // prune handlers first
+ TExprNodeList newHandlers;
+ bool rebuildHandlers = false;
+ for (const auto& handler : aggr.Handlers()) {
+ if (handler.ColumnName().Ref().IsList()) {
+ // many columns
+ auto columns = handler.ColumnName().Cast<TCoAtomList>();
+ TVector<size_t> liveIndexes;
+ for (size_t i = 0; i < columns.Size(); ++i) {
+ if (outMembers.contains(columns.Item(i).Value())) {
+ liveIndexes.push_back(i);
+ }
+ }
+
+ if (liveIndexes.empty()) {
+ // drop handler
+ rebuildHandlers = true;
+ continue;
+ } else if (liveIndexes.size() < columns.Size() && handler.Trait().Maybe<TCoAggregationTraits>()) {
+ auto traits = handler.Trait().Cast<TCoAggregationTraits>();
+
+ TExprNodeList nameNodes;
+ TExprNodeList finishBody;
+ TExprNode::TPtr arg = ctx.NewArgument(traits.FinishHandler().Pos(), "arg");
+ auto originalTuple = ctx.Builder(traits.FinishHandler().Pos())
+ .Apply(traits.FinishHandler().Ref())
+ .With(0, arg)
+ .Seal()
+ .Build();
+
+ for (auto& idx : liveIndexes) {
+ nameNodes.emplace_back(columns.Item(idx).Ptr());
+ finishBody.emplace_back(ctx.Builder(traits.FinishHandler().Pos())
+ .Callable("Nth")
+ .Add(0, originalTuple)
+ .Atom(1, idx)
+ .Seal()
+ .Build());
+ }
+
+ auto finishLambda = ctx.NewLambda(traits.FinishHandler().Pos(),
+ ctx.NewArguments(traits.FinishHandler().Pos(), { arg }),
+ ctx.NewList(traits.FinishHandler().Pos(), std::move(finishBody)));
+
+ auto newHandler = Build<TCoAggregateTuple>(ctx, handler.Pos())
+ .InitFrom(handler)
+ .ColumnName(ctx.NewList(handler.ColumnName().Pos(), std::move(nameNodes)))
+ .Trait<TCoAggregationTraits>()
+ .InitFrom(traits)
+ .FinishHandler(finishLambda)
+ .Build()
+ .Done().Ptr();
+
+ newHandlers.emplace_back(std::move(newHandler));
+ rebuildHandlers = true;
+ continue;
+ }
+ } else {
+ if (!outMembers.contains(handler.ColumnName().Cast<TCoAtom>().Value())) {
+ // drop handler
+ rebuildHandlers = true;
+ continue;
+ }
+ }
+
+ newHandlers.push_back(handler.Ptr());
+ }
+
+ if (rebuildHandlers) {
+ YQL_CLOG(DEBUG, Core) << "Apply ExtractMembers to payloads of " << node->Content() << logSuffix;
+ return Build<TCoExtractMembers>(ctx, aggr.Pos())
+ .Input<TCoAggregate>()
+ .InitFrom(aggr)
+ .Handlers(ctx.NewList(aggr.Pos(), std::move(newHandlers)))
+ .Build()
+ .Members(members)
+ .Done()
+ .Ptr();
+ }
+
TMaybe<TStringBuf> sessionColumn;
const auto sessionSetting = GetSetting(aggr.Settings().Ref(), "session");
if (sessionSetting) {
@@ -698,30 +778,7 @@ TExprNode::TPtr ApplyExtractMembersToAggregate(const TExprNode::TPtr& node, cons
}
}
- TExprNode::TListType newHandlers;
for (const auto& handler : aggr.Handlers()) {
- if (handler.ColumnName().Ref().IsList()) {
- // many columns
- bool hasColumns = false;
- for (const auto& col : handler.ColumnName().Ref().Children()) {
- if (outMembers.contains(col->Content())) {
- hasColumns = true;
- break;
- }
- }
-
- if (!hasColumns) {
- // drop handler
- continue;
- }
- } else {
- if (!outMembers.contains(handler.ColumnName().Ref().Content())) {
- // drop handler
- continue;
- }
- }
-
- newHandlers.push_back(handler.Ptr());
if (handler.DistinctName()) {
usedFields.insert(handler.DistinctName().Cast().Value());
} else {
@@ -791,10 +848,8 @@ TExprNode::TPtr ApplyExtractMembersToAggregate(const TExprNode::TPtr& node, cons
YQL_CLOG(DEBUG, Core) << "Apply ExtractMembers to " << node->Content() << logSuffix;
return Build<TCoExtractMembers>(ctx, aggr.Pos())
.Input<TCoAggregate>()
+ .InitFrom(aggr)
.Input(newInput)
- .Keys(aggr.Keys())
- .Handlers(ctx.NewList(aggr.Pos(), std::move(newHandlers)))
- .Settings(aggr.Settings())
.Build()
.Members(members)
.Done()
diff --git a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp
index 032cec5c2ca..601f1c764a4 100644
--- a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp
+++ b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp
@@ -3028,22 +3028,25 @@ TExprNode::TPtr Normalize(const TCoAggregate& node, TExprContext& ctx) {
for (const auto& aggTuple : node.Handlers()) {
const TExprNode& columns = aggTuple.ColumnName().Ref();
TVector<TStringBuf> names;
- bool namesInOrder = true;
+ bool needRebuildNames = false;
if (columns.IsList()) {
for (auto& column : columns.ChildrenList()) {
YQL_ENSURE(column->IsAtom());
if (!names.empty()) {
- namesInOrder = namesInOrder && (column->Content() >= names.back());
+ needRebuildNames = needRebuildNames || (column->Content() < names.back());
}
names.push_back(column->Content());
}
+ if (names.size() == 1) {
+ needRebuildNames = true;
+ }
} else {
YQL_ENSURE(columns.IsAtom());
names.push_back(columns.Content());
}
TExprNode::TPtr aggTupleNode = aggTuple.Ptr();
- if (!namesInOrder && aggTuple.Trait().Maybe<TCoAggregationTraits>()) {
+ if (needRebuildNames && aggTuple.Trait().Maybe<TCoAggregationTraits>()) {
auto traits = aggTuple.Trait().Cast<TCoAggregationTraits>();
const TTypeAnnotationNode* finishType = traits.FinishHandler().Ref().GetTypeAnn();
if (finishType->GetKind() == ETypeAnnotationKind::Tuple && finishType->Cast<TTupleExprType>()->GetSize() == names.size()) {
@@ -3052,6 +3055,7 @@ TExprNode::TPtr Normalize(const TCoAggregate& node, TExprContext& ctx) {
for (size_t i = 0; i < names.size(); ++i) {
YQL_ENSURE(originalIndexes.insert({ names[i], i}).second);
}
+ YQL_ENSURE(names.size() == originalIndexes.size());
TExprNodeList nameNodes;
TExprNodeList finishBody;
@@ -3074,11 +3078,11 @@ TExprNode::TPtr Normalize(const TCoAggregate& node, TExprContext& ctx) {
auto finishLambda = ctx.NewLambda(traits.FinishHandler().Pos(),
ctx.NewArguments(traits.FinishHandler().Pos(), { arg }),
- ctx.NewList(traits.FinishHandler().Pos(), std::move(finishBody)));
+ (originalIndexes.size() == 1) ? finishBody.front() : ctx.NewList(traits.FinishHandler().Pos(), std::move(finishBody)));
aggTupleNode = Build<TCoAggregateTuple>(ctx, aggTuple.Pos())
.InitFrom(aggTuple)
- .ColumnName(ctx.NewList(aggTuple.ColumnName().Pos(), std::move(nameNodes)))
+ .ColumnName((originalIndexes.size() == 1) ? nameNodes.front() : ctx.NewList(aggTuple.ColumnName().Pos(), std::move(nameNodes)))
.Trait<TCoAggregationTraits>()
.InitFrom(traits)
.FinishHandler(finishLambda)