diff options
author | gvit <gvit@ydb.tech> | 2023-11-23 15:34:23 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-11-23 16:25:41 +0300 |
commit | b07264456a2e8b5929901f258ad60399bb64678a (patch) | |
tree | 304e0fdaaa62556b6595f046f6dc4ad42949d025 | |
parent | 207ac81618e05ade724a8a8193bc9125d466bd06 (diff) | |
download | ydb-b07264456a2e8b5929901f258ad60399bb64678a.tar.gz |
continue states refactoring in session actor
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 124 |
1 files changed, 34 insertions, 90 deletions
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 934dfbd3b5..c87164a2c6 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -320,7 +320,12 @@ public: Cleanup(); } - void HandleReady(TEvKqp::TEvQueryRequest::TPtr& ev, const NActors::TActorContext& ctx) { + void Handle(TEvKqp::TEvQueryRequest::TPtr& ev) { + if (CurrentStateFunc() != &TThis::ReadyState) { + ReplyBusy(ev); + return; + } + ui64 proxyRequestId = ev->Cookie; YQL_ENSURE(ev->Get()->GetSessionId() == SessionId, "Invalid session, expected: " << SessionId << ", got: " << ev->Get()->GetSessionId()); @@ -415,7 +420,7 @@ public: return ForwardRequest(ev); case NKikimrKqp::QUERY_ACTION_TOPIC: - return AddOffsetsToTransaction(ctx); + return AddOffsetsToTransaction(); } QueryState->UpdateTempTablesState(TempTablesState); @@ -423,7 +428,7 @@ public: CompileQuery(); } - void AddOffsetsToTransaction(const NActors::TActorContext& ctx) { + void AddOffsetsToTransaction() { YQL_ENSURE(QueryState); if (!PrepareQueryTransaction()) { return; @@ -434,7 +439,7 @@ public: auto navigate = QueryState->BuildSchemeCacheNavigate(); Become(&TKqpSessionActor::TopicOpsState); - ctx.Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release())); + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release())); } void CompileQuery() { @@ -442,11 +447,7 @@ public: auto ev = QueryState->BuildCompileRequest(CompilationCookie); LOG_D("Sending CompileQuery request"); Send(MakeKqpCompileServiceID(SelfId().NodeId()), ev.release(), 0, QueryState->QueryId); - Become(&TKqpSessionActor::CompileState); - } - - void HandleCompile(TEvKqp::TEvQueryRequest::TPtr& ev) { - ReplyBusy(ev); + Become(&TKqpSessionActor::ExecuteState); } void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { @@ -474,7 +475,6 @@ public: return; } - YQL_ENSURE(QueryState); TTimerGuard timer(this); @@ -1085,10 +1085,6 @@ public: void HandleNoop(T&) { } - void HandleExecute(TEvKqp::TEvQueryRequest::TPtr& ev) { - ReplyBusy(ev); - } - bool MergeLocksWithTxResult(const NKikimrKqp::TExecuterTxResult& result) { if (result.HasLocks()) { auto& txCtx = QueryState->TxCtx; @@ -1272,22 +1268,12 @@ public: msg.GetStatusCode(), "Request timeout exceeded"); Send(ExecuterId, abortEv.Release(), IEventHandle::FlagTrackDelivery); + } else { + const auto& issues = ev->Get()->GetIssues(); + ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), logMsg, MessageFromIssues(issues)); } } - void HandleCompile(TEvKqp::TEvAbortExecution::TPtr& ev) { - auto& msg = ev->Get()->Record; - - TString logMsg = TStringBuilder() << "got TEvAbortExecution in " << CurrentStateFuncName(); - LOG_I(logMsg << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode())); - - const auto& issues = ev->Get()->GetIssues(); - - YQL_ENSURE(!ExecuterId); - - ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), logMsg, MessageFromIssues(issues)); - } - void CollectSystemViewQueryStats(const NKqpProto::TKqpStatsQuery* stats, TDuration queryDuration, const TString& database, ui64 requestUnits) { @@ -1668,13 +1654,6 @@ public: CleanupAndPassAway(); } - void HandleCompile(TEvKqp::TEvCloseSessionRequest::TPtr&) { - YQL_ENSURE(QueryState); - ReplyQueryError(Ydb::StatusIds::BAD_SESSION, - "Request cancelled due to explicit session close request"); - Counters->ReportSessionActorClosedRequest(Settings.DbCounters); - } - void HandleExecute(TEvKqp::TEvCloseSessionRequest::TPtr&) { YQL_ENSURE(QueryState); QueryState->KeepSession = false; @@ -1740,10 +1719,6 @@ public: } } - void HandleCleanup(TEvKqp::TEvQueryRequest::TPtr& ev) { - ReplyBusy(ev); - } - void CleanupAndPassAway() { Cleanup(true); } @@ -1921,9 +1896,6 @@ public: FillTxInfo(response); ExecuterId = TActorId{}; - if (CurrentStateFunc() == &TThis::CompileState && ydbStatus == Ydb::StatusIds::TIMEOUT) { - FillSystemViewQueryStats(nullptr); - } Cleanup(IsFatalError(ydbStatus)); } @@ -1960,12 +1932,13 @@ public: STFUNC(ReadyState) { try { switch (ev->GetTypeRewrite()) { - HFunc(TEvKqp::TEvQueryRequest, HandleReady); - - hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle); - hFunc(TEvKqp::TEvCloseSessionRequest, HandleReady); + // common event handles for all states. hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); + hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle); + hFunc(TEvKqp::TEvQueryRequest, Handle); + + hFunc(TEvKqp::TEvCloseSessionRequest, HandleReady); hFunc(TEvKqp::TEvCancelQueryRequest, Handle); // forgotten messages from previous aborted request @@ -1985,55 +1958,29 @@ public: } } - STATEFN(CompileState) { + STATEFN(ExecuteState) { try { switch (ev->GetTypeRewrite()) { - hFunc(TEvKqp::TEvQueryRequest, HandleCompile); - hFunc(TEvKqp::TEvCompileResponse, Handle); - - hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle); - hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleCompile); - hFunc(TEvKqp::TEvCloseSessionRequest, HandleCompile); + // common event handles for all states. hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); - hFunc(NGRpcService::TEvClientLost, HandleClientLost); - hFunc(TEvKqp::TEvCancelQueryRequest, Handle); - - // forgotten messages from previous aborted request - hFunc(TEvKqpExecuter::TEvTxResponse, HandleNoop); - hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); - hFunc(TEvents::TEvUndelivered, HandleNoop); - default: - UnexpectedEvent("CompileState", ev); - } - } catch (const TRequestFail& ex) { - ReplyQueryError(ex.Status, ex.what(), ex.Issues); - } catch (const yexception& ex) { - InternalError(ex.what()); - } - } + hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle); + hFunc(TEvKqp::TEvQueryRequest, Handle); - STATEFN(ExecuteState) { - try { - switch (ev->GetTypeRewrite()) { - hFunc(TEvKqp::TEvQueryRequest, HandleExecute); hFunc(TEvKqpExecuter::TEvTxResponse, HandleExecute); hFunc(TEvKqpExecuter::TEvStreamData, HandleExecute); hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleExecute); hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleExecute); - hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle); hFunc(TEvKqp::TEvCloseSessionRequest, HandleExecute); - hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); - hFunc(TEvKqp::TEvContinueShutdown, Handle); hFunc(NGRpcService::TEvClientLost, HandleClientLost); hFunc(TEvKqp::TEvCancelQueryRequest, Handle); // forgotten messages from previous aborted request hFunc(TEvKqp::TEvCompileResponse, Handle); - hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop); + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); hFunc(TEvents::TEvUndelivered, HandleNoop); // always come from WorkerActor @@ -2052,13 +1999,15 @@ public: STATEFN(CleanupState) { try { switch (ev->GetTypeRewrite()) { - hFunc(TEvKqp::TEvQueryRequest, HandleCleanup); + // common event handles for all states. + hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); + hFunc(TEvKqp::TEvContinueShutdown, Handle); + hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle); + hFunc(TEvKqp::TEvQueryRequest, Handle); + hFunc(TEvKqpExecuter::TEvTxResponse, HandleCleanup); - hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle); hFunc(TEvKqp::TEvCloseSessionRequest, HandleCleanup); - hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); - hFunc(TEvKqp::TEvContinueShutdown, Handle); hFunc(NGRpcService::TEvClientLost, HandleNoop); hFunc(TEvKqp::TEvCancelQueryRequest, HandleNoop); @@ -2090,15 +2039,16 @@ public: STATEFN(TopicOpsState) { try { switch (ev->GetTypeRewrite()) { - hFunc(TEvKqp::TEvQueryRequest, HandleTopicOps); - + // common event handles for all states. + hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); + hFunc(TEvKqp::TEvContinueShutdown, Handle); hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle); + hFunc(TEvKqp::TEvQueryRequest, Handle); + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleTopicOps); hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, HandleTopicOps); hFunc(TEvKqp::TEvCloseSessionRequest, HandleTopicOps); - hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); - hFunc(TEvKqp::TEvContinueShutdown, Handle); default: UnexpectedEvent("TopicOpsState", ev); } @@ -2119,8 +2069,6 @@ private: return "ExecuteState"; } else if (func == &TThis::TopicOpsState) { return "TopicOpsState"; - } else if (func == &TThis::CompileState) { - return "CompileState"; } else if (func == &TThis::CleanupState) { return "CleanupState"; } else { @@ -2142,10 +2090,6 @@ private: } } - void HandleTopicOps(TEvKqp::TEvQueryRequest::TPtr& ev) { - ReplyBusy(ev); - } - void HandleTopicOps(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { YQL_ENSURE(ev->Get()->Request); if (ev->Get()->Request->Cookie < QueryId) { |