diff options
author | mrlolthe1st <mrlolthe1st@yandex-team.com> | 2023-08-10 12:55:42 +0300 |
---|---|---|
committer | mrlolthe1st <mrlolthe1st@yandex-team.com> | 2023-08-10 13:55:55 +0300 |
commit | 6e87e8ed0eb05aba6f3a7bf39cbba277e56c348a (patch) | |
tree | 2d3005f99f0d7ce618a225f305450b73c9493773 | |
parent | 3ab959a0df5defdbcc427cb1484dabc5a0e00321 (diff) | |
download | ydb-6e87e8ed0eb05aba6f3a7bf39cbba277e56c348a.tar.gz |
YQL-16280: Make a fast-path check before estimating stats
11 files changed, 135 insertions, 103 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp index c2f5a31f4d6..0a80aecb5cc 100644 --- a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp @@ -339,7 +339,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T && dqIntegration->EstimateReadSize( TDqSettings::TDefault::DataSizePerJob, TDqSettings::TDefault::MaxTasksPerStage, - *node.Ptr(), + {node.Raw()}, ctx)) { txRes.Ops.insert(node.Raw()); diff --git a/ydb/library/yql/dq/integration/yql_dq_integration.h b/ydb/library/yql/dq/integration/yql_dq_integration.h index b528faaa828..b34880f6378 100644 --- a/ydb/library/yql/dq/integration/yql_dq_integration.h +++ b/ydb/library/yql/dq/integration/yql_dq_integration.h @@ -42,7 +42,7 @@ public: TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, bool canFallback) = 0; virtual bool CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues = false) { Y_UNUSED(skipIssues); Y_UNUSED(node); Y_UNUSED(ctx); return true; } virtual bool CanRead(const TExprNode& read, TExprContext& ctx, bool skipIssues = true) = 0; - virtual TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TExprNode& node, TExprContext& ctx) = 0; + virtual TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector<const TExprNode*>& nodes, TExprContext& ctx) = 0; virtual TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) = 0; // Nothing if callable is not for writing, diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp index 8a44248f694..2a5e7ec832a 100644 --- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp +++ 
b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp @@ -26,8 +26,8 @@ public: return TClReadTable::Match(&read); } - TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode& read, TExprContext&) override { - if (TClReadTable::Match(&read)) { + TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TVector<const TExprNode*>& read, TExprContext&) override { + if (AllOf(read, [](const auto val) { return TClReadTable::Match(val); })) { return 0ul; // TODO: return real size } return Nothing(); diff --git a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp index 9ba72f7a867..451ecf606c4 100644 --- a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp +++ b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp @@ -11,7 +11,7 @@ bool TDqIntegrationBase::CanRead(const TExprNode&, TExprContext&, bool) { return false; } -TMaybe<ui64> TDqIntegrationBase::EstimateReadSize(ui64, ui32, const TExprNode &, TExprContext&) { +TMaybe<ui64> TDqIntegrationBase::EstimateReadSize(ui64, ui32, const TVector<const TExprNode*> &, TExprContext&) { return Nothing(); } diff --git a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h index abbc412d085..714aa124dc3 100644 --- a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h +++ b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h @@ -9,7 +9,7 @@ public: ui64 Partition(const TDqSettings& config, size_t maxPartitions, const TExprNode& node, TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, bool canFallback) override; bool CanRead(const TExprNode& read, TExprContext& ctx, bool skipIssues) override; - TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TExprNode& node, TExprContext& ctx) override; + 
TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector<const TExprNode*>& nodes, TExprContext& ctx) override; TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) override; void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) override; TMaybe<bool> CanWrite(const TExprNode& write, TExprContext& ctx) override; diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp index 49f8a5fe79e..eb1f5c14e1e 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp @@ -40,12 +40,11 @@ private: [](const TExprNode::TPtr& n) { return !TDqConnection::Match(n.Get()) && !TDqPhyPrecompute::Match(n.Get()) && !TDqReadWrapBase::Match(n.Get()); }, - [&hasErrors, &ctx = Ctx_, &dataSize = DataSize_, &typeCtx = TypeCtx_, &state = State_](const TExprNode::TPtr& n) { + [&readPerProvider_ = ReadsPerProvider_, &hasErrors, &ctx = Ctx_, &typeCtx = TypeCtx_](const TExprNode::TPtr& n) { if (TCoScriptUdf::Match(n.Get()) && NKikimr::NMiniKQL::IsSystemPython(NKikimr::NMiniKQL::ScriptTypeFromStr(n->Head().Content()))) { ReportError(ctx, *n, TStringBuilder() << "Cannot execute system python udf " << n->Content() << " in DQ"); hasErrors = true; } - if (!typeCtx.ForceDq && TDqReadWrapBase::Match(n.Get())) { auto readNode = n->Child(0); auto dataSourceName = readNode->Child(1)->Child(0)->Content(); @@ -54,13 +53,7 @@ private: YQL_ENSURE(datasource); auto dqIntegration = (*datasource)->GetDqIntegration(); YQL_ENSURE(dqIntegration); - if (dqIntegration) { - TMaybe<ui64> size; - hasErrors |= !(size = dqIntegration->EstimateReadSize(state->Settings->DataSizePerJob.Get().GetOrElse(TDqSettings::TDefault::DataSizePerJob), state->Settings->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage), *readNode, ctx)); - if (size) { - dataSize += *size; - } 
- } + readPerProvider_[dqIntegration].push_back(readNode); } } return !hasErrors; @@ -153,9 +146,22 @@ public: }); bool hasError = false; - + for (const auto n: dqNodes) { hasError |= !ValidateDqNode(*n); + if (hasError) { + break; + } + } + + for (auto& [integration, nodes]: ReadsPerProvider_) { + TMaybe<ui64> size; + hasError |= !(size = integration->EstimateReadSize(State_->Settings->DataSizePerJob.Get().GetOrElse(TDqSettings::TDefault::DataSizePerJob), + State_->Settings->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage), nodes, Ctx_)); + if (hasError) { + break; + } + DataSize_ += *size; } if (!hasError && hasJoin && DataSize_ > State_->Settings->MaxDataSizePerQuery.Get().GetOrElse(10_GB)) { @@ -169,6 +175,7 @@ private: const TTypeAnnotationContext& TypeCtx_; TExprContext& Ctx_; TNodeSet Visited_; + THashMap<IDqIntegration*, TVector<const TExprNode*>> ReadsPerProvider_; size_t DataSize_ = 0; const TDqState::TPtr State_; diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp index 623bcad3fe6..af5325d7d56 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp @@ -29,9 +29,9 @@ namespace NYql { return TGenReadTable::Match(&read); } - TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode& read, + TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TVector<const TExprNode*>& read, TExprContext&) override { - if (TGenReadTable::Match(&read)) { + if (AllOf(read, [](const auto val) { return TGenReadTable::Match(val); })) { return 0ul; // TODO: return real size } return Nothing(); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index 
d98ab61154c..fe2e309de93 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -123,11 +123,10 @@ public: return TS3ReadObject::Match(&read); } - TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode& read, TExprContext&) override { - if (TS3ReadObject::Match(&read)) { + TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TVector<const TExprNode*>& read, TExprContext&) override { + if (AllOf(read, [](const auto val) { return TS3ReadObject::Match(val); })) { return 0ul; // TODO: return real size } - return Nothing(); } diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp index cb82aec4f53..04c03156e66 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp @@ -78,7 +78,7 @@ public: YQL_ENSURE(false, "Unimplemented"); } - TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode&, TExprContext&) override { + TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TVector<const TExprNode*>&, TExprContext&) override { YQL_ENSURE(false, "Unimplemented"); } diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp index 6e887fb99c2..cea1b0aaac5 100644 --- a/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp +++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp @@ -73,11 +73,10 @@ public: return TYdbReadTable::Match(&read); } - TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode& read, TExprContext& ) override { - if (TYdbReadTable::Match(&read)) 
{ + TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TVector<const TExprNode*>& read, TExprContext&) override { + if (AllOf(read, [](const auto val) { return TYdbReadTable::Match(val); })) { return 0ul; // TODO: return real size } - return Nothing(); } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp index 78247608b41..a8b661b22d8 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp @@ -42,37 +42,42 @@ public: { } - TVector<TMaybe<TVector<ui64>>> EstimateColumnStats(TExprContext& ctx, const TString& cluster, const TVector<TVector<TYtPathInfo::TPtr>>& groupIdPathInfos, ui64& sumAllTableSizes) { - TVector<TMaybe<TVector<ui64>>> groupIdColumnarStats; + TVector<TVector<ui64>> EstimateColumnStats(TExprContext& ctx, const TString& cluster, const TVector<TVector<TYtPathInfo::TPtr>>& groupIdPathInfos, ui64& sumAllTableSizes) { + TVector<TVector<ui64>> groupIdColumnarStats; + groupIdColumnarStats.reserve(groupIdPathInfos.size()); + TVector<bool> lookupsInfo; + TVector<TYtPathInfo::TPtr> flattenPaths; for (const auto& pathInfos: groupIdPathInfos) { - TVector<TString> columns; - bool hasLookups = false; for (const auto& pathInfo: pathInfos) { - if (const auto cols = pathInfo->Columns) { - if (columns.empty() && cols->GetColumns()) { - for (auto& column : *cols->GetColumns()) { - columns.push_back(column.Name); - } - } - - if (pathInfo->Table->Meta && pathInfo->Table->Meta->Attrs.Value("optimize_for", "scan") == "lookup") { - hasLookups = true; - } + auto hasLookup = pathInfo->Table->Meta && pathInfo->Table->Meta->Attrs.Value("optimize_for", "scan") == "lookup"; + lookupsInfo.push_back(hasLookup); + if (!pathInfo->Table->Stat) { + continue; } + if (hasLookup) { + continue; + } + flattenPaths.push_back(pathInfo); } - - TMaybe<TVector<ui64>> columnarStat; 
- - if (!columns.empty() && !hasLookups) { - columnarStat = EstimateDataSize(cluster, pathInfos, {columns}, *State_, ctx); - } - - groupIdColumnarStats.push_back(columnarStat); - for (const auto& [pathId, path] : Enumerate(pathInfos)){ + } + auto result = EstimateDataSize(cluster, flattenPaths, Nothing(), *State_, ctx); + size_t statIdx = 0; + size_t pathIdx = 0; + for (const auto& [idx, pathInfos]: Enumerate(groupIdPathInfos)) { + TVector<ui64> columnarStatInner; + columnarStatInner.reserve(pathInfos.size()); + for (auto& path: pathInfos) { const auto& tableInfo = *path->Table; - ui64 dataSize = columnarStat ? (*columnarStat)[pathId] : tableInfo.Stat->DataSize; - sumAllTableSizes += dataSize; + if (lookupsInfo[pathIdx++] || !tableInfo.Stat) { + columnarStatInner.push_back(tableInfo.Stat ? tableInfo.Stat->DataSize : 0); + sumAllTableSizes += columnarStatInner.back(); + continue; + } + columnarStatInner.push_back(result ? result->at(statIdx) : tableInfo.Stat->DataSize); + sumAllTableSizes += columnarStatInner.back(); + ++statIdx; } + groupIdColumnarStats.emplace_back(std::move(columnarStatInner)); } return groupIdColumnarStats; } @@ -180,11 +185,8 @@ public: } } else { TVector<TVector<std::tuple<ui64, ui64, NYT::TRichYPath>>> partitionTuplesArr; - ui64 sumAllTableSizes = 0; - - TVector<TMaybe<TVector<ui64>>> groupIdColumnarStats = EstimateColumnStats(ctx, cluster, groupIdPathInfos, sumAllTableSizes); - + TVector<TVector<ui64>> groupIdColumnarStats = EstimateColumnStats(ctx, cluster, {groupIdPathInfos}, sumAllTableSizes); ui64 parts = (sumAllTableSizes + dataSizePerJob - 1) / dataSizePerJob; if (canFallback && hasErasure && parts > maxTasks) { std::string_view message = "DQ cannot execute the query. 
Cause: too big table with erasure codec"; @@ -202,11 +204,11 @@ public: if (sampleSetting) { sample = FromString<double>(sampleSetting->Child(1)->Child(1)->Content()); } + auto& groupStats = groupIdColumnarStats[groupId]; for (const auto& [pathId, path] : Enumerate(groupIdPathInfos[groupId])) { const auto& tableInfo = *path->Table; YQL_ENSURE(tableInfo.Stat, "Table has no stat."); - auto columnarStat = groupIdColumnarStats[groupId]; - ui64 dataSize = columnarStat ? (*columnarStat)[pathId] : tableInfo.Stat->DataSize; + ui64 dataSize = groupStats[pathId]; if (sample) { dataSize *=* sample; } @@ -334,60 +336,85 @@ public: return false; } - TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TExprNode& node, TExprContext& ctx) override { - if (auto maybeRead = TMaybeNode<TYtReadTable>(&node)) { - - ui64 dataSize = 0; - bool hasErasure = false; - auto cluster = maybeRead.Cast().DataSource().Cluster().StringValue(); - - const auto canUseYtPartitioningApi = State_->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false); - - TVector<TVector<TYtPathInfo::TPtr>> groupIdPathInfos; - - for (auto section: maybeRead.Cast().Input()) { - groupIdPathInfos.emplace_back(); - for (const auto& path: section.Paths()) { - auto pathInfo = MakeIntrusive<TYtPathInfo>(path); - auto tableInfo = pathInfo->Table; - - YQL_ENSURE(tableInfo); - if (!tableInfo->Stat) { - continue; - } - - if (pathInfo->Ranges && !canUseYtPartitioningApi) { - AddErrorWrap(ctx, node.Pos(), "table with ranges"); - return Nothing(); - } else if (tableInfo->Meta->IsDynamic && !canUseYtPartitioningApi) { - AddErrorWrap(ctx, node.Pos(), "dynamic table"); - return Nothing(); - } else { // - if (tableInfo->Meta->Attrs.Value("erasure_codec", "none") != "none") { - hasErasure = true; + TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector<const TExprNode*>& nodes, TExprContext& ctx) override { + + TVector<bool> hasErasurePerNode; + 
hasErasurePerNode.reserve(nodes.size()); + TVector<ui64> dataSizes(nodes.size()); + THashMap<TString, TVector<std::pair<const TExprNode*, bool>>> clusterToNodesAndErasure; + THashMap<TString, TVector<TVector<TYtPathInfo::TPtr>>> clusterToGroups; + const auto maxChunks = State_->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); + ui64 chunksCount = 0u; + + for (const auto &node_: nodes) { + if (auto maybeRead = TMaybeNode<TYtReadTable>(node_)) { + + bool hasErasure = false; + auto cluster = maybeRead.Cast().DataSource().Cluster().StringValue(); + auto& groupIdPathInfo = clusterToGroups[cluster]; + + const auto canUseYtPartitioningApi = State_->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false); + + auto input = maybeRead.Cast().Input(); + for (auto section: input) { + groupIdPathInfo.emplace_back(); + for (const auto& path: section.Paths()) { + auto pathInfo = MakeIntrusive<TYtPathInfo>(path); + auto tableInfo = pathInfo->Table; + + YQL_ENSURE(tableInfo); + + if (pathInfo->Ranges && !canUseYtPartitioningApi) { + AddErrorWrap(ctx, node_->Pos(), "table with ranges"); + return Nothing(); + } else if (tableInfo->Meta->IsDynamic && !canUseYtPartitioningApi) { + AddErrorWrap(ctx, node_->Pos(), "dynamic table"); + return Nothing(); + } else { // + if (tableInfo->Meta->Attrs.Value("erasure_codec", "none") != "none") { + hasErasure = true; + } + if (tableInfo->Stat) { + chunksCount += tableInfo->Stat->ChunkCount; + } } + groupIdPathInfo.back().emplace_back(pathInfo); } - groupIdPathInfos.back().emplace_back(pathInfo); } + if (chunksCount > maxChunks) { + AddErrorWrap(ctx, node_->Pos(), "table with too many chunks"); + return Nothing(); + } + clusterToNodesAndErasure[cluster].push_back({node_, hasErasure}); + } else { + AddErrorWrap(ctx, node_->Pos(), TStringBuilder() << "unsupported callable: " << node_->Content()); + return Nothing(); + } - + EstimateColumnStats(ctx, cluster, groupIdPathInfos, dataSize); - - if (hasErasure) { // -
if (auto codecCpu = State_->Configuration->ErasureCodecCpuForDq.Get(cluster)) { - dataSizePerJob = Max(ui64(dataSizePerJob / *codecCpu), 10_KB); - const ui64 parts = (dataSize + dataSizePerJob - 1) / dataSizePerJob; - if (parts > maxTasksPerStage) { - AddErrorWrap(ctx, node.Pos(), "too big table with erasure codec"); - return Nothing(); - } + } + ui64 dataSize = 0; + for (auto& [cluster, info]: clusterToNodesAndErasure) { + auto res = EstimateColumnStats(ctx, cluster, clusterToGroups[cluster], dataSize); + auto codecCpu = State_->Configuration->ErasureCodecCpuForDq.Get(cluster); + if (!codecCpu) { + continue; + } + size_t idx = 0; + for (auto& [node, hasErasure]: info) { + if (!hasErasure) { + ++idx; + continue; + } + ui64 readSize = std::accumulate(res[idx].begin(), res[idx].end(), ui64{0}); + ++idx; + dataSizePerJob = Max(ui64(dataSizePerJob / *codecCpu), 10_KB); + const ui64 parts = (readSize + dataSizePerJob - 1) / dataSizePerJob; + if (parts > maxTasksPerStage) { + AddErrorWrap(ctx, node->Pos(), "too big table with erasure codec"); + return Nothing(); } } - - return dataSize; } - AddErrorWrap(ctx, node.Pos(), TStringBuilder() << "unsupported callable: " << node.Content()); - return Nothing(); + return dataSize; } void AddErrorWrap(TExprContext& ctx, const NYql::TPositionHandle& where, const TString& cause) { |