summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitaly Stoyan <[email protected]>2022-06-01 22:13:52 +0300
committerVitaly Stoyan <[email protected]>2022-06-01 22:13:52 +0300
commite8fac0102c6dbd874df5ca1799b131090a7da527 (patch)
tree8ba5f92ff4886fe56c61dfc390ade2cd7e566d76
parentb1e1c9a6232b6f081569039d2e9db0b62ea4a7b3 (diff)
YQL-14728 corellated sublinks in projection
ref:399b43117a9e5352723d741ca7113eb2f32468da
-rw-r--r--ydb/library/yql/core/common_opt/yql_co_pgselect.cpp120
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_pg.cpp57
2 files changed, 119 insertions, 58 deletions
diff --git a/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp b/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp
index fa7cd874455..961423ffcd5 100644
--- a/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp
+++ b/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp
@@ -105,15 +105,17 @@ TExprNode::TPtr JoinColumns(TPositionHandle pos, const TExprNode::TPtr& list1, c
.Build();
}
-std::pair<TExprNode::TPtr, TExprNode::TPtr> RewriteSubLinks(TPositionHandle pos, const TExprNode::TPtr& list, const TExprNode::TPtr& lambda,
- const TNodeMap<ui32>& subLinks, TExprContext& ctx, TOptimizeContext& optCtx) {
- Y_UNUSED(pos);
+std::pair<TExprNode::TPtr, TExprNode::TPtr> RewriteSubLinks(TPositionHandle pos,
+ const TExprNode::TPtr& list, const TExprNode::TPtr& lambda,
+ const TNodeMap<ui32>& subLinks, const TExprNode::TPtr& joins, TExprContext& ctx, TOptimizeContext& optCtx) {
ui32 sublinkColumnIndex = 0;
auto newList = list;
- auto newLambda = lambda;
auto originalRow = lambda->Head().HeadPtr();
+ auto arg = ctx.NewArgument(pos, "row");
+ auto arguments = ctx.NewArguments(pos, { arg });
+ auto root = lambda->TailPtr();
TNodeOnNodeOwnedMap deepClones;
- auto status = OptimizeExpr(newLambda, newLambda, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
+ auto status = OptimizeExpr(root, root, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
auto it = subLinks.find(node.Get());
if (it != subLinks.end()) {
auto linkType = node->Head().Content();
@@ -248,14 +250,14 @@ std::pair<TExprNode::TPtr, TExprNode::TPtr> RewriteSubLinks(TPositionHandle pos,
auto outerList = ctx.Builder(node->Pos())
.Callable("ExtractMembers")
- .Add(0, list)
+ .Add(0, joins)
.Add(1, colList)
.Seal()
.Build();
auto uniqueOuterList = ctx.Builder(node->Pos())
.Callable("Aggregate")
- .Add(0, list)
+ .Add(0, outerList)
.Add(1, colList)
.List(2)
.Seal()
@@ -415,13 +417,20 @@ std::pair<TExprNode::TPtr, TExprNode::TPtr> RewriteSubLinks(TPositionHandle pos,
if (linkType == "exists") {
return ctx.Builder(node->Pos())
- .Callable(">")
- .Callable(0, "Member")
- .Add(0, originalRow)
- .Atom(1, columnName)
- .Seal()
- .Callable(1, "Uint64")
- .Atom(0, "0")
+ .Callable("ToPg")
+ .Callable(0, ">")
+ .Callable(0, "Coalesce")
+ .Callable(0, "Member")
+ .Add(0, originalRow)
+ .Atom(1, columnName)
+ .Seal()
+ .Callable(1, "Uint64")
+ .Atom(0, "0")
+ .Seal()
+ .Seal()
+ .Callable(1, "Uint64")
+ .Atom(0, "0")
+ .Seal()
.Seal()
.Seal()
.Build();
@@ -453,31 +462,35 @@ std::pair<TExprNode::TPtr, TExprNode::TPtr> RewriteSubLinks(TPositionHandle pos,
.Build();
} else if (linkType == "any") {
return ctx.Builder(node->Pos())
- .Callable(">")
- .Callable(0, "Member")
- .Add(0, originalRow)
- .Atom(1, columnName)
- .Seal()
- .Callable(1, "Uint64")
- .Atom(0, "0")
+ .Callable("ToPg")
+ .Callable(0, ">")
+ .Callable(0, "Member")
+ .Add(0, originalRow)
+ .Atom(1, columnName)
+ .Seal()
+ .Callable(1, "Uint64")
+ .Atom(0, "0")
+ .Seal()
.Seal()
.Seal()
.Build();
} else if (linkType == "all") {
return ctx.Builder(node->Pos())
- .Callable("==")
- .Callable(0, "Coalesce")
- .Callable(0, "Member")
- .Add(0, originalRow)
- .Atom(1, columnName)
+ .Callable("ToPg")
+ .Callable(0, "==")
+ .Callable(0, "Coalesce")
+ .Callable(0, "Member")
+ .Add(0, originalRow)
+ .Atom(1, columnName)
+ .Seal()
+ .Callable(1, "Uint64")
+ .Atom(0, "0")
+ .Seal()
.Seal()
.Callable(1, "Uint64")
.Atom(0, "0")
.Seal()
.Seal()
- .Callable(1, "Uint64")
- .Atom(0, "0")
- .Seal()
.Seal()
.Build();
}
@@ -489,6 +502,8 @@ std::pair<TExprNode::TPtr, TExprNode::TPtr> RewriteSubLinks(TPositionHandle pos,
return node;
}, ctx, TOptimizeExprSettings(optCtx.Types));
+ root = ctx.ReplaceNode(std::move(root), *originalRow, arg);
+ auto newLambda = ctx.NewLambda(pos, std::move(arguments), std::move(root));
YQL_ENSURE(status.Level != IGraphTransformer::TStatus::Error);
return {
newList,
@@ -497,10 +512,10 @@ std::pair<TExprNode::TPtr, TExprNode::TPtr> RewriteSubLinks(TPositionHandle pos,
}
TExprNode::TPtr BuildFilter(TPositionHandle pos, const TExprNode::TPtr& list, const TExprNode::TPtr& filter, TExprContext& ctx, TOptimizeContext& optCtx) {
- TExprNode::TPtr actualList = list, actualFilter = filter->Tail().TailPtr();
- auto subLinks = GatherSubLinks(actualFilter);
+ TExprNode::TPtr actualList = list, actualFilter = filter;
+ auto subLinks = GatherSubLinks(filter);
if (!subLinks.empty()) {
- std::tie(actualList, actualFilter) = RewriteSubLinks(filter->Pos(), actualList, actualFilter, subLinks, ctx, optCtx);
+ std::tie(actualList, actualFilter) = RewriteSubLinks(filter->Pos(), list, filter, subLinks, list, ctx, optCtx);
}
return ctx.Builder(pos)
@@ -593,12 +608,18 @@ TExprNode::TPtr BuildValues(TPositionHandle pos, const TExprNode::TPtr& values,
}
std::tuple<TExprNode::TPtr, TExprNode::TPtr> BuildOneRow(TPositionHandle pos, const TExprNode::TPtr& result, TExprContext& ctx) {
+ auto arg = ctx.NewArgument(pos, "row");
+ auto arguments = ctx.NewArguments(pos, { arg });
+
TExprNode::TListType rowItems;
for (const auto& x : result->Tail().Children()) {
- rowItems.push_back(ctx.NewList(x->Pos(), { x->HeadPtr(), x->Tail().TailPtr() }));
+ auto value = ctx.ReplaceNode(x->Tail().TailPtr(), x->Tail().Head().Head(), arg);
+ rowItems.push_back(ctx.NewList(x->Pos(), { x->HeadPtr(), value }));
}
auto row = ctx.NewCallable(pos, "AsStruct", std::move(rowItems));
+ auto projectionLambda = ctx.NewLambda(pos, std::move(arguments), std::move(row));
+
auto list = ctx.Builder(pos)
.Callable("AsList")
.Callable(0, "AsStruct")
@@ -606,13 +627,6 @@ std::tuple<TExprNode::TPtr, TExprNode::TPtr> BuildOneRow(TPositionHandle pos, co
.Seal()
.Build();
- auto projectionLambda = ctx.Builder(pos)
- .Lambda()
- .Param("row")
- .Set(row)
- .Seal()
- .Build();
-
return { list, projectionLambda };
}
@@ -625,6 +639,21 @@ TUsedColumns GatherUsedColumns(const TExprNode::TPtr& result, const TExprNode::T
for (auto item : type->GetItems()) {
usedColumns.insert(std::make_pair(TString(item->GetName()), std::make_pair(Max<ui32>(), TString())));
}
+
+ auto subLinks = GatherSubLinks(x->TailPtr());
+ for (const auto& s : subLinks) {
+ auto extColumns = ExtractExternalColumns(s.first->Tail());
+ for (const auto& c : extColumns) {
+ usedColumns.insert(std::make_pair(c, std::make_pair(Max<ui32>(), TString())));
+ }
+
+ if (!s.first->Child(2)->IsCallable("Void")) {
+ auto type = s.first->Child(2)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
+ for (auto item : type->GetItems()) {
+ usedColumns.insert(std::make_pair(TString(item->GetName()), std::make_pair(Max<ui32>(), TString())));
+ }
+ }
+ }
}
for (ui32 groupNo = 0; groupNo < joinOps->Tail().ChildrenSize(); ++groupNo) {
@@ -849,7 +878,7 @@ std::tuple<TVector<ui32>, TExprNode::TListType> BuildJoinGroups(TPositionHandle
TExprNode::TPtr filteredCartesian;
if (joinType != "cross") {
- filteredCartesian = BuildFilter(pos, cartesian, join, ctx, optCtx);
+ filteredCartesian = BuildFilter(pos, cartesian, join->Tail().TailPtr(), ctx, optCtx);
}
if (joinType == "cross") {
@@ -1581,8 +1610,9 @@ TExprNode::TPtr ExpandPgSelectImpl(const TExprNode::TPtr& node, TExprContext& ct
list = JoinColumns(node->Pos(), list, outer, nullptr, ctx);
}
+ auto joins = list;
if (filter) {
- list = BuildFilter(node->Pos(), list, filter, ctx, optCtx);
+ list = BuildFilter(node->Pos(), list, filter->Tail().TailPtr(), ctx, optCtx);
}
TAggs aggs;
@@ -1593,6 +1623,12 @@ TExprNode::TPtr ExpandPgSelectImpl(const TExprNode::TPtr& node, TExprContext& ct
}
list = BuildWindows(node->Pos(), list, window, projectionLambda, ctx, optCtx);
+ auto projectionSubLinks = GatherSubLinks(projectionLambda);
+ if (!projectionSubLinks.empty()) {
+ std::tie(list, projectionLambda) = RewriteSubLinks(projectionLambda->Pos(), list, projectionLambda,
+ projectionSubLinks, joins, ctx, optCtx);
+ }
+
if (finalExtTypes) {
projectionLambda = AddExtColumns(projectionLambda, finalExtTypes->TailPtr(), ctx);
}
diff --git a/ydb/library/yql/core/type_ann/type_ann_pg.cpp b/ydb/library/yql/core/type_ann/type_ann_pg.cpp
index e8933a7af7a..e08650c54fe 100644
--- a/ydb/library/yql/core/type_ann/type_ann_pg.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_pg.cpp
@@ -1462,7 +1462,8 @@ bool ScanColumns(TExprNode::TPtr root, TInputs& inputs, const THashSet<TString>&
}
bool ScanColumnsForSublinks(bool& needRebuildSubLinks, const TNodeSet& sublinks,
- TInputs& inputs, const THashSet<TString>& possibleAliases, bool& hasColumnRef, THashSet<TString>& refs, TExtContext& ctx) {
+ TInputs& inputs, const THashSet<TString>& possibleAliases, bool& hasColumnRef, THashSet<TString>& refs,
+ THashMap<TString, THashSet<TString>>* qualifiedRefs, TExtContext& ctx) {
needRebuildSubLinks = false;
for (const auto& s : sublinks) {
if (s->Child(1)->IsCallable("Void")) {
@@ -1473,7 +1474,7 @@ bool ScanColumnsForSublinks(bool& needRebuildSubLinks, const TNodeSet& sublinks,
if (!testRowLambda.IsCallable("Void")) {
YQL_ENSURE(testRowLambda.IsLambda());
if (!ScanColumns(testRowLambda.TailPtr(), inputs, possibleAliases, nullptr, hasColumnRef,
- refs, nullptr, ctx)) {
+ refs, qualifiedRefs, ctx)) {
return false;
}
}
@@ -2000,11 +2001,20 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN
THashMap<TString, THashSet<TString>> qualifiedRefs;
if (column->Child(1)->IsCallable("Void")) {
// no effective type yet, scan lambda body
+ TNodeSet sublinks;
+ ScanSublinks(column->Tail().TailPtr(), sublinks);
+
if (!ScanColumns(column->Tail().TailPtr(), joinInputs, possibleAliases, &hasStar, hasColumnRef,
refs, &qualifiedRefs, ctx)) {
return IGraphTransformer::TStatus::Error;
}
+ bool needRebuildSubLinks;
+ if (!ScanColumnsForSublinks(needRebuildSubLinks, sublinks, joinInputs, possibleAliases,
+ hasColumnRef, refs, &qualifiedRefs, ctx)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
if (!scanColumnsOnly) {
TVector<const TItemExprType*> items;
AddColumns(joinInputs, &hasStar, refs, &qualifiedRefs, items);
@@ -2013,23 +2023,37 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN
return IGraphTransformer::TStatus::Error;
}
- auto expandedColumns = column->HeadPtr();
auto typeNode = ExpandType(column->Pos(), *effectiveType, ctx.Expr);
- auto argNode = ctx.Expr.NewArgument(column->Pos(), "row");
- auto arguments = ctx.Expr.NewArguments(column->Pos(), { argNode });
- TExprNode::TPtr newRoot;
- auto status = RebuildLambdaColumns(column->Tail().TailPtr(), argNode, newRoot, joinInputs, &expandedColumns, ctx);
- if (status == IGraphTransformer::TStatus::Error) {
- return IGraphTransformer::TStatus::Error;
- }
+ auto newColumnChildren = column->ChildrenList();
+ if (needRebuildSubLinks) {
+ auto arguments = ctx.Expr.NewArguments(column->Pos(), { });
+
+ TExprNode::TPtr newRoot;
+ auto status = RebuildSubLinks(column->Tail().TailPtr(), newRoot, sublinks, joinInputs, typeNode, ctx);
+ if (status == IGraphTransformer::TStatus::Error) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto newLambda = ctx.Expr.NewLambda(column->Pos(), std::move(arguments), std::move(newRoot));
+ newColumnChildren[2] = newLambda;
+ } else {
+ auto argNode = ctx.Expr.NewArgument(column->Pos(), "row");
+ auto arguments = ctx.Expr.NewArguments(column->Pos(), { argNode });
+ auto expandedColumns = column->HeadPtr();
+
+ TExprNode::TPtr newRoot;
+ auto status = RebuildLambdaColumns(column->Tail().TailPtr(), argNode, newRoot, joinInputs, &expandedColumns, ctx);
+ if (status == IGraphTransformer::TStatus::Error) {
+ return IGraphTransformer::TStatus::Error;
+ }
- auto newLambda = ctx.Expr.NewLambda(column->Pos(), std::move(arguments), std::move(newRoot));
+ auto newLambda = ctx.Expr.NewLambda(column->Pos(), std::move(arguments), std::move(newRoot));
+ newColumnChildren[0] = expandedColumns;
+ newColumnChildren[1] = typeNode;
+ newColumnChildren[2] = newLambda;
+ }
- auto newColumnChildren = column->ChildrenList();
- newColumnChildren[0] = expandedColumns;
- newColumnChildren[1] = typeNode;
- newColumnChildren[2] = newLambda;
auto newColumn = ctx.Expr.NewCallable(column->Pos(), "PgResultItem", std::move(newColumnChildren));
newResult.push_back(newColumn);
}
@@ -2221,7 +2245,8 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN
}
bool needRebuildSubLinks;
- if (!ScanColumnsForSublinks(needRebuildSubLinks, sublinks, joinInputs, possibleAliases, hasColumnRef, refs, ctx)) {
+ if (!ScanColumnsForSublinks(needRebuildSubLinks, sublinks, joinInputs, possibleAliases, hasColumnRef,
+ refs, nullptr, ctx)) {
return IGraphTransformer::TStatus::Error;
}