diff options
author | robot-piglet <[email protected]> | 2025-02-08 20:17:29 +0300 |
---|---|---|
committer | robot-piglet <[email protected]> | 2025-02-08 20:32:11 +0300 |
commit | 6b7c255668de517dff6462bd377d345d240f8a67 (patch) | |
tree | 1bde953b7f2b6d9e8efd72a0ceebfa0a791a024c /contrib/python/google-auth/py3/tests | |
parent | 2309a9980fd82ba7df5a21876c790e7e4d776ded (diff) |
Intermediate changes
commit_hash:f4cb1bdccfb534d71b7f461fc8f8e5656c47bfa5
Diffstat (limited to 'contrib/python/google-auth/py3/tests')
-rw-r--r-- | contrib/python/google-auth/py3/tests/compute_engine/test__metadata.py | 24 | ||||
-rw-r--r-- | contrib/python/google-auth/py3/tests/test_impersonated_credentials.py | 120 |
2 files changed, 142 insertions, 2 deletions
diff --git a/contrib/python/google-auth/py3/tests/compute_engine/test__metadata.py b/contrib/python/google-auth/py3/tests/compute_engine/test__metadata.py index 03ba8de497d..a768b17fa0d 100644 --- a/contrib/python/google-auth/py3/tests/compute_engine/test__metadata.py +++ b/contrib/python/google-auth/py3/tests/compute_engine/test__metadata.py @@ -346,12 +346,32 @@ def test_get_return_none_for_not_found_error(): @mock.patch("time.sleep", return_value=None) def test_get_failure_connection_failed(mock_sleep): request = make_request("") - request.side_effect = exceptions.TransportError() + request.side_effect = exceptions.TransportError("failure message") with pytest.raises(exceptions.TransportError) as excinfo: _metadata.get(request, PATH) - assert excinfo.match(r"Compute Engine Metadata server unavailable") + assert excinfo.match( + r"Compute Engine Metadata server unavailable due to failure message" + ) + + request.assert_called_with( + method="GET", + url=_metadata._METADATA_ROOT + PATH, + headers=_metadata._METADATA_HEADERS, + ) + assert request.call_count == 5 + + +def test_get_too_many_requests_retryable_error_failure(): + request = make_request("too many requests", status=http_client.TOO_MANY_REQUESTS) + + with pytest.raises(exceptions.TransportError) as excinfo: + _metadata.get(request, PATH) + + assert excinfo.match( + r"Compute Engine Metadata server unavailable due to too many requests" + ) request.assert_called_with( method="GET", diff --git a/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py b/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py index 371477b8a9f..0321a1a1d7b 100644 --- a/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py +++ b/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py @@ -72,6 +72,17 @@ def mock_donor_credentials(): yield grant +def mock_dwd_credentials(): + with mock.patch("google.oauth2._client.jwt_grant", autospec=True) as grant: + grant.return_value = ( + "1/fFAGRNJasdfz70BzhT3Zg", + _helpers.utcnow() + datetime.timedelta(seconds=500), + {}, + ) + yield grant + + class MockResponse: def __init__(self, json_data, status_code): self.json_data = json_data @@ -124,6 +135,7 @@ class TestImpersonatedCredentials(object): source_credentials=SOURCE_CREDENTIALS, lifetime=LIFETIME, target_principal=TARGET_PRINCIPAL, + subject=None, iam_endpoint_override=None, ): @@ -133,6 +145,7 @@ class TestImpersonatedCredentials(object): target_scopes=self.TARGET_SCOPES, delegates=self.DELEGATES, lifetime=lifetime, + subject=subject, iam_endpoint_override=iam_endpoint_override, ) @@ -240,6 +253,28 @@ class TestImpersonatedCredentials(object): ) @pytest.mark.parametrize("use_data_bytes", [True, False]) + def test_refresh_with_subject_success(self, use_data_bytes, mock_dwd_credentials): + credentials = self.make_credentials(subject="[email protected]", lifetime=None) + + response_body = {"signedJwt": "example_signed_jwt"} + + request = self.make_request( + data=json.dumps(response_body), + status=http_client.OK, + use_data_bytes=use_data_bytes, + ) + + with mock.patch( + "google.auth.metrics.token_request_access_token_impersonate", + return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, + ): + credentials.refresh(request) + + assert credentials.valid + assert not credentials.expired + assert credentials.token == "1/fFAGRNJasdfz70BzhT3Zg" + + @pytest.mark.parametrize("use_data_bytes", [True, False]) def test_refresh_success_nonGdu(self, use_data_bytes, mock_donor_credentials): source_credentials = service_account.Credentials( SIGNER, "[email protected]", TOKEN_URI, universe_domain="foo.bar" @@ -419,6 +454,33 @@ class TestImpersonatedCredentials(object): assert not credentials.valid assert credentials.expired + def test_refresh_failure_subject_with_nondefault_domain( + self, mock_donor_credentials + ): + source_credentials = service_account.Credentials( + SIGNER, "[email protected]", TOKEN_URI, universe_domain="foo.bar" + ) + credentials = self.make_credentials( + source_credentials=source_credentials, subject="[email protected]" + ) + + expire_time = (_helpers.utcnow().replace(microsecond=0)).isoformat("T") + "Z" + response_body = {"accessToken": "token", "expireTime": expire_time} + request = self.make_request( + data=json.dumps(response_body), status=http_client.OK + ) + + with pytest.raises(exceptions.GoogleAuthError) as excinfo: + credentials.refresh(request) + + assert excinfo.match( + "Domain-wide delegation is not supported in universes other " + + "than googleapis.com" + ) + + assert not credentials.valid + assert credentials.expired + def test_expired(self): credentials = self.make_credentials(lifetime=None) assert credentials.expired @@ -811,3 +873,61 @@ class TestImpersonatedCredentials(object): id_creds.refresh(request) assert id_creds.quota_project_id == "project-foo" + + def test_sign_jwt_request_success(self): + principal = "[email protected]" + expected_signed_jwt = "correct_signed_jwt" + + response_body = {"keyId": "1", "signedJwt": expected_signed_jwt} + request = self.make_request( + data=json.dumps(response_body), status=http_client.OK + ) + + signed_jwt = impersonated_credentials._sign_jwt_request( + request=request, principal=principal, headers={}, payload={} + ) + + assert signed_jwt == expected_signed_jwt + request.assert_called_once_with( + url="https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]:signJwt", + method="POST", + headers={}, + body=json.dumps({"delegates": [], "payload": json.dumps({})}).encode( + "utf-8" + ), + ) + + def test_sign_jwt_request_http_error(self): + principal = "[email protected]" + + request = self.make_request( + data="error_message", status=http_client.BAD_REQUEST + ) + + with pytest.raises(exceptions.RefreshError) as excinfo: + _ = impersonated_credentials._sign_jwt_request( + request=request, principal=principal, headers={}, payload={} + ) + + assert excinfo.match(impersonated_credentials._REFRESH_ERROR) + + assert excinfo.value.args[0] == "Unable to acquire impersonated credentials" + assert excinfo.value.args[1] == "error_message" + + def test_sign_jwt_request_invalid_response_error(self): + principal = "[email protected]" + + request = self.make_request(data="invalid_data", status=http_client.OK) + + with pytest.raises(exceptions.RefreshError) as excinfo: + _ = impersonated_credentials._sign_jwt_request( + request=request, principal=principal, headers={}, payload={} + ) + + assert excinfo.match(impersonated_credentials._REFRESH_ERROR) + + assert ( + excinfo.value.args[0] + == "Unable to acquire impersonated credentials: No signed JWT in response." + ) + assert excinfo.value.args[1] == "invalid_data" |