diff options
author | Vladislav Kuznetsov <va.kuznecov@physics.msu.ru> | 2022-04-22 22:14:09 +0300 |
---|---|---|
committer | Vladislav Kuznetsov <va.kuznecov@physics.msu.ru> | 2022-04-22 22:14:09 +0300 |
commit | cb63cb3c30759993ff7e56aa59354a0db20e43f0 (patch) | |
tree | b07878e4d08b44301e2c208aa336626ef11ed73e | |
parent | bc397e8af42491e31ef07c909c8ddf6a7e78a806 (diff) | |
download | ydb-cb63cb3c30759993ff7e56aa59354a0db20e43f0.tar.gz |
Handle session shutdown requests KIKIMR-14428
ref:18d574f9d5673def6a59131c6b21b7961d818e5a
-rw-r--r-- | ydb/core/kqp/counters/kqp_counters.cpp | 11 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 105 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_worker_actor.cpp | 33 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_worker_common.h | 33 | ||||
-rw-r--r-- | ydb/core/kqp/proxy/kqp_proxy_service.cpp | 6 |
5 files changed, 135 insertions, 53 deletions
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index f3c96fd19e..f62713bdd2 100644 --- a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -495,6 +495,10 @@ void TKqpCountersBase::ReportSessionActorClosedError() { SessionActorsClosedError->Inc(); } +void TKqpCountersBase::ReportSessionActorClosedRequest() { + SessionActorsClosedRequest->Inc(); +} + void TKqpCountersBase::ReportQueriesPerSessionActor(ui32 queryId) { QueriesPerSessionActor->Collect(queryId); } @@ -1021,6 +1025,13 @@ void TKqpCounters::ReportSessionActorClosedError(TKqpDbCountersPtr dbCounters) { } } +void TKqpCounters::ReportSessionActorClosedRequest(TKqpDbCountersPtr dbCounters) { + TKqpCountersBase::ReportSessionActorClosedRequest(); + if (dbCounters) { + dbCounters->ReportSessionActorClosedRequest(); + } +} + void TKqpCounters::ReportSessionActorClosedIdle(TKqpDbCountersPtr dbCounters) { TKqpCountersBase::ReportSessionActorClosedIdle(); if (dbCounters) { diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index fac13d7360..2a4eb644a7 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -79,7 +79,7 @@ struct TKqpQueryState { NYql::TKikimrQueryDeadlines QueryDeadlines; NKqpProto::TKqpStatsQuery Stats; - + bool KeepSession = false; TString UserToken; NLWTrace::TOrbit Orbit; @@ -306,10 +306,19 @@ public: YQL_ENSURE(requestInfo.GetSessionId() == SessionId, "Invalid session, expected: " << SessionId << ", got: " << requestInfo.GetSessionId()); - MakeNewQueryState(); + if (ShutdownState && ShutdownState->SoftTimeoutReached()) { + // we reached the soft timeout, so at this point we don't allow to accept new queries for session. + LOG_N(TKqpRequestInfo("", SessionId) + << "System shutdown requested: soft timeout reached, no queries can be accepted. Closing session."); + ReplyQueryError(requestInfo, Ydb::StatusIds::BAD_SESSION, "Session is under shutdown"); + FinalCleanup(); + return; + } + MakeNewQueryState(); QueryState->Request.Swap(event.MutableRequest()); auto& queryRequest = QueryState->Request; + YQL_ENSURE(queryRequest.GetDatabase() == Settings.Database, "Wrong database, expected:" << Settings.Database << ", got: " << queryRequest.GetDatabase()); @@ -336,6 +345,7 @@ public: QueryState->QueryDeadlines = GetQueryDeadlines(queryRequest); QueryState->ParametersSize = queryRequest.GetParameters().ByteSize(); QueryState->RequestActorId = ActorIdFromProto(event.GetRequestActorId()); + QueryState->KeepSession = Settings.LongSession || queryRequest.GetKeepSession(); switch (action) { case NKikimrKqp::QUERY_ACTION_EXECUTE: @@ -345,6 +355,13 @@ public: auto type = queryRequest.GetType(); YQL_ENSURE(type != NKikimrKqp::QUERY_TYPE_UNDEFINED, "query type is undefined"); + if (action == NKikimrKqp::QUERY_ACTION_PREPARE) { + if (QueryState->KeepSession && !Settings.LongSession) { + ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) + << "Expected KeepSession=false for non-execute requests"; + } + } + if (!IsQueryTypeSupported(type)) { event.MutableRequest()->Swap(&QueryState->Request); ForwardRequest(ev); @@ -974,7 +991,8 @@ public: } - void HandleNoop(TEvKqpExecuter::TEvExecuterProgress::TPtr& /*ev*/) { + template<typename T> + void HandleNoop(T&) { } void HandleExecute(TEvKqp::TEvQueryRequest::TPtr& ev) { @@ -1417,7 +1435,9 @@ public: auto& response = *record.MutableResponse(); const auto& status = record.GetYdbStatus(); - response.SetSessionId(SessionId); + if (QueryState->KeepSession) { + response.SetSessionId(SessionId); + } if (status == Ydb::StatusIds::SUCCESS) { LWTRACK(KqpQueryReplySuccess, QueryState->Orbit, record.GetArena()->SpaceUsed()); @@ -1479,16 +1499,54 @@ public: Send(ev->Sender, result.release(), 0, proxyRequestId); } - void Handle(TEvKqp::TEvCloseSessionRequest::TPtr&) { - if (QueryState) { - auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); - ReplyQueryError(requestInfo, Ydb::StatusIds::SESSION_EXPIRED, - "Request cancelled due to explicit session close request"); + void HandleReady(TEvKqp::TEvCloseSessionRequest::TPtr&) { + LOG_I(TKqpRequestInfo("", SessionId) << "Session closed due to explicit close event"); + Counters->ReportSessionActorClosedRequest(Settings.DbCounters); + FinalCleanup(); + } + + void HandleCompile(TEvKqp::TEvCloseSessionRequest::TPtr&) { + YQL_ENSURE(QueryState); + ReplyQueryError(TKqpRequestInfo(QueryState->TraceId, SessionId), Ydb::StatusIds::BAD_SESSION, + "Request cancelled due to explicit session close request"); + Counters->ReportSessionActorClosedRequest(Settings.DbCounters); + } + + void HandleExecute(TEvKqp::TEvCloseSessionRequest::TPtr&) { + YQL_ENSURE(QueryState); + QueryState->KeepSession = false; + } + + void HandleCleanup(TEvKqp::TEvCloseSessionRequest::TPtr&) { + YQL_ENSURE(CleanupCtx); + if (!CleanupCtx->Final) { + YQL_ENSURE(QueryState); + QueryState->KeepSession = false; } - if (CleanupCtx) { - CleanupCtx->Final = true; + } + + void Handle(TEvKqp::TEvInitiateSessionShutdown::TPtr& ev) { + if (!ShutdownState) { + LOG_N("Started session shutdown " << TKqpRequestInfo("", SessionId)); + + ShutdownState = TSessionShutdownState(ev->Get()->SoftTimeoutMs, ev->Get()->HardTimeoutMs); + ScheduleNextShutdownTick(); + } + } + + void ScheduleNextShutdownTick() { + Schedule(TDuration::MilliSeconds(ShutdownState->GetNextTickMs()), new TEvKqp::TEvContinueShutdown()); + } + + void Handle(TEvKqp::TEvContinueShutdown::TPtr&) { + Y_VERIFY(ShutdownState); + ShutdownState->MoveToNextState(); + if (ShutdownState->HardTimeoutReached()) { + LOG_N("Reached hard shutdown timeout " << TKqpRequestInfo("", SessionId)); + Send(SelfId(), new TEvKqp::TEvCloseSessionRequest()); } else { - FinalCleanup(); + ScheduleNextShutdownTick(); + LOG_I("Schedule next shutdown tick " << TKqpRequestInfo("", SessionId)); } } @@ -1543,9 +1601,6 @@ public: } } - void HandleNoop(TEvKqp::TEvIdleTimeout::TPtr&) { - } - void HandleCleanup(TEvKqp::TEvQueryRequest::TPtr& ev) { ReplyBusy(ev); } @@ -1555,6 +1610,8 @@ public: } void Cleanup(bool isFinal = false) { + isFinal = isFinal || !QueryState->KeepSession; + if (isFinal) { for (auto it = ExplicitTransactions.Begin(); it != ExplicitTransactions.End(); ++it) { it.Value()->Invalidate(); @@ -1670,7 +1727,9 @@ public: hFunc(TEvKqp::TEvPingSessionRequest, Handle); hFunc(TEvKqp::TEvIdleTimeout, Handle); - hFunc(TEvKqp::TEvCloseSessionRequest, Handle); + hFunc(TEvKqp::TEvCloseSessionRequest, HandleReady); + hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); + hFunc(TEvKqp::TEvContinueShutdown, Handle); hFunc(TEvKqp::TEvQueryResponse, ForwardResponse); default: @@ -1690,7 +1749,9 @@ public: hFunc(TEvKqp::TEvCompileResponse, HandleCompile); hFunc(TEvKqp::TEvPingSessionRequest, Handle); - hFunc(TEvKqp::TEvCloseSessionRequest, Handle); + hFunc(TEvKqp::TEvCloseSessionRequest, HandleCompile); + hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); + hFunc(TEvKqp::TEvContinueShutdown, Handle); hFunc(TEvKqp::TEvIdleTimeout, HandleNoop); default: UnexpectedEvent("CompileState", ev); @@ -1716,7 +1777,9 @@ public: hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, HandleExecute); hFunc(TEvKqp::TEvPingSessionRequest, Handle); - hFunc(TEvKqp::TEvCloseSessionRequest, Handle); + hFunc(TEvKqp::TEvCloseSessionRequest, HandleExecute); + hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); + hFunc(TEvKqp::TEvContinueShutdown, Handle); hFunc(TEvKqp::TEvIdleTimeout, HandleNoop); default: UnexpectedEvent("ExecuteState", ev); @@ -1735,9 +1798,12 @@ public: hFunc(TEvKqp::TEvQueryRequest, HandleCleanup); hFunc(TEvKqpExecuter::TEvTxResponse, HandleCleanup); hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleNoop); + hFunc(TEvKqp::TEvCompileResponse, HandleNoop); hFunc(TEvKqp::TEvPingSessionRequest, Handle); - hFunc(TEvKqp::TEvCloseSessionRequest, Handle); + hFunc(TEvKqp::TEvCloseSessionRequest, HandleCleanup); + hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); + hFunc(TEvKqp::TEvContinueShutdown, Handle); hFunc(TEvKqp::TEvIdleTimeout, HandleNoop); default: UnexpectedEvent("CleanupState", ev); @@ -1786,6 +1852,7 @@ private: TActorId IdleTimerActorId; ui32 IdleTimerId = 0; TDuration IdleDuration; + std::optional<TSessionShutdownState> ShutdownState; }; } // namespace diff --git a/ydb/core/kqp/kqp_worker_actor.cpp b/ydb/core/kqp/kqp_worker_actor.cpp index 0efd2bb996..7a05f12b22 100644 --- a/ydb/core/kqp/kqp_worker_actor.cpp +++ b/ydb/core/kqp/kqp_worker_actor.cpp @@ -78,39 +78,6 @@ struct TKqpQueryState { }; -struct TSessionShutdownState { - TSessionShutdownState(ui32 softTimeout, ui32 hardTimeout) - : HardTimeout(hardTimeout) - , SoftTimeout(softTimeout) - {} - - ui32 Step = 0; - ui32 HardTimeout; - ui32 SoftTimeout; - - void MoveToNextState() { - ++Step; - } - - ui32 GetNextTickMs() const { - if (Step == 0) { - return std::min(HardTimeout, SoftTimeout); - } else if (Step == 1) { - return std::max(HardTimeout, SoftTimeout) - std::min(HardTimeout, SoftTimeout) + 1; - } else { - return 50; - } - } - - bool SoftTimeoutReached() const { - return Step == 1; - } - - bool HardTimeoutReached() const { - return Step == 2; - } -}; - struct TKqpCleanupState { bool Final = false; TInstant Start; diff --git a/ydb/core/kqp/kqp_worker_common.h b/ydb/core/kqp/kqp_worker_common.h index e462630181..b89221a326 100644 --- a/ydb/core/kqp/kqp_worker_common.h +++ b/ydb/core/kqp/kqp_worker_common.h @@ -7,6 +7,39 @@ namespace NKikimr::NKqp { +struct TSessionShutdownState { + TSessionShutdownState(ui32 softTimeout, ui32 hardTimeout) + : HardTimeout(hardTimeout) + , SoftTimeout(softTimeout) + {} + + ui32 Step = 0; + ui32 HardTimeout; + ui32 SoftTimeout; + + void MoveToNextState() { + ++Step; + } + + ui32 GetNextTickMs() const { + if (Step == 0) { + return std::min(HardTimeout, SoftTimeout); + } else if (Step == 1) { + return std::max(HardTimeout, SoftTimeout) - std::min(HardTimeout, SoftTimeout) + 1; + } else { + return 50; + } + } + + bool SoftTimeoutReached() const { + return Step == 1; + } + + bool HardTimeoutReached() const { + return Step == 2; + } +}; + inline bool IsExecuteAction(const NKikimrKqp::EQueryAction& action) { switch (action) { case NKikimrKqp::QUERY_ACTION_EXECUTE: diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.cpp b/ydb/core/kqp/proxy/kqp_proxy_service.cpp index 1b4d4e6735..4b1074132a 100644 --- a/ydb/core/kqp/proxy/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy/kqp_proxy_service.cpp @@ -949,7 +949,11 @@ public: } } - str << Endl << "Active workers count on node: " << LocalSessions.size() << Endl; + str << Endl; + + str << "EnableSessionActor: " + << (AppData()->FeatureFlags.GetEnableKqpSessionActor() ? "true" : "false") << Endl; + str << "Active workers/session_actors count on node: " << LocalSessions.size() << Endl; const auto& sessionsShutdownInFlight = LocalSessions.GetShutdownInFlight(); if (!sessionsShutdownInFlight.empty()) { |