summaryrefslogtreecommitdiffstats
path: root/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py
diff options
context:
space:
mode:
authorrobot-piglet <[email protected]>2025-10-14 13:58:45 +0300
committerrobot-piglet <[email protected]>2025-10-14 14:26:06 +0300
commit09cc5fe0eb0747ac9ce1444c9acc944838a8cfa2 (patch)
tree61e19f6a3c904d77e58ff647f4c9473378d6954b /contrib/python/google-auth/py3/tests/oauth2/test_service_account.py
parente9146d8a4d0ee112c89906f9fc8ce23b92250439 (diff)
Intermediate changes
commit_hash:bc75ab7ba0ee5a6571045c99062e8d4a996d16dd
Diffstat (limited to 'contrib/python/google-auth/py3/tests/oauth2/test_service_account.py')
-rw-r--r--contrib/python/google-auth/py3/tests/oauth2/test_service_account.py293
1 files changed, 284 insertions, 9 deletions
diff --git a/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py b/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py
index e60c8200f4b..12f43afc0f7 100644
--- a/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py
+++ b/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py
@@ -20,7 +20,9 @@ import mock
import pytest # type: ignore
from google.auth import _helpers
+from google.auth import credentials
from google.auth import crypt
+from google.auth import environment_vars
from google.auth import exceptions
from google.auth import iam
from google.auth import jwt
@@ -59,14 +61,31 @@ SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")
class TestCredentials(object):
SERVICE_ACCOUNT_EMAIL = "[email protected]"
TOKEN_URI = "https://example.com/oauth2/token"
+ NO_OP_TRUST_BOUNDARY = {
+ "locations": credentials.NO_OP_TRUST_BOUNDARY_LOCATIONS,
+ "encodedLocations": credentials.NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS,
+ }
+ VALID_TRUST_BOUNDARY = {
+ "locations": ["us-central1", "us-east1"],
+ "encodedLocations": "0xVALIDHEXSA",
+ }
+ EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE = (
+ "https://iamcredentials.googleapis.com/v1/projects/-"
+ "/serviceAccounts/[email protected]/allowedLocations"
+ )
@classmethod
- def make_credentials(cls, universe_domain=DEFAULT_UNIVERSE_DOMAIN):
+ def make_credentials(
+ cls,
+ universe_domain=DEFAULT_UNIVERSE_DOMAIN,
+ trust_boundary=None, # Align with Credentials class default
+ ):
return service_account.Credentials(
SIGNER,
cls.SERVICE_ACCOUNT_EMAIL,
cls.TOKEN_URI,
universe_domain=universe_domain,
+ trust_boundary=trust_boundary,
)
def test_get_cred_info(self):
@@ -252,6 +271,18 @@ class TestCredentials(object):
"always_use_jwt_access should be True for non-default universe domain"
)
+ def test_with_trust_boundary(self):
+ credentials = self.make_credentials()
+ new_boundary = {"encodedLocations": "new_boundary"}
+ new_credentials = credentials.with_trust_boundary(new_boundary)
+
+ assert new_credentials is not credentials
+ assert new_credentials._trust_boundary == new_boundary
+ assert new_credentials._signer == credentials._signer
+ assert (
+ new_credentials.service_account_email == credentials.service_account_email
+ )
+
def test__make_authorization_grant_assertion(self):
credentials = self.make_credentials()
token = credentials._make_authorization_grant_assertion()
@@ -496,10 +527,42 @@ class TestCredentials(object):
# Check that the credentials have the token.
assert credentials.token == token
- # Check that the credentials are valid (have a token and are not
- # expired)
+ # Check that the credentials are valid (have a token and are not expired).
assert credentials.valid
+ # Trust boundary should be None since env var is not set and no initial
+ # boundary was provided.
+ assert credentials._trust_boundary is None
+
+ @mock.patch("google.oauth2._client._lookup_trust_boundary")
+ @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
+ def test_refresh_skips_trust_boundary_lookup_non_default_universe(
+ self, mock_jwt_grant, mock_lookup_trust_boundary
+ ):
+ # Create credentials with a non-default universe domain
+ credentials = self.make_credentials(universe_domain=FAKE_UNIVERSE_DOMAIN)
+ token = "token"
+ mock_jwt_grant.return_value = (
+ token,
+ _helpers.utcnow() + datetime.timedelta(seconds=500),
+ {},
+ )
+ request = mock.create_autospec(transport.Request, instance=True)
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ credentials.refresh(request)
+
+ # Ensure jwt_grant was called (token refresh happened)
+ mock_jwt_grant.assert_called_once()
+ # Ensure trust boundary lookup was not called
+ mock_lookup_trust_boundary.assert_not_called()
+ # Verify that x-allowed-locations header is not set by apply()
+ headers_applied = {}
+ credentials.apply(headers_applied)
+ assert "x-allowed-locations" not in headers_applied
+
@mock.patch("google.oauth2._client.jwt_grant", autospec=True)
def test_before_request_refreshes(self, jwt_grant):
credentials = self.make_credentials()
@@ -608,6 +671,208 @@ class TestCredentials(object):
credentials.refresh(None)
assert excinfo.match("domain wide delegation is not supported")
+ @mock.patch("google.oauth2._client._lookup_trust_boundary")
+ @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
+ def test_refresh_success_with_valid_trust_boundary(
+ self, mock_jwt_grant, mock_lookup_trust_boundary
+ ):
+ # Start with no boundary.
+ credentials = self.make_credentials(trust_boundary=None)
+ token = "token"
+ mock_jwt_grant.return_value = (
+ token,
+ _helpers.utcnow() + datetime.timedelta(seconds=500),
+ {},
+ )
+ request = mock.create_autospec(transport.Request, instance=True)
+
+ # Mock the trust boundary lookup to return a valid boundary.
+ mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ credentials.refresh(request)
+
+ assert credentials.valid
+ assert credentials.token == token
+
+ # Verify trust boundary was set.
+ assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY
+
+ # Verify the mock was called with the correct URL.
+ mock_lookup_trust_boundary.assert_called_once_with(
+ request,
+ self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE,
+ headers={"authorization": "Bearer token"},
+ )
+
+ # Verify x-allowed-locations header is set correctly by apply().
+ headers_applied = {}
+ credentials.apply(headers_applied)
+ assert (
+ headers_applied["x-allowed-locations"]
+ == self.VALID_TRUST_BOUNDARY["encodedLocations"]
+ )
+
+ @mock.patch("google.oauth2._client._lookup_trust_boundary")
+ @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
+ def test_refresh_fetches_no_op_trust_boundary(
+ self, mock_jwt_grant, mock_lookup_trust_boundary
+ ):
+ # Start with no trust boundary
+ credentials = self.make_credentials(trust_boundary=None)
+ token = "token"
+ mock_jwt_grant.return_value = (
+ token,
+ _helpers.utcnow() + datetime.timedelta(seconds=500),
+ {},
+ )
+ request = mock.create_autospec(transport.Request, instance=True)
+
+ mock_lookup_trust_boundary.return_value = self.NO_OP_TRUST_BOUNDARY
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ credentials.refresh(request)
+
+ assert credentials.valid
+ assert credentials.token == token
+ assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY
+ mock_lookup_trust_boundary.assert_called_once_with(
+ request,
+ self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE,
+ headers={"authorization": "Bearer token"},
+ )
+ headers_applied = {}
+ credentials.apply(headers_applied)
+ assert headers_applied["x-allowed-locations"] == ""
+
+ @mock.patch("google.oauth2._client._lookup_trust_boundary")
+ @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
+ def test_refresh_starts_with_no_op_trust_boundary_skips_lookup(
+ self, mock_jwt_grant, mock_lookup_trust_boundary
+ ):
+ credentials = self.make_credentials(
+ trust_boundary=self.NO_OP_TRUST_BOUNDARY
+ ) # Start with NO_OP
+ token = "token"
+ mock_jwt_grant.return_value = (
+ token,
+ _helpers.utcnow() + datetime.timedelta(seconds=500),
+ {},
+ )
+ request = mock.create_autospec(transport.Request, instance=True)
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ credentials.refresh(request)
+
+ assert credentials.valid
+ assert credentials.token == token
+ # Verify trust boundary remained NO_OP
+ assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY
+
+ # Lookup should be skipped
+ mock_lookup_trust_boundary.assert_not_called()
+
+ # Verify that an empty header was added.
+ headers_applied = {}
+ credentials.apply(headers_applied)
+ assert headers_applied["x-allowed-locations"] == ""
+
+ @mock.patch("google.oauth2._client._lookup_trust_boundary")
+ @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
+ def test_refresh_trust_boundary_lookup_fails_no_cache(
+ self, mock_jwt_grant, mock_lookup_trust_boundary
+ ):
+ # Start with no trust boundary
+ credentials = self.make_credentials(trust_boundary=None)
+ mock_lookup_trust_boundary.side_effect = exceptions.RefreshError(
+ "Lookup failed"
+ )
+ mock_jwt_grant.return_value = (
+ "mock_access_token",
+ _helpers.utcnow() + datetime.timedelta(seconds=3600),
+ {},
+ )
+ request = mock.create_autospec(transport.Request, instance=True)
+
+ # Mock the trust boundary lookup to raise an error
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ), pytest.raises(exceptions.RefreshError, match="Lookup failed"):
+ credentials.refresh(request)
+
+ assert credentials._trust_boundary is None
+ mock_lookup_trust_boundary.assert_called_once()
+
+ @mock.patch("google.oauth2._client._lookup_trust_boundary")
+ @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
+ def test_refresh_trust_boundary_lookup_fails_with_cached_data(
+ self, mock_jwt_grant, mock_lookup_trust_boundary
+ ):
+ # Initial setup: Credentials with no trust boundary.
+ credentials = self.make_credentials(trust_boundary=None)
+ token = "token"
+ mock_jwt_grant.return_value = (
+ token,
+ _helpers.utcnow() + datetime.timedelta(seconds=500),
+ {},
+ )
+ request = mock.create_autospec(transport.Request, instance=True)
+
+ # First refresh: Successfully fetch a valid trust boundary.
+ mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ credentials.refresh(request)
+
+ assert credentials.valid
+ assert credentials.token == token
+ assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY
+ mock_lookup_trust_boundary.assert_called_once_with(
+ request,
+ self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE,
+ headers={"authorization": "Bearer token"},
+ )
+
+ # Second refresh: Mock lookup to fail, but expect cached data to be preserved.
+ mock_lookup_trust_boundary.reset_mock()
+ mock_lookup_trust_boundary.side_effect = exceptions.RefreshError(
+ "Lookup failed"
+ )
+
+ with mock.patch.dict(
+ os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
+ ):
+ credentials.refresh(request) # This should NOT raise an exception
+
+ assert credentials.valid # Credentials should still be valid
+ assert (
+ credentials._trust_boundary == self.VALID_TRUST_BOUNDARY
+ ) # Cached data should be preserved
+ mock_lookup_trust_boundary.assert_called_once_with(
+ request,
+ self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE,
+ headers={
+ "authorization": "Bearer token",
+ "x-allowed-locations": self.VALID_TRUST_BOUNDARY["encodedLocations"],
+ },
+ ) # Lookup should have been attempted again
+
+ def test_build_trust_boundary_lookup_url_no_email(self):
+ credentials = self.make_credentials()
+ credentials._service_account_email = None
+
+ with pytest.raises(ValueError) as excinfo:
+ credentials._build_trust_boundary_lookup_url()
+
+ assert "Service account email is required" in str(excinfo.value)
+
class TestIDTokenCredentials(object):
SERVICE_ACCOUNT_EMAIL = "[email protected]"
@@ -790,9 +1055,14 @@ class TestIDTokenCredentials(object):
)
request = mock.Mock()
credentials.refresh(request)
- req, iam_endpoint, signer_email, target_audience, access_token, universe_domain = call_iam_generate_id_token_endpoint.call_args[
- 0
- ]
+ (
+ req,
+ iam_endpoint,
+ signer_email,
+ target_audience,
+ access_token,
+ universe_domain,
+ ) = call_iam_generate_id_token_endpoint.call_args[0]
assert req == request
assert iam_endpoint == iam._IAM_IDTOKEN_ENDPOINT
assert signer_email == "[email protected]"
@@ -812,9 +1082,14 @@ class TestIDTokenCredentials(object):
)
request = mock.Mock()
credentials.refresh(request)
- req, iam_endpoint, signer_email, target_audience, access_token, universe_domain = call_iam_generate_id_token_endpoint.call_args[
- 0
- ]
+ (
+ req,
+ iam_endpoint,
+ signer_email,
+ target_audience,
+ access_token,
+ universe_domain,
+ ) = call_iam_generate_id_token_endpoint.call_args[0]
assert req == request
assert (
iam_endpoint