diff options
author | Aleksandr Khoroshilov <hor911@gmail.com> | 2022-04-12 23:24:38 +0300 |
---|---|---|
committer | Aleksandr Khoroshilov <hor911@gmail.com> | 2022-04-12 23:24:38 +0300 |
commit | 1fe30aa6b94852e9165c993b9e3993c93a50b786 (patch) | |
tree | a9e999a1f077c4b2ecdeaac4c1724828822fad23 | |
parent | e1729eae1440f57803573078549027451efa0bab (diff) | |
download | ydb-1fe30aa6b94852e9165c993b9e3993c93a50b786.tar.gz |
Sync StatusCode with Retriable and NeedFallback
ref:fa9c20e0b5a0551b99d3f14bc336ae97aa25b2f0
10 files changed, 79 insertions, 31 deletions
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp index 064b86a7d5a..4179d4c0bf1 100644 --- a/ydb/core/yq/libs/actors/run_actor.cpp +++ b/ydb/core/yq/libs/actors/run_actor.cpp @@ -651,13 +651,13 @@ private: void SaveQueryResponse(NYql::NDqs::TEvQueryResponse::TPtr& ev) { auto& result = ev->Get()->Record; - LOG_D("Query response. Retryable: " << result.GetRetriable() + LOG_D("Query response. Retryable: " << result.GetDeprecatedRetriable() << ". Result set index: " << DqGraphIndex << ". Issues count: " << result.IssuesSize() << ". Rows count: " << result.GetRowsCount()); AddIssues(result.issues()); - RetryNeeded |= result.GetRetriable(); + RetryNeeded |= result.GetDeprecatedRetriable(); if (Finishing && !result.issues_size()) { // Race between abort and successful finishing. Override with success and provide results to user. FinalQueryStatus = YandexQuery::QueryMeta::COMPLETED; @@ -713,7 +713,10 @@ private: // In this case we can have race between normal finishing of running query and aborting it. // If query is finished with success error code or failure != abort, we override abort with this result. // This logic is located in SaveQueryResponse() method. - ev->Get()->Record.SetRetriable(false); // User aborted => don't retry, only show issues + if (ev->Get()->Record.GetStatusCode() != NYql::NDqProto::StatusIds::SUCCESS) { + ev->Get()->Record.SetStatusCode(NYql::NDqProto::StatusIds::CANCELLED); + } + ev->Get()->Record.SetDeprecatedRetriable(false); // User aborted => don't retry, only show issues QueryResponseArrived = true; SaveQueryResponse(ev); diff --git a/ydb/library/yql/providers/dq/actors/events.cpp b/ydb/library/yql/providers/dq/actors/events.cpp index a983ff5d873..7a9e3f2aedd 100644 --- a/ydb/library/yql/providers/dq/actors/events.cpp +++ b/ydb/library/yql/providers/dq/actors/events.cpp @@ -8,10 +8,14 @@ namespace NYql::NDqs { *Record.MutableTask() = std::move(task); } + TEvDqFailure::TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode) { + Record.SetStatusCode(statusCode); + } + TEvDqFailure::TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues, bool retriable, bool needFallback) { IssuesToMessage(issues, Record.MutableIssues()); - Record.SetRetriable(retriable); - Record.SetNeedFallback(needFallback); + Record.SetDeprecatedRetriable(retriable); + Record.SetDeprecatedNeedFallback(needFallback); Record.SetStatusCode(statusCode); } diff --git a/ydb/library/yql/providers/dq/actors/events.h b/ydb/library/yql/providers/dq/actors/events.h index e2d89453d31..19f842dbe9b 100644 --- a/ydb/library/yql/providers/dq/actors/events.h +++ b/ydb/library/yql/providers/dq/actors/events.h @@ -21,8 +21,9 @@ namespace NYql::NDqs { struct TEvDqFailure : NActors::TEventPB<TEvDqFailure, NDqProto::TDqFailure, TDqExecuterEvents::ES_DQ_FAILURE> { TEvDqFailure() = default; - explicit TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues, bool retriable = false, bool needFallback = false); - explicit TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssue& issue, bool retriable = false, bool needFallback = false); + explicit TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode); + TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues, bool retriable = false, bool needFallback = false); + TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssue& issue, bool retriable = false, bool needFallback = false); TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& error, bool retriable, bool needFallback); }; diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp index 91f1caee748..cb330ee1351 100644 --- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp @@ -87,7 +87,7 @@ private: issue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR); Issues.AddIssues({issue}); *ExecutionTimeoutCounter += 1; - Finish(/*retriable=*/ false, /*needFallback=*/ true); + Finish(NYql::NDqProto::StatusIds::TIMEOUT, /*retriable=*/ false, /*needFallback=*/ true); }) cFunc(TEvents::TEvWakeup::EventType, OnWakeup) }) @@ -236,7 +236,7 @@ private: } } - void Finish(bool retriable, bool needFallback = false) + void Finish(NYql::NDqProto::StatusIds::StatusCode statusCode, bool retriable, bool needFallback = false) { YQL_LOG(DEBUG) << __FUNCTION__ << ", retriable=" << retriable << ", needFallback=" << needFallback; if (Finished) { @@ -245,8 +245,9 @@ private: FlushCounter("ExecutionTime"); TQueryResponse result; IssuesToMessage(Issues, result.MutableIssues()); - result.SetRetriable(retriable); - result.SetNeedFallback(needFallback); + result.SetDeprecatedRetriable(retriable); + result.SetDeprecatedNeedFallback(needFallback); + result.SetStatusCode(statusCode); FlushCounters(result); Send(ControlId, MakeHolder<TEvQueryResponse>(std::move(result))); Finished = true; @@ -258,14 +259,14 @@ private: YQL_LOG_CTX_SCOPE(TraceId); YQL_LOG(DEBUG) << __FUNCTION__; AddCounters(ev->Get()->Record); - bool retriable = ev->Get()->Record.GetRetriable(); - bool fallback = ev->Get()->Record.GetNeedFallback(); + bool retriable = ev->Get()->Record.GetDeprecatedRetriable(); + bool fallback = ev->Get()->Record.GetDeprecatedNeedFallback(); if (ev->Get()->Record.IssuesSize()) { TIssues issues; IssuesFromMessage(ev->Get()->Record.GetIssues(), issues); Issues.AddIssues(issues); } - Finish(retriable, fallback); + Finish(ev->Get()->Record.GetStatusCode(), retriable, fallback); } } @@ -275,11 +276,11 @@ private: if (!Finished) { try { TFailureInjector::Reach("dq_fail_on_finish", [] { throw yexception() << "dq_fail_on_finish"; }); - Finish(false); + Finish(NYql::NDqProto::StatusIds::SUCCESS, false); } catch (...) { YQL_LOG(ERROR) << " FailureInjector " << CurrentExceptionMessage(); Issues.AddIssue(TIssue("FailureInjection")); - Finish(true); + Finish(NYql::NDqProto::StatusIds::UNAVAILABLE, true); } } } @@ -313,7 +314,7 @@ private: << ev->Get()->Record.GetError().GetMessage() << ":" << static_cast<int>(ev->Get()->Record.GetError().GetErrorCode()); Issues.AddIssue(TIssue(ev->Get()->Record.GetError().GetMessage()).SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR)); - Finish(/*retriable = */ true, /*fallback = */ true); + Finish(NYql::NDqProto::StatusIds::OVERLOADED, /*retriable = */ true, /*fallback = */ true); return; } case TAllocateWorkersResponse::kNodes: diff --git a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp index d6b1ba1b94e..9bfd095bdd6 100644 --- a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp +++ b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp @@ -89,7 +89,7 @@ private: if (ErrorMessage) { Send(AggregatorID, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, *ErrorMessage, false, true)); } else { - Send(AggregatorID, MakeHolder<TEvDqFailure>().Release()); + Send(AggregatorID, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::SUCCESS).Release()); } } catch (...) { TIssue issue(CurrentExceptionMessage()); @@ -97,7 +97,7 @@ private: if (ErrorMessage) { issue.AddSubIssue(MakeIntrusive<TIssue>(*ErrorMessage)); } - Send(AggregatorID, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, issue, false, false).Release()); + Send(AggregatorID, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, issue, false, true).Release()); } Send(SelfId(), MakeHolder<NActors::TEvents::TEvPoison>()); } diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index e99001bf6f8..8cb708b2f3f 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -194,8 +194,9 @@ private: } // hacky conversion to TEvDqFailure auto convertedError = MakeHolder<TEvDqFailure>(); - convertedError->Record.SetRetriable(NCommon::IsRetriable(ev)); - convertedError->Record.SetNeedFallback(NCommon::NeedFallback(ev)); + convertedError->Record.SetDeprecatedRetriable(NCommon::IsRetriable(ev)); + convertedError->Record.SetDeprecatedNeedFallback(NCommon::NeedFallback(ev)); + convertedError->Record.SetStatusCode(ev->Get()->Record.GetStatusCode()); convertedError->Record.MutableIssues()->Swap(ev->Get()->Record.MutableIssues()); SendFailure(std::move(convertedError)); // enreached with stats inside } diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto index dd643285c10..8145334021f 100644 --- a/ydb/library/yql/providers/dq/api/protos/dqs.proto +++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto @@ -173,8 +173,8 @@ message TQueryResponse { Ydb.ResultSet ResultSet = 1; repeated Ydb.Issue.IssueMessage Issues = 2; bytes Yson = 3; - bool Retriable = 4; - bool NeedFallback = 6; + bool DeprecatedRetriable = 4; + bool DeprecatedNeedFallback = 6; repeated TMetric Metric = 5; bool Truncated = 7; uint64 RowsCount = 8; @@ -184,8 +184,8 @@ message TQueryResponse { message TDqFailure { repeated Ydb.Issue.IssueMessage Issues = 4; repeated TMetric Metric = 5; - bool Retriable = 6; - bool NeedFallback = 7; + bool DeprecatedRetriable = 6; + bool DeprecatedNeedFallback = 7; NYql.NDqProto.StatusIds.StatusCode StatusCode = 8; }; diff --git a/ydb/library/yql/providers/dq/common/yql_dq_common.cpp b/ydb/library/yql/providers/dq/common/yql_dq_common.cpp index 91bdf3bb7ef..c3f7e378bf8 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_common.cpp +++ b/ydb/library/yql/providers/dq/common/yql_dq_common.cpp @@ -94,9 +94,33 @@ bool ParseCounterName(TString* prefix, std::map<TString, TString>* labels, TStri return !name->empty(); } +bool IsRetriable(NYql::NDqProto::StatusIds::StatusCode statusCode) { + switch (statusCode) { + case NYql::NDqProto::StatusIds::UNSPECIFIED: + case NYql::NDqProto::StatusIds::SUCCESS: + case NYql::NDqProto::StatusIds::BAD_REQUEST: + return false; + default: + return true; + } +} + bool IsRetriable(const NDq::TEvDq::TEvAbortExecution::TPtr& ev) { - const auto statusCode = ev->Get()->Record.GetStatusCode(); - return statusCode != NYql::NDqProto::StatusIds::BAD_REQUEST; + return IsRetriable(ev->Get()->Record.GetStatusCode()); +} + +bool NeedFallback(NYql::NDqProto::StatusIds::StatusCode statusCode) { + switch (statusCode) { + case NYql::NDqProto::StatusIds::UNSPECIFIED: + case NYql::NDqProto::StatusIds::SUCCESS: + case NYql::NDqProto::StatusIds::ABORTED: + case NYql::NDqProto::StatusIds::CANCELLED: + case NYql::NDqProto::StatusIds::BAD_REQUEST: + case NYql::NDqProto::StatusIds::PRECONDITION_FAILED: + return false; + default: + return true; + } } bool NeedFallback(const NDq::TEvDq::TEvAbortExecution::TPtr& ev) { diff --git a/ydb/library/yql/providers/dq/common/yql_dq_common.h b/ydb/library/yql/providers/dq/common/yql_dq_common.h index 3620862b128..0841c112d47 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_common.h +++ b/ydb/library/yql/providers/dq/common/yql_dq_common.h @@ -14,8 +14,9 @@ TString GetSerializedResultType(const TString& program); bool ParseCounterName(TString* prefix, std::map<TString, TString>* labels, TString* name, const TString& counterName); +bool IsRetriable(NYql::NDqProto::StatusIds::StatusCode statusCode); bool IsRetriable(const NDq::TEvDq::TEvAbortExecution::TPtr& ev); - +bool NeedFallback(NYql::NDqProto::StatusIds::StatusCode statusCode); bool NeedFallback(const NDq::TEvDq::TEvAbortExecution::TPtr& ev); } // namespace NCommon } // namespace NYql diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp index 824c74fa604..8ecd1cb0f93 100644 --- a/ydb/library/yql/providers/dq/service/grpc_service.cpp +++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp @@ -131,7 +131,13 @@ namespace NYql::NDqs { queryResult.Mutableresult()->CopyFrom(result.resultset()); queryResult.set_yson(result.yson()); - if (result.GetNeedFallback()) { + auto needFallback = result.GetDeprecatedNeedFallback(); + + if (needFallback != NCommon::NeedFallback(ev->Get()->Record.GetStatusCode())) { + Counters->GetSubgroup("MistmatchedNeedFallback", needFallback ? "False" : "True")->GetCounter(NYql::NDqProto::StatusIds_StatusCode_Name(ev->Get()->Record.GetStatusCode()))->Inc(); + } + + if (needFallback) { NYql::TIssue rootIssue("Fatal Error"); rootIssue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR); NYql::TIssues issues; @@ -183,20 +189,27 @@ namespace NYql::NDqs { YQL_LOG_CTX_SCOPE(TraceId); YQL_LOG(DEBUG) << "TServiceProxyActor::OnReturnResult " << ev->Get()->Record.GetMetric().size(); QueryStat.AddCounters(ev->Get()->Record); - if (ev->Get()->Record.GetIssues().size() > 0 && ev->Get()->Record.GetRetriable() && Retry < MaxRetries) { + auto retriable = ev->Get()->Record.GetDeprecatedRetriable(); + + if (retriable != NCommon::IsRetriable(ev->Get()->Record.GetStatusCode())) { + Counters->GetSubgroup("MistmatchedRetriable", retriable ? "False" : "True")->GetCounter(NYql::NDqProto::StatusIds_StatusCode_Name(ev->Get()->Record.GetStatusCode()))->Inc(); + } + + if (ev->Get()->Record.GetIssues().size() > 0 && retriable && Retry < MaxRetries) { QueryStat.AddCounter(RetryName, TDuration::MilliSeconds(0)); NYql::TIssues issues; NYql::IssuesFromMessage(ev->Get()->Record.GetIssues(), issues); YQL_LOG(WARN) << RetryName << " " << Retry << " Issues: " << issues.ToString(); DoRetry(); } else { + auto needFallback = ev->Get()->Record.GetDeprecatedNeedFallback(); if (ev->Get()->Record.GetIssues().size() > 0) { NYql::TIssues issues; NYql::IssuesFromMessage(ev->Get()->Record.GetIssues(), issues); YQL_LOG(WARN) << "Issues: " << issues.ToString(); *ErrorCounter += 1; } - if (ev->Get()->Record.GetNeedFallback()) { + if (needFallback) { // TODO: Remove GetNeedFallback, use only issue codes! *FallbackCounter += 1; } |