diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-11-21 11:14:32 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-11-21 11:23:42 +0300 |
commit | ed5b11df0064bcf6ef0d03aa0f1a34f4aa97851d (patch) | |
tree | 89842a7525bcc7f2809cbbc4360f6394d0ef929b /contrib/python/google-auth/py3/tests | |
parent | 532123792431edd487519a254f2248603c2056e7 (diff) | |
download | ydb-ed5b11df0064bcf6ef0d03aa0f1a34f4aa97851d.tar.gz |
Intermediate changes
commit_hash:9085ddac9f80e60b5b938027d444ed98e80ef95a
Diffstat (limited to 'contrib/python/google-auth/py3/tests')
8 files changed, 218 insertions, 15 deletions
diff --git a/contrib/python/google-auth/py3/tests/compute_engine/test__metadata.py b/contrib/python/google-auth/py3/tests/compute_engine/test__metadata.py index a06dc4fa191..03ba8de497d 100644 --- a/contrib/python/google-auth/py3/tests/compute_engine/test__metadata.py +++ b/contrib/python/google-auth/py3/tests/compute_engine/test__metadata.py @@ -399,7 +399,7 @@ def test_get_universe_domain_success(): request.assert_called_once_with( method="GET", - url=_metadata._METADATA_ROOT + "universe/universe_domain", + url=_metadata._METADATA_ROOT + "universe/universe-domain", headers=_metadata._METADATA_HEADERS, ) assert universe_domain == "fake_universe_domain" @@ -412,7 +412,7 @@ def test_get_universe_domain_success_empty_response(): request.assert_called_once_with( method="GET", - url=_metadata._METADATA_ROOT + "universe/universe_domain", + url=_metadata._METADATA_ROOT + "universe/universe-domain", headers=_metadata._METADATA_HEADERS, ) assert universe_domain == "googleapis.com" @@ -427,7 +427,7 @@ def test_get_universe_domain_not_found(): request.assert_called_once_with( method="GET", - url=_metadata._METADATA_ROOT + "universe/universe_domain", + url=_metadata._METADATA_ROOT + "universe/universe-domain", headers=_metadata._METADATA_HEADERS, ) assert universe_domain == "googleapis.com" @@ -447,7 +447,7 @@ def test_get_universe_domain_retryable_error_failure(): request.assert_called_with( method="GET", - url=_metadata._METADATA_ROOT + "universe/universe_domain", + url=_metadata._METADATA_ROOT + "universe/universe-domain", headers=_metadata._METADATA_HEADERS, ) assert request.call_count == 5 @@ -489,12 +489,12 @@ def test_get_universe_domain_retryable_error_success(): request_error.assert_called_once_with( method="GET", - url=_metadata._METADATA_ROOT + "universe/universe_domain", + url=_metadata._METADATA_ROOT + "universe/universe-domain", headers=_metadata._METADATA_HEADERS, ) request_ok.assert_called_once_with( method="GET", - url=_metadata._METADATA_ROOT + "universe/universe_domain", + url=_metadata._METADATA_ROOT + "universe/universe-domain", headers=_metadata._METADATA_HEADERS, ) @@ -513,7 +513,7 @@ def test_get_universe_domain_other_error(): request.assert_called_once_with( method="GET", - url=_metadata._METADATA_ROOT + "universe/universe_domain", + url=_metadata._METADATA_ROOT + "universe/universe-domain", headers=_metadata._METADATA_HEADERS, ) diff --git a/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py b/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py index 662210fa412..fddfb7f64d3 100644 --- a/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py +++ b/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py @@ -487,6 +487,16 @@ class TestIDTokenCredentials(object): }, ) + # mock information about universe_domain + responses.add( + responses.GET, + "http://metadata.google.internal/computeMetadata/v1/universe/" + "universe-domain", + status=200, + content_type="application/json", + json={}, + ) + # mock token for credentials responses.add( responses.GET, @@ -659,6 +669,16 @@ class TestIDTokenCredentials(object): }, ) + # stubby response about universe_domain + responses.add( + responses.GET, + "http://metadata.google.internal/computeMetadata/v1/universe/" + "universe-domain", + status=200, + content_type="application/json", + json={}, + ) + # mock sign blob endpoint signature = base64.b64encode(b"some-signature").decode("utf-8") responses.add( diff --git a/contrib/python/google-auth/py3/tests/data/impersonated_service_account_external_account_authorized_user_source.json b/contrib/python/google-auth/py3/tests/data/impersonated_service_account_external_account_authorized_user_source.json new file mode 100644 index 00000000000..0bc44c13a28 --- /dev/null +++ b/contrib/python/google-auth/py3/tests/data/impersonated_service_account_external_account_authorized_user_source.json @@ -0,0 +1,16 @@ +{ + "delegates": [ + "service-account-delegate@example.com" + ], + "service_account_impersonation_url": "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/service-account-target@example.com:generateAccessToken", + "source_credentials": { + "type": "external_account_authorized_user", + "audience": "//iam.googleapis.com/locations/global/workforcePools/$WORKFORCE_POOL_ID/providers/$PROVIDER_ID", + "refresh_token": "refreshToken", + "token_url": "https://sts.googleapis.com/v1/oauth/token", + "token_info_url": "https://sts.googleapis.com/v1/instrospect", + "client_id": "clientId", + "client_secret": "clientSecret" + }, + "type": "impersonated_service_account" +}
\ No newline at end of file diff --git a/contrib/python/google-auth/py3/tests/oauth2/test__client.py b/contrib/python/google-auth/py3/tests/oauth2/test__client.py index 8736a4e27be..df77bbcd2f6 100644 --- a/contrib/python/google-auth/py3/tests/oauth2/test__client.py +++ b/contrib/python/google-auth/py3/tests/oauth2/test__client.py @@ -325,6 +325,7 @@ def test_call_iam_generate_id_token_endpoint(): "fake_email", "fake_audience", "fake_access_token", + "googleapis.com", ) assert ( @@ -362,6 +363,7 @@ def test_call_iam_generate_id_token_endpoint_no_id_token(): "fake_email", "fake_audience", "fake_access_token", + "googleapis.com", ) assert excinfo.match("No ID token in response") diff --git a/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py b/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py index fe02e828e73..e60c8200f4b 100644 --- a/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py +++ b/contrib/python/google-auth/py3/tests/oauth2/test_service_account.py @@ -790,7 +790,7 @@ class TestIDTokenCredentials(object): ) request = mock.Mock() credentials.refresh(request) - req, iam_endpoint, signer_email, target_audience, access_token = call_iam_generate_id_token_endpoint.call_args[ + req, iam_endpoint, signer_email, target_audience, access_token, universe_domain = call_iam_generate_id_token_endpoint.call_args[ 0 ] assert req == request @@ -812,7 +812,7 @@ class TestIDTokenCredentials(object): ) request = mock.Mock() credentials.refresh(request) - req, iam_endpoint, signer_email, target_audience, access_token = call_iam_generate_id_token_endpoint.call_args[ + req, iam_endpoint, signer_email, target_audience, access_token, universe_domain = call_iam_generate_id_token_endpoint.call_args[ 0 ] assert req == request diff --git a/contrib/python/google-auth/py3/tests/test__default.py b/contrib/python/google-auth/py3/tests/test__default.py index 3147d505dab..f71594fcc5e 100644 --- a/contrib/python/google-auth/py3/tests/test__default.py +++ b/contrib/python/google-auth/py3/tests/test__default.py @@ -154,6 +154,11 @@ IMPERSONATED_SERVICE_ACCOUNT_SERVICE_ACCOUNT_SOURCE_FILE = os.path.join( DATA_DIR, "impersonated_service_account_service_account_source.json" ) +IMPERSONATED_SERVICE_ACCOUNT_EXTERNAL_ACCOUNT_AUTHORIZED_USER_SOURCE_FILE = os.path.join( + DATA_DIR, + "impersonated_service_account_external_account_authorized_user_source.json", +) + EXTERNAL_ACCOUNT_AUTHORIZED_USER_FILE = os.path.join( DATA_DIR, "external_account_authorized_user.json" ) @@ -366,6 +371,17 @@ def test_load_credentials_from_file_impersonated_with_service_account_source(): assert not credentials._quota_project_id +def test_load_credentials_from_file_impersonated_with_external_account_authorized_user_source(): + credentials, _ = _default.load_credentials_from_file( + IMPERSONATED_SERVICE_ACCOUNT_EXTERNAL_ACCOUNT_AUTHORIZED_USER_SOURCE_FILE + ) + assert isinstance(credentials, impersonated_credentials.Credentials) + assert isinstance( + credentials._source_credentials, external_account_authorized_user.Credentials + ) + assert not credentials._quota_project_id + + def test_load_credentials_from_file_impersonated_passing_quota_project(): credentials, _ = _default.load_credentials_from_file( IMPERSONATED_SERVICE_ACCOUNT_SERVICE_ACCOUNT_SOURCE_FILE, diff --git a/contrib/python/google-auth/py3/tests/test_iam.py b/contrib/python/google-auth/py3/tests/test_iam.py index 6706afb4b52..01c2fa085aa 100644 --- a/contrib/python/google-auth/py3/tests/test_iam.py +++ b/contrib/python/google-auth/py3/tests/test_iam.py @@ -91,6 +91,7 @@ class TestSigner(object): assert returned_signature == signature kwargs = request.call_args[1] assert kwargs["headers"]["Content-Type"] == "application/json" + request.call_count == 1 def test_sign_bytes_failure(self): request = make_request(http_client.UNAUTHORIZED) @@ -100,3 +101,15 @@ class TestSigner(object): with pytest.raises(exceptions.TransportError): signer.sign("123") + request.call_count == 1 + + @mock.patch("time.sleep", return_value=None) + def test_sign_bytes_retryable_failure(self, mock_time): + request = make_request(http_client.INTERNAL_SERVER_ERROR) + credentials = make_credentials() + + signer = iam.Signer(request, credentials, mock.sentinel.service_account_email) + + with pytest.raises(exceptions.TransportError): + signer.sign("123") + request.call_count == 3 diff --git a/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py b/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py index 4fb68103a8d..371477b8a9f 100644 --- a/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py +++ b/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py @@ -147,6 +147,13 @@ class TestImpersonatedCredentials(object): "principal": "impersonated@project.iam.gserviceaccount.com", } + def test_universe_domain_matching_source(self): + source_credentials = service_account.Credentials( + SIGNER, "some@email.com", TOKEN_URI, universe_domain="foo.bar" + ) + credentials = self.make_credentials(source_credentials=source_credentials) + assert credentials.universe_domain == "foo.bar" + def test__make_copy_get_cred_info(self): credentials = self.make_credentials() credentials._cred_file_path = "/path/to/file" @@ -233,6 +240,38 @@ class TestImpersonatedCredentials(object): ) @pytest.mark.parametrize("use_data_bytes", [True, False]) + def test_refresh_success_nonGdu(self, use_data_bytes, mock_donor_credentials): + source_credentials = service_account.Credentials( + SIGNER, "some@email.com", TOKEN_URI, universe_domain="foo.bar" + ) + credentials = self.make_credentials( + lifetime=None, source_credentials=source_credentials + ) + token = "token" + + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} + + request = self.make_request( + data=json.dumps(response_body), + status=http_client.OK, + use_data_bytes=use_data_bytes, + ) + + credentials.refresh(request) + + assert credentials.valid + assert not credentials.expired + # Confirm override endpoint used. + request_kwargs = request.call_args[1] + assert ( + request_kwargs["url"] + == "https://iamcredentials.foo.bar/v1/projects/-/serviceAccounts/impersonated@project.iam.gserviceaccount.com:generateAccessToken" + ) + + @pytest.mark.parametrize("use_data_bytes", [True, False]) def test_refresh_success_iam_endpoint_override( self, use_data_bytes, mock_donor_credentials ): @@ -398,6 +437,38 @@ class TestImpersonatedCredentials(object): def test_sign_bytes(self, mock_donor_credentials, mock_authorizedsession_sign): credentials = self.make_credentials(lifetime=None) + expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/impersonated@project.iam.gserviceaccount.com:signBlob" + self._sign_bytes_helper( + credentials, + mock_donor_credentials, + mock_authorizedsession_sign, + expected_url, + ) + + def test_sign_bytes_nonGdu( + self, mock_donor_credentials, mock_authorizedsession_sign + ): + source_credentials = service_account.Credentials( + SIGNER, "some@email.com", TOKEN_URI, universe_domain="foo.bar" + ) + credentials = self.make_credentials( + lifetime=None, source_credentials=source_credentials + ) + expected_url = "https://iamcredentials.foo.bar/v1/projects/-/serviceAccounts/impersonated@project.iam.gserviceaccount.com:signBlob" + self._sign_bytes_helper( + credentials, + mock_donor_credentials, + mock_authorizedsession_sign, + expected_url, + ) + + def _sign_bytes_helper( + self, + credentials, + mock_donor_credentials, + mock_authorizedsession_sign, + expected_url, + ): token = "token" expire_time = ( @@ -413,11 +484,19 @@ class TestImpersonatedCredentials(object): request.return_value = response credentials.refresh(request) - assert credentials.valid assert not credentials.expired signature = credentials.sign_bytes(b"signed bytes") + mock_authorizedsession_sign.assert_called_with( + mock.ANY, + "POST", + expected_url, + None, + json={"payload": "c2lnbmVkIGJ5dGVz", "delegates": []}, + headers={"Content-Type": "application/json"}, + ) + assert signature == b"signature" def test_sign_bytes_failure(self): @@ -427,12 +506,28 @@ class TestImpersonatedCredentials(object): "google.auth.transport.requests.AuthorizedSession.request", autospec=True ) as auth_session: data = {"error": {"code": 403, "message": "unauthorized"}} - auth_session.return_value = MockResponse(data, http_client.FORBIDDEN) + mock_response = MockResponse(data, http_client.UNAUTHORIZED) + auth_session.return_value = mock_response with pytest.raises(exceptions.TransportError) as excinfo: credentials.sign_bytes(b"foo") assert excinfo.match("'code': 403") + @mock.patch("time.sleep", return_value=None) + def test_sign_bytes_retryable_failure(self, mock_time): + credentials = self.make_credentials(lifetime=None) + + with mock.patch( + "google.auth.transport.requests.AuthorizedSession.request", autospec=True + ) as auth_session: + data = {"error": {"code": 500, "message": "internal_failure"}} + mock_response = MockResponse(data, http_client.INTERNAL_SERVER_ERROR) + auth_session.return_value = mock_response + + with pytest.raises(exceptions.TransportError) as excinfo: + credentials.sign_bytes(b"foo") + assert excinfo.match("exhausted signBlob endpoint retries") + def test_with_quota_project(self): credentials = self.make_credentials() @@ -548,6 +643,45 @@ class TestImpersonatedCredentials(object): self, mock_donor_credentials, mock_authorizedsession_idtoken ): credentials = self.make_credentials(lifetime=None) + target_credentials = self.make_credentials(lifetime=None) + expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/impersonated@project.iam.gserviceaccount.com:generateIdToken" + self._test_id_token_helper( + credentials, + target_credentials, + mock_donor_credentials, + mock_authorizedsession_idtoken, + expected_url, + ) + + def test_id_token_from_credential_nonGdu( + self, mock_donor_credentials, mock_authorizedsession_idtoken + ): + source_credentials = service_account.Credentials( + SIGNER, "some@email.com", TOKEN_URI, universe_domain="foo.bar" + ) + credentials = self.make_credentials( + lifetime=None, source_credentials=source_credentials + ) + target_credentials = self.make_credentials( + lifetime=None, source_credentials=source_credentials + ) + expected_url = "https://iamcredentials.foo.bar/v1/projects/-/serviceAccounts/impersonated@project.iam.gserviceaccount.com:generateIdToken" + self._test_id_token_helper( + credentials, + target_credentials, + mock_donor_credentials, + mock_authorizedsession_idtoken, + expected_url, + ) + + def _test_id_token_helper( + self, + credentials, + target_credentials, + mock_donor_credentials, + mock_authorizedsession_idtoken, + expected_url, + ): token = "token" target_audience = "https://foo.bar" @@ -565,17 +699,19 @@ class TestImpersonatedCredentials(object): assert credentials.valid assert not credentials.expired - new_credentials = self.make_credentials(lifetime=None) - id_creds = impersonated_credentials.IDTokenCredentials( credentials, target_audience=target_audience, include_email=True ) - id_creds = id_creds.from_credentials(target_credentials=new_credentials) + id_creds = id_creds.from_credentials(target_credentials=target_credentials) id_creds.refresh(request) + args = mock_authorizedsession_idtoken.call_args.args + + assert args[2] == expected_url + assert id_creds.token == ID_TOKEN_DATA assert id_creds._include_email is True - assert id_creds._target_credentials is new_credentials + assert id_creds._target_credentials is target_credentials def test_id_token_with_target_audience( self, mock_donor_credentials, mock_authorizedsession_idtoken |