diff options
author | sergeyveselov <sergeyveselov@yandex-team.com> | 2023-09-28 17:14:29 +0300 |
---|---|---|
committer | sergeyveselov <sergeyveselov@yandex-team.com> | 2023-09-28 17:36:37 +0300 |
commit | 7c3eadc2bc74bf74b1e6e8fe945a3fe0c818a24b (patch) | |
tree | 7e5ed13f3c8f5864b725488bc545fa5e1293aaf8 | |
parent | 2b06fb34ee993c45ddb9bc2d5565e3fe823520fa (diff) | |
download | ydb-7c3eadc2bc74bf74b1e6e8fe945a3fe0c818a24b.tar.gz |
answer ThrottlingException when user's budget for requesting queues list is over
answer ThrottlingException when user's budget for requesting queues list is over
-rw-r--r-- | ydb/core/ymq/actor/auth_multi_factory.cpp | 7 | ||||
-rw-r--r-- | ydb/core/ymq/actor/events.h | 16 | ||||
-rw-r--r-- | ydb/core/ymq/actor/get_queue_attributes.cpp | 7 | ||||
-rw-r--r-- | ydb/core/ymq/actor/get_queue_url.cpp | 5 | ||||
-rw-r--r-- | ydb/core/ymq/actor/proxy_actor.cpp | 8 | ||||
-rw-r--r-- | ydb/core/ymq/actor/proxy_service.cpp | 2 | ||||
-rw-r--r-- | ydb/core/ymq/actor/service.cpp | 55 | ||||
-rw-r--r-- | ydb/core/ymq/actor/service.h | 8 | ||||
-rw-r--r-- | ydb/tests/functional/sqs/cloud/test_common.py | 3 | ||||
-rw-r--r-- | ydb/tests/functional/sqs/common/test_throttling_nonexistent_queue.py | 40 | ||||
-rw-r--r-- | ydb/tests/functional/sqs/common/ya.make | 1 |
11 files changed, 141 insertions, 11 deletions
diff --git a/ydb/core/ymq/actor/auth_multi_factory.cpp b/ydb/core/ymq/actor/auth_multi_factory.cpp index ac37cfca4f..afcb35dce4 100644 --- a/ydb/core/ymq/actor/auth_multi_factory.cpp +++ b/ydb/core/ymq/actor/auth_multi_factory.cpp @@ -356,6 +356,13 @@ public: } void HandleQueueFolderIdAndCustomName(TSqsEvents::TEvQueueFolderIdAndCustomName::TPtr& ev) { + if (ev->Get()->Throttled) { + RLOG_SQS_INFO("Get queue folder id and custom name was throttled."); + SetError(NErrors::THROTTLING_EXCEPTION, "Too many requests for nonexistent queue"); + SendReplyAndDie(); + return; + } + if (ev->Get()->Failed) { RLOG_SQS_INFO("Get queue folder id and custom name failed. Failed: " << ev->Get()->Failed << ". Exists: " << ev->Get()->Exists); SetError(NErrors::INTERNAL_FAILURE, "Internal folder service error."); diff --git a/ydb/core/ymq/actor/events.h b/ydb/core/ymq/actor/events.h index 6c856bd37e..38df3c6772 100644 --- a/ydb/core/ymq/actor/events.h +++ b/ydb/core/ymq/actor/events.h @@ -195,6 +195,9 @@ struct TSqsEvents { bool UserExists = false; bool QueueExists = false; + // Event processing was throttled + bool Throttled = false; + // Queue info ui32 TablesFormat = 0; ui64 QueueVersion = 0; @@ -441,6 +444,7 @@ struct TSqsEvents { SessionError, QueueDoesNotExist, UserDoesNotExist, + Throttled, }; NKikimrClient::TSqsResponse Record; @@ -481,6 +485,7 @@ struct TSqsEvents { NoQueue, FailedToConnectToLeader, Error, + Throttled, }; TString RequestId; @@ -527,13 +532,16 @@ struct TSqsEvents { struct TEvQueueId : public NActors::TEventLocal<TEvQueueId, EvQueueId> { bool Exists = false; bool Failed = false; + // Event processing was throttled + bool Throttled = false; TString QueueId; // resource id in case of Yandex.Cloud mode and queue name in case of Yandex ui64 Version = 0; // last queue version registered in service actor ui64 ShardsCount = 0; // number of queue shards ui32 TablesFormat = 0; - TEvQueueId(const bool failed = false) + TEvQueueId(const bool failed = false, const bool throttled = false) : Failed(failed) + , Throttled(throttled) { } @@ -563,11 +571,15 @@ struct TSqsEvents { struct TEvQueueFolderIdAndCustomName : public NActors::TEventLocal<TEvQueueFolderIdAndCustomName, EvQueueFolderIdAndCustomName> { bool Exists = false; bool Failed = false; + // Event processing was throttled + bool Throttled = false; + TString QueueFolderId; TString QueueCustomName; - TEvQueueFolderIdAndCustomName(bool failed = false) + TEvQueueFolderIdAndCustomName(bool failed = false, bool throttled = false) : Failed(failed) + , Throttled(throttled) { } diff --git a/ydb/core/ymq/actor/get_queue_attributes.cpp b/ydb/core/ymq/actor/get_queue_attributes.cpp index 5dacb354c0..598a12d90a 100644 --- a/ydb/core/ymq/actor/get_queue_attributes.cpp +++ b/ydb/core/ymq/actor/get_queue_attributes.cpp @@ -263,6 +263,13 @@ private: void HandleQueueFolderIdAndCustomName(TSqsEvents::TEvQueueFolderIdAndCustomName::TPtr& ev) { auto* result = Response_.MutableGetQueueAttributes(); + if (ev->Get()->Throttled) { + RLOG_SQS_DEBUG("Get queue folder id and custom name was throttled."); + MakeError(result, NErrors::THROTTLING_EXCEPTION); + SendReplyAndDie(); + return; + } + if (ev->Get()->Failed || !ev->Get()->Exists) { RLOG_SQS_DEBUG("Get queue folder id and custom name failed. Failed: " << ev->Get()->Failed << ". Exists: " << ev->Get()->Exists); MakeError(result, NErrors::INTERNAL_FAILURE); diff --git a/ydb/core/ymq/actor/get_queue_url.cpp b/ydb/core/ymq/actor/get_queue_url.cpp index 4d0861629e..8898ae01b4 100644 --- a/ydb/core/ymq/actor/get_queue_url.cpp +++ b/ydb/core/ymq/actor/get_queue_url.cpp @@ -58,7 +58,10 @@ private: } void HandleQueueId(TSqsEvents::TEvQueueId::TPtr& ev) { - if (ev->Get()->Failed) { + if (ev->Get()->Throttled) { + RLOG_SQS_WARN("Get queue id was throttled"); + MakeError(MutableErrorDesc(), NErrors::THROTTLING_EXCEPTION); + } else if (ev->Get()->Failed) { RLOG_SQS_WARN("Get queue id failed"); MakeError(MutableErrorDesc(), NErrors::INTERNAL_FAILURE); } else { diff --git a/ydb/core/ymq/actor/proxy_actor.cpp b/ydb/core/ymq/actor/proxy_actor.cpp index 8c2d2a931a..02b2d7fce0 100644 --- a/ydb/core/ymq/actor/proxy_actor.cpp +++ b/ydb/core/ymq/actor/proxy_actor.cpp @@ -99,6 +99,12 @@ void TProxyActor::HandleConfiguration(TSqsEvents::TEvConfiguration::TPtr& ev) { COLLECT_HISTOGRAM_COUNTER(detailedCounters, GetConfiguration_Duration, confDuration.MilliSeconds()); } + if (ev->Get()->Throttled) { + RLOG_SQS_ERROR("Attempt to get configuration was throttled"); + SendErrorAndDie(NErrors::THROTTLING_EXCEPTION, "Too many requests to nonexistent queue."); + return; + } + if (ev->Get()->Fail) { RLOG_SQS_ERROR("Failed to get configuration"); SendErrorAndDie(NErrors::INTERNAL_FAILURE, "Failed to get configuration."); @@ -213,6 +219,8 @@ const TErrorClass& TProxyActor::GetErrorClass(TSqsEvents::TEvProxySqsResponse::E case EProxyStatus::QueueDoesNotExist: case EProxyStatus::UserDoesNotExist: return NErrors::NON_EXISTENT_QUEUE; + case EProxyStatus::Throttled: + return NErrors::THROTTLING_EXCEPTION; default: return NErrors::INTERNAL_FAILURE; } diff --git a/ydb/core/ymq/actor/proxy_service.cpp b/ydb/core/ymq/actor/proxy_service.cpp index ea3e096e04..a73f129645 100644 --- a/ydb/core/ymq/actor/proxy_service.cpp +++ b/ydb/core/ymq/actor/proxy_service.cpp @@ -87,6 +87,8 @@ static TSqsEvents::TEvProxySqsResponse::EProxyStatus GetLeaderNodeForQueueStatus return TSqsEvents::TEvProxySqsResponse::EProxyStatus::QueueDoesNotExist; case TSqsEvents::TEvGetLeaderNodeForQueueResponse::EStatus::FailedToConnectToLeader: return TSqsEvents::TEvProxySqsResponse::EProxyStatus::SessionError; + case TSqsEvents::TEvGetLeaderNodeForQueueResponse::EStatus::Throttled: + return TSqsEvents::TEvProxySqsResponse::EProxyStatus::Throttled; case TSqsEvents::TEvGetLeaderNodeForQueueResponse::EStatus::Error: default: return TSqsEvents::TEvProxySqsResponse::EProxyStatus::LeaderResolvingError; diff --git a/ydb/core/ymq/actor/service.cpp b/ydb/core/ymq/actor/service.cpp index 09a4182f8a..5231f82806 100644 --- a/ydb/core/ymq/actor/service.cpp +++ b/ydb/core/ymq/actor/service.cpp @@ -537,7 +537,7 @@ void TSqsService::HandleGetLeaderNodeForQueueRequest(TSqsEvents::TEvGetLeaderNod RLOG_SQS_REQ_DEBUG(reqId, "Queue [" << userName << "/" << queueName << "] was not found in sqs service list. Requesting queues list"); user->GetLeaderNodeRequests_.emplace(queueName, std::move(ev)); } else { - Send(ev->Sender, new TSqsEvents::TEvGetLeaderNodeForQueueResponse(reqId, userName, queueName, TSqsEvents::TEvGetLeaderNodeForQueueResponse::EStatus::NoQueue)); + AnswerThrottled(ev); } return; } @@ -585,7 +585,7 @@ void TSqsService::HandleGetConfiguration(TSqsEvents::TEvGetConfiguration::TPtr& RLOG_SQS_REQ_DEBUG(reqId, "Queue [" << userName << "/" << queueName << "] was not found in sqs service list. Requesting queues list"); user->GetConfigurationRequests_.emplace(queueName, std::move(ev)); } else { - AnswerNotExists(ev, user); + AnswerThrottled(ev); } return; } @@ -673,6 +673,31 @@ void TSqsService::AnswerFailed(TSqsEvents::TEvCountQueues::TPtr& ev, const TUser Send(ev->Sender, new TSqsEvents::TEvCountQueuesResponse(true)); } +void TSqsService::AnswerThrottled(TSqsEvents::TEvGetLeaderNodeForQueueRequest::TPtr& ev) { + const TString& reqId = ev->Get()->RequestId; + const TString& userName = ev->Get()->UserName; + const TString& queueName = ev->Get()->QueueName; + RLOG_SQS_REQ_DEBUG(reqId, "Throttled because of too many requests for nonexistent queue [" << queueName << "] for user [" << userName << "] while getting leader node"); + Send(ev->Sender, new TSqsEvents::TEvGetLeaderNodeForQueueResponse(reqId, userName, queueName, TSqsEvents::TEvGetLeaderNodeForQueueResponse::EStatus::Throttled)); +} + +void TSqsService::AnswerThrottled(TSqsEvents::TEvGetConfiguration::TPtr& ev) { + RLOG_SQS_REQ_DEBUG(ev->Get()->RequestId, "Throttled because of too many requests for nonexistent queue [" << ev->Get()->QueueName << "] for user [" << ev->Get()->UserName << "] while getting configuration"); + auto answer = MakeHolder<TSqsEvents::TEvConfiguration>(); + answer->Throttled = true; + Send(ev->Sender, answer.Release()); +} + +void TSqsService::AnswerThrottled(TSqsEvents::TEvGetQueueId::TPtr& ev) { + RLOG_SQS_REQ_DEBUG(ev->Get()->RequestId, "Throttled because of too many requests for nonexistent queue [" << ev->Get()->CustomQueueName << "] for user [" << ev->Get()->UserName << "] while getting queue id"); + Send(ev->Sender, new TSqsEvents::TEvQueueId(false, true)); +} + +void TSqsService::AnswerThrottled(TSqsEvents::TEvGetQueueFolderIdAndCustomName::TPtr& ev) { + RLOG_SQS_REQ_DEBUG(ev->Get()->RequestId, "Throttled because of too many requests for nonexistent queue [" << ev->Get()->QueueName << "] for user [" << ev->Get()->UserName << "] while getting folder id and custom name"); + Send(ev->Sender, new TSqsEvents::TEvQueueFolderIdAndCustomName(false, true)); +} + void TSqsService::Answer(TSqsEvents::TEvGetQueueFolderIdAndCustomName::TPtr& ev, const TQueueInfoPtr& queueInfo) { Send(ev->Sender, new TSqsEvents::TEvQueueFolderIdAndCustomName(queueInfo->FolderId_, queueInfo->CustomName_)); } @@ -772,9 +797,8 @@ void TSqsService::HandleGetQueueId(TSqsEvents::TEvGetQueueId::TPtr& ev) { << ev->Get()->FolderId << "] was not found in sqs service list for user [" << userName << "]. Requesting queues list"); user->GetQueueIdRequests_.emplace(std::make_pair(ev->Get()->CustomQueueName, ev->Get()->FolderId), std::move(ev)); - } else { - AnswerNotExists(ev, user); - } + } else + AnswerThrottled(ev); return; } @@ -790,7 +814,8 @@ void TSqsService::HandleGetQueueId(TSqsEvents::TEvGetQueueId::TPtr& ev) { ); } -void TSqsService::HandleGetQueueFolderIdAndCustomName(TSqsEvents::TEvGetQueueFolderIdAndCustomName::TPtr& ev) { +void TSqsService::HandleGetQueueFolderIdAndCustomName( + TSqsEvents::TEvGetQueueFolderIdAndCustomName::TPtr& ev) { TUserInfoPtr user = GetUserOrWait(ev); if (!user) { return; @@ -806,7 +831,7 @@ void TSqsService::HandleGetQueueFolderIdAndCustomName(TSqsEvents::TEvGetQueueFol RLOG_SQS_REQ_DEBUG(reqId, "Queue [" << userName << "/" << queueName << "] was not found in sqs service list. Requesting queues list"); user->GetQueueFolderIdAndCustomNameRequests_.emplace(queueName, std::move(ev)); } else { - AnswerNotExists(ev, user); + AnswerThrottled(ev); } return; } @@ -1301,6 +1326,13 @@ void TSqsService::AnswerNoQueueToRequests(const TUserInfoPtr& user) { AnswerNoQueueToRequests(user, user->GetQueueFolderIdAndCustomNameRequests_); } +void TSqsService::AnswerThrottledToRequests(const TUserInfoPtr& user) { + AnswerThrottledToRequests(user->GetLeaderNodeRequests_); + AnswerThrottledToRequests(user->GetConfigurationRequests_); + AnswerThrottledToRequests(user->GetQueueIdRequests_); + AnswerThrottledToRequests(user->GetQueueFolderIdAndCustomNameRequests_); +} + void TSqsService::AnswerErrorToRequests() { AnswerErrorToRequests(nullptr, GetLeaderNodeRequests_); AnswerErrorToRequests(nullptr, GetConfigurationRequests_); @@ -1501,6 +1533,15 @@ void TSqsService::AnswerNoQueueToRequests(const TUserInfoPtr& user, TMultimap& m } template <class TMultimap> +void TSqsService::AnswerThrottledToRequests(TMultimap& map) { + for (auto& queueToRequest : map) { + auto& req = queueToRequest.second; + AnswerThrottled(req); + } + map.clear(); +} + +template <class TMultimap> void TSqsService::AnswerErrorToRequests(const TUserInfoPtr& user, TMultimap& map) { for (auto& queueToRequest : map) { auto& req = queueToRequest.second; diff --git a/ydb/core/ymq/actor/service.h b/ydb/core/ymq/actor/service.h index b7467a55f5..b3ba29ddce 100644 --- a/ydb/core/ymq/actor/service.h +++ b/ydb/core/ymq/actor/service.h @@ -83,6 +83,7 @@ private: void AnswerNoUserToRequests(); void AnswerNoQueueToRequests(const TUserInfoPtr& user); + void AnswerThrottledToRequests(const TUserInfoPtr& user); void AnswerErrorToRequests(); void AnswerErrorToRequests(const TUserInfoPtr& user); @@ -116,6 +117,8 @@ private: void AnswerNoQueueToRequests(const TUserInfoPtr& user, TMultimap& map); template <class TMultimap> void AnswerErrorToRequests(const TUserInfoPtr& user, TMultimap& map); + template <class TMultimap> + void AnswerThrottledToRequests(TMultimap& map); void AnswerNotExists(TSqsEvents::TEvGetLeaderNodeForQueueRequest::TPtr& ev, const TUserInfoPtr& userInfo); void AnswerNotExists(TSqsEvents::TEvGetConfiguration::TPtr& ev, const TUserInfoPtr& userInfo); @@ -129,6 +132,11 @@ private: void AnswerFailed(TSqsEvents::TEvGetQueueFolderIdAndCustomName::TPtr& ev, const TUserInfoPtr& userInfo); void AnswerFailed(TSqsEvents::TEvCountQueues::TPtr& ev, const TUserInfoPtr& userInfo); + void AnswerThrottled(TSqsEvents::TEvGetLeaderNodeForQueueRequest::TPtr& ev); + void AnswerThrottled(TSqsEvents::TEvGetConfiguration::TPtr& ev); + void AnswerThrottled(TSqsEvents::TEvGetQueueId::TPtr& ev); + void AnswerThrottled(TSqsEvents::TEvGetQueueFolderIdAndCustomName::TPtr& ev); + void Answer(TSqsEvents::TEvGetQueueFolderIdAndCustomName::TPtr& ev, const TQueueInfoPtr& queueInfo); void AnswerCountQueuesRequests(const TUserInfoPtr& user); diff --git a/ydb/tests/functional/sqs/cloud/test_common.py b/ydb/tests/functional/sqs/cloud/test_common.py index 77e94c6b6e..13fd77143f 100644 --- a/ydb/tests/functional/sqs/cloud/test_common.py +++ b/ydb/tests/functional/sqs/cloud/test_common.py @@ -78,7 +78,8 @@ class CommonTests(KikimrSqsTestBase): break except RuntimeError as e: last_error = e - if "The specified queue doesn't exist" not in str(e): + if ("The specified queue doesn't exist" not in str(e) + and "<Code>ThrottlingException</Code>" not in str(e)): raise e time.sleep(1) else: diff --git a/ydb/tests/functional/sqs/common/test_throttling_nonexistent_queue.py b/ydb/tests/functional/sqs/common/test_throttling_nonexistent_queue.py new file mode 100644 index 0000000000..2da8c69ef2 --- /dev/null +++ b/ydb/tests/functional/sqs/common/test_throttling_nonexistent_queue.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +from hamcrest import assert_that, raises + +from ydb.tests.library.sqs.test_base import KikimrSqsTestBase + + +class TestSqsThrottlingOnNonexistentQueue(KikimrSqsTestBase): + + def test_throttling_on_nonexistent_queue(self): + queue_url = self._create_queue_and_assert(self.queue_name, False, True) + nonexistent_queue_url = queue_url + "_nonex" + + def get_attributes_of_nonexistent_queue(): + self._sqs_api.get_queue_attributes(nonexistent_queue_url) + + # Draining budget + for _ in range(16): + try: + get_attributes_of_nonexistent_queue() + except Exception: + pass + + throttling_exception_pattern = ".*</Message><Code>ThrottlingException</Code>.*" + + assert_that( + get_attributes_of_nonexistent_queue, + raises( + RuntimeError, + pattern=throttling_exception_pattern + ) + ) + + assert_that( + lambda: self._sqs_api.send_message(nonexistent_queue_url, "foobar"), + raises( + RuntimeError, + pattern=throttling_exception_pattern + ) + ) diff --git a/ydb/tests/functional/sqs/common/ya.make b/ydb/tests/functional/sqs/common/ya.make index 097c53a629..fa37f78944 100644 --- a/ydb/tests/functional/sqs/common/ya.make +++ b/ydb/tests/functional/sqs/common/ya.make @@ -12,6 +12,7 @@ TEST_SRCS( test_queue_attributes_validation.py test_queues_managing.py test_format_without_version.py + test_throttling_nonexistent_queue.py ) IF (SANITIZER_TYPE) |