aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVladislav Kuznetsov <va.kuznecov@physics.msu.ru>2022-04-22 22:14:09 +0300
committerVladislav Kuznetsov <va.kuznecov@physics.msu.ru>2022-04-22 22:14:09 +0300
commitcb63cb3c30759993ff7e56aa59354a0db20e43f0 (patch)
treeb07878e4d08b44301e2c208aa336626ef11ed73e
parentbc397e8af42491e31ef07c909c8ddf6a7e78a806 (diff)
downloadydb-cb63cb3c30759993ff7e56aa59354a0db20e43f0.tar.gz
Handle session shutdown requests KIKIMR-14428
ref:18d574f9d5673def6a59131c6b21b7961d818e5a
-rw-r--r--ydb/core/kqp/counters/kqp_counters.cpp11
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp105
-rw-r--r--ydb/core/kqp/kqp_worker_actor.cpp33
-rw-r--r--ydb/core/kqp/kqp_worker_common.h33
-rw-r--r--ydb/core/kqp/proxy/kqp_proxy_service.cpp6
5 files changed, 135 insertions, 53 deletions
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp
index f3c96fd19e..f62713bdd2 100644
--- a/ydb/core/kqp/counters/kqp_counters.cpp
+++ b/ydb/core/kqp/counters/kqp_counters.cpp
@@ -495,6 +495,10 @@ void TKqpCountersBase::ReportSessionActorClosedError() {
SessionActorsClosedError->Inc();
}
+void TKqpCountersBase::ReportSessionActorClosedRequest() {
+ SessionActorsClosedRequest->Inc();
+}
+
void TKqpCountersBase::ReportQueriesPerSessionActor(ui32 queryId) {
QueriesPerSessionActor->Collect(queryId);
}
@@ -1021,6 +1025,13 @@ void TKqpCounters::ReportSessionActorClosedError(TKqpDbCountersPtr dbCounters) {
}
}
+void TKqpCounters::ReportSessionActorClosedRequest(TKqpDbCountersPtr dbCounters) {
+ TKqpCountersBase::ReportSessionActorClosedRequest();
+ if (dbCounters) {
+ dbCounters->ReportSessionActorClosedRequest();
+ }
+}
+
void TKqpCounters::ReportSessionActorClosedIdle(TKqpDbCountersPtr dbCounters) {
TKqpCountersBase::ReportSessionActorClosedIdle();
if (dbCounters) {
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index fac13d7360..2a4eb644a7 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -79,7 +79,7 @@ struct TKqpQueryState {
NYql::TKikimrQueryDeadlines QueryDeadlines;
NKqpProto::TKqpStatsQuery Stats;
-
+ bool KeepSession = false;
TString UserToken;
NLWTrace::TOrbit Orbit;
@@ -306,10 +306,19 @@ public:
YQL_ENSURE(requestInfo.GetSessionId() == SessionId,
"Invalid session, expected: " << SessionId << ", got: " << requestInfo.GetSessionId());
- MakeNewQueryState();
+ if (ShutdownState && ShutdownState->SoftTimeoutReached()) {
+ // we reached the soft timeout, so at this point we don't allow to accept new queries for session.
+ LOG_N(TKqpRequestInfo("", SessionId)
+ << "System shutdown requested: soft timeout reached, no queries can be accepted. Closing session.");
+ ReplyQueryError(requestInfo, Ydb::StatusIds::BAD_SESSION, "Session is under shutdown");
+ FinalCleanup();
+ return;
+ }
+ MakeNewQueryState();
QueryState->Request.Swap(event.MutableRequest());
auto& queryRequest = QueryState->Request;
+
YQL_ENSURE(queryRequest.GetDatabase() == Settings.Database,
"Wrong database, expected:" << Settings.Database << ", got: " << queryRequest.GetDatabase());
@@ -336,6 +345,7 @@ public:
QueryState->QueryDeadlines = GetQueryDeadlines(queryRequest);
QueryState->ParametersSize = queryRequest.GetParameters().ByteSize();
QueryState->RequestActorId = ActorIdFromProto(event.GetRequestActorId());
+ QueryState->KeepSession = Settings.LongSession || queryRequest.GetKeepSession();
switch (action) {
case NKikimrKqp::QUERY_ACTION_EXECUTE:
@@ -345,6 +355,13 @@ public:
auto type = queryRequest.GetType();
YQL_ENSURE(type != NKikimrKqp::QUERY_TYPE_UNDEFINED, "query type is undefined");
+ if (action == NKikimrKqp::QUERY_ACTION_PREPARE) {
+ if (QueryState->KeepSession && !Settings.LongSession) {
+ ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST)
+ << "Expected KeepSession=false for non-execute requests";
+ }
+ }
+
if (!IsQueryTypeSupported(type)) {
event.MutableRequest()->Swap(&QueryState->Request);
ForwardRequest(ev);
@@ -974,7 +991,8 @@ public:
}
- void HandleNoop(TEvKqpExecuter::TEvExecuterProgress::TPtr& /*ev*/) {
+ template<typename T>
+ void HandleNoop(T&) {
}
void HandleExecute(TEvKqp::TEvQueryRequest::TPtr& ev) {
@@ -1417,7 +1435,9 @@ public:
auto& response = *record.MutableResponse();
const auto& status = record.GetYdbStatus();
- response.SetSessionId(SessionId);
+ if (QueryState->KeepSession) {
+ response.SetSessionId(SessionId);
+ }
if (status == Ydb::StatusIds::SUCCESS) {
LWTRACK(KqpQueryReplySuccess, QueryState->Orbit, record.GetArena()->SpaceUsed());
@@ -1479,16 +1499,54 @@ public:
Send(ev->Sender, result.release(), 0, proxyRequestId);
}
- void Handle(TEvKqp::TEvCloseSessionRequest::TPtr&) {
- if (QueryState) {
- auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
- ReplyQueryError(requestInfo, Ydb::StatusIds::SESSION_EXPIRED,
- "Request cancelled due to explicit session close request");
+ void HandleReady(TEvKqp::TEvCloseSessionRequest::TPtr&) {
+ LOG_I(TKqpRequestInfo("", SessionId) << "Session closed due to explicit close event");
+ Counters->ReportSessionActorClosedRequest(Settings.DbCounters);
+ FinalCleanup();
+ }
+
+ void HandleCompile(TEvKqp::TEvCloseSessionRequest::TPtr&) {
+ YQL_ENSURE(QueryState);
+ ReplyQueryError(TKqpRequestInfo(QueryState->TraceId, SessionId), Ydb::StatusIds::BAD_SESSION,
+ "Request cancelled due to explicit session close request");
+ Counters->ReportSessionActorClosedRequest(Settings.DbCounters);
+ }
+
+ void HandleExecute(TEvKqp::TEvCloseSessionRequest::TPtr&) {
+ YQL_ENSURE(QueryState);
+ QueryState->KeepSession = false;
+ }
+
+ void HandleCleanup(TEvKqp::TEvCloseSessionRequest::TPtr&) {
+ YQL_ENSURE(CleanupCtx);
+ if (!CleanupCtx->Final) {
+ YQL_ENSURE(QueryState);
+ QueryState->KeepSession = false;
}
- if (CleanupCtx) {
- CleanupCtx->Final = true;
+ }
+
+ void Handle(TEvKqp::TEvInitiateSessionShutdown::TPtr& ev) {
+ if (!ShutdownState) {
+ LOG_N("Started session shutdown " << TKqpRequestInfo("", SessionId));
+
+ ShutdownState = TSessionShutdownState(ev->Get()->SoftTimeoutMs, ev->Get()->HardTimeoutMs);
+ ScheduleNextShutdownTick();
+ }
+ }
+
+ void ScheduleNextShutdownTick() {
+ Schedule(TDuration::MilliSeconds(ShutdownState->GetNextTickMs()), new TEvKqp::TEvContinueShutdown());
+ }
+
+ void Handle(TEvKqp::TEvContinueShutdown::TPtr&) {
+ Y_VERIFY(ShutdownState);
+ ShutdownState->MoveToNextState();
+ if (ShutdownState->HardTimeoutReached()) {
+ LOG_N("Reached hard shutdown timeout " << TKqpRequestInfo("", SessionId));
+ Send(SelfId(), new TEvKqp::TEvCloseSessionRequest());
} else {
- FinalCleanup();
+ ScheduleNextShutdownTick();
+ LOG_I("Schedule next shutdown tick " << TKqpRequestInfo("", SessionId));
}
}
@@ -1543,9 +1601,6 @@ public:
}
}
- void HandleNoop(TEvKqp::TEvIdleTimeout::TPtr&) {
- }
-
void HandleCleanup(TEvKqp::TEvQueryRequest::TPtr& ev) {
ReplyBusy(ev);
}
@@ -1555,6 +1610,8 @@ public:
}
void Cleanup(bool isFinal = false) {
+ isFinal = isFinal || !QueryState->KeepSession;
+
if (isFinal) {
for (auto it = ExplicitTransactions.Begin(); it != ExplicitTransactions.End(); ++it) {
it.Value()->Invalidate();
@@ -1670,7 +1727,9 @@ public:
hFunc(TEvKqp::TEvPingSessionRequest, Handle);
hFunc(TEvKqp::TEvIdleTimeout, Handle);
- hFunc(TEvKqp::TEvCloseSessionRequest, Handle);
+ hFunc(TEvKqp::TEvCloseSessionRequest, HandleReady);
+ hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
+ hFunc(TEvKqp::TEvContinueShutdown, Handle);
hFunc(TEvKqp::TEvQueryResponse, ForwardResponse);
default:
@@ -1690,7 +1749,9 @@ public:
hFunc(TEvKqp::TEvCompileResponse, HandleCompile);
hFunc(TEvKqp::TEvPingSessionRequest, Handle);
- hFunc(TEvKqp::TEvCloseSessionRequest, Handle);
+ hFunc(TEvKqp::TEvCloseSessionRequest, HandleCompile);
+ hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
+ hFunc(TEvKqp::TEvContinueShutdown, Handle);
hFunc(TEvKqp::TEvIdleTimeout, HandleNoop);
default:
UnexpectedEvent("CompileState", ev);
@@ -1716,7 +1777,9 @@ public:
hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, HandleExecute);
hFunc(TEvKqp::TEvPingSessionRequest, Handle);
- hFunc(TEvKqp::TEvCloseSessionRequest, Handle);
+ hFunc(TEvKqp::TEvCloseSessionRequest, HandleExecute);
+ hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
+ hFunc(TEvKqp::TEvContinueShutdown, Handle);
hFunc(TEvKqp::TEvIdleTimeout, HandleNoop);
default:
UnexpectedEvent("ExecuteState", ev);
@@ -1735,9 +1798,12 @@ public:
hFunc(TEvKqp::TEvQueryRequest, HandleCleanup);
hFunc(TEvKqpExecuter::TEvTxResponse, HandleCleanup);
hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleNoop);
+ hFunc(TEvKqp::TEvCompileResponse, HandleNoop);
hFunc(TEvKqp::TEvPingSessionRequest, Handle);
- hFunc(TEvKqp::TEvCloseSessionRequest, Handle);
+ hFunc(TEvKqp::TEvCloseSessionRequest, HandleCleanup);
+ hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
+ hFunc(TEvKqp::TEvContinueShutdown, Handle);
hFunc(TEvKqp::TEvIdleTimeout, HandleNoop);
default:
UnexpectedEvent("CleanupState", ev);
@@ -1786,6 +1852,7 @@ private:
TActorId IdleTimerActorId;
ui32 IdleTimerId = 0;
TDuration IdleDuration;
+ std::optional<TSessionShutdownState> ShutdownState;
};
} // namespace
diff --git a/ydb/core/kqp/kqp_worker_actor.cpp b/ydb/core/kqp/kqp_worker_actor.cpp
index 0efd2bb996..7a05f12b22 100644
--- a/ydb/core/kqp/kqp_worker_actor.cpp
+++ b/ydb/core/kqp/kqp_worker_actor.cpp
@@ -78,39 +78,6 @@ struct TKqpQueryState {
};
-struct TSessionShutdownState {
- TSessionShutdownState(ui32 softTimeout, ui32 hardTimeout)
- : HardTimeout(hardTimeout)
- , SoftTimeout(softTimeout)
- {}
-
- ui32 Step = 0;
- ui32 HardTimeout;
- ui32 SoftTimeout;
-
- void MoveToNextState() {
- ++Step;
- }
-
- ui32 GetNextTickMs() const {
- if (Step == 0) {
- return std::min(HardTimeout, SoftTimeout);
- } else if (Step == 1) {
- return std::max(HardTimeout, SoftTimeout) - std::min(HardTimeout, SoftTimeout) + 1;
- } else {
- return 50;
- }
- }
-
- bool SoftTimeoutReached() const {
- return Step == 1;
- }
-
- bool HardTimeoutReached() const {
- return Step == 2;
- }
-};
-
struct TKqpCleanupState {
bool Final = false;
TInstant Start;
diff --git a/ydb/core/kqp/kqp_worker_common.h b/ydb/core/kqp/kqp_worker_common.h
index e462630181..b89221a326 100644
--- a/ydb/core/kqp/kqp_worker_common.h
+++ b/ydb/core/kqp/kqp_worker_common.h
@@ -7,6 +7,39 @@
namespace NKikimr::NKqp {
+struct TSessionShutdownState {
+ TSessionShutdownState(ui32 softTimeout, ui32 hardTimeout)
+ : HardTimeout(hardTimeout)
+ , SoftTimeout(softTimeout)
+ {}
+
+ ui32 Step = 0;
+ ui32 HardTimeout;
+ ui32 SoftTimeout;
+
+ void MoveToNextState() {
+ ++Step;
+ }
+
+ ui32 GetNextTickMs() const {
+ if (Step == 0) {
+ return std::min(HardTimeout, SoftTimeout);
+ } else if (Step == 1) {
+ return std::max(HardTimeout, SoftTimeout) - std::min(HardTimeout, SoftTimeout) + 1;
+ } else {
+ return 50;
+ }
+ }
+
+ bool SoftTimeoutReached() const {
+ return Step == 1;
+ }
+
+ bool HardTimeoutReached() const {
+ return Step == 2;
+ }
+};
+
inline bool IsExecuteAction(const NKikimrKqp::EQueryAction& action) {
switch (action) {
case NKikimrKqp::QUERY_ACTION_EXECUTE:
diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.cpp b/ydb/core/kqp/proxy/kqp_proxy_service.cpp
index 1b4d4e6735..4b1074132a 100644
--- a/ydb/core/kqp/proxy/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy/kqp_proxy_service.cpp
@@ -949,7 +949,11 @@ public:
}
}
- str << Endl << "Active workers count on node: " << LocalSessions.size() << Endl;
+ str << Endl;
+
+ str << "EnableSessionActor: "
+ << (AppData()->FeatureFlags.GetEnableKqpSessionActor() ? "true" : "false") << Endl;
+ str << "Active workers/session_actors count on node: " << LocalSessions.size() << Endl;
const auto& sessionsShutdownInFlight = LocalSessions.GetShutdownInFlight();
if (!sessionsShutdownInFlight.empty()) {