aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexbogo <alexbogo@ydb.tech>2023-07-17 17:11:09 +0300
committeralexbogo <alexbogo@ydb.tech>2023-07-17 17:11:09 +0300
commit3337775f0751aa9bb532c245c8056f38ea336237 (patch)
tree3dc595b96abf6d677caa11a70e3a17676f874bdc
parent8e697f3319a6aea1e8a06962d021f0b14fb9cc39 (diff)
downloadydb-3337775f0751aa9bb532c245c8056f38ea336237.tar.gz
[ymq] response unavailable for retryable error & restore retries
init
-rw-r--r--ydb/core/ymq/actor/auth_multi_factory.cpp33
-rw-r--r--ydb/tests/functional/sqs/cloud/test_yandex_cloud_mode.py18
2 files changed, 46 insertions, 5 deletions
diff --git a/ydb/core/ymq/actor/auth_multi_factory.cpp b/ydb/core/ymq/actor/auth_multi_factory.cpp
index ce36391485..ac37cfca4f 100644
--- a/ydb/core/ymq/actor/auth_multi_factory.cpp
+++ b/ydb/core/ymq/actor/auth_multi_factory.cpp
@@ -23,7 +23,8 @@ constexpr TDuration CLOUD_AUTH_RETRY_PERIOD = TDuration::MilliSeconds(10);
constexpr TDuration CLOUD_AUTH_MAX_RETRY_PERIOD = TDuration::Seconds(5);
constexpr ui64 AUTHENTICATE_WAKEUP_TAG = 1;
-constexpr ui64 FOLDER_SERVICE_REQUEST_WAKEUP_TAG = 2;
+constexpr ui64 AUTHORIZATION_WAKEUP_TAG = 2;
+constexpr ui64 FOLDER_SERVICE_REQUEST_WAKEUP_TAG = 3;
static const std::pair<EAction, TStringBuf> Action2Permission[] = {
{EAction::ChangeMessageVisibility, "ymq.messages.changeVisibility"},
@@ -214,8 +215,12 @@ public:
|| status.GRpcStatusCode == grpc::StatusCode::UNAVAILABLE;
}
+ bool CanRetry() const {
+ return TActivationContext::Now() < StartTime_ + CLOUD_AUTH_TIMEOUT;
+ }
+
bool CanRetry(const NGrpc::TGrpcStatus& status) const {
- return TActivationContext::Now() < StartTime_ + CLOUD_AUTH_TIMEOUT && IsTemporaryError(status);
+ return CanRetry() && IsTemporaryError(status);
}
void ScheduleRetry(TDuration& duration, ui64 wakeupTag) {
@@ -225,6 +230,10 @@ public:
duration = Min(duration * 2, CLOUD_AUTH_MAX_RETRY_PERIOD);
}
+ void ScheduleAuthorizationRetry() {
+ ScheduleRetry(AuthorizeRetryPeriod_, AUTHORIZATION_WAKEUP_TAG);
+ }
+
void ScheduleAuthenticateRetry() {
ScheduleRetry(AuthenticateRetryPeriod_, AUTHENTICATE_WAKEUP_TAG);
}
@@ -285,9 +294,16 @@ public:
Counters_.AuthorizeDuration->Collect((TActivationContext::Now() - AuthorizeRequestStartTimestamp_).MilliSeconds());
if (result.Error) {
- RLOG_SQS_INFO("Authorize failed. Error: " << result.Error.ToString());
- SetError(NErrors::ACCESS_DENIED, "IAM authorization error.");
- SendReplyAndDie();
+ if (CanRetry() && result.Error.Retryable) {
+ ScheduleAuthorizationRetry();
+ } else {
+ RLOG_SQS_INFO("Authorize failed. Error: " << result.Error.ToString());
+ SetError(
+ result.Error.Retryable ? NErrors::SERVICE_UNAVAILABLE : NErrors::ACCESS_DENIED,
+ "IAM authorization error."
+ );
+ SendReplyAndDie();
+ }
return;
}
@@ -325,6 +341,9 @@ public:
void HandleWakeup(TEvWakeup::TPtr& ev) {
switch (ev->Get()->Tag) {
+ case AUTHORIZATION_WAKEUP_TAG:
+ Authorize();
+ break;
case AUTHENTICATE_WAKEUP_TAG:
Authenticate();
break;
@@ -418,6 +437,10 @@ public:
TEvTicketParser::TEvAuthorizeTicketResult result("fake_token", nullptr);
if (AccessKeySignature_ && AccessKeySignature_->AccessKeyId.empty()) {
result.Error.Message = "mocked_auth_error: empty access key";
+ result.Error.Retryable = false;
+ } else if (AccessKeySignature_ && AccessKeySignature_->AccessKeyId == "TEST_ID_FOR_RETRYIES") {
+ result.Error.Message = "mocked_auth_error: correct process retries";
+ result.Error.Retryable = true;
} else {
result.Token = MakeIntrusive<NACLib::TUserToken>("fake_user_sid@as", TVector<TString>());
}
diff --git a/ydb/tests/functional/sqs/cloud/test_yandex_cloud_mode.py b/ydb/tests/functional/sqs/cloud/test_yandex_cloud_mode.py
index 954d1c3202..181950b355 100644
--- a/ydb/tests/functional/sqs/cloud/test_yandex_cloud_mode.py
+++ b/ydb/tests/functional/sqs/cloud/test_yandex_cloud_mode.py
@@ -101,6 +101,24 @@ class TestSqsYandexCloudMode(get_test_with_sqs_tenant_installation(KikimrSqsTest
assert_that(r.status_code, equal_to(400))
@pytest.mark.parametrize(**TABLES_FORMAT_PARAMS)
+ def test_retryable_iam_error(self, tables_format):
+ self._init_with_params(tables_format=tables_format)
+ url = self._make_server_url()
+
+ boto_client = self._make_boto_client('TEST_ID_FOR_RETRYIES', 'SECRET', url)
+
+ def list_queues():
+ boto_client.list_queues()
+
+ assert_that(
+ list_queues,
+ raises(
+ botocore.exceptions.ClientError,
+ pattern='ServiceUnavailable.+IAM authorization error'
+ )
+ )
+
+ @pytest.mark.parametrize(**TABLES_FORMAT_PARAMS)
def test_empty_access_key_id(self, tables_format):
self._init_with_params(tables_format=tables_format)
url = self._make_server_url()