diff options
author | gvit <gvit@ydb.tech> | 2023-11-24 18:16:34 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-11-24 18:56:28 +0300 |
commit | e1c0089c3585434787e5aa7ef923b916f0807e0a (patch) | |
tree | d745f070223f64ed82c1fe1dc894b252e375bd10 | |
parent | 34a1ed43d2baff7d536451887b2f9744d30dfd43 (diff) | |
download | ydb-e1c0089c3585434787e5aa7ef923b916f0807e0a.tar.gz |
remove specific topic ops state, use execute state KIKIMR-19901
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 53 |
1 files changed, 19 insertions, 34 deletions
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 94620b6427..a2739662f1 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -438,7 +438,7 @@ public: auto navigate = QueryState->BuildSchemeCacheNavigate(); - Become(&TKqpSessionActor::TopicOpsState); + Become(&TKqpSessionActor::ExecuteState); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release())); } @@ -453,11 +453,17 @@ public: void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { const auto* response = ev->Get(); YQL_ENSURE(response->Request); + YQL_ENSURE(QueryState); // outdated response from scheme cache. // ignoring that. if (response->Request->Cookie < QueryId) return; + if (QueryState->GetAction() == NKikimrKqp::QUERY_ACTION_TOPIC) { + ProcessTopicOps(ev); + return; + } + // table versions are not the same. need the query recompilation. if (!QueryState->EnsureTableVersions(*response)) { auto ev = QueryState->BuildReCompileRequest(CompilationCookie); @@ -1950,6 +1956,8 @@ public: hFunc(TEvents::TEvUndelivered, HandleNoop); // message from KQP proxy in case of our reply just after kqp proxy timer tick hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleNoop); + hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, HandleNoop); + default: UnexpectedEvent("ReadyState", ev); } @@ -1968,6 +1976,7 @@ public: hFunc(TEvKqp::TEvContinueShutdown, Handle); hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle); hFunc(TEvKqp::TEvQueryRequest, Handle); + hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, Handle); hFunc(TEvKqpExecuter::TEvTxResponse, HandleExecute); @@ -2018,6 +2027,7 @@ public: hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleNoop); hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop); hFunc(TEvents::TEvUndelivered, HandleNoop); + hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, HandleNoop); // always come from WorkerActor hFunc(TEvKqp::TEvCloseSessionResponse, HandleCleanup); @@ -2038,29 +2048,6 @@ public: } } - STATEFN(TopicOpsState) { - try { - switch (ev->GetTypeRewrite()) { - // common event handles for all states. - hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); - hFunc(TEvKqp::TEvContinueShutdown, Handle); - hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle); - hFunc(TEvKqp::TEvQueryRequest, Handle); - - hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleTopicOps); - hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, HandleTopicOps); - - hFunc(TEvKqp::TEvCloseSessionRequest, HandleTopicOps); - default: - UnexpectedEvent("TopicOpsState", ev); - } - } catch (const TRequestFail& ex) { - ReplyQueryError(ex.Status, ex.what(), ex.Issues); - } catch (const yexception& ex) { - InternalError(ex.what()); - } - } - private: TString CurrentStateFuncName() const { @@ -2069,8 +2056,6 @@ private: return "ReadyState"; } else if (func == &TThis::ExecuteState) { return "ExecuteState"; - } else if (func == &TThis::TopicOpsState) { - return "TopicOpsState"; } else if (func == &TThis::CleanupState) { return "CleanupState"; } else { @@ -2092,7 +2077,7 @@ private: } } - void HandleTopicOps(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + void ProcessTopicOps(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { YQL_ENSURE(ev->Get()->Request); if (ev->Get()->Request->Cookie < QueryId) { return; @@ -2120,19 +2105,19 @@ private: } if (HasTopicWriteOperations() && !HasTopicWriteId()) { - Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId); + Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId, 0, QueryState->QueryId); } else { ReplySuccess(); } } - void HandleTopicOps(TEvKqp::TEvCloseSessionRequest::TPtr&) { - YQL_ENSURE(QueryState); - ReplyQueryError(Ydb::StatusIds::BAD_SESSION, "Request cancelled due to explicit session close request"); - Counters->ReportSessionActorClosedRequest(Settings.DbCounters); - } + void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) { + if (CurrentStateFunc() != &TThis::ExecuteState || ev->Cookie < QueryId) { + return; + } - void HandleTopicOps(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) { + YQL_ENSURE(QueryState); + YQL_ENSURE(QueryState->GetAction() == NKikimrKqp::QUERY_ACTION_TOPIC); SetTopicWriteId(NLongTxService::TLockHandle(ev->Get()->TxId, TActivationContext::ActorSystem())); ReplySuccess(); } |