diff options
author | alexv-smirnov <alex@ydb.tech> | 2023-12-01 12:02:50 +0300 |
---|---|---|
committer | alexv-smirnov <alex@ydb.tech> | 2023-12-01 13:28:10 +0300 |
commit | 0e578a4c44d4abd539d9838347b9ebafaca41dfb (patch) | |
tree | a0c1969c37f818c830ebeff9c077eacf30be6ef8 /contrib/python/google-auth/py3/tests/test_impersonated_credentials.py | |
parent | 84f2d3d4cc985e63217cff149bd2e6d67ae6fe22 (diff) | |
download | ydb-0e578a4c44d4abd539d9838347b9ebafaca41dfb.tar.gz |
Change "ya.make"
Diffstat (limited to 'contrib/python/google-auth/py3/tests/test_impersonated_credentials.py')
-rw-r--r-- | contrib/python/google-auth/py3/tests/test_impersonated_credentials.py | 660 |
1 files changed, 660 insertions, 0 deletions
diff --git a/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py b/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py new file mode 100644 index 0000000000..d63d2d5d3b --- /dev/null +++ b/contrib/python/google-auth/py3/tests/test_impersonated_credentials.py @@ -0,0 +1,660 @@ +# Copyright 2018 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import http.client as http_client +import json +import os + +import mock +import pytest # type: ignore + +from google.auth import _helpers +from google.auth import crypt +from google.auth import exceptions +from google.auth import impersonated_credentials +from google.auth import transport +from google.auth.impersonated_credentials import Credentials +from google.oauth2 import credentials +from google.oauth2 import service_account + +import yatest.common +DATA_DIR = os.path.join(yatest.common.test_source_path(), "data") + +with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh: + PRIVATE_KEY_BYTES = fh.read() + +SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json") + +ID_TOKEN_DATA = ( + "eyJhbGciOiJSUzI1NiIsImtpZCI6ImRmMzc1ODkwOGI3OTIyOTNhZDk3N2Ew" + "Yjk5MWQ5OGE3N2Y0ZWVlY2QiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwc" + "zovL2Zvby5iYXIiLCJhenAiOiIxMDIxMDE1NTA4MzQyMDA3MDg1NjgiLCJle" + "HAiOjE1NjQ0NzUwNTEsImlhdCI6MTU2NDQ3MTQ1MSwiaXNzIjoiaHR0cHM6L" + "y9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTAyMTAxNTUwODM0MjAwN" + "zA4NTY4In0.redacted" +) +ID_TOKEN_EXPIRY = 1564475051 + +with open(SERVICE_ACCOUNT_JSON_FILE, "rb") as fh: + SERVICE_ACCOUNT_INFO = json.load(fh) + +SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1") +TOKEN_URI = "https://example.com/oauth2/token" + +ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE = ( + "gl-python/3.7 auth/1.1 auth-request-type/at cred-type/imp" +) +ID_TOKEN_REQUEST_METRICS_HEADER_VALUE = ( + "gl-python/3.7 auth/1.1 auth-request-type/it cred-type/imp" +) + + +@pytest.fixture +def mock_donor_credentials(): + with mock.patch("google.oauth2._client.jwt_grant", autospec=True) as grant: + grant.return_value = ( + "source token", + _helpers.utcnow() + datetime.timedelta(seconds=500), + {}, + ) + yield grant + + +class MockResponse: + def __init__(self, json_data, status_code): + self.json_data = json_data + self.status_code = status_code + + def json(self): + return self.json_data + + +@pytest.fixture +def mock_authorizedsession_sign(): + with mock.patch( + "google.auth.transport.requests.AuthorizedSession.request", autospec=True + ) as auth_session: + data = {"keyId": "1", "signedBlob": "c2lnbmF0dXJl"} + auth_session.return_value = MockResponse(data, http_client.OK) + yield auth_session + + +@pytest.fixture +def mock_authorizedsession_idtoken(): + with mock.patch( + "google.auth.transport.requests.AuthorizedSession.request", autospec=True + ) as auth_session: + data = {"token": ID_TOKEN_DATA} + auth_session.return_value = MockResponse(data, http_client.OK) + yield auth_session + + +class TestImpersonatedCredentials(object): + + SERVICE_ACCOUNT_EMAIL = "service-account@example.com" + TARGET_PRINCIPAL = "impersonated@project.iam.gserviceaccount.com" + TARGET_SCOPES = ["https://www.googleapis.com/auth/devstorage.read_only"] + # DELEGATES: List[str] = [] + # Because Python 2.7: + DELEGATES = [] # type: ignore + LIFETIME = 3600 + SOURCE_CREDENTIALS = service_account.Credentials( + SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI + ) + USER_SOURCE_CREDENTIALS = credentials.Credentials(token="ABCDE") + IAM_ENDPOINT_OVERRIDE = ( + "https://us-east1-iamcredentials.googleapis.com/v1/projects/-" + + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL) + ) + + def make_credentials( + self, + source_credentials=SOURCE_CREDENTIALS, + lifetime=LIFETIME, + target_principal=TARGET_PRINCIPAL, + iam_endpoint_override=None, + ): + + return Credentials( + source_credentials=source_credentials, + target_principal=target_principal, + target_scopes=self.TARGET_SCOPES, + delegates=self.DELEGATES, + lifetime=lifetime, + iam_endpoint_override=iam_endpoint_override, + ) + + def test_make_from_user_credentials(self): + credentials = self.make_credentials( + source_credentials=self.USER_SOURCE_CREDENTIALS + ) + assert not credentials.valid + assert credentials.expired + + def test_default_state(self): + credentials = self.make_credentials() + assert not credentials.valid + assert credentials.expired + + def test_make_from_service_account_self_signed_jwt(self): + source_credentials = service_account.Credentials( + SIGNER, self.SERVICE_ACCOUNT_EMAIL, TOKEN_URI, always_use_jwt_access=True + ) + credentials = self.make_credentials(source_credentials=source_credentials) + # test the source credential don't lose self signed jwt setting + assert credentials._source_credentials._always_use_jwt_access + assert credentials._source_credentials._jwt_credentials + + def make_request( + self, + data, + status=http_client.OK, + headers=None, + side_effect=None, + use_data_bytes=True, + ): + response = mock.create_autospec(transport.Response, instance=False) + response.status = status + response.data = _helpers.to_bytes(data) if use_data_bytes else data + response.headers = headers or {} + + request = mock.create_autospec(transport.Request, instance=False) + request.side_effect = side_effect + request.return_value = response + + return request + + def test_token_usage_metrics(self): + credentials = self.make_credentials() + credentials.token = "token" + credentials.expiry = None + + headers = {} + credentials.before_request(mock.Mock(), None, None, headers) + assert headers["authorization"] == "Bearer token" + assert headers["x-goog-api-client"] == "cred-type/imp" + + @pytest.mark.parametrize("use_data_bytes", [True, False]) + def test_refresh_success(self, use_data_bytes, mock_donor_credentials): + credentials = self.make_credentials(lifetime=None) + token = "token" + + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} + + request = self.make_request( + data=json.dumps(response_body), + status=http_client.OK, + use_data_bytes=use_data_bytes, + ) + + with mock.patch( + "google.auth.metrics.token_request_access_token_impersonate", + return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, + ): + credentials.refresh(request) + + assert credentials.valid + assert not credentials.expired + assert ( + request.call_args.kwargs["headers"]["x-goog-api-client"] + == ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE + ) + + @pytest.mark.parametrize("use_data_bytes", [True, False]) + def test_refresh_success_iam_endpoint_override( + self, use_data_bytes, mock_donor_credentials + ): + credentials = self.make_credentials( + lifetime=None, iam_endpoint_override=self.IAM_ENDPOINT_OVERRIDE + ) + token = "token" + + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} + + request = self.make_request( + data=json.dumps(response_body), + status=http_client.OK, + use_data_bytes=use_data_bytes, + ) + + credentials.refresh(request) + + assert credentials.valid + assert not credentials.expired + # Confirm override endpoint used. + request_kwargs = request.call_args[1] + assert request_kwargs["url"] == self.IAM_ENDPOINT_OVERRIDE + + @pytest.mark.parametrize("time_skew", [100, -100]) + def test_refresh_source_credentials(self, time_skew): + credentials = self.make_credentials(lifetime=None) + + # Source credentials is refreshed only if it is expired within + # _helpers.REFRESH_THRESHOLD from now. We add a time_skew to the expiry, so + # source credentials is refreshed only if time_skew <= 0. + credentials._source_credentials.expiry = ( + _helpers.utcnow() + + _helpers.REFRESH_THRESHOLD + + datetime.timedelta(seconds=time_skew) + ) + credentials._source_credentials.token = "Token" + + with mock.patch( + "google.oauth2.service_account.Credentials.refresh", autospec=True + ) as source_cred_refresh: + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": "token", "expireTime": expire_time} + request = self.make_request( + data=json.dumps(response_body), status=http_client.OK + ) + + credentials.refresh(request) + + assert credentials.valid + assert not credentials.expired + + # Source credentials is refreshed only if it is expired within + # _helpers.REFRESH_THRESHOLD + if time_skew > 0: + source_cred_refresh.assert_not_called() + else: + source_cred_refresh.assert_called_once() + + def test_refresh_failure_malformed_expire_time(self, mock_donor_credentials): + credentials = self.make_credentials(lifetime=None) + token = "token" + + expire_time = (_helpers.utcnow() + datetime.timedelta(seconds=500)).isoformat( + "T" + ) + response_body = {"accessToken": token, "expireTime": expire_time} + + request = self.make_request( + data=json.dumps(response_body), status=http_client.OK + ) + + with pytest.raises(exceptions.RefreshError) as excinfo: + credentials.refresh(request) + + assert excinfo.match(impersonated_credentials._REFRESH_ERROR) + + assert not credentials.valid + assert credentials.expired + + def test_refresh_failure_unauthorzed(self, mock_donor_credentials): + credentials = self.make_credentials(lifetime=None) + + response_body = { + "error": { + "code": 403, + "message": "The caller does not have permission", + "status": "PERMISSION_DENIED", + } + } + + request = self.make_request( + data=json.dumps(response_body), status=http_client.UNAUTHORIZED + ) + + with pytest.raises(exceptions.RefreshError) as excinfo: + credentials.refresh(request) + + assert excinfo.match(impersonated_credentials._REFRESH_ERROR) + + assert not credentials.valid + assert credentials.expired + + def test_refresh_failure(self): + credentials = self.make_credentials(lifetime=None) + credentials.expiry = None + credentials.token = "token" + id_creds = impersonated_credentials.IDTokenCredentials( + credentials, target_audience="audience" + ) + + response = mock.create_autospec(transport.Response, instance=False) + response.status_code = http_client.UNAUTHORIZED + response.json = mock.Mock(return_value="failed to get ID token") + + with mock.patch( + "google.auth.transport.requests.AuthorizedSession.post", + return_value=response, + ): + with pytest.raises(exceptions.RefreshError) as excinfo: + id_creds.refresh(None) + + assert excinfo.match("Error getting ID token") + + def test_refresh_failure_http_error(self, mock_donor_credentials): + credentials = self.make_credentials(lifetime=None) + + response_body = {} + + request = self.make_request( + data=json.dumps(response_body), status=http_client.HTTPException + ) + + with pytest.raises(exceptions.RefreshError) as excinfo: + credentials.refresh(request) + + assert excinfo.match(impersonated_credentials._REFRESH_ERROR) + + assert not credentials.valid + assert credentials.expired + + def test_expired(self): + credentials = self.make_credentials(lifetime=None) + assert credentials.expired + + def test_signer(self): + credentials = self.make_credentials() + assert isinstance(credentials.signer, impersonated_credentials.Credentials) + + def test_signer_email(self): + credentials = self.make_credentials(target_principal=self.TARGET_PRINCIPAL) + assert credentials.signer_email == self.TARGET_PRINCIPAL + + def test_service_account_email(self): + credentials = self.make_credentials(target_principal=self.TARGET_PRINCIPAL) + assert credentials.service_account_email == self.TARGET_PRINCIPAL + + def test_sign_bytes(self, mock_donor_credentials, mock_authorizedsession_sign): + credentials = self.make_credentials(lifetime=None) + token = "token" + + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + token_response_body = {"accessToken": token, "expireTime": expire_time} + + response = mock.create_autospec(transport.Response, instance=False) + response.status = http_client.OK + response.data = _helpers.to_bytes(json.dumps(token_response_body)) + + request = mock.create_autospec(transport.Request, instance=False) + request.return_value = response + + credentials.refresh(request) + + assert credentials.valid + assert not credentials.expired + + signature = credentials.sign_bytes(b"signed bytes") + assert signature == b"signature" + + def test_sign_bytes_failure(self): + credentials = self.make_credentials(lifetime=None) + + with mock.patch( + "google.auth.transport.requests.AuthorizedSession.request", autospec=True + ) as auth_session: + data = {"error": {"code": 403, "message": "unauthorized"}} + auth_session.return_value = MockResponse(data, http_client.FORBIDDEN) + + with pytest.raises(exceptions.TransportError) as excinfo: + credentials.sign_bytes(b"foo") + assert excinfo.match("'code': 403") + + def test_with_quota_project(self): + credentials = self.make_credentials() + + quota_project_creds = credentials.with_quota_project("project-foo") + assert quota_project_creds._quota_project_id == "project-foo" + + @pytest.mark.parametrize("use_data_bytes", [True, False]) + def test_with_quota_project_iam_endpoint_override( + self, use_data_bytes, mock_donor_credentials + ): + credentials = self.make_credentials( + lifetime=None, iam_endpoint_override=self.IAM_ENDPOINT_OVERRIDE + ) + token = "token" + # iam_endpoint_override should be copied to created credentials. + quota_project_creds = credentials.with_quota_project("project-foo") + + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} + + request = self.make_request( + data=json.dumps(response_body), + status=http_client.OK, + use_data_bytes=use_data_bytes, + ) + + quota_project_creds.refresh(request) + + assert quota_project_creds.valid + assert not quota_project_creds.expired + # Confirm override endpoint used. + request_kwargs = request.call_args[1] + assert request_kwargs["url"] == self.IAM_ENDPOINT_OVERRIDE + + def test_with_scopes(self): + credentials = self.make_credentials() + credentials._target_scopes = [] + assert credentials.requires_scopes is True + credentials = credentials.with_scopes(["fake_scope1", "fake_scope2"]) + assert credentials.requires_scopes is False + assert credentials._target_scopes == ["fake_scope1", "fake_scope2"] + + def test_with_scopes_provide_default_scopes(self): + credentials = self.make_credentials() + credentials._target_scopes = [] + credentials = credentials.with_scopes( + ["fake_scope1"], default_scopes=["fake_scope2"] + ) + assert credentials._target_scopes == ["fake_scope1"] + + def test_id_token_success( + self, mock_donor_credentials, mock_authorizedsession_idtoken + ): + credentials = self.make_credentials(lifetime=None) + token = "token" + target_audience = "https://foo.bar" + + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} + + request = self.make_request( + data=json.dumps(response_body), status=http_client.OK + ) + + credentials.refresh(request) + + assert credentials.valid + assert not credentials.expired + + id_creds = impersonated_credentials.IDTokenCredentials( + credentials, target_audience=target_audience + ) + id_creds.refresh(request) + + assert id_creds.token == ID_TOKEN_DATA + assert id_creds.expiry == datetime.datetime.utcfromtimestamp(ID_TOKEN_EXPIRY) + + def test_id_token_metrics(self, mock_donor_credentials): + credentials = self.make_credentials(lifetime=None) + credentials.token = "token" + credentials.expiry = None + target_audience = "https://foo.bar" + + id_creds = impersonated_credentials.IDTokenCredentials( + credentials, target_audience=target_audience + ) + + with mock.patch( + "google.auth.metrics.token_request_id_token_impersonate", + return_value=ID_TOKEN_REQUEST_METRICS_HEADER_VALUE, + ): + with mock.patch( + "google.auth.transport.requests.AuthorizedSession.post", autospec=True + ) as mock_post: + data = {"token": ID_TOKEN_DATA} + mock_post.return_value = MockResponse(data, http_client.OK) + id_creds.refresh(None) + + assert id_creds.token == ID_TOKEN_DATA + assert id_creds.expiry == datetime.datetime.utcfromtimestamp( + ID_TOKEN_EXPIRY + ) + assert ( + mock_post.call_args.kwargs["headers"]["x-goog-api-client"] + == ID_TOKEN_REQUEST_METRICS_HEADER_VALUE + ) + + def test_id_token_from_credential( + self, mock_donor_credentials, mock_authorizedsession_idtoken + ): + credentials = self.make_credentials(lifetime=None) + token = "token" + target_audience = "https://foo.bar" + + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} + + request = self.make_request( + data=json.dumps(response_body), status=http_client.OK + ) + + credentials.refresh(request) + + assert credentials.valid + assert not credentials.expired + + new_credentials = self.make_credentials(lifetime=None) + + id_creds = impersonated_credentials.IDTokenCredentials( + credentials, target_audience=target_audience, include_email=True + ) + id_creds = id_creds.from_credentials(target_credentials=new_credentials) + id_creds.refresh(request) + + assert id_creds.token == ID_TOKEN_DATA + assert id_creds._include_email is True + assert id_creds._target_credentials is new_credentials + + def test_id_token_with_target_audience( + self, mock_donor_credentials, mock_authorizedsession_idtoken + ): + credentials = self.make_credentials(lifetime=None) + token = "token" + target_audience = "https://foo.bar" + + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} + + request = self.make_request( + data=json.dumps(response_body), status=http_client.OK + ) + + credentials.refresh(request) + + assert credentials.valid + assert not credentials.expired + + id_creds = impersonated_credentials.IDTokenCredentials( + credentials, include_email=True + ) + id_creds = id_creds.with_target_audience(target_audience=target_audience) + id_creds.refresh(request) + + assert id_creds.token == ID_TOKEN_DATA + assert id_creds.expiry == datetime.datetime.utcfromtimestamp(ID_TOKEN_EXPIRY) + assert id_creds._include_email is True + + def test_id_token_invalid_cred( + self, mock_donor_credentials, mock_authorizedsession_idtoken + ): + credentials = None + + with pytest.raises(exceptions.GoogleAuthError) as excinfo: + impersonated_credentials.IDTokenCredentials(credentials) + + assert excinfo.match("Provided Credential must be" " impersonated_credentials") + + def test_id_token_with_include_email( + self, mock_donor_credentials, mock_authorizedsession_idtoken + ): + credentials = self.make_credentials(lifetime=None) + token = "token" + target_audience = "https://foo.bar" + + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} + + request = self.make_request( + data=json.dumps(response_body), status=http_client.OK + ) + + credentials.refresh(request) + + assert credentials.valid + assert not credentials.expired + + id_creds = impersonated_credentials.IDTokenCredentials( + credentials, target_audience=target_audience + ) + id_creds = id_creds.with_include_email(True) + id_creds.refresh(request) + + assert id_creds.token == ID_TOKEN_DATA + + def test_id_token_with_quota_project( + self, mock_donor_credentials, mock_authorizedsession_idtoken + ): + credentials = self.make_credentials(lifetime=None) + token = "token" + target_audience = "https://foo.bar" + + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} + + request = self.make_request( + data=json.dumps(response_body), status=http_client.OK + ) + + credentials.refresh(request) + + assert credentials.valid + assert not credentials.expired + + id_creds = impersonated_credentials.IDTokenCredentials( + credentials, target_audience=target_audience + ) + id_creds = id_creds.with_quota_project("project-foo") + id_creds.refresh(request) + + assert id_creds.quota_project_id == "project-foo" |