about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author gvit <gvit@ydb.tech> 2023-01-13 15:49:50 +0300
committer gvit <gvit@ydb.tech> 2023-01-13 15:49:50 +0300
commit e561f72d1597d6d2186e70ca00337e30192e77e1 (patch)
tree 0962c290812ab3c53efdd7fe893fca9b9d89106b
parent 8e966540118fbe292cd9027cf68d2a1780a2b779 (diff)
download ydb-e561f72d1597d6d2186e70ca00337e30192e77e1.tar.gz
implement idle checks in kqp proxy service
-rw-r--r-- ydb/core/kqp/proxy_service/kqp_proxy_service.cpp 64
-rw-r--r-- ydb/core/kqp/proxy_service/kqp_proxy_service.h 65
-rw-r--r-- ydb/core/kqp/session_actor/kqp_session_actor.cpp 63
-rw-r--r-- ydb/core/kqp/session_actor/kqp_worker_actor.cpp 12
-rw-r--r-- ydb/core/protos/config.proto 1
5 files changed, 121 insertions, 84 deletions
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 8299c070ba..24984a56f8 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -107,6 +107,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
EvReadyToPublishResources = EventSpaceBegin(TEvents::ES_PRIVATE),
EvCollectPeerProxyData,
EvOnRequestTimeout,
+ EvCloseIdleSessions,
};
struct TEvReadyToPublishResources : public TEventLocal<TEvReadyToPublishResources, EEv::EvReadyToPublishResources> {};
@@ -119,6 +120,8 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
TEvOnRequestTimeout(ui64 requestId, TDuration timeout): RequestId(requestId), Timeout(timeout) {};
};
+
+ // Local timer event: the proxy schedules it to itself, and each delivery runs one
+ // sweep over the idle-session list.
+ struct TEvCloseIdleSessions
+     : public TEventLocal<TEvCloseIdleSessions, EEv::EvCloseIdleSessions>
+ {};
};
public:
@@ -196,6 +199,42 @@ public:
PublishResourceUsage();
AskSelfNodeInfo();
SendWhiteboardRequest();
+ ScheduleIdleSessionCheck();
+ }
+
+ // Idle timeout applied to local sessions, read from
+ // TableServiceConfig.SessionIdleDurationSeconds and converted to a TDuration.
+ TDuration GetSessionIdleDuration() const {
+     const auto idleSeconds = TableServiceConfig.GetSessionIdleDurationSeconds();
+     return TDuration::Seconds(idleSeconds);
+ }
+
+ // Arms the timer for the next idle-session sweep. Nothing is scheduled once
+ // ShutdownState is set, so the periodic check stops during shutdown.
+ void ScheduleIdleSessionCheck() {
+     if (ShutdownState) {
+         return;
+     }
+
+     // Fixed period between two consecutive sweeps.
+     const TDuration sweepPeriod = TDuration::Seconds(2);
+     Schedule(sweepPeriod, new TEvPrivate::TEvCloseIdleSessions());
+ }
+
+ // Timer tick: run one sweep over expired idle sessions, then re-arm the timer.
+ // The event carries no payload, so the argument is unused.
+ void Handle(TEvPrivate::TEvCloseIdleSessions::TPtr& /*ev*/) {
+     CheckIdleSessions();
+     ScheduleIdleSessionCheck();
+ }
+
+ // Closes sessions whose idle deadline has passed, at most maxSessionsToClose per call.
+ //
+ // GetIdleSession() unlinks the session it returns from the idle list, so the budget
+ // must be checked BEFORE the next candidate is fetched: a session that is fetched and
+ // then not closed would no longer be tracked as idle and would never be swept again.
+ // The loop bound also keeps the number of closes at exactly maxSessionsToClose
+ // rather than maxSessionsToClose + 1.
+ void CheckIdleSessions(const ui32 maxSessionsToClose = 10) {
+     const NActors::TMonotonic now = TActivationContext::Monotonic();
+     for (ui32 closedIdleSessions = 0; closedIdleSessions < maxSessionsToClose; ++closedIdleSessions) {
+         const TKqpSessionInfo* sessionInfo = LocalSessions->GetIdleSession(now);
+         if (sessionInfo == nullptr) {
+             // Either the idle list is empty or its head has not expired yet.
+             break;
+         }
+
+         SendSessionClose(sessionInfo);
+     }
+ }
+
+ // Builds a TEvCloseSessionRequest carrying the session id and sends it to the
+ // worker actor registered for that session.
+ void SendSessionClose(const TKqpSessionInfo* sessionInfo) {
+     auto request = std::make_unique<TEvKqp::TEvCloseSessionRequest>();
+     auto* payload = request->Record.MutableRequest();
+     payload->SetSessionId(sessionInfo->SessionId);
+     // Ownership of the event passes to Send().
+     Send(sessionInfo->WorkerId, request.release());
}
void AskSelfNodeInfo() {
@@ -506,6 +545,7 @@ public:
TActorId targetId;
if (sessionInfo) {
targetId = sessionInfo->WorkerId;
+ LocalSessions->StopIdleCheck(sessionInfo);
} else {
targetId = TryGetSessionTargetActor(request.GetSessionId(), requestInfo, requestId);
if (!targetId) {
@@ -566,6 +606,21 @@ public:
TActorId targetId;
if (sessionInfo) {
targetId = sessionInfo->WorkerId;
+ const bool isIdle = LocalSessions->IsSessionIdle(sessionInfo);
+ if (isIdle) {
+ LocalSessions->StopIdleCheck(sessionInfo);
+ LocalSessions->StartIdleCheck(sessionInfo, GetSessionIdleDuration());
+ }
+
+ auto result = std::make_unique<TEvKqp::TEvPingSessionResponse>();
+ auto& record = result->Record;
+ record.SetStatus(Ydb::StatusIds::SUCCESS);
+ auto sessionStatus = isIdle
+ ? Ydb::Table::KeepAliveResult::SESSION_STATUS_READY
+ : Ydb::Table::KeepAliveResult::SESSION_STATUS_BUSY;
+ record.MutableResponse()->SetSessionStatus(sessionStatus);
+ Send(SelfId(), result.release(), IEventHandle::FlagTrackDelivery, requestId);
+ return;
} else {
targetId = TryGetSessionTargetActor(sessionId, requestInfo, requestId);
if (!targetId) {
@@ -594,6 +649,11 @@ public:
return;
}
+ const TKqpSessionInfo* info = LocalSessions->FindPtr(proxyRequest->SessionId);
+ if (info) {
+ LocalSessions->StartIdleCheck(info, GetSessionIdleDuration());
+ }
+
LogResponse(proxyRequest->TraceId, ev->Get()->Record, proxyRequest->DbCounters);
Send(proxyRequest->Sender, ev->Release().Release(), 0, proxyRequest->SenderCookie);
@@ -946,6 +1006,7 @@ public:
hFunc(TEvPrivate::TEvOnRequestTimeout, Handle);
hFunc(NNodeWhiteboard::TEvWhiteboard::TEvSystemStateResponse, Handle);
hFunc(TEvKqp::TEvCreateSessionResponse, ForwardEvent);
+ hFunc(TEvPrivate::TEvCloseIdleSessions, Handle);
default:
Y_FAIL("TKqpProxyService: unexpected event type: %" PRIx32 " event: %s",
ev->GetTypeRewrite(), ev->HasEvent() ? ev->GetBase()->ToString().data() : "serialized?");
@@ -1102,7 +1163,8 @@ private:
IActor* sessionActor = CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, ModuleResolverState, Counters);
auto workerId = TlsActivationContext->ExecutorThread.RegisterActor(sessionActor, TMailboxType::HTSwap, AppData()->UserPoolId);
- TKqpSessionInfo* sessionInfo = LocalSessions->Create(sessionId, workerId, database, dbCounters, supportsBalancing);
+ TKqpSessionInfo* sessionInfo = LocalSessions->Create(
+ sessionId, workerId, database, dbCounters, supportsBalancing, GetSessionIdleDuration());
KQP_PROXY_LOG_D(requestInfo << "Created new session"
<< ", sessionId: " << sessionInfo->SessionId
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.h b/ydb/core/kqp/proxy_service/kqp_proxy_service.h
index 11f1ce1aef..8dc1b9622d 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.h
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.h
@@ -79,23 +79,24 @@ struct TKqpSessionInfo {
TActorId WorkerId;
TString Database;
TKqpDbCountersPtr DbCounters;
- TInstant LastRequestAt;
- TInstant CreatedAt;
TInstant ShutdownStartedAt;
std::vector<i32> ReadyPos;
+ NActors::TMonotonic IdleTimeout;
+ // position in the idle list.
+ std::list<TKqpSessionInfo*>::iterator IdlePos;
TKqpSessionInfo(const TString& sessionId, const TActorId& workerId,
- const TString& database, TKqpDbCountersPtr dbCounters, std::vector<i32>&& pos)
+ const TString& database, TKqpDbCountersPtr dbCounters, std::vector<i32>&& pos,
+ NActors::TMonotonic idleTimeout, std::list<TKqpSessionInfo*>::iterator idlePos)
: SessionId(sessionId)
, WorkerId(workerId)
, Database(database)
, DbCounters(dbCounters)
, ShutdownStartedAt()
, ReadyPos(std::move(pos))
+ , IdleTimeout(idleTimeout)
+ , IdlePos(idlePos)
{
- auto now = TAppData::TimeProvider->Now();
- LastRequestAt = now;
- CreatedAt = now;
}
};
@@ -137,6 +138,7 @@ class TLocalSessionsRegistry {
THashMap<TString, ui32> SessionsCountPerDatabase;
std::vector<std::vector<TString>> ReadySessions;
TIntrusivePtr<IRandomProvider> RandomProvider;
+ std::list<TKqpSessionInfo*> IdleSessions;
public:
TLocalSessionsRegistry(TIntrusivePtr<IRandomProvider> randomProvider)
@@ -145,7 +147,8 @@ public:
{}
TKqpSessionInfo* Create(const TString& sessionId, const TActorId& workerId,
- const TString& database, TKqpDbCountersPtr dbCounters, bool supportsBalancing)
+ const TString& database, TKqpDbCountersPtr dbCounters, bool supportsBalancing,
+ TDuration idleDuration)
{
std::vector<i32> pos(2, -1);
pos[0] = ReadySessions[0].size();
@@ -157,10 +160,12 @@ public:
}
auto result = LocalSessions.emplace(sessionId,
- TKqpSessionInfo(sessionId, workerId, database, dbCounters, std::move(pos)));
+ TKqpSessionInfo(sessionId, workerId, database, dbCounters, std::move(pos),
+ NActors::TActivationContext::Monotonic() + idleDuration, IdleSessions.end()));
SessionsCountPerDatabase[database]++;
Y_VERIFY(result.second, "Duplicate session id!");
TargetIdIndex.emplace(workerId, sessionId);
+ StartIdleCheck(&(result.first->second), idleDuration);
return &result.first->second;
}
@@ -176,6 +181,49 @@ public:
return ptr;
}
+ // A session counts as idle while IdlePos refers to a real element of IdleSessions;
+ // IdleSessions.end() is used as the "not in the list" marker.
+ bool IsSessionIdle(const TKqpSessionInfo* sessionInfo) const {
+     const bool detached = (sessionInfo->IdlePos == IdleSessions.end());
+     return !detached;
+ }
+
+ // Returns the session at the head of the idle list if its deadline is not later
+ // than `now`, unlinking it from the list before returning. Returns nullptr when the
+ // list is empty or the head session has not expired yet.
+ const TKqpSessionInfo* GetIdleSession(const NActors::TMonotonic& now) {
+     if (IdleSessions.empty()) {
+         return nullptr;
+     }
+
+     const TKqpSessionInfo* head = IdleSessions.front();
+     const bool stillWithinDeadline = head->IdleTimeout > now;
+     if (stillWithinDeadline) {
+         return nullptr;
+     }
+
+     // The caller takes over from here: the session leaves idle tracking.
+     StopIdleCheck(head);
+     return head;
+ }
+
+ // (Re)starts idle tracking for a session: its deadline becomes now + idleDuration and
+ // it is placed at the tail of IdleSessions (after being unlinked if it was already
+ // in the list). A null pointer is ignored.
+ void StartIdleCheck(const TKqpSessionInfo* sessionInfo, const TDuration idleDuration) {
+     if (sessionInfo == nullptr) {
+         return;
+     }
+
+     // Callers hold const pointers; the bookkeeping fields need a mutable one.
+     auto* mutableInfo = const_cast<TKqpSessionInfo*>(sessionInfo);
+
+     const bool alreadyLinked = (mutableInfo->IdlePos != IdleSessions.end());
+     if (alreadyLinked) {
+         IdleSessions.erase(mutableInfo->IdlePos);
+     }
+
+     mutableInfo->IdleTimeout = NActors::TActivationContext::Monotonic() + idleDuration;
+     mutableInfo->IdlePos = IdleSessions.insert(IdleSessions.end(), mutableInfo);
+ }
+
+ // Removes a session from idle tracking and resets IdlePos to IdleSessions.end().
+ // A null pointer or a session that is not in the list is left untouched.
+ void StopIdleCheck(const TKqpSessionInfo* sessionInfo) {
+     if (sessionInfo == nullptr) {
+         return;
+     }
+
+     // Callers hold const pointers; the bookkeeping fields need a mutable one.
+     auto* mutableInfo = const_cast<TKqpSessionInfo*>(sessionInfo);
+     if (mutableInfo->IdlePos == IdleSessions.end()) {
+         return;
+     }
+
+     IdleSessions.erase(mutableInfo->IdlePos);
+     mutableInfo->IdlePos = IdleSessions.end();
+ }
+
TKqpSessionInfo* PickSessionToShutdown(bool force, ui32 minReasonableToKick) {
auto& sessions = force ? ReadySessions.at(0) : ReadySessions.at(1);
if (sessions.size() >= minReasonableToKick) {
@@ -209,6 +257,7 @@ public:
}
}
+ StopIdleCheck(&(it->second));
RemoveSessionFromLists(&(it->second));
ShutdownInFlightSessions.erase(sessionId);
TargetIdIndex.erase(it->second.WorkerId);
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 9c27cdcbbe..5eda7bcb2b 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -246,7 +246,6 @@ public:
Config->FeatureFlags = AppData()->FeatureFlags;
Become(&TKqpSessionActor::ReadyState);
- StartIdleTimer();
}
TString LogPrefix() const {
@@ -560,8 +559,6 @@ public:
return;
}
- StopIdleTimer();
-
CompileQuery();
}
@@ -1825,22 +1822,6 @@ public:
}
}
- void Handle(TEvKqp::TEvPingSessionRequest::TPtr& ev) {
- ui64 proxyRequestId = ev->Cookie;
- auto result = std::make_unique<TEvKqp::TEvPingSessionResponse>();
- auto& record = result->Record;
- record.SetStatus(Ydb::StatusIds::SUCCESS);
- auto sessionStatus = CurrentStateFunc() == &TThis::ReadyState
- ? Ydb::Table::KeepAliveResult::SESSION_STATUS_READY
- : Ydb::Table::KeepAliveResult::SESSION_STATUS_BUSY;
- record.MutableResponse()->SetSessionStatus(sessionStatus);
- if (CurrentStateFunc() == &TThis::ReadyState) {
- StartIdleTimer();
- }
-
- Send(ev->Sender, result.release(), 0, proxyRequestId);
- }
-
void HandleReady(TEvKqp::TEvCloseSessionRequest::TPtr&) {
LOG_I("Session closed due to explicit close event");
Counters->ReportSessionActorClosedRequest(Settings.DbCounters);
@@ -1891,35 +1872,6 @@ public:
}
}
- void StartIdleTimer() {
- StopIdleTimer();
-
- ++IdleTimerId;
- auto idleDuration = TDuration::Seconds(*Config->_KqpSessionIdleTimeoutSec.Get());
- IdleTimerActorId = CreateLongTimer(TlsActivationContext->AsActorContext(), idleDuration,
- new IEventHandle(SelfId(), SelfId(), new TEvKqp::TEvIdleTimeout(IdleTimerId)));
- LOG_D("Created long timer for idle timeout, timer id: " << IdleTimerId
- << ", duration: " << idleDuration << ", actor: " << IdleTimerActorId);
- }
-
- void StopIdleTimer() {
- if (IdleTimerActorId) {
- LOG_D("Destroying long timer actor for idle timout: " << IdleTimerActorId);
- Send(IdleTimerActorId, new TEvents::TEvPoisonPill());
- }
- IdleTimerActorId = TActorId();
- }
-
- void Handle(TEvKqp::TEvIdleTimeout::TPtr& ev) {
- auto timerId = ev->Get()->TimerId;
-
- if (timerId == IdleTimerId) {
- LOG_N("SessionActor idle timeout, worker destroyed");
- Counters->ReportSessionActorClosedIdle(Settings.DbCounters);
- CleanupAndPassAway();
- }
- }
-
void SendRollbackRequest(TKqpTransactionContext* txCtx) {
if (QueryState) {
LWTRACK(KqpSessionSendRollback, QueryState->Orbit, QueryState->CurrentTx);
@@ -2070,7 +2022,6 @@ public:
TransactionsToBeAborted.clear();
CleanupCtx.reset();
QueryState.reset();
- StartIdleTimer();
Become(&TKqpSessionActor::ReadyState);
}
ExecuterId = TActorId{};
@@ -2125,8 +2076,6 @@ public:
switch (ev->GetTypeRewrite()) {
HFunc(TEvKqp::TEvQueryRequest, HandleReady);
- hFunc(TEvKqp::TEvPingSessionRequest, Handle);
- hFunc(TEvKqp::TEvIdleTimeout, Handle);
hFunc(TEvKqp::TEvCloseSessionRequest, HandleReady);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
@@ -2147,11 +2096,9 @@ public:
hFunc(TEvKqp::TEvQueryRequest, HandleCompile);
hFunc(TEvKqp::TEvCompileResponse, HandleCompile);
- hFunc(TEvKqp::TEvPingSessionRequest, Handle);
hFunc(TEvKqp::TEvCloseSessionRequest, HandleCompile);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
- hFunc(TEvKqp::TEvIdleTimeout, HandleNoop);
hFunc(TEvKqpExecuter::TEvTxResponse, HandleTxResponse);
hFunc(TEvents::TEvWakeup, HandleWakeup);
default:
@@ -2177,11 +2124,9 @@ public:
hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleExecute);
hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, HandleExecute);
- hFunc(TEvKqp::TEvPingSessionRequest, Handle);
hFunc(TEvKqp::TEvCloseSessionRequest, HandleExecute);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
- hFunc(TEvKqp::TEvIdleTimeout, HandleNoop);
hFunc(TEvents::TEvWakeup, HandleWakeup);
// always come from WorkerActor
@@ -2205,11 +2150,9 @@ public:
hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleNoop);
hFunc(TEvKqp::TEvCompileResponse, HandleNoop);
- hFunc(TEvKqp::TEvPingSessionRequest, Handle);
hFunc(TEvKqp::TEvCloseSessionRequest, HandleCleanup);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
- hFunc(TEvKqp::TEvIdleTimeout, HandleNoop);
hFunc(TEvents::TEvWakeup, HandleNoop);
// always come from WorkerActor
@@ -2229,11 +2172,9 @@ public:
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleTopicOps);
- hFunc(TEvKqp::TEvPingSessionRequest, Handle);
hFunc(TEvKqp::TEvCloseSessionRequest, HandleTopicOps);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
- hFunc(TEvKqp::TEvIdleTimeout, HandleNoop);
default:
UnexpectedEvent("TopicOpsState", ev);
}
@@ -2406,11 +2347,7 @@ private:
std::vector<TIntrusivePtr<TKqpTransactionContext>> TransactionsToBeAborted;
ui64 EvictedTx = 0;
std::unique_ptr<TEvKqp::TEvQueryResponse> QueryResponse;
-
- TActorId IdleTimerActorId;
- ui32 IdleTimerId = 0;
std::optional<TSessionShutdownState> ShutdownState;
-
TULIDGenerator UlidGen;
};
diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
index 02bacc3086..1be1327762 100644
--- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
@@ -852,18 +852,6 @@ private:
}
}
- bool ReplyPingStatus(const TActorId& sender, ui64 proxyRequestId, bool ready, const TActorContext& ctx) {
- auto ev = MakeHolder<TEvKqp::TEvPingSessionResponse>();
- auto& record = ev->Record;
- record.SetStatus(Ydb::StatusIds::SUCCESS);
- record.MutableResponse()->SetSessionStatus(ready
- ? Ydb::Table::KeepAliveResult::SESSION_STATUS_READY
- : Ydb::Table::KeepAliveResult::SESSION_STATUS_BUSY);
-
- AddTrailingInfo(record);
- return ctx.Send(sender, ev.Release(), 0, proxyRequestId);
- }
-
bool ReplyProcessError(const TActorId& sender, ui64 proxyRequestId,
Ydb::StatusIds::StatusCode ydbStatus, const TString& message, const TActorContext& ctx)
{
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index f67cb56292..fb51130cc5 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1213,6 +1213,7 @@ message TTableServiceConfig {
optional bool EnableKqpScanQuerySourceRead = 26 [default = false];
optional bool EnableKqpDataQuerySourceRead = 27 [default = false];
+ optional uint64 SessionIdleDurationSeconds = 28 [default = 600];
};
// Config describes immediate controls and allows