aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-11-24 18:16:34 +0300
committergvit <gvit@ydb.tech>2023-11-24 18:56:28 +0300
commite1c0089c3585434787e5aa7ef923b916f0807e0a (patch)
treed745f070223f64ed82c1fe1dc894b252e375bd10
parent34a1ed43d2baff7d536451887b2f9744d30dfd43 (diff)
downloadydb-e1c0089c3585434787e5aa7ef923b916f0807e0a.tar.gz
remove specific topic ops state, use execute state KIKIMR-19901
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp53
1 files changed, 19 insertions, 34 deletions
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 94620b6427..a2739662f1 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -438,7 +438,7 @@ public:
auto navigate = QueryState->BuildSchemeCacheNavigate();
- Become(&TKqpSessionActor::TopicOpsState);
+ Become(&TKqpSessionActor::ExecuteState);
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()));
}
@@ -453,11 +453,17 @@ public:
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
const auto* response = ev->Get();
YQL_ENSURE(response->Request);
+ YQL_ENSURE(QueryState);
// outdated response from scheme cache.
// ignoring that.
if (response->Request->Cookie < QueryId)
return;
+ if (QueryState->GetAction() == NKikimrKqp::QUERY_ACTION_TOPIC) {
+ ProcessTopicOps(ev);
+ return;
+ }
+
// table versions are not the same. need the query recompilation.
if (!QueryState->EnsureTableVersions(*response)) {
auto ev = QueryState->BuildReCompileRequest(CompilationCookie);
@@ -1950,6 +1956,8 @@ public:
hFunc(TEvents::TEvUndelivered, HandleNoop);
// message from KQP proxy in case of our reply just after kqp proxy timer tick
hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleNoop);
+ hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, HandleNoop);
+
default:
UnexpectedEvent("ReadyState", ev);
}
@@ -1968,6 +1976,7 @@ public:
hFunc(TEvKqp::TEvContinueShutdown, Handle);
hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle);
hFunc(TEvKqp::TEvQueryRequest, Handle);
+ hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, Handle);
hFunc(TEvKqpExecuter::TEvTxResponse, HandleExecute);
@@ -2018,6 +2027,7 @@ public:
hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleNoop);
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop);
hFunc(TEvents::TEvUndelivered, HandleNoop);
+ hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, HandleNoop);
// always come from WorkerActor
hFunc(TEvKqp::TEvCloseSessionResponse, HandleCleanup);
@@ -2038,29 +2048,6 @@ public:
}
}
- STATEFN(TopicOpsState) {
- try {
- switch (ev->GetTypeRewrite()) {
- // common event handles for all states.
- hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
- hFunc(TEvKqp::TEvContinueShutdown, Handle);
- hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle);
- hFunc(TEvKqp::TEvQueryRequest, Handle);
-
- hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleTopicOps);
- hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, HandleTopicOps);
-
- hFunc(TEvKqp::TEvCloseSessionRequest, HandleTopicOps);
- default:
- UnexpectedEvent("TopicOpsState", ev);
- }
- } catch (const TRequestFail& ex) {
- ReplyQueryError(ex.Status, ex.what(), ex.Issues);
- } catch (const yexception& ex) {
- InternalError(ex.what());
- }
- }
-
private:
TString CurrentStateFuncName() const {
@@ -2069,8 +2056,6 @@ private:
return "ReadyState";
} else if (func == &TThis::ExecuteState) {
return "ExecuteState";
- } else if (func == &TThis::TopicOpsState) {
- return "TopicOpsState";
} else if (func == &TThis::CleanupState) {
return "CleanupState";
} else {
@@ -2092,7 +2077,7 @@ private:
}
}
- void HandleTopicOps(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ void ProcessTopicOps(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
YQL_ENSURE(ev->Get()->Request);
if (ev->Get()->Request->Cookie < QueryId) {
return;
@@ -2120,19 +2105,19 @@ private:
}
if (HasTopicWriteOperations() && !HasTopicWriteId()) {
- Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId);
+ Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId, 0, QueryState->QueryId);
} else {
ReplySuccess();
}
}
- void HandleTopicOps(TEvKqp::TEvCloseSessionRequest::TPtr&) {
- YQL_ENSURE(QueryState);
- ReplyQueryError(Ydb::StatusIds::BAD_SESSION, "Request cancelled due to explicit session close request");
- Counters->ReportSessionActorClosedRequest(Settings.DbCounters);
- }
+ void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) {
+ if (CurrentStateFunc() != &TThis::ExecuteState || ev->Cookie < QueryId) {
+ return;
+ }
- void HandleTopicOps(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) {
+ YQL_ENSURE(QueryState);
+ YQL_ENSURE(QueryState->GetAction() == NKikimrKqp::QUERY_ACTION_TOPIC);
SetTopicWriteId(NLongTxService::TLockHandle(ev->Get()->TxId, TActivationContext::ActorSystem()));
ReplySuccess();
}