diff options
author | aneporada <aneporada@yandex-team.com> | 2025-03-13 14:09:23 +0300 |
---|---|---|
committer | aneporada <aneporada@yandex-team.com> | 2025-03-13 14:30:25 +0300 |
commit | 202fe61b9a81ca8fff0aa72f46e3b479bb708d4f (patch) | |
tree | b95f21baacff5c80a7e33ccb955ed59a33aa0247 | |
parent | baa41a75d470d12510624a5e07349d34cb86ac33 (diff) | |
download | ydb-202fe61b9a81ca8fff0aa72f46e3b479bb708d4f.tar.gz |
Adapt columnar stats for multicluster environment
commit_hash:8426c8bb44c0b1f426ccbade7274c6d0bf6eafb2
11 files changed, 188 insertions, 172 deletions
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp index da0b5919ac..076237b9d7 100644 --- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp +++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp @@ -347,7 +347,6 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EarlyMergeJoin(TExprBas TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::RuntimeEquiJoin(TExprBase node, TExprContext& ctx) const { auto equiJoin = node.Cast<TYtEquiJoin>(); - auto cluster = equiJoin.DataSink().Cluster().StringValue(); const bool tryReorder = State_->Types->CostBasedOptimizer != ECostBasedOptimizerType::Disable && equiJoin.Input().Size() > 2 @@ -369,12 +368,12 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::RuntimeEquiJoin(TExprBa if (tryReorder) { YQL_CLOG(INFO, ProviderYt) << "Collecting cbo stats for equiJoin"; - auto collectStatus = CollectCboStats(cluster, *tree, State_, ctx); + auto collectStatus = CollectCboStats(*tree, State_, ctx); if (collectStatus == TStatus::Repeat) { return ExportYtEquiJoin(equiJoin, *tree, ctx, State_); } - const auto optimizedTree = OrderJoins(tree, State_, cluster, ctx); + const auto optimizedTree = OrderJoins(tree, State_, ctx); if (optimizedTree != tree) { return ExportYtEquiJoin(equiJoin, *optimizedTree, ctx, State_); } diff --git a/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.cpp b/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.cpp index 6cf823a7e1..70bb6fed87 100644 --- a/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.cpp @@ -21,7 +21,6 @@ void AddJoinColumns(THashMap<TString, THashSet<TString>>& relJoinColumns, const IGraphTransformer::TStatus ExtractInMemorySize( const TYtState::TPtr& state, - TString cluster, TExprContext& ctx, TMaybe<ui64>& leftMemorySize, TMaybe<ui64>& rightMemorySize, @@ -46,7 +45,7 @@ IGraphTransformer::TStatus ExtractInMemorySize( bool isCross = false; auto status = CollectStatsAndMapJoinSettings(mode, mapSettings, leftStats, rightStats, leftTablesReady, leftTables, leftJoinKeys, rightTablesReady, rightTables, rightJoinKeys, - leftLeaf, rightLeaf, *state, isCross, cluster, ctx); + leftLeaf, rightLeaf, *state, isCross, ctx); if (status != IGraphTransformer::TStatus::Ok) { YQL_CLOG(WARN, ProviderYt) << "Unable to collect paths and labels: " << status; return status; @@ -57,7 +56,7 @@ IGraphTransformer::TStatus ExtractInMemorySize( TVector<TString> leftJoinKeyList(leftJoinKeys.begin(), leftJoinKeys.end()); const ui64 rows = mapSettings.LeftRows; ui64 size = 0; - auto status = CalculateJoinLeafSize(size, mapSettings, leftLeaf->Section, *op, ctx, true, leftItemType, leftJoinKeyList, state, cluster, leftTables); + auto status = CalculateJoinLeafSize(size, mapSettings, leftLeaf->Section, *op, ctx, true, leftItemType, leftJoinKeyList, state, leftTables); if (status != IGraphTransformer::TStatus::Ok) { YQL_CLOG(WARN, ProviderYt) << "Unable to calculate left join leaf size: " << status; return status; @@ -77,7 +76,7 @@ IGraphTransformer::TStatus ExtractInMemorySize( const ui64 rows = mapSettings.RightRows; ui64 size = 0; - auto status = CalculateJoinLeafSize(size, mapSettings, rightLeaf->Section, *op, ctx, false, rightItemType, rightJoinKeyList, state, cluster, rightTables); + auto status = CalculateJoinLeafSize(size, mapSettings, rightLeaf->Section, *op, ctx, false, rightItemType, rightJoinKeyList, state, rightTables); if (status != IGraphTransformer::TStatus::Ok) { YQL_CLOG(WARN, ProviderYt) << "Unable to calculate right join leaf size: " << status; return status; @@ -94,7 +93,6 @@ IGraphTransformer::TStatus ExtractInMemorySize( IGraphTransformer::TStatus CollectCboStatsLeaf( const THashMap<TString, THashSet<TString>>& relJoinColumns, - const TString& cluster, TYtJoinNodeLeaf& leaf, const TYtState::TPtr& state, TExprContext& ctx) @@ -115,36 +113,36 @@ IGraphTransformer::TStatus CollectCboStatsLeaf( } IYtGateway::TPathStatResult result; - return TryEstimateDataSizeChecked(result, leaf.Section, cluster, tables, requestedColumnList, *state, ctx); + return TryEstimateDataSizeChecked(result, leaf.Section, tables, requestedColumnList, *state, ctx); } -IGraphTransformer::TStatus CollectCboStatsNode(THashMap<TString, THashSet<TString>>& relJoinColumns, const TString& cluster, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) { +IGraphTransformer::TStatus CollectCboStatsNode(THashMap<TString, THashSet<TString>>& relJoinColumns, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) { TYtJoinNodeLeaf* leftLeaf = dynamic_cast<TYtJoinNodeLeaf*>(op.Left.Get()); TYtJoinNodeLeaf* rightLeaf = dynamic_cast<TYtJoinNodeLeaf*>(op.Right.Get()); AddJoinColumns(relJoinColumns, op); TRelSizeInfo leftSizeInfo; TRelSizeInfo rightSizeInfo; - auto result = PopulateJoinStrategySizeInfo(leftSizeInfo, rightSizeInfo, state, cluster, ctx, &op); + auto result = PopulateJoinStrategySizeInfo(leftSizeInfo, rightSizeInfo, state, ctx, &op); if (result != IGraphTransformer::TStatus::Ok) { return result; } if (leftLeaf) { - result = CollectCboStatsLeaf(relJoinColumns, cluster, *leftLeaf, state, ctx); + result = CollectCboStatsLeaf(relJoinColumns, *leftLeaf, state, ctx); } else { auto& leftOp = *dynamic_cast<TYtJoinNodeOp*>(op.Left.Get()); - result = CollectCboStatsNode(relJoinColumns, cluster, leftOp, state, ctx); + result = CollectCboStatsNode(relJoinColumns, leftOp, state, ctx); } if (result != IGraphTransformer::TStatus::Ok) { return result; } if (rightLeaf) { - result = CollectCboStatsLeaf(relJoinColumns, cluster, *rightLeaf, state, ctx); + result = CollectCboStatsLeaf(relJoinColumns, *rightLeaf, state, ctx); } else { auto& rightOp = *dynamic_cast<TYtJoinNodeOp*>(op.Right.Get()); - result = CollectCboStatsNode(relJoinColumns, cluster, rightOp, state, ctx); + result = CollectCboStatsNode(relJoinColumns, rightOp, state, ctx); } return result; } @@ -155,7 +153,6 @@ IGraphTransformer::TStatus PopulateJoinStrategySizeInfo( TRelSizeInfo& outLeft, TRelSizeInfo& outRight, const TYtState::TPtr& state, - TString cluster, TExprContext& ctx, TYtJoinNodeOp* op) { auto mapJoinUseFlow = state->Configuration->MapJoinUseFlow.Get().GetOrElse(DEFAULT_MAP_JOIN_USE_FLOW); @@ -217,22 +214,22 @@ IGraphTransformer::TStatus PopulateJoinStrategySizeInfo( return IGraphTransformer::TStatus::Ok; } - auto status = ExtractInMemorySize(state, cluster, ctx, outLeft.MapJoinMemSize, outRight.MapJoinMemSize, ESizeStatCollectMode::ColumnarSize, op, labels, + auto status = ExtractInMemorySize(state, ctx, outLeft.MapJoinMemSize, outRight.MapJoinMemSize, ESizeStatCollectMode::ColumnarSize, op, labels, numLeaves, leftLeaf, leftTablesReady, leftTables, leftJoinKeys, leftItemType, rightLeaf, rightTablesReady, rightTables, rightJoinKeys, rightItemType); if (status != IGraphTransformer::TStatus::Ok) { return status; } - status = ExtractInMemorySize(state, cluster, ctx, outLeft.LookupJoinMemSize, outRight.LookupJoinMemSize, ESizeStatCollectMode::RawSize, op, labels, + status = ExtractInMemorySize(state, ctx, outLeft.LookupJoinMemSize, outRight.LookupJoinMemSize, ESizeStatCollectMode::RawSize, op, labels, numLeaves, leftLeaf, leftTablesReady, leftTables, leftJoinKeys, leftItemType, rightLeaf, rightTablesReady, rightTables, rightJoinKeys, rightItemType); return status; } -IGraphTransformer::TStatus CollectCboStats(const TString& cluster, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) { +IGraphTransformer::TStatus CollectCboStats(TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) { THashMap<TString, THashSet<TString>> relJoinColumns; - return CollectCboStatsNode(relJoinColumns, cluster, op, state, ctx); + return CollectCboStatsNode(relJoinColumns, op, state, ctx); } TVector<TString> JoinLeafLabels(TExprNode::TPtr label) { diff --git a/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.h b/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.h index 0d05beea7a..fd5b894bb2 100644 --- a/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.h +++ b/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.h @@ -5,9 +5,9 @@ namespace NYql { -IGraphTransformer::TStatus CollectCboStats(const TString& cluster, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx); +IGraphTransformer::TStatus CollectCboStats(TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx); -IGraphTransformer::TStatus PopulateJoinStrategySizeInfo(TRelSizeInfo& outLeft, TRelSizeInfo& outRight, const TYtState::TPtr& state, TString cluster, TExprContext& ctx, TYtJoinNodeOp* op); +IGraphTransformer::TStatus PopulateJoinStrategySizeInfo(TRelSizeInfo& outLeft, TRelSizeInfo& outRight, const TYtState::TPtr& state, TExprContext& ctx, TYtJoinNodeOp* op); TVector<TString> JoinLeafLabels(TExprNode::TPtr label); diff --git a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp index 9b691e5554..5ffc4cf1ef 100644 --- a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp @@ -157,7 +157,7 @@ public: { } - TVector<TVector<ui64>> EstimateColumnStats(TExprContext& ctx, const TString& cluster, const TVector<TVector<TYtPathInfo::TPtr>>& groupIdPathInfos, ui64& sumAllTableSizes) { + TVector<TVector<ui64>> EstimateColumnStats(TExprContext& ctx, const TVector<TVector<TYtPathInfo::TPtr>>& groupIdPathInfos, ui64& sumAllTableSizes) { TVector<TVector<ui64>> groupIdColumnarStats; groupIdColumnarStats.reserve(groupIdPathInfos.size()); TVector<bool> lookupsInfo; @@ -175,7 +175,7 @@ public: flattenPaths.push_back(pathInfo); } } - auto result = EstimateDataSize(cluster, flattenPaths, Nothing(), *State_, ctx); + auto result = EstimateDataSize(flattenPaths, Nothing(), *State_, ctx); size_t statIdx = 0; size_t pathIdx = 0; for (const auto& [idx, pathInfos]: Enumerate(groupIdPathInfos)) { @@ -302,7 +302,7 @@ public: } else { TVector<TVector<std::tuple<ui64, ui64, NYT::TRichYPath>>> partitionTuplesArr; ui64 sumAllTableSizes = 0; - TVector<TVector<ui64>> groupIdColumnarStats = EstimateColumnStats(ctx, cluster, {groupIdPathInfos}, sumAllTableSizes); + TVector<TVector<ui64>> groupIdColumnarStats = EstimateColumnStats(ctx, {groupIdPathInfos}, sumAllTableSizes); ui64 parts = (sumAllTableSizes + dataSizePerJob - 1) / dataSizePerJob; if (settings.CanFallback && hasErasure && parts > maxTasks) { auto message = DqFallbackErrorMessageWrap("too big table with erasure codec"); @@ -634,7 +634,7 @@ public: } ui64 dataSize = 0; for (auto& [cluster, info]: clusterToNodesAndErasure) { - auto res = EstimateColumnStats(ctx, cluster, clusterToGroups[cluster], dataSize); + auto res = EstimateColumnStats(ctx, clusterToGroups[cluster], dataSize); auto codecCpu = State_->Configuration->ErasureCodecCpuForDq.Get(cluster); if (!codecCpu) { continue; diff --git a/yt/yql/providers/yt/provider/yql_yt_helpers.cpp b/yt/yql/providers/yt/provider/yql_yt_helpers.cpp index 32c1dc2359..b1868c0a4a 100644 --- a/yt/yql/providers/yt/provider/yql_yt_helpers.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_helpers.cpp @@ -188,7 +188,7 @@ bool IsYtIsolatedLambdaImpl(const TExprNode& lambdaBody, TSyncMap& syncList, TSt } IGraphTransformer::TStatus EstimateDataSize(IYtGateway::TPathStatResult& result, TSet<TString>& requestedColumns, - const TString& cluster, const TVector<TYtPathInfo::TPtr>& paths, + const TVector<TYtPathInfo::TPtr>& paths, const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx, bool sync) { result = IYtGateway::TPathStatResult{}; @@ -199,9 +199,10 @@ IGraphTransformer::TStatus EstimateDataSize(IYtGateway::TPathStatResult& result, const bool useColumnarStat = GetJoinCollectColumnarStatisticsMode(*state.Configuration) != EJoinCollectColumnarStatisticsMode::Disable && !state.Types->UseTableMetaFromGraph; - TVector<size_t> reqMap; TVector<IYtGateway::TPathStatReq> pathStatReqs; - ui64 totalChunkCount = 0; + THashMap<TString, TVector<size_t>> reqMapByCluster; + TMap<TString, TVector<IYtGateway::TPathStatReq>> pathStatReqsByCluster; + THashMap<TString, ui64> totalChunkCountByCluster; for (size_t i: xrange(paths.size())) { const TYtPathInfo::TPtr& pathInfo = paths[i]; YQL_ENSURE(pathInfo->Table->Stat); @@ -222,62 +223,83 @@ IGraphTransformer::TStatus EstimateDataSize(IYtGateway::TPathStatResult& result, overrideColumns = columns; } - auto ytPath = BuildYtPathForStatRequest(cluster, *pathInfo, overrideColumns, state, ctx); + auto ytPath = BuildYtPathForStatRequest(*pathInfo, overrideColumns, state, ctx); if (!ytPath) { return IGraphTransformer::TStatus::Error; } if (ytPath->Columns_) { - pathStatReqs.push_back( + const TString cluster = pathInfo->Table->Cluster; + YQL_ENSURE(cluster); + pathStatReqsByCluster[cluster].push_back( IYtGateway::TPathStatReq() .Path(*ytPath) .IsTemp(pathInfo->Table->IsTemp) .IsAnonymous(pathInfo->Table->IsAnonymous) .Epoch(pathInfo->Table->Epoch.GetOrElse(0)) ); - reqMap.push_back(i); - totalChunkCount += pathInfo->Table->Stat->ChunkCount; + reqMapByCluster[cluster].push_back(i); + totalChunkCountByCluster[cluster] += pathInfo->Table->Stat->ChunkCount; } } } - if (!pathStatReqs.empty()) { - for (auto& req : pathStatReqs) { - YQL_ENSURE(req.Path().Columns_); - requestedColumns.insert(req.Path().Columns_->Parts_.begin(), req.Path().Columns_->Parts_.end()); + if (!pathStatReqsByCluster.empty()) { + const TMaybe<ui64> maxChunkCountExtendedStats = state.Configuration->ExtendedStatsMaxChunkCount.Get(); + TMap<TString, IYtGateway::TPathStatResult> pathStatsByCluster; + TMap<TString, NThreading::TFuture<IYtGateway::TPathStatResult>> futuresByCluster; + THashSet<TString> extendedStatsRequested; + IGraphTransformer::TStatus resultStatus = IGraphTransformer::TStatus::Ok; + for (const auto& [cluster, reqs] : pathStatReqsByCluster) { + for (auto& req : reqs) { + YQL_ENSURE(req.Path().Columns_); + requestedColumns.insert(req.Path().Columns_->Parts_.begin(), req.Path().Columns_->Parts_.end()); + } + const bool requestExtendedStats = !sync && maxChunkCountExtendedStats && + (*maxChunkCountExtendedStats == 0 || totalChunkCountByCluster[cluster] <= *maxChunkCountExtendedStats); + IYtGateway::TPathStatOptions pathStatOptions = + IYtGateway::TPathStatOptions(state.SessionId) + .Cluster(cluster) + .Paths(reqs) + .Config(state.Configuration->Snapshot()) + .Extended(requestExtendedStats); + if (requestExtendedStats) { + extendedStatsRequested.insert(cluster); + } + if (sync) { + futuresByCluster[cluster] = state.Gateway->PathStat(std::move(pathStatOptions)); + } else { + auto& pathStats = pathStatsByCluster[cluster]; + pathStats = state.Gateway->TryPathStat(std::move(pathStatOptions)); + if (!pathStats.Success()) { + resultStatus = resultStatus.Combine(IGraphTransformer::TStatus::Repeat); + } + } } - const TMaybe<ui64> maxChunkCountExtendedStats = state.Configuration->ExtendedStatsMaxChunkCount.Get(); - const bool requestExtendedStats = !sync && maxChunkCountExtendedStats && - (*maxChunkCountExtendedStats == 0 || totalChunkCount <= *maxChunkCountExtendedStats); - - IYtGateway::TPathStatResult pathStats; - IYtGateway::TPathStatOptions pathStatOptions = - IYtGateway::TPathStatOptions(state.SessionId) - .Cluster(cluster) - .Paths(pathStatReqs) - .Config(state.Configuration->Snapshot()) - .Extended(requestExtendedStats); - if (sync) { - auto future = state.Gateway->PathStat(std::move(pathStatOptions)); + for (auto& [cluster, future] : futuresByCluster) { + auto& pathStats = pathStatsByCluster[cluster]; pathStats = future.GetValueSync(); pathStats.ReportIssues(ctx.IssueManager); if (!pathStats.Success()) { - return IGraphTransformer::TStatus::Error; - } - } else { - pathStats = state.Gateway->TryPathStat(std::move(pathStatOptions)); - if (!pathStats.Success()) { - return IGraphTransformer::TStatus::Repeat; + resultStatus = resultStatus.Combine(IGraphTransformer::TStatus::Error); } } - YQL_ENSURE(pathStats.DataSize.size() == reqMap.size()); - YQL_ENSURE(!requestExtendedStats || pathStats.Extended.size() == reqMap.size()); - for (size_t i: xrange(pathStats.DataSize.size())) { - result.DataSize[reqMap[i]] = pathStats.DataSize[i]; - if (requestExtendedStats) { - result.Extended[reqMap[i]] = pathStats.Extended[i]; + if (resultStatus != IGraphTransformer::TStatus::Ok) { + return resultStatus; + } + + for (auto& [cluster, pathStats] : pathStatsByCluster) { + auto it = reqMapByCluster.find(cluster); + YQL_ENSURE(it != reqMapByCluster.end()); + YQL_ENSURE(pathStats.DataSize.size() == it->second.size()); + YQL_ENSURE(!extendedStatsRequested.contains(cluster) || pathStats.Extended.size() == it->second.size()); + for (size_t i: xrange(pathStats.DataSize.size())) { + result.DataSize[it->second[i]] = pathStats.DataSize[i]; + if (extendedStatsRequested.contains(cluster)) { + result.Extended[it->second[i]] = pathStats.Extended[i]; + } } } } @@ -1847,7 +1869,7 @@ bool IsOutputUsedMultipleTimes(const TExprNode& op, const TParentsMap& parentsMa return node == nullptr; } -TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TString& cluster, const TYtPathInfo& pathInfo, +TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TYtPathInfo& pathInfo, const TMaybe<TVector<TString>>& overrideColumns, const TYtState& state, TExprContext& ctx) { auto ytPath = NYT::TRichYPath(pathInfo.Table->Name); @@ -1858,6 +1880,8 @@ TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TString& cluster, const if (ytPath.Columns_ && dynamic_cast<TYtTableInfo*>(pathInfo.Table.Get()) && pathInfo.Table->IsAnonymous && !TYtTableInfo::HasSubstAnonymousLabel(pathInfo.Table->FromNode.Cast())) { + const TString cluster = pathInfo.Table->Cluster; + YQL_ENSURE(cluster); TString realTableName = state.AnonymousLabels.Value(std::make_pair(cluster, pathInfo.Table->Name), TString()); if (!realTableName) { TPositionHandle pos; @@ -1873,7 +1897,7 @@ TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TString& cluster, const return ytPath; } -TMaybe<TVector<ui64>> EstimateDataSize(const TString& cluster, const TVector<TYtPathInfo::TPtr>& paths, +TMaybe<TVector<ui64>> EstimateDataSize(const TVector<TYtPathInfo::TPtr>& paths, const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx) { TVector<ui64> result; @@ -1882,7 +1906,7 @@ TMaybe<TVector<ui64>> EstimateDataSize(const TString& cluster, const TVector<TYt bool sync = true; IYtGateway::TPathStatResult res; - auto status = EstimateDataSize(res, requestedColumns, cluster, paths, columns, state, ctx, sync); + auto status = EstimateDataSize(res, requestedColumns, paths, columns, state, ctx, sync); if (status != IGraphTransformer::TStatus::Ok) { return {}; } @@ -1891,11 +1915,11 @@ TMaybe<TVector<ui64>> EstimateDataSize(const TString& cluster, const TVector<TYt } IGraphTransformer::TStatus TryEstimateDataSize(IYtGateway::TPathStatResult& result, TSet<TString>& requestedColumns, - const TString& cluster, const TVector<TYtPathInfo::TPtr>& paths, + const TVector<TYtPathInfo::TPtr>& paths, const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx) { bool sync = false; - return EstimateDataSize(result, requestedColumns, cluster, paths, columns, state, ctx, sync); + return EstimateDataSize(result, requestedColumns, paths, columns, state, ctx, sync); } TYtSection UpdateInputFields(TYtSection section, TExprBase fields, TExprContext& ctx) { diff --git a/yt/yql/providers/yt/provider/yql_yt_helpers.h b/yt/yql/providers/yt/provider/yql_yt_helpers.h index 61a1c4bab5..3a821b16d9 100644 --- a/yt/yql/providers/yt/provider/yql_yt_helpers.h +++ b/yt/yql/providers/yt/provider/yql_yt_helpers.h @@ -90,12 +90,12 @@ NNodes::TYtPath CopyOrTrivialMap(TPositionHandle pos, NNodes::TExprBase world, N const TCopyOrTrivialMapOpts& opts); bool IsOutputUsedMultipleTimes(const TExprNode& op, const TParentsMap& parentsMap); -TMaybe<TVector<ui64>> EstimateDataSize(const TString& cluster, const TVector<TYtPathInfo::TPtr>& paths, +TMaybe<TVector<ui64>> EstimateDataSize( const TVector<TYtPathInfo::TPtr>& paths, const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx); IGraphTransformer::TStatus TryEstimateDataSize(IYtGateway::TPathStatResult& result, TSet<TString>& requestedColumns, - const TString& cluster, const TVector<TYtPathInfo::TPtr>& paths, + const TVector<TYtPathInfo::TPtr>& paths, const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx); -TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TString& cluster, const TYtPathInfo& pathInfo, +TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TYtPathInfo& pathInfo, const TMaybe<TVector<TString>>& overrideColumns, const TYtState& state, TExprContext& ctx); NNodes::TYtSection UpdateInputFields(NNodes::TYtSection section, NNodes::TExprBase fields, TExprContext& ctx); diff --git a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp index 9f663d2897..f4942ab18d 100644 --- a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp @@ -193,11 +193,11 @@ bool HasNonTrivialAny(const TEquiJoinLinkSettings& linkSettings, const TMapJoinS TStatus UpdateInMemorySizeSetting(TMapJoinSettings& settings, TYtSection& inputSection, const TJoinLabels& labels, const TYtJoinNodeOp& op, TExprContext& ctx, bool isLeft, - const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TString& cluster, + const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TVector<TYtPathInfo::TPtr>& tables, bool mapJoinUseFlow) { ui64 size = 0; - auto status = CalculateJoinLeafSize(size, settings, inputSection, op, ctx, isLeft, itemType, joinKeyList, state, cluster, tables); + auto status = CalculateJoinLeafSize(size, settings, inputSection, op, ctx, isLeft, itemType, joinKeyList, state, tables); if (status != TStatus::Ok) { return status; } @@ -230,11 +230,11 @@ TStatus UpdateInMemorySizeSetting(TMapJoinSettings& settings, TYtSection& inputS TStatus UpdateInMemorySizeUsingBlocksSetting(TMapJoinSettings& settings, TYtSection& inputSection, const TYtJoinNodeOp& op, TExprContext& ctx, bool isLeft, - const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TString& cluster, + const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TVector<TYtPathInfo::TPtr>& tables) { ui64 dataSize = 0; - auto status = CalculateJoinLeafSize(dataSize, settings, inputSection, op, ctx, isLeft, itemType, joinKeyList, state, cluster, tables); + auto status = CalculateJoinLeafSize(dataSize, settings, inputSection, op, ctx, isLeft, itemType, joinKeyList, state, tables); if (status != TStatus::Ok) { return status; } @@ -3050,7 +3050,7 @@ bool RewriteYtEmptyJoin(TYtEquiJoin equiJoin, const TJoinLabels& labels, TYtJoin } TStatus CollectJoinSideStats(ESizeStatCollectMode sizeMode, TJoinSideStats& stats, TYtSection& inputSection, - const TYtState& state, const TString& cluster, + const TYtState& state, const TVector<TYtPathInfo::TPtr>& tableInfo, const THashSet<TString>& joinKeys, bool isCross, TMaybeNode<TCoLambda> premap, TExprContext& ctx) { @@ -3103,7 +3103,7 @@ TStatus CollectJoinSideStats(ESizeStatCollectMode sizeMode, TJoinSideStats& stat } IYtGateway::TPathStatResult pathStatResult; - auto status = TryEstimateDataSizeChecked(pathStatResult, inputSection, cluster, tableInfo, {}, state, ctx); + auto status = TryEstimateDataSizeChecked(pathStatResult, inputSection, tableInfo, {}, state, ctx); if (status.Level != TStatus::Ok) { return status; } @@ -3271,8 +3271,6 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo return TStatus::Repeat; } - auto cluster = TString{equiJoin.DataSink().Cluster().Value()}; - TMapJoinSettings mapSettings; TJoinSideStats leftStats; TJoinSideStats rightStats; @@ -3281,7 +3279,7 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo if (allowLookupJoin) { auto status = CollectStatsAndMapJoinSettings(ESizeStatCollectMode::RawSize, mapSettings, leftStats, rightStats, leftTablesReady, leftTables, leftJoinKeys, rightTablesReady, rightTables, rightJoinKeys, - &leftLeaf, &rightLeaf, *state, isCross, cluster, ctx); + &leftLeaf, &rightLeaf, *state, isCross, ctx); if (status.Level != TStatus::Ok) { return (status.Level == TStatus::Repeat) ? TStatus::Ok : status; } @@ -3352,7 +3350,7 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo { auto status = CollectStatsAndMapJoinSettings(ESizeStatCollectMode::ColumnarSize, mapSettings, leftStats, rightStats, leftTablesReady, leftTables, leftJoinKeys, rightTablesReady, rightTables, rightJoinKeys, - &leftLeaf, &rightLeaf, *state, isCross, cluster, ctx); + &leftLeaf, &rightLeaf, *state, isCross, ctx); if (status.Level != TStatus::Ok) { return (status.Level == TStatus::Repeat) ? TStatus::Ok : status; } @@ -3624,13 +3622,13 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo bool mapJoinUseBlocks = state->Configuration->BlockMapJoin.Get().GetOrElse(state->Types->UseBlocks); if (leftTablesReady) { - auto status = UpdateInMemorySizeSetting(mapSettings, leftLeaf.Section, labels, op, ctx, true, leftItemType, leftJoinKeyList, state, cluster, leftTables, mapJoinUseFlow); + auto status = UpdateInMemorySizeSetting(mapSettings, leftLeaf.Section, labels, op, ctx, true, leftItemType, leftJoinKeyList, state, leftTables, mapJoinUseFlow); if (status.Level != TStatus::Ok) { return (status.Level == TStatus::Repeat) ? TStatus::Ok : status; } if (mapJoinUseBlocks) { - auto status = UpdateInMemorySizeUsingBlocksSetting(mapSettings, leftLeaf.Section, op, ctx, true, leftItemType, leftJoinKeyList, state, cluster, leftTables); + auto status = UpdateInMemorySizeUsingBlocksSetting(mapSettings, leftLeaf.Section, op, ctx, true, leftItemType, leftJoinKeyList, state, leftTables); if (status.Level != TStatus::Ok) { return (status.Level == TStatus::Repeat) ? TStatus::Ok : status; } @@ -3638,13 +3636,13 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo } if (rightTablesReady) { - auto status = UpdateInMemorySizeSetting(mapSettings, rightLeaf.Section, labels, op, ctx, false, rightItemType, rightJoinKeyList, state, cluster, rightTables, mapJoinUseFlow); + auto status = UpdateInMemorySizeSetting(mapSettings, rightLeaf.Section, labels, op, ctx, false, rightItemType, rightJoinKeyList, state, rightTables, mapJoinUseFlow); if (status.Level != TStatus::Ok) { return (status.Level == TStatus::Repeat) ? TStatus::Ok : status; } if (mapJoinUseBlocks) { - auto status = UpdateInMemorySizeUsingBlocksSetting(mapSettings, rightLeaf.Section, op, ctx, false, rightItemType, rightJoinKeyList, state, cluster, rightTables); + auto status = UpdateInMemorySizeUsingBlocksSetting(mapSettings, rightLeaf.Section, op, ctx, false, rightItemType, rightJoinKeyList, state, rightTables); if (status.Level != TStatus::Ok) { return (status.Level == TStatus::Repeat) ? TStatus::Ok : status; } @@ -3947,9 +3945,6 @@ void CollectPossibleStarJoins(const TYtEquiJoin& equiJoin, TYtJoinNodeOp& op, co rightJoinKeyList = BuildJoinKeyList(labels.Inputs[leftLeaf ? 1 : 0], *op.RightLabel); } - - auto cluster = TString{equiJoin.DataSink().Cluster().Value()}; - TMapJoinSettings mapSettings; TJoinSideStats leftStats; TJoinSideStats rightStats; @@ -3958,7 +3953,7 @@ void CollectPossibleStarJoins(const TYtEquiJoin& equiJoin, TYtJoinNodeOp& op, co bool isCross = false; auto status = CollectStatsAndMapJoinSettings(ESizeStatCollectMode::NoSize, mapSettings, leftStats, rightStats, leftTablesReady, leftTables, leftJoinKeys, rightTablesReady, rightTables, rightJoinKeys, - leftLeaf, rightLeaf, *state, isCross, cluster, ctx); + leftLeaf, rightLeaf, *state, isCross, ctx); switch (status.Level) { case TStatus::Error: @@ -4866,12 +4861,12 @@ EStarRewriteStatus RewriteYtEquiJoinStar(TYtEquiJoin equiJoin, TYtJoinNodeOp& op } // namespace -IGraphTransformer::TStatus TryEstimateDataSizeChecked(IYtGateway::TPathStatResult& result, TYtSection& inputSection, const TString& cluster, +IGraphTransformer::TStatus TryEstimateDataSizeChecked(IYtGateway::TPathStatResult& result, TYtSection& inputSection, const TVector<TYtPathInfo::TPtr>& paths, const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx) { result = IYtGateway::TPathStatResult(); if (GetJoinCollectColumnarStatisticsMode(*state.Configuration) == EJoinCollectColumnarStatisticsMode::Sync) { - auto syncResult = EstimateDataSize(cluster, paths, columns, state, ctx); + auto syncResult = EstimateDataSize(paths, columns, state, ctx); if (!syncResult) { return IGraphTransformer::TStatus::Error; } @@ -4881,7 +4876,7 @@ IGraphTransformer::TStatus TryEstimateDataSizeChecked(IYtGateway::TPathStatResul } TSet<TString> requestedColumns; - auto status = TryEstimateDataSize(result, requestedColumns, cluster, paths, columns, state, ctx); + auto status = TryEstimateDataSize(result, requestedColumns, paths, columns, state, ctx); auto settings = inputSection.Settings().Ptr(); if (status == TStatus::Repeat) { bool hasStatColumns = NYql::HasSetting(inputSection.Settings().Ref(), EYtSettingType::StatColumns); @@ -4935,7 +4930,7 @@ TStatus CollectStatsAndMapJoinSettings(ESizeStatCollectMode sizeMode, TMapJoinSe bool leftTablesReady, const TVector<TYtPathInfo::TPtr>& leftTables, const THashSet<TString>& leftJoinKeys, bool rightTablesReady, const TVector<TYtPathInfo::TPtr>& rightTables, const THashSet<TString>& rightJoinKeys, TYtJoinNodeLeaf* leftLeaf, TYtJoinNodeLeaf* rightLeaf, const TYtState& state, bool isCross, - TString cluster, TExprContext& ctx) + TExprContext& ctx) { mapSettings = {}; leftStats = {}; @@ -4943,7 +4938,7 @@ TStatus CollectStatsAndMapJoinSettings(ESizeStatCollectMode sizeMode, TMapJoinSe if (leftLeaf) { auto premap = GetPremapLambda(*leftLeaf); - auto joinSideStatus = CollectJoinSideStats(leftTablesReady ? sizeMode : ESizeStatCollectMode::NoSize, leftStats, leftLeaf->Section, state, cluster, + auto joinSideStatus = CollectJoinSideStats(leftTablesReady ? sizeMode : ESizeStatCollectMode::NoSize, leftStats, leftLeaf->Section, state, leftTables, leftJoinKeys, isCross, premap, ctx); if (joinSideStatus.Level != TStatus::Ok) { return joinSideStatus; @@ -4959,7 +4954,7 @@ TStatus CollectStatsAndMapJoinSettings(ESizeStatCollectMode sizeMode, TMapJoinSe if (rightLeaf) { auto premap = GetPremapLambda(*rightLeaf); - auto joinSideStatus = CollectJoinSideStats(rightTablesReady ? sizeMode : ESizeStatCollectMode::NoSize, rightStats, rightLeaf->Section, state, cluster, + auto joinSideStatus = CollectJoinSideStats(rightTablesReady ? sizeMode : ESizeStatCollectMode::NoSize, rightStats, rightLeaf->Section, state, rightTables, rightJoinKeys, isCross, premap, ctx); if (joinSideStatus.Level != TStatus::Ok) { return joinSideStatus; @@ -4983,7 +4978,7 @@ TStatus CollectStatsAndMapJoinSettings(ESizeStatCollectMode sizeMode, TMapJoinSe TStatus CalculateJoinLeafSize(ui64& result, TMapJoinSettings& settings, TYtSection& inputSection, const TYtJoinNodeOp& op, TExprContext& ctx, bool isLeft, - const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TString& cluster, + const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TVector<TYtPathInfo::TPtr>& tables) { result = isLeft ? settings.LeftSize : settings.RightSize; @@ -4992,7 +4987,7 @@ TStatus CalculateJoinLeafSize(ui64& result, TMapJoinSettings& settings, TYtSecti if (!needPayload && !op.JoinKind->IsAtom("Cross")) { if (joinKeyList.size() < itemType->GetSize()) { IYtGateway::TPathStatResult pathStatResult; - auto status = TryEstimateDataSizeChecked(pathStatResult, inputSection, cluster, tables, joinKeyList, *state, ctx); + auto status = TryEstimateDataSizeChecked(pathStatResult, inputSection, tables, joinKeyList, *state, ctx); if (status.Level != TStatus::Ok) { return status; } diff --git a/yt/yql/providers/yt/provider/yql_yt_join_impl.h b/yt/yql/providers/yt/provider/yql_yt_join_impl.h index d8702fa43e..80f79f4d3b 100644 --- a/yt/yql/providers/yt/provider/yql_yt_join_impl.h +++ b/yt/yql/providers/yt/provider/yql_yt_join_impl.h @@ -72,12 +72,12 @@ TYtJoinNodeOp::TPtr ImportYtEquiJoin(TYtEquiJoin equiJoin, TExprContext& ctx); IGraphTransformer::TStatus RewriteYtEquiJoinLeaves(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx); IGraphTransformer::TStatus RewriteYtEquiJoin(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx); TMaybeNode<TExprBase> ExportYtEquiJoin(TYtEquiJoin equiJoin, const TYtJoinNodeOp& op, TExprContext& ctx, const TYtState::TPtr& state); -TYtJoinNodeOp::TPtr OrderJoins(TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, const TString& cluster, TExprContext& ctx, bool debug = false); +TYtJoinNodeOp::TPtr OrderJoins(TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, TExprContext& ctx, bool debug = false); struct IBaseOptimizerNode; struct IProviderContext; -void BuildOptimizerJoinTree(TYtState::TPtr state, const TString& cluster, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TOptimizerLinkSettings& settings, TYtJoinNodeOp::TPtr op, TExprContext& ctx); +void BuildOptimizerJoinTree(TYtState::TPtr state, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TOptimizerLinkSettings& settings, TYtJoinNodeOp::TPtr op, TExprContext& ctx); TYtJoinNode::TPtr BuildYtJoinTree(std::shared_ptr<IBaseOptimizerNode> node, TExprContext& ctx, TPositionHandle pos); bool AreSimilarTrees(TYtJoinNode::TPtr node1, TYtJoinNode::TPtr node2); @@ -89,7 +89,7 @@ IGraphTransformer::TStatus CollectPathsAndLabelsReady( IGraphTransformer::TStatus CalculateJoinLeafSize(ui64& result, TMapJoinSettings& settings, TYtSection& inputSection, const TYtJoinNodeOp& op, TExprContext& ctx, bool isLeft, - const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TString& cluster, + const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TVector<TYtPathInfo::TPtr>& tables); enum class ESizeStatCollectMode { @@ -115,9 +115,9 @@ IGraphTransformer::TStatus CollectStatsAndMapJoinSettings(ESizeStatCollectMode s bool leftTablesReady, const TVector<TYtPathInfo::TPtr>& leftTables, const THashSet<TString>& leftJoinKeys, bool rightTablesReady, const TVector<TYtPathInfo::TPtr>& rightTables, const THashSet<TString>& rightJoinKeys, TYtJoinNodeLeaf* leftLeaf, TYtJoinNodeLeaf* rightLeaf, const TYtState& state, bool isCross, - TString cluster, TExprContext& ctx); + TExprContext& ctx); -IGraphTransformer::TStatus TryEstimateDataSizeChecked(IYtGateway::TPathStatResult& result, TYtSection& inputSection, const TString& cluster, +IGraphTransformer::TStatus TryEstimateDataSizeChecked(IYtGateway::TPathStatResult& result, TYtSection& inputSection, const TVector<TYtPathInfo::TPtr>& paths, const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx); ui64 CalcInMemorySizeNoCrossJoin(const TJoinLabel& label, const TYtJoinNodeOp& op, const TMapJoinSettings& settings, bool isLeft, diff --git a/yt/yql/providers/yt/provider/yql_yt_join_reorder.cpp b/yt/yql/providers/yt/provider/yql_yt_join_reorder.cpp index a195a8f5d0..ca808b621b 100644 --- a/yt/yql/providers/yt/provider/yql_yt_join_reorder.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_join_reorder.cpp @@ -58,12 +58,10 @@ public: TJoinReorderer( TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, - const TString& cluster, TExprContext& ctx, bool debug = false) : Root(op) , State(state) - , Cluster(cluster) , Ctx(ctx) , Debug(debug) { @@ -78,7 +76,7 @@ public: std::shared_ptr<IBaseOptimizerNode> tree; TOptimizerLinkSettings linkSettings; std::shared_ptr<IProviderContext> providerCtx; - BuildOptimizerJoinTree(State, Cluster, tree, providerCtx, linkSettings, Root, Ctx); + BuildOptimizerJoinTree(State, tree, providerCtx, linkSettings, Root, Ctx); auto ytCtx = std::static_pointer_cast<TYtProviderContext>(providerCtx); std::function<void(const TString& str)> log; @@ -139,7 +137,6 @@ public: private: TYtJoinNodeOp::TPtr Root; const TYtState::TPtr& State; - TString Cluster; TExprContext& Ctx; bool Debug; }; @@ -177,9 +174,8 @@ class TOptimizerTreeBuilder { public: TOptimizerLinkSettings LinkSettings; - TOptimizerTreeBuilder(TYtState::TPtr state, const TString& cluster, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TYtJoinNodeOp::TPtr inputTree, TExprContext& ctx) + TOptimizerTreeBuilder(TYtState::TPtr state, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TYtJoinNodeOp::TPtr inputTree, TExprContext& ctx) : State(state) - , Cluster(cluster) , Tree(tree) , OutProviderCtx(providerCtx) , InputTree(inputTree) @@ -258,7 +254,7 @@ private: } TRelSizeInfo leftSizeInfo; TRelSizeInfo rightSizeInfo; - PopulateJoinStrategySizeInfo(leftSizeInfo, rightSizeInfo, State, Cluster, Ctx, op); + PopulateJoinStrategySizeInfo(leftSizeInfo, rightSizeInfo, State, Ctx, op); auto left = ProcessNode(op->Left, leftSizeInfo); auto right = ProcessNode(op->Right, rightSizeInfo); @@ -374,7 +370,7 @@ private: TSet<TString> requestedColumns; IYtGateway::TPathStatResult result; - auto status = TryEstimateDataSize(result, requestedColumns, Cluster, paths, columns, *State, Ctx); + auto status = TryEstimateDataSize(result, requestedColumns, paths, columns, *State, Ctx); YQL_ENSURE(status != IGraphTransformer::TStatus::Error); if (status != IGraphTransformer::TStatus::Ok) { YQL_CLOG(WARN, ProviderYt) << "Unable to read path stats that must be already present in cache"; @@ -434,7 +430,6 @@ private: } TYtState::TPtr State; - const TString Cluster; std::shared_ptr<IBaseOptimizerNode>& Tree; std::shared_ptr<IProviderContext>& OutProviderCtx; THashMap<TString, THashSet<TString>> RelJoinColumns; @@ -517,9 +512,9 @@ bool AreSimilarTrees(TYtJoinNode::TPtr node1, TYtJoinNode::TPtr node2) { } } -void BuildOptimizerJoinTree(TYtState::TPtr state, const TString& cluster, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TOptimizerLinkSettings& linkSettings, TYtJoinNodeOp::TPtr op, TExprContext& ctx) +void BuildOptimizerJoinTree(TYtState::TPtr state, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TOptimizerLinkSettings& linkSettings, TYtJoinNodeOp::TPtr op, TExprContext& ctx) { - TOptimizerTreeBuilder builder(state, cluster, tree, providerCtx, op, ctx); + TOptimizerTreeBuilder builder(state, tree, providerCtx, op, ctx); builder.Do(); linkSettings = builder.LinkSettings; } @@ -529,13 +524,13 @@ TYtJoinNode::TPtr BuildYtJoinTree(std::shared_ptr<IBaseOptimizerNode> node, TExp return BuildYtJoinTree(node, scope, ctx, pos); } -TYtJoinNodeOp::TPtr OrderJoins(TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, const TString& cluster, TExprContext& ctx, bool debug) +TYtJoinNodeOp::TPtr OrderJoins(TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, TExprContext& ctx, bool debug) { if (state->Types->CostBasedOptimizer == ECostBasedOptimizerType::Disable || op->CostBasedOptPassed) { return op; } - auto result = TJoinReorderer(op, state, cluster, ctx, debug).Do(); + auto result = TJoinReorderer(op, state, ctx, debug).Do(); if (!debug && AreSimilarTrees(result, op)) { return op; } diff --git a/yt/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp b/yt/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp index 789938e02f..525c967853 100644 --- a/yt/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp @@ -29,22 +29,21 @@ public: YQL_ENSURE(PathStatResults.empty()); } - TNodeMap<IYtGateway::TPathStatResult> PullPathStatResults() { - TNodeMap<IYtGateway::TPathStatResult> results; + TNodeMap<TVector<IYtGateway::TPathStatResult>> PullPathStatResults() { + TNodeMap<TVector<IYtGateway::TPathStatResult>> results; TGuard<TMutex> guard(Lock); results.swap(PathStatResults); return results; } - void MarkReady(TExprNode* node, const IYtGateway::TPathStatResult& result) { + void AddResult(TExprNode* node, const IYtGateway::TPathStatResult& result) { TGuard<TMutex> guard(Lock); - YQL_ENSURE(PathStatResults.count(node) == 0); - PathStatResults[node] = result; + PathStatResults[node].push_back(result); } private: mutable TMutex Lock; - TNodeMap<IYtGateway::TPathStatResult> PathStatResults; + TNodeMap<TVector<IYtGateway::TPathStatResult>> PathStatResults; }; class TYtLoadColumnarStatsTransformer : public TGraphTransformerBase { @@ -65,7 +64,7 @@ private: output = input; PathStatusState->EnsureNoInflightRequests(); - TVector<std::pair<IYtGateway::TPathStatOptions, TExprNode*>> pathStatArgs; + TVector<std::pair<TVector<IYtGateway::TPathStatOptions>, TExprNode*>> pathStatArgs; bool hasError = false; TNodeOnNodeOwnedMap sectionRewrites; VisitExpr(input, [this, &pathStatArgs, &hasError, §ionRewrites, &ctx](const TExprNode::TPtr& node) { @@ -75,10 +74,9 @@ private: if (NYql::HasSetting(section.Settings().Ref(), EYtSettingType::StatColumns)) { auto columnList = NYql::GetSettingAsColumnList(section.Settings().Ref(), EYtSettingType::StatColumns); - TMaybe<TString> cluster; - TVector<IYtGateway::TPathStatReq> pathStatReqs; + TMap<TString, TVector<IYtGateway::TPathStatReq>> pathStatReqsByCluster; size_t idx = 0; - ui64 totalChunkCount = 0; + THashMap<TString, ui64> totalChunkCountByCluster; for (auto path: section.Paths()) { bool hasStat = false; if (path.Table().Maybe<TYtTable>().Stat().Maybe<TYtStat>()) { @@ -108,30 +106,18 @@ private: } TYtPathInfo pathInfo(path); + const TString cluster = pathInfo.Table->Cluster; + YQL_ENSURE(cluster); YQL_ENSURE(pathInfo.Table->Stat); - totalChunkCount += pathInfo.Table->Stat->ChunkCount; + totalChunkCountByCluster[cluster] += pathInfo.Table->Stat->ChunkCount; - TString currCluster; - if (auto ytTable = path.Table().Maybe<TYtTable>()) { - currCluster = TString{ytTable.Cast().Cluster().Value()}; - } else { - currCluster = TString{GetOutputOp(path.Table().Cast<TYtOutput>()).DataSink().Cluster().Value()}; - } - YQL_ENSURE(currCluster); - - if (cluster) { - YQL_ENSURE(currCluster == *cluster); - } else { - cluster = currCluster; - } - - auto ytPath = BuildYtPathForStatRequest(*cluster, pathInfo, columnList, *State_, ctx); + auto ytPath = BuildYtPathForStatRequest(pathInfo, columnList, *State_, ctx); if (!ytPath) { hasError = true; return false; } - pathStatReqs.push_back( + pathStatReqsByCluster[cluster].push_back( IYtGateway::TPathStatReq() .Path(*ytPath) .IsTemp(pathInfo.Table->IsTemp) @@ -142,21 +128,28 @@ private: ++idx; } - bool requestExtendedStats = maxChunkCountExtendedStats && - (*maxChunkCountExtendedStats == 0 || totalChunkCount <= *maxChunkCountExtendedStats); - - if (pathStatReqs) { - auto pathStatOptions = IYtGateway::TPathStatOptions(State_->SessionId) - .Cluster(*cluster) + TVector<IYtGateway::TPathStatOptions> pathStatOptions; + for (auto& [cluster, pathStatReqs] : pathStatReqsByCluster) { + auto itCount = totalChunkCountByCluster.find(cluster); + YQL_ENSURE(itCount != totalChunkCountByCluster.end()); + const ui64 totalChunkCount = itCount->second; + bool requestExtendedStats = maxChunkCountExtendedStats && + (*maxChunkCountExtendedStats == 0 || totalChunkCount <= *maxChunkCountExtendedStats); + YQL_ENSURE(!pathStatReqs.empty()); + auto options = IYtGateway::TPathStatOptions(State_->SessionId) + .Cluster(cluster) .Paths(pathStatReqs) .Config(State_->Configuration->Snapshot()) .Extended(requestExtendedStats); - - auto tryResult = State_->Gateway->TryPathStat(IYtGateway::TPathStatOptions(pathStatOptions)); + auto tryResult = State_->Gateway->TryPathStat(IYtGateway::TPathStatOptions(options)); if (!tryResult.Success()) { - pathStatArgs.emplace_back(std::move(pathStatOptions), node.Get()); + pathStatOptions.push_back(std::move(options)); } } + + if (pathStatOptions) { + pathStatArgs.emplace_back(std::move(pathStatOptions), node.Get()); + } } } return !hasError; @@ -177,16 +170,20 @@ private: } TVector<NThreading::TFuture<void>> futures; - YQL_CLOG(INFO, ProviderYt) << "Starting " << pathStatArgs.size() << " requests for columnar stats"; + size_t reqCount = 0; + for (const auto& arg : pathStatArgs) { + reqCount += arg.first.size(); + } + YQL_CLOG(INFO, ProviderYt) << "Starting " << reqCount << " requests for columnar stats"; for (auto& arg : pathStatArgs) { - IYtGateway::TPathStatOptions& options = arg.first; + TVector<IYtGateway::TPathStatOptions>& options = arg.first; TExprNode* node = arg.second; - - auto future = State_->Gateway->PathStat(std::move(options)); - - futures.push_back(future.Apply([pathStatusState = PathStatusState, node](const NThreading::TFuture<IYtGateway::TPathStatResult>& result) { - pathStatusState->MarkReady(node, result.GetValueSync()); - })); + for (auto& opt : options) { + auto future = State_->Gateway->PathStat(std::move(opt)); + futures.push_back(future.Apply([pathStatusState = PathStatusState, node](const NThreading::TFuture<IYtGateway::TPathStatResult>& result) { + pathStatusState->AddResult(node, result.GetValueSync()); + })); + } } AsyncFuture = WaitExceptionOrAll(futures); @@ -201,26 +198,32 @@ private: TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override { output = input; - TNodeMap<IYtGateway::TPathStatResult> results = PathStatusState->PullPathStatResults(); + TNodeMap<TVector<IYtGateway::TPathStatResult>> results = PathStatusState->PullPathStatResults(); YQL_ENSURE(!results.empty()); + size_t applied = 0; + TStatus status = TStatus::Repeat; for (auto& item : results) { auto& node = item.first; - auto& result = item.second; - if (!result.Success()) { - TIssueScopeGuard issueScope(ctx.IssueManager, [&]() { - return MakeIntrusive<TIssue>( - ctx.GetPosition(node->Pos()), - TStringBuilder() << "Execution of node: " << node->Content() - ); - }); - result.ReportIssues(ctx.IssueManager); - return TStatus::Error; + auto& batch = item.second; + TIssueScopeGuard issueScope(ctx.IssueManager, [&]() { + return MakeIntrusive<TIssue>( + ctx.GetPosition(node->Pos()), + TStringBuilder() << "Execution of node: " << node->Content() + ); + }); + for (auto& result : batch) { + if (!result.Success()) { + result.ReportIssues(ctx.IssueManager); + status = status.Combine(TStatus::Error); + } + ++applied; } } - YQL_CLOG(INFO, ProviderYt) << "Applied " << results.size() << " results of columnar stats"; - return TStatus::Repeat; + YQL_CLOG(INFO, ProviderYt) << "Applied " << applied << " results of columnar stats " + << (status == TStatus::Error ? "with errors" : "successfully"); + return status; } TYtState::TPtr State_; diff --git a/yt/yql/providers/yt/provider/yql_yt_optimize.cpp b/yt/yql/providers/yt/provider/yql_yt_optimize.cpp index cd0b5e2ade..639307c92e 100644 --- a/yt/yql/providers/yt/provider/yql_yt_optimize.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_optimize.cpp @@ -533,6 +533,9 @@ IGraphTransformer::TStatus UpdateTableContentMemoryUsage(const TExprNode::TPtr& if (info->Table->Meta->IsDynamic) { useItemsCount = false; } + if (!info->Table->Cluster) { + info->Table->Cluster = maybeRead.Cast().DataSource().Cluster().StringValue(); + } records.push_back(tableRecord); tableInfos.push_back(info); } @@ -553,7 +556,7 @@ IGraphTransformer::TStatus UpdateTableContentMemoryUsage(const TExprNode::TPtr& } } if (!hasNotCalculated && !tableInfos.empty()) { - if (auto dataSizes = EstimateDataSize(TString{maybeRead.Cast().DataSource().Cluster().Value()}, tableInfos, Nothing(), *state, ctx)) { + if (auto dataSizes = EstimateDataSize(tableInfos, Nothing(), *state, ctx)) { YQL_ENSURE(dataSizes->size() == records.size()); for (size_t i: xrange(records.size())) { for (auto& factor: factors) { |