diff options
author | gvit <gvit@ydb.tech> | 2023-01-13 15:49:50 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-01-13 15:49:50 +0300 |
commit | e561f72d1597d6d2186e70ca00337e30192e77e1 (patch) | |
tree | 0962c290812ab3c53efdd7fe893fca9b9d89106b | |
parent | 8e966540118fbe292cd9027cf68d2a1780a2b779 (diff) | |
download | ydb-e561f72d1597d6d2186e70ca00337e30192e77e1.tar.gz |
implement idle checks in kqp proxy service
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_proxy_service.cpp | 64 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_proxy_service.h | 65 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 63 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_worker_actor.cpp | 12 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 1 |
5 files changed, 121 insertions, 84 deletions
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 8299c070ba..24984a56f8 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -107,6 +107,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> { EvReadyToPublishResources = EventSpaceBegin(TEvents::ES_PRIVATE), EvCollectPeerProxyData, EvOnRequestTimeout, + EvCloseIdleSessions, }; struct TEvReadyToPublishResources : public TEventLocal<TEvReadyToPublishResources, EEv::EvReadyToPublishResources> {}; @@ -119,6 +120,8 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> { TEvOnRequestTimeout(ui64 requestId, TDuration timeout): RequestId(requestId), Timeout(timeout) {}; }; + + struct TEvCloseIdleSessions : public TEventLocal<TEvCloseIdleSessions, EEv::EvCloseIdleSessions> {}; }; public: @@ -196,6 +199,42 @@ public: PublishResourceUsage(); AskSelfNodeInfo(); SendWhiteboardRequest(); + ScheduleIdleSessionCheck(); + } + + TDuration GetSessionIdleDuration() const { + return TDuration::Seconds(TableServiceConfig.GetSessionIdleDurationSeconds()); + } + + void ScheduleIdleSessionCheck() { + if (!ShutdownState) { + const TDuration IdleSessionsCheckInterval = TDuration::Seconds(2); + Schedule(IdleSessionsCheckInterval, new TEvPrivate::TEvCloseIdleSessions()); + } + } + + void Handle(TEvPrivate::TEvCloseIdleSessions::TPtr&) { + CheckIdleSessions(); + ScheduleIdleSessionCheck(); + } + + void CheckIdleSessions(const ui32 maxSessionsToClose = 10) { + ui32 closedIdleSessions = 0; + const NActors::TMonotonic now = TActivationContext::Monotonic(); + while(true) { + const TKqpSessionInfo* sessionInfo = LocalSessions->GetIdleSession(now); + if (sessionInfo == nullptr || closedIdleSessions > maxSessionsToClose) + break; + + SendSessionClose(sessionInfo); + ++closedIdleSessions; + } + } + + void SendSessionClose(const TKqpSessionInfo* sessionInfo) { + auto closeSessionEv = std::make_unique<TEvKqp::TEvCloseSessionRequest>(); + closeSessionEv->Record.MutableRequest()->SetSessionId(sessionInfo->SessionId); + Send(sessionInfo->WorkerId, closeSessionEv.release()); } void AskSelfNodeInfo() { @@ -506,6 +545,7 @@ public: TActorId targetId; if (sessionInfo) { targetId = sessionInfo->WorkerId; + LocalSessions->StopIdleCheck(sessionInfo); } else { targetId = TryGetSessionTargetActor(request.GetSessionId(), requestInfo, requestId); if (!targetId) { @@ -566,6 +606,21 @@ public: TActorId targetId; if (sessionInfo) { targetId = sessionInfo->WorkerId; + const bool isIdle = LocalSessions->IsSessionIdle(sessionInfo); + if (isIdle) { + LocalSessions->StopIdleCheck(sessionInfo); + LocalSessions->StartIdleCheck(sessionInfo, GetSessionIdleDuration()); + } + + auto result = std::make_unique<TEvKqp::TEvPingSessionResponse>(); + auto& record = result->Record; + record.SetStatus(Ydb::StatusIds::SUCCESS); + auto sessionStatus = isIdle + ? Ydb::Table::KeepAliveResult::SESSION_STATUS_READY + : Ydb::Table::KeepAliveResult::SESSION_STATUS_BUSY; + record.MutableResponse()->SetSessionStatus(sessionStatus); + Send(SelfId(), result.release(), IEventHandle::FlagTrackDelivery, requestId); + return; } else { targetId = TryGetSessionTargetActor(sessionId, requestInfo, requestId); if (!targetId) { @@ -594,6 +649,11 @@ public: return; } + const TKqpSessionInfo* info = LocalSessions->FindPtr(proxyRequest->SessionId); + if (info) { + LocalSessions->StartIdleCheck(info, GetSessionIdleDuration()); + } + LogResponse(proxyRequest->TraceId, ev->Get()->Record, proxyRequest->DbCounters); Send(proxyRequest->Sender, ev->Release().Release(), 0, proxyRequest->SenderCookie); @@ -946,6 +1006,7 @@ public: hFunc(TEvPrivate::TEvOnRequestTimeout, Handle); hFunc(NNodeWhiteboard::TEvWhiteboard::TEvSystemStateResponse, Handle); hFunc(TEvKqp::TEvCreateSessionResponse, ForwardEvent); + hFunc(TEvPrivate::TEvCloseIdleSessions, Handle); default: Y_FAIL("TKqpProxyService: unexpected event type: %" PRIx32 " event: %s", ev->GetTypeRewrite(), ev->HasEvent() ? ev->GetBase()->ToString().data() : "serialized?"); @@ -1102,7 +1163,8 @@ private: IActor* sessionActor = CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, ModuleResolverState, Counters); auto workerId = TlsActivationContext->ExecutorThread.RegisterActor(sessionActor, TMailboxType::HTSwap, AppData()->UserPoolId); - TKqpSessionInfo* sessionInfo = LocalSessions->Create(sessionId, workerId, database, dbCounters, supportsBalancing); + TKqpSessionInfo* sessionInfo = LocalSessions->Create( + sessionId, workerId, database, dbCounters, supportsBalancing, GetSessionIdleDuration()); KQP_PROXY_LOG_D(requestInfo << "Created new session" << ", sessionId: " << sessionInfo->SessionId diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.h b/ydb/core/kqp/proxy_service/kqp_proxy_service.h index 11f1ce1aef..8dc1b9622d 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.h +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.h @@ -79,23 +79,24 @@ struct TKqpSessionInfo { TActorId WorkerId; TString Database; TKqpDbCountersPtr DbCounters; - TInstant LastRequestAt; - TInstant CreatedAt; TInstant ShutdownStartedAt; std::vector<i32> ReadyPos; + NActors::TMonotonic IdleTimeout; + // position in the idle list. + std::list<TKqpSessionInfo*>::iterator IdlePos; TKqpSessionInfo(const TString& sessionId, const TActorId& workerId, - const TString& database, TKqpDbCountersPtr dbCounters, std::vector<i32>&& pos) + const TString& database, TKqpDbCountersPtr dbCounters, std::vector<i32>&& pos, + NActors::TMonotonic idleTimeout, std::list<TKqpSessionInfo*>::iterator idlePos) : SessionId(sessionId) , WorkerId(workerId) , Database(database) , DbCounters(dbCounters) , ShutdownStartedAt() , ReadyPos(std::move(pos)) + , IdleTimeout(idleTimeout) + , IdlePos(idlePos) { - auto now = TAppData::TimeProvider->Now(); - LastRequestAt = now; - CreatedAt = now; } }; @@ -137,6 +138,7 @@ class TLocalSessionsRegistry { THashMap<TString, ui32> SessionsCountPerDatabase; std::vector<std::vector<TString>> ReadySessions; TIntrusivePtr<IRandomProvider> RandomProvider; + std::list<TKqpSessionInfo*> IdleSessions; public: TLocalSessionsRegistry(TIntrusivePtr<IRandomProvider> randomProvider) @@ -145,7 +147,8 @@ public: {} TKqpSessionInfo* Create(const TString& sessionId, const TActorId& workerId, - const TString& database, TKqpDbCountersPtr dbCounters, bool supportsBalancing) + const TString& database, TKqpDbCountersPtr dbCounters, bool supportsBalancing, + TDuration idleDuration) { std::vector<i32> pos(2, -1); pos[0] = ReadySessions[0].size(); @@ -157,10 +160,12 @@ public: } auto result = LocalSessions.emplace(sessionId, - TKqpSessionInfo(sessionId, workerId, database, dbCounters, std::move(pos))); + TKqpSessionInfo(sessionId, workerId, database, dbCounters, std::move(pos), + NActors::TActivationContext::Monotonic() + idleDuration, IdleSessions.end())); SessionsCountPerDatabase[database]++; Y_VERIFY(result.second, "Duplicate session id!"); TargetIdIndex.emplace(workerId, sessionId); + StartIdleCheck(&(result.first->second), idleDuration); return &result.first->second; } @@ -176,6 +181,49 @@ public: return ptr; } + bool IsSessionIdle(const TKqpSessionInfo* sessionInfo) const { + return sessionInfo->IdlePos != IdleSessions.end(); + } + + const TKqpSessionInfo* GetIdleSession(const NActors::TMonotonic& now) { + if (IdleSessions.empty()) { + return nullptr; + } + + const TKqpSessionInfo* candidate = (*IdleSessions.begin()); + if (candidate->IdleTimeout > now) { + return nullptr; + } + + StopIdleCheck(candidate); + return candidate; + } + + void StartIdleCheck(const TKqpSessionInfo* sessionInfo, const TDuration idleDuration) { + if (!sessionInfo) + return; + + TKqpSessionInfo* info = const_cast<TKqpSessionInfo*>(sessionInfo); + + info->IdleTimeout = NActors::TActivationContext::Monotonic() + idleDuration; + if (info->IdlePos != IdleSessions.end()) { + IdleSessions.erase(info->IdlePos); + } + + info->IdlePos = IdleSessions.insert(IdleSessions.end(), info); + } + + void StopIdleCheck(const TKqpSessionInfo* sessionInfo) { + if (!sessionInfo) + return; + + TKqpSessionInfo* info = const_cast<TKqpSessionInfo*>(sessionInfo); + if (info->IdlePos != IdleSessions.end()) { + IdleSessions.erase(info->IdlePos); + info->IdlePos = IdleSessions.end(); + } + } + TKqpSessionInfo* PickSessionToShutdown(bool force, ui32 minReasonableToKick) { auto& sessions = force ? ReadySessions.at(0) : ReadySessions.at(1); if (sessions.size() >= minReasonableToKick) { @@ -209,6 +257,7 @@ public: } } + StopIdleCheck(&(it->second)); RemoveSessionFromLists(&(it->second)); ShutdownInFlightSessions.erase(sessionId); TargetIdIndex.erase(it->second.WorkerId); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 9c27cdcbbe..5eda7bcb2b 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -246,7 +246,6 @@ public: Config->FeatureFlags = AppData()->FeatureFlags; Become(&TKqpSessionActor::ReadyState); - StartIdleTimer(); } TString LogPrefix() const { @@ -560,8 +559,6 @@ public: return; } - StopIdleTimer(); - CompileQuery(); } @@ -1825,22 +1822,6 @@ public: } } - void Handle(TEvKqp::TEvPingSessionRequest::TPtr& ev) { - ui64 proxyRequestId = ev->Cookie; - auto result = std::make_unique<TEvKqp::TEvPingSessionResponse>(); - auto& record = result->Record; - record.SetStatus(Ydb::StatusIds::SUCCESS); - auto sessionStatus = CurrentStateFunc() == &TThis::ReadyState - ? Ydb::Table::KeepAliveResult::SESSION_STATUS_READY - : Ydb::Table::KeepAliveResult::SESSION_STATUS_BUSY; - record.MutableResponse()->SetSessionStatus(sessionStatus); - if (CurrentStateFunc() == &TThis::ReadyState) { - StartIdleTimer(); - } - - Send(ev->Sender, result.release(), 0, proxyRequestId); - } - void HandleReady(TEvKqp::TEvCloseSessionRequest::TPtr&) { LOG_I("Session closed due to explicit close event"); Counters->ReportSessionActorClosedRequest(Settings.DbCounters); @@ -1891,35 +1872,6 @@ public: } } - void StartIdleTimer() { - StopIdleTimer(); - - ++IdleTimerId; - auto idleDuration = TDuration::Seconds(*Config->_KqpSessionIdleTimeoutSec.Get()); - IdleTimerActorId = CreateLongTimer(TlsActivationContext->AsActorContext(), idleDuration, - new IEventHandle(SelfId(), SelfId(), new TEvKqp::TEvIdleTimeout(IdleTimerId))); - LOG_D("Created long timer for idle timeout, timer id: " << IdleTimerId - << ", duration: " << idleDuration << ", actor: " << IdleTimerActorId); - } - - void StopIdleTimer() { - if (IdleTimerActorId) { - LOG_D("Destroying long timer actor for idle timout: " << IdleTimerActorId); - Send(IdleTimerActorId, new TEvents::TEvPoisonPill()); - } - IdleTimerActorId = TActorId(); - } - - void Handle(TEvKqp::TEvIdleTimeout::TPtr& ev) { - auto timerId = ev->Get()->TimerId; - - if (timerId == IdleTimerId) { - LOG_N("SessionActor idle timeout, worker destroyed"); - Counters->ReportSessionActorClosedIdle(Settings.DbCounters); - CleanupAndPassAway(); - } - } - void SendRollbackRequest(TKqpTransactionContext* txCtx) { if (QueryState) { LWTRACK(KqpSessionSendRollback, QueryState->Orbit, QueryState->CurrentTx); @@ -2070,7 +2022,6 @@ public: TransactionsToBeAborted.clear(); CleanupCtx.reset(); QueryState.reset(); - StartIdleTimer(); Become(&TKqpSessionActor::ReadyState); } ExecuterId = TActorId{}; @@ -2125,8 +2076,6 @@ public: switch (ev->GetTypeRewrite()) { HFunc(TEvKqp::TEvQueryRequest, HandleReady); - hFunc(TEvKqp::TEvPingSessionRequest, Handle); - hFunc(TEvKqp::TEvIdleTimeout, Handle); hFunc(TEvKqp::TEvCloseSessionRequest, HandleReady); hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); @@ -2147,11 +2096,9 @@ public: hFunc(TEvKqp::TEvQueryRequest, HandleCompile); hFunc(TEvKqp::TEvCompileResponse, HandleCompile); - hFunc(TEvKqp::TEvPingSessionRequest, Handle); hFunc(TEvKqp::TEvCloseSessionRequest, HandleCompile); hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); - hFunc(TEvKqp::TEvIdleTimeout, HandleNoop); hFunc(TEvKqpExecuter::TEvTxResponse, HandleTxResponse); hFunc(TEvents::TEvWakeup, HandleWakeup); default: @@ -2177,11 +2124,9 @@ public: hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleExecute); hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, HandleExecute); - hFunc(TEvKqp::TEvPingSessionRequest, Handle); hFunc(TEvKqp::TEvCloseSessionRequest, HandleExecute); hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); - hFunc(TEvKqp::TEvIdleTimeout, HandleNoop); hFunc(TEvents::TEvWakeup, HandleWakeup); // always come from WorkerActor @@ -2205,11 +2150,9 @@ public: hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleNoop); hFunc(TEvKqp::TEvCompileResponse, HandleNoop); - hFunc(TEvKqp::TEvPingSessionRequest, Handle); hFunc(TEvKqp::TEvCloseSessionRequest, HandleCleanup); hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); - hFunc(TEvKqp::TEvIdleTimeout, HandleNoop); hFunc(TEvents::TEvWakeup, HandleNoop); // always come from WorkerActor @@ -2229,11 +2172,9 @@ public: hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleTopicOps); - hFunc(TEvKqp::TEvPingSessionRequest, Handle); hFunc(TEvKqp::TEvCloseSessionRequest, HandleTopicOps); hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); - hFunc(TEvKqp::TEvIdleTimeout, HandleNoop); default: UnexpectedEvent("TopicOpsState", ev); } @@ -2406,11 +2347,7 @@ private: std::vector<TIntrusivePtr<TKqpTransactionContext>> TransactionsToBeAborted; ui64 EvictedTx = 0; std::unique_ptr<TEvKqp::TEvQueryResponse> QueryResponse; - - TActorId IdleTimerActorId; - ui32 IdleTimerId = 0; std::optional<TSessionShutdownState> ShutdownState; - TULIDGenerator UlidGen; }; diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp index 02bacc3086..1be1327762 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp @@ -852,18 +852,6 @@ private: } } - bool ReplyPingStatus(const TActorId& sender, ui64 proxyRequestId, bool ready, const TActorContext& ctx) { - auto ev = MakeHolder<TEvKqp::TEvPingSessionResponse>(); - auto& record = ev->Record; - record.SetStatus(Ydb::StatusIds::SUCCESS); - record.MutableResponse()->SetSessionStatus(ready - ? Ydb::Table::KeepAliveResult::SESSION_STATUS_READY - : Ydb::Table::KeepAliveResult::SESSION_STATUS_BUSY); - - AddTrailingInfo(record); - return ctx.Send(sender, ev.Release(), 0, proxyRequestId); - } - bool ReplyProcessError(const TActorId& sender, ui64 proxyRequestId, Ydb::StatusIds::StatusCode ydbStatus, const TString& message, const TActorContext& ctx) { diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index f67cb56292..fb51130cc5 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1213,6 +1213,7 @@ message TTableServiceConfig { optional bool EnableKqpScanQuerySourceRead = 26 [default = false]; optional bool EnableKqpDataQuerySourceRead = 27 [default = false]; + optional uint64 SessionIdleDurationSeconds = 28 [default = 600]; }; // Config describes immediate controls and allows |