aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Ozeritskiy <aozeritsky@ydb.tech>2023-12-19 16:27:42 +0100
committerGitHub <noreply@github.com>2023-12-19 16:27:42 +0100
commit55d122badd479a8e15c18fe91cf146d92158cdfc (patch)
treec4deb6e98006819badd49f651f773e998ad41b75
parentf2ff8549f3184926643ab5a830eb3d341e0fc1f8 (diff)
downloadydb-55d122badd479a8e15c18fe91cf146d92158cdfc.tar.gz
TaskRunnerActor accepts unknown events (#576)
-rw-r--r--ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp26
1 files changed, 17 insertions, 9 deletions
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
index afb384be683..dd5d81aa4c2 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
@@ -136,15 +136,23 @@ public:
}
}
- STRICT_STFUNC(Handler, {
- cFunc(NActors::TEvents::TEvPoison::EventType, TTaskRunnerActor::PassAway);
- hFunc(TEvTaskRunnerCreate, OnDqTask);
- hFunc(TEvContinueRun, OnContinueRun);
- hFunc(TEvPop, OnChannelPop);
- hFunc(TEvPush, OnChannelPush);
- hFunc(TEvSinkPop, OnSinkPop);
- hFunc(TEvSinkPopFinished, OnSinkPopFinished);
- })
+ STFUNC(Handler) {
+ switch (ev->GetTypeRewrite()) {
+ cFunc(NActors::TEvents::TEvPoison::EventType, TTaskRunnerActor::PassAway);
+ hFunc(TEvTaskRunnerCreate, OnDqTask);
+ hFunc(TEvContinueRun, OnContinueRun);
+ hFunc(TEvPop, OnChannelPop);
+ hFunc(TEvPush, OnChannelPush);
+ hFunc(TEvSinkPop, OnSinkPop);
+ hFunc(TEvSinkPopFinished, OnSinkPopFinished);
+ default: {
+ auto message = TStringBuilder() << "Unexpected event: " << ev->GetTypeRewrite() << " (" << ev->GetTypeName() << ")" << " stageId: " << StageId;
+ auto issue = TIssue(message).SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR);
+ auto reply = MakeHolder<NDq::TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TVector<TIssue>{issue});
+ Send(ParentId, reply.Release());
+ }
+ }
+ }
private:
static std::pair<NYql::NDqProto::StatusIds::StatusCode, TString> ParseStderr(const TString& input, TIntrusivePtr<TDqConfiguration> settings) {