aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPisarenko Grigoriy <79596613+GrigoriyPA@users.noreply.github.com>2024-08-29 16:30:00 +0300
committerGitHub <noreply@github.com>2024-08-29 16:30:00 +0300
commit8a5c10df29f7640a14a423203c8392b194e88fa8 (patch)
tree2e52fe424a4665ed5ea59432cb725afd0e68cecd
parent9c8c9516f55af2e67efc378aff0a4c1219781e21 (diff)
downloadydb-8a5c10df29f7640a14a423203c8392b194e88fa8.tar.gz
YQ WM improved overload issues (#8437)
-rw-r--r--ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp40
-rw-r--r--ydb/core/kqp/workload_service/common/events.h11
-rw-r--r--ydb/core/kqp/workload_service/kqp_workload_service.cpp4
-rw-r--r--ydb/core/kqp/workload_service/kqp_workload_service_impl.h4
-rw-r--r--ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h6
-rw-r--r--ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp35
6 files changed, 61 insertions, 39 deletions
diff --git a/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp b/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp
index 71b9e1ae39..2ff6e6912a 100644
--- a/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp
+++ b/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp
@@ -66,12 +66,14 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
LoadCpuThreshold->Set(std::max(poolConfig.DatabaseLoadCpuThreshold, 0.0));
}
- void OnCleanup() {
+ void OnCleanup(bool resetConfigCounters) {
ActivePoolHandlers->Dec();
- InFlightLimit->Set(0);
- QueueSizeLimit->Set(0);
- LoadCpuThreshold->Set(0);
+ if (resetConfigCounters) {
+ InFlightLimit->Set(0);
+ QueueSizeLimit->Set(0);
+ LoadCpuThreshold->Set(0);
+ }
}
private:
@@ -136,7 +138,7 @@ public:
STRICT_STFUNC(StateFuncBase,
// Workload service events
sFunc(TEvents::TEvPoison, HandlePoison);
- sFunc(TEvPrivate::TEvStopPoolHandler, HandleStop);
+ hFunc(TEvPrivate::TEvStopPoolHandler, Handle);
hFunc(TEvPrivate::TEvResolvePoolResponse, Handle);
hFunc(TEvPrivate::TEvUpdatePoolSubscription, Handle);
@@ -160,7 +162,7 @@ public:
SendPoolInfoUpdate(std::nullopt, std::nullopt, Subscribers);
- Counters.OnCleanup();
+ Counters.OnCleanup(ResetCountersOnStrop);
TBase::PassAway();
}
@@ -171,8 +173,9 @@ private:
this->PassAway();
}
- void HandleStop() {
+ void Handle(TEvPrivate::TEvStopPoolHandler::TPtr& ev) {
LOG_I("Got stop pool handler request, waiting for " << LocalSessions.size() << " requests");
+ ResetCountersOnStrop = ev->Get()->ResetCounters;
if (LocalSessions.empty()) {
PassAway();
} else {
@@ -332,7 +335,7 @@ public:
if (!request->Started && request->State != TRequest::EState::Finishing) {
if (request->State == TRequest::EState::Canceling && status == Ydb::StatusIds::SUCCESS) {
status = Ydb::StatusIds::CANCELLED;
- issues.AddIssue(TStringBuilder() << "Delay deadline exceeded in pool " << PoolId);
+ issues.AddIssue(TStringBuilder() << "Request was delayed during " << TInstant::Now() - request->StartTime << ", that is larger than delay deadline " << PoolConfig.QueryCancelAfter << " in pool " << PoolId << ", request was canceled");
}
ReplyContinue(request, status, issues);
return;
@@ -515,6 +518,7 @@ private:
ui64 LocalInFlight = 0;
std::unordered_map<TString, TRequest> LocalSessions;
bool StopHandler = false; // Stop than all requests finished
+ bool ResetCountersOnStrop = true;
};
@@ -622,8 +626,13 @@ protected:
}
void OnScheduleRequest(TRequest* request) override {
- if (PendingRequests.size() >= MAX_PENDING_REQUESTS || SaturationSub(GetLocalSessionsCount() - GetLocalInFlight(), InFlightLimit) > QueueSizeLimit) {
- ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId);
+ if (PendingRequests.size() >= MAX_PENDING_REQUESTS) {
+ ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Request was rejected, number of local pending requests is " << PendingRequests.size() << ", that is larger than allowed limit " << MAX_PENDING_REQUESTS);
+ return;
+ }
+
+ if (SaturationSub(GetLocalSessionsCount() - GetLocalInFlight(), InFlightLimit) > QueueSizeLimit) {
+ ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Request was rejected, number of local pending/delayed requests is " << GetLocalSessionsCount() - GetLocalInFlight() << ", that is larger than allowed limit " << QueueSizeLimit << " (including concurrent query limit " << InFlightLimit << ") for pool " << PoolId);
return;
}
@@ -742,15 +751,15 @@ private:
if (const ui64 delayedRequests = SaturationSub(GlobalState.AmountRequests() + PendingRequests.size(), InFlightLimit); delayedRequests > QueueSizeLimit) {
RemoveBackRequests(PendingRequests, std::min(delayedRequests - QueueSizeLimit, PendingRequests.size()), [this](TRequest* request) {
- ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId);
+ ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Request was rejected, number of local pending requests is " << PendingRequests.size() << ", number of global delayed/running requests is " << GlobalState.AmountRequests() << ", sum of them is larger than allowed limit " << QueueSizeLimit << " (including concurrent query limit " << InFlightLimit << ") for pool " << PoolId);
});
FifoCounters.PendingRequestsCount->Set(PendingRequests.size());
}
if (PendingRequests.empty() && delayedRequestsCount > QueueSizeLimit) {
- RemoveBackRequests(DelayedRequests, delayedRequestsCount - QueueSizeLimit, [this](TRequest* request) {
+ RemoveBackRequests(DelayedRequests, delayedRequestsCount - QueueSizeLimit, [this, delayedRequestsCount](TRequest* request) {
AddFinishedRequest(request->SessionId);
- ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId);
+ ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Request was rejected, number of local delayed requests is " << delayedRequestsCount << ", that is larger than allowed limit " << QueueSizeLimit << " for pool " << PoolId);
});
}
@@ -787,9 +796,10 @@ private:
if (!ev->Get()->QuotaAccepted) {
LOG_D("Skipped request start due to load cpu threshold");
if (static_cast<EStartRequestCase>(ev->Cookie) == EStartRequestCase::Pending) {
- ForEachUnfinished(DelayedRequests.begin(), DelayedRequests.end(), [this](TRequest* request) {
+ NYql::TIssues issues = GroupIssues(ev->Get()->Issues, TStringBuilder() << "Request was rejected, failed to request CPU quota for pool " << PoolId << ", current CPU threshold is " << 100.0 * ev->Get()->MaxClusterLoad << "%");
+ ForEachUnfinished(DelayedRequests.begin(), DelayedRequests.end(), [this, issues](TRequest* request) {
AddFinishedRequest(request->SessionId);
- ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId);
+ ReplyContinue(request, Ydb::StatusIds::OVERLOADED, issues);
});
}
RefreshState();
diff --git a/ydb/core/kqp/workload_service/common/events.h b/ydb/core/kqp/workload_service/common/events.h
index 3d95a65578..48643582d7 100644
--- a/ydb/core/kqp/workload_service/common/events.h
+++ b/ydb/core/kqp/workload_service/common/events.h
@@ -166,6 +166,11 @@ struct TEvPrivate {
};
struct TEvStopPoolHandler : public NActors::TEventLocal<TEvStopPoolHandler, EvStopPoolHandler> {
+ explicit TEvStopPoolHandler(bool resetCounters)
+ : ResetCounters(resetCounters)
+ {}
+
+ const bool ResetCounters;
};
struct TEvCancelRequest : public NActors::TEventLocal<TEvCancelRequest, EvCancelRequest> {
@@ -196,11 +201,15 @@ struct TEvPrivate {
};
struct TEvCpuQuotaResponse : public NActors::TEventLocal<TEvCpuQuotaResponse, EvCpuQuotaResponse> {
- explicit TEvCpuQuotaResponse(bool quotaAccepted)
+ explicit TEvCpuQuotaResponse(bool quotaAccepted, double maxClusterLoad, NYql::TIssues issues)
: QuotaAccepted(quotaAccepted)
+ , MaxClusterLoad(maxClusterLoad)
+ , Issues(std::move(issues))
{}
const bool QuotaAccepted;
+ const double MaxClusterLoad;
+ const NYql::TIssues Issues;
};
struct TEvCpuLoadResponse : public NActors::TEventLocal<TEvCpuLoadResponse, EvCpuLoadResponse> {
diff --git a/ydb/core/kqp/workload_service/kqp_workload_service.cpp b/ydb/core/kqp/workload_service/kqp_workload_service.cpp
index 82fd72bc63..24fdd21e3d 100644
--- a/ydb/core/kqp/workload_service/kqp_workload_service.cpp
+++ b/ydb/core/kqp/workload_service/kqp_workload_service.cpp
@@ -381,7 +381,7 @@ private:
if (auto poolState = GetPoolState(database, poolId)) {
if (poolState->NewPoolHandler) {
- Send(*poolState->NewPoolHandler, new TEvPrivate::TEvStopPoolHandler());
+ Send(*poolState->NewPoolHandler, new TEvPrivate::TEvStopPoolHandler(false));
}
poolState->NewPoolHandler = ev->Get()->NewHandler;
poolState->UpdateHandler();
@@ -443,7 +443,7 @@ private:
for (const auto& [poolKey, poolState] : PoolIdToState) {
if (!poolState.InFlightRequests && TInstant::Now() - poolState.LastUpdateTime > IDLE_DURATION) {
CpuQuotaManager->CleanupHandler(poolState.PoolHandler);
- Send(poolState.PoolHandler, new TEvPrivate::TEvStopPoolHandler());
+ Send(poolState.PoolHandler, new TEvPrivate::TEvStopPoolHandler(true));
poolsToDelete.emplace_back(poolKey);
}
}
diff --git a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h
index ca0b84cdd9..9ae115235a 100644
--- a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h
+++ b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h
@@ -121,7 +121,7 @@ struct TPoolState {
return;
}
- ActorContext.Send(PoolHandler, new TEvPrivate::TEvStopPoolHandler());
+ ActorContext.Send(PoolHandler, new TEvPrivate::TEvStopPoolHandler(false));
PoolHandler = *NewPoolHandler;
NewPoolHandler = std::nullopt;
InFlightRequests = 0;
@@ -160,7 +160,7 @@ struct TCpuQuotaManagerState {
auto response = CpuQuotaManager.RequestCpuQuota(0.0, maxClusterLoad);
bool quotaAccepted = response.Status == NYdb::EStatus::SUCCESS;
- ActorContext.Send(poolHandler, new TEvPrivate::TEvCpuQuotaResponse(quotaAccepted), 0, coockie);
+ ActorContext.Send(poolHandler, new TEvPrivate::TEvCpuQuotaResponse(quotaAccepted, maxClusterLoad, std::move(response.Issues)), 0, coockie);
// Schedule notification
if (!quotaAccepted) {
diff --git a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h
index 2eb9912eee..b433e86d33 100644
--- a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h
+++ b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h
@@ -122,12 +122,6 @@ struct TSampleQueries {
}
template <typename TResult>
- static void CheckOverloaded(const TResult& result, const TString& poolId) {
- UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::OVERLOADED, result.GetIssues().ToString());
- UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Too many pending requests for pool " << poolId);
- }
-
- template <typename TResult>
static void CheckCancelled(const TResult& result) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::CANCELLED, result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Request timeout exceeded, cancelling after");
diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp
index d1510ba8d6..9bb97b8ab0 100644
--- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp
+++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp
@@ -58,7 +58,10 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) {
}
UNIT_ASSERT_C(firstRequest.HasValue(), "One of two requests shoud be rejected");
UNIT_ASSERT_C(!secondRequest.HasValue(), "One of two requests shoud be placed in pool");
- TSampleQueries::CheckOverloaded(firstRequest.GetResult(), ydb->GetSettings().PoolId_);
+
+ auto result = firstRequest.GetResult();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString());
+ UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local pending requests is 2, number of global delayed/running requests is 1, sum of them is larger than allowed limit 1 (including concurrent query limit 1) for pool " << ydb->GetSettings().PoolId_);
return secondRequest;
}
@@ -114,10 +117,9 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) {
auto hangingRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().HangUpDuringExecution(true));
ydb->WaitQueryExecution(hangingRequest);
- TSampleQueries::CheckOverloaded(
- ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false)),
- ydb->GetSettings().PoolId_
- );
+ auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false));
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString());
+ UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local pending requests is 1, number of global delayed/running requests is 1, sum of them is larger than allowed limit 0 (including concurrent query limit 1) for pool " << ydb->GetSettings().PoolId_);
ydb->ContinueQueryExecution(hangingRequest);
TSampleQueries::TSelect42::CheckResult(hangingRequest.GetResult());
@@ -142,10 +144,9 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) {
ydb->WaitQueryExecution(asyncResult);
}
- TSampleQueries::CheckOverloaded(
- ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false)),
- ydb->GetSettings().PoolId_
- );
+ auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false));
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString());
+ UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local pending requests is 1, number of global delayed/running requests is " << inFlight << ", sum of them is larger than allowed limit 0 (including concurrent query limit " << inFlight << ") for pool " << ydb->GetSettings().PoolId_);
for (const auto& asyncResult : asyncResults) {
ydb->ContinueQueryExecution(asyncResult);
@@ -230,7 +231,8 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) {
auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false));
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::CANCELLED, result.GetIssues().ToString());
- UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Delay deadline exceeded in pool " << ydb->GetSettings().PoolId_);
+ UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was delayed during");
+ UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << ", that is larger than delay deadline 10.000000s in pool " << ydb->GetSettings().PoolId_ << ", request was canceled");
}
Y_UNIT_TEST(TestCpuLoadThresholdRefresh) {
@@ -289,7 +291,9 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceDistributed) {
ydb->WaitPoolState({.DelayedRequests = 1, .RunningRequests = 1});
// Check distributed queue size
- TSampleQueries::CheckOverloaded(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().NodeIndex(0)), ydb->GetSettings().PoolId_);
+ auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().NodeIndex(0));
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString());
+ UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local pending requests is 1, number of global delayed/running requests is 2, sum of them is larger than allowed limit 1 (including concurrent query limit 1) for pool " << ydb->GetSettings().PoolId_);
ydb->ContinueQueryExecution(delayedRequest);
ydb->ContinueQueryExecution(hangingRequest);
@@ -359,7 +363,9 @@ Y_UNIT_TEST_SUITE(ResourcePoolsDdl) {
);
ydb->WaitQueryExecution(hangingRequest);
- TSampleQueries::CheckOverloaded(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().PoolId(poolId)), poolId);
+ auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().PoolId(poolId));
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString());
+ UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local pending requests is 1, number of global delayed/running requests is 1, sum of them is larger than allowed limit 0 (including concurrent query limit 1) for pool " << poolId);
ydb->ContinueQueryExecution(hangingRequest);
TSampleQueries::TSelect42::CheckResult(hangingRequest.GetResult());
@@ -401,7 +407,10 @@ Y_UNIT_TEST_SUITE(ResourcePoolsDdl) {
QUEUE_SIZE=0
);
)");
- TSampleQueries::CheckOverloaded(delayedRequest.GetResult(), ydb->GetSettings().PoolId_);
+
+ auto result = delayedRequest.GetResult();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString());
+ UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local delayed requests is 1, that is larger than allowed limit 0 for pool " << ydb->GetSettings().PoolId_);
ydb->ContinueQueryExecution(hangingRequest);
TSampleQueries::TSelect42::CheckResult(hangingRequest.GetResult());