diff options
author | aneporada <[email protected]> | 2025-03-11 18:42:41 +0300 |
---|---|---|
committer | aneporada <[email protected]> | 2025-03-11 19:04:17 +0300 |
commit | 1016134d2a4483647a89d0c50e2d55d712955de3 (patch) | |
tree | 26ad73d7d670e2f6c2446b7a9e9dc6db5fbf3c0b | |
parent | f10bfbecbac59ae15f32c710fea36cf398b9fa6c (diff) |
Rework phy opt for EquiJoin in multicluster mode
commit_hash:44357310d1eb84381a4db35064ca394e85680b33
3 files changed, 26 insertions, 20 deletions
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp index 99e55411701..da0b5919ac9 100644 --- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp +++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp @@ -25,10 +25,11 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EquiJoin(TExprBase node auto equiJoin = node.Cast<TCoEquiJoin>(); - TMaybeNode<TYtDSink> dataSink; - TString usedCluster; + TString runtimeCluster; const ERuntimeClusterSelectionMode selectionMode = State_->Configuration->RuntimeClusterSelection.Get().GetOrElse(DEFAULT_RUNTIME_CLUSTER_SELECTION); + TVector<TString> inputClusters(equiJoin.ArgCount() - 2); + bool hasYtInput = false; for (size_t i = 0; i + 2 < equiJoin.ArgCount(); ++i) { auto list = equiJoin.Arg(i).Cast<TCoEquiJoinInput>().List(); if (auto maybeExtractMembers = list.Maybe<TCoExtractMembers>()) { @@ -36,32 +37,34 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EquiJoin(TExprBase node } if (auto maybeFlatMap = list.Maybe<TCoFlatMapBase>()) { TSyncMap syncList; - if (!IsYtCompleteIsolatedLambda(maybeFlatMap.Cast().Lambda().Ref(), syncList, usedCluster, false, selectionMode)) { + if (!IsYtCompleteIsolatedLambda(maybeFlatMap.Cast().Lambda().Ref(), syncList, inputClusters[i], false, selectionMode)) { return node; } list = maybeFlatMap.Cast().Input(); } if (!IsYtProviderInput(list)) { TSyncMap syncList; - if (!IsYtCompleteIsolatedLambda(list.Ref(), syncList, usedCluster, false, selectionMode)) { + if (!IsYtCompleteIsolatedLambda(list.Ref(), syncList, inputClusters[i], false, selectionMode)) { + return node; + } + } else { + hasYtInput = true; + auto cluster = ToString(GetClusterName(list)); + if (!UpdateUsedCluster(inputClusters[i], cluster, selectionMode)) { return node; } - continue; - } - - if (!dataSink) { - dataSink = GetDataSink(list, ctx); } - auto cluster = ToString(GetClusterName(list)); - if (!UpdateUsedCluster(usedCluster, cluster, selectionMode)) { + if (inputClusters[i] && !UpdateUsedCluster(runtimeCluster, inputClusters[i], selectionMode)) { return node; } } - if (!dataSink) { + if (!hasYtInput) { return node; } + YQL_ENSURE(runtimeCluster); + THashMap<TStringBuf, std::pair<TVector<TStringBuf>, ui32>> tableSortKeysUsage = CollectTableSortKeysUsage(State_, equiJoin); @@ -89,7 +92,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EquiJoin(TExprBase node if (auto maybeFlatMap = listStepForward.Maybe<TCoFlatMapBase>()) { auto flatMap = maybeFlatMap.Cast(); if (IsLambdaSuitableForPullingIntoEquiJoin(flatMap, joinInput.Scope().Ref(), tableKeysMap, extractedMembers.Get())) { - if (!IsYtCompleteIsolatedLambda(flatMap.Lambda().Ref(), worldList, usedCluster, false, selectionMode)) { + if (!IsYtCompleteIsolatedLambda(flatMap.Lambda().Ref(), worldList, false)) { return node; } @@ -139,7 +142,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EquiJoin(TExprBase node } else { TSyncMap syncList; - if (!IsYtCompleteIsolatedLambda(list.Ref(), syncList, usedCluster, false, selectionMode)) { + if (!IsYtCompleteIsolatedLambda(list.Ref(), syncList, false)) { return node; } @@ -165,14 +168,14 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EquiJoin(TExprBase node return {}; } + TYtDSink dataSink = MakeDataSink(list.Pos(), inputClusters[i] ? inputClusters[i] : runtimeCluster, ctx); section = newSection = Build<TYtSection>(ctx, list.Pos()) .Paths() .Add() .Table<TYtOutput>() .Operation<TYtFill>() .World(ApplySyncListToWorld(ctx.NewWorld(list.Pos()), syncList, ctx)) - // FIXME? - .DataSink(dataSink.Cast()) + .DataSink(dataSink) .Content(MakeJobLambdaNoArg(cleanup.Cast(), ctx)) .Output() .Add(outTable.ToExprNode(ctx, list.Pos()).Cast<TYtOutTable>()) @@ -231,9 +234,10 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EquiJoin(TExprBase node if (!NPrivate::EnsurePersistableYsonTypes(sectionNode.Pos(), *scheme, ctx, State_)) { return {}; } + TYtDSink dataSink = MakeDataSink(list.Pos(), inputClusters[i] ? inputClusters[i] : runtimeCluster, ctx); auto path = CopyOrTrivialMap(sectionNode.Pos(), TExprBase(world ? world : ctx.NewWorld(sectionNode.Pos())), - dataSink.Cast(), + dataSink, *scheme, Build<TYtSection>(ctx, sectionNode.Pos()) .InitFrom(sectionNode) @@ -296,7 +300,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EquiJoin(TExprBase node const auto join = Build<TYtEquiJoin>(ctx, node.Pos()) .World(ApplySyncListToWorld(ctx.NewWorld(node.Pos()), worldList, ctx)) - .DataSink(dataSink.Cast()) + .DataSink(MakeDataSink(node.Pos(), runtimeCluster, ctx)) .Input() .Add(sections) .Build() diff --git a/yt/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp b/yt/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp index dc9b86032e1..8f899ae1107 100644 --- a/yt/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp @@ -198,14 +198,15 @@ private: if (auto maybeTable = path.Table().Maybe<TYtTable>()) { auto table = maybeTable.Cast(); auto tableName = table.Name().Value(); + TString tableCluster{table.Cluster().Value()}; if (!NYql::HasSetting(table.Settings().Ref(), EYtSettingType::UserSchema)) { // Don't validate already substituted anonymous tables if (!TYtTableInfo::HasSubstAnonymousLabel(table)) { - const TYtTableDescription& tableDesc = State_->TablesData->GetTable(clusterName, + const TYtTableDescription& tableDesc = State_->TablesData->GetTable(tableCluster, TString{tableName}, TEpochInfo::Parse(table.Epoch().Ref())); - if (!tableDesc.Validate(ctx.GetPosition(table.Pos()), clusterName, tableName, + if (!tableDesc.Validate(ctx.GetPosition(table.Pos()), tableCluster, tableName, NYql::HasSetting(table.Settings().Ref(), EYtSettingType::WithQB), State_->AnonymousLabels, ctx)) { return TStatus::Error; } diff --git a/yt/yql/providers/yt/provider/yql_yt_helpers.cpp b/yt/yql/providers/yt/provider/yql_yt_helpers.cpp index 09791a5d3fd..058c8ba45bc 100644 --- a/yt/yql/providers/yt/provider/yql_yt_helpers.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_helpers.cpp @@ -321,6 +321,7 @@ bool IsYtIsolatedLambda(const TExprNode& lambdaBody, TSyncMap& syncList, bool su } // unnamed bool UpdateUsedCluster(TString& usedCluster, const TString& newCluster, ERuntimeClusterSelectionMode mode) { + YQL_ENSURE(newCluster); switch (mode) { case NYql::ERuntimeClusterSelectionMode::Disable: { if (!usedCluster) { |