diff options
author | whcrc <whcrc@ydb.tech> | 2022-08-15 18:31:53 +0300 |
---|---|---|
committer | whcrc <whcrc@ydb.tech> | 2022-08-15 18:31:53 +0300 |
commit | 34207d58634f03de42cd726816d6598bd719600d (patch) | |
tree | d354945c6d876c2e5f8ecb2dbc7c607c99fcd09b | |
parent | cd7cd49007152e52d94624ce416ccc404cb619e1 (diff) | |
download | ydb-34207d58634f03de42cd726816d6598bd719600d.tar.gz |
no fallback on invalid failed queries
7 files changed, 65 insertions, 71 deletions
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp index 891224e931..734211ef68 100644 --- a/ydb/core/yq/libs/actors/run_actor.cpp +++ b/ydb/core/yq/libs/actors/run_actor.cpp @@ -946,8 +946,7 @@ private: void SaveQueryResponse(NYql::NDqs::TEvQueryResponse::TPtr& ev) { auto& result = ev->Get()->Record; - LOG_D("Query response. Retryable: " << result.GetDeprecatedRetriable() - << ". Result set index: " << DqGraphIndex + LOG_D("Query response. Result set index: " << DqGraphIndex << ". Issues count: " << result.IssuesSize() << ". Rows count: " << result.GetRowsCount()); @@ -1044,7 +1043,6 @@ private: if (ev->Get()->Record.GetStatusCode() != NYql::NDqProto::StatusIds::SUCCESS) { ev->Get()->Record.SetStatusCode(NYql::NDqProto::StatusIds::CANCELLED); } - ev->Get()->Record.SetDeprecatedRetriable(false); // User aborted => don't retry, only show issues QueryResponseArrived = true; SaveQueryResponse(ev); diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp index e55ff410f7..c7a4449aab 100644 --- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp @@ -88,7 +88,7 @@ private: issue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR); Issues.AddIssues({issue}); *ExecutionTimeoutCounter += 1; - Finish(NYql::NDqProto::StatusIds::LIMIT_EXCEEDED, /*retriable=*/ false, /*needFallback=*/ true); + Finish(NYql::NDqProto::StatusIds::LIMIT_EXCEEDED); }) cFunc(TEvents::TEvWakeup::EventType, OnWakeup) }) @@ -134,9 +134,8 @@ private: Y_VERIFY(!ResultId); YQL_CLOG(DEBUG, ProviderDq) << "TDqExecuter::OnGraph"; TFailureInjector::Reach("dq_fail_on_graph", [&] { - auto ev = MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::StatusCode::StatusIds_StatusCode_SUCCESS); - ev->Record.SetDeprecatedNeedFallback(false); - ev->Record.SetDeprecatedRetriable(false); + // YQL-15117, it's very likely that the status was INTERNAL_ERROR, originated from worker_actor::OnTaskRunnerCreated (with no issues attached) + auto ev = MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::StatusCode::StatusIds_StatusCode_INTERNAL_ERROR); Send(SelfId(), std::move(ev)); }); ControlId = NActors::ActorIdFromProto(ev->Get()->Record.GetControlId()); @@ -243,17 +242,15 @@ private: } } - void Finish(NYql::NDqProto::StatusIds::StatusCode statusCode, bool retriable, bool needFallback = false) + void Finish(NYql::NDqProto::StatusIds::StatusCode statusCode) { - YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ << ", retriable=" << retriable << ", needFallback=" << needFallback; + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ << " with status=" << static_cast<int>(statusCode) << " issues=" << Issues.ToString(); if (Finished) { - YQL_CLOG(WARN, ProviderDq) << "Re-Finish IGNORED with Retriable=" << retriable << ", NeedFallback=" << needFallback; + YQL_CLOG(WARN, ProviderDq) << "Re-Finish IGNORED with status=" << static_cast<int>(statusCode); } else { FlushCounter("ExecutionTime"); TQueryResponse result; IssuesToMessage(Issues, result.MutableIssues()); - result.SetDeprecatedRetriable(retriable); - result.SetDeprecatedNeedFallback(needFallback); result.SetStatusCode(statusCode); FlushCounters(result); Send(ControlId, MakeHolder<TEvQueryResponse>(std::move(result))); @@ -265,24 +262,18 @@ private: if (!Finished) { YQL_LOG_CTX_ROOT_SCOPE(TraceId); YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ - << " with Retriable=" << ev->Get()->Record.GetDeprecatedRetriable() - << ", NeedFallback=" << ev->Get()->Record.GetDeprecatedNeedFallback() - << ", status=" << (int) ev->Get()->Record.GetStatusCode() + << ", status=" << static_cast<int>(ev->Get()->Record.GetStatusCode()) + << ", issues size=" << ev->Get()->Record.IssuesSize() << ", sender=" << ev->Sender; AddCounters(ev->Get()->Record); - bool retriable = ev->Get()->Record.GetDeprecatedRetriable(); - // while we're investigating YQL-15117, we want to be sure that any failure leads to the fallback (except explicitly retriable ones) - bool fallback = true; - auto status = ev->Get()->Record.GetStatusCode(); - if (status == NYql::NDqProto::StatusIds::UNSPECIFIED) { - status = NYql::NDqProto::StatusIds::INTERNAL_ERROR; - } if (ev->Get()->Record.IssuesSize()) { TIssues issues; IssuesFromMessage(ev->Get()->Record.GetIssues(), issues); Issues.AddIssues(issues); + YQL_CLOG(DEBUG, ProviderDq) << "Issues: " << Issues.ToString(); } - Finish(status, retriable, fallback); + Y_VERIFY(ev->Get()->Record.GetStatusCode() != NYql::NDqProto::StatusIds::SUCCESS); + Finish(ev->Get()->Record.GetStatusCode()); } } @@ -292,11 +283,11 @@ private: if (!Finished) { try { TFailureInjector::Reach("dq_fail_on_finish", [] { throw yexception() << "dq_fail_on_finish"; }); - Finish(NYql::NDqProto::StatusIds::SUCCESS, false); + Finish(NYql::NDqProto::StatusIds::SUCCESS); } catch (...) { YQL_CLOG(ERROR, ProviderDq) << " FailureInjector " << CurrentExceptionMessage(); Issues.AddIssue(TIssue("FailureInjection")); - Finish(NYql::NDqProto::StatusIds::UNAVAILABLE, true); + Finish(NYql::NDqProto::StatusIds::UNAVAILABLE); } } } @@ -305,7 +296,7 @@ private: void OnQueryResponse(TEvQueryResponse::TPtr& ev, const TActorContext&) { YQL_LOG_CTX_ROOT_SCOPE(TraceId); - YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ << " status=" << static_cast<int>(ev->Get()->Record.GetStatusCode()) << " issuses_size=" << ev->Get()->Record.IssuesSize(); Send(PrinterId, ev->Release().Release()); PassAway(); } @@ -336,7 +327,7 @@ private: << ev->Get()->Record.GetError().GetMessage() << ":" << static_cast<int>(ev->Get()->Record.GetError().GetErrorCode()); Issues.AddIssue(TIssue(ev->Get()->Record.GetError().GetMessage()).SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR)); - Finish(NYql::NDqProto::StatusIds::CLUSTER_OVERLOADED, /*retriable = */ true, /*fallback = */ true); + Finish(NYql::NDqProto::StatusIds::CLUSTER_OVERLOADED); return; } case TAllocateWorkersResponse::kNodes: diff --git a/ydb/library/yql/providers/dq/actors/result_actor_base.h b/ydb/library/yql/providers/dq/actors/result_actor_base.h index 192dc080da..849d765ebd 100644 --- a/ydb/library/yql/providers/dq/actors/result_actor_base.h +++ b/ydb/library/yql/providers/dq/actors/result_actor_base.h @@ -211,9 +211,10 @@ namespace NYql::NDqs::NExecutionHelpers { void OnFullResultWriterResponse(NYql::NDqs::TEvDqFailure::TPtr& ev, const NActors::TActorContext&) { YQL_LOG_CTX_ROOT_SCOPE(TraceId); YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; - if (ev->Get()->Record.IssuesSize() == 0) { + if (ev->Get()->Record.IssuesSize() == 0) { // weird way used by writer to acknowledge it's death DoFinish(); } else { + Y_VERIFY(ev->Get()->Record.GetStatusCode() != NYql::NDqProto::StatusIds::SUCCESS); TBase::Send(ExecuterID, ev->Release().Release()); } } diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index 41756240c6..2ffdbb76fc 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -184,10 +184,9 @@ private: } // hacky conversion to TEvDqFailure auto convertedError = MakeHolder<TEvDqFailure>(); - convertedError->Record.SetDeprecatedRetriable(NCommon::IsRetriable(ev)); - convertedError->Record.SetDeprecatedNeedFallback(NCommon::NeedFallback(ev)); convertedError->Record.SetStatusCode(ev->Get()->Record.GetStatusCode()); convertedError->Record.MutableIssues()->Swap(ev->Get()->Record.MutableIssues()); + Y_VERIFY(convertedError->Record.GetStatusCode() != NYql::NDqProto::StatusIds::SUCCESS); SendFailure(std::move(convertedError)); // enreached with stats inside } @@ -248,6 +247,7 @@ private: TaskRunnerPrepared = true; try { + TFailureInjector::Reach("dq_fail_on_task_runner_created", [] { throw yexception() << "dq_fail_on_task_runner_created"; }); Stat.AddCounters2(ev->Get()->Sensors); const auto& secureParams = ev->Get()->SecureParams; @@ -347,6 +347,7 @@ private: void OnChannelPopFinished(TEvChannelPopFinished::TPtr& ev, const NActors::TActorContext& ctx) { try { + TFailureInjector::Reach("dq_fail_on_channel_pop_finished", [] { throw yexception() << "dq_fail_on_channel_pop_finished"; }); auto outputActorId = OutChannelId2ActorId[ev->Get()->ChannelId]; auto& outChannel = OutputMap[outputActorId]; TPullResponse response; diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto index 874b528c51..e2a8199c9b 100644 --- a/ydb/library/yql/providers/dq/api/protos/dqs.proto +++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto @@ -170,11 +170,10 @@ message TPullResponse { } message TQueryResponse { + reserved 4, 6; Ydb.ResultSet ResultSet = 1; repeated Ydb.Issue.IssueMessage Issues = 2; bytes Yson = 3; - bool DeprecatedRetriable = 4; - bool DeprecatedNeedFallback = 6; repeated TMetric Metric = 5; bool Truncated = 7; uint64 RowsCount = 8; @@ -182,10 +181,9 @@ message TQueryResponse { } message TDqFailure { + reserved 6, 7; repeated Ydb.Issue.IssueMessage Issues = 4; repeated TMetric Metric = 5; - bool DeprecatedRetriable = 6; - bool DeprecatedNeedFallback = 7; NYql.NDqProto.StatusIds.StatusCode StatusCode = 8; }; diff --git a/ydb/library/yql/providers/dq/common/yql_dq_common.cpp b/ydb/library/yql/providers/dq/common/yql_dq_common.cpp index a050354be0..78e226e2ee 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_common.cpp +++ b/ydb/library/yql/providers/dq/common/yql_dq_common.cpp @@ -101,7 +101,10 @@ bool IsRetriable(NYql::NDqProto::StatusIds::StatusCode statusCode) { case NYql::NDqProto::StatusIds::BAD_REQUEST: case NYql::NDqProto::StatusIds::LIMIT_EXCEEDED: case NYql::NDqProto::StatusIds::UNSUPPORTED: + case NYql::NDqProto::StatusIds::ABORTED: + case NYql::NDqProto::StatusIds::CANCELLED: return false; + case NYql::NDqProto::StatusIds::UNAVAILABLE: default: return true; } @@ -121,6 +124,7 @@ bool NeedFallback(NYql::NDqProto::StatusIds::StatusCode statusCode) { case NYql::NDqProto::StatusIds::PRECONDITION_FAILED: return false; case NYql::NDqProto::StatusIds::LIMIT_EXCEEDED: + case NYql::NDqProto::StatusIds::UNAVAILABLE: default: return true; } diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp index 36e2bd9526..96532f7ba8 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp @@ -7,6 +7,7 @@ #include <ydb/library/yql/providers/dq/common/yql_dq_settings.h> #include <ydb/library/yql/providers/dq/counters/counters.h> #include <ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h> +#include <ydb/library/yql/providers/dq/common/yql_dq_common.h> #include <ydb/library/yql/minikql/mkql_string_util.h> @@ -84,16 +85,8 @@ public: }) private: - struct TRunResult { - bool Retriable; - bool Fallback; - TString FilteredStderr; - - TRunResult() : Retriable(false), Fallback(false), FilteredStderr() { - } - }; - - static TRunResult ParseStderr(const TString& input, TIntrusivePtr<TDqConfiguration> settings) { + static std::pair<NYql::NDqProto::StatusIds::StatusCode, TString> ParseStderr(const TString& input, TIntrusivePtr<TDqConfiguration> settings) { + TString filteredStderr; THashSet<TString> fallbackOn; if (settings->_FallbackOnRuntimeErrors.Get()) { TString parts = settings->_FallbackOnRuntimeErrors.Get().GetOrElse(""); @@ -102,66 +95,73 @@ private: } } - TRunResult result; + bool fallback = false; + bool retry = false; for (TStringBuf line: StringSplitter(input).SplitByString("\n").SkipEmpty()) { if (line.Contains("mlockall failed")) { // skip } else { - if (!result.Fallback) { + if (!fallback) { if (line.Contains("FindColumnInfo(): requirement memberType->GetKind() == TType::EKind::Data")) { // YQL-14757: temporary workaround for part6/produce-reduce_lambda_list_table-default.txt - result.Fallback = true; + fallback = true; } else if (line.Contains("embedded:Len")) { // YQL-14763 - result.Fallback = true; + fallback = true; } else if (line.Contains("Container killed by OOM")) { // temporary workaround for YQL-12066 - result.Fallback = true; + fallback = true; } else if (line.Contains("Expected data or optional of data, actual:")) { // temporary workaround for YQL-12835 - result.Fallback = true; + fallback = true; } else if (line.Contains("Pattern nodes can not get computation node by index:")) { // temporary workaround for YQL-12987 - result.Fallback = true; + fallback = true; } else if (line.Contains("contrib/libs/protobuf/src/google/protobuf/messagext.cc") && line.Contains("Message size") && line.Contains("exceeds")) { // temporary workaround for YQL-12988 - result.Fallback = true; + fallback = true; } else if (line.Contains("Cannot start container")) { // temporary workaround for YQL-14221 - result.Retriable = true; - result.Fallback = true; + retry = true; + fallback = true; } else if (line.Contains("Cannot execl")) { // YQL-14099 - result.Retriable = true; - result.Fallback = true; + retry = true; + fallback = true; } else { for (const auto& part : fallbackOn) { if (line.Contains(part)) { - result.Fallback = true; + fallback = true; } } } } - result.FilteredStderr += line; - result.FilteredStderr += "\n"; + filteredStderr += line; + filteredStderr += "\n"; } } - return result; + auto status = NYql::NDqProto::StatusIds::BAD_REQUEST; // no retries, no fallback, error though + if (retry) { + status = NYql::NDqProto::StatusIds::UNAVAILABLE; // retries, fallback on retries limit + } else if (fallback) { + status = NYql::NDqProto::StatusIds::UNSUPPORTED; // no retries, fallback immediately + } + return {status, filteredStderr}; } - static THolder<TEvDq::TEvAbortExecution> StatusToError( + static THolder<TEvDq::TEvAbortExecution> MakeError( const TEvError::TStatus& status, TIntrusivePtr<TDqConfiguration> settings, ui64 stageId, TString message = CurrentExceptionMessage()) { // stderr always affects retriable/fallback flags - auto runResult = ParseStderr(status.Stderr, settings); + auto [queryStatus, filteredStderr] = ParseStderr(status.Stderr, settings); auto stderrStr = TStringBuilder{} << "ExitCode: " << status.ExitCode << "\n" << "StageId: " << stageId << "\n" - << runResult.FilteredStderr; - auto issueCode = runResult.Fallback + << filteredStderr; + auto issueCode = NCommon::NeedFallback(queryStatus) ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR : TIssuesIds::DQ_GATEWAY_ERROR; TIssue issue; @@ -177,7 +177,8 @@ private: issue.AddSubIssue(MakeIntrusive<TIssue>(YqlIssue(parsedPos.GetOrElse(TPosition()), TIssuesIds::DQ_GATEWAY_ERROR, TString{terminationMessage}))); } } - return MakeHolder<NDq::TEvDq::TEvAbortExecution>(runResult.Retriable ? NYql::NDqProto::StatusIds::UNAVAILABLE : (runResult.Fallback ? NYql::NDqProto::StatusIds::UNSUPPORTED : NYql::NDqProto::StatusIds::BAD_REQUEST), TVector<TIssue>{issue}); + Y_VERIFY(queryStatus != NYql::NDqProto::StatusIds::SUCCESS); + return MakeHolder<NDq::TEvDq::TEvAbortExecution>(queryStatus, TVector<TIssue>{issue}); } void PassAway() override { @@ -233,7 +234,7 @@ private: new IEventHandle( replyTo, selfId, - StatusToError({status.ExitCode, status.Stderr}, settings, stageId).Release(), + MakeError({status.ExitCode, status.Stderr}, settings, stageId).Release(), /*flags=*/0, cookie)); } @@ -278,7 +279,7 @@ private: new IEventHandle( parentId, selfId, - StatusToError({status.ExitCode, status.Stderr}, settings, stageId).Release(), + MakeError({status.ExitCode, status.Stderr}, settings, stageId).Release(), /*flags=*/0, cookie)); } @@ -356,7 +357,7 @@ private: new IEventHandle( replyTo, selfId, - StatusToError({status.ExitCode, status.Stderr}, settings, stageId).Release(), + MakeError({status.ExitCode, status.Stderr}, settings, stageId).Release(), /*flags=*/0, cookie)); } @@ -424,7 +425,7 @@ private: new IEventHandle( replyTo, selfId, - StatusToError({status.ExitCode, status.Stderr}, settings, stageId).Release(), + MakeError({status.ExitCode, status.Stderr}, settings, stageId).Release(), /*flags=*/0, cookie)); } @@ -496,7 +497,7 @@ private: } catch (...) { auto status = taskRunner->GetStatus(); actorSystem->Send( - new IEventHandle(replyTo, selfId, StatusToError({status.ExitCode, status.Stderr}, settings, stageId).Release(), 0, cookie)); + new IEventHandle(replyTo, selfId, MakeError({status.ExitCode, status.Stderr}, settings, stageId).Release(), 0, cookie)); } }); } @@ -556,7 +557,7 @@ private: new IEventHandle( replyTo, selfId, - StatusToError({status.ExitCode, status.Stderr}, settings, stageId).Release(), + MakeError({status.ExitCode, status.Stderr}, settings, stageId).Release(), /*flags=*/0, cookie)); } |