diff options
author | spuchin <spuchin@ydb.tech> | 2022-08-10 01:08:22 +0300 |
---|---|---|
committer | spuchin <spuchin@ydb.tech> | 2022-08-10 01:08:22 +0300 |
commit | 432e33c7f18f5b6c2c253eec913d041bad139390 (patch) | |
tree | 3c351366e144ed71995e2a5066f265c691cc21ea | |
parent | dd2db1bb6afc3705b308e83bef1a467ac38d4427 (diff) | |
download | ydb-432e33c7f18f5b6c2c253eec913d041bad139390.tar.gz |
Catch execeptions on top level in KQP worker actor. ()
-rw-r--r-- | ydb/core/kqp/kqp_worker_actor.cpp | 133 |
1 files changed, 90 insertions, 43 deletions
diff --git a/ydb/core/kqp/kqp_worker_actor.cpp b/ydb/core/kqp/kqp_worker_actor.cpp index a0537876514..acdd4f1c34f 100644 --- a/ydb/core/kqp/kqp_worker_actor.cpp +++ b/ydb/core/kqp/kqp_worker_actor.cpp @@ -683,61 +683,77 @@ public: } STFUNC(ReadyState) { - switch (ev->GetTypeRewrite()) { - HFunc(TEvKqp::TEvQueryRequest, HandleReady); - HFunc(TEvKqp::TEvCompileResponse, HandleReady); - HFunc(TEvKqp::TEvCloseSessionRequest, HandleReady); - HFunc(TEvKqp::TEvPingSessionRequest, HandleReady); - HFunc(TEvKqp::TEvContinueProcess, HandleReady); - HFunc(TEvKqp::TEvIdleTimeout, HandleReady); - HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown); - HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown); - default: - Y_FAIL("TKqpWorkerActor, ReadyState: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite()); + try { + switch (ev->GetTypeRewrite()) { + HFunc(TEvKqp::TEvQueryRequest, HandleReady); + HFunc(TEvKqp::TEvCompileResponse, HandleReady); + HFunc(TEvKqp::TEvCloseSessionRequest, HandleReady); + HFunc(TEvKqp::TEvPingSessionRequest, HandleReady); + HFunc(TEvKqp::TEvContinueProcess, HandleReady); + HFunc(TEvKqp::TEvIdleTimeout, HandleReady); + HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown); + HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown); + default: + UnexpectedEvent("ReadyState", ev, ctx); + } + } catch (const yexception& ex) { + InternalError(ex.what(), ctx); } } STFUNC(CompileQueryState) { - switch (ev->GetTypeRewrite()) { - HFunc(TEvKqp::TEvQueryRequest, HandleCompileQuery); - HFunc(TEvKqp::TEvCompileResponse, HandleCompileQuery); - HFunc(TEvKqp::TEvCloseSessionRequest, HandleCompileQuery); - HFunc(TEvKqp::TEvPingSessionRequest, HandleCompileQuery); - HFunc(TEvKqp::TEvIdleTimeout, HandleCompileQuery); - HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown); - HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown); - default: - Y_FAIL("TKqpWorkerActor, CompileQueryState: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite()); + try { + switch (ev->GetTypeRewrite()) { + HFunc(TEvKqp::TEvQueryRequest, HandleCompileQuery); + HFunc(TEvKqp::TEvCompileResponse, HandleCompileQuery); + HFunc(TEvKqp::TEvCloseSessionRequest, HandleCompileQuery); + HFunc(TEvKqp::TEvPingSessionRequest, HandleCompileQuery); + HFunc(TEvKqp::TEvIdleTimeout, HandleCompileQuery); + HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown); + HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown); + default: + UnexpectedEvent("CompileQueryState", ev, ctx); + } + } catch (const yexception& ex) { + InternalError(ex.what(), ctx); } } STFUNC(PerformQueryState) { - switch (ev->GetTypeRewrite()) { - HFunc(TEvKqp::TEvQueryRequest, HandlePerformQuery); - HFunc(TEvKqp::TEvCompileResponse, HandlePerformQuery); - HFunc(TEvKqp::TEvCloseSessionRequest, HandlePerformQuery); - HFunc(TEvKqp::TEvPingSessionRequest, HandlePerformQuery); - HFunc(TEvKqp::TEvContinueProcess, HandlePerformQuery); - HFunc(TEvKqp::TEvIdleTimeout, HandlePerformQuery); - HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown); - HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown); - default: - Y_FAIL("TKqpWorkerActor, PerformQueryState: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite()); + try { + switch (ev->GetTypeRewrite()) { + HFunc(TEvKqp::TEvQueryRequest, HandlePerformQuery); + HFunc(TEvKqp::TEvCompileResponse, HandlePerformQuery); + HFunc(TEvKqp::TEvCloseSessionRequest, HandlePerformQuery); + HFunc(TEvKqp::TEvPingSessionRequest, HandlePerformQuery); + HFunc(TEvKqp::TEvContinueProcess, HandlePerformQuery); + HFunc(TEvKqp::TEvIdleTimeout, HandlePerformQuery); + HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown); + HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown); + default: + UnexpectedEvent("PerformQueryState", ev, ctx); + } + } catch (const yexception& ex) { + InternalError(ex.what(), ctx); } } STFUNC(PerformCleanupState) { - switch (ev->GetTypeRewrite()) { - HFunc(TEvKqp::TEvQueryRequest, HandlePerformCleanup); - HFunc(TEvKqp::TEvCompileResponse, HandlePerformCleanup); - HFunc(TEvKqp::TEvCloseSessionRequest, HandlePerformCleanup); - HFunc(TEvKqp::TEvPingSessionRequest, HandlePerformCleanup); - HFunc(TEvKqp::TEvContinueProcess, HandlePerformCleanup); - HFunc(TEvKqp::TEvIdleTimeout, HandlePerformCleanup); - HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown); - HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown); - default: - Y_FAIL("TKqpWorkerActor, PerformCleanupState: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite()); + try { + switch (ev->GetTypeRewrite()) { + HFunc(TEvKqp::TEvQueryRequest, HandlePerformCleanup); + HFunc(TEvKqp::TEvCompileResponse, HandlePerformCleanup); + HFunc(TEvKqp::TEvCloseSessionRequest, HandlePerformCleanup); + HFunc(TEvKqp::TEvPingSessionRequest, HandlePerformCleanup); + HFunc(TEvKqp::TEvContinueProcess, HandlePerformCleanup); + HFunc(TEvKqp::TEvIdleTimeout, HandlePerformCleanup); + HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown); + HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown); + default: + UnexpectedEvent("PerformCleanupState", ev, ctx); + } + } catch (const yexception& ex) { + InternalError(ex.what(), ctx); } } @@ -2095,6 +2111,37 @@ private: } private: + void UnexpectedEvent(const TString& state, TAutoPtr<NActors::IEventHandle>& ev, const TActorContext& ctx) { + TString message = TStringBuilder() << "TKqpWorkerActor in state " + << state << " received unexpected event " + << TypeName(*ev.Get()->GetBase()) << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite()); + + InternalError(message, ctx); + } + + void InternalError(const TString& message, const TActorContext& ctx) { + LOG_ERROR_S(ctx, NKikimrServices::KQP_WORKER, "Internal error, SelfId: " + << SelfId() << ", message: " << message); + + if (QueryState) { + auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); + ReplyProcessError(QueryState->Sender, QueryState->ProxyRequestId, requestInfo, + Ydb::StatusIds::INTERNAL_ERROR, message, ctx); + } + + auto lifeSpan = TInstant::Now() - CreationTime; + Counters->ReportWorkerFinished(Settings.DbCounters, lifeSpan); + + auto closeEv = MakeHolder<TEvKqp::TEvCloseSessionResponse>(); + closeEv->Record.SetStatus(Ydb::StatusIds::SUCCESS); + closeEv->Record.MutableResponse()->SetSessionId(SessionId); + closeEv->Record.MutableResponse()->SetClosed(true); + ctx.Send(Owner, closeEv.Release()); + + Die(ctx); + } + +private: TActorId Owner; TString SessionId; TKqpWorkerSettings Settings; |