aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsergeyveselov <sergeyveselov@yandex-team.com>2023-09-28 17:14:29 +0300
committersergeyveselov <sergeyveselov@yandex-team.com>2023-09-28 17:36:37 +0300
commit7c3eadc2bc74bf74b1e6e8fe945a3fe0c818a24b (patch)
tree7e5ed13f3c8f5864b725488bc545fa5e1293aaf8
parent2b06fb34ee993c45ddb9bc2d5565e3fe823520fa (diff)
downloadydb-7c3eadc2bc74bf74b1e6e8fe945a3fe0c818a24b.tar.gz
answer ThrottlingException when user's budget for requesting queues list is over
answer ThrottlingException when user's budget for requesting queues list is over
-rw-r--r--ydb/core/ymq/actor/auth_multi_factory.cpp7
-rw-r--r--ydb/core/ymq/actor/events.h16
-rw-r--r--ydb/core/ymq/actor/get_queue_attributes.cpp7
-rw-r--r--ydb/core/ymq/actor/get_queue_url.cpp5
-rw-r--r--ydb/core/ymq/actor/proxy_actor.cpp8
-rw-r--r--ydb/core/ymq/actor/proxy_service.cpp2
-rw-r--r--ydb/core/ymq/actor/service.cpp55
-rw-r--r--ydb/core/ymq/actor/service.h8
-rw-r--r--ydb/tests/functional/sqs/cloud/test_common.py3
-rw-r--r--ydb/tests/functional/sqs/common/test_throttling_nonexistent_queue.py40
-rw-r--r--ydb/tests/functional/sqs/common/ya.make1
11 files changed, 141 insertions, 11 deletions
diff --git a/ydb/core/ymq/actor/auth_multi_factory.cpp b/ydb/core/ymq/actor/auth_multi_factory.cpp
index ac37cfca4f..afcb35dce4 100644
--- a/ydb/core/ymq/actor/auth_multi_factory.cpp
+++ b/ydb/core/ymq/actor/auth_multi_factory.cpp
@@ -356,6 +356,13 @@ public:
}
void HandleQueueFolderIdAndCustomName(TSqsEvents::TEvQueueFolderIdAndCustomName::TPtr& ev) {
+ if (ev->Get()->Throttled) {
+ RLOG_SQS_INFO("Get queue folder id and custom name was throttled.");
+ SetError(NErrors::THROTTLING_EXCEPTION, "Too many requests for nonexistent queue");
+ SendReplyAndDie();
+ return;
+ }
+
if (ev->Get()->Failed) {
RLOG_SQS_INFO("Get queue folder id and custom name failed. Failed: " << ev->Get()->Failed << ". Exists: " << ev->Get()->Exists);
SetError(NErrors::INTERNAL_FAILURE, "Internal folder service error.");
diff --git a/ydb/core/ymq/actor/events.h b/ydb/core/ymq/actor/events.h
index 6c856bd37e..38df3c6772 100644
--- a/ydb/core/ymq/actor/events.h
+++ b/ydb/core/ymq/actor/events.h
@@ -195,6 +195,9 @@ struct TSqsEvents {
bool UserExists = false;
bool QueueExists = false;
+ // Event processing was throttled
+ bool Throttled = false;
+
// Queue info
ui32 TablesFormat = 0;
ui64 QueueVersion = 0;
@@ -441,6 +444,7 @@ struct TSqsEvents {
SessionError,
QueueDoesNotExist,
UserDoesNotExist,
+ Throttled,
};
NKikimrClient::TSqsResponse Record;
@@ -481,6 +485,7 @@ struct TSqsEvents {
NoQueue,
FailedToConnectToLeader,
Error,
+ Throttled,
};
TString RequestId;
@@ -527,13 +532,16 @@ struct TSqsEvents {
struct TEvQueueId : public NActors::TEventLocal<TEvQueueId, EvQueueId> {
bool Exists = false;
bool Failed = false;
+ // Event processing was throttled
+ bool Throttled = false;
TString QueueId; // resource id in case of Yandex.Cloud mode and queue name in case of Yandex
ui64 Version = 0; // last queue version registered in service actor
ui64 ShardsCount = 0; // number of queue shards
ui32 TablesFormat = 0;
- TEvQueueId(const bool failed = false)
+ TEvQueueId(const bool failed = false, const bool throttled = false)
: Failed(failed)
+ , Throttled(throttled)
{
}
@@ -563,11 +571,15 @@ struct TSqsEvents {
struct TEvQueueFolderIdAndCustomName : public NActors::TEventLocal<TEvQueueFolderIdAndCustomName, EvQueueFolderIdAndCustomName> {
bool Exists = false;
bool Failed = false;
+ // Event processing was throttled
+ bool Throttled = false;
+
TString QueueFolderId;
TString QueueCustomName;
- TEvQueueFolderIdAndCustomName(bool failed = false)
+ TEvQueueFolderIdAndCustomName(bool failed = false, bool throttled = false)
: Failed(failed)
+ , Throttled(throttled)
{
}
diff --git a/ydb/core/ymq/actor/get_queue_attributes.cpp b/ydb/core/ymq/actor/get_queue_attributes.cpp
index 5dacb354c0..598a12d90a 100644
--- a/ydb/core/ymq/actor/get_queue_attributes.cpp
+++ b/ydb/core/ymq/actor/get_queue_attributes.cpp
@@ -263,6 +263,13 @@ private:
void HandleQueueFolderIdAndCustomName(TSqsEvents::TEvQueueFolderIdAndCustomName::TPtr& ev) {
auto* result = Response_.MutableGetQueueAttributes();
+ if (ev->Get()->Throttled) {
+ RLOG_SQS_DEBUG("Get queue folder id and custom name was throttled.");
+ MakeError(result, NErrors::THROTTLING_EXCEPTION);
+ SendReplyAndDie();
+ return;
+ }
+
if (ev->Get()->Failed || !ev->Get()->Exists) {
RLOG_SQS_DEBUG("Get queue folder id and custom name failed. Failed: " << ev->Get()->Failed << ". Exists: " << ev->Get()->Exists);
MakeError(result, NErrors::INTERNAL_FAILURE);
diff --git a/ydb/core/ymq/actor/get_queue_url.cpp b/ydb/core/ymq/actor/get_queue_url.cpp
index 4d0861629e..8898ae01b4 100644
--- a/ydb/core/ymq/actor/get_queue_url.cpp
+++ b/ydb/core/ymq/actor/get_queue_url.cpp
@@ -58,7 +58,10 @@ private:
}
void HandleQueueId(TSqsEvents::TEvQueueId::TPtr& ev) {
- if (ev->Get()->Failed) {
+ if (ev->Get()->Throttled) {
+ RLOG_SQS_WARN("Get queue id was throttled");
+ MakeError(MutableErrorDesc(), NErrors::THROTTLING_EXCEPTION);
+ } else if (ev->Get()->Failed) {
RLOG_SQS_WARN("Get queue id failed");
MakeError(MutableErrorDesc(), NErrors::INTERNAL_FAILURE);
} else {
diff --git a/ydb/core/ymq/actor/proxy_actor.cpp b/ydb/core/ymq/actor/proxy_actor.cpp
index 8c2d2a931a..02b2d7fce0 100644
--- a/ydb/core/ymq/actor/proxy_actor.cpp
+++ b/ydb/core/ymq/actor/proxy_actor.cpp
@@ -99,6 +99,12 @@ void TProxyActor::HandleConfiguration(TSqsEvents::TEvConfiguration::TPtr& ev) {
COLLECT_HISTOGRAM_COUNTER(detailedCounters, GetConfiguration_Duration, confDuration.MilliSeconds());
}
+ if (ev->Get()->Throttled) {
+ RLOG_SQS_ERROR("Attempt to get configuration was throttled");
+ SendErrorAndDie(NErrors::THROTTLING_EXCEPTION, "Too many requests to nonexistent queue.");
+ return;
+ }
+
if (ev->Get()->Fail) {
RLOG_SQS_ERROR("Failed to get configuration");
SendErrorAndDie(NErrors::INTERNAL_FAILURE, "Failed to get configuration.");
@@ -213,6 +219,8 @@ const TErrorClass& TProxyActor::GetErrorClass(TSqsEvents::TEvProxySqsResponse::E
case EProxyStatus::QueueDoesNotExist:
case EProxyStatus::UserDoesNotExist:
return NErrors::NON_EXISTENT_QUEUE;
+ case EProxyStatus::Throttled:
+ return NErrors::THROTTLING_EXCEPTION;
default:
return NErrors::INTERNAL_FAILURE;
}
diff --git a/ydb/core/ymq/actor/proxy_service.cpp b/ydb/core/ymq/actor/proxy_service.cpp
index ea3e096e04..a73f129645 100644
--- a/ydb/core/ymq/actor/proxy_service.cpp
+++ b/ydb/core/ymq/actor/proxy_service.cpp
@@ -87,6 +87,8 @@ static TSqsEvents::TEvProxySqsResponse::EProxyStatus GetLeaderNodeForQueueStatus
return TSqsEvents::TEvProxySqsResponse::EProxyStatus::QueueDoesNotExist;
case TSqsEvents::TEvGetLeaderNodeForQueueResponse::EStatus::FailedToConnectToLeader:
return TSqsEvents::TEvProxySqsResponse::EProxyStatus::SessionError;
+ case TSqsEvents::TEvGetLeaderNodeForQueueResponse::EStatus::Throttled:
+ return TSqsEvents::TEvProxySqsResponse::EProxyStatus::Throttled;
case TSqsEvents::TEvGetLeaderNodeForQueueResponse::EStatus::Error:
default:
return TSqsEvents::TEvProxySqsResponse::EProxyStatus::LeaderResolvingError;
diff --git a/ydb/core/ymq/actor/service.cpp b/ydb/core/ymq/actor/service.cpp
index 09a4182f8a..5231f82806 100644
--- a/ydb/core/ymq/actor/service.cpp
+++ b/ydb/core/ymq/actor/service.cpp
@@ -537,7 +537,7 @@ void TSqsService::HandleGetLeaderNodeForQueueRequest(TSqsEvents::TEvGetLeaderNod
RLOG_SQS_REQ_DEBUG(reqId, "Queue [" << userName << "/" << queueName << "] was not found in sqs service list. Requesting queues list");
user->GetLeaderNodeRequests_.emplace(queueName, std::move(ev));
} else {
- Send(ev->Sender, new TSqsEvents::TEvGetLeaderNodeForQueueResponse(reqId, userName, queueName, TSqsEvents::TEvGetLeaderNodeForQueueResponse::EStatus::NoQueue));
+ AnswerThrottled(ev);
}
return;
}
@@ -585,7 +585,7 @@ void TSqsService::HandleGetConfiguration(TSqsEvents::TEvGetConfiguration::TPtr&
RLOG_SQS_REQ_DEBUG(reqId, "Queue [" << userName << "/" << queueName << "] was not found in sqs service list. Requesting queues list");
user->GetConfigurationRequests_.emplace(queueName, std::move(ev));
} else {
- AnswerNotExists(ev, user);
+ AnswerThrottled(ev);
}
return;
}
@@ -673,6 +673,31 @@ void TSqsService::AnswerFailed(TSqsEvents::TEvCountQueues::TPtr& ev, const TUser
Send(ev->Sender, new TSqsEvents::TEvCountQueuesResponse(true));
}
+void TSqsService::AnswerThrottled(TSqsEvents::TEvGetLeaderNodeForQueueRequest::TPtr& ev) {
+ const TString& reqId = ev->Get()->RequestId;
+ const TString& userName = ev->Get()->UserName;
+ const TString& queueName = ev->Get()->QueueName;
+ RLOG_SQS_REQ_DEBUG(reqId, "Throttled because of too many requests for nonexistent queue [" << queueName << "] for user [" << userName << "] while getting leader node");
+ Send(ev->Sender, new TSqsEvents::TEvGetLeaderNodeForQueueResponse(reqId, userName, queueName, TSqsEvents::TEvGetLeaderNodeForQueueResponse::EStatus::Throttled));
+}
+
+void TSqsService::AnswerThrottled(TSqsEvents::TEvGetConfiguration::TPtr& ev) {
+ RLOG_SQS_REQ_DEBUG(ev->Get()->RequestId, "Throttled because of too many requests for nonexistent queue [" << ev->Get()->QueueName << "] for user [" << ev->Get()->UserName << "] while getting configuration");
+ auto answer = MakeHolder<TSqsEvents::TEvConfiguration>();
+ answer->Throttled = true;
+ Send(ev->Sender, answer.Release());
+}
+
+void TSqsService::AnswerThrottled(TSqsEvents::TEvGetQueueId::TPtr& ev) {
+ RLOG_SQS_REQ_DEBUG(ev->Get()->RequestId, "Throttled because of too many requests for nonexistent queue [" << ev->Get()->CustomQueueName << "] for user [" << ev->Get()->UserName << "] while getting queue id");
+ Send(ev->Sender, new TSqsEvents::TEvQueueId(false, true));
+}
+
+void TSqsService::AnswerThrottled(TSqsEvents::TEvGetQueueFolderIdAndCustomName::TPtr& ev) {
+ RLOG_SQS_REQ_DEBUG(ev->Get()->RequestId, "Throttled because of too many requests for nonexistent queue [" << ev->Get()->QueueName << "] for user [" << ev->Get()->UserName << "] while getting folder id and custom name");
+ Send(ev->Sender, new TSqsEvents::TEvQueueFolderIdAndCustomName(false, true));
+}
+
void TSqsService::Answer(TSqsEvents::TEvGetQueueFolderIdAndCustomName::TPtr& ev, const TQueueInfoPtr& queueInfo) {
Send(ev->Sender, new TSqsEvents::TEvQueueFolderIdAndCustomName(queueInfo->FolderId_, queueInfo->CustomName_));
}
@@ -772,9 +797,8 @@ void TSqsService::HandleGetQueueId(TSqsEvents::TEvGetQueueId::TPtr& ev) {
<< ev->Get()->FolderId << "] was not found in sqs service list for user ["
<< userName << "]. Requesting queues list");
user->GetQueueIdRequests_.emplace(std::make_pair(ev->Get()->CustomQueueName, ev->Get()->FolderId), std::move(ev));
- } else {
- AnswerNotExists(ev, user);
- }
+ } else
+ AnswerThrottled(ev);
return;
}
@@ -790,7 +814,8 @@ void TSqsService::HandleGetQueueId(TSqsEvents::TEvGetQueueId::TPtr& ev) {
);
}
-void TSqsService::HandleGetQueueFolderIdAndCustomName(TSqsEvents::TEvGetQueueFolderIdAndCustomName::TPtr& ev) {
+void TSqsService::HandleGetQueueFolderIdAndCustomName(
+ TSqsEvents::TEvGetQueueFolderIdAndCustomName::TPtr& ev) {
TUserInfoPtr user = GetUserOrWait(ev);
if (!user) {
return;
@@ -806,7 +831,7 @@ void TSqsService::HandleGetQueueFolderIdAndCustomName(TSqsEvents::TEvGetQueueFol
RLOG_SQS_REQ_DEBUG(reqId, "Queue [" << userName << "/" << queueName << "] was not found in sqs service list. Requesting queues list");
user->GetQueueFolderIdAndCustomNameRequests_.emplace(queueName, std::move(ev));
} else {
- AnswerNotExists(ev, user);
+ AnswerThrottled(ev);
}
return;
}
@@ -1301,6 +1326,13 @@ void TSqsService::AnswerNoQueueToRequests(const TUserInfoPtr& user) {
AnswerNoQueueToRequests(user, user->GetQueueFolderIdAndCustomNameRequests_);
}
+void TSqsService::AnswerThrottledToRequests(const TUserInfoPtr& user) {
+ AnswerThrottledToRequests(user->GetLeaderNodeRequests_);
+ AnswerThrottledToRequests(user->GetConfigurationRequests_);
+ AnswerThrottledToRequests(user->GetQueueIdRequests_);
+ AnswerThrottledToRequests(user->GetQueueFolderIdAndCustomNameRequests_);
+}
+
void TSqsService::AnswerErrorToRequests() {
AnswerErrorToRequests(nullptr, GetLeaderNodeRequests_);
AnswerErrorToRequests(nullptr, GetConfigurationRequests_);
@@ -1501,6 +1533,15 @@ void TSqsService::AnswerNoQueueToRequests(const TUserInfoPtr& user, TMultimap& m
}
template <class TMultimap>
+void TSqsService::AnswerThrottledToRequests(TMultimap& map) {
+ for (auto& queueToRequest : map) {
+ auto& req = queueToRequest.second;
+ AnswerThrottled(req);
+ }
+ map.clear();
+}
+
+template <class TMultimap>
void TSqsService::AnswerErrorToRequests(const TUserInfoPtr& user, TMultimap& map) {
for (auto& queueToRequest : map) {
auto& req = queueToRequest.second;
diff --git a/ydb/core/ymq/actor/service.h b/ydb/core/ymq/actor/service.h
index b7467a55f5..b3ba29ddce 100644
--- a/ydb/core/ymq/actor/service.h
+++ b/ydb/core/ymq/actor/service.h
@@ -83,6 +83,7 @@ private:
void AnswerNoUserToRequests();
void AnswerNoQueueToRequests(const TUserInfoPtr& user);
+ void AnswerThrottledToRequests(const TUserInfoPtr& user);
void AnswerErrorToRequests();
void AnswerErrorToRequests(const TUserInfoPtr& user);
@@ -116,6 +117,8 @@ private:
void AnswerNoQueueToRequests(const TUserInfoPtr& user, TMultimap& map);
template <class TMultimap>
void AnswerErrorToRequests(const TUserInfoPtr& user, TMultimap& map);
+ template <class TMultimap>
+ void AnswerThrottledToRequests(TMultimap& map);
void AnswerNotExists(TSqsEvents::TEvGetLeaderNodeForQueueRequest::TPtr& ev, const TUserInfoPtr& userInfo);
void AnswerNotExists(TSqsEvents::TEvGetConfiguration::TPtr& ev, const TUserInfoPtr& userInfo);
@@ -129,6 +132,11 @@ private:
void AnswerFailed(TSqsEvents::TEvGetQueueFolderIdAndCustomName::TPtr& ev, const TUserInfoPtr& userInfo);
void AnswerFailed(TSqsEvents::TEvCountQueues::TPtr& ev, const TUserInfoPtr& userInfo);
+ void AnswerThrottled(TSqsEvents::TEvGetLeaderNodeForQueueRequest::TPtr& ev);
+ void AnswerThrottled(TSqsEvents::TEvGetConfiguration::TPtr& ev);
+ void AnswerThrottled(TSqsEvents::TEvGetQueueId::TPtr& ev);
+ void AnswerThrottled(TSqsEvents::TEvGetQueueFolderIdAndCustomName::TPtr& ev);
+
void Answer(TSqsEvents::TEvGetQueueFolderIdAndCustomName::TPtr& ev, const TQueueInfoPtr& queueInfo);
void AnswerCountQueuesRequests(const TUserInfoPtr& user);
diff --git a/ydb/tests/functional/sqs/cloud/test_common.py b/ydb/tests/functional/sqs/cloud/test_common.py
index 77e94c6b6e..13fd77143f 100644
--- a/ydb/tests/functional/sqs/cloud/test_common.py
+++ b/ydb/tests/functional/sqs/cloud/test_common.py
@@ -78,7 +78,8 @@ class CommonTests(KikimrSqsTestBase):
break
except RuntimeError as e:
last_error = e
- if "The specified queue doesn't exist" not in str(e):
+ if ("The specified queue doesn't exist" not in str(e)
+ and "<Code>ThrottlingException</Code>" not in str(e)):
raise e
time.sleep(1)
else:
diff --git a/ydb/tests/functional/sqs/common/test_throttling_nonexistent_queue.py b/ydb/tests/functional/sqs/common/test_throttling_nonexistent_queue.py
new file mode 100644
index 0000000000..2da8c69ef2
--- /dev/null
+++ b/ydb/tests/functional/sqs/common/test_throttling_nonexistent_queue.py
@@ -0,0 +1,40 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from hamcrest import assert_that, raises
+
+from ydb.tests.library.sqs.test_base import KikimrSqsTestBase
+
+
+class TestSqsThrottlingOnNonexistentQueue(KikimrSqsTestBase):
+
+ def test_throttling_on_nonexistent_queue(self):
+ queue_url = self._create_queue_and_assert(self.queue_name, False, True)
+ nonexistent_queue_url = queue_url + "_nonex"
+
+ def get_attributes_of_nonexistent_queue():
+ self._sqs_api.get_queue_attributes(nonexistent_queue_url)
+
+ # Draining budget
+ for _ in range(16):
+ try:
+ get_attributes_of_nonexistent_queue()
+ except Exception:
+ pass
+
+ throttling_exception_pattern = ".*</Message><Code>ThrottlingException</Code>.*"
+
+ assert_that(
+ get_attributes_of_nonexistent_queue,
+ raises(
+ RuntimeError,
+ pattern=throttling_exception_pattern
+ )
+ )
+
+ assert_that(
+ lambda: self._sqs_api.send_message(nonexistent_queue_url, "foobar"),
+ raises(
+ RuntimeError,
+ pattern=throttling_exception_pattern
+ )
+ )
diff --git a/ydb/tests/functional/sqs/common/ya.make b/ydb/tests/functional/sqs/common/ya.make
index 097c53a629..fa37f78944 100644
--- a/ydb/tests/functional/sqs/common/ya.make
+++ b/ydb/tests/functional/sqs/common/ya.make
@@ -12,6 +12,7 @@ TEST_SRCS(
test_queue_attributes_validation.py
test_queues_managing.py
test_format_without_version.py
+ test_throttling_nonexistent_queue.py
)
IF (SANITIZER_TYPE)