aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitalii Gridnev <gridnevvvit@gmail.com>2022-06-30 20:37:14 +0300
committerVitalii Gridnev <gridnevvvit@gmail.com>2022-06-30 20:37:14 +0300
commitd29a0b28f5078c91efce67d27e9eee433159d999 (patch)
treeba13e798e767c5ebd6926f348beb7c8a8e80e072
parent53d4c76b348320ead25bfe9aa4efdc92d6fbb762 (diff)
downloadydb-d29a0b28f5078c91efce67d27e9eee433159d999.tar.gz
add option to balancer: enable tiers KIKIMR-15226
ref:3c4093a5f1da7ee65bbaa753f7cb3e24a65d03e1
-rw-r--r--ydb/core/grpc_services/rpc_create_session.cpp1
-rw-r--r--ydb/core/kqp/counters/kqp_counters.cpp14
-rw-r--r--ydb/core/kqp/counters/kqp_counters.h4
-rw-r--r--ydb/core/kqp/proxy/kqp_proxy_service.cpp242
-rw-r--r--ydb/core/kqp/proxy/kqp_proxy_service.h6
-rw-r--r--ydb/core/protos/config.proto22
-rw-r--r--ydb/core/protos/kqp.proto1
7 files changed, 197 insertions, 93 deletions
diff --git a/ydb/core/grpc_services/rpc_create_session.cpp b/ydb/core/grpc_services/rpc_create_session.cpp
index eeaf8c1eae..7f52771a3d 100644
--- a/ydb/core/grpc_services/rpc_create_session.cpp
+++ b/ydb/core/grpc_services/rpc_create_session.cpp
@@ -66,6 +66,7 @@ private:
if (Request().HasClientCapability(NYdb::YDB_CLIENT_CAPABILITY_SESSION_BALANCER)) {
ev->Record.SetCanCreateRemoteSession(true);
+ ev->Record.SetSupportsBalancing(true);
}
SetDatabase(ev, Request());
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp
index 291450f6f6..7385f21495 100644
--- a/ydb/core/kqp/counters/kqp_counters.cpp
+++ b/ydb/core/kqp/counters/kqp_counters.cpp
@@ -206,8 +206,8 @@ void TKqpCountersBase::Init() {
SessionActorCleanupLatency = KqpGroup->GetHistogram(
"SessionActors/CleanupLatencyMs", NMonitoring::ExponentialHistogram(10, 2, 1));
- SessionBalancerCV = KqpGroup->GetCounter("SessionBalancer/CV", false);
SessionBalancerShutdowns = KqpGroup->GetCounter("SessionBalancer/Shutdown", true);
+ SessionGracefulShutdownHit = KqpGroup->GetCounter("SessionBalancer/GracefulHit", true);
/* Transactions */
TxCreated = KqpGroup->GetCounter("Transactions/Created", true);
@@ -270,6 +270,10 @@ void TKqpCountersBase::ReportSessionShutdownRequest() {
SessionBalancerShutdowns->Inc();
}
+void TKqpCountersBase::ReportSessionGracefulShutdownHit() { // Increments the SessionGracefulShutdownHit counter by one.
+ SessionGracefulShutdownHit->Inc(); // registered in Init() under the name "SessionBalancer/GracefulHit"
+}
+
void TKqpCountersBase::ReportCreateSession(ui64 requestSize) {
CreateSessionRequests->Inc();
*RequestBytes += requestSize;
@@ -807,6 +811,14 @@ void TKqpCounters::ReportSessionShutdownRequest(TKqpDbCountersPtr dbCounters) {
}
}
+// Reports one graceful-shutdown hit on this object's own (base) counters and,
+// when per-database counters are supplied, on those as well.
+void TKqpCounters::ReportSessionGracefulShutdownHit(TKqpDbCountersPtr dbCounters) {
+ TKqpCountersBase::ReportSessionGracefulShutdownHit();
+ if (!dbCounters) {
+ // No per-database counters to update.
+ return;
+ }
+ dbCounters->ReportSessionGracefulShutdownHit();
+}
+
void TKqpCounters::ReportCreateSession(TKqpDbCountersPtr dbCounters, ui64 requestSize) {
TKqpCountersBase::ReportCreateSession(requestSize);
if (dbCounters) {
diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h
index d811bd9722..cbbadf7380 100644
--- a/ydb/core/kqp/counters/kqp_counters.h
+++ b/ydb/core/kqp/counters/kqp_counters.h
@@ -36,6 +36,7 @@ protected:
void ReportQueryAction(NKikimrKqp::EQueryAction action);
void ReportQueryType(NKikimrKqp::EQueryType type);
+ void ReportSessionGracefulShutdownHit();
void ReportSessionShutdownRequest();
void ReportCreateSession(ui64 requestSize);
void ReportPingSession(ui64 requestSize);
@@ -159,7 +160,7 @@ protected:
NMonitoring::TDynamicCounters::TCounterPtr WorkersClosedError;
NMonitoring::TDynamicCounters::TCounterPtr WorkersClosedRequest;
NMonitoring::TDynamicCounters::TCounterPtr ActiveWorkers;
- NMonitoring::TDynamicCounters::TCounterPtr SessionBalancerCV;
+ NMonitoring::TDynamicCounters::TCounterPtr SessionGracefulShutdownHit;
NMonitoring::TDynamicCounters::TCounterPtr SessionBalancerShutdowns;
NMonitoring::TDynamicCounters::TCounterPtr ProxyForwardedRequests;
@@ -263,6 +264,7 @@ public:
explicit TKqpCounters(const NMonitoring::TDynamicCounterPtr& counters, const TActorContext* ctx = nullptr);
void ReportProxyForwardedRequest(TKqpDbCountersPtr dbCounters);
+ void ReportSessionGracefulShutdownHit(TKqpDbCountersPtr dbCounters);
void ReportSessionShutdownRequest(TKqpDbCountersPtr dbCounters);
void ReportCreateSession(TKqpDbCountersPtr dbCounters, ui64 requestSize);
void ReportPingSession(TKqpDbCountersPtr dbCounters, ui64 requestSize);
diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.cpp b/ydb/core/kqp/proxy/kqp_proxy_service.cpp
index 614844d28c..9bbbb1320b 100644
--- a/ydb/core/kqp/proxy/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy/kqp_proxy_service.cpp
@@ -55,6 +55,9 @@ static constexpr TDuration DEFAULT_EXTRA_TIMEOUT_WAIT = TDuration::MilliSeconds(
static constexpr TDuration DEFAULT_CREATE_SESSION_TIMEOUT = TDuration::MilliSeconds(5000);
+using namespace NKikimrConfig;
+
+
std::optional<ui32> GetDefaultStateStorageGroupId(const TString& database) {
if (auto* domainInfo = AppData()->DomainsInfo->GetDomainByName(ExtractDomain(database))) {
return domainInfo->DefaultStateStorageGroup;
@@ -118,20 +121,30 @@ class TLocalSessionsRegistry {
THashMap<TString, TKqpSessionInfo> LocalSessions;
THashMap<TActorId, TString> TargetIdIndex;
THashSet<TString> ShutdownInFlightSessions;
- TList<TString> RecentUsedSessions;
- THashMap<TString, TList<TString>::iterator> Links;
THashMap<TString, ui32> SessionsCountPerDatabase;
+ std::vector<std::vector<TString>> ReadySessions;
+ TIntrusivePtr<IRandomProvider> RandomProvider;
public:
- TLocalSessionsRegistry() = default;
+ // Builds an empty registry with two ready lists: list 0 receives every
+ // created session, list 1 only the sessions created with supportsBalancing
+ // (see Create). randomProvider is used by PickSessionToShutdown to choose a
+ // random session.
+ // explicit: a random provider must never convert to a registry implicitly.
+ explicit TLocalSessionsRegistry(TIntrusivePtr<IRandomProvider> randomProvider)
+ : ReadySessions(2)
+ , RandomProvider(std::move(randomProvider)) // by-value sink: move instead of a second ref-count bump
+ {}
TKqpSessionInfo* Create(const TString& sessionId, const TActorId& workerId,
- const TString& database, TKqpDbCountersPtr dbCounters)
+ const TString& database, TKqpDbCountersPtr dbCounters, bool supportsBalancing)
{
- RecentUsedSessions.push_front(sessionId);
- auto result = LocalSessions.emplace(sessionId, TKqpSessionInfo(sessionId, workerId, database, dbCounters));
+ std::vector<i32> pos(2, -1);
+ pos[0] = ReadySessions[0].size();
+ ReadySessions[0].push_back(sessionId);
+
+ if (supportsBalancing) {
+ pos[1] = ReadySessions[1].size();
+ ReadySessions[1].push_back(sessionId);
+ }
+
+ auto result = LocalSessions.emplace(sessionId, TKqpSessionInfo(sessionId, workerId, database, dbCounters, std::move(pos)));
SessionsCountPerDatabase[database]++;
- Links.emplace(sessionId, RecentUsedSessions.begin());
Y_VERIFY(result.second, "Duplicate session id!");
TargetIdIndex.emplace(workerId, sessionId);
return &result.first->second;
@@ -141,23 +154,21 @@ public:
return ShutdownInFlightSessions;
}
- TKqpSessionInfo* CanShutdownSession(const TString& sessionId) {
- auto [_, success] = ShutdownInFlightSessions.emplace(sessionId);
- if (success) {
- auto ptr = LocalSessions.FindPtr(sessionId);
- ptr->ShutdownStartedAt = TAppData::TimeProvider->Now();
- return ptr;
- }
-
- return nullptr;
+ // Moves a session into the "shutdown in flight" state: records its id in
+ // ShutdownInFlightSessions, stamps ShutdownStartedAt with the current time
+ // and drops the session from the ready lists. Returns the session info.
+ // sessionId may alias an element of a ready list (PickSessionToShutdown
+ // passes sessions[idx]), so it is not touched after RemoveSessionFromLists.
+ TKqpSessionInfo* StartShutdownSession(const TString& sessionId) {
+ TKqpSessionInfo* const session = LocalSessions.FindPtr(sessionId);
+ ShutdownInFlightSessions.emplace(sessionId);
+ session->ShutdownStartedAt = TAppData::TimeProvider->Now();
+ RemoveSessionFromLists(session);
+ return session;
}
- TKqpSessionInfo* PickSessionToShutdown() {
- for(auto it = RecentUsedSessions.begin(); it != RecentUsedSessions.end(); ++it) {
- TKqpSessionInfo* session = CanShutdownSession(*it);
- if (session)
- return session;
+ // Picks a random ready session and starts its shutdown.
+ //   force               - choose among all ready sessions (list 0) instead of
+ //                         only those created with balancing support (list 1).
+ //   minReasonableToKick - minimal size the chosen list must have.
+ // Returns nullptr when the chosen list is empty or shorter than
+ // minReasonableToKick.
+ TKqpSessionInfo* PickSessionToShutdown(bool force, ui32 minReasonableToKick) {
+ auto& sessions = force ? ReadySessions.at(0) : ReadySessions.at(1);
+ // The emptiness check matters when minReasonableToKick == 0 (as passed by
+ // ProcessMonShutdownQueue): without it "% sessions.size()" divides by zero.
+ if (!sessions.empty() && sessions.size() >= minReasonableToKick) {
+ ui64 idx = RandomProvider->GenRand() % sessions.size();
+ return StartShutdownSession(sessions[idx]);
}
+
return nullptr;
}
@@ -183,15 +194,40 @@ public:
SessionsCountPerDatabase.erase(counter);
}
}
- auto lnk = Links.find(sessionId);
- RecentUsedSessions.erase(lnk->second);
- Links.erase(sessionId);
+
+ RemoveSessionFromLists(&(it->second));
ShutdownInFlightSessions.erase(sessionId);
TargetIdIndex.erase(it->second.WorkerId);
LocalSessions.erase(it);
}
}
+ // Removes the session from every ready list it is currently a member of.
+ // Removal is swap-with-last + pop_back; the ReadyPos of the session that
+ // was last in the list is updated to its new slot, and the removed
+ // session's ReadyPos entry is reset to -1 (not in the list).
+ void RemoveSessionFromLists(TKqpSessionInfo* ptr) {
+ for (ui32 listIdx = 0; listIdx < ptr->ReadyPos.size(); ++listIdx) {
+ i32& pos = ptr->ReadyPos.at(listIdx);
+ auto& sessions = ReadySessions.at(listIdx);
+ if (pos == -1) {
+ // Not a member of this list.
+ continue;
+ }
+
+ if (pos + 1 != static_cast<i32>(sessions.size())) {
+ // Not the tail element: move the tail session into the freed slot.
+ auto& lastPos = LocalSessions.at(sessions.back()).ReadyPos.at(listIdx);
+ Y_VERIFY(lastPos + 1 == static_cast<i32>(sessions.size()));
+ std::swap(sessions[pos], sessions[lastPos]);
+ lastPos = pos;
+ }
+
+ sessions.pop_back();
+ pos = -1;
+ }
+ }
+
+ // Returns the session info when sessionId is listed in
+ // ShutdownInFlightSessions, nullptr otherwise.
+ const TKqpSessionInfo* IsPendingShutdown(const TString& sessionId) const {
+ const bool inFlight = ShutdownInFlightSessions.find(sessionId) != ShutdownInFlightSessions.end();
+ return inFlight ? FindPtr(sessionId) : nullptr;
+ }
+
bool CheckDatabaseLimits(const TString& database, ui32 databaseLimit) {
auto it = SessionsCountPerDatabase.find(database);
if (it == SessionsCountPerDatabase.end()){
@@ -213,25 +249,6 @@ public:
return LocalSessions.FindPtr(sessionId);
}
- void Promote(const TString& sessionId) {
- auto lnk = Links.find(sessionId);
- Y_VERIFY(lnk != Links.end());
- RecentUsedSessions.erase(lnk->second);
- RecentUsedSessions.push_front(lnk->first);
- lnk->second = RecentUsedSessions.begin();
- }
-
- TKqpSessionInfo* FindAndPromote(const TString& sessionId) {
- TKqpSessionInfo* ptr = LocalSessions.FindPtr(sessionId);
- if (ptr) {
- ptr->LastRequestAt = TAppData::TimeProvider->Now();
- ++ptr->UseFrequency;
- Promote(sessionId);
- }
-
- return ptr;
- }
-
void Erase(const TActorId& targetId) {
auto it = TargetIdIndex.find(targetId);
if (it != TargetIdIndex.end()){
@@ -278,13 +295,15 @@ public:
, PendingRequests()
, TenantsReady(false)
, Tenants()
- , ModuleResolverState() {}
+ , ModuleResolverState()
+ {}
void Bootstrap() {
NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(KQP_PROVIDER));
Counters = MakeIntrusive<TKqpCounters>(AppData()->Counters, &TlsActivationContext->AsActorContext());
ModuleResolverState = MakeIntrusive<TModuleResolverState>();
+ LocalSessions = std::make_unique<TLocalSessionsRegistry>(AppData()->RandomProvider);
RandomProvider = AppData()->RandomProvider;
if (!GetYqlDefaultModuleResolver(ModuleResolverState->ExprCtx, ModuleResolverState->ModuleResolver)) {
TStringStream errorStream;
@@ -413,7 +432,7 @@ public:
return;
}
- NodeResources.SetActiveWorkersCount(LocalSessions.size());
+ NodeResources.SetActiveWorkersCount(LocalSessions->size());
PublishBoardPath = MakeKqpProxyBoardPath(database);
auto actor = CreateBoardPublishActor(PublishBoardPath, NodeResources.SerializeAsString(), SelfId(), *groupId, 0, true);
BoardPublishActor = Register(actor);
@@ -463,7 +482,7 @@ public:
}
KQP_PROXY_LOG_I("Received tenant pool status, serving tenants: " << JoinRange(", ", Tenants.begin(), Tenants.end()));
- for (auto& [_, sessionInfo] : LocalSessions) {
+ for (auto& [_, sessionInfo] : *LocalSessions) {
if (!sessionInfo.Database.empty() && !Tenants.contains(sessionInfo.Database)) {
auto closeSessionEv = MakeHolder<TEvKqp::TEvCloseSessionRequest>();
closeSessionEv->Record.MutableRequest()->SetSessionId(sessionInfo.SessionId);
@@ -533,11 +552,11 @@ public:
KQP_PROXY_LOG_N("KQP proxy shutdown requested.");
ShutdownRequested = true;
ShutdownState.Reset(ev->Get()->ShutdownState.Get());
- ShutdownState->Update(LocalSessions.size());
+ ShutdownState->Update(LocalSessions->size());
auto& shs = TableServiceConfig.GetShutdownSettings();
ui32 hardTimeout = shs.GetHardTimeoutMs();
ui32 softTimeout = shs.GetSoftTimeoutMs();
- for(auto& [idx, sessionInfo] : LocalSessions) {
+ for(auto& [idx, sessionInfo] : *LocalSessions) {
Send(sessionInfo.WorkerId, new TEvKqp::TEvInitiateSessionShutdown(softTimeout, hardTimeout));
}
}
@@ -548,6 +567,11 @@ public:
return false;
}
+ const auto& sbs = TableServiceConfig.GetSessionBalancerSettings();
+ if (!sbs.GetSupportRemoteSessionCreation()) {
+ return false;
+ }
+
ui64 randomNumber = RandomProvider->GenRand();
ui32 nodeId = LocalDatacenterProxies[randomNumber % LocalDatacenterProxies.size()];
if (nodeId == SelfId().NodeId()){
@@ -557,6 +581,7 @@ public:
std::unique_ptr<TEvKqp::TEvCreateSessionRequest> remoteRequest = std::make_unique<TEvKqp::TEvCreateSessionRequest>();
remoteRequest->Record.SetDeadlineUs(event.GetDeadlineUs());
remoteRequest->Record.SetTraceId(event.GetTraceId());
+ remoteRequest->Record.SetSupportsBalancing(event.GetSupportsBalancing());
remoteRequest->Record.MutableRequest()->SetDatabase(event.GetRequest().GetDatabase());
ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, event.GetTraceId(), TKqpEvents::EvCreateSessionRequest);
@@ -582,7 +607,7 @@ public:
const auto deadline = TInstant::MicroSeconds(event.GetDeadlineUs());
if (CheckRequestDeadline(requestInfo, deadline, result) &&
- CreateNewSessionWorker(requestInfo, TString(DefaultKikimrPublicClusterName), true, request.GetDatabase(), result))
+ CreateNewSessionWorker(requestInfo, TString(DefaultKikimrPublicClusterName), true, request.GetDatabase(), event.GetSupportsBalancing(), result))
{
auto& response = *responseEv->Record.MutableResponse();
response.SetSessionId(result.Value->SessionId);
@@ -657,7 +682,7 @@ public:
} else {
TProcessResult<TKqpSessionInfo*> result;
if (!CreateNewSessionWorker(requestInfo, TString(DefaultKikimrPublicClusterName), false,
- request.GetDatabase(), result))
+ request.GetDatabase(), false, result))
{
if (!dbCounters) {
dbCounters = Counters->GetDbCounters(request.GetDatabase());
@@ -697,6 +722,15 @@ public:
LogRequest(request, requestInfo, ev->Sender, dbCounters);
+ auto sessionInfo = LocalSessions->IsPendingShutdown(sessionId);
+ if (sessionInfo) {
+ if (dbCounters) {
+ // The session is pending shutdown (we are closing it),
+ // but a direct request from the user has arrived.
+ Counters->ReportSessionGracefulShutdownHit(sessionInfo->DbCounters);
+ }
+ }
+
if (!sessionId.empty()) {
TProcessResult<TActorId> result;
if (TryGetSessionTargetActor(sessionId, requestInfo, result)) {
@@ -832,23 +866,61 @@ public:
return false;
}
+ // Resolves the effective session balancer mode from the table service config.
+ // Returns {force, maxInFlight}:
+ //   force       - true when any ready session may be picked for shutdown,
+ //                 false when only sessions with balancing support may be;
+ //   maxInFlight - limit on sessions simultaneously in shutdown; 0 disables
+ //                 the balancer.
+ std::pair<bool, ui32> GetBalancerEnableSettings() const {
+ const auto& sbs = TableServiceConfig.GetSessionBalancerSettings();
+ const ui32 maxInFlightSize = sbs.GetMaxSessionsShutdownInFlightSize();
+
+ auto tier = sbs.GetEnableTier();
+ if (sbs.GetEnabled()) {
+ // Legacy flag: when set it overrides EnableTier.
+ // NOTE(review): config.proto declares Enabled with [default = true], so
+ // unless it is explicitly set to false EnableTier has no effect -
+ // confirm this is intended.
+ tier = TTableServiceConfig_TSessionBalancerSettings::TIER_ENABLED_FOR_ALL;
+ }
+
+ switch(tier) {
+ case TTableServiceConfig_TSessionBalancerSettings::TIER_ENABLED_FOR_ALL:
+ return {true, maxInFlightSize};
+ case TTableServiceConfig_TSessionBalancerSettings::TIER_ENABLED_FOR_SESSIONS_WITH_SUPPORT:
+ return {false, maxInFlightSize};
+ case TTableServiceConfig_TSessionBalancerSettings::TIER_DISABLED:
+ default:
+ // Disabled or unknown tier: never kick sessions.
+ return {false, 0};
+ }
+ }
+
void TryKickSession() {
const auto& sbs = TableServiceConfig.GetSessionBalancerSettings();
- ui32 maxInFlightSize = sbs.GetMaxSessionsShutdownInFlightSize();
- if (!maxInFlightSize || !sbs.GetEnabled())
- return;
+ const std::pair<bool, ui32> settings = GetBalancerEnableSettings();
Y_VERIFY(PeerStats);
bool isReasonableToKick = false;
+ ui32 strategy = static_cast<ui32>(sbs.GetStrategy());
+ ui32 balanceByCpu = strategy & TTableServiceConfig_TSessionBalancerSettings::BALANCE_BY_CPU;
+ ui32 balanceByCount = strategy & TTableServiceConfig_TSessionBalancerSettings::BALANCE_BY_COUNT;
+
if (sbs.GetLocalDatacenterPolicy()) {
- isReasonableToKick |= ShouldStartBalancing(PeerStats->LocalSessionCount, static_cast<double>(sbs.GetMinNodeSessions()), static_cast<double>(LocalSessions.size()));
- isReasonableToKick |= ShouldStartBalancing(PeerStats->LocalCpu, sbs.GetMinCpuBalancerThreshold(), NodeResources.GetCpuUsage());
+ if (balanceByCount) {
+ isReasonableToKick |= ShouldStartBalancing(PeerStats->LocalSessionCount, static_cast<double>(sbs.GetMinNodeSessions()), static_cast<double>(LocalSessions->size()));
+ }
+
+ if (balanceByCpu) {
+ isReasonableToKick |= ShouldStartBalancing(PeerStats->LocalCpu, sbs.GetMinCpuBalancerThreshold(), NodeResources.GetCpuUsage());
+ }
+
} else {
- isReasonableToKick |= ShouldStartBalancing(PeerStats->CrossAZSessionCount, static_cast<double>(sbs.GetMinNodeSessions()), static_cast<double>(LocalSessions.size()));
- isReasonableToKick |= ShouldStartBalancing(PeerStats->CrossAZCpu, sbs.GetMinCpuBalancerThreshold(), NodeResources.GetCpuUsage());
+ if (balanceByCount) {
+ isReasonableToKick |= ShouldStartBalancing(PeerStats->CrossAZSessionCount, static_cast<double>(sbs.GetMinNodeSessions()), static_cast<double>(LocalSessions->size()));
+ }
+
+ if (balanceByCpu) {
+ isReasonableToKick |= ShouldStartBalancing(PeerStats->CrossAZCpu, sbs.GetMinCpuBalancerThreshold(), NodeResources.GetCpuUsage());
+ }
}
if (!isReasonableToKick) {
@@ -859,8 +931,8 @@ public:
ServerWorkerBalancerComplete = false;
}
- while(LocalSessions.GetShutdownInFlightSize() < maxInFlightSize) {
- auto sessionInfo = LocalSessions.PickSessionToShutdown();
+ while(LocalSessions->GetShutdownInFlightSize() < settings.second) {
+ auto sessionInfo = LocalSessions->PickSessionToShutdown(settings.first, sbs.GetMinNodeSessions());
if (!sessionInfo) {
break;
}
@@ -882,12 +954,12 @@ public:
}
void ProcessMonShutdownQueue(ui32 wantsToShutdown) { // Starts graceful shutdown for up to wantsToShutdown sessions.
- for(auto& [_, sessionInfo] : LocalSessions) {
- if (!wantsToShutdown)
+ for(ui32 i = 0; i < wantsToShutdown; ++i) {
+ const TKqpSessionInfo* candidate = LocalSessions->PickSessionToShutdown(true, 0); // force = true, no minimum ready-list size
+ if (!candidate) // nothing could be picked: stop early
break;
- StartSessionGraceShutdown(&sessionInfo);
- --wantsToShutdown;
+ StartSessionGraceShutdown(candidate);
}
}
@@ -901,7 +973,7 @@ public:
const TString& forceShutdown = cgi.Get("force_shutdown");
ui32 wantsToShutdown = 0;
if (forceShutdown == "all") {
- wantsToShutdown = LocalSessions.size();
+ wantsToShutdown = LocalSessions->size();
} else {
wantsToShutdown = FromStringWithDefault<ui32>(forceShutdown, 0);
}
@@ -932,12 +1004,13 @@ public:
str << "Force shutdown all sessions: <a href=\"kqp_proxy?" << cgiTmp.Print() << "\">Execute</a>" << Endl;
}
- str << "Session balancer settings: " << Endl;
- str << "Enabled: " << (sbs.GetEnabled() ? "true" : "false") << Endl;
- str << "MaxSessionsShutdownInFlightSize: " << sbs.GetMaxSessionsShutdownInFlightSize() << Endl;
+ const std::pair<bool, ui32> sbsSettings = GetBalancerEnableSettings();
+ str << "Allow shutdown all sessions: " << (sbsSettings.first ? "true": "false") << Endl;
+ str << "MaxSessionsShutdownInFlightSize: " << sbsSettings.second << Endl;
str << "LocalDatacenterPolicy: " << (sbs.GetLocalDatacenterPolicy() ? "true" : "false") << Endl;
str << "MaxCVTreshold: " << sbs.GetMaxCVTreshold() << Endl;
str << "MinCVTreshold: " << sbs.GetMinCVTreshold() << Endl;
+ str << "Balance strategy: " << TTableServiceConfig_TSessionBalancerSettings_EBalancingStrategy_Name(sbs.GetStrategy()) << Endl;
str << Endl;
@@ -956,17 +1029,16 @@ public:
str << "EnableSessionActor: "
<< (AppData()->FeatureFlags.GetEnableKqpSessionActor() ? "true" : "false") << Endl;
- str << "Active workers/session_actors count on node: " << LocalSessions.size() << Endl;
+ str << "Active workers/session_actors count on node: " << LocalSessions->size() << Endl;
- const auto& sessionsShutdownInFlight = LocalSessions.GetShutdownInFlight();
+ const auto& sessionsShutdownInFlight = LocalSessions->GetShutdownInFlight();
if (!sessionsShutdownInFlight.empty()) {
str << Endl;
str << "Sessions shutdown in flight: " << Endl;
auto now = TAppData::TimeProvider->Now();
for(const auto& sessionId : sessionsShutdownInFlight) {
- auto session = LocalSessions.FindPtr(sessionId);
- str << "Session " << sessionId << " is under shutdown for " << (now - session->ShutdownStartedAt).SecondsFloat() << " seconds. "
- << "QueryRequests: " << session->UseFrequency << Endl;
+ auto session = LocalSessions->FindPtr(sessionId);
+ str << "Session " << sessionId << " is under shutdown for " << (now - session->ShutdownStartedAt).SecondsFloat() << " seconds. " << Endl;
}
str << Endl;
@@ -989,7 +1061,7 @@ public:
str << "Peer(NodeId: " << entry.GetNodeId() << ", DataCenter: " << entry.GetDataCenterId() << "): active workers: "
<< entry.GetActiveWorkersCount() << "): cpu usage: " << entry.GetCpuUsage() << ", threads count: " << entry.GetThreads() << Endl;
}
- }
+ }
}
}
@@ -1040,7 +1112,7 @@ public:
RemoveSession(sessionId, workerId);
KQP_PROXY_LOG_D("Session closed, sessionId: " << event.GetResponse().GetSessionId()
- << ", workerId: " << workerId << ", local sessions count: " << LocalSessions.size());
+ << ", workerId: " << workerId << ", local sessions count: " << LocalSessions->size());
}
}
@@ -1183,7 +1255,7 @@ private:
}
bool CreateNewSessionWorker(const TKqpRequestInfo& requestInfo,
- const TString& cluster, bool longSession, const TString& database, TProcessResult<TKqpSessionInfo*>& result)
+ const TString& cluster, bool longSession, const TString& database, bool supportsBalancing, TProcessResult<TKqpSessionInfo*>& result)
{
if (!database.empty()) {
if (!TenantsReady) {
@@ -1209,7 +1281,7 @@ private:
}
auto sessionsLimitPerNode = TableServiceConfig.GetSessionsLimitPerNode();
- if (sessionsLimitPerNode && !LocalSessions.CheckDatabaseLimits(database, sessionsLimitPerNode)) {
+ if (sessionsLimitPerNode && !LocalSessions->CheckDatabaseLimits(database, sessionsLimitPerNode)) {
TString error = TStringBuilder() << "Active sessions limit exceeded, maximum allowed: "
<< sessionsLimitPerNode;
KQP_PROXY_LOG_W(requestInfo << error);
@@ -1232,14 +1304,14 @@ private:
? CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, ModuleResolverState, Counters)
: CreateKqpWorkerActor(SelfId(), sessionId, KqpSettings, workerSettings, ModuleResolverState, Counters);
auto workerId = TlsActivationContext->ExecutorThread.RegisterActor(workerActor, TMailboxType::HTSwap, AppData()->UserPoolId);
- TKqpSessionInfo* sessionInfo = LocalSessions.Create(sessionId, workerId, database, dbCounters);
+ TKqpSessionInfo* sessionInfo = LocalSessions->Create(sessionId, workerId, database, dbCounters, supportsBalancing);
KQP_PROXY_LOG_D(requestInfo << "Created new session"
<< ", sessionId: " << sessionInfo->SessionId
<< ", workerId: " << sessionInfo->WorkerId
<< ", database: " << sessionInfo->Database
<< ", longSession: " << longSession
- << ", local sessions count: " << LocalSessions.size());
+ << ", local sessions count: " << LocalSessions->size());
result.YdbStatus = Ydb::StatusIds::SUCCESS;
result.Error.clear();
@@ -1264,7 +1336,7 @@ private:
}
if (*nodeId == SelfId().NodeId()) {
- auto localSession = LocalSessions.FindAndPromote(sessionId);
+ auto localSession = LocalSessions->FindPtr(sessionId);
if (!localSession) {
TString error = TStringBuilder() << "Session not found: " << sessionId;
KQP_PROXY_LOG_N(requestInfo << error);
@@ -1289,19 +1361,19 @@ private:
void RemoveSession(const TString& sessionId, const TActorId& workerId) { // Erases a session from the registry: by session id when it is non-empty, otherwise by worker actor id.
if (!sessionId.empty()) {
- LocalSessions.Erase(sessionId);
+ LocalSessions->Erase(sessionId);
PublishResourceUsage();
if (ShutdownRequested) {
- ShutdownState->Update(LocalSessions.size());
+ ShutdownState->Update(LocalSessions->size()); // pass the remaining session count to the shutdown state
}
return;
}
- LocalSessions.Erase(workerId);
+ LocalSessions->Erase(workerId); // session id unknown: erase by worker actor id
PublishResourceUsage();
if (ShutdownRequested) {
- ShutdownState->Update(LocalSessions.size());
+ ShutdownState->Update(LocalSessions->size());
}
}
@@ -1325,7 +1397,7 @@ private:
}
TKqpDbCountersPtr GetDbCountersForSession(const TString& sessionId) const { // Returns the DbCounters of a locally registered session.
- auto localSession = LocalSessions.FindPtr(sessionId);
+ auto localSession = LocalSessions->FindPtr(sessionId);
return localSession ? localSession->DbCounters : nullptr; // nullptr when the session is not found locally
}
@@ -1348,7 +1420,7 @@ private:
TIntrusivePtr<TModuleResolverState> ModuleResolverState;
TIntrusivePtr<TKqpCounters> Counters;
- TLocalSessionsRegistry LocalSessions;
+ std::unique_ptr<TLocalSessionsRegistry> LocalSessions;
bool ServerWorkerBalancerComplete = false;
std::optional<TString> SelfDataCenterId;
diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.h b/ydb/core/kqp/proxy/kqp_proxy_service.h
index 963ab43cb1..fe3d58ed42 100644
--- a/ydb/core/kqp/proxy/kqp_proxy_service.h
+++ b/ydb/core/kqp/proxy/kqp_proxy_service.h
@@ -78,19 +78,19 @@ struct TKqpSessionInfo {
TActorId WorkerId;
TString Database;
TKqpDbCountersPtr DbCounters;
- ui32 UseFrequency;
TInstant LastRequestAt;
TInstant CreatedAt;
TInstant ShutdownStartedAt;
+ std::vector<i32> ReadyPos;
TKqpSessionInfo(const TString& sessionId, const TActorId& workerId,
- const TString& database, TKqpDbCountersPtr dbCounters)
+ const TString& database, TKqpDbCountersPtr dbCounters, std::vector<i32>&& pos)
: SessionId(sessionId)
, WorkerId(workerId)
, Database(database)
, DbCounters(dbCounters)
- , UseFrequency(0)
, ShutdownStartedAt()
+ , ReadyPos(std::move(pos))
{
auto now = TAppData::TimeProvider->Now();
LastRequestAt = now;
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 8041d4f4c8..cd79e3e913 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1046,9 +1046,22 @@ message TTableServiceConfig {
message TSessionBalancerSettings {
optional bool Enabled = 1 [default = true];
+
+ enum EEnableTier { // Which sessions the session balancer is allowed to shut down.
+ TIER_DISABLED = 1; // balancer does not kick any session
+ TIER_ENABLED_FOR_SESSIONS_WITH_SUPPORT = 2; // only sessions created with SupportsBalancing may be kicked
+ TIER_ENABLED_FOR_ALL = 3; // any ready session may be kicked
+ }
+
+ enum EBalancingStrategy { // Values are tested as a bit mask by the KQP proxy (TryKickSession).
+ BALANCE_BY_CPU = 1; // bit 0: compare CPU usage against peers
+ BALANCE_BY_COUNT = 2; // bit 1: compare session count against peers
+ BALANCE_BY_CPU_AND_COUNT = 3; // both bits set
+ }
+
// MaxSessionsShutdownInFlightSize option determines the number of sessions
// that we can keep in shutdown state at the moment.
- optional uint32 MaxSessionsShutdownInFlightSize = 2 [default = 3];
+ optional uint32 MaxSessionsShutdownInFlightSize = 2 [default = 1];
// GREEN ZONE YELLOW ZONE RED ZONE
// MinCV MaxCV
// < MinCV | | > MaxCV
@@ -1063,11 +1076,11 @@ message TTableServiceConfig {
optional uint32 MaxCVTreshold = 4 [default = 20];
// timeout (in milliseconds)
// After HardSessionShutdownTimeoutMs session will be terminated.
- optional uint32 HardSessionShutdownTimeoutMs = 5 [default = 5000];
+ optional uint32 HardSessionShutdownTimeoutMs = 5 [default = 30000];
// timeout (in milliseconds)
// After SoftSessionShutdownTimeoutMs milliseconds
// session will not accept new query requests.
- optional uint32 SoftSessionShutdownTimeoutMs = 6 [default = 1000];
+ optional uint32 SoftSessionShutdownTimeoutMs = 6 [default = 15000];
// Minimum number of sessions on node to start balancing. If number of sessions on node
// less than MinNodeSessions, balancer will not be able to kick session from node.
optional uint32 MinNodeSessions = 7 [default = 10];
@@ -1079,6 +1092,9 @@ message TTableServiceConfig {
optional uint32 BoardPublishIntervalMs = 10 [default = 1000];
optional uint32 BoardLookupIntervalMs = 11 [default = 30000];
+ optional EEnableTier EnableTier = 12 [ default = TIER_DISABLED ];
+ optional EBalancingStrategy Strategy = 13 [ default = BALANCE_BY_COUNT ];
+ optional bool SupportRemoteSessionCreation = 14 [ default = false ];
}
message TQueryReplayConfig {
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index bc9db521ec..e0c220ad21 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -313,6 +313,7 @@ message TEvCreateSessionRequest {
// If flag is true, kqp proxy will create session on the different node,
// if flag is false, in this case proxy will create session locally.
optional bool CanCreateRemoteSession = 4 [default = false];
+ optional bool SupportsBalancing = 5 [ default = false];
}
message TCreateSessionResponse {