diff options
| author | robot-piglet <[email protected]> | 2025-10-14 13:58:45 +0300 |
|---|---|---|
| committer | robot-piglet <[email protected]> | 2025-10-14 14:26:06 +0300 |
| commit | 09cc5fe0eb0747ac9ce1444c9acc944838a8cfa2 (patch) | |
| tree | 61e19f6a3c904d77e58ff647f4c9473378d6954b /contrib/python/google-auth/py3/tests | |
| parent | e9146d8a4d0ee112c89906f9fc8ce23b92250439 (diff) | |
Intermediate changes
commit_hash:bc75ab7ba0ee5a6571045c99062e8d4a996d16dd
Diffstat (limited to 'contrib/python/google-auth/py3/tests')
9 files changed, 1361 insertions, 78 deletions
diff --git a/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py b/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py index 03fe845b1f3..1c77069938d 100644 --- a/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py +++ b/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py @@ -13,17 +13,20 @@ # limitations under the License. import base64 import datetime +import os import mock import pytest # type: ignore import responses # type: ignore from google.auth import _helpers +from google.auth import environment_vars from google.auth import exceptions from google.auth import jwt from google.auth import transport from google.auth.compute_engine import credentials from google.auth.transport import requests +from google.oauth2 import _client SAMPLE_ID_TOKEN_EXP = 1584393400 @@ -49,7 +52,6 @@ ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE = ( ID_TOKEN_REQUEST_METRICS_HEADER_VALUE = ( "gl-python/3.7 auth/1.1 auth-request-type/it cred-type/mds" ) - FAKE_SERVICE_ACCOUNT_EMAIL = "[email protected]" FAKE_QUOTA_PROJECT_ID = "fake-quota-project" FAKE_SCOPES = ["scope1", "scope2"] @@ -60,6 +62,9 @@ FAKE_UNIVERSE_DOMAIN = "fake-universe-domain" class TestCredentials(object): credentials = None credentials_with_all_fields = None + VALID_TRUST_BOUNDARY = {"encodedLocations": "valid-encoded-locations"} + NO_OP_TRUST_BOUNDARY = {"encodedLocations": ""} + EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/default/allowedLocations" @pytest.fixture(autouse=True) def credentials_fixture(self): @@ -174,6 +179,18 @@ class TestCredentials(object): } @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + def test_refresh_no_email(self, get): + get.return_value = { + # No "email" field. + "scopes": ["one", "two"] + } + + with pytest.raises(exceptions.RefreshError) as excinfo: + self.credentials.refresh(None) + + assert excinfo.match(r"missing 'email' field") + + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) def test_refresh_error(self, get): get.side_effect = exceptions.TransportError("http error") @@ -241,6 +258,18 @@ class TestCredentials(object): assert creds.universe_domain == "universe_domain" assert creds._universe_domain_cached + def test_with_trust_boundary(self): + creds = self.credentials_with_all_fields + new_boundary = {"encodedLocations": "new_boundary"} + new_creds = creds.with_trust_boundary(new_boundary) + + assert new_creds is not creds + assert new_creds._trust_boundary == new_boundary + assert new_creds._service_account_email == creds._service_account_email + assert new_creds._quota_project_id == creds._quota_project_id + assert new_creds._scopes == creds._scopes + assert new_creds._default_scopes == creds._default_scopes + def test_token_usage_metrics(self): self.credentials.token = "token" self.credentials.expiry = None @@ -280,6 +309,355 @@ class TestCredentials(object): # domain endpoint. get_universe_domain.assert_not_called() + @mock.patch("google.oauth2._client._lookup_trust_boundary", autospec=True) + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + def test_refresh_trust_boundary_lookup_skipped_if_env_var_not_true( + self, mock_metadata_get, mock_lookup_tb + ): + creds = self.credentials + request = mock.Mock() + + mock_metadata_get.side_effect = [ + # from _retrieve_info + {"email": "default", "scopes": ["scope1"]}, + # from get_service_account_token + {"access_token": "mock_token", "expires_in": 3600}, + ] + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "false"} + ): + creds.refresh(request) + + mock_lookup_tb.assert_not_called() + assert creds._trust_boundary is None + + @mock.patch("google.oauth2._client._lookup_trust_boundary", autospec=True) + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + def test_refresh_trust_boundary_lookup_skipped_if_env_var_missing( + self, mock_metadata_get, mock_lookup_tb + ): + creds = self.credentials + request = mock.Mock() + + mock_metadata_get.side_effect = [ + # from _retrieve_info + {"email": "default", "scopes": ["scope1"]}, + # from get_service_account_token + {"access_token": "mock_token", "expires_in": 3600}, + ] + + with mock.patch.dict(os.environ, clear=True): + creds.refresh(request) + + mock_lookup_tb.assert_not_called() + assert creds._trust_boundary is None + + @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + def test_refresh_trust_boundary_lookup_success( + self, mock_metadata_get, mock_lookup_tb + ): + mock_lookup_tb.return_value = { + "locations": ["us-central1"], + "encodedLocations": "0xABC", + } + creds = self.credentials + request = mock.Mock() + + # The first call to _metadata.get is for service account info, the second + # for the access token, and the third for the universe domain. + mock_metadata_get.side_effect = [ + # from _retrieve_info + {"email": "[email protected]", "scopes": ["scope1"]}, + # from get_service_account_token + {"access_token": "mock_token", "expires_in": 3600}, + # from get_universe_domain + "", + ] + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + creds.refresh(request) + + # Verify _metadata.get was called three times. + assert mock_metadata_get.call_count == 3 + # Verify lookup_trust_boundary was called with correct URL and token + expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]/allowedLocations" + mock_lookup_tb.assert_called_once_with( + request, expected_url, headers={"authorization": "Bearer mock_token"} + ) + # Verify trust boundary was set + assert creds._trust_boundary == { + "locations": ["us-central1"], + "encodedLocations": "0xABC", + } + + # Verify x-allowed-locations header is set by apply() + headers_applied = {} + creds.apply(headers_applied) + assert headers_applied["x-allowed-locations"] == "0xABC" + + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) + def test_refresh_trust_boundary_lookup_fails_no_cache( + self, mock_lookup_tb, mock_metadata_get + ): + mock_lookup_tb.side_effect = exceptions.RefreshError("Lookup failed") + creds = self.credentials + request = mock.Mock() + + # Mock metadata calls for token, universe domain, and service account info + mock_metadata_get.side_effect = [ + # from _retrieve_info + {"email": "[email protected]", "scopes": ["scope1"]}, + # from get_service_account_token + {"access_token": "mock_token", "expires_in": 3600}, + # from get_universe_domain + "", + ] + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + with pytest.raises(exceptions.RefreshError, match="Lookup failed"): + creds.refresh(request) + + assert creds._trust_boundary is None + assert mock_metadata_get.call_count == 3 + mock_lookup_tb.assert_called_once() + + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) + def test_refresh_trust_boundary_lookup_fails_with_cached_data( + self, mock_lookup_tb, mock_metadata_get + ): + # First refresh: Successfully fetch a valid trust boundary. + mock_lookup_tb.return_value = { + "locations": ["us-central1"], + "encodedLocations": "0xABC", + } + mock_metadata_get.side_effect = [ + # from _retrieve_info + {"email": "[email protected]", "scopes": ["scope1"]}, + # from get_service_account_token + {"access_token": "mock_token_1", "expires_in": 3600}, + # from get_universe_domain + "", + ] + creds = self.credentials + request = mock.Mock() + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + creds.refresh(request) + + assert creds._trust_boundary == { + "locations": ["us-central1"], + "encodedLocations": "0xABC", + } + mock_lookup_tb.assert_called_once() + + # Second refresh: Mock lookup to fail, but expect cached data to be preserved. + mock_lookup_tb.reset_mock() + mock_lookup_tb.side_effect = exceptions.RefreshError("Lookup failed") + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + # This refresh should not raise an error because a cached value exists. + mock_metadata_get.reset_mock() + mock_metadata_get.side_effect = [ + # from _retrieve_info + {"email": "[email protected]", "scopes": ["scope1"]}, + # from get_service_account_token + {"access_token": "mock_token_2", "expires_in": 3600}, + # from get_universe_domain + "", + ] + creds.refresh(request) + + assert creds._trust_boundary == { + "locations": ["us-central1"], + "encodedLocations": "0xABC", + } + mock_lookup_tb.assert_called_once() + + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) + def test_refresh_fetches_no_op_trust_boundary( + self, mock_lookup_tb, mock_metadata_get + ): + mock_lookup_tb.return_value = {"locations": [], "encodedLocations": "0x0"} + creds = self.credentials + request = mock.Mock() + + mock_metadata_get.side_effect = [ + # from _retrieve_info + {"email": "[email protected]", "scopes": ["scope1"]}, + # from get_service_account_token + {"access_token": "mock_token", "expires_in": 3600}, + # from get_universe_domain + "", + ] + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + creds.refresh(request) + + assert creds._trust_boundary == {"locations": [], "encodedLocations": "0x0"} + assert mock_metadata_get.call_count == 3 + expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]/allowedLocations" + mock_lookup_tb.assert_called_once_with( + request, expected_url, headers={"authorization": "Bearer mock_token"} + ) + # Verify that an empty header was added. + headers_applied = {} + creds.apply(headers_applied) + assert headers_applied["x-allowed-locations"] == "" + + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) + def test_refresh_starts_with_no_op_trust_boundary_skips_lookup( + self, mock_lookup_tb, mock_metadata_get + ): + creds = self.credentials + # Use pre-cache universe domain to avoid an extra metadata call. + creds._universe_domain_cached = True + creds._trust_boundary = {"locations": [], "encodedLocations": "0x0"} + request = mock.Mock() + + mock_metadata_get.side_effect = [ + # from _retrieve_info + {"email": "[email protected]", "scopes": ["scope1"]}, + # from get_service_account_token + {"access_token": "mock_token", "expires_in": 3600}, + ] + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + creds.refresh(request) + + # Verify trust boundary remained NO_OP + assert creds._trust_boundary == {"locations": [], "encodedLocations": "0x0"} + # Lookup should be skipped + mock_lookup_tb.assert_not_called() + # Two metadata calls for token refresh should have happened. + assert mock_metadata_get.call_count == 2 + + # Verify that an empty header was added. + headers_applied = {} + creds.apply(headers_applied) + assert headers_applied["x-allowed-locations"] == "" + + @mock.patch( + "google.auth.compute_engine._metadata.get_service_account_info", autospec=True + ) + @mock.patch( + "google.auth.compute_engine._metadata.get_universe_domain", autospec=True + ) + def test_build_trust_boundary_lookup_url_default_email( + self, mock_get_universe_domain, mock_get_service_account_info + ): + # Test with default service account email, which needs resolution + creds = self.credentials + creds._service_account_email = "default" + mock_get_service_account_info.return_value = { + "email": "[email protected]" + } + mock_get_universe_domain.return_value = "googleapis.com" + + url = creds._build_trust_boundary_lookup_url() + + mock_get_service_account_info.assert_called_once_with(mock.ANY, "default") + mock_get_universe_domain.assert_called_once_with(mock.ANY) + assert url == ( + "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]/allowedLocations" + ) + + @mock.patch( + "google.auth.compute_engine._metadata.get_service_account_info", autospec=True + ) + @mock.patch( + "google.auth.compute_engine._metadata.get_universe_domain", autospec=True + ) + def test_build_trust_boundary_lookup_url_explicit_email( + self, mock_get_universe_domain, mock_get_service_account_info + ): + # Test with an explicit service account email, no resolution needed + creds = self.credentials + creds._service_account_email = FAKE_SERVICE_ACCOUNT_EMAIL + mock_get_universe_domain.return_value = "googleapis.com" + + url = creds._build_trust_boundary_lookup_url() + + mock_get_service_account_info.assert_not_called() + mock_get_universe_domain.assert_called_once_with(mock.ANY) + assert url == ( + "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]/allowedLocations" + ) + + @mock.patch( + "google.auth.compute_engine._metadata.get_service_account_info", autospec=True + ) + @mock.patch( + "google.auth.compute_engine._metadata.get_universe_domain", autospec=True + ) + def test_build_trust_boundary_lookup_url_non_default_universe( + self, mock_get_universe_domain, mock_get_service_account_info + ): + # Test with a non-default universe domain + creds = self.credentials_with_all_fields + + url = creds._build_trust_boundary_lookup_url() + + # Universe domain is cached and email is explicit, so no metadata calls needed. + mock_get_service_account_info.assert_not_called() + mock_get_universe_domain.assert_not_called() + assert url == ( + "https://iamcredentials.fake-universe-domain/v1/projects/-/serviceAccounts/[email protected]/allowedLocations" + ) + + @mock.patch( + "google.auth.compute_engine._metadata.get_service_account_info", autospec=True + ) + def test_build_trust_boundary_lookup_url_get_service_account_info_fails( + self, mock_get_service_account_info + ): + # Test scenario where get_service_account_info fails + mock_get_service_account_info.side_effect = exceptions.TransportError( + "Failed to get info" + ) + creds = self.credentials + creds._service_account_email = "default" + + with pytest.raises( + exceptions.RefreshError, + match=r"Failed to get service account email for trust boundary lookup: .*", + ): + creds._build_trust_boundary_lookup_url() + + @mock.patch( + "google.auth.compute_engine._metadata.get_service_account_info", autospec=True + ) + def test_build_trust_boundary_lookup_url_no_email( + self, mock_get_service_account_info + ): + # Test with default service account email, which needs resolution, but metadata + # returns no email. + creds = self.credentials + creds._service_account_email = "default" + mock_get_service_account_info.return_value = {"scopes": ["one", "two"]} + + with pytest.raises(exceptions.RefreshError) as excinfo: + creds._build_trust_boundary_lookup_url() + + assert excinfo.match(r"missing 'email' field") + class TestIDTokenCredentials(object): credentials = None @@ -466,7 +844,7 @@ class TestIDTokenCredentials(object): @responses.activate def test_with_target_audience_integration(self): - """ Test that it is possible to refresh credentials + """Test that it is possible to refresh credentials generated from `with_target_audience`. Instead of mocking the methods, the HTTP responses @@ -634,7 +1012,7 @@ class TestIDTokenCredentials(object): @responses.activate def test_with_quota_project_integration(self): - """ Test that it is possible to refresh credentials + """Test that it is possible to refresh credentials generated from `with_quota_project`. Instead of mocking the methods, the HTTP responses diff --git a/contrib/python/google-auth/py3/tests/oauth2/test__client.py b/contrib/python/google-auth/py3/tests/oauth2/test__client.py index df77bbcd2f6..ec1725c7e29 100644 --- a/contrib/python/google-auth/py3/tests/oauth2/test__client.py +++ b/contrib/python/google-auth/py3/tests/oauth2/test__client.py @@ -325,7 +325,7 @@ def test_call_iam_generate_id_token_endpoint(): "fake_email", "fake_audience", "fake_access_token", - "googleapis.com", + universe_domain="googleapis.com", ) assert ( @@ -631,3 +631,178 @@ def test__token_endpoint_request_no_throw_with_retry(can_retry): assert mock_request.call_count == 3 else: assert mock_request.call_count == 1 + + +def test_lookup_trust_boundary(): + response_data = { + "locations": ["us-central1", "us-east1"], + "encodedLocations": "0x80080000000000", + } + + mock_response = mock.create_autospec(transport.Response, instance=True) + mock_response.status = http_client.OK + mock_response.data = json.dumps(response_data).encode("utf-8") + + mock_request = mock.create_autospec(transport.Request) + mock_request.return_value = mock_response + + url = "http://example.com" + headers = {"Authorization": "Bearer access_token"} + response = _client._lookup_trust_boundary(mock_request, url, headers=headers) + + assert response["encodedLocations"] == "0x80080000000000" + assert response["locations"] == ["us-central1", "us-east1"] + + mock_request.assert_called_once_with(method="GET", url=url, headers=headers) + + +def test_lookup_trust_boundary_no_op_response_without_locations(): + response_data = {"encodedLocations": "0x0"} + + mock_response = mock.create_autospec(transport.Response, instance=True) + mock_response.status = http_client.OK + mock_response.data = json.dumps(response_data).encode("utf-8") + + mock_request = mock.create_autospec(transport.Request) + mock_request.return_value = mock_response + + url = "http://example.com" + headers = {"Authorization": "Bearer access_token"} + # for the response to be valid, we only need encodedLocations to be present. + response = _client._lookup_trust_boundary(mock_request, url, headers=headers) + assert response["encodedLocations"] == "0x0" + assert "locations" not in response + + mock_request.assert_called_once_with(method="GET", url=url, headers=headers) + + +def test_lookup_trust_boundary_no_op_response(): + response_data = {"locations": [], "encodedLocations": "0x0"} + + mock_response = mock.create_autospec(transport.Response, instance=True) + mock_response.status = http_client.OK + mock_response.data = json.dumps(response_data).encode("utf-8") + + mock_request = mock.create_autospec(transport.Request) + mock_request.return_value = mock_response + + url = "http://example.com" + headers = {"Authorization": "Bearer access_token"} + response = _client._lookup_trust_boundary(mock_request, url, headers=headers) + + assert response["encodedLocations"] == "0x0" + assert response["locations"] == [] + + mock_request.assert_called_once_with(method="GET", url=url, headers=headers) + + +def test_lookup_trust_boundary_error(): + mock_response = mock.create_autospec(transport.Response, instance=True) + mock_response.status = http_client.INTERNAL_SERVER_ERROR + mock_response.data = "this is an error message" + + mock_request = mock.create_autospec(transport.Request) + mock_request.return_value = mock_response + + url = "http://example.com" + headers = {"Authorization": "Bearer access_token"} + with pytest.raises(exceptions.RefreshError) as excinfo: + _client._lookup_trust_boundary(mock_request, url, headers=headers) + assert excinfo.match("this is an error message") + + mock_request.assert_called_with(method="GET", url=url, headers=headers) + + +def test_lookup_trust_boundary_missing_encoded_locations(): + response_data = {"locations": [], "bad_field": "0x0"} + + mock_response = mock.create_autospec(transport.Response, instance=True) + mock_response.status = http_client.OK + mock_response.data = json.dumps(response_data).encode("utf-8") + + mock_request = mock.create_autospec(transport.Request) + mock_request.return_value = mock_response + + url = "http://example.com" + headers = {"Authorization": "Bearer access_token"} + with pytest.raises(exceptions.MalformedError) as excinfo: + _client._lookup_trust_boundary(mock_request, url, headers=headers) + assert excinfo.match("Invalid trust boundary info") + + mock_request.assert_called_once_with(method="GET", url=url, headers=headers) + + +def test_lookup_trust_boundary_internal_failure_and_retry_failure_error(): + retryable_error = mock.create_autospec(transport.Response, instance=True) + retryable_error.status = http_client.BAD_REQUEST + retryable_error.data = json.dumps({"error_description": "internal_failure"}).encode( + "utf-8" + ) + + unretryable_error = mock.create_autospec(transport.Response, instance=True) + unretryable_error.status = http_client.BAD_REQUEST + unretryable_error.data = json.dumps({"error_description": "invalid_scope"}).encode( + "utf-8" + ) + + request = mock.create_autospec(transport.Request) + + request.side_effect = [retryable_error, retryable_error, unretryable_error] + headers = {"Authorization": "Bearer access_token"} + + with pytest.raises(exceptions.RefreshError): + _client._lookup_trust_boundary_request( + request, "http://example.com", headers=headers + ) + # request should be called three times. Two retryable errors and one + # unretryable error to break the retry loop. + assert request.call_count == 3 + for call in request.call_args_list: + assert call[1]["headers"] == headers + + +def test_lookup_trust_boundary_internal_failure_and_retry_succeeds(): + retryable_error = mock.create_autospec(transport.Response, instance=True) + retryable_error.status = http_client.BAD_REQUEST + retryable_error.data = json.dumps({"error_description": "internal_failure"}).encode( + "utf-8" + ) + + response_data = {"locations": [], "encodedLocations": "0x0"} + response = mock.create_autospec(transport.Response, instance=True) + response.status = http_client.OK + response.data = json.dumps(response_data).encode("utf-8") + + request = mock.create_autospec(transport.Request) + + headers = {"Authorization": "Bearer access_token"} + request.side_effect = [retryable_error, response] + + _ = _client._lookup_trust_boundary_request( + request, "http://example.com", headers=headers + ) + + assert request.call_count == 2 + for call in request.call_args_list: + assert call[1]["headers"] == headers + + +def test_lookup_trust_boundary_with_headers(): + response_data = { + "locations": ["us-central1", "us-east1"], + "encodedLocations": "0x80080000000000", + } + + mock_response = mock.create_autospec(transport.Response, instance=True) + mock_response.status = http_client.OK + mock_response.data = json.dumps(response_data).encode("utf-8") + + mock_request = mock.create_autospec(transport.Request) + mock_request.return_value = mock_response + headers = {"Authorization": "Bearer access_token", "x-test-header": "test-value"} + + _client._lookup_trust_boundary(mock_request, "http://example.com", headers=headers) + + mock_request.assert_called_once_with( + method="GET", url="http://example.com", headers=headers + ) diff --git a/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py b/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py index e60c8200f4b..12f43afc0f7 100644 --- a/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py +++ b/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py @@ -20,7 +20,9 @@ import mock import pytest # type: ignore from google.auth import _helpers +from google.auth import credentials from google.auth import crypt +from google.auth import environment_vars from google.auth import exceptions from google.auth import iam from google.auth import jwt @@ -59,14 +61,31 @@ SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1") class TestCredentials(object): SERVICE_ACCOUNT_EMAIL = "[email protected]" TOKEN_URI = "https://example.com/oauth2/token" + NO_OP_TRUST_BOUNDARY = { + "locations": credentials.NO_OP_TRUST_BOUNDARY_LOCATIONS, + "encodedLocations": credentials.NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS, + } + VALID_TRUST_BOUNDARY = { + "locations": ["us-central1", "us-east1"], + "encodedLocations": "0xVALIDHEXSA", + } + EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE = ( + "https://iamcredentials.googleapis.com/v1/projects/-" + "/serviceAccounts/[email protected]/allowedLocations" + ) @classmethod - def make_credentials(cls, universe_domain=DEFAULT_UNIVERSE_DOMAIN): + def make_credentials( + cls, + universe_domain=DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, # Align with Credentials class default + ): return service_account.Credentials( SIGNER, cls.SERVICE_ACCOUNT_EMAIL, cls.TOKEN_URI, universe_domain=universe_domain, + trust_boundary=trust_boundary, ) def test_get_cred_info(self): @@ -252,6 +271,18 @@ class TestCredentials(object): "always_use_jwt_access should be True for non-default universe domain" ) + def test_with_trust_boundary(self): + credentials = self.make_credentials() + new_boundary = {"encodedLocations": "new_boundary"} + new_credentials = credentials.with_trust_boundary(new_boundary) + + assert new_credentials is not credentials + assert new_credentials._trust_boundary == new_boundary + assert new_credentials._signer == credentials._signer + assert ( + new_credentials.service_account_email == credentials.service_account_email + ) + def test__make_authorization_grant_assertion(self): credentials = self.make_credentials() token = credentials._make_authorization_grant_assertion() @@ -496,10 +527,42 @@ class TestCredentials(object): # Check that the credentials have the token. assert credentials.token == token - # Check that the credentials are valid (have a token and are not - # expired) + # Check that the credentials are valid (have a token and are not expired). assert credentials.valid + # Trust boundary should be None since env var is not set and no initial + # boundary was provided. + assert credentials._trust_boundary is None + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + @mock.patch("google.oauth2._client.jwt_grant", autospec=True) + def test_refresh_skips_trust_boundary_lookup_non_default_universe( + self, mock_jwt_grant, mock_lookup_trust_boundary + ): + # Create credentials with a non-default universe domain + credentials = self.make_credentials(universe_domain=FAKE_UNIVERSE_DOMAIN) + token = "token" + mock_jwt_grant.return_value = ( + token, + _helpers.utcnow() + datetime.timedelta(seconds=500), + {}, + ) + request = mock.create_autospec(transport.Request, instance=True) + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + # Ensure jwt_grant was called (token refresh happened) + mock_jwt_grant.assert_called_once() + # Ensure trust boundary lookup was not called + mock_lookup_trust_boundary.assert_not_called() + # Verify that x-allowed-locations header is not set by apply() + headers_applied = {} + credentials.apply(headers_applied) + assert "x-allowed-locations" not in headers_applied + @mock.patch("google.oauth2._client.jwt_grant", autospec=True) def test_before_request_refreshes(self, jwt_grant): credentials = self.make_credentials() @@ -608,6 +671,208 @@ class TestCredentials(object): credentials.refresh(None) assert excinfo.match("domain wide delegation is not supported") + @mock.patch("google.oauth2._client._lookup_trust_boundary") + @mock.patch("google.oauth2._client.jwt_grant", autospec=True) + def test_refresh_success_with_valid_trust_boundary( + self, mock_jwt_grant, mock_lookup_trust_boundary + ): + # Start with no boundary. + credentials = self.make_credentials(trust_boundary=None) + token = "token" + mock_jwt_grant.return_value = ( + token, + _helpers.utcnow() + datetime.timedelta(seconds=500), + {}, + ) + request = mock.create_autospec(transport.Request, instance=True) + + # Mock the trust boundary lookup to return a valid boundary. + mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + assert credentials.valid + assert credentials.token == token + + # Verify trust boundary was set. + assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY + + # Verify the mock was called with the correct URL. + mock_lookup_trust_boundary.assert_called_once_with( + request, + self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, + headers={"authorization": "Bearer token"}, + ) + + # Verify x-allowed-locations header is set correctly by apply(). + headers_applied = {} + credentials.apply(headers_applied) + assert ( + headers_applied["x-allowed-locations"] + == self.VALID_TRUST_BOUNDARY["encodedLocations"] + ) + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + @mock.patch("google.oauth2._client.jwt_grant", autospec=True) + def test_refresh_fetches_no_op_trust_boundary( + self, mock_jwt_grant, mock_lookup_trust_boundary + ): + # Start with no trust boundary + credentials = self.make_credentials(trust_boundary=None) + token = "token" + mock_jwt_grant.return_value = ( + token, + _helpers.utcnow() + datetime.timedelta(seconds=500), + {}, + ) + request = mock.create_autospec(transport.Request, instance=True) + + mock_lookup_trust_boundary.return_value = self.NO_OP_TRUST_BOUNDARY + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + assert credentials.valid + assert credentials.token == token + assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY + mock_lookup_trust_boundary.assert_called_once_with( + request, + self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, + headers={"authorization": "Bearer token"}, + ) + headers_applied = {} + credentials.apply(headers_applied) + assert headers_applied["x-allowed-locations"] == "" + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + @mock.patch("google.oauth2._client.jwt_grant", autospec=True) + def test_refresh_starts_with_no_op_trust_boundary_skips_lookup( + self, mock_jwt_grant, mock_lookup_trust_boundary + ): + credentials = self.make_credentials( + trust_boundary=self.NO_OP_TRUST_BOUNDARY + ) # Start with NO_OP + token = "token" + mock_jwt_grant.return_value = ( + token, + _helpers.utcnow() + datetime.timedelta(seconds=500), + {}, + ) + request = mock.create_autospec(transport.Request, instance=True) + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + assert credentials.valid + assert credentials.token == token + # Verify trust boundary remained NO_OP + assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY + + # Lookup should be skipped + mock_lookup_trust_boundary.assert_not_called() + + # Verify that an empty header was added. + headers_applied = {} + credentials.apply(headers_applied) + assert headers_applied["x-allowed-locations"] == "" + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + @mock.patch("google.oauth2._client.jwt_grant", autospec=True) + def test_refresh_trust_boundary_lookup_fails_no_cache( + self, mock_jwt_grant, mock_lookup_trust_boundary + ): + # Start with no trust boundary + credentials = self.make_credentials(trust_boundary=None) + mock_lookup_trust_boundary.side_effect = exceptions.RefreshError( + "Lookup failed" + ) + mock_jwt_grant.return_value = ( + "mock_access_token", + _helpers.utcnow() + datetime.timedelta(seconds=3600), + {}, + ) + request = mock.create_autospec(transport.Request, instance=True) + + # Mock the trust boundary lookup to raise an error + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ), pytest.raises(exceptions.RefreshError, match="Lookup failed"): + credentials.refresh(request) + + assert credentials._trust_boundary is None + mock_lookup_trust_boundary.assert_called_once() + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + @mock.patch("google.oauth2._client.jwt_grant", autospec=True) + def test_refresh_trust_boundary_lookup_fails_with_cached_data( + self, mock_jwt_grant, mock_lookup_trust_boundary + ): + # Initial setup: Credentials with no trust boundary. + credentials = self.make_credentials(trust_boundary=None) + token = "token" + mock_jwt_grant.return_value = ( + token, + _helpers.utcnow() + datetime.timedelta(seconds=500), + {}, + ) + request = mock.create_autospec(transport.Request, instance=True) + + # First refresh: Successfully fetch a valid trust boundary. + mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + assert credentials.valid + assert credentials.token == token + assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY + mock_lookup_trust_boundary.assert_called_once_with( + request, + self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, + headers={"authorization": "Bearer token"}, + ) + + # Second refresh: Mock lookup to fail, but expect cached data to be preserved. + mock_lookup_trust_boundary.reset_mock() + mock_lookup_trust_boundary.side_effect = exceptions.RefreshError( + "Lookup failed" + ) + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) # This should NOT raise an exception + + assert credentials.valid # Credentials should still be valid + assert ( + credentials._trust_boundary == self.VALID_TRUST_BOUNDARY + ) # Cached data should be preserved + mock_lookup_trust_boundary.assert_called_once_with( + request, + self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, + headers={ + "authorization": "Bearer token", + "x-allowed-locations": self.VALID_TRUST_BOUNDARY["encodedLocations"], + }, + ) # Lookup should have been attempted again + + def test_build_trust_boundary_lookup_url_no_email(self): + credentials = self.make_credentials() + credentials._service_account_email = None + + with pytest.raises(ValueError) as excinfo: + credentials._build_trust_boundary_lookup_url() + + assert "Service account email is required" in str(excinfo.value) + class TestIDTokenCredentials(object): SERVICE_ACCOUNT_EMAIL = "[email protected]" @@ -790,9 +1055,14 @@ class TestIDTokenCredentials(object): ) request = mock.Mock() credentials.refresh(request) - req, iam_endpoint, signer_email, target_audience, access_token, universe_domain = call_iam_generate_id_token_endpoint.call_args[ - 0 - ] + ( + req, + iam_endpoint, + signer_email, + target_audience, + access_token, + universe_domain, + ) = call_iam_generate_id_token_endpoint.call_args[0] assert req == request assert iam_endpoint == iam._IAM_IDTOKEN_ENDPOINT assert signer_email == "[email protected]" @@ -812,9 +1082,14 @@ class TestIDTokenCredentials(object): ) request = mock.Mock() credentials.refresh(request) - req, iam_endpoint, signer_email, target_audience, access_token, universe_domain = call_iam_generate_id_token_endpoint.call_args[ - 0 - ] + ( + req, + iam_endpoint, + signer_email, + target_audience, + access_token, + universe_domain, + ) = call_iam_generate_id_token_endpoint.call_args[0] assert req == request assert ( iam_endpoint diff --git a/contrib/python/google-auth/py3/tests/test__helpers.py b/contrib/python/google-auth/py3/tests/test__helpers.py index a4337c01608..ce3ec11e29c 100644 --- a/contrib/python/google-auth/py3/tests/test__helpers.py +++ b/contrib/python/google-auth/py3/tests/test__helpers.py @@ -20,7 +20,7 @@ import urllib import pytest # type: ignore -from google.auth import _helpers +from google.auth import _helpers, exceptions # _MOCK_BASE_LOGGER_NAME is the base logger namespace used for testing. _MOCK_BASE_LOGGER_NAME = "foogle" @@ -234,6 +234,33 @@ def test_unpadded_urlsafe_b64encode(): assert _helpers.unpadded_urlsafe_b64encode(case) == expected +def test_get_bool_from_env(monkeypatch): + # Test default value when environment variable is not set. + assert _helpers.get_bool_from_env("TEST_VAR") is False + assert _helpers.get_bool_from_env("TEST_VAR", default=True) is True + + # Test true values (case-insensitive) + for true_value in ("true", "True", "TRUE", "1"): + monkeypatch.setenv("TEST_VAR", true_value) + assert _helpers.get_bool_from_env("TEST_VAR") is True + + # Test false values (case-insensitive) + for false_value in ("false", "False", "FALSE", "0"): + monkeypatch.setenv("TEST_VAR", false_value) + assert _helpers.get_bool_from_env("TEST_VAR") is False + + # Test invalid value + monkeypatch.setenv("TEST_VAR", "invalid_value") + with pytest.raises(exceptions.InvalidValue) as excinfo: + _helpers.get_bool_from_env("TEST_VAR") + assert 'must be one of "true", "false", "1", or "0"' in str(excinfo.value) + + # Test empty string value + monkeypatch.setenv("TEST_VAR", "") + with pytest.raises(exceptions.InvalidValue): + _helpers.get_bool_from_env("TEST_VAR") + + def test_hash_sensitive_info_basic(): test_data = { "expires_in": 3599, diff --git a/contrib/python/google-auth/py3/tests/test_aws.py b/contrib/python/google-auth/py3/tests/test_aws.py index df1f02e7d70..41ce970d1e8 100644 --- a/contrib/python/google-auth/py3/tests/test_aws.py +++ b/contrib/python/google-auth/py3/tests/test_aws.py @@ -920,7 +920,7 @@ class TestCredentials(object): assert request_kwargs["body"] is not None body_tuples = urllib.parse.parse_qsl(request_kwargs["body"]) assert len(body_tuples) == len(request_data.keys()) - for (k, v) in body_tuples: + for k, v in body_tuples: assert v.decode("utf-8") == request_data[k.decode("utf-8")] @classmethod @@ -2057,7 +2057,9 @@ class TestCredentials(object): "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), "x-goog-user-project": QUOTA_PROJECT_ID, "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - "x-allowed-locations": "0x0", + # TODO(negarb): Uncomment and update when trust boundary is supported + # for external account credentials. + # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -2150,7 +2152,7 @@ class TestCredentials(object): "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), "x-goog-user-project": QUOTA_PROJECT_ID, "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -2345,7 +2347,7 @@ class TestCredentials(object): "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), "x-goog-user-project": QUOTA_PROJECT_ID, "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, diff --git a/contrib/python/google-auth/py3/tests/test_credentials.py b/contrib/python/google-auth/py3/tests/test_credentials.py index e11bcb4e551..1fb88009691 100644 --- a/contrib/python/google-auth/py3/tests/test_credentials.py +++ b/contrib/python/google-auth/py3/tests/test_credentials.py @@ -13,17 +13,21 @@ # limitations under the License. import datetime +import os import mock import pytest # type: ignore from google.auth import _helpers from google.auth import credentials +from google.auth import environment_vars +from google.auth import exceptions +from google.oauth2 import _client -class CredentialsImpl(credentials.Credentials): - def refresh(self, request): - self.token = request +class CredentialsImpl(credentials.CredentialsWithTrustBoundary): + def _refresh_token(self, request): + self.token = "refreshed-token" self.expiry = ( datetime.datetime.utcnow() + _helpers.REFRESH_THRESHOLD @@ -33,6 +37,10 @@ class CredentialsImpl(credentials.Credentials): def with_quota_project(self, quota_project_id): raise NotImplementedError() + def _build_trust_boundary_lookup_url(self): + # Using self.token here to make the URL dynamic for testing purposes + return "http://mock.url/lookup_for_{}".format(self.token) + class CredentialsImplWithMetrics(credentials.Credentials): def refresh(self, request): @@ -89,49 +97,49 @@ def test_expired_and_valid(): def test_before_request(): credentials = CredentialsImpl() - request = "token" + request = mock.Mock() headers = {} # First call should call refresh, setting the token. credentials.before_request(request, "http://example.com", "GET", headers) assert credentials.valid - assert credentials.token == "token" - assert headers["authorization"] == "Bearer token" + assert credentials.token == "refreshed-token" + assert headers["authorization"] == "Bearer refreshed-token" assert "x-allowed-locations" not in headers - request = "token2" + request = mock.Mock() headers = {} # Second call shouldn't call refresh. credentials.before_request(request, "http://example.com", "GET", headers) assert credentials.valid - assert credentials.token == "token" - assert headers["authorization"] == "Bearer token" + assert credentials.token == "refreshed-token" + assert headers["authorization"] == "Bearer refreshed-token" assert "x-allowed-locations" not in headers def test_before_request_with_trust_boundary(): DUMMY_BOUNDARY = "0xA30" credentials = CredentialsImpl() - credentials._trust_boundary = {"locations": [], "encoded_locations": DUMMY_BOUNDARY} - request = "token" + credentials._trust_boundary = {"locations": [], "encodedLocations": DUMMY_BOUNDARY} + request = mock.Mock() headers = {} # First call should call refresh, setting the token. credentials.before_request(request, "http://example.com", "GET", headers) assert credentials.valid - assert credentials.token == "token" - assert headers["authorization"] == "Bearer token" + assert credentials.token == "refreshed-token" + assert headers["authorization"] == "Bearer refreshed-token" assert headers["x-allowed-locations"] == DUMMY_BOUNDARY - request = "token2" + request = mock.Mock() headers = {} # Second call shouldn't call refresh. credentials.before_request(request, "http://example.com", "GET", headers) assert credentials.valid - assert credentials.token == "token" - assert headers["authorization"] == "Bearer token" + assert credentials.token == "refreshed-token" + assert headers["authorization"] == "Bearer refreshed-token" assert headers["x-allowed-locations"] == DUMMY_BOUNDARY @@ -198,6 +206,18 @@ def test_readonly_scoped_credentials_scopes(): assert credentials.has_scopes(["one", "two"]) assert not credentials.has_scopes(["three"]) + # Test with default scopes + credentials_with_default = ReadOnlyScopedCredentialsImpl() + credentials_with_default._default_scopes = ["one", "two"] + assert credentials_with_default.has_scopes(["one", "two"]) + assert not credentials_with_default.has_scopes(["three"]) + + # Test with no scopes + credentials_no_scopes = ReadOnlyScopedCredentialsImpl() + assert not credentials_no_scopes.has_scopes(["one"]) + + assert credentials_no_scopes.has_scopes([]) + def test_readonly_scoped_credentials_requires_scopes(): credentials = ReadOnlyScopedCredentialsImpl() @@ -245,7 +265,7 @@ def test_nonblocking_refresh_fresh_credentials(): c._refresh_worker = mock.MagicMock() - request = "token" + request = mock.Mock() c.refresh(request) assert c.token_state == credentials.TokenState.FRESH @@ -258,7 +278,7 @@ def test_nonblocking_refresh_invalid_credentials(): c = CredentialsImpl() c.with_non_blocking_refresh() - request = "token" + request = mock.Mock() headers = {} assert c.token_state == credentials.TokenState.INVALID @@ -266,8 +286,8 @@ def test_nonblocking_refresh_invalid_credentials(): c.before_request(request, "http://example.com", "GET", headers) assert c.token_state == credentials.TokenState.FRESH assert c.valid - assert c.token == "token" - assert headers["authorization"] == "Bearer token" + assert c.token == "refreshed-token" + assert headers["authorization"] == "Bearer refreshed-token" assert "x-identity-trust-boundary" not in headers @@ -275,7 +295,7 @@ def test_nonblocking_refresh_stale_credentials(): c = CredentialsImpl() c.with_non_blocking_refresh() - request = "token" + request = mock.Mock() headers = {} # Invalid credentials MUST require a blocking refresh. @@ -296,8 +316,8 @@ def test_nonblocking_refresh_stale_credentials(): assert c.token_state == credentials.TokenState.FRESH assert c.valid - assert c.token == "token" - assert headers["authorization"] == "Bearer token" + assert c.token == "refreshed-token" + assert headers["authorization"] == "Bearer refreshed-token" assert "x-identity-trust-boundary" not in headers @@ -305,7 +325,7 @@ def test_nonblocking_refresh_failed_credentials(): c = CredentialsImpl() c.with_non_blocking_refresh() - request = "token" + request = mock.Mock() headers = {} # Invalid credentials MUST require a blocking refresh. @@ -328,18 +348,130 @@ def test_nonblocking_refresh_failed_credentials(): assert c.token_state == credentials.TokenState.FRESH assert c.valid - assert c.token == "token" - assert headers["authorization"] == "Bearer token" + assert c.token == "refreshed-token" + assert headers["authorization"] == "Bearer refreshed-token" assert "x-identity-trust-boundary" not in headers def test_token_state_no_expiry(): c = CredentialsImpl() - request = "token" + request = mock.Mock() c.refresh(request) c.expiry = None assert c.token_state == credentials.TokenState.FRESH c.before_request(request, "http://example.com", "GET", {}) + + +class TestCredentialsWithTrustBoundary(object): + @mock.patch.object(_client, "_lookup_trust_boundary") + def test_lookup_trust_boundary_env_var_not_true(self, mock_lookup_tb): + creds = CredentialsImpl() + request = mock.Mock() + + # Ensure env var is not "true" + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "false"} + ): + result = creds._refresh_trust_boundary(request) + + assert result is None + mock_lookup_tb.assert_not_called() + + @mock.patch.object(_client, "_lookup_trust_boundary") + def test_lookup_trust_boundary_env_var_missing(self, mock_lookup_tb): + creds = CredentialsImpl() + request = mock.Mock() + + # Ensure env var is missing + with mock.patch.dict(os.environ, clear=True): + result = creds._refresh_trust_boundary(request) + + assert result is None + mock_lookup_tb.assert_not_called() + + @mock.patch.object(_client, "_lookup_trust_boundary") + def test_lookup_trust_boundary_non_default_universe(self, mock_lookup_tb): + creds = CredentialsImpl() + creds._universe_domain = "my.universe.com" # Non-GDU + request = mock.Mock() + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + result = creds._refresh_trust_boundary(request) + + assert result is None + mock_lookup_tb.assert_not_called() + + @mock.patch.object(_client, "_lookup_trust_boundary") + def test_lookup_trust_boundary_calls_client_and_build_url(self, mock_lookup_tb): + creds = CredentialsImpl() + creds.token = "test_token" # For _build_trust_boundary_lookup_url + request = mock.Mock() + expected_url = "http://mock.url/lookup_for_test_token" + expected_boundary_info = {"encodedLocations": "0xABC"} + mock_lookup_tb.return_value = expected_boundary_info + + # Mock _build_trust_boundary_lookup_url to ensure it's called. + mock_build_url = mock.Mock(return_value=expected_url) + creds._build_trust_boundary_lookup_url = mock_build_url + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + result = creds._lookup_trust_boundary(request) + + assert result == expected_boundary_info + mock_build_url.assert_called_once() + expected_headers = {"authorization": "Bearer test_token"} + mock_lookup_tb.assert_called_once_with( + request, expected_url, headers=expected_headers + ) + + @mock.patch.object(_client, "_lookup_trust_boundary") + def test_lookup_trust_boundary_build_url_returns_none(self, mock_lookup_tb): + creds = CredentialsImpl() + request = mock.Mock() + + # Mock _build_trust_boundary_lookup_url to return None + mock_build_url = mock.Mock(return_value=None) + creds._build_trust_boundary_lookup_url = mock_build_url + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + with pytest.raises( + exceptions.InvalidValue, + match="Failed to build trust boundary lookup URL.", + ): + creds._lookup_trust_boundary(request) + + mock_build_url.assert_called_once() # Ensure _build_trust_boundary_lookup_url was called + mock_lookup_tb.assert_not_called() # Ensure _client.lookup_trust_boundary was not called + + @mock.patch("google.auth.credentials._LOGGER") + @mock.patch("google.auth._helpers.is_logging_enabled", return_value=True) + @mock.patch.object(_client, "_lookup_trust_boundary") + def test_refresh_trust_boundary_fails_with_cached_data_and_logging( + self, mock_lookup_tb, mock_is_logging_enabled, mock_logger + ): + creds = CredentialsImpl() + creds._trust_boundary = {"encodedLocations": "0xABC"} + request = mock.Mock() + + refresh_error = exceptions.RefreshError("Lookup failed") + mock_lookup_tb.side_effect = refresh_error + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + creds.refresh(request) + + mock_lookup_tb.assert_called_once() + mock_is_logging_enabled.assert_called_once_with(mock_logger) + mock_logger.debug.assert_called_once_with( + "Using cached trust boundary due to refresh error: %s", refresh_error + ) diff --git a/contrib/python/google-auth/py3/tests/test_external_account.py b/contrib/python/google-auth/py3/tests/test_external_account.py index bddcb4afa1a..d86a19bef16 100644 --- a/contrib/python/google-auth/py3/tests/test_external_account.py +++ b/contrib/python/google-auth/py3/tests/test_external_account.py @@ -247,7 +247,7 @@ class TestCredentials(object): assert "cert" not in request_kwargs assert request_kwargs["body"] is not None body_tuples = urllib.parse.parse_qsl(request_kwargs["body"]) - for (k, v) in body_tuples: + for k, v in body_tuples: assert v.decode("utf-8") == request_data[k.decode("utf-8")] assert len(body_tuples) == len(request_data.keys()) @@ -920,7 +920,9 @@ class TestCredentials(object): "Content-Type": "application/json", "authorization": "Bearer {}".format(token_response["access_token"]), "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - "x-allowed-locations": "0x0", + # TODO(negarb): Uncomment and update when trust boundary is supported + # for external account credentials. + # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -1010,7 +1012,7 @@ class TestCredentials(object): "Content-Type": "application/json", "authorization": "Bearer {}".format(token_response["access_token"]), "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -1097,7 +1099,7 @@ class TestCredentials(object): "Content-Type": "application/json", "authorization": "Bearer {}".format(token_response["access_token"]), "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -1331,7 +1333,7 @@ class TestCredentials(object): "Content-Type": "application/json", "authorization": "Bearer {}".format(token_response["access_token"]), "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -1415,7 +1417,7 @@ class TestCredentials(object): "Content-Type": "application/json", "authorization": "Bearer {}".format(token_response["access_token"]), "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -1471,7 +1473,7 @@ class TestCredentials(object): assert headers == { "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } def test_apply_workforce_without_quota_project_id(self): @@ -1488,7 +1490,7 @@ class TestCredentials(object): assert headers == { "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } def test_apply_impersonation_without_quota_project_id(self): @@ -1520,7 +1522,7 @@ class TestCredentials(object): assert headers == { "authorization": "Bearer {}".format(impersonation_response["accessToken"]), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } def test_apply_with_quota_project_id(self): @@ -1537,7 +1539,7 @@ class TestCredentials(object): "other": "header-value", "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), "x-goog-user-project": self.QUOTA_PROJECT_ID, - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } def test_apply_impersonation_with_quota_project_id(self): @@ -1572,7 +1574,7 @@ class TestCredentials(object): "other": "header-value", "authorization": "Bearer {}".format(impersonation_response["accessToken"]), "x-goog-user-project": self.QUOTA_PROJECT_ID, - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } def test_before_request(self): @@ -1588,7 +1590,7 @@ class TestCredentials(object): assert headers == { "other": "header-value", "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } # Second call shouldn't call refresh. @@ -1597,7 +1599,7 @@ class TestCredentials(object): assert headers == { "other": "header-value", "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } def test_before_request_workforce(self): @@ -1615,7 +1617,7 @@ class TestCredentials(object): assert headers == { "other": "header-value", "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } # Second call shouldn't call refresh. @@ -1624,7 +1626,7 @@ class TestCredentials(object): assert headers == { "other": "header-value", "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } def test_before_request_impersonation(self): @@ -1655,7 +1657,7 @@ class TestCredentials(object): assert headers == { "other": "header-value", "authorization": "Bearer {}".format(impersonation_response["accessToken"]), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } # Second call shouldn't call refresh. @@ -1664,7 +1666,7 @@ class TestCredentials(object): assert headers == { "other": "header-value", "authorization": "Bearer {}".format(impersonation_response["accessToken"]), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } @mock.patch("google.auth._helpers.utcnow") @@ -1693,7 +1695,7 @@ class TestCredentials(object): # Cached token should be used. assert headers == { "authorization": "Bearer token", - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } # Next call should simulate 1 second passed. @@ -1709,7 +1711,7 @@ class TestCredentials(object): # New token should be retrieved. assert headers == { "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } @mock.patch("google.auth._helpers.utcnow") @@ -1754,7 +1756,7 @@ class TestCredentials(object): # Cached token should be used. assert headers == { "authorization": "Bearer token", - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } # Next call should simulate 1 second passed. This will trigger the expiration @@ -1773,7 +1775,7 @@ class TestCredentials(object): # New token should be retrieved. assert headers == { "authorization": "Bearer {}".format(impersonation_response["accessToken"]), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } @pytest.mark.parametrize( @@ -1872,7 +1874,7 @@ class TestCredentials(object): "x-goog-user-project": self.QUOTA_PROJECT_ID, "authorization": "Bearer {}".format(token_response["access_token"]), "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -1926,7 +1928,7 @@ class TestCredentials(object): "authorization": "Bearer {}".format( impersonation_response["accessToken"] ), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", }, ) @@ -1998,7 +2000,7 @@ class TestCredentials(object): "authorization": "Bearer {}".format( self.SUCCESS_RESPONSE["access_token"] ), - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", }, ) @@ -2048,7 +2050,7 @@ class TestCredentials(object): "Content-Type": "application/json", "authorization": "Bearer {}".format(token_response["access_token"]), "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - "x-allowed-locations": "0x0", + # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, diff --git a/contrib/python/google-auth/py3/tests/test_identity_pool.py b/contrib/python/google-auth/py3/tests/test_identity_pool.py index 4d78a5c22ea..8ca2892e2e6 100644 --- a/contrib/python/google-auth/py3/tests/test_identity_pool.py +++ b/contrib/python/google-auth/py3/tests/test_identity_pool.py @@ -285,7 +285,7 @@ class TestCredentials(object): assert request_kwargs["body"] is not None body_tuples = urllib.parse.parse_qsl(request_kwargs["body"]) assert len(body_tuples) == len(request_data.keys()) - for (k, v) in body_tuples: + for k, v in body_tuples: assert v.decode("utf-8") == request_data[k.decode("utf-8")] @classmethod @@ -384,7 +384,9 @@ class TestCredentials(object): "Content-Type": "application/json", "authorization": "Bearer {}".format(token_response["access_token"]), "x-goog-api-client": metrics_header_value, - "x-allowed-locations": "0x0", + # TODO(negarb): Uncomment and update when trust boundary is supported + # for external account credentials. + # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, diff --git a/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py b/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py index 9aeb505fdd9..a9f45f88948 100644 --- a/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py +++ b/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py @@ -22,7 +22,9 @@ import mock import pytest # type: ignore from google.auth import _helpers +from google.auth import credentials as auth_credentials from google.auth import crypt +from google.auth import environment_vars from google.auth import exceptions from google.auth import impersonated_credentials from google.auth import transport @@ -128,8 +130,21 @@ class TestImpersonatedCredentials(object): # Because Python 2.7: DELEGATES = [] # type: ignore LIFETIME = 3600 + NO_OP_TRUST_BOUNDARY = { + "locations": auth_credentials.NO_OP_TRUST_BOUNDARY_LOCATIONS, + "encodedLocations": auth_credentials.NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS, + } + VALID_TRUST_BOUNDARY = { + "locations": ["us-central1", "us-east1"], + "encodedLocations": "0xVALIDHEX", + } + EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE = ( + "https://iamcredentials.googleapis.com/v1/projects/-" + "/serviceAccounts/[email protected]/allowedLocations" + ) + FAKE_UNIVERSE_DOMAIN = "universe.foo" SOURCE_CREDENTIALS = service_account.Credentials( - SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI + SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI, trust_boundary=NO_OP_TRUST_BOUNDARY ) USER_SOURCE_CREDENTIALS = credentials.Credentials(token="ABCDE") IAM_ENDPOINT_OVERRIDE = ( @@ -144,6 +159,7 @@ class TestImpersonatedCredentials(object): target_principal=TARGET_PRINCIPAL, subject=None, iam_endpoint_override=None, + trust_boundary=None, # Align with Credentials class default ): return Credentials( @@ -154,6 +170,7 @@ class TestImpersonatedCredentials(object): lifetime=lifetime, subject=subject, iam_endpoint_override=iam_endpoint_override, + trust_boundary=trust_boundary, ) def test_from_impersonated_service_account_info(self): @@ -163,7 +180,7 @@ class TestImpersonatedCredentials(object): assert isinstance(credentials, impersonated_credentials.Credentials) def test_from_impersonated_service_account_info_with_invalid_source_credentials_type( - self + self, ): info = copy.deepcopy(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO) assert "source_credentials" in info @@ -178,7 +195,7 @@ class TestImpersonatedCredentials(object): ) def test_from_impersonated_service_account_info_with_invalid_impersonation_url( - self + self, ): info = copy.deepcopy(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO) info["service_account_impersonation_url"] = "invalid_url" @@ -263,8 +280,12 @@ class TestImpersonatedCredentials(object): assert headers["x-goog-api-client"] == "cred-type/imp" @pytest.mark.parametrize("use_data_bytes", [True, False]) - def test_refresh_success(self, use_data_bytes, mock_donor_credentials): - credentials = self.make_credentials(lifetime=None) + @mock.patch("google.oauth2._client._lookup_trust_boundary") + def test_refresh_success( + self, mock_lookup_trust_boundary, use_data_bytes, mock_donor_credentials + ): + # Start with no boundary. + credentials = self.make_credentials(lifetime=None, trust_boundary=None) token = "token" expire_time = ( @@ -278,7 +299,12 @@ class TestImpersonatedCredentials(object): use_data_bytes=use_data_bytes, ) - with mock.patch( + # Mock the trust boundary lookup to return a valid value. + mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ), mock.patch( "google.auth.metrics.token_request_access_token_impersonate", return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, ): @@ -291,6 +317,239 @@ class TestImpersonatedCredentials(object): == ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE ) + # Verify that the x-allowed-locations header from the source credential + # was applied. The source credential has a NO_OP boundary, so the + # header should be an empty string. + request_kwargs = request.call_args[1] + assert "headers" in request_kwargs + assert "x-allowed-locations" in request_kwargs["headers"] + assert request_kwargs["headers"]["x-allowed-locations"] == "" + + # Verify trust boundary was set. + assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY + + # Verify the mock was called with the correct URL. + mock_lookup_trust_boundary.assert_called_once_with( + request, + self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, + headers={"authorization": "Bearer token"}, + ) + + # Verify x-allowed-locations header is set correctly by apply(). + headers_applied = {} + credentials.apply(headers_applied) + assert ( + headers_applied["x-allowed-locations"] + == self.VALID_TRUST_BOUNDARY["encodedLocations"] + ) + + def test_refresh_source_creds_no_trust_boundary(self): + # Use a source credential that does not support trust boundaries. + source_credentials = credentials.Credentials(token="source_token") + creds = self.make_credentials(source_credentials=source_credentials) + token = "impersonated_token" + + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} + + request = self.make_request( + data=json.dumps(response_body), status=http_client.OK + ) + + creds.refresh(request) + + # Verify that the x-allowed-locations header was NOT applied because + # the source credential does not support trust boundaries. + request_kwargs = request.call_args[1] + assert "x-allowed-locations" not in request_kwargs["headers"] + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + def test_refresh_trust_boundary_lookup_fails_no_cache( + self, mock_lookup_trust_boundary, mock_donor_credentials + ): + # Start with no trust boundary + credentials = self.make_credentials(lifetime=None, trust_boundary=None) + token = "token" + + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} + + request = self.make_request( + data=json.dumps(response_body), status=http_client.OK + ) + + # Mock the trust boundary lookup to raise an error + mock_lookup_trust_boundary.side_effect = exceptions.RefreshError( + "Lookup failed" + ) + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ), pytest.raises(exceptions.RefreshError) as excinfo: + credentials.refresh(request) + + assert "Lookup failed" in str(excinfo.value) + assert credentials._trust_boundary is None # Still no trust boundary + mock_lookup_trust_boundary.assert_called_once() + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + def test_refresh_fetches_no_op_trust_boundary( + self, mock_lookup_trust_boundary, mock_donor_credentials + ): + # Start with no trust boundary + credentials = self.make_credentials(lifetime=None, trust_boundary=None) + token = "token" + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} + request = self.make_request( + data=json.dumps(response_body), status=http_client.OK + ) + + mock_lookup_trust_boundary.return_value = ( + self.NO_OP_TRUST_BOUNDARY + ) # Mock returns NO_OP + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ), mock.patch( + "google.auth.metrics.token_request_access_token_impersonate", + return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, + ): + credentials.refresh(request) + + assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY + mock_lookup_trust_boundary.assert_called_once_with( + request, + self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, + headers={"authorization": "Bearer token"}, + ) + headers_applied = {} + credentials.apply(headers_applied) + assert headers_applied["x-allowed-locations"] == "" + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + def test_refresh_skips_trust_boundary_lookup_non_default_universe( + self, mock_lookup_trust_boundary + ): + # Create source credentials with a non-default universe domain + source_credentials = service_account.Credentials( + SIGNER, + "[email protected]", + TOKEN_URI, + universe_domain=self.FAKE_UNIVERSE_DOMAIN, + ) + # Create impersonated credentials using the non-default source credentials + credentials = self.make_credentials(source_credentials=source_credentials) + + # Mock the IAM credentials API call for generateAccessToken + token = "token" + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} + request = self.make_request( + data=json.dumps(response_body), status=http_client.OK + ) + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + # Ensure trust boundary lookup was not called + mock_lookup_trust_boundary.assert_not_called() + # Verify that x-allowed-locations header is not set by apply() + headers_applied = {} + credentials.apply(headers_applied) + assert "x-allowed-locations" not in headers_applied + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + def test_refresh_starts_with_no_op_trust_boundary_skips_lookup( + self, mock_lookup_trust_boundary, mock_donor_credentials + ): + credentials = self.make_credentials( + lifetime=None, trust_boundary=self.NO_OP_TRUST_BOUNDARY + ) # Start with NO_OP + token = "token" + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} + request = self.make_request( + data=json.dumps(response_body), status=http_client.OK + ) + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ), mock.patch( + "google.auth.metrics.token_request_access_token_impersonate", + return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, + ): + credentials.refresh(request) + + # Verify trust boundary remained NO_OP + assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY + + # Lookup should be skipped + mock_lookup_trust_boundary.assert_not_called() + + # Verify that an empty header was added. + headers_applied = {} + credentials.apply(headers_applied) + assert headers_applied["x-allowed-locations"] == "" + + @mock.patch("google.oauth2._client._lookup_trust_boundary") + def test_refresh_trust_boundary_lookup_fails_with_cached_data2( + self, mock_lookup_trust_boundary, mock_donor_credentials + ): + # Start with no trust boundary + credentials = self.make_credentials(lifetime=None, trust_boundary=None) + token = "token" + + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} + + request = self.make_request( + data=json.dumps(response_body), status=http_client.OK + ) + + # First refresh: Successfully fetch a valid trust boundary. + mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ), mock.patch( + "google.auth.metrics.token_request_access_token_impersonate", + return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, + ): + credentials.refresh(request) + + assert credentials.valid + # Verify trust boundary was set. + assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY + mock_lookup_trust_boundary.assert_called_once() + + # Second refresh: Mock lookup to fail, but expect cached data to be preserved. + mock_lookup_trust_boundary.reset_mock() + mock_lookup_trust_boundary.side_effect = exceptions.RefreshError( + "Lookup failed" + ) + + with mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + assert credentials.valid + assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY + mock_lookup_trust_boundary.assert_called_once() + @pytest.mark.parametrize("use_data_bytes", [True, False]) def test_refresh_with_subject_success(self, use_data_bytes, mock_dwd_credentials): credentials = self.make_credentials(subject="[email protected]", lifetime=None) @@ -673,6 +932,37 @@ class TestImpersonatedCredentials(object): assert credentials.requires_scopes is False assert credentials._target_scopes == ["fake_scope1", "fake_scope2"] + def test_with_trust_boundary(self): + credentials = self.make_credentials() + new_boundary = {"encodedLocations": "new_boundary"} + new_credentials = credentials.with_trust_boundary(new_boundary) + + assert new_credentials is not credentials + assert new_credentials._trust_boundary == new_boundary + # The source credentials should be a copy, not the same object. + # But they should be functionally equivalent. + assert ( + new_credentials._source_credentials is not credentials._source_credentials + ) + + assert ( + new_credentials._source_credentials.service_account_email + == credentials._source_credentials.service_account_email + ) + assert ( + new_credentials._source_credentials._signer + == credentials._source_credentials._signer + ) + assert new_credentials._target_principal == credentials._target_principal + + def test_build_trust_boundary_lookup_url_no_email(self): + credentials = self.make_credentials(target_principal=None) + + with pytest.raises(ValueError) as excinfo: + credentials._build_trust_boundary_lookup_url() + + assert "Service account email is required" in str(excinfo.value) + def test_with_scopes_provide_default_scopes(self): credentials = self.make_credentials() credentials._target_scopes = [] |
