summaryrefslogtreecommitdiffstats
path: root/contrib/python/google-auth/py3/tests
diff options
context:
space:
mode:
authorrobot-piglet <[email protected]>2025-10-14 13:58:45 +0300
committerrobot-piglet <[email protected]>2025-10-14 14:26:06 +0300
commit09cc5fe0eb0747ac9ce1444c9acc944838a8cfa2 (patch)
tree61e19f6a3c904d77e58ff647f4c9473378d6954b /contrib/python/google-auth/py3/tests
parente9146d8a4d0ee112c89906f9fc8ce23b92250439 (diff)
Intermediate changes
commit_hash:bc75ab7ba0ee5a6571045c99062e8d4a996d16dd
Diffstat (limited to 'contrib/python/google-auth/py3/tests')
-rw-r--r--contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py384
-rw-r--r--contrib/python/google-auth/py3/tests/oauth2/test__client.py177
-rw-r--r--contrib/python/google-auth/py3/tests/oauth2/test_service_account.py293
-rw-r--r--contrib/python/google-auth/py3/tests/test__helpers.py29
-rw-r--r--contrib/python/google-auth/py3/tests/test_aws.py10
-rw-r--r--contrib/python/google-auth/py3/tests/test_credentials.py186
-rw-r--r--contrib/python/google-auth/py3/tests/test_external_account.py52
-rw-r--r--contrib/python/google-auth/py3/tests/test_identity_pool.py6
-rw-r--r--contrib/python/google-auth/py3/tests/test_impersonated_credentials.py302
9 files changed, 1361 insertions, 78 deletions
diff --git a/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py b/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py
index 03fe845b1f3..1c77069938d 100644
--- a/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py
+++ b/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py
@@ -13,17 +13,20 @@
# limitations under the License.
import base64
import datetime
+import os
import mock
import pytest # type: ignore
import responses # type: ignore
from google.auth import _helpers
+from google.auth import environment_vars
from google.auth import exceptions
from google.auth import jwt
from google.auth import transport
from google.auth.compute_engine import credentials
from google.auth.transport import requests
+from google.oauth2 import _client
SAMPLE_ID_TOKEN_EXP = 1584393400
@@ -49,7 +52,6 @@ ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE = (
ID_TOKEN_REQUEST_METRICS_HEADER_VALUE = (
"gl-python/3.7 auth/1.1 auth-request-type/it cred-type/mds"
)
-
FAKE_SERVICE_ACCOUNT_EMAIL = "[email protected]"
FAKE_QUOTA_PROJECT_ID = "fake-quota-project"
FAKE_SCOPES = ["scope1", "scope2"]
@@ -60,6 +62,9 @@ FAKE_UNIVERSE_DOMAIN = "fake-universe-domain"
class TestCredentials(object):
credentials = None
credentials_with_all_fields = None
+ VALID_TRUST_BOUNDARY = {"encodedLocations": "valid-encoded-locations"}
+ NO_OP_TRUST_BOUNDARY = {"encodedLocations": ""}
+ EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/default/allowedLocations"
@pytest.fixture(autouse=True)
def credentials_fixture(self):
@@ -174,6 +179,18 @@ class TestCredentials(object):
}
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+ def test_refresh_no_email(self, get):
+ get.return_value = {
+ # No "email" field.
+ "scopes": ["one", "two"]
+ }
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ self.credentials.refresh(None)
+
+ assert excinfo.match(r"missing 'email' field")
+
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
def test_refresh_error(self, get):
get.side_effect = exceptions.TransportError("http error")
@@ -241,6 +258,18 @@ class TestCredentials(object):
assert creds.universe_domain == "universe_domain"
assert creds._universe_domain_cached
+ def test_with_trust_boundary(self):
+ creds = self.credentials_with_all_fields
+ new_boundary = {"encodedLocations": "new_boundary"}
+ new_creds = creds.with_trust_boundary(new_boundary)
+
+ assert new_creds is not creds
+ assert new_creds._trust_boundary == new_boundary
+ assert new_creds._service_account_email == creds._service_account_email
+ assert new_creds._quota_project_id == creds._quota_project_id
+ assert new_creds._scopes == creds._scopes
+ assert new_creds._default_scopes == creds._default_scopes
+
def test_token_usage_metrics(self):
self.credentials.token = "token"
self.credentials.expiry = None
@@ -280,6 +309,355 @@ class TestCredentials(object):
# domain endpoint.
get_universe_domain.assert_not_called()
+ @mock.patch("google.oauth2._client._lookup_trust_boundary", autospec=True)
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+ def test_refresh_trust_boundary_lookup_skipped_if_env_var_not_true(
+ self, mock_metadata_get, mock_lookup_tb
+ ):
+ creds = self.credentials
+ request = mock.Mock()
+
+ mock_metadata_get.side_effect = [
+ # from _retrieve_info
+ {"email": "default", "scopes": ["scope1"]},
+ # from get_service_account_token
+ {"access_token": "mock_token", "expires_in": 3600},
+ ]
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "false"}
+ ):
+ creds.refresh(request)
+
+ mock_lookup_tb.assert_not_called()
+ assert creds._trust_boundary is None
+
+ @mock.patch("google.oauth2._client._lookup_trust_boundary", autospec=True)
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+ def test_refresh_trust_boundary_lookup_skipped_if_env_var_missing(
+ self, mock_metadata_get, mock_lookup_tb
+ ):
+ creds = self.credentials
+ request = mock.Mock()
+
+ mock_metadata_get.side_effect = [
+ # from _retrieve_info
+ {"email": "default", "scopes": ["scope1"]},
+ # from get_service_account_token
+ {"access_token": "mock_token", "expires_in": 3600},
+ ]
+
+ with mock.patch.dict(os.environ, clear=True):
+ creds.refresh(request)
+
+ mock_lookup_tb.assert_not_called()
+ assert creds._trust_boundary is None
+
+ @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True)
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+ def test_refresh_trust_boundary_lookup_success(
+ self, mock_metadata_get, mock_lookup_tb
+ ):
+ mock_lookup_tb.return_value = {
+ "locations": ["us-central1"],
+ "encodedLocations": "0xABC",
+ }
+ creds = self.credentials
+ request = mock.Mock()
+
+ # The first call to _metadata.get is for service account info, the second
+ # for the access token, and the third for the universe domain.
+ mock_metadata_get.side_effect = [
+ # from _retrieve_info
+ {"email": "[email protected]", "scopes": ["scope1"]},
+ # from get_service_account_token
+ {"access_token": "mock_token", "expires_in": 3600},
+ # from get_universe_domain
+ "",
+ ]
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ creds.refresh(request)
+
+ # Verify _metadata.get was called three times.
+ assert mock_metadata_get.call_count == 3
+ # Verify lookup_trust_boundary was called with correct URL and token
+ expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]/allowedLocations"
+ mock_lookup_tb.assert_called_once_with(
+ request, expected_url, headers={"authorization": "Bearer mock_token"}
+ )
+ # Verify trust boundary was set
+ assert creds._trust_boundary == {
+ "locations": ["us-central1"],
+ "encodedLocations": "0xABC",
+ }
+
+ # Verify x-allowed-locations header is set by apply()
+ headers_applied = {}
+ creds.apply(headers_applied)
+ assert headers_applied["x-allowed-locations"] == "0xABC"
+
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+ @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True)
+ def test_refresh_trust_boundary_lookup_fails_no_cache(
+ self, mock_lookup_tb, mock_metadata_get
+ ):
+ mock_lookup_tb.side_effect = exceptions.RefreshError("Lookup failed")
+ creds = self.credentials
+ request = mock.Mock()
+
+ # Mock metadata calls for token, universe domain, and service account info
+ mock_metadata_get.side_effect = [
+ # from _retrieve_info
+ {"email": "[email protected]", "scopes": ["scope1"]},
+ # from get_service_account_token
+ {"access_token": "mock_token", "expires_in": 3600},
+ # from get_universe_domain
+ "",
+ ]
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ with pytest.raises(exceptions.RefreshError, match="Lookup failed"):
+ creds.refresh(request)
+
+ assert creds._trust_boundary is None
+ assert mock_metadata_get.call_count == 3
+ mock_lookup_tb.assert_called_once()
+
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+ @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True)
+ def test_refresh_trust_boundary_lookup_fails_with_cached_data(
+ self, mock_lookup_tb, mock_metadata_get
+ ):
+ # First refresh: Successfully fetch a valid trust boundary.
+ mock_lookup_tb.return_value = {
+ "locations": ["us-central1"],
+ "encodedLocations": "0xABC",
+ }
+ mock_metadata_get.side_effect = [
+ # from _retrieve_info
+ {"email": "[email protected]", "scopes": ["scope1"]},
+ # from get_service_account_token
+ {"access_token": "mock_token_1", "expires_in": 3600},
+ # from get_universe_domain
+ "",
+ ]
+ creds = self.credentials
+ request = mock.Mock()
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ creds.refresh(request)
+
+ assert creds._trust_boundary == {
+ "locations": ["us-central1"],
+ "encodedLocations": "0xABC",
+ }
+ mock_lookup_tb.assert_called_once()
+
+ # Second refresh: Mock lookup to fail, but expect cached data to be preserved.
+ mock_lookup_tb.reset_mock()
+ mock_lookup_tb.side_effect = exceptions.RefreshError("Lookup failed")
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ # This refresh should not raise an error because a cached value exists.
+ mock_metadata_get.reset_mock()
+ mock_metadata_get.side_effect = [
+ # from _retrieve_info
+ {"email": "[email protected]", "scopes": ["scope1"]},
+ # from get_service_account_token
+ {"access_token": "mock_token_2", "expires_in": 3600},
+ # from get_universe_domain
+ "",
+ ]
+ creds.refresh(request)
+
+ assert creds._trust_boundary == {
+ "locations": ["us-central1"],
+ "encodedLocations": "0xABC",
+ }
+ mock_lookup_tb.assert_called_once()
+
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+ @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True)
+ def test_refresh_fetches_no_op_trust_boundary(
+ self, mock_lookup_tb, mock_metadata_get
+ ):
+ mock_lookup_tb.return_value = {"locations": [], "encodedLocations": "0x0"}
+ creds = self.credentials
+ request = mock.Mock()
+
+ mock_metadata_get.side_effect = [
+ # from _retrieve_info
+ {"email": "[email protected]", "scopes": ["scope1"]},
+ # from get_service_account_token
+ {"access_token": "mock_token", "expires_in": 3600},
+ # from get_universe_domain
+ "",
+ ]
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ creds.refresh(request)
+
+ assert creds._trust_boundary == {"locations": [], "encodedLocations": "0x0"}
+ assert mock_metadata_get.call_count == 3
+ expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]/allowedLocations"
+ mock_lookup_tb.assert_called_once_with(
+ request, expected_url, headers={"authorization": "Bearer mock_token"}
+ )
+ # Verify that an empty header was added.
+ headers_applied = {}
+ creds.apply(headers_applied)
+ assert headers_applied["x-allowed-locations"] == ""
+
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+ @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True)
+ def test_refresh_starts_with_no_op_trust_boundary_skips_lookup(
+ self, mock_lookup_tb, mock_metadata_get
+ ):
+ creds = self.credentials
+ # Use pre-cache universe domain to avoid an extra metadata call.
+ creds._universe_domain_cached = True
+ creds._trust_boundary = {"locations": [], "encodedLocations": "0x0"}
+ request = mock.Mock()
+
+ mock_metadata_get.side_effect = [
+ # from _retrieve_info
+ {"email": "[email protected]", "scopes": ["scope1"]},
+ # from get_service_account_token
+ {"access_token": "mock_token", "expires_in": 3600},
+ ]
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ creds.refresh(request)
+
+ # Verify trust boundary remained NO_OP
+ assert creds._trust_boundary == {"locations": [], "encodedLocations": "0x0"}
+ # Lookup should be skipped
+ mock_lookup_tb.assert_not_called()
+ # Two metadata calls for token refresh should have happened.
+ assert mock_metadata_get.call_count == 2
+
+ # Verify that an empty header was added.
+ headers_applied = {}
+ creds.apply(headers_applied)
+ assert headers_applied["x-allowed-locations"] == ""
+
+ @mock.patch(
+ "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
+ )
+ @mock.patch(
+ "google.auth.compute_engine._metadata.get_universe_domain", autospec=True
+ )
+ def test_build_trust_boundary_lookup_url_default_email(
+ self, mock_get_universe_domain, mock_get_service_account_info
+ ):
+ # Test with default service account email, which needs resolution
+ creds = self.credentials
+ creds._service_account_email = "default"
+ mock_get_service_account_info.return_value = {
+ "email": "[email protected]"
+ }
+ mock_get_universe_domain.return_value = "googleapis.com"
+
+ url = creds._build_trust_boundary_lookup_url()
+
+ mock_get_service_account_info.assert_called_once_with(mock.ANY, "default")
+ mock_get_universe_domain.assert_called_once_with(mock.ANY)
+ assert url == (
+ "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]/allowedLocations"
+ )
+
+ @mock.patch(
+ "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
+ )
+ @mock.patch(
+ "google.auth.compute_engine._metadata.get_universe_domain", autospec=True
+ )
+ def test_build_trust_boundary_lookup_url_explicit_email(
+ self, mock_get_universe_domain, mock_get_service_account_info
+ ):
+ # Test with an explicit service account email, no resolution needed
+ creds = self.credentials
+ creds._service_account_email = FAKE_SERVICE_ACCOUNT_EMAIL
+ mock_get_universe_domain.return_value = "googleapis.com"
+
+ url = creds._build_trust_boundary_lookup_url()
+
+ mock_get_service_account_info.assert_not_called()
+ mock_get_universe_domain.assert_called_once_with(mock.ANY)
+ assert url == (
+ "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]/allowedLocations"
+ )
+
+ @mock.patch(
+ "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
+ )
+ @mock.patch(
+ "google.auth.compute_engine._metadata.get_universe_domain", autospec=True
+ )
+ def test_build_trust_boundary_lookup_url_non_default_universe(
+ self, mock_get_universe_domain, mock_get_service_account_info
+ ):
+ # Test with a non-default universe domain
+ creds = self.credentials_with_all_fields
+
+ url = creds._build_trust_boundary_lookup_url()
+
+ # Universe domain is cached and email is explicit, so no metadata calls needed.
+ mock_get_service_account_info.assert_not_called()
+ mock_get_universe_domain.assert_not_called()
+ assert url == (
+ "https://iamcredentials.fake-universe-domain/v1/projects/-/serviceAccounts/[email protected]/allowedLocations"
+ )
+
+ @mock.patch(
+ "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
+ )
+ def test_build_trust_boundary_lookup_url_get_service_account_info_fails(
+ self, mock_get_service_account_info
+ ):
+ # Test scenario where get_service_account_info fails
+ mock_get_service_account_info.side_effect = exceptions.TransportError(
+ "Failed to get info"
+ )
+ creds = self.credentials
+ creds._service_account_email = "default"
+
+ with pytest.raises(
+ exceptions.RefreshError,
+ match=r"Failed to get service account email for trust boundary lookup: .*",
+ ):
+ creds._build_trust_boundary_lookup_url()
+
+ @mock.patch(
+ "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
+ )
+ def test_build_trust_boundary_lookup_url_no_email(
+ self, mock_get_service_account_info
+ ):
+ # Test with default service account email, which needs resolution, but metadata
+ # returns no email.
+ creds = self.credentials
+ creds._service_account_email = "default"
+ mock_get_service_account_info.return_value = {"scopes": ["one", "two"]}
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ creds._build_trust_boundary_lookup_url()
+
+ assert excinfo.match(r"missing 'email' field")
+
class TestIDTokenCredentials(object):
credentials = None
@@ -466,7 +844,7 @@ class TestIDTokenCredentials(object):
@responses.activate
def test_with_target_audience_integration(self):
- """ Test that it is possible to refresh credentials
+ """Test that it is possible to refresh credentials
generated from `with_target_audience`.
Instead of mocking the methods, the HTTP responses
@@ -634,7 +1012,7 @@ class TestIDTokenCredentials(object):
@responses.activate
def test_with_quota_project_integration(self):
- """ Test that it is possible to refresh credentials
+ """Test that it is possible to refresh credentials
generated from `with_quota_project`.
Instead of mocking the methods, the HTTP responses
diff --git a/contrib/python/google-auth/py3/tests/oauth2/test__client.py b/contrib/python/google-auth/py3/tests/oauth2/test__client.py
index df77bbcd2f6..ec1725c7e29 100644
--- a/contrib/python/google-auth/py3/tests/oauth2/test__client.py
+++ b/contrib/python/google-auth/py3/tests/oauth2/test__client.py
@@ -325,7 +325,7 @@ def test_call_iam_generate_id_token_endpoint():
"fake_email",
"fake_audience",
"fake_access_token",
- "googleapis.com",
+ universe_domain="googleapis.com",
)
assert (
@@ -631,3 +631,178 @@ def test__token_endpoint_request_no_throw_with_retry(can_retry):
assert mock_request.call_count == 3
else:
assert mock_request.call_count == 1
+
+
+def test_lookup_trust_boundary():
+ response_data = {
+ "locations": ["us-central1", "us-east1"],
+ "encodedLocations": "0x80080000000000",
+ }
+
+ mock_response = mock.create_autospec(transport.Response, instance=True)
+ mock_response.status = http_client.OK
+ mock_response.data = json.dumps(response_data).encode("utf-8")
+
+ mock_request = mock.create_autospec(transport.Request)
+ mock_request.return_value = mock_response
+
+ url = "http://example.com"
+ headers = {"Authorization": "Bearer access_token"}
+ response = _client._lookup_trust_boundary(mock_request, url, headers=headers)
+
+ assert response["encodedLocations"] == "0x80080000000000"
+ assert response["locations"] == ["us-central1", "us-east1"]
+
+ mock_request.assert_called_once_with(method="GET", url=url, headers=headers)
+
+
+def test_lookup_trust_boundary_no_op_response_without_locations():
+ response_data = {"encodedLocations": "0x0"}
+
+ mock_response = mock.create_autospec(transport.Response, instance=True)
+ mock_response.status = http_client.OK
+ mock_response.data = json.dumps(response_data).encode("utf-8")
+
+ mock_request = mock.create_autospec(transport.Request)
+ mock_request.return_value = mock_response
+
+ url = "http://example.com"
+ headers = {"Authorization": "Bearer access_token"}
+ # for the response to be valid, we only need encodedLocations to be present.
+ response = _client._lookup_trust_boundary(mock_request, url, headers=headers)
+ assert response["encodedLocations"] == "0x0"
+ assert "locations" not in response
+
+ mock_request.assert_called_once_with(method="GET", url=url, headers=headers)
+
+
+def test_lookup_trust_boundary_no_op_response():
+ response_data = {"locations": [], "encodedLocations": "0x0"}
+
+ mock_response = mock.create_autospec(transport.Response, instance=True)
+ mock_response.status = http_client.OK
+ mock_response.data = json.dumps(response_data).encode("utf-8")
+
+ mock_request = mock.create_autospec(transport.Request)
+ mock_request.return_value = mock_response
+
+ url = "http://example.com"
+ headers = {"Authorization": "Bearer access_token"}
+ response = _client._lookup_trust_boundary(mock_request, url, headers=headers)
+
+ assert response["encodedLocations"] == "0x0"
+ assert response["locations"] == []
+
+ mock_request.assert_called_once_with(method="GET", url=url, headers=headers)
+
+
+def test_lookup_trust_boundary_error():
+ mock_response = mock.create_autospec(transport.Response, instance=True)
+ mock_response.status = http_client.INTERNAL_SERVER_ERROR
+ mock_response.data = "this is an error message"
+
+ mock_request = mock.create_autospec(transport.Request)
+ mock_request.return_value = mock_response
+
+ url = "http://example.com"
+ headers = {"Authorization": "Bearer access_token"}
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ _client._lookup_trust_boundary(mock_request, url, headers=headers)
+ assert excinfo.match("this is an error message")
+
+ mock_request.assert_called_with(method="GET", url=url, headers=headers)
+
+
+def test_lookup_trust_boundary_missing_encoded_locations():
+ response_data = {"locations": [], "bad_field": "0x0"}
+
+ mock_response = mock.create_autospec(transport.Response, instance=True)
+ mock_response.status = http_client.OK
+ mock_response.data = json.dumps(response_data).encode("utf-8")
+
+ mock_request = mock.create_autospec(transport.Request)
+ mock_request.return_value = mock_response
+
+ url = "http://example.com"
+ headers = {"Authorization": "Bearer access_token"}
+ with pytest.raises(exceptions.MalformedError) as excinfo:
+ _client._lookup_trust_boundary(mock_request, url, headers=headers)
+ assert excinfo.match("Invalid trust boundary info")
+
+ mock_request.assert_called_once_with(method="GET", url=url, headers=headers)
+
+
+def test_lookup_trust_boundary_internal_failure_and_retry_failure_error():
+ retryable_error = mock.create_autospec(transport.Response, instance=True)
+ retryable_error.status = http_client.BAD_REQUEST
+ retryable_error.data = json.dumps({"error_description": "internal_failure"}).encode(
+ "utf-8"
+ )
+
+ unretryable_error = mock.create_autospec(transport.Response, instance=True)
+ unretryable_error.status = http_client.BAD_REQUEST
+ unretryable_error.data = json.dumps({"error_description": "invalid_scope"}).encode(
+ "utf-8"
+ )
+
+ request = mock.create_autospec(transport.Request)
+
+ request.side_effect = [retryable_error, retryable_error, unretryable_error]
+ headers = {"Authorization": "Bearer access_token"}
+
+ with pytest.raises(exceptions.RefreshError):
+ _client._lookup_trust_boundary_request(
+ request, "http://example.com", headers=headers
+ )
+ # request should be called three times. Two retryable errors and one
+ # unretryable error to break the retry loop.
+ assert request.call_count == 3
+ for call in request.call_args_list:
+ assert call[1]["headers"] == headers
+
+
+def test_lookup_trust_boundary_internal_failure_and_retry_succeeds():
+ retryable_error = mock.create_autospec(transport.Response, instance=True)
+ retryable_error.status = http_client.BAD_REQUEST
+ retryable_error.data = json.dumps({"error_description": "internal_failure"}).encode(
+ "utf-8"
+ )
+
+ response_data = {"locations": [], "encodedLocations": "0x0"}
+ response = mock.create_autospec(transport.Response, instance=True)
+ response.status = http_client.OK
+ response.data = json.dumps(response_data).encode("utf-8")
+
+ request = mock.create_autospec(transport.Request)
+
+ headers = {"Authorization": "Bearer access_token"}
+ request.side_effect = [retryable_error, response]
+
+ _ = _client._lookup_trust_boundary_request(
+ request, "http://example.com", headers=headers
+ )
+
+ assert request.call_count == 2
+ for call in request.call_args_list:
+ assert call[1]["headers"] == headers
+
+
+def test_lookup_trust_boundary_with_headers():
+ response_data = {
+ "locations": ["us-central1", "us-east1"],
+ "encodedLocations": "0x80080000000000",
+ }
+
+ mock_response = mock.create_autospec(transport.Response, instance=True)
+ mock_response.status = http_client.OK
+ mock_response.data = json.dumps(response_data).encode("utf-8")
+
+ mock_request = mock.create_autospec(transport.Request)
+ mock_request.return_value = mock_response
+ headers = {"Authorization": "Bearer access_token", "x-test-header": "test-value"}
+
+ _client._lookup_trust_boundary(mock_request, "http://example.com", headers=headers)
+
+ mock_request.assert_called_once_with(
+ method="GET", url="http://example.com", headers=headers
+ )
diff --git a/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py b/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py
index e60c8200f4b..12f43afc0f7 100644
--- a/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py
+++ b/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py
@@ -20,7 +20,9 @@ import mock
import pytest # type: ignore
from google.auth import _helpers
+from google.auth import credentials
from google.auth import crypt
+from google.auth import environment_vars
from google.auth import exceptions
from google.auth import iam
from google.auth import jwt
@@ -59,14 +61,31 @@ SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")
class TestCredentials(object):
SERVICE_ACCOUNT_EMAIL = "[email protected]"
TOKEN_URI = "https://example.com/oauth2/token"
+ NO_OP_TRUST_BOUNDARY = {
+ "locations": credentials.NO_OP_TRUST_BOUNDARY_LOCATIONS,
+ "encodedLocations": credentials.NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS,
+ }
+ VALID_TRUST_BOUNDARY = {
+ "locations": ["us-central1", "us-east1"],
+ "encodedLocations": "0xVALIDHEXSA",
+ }
+ EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE = (
+ "https://iamcredentials.googleapis.com/v1/projects/-"
+ "/serviceAccounts/[email protected]/allowedLocations"
+ )
@classmethod
- def make_credentials(cls, universe_domain=DEFAULT_UNIVERSE_DOMAIN):
+ def make_credentials(
+ cls,
+ universe_domain=DEFAULT_UNIVERSE_DOMAIN,
+ trust_boundary=None, # Align with Credentials class default
+ ):
return service_account.Credentials(
SIGNER,
cls.SERVICE_ACCOUNT_EMAIL,
cls.TOKEN_URI,
universe_domain=universe_domain,
+ trust_boundary=trust_boundary,
)
def test_get_cred_info(self):
@@ -252,6 +271,18 @@ class TestCredentials(object):
"always_use_jwt_access should be True for non-default universe domain"
)
+ def test_with_trust_boundary(self):
+ credentials = self.make_credentials()
+ new_boundary = {"encodedLocations": "new_boundary"}
+ new_credentials = credentials.with_trust_boundary(new_boundary)
+
+ assert new_credentials is not credentials
+ assert new_credentials._trust_boundary == new_boundary
+ assert new_credentials._signer == credentials._signer
+ assert (
+ new_credentials.service_account_email == credentials.service_account_email
+ )
+
def test__make_authorization_grant_assertion(self):
credentials = self.make_credentials()
token = credentials._make_authorization_grant_assertion()
@@ -496,10 +527,42 @@ class TestCredentials(object):
# Check that the credentials have the token.
assert credentials.token == token
- # Check that the credentials are valid (have a token and are not
- # expired)
+ # Check that the credentials are valid (have a token and are not expired).
assert credentials.valid
+ # Trust boundary should be None since env var is not set and no initial
+ # boundary was provided.
+ assert credentials._trust_boundary is None
+
+ @mock.patch("google.oauth2._client._lookup_trust_boundary")
+ @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
+ def test_refresh_skips_trust_boundary_lookup_non_default_universe(
+ self, mock_jwt_grant, mock_lookup_trust_boundary
+ ):
+ # Create credentials with a non-default universe domain
+ credentials = self.make_credentials(universe_domain=FAKE_UNIVERSE_DOMAIN)
+ token = "token"
+ mock_jwt_grant.return_value = (
+ token,
+ _helpers.utcnow() + datetime.timedelta(seconds=500),
+ {},
+ )
+ request = mock.create_autospec(transport.Request, instance=True)
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ credentials.refresh(request)
+
+ # Ensure jwt_grant was called (token refresh happened)
+ mock_jwt_grant.assert_called_once()
+ # Ensure trust boundary lookup was not called
+ mock_lookup_trust_boundary.assert_not_called()
+ # Verify that x-allowed-locations header is not set by apply()
+ headers_applied = {}
+ credentials.apply(headers_applied)
+ assert "x-allowed-locations" not in headers_applied
+
@mock.patch("google.oauth2._client.jwt_grant", autospec=True)
def test_before_request_refreshes(self, jwt_grant):
credentials = self.make_credentials()
@@ -608,6 +671,208 @@ class TestCredentials(object):
credentials.refresh(None)
assert excinfo.match("domain wide delegation is not supported")
+ @mock.patch("google.oauth2._client._lookup_trust_boundary")
+ @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
+ def test_refresh_success_with_valid_trust_boundary(
+ self, mock_jwt_grant, mock_lookup_trust_boundary
+ ):
+ # Start with no boundary.
+ credentials = self.make_credentials(trust_boundary=None)
+ token = "token"
+ mock_jwt_grant.return_value = (
+ token,
+ _helpers.utcnow() + datetime.timedelta(seconds=500),
+ {},
+ )
+ request = mock.create_autospec(transport.Request, instance=True)
+
+ # Mock the trust boundary lookup to return a valid boundary.
+ mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ credentials.refresh(request)
+
+ assert credentials.valid
+ assert credentials.token == token
+
+ # Verify trust boundary was set.
+ assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY
+
+ # Verify the mock was called with the correct URL.
+ mock_lookup_trust_boundary.assert_called_once_with(
+ request,
+ self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE,
+ headers={"authorization": "Bearer token"},
+ )
+
+ # Verify x-allowed-locations header is set correctly by apply().
+ headers_applied = {}
+ credentials.apply(headers_applied)
+ assert (
+ headers_applied["x-allowed-locations"]
+ == self.VALID_TRUST_BOUNDARY["encodedLocations"]
+ )
+
+ @mock.patch("google.oauth2._client._lookup_trust_boundary")
+ @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
+ def test_refresh_fetches_no_op_trust_boundary(
+ self, mock_jwt_grant, mock_lookup_trust_boundary
+ ):
+ # Start with no trust boundary
+ credentials = self.make_credentials(trust_boundary=None)
+ token = "token"
+ mock_jwt_grant.return_value = (
+ token,
+ _helpers.utcnow() + datetime.timedelta(seconds=500),
+ {},
+ )
+ request = mock.create_autospec(transport.Request, instance=True)
+
+ mock_lookup_trust_boundary.return_value = self.NO_OP_TRUST_BOUNDARY
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ credentials.refresh(request)
+
+ assert credentials.valid
+ assert credentials.token == token
+ assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY
+ mock_lookup_trust_boundary.assert_called_once_with(
+ request,
+ self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE,
+ headers={"authorization": "Bearer token"},
+ )
+ headers_applied = {}
+ credentials.apply(headers_applied)
+ assert headers_applied["x-allowed-locations"] == ""
+
+ @mock.patch("google.oauth2._client._lookup_trust_boundary")
+ @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
+ def test_refresh_starts_with_no_op_trust_boundary_skips_lookup(
+ self, mock_jwt_grant, mock_lookup_trust_boundary
+ ):
+ credentials = self.make_credentials(
+ trust_boundary=self.NO_OP_TRUST_BOUNDARY
+ ) # Start with NO_OP
+ token = "token"
+ mock_jwt_grant.return_value = (
+ token,
+ _helpers.utcnow() + datetime.timedelta(seconds=500),
+ {},
+ )
+ request = mock.create_autospec(transport.Request, instance=True)
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ credentials.refresh(request)
+
+ assert credentials.valid
+ assert credentials.token == token
+ # Verify trust boundary remained NO_OP
+ assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY
+
+ # Lookup should be skipped
+ mock_lookup_trust_boundary.assert_not_called()
+
+ # Verify that an empty header was added.
+ headers_applied = {}
+ credentials.apply(headers_applied)
+ assert headers_applied["x-allowed-locations"] == ""
+
+ @mock.patch("google.oauth2._client._lookup_trust_boundary")
+ @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
+ def test_refresh_trust_boundary_lookup_fails_no_cache(
+ self, mock_jwt_grant, mock_lookup_trust_boundary
+ ):
+ # Start with no trust boundary
+ credentials = self.make_credentials(trust_boundary=None)
+ mock_lookup_trust_boundary.side_effect = exceptions.RefreshError(
+ "Lookup failed"
+ )
+ mock_jwt_grant.return_value = (
+ "mock_access_token",
+ _helpers.utcnow() + datetime.timedelta(seconds=3600),
+ {},
+ )
+ request = mock.create_autospec(transport.Request, instance=True)
+
+ # Mock the trust boundary lookup to raise an error
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ), pytest.raises(exceptions.RefreshError, match="Lookup failed"):
+ credentials.refresh(request)
+
+ assert credentials._trust_boundary is None
+ mock_lookup_trust_boundary.assert_called_once()
+
+ @mock.patch("google.oauth2._client._lookup_trust_boundary")
+ @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
+ def test_refresh_trust_boundary_lookup_fails_with_cached_data(
+ self, mock_jwt_grant, mock_lookup_trust_boundary
+ ):
+ # Initial setup: Credentials with no trust boundary.
+ credentials = self.make_credentials(trust_boundary=None)
+ token = "token"
+ mock_jwt_grant.return_value = (
+ token,
+ _helpers.utcnow() + datetime.timedelta(seconds=500),
+ {},
+ )
+ request = mock.create_autospec(transport.Request, instance=True)
+
+ # First refresh: Successfully fetch a valid trust boundary.
+ mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ credentials.refresh(request)
+
+ assert credentials.valid
+ assert credentials.token == token
+ assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY
+ mock_lookup_trust_boundary.assert_called_once_with(
+ request,
+ self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE,
+ headers={"authorization": "Bearer token"},
+ )
+
+ # Second refresh: Mock lookup to fail, but expect cached data to be preserved.
+ mock_lookup_trust_boundary.reset_mock()
+ mock_lookup_trust_boundary.side_effect = exceptions.RefreshError(
+ "Lookup failed"
+ )
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ credentials.refresh(request) # This should NOT raise an exception
+
+ assert credentials.valid # Credentials should still be valid
+ assert (
+ credentials._trust_boundary == self.VALID_TRUST_BOUNDARY
+ ) # Cached data should be preserved
+ mock_lookup_trust_boundary.assert_called_once_with(
+ request,
+ self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE,
+ headers={
+ "authorization": "Bearer token",
+ "x-allowed-locations": self.VALID_TRUST_BOUNDARY["encodedLocations"],
+ },
+ ) # Lookup should have been attempted again
+
+ def test_build_trust_boundary_lookup_url_no_email(self):
+ credentials = self.make_credentials()
+ credentials._service_account_email = None
+
+ with pytest.raises(ValueError) as excinfo:
+ credentials._build_trust_boundary_lookup_url()
+
+ assert "Service account email is required" in str(excinfo.value)
+
class TestIDTokenCredentials(object):
SERVICE_ACCOUNT_EMAIL = "[email protected]"
@@ -790,9 +1055,14 @@ class TestIDTokenCredentials(object):
)
request = mock.Mock()
credentials.refresh(request)
- req, iam_endpoint, signer_email, target_audience, access_token, universe_domain = call_iam_generate_id_token_endpoint.call_args[
- 0
- ]
+ (
+ req,
+ iam_endpoint,
+ signer_email,
+ target_audience,
+ access_token,
+ universe_domain,
+ ) = call_iam_generate_id_token_endpoint.call_args[0]
assert req == request
assert iam_endpoint == iam._IAM_IDTOKEN_ENDPOINT
assert signer_email == "[email protected]"
@@ -812,9 +1082,14 @@ class TestIDTokenCredentials(object):
)
request = mock.Mock()
credentials.refresh(request)
- req, iam_endpoint, signer_email, target_audience, access_token, universe_domain = call_iam_generate_id_token_endpoint.call_args[
- 0
- ]
+ (
+ req,
+ iam_endpoint,
+ signer_email,
+ target_audience,
+ access_token,
+ universe_domain,
+ ) = call_iam_generate_id_token_endpoint.call_args[0]
assert req == request
assert (
iam_endpoint
diff --git a/contrib/python/google-auth/py3/tests/test__helpers.py b/contrib/python/google-auth/py3/tests/test__helpers.py
index a4337c01608..ce3ec11e29c 100644
--- a/contrib/python/google-auth/py3/tests/test__helpers.py
+++ b/contrib/python/google-auth/py3/tests/test__helpers.py
@@ -20,7 +20,7 @@ import urllib
import pytest # type: ignore
-from google.auth import _helpers
+from google.auth import _helpers, exceptions
# _MOCK_BASE_LOGGER_NAME is the base logger namespace used for testing.
_MOCK_BASE_LOGGER_NAME = "foogle"
@@ -234,6 +234,33 @@ def test_unpadded_urlsafe_b64encode():
assert _helpers.unpadded_urlsafe_b64encode(case) == expected
+def test_get_bool_from_env(monkeypatch):
+ # Test default value when environment variable is not set.
+ assert _helpers.get_bool_from_env("TEST_VAR") is False
+ assert _helpers.get_bool_from_env("TEST_VAR", default=True) is True
+
+ # Test true values (case-insensitive)
+ for true_value in ("true", "True", "TRUE", "1"):
+ monkeypatch.setenv("TEST_VAR", true_value)
+ assert _helpers.get_bool_from_env("TEST_VAR") is True
+
+ # Test false values (case-insensitive)
+ for false_value in ("false", "False", "FALSE", "0"):
+ monkeypatch.setenv("TEST_VAR", false_value)
+ assert _helpers.get_bool_from_env("TEST_VAR") is False
+
+ # Test invalid value
+ monkeypatch.setenv("TEST_VAR", "invalid_value")
+ with pytest.raises(exceptions.InvalidValue) as excinfo:
+ _helpers.get_bool_from_env("TEST_VAR")
+ assert 'must be one of "true", "false", "1", or "0"' in str(excinfo.value)
+
+ # Test empty string value
+ monkeypatch.setenv("TEST_VAR", "")
+ with pytest.raises(exceptions.InvalidValue):
+ _helpers.get_bool_from_env("TEST_VAR")
+
+
def test_hash_sensitive_info_basic():
test_data = {
"expires_in": 3599,
diff --git a/contrib/python/google-auth/py3/tests/test_aws.py b/contrib/python/google-auth/py3/tests/test_aws.py
index df1f02e7d70..41ce970d1e8 100644
--- a/contrib/python/google-auth/py3/tests/test_aws.py
+++ b/contrib/python/google-auth/py3/tests/test_aws.py
@@ -920,7 +920,7 @@ class TestCredentials(object):
assert request_kwargs["body"] is not None
body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
assert len(body_tuples) == len(request_data.keys())
- for (k, v) in body_tuples:
+ for k, v in body_tuples:
assert v.decode("utf-8") == request_data[k.decode("utf-8")]
@classmethod
@@ -2057,7 +2057,9 @@ class TestCredentials(object):
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
"x-goog-user-project": QUOTA_PROJECT_ID,
"x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
- "x-allowed-locations": "0x0",
+ # TODO(negarb): Uncomment and update when trust boundary is supported
+ # for external account credentials.
+ # "x-allowed-locations": "0x0",
}
impersonation_request_data = {
"delegates": None,
@@ -2150,7 +2152,7 @@ class TestCredentials(object):
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
"x-goog-user-project": QUOTA_PROJECT_ID,
"x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
}
impersonation_request_data = {
"delegates": None,
@@ -2345,7 +2347,7 @@ class TestCredentials(object):
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
"x-goog-user-project": QUOTA_PROJECT_ID,
"x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
}
impersonation_request_data = {
"delegates": None,
diff --git a/contrib/python/google-auth/py3/tests/test_credentials.py b/contrib/python/google-auth/py3/tests/test_credentials.py
index e11bcb4e551..1fb88009691 100644
--- a/contrib/python/google-auth/py3/tests/test_credentials.py
+++ b/contrib/python/google-auth/py3/tests/test_credentials.py
@@ -13,17 +13,21 @@
# limitations under the License.
import datetime
+import os
import mock
import pytest # type: ignore
from google.auth import _helpers
from google.auth import credentials
+from google.auth import environment_vars
+from google.auth import exceptions
+from google.oauth2 import _client
-class CredentialsImpl(credentials.Credentials):
- def refresh(self, request):
- self.token = request
+class CredentialsImpl(credentials.CredentialsWithTrustBoundary):
+ def _refresh_token(self, request):
+ self.token = "refreshed-token"
self.expiry = (
datetime.datetime.utcnow()
+ _helpers.REFRESH_THRESHOLD
@@ -33,6 +37,10 @@ class CredentialsImpl(credentials.Credentials):
def with_quota_project(self, quota_project_id):
raise NotImplementedError()
+ def _build_trust_boundary_lookup_url(self):
+ # Using self.token here to make the URL dynamic for testing purposes
+ return "http://mock.url/lookup_for_{}".format(self.token)
+
class CredentialsImplWithMetrics(credentials.Credentials):
def refresh(self, request):
@@ -89,49 +97,49 @@ def test_expired_and_valid():
def test_before_request():
credentials = CredentialsImpl()
- request = "token"
+ request = mock.Mock()
headers = {}
# First call should call refresh, setting the token.
credentials.before_request(request, "http://example.com", "GET", headers)
assert credentials.valid
- assert credentials.token == "token"
- assert headers["authorization"] == "Bearer token"
+ assert credentials.token == "refreshed-token"
+ assert headers["authorization"] == "Bearer refreshed-token"
assert "x-allowed-locations" not in headers
- request = "token2"
+ request = mock.Mock()
headers = {}
# Second call shouldn't call refresh.
credentials.before_request(request, "http://example.com", "GET", headers)
assert credentials.valid
- assert credentials.token == "token"
- assert headers["authorization"] == "Bearer token"
+ assert credentials.token == "refreshed-token"
+ assert headers["authorization"] == "Bearer refreshed-token"
assert "x-allowed-locations" not in headers
def test_before_request_with_trust_boundary():
DUMMY_BOUNDARY = "0xA30"
credentials = CredentialsImpl()
- credentials._trust_boundary = {"locations": [], "encoded_locations": DUMMY_BOUNDARY}
- request = "token"
+ credentials._trust_boundary = {"locations": [], "encodedLocations": DUMMY_BOUNDARY}
+ request = mock.Mock()
headers = {}
# First call should call refresh, setting the token.
credentials.before_request(request, "http://example.com", "GET", headers)
assert credentials.valid
- assert credentials.token == "token"
- assert headers["authorization"] == "Bearer token"
+ assert credentials.token == "refreshed-token"
+ assert headers["authorization"] == "Bearer refreshed-token"
assert headers["x-allowed-locations"] == DUMMY_BOUNDARY
- request = "token2"
+ request = mock.Mock()
headers = {}
# Second call shouldn't call refresh.
credentials.before_request(request, "http://example.com", "GET", headers)
assert credentials.valid
- assert credentials.token == "token"
- assert headers["authorization"] == "Bearer token"
+ assert credentials.token == "refreshed-token"
+ assert headers["authorization"] == "Bearer refreshed-token"
assert headers["x-allowed-locations"] == DUMMY_BOUNDARY
@@ -198,6 +206,18 @@ def test_readonly_scoped_credentials_scopes():
assert credentials.has_scopes(["one", "two"])
assert not credentials.has_scopes(["three"])
+ # Test with default scopes
+ credentials_with_default = ReadOnlyScopedCredentialsImpl()
+ credentials_with_default._default_scopes = ["one", "two"]
+ assert credentials_with_default.has_scopes(["one", "two"])
+ assert not credentials_with_default.has_scopes(["three"])
+
+ # Test with no scopes
+ credentials_no_scopes = ReadOnlyScopedCredentialsImpl()
+ assert not credentials_no_scopes.has_scopes(["one"])
+
+ assert credentials_no_scopes.has_scopes([])
+
def test_readonly_scoped_credentials_requires_scopes():
credentials = ReadOnlyScopedCredentialsImpl()
@@ -245,7 +265,7 @@ def test_nonblocking_refresh_fresh_credentials():
c._refresh_worker = mock.MagicMock()
- request = "token"
+ request = mock.Mock()
c.refresh(request)
assert c.token_state == credentials.TokenState.FRESH
@@ -258,7 +278,7 @@ def test_nonblocking_refresh_invalid_credentials():
c = CredentialsImpl()
c.with_non_blocking_refresh()
- request = "token"
+ request = mock.Mock()
headers = {}
assert c.token_state == credentials.TokenState.INVALID
@@ -266,8 +286,8 @@ def test_nonblocking_refresh_invalid_credentials():
c.before_request(request, "http://example.com", "GET", headers)
assert c.token_state == credentials.TokenState.FRESH
assert c.valid
- assert c.token == "token"
- assert headers["authorization"] == "Bearer token"
+ assert c.token == "refreshed-token"
+ assert headers["authorization"] == "Bearer refreshed-token"
assert "x-identity-trust-boundary" not in headers
@@ -275,7 +295,7 @@ def test_nonblocking_refresh_stale_credentials():
c = CredentialsImpl()
c.with_non_blocking_refresh()
- request = "token"
+ request = mock.Mock()
headers = {}
# Invalid credentials MUST require a blocking refresh.
@@ -296,8 +316,8 @@ def test_nonblocking_refresh_stale_credentials():
assert c.token_state == credentials.TokenState.FRESH
assert c.valid
- assert c.token == "token"
- assert headers["authorization"] == "Bearer token"
+ assert c.token == "refreshed-token"
+ assert headers["authorization"] == "Bearer refreshed-token"
assert "x-identity-trust-boundary" not in headers
@@ -305,7 +325,7 @@ def test_nonblocking_refresh_failed_credentials():
c = CredentialsImpl()
c.with_non_blocking_refresh()
- request = "token"
+ request = mock.Mock()
headers = {}
# Invalid credentials MUST require a blocking refresh.
@@ -328,18 +348,130 @@ def test_nonblocking_refresh_failed_credentials():
assert c.token_state == credentials.TokenState.FRESH
assert c.valid
- assert c.token == "token"
- assert headers["authorization"] == "Bearer token"
+ assert c.token == "refreshed-token"
+ assert headers["authorization"] == "Bearer refreshed-token"
assert "x-identity-trust-boundary" not in headers
def test_token_state_no_expiry():
c = CredentialsImpl()
- request = "token"
+ request = mock.Mock()
c.refresh(request)
c.expiry = None
assert c.token_state == credentials.TokenState.FRESH
c.before_request(request, "http://example.com", "GET", {})
+
+
+class TestCredentialsWithTrustBoundary(object):
+ @mock.patch.object(_client, "_lookup_trust_boundary")
+ def test_lookup_trust_boundary_env_var_not_true(self, mock_lookup_tb):
+ creds = CredentialsImpl()
+ request = mock.Mock()
+
+ # Ensure env var is not "true"
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "false"}
+ ):
+ result = creds._refresh_trust_boundary(request)
+
+ assert result is None
+ mock_lookup_tb.assert_not_called()
+
+ @mock.patch.object(_client, "_lookup_trust_boundary")
+ def test_lookup_trust_boundary_env_var_missing(self, mock_lookup_tb):
+ creds = CredentialsImpl()
+ request = mock.Mock()
+
+ # Ensure env var is missing
+ with mock.patch.dict(os.environ, clear=True):
+ result = creds._refresh_trust_boundary(request)
+
+ assert result is None
+ mock_lookup_tb.assert_not_called()
+
+ @mock.patch.object(_client, "_lookup_trust_boundary")
+ def test_lookup_trust_boundary_non_default_universe(self, mock_lookup_tb):
+ creds = CredentialsImpl()
+ creds._universe_domain = "my.universe.com" # Non-GDU
+ request = mock.Mock()
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ result = creds._refresh_trust_boundary(request)
+
+ assert result is None
+ mock_lookup_tb.assert_not_called()
+
+ @mock.patch.object(_client, "_lookup_trust_boundary")
+ def test_lookup_trust_boundary_calls_client_and_build_url(self, mock_lookup_tb):
+ creds = CredentialsImpl()
+ creds.token = "test_token" # For _build_trust_boundary_lookup_url
+ request = mock.Mock()
+ expected_url = "http://mock.url/lookup_for_test_token"
+ expected_boundary_info = {"encodedLocations": "0xABC"}
+ mock_lookup_tb.return_value = expected_boundary_info
+
+ # Mock _build_trust_boundary_lookup_url to ensure it's called.
+ mock_build_url = mock.Mock(return_value=expected_url)
+ creds._build_trust_boundary_lookup_url = mock_build_url
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ result = creds._lookup_trust_boundary(request)
+
+ assert result == expected_boundary_info
+ mock_build_url.assert_called_once()
+ expected_headers = {"authorization": "Bearer test_token"}
+ mock_lookup_tb.assert_called_once_with(
+ request, expected_url, headers=expected_headers
+ )
+
+ @mock.patch.object(_client, "_lookup_trust_boundary")
+ def test_lookup_trust_boundary_build_url_returns_none(self, mock_lookup_tb):
+ creds = CredentialsImpl()
+ request = mock.Mock()
+
+ # Mock _build_trust_boundary_lookup_url to return None
+ mock_build_url = mock.Mock(return_value=None)
+ creds._build_trust_boundary_lookup_url = mock_build_url
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ with pytest.raises(
+ exceptions.InvalidValue,
+ match="Failed to build trust boundary lookup URL.",
+ ):
+ creds._lookup_trust_boundary(request)
+
+ mock_build_url.assert_called_once() # Ensure _build_trust_boundary_lookup_url was called
+ mock_lookup_tb.assert_not_called() # Ensure _client.lookup_trust_boundary was not called
+
+ @mock.patch("google.auth.credentials._LOGGER")
+ @mock.patch("google.auth._helpers.is_logging_enabled", return_value=True)
+ @mock.patch.object(_client, "_lookup_trust_boundary")
+ def test_refresh_trust_boundary_fails_with_cached_data_and_logging(
+ self, mock_lookup_tb, mock_is_logging_enabled, mock_logger
+ ):
+ creds = CredentialsImpl()
+ creds._trust_boundary = {"encodedLocations": "0xABC"}
+ request = mock.Mock()
+
+ refresh_error = exceptions.RefreshError("Lookup failed")
+ mock_lookup_tb.side_effect = refresh_error
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ creds.refresh(request)
+
+ mock_lookup_tb.assert_called_once()
+ mock_is_logging_enabled.assert_called_once_with(mock_logger)
+ mock_logger.debug.assert_called_once_with(
+ "Using cached trust boundary due to refresh error: %s", refresh_error
+ )
diff --git a/contrib/python/google-auth/py3/tests/test_external_account.py b/contrib/python/google-auth/py3/tests/test_external_account.py
index bddcb4afa1a..d86a19bef16 100644
--- a/contrib/python/google-auth/py3/tests/test_external_account.py
+++ b/contrib/python/google-auth/py3/tests/test_external_account.py
@@ -247,7 +247,7 @@ class TestCredentials(object):
assert "cert" not in request_kwargs
assert request_kwargs["body"] is not None
body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
- for (k, v) in body_tuples:
+ for k, v in body_tuples:
assert v.decode("utf-8") == request_data[k.decode("utf-8")]
assert len(body_tuples) == len(request_data.keys())
@@ -920,7 +920,9 @@ class TestCredentials(object):
"Content-Type": "application/json",
"authorization": "Bearer {}".format(token_response["access_token"]),
"x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
- "x-allowed-locations": "0x0",
+ # TODO(negarb): Uncomment and update when trust boundary is supported
+ # for external account credentials.
+ # "x-allowed-locations": "0x0",
}
impersonation_request_data = {
"delegates": None,
@@ -1010,7 +1012,7 @@ class TestCredentials(object):
"Content-Type": "application/json",
"authorization": "Bearer {}".format(token_response["access_token"]),
"x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
}
impersonation_request_data = {
"delegates": None,
@@ -1097,7 +1099,7 @@ class TestCredentials(object):
"Content-Type": "application/json",
"authorization": "Bearer {}".format(token_response["access_token"]),
"x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
}
impersonation_request_data = {
"delegates": None,
@@ -1331,7 +1333,7 @@ class TestCredentials(object):
"Content-Type": "application/json",
"authorization": "Bearer {}".format(token_response["access_token"]),
"x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
}
impersonation_request_data = {
"delegates": None,
@@ -1415,7 +1417,7 @@ class TestCredentials(object):
"Content-Type": "application/json",
"authorization": "Bearer {}".format(token_response["access_token"]),
"x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
}
impersonation_request_data = {
"delegates": None,
@@ -1471,7 +1473,7 @@ class TestCredentials(object):
assert headers == {
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
}
def test_apply_workforce_without_quota_project_id(self):
@@ -1488,7 +1490,7 @@ class TestCredentials(object):
assert headers == {
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
}
def test_apply_impersonation_without_quota_project_id(self):
@@ -1520,7 +1522,7 @@ class TestCredentials(object):
assert headers == {
"authorization": "Bearer {}".format(impersonation_response["accessToken"]),
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
}
def test_apply_with_quota_project_id(self):
@@ -1537,7 +1539,7 @@ class TestCredentials(object):
"other": "header-value",
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
"x-goog-user-project": self.QUOTA_PROJECT_ID,
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
}
def test_apply_impersonation_with_quota_project_id(self):
@@ -1572,7 +1574,7 @@ class TestCredentials(object):
"other": "header-value",
"authorization": "Bearer {}".format(impersonation_response["accessToken"]),
"x-goog-user-project": self.QUOTA_PROJECT_ID,
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
}
def test_before_request(self):
@@ -1588,7 +1590,7 @@ class TestCredentials(object):
assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
}
# Second call shouldn't call refresh.
@@ -1597,7 +1599,7 @@ class TestCredentials(object):
assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
}
def test_before_request_workforce(self):
@@ -1615,7 +1617,7 @@ class TestCredentials(object):
assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
}
# Second call shouldn't call refresh.
@@ -1624,7 +1626,7 @@ class TestCredentials(object):
assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
}
def test_before_request_impersonation(self):
@@ -1655,7 +1657,7 @@ class TestCredentials(object):
assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(impersonation_response["accessToken"]),
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
}
# Second call shouldn't call refresh.
@@ -1664,7 +1666,7 @@ class TestCredentials(object):
assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(impersonation_response["accessToken"]),
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
}
@mock.patch("google.auth._helpers.utcnow")
@@ -1693,7 +1695,7 @@ class TestCredentials(object):
# Cached token should be used.
assert headers == {
"authorization": "Bearer token",
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
}
# Next call should simulate 1 second passed.
@@ -1709,7 +1711,7 @@ class TestCredentials(object):
# New token should be retrieved.
assert headers == {
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
}
@mock.patch("google.auth._helpers.utcnow")
@@ -1754,7 +1756,7 @@ class TestCredentials(object):
# Cached token should be used.
assert headers == {
"authorization": "Bearer token",
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
}
# Next call should simulate 1 second passed. This will trigger the expiration
@@ -1773,7 +1775,7 @@ class TestCredentials(object):
# New token should be retrieved.
assert headers == {
"authorization": "Bearer {}".format(impersonation_response["accessToken"]),
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
}
@pytest.mark.parametrize(
@@ -1872,7 +1874,7 @@ class TestCredentials(object):
"x-goog-user-project": self.QUOTA_PROJECT_ID,
"authorization": "Bearer {}".format(token_response["access_token"]),
"x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
}
impersonation_request_data = {
"delegates": None,
@@ -1926,7 +1928,7 @@ class TestCredentials(object):
"authorization": "Bearer {}".format(
impersonation_response["accessToken"]
),
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
},
)
@@ -1998,7 +2000,7 @@ class TestCredentials(object):
"authorization": "Bearer {}".format(
self.SUCCESS_RESPONSE["access_token"]
),
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
},
)
@@ -2048,7 +2050,7 @@ class TestCredentials(object):
"Content-Type": "application/json",
"authorization": "Bearer {}".format(token_response["access_token"]),
"x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
- "x-allowed-locations": "0x0",
+ # "x-allowed-locations": "0x0",
}
impersonation_request_data = {
"delegates": None,
diff --git a/contrib/python/google-auth/py3/tests/test_identity_pool.py b/contrib/python/google-auth/py3/tests/test_identity_pool.py
index 4d78a5c22ea..8ca2892e2e6 100644
--- a/contrib/python/google-auth/py3/tests/test_identity_pool.py
+++ b/contrib/python/google-auth/py3/tests/test_identity_pool.py
@@ -285,7 +285,7 @@ class TestCredentials(object):
assert request_kwargs["body"] is not None
body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
assert len(body_tuples) == len(request_data.keys())
- for (k, v) in body_tuples:
+ for k, v in body_tuples:
assert v.decode("utf-8") == request_data[k.decode("utf-8")]
@classmethod
@@ -384,7 +384,9 @@ class TestCredentials(object):
"Content-Type": "application/json",
"authorization": "Bearer {}".format(token_response["access_token"]),
"x-goog-api-client": metrics_header_value,
- "x-allowed-locations": "0x0",
+ # TODO(negarb): Uncomment and update when trust boundary is supported
+ # for external account credentials.
+ # "x-allowed-locations": "0x0",
}
impersonation_request_data = {
"delegates": None,
diff --git a/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py b/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py
index 9aeb505fdd9..a9f45f88948 100644
--- a/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py
+++ b/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py
@@ -22,7 +22,9 @@ import mock
import pytest # type: ignore
from google.auth import _helpers
+from google.auth import credentials as auth_credentials
from google.auth import crypt
+from google.auth import environment_vars
from google.auth import exceptions
from google.auth import impersonated_credentials
from google.auth import transport
@@ -128,8 +130,21 @@ class TestImpersonatedCredentials(object):
# Because Python 2.7:
DELEGATES = [] # type: ignore
LIFETIME = 3600
+ NO_OP_TRUST_BOUNDARY = {
+ "locations": auth_credentials.NO_OP_TRUST_BOUNDARY_LOCATIONS,
+ "encodedLocations": auth_credentials.NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS,
+ }
+ VALID_TRUST_BOUNDARY = {
+ "locations": ["us-central1", "us-east1"],
+ "encodedLocations": "0xVALIDHEX",
+ }
+ EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE = (
+ "https://iamcredentials.googleapis.com/v1/projects/-"
+ "/serviceAccounts/[email protected]/allowedLocations"
+ )
+ FAKE_UNIVERSE_DOMAIN = "universe.foo"
SOURCE_CREDENTIALS = service_account.Credentials(
- SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI
+ SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI, trust_boundary=NO_OP_TRUST_BOUNDARY
)
USER_SOURCE_CREDENTIALS = credentials.Credentials(token="ABCDE")
IAM_ENDPOINT_OVERRIDE = (
@@ -144,6 +159,7 @@ class TestImpersonatedCredentials(object):
target_principal=TARGET_PRINCIPAL,
subject=None,
iam_endpoint_override=None,
+ trust_boundary=None, # Align with Credentials class default
):
return Credentials(
@@ -154,6 +170,7 @@ class TestImpersonatedCredentials(object):
lifetime=lifetime,
subject=subject,
iam_endpoint_override=iam_endpoint_override,
+ trust_boundary=trust_boundary,
)
def test_from_impersonated_service_account_info(self):
@@ -163,7 +180,7 @@ class TestImpersonatedCredentials(object):
assert isinstance(credentials, impersonated_credentials.Credentials)
def test_from_impersonated_service_account_info_with_invalid_source_credentials_type(
- self
+ self,
):
info = copy.deepcopy(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO)
assert "source_credentials" in info
@@ -178,7 +195,7 @@ class TestImpersonatedCredentials(object):
)
def test_from_impersonated_service_account_info_with_invalid_impersonation_url(
- self
+ self,
):
info = copy.deepcopy(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO)
info["service_account_impersonation_url"] = "invalid_url"
@@ -263,8 +280,12 @@ class TestImpersonatedCredentials(object):
assert headers["x-goog-api-client"] == "cred-type/imp"
@pytest.mark.parametrize("use_data_bytes", [True, False])
- def test_refresh_success(self, use_data_bytes, mock_donor_credentials):
- credentials = self.make_credentials(lifetime=None)
+ @mock.patch("google.oauth2._client._lookup_trust_boundary")
+ def test_refresh_success(
+ self, mock_lookup_trust_boundary, use_data_bytes, mock_donor_credentials
+ ):
+ # Start with no boundary.
+ credentials = self.make_credentials(lifetime=None, trust_boundary=None)
token = "token"
expire_time = (
@@ -278,7 +299,12 @@ class TestImpersonatedCredentials(object):
use_data_bytes=use_data_bytes,
)
- with mock.patch(
+ # Mock the trust boundary lookup to return a valid value.
+ mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ), mock.patch(
"google.auth.metrics.token_request_access_token_impersonate",
return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
):
@@ -291,6 +317,239 @@ class TestImpersonatedCredentials(object):
== ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE
)
+ # Verify that the x-allowed-locations header from the source credential
+ # was applied. The source credential has a NO_OP boundary, so the
+ # header should be an empty string.
+ request_kwargs = request.call_args[1]
+ assert "headers" in request_kwargs
+ assert "x-allowed-locations" in request_kwargs["headers"]
+ assert request_kwargs["headers"]["x-allowed-locations"] == ""
+
+ # Verify trust boundary was set.
+ assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY
+
+ # Verify the mock was called with the correct URL.
+ mock_lookup_trust_boundary.assert_called_once_with(
+ request,
+ self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE,
+ headers={"authorization": "Bearer token"},
+ )
+
+ # Verify x-allowed-locations header is set correctly by apply().
+ headers_applied = {}
+ credentials.apply(headers_applied)
+ assert (
+ headers_applied["x-allowed-locations"]
+ == self.VALID_TRUST_BOUNDARY["encodedLocations"]
+ )
+
+ def test_refresh_source_creds_no_trust_boundary(self):
+ # Use a source credential that does not support trust boundaries.
+ source_credentials = credentials.Credentials(token="source_token")
+ creds = self.make_credentials(source_credentials=source_credentials)
+ token = "impersonated_token"
+
+ expire_time = (
+ _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
+ ).isoformat("T") + "Z"
+ response_body = {"accessToken": token, "expireTime": expire_time}
+
+ request = self.make_request(
+ data=json.dumps(response_body), status=http_client.OK
+ )
+
+ creds.refresh(request)
+
+ # Verify that the x-allowed-locations header was NOT applied because
+ # the source credential does not support trust boundaries.
+ request_kwargs = request.call_args[1]
+ assert "x-allowed-locations" not in request_kwargs["headers"]
+
+ @mock.patch("google.oauth2._client._lookup_trust_boundary")
+ def test_refresh_trust_boundary_lookup_fails_no_cache(
+ self, mock_lookup_trust_boundary, mock_donor_credentials
+ ):
+ # Start with no trust boundary
+ credentials = self.make_credentials(lifetime=None, trust_boundary=None)
+ token = "token"
+
+ expire_time = (
+ _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
+ ).isoformat("T") + "Z"
+ response_body = {"accessToken": token, "expireTime": expire_time}
+
+ request = self.make_request(
+ data=json.dumps(response_body), status=http_client.OK
+ )
+
+ # Mock the trust boundary lookup to raise an error
+ mock_lookup_trust_boundary.side_effect = exceptions.RefreshError(
+ "Lookup failed"
+ )
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ), pytest.raises(exceptions.RefreshError) as excinfo:
+ credentials.refresh(request)
+
+ assert "Lookup failed" in str(excinfo.value)
+ assert credentials._trust_boundary is None # Still no trust boundary
+ mock_lookup_trust_boundary.assert_called_once()
+
+ @mock.patch("google.oauth2._client._lookup_trust_boundary")
+ def test_refresh_fetches_no_op_trust_boundary(
+ self, mock_lookup_trust_boundary, mock_donor_credentials
+ ):
+ # Start with no trust boundary
+ credentials = self.make_credentials(lifetime=None, trust_boundary=None)
+ token = "token"
+ expire_time = (
+ _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
+ ).isoformat("T") + "Z"
+ response_body = {"accessToken": token, "expireTime": expire_time}
+ request = self.make_request(
+ data=json.dumps(response_body), status=http_client.OK
+ )
+
+ mock_lookup_trust_boundary.return_value = (
+ self.NO_OP_TRUST_BOUNDARY
+ ) # Mock returns NO_OP
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ), mock.patch(
+ "google.auth.metrics.token_request_access_token_impersonate",
+ return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
+ ):
+ credentials.refresh(request)
+
+ assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY
+ mock_lookup_trust_boundary.assert_called_once_with(
+ request,
+ self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE,
+ headers={"authorization": "Bearer token"},
+ )
+ headers_applied = {}
+ credentials.apply(headers_applied)
+ assert headers_applied["x-allowed-locations"] == ""
+
+ @mock.patch("google.oauth2._client._lookup_trust_boundary")
+ def test_refresh_skips_trust_boundary_lookup_non_default_universe(
+ self, mock_lookup_trust_boundary
+ ):
+ # Create source credentials with a non-default universe domain
+ source_credentials = service_account.Credentials(
+ SIGNER,
+ TOKEN_URI,
+ universe_domain=self.FAKE_UNIVERSE_DOMAIN,
+ )
+ # Create impersonated credentials using the non-default source credentials
+ credentials = self.make_credentials(source_credentials=source_credentials)
+
+ # Mock the IAM credentials API call for generateAccessToken
+ token = "token"
+ expire_time = (
+ _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
+ ).isoformat("T") + "Z"
+ response_body = {"accessToken": token, "expireTime": expire_time}
+ request = self.make_request(
+ data=json.dumps(response_body), status=http_client.OK
+ )
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ credentials.refresh(request)
+
+ # Ensure trust boundary lookup was not called
+ mock_lookup_trust_boundary.assert_not_called()
+ # Verify that x-allowed-locations header is not set by apply()
+ headers_applied = {}
+ credentials.apply(headers_applied)
+ assert "x-allowed-locations" not in headers_applied
+
+ @mock.patch("google.oauth2._client._lookup_trust_boundary")
+ def test_refresh_starts_with_no_op_trust_boundary_skips_lookup(
+ self, mock_lookup_trust_boundary, mock_donor_credentials
+ ):
+ credentials = self.make_credentials(
+ lifetime=None, trust_boundary=self.NO_OP_TRUST_BOUNDARY
+ ) # Start with NO_OP
+ token = "token"
+ expire_time = (
+ _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
+ ).isoformat("T") + "Z"
+ response_body = {"accessToken": token, "expireTime": expire_time}
+ request = self.make_request(
+ data=json.dumps(response_body), status=http_client.OK
+ )
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ), mock.patch(
+ "google.auth.metrics.token_request_access_token_impersonate",
+ return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
+ ):
+ credentials.refresh(request)
+
+ # Verify trust boundary remained NO_OP
+ assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY
+
+ # Lookup should be skipped
+ mock_lookup_trust_boundary.assert_not_called()
+
+ # Verify that an empty header was added.
+ headers_applied = {}
+ credentials.apply(headers_applied)
+ assert headers_applied["x-allowed-locations"] == ""
+
+ @mock.patch("google.oauth2._client._lookup_trust_boundary")
+ def test_refresh_trust_boundary_lookup_fails_with_cached_data2(
+ self, mock_lookup_trust_boundary, mock_donor_credentials
+ ):
+ # Start with no trust boundary
+ credentials = self.make_credentials(lifetime=None, trust_boundary=None)
+ token = "token"
+
+ expire_time = (
+ _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
+ ).isoformat("T") + "Z"
+ response_body = {"accessToken": token, "expireTime": expire_time}
+
+ request = self.make_request(
+ data=json.dumps(response_body), status=http_client.OK
+ )
+
+ # First refresh: Successfully fetch a valid trust boundary.
+ mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ), mock.patch(
+ "google.auth.metrics.token_request_access_token_impersonate",
+ return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
+ ):
+ credentials.refresh(request)
+
+ assert credentials.valid
+ # Verify trust boundary was set.
+ assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY
+ mock_lookup_trust_boundary.assert_called_once()
+
+ # Second refresh: Mock lookup to fail, but expect cached data to be preserved.
+ mock_lookup_trust_boundary.reset_mock()
+ mock_lookup_trust_boundary.side_effect = exceptions.RefreshError(
+ "Lookup failed"
+ )
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ credentials.refresh(request)
+
+ assert credentials.valid
+ assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY
+ mock_lookup_trust_boundary.assert_called_once()
+
@pytest.mark.parametrize("use_data_bytes", [True, False])
def test_refresh_with_subject_success(self, use_data_bytes, mock_dwd_credentials):
credentials = self.make_credentials(subject="[email protected]", lifetime=None)
@@ -673,6 +932,37 @@ class TestImpersonatedCredentials(object):
assert credentials.requires_scopes is False
assert credentials._target_scopes == ["fake_scope1", "fake_scope2"]
+ def test_with_trust_boundary(self):
+ credentials = self.make_credentials()
+ new_boundary = {"encodedLocations": "new_boundary"}
+ new_credentials = credentials.with_trust_boundary(new_boundary)
+
+ assert new_credentials is not credentials
+ assert new_credentials._trust_boundary == new_boundary
+ # The source credentials should be a copy, not the same object.
+ # But they should be functionally equivalent.
+ assert (
+ new_credentials._source_credentials is not credentials._source_credentials
+ )
+
+ assert (
+ new_credentials._source_credentials.service_account_email
+ == credentials._source_credentials.service_account_email
+ )
+ assert (
+ new_credentials._source_credentials._signer
+ == credentials._source_credentials._signer
+ )
+ assert new_credentials._target_principal == credentials._target_principal
+
+ def test_build_trust_boundary_lookup_url_no_email(self):
+ credentials = self.make_credentials(target_principal=None)
+
+ with pytest.raises(ValueError) as excinfo:
+ credentials._build_trust_boundary_lookup_url()
+
+ assert "Service account email is required" in str(excinfo.value)
+
def test_with_scopes_provide_default_scopes(self):
credentials = self.make_credentials()
credentials._target_scopes = []