diff options
author | Vladislav Kuznetsov <va.kuznecov@physics.msu.ru> | 2022-05-25 23:45:02 +0300 |
---|---|---|
committer | Vladislav Kuznetsov <va.kuznecov@physics.msu.ru> | 2022-05-25 23:45:02 +0300 |
commit | 55377746cfa5edfc6be95ea6fa2242d7e811d69d (patch) | |
tree | eca9334918c60a23bb5f9902ed007ec7315683bc | |
parent | a8c3518112206fa470df64ca692d0e4a3de878f8 (diff) | |
download | ydb-55377746cfa5edfc6be95ea6fa2242d7e811d69d.tar.gz |
Kill WorkerActor after every request KIKIMR-11938
ref:a883d313499794fe3fec8a9aed1fce7768c47a45
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 41 |
1 files changed, 33 insertions, 8 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index 16a678d06de..9a2c5d0c4e6 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -92,6 +92,7 @@ struct TKqpCleanupCtx { ui64 AbortedTransactionsCount = 0; ui64 TransactionsToBeAborted = 0; std::vector<IKqpGateway::TExecPhysicalRequest> ExecuterAbortRequests; + bool IsWaitingForWorkerToClose = false; bool Final = false; TInstant Start = TInstant::Now(); }; @@ -176,15 +177,15 @@ public: QueryState = std::make_unique<TKqpQueryState>(); } - template<class T> - void ForwardRequest(T& ev) { + void ForwardRequest(TEvKqp::TEvQueryRequest::TPtr& ev) { if (!WorkerId) { - std::unique_ptr<IActor> workerActor(CreateKqpWorkerActor(Owner, SessionId, KqpSettings, Settings, + std::unique_ptr<IActor> workerActor(CreateKqpWorkerActor(SelfId(), SessionId, KqpSettings, Settings, ModuleResolverState, Counters)); WorkerId = RegisterWithSameMailbox(workerActor.release()); } TlsActivationContext->Send(new IEventHandle(*WorkerId, SelfId(), ev->Release().Release(), ev->Flags, ev->Cookie, nullptr, std::move(ev->TraceId))); + Become(&TKqpSessionActor::ExecuteState); } void ForwardResponse(TEvKqp::TEvQueryResponse::TPtr& ev) { @@ -443,7 +444,7 @@ public: YQL_ENSURE(compileResult->PreparedQuery); const ui32 compiledVersion = compileResult->PreparedQuery->GetVersion(); YQL_ENSURE(compiledVersion == NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1, - "Invalid compiled version: " << compiledVersion); + "SessionActor can not execute OldEngine requests (invalid compiled version: " << compiledVersion << ")"); QueryState->CompileResult = compileResult; QueryState->CompileStats.Swap(&ev->Get()->Stats); @@ -1660,9 +1661,20 @@ public: ExplicitTransactions.Clear(); } - if (TransactionsToBeAborted.size()) { + if (WorkerId) { + auto ev = std::make_unique<TEvKqp::TEvCloseSessionRequest>(); + ev->Record.MutableRequest()->SetSessionId(SessionId); + Send(*WorkerId, ev.release()); + WorkerId.reset(); + YQL_ENSURE(!CleanupCtx); CleanupCtx.reset(new TKqpCleanupCtx); + CleanupCtx->IsWaitingForWorkerToClose = true; + } + + if (TransactionsToBeAborted.size()) { + if (!CleanupCtx) + CleanupCtx.reset(new TKqpCleanupCtx); CleanupCtx->Final = isFinal; CleanupCtx->AbortedTransactionsCount = 0; CleanupCtx->TransactionsToBeAborted = TransactionsToBeAborted.size(); @@ -1679,6 +1691,13 @@ public: } } + void HandleCleanup(TEvKqp::TEvCloseSessionResponse::TPtr&) { + CleanupCtx->IsWaitingForWorkerToClose = false; + if (CleanupCtx->AbortedTransactionsCount == CleanupCtx->TransactionsToBeAborted) { + EndCleanup(CleanupCtx->Final); + } + } + void HandleCleanup(TEvKqpExecuter::TEvTxResponse::TPtr& ev) { auto& response = ev->Get()->Record.GetResponse(); if (response.GetStatus() != Ydb::StatusIds::SUCCESS) { @@ -1695,7 +1714,8 @@ public: auto& txCtx = TransactionsToBeAborted[CleanupCtx->AbortedTransactionsCount]; SendRollbackRequest(txCtx.Get()); } else { - EndCleanup(CleanupCtx->Final); + if (!CleanupCtx->IsWaitingForWorkerToClose) + EndCleanup(CleanupCtx->Final); } } @@ -1784,8 +1804,6 @@ public: hFunc(TEvKqp::TEvCloseSessionRequest, HandleReady); hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); - - hFunc(TEvKqp::TEvQueryResponse, ForwardResponse); default: UnexpectedEvent("ReadyState", ev); } @@ -1835,6 +1853,9 @@ public: hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); hFunc(TEvKqp::TEvIdleTimeout, HandleNoop); + + // always come from WorkerActor + hFunc(TEvKqp::TEvQueryResponse, ForwardResponse); default: UnexpectedEvent("ExecuteState", ev); } @@ -1859,6 +1880,9 @@ public: hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); hFunc(TEvKqp::TEvIdleTimeout, HandleNoop); + + // always come from WorkerActor + hFunc(TEvKqp::TEvCloseSessionResponse, HandleCleanup); default: UnexpectedEvent("CleanupState", ev); } @@ -1866,6 +1890,7 @@ public: InternalError(ex.what()); } } + private: void UnexpectedEvent(const TString& state, TAutoPtr<NActors::IEventHandle>& ev) { InternalError(TStringBuilder() << "TKqpSessionActor in state " << state << " recieve unexpected event " << |