diff options
| author | Sergey Uzhakov <[email protected]> | 2026-06-25 14:02:07 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2026-06-25 14:02:07 +0300 |
| commit | faffe36f3fb75fa15bdf95a5e9372010fa7b72f4 (patch) | |
| tree | 9804deded094065eea16ddad18d44f0360be6ba0 | |
| parent | 8c311a1d89cd47037d87fcd2303ebc6db1bfad84 (diff) | |
improve error message in case of huge DQ plan (#44390)
3 files changed, 239 insertions, 48 deletions
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index 6b100585843..ee06ff61468 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -1272,6 +1272,18 @@ private: } } + static TString FormatTooManyStagesError(size_t count, size_t limit) { + return TStringBuilder() + << "The query plan is too complex: " << count << " stages exceeds the limit of " << limit + << ". Consider simplifying the query (e.g. reduce the number of joins, subqueries or unions)."; + } + + static TString FormatTooManyTasksError(size_t count, size_t limit) { + return TStringBuilder() + << "The query plan is too complex: " << count << " tasks exceeds the limit of " << limit + << ". Consider simplifying the query (e.g. reduce the number of joins, subqueries or unions)."; + } + TPublicIds::TPtr GetPublicIds(const TExprNode::TPtr& root) const { TPublicIds::TPtr publicIds = std::make_shared<TPublicIds>(); VisitExpr(root, [&](const TExprNode::TPtr& node) { @@ -1381,7 +1393,7 @@ private: const auto maxTasksPerOperation = State->Settings->MaxTasksPerOperation.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerOperation); auto maxDataSizePerJob = settings->MaxDataSizePerJob.Get().GetOrElse(TDqSettings::TDefault::MaxDataSizePerJob); - auto stagesCount = executionPlanner->StagesCount(); + const auto stagesCount = executionPlanner->StagesCount(); if (!executionPlanner->CanFallback()) { settings->FallbackPolicy = State->TypeCtx->DqFallbackPolicy = EFallbackPolicy::Never; @@ -1389,17 +1401,20 @@ private: bool canFallback = (settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default) != EFallbackPolicy::Never && !State->TypeCtx->ForceDq); - if (stagesCount > maxTasksPerOperation && canFallback) { - return SyncStatus(FallbackWithMessage( - pull.Ref(), - TStringBuilder() - << "Too many stages: " - << stagesCount << " > " - << maxTasksPerOperation, ctx, true)); + // Each stage produces at least one task, so stagesCount is a lower bound + // for the total task count. Use it as a fast-path check before running + // the full execution planner. + if (stagesCount > maxTasksPerOperation) { + if (canFallback) { + return SyncStatus(FallbackWithMessage( + pull.Ref(), + TStringBuilder() << "Too many stages: " << stagesCount << " > " << maxTasksPerOperation, + ctx, true)); + } + ctx.AddError(TIssue(ctx.GetPosition(pull.Ref().Pos()), FormatTooManyStagesError(stagesCount, maxTasksPerOperation))); + return SyncError(); } - YQL_ENSURE(stagesCount <= maxTasksPerOperation, "Too many stages: " << stagesCount << " > " << maxTasksPerOperation); - try { while (!executionPlanner->PlanExecution(canFallback) && tasksPerStage > 1) { tasksPerStage /= 2; @@ -1426,17 +1441,17 @@ private: bool localRun = false; auto& tasks = executionPlanner->GetTasks(); - if (tasks.size() > maxTasksPerOperation && canFallback) { - return SyncStatus(FallbackWithMessage( - pull.Ref(), - TStringBuilder() - << "Too many tasks: " - << tasks.size() << " > " - << maxTasksPerOperation, ctx, true)); + if (tasks.size() > maxTasksPerOperation) { + if (canFallback) { + return SyncStatus(FallbackWithMessage( + pull.Ref(), + TStringBuilder() << "Too many tasks: " << tasks.size() << " > " << maxTasksPerOperation, + ctx, true)); + } + ctx.AddError(TIssue(ctx.GetPosition(pull.Ref().Pos()), FormatTooManyTasksError(tasks.size(), maxTasksPerOperation))); + return SyncError(); } - YQL_ENSURE(tasks.size() <= maxTasksPerOperation); - { TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), State->FunctionRegistry->SupportsSizedAllocators()); TTypeEnvironment typeEnv(alloc); @@ -1938,7 +1953,7 @@ private: const auto maxTasksPerOperation = State->Settings->MaxTasksPerOperation.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerOperation); auto maxDataSizePerJob = settings->MaxDataSizePerJob.Get().GetOrElse(TDqSettings::TDefault::MaxDataSizePerJob); - auto stagesCount = executionPlanner->StagesCount(); + const auto stagesCount = executionPlanner->StagesCount(); if (!executionPlanner->CanFallback()) { settings->FallbackPolicy = State->TypeCtx->DqFallbackPolicy = EFallbackPolicy::Never; @@ -1946,17 +1961,19 @@ private: bool canFallback = (settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default) != EFallbackPolicy::Never && !State->TypeCtx->ForceDq); - if (stagesCount > maxTasksPerOperation && canFallback) { - return FallbackWithMessage( - *input, - TStringBuilder() - << "Too many stages: " - << stagesCount << " > " - << maxTasksPerOperation, ctx, false); + // Each stage produces at least one task, so stagesCount is a lower + // bound for the total task count. Use it as a fast-path check. + if (stagesCount > maxTasksPerOperation) { + if (canFallback) { + return FallbackWithMessage( + *input, + TStringBuilder() << "Too many stages: " << stagesCount << " > " << maxTasksPerOperation, + ctx, false); + } + ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), FormatTooManyStagesError(stagesCount, maxTasksPerOperation))); + return IGraphTransformer::TStatus::Error; } - YQL_ENSURE(stagesCount <= maxTasksPerOperation); - try { while (!executionPlanner->PlanExecution(canFallback) && tasksPerStage > 1) { tasksPerStage /= 2; @@ -1979,17 +1996,17 @@ private: } auto& tasks = executionPlanner->GetTasks(); - if (tasks.size() > maxTasksPerOperation && canFallback) { - return FallbackWithMessage( - *input, - TStringBuilder() - << "Too many tasks: " - << tasks.size() << " > " - << maxTasksPerOperation, ctx, false); + if (tasks.size() > maxTasksPerOperation) { + if (canFallback) { + return FallbackWithMessage( + *input, + TStringBuilder() << "Too many tasks: " << tasks.size() << " > " << maxTasksPerOperation, + ctx, false); + } + ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), FormatTooManyTasksError(tasks.size(), maxTasksPerOperation))); + return IGraphTransformer::TStatus::Error; } - YQL_ENSURE(tasks.size() <= maxTasksPerOperation); - { TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), State->FunctionRegistry->SupportsSizedAllocators()); TTypeEnvironment typeEnv(alloc); diff --git a/ydb/library/yql/providers/dq/provider/ut/ya.make b/ydb/library/yql/providers/dq/provider/ut/ya.make index 187e89bf8cf..5efa4874a0f 100644 --- a/ydb/library/yql/providers/dq/provider/ut/ya.make +++ b/ydb/library/yql/providers/dq/provider/ut/ya.make @@ -1,16 +1,41 @@ UNITTEST_FOR(ydb/library/yql/providers/dq/provider) -PEERDIR( - library/cpp/testing/unittest - ydb/library/yql/providers/dq/provider - yql/essentials/sql/pg_dummy - yql/essentials/public/udf/service/stub -) - SRCS( yql_dq_provider_ut.cpp ) +PEERDIR( + ydb/library/yql/dq/actors/compute + ydb/library/yql/dq/comp_nodes + ydb/library/yql/dq/transform + ydb/library/yql/providers/dq/local_gateway + ydb/library/yql/providers/dq/provider + ydb/library/yql/providers/dq/provider/exec + library/cpp/lwtrace + library/cpp/lwtrace/mon + library/cpp/testing/unittest + yt/yql/providers/yt/codec/codegen + yt/yql/providers/yt/comp_nodes/llvm16 + yt/yql/providers/yt/gateway/file + yt/yql/providers/yt/lib/ut_common + yt/yql/providers/yt/provider + yql/essentials/core/cbo/simple + yql/essentials/core/facade + yql/essentials/core/file_storage + yql/essentials/core/services/mounts + yql/essentials/minikql/comp_nodes/llvm16 + yql/essentials/providers/common/comp_nodes + yql/essentials/public/udf/service/exception_policy + yql/essentials/sql/pg +) + YQL_LAST_ABI_VERSION() +IF (SANITIZER_TYPE) + SIZE(LARGE) + INCLUDE(${ARCADIA_ROOT}/ydb/tests/large.inc) +ELSE() + SIZE(MEDIUM) +ENDIF() + END() diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_provider_ut.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_provider_ut.cpp index 339f7299d31..072e428f65b 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_provider_ut.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_provider_ut.cpp @@ -1,15 +1,138 @@ +#include "yql_dq_statistics_json.h" + #include <library/cpp/testing/unittest/registar.h> +#include <yt/yql/providers/yt/gateway/file/yql_yt_file.h> +#include <yt/yql/providers/yt/gateway/file/yql_yt_file_services.h> +#include <yt/yql/providers/yt/lib/ut_common/yql_ut_common.h> +#include <yt/yql/providers/yt/provider/yql_yt_provider.h> + +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> +#include <ydb/library/yql/dq/comp_nodes/yql_common_dq_factory.h> +#include <ydb/library/yql/dq/transform/yql_common_dq_transform.h> + +#include <yql/essentials/providers/common/comp_nodes/yql_factory.h> +#include <yql/essentials/providers/common/proto/gateways_config.pb.h> +#include <yql/essentials/providers/common/provider/yql_provider_names.h> + #include <ydb/library/yql/providers/dq/common/yql_dq_common.h> #include <ydb/library/yql/providers/dq/counters/counters.h> +#include <ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h> +#include <ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.h> +#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h> +#include <ydb/library/yql/providers/dq/provider/yql_dq_provider.h> -#include "yql_dq_statistics_json.h" +#include <yql/essentials/core/cbo/simple/cbo_simple.h> +#include <yql/essentials/core/facade/yql_facade.h> +#include <yql/essentials/core/file_storage/file_storage.h> +#include <yql/essentials/core/file_storage/proto/file_storage.pb.h> +#include <yql/essentials/core/services/mounts/yql_mounts.h> +#include <yql/essentials/minikql/comp_nodes/mkql_factories.h> +#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h> +#include <yql/essentials/minikql/mkql_function_registry.h> +#include <yql/essentials/utils/log/log.h> + +#include <util/stream/tee.h> +#include <util/string/cast.h> using namespace NYql; -Y_UNIT_TEST_SUITE(TestCommon) { +namespace { +// Runs a YQL/SQL program through the DQ engine with a YT file data source. +// maxTasksPerOperation sets the DQ limit on tasks per query. +bool RunDqProgram( + const TString& code, + ui32 maxTasksPerOperation, + const THashMap<TString, TString>& tableFiles, + TString* errorsMessage = nullptr) +{ + NLog::YqlLoggerScope logger("cerr", false); + + IOutputStream* errorsOutput = &Cerr; + TMaybe<TStringOutput> errorsMessageOutput; + TMaybe<TTeeOutput> tee; + if (errorsMessage) { + errorsMessageOutput.ConstructInPlace(*errorsMessage); + tee.ConstructInPlace(&*errorsMessageOutput, &Cerr); + errorsOutput = &*tee; + } + + TGatewaysConfig gatewaysConfig; + { + auto& dqCfg = *gatewaysConfig.MutableDq(); + auto addSetting = [&](const TString& name, const TString& value) { + auto* s = dqCfg.AddDefaultSettings(); + s->SetName(name); + s->SetValue(value); + }; + addSetting("EnableComputeActor", "1"); + addSetting("MaxTasksPerOperation", ToString(maxTasksPerOperation)); + } + + auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry( + NKikimr::NMiniKQL::CreateBuiltinRegistry())->Clone(); + + TVector<TDataProviderInitializer> dataProvidersInit; + + auto yqlNativeServices = NFile::TYtFileServices::Make(functionRegistry.Get(), tableFiles); + auto ytGateway = CreateYtFileGateway(yqlNativeServices); + dataProvidersInit.push_back( + GetYtNativeDataProviderInitializer(ytGateway, MakeSimpleCBOOptimizerFactory(), {})); + + auto dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory({ + NYql::GetCommonDqFactory(), + NKikimr::NMiniKQL::GetYqlFactory() + }); + auto dqTaskTransformFactory = NYql::CreateCompositeTaskTransformFactory({ + NYql::CreateCommonDqTaskTransformFactory() + }); + auto dqGateway = CreateLocalDqGateway( + functionRegistry.Get(), dqCompFactory, dqTaskTransformFactory, + {}, false, MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>()); + + auto storage = NYql::CreateAsyncFileStorage({}); + dataProvidersInit.push_back( + NYql::GetDqDataProviderInitializer( + &CreateDqExecTransformer, dqGateway, dqCompFactory, {}, storage)); + + TExprContext moduleCtx; + IModuleResolver::TPtr moduleResolver; + YQL_ENSURE(GetYqlDefaultModuleResolver(moduleCtx, moduleResolver)); -Y_UNIT_TEST(Empty) { } + TProgramFactory factory(true, functionRegistry.Get(), 0ULL, dataProvidersInit, "ut"); + factory.SetGatewaysConfig(&gatewaysConfig); + factory.SetModules(moduleResolver); + + auto program = factory.Create("program", code); + + NSQLTranslation::TTranslationSettings sqlSettings; + sqlSettings.SyntaxVersion = 1; + sqlSettings.V0Behavior = NSQLTranslation::EV0Behavior::Disable; + sqlSettings.Flags.insert("DqEngineEnable"); + sqlSettings.Flags.insert("DqEngineForce"); + sqlSettings.ClusterMapping["plato"] = TString(YtProviderName); + + if (!program->ParseSql(sqlSettings)) { + program->PrintErrorsTo(*errorsOutput); + return false; + } + + if (!program->Compile("user")) { + program->PrintErrorsTo(*errorsOutput); + return false; + } + + auto status = program->Run("user", nullptr, nullptr, nullptr); + if (status == TProgram::TStatus::Error) { + program->PrintErrorsTo(*errorsOutput); + return false; + } + + return true; +} +} + +Y_UNIT_TEST_SUITE(TestCommon) { Y_UNIT_TEST(ParseCounterName) { TString prefix = "Prefix"; @@ -214,3 +337,29 @@ Y_UNIT_TEST(CollectTaskRunnerStatisticsByTask) { } } + +Y_UNIT_TEST_SUITE(YqlDqExecTests) { + +// Reading from a YT table creates DQ source stages. The number of stages +// serves as a lower bound for the number of tasks (at least one task per +// stage). When stagesCount exceeds maxTasksPerOperation and no fallback is +// allowed (DqEngineForce), the exec transformer must report a user-friendly +// stages error message rather than crashing with YQL_ENSURE. +Y_UNIT_TEST(TooManyStagesErrorMessage) { + const TString code = R"( +USE plato; +SELECT key FROM Input; + )"; + + TTestTablesMapping testTables; + THashMap<TString, TString> tableFiles = { + {"yt.plato.Input", testTables.TmpInput.Name()}, + {"yt.plato.Output", testTables.TmpOutput.Name()}, + }; + + TString errorMessage; + bool ok = RunDqProgram(code, /*maxTasksPerOperation=*/0, tableFiles, &errorMessage); + UNIT_ASSERT_C(!ok, "Expected failure: too many stages"); + UNIT_ASSERT_STRING_CONTAINS(errorMessage, "stages exceeds the limit"); +} +} |
