diff options
author | Vitalii Gridnev <gridnevvvit@gmail.com> | 2022-06-30 20:37:14 +0300 |
---|---|---|
committer | Vitalii Gridnev <gridnevvvit@gmail.com> | 2022-06-30 20:37:14 +0300 |
commit | d29a0b28f5078c91efce67d27e9eee433159d999 (patch) | |
tree | ba13e798e767c5ebd6926f348beb7c8a8e80e072 | |
parent | 53d4c76b348320ead25bfe9aa4efdc92d6fbb762 (diff) | |
download | ydb-d29a0b28f5078c91efce67d27e9eee433159d999.tar.gz |
add option to balancer: enable tiers KIKIMR-15226
ref:3c4093a5f1da7ee65bbaa753f7cb3e24a65d03e1
-rw-r--r-- | ydb/core/grpc_services/rpc_create_session.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/counters/kqp_counters.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/counters/kqp_counters.h | 4 | ||||
-rw-r--r-- | ydb/core/kqp/proxy/kqp_proxy_service.cpp | 242 | ||||
-rw-r--r-- | ydb/core/kqp/proxy/kqp_proxy_service.h | 6 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 22 | ||||
-rw-r--r-- | ydb/core/protos/kqp.proto | 1 |
7 files changed, 197 insertions, 93 deletions
diff --git a/ydb/core/grpc_services/rpc_create_session.cpp b/ydb/core/grpc_services/rpc_create_session.cpp index eeaf8c1eae..7f52771a3d 100644 --- a/ydb/core/grpc_services/rpc_create_session.cpp +++ b/ydb/core/grpc_services/rpc_create_session.cpp @@ -66,6 +66,7 @@ private: if (Request().HasClientCapability(NYdb::YDB_CLIENT_CAPABILITY_SESSION_BALANCER)) { ev->Record.SetCanCreateRemoteSession(true); + ev->Record.SetSupportsBalancing(true); } SetDatabase(ev, Request()); diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index 291450f6f6..7385f21495 100644 --- a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -206,8 +206,8 @@ void TKqpCountersBase::Init() { SessionActorCleanupLatency = KqpGroup->GetHistogram( "SessionActors/CleanupLatencyMs", NMonitoring::ExponentialHistogram(10, 2, 1)); - SessionBalancerCV = KqpGroup->GetCounter("SessionBalancer/CV", false); SessionBalancerShutdowns = KqpGroup->GetCounter("SessionBalancer/Shutdown", true); + SessionGracefulShutdownHit = KqpGroup->GetCounter("SessionBalancer/GracefulHit", true); /* Transactions */ TxCreated = KqpGroup->GetCounter("Transactions/Created", true); @@ -270,6 +270,10 @@ void TKqpCountersBase::ReportSessionShutdownRequest() { SessionBalancerShutdowns->Inc(); } +void TKqpCountersBase::ReportSessionGracefulShutdownHit() { + SessionGracefulShutdownHit->Inc(); +} + void TKqpCountersBase::ReportCreateSession(ui64 requestSize) { CreateSessionRequests->Inc(); *RequestBytes += requestSize; @@ -807,6 +811,14 @@ void TKqpCounters::ReportSessionShutdownRequest(TKqpDbCountersPtr dbCounters) { } } +void TKqpCounters::ReportSessionGracefulShutdownHit(TKqpDbCountersPtr dbCounters) { + TKqpCountersBase::ReportSessionGracefulShutdownHit(); + if (dbCounters) { + dbCounters->ReportSessionGracefulShutdownHit(); + } + +} + void TKqpCounters::ReportCreateSession(TKqpDbCountersPtr dbCounters, ui64 requestSize) { TKqpCountersBase::ReportCreateSession(requestSize); if (dbCounters) { diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h index d811bd9722..cbbadf7380 100644 --- a/ydb/core/kqp/counters/kqp_counters.h +++ b/ydb/core/kqp/counters/kqp_counters.h @@ -36,6 +36,7 @@ protected: void ReportQueryAction(NKikimrKqp::EQueryAction action); void ReportQueryType(NKikimrKqp::EQueryType type); + void ReportSessionGracefulShutdownHit(); void ReportSessionShutdownRequest(); void ReportCreateSession(ui64 requestSize); void ReportPingSession(ui64 requestSize); @@ -159,7 +160,7 @@ protected: NMonitoring::TDynamicCounters::TCounterPtr WorkersClosedError; NMonitoring::TDynamicCounters::TCounterPtr WorkersClosedRequest; NMonitoring::TDynamicCounters::TCounterPtr ActiveWorkers; - NMonitoring::TDynamicCounters::TCounterPtr SessionBalancerCV; + NMonitoring::TDynamicCounters::TCounterPtr SessionGracefulShutdownHit; NMonitoring::TDynamicCounters::TCounterPtr SessionBalancerShutdowns; NMonitoring::TDynamicCounters::TCounterPtr ProxyForwardedRequests; @@ -263,6 +264,7 @@ public: explicit TKqpCounters(const NMonitoring::TDynamicCounterPtr& counters, const TActorContext* ctx = nullptr); void ReportProxyForwardedRequest(TKqpDbCountersPtr dbCounters); + void ReportSessionGracefulShutdownHit(TKqpDbCountersPtr dbCounters); void ReportSessionShutdownRequest(TKqpDbCountersPtr dbCounters); void ReportCreateSession(TKqpDbCountersPtr dbCounters, ui64 requestSize); void ReportPingSession(TKqpDbCountersPtr dbCounters, ui64 requestSize); diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.cpp b/ydb/core/kqp/proxy/kqp_proxy_service.cpp index 614844d28c..9bbbb1320b 100644 --- a/ydb/core/kqp/proxy/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy/kqp_proxy_service.cpp @@ -55,6 +55,9 @@ static constexpr TDuration DEFAULT_EXTRA_TIMEOUT_WAIT = TDuration::MilliSeconds( static constexpr TDuration DEFAULT_CREATE_SESSION_TIMEOUT = TDuration::MilliSeconds(5000); +using namespace NKikimrConfig; + + std::optional<ui32> GetDefaultStateStorageGroupId(const TString& database) { if (auto* domainInfo = AppData()->DomainsInfo->GetDomainByName(ExtractDomain(database))) { return domainInfo->DefaultStateStorageGroup; @@ -118,20 +121,30 @@ class TLocalSessionsRegistry { THashMap<TString, TKqpSessionInfo> LocalSessions; THashMap<TActorId, TString> TargetIdIndex; THashSet<TString> ShutdownInFlightSessions; - TList<TString> RecentUsedSessions; - THashMap<TString, TList<TString>::iterator> Links; THashMap<TString, ui32> SessionsCountPerDatabase; + std::vector<std::vector<TString>> ReadySessions; + TIntrusivePtr<IRandomProvider> RandomProvider; public: - TLocalSessionsRegistry() = default; + TLocalSessionsRegistry(TIntrusivePtr<IRandomProvider> randomProvider) + : ReadySessions(2) + , RandomProvider(randomProvider) + {} TKqpSessionInfo* Create(const TString& sessionId, const TActorId& workerId, - const TString& database, TKqpDbCountersPtr dbCounters) + const TString& database, TKqpDbCountersPtr dbCounters, bool supportsBalancing) { - RecentUsedSessions.push_front(sessionId); - auto result = LocalSessions.emplace(sessionId, TKqpSessionInfo(sessionId, workerId, database, dbCounters)); + std::vector<i32> pos(2, -1); + pos[0] = ReadySessions[0].size(); + ReadySessions[0].push_back(sessionId); + + if (supportsBalancing) { + pos[1] = ReadySessions[1].size(); + ReadySessions[1].push_back(sessionId); + } + + auto result = LocalSessions.emplace(sessionId, TKqpSessionInfo(sessionId, workerId, database, dbCounters, std::move(pos))); SessionsCountPerDatabase[database]++; - Links.emplace(sessionId, RecentUsedSessions.begin()); Y_VERIFY(result.second, "Duplicate session id!"); TargetIdIndex.emplace(workerId, sessionId); return &result.first->second; @@ -141,23 +154,21 @@ public: return ShutdownInFlightSessions; } - TKqpSessionInfo* CanShutdownSession(const TString& sessionId) { - auto [_, success] = ShutdownInFlightSessions.emplace(sessionId); - if (success) { - auto ptr = LocalSessions.FindPtr(sessionId); - ptr->ShutdownStartedAt = TAppData::TimeProvider->Now(); - return ptr; - } - - return nullptr; + TKqpSessionInfo* StartShutdownSession(const TString& sessionId) { + ShutdownInFlightSessions.emplace(sessionId); + auto ptr = LocalSessions.FindPtr(sessionId); + ptr->ShutdownStartedAt = TAppData::TimeProvider->Now(); + RemoveSessionFromLists(ptr); + return ptr; } - TKqpSessionInfo* PickSessionToShutdown() { - for(auto it = RecentUsedSessions.begin(); it != RecentUsedSessions.end(); ++it) { - TKqpSessionInfo* session = CanShutdownSession(*it); - if (session) - return session; + TKqpSessionInfo* PickSessionToShutdown(bool force, ui32 minReasonableToKick) { + auto& sessions = force ? ReadySessions.at(0) : ReadySessions.at(1); + if (sessions.size() >= minReasonableToKick) { + ui64 idx = RandomProvider->GenRand() % sessions.size(); + return StartShutdownSession(sessions[idx]); } + return nullptr; } @@ -183,15 +194,40 @@ public: SessionsCountPerDatabase.erase(counter); } } - auto lnk = Links.find(sessionId); - RecentUsedSessions.erase(lnk->second); - Links.erase(sessionId); + + RemoveSessionFromLists(&(it->second)); ShutdownInFlightSessions.erase(sessionId); TargetIdIndex.erase(it->second.WorkerId); LocalSessions.erase(it); } } + void RemoveSessionFromLists(TKqpSessionInfo* ptr) { + for(ui32 i = 0; i < ptr->ReadyPos.size(); ++i) { + i32& pos = ptr->ReadyPos.at(i); + auto& sessions = ReadySessions.at(i); + if (pos != -1 && pos + 1 != static_cast<i32>(sessions.size())) { + auto& lastPos = LocalSessions.at(sessions.back()).ReadyPos.at(i); + Y_VERIFY(lastPos + 1 == static_cast<i32>(sessions.size())); + std::swap(sessions[pos], sessions[lastPos]); + lastPos = pos; + } + + if (pos != -1) { + sessions.pop_back(); + pos = -1; + } + } + } + + const TKqpSessionInfo* IsPendingShutdown(const TString& sessionId) const { + if (ShutdownInFlightSessions.find(sessionId) != ShutdownInFlightSessions.end()) { + return FindPtr(sessionId); + } + + return nullptr; + } + bool CheckDatabaseLimits(const TString& database, ui32 databaseLimit) { auto it = SessionsCountPerDatabase.find(database); if (it == SessionsCountPerDatabase.end()){ @@ -213,25 +249,6 @@ public: return LocalSessions.FindPtr(sessionId); } - void Promote(const TString& sessionId) { - auto lnk = Links.find(sessionId); - Y_VERIFY(lnk != Links.end()); - RecentUsedSessions.erase(lnk->second); - RecentUsedSessions.push_front(lnk->first); - lnk->second = RecentUsedSessions.begin(); - } - - TKqpSessionInfo* FindAndPromote(const TString& sessionId) { - TKqpSessionInfo* ptr = LocalSessions.FindPtr(sessionId); - if (ptr) { - ptr->LastRequestAt = TAppData::TimeProvider->Now(); - ++ptr->UseFrequency; - Promote(sessionId); - } - - return ptr; - } - void Erase(const TActorId& targetId) { auto it = TargetIdIndex.find(targetId); if (it != TargetIdIndex.end()){ @@ -278,13 +295,15 @@ public: , PendingRequests() , TenantsReady(false) , Tenants() - , ModuleResolverState() {} + , ModuleResolverState() + {} void Bootstrap() { NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(KQP_PROVIDER)); Counters = MakeIntrusive<TKqpCounters>(AppData()->Counters, &TlsActivationContext->AsActorContext()); ModuleResolverState = MakeIntrusive<TModuleResolverState>(); + LocalSessions = std::make_unique<TLocalSessionsRegistry>(AppData()->RandomProvider); RandomProvider = AppData()->RandomProvider; if (!GetYqlDefaultModuleResolver(ModuleResolverState->ExprCtx, ModuleResolverState->ModuleResolver)) { TStringStream errorStream; @@ -413,7 +432,7 @@ public: return; } - NodeResources.SetActiveWorkersCount(LocalSessions.size()); + NodeResources.SetActiveWorkersCount(LocalSessions->size()); PublishBoardPath = MakeKqpProxyBoardPath(database); auto actor = CreateBoardPublishActor(PublishBoardPath, NodeResources.SerializeAsString(), SelfId(), *groupId, 0, true); BoardPublishActor = Register(actor); @@ -463,7 +482,7 @@ public: } KQP_PROXY_LOG_I("Received tenant pool status, serving tenants: " << JoinRange(", ", Tenants.begin(), Tenants.end())); - for (auto& [_, sessionInfo] : LocalSessions) { + for (auto& [_, sessionInfo] : *LocalSessions) { if (!sessionInfo.Database.empty() && !Tenants.contains(sessionInfo.Database)) { auto closeSessionEv = MakeHolder<TEvKqp::TEvCloseSessionRequest>(); closeSessionEv->Record.MutableRequest()->SetSessionId(sessionInfo.SessionId); @@ -533,11 +552,11 @@ public: KQP_PROXY_LOG_N("KQP proxy shutdown requested."); ShutdownRequested = true; ShutdownState.Reset(ev->Get()->ShutdownState.Get()); - ShutdownState->Update(LocalSessions.size()); + ShutdownState->Update(LocalSessions->size()); auto& shs = TableServiceConfig.GetShutdownSettings(); ui32 hardTimeout = shs.GetHardTimeoutMs(); ui32 softTimeout = shs.GetSoftTimeoutMs(); - for(auto& [idx, sessionInfo] : LocalSessions) { + for(auto& [idx, sessionInfo] : *LocalSessions) { Send(sessionInfo.WorkerId, new TEvKqp::TEvInitiateSessionShutdown(softTimeout, hardTimeout)); } } @@ -548,6 +567,11 @@ public: return false; } + const auto& sbs = TableServiceConfig.GetSessionBalancerSettings(); + if (!sbs.GetSupportRemoteSessionCreation()) { + return false; + } + ui64 randomNumber = RandomProvider->GenRand(); ui32 nodeId = LocalDatacenterProxies[randomNumber % LocalDatacenterProxies.size()]; if (nodeId == SelfId().NodeId()){ @@ -557,6 +581,7 @@ public: std::unique_ptr<TEvKqp::TEvCreateSessionRequest> remoteRequest = std::make_unique<TEvKqp::TEvCreateSessionRequest>(); remoteRequest->Record.SetDeadlineUs(event.GetDeadlineUs()); remoteRequest->Record.SetTraceId(event.GetTraceId()); + remoteRequest->Record.SetSupportsBalancing(event.GetSupportsBalancing()); remoteRequest->Record.MutableRequest()->SetDatabase(event.GetRequest().GetDatabase()); ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, event.GetTraceId(), TKqpEvents::EvCreateSessionRequest); @@ -582,7 +607,7 @@ public: const auto deadline = TInstant::MicroSeconds(event.GetDeadlineUs()); if (CheckRequestDeadline(requestInfo, deadline, result) && - CreateNewSessionWorker(requestInfo, TString(DefaultKikimrPublicClusterName), true, request.GetDatabase(), result)) + CreateNewSessionWorker(requestInfo, TString(DefaultKikimrPublicClusterName), true, request.GetDatabase(), event.GetSupportsBalancing(), result)) { auto& response = *responseEv->Record.MutableResponse(); response.SetSessionId(result.Value->SessionId); @@ -657,7 +682,7 @@ public: } else { TProcessResult<TKqpSessionInfo*> result; if (!CreateNewSessionWorker(requestInfo, TString(DefaultKikimrPublicClusterName), false, - request.GetDatabase(), result)) + request.GetDatabase(), false, result)) { if (!dbCounters) { dbCounters = Counters->GetDbCounters(request.GetDatabase()); @@ -697,6 +722,15 @@ public: LogRequest(request, requestInfo, ev->Sender, dbCounters); + auto sessionInfo = LocalSessions->IsPendingShutdown(sessionId); + if (sessionInfo) { + if (dbCounters) { + // session is pending shutdown, and we close it + // but direct request from user. + Counters->ReportSessionGracefulShutdownHit(sessionInfo->DbCounters); + } + } + if (!sessionId.empty()) { TProcessResult<TActorId> result; if (TryGetSessionTargetActor(sessionId, requestInfo, result)) { @@ -832,23 +866,61 @@ public: return false; } + std::pair<bool, ui32> GetBalancerEnableSettings() const { + const auto& sbs = TableServiceConfig.GetSessionBalancerSettings(); + ui32 maxInFlightSize = sbs.GetMaxSessionsShutdownInFlightSize(); + bool force = false; + + auto tier = sbs.GetEnableTier(); + if (sbs.GetEnabled()) { + // it's legacy configuration. + tier = TTableServiceConfig_TSessionBalancerSettings::TIER_ENABLED_FOR_ALL; + } + + switch(tier) { + case TTableServiceConfig_TSessionBalancerSettings::TIER_DISABLED: + return {false, 0}; + case TTableServiceConfig_TSessionBalancerSettings::TIER_ENABLED_FOR_ALL: + return {true, maxInFlightSize}; + case TTableServiceConfig_TSessionBalancerSettings::TIER_ENABLED_FOR_SESSIONS_WITH_SUPPORT: + return {false, maxInFlightSize}; + default: + return {false, 0}; + } + + return {force, maxInFlightSize}; + } + void TryKickSession() { const auto& sbs = TableServiceConfig.GetSessionBalancerSettings(); - ui32 maxInFlightSize = sbs.GetMaxSessionsShutdownInFlightSize(); - if (!maxInFlightSize || !sbs.GetEnabled()) - return; + const std::pair<bool, ui32> settings = GetBalancerEnableSettings(); Y_VERIFY(PeerStats); bool isReasonableToKick = false; + ui32 strategy = static_cast<ui32>(sbs.GetStrategy()); + ui32 balanceByCpu = strategy & TTableServiceConfig_TSessionBalancerSettings::BALANCE_BY_CPU; + ui32 balanceByCount = strategy & TTableServiceConfig_TSessionBalancerSettings::BALANCE_BY_COUNT; + if (sbs.GetLocalDatacenterPolicy()) { - isReasonableToKick |= ShouldStartBalancing(PeerStats->LocalSessionCount, static_cast<double>(sbs.GetMinNodeSessions()), static_cast<double>(LocalSessions.size())); - isReasonableToKick |= ShouldStartBalancing(PeerStats->LocalCpu, sbs.GetMinCpuBalancerThreshold(), NodeResources.GetCpuUsage()); + if (balanceByCount) { + isReasonableToKick |= ShouldStartBalancing(PeerStats->LocalSessionCount, static_cast<double>(sbs.GetMinNodeSessions()), static_cast<double>(LocalSessions->size())); + } + + if (balanceByCpu) { + isReasonableToKick |= ShouldStartBalancing(PeerStats->LocalCpu, sbs.GetMinCpuBalancerThreshold(), NodeResources.GetCpuUsage()); + } + } else { - isReasonableToKick |= ShouldStartBalancing(PeerStats->CrossAZSessionCount, static_cast<double>(sbs.GetMinNodeSessions()), static_cast<double>(LocalSessions.size())); - isReasonableToKick |= ShouldStartBalancing(PeerStats->CrossAZCpu, sbs.GetMinCpuBalancerThreshold(), NodeResources.GetCpuUsage()); + if (balanceByCount) { + isReasonableToKick |= ShouldStartBalancing(PeerStats->CrossAZSessionCount, static_cast<double>(sbs.GetMinNodeSessions()), static_cast<double>(LocalSessions->size())); + } + + if (balanceByCpu) { + isReasonableToKick |= ShouldStartBalancing(PeerStats->CrossAZCpu, sbs.GetMinCpuBalancerThreshold(), NodeResources.GetCpuUsage()); + } } if (!isReasonableToKick) { @@ -859,8 +931,8 @@ public: ServerWorkerBalancerComplete = false; } - while(LocalSessions.GetShutdownInFlightSize() < maxInFlightSize) { - auto sessionInfo = LocalSessions.PickSessionToShutdown(); + while(LocalSessions->GetShutdownInFlightSize() < settings.second) { + auto sessionInfo = LocalSessions->PickSessionToShutdown(settings.first, sbs.GetMinNodeSessions()); if (!sessionInfo) { break; } @@ -882,12 +954,12 @@ public: } void ProcessMonShutdownQueue(ui32 wantsToShutdown) { - for(auto& [_, sessionInfo] : LocalSessions) { - if (!wantsToShutdown) + for(ui32 i = 0; i < wantsToShutdown; ++i) { + const TKqpSessionInfo* candidate = LocalSessions->PickSessionToShutdown(true, 0); + if (!candidate) break; - StartSessionGraceShutdown(&sessionInfo); - --wantsToShutdown; + StartSessionGraceShutdown(candidate); } } @@ -901,7 +973,7 @@ public: const TString& forceShutdown = cgi.Get("force_shutdown"); ui32 wantsToShutdown = 0; if (forceShutdown == "all") { - wantsToShutdown = LocalSessions.size(); + wantsToShutdown = LocalSessions->size(); } else { wantsToShutdown = FromStringWithDefault<ui32>(forceShutdown, 0); } @@ -932,12 +1004,13 @@ public: str << "Force shutdown all sessions: <a href=\"kqp_proxy?" << cgiTmp.Print() << "\">Execute</a>" << Endl; } - str << "Session balancer settings: " << Endl; - str << "Enabled: " << (sbs.GetEnabled() ? "true" : "false") << Endl; - str << "MaxSessionsShutdownInFlightSize: " << sbs.GetMaxSessionsShutdownInFlightSize() << Endl; + const std::pair<bool, ui32> sbsSettings = GetBalancerEnableSettings(); + str << "Allow shutdown all sessions: " << (sbsSettings.first ? "true": "false") << Endl; + str << "MaxSessionsShutdownInFlightSize: " << sbsSettings.second << Endl; str << "LocalDatacenterPolicy: " << (sbs.GetLocalDatacenterPolicy() ? "true" : "false") << Endl; str << "MaxCVTreshold: " << sbs.GetMaxCVTreshold() << Endl; str << "MinCVTreshold: " << sbs.GetMinCVTreshold() << Endl; + str << "Balance strategy: " << TTableServiceConfig_TSessionBalancerSettings_EBalancingStrategy_Name(sbs.GetStrategy()) << Endl; str << Endl; @@ -956,17 +1029,16 @@ public: str << "EnableSessionActor: " << (AppData()->FeatureFlags.GetEnableKqpSessionActor() ? "true" : "false") << Endl; - str << "Active workers/session_actors count on node: " << LocalSessions.size() << Endl; + str << "Active workers/session_actors count on node: " << LocalSessions->size() << Endl; - const auto& sessionsShutdownInFlight = LocalSessions.GetShutdownInFlight(); + const auto& sessionsShutdownInFlight = LocalSessions->GetShutdownInFlight(); if (!sessionsShutdownInFlight.empty()) { str << Endl; str << "Sessions shutdown in flight: " << Endl; auto now = TAppData::TimeProvider->Now(); for(const auto& sessionId : sessionsShutdownInFlight) { - auto session = LocalSessions.FindPtr(sessionId); - str << "Session " << sessionId << " is under shutdown for " << (now - session->ShutdownStartedAt).SecondsFloat() << " seconds. " - << "QueryRequests: " << session->UseFrequency << Endl; + auto session = LocalSessions->FindPtr(sessionId); + str << "Session " << sessionId << " is under shutdown for " << (now - session->ShutdownStartedAt).SecondsFloat() << " seconds. " << Endl; } str << Endl; @@ -989,7 +1061,7 @@ public: str << "Peer(NodeId: " << entry.GetNodeId() << ", DataCenter: " << entry.GetDataCenterId() << "): active workers: " << entry.GetActiveWorkersCount() << "): cpu usage: " << entry.GetCpuUsage() << ", threads count: " << entry.GetThreads() << Endl; } - } + } } } @@ -1040,7 +1112,7 @@ public: RemoveSession(sessionId, workerId); KQP_PROXY_LOG_D("Session closed, sessionId: " << event.GetResponse().GetSessionId() - << ", workerId: " << workerId << ", local sessions count: " << LocalSessions.size()); + << ", workerId: " << workerId << ", local sessions count: " << LocalSessions->size()); } } @@ -1183,7 +1255,7 @@ private: } bool CreateNewSessionWorker(const TKqpRequestInfo& requestInfo, - const TString& cluster, bool longSession, const TString& database, TProcessResult<TKqpSessionInfo*>& result) + const TString& cluster, bool longSession, const TString& database, bool supportsBalancing, TProcessResult<TKqpSessionInfo*>& result) { if (!database.empty()) { if (!TenantsReady) { @@ -1209,7 +1281,7 @@ private: } auto sessionsLimitPerNode = TableServiceConfig.GetSessionsLimitPerNode(); - if (sessionsLimitPerNode && !LocalSessions.CheckDatabaseLimits(database, sessionsLimitPerNode)) { + if (sessionsLimitPerNode && !LocalSessions->CheckDatabaseLimits(database, sessionsLimitPerNode)) { TString error = TStringBuilder() << "Active sessions limit exceeded, maximum allowed: " << sessionsLimitPerNode; KQP_PROXY_LOG_W(requestInfo << error); @@ -1232,14 +1304,14 @@ private: ? CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, ModuleResolverState, Counters) : CreateKqpWorkerActor(SelfId(), sessionId, KqpSettings, workerSettings, ModuleResolverState, Counters); auto workerId = TlsActivationContext->ExecutorThread.RegisterActor(workerActor, TMailboxType::HTSwap, AppData()->UserPoolId); - TKqpSessionInfo* sessionInfo = LocalSessions.Create(sessionId, workerId, database, dbCounters); + TKqpSessionInfo* sessionInfo = LocalSessions->Create(sessionId, workerId, database, dbCounters, supportsBalancing); KQP_PROXY_LOG_D(requestInfo << "Created new session" << ", sessionId: " << sessionInfo->SessionId << ", workerId: " << sessionInfo->WorkerId << ", database: " << sessionInfo->Database << ", longSession: " << longSession - << ", local sessions count: " << LocalSessions.size()); + << ", local sessions count: " << LocalSessions->size()); result.YdbStatus = Ydb::StatusIds::SUCCESS; result.Error.clear(); @@ -1264,7 +1336,7 @@ private: } if (*nodeId == SelfId().NodeId()) { - auto localSession = LocalSessions.FindAndPromote(sessionId); + auto localSession = LocalSessions->FindPtr(sessionId); if (!localSession) { TString error = TStringBuilder() << "Session not found: " << sessionId; KQP_PROXY_LOG_N(requestInfo << error); @@ -1289,19 +1361,19 @@ private: void RemoveSession(const TString& sessionId, const TActorId& workerId) { if (!sessionId.empty()) { - LocalSessions.Erase(sessionId); + LocalSessions->Erase(sessionId); PublishResourceUsage(); if (ShutdownRequested) { - ShutdownState->Update(LocalSessions.size()); + ShutdownState->Update(LocalSessions->size()); } return; } - LocalSessions.Erase(workerId); + LocalSessions->Erase(workerId); PublishResourceUsage(); if (ShutdownRequested) { - ShutdownState->Update(LocalSessions.size()); + ShutdownState->Update(LocalSessions->size()); } } @@ -1325,7 +1397,7 @@ private: } TKqpDbCountersPtr GetDbCountersForSession(const TString& sessionId) const { - auto localSession = LocalSessions.FindPtr(sessionId); + auto localSession = LocalSessions->FindPtr(sessionId); return localSession ? localSession->DbCounters : nullptr; } @@ -1348,7 +1420,7 @@ private: TIntrusivePtr<TModuleResolverState> ModuleResolverState; TIntrusivePtr<TKqpCounters> Counters; - TLocalSessionsRegistry LocalSessions; + std::unique_ptr<TLocalSessionsRegistry> LocalSessions; bool ServerWorkerBalancerComplete = false; std::optional<TString> SelfDataCenterId; diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.h b/ydb/core/kqp/proxy/kqp_proxy_service.h index 963ab43cb1..fe3d58ed42 100644 --- a/ydb/core/kqp/proxy/kqp_proxy_service.h +++ b/ydb/core/kqp/proxy/kqp_proxy_service.h @@ -78,19 +78,19 @@ struct TKqpSessionInfo { TActorId WorkerId; TString Database; TKqpDbCountersPtr DbCounters; - ui32 UseFrequency; TInstant LastRequestAt; TInstant CreatedAt; TInstant ShutdownStartedAt; + std::vector<i32> ReadyPos; TKqpSessionInfo(const TString& sessionId, const TActorId& workerId, - const TString& database, TKqpDbCountersPtr dbCounters) + const TString& database, TKqpDbCountersPtr dbCounters, std::vector<i32>&& pos) : SessionId(sessionId) , WorkerId(workerId) , Database(database) , DbCounters(dbCounters) - , UseFrequency(0) , ShutdownStartedAt() + , ReadyPos(std::move(pos)) { auto now = TAppData::TimeProvider->Now(); LastRequestAt = now; diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 8041d4f4c8..cd79e3e913 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1046,9 +1046,22 @@ message TTableServiceConfig { message TSessionBalancerSettings { optional bool Enabled = 1 [default = true]; + + enum EEnableTier { + TIER_DISABLED = 1; + TIER_ENABLED_FOR_SESSIONS_WITH_SUPPORT = 2; + TIER_ENABLED_FOR_ALL = 3; + } + + enum EBalancingStrategy { + BALANCE_BY_CPU = 1; + BALANCE_BY_COUNT = 2; + BALANCE_BY_CPU_AND_COUNT = 3; + } + // MaxSessionsShutdownInFlightSize option determines the number of sessions // that we can keep in shutdown state at the moment. - optional uint32 MaxSessionsShutdownInFlightSize = 2 [default = 3]; + optional uint32 MaxSessionsShutdownInFlightSize = 2 [default = 1]; // GREEN ZONE YELLOW ZONE RED ZONE // MinCV MaxCV // < MinCV | | > MaxCV @@ -1063,11 +1076,11 @@ message TTableServiceConfig { optional uint32 MaxCVTreshold = 4 [default = 20]; // timeout (in milliseconds) // After HardSessionShutdownTimeoutMs session will be terminated. - optional uint32 HardSessionShutdownTimeoutMs = 5 [default = 5000]; + optional uint32 HardSessionShutdownTimeoutMs = 5 [default = 30000]; // timeout (in milliseconds) // After SoftSessionShutdownTimeoutMs milliseconds // session will not accept new query requests. - optional uint32 SoftSessionShutdownTimeoutMs = 6 [default = 1000]; + optional uint32 SoftSessionShutdownTimeoutMs = 6 [default = 15000]; // Minimum number of sessions on node to start balancing. If number of sessions on node // less than MinNodeSessions, balancer will not be able to kick session from node. optional uint32 MinNodeSessions = 7 [default = 10]; @@ -1079,6 +1092,9 @@ message TTableServiceConfig { optional uint32 BoardPublishIntervalMs = 10 [default = 1000]; optional uint32 BoardLookupIntervalMs = 11 [default = 30000]; + optional EEnableTier EnableTier = 12 [ default = TIER_DISABLED ]; + optional EBalancingStrategy Strategy = 13 [ default = BALANCE_BY_COUNT ]; + optional bool SupportRemoteSessionCreation = 14 [ default = false ]; } message TQueryReplayConfig { diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index bc9db521ec..e0c220ad21 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -313,6 +313,7 @@ message TEvCreateSessionRequest { // If flag is true, kqp proxy will create session on the different node, // if flag is false, in this case proxy will create session locally. optional bool CanCreateRemoteSession = 4 [default = false]; + optional bool SupportsBalancing = 5 [ default = false]; } message TCreateSessionResponse { |