aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormaxkovalev <maxkovalev@yandex-team.com>2023-04-19 12:58:24 +0300
committermaxkovalev <maxkovalev@yandex-team.com>2023-04-19 12:58:24 +0300
commit9143f30e39d2d0d9d81048632bce5f25de587b3f (patch)
tree0dc45c4babfc71c3d990c2abd1206db1d1e37481
parenta75913ec6f266ffaf2967f0bc2e18b5cb6df2c8d (diff)
downloadydb-9143f30e39d2d0d9d81048632bce5f25de587b3f.tar.gz
YQL-15811: Remove DQ fallback for YT errors
YQL-15811: Remove DQ fallback for YT errors
-rw-r--r--ydb/library/yql/core/facade/yql_facade.cpp131
-rw-r--r--ydb/library/yql/core/facade/yql_facade.h11
2 files changed, 74 insertions, 68 deletions
diff --git a/ydb/library/yql/core/facade/yql_facade.cpp b/ydb/library/yql/core/facade/yql_facade.cpp
index 6c5b2d18b1..7cfae4e751 100644
--- a/ydb/library/yql/core/facade/yql_facade.cpp
+++ b/ydb/library/yql/core/facade/yql_facade.cpp
@@ -22,7 +22,6 @@
#include <ydb/library/yql/providers/common/arrow_resolve/yql_simple_arrow_resolver.h>
#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
#include <ydb/library/yql/providers/common/config/yql_setting.h>
-#include <ydb/library/yql/public/issue/yql_issue.h>
#include <library/cpp/yson/node/node_io.h>
#include <library/cpp/deprecated/split/split_iterator.h>
@@ -927,6 +926,44 @@ void TProgram::SaveExprRoot() {
SavedExprRoot_ = ExprCtx_->DeepCopy(*ExprRoot_, *ExprCtx_, deepClones, /*internStrings*/false, /*copyTypes*/true, /*copyResult*/false, {});
}
+std::optional<bool> TProgram::CheckFallbackIssues(const TIssues& issues) {
+ auto isFallback = std::optional<bool>();
+ auto checkIssue = [&](const TIssue& issue) {
+ if (issue.GetCode() == TIssuesIds::DQ_GATEWAY_ERROR) {
+ YQL_LOG(DEBUG) << "Gateway Error " << issue;
+ isFallback = false;
+ } else if (issue.GetCode() == TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR) {
+ YQL_LOG(DEBUG) << "Gateway Fallback Error " << issue;
+ isFallback = true;
+ } else if (issue.GetCode() == TIssuesIds::DQ_OPTIMIZE_ERROR) {
+ YQL_LOG(DEBUG) << "Optimize Error " << issue;
+ isFallback = true;
+ } else if (issue.GetCode() >= TIssuesIds::YT_ACCESS_DENIED &&
+ issue.GetCode() <= TIssuesIds::YT_FOLDER_INPUT_IS_NOT_A_FOLDER &&
+ (issue.GetSeverity() == TSeverityIds::S_ERROR ||
+ issue.GetSeverity() == TSeverityIds::S_FATAL)) {
+ YQL_LOG(DEBUG) << "Yt Error " << issue;
+ isFallback = false;
+ }
+ };
+
+ std::function<void(const TIssuePtr& issue)> recursiveCheck = [&](const TIssuePtr& issue) {
+ checkIssue(*issue);
+ for (const auto& subissue : issue->GetSubIssues()) {
+ recursiveCheck(subissue);
+ }
+ };
+
+ for (const auto& issue : issues) {
+ checkIssue(issue);
+ // check subissues
+ for (const auto& subissue : issue.GetSubIssues()) {
+ recursiveCheck(subissue);
+ }
+ }
+ return isFallback;
+}
+
TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool applyAsyncChanges)
{
return AsyncTransform(*Transformer_, ExprRoot_, *ExprCtx_, applyAsyncChanges).Apply([this](const TFuture<IGraphTransformer::TStatus>& res) {
@@ -938,40 +975,9 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap
&& TypeCtx_->DqFallbackPolicy != EFallbackPolicy::Never)
{
auto issues = ExprCtx_->IssueManager.GetIssues();
- bool hasDqGatewayError = false;
- bool hasDqGatewayFallbackError = false;
- bool hasDqOptimizeError = false;
-
- auto checkIssue = [&](const TIssue& issue) {
- if (issue.GetCode() == TIssuesIds::DQ_GATEWAY_ERROR) {
- YQL_LOG(DEBUG) << "Gateway Error " << issue;
- hasDqGatewayError = true;
- } else if (issue.GetCode() == TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR) {
- YQL_LOG(DEBUG) << "Gateway Fallback Error " << issue;
- hasDqGatewayError = true;
- hasDqGatewayFallbackError = true;
- } else if (issue.GetCode() == TIssuesIds::DQ_OPTIMIZE_ERROR) {
- YQL_LOG(DEBUG) << "Optimize Error " << issue;
- hasDqOptimizeError = true;
- }
- };
-
- std::function<void(const TIssuePtr& issue)> recursiveCheck = [&](const TIssuePtr& issue) {
- checkIssue(*issue);
- for (const auto& subissue : issue->GetSubIssues()) {
- recursiveCheck(subissue);
- }
- };
+ bool isFallback = CheckFallbackIssues(issues).value_or(true);
- for (const auto& issue : issues) {
- checkIssue(issue);
- // check subissues
- for (const auto& subissue : issue.GetSubIssues()) {
- recursiveCheck(subissue);
- }
- }
-
- if (hasDqGatewayError && !hasDqGatewayFallbackError && TypeCtx_->DqFallbackPolicy != EFallbackPolicy::Always) {
+ if (!isFallback && TypeCtx_->DqFallbackPolicy != EFallbackPolicy::Always) {
// unrecoverable error
return res;
}
@@ -994,40 +1000,37 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap
}
CleanupLastSession();
- if (hasDqGatewayError || hasDqOptimizeError) {
-
- std::function<void(const TIssuePtr& issue)> toInfo = [&](const TIssuePtr& issue) {
- if (issue->Severity == TSeverityIds::S_ERROR
- || issue->Severity == TSeverityIds::S_FATAL
- || issue->Severity == TSeverityIds::S_WARNING)
- {
- issue->Severity = TSeverityIds::S_INFO;
- }
- for (const auto& subissue : issue->GetSubIssues()) {
- toInfo(subissue);
- }
- };
-
- TIssue info("DQ cannot execute the query");
- info.Severity = TSeverityIds::S_INFO;
-
- for (auto& issue : issues) {
- TIssuePtr newIssue = new TIssue(issue);
- if (newIssue->Severity == TSeverityIds::S_ERROR
- || issue.Severity == TSeverityIds::S_FATAL
- || issue.Severity == TSeverityIds::S_WARNING)
- {
- newIssue->Severity = TSeverityIds::S_INFO;
- }
- for (auto& subissue : newIssue->GetSubIssues()) {
- toInfo(subissue);
- }
- info.AddSubIssue(newIssue);
+ std::function<void(const TIssuePtr& issue)> toInfo = [&](const TIssuePtr& issue) {
+ if (issue->Severity == TSeverityIds::S_ERROR
+ || issue->Severity == TSeverityIds::S_FATAL
+ || issue->Severity == TSeverityIds::S_WARNING)
+ {
+ issue->Severity = TSeverityIds::S_INFO;
}
+ for (const auto& subissue : issue->GetSubIssues()) {
+ toInfo(subissue);
+ }
+ };
+
+ TIssue info("DQ cannot execute the query");
+ info.Severity = TSeverityIds::S_INFO;
- ExprCtx_->IssueManager.AddIssues({info});
+ for (auto& issue : issues) {
+ TIssuePtr newIssue = new TIssue(issue);
+ if (newIssue->Severity == TSeverityIds::S_ERROR
+ || issue.Severity == TSeverityIds::S_FATAL
+ || issue.Severity == TSeverityIds::S_WARNING)
+ {
+ newIssue->Severity = TSeverityIds::S_INFO;
+ }
+ for (auto& subissue : newIssue->GetSubIssues()) {
+ toInfo(subissue);
+ }
+ info.AddSubIssue(newIssue);
}
+ ExprCtx_->IssueManager.AddIssues({info});
+
++FallbackCounter_;
// don't execute recapture again
ExprCtx_->Step.Done(TExprStep::Recapture);
diff --git a/ydb/library/yql/core/facade/yql_facade.h b/ydb/library/yql/core/facade/yql_facade.h
index cbbb758efd..08062c0623 100644
--- a/ydb/library/yql/core/facade/yql_facade.h
+++ b/ydb/library/yql/core/facade/yql_facade.h
@@ -1,13 +1,14 @@
#pragma once
+#include <ydb/library/yql/core/credentials/yql_credentials.h>
+#include <ydb/library/yql/core/file_storage/file_storage.h>
#include <ydb/library/yql/core/services/yql_plan.h>
#include <ydb/library/yql/core/services/yql_transform_pipeline.h>
-#include <ydb/library/yql/core/file_storage/file_storage.h>
-#include <ydb/library/yql/core/credentials/yql_credentials.h>
-#include <ydb/library/yql/providers/config/yql_config_provider.h>
-#include <ydb/library/yql/providers/result/provider/yql_result_provider.h>
#include <ydb/library/yql/core/yql_type_annotation.h>
#include <ydb/library/yql/core/yql_user_data.h>
+#include <ydb/library/yql/providers/config/yql_config_provider.h>
+#include <ydb/library/yql/providers/result/provider/yql_result_provider.h>
+#include <ydb/library/yql/public/issue/yql_issue.h>
#include <ydb/library/yql/sql/sql.h>
#include <library/cpp/random_provider/random_provider.h>
@@ -348,6 +349,8 @@ private:
void SaveExprRoot();
private:
+ std::optional<bool> CheckFallbackIssues(const TIssues& issues);
+
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry_;
const TIntrusivePtr<IRandomProvider> RandomProvider_;
const TIntrusivePtr<ITimeProvider> TimeProvider_;