summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSergey Uzhakov <[email protected]>2026-06-25 14:02:07 +0300
committerGitHub <[email protected]>2026-06-25 14:02:07 +0300
commitfaffe36f3fb75fa15bdf95a5e9372010fa7b72f4 (patch)
tree9804deded094065eea16ddad18d44f0360be6ba0
parent8c311a1d89cd47037d87fcd2303ebc6db1bfad84 (diff)
improve error message in case of huge DQ plan (#44390)
-rw-r--r--ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp93
-rw-r--r--ydb/library/yql/providers/dq/provider/ut/ya.make39
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_provider_ut.cpp155
3 files changed, 239 insertions, 48 deletions
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index 6b100585843..ee06ff61468 100644
--- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -1272,6 +1272,18 @@ private:
}
}
+ static TString FormatTooManyStagesError(size_t count, size_t limit) {
+ return TStringBuilder()
+ << "The query plan is too complex: " << count << " stages exceeds the limit of " << limit
+ << ". Consider simplifying the query (e.g. reduce the number of joins, subqueries or unions).";
+ }
+
+ static TString FormatTooManyTasksError(size_t count, size_t limit) {
+ return TStringBuilder()
+ << "The query plan is too complex: " << count << " tasks exceeds the limit of " << limit
+ << ". Consider simplifying the query (e.g. reduce the number of joins, subqueries or unions).";
+ }
+
TPublicIds::TPtr GetPublicIds(const TExprNode::TPtr& root) const {
TPublicIds::TPtr publicIds = std::make_shared<TPublicIds>();
VisitExpr(root, [&](const TExprNode::TPtr& node) {
@@ -1381,7 +1393,7 @@ private:
const auto maxTasksPerOperation = State->Settings->MaxTasksPerOperation.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerOperation);
auto maxDataSizePerJob = settings->MaxDataSizePerJob.Get().GetOrElse(TDqSettings::TDefault::MaxDataSizePerJob);
- auto stagesCount = executionPlanner->StagesCount();
+ const auto stagesCount = executionPlanner->StagesCount();
if (!executionPlanner->CanFallback()) {
settings->FallbackPolicy = State->TypeCtx->DqFallbackPolicy = EFallbackPolicy::Never;
@@ -1389,17 +1401,20 @@ private:
bool canFallback = (settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default) != EFallbackPolicy::Never && !State->TypeCtx->ForceDq);
- if (stagesCount > maxTasksPerOperation && canFallback) {
- return SyncStatus(FallbackWithMessage(
- pull.Ref(),
- TStringBuilder()
- << "Too many stages: "
- << stagesCount << " > "
- << maxTasksPerOperation, ctx, true));
+ // Each stage produces at least one task, so stagesCount is a lower bound
+ // for the total task count. Use it as a fast-path check before running
+ // the full execution planner.
+ if (stagesCount > maxTasksPerOperation) {
+ if (canFallback) {
+ return SyncStatus(FallbackWithMessage(
+ pull.Ref(),
+ TStringBuilder() << "Too many stages: " << stagesCount << " > " << maxTasksPerOperation,
+ ctx, true));
+ }
+ ctx.AddError(TIssue(ctx.GetPosition(pull.Ref().Pos()), FormatTooManyStagesError(stagesCount, maxTasksPerOperation)));
+ return SyncError();
}
- YQL_ENSURE(stagesCount <= maxTasksPerOperation, "Too many stages: " << stagesCount << " > " << maxTasksPerOperation);
-
try {
while (!executionPlanner->PlanExecution(canFallback) && tasksPerStage > 1) {
tasksPerStage /= 2;
@@ -1426,17 +1441,17 @@ private:
bool localRun = false;
auto& tasks = executionPlanner->GetTasks();
- if (tasks.size() > maxTasksPerOperation && canFallback) {
- return SyncStatus(FallbackWithMessage(
- pull.Ref(),
- TStringBuilder()
- << "Too many tasks: "
- << tasks.size() << " > "
- << maxTasksPerOperation, ctx, true));
+ if (tasks.size() > maxTasksPerOperation) {
+ if (canFallback) {
+ return SyncStatus(FallbackWithMessage(
+ pull.Ref(),
+ TStringBuilder() << "Too many tasks: " << tasks.size() << " > " << maxTasksPerOperation,
+ ctx, true));
+ }
+ ctx.AddError(TIssue(ctx.GetPosition(pull.Ref().Pos()), FormatTooManyTasksError(tasks.size(), maxTasksPerOperation)));
+ return SyncError();
}
- YQL_ENSURE(tasks.size() <= maxTasksPerOperation);
-
{
TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), State->FunctionRegistry->SupportsSizedAllocators());
TTypeEnvironment typeEnv(alloc);
@@ -1938,7 +1953,7 @@ private:
const auto maxTasksPerOperation = State->Settings->MaxTasksPerOperation.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerOperation);
auto maxDataSizePerJob = settings->MaxDataSizePerJob.Get().GetOrElse(TDqSettings::TDefault::MaxDataSizePerJob);
- auto stagesCount = executionPlanner->StagesCount();
+ const auto stagesCount = executionPlanner->StagesCount();
if (!executionPlanner->CanFallback()) {
settings->FallbackPolicy = State->TypeCtx->DqFallbackPolicy = EFallbackPolicy::Never;
@@ -1946,17 +1961,19 @@ private:
bool canFallback = (settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default) != EFallbackPolicy::Never && !State->TypeCtx->ForceDq);
- if (stagesCount > maxTasksPerOperation && canFallback) {
- return FallbackWithMessage(
- *input,
- TStringBuilder()
- << "Too many stages: "
- << stagesCount << " > "
- << maxTasksPerOperation, ctx, false);
+ // Each stage produces at least one task, so stagesCount is a lower
+ // bound for the total task count. Use it as a fast-path check.
+ if (stagesCount > maxTasksPerOperation) {
+ if (canFallback) {
+ return FallbackWithMessage(
+ *input,
+ TStringBuilder() << "Too many stages: " << stagesCount << " > " << maxTasksPerOperation,
+ ctx, false);
+ }
+ ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), FormatTooManyStagesError(stagesCount, maxTasksPerOperation)));
+ return IGraphTransformer::TStatus::Error;
}
- YQL_ENSURE(stagesCount <= maxTasksPerOperation);
-
try {
while (!executionPlanner->PlanExecution(canFallback) && tasksPerStage > 1) {
tasksPerStage /= 2;
@@ -1979,17 +1996,17 @@ private:
}
auto& tasks = executionPlanner->GetTasks();
- if (tasks.size() > maxTasksPerOperation && canFallback) {
- return FallbackWithMessage(
- *input,
- TStringBuilder()
- << "Too many tasks: "
- << tasks.size() << " > "
- << maxTasksPerOperation, ctx, false);
+ if (tasks.size() > maxTasksPerOperation) {
+ if (canFallback) {
+ return FallbackWithMessage(
+ *input,
+ TStringBuilder() << "Too many tasks: " << tasks.size() << " > " << maxTasksPerOperation,
+ ctx, false);
+ }
+ ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), FormatTooManyTasksError(tasks.size(), maxTasksPerOperation)));
+ return IGraphTransformer::TStatus::Error;
}
- YQL_ENSURE(tasks.size() <= maxTasksPerOperation);
-
{
TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), State->FunctionRegistry->SupportsSizedAllocators());
TTypeEnvironment typeEnv(alloc);
diff --git a/ydb/library/yql/providers/dq/provider/ut/ya.make b/ydb/library/yql/providers/dq/provider/ut/ya.make
index 187e89bf8cf..5efa4874a0f 100644
--- a/ydb/library/yql/providers/dq/provider/ut/ya.make
+++ b/ydb/library/yql/providers/dq/provider/ut/ya.make
@@ -1,16 +1,41 @@
UNITTEST_FOR(ydb/library/yql/providers/dq/provider)
-PEERDIR(
- library/cpp/testing/unittest
- ydb/library/yql/providers/dq/provider
- yql/essentials/sql/pg_dummy
- yql/essentials/public/udf/service/stub
-)
-
SRCS(
yql_dq_provider_ut.cpp
)
+PEERDIR(
+ ydb/library/yql/dq/actors/compute
+ ydb/library/yql/dq/comp_nodes
+ ydb/library/yql/dq/transform
+ ydb/library/yql/providers/dq/local_gateway
+ ydb/library/yql/providers/dq/provider
+ ydb/library/yql/providers/dq/provider/exec
+ library/cpp/lwtrace
+ library/cpp/lwtrace/mon
+ library/cpp/testing/unittest
+ yt/yql/providers/yt/codec/codegen
+ yt/yql/providers/yt/comp_nodes/llvm16
+ yt/yql/providers/yt/gateway/file
+ yt/yql/providers/yt/lib/ut_common
+ yt/yql/providers/yt/provider
+ yql/essentials/core/cbo/simple
+ yql/essentials/core/facade
+ yql/essentials/core/file_storage
+ yql/essentials/core/services/mounts
+ yql/essentials/minikql/comp_nodes/llvm16
+ yql/essentials/providers/common/comp_nodes
+ yql/essentials/public/udf/service/exception_policy
+ yql/essentials/sql/pg
+)
+
YQL_LAST_ABI_VERSION()
+IF (SANITIZER_TYPE)
+ SIZE(LARGE)
+ INCLUDE(${ARCADIA_ROOT}/ydb/tests/large.inc)
+ELSE()
+ SIZE(MEDIUM)
+ENDIF()
+
END()
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_provider_ut.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_provider_ut.cpp
index 339f7299d31..072e428f65b 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_provider_ut.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_provider_ut.cpp
@@ -1,15 +1,138 @@
+#include "yql_dq_statistics_json.h"
+
#include <library/cpp/testing/unittest/registar.h>
+#include <yt/yql/providers/yt/gateway/file/yql_yt_file.h>
+#include <yt/yql/providers/yt/gateway/file/yql_yt_file_services.h>
+#include <yt/yql/providers/yt/lib/ut_common/yql_ut_common.h>
+#include <yt/yql/providers/yt/provider/yql_yt_provider.h>
+
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
+#include <ydb/library/yql/dq/comp_nodes/yql_common_dq_factory.h>
+#include <ydb/library/yql/dq/transform/yql_common_dq_transform.h>
+
+#include <yql/essentials/providers/common/comp_nodes/yql_factory.h>
+#include <yql/essentials/providers/common/proto/gateways_config.pb.h>
+#include <yql/essentials/providers/common/provider/yql_provider_names.h>
+
#include <ydb/library/yql/providers/dq/common/yql_dq_common.h>
#include <ydb/library/yql/providers/dq/counters/counters.h>
+#include <ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h>
+#include <ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.h>
+#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h>
+#include <ydb/library/yql/providers/dq/provider/yql_dq_provider.h>
-#include "yql_dq_statistics_json.h"
+#include <yql/essentials/core/cbo/simple/cbo_simple.h>
+#include <yql/essentials/core/facade/yql_facade.h>
+#include <yql/essentials/core/file_storage/file_storage.h>
+#include <yql/essentials/core/file_storage/proto/file_storage.pb.h>
+#include <yql/essentials/core/services/mounts/yql_mounts.h>
+#include <yql/essentials/minikql/comp_nodes/mkql_factories.h>
+#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
+#include <yql/essentials/minikql/mkql_function_registry.h>
+#include <yql/essentials/utils/log/log.h>
+
+#include <util/stream/tee.h>
+#include <util/string/cast.h>
using namespace NYql;
-Y_UNIT_TEST_SUITE(TestCommon) {
+namespace {
+// Parses, compiles and runs the SQL text in `code` through a TProgram wired to a file-backed YT gateway (fed by tableFiles) and a local DQ gateway.
+// maxTasksPerOperation is passed as the DQ default setting "MaxTasksPerOperation". Returns false if parsing or compilation fails or the run reports TStatus::Error, after printing the program's errors to Cerr and, when errorsMessage is non-null, also into *errorsMessage; returns true otherwise.
+bool RunDqProgram(
+ const TString& code,
+ ui32 maxTasksPerOperation,
+ const THashMap<TString, TString>& tableFiles,
+ TString* errorsMessage = nullptr)
+{
+ NLog::YqlLoggerScope logger("cerr", false);
+
+ IOutputStream* errorsOutput = &Cerr; // default sink: stderr only
+ TMaybe<TStringOutput> errorsMessageOutput;
+ TMaybe<TTeeOutput> tee;
+ if (errorsMessage) {
+ errorsMessageOutput.ConstructInPlace(*errorsMessage);
+ tee.ConstructInPlace(&*errorsMessageOutput, &Cerr); // duplicate error output into the caller's string and stderr
+ errorsOutput = &*tee;
+ }
+
+ TGatewaysConfig gatewaysConfig;
+ {
+ auto& dqCfg = *gatewaysConfig.MutableDq();
+ auto addSetting = [&](const TString& name, const TString& value) { // appends one name/value pair to the DQ default settings
+ auto* s = dqCfg.AddDefaultSettings();
+ s->SetName(name);
+ s->SetValue(value);
+ };
+ addSetting("EnableComputeActor", "1");
+ addSetting("MaxTasksPerOperation", ToString(maxTasksPerOperation)); // the limit under test, serialized as a string setting
+ }
+
+ auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(
+ NKikimr::NMiniKQL::CreateBuiltinRegistry())->Clone();
+
+ TVector<TDataProviderInitializer> dataProvidersInit; // filled below with the YT and DQ provider initializers
+
+ auto yqlNativeServices = NFile::TYtFileServices::Make(functionRegistry.Get(), tableFiles);
+ auto ytGateway = CreateYtFileGateway(yqlNativeServices);
+ dataProvidersInit.push_back(
+ GetYtNativeDataProviderInitializer(ytGateway, MakeSimpleCBOOptimizerFactory(), {}));
+
+ auto dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory({
+ NYql::GetCommonDqFactory(),
+ NKikimr::NMiniKQL::GetYqlFactory()
+ });
+ auto dqTaskTransformFactory = NYql::CreateCompositeTaskTransformFactory({
+ NYql::CreateCommonDqTaskTransformFactory()
+ });
+ auto dqGateway = CreateLocalDqGateway(
+ functionRegistry.Get(), dqCompFactory, dqTaskTransformFactory,
+ {}, false, MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>());
+
+ auto storage = NYql::CreateAsyncFileStorage({});
+ dataProvidersInit.push_back(
+ NYql::GetDqDataProviderInitializer(
+ &CreateDqExecTransformer, dqGateway, dqCompFactory, {}, storage)); // registers the DQ provider with the exec transformer factory under test
+
+ TExprContext moduleCtx;
+ IModuleResolver::TPtr moduleResolver;
+ YQL_ENSURE(GetYqlDefaultModuleResolver(moduleCtx, moduleResolver));
-Y_UNIT_TEST(Empty) { }
+ TProgramFactory factory(true, functionRegistry.Get(), 0ULL, dataProvidersInit, "ut");
+ factory.SetGatewaysConfig(&gatewaysConfig);
+ factory.SetModules(moduleResolver);
+
+ auto program = factory.Create("program", code);
+
+ NSQLTranslation::TTranslationSettings sqlSettings;
+ sqlSettings.SyntaxVersion = 1;
+ sqlSettings.V0Behavior = NSQLTranslation::EV0Behavior::Disable;
+ sqlSettings.Flags.insert("DqEngineEnable");
+ sqlSettings.Flags.insert("DqEngineForce"); // NOTE(review): presumably forces DQ execution with no fallback, as the test comment below implies - confirm
+ sqlSettings.ClusterMapping["plato"] = TString(YtProviderName); // maps the cluster name "plato" used by the test query to the YT provider
+
+ if (!program->ParseSql(sqlSettings)) {
+ program->PrintErrorsTo(*errorsOutput);
+ return false;
+ }
+
+ if (!program->Compile("user")) {
+ program->PrintErrorsTo(*errorsOutput);
+ return false;
+ }
+
+ auto status = program->Run("user", nullptr, nullptr, nullptr);
+ if (status == TProgram::TStatus::Error) { // only an explicit Error status is treated as failure
+ program->PrintErrorsTo(*errorsOutput);
+ return false;
+ }
+
+ return true;
+}
+}
+
+Y_UNIT_TEST_SUITE(TestCommon) {
Y_UNIT_TEST(ParseCounterName) {
TString prefix = "Prefix";
@@ -214,3 +337,29 @@ Y_UNIT_TEST(CollectTaskRunnerStatisticsByTask) {
}
}
+
+Y_UNIT_TEST_SUITE(YqlDqExecTests) {
+
+// Reading from a YT table creates DQ source stages. The number of stages
+// serves as a lower bound for the number of tasks (at least one task per
+// stage). When stagesCount exceeds maxTasksPerOperation and no fallback is
+// allowed (DqEngineForce), the exec transformer must report a user-friendly
+// stages error message rather than crashing with YQL_ENSURE.
+Y_UNIT_TEST(TooManyStagesErrorMessage) { // checks the error text reported when the stage count exceeds MaxTasksPerOperation
+ const TString code = R"(
+USE plato;
+SELECT key FROM Input;
+ )";
+
+ TTestTablesMapping testTables;
+ THashMap<TString, TString> tableFiles = { // table name -> file name handed to the YT file gateway via RunDqProgram
+ {"yt.plato.Input", testTables.TmpInput.Name()},
+ {"yt.plato.Output", testTables.TmpOutput.Name()},
+ };
+
+ TString errorMessage;
+ bool ok = RunDqProgram(code, /*maxTasksPerOperation=*/0, tableFiles, &errorMessage); // a limit of 0 is exceeded by any plan with at least one stage
+ UNIT_ASSERT_C(!ok, "Expected failure: too many stages");
+ UNIT_ASSERT_STRING_CONTAINS(errorMessage, "stages exceeds the limit"); // substring of the text built by FormatTooManyStagesError
+}
+}