aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-11-23 15:34:23 +0300
committergvit <gvit@ydb.tech>2023-11-23 16:25:41 +0300
commitb07264456a2e8b5929901f258ad60399bb64678a (patch)
tree304e0fdaaa62556b6595f046f6dc4ad42949d025
parent207ac81618e05ade724a8a8193bc9125d466bd06 (diff)
downloadydb-b07264456a2e8b5929901f258ad60399bb64678a.tar.gz
continue states refactoring in session actor
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp124
1 files changed, 34 insertions, 90 deletions
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 934dfbd3b5..c87164a2c6 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -320,7 +320,12 @@ public:
Cleanup();
}
- void HandleReady(TEvKqp::TEvQueryRequest::TPtr& ev, const NActors::TActorContext& ctx) {
+ void Handle(TEvKqp::TEvQueryRequest::TPtr& ev) {
+ if (CurrentStateFunc() != &TThis::ReadyState) {
+ ReplyBusy(ev);
+ return;
+ }
+
ui64 proxyRequestId = ev->Cookie;
YQL_ENSURE(ev->Get()->GetSessionId() == SessionId,
"Invalid session, expected: " << SessionId << ", got: " << ev->Get()->GetSessionId());
@@ -415,7 +420,7 @@ public:
return ForwardRequest(ev);
case NKikimrKqp::QUERY_ACTION_TOPIC:
- return AddOffsetsToTransaction(ctx);
+ return AddOffsetsToTransaction();
}
QueryState->UpdateTempTablesState(TempTablesState);
@@ -423,7 +428,7 @@ public:
CompileQuery();
}
- void AddOffsetsToTransaction(const NActors::TActorContext& ctx) {
+ void AddOffsetsToTransaction() {
YQL_ENSURE(QueryState);
if (!PrepareQueryTransaction()) {
return;
@@ -434,7 +439,7 @@ public:
auto navigate = QueryState->BuildSchemeCacheNavigate();
Become(&TKqpSessionActor::TopicOpsState);
- ctx.Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()));
+ Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()));
}
void CompileQuery() {
@@ -442,11 +447,7 @@ public:
auto ev = QueryState->BuildCompileRequest(CompilationCookie);
LOG_D("Sending CompileQuery request");
Send(MakeKqpCompileServiceID(SelfId().NodeId()), ev.release(), 0, QueryState->QueryId);
- Become(&TKqpSessionActor::CompileState);
- }
-
- void HandleCompile(TEvKqp::TEvQueryRequest::TPtr& ev) {
- ReplyBusy(ev);
+ Become(&TKqpSessionActor::ExecuteState);
}
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
@@ -474,7 +475,6 @@ public:
return;
}
-
YQL_ENSURE(QueryState);
TTimerGuard timer(this);
@@ -1085,10 +1085,6 @@ public:
void HandleNoop(T&) {
}
- void HandleExecute(TEvKqp::TEvQueryRequest::TPtr& ev) {
- ReplyBusy(ev);
- }
-
bool MergeLocksWithTxResult(const NKikimrKqp::TExecuterTxResult& result) {
if (result.HasLocks()) {
auto& txCtx = QueryState->TxCtx;
@@ -1272,22 +1268,12 @@ public:
msg.GetStatusCode(),
"Request timeout exceeded");
Send(ExecuterId, abortEv.Release(), IEventHandle::FlagTrackDelivery);
+ } else {
+ const auto& issues = ev->Get()->GetIssues();
+ ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), logMsg, MessageFromIssues(issues));
}
}
- void HandleCompile(TEvKqp::TEvAbortExecution::TPtr& ev) {
- auto& msg = ev->Get()->Record;
-
- TString logMsg = TStringBuilder() << "got TEvAbortExecution in " << CurrentStateFuncName();
- LOG_I(logMsg << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()));
-
- const auto& issues = ev->Get()->GetIssues();
-
- YQL_ENSURE(!ExecuterId);
-
- ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), logMsg, MessageFromIssues(issues));
- }
-
void CollectSystemViewQueryStats(const NKqpProto::TKqpStatsQuery* stats, TDuration queryDuration,
const TString& database, ui64 requestUnits)
{
@@ -1668,13 +1654,6 @@ public:
CleanupAndPassAway();
}
- void HandleCompile(TEvKqp::TEvCloseSessionRequest::TPtr&) {
- YQL_ENSURE(QueryState);
- ReplyQueryError(Ydb::StatusIds::BAD_SESSION,
- "Request cancelled due to explicit session close request");
- Counters->ReportSessionActorClosedRequest(Settings.DbCounters);
- }
-
void HandleExecute(TEvKqp::TEvCloseSessionRequest::TPtr&) {
YQL_ENSURE(QueryState);
QueryState->KeepSession = false;
@@ -1740,10 +1719,6 @@ public:
}
}
- void HandleCleanup(TEvKqp::TEvQueryRequest::TPtr& ev) {
- ReplyBusy(ev);
- }
-
void CleanupAndPassAway() {
Cleanup(true);
}
@@ -1921,9 +1896,6 @@ public:
FillTxInfo(response);
ExecuterId = TActorId{};
- if (CurrentStateFunc() == &TThis::CompileState && ydbStatus == Ydb::StatusIds::TIMEOUT) {
- FillSystemViewQueryStats(nullptr);
- }
Cleanup(IsFatalError(ydbStatus));
}
@@ -1960,12 +1932,13 @@ public:
STFUNC(ReadyState) {
try {
switch (ev->GetTypeRewrite()) {
- HFunc(TEvKqp::TEvQueryRequest, HandleReady);
-
- hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle);
- hFunc(TEvKqp::TEvCloseSessionRequest, HandleReady);
+ // common event handles for all states.
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
+ hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle);
+ hFunc(TEvKqp::TEvQueryRequest, Handle);
+
+ hFunc(TEvKqp::TEvCloseSessionRequest, HandleReady);
hFunc(TEvKqp::TEvCancelQueryRequest, Handle);
// forgotten messages from previous aborted request
@@ -1985,55 +1958,29 @@ public:
}
}
- STATEFN(CompileState) {
+ STATEFN(ExecuteState) {
try {
switch (ev->GetTypeRewrite()) {
- hFunc(TEvKqp::TEvQueryRequest, HandleCompile);
- hFunc(TEvKqp::TEvCompileResponse, Handle);
-
- hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle);
- hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleCompile);
- hFunc(TEvKqp::TEvCloseSessionRequest, HandleCompile);
+ // common event handles for all states.
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
- hFunc(NGRpcService::TEvClientLost, HandleClientLost);
- hFunc(TEvKqp::TEvCancelQueryRequest, Handle);
-
- // forgotten messages from previous aborted request
- hFunc(TEvKqpExecuter::TEvTxResponse, HandleNoop);
- hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
- hFunc(TEvents::TEvUndelivered, HandleNoop);
- default:
- UnexpectedEvent("CompileState", ev);
- }
- } catch (const TRequestFail& ex) {
- ReplyQueryError(ex.Status, ex.what(), ex.Issues);
- } catch (const yexception& ex) {
- InternalError(ex.what());
- }
- }
+ hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle);
+ hFunc(TEvKqp::TEvQueryRequest, Handle);
- STATEFN(ExecuteState) {
- try {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvKqp::TEvQueryRequest, HandleExecute);
hFunc(TEvKqpExecuter::TEvTxResponse, HandleExecute);
hFunc(TEvKqpExecuter::TEvStreamData, HandleExecute);
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleExecute);
hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleExecute);
- hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle);
hFunc(TEvKqp::TEvCloseSessionRequest, HandleExecute);
- hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
- hFunc(TEvKqp::TEvContinueShutdown, Handle);
hFunc(NGRpcService::TEvClientLost, HandleClientLost);
hFunc(TEvKqp::TEvCancelQueryRequest, Handle);
// forgotten messages from previous aborted request
hFunc(TEvKqp::TEvCompileResponse, Handle);
- hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop);
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
hFunc(TEvents::TEvUndelivered, HandleNoop);
// always come from WorkerActor
@@ -2052,13 +1999,15 @@ public:
STATEFN(CleanupState) {
try {
switch (ev->GetTypeRewrite()) {
- hFunc(TEvKqp::TEvQueryRequest, HandleCleanup);
+ // common event handles for all states.
+ hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
+ hFunc(TEvKqp::TEvContinueShutdown, Handle);
+ hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle);
+ hFunc(TEvKqp::TEvQueryRequest, Handle);
+
hFunc(TEvKqpExecuter::TEvTxResponse, HandleCleanup);
- hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle);
hFunc(TEvKqp::TEvCloseSessionRequest, HandleCleanup);
- hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
- hFunc(TEvKqp::TEvContinueShutdown, Handle);
hFunc(NGRpcService::TEvClientLost, HandleNoop);
hFunc(TEvKqp::TEvCancelQueryRequest, HandleNoop);
@@ -2090,15 +2039,16 @@ public:
STATEFN(TopicOpsState) {
try {
switch (ev->GetTypeRewrite()) {
- hFunc(TEvKqp::TEvQueryRequest, HandleTopicOps);
-
+ // common event handles for all states.
+ hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
+ hFunc(TEvKqp::TEvContinueShutdown, Handle);
hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle);
+ hFunc(TEvKqp::TEvQueryRequest, Handle);
+
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleTopicOps);
hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, HandleTopicOps);
hFunc(TEvKqp::TEvCloseSessionRequest, HandleTopicOps);
- hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
- hFunc(TEvKqp::TEvContinueShutdown, Handle);
default:
UnexpectedEvent("TopicOpsState", ev);
}
@@ -2119,8 +2069,6 @@ private:
return "ExecuteState";
} else if (func == &TThis::TopicOpsState) {
return "TopicOpsState";
- } else if (func == &TThis::CompileState) {
- return "CompileState";
} else if (func == &TThis::CleanupState) {
return "CleanupState";
} else {
@@ -2142,10 +2090,6 @@ private:
}
}
- void HandleTopicOps(TEvKqp::TEvQueryRequest::TPtr& ev) {
- ReplyBusy(ev);
- }
-
void HandleTopicOps(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
YQL_ENSURE(ev->Get()->Request);
if (ev->Get()->Request->Cookie < QueryId) {