aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Khoroshilov <hor911@gmail.com>2022-04-12 23:24:38 +0300
committerAleksandr Khoroshilov <hor911@gmail.com>2022-04-12 23:24:38 +0300
commit1fe30aa6b94852e9165c993b9e3993c93a50b786 (patch)
treea9e999a1f077c4b2ecdeaac4c1724828822fad23
parente1729eae1440f57803573078549027451efa0bab (diff)
downloadydb-1fe30aa6b94852e9165c993b9e3993c93a50b786.tar.gz
Sync StatusCode with Retriable and NeedFallback
ref:fa9c20e0b5a0551b99d3f14bc336ae97aa25b2f0
-rw-r--r--ydb/core/yq/libs/actors/run_actor.cpp9
-rw-r--r--ydb/library/yql/providers/dq/actors/events.cpp8
-rw-r--r--ydb/library/yql/providers/dq/actors/events.h5
-rw-r--r--ydb/library/yql/providers/dq/actors/executer_actor.cpp21
-rw-r--r--ydb/library/yql/providers/dq/actors/full_result_writer.cpp4
-rw-r--r--ydb/library/yql/providers/dq/actors/worker_actor.cpp5
-rw-r--r--ydb/library/yql/providers/dq/api/protos/dqs.proto8
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_common.cpp28
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_common.h3
-rw-r--r--ydb/library/yql/providers/dq/service/grpc_service.cpp19
10 files changed, 79 insertions, 31 deletions
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index 064b86a7d5a..4179d4c0bf1 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -651,13 +651,13 @@ private:
void SaveQueryResponse(NYql::NDqs::TEvQueryResponse::TPtr& ev) {
auto& result = ev->Get()->Record;
- LOG_D("Query response. Retryable: " << result.GetRetriable()
+ LOG_D("Query response. Retryable: " << result.GetDeprecatedRetriable()
<< ". Result set index: " << DqGraphIndex
<< ". Issues count: " << result.IssuesSize()
<< ". Rows count: " << result.GetRowsCount());
AddIssues(result.issues());
- RetryNeeded |= result.GetRetriable();
+ RetryNeeded |= result.GetDeprecatedRetriable();
if (Finishing && !result.issues_size()) { // Race between abort and successful finishing. Override with success and provide results to user.
FinalQueryStatus = YandexQuery::QueryMeta::COMPLETED;
@@ -713,7 +713,10 @@ private:
// In this case we can have race between normal finishing of running query and aborting it.
// If query is finished with success error code or failure != abort, we override abort with this result.
// This logic is located in SaveQueryResponse() method.
- ev->Get()->Record.SetRetriable(false); // User aborted => don't retry, only show issues
+ if (ev->Get()->Record.GetStatusCode() != NYql::NDqProto::StatusIds::SUCCESS) {
+ ev->Get()->Record.SetStatusCode(NYql::NDqProto::StatusIds::CANCELLED);
+ }
+ ev->Get()->Record.SetDeprecatedRetriable(false); // User aborted => don't retry, only show issues
QueryResponseArrived = true;
SaveQueryResponse(ev);
diff --git a/ydb/library/yql/providers/dq/actors/events.cpp b/ydb/library/yql/providers/dq/actors/events.cpp
index a983ff5d873..7a9e3f2aedd 100644
--- a/ydb/library/yql/providers/dq/actors/events.cpp
+++ b/ydb/library/yql/providers/dq/actors/events.cpp
@@ -8,10 +8,14 @@ namespace NYql::NDqs {
*Record.MutableTask() = std::move(task);
}
+ TEvDqFailure::TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode) {
+ Record.SetStatusCode(statusCode);
+ }
+
TEvDqFailure::TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues, bool retriable, bool needFallback) {
IssuesToMessage(issues, Record.MutableIssues());
- Record.SetRetriable(retriable);
- Record.SetNeedFallback(needFallback);
+ Record.SetDeprecatedRetriable(retriable);
+ Record.SetDeprecatedNeedFallback(needFallback);
Record.SetStatusCode(statusCode);
}
diff --git a/ydb/library/yql/providers/dq/actors/events.h b/ydb/library/yql/providers/dq/actors/events.h
index e2d89453d31..19f842dbe9b 100644
--- a/ydb/library/yql/providers/dq/actors/events.h
+++ b/ydb/library/yql/providers/dq/actors/events.h
@@ -21,8 +21,9 @@ namespace NYql::NDqs {
struct TEvDqFailure : NActors::TEventPB<TEvDqFailure, NDqProto::TDqFailure, TDqExecuterEvents::ES_DQ_FAILURE> {
TEvDqFailure() = default;
- explicit TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues, bool retriable = false, bool needFallback = false);
- explicit TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssue& issue, bool retriable = false, bool needFallback = false);
+ explicit TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode);
+ TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues, bool retriable = false, bool needFallback = false);
+ TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssue& issue, bool retriable = false, bool needFallback = false);
TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& error, bool retriable, bool needFallback);
};
diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp
index 91f1caee748..cb330ee1351 100644
--- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp
@@ -87,7 +87,7 @@ private:
issue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR);
Issues.AddIssues({issue});
*ExecutionTimeoutCounter += 1;
- Finish(/*retriable=*/ false, /*needFallback=*/ true);
+ Finish(NYql::NDqProto::StatusIds::TIMEOUT, /*retriable=*/ false, /*needFallback=*/ true);
})
cFunc(TEvents::TEvWakeup::EventType, OnWakeup)
})
@@ -236,7 +236,7 @@ private:
}
}
- void Finish(bool retriable, bool needFallback = false)
+ void Finish(NYql::NDqProto::StatusIds::StatusCode statusCode, bool retriable, bool needFallback = false)
{
YQL_LOG(DEBUG) << __FUNCTION__ << ", retriable=" << retriable << ", needFallback=" << needFallback;
if (Finished) {
@@ -245,8 +245,9 @@ private:
FlushCounter("ExecutionTime");
TQueryResponse result;
IssuesToMessage(Issues, result.MutableIssues());
- result.SetRetriable(retriable);
- result.SetNeedFallback(needFallback);
+ result.SetDeprecatedRetriable(retriable);
+ result.SetDeprecatedNeedFallback(needFallback);
+ result.SetStatusCode(statusCode);
FlushCounters(result);
Send(ControlId, MakeHolder<TEvQueryResponse>(std::move(result)));
Finished = true;
@@ -258,14 +259,14 @@ private:
YQL_LOG_CTX_SCOPE(TraceId);
YQL_LOG(DEBUG) << __FUNCTION__;
AddCounters(ev->Get()->Record);
- bool retriable = ev->Get()->Record.GetRetriable();
- bool fallback = ev->Get()->Record.GetNeedFallback();
+ bool retriable = ev->Get()->Record.GetDeprecatedRetriable();
+ bool fallback = ev->Get()->Record.GetDeprecatedNeedFallback();
if (ev->Get()->Record.IssuesSize()) {
TIssues issues;
IssuesFromMessage(ev->Get()->Record.GetIssues(), issues);
Issues.AddIssues(issues);
}
- Finish(retriable, fallback);
+ Finish(ev->Get()->Record.GetStatusCode(), retriable, fallback);
}
}
@@ -275,11 +276,11 @@ private:
if (!Finished) {
try {
TFailureInjector::Reach("dq_fail_on_finish", [] { throw yexception() << "dq_fail_on_finish"; });
- Finish(false);
+ Finish(NYql::NDqProto::StatusIds::SUCCESS, false);
} catch (...) {
YQL_LOG(ERROR) << " FailureInjector " << CurrentExceptionMessage();
Issues.AddIssue(TIssue("FailureInjection"));
- Finish(true);
+ Finish(NYql::NDqProto::StatusIds::UNAVAILABLE, true);
}
}
}
@@ -313,7 +314,7 @@ private:
<< ev->Get()->Record.GetError().GetMessage() << ":"
<< static_cast<int>(ev->Get()->Record.GetError().GetErrorCode());
Issues.AddIssue(TIssue(ev->Get()->Record.GetError().GetMessage()).SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR));
- Finish(/*retriable = */ true, /*fallback = */ true);
+ Finish(NYql::NDqProto::StatusIds::OVERLOADED, /*retriable = */ true, /*fallback = */ true);
return;
}
case TAllocateWorkersResponse::kNodes:
diff --git a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp
index d6b1ba1b94e..9bfd095bdd6 100644
--- a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp
+++ b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp
@@ -89,7 +89,7 @@ private:
if (ErrorMessage) {
Send(AggregatorID, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, *ErrorMessage, false, true));
} else {
- Send(AggregatorID, MakeHolder<TEvDqFailure>().Release());
+ Send(AggregatorID, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::SUCCESS).Release());
}
} catch (...) {
TIssue issue(CurrentExceptionMessage());
@@ -97,7 +97,7 @@ private:
if (ErrorMessage) {
issue.AddSubIssue(MakeIntrusive<TIssue>(*ErrorMessage));
}
- Send(AggregatorID, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, issue, false, false).Release());
+ Send(AggregatorID, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, issue, false, true).Release());
}
Send(SelfId(), MakeHolder<NActors::TEvents::TEvPoison>());
}
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
index e99001bf6f8..8cb708b2f3f 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
@@ -194,8 +194,9 @@ private:
}
// hacky conversion to TEvDqFailure
auto convertedError = MakeHolder<TEvDqFailure>();
- convertedError->Record.SetRetriable(NCommon::IsRetriable(ev));
- convertedError->Record.SetNeedFallback(NCommon::NeedFallback(ev));
+ convertedError->Record.SetDeprecatedRetriable(NCommon::IsRetriable(ev));
+ convertedError->Record.SetDeprecatedNeedFallback(NCommon::NeedFallback(ev));
+ convertedError->Record.SetStatusCode(ev->Get()->Record.GetStatusCode());
convertedError->Record.MutableIssues()->Swap(ev->Get()->Record.MutableIssues());
SendFailure(std::move(convertedError)); // enreached with stats inside
}
diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto
index dd643285c10..8145334021f 100644
--- a/ydb/library/yql/providers/dq/api/protos/dqs.proto
+++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto
@@ -173,8 +173,8 @@ message TQueryResponse {
Ydb.ResultSet ResultSet = 1;
repeated Ydb.Issue.IssueMessage Issues = 2;
bytes Yson = 3;
- bool Retriable = 4;
- bool NeedFallback = 6;
+ bool DeprecatedRetriable = 4;
+ bool DeprecatedNeedFallback = 6;
repeated TMetric Metric = 5;
bool Truncated = 7;
uint64 RowsCount = 8;
@@ -184,8 +184,8 @@ message TQueryResponse {
message TDqFailure {
repeated Ydb.Issue.IssueMessage Issues = 4;
repeated TMetric Metric = 5;
- bool Retriable = 6;
- bool NeedFallback = 7;
+ bool DeprecatedRetriable = 6;
+ bool DeprecatedNeedFallback = 7;
NYql.NDqProto.StatusIds.StatusCode StatusCode = 8;
};
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_common.cpp b/ydb/library/yql/providers/dq/common/yql_dq_common.cpp
index 91bdf3bb7ef..c3f7e378bf8 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_common.cpp
+++ b/ydb/library/yql/providers/dq/common/yql_dq_common.cpp
@@ -94,9 +94,33 @@ bool ParseCounterName(TString* prefix, std::map<TString, TString>* labels, TStri
return !name->empty();
}
+bool IsRetriable(NYql::NDqProto::StatusIds::StatusCode statusCode) {
+ switch (statusCode) {
+ case NYql::NDqProto::StatusIds::UNSPECIFIED:
+ case NYql::NDqProto::StatusIds::SUCCESS:
+ case NYql::NDqProto::StatusIds::BAD_REQUEST:
+ return false;
+ default:
+ return true;
+ }
+}
+
bool IsRetriable(const NDq::TEvDq::TEvAbortExecution::TPtr& ev) {
- const auto statusCode = ev->Get()->Record.GetStatusCode();
- return statusCode != NYql::NDqProto::StatusIds::BAD_REQUEST;
+ return IsRetriable(ev->Get()->Record.GetStatusCode());
+}
+
+bool NeedFallback(NYql::NDqProto::StatusIds::StatusCode statusCode) {
+ switch (statusCode) {
+ case NYql::NDqProto::StatusIds::UNSPECIFIED:
+ case NYql::NDqProto::StatusIds::SUCCESS:
+ case NYql::NDqProto::StatusIds::ABORTED:
+ case NYql::NDqProto::StatusIds::CANCELLED:
+ case NYql::NDqProto::StatusIds::BAD_REQUEST:
+ case NYql::NDqProto::StatusIds::PRECONDITION_FAILED:
+ return false;
+ default:
+ return true;
+ }
}
bool NeedFallback(const NDq::TEvDq::TEvAbortExecution::TPtr& ev) {
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_common.h b/ydb/library/yql/providers/dq/common/yql_dq_common.h
index 3620862b128..0841c112d47 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_common.h
+++ b/ydb/library/yql/providers/dq/common/yql_dq_common.h
@@ -14,8 +14,9 @@ TString GetSerializedResultType(const TString& program);
bool ParseCounterName(TString* prefix, std::map<TString, TString>* labels, TString* name, const TString& counterName);
+bool IsRetriable(NYql::NDqProto::StatusIds::StatusCode statusCode);
bool IsRetriable(const NDq::TEvDq::TEvAbortExecution::TPtr& ev);
-
+bool NeedFallback(NYql::NDqProto::StatusIds::StatusCode statusCode);
bool NeedFallback(const NDq::TEvDq::TEvAbortExecution::TPtr& ev);
} // namespace NCommon
} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp
index 824c74fa604..8ecd1cb0f93 100644
--- a/ydb/library/yql/providers/dq/service/grpc_service.cpp
+++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp
@@ -131,7 +131,13 @@ namespace NYql::NDqs {
queryResult.Mutableresult()->CopyFrom(result.resultset());
queryResult.set_yson(result.yson());
- if (result.GetNeedFallback()) {
+ auto needFallback = result.GetDeprecatedNeedFallback();
+
+ if (needFallback != NCommon::NeedFallback(ev->Get()->Record.GetStatusCode())) {
+ Counters->GetSubgroup("MistmatchedNeedFallback", needFallback ? "False" : "True")->GetCounter(NYql::NDqProto::StatusIds_StatusCode_Name(ev->Get()->Record.GetStatusCode()))->Inc();
+ }
+
+ if (needFallback) {
NYql::TIssue rootIssue("Fatal Error");
rootIssue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR);
NYql::TIssues issues;
@@ -183,20 +189,27 @@ namespace NYql::NDqs {
YQL_LOG_CTX_SCOPE(TraceId);
YQL_LOG(DEBUG) << "TServiceProxyActor::OnReturnResult " << ev->Get()->Record.GetMetric().size();
QueryStat.AddCounters(ev->Get()->Record);
- if (ev->Get()->Record.GetIssues().size() > 0 && ev->Get()->Record.GetRetriable() && Retry < MaxRetries) {
+ auto retriable = ev->Get()->Record.GetDeprecatedRetriable();
+
+ if (retriable != NCommon::IsRetriable(ev->Get()->Record.GetStatusCode())) {
+ Counters->GetSubgroup("MistmatchedRetriable", retriable ? "False" : "True")->GetCounter(NYql::NDqProto::StatusIds_StatusCode_Name(ev->Get()->Record.GetStatusCode()))->Inc();
+ }
+
+ if (ev->Get()->Record.GetIssues().size() > 0 && retriable && Retry < MaxRetries) {
QueryStat.AddCounter(RetryName, TDuration::MilliSeconds(0));
NYql::TIssues issues;
NYql::IssuesFromMessage(ev->Get()->Record.GetIssues(), issues);
YQL_LOG(WARN) << RetryName << " " << Retry << " Issues: " << issues.ToString();
DoRetry();
} else {
+ auto needFallback = ev->Get()->Record.GetDeprecatedNeedFallback();
if (ev->Get()->Record.GetIssues().size() > 0) {
NYql::TIssues issues;
NYql::IssuesFromMessage(ev->Get()->Record.GetIssues(), issues);
YQL_LOG(WARN) << "Issues: " << issues.ToString();
*ErrorCounter += 1;
}
- if (ev->Get()->Record.GetNeedFallback()) {
+ if (needFallback) {
// TODO: Remove GetNeedFallback, use only issue codes!
*FallbackCounter += 1;
}