aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2022-09-08 12:25:36 +0300
committergvit <gvit@ydb.tech>2022-09-08 12:25:36 +0300
commit5c79f4555d1f786b56008cdc1075ec5a176b3723 (patch)
treeb43633df745795751daaaeacc3256d86f57fa88f
parentad99613425cb5d6a84eb2643264b1ec4f64740ec (diff)
downloadydb-5c79f4555d1f786b56008cdc1075ec5a176b3723.tar.gz
refactor kqp proxy: avoid decoding session in hot pass, avoid multiple lookups in hashtables
-rw-r--r--ydb/core/kqp/proxy/kqp_proxy_service.cpp312
-rw-r--r--ydb/core/kqp/proxy/kqp_proxy_service.h140
2 files changed, 205 insertions, 247 deletions
diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.cpp b/ydb/core/kqp/proxy/kqp_proxy_service.cpp
index 38a8e168622..2e198a3d4fb 100644
--- a/ydb/core/kqp/proxy/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy/kqp_proxy_service.cpp
@@ -117,147 +117,6 @@ TString EncodeSessionId(ui32 nodeId, const TString& id) {
}
-class TLocalSessionsRegistry {
- THashMap<TString, TKqpSessionInfo> LocalSessions;
- THashMap<TActorId, TString> TargetIdIndex;
- THashSet<TString> ShutdownInFlightSessions;
- THashMap<TString, ui32> SessionsCountPerDatabase;
- std::vector<std::vector<TString>> ReadySessions;
- TIntrusivePtr<IRandomProvider> RandomProvider;
-
-public:
- TLocalSessionsRegistry(TIntrusivePtr<IRandomProvider> randomProvider)
- : ReadySessions(2)
- , RandomProvider(randomProvider)
- {}
-
- TKqpSessionInfo* Create(const TString& sessionId, const TActorId& workerId,
- const TString& database, TKqpDbCountersPtr dbCounters, bool supportsBalancing)
- {
- std::vector<i32> pos(2, -1);
- pos[0] = ReadySessions[0].size();
- ReadySessions[0].push_back(sessionId);
-
- if (supportsBalancing) {
- pos[1] = ReadySessions[1].size();
- ReadySessions[1].push_back(sessionId);
- }
-
- auto result = LocalSessions.emplace(sessionId, TKqpSessionInfo(sessionId, workerId, database, dbCounters, std::move(pos)));
- SessionsCountPerDatabase[database]++;
- Y_VERIFY(result.second, "Duplicate session id!");
- TargetIdIndex.emplace(workerId, sessionId);
- return &result.first->second;
- }
-
- const THashSet<TString>& GetShutdownInFlight() const {
- return ShutdownInFlightSessions;
- }
-
- TKqpSessionInfo* StartShutdownSession(const TString& sessionId) {
- ShutdownInFlightSessions.emplace(sessionId);
- auto ptr = LocalSessions.FindPtr(sessionId);
- ptr->ShutdownStartedAt = TAppData::TimeProvider->Now();
- RemoveSessionFromLists(ptr);
- return ptr;
- }
-
- TKqpSessionInfo* PickSessionToShutdown(bool force, ui32 minReasonableToKick) {
- auto& sessions = force ? ReadySessions.at(0) : ReadySessions.at(1);
- if (sessions.size() >= minReasonableToKick) {
- ui64 idx = RandomProvider->GenRand() % sessions.size();
- return StartShutdownSession(sessions[idx]);
- }
-
- return nullptr;
- }
-
- THashMap<TString, TKqpSessionInfo>::const_iterator begin() const {
- return LocalSessions.begin();
- }
-
- THashMap<TString, TKqpSessionInfo>::const_iterator end() const {
- return LocalSessions.end();
- }
-
- size_t GetShutdownInFlightSize() const {
- return ShutdownInFlightSessions.size();
- }
-
- void Erase(const TString& sessionId) {
- auto it = LocalSessions.find(sessionId);
- if (it != LocalSessions.end()) {
- auto counter = SessionsCountPerDatabase.find(it->second.Database);
- if (counter != SessionsCountPerDatabase.end()) {
- counter->second--;
- if (counter->second == 0) {
- SessionsCountPerDatabase.erase(counter);
- }
- }
-
- RemoveSessionFromLists(&(it->second));
- ShutdownInFlightSessions.erase(sessionId);
- TargetIdIndex.erase(it->second.WorkerId);
- LocalSessions.erase(it);
- }
- }
-
- void RemoveSessionFromLists(TKqpSessionInfo* ptr) {
- for(ui32 i = 0; i < ptr->ReadyPos.size(); ++i) {
- i32& pos = ptr->ReadyPos.at(i);
- auto& sessions = ReadySessions.at(i);
- if (pos != -1 && pos + 1 != static_cast<i32>(sessions.size())) {
- auto& lastPos = LocalSessions.at(sessions.back()).ReadyPos.at(i);
- Y_VERIFY(lastPos + 1 == static_cast<i32>(sessions.size()));
- std::swap(sessions[pos], sessions[lastPos]);
- lastPos = pos;
- }
-
- if (pos != -1) {
- sessions.pop_back();
- pos = -1;
- }
- }
- }
-
- const TKqpSessionInfo* IsPendingShutdown(const TString& sessionId) const {
- if (ShutdownInFlightSessions.find(sessionId) != ShutdownInFlightSessions.end()) {
- return FindPtr(sessionId);
- }
-
- return nullptr;
- }
-
- bool CheckDatabaseLimits(const TString& database, ui32 databaseLimit) {
- auto it = SessionsCountPerDatabase.find(database);
- if (it == SessionsCountPerDatabase.end()){
- return true;
- }
-
- if (it->second + 1 <= databaseLimit) {
- return true;
- }
-
- return false;
- }
-
- size_t size() const {
- return LocalSessions.size();
- }
-
- const TKqpSessionInfo* FindPtr(const TString& sessionId) const {
- return LocalSessions.FindPtr(sessionId);
- }
-
- void Erase(const TActorId& targetId) {
- auto it = TargetIdIndex.find(targetId);
- if (it != TargetIdIndex.end()){
- Erase(it->second);
- }
- }
-};
-
-
class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
struct TEvPrivate {
enum EEv {
@@ -456,8 +315,7 @@ public:
DoPublishResources();
}
- void Handle(TEvPrivate::TEvReadyToPublishResources::TPtr& ev) {
- Y_UNUSED(ev);
+ void Handle(TEvPrivate::TEvReadyToPublishResources::TPtr&) {
ResourcesPublishScheduled = false;
DoPublishResources();
}
@@ -493,8 +351,7 @@ public:
PublishResourceUsage();
}
- void Handle(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr& ev) {
- Y_UNUSED(ev);
+ void Handle(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr&) {
KQP_PROXY_LOG_D("Subscribed for config changes.");
}
@@ -561,7 +418,7 @@ public:
}
}
- bool CreateRemoteSession(TEvKqp::TEvCreateSessionRequest::TPtr& ev) {
+ bool CreateRemoteSession(TEvKqp::TEvCreateSessionRequest::TPtr& ev, ui64 requestId) {
auto& event = ev->Get()->Record;
if (!event.GetCanCreateRemoteSession() || LocalDatacenterProxies.empty()) {
return false;
@@ -584,7 +441,6 @@ public:
remoteRequest->Record.SetSupportsBalancing(event.GetSupportsBalancing());
remoteRequest->Record.MutableRequest()->SetDatabase(event.GetRequest().GetDatabase());
- ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, event.GetTraceId(), TKqpEvents::EvCreateSessionRequest);
Send(MakeKqpProxyID(nodeId), remoteRequest.release(), IEventHandle::FlagTrackDelivery, requestId);
TDuration timeout = DEFAULT_CREATE_SESSION_TIMEOUT;
StartQueryTimeout(requestId, timeout);
@@ -595,7 +451,8 @@ public:
auto& event = ev->Get()->Record;
auto& request = event.GetRequest();
TKqpRequestInfo requestInfo(event.GetTraceId());
- if (CreateRemoteSession(ev)) {
+ ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, event.GetTraceId(), TKqpEvents::EvCreateSessionRequest);
+ if (CreateRemoteSession(ev, requestId)) {
return;
}
@@ -622,6 +479,7 @@ public:
responseEv->Record.SetYdbStatus(result.YdbStatus);
responseEv->Record.SetError(result.Error);
+ PendingRequests.Erase(requestId);
LogResponse(event.GetTraceId(), responseEv->Record, dbCounters);
Send(ev->Sender, responseEv.Release(), 0, ev->Cookie);
}
@@ -632,8 +490,28 @@ public:
TString traceId = event.GetTraceId();
TKqpRequestInfo requestInfo(traceId);
ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, traceId, TKqpEvents::EvQueryRequest);
+ if (request.GetSessionId().empty()) {
+ TProcessResult<TKqpSessionInfo*> result;
+ if (!CreateNewSessionWorker(requestInfo, TString(DefaultKikimrPublicClusterName), false,
+ request.GetDatabase(), false, result))
+ {
+ LogRequest(request, requestInfo, ev->Sender, requestId, Counters->GetDbCounters(request.GetDatabase()));
+ ReplyProcessError(result.YdbStatus, result.Error, requestId);
+ return;
+ }
- auto dbCounters = GetDbCountersForSession(request.GetSessionId());
+ request.SetSessionId(result.Value->SessionId);
+ }
+
+ TString sessionId = request.GetSessionId();
+ const TKqpSessionInfo* sessionInfo = LocalSessions->FindPtr(sessionId);
+ auto dbCounters = sessionInfo ? sessionInfo->DbCounters : nullptr;
+ if (!dbCounters) {
+ dbCounters = Counters->GetDbCounters(request.GetDatabase());
+ }
+
+ PendingRequests.SetSessionId(requestId, sessionId, dbCounters);
+ LogRequest(request, requestInfo, ev->Sender, requestId, dbCounters);
auto queryLimitBytes = TableServiceConfig.GetQueryLimitBytes();
if (queryLimitBytes && IsSqlQuery(request.GetType())) {
@@ -641,10 +519,6 @@ public:
if (querySizeBytes > queryLimitBytes) {
TString error = TStringBuilder() << "Query text size exceeds limit (" << querySizeBytes << "b > " << queryLimitBytes << "b)";
ReplyProcessError(Ydb::StatusIds::BAD_REQUEST, error, requestId);
- if (!dbCounters) {
- dbCounters = Counters->GetDbCounters(request.GetDatabase());
- }
- LogRequest(request, requestInfo, ev->Sender, requestId, dbCounters);
return;
}
}
@@ -653,11 +527,6 @@ public:
if (paramsLimitBytes) {
auto paramsBytes = request.GetParameters().ByteSizeLong();
if (paramsBytes > paramsLimitBytes) {
- if (!dbCounters) {
- dbCounters = Counters->GetDbCounters(request.GetDatabase());
- }
- LogRequest(request, requestInfo, ev->Sender, requestId, dbCounters);
-
TString error = TStringBuilder() << "Parameters size exceeds limit (" << paramsBytes << "b > " << paramsLimitBytes << "b)";
ReplyProcessError(Ydb::StatusIds::BAD_REQUEST, error, requestId);
return;
@@ -679,42 +548,15 @@ public:
}
TActorId targetId;
- if (!request.GetSessionId().empty()) {
- TProcessResult<TActorId> result;
- if (!TryGetSessionTargetActor(request.GetSessionId(), requestInfo, result)) {
- if (!dbCounters) {
- dbCounters = Counters->GetDbCounters(request.GetDatabase());
- }
- LogRequest(request, requestInfo, ev->Sender, requestId, dbCounters);
- ReplyProcessError(result.YdbStatus, result.Error, requestId);
- return;
- }
-
- targetId = result.Value;
-
- LogRequest(request, requestInfo, ev->Sender, requestId, dbCounters);
+ if (sessionInfo) {
+ targetId = sessionInfo->WorkerId;
} else {
- TProcessResult<TKqpSessionInfo*> result;
- if (!CreateNewSessionWorker(requestInfo, TString(DefaultKikimrPublicClusterName), false,
- request.GetDatabase(), false, result))
- {
- if (!dbCounters) {
- dbCounters = Counters->GetDbCounters(request.GetDatabase());
- }
- LogRequest(request, requestInfo, ev->Sender, requestId, dbCounters);
- ReplyProcessError(result.YdbStatus, result.Error, requestId);
+ targetId = TryGetSessionTargetActor(request.GetSessionId(), requestInfo, requestId);
+ if (!targetId) {
return;
}
-
- targetId = result.Value->WorkerId;
- request.SetSessionId(result.Value->SessionId);
- dbCounters = result.Value->DbCounters;
-
- LogRequest(request, requestInfo, ev->Sender, requestId, dbCounters);
}
- TString sessionId = request.GetSessionId();
- PendingRequests.SetSessionId(requestId, sessionId, dbCounters);
// We add extra milliseconds to the user-specified timeout, so it means we give additional priority for worker replies,
// because it is much better to give detailed error message rather than generic timeout.
// For example, it helps to avoid race in event order when worker and proxy recieve timeout at the same moment.
@@ -732,23 +574,23 @@ public:
TKqpRequestInfo requestInfo(event.GetTraceId());
TString sessionId = request.GetSessionId();
- auto dbCounters = GetDbCountersForSession(sessionId);
+ const TKqpSessionInfo* sessionInfo = LocalSessions->FindPtr(sessionId);
+ auto dbCounters = sessionInfo ? sessionInfo->DbCounters : nullptr;
LogRequest(request, requestInfo, ev->Sender, dbCounters);
- auto sessionInfo = LocalSessions->IsPendingShutdown(sessionId);
- if (sessionInfo) {
- if (dbCounters) {
- // session is pending shutdown, and we close it
- // but direct request from user.
- Counters->ReportSessionGracefulShutdownHit(sessionInfo->DbCounters);
- }
+ if (LocalSessions->IsPendingShutdown(sessionId) && dbCounters) {
+ Counters->ReportSessionGracefulShutdownHit(dbCounters);
}
- if (!sessionId.empty()) {
- TProcessResult<TActorId> result;
- if (TryGetSessionTargetActor(sessionId, requestInfo, result)) {
- Send(result.Value, ev->Release().Release());
+ if (sessionInfo) {
+ Send(sessionInfo->WorkerId, ev->Release().Release());
+ } else {
+ if (!sessionId.empty()) {
+ TActorId targetId = TryGetSessionTargetActor(sessionId, requestInfo, 0);
+ if (targetId) {
+ Send(targetId, ev->Release().Release());
+ }
}
}
}
@@ -761,15 +603,18 @@ public:
TKqpRequestInfo requestInfo(traceId);
auto sessionId = request.GetSessionId();
ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, traceId, TKqpEvents::EvPingSessionRequest);
-
- auto dbCounters = GetDbCountersForSession(sessionId);
-
+ const TKqpSessionInfo* sessionInfo = LocalSessions->FindPtr(sessionId);
+ auto dbCounters = sessionInfo ? sessionInfo->DbCounters : nullptr;
LogRequest(request, requestInfo, ev->Sender, requestId, dbCounters);
- TProcessResult<TActorId> result;
- if (!TryGetSessionTargetActor(sessionId, requestInfo, result)) {
- ReplyProcessError(result.YdbStatus, result.Error, requestId);
- return;
+ TActorId targetId;
+ if (sessionInfo) {
+ targetId = sessionInfo->WorkerId;
+ } else {
+ targetId = TryGetSessionTargetActor(sessionId, requestInfo, requestId);
+ if (!targetId) {
+ return;
+ }
}
TDuration timeout = DEFAULT_KEEP_ALIVE_TIMEOUT;
@@ -779,7 +624,7 @@ public:
PendingRequests.SetSessionId(requestId, sessionId, dbCounters);
StartQueryTimeout(requestId, timeout);
- Send(result.Value, ev->Release().Release(), IEventHandle::FlagTrackDelivery, requestId);
+ Send(targetId, ev->Release().Release(), IEventHandle::FlagTrackDelivery, requestId);
}
template<typename TEvent>
@@ -821,8 +666,7 @@ public:
}
}
- void Handle(TEvPrivate::TEvCollectPeerProxyData::TPtr& ev) {
- Y_UNUSED(ev);
+ void Handle(TEvPrivate::TEvCollectPeerProxyData::TPtr&) {
LookupPeerProxyData();
if (!ShutdownRequested) {
const auto& sbs = TableServiceConfig.GetSessionBalancerSettings();
@@ -1171,11 +1015,10 @@ private:
Counters->ReportResponseStatus(dbCounters, event.ByteSize(), status);
}
- void LogResponse(const TKqpRequestInfo& requestInfo,
+ void LogResponse(const TKqpRequestInfo&,
const TEvKqp::TProtoArenaHolder<NKikimrKqp::TEvQueryResponse>& holder,
TKqpDbCountersPtr dbCounters)
{
- Y_UNUSED(requestInfo);
const auto& event = holder.GetRef();
Counters->ReportResponseStatus(dbCounters, event.ByteSize(),
@@ -1192,20 +1035,16 @@ private:
Counters->ReportResultsBytes(dbCounters, resultsBytes);
}
- void LogResponse(const TKqpRequestInfo& requestInfo,
+ void LogResponse(const TKqpRequestInfo&,
const NKikimrKqp::TEvCreateSessionResponse& event, TKqpDbCountersPtr dbCounters)
{
- Y_UNUSED(requestInfo);
-
Counters->ReportResponseStatus(dbCounters, event.ByteSize(),
event.GetYdbStatus());
}
- void LogResponse(const TKqpRequestInfo& requestInfo,
+ void LogResponse(const TKqpRequestInfo&,
const NKikimrKqp::TEvPingSessionResponse& event, TKqpDbCountersPtr dbCounters)
{
- Y_UNUSED(requestInfo);
-
Counters->ReportResponseStatus(dbCounters, event.ByteSize(), event.GetStatus());
}
@@ -1334,34 +1173,21 @@ private:
return true;
}
- bool TryGetSessionTargetActor(const TString& sessionId, const TKqpRequestInfo& requestInfo, TProcessResult<TActorId>& result)
+ TActorId TryGetSessionTargetActor(const TString& sessionId, const TKqpRequestInfo& requestInfo, ui64 requestId)
{
- result.YdbStatus = Ydb::StatusIds::SUCCESS;
- result.Error.clear();
-
auto nodeId = TryDecodeYdbSessionId(sessionId);
if (!nodeId) {
TString error = TStringBuilder() << "Failed to parse session id: " << sessionId;
KQP_PROXY_LOG_W(requestInfo << error);
-
- result.YdbStatus = Ydb::StatusIds::BAD_REQUEST;
- result.Error = error;
- return false;
+ ReplyProcessError(Ydb::StatusIds::BAD_REQUEST, error, requestId);
+ return TActorId();
}
if (*nodeId == SelfId().NodeId()) {
- auto localSession = LocalSessions->FindPtr(sessionId);
- if (!localSession) {
- TString error = TStringBuilder() << "Session not found: " << sessionId;
- KQP_PROXY_LOG_N(requestInfo << error);
-
- result.YdbStatus = Ydb::StatusIds::BAD_SESSION;
- result.Error = error;
- return false;
- }
-
- result.Value = localSession->WorkerId;
- return true;
+ TString error = TStringBuilder() << "Session not found: " << sessionId;
+ KQP_PROXY_LOG_N(requestInfo << error);
+ ReplyProcessError(Ydb::StatusIds::BAD_SESSION, error, requestId);
+ return TActorId();
}
if (!Tenants.empty()) {
@@ -1369,8 +1195,7 @@ private:
Counters->ReportProxyForwardedRequest(counters);
}
- result.Value = MakeKqpProxyID(*nodeId);
- return true;
+ return MakeKqpProxyID(*nodeId);
}
void RemoveSession(const TString& sessionId, const TActorId& workerId) {
@@ -1410,11 +1235,6 @@ private:
NYql::NDq::SetYqlLogLevels(yqlPriority);
}
- TKqpDbCountersPtr GetDbCountersForSession(const TString& sessionId) const {
- auto localSession = LocalSessions->FindPtr(sessionId);
- return localSession ? localSession->DbCounters : nullptr;
- }
-
private:
NYql::NLog::YqlLoggerScope YqlLoggerScope;
NKikimrConfig::TLogConfig LogConfig;
diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.h b/ydb/core/kqp/proxy/kqp_proxy_service.h
index fe3d58ed425..e6d44a176ca 100644
--- a/ydb/core/kqp/proxy/kqp_proxy_service.h
+++ b/ydb/core/kqp/proxy/kqp_proxy_service.h
@@ -40,7 +40,7 @@ class TKqpProxyRequestTracker {
public:
TKqpProxyRequestTracker()
- : RequestId(0)
+ : RequestId(1)
{}
ui64 RegisterRequest(const TActorId& sender, ui64 senderCookie, const TString& traceId, ui32 eventType) {
@@ -129,6 +129,144 @@ struct TPeerStats {
};
+class TLocalSessionsRegistry {
+ THashMap<TString, TKqpSessionInfo> LocalSessions;
+ THashMap<TActorId, TString> TargetIdIndex;
+ THashSet<TString> ShutdownInFlightSessions;
+ THashMap<TString, ui32> SessionsCountPerDatabase;
+ std::vector<std::vector<TString>> ReadySessions;
+ TIntrusivePtr<IRandomProvider> RandomProvider;
+
+public:
+ TLocalSessionsRegistry(TIntrusivePtr<IRandomProvider> randomProvider)
+ : ReadySessions(2)
+ , RandomProvider(randomProvider)
+ {}
+
+ TKqpSessionInfo* Create(const TString& sessionId, const TActorId& workerId,
+ const TString& database, TKqpDbCountersPtr dbCounters, bool supportsBalancing)
+ {
+ std::vector<i32> pos(2, -1);
+ pos[0] = ReadySessions[0].size();
+ ReadySessions[0].push_back(sessionId);
+
+ if (supportsBalancing) {
+ pos[1] = ReadySessions[1].size();
+ ReadySessions[1].push_back(sessionId);
+ }
+
+ auto result = LocalSessions.emplace(sessionId, TKqpSessionInfo(sessionId, workerId, database, dbCounters, std::move(pos)));
+ SessionsCountPerDatabase[database]++;
+ Y_VERIFY(result.second, "Duplicate session id!");
+ TargetIdIndex.emplace(workerId, sessionId);
+ return &result.first->second;
+ }
+
+ const THashSet<TString>& GetShutdownInFlight() const {
+ return ShutdownInFlightSessions;
+ }
+
+ TKqpSessionInfo* StartShutdownSession(const TString& sessionId) {
+ ShutdownInFlightSessions.emplace(sessionId);
+ auto ptr = LocalSessions.FindPtr(sessionId);
+ ptr->ShutdownStartedAt = TAppData::TimeProvider->Now();
+ RemoveSessionFromLists(ptr);
+ return ptr;
+ }
+
+ TKqpSessionInfo* PickSessionToShutdown(bool force, ui32 minReasonableToKick) {
+ auto& sessions = force ? ReadySessions.at(0) : ReadySessions.at(1);
+ if (sessions.size() >= minReasonableToKick) {
+ ui64 idx = RandomProvider->GenRand() % sessions.size();
+ return StartShutdownSession(sessions[idx]);
+ }
+
+ return nullptr;
+ }
+
+ THashMap<TString, TKqpSessionInfo>::const_iterator begin() const {
+ return LocalSessions.begin();
+ }
+
+ THashMap<TString, TKqpSessionInfo>::const_iterator end() const {
+ return LocalSessions.end();
+ }
+
+ size_t GetShutdownInFlightSize() const {
+ return ShutdownInFlightSessions.size();
+ }
+
+ void Erase(const TString& sessionId) {
+ auto it = LocalSessions.find(sessionId);
+ if (it != LocalSessions.end()) {
+ auto counter = SessionsCountPerDatabase.find(it->second.Database);
+ if (counter != SessionsCountPerDatabase.end()) {
+ counter->second--;
+ if (counter->second == 0) {
+ SessionsCountPerDatabase.erase(counter);
+ }
+ }
+
+ RemoveSessionFromLists(&(it->second));
+ ShutdownInFlightSessions.erase(sessionId);
+ TargetIdIndex.erase(it->second.WorkerId);
+ LocalSessions.erase(it);
+ }
+ }
+
+ bool IsPendingShutdown(const TString& sessionId) const {
+ return ShutdownInFlightSessions.find(sessionId) != ShutdownInFlightSessions.end();
+ }
+
+ bool CheckDatabaseLimits(const TString& database, ui32 databaseLimit) {
+ auto it = SessionsCountPerDatabase.find(database);
+ if (it == SessionsCountPerDatabase.end()){
+ return true;
+ }
+
+ if (it->second + 1 <= databaseLimit) {
+ return true;
+ }
+
+ return false;
+ }
+
+ size_t size() const {
+ return LocalSessions.size();
+ }
+
+ const TKqpSessionInfo* FindPtr(const TString& sessionId) const {
+ return LocalSessions.FindPtr(sessionId);
+ }
+
+ void Erase(const TActorId& targetId) {
+ auto it = TargetIdIndex.find(targetId);
+ if (it != TargetIdIndex.end()){
+ Erase(it->second);
+ }
+ }
+
+private:
+ void RemoveSessionFromLists(TKqpSessionInfo* ptr) {
+ for(ui32 i = 0; i < ptr->ReadyPos.size(); ++i) {
+ i32& pos = ptr->ReadyPos.at(i);
+ auto& sessions = ReadySessions.at(i);
+ if (pos != -1 && pos + 1 != static_cast<i32>(sessions.size())) {
+ auto& lastPos = LocalSessions.at(sessions.back()).ReadyPos.at(i);
+ Y_VERIFY(lastPos + 1 == static_cast<i32>(sessions.size()));
+ std::swap(sessions[pos], sessions[lastPos]);
+ lastPos = pos;
+ }
+
+ if (pos != -1) {
+ sessions.pop_back();
+ pos = -1;
+ }
+ }
+ }
+};
+
+
TSimpleResourceStats CalcPeerStats(
const TVector<NKikimrKqp::TKqpProxyNodeResources>& data, const TString& selfDataCenterId, bool localDatacenterPolicy,
std::function<double(const NKikimrKqp::TKqpProxyNodeResources& entry)> ExtractValue);