diff options
author | Vitalii Gridnev <[email protected]> | 2022-02-16 13:15:33 +0300 |
---|---|---|
committer | Vitalii Gridnev <[email protected]> | 2022-02-16 13:15:33 +0300 |
commit | 6887aa978d65f156229edabc4aeef7ed52aecea5 (patch) | |
tree | 4f05a93ef4b433523adb36367361ad2730bc6c16 | |
parent | 7722f5293a9ac69af56a069a22b3fdb359d0cfe5 (diff) |
limit sessions per database KIKIMR-14331
ref:40946267a6649f18d58ed990c1962015e5f92d8e
-rw-r--r-- | ydb/core/kqp/proxy/kqp_proxy_service.cpp | 40 |
1 files changed, 23 insertions, 17 deletions
diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.cpp b/ydb/core/kqp/proxy/kqp_proxy_service.cpp index decd2cb3011..4c444eee4ff 100644 --- a/ydb/core/kqp/proxy/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy/kqp_proxy_service.cpp @@ -118,6 +118,7 @@ class TLocalSessionsRegistry { THashSet<TString> ShutdownInFlightSessions; TList<TString> RecentUsedSessions; THashMap<TString, TList<TString>::iterator> Links; + THashMap<TString, ui32> SessionsCountPerDatabase; public: TLocalSessionsRegistry() = default; @@ -127,6 +128,7 @@ public: { RecentUsedSessions.push_front(sessionId); auto result = LocalSessions.emplace(sessionId, TKqpSessionInfo(sessionId, workerId, database, dbCounters)); + SessionsCountPerDatabase[database]++; Links.emplace(sessionId, RecentUsedSessions.begin()); Y_VERIFY(result.second, "Duplicate session id!"); TargetIdIndex.emplace(workerId, sessionId); @@ -172,6 +174,13 @@ public: void Erase(const TString& sessionId) { auto it = LocalSessions.find(sessionId); if (it != LocalSessions.end()) { + auto counter = SessionsCountPerDatabase.find(it->second.Database); + if (counter != SessionsCountPerDatabase.end()) { + counter->second--; + if (counter->second == 0) { + SessionsCountPerDatabase.erase(counter); + } + } auto lnk = Links.find(sessionId); RecentUsedSessions.erase(lnk->second); Links.erase(sessionId); @@ -181,6 +190,19 @@ public: } } + bool CheckDatabaseLimits(const TString& database, ui32 databaseLimit) { + auto it = SessionsCountPerDatabase.find(database); + if (it == SessionsCountPerDatabase.end()){ + return true; + } + + if (it->second + 1 <= databaseLimit) { + return true; + } + + return false; + } + size_t size() const { return LocalSessions.size(); } @@ -1113,20 +1135,6 @@ private: result.Error = error; return false; } - - /* - * TODO: disabled due to SLYDB-42. Remove (or rework) it after KIKIMR-9650 & KIKIMR-9652 - if (!Tenants.contains(database)) { - TString error = TStringBuilder() << "Unexpected node for database: " << database; - - LOG_ERROR_S(ctx, NKikimrServices::KQP_PROXY, requestInfo << error - << ", known databases: " << JoinRange(", ", Tenants.begin(), Tenants.end())); - - result.YdbStatus = Ydb::StatusIds::INTERNAL_ERROR; - result.Error = error; - return false; - } - */ } if (ShutdownRequested) { @@ -1135,15 +1143,13 @@ private: KQP_PROXY_LOG_N(requestInfo << error); result.ResourceExhausted = true; - // we can possibly make a better status here. result.YdbStatus = Ydb::StatusIds::OVERLOADED; result.Error = error; - return false; } auto sessionsLimitPerNode = TableServiceConfig.GetSessionsLimitPerNode(); - if (sessionsLimitPerNode && LocalSessions.size() >= sessionsLimitPerNode) { + if (sessionsLimitPerNode && !LocalSessions.CheckDatabaseLimits(database, sessionsLimitPerNode)) { TString error = TStringBuilder() << "Active sessions limit exceeded, maximum allowed: " << sessionsLimitPerNode; KQP_PROXY_LOG_W(requestInfo << error); |