aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Khoroshilov <hor911@gmail.com>2022-05-06 08:14:21 +0300
committerAleksandr Khoroshilov <hor911@gmail.com>2022-05-06 08:14:21 +0300
commit264071acbaeeec9bef990f960aa7a434098305dd (patch)
tree290a6aa5333e456061815351ca0a7e2a13f1d9d5
parent1d4a0d58bc4c73b689ec12e075e65ca9bd70a5db (diff)
downloadydb-264071acbaeeec9bef990f960aa7a434098305dd.tar.gz
Do not use Retriable and NeedFallback in code
ref:ff846ab5f5df0864ba682210f06609d183ba7047
-rw-r--r--ydb/core/yq/libs/actors/result_writer.cpp7
-rw-r--r--ydb/library/yql/providers/dq/actors/events.cpp17
-rw-r--r--ydb/library/yql/providers/dq/actors/events.h6
-rw-r--r--ydb/library/yql/providers/dq/actors/full_result_writer.cpp6
-rw-r--r--ydb/library/yql/providers/dq/actors/result_actor_base.h13
-rw-r--r--ydb/library/yql/providers/dq/actors/result_aggregator.cpp6
-rw-r--r--ydb/library/yql/providers/dq/actors/task_controller.cpp27
-rw-r--r--ydb/library/yql/providers/dq/actors/worker_actor.cpp20
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_common.cpp9
-rw-r--r--ydb/library/yql/providers/dq/service/grpc_service.cpp42
-rw-r--r--ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp2
11 files changed, 76 insertions, 79 deletions
diff --git a/ydb/core/yq/libs/actors/result_writer.cpp b/ydb/core/yq/libs/actors/result_writer.cpp
index 62f47f4c86b..b3fee94c452 100644
--- a/ydb/core/yq/libs/actors/result_writer.cpp
+++ b/ydb/core/yq/libs/actors/result_writer.cpp
@@ -83,7 +83,7 @@ private:
}
void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr&, const NActors::TActorContext& ) {
- auto req = MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNAVAILABLE, TIssue("Undelivered").SetCode(NYql::DEFAULT_ERROR, TSeverityIds::S_ERROR), true, /*needFallback=*/false);
+ auto req = MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNAVAILABLE, TIssue("Undelivered").SetCode(NYql::DEFAULT_ERROR, TSeverityIds::S_ERROR));
Send(ExecuterId, req.Release());
HasError = true;
}
@@ -128,7 +128,7 @@ private:
auto it = Requests.find(ev->Get()->Result.request_id());
if (it == Requests.end()) {
HasError = true;
- auto req = MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TIssue("Unknown RequestId").SetCode(NYql::DEFAULT_ERROR, TSeverityIds::S_ERROR), /*retriable=*/ false, /*needFallback=*/false);
+ auto req = MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::BAD_REQUEST, TIssue("Unknown RequestId").SetCode(NYql::DEFAULT_ERROR, TSeverityIds::S_ERROR));
Send(ExecuterId, req.Release());
return;
}
@@ -283,8 +283,7 @@ private:
}
} catch (...) {
LOG_E(CurrentExceptionMessage());
- auto req = MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TIssue("Internal error on data write").SetCode(NYql::DEFAULT_ERROR, TSeverityIds::S_ERROR), /*retriable=*/ false,
- /*needFallback=*/false);
+ auto req = MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TIssue("Internal error on data write").SetCode(NYql::DEFAULT_ERROR, TSeverityIds::S_ERROR));
Send(ExecuterId, req.Release());
HasError = true;
}
diff --git a/ydb/library/yql/providers/dq/actors/events.cpp b/ydb/library/yql/providers/dq/actors/events.cpp
index 7a9e3f2aedd..4582deaee8c 100644
--- a/ydb/library/yql/providers/dq/actors/events.cpp
+++ b/ydb/library/yql/providers/dq/actors/events.cpp
@@ -12,25 +12,22 @@ namespace NYql::NDqs {
Record.SetStatusCode(statusCode);
}
- TEvDqFailure::TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues, bool retriable, bool needFallback) {
+ TEvDqFailure::TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues) {
IssuesToMessage(issues, Record.MutableIssues());
- Record.SetDeprecatedRetriable(retriable);
- Record.SetDeprecatedNeedFallback(needFallback);
Record.SetStatusCode(statusCode);
}
- TEvDqFailure::TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssue& issue, bool retriable, bool needFallback)
- : TEvDqFailure(statusCode, TIssues({issue}), retriable, needFallback)
+ TEvDqFailure::TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssue& issue)
+ : TEvDqFailure(statusCode, TIssues({issue}))
{
}
- TEvDqFailure::TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& error, bool retriable, bool needFallback)
- : TEvDqFailure(
- statusCode,
+/*
TIssue(error).SetCode(
needFallback ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR : TIssuesIds::DQ_GATEWAY_ERROR, TSeverityIds::S_ERROR),
- retriable,
- needFallback) {
+*/
+ TEvDqFailure::TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString&)
+ : TEvDqFailure(statusCode) {
}
TEvQueryResponse::TEvQueryResponse(NDqProto::TQueryResponse&& queryResult) {
diff --git a/ydb/library/yql/providers/dq/actors/events.h b/ydb/library/yql/providers/dq/actors/events.h
index 19f842dbe9b..220eb453ec2 100644
--- a/ydb/library/yql/providers/dq/actors/events.h
+++ b/ydb/library/yql/providers/dq/actors/events.h
@@ -22,9 +22,9 @@ namespace NYql::NDqs {
struct TEvDqFailure : NActors::TEventPB<TEvDqFailure, NDqProto::TDqFailure, TDqExecuterEvents::ES_DQ_FAILURE> {
TEvDqFailure() = default;
explicit TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode);
- TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues, bool retriable = false, bool needFallback = false);
- TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssue& issue, bool retriable = false, bool needFallback = false);
- TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& error, bool retriable, bool needFallback);
+ TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues);
+ TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssue& issue);
+ TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& error);
};
struct TEvQueryResponse
diff --git a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp
index 9bfd095bdd6..f318eefa4be 100644
--- a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp
+++ b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp
@@ -87,7 +87,7 @@ private:
TFailureInjector::Reach("full_result_fail_on_finish", [] { throw yexception() << "full_result_fail_on_finish"; });
FullResultWriter->Finish();
if (ErrorMessage) {
- Send(AggregatorID, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, *ErrorMessage, false, true));
+ Send(AggregatorID, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNSUPPORTED, TIssue(*ErrorMessage).SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR)));
} else {
Send(AggregatorID, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::SUCCESS).Release());
}
@@ -97,7 +97,7 @@ private:
if (ErrorMessage) {
issue.AddSubIssue(MakeIntrusive<TIssue>(*ErrorMessage));
}
- Send(AggregatorID, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, issue, false, true).Release());
+ Send(AggregatorID, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNSUPPORTED, issue).Release());
}
Send(SelfId(), MakeHolder<NActors::TEvents::TEvPoison>());
}
@@ -126,7 +126,7 @@ private:
Send(AggregatorID, MakeHolder<TEvFullResultWriterAck>(ackRecord));
} catch (...) {
ErrorMessage = CurrentExceptionMessage();
- Send(AggregatorID, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, *ErrorMessage, false, true));
+ Send(AggregatorID, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNSUPPORTED, TIssue(*ErrorMessage).SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR)));
}
if (ErrorMessage) {
diff --git a/ydb/library/yql/providers/dq/actors/result_actor_base.h b/ydb/library/yql/providers/dq/actors/result_actor_base.h
index 361954d5aca..fe2bc1831cf 100644
--- a/ydb/library/yql/providers/dq/actors/result_actor_base.h
+++ b/ydb/library/yql/providers/dq/actors/result_actor_base.h
@@ -4,6 +4,7 @@
#include <ydb/library/yql/providers/dq/actors/events.h>
#include <ydb/library/yql/providers/dq/actors/proto_builder.h>
#include <ydb/library/yql/providers/dq/api/protos/dqs.pb.h>
+#include <ydb/library/yql/providers/dq/common/yql_dq_common.h>
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
#include <ydb/library/yql/providers/dq/counters/counters.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
@@ -95,7 +96,7 @@ namespace NYql::NDqs::NExecutionHelpers {
return true;
});
} catch (...) {
- OnError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, CurrentExceptionMessage(), false, true);
+ OnError(NYql::NDqProto::StatusIds::UNSUPPORTED, CurrentExceptionMessage());
return;
}
@@ -124,14 +125,14 @@ namespace NYql::NDqs::NExecutionHelpers {
}
}
- void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message, bool retriable, bool needFallback) {
+ void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message) {
YQL_LOG(ERROR) << "OnError " << message;
- auto issueCode = needFallback
+ auto issueCode = NCommon::NeedFallback(statusCode)
? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR
: TIssuesIds::DQ_GATEWAY_ERROR;
const auto issue = TIssue(message).SetCode(issueCode, TSeverityIds::S_ERROR);
Issues.AddIssues({issue}); // remember issue to pass it with TEvQueryResponse, cause executor_actor ignores TEvDqFailure after finish
- auto req = MakeHolder<TEvDqFailure>(statusCode, issue, retriable, needFallback);
+ auto req = MakeHolder<TEvDqFailure>(statusCode, issue);
FlushCounters(req->Record);
TBase::Send(ExecuterID, req.Release());
}
@@ -222,7 +223,7 @@ namespace NYql::NDqs::NExecutionHelpers {
TString message = "Undelivered from " + ToString(ev->Sender) + " to " + ToString(TBase::SelfId())
+ " reason: " + ToString(ev->Get()->Reason) + " sourceType: " + ToString(ev->Get()->SourceType >> 16)
+ "." + ToString(ev->Get()->SourceType & 0xFFFF);
- OnError(NYql::NDqProto::StatusIds::UNAVAILABLE, message, true, true);
+ OnError(NYql::NDqProto::StatusIds::UNAVAILABLE, message);
}
void OnFullResultWriterAck(TEvFullResultWriterAck::TPtr& ev, const NActors::TActorContext&) {
@@ -283,7 +284,7 @@ namespace NYql::NDqs::NExecutionHelpers {
YQL_LOG_CTX_SCOPE(TraceId);
if (auto msg = ev->Get()->Record.GetErrorMessage()) {
- OnError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, msg, false, true);
+ OnError(NYql::NDqProto::StatusIds::UNSUPPORTED, msg);
} else {
NActorsProto::TActorId fullResultWriterProto;
ev->Get()->Record.GetMessage().UnpackTo(&fullResultWriterProto);
diff --git a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp
index 7f6c383238e..cca0b79d9a8 100644
--- a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp
+++ b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp
@@ -105,11 +105,11 @@ private:
YQL_LOG(DEBUG) << __FUNCTION__;
auto now = TInstant::Now();
if (PullRequestTimeout && now - PullRequestStartTime > PullRequestTimeout) {
- OnError(NYql::NDqProto::StatusIds::TIMEOUT, "Timeout " + ToString(SourceID.NodeId()), true, true);
+ OnError(NYql::NDqProto::StatusIds::TIMEOUT, "Timeout " + ToString(SourceID.NodeId()));
}
if (PingTimeout && now - PingStartTime > PingTimeout) {
- OnError(NYql::NDqProto::StatusIds::TIMEOUT, "PingTimeout " + ToString(SourceID.NodeId()), true, true);
+ OnError(NYql::NDqProto::StatusIds::TIMEOUT, "PingTimeout " + ToString(SourceID.NodeId()));
}
if (!PingRequested) {
@@ -171,7 +171,7 @@ private:
Schedule(TDuration::MilliSeconds(10), new TEvPullResult());
return;
case NYql::NDqProto::ERROR: {
- OnError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, ev->Get()->Record.GetErrorMessage(), false, true);
+ OnError(NYql::NDqProto::StatusIds::UNSUPPORTED, ev->Get()->Record.GetErrorMessage());
break;
}
case NYql::NDqProto::UNKNOWN:
diff --git a/ydb/library/yql/providers/dq/actors/task_controller.cpp b/ydb/library/yql/providers/dq/actors/task_controller.cpp
index a7b30fa5286..379a5c0ce91 100644
--- a/ydb/library/yql/providers/dq/actors/task_controller.cpp
+++ b/ydb/library/yql/providers/dq/actors/task_controller.cpp
@@ -99,7 +99,7 @@ private:
TString message = "Undelivered Event " + ToString(ev->Get()->SourceType)
+ " from " + ToString(SelfId()) + " (Self) to " + ToString(ev->Sender) +
+ " Reason: " + ToString(ev->Get()->Reason) + " Cookie: " + ToString(ev->Cookie);
- OnError(NYql::NDqProto::StatusIds::UNAVAILABLE, message, true, true);
+ OnError(NYql::NDqProto::StatusIds::UNAVAILABLE, message);
}
void OnAbortExecution(NDq::TEvDq::TEvAbortExecution::TPtr& ev) {
@@ -107,7 +107,7 @@ private:
auto statusCode = ev->Get()->Record.GetStatusCode();
TIssues issues = ev->Get()->GetIssues();
YQL_LOG(DEBUG) << "AbortExecution from " << ev->Sender << ":" << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode) << " " << issues.ToOneLineString();
- OnError(statusCode, issues, NCommon::IsRetriable(ev), NCommon::NeedFallback(ev));
+ OnError(statusCode, issues);
}
void OnComputeActorState(NDq::TEvDqCompute::TEvState::TPtr& ev) {
@@ -134,20 +134,13 @@ private:
case NDqProto::COMPUTE_STATE_UNKNOWN: {
// TODO: use issues
TString message = "unexpected state from " + ToString(computeActor) + ", task: " + ToString(taskId);
- OnError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, message, false, false);
+ OnError(NYql::NDqProto::StatusIds::BAD_REQUEST, message);
break;
}
case NDqProto::COMPUTE_STATE_FAILURE: {
// TODO: don't convert issues to string
NYql::IssuesFromMessage(state.GetIssues(), Issues);
- bool retriable = true;
- for (const auto& issue : Issues) {
- if (issue.IssueCode == TIssuesIds::UNEXPECTED) {
- retriable = false;
- break;
- }
- }
- OnError(state.GetStatusCode(), Issues, retriable, false);
+ OnError(state.GetStatusCode(), Issues);
break;
}
case NDqProto::COMPUTE_STATE_EXECUTING: {
@@ -490,25 +483,25 @@ private:
}
}
- void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues, bool retriable, bool needFallback) {
+ void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues) {
YQL_LOG_CTX_SCOPE(TraceId);
YQL_LOG(DEBUG) << "OnError " << issues.ToOneLineString() << " " << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode);
if (Finished) {
YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(WARN) << "OnError IGNORED when Finished, Retriable=" << retriable << ", NeedFallback=" << needFallback;
+ YQL_LOG(WARN) << "OnError IGNORED when Finished";
} else {
- auto req = MakeHolder<TEvDqFailure>(statusCode, issues, retriable, needFallback);
+ auto req = MakeHolder<TEvDqFailure>(statusCode, issues);
FinalStat().FlushCounters(req->Record);
Send(ExecuterId, req.Release());
Finished = true;
}
}
- void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message, bool retriable, bool needFallback) {
- auto issueCode = needFallback
+ void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message) {
+ auto issueCode = NCommon::NeedFallback(statusCode)
? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR
: TIssuesIds::DQ_GATEWAY_ERROR;
- OnError(statusCode, TIssues({TIssue(message).SetCode(issueCode, TSeverityIds::S_ERROR)}), retriable, needFallback);
+ OnError(statusCode, TIssues({TIssue(message).SetCode(issueCode, TSeverityIds::S_ERROR)}));
}
void Finish() {
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
index 1fafa3442a2..e0fd163afcd 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
@@ -334,7 +334,7 @@ private:
Schedule(PingPeriod, new TEvents::TEvWakeup);
}
} catch (...) {
- SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, CurrentExceptionMessage(), false, false));
+ SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, CurrentExceptionMessage()));
}
}
@@ -387,7 +387,7 @@ private:
Run(ctx);
} catch (...) {
- SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, CurrentExceptionMessage(), false, false));
+ SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, CurrentExceptionMessage()));
}
}
@@ -411,7 +411,7 @@ private:
return;
}
if (responseType == ERROR) {
- Send(SelfId(), MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNSPECIFIED, ev->Get()->Record.GetErrorMessage(), false, false));
+ Send(SelfId(), MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNSPECIFIED, ev->Get()->Record.GetErrorMessage()));
return;
}
Y_VERIFY (responseType == FINISH || responseType == CONTINUE);
@@ -447,7 +447,7 @@ private:
channel.PingStartTime = now;
} else if ((now - channel.PingStartTime) > PingTimeout) {
Stat.AddCounter("PingTimeout", static_cast<ui64>(1));
- SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::TIMEOUT, "PingTimeout " + TimeoutInfo(channel.ActorID, now, channel.PingStartTime), true, true));
+ SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::TIMEOUT, "PingTimeout " + TimeoutInfo(channel.ActorID, now, channel.PingStartTime)));
}
}
@@ -490,7 +490,7 @@ private:
? 0
: maybeChannel->second.Retries
) + " " + JobDebugInfo(ev->Sender);
- SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNAVAILABLE, message, /*retriable = */ true, /*fallback =*/ true));
+ SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNAVAILABLE, message));
} else if (ev->Get()->SourceType == TEvPullDataRequest::EventType) {
TActivationContext::Schedule(TDuration::MilliSeconds(100),
new IEventHandle(maybeChannel->second.ActorID, SelfId(), new TEvPullDataRequest(INPUT_SIZE), IEventHandle::FlagTrackDelivery)
@@ -560,7 +560,7 @@ private:
} else if (channel.Requested && !channel.Finished) {
if (PullRequestTimeout && (now - channel.RequestTime) > PullRequestTimeout) {
Stat.AddCounter("ReadTimeout", static_cast<ui64>(1));
- SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::TIMEOUT, "PullTimeout " + TimeoutInfo(channel.ActorID, now, channel.RequestTime), false, true));
+ SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::TIMEOUT, "PullTimeout " + TimeoutInfo(channel.ActorID, now, channel.RequestTime)));
}
}
}
@@ -677,12 +677,12 @@ private:
source.HasData = true;
Send(SelfId(), new TEvContinueRun());
} catch (...) {
- SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNSPECIFIED, CurrentExceptionMessage(), false, false));
+ SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNSPECIFIED, CurrentExceptionMessage()));
}
}
void OnSourceError(const IDqSourceActor::TEvSourceError::TPtr& ev) {
Y_UNUSED(ev->Get()->InputIndex);
- SendFailure(MakeHolder<TEvDqFailure>(ev->Get()->IsFatal ? NYql::NDqProto::StatusIds::UNSPECIFIED : NYql::NDqProto::StatusIds::INTERNAL_ERROR, ev->Get()->Issues.ToString(), !ev->Get()->IsFatal, !ev->Get()->IsFatal));
+ SendFailure(MakeHolder<TEvDqFailure>(ev->Get()->IsFatal ? NYql::NDqProto::StatusIds::UNSPECIFIED : NYql::NDqProto::StatusIds::INTERNAL_ERROR, ev->Get()->Issues.ToString()));
}
void OnSourcePushFinished(TEvSourcePushFinished::TPtr& ev, const TActorContext& ctx) {
auto index = ev->Get()->Index;
@@ -698,14 +698,14 @@ private:
void OnSinkError(ui64 outputIndex, const TIssues& issues, bool isFatal) override {
Y_UNUSED(outputIndex);
- SendFailure(MakeHolder<TEvDqFailure>(isFatal ? NYql::NDqProto::StatusIds::UNSPECIFIED : NYql::NDqProto::StatusIds::INTERNAL_ERROR, issues.ToString(), !isFatal, !isFatal));
+ SendFailure(MakeHolder<TEvDqFailure>(isFatal ? NYql::NDqProto::StatusIds::UNSPECIFIED : NYql::NDqProto::StatusIds::INTERNAL_ERROR, issues.ToString()));
}
void OnSinkStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) override {
Y_UNUSED(state);
Y_UNUSED(outputIndex);
Y_UNUSED(checkpoint);
- SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::BAD_REQUEST, "Unimplemented", false, false));
+ SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::BAD_REQUEST, "Unimplemented"));
}
void SinkSend(
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_common.cpp b/ydb/library/yql/providers/dq/common/yql_dq_common.cpp
index fd1a21e54ec..a050354be0c 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_common.cpp
+++ b/ydb/library/yql/providers/dq/common/yql_dq_common.cpp
@@ -100,6 +100,7 @@ bool IsRetriable(NYql::NDqProto::StatusIds::StatusCode statusCode) {
case NYql::NDqProto::StatusIds::SUCCESS:
case NYql::NDqProto::StatusIds::BAD_REQUEST:
case NYql::NDqProto::StatusIds::LIMIT_EXCEEDED:
+ case NYql::NDqProto::StatusIds::UNSUPPORTED:
return false;
default:
return true;
@@ -126,13 +127,7 @@ bool NeedFallback(NYql::NDqProto::StatusIds::StatusCode statusCode) {
}
bool NeedFallback(const NDq::TEvDq::TEvAbortExecution::TPtr& ev) {
- const auto& issues = ev->Get()->GetIssues();
- for (auto it = issues.begin(); it < issues.end(); it++) {
- if (it->GetCode() == TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR) {
- return true;
- }
- }
- return false;
+ return NeedFallback(ev->Get()->Record.GetStatusCode());
}
} // namespace NCommon
diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp
index 8ecd1cb0f93..b771c1df7f9 100644
--- a/ydb/library/yql/providers/dq/service/grpc_service.cpp
+++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp
@@ -131,10 +131,15 @@ namespace NYql::NDqs {
queryResult.Mutableresult()->CopyFrom(result.resultset());
queryResult.set_yson(result.yson());
- auto needFallback = result.GetDeprecatedNeedFallback();
-
- if (needFallback != NCommon::NeedFallback(ev->Get()->Record.GetStatusCode())) {
- Counters->GetSubgroup("MistmatchedNeedFallback", needFallback ? "False" : "True")->GetCounter(NYql::NDqProto::StatusIds_StatusCode_Name(ev->Get()->Record.GetStatusCode()))->Inc();
+ bool needFallback;
+ auto statusCode = result.GetStatusCode();
+ if (statusCode == NYql::NDqProto::StatusIds::UNSPECIFIED) {
+ needFallback = NCommon::NeedFallback(statusCode);
+ if (needFallback != result.GetDeprecatedNeedFallback()) {
+ Counters->GetSubgroup("MistmatchedNeedFallback", needFallback ? "True" : "False")->GetCounter(NYql::NDqProto::StatusIds_StatusCode_Name(statusCode))->Inc();
+ }
+ } else {
+ needFallback = result.GetDeprecatedNeedFallback();
}
if (needFallback) {
@@ -185,27 +190,34 @@ namespace NYql::NDqs {
}
void OnReturnResult(TEvQueryResponse::TPtr& ev, const NActors::TActorContext& ctx) {
+ auto& result = ev->Get()->Record;
Y_UNUSED(ctx);
YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << "TServiceProxyActor::OnReturnResult " << ev->Get()->Record.GetMetric().size();
- QueryStat.AddCounters(ev->Get()->Record);
- auto retriable = ev->Get()->Record.GetDeprecatedRetriable();
-
- if (retriable != NCommon::IsRetriable(ev->Get()->Record.GetStatusCode())) {
- Counters->GetSubgroup("MistmatchedRetriable", retriable ? "False" : "True")->GetCounter(NYql::NDqProto::StatusIds_StatusCode_Name(ev->Get()->Record.GetStatusCode()))->Inc();
+ YQL_LOG(DEBUG) << "TServiceProxyActor::OnReturnResult " << result.GetMetric().size();
+ QueryStat.AddCounters(result);
+
+ bool retriable;
+ auto statusCode = result.GetStatusCode();
+ if (statusCode == NYql::NDqProto::StatusIds::UNSPECIFIED) {
+ retriable = NCommon::IsRetriable(statusCode);
+ if (retriable != result.GetDeprecatedRetriable()) {
+ Counters->GetSubgroup("MistmatchedRetriable", retriable ? "True" : "False")->GetCounter(NYql::NDqProto::StatusIds_StatusCode_Name(statusCode))->Inc();
+ }
+ } else {
+ retriable = result.GetDeprecatedRetriable();
}
- if (ev->Get()->Record.GetIssues().size() > 0 && retriable && Retry < MaxRetries) {
+ if (result.GetIssues().size() > 0 && retriable && Retry < MaxRetries) {
QueryStat.AddCounter(RetryName, TDuration::MilliSeconds(0));
NYql::TIssues issues;
- NYql::IssuesFromMessage(ev->Get()->Record.GetIssues(), issues);
+ NYql::IssuesFromMessage(result.GetIssues(), issues);
YQL_LOG(WARN) << RetryName << " " << Retry << " Issues: " << issues.ToString();
DoRetry();
} else {
- auto needFallback = ev->Get()->Record.GetDeprecatedNeedFallback();
- if (ev->Get()->Record.GetIssues().size() > 0) {
+ auto needFallback = NCommon::NeedFallback(statusCode);
+ if (result.GetIssues().size() > 0) {
NYql::TIssues issues;
- NYql::IssuesFromMessage(ev->Get()->Record.GetIssues(), issues);
+ NYql::IssuesFromMessage(result.GetIssues(), issues);
YQL_LOG(WARN) << "Issues: " << issues.ToString();
*ErrorCounter += 1;
}
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
index 9e15d504b59..a2321bf2529 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
@@ -445,7 +445,7 @@ private:
TaskRunner = Factory->GetOld(ev->Get()->Task, TraceId);
} catch (...) {
TString message = "Could not create TaskRunner for " + ToString(taskId) + " on node " + ToString(replyTo.NodeId()) + ", error: " + CurrentExceptionMessage();
- Send(replyTo, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, message, /*retriable = */ true, /*fallback=*/ true), 0, cookie);
+ Send(replyTo, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, message), 0, cookie);
return;
}