summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <[email protected]>2025-03-11 18:42:41 +0300
committeraneporada <[email protected]>2025-03-11 19:04:17 +0300
commit1016134d2a4483647a89d0c50e2d55d712955de3 (patch)
tree26ad73d7d670e2f6c2446b7a9e9dc6db5fbf3c0b
parentf10bfbecbac59ae15f32c710fea36cf398b9fa6c (diff)
Rework phy opt for EquiJoin in multicluster mode
commit_hash:44357310d1eb84381a4db35064ca394e85680b33
-rw-r--r--yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp40
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp5
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_helpers.cpp1
3 files changed, 26 insertions, 20 deletions
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp
index 99e55411701..da0b5919ac9 100644
--- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp
+++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp
@@ -25,10 +25,11 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EquiJoin(TExprBase node
auto equiJoin = node.Cast<TCoEquiJoin>();
- TMaybeNode<TYtDSink> dataSink;
- TString usedCluster;
+ TString runtimeCluster;
const ERuntimeClusterSelectionMode selectionMode =
State_->Configuration->RuntimeClusterSelection.Get().GetOrElse(DEFAULT_RUNTIME_CLUSTER_SELECTION);
+ TVector<TString> inputClusters(equiJoin.ArgCount() - 2);
+ bool hasYtInput = false;
for (size_t i = 0; i + 2 < equiJoin.ArgCount(); ++i) {
auto list = equiJoin.Arg(i).Cast<TCoEquiJoinInput>().List();
if (auto maybeExtractMembers = list.Maybe<TCoExtractMembers>()) {
@@ -36,32 +37,34 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EquiJoin(TExprBase node
}
if (auto maybeFlatMap = list.Maybe<TCoFlatMapBase>()) {
TSyncMap syncList;
- if (!IsYtCompleteIsolatedLambda(maybeFlatMap.Cast().Lambda().Ref(), syncList, usedCluster, false, selectionMode)) {
+ if (!IsYtCompleteIsolatedLambda(maybeFlatMap.Cast().Lambda().Ref(), syncList, inputClusters[i], false, selectionMode)) {
return node;
}
list = maybeFlatMap.Cast().Input();
}
if (!IsYtProviderInput(list)) {
TSyncMap syncList;
- if (!IsYtCompleteIsolatedLambda(list.Ref(), syncList, usedCluster, false, selectionMode)) {
+ if (!IsYtCompleteIsolatedLambda(list.Ref(), syncList, inputClusters[i], false, selectionMode)) {
+ return node;
+ }
+ } else {
+ hasYtInput = true;
+ auto cluster = ToString(GetClusterName(list));
+ if (!UpdateUsedCluster(inputClusters[i], cluster, selectionMode)) {
return node;
}
- continue;
- }
-
- if (!dataSink) {
- dataSink = GetDataSink(list, ctx);
}
- auto cluster = ToString(GetClusterName(list));
- if (!UpdateUsedCluster(usedCluster, cluster, selectionMode)) {
+ if (inputClusters[i] && !UpdateUsedCluster(runtimeCluster, inputClusters[i], selectionMode)) {
return node;
}
}
- if (!dataSink) {
+ if (!hasYtInput) {
return node;
}
+ YQL_ENSURE(runtimeCluster);
+
THashMap<TStringBuf, std::pair<TVector<TStringBuf>, ui32>> tableSortKeysUsage =
CollectTableSortKeysUsage(State_, equiJoin);
@@ -89,7 +92,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EquiJoin(TExprBase node
if (auto maybeFlatMap = listStepForward.Maybe<TCoFlatMapBase>()) {
auto flatMap = maybeFlatMap.Cast();
if (IsLambdaSuitableForPullingIntoEquiJoin(flatMap, joinInput.Scope().Ref(), tableKeysMap, extractedMembers.Get())) {
- if (!IsYtCompleteIsolatedLambda(flatMap.Lambda().Ref(), worldList, usedCluster, false, selectionMode)) {
+ if (!IsYtCompleteIsolatedLambda(flatMap.Lambda().Ref(), worldList, false)) {
return node;
}
@@ -139,7 +142,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EquiJoin(TExprBase node
}
else {
TSyncMap syncList;
- if (!IsYtCompleteIsolatedLambda(list.Ref(), syncList, usedCluster, false, selectionMode)) {
+ if (!IsYtCompleteIsolatedLambda(list.Ref(), syncList, false)) {
return node;
}
@@ -165,14 +168,14 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EquiJoin(TExprBase node
return {};
}
+ TYtDSink dataSink = MakeDataSink(list.Pos(), inputClusters[i] ? inputClusters[i] : runtimeCluster, ctx);
section = newSection = Build<TYtSection>(ctx, list.Pos())
.Paths()
.Add()
.Table<TYtOutput>()
.Operation<TYtFill>()
.World(ApplySyncListToWorld(ctx.NewWorld(list.Pos()), syncList, ctx))
- // FIXME?
- .DataSink(dataSink.Cast())
+ .DataSink(dataSink)
.Content(MakeJobLambdaNoArg(cleanup.Cast(), ctx))
.Output()
.Add(outTable.ToExprNode(ctx, list.Pos()).Cast<TYtOutTable>())
@@ -231,9 +234,10 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EquiJoin(TExprBase node
if (!NPrivate::EnsurePersistableYsonTypes(sectionNode.Pos(), *scheme, ctx, State_)) {
return {};
}
+ TYtDSink dataSink = MakeDataSink(list.Pos(), inputClusters[i] ? inputClusters[i] : runtimeCluster, ctx);
auto path = CopyOrTrivialMap(sectionNode.Pos(),
TExprBase(world ? world : ctx.NewWorld(sectionNode.Pos())),
- dataSink.Cast(),
+ dataSink,
*scheme,
Build<TYtSection>(ctx, sectionNode.Pos())
.InitFrom(sectionNode)
@@ -296,7 +300,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EquiJoin(TExprBase node
const auto join = Build<TYtEquiJoin>(ctx, node.Pos())
.World(ApplySyncListToWorld(ctx.NewWorld(node.Pos()), worldList, ctx))
- .DataSink(dataSink.Cast())
+ .DataSink(MakeDataSink(node.Pos(), runtimeCluster, ctx))
.Input()
.Add(sections)
.Build()
diff --git a/yt/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp b/yt/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp
index dc9b86032e1..8f899ae1107 100644
--- a/yt/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp
@@ -198,14 +198,15 @@ private:
if (auto maybeTable = path.Table().Maybe<TYtTable>()) {
auto table = maybeTable.Cast();
auto tableName = table.Name().Value();
+ TString tableCluster{table.Cluster().Value()};
if (!NYql::HasSetting(table.Settings().Ref(), EYtSettingType::UserSchema)) {
// Don't validate already substituted anonymous tables
if (!TYtTableInfo::HasSubstAnonymousLabel(table)) {
- const TYtTableDescription& tableDesc = State_->TablesData->GetTable(clusterName,
+ const TYtTableDescription& tableDesc = State_->TablesData->GetTable(tableCluster,
TString{tableName},
TEpochInfo::Parse(table.Epoch().Ref()));
- if (!tableDesc.Validate(ctx.GetPosition(table.Pos()), clusterName, tableName,
+ if (!tableDesc.Validate(ctx.GetPosition(table.Pos()), tableCluster, tableName,
NYql::HasSetting(table.Settings().Ref(), EYtSettingType::WithQB), State_->AnonymousLabels, ctx)) {
return TStatus::Error;
}
diff --git a/yt/yql/providers/yt/provider/yql_yt_helpers.cpp b/yt/yql/providers/yt/provider/yql_yt_helpers.cpp
index 09791a5d3fd..058c8ba45bc 100644
--- a/yt/yql/providers/yt/provider/yql_yt_helpers.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_helpers.cpp
@@ -321,6 +321,7 @@ bool IsYtIsolatedLambda(const TExprNode& lambdaBody, TSyncMap& syncList, bool su
} // unnamed
bool UpdateUsedCluster(TString& usedCluster, const TString& newCluster, ERuntimeClusterSelectionMode mode) {
+ YQL_ENSURE(newCluster);
switch (mode) {
case NYql::ERuntimeClusterSelectionMode::Disable: {
if (!usedCluster) {