diff options
author | Aleksandr Khoroshilov <hor911@gmail.com> | 2022-04-19 19:32:48 +0300 |
---|---|---|
committer | Aleksandr Khoroshilov <hor911@gmail.com> | 2022-04-19 19:32:48 +0300 |
commit | a13e5a3e3147f74935374910ad015543294ff25c (patch) | |
tree | 8a2348da78f5569c3a1e3311e48349ff40ac3ecc | |
parent | a0c35819c365374b81dd55830cedb90bc0e54044 (diff) | |
download | ydb-a13e5a3e3147f74935374910ad015543294ff25c.tar.gz |
Correct DQ statuses processing
ref:db0f534b6668d6dd3eb524e5daa803c15d279fc4
5 files changed, 9 insertions, 5 deletions
diff --git a/ydb/library/yql/dq/actors/protos/dq_status_codes.proto b/ydb/library/yql/dq/actors/protos/dq_status_codes.proto index b7180e2a095..f0cde1171e0 100644 --- a/ydb/library/yql/dq/actors/protos/dq_status_codes.proto +++ b/ydb/library/yql/dq/actors/protos/dq_status_codes.proto @@ -15,5 +15,7 @@ message StatusIds { PRECONDITION_FAILED = 7; CANCELLED = 8; OVERLOADED = 9; + LIMIT_EXCEEDED = 10; + UNSUPPORTED = 11; } } diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp index cb330ee1351..420722b0f28 100644 --- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp @@ -87,7 +87,7 @@ private: issue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR); Issues.AddIssues({issue}); *ExecutionTimeoutCounter += 1; - Finish(NYql::NDqProto::StatusIds::TIMEOUT, /*retriable=*/ false, /*needFallback=*/ true); + Finish(NYql::NDqProto::StatusIds::LIMIT_EXCEEDED, /*retriable=*/ false, /*needFallback=*/ true); }) cFunc(TEvents::TEvWakeup::EventType, OnWakeup) }) diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index cac1420315c..1fafa3442a2 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -677,12 +677,12 @@ private: source.HasData = true; Send(SelfId(), new TEvContinueRun()); } catch (...) { - SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, CurrentExceptionMessage(), false, false)); + SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNSPECIFIED, CurrentExceptionMessage(), false, false)); } } void OnSourceError(const IDqSourceActor::TEvSourceError::TPtr& ev) { Y_UNUSED(ev->Get()->InputIndex); - SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNSPECIFIED, ev->Get()->Issues.ToString(), !ev->Get()->IsFatal, !ev->Get()->IsFatal)); + SendFailure(MakeHolder<TEvDqFailure>(ev->Get()->IsFatal ? NYql::NDqProto::StatusIds::UNSPECIFIED : NYql::NDqProto::StatusIds::INTERNAL_ERROR, ev->Get()->Issues.ToString(), !ev->Get()->IsFatal, !ev->Get()->IsFatal)); } void OnSourcePushFinished(TEvSourcePushFinished::TPtr& ev, const TActorContext& ctx) { auto index = ev->Get()->Index; @@ -698,7 +698,7 @@ private: void OnSinkError(ui64 outputIndex, const TIssues& issues, bool isFatal) override { Y_UNUSED(outputIndex); - SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNSPECIFIED, issues.ToString(), !isFatal, !isFatal)); + SendFailure(MakeHolder<TEvDqFailure>(isFatal ? NYql::NDqProto::StatusIds::UNSPECIFIED : NYql::NDqProto::StatusIds::INTERNAL_ERROR, issues.ToString(), !isFatal, !isFatal)); } void OnSinkStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) override { diff --git a/ydb/library/yql/providers/dq/common/yql_dq_common.cpp b/ydb/library/yql/providers/dq/common/yql_dq_common.cpp index c3f7e378bf8..fd1a21e54ec 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_common.cpp +++ b/ydb/library/yql/providers/dq/common/yql_dq_common.cpp @@ -99,6 +99,7 @@ bool IsRetriable(NYql::NDqProto::StatusIds::StatusCode statusCode) { case NYql::NDqProto::StatusIds::UNSPECIFIED: case NYql::NDqProto::StatusIds::SUCCESS: case NYql::NDqProto::StatusIds::BAD_REQUEST: + case NYql::NDqProto::StatusIds::LIMIT_EXCEEDED: return false; default: return true; @@ -118,6 +119,7 @@ bool NeedFallback(NYql::NDqProto::StatusIds::StatusCode statusCode) { case NYql::NDqProto::StatusIds::BAD_REQUEST: case NYql::NDqProto::StatusIds::PRECONDITION_FAILED: return false; + case NYql::NDqProto::StatusIds::LIMIT_EXCEEDED: default: return true; } diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp index fc491f0a9f7..f550dea9c40 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp @@ -179,7 +179,7 @@ private: issue.AddSubIssue(MakeIntrusive<TIssue>(YqlIssue(parsedPos.GetOrElse(TPosition()), TIssuesIds::DQ_GATEWAY_ERROR, TString{terminationMessage}))); } } - return MakeHolder<NDq::TEvDq::TEvAbortExecution>(runResult.Retriable ? NYql::NDqProto::StatusIds::UNAVAILABLE : NYql::NDqProto::StatusIds::BAD_REQUEST, TVector<TIssue>{issue}); + return MakeHolder<NDq::TEvDq::TEvAbortExecution>(runResult.Retriable ? NYql::NDqProto::StatusIds::UNAVAILABLE : (runResult.Fallback ? NYql::NDqProto::StatusIds::UNSUPPORTED : NYql::NDqProto::StatusIds::BAD_REQUEST), TVector<TIssue>{issue}); } void PassAway() override { |