about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author whcrc <whcrc@ydb.tech> 2022-08-15 18:31:53 +0300
committer whcrc <whcrc@ydb.tech> 2022-08-15 18:31:53 +0300
commit 34207d58634f03de42cd726816d6598bd719600d (patch)
tree d354945c6d876c2e5f8ecb2dbc7c607c99fcd09b
parent cd7cd49007152e52d94624ce416ccc404cb619e1 (diff)
download ydb-34207d58634f03de42cd726816d6598bd719600d.tar.gz
no fallback on invalid failed queries
-rw-r--r-- ydb/core/yq/libs/actors/run_actor.cpp 4
-rw-r--r-- ydb/library/yql/providers/dq/actors/executer_actor.cpp 39
-rw-r--r-- ydb/library/yql/providers/dq/actors/result_actor_base.h 3
-rw-r--r-- ydb/library/yql/providers/dq/actors/worker_actor.cpp 5
-rw-r--r-- ydb/library/yql/providers/dq/api/protos/dqs.proto 6
-rw-r--r-- ydb/library/yql/providers/dq/common/yql_dq_common.cpp 4
-rw-r--r-- ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp 75
7 files changed, 65 insertions, 71 deletions
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index 891224e931..734211ef68 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -946,8 +946,7 @@ private:
void SaveQueryResponse(NYql::NDqs::TEvQueryResponse::TPtr& ev) {
auto& result = ev->Get()->Record;
- LOG_D("Query response. Retryable: " << result.GetDeprecatedRetriable()
- << ". Result set index: " << DqGraphIndex
+ LOG_D("Query response. Result set index: " << DqGraphIndex
<< ". Issues count: " << result.IssuesSize()
<< ". Rows count: " << result.GetRowsCount());
@@ -1044,7 +1043,6 @@ private:
if (ev->Get()->Record.GetStatusCode() != NYql::NDqProto::StatusIds::SUCCESS) {
ev->Get()->Record.SetStatusCode(NYql::NDqProto::StatusIds::CANCELLED);
}
- ev->Get()->Record.SetDeprecatedRetriable(false); // User aborted => don't retry, only show issues
QueryResponseArrived = true;
SaveQueryResponse(ev);
diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp
index e55ff410f7..c7a4449aab 100644
--- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp
@@ -88,7 +88,7 @@ private:
issue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR);
Issues.AddIssues({issue});
*ExecutionTimeoutCounter += 1;
- Finish(NYql::NDqProto::StatusIds::LIMIT_EXCEEDED, /*retriable=*/ false, /*needFallback=*/ true);
+ Finish(NYql::NDqProto::StatusIds::LIMIT_EXCEEDED);
})
cFunc(TEvents::TEvWakeup::EventType, OnWakeup)
})
@@ -134,9 +134,8 @@ private:
Y_VERIFY(!ResultId);
YQL_CLOG(DEBUG, ProviderDq) << "TDqExecuter::OnGraph";
TFailureInjector::Reach("dq_fail_on_graph", [&] {
- auto ev = MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::StatusCode::StatusIds_StatusCode_SUCCESS);
- ev->Record.SetDeprecatedNeedFallback(false);
- ev->Record.SetDeprecatedRetriable(false);
+ // YQL-15117, it's very likely that the status was INTERNAL_ERROR, originated from worker_actor::OnTaskRunnerCreated (with no issues attached)
+ auto ev = MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::StatusCode::StatusIds_StatusCode_INTERNAL_ERROR);
Send(SelfId(), std::move(ev));
});
ControlId = NActors::ActorIdFromProto(ev->Get()->Record.GetControlId());
@@ -243,17 +242,15 @@ private:
}
}
- void Finish(NYql::NDqProto::StatusIds::StatusCode statusCode, bool retriable, bool needFallback = false)
+ void Finish(NYql::NDqProto::StatusIds::StatusCode statusCode)
{
- YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ << ", retriable=" << retriable << ", needFallback=" << needFallback;
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ << " with status=" << static_cast<int>(statusCode) << " issues=" << Issues.ToString();
if (Finished) {
- YQL_CLOG(WARN, ProviderDq) << "Re-Finish IGNORED with Retriable=" << retriable << ", NeedFallback=" << needFallback;
+ YQL_CLOG(WARN, ProviderDq) << "Re-Finish IGNORED with status=" << static_cast<int>(statusCode);
} else {
FlushCounter("ExecutionTime");
TQueryResponse result;
IssuesToMessage(Issues, result.MutableIssues());
- result.SetDeprecatedRetriable(retriable);
- result.SetDeprecatedNeedFallback(needFallback);
result.SetStatusCode(statusCode);
FlushCounters(result);
Send(ControlId, MakeHolder<TEvQueryResponse>(std::move(result)));
@@ -265,24 +262,18 @@ private:
if (!Finished) {
YQL_LOG_CTX_ROOT_SCOPE(TraceId);
YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__
- << " with Retriable=" << ev->Get()->Record.GetDeprecatedRetriable()
- << ", NeedFallback=" << ev->Get()->Record.GetDeprecatedNeedFallback()
- << ", status=" << (int) ev->Get()->Record.GetStatusCode()
+ << ", status=" << static_cast<int>(ev->Get()->Record.GetStatusCode())
+ << ", issues size=" << ev->Get()->Record.IssuesSize()
<< ", sender=" << ev->Sender;
AddCounters(ev->Get()->Record);
- bool retriable = ev->Get()->Record.GetDeprecatedRetriable();
- // while we're investigating YQL-15117, we want to be sure that any failure leads to the fallback (except explicitly retriable ones)
- bool fallback = true;
- auto status = ev->Get()->Record.GetStatusCode();
- if (status == NYql::NDqProto::StatusIds::UNSPECIFIED) {
- status = NYql::NDqProto::StatusIds::INTERNAL_ERROR;
- }
if (ev->Get()->Record.IssuesSize()) {
TIssues issues;
IssuesFromMessage(ev->Get()->Record.GetIssues(), issues);
Issues.AddIssues(issues);
+ YQL_CLOG(DEBUG, ProviderDq) << "Issues: " << Issues.ToString();
}
- Finish(status, retriable, fallback);
+ Y_VERIFY(ev->Get()->Record.GetStatusCode() != NYql::NDqProto::StatusIds::SUCCESS);
+ Finish(ev->Get()->Record.GetStatusCode());
}
}
@@ -292,11 +283,11 @@ private:
if (!Finished) {
try {
TFailureInjector::Reach("dq_fail_on_finish", [] { throw yexception() << "dq_fail_on_finish"; });
- Finish(NYql::NDqProto::StatusIds::SUCCESS, false);
+ Finish(NYql::NDqProto::StatusIds::SUCCESS);
} catch (...) {
YQL_CLOG(ERROR, ProviderDq) << " FailureInjector " << CurrentExceptionMessage();
Issues.AddIssue(TIssue("FailureInjection"));
- Finish(NYql::NDqProto::StatusIds::UNAVAILABLE, true);
+ Finish(NYql::NDqProto::StatusIds::UNAVAILABLE);
}
}
}
@@ -305,7 +296,7 @@ private:
void OnQueryResponse(TEvQueryResponse::TPtr& ev, const TActorContext&) {
YQL_LOG_CTX_ROOT_SCOPE(TraceId);
- YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ << " status=" << static_cast<int>(ev->Get()->Record.GetStatusCode()) << " issuses_size=" << ev->Get()->Record.IssuesSize();
Send(PrinterId, ev->Release().Release());
PassAway();
}
@@ -336,7 +327,7 @@ private:
<< ev->Get()->Record.GetError().GetMessage() << ":"
<< static_cast<int>(ev->Get()->Record.GetError().GetErrorCode());
Issues.AddIssue(TIssue(ev->Get()->Record.GetError().GetMessage()).SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR));
- Finish(NYql::NDqProto::StatusIds::CLUSTER_OVERLOADED, /*retriable = */ true, /*fallback = */ true);
+ Finish(NYql::NDqProto::StatusIds::CLUSTER_OVERLOADED);
return;
}
case TAllocateWorkersResponse::kNodes:
diff --git a/ydb/library/yql/providers/dq/actors/result_actor_base.h b/ydb/library/yql/providers/dq/actors/result_actor_base.h
index 192dc080da..849d765ebd 100644
--- a/ydb/library/yql/providers/dq/actors/result_actor_base.h
+++ b/ydb/library/yql/providers/dq/actors/result_actor_base.h
@@ -211,9 +211,10 @@ namespace NYql::NDqs::NExecutionHelpers {
void OnFullResultWriterResponse(NYql::NDqs::TEvDqFailure::TPtr& ev, const NActors::TActorContext&) {
YQL_LOG_CTX_ROOT_SCOPE(TraceId);
YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
- if (ev->Get()->Record.IssuesSize() == 0) {
+ if (ev->Get()->Record.IssuesSize() == 0) { // weird way used by writer to acknowledge its death
DoFinish();
} else {
+ Y_VERIFY(ev->Get()->Record.GetStatusCode() != NYql::NDqProto::StatusIds::SUCCESS);
TBase::Send(ExecuterID, ev->Release().Release());
}
}
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
index 41756240c6..2ffdbb76fc 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
@@ -184,10 +184,9 @@ private:
}
// hacky conversion to TEvDqFailure
auto convertedError = MakeHolder<TEvDqFailure>();
- convertedError->Record.SetDeprecatedRetriable(NCommon::IsRetriable(ev));
- convertedError->Record.SetDeprecatedNeedFallback(NCommon::NeedFallback(ev));
convertedError->Record.SetStatusCode(ev->Get()->Record.GetStatusCode());
convertedError->Record.MutableIssues()->Swap(ev->Get()->Record.MutableIssues());
+ Y_VERIFY(convertedError->Record.GetStatusCode() != NYql::NDqProto::StatusIds::SUCCESS);
SendFailure(std::move(convertedError)); // enreached with stats inside
}
@@ -248,6 +247,7 @@ private:
TaskRunnerPrepared = true;
try {
+ TFailureInjector::Reach("dq_fail_on_task_runner_created", [] { throw yexception() << "dq_fail_on_task_runner_created"; });
Stat.AddCounters2(ev->Get()->Sensors);
const auto& secureParams = ev->Get()->SecureParams;
@@ -347,6 +347,7 @@ private:
void OnChannelPopFinished(TEvChannelPopFinished::TPtr& ev, const NActors::TActorContext& ctx) {
try {
+ TFailureInjector::Reach("dq_fail_on_channel_pop_finished", [] { throw yexception() << "dq_fail_on_channel_pop_finished"; });
auto outputActorId = OutChannelId2ActorId[ev->Get()->ChannelId];
auto& outChannel = OutputMap[outputActorId];
TPullResponse response;
diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto
index 874b528c51..e2a8199c9b 100644
--- a/ydb/library/yql/providers/dq/api/protos/dqs.proto
+++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto
@@ -170,11 +170,10 @@ message TPullResponse {
}
message TQueryResponse {
+ reserved 4, 6;
Ydb.ResultSet ResultSet = 1;
repeated Ydb.Issue.IssueMessage Issues = 2;
bytes Yson = 3;
- bool DeprecatedRetriable = 4;
- bool DeprecatedNeedFallback = 6;
repeated TMetric Metric = 5;
bool Truncated = 7;
uint64 RowsCount = 8;
@@ -182,10 +181,9 @@ message TQueryResponse {
}
message TDqFailure {
+ reserved 6, 7;
repeated Ydb.Issue.IssueMessage Issues = 4;
repeated TMetric Metric = 5;
- bool DeprecatedRetriable = 6;
- bool DeprecatedNeedFallback = 7;
NYql.NDqProto.StatusIds.StatusCode StatusCode = 8;
};
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_common.cpp b/ydb/library/yql/providers/dq/common/yql_dq_common.cpp
index a050354be0..78e226e2ee 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_common.cpp
+++ b/ydb/library/yql/providers/dq/common/yql_dq_common.cpp
@@ -101,7 +101,10 @@ bool IsRetriable(NYql::NDqProto::StatusIds::StatusCode statusCode) {
case NYql::NDqProto::StatusIds::BAD_REQUEST:
case NYql::NDqProto::StatusIds::LIMIT_EXCEEDED:
case NYql::NDqProto::StatusIds::UNSUPPORTED:
+ case NYql::NDqProto::StatusIds::ABORTED:
+ case NYql::NDqProto::StatusIds::CANCELLED:
return false;
+ case NYql::NDqProto::StatusIds::UNAVAILABLE:
default:
return true;
}
@@ -121,6 +124,7 @@ bool NeedFallback(NYql::NDqProto::StatusIds::StatusCode statusCode) {
case NYql::NDqProto::StatusIds::PRECONDITION_FAILED:
return false;
case NYql::NDqProto::StatusIds::LIMIT_EXCEEDED:
+ case NYql::NDqProto::StatusIds::UNAVAILABLE:
default:
return true;
}
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
index 36e2bd9526..96532f7ba8 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
@@ -7,6 +7,7 @@
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
#include <ydb/library/yql/providers/dq/counters/counters.h>
#include <ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h>
+#include <ydb/library/yql/providers/dq/common/yql_dq_common.h>
#include <ydb/library/yql/minikql/mkql_string_util.h>
@@ -84,16 +85,8 @@ public:
})
private:
- struct TRunResult {
- bool Retriable;
- bool Fallback;
- TString FilteredStderr;
-
- TRunResult() : Retriable(false), Fallback(false), FilteredStderr() {
- }
- };
-
- static TRunResult ParseStderr(const TString& input, TIntrusivePtr<TDqConfiguration> settings) {
+ static std::pair<NYql::NDqProto::StatusIds::StatusCode, TString> ParseStderr(const TString& input, TIntrusivePtr<TDqConfiguration> settings) {
+ TString filteredStderr;
THashSet<TString> fallbackOn;
if (settings->_FallbackOnRuntimeErrors.Get()) {
TString parts = settings->_FallbackOnRuntimeErrors.Get().GetOrElse("");
@@ -102,66 +95,73 @@ private:
}
}
- TRunResult result;
+ bool fallback = false;
+ bool retry = false;
for (TStringBuf line: StringSplitter(input).SplitByString("\n").SkipEmpty()) {
if (line.Contains("mlockall failed")) {
// skip
} else {
- if (!result.Fallback) {
+ if (!fallback) {
if (line.Contains("FindColumnInfo(): requirement memberType->GetKind() == TType::EKind::Data")) {
// YQL-14757: temporary workaround for part6/produce-reduce_lambda_list_table-default.txt
- result.Fallback = true;
+ fallback = true;
} else if (line.Contains("embedded:Len")) {
// YQL-14763
- result.Fallback = true;
+ fallback = true;
} else if (line.Contains("Container killed by OOM")) {
// temporary workaround for YQL-12066
- result.Fallback = true;
+ fallback = true;
} else if (line.Contains("Expected data or optional of data, actual:")) {
// temporary workaround for YQL-12835
- result.Fallback = true;
+ fallback = true;
} else if (line.Contains("Pattern nodes can not get computation node by index:")) {
// temporary workaround for YQL-12987
- result.Fallback = true;
+ fallback = true;
} else if (line.Contains("contrib/libs/protobuf/src/google/protobuf/messagext.cc") && line.Contains("Message size") && line.Contains("exceeds")) {
// temporary workaround for YQL-12988
- result.Fallback = true;
+ fallback = true;
} else if (line.Contains("Cannot start container")) {
// temporary workaround for YQL-14221
- result.Retriable = true;
- result.Fallback = true;
+ retry = true;
+ fallback = true;
} else if (line.Contains("Cannot execl")) {
// YQL-14099
- result.Retriable = true;
- result.Fallback = true;
+ retry = true;
+ fallback = true;
} else {
for (const auto& part : fallbackOn) {
if (line.Contains(part)) {
- result.Fallback = true;
+ fallback = true;
}
}
}
}
- result.FilteredStderr += line;
- result.FilteredStderr += "\n";
+ filteredStderr += line;
+ filteredStderr += "\n";
}
}
- return result;
+ auto status = NYql::NDqProto::StatusIds::BAD_REQUEST; // no retries, no fallback, error though
+ if (retry) {
+ status = NYql::NDqProto::StatusIds::UNAVAILABLE; // retries, fallback on retries limit
+ } else if (fallback) {
+ status = NYql::NDqProto::StatusIds::UNSUPPORTED; // no retries, fallback immediately
+ }
+ return {status, filteredStderr};
}
- static THolder<TEvDq::TEvAbortExecution> StatusToError(
+ static THolder<TEvDq::TEvAbortExecution> MakeError(
const TEvError::TStatus& status,
TIntrusivePtr<TDqConfiguration> settings,
ui64 stageId,
TString message = CurrentExceptionMessage()) {
// stderr always affects retriable/fallback flags
- auto runResult = ParseStderr(status.Stderr, settings);
+ auto [queryStatus, filteredStderr] = ParseStderr(status.Stderr, settings);
auto stderrStr = TStringBuilder{}
<< "ExitCode: " << status.ExitCode << "\n"
<< "StageId: " << stageId << "\n"
- << runResult.FilteredStderr;
- auto issueCode = runResult.Fallback
+ << filteredStderr;
+ auto issueCode = NCommon::NeedFallback(queryStatus)
? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR
: TIssuesIds::DQ_GATEWAY_ERROR;
TIssue issue;
@@ -177,7 +177,8 @@ private:
issue.AddSubIssue(MakeIntrusive<TIssue>(YqlIssue(parsedPos.GetOrElse(TPosition()), TIssuesIds::DQ_GATEWAY_ERROR, TString{terminationMessage})));
}
}
- return MakeHolder<NDq::TEvDq::TEvAbortExecution>(runResult.Retriable ? NYql::NDqProto::StatusIds::UNAVAILABLE : (runResult.Fallback ? NYql::NDqProto::StatusIds::UNSUPPORTED : NYql::NDqProto::StatusIds::BAD_REQUEST), TVector<TIssue>{issue});
+ Y_VERIFY(queryStatus != NYql::NDqProto::StatusIds::SUCCESS);
+ return MakeHolder<NDq::TEvDq::TEvAbortExecution>(queryStatus, TVector<TIssue>{issue});
}
void PassAway() override {
@@ -233,7 +234,7 @@ private:
new IEventHandle(
replyTo,
selfId,
- StatusToError({status.ExitCode, status.Stderr}, settings, stageId).Release(),
+ MakeError({status.ExitCode, status.Stderr}, settings, stageId).Release(),
/*flags=*/0,
cookie));
}
@@ -278,7 +279,7 @@ private:
new IEventHandle(
parentId,
selfId,
- StatusToError({status.ExitCode, status.Stderr}, settings, stageId).Release(),
+ MakeError({status.ExitCode, status.Stderr}, settings, stageId).Release(),
/*flags=*/0,
cookie));
}
@@ -356,7 +357,7 @@ private:
new IEventHandle(
replyTo,
selfId,
- StatusToError({status.ExitCode, status.Stderr}, settings, stageId).Release(),
+ MakeError({status.ExitCode, status.Stderr}, settings, stageId).Release(),
/*flags=*/0,
cookie));
}
@@ -424,7 +425,7 @@ private:
new IEventHandle(
replyTo,
selfId,
- StatusToError({status.ExitCode, status.Stderr}, settings, stageId).Release(),
+ MakeError({status.ExitCode, status.Stderr}, settings, stageId).Release(),
/*flags=*/0,
cookie));
}
@@ -496,7 +497,7 @@ private:
} catch (...) {
auto status = taskRunner->GetStatus();
actorSystem->Send(
- new IEventHandle(replyTo, selfId, StatusToError({status.ExitCode, status.Stderr}, settings, stageId).Release(), 0, cookie));
+ new IEventHandle(replyTo, selfId, MakeError({status.ExitCode, status.Stderr}, settings, stageId).Release(), 0, cookie));
}
});
}
@@ -556,7 +557,7 @@ private:
new IEventHandle(
replyTo,
selfId,
- StatusToError({status.ExitCode, status.Stderr}, settings, stageId).Release(),
+ MakeError({status.ExitCode, status.Stderr}, settings, stageId).Release(),
/*flags=*/0,
cookie));
}