aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormrlolthe1st <mrlolthe1st@yandex-team.com>2023-08-10 12:55:42 +0300
committermrlolthe1st <mrlolthe1st@yandex-team.com>2023-08-10 13:55:55 +0300
commit6e87e8ed0eb05aba6f3a7bf39cbba277e56c348a (patch)
tree2d3005f99f0d7ce618a225f305450b73c9493773
parent3ab959a0df5defdbcc427cb1484dabc5a0e00321 (diff)
downloadydb-6e87e8ed0eb05aba6f3a7bf39cbba277e56c348a.tar.gz
YQL-16280: Make fast way check before estimating stats
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_opt_build.cpp2
-rw-r--r--ydb/library/yql/dq/integration/yql_dq_integration.h2
-rw-r--r--ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp4
-rw-r--r--ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp2
-rw-r--r--ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h2
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp27
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp4
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp5
-rw-r--r--ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp2
-rw-r--r--ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp5
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp183
11 files changed, 135 insertions, 103 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
index c2f5a31f4d6..0a80aecb5cc 100644
--- a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
@@ -339,7 +339,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
&& dqIntegration->EstimateReadSize(
TDqSettings::TDefault::DataSizePerJob,
TDqSettings::TDefault::MaxTasksPerStage,
- *node.Ptr(),
+ {node.Raw()},
ctx))
{
txRes.Ops.insert(node.Raw());
diff --git a/ydb/library/yql/dq/integration/yql_dq_integration.h b/ydb/library/yql/dq/integration/yql_dq_integration.h
index b528faaa828..b34880f6378 100644
--- a/ydb/library/yql/dq/integration/yql_dq_integration.h
+++ b/ydb/library/yql/dq/integration/yql_dq_integration.h
@@ -42,7 +42,7 @@ public:
TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, bool canFallback) = 0;
virtual bool CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues = false) { Y_UNUSED(skipIssues); Y_UNUSED(node); Y_UNUSED(ctx); return true; }
virtual bool CanRead(const TExprNode& read, TExprContext& ctx, bool skipIssues = true) = 0;
- virtual TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TExprNode& node, TExprContext& ctx) = 0;
+ virtual TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector<const TExprNode*>& nodes, TExprContext& ctx) = 0;
virtual TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) = 0;
// Nothing if callable is not for writing,
diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp
index 8a44248f694..2a5e7ec832a 100644
--- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp
+++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp
@@ -26,8 +26,8 @@ public:
return TClReadTable::Match(&read);
}
- TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode& read, TExprContext&) override {
- if (TClReadTable::Match(&read)) {
+ TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TVector<const TExprNode*>& read, TExprContext&) override {
+ if (AllOf(read, [](const auto val) { return TClReadTable::Match(val); })) {
return 0ul; // TODO: return real size
}
return Nothing();
diff --git a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp
index 9ba72f7a867..451ecf606c4 100644
--- a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp
+++ b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp
@@ -11,7 +11,7 @@ bool TDqIntegrationBase::CanRead(const TExprNode&, TExprContext&, bool) {
return false;
}
-TMaybe<ui64> TDqIntegrationBase::EstimateReadSize(ui64, ui32, const TExprNode &, TExprContext&) {
+TMaybe<ui64> TDqIntegrationBase::EstimateReadSize(ui64, ui32, const TVector<const TExprNode*> &, TExprContext&) {
return Nothing();
}
diff --git a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h
index abbc412d085..714aa124dc3 100644
--- a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h
+++ b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h
@@ -9,7 +9,7 @@ public:
ui64 Partition(const TDqSettings& config, size_t maxPartitions, const TExprNode& node,
TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, bool canFallback) override;
bool CanRead(const TExprNode& read, TExprContext& ctx, bool skipIssues) override;
- TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TExprNode& node, TExprContext& ctx) override;
+ TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector<const TExprNode*>& nodes, TExprContext& ctx) override;
TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) override;
void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) override;
TMaybe<bool> CanWrite(const TExprNode& write, TExprContext& ctx) override;
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp
index 49f8a5fe79e..eb1f5c14e1e 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp
@@ -40,12 +40,11 @@ private:
[](const TExprNode::TPtr& n) {
return !TDqConnection::Match(n.Get()) && !TDqPhyPrecompute::Match(n.Get()) && !TDqReadWrapBase::Match(n.Get());
},
- [&hasErrors, &ctx = Ctx_, &dataSize = DataSize_, &typeCtx = TypeCtx_, &state = State_](const TExprNode::TPtr& n) {
+ [&readPerProvider_ = ReadsPerProvider_, &hasErrors, &ctx = Ctx_, &typeCtx = TypeCtx_](const TExprNode::TPtr& n) {
if (TCoScriptUdf::Match(n.Get()) && NKikimr::NMiniKQL::IsSystemPython(NKikimr::NMiniKQL::ScriptTypeFromStr(n->Head().Content()))) {
ReportError(ctx, *n, TStringBuilder() << "Cannot execute system python udf " << n->Content() << " in DQ");
hasErrors = true;
}
-
if (!typeCtx.ForceDq && TDqReadWrapBase::Match(n.Get())) {
auto readNode = n->Child(0);
auto dataSourceName = readNode->Child(1)->Child(0)->Content();
@@ -54,13 +53,7 @@ private:
YQL_ENSURE(datasource);
auto dqIntegration = (*datasource)->GetDqIntegration();
YQL_ENSURE(dqIntegration);
- if (dqIntegration) {
- TMaybe<ui64> size;
- hasErrors |= !(size = dqIntegration->EstimateReadSize(state->Settings->DataSizePerJob.Get().GetOrElse(TDqSettings::TDefault::DataSizePerJob), state->Settings->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage), *readNode, ctx));
- if (size) {
- dataSize += *size;
- }
- }
+ readPerProvider_[dqIntegration].push_back(readNode);
}
}
return !hasErrors;
@@ -153,9 +146,22 @@ public:
});
bool hasError = false;
-
+
for (const auto n: dqNodes) {
hasError |= !ValidateDqNode(*n);
+ if (hasError) {
+ break;
+ }
+ }
+
+ for (auto& [integration, nodes]: ReadsPerProvider_) {
+ TMaybe<ui64> size;
+ hasError |= !(size = integration->EstimateReadSize(State_->Settings->DataSizePerJob.Get().GetOrElse(TDqSettings::TDefault::DataSizePerJob),
+ State_->Settings->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage), nodes, Ctx_));
+ if (hasError) {
+ break;
+ }
+ DataSize_ += *size;
}
if (!hasError && hasJoin && DataSize_ > State_->Settings->MaxDataSizePerQuery.Get().GetOrElse(10_GB)) {
@@ -169,6 +175,7 @@ private:
const TTypeAnnotationContext& TypeCtx_;
TExprContext& Ctx_;
TNodeSet Visited_;
+ THashMap<IDqIntegration*, TVector<const TExprNode*>> ReadsPerProvider_;
size_t DataSize_ = 0;
const TDqState::TPtr State_;
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
index 623bcad3fe6..af5325d7d56 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
@@ -29,9 +29,9 @@ namespace NYql {
return TGenReadTable::Match(&read);
}
- TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode& read,
+ TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TVector<const TExprNode*>& read,
TExprContext&) override {
- if (TGenReadTable::Match(&read)) {
+ if (AllOf(read, [](const auto val) { return TGenReadTable::Match(val); })) {
return 0ul; // TODO: return real size
}
return Nothing();
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index d98ab61154c..fe2e309de93 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -123,11 +123,10 @@ public:
return TS3ReadObject::Match(&read);
}
- TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode& read, TExprContext&) override {
- if (TS3ReadObject::Match(&read)) {
+ TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TVector<const TExprNode*>& read, TExprContext&) override {
+ if (AllOf(read, [](const auto val) { return TS3ReadObject::Match(val); })) {
return 0ul; // TODO: return real size
}
-
return Nothing();
}
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp
index cb82aec4f53..04c03156e66 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp
@@ -78,7 +78,7 @@ public:
YQL_ENSURE(false, "Unimplemented");
}
- TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode&, TExprContext&) override {
+ TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TVector<const TExprNode*>&, TExprContext&) override {
YQL_ENSURE(false, "Unimplemented");
}
diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp
index 6e887fb99c2..cea1b0aaac5 100644
--- a/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp
+++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp
@@ -73,11 +73,10 @@ public:
return TYdbReadTable::Match(&read);
}
- TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode& read, TExprContext& ) override {
- if (TYdbReadTable::Match(&read)) {
+ TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TVector<const TExprNode*>& read, TExprContext&) override {
+ if (AllOf(read, [](const auto val) { return TYdbReadTable::Match(val); })) {
return 0ul; // TODO: return real size
}
-
return Nothing();
}
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp
index 78247608b41..a8b661b22d8 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp
@@ -42,37 +42,42 @@ public:
{
}
- TVector<TMaybe<TVector<ui64>>> EstimateColumnStats(TExprContext& ctx, const TString& cluster, const TVector<TVector<TYtPathInfo::TPtr>>& groupIdPathInfos, ui64& sumAllTableSizes) {
- TVector<TMaybe<TVector<ui64>>> groupIdColumnarStats;
+ TVector<TVector<ui64>> EstimateColumnStats(TExprContext& ctx, const TString& cluster, const TVector<TVector<TYtPathInfo::TPtr>>& groupIdPathInfos, ui64& sumAllTableSizes) {
+ TVector<TVector<ui64>> groupIdColumnarStats;
+ groupIdColumnarStats.reserve(groupIdPathInfos.size());
+ TVector<bool> lookupsInfo;
+ TVector<TYtPathInfo::TPtr> flattenPaths;
for (const auto& pathInfos: groupIdPathInfos) {
- TVector<TString> columns;
- bool hasLookups = false;
for (const auto& pathInfo: pathInfos) {
- if (const auto cols = pathInfo->Columns) {
- if (columns.empty() && cols->GetColumns()) {
- for (auto& column : *cols->GetColumns()) {
- columns.push_back(column.Name);
- }
- }
-
- if (pathInfo->Table->Meta && pathInfo->Table->Meta->Attrs.Value("optimize_for", "scan") == "lookup") {
- hasLookups = true;
- }
+ auto hasLookup = pathInfo->Table->Meta && pathInfo->Table->Meta->Attrs.Value("optimize_for", "scan") == "lookup";
+ lookupsInfo.push_back(hasLookup);
+ if (!pathInfo->Table->Stat) {
+ continue;
}
+ if (hasLookup) {
+ continue;
+ }
+ flattenPaths.push_back(pathInfo);
}
-
- TMaybe<TVector<ui64>> columnarStat;
-
- if (!columns.empty() && !hasLookups) {
- columnarStat = EstimateDataSize(cluster, pathInfos, {columns}, *State_, ctx);
- }
-
- groupIdColumnarStats.push_back(columnarStat);
- for (const auto& [pathId, path] : Enumerate(pathInfos)){
+ }
+ auto result = EstimateDataSize(cluster, flattenPaths, Nothing(), *State_, ctx);
+ size_t statIdx = 0;
+ size_t pathIdx = 0;
+ for (const auto& [idx, pathInfos]: Enumerate(groupIdPathInfos)) {
+ TVector<ui64> columnarStatInner;
+ columnarStatInner.reserve(pathInfos.size());
+ for (auto& path: pathInfos) {
const auto& tableInfo = *path->Table;
- ui64 dataSize = columnarStat ? (*columnarStat)[pathId] : tableInfo.Stat->DataSize;
- sumAllTableSizes += dataSize;
+ if (lookupsInfo[pathIdx++] || !tableInfo.Stat) {
+ columnarStatInner.push_back(tableInfo.Stat ? tableInfo.Stat->DataSize : 0);
+ sumAllTableSizes += columnarStatInner.back();
+ continue;
+ }
+ columnarStatInner.push_back(result ? result->at(statIdx) : tableInfo.Stat->DataSize);
+ sumAllTableSizes += columnarStatInner.back();
+ ++statIdx;
}
+ groupIdColumnarStats.emplace_back(std::move(columnarStatInner));
}
return groupIdColumnarStats;
}
@@ -180,11 +185,8 @@ public:
}
} else {
TVector<TVector<std::tuple<ui64, ui64, NYT::TRichYPath>>> partitionTuplesArr;
-
ui64 sumAllTableSizes = 0;
-
- TVector<TMaybe<TVector<ui64>>> groupIdColumnarStats = EstimateColumnStats(ctx, cluster, groupIdPathInfos, sumAllTableSizes);
-
+ TVector<TVector<ui64>> groupIdColumnarStats = EstimateColumnStats(ctx, cluster, {groupIdPathInfos}, sumAllTableSizes);
ui64 parts = (sumAllTableSizes + dataSizePerJob - 1) / dataSizePerJob;
if (canFallback && hasErasure && parts > maxTasks) {
std::string_view message = "DQ cannot execute the query. Cause: too big table with erasure codec";
@@ -202,11 +204,11 @@ public:
if (sampleSetting) {
sample = FromString<double>(sampleSetting->Child(1)->Child(1)->Content());
}
+ auto& groupStats = groupIdColumnarStats[groupId];
for (const auto& [pathId, path] : Enumerate(groupIdPathInfos[groupId])) {
const auto& tableInfo = *path->Table;
YQL_ENSURE(tableInfo.Stat, "Table has no stat.");
- auto columnarStat = groupIdColumnarStats[groupId];
- ui64 dataSize = columnarStat ? (*columnarStat)[pathId] : tableInfo.Stat->DataSize;
+ ui64 dataSize = groupStats[pathId];
if (sample) {
dataSize *=* sample;
}
@@ -334,60 +336,85 @@ public:
return false;
}
- TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TExprNode& node, TExprContext& ctx) override {
- if (auto maybeRead = TMaybeNode<TYtReadTable>(&node)) {
-
- ui64 dataSize = 0;
- bool hasErasure = false;
- auto cluster = maybeRead.Cast().DataSource().Cluster().StringValue();
-
- const auto canUseYtPartitioningApi = State_->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false);
-
- TVector<TVector<TYtPathInfo::TPtr>> groupIdPathInfos;
-
- for (auto section: maybeRead.Cast().Input()) {
- groupIdPathInfos.emplace_back();
- for (const auto& path: section.Paths()) {
- auto pathInfo = MakeIntrusive<TYtPathInfo>(path);
- auto tableInfo = pathInfo->Table;
-
- YQL_ENSURE(tableInfo);
- if (!tableInfo->Stat) {
- continue;
- }
-
- if (pathInfo->Ranges && !canUseYtPartitioningApi) {
- AddErrorWrap(ctx, node.Pos(), "table with ranges");
- return Nothing();
- } else if (tableInfo->Meta->IsDynamic && !canUseYtPartitioningApi) {
- AddErrorWrap(ctx, node.Pos(), "dynamic table");
- return Nothing();
- } else { //
- if (tableInfo->Meta->Attrs.Value("erasure_codec", "none") != "none") {
- hasErasure = true;
+ TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector<const TExprNode*>& nodes, TExprContext& ctx) override {
+
+ TVector<bool> hasErasurePerNode;
+ hasErasurePerNode.reserve(nodes.size());
+ TVector<ui64> dataSizes(nodes.size());
+ THashMap<TString, TVector<std::pair<const TExprNode*, bool>>> clusterToNodesAndErasure;
+ THashMap<TString, TVector<TVector<TYtPathInfo::TPtr>>> clusterToGroups;
+ const auto maxChunks = State_->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ);
+ ui64 chunksCount = 0u;
+
+ for (const auto &node_: nodes) {
+ if (auto maybeRead = TMaybeNode<TYtReadTable>(node_)) {
+
+ bool hasErasure = false;
+ auto cluster = maybeRead.Cast().DataSource().Cluster().StringValue();
+ auto& groupIdPathInfo = clusterToGroups[cluster];
+
+ const auto canUseYtPartitioningApi = State_->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false);
+
+ auto input = maybeRead.Cast().Input();
+ for (auto section: input) {
+ groupIdPathInfo.emplace_back();
+ for (const auto& path: section.Paths()) {
+ auto pathInfo = MakeIntrusive<TYtPathInfo>(path);
+ auto tableInfo = pathInfo->Table;
+
+ YQL_ENSURE(tableInfo);
+
+ if (pathInfo->Ranges && !canUseYtPartitioningApi) {
+ AddErrorWrap(ctx, node_->Pos(), "table with ranges");
+ return Nothing();
+ } else if (tableInfo->Meta->IsDynamic && !canUseYtPartitioningApi) {
+ AddErrorWrap(ctx, node_->Pos(), "dynamic table");
+ return Nothing();
+ } else { //
+ if (tableInfo->Meta->Attrs.Value("erasure_codec", "none") != "none") {
+ hasErasure = true;
+ }
+ if (tableInfo->Stat) {
+ chunksCount += tableInfo->Stat->ChunkCount;
+ }
}
+ groupIdPathInfo.back().emplace_back(pathInfo);
}
- groupIdPathInfos.back().emplace_back(pathInfo);
}
+ if (chunksCount > maxChunks) {
+ AddErrorWrap(ctx, node_->Pos(), "table with too many chunks");
+ return false;
+ }
+ clusterToNodesAndErasure[cluster].push_back({node_, hasErasure});
+ } else {
+ AddErrorWrap(ctx, node_->Pos(), TStringBuilder() << "unsupported callable: " << node_->Content());
+ return Nothing();
}
-
- EstimateColumnStats(ctx, cluster, groupIdPathInfos, dataSize);
-
- if (hasErasure) { //
- if (auto codecCpu = State_->Configuration->ErasureCodecCpuForDq.Get(cluster)) {
- dataSizePerJob = Max(ui64(dataSizePerJob / *codecCpu), 10_KB);
- const ui64 parts = (dataSize + dataSizePerJob - 1) / dataSizePerJob;
- if (parts > maxTasksPerStage) {
- AddErrorWrap(ctx, node.Pos(), "too big table with erasure codec");
- return Nothing();
- }
+ }
+ ui64 dataSize = 0;
+ for (auto& [cluster, info]: clusterToNodesAndErasure) {
+ auto res = EstimateColumnStats(ctx, cluster, clusterToGroups[cluster], dataSize);
+ auto codecCpu = State_->Configuration->ErasureCodecCpuForDq.Get(cluster);
+ if (!codecCpu) {
+ continue;
+ }
+ size_t idx = 0;
+ for (auto& [node, hasErasure]: info) {
+ if (!hasErasure) {
+ ++idx;
+ continue;
+ }
+ ui64 readSize = std::accumulate(res[idx].begin(), res[idx].end(), 0);
+ ++idx;
+ dataSizePerJob = Max(ui64(dataSizePerJob / *codecCpu), 10_KB);
+ const ui64 parts = (readSize + dataSizePerJob - 1) / dataSizePerJob;
+ if (parts > maxTasksPerStage) {
+ AddErrorWrap(ctx, node->Pos(), "too big table with erasure codec");
+ return Nothing();
}
}
-
- return dataSize;
}
- AddErrorWrap(ctx, node.Pos(), TStringBuilder() << "unsupported callable: " << node.Content());
- return Nothing();
+ return dataSize;
}
void AddErrorWrap(TExprContext& ctx, const NYql::TPositionHandle& where, const TString& cause) {