diff options
author | Aleksandr Khoroshilov <hor911@gmail.com> | 2022-05-06 08:14:21 +0300 |
---|---|---|
committer | Aleksandr Khoroshilov <hor911@gmail.com> | 2022-05-06 08:14:21 +0300 |
commit | 264071acbaeeec9bef990f960aa7a434098305dd (patch) | |
tree | 290a6aa5333e456061815351ca0a7e2a13f1d9d5 | |
parent | 1d4a0d58bc4c73b689ec12e075e65ca9bd70a5db (diff) | |
download | ydb-264071acbaeeec9bef990f960aa7a434098305dd.tar.gz |
Do not use Retriable and NeedFallback in code
ref:ff846ab5f5df0864ba682210f06609d183ba7047
11 files changed, 76 insertions, 79 deletions
diff --git a/ydb/core/yq/libs/actors/result_writer.cpp b/ydb/core/yq/libs/actors/result_writer.cpp index 62f47f4c86b..b3fee94c452 100644 --- a/ydb/core/yq/libs/actors/result_writer.cpp +++ b/ydb/core/yq/libs/actors/result_writer.cpp @@ -83,7 +83,7 @@ private: } void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr&, const NActors::TActorContext& ) { - auto req = MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNAVAILABLE, TIssue("Undelivered").SetCode(NYql::DEFAULT_ERROR, TSeverityIds::S_ERROR), true, /*needFallback=*/false); + auto req = MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNAVAILABLE, TIssue("Undelivered").SetCode(NYql::DEFAULT_ERROR, TSeverityIds::S_ERROR)); Send(ExecuterId, req.Release()); HasError = true; } @@ -128,7 +128,7 @@ private: auto it = Requests.find(ev->Get()->Result.request_id()); if (it == Requests.end()) { HasError = true; - auto req = MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TIssue("Unknown RequestId").SetCode(NYql::DEFAULT_ERROR, TSeverityIds::S_ERROR), /*retriable=*/ false, /*needFallback=*/false); + auto req = MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::BAD_REQUEST, TIssue("Unknown RequestId").SetCode(NYql::DEFAULT_ERROR, TSeverityIds::S_ERROR)); Send(ExecuterId, req.Release()); return; } @@ -283,8 +283,7 @@ private: } } catch (...) { LOG_E(CurrentExceptionMessage()); - auto req = MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TIssue("Internal error on data write").SetCode(NYql::DEFAULT_ERROR, TSeverityIds::S_ERROR), /*retriable=*/ false, - /*needFallback=*/false); + auto req = MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TIssue("Internal error on data write").SetCode(NYql::DEFAULT_ERROR, TSeverityIds::S_ERROR)); Send(ExecuterId, req.Release()); HasError = true; } diff --git a/ydb/library/yql/providers/dq/actors/events.cpp b/ydb/library/yql/providers/dq/actors/events.cpp index 7a9e3f2aedd..4582deaee8c 100644 --- a/ydb/library/yql/providers/dq/actors/events.cpp +++ b/ydb/library/yql/providers/dq/actors/events.cpp @@ -12,25 +12,22 @@ namespace NYql::NDqs { Record.SetStatusCode(statusCode); } - TEvDqFailure::TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues, bool retriable, bool needFallback) { + TEvDqFailure::TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues) { IssuesToMessage(issues, Record.MutableIssues()); - Record.SetDeprecatedRetriable(retriable); - Record.SetDeprecatedNeedFallback(needFallback); Record.SetStatusCode(statusCode); } - TEvDqFailure::TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssue& issue, bool retriable, bool needFallback) - : TEvDqFailure(statusCode, TIssues({issue}), retriable, needFallback) + TEvDqFailure::TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssue& issue) + : TEvDqFailure(statusCode, TIssues({issue})) { } - TEvDqFailure::TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& error, bool retriable, bool needFallback) - : TEvDqFailure( - statusCode, +/* TIssue(error).SetCode( needFallback ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR : TIssuesIds::DQ_GATEWAY_ERROR, TSeverityIds::S_ERROR), - retriable, - needFallback) { +*/ + TEvDqFailure::TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString&) + : TEvDqFailure(statusCode) { } TEvQueryResponse::TEvQueryResponse(NDqProto::TQueryResponse&& queryResult) { diff --git a/ydb/library/yql/providers/dq/actors/events.h b/ydb/library/yql/providers/dq/actors/events.h index 19f842dbe9b..220eb453ec2 100644 --- a/ydb/library/yql/providers/dq/actors/events.h +++ b/ydb/library/yql/providers/dq/actors/events.h @@ -22,9 +22,9 @@ namespace NYql::NDqs { struct TEvDqFailure : NActors::TEventPB<TEvDqFailure, NDqProto::TDqFailure, TDqExecuterEvents::ES_DQ_FAILURE> { TEvDqFailure() = default; explicit TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode); - TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues, bool retriable = false, bool needFallback = false); - TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssue& issue, bool retriable = false, bool needFallback = false); - TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& error, bool retriable, bool needFallback); + TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues); + TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssue& issue); + TEvDqFailure(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& error); }; struct TEvQueryResponse diff --git a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp index 9bfd095bdd6..f318eefa4be 100644 --- a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp +++ b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp @@ -87,7 +87,7 @@ private: TFailureInjector::Reach("full_result_fail_on_finish", [] { throw yexception() << "full_result_fail_on_finish"; }); FullResultWriter->Finish(); if (ErrorMessage) { - Send(AggregatorID, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, *ErrorMessage, false, true)); + Send(AggregatorID, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNSUPPORTED, TIssue(*ErrorMessage).SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR))); } else { Send(AggregatorID, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::SUCCESS).Release()); } @@ -97,7 +97,7 @@ private: if (ErrorMessage) { issue.AddSubIssue(MakeIntrusive<TIssue>(*ErrorMessage)); } - Send(AggregatorID, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, issue, false, true).Release()); + Send(AggregatorID, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNSUPPORTED, issue).Release()); } Send(SelfId(), MakeHolder<NActors::TEvents::TEvPoison>()); } @@ -126,7 +126,7 @@ private: Send(AggregatorID, MakeHolder<TEvFullResultWriterAck>(ackRecord)); } catch (...) { ErrorMessage = CurrentExceptionMessage(); - Send(AggregatorID, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, *ErrorMessage, false, true)); + Send(AggregatorID, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNSUPPORTED, TIssue(*ErrorMessage).SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR))); } if (ErrorMessage) { diff --git a/ydb/library/yql/providers/dq/actors/result_actor_base.h b/ydb/library/yql/providers/dq/actors/result_actor_base.h index 361954d5aca..fe2bc1831cf 100644 --- a/ydb/library/yql/providers/dq/actors/result_actor_base.h +++ b/ydb/library/yql/providers/dq/actors/result_actor_base.h @@ -4,6 +4,7 @@ #include <ydb/library/yql/providers/dq/actors/events.h> #include <ydb/library/yql/providers/dq/actors/proto_builder.h> #include <ydb/library/yql/providers/dq/api/protos/dqs.pb.h> +#include <ydb/library/yql/providers/dq/common/yql_dq_common.h> #include <ydb/library/yql/providers/dq/common/yql_dq_settings.h> #include <ydb/library/yql/providers/dq/counters/counters.h> #include <ydb/library/yql/public/issue/yql_issue_message.h> @@ -95,7 +96,7 @@ namespace NYql::NDqs::NExecutionHelpers { return true; }); } catch (...) { - OnError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, CurrentExceptionMessage(), false, true); + OnError(NYql::NDqProto::StatusIds::UNSUPPORTED, CurrentExceptionMessage()); return; } @@ -124,14 +125,14 @@ namespace NYql::NDqs::NExecutionHelpers { } } - void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message, bool retriable, bool needFallback) { + void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message) { YQL_LOG(ERROR) << "OnError " << message; - auto issueCode = needFallback + auto issueCode = NCommon::NeedFallback(statusCode) ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR : TIssuesIds::DQ_GATEWAY_ERROR; const auto issue = TIssue(message).SetCode(issueCode, TSeverityIds::S_ERROR); Issues.AddIssues({issue}); // remember issue to pass it with TEvQueryResponse, cause executor_actor ignores TEvDqFailure after finish - auto req = MakeHolder<TEvDqFailure>(statusCode, issue, retriable, needFallback); + auto req = MakeHolder<TEvDqFailure>(statusCode, issue); FlushCounters(req->Record); TBase::Send(ExecuterID, req.Release()); } @@ -222,7 +223,7 @@ namespace NYql::NDqs::NExecutionHelpers { TString message = "Undelivered from " + ToString(ev->Sender) + " to " + ToString(TBase::SelfId()) + " reason: " + ToString(ev->Get()->Reason) + " sourceType: " + ToString(ev->Get()->SourceType >> 16) + "." + ToString(ev->Get()->SourceType & 0xFFFF); - OnError(NYql::NDqProto::StatusIds::UNAVAILABLE, message, true, true); + OnError(NYql::NDqProto::StatusIds::UNAVAILABLE, message); } void OnFullResultWriterAck(TEvFullResultWriterAck::TPtr& ev, const NActors::TActorContext&) { @@ -283,7 +284,7 @@ namespace NYql::NDqs::NExecutionHelpers { YQL_LOG_CTX_SCOPE(TraceId); if (auto msg = ev->Get()->Record.GetErrorMessage()) { - OnError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, msg, false, true); + OnError(NYql::NDqProto::StatusIds::UNSUPPORTED, msg); } else { NActorsProto::TActorId fullResultWriterProto; ev->Get()->Record.GetMessage().UnpackTo(&fullResultWriterProto); diff --git a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp index 7f6c383238e..cca0b79d9a8 100644 --- a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp +++ b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp @@ -105,11 +105,11 @@ private: YQL_LOG(DEBUG) << __FUNCTION__; auto now = TInstant::Now(); if (PullRequestTimeout && now - PullRequestStartTime > PullRequestTimeout) { - OnError(NYql::NDqProto::StatusIds::TIMEOUT, "Timeout " + ToString(SourceID.NodeId()), true, true); + OnError(NYql::NDqProto::StatusIds::TIMEOUT, "Timeout " + ToString(SourceID.NodeId())); } if (PingTimeout && now - PingStartTime > PingTimeout) { - OnError(NYql::NDqProto::StatusIds::TIMEOUT, "PingTimeout " + ToString(SourceID.NodeId()), true, true); + OnError(NYql::NDqProto::StatusIds::TIMEOUT, "PingTimeout " + ToString(SourceID.NodeId())); } if (!PingRequested) { @@ -171,7 +171,7 @@ private: Schedule(TDuration::MilliSeconds(10), new TEvPullResult()); return; case NYql::NDqProto::ERROR: { - OnError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, ev->Get()->Record.GetErrorMessage(), false, true); + OnError(NYql::NDqProto::StatusIds::UNSUPPORTED, ev->Get()->Record.GetErrorMessage()); break; } case NYql::NDqProto::UNKNOWN: diff --git a/ydb/library/yql/providers/dq/actors/task_controller.cpp b/ydb/library/yql/providers/dq/actors/task_controller.cpp index a7b30fa5286..379a5c0ce91 100644 --- a/ydb/library/yql/providers/dq/actors/task_controller.cpp +++ b/ydb/library/yql/providers/dq/actors/task_controller.cpp @@ -99,7 +99,7 @@ private: TString message = "Undelivered Event " + ToString(ev->Get()->SourceType) + " from " + ToString(SelfId()) + " (Self) to " + ToString(ev->Sender) + + " Reason: " + ToString(ev->Get()->Reason) + " Cookie: " + ToString(ev->Cookie); - OnError(NYql::NDqProto::StatusIds::UNAVAILABLE, message, true, true); + OnError(NYql::NDqProto::StatusIds::UNAVAILABLE, message); } void OnAbortExecution(NDq::TEvDq::TEvAbortExecution::TPtr& ev) { @@ -107,7 +107,7 @@ private: auto statusCode = ev->Get()->Record.GetStatusCode(); TIssues issues = ev->Get()->GetIssues(); YQL_LOG(DEBUG) << "AbortExecution from " << ev->Sender << ":" << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode) << " " << issues.ToOneLineString(); - OnError(statusCode, issues, NCommon::IsRetriable(ev), NCommon::NeedFallback(ev)); + OnError(statusCode, issues); } void OnComputeActorState(NDq::TEvDqCompute::TEvState::TPtr& ev) { @@ -134,20 +134,13 @@ private: case NDqProto::COMPUTE_STATE_UNKNOWN: { // TODO: use issues TString message = "unexpected state from " + ToString(computeActor) + ", task: " + ToString(taskId); - OnError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, message, false, false); + OnError(NYql::NDqProto::StatusIds::BAD_REQUEST, message); break; } case NDqProto::COMPUTE_STATE_FAILURE: { // TODO: don't convert issues to string NYql::IssuesFromMessage(state.GetIssues(), Issues); - bool retriable = true; - for (const auto& issue : Issues) { - if (issue.IssueCode == TIssuesIds::UNEXPECTED) { - retriable = false; - break; - } - } - OnError(state.GetStatusCode(), Issues, retriable, false); + OnError(state.GetStatusCode(), Issues); break; } case NDqProto::COMPUTE_STATE_EXECUTING: { @@ -490,25 +483,25 @@ private: } } - void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues, bool retriable, bool needFallback) { + void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues) { YQL_LOG_CTX_SCOPE(TraceId); YQL_LOG(DEBUG) << "OnError " << issues.ToOneLineString() << " " << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode); if (Finished) { YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(WARN) << "OnError IGNORED when Finished, Retriable=" << retriable << ", NeedFallback=" << needFallback; + YQL_LOG(WARN) << "OnError IGNORED when Finished"; } else { - auto req = MakeHolder<TEvDqFailure>(statusCode, issues, retriable, needFallback); + auto req = MakeHolder<TEvDqFailure>(statusCode, issues); FinalStat().FlushCounters(req->Record); Send(ExecuterId, req.Release()); Finished = true; } } - void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message, bool retriable, bool needFallback) { - auto issueCode = needFallback + void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message) { + auto issueCode = NCommon::NeedFallback(statusCode) ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR : TIssuesIds::DQ_GATEWAY_ERROR; - OnError(statusCode, TIssues({TIssue(message).SetCode(issueCode, TSeverityIds::S_ERROR)}), retriable, needFallback); + OnError(statusCode, TIssues({TIssue(message).SetCode(issueCode, TSeverityIds::S_ERROR)})); } void Finish() { diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index 1fafa3442a2..e0fd163afcd 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -334,7 +334,7 @@ private: Schedule(PingPeriod, new TEvents::TEvWakeup); } } catch (...) { - SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, CurrentExceptionMessage(), false, false)); + SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, CurrentExceptionMessage())); } } @@ -387,7 +387,7 @@ private: Run(ctx); } catch (...) { - SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, CurrentExceptionMessage(), false, false)); + SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, CurrentExceptionMessage())); } } @@ -411,7 +411,7 @@ private: return; } if (responseType == ERROR) { - Send(SelfId(), MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNSPECIFIED, ev->Get()->Record.GetErrorMessage(), false, false)); + Send(SelfId(), MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNSPECIFIED, ev->Get()->Record.GetErrorMessage())); return; } Y_VERIFY (responseType == FINISH || responseType == CONTINUE); @@ -447,7 +447,7 @@ private: channel.PingStartTime = now; } else if ((now - channel.PingStartTime) > PingTimeout) { Stat.AddCounter("PingTimeout", static_cast<ui64>(1)); - SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::TIMEOUT, "PingTimeout " + TimeoutInfo(channel.ActorID, now, channel.PingStartTime), true, true)); + SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::TIMEOUT, "PingTimeout " + TimeoutInfo(channel.ActorID, now, channel.PingStartTime))); } } @@ -490,7 +490,7 @@ private: ? 0 : maybeChannel->second.Retries ) + " " + JobDebugInfo(ev->Sender); - SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNAVAILABLE, message, /*retriable = */ true, /*fallback =*/ true)); + SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNAVAILABLE, message)); } else if (ev->Get()->SourceType == TEvPullDataRequest::EventType) { TActivationContext::Schedule(TDuration::MilliSeconds(100), new IEventHandle(maybeChannel->second.ActorID, SelfId(), new TEvPullDataRequest(INPUT_SIZE), IEventHandle::FlagTrackDelivery) @@ -560,7 +560,7 @@ private: } else if (channel.Requested && !channel.Finished) { if (PullRequestTimeout && (now - channel.RequestTime) > PullRequestTimeout) { Stat.AddCounter("ReadTimeout", static_cast<ui64>(1)); - SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::TIMEOUT, "PullTimeout " + TimeoutInfo(channel.ActorID, now, channel.RequestTime), false, true)); + SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::TIMEOUT, "PullTimeout " + TimeoutInfo(channel.ActorID, now, channel.RequestTime))); } } } @@ -677,12 +677,12 @@ private: source.HasData = true; Send(SelfId(), new TEvContinueRun()); } catch (...) { - SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNSPECIFIED, CurrentExceptionMessage(), false, false)); + SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNSPECIFIED, CurrentExceptionMessage())); } } void OnSourceError(const IDqSourceActor::TEvSourceError::TPtr& ev) { Y_UNUSED(ev->Get()->InputIndex); - SendFailure(MakeHolder<TEvDqFailure>(ev->Get()->IsFatal ? NYql::NDqProto::StatusIds::UNSPECIFIED : NYql::NDqProto::StatusIds::INTERNAL_ERROR, ev->Get()->Issues.ToString(), !ev->Get()->IsFatal, !ev->Get()->IsFatal)); + SendFailure(MakeHolder<TEvDqFailure>(ev->Get()->IsFatal ? NYql::NDqProto::StatusIds::UNSPECIFIED : NYql::NDqProto::StatusIds::INTERNAL_ERROR, ev->Get()->Issues.ToString())); } void OnSourcePushFinished(TEvSourcePushFinished::TPtr& ev, const TActorContext& ctx) { auto index = ev->Get()->Index; @@ -698,14 +698,14 @@ private: void OnSinkError(ui64 outputIndex, const TIssues& issues, bool isFatal) override { Y_UNUSED(outputIndex); - SendFailure(MakeHolder<TEvDqFailure>(isFatal ? NYql::NDqProto::StatusIds::UNSPECIFIED : NYql::NDqProto::StatusIds::INTERNAL_ERROR, issues.ToString(), !isFatal, !isFatal)); + SendFailure(MakeHolder<TEvDqFailure>(isFatal ? NYql::NDqProto::StatusIds::UNSPECIFIED : NYql::NDqProto::StatusIds::INTERNAL_ERROR, issues.ToString())); } void OnSinkStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) override { Y_UNUSED(state); Y_UNUSED(outputIndex); Y_UNUSED(checkpoint); - SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::BAD_REQUEST, "Unimplemented", false, false)); + SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::BAD_REQUEST, "Unimplemented")); } void SinkSend( diff --git a/ydb/library/yql/providers/dq/common/yql_dq_common.cpp b/ydb/library/yql/providers/dq/common/yql_dq_common.cpp index fd1a21e54ec..a050354be0c 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_common.cpp +++ b/ydb/library/yql/providers/dq/common/yql_dq_common.cpp @@ -100,6 +100,7 @@ bool IsRetriable(NYql::NDqProto::StatusIds::StatusCode statusCode) { case NYql::NDqProto::StatusIds::SUCCESS: case NYql::NDqProto::StatusIds::BAD_REQUEST: case NYql::NDqProto::StatusIds::LIMIT_EXCEEDED: + case NYql::NDqProto::StatusIds::UNSUPPORTED: return false; default: return true; @@ -126,13 +127,7 @@ bool NeedFallback(NYql::NDqProto::StatusIds::StatusCode statusCode) { } bool NeedFallback(const NDq::TEvDq::TEvAbortExecution::TPtr& ev) { - const auto& issues = ev->Get()->GetIssues(); - for (auto it = issues.begin(); it < issues.end(); it++) { - if (it->GetCode() == TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR) { - return true; - } - } - return false; + return NeedFallback(ev->Get()->Record.GetStatusCode()); } } // namespace NCommon diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp index 8ecd1cb0f93..b771c1df7f9 100644 --- a/ydb/library/yql/providers/dq/service/grpc_service.cpp +++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp @@ -131,10 +131,15 @@ namespace NYql::NDqs { queryResult.Mutableresult()->CopyFrom(result.resultset()); queryResult.set_yson(result.yson()); - auto needFallback = result.GetDeprecatedNeedFallback(); - - if (needFallback != NCommon::NeedFallback(ev->Get()->Record.GetStatusCode())) { - Counters->GetSubgroup("MistmatchedNeedFallback", needFallback ? "False" : "True")->GetCounter(NYql::NDqProto::StatusIds_StatusCode_Name(ev->Get()->Record.GetStatusCode()))->Inc(); + bool needFallback; + auto statusCode = result.GetStatusCode(); + if (statusCode == NYql::NDqProto::StatusIds::UNSPECIFIED) { + needFallback = NCommon::NeedFallback(statusCode); + if (needFallback != result.GetDeprecatedNeedFallback()) { + Counters->GetSubgroup("MistmatchedNeedFallback", needFallback ? "True" : "False")->GetCounter(NYql::NDqProto::StatusIds_StatusCode_Name(statusCode))->Inc(); + } + } else { + needFallback = result.GetDeprecatedNeedFallback(); } if (needFallback) { @@ -185,27 +190,34 @@ namespace NYql::NDqs { } void OnReturnResult(TEvQueryResponse::TPtr& ev, const NActors::TActorContext& ctx) { + auto& result = ev->Get()->Record; Y_UNUSED(ctx); YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << "TServiceProxyActor::OnReturnResult " << ev->Get()->Record.GetMetric().size(); - QueryStat.AddCounters(ev->Get()->Record); - auto retriable = ev->Get()->Record.GetDeprecatedRetriable(); - - if (retriable != NCommon::IsRetriable(ev->Get()->Record.GetStatusCode())) { - Counters->GetSubgroup("MistmatchedRetriable", retriable ? "False" : "True")->GetCounter(NYql::NDqProto::StatusIds_StatusCode_Name(ev->Get()->Record.GetStatusCode()))->Inc(); + YQL_LOG(DEBUG) << "TServiceProxyActor::OnReturnResult " << result.GetMetric().size(); + QueryStat.AddCounters(result); + + bool retriable; + auto statusCode = result.GetStatusCode(); + if (statusCode == NYql::NDqProto::StatusIds::UNSPECIFIED) { + retriable = NCommon::IsRetriable(statusCode); + if (retriable != result.GetDeprecatedRetriable()) { + Counters->GetSubgroup("MistmatchedRetriable", retriable ? "True" : "False")->GetCounter(NYql::NDqProto::StatusIds_StatusCode_Name(statusCode))->Inc(); + } + } else { + retriable = result.GetDeprecatedRetriable(); } - if (ev->Get()->Record.GetIssues().size() > 0 && retriable && Retry < MaxRetries) { + if (result.GetIssues().size() > 0 && retriable && Retry < MaxRetries) { QueryStat.AddCounter(RetryName, TDuration::MilliSeconds(0)); NYql::TIssues issues; - NYql::IssuesFromMessage(ev->Get()->Record.GetIssues(), issues); + NYql::IssuesFromMessage(result.GetIssues(), issues); YQL_LOG(WARN) << RetryName << " " << Retry << " Issues: " << issues.ToString(); DoRetry(); } else { - auto needFallback = ev->Get()->Record.GetDeprecatedNeedFallback(); - if (ev->Get()->Record.GetIssues().size() > 0) { + auto needFallback = NCommon::NeedFallback(statusCode); + if (result.GetIssues().size() > 0) { NYql::TIssues issues; - NYql::IssuesFromMessage(ev->Get()->Record.GetIssues(), issues); + NYql::IssuesFromMessage(result.GetIssues(), issues); YQL_LOG(WARN) << "Issues: " << issues.ToString(); *ErrorCounter += 1; } diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp index 9e15d504b59..a2321bf2529 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp @@ -445,7 +445,7 @@ private: TaskRunner = Factory->GetOld(ev->Get()->Task, TraceId); } catch (...) { TString message = "Could not create TaskRunner for " + ToString(taskId) + " on node " + ToString(replyTo.NodeId()) + ", error: " + CurrentExceptionMessage(); - Send(replyTo, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, message, /*retriable = */ true, /*fallback=*/ true), 0, cookie); + Send(replyTo, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, message), 0, cookie); return; } |