diff options
author | aneporada <aneporada@ydb.tech> | 2023-10-14 09:09:36 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-10-14 09:30:46 +0300 |
commit | 82c487106cdf6fa8ae9a18967e53de52fb52e4e8 (patch) | |
tree | 63af0be9f2b3f7fd329e00aa2cfd6636138d1618 | |
parent | 15eb57cc7fd28d7d66d2606e30905951bb64bf7a (diff) | |
download | ydb-82c487106cdf6fa8ae9a18967e53de52fb52e4e8.tar.gz |
Improve handling of ExtractMembers over Aggregate
-rw-r--r-- | ydb/library/yql/core/common_opt/yql_co_extr_members.cpp | 107 | ||||
-rw-r--r-- | ydb/library/yql/core/common_opt/yql_co_simple1.cpp | 14 |
2 files changed, 90 insertions, 31 deletions
diff --git a/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp b/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp index f4155d060c9..ef6a42beda2 100644 --- a/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp @@ -672,6 +672,86 @@ TExprNode::TPtr ApplyExtractMembersToAggregate(const TExprNode::TPtr& node, cons outMembers.insert(x->Content()); } + // prune handlers first + TExprNodeList newHandlers; + bool rebuildHandlers = false; + for (const auto& handler : aggr.Handlers()) { + if (handler.ColumnName().Ref().IsList()) { + // many columns + auto columns = handler.ColumnName().Cast<TCoAtomList>(); + TVector<size_t> liveIndexes; + for (size_t i = 0; i < columns.Size(); ++i) { + if (outMembers.contains(columns.Item(i).Value())) { + liveIndexes.push_back(i); + } + } + + if (liveIndexes.empty()) { + // drop handler + rebuildHandlers = true; + continue; + } else if (liveIndexes.size() < columns.Size() && handler.Trait().Maybe<TCoAggregationTraits>()) { + auto traits = handler.Trait().Cast<TCoAggregationTraits>(); + + TExprNodeList nameNodes; + TExprNodeList finishBody; + TExprNode::TPtr arg = ctx.NewArgument(traits.FinishHandler().Pos(), "arg"); + auto originalTuple = ctx.Builder(traits.FinishHandler().Pos()) + .Apply(traits.FinishHandler().Ref()) + .With(0, arg) + .Seal() + .Build(); + + for (auto& idx : liveIndexes) { + nameNodes.emplace_back(columns.Item(idx).Ptr()); + finishBody.emplace_back(ctx.Builder(traits.FinishHandler().Pos()) + .Callable("Nth") + .Add(0, originalTuple) + .Atom(1, idx) + .Seal() + .Build()); + } + + auto finishLambda = ctx.NewLambda(traits.FinishHandler().Pos(), + ctx.NewArguments(traits.FinishHandler().Pos(), { arg }), + ctx.NewList(traits.FinishHandler().Pos(), std::move(finishBody))); + + auto newHandler = Build<TCoAggregateTuple>(ctx, handler.Pos()) + .InitFrom(handler) + .ColumnName(ctx.NewList(handler.ColumnName().Pos(), std::move(nameNodes))) + .Trait<TCoAggregationTraits>() + .InitFrom(traits) + .FinishHandler(finishLambda) + .Build() + .Done().Ptr(); + + newHandlers.emplace_back(std::move(newHandler)); + rebuildHandlers = true; + continue; + } + } else { + if (!outMembers.contains(handler.ColumnName().Cast<TCoAtom>().Value())) { + // drop handler + rebuildHandlers = true; + continue; + } + } + + newHandlers.push_back(handler.Ptr()); + } + + if (rebuildHandlers) { + YQL_CLOG(DEBUG, Core) << "Apply ExtractMembers to payloads of " << node->Content() << logSuffix; + return Build<TCoExtractMembers>(ctx, aggr.Pos()) + .Input<TCoAggregate>() + .InitFrom(aggr) + .Handlers(ctx.NewList(aggr.Pos(), std::move(newHandlers))) + .Build() + .Members(members) + .Done() + .Ptr(); + } + TMaybe<TStringBuf> sessionColumn; const auto sessionSetting = GetSetting(aggr.Settings().Ref(), "session"); if (sessionSetting) { @@ -698,30 +778,7 @@ TExprNode::TPtr ApplyExtractMembersToAggregate(const TExprNode::TPtr& node, cons } } - TExprNode::TListType newHandlers; for (const auto& handler : aggr.Handlers()) { - if (handler.ColumnName().Ref().IsList()) { - // many columns - bool hasColumns = false; - for (const auto& col : handler.ColumnName().Ref().Children()) { - if (outMembers.contains(col->Content())) { - hasColumns = true; - break; - } - } - - if (!hasColumns) { - // drop handler - continue; - } - } else { - if (!outMembers.contains(handler.ColumnName().Ref().Content())) { - // drop handler - continue; - } - } - - newHandlers.push_back(handler.Ptr()); if (handler.DistinctName()) { usedFields.insert(handler.DistinctName().Cast().Value()); } else { @@ -791,10 +848,8 @@ TExprNode::TPtr ApplyExtractMembersToAggregate(const TExprNode::TPtr& node, cons YQL_CLOG(DEBUG, Core) << "Apply ExtractMembers to " << node->Content() << logSuffix; return Build<TCoExtractMembers>(ctx, aggr.Pos()) .Input<TCoAggregate>() + .InitFrom(aggr) .Input(newInput) - .Keys(aggr.Keys()) - .Handlers(ctx.NewList(aggr.Pos(), std::move(newHandlers))) - .Settings(aggr.Settings()) .Build() .Members(members) .Done() diff --git a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp index 032cec5c2ca..601f1c764a4 100644 --- a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp @@ -3028,22 +3028,25 @@ TExprNode::TPtr Normalize(const TCoAggregate& node, TExprContext& ctx) { for (const auto& aggTuple : node.Handlers()) { const TExprNode& columns = aggTuple.ColumnName().Ref(); TVector<TStringBuf> names; - bool namesInOrder = true; + bool needRebuildNames = false; if (columns.IsList()) { for (auto& column : columns.ChildrenList()) { YQL_ENSURE(column->IsAtom()); if (!names.empty()) { - namesInOrder = namesInOrder && (column->Content() >= names.back()); + needRebuildNames = needRebuildNames || (column->Content() < names.back()); } names.push_back(column->Content()); } + if (names.size() == 1) { + needRebuildNames = true; + } } else { YQL_ENSURE(columns.IsAtom()); names.push_back(columns.Content()); } TExprNode::TPtr aggTupleNode = aggTuple.Ptr(); - if (!namesInOrder && aggTuple.Trait().Maybe<TCoAggregationTraits>()) { + if (needRebuildNames && aggTuple.Trait().Maybe<TCoAggregationTraits>()) { auto traits = aggTuple.Trait().Cast<TCoAggregationTraits>(); const TTypeAnnotationNode* finishType = traits.FinishHandler().Ref().GetTypeAnn(); if (finishType->GetKind() == ETypeAnnotationKind::Tuple && finishType->Cast<TTupleExprType>()->GetSize() == names.size()) { @@ -3052,6 +3055,7 @@ TExprNode::TPtr Normalize(const TCoAggregate& node, TExprContext& ctx) { for (size_t i = 0; i < names.size(); ++i) { YQL_ENSURE(originalIndexes.insert({ names[i], i}).second); } + YQL_ENSURE(names.size() == originalIndexes.size()); TExprNodeList nameNodes; TExprNodeList finishBody; @@ -3074,11 +3078,11 @@ TExprNode::TPtr Normalize(const TCoAggregate& node, TExprContext& ctx) { auto finishLambda = ctx.NewLambda(traits.FinishHandler().Pos(), ctx.NewArguments(traits.FinishHandler().Pos(), { arg }), - ctx.NewList(traits.FinishHandler().Pos(), std::move(finishBody))); + (originalIndexes.size() == 1) ? finishBody.front() : ctx.NewList(traits.FinishHandler().Pos(), std::move(finishBody))); aggTupleNode = Build<TCoAggregateTuple>(ctx, aggTuple.Pos()) .InitFrom(aggTuple) - .ColumnName(ctx.NewList(aggTuple.ColumnName().Pos(), std::move(nameNodes))) + .ColumnName((originalIndexes.size() == 1) ? nameNodes.front() : ctx.NewList(aggTuple.ColumnName().Pos(), std::move(nameNodes))) .Trait<TCoAggregationTraits>() .InitFrom(traits) .FinishHandler(finishLambda) |