diff options
author | Pisarenko Grigoriy <79596613+GrigoriyPA@users.noreply.github.com> | 2024-08-29 16:30:00 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-29 16:30:00 +0300 |
commit | 8a5c10df29f7640a14a423203c8392b194e88fa8 (patch) | |
tree | 2e52fe424a4665ed5ea59432cb725afd0e68cecd | |
parent | 9c8c9516f55af2e67efc378aff0a4c1219781e21 (diff) | |
download | ydb-8a5c10df29f7640a14a423203c8392b194e88fa8.tar.gz |
YQ WM improved overload issues (#8437)
6 files changed, 61 insertions, 39 deletions
diff --git a/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp b/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp index 71b9e1ae39..2ff6e6912a 100644 --- a/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp +++ b/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp @@ -66,12 +66,14 @@ class TPoolHandlerActorBase : public TActor<TDerived> { LoadCpuThreshold->Set(std::max(poolConfig.DatabaseLoadCpuThreshold, 0.0)); } - void OnCleanup() { + void OnCleanup(bool resetConfigCounters) { ActivePoolHandlers->Dec(); - InFlightLimit->Set(0); - QueueSizeLimit->Set(0); - LoadCpuThreshold->Set(0); + if (resetConfigCounters) { + InFlightLimit->Set(0); + QueueSizeLimit->Set(0); + LoadCpuThreshold->Set(0); + } } private: @@ -136,7 +138,7 @@ public: STRICT_STFUNC(StateFuncBase, // Workload service events sFunc(TEvents::TEvPoison, HandlePoison); - sFunc(TEvPrivate::TEvStopPoolHandler, HandleStop); + hFunc(TEvPrivate::TEvStopPoolHandler, Handle); hFunc(TEvPrivate::TEvResolvePoolResponse, Handle); hFunc(TEvPrivate::TEvUpdatePoolSubscription, Handle); @@ -160,7 +162,7 @@ public: SendPoolInfoUpdate(std::nullopt, std::nullopt, Subscribers); - Counters.OnCleanup(); + Counters.OnCleanup(ResetCountersOnStrop); TBase::PassAway(); } @@ -171,8 +173,9 @@ private: this->PassAway(); } - void HandleStop() { + void Handle(TEvPrivate::TEvStopPoolHandler::TPtr& ev) { LOG_I("Got stop pool handler request, waiting for " << LocalSessions.size() << " requests"); + ResetCountersOnStrop = ev->Get()->ResetCounters; if (LocalSessions.empty()) { PassAway(); } else { @@ -332,7 +335,7 @@ public: if (!request->Started && request->State != TRequest::EState::Finishing) { if (request->State == TRequest::EState::Canceling && status == Ydb::StatusIds::SUCCESS) { status = Ydb::StatusIds::CANCELLED; - issues.AddIssue(TStringBuilder() << "Delay deadline exceeded in pool " << PoolId); + issues.AddIssue(TStringBuilder() << "Request was delayed during " << TInstant::Now() - request->StartTime << ", that is larger than delay deadline " << PoolConfig.QueryCancelAfter << " in pool " << PoolId << ", request was canceled"); } ReplyContinue(request, status, issues); return; @@ -515,6 +518,7 @@ private: ui64 LocalInFlight = 0; std::unordered_map<TString, TRequest> LocalSessions; bool StopHandler = false; // Stop than all requests finished + bool ResetCountersOnStrop = true; }; @@ -622,8 +626,13 @@ protected: } void OnScheduleRequest(TRequest* request) override { - if (PendingRequests.size() >= MAX_PENDING_REQUESTS || SaturationSub(GetLocalSessionsCount() - GetLocalInFlight(), InFlightLimit) > QueueSizeLimit) { - ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId); + if (PendingRequests.size() >= MAX_PENDING_REQUESTS) { + ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Request was rejected, number of local pending requests is " << PendingRequests.size() << ", that is larger than allowed limit " << MAX_PENDING_REQUESTS); + return; + } + + if (SaturationSub(GetLocalSessionsCount() - GetLocalInFlight(), InFlightLimit) > QueueSizeLimit) { + ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Request was rejected, number of local pending/delayed requests is " << GetLocalSessionsCount() - GetLocalInFlight() << ", that is larger than allowed limit " << QueueSizeLimit << " (including concurrent query limit " << InFlightLimit << ") for pool " << PoolId); return; } @@ -742,15 +751,15 @@ private: if (const ui64 delayedRequests = SaturationSub(GlobalState.AmountRequests() + PendingRequests.size(), InFlightLimit); delayedRequests > QueueSizeLimit) { RemoveBackRequests(PendingRequests, std::min(delayedRequests - QueueSizeLimit, PendingRequests.size()), [this](TRequest* request) { - ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId); + ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Request was rejected, number of local pending requests is " << PendingRequests.size() << ", number of global delayed/running requests is " << GlobalState.AmountRequests() << ", sum of them is larger than allowed limit " << QueueSizeLimit << " (including concurrent query limit " << InFlightLimit << ") for pool " << PoolId); }); FifoCounters.PendingRequestsCount->Set(PendingRequests.size()); } if (PendingRequests.empty() && delayedRequestsCount > QueueSizeLimit) { - RemoveBackRequests(DelayedRequests, delayedRequestsCount - QueueSizeLimit, [this](TRequest* request) { + RemoveBackRequests(DelayedRequests, delayedRequestsCount - QueueSizeLimit, [this, delayedRequestsCount](TRequest* request) { AddFinishedRequest(request->SessionId); - ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId); + ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Request was rejected, number of local delayed requests is " << delayedRequestsCount << ", that is larger than allowed limit " << QueueSizeLimit << " for pool " << PoolId); }); } @@ -787,9 +796,10 @@ private: if (!ev->Get()->QuotaAccepted) { LOG_D("Skipped request start due to load cpu threshold"); if (static_cast<EStartRequestCase>(ev->Cookie) == EStartRequestCase::Pending) { - ForEachUnfinished(DelayedRequests.begin(), DelayedRequests.end(), [this](TRequest* request) { + NYql::TIssues issues = GroupIssues(ev->Get()->Issues, TStringBuilder() << "Request was rejected, failed to request CPU quota for pool " << PoolId << ", current CPU threshold is " << 100.0 * ev->Get()->MaxClusterLoad << "%"); + ForEachUnfinished(DelayedRequests.begin(), DelayedRequests.end(), [this, issues](TRequest* request) { AddFinishedRequest(request->SessionId); - ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId); + ReplyContinue(request, Ydb::StatusIds::OVERLOADED, issues); }); } RefreshState(); diff --git a/ydb/core/kqp/workload_service/common/events.h b/ydb/core/kqp/workload_service/common/events.h index 3d95a65578..48643582d7 100644 --- a/ydb/core/kqp/workload_service/common/events.h +++ b/ydb/core/kqp/workload_service/common/events.h @@ -166,6 +166,11 @@ struct TEvPrivate { }; struct TEvStopPoolHandler : public NActors::TEventLocal<TEvStopPoolHandler, EvStopPoolHandler> { + explicit TEvStopPoolHandler(bool resetCounters) + : ResetCounters(resetCounters) + {} + + const bool ResetCounters; }; struct TEvCancelRequest : public NActors::TEventLocal<TEvCancelRequest, EvCancelRequest> { @@ -196,11 +201,15 @@ struct TEvPrivate { }; struct TEvCpuQuotaResponse : public NActors::TEventLocal<TEvCpuQuotaResponse, EvCpuQuotaResponse> { - explicit TEvCpuQuotaResponse(bool quotaAccepted) + explicit TEvCpuQuotaResponse(bool quotaAccepted, double maxClusterLoad, NYql::TIssues issues) : QuotaAccepted(quotaAccepted) + , MaxClusterLoad(maxClusterLoad) + , Issues(std::move(issues)) {} const bool QuotaAccepted; + const double MaxClusterLoad; + const NYql::TIssues Issues; }; struct TEvCpuLoadResponse : public NActors::TEventLocal<TEvCpuLoadResponse, EvCpuLoadResponse> { diff --git a/ydb/core/kqp/workload_service/kqp_workload_service.cpp b/ydb/core/kqp/workload_service/kqp_workload_service.cpp index 82fd72bc63..24fdd21e3d 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service.cpp +++ b/ydb/core/kqp/workload_service/kqp_workload_service.cpp @@ -381,7 +381,7 @@ private: if (auto poolState = GetPoolState(database, poolId)) { if (poolState->NewPoolHandler) { - Send(*poolState->NewPoolHandler, new TEvPrivate::TEvStopPoolHandler()); + Send(*poolState->NewPoolHandler, new TEvPrivate::TEvStopPoolHandler(false)); } poolState->NewPoolHandler = ev->Get()->NewHandler; poolState->UpdateHandler(); @@ -443,7 +443,7 @@ private: for (const auto& [poolKey, poolState] : PoolIdToState) { if (!poolState.InFlightRequests && TInstant::Now() - poolState.LastUpdateTime > IDLE_DURATION) { CpuQuotaManager->CleanupHandler(poolState.PoolHandler); - Send(poolState.PoolHandler, new TEvPrivate::TEvStopPoolHandler()); + Send(poolState.PoolHandler, new TEvPrivate::TEvStopPoolHandler(true)); poolsToDelete.emplace_back(poolKey); } } diff --git a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h index ca0b84cdd9..9ae115235a 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h +++ b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h @@ -121,7 +121,7 @@ struct TPoolState { return; } - ActorContext.Send(PoolHandler, new TEvPrivate::TEvStopPoolHandler()); + ActorContext.Send(PoolHandler, new TEvPrivate::TEvStopPoolHandler(false)); PoolHandler = *NewPoolHandler; NewPoolHandler = std::nullopt; InFlightRequests = 0; @@ -160,7 +160,7 @@ struct TCpuQuotaManagerState { auto response = CpuQuotaManager.RequestCpuQuota(0.0, maxClusterLoad); bool quotaAccepted = response.Status == NYdb::EStatus::SUCCESS; - ActorContext.Send(poolHandler, new TEvPrivate::TEvCpuQuotaResponse(quotaAccepted), 0, coockie); + ActorContext.Send(poolHandler, new TEvPrivate::TEvCpuQuotaResponse(quotaAccepted, maxClusterLoad, std::move(response.Issues)), 0, coockie); // Schedule notification if (!quotaAccepted) { diff --git a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h index 2eb9912eee..b433e86d33 100644 --- a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h +++ b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h @@ -122,12 +122,6 @@ struct TSampleQueries { } template <typename TResult> - static void CheckOverloaded(const TResult& result, const TString& poolId) { - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::OVERLOADED, result.GetIssues().ToString()); - UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Too many pending requests for pool " << poolId); - } - - template <typename TResult> static void CheckCancelled(const TResult& result) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::CANCELLED, result.GetIssues().ToString()); UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Request timeout exceeded, cancelling after"); diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp index d1510ba8d6..9bb97b8ab0 100644 --- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp +++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp @@ -58,7 +58,10 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) { } UNIT_ASSERT_C(firstRequest.HasValue(), "One of two requests shoud be rejected"); UNIT_ASSERT_C(!secondRequest.HasValue(), "One of two requests shoud be placed in pool"); - TSampleQueries::CheckOverloaded(firstRequest.GetResult(), ydb->GetSettings().PoolId_); + + auto result = firstRequest.GetResult(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString()); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local pending requests is 2, number of global delayed/running requests is 1, sum of them is larger than allowed limit 1 (including concurrent query limit 1) for pool " << ydb->GetSettings().PoolId_); return secondRequest; } @@ -114,10 +117,9 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) { auto hangingRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().HangUpDuringExecution(true)); ydb->WaitQueryExecution(hangingRequest); - TSampleQueries::CheckOverloaded( - ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false)), - ydb->GetSettings().PoolId_ - ); + auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false)); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString()); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local pending requests is 1, number of global delayed/running requests is 1, sum of them is larger than allowed limit 0 (including concurrent query limit 1) for pool " << ydb->GetSettings().PoolId_); ydb->ContinueQueryExecution(hangingRequest); TSampleQueries::TSelect42::CheckResult(hangingRequest.GetResult()); @@ -142,10 +144,9 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) { ydb->WaitQueryExecution(asyncResult); } - TSampleQueries::CheckOverloaded( - ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false)), - ydb->GetSettings().PoolId_ - ); + auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false)); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString()); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local pending requests is 1, number of global delayed/running requests is " << inFlight << ", sum of them is larger than allowed limit 0 (including concurrent query limit " << inFlight << ") for pool " << ydb->GetSettings().PoolId_); for (const auto& asyncResult : asyncResults) { ydb->ContinueQueryExecution(asyncResult); @@ -230,7 +231,8 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) { auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false)); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::CANCELLED, result.GetIssues().ToString()); - UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Delay deadline exceeded in pool " << ydb->GetSettings().PoolId_); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was delayed during"); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << ", that is larger than delay deadline 10.000000s in pool " << ydb->GetSettings().PoolId_ << ", request was canceled"); } Y_UNIT_TEST(TestCpuLoadThresholdRefresh) { @@ -289,7 +291,9 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceDistributed) { ydb->WaitPoolState({.DelayedRequests = 1, .RunningRequests = 1}); // Check distributed queue size - TSampleQueries::CheckOverloaded(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().NodeIndex(0)), ydb->GetSettings().PoolId_); + auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().NodeIndex(0)); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString()); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local pending requests is 1, number of global delayed/running requests is 2, sum of them is larger than allowed limit 1 (including concurrent query limit 1) for pool " << ydb->GetSettings().PoolId_); ydb->ContinueQueryExecution(delayedRequest); ydb->ContinueQueryExecution(hangingRequest); @@ -359,7 +363,9 @@ Y_UNIT_TEST_SUITE(ResourcePoolsDdl) { ); ydb->WaitQueryExecution(hangingRequest); - TSampleQueries::CheckOverloaded(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().PoolId(poolId)), poolId); + auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().PoolId(poolId)); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString()); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local pending requests is 1, number of global delayed/running requests is 1, sum of them is larger than allowed limit 0 (including concurrent query limit 1) for pool " << poolId); ydb->ContinueQueryExecution(hangingRequest); TSampleQueries::TSelect42::CheckResult(hangingRequest.GetResult()); @@ -401,7 +407,10 @@ Y_UNIT_TEST_SUITE(ResourcePoolsDdl) { QUEUE_SIZE=0 ); )"); - TSampleQueries::CheckOverloaded(delayedRequest.GetResult(), ydb->GetSettings().PoolId_); + + auto result = delayedRequest.GetResult(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString()); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local delayed requests is 1, that is larger than allowed limit 0 for pool " << ydb->GetSettings().PoolId_); ydb->ContinueQueryExecution(hangingRequest); TSampleQueries::TSelect42::CheckResult(hangingRequest.GetResult()); |