diff options
| author | robot-piglet <[email protected]> | 2025-10-14 13:58:45 +0300 |
|---|---|---|
| committer | robot-piglet <[email protected]> | 2025-10-14 14:26:06 +0300 |
| commit | 09cc5fe0eb0747ac9ce1444c9acc944838a8cfa2 (patch) | |
| tree | 61e19f6a3c904d77e58ff647f4c9473378d6954b /contrib/python/google-auth/py3/tests/oauth2/test_service_account.py | |
| parent | e9146d8a4d0ee112c89906f9fc8ce23b92250439 (diff) | |
Intermediate changes
commit_hash:bc75ab7ba0ee5a6571045c99062e8d4a996d16dd
Diffstat (limited to 'contrib/python/google-auth/py3/tests/oauth2/test_service_account.py')
| -rw-r--r-- | contrib/python/google-auth/py3/tests/oauth2/test_service_account.py | 293 |
1 files changed, 284 insertions, 9 deletions
diff --git a/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py b/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py index e60c8200f4b..12f43afc0f7 100644 --- a/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py +++ b/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py @@ -20,7 +20,9 @@ import mock import pytest # type: ignore from google.auth import _helpers +from google.auth import credentials from google.auth import crypt +from google.auth import environment_vars from google.auth import exceptions from google.auth import iam from google.auth import jwt @@ -59,14 +61,31 @@ SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1") class TestCredentials(object): SERVICE_ACCOUNT_EMAIL = "[email protected]" TOKEN_URI = "https://example.com/oauth2/token" + NO_OP_TRUST_BOUNDARY = { + "locations": credentials.NO_OP_TRUST_BOUNDARY_LOCATIONS, + "encodedLocations": credentials.NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS, + } + VALID_TRUST_BOUNDARY = { + "locations": ["us-central1", "us-east1"], + "encodedLocations": "0xVALIDHEXSA", + } + EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE = ( + "https://iamcredentials.googleapis.com/v1/projects/-" + "/serviceAccounts/[email protected]/allowedLocations" + ) @classmethod - def make_credentials(cls, universe_domain=DEFAULT_UNIVERSE_DOMAIN): + def make_credentials( + cls, + universe_domain=DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, # Align with Credentials class default + ): return service_account.Credentials( SIGNER, cls.SERVICE_ACCOUNT_EMAIL, cls.TOKEN_URI, universe_domain=universe_domain, + trust_boundary=trust_boundary, ) def test_get_cred_info(self): @@ -252,6 +271,18 @@ class TestCredentials(object): "always_use_jwt_access should be True for non-default universe domain" ) + def test_with_trust_boundary(self): + credentials = self.make_credentials() + new_boundary = {"encodedLocations": "new_boundary"} + new_credentials = credentials.with_trust_boundary(new_boundary) + + assert new_credentials is not credentials + assert new_credentials._trust_boundary == new_boundary + assert new_credentials._signer == credentials._signer + assert ( + new_credentials.service_account_email == credentials.service_account_email + ) + def test__make_authorization_grant_assertion(self): credentials = self.make_credentials() token = credentials._make_authorization_grant_assertion() @@ -496,10 +527,42 @@ class TestCredentials(object): # Check that the credentials have the token. assert credentials.token == token - # Check that the credentials are valid (have a token and are not - # expired) + # Check that the credentials are valid (have a token and are not expired). assert credentials.valid + # Trust boundary should be None since env var is not set and no initial + # boundary was provided. + assert credentials._trust_boundary is None + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + @mock.patch("google.oauth2._client.jwt_grant", autospec=True) + def test_refresh_skips_trust_boundary_lookup_non_default_universe( + self, mock_jwt_grant, mock_lookup_trust_boundary + ): + # Create credentials with a non-default universe domain + credentials = self.make_credentials(universe_domain=FAKE_UNIVERSE_DOMAIN) + token = "token" + mock_jwt_grant.return_value = ( + token, + _helpers.utcnow() + datetime.timedelta(seconds=500), + {}, + ) + request = mock.create_autospec(transport.Request, instance=True) + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + # Ensure jwt_grant was called (token refresh happened) + mock_jwt_grant.assert_called_once() + # Ensure trust boundary lookup was not called + mock_lookup_trust_boundary.assert_not_called() + # Verify that x-allowed-locations header is not set by apply() + headers_applied = {} + credentials.apply(headers_applied) + assert "x-allowed-locations" not in headers_applied + @mock.patch("google.oauth2._client.jwt_grant", autospec=True) def test_before_request_refreshes(self, jwt_grant): credentials = self.make_credentials() @@ -608,6 +671,208 @@ class TestCredentials(object): credentials.refresh(None) assert excinfo.match("domain wide delegation is not supported") + @mock.patch("google.oauth2._client._lookup_trust_boundary") + @mock.patch("google.oauth2._client.jwt_grant", autospec=True) + def test_refresh_success_with_valid_trust_boundary( + self, mock_jwt_grant, mock_lookup_trust_boundary + ): + # Start with no boundary. + credentials = self.make_credentials(trust_boundary=None) + token = "token" + mock_jwt_grant.return_value = ( + token, + _helpers.utcnow() + datetime.timedelta(seconds=500), + {}, + ) + request = mock.create_autospec(transport.Request, instance=True) + + # Mock the trust boundary lookup to return a valid boundary. + mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + assert credentials.valid + assert credentials.token == token + + # Verify trust boundary was set. + assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY + + # Verify the mock was called with the correct URL. + mock_lookup_trust_boundary.assert_called_once_with( + request, + self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, + headers={"authorization": "Bearer token"}, + ) + + # Verify x-allowed-locations header is set correctly by apply(). + headers_applied = {} + credentials.apply(headers_applied) + assert ( + headers_applied["x-allowed-locations"] + == self.VALID_TRUST_BOUNDARY["encodedLocations"] + ) + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + @mock.patch("google.oauth2._client.jwt_grant", autospec=True) + def test_refresh_fetches_no_op_trust_boundary( + self, mock_jwt_grant, mock_lookup_trust_boundary + ): + # Start with no trust boundary + credentials = self.make_credentials(trust_boundary=None) + token = "token" + mock_jwt_grant.return_value = ( + token, + _helpers.utcnow() + datetime.timedelta(seconds=500), + {}, + ) + request = mock.create_autospec(transport.Request, instance=True) + + mock_lookup_trust_boundary.return_value = self.NO_OP_TRUST_BOUNDARY + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + assert credentials.valid + assert credentials.token == token + assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY + mock_lookup_trust_boundary.assert_called_once_with( + request, + self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, + headers={"authorization": "Bearer token"}, + ) + headers_applied = {} + credentials.apply(headers_applied) + assert headers_applied["x-allowed-locations"] == "" + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + @mock.patch("google.oauth2._client.jwt_grant", autospec=True) + def test_refresh_starts_with_no_op_trust_boundary_skips_lookup( + self, mock_jwt_grant, mock_lookup_trust_boundary + ): + credentials = self.make_credentials( + trust_boundary=self.NO_OP_TRUST_BOUNDARY + ) # Start with NO_OP + token = "token" + mock_jwt_grant.return_value = ( + token, + _helpers.utcnow() + datetime.timedelta(seconds=500), + {}, + ) + request = mock.create_autospec(transport.Request, instance=True) + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + assert credentials.valid + assert credentials.token == token + # Verify trust boundary remained NO_OP + assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY + + # Lookup should be skipped + mock_lookup_trust_boundary.assert_not_called() + + # Verify that an empty header was added. + headers_applied = {} + credentials.apply(headers_applied) + assert headers_applied["x-allowed-locations"] == "" + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + @mock.patch("google.oauth2._client.jwt_grant", autospec=True) + def test_refresh_trust_boundary_lookup_fails_no_cache( + self, mock_jwt_grant, mock_lookup_trust_boundary + ): + # Start with no trust boundary + credentials = self.make_credentials(trust_boundary=None) + mock_lookup_trust_boundary.side_effect = exceptions.RefreshError( + "Lookup failed" + ) + mock_jwt_grant.return_value = ( + "mock_access_token", + _helpers.utcnow() + datetime.timedelta(seconds=3600), + {}, + ) + request = mock.create_autospec(transport.Request, instance=True) + + # Mock the trust boundary lookup to raise an error + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ), pytest.raises(exceptions.RefreshError, match="Lookup failed"): + credentials.refresh(request) + + assert credentials._trust_boundary is None + mock_lookup_trust_boundary.assert_called_once() + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + @mock.patch("google.oauth2._client.jwt_grant", autospec=True) + def test_refresh_trust_boundary_lookup_fails_with_cached_data( + self, mock_jwt_grant, mock_lookup_trust_boundary + ): + # Initial setup: Credentials with no trust boundary. + credentials = self.make_credentials(trust_boundary=None) + token = "token" + mock_jwt_grant.return_value = ( + token, + _helpers.utcnow() + datetime.timedelta(seconds=500), + {}, + ) + request = mock.create_autospec(transport.Request, instance=True) + + # First refresh: Successfully fetch a valid trust boundary. + mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + assert credentials.valid + assert credentials.token == token + assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY + mock_lookup_trust_boundary.assert_called_once_with( + request, + self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, + headers={"authorization": "Bearer token"}, + ) + + # Second refresh: Mock lookup to fail, but expect cached data to be preserved. + mock_lookup_trust_boundary.reset_mock() + mock_lookup_trust_boundary.side_effect = exceptions.RefreshError( + "Lookup failed" + ) + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) # This should NOT raise an exception + + assert credentials.valid # Credentials should still be valid + assert ( + credentials._trust_boundary == self.VALID_TRUST_BOUNDARY + ) # Cached data should be preserved + mock_lookup_trust_boundary.assert_called_once_with( + request, + self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, + headers={ + "authorization": "Bearer token", + "x-allowed-locations": self.VALID_TRUST_BOUNDARY["encodedLocations"], + }, + ) # Lookup should have been attempted again + + def test_build_trust_boundary_lookup_url_no_email(self): + credentials = self.make_credentials() + credentials._service_account_email = None + + with pytest.raises(ValueError) as excinfo: + credentials._build_trust_boundary_lookup_url() + + assert "Service account email is required" in str(excinfo.value) + class TestIDTokenCredentials(object): SERVICE_ACCOUNT_EMAIL = "[email protected]" @@ -790,9 +1055,14 @@ class TestIDTokenCredentials(object): ) request = mock.Mock() credentials.refresh(request) - req, iam_endpoint, signer_email, target_audience, access_token, universe_domain = call_iam_generate_id_token_endpoint.call_args[ - 0 - ] + ( + req, + iam_endpoint, + signer_email, + target_audience, + access_token, + universe_domain, + ) = call_iam_generate_id_token_endpoint.call_args[0] assert req == request assert iam_endpoint == iam._IAM_IDTOKEN_ENDPOINT assert signer_email == "[email protected]" @@ -812,9 +1082,14 @@ class TestIDTokenCredentials(object): ) request = mock.Mock() credentials.refresh(request) - req, iam_endpoint, signer_email, target_audience, access_token, universe_domain = call_iam_generate_id_token_endpoint.call_args[ - 0 - ] + ( + req, + iam_endpoint, + signer_email, + target_audience, + access_token, + universe_domain, + ) = call_iam_generate_id_token_endpoint.call_args[0] assert req == request assert ( iam_endpoint |
