aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@yandex-team.com>2025-01-22 13:28:21 +0300
committeraneporada <aneporada@yandex-team.com>2025-01-22 14:11:59 +0300
commit50b5088b27697d2ff6c22c8083ac5ed9343f9e82 (patch)
treebf804a94fe196153eea8c438690bdcc71bc82120
parente11a91ef357f17a112d73f070ee0c307e8329b68 (diff)
downloadydb-50b5088b27697d2ff6c22c8083ac5ed9343f9e82.tar.gz
PullUpExtendOverEquiJoin optimizer
commit_hash:386f6e11f9be5b6322fa0859193bb2197e082e44
-rw-r--r--yql/essentials/core/common_opt/yql_co_flow2.cpp152
-rw-r--r--yql/essentials/core/yql_join.cpp31
-rw-r--r--yql/essentials/core/yql_join.h1
-rw-r--r--yt/yql/tests/sql/suites/join/pullup_extend.cfg4
-rw-r--r--yt/yql/tests/sql/suites/join/pullup_extend.sql18
5 files changed, 188 insertions, 18 deletions
diff --git a/yql/essentials/core/common_opt/yql_co_flow2.cpp b/yql/essentials/core/common_opt/yql_co_flow2.cpp
index ace0141cb2..d7c359f8d5 100644
--- a/yql/essentials/core/common_opt/yql_co_flow2.cpp
+++ b/yql/essentials/core/common_opt/yql_co_flow2.cpp
@@ -31,6 +31,14 @@ bool AllowComplexFiltersOverAggregatePushdown(const TOptimizeContext& optCtx) {
optCtx.Types->MaxAggPushdownPredicates > 0;
}
+bool AllowPullUpExtendOverEquiJoin(const TOptimizeContext& optCtx) {
+ YQL_ENSURE(optCtx.Types);
+ static const TString pull = to_lower(TString("PullUpExtendOverEquiJoin"));
+ static const TString noPull = to_lower(TString("DisablePullUpExtendOverEquiJoin"));
+ return optCtx.Types->OptimizerFlags.contains(pull) &&
+ !optCtx.Types->OptimizerFlags.contains(noPull);
+}
+
THashSet<TStringBuf> GetAggregationInputKeys(const TCoAggregate& node) {
TMaybe<TStringBuf> sessionColumn;
const auto sessionSetting = GetSetting(node.Settings().Ref(), "session");
@@ -548,19 +556,24 @@ bool IsRenamingOrPassthroughFlatMap(const TCoFlatMapBase& flatMap, THashMap<TStr
return false;
}
-bool IsInputSuitableForPullingOverEquiJoin(const TCoEquiJoinInput& input,
- const THashMap<TStringBuf, THashSet<TStringBuf>>& joinKeysByLabel,
- THashMap<TStringBuf, TStringBuf>& renames, TOptimizeContext& optCtx)
-{
- renames.clear();
- YQL_ENSURE(input.Scope().Ref().IsAtom());
-
- auto maybeFlatMap = TMaybeNode<TCoFlatMapBase>(input.List().Ptr());
- if (!maybeFlatMap) {
+bool IsDirectRead(const TExprNode& node, TOptimizeContext& optCtx) {
+ const TExprNode* curr = &node;
+ while (curr->IsCallable(SkippableCallables) || curr->IsCallable("ExtractMembers")) {
+ curr = &curr->Head();
+ }
+ if (!curr->IsCallable("Right!")) {
return false;
}
+ const auto& readNode = curr->Head();
+ YQL_ENSURE(optCtx.Types);
+ return AnyOf(optCtx.Types->DataSourceMap, [&](const auto& entry) { return entry.second->IsRead(readNode); });
+}
- auto flatMap = maybeFlatMap.Cast();
+bool IsFlatmapSuitableForPullUpOverEqiuJoin(const TCoFlatMapBase& flatMap,
+ TStringBuf label,
+ const THashMap<TStringBuf, THashSet<TStringBuf>>& joinKeysByLabel,
+ THashMap<TStringBuf, TStringBuf>& renames, TOptimizeContext& optCtx)
+{
if (flatMap.Lambda().Args().Arg(0).Ref().IsUsedInDependsOn()) {
return false;
}
@@ -569,7 +582,7 @@ bool IsInputSuitableForPullingOverEquiJoin(const TCoEquiJoinInput& input,
return false;
}
- if (!optCtx.IsSingleUsage(input) || !optCtx.IsSingleUsage(flatMap)) {
+ if (!optCtx.IsSingleUsage(flatMap)) {
return false;
}
@@ -591,7 +604,7 @@ bool IsInputSuitableForPullingOverEquiJoin(const TCoEquiJoinInput& input,
return false;
}
- auto keysIt = joinKeysByLabel.find(input.Scope().Ref().Content());
+ auto keysIt = joinKeysByLabel.find(label);
const auto& joinKeys = (keysIt == joinKeysByLabel.end()) ? THashSet<TStringBuf>() : keysIt->second;
size_t joinKeysFound = 0;
@@ -629,6 +642,25 @@ bool IsInputSuitableForPullingOverEquiJoin(const TCoEquiJoinInput& input,
return true;
}
+bool IsInputSuitableForPullingOverEquiJoin(const TCoEquiJoinInput& input,
+ const THashMap<TStringBuf, THashSet<TStringBuf>>& joinKeysByLabel,
+ THashMap<TStringBuf, TStringBuf>& renames, TOptimizeContext& optCtx)
+{
+ renames.clear();
+ YQL_ENSURE(input.Scope().Ref().IsAtom());
+ if (!optCtx.IsSingleUsage(input)) {
+ return false;
+ }
+
+ auto maybeFlatMap = TMaybeNode<TCoFlatMapBase>(input.List().Ptr());
+ if (!maybeFlatMap) {
+ return false;
+ }
+
+ const TStringBuf label = input.Scope().Ref().Content();
+ return IsFlatmapSuitableForPullUpOverEqiuJoin(maybeFlatMap.Cast(), label, joinKeysByLabel, renames, optCtx);
+}
+
TExprNode::TPtr ApplyRenames(const TExprNode::TPtr& input, const TMap<TStringBuf, TVector<TStringBuf>>& renames,
const TStructExprType& noRenamesResultType, TStringBuf canaryBaseName, TExprContext& ctx)
{
@@ -824,8 +856,8 @@ TExprNode::TPtr BuildOutputFlattenMembersArg(const TCoEquiJoinInput& input, cons
.Build();
}
-TExprNode::TPtr PullUpFlatMapOverEquiJoin(const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
- if (!optCtx.Types->PullUpFlatMapOverJoin) {
+TExprNode::TPtr PullUpExtendOverEquiJoin(const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
+ if (!optCtx.Types->PullUpFlatMapOverJoin || !AllowPullUpExtendOverEquiJoin(optCtx)) {
return node;
}
@@ -838,10 +870,86 @@ TExprNode::TPtr PullUpFlatMapOverEquiJoin(const TExprNode::TPtr& node, TExprCont
}
auto settings = node->ChildPtr(inputsCount + 1);
- for (auto& child : settings->Children()) {
- if (child->Head().IsAtom("flatten")) {
+ if (HasSetting(*settings, "flatten")) {
+ return node;
+ }
+
+ const THashMap<TStringBuf, bool> additiveInputLabels = CollectAdditiveInputLabels(TCoEquiJoinTuple(joinTree));
+ const THashMap<TStringBuf, THashSet<TStringBuf>> joinKeysByLabel = CollectEquiJoinKeyColumnsByLabel(*joinTree);
+ for (ui32 i = 0; i < inputsCount; ++i) {
+ TCoEquiJoinInput input(node->ChildPtr(i));
+ if (!input.Scope().Ref().IsAtom()) {
return node;
}
+ const TStringBuf label = input.Scope().Ref().Content();
+ auto addIt = additiveInputLabels.find(label);
+ YQL_ENSURE(addIt != additiveInputLabels.end());
+ if (!addIt->second) {
+ continue;
+ }
+
+ auto maybeExtend = input.List().Maybe<TCoExtendBase>();
+ if (!maybeExtend) {
+ continue;
+ }
+
+ const TExprNodeList items = maybeExtend.Cast().Ref().ChildrenList();
+ size_t pullableFlatmaps = 0;
+ size_t directReads = 0;
+ for (auto item : items) {
+ auto maybeFlatMap = TMaybeNode<TCoFlatMapBase>(item);
+ if (maybeFlatMap) {
+ THashMap<TStringBuf, TStringBuf> renames;
+ if (IsFlatmapSuitableForPullUpOverEqiuJoin(maybeFlatMap.Cast(), label, joinKeysByLabel, renames, optCtx)) {
+ ++pullableFlatmaps;
+ }
+ } else if (IsDirectRead(*item, optCtx)) {
+ ++directReads;
+ }
+ }
+ if (pullableFlatmaps > 0 && pullableFlatmaps + directReads == items.size()) {
+ YQL_CLOG(DEBUG, Core) << "Will pull up " << maybeExtend.Cast().CallableName() << " over EquiJoin input #" << i;
+ TExprNodeList newItems;
+ TExprNodeList reads;
+ auto processReads = [&]() {
+ if (!reads.empty()) {
+ auto newExtend = ctx.ChangeChildren(maybeExtend.Cast().Ref(), std::move(reads));
+ auto newInput = ctx.ChangeChild(input.Ref(), TCoEquiJoinInput::idx_List, std::move(newExtend));
+ newItems.push_back(ctx.ChangeChild(*node, i, std::move(newInput)));
+ }
+ };
+ for (auto item : items) {
+ if (IsDirectRead(*item, optCtx)) {
+ reads.push_back(item);
+ continue;
+ }
+ processReads();
+ auto newInput = ctx.ChangeChild(input.Ref(), TCoEquiJoinInput::idx_List, std::move(item));
+ newItems.push_back(ctx.ChangeChild(*node, i, std::move(newInput)));
+ }
+ processReads();
+ return ctx.ChangeChildren(maybeExtend.Cast().Ref(), std::move(newItems));
+ }
+ }
+ return node;
+}
+
+TExprNode::TPtr PullUpFlatMapOverEquiJoin(const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
+ if (!optCtx.Types->PullUpFlatMapOverJoin) {
+ return node;
+ }
+
+ YQL_ENSURE(node->ChildrenSize() >= 4);
+ auto inputsCount = ui32(node->ChildrenSize() - 2);
+
+ auto joinTree = node->ChildPtr(inputsCount);
+ if (HasOnlyOneJoinType(*joinTree, "Cross")) {
+ return node;
+ }
+
+ auto settings = node->ChildPtr(inputsCount + 1);
+ if (HasSetting(*settings, "flatten")) {
+ return node;
}
static const TStringBuf canaryBaseName = "_yql_canary_";
@@ -1710,8 +1818,16 @@ void RegisterCoFlowCallables2(TCallableOptimizerMap& map) {
}
}
- auto ret = PullUpFlatMapOverEquiJoin(node, ctx, optCtx);
- if (ret != node) {
+ if (auto ret = PullUpExtendOverEquiJoin(node, ctx, optCtx); ret != node) {
+ // This optimizer performs following optimization
+ // (A union all B uinon all ...) join Z -> A join Z union all B join Z union all ...
+ // We do this optimization only if all of A, B, ... are either FlatMaps suitable for PullUpFlatMapOverEquiJoin
+ // or direct reads
+ YQL_CLOG(DEBUG, Core) << "PullUpExtendOverEquiJoin";
+ return ret;
+ }
+
+ if (auto ret = PullUpFlatMapOverEquiJoin(node, ctx, optCtx); ret != node) {
YQL_CLOG(DEBUG, Core) << "PullUpFlatMapOverEquiJoin";
return ret;
}
diff --git a/yql/essentials/core/yql_join.cpp b/yql/essentials/core/yql_join.cpp
index 1206419d69..5b42391941 100644
--- a/yql/essentials/core/yql_join.cpp
+++ b/yql/essentials/core/yql_join.cpp
@@ -430,6 +430,31 @@ namespace {
CollectEquiJoinKeyColumnsFromLeaf(*joinTree.Child(4), tableKeysMap);
}
+ void CollectAdditiveInputLabelsSide(const TCoEquiJoinTuple& joinTree, bool hasAny, THashMap<TStringBuf, bool>& isAdditiveByLabel,
+ bool isLeft, const TEquiJoinLinkSettings& settings);
+
+ void CollectAdditiveInputLabels(const TCoEquiJoinTuple& joinTree, bool hasAny, THashMap<TStringBuf, bool>& isAdditiveByLabel) {
+ auto settings = GetEquiJoinLinkSettings(joinTree.Options().Ref());
+ CollectAdditiveInputLabelsSide(joinTree, hasAny, isAdditiveByLabel, true, settings);
+ CollectAdditiveInputLabelsSide(joinTree, hasAny, isAdditiveByLabel, false, settings);
+ }
+
+ void CollectAdditiveInputLabelsSide(const TCoEquiJoinTuple& joinTree, bool hasAny, THashMap<TStringBuf, bool>& isAdditiveByLabel, bool isLeft, const TEquiJoinLinkSettings& settings) {
+ hasAny = hasAny || (isLeft ? settings.LeftHints : settings.RightHints).contains("any");
+ const auto scope = isLeft ? joinTree.LeftScope() : joinTree.RightScope();
+ TStringBuf joinKind = joinTree.Type().Value();
+ if (scope.Maybe<TCoEquiJoinTuple>()) {
+ CollectAdditiveInputLabels(scope.Cast<TCoEquiJoinTuple>(), hasAny, isAdditiveByLabel);
+ } else {
+ YQL_ENSURE(scope.Maybe<TCoAtom>());
+ bool additive = !hasAny && (joinKind == (isLeft ? "Left" : "Right") || joinKind == "Inner" || joinKind == "Cross");
+ TStringBuf label = scope.Cast<TCoAtom>().Value();
+ if (!additive || !isAdditiveByLabel.contains(label)) {
+ isAdditiveByLabel[label] = additive;
+ }
+ }
+ }
+
bool CollectEquiJoinOnlyParents(const TExprNode& current, const TExprNode* prev, ui32 depth,
TVector<TEquiJoinParent>& results, const TExprNode* extractMembersInScope,
const TParentsMap& parents)
@@ -951,6 +976,12 @@ bool IsRightJoinSideOptional(const TStringBuf& joinType) {
return false;
}
+THashMap<TStringBuf, bool> CollectAdditiveInputLabels(const TCoEquiJoinTuple& joinTree) {
+ THashMap<TStringBuf, bool> result;
+ CollectAdditiveInputLabels(joinTree, false, result);
+ return result;
+}
+
TExprNode::TPtr FilterOutNullJoinColumns(TPositionHandle pos, const TExprNode::TPtr& input,
const TJoinLabel& label, const TSet<TString>& optionalKeyColumns, TExprContext& ctx) {
if (optionalKeyColumns.empty()) {
diff --git a/yql/essentials/core/yql_join.h b/yql/essentials/core/yql_join.h
index d7df5f988b..d79037a3d9 100644
--- a/yql/essentials/core/yql_join.h
+++ b/yql/essentials/core/yql_join.h
@@ -92,6 +92,7 @@ THashMap<TStringBuf, THashSet<TStringBuf>> CollectEquiJoinKeyColumnsByLabel(cons
bool IsLeftJoinSideOptional(const TStringBuf& joinType);
bool IsRightJoinSideOptional(const TStringBuf& joinType);
+THashMap<TStringBuf, bool> CollectAdditiveInputLabels(const NNodes::TCoEquiJoinTuple& joinTree);
TExprNode::TPtr FilterOutNullJoinColumns(TPositionHandle pos, const TExprNode::TPtr& input,
const TJoinLabel& label, const TSet<TString>& optionalKeyColumns, TExprContext& ctx);
diff --git a/yt/yql/tests/sql/suites/join/pullup_extend.cfg b/yt/yql/tests/sql/suites/join/pullup_extend.cfg
new file mode 100644
index 0000000000..829b6d19f2
--- /dev/null
+++ b/yt/yql/tests/sql/suites/join/pullup_extend.cfg
@@ -0,0 +1,4 @@
+in Input1 kv1_sorted.txt
+in Input2 kv2_sorted.txt
+in Input3 kv3_sorted.txt
+in Input4 kv4_sorted.txt
diff --git a/yt/yql/tests/sql/suites/join/pullup_extend.sql b/yt/yql/tests/sql/suites/join/pullup_extend.sql
new file mode 100644
index 0000000000..72987a0e8b
--- /dev/null
+++ b/yt/yql/tests/sql/suites/join/pullup_extend.sql
@@ -0,0 +1,18 @@
+pragma EmitUnionMerge;
+pragma config.flags("OptimizerFlags", "PullUpExtendOverEquiJoin");
+pragma yt.JoinMergeTablesLimit="10";
+
+use plato;
+
+$t1 = select k1 as k2, "111" as v2 from Input1;
+$t2 = select k2, v2 from Input2;
+
+$a = select * from $t1
+ union all
+ select * from $t2
+;
+
+select
+ a.k2 as k,
+ a.v2 as av2
+from $a as a join Input4 as b on a.k2 = b.k4;