summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitalii Gridnev <[email protected]>2022-03-17 16:26:56 +0300
committerVitalii Gridnev <[email protected]>2022-03-17 16:26:56 +0300
commitd62a2feb1b4868615e72ece28cde5d7b70fa4243 (patch)
tree0c03323730dfc7893ed54450e891aec983d3cefb
parent645cbbd7ba1cc9b36688acfb6a70afdf591da69e (diff)
fix issues in balancing algorithms KIKIMR-11464
ref:f1748c89ccbb4191fabcdd74d307c7579c483381
-rw-r--r--ydb/core/kqp/counters/kqp_counters.cpp11
-rw-r--r--ydb/core/kqp/counters/kqp_counters.h2
-rw-r--r--ydb/core/kqp/proxy/kqp_proxy_peer_stats_calculator.cpp16
-rw-r--r--ydb/core/kqp/proxy/kqp_proxy_service.cpp30
-rw-r--r--ydb/core/kqp/proxy/kqp_proxy_service.h20
5 files changed, 42 insertions, 37 deletions
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp
index 322861cb4ee..f3c96fd19e9 100644
--- a/ydb/core/kqp/counters/kqp_counters.cpp
+++ b/ydb/core/kqp/counters/kqp_counters.cpp
@@ -441,9 +441,6 @@ void TKqpCountersBase::ReportWorkerCreated() {
YdbSessionsActiveCount->Inc();
}
-void TKqpCountersBase::ReportSessionBalancerCV(ui32 value) {
- SessionBalancerCV->Set(value);
-}
void TKqpCountersBase::ReportProxyForwardedRequest() {
ProxyForwardedRequests->Inc();
@@ -807,14 +804,6 @@ NMonitoring::TDynamicCounterPtr TKqpCounters::GetQueryReplayCounters() const {
}
-void TKqpCounters::ReportSessionBalancerCV(TKqpDbCountersPtr dbCounters, ui32 val) {
- TKqpCountersBase::ReportSessionBalancerCV(val);
- if (dbCounters) {
- dbCounters->ReportSessionBalancerCV(val);
- }
-}
-
-
void TKqpCounters::ReportProxyForwardedRequest(TKqpDbCountersPtr dbCounters) {
TKqpCountersBase::ReportProxyForwardedRequest();
if (dbCounters) {
diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h
index 374ab524c54..36b99703cbb 100644
--- a/ydb/core/kqp/counters/kqp_counters.h
+++ b/ydb/core/kqp/counters/kqp_counters.h
@@ -81,7 +81,6 @@ protected:
void ReportQueriesPerSessionActor(ui32 queryId);
void ReportProxyForwardedRequest();
- void ReportSessionBalancerCV(ui32 value);
void ReportBeginTransaction(ui32 evictedTx, ui32 currentActiveTx, ui32 currentAbortedTx);
@@ -264,7 +263,6 @@ public:
explicit TKqpCounters(const NMonitoring::TDynamicCounterPtr& counters, const TActorContext* ctx = nullptr);
void ReportProxyForwardedRequest(TKqpDbCountersPtr dbCounters);
- void ReportSessionBalancerCV(TKqpDbCountersPtr dbCounters, ui32 val);
void ReportSessionShutdownRequest(TKqpDbCountersPtr dbCounters);
void ReportCreateSession(TKqpDbCountersPtr dbCounters, ui64 requestSize);
void ReportPingSession(TKqpDbCountersPtr dbCounters, ui64 requestSize);
diff --git a/ydb/core/kqp/proxy/kqp_proxy_peer_stats_calculator.cpp b/ydb/core/kqp/proxy/kqp_proxy_peer_stats_calculator.cpp
index 57fe7ff1c62..5448e2be9df 100644
--- a/ydb/core/kqp/proxy/kqp_proxy_peer_stats_calculator.cpp
+++ b/ydb/core/kqp/proxy/kqp_proxy_peer_stats_calculator.cpp
@@ -41,7 +41,7 @@ TSimpleResourceStats CalcPeerStats(
if (getDataCenterId(entry) != selfDataCenterId && localDatacenterPolicy)
continue;
- xadd = static_cast<double> (entry.GetActiveWorkersCount()) - mean;
+ xadd = ExtractValue(entry) - mean;
deviation += xadd * xadd;
}
@@ -49,9 +49,7 @@ TSimpleResourceStats CalcPeerStats(
return TSimpleResourceStats(mean, deviation, static_cast<ui64>(std::llround(100.0f * deviation / mean)));
}
-TPeerStats CalcPeerStats(
- const TVector<NKikimrKqp::TKqpProxyNodeResources>& data, const TString& selfDataCenterId, bool localDatacenterPolicy)
-{
+TPeerStats CalcPeerStats(const TVector<NKikimrKqp::TKqpProxyNodeResources>& data, const TString& selfDataCenterId) {
auto getCpu = [](const NKikimrKqp::TKqpProxyNodeResources& entry) {
return entry.GetCpuUsage();
};
@@ -60,7 +58,13 @@ TPeerStats CalcPeerStats(
return static_cast<double>(entry.GetActiveWorkersCount());
};
- return TPeerStats(CalcPeerStats(data, selfDataCenterId, localDatacenterPolicy, getSessionCount), CalcPeerStats(data, selfDataCenterId, localDatacenterPolicy, getCpu));
+ return TPeerStats(
+ CalcPeerStats(data, selfDataCenterId, true, getSessionCount),
+ CalcPeerStats(data, selfDataCenterId, false, getSessionCount),
+
+ CalcPeerStats(data, selfDataCenterId, true, getCpu),
+ CalcPeerStats(data, selfDataCenterId, false, getCpu)
+ );
}
@@ -68,5 +72,5 @@ TPeerStats CalcPeerStats(
template<>
void Out<NKikimr::NKqp::TSimpleResourceStats>(IOutputStream& out, const NKikimr::NKqp::TSimpleResourceStats& v) {
- out << "ResourceStats<Mean: " << v.Mean << ", CV: " << v.CV << ", Deviation: " << v.Deviation << ">" << Endl;
+ out << "ResourceStats(Mean: " << v.Mean << ", CV: " << v.CV << ", Deviation: " << v.Deviation << ")";
}
diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.cpp b/ydb/core/kqp/proxy/kqp_proxy_service.cpp
index 13f30767469..b323cb2f84d 100644
--- a/ydb/core/kqp/proxy/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy/kqp_proxy_service.cpp
@@ -765,13 +765,7 @@ public:
}
Y_VERIFY(SelfDataCenterId, "Unexpected case: empty info about DC!");
- const auto& sbs = TableServiceConfig.GetSessionBalancerSettings();
- PeerStats = CalcPeerStats(PeerProxyNodeResources, *SelfDataCenterId, sbs.GetLocalDatacenterPolicy());
- if (!Tenants.empty()) {
- auto counters = Counters->GetDbCounters(*Tenants.begin());
- Counters->ReportSessionBalancerCV(counters, (ui32)PeerStats->SessionCount.CV);
- }
-
+ PeerStats = CalcPeerStats(PeerProxyNodeResources, *SelfDataCenterId);
TryKickSession();
}
@@ -803,13 +797,20 @@ public:
bool isReasonableToKick = false;
- isReasonableToKick |= ShouldStartBalancing(PeerStats->SessionCount, static_cast<double>(sbs.GetMinNodeSessions()), static_cast<double>(LocalSessions.size()));
- isReasonableToKick |= ShouldStartBalancing(PeerStats->Cpu, sbs.GetMinCpuBalancerThreshold(), NodeResources.GetCpuUsage());
+ if (sbs.GetLocalDatacenterPolicy()) {
+ isReasonableToKick |= ShouldStartBalancing(PeerStats->LocalSessionCount, static_cast<double>(sbs.GetMinNodeSessions()), static_cast<double>(LocalSessions.size()));
+ isReasonableToKick |= ShouldStartBalancing(PeerStats->LocalCpu, sbs.GetMinCpuBalancerThreshold(), NodeResources.GetCpuUsage());
+ } else {
+ isReasonableToKick |= ShouldStartBalancing(PeerStats->CrossAZSessionCount, static_cast<double>(sbs.GetMinNodeSessions()), static_cast<double>(LocalSessions.size()));
+ isReasonableToKick |= ShouldStartBalancing(PeerStats->CrossAZCpu, sbs.GetMinCpuBalancerThreshold(), NodeResources.GetCpuUsage());
+ }
if (!isReasonableToKick) {
// Start balancing
- ServerWorkerBalancerComplete = false;
+ ServerWorkerBalancerComplete = true;
return;
+ } else {
+ ServerWorkerBalancerComplete = false;
}
while(LocalSessions.GetShutdownInFlightSize() < maxInFlightSize) {
@@ -925,8 +926,13 @@ public:
str << "No peer proxy data available." << Endl;
} else {
str << Endl << "Peer Proxy data: " << Endl;
- str << "Session count stats: " << PeerStats->SessionCount << Endl;
- str << "Cpu stats: " << PeerStats->Cpu << Endl;
+ str << "Session count stats: " << Endl;
+ str << "Local: " << PeerStats->LocalSessionCount << Endl;
+ str << "Cross AZ: " << PeerStats->CrossAZSessionCount << Endl;
+
+ str << Endl << "CPU usage stats:" << Endl;
+ str << "Local: " << PeerStats->LocalCpu << Endl;
+ str << "Cross AZ: " << PeerStats->CrossAZCpu << Endl;
str << Endl;
for(const auto& entry : PeerProxyNodeResources) {
diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.h b/ydb/core/kqp/proxy/kqp_proxy_service.h
index f5059d3f36b..963ab43cb1c 100644
--- a/ydb/core/kqp/proxy/kqp_proxy_service.h
+++ b/ydb/core/kqp/proxy/kqp_proxy_service.h
@@ -111,12 +111,20 @@ struct TSimpleResourceStats {
};
struct TPeerStats {
- TSimpleResourceStats SessionCount;
- TSimpleResourceStats Cpu;
+ TSimpleResourceStats LocalSessionCount;
+ TSimpleResourceStats CrossAZSessionCount;
- TPeerStats(TSimpleResourceStats sessionsCount, TSimpleResourceStats cpu)
- : SessionCount(sessionsCount)
- , Cpu(cpu)
+ TSimpleResourceStats LocalCpu;
+ TSimpleResourceStats CrossAZCpu;
+
+
+ TPeerStats(TSimpleResourceStats localSessionsCount, TSimpleResourceStats crossAZSessionCount,
+ TSimpleResourceStats localCpu, TSimpleResourceStats crossAZCpu)
+
+ : LocalSessionCount(localSessionsCount)
+ , CrossAZSessionCount(crossAZSessionCount)
+ , LocalCpu(localCpu)
+ , CrossAZCpu(crossAZCpu)
{}
};
@@ -124,6 +132,6 @@ struct TPeerStats {
TSimpleResourceStats CalcPeerStats(
const TVector<NKikimrKqp::TKqpProxyNodeResources>& data, const TString& selfDataCenterId, bool localDatacenterPolicy,
std::function<double(const NKikimrKqp::TKqpProxyNodeResources& entry)> ExtractValue);
-TPeerStats CalcPeerStats(const TVector<NKikimrKqp::TKqpProxyNodeResources>& data, const TString& selfDataCenterId, bool localDatacenterPolicy);
+TPeerStats CalcPeerStats(const TVector<NKikimrKqp::TKqpProxyNodeResources>& data, const TString& selfDataCenterId);
} // namespace NKikimr::NKqp