aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsvartmetal <svartmetal@yandex-team.com>2022-10-07 18:51:28 +0300
committersvartmetal <svartmetal@yandex-team.com>2022-10-07 18:51:28 +0300
commit3eedff7d70100cbe7ae67c9ec3f6f1c25526fd40 (patch)
treeb88816500e79a5516deac65a469de7055e2f2c68
parent50b7367df2b1a0cb024fafe65f5254a81c8c895d (diff)
downloadydb-3eedff7d70100cbe7ae67c9ec3f6f1c25526fd40.tar.gz
Revert commit rXXXXXX
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp43
-rw-r--r--ydb/core/kqp/kqp_worker_actor.cpp64
-rw-r--r--ydb/core/kqp/proxy/kqp_proxy_service.cpp55
-rw-r--r--ydb/core/kqp/proxy/kqp_proxy_service.h72
-rw-r--r--ydb/core/protos/config.proto1
5 files changed, 123 insertions, 112 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index b73175be91..b6ee60731a 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -177,6 +177,7 @@ public:
Config->FeatureFlags = AppData()->FeatureFlags;
Become(&TKqpSessionActor::ReadyState);
+ StartIdleTimer();
}
NYql::TKikimrQueryDeadlines GetQueryDeadlines(const NKikimrKqp::TQueryRequest& queryRequest) {
@@ -416,6 +417,8 @@ public:
return;
}
+ StopIdleTimer();
+
CompileQuery();
}
@@ -1754,6 +1757,8 @@ public:
? Ydb::Table::KeepAliveResult::SESSION_STATUS_READY
: Ydb::Table::KeepAliveResult::SESSION_STATUS_BUSY;
record.MutableResponse()->SetSessionStatus(sessionStatus);
+ StartIdleTimer();
+
Send(ev->Sender, result.release(), 0, proxyRequestId);
}
@@ -1808,6 +1813,35 @@ public:
}
}
+ void StartIdleTimer() {
+ StopIdleTimer();
+
+ ++IdleTimerId;
+ auto idleDuration = TDuration::Seconds(*Config->_KqpSessionIdleTimeoutSec.Get());
+ IdleTimerActorId = CreateLongTimer(TlsActivationContext->AsActorContext(), idleDuration,
+ new IEventHandle(SelfId(), SelfId(), new TEvKqp::TEvIdleTimeout(IdleTimerId)));
+ LOG_D("Created long timer for idle timeout, timer id: " << IdleTimerId
+ << ", duration: " << idleDuration << ", actor: " << IdleTimerActorId);
+ }
+
+ void StopIdleTimer() {
+ if (IdleTimerActorId) {
+ LOG_D("Destroying long timer actor for idle timout: " << IdleTimerActorId);
+ Send(IdleTimerActorId, new TEvents::TEvPoisonPill());
+ }
+ IdleTimerActorId = TActorId();
+ }
+
+ void Handle(TEvKqp::TEvIdleTimeout::TPtr& ev) {
+ auto timerId = ev->Get()->TimerId;
+
+ if (timerId == IdleTimerId) {
+ LOG_N(TKqpRequestInfo("", SessionId) << "SessionActor idle timeout, worker destroyed");
+ Counters->ReportSessionActorClosedIdle(Settings.DbCounters);
+ FinalCleanup();
+ }
+ }
+
void SendRollbackRequest(TKqpTransactionContext* txCtx) {
if (QueryState) {
LWTRACK(KqpSessionSendRollback, QueryState->Orbit, QueryState->CurrentTx);
@@ -1953,6 +1987,7 @@ public:
TransactionsToBeAborted.clear();
CleanupCtx.reset();
QueryState.reset();
+ StartIdleTimer();
Become(&TKqpSessionActor::ReadyState);
}
}
@@ -2007,6 +2042,7 @@ public:
HFunc(TEvKqp::TEvQueryRequest, HandleReady);
hFunc(TEvKqp::TEvPingSessionRequest, Handle);
+ hFunc(TEvKqp::TEvIdleTimeout, Handle);
hFunc(TEvKqp::TEvCloseSessionRequest, HandleReady);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
@@ -2030,6 +2066,7 @@ public:
hFunc(TEvKqp::TEvCloseSessionRequest, HandleCompile);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
+ hFunc(TEvKqp::TEvIdleTimeout, HandleNoop);
default:
UnexpectedEvent("CompileState", ev);
}
@@ -2057,6 +2094,7 @@ public:
hFunc(TEvKqp::TEvCloseSessionRequest, HandleExecute);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
+ hFunc(TEvKqp::TEvIdleTimeout, HandleNoop);
// always come from WorkerActor
hFunc(TEvKqp::TEvQueryResponse, ForwardResponse);
@@ -2083,6 +2121,7 @@ public:
hFunc(TEvKqp::TEvCloseSessionRequest, HandleCleanup);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
+ hFunc(TEvKqp::TEvIdleTimeout, HandleNoop);
// always come from WorkerActor
hFunc(TEvKqp::TEvCloseSessionResponse, HandleCleanup);
@@ -2105,6 +2144,7 @@ public:
hFunc(TEvKqp::TEvCloseSessionRequest, HandleTopicOps);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
+ hFunc(TEvKqp::TEvIdleTimeout, HandleNoop);
default:
UnexpectedEvent("TopicOpsState", ev);
}
@@ -2261,6 +2301,9 @@ private:
std::vector<TIntrusivePtr<TKqpTransactionContext>> TransactionsToBeAborted;
ui64 EvictedTx = 0;
std::unique_ptr<TEvKqp::TEvQueryResponse> QueryResponse;
+
+ TActorId IdleTimerActorId;
+ ui32 IdleTimerId = 0;
std::optional<TSessionShutdownState> ShutdownState;
TULIDGenerator UlidGen;
diff --git a/ydb/core/kqp/kqp_worker_actor.cpp b/ydb/core/kqp/kqp_worker_actor.cpp
index 1e8d50ccb0..9f2dfb521e 100644
--- a/ydb/core/kqp/kqp_worker_actor.cpp
+++ b/ydb/core/kqp/kqp_worker_actor.cpp
@@ -125,6 +125,7 @@ public:
, Config(MakeIntrusive<TKikimrConfiguration>())
, CreationTime(TInstant::Now())
, QueryId(0)
+ , IdleTimerId(0)
, ShutdownState(std::nullopt)
{
Y_VERIFY(ModuleResolverState);
@@ -160,6 +161,7 @@ public:
AppData(ctx)->FunctionRegistry, !Settings.LongSession);
Become(&TKqpWorkerActor::ReadyState);
+ StartIdleTimer(ctx);
}
void HandleReady(TEvKqp::TEvCloseSessionRequest::TPtr &ev, const TActorContext &ctx) {
@@ -188,6 +190,8 @@ public:
return;
}
+ StartIdleTimer(ctx);
+
ReplyPingStatus(ev->Sender, proxyRequestId, true, ctx);
}
@@ -377,6 +381,8 @@ public:
// some kind of internal query? or verify here?
}
+ StopIdleTimer(ctx);
+
if (CompileQuery(ctx)) {
if (QueryState) {
QueryState->CpuTime += timer.GetTime();
@@ -405,6 +411,19 @@ public:
}
}
+ void HandleReady(TEvKqp::TEvIdleTimeout::TPtr &ev, const TActorContext &ctx) {
+ auto timerId = ev->Get()->TimerId;
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_WORKER, "Received TEvIdleTimeout in ready state, timer id: "
+ << timerId << ", sender: " << ev->Sender);
+
+ if (timerId == IdleTimerId) {
+ LOG_NOTICE_S(ctx, NKikimrServices::KQP_WORKER, TKqpRequestInfo("", SessionId)
+ << "Worker idle timeout, worker destroyed");
+ Counters->ReportWorkerClosedIdle(Settings.DbCounters);
+ FinalCleanup(ctx);
+ }
+ }
+
void HandleCompileQuery(TEvKqp::TEvCompileResponse::TPtr &ev, const TActorContext &ctx) {
auto compileResult = ev->Get()->CompileResult;
@@ -413,6 +432,7 @@ public:
if (compileResult->Status != Ydb::StatusIds::SUCCESS) {
if (ReplyQueryCompileError(compileResult, ctx)) {
+ StartIdleTimer(ctx);
Become(&TKqpWorkerActor::ReadyState);
} else {
FinalCleanup(ctx);
@@ -435,6 +455,7 @@ public:
if (queryRequest.GetAction() == NKikimrKqp::QUERY_ACTION_PREPARE) {
if (ReplyPrepareResult(compileResult, ctx)) {
+ StartIdleTimer(ctx);
Become(&TKqpWorkerActor::ReadyState);
} else {
FinalCleanup(ctx);
@@ -486,6 +507,11 @@ public:
ReplyPingStatus(ev->Sender, proxyRequestId, false, ctx);
}
+ void HandleCompileQuery(TEvKqp::TEvIdleTimeout::TPtr &ev, const TActorContext &ctx) {
+ Y_UNUSED(ev);
+ Y_UNUSED(ctx);
+ }
+
void HandlePerformQuery(TEvKqp::TEvQueryRequest::TPtr &ev, const TActorContext &ctx) {
ReplyBusy(ev, ctx);
}
@@ -566,6 +592,11 @@ public:
}
}
+ void HandlePerformQuery(TEvKqp::TEvIdleTimeout::TPtr &ev, const TActorContext &ctx) {
+ Y_UNUSED(ev);
+ Y_UNUSED(ctx);
+ }
+
void HandlePerformCleanup(TEvKqp::TEvQueryRequest::TPtr &ev, const TActorContext &ctx) {
ui64 proxyRequestId = ev->Cookie;
auto& event = ev->Get()->Record;
@@ -646,6 +677,11 @@ public:
}
}
+ void HandlePerformCleanup(TEvKqp::TEvIdleTimeout::TPtr &ev, const TActorContext &ctx) {
+ Y_UNUSED(ev);
+ Y_UNUSED(ctx);
+ }
+
STFUNC(ReadyState) {
try {
switch (ev->GetTypeRewrite()) {
@@ -654,6 +690,7 @@ public:
HFunc(TEvKqp::TEvCloseSessionRequest, HandleReady);
HFunc(TEvKqp::TEvPingSessionRequest, HandleReady);
HFunc(TEvKqp::TEvContinueProcess, HandleReady);
+ HFunc(TEvKqp::TEvIdleTimeout, HandleReady);
HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown);
HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown);
default:
@@ -671,6 +708,7 @@ public:
HFunc(TEvKqp::TEvCompileResponse, HandleCompileQuery);
HFunc(TEvKqp::TEvCloseSessionRequest, HandleCompileQuery);
HFunc(TEvKqp::TEvPingSessionRequest, HandleCompileQuery);
+ HFunc(TEvKqp::TEvIdleTimeout, HandleCompileQuery);
HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown);
HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown);
default:
@@ -689,6 +727,7 @@ public:
HFunc(TEvKqp::TEvCloseSessionRequest, HandlePerformQuery);
HFunc(TEvKqp::TEvPingSessionRequest, HandlePerformQuery);
HFunc(TEvKqp::TEvContinueProcess, HandlePerformQuery);
+ HFunc(TEvKqp::TEvIdleTimeout, HandlePerformQuery);
HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown);
HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown);
default:
@@ -707,6 +746,7 @@ public:
HFunc(TEvKqp::TEvCloseSessionRequest, HandlePerformCleanup);
HFunc(TEvKqp::TEvPingSessionRequest, HandlePerformCleanup);
HFunc(TEvKqp::TEvContinueProcess, HandlePerformCleanup);
+ HFunc(TEvKqp::TEvIdleTimeout, HandlePerformCleanup);
HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown);
HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown);
default:
@@ -1162,6 +1202,7 @@ private:
CleanupState->Start = TInstant::Now();
if (isFinal) {
+ StopIdleTimer(ctx);
Counters->ReportQueriesPerWorker(Settings.DbCounters, QueryId);
MakeNewQueryState();
@@ -1210,6 +1251,7 @@ private:
Die(ctx);
} else {
if (ReplyQueryResult(ctx)) {
+ StartIdleTimer(ctx);
Become(&TKqpWorkerActor::ReadyState);
} else {
FinalCleanup(ctx);
@@ -2013,6 +2055,26 @@ private:
}
}
+ void StartIdleTimer(const TActorContext& ctx) {
+ StopIdleTimer(ctx);
+
+ ++IdleTimerId;
+ auto idleDuration = TDuration::Seconds(Config->_KqpSessionIdleTimeoutSec.Get().GetRef());
+ IdleTimerActorId = CreateLongTimer(ctx, idleDuration,
+ new IEventHandle(ctx.SelfID, ctx.SelfID, new TEvKqp::TEvIdleTimeout(IdleTimerId)));
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_WORKER, "Created long timer for idle timeout, timer id: " << IdleTimerId
+ << ", duration: " << idleDuration << ", actor: " << IdleTimerActorId);
+ }
+
+ void StopIdleTimer(const TActorContext& ctx) {
+ if (IdleTimerActorId) {
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_WORKER, "Destroying long timer actor for idle timout: "
+ << IdleTimerActorId);
+ ctx.Send(IdleTimerActorId, new TEvents::TEvPoisonPill());
+ }
+ IdleTimerActorId = TActorId();
+ }
+
IKikimrQueryExecutor::TExecuteSettings CreateRollbackSettings() {
YQL_ENSURE(QueryState);
@@ -2099,6 +2161,8 @@ private:
ui32 QueryId;
THolder<TKqpQueryState> QueryState;
THolder<TKqpCleanupState> CleanupState;
+ ui32 IdleTimerId;
+ TActorId IdleTimerActorId;
std::optional<TSessionShutdownState> ShutdownState;
};
diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.cpp b/ydb/core/kqp/proxy/kqp_proxy_service.cpp
index ed5356d503..0fb49de196 100644
--- a/ydb/core/kqp/proxy/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy/kqp_proxy_service.cpp
@@ -123,11 +123,11 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
EvReadyToPublishResources = EventSpaceBegin(TEvents::ES_PRIVATE),
EvCollectPeerProxyData,
EvOnRequestTimeout,
- EvCloseIdleSessions,
};
struct TEvReadyToPublishResources : public TEventLocal<TEvReadyToPublishResources, EEv::EvReadyToPublishResources> {};
struct TEvCollectPeerProxyData: public TEventLocal<TEvCollectPeerProxyData, EEv::EvCollectPeerProxyData> {};
+
struct TEvOnRequestTimeout: public TEventLocal<TEvOnRequestTimeout, EEv::EvOnRequestTimeout> {
public:
ui64 RequestId;
@@ -135,8 +135,6 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
TEvOnRequestTimeout(ui64 requestId, TDuration timeout): RequestId(requestId), Timeout(timeout) {};
};
-
- struct TEvCloseIdleSessions : public TEventLocal<TEvCloseIdleSessions, EEv::EvCloseIdleSessions> {};
};
public:
@@ -218,38 +216,6 @@ public:
PublishResourceUsage();
AskSelfNodeInfo();
SendWhiteboardRequest();
- ScheduleIdleSessionCheck();
- }
-
- void ScheduleIdleSessionCheck() {
- if (!ShutdownState) {
- const TDuration IdleSessionsCheckInterval = TDuration::Seconds(2);
- Schedule(IdleSessionsCheckInterval, new TEvPrivate::TEvCloseIdleSessions());
- }
- }
-
- void Handle(TEvPrivate::TEvCloseIdleSessions::TPtr&) {
- CheckIdleSessions();
- ScheduleIdleSessionCheck();
- }
-
- void CheckIdleSessions(const ui32 maxSessionsToClose = 10) {
- ui32 closedIdleSessions = 0;
- const NActors::TMonotonic now = TActivationContext::Monotonic();
- while(true) {
- const TKqpSessionInfo* sessionInfo = LocalSessions->GetIdleSession(now);
- if (sessionInfo == nullptr || closedIdleSessions > maxSessionsToClose)
- break;
-
- SendSessionClose(sessionInfo);
- ++closedIdleSessions;
- }
- }
-
- void SendSessionClose(const TKqpSessionInfo* sessionInfo) {
- auto closeSessionEv = std::make_unique<TEvKqp::TEvCloseSessionRequest>();
- closeSessionEv->Record.MutableRequest()->SetSessionId(sessionInfo->SessionId);
- Send(sessionInfo->WorkerId, closeSessionEv.release());
}
void AskSelfNodeInfo() {
@@ -376,7 +342,9 @@ public:
KQP_PROXY_LOG_I("Received tenant pool status, serving tenants: " << JoinRange(", ", Tenants.begin(), Tenants.end()));
for (auto& [_, sessionInfo] : *LocalSessions) {
if (!sessionInfo.Database.empty() && !Tenants.contains(sessionInfo.Database)) {
- SendSessionClose(&sessionInfo);
+ auto closeSessionEv = MakeHolder<TEvKqp::TEvCloseSessionRequest>();
+ closeSessionEv->Record.MutableRequest()->SetSessionId(sessionInfo.SessionId);
+ Send(sessionInfo.WorkerId, closeSessionEv.Release());
}
}
@@ -582,7 +550,6 @@ public:
TActorId targetId;
if (sessionInfo) {
targetId = sessionInfo->WorkerId;
- LocalSessions->StopIdleCheck(sessionInfo);
} else {
targetId = TryGetSessionTargetActor(request.GetSessionId(), requestInfo, requestId);
if (!targetId) {
@@ -628,10 +595,6 @@ public:
}
}
- TDuration GetSessionIdleDuration() const {
- return TDuration::Seconds(TableServiceConfig.GetSessionIdleDurationSeconds());
- }
-
void Handle(TEvKqp::TEvPingSessionRequest::TPtr& ev) {
auto& event = ev->Get()->Record;
auto& request = event.GetRequest();
@@ -647,9 +610,7 @@ public:
TActorId targetId;
if (sessionInfo) {
targetId = sessionInfo->WorkerId;
- LocalSessions->StopIdleCheck(sessionInfo);
} else {
- // forward request to kqp proxy on the different node.
targetId = TryGetSessionTargetActor(sessionId, requestInfo, requestId);
if (!targetId) {
return;
@@ -677,11 +638,6 @@ public:
return;
}
- const TKqpSessionInfo* info = LocalSessions->FindPtr(proxyRequest->SessionId);
- if (info) {
- LocalSessions->StartIdleCheck(info, GetSessionIdleDuration());
- }
-
LogResponse(proxyRequest->TraceId, ev->Get()->Record, proxyRequest->DbCounters);
Send(proxyRequest->Sender, ev->Release().Release(), 0, proxyRequest->SenderCookie);
@@ -1041,7 +997,6 @@ public:
hFunc(TEvPrivate::TEvOnRequestTimeout, Handle);
hFunc(NNodeWhiteboard::TEvWhiteboard::TEvSystemStateResponse, Handle);
hFunc(TEvKqp::TEvCreateSessionResponse, ForwardEvent);
- hFunc(TEvPrivate::TEvCloseIdleSessions, Handle);
default:
Y_FAIL("TKqpProxyService: unexpected event type: %" PRIx32 " event: %s",
ev->GetTypeRewrite(), ev->HasEvent() ? ev->GetBase()->ToString().data() : "serialized?");
@@ -1202,7 +1157,7 @@ private:
? CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, ModuleResolverState, Counters)
: CreateKqpWorkerActor(SelfId(), sessionId, KqpSettings, workerSettings, ModuleResolverState, Counters);
auto workerId = TlsActivationContext->ExecutorThread.RegisterActor(workerActor, TMailboxType::HTSwap, AppData()->UserPoolId);
- TKqpSessionInfo* sessionInfo = LocalSessions->Create(sessionId, workerId, database, dbCounters, supportsBalancing, GetSessionIdleDuration());
+ TKqpSessionInfo* sessionInfo = LocalSessions->Create(sessionId, workerId, database, dbCounters, supportsBalancing);
KQP_PROXY_LOG_D(requestInfo << "Created new session"
<< ", sessionId: " << sessionInfo->SessionId
diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.h b/ydb/core/kqp/proxy/kqp_proxy_service.h
index d71b500559..e6d44a176c 100644
--- a/ydb/core/kqp/proxy/kqp_proxy_service.h
+++ b/ydb/core/kqp/proxy/kqp_proxy_service.h
@@ -78,26 +78,23 @@ struct TKqpSessionInfo {
TActorId WorkerId;
TString Database;
TKqpDbCountersPtr DbCounters;
+ TInstant LastRequestAt;
+ TInstant CreatedAt;
TInstant ShutdownStartedAt;
std::vector<i32> ReadyPos;
- NActors::TMonotonic IdleTimeout;
- // position in the idle list.
- std::list<TKqpSessionInfo*>::iterator IdlePos;
-
- TKqpSessionInfo(
- const TString& sessionId, const TActorId& workerId,
- const TString& database, TKqpDbCountersPtr dbCounters, std::vector<i32>&& pos,
- NActors::TMonotonic idleTimeout,
- std::list<TKqpSessionInfo*>::iterator idlePos)
+
+ TKqpSessionInfo(const TString& sessionId, const TActorId& workerId,
+ const TString& database, TKqpDbCountersPtr dbCounters, std::vector<i32>&& pos)
: SessionId(sessionId)
, WorkerId(workerId)
, Database(database)
, DbCounters(dbCounters)
, ShutdownStartedAt()
, ReadyPos(std::move(pos))
- , IdleTimeout(idleTimeout)
- , IdlePos(idlePos)
{
+ auto now = TAppData::TimeProvider->Now();
+ LastRequestAt = now;
+ CreatedAt = now;
}
};
@@ -116,6 +113,7 @@ struct TSimpleResourceStats {
struct TPeerStats {
TSimpleResourceStats LocalSessionCount;
TSimpleResourceStats CrossAZSessionCount;
+
TSimpleResourceStats LocalCpu;
TSimpleResourceStats CrossAZCpu;
@@ -138,7 +136,6 @@ class TLocalSessionsRegistry {
THashMap<TString, ui32> SessionsCountPerDatabase;
std::vector<std::vector<TString>> ReadySessions;
TIntrusivePtr<IRandomProvider> RandomProvider;
- std::list<TKqpSessionInfo*> IdleSessions;
public:
TLocalSessionsRegistry(TIntrusivePtr<IRandomProvider> randomProvider)
@@ -147,8 +144,7 @@ public:
{}
TKqpSessionInfo* Create(const TString& sessionId, const TActorId& workerId,
- const TString& database, TKqpDbCountersPtr dbCounters, bool supportsBalancing,
- TDuration idleDuration)
+ const TString& database, TKqpDbCountersPtr dbCounters, bool supportsBalancing)
{
std::vector<i32> pos(2, -1);
pos[0] = ReadySessions[0].size();
@@ -159,30 +155,13 @@ public:
ReadySessions[1].push_back(sessionId);
}
- auto result = LocalSessions.emplace(
- sessionId, TKqpSessionInfo(sessionId, workerId, database, dbCounters, std::move(pos),
- NActors::TActivationContext::Monotonic() + idleDuration, IdleSessions.end()));
+ auto result = LocalSessions.emplace(sessionId, TKqpSessionInfo(sessionId, workerId, database, dbCounters, std::move(pos)));
SessionsCountPerDatabase[database]++;
Y_VERIFY(result.second, "Duplicate session id!");
TargetIdIndex.emplace(workerId, sessionId);
- StartIdleCheck(&(result.first->second), idleDuration);
return &result.first->second;
}
- const TKqpSessionInfo* GetIdleSession(const NActors::TMonotonic& now) {
- if (IdleSessions.empty()) {
- return nullptr;
- }
-
- const TKqpSessionInfo* candidate = (*IdleSessions.begin());
- if (candidate->IdleTimeout > now) {
- return nullptr;
- }
-
- IdleSessions.erase(IdleSessions.begin());
- return candidate;
- }
-
const THashSet<TString>& GetShutdownInFlight() const {
return ShutdownInFlightSessions;
}
@@ -217,31 +196,6 @@ public:
return ShutdownInFlightSessions.size();
}
- void StartIdleCheck(const TKqpSessionInfo* sessionInfo, const TDuration idleDuration) {
- if (!sessionInfo)
- return;
-
- TKqpSessionInfo* info = const_cast<TKqpSessionInfo*>(sessionInfo);
-
- info->IdleTimeout = NActors::TActivationContext::Monotonic() + idleDuration;
- if (info->IdlePos != IdleSessions.end()) {
- IdleSessions.erase(info->IdlePos);
- }
-
- info->IdlePos = IdleSessions.insert(IdleSessions.end(), info);
- }
-
- void StopIdleCheck(const TKqpSessionInfo* sessionInfo) {
- if (!sessionInfo)
- return;
-
- TKqpSessionInfo* info = const_cast<TKqpSessionInfo*>(sessionInfo);
- if (info->IdlePos != IdleSessions.end()) {
- IdleSessions.erase(info->IdlePos);
- info->IdlePos = IdleSessions.end();
- }
- }
-
void Erase(const TString& sessionId) {
auto it = LocalSessions.find(sessionId);
if (it != LocalSessions.end()) {
@@ -254,10 +208,6 @@ public:
}
RemoveSessionFromLists(&(it->second));
- if (it->second.IdlePos != IdleSessions.end()) {
- IdleSessions.erase(it->second.IdlePos);
- }
-
ShutdownInFlightSessions.erase(sessionId);
TargetIdIndex.erase(it->second.WorkerId);
LocalSessions.erase(it);
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 719212cb6e..e03275ec3b 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1172,7 +1172,6 @@ message TTableServiceConfig {
optional uint32 CompileQueryCacheTTLSec = 20 [default = 0];
optional TQueryReplayConfig QueryReplayConfig = 21;
optional bool EnableKqpSessionActor = 23 [default = true];
- optional uint32 SessionIdleDurationSeconds = 24 [default = 600];
};
// Config describes immediate controls and allows