diff options
author | whcrc <whcrc@yandex-team.ru> | 2022-03-16 13:00:11 +0300 |
---|---|---|
committer | whcrc <whcrc@yandex-team.ru> | 2022-03-16 13:00:11 +0300 |
commit | 300df038e49352c7e8a2bf1f4e55a306465705f2 (patch) | |
tree | e5c46bb5dc317843e662d93fb6526bffe00672c1 | |
parent | f2bea43f564daffc657e2b30cad3b044bd2cb7c7 (diff) | |
download | ydb-300df038e49352c7e8a2bf1f4e55a306465705f2.tar.gz |
YQL-14357: CA errors, use TEvAbortExecution
ref:4a6fca8c7f7963986154be3d16f2d027f8622026
8 files changed, 89 insertions, 74 deletions
diff --git a/ydb/library/yql/dq/actors/task_runner/CMakeLists.txt b/ydb/library/yql/dq/actors/task_runner/CMakeLists.txt index f144d290c8f..1810e784dc9 100644 --- a/ydb/library/yql/dq/actors/task_runner/CMakeLists.txt +++ b/ydb/library/yql/dq/actors/task_runner/CMakeLists.txt @@ -22,7 +22,6 @@ target_link_libraries(dq-actors-task_runner PUBLIC yql-minikql-computation yql-utils-actors ydb-core-protos - dq-api-protos ) target_sources(dq-actors-task_runner PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/task_runner/events.cpp diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index a3bfa2680a5..fc6d7ba952b 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -2,16 +2,14 @@ #include <ydb/core/protos/services.pb.h> +#include <ydb/library/yql/core/issue/protos/issue_id.pb.h> + #include <ydb/library/yql/dq/actors/dq.h> #include <ydb/library/yql/dq/actors/task_runner/task_runner_actor.h> #include <ydb/library/yql/dq/runtime/dq_tasks_runner.h> -#include <ydb/library/yql/providers/dq/actors/events.h> -#include <ydb/library/yql/providers/dq/api/protos/service.pb.h> -#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h> - #include <library/cpp/actors/core/hfunc.h> using namespace NActors; @@ -32,7 +30,6 @@ public: , Parent(parent) , Factory(factory) , TraceId(traceId) - , Settings(MakeIntrusive<TDqConfiguration>()) { } ~TLocalTaskRunnerActor() @@ -222,13 +219,6 @@ private: } void OnDqTask(TEvTaskRunnerCreate::TPtr& ev, const NActors::TActorContext& ctx) { - { - Yql::DqsProto::TTaskMeta taskMeta; - ev->Get()->Task.GetMeta().UnpackTo(&taskMeta); - Settings->Dispatch(taskMeta.GetSettings()); - Settings->FreezeDefaults(); - } - ParentId = ev->Sender; TaskRunner = Factory(ev->Get()->Task, [](const TString& message) { LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, message); @@ -274,12 +264,8 @@ private: } } - THolder<IEventBase> GetError(const TString& message) { - const auto issue = TIssue(message).SetCode(TIssuesIds::DQ_GATEWAY_ERROR, TSeverityIds::S_ERROR); - if (Settings->EnableComputeActor.Get().GetOrElse(false)) { - return MakeHolder<NDq::TEvDq::TEvAbortExecution>(Ydb::StatusIds::BAD_REQUEST, TVector<TIssue>{issue}); - } - return MakeHolder<NDqs::TEvDqFailure>(issue, false, false); + THolder<TEvDq::TEvAbortExecution> GetError(const TString& message) { + return MakeHolder<TEvDq::TEvAbortExecution>(Ydb::StatusIds::BAD_REQUEST, TVector<TIssue>{TIssue(message).SetCode(TIssuesIds::DQ_GATEWAY_ERROR, TSeverityIds::S_ERROR)}); } NActors::TActorId ParentId; @@ -289,7 +275,6 @@ private: THashSet<ui32> Inputs; THashSet<ui32> Sources; TIntrusivePtr<NDq::IDqTaskRunner> TaskRunner; - TIntrusivePtr<TDqConfiguration> Settings; }; struct TLocalTaskRunnerActorFactory: public ITaskRunnerActorFactory { diff --git a/ydb/library/yql/providers/dq/actors/task_controller.cpp b/ydb/library/yql/providers/dq/actors/task_controller.cpp index 8a8d63ffaac..83996830524 100644 --- a/ydb/library/yql/providers/dq/actors/task_controller.cpp +++ b/ydb/library/yql/providers/dq/actors/task_controller.cpp @@ -107,18 +107,7 @@ private: auto ydbStatusId = ev->Get()->Record.GetStatusCode(); TIssues issues = ev->Get()->GetIssues(); YQL_LOG(DEBUG) << "AbortExecution from " << ev->Sender << ":" << ydbStatusId << " " << issues.ToOneLineString(); - auto retry = true; - if (ydbStatusId == Ydb::StatusIds::BAD_REQUEST) { - retry = false; - } - auto fallback = false; - for (auto it = issues.begin(); it < issues.end(); it++) { - if (it->GetCode() == TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR) { - fallback = true; - break; - } - } - OnError(issues, retry, fallback); + OnError(issues, NCommon::IsRetriable(ev), NCommon::NeedFallback(ev)); } void OnComputeActorState(NDq::TEvDqCompute::TEvState::TPtr& ev) { diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index 36e36daa0f3..b08f00916cd 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -1,5 +1,8 @@ #include "worker_actor.h" +#include <ydb/library/yql/dq/actors/dq.h> +#include <ydb/library/yql/providers/dq/common/yql_dq_common.h> + #include <ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h> #include <ydb/library/yql/providers/dq/runtime/runtime_data.h> @@ -136,7 +139,10 @@ private: HFunc(TEvTaskRunFinished, OnRunFinished); HFunc(TEvSourcePushFinished, OnSourcePushFinished); - HFunc(TEvDqFailure, OnErrorFromPipe) + // weird to have two events for error handling, but we need to use TEvDqFailure + // between worker_actor <-> executer_actor, cause it transmits statistics in 'Metric' field + HFunc(NDq::TEvDq::TEvAbortExecution, OnErrorFromPipe); // received from task_runner_actor + HFunc(TEvDqFailure, OnError); // received from this actor itself HFunc(TEvContinueRun, OnContinueRun); cFunc(TEvents::TEvWakeup::EventType, OnWakeup); @@ -144,7 +150,57 @@ private: hFunc(IDqSourceActor::TEvSourceError, OnSourceError); }) - void OnErrorFromPipe(TEvDqFailure::TPtr& ev, const TActorContext&) { + void ExtractStats(::Ydb::Issue::IssueMessage* issue) { + TString filteredMessage; + for (auto line : StringSplitter(TString(issue->message().data(), issue->message().size())).SplitByString("\n").SkipEmpty()) { + if (line.StartsWith("Counter1:")) { + TVector<TString> parts; + Split(TString(line), " ", parts); + if (parts.size() >= 3) { + auto name = parts[1]; + i64 value; + if (TryFromString<i64>(parts[2], value)) { + Stat.AddCounter(name, TDuration::MilliSeconds(value)); + } + } + } else if (line.StartsWith("Counter:")) { + TVector<TString> parts; + Split(TString(line), " ", parts); + // name sum min max avg count + if (parts.size() >= 7) { + auto name = parts[1]; + TCounters::TEntry entry; + if ( + TryFromString<i64>(parts[2], entry.Sum) && + TryFromString<i64>(parts[3], entry.Min) && + TryFromString<i64>(parts[4], entry.Max) && + TryFromString<i64>(parts[5], entry.Avg) && + TryFromString<i64>(parts[6], entry.Count)) + { + Stat.AddCounter(name, entry); + } + } + } else { + filteredMessage += line; + filteredMessage += "\n"; + } + } + issue->set_message(filteredMessage); + } + + void OnErrorFromPipe(NDq::TEvDq::TEvAbortExecution::TPtr& ev, const TActorContext&) { + for (size_t i = 0; i < ev->Get()->Record.IssuesSize(); i++) { + ExtractStats(ev->Get()->Record.MutableIssues(i)); + } + // hacky conversion to TEvDqFailure + auto convertedError = MakeHolder<TEvDqFailure>(); + convertedError->Record.SetRetriable(NCommon::IsRetriable(ev)); + convertedError->Record.SetNeedFallback(NCommon::NeedFallback(ev)); + convertedError->Record.MutableIssues()->Swap(ev->Get()->Record.MutableIssues()); + SendFailure(std::move(convertedError)); // enreached with stats inside + } + + void OnError(TEvDqFailure::TPtr& ev, const TActorContext&) { SendFailure(ev->Release()); } diff --git a/ydb/library/yql/providers/dq/common/CMakeLists.txt b/ydb/library/yql/providers/dq/common/CMakeLists.txt index 78e044f65e2..8fb26007ac1 100644 --- a/ydb/library/yql/providers/dq/common/CMakeLists.txt +++ b/ydb/library/yql/providers/dq/common/CMakeLists.txt @@ -19,6 +19,7 @@ target_link_libraries(providers-dq-common PUBLIC library-yql-sql library-yql-utils yql-utils-log + yql-dq-actors ) target_sources(providers-dq-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/common/attrs.cpp diff --git a/ydb/library/yql/providers/dq/common/yql_dq_common.cpp b/ydb/library/yql/providers/dq/common/yql_dq_common.cpp index bd339634427..b77b1c21540 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_common.cpp +++ b/ydb/library/yql/providers/dq/common/yql_dq_common.cpp @@ -1,5 +1,7 @@ #include "yql_dq_common.h" +#include <ydb/library/yql/core/issue/protos/issue_id.pb.h> + #include <ydb/library/yql/utils/yql_panic.h> #include <ydb/library/yql/minikql/mkql_alloc.h> @@ -92,6 +94,20 @@ bool ParseCounterName(TString* prefix, std::map<TString, TString>* labels, TStri return !name->empty(); } +bool IsRetriable(const NDq::TEvDq::TEvAbortExecution::TPtr& ev) { + const auto& ydbStatusId = ev->Get()->Record.GetStatusCode(); + return ydbStatusId != Ydb::StatusIds::BAD_REQUEST; +} + +bool NeedFallback(const NDq::TEvDq::TEvAbortExecution::TPtr& ev) { + const auto& issues = ev->Get()->GetIssues(); + for (auto it = issues.begin(); it < issues.end(); it++) { + if (it->GetCode() == TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR) { + return true; + } + } + return false; +} } // namespace NCommon } // namespace NYql diff --git a/ydb/library/yql/providers/dq/common/yql_dq_common.h b/ydb/library/yql/providers/dq/common/yql_dq_common.h index c1c865a7e5b..3620862b128 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_common.h +++ b/ydb/library/yql/providers/dq/common/yql_dq_common.h @@ -1,5 +1,7 @@ #pragma once +#include <ydb/library/yql/dq/actors/dq.h> + #include <util/generic/string.h> #include <map> @@ -12,5 +14,8 @@ TString GetSerializedResultType(const TString& program); bool ParseCounterName(TString* prefix, std::map<TString, TString>* labels, TString* name, const TString& counterName); +bool IsRetriable(const NDq::TEvDq::TEvAbortExecution::TPtr& ev); + +bool NeedFallback(const NDq::TEvDq::TEvAbortExecution::TPtr& ev); } // namespace NCommon } // namespace NYql diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp index af7151f48b5..11f6dc01037 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp @@ -82,9 +82,8 @@ private: bool Retriable; bool Fallback; TString FilteredStderr; - NDqProto::TDqFailure MetricContainer; - TRunResult() : Retriable(false), Fallback(false), FilteredStderr(), MetricContainer() { + TRunResult() : Retriable(false), Fallback(false), FilteredStderr() { } }; @@ -98,36 +97,8 @@ private: } TRunResult result; - NYql::TCounters stat; for (TStringBuf line: StringSplitter(input).SplitByString("\n").SkipEmpty()) { - if (line.StartsWith("Counter1:")) { - TVector<TString> parts; - Split(TString(line), " ", parts); - if (parts.size() >= 3) { - auto name = parts[1]; - i64 value; - if (TryFromString<i64>(parts[2], value)) { - stat.AddCounter(name, TDuration::MilliSeconds(value)); - } - } - } else if (line.StartsWith("Counter:")) { - TVector<TString> parts; - Split(TString(line), " ", parts); - // name sum min max avg count - if (parts.size() >= 7) { - auto name = parts[1]; - TCounters::TEntry entry; - if ( - TryFromString<i64>(parts[2], entry.Sum) && - TryFromString<i64>(parts[3], entry.Min) && - TryFromString<i64>(parts[4], entry.Max) && - TryFromString<i64>(parts[5], entry.Avg) && - TryFromString<i64>(parts[6], entry.Count)) - { - stat.AddCounter(name, entry); - } - } - } else if (line.Contains("mlockall failed")) { + if (line.Contains("mlockall failed")) { // skip } else { if (!result.Fallback) { @@ -178,11 +149,10 @@ private: result.FilteredStderr += "\n"; } } - stat.FlushCounters(result.MetricContainer); return result; } - static THolder<IEventBase> StatusToError( + static THolder<TEvDq::TEvAbortExecution> StatusToError( const TEvError::TStatus& status, TIntrusivePtr<TDqConfiguration> settings, ui64 stageId, @@ -209,13 +179,7 @@ private: issue.AddSubIssue(MakeIntrusive<TIssue>(YqlIssue(parsedPos.GetOrElse(TPosition()), TIssuesIds::DQ_GATEWAY_ERROR, TString{terminationMessage}))); } } - - if (settings->EnableComputeActor.Get().GetOrElse(false)) { - return MakeHolder<NDq::TEvDq::TEvAbortExecution>(runResult.Retriable ? Ydb::StatusIds::UNAVAILABLE : Ydb::StatusIds::BAD_REQUEST, TVector<TIssue>{issue}); - } - auto dqFailure = MakeHolder<TEvDqFailure>(issue, runResult.Retriable, runResult.Fallback); - dqFailure->Record.MutableMetric()->Swap(runResult.MetricContainer.MutableMetric()); - return dqFailure; + return MakeHolder<NDq::TEvDq::TEvAbortExecution>(runResult.Retriable ? Ydb::StatusIds::UNAVAILABLE : Ydb::StatusIds::BAD_REQUEST, TVector<TIssue>{issue}); } void PassAway() override { |