diff options
author | gvit <gvit@ydb.tech> | 2022-09-08 12:25:36 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2022-09-08 12:25:36 +0300 |
commit | 5c79f4555d1f786b56008cdc1075ec5a176b3723 (patch) | |
tree | b43633df745795751daaaeacc3256d86f57fa88f | |
parent | ad99613425cb5d6a84eb2643264b1ec4f64740ec (diff) | |
download | ydb-5c79f4555d1f786b56008cdc1075ec5a176b3723.tar.gz |
refactor kqp proxy: avoid decoding session in hot pass, avoid multiple lookups in hashtables
-rw-r--r-- | ydb/core/kqp/proxy/kqp_proxy_service.cpp | 312 | ||||
-rw-r--r-- | ydb/core/kqp/proxy/kqp_proxy_service.h | 140 |
2 files changed, 205 insertions, 247 deletions
diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.cpp b/ydb/core/kqp/proxy/kqp_proxy_service.cpp index 38a8e168622..2e198a3d4fb 100644 --- a/ydb/core/kqp/proxy/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy/kqp_proxy_service.cpp @@ -117,147 +117,6 @@ TString EncodeSessionId(ui32 nodeId, const TString& id) { } -class TLocalSessionsRegistry { - THashMap<TString, TKqpSessionInfo> LocalSessions; - THashMap<TActorId, TString> TargetIdIndex; - THashSet<TString> ShutdownInFlightSessions; - THashMap<TString, ui32> SessionsCountPerDatabase; - std::vector<std::vector<TString>> ReadySessions; - TIntrusivePtr<IRandomProvider> RandomProvider; - -public: - TLocalSessionsRegistry(TIntrusivePtr<IRandomProvider> randomProvider) - : ReadySessions(2) - , RandomProvider(randomProvider) - {} - - TKqpSessionInfo* Create(const TString& sessionId, const TActorId& workerId, - const TString& database, TKqpDbCountersPtr dbCounters, bool supportsBalancing) - { - std::vector<i32> pos(2, -1); - pos[0] = ReadySessions[0].size(); - ReadySessions[0].push_back(sessionId); - - if (supportsBalancing) { - pos[1] = ReadySessions[1].size(); - ReadySessions[1].push_back(sessionId); - } - - auto result = LocalSessions.emplace(sessionId, TKqpSessionInfo(sessionId, workerId, database, dbCounters, std::move(pos))); - SessionsCountPerDatabase[database]++; - Y_VERIFY(result.second, "Duplicate session id!"); - TargetIdIndex.emplace(workerId, sessionId); - return &result.first->second; - } - - const THashSet<TString>& GetShutdownInFlight() const { - return ShutdownInFlightSessions; - } - - TKqpSessionInfo* StartShutdownSession(const TString& sessionId) { - ShutdownInFlightSessions.emplace(sessionId); - auto ptr = LocalSessions.FindPtr(sessionId); - ptr->ShutdownStartedAt = TAppData::TimeProvider->Now(); - RemoveSessionFromLists(ptr); - return ptr; - } - - TKqpSessionInfo* PickSessionToShutdown(bool force, ui32 minReasonableToKick) { - auto& sessions = force ? ReadySessions.at(0) : ReadySessions.at(1); - if (sessions.size() >= minReasonableToKick) { - ui64 idx = RandomProvider->GenRand() % sessions.size(); - return StartShutdownSession(sessions[idx]); - } - - return nullptr; - } - - THashMap<TString, TKqpSessionInfo>::const_iterator begin() const { - return LocalSessions.begin(); - } - - THashMap<TString, TKqpSessionInfo>::const_iterator end() const { - return LocalSessions.end(); - } - - size_t GetShutdownInFlightSize() const { - return ShutdownInFlightSessions.size(); - } - - void Erase(const TString& sessionId) { - auto it = LocalSessions.find(sessionId); - if (it != LocalSessions.end()) { - auto counter = SessionsCountPerDatabase.find(it->second.Database); - if (counter != SessionsCountPerDatabase.end()) { - counter->second--; - if (counter->second == 0) { - SessionsCountPerDatabase.erase(counter); - } - } - - RemoveSessionFromLists(&(it->second)); - ShutdownInFlightSessions.erase(sessionId); - TargetIdIndex.erase(it->second.WorkerId); - LocalSessions.erase(it); - } - } - - void RemoveSessionFromLists(TKqpSessionInfo* ptr) { - for(ui32 i = 0; i < ptr->ReadyPos.size(); ++i) { - i32& pos = ptr->ReadyPos.at(i); - auto& sessions = ReadySessions.at(i); - if (pos != -1 && pos + 1 != static_cast<i32>(sessions.size())) { - auto& lastPos = LocalSessions.at(sessions.back()).ReadyPos.at(i); - Y_VERIFY(lastPos + 1 == static_cast<i32>(sessions.size())); - std::swap(sessions[pos], sessions[lastPos]); - lastPos = pos; - } - - if (pos != -1) { - sessions.pop_back(); - pos = -1; - } - } - } - - const TKqpSessionInfo* IsPendingShutdown(const TString& sessionId) const { - if (ShutdownInFlightSessions.find(sessionId) != ShutdownInFlightSessions.end()) { - return FindPtr(sessionId); - } - - return nullptr; - } - - bool CheckDatabaseLimits(const TString& database, ui32 databaseLimit) { - auto it = SessionsCountPerDatabase.find(database); - if (it == SessionsCountPerDatabase.end()){ - return true; - } - - if (it->second + 1 <= databaseLimit) { - return true; - } - - return false; - } - - size_t size() const { - return LocalSessions.size(); - } - - const TKqpSessionInfo* FindPtr(const TString& sessionId) const { - return LocalSessions.FindPtr(sessionId); - } - - void Erase(const TActorId& targetId) { - auto it = TargetIdIndex.find(targetId); - if (it != TargetIdIndex.end()){ - Erase(it->second); - } - } -}; - - class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> { struct TEvPrivate { enum EEv { @@ -456,8 +315,7 @@ public: DoPublishResources(); } - void Handle(TEvPrivate::TEvReadyToPublishResources::TPtr& ev) { - Y_UNUSED(ev); + void Handle(TEvPrivate::TEvReadyToPublishResources::TPtr&) { ResourcesPublishScheduled = false; DoPublishResources(); } @@ -493,8 +351,7 @@ public: PublishResourceUsage(); } - void Handle(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr& ev) { - Y_UNUSED(ev); + void Handle(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr&) { KQP_PROXY_LOG_D("Subscribed for config changes."); } @@ -561,7 +418,7 @@ public: } } - bool CreateRemoteSession(TEvKqp::TEvCreateSessionRequest::TPtr& ev) { + bool CreateRemoteSession(TEvKqp::TEvCreateSessionRequest::TPtr& ev, ui64 requestId) { auto& event = ev->Get()->Record; if (!event.GetCanCreateRemoteSession() || LocalDatacenterProxies.empty()) { return false; @@ -584,7 +441,6 @@ public: remoteRequest->Record.SetSupportsBalancing(event.GetSupportsBalancing()); remoteRequest->Record.MutableRequest()->SetDatabase(event.GetRequest().GetDatabase()); - ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, event.GetTraceId(), TKqpEvents::EvCreateSessionRequest); Send(MakeKqpProxyID(nodeId), remoteRequest.release(), IEventHandle::FlagTrackDelivery, requestId); TDuration timeout = DEFAULT_CREATE_SESSION_TIMEOUT; StartQueryTimeout(requestId, timeout); @@ -595,7 +451,8 @@ public: auto& event = ev->Get()->Record; auto& request = event.GetRequest(); TKqpRequestInfo requestInfo(event.GetTraceId()); - if (CreateRemoteSession(ev)) { + ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, event.GetTraceId(), TKqpEvents::EvCreateSessionRequest); + if (CreateRemoteSession(ev, requestId)) { return; } @@ -622,6 +479,7 @@ public: responseEv->Record.SetYdbStatus(result.YdbStatus); responseEv->Record.SetError(result.Error); + PendingRequests.Erase(requestId); LogResponse(event.GetTraceId(), responseEv->Record, dbCounters); Send(ev->Sender, responseEv.Release(), 0, ev->Cookie); } @@ -632,8 +490,28 @@ public: TString traceId = event.GetTraceId(); TKqpRequestInfo requestInfo(traceId); ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, traceId, TKqpEvents::EvQueryRequest); + if (request.GetSessionId().empty()) { + TProcessResult<TKqpSessionInfo*> result; + if (!CreateNewSessionWorker(requestInfo, TString(DefaultKikimrPublicClusterName), false, + request.GetDatabase(), false, result)) + { + LogRequest(request, requestInfo, ev->Sender, requestId, Counters->GetDbCounters(request.GetDatabase())); + ReplyProcessError(result.YdbStatus, result.Error, requestId); + return; + } - auto dbCounters = GetDbCountersForSession(request.GetSessionId()); + request.SetSessionId(result.Value->SessionId); + } + + TString sessionId = request.GetSessionId(); + const TKqpSessionInfo* sessionInfo = LocalSessions->FindPtr(sessionId); + auto dbCounters = sessionInfo ? sessionInfo->DbCounters : nullptr; + if (!dbCounters) { + dbCounters = Counters->GetDbCounters(request.GetDatabase()); + } + + PendingRequests.SetSessionId(requestId, sessionId, dbCounters); + LogRequest(request, requestInfo, ev->Sender, requestId, dbCounters); auto queryLimitBytes = TableServiceConfig.GetQueryLimitBytes(); if (queryLimitBytes && IsSqlQuery(request.GetType())) { @@ -641,10 +519,6 @@ public: if (querySizeBytes > queryLimitBytes) { TString error = TStringBuilder() << "Query text size exceeds limit (" << querySizeBytes << "b > " << queryLimitBytes << "b)"; ReplyProcessError(Ydb::StatusIds::BAD_REQUEST, error, requestId); - if (!dbCounters) { - dbCounters = Counters->GetDbCounters(request.GetDatabase()); - } - LogRequest(request, requestInfo, ev->Sender, requestId, dbCounters); return; } } @@ -653,11 +527,6 @@ public: if (paramsLimitBytes) { auto paramsBytes = request.GetParameters().ByteSizeLong(); if (paramsBytes > paramsLimitBytes) { - if (!dbCounters) { - dbCounters = Counters->GetDbCounters(request.GetDatabase()); - } - LogRequest(request, requestInfo, ev->Sender, requestId, dbCounters); - TString error = TStringBuilder() << "Parameters size exceeds limit (" << paramsBytes << "b > " << paramsLimitBytes << "b)"; ReplyProcessError(Ydb::StatusIds::BAD_REQUEST, error, requestId); return; @@ -679,42 +548,15 @@ public: } TActorId targetId; - if (!request.GetSessionId().empty()) { - TProcessResult<TActorId> result; - if (!TryGetSessionTargetActor(request.GetSessionId(), requestInfo, result)) { - if (!dbCounters) { - dbCounters = Counters->GetDbCounters(request.GetDatabase()); - } - LogRequest(request, requestInfo, ev->Sender, requestId, dbCounters); - ReplyProcessError(result.YdbStatus, result.Error, requestId); - return; - } - - targetId = result.Value; - - LogRequest(request, requestInfo, ev->Sender, requestId, dbCounters); + if (sessionInfo) { + targetId = sessionInfo->WorkerId; } else { - TProcessResult<TKqpSessionInfo*> result; - if (!CreateNewSessionWorker(requestInfo, TString(DefaultKikimrPublicClusterName), false, - request.GetDatabase(), false, result)) - { - if (!dbCounters) { - dbCounters = Counters->GetDbCounters(request.GetDatabase()); - } - LogRequest(request, requestInfo, ev->Sender, requestId, dbCounters); - ReplyProcessError(result.YdbStatus, result.Error, requestId); + targetId = TryGetSessionTargetActor(request.GetSessionId(), requestInfo, requestId); + if (!targetId) { return; } - - targetId = result.Value->WorkerId; - request.SetSessionId(result.Value->SessionId); - dbCounters = result.Value->DbCounters; - - LogRequest(request, requestInfo, ev->Sender, requestId, dbCounters); } - TString sessionId = request.GetSessionId(); - PendingRequests.SetSessionId(requestId, sessionId, dbCounters); // We add extra milliseconds to the user-specified timeout, so it means we give additional priority for worker replies, // because it is much better to give detailed error message rather than generic timeout. // For example, it helps to avoid race in event order when worker and proxy recieve timeout at the same moment. @@ -732,23 +574,23 @@ public: TKqpRequestInfo requestInfo(event.GetTraceId()); TString sessionId = request.GetSessionId(); - auto dbCounters = GetDbCountersForSession(sessionId); + const TKqpSessionInfo* sessionInfo = LocalSessions->FindPtr(sessionId); + auto dbCounters = sessionInfo ? sessionInfo->DbCounters : nullptr; LogRequest(request, requestInfo, ev->Sender, dbCounters); - auto sessionInfo = LocalSessions->IsPendingShutdown(sessionId); - if (sessionInfo) { - if (dbCounters) { - // session is pending shutdown, and we close it - // but direct request from user. - Counters->ReportSessionGracefulShutdownHit(sessionInfo->DbCounters); - } + if (LocalSessions->IsPendingShutdown(sessionId) && dbCounters) { + Counters->ReportSessionGracefulShutdownHit(dbCounters); } - if (!sessionId.empty()) { - TProcessResult<TActorId> result; - if (TryGetSessionTargetActor(sessionId, requestInfo, result)) { - Send(result.Value, ev->Release().Release()); + if (sessionInfo) { + Send(sessionInfo->WorkerId, ev->Release().Release()); + } else { + if (!sessionId.empty()) { + TActorId targetId = TryGetSessionTargetActor(sessionId, requestInfo, 0); + if (targetId) { + Send(targetId, ev->Release().Release()); + } } } } @@ -761,15 +603,18 @@ public: TKqpRequestInfo requestInfo(traceId); auto sessionId = request.GetSessionId(); ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, traceId, TKqpEvents::EvPingSessionRequest); - - auto dbCounters = GetDbCountersForSession(sessionId); - + const TKqpSessionInfo* sessionInfo = LocalSessions->FindPtr(sessionId); + auto dbCounters = sessionInfo ? sessionInfo->DbCounters : nullptr; LogRequest(request, requestInfo, ev->Sender, requestId, dbCounters); - TProcessResult<TActorId> result; - if (!TryGetSessionTargetActor(sessionId, requestInfo, result)) { - ReplyProcessError(result.YdbStatus, result.Error, requestId); - return; + TActorId targetId; + if (sessionInfo) { + targetId = sessionInfo->WorkerId; + } else { + targetId = TryGetSessionTargetActor(sessionId, requestInfo, requestId); + if (!targetId) { + return; + } } TDuration timeout = DEFAULT_KEEP_ALIVE_TIMEOUT; @@ -779,7 +624,7 @@ public: PendingRequests.SetSessionId(requestId, sessionId, dbCounters); StartQueryTimeout(requestId, timeout); - Send(result.Value, ev->Release().Release(), IEventHandle::FlagTrackDelivery, requestId); + Send(targetId, ev->Release().Release(), IEventHandle::FlagTrackDelivery, requestId); } template<typename TEvent> @@ -821,8 +666,7 @@ public: } } - void Handle(TEvPrivate::TEvCollectPeerProxyData::TPtr& ev) { - Y_UNUSED(ev); + void Handle(TEvPrivate::TEvCollectPeerProxyData::TPtr&) { LookupPeerProxyData(); if (!ShutdownRequested) { const auto& sbs = TableServiceConfig.GetSessionBalancerSettings(); @@ -1171,11 +1015,10 @@ private: Counters->ReportResponseStatus(dbCounters, event.ByteSize(), status); } - void LogResponse(const TKqpRequestInfo& requestInfo, + void LogResponse(const TKqpRequestInfo&, const TEvKqp::TProtoArenaHolder<NKikimrKqp::TEvQueryResponse>& holder, TKqpDbCountersPtr dbCounters) { - Y_UNUSED(requestInfo); const auto& event = holder.GetRef(); Counters->ReportResponseStatus(dbCounters, event.ByteSize(), @@ -1192,20 +1035,16 @@ private: Counters->ReportResultsBytes(dbCounters, resultsBytes); } - void LogResponse(const TKqpRequestInfo& requestInfo, + void LogResponse(const TKqpRequestInfo&, const NKikimrKqp::TEvCreateSessionResponse& event, TKqpDbCountersPtr dbCounters) { - Y_UNUSED(requestInfo); - Counters->ReportResponseStatus(dbCounters, event.ByteSize(), event.GetYdbStatus()); } - void LogResponse(const TKqpRequestInfo& requestInfo, + void LogResponse(const TKqpRequestInfo&, const NKikimrKqp::TEvPingSessionResponse& event, TKqpDbCountersPtr dbCounters) { - Y_UNUSED(requestInfo); - Counters->ReportResponseStatus(dbCounters, event.ByteSize(), event.GetStatus()); } @@ -1334,34 +1173,21 @@ private: return true; } - bool TryGetSessionTargetActor(const TString& sessionId, const TKqpRequestInfo& requestInfo, TProcessResult<TActorId>& result) + TActorId TryGetSessionTargetActor(const TString& sessionId, const TKqpRequestInfo& requestInfo, ui64 requestId) { - result.YdbStatus = Ydb::StatusIds::SUCCESS; - result.Error.clear(); - auto nodeId = TryDecodeYdbSessionId(sessionId); if (!nodeId) { TString error = TStringBuilder() << "Failed to parse session id: " << sessionId; KQP_PROXY_LOG_W(requestInfo << error); - - result.YdbStatus = Ydb::StatusIds::BAD_REQUEST; - result.Error = error; - return false; + ReplyProcessError(Ydb::StatusIds::BAD_REQUEST, error, requestId); + return TActorId(); } if (*nodeId == SelfId().NodeId()) { - auto localSession = LocalSessions->FindPtr(sessionId); - if (!localSession) { - TString error = TStringBuilder() << "Session not found: " << sessionId; - KQP_PROXY_LOG_N(requestInfo << error); - - result.YdbStatus = Ydb::StatusIds::BAD_SESSION; - result.Error = error; - return false; - } - - result.Value = localSession->WorkerId; - return true; + TString error = TStringBuilder() << "Session not found: " << sessionId; + KQP_PROXY_LOG_N(requestInfo << error); + ReplyProcessError(Ydb::StatusIds::BAD_SESSION, error, requestId); + return TActorId(); } if (!Tenants.empty()) { @@ -1369,8 +1195,7 @@ private: Counters->ReportProxyForwardedRequest(counters); } - result.Value = MakeKqpProxyID(*nodeId); - return true; + return MakeKqpProxyID(*nodeId); } void RemoveSession(const TString& sessionId, const TActorId& workerId) { @@ -1410,11 +1235,6 @@ private: NYql::NDq::SetYqlLogLevels(yqlPriority); } - TKqpDbCountersPtr GetDbCountersForSession(const TString& sessionId) const { - auto localSession = LocalSessions->FindPtr(sessionId); - return localSession ? localSession->DbCounters : nullptr; - } - private: NYql::NLog::YqlLoggerScope YqlLoggerScope; NKikimrConfig::TLogConfig LogConfig; diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.h b/ydb/core/kqp/proxy/kqp_proxy_service.h index fe3d58ed425..e6d44a176ca 100644 --- a/ydb/core/kqp/proxy/kqp_proxy_service.h +++ b/ydb/core/kqp/proxy/kqp_proxy_service.h @@ -40,7 +40,7 @@ class TKqpProxyRequestTracker { public: TKqpProxyRequestTracker() - : RequestId(0) + : RequestId(1) {} ui64 RegisterRequest(const TActorId& sender, ui64 senderCookie, const TString& traceId, ui32 eventType) { @@ -129,6 +129,144 @@ struct TPeerStats { }; +class TLocalSessionsRegistry { + THashMap<TString, TKqpSessionInfo> LocalSessions; + THashMap<TActorId, TString> TargetIdIndex; + THashSet<TString> ShutdownInFlightSessions; + THashMap<TString, ui32> SessionsCountPerDatabase; + std::vector<std::vector<TString>> ReadySessions; + TIntrusivePtr<IRandomProvider> RandomProvider; + +public: + TLocalSessionsRegistry(TIntrusivePtr<IRandomProvider> randomProvider) + : ReadySessions(2) + , RandomProvider(randomProvider) + {} + + TKqpSessionInfo* Create(const TString& sessionId, const TActorId& workerId, + const TString& database, TKqpDbCountersPtr dbCounters, bool supportsBalancing) + { + std::vector<i32> pos(2, -1); + pos[0] = ReadySessions[0].size(); + ReadySessions[0].push_back(sessionId); + + if (supportsBalancing) { + pos[1] = ReadySessions[1].size(); + ReadySessions[1].push_back(sessionId); + } + + auto result = LocalSessions.emplace(sessionId, TKqpSessionInfo(sessionId, workerId, database, dbCounters, std::move(pos))); + SessionsCountPerDatabase[database]++; + Y_VERIFY(result.second, "Duplicate session id!"); + TargetIdIndex.emplace(workerId, sessionId); + return &result.first->second; + } + + const THashSet<TString>& GetShutdownInFlight() const { + return ShutdownInFlightSessions; + } + + TKqpSessionInfo* StartShutdownSession(const TString& sessionId) { + ShutdownInFlightSessions.emplace(sessionId); + auto ptr = LocalSessions.FindPtr(sessionId); + ptr->ShutdownStartedAt = TAppData::TimeProvider->Now(); + RemoveSessionFromLists(ptr); + return ptr; + } + + TKqpSessionInfo* PickSessionToShutdown(bool force, ui32 minReasonableToKick) { + auto& sessions = force ? ReadySessions.at(0) : ReadySessions.at(1); + if (sessions.size() >= minReasonableToKick) { + ui64 idx = RandomProvider->GenRand() % sessions.size(); + return StartShutdownSession(sessions[idx]); + } + + return nullptr; + } + + THashMap<TString, TKqpSessionInfo>::const_iterator begin() const { + return LocalSessions.begin(); + } + + THashMap<TString, TKqpSessionInfo>::const_iterator end() const { + return LocalSessions.end(); + } + + size_t GetShutdownInFlightSize() const { + return ShutdownInFlightSessions.size(); + } + + void Erase(const TString& sessionId) { + auto it = LocalSessions.find(sessionId); + if (it != LocalSessions.end()) { + auto counter = SessionsCountPerDatabase.find(it->second.Database); + if (counter != SessionsCountPerDatabase.end()) { + counter->second--; + if (counter->second == 0) { + SessionsCountPerDatabase.erase(counter); + } + } + + RemoveSessionFromLists(&(it->second)); + ShutdownInFlightSessions.erase(sessionId); + TargetIdIndex.erase(it->second.WorkerId); + LocalSessions.erase(it); + } + } + + bool IsPendingShutdown(const TString& sessionId) const { + return ShutdownInFlightSessions.find(sessionId) != ShutdownInFlightSessions.end(); + } + + bool CheckDatabaseLimits(const TString& database, ui32 databaseLimit) { + auto it = SessionsCountPerDatabase.find(database); + if (it == SessionsCountPerDatabase.end()){ + return true; + } + + if (it->second + 1 <= databaseLimit) { + return true; + } + + return false; + } + + size_t size() const { + return LocalSessions.size(); + } + + const TKqpSessionInfo* FindPtr(const TString& sessionId) const { + return LocalSessions.FindPtr(sessionId); + } + + void Erase(const TActorId& targetId) { + auto it = TargetIdIndex.find(targetId); + if (it != TargetIdIndex.end()){ + Erase(it->second); + } + } + +private: + void RemoveSessionFromLists(TKqpSessionInfo* ptr) { + for(ui32 i = 0; i < ptr->ReadyPos.size(); ++i) { + i32& pos = ptr->ReadyPos.at(i); + auto& sessions = ReadySessions.at(i); + if (pos != -1 && pos + 1 != static_cast<i32>(sessions.size())) { + auto& lastPos = LocalSessions.at(sessions.back()).ReadyPos.at(i); + Y_VERIFY(lastPos + 1 == static_cast<i32>(sessions.size())); + std::swap(sessions[pos], sessions[lastPos]); + lastPos = pos; + } + + if (pos != -1) { + sessions.pop_back(); + pos = -1; + } + } + } +}; + + TSimpleResourceStats CalcPeerStats( const TVector<NKikimrKqp::TKqpProxyNodeResources>& data, const TString& selfDataCenterId, bool localDatacenterPolicy, std::function<double(const NKikimrKqp::TKqpProxyNodeResources& entry)> ExtractValue); |