aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Khoroshilov <hor911@gmail.com>2022-04-19 19:32:48 +0300
committerAleksandr Khoroshilov <hor911@gmail.com>2022-04-19 19:32:48 +0300
commita13e5a3e3147f74935374910ad015543294ff25c (patch)
tree8a2348da78f5569c3a1e3311e48349ff40ac3ecc
parenta0c35819c365374b81dd55830cedb90bc0e54044 (diff)
downloadydb-a13e5a3e3147f74935374910ad015543294ff25c.tar.gz
Correct DQ statuses processing
ref:db0f534b6668d6dd3eb524e5daa803c15d279fc4
-rw-r--r--ydb/library/yql/dq/actors/protos/dq_status_codes.proto2
-rw-r--r--ydb/library/yql/providers/dq/actors/executer_actor.cpp2
-rw-r--r--ydb/library/yql/providers/dq/actors/worker_actor.cpp6
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_common.cpp2
-rw-r--r--ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp2
5 files changed, 9 insertions, 5 deletions
diff --git a/ydb/library/yql/dq/actors/protos/dq_status_codes.proto b/ydb/library/yql/dq/actors/protos/dq_status_codes.proto
index b7180e2a095..f0cde1171e0 100644
--- a/ydb/library/yql/dq/actors/protos/dq_status_codes.proto
+++ b/ydb/library/yql/dq/actors/protos/dq_status_codes.proto
@@ -15,5 +15,7 @@ message StatusIds {
PRECONDITION_FAILED = 7;
CANCELLED = 8;
OVERLOADED = 9;
+ LIMIT_EXCEEDED = 10;
+ UNSUPPORTED = 11;
}
}
diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp
index cb330ee1351..420722b0f28 100644
--- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp
@@ -87,7 +87,7 @@ private:
issue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR);
Issues.AddIssues({issue});
*ExecutionTimeoutCounter += 1;
- Finish(NYql::NDqProto::StatusIds::TIMEOUT, /*retriable=*/ false, /*needFallback=*/ true);
+ Finish(NYql::NDqProto::StatusIds::LIMIT_EXCEEDED, /*retriable=*/ false, /*needFallback=*/ true);
})
cFunc(TEvents::TEvWakeup::EventType, OnWakeup)
})
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
index cac1420315c..1fafa3442a2 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
@@ -677,12 +677,12 @@ private:
source.HasData = true;
Send(SelfId(), new TEvContinueRun());
} catch (...) {
- SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, CurrentExceptionMessage(), false, false));
+ SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNSPECIFIED, CurrentExceptionMessage(), false, false));
}
}
void OnSourceError(const IDqSourceActor::TEvSourceError::TPtr& ev) {
Y_UNUSED(ev->Get()->InputIndex);
- SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNSPECIFIED, ev->Get()->Issues.ToString(), !ev->Get()->IsFatal, !ev->Get()->IsFatal));
+ SendFailure(MakeHolder<TEvDqFailure>(ev->Get()->IsFatal ? NYql::NDqProto::StatusIds::UNSPECIFIED : NYql::NDqProto::StatusIds::INTERNAL_ERROR, ev->Get()->Issues.ToString(), !ev->Get()->IsFatal, !ev->Get()->IsFatal));
}
void OnSourcePushFinished(TEvSourcePushFinished::TPtr& ev, const TActorContext& ctx) {
auto index = ev->Get()->Index;
@@ -698,7 +698,7 @@ private:
void OnSinkError(ui64 outputIndex, const TIssues& issues, bool isFatal) override {
Y_UNUSED(outputIndex);
- SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNSPECIFIED, issues.ToString(), !isFatal, !isFatal));
+ SendFailure(MakeHolder<TEvDqFailure>(isFatal ? NYql::NDqProto::StatusIds::UNSPECIFIED : NYql::NDqProto::StatusIds::INTERNAL_ERROR, issues.ToString(), !isFatal, !isFatal));
}
void OnSinkStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) override {
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_common.cpp b/ydb/library/yql/providers/dq/common/yql_dq_common.cpp
index c3f7e378bf8..fd1a21e54ec 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_common.cpp
+++ b/ydb/library/yql/providers/dq/common/yql_dq_common.cpp
@@ -99,6 +99,7 @@ bool IsRetriable(NYql::NDqProto::StatusIds::StatusCode statusCode) {
case NYql::NDqProto::StatusIds::UNSPECIFIED:
case NYql::NDqProto::StatusIds::SUCCESS:
case NYql::NDqProto::StatusIds::BAD_REQUEST:
+ case NYql::NDqProto::StatusIds::LIMIT_EXCEEDED:
return false;
default:
return true;
@@ -118,6 +119,7 @@ bool NeedFallback(NYql::NDqProto::StatusIds::StatusCode statusCode) {
case NYql::NDqProto::StatusIds::BAD_REQUEST:
case NYql::NDqProto::StatusIds::PRECONDITION_FAILED:
return false;
+ case NYql::NDqProto::StatusIds::LIMIT_EXCEEDED:
default:
return true;
}
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
index fc491f0a9f7..f550dea9c40 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
@@ -179,7 +179,7 @@ private:
issue.AddSubIssue(MakeIntrusive<TIssue>(YqlIssue(parsedPos.GetOrElse(TPosition()), TIssuesIds::DQ_GATEWAY_ERROR, TString{terminationMessage})));
}
}
- return MakeHolder<NDq::TEvDq::TEvAbortExecution>(runResult.Retriable ? NYql::NDqProto::StatusIds::UNAVAILABLE : NYql::NDqProto::StatusIds::BAD_REQUEST, TVector<TIssue>{issue});
+ return MakeHolder<NDq::TEvDq::TEvAbortExecution>(runResult.Retriable ? NYql::NDqProto::StatusIds::UNAVAILABLE : (runResult.Fallback ? NYql::NDqProto::StatusIds::UNSUPPORTED : NYql::NDqProto::StatusIds::BAD_REQUEST), TVector<TIssue>{issue});
}
void PassAway() override {