diff options
author | alexbogo <alexbogo@ydb.tech> | 2023-07-17 17:11:09 +0300 |
---|---|---|
committer | alexbogo <alexbogo@ydb.tech> | 2023-07-17 17:11:09 +0300 |
commit | 3337775f0751aa9bb532c245c8056f38ea336237 (patch) | |
tree | 3dc595b96abf6d677caa11a70e3a17676f874bdc | |
parent | 8e697f3319a6aea1e8a06962d021f0b14fb9cc39 (diff) | |
download | ydb-3337775f0751aa9bb532c245c8056f38ea336237.tar.gz |
[ymq] response unavailable for retryable error & restore retries
init
-rw-r--r-- | ydb/core/ymq/actor/auth_multi_factory.cpp | 33 | ||||
-rw-r--r-- | ydb/tests/functional/sqs/cloud/test_yandex_cloud_mode.py | 18 |
2 files changed, 46 insertions, 5 deletions
diff --git a/ydb/core/ymq/actor/auth_multi_factory.cpp b/ydb/core/ymq/actor/auth_multi_factory.cpp index ce36391485..ac37cfca4f 100644 --- a/ydb/core/ymq/actor/auth_multi_factory.cpp +++ b/ydb/core/ymq/actor/auth_multi_factory.cpp @@ -23,7 +23,8 @@ constexpr TDuration CLOUD_AUTH_RETRY_PERIOD = TDuration::MilliSeconds(10); constexpr TDuration CLOUD_AUTH_MAX_RETRY_PERIOD = TDuration::Seconds(5); constexpr ui64 AUTHENTICATE_WAKEUP_TAG = 1; -constexpr ui64 FOLDER_SERVICE_REQUEST_WAKEUP_TAG = 2; +constexpr ui64 AUTHORIZATION_WAKEUP_TAG = 2; +constexpr ui64 FOLDER_SERVICE_REQUEST_WAKEUP_TAG = 3; static const std::pair<EAction, TStringBuf> Action2Permission[] = { {EAction::ChangeMessageVisibility, "ymq.messages.changeVisibility"}, @@ -214,8 +215,12 @@ public: || status.GRpcStatusCode == grpc::StatusCode::UNAVAILABLE; } + bool CanRetry() const { + return TActivationContext::Now() < StartTime_ + CLOUD_AUTH_TIMEOUT; + } + bool CanRetry(const NGrpc::TGrpcStatus& status) const { - return TActivationContext::Now() < StartTime_ + CLOUD_AUTH_TIMEOUT && IsTemporaryError(status); + return CanRetry() && IsTemporaryError(status); } void ScheduleRetry(TDuration& duration, ui64 wakeupTag) { @@ -225,6 +230,10 @@ public: duration = Min(duration * 2, CLOUD_AUTH_MAX_RETRY_PERIOD); } + void ScheduleAuthorizationRetry() { + ScheduleRetry(AuthorizeRetryPeriod_, AUTHORIZATION_WAKEUP_TAG); + } + void ScheduleAuthenticateRetry() { ScheduleRetry(AuthenticateRetryPeriod_, AUTHENTICATE_WAKEUP_TAG); } @@ -285,9 +294,16 @@ public: Counters_.AuthorizeDuration->Collect((TActivationContext::Now() - AuthorizeRequestStartTimestamp_).MilliSeconds()); if (result.Error) { - RLOG_SQS_INFO("Authorize failed. Error: " << result.Error.ToString()); - SetError(NErrors::ACCESS_DENIED, "IAM authorization error."); - SendReplyAndDie(); + if (CanRetry() && result.Error.Retryable) { + ScheduleAuthorizationRetry(); + } else { + RLOG_SQS_INFO("Authorize failed. Error: " << result.Error.ToString()); + SetError( + result.Error.Retryable ? NErrors::SERVICE_UNAVAILABLE : NErrors::ACCESS_DENIED, + "IAM authorization error." + ); + SendReplyAndDie(); + } return; } @@ -325,6 +341,9 @@ public: void HandleWakeup(TEvWakeup::TPtr& ev) { switch (ev->Get()->Tag) { + case AUTHORIZATION_WAKEUP_TAG: + Authorize(); + break; case AUTHENTICATE_WAKEUP_TAG: Authenticate(); break; @@ -418,6 +437,10 @@ public: TEvTicketParser::TEvAuthorizeTicketResult result("fake_token", nullptr); if (AccessKeySignature_ && AccessKeySignature_->AccessKeyId.empty()) { result.Error.Message = "mocked_auth_error: empty access key"; + result.Error.Retryable = false; + } else if (AccessKeySignature_ && AccessKeySignature_->AccessKeyId == "TEST_ID_FOR_RETRYIES") { + result.Error.Message = "mocked_auth_error: correct process retries"; + result.Error.Retryable = true; } else { result.Token = MakeIntrusive<NACLib::TUserToken>("fake_user_sid@as", TVector<TString>()); } diff --git a/ydb/tests/functional/sqs/cloud/test_yandex_cloud_mode.py b/ydb/tests/functional/sqs/cloud/test_yandex_cloud_mode.py index 954d1c3202..181950b355 100644 --- a/ydb/tests/functional/sqs/cloud/test_yandex_cloud_mode.py +++ b/ydb/tests/functional/sqs/cloud/test_yandex_cloud_mode.py @@ -101,6 +101,24 @@ class TestSqsYandexCloudMode(get_test_with_sqs_tenant_installation(KikimrSqsTest assert_that(r.status_code, equal_to(400)) @pytest.mark.parametrize(**TABLES_FORMAT_PARAMS) + def test_retryable_iam_error(self, tables_format): + self._init_with_params(tables_format=tables_format) + url = self._make_server_url() + + boto_client = self._make_boto_client('TEST_ID_FOR_RETRYIES', 'SECRET', url) + + def list_queues(): + boto_client.list_queues() + + assert_that( + list_queues, + raises( + botocore.exceptions.ClientError, + pattern='ServiceUnavailable.+IAM authorization error' + ) + ) + + @pytest.mark.parametrize(**TABLES_FORMAT_PARAMS) def test_empty_access_key_id(self, tables_format): self._init_with_params(tables_format=tables_format) url = self._make_server_url() |