aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author whcrc <whcrc@yandex-team.ru> 2022-03-16 13:00:11 +0300
committer whcrc <whcrc@yandex-team.ru> 2022-03-16 13:00:11 +0300
commit 300df038e49352c7e8a2bf1f4e55a306465705f2 (patch)
tree e5c46bb5dc317843e662d93fb6526bffe00672c1
parent f2bea43f564daffc657e2b30cad3b044bd2cb7c7 (diff)
download ydb-300df038e49352c7e8a2bf1f4e55a306465705f2.tar.gz
YQL-14357: CA errors, use TEvAbortExecution
ref:4a6fca8c7f7963986154be3d16f2d027f8622026
-rw-r--r-- ydb/library/yql/dq/actors/task_runner/CMakeLists.txt 1
-rw-r--r-- ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp 23
-rw-r--r-- ydb/library/yql/providers/dq/actors/task_controller.cpp 13
-rw-r--r-- ydb/library/yql/providers/dq/actors/worker_actor.cpp 60
-rw-r--r-- ydb/library/yql/providers/dq/common/CMakeLists.txt 1
-rw-r--r-- ydb/library/yql/providers/dq/common/yql_dq_common.cpp 16
-rw-r--r-- ydb/library/yql/providers/dq/common/yql_dq_common.h 5
-rw-r--r-- ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp 44
8 files changed, 89 insertions, 74 deletions
diff --git a/ydb/library/yql/dq/actors/task_runner/CMakeLists.txt b/ydb/library/yql/dq/actors/task_runner/CMakeLists.txt
index f144d290c8f..1810e784dc9 100644
--- a/ydb/library/yql/dq/actors/task_runner/CMakeLists.txt
+++ b/ydb/library/yql/dq/actors/task_runner/CMakeLists.txt
@@ -22,7 +22,6 @@ target_link_libraries(dq-actors-task_runner PUBLIC
yql-minikql-computation
yql-utils-actors
ydb-core-protos
- dq-api-protos
)
target_sources(dq-actors-task_runner PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/task_runner/events.cpp
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
index a3bfa2680a5..fc6d7ba952b 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
@@ -2,16 +2,14 @@
#include <ydb/core/protos/services.pb.h>
+#include <ydb/library/yql/core/issue/protos/issue_id.pb.h>
+
#include <ydb/library/yql/dq/actors/dq.h>
#include <ydb/library/yql/dq/actors/task_runner/task_runner_actor.h>
#include <ydb/library/yql/dq/runtime/dq_tasks_runner.h>
-#include <ydb/library/yql/providers/dq/actors/events.h>
-#include <ydb/library/yql/providers/dq/api/protos/service.pb.h>
-#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
-
#include <library/cpp/actors/core/hfunc.h>
using namespace NActors;
@@ -32,7 +30,6 @@ public:
, Parent(parent)
, Factory(factory)
, TraceId(traceId)
- , Settings(MakeIntrusive<TDqConfiguration>())
{ }
~TLocalTaskRunnerActor()
@@ -222,13 +219,6 @@ private:
}
void OnDqTask(TEvTaskRunnerCreate::TPtr& ev, const NActors::TActorContext& ctx) {
- {
- Yql::DqsProto::TTaskMeta taskMeta;
- ev->Get()->Task.GetMeta().UnpackTo(&taskMeta);
- Settings->Dispatch(taskMeta.GetSettings());
- Settings->FreezeDefaults();
- }
-
ParentId = ev->Sender;
TaskRunner = Factory(ev->Get()->Task, [](const TString& message) {
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, message);
@@ -274,12 +264,8 @@ private:
}
}
- THolder<IEventBase> GetError(const TString& message) {
- const auto issue = TIssue(message).SetCode(TIssuesIds::DQ_GATEWAY_ERROR, TSeverityIds::S_ERROR);
- if (Settings->EnableComputeActor.Get().GetOrElse(false)) {
- return MakeHolder<NDq::TEvDq::TEvAbortExecution>(Ydb::StatusIds::BAD_REQUEST, TVector<TIssue>{issue});
- }
- return MakeHolder<NDqs::TEvDqFailure>(issue, false, false);
+ THolder<TEvDq::TEvAbortExecution> GetError(const TString& message) { // always builds an abort event now; the settings-dependent TEvDqFailure branch is removed
+ return MakeHolder<TEvDq::TEvAbortExecution>(Ydb::StatusIds::BAD_REQUEST, TVector<TIssue>{TIssue(message).SetCode(TIssuesIds::DQ_GATEWAY_ERROR, TSeverityIds::S_ERROR)}); // BAD_REQUEST status with a single DQ_GATEWAY_ERROR issue of severity S_ERROR
}
NActors::TActorId ParentId;
@@ -289,7 +275,6 @@ private:
THashSet<ui32> Inputs;
THashSet<ui32> Sources;
TIntrusivePtr<NDq::IDqTaskRunner> TaskRunner;
- TIntrusivePtr<TDqConfiguration> Settings;
};
struct TLocalTaskRunnerActorFactory: public ITaskRunnerActorFactory {
diff --git a/ydb/library/yql/providers/dq/actors/task_controller.cpp b/ydb/library/yql/providers/dq/actors/task_controller.cpp
index 8a8d63ffaac..83996830524 100644
--- a/ydb/library/yql/providers/dq/actors/task_controller.cpp
+++ b/ydb/library/yql/providers/dq/actors/task_controller.cpp
@@ -107,18 +107,7 @@ private:
auto ydbStatusId = ev->Get()->Record.GetStatusCode();
TIssues issues = ev->Get()->GetIssues();
YQL_LOG(DEBUG) << "AbortExecution from " << ev->Sender << ":" << ydbStatusId << " " << issues.ToOneLineString();
- auto retry = true;
- if (ydbStatusId == Ydb::StatusIds::BAD_REQUEST) {
- retry = false;
- }
- auto fallback = false;
- for (auto it = issues.begin(); it < issues.end(); it++) {
- if (it->GetCode() == TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR) {
- fallback = true;
- break;
- }
- }
- OnError(issues, retry, fallback);
+ OnError(issues, NCommon::IsRetriable(ev), NCommon::NeedFallback(ev));
}
void OnComputeActorState(NDq::TEvDqCompute::TEvState::TPtr& ev) {
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
index 36e36daa0f3..b08f00916cd 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
@@ -1,5 +1,8 @@
#include "worker_actor.h"
+#include <ydb/library/yql/dq/actors/dq.h>
+#include <ydb/library/yql/providers/dq/common/yql_dq_common.h>
+
#include <ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h>
#include <ydb/library/yql/providers/dq/runtime/runtime_data.h>
@@ -136,7 +139,10 @@ private:
HFunc(TEvTaskRunFinished, OnRunFinished);
HFunc(TEvSourcePushFinished, OnSourcePushFinished);
- HFunc(TEvDqFailure, OnErrorFromPipe)
+ // It is awkward to have two events for error handling, but TEvDqFailure must still be used
+ // between worker_actor <-> executer_actor, because it transmits statistics in its 'Metric' field
+ HFunc(NDq::TEvDq::TEvAbortExecution, OnErrorFromPipe); // received from task_runner_actor
+ HFunc(TEvDqFailure, OnError); // received from this actor itself
HFunc(TEvContinueRun, OnContinueRun);
cFunc(TEvents::TEvWakeup::EventType, OnWakeup);
@@ -144,7 +150,57 @@ private:
hFunc(IDqSourceActor::TEvSourceError, OnSourceError);
})
- void OnErrorFromPipe(TEvDqFailure::TPtr& ev, const TActorContext&) {
+ void ExtractStats(::Ydb::Issue::IssueMessage* issue) { // moves "Counter1:"/"Counter:" lines from the issue message into Stat and writes the remaining lines back
+ TString filteredMessage;
+ for (auto line : StringSplitter(TString(issue->message().data(), issue->message().size())).SplitByString("\n").SkipEmpty()) { // empty lines are skipped and therefore not preserved
+ if (line.StartsWith("Counter1:")) { // apparently "Counter1: <name> <value>"; the value is stored as a duration in milliseconds
+ TVector<TString> parts;
+ Split(TString(line), " ", parts);
+ if (parts.size() >= 3) {
+ auto name = parts[1];
+ i64 value;
+ if (TryFromString<i64>(parts[2], value)) { // a non-numeric value is ignored; the line is still dropped from the message
+ Stat.AddCounter(name, TDuration::MilliSeconds(value));
+ }
+ }
+ } else if (line.StartsWith("Counter:")) {
+ TVector<TString> parts;
+ Split(TString(line), " ", parts);
+ // name sum min max avg count
+ if (parts.size() >= 7) {
+ auto name = parts[1];
+ TCounters::TEntry entry;
+ if (
+ TryFromString<i64>(parts[2], entry.Sum) &&
+ TryFromString<i64>(parts[3], entry.Min) &&
+ TryFromString<i64>(parts[4], entry.Max) &&
+ TryFromString<i64>(parts[5], entry.Avg) &&
+ TryFromString<i64>(parts[6], entry.Count))
+ {
+ Stat.AddCounter(name, entry); // added only when all five numeric fields parse
+ }
+ }
+ } else { // any other line is kept as message text
+ filteredMessage += line;
+ filteredMessage += "\n";
+ }
+ }
+ issue->set_message(filteredMessage); // every kept line ends with "\n", including the last one
+ }
+
+ void OnErrorFromPipe(NDq::TEvDq::TEvAbortExecution::TPtr& ev, const TActorContext&) { // re-sends an incoming TEvAbortExecution as a TEvDqFailure via SendFailure
+ for (size_t i = 0; i < ev->Get()->Record.IssuesSize(); i++) {
+ ExtractStats(ev->Get()->Record.MutableIssues(i)); // strips counter lines from each issue message and records them in Stat
+ }
+ // hacky conversion to TEvDqFailure
+ auto convertedError = MakeHolder<TEvDqFailure>();
+ convertedError->Record.SetRetriable(NCommon::IsRetriable(ev));
+ convertedError->Record.SetNeedFallback(NCommon::NeedFallback(ev));
+ convertedError->Record.MutableIssues()->Swap(ev->Get()->Record.MutableIssues()); // kept after NeedFallback(ev), which reads the issues from ev
+ SendFailure(std::move(convertedError)); // enriched with stats inside
+ }
+
+ void OnError(TEvDqFailure::TPtr& ev, const TActorContext&) { // TEvDqFailure needs no conversion: the released event is passed straight to SendFailure
SendFailure(ev->Release());
}
diff --git a/ydb/library/yql/providers/dq/common/CMakeLists.txt b/ydb/library/yql/providers/dq/common/CMakeLists.txt
index 78e044f65e2..8fb26007ac1 100644
--- a/ydb/library/yql/providers/dq/common/CMakeLists.txt
+++ b/ydb/library/yql/providers/dq/common/CMakeLists.txt
@@ -19,6 +19,7 @@ target_link_libraries(providers-dq-common PUBLIC
library-yql-sql
library-yql-utils
yql-utils-log
+ yql-dq-actors
)
target_sources(providers-dq-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/common/attrs.cpp
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_common.cpp b/ydb/library/yql/providers/dq/common/yql_dq_common.cpp
index bd339634427..b77b1c21540 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_common.cpp
+++ b/ydb/library/yql/providers/dq/common/yql_dq_common.cpp
@@ -1,5 +1,7 @@
#include "yql_dq_common.h"
+#include <ydb/library/yql/core/issue/protos/issue_id.pb.h>
+
#include <ydb/library/yql/utils/yql_panic.h>
#include <ydb/library/yql/minikql/mkql_alloc.h>
@@ -92,6 +94,20 @@ bool ParseCounterName(TString* prefix, std::map<TString, TString>* labels, TStri
return !name->empty();
}
+bool IsRetriable(const NDq::TEvDq::TEvAbortExecution::TPtr& ev) { // decides retriability from the event's status code alone
+ const auto& ydbStatusId = ev->Get()->Record.GetStatusCode();
+ return ydbStatusId != Ydb::StatusIds::BAD_REQUEST; // every status except BAD_REQUEST counts as retriable
+}
+
+bool NeedFallback(const NDq::TEvDq::TEvAbortExecution::TPtr& ev) { // true when any issue returned by GetIssues() has code DQ_GATEWAY_NEED_FALLBACK_ERROR
+ const auto& issues = ev->Get()->GetIssues();
+ for (auto it = issues.begin(); it < issues.end(); it++) {
+ if (it->GetCode() == TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR) {
+ return true;
+ }
+ }
+ return false; // no issue requested a fallback (also the result for an empty issue list)
+}
} // namespace NCommon
} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_common.h b/ydb/library/yql/providers/dq/common/yql_dq_common.h
index c1c865a7e5b..3620862b128 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_common.h
+++ b/ydb/library/yql/providers/dq/common/yql_dq_common.h
@@ -1,5 +1,7 @@
#pragma once
+#include <ydb/library/yql/dq/actors/dq.h>
+
#include <util/generic/string.h>
#include <map>
@@ -12,5 +14,8 @@ TString GetSerializedResultType(const TString& program);
bool ParseCounterName(TString* prefix, std::map<TString, TString>* labels, TString* name, const TString& counterName);
+bool IsRetriable(const NDq::TEvDq::TEvAbortExecution::TPtr& ev);
+
+bool NeedFallback(const NDq::TEvDq::TEvAbortExecution::TPtr& ev);
} // namespace NCommon
} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
index af7151f48b5..11f6dc01037 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
@@ -82,9 +82,8 @@ private:
bool Retriable;
bool Fallback;
TString FilteredStderr;
- NDqProto::TDqFailure MetricContainer;
- TRunResult() : Retriable(false), Fallback(false), FilteredStderr(), MetricContainer() {
+ TRunResult() : Retriable(false), Fallback(false), FilteredStderr() {
}
};
@@ -98,36 +97,8 @@ private:
}
TRunResult result;
- NYql::TCounters stat;
for (TStringBuf line: StringSplitter(input).SplitByString("\n").SkipEmpty()) {
- if (line.StartsWith("Counter1:")) {
- TVector<TString> parts;
- Split(TString(line), " ", parts);
- if (parts.size() >= 3) {
- auto name = parts[1];
- i64 value;
- if (TryFromString<i64>(parts[2], value)) {
- stat.AddCounter(name, TDuration::MilliSeconds(value));
- }
- }
- } else if (line.StartsWith("Counter:")) {
- TVector<TString> parts;
- Split(TString(line), " ", parts);
- // name sum min max avg count
- if (parts.size() >= 7) {
- auto name = parts[1];
- TCounters::TEntry entry;
- if (
- TryFromString<i64>(parts[2], entry.Sum) &&
- TryFromString<i64>(parts[3], entry.Min) &&
- TryFromString<i64>(parts[4], entry.Max) &&
- TryFromString<i64>(parts[5], entry.Avg) &&
- TryFromString<i64>(parts[6], entry.Count))
- {
- stat.AddCounter(name, entry);
- }
- }
- } else if (line.Contains("mlockall failed")) {
+ if (line.Contains("mlockall failed")) {
// skip
} else {
if (!result.Fallback) {
@@ -178,11 +149,10 @@ private:
result.FilteredStderr += "\n";
}
}
- stat.FlushCounters(result.MetricContainer);
return result;
}
- static THolder<IEventBase> StatusToError(
+ static THolder<TEvDq::TEvAbortExecution> StatusToError(
const TEvError::TStatus& status,
TIntrusivePtr<TDqConfiguration> settings,
ui64 stageId,
@@ -209,13 +179,7 @@ private:
issue.AddSubIssue(MakeIntrusive<TIssue>(YqlIssue(parsedPos.GetOrElse(TPosition()), TIssuesIds::DQ_GATEWAY_ERROR, TString{terminationMessage})));
}
}
-
- if (settings->EnableComputeActor.Get().GetOrElse(false)) {
- return MakeHolder<NDq::TEvDq::TEvAbortExecution>(runResult.Retriable ? Ydb::StatusIds::UNAVAILABLE : Ydb::StatusIds::BAD_REQUEST, TVector<TIssue>{issue});
- }
- auto dqFailure = MakeHolder<TEvDqFailure>(issue, runResult.Retriable, runResult.Fallback);
- dqFailure->Record.MutableMetric()->Swap(runResult.MetricContainer.MutableMetric());
- return dqFailure;
+ return MakeHolder<NDq::TEvDq::TEvAbortExecution>(runResult.Retriable ? Ydb::StatusIds::UNAVAILABLE : Ydb::StatusIds::BAD_REQUEST, TVector<TIssue>{issue});
}
void PassAway() override {