diff options
author | robot-piglet <[email protected]> | 2025-10-14 13:58:45 +0300 |
---|---|---|
committer | robot-piglet <[email protected]> | 2025-10-14 14:26:06 +0300 |
commit | 09cc5fe0eb0747ac9ce1444c9acc944838a8cfa2 (patch) | |
tree | 61e19f6a3c904d77e58ff647f4c9473378d6954b /contrib/python/google-auth/py3/tests/compute_engine | |
parent | e9146d8a4d0ee112c89906f9fc8ce23b92250439 (diff) |
Intermediate changes
commit_hash:bc75ab7ba0ee5a6571045c99062e8d4a996d16dd
Diffstat (limited to 'contrib/python/google-auth/py3/tests/compute_engine')
-rw-r--r-- | contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py | 384 |
1 files changed, 381 insertions, 3 deletions
diff --git a/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py b/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py index 03fe845b1f3..1c77069938d 100644 --- a/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py +++ b/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py @@ -13,17 +13,20 @@ # limitations under the License. import base64 import datetime +import os import mock import pytest # type: ignore import responses # type: ignore from google.auth import _helpers +from google.auth import environment_vars from google.auth import exceptions from google.auth import jwt from google.auth import transport from google.auth.compute_engine import credentials from google.auth.transport import requests +from google.oauth2 import _client SAMPLE_ID_TOKEN_EXP = 1584393400 @@ -49,7 +52,6 @@ ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE = ( ID_TOKEN_REQUEST_METRICS_HEADER_VALUE = ( "gl-python/3.7 auth/1.1 auth-request-type/it cred-type/mds" ) - FAKE_SERVICE_ACCOUNT_EMAIL = "[email protected]" FAKE_QUOTA_PROJECT_ID = "fake-quota-project" FAKE_SCOPES = ["scope1", "scope2"] @@ -60,6 +62,9 @@ FAKE_UNIVERSE_DOMAIN = "fake-universe-domain" class TestCredentials(object): credentials = None credentials_with_all_fields = None + VALID_TRUST_BOUNDARY = {"encodedLocations": "valid-encoded-locations"} + NO_OP_TRUST_BOUNDARY = {"encodedLocations": ""} + EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/default/allowedLocations" @pytest.fixture(autouse=True) def credentials_fixture(self): @@ -174,6 +179,18 @@ class TestCredentials(object): } @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + def test_refresh_no_email(self, get): + get.return_value = { + # No "email" field. + "scopes": ["one", "two"] + } + + with pytest.raises(exceptions.RefreshError) as excinfo: + self.credentials.refresh(None) + + assert excinfo.match(r"missing 'email' field") + + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) def test_refresh_error(self, get): get.side_effect = exceptions.TransportError("http error") @@ -241,6 +258,18 @@ class TestCredentials(object): assert creds.universe_domain == "universe_domain" assert creds._universe_domain_cached + def test_with_trust_boundary(self): + creds = self.credentials_with_all_fields + new_boundary = {"encodedLocations": "new_boundary"} + new_creds = creds.with_trust_boundary(new_boundary) + + assert new_creds is not creds + assert new_creds._trust_boundary == new_boundary + assert new_creds._service_account_email == creds._service_account_email + assert new_creds._quota_project_id == creds._quota_project_id + assert new_creds._scopes == creds._scopes + assert new_creds._default_scopes == creds._default_scopes + def test_token_usage_metrics(self): self.credentials.token = "token" self.credentials.expiry = None @@ -280,6 +309,355 @@ class TestCredentials(object): # domain endpoint. get_universe_domain.assert_not_called() + @mock.patch("google.oauth2._client._lookup_trust_boundary", autospec=True) + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + def test_refresh_trust_boundary_lookup_skipped_if_env_var_not_true( + self, mock_metadata_get, mock_lookup_tb + ): + creds = self.credentials + request = mock.Mock() + + mock_metadata_get.side_effect = [ + # from _retrieve_info + {"email": "default", "scopes": ["scope1"]}, + # from get_service_account_token + {"access_token": "mock_token", "expires_in": 3600}, + ] + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "false"} + ): + creds.refresh(request) + + mock_lookup_tb.assert_not_called() + assert creds._trust_boundary is None + + @mock.patch("google.oauth2._client._lookup_trust_boundary", autospec=True) + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + def test_refresh_trust_boundary_lookup_skipped_if_env_var_missing( + self, mock_metadata_get, mock_lookup_tb + ): + creds = self.credentials + request = mock.Mock() + + mock_metadata_get.side_effect = [ + # from _retrieve_info + {"email": "default", "scopes": ["scope1"]}, + # from get_service_account_token + {"access_token": "mock_token", "expires_in": 3600}, + ] + + with mock.patch.dict(os.environ, clear=True): + creds.refresh(request) + + mock_lookup_tb.assert_not_called() + assert creds._trust_boundary is None + + @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + def test_refresh_trust_boundary_lookup_success( + self, mock_metadata_get, mock_lookup_tb + ): + mock_lookup_tb.return_value = { + "locations": ["us-central1"], + "encodedLocations": "0xABC", + } + creds = self.credentials + request = mock.Mock() + + # The first call to _metadata.get is for service account info, the second + # for the access token, and the third for the universe domain. + mock_metadata_get.side_effect = [ + # from _retrieve_info + {"email": "[email protected]", "scopes": ["scope1"]}, + # from get_service_account_token + {"access_token": "mock_token", "expires_in": 3600}, + # from get_universe_domain + "", + ] + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + creds.refresh(request) + + # Verify _metadata.get was called three times. + assert mock_metadata_get.call_count == 3 + # Verify lookup_trust_boundary was called with correct URL and token + expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]/allowedLocations" + mock_lookup_tb.assert_called_once_with( + request, expected_url, headers={"authorization": "Bearer mock_token"} + ) + # Verify trust boundary was set + assert creds._trust_boundary == { + "locations": ["us-central1"], + "encodedLocations": "0xABC", + } + + # Verify x-allowed-locations header is set by apply() + headers_applied = {} + creds.apply(headers_applied) + assert headers_applied["x-allowed-locations"] == "0xABC" + + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) + def test_refresh_trust_boundary_lookup_fails_no_cache( + self, mock_lookup_tb, mock_metadata_get + ): + mock_lookup_tb.side_effect = exceptions.RefreshError("Lookup failed") + creds = self.credentials + request = mock.Mock() + + # Mock metadata calls for token, universe domain, and service account info + mock_metadata_get.side_effect = [ + # from _retrieve_info + {"email": "[email protected]", "scopes": ["scope1"]}, + # from get_service_account_token + {"access_token": "mock_token", "expires_in": 3600}, + # from get_universe_domain + "", + ] + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + with pytest.raises(exceptions.RefreshError, match="Lookup failed"): + creds.refresh(request) + + assert creds._trust_boundary is None + assert mock_metadata_get.call_count == 3 + mock_lookup_tb.assert_called_once() + + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) + def test_refresh_trust_boundary_lookup_fails_with_cached_data( + self, mock_lookup_tb, mock_metadata_get + ): + # First refresh: Successfully fetch a valid trust boundary. + mock_lookup_tb.return_value = { + "locations": ["us-central1"], + "encodedLocations": "0xABC", + } + mock_metadata_get.side_effect = [ + # from _retrieve_info + {"email": "[email protected]", "scopes": ["scope1"]}, + # from get_service_account_token + {"access_token": "mock_token_1", "expires_in": 3600}, + # from get_universe_domain + "", + ] + creds = self.credentials + request = mock.Mock() + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + creds.refresh(request) + + assert creds._trust_boundary == { + "locations": ["us-central1"], + "encodedLocations": "0xABC", + } + mock_lookup_tb.assert_called_once() + + # Second refresh: Mock lookup to fail, but expect cached data to be preserved. + mock_lookup_tb.reset_mock() + mock_lookup_tb.side_effect = exceptions.RefreshError("Lookup failed") + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + # This refresh should not raise an error because a cached value exists. + mock_metadata_get.reset_mock() + mock_metadata_get.side_effect = [ + # from _retrieve_info + {"email": "[email protected]", "scopes": ["scope1"]}, + # from get_service_account_token + {"access_token": "mock_token_2", "expires_in": 3600}, + # from get_universe_domain + "", + ] + creds.refresh(request) + + assert creds._trust_boundary == { + "locations": ["us-central1"], + "encodedLocations": "0xABC", + } + mock_lookup_tb.assert_called_once() + + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) + def test_refresh_fetches_no_op_trust_boundary( + self, mock_lookup_tb, mock_metadata_get + ): + mock_lookup_tb.return_value = {"locations": [], "encodedLocations": "0x0"} + creds = self.credentials + request = mock.Mock() + + mock_metadata_get.side_effect = [ + # from _retrieve_info + {"email": "[email protected]", "scopes": ["scope1"]}, + # from get_service_account_token + {"access_token": "mock_token", "expires_in": 3600}, + # from get_universe_domain + "", + ] + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + creds.refresh(request) + + assert creds._trust_boundary == {"locations": [], "encodedLocations": "0x0"} + assert mock_metadata_get.call_count == 3 + expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]/allowedLocations" + mock_lookup_tb.assert_called_once_with( + request, expected_url, headers={"authorization": "Bearer mock_token"} + ) + # Verify that an empty header was added. + headers_applied = {} + creds.apply(headers_applied) + assert headers_applied["x-allowed-locations"] == "" + + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) + def test_refresh_starts_with_no_op_trust_boundary_skips_lookup( + self, mock_lookup_tb, mock_metadata_get + ): + creds = self.credentials + # Use pre-cache universe domain to avoid an extra metadata call. + creds._universe_domain_cached = True + creds._trust_boundary = {"locations": [], "encodedLocations": "0x0"} + request = mock.Mock() + + mock_metadata_get.side_effect = [ + # from _retrieve_info + {"email": "[email protected]", "scopes": ["scope1"]}, + # from get_service_account_token + {"access_token": "mock_token", "expires_in": 3600}, + ] + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + creds.refresh(request) + + # Verify trust boundary remained NO_OP + assert creds._trust_boundary == {"locations": [], "encodedLocations": "0x0"} + # Lookup should be skipped + mock_lookup_tb.assert_not_called() + # Two metadata calls for token refresh should have happened. + assert mock_metadata_get.call_count == 2 + + # Verify that an empty header was added. + headers_applied = {} + creds.apply(headers_applied) + assert headers_applied["x-allowed-locations"] == "" + + @mock.patch( + "google.auth.compute_engine._metadata.get_service_account_info", autospec=True + ) + @mock.patch( + "google.auth.compute_engine._metadata.get_universe_domain", autospec=True + ) + def test_build_trust_boundary_lookup_url_default_email( + self, mock_get_universe_domain, mock_get_service_account_info + ): + # Test with default service account email, which needs resolution + creds = self.credentials + creds._service_account_email = "default" + mock_get_service_account_info.return_value = { + "email": "[email protected]" + } + mock_get_universe_domain.return_value = "googleapis.com" + + url = creds._build_trust_boundary_lookup_url() + + mock_get_service_account_info.assert_called_once_with(mock.ANY, "default") + mock_get_universe_domain.assert_called_once_with(mock.ANY) + assert url == ( + "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]/allowedLocations" + ) + + @mock.patch( + "google.auth.compute_engine._metadata.get_service_account_info", autospec=True + ) + @mock.patch( + "google.auth.compute_engine._metadata.get_universe_domain", autospec=True + ) + def test_build_trust_boundary_lookup_url_explicit_email( + self, mock_get_universe_domain, mock_get_service_account_info + ): + # Test with an explicit service account email, no resolution needed + creds = self.credentials + creds._service_account_email = FAKE_SERVICE_ACCOUNT_EMAIL + mock_get_universe_domain.return_value = "googleapis.com" + + url = creds._build_trust_boundary_lookup_url() + + mock_get_service_account_info.assert_not_called() + mock_get_universe_domain.assert_called_once_with(mock.ANY) + assert url == ( + "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]/allowedLocations" + ) + + @mock.patch( + "google.auth.compute_engine._metadata.get_service_account_info", autospec=True + ) + @mock.patch( + "google.auth.compute_engine._metadata.get_universe_domain", autospec=True + ) + def test_build_trust_boundary_lookup_url_non_default_universe( + self, mock_get_universe_domain, mock_get_service_account_info + ): + # Test with a non-default universe domain + creds = self.credentials_with_all_fields + + url = creds._build_trust_boundary_lookup_url() + + # Universe domain is cached and email is explicit, so no metadata calls needed. + mock_get_service_account_info.assert_not_called() + mock_get_universe_domain.assert_not_called() + assert url == ( + "https://iamcredentials.fake-universe-domain/v1/projects/-/serviceAccounts/[email protected]/allowedLocations" + ) + + @mock.patch( + "google.auth.compute_engine._metadata.get_service_account_info", autospec=True + ) + def test_build_trust_boundary_lookup_url_get_service_account_info_fails( + self, mock_get_service_account_info + ): + # Test scenario where get_service_account_info fails + mock_get_service_account_info.side_effect = exceptions.TransportError( + "Failed to get info" + ) + creds = self.credentials + creds._service_account_email = "default" + + with pytest.raises( + exceptions.RefreshError, + match=r"Failed to get service account email for trust boundary lookup: .*", + ): + creds._build_trust_boundary_lookup_url() + + @mock.patch( + "google.auth.compute_engine._metadata.get_service_account_info", autospec=True + ) + def test_build_trust_boundary_lookup_url_no_email( + self, mock_get_service_account_info + ): + # Test with default service account email, which needs resolution, but metadata + # returns no email. + creds = self.credentials + creds._service_account_email = "default" + mock_get_service_account_info.return_value = {"scopes": ["one", "two"]} + + with pytest.raises(exceptions.RefreshError) as excinfo: + creds._build_trust_boundary_lookup_url() + + assert excinfo.match(r"missing 'email' field") + class TestIDTokenCredentials(object): credentials = None @@ -466,7 +844,7 @@ class TestIDTokenCredentials(object): @responses.activate def test_with_target_audience_integration(self): - """ Test that it is possible to refresh credentials + """Test that it is possible to refresh credentials generated from `with_target_audience`. Instead of mocking the methods, the HTTP responses @@ -634,7 +1012,7 @@ class TestIDTokenCredentials(object): @responses.activate def test_with_quota_project_integration(self): - """ Test that it is possible to refresh credentials + """Test that it is possible to refresh credentials generated from `with_quota_project`. Instead of mocking the methods, the HTTP responses |