diff options
author | gvit <gvit@ydb.tech> | 2023-11-21 16:10:53 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-11-21 18:15:43 +0300 |
commit | 06be6a741cec2831db22ba8ee4f62b3160fc08b3 (patch) | |
tree | d13c2745d7256c6802f688bfebdd8ed458e42426 | |
parent | 99c4a7155a930ab632c10d27b5cc62f40c49508a (diff) | |
download | ydb-06be6a741cec2831db22ba8ee4f62b3160fc08b3.tar.gz |
handle create snapshot response in all states KIKIMR-19901
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_snapshot_manager.h | 11 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 19 |
3 files changed, 32 insertions, 12 deletions
diff --git a/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp b/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp index a318689c59..cde30702a3 100644 --- a/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp +++ b/ydb/core/kqp/rm_service/kqp_snapshot_manager.cpp @@ -53,6 +53,7 @@ private: Tables = ev->Get()->Tables; MvccSnapshot = ev->Get()->MvccSnapshot; Orbit = std::move(ev->Get()->Orbit); + Cookie = ev->Get()->Cookie; LOG_D("KqpSnapshotManager: got snapshot request from " << ClientActorId); @@ -117,7 +118,8 @@ private: LOG_D("KqpSnapshotManager: snapshot: " << Snapshot << " acquired"); bool sent = Send(ClientActorId, new TEvKqpSnapshot::TEvCreateSnapshotResponse( - Snapshot, NKikimrIssues::TStatusIds::SUCCESS, /* issues */ {}, std::move(Orbit))); + Snapshot, NKikimrIssues::TStatusIds::SUCCESS, /* issues */ {}, std::move(Orbit)), + 0, Cookie); Y_DEBUG_ABORT_UNLESS(sent); PassAway(); @@ -151,7 +153,8 @@ private: issues.AddIssue("stale propose TEvProposeTransactionStatus in cleanup state"); Send(ClientActorId, new TEvKqpSnapshot::TEvCreateSnapshotResponse( - IKqpGateway::TKqpSnapshot::InvalidSnapshot, NKikimrIssues::TStatusIds::TIMEOUT, std::move(issues), std::move(Orbit))); + IKqpGateway::TKqpSnapshot::InvalidSnapshot, NKikimrIssues::TStatusIds::TIMEOUT, std::move(issues), std::move(Orbit)), + 0, Cookie); PassAway(); } @@ -170,7 +173,8 @@ private: LOG_D("KqpSnapshotManager: snapshot " << Snapshot.Step << ":" << Snapshot.TxId << " created"); bool sent = Send(ClientActorId, new TEvKqpSnapshot::TEvCreateSnapshotResponse( - Snapshot, NKikimrIssues::TStatusIds::SUCCESS, /* issues */ {}, std::move(Orbit))); + Snapshot, NKikimrIssues::TStatusIds::SUCCESS, /* issues */ {}, std::move(Orbit)), + 0, Cookie); Y_DEBUG_ABORT_UNLESS(sent); Become(&TThis::StateRefreshing); @@ -271,7 +275,8 @@ private: void ReplyErrorAndDie(NKikimrIssues::TStatusIds::EStatusCode status, NYql::TIssues&& issues) { if (CurrentStateFunc() == &TThis::StateAwaitCreation || CurrentStateFunc() == &TThis::StateAwaitAcquireResult) { Send(ClientActorId, new TEvKqpSnapshot::TEvCreateSnapshotResponse( - IKqpGateway::TKqpSnapshot::InvalidSnapshot, status, std::move(issues), std::move(Orbit))); + IKqpGateway::TKqpSnapshot::InvalidSnapshot, status, std::move(issues), std::move(Orbit)), + 0, Cookie); } else { SendDiscard(); } @@ -284,6 +289,7 @@ private: TActorId ClientActorId; IKqpGateway::TKqpSnapshot Snapshot; NLWTrace::TOrbit Orbit; + ui64 Cookie = 0; bool MvccSnapshot = false; diff --git a/ydb/core/kqp/rm_service/kqp_snapshot_manager.h b/ydb/core/kqp/rm_service/kqp_snapshot_manager.h index cfbacd21e2..be307d8dc1 100644 --- a/ydb/core/kqp/rm_service/kqp_snapshot_manager.h +++ b/ydb/core/kqp/rm_service/kqp_snapshot_manager.h @@ -13,19 +13,22 @@ struct TEvKqpSnapshot { struct TEvCreateSnapshotRequest : public TEventLocal<TEvCreateSnapshotRequest, TKqpSnapshotEvents::EvCreateSnapshotRequest> { - explicit TEvCreateSnapshotRequest(const TVector<TString>& tables, NLWTrace::TOrbit&& orbit = {}) + explicit TEvCreateSnapshotRequest(const TVector<TString>& tables, ui64 cookie, NLWTrace::TOrbit&& orbit = {}) : Tables(tables) , MvccSnapshot(false) - , Orbit(std::move(orbit)) {} + , Orbit(std::move(orbit)) + , Cookie(cookie) {} - explicit TEvCreateSnapshotRequest(NLWTrace::TOrbit&& orbit = {}) + explicit TEvCreateSnapshotRequest(ui64 cookie, NLWTrace::TOrbit&& orbit = {}) : Tables({}) , MvccSnapshot(true) - , Orbit(std::move(orbit)) {} + , Orbit(std::move(orbit)) + , Cookie(cookie) {} const TVector<TString> Tables; const bool MvccSnapshot; NLWTrace::TOrbit Orbit; + ui64 Cookie; }; struct TEvCreateSnapshotResponse : public TEventLocal<TEvCreateSnapshotResponse, diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 9c58998b22..490665ff03 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -533,7 +533,7 @@ public: auto* snapMgr = CreateKqpSnapshotManager(Settings.Database, timeout); auto snapMgrActorId = RegisterWithSameMailbox(snapMgr); - auto ev = std::make_unique<TEvKqpSnapshot::TEvCreateSnapshotRequest>(QueryState->PreparedQuery->GetQueryTables(), std::move(QueryState->Orbit)); + auto ev = std::make_unique<TEvKqpSnapshot::TEvCreateSnapshotRequest>(QueryState->PreparedQuery->GetQueryTables(), QueryId, std::move(QueryState->Orbit)); Send(snapMgrActorId, ev.release()); QueryState->TxCtx->SnapshotHandle.ManagingActor = snapMgrActorId; @@ -552,7 +552,7 @@ public: auto* snapMgr = CreateKqpSnapshotManager(Settings.Database, timeout); auto snapMgrActorId = RegisterWithSameMailbox(snapMgr); - auto ev = std::make_unique<TEvKqpSnapshot::TEvCreateSnapshotRequest>(std::move(QueryState->Orbit)); + auto ev = std::make_unique<TEvKqpSnapshot::TEvCreateSnapshotRequest>(QueryId, std::move(QueryState->Orbit)); Send(snapMgrActorId, ev.release()); } @@ -570,8 +570,13 @@ public: } } - void HandleExecute(TEvKqpSnapshot::TEvCreateSnapshotResponse::TPtr& ev) { + void Handle(TEvKqpSnapshot::TEvCreateSnapshotResponse::TPtr& ev) { + if (ev->Cookie < QueryId || CurrentStateFunc() != &TThis::ExecuteState) { + return; + } + TTimerGuard timer(this); + auto *response = ev->Get(); if (QueryState) { @@ -2018,6 +2023,7 @@ public: switch (ev->GetTypeRewrite()) { HFunc(TEvKqp::TEvQueryRequest, HandleReady); + hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle); hFunc(TEvKqp::TEvCloseSessionRequest, HandleReady); hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); @@ -2046,6 +2052,7 @@ public: hFunc(TEvKqp::TEvQueryRequest, HandleCompile); hFunc(TEvKqp::TEvCompileResponse, Handle); + hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle); hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleCompile); hFunc(TEvKqp::TEvCloseSessionRequest, HandleCompile); hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); @@ -2077,7 +2084,7 @@ public: hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleExecute); hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleExecute); - hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, HandleExecute); + hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle); hFunc(TEvKqp::TEvCloseSessionRequest, HandleExecute); hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); @@ -2109,6 +2116,7 @@ public: hFunc(TEvKqp::TEvQueryRequest, HandleCleanup); hFunc(TEvKqpExecuter::TEvTxResponse, HandleCleanup); + hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle); hFunc(TEvKqp::TEvCloseSessionRequest, HandleCleanup); hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); @@ -2136,6 +2144,7 @@ public: switch (ev->GetTypeRewrite()) { hFunc(TEvents::TEvGone, HandleFinalCleanup); hFunc(TEvents::TEvUndelivered, HandleNoop); + hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle); } } @@ -2143,6 +2152,7 @@ public: switch (ev->GetTypeRewrite()) { hFunc(TEvents::TEvUndelivered, HandleWaitStats) hFunc(TEvKqpExecuter::TEvTxResponse, HandleWaitStats); + hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle); } } @@ -2151,6 +2161,7 @@ public: switch (ev->GetTypeRewrite()) { hFunc(TEvKqp::TEvQueryRequest, HandleTopicOps); + hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle); hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleTopicOps); hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, HandleTopicOps); |