aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-11-21 16:10:53 +0300
committergvit <gvit@ydb.tech>2023-11-21 18:15:43 +0300
commit06be6a741cec2831db22ba8ee4f62b3160fc08b3 (patch)
treed13c2745d7256c6802f688bfebdd8ed458e42426
parent99c4a7155a930ab632c10d27b5cc62f40c49508a (diff)
downloadydb-06be6a741cec2831db22ba8ee4f62b3160fc08b3.tar.gz
handle create snapshot response in all states KIKIMR-19901
-rw-r--r--ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp14
-rw-r--r--ydb/core/kqp/rm_service/kqp_snapshot_manager.h11
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp19
3 files changed, 32 insertions, 12 deletions
diff --git a/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp b/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp
index a318689c59..cde30702a3 100644
--- a/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp
+++ b/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp
@@ -53,6 +53,7 @@ private:
Tables = ev->Get()->Tables;
MvccSnapshot = ev->Get()->MvccSnapshot;
Orbit = std::move(ev->Get()->Orbit);
+ Cookie = ev->Get()->Cookie;
LOG_D("KqpSnapshotManager: got snapshot request from " << ClientActorId);
@@ -117,7 +118,8 @@ private:
LOG_D("KqpSnapshotManager: snapshot: " << Snapshot << " acquired");
bool sent = Send(ClientActorId, new TEvKqpSnapshot::TEvCreateSnapshotResponse(
- Snapshot, NKikimrIssues::TStatusIds::SUCCESS, /* issues */ {}, std::move(Orbit)));
+ Snapshot, NKikimrIssues::TStatusIds::SUCCESS, /* issues */ {}, std::move(Orbit)),
+ 0, Cookie);
Y_DEBUG_ABORT_UNLESS(sent);
PassAway();
@@ -151,7 +153,8 @@ private:
issues.AddIssue("stale propose TEvProposeTransactionStatus in cleanup state");
Send(ClientActorId, new TEvKqpSnapshot::TEvCreateSnapshotResponse(
- IKqpGateway::TKqpSnapshot::InvalidSnapshot, NKikimrIssues::TStatusIds::TIMEOUT, std::move(issues), std::move(Orbit)));
+ IKqpGateway::TKqpSnapshot::InvalidSnapshot, NKikimrIssues::TStatusIds::TIMEOUT, std::move(issues), std::move(Orbit)),
+ 0, Cookie);
PassAway();
}
@@ -170,7 +173,8 @@ private:
LOG_D("KqpSnapshotManager: snapshot " << Snapshot.Step << ":" << Snapshot.TxId << " created");
bool sent = Send(ClientActorId, new TEvKqpSnapshot::TEvCreateSnapshotResponse(
- Snapshot, NKikimrIssues::TStatusIds::SUCCESS, /* issues */ {}, std::move(Orbit)));
+ Snapshot, NKikimrIssues::TStatusIds::SUCCESS, /* issues */ {}, std::move(Orbit)),
+ 0, Cookie);
Y_DEBUG_ABORT_UNLESS(sent);
Become(&TThis::StateRefreshing);
@@ -271,7 +275,8 @@ private:
void ReplyErrorAndDie(NKikimrIssues::TStatusIds::EStatusCode status, NYql::TIssues&& issues) {
if (CurrentStateFunc() == &TThis::StateAwaitCreation || CurrentStateFunc() == &TThis::StateAwaitAcquireResult) {
Send(ClientActorId, new TEvKqpSnapshot::TEvCreateSnapshotResponse(
- IKqpGateway::TKqpSnapshot::InvalidSnapshot, status, std::move(issues), std::move(Orbit)));
+ IKqpGateway::TKqpSnapshot::InvalidSnapshot, status, std::move(issues), std::move(Orbit)),
+ 0, Cookie);
} else {
SendDiscard();
}
@@ -284,6 +289,7 @@ private:
TActorId ClientActorId;
IKqpGateway::TKqpSnapshot Snapshot;
NLWTrace::TOrbit Orbit;
+ ui64 Cookie = 0;
bool MvccSnapshot = false;
diff --git a/ydb/core/kqp/rm_service/kqp_snapshot_manager.h b/ydb/core/kqp/rm_service/kqp_snapshot_manager.h
index cfbacd21e2..be307d8dc1 100644
--- a/ydb/core/kqp/rm_service/kqp_snapshot_manager.h
+++ b/ydb/core/kqp/rm_service/kqp_snapshot_manager.h
@@ -13,19 +13,22 @@ struct TEvKqpSnapshot {
struct TEvCreateSnapshotRequest : public TEventLocal<TEvCreateSnapshotRequest,
TKqpSnapshotEvents::EvCreateSnapshotRequest>
{
- explicit TEvCreateSnapshotRequest(const TVector<TString>& tables, NLWTrace::TOrbit&& orbit = {})
+ explicit TEvCreateSnapshotRequest(const TVector<TString>& tables, ui64 cookie, NLWTrace::TOrbit&& orbit = {})
: Tables(tables)
, MvccSnapshot(false)
- , Orbit(std::move(orbit)) {}
+ , Orbit(std::move(orbit))
+ , Cookie(cookie) {}
- explicit TEvCreateSnapshotRequest(NLWTrace::TOrbit&& orbit = {})
+ explicit TEvCreateSnapshotRequest(ui64 cookie, NLWTrace::TOrbit&& orbit = {})
: Tables({})
, MvccSnapshot(true)
- , Orbit(std::move(orbit)) {}
+ , Orbit(std::move(orbit))
+ , Cookie(cookie) {}
const TVector<TString> Tables;
const bool MvccSnapshot;
NLWTrace::TOrbit Orbit;
+ ui64 Cookie;
};
struct TEvCreateSnapshotResponse : public TEventLocal<TEvCreateSnapshotResponse,
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 9c58998b22..490665ff03 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -533,7 +533,7 @@ public:
auto* snapMgr = CreateKqpSnapshotManager(Settings.Database, timeout);
auto snapMgrActorId = RegisterWithSameMailbox(snapMgr);
- auto ev = std::make_unique<TEvKqpSnapshot::TEvCreateSnapshotRequest>(QueryState->PreparedQuery->GetQueryTables(), std::move(QueryState->Orbit));
+ auto ev = std::make_unique<TEvKqpSnapshot::TEvCreateSnapshotRequest>(QueryState->PreparedQuery->GetQueryTables(), QueryId, std::move(QueryState->Orbit));
Send(snapMgrActorId, ev.release());
QueryState->TxCtx->SnapshotHandle.ManagingActor = snapMgrActorId;
@@ -552,7 +552,7 @@ public:
auto* snapMgr = CreateKqpSnapshotManager(Settings.Database, timeout);
auto snapMgrActorId = RegisterWithSameMailbox(snapMgr);
- auto ev = std::make_unique<TEvKqpSnapshot::TEvCreateSnapshotRequest>(std::move(QueryState->Orbit));
+ auto ev = std::make_unique<TEvKqpSnapshot::TEvCreateSnapshotRequest>(QueryId, std::move(QueryState->Orbit));
Send(snapMgrActorId, ev.release());
}
@@ -570,8 +570,13 @@ public:
}
}
- void HandleExecute(TEvKqpSnapshot::TEvCreateSnapshotResponse::TPtr& ev) {
+ void Handle(TEvKqpSnapshot::TEvCreateSnapshotResponse::TPtr& ev) {
+ if (ev->Cookie < QueryId || CurrentStateFunc() != &TThis::ExecuteState) {
+ return;
+ }
+
TTimerGuard timer(this);
+
auto *response = ev->Get();
if (QueryState) {
@@ -2018,6 +2023,7 @@ public:
switch (ev->GetTypeRewrite()) {
HFunc(TEvKqp::TEvQueryRequest, HandleReady);
+ hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle);
hFunc(TEvKqp::TEvCloseSessionRequest, HandleReady);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
@@ -2046,6 +2052,7 @@ public:
hFunc(TEvKqp::TEvQueryRequest, HandleCompile);
hFunc(TEvKqp::TEvCompileResponse, Handle);
+ hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle);
hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleCompile);
hFunc(TEvKqp::TEvCloseSessionRequest, HandleCompile);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
@@ -2077,7 +2084,7 @@ public:
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleExecute);
hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleExecute);
- hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, HandleExecute);
+ hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle);
hFunc(TEvKqp::TEvCloseSessionRequest, HandleExecute);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
@@ -2109,6 +2116,7 @@ public:
hFunc(TEvKqp::TEvQueryRequest, HandleCleanup);
hFunc(TEvKqpExecuter::TEvTxResponse, HandleCleanup);
+ hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle);
hFunc(TEvKqp::TEvCloseSessionRequest, HandleCleanup);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
@@ -2136,6 +2144,7 @@ public:
switch (ev->GetTypeRewrite()) {
hFunc(TEvents::TEvGone, HandleFinalCleanup);
hFunc(TEvents::TEvUndelivered, HandleNoop);
+ hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle);
}
}
@@ -2143,6 +2152,7 @@ public:
switch (ev->GetTypeRewrite()) {
hFunc(TEvents::TEvUndelivered, HandleWaitStats)
hFunc(TEvKqpExecuter::TEvTxResponse, HandleWaitStats);
+ hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle);
}
}
@@ -2151,6 +2161,7 @@ public:
switch (ev->GetTypeRewrite()) {
hFunc(TEvKqp::TEvQueryRequest, HandleTopicOps);
+ hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle);
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleTopicOps);
hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, HandleTopicOps);