diff options
author | aneporada <aneporada@yandex-team.com> | 2025-01-22 13:28:21 +0300 |
---|---|---|
committer | aneporada <aneporada@yandex-team.com> | 2025-01-22 14:11:59 +0300 |
commit | 50b5088b27697d2ff6c22c8083ac5ed9343f9e82 (patch) | |
tree | bf804a94fe196153eea8c438690bdcc71bc82120 | |
parent | e11a91ef357f17a112d73f070ee0c307e8329b68 (diff) | |
download | ydb-50b5088b27697d2ff6c22c8083ac5ed9343f9e82.tar.gz |
PullUpExtendOverEquiJoin optimizer
commit_hash:386f6e11f9be5b6322fa0859193bb2197e082e44
-rw-r--r-- | yql/essentials/core/common_opt/yql_co_flow2.cpp | 152 | ||||
-rw-r--r-- | yql/essentials/core/yql_join.cpp | 31 | ||||
-rw-r--r-- | yql/essentials/core/yql_join.h | 1 | ||||
-rw-r--r-- | yt/yql/tests/sql/suites/join/pullup_extend.cfg | 4 | ||||
-rw-r--r-- | yt/yql/tests/sql/suites/join/pullup_extend.sql | 18 |
5 files changed, 188 insertions, 18 deletions
diff --git a/yql/essentials/core/common_opt/yql_co_flow2.cpp b/yql/essentials/core/common_opt/yql_co_flow2.cpp index ace0141cb2..d7c359f8d5 100644 --- a/yql/essentials/core/common_opt/yql_co_flow2.cpp +++ b/yql/essentials/core/common_opt/yql_co_flow2.cpp @@ -31,6 +31,14 @@ bool AllowComplexFiltersOverAggregatePushdown(const TOptimizeContext& optCtx) { optCtx.Types->MaxAggPushdownPredicates > 0; } +bool AllowPullUpExtendOverEquiJoin(const TOptimizeContext& optCtx) { + YQL_ENSURE(optCtx.Types); + static const TString pull = to_lower(TString("PullUpExtendOverEquiJoin")); + static const TString noPull = to_lower(TString("DisablePullUpExtendOverEquiJoin")); + return optCtx.Types->OptimizerFlags.contains(pull) && + !optCtx.Types->OptimizerFlags.contains(noPull); +} + THashSet<TStringBuf> GetAggregationInputKeys(const TCoAggregate& node) { TMaybe<TStringBuf> sessionColumn; const auto sessionSetting = GetSetting(node.Settings().Ref(), "session"); @@ -548,19 +556,24 @@ bool IsRenamingOrPassthroughFlatMap(const TCoFlatMapBase& flatMap, THashMap<TStr return false; } -bool IsInputSuitableForPullingOverEquiJoin(const TCoEquiJoinInput& input, - const THashMap<TStringBuf, THashSet<TStringBuf>>& joinKeysByLabel, - THashMap<TStringBuf, TStringBuf>& renames, TOptimizeContext& optCtx) -{ - renames.clear(); - YQL_ENSURE(input.Scope().Ref().IsAtom()); - - auto maybeFlatMap = TMaybeNode<TCoFlatMapBase>(input.List().Ptr()); - if (!maybeFlatMap) { +bool IsDirectRead(const TExprNode& node, TOptimizeContext& optCtx) { + const TExprNode* curr = &node; + while (curr->IsCallable(SkippableCallables) || curr->IsCallable("ExtractMembers")) { + curr = &curr->Head(); + } + if (!curr->IsCallable("Right!")) { return false; } + const auto& readNode = curr->Head(); + YQL_ENSURE(optCtx.Types); + return AnyOf(optCtx.Types->DataSourceMap, [&](const auto& entry) { return entry.second->IsRead(readNode); }); +} - auto flatMap = maybeFlatMap.Cast(); +bool IsFlatmapSuitableForPullUpOverEqiuJoin(const TCoFlatMapBase& flatMap, + TStringBuf label, + const THashMap<TStringBuf, THashSet<TStringBuf>>& joinKeysByLabel, + THashMap<TStringBuf, TStringBuf>& renames, TOptimizeContext& optCtx) +{ if (flatMap.Lambda().Args().Arg(0).Ref().IsUsedInDependsOn()) { return false; } @@ -569,7 +582,7 @@ bool IsInputSuitableForPullingOverEquiJoin(const TCoEquiJoinInput& input, return false; } - if (!optCtx.IsSingleUsage(input) || !optCtx.IsSingleUsage(flatMap)) { + if (!optCtx.IsSingleUsage(flatMap)) { return false; } @@ -591,7 +604,7 @@ bool IsInputSuitableForPullingOverEquiJoin(const TCoEquiJoinInput& input, return false; } - auto keysIt = joinKeysByLabel.find(input.Scope().Ref().Content()); + auto keysIt = joinKeysByLabel.find(label); const auto& joinKeys = (keysIt == joinKeysByLabel.end()) ? THashSet<TStringBuf>() : keysIt->second; size_t joinKeysFound = 0; @@ -629,6 +642,25 @@ bool IsInputSuitableForPullingOverEquiJoin(const TCoEquiJoinInput& input, return true; } +bool IsInputSuitableForPullingOverEquiJoin(const TCoEquiJoinInput& input, + const THashMap<TStringBuf, THashSet<TStringBuf>>& joinKeysByLabel, + THashMap<TStringBuf, TStringBuf>& renames, TOptimizeContext& optCtx) +{ + renames.clear(); + YQL_ENSURE(input.Scope().Ref().IsAtom()); + if (!optCtx.IsSingleUsage(input)) { + return false; + } + + auto maybeFlatMap = TMaybeNode<TCoFlatMapBase>(input.List().Ptr()); + if (!maybeFlatMap) { + return false; + } + + const TStringBuf label = input.Scope().Ref().Content(); + return IsFlatmapSuitableForPullUpOverEqiuJoin(maybeFlatMap.Cast(), label, joinKeysByLabel, renames, optCtx); +} + TExprNode::TPtr ApplyRenames(const TExprNode::TPtr& input, const TMap<TStringBuf, TVector<TStringBuf>>& renames, const TStructExprType& noRenamesResultType, TStringBuf canaryBaseName, TExprContext& ctx) { @@ -824,8 +856,8 @@ TExprNode::TPtr BuildOutputFlattenMembersArg(const TCoEquiJoinInput& input, cons .Build(); } -TExprNode::TPtr PullUpFlatMapOverEquiJoin(const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) { - if (!optCtx.Types->PullUpFlatMapOverJoin) { +TExprNode::TPtr PullUpExtendOverEquiJoin(const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) { + if (!optCtx.Types->PullUpFlatMapOverJoin || !AllowPullUpExtendOverEquiJoin(optCtx)) { return node; } @@ -838,10 +870,86 @@ TExprNode::TPtr PullUpFlatMapOverEquiJoin(const TExprNode::TPtr& node, TExprCont } auto settings = node->ChildPtr(inputsCount + 1); - for (auto& child : settings->Children()) { - if (child->Head().IsAtom("flatten")) { + if (HasSetting(*settings, "flatten")) { + return node; + } + + const THashMap<TStringBuf, bool> additiveInputLabels = CollectAdditiveInputLabels(TCoEquiJoinTuple(joinTree)); + const THashMap<TStringBuf, THashSet<TStringBuf>> joinKeysByLabel = CollectEquiJoinKeyColumnsByLabel(*joinTree); + for (ui32 i = 0; i < inputsCount; ++i) { + TCoEquiJoinInput input(node->ChildPtr(i)); + if (!input.Scope().Ref().IsAtom()) { return node; } + const TStringBuf label = input.Scope().Ref().Content(); + auto addIt = additiveInputLabels.find(label); + YQL_ENSURE(addIt != additiveInputLabels.end()); + if (!addIt->second) { + continue; + } + + auto maybeExtend = input.List().Maybe<TCoExtendBase>(); + if (!maybeExtend) { + continue; + } + + const TExprNodeList items = maybeExtend.Cast().Ref().ChildrenList(); + size_t pullableFlatmaps = 0; + size_t directReads = 0; + for (auto item : items) { + auto maybeFlatMap = TMaybeNode<TCoFlatMapBase>(item); + if (maybeFlatMap) { + THashMap<TStringBuf, TStringBuf> renames; + if (IsFlatmapSuitableForPullUpOverEqiuJoin(maybeFlatMap.Cast(), label, joinKeysByLabel, renames, optCtx)) { + ++pullableFlatmaps; + } + } else if (IsDirectRead(*item, optCtx)) { + ++directReads; + } + } + if (pullableFlatmaps > 0 && pullableFlatmaps + directReads == items.size()) { + YQL_CLOG(DEBUG, Core) << "Will pull up " << maybeExtend.Cast().CallableName() << " over EquiJoin input #" << i; + TExprNodeList newItems; + TExprNodeList reads; + auto processReads = [&]() { + if (!reads.empty()) { + auto newExtend = ctx.ChangeChildren(maybeExtend.Cast().Ref(), std::move(reads)); + auto newInput = ctx.ChangeChild(input.Ref(), TCoEquiJoinInput::idx_List, std::move(newExtend)); + newItems.push_back(ctx.ChangeChild(*node, i, std::move(newInput))); + } + }; + for (auto item : items) { + if (IsDirectRead(*item, optCtx)) { + reads.push_back(item); + continue; + } + processReads(); + auto newInput = ctx.ChangeChild(input.Ref(), TCoEquiJoinInput::idx_List, std::move(item)); + newItems.push_back(ctx.ChangeChild(*node, i, std::move(newInput))); + } + processReads(); + return ctx.ChangeChildren(maybeExtend.Cast().Ref(), std::move(newItems)); + } + } + return node; +} + +TExprNode::TPtr PullUpFlatMapOverEquiJoin(const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) { + if (!optCtx.Types->PullUpFlatMapOverJoin) { + return node; + } + + YQL_ENSURE(node->ChildrenSize() >= 4); + auto inputsCount = ui32(node->ChildrenSize() - 2); + + auto joinTree = node->ChildPtr(inputsCount); + if (HasOnlyOneJoinType(*joinTree, "Cross")) { + return node; + } + + auto settings = node->ChildPtr(inputsCount + 1); + if (HasSetting(*settings, "flatten")) { + return node; } static const TStringBuf canaryBaseName = "_yql_canary_"; @@ -1710,8 +1818,16 @@ void RegisterCoFlowCallables2(TCallableOptimizerMap& map) { } } - auto ret = PullUpFlatMapOverEquiJoin(node, ctx, optCtx); - if (ret != node) { + if (auto ret = PullUpExtendOverEquiJoin(node, ctx, optCtx); ret != node) { + // This optimizer performs following optimization + // (A union all B uinon all ...) join Z -> A join Z union all B join Z union all ... + // We do this optimization only if all of A, B, ... are either FlatMaps suitable for PullUpFlatMapOverEquiJoin + // or direct reads + YQL_CLOG(DEBUG, Core) << "PullUpExtendOverEquiJoin"; + return ret; + } + + if (auto ret = PullUpFlatMapOverEquiJoin(node, ctx, optCtx); ret != node) { YQL_CLOG(DEBUG, Core) << "PullUpFlatMapOverEquiJoin"; return ret; } diff --git a/yql/essentials/core/yql_join.cpp b/yql/essentials/core/yql_join.cpp index 1206419d69..5b42391941 100644 --- a/yql/essentials/core/yql_join.cpp +++ b/yql/essentials/core/yql_join.cpp @@ -430,6 +430,31 @@ namespace { CollectEquiJoinKeyColumnsFromLeaf(*joinTree.Child(4), tableKeysMap); } + void CollectAdditiveInputLabelsSide(const TCoEquiJoinTuple& joinTree, bool hasAny, THashMap<TStringBuf, bool>& isAdditiveByLabel, + bool isLeft, const TEquiJoinLinkSettings& settings); + + void CollectAdditiveInputLabels(const TCoEquiJoinTuple& joinTree, bool hasAny, THashMap<TStringBuf, bool>& isAdditiveByLabel) { + auto settings = GetEquiJoinLinkSettings(joinTree.Options().Ref()); + CollectAdditiveInputLabelsSide(joinTree, hasAny, isAdditiveByLabel, true, settings); + CollectAdditiveInputLabelsSide(joinTree, hasAny, isAdditiveByLabel, false, settings); + } + + void CollectAdditiveInputLabelsSide(const TCoEquiJoinTuple& joinTree, bool hasAny, THashMap<TStringBuf, bool>& isAdditiveByLabel, bool isLeft, const TEquiJoinLinkSettings& settings) { + hasAny = hasAny || (isLeft ? settings.LeftHints : settings.RightHints).contains("any"); + const auto scope = isLeft ? joinTree.LeftScope() : joinTree.RightScope(); + TStringBuf joinKind = joinTree.Type().Value(); + if (scope.Maybe<TCoEquiJoinTuple>()) { + CollectAdditiveInputLabels(scope.Cast<TCoEquiJoinTuple>(), hasAny, isAdditiveByLabel); + } else { + YQL_ENSURE(scope.Maybe<TCoAtom>()); + bool additive = !hasAny && (joinKind == (isLeft ? "Left" : "Right") || joinKind == "Inner" || joinKind == "Cross"); + TStringBuf label = scope.Cast<TCoAtom>().Value(); + if (!additive || !isAdditiveByLabel.contains(label)) { + isAdditiveByLabel[label] = additive; + } + } + } + bool CollectEquiJoinOnlyParents(const TExprNode& current, const TExprNode* prev, ui32 depth, TVector<TEquiJoinParent>& results, const TExprNode* extractMembersInScope, const TParentsMap& parents) @@ -951,6 +976,12 @@ bool IsRightJoinSideOptional(const TStringBuf& joinType) { return false; } +THashMap<TStringBuf, bool> CollectAdditiveInputLabels(const TCoEquiJoinTuple& joinTree) { + THashMap<TStringBuf, bool> result; + CollectAdditiveInputLabels(joinTree, false, result); + return result; +} + TExprNode::TPtr FilterOutNullJoinColumns(TPositionHandle pos, const TExprNode::TPtr& input, const TJoinLabel& label, const TSet<TString>& optionalKeyColumns, TExprContext& ctx) { if (optionalKeyColumns.empty()) { diff --git a/yql/essentials/core/yql_join.h b/yql/essentials/core/yql_join.h index d7df5f988b..d79037a3d9 100644 --- a/yql/essentials/core/yql_join.h +++ b/yql/essentials/core/yql_join.h @@ -92,6 +92,7 @@ THashMap<TStringBuf, THashSet<TStringBuf>> CollectEquiJoinKeyColumnsByLabel(cons bool IsLeftJoinSideOptional(const TStringBuf& joinType); bool IsRightJoinSideOptional(const TStringBuf& joinType); +THashMap<TStringBuf, bool> CollectAdditiveInputLabels(const NNodes::TCoEquiJoinTuple& joinTree); TExprNode::TPtr FilterOutNullJoinColumns(TPositionHandle pos, const TExprNode::TPtr& input, const TJoinLabel& label, const TSet<TString>& optionalKeyColumns, TExprContext& ctx); diff --git a/yt/yql/tests/sql/suites/join/pullup_extend.cfg b/yt/yql/tests/sql/suites/join/pullup_extend.cfg new file mode 100644 index 0000000000..829b6d19f2 --- /dev/null +++ b/yt/yql/tests/sql/suites/join/pullup_extend.cfg @@ -0,0 +1,4 @@ +in Input1 kv1_sorted.txt +in Input2 kv2_sorted.txt +in Input3 kv3_sorted.txt +in Input4 kv4_sorted.txt diff --git a/yt/yql/tests/sql/suites/join/pullup_extend.sql b/yt/yql/tests/sql/suites/join/pullup_extend.sql new file mode 100644 index 0000000000..72987a0e8b --- /dev/null +++ b/yt/yql/tests/sql/suites/join/pullup_extend.sql @@ -0,0 +1,18 @@ +pragma EmitUnionMerge; +pragma config.flags("OptimizerFlags", "PullUpExtendOverEquiJoin"); +pragma yt.JoinMergeTablesLimit="10"; + +use plato; + +$t1 = select k1 as k2, "111" as v2 from Input1; +$t2 = select k2, v2 from Input2; + +$a = select * from $t1 + union all + select * from $t2 +; + +select + a.k2 as k, + a.v2 as av2 +from $a as a join Input4 as b on a.k2 = b.k4; |