aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/google-auth/py3/tests/test_aws.py
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-04-04 07:45:46 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-04-04 07:53:28 +0300
commit51958dfd22674e02052c8a292ab70fc2d52a07fc (patch)
tree42d2b859c555e9045203791ed3f60fa16e1b267e /contrib/python/google-auth/py3/tests/test_aws.py
parent7e62114667a5059c6b6a617d4cd9076928818478 (diff)
downloadydb-51958dfd22674e02052c8a292ab70fc2d52a07fc.tar.gz
Intermediate changes
Diffstat (limited to 'contrib/python/google-auth/py3/tests/test_aws.py')
-rw-r--r--contrib/python/google-auth/py3/tests/test_aws.py498
1 files changed, 398 insertions, 100 deletions
diff --git a/contrib/python/google-auth/py3/tests/test_aws.py b/contrib/python/google-auth/py3/tests/test_aws.py
index 3f358d52b0..5614820312 100644
--- a/contrib/python/google-auth/py3/tests/test_aws.py
+++ b/contrib/python/google-auth/py3/tests/test_aws.py
@@ -21,7 +21,7 @@ import urllib.parse
import mock
import pytest # type: ignore
-from google.auth import _helpers
+from google.auth import _helpers, external_account
from google.auth import aws
from google.auth import environment_vars
from google.auth import exceptions
@@ -616,8 +616,13 @@ class TestRequestSigner(object):
):
utcnow.return_value = datetime.datetime.strptime(time, "%Y-%m-%dT%H:%M:%SZ")
request_signer = aws.RequestSigner(region)
+ credentials_object = aws.AwsSecurityCredentials(
+ credentials.get("access_key_id"),
+ credentials.get("secret_access_key"),
+ credentials.get("security_token"),
+ )
actual_signed_request = request_signer.get_request_options(
- credentials,
+ credentials_object,
original_request.get("url"),
original_request.get("method"),
original_request.get("data"),
@@ -631,10 +636,7 @@ class TestRequestSigner(object):
with pytest.raises(ValueError) as excinfo:
request_signer.get_request_options(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- },
+ aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY),
"invalid",
"POST",
)
@@ -646,10 +648,7 @@ class TestRequestSigner(object):
with pytest.raises(ValueError) as excinfo:
request_signer.get_request_options(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- },
+ aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY),
"http://invalid",
"POST",
)
@@ -661,10 +660,7 @@ class TestRequestSigner(object):
with pytest.raises(ValueError) as excinfo:
request_signer.get_request_options(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- },
+ aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY),
"https://",
"POST",
)
@@ -672,6 +668,36 @@ class TestRequestSigner(object):
assert excinfo.match(r"Invalid AWS service URL")
+class TestAwsSecurityCredentialsSupplier(aws.AwsSecurityCredentialsSupplier):
+ def __init__(
+ self,
+ security_credentials=None,
+ region=None,
+ credentials_exception=None,
+ region_exception=None,
+ expected_context=None,
+ ):
+ self._security_credentials = security_credentials
+ self._region = region
+ self._credentials_exception = credentials_exception
+ self._region_exception = region_exception
+ self._expected_context = expected_context
+
+ def get_aws_security_credentials(self, context, request):
+ if self._expected_context is not None:
+ assert self._expected_context == context
+ if self._credentials_exception is not None:
+ raise self._credentials_exception
+ return self._security_credentials
+
+ def get_aws_region(self, context, request):
+ if self._expected_context is not None:
+ assert self._expected_context == context
+ if self._region_exception is not None:
+ raise self._region_exception
+ return self._region
+
+
class TestCredentials(object):
AWS_REGION = "us-east-2"
AWS_ROLE = "gcp-aws-role"
@@ -734,7 +760,7 @@ class TestCredentials(object):
],
}
# Include security token if available.
- if "security_token" in aws_security_credentials:
+ if aws_security_credentials.session_token is not None:
reformatted_signed_request.get("headers").append(
{
"key": "x-amz-security-token",
@@ -773,16 +799,17 @@ class TestCredentials(object):
in an AWS environment.
"""
responses = []
- if imdsv2_session_token_status:
- # AWS session token request
- imdsv2_session_response = mock.create_autospec(
- transport.Response, instance=True
- )
- imdsv2_session_response.status = imdsv2_session_token_status
- imdsv2_session_response.data = imdsv2_session_token_data
- responses.append(imdsv2_session_response)
if region_status:
+ if imdsv2_session_token_status:
+ # AWS session token request
+ imdsv2_session_response = mock.create_autospec(
+ transport.Response, instance=True
+ )
+ imdsv2_session_response.status = imdsv2_session_token_status
+ imdsv2_session_response.data = imdsv2_session_token_data
+ responses.append(imdsv2_session_response)
+
# AWS region request.
region_response = mock.create_autospec(transport.Response, instance=True)
region_response.status = region_status
@@ -790,6 +817,15 @@ class TestCredentials(object):
region_response.data = "{}b".format(region_name).encode("utf-8")
responses.append(region_response)
+ if imdsv2_session_token_status:
+ # AWS session token request
+ imdsv2_session_response = mock.create_autospec(
+ transport.Response, instance=True
+ )
+ imdsv2_session_response.status = imdsv2_session_token_status
+ imdsv2_session_response.data = imdsv2_session_token_data
+ responses.append(imdsv2_session_response)
+
if role_status:
# AWS role name request.
role_response = mock.create_autospec(transport.Response, instance=True)
@@ -834,7 +870,8 @@ class TestCredentials(object):
@classmethod
def make_credentials(
cls,
- credential_source,
+ credential_source=None,
+ aws_security_credentials_supplier=None,
token_url=TOKEN_URL,
token_info_url=TOKEN_INFO_URL,
client_id=None,
@@ -851,6 +888,7 @@ class TestCredentials(object):
token_info_url=token_info_url,
service_account_impersonation_url=service_account_impersonation_url,
credential_source=credential_source,
+ aws_security_credentials_supplier=aws_security_credentials_supplier,
client_id=client_id,
client_secret=client_secret,
quota_project_id=quota_project_id,
@@ -929,6 +967,7 @@ class TestCredentials(object):
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
credential_source=self.CREDENTIAL_SOURCE,
+ aws_security_credentials_supplier=None,
quota_project_id=QUOTA_PROJECT_ID,
workforce_pool_user_project=None,
universe_domain=DEFAULT_UNIVERSE_DOMAIN,
@@ -957,6 +996,38 @@ class TestCredentials(object):
client_id=None,
client_secret=None,
credential_source=self.CREDENTIAL_SOURCE,
+ aws_security_credentials_supplier=None,
+ quota_project_id=None,
+ workforce_pool_user_project=None,
+ universe_domain=DEFAULT_UNIVERSE_DOMAIN,
+ )
+
+ @mock.patch.object(aws.Credentials, "__init__", return_value=None)
+ def test_from_info_supplier(self, mock_init):
+ supplier = TestAwsSecurityCredentialsSupplier()
+
+ credentials = aws.Credentials.from_info(
+ {
+ "audience": AUDIENCE,
+ "subject_token_type": SUBJECT_TOKEN_TYPE,
+ "token_url": TOKEN_URL,
+ "aws_security_credentials_supplier": supplier,
+ }
+ )
+
+ # Confirm aws.Credentials instance initialized with the expected parameters.
+ assert isinstance(credentials, aws.Credentials)
+ mock_init.assert_called_once_with(
+ audience=AUDIENCE,
+ subject_token_type=SUBJECT_TOKEN_TYPE,
+ token_url=TOKEN_URL,
+ token_info_url=None,
+ service_account_impersonation_url=None,
+ service_account_impersonation_options={},
+ client_id=None,
+ client_secret=None,
+ credential_source=None,
+ aws_security_credentials_supplier=supplier,
quota_project_id=None,
workforce_pool_user_project=None,
universe_domain=DEFAULT_UNIVERSE_DOMAIN,
@@ -993,6 +1064,7 @@ class TestCredentials(object):
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
credential_source=self.CREDENTIAL_SOURCE,
+ aws_security_credentials_supplier=None,
quota_project_id=QUOTA_PROJECT_ID,
workforce_pool_user_project=None,
universe_domain=DEFAULT_UNIVERSE_DOMAIN,
@@ -1022,6 +1094,7 @@ class TestCredentials(object):
client_id=None,
client_secret=None,
credential_source=self.CREDENTIAL_SOURCE,
+ aws_security_credentials_supplier=None,
quota_project_id=None,
workforce_pool_user_project=None,
universe_domain=DEFAULT_UNIVERSE_DOMAIN,
@@ -1036,6 +1109,27 @@ class TestCredentials(object):
assert excinfo.match(r"No valid AWS 'credential_source' provided")
+ def test_constructor_invalid_credential_source_and_supplier(self):
+ # Provide both a credential source and supplier.
+ with pytest.raises(ValueError) as excinfo:
+ self.make_credentials(
+ credential_source=self.CREDENTIAL_SOURCE,
+ aws_security_credentials_supplier="test",
+ )
+
+ assert excinfo.match(
+ r"AWS credential cannot have both a credential source and an AWS security credentials supplier."
+ )
+
+ def test_constructor_invalid_no_credential_source_or_supplier(self):
+ # Provide no credential source or supplier.
+ with pytest.raises(ValueError) as excinfo:
+ self.make_credentials()
+
+ assert excinfo.match(
+ r"A valid credential source or AWS security credentials supplier must be provided."
+ )
+
def test_constructor_invalid_environment_id(self):
# Provide invalid environment_id.
credential_source = self.CREDENTIAL_SOURCE.copy()
@@ -1158,11 +1252,7 @@ class TestCredentials(object):
subject_token = credentials.retrieve_subject_token(request)
assert subject_token == self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
+ aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY, TOKEN)
)
# Assert region request.
self.assert_aws_metadata_request_kwargs(
@@ -1231,11 +1321,7 @@ class TestCredentials(object):
subject_token = credentials.retrieve_subject_token(request)
assert subject_token == self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
+ aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY, TOKEN)
)
# Assert session token request
self.assert_aws_metadata_request_kwargs(
@@ -1250,15 +1336,22 @@ class TestCredentials(object):
REGION_URL,
{"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
)
- # Assert role request.
+ # Assert session token request
self.assert_aws_metadata_request_kwargs(
request.call_args_list[2][1],
+ IMDSV2_SESSION_TOKEN_URL,
+ {"X-aws-ec2-metadata-token-ttl-seconds": "300"},
+ "PUT",
+ )
+ # Assert role request.
+ self.assert_aws_metadata_request_kwargs(
+ request.call_args_list[3][1],
SECURITY_CREDS_URL,
{"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
)
# Assert security credentials request.
self.assert_aws_metadata_request_kwargs(
- request.call_args_list[3][1],
+ request.call_args_list[4][1],
"{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
{
"Content-Type": "application/json",
@@ -1335,11 +1428,7 @@ class TestCredentials(object):
subject_token = credentials.retrieve_subject_token(request)
assert subject_token == self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
+ aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY, TOKEN)
)
# Assert session token request.
self.assert_aws_metadata_request_kwargs(
@@ -1396,11 +1485,7 @@ class TestCredentials(object):
subject_token = credentials.retrieve_subject_token(request)
assert subject_token == self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
+ aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY, TOKEN)
)
# Assert session token request.
self.assert_aws_metadata_request_kwargs(
@@ -1451,11 +1536,7 @@ class TestCredentials(object):
subject_token = credentials.retrieve_subject_token(request)
assert subject_token == self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
+ aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY, TOKEN)
)
# Assert session token request.
self.assert_aws_metadata_request_kwargs(
@@ -1530,11 +1611,7 @@ class TestCredentials(object):
subject_token = credentials.retrieve_subject_token(request)
assert subject_token == self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
+ aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY, TOKEN)
)
# Assert session token request.
self.assert_aws_metadata_request_kwargs(
@@ -1549,15 +1626,22 @@ class TestCredentials(object):
REGION_URL_IPV6,
{"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
)
- # Assert role request.
+ # Assert session token request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[2][1],
+ IMDSV2_SESSION_TOKEN_URL_IPV6,
+ {"X-aws-ec2-metadata-token-ttl-seconds": "300"},
+ "PUT",
+ )
+ # Assert role request.
+ self.assert_aws_metadata_request_kwargs(
+ request.call_args_list[3][1],
SECURITY_CREDS_URL_IPV6,
{"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
)
# Assert security credentials request.
self.assert_aws_metadata_request_kwargs(
- request.call_args_list[3][1],
+ request.call_args_list[4][1],
"{}/{}".format(SECURITY_CREDS_URL_IPV6, self.AWS_ROLE),
{
"Content-Type": "application/json",
@@ -1619,7 +1703,7 @@ class TestCredentials(object):
subject_token = credentials.retrieve_subject_token(request)
assert subject_token == self.make_serialized_aws_signed_request(
- {"access_key_id": ACCESS_KEY_ID, "secret_access_key": SECRET_ACCESS_KEY}
+ aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY)
)
@mock.patch("google.auth._helpers.utcnow")
@@ -1636,11 +1720,7 @@ class TestCredentials(object):
subject_token = credentials.retrieve_subject_token(None)
assert subject_token == self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
+ aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY, TOKEN)
)
@mock.patch("google.auth._helpers.utcnow")
@@ -1659,11 +1739,7 @@ class TestCredentials(object):
subject_token = credentials.retrieve_subject_token(None)
assert subject_token == self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
+ aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY, TOKEN)
)
@mock.patch("google.auth._helpers.utcnow")
@@ -1686,11 +1762,7 @@ class TestCredentials(object):
subject_token = credentials.retrieve_subject_token(None)
assert subject_token == self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
+ aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY, TOKEN)
)
@mock.patch("google.auth._helpers.utcnow")
@@ -1708,7 +1780,7 @@ class TestCredentials(object):
subject_token = credentials.retrieve_subject_token(None)
assert subject_token == self.make_serialized_aws_signed_request(
- {"access_key_id": ACCESS_KEY_ID, "secret_access_key": SECRET_ACCESS_KEY}
+ aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY)
)
@mock.patch("google.auth._helpers.utcnow")
@@ -1730,11 +1802,7 @@ class TestCredentials(object):
subject_token = credentials.retrieve_subject_token(request)
assert subject_token == self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
+ aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY, TOKEN)
)
def test_retrieve_subject_token_error_determining_aws_region(self):
@@ -1806,11 +1874,7 @@ class TestCredentials(object):
self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
)
expected_subject_token = self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
+ aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY, TOKEN)
)
token_headers = {
"Content-Type": "application/x-www-form-urlencoded",
@@ -1869,11 +1933,7 @@ class TestCredentials(object):
self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
)
expected_subject_token = self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
+ aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY, TOKEN)
)
token_headers = {
"Content-Type": "application/x-www-form-urlencoded",
@@ -1939,11 +1999,7 @@ class TestCredentials(object):
_helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
).isoformat("T") + "Z"
expected_subject_token = self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
+ aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY, TOKEN)
)
token_headers = {
"Content-Type": "application/x-www-form-urlencoded",
@@ -2036,11 +2092,7 @@ class TestCredentials(object):
_helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
).isoformat("T") + "Z"
expected_subject_token = self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
+ aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY, TOKEN)
)
token_headers = {
"Content-Type": "application/x-www-form-urlencoded",
@@ -2122,3 +2174,249 @@ class TestCredentials(object):
credentials.refresh(request)
assert excinfo.match(r"Unable to retrieve AWS region")
+
+ @mock.patch("google.auth._helpers.utcnow")
+ def test_retrieve_subject_token_success_with_supplier(self, utcnow):
+ utcnow.return_value = datetime.datetime.strptime(
+ self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
+ )
+ request = self.make_mock_request()
+
+ security_credentials = aws.AwsSecurityCredentials(
+ ACCESS_KEY_ID, SECRET_ACCESS_KEY
+ )
+ supplier = TestAwsSecurityCredentialsSupplier(
+ security_credentials=security_credentials, region=self.AWS_REGION
+ )
+
+ credentials = self.make_credentials(aws_security_credentials_supplier=supplier)
+
+ subject_token = credentials.retrieve_subject_token(request)
+ assert subject_token == self.make_serialized_aws_signed_request(
+ aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY)
+ )
+
+ @mock.patch("google.auth._helpers.utcnow")
+ def test_retrieve_subject_token_success_with_supplier_session_token(self, utcnow):
+ utcnow.return_value = datetime.datetime.strptime(
+ self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
+ )
+ request = self.make_mock_request()
+
+ security_credentials = aws.AwsSecurityCredentials(
+ ACCESS_KEY_ID, SECRET_ACCESS_KEY, TOKEN
+ )
+ supplier = TestAwsSecurityCredentialsSupplier(
+ security_credentials=security_credentials, region=self.AWS_REGION
+ )
+
+ credentials = self.make_credentials(aws_security_credentials_supplier=supplier)
+
+ subject_token = credentials.retrieve_subject_token(request)
+ assert subject_token == self.make_serialized_aws_signed_request(
+ aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY, TOKEN)
+ )
+
+ @mock.patch("google.auth._helpers.utcnow")
+ def test_retrieve_subject_token_success_with_supplier_correct_context(self, utcnow):
+ utcnow.return_value = datetime.datetime.strptime(
+ self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
+ )
+ request = self.make_mock_request()
+ expected_context = external_account.SupplierContext(
+ SUBJECT_TOKEN_TYPE, AUDIENCE
+ )
+
+ security_credentials = aws.AwsSecurityCredentials(
+ ACCESS_KEY_ID, SECRET_ACCESS_KEY
+ )
+ supplier = TestAwsSecurityCredentialsSupplier(
+ security_credentials=security_credentials,
+ region=self.AWS_REGION,
+ expected_context=expected_context,
+ )
+
+ credentials = self.make_credentials(aws_security_credentials_supplier=supplier)
+
+ credentials.retrieve_subject_token(request)
+
+ def test_retrieve_subject_token_error_with_supplier(self):
+ request = self.make_mock_request()
+ expected_exception = exceptions.RefreshError("Test error")
+ supplier = TestAwsSecurityCredentialsSupplier(
+ region=self.AWS_REGION, credentials_exception=expected_exception
+ )
+
+ credentials = self.make_credentials(aws_security_credentials_supplier=supplier)
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ credentials.refresh(request)
+
+ assert excinfo.match(r"Test error")
+
+ def test_retrieve_subject_token_error_with_supplier_region(self):
+ request = self.make_mock_request()
+ expected_exception = exceptions.RefreshError("Test error")
+ security_credentials = aws.AwsSecurityCredentials(
+ ACCESS_KEY_ID, SECRET_ACCESS_KEY
+ )
+ supplier = TestAwsSecurityCredentialsSupplier(
+ security_credentials=security_credentials,
+ region_exception=expected_exception,
+ )
+
+ credentials = self.make_credentials(aws_security_credentials_supplier=supplier)
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ credentials.refresh(request)
+
+ assert excinfo.match(r"Test error")
+
+ @mock.patch(
+ "google.auth.metrics.python_and_auth_lib_version",
+ return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
+ )
+ @mock.patch("google.auth._helpers.utcnow")
+ def test_refresh_success_with_supplier_with_impersonation(
+ self, utcnow, mock_auth_lib_value
+ ):
+ utcnow.return_value = datetime.datetime.strptime(
+ self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
+ )
+ expire_time = (
+ _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
+ ).isoformat("T") + "Z"
+ expected_subject_token = self.make_serialized_aws_signed_request(
+ aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY, TOKEN)
+ )
+ token_headers = {
+ "Content-Type": "application/x-www-form-urlencoded",
+ "Authorization": "Basic " + BASIC_AUTH_ENCODING,
+ "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false source/programmatic",
+ }
+ token_request_data = {
+ "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
+ "audience": AUDIENCE,
+ "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
+ "scope": "https://www.googleapis.com/auth/iam",
+ "subject_token": expected_subject_token,
+ "subject_token_type": SUBJECT_TOKEN_TYPE,
+ }
+ # Service account impersonation request/response.
+ impersonation_response = {
+ "accessToken": "SA_ACCESS_TOKEN",
+ "expireTime": expire_time,
+ }
+ impersonation_headers = {
+ "Content-Type": "application/json",
+ "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
+ "x-goog-user-project": QUOTA_PROJECT_ID,
+ "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
+ "x-allowed-locations": "0x0",
+ }
+ impersonation_request_data = {
+ "delegates": None,
+ "scope": SCOPES,
+ "lifetime": "3600s",
+ }
+ request = self.make_mock_request(
+ token_status=http_client.OK,
+ token_data=self.SUCCESS_RESPONSE,
+ impersonation_status=http_client.OK,
+ impersonation_data=impersonation_response,
+ )
+
+ supplier = TestAwsSecurityCredentialsSupplier(
+ security_credentials=aws.AwsSecurityCredentials(
+ ACCESS_KEY_ID, SECRET_ACCESS_KEY, TOKEN
+ ),
+ region=self.AWS_REGION,
+ )
+
+ credentials = self.make_credentials(
+ client_id=CLIENT_ID,
+ client_secret=CLIENT_SECRET,
+ aws_security_credentials_supplier=supplier,
+ service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
+ quota_project_id=QUOTA_PROJECT_ID,
+ scopes=SCOPES,
+ # Default scopes should be ignored.
+ default_scopes=["ignored"],
+ )
+
+ credentials.refresh(request)
+
+ assert len(request.call_args_list) == 2
+ # First request should be sent to GCP STS endpoint.
+ self.assert_token_request_kwargs(
+ request.call_args_list[0][1], token_headers, token_request_data
+ )
+ # Second request should be sent to iamcredentials endpoint for service
+ # account impersonation.
+ self.assert_impersonation_request_kwargs(
+ request.call_args_list[1][1],
+ impersonation_headers,
+ impersonation_request_data,
+ )
+ assert credentials.token == impersonation_response["accessToken"]
+ assert credentials.quota_project_id == QUOTA_PROJECT_ID
+ assert credentials.scopes == SCOPES
+ assert credentials.default_scopes == ["ignored"]
+
+ @mock.patch(
+ "google.auth.metrics.python_and_auth_lib_version",
+ return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
+ )
+ @mock.patch("google.auth._helpers.utcnow")
+ def test_refresh_success_with_supplier(self, utcnow, mock_auth_lib_value):
+ utcnow.return_value = datetime.datetime.strptime(
+ self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
+ )
+ expected_subject_token = self.make_serialized_aws_signed_request(
+ aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY, TOKEN)
+ )
+ token_headers = {
+ "Content-Type": "application/x-www-form-urlencoded",
+ "Authorization": "Basic " + BASIC_AUTH_ENCODING,
+ "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false source/programmatic",
+ }
+ token_request_data = {
+ "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
+ "audience": AUDIENCE,
+ "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
+ "scope": " ".join(SCOPES),
+ "subject_token": expected_subject_token,
+ "subject_token_type": SUBJECT_TOKEN_TYPE,
+ }
+ request = self.make_mock_request(
+ token_status=http_client.OK, token_data=self.SUCCESS_RESPONSE
+ )
+
+ supplier = TestAwsSecurityCredentialsSupplier(
+ security_credentials=aws.AwsSecurityCredentials(
+ ACCESS_KEY_ID, SECRET_ACCESS_KEY, TOKEN
+ ),
+ region=self.AWS_REGION,
+ )
+
+ credentials = self.make_credentials(
+ client_id=CLIENT_ID,
+ client_secret=CLIENT_SECRET,
+ aws_security_credentials_supplier=supplier,
+ quota_project_id=QUOTA_PROJECT_ID,
+ scopes=SCOPES,
+ # Default scopes should be ignored.
+ default_scopes=["ignored"],
+ )
+
+ credentials.refresh(request)
+
+ assert len(request.call_args_list) == 1
+ # First request should be sent to GCP STS endpoint.
+ self.assert_token_request_kwargs(
+ request.call_args_list[0][1], token_headers, token_request_data
+ )
+ assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
+ assert credentials.quota_project_id == QUOTA_PROJECT_ID
+ assert credentials.scopes == SCOPES
+ assert credentials.default_scopes == ["ignored"]