aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorspuchin <spuchin@ydb.tech>2022-08-10 01:08:22 +0300
committerspuchin <spuchin@ydb.tech>2022-08-10 01:08:22 +0300
commit432e33c7f18f5b6c2c253eec913d041bad139390 (patch)
tree3c351366e144ed71995e2a5066f265c691cc21ea
parentdd2db1bb6afc3705b308e83bef1a467ac38d4427 (diff)
downloadydb-432e33c7f18f5b6c2c253eec913d041bad139390.tar.gz
Catch execeptions on top level in KQP worker actor. ()
-rw-r--r--ydb/core/kqp/kqp_worker_actor.cpp133
1 files changed, 90 insertions, 43 deletions
diff --git a/ydb/core/kqp/kqp_worker_actor.cpp b/ydb/core/kqp/kqp_worker_actor.cpp
index a0537876514..acdd4f1c34f 100644
--- a/ydb/core/kqp/kqp_worker_actor.cpp
+++ b/ydb/core/kqp/kqp_worker_actor.cpp
@@ -683,61 +683,77 @@ public:
}
STFUNC(ReadyState) {
- switch (ev->GetTypeRewrite()) {
- HFunc(TEvKqp::TEvQueryRequest, HandleReady);
- HFunc(TEvKqp::TEvCompileResponse, HandleReady);
- HFunc(TEvKqp::TEvCloseSessionRequest, HandleReady);
- HFunc(TEvKqp::TEvPingSessionRequest, HandleReady);
- HFunc(TEvKqp::TEvContinueProcess, HandleReady);
- HFunc(TEvKqp::TEvIdleTimeout, HandleReady);
- HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown);
- HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown);
- default:
- Y_FAIL("TKqpWorkerActor, ReadyState: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite());
+ try {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(TEvKqp::TEvQueryRequest, HandleReady);
+ HFunc(TEvKqp::TEvCompileResponse, HandleReady);
+ HFunc(TEvKqp::TEvCloseSessionRequest, HandleReady);
+ HFunc(TEvKqp::TEvPingSessionRequest, HandleReady);
+ HFunc(TEvKqp::TEvContinueProcess, HandleReady);
+ HFunc(TEvKqp::TEvIdleTimeout, HandleReady);
+ HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown);
+ HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown);
+ default:
+ UnexpectedEvent("ReadyState", ev, ctx);
+ }
+ } catch (const yexception& ex) {
+ InternalError(ex.what(), ctx);
}
}
STFUNC(CompileQueryState) {
- switch (ev->GetTypeRewrite()) {
- HFunc(TEvKqp::TEvQueryRequest, HandleCompileQuery);
- HFunc(TEvKqp::TEvCompileResponse, HandleCompileQuery);
- HFunc(TEvKqp::TEvCloseSessionRequest, HandleCompileQuery);
- HFunc(TEvKqp::TEvPingSessionRequest, HandleCompileQuery);
- HFunc(TEvKqp::TEvIdleTimeout, HandleCompileQuery);
- HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown);
- HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown);
- default:
- Y_FAIL("TKqpWorkerActor, CompileQueryState: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite());
+ try {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(TEvKqp::TEvQueryRequest, HandleCompileQuery);
+ HFunc(TEvKqp::TEvCompileResponse, HandleCompileQuery);
+ HFunc(TEvKqp::TEvCloseSessionRequest, HandleCompileQuery);
+ HFunc(TEvKqp::TEvPingSessionRequest, HandleCompileQuery);
+ HFunc(TEvKqp::TEvIdleTimeout, HandleCompileQuery);
+ HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown);
+ HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown);
+ default:
+ UnexpectedEvent("CompileQueryState", ev, ctx);
+ }
+ } catch (const yexception& ex) {
+ InternalError(ex.what(), ctx);
}
}
STFUNC(PerformQueryState) {
- switch (ev->GetTypeRewrite()) {
- HFunc(TEvKqp::TEvQueryRequest, HandlePerformQuery);
- HFunc(TEvKqp::TEvCompileResponse, HandlePerformQuery);
- HFunc(TEvKqp::TEvCloseSessionRequest, HandlePerformQuery);
- HFunc(TEvKqp::TEvPingSessionRequest, HandlePerformQuery);
- HFunc(TEvKqp::TEvContinueProcess, HandlePerformQuery);
- HFunc(TEvKqp::TEvIdleTimeout, HandlePerformQuery);
- HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown);
- HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown);
- default:
- Y_FAIL("TKqpWorkerActor, PerformQueryState: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite());
+ try {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(TEvKqp::TEvQueryRequest, HandlePerformQuery);
+ HFunc(TEvKqp::TEvCompileResponse, HandlePerformQuery);
+ HFunc(TEvKqp::TEvCloseSessionRequest, HandlePerformQuery);
+ HFunc(TEvKqp::TEvPingSessionRequest, HandlePerformQuery);
+ HFunc(TEvKqp::TEvContinueProcess, HandlePerformQuery);
+ HFunc(TEvKqp::TEvIdleTimeout, HandlePerformQuery);
+ HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown);
+ HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown);
+ default:
+ UnexpectedEvent("PerformQueryState", ev, ctx);
+ }
+ } catch (const yexception& ex) {
+ InternalError(ex.what(), ctx);
}
}
STFUNC(PerformCleanupState) {
- switch (ev->GetTypeRewrite()) {
- HFunc(TEvKqp::TEvQueryRequest, HandlePerformCleanup);
- HFunc(TEvKqp::TEvCompileResponse, HandlePerformCleanup);
- HFunc(TEvKqp::TEvCloseSessionRequest, HandlePerformCleanup);
- HFunc(TEvKqp::TEvPingSessionRequest, HandlePerformCleanup);
- HFunc(TEvKqp::TEvContinueProcess, HandlePerformCleanup);
- HFunc(TEvKqp::TEvIdleTimeout, HandlePerformCleanup);
- HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown);
- HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown);
- default:
- Y_FAIL("TKqpWorkerActor, PerformCleanupState: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite());
+ try {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(TEvKqp::TEvQueryRequest, HandlePerformCleanup);
+ HFunc(TEvKqp::TEvCompileResponse, HandlePerformCleanup);
+ HFunc(TEvKqp::TEvCloseSessionRequest, HandlePerformCleanup);
+ HFunc(TEvKqp::TEvPingSessionRequest, HandlePerformCleanup);
+ HFunc(TEvKqp::TEvContinueProcess, HandlePerformCleanup);
+ HFunc(TEvKqp::TEvIdleTimeout, HandlePerformCleanup);
+ HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown);
+ HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown);
+ default:
+ UnexpectedEvent("PerformCleanupState", ev, ctx);
+ }
+ } catch (const yexception& ex) {
+ InternalError(ex.what(), ctx);
}
}
@@ -2095,6 +2111,37 @@ private:
}
private:
+ void UnexpectedEvent(const TString& state, TAutoPtr<NActors::IEventHandle>& ev, const TActorContext& ctx) {
+ TString message = TStringBuilder() << "TKqpWorkerActor in state "
+ << state << " received unexpected event "
+ << TypeName(*ev.Get()->GetBase()) << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite());
+
+ InternalError(message, ctx);
+ }
+
+ void InternalError(const TString& message, const TActorContext& ctx) {
+ LOG_ERROR_S(ctx, NKikimrServices::KQP_WORKER, "Internal error, SelfId: "
+ << SelfId() << ", message: " << message);
+
+ if (QueryState) {
+ auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
+ ReplyProcessError(QueryState->Sender, QueryState->ProxyRequestId, requestInfo,
+ Ydb::StatusIds::INTERNAL_ERROR, message, ctx);
+ }
+
+ auto lifeSpan = TInstant::Now() - CreationTime;
+ Counters->ReportWorkerFinished(Settings.DbCounters, lifeSpan);
+
+ auto closeEv = MakeHolder<TEvKqp::TEvCloseSessionResponse>();
+ closeEv->Record.SetStatus(Ydb::StatusIds::SUCCESS);
+ closeEv->Record.MutableResponse()->SetSessionId(SessionId);
+ closeEv->Record.MutableResponse()->SetClosed(true);
+ ctx.Send(Owner, closeEv.Release());
+
+ Die(ctx);
+ }
+
+private:
TActorId Owner;
TString SessionId;
TKqpWorkerSettings Settings;