summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitalii Gridnev <[email protected]>2022-02-16 13:15:33 +0300
committerVitalii Gridnev <[email protected]>2022-02-16 13:15:33 +0300
commit6887aa978d65f156229edabc4aeef7ed52aecea5 (patch)
tree4f05a93ef4b433523adb36367361ad2730bc6c16
parent7722f5293a9ac69af56a069a22b3fdb359d0cfe5 (diff)
limit sessions per database KIKIMR-14331
ref:40946267a6649f18d58ed990c1962015e5f92d8e
-rw-r--r--ydb/core/kqp/proxy/kqp_proxy_service.cpp40
1 files changed, 23 insertions, 17 deletions
diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.cpp b/ydb/core/kqp/proxy/kqp_proxy_service.cpp
index decd2cb3011..4c444eee4ff 100644
--- a/ydb/core/kqp/proxy/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy/kqp_proxy_service.cpp
@@ -118,6 +118,7 @@ class TLocalSessionsRegistry {
THashSet<TString> ShutdownInFlightSessions;
TList<TString> RecentUsedSessions;
THashMap<TString, TList<TString>::iterator> Links;
+ THashMap<TString, ui32> SessionsCountPerDatabase;
public:
TLocalSessionsRegistry() = default;
@@ -127,6 +128,7 @@ public:
{
RecentUsedSessions.push_front(sessionId);
auto result = LocalSessions.emplace(sessionId, TKqpSessionInfo(sessionId, workerId, database, dbCounters));
+ SessionsCountPerDatabase[database]++;
Links.emplace(sessionId, RecentUsedSessions.begin());
Y_VERIFY(result.second, "Duplicate session id!");
TargetIdIndex.emplace(workerId, sessionId);
@@ -172,6 +174,13 @@ public:
void Erase(const TString& sessionId) {
auto it = LocalSessions.find(sessionId);
if (it != LocalSessions.end()) {
+ auto counter = SessionsCountPerDatabase.find(it->second.Database);
+ if (counter != SessionsCountPerDatabase.end()) {
+ counter->second--;
+ if (counter->second == 0) {
+ SessionsCountPerDatabase.erase(counter);
+ }
+ }
auto lnk = Links.find(sessionId);
RecentUsedSessions.erase(lnk->second);
Links.erase(sessionId);
@@ -181,6 +190,19 @@ public:
}
}
+ bool CheckDatabaseLimits(const TString& database, ui32 databaseLimit) {
+ auto it = SessionsCountPerDatabase.find(database);
+ if (it == SessionsCountPerDatabase.end()){
+ return true;
+ }
+
+ if (it->second + 1 <= databaseLimit) {
+ return true;
+ }
+
+ return false;
+ }
+
size_t size() const {
return LocalSessions.size();
}
@@ -1113,20 +1135,6 @@ private:
result.Error = error;
return false;
}
-
- /*
- * TODO: disabled due to SLYDB-42. Remove (or rework) it after KIKIMR-9650 & KIKIMR-9652
- if (!Tenants.contains(database)) {
- TString error = TStringBuilder() << "Unexpected node for database: " << database;
-
- LOG_ERROR_S(ctx, NKikimrServices::KQP_PROXY, requestInfo << error
- << ", known databases: " << JoinRange(", ", Tenants.begin(), Tenants.end()));
-
- result.YdbStatus = Ydb::StatusIds::INTERNAL_ERROR;
- result.Error = error;
- return false;
- }
- */
}
if (ShutdownRequested) {
@@ -1135,15 +1143,13 @@ private:
KQP_PROXY_LOG_N(requestInfo << error);
result.ResourceExhausted = true;
- // we can possibly make a better status here.
result.YdbStatus = Ydb::StatusIds::OVERLOADED;
result.Error = error;
-
return false;
}
auto sessionsLimitPerNode = TableServiceConfig.GetSessionsLimitPerNode();
- if (sessionsLimitPerNode && LocalSessions.size() >= sessionsLimitPerNode) {
+ if (sessionsLimitPerNode && !LocalSessions.CheckDatabaseLimits(database, sessionsLimitPerNode)) {
TString error = TStringBuilder() << "Active sessions limit exceeded, maximum allowed: "
<< sessionsLimitPerNode;
KQP_PROXY_LOG_W(requestInfo << error);