diff options
author | alexv-smirnov <alex@ydb.tech> | 2023-12-01 12:02:50 +0300 |
---|---|---|
committer | alexv-smirnov <alex@ydb.tech> | 2023-12-01 13:28:10 +0300 |
commit | 0e578a4c44d4abd539d9838347b9ebafaca41dfb (patch) | |
tree | a0c1969c37f818c830ebeff9c077eacf30be6ef8 /contrib/python/google-auth/py3/tests/compute_engine | |
parent | 84f2d3d4cc985e63217cff149bd2e6d67ae6fe22 (diff) | |
download | ydb-0e578a4c44d4abd539d9838347b9ebafaca41dfb.tar.gz |
Change "ya.make"
Diffstat (limited to 'contrib/python/google-auth/py3/tests/compute_engine')
5 files changed, 1327 insertions, 0 deletions
diff --git a/contrib/python/google-auth/py3/tests/compute_engine/__init__.py b/contrib/python/google-auth/py3/tests/compute_engine/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/contrib/python/google-auth/py3/tests/compute_engine/__init__.py diff --git a/contrib/python/google-auth/py3/tests/compute_engine/data/smbios_product_name b/contrib/python/google-auth/py3/tests/compute_engine/data/smbios_product_name new file mode 100644 index 0000000000..2ca735d9b3 --- /dev/null +++ b/contrib/python/google-auth/py3/tests/compute_engine/data/smbios_product_name @@ -0,0 +1 @@ +Google Compute Engine diff --git a/contrib/python/google-auth/py3/tests/compute_engine/data/smbios_product_name_non_google b/contrib/python/google-auth/py3/tests/compute_engine/data/smbios_product_name_non_google new file mode 100644 index 0000000000..9fd177038e --- /dev/null +++ b/contrib/python/google-auth/py3/tests/compute_engine/data/smbios_product_name_non_google @@ -0,0 +1 @@ +ABC Compute Engine diff --git a/contrib/python/google-auth/py3/tests/compute_engine/test__metadata.py b/contrib/python/google-auth/py3/tests/compute_engine/test__metadata.py new file mode 100644 index 0000000000..ddf84596af --- /dev/null +++ b/contrib/python/google-auth/py3/tests/compute_engine/test__metadata.py @@ -0,0 +1,450 @@ +# Copyright 2016 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import http.client as http_client +import importlib +import json +import os + +import mock +import pytest # type: ignore + +from google.auth import _helpers +from google.auth import environment_vars +from google.auth import exceptions +from google.auth import transport +from google.auth.compute_engine import _metadata + +PATH = "instance/service-accounts/default" + +DATA_DIR = os.path.join(os.path.dirname(__file__), "data") +SMBIOS_PRODUCT_NAME_FILE = os.path.join(DATA_DIR, "smbios_product_name") +SMBIOS_PRODUCT_NAME_NONEXISTENT_FILE = os.path.join( + DATA_DIR, "smbios_product_name_nonexistent" +) +SMBIOS_PRODUCT_NAME_NON_GOOGLE = os.path.join( + DATA_DIR, "smbios_product_name_non_google" +) + +ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE = ( + "gl-python/3.7 auth/1.1 auth-request-type/at cred-type/mds" +) +MDS_PING_METRICS_HEADER_VALUE = "gl-python/3.7 auth/1.1 auth-request-type/mds" +MDS_PING_REQUEST_HEADER = { + "metadata-flavor": "Google", + "x-goog-api-client": MDS_PING_METRICS_HEADER_VALUE, +} + + +def make_request(data, status=http_client.OK, headers=None, retry=False): + response = mock.create_autospec(transport.Response, instance=True) + response.status = status + response.data = _helpers.to_bytes(data) + response.headers = headers or {} + + request = mock.create_autospec(transport.Request) + if retry: + request.side_effect = [exceptions.TransportError(), response] + else: + request.return_value = response + + return request + + +def test_detect_gce_residency_linux_success(): + _metadata._GCE_PRODUCT_NAME_FILE = SMBIOS_PRODUCT_NAME_FILE + assert _metadata.detect_gce_residency_linux() + + +def test_detect_gce_residency_linux_non_google(): + _metadata._GCE_PRODUCT_NAME_FILE = SMBIOS_PRODUCT_NAME_NON_GOOGLE + assert not _metadata.detect_gce_residency_linux() + + +def test_detect_gce_residency_linux_nonexistent(): + _metadata._GCE_PRODUCT_NAME_FILE = SMBIOS_PRODUCT_NAME_NONEXISTENT_FILE + assert not _metadata.detect_gce_residency_linux() + + +def test_is_on_gce_ping_success(): + request = make_request("", headers=_metadata._METADATA_HEADERS) + assert _metadata.is_on_gce(request) + + +@mock.patch("os.name", new="nt") +def test_is_on_gce_windows_success(): + request = make_request("", headers={_metadata._METADATA_FLAVOR_HEADER: "meep"}) + assert not _metadata.is_on_gce(request) + + +@mock.patch("os.name", new="posix") +def test_is_on_gce_linux_success(): + request = make_request("", headers={_metadata._METADATA_FLAVOR_HEADER: "meep"}) + _metadata._GCE_PRODUCT_NAME_FILE = SMBIOS_PRODUCT_NAME_FILE + assert _metadata.is_on_gce(request) + + +@mock.patch("google.auth.metrics.mds_ping", return_value=MDS_PING_METRICS_HEADER_VALUE) +def test_ping_success(mock_metrics_header_value): + request = make_request("", headers=_metadata._METADATA_HEADERS) + + assert _metadata.ping(request) + + request.assert_called_once_with( + method="GET", + url=_metadata._METADATA_IP_ROOT, + headers=MDS_PING_REQUEST_HEADER, + timeout=_metadata._METADATA_DEFAULT_TIMEOUT, + ) + + +@mock.patch("google.auth.metrics.mds_ping", return_value=MDS_PING_METRICS_HEADER_VALUE) +def test_ping_success_retry(mock_metrics_header_value): + request = make_request("", headers=_metadata._METADATA_HEADERS, retry=True) + + assert _metadata.ping(request) + + request.assert_called_with( + method="GET", + url=_metadata._METADATA_IP_ROOT, + headers=MDS_PING_REQUEST_HEADER, + timeout=_metadata._METADATA_DEFAULT_TIMEOUT, + ) + assert request.call_count == 2 + + +def test_ping_failure_bad_flavor(): + request = make_request("", headers={_metadata._METADATA_FLAVOR_HEADER: "meep"}) + + assert not _metadata.ping(request) + + +def test_ping_failure_connection_failed(): + request = make_request("") + request.side_effect = exceptions.TransportError() + + assert not _metadata.ping(request) + + +@mock.patch("google.auth.metrics.mds_ping", return_value=MDS_PING_METRICS_HEADER_VALUE) +def _test_ping_success_custom_root(mock_metrics_header_value): + request = make_request("", headers=_metadata._METADATA_HEADERS) + + fake_ip = "1.2.3.4" + os.environ[environment_vars.GCE_METADATA_IP] = fake_ip + importlib.reload(_metadata) + + try: + assert _metadata.ping(request) + finally: + del os.environ[environment_vars.GCE_METADATA_IP] + importlib.reload(_metadata) + + request.assert_called_once_with( + method="GET", + url="http://" + fake_ip, + headers=MDS_PING_REQUEST_HEADER, + timeout=_metadata._METADATA_DEFAULT_TIMEOUT, + ) + + +def test_get_success_json(): + key, value = "foo", "bar" + + data = json.dumps({key: value}) + request = make_request(data, headers={"content-type": "application/json"}) + + result = _metadata.get(request, PATH) + + request.assert_called_once_with( + method="GET", + url=_metadata._METADATA_ROOT + PATH, + headers=_metadata._METADATA_HEADERS, + ) + assert result[key] == value + + +def test_get_success_retry(): + key, value = "foo", "bar" + + data = json.dumps({key: value}) + request = make_request( + data, headers={"content-type": "application/json"}, retry=True + ) + + result = _metadata.get(request, PATH) + + request.assert_called_with( + method="GET", + url=_metadata._METADATA_ROOT + PATH, + headers=_metadata._METADATA_HEADERS, + ) + assert request.call_count == 2 + assert result[key] == value + + +def test_get_success_text(): + data = "foobar" + request = make_request(data, headers={"content-type": "text/plain"}) + + result = _metadata.get(request, PATH) + + request.assert_called_once_with( + method="GET", + url=_metadata._METADATA_ROOT + PATH, + headers=_metadata._METADATA_HEADERS, + ) + assert result == data + + +def test_get_success_params(): + data = "foobar" + request = make_request(data, headers={"content-type": "text/plain"}) + params = {"recursive": "true"} + + result = _metadata.get(request, PATH, params=params) + + request.assert_called_once_with( + method="GET", + url=_metadata._METADATA_ROOT + PATH + "?recursive=true", + headers=_metadata._METADATA_HEADERS, + ) + assert result == data + + +def test_get_success_recursive_and_params(): + data = "foobar" + request = make_request(data, headers={"content-type": "text/plain"}) + params = {"recursive": "false"} + result = _metadata.get(request, PATH, recursive=True, params=params) + + request.assert_called_once_with( + method="GET", + url=_metadata._METADATA_ROOT + PATH + "?recursive=true", + headers=_metadata._METADATA_HEADERS, + ) + assert result == data + + +def test_get_success_recursive(): + data = "foobar" + request = make_request(data, headers={"content-type": "text/plain"}) + + result = _metadata.get(request, PATH, recursive=True) + + request.assert_called_once_with( + method="GET", + url=_metadata._METADATA_ROOT + PATH + "?recursive=true", + headers=_metadata._METADATA_HEADERS, + ) + assert result == data + + +def _test_get_success_custom_root_new_variable(): + request = make_request("{}", headers={"content-type": "application/json"}) + + fake_root = "another.metadata.service" + os.environ[environment_vars.GCE_METADATA_HOST] = fake_root + importlib.reload(_metadata) + + try: + _metadata.get(request, PATH) + finally: + del os.environ[environment_vars.GCE_METADATA_HOST] + importlib.reload(_metadata) + + request.assert_called_once_with( + method="GET", + url="http://{}/computeMetadata/v1/{}".format(fake_root, PATH), + headers=_metadata._METADATA_HEADERS, + ) + + +def _test_get_success_custom_root_old_variable(): + request = make_request("{}", headers={"content-type": "application/json"}) + + fake_root = "another.metadata.service" + os.environ[environment_vars.GCE_METADATA_ROOT] = fake_root + importlib.reload(_metadata) + + try: + _metadata.get(request, PATH) + finally: + del os.environ[environment_vars.GCE_METADATA_ROOT] + importlib.reload(_metadata) + + request.assert_called_once_with( + method="GET", + url="http://{}/computeMetadata/v1/{}".format(fake_root, PATH), + headers=_metadata._METADATA_HEADERS, + ) + + +def test_get_failure(): + request = make_request("Metadata error", status=http_client.NOT_FOUND) + + with pytest.raises(exceptions.TransportError) as excinfo: + _metadata.get(request, PATH) + + assert excinfo.match(r"Metadata error") + + request.assert_called_once_with( + method="GET", + url=_metadata._METADATA_ROOT + PATH, + headers=_metadata._METADATA_HEADERS, + ) + + +def test_get_failure_connection_failed(): + request = make_request("") + request.side_effect = exceptions.TransportError() + + with pytest.raises(exceptions.TransportError) as excinfo: + _metadata.get(request, PATH) + + assert excinfo.match(r"Compute Engine Metadata server unavailable") + + request.assert_called_with( + method="GET", + url=_metadata._METADATA_ROOT + PATH, + headers=_metadata._METADATA_HEADERS, + ) + assert request.call_count == 5 + + +def test_get_failure_bad_json(): + request = make_request("{", headers={"content-type": "application/json"}) + + with pytest.raises(exceptions.TransportError) as excinfo: + _metadata.get(request, PATH) + + assert excinfo.match(r"invalid JSON") + + request.assert_called_once_with( + method="GET", + url=_metadata._METADATA_ROOT + PATH, + headers=_metadata._METADATA_HEADERS, + ) + + +def test_get_project_id(): + project = "example-project" + request = make_request(project, headers={"content-type": "text/plain"}) + + project_id = _metadata.get_project_id(request) + + request.assert_called_once_with( + method="GET", + url=_metadata._METADATA_ROOT + "project/project-id", + headers=_metadata._METADATA_HEADERS, + ) + assert project_id == project + + +@mock.patch( + "google.auth.metrics.token_request_access_token_mds", + return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, +) +@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) +def test_get_service_account_token(utcnow, mock_metrics_header_value): + ttl = 500 + request = make_request( + json.dumps({"access_token": "token", "expires_in": ttl}), + headers={"content-type": "application/json"}, + ) + + token, expiry = _metadata.get_service_account_token(request) + + request.assert_called_once_with( + method="GET", + url=_metadata._METADATA_ROOT + PATH + "/token", + headers={ + "metadata-flavor": "Google", + "x-goog-api-client": ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, + }, + ) + assert token == "token" + assert expiry == utcnow() + datetime.timedelta(seconds=ttl) + + +@mock.patch( + "google.auth.metrics.token_request_access_token_mds", + return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, +) +@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) +def test_get_service_account_token_with_scopes_list(utcnow, mock_metrics_header_value): + ttl = 500 + request = make_request( + json.dumps({"access_token": "token", "expires_in": ttl}), + headers={"content-type": "application/json"}, + ) + + token, expiry = _metadata.get_service_account_token(request, scopes=["foo", "bar"]) + + request.assert_called_once_with( + method="GET", + url=_metadata._METADATA_ROOT + PATH + "/token" + "?scopes=foo%2Cbar", + headers={ + "metadata-flavor": "Google", + "x-goog-api-client": ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, + }, + ) + assert token == "token" + assert expiry == utcnow() + datetime.timedelta(seconds=ttl) + + +@mock.patch( + "google.auth.metrics.token_request_access_token_mds", + return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, +) +@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) +def test_get_service_account_token_with_scopes_string( + utcnow, mock_metrics_header_value +): + ttl = 500 + request = make_request( + json.dumps({"access_token": "token", "expires_in": ttl}), + headers={"content-type": "application/json"}, + ) + + token, expiry = _metadata.get_service_account_token(request, scopes="foo,bar") + + request.assert_called_once_with( + method="GET", + url=_metadata._METADATA_ROOT + PATH + "/token" + "?scopes=foo%2Cbar", + headers={ + "metadata-flavor": "Google", + "x-goog-api-client": ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, + }, + ) + assert token == "token" + assert expiry == utcnow() + datetime.timedelta(seconds=ttl) + + +def test_get_service_account_info(): + key, value = "foo", "bar" + request = make_request( + json.dumps({key: value}), headers={"content-type": "application/json"} + ) + + info = _metadata.get_service_account_info(request) + + request.assert_called_once_with( + method="GET", + url=_metadata._METADATA_ROOT + PATH + "/?recursive=true", + headers=_metadata._METADATA_HEADERS, + ) + + assert info[key] == value diff --git a/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py b/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py new file mode 100644 index 0000000000..507fea9fcc --- /dev/null +++ b/contrib/python/google-auth/py3/tests/compute_engine/test_credentials.py @@ -0,0 +1,875 @@ +# Copyright 2016 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import base64 +import datetime + +import mock +import pytest # type: ignore +import responses # type: ignore + +from google.auth import _helpers +from google.auth import exceptions +from google.auth import jwt +from google.auth import transport +from google.auth.compute_engine import credentials +from google.auth.transport import requests + +SAMPLE_ID_TOKEN_EXP = 1584393400 + +# header: {"alg": "RS256", "typ": "JWT", "kid": "1"} +# payload: {"iss": "issuer", "iat": 1584393348, "sub": "subject", +# "exp": 1584393400,"aud": "audience"} +SAMPLE_ID_TOKEN = ( + b"eyJhbGciOiAiUlMyNTYiLCAidHlwIjogIkpXVCIsICJraWQiOiAiMSJ9." + b"eyJpc3MiOiAiaXNzdWVyIiwgImlhdCI6IDE1ODQzOTMzNDgsICJzdWIiO" + b"iAic3ViamVjdCIsICJleHAiOiAxNTg0MzkzNDAwLCAiYXVkIjogImF1ZG" + b"llbmNlIn0." + b"OquNjHKhTmlgCk361omRo18F_uY-7y0f_AmLbzW062Q1Zr61HAwHYP5FM" + b"316CK4_0cH8MUNGASsvZc3VqXAqub6PUTfhemH8pFEwBdAdG0LhrNkU0H" + b"WN1YpT55IiQ31esLdL5q-qDsOPpNZJUti1y1lAreM5nIn2srdWzGXGs4i" + b"TRQsn0XkNUCL4RErpciXmjfhMrPkcAjKA-mXQm2fa4jmTlEZFqFmUlym1" + b"ozJ0yf5grjN6AslN4OGvAv1pS-_Ko_pGBS6IQtSBC6vVKCUuBfaqNjykg" + b"bsxbLa6Fp0SYeYwO8ifEnkRvasVpc1WTQqfRB2JCj5pTBDzJpIpFCMmnQ" +) + +ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE = ( + "gl-python/3.7 auth/1.1 auth-request-type/at cred-type/mds" +) +ID_TOKEN_REQUEST_METRICS_HEADER_VALUE = ( + "gl-python/3.7 auth/1.1 auth-request-type/it cred-type/mds" +) + + +class TestCredentials(object): + credentials = None + + @pytest.fixture(autouse=True) + def credentials_fixture(self): + self.credentials = credentials.Credentials() + + def test_default_state(self): + assert not self.credentials.valid + # Expiration hasn't been set yet + assert not self.credentials.expired + # Scopes are needed + assert self.credentials.requires_scopes + # Service account email hasn't been populated + assert self.credentials.service_account_email == "default" + # No quota project + assert not self.credentials._quota_project_id + + @mock.patch( + "google.auth._helpers.utcnow", + return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD, + ) + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + def test_refresh_success(self, get, utcnow): + get.side_effect = [ + { + # First request is for sevice account info. + "email": "service-account@example.com", + "scopes": ["one", "two"], + }, + { + # Second request is for the token. + "access_token": "token", + "expires_in": 500, + }, + ] + + # Refresh credentials + self.credentials.refresh(None) + + # Check that the credentials have the token and proper expiration + assert self.credentials.token == "token" + assert self.credentials.expiry == (utcnow() + datetime.timedelta(seconds=500)) + + # Check the credential info + assert self.credentials.service_account_email == "service-account@example.com" + assert self.credentials._scopes == ["one", "two"] + + # Check that the credentials are valid (have a token and are not + # expired) + assert self.credentials.valid + + @mock.patch( + "google.auth.metrics.token_request_access_token_mds", + return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, + ) + @mock.patch( + "google.auth._helpers.utcnow", + return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD, + ) + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + def test_refresh_success_with_scopes(self, get, utcnow, mock_metrics_header_value): + get.side_effect = [ + { + # First request is for sevice account info. + "email": "service-account@example.com", + "scopes": ["one", "two"], + }, + { + # Second request is for the token. + "access_token": "token", + "expires_in": 500, + }, + ] + + # Refresh credentials + scopes = ["three", "four"] + self.credentials = self.credentials.with_scopes(scopes) + self.credentials.refresh(None) + + # Check that the credentials have the token and proper expiration + assert self.credentials.token == "token" + assert self.credentials.expiry == (utcnow() + datetime.timedelta(seconds=500)) + + # Check the credential info + assert self.credentials.service_account_email == "service-account@example.com" + assert self.credentials._scopes == scopes + + # Check that the credentials are valid (have a token and are not + # expired) + assert self.credentials.valid + + kwargs = get.call_args[1] + assert kwargs["params"] == {"scopes": "three,four"} + assert kwargs["headers"] == { + "x-goog-api-client": ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE + } + + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + def test_refresh_error(self, get): + get.side_effect = exceptions.TransportError("http error") + + with pytest.raises(exceptions.RefreshError) as excinfo: + self.credentials.refresh(None) + + assert excinfo.match(r"http error") + + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + def test_before_request_refreshes(self, get): + get.side_effect = [ + { + # First request is for sevice account info. + "email": "service-account@example.com", + "scopes": "one two", + }, + { + # Second request is for the token. + "access_token": "token", + "expires_in": 500, + }, + ] + + # Credentials should start as invalid + assert not self.credentials.valid + + # before_request should cause a refresh + request = mock.create_autospec(transport.Request, instance=True) + self.credentials.before_request(request, "GET", "http://example.com?a=1#3", {}) + + # The refresh endpoint should've been called. + assert get.called + + # Credentials should now be valid. + assert self.credentials.valid + + def test_with_quota_project(self): + quota_project_creds = self.credentials.with_quota_project("project-foo") + + assert quota_project_creds._quota_project_id == "project-foo" + + def test_with_scopes(self): + assert self.credentials._scopes is None + + scopes = ["one", "two"] + self.credentials = self.credentials.with_scopes(scopes) + + assert self.credentials._scopes == scopes + + def test_token_usage_metrics(self): + self.credentials.token = "token" + self.credentials.expiry = None + + headers = {} + self.credentials.before_request(mock.Mock(), None, None, headers) + assert headers["authorization"] == "Bearer token" + assert headers["x-goog-api-client"] == "cred-type/mds" + + +class TestIDTokenCredentials(object): + credentials = None + + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + def test_default_state(self, get): + get.side_effect = [ + {"email": "service-account@example.com", "scope": ["one", "two"]} + ] + + request = mock.create_autospec(transport.Request, instance=True) + self.credentials = credentials.IDTokenCredentials( + request=request, target_audience="https://example.com" + ) + + assert not self.credentials.valid + # Expiration hasn't been set yet + assert not self.credentials.expired + # Service account email hasn't been populated + assert self.credentials.service_account_email == "service-account@example.com" + # Signer is initialized + assert self.credentials.signer + assert self.credentials.signer_email == "service-account@example.com" + # No quota project + assert not self.credentials._quota_project_id + + @mock.patch( + "google.auth._helpers.utcnow", + return_value=datetime.datetime.utcfromtimestamp(0), + ) + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + @mock.patch("google.auth.iam.Signer.sign", autospec=True) + def test_make_authorization_grant_assertion(self, sign, get, utcnow): + get.side_effect = [ + {"email": "service-account@example.com", "scopes": ["one", "two"]} + ] + sign.side_effect = [b"signature"] + + request = mock.create_autospec(transport.Request, instance=True) + self.credentials = credentials.IDTokenCredentials( + request=request, target_audience="https://audience.com" + ) + + # Generate authorization grant: + token = self.credentials._make_authorization_grant_assertion() + payload = jwt.decode(token, verify=False) + + # The JWT token signature is 'signature' encoded in base 64: + assert token.endswith(b".c2lnbmF0dXJl") + + # Check that the credentials have the token and proper expiration + assert payload == { + "aud": "https://www.googleapis.com/oauth2/v4/token", + "exp": 3600, + "iat": 0, + "iss": "service-account@example.com", + "target_audience": "https://audience.com", + } + + @mock.patch( + "google.auth._helpers.utcnow", + return_value=datetime.datetime.utcfromtimestamp(0), + ) + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + @mock.patch("google.auth.iam.Signer.sign", autospec=True) + def test_with_service_account(self, sign, get, utcnow): + sign.side_effect = [b"signature"] + + request = mock.create_autospec(transport.Request, instance=True) + self.credentials = credentials.IDTokenCredentials( + request=request, + target_audience="https://audience.com", + service_account_email="service-account@other.com", + ) + + # Generate authorization grant: + token = self.credentials._make_authorization_grant_assertion() + payload = jwt.decode(token, verify=False) + + # The JWT token signature is 'signature' encoded in base 64: + assert token.endswith(b".c2lnbmF0dXJl") + + # Check that the credentials have the token and proper expiration + assert payload == { + "aud": "https://www.googleapis.com/oauth2/v4/token", + "exp": 3600, + "iat": 0, + "iss": "service-account@other.com", + "target_audience": "https://audience.com", + } + + @mock.patch( + "google.auth._helpers.utcnow", + return_value=datetime.datetime.utcfromtimestamp(0), + ) + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + @mock.patch("google.auth.iam.Signer.sign", autospec=True) + def test_additional_claims(self, sign, get, utcnow): + get.side_effect = [ + {"email": "service-account@example.com", "scopes": ["one", "two"]} + ] + sign.side_effect = [b"signature"] + + request = mock.create_autospec(transport.Request, instance=True) + self.credentials = credentials.IDTokenCredentials( + request=request, + target_audience="https://audience.com", + additional_claims={"foo": "bar"}, + ) + + # Generate authorization grant: + token = self.credentials._make_authorization_grant_assertion() + payload = jwt.decode(token, verify=False) + + # The JWT token signature is 'signature' encoded in base 64: + assert token.endswith(b".c2lnbmF0dXJl") + + # Check that the credentials have the token and proper expiration + assert payload == { + "aud": "https://www.googleapis.com/oauth2/v4/token", + "exp": 3600, + "iat": 0, + "iss": "service-account@example.com", + "target_audience": "https://audience.com", + "foo": "bar", + } + + def test_token_uri(self): + request = mock.create_autospec(transport.Request, instance=True) + + self.credentials = credentials.IDTokenCredentials( + request=request, + signer=mock.Mock(), + service_account_email="foo@example.com", + target_audience="https://audience.com", + ) + assert self.credentials._token_uri == credentials._DEFAULT_TOKEN_URI + + self.credentials = credentials.IDTokenCredentials( + request=request, + signer=mock.Mock(), + service_account_email="foo@example.com", + target_audience="https://audience.com", + token_uri="https://example.com/token", + ) + assert self.credentials._token_uri == "https://example.com/token" + + @mock.patch( + "google.auth._helpers.utcnow", + return_value=datetime.datetime.utcfromtimestamp(0), + ) + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + @mock.patch("google.auth.iam.Signer.sign", autospec=True) + def test_with_target_audience(self, sign, get, utcnow): + get.side_effect = [ + {"email": "service-account@example.com", "scopes": ["one", "two"]} + ] + sign.side_effect = [b"signature"] + + request = mock.create_autospec(transport.Request, instance=True) + self.credentials = credentials.IDTokenCredentials( + request=request, target_audience="https://audience.com" + ) + self.credentials = self.credentials.with_target_audience("https://actually.not") + + # Generate authorization grant: + token = self.credentials._make_authorization_grant_assertion() + payload = jwt.decode(token, verify=False) + + # The JWT token signature is 'signature' encoded in base 64: + assert token.endswith(b".c2lnbmF0dXJl") + + # Check that the credentials have the token and proper expiration + assert payload == { + "aud": "https://www.googleapis.com/oauth2/v4/token", + "exp": 3600, + "iat": 0, + "iss": "service-account@example.com", + "target_audience": "https://actually.not", + } + + # Check that the signer have been initialized with a Request object + assert isinstance(self.credentials._signer._request, transport.Request) + + @responses.activate + def test_with_target_audience_integration(self): + """ Test that it is possible to refresh credentials + generated from `with_target_audience`. + + Instead of mocking the methods, the HTTP responses + have been mocked. + """ + + # mock information about credentials + responses.add( + responses.GET, + "http://metadata.google.internal/computeMetadata/v1/instance/" + "service-accounts/default/?recursive=true", + status=200, + content_type="application/json", + json={ + "scopes": "email", + "email": "service-account@example.com", + "aliases": ["default"], + }, + ) + + # mock token for credentials + responses.add( + responses.GET, + "http://metadata.google.internal/computeMetadata/v1/instance/" + "service-accounts/service-account@example.com/token", + status=200, + content_type="application/json", + json={ + "access_token": "some-token", + "expires_in": 3210, + "token_type": "Bearer", + }, + ) + + # mock sign blob endpoint + signature = base64.b64encode(b"some-signature").decode("utf-8") + responses.add( + responses.POST, + "https://iamcredentials.googleapis.com/v1/projects/-/" + "serviceAccounts/service-account@example.com:signBlob?alt=json", + status=200, + content_type="application/json", + json={"keyId": "some-key-id", "signedBlob": signature}, + ) + + id_token = "{}.{}.{}".format( + base64.b64encode(b'{"some":"some"}').decode("utf-8"), + base64.b64encode(b'{"exp": 3210}').decode("utf-8"), + base64.b64encode(b"token").decode("utf-8"), + ) + + # mock id token endpoint + responses.add( + responses.POST, + "https://www.googleapis.com/oauth2/v4/token", + status=200, + content_type="application/json", + json={"id_token": id_token, "expiry": 3210}, + ) + + self.credentials = credentials.IDTokenCredentials( + request=requests.Request(), + service_account_email="service-account@example.com", + target_audience="https://audience.com", + ) + + self.credentials = self.credentials.with_target_audience("https://actually.not") + + self.credentials.refresh(requests.Request()) + + assert self.credentials.token is not None + + @mock.patch( + "google.auth._helpers.utcnow", + return_value=datetime.datetime.utcfromtimestamp(0), + ) + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + @mock.patch("google.auth.iam.Signer.sign", autospec=True) + def test_with_quota_project(self, sign, get, utcnow): + get.side_effect = [ + {"email": "service-account@example.com", "scopes": ["one", "two"]} + ] + sign.side_effect = [b"signature"] + + request = mock.create_autospec(transport.Request, instance=True) + self.credentials = credentials.IDTokenCredentials( + request=request, target_audience="https://audience.com" + ) + self.credentials = self.credentials.with_quota_project("project-foo") + + assert self.credentials._quota_project_id == "project-foo" + + # Generate authorization grant: + token = self.credentials._make_authorization_grant_assertion() + payload = jwt.decode(token, verify=False) + + # The JWT token signature is 'signature' encoded in base 64: + assert token.endswith(b".c2lnbmF0dXJl") + + # Check that the credentials have the token and proper expiration + assert payload == { + "aud": "https://www.googleapis.com/oauth2/v4/token", + "exp": 3600, + "iat": 0, + "iss": "service-account@example.com", + "target_audience": "https://audience.com", + } + + # Check that the signer have been initialized with a Request object + assert isinstance(self.credentials._signer._request, transport.Request) + + @mock.patch( + "google.auth._helpers.utcnow", + return_value=datetime.datetime.utcfromtimestamp(0), + ) + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + @mock.patch("google.auth.iam.Signer.sign", autospec=True) + def test_with_token_uri(self, sign, get, utcnow): + get.side_effect = [ + {"email": "service-account@example.com", "scopes": ["one", "two"]} + ] + sign.side_effect = [b"signature"] + + request = mock.create_autospec(transport.Request, instance=True) + self.credentials = credentials.IDTokenCredentials( + request=request, + target_audience="https://audience.com", + token_uri="http://xyz.com", + ) + assert self.credentials._token_uri == "http://xyz.com" + creds_with_token_uri = self.credentials.with_token_uri("http://example.com") + assert creds_with_token_uri._token_uri == "http://example.com" + + @mock.patch( + "google.auth._helpers.utcnow", + return_value=datetime.datetime.utcfromtimestamp(0), + ) + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + @mock.patch("google.auth.iam.Signer.sign", autospec=True) + def test_with_token_uri_exception(self, sign, get, utcnow): + get.side_effect = [ + {"email": "service-account@example.com", "scopes": ["one", "two"]} + ] + sign.side_effect = [b"signature"] + + request = mock.create_autospec(transport.Request, instance=True) + self.credentials = credentials.IDTokenCredentials( + request=request, + target_audience="https://audience.com", + use_metadata_identity_endpoint=True, + ) + assert self.credentials._token_uri is None + with pytest.raises(ValueError): + self.credentials.with_token_uri("http://example.com") + + @responses.activate + def test_with_quota_project_integration(self): + """ Test that it is possible to refresh credentials + generated from `with_quota_project`. + + Instead of mocking the methods, the HTTP responses + have been mocked. + """ + + # mock information about credentials + responses.add( + responses.GET, + "http://metadata.google.internal/computeMetadata/v1/instance/" + "service-accounts/default/?recursive=true", + status=200, + content_type="application/json", + json={ + "scopes": "email", + "email": "service-account@example.com", + "aliases": ["default"], + }, + ) + + # mock token for credentials + responses.add( + responses.GET, + "http://metadata.google.internal/computeMetadata/v1/instance/" + "service-accounts/service-account@example.com/token", + status=200, + content_type="application/json", + json={ + "access_token": "some-token", + "expires_in": 3210, + "token_type": "Bearer", + }, + ) + + # mock sign blob endpoint + signature = base64.b64encode(b"some-signature").decode("utf-8") + responses.add( + responses.POST, + "https://iamcredentials.googleapis.com/v1/projects/-/" + "serviceAccounts/service-account@example.com:signBlob?alt=json", + status=200, + content_type="application/json", + json={"keyId": "some-key-id", "signedBlob": signature}, + ) + + id_token = "{}.{}.{}".format( + base64.b64encode(b'{"some":"some"}').decode("utf-8"), + base64.b64encode(b'{"exp": 3210}').decode("utf-8"), + base64.b64encode(b"token").decode("utf-8"), + ) + + # mock id token endpoint + responses.add( + responses.POST, + "https://www.googleapis.com/oauth2/v4/token", + status=200, + content_type="application/json", + json={"id_token": id_token, "expiry": 3210}, + ) + + self.credentials = credentials.IDTokenCredentials( + request=requests.Request(), + service_account_email="service-account@example.com", + target_audience="https://audience.com", + ) + + self.credentials = self.credentials.with_quota_project("project-foo") + + self.credentials.refresh(requests.Request()) + + assert self.credentials.token is not None + assert self.credentials._quota_project_id == "project-foo" + + @mock.patch( + "google.auth._helpers.utcnow", + return_value=datetime.datetime.utcfromtimestamp(0), + ) + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + @mock.patch("google.auth.iam.Signer.sign", autospec=True) + @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True) + def test_refresh_success(self, id_token_jwt_grant, sign, get, utcnow): + get.side_effect = [ + {"email": "service-account@example.com", "scopes": ["one", "two"]} + ] + sign.side_effect = [b"signature"] + id_token_jwt_grant.side_effect = [ + ("idtoken", datetime.datetime.utcfromtimestamp(3600), {}) + ] + + request = mock.create_autospec(transport.Request, instance=True) + self.credentials = credentials.IDTokenCredentials( + request=request, target_audience="https://audience.com" + ) + + # Refresh credentials + self.credentials.refresh(None) + + # Check that the credentials have the token and proper expiration + assert self.credentials.token == "idtoken" + assert self.credentials.expiry == (datetime.datetime.utcfromtimestamp(3600)) + + # Check the credential info + assert self.credentials.service_account_email == "service-account@example.com" + + # Check that the credentials are valid (have a token and are not + # expired) + assert self.credentials.valid + + @mock.patch( + "google.auth._helpers.utcnow", + return_value=datetime.datetime.utcfromtimestamp(0), + ) + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + @mock.patch("google.auth.iam.Signer.sign", autospec=True) + def test_refresh_error(self, sign, get, utcnow): + get.side_effect = [ + {"email": "service-account@example.com", "scopes": ["one", "two"]} + ] + sign.side_effect = [b"signature"] + + request = mock.create_autospec(transport.Request, instance=True) + response = mock.Mock() + response.data = b'{"error": "http error"}' + response.status = 404 # Throw a 404 so the request is not retried. + request.side_effect = [response] + + self.credentials = credentials.IDTokenCredentials( + request=request, target_audience="https://audience.com" + ) + + with pytest.raises(exceptions.RefreshError) as excinfo: + self.credentials.refresh(request) + + assert excinfo.match(r"http error") + + @mock.patch( + "google.auth._helpers.utcnow", + return_value=datetime.datetime.utcfromtimestamp(0), + ) + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + @mock.patch("google.auth.iam.Signer.sign", autospec=True) + @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True) + def test_before_request_refreshes(self, id_token_jwt_grant, sign, get, utcnow): + get.side_effect = [ + {"email": "service-account@example.com", "scopes": "one two"} + ] + sign.side_effect = [b"signature"] + id_token_jwt_grant.side_effect = [ + ("idtoken", datetime.datetime.utcfromtimestamp(3600), {}) + ] + + request = mock.create_autospec(transport.Request, instance=True) + self.credentials = credentials.IDTokenCredentials( + request=request, target_audience="https://audience.com" + ) + + # Credentials should start as invalid + assert not self.credentials.valid + + # before_request should cause a refresh + request = mock.create_autospec(transport.Request, instance=True) + self.credentials.before_request(request, "GET", "http://example.com?a=1#3", {}) + + # The refresh endpoint should've been called. + assert get.called + + # Credentials should now be valid. + assert self.credentials.valid + + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + @mock.patch("google.auth.iam.Signer.sign", autospec=True) + def test_sign_bytes(self, sign, get): + get.side_effect = [ + {"email": "service-account@example.com", "scopes": ["one", "two"]} + ] + sign.side_effect = [b"signature"] + + request = mock.create_autospec(transport.Request, instance=True) + response = mock.Mock() + response.data = b'{"signature": "c2lnbmF0dXJl"}' + response.status = 200 + request.side_effect = [response] + + self.credentials = credentials.IDTokenCredentials( + request=request, target_audience="https://audience.com" + ) + + # Generate authorization grant: + signature = self.credentials.sign_bytes(b"some bytes") + + # The JWT token signature is 'signature' encoded in base 64: + assert signature == b"signature" + + @mock.patch( + "google.auth.metrics.token_request_id_token_mds", + return_value=ID_TOKEN_REQUEST_METRICS_HEADER_VALUE, + ) + @mock.patch( + "google.auth.compute_engine._metadata.get_service_account_info", autospec=True + ) + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + def test_get_id_token_from_metadata( + self, get, get_service_account_info, mock_metrics_header_value + ): + get.return_value = SAMPLE_ID_TOKEN + get_service_account_info.return_value = {"email": "foo@example.com"} + + cred = credentials.IDTokenCredentials( + mock.Mock(), "audience", use_metadata_identity_endpoint=True + ) + cred.refresh(request=mock.Mock()) + + assert get.call_args.kwargs["headers"] == { + "x-goog-api-client": ID_TOKEN_REQUEST_METRICS_HEADER_VALUE + } + + assert cred.token == SAMPLE_ID_TOKEN + assert cred.expiry == datetime.datetime.utcfromtimestamp(SAMPLE_ID_TOKEN_EXP) + assert cred._use_metadata_identity_endpoint + assert cred._signer is None + assert cred._token_uri is None + assert cred._service_account_email == "foo@example.com" + assert cred._target_audience == "audience" + with pytest.raises(ValueError): + cred.sign_bytes(b"bytes") + + @mock.patch( + "google.auth.compute_engine._metadata.get_service_account_info", autospec=True + ) + def test_with_target_audience_for_metadata(self, get_service_account_info): + get_service_account_info.return_value = {"email": "foo@example.com"} + + cred = credentials.IDTokenCredentials( + mock.Mock(), "audience", use_metadata_identity_endpoint=True + ) + cred = cred.with_target_audience("new_audience") + + assert cred._target_audience == "new_audience" + assert cred._use_metadata_identity_endpoint + assert cred._signer is None + assert cred._token_uri is None + assert cred._service_account_email == "foo@example.com" + + @mock.patch( + "google.auth.compute_engine._metadata.get_service_account_info", autospec=True + ) + def test_id_token_with_quota_project(self, get_service_account_info): + get_service_account_info.return_value = {"email": "foo@example.com"} + + cred = credentials.IDTokenCredentials( + mock.Mock(), "audience", use_metadata_identity_endpoint=True + ) + cred = cred.with_quota_project("project-foo") + + assert cred._quota_project_id == "project-foo" + assert cred._use_metadata_identity_endpoint + assert cred._signer is None + assert cred._token_uri is None + assert cred._service_account_email == "foo@example.com" + + @mock.patch( + "google.auth.compute_engine._metadata.get_service_account_info", autospec=True + ) + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + def test_invalid_id_token_from_metadata(self, get, get_service_account_info): + get.return_value = "invalid_id_token" + get_service_account_info.return_value = {"email": "foo@example.com"} + + cred = credentials.IDTokenCredentials( + mock.Mock(), "audience", use_metadata_identity_endpoint=True + ) + + with pytest.raises(ValueError): + cred.refresh(request=mock.Mock()) + + @mock.patch( + "google.auth.compute_engine._metadata.get_service_account_info", autospec=True + ) + @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) + def test_transport_error_from_metadata(self, get, get_service_account_info): + get.side_effect = exceptions.TransportError("transport error") + get_service_account_info.return_value = {"email": "foo@example.com"} + + cred = credentials.IDTokenCredentials( + mock.Mock(), "audience", use_metadata_identity_endpoint=True + ) + + with pytest.raises(exceptions.RefreshError) as excinfo: + cred.refresh(request=mock.Mock()) + assert excinfo.match(r"transport error") + + def test_get_id_token_from_metadata_constructor(self): + with pytest.raises(ValueError): + credentials.IDTokenCredentials( + mock.Mock(), + "audience", + use_metadata_identity_endpoint=True, + token_uri="token_uri", + ) + with pytest.raises(ValueError): + credentials.IDTokenCredentials( + mock.Mock(), + "audience", + use_metadata_identity_endpoint=True, + signer=mock.Mock(), + ) + with pytest.raises(ValueError): + credentials.IDTokenCredentials( + mock.Mock(), + "audience", + use_metadata_identity_endpoint=True, + additional_claims={"key", "value"}, + ) + with pytest.raises(ValueError): + credentials.IDTokenCredentials( + mock.Mock(), + "audience", + use_metadata_identity_endpoint=True, + service_account_email="foo@example.com", + ) |