aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2022-10-02 18:45:29 +0300
committergvit <gvit@ydb.tech>2022-10-02 18:45:29 +0300
commit4c3d3d183e0584e0dbe29ccffe4a6788c569f4d9 (patch)
tree1f6c0e479dcaad8fd72e7ff559a92ee67bf035d6
parent67cd6af44e2c474a85db66eef80094a410b8804b (diff)
downloadydb-4c3d3d183e0584e0dbe29ccffe4a6788c569f4d9.tar.gz
remove idle timers from hot pass
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp43
-rw-r--r--ydb/core/kqp/kqp_worker_actor.cpp64
-rw-r--r--ydb/core/kqp/proxy/kqp_proxy_service.cpp55
-rw-r--r--ydb/core/kqp/proxy/kqp_proxy_service.h72
-rw-r--r--ydb/core/protos/config.proto1
5 files changed, 112 insertions, 123 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index 59e8ffbb8c8..16ab0299118 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -177,7 +177,6 @@ public:
Config->FeatureFlags = AppData()->FeatureFlags;
Become(&TKqpSessionActor::ReadyState);
- StartIdleTimer();
}
NYql::TKikimrQueryDeadlines GetQueryDeadlines(const NKikimrKqp::TQueryRequest& queryRequest) {
@@ -417,8 +416,6 @@ public:
return;
}
- StopIdleTimer();
-
CompileQuery();
}
@@ -1755,8 +1752,6 @@ public:
? Ydb::Table::KeepAliveResult::SESSION_STATUS_READY
: Ydb::Table::KeepAliveResult::SESSION_STATUS_BUSY;
record.MutableResponse()->SetSessionStatus(sessionStatus);
- StartIdleTimer();
-
Send(ev->Sender, result.release(), 0, proxyRequestId);
}
@@ -1811,35 +1806,6 @@ public:
}
}
- void StartIdleTimer() {
- StopIdleTimer();
-
- ++IdleTimerId;
- auto idleDuration = TDuration::Seconds(*Config->_KqpSessionIdleTimeoutSec.Get());
- IdleTimerActorId = CreateLongTimer(TlsActivationContext->AsActorContext(), idleDuration,
- new IEventHandle(SelfId(), SelfId(), new TEvKqp::TEvIdleTimeout(IdleTimerId)));
- LOG_D("Created long timer for idle timeout, timer id: " << IdleTimerId
- << ", duration: " << idleDuration << ", actor: " << IdleTimerActorId);
- }
-
- void StopIdleTimer() {
- if (IdleTimerActorId) {
- LOG_D("Destroying long timer actor for idle timout: " << IdleTimerActorId);
- Send(IdleTimerActorId, new TEvents::TEvPoisonPill());
- }
- IdleTimerActorId = TActorId();
- }
-
- void Handle(TEvKqp::TEvIdleTimeout::TPtr& ev) {
- auto timerId = ev->Get()->TimerId;
-
- if (timerId == IdleTimerId) {
- LOG_N(TKqpRequestInfo("", SessionId) << "SessionActor idle timeout, worker destroyed");
- Counters->ReportSessionActorClosedIdle(Settings.DbCounters);
- FinalCleanup();
- }
- }
-
void SendRollbackRequest(TKqpTransactionContext* txCtx) {
if (QueryState) {
LWTRACK(KqpSessionSendRollback, QueryState->Orbit, QueryState->CurrentTx);
@@ -1985,7 +1951,6 @@ public:
TransactionsToBeAborted.clear();
CleanupCtx.reset();
QueryState.reset();
- StartIdleTimer();
Become(&TKqpSessionActor::ReadyState);
}
}
@@ -2040,7 +2005,6 @@ public:
HFunc(TEvKqp::TEvQueryRequest, HandleReady);
hFunc(TEvKqp::TEvPingSessionRequest, Handle);
- hFunc(TEvKqp::TEvIdleTimeout, Handle);
hFunc(TEvKqp::TEvCloseSessionRequest, HandleReady);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
@@ -2064,7 +2028,6 @@ public:
hFunc(TEvKqp::TEvCloseSessionRequest, HandleCompile);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
- hFunc(TEvKqp::TEvIdleTimeout, HandleNoop);
default:
UnexpectedEvent("CompileState", ev);
}
@@ -2092,7 +2055,6 @@ public:
hFunc(TEvKqp::TEvCloseSessionRequest, HandleExecute);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
- hFunc(TEvKqp::TEvIdleTimeout, HandleNoop);
// always come from WorkerActor
hFunc(TEvKqp::TEvQueryResponse, ForwardResponse);
@@ -2119,7 +2081,6 @@ public:
hFunc(TEvKqp::TEvCloseSessionRequest, HandleCleanup);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
- hFunc(TEvKqp::TEvIdleTimeout, HandleNoop);
// always come from WorkerActor
hFunc(TEvKqp::TEvCloseSessionResponse, HandleCleanup);
@@ -2142,7 +2103,6 @@ public:
hFunc(TEvKqp::TEvCloseSessionRequest, HandleTopicOps);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
- hFunc(TEvKqp::TEvIdleTimeout, HandleNoop);
default:
UnexpectedEvent("TopicOpsState", ev);
}
@@ -2299,9 +2259,6 @@ private:
std::vector<TIntrusivePtr<TKqpTransactionContext>> TransactionsToBeAborted;
ui64 EvictedTx = 0;
std::unique_ptr<TEvKqp::TEvQueryResponse> QueryResponse;
-
- TActorId IdleTimerActorId;
- ui32 IdleTimerId = 0;
std::optional<TSessionShutdownState> ShutdownState;
TULIDGenerator UlidGen;
diff --git a/ydb/core/kqp/kqp_worker_actor.cpp b/ydb/core/kqp/kqp_worker_actor.cpp
index 9f2dfb521ed..1e8d50ccb05 100644
--- a/ydb/core/kqp/kqp_worker_actor.cpp
+++ b/ydb/core/kqp/kqp_worker_actor.cpp
@@ -125,7 +125,6 @@ public:
, Config(MakeIntrusive<TKikimrConfiguration>())
, CreationTime(TInstant::Now())
, QueryId(0)
- , IdleTimerId(0)
, ShutdownState(std::nullopt)
{
Y_VERIFY(ModuleResolverState);
@@ -161,7 +160,6 @@ public:
AppData(ctx)->FunctionRegistry, !Settings.LongSession);
Become(&TKqpWorkerActor::ReadyState);
- StartIdleTimer(ctx);
}
void HandleReady(TEvKqp::TEvCloseSessionRequest::TPtr &ev, const TActorContext &ctx) {
@@ -190,8 +188,6 @@ public:
return;
}
- StartIdleTimer(ctx);
-
ReplyPingStatus(ev->Sender, proxyRequestId, true, ctx);
}
@@ -381,8 +377,6 @@ public:
// some kind of internal query? or verify here?
}
- StopIdleTimer(ctx);
-
if (CompileQuery(ctx)) {
if (QueryState) {
QueryState->CpuTime += timer.GetTime();
@@ -411,19 +405,6 @@ public:
}
}
- void HandleReady(TEvKqp::TEvIdleTimeout::TPtr &ev, const TActorContext &ctx) {
- auto timerId = ev->Get()->TimerId;
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_WORKER, "Received TEvIdleTimeout in ready state, timer id: "
- << timerId << ", sender: " << ev->Sender);
-
- if (timerId == IdleTimerId) {
- LOG_NOTICE_S(ctx, NKikimrServices::KQP_WORKER, TKqpRequestInfo("", SessionId)
- << "Worker idle timeout, worker destroyed");
- Counters->ReportWorkerClosedIdle(Settings.DbCounters);
- FinalCleanup(ctx);
- }
- }
-
void HandleCompileQuery(TEvKqp::TEvCompileResponse::TPtr &ev, const TActorContext &ctx) {
auto compileResult = ev->Get()->CompileResult;
@@ -432,7 +413,6 @@ public:
if (compileResult->Status != Ydb::StatusIds::SUCCESS) {
if (ReplyQueryCompileError(compileResult, ctx)) {
- StartIdleTimer(ctx);
Become(&TKqpWorkerActor::ReadyState);
} else {
FinalCleanup(ctx);
@@ -455,7 +435,6 @@ public:
if (queryRequest.GetAction() == NKikimrKqp::QUERY_ACTION_PREPARE) {
if (ReplyPrepareResult(compileResult, ctx)) {
- StartIdleTimer(ctx);
Become(&TKqpWorkerActor::ReadyState);
} else {
FinalCleanup(ctx);
@@ -507,11 +486,6 @@ public:
ReplyPingStatus(ev->Sender, proxyRequestId, false, ctx);
}
- void HandleCompileQuery(TEvKqp::TEvIdleTimeout::TPtr &ev, const TActorContext &ctx) {
- Y_UNUSED(ev);
- Y_UNUSED(ctx);
- }
-
void HandlePerformQuery(TEvKqp::TEvQueryRequest::TPtr &ev, const TActorContext &ctx) {
ReplyBusy(ev, ctx);
}
@@ -592,11 +566,6 @@ public:
}
}
- void HandlePerformQuery(TEvKqp::TEvIdleTimeout::TPtr &ev, const TActorContext &ctx) {
- Y_UNUSED(ev);
- Y_UNUSED(ctx);
- }
-
void HandlePerformCleanup(TEvKqp::TEvQueryRequest::TPtr &ev, const TActorContext &ctx) {
ui64 proxyRequestId = ev->Cookie;
auto& event = ev->Get()->Record;
@@ -677,11 +646,6 @@ public:
}
}
- void HandlePerformCleanup(TEvKqp::TEvIdleTimeout::TPtr &ev, const TActorContext &ctx) {
- Y_UNUSED(ev);
- Y_UNUSED(ctx);
- }
-
STFUNC(ReadyState) {
try {
switch (ev->GetTypeRewrite()) {
@@ -690,7 +654,6 @@ public:
HFunc(TEvKqp::TEvCloseSessionRequest, HandleReady);
HFunc(TEvKqp::TEvPingSessionRequest, HandleReady);
HFunc(TEvKqp::TEvContinueProcess, HandleReady);
- HFunc(TEvKqp::TEvIdleTimeout, HandleReady);
HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown);
HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown);
default:
@@ -708,7 +671,6 @@ public:
HFunc(TEvKqp::TEvCompileResponse, HandleCompileQuery);
HFunc(TEvKqp::TEvCloseSessionRequest, HandleCompileQuery);
HFunc(TEvKqp::TEvPingSessionRequest, HandleCompileQuery);
- HFunc(TEvKqp::TEvIdleTimeout, HandleCompileQuery);
HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown);
HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown);
default:
@@ -727,7 +689,6 @@ public:
HFunc(TEvKqp::TEvCloseSessionRequest, HandlePerformQuery);
HFunc(TEvKqp::TEvPingSessionRequest, HandlePerformQuery);
HFunc(TEvKqp::TEvContinueProcess, HandlePerformQuery);
- HFunc(TEvKqp::TEvIdleTimeout, HandlePerformQuery);
HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown);
HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown);
default:
@@ -746,7 +707,6 @@ public:
HFunc(TEvKqp::TEvCloseSessionRequest, HandlePerformCleanup);
HFunc(TEvKqp::TEvPingSessionRequest, HandlePerformCleanup);
HFunc(TEvKqp::TEvContinueProcess, HandlePerformCleanup);
- HFunc(TEvKqp::TEvIdleTimeout, HandlePerformCleanup);
HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown);
HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown);
default:
@@ -1202,7 +1162,6 @@ private:
CleanupState->Start = TInstant::Now();
if (isFinal) {
- StopIdleTimer(ctx);
Counters->ReportQueriesPerWorker(Settings.DbCounters, QueryId);
MakeNewQueryState();
@@ -1251,7 +1210,6 @@ private:
Die(ctx);
} else {
if (ReplyQueryResult(ctx)) {
- StartIdleTimer(ctx);
Become(&TKqpWorkerActor::ReadyState);
} else {
FinalCleanup(ctx);
@@ -2055,26 +2013,6 @@ private:
}
}
- void StartIdleTimer(const TActorContext& ctx) {
- StopIdleTimer(ctx);
-
- ++IdleTimerId;
- auto idleDuration = TDuration::Seconds(Config->_KqpSessionIdleTimeoutSec.Get().GetRef());
- IdleTimerActorId = CreateLongTimer(ctx, idleDuration,
- new IEventHandle(ctx.SelfID, ctx.SelfID, new TEvKqp::TEvIdleTimeout(IdleTimerId)));
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_WORKER, "Created long timer for idle timeout, timer id: " << IdleTimerId
- << ", duration: " << idleDuration << ", actor: " << IdleTimerActorId);
- }
-
- void StopIdleTimer(const TActorContext& ctx) {
- if (IdleTimerActorId) {
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_WORKER, "Destroying long timer actor for idle timout: "
- << IdleTimerActorId);
- ctx.Send(IdleTimerActorId, new TEvents::TEvPoisonPill());
- }
- IdleTimerActorId = TActorId();
- }
-
IKikimrQueryExecutor::TExecuteSettings CreateRollbackSettings() {
YQL_ENSURE(QueryState);
@@ -2161,8 +2099,6 @@ private:
ui32 QueryId;
THolder<TKqpQueryState> QueryState;
THolder<TKqpCleanupState> CleanupState;
- ui32 IdleTimerId;
- TActorId IdleTimerActorId;
std::optional<TSessionShutdownState> ShutdownState;
};
diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.cpp b/ydb/core/kqp/proxy/kqp_proxy_service.cpp
index 0fb49de1963..ed5356d5035 100644
--- a/ydb/core/kqp/proxy/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy/kqp_proxy_service.cpp
@@ -123,11 +123,11 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
EvReadyToPublishResources = EventSpaceBegin(TEvents::ES_PRIVATE),
EvCollectPeerProxyData,
EvOnRequestTimeout,
+ EvCloseIdleSessions,
};
struct TEvReadyToPublishResources : public TEventLocal<TEvReadyToPublishResources, EEv::EvReadyToPublishResources> {};
struct TEvCollectPeerProxyData: public TEventLocal<TEvCollectPeerProxyData, EEv::EvCollectPeerProxyData> {};
-
struct TEvOnRequestTimeout: public TEventLocal<TEvOnRequestTimeout, EEv::EvOnRequestTimeout> {
public:
ui64 RequestId;
@@ -135,6 +135,8 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
TEvOnRequestTimeout(ui64 requestId, TDuration timeout): RequestId(requestId), Timeout(timeout) {};
};
+
+ struct TEvCloseIdleSessions : public TEventLocal<TEvCloseIdleSessions, EEv::EvCloseIdleSessions> {};
};
public:
@@ -216,6 +218,38 @@ public:
PublishResourceUsage();
AskSelfNodeInfo();
SendWhiteboardRequest();
+ ScheduleIdleSessionCheck();
+ }
+
+ void ScheduleIdleSessionCheck() {
+ if (!ShutdownState) {
+ const TDuration IdleSessionsCheckInterval = TDuration::Seconds(2);
+ Schedule(IdleSessionsCheckInterval, new TEvPrivate::TEvCloseIdleSessions());
+ }
+ }
+
+ void Handle(TEvPrivate::TEvCloseIdleSessions::TPtr&) {
+ CheckIdleSessions();
+ ScheduleIdleSessionCheck();
+ }
+
+ void CheckIdleSessions(const ui32 maxSessionsToClose = 10) {
+ ui32 closedIdleSessions = 0;
+ const NActors::TMonotonic now = TActivationContext::Monotonic();
+ while(true) {
+ const TKqpSessionInfo* sessionInfo = LocalSessions->GetIdleSession(now);
+ if (sessionInfo == nullptr || closedIdleSessions > maxSessionsToClose)
+ break;
+
+ SendSessionClose(sessionInfo);
+ ++closedIdleSessions;
+ }
+ }
+
+ void SendSessionClose(const TKqpSessionInfo* sessionInfo) {
+ auto closeSessionEv = std::make_unique<TEvKqp::TEvCloseSessionRequest>();
+ closeSessionEv->Record.MutableRequest()->SetSessionId(sessionInfo->SessionId);
+ Send(sessionInfo->WorkerId, closeSessionEv.release());
}
void AskSelfNodeInfo() {
@@ -342,9 +376,7 @@ public:
KQP_PROXY_LOG_I("Received tenant pool status, serving tenants: " << JoinRange(", ", Tenants.begin(), Tenants.end()));
for (auto& [_, sessionInfo] : *LocalSessions) {
if (!sessionInfo.Database.empty() && !Tenants.contains(sessionInfo.Database)) {
- auto closeSessionEv = MakeHolder<TEvKqp::TEvCloseSessionRequest>();
- closeSessionEv->Record.MutableRequest()->SetSessionId(sessionInfo.SessionId);
- Send(sessionInfo.WorkerId, closeSessionEv.Release());
+ SendSessionClose(&sessionInfo);
}
}
@@ -550,6 +582,7 @@ public:
TActorId targetId;
if (sessionInfo) {
targetId = sessionInfo->WorkerId;
+ LocalSessions->StopIdleCheck(sessionInfo);
} else {
targetId = TryGetSessionTargetActor(request.GetSessionId(), requestInfo, requestId);
if (!targetId) {
@@ -595,6 +628,10 @@ public:
}
}
+ TDuration GetSessionIdleDuration() const {
+ return TDuration::Seconds(TableServiceConfig.GetSessionIdleDurationSeconds());
+ }
+
void Handle(TEvKqp::TEvPingSessionRequest::TPtr& ev) {
auto& event = ev->Get()->Record;
auto& request = event.GetRequest();
@@ -610,7 +647,9 @@ public:
TActorId targetId;
if (sessionInfo) {
targetId = sessionInfo->WorkerId;
+ LocalSessions->StopIdleCheck(sessionInfo);
} else {
+ // forward request to kqp proxy on the different node.
targetId = TryGetSessionTargetActor(sessionId, requestInfo, requestId);
if (!targetId) {
return;
@@ -638,6 +677,11 @@ public:
return;
}
+ const TKqpSessionInfo* info = LocalSessions->FindPtr(proxyRequest->SessionId);
+ if (info) {
+ LocalSessions->StartIdleCheck(info, GetSessionIdleDuration());
+ }
+
LogResponse(proxyRequest->TraceId, ev->Get()->Record, proxyRequest->DbCounters);
Send(proxyRequest->Sender, ev->Release().Release(), 0, proxyRequest->SenderCookie);
@@ -997,6 +1041,7 @@ public:
hFunc(TEvPrivate::TEvOnRequestTimeout, Handle);
hFunc(NNodeWhiteboard::TEvWhiteboard::TEvSystemStateResponse, Handle);
hFunc(TEvKqp::TEvCreateSessionResponse, ForwardEvent);
+ hFunc(TEvPrivate::TEvCloseIdleSessions, Handle);
default:
Y_FAIL("TKqpProxyService: unexpected event type: %" PRIx32 " event: %s",
ev->GetTypeRewrite(), ev->HasEvent() ? ev->GetBase()->ToString().data() : "serialized?");
@@ -1157,7 +1202,7 @@ private:
? CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, ModuleResolverState, Counters)
: CreateKqpWorkerActor(SelfId(), sessionId, KqpSettings, workerSettings, ModuleResolverState, Counters);
auto workerId = TlsActivationContext->ExecutorThread.RegisterActor(workerActor, TMailboxType::HTSwap, AppData()->UserPoolId);
- TKqpSessionInfo* sessionInfo = LocalSessions->Create(sessionId, workerId, database, dbCounters, supportsBalancing);
+ TKqpSessionInfo* sessionInfo = LocalSessions->Create(sessionId, workerId, database, dbCounters, supportsBalancing, GetSessionIdleDuration());
KQP_PROXY_LOG_D(requestInfo << "Created new session"
<< ", sessionId: " << sessionInfo->SessionId
diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.h b/ydb/core/kqp/proxy/kqp_proxy_service.h
index e6d44a176ca..d71b5005594 100644
--- a/ydb/core/kqp/proxy/kqp_proxy_service.h
+++ b/ydb/core/kqp/proxy/kqp_proxy_service.h
@@ -78,23 +78,26 @@ struct TKqpSessionInfo {
TActorId WorkerId;
TString Database;
TKqpDbCountersPtr DbCounters;
- TInstant LastRequestAt;
- TInstant CreatedAt;
TInstant ShutdownStartedAt;
std::vector<i32> ReadyPos;
-
- TKqpSessionInfo(const TString& sessionId, const TActorId& workerId,
- const TString& database, TKqpDbCountersPtr dbCounters, std::vector<i32>&& pos)
+ NActors::TMonotonic IdleTimeout;
+ // position in the idle list.
+ std::list<TKqpSessionInfo*>::iterator IdlePos;
+
+ TKqpSessionInfo(
+ const TString& sessionId, const TActorId& workerId,
+ const TString& database, TKqpDbCountersPtr dbCounters, std::vector<i32>&& pos,
+ NActors::TMonotonic idleTimeout,
+ std::list<TKqpSessionInfo*>::iterator idlePos)
: SessionId(sessionId)
, WorkerId(workerId)
, Database(database)
, DbCounters(dbCounters)
, ShutdownStartedAt()
, ReadyPos(std::move(pos))
+ , IdleTimeout(idleTimeout)
+ , IdlePos(idlePos)
{
- auto now = TAppData::TimeProvider->Now();
- LastRequestAt = now;
- CreatedAt = now;
}
};
@@ -113,7 +116,6 @@ struct TSimpleResourceStats {
struct TPeerStats {
TSimpleResourceStats LocalSessionCount;
TSimpleResourceStats CrossAZSessionCount;
-
TSimpleResourceStats LocalCpu;
TSimpleResourceStats CrossAZCpu;
@@ -136,6 +138,7 @@ class TLocalSessionsRegistry {
THashMap<TString, ui32> SessionsCountPerDatabase;
std::vector<std::vector<TString>> ReadySessions;
TIntrusivePtr<IRandomProvider> RandomProvider;
+ std::list<TKqpSessionInfo*> IdleSessions;
public:
TLocalSessionsRegistry(TIntrusivePtr<IRandomProvider> randomProvider)
@@ -144,7 +147,8 @@ public:
{}
TKqpSessionInfo* Create(const TString& sessionId, const TActorId& workerId,
- const TString& database, TKqpDbCountersPtr dbCounters, bool supportsBalancing)
+ const TString& database, TKqpDbCountersPtr dbCounters, bool supportsBalancing,
+ TDuration idleDuration)
{
std::vector<i32> pos(2, -1);
pos[0] = ReadySessions[0].size();
@@ -155,13 +159,30 @@ public:
ReadySessions[1].push_back(sessionId);
}
- auto result = LocalSessions.emplace(sessionId, TKqpSessionInfo(sessionId, workerId, database, dbCounters, std::move(pos)));
+ auto result = LocalSessions.emplace(
+ sessionId, TKqpSessionInfo(sessionId, workerId, database, dbCounters, std::move(pos),
+ NActors::TActivationContext::Monotonic() + idleDuration, IdleSessions.end()));
SessionsCountPerDatabase[database]++;
Y_VERIFY(result.second, "Duplicate session id!");
TargetIdIndex.emplace(workerId, sessionId);
+ StartIdleCheck(&(result.first->second), idleDuration);
return &result.first->second;
}
+ const TKqpSessionInfo* GetIdleSession(const NActors::TMonotonic& now) {
+ if (IdleSessions.empty()) {
+ return nullptr;
+ }
+
+ const TKqpSessionInfo* candidate = (*IdleSessions.begin());
+ if (candidate->IdleTimeout > now) {
+ return nullptr;
+ }
+
+ IdleSessions.erase(IdleSessions.begin());
+ return candidate;
+ }
+
const THashSet<TString>& GetShutdownInFlight() const {
return ShutdownInFlightSessions;
}
@@ -196,6 +217,31 @@ public:
return ShutdownInFlightSessions.size();
}
+ void StartIdleCheck(const TKqpSessionInfo* sessionInfo, const TDuration idleDuration) {
+ if (!sessionInfo)
+ return;
+
+ TKqpSessionInfo* info = const_cast<TKqpSessionInfo*>(sessionInfo);
+
+ info->IdleTimeout = NActors::TActivationContext::Monotonic() + idleDuration;
+ if (info->IdlePos != IdleSessions.end()) {
+ IdleSessions.erase(info->IdlePos);
+ }
+
+ info->IdlePos = IdleSessions.insert(IdleSessions.end(), info);
+ }
+
+ void StopIdleCheck(const TKqpSessionInfo* sessionInfo) {
+ if (!sessionInfo)
+ return;
+
+ TKqpSessionInfo* info = const_cast<TKqpSessionInfo*>(sessionInfo);
+ if (info->IdlePos != IdleSessions.end()) {
+ IdleSessions.erase(info->IdlePos);
+ info->IdlePos = IdleSessions.end();
+ }
+ }
+
void Erase(const TString& sessionId) {
auto it = LocalSessions.find(sessionId);
if (it != LocalSessions.end()) {
@@ -208,6 +254,10 @@ public:
}
RemoveSessionFromLists(&(it->second));
+ if (it->second.IdlePos != IdleSessions.end()) {
+ IdleSessions.erase(it->second.IdlePos);
+ }
+
ShutdownInFlightSessions.erase(sessionId);
TargetIdIndex.erase(it->second.WorkerId);
LocalSessions.erase(it);
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index eeee15d6a2c..97b8506f311 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1172,6 +1172,7 @@ message TTableServiceConfig {
optional uint32 CompileQueryCacheTTLSec = 20 [default = 0];
optional TQueryReplayConfig QueryReplayConfig = 21;
optional bool EnableKqpSessionActor = 23 [default = true];
+ optional uint32 SessionIdleDurationSeconds = 24 [default = 600];
};
// Config describes immediate controls and allows