diff options
author | vvvv <vvvv@ydb.tech> | 2022-08-22 14:13:52 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2022-08-22 14:13:52 +0300 |
commit | 585b7c1363f6ae222441fd24e9ff62872e5b75bb (patch) | |
tree | 36507c8ecce577e20e70e6a8b10703609b47732b | |
parent | e8f0683375a136710ed9228fb6394cdf72de4e06 (diff) | |
download | ydb-585b7c1363f6ae222441fd24e9ff62872e5b75bb.tar.gz |
support of decay cross join in multiusage
-rw-r--r-- | ydb/library/yql/core/common_opt/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/library/yql/core/common_opt/yql_co.h | 2 | ||||
-rw-r--r-- | ydb/library/yql/core/common_opt/yql_co_finalizers.cpp | 100 | ||||
-rw-r--r-- | ydb/library/yql/core/common_opt/yql_co_flow2.cpp | 785 | ||||
-rw-r--r-- | ydb/library/yql/core/common_opt/yql_co_transformer.cpp | 19 | ||||
-rw-r--r-- | ydb/library/yql/core/common_opt/yql_flatmap_over_join.cpp | 770 | ||||
-rw-r--r-- | ydb/library/yql/core/common_opt/yql_flatmap_over_join.h | 14 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_join.cpp | 28 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_join.h | 5 |
9 files changed, 930 insertions, 794 deletions
diff --git a/ydb/library/yql/core/common_opt/CMakeLists.txt b/ydb/library/yql/core/common_opt/CMakeLists.txt index 99d70bde80..aa19918c9b 100644 --- a/ydb/library/yql/core/common_opt/CMakeLists.txt +++ b/ydb/library/yql/core/common_opt/CMakeLists.txt @@ -20,6 +20,7 @@ target_link_libraries(yql-core-common_opt PUBLIC ) target_sources(yql-core-common_opt PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/common_opt/yql_flatmap_over_join.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/common_opt/yql_co_finalizers.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/common_opt/yql_co_flow1.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/common_opt/yql_co_flow2.cpp diff --git a/ydb/library/yql/core/common_opt/yql_co.h b/ydb/library/yql/core/common_opt/yql_co.h index b8f2719037..4c82b44241 100644 --- a/ydb/library/yql/core/common_opt/yql_co.h +++ b/ydb/library/yql/core/common_opt/yql_co.h @@ -72,7 +72,7 @@ struct TOptimizeContext { using TCallableOptimizerExt = std::function<TExprNode::TPtr (const TExprNode::TPtr&, TExprContext&, TOptimizeContext&)>; using TCallableOptimizerMap = std::unordered_map<std::string_view, TCallableOptimizerExt>; -using TFinalizingOptimizerExt = std::function<void (const TExprNode::TPtr&, TNodeOnNodeOwnedMap&, TExprContext&, TOptimizeContext&)>; +using TFinalizingOptimizerExt = std::function<bool (const TExprNode::TPtr&, TNodeOnNodeOwnedMap&, TExprContext&, TOptimizeContext&)>; using TFinalizingOptimizerMap = std::unordered_map<std::string_view, TFinalizingOptimizerExt>; struct TCoCallableRules { diff --git a/ydb/library/yql/core/common_opt/yql_co_finalizers.cpp b/ydb/library/yql/core/common_opt/yql_co_finalizers.cpp index 9a72235ea8..8e216c1151 100644 --- a/ydb/library/yql/core/common_opt/yql_co_finalizers.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_finalizers.cpp @@ -1,4 +1,5 @@ #include "yql_co_extr_members.h" +#include "yql_flatmap_over_join.h" #include "yql_co.h" #include <ydb/library/yql/core/yql_expr_type_annotation.h> @@ -12,23 +13,23 @@ namespace { using namespace NNodes; -void SubsetFieldsForNodeWithMultiUsage(const TExprNode::TPtr& node, const TParentsMap& parentsMap, +bool SubsetFieldsForNodeWithMultiUsage(const TExprNode::TPtr& node, const TParentsMap& parentsMap, TNodeOnNodeOwnedMap& toOptimize, TExprContext& ctx, std::function<TExprNode::TPtr(const TExprNode::TPtr&, const TExprNode::TPtr&, const TParentsMap&, TExprContext&)> handler) { // Ignore stream input, because it cannot be used multiple times if (node->GetTypeAnn()->GetKind() != ETypeAnnotationKind::List) { - return; + return false; } auto itemType = node->GetTypeAnn()->Cast<TListExprType>()->GetItemType(); if (itemType->GetKind() != ETypeAnnotationKind::Struct) { - return; + return false; } auto structType = itemType->Cast<TStructExprType>(); auto it = parentsMap.find(node.Get()); if (it == parentsMap.cend() || it->second.size() <= 1) { - return; + return false; } TSet<TStringBuf> usedFields; @@ -37,7 +38,7 @@ void SubsetFieldsForNodeWithMultiUsage(const TExprNode::TPtr& node, const TParen auto flatMap = maybeFlatMap.Cast(); TSet<TStringBuf> lambdaSubset; if (!HaveFieldsSubset(flatMap.Lambda().Body().Ptr(), flatMap.Lambda().Args().Arg(0).Ref(), lambdaSubset, parentsMap)) { - return; + return false; } usedFields.insert(lambdaSubset.cbegin(), lambdaSubset.cend()); } @@ -48,10 +49,10 @@ void SubsetFieldsForNodeWithMultiUsage(const TExprNode::TPtr& node, const TParen } } else { - return; + return false; } if (usedFields.size() == structType->GetSize()) { - return; + return false; } } @@ -62,7 +63,7 @@ void SubsetFieldsForNodeWithMultiUsage(const TExprNode::TPtr& node, const TParen auto newInput = handler(node, ctx.NewList(node->Pos(), std::move(members)), parentsMap, ctx); if (!newInput || newInput == node) { - return; + return false; } for (auto parent: it->second) { @@ -84,6 +85,41 @@ void SubsetFieldsForNodeWithMultiUsage(const TExprNode::TPtr& node, const TParen .Build(); } } + + return true; +} + +IGraphTransformer::TStatus MultiUsageFlatMapOverJoin(const TExprNode::TPtr& node, TNodeOnNodeOwnedMap& toOptimize, TExprContext& ctx, TOptimizeContext& optCtx) { + auto it = (*optCtx.ParentsMap).find(node.Get()); + if (it == (*optCtx.ParentsMap).cend() || it->second.size() <= 1) { + return IGraphTransformer::TStatus::Ok; + } + + TExprNode::TListType newParents; + for (auto parent : it->second) { + if (auto maybeFlatMap = TMaybeNode<TCoFlatMapBase>(parent)) { + auto flatMap = maybeFlatMap.Cast(); + auto newParent = FlatMapOverEquiJoin(flatMap, ctx, *optCtx.ParentsMap, true); + if (!newParent.Raw()) { + return IGraphTransformer::TStatus::Error; + } + + if (newParent.Raw() == flatMap.Raw()) { + return IGraphTransformer::TStatus::Ok; + } + + newParents.push_back(newParent.Ptr()); + } else { + return IGraphTransformer::TStatus::Ok; + } + } + + ui32 index = 0; + for (auto parent : it->second) { + toOptimize[parent] = newParents[index++]; + } + + return IGraphTransformer::TStatus::Repeat; } } @@ -95,6 +131,8 @@ void RegisterCoFinalizers(TFinalizingOptimizerMap& map) { return ApplyExtractMembersToExtend(input, members, ctx, " with multi-usage"); } ); + + return true; }; map[TCoTake::CallableName()] = [](const TExprNode::TPtr& node, TNodeOnNodeOwnedMap& toOptimize, TExprContext& ctx, TOptimizeContext& optCtx) { @@ -103,6 +141,8 @@ void RegisterCoFinalizers(TFinalizingOptimizerMap& map) { return ApplyExtractMembersToTake(input, members, ctx, " with multi-usage"); } ); + + return true; }; map[TCoSkip::CallableName()] = [](const TExprNode::TPtr& node, TNodeOnNodeOwnedMap& toOptimize, TExprContext& ctx, TOptimizeContext& optCtx) { @@ -111,6 +151,8 @@ void RegisterCoFinalizers(TFinalizingOptimizerMap& map) { return ApplyExtractMembersToSkip(input, members, ctx, " with multi-usage"); } ); + + return true; }; map[TCoSkipNullMembers::CallableName()] = [](const TExprNode::TPtr& node, TNodeOnNodeOwnedMap& toOptimize, TExprContext& ctx, TOptimizeContext& optCtx) { @@ -119,6 +161,8 @@ void RegisterCoFinalizers(TFinalizingOptimizerMap& map) { return ApplyExtractMembersToSkipNullMembers(input, members, ctx, " with multi-usage"); } ); + + return true; }; map[TCoFlatMap::CallableName()] = map[TCoOrderedFlatMap::CallableName()] = [](const TExprNode::TPtr& node, TNodeOnNodeOwnedMap& toOptimize, TExprContext& ctx, TOptimizeContext& optCtx) { @@ -127,6 +171,8 @@ void RegisterCoFinalizers(TFinalizingOptimizerMap& map) { return ApplyExtractMembersToFlatMap(input, members, ctx, " with multi-usage"); } ); + + return true; }; map[TCoSort::CallableName()] = map[TCoAssumeSorted::CallableName()] = [](const TExprNode::TPtr& node, TNodeOnNodeOwnedMap& toOptimize, TExprContext& ctx, TOptimizeContext& optCtx) { @@ -135,6 +181,8 @@ void RegisterCoFinalizers(TFinalizingOptimizerMap& map) { return ApplyExtractMembersToSort(input, members, parentsMap, ctx, " with multi-usage"); } ); + + return true; }; map[TCoAssumeUnique::CallableName()] = [](const TExprNode::TPtr& node, TNodeOnNodeOwnedMap& toOptimize, TExprContext& ctx, TOptimizeContext& optCtx) { @@ -143,6 +191,8 @@ void RegisterCoFinalizers(TFinalizingOptimizerMap& map) { return ApplyExtractMembersToAssumeUnique(input, members, ctx, " with multi-usage"); } ); + + return true; }; map[TCoTop::CallableName()] = map[TCoTopSort::CallableName()] = [](const TExprNode::TPtr& node, TNodeOnNodeOwnedMap& toOptimize, TExprContext& ctx, TOptimizeContext& optCtx) { @@ -151,14 +201,28 @@ void RegisterCoFinalizers(TFinalizingOptimizerMap& map) { return ApplyExtractMembersToTop(input, members, parentsMap, ctx, " with multi-usage"); } ); + + return true; }; map[TCoEquiJoin::CallableName()] = [](const TExprNode::TPtr& node, TNodeOnNodeOwnedMap& toOptimize, TExprContext& ctx, TOptimizeContext& optCtx) { - SubsetFieldsForNodeWithMultiUsage(node, *optCtx.ParentsMap, toOptimize, ctx, - [] (const TExprNode::TPtr& input, const TExprNode::TPtr& members, const TParentsMap&, TExprContext& ctx) { + if (SubsetFieldsForNodeWithMultiUsage(node, *optCtx.ParentsMap, toOptimize, ctx, + [](const TExprNode::TPtr& input, const TExprNode::TPtr& members, const TParentsMap&, TExprContext& ctx) { return ApplyExtractMembersToEquiJoin(input, members, ctx, " with multi-usage"); - } - ); + })) { + return true; + } + + auto status = MultiUsageFlatMapOverJoin(node, toOptimize, ctx, optCtx); + if (status == IGraphTransformer::TStatus::Error) { + return false; + } + + if (status == IGraphTransformer::TStatus::Repeat) { + return true; + } + + return true; }; map[TCoPartitionByKey::CallableName()] = [](const TExprNode::TPtr& node, TNodeOnNodeOwnedMap& toOptimize, TExprContext& ctx, TOptimizeContext& optCtx) { @@ -167,6 +231,8 @@ void RegisterCoFinalizers(TFinalizingOptimizerMap& map) { return ApplyExtractMembersToPartitionByKey(input, members, ctx, " with multi-usage"); } ); + + return true; }; map[TCoCalcOverWindowGroup::CallableName()] = map[TCoCalcOverWindow::CallableName()] = @@ -178,6 +244,8 @@ void RegisterCoFinalizers(TFinalizingOptimizerMap& map) { return ApplyExtractMembersToCalcOverWindow(input, members, ctx, " with multi-usage"); } ); + + return true; }; map[TCoAggregate::CallableName()] = [](const TExprNode::TPtr& node, TNodeOnNodeOwnedMap& toOptimize, TExprContext& ctx, TOptimizeContext& optCtx) { @@ -186,6 +254,8 @@ void RegisterCoFinalizers(TFinalizingOptimizerMap& map) { return ApplyExtractMembersToAggregate(input, members, parentsMap, ctx, " with multi-usage"); } ); + + return true; }; map[TCoChopper::CallableName()] = [](const TExprNode::TPtr& node, TNodeOnNodeOwnedMap& toOptimize, TExprContext& ctx, TOptimizeContext& optCtx) { @@ -194,6 +264,8 @@ void RegisterCoFinalizers(TFinalizingOptimizerMap& map) { return ApplyExtractMembersToChopper(input, members, ctx, " with multi-usage"); } ); + + return true; }; map[TCoCollect::CallableName()] = [](const TExprNode::TPtr& node, TNodeOnNodeOwnedMap& toOptimize, TExprContext& ctx, TOptimizeContext& optCtx) { @@ -202,6 +274,8 @@ void RegisterCoFinalizers(TFinalizingOptimizerMap& map) { return ApplyExtractMembersToCollect(input, members, ctx, " with multi-usage"); } ); + + return true; }; map[TCoMapNext::CallableName()] = [](const TExprNode::TPtr& node, TNodeOnNodeOwnedMap& toOptimize, TExprContext& ctx, TOptimizeContext& optCtx) { @@ -210,6 +284,8 @@ void RegisterCoFinalizers(TFinalizingOptimizerMap& map) { return ApplyExtractMembersToMapNext(input, members, ctx, " with multi-usage"); } ); + + return true; }; } diff --git a/ydb/library/yql/core/common_opt/yql_co_flow2.cpp b/ydb/library/yql/core/common_opt/yql_co_flow2.cpp index ff6e32e1c2..8df3f4d297 100644 --- a/ydb/library/yql/core/common_opt/yql_co_flow2.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_flow2.cpp @@ -1,4 +1,5 @@ #include "yql_co_extr_members.h" +#include "yql_flatmap_over_join.h" #include "yql_co.h" #include <ydb/library/yql/core/yql_expr_type_annotation.h> @@ -122,778 +123,6 @@ TExprNode::TPtr AggregateSubsetFieldsAnalyzer(const TCoAggregate& node, TExprCon return ret; } -TExprNode::TPtr ConstantPredicatePushdownOverEquiJoin(TExprNode::TPtr equiJoin, TExprNode::TPtr predicate, bool ordered, TExprContext& ctx) { - auto lambda = ctx.Builder(predicate->Pos()) - .Lambda() - .Param("row") - .Set(predicate) - .Seal() - .Build(); - - auto ret = ctx.ShallowCopy(*equiJoin); - auto inputsCount = ret->ChildrenSize() - 2; - for (ui32 i = 0; i < inputsCount; ++i) { - ret->ChildRef(i) = ctx.ShallowCopy(*ret->Child(i)); - ret->Child(i)->ChildRef(0) = ctx.Builder(predicate->Pos()) - .Callable(ordered ? "OrderedFilter" : "Filter") - .Add(0, ret->Child(i)->ChildPtr(0)) - .Add(1, lambda) - .Seal() - .Build(); - } - - return ret; -} - -void GatherKeyAliases(const TExprNode::TPtr& joinTree, TMap<TString, TSet<TString>>& aliases, const TJoinLabels& labels) { - auto left = joinTree->ChildPtr(1); - if (!left->IsAtom()) { - GatherKeyAliases(left, aliases, labels); - } - - auto right = joinTree->ChildPtr(2); - if (!right->IsAtom()) { - GatherKeyAliases(right, aliases, labels); - } - - auto leftColumns = joinTree->Child(3); - auto rightColumns = joinTree->Child(4); - for (ui32 i = 0; i < leftColumns->ChildrenSize(); i += 2) { - auto leftColumn = FullColumnName(leftColumns->Child(i)->Content(), leftColumns->Child(i + 1)->Content()); - auto rightColumn = FullColumnName(rightColumns->Child(i)->Content(), rightColumns->Child(i + 1)->Content()); - auto leftType = *labels.FindColumn(leftColumn); - auto rightType = *labels.FindColumn(rightColumn); - if (IsSameAnnotation(*leftType, *rightType)) { - aliases[leftColumn].insert(rightColumn); - aliases[rightColumn].insert(leftColumn); - } - } -} - -void MakeTransitiveClosure(TMap<TString, TSet<TString>>& aliases) { - for (;;) { - bool hasChanges = false; - for (auto& x : aliases) { - for (auto& y : x.second) { - // x.first->y - for (auto& z : aliases[y]) { - // add x.first->z - if (x.first != z) { - hasChanges = x.second.insert(z).second || hasChanges; - } - } - } - } - - if (!hasChanges) { - return; - } - } -} - -void GatherOptionalKeyColumnsFromEquality(TExprNode::TPtr columns, const TJoinLabels& labels, ui32 inputIndex, - TSet<TString>& optionalKeyColumns) { - for (ui32 i = 0; i < columns->ChildrenSize(); i += 2) { - auto table = columns->Child(i)->Content(); - auto column = columns->Child(i + 1)->Content(); - if (*labels.FindInputIndex(table) == inputIndex) { - auto type = *labels.FindColumn(table, column); - if (type->GetKind() == ETypeAnnotationKind::Optional) { - optionalKeyColumns.insert(FullColumnName(table, column)); - } - } - } -} - -void GatherOptionalKeyColumns(TExprNode::TPtr joinTree, const TJoinLabels& labels, ui32 inputIndex, - TSet<TString>& optionalKeyColumns) { - auto left = joinTree->Child(1); - auto right = joinTree->Child(2); - if (!left->IsAtom()) { - GatherOptionalKeyColumns(left, labels, inputIndex, optionalKeyColumns); - } - - if (!right->IsAtom()) { - GatherOptionalKeyColumns(right, labels, inputIndex, optionalKeyColumns); - } - - auto joinType = joinTree->Child(0)->Content(); - if (joinType == "Inner" || joinType == "LeftSemi") { - GatherOptionalKeyColumnsFromEquality(joinTree->Child(3), labels, inputIndex, optionalKeyColumns); - } - - if (joinType == "Inner" || joinType == "RightSemi") { - GatherOptionalKeyColumnsFromEquality(joinTree->Child(4), labels, inputIndex, optionalKeyColumns); - } -} - -TExprNode::TPtr SingleInputPredicatePushdownOverEquiJoin(TExprNode::TPtr equiJoin, TExprNode::TPtr predicate, - const TSet<TStringBuf>& usedFields, TExprNode::TPtr args, const TJoinLabels& labels, - ui32 firstCandidate, const TMap<TStringBuf, TVector<TStringBuf>>& renameMap, bool ordered, TExprContext& ctx) { - auto inputsCount = equiJoin->ChildrenSize() - 2; - auto joinTree = equiJoin->Child(inputsCount); - TMap<TString, TSet<TString>> aliases; - GatherKeyAliases(joinTree, aliases, labels); - MakeTransitiveClosure(aliases); - TSet<ui32> candidates; - candidates.insert(firstCandidate); - // check whether some used fields are not aliased - bool onlyKeys = true; - for (auto& x : usedFields) { - if (!aliases.contains(TString(x))) { - onlyKeys = false; - break; - } - } - - THashMap<ui32, THashMap<TString, TString>> aliasedKeys; - if (onlyKeys) { - // try to extend inputs - for (ui32 i = 0; i < inputsCount; ++i) { - if (i == firstCandidate) { - continue; - } - - TSet<TString> coveredKeys; - for (auto field : labels.Inputs[i].EnumerateAllColumns()) { - if (auto aliasSet = aliases.FindPtr(field)) { - for (auto alias : *aliasSet) { - if (usedFields.contains(alias)) { - coveredKeys.insert(TString(alias)); - aliasedKeys[i].insert({ field, TString(alias) }); - } - } - } - } - - if (coveredKeys.size() == usedFields.size()) { - candidates.insert(i); - } - } - } - - if (!IsRequiredSide(joinTree, labels, firstCandidate).first) { - return equiJoin; - } - - auto ret = ctx.ShallowCopy(*equiJoin); - for (auto& inputIndex : candidates) { - auto x = IsRequiredSide(joinTree, labels, inputIndex); - if (!x.first) { - continue; - } - - auto prevInput = equiJoin->Child(inputIndex)->ChildPtr(0); - auto newInput = prevInput; - if (x.second) { - // skip null key columns - TSet<TString> optionalKeyColumns; - GatherOptionalKeyColumns(joinTree, labels, inputIndex, optionalKeyColumns); - newInput = FilterOutNullJoinColumns(predicate->Pos(), - prevInput, labels.Inputs[inputIndex], optionalKeyColumns, ctx); - } - - // then apply predicate - newInput = ctx.Builder(predicate->Pos()) - .Callable(ordered ? "OrderedFilter" : "Filter") - .Add(0, newInput) - .Lambda(1) - .Param("row") - .ApplyPartial(args, predicate).With(0) - .Callable("AsStruct") - .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - ui32 index = 0; - const auto& label = labels.Inputs[inputIndex]; - for (auto column : label.EnumerateAllColumns()) { - TVector<TString> targetColumns; - targetColumns.push_back(column); - if (onlyKeys && inputIndex != firstCandidate) { - if (auto aliasedKey = aliasedKeys[inputIndex].FindPtr(column)) { - targetColumns[0] = *aliasedKey; - } else { - continue; - } - } - - TStringBuf part1; - TStringBuf part2; - SplitTableName(column, part1, part2); - auto memberName = label.MemberName(part1, part2); - - if (auto renamed = renameMap.FindPtr(targetColumns[0])) { - if (renamed->empty()) { - continue; - } - - targetColumns.clear(); - for (auto& r : *renamed) { - targetColumns.push_back(TString(r)); - } - } - - for (auto targetColumn : targetColumns) { - parent.List(index++) - .Atom(0, targetColumn) - .Callable(1, "Member") - .Arg(0, "row") - .Atom(1, memberName) - .Seal() - .Seal(); - } - } - - return parent; - }) - .Seal() - .Done().Seal() - .Seal() - .Seal() - .Build(); - - // then return reassembled join - ret->ChildRef(inputIndex) = ctx.ShallowCopy(*ret->Child(inputIndex)); - ret->Child(inputIndex)->ChildRef(0) = newInput; - } - - return ret; -} - -void GatherJoinInputs(const TExprNode::TPtr& expr, const TExprNode& row, - const TParentsMap& parentsMap, const THashMap<TString, TString>& backRenameMap, - const TJoinLabels& labels, TSet<ui32>& inputs, TSet<TStringBuf>& usedFields) { - usedFields.clear(); - - if (!HaveFieldsSubset(expr, row, usedFields, parentsMap, false)) { - const auto inputStructType = RemoveOptionalType(row.GetTypeAnn())->Cast<TStructExprType>(); - for (const auto& i : inputStructType->GetItems()) { - usedFields.insert(i->GetName()); - } - } - - for (auto x : usedFields) { - // rename used fields - if (auto renamed = backRenameMap.FindPtr(x)) { - x = *renamed; - } - - TStringBuf part1; - TStringBuf part2; - SplitTableName(x, part1, part2); - inputs.insert(*labels.FindInputIndex(part1)); - if (inputs.size() == labels.Inputs.size()) { - break; - } - } -} - -class TJoinTreeRebuilder { -public: - TJoinTreeRebuilder(TExprNode::TPtr joinTree, TStringBuf label1, TStringBuf column1, TStringBuf label2, TStringBuf column2, - TExprContext& ctx) - : JoinTree(joinTree) - , Labels{ label1, label2 } - , Columns{ column1, column2 } - , Ctx(ctx) - {} - - TExprNode::TPtr Run() { - auto joinTree = RotateCrossJoin(JoinTree->Pos(), JoinTree); - auto newJoinTree = std::get<0>(AddLink(joinTree)); - YQL_ENSURE(Updated); - return newJoinTree; - } - -private: - TExprNode::TPtr RotateCrossJoin(TPositionHandle pos, TExprNode::TPtr joinTree) { - if (joinTree->Child(0)->Content() != "Cross") { - auto children = joinTree->ChildrenList(); - auto& left = children[1]; - auto& right = children[2]; - - if (!left->IsAtom()) { - left = RotateCrossJoin(pos, left); - } - - if (!right->IsAtom()) { - right = RotateCrossJoin(pos, right); - } - - return Ctx.ChangeChildren(*joinTree, std::move(children)); - } - - CrossJoins.clear(); - RestJoins.clear(); - GatherCross(joinTree); - auto inCross1 = FindPtr(CrossJoins, Labels[0]); - auto inCross2 = FindPtr(CrossJoins, Labels[1]); - if (inCross1 || inCross2) { - if (inCross1 && inCross2) { - // make them a leaf - joinTree = MakeCrossJoin(pos, Ctx.NewAtom(pos, Labels[0]), Ctx.NewAtom(pos, Labels[1]), Ctx); - for (auto label : CrossJoins) { - if (label != Labels[0] && label != Labels[1]) { - joinTree = MakeCrossJoin(pos, joinTree, Ctx.NewAtom(pos, label), Ctx); - } - } - - joinTree = AddRestJoins(pos, joinTree, nullptr); - } else if (inCross1) { - // leaf with table1 and subtree with table2 - auto rest = FindRestJoin(Labels[1]); - YQL_ENSURE(rest); - joinTree = MakeCrossJoin(pos, Ctx.NewAtom(pos, Labels[0]), rest, Ctx); - for (auto label : CrossJoins) { - if (label != Labels[0]) { - joinTree = MakeCrossJoin(pos, joinTree, Ctx.NewAtom(pos, label), Ctx); - } - } - - joinTree = AddRestJoins(pos, joinTree, rest); - } else { - // leaf with table2 and subtree with table1 - auto rest = FindRestJoin(Labels[0]); - YQL_ENSURE(rest); - joinTree = MakeCrossJoin(pos, Ctx.NewAtom(pos, Labels[1]), rest, Ctx); - for (auto label : CrossJoins) { - if (label != Labels[1]) { - joinTree = MakeCrossJoin(pos, joinTree, Ctx.NewAtom(pos, label), Ctx); - } - } - - joinTree = AddRestJoins(pos, joinTree, rest); - } - } - - return joinTree; - } - - TExprNode::TPtr AddRestJoins(TPositionHandle pos, TExprNode::TPtr joinTree, TExprNode::TPtr exclude) { - for (auto join : RestJoins) { - if (join == exclude) { - continue; - } - - joinTree = MakeCrossJoin(pos, joinTree, join, Ctx); - } - - return joinTree; - } - - TExprNode::TPtr FindRestJoin(TStringBuf label) { - for (auto join : RestJoins) { - if (HasTable(join, label)) { - return join; - } - } - - return nullptr; - } - - bool HasTable(TExprNode::TPtr joinTree, TStringBuf label) { - auto left = joinTree->ChildPtr(1); - if (left->IsAtom()) { - if (left->Content() == label) { - return true; - } - } else { - if (HasTable(left, label)) { - return true; - } - } - - auto right = joinTree->ChildPtr(2); - if (right->IsAtom()) { - if (right->Content() == label) { - return true; - } - } else { - if (HasTable(right, label)) { - return true; - } - } - - return false; - } - - void GatherCross(TExprNode::TPtr joinTree) { - auto type = joinTree->Child(0)->Content(); - if (type != "Cross") { - RestJoins.push_back(joinTree); - return; - } - - auto left = joinTree->ChildPtr(1); - if (left->IsAtom()) { - CrossJoins.push_back(left->Content()); - } else { - GatherCross(left); - } - - auto right = joinTree->ChildPtr(2); - if (right->IsAtom()) { - CrossJoins.push_back(right->Content()); - } else { - GatherCross(right); - } - } - - std::tuple<TExprNode::TPtr, TMaybe<ui32>, TMaybe<ui32>> AddLink(TExprNode::TPtr joinTree) { - auto children = joinTree->ChildrenList(); - - TMaybe<ui32> found1; - TMaybe<ui32> found2; - auto& left = children[1]; - if (!left->IsAtom()) { - TMaybe<ui32> leftFound1, leftFound2; - std::tie(left, leftFound1, leftFound2) = AddLink(left); - if (leftFound1) { - found1 = 1u; - } - - if (leftFound2) { - found2 = 1u; - } - } else { - if (left->Content() == Labels[0]) { - found1 = 1u; - } - - if (left->Content() == Labels[1]) { - found2 = 1u; - } - } - - auto& right = children[2]; - if (!right->IsAtom()) { - TMaybe<ui32> rightFound1, rightFound2; - std::tie(right, rightFound1, rightFound2) = AddLink(right); - if (rightFound1) { - found1 = 2u; - } - - if (rightFound2) { - found2 = 2u; - } - } else { - if (right->Content() == Labels[0]) { - found1 = 2u; - } - - if (right->Content() == Labels[1]) { - found2 = 2u; - } - } - - if (found1 && found2) { - if (!Updated) { - if (joinTree->Child(0)->Content() == "Cross") { - children[0] = Ctx.NewAtom(joinTree->Pos(), "Inner"); - } else { - YQL_ENSURE(joinTree->Child(0)->Content() == "Inner"); - } - - ui32 index1 = *found1 - 1; // 0/1 - ui32 index2 = 1 - index1; - - auto link1 = children[3]->ChildrenList(); - link1.push_back(Ctx.NewAtom(joinTree->Pos(), Labels[index1])); - link1.push_back(Ctx.NewAtom(joinTree->Pos(), Columns[index1])); - children[3] = Ctx.ChangeChildren(*children[3], std::move(link1)); - - auto link2 = children[4]->ChildrenList(); - link2.push_back(Ctx.NewAtom(joinTree->Pos(), Labels[index2])); - link2.push_back(Ctx.NewAtom(joinTree->Pos(), Columns[index2])); - children[4] = Ctx.ChangeChildren(*children[4], std::move(link2)); - - Updated = true; - } - } - - return { Ctx.ChangeChildren(*joinTree, std::move(children)), found1, found2 }; - } - -private: - TVector<TStringBuf> CrossJoins; - TVector<TExprNode::TPtr> RestJoins; - - bool Updated = false; - - TExprNode::TPtr JoinTree; - TStringBuf Labels[2]; - TStringBuf Columns[2]; - TExprContext& Ctx; -}; - -TExprNode::TPtr DecayCrossJoinIntoInner(TExprNode::TPtr equiJoin, const TExprNode::TPtr& predicate, - const TJoinLabels& labels, ui32 index1, ui32 index2, const TExprNode& row, const THashMap<TString, TString>& backRenameMap, - const TParentsMap& parentsMap, TExprContext& ctx) { - YQL_ENSURE(index1 != index2); - TExprNode::TPtr left, right; - if (!IsEquality(predicate, left, right)) { - return equiJoin; - } - - TSet<ui32> leftInputs, rightInputs; - TSet<TStringBuf> usedFields; - GatherJoinInputs(left, row, parentsMap, backRenameMap, labels, leftInputs, usedFields); - GatherJoinInputs(right, row, parentsMap, backRenameMap, labels, rightInputs, usedFields); - bool good = false; - if (leftInputs.size() == 1 && rightInputs.size() == 1) { - if (*leftInputs.begin() == index1 && *rightInputs.begin() == index2) { - good = true; - } else if (*leftInputs.begin() == index2 && *rightInputs.begin() == index1) { - good = true; - } - } - - if (!good) { - return equiJoin; - } - - auto inputsCount = equiJoin->ChildrenSize() - 2; - auto joinTree = equiJoin->Child(inputsCount); - if (!IsRequiredSide(joinTree, labels, index1).first || - !IsRequiredSide(joinTree, labels, index2).first) { - return equiJoin; - } - - TStringBuf label1, column1, label2, column2; - if (left->IsCallable("Member") && left->Child(0) == &row) { - auto x = left->Tail().Content(); - if (auto ptr = backRenameMap.FindPtr(x)) { - x = *ptr; - } - - SplitTableName(x, label1, column1); - } else { - return equiJoin; - } - - if (right->IsCallable("Member") && right->Child(0) == &row) { - auto x = right->Tail().Content(); - if (auto ptr = backRenameMap.FindPtr(x)) { - x = *ptr; - } - - SplitTableName(x, label2, column2); - } else { - return equiJoin; - } - - TJoinTreeRebuilder rebuilder(joinTree, label1, column1, label2, column2, ctx); - auto newJoinTree = rebuilder.Run(); - return ctx.ChangeChild(*equiJoin, inputsCount, std::move(newJoinTree)); -} - -TExprNode::TPtr FlatMapOverEquiJoin(const TCoFlatMapBase& node, TExprContext& ctx, const TParentsMap& parentsMap) { - auto equiJoin = node.Input(); - auto structType = equiJoin.Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType() - ->Cast<TStructExprType>(); - if (structType->GetSize() == 0) { - return node.Ptr(); - } - - TExprNode::TPtr structNode; - if (IsRenameFlatMap(node, structNode)) { - YQL_CLOG(DEBUG, Core) << "Rename in " << node.CallableName() << " over EquiJoin"; - auto joinSettings = equiJoin.Ref().ChildPtr(equiJoin.Ref().ChildrenSize() - 1); - auto renameMap = LoadJoinRenameMap(*joinSettings); - joinSettings = RemoveSetting(*joinSettings, "rename", ctx); - auto structType = equiJoin.Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType() - ->Cast<TStructExprType>(); - THashSet<TStringBuf> usedFields; - TMap<TStringBuf, TVector<TStringBuf>> memberUsageMap; - for (auto& child : structNode->Children()) { - auto item = child->Child(1); - usedFields.insert(item->Child(1)->Content()); - memberUsageMap[item->Child(1)->Content()].push_back(child->Child(0)->Content()); - } - - TMap<TStringBuf, TStringBuf> reversedRenameMap; - TMap<TStringBuf, TVector<TStringBuf>> newRenameMap; - for (auto& x : renameMap) { - if (!x.second.empty()) { - for (auto& y : x.second) { - reversedRenameMap[y] = x.first; - } - } - else { - // previous drops - newRenameMap[x.first].clear(); - } - } - - for (auto& x : structType->GetItems()) { - if (!usedFields.contains(x->GetName())) { - // new drops - auto name = x->GetName(); - if (auto renamed = reversedRenameMap.FindPtr(name)) { - name = *renamed; - } - - newRenameMap[name].clear(); - } - } - - for (auto& x : memberUsageMap) { - auto prevName = x.first; - if (auto renamed = reversedRenameMap.FindPtr(prevName)) { - prevName = *renamed; - } - - for (auto& y : x.second) { - newRenameMap[prevName].push_back(y); - } - } - - TExprNode::TListType joinSettingsNodes = joinSettings->ChildrenList(); - AppendEquiJoinRenameMap(node.Pos(), newRenameMap, joinSettingsNodes, ctx); - joinSettings = ctx.ChangeChildren(*joinSettings, std::move(joinSettingsNodes)); - auto ret = ctx.ShallowCopy(equiJoin.Ref()); - ret->ChildRef(ret->ChildrenSize() - 1) = joinSettings; - return ret; - } - - TSet<TStringBuf> usedFields; - auto& arg = node.Lambda().Args().Arg(0).Ref(); - auto body = node.Lambda().Body().Ptr(); - if (HaveFieldsSubset(body, arg, usedFields, parentsMap)) { - YQL_CLOG(DEBUG, Core) << "FieldsSubset in " << node.CallableName() << " over EquiJoin"; - auto joinSettings = equiJoin.Ref().ChildPtr(equiJoin.Ref().ChildrenSize() - 1); - auto renameMap = LoadJoinRenameMap(*joinSettings); - joinSettings = RemoveSetting(*joinSettings, "rename", ctx); - auto newRenameMap = UpdateUsedFieldsInRenameMap(renameMap, usedFields, structType); - auto newLambda = ctx.Builder(node.Pos()) - .Lambda() - .Param("item") - .ApplyPartial(node.Lambda().Args().Ptr(), body).With(0, "item").Seal() - .Seal() - .Build(); - - TExprNode::TListType joinSettingsNodes = joinSettings->ChildrenList(); - AppendEquiJoinRenameMap(node.Pos(), newRenameMap, joinSettingsNodes, ctx); - joinSettings = ctx.ChangeChildren(*joinSettings, std::move(joinSettingsNodes)); - auto updatedEquiJoin = ctx.ShallowCopy(equiJoin.Ref()); - updatedEquiJoin->ChildRef(updatedEquiJoin->ChildrenSize() - 1) = joinSettings; - - return ctx.Builder(node.Pos()) - .Callable(node.CallableName()) - .Add(0, updatedEquiJoin) - .Add(1, newLambda) - .Seal() - .Build(); - } - - if (IsPredicateFlatMap(node.Lambda().Body().Ref())) { - // predicate pushdown - const auto& row = node.Lambda().Args().Arg(0).Ref(); - auto predicate = node.Lambda().Body().Ref().ChildPtr(0); - auto value = node.Lambda().Body().Ref().ChildPtr(1); - TJoinLabels labels; - for (ui32 i = 0; i < equiJoin.Ref().ChildrenSize() - 2; ++i) { - auto err = labels.Add(ctx, *equiJoin.Ref().Child(i)->Child(1), - equiJoin.Ref().Child(i)->Child(0)->GetTypeAnn()->Cast<TListExprType>() - ->GetItemType()->Cast<TStructExprType>()); - if (err) { - ctx.AddError(*err); - return nullptr; - } - } - - TExprNode::TListType andTerms; - bool isPg; - GatherAndTerms(predicate, andTerms, isPg, ctx); - TExprNode::TPtr ret; - TExprNode::TPtr extraPredicate; - auto joinSettings = equiJoin.Ref().Child(equiJoin.Ref().ChildrenSize() - 1); - auto renameMap = LoadJoinRenameMap(*joinSettings); - THashMap<TString, TString> backRenameMap; - for (auto& x : renameMap) { - if (!x.second.empty()) { - for (auto& y : x.second) { - backRenameMap[y] = x.first; - } - } - } - - const bool ordered = node.Maybe<TCoOrderedFlatMap>().IsValid(); - - for (auto& andTerm : andTerms) { - if (andTerm->IsCallable("Likely")) { - continue; - } - - TSet<ui32> inputs; - GatherJoinInputs(andTerm, row, parentsMap, backRenameMap, labels, inputs, usedFields); - - if (inputs.size() == 0) { - YQL_CLOG(DEBUG, Core) << "ConstantPredicatePushdownOverEquiJoin"; - ret = ConstantPredicatePushdownOverEquiJoin(equiJoin.Ptr(), andTerm, ordered, ctx); - extraPredicate = FuseAndTerms(node.Pos(), andTerms, andTerm, isPg, ctx); - break; - } - - if (inputs.size() == 1) { - auto newJoin = SingleInputPredicatePushdownOverEquiJoin(equiJoin.Ptr(), andTerm, usedFields, - node.Lambda().Args().Ptr(), labels, *inputs.begin(), renameMap, ordered, ctx); - if (newJoin != equiJoin.Ptr()) { - YQL_CLOG(DEBUG, Core) << "SingleInputPredicatePushdownOverEquiJoin"; - ret = newJoin; - extraPredicate = FuseAndTerms(node.Pos(), andTerms, andTerm, isPg, ctx); - break; - } - } - - if (inputs.size() == 2) { - auto newJoin = DecayCrossJoinIntoInner(equiJoin.Ptr(), andTerm, - labels, *inputs.begin(), *(++inputs.begin()), row, backRenameMap, parentsMap, ctx); - if (newJoin != equiJoin.Ptr()) { - YQL_CLOG(DEBUG, Core) << "DecayCrossJoinIntoInner"; - ret = newJoin; - extraPredicate = FuseAndTerms(node.Pos(), andTerms, andTerm, isPg, ctx); - break; - } - } - } - - if (!ret) { - return node.Ptr(); - } - - if (extraPredicate) { - ret = ctx.Builder(node.Pos()) - .Callable(ordered ? "OrderedFilter" : "Filter") - .Add(0, std::move(ret)) - .Lambda(1) - .Param("item") - .ApplyPartial(node.Lambda().Args().Ptr(), std::move(extraPredicate)).WithNode(row, "item").Seal() - .Seal() - .Seal() - .Build(); - } - - if (value != &row) { - TString name = node.Lambda().Body().Ref().Content().StartsWith("Flat") ? "FlatMap" : "Map"; - if (ordered) { - name.prepend("Ordered"); - } - ret = ctx.Builder(node.Pos()) - .Callable(name) - .Add(0, std::move(ret)) - .Lambda(1) - .Param("item") - .ApplyPartial(node.Lambda().Args().Ptr(), std::move(value)).With(0, "item").Seal() - .Seal() - .Seal() - .Build(); - } - - return ret; - } - - return node.Ptr(); -} - TExprNode::TPtr FlatMapSubsetFields(const TCoFlatMapBase& node, TExprContext& ctx, const TParentsMap& parentsMap) { auto it = parentsMap.find(node.Input().Raw()); YQL_ENSURE(it != parentsMap.cend()); @@ -1837,17 +1066,21 @@ void RegisterCoFlowCallables2(TCallableOptimizerMap& map) { map["FromFlow"] = std::bind(&OptimizeFromFlow, _1, _2, _3); map["Collect"] = std::bind(&OptimizeCollect, _1, _2, _3); - map["FlatMap"] = map["OrderedFlatMap"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) { + map["FlatMap"] = map["OrderedFlatMap"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) -> TExprNode::TPtr { TCoFlatMapBase self(node); if (!optCtx.IsSingleUsage(self.Input().Ref())) { return node; } if (self.Input().Ref().IsCallable("EquiJoin")) { - auto ret = FlatMapOverEquiJoin(self, ctx, *optCtx.ParentsMap); - if (ret != node) { + auto ret = FlatMapOverEquiJoin(self, ctx, *optCtx.ParentsMap, false); + if (!ret.Raw()) { + return nullptr; + } + + if (ret.Raw() != self.Raw()) { YQL_CLOG(DEBUG, Core) << node->Content() << "OverEquiJoin"; - return ret; + return ret.Ptr(); } } diff --git a/ydb/library/yql/core/common_opt/yql_co_transformer.cpp b/ydb/library/yql/core/common_opt/yql_co_transformer.cpp index 8a98716eac..42e2cbb5fb 100644 --- a/ydb/library/yql/core/common_opt/yql_co_transformer.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_transformer.cpp @@ -141,21 +141,30 @@ IGraphTransformer::TStatus TCommonOptTransformer::DoTransform(const TExprNode::T optCtx.ParentsMap = &parentsMap; TNodeOnNodeOwnedMap toOptimize; + bool isError = false; VisitExpr(input, - [&toOptimize](const TExprNode::TPtr&) { - return toOptimize.empty(); + [&toOptimize, &isError](const TExprNode::TPtr&) { + return toOptimize.empty() && !isError; }, - [&callables, &toOptimize, &ctx, &optCtx](const TExprNode::TPtr& node) { + [&callables, &toOptimize, &ctx, &optCtx, &isError](const TExprNode::TPtr& node) { + if (isError) { + return false; + } + if (toOptimize.empty()) { const auto rule = callables.find(node->Content()); if (callables.cend() != rule) { - (rule->second)(node, toOptimize, ctx, optCtx); + isError = isError || !(rule->second)(node, toOptimize, ctx, optCtx); } } - return toOptimize.empty(); + return toOptimize.empty() && !isError; } ); + if (isError) { + return IGraphTransformer::TStatus::Error; + } + if (!toOptimize.empty()) { TOptimizeExprSettings settings(TypeCtx); settings.VisitTuples = true; diff --git a/ydb/library/yql/core/common_opt/yql_flatmap_over_join.cpp b/ydb/library/yql/core/common_opt/yql_flatmap_over_join.cpp new file mode 100644 index 0000000000..ea5442a2fd --- /dev/null +++ b/ydb/library/yql/core/common_opt/yql_flatmap_over_join.cpp @@ -0,0 +1,770 @@ +#include "yql_flatmap_over_join.h" +#include "yql_co.h" + +#include <ydb/library/yql/core/yql_join.h> +#include <ydb/library/yql/core/yql_opt_utils.h> + +#include <ydb/library/yql/utils/log/log.h> + +namespace NYql { + +using namespace NNodes; + +namespace { + +TExprNode::TPtr ConstantPredicatePushdownOverEquiJoin(TExprNode::TPtr equiJoin, TExprNode::TPtr predicate, bool ordered, TExprContext& ctx) { + auto lambda = ctx.Builder(predicate->Pos()) + .Lambda() + .Param("row") + .Set(predicate) + .Seal() + .Build(); + + auto ret = ctx.ShallowCopy(*equiJoin); + auto inputsCount = ret->ChildrenSize() - 2; + for (ui32 i = 0; i < inputsCount; ++i) { + ret->ChildRef(i) = ctx.ShallowCopy(*ret->Child(i)); + ret->Child(i)->ChildRef(0) = ctx.Builder(predicate->Pos()) + .Callable(ordered ? "OrderedFilter" : "Filter") + .Add(0, ret->Child(i)->ChildPtr(0)) + .Add(1, lambda) + .Seal() + .Build(); + } + + return ret; +} + +void GatherKeyAliases(const TExprNode::TPtr& joinTree, TMap<TString, TSet<TString>>& aliases, const TJoinLabels& labels) { + auto left = joinTree->ChildPtr(1); + if (!left->IsAtom()) { + GatherKeyAliases(left, aliases, labels); + } + + auto right = joinTree->ChildPtr(2); + if (!right->IsAtom()) { + GatherKeyAliases(right, aliases, labels); + } + + auto leftColumns = joinTree->Child(3); + auto rightColumns = joinTree->Child(4); + for (ui32 i = 0; i < leftColumns->ChildrenSize(); i += 2) { + auto leftColumn = FullColumnName(leftColumns->Child(i)->Content(), leftColumns->Child(i + 1)->Content()); + auto rightColumn = FullColumnName(rightColumns->Child(i)->Content(), rightColumns->Child(i + 1)->Content()); + auto leftType = *labels.FindColumn(leftColumn); + auto rightType = *labels.FindColumn(rightColumn); + if (IsSameAnnotation(*leftType, *rightType)) { + aliases[leftColumn].insert(rightColumn); + aliases[rightColumn].insert(leftColumn); + } + } +} + +void MakeTransitiveClosure(TMap<TString, TSet<TString>>& aliases) { + for (;;) { + bool hasChanges = false; + for (auto& x : aliases) { + for (auto& y : x.second) { + // x.first->y + for (auto& z : aliases[y]) { + // add x.first->z + if (x.first != z) { + hasChanges = x.second.insert(z).second || hasChanges; + } + } + } + } + + if (!hasChanges) { + return; + } + } +} + +void GatherOptionalKeyColumnsFromEquality(TExprNode::TPtr columns, const TJoinLabels& labels, ui32 inputIndex, + TSet<TString>& optionalKeyColumns) { + for (ui32 i = 0; i < columns->ChildrenSize(); i += 2) { + auto table = columns->Child(i)->Content(); + auto column = columns->Child(i + 1)->Content(); + if (*labels.FindInputIndex(table) == inputIndex) { + auto type = *labels.FindColumn(table, column); + if (type->GetKind() == ETypeAnnotationKind::Optional) { + optionalKeyColumns.insert(FullColumnName(table, column)); + } + } + } +} + +void GatherOptionalKeyColumns(TExprNode::TPtr joinTree, const TJoinLabels& labels, ui32 inputIndex, + TSet<TString>& optionalKeyColumns) { + auto left = joinTree->Child(1); + auto right = joinTree->Child(2); + if (!left->IsAtom()) { + GatherOptionalKeyColumns(left, labels, inputIndex, optionalKeyColumns); + } + + if (!right->IsAtom()) { + GatherOptionalKeyColumns(right, labels, inputIndex, optionalKeyColumns); + } + + auto joinType = joinTree->Child(0)->Content(); + if (joinType == "Inner" || joinType == "LeftSemi") { + GatherOptionalKeyColumnsFromEquality(joinTree->Child(3), labels, inputIndex, optionalKeyColumns); + } + + if (joinType == "Inner" || joinType == "RightSemi") { + GatherOptionalKeyColumnsFromEquality(joinTree->Child(4), labels, inputIndex, optionalKeyColumns); + } +} + +TExprNode::TPtr SingleInputPredicatePushdownOverEquiJoin(TExprNode::TPtr equiJoin, TExprNode::TPtr predicate, + const TSet<TStringBuf>& usedFields, TExprNode::TPtr args, const TJoinLabels& labels, + ui32 firstCandidate, const TMap<TStringBuf, TVector<TStringBuf>>& renameMap, bool ordered, TExprContext& ctx) { + auto inputsCount = equiJoin->ChildrenSize() - 2; + auto joinTree = equiJoin->Child(inputsCount); + TMap<TString, TSet<TString>> aliases; + GatherKeyAliases(joinTree, aliases, labels); + MakeTransitiveClosure(aliases); + TSet<ui32> candidates; + candidates.insert(firstCandidate); + // check whether some used fields are not aliased + bool onlyKeys = true; + for (auto& x : usedFields) { + if (!aliases.contains(TString(x))) { + onlyKeys = false; + break; + } + } + + THashMap<ui32, THashMap<TString, TString>> aliasedKeys; + if (onlyKeys) { + // try to extend inputs + for (ui32 i = 0; i < inputsCount; ++i) { + if (i == firstCandidate) { + continue; + } + + TSet<TString> coveredKeys; + for (auto field : labels.Inputs[i].EnumerateAllColumns()) { + if (auto aliasSet = aliases.FindPtr(field)) { + for (auto alias : *aliasSet) { + if (usedFields.contains(alias)) { + coveredKeys.insert(TString(alias)); + aliasedKeys[i].insert({ field, TString(alias) }); + } + } + } + } + + if (coveredKeys.size() == usedFields.size()) { + candidates.insert(i); + } + } + } + + if (!IsRequiredSide(joinTree, labels, firstCandidate).first) { + return equiJoin; + } + + auto ret = ctx.ShallowCopy(*equiJoin); + for (auto& inputIndex : candidates) { + auto x = IsRequiredSide(joinTree, labels, inputIndex); + if (!x.first) { + continue; + } + + auto prevInput = equiJoin->Child(inputIndex)->ChildPtr(0); + auto newInput = prevInput; + if (x.second) { + // skip null key columns + TSet<TString> optionalKeyColumns; + GatherOptionalKeyColumns(joinTree, labels, inputIndex, optionalKeyColumns); + newInput = FilterOutNullJoinColumns(predicate->Pos(), + prevInput, labels.Inputs[inputIndex], optionalKeyColumns, ctx); + } + + // then apply predicate + newInput = ctx.Builder(predicate->Pos()) + .Callable(ordered ? "OrderedFilter" : "Filter") + .Add(0, newInput) + .Lambda(1) + .Param("row") + .ApplyPartial(args, predicate).With(0) + .Callable("AsStruct") + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + ui32 index = 0; + const auto& label = labels.Inputs[inputIndex]; + for (auto column : label.EnumerateAllColumns()) { + TVector<TString> targetColumns; + targetColumns.push_back(column); + if (onlyKeys && inputIndex != firstCandidate) { + if (auto aliasedKey = aliasedKeys[inputIndex].FindPtr(column)) { + targetColumns[0] = *aliasedKey; + } else { + continue; + } + } + + TStringBuf part1; + TStringBuf part2; + SplitTableName(column, part1, part2); + auto memberName = label.MemberName(part1, part2); + + if (auto renamed = renameMap.FindPtr(targetColumns[0])) { + if (renamed->empty()) { + continue; + } + + targetColumns.clear(); + for (auto& r : *renamed) { + targetColumns.push_back(TString(r)); + } + } + + for (auto targetColumn : targetColumns) { + parent.List(index++) + .Atom(0, targetColumn) + .Callable(1, "Member") + .Arg(0, "row") + .Atom(1, memberName) + .Seal() + .Seal(); + } + } + + return parent; + }) + .Seal() + .Done().Seal() + .Seal() + .Seal() + .Build(); + + // then return reassembled join + ret->ChildRef(inputIndex) = ctx.ShallowCopy(*ret->Child(inputIndex)); + ret->Child(inputIndex)->ChildRef(0) = newInput; + } + + return ret; +} + +class TJoinTreeRebuilder { +public: + TJoinTreeRebuilder(TExprNode::TPtr joinTree, TStringBuf label1, TStringBuf column1, TStringBuf label2, TStringBuf column2, + TExprContext& ctx) + : JoinTree(joinTree) + , Labels{ label1, label2 } + , Columns{ column1, column2 } + , Ctx(ctx) + {} + + TExprNode::TPtr Run() { + auto joinTree = RotateCrossJoin(JoinTree->Pos(), JoinTree); + auto newJoinTree = std::get<0>(AddLink(joinTree)); + YQL_ENSURE(Updated); + return newJoinTree; + } + +private: + TExprNode::TPtr RotateCrossJoin(TPositionHandle pos, TExprNode::TPtr joinTree) { + if (joinTree->Child(0)->Content() != "Cross") { + auto children = joinTree->ChildrenList(); + auto& left = children[1]; + auto& right = children[2]; + + if (!left->IsAtom()) { + left = RotateCrossJoin(pos, left); + } + + if (!right->IsAtom()) { + right = RotateCrossJoin(pos, right); + } + + return Ctx.ChangeChildren(*joinTree, std::move(children)); + } + + CrossJoins.clear(); + RestJoins.clear(); + GatherCross(joinTree); + auto inCross1 = FindPtr(CrossJoins, Labels[0]); + auto inCross2 = FindPtr(CrossJoins, Labels[1]); + if (inCross1 || inCross2) { + if (inCross1 && inCross2) { + // make them a leaf + joinTree = MakeCrossJoin(pos, Ctx.NewAtom(pos, Labels[0]), Ctx.NewAtom(pos, Labels[1]), Ctx); + for (auto label : CrossJoins) { + if (label != Labels[0] && label != Labels[1]) { + joinTree = MakeCrossJoin(pos, joinTree, Ctx.NewAtom(pos, label), Ctx); + } + } + + joinTree = AddRestJoins(pos, joinTree, nullptr); + } + else if (inCross1) { + // leaf with table1 and subtree with table2 + auto rest = FindRestJoin(Labels[1]); + YQL_ENSURE(rest); + joinTree = MakeCrossJoin(pos, Ctx.NewAtom(pos, Labels[0]), rest, Ctx); + for (auto label : CrossJoins) { + if (label != Labels[0]) { + joinTree = MakeCrossJoin(pos, joinTree, Ctx.NewAtom(pos, label), Ctx); + } + } + + joinTree = AddRestJoins(pos, joinTree, rest); + } + else { + // leaf with table2 and subtree with table1 + auto rest = FindRestJoin(Labels[0]); + YQL_ENSURE(rest); + joinTree = MakeCrossJoin(pos, Ctx.NewAtom(pos, Labels[1]), rest, Ctx); + for (auto label : CrossJoins) { + if (label != Labels[1]) { + joinTree = MakeCrossJoin(pos, joinTree, Ctx.NewAtom(pos, label), Ctx); + } + } + + joinTree = AddRestJoins(pos, joinTree, rest); + } + } + + return joinTree; + } + + TExprNode::TPtr AddRestJoins(TPositionHandle pos, TExprNode::TPtr joinTree, TExprNode::TPtr exclude) { + for (auto join : RestJoins) { + if (join == exclude) { + continue; + } + + joinTree = MakeCrossJoin(pos, joinTree, join, Ctx); + } + + return joinTree; + } + + TExprNode::TPtr FindRestJoin(TStringBuf label) { + for (auto join : RestJoins) { + if (HasTable(join, label)) { + return join; + } + } + + return nullptr; + } + + bool HasTable(TExprNode::TPtr joinTree, TStringBuf label) { + auto left = joinTree->ChildPtr(1); + if (left->IsAtom()) { + if (left->Content() == label) { + return true; + } + } + else { + if (HasTable(left, label)) { + return true; + } + } + + auto right = joinTree->ChildPtr(2); + if (right->IsAtom()) { + if (right->Content() == label) { + return true; + } + } + else { + if (HasTable(right, label)) { + return true; + } + } + + return false; + } + + void GatherCross(TExprNode::TPtr joinTree) { + auto type = joinTree->Child(0)->Content(); + if (type != "Cross") { + RestJoins.push_back(joinTree); + return; + } + + auto left = joinTree->ChildPtr(1); + if (left->IsAtom()) { + CrossJoins.push_back(left->Content()); + } + else { + GatherCross(left); + } + + auto right = joinTree->ChildPtr(2); + if (right->IsAtom()) { + CrossJoins.push_back(right->Content()); + } + else { + GatherCross(right); + } + } + + std::tuple<TExprNode::TPtr, TMaybe<ui32>, TMaybe<ui32>> AddLink(TExprNode::TPtr joinTree) { + auto children = joinTree->ChildrenList(); + + TMaybe<ui32> found1; + TMaybe<ui32> found2; + auto& left = children[1]; + if (!left->IsAtom()) { + TMaybe<ui32> leftFound1, leftFound2; + std::tie(left, leftFound1, leftFound2) = AddLink(left); + if (leftFound1) { + found1 = 1u; + } + + if (leftFound2) { + found2 = 1u; + } + } + else { + if (left->Content() == Labels[0]) { + found1 = 1u; + } + + if (left->Content() == Labels[1]) { + found2 = 1u; + } + } + + auto& right = children[2]; + if (!right->IsAtom()) { + TMaybe<ui32> rightFound1, rightFound2; + std::tie(right, rightFound1, rightFound2) = AddLink(right); + if (rightFound1) { + found1 = 2u; + } + + if (rightFound2) { + found2 = 2u; + } + } + else { + if (right->Content() == Labels[0]) { + found1 = 2u; + } + + if (right->Content() == Labels[1]) { + found2 = 2u; + } + } + + if (found1 && found2) { + if (!Updated) { + if (joinTree->Child(0)->Content() == "Cross") { + children[0] = Ctx.NewAtom(joinTree->Pos(), "Inner"); + } + else { + YQL_ENSURE(joinTree->Child(0)->Content() == "Inner"); + } + + ui32 index1 = *found1 - 1; // 0/1 + ui32 index2 = 1 - index1; + + auto link1 = children[3]->ChildrenList(); + link1.push_back(Ctx.NewAtom(joinTree->Pos(), Labels[index1])); + link1.push_back(Ctx.NewAtom(joinTree->Pos(), Columns[index1])); + children[3] = Ctx.ChangeChildren(*children[3], std::move(link1)); + + auto link2 = children[4]->ChildrenList(); + link2.push_back(Ctx.NewAtom(joinTree->Pos(), Labels[index2])); + link2.push_back(Ctx.NewAtom(joinTree->Pos(), Columns[index2])); + children[4] = Ctx.ChangeChildren(*children[4], std::move(link2)); + + Updated = true; + } + } + + return { Ctx.ChangeChildren(*joinTree, std::move(children)), found1, found2 }; + } + +private: + TVector<TStringBuf> CrossJoins; + TVector<TExprNode::TPtr> RestJoins; + + bool Updated = false; + + TExprNode::TPtr JoinTree; + TStringBuf Labels[2]; + TStringBuf Columns[2]; + TExprContext& Ctx; +}; + +TExprNode::TPtr DecayCrossJoinIntoInner(TExprNode::TPtr equiJoin, const TExprNode::TPtr& predicate, + const TJoinLabels& labels, ui32 index1, ui32 index2, const TExprNode& row, const THashMap<TString, TString>& backRenameMap, + const TParentsMap& parentsMap, TExprContext& ctx) { + YQL_ENSURE(index1 != index2); + TExprNode::TPtr left, right; + if (!IsEquality(predicate, left, right)) { + return equiJoin; + } + + TSet<ui32> leftInputs, rightInputs; + TSet<TStringBuf> usedFields; + GatherJoinInputs(left, row, parentsMap, backRenameMap, labels, leftInputs, usedFields); + GatherJoinInputs(right, row, parentsMap, backRenameMap, labels, rightInputs, usedFields); + bool good = false; + if (leftInputs.size() == 1 && rightInputs.size() == 1) { + if (*leftInputs.begin() == index1 && *rightInputs.begin() == index2) { + good = true; + } else if (*leftInputs.begin() == index2 && *rightInputs.begin() == index1) { + good = true; + } + } + + if (!good) { + return equiJoin; + } + + auto inputsCount = equiJoin->ChildrenSize() - 2; + auto joinTree = equiJoin->Child(inputsCount); + if (!IsRequiredSide(joinTree, labels, index1).first || + !IsRequiredSide(joinTree, labels, index2).first) { + return equiJoin; + } + + TStringBuf label1, column1, label2, column2; + if (left->IsCallable("Member") && left->Child(0) == &row) { + auto x = left->Tail().Content(); + if (auto ptr = backRenameMap.FindPtr(x)) { + x = *ptr; + } + + SplitTableName(x, label1, column1); + } else { + return equiJoin; + } + + if (right->IsCallable("Member") && right->Child(0) == &row) { + auto x = right->Tail().Content(); + if (auto ptr = backRenameMap.FindPtr(x)) { + x = *ptr; + } + + SplitTableName(x, label2, column2); + } else { + return equiJoin; + } + + TJoinTreeRebuilder rebuilder(joinTree, label1, column1, label2, column2, ctx); + auto newJoinTree = rebuilder.Run(); + return ctx.ChangeChild(*equiJoin, inputsCount, std::move(newJoinTree)); +} + +} + +TExprBase FlatMapOverEquiJoin(const TCoFlatMapBase& node, TExprContext& ctx, const TParentsMap& parentsMap, bool multiUsage) { + auto equiJoin = node.Input(); + auto structType = equiJoin.Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType() + ->Cast<TStructExprType>(); + if (structType->GetSize() == 0) { + return node; + } + + TExprNode::TPtr structNode; + if (!multiUsage && IsRenameFlatMap(node, structNode)) { + YQL_CLOG(DEBUG, Core) << "Rename in " << node.CallableName() << " over EquiJoin"; + auto joinSettings = equiJoin.Ref().ChildPtr(equiJoin.Ref().ChildrenSize() - 1); + auto renameMap = LoadJoinRenameMap(*joinSettings); + joinSettings = RemoveSetting(*joinSettings, "rename", ctx); + auto structType = equiJoin.Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType() + ->Cast<TStructExprType>(); + THashSet<TStringBuf> usedFields; + TMap<TStringBuf, TVector<TStringBuf>> memberUsageMap; + for (auto& child : structNode->Children()) { + auto item = child->Child(1); + usedFields.insert(item->Child(1)->Content()); + memberUsageMap[item->Child(1)->Content()].push_back(child->Child(0)->Content()); + } + + TMap<TStringBuf, TStringBuf> reversedRenameMap; + TMap<TStringBuf, TVector<TStringBuf>> newRenameMap; + for (auto& x : renameMap) { + if (!x.second.empty()) { + for (auto& y : x.second) { + reversedRenameMap[y] = x.first; + } + } + else { + // previous drops + newRenameMap[x.first].clear(); + } + } + + for (auto& x : structType->GetItems()) { + if (!usedFields.contains(x->GetName())) { + // new drops + auto name = x->GetName(); + if (auto renamed = reversedRenameMap.FindPtr(name)) { + name = *renamed; + } + + newRenameMap[name].clear(); + } + } + + for (auto& x : memberUsageMap) { + auto prevName = x.first; + if (auto renamed = reversedRenameMap.FindPtr(prevName)) { + prevName = *renamed; + } + + for (auto& y : x.second) { + newRenameMap[prevName].push_back(y); + } + } + + TExprNode::TListType joinSettingsNodes = joinSettings->ChildrenList(); + AppendEquiJoinRenameMap(node.Pos(), newRenameMap, joinSettingsNodes, ctx); + joinSettings = ctx.ChangeChildren(*joinSettings, std::move(joinSettingsNodes)); + auto ret = ctx.ShallowCopy(equiJoin.Ref()); + ret->ChildRef(ret->ChildrenSize() - 1) = joinSettings; + return TExprBase(ret); + } + + TSet<TStringBuf> usedFields; + auto& arg = node.Lambda().Args().Arg(0).Ref(); + auto body = node.Lambda().Body().Ptr(); + if (!multiUsage && HaveFieldsSubset(body, arg, usedFields, parentsMap)) { + YQL_CLOG(DEBUG, Core) << "FieldsSubset in " << node.CallableName() << " over EquiJoin"; + auto joinSettings = equiJoin.Ref().ChildPtr(equiJoin.Ref().ChildrenSize() - 1); + auto renameMap = LoadJoinRenameMap(*joinSettings); + joinSettings = RemoveSetting(*joinSettings, "rename", ctx); + auto newRenameMap = UpdateUsedFieldsInRenameMap(renameMap, usedFields, structType); + auto newLambda = ctx.Builder(node.Pos()) + .Lambda() + .Param("item") + .ApplyPartial(node.Lambda().Args().Ptr(), body).With(0, "item").Seal() + .Seal() + .Build(); + + TExprNode::TListType joinSettingsNodes = joinSettings->ChildrenList(); + AppendEquiJoinRenameMap(node.Pos(), newRenameMap, joinSettingsNodes, ctx); + joinSettings = ctx.ChangeChildren(*joinSettings, std::move(joinSettingsNodes)); + auto updatedEquiJoin = ctx.ShallowCopy(equiJoin.Ref()); + updatedEquiJoin->ChildRef(updatedEquiJoin->ChildrenSize() - 1) = joinSettings; + + return TExprBase(ctx.Builder(node.Pos()) + .Callable(node.CallableName()) + .Add(0, updatedEquiJoin) + .Add(1, newLambda) + .Seal() + .Build()); + } + + if (IsPredicateFlatMap(node.Lambda().Body().Ref())) { + // predicate pushdown + const auto& row = node.Lambda().Args().Arg(0).Ref(); + auto predicate = node.Lambda().Body().Ref().ChildPtr(0); + auto value = node.Lambda().Body().Ref().ChildPtr(1); + TJoinLabels labels; + for (ui32 i = 0; i < equiJoin.Ref().ChildrenSize() - 2; ++i) { + auto err = labels.Add(ctx, *equiJoin.Ref().Child(i)->Child(1), + equiJoin.Ref().Child(i)->Child(0)->GetTypeAnn()->Cast<TListExprType>() + ->GetItemType()->Cast<TStructExprType>()); + if (err) { + ctx.AddError(*err); + return TExprBase(nullptr); + } + } + + TExprNode::TListType andTerms; + bool isPg; + GatherAndTerms(predicate, andTerms, isPg, ctx); + TExprNode::TPtr ret; + TExprNode::TPtr extraPredicate; + auto joinSettings = equiJoin.Ref().Child(equiJoin.Ref().ChildrenSize() - 1); + auto renameMap = LoadJoinRenameMap(*joinSettings); + THashMap<TString, TString> backRenameMap; + for (auto& x : renameMap) { + if (!x.second.empty()) { + for (auto& y : x.second) { + backRenameMap[y] = x.first; + } + } + } + + const bool ordered = node.Maybe<TCoOrderedFlatMap>().IsValid(); + + for (auto& andTerm : andTerms) { + if (andTerm->IsCallable("Likely")) { + continue; + } + + TSet<ui32> inputs; + GatherJoinInputs(andTerm, row, parentsMap, backRenameMap, labels, inputs, usedFields); + + if (!multiUsage && inputs.size() == 0) { + YQL_CLOG(DEBUG, Core) << "ConstantPredicatePushdownOverEquiJoin"; + ret = ConstantPredicatePushdownOverEquiJoin(equiJoin.Ptr(), andTerm, ordered, ctx); + extraPredicate = FuseAndTerms(node.Pos(), andTerms, andTerm, isPg, ctx); + break; + } + + if (!multiUsage && inputs.size() == 1) { + auto newJoin = SingleInputPredicatePushdownOverEquiJoin(equiJoin.Ptr(), andTerm, usedFields, + node.Lambda().Args().Ptr(), labels, *inputs.begin(), renameMap, ordered, ctx); + if (newJoin != equiJoin.Ptr()) { + YQL_CLOG(DEBUG, Core) << "SingleInputPredicatePushdownOverEquiJoin"; + ret = newJoin; + extraPredicate = FuseAndTerms(node.Pos(), andTerms, andTerm, isPg, ctx); + break; + } + } + + if (inputs.size() == 2) { + auto newJoin = DecayCrossJoinIntoInner(equiJoin.Ptr(), andTerm, + labels, *inputs.begin(), *(++inputs.begin()), row, backRenameMap, parentsMap, ctx); + if (newJoin != equiJoin.Ptr()) { + YQL_CLOG(DEBUG, Core) << "DecayCrossJoinIntoInner"; + ret = newJoin; + extraPredicate = FuseAndTerms(node.Pos(), andTerms, andTerm, isPg, ctx); + break; + } + } + } + + if (!ret) { + return node; + } + + if (extraPredicate) { + ret = ctx.Builder(node.Pos()) + .Callable(ordered ? "OrderedFilter" : "Filter") + .Add(0, std::move(ret)) + .Lambda(1) + .Param("item") + .ApplyPartial(node.Lambda().Args().Ptr(), std::move(extraPredicate)).WithNode(row, "item").Seal() + .Seal() + .Seal() + .Build(); + } + + if (value != &row) { + TString name = node.Lambda().Body().Ref().Content().StartsWith("Flat") ? "FlatMap" : "Map"; + if (ordered) { + name.prepend("Ordered"); + } + ret = ctx.Builder(node.Pos()) + .Callable(name) + .Add(0, std::move(ret)) + .Lambda(1) + .Param("item") + .ApplyPartial(node.Lambda().Args().Ptr(), std::move(value)).With(0, "item").Seal() + .Seal() + .Seal() + .Build(); + } + + return TExprBase(ret); + } + + return node; +} + +} diff --git a/ydb/library/yql/core/common_opt/yql_flatmap_over_join.h b/ydb/library/yql/core/common_opt/yql_flatmap_over_join.h new file mode 100644 index 0000000000..59777bf533 --- /dev/null +++ b/ydb/library/yql/core/common_opt/yql_flatmap_over_join.h @@ -0,0 +1,14 @@ +#pragma once + +#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> + +#include <ydb/library/yql/ast/yql_expr.h> + +#include <util/generic/strbuf.h> + + +namespace NYql { + +NNodes::TExprBase FlatMapOverEquiJoin(const NNodes::TCoFlatMapBase& node, TExprContext& ctx, const TParentsMap& parentsMap, bool multiUsage); + +} // NYql diff --git a/ydb/library/yql/core/yql_join.cpp b/ydb/library/yql/core/yql_join.cpp index 64bd3d03e8..67fdb01931 100644 --- a/ydb/library/yql/core/yql_join.cpp +++ b/ydb/library/yql/core/yql_join.cpp @@ -1725,4 +1725,32 @@ bool IsEquality(TExprNode::TPtr predicate, TExprNode::TPtr& left, TExprNode::TPt return false; } +void GatherJoinInputs(const TExprNode::TPtr& expr, const TExprNode& row, + const TParentsMap& parentsMap, const THashMap<TString, TString>& backRenameMap, + const TJoinLabels& labels, TSet<ui32>& inputs, TSet<TStringBuf>& usedFields) { + usedFields.clear(); + + if (!HaveFieldsSubset(expr, row, usedFields, parentsMap, false)) { + const auto inputStructType = RemoveOptionalType(row.GetTypeAnn())->Cast<TStructExprType>(); + for (const auto& i : inputStructType->GetItems()) { + usedFields.insert(i->GetName()); + } + } + + for (auto x : usedFields) { + // rename used fields + if (auto renamed = backRenameMap.FindPtr(x)) { + x = *renamed; + } + + TStringBuf part1; + TStringBuf part2; + SplitTableName(x, part1, part2); + inputs.insert(*labels.FindInputIndex(part1)); + if (inputs.size() == labels.Inputs.size()) { + break; + } + } +} + } // namespace NYql diff --git a/ydb/library/yql/core/yql_join.h b/ydb/library/yql/core/yql_join.h index da6137f3f0..872e6a7f16 100644 --- a/ydb/library/yql/core/yql_join.h +++ b/ydb/library/yql/core/yql_join.h @@ -149,4 +149,9 @@ TExprNode::TPtr FuseAndTerms(TPositionHandle position, const TExprNode::TListTyp bool IsEquality(TExprNode::TPtr predicate, TExprNode::TPtr& left, TExprNode::TPtr& right); +void GatherJoinInputs(const TExprNode::TPtr& expr, const TExprNode& row, + const TParentsMap& parentsMap, const THashMap<TString, TString>& backRenameMap, + const TJoinLabels& labels, TSet<ui32>& inputs, TSet<TStringBuf>& usedFields); + + } |