summaryrefslogtreecommitdiffstats
path: root/contrib/python/google-auth/py3/tests/compute_engine
diff options
context:
space:
mode:
authorrobot-piglet <[email protected]>2025-10-14 13:58:45 +0300
committerrobot-piglet <[email protected]>2025-10-14 14:26:06 +0300
commit09cc5fe0eb0747ac9ce1444c9acc944838a8cfa2 (patch)
tree61e19f6a3c904d77e58ff647f4c9473378d6954b /contrib/python/google-auth/py3/tests/compute_engine
parente9146d8a4d0ee112c89906f9fc8ce23b92250439 (diff)
Intermediate changes
commit_hash:bc75ab7ba0ee5a6571045c99062e8d4a996d16dd
Diffstat (limited to 'contrib/python/google-auth/py3/tests/compute_engine')
-rw-r--r--contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py384
1 files changed, 381 insertions, 3 deletions
diff --git a/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py b/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py
index 03fe845b1f3..1c77069938d 100644
--- a/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py
+++ b/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py
@@ -13,17 +13,20 @@
# limitations under the License.
import base64
import datetime
+import os
import mock
import pytest # type: ignore
import responses # type: ignore
from google.auth import _helpers
+from google.auth import environment_vars
from google.auth import exceptions
from google.auth import jwt
from google.auth import transport
from google.auth.compute_engine import credentials
from google.auth.transport import requests
+from google.oauth2 import _client
SAMPLE_ID_TOKEN_EXP = 1584393400
@@ -49,7 +52,6 @@ ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE = (
ID_TOKEN_REQUEST_METRICS_HEADER_VALUE = (
"gl-python/3.7 auth/1.1 auth-request-type/it cred-type/mds"
)
-
FAKE_SERVICE_ACCOUNT_EMAIL = "[email protected]"
FAKE_QUOTA_PROJECT_ID = "fake-quota-project"
FAKE_SCOPES = ["scope1", "scope2"]
@@ -60,6 +62,9 @@ FAKE_UNIVERSE_DOMAIN = "fake-universe-domain"
class TestCredentials(object):
credentials = None
credentials_with_all_fields = None
+ VALID_TRUST_BOUNDARY = {"encodedLocations": "valid-encoded-locations"}
+ NO_OP_TRUST_BOUNDARY = {"encodedLocations": ""}
+ EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/default/allowedLocations"
@pytest.fixture(autouse=True)
def credentials_fixture(self):
@@ -174,6 +179,18 @@ class TestCredentials(object):
}
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+ def test_refresh_no_email(self, get):
+ get.return_value = {
+ # No "email" field.
+ "scopes": ["one", "two"]
+ }
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ self.credentials.refresh(None)
+
+ assert excinfo.match(r"missing 'email' field")
+
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
def test_refresh_error(self, get):
get.side_effect = exceptions.TransportError("http error")
@@ -241,6 +258,18 @@ class TestCredentials(object):
assert creds.universe_domain == "universe_domain"
assert creds._universe_domain_cached
+ def test_with_trust_boundary(self):
+ creds = self.credentials_with_all_fields
+ new_boundary = {"encodedLocations": "new_boundary"}
+ new_creds = creds.with_trust_boundary(new_boundary)
+
+ assert new_creds is not creds
+ assert new_creds._trust_boundary == new_boundary
+ assert new_creds._service_account_email == creds._service_account_email
+ assert new_creds._quota_project_id == creds._quota_project_id
+ assert new_creds._scopes == creds._scopes
+ assert new_creds._default_scopes == creds._default_scopes
+
def test_token_usage_metrics(self):
self.credentials.token = "token"
self.credentials.expiry = None
@@ -280,6 +309,355 @@ class TestCredentials(object):
# domain endpoint.
get_universe_domain.assert_not_called()
+ @mock.patch("google.oauth2._client._lookup_trust_boundary", autospec=True)
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+ def test_refresh_trust_boundary_lookup_skipped_if_env_var_not_true(
+ self, mock_metadata_get, mock_lookup_tb
+ ):
+ creds = self.credentials
+ request = mock.Mock()
+
+ mock_metadata_get.side_effect = [
+ # from _retrieve_info
+ {"email": "default", "scopes": ["scope1"]},
+ # from get_service_account_token
+ {"access_token": "mock_token", "expires_in": 3600},
+ ]
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "false"}
+ ):
+ creds.refresh(request)
+
+ mock_lookup_tb.assert_not_called()
+ assert creds._trust_boundary is None
+
+ @mock.patch("google.oauth2._client._lookup_trust_boundary", autospec=True)
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+ def test_refresh_trust_boundary_lookup_skipped_if_env_var_missing(
+ self, mock_metadata_get, mock_lookup_tb
+ ):
+ creds = self.credentials
+ request = mock.Mock()
+
+ mock_metadata_get.side_effect = [
+ # from _retrieve_info
+ {"email": "default", "scopes": ["scope1"]},
+ # from get_service_account_token
+ {"access_token": "mock_token", "expires_in": 3600},
+ ]
+
+ with mock.patch.dict(os.environ, clear=True):
+ creds.refresh(request)
+
+ mock_lookup_tb.assert_not_called()
+ assert creds._trust_boundary is None
+
+ @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True)
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+ def test_refresh_trust_boundary_lookup_success(
+ self, mock_metadata_get, mock_lookup_tb
+ ):
+ mock_lookup_tb.return_value = {
+ "locations": ["us-central1"],
+ "encodedLocations": "0xABC",
+ }
+ creds = self.credentials
+ request = mock.Mock()
+
+ # The first call to _metadata.get is for service account info, the second
+ # for the access token, and the third for the universe domain.
+ mock_metadata_get.side_effect = [
+ # from _retrieve_info
+ {"email": "[email protected]", "scopes": ["scope1"]},
+ # from get_service_account_token
+ {"access_token": "mock_token", "expires_in": 3600},
+ # from get_universe_domain
+ "",
+ ]
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ creds.refresh(request)
+
+ # Verify _metadata.get was called three times.
+ assert mock_metadata_get.call_count == 3
+ # Verify lookup_trust_boundary was called with correct URL and token
+ expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]/allowedLocations"
+ mock_lookup_tb.assert_called_once_with(
+ request, expected_url, headers={"authorization": "Bearer mock_token"}
+ )
+ # Verify trust boundary was set
+ assert creds._trust_boundary == {
+ "locations": ["us-central1"],
+ "encodedLocations": "0xABC",
+ }
+
+ # Verify x-allowed-locations header is set by apply()
+ headers_applied = {}
+ creds.apply(headers_applied)
+ assert headers_applied["x-allowed-locations"] == "0xABC"
+
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+ @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True)
+ def test_refresh_trust_boundary_lookup_fails_no_cache(
+ self, mock_lookup_tb, mock_metadata_get
+ ):
+ mock_lookup_tb.side_effect = exceptions.RefreshError("Lookup failed")
+ creds = self.credentials
+ request = mock.Mock()
+
+ # Mock metadata calls for token, universe domain, and service account info
+ mock_metadata_get.side_effect = [
+ # from _retrieve_info
+ {"email": "[email protected]", "scopes": ["scope1"]},
+ # from get_service_account_token
+ {"access_token": "mock_token", "expires_in": 3600},
+ # from get_universe_domain
+ "",
+ ]
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ with pytest.raises(exceptions.RefreshError, match="Lookup failed"):
+ creds.refresh(request)
+
+ assert creds._trust_boundary is None
+ assert mock_metadata_get.call_count == 3
+ mock_lookup_tb.assert_called_once()
+
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+ @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True)
+ def test_refresh_trust_boundary_lookup_fails_with_cached_data(
+ self, mock_lookup_tb, mock_metadata_get
+ ):
+ # First refresh: Successfully fetch a valid trust boundary.
+ mock_lookup_tb.return_value = {
+ "locations": ["us-central1"],
+ "encodedLocations": "0xABC",
+ }
+ mock_metadata_get.side_effect = [
+ # from _retrieve_info
+ {"email": "[email protected]", "scopes": ["scope1"]},
+ # from get_service_account_token
+ {"access_token": "mock_token_1", "expires_in": 3600},
+ # from get_universe_domain
+ "",
+ ]
+ creds = self.credentials
+ request = mock.Mock()
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ creds.refresh(request)
+
+ assert creds._trust_boundary == {
+ "locations": ["us-central1"],
+ "encodedLocations": "0xABC",
+ }
+ mock_lookup_tb.assert_called_once()
+
+ # Second refresh: Mock lookup to fail, but expect cached data to be preserved.
+ mock_lookup_tb.reset_mock()
+ mock_lookup_tb.side_effect = exceptions.RefreshError("Lookup failed")
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ # This refresh should not raise an error because a cached value exists.
+ mock_metadata_get.reset_mock()
+ mock_metadata_get.side_effect = [
+ # from _retrieve_info
+ {"email": "[email protected]", "scopes": ["scope1"]},
+ # from get_service_account_token
+ {"access_token": "mock_token_2", "expires_in": 3600},
+ # from get_universe_domain
+ "",
+ ]
+ creds.refresh(request)
+
+ assert creds._trust_boundary == {
+ "locations": ["us-central1"],
+ "encodedLocations": "0xABC",
+ }
+ mock_lookup_tb.assert_called_once()
+
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+ @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True)
+ def test_refresh_fetches_no_op_trust_boundary(
+ self, mock_lookup_tb, mock_metadata_get
+ ):
+ mock_lookup_tb.return_value = {"locations": [], "encodedLocations": "0x0"}
+ creds = self.credentials
+ request = mock.Mock()
+
+ mock_metadata_get.side_effect = [
+ # from _retrieve_info
+ {"email": "[email protected]", "scopes": ["scope1"]},
+ # from get_service_account_token
+ {"access_token": "mock_token", "expires_in": 3600},
+ # from get_universe_domain
+ "",
+ ]
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ creds.refresh(request)
+
+ assert creds._trust_boundary == {"locations": [], "encodedLocations": "0x0"}
+ assert mock_metadata_get.call_count == 3
+ expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]/allowedLocations"
+ mock_lookup_tb.assert_called_once_with(
+ request, expected_url, headers={"authorization": "Bearer mock_token"}
+ )
+ # Verify that an empty header was added.
+ headers_applied = {}
+ creds.apply(headers_applied)
+ assert headers_applied["x-allowed-locations"] == ""
+
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+ @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True)
+ def test_refresh_starts_with_no_op_trust_boundary_skips_lookup(
+ self, mock_lookup_tb, mock_metadata_get
+ ):
+ creds = self.credentials
+ # Use pre-cache universe domain to avoid an extra metadata call.
+ creds._universe_domain_cached = True
+ creds._trust_boundary = {"locations": [], "encodedLocations": "0x0"}
+ request = mock.Mock()
+
+ mock_metadata_get.side_effect = [
+ # from _retrieve_info
+ {"email": "[email protected]", "scopes": ["scope1"]},
+ # from get_service_account_token
+ {"access_token": "mock_token", "expires_in": 3600},
+ ]
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ creds.refresh(request)
+
+ # Verify trust boundary remained NO_OP
+ assert creds._trust_boundary == {"locations": [], "encodedLocations": "0x0"}
+ # Lookup should be skipped
+ mock_lookup_tb.assert_not_called()
+ # Two metadata calls for token refresh should have happened.
+ assert mock_metadata_get.call_count == 2
+
+ # Verify that an empty header was added.
+ headers_applied = {}
+ creds.apply(headers_applied)
+ assert headers_applied["x-allowed-locations"] == ""
+
+ @mock.patch(
+ "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
+ )
+ @mock.patch(
+ "google.auth.compute_engine._metadata.get_universe_domain", autospec=True
+ )
+ def test_build_trust_boundary_lookup_url_default_email(
+ self, mock_get_universe_domain, mock_get_service_account_info
+ ):
+ # Test with default service account email, which needs resolution
+ creds = self.credentials
+ creds._service_account_email = "default"
+ mock_get_service_account_info.return_value = {
+ "email": "[email protected]"
+ }
+ mock_get_universe_domain.return_value = "googleapis.com"
+
+ url = creds._build_trust_boundary_lookup_url()
+
+ mock_get_service_account_info.assert_called_once_with(mock.ANY, "default")
+ mock_get_universe_domain.assert_called_once_with(mock.ANY)
+ assert url == (
+ "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]/allowedLocations"
+ )
+
+ @mock.patch(
+ "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
+ )
+ @mock.patch(
+ "google.auth.compute_engine._metadata.get_universe_domain", autospec=True
+ )
+ def test_build_trust_boundary_lookup_url_explicit_email(
+ self, mock_get_universe_domain, mock_get_service_account_info
+ ):
+ # Test with an explicit service account email, no resolution needed
+ creds = self.credentials
+ creds._service_account_email = FAKE_SERVICE_ACCOUNT_EMAIL
+ mock_get_universe_domain.return_value = "googleapis.com"
+
+ url = creds._build_trust_boundary_lookup_url()
+
+ mock_get_service_account_info.assert_not_called()
+ mock_get_universe_domain.assert_called_once_with(mock.ANY)
+ assert url == (
+ "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]/allowedLocations"
+ )
+
+ @mock.patch(
+ "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
+ )
+ @mock.patch(
+ "google.auth.compute_engine._metadata.get_universe_domain", autospec=True
+ )
+ def test_build_trust_boundary_lookup_url_non_default_universe(
+ self, mock_get_universe_domain, mock_get_service_account_info
+ ):
+ # Test with a non-default universe domain
+ creds = self.credentials_with_all_fields
+
+ url = creds._build_trust_boundary_lookup_url()
+
+ # Universe domain is cached and email is explicit, so no metadata calls needed.
+ mock_get_service_account_info.assert_not_called()
+ mock_get_universe_domain.assert_not_called()
+ assert url == (
+ "https://iamcredentials.fake-universe-domain/v1/projects/-/serviceAccounts/[email protected]/allowedLocations"
+ )
+
+ @mock.patch(
+ "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
+ )
+ def test_build_trust_boundary_lookup_url_get_service_account_info_fails(
+ self, mock_get_service_account_info
+ ):
+ # Test scenario where get_service_account_info fails
+ mock_get_service_account_info.side_effect = exceptions.TransportError(
+ "Failed to get info"
+ )
+ creds = self.credentials
+ creds._service_account_email = "default"
+
+ with pytest.raises(
+ exceptions.RefreshError,
+ match=r"Failed to get service account email for trust boundary lookup: .*",
+ ):
+ creds._build_trust_boundary_lookup_url()
+
+ @mock.patch(
+ "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
+ )
+ def test_build_trust_boundary_lookup_url_no_email(
+ self, mock_get_service_account_info
+ ):
+ # Test with default service account email, which needs resolution, but metadata
+ # returns no email.
+ creds = self.credentials
+ creds._service_account_email = "default"
+ mock_get_service_account_info.return_value = {"scopes": ["one", "two"]}
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ creds._build_trust_boundary_lookup_url()
+
+ assert excinfo.match(r"missing 'email' field")
+
class TestIDTokenCredentials(object):
credentials = None
@@ -466,7 +844,7 @@ class TestIDTokenCredentials(object):
@responses.activate
def test_with_target_audience_integration(self):
- """ Test that it is possible to refresh credentials
+ """Test that it is possible to refresh credentials
generated from `with_target_audience`.
Instead of mocking the methods, the HTTP responses
@@ -634,7 +1012,7 @@ class TestIDTokenCredentials(object):
@responses.activate
def test_with_quota_project_integration(self):
- """ Test that it is possible to refresh credentials
+ """Test that it is possible to refresh credentials
generated from `with_quota_project`.
Instead of mocking the methods, the HTTP responses