summaryrefslogtreecommitdiffstats
path: root/contrib/python/google-auth/py3/tests
diff options
context:
space:
mode:
authorrobot-piglet <[email protected]>2025-02-08 20:17:29 +0300
committerrobot-piglet <[email protected]>2025-02-08 20:32:11 +0300
commit6b7c255668de517dff6462bd377d345d240f8a67 (patch)
tree1bde953b7f2b6d9e8efd72a0ceebfa0a791a024c /contrib/python/google-auth/py3/tests
parent2309a9980fd82ba7df5a21876c790e7e4d776ded (diff)
Intermediate changes
commit_hash:f4cb1bdccfb534d71b7f461fc8f8e5656c47bfa5
Diffstat (limited to 'contrib/python/google-auth/py3/tests')
-rw-r--r--contrib/python/google-auth/py3/tests/compute_engine/test__metadata.py24
-rw-r--r--contrib/python/google-auth/py3/tests/test_impersonated_credentials.py120
2 files changed, 142 insertions, 2 deletions
diff --git a/contrib/python/google-auth/py3/tests/compute_engine/test__metadata.py b/contrib/python/google-auth/py3/tests/compute_engine/test__metadata.py
index 03ba8de497d..a768b17fa0d 100644
--- a/contrib/python/google-auth/py3/tests/compute_engine/test__metadata.py
+++ b/contrib/python/google-auth/py3/tests/compute_engine/test__metadata.py
@@ -346,12 +346,32 @@ def test_get_return_none_for_not_found_error():
@mock.patch("time.sleep", return_value=None)
def test_get_failure_connection_failed(mock_sleep):
request = make_request("")
- request.side_effect = exceptions.TransportError()
+ request.side_effect = exceptions.TransportError("failure message")
with pytest.raises(exceptions.TransportError) as excinfo:
_metadata.get(request, PATH)
- assert excinfo.match(r"Compute Engine Metadata server unavailable")
+ assert excinfo.match(
+ r"Compute Engine Metadata server unavailable due to failure message"
+ )
+
+ request.assert_called_with(
+ method="GET",
+ url=_metadata._METADATA_ROOT + PATH,
+ headers=_metadata._METADATA_HEADERS,
+ )
+ assert request.call_count == 5
+
+
+def test_get_too_many_requests_retryable_error_failure():
+ request = make_request("too many requests", status=http_client.TOO_MANY_REQUESTS)
+
+ with pytest.raises(exceptions.TransportError) as excinfo:
+ _metadata.get(request, PATH)
+
+ assert excinfo.match(
+ r"Compute Engine Metadata server unavailable due to too many requests"
+ )
request.assert_called_with(
method="GET",
diff --git a/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py b/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py
index 371477b8a9f..0321a1a1d7b 100644
--- a/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py
+++ b/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py
@@ -72,6 +72,17 @@ def mock_donor_credentials():
yield grant
+def mock_dwd_credentials():
+ with mock.patch("google.oauth2._client.jwt_grant", autospec=True) as grant:
+ grant.return_value = (
+ "1/fFAGRNJasdfz70BzhT3Zg",
+ _helpers.utcnow() + datetime.timedelta(seconds=500),
+ {},
+ )
+ yield grant
+
+
class MockResponse:
def __init__(self, json_data, status_code):
self.json_data = json_data
@@ -124,6 +135,7 @@ class TestImpersonatedCredentials(object):
source_credentials=SOURCE_CREDENTIALS,
lifetime=LIFETIME,
target_principal=TARGET_PRINCIPAL,
+ subject=None,
iam_endpoint_override=None,
):
@@ -133,6 +145,7 @@ class TestImpersonatedCredentials(object):
target_scopes=self.TARGET_SCOPES,
delegates=self.DELEGATES,
lifetime=lifetime,
+ subject=subject,
iam_endpoint_override=iam_endpoint_override,
)
@@ -240,6 +253,28 @@ class TestImpersonatedCredentials(object):
)
@pytest.mark.parametrize("use_data_bytes", [True, False])
+ def test_refresh_with_subject_success(self, use_data_bytes, mock_dwd_credentials):
+ credentials = self.make_credentials(subject="[email protected]", lifetime=None)
+
+ response_body = {"signedJwt": "example_signed_jwt"}
+
+ request = self.make_request(
+ data=json.dumps(response_body),
+ status=http_client.OK,
+ use_data_bytes=use_data_bytes,
+ )
+
+ with mock.patch(
+ "google.auth.metrics.token_request_access_token_impersonate",
+ return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
+ ):
+ credentials.refresh(request)
+
+ assert credentials.valid
+ assert not credentials.expired
+ assert credentials.token == "1/fFAGRNJasdfz70BzhT3Zg"
+
+ @pytest.mark.parametrize("use_data_bytes", [True, False])
def test_refresh_success_nonGdu(self, use_data_bytes, mock_donor_credentials):
source_credentials = service_account.Credentials(
SIGNER, "[email protected]", TOKEN_URI, universe_domain="foo.bar"
@@ -419,6 +454,33 @@ class TestImpersonatedCredentials(object):
assert not credentials.valid
assert credentials.expired
+ def test_refresh_failure_subject_with_nondefault_domain(
+ self, mock_donor_credentials
+ ):
+ source_credentials = service_account.Credentials(
+ SIGNER, "[email protected]", TOKEN_URI, universe_domain="foo.bar"
+ )
+ credentials = self.make_credentials(
+ source_credentials=source_credentials, subject="[email protected]"
+ )
+
+ expire_time = (_helpers.utcnow().replace(microsecond=0)).isoformat("T") + "Z"
+ response_body = {"accessToken": "token", "expireTime": expire_time}
+ request = self.make_request(
+ data=json.dumps(response_body), status=http_client.OK
+ )
+
+ with pytest.raises(exceptions.GoogleAuthError) as excinfo:
+ credentials.refresh(request)
+
+ assert excinfo.match(
+ "Domain-wide delegation is not supported in universes other "
+ + "than googleapis.com"
+ )
+
+ assert not credentials.valid
+ assert credentials.expired
+
def test_expired(self):
credentials = self.make_credentials(lifetime=None)
assert credentials.expired
@@ -811,3 +873,61 @@ class TestImpersonatedCredentials(object):
id_creds.refresh(request)
assert id_creds.quota_project_id == "project-foo"
+
+ def test_sign_jwt_request_success(self):
+ principal = "[email protected]"
+ expected_signed_jwt = "correct_signed_jwt"
+
+ response_body = {"keyId": "1", "signedJwt": expected_signed_jwt}
+ request = self.make_request(
+ data=json.dumps(response_body), status=http_client.OK
+ )
+
+ signed_jwt = impersonated_credentials._sign_jwt_request(
+ request=request, principal=principal, headers={}, payload={}
+ )
+
+ assert signed_jwt == expected_signed_jwt
+ request.assert_called_once_with(
+ url="https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]:signJwt",
+ method="POST",
+ headers={},
+ body=json.dumps({"delegates": [], "payload": json.dumps({})}).encode(
+ "utf-8"
+ ),
+ )
+
+ def test_sign_jwt_request_http_error(self):
+ principal = "[email protected]"
+
+ request = self.make_request(
+ data="error_message", status=http_client.BAD_REQUEST
+ )
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ _ = impersonated_credentials._sign_jwt_request(
+ request=request, principal=principal, headers={}, payload={}
+ )
+
+ assert excinfo.match(impersonated_credentials._REFRESH_ERROR)
+
+ assert excinfo.value.args[0] == "Unable to acquire impersonated credentials"
+ assert excinfo.value.args[1] == "error_message"
+
+ def test_sign_jwt_request_invalid_response_error(self):
+ principal = "[email protected]"
+
+ request = self.make_request(data="invalid_data", status=http_client.OK)
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ _ = impersonated_credentials._sign_jwt_request(
+ request=request, principal=principal, headers={}, payload={}
+ )
+
+ assert excinfo.match(impersonated_credentials._REFRESH_ERROR)
+
+ assert (
+ excinfo.value.args[0]
+ == "Unable to acquire impersonated credentials: No signed JWT in response."
+ )
+ assert excinfo.value.args[1] == "invalid_data"