diff options
author | gvit <gvit@ydb.tech> | 2022-10-02 18:45:29 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2022-10-02 18:45:29 +0300 |
commit | 4c3d3d183e0584e0dbe29ccffe4a6788c569f4d9 (patch) | |
tree | 1f6c0e479dcaad8fd72e7ff559a92ee67bf035d6 | |
parent | 67cd6af44e2c474a85db66eef80094a410b8804b (diff) | |
download | ydb-4c3d3d183e0584e0dbe29ccffe4a6788c569f4d9.tar.gz |
remove idle timers from hot pass
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 43 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_worker_actor.cpp | 64 | ||||
-rw-r--r-- | ydb/core/kqp/proxy/kqp_proxy_service.cpp | 55 | ||||
-rw-r--r-- | ydb/core/kqp/proxy/kqp_proxy_service.h | 72 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 1 |
5 files changed, 112 insertions, 123 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index 59e8ffbb8c8..16ab0299118 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -177,7 +177,6 @@ public: Config->FeatureFlags = AppData()->FeatureFlags; Become(&TKqpSessionActor::ReadyState); - StartIdleTimer(); } NYql::TKikimrQueryDeadlines GetQueryDeadlines(const NKikimrKqp::TQueryRequest& queryRequest) { @@ -417,8 +416,6 @@ public: return; } - StopIdleTimer(); - CompileQuery(); } @@ -1755,8 +1752,6 @@ public: ? Ydb::Table::KeepAliveResult::SESSION_STATUS_READY : Ydb::Table::KeepAliveResult::SESSION_STATUS_BUSY; record.MutableResponse()->SetSessionStatus(sessionStatus); - StartIdleTimer(); - Send(ev->Sender, result.release(), 0, proxyRequestId); } @@ -1811,35 +1806,6 @@ public: } } - void StartIdleTimer() { - StopIdleTimer(); - - ++IdleTimerId; - auto idleDuration = TDuration::Seconds(*Config->_KqpSessionIdleTimeoutSec.Get()); - IdleTimerActorId = CreateLongTimer(TlsActivationContext->AsActorContext(), idleDuration, - new IEventHandle(SelfId(), SelfId(), new TEvKqp::TEvIdleTimeout(IdleTimerId))); - LOG_D("Created long timer for idle timeout, timer id: " << IdleTimerId - << ", duration: " << idleDuration << ", actor: " << IdleTimerActorId); - } - - void StopIdleTimer() { - if (IdleTimerActorId) { - LOG_D("Destroying long timer actor for idle timout: " << IdleTimerActorId); - Send(IdleTimerActorId, new TEvents::TEvPoisonPill()); - } - IdleTimerActorId = TActorId(); - } - - void Handle(TEvKqp::TEvIdleTimeout::TPtr& ev) { - auto timerId = ev->Get()->TimerId; - - if (timerId == IdleTimerId) { - LOG_N(TKqpRequestInfo("", SessionId) << "SessionActor idle timeout, worker destroyed"); - Counters->ReportSessionActorClosedIdle(Settings.DbCounters); - FinalCleanup(); - } - } - void SendRollbackRequest(TKqpTransactionContext* txCtx) { if (QueryState) { LWTRACK(KqpSessionSendRollback, QueryState->Orbit, QueryState->CurrentTx); @@ -1985,7 +1951,6 @@ public: TransactionsToBeAborted.clear(); CleanupCtx.reset(); QueryState.reset(); - StartIdleTimer(); Become(&TKqpSessionActor::ReadyState); } } @@ -2040,7 +2005,6 @@ public: HFunc(TEvKqp::TEvQueryRequest, HandleReady); hFunc(TEvKqp::TEvPingSessionRequest, Handle); - hFunc(TEvKqp::TEvIdleTimeout, Handle); hFunc(TEvKqp::TEvCloseSessionRequest, HandleReady); hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); @@ -2064,7 +2028,6 @@ public: hFunc(TEvKqp::TEvCloseSessionRequest, HandleCompile); hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); - hFunc(TEvKqp::TEvIdleTimeout, HandleNoop); default: UnexpectedEvent("CompileState", ev); } @@ -2092,7 +2055,6 @@ public: hFunc(TEvKqp::TEvCloseSessionRequest, HandleExecute); hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); - hFunc(TEvKqp::TEvIdleTimeout, HandleNoop); // always come from WorkerActor hFunc(TEvKqp::TEvQueryResponse, ForwardResponse); @@ -2119,7 +2081,6 @@ public: hFunc(TEvKqp::TEvCloseSessionRequest, HandleCleanup); hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); - hFunc(TEvKqp::TEvIdleTimeout, HandleNoop); // always come from WorkerActor hFunc(TEvKqp::TEvCloseSessionResponse, HandleCleanup); @@ -2142,7 +2103,6 @@ public: hFunc(TEvKqp::TEvCloseSessionRequest, HandleTopicOps); hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); - hFunc(TEvKqp::TEvIdleTimeout, HandleNoop); default: UnexpectedEvent("TopicOpsState", ev); } @@ -2299,9 +2259,6 @@ private: std::vector<TIntrusivePtr<TKqpTransactionContext>> TransactionsToBeAborted; ui64 EvictedTx = 0; std::unique_ptr<TEvKqp::TEvQueryResponse> QueryResponse; - - TActorId IdleTimerActorId; - ui32 IdleTimerId = 0; std::optional<TSessionShutdownState> ShutdownState; TULIDGenerator UlidGen; diff --git a/ydb/core/kqp/kqp_worker_actor.cpp b/ydb/core/kqp/kqp_worker_actor.cpp index 9f2dfb521ed..1e8d50ccb05 100644 --- a/ydb/core/kqp/kqp_worker_actor.cpp +++ b/ydb/core/kqp/kqp_worker_actor.cpp @@ -125,7 +125,6 @@ public: , Config(MakeIntrusive<TKikimrConfiguration>()) , CreationTime(TInstant::Now()) , QueryId(0) - , IdleTimerId(0) , ShutdownState(std::nullopt) { Y_VERIFY(ModuleResolverState); @@ -161,7 +160,6 @@ public: AppData(ctx)->FunctionRegistry, !Settings.LongSession); Become(&TKqpWorkerActor::ReadyState); - StartIdleTimer(ctx); } void HandleReady(TEvKqp::TEvCloseSessionRequest::TPtr &ev, const TActorContext &ctx) { @@ -190,8 +188,6 @@ public: return; } - StartIdleTimer(ctx); - ReplyPingStatus(ev->Sender, proxyRequestId, true, ctx); } @@ -381,8 +377,6 @@ public: // some kind of internal query? or verify here? } - StopIdleTimer(ctx); - if (CompileQuery(ctx)) { if (QueryState) { QueryState->CpuTime += timer.GetTime(); @@ -411,19 +405,6 @@ public: } } - void HandleReady(TEvKqp::TEvIdleTimeout::TPtr &ev, const TActorContext &ctx) { - auto timerId = ev->Get()->TimerId; - LOG_DEBUG_S(ctx, NKikimrServices::KQP_WORKER, "Received TEvIdleTimeout in ready state, timer id: " - << timerId << ", sender: " << ev->Sender); - - if (timerId == IdleTimerId) { - LOG_NOTICE_S(ctx, NKikimrServices::KQP_WORKER, TKqpRequestInfo("", SessionId) - << "Worker idle timeout, worker destroyed"); - Counters->ReportWorkerClosedIdle(Settings.DbCounters); - FinalCleanup(ctx); - } - } - void HandleCompileQuery(TEvKqp::TEvCompileResponse::TPtr &ev, const TActorContext &ctx) { auto compileResult = ev->Get()->CompileResult; @@ -432,7 +413,6 @@ public: if (compileResult->Status != Ydb::StatusIds::SUCCESS) { if (ReplyQueryCompileError(compileResult, ctx)) { - StartIdleTimer(ctx); Become(&TKqpWorkerActor::ReadyState); } else { FinalCleanup(ctx); @@ -455,7 +435,6 @@ public: if (queryRequest.GetAction() == NKikimrKqp::QUERY_ACTION_PREPARE) { if (ReplyPrepareResult(compileResult, ctx)) { - StartIdleTimer(ctx); Become(&TKqpWorkerActor::ReadyState); } else { FinalCleanup(ctx); @@ -507,11 +486,6 @@ public: ReplyPingStatus(ev->Sender, proxyRequestId, false, ctx); } - void HandleCompileQuery(TEvKqp::TEvIdleTimeout::TPtr &ev, const TActorContext &ctx) { - Y_UNUSED(ev); - Y_UNUSED(ctx); - } - void HandlePerformQuery(TEvKqp::TEvQueryRequest::TPtr &ev, const TActorContext &ctx) { ReplyBusy(ev, ctx); } @@ -592,11 +566,6 @@ public: } } - void HandlePerformQuery(TEvKqp::TEvIdleTimeout::TPtr &ev, const TActorContext &ctx) { - Y_UNUSED(ev); - Y_UNUSED(ctx); - } - void HandlePerformCleanup(TEvKqp::TEvQueryRequest::TPtr &ev, const TActorContext &ctx) { ui64 proxyRequestId = ev->Cookie; auto& event = ev->Get()->Record; @@ -677,11 +646,6 @@ public: } } - void HandlePerformCleanup(TEvKqp::TEvIdleTimeout::TPtr &ev, const TActorContext &ctx) { - Y_UNUSED(ev); - Y_UNUSED(ctx); - } - STFUNC(ReadyState) { try { switch (ev->GetTypeRewrite()) { @@ -690,7 +654,6 @@ public: HFunc(TEvKqp::TEvCloseSessionRequest, HandleReady); HFunc(TEvKqp::TEvPingSessionRequest, HandleReady); HFunc(TEvKqp::TEvContinueProcess, HandleReady); - HFunc(TEvKqp::TEvIdleTimeout, HandleReady); HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown); HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown); default: @@ -708,7 +671,6 @@ public: HFunc(TEvKqp::TEvCompileResponse, HandleCompileQuery); HFunc(TEvKqp::TEvCloseSessionRequest, HandleCompileQuery); HFunc(TEvKqp::TEvPingSessionRequest, HandleCompileQuery); - HFunc(TEvKqp::TEvIdleTimeout, HandleCompileQuery); HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown); HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown); default: @@ -727,7 +689,6 @@ public: HFunc(TEvKqp::TEvCloseSessionRequest, HandlePerformQuery); HFunc(TEvKqp::TEvPingSessionRequest, HandlePerformQuery); HFunc(TEvKqp::TEvContinueProcess, HandlePerformQuery); - HFunc(TEvKqp::TEvIdleTimeout, HandlePerformQuery); HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown); HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown); default: @@ -746,7 +707,6 @@ public: HFunc(TEvKqp::TEvCloseSessionRequest, HandlePerformCleanup); HFunc(TEvKqp::TEvPingSessionRequest, HandlePerformCleanup); HFunc(TEvKqp::TEvContinueProcess, HandlePerformCleanup); - HFunc(TEvKqp::TEvIdleTimeout, HandlePerformCleanup); HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown); HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown); default: @@ -1202,7 +1162,6 @@ private: CleanupState->Start = TInstant::Now(); if (isFinal) { - StopIdleTimer(ctx); Counters->ReportQueriesPerWorker(Settings.DbCounters, QueryId); MakeNewQueryState(); @@ -1251,7 +1210,6 @@ private: Die(ctx); } else { if (ReplyQueryResult(ctx)) { - StartIdleTimer(ctx); Become(&TKqpWorkerActor::ReadyState); } else { FinalCleanup(ctx); @@ -2055,26 +2013,6 @@ private: } } - void StartIdleTimer(const TActorContext& ctx) { - StopIdleTimer(ctx); - - ++IdleTimerId; - auto idleDuration = TDuration::Seconds(Config->_KqpSessionIdleTimeoutSec.Get().GetRef()); - IdleTimerActorId = CreateLongTimer(ctx, idleDuration, - new IEventHandle(ctx.SelfID, ctx.SelfID, new TEvKqp::TEvIdleTimeout(IdleTimerId))); - LOG_DEBUG_S(ctx, NKikimrServices::KQP_WORKER, "Created long timer for idle timeout, timer id: " << IdleTimerId - << ", duration: " << idleDuration << ", actor: " << IdleTimerActorId); - } - - void StopIdleTimer(const TActorContext& ctx) { - if (IdleTimerActorId) { - LOG_DEBUG_S(ctx, NKikimrServices::KQP_WORKER, "Destroying long timer actor for idle timout: " - << IdleTimerActorId); - ctx.Send(IdleTimerActorId, new TEvents::TEvPoisonPill()); - } - IdleTimerActorId = TActorId(); - } - IKikimrQueryExecutor::TExecuteSettings CreateRollbackSettings() { YQL_ENSURE(QueryState); @@ -2161,8 +2099,6 @@ private: ui32 QueryId; THolder<TKqpQueryState> QueryState; THolder<TKqpCleanupState> CleanupState; - ui32 IdleTimerId; - TActorId IdleTimerActorId; std::optional<TSessionShutdownState> ShutdownState; }; diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.cpp b/ydb/core/kqp/proxy/kqp_proxy_service.cpp index 0fb49de1963..ed5356d5035 100644 --- a/ydb/core/kqp/proxy/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy/kqp_proxy_service.cpp @@ -123,11 +123,11 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> { EvReadyToPublishResources = EventSpaceBegin(TEvents::ES_PRIVATE), EvCollectPeerProxyData, EvOnRequestTimeout, + EvCloseIdleSessions, }; struct TEvReadyToPublishResources : public TEventLocal<TEvReadyToPublishResources, EEv::EvReadyToPublishResources> {}; struct TEvCollectPeerProxyData: public TEventLocal<TEvCollectPeerProxyData, EEv::EvCollectPeerProxyData> {}; - struct TEvOnRequestTimeout: public TEventLocal<TEvOnRequestTimeout, EEv::EvOnRequestTimeout> { public: ui64 RequestId; @@ -135,6 +135,8 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> { TEvOnRequestTimeout(ui64 requestId, TDuration timeout): RequestId(requestId), Timeout(timeout) {}; }; + + struct TEvCloseIdleSessions : public TEventLocal<TEvCloseIdleSessions, EEv::EvCloseIdleSessions> {}; }; public: @@ -216,6 +218,38 @@ public: PublishResourceUsage(); AskSelfNodeInfo(); SendWhiteboardRequest(); + ScheduleIdleSessionCheck(); + } + + void ScheduleIdleSessionCheck() { + if (!ShutdownState) { + const TDuration IdleSessionsCheckInterval = TDuration::Seconds(2); + Schedule(IdleSessionsCheckInterval, new TEvPrivate::TEvCloseIdleSessions()); + } + } + + void Handle(TEvPrivate::TEvCloseIdleSessions::TPtr&) { + CheckIdleSessions(); + ScheduleIdleSessionCheck(); + } + + void CheckIdleSessions(const ui32 maxSessionsToClose = 10) { + ui32 closedIdleSessions = 0; + const NActors::TMonotonic now = TActivationContext::Monotonic(); + while(true) { + const TKqpSessionInfo* sessionInfo = LocalSessions->GetIdleSession(now); + if (sessionInfo == nullptr || closedIdleSessions > maxSessionsToClose) + break; + + SendSessionClose(sessionInfo); + ++closedIdleSessions; + } + } + + void SendSessionClose(const TKqpSessionInfo* sessionInfo) { + auto closeSessionEv = std::make_unique<TEvKqp::TEvCloseSessionRequest>(); + closeSessionEv->Record.MutableRequest()->SetSessionId(sessionInfo->SessionId); + Send(sessionInfo->WorkerId, closeSessionEv.release()); } void AskSelfNodeInfo() { @@ -342,9 +376,7 @@ public: KQP_PROXY_LOG_I("Received tenant pool status, serving tenants: " << JoinRange(", ", Tenants.begin(), Tenants.end())); for (auto& [_, sessionInfo] : *LocalSessions) { if (!sessionInfo.Database.empty() && !Tenants.contains(sessionInfo.Database)) { - auto closeSessionEv = MakeHolder<TEvKqp::TEvCloseSessionRequest>(); - closeSessionEv->Record.MutableRequest()->SetSessionId(sessionInfo.SessionId); - Send(sessionInfo.WorkerId, closeSessionEv.Release()); + SendSessionClose(&sessionInfo); } } @@ -550,6 +582,7 @@ public: TActorId targetId; if (sessionInfo) { targetId = sessionInfo->WorkerId; + LocalSessions->StopIdleCheck(sessionInfo); } else { targetId = TryGetSessionTargetActor(request.GetSessionId(), requestInfo, requestId); if (!targetId) { @@ -595,6 +628,10 @@ public: } } + TDuration GetSessionIdleDuration() const { + return TDuration::Seconds(TableServiceConfig.GetSessionIdleDurationSeconds()); + } + void Handle(TEvKqp::TEvPingSessionRequest::TPtr& ev) { auto& event = ev->Get()->Record; auto& request = event.GetRequest(); @@ -610,7 +647,9 @@ public: TActorId targetId; if (sessionInfo) { targetId = sessionInfo->WorkerId; + LocalSessions->StopIdleCheck(sessionInfo); } else { + // forward request to kqp proxy on the different node. targetId = TryGetSessionTargetActor(sessionId, requestInfo, requestId); if (!targetId) { return; @@ -638,6 +677,11 @@ public: return; } + const TKqpSessionInfo* info = LocalSessions->FindPtr(proxyRequest->SessionId); + if (info) { + LocalSessions->StartIdleCheck(info, GetSessionIdleDuration()); + } + LogResponse(proxyRequest->TraceId, ev->Get()->Record, proxyRequest->DbCounters); Send(proxyRequest->Sender, ev->Release().Release(), 0, proxyRequest->SenderCookie); @@ -997,6 +1041,7 @@ public: hFunc(TEvPrivate::TEvOnRequestTimeout, Handle); hFunc(NNodeWhiteboard::TEvWhiteboard::TEvSystemStateResponse, Handle); hFunc(TEvKqp::TEvCreateSessionResponse, ForwardEvent); + hFunc(TEvPrivate::TEvCloseIdleSessions, Handle); default: Y_FAIL("TKqpProxyService: unexpected event type: %" PRIx32 " event: %s", ev->GetTypeRewrite(), ev->HasEvent() ? ev->GetBase()->ToString().data() : "serialized?"); @@ -1157,7 +1202,7 @@ private: ? CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, ModuleResolverState, Counters) : CreateKqpWorkerActor(SelfId(), sessionId, KqpSettings, workerSettings, ModuleResolverState, Counters); auto workerId = TlsActivationContext->ExecutorThread.RegisterActor(workerActor, TMailboxType::HTSwap, AppData()->UserPoolId); - TKqpSessionInfo* sessionInfo = LocalSessions->Create(sessionId, workerId, database, dbCounters, supportsBalancing); + TKqpSessionInfo* sessionInfo = LocalSessions->Create(sessionId, workerId, database, dbCounters, supportsBalancing, GetSessionIdleDuration()); KQP_PROXY_LOG_D(requestInfo << "Created new session" << ", sessionId: " << sessionInfo->SessionId diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.h b/ydb/core/kqp/proxy/kqp_proxy_service.h index e6d44a176ca..d71b5005594 100644 --- a/ydb/core/kqp/proxy/kqp_proxy_service.h +++ b/ydb/core/kqp/proxy/kqp_proxy_service.h @@ -78,23 +78,26 @@ struct TKqpSessionInfo { TActorId WorkerId; TString Database; TKqpDbCountersPtr DbCounters; - TInstant LastRequestAt; - TInstant CreatedAt; TInstant ShutdownStartedAt; std::vector<i32> ReadyPos; - - TKqpSessionInfo(const TString& sessionId, const TActorId& workerId, - const TString& database, TKqpDbCountersPtr dbCounters, std::vector<i32>&& pos) + NActors::TMonotonic IdleTimeout; + // position in the idle list. + std::list<TKqpSessionInfo*>::iterator IdlePos; + + TKqpSessionInfo( + const TString& sessionId, const TActorId& workerId, + const TString& database, TKqpDbCountersPtr dbCounters, std::vector<i32>&& pos, + NActors::TMonotonic idleTimeout, + std::list<TKqpSessionInfo*>::iterator idlePos) : SessionId(sessionId) , WorkerId(workerId) , Database(database) , DbCounters(dbCounters) , ShutdownStartedAt() , ReadyPos(std::move(pos)) + , IdleTimeout(idleTimeout) + , IdlePos(idlePos) { - auto now = TAppData::TimeProvider->Now(); - LastRequestAt = now; - CreatedAt = now; } }; @@ -113,7 +116,6 @@ struct TSimpleResourceStats { struct TPeerStats { TSimpleResourceStats LocalSessionCount; TSimpleResourceStats CrossAZSessionCount; - TSimpleResourceStats LocalCpu; TSimpleResourceStats CrossAZCpu; @@ -136,6 +138,7 @@ class TLocalSessionsRegistry { THashMap<TString, ui32> SessionsCountPerDatabase; std::vector<std::vector<TString>> ReadySessions; TIntrusivePtr<IRandomProvider> RandomProvider; + std::list<TKqpSessionInfo*> IdleSessions; public: TLocalSessionsRegistry(TIntrusivePtr<IRandomProvider> randomProvider) @@ -144,7 +147,8 @@ public: {} TKqpSessionInfo* Create(const TString& sessionId, const TActorId& workerId, - const TString& database, TKqpDbCountersPtr dbCounters, bool supportsBalancing) + const TString& database, TKqpDbCountersPtr dbCounters, bool supportsBalancing, + TDuration idleDuration) { std::vector<i32> pos(2, -1); pos[0] = ReadySessions[0].size(); @@ -155,13 +159,30 @@ public: ReadySessions[1].push_back(sessionId); } - auto result = LocalSessions.emplace(sessionId, TKqpSessionInfo(sessionId, workerId, database, dbCounters, std::move(pos))); + auto result = LocalSessions.emplace( + sessionId, TKqpSessionInfo(sessionId, workerId, database, dbCounters, std::move(pos), + NActors::TActivationContext::Monotonic() + idleDuration, IdleSessions.end())); SessionsCountPerDatabase[database]++; Y_VERIFY(result.second, "Duplicate session id!"); TargetIdIndex.emplace(workerId, sessionId); + StartIdleCheck(&(result.first->second), idleDuration); return &result.first->second; } + const TKqpSessionInfo* GetIdleSession(const NActors::TMonotonic& now) { + if (IdleSessions.empty()) { + return nullptr; + } + + const TKqpSessionInfo* candidate = (*IdleSessions.begin()); + if (candidate->IdleTimeout > now) { + return nullptr; + } + + IdleSessions.erase(IdleSessions.begin()); + return candidate; + } + const THashSet<TString>& GetShutdownInFlight() const { return ShutdownInFlightSessions; } @@ -196,6 +217,31 @@ public: return ShutdownInFlightSessions.size(); } + void StartIdleCheck(const TKqpSessionInfo* sessionInfo, const TDuration idleDuration) { + if (!sessionInfo) + return; + + TKqpSessionInfo* info = const_cast<TKqpSessionInfo*>(sessionInfo); + + info->IdleTimeout = NActors::TActivationContext::Monotonic() + idleDuration; + if (info->IdlePos != IdleSessions.end()) { + IdleSessions.erase(info->IdlePos); + } + + info->IdlePos = IdleSessions.insert(IdleSessions.end(), info); + } + + void StopIdleCheck(const TKqpSessionInfo* sessionInfo) { + if (!sessionInfo) + return; + + TKqpSessionInfo* info = const_cast<TKqpSessionInfo*>(sessionInfo); + if (info->IdlePos != IdleSessions.end()) { + IdleSessions.erase(info->IdlePos); + info->IdlePos = IdleSessions.end(); + } + } + void Erase(const TString& sessionId) { auto it = LocalSessions.find(sessionId); if (it != LocalSessions.end()) { @@ -208,6 +254,10 @@ public: } RemoveSessionFromLists(&(it->second)); + if (it->second.IdlePos != IdleSessions.end()) { + IdleSessions.erase(it->second.IdlePos); + } + ShutdownInFlightSessions.erase(sessionId); TargetIdIndex.erase(it->second.WorkerId); LocalSessions.erase(it); diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index eeee15d6a2c..97b8506f311 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1172,6 +1172,7 @@ message TTableServiceConfig { optional uint32 CompileQueryCacheTTLSec = 20 [default = 0]; optional TQueryReplayConfig QueryReplayConfig = 21; optional bool EnableKqpSessionActor = 23 [default = true]; + optional uint32 SessionIdleDurationSeconds = 24 [default = 600]; }; // Config describes immediate controls and allows |