diff options
author | maxkovalev <maxkovalev@yandex-team.com> | 2023-04-19 12:58:24 +0300 |
---|---|---|
committer | maxkovalev <maxkovalev@yandex-team.com> | 2023-04-19 12:58:24 +0300 |
commit | 9143f30e39d2d0d9d81048632bce5f25de587b3f (patch) | |
tree | 0dc45c4babfc71c3d990c2abd1206db1d1e37481 | |
parent | a75913ec6f266ffaf2967f0bc2e18b5cb6df2c8d (diff) | |
download | ydb-9143f30e39d2d0d9d81048632bce5f25de587b3f.tar.gz |
YQL-15811: Remove DQ fallback for YT errors
YQL-15811: Remove DQ fallback for YT errors
-rw-r--r-- | ydb/library/yql/core/facade/yql_facade.cpp | 131 | ||||
-rw-r--r-- | ydb/library/yql/core/facade/yql_facade.h | 11 |
2 files changed, 74 insertions, 68 deletions
diff --git a/ydb/library/yql/core/facade/yql_facade.cpp b/ydb/library/yql/core/facade/yql_facade.cpp index 6c5b2d18b1..7cfae4e751 100644 --- a/ydb/library/yql/core/facade/yql_facade.cpp +++ b/ydb/library/yql/core/facade/yql_facade.cpp @@ -22,7 +22,6 @@ #include <ydb/library/yql/providers/common/arrow_resolve/yql_simple_arrow_resolver.h> #include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> #include <ydb/library/yql/providers/common/config/yql_setting.h> -#include <ydb/library/yql/public/issue/yql_issue.h> #include <library/cpp/yson/node/node_io.h> #include <library/cpp/deprecated/split/split_iterator.h> @@ -927,6 +926,44 @@ void TProgram::SaveExprRoot() { SavedExprRoot_ = ExprCtx_->DeepCopy(*ExprRoot_, *ExprCtx_, deepClones, /*internStrings*/false, /*copyTypes*/true, /*copyResult*/false, {}); } +std::optional<bool> TProgram::CheckFallbackIssues(const TIssues& issues) { + auto isFallback = std::optional<bool>(); + auto checkIssue = [&](const TIssue& issue) { + if (issue.GetCode() == TIssuesIds::DQ_GATEWAY_ERROR) { + YQL_LOG(DEBUG) << "Gateway Error " << issue; + isFallback = false; + } else if (issue.GetCode() == TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR) { + YQL_LOG(DEBUG) << "Gateway Fallback Error " << issue; + isFallback = true; + } else if (issue.GetCode() == TIssuesIds::DQ_OPTIMIZE_ERROR) { + YQL_LOG(DEBUG) << "Optimize Error " << issue; + isFallback = true; + } else if (issue.GetCode() >= TIssuesIds::YT_ACCESS_DENIED && + issue.GetCode() <= TIssuesIds::YT_FOLDER_INPUT_IS_NOT_A_FOLDER && + (issue.GetSeverity() == TSeverityIds::S_ERROR || + issue.GetSeverity() == TSeverityIds::S_FATAL)) { + YQL_LOG(DEBUG) << "Yt Error " << issue; + isFallback = false; + } + }; + + std::function<void(const TIssuePtr& issue)> recursiveCheck = [&](const TIssuePtr& issue) { + checkIssue(*issue); + for (const auto& subissue : issue->GetSubIssues()) { + recursiveCheck(subissue); + } + }; + + for (const auto& issue : issues) { + checkIssue(issue); + // check subissues + for (const auto& subissue : issue.GetSubIssues()) { + recursiveCheck(subissue); + } + } + return isFallback; +} + TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool applyAsyncChanges) { return AsyncTransform(*Transformer_, ExprRoot_, *ExprCtx_, applyAsyncChanges).Apply([this](const TFuture<IGraphTransformer::TStatus>& res) { @@ -938,40 +975,9 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap && TypeCtx_->DqFallbackPolicy != EFallbackPolicy::Never) { auto issues = ExprCtx_->IssueManager.GetIssues(); - bool hasDqGatewayError = false; - bool hasDqGatewayFallbackError = false; - bool hasDqOptimizeError = false; - - auto checkIssue = [&](const TIssue& issue) { - if (issue.GetCode() == TIssuesIds::DQ_GATEWAY_ERROR) { - YQL_LOG(DEBUG) << "Gateway Error " << issue; - hasDqGatewayError = true; - } else if (issue.GetCode() == TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR) { - YQL_LOG(DEBUG) << "Gateway Fallback Error " << issue; - hasDqGatewayError = true; - hasDqGatewayFallbackError = true; - } else if (issue.GetCode() == TIssuesIds::DQ_OPTIMIZE_ERROR) { - YQL_LOG(DEBUG) << "Optimize Error " << issue; - hasDqOptimizeError = true; - } - }; - - std::function<void(const TIssuePtr& issue)> recursiveCheck = [&](const TIssuePtr& issue) { - checkIssue(*issue); - for (const auto& subissue : issue->GetSubIssues()) { - recursiveCheck(subissue); - } - }; + bool isFallback = CheckFallbackIssues(issues).value_or(true); - for (const auto& issue : issues) { - checkIssue(issue); - // check subissues - for (const auto& subissue : issue.GetSubIssues()) { - recursiveCheck(subissue); - } - } - - if (hasDqGatewayError && !hasDqGatewayFallbackError && TypeCtx_->DqFallbackPolicy != EFallbackPolicy::Always) { + if (!isFallback && TypeCtx_->DqFallbackPolicy != EFallbackPolicy::Always) { // unrecoverable error return res; } @@ -994,40 +1000,37 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap } CleanupLastSession(); - if (hasDqGatewayError || hasDqOptimizeError) { - - std::function<void(const TIssuePtr& issue)> toInfo = [&](const TIssuePtr& issue) { - if (issue->Severity == TSeverityIds::S_ERROR - || issue->Severity == TSeverityIds::S_FATAL - || issue->Severity == TSeverityIds::S_WARNING) - { - issue->Severity = TSeverityIds::S_INFO; - } - for (const auto& subissue : issue->GetSubIssues()) { - toInfo(subissue); - } - }; - - TIssue info("DQ cannot execute the query"); - info.Severity = TSeverityIds::S_INFO; - - for (auto& issue : issues) { - TIssuePtr newIssue = new TIssue(issue); - if (newIssue->Severity == TSeverityIds::S_ERROR - || issue.Severity == TSeverityIds::S_FATAL - || issue.Severity == TSeverityIds::S_WARNING) - { - newIssue->Severity = TSeverityIds::S_INFO; - } - for (auto& subissue : newIssue->GetSubIssues()) { - toInfo(subissue); - } - info.AddSubIssue(newIssue); + std::function<void(const TIssuePtr& issue)> toInfo = [&](const TIssuePtr& issue) { + if (issue->Severity == TSeverityIds::S_ERROR + || issue->Severity == TSeverityIds::S_FATAL + || issue->Severity == TSeverityIds::S_WARNING) + { + issue->Severity = TSeverityIds::S_INFO; } + for (const auto& subissue : issue->GetSubIssues()) { + toInfo(subissue); + } + }; + + TIssue info("DQ cannot execute the query"); + info.Severity = TSeverityIds::S_INFO; - ExprCtx_->IssueManager.AddIssues({info}); + for (auto& issue : issues) { + TIssuePtr newIssue = new TIssue(issue); + if (newIssue->Severity == TSeverityIds::S_ERROR + || issue.Severity == TSeverityIds::S_FATAL + || issue.Severity == TSeverityIds::S_WARNING) + { + newIssue->Severity = TSeverityIds::S_INFO; + } + for (auto& subissue : newIssue->GetSubIssues()) { + toInfo(subissue); + } + info.AddSubIssue(newIssue); } + ExprCtx_->IssueManager.AddIssues({info}); + ++FallbackCounter_; // don't execute recapture again ExprCtx_->Step.Done(TExprStep::Recapture); diff --git a/ydb/library/yql/core/facade/yql_facade.h b/ydb/library/yql/core/facade/yql_facade.h index cbbb758efd..08062c0623 100644 --- a/ydb/library/yql/core/facade/yql_facade.h +++ b/ydb/library/yql/core/facade/yql_facade.h @@ -1,13 +1,14 @@ #pragma once +#include <ydb/library/yql/core/credentials/yql_credentials.h> +#include <ydb/library/yql/core/file_storage/file_storage.h> #include <ydb/library/yql/core/services/yql_plan.h> #include <ydb/library/yql/core/services/yql_transform_pipeline.h> -#include <ydb/library/yql/core/file_storage/file_storage.h> -#include <ydb/library/yql/core/credentials/yql_credentials.h> -#include <ydb/library/yql/providers/config/yql_config_provider.h> -#include <ydb/library/yql/providers/result/provider/yql_result_provider.h> #include <ydb/library/yql/core/yql_type_annotation.h> #include <ydb/library/yql/core/yql_user_data.h> +#include <ydb/library/yql/providers/config/yql_config_provider.h> +#include <ydb/library/yql/providers/result/provider/yql_result_provider.h> +#include <ydb/library/yql/public/issue/yql_issue.h> #include <ydb/library/yql/sql/sql.h> #include <library/cpp/random_provider/random_provider.h> @@ -348,6 +349,8 @@ private: void SaveExprRoot(); private: + std::optional<bool> CheckFallbackIssues(const TIssues& issues); + const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry_; const TIntrusivePtr<IRandomProvider> RandomProvider_; const TIntrusivePtr<ITimeProvider> TimeProvider_; |