aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@yandex-team.com>2025-03-13 14:09:23 +0300
committeraneporada <aneporada@yandex-team.com>2025-03-13 14:30:25 +0300
commit202fe61b9a81ca8fff0aa72f46e3b479bb708d4f (patch)
treeb95f21baacff5c80a7e33ccb955ed59a33aa0247
parentbaa41a75d470d12510624a5e07349d34cb86ac33 (diff)
downloadydb-202fe61b9a81ca8fff0aa72f46e3b479bb708d4f.tar.gz
Adapt columnar stats for multicluster environment
commit_hash:8426c8bb44c0b1f426ccbade7274c6d0bf6eafb2
-rw-r--r--yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp5
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_cbo_helpers.cpp31
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_cbo_helpers.h4
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp8
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_helpers.cpp106
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_helpers.h6
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_join_impl.cpp47
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_join_impl.h10
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_join_reorder.cpp21
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp117
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_optimize.cpp5
11 files changed, 188 insertions, 172 deletions
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp
index da0b5919ac..076237b9d7 100644
--- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp
+++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp
@@ -347,7 +347,6 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EarlyMergeJoin(TExprBas
TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::RuntimeEquiJoin(TExprBase node, TExprContext& ctx) const {
auto equiJoin = node.Cast<TYtEquiJoin>();
- auto cluster = equiJoin.DataSink().Cluster().StringValue();
const bool tryReorder = State_->Types->CostBasedOptimizer != ECostBasedOptimizerType::Disable
&& equiJoin.Input().Size() > 2
@@ -369,12 +368,12 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::RuntimeEquiJoin(TExprBa
if (tryReorder) {
YQL_CLOG(INFO, ProviderYt) << "Collecting cbo stats for equiJoin";
- auto collectStatus = CollectCboStats(cluster, *tree, State_, ctx);
+ auto collectStatus = CollectCboStats(*tree, State_, ctx);
if (collectStatus == TStatus::Repeat) {
return ExportYtEquiJoin(equiJoin, *tree, ctx, State_);
}
- const auto optimizedTree = OrderJoins(tree, State_, cluster, ctx);
+ const auto optimizedTree = OrderJoins(tree, State_, ctx);
if (optimizedTree != tree) {
return ExportYtEquiJoin(equiJoin, *optimizedTree, ctx, State_);
}
diff --git a/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.cpp b/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.cpp
index 6cf823a7e1..70bb6fed87 100644
--- a/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.cpp
@@ -21,7 +21,6 @@ void AddJoinColumns(THashMap<TString, THashSet<TString>>& relJoinColumns, const
IGraphTransformer::TStatus ExtractInMemorySize(
const TYtState::TPtr& state,
- TString cluster,
TExprContext& ctx,
TMaybe<ui64>& leftMemorySize,
TMaybe<ui64>& rightMemorySize,
@@ -46,7 +45,7 @@ IGraphTransformer::TStatus ExtractInMemorySize(
bool isCross = false;
auto status = CollectStatsAndMapJoinSettings(mode, mapSettings, leftStats, rightStats,
leftTablesReady, leftTables, leftJoinKeys, rightTablesReady, rightTables, rightJoinKeys,
- leftLeaf, rightLeaf, *state, isCross, cluster, ctx);
+ leftLeaf, rightLeaf, *state, isCross, ctx);
if (status != IGraphTransformer::TStatus::Ok) {
YQL_CLOG(WARN, ProviderYt) << "Unable to collect paths and labels: " << status;
return status;
@@ -57,7 +56,7 @@ IGraphTransformer::TStatus ExtractInMemorySize(
TVector<TString> leftJoinKeyList(leftJoinKeys.begin(), leftJoinKeys.end());
const ui64 rows = mapSettings.LeftRows;
ui64 size = 0;
- auto status = CalculateJoinLeafSize(size, mapSettings, leftLeaf->Section, *op, ctx, true, leftItemType, leftJoinKeyList, state, cluster, leftTables);
+ auto status = CalculateJoinLeafSize(size, mapSettings, leftLeaf->Section, *op, ctx, true, leftItemType, leftJoinKeyList, state, leftTables);
if (status != IGraphTransformer::TStatus::Ok) {
YQL_CLOG(WARN, ProviderYt) << "Unable to calculate left join leaf size: " << status;
return status;
@@ -77,7 +76,7 @@ IGraphTransformer::TStatus ExtractInMemorySize(
const ui64 rows = mapSettings.RightRows;
ui64 size = 0;
- auto status = CalculateJoinLeafSize(size, mapSettings, rightLeaf->Section, *op, ctx, false, rightItemType, rightJoinKeyList, state, cluster, rightTables);
+ auto status = CalculateJoinLeafSize(size, mapSettings, rightLeaf->Section, *op, ctx, false, rightItemType, rightJoinKeyList, state, rightTables);
if (status != IGraphTransformer::TStatus::Ok) {
YQL_CLOG(WARN, ProviderYt) << "Unable to calculate right join leaf size: " << status;
return status;
@@ -94,7 +93,6 @@ IGraphTransformer::TStatus ExtractInMemorySize(
IGraphTransformer::TStatus CollectCboStatsLeaf(
const THashMap<TString, THashSet<TString>>& relJoinColumns,
- const TString& cluster,
TYtJoinNodeLeaf& leaf,
const TYtState::TPtr& state,
TExprContext& ctx)
@@ -115,36 +113,36 @@ IGraphTransformer::TStatus CollectCboStatsLeaf(
}
IYtGateway::TPathStatResult result;
- return TryEstimateDataSizeChecked(result, leaf.Section, cluster, tables, requestedColumnList, *state, ctx);
+ return TryEstimateDataSizeChecked(result, leaf.Section, tables, requestedColumnList, *state, ctx);
}
-IGraphTransformer::TStatus CollectCboStatsNode(THashMap<TString, THashSet<TString>>& relJoinColumns, const TString& cluster, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) {
+IGraphTransformer::TStatus CollectCboStatsNode(THashMap<TString, THashSet<TString>>& relJoinColumns, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) {
TYtJoinNodeLeaf* leftLeaf = dynamic_cast<TYtJoinNodeLeaf*>(op.Left.Get());
TYtJoinNodeLeaf* rightLeaf = dynamic_cast<TYtJoinNodeLeaf*>(op.Right.Get());
AddJoinColumns(relJoinColumns, op);
TRelSizeInfo leftSizeInfo;
TRelSizeInfo rightSizeInfo;
- auto result = PopulateJoinStrategySizeInfo(leftSizeInfo, rightSizeInfo, state, cluster, ctx, &op);
+ auto result = PopulateJoinStrategySizeInfo(leftSizeInfo, rightSizeInfo, state, ctx, &op);
if (result != IGraphTransformer::TStatus::Ok) {
return result;
}
if (leftLeaf) {
- result = CollectCboStatsLeaf(relJoinColumns, cluster, *leftLeaf, state, ctx);
+ result = CollectCboStatsLeaf(relJoinColumns, *leftLeaf, state, ctx);
} else {
auto& leftOp = *dynamic_cast<TYtJoinNodeOp*>(op.Left.Get());
- result = CollectCboStatsNode(relJoinColumns, cluster, leftOp, state, ctx);
+ result = CollectCboStatsNode(relJoinColumns, leftOp, state, ctx);
}
if (result != IGraphTransformer::TStatus::Ok) {
return result;
}
if (rightLeaf) {
- result = CollectCboStatsLeaf(relJoinColumns, cluster, *rightLeaf, state, ctx);
+ result = CollectCboStatsLeaf(relJoinColumns, *rightLeaf, state, ctx);
} else {
auto& rightOp = *dynamic_cast<TYtJoinNodeOp*>(op.Right.Get());
- result = CollectCboStatsNode(relJoinColumns, cluster, rightOp, state, ctx);
+ result = CollectCboStatsNode(relJoinColumns, rightOp, state, ctx);
}
return result;
}
@@ -155,7 +153,6 @@ IGraphTransformer::TStatus PopulateJoinStrategySizeInfo(
TRelSizeInfo& outLeft,
TRelSizeInfo& outRight,
const TYtState::TPtr& state,
- TString cluster,
TExprContext& ctx,
TYtJoinNodeOp* op) {
auto mapJoinUseFlow = state->Configuration->MapJoinUseFlow.Get().GetOrElse(DEFAULT_MAP_JOIN_USE_FLOW);
@@ -217,22 +214,22 @@ IGraphTransformer::TStatus PopulateJoinStrategySizeInfo(
return IGraphTransformer::TStatus::Ok;
}
- auto status = ExtractInMemorySize(state, cluster, ctx, outLeft.MapJoinMemSize, outRight.MapJoinMemSize, ESizeStatCollectMode::ColumnarSize, op, labels,
+ auto status = ExtractInMemorySize(state, ctx, outLeft.MapJoinMemSize, outRight.MapJoinMemSize, ESizeStatCollectMode::ColumnarSize, op, labels,
numLeaves, leftLeaf, leftTablesReady, leftTables, leftJoinKeys, leftItemType,
rightLeaf, rightTablesReady, rightTables, rightJoinKeys, rightItemType);
if (status != IGraphTransformer::TStatus::Ok) {
return status;
}
- status = ExtractInMemorySize(state, cluster, ctx, outLeft.LookupJoinMemSize, outRight.LookupJoinMemSize, ESizeStatCollectMode::RawSize, op, labels,
+ status = ExtractInMemorySize(state, ctx, outLeft.LookupJoinMemSize, outRight.LookupJoinMemSize, ESizeStatCollectMode::RawSize, op, labels,
numLeaves, leftLeaf, leftTablesReady, leftTables, leftJoinKeys, leftItemType,
rightLeaf, rightTablesReady, rightTables, rightJoinKeys, rightItemType);
return status;
}
-IGraphTransformer::TStatus CollectCboStats(const TString& cluster, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) {
+IGraphTransformer::TStatus CollectCboStats(TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) {
THashMap<TString, THashSet<TString>> relJoinColumns;
- return CollectCboStatsNode(relJoinColumns, cluster, op, state, ctx);
+ return CollectCboStatsNode(relJoinColumns, op, state, ctx);
}
TVector<TString> JoinLeafLabels(TExprNode::TPtr label) {
diff --git a/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.h b/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.h
index 0d05beea7a..fd5b894bb2 100644
--- a/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.h
+++ b/yt/yql/providers/yt/provider/yql_yt_cbo_helpers.h
@@ -5,9 +5,9 @@
namespace NYql {
-IGraphTransformer::TStatus CollectCboStats(const TString& cluster, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx);
+IGraphTransformer::TStatus CollectCboStats(TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx);
-IGraphTransformer::TStatus PopulateJoinStrategySizeInfo(TRelSizeInfo& outLeft, TRelSizeInfo& outRight, const TYtState::TPtr& state, TString cluster, TExprContext& ctx, TYtJoinNodeOp* op);
+IGraphTransformer::TStatus PopulateJoinStrategySizeInfo(TRelSizeInfo& outLeft, TRelSizeInfo& outRight, const TYtState::TPtr& state, TExprContext& ctx, TYtJoinNodeOp* op);
TVector<TString> JoinLeafLabels(TExprNode::TPtr label);
diff --git a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp
index 9b691e5554..5ffc4cf1ef 100644
--- a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp
@@ -157,7 +157,7 @@ public:
{
}
- TVector<TVector<ui64>> EstimateColumnStats(TExprContext& ctx, const TString& cluster, const TVector<TVector<TYtPathInfo::TPtr>>& groupIdPathInfos, ui64& sumAllTableSizes) {
+ TVector<TVector<ui64>> EstimateColumnStats(TExprContext& ctx, const TVector<TVector<TYtPathInfo::TPtr>>& groupIdPathInfos, ui64& sumAllTableSizes) {
TVector<TVector<ui64>> groupIdColumnarStats;
groupIdColumnarStats.reserve(groupIdPathInfos.size());
TVector<bool> lookupsInfo;
@@ -175,7 +175,7 @@ public:
flattenPaths.push_back(pathInfo);
}
}
- auto result = EstimateDataSize(cluster, flattenPaths, Nothing(), *State_, ctx);
+ auto result = EstimateDataSize(flattenPaths, Nothing(), *State_, ctx);
size_t statIdx = 0;
size_t pathIdx = 0;
for (const auto& [idx, pathInfos]: Enumerate(groupIdPathInfos)) {
@@ -302,7 +302,7 @@ public:
} else {
TVector<TVector<std::tuple<ui64, ui64, NYT::TRichYPath>>> partitionTuplesArr;
ui64 sumAllTableSizes = 0;
- TVector<TVector<ui64>> groupIdColumnarStats = EstimateColumnStats(ctx, cluster, {groupIdPathInfos}, sumAllTableSizes);
+ TVector<TVector<ui64>> groupIdColumnarStats = EstimateColumnStats(ctx, {groupIdPathInfos}, sumAllTableSizes);
ui64 parts = (sumAllTableSizes + dataSizePerJob - 1) / dataSizePerJob;
if (settings.CanFallback && hasErasure && parts > maxTasks) {
auto message = DqFallbackErrorMessageWrap("too big table with erasure codec");
@@ -634,7 +634,7 @@ public:
}
ui64 dataSize = 0;
for (auto& [cluster, info]: clusterToNodesAndErasure) {
- auto res = EstimateColumnStats(ctx, cluster, clusterToGroups[cluster], dataSize);
+ auto res = EstimateColumnStats(ctx, clusterToGroups[cluster], dataSize);
auto codecCpu = State_->Configuration->ErasureCodecCpuForDq.Get(cluster);
if (!codecCpu) {
continue;
diff --git a/yt/yql/providers/yt/provider/yql_yt_helpers.cpp b/yt/yql/providers/yt/provider/yql_yt_helpers.cpp
index 32c1dc2359..b1868c0a4a 100644
--- a/yt/yql/providers/yt/provider/yql_yt_helpers.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_helpers.cpp
@@ -188,7 +188,7 @@ bool IsYtIsolatedLambdaImpl(const TExprNode& lambdaBody, TSyncMap& syncList, TSt
}
IGraphTransformer::TStatus EstimateDataSize(IYtGateway::TPathStatResult& result, TSet<TString>& requestedColumns,
- const TString& cluster, const TVector<TYtPathInfo::TPtr>& paths,
+ const TVector<TYtPathInfo::TPtr>& paths,
const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx, bool sync)
{
result = IYtGateway::TPathStatResult{};
@@ -199,9 +199,10 @@ IGraphTransformer::TStatus EstimateDataSize(IYtGateway::TPathStatResult& result,
const bool useColumnarStat = GetJoinCollectColumnarStatisticsMode(*state.Configuration) != EJoinCollectColumnarStatisticsMode::Disable
&& !state.Types->UseTableMetaFromGraph;
- TVector<size_t> reqMap;
TVector<IYtGateway::TPathStatReq> pathStatReqs;
- ui64 totalChunkCount = 0;
+ THashMap<TString, TVector<size_t>> reqMapByCluster;
+ TMap<TString, TVector<IYtGateway::TPathStatReq>> pathStatReqsByCluster;
+ THashMap<TString, ui64> totalChunkCountByCluster;
for (size_t i: xrange(paths.size())) {
const TYtPathInfo::TPtr& pathInfo = paths[i];
YQL_ENSURE(pathInfo->Table->Stat);
@@ -222,62 +223,83 @@ IGraphTransformer::TStatus EstimateDataSize(IYtGateway::TPathStatResult& result,
overrideColumns = columns;
}
- auto ytPath = BuildYtPathForStatRequest(cluster, *pathInfo, overrideColumns, state, ctx);
+ auto ytPath = BuildYtPathForStatRequest(*pathInfo, overrideColumns, state, ctx);
if (!ytPath) {
return IGraphTransformer::TStatus::Error;
}
if (ytPath->Columns_) {
- pathStatReqs.push_back(
+ const TString cluster = pathInfo->Table->Cluster;
+ YQL_ENSURE(cluster);
+ pathStatReqsByCluster[cluster].push_back(
IYtGateway::TPathStatReq()
.Path(*ytPath)
.IsTemp(pathInfo->Table->IsTemp)
.IsAnonymous(pathInfo->Table->IsAnonymous)
.Epoch(pathInfo->Table->Epoch.GetOrElse(0))
);
- reqMap.push_back(i);
- totalChunkCount += pathInfo->Table->Stat->ChunkCount;
+ reqMapByCluster[cluster].push_back(i);
+ totalChunkCountByCluster[cluster] += pathInfo->Table->Stat->ChunkCount;
}
}
}
- if (!pathStatReqs.empty()) {
- for (auto& req : pathStatReqs) {
- YQL_ENSURE(req.Path().Columns_);
- requestedColumns.insert(req.Path().Columns_->Parts_.begin(), req.Path().Columns_->Parts_.end());
+ if (!pathStatReqsByCluster.empty()) {
+ const TMaybe<ui64> maxChunkCountExtendedStats = state.Configuration->ExtendedStatsMaxChunkCount.Get();
+ TMap<TString, IYtGateway::TPathStatResult> pathStatsByCluster;
+ TMap<TString, NThreading::TFuture<IYtGateway::TPathStatResult>> futuresByCluster;
+ THashSet<TString> extendedStatsRequested;
+ IGraphTransformer::TStatus resultStatus = IGraphTransformer::TStatus::Ok;
+ for (const auto& [cluster, reqs] : pathStatReqsByCluster) {
+ for (auto& req : reqs) {
+ YQL_ENSURE(req.Path().Columns_);
+ requestedColumns.insert(req.Path().Columns_->Parts_.begin(), req.Path().Columns_->Parts_.end());
+ }
+ const bool requestExtendedStats = !sync && maxChunkCountExtendedStats &&
+ (*maxChunkCountExtendedStats == 0 || totalChunkCountByCluster[cluster] <= *maxChunkCountExtendedStats);
+ IYtGateway::TPathStatOptions pathStatOptions =
+ IYtGateway::TPathStatOptions(state.SessionId)
+ .Cluster(cluster)
+ .Paths(reqs)
+ .Config(state.Configuration->Snapshot())
+ .Extended(requestExtendedStats);
+ if (requestExtendedStats) {
+ extendedStatsRequested.insert(cluster);
+ }
+ if (sync) {
+ futuresByCluster[cluster] = state.Gateway->PathStat(std::move(pathStatOptions));
+ } else {
+ auto& pathStats = pathStatsByCluster[cluster];
+ pathStats = state.Gateway->TryPathStat(std::move(pathStatOptions));
+ if (!pathStats.Success()) {
+ resultStatus = resultStatus.Combine(IGraphTransformer::TStatus::Repeat);
+ }
+ }
}
- const TMaybe<ui64> maxChunkCountExtendedStats = state.Configuration->ExtendedStatsMaxChunkCount.Get();
- const bool requestExtendedStats = !sync && maxChunkCountExtendedStats &&
- (*maxChunkCountExtendedStats == 0 || totalChunkCount <= *maxChunkCountExtendedStats);
-
- IYtGateway::TPathStatResult pathStats;
- IYtGateway::TPathStatOptions pathStatOptions =
- IYtGateway::TPathStatOptions(state.SessionId)
- .Cluster(cluster)
- .Paths(pathStatReqs)
- .Config(state.Configuration->Snapshot())
- .Extended(requestExtendedStats);
- if (sync) {
- auto future = state.Gateway->PathStat(std::move(pathStatOptions));
+ for (auto& [cluster, future] : futuresByCluster) {
+ auto& pathStats = pathStatsByCluster[cluster];
pathStats = future.GetValueSync();
pathStats.ReportIssues(ctx.IssueManager);
if (!pathStats.Success()) {
- return IGraphTransformer::TStatus::Error;
- }
- } else {
- pathStats = state.Gateway->TryPathStat(std::move(pathStatOptions));
- if (!pathStats.Success()) {
- return IGraphTransformer::TStatus::Repeat;
+ resultStatus = resultStatus.Combine(IGraphTransformer::TStatus::Error);
}
}
- YQL_ENSURE(pathStats.DataSize.size() == reqMap.size());
- YQL_ENSURE(!requestExtendedStats || pathStats.Extended.size() == reqMap.size());
- for (size_t i: xrange(pathStats.DataSize.size())) {
- result.DataSize[reqMap[i]] = pathStats.DataSize[i];
- if (requestExtendedStats) {
- result.Extended[reqMap[i]] = pathStats.Extended[i];
+ if (resultStatus != IGraphTransformer::TStatus::Ok) {
+ return resultStatus;
+ }
+
+ for (auto& [cluster, pathStats] : pathStatsByCluster) {
+ auto it = reqMapByCluster.find(cluster);
+ YQL_ENSURE(it != reqMapByCluster.end());
+ YQL_ENSURE(pathStats.DataSize.size() == it->second.size());
+ YQL_ENSURE(!extendedStatsRequested.contains(cluster) || pathStats.Extended.size() == it->second.size());
+ for (size_t i: xrange(pathStats.DataSize.size())) {
+ result.DataSize[it->second[i]] = pathStats.DataSize[i];
+ if (extendedStatsRequested.contains(cluster)) {
+ result.Extended[it->second[i]] = pathStats.Extended[i];
+ }
}
}
}
@@ -1847,7 +1869,7 @@ bool IsOutputUsedMultipleTimes(const TExprNode& op, const TParentsMap& parentsMa
return node == nullptr;
}
-TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TString& cluster, const TYtPathInfo& pathInfo,
+TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TYtPathInfo& pathInfo,
const TMaybe<TVector<TString>>& overrideColumns, const TYtState& state, TExprContext& ctx)
{
auto ytPath = NYT::TRichYPath(pathInfo.Table->Name);
@@ -1858,6 +1880,8 @@ TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TString& cluster, const
if (ytPath.Columns_ && dynamic_cast<TYtTableInfo*>(pathInfo.Table.Get()) && pathInfo.Table->IsAnonymous
&& !TYtTableInfo::HasSubstAnonymousLabel(pathInfo.Table->FromNode.Cast())) {
+ const TString cluster = pathInfo.Table->Cluster;
+ YQL_ENSURE(cluster);
TString realTableName = state.AnonymousLabels.Value(std::make_pair(cluster, pathInfo.Table->Name), TString());
if (!realTableName) {
TPositionHandle pos;
@@ -1873,7 +1897,7 @@ TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TString& cluster, const
return ytPath;
}
-TMaybe<TVector<ui64>> EstimateDataSize(const TString& cluster, const TVector<TYtPathInfo::TPtr>& paths,
+TMaybe<TVector<ui64>> EstimateDataSize(const TVector<TYtPathInfo::TPtr>& paths,
const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx)
{
TVector<ui64> result;
@@ -1882,7 +1906,7 @@ TMaybe<TVector<ui64>> EstimateDataSize(const TString& cluster, const TVector<TYt
bool sync = true;
IYtGateway::TPathStatResult res;
- auto status = EstimateDataSize(res, requestedColumns, cluster, paths, columns, state, ctx, sync);
+ auto status = EstimateDataSize(res, requestedColumns, paths, columns, state, ctx, sync);
if (status != IGraphTransformer::TStatus::Ok) {
return {};
}
@@ -1891,11 +1915,11 @@ TMaybe<TVector<ui64>> EstimateDataSize(const TString& cluster, const TVector<TYt
}
IGraphTransformer::TStatus TryEstimateDataSize(IYtGateway::TPathStatResult& result, TSet<TString>& requestedColumns,
- const TString& cluster, const TVector<TYtPathInfo::TPtr>& paths,
+ const TVector<TYtPathInfo::TPtr>& paths,
const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx)
{
bool sync = false;
- return EstimateDataSize(result, requestedColumns, cluster, paths, columns, state, ctx, sync);
+ return EstimateDataSize(result, requestedColumns, paths, columns, state, ctx, sync);
}
TYtSection UpdateInputFields(TYtSection section, TExprBase fields, TExprContext& ctx) {
diff --git a/yt/yql/providers/yt/provider/yql_yt_helpers.h b/yt/yql/providers/yt/provider/yql_yt_helpers.h
index 61a1c4bab5..3a821b16d9 100644
--- a/yt/yql/providers/yt/provider/yql_yt_helpers.h
+++ b/yt/yql/providers/yt/provider/yql_yt_helpers.h
@@ -90,12 +90,12 @@ NNodes::TYtPath CopyOrTrivialMap(TPositionHandle pos, NNodes::TExprBase world, N
const TCopyOrTrivialMapOpts& opts);
bool IsOutputUsedMultipleTimes(const TExprNode& op, const TParentsMap& parentsMap);
-TMaybe<TVector<ui64>> EstimateDataSize(const TString& cluster, const TVector<TYtPathInfo::TPtr>& paths,
+TMaybe<TVector<ui64>> EstimateDataSize( const TVector<TYtPathInfo::TPtr>& paths,
const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx);
IGraphTransformer::TStatus TryEstimateDataSize(IYtGateway::TPathStatResult& result, TSet<TString>& requestedColumns,
- const TString& cluster, const TVector<TYtPathInfo::TPtr>& paths,
+ const TVector<TYtPathInfo::TPtr>& paths,
const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx);
-TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TString& cluster, const TYtPathInfo& pathInfo,
+TMaybe<NYT::TRichYPath> BuildYtPathForStatRequest(const TYtPathInfo& pathInfo,
const TMaybe<TVector<TString>>& overrideColumns, const TYtState& state, TExprContext& ctx);
NNodes::TYtSection UpdateInputFields(NNodes::TYtSection section, NNodes::TExprBase fields, TExprContext& ctx);
diff --git a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp
index 9f663d2897..f4942ab18d 100644
--- a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp
@@ -193,11 +193,11 @@ bool HasNonTrivialAny(const TEquiJoinLinkSettings& linkSettings, const TMapJoinS
TStatus UpdateInMemorySizeSetting(TMapJoinSettings& settings, TYtSection& inputSection, const TJoinLabels& labels,
const TYtJoinNodeOp& op, TExprContext& ctx, bool isLeft,
- const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TString& cluster,
+ const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state,
const TVector<TYtPathInfo::TPtr>& tables, bool mapJoinUseFlow)
{
ui64 size = 0;
- auto status = CalculateJoinLeafSize(size, settings, inputSection, op, ctx, isLeft, itemType, joinKeyList, state, cluster, tables);
+ auto status = CalculateJoinLeafSize(size, settings, inputSection, op, ctx, isLeft, itemType, joinKeyList, state, tables);
if (status != TStatus::Ok) {
return status;
}
@@ -230,11 +230,11 @@ TStatus UpdateInMemorySizeSetting(TMapJoinSettings& settings, TYtSection& inputS
TStatus UpdateInMemorySizeUsingBlocksSetting(TMapJoinSettings& settings, TYtSection& inputSection,
const TYtJoinNodeOp& op, TExprContext& ctx, bool isLeft,
- const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TString& cluster,
+ const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state,
const TVector<TYtPathInfo::TPtr>& tables)
{
ui64 dataSize = 0;
- auto status = CalculateJoinLeafSize(dataSize, settings, inputSection, op, ctx, isLeft, itemType, joinKeyList, state, cluster, tables);
+ auto status = CalculateJoinLeafSize(dataSize, settings, inputSection, op, ctx, isLeft, itemType, joinKeyList, state, tables);
if (status != TStatus::Ok) {
return status;
}
@@ -3050,7 +3050,7 @@ bool RewriteYtEmptyJoin(TYtEquiJoin equiJoin, const TJoinLabels& labels, TYtJoin
}
TStatus CollectJoinSideStats(ESizeStatCollectMode sizeMode, TJoinSideStats& stats, TYtSection& inputSection,
- const TYtState& state, const TString& cluster,
+ const TYtState& state,
const TVector<TYtPathInfo::TPtr>& tableInfo, const THashSet<TString>& joinKeys,
bool isCross, TMaybeNode<TCoLambda> premap, TExprContext& ctx)
{
@@ -3103,7 +3103,7 @@ TStatus CollectJoinSideStats(ESizeStatCollectMode sizeMode, TJoinSideStats& stat
}
IYtGateway::TPathStatResult pathStatResult;
- auto status = TryEstimateDataSizeChecked(pathStatResult, inputSection, cluster, tableInfo, {}, state, ctx);
+ auto status = TryEstimateDataSizeChecked(pathStatResult, inputSection, tableInfo, {}, state, ctx);
if (status.Level != TStatus::Ok) {
return status;
}
@@ -3271,8 +3271,6 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo
return TStatus::Repeat;
}
- auto cluster = TString{equiJoin.DataSink().Cluster().Value()};
-
TMapJoinSettings mapSettings;
TJoinSideStats leftStats;
TJoinSideStats rightStats;
@@ -3281,7 +3279,7 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo
if (allowLookupJoin) {
auto status = CollectStatsAndMapJoinSettings(ESizeStatCollectMode::RawSize, mapSettings, leftStats, rightStats,
leftTablesReady, leftTables, leftJoinKeys, rightTablesReady, rightTables, rightJoinKeys,
- &leftLeaf, &rightLeaf, *state, isCross, cluster, ctx);
+ &leftLeaf, &rightLeaf, *state, isCross, ctx);
if (status.Level != TStatus::Ok) {
return (status.Level == TStatus::Repeat) ? TStatus::Ok : status;
}
@@ -3352,7 +3350,7 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo
{
auto status = CollectStatsAndMapJoinSettings(ESizeStatCollectMode::ColumnarSize, mapSettings, leftStats, rightStats,
leftTablesReady, leftTables, leftJoinKeys, rightTablesReady, rightTables, rightJoinKeys,
- &leftLeaf, &rightLeaf, *state, isCross, cluster, ctx);
+ &leftLeaf, &rightLeaf, *state, isCross, ctx);
if (status.Level != TStatus::Ok) {
return (status.Level == TStatus::Repeat) ? TStatus::Ok : status;
}
@@ -3624,13 +3622,13 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo
bool mapJoinUseBlocks = state->Configuration->BlockMapJoin.Get().GetOrElse(state->Types->UseBlocks);
if (leftTablesReady) {
- auto status = UpdateInMemorySizeSetting(mapSettings, leftLeaf.Section, labels, op, ctx, true, leftItemType, leftJoinKeyList, state, cluster, leftTables, mapJoinUseFlow);
+ auto status = UpdateInMemorySizeSetting(mapSettings, leftLeaf.Section, labels, op, ctx, true, leftItemType, leftJoinKeyList, state, leftTables, mapJoinUseFlow);
if (status.Level != TStatus::Ok) {
return (status.Level == TStatus::Repeat) ? TStatus::Ok : status;
}
if (mapJoinUseBlocks) {
- auto status = UpdateInMemorySizeUsingBlocksSetting(mapSettings, leftLeaf.Section, op, ctx, true, leftItemType, leftJoinKeyList, state, cluster, leftTables);
+ auto status = UpdateInMemorySizeUsingBlocksSetting(mapSettings, leftLeaf.Section, op, ctx, true, leftItemType, leftJoinKeyList, state, leftTables);
if (status.Level != TStatus::Ok) {
return (status.Level == TStatus::Repeat) ? TStatus::Ok : status;
}
@@ -3638,13 +3636,13 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo
}
if (rightTablesReady) {
- auto status = UpdateInMemorySizeSetting(mapSettings, rightLeaf.Section, labels, op, ctx, false, rightItemType, rightJoinKeyList, state, cluster, rightTables, mapJoinUseFlow);
+ auto status = UpdateInMemorySizeSetting(mapSettings, rightLeaf.Section, labels, op, ctx, false, rightItemType, rightJoinKeyList, state, rightTables, mapJoinUseFlow);
if (status.Level != TStatus::Ok) {
return (status.Level == TStatus::Repeat) ? TStatus::Ok : status;
}
if (mapJoinUseBlocks) {
- auto status = UpdateInMemorySizeUsingBlocksSetting(mapSettings, rightLeaf.Section, op, ctx, false, rightItemType, rightJoinKeyList, state, cluster, rightTables);
+ auto status = UpdateInMemorySizeUsingBlocksSetting(mapSettings, rightLeaf.Section, op, ctx, false, rightItemType, rightJoinKeyList, state, rightTables);
if (status.Level != TStatus::Ok) {
return (status.Level == TStatus::Repeat) ? TStatus::Ok : status;
}
@@ -3947,9 +3945,6 @@ void CollectPossibleStarJoins(const TYtEquiJoin& equiJoin, TYtJoinNodeOp& op, co
rightJoinKeyList = BuildJoinKeyList(labels.Inputs[leftLeaf ? 1 : 0], *op.RightLabel);
}
-
- auto cluster = TString{equiJoin.DataSink().Cluster().Value()};
-
TMapJoinSettings mapSettings;
TJoinSideStats leftStats;
TJoinSideStats rightStats;
@@ -3958,7 +3953,7 @@ void CollectPossibleStarJoins(const TYtEquiJoin& equiJoin, TYtJoinNodeOp& op, co
bool isCross = false;
auto status = CollectStatsAndMapJoinSettings(ESizeStatCollectMode::NoSize, mapSettings, leftStats, rightStats,
leftTablesReady, leftTables, leftJoinKeys, rightTablesReady, rightTables, rightJoinKeys,
- leftLeaf, rightLeaf, *state, isCross, cluster, ctx);
+ leftLeaf, rightLeaf, *state, isCross, ctx);
switch (status.Level) {
case TStatus::Error:
@@ -4866,12 +4861,12 @@ EStarRewriteStatus RewriteYtEquiJoinStar(TYtEquiJoin equiJoin, TYtJoinNodeOp& op
} // namespace
-IGraphTransformer::TStatus TryEstimateDataSizeChecked(IYtGateway::TPathStatResult& result, TYtSection& inputSection, const TString& cluster,
+IGraphTransformer::TStatus TryEstimateDataSizeChecked(IYtGateway::TPathStatResult& result, TYtSection& inputSection,
const TVector<TYtPathInfo::TPtr>& paths, const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx)
{
result = IYtGateway::TPathStatResult();
if (GetJoinCollectColumnarStatisticsMode(*state.Configuration) == EJoinCollectColumnarStatisticsMode::Sync) {
- auto syncResult = EstimateDataSize(cluster, paths, columns, state, ctx);
+ auto syncResult = EstimateDataSize(paths, columns, state, ctx);
if (!syncResult) {
return IGraphTransformer::TStatus::Error;
}
@@ -4881,7 +4876,7 @@ IGraphTransformer::TStatus TryEstimateDataSizeChecked(IYtGateway::TPathStatResul
}
TSet<TString> requestedColumns;
- auto status = TryEstimateDataSize(result, requestedColumns, cluster, paths, columns, state, ctx);
+ auto status = TryEstimateDataSize(result, requestedColumns, paths, columns, state, ctx);
auto settings = inputSection.Settings().Ptr();
if (status == TStatus::Repeat) {
bool hasStatColumns = NYql::HasSetting(inputSection.Settings().Ref(), EYtSettingType::StatColumns);
@@ -4935,7 +4930,7 @@ TStatus CollectStatsAndMapJoinSettings(ESizeStatCollectMode sizeMode, TMapJoinSe
bool leftTablesReady, const TVector<TYtPathInfo::TPtr>& leftTables, const THashSet<TString>& leftJoinKeys,
bool rightTablesReady, const TVector<TYtPathInfo::TPtr>& rightTables, const THashSet<TString>& rightJoinKeys,
TYtJoinNodeLeaf* leftLeaf, TYtJoinNodeLeaf* rightLeaf, const TYtState& state, bool isCross,
- TString cluster, TExprContext& ctx)
+ TExprContext& ctx)
{
mapSettings = {};
leftStats = {};
@@ -4943,7 +4938,7 @@ TStatus CollectStatsAndMapJoinSettings(ESizeStatCollectMode sizeMode, TMapJoinSe
if (leftLeaf) {
auto premap = GetPremapLambda(*leftLeaf);
- auto joinSideStatus = CollectJoinSideStats(leftTablesReady ? sizeMode : ESizeStatCollectMode::NoSize, leftStats, leftLeaf->Section, state, cluster,
+ auto joinSideStatus = CollectJoinSideStats(leftTablesReady ? sizeMode : ESizeStatCollectMode::NoSize, leftStats, leftLeaf->Section, state,
leftTables, leftJoinKeys, isCross, premap, ctx);
if (joinSideStatus.Level != TStatus::Ok) {
return joinSideStatus;
@@ -4959,7 +4954,7 @@ TStatus CollectStatsAndMapJoinSettings(ESizeStatCollectMode sizeMode, TMapJoinSe
if (rightLeaf) {
auto premap = GetPremapLambda(*rightLeaf);
- auto joinSideStatus = CollectJoinSideStats(rightTablesReady ? sizeMode : ESizeStatCollectMode::NoSize, rightStats, rightLeaf->Section, state, cluster,
+ auto joinSideStatus = CollectJoinSideStats(rightTablesReady ? sizeMode : ESizeStatCollectMode::NoSize, rightStats, rightLeaf->Section, state,
rightTables, rightJoinKeys, isCross, premap, ctx);
if (joinSideStatus.Level != TStatus::Ok) {
return joinSideStatus;
@@ -4983,7 +4978,7 @@ TStatus CollectStatsAndMapJoinSettings(ESizeStatCollectMode sizeMode, TMapJoinSe
TStatus CalculateJoinLeafSize(ui64& result, TMapJoinSettings& settings, TYtSection& inputSection,
const TYtJoinNodeOp& op, TExprContext& ctx, bool isLeft,
- const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TString& cluster,
+ const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state,
const TVector<TYtPathInfo::TPtr>& tables)
{
result = isLeft ? settings.LeftSize : settings.RightSize;
@@ -4992,7 +4987,7 @@ TStatus CalculateJoinLeafSize(ui64& result, TMapJoinSettings& settings, TYtSecti
if (!needPayload && !op.JoinKind->IsAtom("Cross")) {
if (joinKeyList.size() < itemType->GetSize()) {
IYtGateway::TPathStatResult pathStatResult;
- auto status = TryEstimateDataSizeChecked(pathStatResult, inputSection, cluster, tables, joinKeyList, *state, ctx);
+ auto status = TryEstimateDataSizeChecked(pathStatResult, inputSection, tables, joinKeyList, *state, ctx);
if (status.Level != TStatus::Ok) {
return status;
}
diff --git a/yt/yql/providers/yt/provider/yql_yt_join_impl.h b/yt/yql/providers/yt/provider/yql_yt_join_impl.h
index d8702fa43e..80f79f4d3b 100644
--- a/yt/yql/providers/yt/provider/yql_yt_join_impl.h
+++ b/yt/yql/providers/yt/provider/yql_yt_join_impl.h
@@ -72,12 +72,12 @@ TYtJoinNodeOp::TPtr ImportYtEquiJoin(TYtEquiJoin equiJoin, TExprContext& ctx);
IGraphTransformer::TStatus RewriteYtEquiJoinLeaves(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx);
IGraphTransformer::TStatus RewriteYtEquiJoin(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx);
TMaybeNode<TExprBase> ExportYtEquiJoin(TYtEquiJoin equiJoin, const TYtJoinNodeOp& op, TExprContext& ctx, const TYtState::TPtr& state);
-TYtJoinNodeOp::TPtr OrderJoins(TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, const TString& cluster, TExprContext& ctx, bool debug = false);
+TYtJoinNodeOp::TPtr OrderJoins(TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, TExprContext& ctx, bool debug = false);
struct IBaseOptimizerNode;
struct IProviderContext;
-void BuildOptimizerJoinTree(TYtState::TPtr state, const TString& cluster, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TOptimizerLinkSettings& settings, TYtJoinNodeOp::TPtr op, TExprContext& ctx);
+void BuildOptimizerJoinTree(TYtState::TPtr state, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TOptimizerLinkSettings& settings, TYtJoinNodeOp::TPtr op, TExprContext& ctx);
TYtJoinNode::TPtr BuildYtJoinTree(std::shared_ptr<IBaseOptimizerNode> node, TExprContext& ctx, TPositionHandle pos);
bool AreSimilarTrees(TYtJoinNode::TPtr node1, TYtJoinNode::TPtr node2);
@@ -89,7 +89,7 @@ IGraphTransformer::TStatus CollectPathsAndLabelsReady(
IGraphTransformer::TStatus CalculateJoinLeafSize(ui64& result, TMapJoinSettings& settings, TYtSection& inputSection,
const TYtJoinNodeOp& op, TExprContext& ctx, bool isLeft,
- const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TString& cluster,
+ const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state,
const TVector<TYtPathInfo::TPtr>& tables);
enum class ESizeStatCollectMode {
@@ -115,9 +115,9 @@ IGraphTransformer::TStatus CollectStatsAndMapJoinSettings(ESizeStatCollectMode s
bool leftTablesReady, const TVector<TYtPathInfo::TPtr>& leftTables, const THashSet<TString>& leftJoinKeys,
bool rightTablesReady, const TVector<TYtPathInfo::TPtr>& rightTables, const THashSet<TString>& rightJoinKeys,
TYtJoinNodeLeaf* leftLeaf, TYtJoinNodeLeaf* rightLeaf, const TYtState& state, bool isCross,
- TString cluster, TExprContext& ctx);
+ TExprContext& ctx);
-IGraphTransformer::TStatus TryEstimateDataSizeChecked(IYtGateway::TPathStatResult& result, TYtSection& inputSection, const TString& cluster,
+IGraphTransformer::TStatus TryEstimateDataSizeChecked(IYtGateway::TPathStatResult& result, TYtSection& inputSection,
const TVector<TYtPathInfo::TPtr>& paths, const TMaybe<TVector<TString>>& columns, const TYtState& state, TExprContext& ctx);
ui64 CalcInMemorySizeNoCrossJoin(const TJoinLabel& label, const TYtJoinNodeOp& op, const TMapJoinSettings& settings, bool isLeft,
diff --git a/yt/yql/providers/yt/provider/yql_yt_join_reorder.cpp b/yt/yql/providers/yt/provider/yql_yt_join_reorder.cpp
index a195a8f5d0..ca808b621b 100644
--- a/yt/yql/providers/yt/provider/yql_yt_join_reorder.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_join_reorder.cpp
@@ -58,12 +58,10 @@ public:
TJoinReorderer(
TYtJoinNodeOp::TPtr op,
const TYtState::TPtr& state,
- const TString& cluster,
TExprContext& ctx,
bool debug = false)
: Root(op)
, State(state)
- , Cluster(cluster)
, Ctx(ctx)
, Debug(debug)
{
@@ -78,7 +76,7 @@ public:
std::shared_ptr<IBaseOptimizerNode> tree;
TOptimizerLinkSettings linkSettings;
std::shared_ptr<IProviderContext> providerCtx;
- BuildOptimizerJoinTree(State, Cluster, tree, providerCtx, linkSettings, Root, Ctx);
+ BuildOptimizerJoinTree(State, tree, providerCtx, linkSettings, Root, Ctx);
auto ytCtx = std::static_pointer_cast<TYtProviderContext>(providerCtx);
std::function<void(const TString& str)> log;
@@ -139,7 +137,6 @@ public:
private:
TYtJoinNodeOp::TPtr Root;
const TYtState::TPtr& State;
- TString Cluster;
TExprContext& Ctx;
bool Debug;
};
@@ -177,9 +174,8 @@ class TOptimizerTreeBuilder
{
public:
TOptimizerLinkSettings LinkSettings;
- TOptimizerTreeBuilder(TYtState::TPtr state, const TString& cluster, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TYtJoinNodeOp::TPtr inputTree, TExprContext& ctx)
+ TOptimizerTreeBuilder(TYtState::TPtr state, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TYtJoinNodeOp::TPtr inputTree, TExprContext& ctx)
: State(state)
- , Cluster(cluster)
, Tree(tree)
, OutProviderCtx(providerCtx)
, InputTree(inputTree)
@@ -258,7 +254,7 @@ private:
}
TRelSizeInfo leftSizeInfo;
TRelSizeInfo rightSizeInfo;
- PopulateJoinStrategySizeInfo(leftSizeInfo, rightSizeInfo, State, Cluster, Ctx, op);
+ PopulateJoinStrategySizeInfo(leftSizeInfo, rightSizeInfo, State, Ctx, op);
auto left = ProcessNode(op->Left, leftSizeInfo);
auto right = ProcessNode(op->Right, rightSizeInfo);
@@ -374,7 +370,7 @@ private:
TSet<TString> requestedColumns;
IYtGateway::TPathStatResult result;
- auto status = TryEstimateDataSize(result, requestedColumns, Cluster, paths, columns, *State, Ctx);
+ auto status = TryEstimateDataSize(result, requestedColumns, paths, columns, *State, Ctx);
YQL_ENSURE(status != IGraphTransformer::TStatus::Error);
if (status != IGraphTransformer::TStatus::Ok) {
YQL_CLOG(WARN, ProviderYt) << "Unable to read path stats that must be already present in cache";
@@ -434,7 +430,6 @@ private:
}
TYtState::TPtr State;
- const TString Cluster;
std::shared_ptr<IBaseOptimizerNode>& Tree;
std::shared_ptr<IProviderContext>& OutProviderCtx;
THashMap<TString, THashSet<TString>> RelJoinColumns;
@@ -517,9 +512,9 @@ bool AreSimilarTrees(TYtJoinNode::TPtr node1, TYtJoinNode::TPtr node2) {
}
}
-void BuildOptimizerJoinTree(TYtState::TPtr state, const TString& cluster, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TOptimizerLinkSettings& linkSettings, TYtJoinNodeOp::TPtr op, TExprContext& ctx)
+void BuildOptimizerJoinTree(TYtState::TPtr state, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TOptimizerLinkSettings& linkSettings, TYtJoinNodeOp::TPtr op, TExprContext& ctx)
{
- TOptimizerTreeBuilder builder(state, cluster, tree, providerCtx, op, ctx);
+ TOptimizerTreeBuilder builder(state, tree, providerCtx, op, ctx);
builder.Do();
linkSettings = builder.LinkSettings;
}
@@ -529,13 +524,13 @@ TYtJoinNode::TPtr BuildYtJoinTree(std::shared_ptr<IBaseOptimizerNode> node, TExp
return BuildYtJoinTree(node, scope, ctx, pos);
}
-TYtJoinNodeOp::TPtr OrderJoins(TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, const TString& cluster, TExprContext& ctx, bool debug)
+TYtJoinNodeOp::TPtr OrderJoins(TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, TExprContext& ctx, bool debug)
{
if (state->Types->CostBasedOptimizer == ECostBasedOptimizerType::Disable || op->CostBasedOptPassed) {
return op;
}
- auto result = TJoinReorderer(op, state, cluster, ctx, debug).Do();
+ auto result = TJoinReorderer(op, state, ctx, debug).Do();
if (!debug && AreSimilarTrees(result, op)) {
return op;
}
diff --git a/yt/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp b/yt/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp
index 789938e02f..525c967853 100644
--- a/yt/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp
@@ -29,22 +29,21 @@ public:
YQL_ENSURE(PathStatResults.empty());
}
- TNodeMap<IYtGateway::TPathStatResult> PullPathStatResults() {
- TNodeMap<IYtGateway::TPathStatResult> results;
+ TNodeMap<TVector<IYtGateway::TPathStatResult>> PullPathStatResults() {
+ TNodeMap<TVector<IYtGateway::TPathStatResult>> results;
TGuard<TMutex> guard(Lock);
results.swap(PathStatResults);
return results;
}
- void MarkReady(TExprNode* node, const IYtGateway::TPathStatResult& result) {
+ void AddResult(TExprNode* node, const IYtGateway::TPathStatResult& result) {
TGuard<TMutex> guard(Lock);
- YQL_ENSURE(PathStatResults.count(node) == 0);
- PathStatResults[node] = result;
+ PathStatResults[node].push_back(result);
}
private:
mutable TMutex Lock;
- TNodeMap<IYtGateway::TPathStatResult> PathStatResults;
+ TNodeMap<TVector<IYtGateway::TPathStatResult>> PathStatResults;
};
class TYtLoadColumnarStatsTransformer : public TGraphTransformerBase {
@@ -65,7 +64,7 @@ private:
output = input;
PathStatusState->EnsureNoInflightRequests();
- TVector<std::pair<IYtGateway::TPathStatOptions, TExprNode*>> pathStatArgs;
+ TVector<std::pair<TVector<IYtGateway::TPathStatOptions>, TExprNode*>> pathStatArgs;
bool hasError = false;
TNodeOnNodeOwnedMap sectionRewrites;
VisitExpr(input, [this, &pathStatArgs, &hasError, &sectionRewrites, &ctx](const TExprNode::TPtr& node) {
@@ -75,10 +74,9 @@ private:
if (NYql::HasSetting(section.Settings().Ref(), EYtSettingType::StatColumns)) {
auto columnList = NYql::GetSettingAsColumnList(section.Settings().Ref(), EYtSettingType::StatColumns);
- TMaybe<TString> cluster;
- TVector<IYtGateway::TPathStatReq> pathStatReqs;
+ TMap<TString, TVector<IYtGateway::TPathStatReq>> pathStatReqsByCluster;
size_t idx = 0;
- ui64 totalChunkCount = 0;
+ THashMap<TString, ui64> totalChunkCountByCluster;
for (auto path: section.Paths()) {
bool hasStat = false;
if (path.Table().Maybe<TYtTable>().Stat().Maybe<TYtStat>()) {
@@ -108,30 +106,18 @@ private:
}
TYtPathInfo pathInfo(path);
+ const TString cluster = pathInfo.Table->Cluster;
+ YQL_ENSURE(cluster);
YQL_ENSURE(pathInfo.Table->Stat);
- totalChunkCount += pathInfo.Table->Stat->ChunkCount;
+ totalChunkCountByCluster[cluster] += pathInfo.Table->Stat->ChunkCount;
- TString currCluster;
- if (auto ytTable = path.Table().Maybe<TYtTable>()) {
- currCluster = TString{ytTable.Cast().Cluster().Value()};
- } else {
- currCluster = TString{GetOutputOp(path.Table().Cast<TYtOutput>()).DataSink().Cluster().Value()};
- }
- YQL_ENSURE(currCluster);
-
- if (cluster) {
- YQL_ENSURE(currCluster == *cluster);
- } else {
- cluster = currCluster;
- }
-
- auto ytPath = BuildYtPathForStatRequest(*cluster, pathInfo, columnList, *State_, ctx);
+ auto ytPath = BuildYtPathForStatRequest(pathInfo, columnList, *State_, ctx);
if (!ytPath) {
hasError = true;
return false;
}
- pathStatReqs.push_back(
+ pathStatReqsByCluster[cluster].push_back(
IYtGateway::TPathStatReq()
.Path(*ytPath)
.IsTemp(pathInfo.Table->IsTemp)
@@ -142,21 +128,28 @@ private:
++idx;
}
- bool requestExtendedStats = maxChunkCountExtendedStats &&
- (*maxChunkCountExtendedStats == 0 || totalChunkCount <= *maxChunkCountExtendedStats);
-
- if (pathStatReqs) {
- auto pathStatOptions = IYtGateway::TPathStatOptions(State_->SessionId)
- .Cluster(*cluster)
+ TVector<IYtGateway::TPathStatOptions> pathStatOptions;
+ for (auto& [cluster, pathStatReqs] : pathStatReqsByCluster) {
+ auto itCount = totalChunkCountByCluster.find(cluster);
+ YQL_ENSURE(itCount != totalChunkCountByCluster.end());
+ const ui64 totalChunkCount = itCount->second;
+ bool requestExtendedStats = maxChunkCountExtendedStats &&
+ (*maxChunkCountExtendedStats == 0 || totalChunkCount <= *maxChunkCountExtendedStats);
+ YQL_ENSURE(!pathStatReqs.empty());
+ auto options = IYtGateway::TPathStatOptions(State_->SessionId)
+ .Cluster(cluster)
.Paths(pathStatReqs)
.Config(State_->Configuration->Snapshot())
.Extended(requestExtendedStats);
-
- auto tryResult = State_->Gateway->TryPathStat(IYtGateway::TPathStatOptions(pathStatOptions));
+ auto tryResult = State_->Gateway->TryPathStat(IYtGateway::TPathStatOptions(options));
if (!tryResult.Success()) {
- pathStatArgs.emplace_back(std::move(pathStatOptions), node.Get());
+ pathStatOptions.push_back(std::move(options));
}
}
+
+ if (pathStatOptions) {
+ pathStatArgs.emplace_back(std::move(pathStatOptions), node.Get());
+ }
}
}
return !hasError;
@@ -177,16 +170,20 @@ private:
}
TVector<NThreading::TFuture<void>> futures;
- YQL_CLOG(INFO, ProviderYt) << "Starting " << pathStatArgs.size() << " requests for columnar stats";
+ size_t reqCount = 0;
+ for (const auto& arg : pathStatArgs) {
+ reqCount += arg.first.size();
+ }
+ YQL_CLOG(INFO, ProviderYt) << "Starting " << reqCount << " requests for columnar stats";
for (auto& arg : pathStatArgs) {
- IYtGateway::TPathStatOptions& options = arg.first;
+ TVector<IYtGateway::TPathStatOptions>& options = arg.first;
TExprNode* node = arg.second;
-
- auto future = State_->Gateway->PathStat(std::move(options));
-
- futures.push_back(future.Apply([pathStatusState = PathStatusState, node](const NThreading::TFuture<IYtGateway::TPathStatResult>& result) {
- pathStatusState->MarkReady(node, result.GetValueSync());
- }));
+ for (auto& opt : options) {
+ auto future = State_->Gateway->PathStat(std::move(opt));
+ futures.push_back(future.Apply([pathStatusState = PathStatusState, node](const NThreading::TFuture<IYtGateway::TPathStatResult>& result) {
+ pathStatusState->AddResult(node, result.GetValueSync());
+ }));
+ }
}
AsyncFuture = WaitExceptionOrAll(futures);
@@ -201,26 +198,32 @@ private:
TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override {
output = input;
- TNodeMap<IYtGateway::TPathStatResult> results = PathStatusState->PullPathStatResults();
+ TNodeMap<TVector<IYtGateway::TPathStatResult>> results = PathStatusState->PullPathStatResults();
YQL_ENSURE(!results.empty());
+ size_t applied = 0;
+ TStatus status = TStatus::Repeat;
for (auto& item : results) {
auto& node = item.first;
- auto& result = item.second;
- if (!result.Success()) {
- TIssueScopeGuard issueScope(ctx.IssueManager, [&]() {
- return MakeIntrusive<TIssue>(
- ctx.GetPosition(node->Pos()),
- TStringBuilder() << "Execution of node: " << node->Content()
- );
- });
- result.ReportIssues(ctx.IssueManager);
- return TStatus::Error;
+ auto& batch = item.second;
+ TIssueScopeGuard issueScope(ctx.IssueManager, [&]() {
+ return MakeIntrusive<TIssue>(
+ ctx.GetPosition(node->Pos()),
+ TStringBuilder() << "Execution of node: " << node->Content()
+ );
+ });
+ for (auto& result : batch) {
+ if (!result.Success()) {
+ result.ReportIssues(ctx.IssueManager);
+ status = status.Combine(TStatus::Error);
+ }
+ ++applied;
}
}
- YQL_CLOG(INFO, ProviderYt) << "Applied " << results.size() << " results of columnar stats";
- return TStatus::Repeat;
+ YQL_CLOG(INFO, ProviderYt) << "Applied " << applied << " results of columnar stats "
+ << (status == TStatus::Error ? "with errors" : "successfully");
+ return status;
}
TYtState::TPtr State_;
diff --git a/yt/yql/providers/yt/provider/yql_yt_optimize.cpp b/yt/yql/providers/yt/provider/yql_yt_optimize.cpp
index cd0b5e2ade..639307c92e 100644
--- a/yt/yql/providers/yt/provider/yql_yt_optimize.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_optimize.cpp
@@ -533,6 +533,9 @@ IGraphTransformer::TStatus UpdateTableContentMemoryUsage(const TExprNode::TPtr&
if (info->Table->Meta->IsDynamic) {
useItemsCount = false;
}
+ if (!info->Table->Cluster) {
+ info->Table->Cluster = maybeRead.Cast().DataSource().Cluster().StringValue();
+ }
records.push_back(tableRecord);
tableInfos.push_back(info);
}
@@ -553,7 +556,7 @@ IGraphTransformer::TStatus UpdateTableContentMemoryUsage(const TExprNode::TPtr&
}
}
if (!hasNotCalculated && !tableInfos.empty()) {
- if (auto dataSizes = EstimateDataSize(TString{maybeRead.Cast().DataSource().Cluster().Value()}, tableInfos, Nothing(), *state, ctx)) {
+ if (auto dataSizes = EstimateDataSize(tableInfos, Nothing(), *state, ctx)) {
YQL_ENSURE(dataSizes->size() == records.size());
for (size_t i: xrange(records.size())) {
for (auto& factor: factors) {