aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVladislav Kuznetsov <va.kuznecov@physics.msu.ru>2022-05-25 23:45:02 +0300
committerVladislav Kuznetsov <va.kuznecov@physics.msu.ru>2022-05-25 23:45:02 +0300
commit55377746cfa5edfc6be95ea6fa2242d7e811d69d (patch)
treeeca9334918c60a23bb5f9902ed007ec7315683bc
parenta8c3518112206fa470df64ca692d0e4a3de878f8 (diff)
downloadydb-55377746cfa5edfc6be95ea6fa2242d7e811d69d.tar.gz
Kill WorkerActor after every request KIKIMR-11938
ref:a883d313499794fe3fec8a9aed1fce7768c47a45
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp41
1 files changed, 33 insertions, 8 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index 16a678d06de..9a2c5d0c4e6 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -92,6 +92,7 @@ struct TKqpCleanupCtx {
ui64 AbortedTransactionsCount = 0;
ui64 TransactionsToBeAborted = 0;
std::vector<IKqpGateway::TExecPhysicalRequest> ExecuterAbortRequests;
+ bool IsWaitingForWorkerToClose = false;
bool Final = false;
TInstant Start = TInstant::Now();
};
@@ -176,15 +177,15 @@ public:
QueryState = std::make_unique<TKqpQueryState>();
}
- template<class T>
- void ForwardRequest(T& ev) {
+ void ForwardRequest(TEvKqp::TEvQueryRequest::TPtr& ev) {
if (!WorkerId) {
- std::unique_ptr<IActor> workerActor(CreateKqpWorkerActor(Owner, SessionId, KqpSettings, Settings,
+ std::unique_ptr<IActor> workerActor(CreateKqpWorkerActor(SelfId(), SessionId, KqpSettings, Settings,
ModuleResolverState, Counters));
WorkerId = RegisterWithSameMailbox(workerActor.release());
}
TlsActivationContext->Send(new IEventHandle(*WorkerId, SelfId(), ev->Release().Release(), ev->Flags, ev->Cookie,
nullptr, std::move(ev->TraceId)));
+ Become(&TKqpSessionActor::ExecuteState);
}
void ForwardResponse(TEvKqp::TEvQueryResponse::TPtr& ev) {
@@ -443,7 +444,7 @@ public:
YQL_ENSURE(compileResult->PreparedQuery);
const ui32 compiledVersion = compileResult->PreparedQuery->GetVersion();
YQL_ENSURE(compiledVersion == NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1,
- "Invalid compiled version: " << compiledVersion);
+ "SessionActor can not execute OldEngine requests (invalid compiled version: " << compiledVersion << ")");
QueryState->CompileResult = compileResult;
QueryState->CompileStats.Swap(&ev->Get()->Stats);
@@ -1660,9 +1661,20 @@ public:
ExplicitTransactions.Clear();
}
- if (TransactionsToBeAborted.size()) {
+ if (WorkerId) {
+ auto ev = std::make_unique<TEvKqp::TEvCloseSessionRequest>();
+ ev->Record.MutableRequest()->SetSessionId(SessionId);
+ Send(*WorkerId, ev.release());
+ WorkerId.reset();
+
YQL_ENSURE(!CleanupCtx);
CleanupCtx.reset(new TKqpCleanupCtx);
+ CleanupCtx->IsWaitingForWorkerToClose = true;
+ }
+
+ if (TransactionsToBeAborted.size()) {
+ if (!CleanupCtx)
+ CleanupCtx.reset(new TKqpCleanupCtx);
CleanupCtx->Final = isFinal;
CleanupCtx->AbortedTransactionsCount = 0;
CleanupCtx->TransactionsToBeAborted = TransactionsToBeAborted.size();
@@ -1679,6 +1691,13 @@ public:
}
}
+ void HandleCleanup(TEvKqp::TEvCloseSessionResponse::TPtr&) {
+ CleanupCtx->IsWaitingForWorkerToClose = false;
+ if (CleanupCtx->AbortedTransactionsCount == CleanupCtx->TransactionsToBeAborted) {
+ EndCleanup(CleanupCtx->Final);
+ }
+ }
+
void HandleCleanup(TEvKqpExecuter::TEvTxResponse::TPtr& ev) {
auto& response = ev->Get()->Record.GetResponse();
if (response.GetStatus() != Ydb::StatusIds::SUCCESS) {
@@ -1695,7 +1714,8 @@ public:
auto& txCtx = TransactionsToBeAborted[CleanupCtx->AbortedTransactionsCount];
SendRollbackRequest(txCtx.Get());
} else {
- EndCleanup(CleanupCtx->Final);
+ if (!CleanupCtx->IsWaitingForWorkerToClose)
+ EndCleanup(CleanupCtx->Final);
}
}
@@ -1784,8 +1804,6 @@ public:
hFunc(TEvKqp::TEvCloseSessionRequest, HandleReady);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
-
- hFunc(TEvKqp::TEvQueryResponse, ForwardResponse);
default:
UnexpectedEvent("ReadyState", ev);
}
@@ -1835,6 +1853,9 @@ public:
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
hFunc(TEvKqp::TEvIdleTimeout, HandleNoop);
+
+ // always come from WorkerActor
+ hFunc(TEvKqp::TEvQueryResponse, ForwardResponse);
default:
UnexpectedEvent("ExecuteState", ev);
}
@@ -1859,6 +1880,9 @@ public:
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
hFunc(TEvKqp::TEvIdleTimeout, HandleNoop);
+
+ // always come from WorkerActor
+ hFunc(TEvKqp::TEvCloseSessionResponse, HandleCleanup);
default:
UnexpectedEvent("CleanupState", ev);
}
@@ -1866,6 +1890,7 @@ public:
InternalError(ex.what());
}
}
+
private:
void UnexpectedEvent(const TString& state, TAutoPtr<NActors::IEventHandle>& ev) {
InternalError(TStringBuilder() << "TKqpSessionActor in state " << state << " recieve unexpected event " <<