diff options
author | Dmitry O <alephonea@users.noreply.github.com> | 2024-09-10 15:36:56 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-09-10 15:36:56 +0300 |
commit | a7d2186ccb9393cd6d569baaa3579dbfa4bd2303 (patch) | |
tree | 386544502edcdc31f570660a4dc905b9aebf6ec2 | |
parent | f11c0b20957bbb676ef18ca71b014c4d102de83d (diff) | |
download | ydb-a7d2186ccb9393cd6d569baaa3579dbfa4bd2303.tar.gz |
HLL in YT statistics (#8184)
13 files changed, 446 insertions, 89 deletions
diff --git a/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp b/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp index 7ce041e525..eab8125f67 100644 --- a/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp +++ b/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp @@ -484,6 +484,7 @@ TYtConfiguration::TYtConfiguration() }); REGISTER_SETTING(*this, MinColumnGroupSize).Lower(2); REGISTER_SETTING(*this, MaxColumnGroups); + REGISTER_SETTING(*this, ExtendedStatsMaxChunkCount); } EReleaseTempDataMode GetReleaseTempDataMode(const TYtSettings& settings) { diff --git a/ydb/library/yql/providers/yt/common/yql_yt_settings.h b/ydb/library/yql/providers/yt/common/yql_yt_settings.h index 0dc87fa39e..d39c1bdb4a 100644 --- a/ydb/library/yql/providers/yt/common/yql_yt_settings.h +++ b/ydb/library/yql/providers/yt/common/yql_yt_settings.h @@ -280,6 +280,7 @@ struct TYtSettings { NCommon::TConfSetting<EColumnGroupMode, false> ColumnGroupMode; NCommon::TConfSetting<ui16, false> MinColumnGroupSize; NCommon::TConfSetting<ui16, false> MaxColumnGroups; + NCommon::TConfSetting<ui64, false> ExtendedStatsMaxChunkCount; }; EReleaseTempDataMode GetReleaseTempDataMode(const TYtSettings& settings); diff --git a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp index 7b81c99ce8..485fe97024 100644 --- a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp +++ b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp @@ -101,9 +101,9 @@ TMaybe<ui64> TTransactionCache::TEntry::GetColumnarStat(NYT::TRichYPath ytPath) auto guard = Guard(Lock_); if (auto p = StatisticsCache.FindPtr(NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text))) { - ui64 sum = p->LegacyChunksDataWeight; + ui64 sum = p->ColumnarStat.LegacyChunksDataWeight; for (auto& column: columns) { - if (auto c = p->ColumnDataWeight.FindPtr(column)) { + if (auto c = p->ColumnarStat.ColumnDataWeight.FindPtr(column)) { sum += *c; } else { return Nothing(); @@ -114,30 +114,66 @@ TMaybe<ui64> TTransactionCache::TEntry::GetColumnarStat(NYT::TRichYPath ytPath) return Nothing(); } +TMaybe<NYT::TTableColumnarStatistics> TTransactionCache::TEntry::GetExtendedColumnarStat(NYT::TRichYPath ytPath) const { + TVector<TString> columns(std::move(ytPath.Columns_->Parts_)); + ytPath.Columns_.Clear(); + auto cacheKey = NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text); + + auto guard = Guard(Lock_); + auto p = StatisticsCache.FindPtr(cacheKey); + if (!p) { + return Nothing(); + } + + NYT::TTableColumnarStatistics res; + for (auto& column: columns) { + if (p->ExtendedStatColumns.count(column) == 0) { + return Nothing(); + } + if (auto c = p->ColumnarStat.ColumnDataWeight.FindPtr(column)) { + res.ColumnDataWeight[column] = *c; + } + if (auto c = p->ColumnarStat.ColumnEstimatedUniqueCounts.FindPtr(column)) { + res.ColumnEstimatedUniqueCounts[column] = *c; + } + } + return res; +} + void TTransactionCache::TEntry::UpdateColumnarStat(NYT::TRichYPath ytPath, ui64 size) { YQL_ENSURE(ytPath.Columns_.Defined()); TVector<TString> columns(std::move(ytPath.Columns_->Parts_)); ytPath.Columns_.Clear(); + auto cacheKey = NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text); auto guard = Guard(Lock_); - NYT::TTableColumnarStatistics& cacheColumnStat = StatisticsCache[NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text)]; - cacheColumnStat.LegacyChunksDataWeight = size; - for (auto& c: cacheColumnStat.ColumnDataWeight) { + auto& cacheEntry = StatisticsCache[cacheKey]; + cacheEntry.ColumnarStat.LegacyChunksDataWeight = size; + for (auto& c: cacheEntry.ColumnarStat.ColumnDataWeight) { c.second = 0; } for (auto& c: columns) { - cacheColumnStat.ColumnDataWeight[c] = 0; + cacheEntry.ColumnarStat.ColumnDataWeight[c] = 0; } } -void TTransactionCache::TEntry::UpdateColumnarStat(NYT::TRichYPath ytPath, const NYT::TTableColumnarStatistics& columnStat) { +void TTransactionCache::TEntry::UpdateColumnarStat(NYT::TRichYPath ytPath, const NYT::TTableColumnarStatistics& columnStat, bool extended) { + TVector<TString> columns(std::move(ytPath.Columns_->Parts_)); ytPath.Columns_.Clear(); auto guard = Guard(Lock_); - NYT::TTableColumnarStatistics& cacheColumnStat = StatisticsCache[NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text)]; - cacheColumnStat.LegacyChunksDataWeight = columnStat.LegacyChunksDataWeight; - cacheColumnStat.TimestampTotalWeight = columnStat.TimestampTotalWeight; + auto& cacheEntry = StatisticsCache[NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text)]; + if (extended) { + std::copy(columns.begin(), columns.end(), std::inserter(cacheEntry.ExtendedStatColumns, cacheEntry.ExtendedStatColumns.end())); + } + cacheEntry.ColumnarStat.LegacyChunksDataWeight = columnStat.LegacyChunksDataWeight; + cacheEntry.ColumnarStat.TimestampTotalWeight = columnStat.TimestampTotalWeight; for (auto& c: columnStat.ColumnDataWeight) { - cacheColumnStat.ColumnDataWeight[c.first] = c.second; + cacheEntry.ColumnarStat.ColumnDataWeight[c.first] = c.second; + } + if (extended) { + for (auto& c : columnStat.ColumnEstimatedUniqueCounts) { + cacheEntry.ColumnarStat.ColumnEstimatedUniqueCounts[c.first] = c.second; + } } } diff --git a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h index 27cc98a12e..945902416d 100644 --- a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h +++ b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h @@ -47,7 +47,6 @@ public: bool KeepTables = false; THashMap<std::pair<TString, ui32>, std::tuple<TString, NYT::TTransactionId, ui64>> Snapshots; // {tablepath, epoch} -> {table_id, transaction_id, revision} NYT::TNode TransactionSpec; - THashMap<TString, NYT::TTableColumnarStatistics> StatisticsCache; THashMap<TString, TString> BinarySnapshots; // remote path -> snapshot path NYT::ITransactionPtr BinarySnapshotTx; THashMap<TString, NYT::ITransactionPtr> CheckpointTxs; @@ -114,8 +113,10 @@ public: void CompleteWriteTx(const NYT::TTransactionId& id, bool abort); TMaybe<ui64> GetColumnarStat(NYT::TRichYPath ytPath) const; + TMaybe<NYT::TTableColumnarStatistics> GetExtendedColumnarStat(NYT::TRichYPath ytPath) const; + void UpdateColumnarStat(NYT::TRichYPath ytPath, ui64 size); - void UpdateColumnarStat(NYT::TRichYPath ytPath, const NYT::TTableColumnarStatistics& columnStat); + void UpdateColumnarStat(NYT::TRichYPath ytPath, const NYT::TTableColumnarStatistics& columnStat, bool extended = false); std::pair<TString, NYT::TTransactionId> GetBinarySnapshot(TString remoteTmpFolder, const TString& md5, const TString& localPath, TDuration expirationInterval); @@ -124,6 +125,13 @@ public: using TPtr = TIntrusivePtr<TEntry>; private: + struct TStatisticsCacheEntry { + std::unordered_set<TString> ExtendedStatColumns; + NYT::TTableColumnarStatistics ColumnarStat; + }; + + THashMap<TString, TStatisticsCacheEntry> StatisticsCache; + void DeleteAtFinalizeUnlocked(const TString& table, bool isInternal); bool CancelDeleteAtFinalizeUnlocked(const TString& table, bool isInternal); void DoRemove(const TString& table); diff --git a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp index b6e11c76cc..782455030b 100644 --- a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp +++ b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp @@ -195,6 +195,27 @@ inline TType OptionFromNode(const NYT::TNode& value) { } } +void PopulatePathStatResult(IYtGateway::TPathStatResult& out, int index, NYT::TTableColumnarStatistics& extendedStat) { + for (const auto& entry : extendedStat.ColumnDataWeight) { + out.DataSize[index] += entry.second; + } + out.Extended[index] = IYtGateway::TPathStatResult::TExtendedResult{ + .DataWeight = extendedStat.ColumnDataWeight, + .EstimatedUniqueCounts = extendedStat.ColumnEstimatedUniqueCounts + }; +} + +TString DebugPath(NYT::TRichYPath path) { + constexpr int maxDebugColumns = 20; + if (!path.Columns_ || std::ssize(path.Columns_->Parts_) <= maxDebugColumns) { + return NYT::NodeToCanonicalYsonString(NYT::PathToNode(path), NYT::NYson::EYsonFormat::Text); + } + int numColumns = std::ssize(path.Columns_->Parts_); + path.Columns_->Parts_.erase(path.Columns_->Parts_.begin() + maxDebugColumns, path.Columns_->Parts_.end()); + path.Columns_->Parts_.push_back("..."); + return NYT::NodeToCanonicalYsonString(NYT::PathToNode(path), NYT::NYson::EYsonFormat::Text) + " (" + std::to_string(numColumns) + " columns)"; +} + } // unnamed /////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -4495,6 +4516,7 @@ private: try { TPathStatResult res; res.DataSize.resize(execCtx->Options_.Paths().size(), 0); + res.Extended.resize(execCtx->Options_.Paths().size()); auto entry = execCtx->GetOrCreateEntry(); auto tx = entry->Tx; @@ -4502,6 +4524,7 @@ private: const NYT::EOptimizeForAttr tmpOptimizeFor = execCtx->Options_.Config()->OptimizeFor.Get(execCtx->Cluster_).GetOrElse(NYT::EOptimizeForAttr::OF_LOOKUP_ATTR); TVector<NYT::TRichYPath> ytPaths(Reserve(execCtx->Options_.Paths().size())); TVector<size_t> pathMap; + bool extended = execCtx->Options_.Extended(); auto extractSysColumns = [] (NYT::TRichYPath& ytPath) -> TVector<TString> { TVector<TString> res; @@ -4545,16 +4568,19 @@ private: YQL_CLOG(INFO, ProviderYt) << "Adding stat for " << col << ": " << size << " (virtual)"; } } - if (auto val = entry->GetColumnarStat(ytPath)) { - res.DataSize[i] += *val; - YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << ": " << res.DataSize[i] << " (from cache)"; + TMaybe<ui64> cachedStat; + TMaybe<NYT::TTableColumnarStatistics> cachedExtendedStat; + if (!extended && (cachedStat = entry->GetColumnarStat(ytPath))) { + res.DataSize[i] += *cachedStat; + YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << ": " << res.DataSize[i] << " (from cache, extended: false)"; + } else if (extended && (cachedExtendedStat = entry->GetExtendedColumnarStat(ytPath))) { + PopulatePathStatResult(res, i, *cachedExtendedStat); + YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (from cache, extended: true)"; } else if (onlyCached) { - YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " is missing in cache - sync path stat failed"; + YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " is missing in cache - sync path stat failed (extended: " << extended << ")"; return res; - } else if (NYT::EOptimizeForAttr::OF_SCAN_ATTR == tmpOptimizeFor) { - pathMap.push_back(i); - ytPaths.push_back(ytPath); - } else { + } else if (NYT::EOptimizeForAttr::OF_SCAN_ATTR != tmpOptimizeFor && !extended) { + // Use entire table size for lookup tables (YQL-7257) if (attrs.IsUndefined()) { attrs = tx->Get(ytPath.Path_ + "/@", NYT::TGetOptions().AttributeFilter( @@ -4566,7 +4592,10 @@ private: auto size = CalcDataSize(ytPath, attrs); res.DataSize[i] += size; entry->UpdateColumnarStat(ytPath, size); - YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << ": " << res.DataSize[i] << " (uncompressed_data_size for lookup)"; + YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << ": " << res.DataSize[i] << " (uncompressed_data_size for lookup, extended: false)"; + } else { + ytPaths.push_back(ytPath); + pathMap.push_back(i); } } else { auto p = entry->Snapshots.FindPtr(std::make_pair(tablePath, req.Epoch())); @@ -4597,11 +4626,19 @@ private: YQL_CLOG(INFO, ProviderYt) << "Adding stat for " << col << ": " << size << " (virtual)"; } } - if (auto val = entry->GetColumnarStat(ytPath)) { - res.DataSize[i] += *val; - YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " (epoch=" << req.Epoch() << "): " << res.DataSize[i] << " (from cache)"; + TMaybe<ui64> cachedStat; + TMaybe<NYT::TTableColumnarStatistics> cachedExtendedStat; + if (!extended && (cachedStat = entry->GetColumnarStat(ytPath))) { + res.DataSize[i] += *cachedStat; + YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (epoch=" << req.Epoch() << "): " << res.DataSize[i] << " (from cache, extended: false)"; + } else if (extended && (cachedExtendedStat = entry->GetExtendedColumnarStat(ytPath))) { + PopulatePathStatResult(res, i, *cachedExtendedStat); + YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (from cache, extended: true)"; } else if (onlyCached) { - YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " (epoch=" << req.Epoch() << ") is missing in cache - sync path stat failed"; + YQL_CLOG(INFO, ProviderYt) + << "Stat for " << DebugPath(req.Path()) + << " (epoch=" << req.Epoch() << ", extended: " << extended + << ") is missing in cache - sync path stat failed"; return res; } else { if (attrs.IsUndefined()) { @@ -4613,18 +4650,19 @@ private: .AddAttribute(TString("schema")) )); } - if (attrs.HasKey("optimize_for") && attrs["optimize_for"] == "scan" && - AllPathColumnsAreInSchema(req.Path(), attrs)) + if (extended || + (attrs.HasKey("optimize_for") && attrs["optimize_for"] == "scan" && + AllPathColumnsAreInSchema(req.Path(), attrs))) { pathMap.push_back(i); ytPaths.push_back(ytPath); - YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " (epoch=" << req.Epoch() << ") add for request with path " << ytPath.Path_; + YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (epoch=" << req.Epoch() << ") add for request with path " << ytPath.Path_ << " (extended: " << extended << ")"; } else { // Use entire table size for lookup tables (YQL-7257) auto size = CalcDataSize(ytPath, attrs); res.DataSize[i] += size; entry->UpdateColumnarStat(ytPath, size); - YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " (epoch=" << req.Epoch() << "): " << res.DataSize[i] << " (uncompressed_data_size for lookup)"; + YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (epoch=" << req.Epoch() << "): " << res.DataSize[i] << " (uncompressed_data_size for lookup)"; } } } @@ -4632,17 +4670,23 @@ private: if (ytPaths) { YQL_ENSURE(!onlyCached); - auto fetchMode = execCtx->Options_.Config()->JoinColumnarStatisticsFetcherMode.Get().GetOrElse(NYT::EColumnarStatisticsFetcherMode::Fallback); + auto fetchMode = extended ? + NYT::EColumnarStatisticsFetcherMode::FromNodes : + execCtx->Options_.Config()->JoinColumnarStatisticsFetcherMode.Get().GetOrElse(NYT::EColumnarStatisticsFetcherMode::Fallback); auto columnStats = tx->GetTableColumnarStatistics(ytPaths, NYT::TGetTableColumnarStatisticsOptions().FetcherMode(fetchMode)); YQL_ENSURE(pathMap.size() == columnStats.size()); - for (size_t i: xrange(columnStats.size())) { + for (size_t i: xrange(columnStats.size())) { auto& columnStat = columnStats[i]; const ui64 weight = columnStat.LegacyChunksDataWeight + Accumulate(columnStat.ColumnDataWeight.begin(), columnStat.ColumnDataWeight.end(), 0ull, [](ui64 sum, decltype(*columnStat.ColumnDataWeight.begin())& v) { return sum + v.second; }); + if (extended) { + PopulatePathStatResult(res, pathMap[i], columnStat); + } + res.DataSize[pathMap[i]] += weight; - entry->UpdateColumnarStat(ytPaths[i], columnStat); + entry->UpdateColumnarStat(ytPaths[i], columnStat, extended); YQL_CLOG(INFO, ProviderYt) << "Stat for " << execCtx->Options_.Paths()[pathMap[i]].Path().Path_ << ": " << weight << " (fetched)"; } } diff --git a/ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp b/ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp index a7d66e50a1..1440b22b1a 100644 --- a/ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp +++ b/ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp @@ -7,6 +7,7 @@ #include <ydb/library/yql/core/yql_type_helpers.h> #include <ydb/library/yql/core/yql_opt_utils.h> +#include <ydb/library/yql/providers/common/provider/yql_provider.h> #include <ydb/library/yql/utils/log/log.h> @@ -321,6 +322,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EarlyMergeJoin(TExprBas TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::RuntimeEquiJoin(TExprBase node, TExprContext& ctx) const { auto equiJoin = node.Cast<TYtEquiJoin>(); + auto cluster = equiJoin.DataSink().Cluster().StringValue(); const bool tryReorder = State_->Types->CostBasedOptimizer != ECostBasedOptimizerType::Disable && equiJoin.Input().Size() > 2 @@ -338,10 +340,19 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::RuntimeEquiJoin(TExprBa } } } - const auto tree = ImportYtEquiJoin(equiJoin, ctx); + + const TMaybe<ui64> maxChunkCountExtendedStats = State_->Configuration->ExtendedStatsMaxChunkCount.Get(); + + if (tryReorder && waitAllInputs && maxChunkCountExtendedStats) { + YQL_CLOG(INFO, ProviderYt) << "Collecting cbo stats for equiJoin"; + auto collectStatus = CollectCboStats(cluster, *tree, State_, ctx); + if (collectStatus == TStatus::Repeat) { + return ExportYtEquiJoin(equiJoin, *tree, ctx, State_); + } + } if (tryReorder) { - const auto optimizedTree = OrderJoins(tree, State_, ctx); + const auto optimizedTree = OrderJoins(tree, State_, cluster, ctx); if (optimizedTree != tree) { return ExportYtEquiJoin(equiJoin, *optimizedTree, ctx, State_); } diff --git a/ydb/library/yql/providers/yt/provider/ut/yql_yt_cbo_ut.cpp b/ydb/library/yql/providers/yt/provider/ut/yql_yt_cbo_ut.cpp index b225d0ed62..c9d54d1aea 100644 --- a/ydb/library/yql/providers/yt/provider/ut/yql_yt_cbo_ut.cpp +++ b/ydb/library/yql/providers/yt/provider/ut/yql_yt_cbo_ut.cpp @@ -60,6 +60,7 @@ TYtJoinNodeLeaf::TPtr MakeLeaf(const std::vector<TString>& label, TVector<TStrin Y_UNIT_TEST_SUITE(TYqlCBO) { Y_UNIT_TEST(OrderJoinsDoesNothingWhenCBODisabled) { + const TString cluster("ut_cluster"); TYtState::TPtr state = MakeIntrusive<TYtState>(); TYtJoinNodeOp::TPtr tree = nullptr; TYtJoinNodeOp::TPtr optimizedTree; @@ -68,7 +69,7 @@ Y_UNIT_TEST(OrderJoinsDoesNothingWhenCBODisabled) { TExprContext ctx; - optimizedTree = OrderJoins(tree, state, ctx); + optimizedTree = OrderJoins(tree, state, cluster, ctx); UNIT_ASSERT_VALUES_EQUAL(tree, optimizedTree); } @@ -98,6 +99,8 @@ Y_UNIT_TEST(NonReordable) { } Y_UNIT_TEST(BuildOptimizerTree2Tables) { + const TString cluster("ut_cluster"); + TYtState::TPtr state = MakeIntrusive<TYtState>(); TExprContext exprCtx; auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n"}, exprCtx); tree->Left = MakeLeaf({"c"}, {"c"}, 100000, 12333, exprCtx); @@ -105,7 +108,7 @@ Y_UNIT_TEST(BuildOptimizerTree2Tables) { std::shared_ptr<IBaseOptimizerNode> resultTree; std::shared_ptr<IProviderContext> resultCtx; - BuildOptimizerJoinTree(resultTree, resultCtx, tree); + BuildOptimizerJoinTree(state, cluster, resultTree, resultCtx, tree, exprCtx); UNIT_ASSERT(resultTree->Kind == JoinNodeType); auto root = std::static_pointer_cast<TJoinOptimizerNode>(resultTree); @@ -122,6 +125,8 @@ Y_UNIT_TEST(BuildOptimizerTree2Tables) { } Y_UNIT_TEST(BuildOptimizerTree2TablesComplexLabel) { + const TString cluster("ut_cluster"); + TYtState::TPtr state = MakeIntrusive<TYtState>(); TExprContext exprCtx; auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n", "e"}, exprCtx); tree->Left = MakeLeaf({"c"}, {"c"}, 1000000, 1233333, exprCtx); @@ -129,7 +134,7 @@ Y_UNIT_TEST(BuildOptimizerTree2TablesComplexLabel) { std::shared_ptr<IBaseOptimizerNode> resultTree; std::shared_ptr<IProviderContext> resultCtx; - BuildOptimizerJoinTree(resultTree, resultCtx, tree); + BuildOptimizerJoinTree(state, cluster, resultTree, resultCtx, tree, exprCtx); UNIT_ASSERT(resultTree->Kind == JoinNodeType); auto root = std::static_pointer_cast<TJoinOptimizerNode>(resultTree); @@ -146,6 +151,8 @@ Y_UNIT_TEST(BuildOptimizerTree2TablesComplexLabel) { } Y_UNIT_TEST(BuildYtJoinTree2Tables) { + const TString cluster("ut_cluster"); + TYtState::TPtr state = MakeIntrusive<TYtState>(); TExprContext exprCtx; auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n"}, exprCtx); tree->Left = MakeLeaf({"c"}, {"c"}, 100000, 12333, exprCtx); @@ -153,7 +160,7 @@ Y_UNIT_TEST(BuildYtJoinTree2Tables) { std::shared_ptr<IBaseOptimizerNode> resultTree; std::shared_ptr<IProviderContext> resultCtx; - BuildOptimizerJoinTree(resultTree, resultCtx, tree); + BuildOptimizerJoinTree(state, cluster, resultTree, resultCtx, tree, exprCtx); auto joinTree = BuildYtJoinTree(resultTree, exprCtx, {}); @@ -161,6 +168,8 @@ Y_UNIT_TEST(BuildYtJoinTree2Tables) { } Y_UNIT_TEST(BuildYtJoinTree2TablesForceMergeJoib) { + const TString cluster("ut_cluster"); + TYtState::TPtr state = MakeIntrusive<TYtState>(); TExprContext exprCtx; auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n"}, exprCtx); tree->Left = MakeLeaf({"c"}, {"c"}, 100000, 12333, exprCtx); @@ -169,7 +178,7 @@ Y_UNIT_TEST(BuildYtJoinTree2TablesForceMergeJoib) { std::shared_ptr<IBaseOptimizerNode> resultTree; std::shared_ptr<IProviderContext> resultCtx; - BuildOptimizerJoinTree(resultTree, resultCtx, tree); + BuildOptimizerJoinTree(state, cluster, resultTree, resultCtx, tree, exprCtx); auto joinTree = BuildYtJoinTree(resultTree, exprCtx, {}); @@ -177,6 +186,8 @@ Y_UNIT_TEST(BuildYtJoinTree2TablesForceMergeJoib) { } Y_UNIT_TEST(BuildYtJoinTree2TablesComplexLabel) { + const TString cluster("ut_cluster"); + TYtState::TPtr state = MakeIntrusive<TYtState>(); TExprContext exprCtx; auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n", "e"}, exprCtx); tree->Left = MakeLeaf({"c"}, {"c"}, 1000000, 1233333, exprCtx); @@ -184,7 +195,7 @@ Y_UNIT_TEST(BuildYtJoinTree2TablesComplexLabel) { std::shared_ptr<IBaseOptimizerNode> resultTree; std::shared_ptr<IProviderContext> resultCtx; - BuildOptimizerJoinTree(resultTree, resultCtx, tree); + BuildOptimizerJoinTree(state, cluster, resultTree, resultCtx, tree, exprCtx); auto joinTree = BuildYtJoinTree(resultTree, exprCtx, {}); UNIT_ASSERT(AreSimilarTrees(joinTree, tree)); @@ -192,6 +203,8 @@ Y_UNIT_TEST(BuildYtJoinTree2TablesComplexLabel) { Y_UNIT_TEST(BuildYtJoinTree2TablesTableIn2Rels) { + const TString cluster("ut_cluster"); + TYtState::TPtr state = MakeIntrusive<TYtState>(); TExprContext exprCtx; auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n", "c"}, exprCtx); tree->Left = MakeLeaf({"c"}, {"c"}, 1000000, 1233333, exprCtx); @@ -199,7 +212,7 @@ Y_UNIT_TEST(BuildYtJoinTree2TablesTableIn2Rels) std::shared_ptr<IBaseOptimizerNode> resultTree; std::shared_ptr<IProviderContext> resultCtx; - BuildOptimizerJoinTree(resultTree, resultCtx, tree); + BuildOptimizerJoinTree(state, cluster, resultTree, resultCtx, tree, exprCtx); auto joinTree = BuildYtJoinTree(resultTree, exprCtx, {}); UNIT_ASSERT(AreSimilarTrees(joinTree, tree)); @@ -214,6 +227,7 @@ Y_UNIT_TEST(BuildYtJoinTree2TablesTableIn2Rels) } void OrderJoins2Tables(auto optimizerType) { + const TString cluster("ut_cluster"); TExprContext exprCtx; auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n"}, exprCtx); tree->Left = MakeLeaf({"c"}, {"c"}, 100000, 12333, exprCtx); @@ -223,7 +237,7 @@ void OrderJoins2Tables(auto optimizerType) { TTypeAnnotationContext typeCtx; typeCtx.CostBasedOptimizer = optimizerType; state->Types = &typeCtx; - auto optimizedTree = OrderJoins(tree, state, exprCtx, true); + auto optimizedTree = OrderJoins(tree, state, cluster, exprCtx, true); UNIT_ASSERT(optimizedTree != tree); UNIT_ASSERT(optimizedTree->Left); UNIT_ASSERT(optimizedTree->Right); @@ -242,6 +256,7 @@ ADD_TEST(OrderJoins2Tables) void OrderJoins2TablesComplexLabel(auto optimizerType) { + const TString cluster("ut_cluster"); TExprContext exprCtx; auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n", "e"}, exprCtx); tree->Left = MakeLeaf({"c"}, {"c"}, 1000000, 1233333, exprCtx); @@ -251,7 +266,7 @@ void OrderJoins2TablesComplexLabel(auto optimizerType) TYtState::TPtr state = MakeIntrusive<TYtState>(); typeCtx.CostBasedOptimizer = optimizerType; state->Types = &typeCtx; - auto optimizedTree = OrderJoins(tree, state, exprCtx, true); + auto optimizedTree = OrderJoins(tree, state, cluster, exprCtx, true); UNIT_ASSERT(optimizedTree != tree); } @@ -259,6 +274,7 @@ ADD_TEST(OrderJoins2TablesComplexLabel) void OrderJoins2TablesTableIn2Rels(auto optimizerType) { + const TString cluster("ut_cluster"); TExprContext exprCtx; auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n", "e"}, exprCtx); tree->Left = MakeLeaf({"c"}, {"c"}, 1000000, 1233333, exprCtx); @@ -268,7 +284,7 @@ void OrderJoins2TablesTableIn2Rels(auto optimizerType) TYtState::TPtr state = MakeIntrusive<TYtState>(); typeCtx.CostBasedOptimizer = optimizerType; state->Types = &typeCtx; - auto optimizedTree = OrderJoins(tree, state, exprCtx, true); + auto optimizedTree = OrderJoins(tree, state, cluster, exprCtx, true); UNIT_ASSERT(optimizedTree != tree); } @@ -276,6 +292,7 @@ ADD_TEST(OrderJoins2TablesTableIn2Rels) Y_UNIT_TEST(OrderLeftJoin) { + const TString cluster("ut_cluster"); TExprContext exprCtx; auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n"}, exprCtx); tree->Left = MakeLeaf({"c"}, {"c"}, 1000000, 1233333, exprCtx); @@ -286,13 +303,14 @@ Y_UNIT_TEST(OrderLeftJoin) TYtState::TPtr state = MakeIntrusive<TYtState>(); typeCtx.CostBasedOptimizer = ECostBasedOptimizerType::PG; state->Types = &typeCtx; - auto optimizedTree = OrderJoins(tree, state, exprCtx, true); + auto optimizedTree = OrderJoins(tree, state, cluster, exprCtx, true); UNIT_ASSERT(optimizedTree != tree); UNIT_ASSERT_STRINGS_EQUAL("Left", optimizedTree->JoinKind->Content()); } Y_UNIT_TEST(UnsupportedJoin) { + const TString cluster("ut_cluster"); TExprContext exprCtx; auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n"}, exprCtx); tree->Left = MakeLeaf({"c"}, {"c"}, 1000000, 1233333, exprCtx); @@ -303,11 +321,12 @@ Y_UNIT_TEST(UnsupportedJoin) TYtState::TPtr state = MakeIntrusive<TYtState>(); typeCtx.CostBasedOptimizer = ECostBasedOptimizerType::PG; state->Types = &typeCtx; - auto optimizedTree = OrderJoins(tree, state, exprCtx, true); + auto optimizedTree = OrderJoins(tree, state, cluster, exprCtx, true); UNIT_ASSERT(optimizedTree == tree); } Y_UNIT_TEST(OrderJoinSinglePass) { + const TString cluster("ut_cluster"); TExprContext exprCtx; auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n"}, exprCtx); tree->Left = MakeLeaf({"c"}, {"c"}, 1000000, 1233333, exprCtx); @@ -318,12 +337,13 @@ Y_UNIT_TEST(OrderJoinSinglePass) { TYtState::TPtr state = MakeIntrusive<TYtState>(); typeCtx.CostBasedOptimizer = ECostBasedOptimizerType::PG; state->Types = &typeCtx; - auto optimizedTree = OrderJoins(tree, state, exprCtx, true); + auto optimizedTree = OrderJoins(tree, state, cluster, exprCtx, true); UNIT_ASSERT(optimizedTree != tree); UNIT_ASSERT(optimizedTree->CostBasedOptPassed); } Y_UNIT_TEST(OrderJoinsDoesNothingWhenCBOAlreadyPassed) { + const TString cluster("ut_cluster"); TExprContext exprCtx; auto tree = MakeOp({"c", "c_nationkey"}, {"n", "n_nationkey"}, {"c", "n"}, exprCtx); tree->Left = MakeLeaf({"c"}, {"c"}, 1000000, 1233333, exprCtx); @@ -335,7 +355,7 @@ Y_UNIT_TEST(OrderJoinsDoesNothingWhenCBOAlreadyPassed) { TYtState::TPtr state = MakeIntrusive<TYtState>(); typeCtx.CostBasedOptimizer = ECostBasedOptimizerType::PG; state->Types = &typeCtx; - auto optimizedTree = OrderJoins(tree, state, exprCtx, true); + auto optimizedTree = OrderJoins(tree, state, cluster, exprCtx, true); UNIT_ASSERT(optimizedTree == tree); } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h b/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h index d6cc076e81..98f0ba50f0 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h +++ b/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h @@ -515,10 +515,16 @@ public: OPTION_FIELD(TString, Cluster) OPTION_FIELD(TVector<TPathStatReq>, Paths) OPTION_FIELD(TYtSettings::TConstPtr, Config) + OPTION_FIELD_DEFAULT(bool, Extended, false) }; struct TPathStatResult: public NCommon::TOperationResult { + struct TExtendedResult { + THashMap<TString, i64> DataWeight; + THashMap<TString, ui64> EstimatedUniqueCounts; + }; TVector<ui64> DataSize; + TVector<TMaybe<TExtendedResult>> Extended; }; struct TFullResultTableOptions : public TCommonOptions { @@ -623,4 +629,4 @@ public: virtual void AddCluster(const TYtClusterConfig& cluster) = 0; }; -}
\ No newline at end of file +} diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp index cc87cca423..e9b00bc31e 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp @@ -240,10 +240,11 @@ IGraphTransformer::TStatus TryEstimateDataSizeChecked(TVector<ui64>& result, TYt TSet<TString> requestedColumns; auto status = TryEstimateDataSize(result, requestedColumns, cluster, paths, columns, state, ctx); + auto settings = inputSection.Settings().Ptr(); if (status == TStatus::Repeat) { bool hasStatColumns = NYql::HasSetting(inputSection.Settings().Ref(), EYtSettingType::StatColumns); if (hasStatColumns) { - auto oldColumns = NYql::GetSettingAsColumnList(inputSection.Settings().Ref(), EYtSettingType::StatColumns); + auto oldColumns = NYql::GetSettingAsColumnList(*settings, EYtSettingType::StatColumns); TSet<TString> oldColumnSet(oldColumns.begin(), oldColumns.end()); bool alreadyRequested = AllOf(requestedColumns, [&](const auto& c) { @@ -251,6 +252,8 @@ IGraphTransformer::TStatus TryEstimateDataSizeChecked(TVector<ui64>& result, TYt }); YQL_ENSURE(!alreadyRequested); + + settings = NYql::RemoveSetting(*settings, EYtSettingType::StatColumns, ctx); } YQL_CLOG(INFO, ProviderYt) << "Stat missing for columns: " << JoinSeq(", ", requestedColumns) << ", rebuilding section"; @@ -258,7 +261,7 @@ IGraphTransformer::TStatus TryEstimateDataSizeChecked(TVector<ui64>& result, TYt inputSection = Build<TYtSection>(ctx, inputSection.Ref().Pos()) .InitFrom(inputSection) - .Settings(NYql::AddSettingAsColumnList(inputSection.Settings().Ref(), EYtSettingType::StatColumns, requestedColumnList, ctx)) + .Settings(NYql::AddSettingAsColumnList(*settings, EYtSettingType::StatColumns, requestedColumnList, ctx)) .Done(); } return status; @@ -2906,7 +2909,8 @@ TStatus CollectPathsAndLabels(TVector<TYtPathInfo::TPtr>& tables, TJoinLabels& l return TStatus::Ok; } -TStatus CollectPathsAndLabelsReady(bool& ready, TVector<TYtPathInfo::TPtr>& tables, TJoinLabels& labels, +IGraphTransformer::TStatus CollectPathsAndLabelsReady( + bool& ready, TVector<TYtPathInfo::TPtr>& tables, TJoinLabels& labels, const TStructExprType*& itemType, const TStructExprType*& itemTypeBeforePremap, const TYtJoinNodeLeaf& leaf, TExprContext& ctx) { @@ -4682,6 +4686,111 @@ TYtJoinNodeOp::TPtr ImportYtEquiJoin(TYtEquiJoin equiJoin, TExprContext& ctx) { return root; } +IGraphTransformer::TStatus CollectCboStatsLeaf( + const THashMap<TString, THashSet<TString>>& relJoinColumns, + const TString& cluster, + TYtJoinNodeLeaf& leaf, + const TYtState::TPtr& state, + TExprContext& ctx) { + + const TMaybe<ui64> maxChunkCountExtendedStats = state->Configuration->ExtendedStatsMaxChunkCount.Get(); + if (!maxChunkCountExtendedStats) { + return TStatus::Ok; + } + + TVector<TString> keyList; + auto columnsPos = relJoinColumns.find(JoinLeafLabel(leaf.Label)); + if (columnsPos != relJoinColumns.end()) { + keyList.assign(columnsPos->second.begin(), columnsPos->second.end()); + } + TVector<IYtGateway::TPathStatReq> pathStatReqs; + + ui64 sectionChunkCount = 0; + TVector<TString> requestedColumnList; + for (auto path: leaf.Section.Paths()) { + auto pathInfo = MakeIntrusive<TYtPathInfo>(path); + sectionChunkCount += pathInfo->Table->Stat->ChunkCount; + + auto ytPath = BuildYtPathForStatRequest(cluster, *pathInfo, keyList, *state, ctx); + + if (!ytPath) { + return IGraphTransformer::TStatus::Error; + } + + pathStatReqs.push_back( + IYtGateway::TPathStatReq() + .Path(*ytPath) + .IsTemp(pathInfo->Table->IsTemp) + .IsAnonymous(pathInfo->Table->IsAnonymous) + .Epoch(pathInfo->Table->Epoch.GetOrElse(0))); + + std::copy(ytPath->Columns_->Parts_.begin(), ytPath->Columns_->Parts_.end(), std::back_inserter(requestedColumnList)); + } + + if ((*maxChunkCountExtendedStats != 0 && sectionChunkCount > *maxChunkCountExtendedStats) + || !pathStatReqs) { + return TStatus::Ok; + } + + IYtGateway::TPathStatOptions pathStatOptions = + IYtGateway::TPathStatOptions(state->SessionId) + .Cluster(cluster) + .Paths(pathStatReqs) + .Config(state->Configuration->Snapshot()) + .Extended(true); + + IYtGateway::TPathStatResult pathStats = state->Gateway->TryPathStat(std::move(pathStatOptions)); + + if (pathStats.Success()) { + return TStatus::Ok; + } + std::sort(requestedColumnList.begin(), requestedColumnList.end()); + requestedColumnList.erase(std::unique(requestedColumnList.begin(), requestedColumnList.end()), + requestedColumnList.end()); + leaf.Section = Build<TYtSection>(ctx, leaf.Section.Ref().Pos()) + .InitFrom(leaf.Section) + .Settings(NYql::AddSettingAsColumnList(leaf.Section.Settings().Ref(), EYtSettingType::StatColumns, requestedColumnList, ctx)) + .Done(); + return TStatus::Repeat; +} + +void AddJoinColumns(THashMap<TString, THashSet<TString>>& relJoinColumns, const TYtJoinNodeOp& op) { + for (ui32 i = 0; i < op.LeftLabel->ChildrenSize(); i += 2) { + auto ltable = op.LeftLabel->Child(i)->Content(); + auto lcolumn = op.LeftLabel->Child(i + 1)->Content(); + auto rtable = op.RightLabel->Child(i)->Content(); + auto rcolumn = op.RightLabel->Child(i + 1)->Content(); + + relJoinColumns[TString(ltable)].insert(TString(lcolumn)); + relJoinColumns[TString(rtable)].insert(TString(rcolumn)); + } +} + +IGraphTransformer::TStatus CollectCboStatsNode(THashMap<TString, THashSet<TString>>& relJoinColumns, const TString& cluster, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) { + IGraphTransformer::TStatus result = TStatus::Ok; + TYtJoinNodeLeaf* leftLeaf = dynamic_cast<TYtJoinNodeLeaf*>(op.Left.Get()); + TYtJoinNodeLeaf* rightLeaf = dynamic_cast<TYtJoinNodeLeaf*>(op.Right.Get()); + AddJoinColumns(relJoinColumns, op); + if (leftLeaf) { + result = result.Combine(CollectCboStatsLeaf(relJoinColumns, cluster, *leftLeaf, state, ctx)); + } else { + auto& leftOp = *dynamic_cast<TYtJoinNodeOp*>(op.Left.Get()); + result = result.Combine(CollectCboStatsNode(relJoinColumns, cluster, leftOp, state, ctx)); + } + if (rightLeaf) { + result = result.Combine(CollectCboStatsLeaf(relJoinColumns, cluster, *rightLeaf, state, ctx)); + } else { + auto& rightOp = *dynamic_cast<TYtJoinNodeOp*>(op.Right.Get()); + result = result.Combine(CollectCboStatsNode(relJoinColumns, cluster, rightOp, state, ctx)); + } + return result; +} + +IGraphTransformer::TStatus CollectCboStats(const TString& cluster, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) { + THashMap<TString, THashSet<TString>> relJoinColumns; + return CollectCboStatsNode(relJoinColumns, cluster, op, state, ctx); +} + IGraphTransformer::TStatus RewriteYtEquiJoin(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx) { switch (RewriteYtEquiJoinStar(equiJoin, op, state, ctx)) { case EStarRewriteStatus::Error: @@ -4790,4 +4899,19 @@ TMaybeNode<TExprBase> ExportYtEquiJoin(TYtEquiJoin equiJoin, const TYtJoinNodeOp return TExprBase(ctx.ChangeChildren(join.Ref(), std::move(children))); } +TString JoinLeafLabel(TExprNode::TPtr label) { + if (label->ChildrenSize() == 0) { + return TString(label->Content()); + } + TString result; + for (ui32 i = 0; i < label->ChildrenSize(); ++i) { + result += label->Child(i)->Content(); + if (i+1 != label->ChildrenSize()) { + result += ","; + } + } + + return result; +} + } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.h b/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.h index 366209de18..e448690c4f 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.h +++ b/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.h @@ -62,15 +62,19 @@ struct TYtJoinNodeOp : TYtJoinNode { }; TYtJoinNodeOp::TPtr ImportYtEquiJoin(TYtEquiJoin equiJoin, TExprContext& ctx); + +IGraphTransformer::TStatus CollectCboStats(const TString& cluster, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx); + IGraphTransformer::TStatus RewriteYtEquiJoinLeaves(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx); IGraphTransformer::TStatus RewriteYtEquiJoin(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, const TYtState::TPtr& state, TExprContext& ctx); TMaybeNode<TExprBase> ExportYtEquiJoin(TYtEquiJoin equiJoin, const TYtJoinNodeOp& op, TExprContext& ctx, const TYtState::TPtr& state); -TYtJoinNodeOp::TPtr OrderJoins(TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, TExprContext& ctx, bool debug = false); +TYtJoinNodeOp::TPtr OrderJoins(TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, const TString& cluster, TExprContext& ctx, bool debug = false); +TString JoinLeafLabel(TExprNode::TPtr label); struct IBaseOptimizerNode; struct IProviderContext; -void BuildOptimizerJoinTree(std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& ctx, TYtJoinNodeOp::TPtr op); +void BuildOptimizerJoinTree(TYtState::TPtr state, const TString& cluster, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TYtJoinNodeOp::TPtr op, TExprContext& ctx); TYtJoinNode::TPtr BuildYtJoinTree(std::shared_ptr<IBaseOptimizerNode> node, TExprContext& ctx, TPositionHandle pos); bool AreSimilarTrees(TYtJoinNode::TPtr node1, TYtJoinNode::TPtr node2); diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_join_reorder.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_join_reorder.cpp index 53e8f50e46..9721fbe2a2 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_join_reorder.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_join_reorder.cpp @@ -1,12 +1,16 @@ #include "yql_yt_join_impl.h" #include "yql_yt_helpers.h" +#include <ydb/library/yql/core/cbo/cbo_optimizer_new.h> +#include <ydb/library/yql/core/yql_graph_transformer.h> +#include <ydb/library/yql/dq/opt/dq_opt_log.h> #include <ydb/library/yql/parser/pg_wrapper/interface/optimizer.h> #include <ydb/library/yql/providers/common/provider/yql_provider.h> +#include <ydb/library/yql/providers/yt/opt/yql_yt_join.h> +#include <ydb/library/yql/providers/yt/provider/yql_yt_provider_context.h> #include <ydb/library/yql/utils/log/log.h> -#include <ydb/library/yql/core/cbo/cbo_optimizer_new.h> -#include <ydb/library/yql/dq/opt/dq_opt_log.h> +#include <yt/cpp/mapreduce/common/helpers.h> namespace NYql { @@ -59,10 +63,12 @@ public: TJoinReorderer( TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, + const TString& cluster, TExprContext& ctx, bool debug = false) : Root(op) , State(state) + , Cluster(cluster) , Ctx(ctx) , Debug(debug) { @@ -75,9 +81,9 @@ public: TYtJoinNodeOp::TPtr Do() { std::shared_ptr<IBaseOptimizerNode> tree; - std::shared_ptr<IProviderContext> ctx; - BuildOptimizerJoinTree(tree, ctx, Root); - auto ytCtx = std::static_pointer_cast<TYtProviderContext>(ctx); + std::shared_ptr<IProviderContext> providerCtx; + BuildOptimizerJoinTree(State, Cluster, tree, providerCtx, Root, Ctx); + auto ytCtx = std::static_pointer_cast<TYtProviderContext>(providerCtx); std::function<void(const TString& str)> log; @@ -93,14 +99,14 @@ public: YQL_CLOG(ERROR, ProviderYt) << "PG CBO does not support link settings"; return Root; } - opt = std::unique_ptr<IOptimizerNew>(MakePgOptimizerNew(*ctx, Ctx, log)); + opt = std::unique_ptr<IOptimizerNew>(MakePgOptimizerNew(*providerCtx, Ctx, log)); break; case ECostBasedOptimizerType::Native: if (ytCtx->HasHints) { YQL_CLOG(ERROR, ProviderYt) << "Native CBO does not suppor link hints"; return Root; } - opt = std::unique_ptr<IOptimizerNew>(NDq::MakeNativeOptimizerNew(*ctx, 100000)); + opt = std::unique_ptr<IOptimizerNew>(NDq::MakeNativeOptimizerNew(*providerCtx, 100000)); break; default: YQL_CLOG(ERROR, ProviderYt) << "Unknown optimizer type " << ToString(State->Types->CostBasedOptimizer); @@ -137,6 +143,7 @@ public: private: TYtJoinNodeOp::TPtr Root; const TYtState::TPtr& State; + TString Cluster; TExprContext& Ctx; bool Debug; }; @@ -169,16 +176,19 @@ public: class TOptimizerTreeBuilder { public: - TOptimizerTreeBuilder(std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& ctx, TYtJoinNodeOp::TPtr inputTree) - : Tree(tree) - , OutCtx(ctx) + TOptimizerTreeBuilder(TYtState::TPtr state, const TString& cluster, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TYtJoinNodeOp::TPtr inputTree, TExprContext& ctx) + : State(state) + , Cluster(cluster) + , Tree(tree) + , OutProviderCtx(providerCtx) , InputTree(inputTree) + , Ctx(ctx) { } void Do() { - Ctx = std::make_shared<TYtProviderContext>(); + ProviderCtx = std::make_shared<TYtProviderContext>(); Tree = ProcessNode(InputTree); - OutCtx = Ctx; + OutProviderCtx = ProviderCtx; } private: @@ -195,8 +205,6 @@ private: std::shared_ptr<IBaseOptimizerNode> OnOp(TYtJoinNodeOp* op) { auto joinKind = ConvertToJoinKind(TString(op->JoinKind->Content())); - auto left = ProcessNode(op->Left); - auto right = ProcessNode(op->Right); YQL_ENSURE(op->LeftLabel->ChildrenSize() == op->RightLabel->ChildrenSize()); std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>> joinConditions; for (ui32 i = 0; i < op->LeftLabel->ChildrenSize(); i += 2) { @@ -204,13 +212,17 @@ private: auto lcolumn = op->LeftLabel->Child(i + 1)->Content(); auto rtable = op->RightLabel->Child(i)->Content(); auto rcolumn = op->RightLabel->Child(i + 1)->Content(); + AddRelJoinColumn(TString(ltable), TString(lcolumn)); + AddRelJoinColumn(TString(rtable), TString(rcolumn)); NDq::TJoinColumn lcol{TString(ltable), TString(lcolumn)}; NDq::TJoinColumn rcol{TString(rtable), TString(rcolumn)}; joinConditions.insert({lcol, rcol}); } + auto left = ProcessNode(op->Left); + auto right = ProcessNode(op->Right); bool nonReorderable = op->LinkSettings.ForceSortedMerge; - Ctx->HasForceSortedMerge = Ctx->HasForceSortedMerge || op->LinkSettings.ForceSortedMerge; - Ctx->HasHints = Ctx->HasHints || !op->LinkSettings.LeftHints.empty() || !op->LinkSettings.RightHints.empty(); + ProviderCtx->HasForceSortedMerge = ProviderCtx->HasForceSortedMerge || op->LinkSettings.ForceSortedMerge; + ProviderCtx->HasHints = ProviderCtx->HasHints || !op->LinkSettings.LeftHints.empty() || !op->LinkSettings.RightHints.empty(); return std::make_shared<TYtJoinOptimizerNode>( left, right, joinConditions, joinKind, EJoinAlgoType::GraceJoin, nonReorderable ? op : nullptr @@ -218,20 +230,25 @@ private: } std::shared_ptr<IBaseOptimizerNode> OnLeaf(TYtJoinNodeLeaf* leaf) { - TString label; - if (leaf->Label->ChildrenSize() == 0) { - label = leaf->Label->Content(); - } else { - for (ui32 i = 0; i < leaf->Label->ChildrenSize(); ++i) { - label += leaf->Label->Child(i)->Content(); - if (i+1 != leaf->Label->ChildrenSize()) { - label += ","; - } - } + TString label = JoinLeafLabel(leaf->Label); + + const TMaybe<ui64> maxChunkCountExtendedStats = State->Configuration->ExtendedStatsMaxChunkCount.Get(); + bool enableExtendedStats = maxChunkCountExtendedStats.Defined(); + + TVector<TString> keyList; + auto joinColumns = RelJoinColumns.find(label); + if (joinColumns != RelJoinColumns.end()) { + keyList.assign(joinColumns->second.begin(), joinColumns->second.end()); } TYtSection section{leaf->Section}; auto stat = std::make_shared<TOptimizerStatistics>(); + stat->ColumnStatistics = TIntrusivePtr<TOptimizerStatistics::TColumnStatMap>( + new TOptimizerStatistics::TColumnStatMap()); + auto providerStats = std::make_unique<TYtProviderStatistic>(); + TVector<IYtGateway::TPathStatReq> pathStatReqs; + ui64 totalChunkCount = 0; + if (Y_UNLIKELY(!section.Settings().Empty()) && Y_UNLIKELY(section.Settings().Item(0).Name() == "Test")) { for (const auto& setting : section.Settings()) { if (setting.Name() == "Rows") { @@ -245,19 +262,80 @@ private: auto tableStat = TYtTableBaseInfo::GetStat(path.Table()); stat->Cost += tableStat->DataSize; stat->Nrows += tableStat->RecordsCount; + totalChunkCount += tableStat->ChunkCount; + + auto pathInfo = MakeIntrusive<TYtPathInfo>(path); + auto ytPath = BuildYtPathForStatRequest(Cluster, *pathInfo, keyList, *State, Ctx); + YQL_ENSURE(ytPath); + + if (enableExtendedStats) { + pathStatReqs.push_back( + IYtGateway::TPathStatReq() + .Path(*ytPath) + .IsTemp(pathInfo->Table->IsTemp) + .IsAnonymous(pathInfo->Table->IsAnonymous) + .Epoch(pathInfo->Table->Epoch.GetOrElse(0))); + } + } + auto sorted = section.Ref().GetConstraint<TSortedConstraintNode>(); + if (sorted) { + TVector<TString> key; + for (const auto& item : sorted->GetContent()) { + for (const auto& path : item.first) { + const auto& column = path.front(); + key.push_back(TString(column)); + } + } + providerStats->SortColumns = key; } } + if (!pathStatReqs.empty() && + (*maxChunkCountExtendedStats == 0 || totalChunkCount <= *maxChunkCountExtendedStats)) { + IYtGateway::TPathStatOptions pathStatOptions = + IYtGateway::TPathStatOptions(State->SessionId) + .Cluster(Cluster) + .Paths(pathStatReqs) + .Config(State->Configuration->Snapshot()) + .Extended(true); + + IYtGateway::TPathStatResult pathStats = State->Gateway->TryPathStat(std::move(pathStatOptions)); + if (pathStats.Success()) { + for (const auto& extended : pathStats.Extended) { + if (!extended) { + continue; + } + for (const auto& entry : extended->EstimatedUniqueCounts) { + stat->ColumnStatistics->Data[entry.first].NumUniqueVals = entry.second; + } + for (const auto& entry : extended->DataWeight) { + providerStats->ColumnStatistics[entry.first] = { + .DataWeight = entry.second + }; + } + } + } + } + + stat->Specific = std::unique_ptr<const IProviderStatistics>(providerStats.release()); return std::make_shared<TYtRelOptimizerNode>( std::move(label), std::move(stat), leaf ); } - std::shared_ptr<IBaseOptimizerNode>& Tree; - std::shared_ptr<TYtProviderContext> Ctx; - std::shared_ptr<IProviderContext>& OutCtx; + void AddRelJoinColumn(const TString& rtable, const TString& rcolumn) { + auto entry = RelJoinColumns.insert(std::make_pair(rtable, THashSet<TString>{})); + entry.first->second.insert(rcolumn); + } + TYtState::TPtr State; + const TString Cluster; + std::shared_ptr<IBaseOptimizerNode>& Tree; + std::shared_ptr<TYtProviderContext> ProviderCtx; + std::shared_ptr<IProviderContext>& OutProviderCtx; + THashMap<TString, THashSet<TString>> RelJoinColumns; TYtJoinNodeOp::TPtr InputTree; + TExprContext& Ctx; }; TYtJoinNode::TPtr BuildYtJoinTree(std::shared_ptr<IBaseOptimizerNode> node, TVector<TString>& scope, TExprContext& ctx, TPositionHandle pos) { @@ -330,9 +408,9 @@ bool AreSimilarTrees(TYtJoinNode::TPtr node1, TYtJoinNode::TPtr node2) { } } -void BuildOptimizerJoinTree(std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& ctx, TYtJoinNodeOp::TPtr op) +void BuildOptimizerJoinTree(TYtState::TPtr state, const TString& cluster, std::shared_ptr<IBaseOptimizerNode>& tree, std::shared_ptr<IProviderContext>& providerCtx, TYtJoinNodeOp::TPtr op, TExprContext& ctx) { - TOptimizerTreeBuilder(tree, ctx, op).Do(); + TOptimizerTreeBuilder(state, cluster, tree, providerCtx, op, ctx).Do(); } TYtJoinNode::TPtr BuildYtJoinTree(std::shared_ptr<IBaseOptimizerNode> node, TExprContext& ctx, TPositionHandle pos) { @@ -340,13 +418,13 @@ TYtJoinNode::TPtr BuildYtJoinTree(std::shared_ptr<IBaseOptimizerNode> node, TExp return BuildYtJoinTree(node, scope, ctx, pos); } -TYtJoinNodeOp::TPtr OrderJoins(TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, TExprContext& ctx, bool debug) +TYtJoinNodeOp::TPtr OrderJoins(TYtJoinNodeOp::TPtr op, const TYtState::TPtr& state, const TString& cluster, TExprContext& ctx, bool debug) { if (state->Types->CostBasedOptimizer == ECostBasedOptimizerType::Disable || op->CostBasedOptPassed) { return op; } - auto result = TJoinReorderer(op, state, ctx, debug).Do(); + auto result = TJoinReorderer(op, state, cluster, ctx, debug).Do(); if (!debug && AreSimilarTrees(result, op)) { return op; } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp index c7837d8e83..8c9ab6ea03 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_load_columnar_stats.cpp @@ -69,6 +69,7 @@ private: bool hasError = false; TNodeOnNodeOwnedMap sectionRewrites; VisitExpr(input, [this, &pathStatArgs, &hasError, §ionRewrites, &ctx](const TExprNode::TPtr& node) { + const TMaybe<ui64> maxChunkCountExtendedStats = State_->Configuration->ExtendedStatsMaxChunkCount.Get(); if (auto maybeSection = TMaybeNode<TYtSection>(node)) { TYtSection section = maybeSection.Cast(); if (NYql::HasSetting(section.Settings().Ref(), EYtSettingType::StatColumns)) { @@ -77,6 +78,7 @@ private: TMaybe<TString> cluster; TVector<IYtGateway::TPathStatReq> pathStatReqs; size_t idx = 0; + ui64 totalChunkCount = 0; for (auto path: section.Paths()) { bool hasStat = false; if (path.Table().Maybe<TYtTable>().Stat().Maybe<TYtStat>()) { @@ -107,6 +109,7 @@ private: TYtPathInfo pathInfo(path); YQL_ENSURE(pathInfo.Table->Stat); + totalChunkCount += pathInfo.Table->Stat->ChunkCount; TString currCluster; if (auto ytTable = path.Table().Maybe<TYtTable>()) { @@ -139,11 +142,15 @@ private: ++idx; } + bool requestExtendedStats = maxChunkCountExtendedStats && + (*maxChunkCountExtendedStats == 0 || totalChunkCount <= *maxChunkCountExtendedStats); + if (pathStatReqs) { auto pathStatOptions = IYtGateway::TPathStatOptions(State_->SessionId) .Cluster(*cluster) .Paths(pathStatReqs) - .Config(State_->Configuration->Snapshot()); + .Config(State_->Configuration->Snapshot()) + .Extended(requestExtendedStats); auto tryResult = State_->Gateway->TryPathStat(IYtGateway::TPathStatOptions(pathStatOptions)); if (!tryResult.Success()) { diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_provider_context.h b/ydb/library/yql/providers/yt/provider/yql_yt_provider_context.h new file mode 100644 index 0000000000..15c1b4d456 --- /dev/null +++ b/ydb/library/yql/providers/yt/provider/yql_yt_provider_context.h @@ -0,0 +1,17 @@ +#pragma once + +#include <ydb/library/yql/core/yql_statistics.h> + +namespace NYql { + +struct TYtColumnStatistic { + int64_t DataWeight; +}; + +class TYtProviderStatistic : public IProviderStatistics { +public: + std::unordered_map<TString, TYtColumnStatistic> ColumnStatistics; + TVector<TString> SortColumns; +}; + +} // namespace NYql |