aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDmitry O <alephonea@users.noreply.github.com>2024-09-10 15:36:56 +0300
committerGitHub <noreply@github.com>2024-09-10 15:36:56 +0300
commita7d2186ccb9393cd6d569baaa3579dbfa4bd2303 (patch)
tree386544502edcdc31f570660a4dc905b9aebf6ec2
parentf11c0b20957bbb676ef18ca71b014c4d102de83d (diff)
downloadydb-a7d2186ccb9393cd6d569baaa3579dbfa4bd2303.tar.gz
HLL in YT statistics (#8184)
-rw-r--r--ydb/library/yql/providers/yt/common/yql_yt_settings.cpp1
-rw-r--r--ydb/library/yql/providers/yt/common/yql_yt_settings.h1
-rw-r--r--ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp58
-rw-r--r--ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h12
-rw-r--r--ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp84
-rw-r--r--ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp15
-rw-r--r--ydb/library/yql/providers/yt/provider/ut/yql_yt_cbo_ut.cpp48
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_gateway.h8
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp130
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_join_impl.h8
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_join_reorder.cpp144
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp9
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_provider_context.h17
13 files changed, 446 insertions, 89 deletions
diff --git a/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp b/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp
index 7ce041e525..eab8125f67 100644
--- a/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp
+++ b/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp
@@ -484,6 +484,7 @@ TYtConfiguration::TYtConfiguration()
});
REGISTER_SETTING(*this, MinColumnGroupSize).Lower(2);
REGISTER_SETTING(*this, MaxColumnGroups);
+ REGISTER_SETTING(*this, ExtendedStatsMaxChunkCount);
}
EReleaseTempDataMode GetReleaseTempDataMode(const TYtSettings& settings) {
diff --git a/ydb/library/yql/providers/yt/common/yql_yt_settings.h b/ydb/library/yql/providers/yt/common/yql_yt_settings.h
index 0dc87fa39e..d39c1bdb4a 100644
--- a/ydb/library/yql/providers/yt/common/yql_yt_settings.h
+++ b/ydb/library/yql/providers/yt/common/yql_yt_settings.h
@@ -280,6 +280,7 @@ struct TYtSettings {
NCommon::TConfSetting<EColumnGroupMode, false> ColumnGroupMode;
NCommon::TConfSetting<ui16, false> MinColumnGroupSize;
NCommon::TConfSetting<ui16, false> MaxColumnGroups;
+ NCommon::TConfSetting<ui64, false> ExtendedStatsMaxChunkCount;
};
EReleaseTempDataMode GetReleaseTempDataMode(const TYtSettings& settings);
diff --git a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp
index 7b81c99ce8..485fe97024 100644
--- a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp
+++ b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp
@@ -101,9 +101,9 @@ TMaybe<ui64> TTransactionCache::TEntry::GetColumnarStat(NYT::TRichYPath ytPath)
auto guard = Guard(Lock_);
if (auto p = StatisticsCache.FindPtr(NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text))) {
- ui64 sum = p->LegacyChunksDataWeight;
+ ui64 sum = p->ColumnarStat.LegacyChunksDataWeight;
for (auto& column: columns) {
- if (auto c = p->ColumnDataWeight.FindPtr(column)) {
+ if (auto c = p->ColumnarStat.ColumnDataWeight.FindPtr(column)) {
sum += *c;
} else {
return Nothing();
@@ -114,30 +114,66 @@ TMaybe<ui64> TTransactionCache::TEntry::GetColumnarStat(NYT::TRichYPath ytPath)
return Nothing();
}
+TMaybe<NYT::TTableColumnarStatistics> TTransactionCache::TEntry::GetExtendedColumnarStat(NYT::TRichYPath ytPath) const {
+ TVector<TString> columns(std::move(ytPath.Columns_->Parts_));
+ ytPath.Columns_.Clear();
+ auto cacheKey = NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text);
+
+ auto guard = Guard(Lock_);
+ auto p = StatisticsCache.FindPtr(cacheKey);
+ if (!p) {
+ return Nothing();
+ }
+
+ NYT::TTableColumnarStatistics res;
+ for (auto& column: columns) {
+ if (p->ExtendedStatColumns.count(column) == 0) {
+ return Nothing();
+ }
+ if (auto c = p->ColumnarStat.ColumnDataWeight.FindPtr(column)) {
+ res.ColumnDataWeight[column] = *c;
+ }
+ if (auto c = p->ColumnarStat.ColumnEstimatedUniqueCounts.FindPtr(column)) {
+ res.ColumnEstimatedUniqueCounts[column] = *c;
+ }
+ }
+ return res;
+}
+
void TTransactionCache::TEntry::UpdateColumnarStat(NYT::TRichYPath ytPath, ui64 size) {
YQL_ENSURE(ytPath.Columns_.Defined());
TVector<TString> columns(std::move(ytPath.Columns_->Parts_));
ytPath.Columns_.Clear();
+ auto cacheKey = NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text);
auto guard = Guard(Lock_);
- NYT::TTableColumnarStatistics& cacheColumnStat = StatisticsCache[NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text)];
- cacheColumnStat.LegacyChunksDataWeight = size;
- for (auto& c: cacheColumnStat.ColumnDataWeight) {
+ auto& cacheEntry = StatisticsCache[cacheKey];
+ cacheEntry.ColumnarStat.LegacyChunksDataWeight = size;
+ for (auto& c: cacheEntry.ColumnarStat.ColumnDataWeight) {
c.second = 0;
}
for (auto& c: columns) {
- cacheColumnStat.ColumnDataWeight[c] = 0;
+ cacheEntry.ColumnarStat.ColumnDataWeight[c] = 0;
}
}
-void TTransactionCache::TEntry::UpdateColumnarStat(NYT::TRichYPath ytPath, const NYT::TTableColumnarStatistics& columnStat) {
+void TTransactionCache::TEntry::UpdateColumnarStat(NYT::TRichYPath ytPath, const NYT::TTableColumnarStatistics& columnStat, bool extended) {
+ TVector<TString> columns(std::move(ytPath.Columns_->Parts_));
ytPath.Columns_.Clear();
auto guard = Guard(Lock_);
- NYT::TTableColumnarStatistics& cacheColumnStat = StatisticsCache[NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text)];
- cacheColumnStat.LegacyChunksDataWeight = columnStat.LegacyChunksDataWeight;
- cacheColumnStat.TimestampTotalWeight = columnStat.TimestampTotalWeight;
+ auto& cacheEntry = StatisticsCache[NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text)];
+ if (extended) {
+ std::copy(columns.begin(), columns.end(), std::inserter(cacheEntry.ExtendedStatColumns, cacheEntry.ExtendedStatColumns.end()));
+ }
+ cacheEntry.ColumnarStat.LegacyChunksDataWeight = columnStat.LegacyChunksDataWeight;
+ cacheEntry.ColumnarStat.TimestampTotalWeight = columnStat.TimestampTotalWeight;
for (auto& c: columnStat.ColumnDataWeight) {
- cacheColumnStat.ColumnDataWeight[c.first] = c.second;
+ cacheEntry.ColumnarStat.ColumnDataWeight[c.first] = c.second;
+ }
+ if (extended) {
+ for (auto& c : columnStat.ColumnEstimatedUniqueCounts) {
+ cacheEntry.ColumnarStat.ColumnEstimatedUniqueCounts[c.first] = c.second;
+ }
}
}
diff --git a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h
index 27cc98a12e..945902416d 100644
--- a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h
+++ b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h
@@ -47,7 +47,6 @@ public:
bool KeepTables = false;
THashMap<std::pair<TString, ui32>, std::tuple<TString, NYT::TTransactionId, ui64>> Snapshots; // {tablepath, epoch} -> {table_id, transaction_id, revision}
NYT::TNode TransactionSpec;
- THashMap<TString, NYT::TTableColumnarStatistics> StatisticsCache;
THashMap<TString, TString> BinarySnapshots; // remote path -> snapshot path
NYT::ITransactionPtr BinarySnapshotTx;
THashMap<TString, NYT::ITransactionPtr> CheckpointTxs;
@@ -114,8 +113,10 @@ public:
void CompleteWriteTx(const NYT::TTransactionId& id, bool abort);
TMaybe<ui64> GetColumnarStat(NYT::TRichYPath ytPath) const;
+ TMaybe<NYT::TTableColumnarStatistics> GetExtendedColumnarStat(NYT::TRichYPath ytPath) const;
+
void UpdateColumnarStat(NYT::TRichYPath ytPath, ui64 size);
- void UpdateColumnarStat(NYT::TRichYPath ytPath, const NYT::TTableColumnarStatistics& columnStat);
+ void UpdateColumnarStat(NYT::TRichYPath ytPath, const NYT::TTableColumnarStatistics& columnStat, bool extended = false);
std::pair<TString, NYT::TTransactionId> GetBinarySnapshot(TString remoteTmpFolder, const TString& md5, const TString& localPath, TDuration expirationInterval);
@@ -124,6 +125,13 @@ public:
using TPtr = TIntrusivePtr<TEntry>;
private:
+ struct TStatisticsCacheEntry {
+ std::unordered_set<TString> ExtendedStatColumns;
+ NYT::TTableColumnarStatistics ColumnarStat;
+ };
+
+ THashMap<TString, TStatisticsCacheEntry> StatisticsCache;
+
void DeleteAtFinalizeUnlocked(const TString& table, bool isInternal);
bool CancelDeleteAtFinalizeUnlocked(const TString& table, bool isInternal);
void DoRemove(const TString& table);
diff --git a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp
index b6e11c76cc..782455030b 100644
--- a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp
+++ b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp
@@ -195,6 +195,27 @@ inline TType OptionFromNode(const NYT::TNode& value) {
}
}
+void PopulatePathStatResult(IYtGateway::TPathStatResult& out, int index, NYT::TTableColumnarStatistics& extendedStat) {
+ for (const auto& entry : extendedStat.ColumnDataWeight) {
+ out.DataSize[index] += entry.second;
+ }
+ out.Extended[index] = IYtGateway::TPathStatResult::TExtendedResult{
+ .DataWeight = extendedStat.ColumnDataWeight,
+ .EstimatedUniqueCounts = extendedStat.ColumnEstimatedUniqueCounts
+ };
+}
+
+TString DebugPath(NYT::TRichYPath path) {
+ constexpr int maxDebugColumns = 20;
+ if (!path.Columns_ || std::ssize(path.Columns_->Parts_) <= maxDebugColumns) {
+ return NYT::NodeToCanonicalYsonString(NYT::PathToNode(path), NYT::NYson::EYsonFormat::Text);
+ }
+ int numColumns = std::ssize(path.Columns_->Parts_);
+ path.Columns_->Parts_.erase(path.Columns_->Parts_.begin() + maxDebugColumns, path.Columns_->Parts_.end());
+ path.Columns_->Parts_.push_back("...");
+ return NYT::NodeToCanonicalYsonString(NYT::PathToNode(path), NYT::NYson::EYsonFormat::Text) + " (" + std::to_string(numColumns) + " columns)";
+}
+
} // unnamed
///////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -4495,6 +4516,7 @@ private:
try {
TPathStatResult res;
res.DataSize.resize(execCtx->Options_.Paths().size(), 0);
+ res.Extended.resize(execCtx->Options_.Paths().size());
auto entry = execCtx->GetOrCreateEntry();
auto tx = entry->Tx;
@@ -4502,6 +4524,7 @@ private:
const NYT::EOptimizeForAttr tmpOptimizeFor = execCtx->Options_.Config()->OptimizeFor.Get(execCtx->Cluster_).GetOrElse(NYT::EOptimizeForAttr::OF_LOOKUP_ATTR);
TVector<NYT::TRichYPath> ytPaths(Reserve(execCtx->Options_.Paths().size()));
TVector<size_t> pathMap;
+ bool extended = execCtx->Options_.Extended();
auto extractSysColumns = [] (NYT::TRichYPath& ytPath) -> TVector<TString> {
TVector<TString> res;
@@ -4545,16 +4568,19 @@ private:
YQL_CLOG(INFO, ProviderYt) << "Adding stat for " << col << ": " << size << " (virtual)";
}
}
- if (auto val = entry->GetColumnarStat(ytPath)) {
- res.DataSize[i] += *val;
- YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << ": " << res.DataSize[i] << " (from cache)";
+ TMaybe<ui64> cachedStat;
+ TMaybe<NYT::TTableColumnarStatistics> cachedExtendedStat;
+ if (!extended && (cachedStat = entry->GetColumnarStat(ytPath))) {
+ res.DataSize[i] += *cachedStat;
+ YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << ": " << res.DataSize[i] << " (from cache, extended: false)";
+ } else if (extended && (cachedExtendedStat = entry->GetExtendedColumnarStat(ytPath))) {
+ PopulatePathStatResult(res, i, *cachedExtendedStat);
+ YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (from cache, extended: true)";
} else if (onlyCached) {
- YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " is missing in cache - sync path stat failed";
+ YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " is missing in cache - sync path stat failed (extended: " << extended << ")";
return res;
- } else if (NYT::EOptimizeForAttr::OF_SCAN_ATTR == tmpOptimizeFor) {
- pathMap.push_back(i);
- ytPaths.push_back(ytPath);
- } else {
+ } else if (NYT::EOptimizeForAttr::OF_SCAN_ATTR != tmpOptimizeFor && !extended) {
+
// Use entire table size for lookup tables (YQL-7257)
if (attrs.IsUndefined()) {
attrs = tx->Get(ytPath.Path_ + "/@", NYT::TGetOptions().AttributeFilter(
@@ -4566,7 +4592,10 @@ private:
auto size = CalcDataSize(ytPath, attrs);
res.DataSize[i] += size;
entry->UpdateColumnarStat(ytPath, size);
- YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << ": " << res.DataSize[i] << " (uncompressed_data_size for lookup)";
+ YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << ": " << res.DataSize[i] << " (uncompressed_data_size for lookup, extended: false)";
+ } else {
+ ytPaths.push_back(ytPath);
+ pathMap.push_back(i);
}
} else {
auto p = entry->Snapshots.FindPtr(std::make_pair(tablePath, req.Epoch()));
@@ -4597,11 +4626,19 @@ private:
YQL_CLOG(INFO, ProviderYt) << "Adding stat for " << col << ": " << size << " (virtual)";
}
}
- if (auto val = entry->GetColumnarStat(ytPath)) {
- res.DataSize[i] += *val;
- YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " (epoch=" << req.Epoch() << "): " << res.DataSize[i] << " (from cache)";
+ TMaybe<ui64> cachedStat;
+ TMaybe<NYT::TTableColumnarStatistics> cachedExtendedStat;
+ if (!extended && (cachedStat = entry->GetColumnarStat(ytPath))) {
+ res.DataSize[i] += *cachedStat;
+ YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (epoch=" << req.Epoch() << "): " << res.DataSize[i] << " (from cache, extended: false)";
+ } else if (extended && (cachedExtendedStat = entry->GetExtendedColumnarStat(ytPath))) {
+ PopulatePathStatResult(res, i, *cachedExtendedStat);
+ YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (from cache, extended: true)";
} else if (onlyCached) {
- YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " (epoch=" << req.Epoch() << ") is missing in cache - sync path stat failed";
+ YQL_CLOG(INFO, ProviderYt)
+ << "Stat for " << DebugPath(req.Path())
+ << " (epoch=" << req.Epoch() << ", extended: " << extended
+ << ") is missing in cache - sync path stat failed";
return res;
} else {
if (attrs.IsUndefined()) {
@@ -4613,18 +4650,19 @@ private:
.AddAttribute(TString("schema"))
));
}
- if (attrs.HasKey("optimize_for") && attrs["optimize_for"] == "scan" &&
- AllPathColumnsAreInSchema(req.Path(), attrs))
+ if (extended ||
+ (attrs.HasKey("optimize_for") && attrs["optimize_for"] == "scan" &&
+ AllPathColumnsAreInSchema(req.Path(), attrs)))
{
pathMap.push_back(i);
ytPaths.push_back(ytPath);
- YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " (epoch=" << req.Epoch() << ") add for request with path " << ytPath.Path_;
+ YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (epoch=" << req.Epoch() << ") add for request with path " << ytPath.Path_ << " (extended: " << extended << ")";
} else {
// Use entire table size for lookup tables (YQL-7257)
auto size = CalcDataSize(ytPath, attrs);
res.DataSize[i] += size;
entry->UpdateColumnarStat(ytPath, size);
- YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " (epoch=" << req.Epoch() << "): " << res.DataSize[i] << " (uncompressed_data_size for lookup)";
+ YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (epoch=" << req.Epoch() << "): " << res.DataSize[i] << " (uncompressed_data_size for lookup)";
}
}
}
@@ -4632,17 +4670,23 @@ private:
if (ytPaths) {
YQL_ENSURE(!onlyCached);
- auto fetchMode = execCtx->Options_.Config()->JoinColumnarStatisticsFetcherMode.Get().GetOrElse(NYT::EColumnarStatisticsFetcherMode::Fallback);
+ auto fetchMode = extended ?
+ NYT::EColumnarStatisticsFetcherMode::FromNodes :
+ execCtx->Options_.Config()->JoinColumnarStatisticsFetcherMode.Get().GetOrElse(NYT::EColumnarStatisticsFetcherMode::Fallback);
auto columnStats = tx->GetTableColumnarStatistics(ytPaths, NYT::TGetTableColumnarStatisticsOptions().FetcherMode(fetchMode));
YQL_ENSURE(pathMap.size() == columnStats.size());
- for (size_t i: xrange(columnStats.size())) {
+ for (size_t i: xrange(columnStats.size())) {
auto& columnStat = columnStats[i];
const ui64 weight = columnStat.LegacyChunksDataWeight +
Accumulate(columnStat.ColumnDataWeight.begin(), columnStat.ColumnDataWeight.end(), 0ull,
[](ui64 sum, decltype(*columnStat.ColumnDataWeight.begin())& v) { return sum + v.second; });
+ if (extended) {
+ PopulatePathStatResult(res, pathMap[i], columnStat);
+ }
+
res.DataSize[pathMap[i]] += weight;
- entry->UpdateColumnarStat(ytPaths[i], columnStat);
+ entry->UpdateColumnarStat(ytPaths[i], columnStat, extended);
YQL_CLOG(INFO, ProviderYt) << "Stat for " << execCtx->Options_.Paths()[pathMap[i]].Path().Path_ << ": " << weight << " (fetched)";
}
}
diff --git a/ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp b/ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp
index a7d66e50a1..1440b22b1a 100644
--- a/ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp
+++ b/ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp
@@ -7,6 +7,7 @@
#include <ydb/library/yql/core/yql_type_helpers.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
+#include <ydb/library/yql/providers/common/provider/yql_provider.h>
#include <ydb/library/yql/utils/log/log.h>
@@ -321,6 +322,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EarlyMergeJoin(TExprBas
TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::RuntimeEquiJoin(TExprBase node, TExprContext& ctx) const {
auto equiJoin = node.Cast<TYtEquiJoin>();
+ auto cluster = equiJoin.DataSink().Cluster().StringValue();
const bool tryReorder = State_->Types->CostBasedOptimizer != ECostBasedOptimizerType::Disable
&& equiJoin.Input().Size() > 2
@@ -338,10 +340,19 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::RuntimeEquiJoin(TExprBa
}
}
}
-
const auto tree = ImportYtEquiJoin(equiJoin, ctx);
+
+ const TMaybe<ui64> maxChunkCountExtendedStats = State_->Configuration->ExtendedStatsMaxChunkCount.Get();
+
+ if (tryReorder && waitAllInputs && maxChunkCountExtendedStats) {
+ YQL_CLOG(INFO, ProviderYt) << "Collecting cbo stats for equiJoin";
+ auto collectStatus = CollectCboStats(cluster, *tree, State_, ctx);
+ if (collectStatus == TStatus::Repeat) {
+ return ExportYtEquiJoin(equiJoin, *tree, ctx, State_);
+ }
+ }
if (tryReorder) {
- const auto optimizedTree = OrderJoins(tree, State_, ctx);
+ const auto optimizedTree = OrderJoins(tree, State_, cluster, ctx);
if (optimizedTree != tree) {
return ExportYtEquiJoin(equiJoin, *optimizedTree, ctx, State_);
}
diff --git a/ydb/library/yql/providers/yt/provider/ut/yql_yt_cbo_ut.cpp b/ydb/library/yql/providers/yt/provider/ut/yql_yt_cbo_ut.cpp
index b225d0ed62..c9d54d1aea 100644
--- a/ydb/library/yql/providers/yt/provider/ut/yql_yt_cbo_ut.cpp
+++ b/ydb/library/yql/providers/yt/provider/ut/yql_yt_cbo_ut.cpp
@@ -60,6 +60,7 @@ TYtJoinNodeLeaf::TPtr MakeLeaf(const std::vector<TString>& label, TVector<TStrin
Y_UNIT_TEST_SUITE(TYqlCBO) {
Y_UNIT_TEST(OrderJoinsDoesNothingWhenCBODisabled) {
+ const TString cluster("ut_cluster");
TYtState::TPtr state = MakeIntrusive<TYtState>();
TYtJoinNodeOp::TPtr tree = nullptr;
TYtJoinNodeOp::TPtr optimizedTree;
@@ -68,7 +69,7 @@ Y_UNIT_TEST(OrderJoinsDoesNothingWhenCBODisabled) {
TExprContext ctx;
- optimizedTree = OrderJoins(tree, state, ctx);
+ optimizedTree = OrderJoins(tree, state, cluster, ctx);
UNIT_ASSERT_VALUES_EQUAL(tree, optimizedTree);
}
@@ -98,6 +99,8 @@ Y_UNIT_TEST(NonReordable) {
}
Y_UNIT_TEST(BuildOptimizerTree2Tables) {
+ const TString cluster("ut_cluster");
+ TYtState::TPtr state = MakeIntrusive<TYtState>();
TExprContext exprCtx;
auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n"}, exprCtx);
tree->Left = MakeLeaf({"c"}, {"c"}, 100000, 12333, exprCtx);
@@ -105,7 +108,7 @@ Y_UNIT_TEST(BuildOptimizerTree2Tables) {
std::shared_ptr<IBaseOptimizerNode> resultTree;
std::shared_ptr<IProviderContext> resultCtx;
- BuildOptimizerJoinTree(resultTree, resultCtx, tree);
+ BuildOptimizerJoinTree(state, cluster, resultTree, resultCtx, tree, exprCtx);
UNIT_ASSERT(resultTree->Kind == JoinNodeType);
auto root = std::static_pointer_cast<TJoinOptimizerNode>(resultTree);
@@ -122,6 +125,8 @@ Y_UNIT_TEST(BuildOptimizerTree2Tables) {
}
Y_UNIT_TEST(BuildOptimizerTree2TablesComplexLabel) {
+ const TString cluster("ut_cluster");
+ TYtState::TPtr state = MakeIntrusive<TYtState>();
TExprContext exprCtx;
auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n", "e"}, exprCtx);
tree->Left = MakeLeaf({"c"}, {"c"}, 1000000, 1233333, exprCtx);
@@ -129,7 +134,7 @@ Y_UNIT_TEST(BuildOptimizerTree2TablesComplexLabel) {
std::shared_ptr<IBaseOptimizerNode> resultTree;
std::shared_ptr<IProviderContext> resultCtx;
- BuildOptimizerJoinTree(resultTree, resultCtx, tree);
+ BuildOptimizerJoinTree(state, cluster, resultTree, resultCtx, tree, exprCtx);
UNIT_ASSERT(resultTree->Kind == JoinNodeType);
auto root = std::static_pointer_cast<TJoinOptimizerNode>(resultTree);
@@ -146,6 +151,8 @@ Y_UNIT_TEST(BuildOptimizerTree2TablesComplexLabel) {
}
Y_UNIT_TEST(BuildYtJoinTree2Tables) {
+ const TString cluster("ut_cluster");
+ TYtState::TPtr state = MakeIntrusive<TYtState>();
TExprContext exprCtx;
auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n"}, exprCtx);
tree->Left = MakeLeaf({"c"}, {"c"}, 100000, 12333, exprCtx);
@@ -153,7 +160,7 @@ Y_UNIT_TEST(BuildYtJoinTree2Tables) {
std::shared_ptr<IBaseOptimizerNode> resultTree;
std::shared_ptr<IProviderContext> resultCtx;
- BuildOptimizerJoinTree(resultTree, resultCtx, tree);
+ BuildOptimizerJoinTree(state, cluster, resultTree, resultCtx, tree, exprCtx);
auto joinTree = BuildYtJoinTree(resultTree, exprCtx, {});
@@ -161,6 +168,8 @@ Y_UNIT_TEST(BuildYtJoinTree2Tables) {
}
Y_UNIT_TEST(BuildYtJoinTree2TablesForceMergeJoib) {
+ const TString cluster("ut_cluster");
+ TYtState::TPtr state = MakeIntrusive<TYtState>();
TExprContext exprCtx;
auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n"}, exprCtx);
tree->Left = MakeLeaf({"c"}, {"c"}, 100000, 12333, exprCtx);
@@ -169,7 +178,7 @@ Y_UNIT_TEST(BuildYtJoinTree2TablesForceMergeJoib) {
std::shared_ptr<IBaseOptimizerNode> resultTree;
std::shared_ptr<IProviderContext> resultCtx;
- BuildOptimizerJoinTree(resultTree, resultCtx, tree);
+ BuildOptimizerJoinTree(state, cluster, resultTree, resultCtx, tree, exprCtx);
auto joinTree = BuildYtJoinTree(resultTree, exprCtx, {});
@@ -177,6 +186,8 @@ Y_UNIT_TEST(BuildYtJoinTree2TablesForceMergeJoib) {
}
Y_UNIT_TEST(BuildYtJoinTree2TablesComplexLabel) {
+ const TString cluster("ut_cluster");
+ TYtState::TPtr state = MakeIntrusive<TYtState>();
TExprContext exprCtx;
auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n", "e"}, exprCtx);
tree->Left = MakeLeaf({"c"}, {"c"}, 1000000, 1233333, exprCtx);
@@ -184,7 +195,7 @@ Y_UNIT_TEST(BuildYtJoinTree2TablesComplexLabel) {
std::shared_ptr<IBaseOptimizerNode> resultTree;
std::shared_ptr<IProviderContext> resultCtx;
- BuildOptimizerJoinTree(resultTree, resultCtx, tree);
+ BuildOptimizerJoinTree(state, cluster, resultTree, resultCtx, tree, exprCtx);
auto joinTree = BuildYtJoinTree(resultTree, exprCtx, {});
UNIT_ASSERT(AreSimilarTrees(joinTree, tree));
@@ -192,6 +203,8 @@ Y_UNIT_TEST(BuildYtJoinTree2TablesComplexLabel) {
Y_UNIT_TEST(BuildYtJoinTree2TablesTableIn2Rels)
{
+ const TString cluster("ut_cluster");
+ TYtState::TPtr state = MakeIntrusive<TYtState>();
TExprContext exprCtx;
auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n", "c"}, exprCtx);
tree->Left = MakeLeaf({"c"}, {"c"}, 1000000, 1233333, exprCtx);
@@ -199,7 +212,7 @@ Y_UNIT_TEST(BuildYtJoinTree2TablesTableIn2Rels)
std::shared_ptr<IBaseOptimizerNode> resultTree;
std::shared_ptr<IProviderContext> resultCtx;
- BuildOptimizerJoinTree(resultTree, resultCtx, tree);
+ BuildOptimizerJoinTree(state, cluster, resultTree, resultCtx, tree, exprCtx);
auto joinTree = BuildYtJoinTree(resultTree, exprCtx, {});
UNIT_ASSERT(AreSimilarTrees(joinTree, tree));
@@ -214,6 +227,7 @@ Y_UNIT_TEST(BuildYtJoinTree2TablesTableIn2Rels)
}
void OrderJoins2Tables(auto optimizerType) {
+ const TString cluster("ut_cluster");
TExprContext exprCtx;
auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n"}, exprCtx);
tree->Left = MakeLeaf({"c"}, {"c"}, 100000, 12333, exprCtx);
@@ -223,7 +237,7 @@ void OrderJoins2Tables(auto optimizerType) {
TTypeAnnotationContext typeCtx;
typeCtx.CostBasedOptimizer = optimizerType;
state->Types = &typeCtx;
- auto optimizedTree = OrderJoins(tree, state, exprCtx, true);
+ auto optimizedTree = OrderJoins(tree, state, cluster, exprCtx, true);
UNIT_ASSERT(optimizedTree != tree);
UNIT_ASSERT(optimizedTree->Left);
UNIT_ASSERT(optimizedTree->Right);
@@ -242,6 +256,7 @@ ADD_TEST(OrderJoins2Tables)
void OrderJoins2TablesComplexLabel(auto optimizerType)
{
+ const TString cluster("ut_cluster");
TExprContext exprCtx;
auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n", "e"}, exprCtx);
tree->Left = MakeLeaf({"c"}, {"c"}, 1000000, 1233333, exprCtx);
@@ -251,7 +266,7 @@ void OrderJoins2TablesComplexLabel(auto optimizerType)
TYtState::TPtr state = MakeIntrusive<TYtState>();
typeCtx.CostBasedOptimizer = optimizerType;
state->Types = &typeCtx;
- auto optimizedTree = OrderJoins(tree, state, exprCtx, true);
+ auto optimizedTree = OrderJoins(tree, state, cluster, exprCtx, true);
UNIT_ASSERT(optimizedTree != tree);
}
@@ -259,6 +274,7 @@ ADD_TEST(OrderJoins2TablesComplexLabel)
void OrderJoins2TablesTableIn2Rels(auto optimizerType)
{
+ const TString cluster("ut_cluster");
TExprContext exprCtx;
auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n", "e"}, exprCtx);
tree->Left = MakeLeaf({"c"}, {"c"}, 1000000, 1233333, exprCtx);
@@ -268,7 +284,7 @@ void OrderJoins2TablesTableIn2Rels(auto optimizerType)
TYtState::TPtr state = MakeIntrusive<TYtState>();
typeCtx.CostBasedOptimizer = optimizerType;
state->Types = &typeCtx;
- auto optimizedTree = OrderJoins(tree, state, exprCtx, true);
+ auto optimizedTree = OrderJoins(tree, state, cluster, exprCtx, true);
UNIT_ASSERT(optimizedTree != tree);
}
@@ -276,6 +292,7 @@ ADD_TEST(OrderJoins2TablesTableIn2Rels)
Y_UNIT_TEST(OrderLeftJoin)
{
+ const TString cluster("ut_cluster");
TExprContext exprCtx;
auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n"}, exprCtx);
tree->Left = MakeLeaf({"c"}, {"c"}, 1000000, 1233333, exprCtx);
@@ -286,13 +303,14 @@ Y_UNIT_TEST(OrderLeftJoin)
TYtState::TPtr state = MakeIntrusive<TYtState>();
typeCtx.CostBasedOptimizer = ECostBasedOptimizerType::PG;
state->Types = &typeCtx;
- auto optimizedTree = OrderJoins(tree, state, exprCtx, true);
+ auto optimizedTree = OrderJoins(tree, state, cluster, exprCtx, true);
UNIT_ASSERT(optimizedTree != tree);
UNIT_ASSERT_STRINGS_EQUAL("Left", optimizedTree->JoinKind->Content());
}
Y_UNIT_TEST(UnsupportedJoin)
{
+ const TString cluster("ut_cluster");
TExprContext exprCtx;
auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n"}, exprCtx);
tree->Left = MakeLeaf({"c"}, {"c"}, 1000000, 1233333, exprCtx);
@@ -303,11 +321,12 @@ Y_UNIT_TEST(UnsupportedJoin)
TYtState::TPtr state = MakeIntrusive<TYtState>();
typeCtx.CostBasedOptimizer = ECostBasedOptimizerType::PG;
state->Types = &typeCtx;
- auto optimizedTree = OrderJoins(tree, state, exprCtx, true);
+ auto optimizedTree = OrderJoins(tree, state, cluster, exprCtx, true);
UNIT_ASSERT(optimizedTree == tree);
}
Y_UNIT_TEST(OrderJoinSinglePass) {
+ const TString cluster("ut_cluster");
TExprContext exprCtx;
auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n"}, exprCtx);
tree->Left = MakeLeaf({"c"}, {"c"}, 1000000, 1233333, exprCtx);
@@ -318,12 +337,13 @@ Y_UNIT_TEST(OrderJoinSinglePass) {
TYtState::TPtr state = MakeIntrusive<TYtState>();
typeCtx.CostBasedOptimizer = ECostBasedOptimizerType::PG;
state->Types = &typeCtx;
- auto optimizedTree = OrderJoins(tree, state, exprCtx, true);
+ auto optimizedTree = OrderJoins(tree, state, cluster, exprCtx, true);
UNIT_ASSERT(optimizedTree != tree);
UNIT_ASSERT(optimizedTree->CostBasedOptPassed);
}
Y_UNIT_TEST(OrderJoinsDoesNothingWhenCBOAlreadyPassed) {
+ const TString cluster("ut_cluster");
TExprContext exprCtx;
auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n"}, exprCtx);
tree->Left = MakeLeaf({"c"}, {"c"}, 1000000, 1233333, exprCtx);
@@ -335,7 +355,7 @@ Y_UNIT_TEST(OrderJoinsDoesNothingWhenCBOAlreadyPassed) {
TYtState::TPtr state = MakeIntrusive<TYtState>();
typeCtx.CostBasedOptimizer = ECostBasedOptimizerType::PG;
state->Types = &typeCtx;
- auto optimizedTree = OrderJoins(tree, state, exprCtx, true);
+ auto optimizedTree = OrderJoins(tree, state, cluster, exprCtx, true);
UNIT_ASSERT(optimizedTree == tree);
}
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h b/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h
index d6cc076e81..98f0ba50f0 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h
@@ -515,10 +515,16 @@ public:
OPTION_FIELD(TString, Cluster)
OPTION_FIELD(TVector<TPathStatReq>, Paths)
OPTION_FIELD(TYtSettings::TConstPtr, Config)
+ OPTION_FIELD_DEFAULT(bool, Extended, false)
};
struct TPathStatResult: public NCommon::TOperationResult {
+ struct TExtendedResult {
+ THashMap<TString, i64> DataWeight;
+ THashMap<TString, ui64> EstimatedUniqueCounts;
+ };
TVector<ui64> DataSize;
+ TVector<TMaybe<TExtendedResult>> Extended;
};
struct TFullResultTableOptions : public TCommonOptions {
@@ -623,4 +629,4 @@ public:
virtual void AddCluster(const TYtClusterConfig& cluster) = 0;
};
-} \ No newline at end of file
+}
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp
index cc87cca423..e9b00bc31e 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp
@@ -240,10 +240,11 @@ IGraphTransformer::TStatus TryEstimateDataSizeChecked(TVector<ui64>& result, TYt
TSet<TString> requestedColumns;
auto status = TryEstimateDataSize(result, requestedColumns, cluster, paths, columns, state, ctx);
+ auto settings = inputSection.Settings().Ptr();
if (status == TStatus::Repeat) {
bool hasStatColumns = NYql::HasSetting(inputSection.Settings().Ref(), EYtSettingType::StatColumns);
if (hasStatColumns) {
- auto oldColumns = NYql::GetSettingAsColumnList(inputSection.Settings().Ref(), EYtSettingType::StatColumns);
+ auto oldColumns = NYql::GetSettingAsColumnList(*settings, EYtSettingType::StatColumns);
TSet<TString> oldColumnSet(oldColumns.begin(), oldColumns.end());
bool alreadyRequested = AllOf(requestedColumns, [&](const auto& c) {
@@ -251,6 +252,8 @@ IGraphTransformer::TStatus TryEstimateDataSizeChecked(TVector<ui64>& result, TYt
});
YQL_ENSURE(!alreadyRequested);
+
+ settings = NYql::RemoveSetting(*settings, EYtSettingType::StatColumns, ctx);
}
YQL_CLOG(INFO, ProviderYt) << "Stat missing for columns: " << JoinSeq(", ", requestedColumns) << ", rebuilding section";
@@ -258,7 +261,7 @@ IGraphTransformer::TStatus TryEstimateDataSizeChecked(TVector<ui64>& result, TYt
inputSection = Build<TYtSection>(ctx, inputSection.Ref().Pos())
.InitFrom(inputSection)
- .Settings(NYql::AddSettingAsColumnList(inputSection.Settings().Ref(), EYtSettingType::StatColumns, requestedColumnList, ctx))
+ .Settings(NYql::AddSettingAsColumnList(*settings, EYtSettingType::StatColumns, requestedColumnList, ctx))
.Done();
}
return status;
@@ -2906,7 +2909,8 @@ TStatus CollectPathsAndLabels(TVector<TYtPathInfo::TPtr>& tables, TJoinLabels& l
return TStatus::Ok;
}
-TStatus CollectPathsAndLabelsReady(bool& ready, TVector<TYtPathInfo::TPtr>& tables, TJoinLabels& labels,
+IGraphTransformer::TStatus CollectPathsAndLabelsReady(
+ bool& ready, TVector<TYtPathInfo::TPtr>& tables, TJoinLabels& labels,
const TStructExprType*& itemType, const TStructExprType*& itemTypeBeforePremap,
const TYtJoinNodeLeaf& leaf, TExprContext& ctx)
{
@@ -4682,6 +4686,111 @@ TYtJoinNodeOp::TPtr ImportYtEquiJoin(TYtEquiJoin equiJoin, TExprContext& ctx) {
return root;
}
+IGraphTransformer::TStatus CollectCboStatsLeaf(
+ const THashMap<TString, THashSet<TString>>& relJoinColumns,
+ const TString& cluster,
+ TYtJoinNodeLeaf& leaf,
+ const TYtState::TPtr& state,
+ TExprContext& ctx) {
+
+ const TMaybe<ui64> maxChunkCountExtendedStats = state->Configuration->ExtendedStatsMaxChunkCount.Get();
+ if (!maxChunkCountExtendedStats) {
+ return TStatus::Ok;
+ }
+
+ TVector<TString> keyList;
+ auto columnsPos = relJoinColumns.find(JoinLeafLabel(leaf.Label));
+ if (columnsPos != relJoinColumns.end()) {
+ keyList.assign(columnsPos->second.begin(), columnsPos->second.end());
+ }
+ TVector<IYtGateway::TPathStatReq> pathStatReqs;
+
+ ui64 sectionChunkCount = 0;
+ TVector<TString> requestedColumnList;
+ for (auto path: leaf.Section.Paths()) {
+ auto pathInfo = MakeIntrusive<TYtPathInfo>(path);
+ sectionChunkCount += pathInfo->Table->Stat->ChunkCount;
+
+ auto ytPath = BuildYtPathForStatRequest(cluster, *pathInfo, keyList, *state, ctx);
+
+ if (!ytPath) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ pathStatReqs.push_back(
+ IYtGateway::TPathStatReq()
+ .Path(*ytPath)
+ .IsTemp(pathInfo->Table->IsTemp)
+ .IsAnonymous(pathInfo->Table->IsAnonymous)
+ .Epoch(pathInfo->Table->Epoch.GetOrElse(0)));
+
+ std::copy(ytPath->Columns_->Parts_.begin(), ytPath->Columns_->Parts_.end(), std::back_inserter(requestedColumnList));
+ }
+
+ if ((*maxChunkCountExtendedStats != 0 && sectionChunkCount > *maxChunkCountExtendedStats)
+ || !pathStatReqs) {
+ return TStatus::Ok;
+ }
+
+ IYtGateway::TPathStatOptions pathStatOptions =
+ IYtGateway::TPathStatOptions(state->SessionId)
+ .Cluster(cluster)
+ .Paths(pathStatReqs)
+ .Config(state->Configuration->Snapshot())
+ .Extended(true);
+
+ IYtGateway::TPathStatResult pathStats = state->Gateway->TryPathStat(std::move(pathStatOptions));
+
+ if (pathStats.Success()) {
+ return TStatus::Ok;
+ }
+ std::sort(requestedColumnList.begin(), requestedColumnList.end());
+ requestedColumnList.erase(std::unique(requestedColumnList.begin(), requestedColumnList.end()),
+ requestedColumnList.end());
+ leaf.Section = Build<TYtSection>(ctx, leaf.Section.Ref().Pos())
+ .InitFrom(leaf.Section)
+ .Settings(NYql::AddSettingAsColumnList(leaf.Section.Settings().Ref(), EYtSettingType::StatColumns, requestedColumnList, ctx))
+ .Done();
+ return TStatus::Repeat;
+}
+
+void AddJoinColumns(THashMap<TString, THashSet<TString>>& relJoinColumns, const TYtJoinNodeOp& op) {
+ for (ui32 i = 0; i < op.LeftLabel->ChildrenSize(); i += 2) {
+ auto ltable = op.LeftLabel->Child(i)->Content();
+ auto lcolumn = op.LeftLabel->Child(i + 1)->Content();
+ auto rtable = op.RightLabel->Child(i)->Content();
+ auto rcolumn = op.RightLabel->Child(i + 1)->Content();
+
+ relJoinColumns[TString(ltable)].insert(TString(lcolumn));
+ relJoinColumns[TString(rtable)].insert(TString(rcolumn));
+ }
+}
+
+IGraphTransformer::TStatus CollectCboStatsNode(THashMap<TString, THashSet<TString>>& relJoinColumns, const TString& cluster, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) {
+ IGraphTransformer::TStatus result = TStatus::Ok;
+ TYtJoinNodeLeaf* leftLeaf = dynamic_cast<TYtJoinNodeLeaf*>(op.Left.Get());
+ TYtJoinNodeLeaf* rightLeaf = dynamic_cast<TYtJoinNodeLeaf*>(op.Right.Get());
+ AddJoinColumns(relJoinColumns, op);
+ if (leftLeaf) {
+ result = result.Combine(CollectCboStatsLeaf(relJoinColumns, cluster, *leftLeaf, state, ctx));
+ } else {
+ auto& leftOp = *dynamic_cast<TYtJoinNodeOp*>(op.Left.Get());
+ result = result.Combine(CollectCboStatsNode(relJoinColumns, cluster, leftOp, state, ctx));
+ }
+ if (rightLeaf) {
+ result = result.Combine(CollectCboStatsLeaf(relJoinColumns, cluster, *rightLeaf, state, ctx));
+ } else {
+ auto& rightOp = *dynamic_cast<TYtJoinNodeOp*>(op.Right.Get());
+ result = result.Combine(CollectCboStatsNode(relJoinColumns, cluster, rightOp, state, ctx));
+ }
+ return result;
+}
+
+IGraphTransformer::TStatus CollectCboStats(const TString& cluster, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) {
+ THashMap<TString, THashSet<TString>> relJoinColumns;
+ return CollectCboStatsNode(relJoinColumns, cluster, op, state, ctx);
+}
+
IGraphTransformer::TStatus RewriteYtEquiJoin(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) {
switch (RewriteYtEquiJoinStar(equiJoin, op, state, ctx)) {
case EStarRewriteStatus::Error:
@@ -4790,4 +4899,19 @@ TMaybeNode<TExprBase> ExportYtEquiJoin(TYtEquiJoin equiJoin, const TYtJoinNodeOp
return TExprBase(ctx.ChangeChildren(join.Ref(), std::move(children)));
}
+TString JoinLeafLabel(TExprNode::TPtr label) {
+ if (label->ChildrenSize() == 0) {
+ return TString(label->Content());
+ }
+ TString result;
+ for (ui32 i = 0; i < label->ChildrenSize(); ++i) {
+ result += label->Child(i)->Content();
+ if (i+1 != label->ChildrenSize()) {
+ result += ",";
+ }
+ }
+
+ return result;
+}
+
}
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.h b/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.h
index 366209de18..e448690c4f 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.h
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.h
@@ -62,15 +62,19 @@ struct TYtJoinNodeOp : TYtJoinNode {
};
TYtJoinNodeOp::TPtr ImportYtEquiJoin(TYtEquiJoin equiJoin, TExprContext& ctx);
+
+IGraphTransformer::TStatus CollectCboStats(const TString& cluster, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx);
+
IGraphTransformer::TStatus RewriteYtEquiJoinLeaves(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx);
IGraphTransformer::TStatus RewriteYtEquiJoin(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx);
TMaybeNode<TExprBase> ExportYtEquiJoin(TYtEquiJoin equiJoin, const TYtJoinNodeOp& op, TExprContext& ctx, const TYtState::TPtr& state);
-TYtJoinNodeOp::TPtr OrderJoins(TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, TExprContext& ctx, bool debug = false);
+TYtJoinNodeOp::TPtr OrderJoins(TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, const TString& cluster, TExprContext& ctx, bool debug = false);
+TString JoinLeafLabel(TExprNode::TPtr label);
struct IBaseOptimizerNode;
struct IProviderContext;
-void BuildOptimizerJoinTree(std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& ctx, TYtJoinNodeOp::TPtr op);
+void BuildOptimizerJoinTree(TYtState::TPtr state, const TString& cluster, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TYtJoinNodeOp::TPtr op, TExprContext& ctx);
TYtJoinNode::TPtr BuildYtJoinTree(std::shared_ptr<IBaseOptimizerNode> node, TExprContext& ctx, TPositionHandle pos);
bool AreSimilarTrees(TYtJoinNode::TPtr node1, TYtJoinNode::TPtr node2);
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_join_reorder.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_join_reorder.cpp
index 53e8f50e46..9721fbe2a2 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_join_reorder.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_join_reorder.cpp
@@ -1,12 +1,16 @@
#include "yql_yt_join_impl.h"
#include "yql_yt_helpers.h"
+#include <ydb/library/yql/core/cbo/cbo_optimizer_new.h>
+#include <ydb/library/yql/core/yql_graph_transformer.h>
+#include <ydb/library/yql/dq/opt/dq_opt_log.h>
#include <ydb/library/yql/parser/pg_wrapper/interface/optimizer.h>
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
+#include <ydb/library/yql/providers/yt/opt/yql_yt_join.h>
+#include <ydb/library/yql/providers/yt/provider/yql_yt_provider_context.h>
#include <ydb/library/yql/utils/log/log.h>
-#include <ydb/library/yql/core/cbo/cbo_optimizer_new.h>
-#include <ydb/library/yql/dq/opt/dq_opt_log.h>
+#include <yt/cpp/mapreduce/common/helpers.h>
namespace NYql {
@@ -59,10 +63,12 @@ public:
TJoinReorderer(
TYtJoinNodeOp::TPtr op,
const TYtState::TPtr& state,
+ const TString& cluster,
TExprContext& ctx,
bool debug = false)
: Root(op)
, State(state)
+ , Cluster(cluster)
, Ctx(ctx)
, Debug(debug)
{
@@ -75,9 +81,9 @@ public:
TYtJoinNodeOp::TPtr Do() {
std::shared_ptr<IBaseOptimizerNode> tree;
- std::shared_ptr<IProviderContext> ctx;
- BuildOptimizerJoinTree(tree, ctx, Root);
- auto ytCtx = std::static_pointer_cast<TYtProviderContext>(ctx);
+ std::shared_ptr<IProviderContext> providerCtx;
+ BuildOptimizerJoinTree(State, Cluster, tree, providerCtx, Root, Ctx);
+ auto ytCtx = std::static_pointer_cast<TYtProviderContext>(providerCtx);
std::function<void(const TString& str)> log;
@@ -93,14 +99,14 @@ public:
YQL_CLOG(ERROR, ProviderYt) << "PG CBO does not support link settings";
return Root;
}
- opt = std::unique_ptr<IOptimizerNew>(MakePgOptimizerNew(*ctx, Ctx, log));
+ opt = std::unique_ptr<IOptimizerNew>(MakePgOptimizerNew(*providerCtx, Ctx, log));
break;
case ECostBasedOptimizerType::Native:
if (ytCtx->HasHints) {
YQL_CLOG(ERROR, ProviderYt) << "Native CBO does not suppor link hints";
return Root;
}
- opt = std::unique_ptr<IOptimizerNew>(NDq::MakeNativeOptimizerNew(*ctx, 100000));
+ opt = std::unique_ptr<IOptimizerNew>(NDq::MakeNativeOptimizerNew(*providerCtx, 100000));
break;
default:
YQL_CLOG(ERROR, ProviderYt) << "Unknown optimizer type " << ToString(State->Types->CostBasedOptimizer);
@@ -137,6 +143,7 @@ public:
private:
TYtJoinNodeOp::TPtr Root;
const TYtState::TPtr& State;
+ TString Cluster;
TExprContext& Ctx;
bool Debug;
};
@@ -169,16 +176,19 @@ public:
class TOptimizerTreeBuilder
{
public:
- TOptimizerTreeBuilder(std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& ctx, TYtJoinNodeOp::TPtr inputTree)
- : Tree(tree)
- , OutCtx(ctx)
+ TOptimizerTreeBuilder(TYtState::TPtr state, const TString& cluster, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TYtJoinNodeOp::TPtr inputTree, TExprContext& ctx)
+ : State(state)
+ , Cluster(cluster)
+ , Tree(tree)
+ , OutProviderCtx(providerCtx)
, InputTree(inputTree)
+ , Ctx(ctx)
{ }
void Do() {
- Ctx = std::make_shared<TYtProviderContext>();
+ ProviderCtx = std::make_shared<TYtProviderContext>();
Tree = ProcessNode(InputTree);
- OutCtx = Ctx;
+ OutProviderCtx = ProviderCtx;
}
private:
@@ -195,8 +205,6 @@ private:
std::shared_ptr<IBaseOptimizerNode> OnOp(TYtJoinNodeOp* op) {
auto joinKind = ConvertToJoinKind(TString(op->JoinKind->Content()));
- auto left = ProcessNode(op->Left);
- auto right = ProcessNode(op->Right);
YQL_ENSURE(op->LeftLabel->ChildrenSize() == op->RightLabel->ChildrenSize());
std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>> joinConditions;
for (ui32 i = 0; i < op->LeftLabel->ChildrenSize(); i += 2) {
@@ -204,13 +212,17 @@ private:
auto lcolumn = op->LeftLabel->Child(i + 1)->Content();
auto rtable = op->RightLabel->Child(i)->Content();
auto rcolumn = op->RightLabel->Child(i + 1)->Content();
+ AddRelJoinColumn(TString(ltable), TString(lcolumn));
+ AddRelJoinColumn(TString(rtable), TString(rcolumn));
NDq::TJoinColumn lcol{TString(ltable), TString(lcolumn)};
NDq::TJoinColumn rcol{TString(rtable), TString(rcolumn)};
joinConditions.insert({lcol, rcol});
}
+ auto left = ProcessNode(op->Left);
+ auto right = ProcessNode(op->Right);
bool nonReorderable = op->LinkSettings.ForceSortedMerge;
- Ctx->HasForceSortedMerge = Ctx->HasForceSortedMerge || op->LinkSettings.ForceSortedMerge;
- Ctx->HasHints = Ctx->HasHints || !op->LinkSettings.LeftHints.empty() || !op->LinkSettings.RightHints.empty();
+ ProviderCtx->HasForceSortedMerge = ProviderCtx->HasForceSortedMerge || op->LinkSettings.ForceSortedMerge;
+ ProviderCtx->HasHints = ProviderCtx->HasHints || !op->LinkSettings.LeftHints.empty() || !op->LinkSettings.RightHints.empty();
return std::make_shared<TYtJoinOptimizerNode>(
left, right, joinConditions, joinKind, EJoinAlgoType::GraceJoin, nonReorderable ? op : nullptr
@@ -218,20 +230,25 @@ private:
}
std::shared_ptr<IBaseOptimizerNode> OnLeaf(TYtJoinNodeLeaf* leaf) {
- TString label;
- if (leaf->Label->ChildrenSize() == 0) {
- label = leaf->Label->Content();
- } else {
- for (ui32 i = 0; i < leaf->Label->ChildrenSize(); ++i) {
- label += leaf->Label->Child(i)->Content();
- if (i+1 != leaf->Label->ChildrenSize()) {
- label += ",";
- }
- }
+ TString label = JoinLeafLabel(leaf->Label);
+
+ const TMaybe<ui64> maxChunkCountExtendedStats = State->Configuration->ExtendedStatsMaxChunkCount.Get();
+ bool enableExtendedStats = maxChunkCountExtendedStats.Defined();
+
+ TVector<TString> keyList;
+ auto joinColumns = RelJoinColumns.find(label);
+ if (joinColumns != RelJoinColumns.end()) {
+ keyList.assign(joinColumns->second.begin(), joinColumns->second.end());
}
TYtSection section{leaf->Section};
auto stat = std::make_shared<TOptimizerStatistics>();
+ stat->ColumnStatistics = TIntrusivePtr<TOptimizerStatistics::TColumnStatMap>(
+ new TOptimizerStatistics::TColumnStatMap());
+ auto providerStats = std::make_unique<TYtProviderStatistic>();
+ TVector<IYtGateway::TPathStatReq> pathStatReqs;
+ ui64 totalChunkCount = 0;
+
if (Y_UNLIKELY(!section.Settings().Empty()) && Y_UNLIKELY(section.Settings().Item(0).Name() == "Test")) {
for (const auto& setting : section.Settings()) {
if (setting.Name() == "Rows") {
@@ -245,19 +262,80 @@ private:
auto tableStat = TYtTableBaseInfo::GetStat(path.Table());
stat->Cost += tableStat->DataSize;
stat->Nrows += tableStat->RecordsCount;
+ totalChunkCount += tableStat->ChunkCount;
+
+ auto pathInfo = MakeIntrusive<TYtPathInfo>(path);
+ auto ytPath = BuildYtPathForStatRequest(Cluster, *pathInfo, keyList, *State, Ctx);
+ YQL_ENSURE(ytPath);
+
+ if (enableExtendedStats) {
+ pathStatReqs.push_back(
+ IYtGateway::TPathStatReq()
+ .Path(*ytPath)
+ .IsTemp(pathInfo->Table->IsTemp)
+ .IsAnonymous(pathInfo->Table->IsAnonymous)
+ .Epoch(pathInfo->Table->Epoch.GetOrElse(0)));
+ }
+ }
+ auto sorted = section.Ref().GetConstraint<TSortedConstraintNode>();
+ if (sorted) {
+ TVector<TString> key;
+ for (const auto& item : sorted->GetContent()) {
+ for (const auto& path : item.first) {
+ const auto& column = path.front();
+ key.push_back(TString(column));
+ }
+ }
+ providerStats->SortColumns = key;
}
}
+ if (!pathStatReqs.empty() &&
+ (*maxChunkCountExtendedStats == 0 || totalChunkCount <= *maxChunkCountExtendedStats)) {
+ IYtGateway::TPathStatOptions pathStatOptions =
+ IYtGateway::TPathStatOptions(State->SessionId)
+ .Cluster(Cluster)
+ .Paths(pathStatReqs)
+ .Config(State->Configuration->Snapshot())
+ .Extended(true);
+
+ IYtGateway::TPathStatResult pathStats = State->Gateway->TryPathStat(std::move(pathStatOptions));
+ if (pathStats.Success()) {
+ for (const auto& extended : pathStats.Extended) {
+ if (!extended) {
+ continue;
+ }
+ for (const auto& entry : extended->EstimatedUniqueCounts) {
+ stat->ColumnStatistics->Data[entry.first].NumUniqueVals = entry.second;
+ }
+ for (const auto& entry : extended->DataWeight) {
+ providerStats->ColumnStatistics[entry.first] = {
+ .DataWeight = entry.second
+ };
+ }
+ }
+ }
+ }
+
+ stat->Specific = std::unique_ptr<const IProviderStatistics>(providerStats.release());
return std::make_shared<TYtRelOptimizerNode>(
std::move(label), std::move(stat), leaf
);
}
- std::shared_ptr<IBaseOptimizerNode>& Tree;
- std::shared_ptr<TYtProviderContext> Ctx;
- std::shared_ptr<IProviderContext>& OutCtx;
+ void AddRelJoinColumn(const TString& rtable, const TString& rcolumn) {
+ auto entry = RelJoinColumns.insert(std::make_pair(rtable, THashSet<TString>{}));
+ entry.first->second.insert(rcolumn);
+ }
+ TYtState::TPtr State;
+ const TString Cluster;
+ std::shared_ptr<IBaseOptimizerNode>& Tree;
+ std::shared_ptr<TYtProviderContext> ProviderCtx;
+ std::shared_ptr<IProviderContext>& OutProviderCtx;
+ THashMap<TString, THashSet<TString>> RelJoinColumns;
TYtJoinNodeOp::TPtr InputTree;
+ TExprContext& Ctx;
};
TYtJoinNode::TPtr BuildYtJoinTree(std::shared_ptr<IBaseOptimizerNode> node, TVector<TString>& scope, TExprContext& ctx, TPositionHandle pos) {
@@ -330,9 +408,9 @@ bool AreSimilarTrees(TYtJoinNode::TPtr node1, TYtJoinNode::TPtr node2) {
}
}
-void BuildOptimizerJoinTree(std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& ctx, TYtJoinNodeOp::TPtr op)
+void BuildOptimizerJoinTree(TYtState::TPtr state, const TString& cluster, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TYtJoinNodeOp::TPtr op, TExprContext& ctx)
{
- TOptimizerTreeBuilder(tree, ctx, op).Do();
+ TOptimizerTreeBuilder(state, cluster, tree, providerCtx, op, ctx).Do();
}
TYtJoinNode::TPtr BuildYtJoinTree(std::shared_ptr<IBaseOptimizerNode> node, TExprContext& ctx, TPositionHandle pos) {
@@ -340,13 +418,13 @@ TYtJoinNode::TPtr BuildYtJoinTree(std::shared_ptr<IBaseOptimizerNode> node, TExp
return BuildYtJoinTree(node, scope, ctx, pos);
}
-TYtJoinNodeOp::TPtr OrderJoins(TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, TExprContext& ctx, bool debug)
+TYtJoinNodeOp::TPtr OrderJoins(TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, const TString& cluster, TExprContext& ctx, bool debug)
{
if (state->Types->CostBasedOptimizer == ECostBasedOptimizerType::Disable || op->CostBasedOptPassed) {
return op;
}
- auto result = TJoinReorderer(op, state, ctx, debug).Do();
+ auto result = TJoinReorderer(op, state, cluster, ctx, debug).Do();
if (!debug && AreSimilarTrees(result, op)) {
return op;
}
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp
index c7837d8e83..8c9ab6ea03 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp
@@ -69,6 +69,7 @@ private:
bool hasError = false;
TNodeOnNodeOwnedMap sectionRewrites;
VisitExpr(input, [this, &pathStatArgs, &hasError, &sectionRewrites, &ctx](const TExprNode::TPtr& node) {
+ const TMaybe<ui64> maxChunkCountExtendedStats = State_->Configuration->ExtendedStatsMaxChunkCount.Get();
if (auto maybeSection = TMaybeNode<TYtSection>(node)) {
TYtSection section = maybeSection.Cast();
if (NYql::HasSetting(section.Settings().Ref(), EYtSettingType::StatColumns)) {
@@ -77,6 +78,7 @@ private:
TMaybe<TString> cluster;
TVector<IYtGateway::TPathStatReq> pathStatReqs;
size_t idx = 0;
+ ui64 totalChunkCount = 0;
for (auto path: section.Paths()) {
bool hasStat = false;
if (path.Table().Maybe<TYtTable>().Stat().Maybe<TYtStat>()) {
@@ -107,6 +109,7 @@ private:
TYtPathInfo pathInfo(path);
YQL_ENSURE(pathInfo.Table->Stat);
+ totalChunkCount += pathInfo.Table->Stat->ChunkCount;
TString currCluster;
if (auto ytTable = path.Table().Maybe<TYtTable>()) {
@@ -139,11 +142,15 @@ private:
++idx;
}
+ bool requestExtendedStats = maxChunkCountExtendedStats &&
+ (*maxChunkCountExtendedStats == 0 || totalChunkCount <= *maxChunkCountExtendedStats);
+
if (pathStatReqs) {
auto pathStatOptions = IYtGateway::TPathStatOptions(State_->SessionId)
.Cluster(*cluster)
.Paths(pathStatReqs)
- .Config(State_->Configuration->Snapshot());
+ .Config(State_->Configuration->Snapshot())
+ .Extended(requestExtendedStats);
auto tryResult = State_->Gateway->TryPathStat(IYtGateway::TPathStatOptions(pathStatOptions));
if (!tryResult.Success()) {
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_provider_context.h b/ydb/library/yql/providers/yt/provider/yql_yt_provider_context.h
new file mode 100644
index 0000000000..15c1b4d456
--- /dev/null
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_provider_context.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include <ydb/library/yql/core/yql_statistics.h>
+
+namespace NYql {
+
+struct TYtColumnStatistic {
+ int64_t DataWeight;
+};
+
+class TYtProviderStatistic : public IProviderStatistics {
+public:
+ std::unordered_map<TString, TYtColumnStatistic> ColumnStatistics;
+ TVector<TString> SortColumns;
+};
+
+} // namespace NYql