diff options
author | alexv-smirnov <alex@ydb.tech> | 2023-12-01 12:02:50 +0300 |
---|---|---|
committer | alexv-smirnov <alex@ydb.tech> | 2023-12-01 13:28:10 +0300 |
commit | 0e578a4c44d4abd539d9838347b9ebafaca41dfb (patch) | |
tree | a0c1969c37f818c830ebeff9c077eacf30be6ef8 /contrib/python/google-auth/py2/tests/oauth2 | |
parent | 84f2d3d4cc985e63217cff149bd2e6d67ae6fe22 (diff) | |
download | ydb-0e578a4c44d4abd539d9838347b9ebafaca41dfb.tar.gz |
Change "ya.make"
Diffstat (limited to 'contrib/python/google-auth/py2/tests/oauth2')
9 files changed, 2966 insertions, 0 deletions
diff --git a/contrib/python/google-auth/py2/tests/oauth2/__init__.py b/contrib/python/google-auth/py2/tests/oauth2/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/contrib/python/google-auth/py2/tests/oauth2/__init__.py diff --git a/contrib/python/google-auth/py2/tests/oauth2/test__client.py b/contrib/python/google-auth/py2/tests/oauth2/test__client.py new file mode 100644 index 0000000000..1dba2523e7 --- /dev/null +++ b/contrib/python/google-auth/py2/tests/oauth2/test__client.py @@ -0,0 +1,330 @@ +# Copyright 2016 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import json +import os + +import mock +import pytest +import six +from six.moves import http_client +from six.moves import urllib + +from google.auth import _helpers +from google.auth import crypt +from google.auth import exceptions +from google.auth import jwt +from google.auth import transport +from google.oauth2 import _client + + +import yatest.common +DATA_DIR = os.path.join(yatest.common.test_source_path(), "data") + +with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh: + PRIVATE_KEY_BYTES = fh.read() + +SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1") + +SCOPES_AS_LIST = [ + "https://www.googleapis.com/auth/pubsub", + "https://www.googleapis.com/auth/logging.write", +] +SCOPES_AS_STRING = ( + "https://www.googleapis.com/auth/pubsub" + " https://www.googleapis.com/auth/logging.write" +) + + +def test__handle_error_response(): + response_data = {"error": "help", "error_description": "I'm alive"} + + with pytest.raises(exceptions.RefreshError) as excinfo: + _client._handle_error_response(response_data) + + assert excinfo.match(r"help: I\'m alive") + + +def test__handle_error_response_non_json(): + response_data = {"foo": "bar"} + + with pytest.raises(exceptions.RefreshError) as excinfo: + _client._handle_error_response(response_data) + + assert excinfo.match(r"{\"foo\": \"bar\"}") + + +@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) +def test__parse_expiry(unused_utcnow): + result = _client._parse_expiry({"expires_in": 500}) + assert result == datetime.datetime.min + datetime.timedelta(seconds=500) + + +def test__parse_expiry_none(): + assert _client._parse_expiry({}) is None + + +def make_request(response_data, status=http_client.OK): + response = mock.create_autospec(transport.Response, instance=True) + response.status = status + response.data = json.dumps(response_data).encode("utf-8") + request = mock.create_autospec(transport.Request) + request.return_value = response + return request + + +def test__token_endpoint_request(): + request = make_request({"test": "response"}) + + result = _client._token_endpoint_request( + request, "http://example.com", {"test": "params"} + ) + + # Check request call + request.assert_called_with( + method="POST", + url="http://example.com", + headers={"Content-Type": "application/x-www-form-urlencoded"}, + body="test=params".encode("utf-8"), + ) + + # Check result + assert result == {"test": "response"} + + +def test__token_endpoint_request_use_json(): + request = make_request({"test": "response"}) + + result = _client._token_endpoint_request( + request, + "http://example.com", + {"test": "params"}, + access_token="access_token", + use_json=True, + ) + + # Check request call + request.assert_called_with( + method="POST", + url="http://example.com", + headers={ + "Content-Type": "application/json", + "Authorization": "Bearer access_token", + }, + body=b'{"test": "params"}', + ) + + # Check result + assert result == {"test": "response"} + + +def test__token_endpoint_request_error(): + request = make_request({}, status=http_client.BAD_REQUEST) + + with pytest.raises(exceptions.RefreshError): + _client._token_endpoint_request(request, "http://example.com", {}) + + +def test__token_endpoint_request_internal_failure_error(): + request = make_request( + {"error_description": "internal_failure"}, status=http_client.BAD_REQUEST + ) + + with pytest.raises(exceptions.RefreshError): + _client._token_endpoint_request( + request, "http://example.com", {"error_description": "internal_failure"} + ) + + request = make_request( + {"error": "internal_failure"}, status=http_client.BAD_REQUEST + ) + + with pytest.raises(exceptions.RefreshError): + _client._token_endpoint_request( + request, "http://example.com", {"error": "internal_failure"} + ) + + +def verify_request_params(request, params): + request_body = request.call_args[1]["body"].decode("utf-8") + request_params = urllib.parse.parse_qs(request_body) + + for key, value in six.iteritems(params): + assert request_params[key][0] == value + + +@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) +def test_jwt_grant(utcnow): + request = make_request( + {"access_token": "token", "expires_in": 500, "extra": "data"} + ) + + token, expiry, extra_data = _client.jwt_grant( + request, "http://example.com", "assertion_value" + ) + + # Check request call + verify_request_params( + request, {"grant_type": _client._JWT_GRANT_TYPE, "assertion": "assertion_value"} + ) + + # Check result + assert token == "token" + assert expiry == utcnow() + datetime.timedelta(seconds=500) + assert extra_data["extra"] == "data" + + +def test_jwt_grant_no_access_token(): + request = make_request( + { + # No access token. + "expires_in": 500, + "extra": "data", + } + ) + + with pytest.raises(exceptions.RefreshError): + _client.jwt_grant(request, "http://example.com", "assertion_value") + + +def test_id_token_jwt_grant(): + now = _helpers.utcnow() + id_token_expiry = _helpers.datetime_to_secs(now) + id_token = jwt.encode(SIGNER, {"exp": id_token_expiry}).decode("utf-8") + request = make_request({"id_token": id_token, "extra": "data"}) + + token, expiry, extra_data = _client.id_token_jwt_grant( + request, "http://example.com", "assertion_value" + ) + + # Check request call + verify_request_params( + request, {"grant_type": _client._JWT_GRANT_TYPE, "assertion": "assertion_value"} + ) + + # Check result + assert token == id_token + # JWT does not store microseconds + now = now.replace(microsecond=0) + assert expiry == now + assert extra_data["extra"] == "data" + + +def test_id_token_jwt_grant_no_access_token(): + request = make_request( + { + # No access token. + "expires_in": 500, + "extra": "data", + } + ) + + with pytest.raises(exceptions.RefreshError): + _client.id_token_jwt_grant(request, "http://example.com", "assertion_value") + + +@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) +def test_refresh_grant(unused_utcnow): + request = make_request( + { + "access_token": "token", + "refresh_token": "new_refresh_token", + "expires_in": 500, + "extra": "data", + } + ) + + token, refresh_token, expiry, extra_data = _client.refresh_grant( + request, + "http://example.com", + "refresh_token", + "client_id", + "client_secret", + rapt_token="rapt_token", + ) + + # Check request call + verify_request_params( + request, + { + "grant_type": _client._REFRESH_GRANT_TYPE, + "refresh_token": "refresh_token", + "client_id": "client_id", + "client_secret": "client_secret", + "rapt": "rapt_token", + }, + ) + + # Check result + assert token == "token" + assert refresh_token == "new_refresh_token" + assert expiry == datetime.datetime.min + datetime.timedelta(seconds=500) + assert extra_data["extra"] == "data" + + +@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) +def test_refresh_grant_with_scopes(unused_utcnow): + request = make_request( + { + "access_token": "token", + "refresh_token": "new_refresh_token", + "expires_in": 500, + "extra": "data", + "scope": SCOPES_AS_STRING, + } + ) + + token, refresh_token, expiry, extra_data = _client.refresh_grant( + request, + "http://example.com", + "refresh_token", + "client_id", + "client_secret", + SCOPES_AS_LIST, + ) + + # Check request call. + verify_request_params( + request, + { + "grant_type": _client._REFRESH_GRANT_TYPE, + "refresh_token": "refresh_token", + "client_id": "client_id", + "client_secret": "client_secret", + "scope": SCOPES_AS_STRING, + }, + ) + + # Check result. + assert token == "token" + assert refresh_token == "new_refresh_token" + assert expiry == datetime.datetime.min + datetime.timedelta(seconds=500) + assert extra_data["extra"] == "data" + + +def test_refresh_grant_no_access_token(): + request = make_request( + { + # No access token. + "refresh_token": "new_refresh_token", + "expires_in": 500, + "extra": "data", + } + ) + + with pytest.raises(exceptions.RefreshError): + _client.refresh_grant( + request, "http://example.com", "refresh_token", "client_id", "client_secret" + ) diff --git a/contrib/python/google-auth/py2/tests/oauth2/test_challenges.py b/contrib/python/google-auth/py2/tests/oauth2/test_challenges.py new file mode 100644 index 0000000000..019b908dae --- /dev/null +++ b/contrib/python/google-auth/py2/tests/oauth2/test_challenges.py @@ -0,0 +1,132 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for the reauth module.""" + +import base64 +import sys + +import mock +import pytest +import pyu2f + +from google.auth import exceptions +from google.oauth2 import challenges + + +def test_get_user_password(): + with mock.patch("getpass.getpass", return_value="foo"): + assert challenges.get_user_password("") == "foo" + + +def test_security_key(): + metadata = { + "status": "READY", + "challengeId": 2, + "challengeType": "SECURITY_KEY", + "securityKey": { + "applicationId": "security_key_application_id", + "challenges": [ + { + "keyHandle": "some_key", + "challenge": base64.urlsafe_b64encode( + "some_challenge".encode("ascii") + ).decode("ascii"), + } + ], + }, + } + mock_key = mock.Mock() + + challenge = challenges.SecurityKeyChallenge() + + # Test the case that security key challenge is passed. + with mock.patch("pyu2f.model.RegisteredKey", return_value=mock_key): + with mock.patch( + "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate" + ) as mock_authenticate: + mock_authenticate.return_value = "security key response" + assert challenge.name == "SECURITY_KEY" + assert challenge.is_locally_eligible + assert challenge.obtain_challenge_input(metadata) == { + "securityKey": "security key response" + } + mock_authenticate.assert_called_with( + "security_key_application_id", + [{"key": mock_key, "challenge": b"some_challenge"}], + print_callback=sys.stderr.write, + ) + + # Test various types of exceptions. + with mock.patch("pyu2f.model.RegisteredKey", return_value=mock_key): + with mock.patch( + "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate" + ) as mock_authenticate: + mock_authenticate.side_effect = pyu2f.errors.U2FError( + pyu2f.errors.U2FError.DEVICE_INELIGIBLE + ) + assert challenge.obtain_challenge_input(metadata) is None + + with mock.patch( + "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate" + ) as mock_authenticate: + mock_authenticate.side_effect = pyu2f.errors.U2FError( + pyu2f.errors.U2FError.TIMEOUT + ) + assert challenge.obtain_challenge_input(metadata) is None + + with mock.patch( + "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate" + ) as mock_authenticate: + mock_authenticate.side_effect = pyu2f.errors.U2FError( + pyu2f.errors.U2FError.BAD_REQUEST + ) + with pytest.raises(pyu2f.errors.U2FError): + challenge.obtain_challenge_input(metadata) + + with mock.patch( + "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate" + ) as mock_authenticate: + mock_authenticate.side_effect = pyu2f.errors.NoDeviceFoundError() + assert challenge.obtain_challenge_input(metadata) is None + + with mock.patch( + "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate" + ) as mock_authenticate: + mock_authenticate.side_effect = pyu2f.errors.UnsupportedVersionException() + with pytest.raises(pyu2f.errors.UnsupportedVersionException): + challenge.obtain_challenge_input(metadata) + + with mock.patch.dict("sys.modules"): + sys.modules["pyu2f"] = None + with pytest.raises(exceptions.ReauthFailError) as excinfo: + challenge.obtain_challenge_input(metadata) + assert excinfo.match(r"pyu2f dependency is required") + + +@mock.patch("getpass.getpass", return_value="foo") +def test_password_challenge(getpass_mock): + challenge = challenges.PasswordChallenge() + + with mock.patch("getpass.getpass", return_value="foo"): + assert challenge.is_locally_eligible + assert challenge.name == "PASSWORD" + assert challenges.PasswordChallenge().obtain_challenge_input({}) == { + "credential": "foo" + } + + with mock.patch("getpass.getpass", return_value=None): + assert challenges.PasswordChallenge().obtain_challenge_input({}) == { + "credential": " " + } diff --git a/contrib/python/google-auth/py2/tests/oauth2/test_credentials.py b/contrib/python/google-auth/py2/tests/oauth2/test_credentials.py new file mode 100644 index 0000000000..5c21ebe547 --- /dev/null +++ b/contrib/python/google-auth/py2/tests/oauth2/test_credentials.py @@ -0,0 +1,876 @@ +# Copyright 2016 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import json +import os +import pickle +import sys + +import mock +import pytest + +from google.auth import _helpers +from google.auth import exceptions +from google.auth import transport +from google.oauth2 import credentials + + +import yatest.common +DATA_DIR = os.path.join(yatest.common.test_source_path(), "data") + +AUTH_USER_JSON_FILE = os.path.join(DATA_DIR, "authorized_user.json") + +with open(AUTH_USER_JSON_FILE, "r") as fh: + AUTH_USER_INFO = json.load(fh) + + +class TestCredentials(object): + TOKEN_URI = "https://example.com/oauth2/token" + REFRESH_TOKEN = "refresh_token" + RAPT_TOKEN = "rapt_token" + CLIENT_ID = "client_id" + CLIENT_SECRET = "client_secret" + + @classmethod + def make_credentials(cls): + return credentials.Credentials( + token=None, + refresh_token=cls.REFRESH_TOKEN, + token_uri=cls.TOKEN_URI, + client_id=cls.CLIENT_ID, + client_secret=cls.CLIENT_SECRET, + rapt_token=cls.RAPT_TOKEN, + ) + + def test_default_state(self): + credentials = self.make_credentials() + assert not credentials.valid + # Expiration hasn't been set yet + assert not credentials.expired + # Scopes aren't required for these credentials + assert not credentials.requires_scopes + # Test properties + assert credentials.refresh_token == self.REFRESH_TOKEN + assert credentials.token_uri == self.TOKEN_URI + assert credentials.client_id == self.CLIENT_ID + assert credentials.client_secret == self.CLIENT_SECRET + assert credentials.rapt_token == self.RAPT_TOKEN + assert credentials.refresh_handler is None + + def test_refresh_handler_setter_and_getter(self): + scopes = ["email", "profile"] + original_refresh_handler = mock.Mock(return_value=("ACCESS_TOKEN_1", None)) + updated_refresh_handler = mock.Mock(return_value=("ACCESS_TOKEN_2", None)) + creds = credentials.Credentials( + token=None, + refresh_token=None, + token_uri=None, + client_id=None, + client_secret=None, + rapt_token=None, + scopes=scopes, + default_scopes=None, + refresh_handler=original_refresh_handler, + ) + + assert creds.refresh_handler is original_refresh_handler + + creds.refresh_handler = updated_refresh_handler + + assert creds.refresh_handler is updated_refresh_handler + + creds.refresh_handler = None + + assert creds.refresh_handler is None + + def test_invalid_refresh_handler(self): + scopes = ["email", "profile"] + with pytest.raises(TypeError) as excinfo: + credentials.Credentials( + token=None, + refresh_token=None, + token_uri=None, + client_id=None, + client_secret=None, + rapt_token=None, + scopes=scopes, + default_scopes=None, + refresh_handler=object(), + ) + + assert excinfo.match("The provided refresh_handler is not a callable or None.") + + @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True) + @mock.patch( + "google.auth._helpers.utcnow", + return_value=datetime.datetime.min + _helpers.CLOCK_SKEW, + ) + def test_refresh_success(self, unused_utcnow, refresh_grant): + token = "token" + new_rapt_token = "new_rapt_token" + expiry = _helpers.utcnow() + datetime.timedelta(seconds=500) + grant_response = {"id_token": mock.sentinel.id_token} + refresh_grant.return_value = ( + # Access token + token, + # New refresh token + None, + # Expiry, + expiry, + # Extra data + grant_response, + # rapt_token + new_rapt_token, + ) + + request = mock.create_autospec(transport.Request) + credentials = self.make_credentials() + + # Refresh credentials + credentials.refresh(request) + + # Check jwt grant call. + refresh_grant.assert_called_with( + request, + self.TOKEN_URI, + self.REFRESH_TOKEN, + self.CLIENT_ID, + self.CLIENT_SECRET, + None, + self.RAPT_TOKEN, + ) + + # Check that the credentials have the token and expiry + assert credentials.token == token + assert credentials.expiry == expiry + assert credentials.id_token == mock.sentinel.id_token + assert credentials.rapt_token == new_rapt_token + + # Check that the credentials are valid (have a token and are not + # expired) + assert credentials.valid + + def test_refresh_no_refresh_token(self): + request = mock.create_autospec(transport.Request) + credentials_ = credentials.Credentials(token=None, refresh_token=None) + + with pytest.raises(exceptions.RefreshError, match="necessary fields"): + credentials_.refresh(request) + + request.assert_not_called() + + @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True) + @mock.patch( + "google.auth._helpers.utcnow", + return_value=datetime.datetime.min + _helpers.CLOCK_SKEW, + ) + def test_refresh_with_refresh_token_and_refresh_handler( + self, unused_utcnow, refresh_grant + ): + token = "token" + new_rapt_token = "new_rapt_token" + expiry = _helpers.utcnow() + datetime.timedelta(seconds=500) + grant_response = {"id_token": mock.sentinel.id_token} + refresh_grant.return_value = ( + # Access token + token, + # New refresh token + None, + # Expiry, + expiry, + # Extra data + grant_response, + # rapt_token + new_rapt_token, + ) + + refresh_handler = mock.Mock() + request = mock.create_autospec(transport.Request) + creds = credentials.Credentials( + token=None, + refresh_token=self.REFRESH_TOKEN, + token_uri=self.TOKEN_URI, + client_id=self.CLIENT_ID, + client_secret=self.CLIENT_SECRET, + rapt_token=self.RAPT_TOKEN, + refresh_handler=refresh_handler, + ) + + # Refresh credentials + creds.refresh(request) + + # Check jwt grant call. + refresh_grant.assert_called_with( + request, + self.TOKEN_URI, + self.REFRESH_TOKEN, + self.CLIENT_ID, + self.CLIENT_SECRET, + None, + self.RAPT_TOKEN, + ) + + # Check that the credentials have the token and expiry + assert creds.token == token + assert creds.expiry == expiry + assert creds.id_token == mock.sentinel.id_token + assert creds.rapt_token == new_rapt_token + + # Check that the credentials are valid (have a token and are not + # expired) + assert creds.valid + + # Assert refresh handler not called as the refresh token has + # higher priority. + refresh_handler.assert_not_called() + + @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) + def test_refresh_with_refresh_handler_success_scopes(self, unused_utcnow): + expected_expiry = datetime.datetime.min + datetime.timedelta(seconds=2800) + refresh_handler = mock.Mock(return_value=("ACCESS_TOKEN", expected_expiry)) + scopes = ["email", "profile"] + default_scopes = ["https://www.googleapis.com/auth/cloud-platform"] + request = mock.create_autospec(transport.Request) + creds = credentials.Credentials( + token=None, + refresh_token=None, + token_uri=None, + client_id=None, + client_secret=None, + rapt_token=None, + scopes=scopes, + default_scopes=default_scopes, + refresh_handler=refresh_handler, + ) + + creds.refresh(request) + + assert creds.token == "ACCESS_TOKEN" + assert creds.expiry == expected_expiry + assert creds.valid + assert not creds.expired + # Confirm refresh handler called with the expected arguments. + refresh_handler.assert_called_with(request, scopes=scopes) + + @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) + def test_refresh_with_refresh_handler_success_default_scopes(self, unused_utcnow): + expected_expiry = datetime.datetime.min + datetime.timedelta(seconds=2800) + original_refresh_handler = mock.Mock( + return_value=("UNUSED_TOKEN", expected_expiry) + ) + refresh_handler = mock.Mock(return_value=("ACCESS_TOKEN", expected_expiry)) + default_scopes = ["https://www.googleapis.com/auth/cloud-platform"] + request = mock.create_autospec(transport.Request) + creds = credentials.Credentials( + token=None, + refresh_token=None, + token_uri=None, + client_id=None, + client_secret=None, + rapt_token=None, + scopes=None, + default_scopes=default_scopes, + refresh_handler=original_refresh_handler, + ) + + # Test newly set refresh_handler is used instead of the original one. + creds.refresh_handler = refresh_handler + creds.refresh(request) + + assert creds.token == "ACCESS_TOKEN" + assert creds.expiry == expected_expiry + assert creds.valid + assert not creds.expired + # default_scopes should be used since no developer provided scopes + # are provided. + refresh_handler.assert_called_with(request, scopes=default_scopes) + + @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) + def test_refresh_with_refresh_handler_invalid_token(self, unused_utcnow): + expected_expiry = datetime.datetime.min + datetime.timedelta(seconds=2800) + # Simulate refresh handler does not return a valid token. + refresh_handler = mock.Mock(return_value=(None, expected_expiry)) + scopes = ["email", "profile"] + default_scopes = ["https://www.googleapis.com/auth/cloud-platform"] + request = mock.create_autospec(transport.Request) + creds = credentials.Credentials( + token=None, + refresh_token=None, + token_uri=None, + client_id=None, + client_secret=None, + rapt_token=None, + scopes=scopes, + default_scopes=default_scopes, + refresh_handler=refresh_handler, + ) + + with pytest.raises( + exceptions.RefreshError, match="returned token is not a string" + ): + creds.refresh(request) + + assert creds.token is None + assert creds.expiry is None + assert not creds.valid + # Confirm refresh handler called with the expected arguments. + refresh_handler.assert_called_with(request, scopes=scopes) + + def test_refresh_with_refresh_handler_invalid_expiry(self): + # Simulate refresh handler returns expiration time in an invalid unit. + refresh_handler = mock.Mock(return_value=("TOKEN", 2800)) + scopes = ["email", "profile"] + default_scopes = ["https://www.googleapis.com/auth/cloud-platform"] + request = mock.create_autospec(transport.Request) + creds = credentials.Credentials( + token=None, + refresh_token=None, + token_uri=None, + client_id=None, + client_secret=None, + rapt_token=None, + scopes=scopes, + default_scopes=default_scopes, + refresh_handler=refresh_handler, + ) + + with pytest.raises( + exceptions.RefreshError, match="returned expiry is not a datetime object" + ): + creds.refresh(request) + + assert creds.token is None + assert creds.expiry is None + assert not creds.valid + # Confirm refresh handler called with the expected arguments. + refresh_handler.assert_called_with(request, scopes=scopes) + + @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) + def test_refresh_with_refresh_handler_expired_token(self, unused_utcnow): + expected_expiry = datetime.datetime.min + _helpers.CLOCK_SKEW + # Simulate refresh handler returns an expired token. + refresh_handler = mock.Mock(return_value=("TOKEN", expected_expiry)) + scopes = ["email", "profile"] + default_scopes = ["https://www.googleapis.com/auth/cloud-platform"] + request = mock.create_autospec(transport.Request) + creds = credentials.Credentials( + token=None, + refresh_token=None, + token_uri=None, + client_id=None, + client_secret=None, + rapt_token=None, + scopes=scopes, + default_scopes=default_scopes, + refresh_handler=refresh_handler, + ) + + with pytest.raises(exceptions.RefreshError, match="already expired"): + creds.refresh(request) + + assert creds.token is None + assert creds.expiry is None + assert not creds.valid + # Confirm refresh handler called with the expected arguments. + refresh_handler.assert_called_with(request, scopes=scopes) + + @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True) + @mock.patch( + "google.auth._helpers.utcnow", + return_value=datetime.datetime.min + _helpers.CLOCK_SKEW, + ) + def test_credentials_with_scopes_requested_refresh_success( + self, unused_utcnow, refresh_grant + ): + scopes = ["email", "profile"] + default_scopes = ["https://www.googleapis.com/auth/cloud-platform"] + token = "token" + new_rapt_token = "new_rapt_token" + expiry = _helpers.utcnow() + datetime.timedelta(seconds=500) + grant_response = {"id_token": mock.sentinel.id_token, "scope": "email profile"} + refresh_grant.return_value = ( + # Access token + token, + # New refresh token + None, + # Expiry, + expiry, + # Extra data + grant_response, + # rapt token + new_rapt_token, + ) + + request = mock.create_autospec(transport.Request) + creds = credentials.Credentials( + token=None, + refresh_token=self.REFRESH_TOKEN, + token_uri=self.TOKEN_URI, + client_id=self.CLIENT_ID, + client_secret=self.CLIENT_SECRET, + scopes=scopes, + default_scopes=default_scopes, + rapt_token=self.RAPT_TOKEN, + ) + + # Refresh credentials + creds.refresh(request) + + # Check jwt grant call. + refresh_grant.assert_called_with( + request, + self.TOKEN_URI, + self.REFRESH_TOKEN, + self.CLIENT_ID, + self.CLIENT_SECRET, + scopes, + self.RAPT_TOKEN, + ) + + # Check that the credentials have the token and expiry + assert creds.token == token + assert creds.expiry == expiry + assert creds.id_token == mock.sentinel.id_token + assert creds.has_scopes(scopes) + assert creds.rapt_token == new_rapt_token + + # Check that the credentials are valid (have a token and are not + # expired.) + assert creds.valid + + @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True) + @mock.patch( + "google.auth._helpers.utcnow", + return_value=datetime.datetime.min + _helpers.CLOCK_SKEW, + ) + def test_credentials_with_only_default_scopes_requested( + self, unused_utcnow, refresh_grant + ): + default_scopes = ["email", "profile"] + token = "token" + new_rapt_token = "new_rapt_token" + expiry = _helpers.utcnow() + datetime.timedelta(seconds=500) + grant_response = {"id_token": mock.sentinel.id_token} + refresh_grant.return_value = ( + # Access token + token, + # New refresh token + None, + # Expiry, + expiry, + # Extra data + grant_response, + # rapt token + new_rapt_token, + ) + + request = mock.create_autospec(transport.Request) + creds = credentials.Credentials( + token=None, + refresh_token=self.REFRESH_TOKEN, + token_uri=self.TOKEN_URI, + client_id=self.CLIENT_ID, + client_secret=self.CLIENT_SECRET, + default_scopes=default_scopes, + rapt_token=self.RAPT_TOKEN, + ) + + # Refresh credentials + creds.refresh(request) + + # Check jwt grant call. + refresh_grant.assert_called_with( + request, + self.TOKEN_URI, + self.REFRESH_TOKEN, + self.CLIENT_ID, + self.CLIENT_SECRET, + default_scopes, + self.RAPT_TOKEN, + ) + + # Check that the credentials have the token and expiry + assert creds.token == token + assert creds.expiry == expiry + assert creds.id_token == mock.sentinel.id_token + assert creds.has_scopes(default_scopes) + assert creds.rapt_token == new_rapt_token + + # Check that the credentials are valid (have a token and are not + # expired.) + assert creds.valid + + @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True) + @mock.patch( + "google.auth._helpers.utcnow", + return_value=datetime.datetime.min + _helpers.CLOCK_SKEW, + ) + def test_credentials_with_scopes_returned_refresh_success( + self, unused_utcnow, refresh_grant + ): + scopes = ["email", "profile"] + token = "token" + new_rapt_token = "new_rapt_token" + expiry = _helpers.utcnow() + datetime.timedelta(seconds=500) + grant_response = { + "id_token": mock.sentinel.id_token, + "scopes": " ".join(scopes), + } + refresh_grant.return_value = ( + # Access token + token, + # New refresh token + None, + # Expiry, + expiry, + # Extra data + grant_response, + # rapt token + new_rapt_token, + ) + + request = mock.create_autospec(transport.Request) + creds = credentials.Credentials( + token=None, + refresh_token=self.REFRESH_TOKEN, + token_uri=self.TOKEN_URI, + client_id=self.CLIENT_ID, + client_secret=self.CLIENT_SECRET, + scopes=scopes, + rapt_token=self.RAPT_TOKEN, + ) + + # Refresh credentials + creds.refresh(request) + + # Check jwt grant call. + refresh_grant.assert_called_with( + request, + self.TOKEN_URI, + self.REFRESH_TOKEN, + self.CLIENT_ID, + self.CLIENT_SECRET, + scopes, + self.RAPT_TOKEN, + ) + + # Check that the credentials have the token and expiry + assert creds.token == token + assert creds.expiry == expiry + assert creds.id_token == mock.sentinel.id_token + assert creds.has_scopes(scopes) + assert creds.rapt_token == new_rapt_token + + # Check that the credentials are valid (have a token and are not + # expired.) + assert creds.valid + + @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True) + @mock.patch( + "google.auth._helpers.utcnow", + return_value=datetime.datetime.min + _helpers.CLOCK_SKEW, + ) + def test_credentials_with_scopes_refresh_failure_raises_refresh_error( + self, unused_utcnow, refresh_grant + ): + scopes = ["email", "profile"] + scopes_returned = ["email"] + token = "token" + new_rapt_token = "new_rapt_token" + expiry = _helpers.utcnow() + datetime.timedelta(seconds=500) + grant_response = { + "id_token": mock.sentinel.id_token, + "scope": " ".join(scopes_returned), + } + refresh_grant.return_value = ( + # Access token + token, + # New refresh token + None, + # Expiry, + expiry, + # Extra data + grant_response, + # rapt token + new_rapt_token, + ) + + request = mock.create_autospec(transport.Request) + creds = credentials.Credentials( + token=None, + refresh_token=self.REFRESH_TOKEN, + token_uri=self.TOKEN_URI, + client_id=self.CLIENT_ID, + client_secret=self.CLIENT_SECRET, + scopes=scopes, + rapt_token=self.RAPT_TOKEN, + ) + + # Refresh credentials + with pytest.raises( + exceptions.RefreshError, match="Not all requested scopes were granted" + ): + creds.refresh(request) + + # Check jwt grant call. + refresh_grant.assert_called_with( + request, + self.TOKEN_URI, + self.REFRESH_TOKEN, + self.CLIENT_ID, + self.CLIENT_SECRET, + scopes, + self.RAPT_TOKEN, + ) + + # Check that the credentials have the token and expiry + assert creds.token == token + assert creds.expiry == expiry + assert creds.id_token == mock.sentinel.id_token + assert creds.has_scopes(scopes) + assert creds.rapt_token == new_rapt_token + + # Check that the credentials are valid (have a token and are not + # expired.) + assert creds.valid + + def test_apply_with_quota_project_id(self): + creds = credentials.Credentials( + token="token", + refresh_token=self.REFRESH_TOKEN, + token_uri=self.TOKEN_URI, + client_id=self.CLIENT_ID, + client_secret=self.CLIENT_SECRET, + quota_project_id="quota-project-123", + ) + + headers = {} + creds.apply(headers) + assert headers["x-goog-user-project"] == "quota-project-123" + assert "token" in headers["authorization"] + + def test_apply_with_no_quota_project_id(self): + creds = credentials.Credentials( + token="token", + refresh_token=self.REFRESH_TOKEN, + token_uri=self.TOKEN_URI, + client_id=self.CLIENT_ID, + client_secret=self.CLIENT_SECRET, + ) + + headers = {} + creds.apply(headers) + assert "x-goog-user-project" not in headers + assert "token" in headers["authorization"] + + def test_with_quota_project(self): + creds = credentials.Credentials( + token="token", + refresh_token=self.REFRESH_TOKEN, + token_uri=self.TOKEN_URI, + client_id=self.CLIENT_ID, + client_secret=self.CLIENT_SECRET, + quota_project_id="quota-project-123", + ) + + new_creds = creds.with_quota_project("new-project-456") + assert new_creds.quota_project_id == "new-project-456" + headers = {} + creds.apply(headers) + assert "x-goog-user-project" in headers + + def test_from_authorized_user_info(self): + info = AUTH_USER_INFO.copy() + + creds = credentials.Credentials.from_authorized_user_info(info) + assert creds.client_secret == info["client_secret"] + assert creds.client_id == info["client_id"] + assert creds.refresh_token == info["refresh_token"] + assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT + assert creds.scopes is None + + scopes = ["email", "profile"] + creds = credentials.Credentials.from_authorized_user_info(info, scopes) + assert creds.client_secret == info["client_secret"] + assert creds.client_id == info["client_id"] + assert creds.refresh_token == info["refresh_token"] + assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT + assert creds.scopes == scopes + + info["scopes"] = "email" # single non-array scope from file + creds = credentials.Credentials.from_authorized_user_info(info) + assert creds.scopes == [info["scopes"]] + + info["scopes"] = ["email", "profile"] # array scope from file + creds = credentials.Credentials.from_authorized_user_info(info) + assert creds.scopes == info["scopes"] + + expiry = datetime.datetime(2020, 8, 14, 15, 54, 1) + info["expiry"] = expiry.isoformat() + "Z" + creds = credentials.Credentials.from_authorized_user_info(info) + assert creds.expiry == expiry + assert creds.expired + + def test_from_authorized_user_file(self): + info = AUTH_USER_INFO.copy() + + creds = credentials.Credentials.from_authorized_user_file(AUTH_USER_JSON_FILE) + assert creds.client_secret == info["client_secret"] + assert creds.client_id == info["client_id"] + assert creds.refresh_token == info["refresh_token"] + assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT + assert creds.scopes is None + + scopes = ["email", "profile"] + creds = credentials.Credentials.from_authorized_user_file( + AUTH_USER_JSON_FILE, scopes + ) + assert creds.client_secret == info["client_secret"] + assert creds.client_id == info["client_id"] + assert creds.refresh_token == info["refresh_token"] + assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT + assert creds.scopes == scopes + + def test_to_json(self): + info = AUTH_USER_INFO.copy() + expiry = datetime.datetime(2020, 8, 14, 15, 54, 1) + info["expiry"] = expiry.isoformat() + "Z" + creds = credentials.Credentials.from_authorized_user_info(info) + assert creds.expiry == expiry + + # Test with no `strip` arg + json_output = creds.to_json() + json_asdict = json.loads(json_output) + assert json_asdict.get("token") == creds.token + assert json_asdict.get("refresh_token") == creds.refresh_token + assert json_asdict.get("token_uri") == creds.token_uri + assert json_asdict.get("client_id") == creds.client_id + assert json_asdict.get("scopes") == creds.scopes + assert json_asdict.get("client_secret") == creds.client_secret + assert json_asdict.get("expiry") == info["expiry"] + + # Test with a `strip` arg + json_output = creds.to_json(strip=["client_secret"]) + json_asdict = json.loads(json_output) + assert json_asdict.get("token") == creds.token + assert json_asdict.get("refresh_token") == creds.refresh_token + assert json_asdict.get("token_uri") == creds.token_uri + assert json_asdict.get("client_id") == creds.client_id + assert json_asdict.get("scopes") == creds.scopes + assert json_asdict.get("client_secret") is None + + # Test with no expiry + creds.expiry = None + json_output = creds.to_json() + json_asdict = json.loads(json_output) + assert json_asdict.get("expiry") is None + + def test_pickle_and_unpickle(self): + creds = self.make_credentials() + unpickled = pickle.loads(pickle.dumps(creds)) + + # make sure attributes aren't lost during pickling + assert list(creds.__dict__).sort() == list(unpickled.__dict__).sort() + + for attr in list(creds.__dict__): + assert getattr(creds, attr) == getattr(unpickled, attr) + + def test_pickle_and_unpickle_with_refresh_handler(self): + expected_expiry = _helpers.utcnow() + datetime.timedelta(seconds=2800) + refresh_handler = mock.Mock(return_value=("TOKEN", expected_expiry)) + + creds = credentials.Credentials( + token=None, + refresh_token=None, + token_uri=None, + client_id=None, + client_secret=None, + rapt_token=None, + refresh_handler=refresh_handler, + ) + unpickled = pickle.loads(pickle.dumps(creds)) + + # make sure attributes aren't lost during pickling + assert list(creds.__dict__).sort() == list(unpickled.__dict__).sort() + + for attr in list(creds.__dict__): + # For the _refresh_handler property, the unpickled creds should be + # set to None. + if attr == "_refresh_handler": + assert getattr(unpickled, attr) is None + else: + assert getattr(creds, attr) == getattr(unpickled, attr) + + def test_pickle_with_missing_attribute(self): + creds = self.make_credentials() + + # remove an optional attribute before pickling + # this mimics a pickle created with a previous class definition with + # fewer attributes + del creds.__dict__["_quota_project_id"] + + unpickled = pickle.loads(pickle.dumps(creds)) + + # Attribute should be initialized by `__setstate__` + assert unpickled.quota_project_id is None + + # pickles are not compatible across versions + @pytest.mark.skipif( + sys.version_info < (3, 5), + reason="pickle file can only be loaded with Python >= 3.5", + ) + def test_unpickle_old_credentials_pickle(self): + # make sure a credentials file pickled with an older + # library version (google-auth==1.5.1) can be unpickled + with open( + os.path.join(DATA_DIR, "old_oauth_credentials_py3.pickle"), "rb" + ) as f: + credentials = pickle.load(f) + assert credentials.quota_project_id is None + + +class TestUserAccessTokenCredentials(object): + def test_instance(self): + cred = credentials.UserAccessTokenCredentials() + assert cred._account is None + + cred = cred.with_account("account") + assert cred._account == "account" + + @mock.patch("google.auth._cloud_sdk.get_auth_access_token", autospec=True) + def test_refresh(self, get_auth_access_token): + get_auth_access_token.return_value = "access_token" + cred = credentials.UserAccessTokenCredentials() + cred.refresh(None) + assert cred.token == "access_token" + + def test_with_quota_project(self): + cred = credentials.UserAccessTokenCredentials() + quota_project_cred = cred.with_quota_project("project-foo") + + assert quota_project_cred._quota_project_id == "project-foo" + assert quota_project_cred._account == cred._account + + @mock.patch( + "google.oauth2.credentials.UserAccessTokenCredentials.apply", autospec=True + ) + @mock.patch( + "google.oauth2.credentials.UserAccessTokenCredentials.refresh", autospec=True + ) + def test_before_request(self, refresh, apply): + cred = credentials.UserAccessTokenCredentials() + cred.before_request(mock.Mock(), "GET", "https://example.com", {}) + refresh.assert_called() + apply.assert_called() diff --git a/contrib/python/google-auth/py2/tests/oauth2/test_id_token.py b/contrib/python/google-auth/py2/tests/oauth2/test_id_token.py new file mode 100644 index 0000000000..9576f2562a --- /dev/null +++ b/contrib/python/google-auth/py2/tests/oauth2/test_id_token.py @@ -0,0 +1,228 @@ +# Copyright 2014 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os + +import mock +import pytest + +from google.auth import environment_vars +from google.auth import exceptions +from google.auth import transport +import google.auth.compute_engine._metadata +from google.oauth2 import id_token +from google.oauth2 import service_account + +import yatest.common +SERVICE_ACCOUNT_FILE = os.path.join( + yatest.common.test_source_path(), "data/service_account.json" +) + + +def make_request(status, data=None): + response = mock.create_autospec(transport.Response, instance=True) + response.status = status + + if data is not None: + response.data = json.dumps(data).encode("utf-8") + + request = mock.create_autospec(transport.Request) + request.return_value = response + return request + + +def test__fetch_certs_success(): + certs = {"1": "cert"} + request = make_request(200, certs) + + returned_certs = id_token._fetch_certs(request, mock.sentinel.cert_url) + + request.assert_called_once_with(mock.sentinel.cert_url, method="GET") + assert returned_certs == certs + + +def test__fetch_certs_failure(): + request = make_request(404) + + with pytest.raises(exceptions.TransportError): + id_token._fetch_certs(request, mock.sentinel.cert_url) + + request.assert_called_once_with(mock.sentinel.cert_url, method="GET") + + +@mock.patch("google.auth.jwt.decode", autospec=True) +@mock.patch("google.oauth2.id_token._fetch_certs", autospec=True) +def test_verify_token(_fetch_certs, decode): + result = id_token.verify_token(mock.sentinel.token, mock.sentinel.request) + + assert result == decode.return_value + _fetch_certs.assert_called_once_with( + mock.sentinel.request, id_token._GOOGLE_OAUTH2_CERTS_URL + ) + decode.assert_called_once_with( + mock.sentinel.token, certs=_fetch_certs.return_value, audience=None + ) + + +@mock.patch("google.auth.jwt.decode", autospec=True) +@mock.patch("google.oauth2.id_token._fetch_certs", autospec=True) +def test_verify_token_args(_fetch_certs, decode): + result = id_token.verify_token( + mock.sentinel.token, + mock.sentinel.request, + audience=mock.sentinel.audience, + certs_url=mock.sentinel.certs_url, + ) + + assert result == decode.return_value + _fetch_certs.assert_called_once_with(mock.sentinel.request, mock.sentinel.certs_url) + decode.assert_called_once_with( + mock.sentinel.token, + certs=_fetch_certs.return_value, + audience=mock.sentinel.audience, + ) + + +@mock.patch("google.oauth2.id_token.verify_token", autospec=True) +def test_verify_oauth2_token(verify_token): + verify_token.return_value = {"iss": "accounts.google.com"} + result = id_token.verify_oauth2_token( + mock.sentinel.token, mock.sentinel.request, audience=mock.sentinel.audience + ) + + assert result == verify_token.return_value + verify_token.assert_called_once_with( + mock.sentinel.token, + mock.sentinel.request, + audience=mock.sentinel.audience, + certs_url=id_token._GOOGLE_OAUTH2_CERTS_URL, + ) + + +@mock.patch("google.oauth2.id_token.verify_token", autospec=True) +def test_verify_oauth2_token_invalid_iss(verify_token): + verify_token.return_value = {"iss": "invalid_issuer"} + + with pytest.raises(exceptions.GoogleAuthError): + id_token.verify_oauth2_token( + mock.sentinel.token, mock.sentinel.request, audience=mock.sentinel.audience + ) + + +@mock.patch("google.oauth2.id_token.verify_token", autospec=True) +def test_verify_firebase_token(verify_token): + result = id_token.verify_firebase_token( + mock.sentinel.token, mock.sentinel.request, audience=mock.sentinel.audience + ) + + assert result == verify_token.return_value + verify_token.assert_called_once_with( + mock.sentinel.token, + mock.sentinel.request, + audience=mock.sentinel.audience, + certs_url=id_token._GOOGLE_APIS_CERTS_URL, + ) + + +def test_fetch_id_token_from_metadata_server(monkeypatch): + monkeypatch.delenv(environment_vars.CREDENTIALS, raising=False) + + def mock_init(self, request, audience, use_metadata_identity_endpoint): + assert use_metadata_identity_endpoint + self.token = "id_token" + + with mock.patch("google.auth.compute_engine._metadata.ping", return_value=True): + with mock.patch.multiple( + google.auth.compute_engine.IDTokenCredentials, + __init__=mock_init, + refresh=mock.Mock(), + ): + request = mock.Mock() + token = id_token.fetch_id_token(request, "https://pubsub.googleapis.com") + assert token == "id_token" + + +def test_fetch_id_token_from_explicit_cred_json_file(monkeypatch): + monkeypatch.setenv(environment_vars.CREDENTIALS, SERVICE_ACCOUNT_FILE) + + def mock_refresh(self, request): + self.token = "id_token" + + with mock.patch.object(service_account.IDTokenCredentials, "refresh", mock_refresh): + request = mock.Mock() + token = id_token.fetch_id_token(request, "https://pubsub.googleapis.com") + assert token == "id_token" + + +def test_fetch_id_token_no_cred_exists(monkeypatch): + monkeypatch.delenv(environment_vars.CREDENTIALS, raising=False) + + with mock.patch( + "google.auth.compute_engine._metadata.ping", + side_effect=exceptions.TransportError(), + ): + with pytest.raises(exceptions.DefaultCredentialsError) as excinfo: + request = mock.Mock() + id_token.fetch_id_token(request, "https://pubsub.googleapis.com") + assert excinfo.match( + r"Neither metadata server or valid service account credentials are found." + ) + + with mock.patch("google.auth.compute_engine._metadata.ping", return_value=False): + with pytest.raises(exceptions.DefaultCredentialsError) as excinfo: + request = mock.Mock() + id_token.fetch_id_token(request, "https://pubsub.googleapis.com") + assert excinfo.match( + r"Neither metadata server or valid service account credentials are found." + ) + + +def test_fetch_id_token_invalid_cred_file_type(monkeypatch): + user_credentials_file = os.path.join( + yatest.common.test_source_path(), "data/authorized_user.json" + ) + monkeypatch.setenv(environment_vars.CREDENTIALS, user_credentials_file) + + with mock.patch("google.auth.compute_engine._metadata.ping", return_value=False): + with pytest.raises(exceptions.DefaultCredentialsError) as excinfo: + request = mock.Mock() + id_token.fetch_id_token(request, "https://pubsub.googleapis.com") + assert excinfo.match( + r"Neither metadata server or valid service account credentials are found." + ) + + +def test_fetch_id_token_invalid_json(monkeypatch): + not_json_file = os.path.join(yatest.common.test_source_path(), "data/public_cert.pem") + monkeypatch.setenv(environment_vars.CREDENTIALS, not_json_file) + + with pytest.raises(exceptions.DefaultCredentialsError) as excinfo: + request = mock.Mock() + id_token.fetch_id_token(request, "https://pubsub.googleapis.com") + assert excinfo.match( + r"GOOGLE_APPLICATION_CREDENTIALS is not valid service account credentials." + ) + + +def test_fetch_id_token_invalid_cred_path(monkeypatch): + not_json_file = os.path.join(yatest.common.test_source_path(), "data/not_exists.json") + monkeypatch.setenv(environment_vars.CREDENTIALS, not_json_file) + + with pytest.raises(exceptions.DefaultCredentialsError) as excinfo: + request = mock.Mock() + id_token.fetch_id_token(request, "https://pubsub.googleapis.com") + assert excinfo.match( + r"GOOGLE_APPLICATION_CREDENTIALS path is either not found or invalid." + ) diff --git a/contrib/python/google-auth/py2/tests/oauth2/test_reauth.py b/contrib/python/google-auth/py2/tests/oauth2/test_reauth.py new file mode 100644 index 0000000000..7876986873 --- /dev/null +++ b/contrib/python/google-auth/py2/tests/oauth2/test_reauth.py @@ -0,0 +1,308 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy + +import mock +import pytest + +from google.auth import exceptions +from google.oauth2 import reauth + + +MOCK_REQUEST = mock.Mock() +CHALLENGES_RESPONSE_TEMPLATE = { + "status": "CHALLENGE_REQUIRED", + "sessionId": "123", + "challenges": [ + { + "status": "READY", + "challengeId": 1, + "challengeType": "PASSWORD", + "securityKey": {}, + } + ], +} +CHALLENGES_RESPONSE_AUTHENTICATED = { + "status": "AUTHENTICATED", + "sessionId": "123", + "encodedProofOfReauthToken": "new_rapt_token", +} + + +class MockChallenge(object): + def __init__(self, name, locally_eligible, challenge_input): + self.name = name + self.is_locally_eligible = locally_eligible + self.challenge_input = challenge_input + + def obtain_challenge_input(self, metadata): + return self.challenge_input + + +def _test_is_interactive(): + with mock.patch("sys.stdin.isatty", return_value=True): + assert reauth.is_interactive() + + +def test__get_challenges(): + with mock.patch( + "google.oauth2._client._token_endpoint_request" + ) as mock_token_endpoint_request: + reauth._get_challenges(MOCK_REQUEST, ["SAML"], "token") + mock_token_endpoint_request.assert_called_with( + MOCK_REQUEST, + reauth._REAUTH_API + ":start", + {"supportedChallengeTypes": ["SAML"]}, + access_token="token", + use_json=True, + ) + + +def test__get_challenges_with_scopes(): + with mock.patch( + "google.oauth2._client._token_endpoint_request" + ) as mock_token_endpoint_request: + reauth._get_challenges( + MOCK_REQUEST, ["SAML"], "token", requested_scopes=["scope"] + ) + mock_token_endpoint_request.assert_called_with( + MOCK_REQUEST, + reauth._REAUTH_API + ":start", + { + "supportedChallengeTypes": ["SAML"], + "oauthScopesForDomainPolicyLookup": ["scope"], + }, + access_token="token", + use_json=True, + ) + + +def test__send_challenge_result(): + with mock.patch( + "google.oauth2._client._token_endpoint_request" + ) as mock_token_endpoint_request: + reauth._send_challenge_result( + MOCK_REQUEST, "123", "1", {"credential": "password"}, "token" + ) + mock_token_endpoint_request.assert_called_with( + MOCK_REQUEST, + reauth._REAUTH_API + "/123:continue", + { + "sessionId": "123", + "challengeId": "1", + "action": "RESPOND", + "proposalResponse": {"credential": "password"}, + }, + access_token="token", + use_json=True, + ) + + +def test__run_next_challenge_not_ready(): + challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE) + challenges_response["challenges"][0]["status"] = "STATUS_UNSPECIFIED" + assert ( + reauth._run_next_challenge(challenges_response, MOCK_REQUEST, "token") is None + ) + + +def test__run_next_challenge_not_supported(): + challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE) + challenges_response["challenges"][0]["challengeType"] = "CHALLENGE_TYPE_UNSPECIFIED" + with pytest.raises(exceptions.ReauthFailError) as excinfo: + reauth._run_next_challenge(challenges_response, MOCK_REQUEST, "token") + assert excinfo.match(r"Unsupported challenge type CHALLENGE_TYPE_UNSPECIFIED") + + +def test__run_next_challenge_not_locally_eligible(): + mock_challenge = MockChallenge("PASSWORD", False, "challenge_input") + with mock.patch( + "google.oauth2.challenges.AVAILABLE_CHALLENGES", {"PASSWORD": mock_challenge} + ): + with pytest.raises(exceptions.ReauthFailError) as excinfo: + reauth._run_next_challenge( + CHALLENGES_RESPONSE_TEMPLATE, MOCK_REQUEST, "token" + ) + assert excinfo.match(r"Challenge PASSWORD is not locally eligible") + + +def test__run_next_challenge_no_challenge_input(): + mock_challenge = MockChallenge("PASSWORD", True, None) + with mock.patch( + "google.oauth2.challenges.AVAILABLE_CHALLENGES", {"PASSWORD": mock_challenge} + ): + assert ( + reauth._run_next_challenge( + CHALLENGES_RESPONSE_TEMPLATE, MOCK_REQUEST, "token" + ) + is None + ) + + +def test__run_next_challenge_success(): + mock_challenge = MockChallenge("PASSWORD", True, {"credential": "password"}) + with mock.patch( + "google.oauth2.challenges.AVAILABLE_CHALLENGES", {"PASSWORD": mock_challenge} + ): + with mock.patch( + "google.oauth2.reauth._send_challenge_result" + ) as mock_send_challenge_result: + reauth._run_next_challenge( + CHALLENGES_RESPONSE_TEMPLATE, MOCK_REQUEST, "token" + ) + mock_send_challenge_result.assert_called_with( + MOCK_REQUEST, "123", 1, {"credential": "password"}, "token" + ) + + +def test__obtain_rapt_authenticated(): + with mock.patch( + "google.oauth2.reauth._get_challenges", + return_value=CHALLENGES_RESPONSE_AUTHENTICATED, + ): + assert reauth._obtain_rapt(MOCK_REQUEST, "token", None) == "new_rapt_token" + + +def test__obtain_rapt_authenticated_after_run_next_challenge(): + with mock.patch( + "google.oauth2.reauth._get_challenges", + return_value=CHALLENGES_RESPONSE_TEMPLATE, + ): + with mock.patch( + "google.oauth2.reauth._run_next_challenge", + side_effect=[ + CHALLENGES_RESPONSE_TEMPLATE, + CHALLENGES_RESPONSE_AUTHENTICATED, + ], + ): + with mock.patch("google.oauth2.reauth.is_interactive", return_value=True): + assert ( + reauth._obtain_rapt(MOCK_REQUEST, "token", None) == "new_rapt_token" + ) + + +def test__obtain_rapt_unsupported_status(): + challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE) + challenges_response["status"] = "STATUS_UNSPECIFIED" + with mock.patch( + "google.oauth2.reauth._get_challenges", return_value=challenges_response + ): + with pytest.raises(exceptions.ReauthFailError) as excinfo: + reauth._obtain_rapt(MOCK_REQUEST, "token", None) + assert excinfo.match(r"API error: STATUS_UNSPECIFIED") + + +def test__obtain_rapt_not_interactive(): + with mock.patch( + "google.oauth2.reauth._get_challenges", + return_value=CHALLENGES_RESPONSE_TEMPLATE, + ): + with mock.patch("google.oauth2.reauth.is_interactive", return_value=False): + with pytest.raises(exceptions.ReauthFailError) as excinfo: + reauth._obtain_rapt(MOCK_REQUEST, "token", None) + assert excinfo.match(r"not in an interactive session") + + +def test__obtain_rapt_not_authenticated(): + with mock.patch( + "google.oauth2.reauth._get_challenges", + return_value=CHALLENGES_RESPONSE_TEMPLATE, + ): + with mock.patch("google.oauth2.reauth.RUN_CHALLENGE_RETRY_LIMIT", 0): + with pytest.raises(exceptions.ReauthFailError) as excinfo: + reauth._obtain_rapt(MOCK_REQUEST, "token", None) + assert excinfo.match(r"Reauthentication failed") + + +def test_get_rapt_token(): + with mock.patch( + "google.oauth2._client.refresh_grant", return_value=("token", None, None, None) + ) as mock_refresh_grant: + with mock.patch( + "google.oauth2.reauth._obtain_rapt", return_value="new_rapt_token" + ) as mock_obtain_rapt: + assert ( + reauth.get_rapt_token( + MOCK_REQUEST, + "client_id", + "client_secret", + "refresh_token", + "token_uri", + ) + == "new_rapt_token" + ) + mock_refresh_grant.assert_called_with( + request=MOCK_REQUEST, + client_id="client_id", + client_secret="client_secret", + refresh_token="refresh_token", + token_uri="token_uri", + scopes=[reauth._REAUTH_SCOPE], + ) + mock_obtain_rapt.assert_called_with( + MOCK_REQUEST, "token", requested_scopes=None + ) + + +def test_refresh_grant_failed(): + with mock.patch( + "google.oauth2._client._token_endpoint_request_no_throw" + ) as mock_token_request: + mock_token_request.return_value = (False, {"error": "Bad request"}) + with pytest.raises(exceptions.RefreshError) as excinfo: + reauth.refresh_grant( + MOCK_REQUEST, + "token_uri", + "refresh_token", + "client_id", + "client_secret", + scopes=["foo", "bar"], + rapt_token="rapt_token", + ) + assert excinfo.match(r"Bad request") + mock_token_request.assert_called_with( + MOCK_REQUEST, + "token_uri", + { + "grant_type": "refresh_token", + "client_id": "client_id", + "client_secret": "client_secret", + "refresh_token": "refresh_token", + "scope": "foo bar", + "rapt": "rapt_token", + }, + ) + + +def test_refresh_grant_success(): + with mock.patch( + "google.oauth2._client._token_endpoint_request_no_throw" + ) as mock_token_request: + mock_token_request.side_effect = [ + (False, {"error": "invalid_grant", "error_subtype": "rapt_required"}), + (True, {"access_token": "access_token"}), + ] + with mock.patch( + "google.oauth2.reauth.get_rapt_token", return_value="new_rapt_token" + ): + assert reauth.refresh_grant( + MOCK_REQUEST, "token_uri", "refresh_token", "client_id", "client_secret" + ) == ( + "access_token", + "refresh_token", + None, + {"access_token": "access_token"}, + "new_rapt_token", + ) diff --git a/contrib/python/google-auth/py2/tests/oauth2/test_service_account.py b/contrib/python/google-auth/py2/tests/oauth2/test_service_account.py new file mode 100644 index 0000000000..a5d59dd713 --- /dev/null +++ b/contrib/python/google-auth/py2/tests/oauth2/test_service_account.py @@ -0,0 +1,433 @@ +# Copyright 2016 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import json +import os + +import mock + +from google.auth import _helpers +from google.auth import crypt +from google.auth import jwt +from google.auth import transport +from google.oauth2 import service_account + + +import yatest.common +DATA_DIR = os.path.join(yatest.common.test_source_path(), "data") + +with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh: + PRIVATE_KEY_BYTES = fh.read() + +with open(os.path.join(DATA_DIR, "public_cert.pem"), "rb") as fh: + PUBLIC_CERT_BYTES = fh.read() + +with open(os.path.join(DATA_DIR, "other_cert.pem"), "rb") as fh: + OTHER_CERT_BYTES = fh.read() + +SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json") + +with open(SERVICE_ACCOUNT_JSON_FILE, "r") as fh: + SERVICE_ACCOUNT_INFO = json.load(fh) + +SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1") + + +class TestCredentials(object): + SERVICE_ACCOUNT_EMAIL = "service-account@example.com" + TOKEN_URI = "https://example.com/oauth2/token" + + @classmethod + def make_credentials(cls): + return service_account.Credentials( + SIGNER, cls.SERVICE_ACCOUNT_EMAIL, cls.TOKEN_URI + ) + + def test_from_service_account_info(self): + credentials = service_account.Credentials.from_service_account_info( + SERVICE_ACCOUNT_INFO + ) + + assert credentials._signer.key_id == SERVICE_ACCOUNT_INFO["private_key_id"] + assert credentials.service_account_email == SERVICE_ACCOUNT_INFO["client_email"] + assert credentials._token_uri == SERVICE_ACCOUNT_INFO["token_uri"] + + def test_from_service_account_info_args(self): + info = SERVICE_ACCOUNT_INFO.copy() + scopes = ["email", "profile"] + subject = "subject" + additional_claims = {"meta": "data"} + + credentials = service_account.Credentials.from_service_account_info( + info, scopes=scopes, subject=subject, additional_claims=additional_claims + ) + + assert credentials.service_account_email == info["client_email"] + assert credentials.project_id == info["project_id"] + assert credentials._signer.key_id == info["private_key_id"] + assert credentials._token_uri == info["token_uri"] + assert credentials._scopes == scopes + assert credentials._subject == subject + assert credentials._additional_claims == additional_claims + + def test_from_service_account_file(self): + info = SERVICE_ACCOUNT_INFO.copy() + + credentials = service_account.Credentials.from_service_account_file( + SERVICE_ACCOUNT_JSON_FILE + ) + + assert credentials.service_account_email == info["client_email"] + assert credentials.project_id == info["project_id"] + assert credentials._signer.key_id == info["private_key_id"] + assert credentials._token_uri == info["token_uri"] + + def test_from_service_account_file_args(self): + info = SERVICE_ACCOUNT_INFO.copy() + scopes = ["email", "profile"] + subject = "subject" + additional_claims = {"meta": "data"} + + credentials = service_account.Credentials.from_service_account_file( + SERVICE_ACCOUNT_JSON_FILE, + subject=subject, + scopes=scopes, + additional_claims=additional_claims, + ) + + assert credentials.service_account_email == info["client_email"] + assert credentials.project_id == info["project_id"] + assert credentials._signer.key_id == info["private_key_id"] + assert credentials._token_uri == info["token_uri"] + assert credentials._scopes == scopes + assert credentials._subject == subject + assert credentials._additional_claims == additional_claims + + def test_default_state(self): + credentials = self.make_credentials() + assert not credentials.valid + # Expiration hasn't been set yet + assert not credentials.expired + # Scopes haven't been specified yet + assert credentials.requires_scopes + + def test_sign_bytes(self): + credentials = self.make_credentials() + to_sign = b"123" + signature = credentials.sign_bytes(to_sign) + assert crypt.verify_signature(to_sign, signature, PUBLIC_CERT_BYTES) + + def test_signer(self): + credentials = self.make_credentials() + assert isinstance(credentials.signer, crypt.Signer) + + def test_signer_email(self): + credentials = self.make_credentials() + assert credentials.signer_email == self.SERVICE_ACCOUNT_EMAIL + + def test_create_scoped(self): + credentials = self.make_credentials() + scopes = ["email", "profile"] + credentials = credentials.with_scopes(scopes) + assert credentials._scopes == scopes + + def test_with_claims(self): + credentials = self.make_credentials() + new_credentials = credentials.with_claims({"meep": "moop"}) + assert new_credentials._additional_claims == {"meep": "moop"} + + def test_with_quota_project(self): + credentials = self.make_credentials() + new_credentials = credentials.with_quota_project("new-project-456") + assert new_credentials.quota_project_id == "new-project-456" + hdrs = {} + new_credentials.apply(hdrs, token="tok") + assert "x-goog-user-project" in hdrs + + def test__make_authorization_grant_assertion(self): + credentials = self.make_credentials() + token = credentials._make_authorization_grant_assertion() + payload = jwt.decode(token, PUBLIC_CERT_BYTES) + assert payload["iss"] == self.SERVICE_ACCOUNT_EMAIL + assert payload["aud"] == service_account._GOOGLE_OAUTH2_TOKEN_ENDPOINT + + def test__make_authorization_grant_assertion_scoped(self): + credentials = self.make_credentials() + scopes = ["email", "profile"] + credentials = credentials.with_scopes(scopes) + token = credentials._make_authorization_grant_assertion() + payload = jwt.decode(token, PUBLIC_CERT_BYTES) + assert payload["scope"] == "email profile" + + def test__make_authorization_grant_assertion_subject(self): + credentials = self.make_credentials() + subject = "user@example.com" + credentials = credentials.with_subject(subject) + token = credentials._make_authorization_grant_assertion() + payload = jwt.decode(token, PUBLIC_CERT_BYTES) + assert payload["sub"] == subject + + def test_apply_with_quota_project_id(self): + credentials = service_account.Credentials( + SIGNER, + self.SERVICE_ACCOUNT_EMAIL, + self.TOKEN_URI, + quota_project_id="quota-project-123", + ) + + headers = {} + credentials.apply(headers, token="token") + + assert headers["x-goog-user-project"] == "quota-project-123" + assert "token" in headers["authorization"] + + def test_apply_with_no_quota_project_id(self): + credentials = service_account.Credentials( + SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI + ) + + headers = {} + credentials.apply(headers, token="token") + + assert "x-goog-user-project" not in headers + assert "token" in headers["authorization"] + + @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True) + def test__create_self_signed_jwt(self, jwt): + credentials = service_account.Credentials( + SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI + ) + + audience = "https://pubsub.googleapis.com" + credentials._create_self_signed_jwt(audience) + jwt.from_signing_credentials.assert_called_once_with(credentials, audience) + + @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True) + def test__create_self_signed_jwt_with_user_scopes(self, jwt): + credentials = service_account.Credentials( + SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI, scopes=["foo"] + ) + + audience = "https://pubsub.googleapis.com" + credentials._create_self_signed_jwt(audience) + + # JWT should not be created if there are user-defined scopes + jwt.from_signing_credentials.assert_not_called() + + @mock.patch("google.oauth2._client.jwt_grant", autospec=True) + def test_refresh_success(self, jwt_grant): + credentials = self.make_credentials() + token = "token" + jwt_grant.return_value = ( + token, + _helpers.utcnow() + datetime.timedelta(seconds=500), + {}, + ) + request = mock.create_autospec(transport.Request, instance=True) + + # Refresh credentials + credentials.refresh(request) + + # Check jwt grant call. + assert jwt_grant.called + + called_request, token_uri, assertion = jwt_grant.call_args[0] + assert called_request == request + assert token_uri == credentials._token_uri + assert jwt.decode(assertion, PUBLIC_CERT_BYTES) + # No further assertion done on the token, as there are separate tests + # for checking the authorization grant assertion. + + # Check that the credentials have the token. + assert credentials.token == token + + # Check that the credentials are valid (have a token and are not + # expired) + assert credentials.valid + + @mock.patch("google.oauth2._client.jwt_grant", autospec=True) + def test_before_request_refreshes(self, jwt_grant): + credentials = self.make_credentials() + token = "token" + jwt_grant.return_value = ( + token, + _helpers.utcnow() + datetime.timedelta(seconds=500), + None, + ) + request = mock.create_autospec(transport.Request, instance=True) + + # Credentials should start as invalid + assert not credentials.valid + + # before_request should cause a refresh + credentials.before_request(request, "GET", "http://example.com?a=1#3", {}) + + # The refresh endpoint should've been called. + assert jwt_grant.called + + # Credentials should now be valid. + assert credentials.valid + + @mock.patch("google.auth.jwt.Credentials._make_jwt") + def test_refresh_with_jwt_credentials(self, make_jwt): + credentials = self.make_credentials() + credentials._create_self_signed_jwt("https://pubsub.googleapis.com") + + request = mock.create_autospec(transport.Request, instance=True) + + token = "token" + expiry = _helpers.utcnow() + datetime.timedelta(seconds=500) + make_jwt.return_value = (token, expiry) + + # Credentials should start as invalid + assert not credentials.valid + + # before_request should cause a refresh + credentials.before_request(request, "GET", "http://example.com?a=1#3", {}) + + # Credentials should now be valid. + assert credentials.valid + + # Assert make_jwt was called + assert make_jwt.called_once() + + assert credentials.token == token + assert credentials.expiry == expiry + + +class TestIDTokenCredentials(object): + SERVICE_ACCOUNT_EMAIL = "service-account@example.com" + TOKEN_URI = "https://example.com/oauth2/token" + TARGET_AUDIENCE = "https://example.com" + + @classmethod + def make_credentials(cls): + return service_account.IDTokenCredentials( + SIGNER, cls.SERVICE_ACCOUNT_EMAIL, cls.TOKEN_URI, cls.TARGET_AUDIENCE + ) + + def test_from_service_account_info(self): + credentials = service_account.IDTokenCredentials.from_service_account_info( + SERVICE_ACCOUNT_INFO, target_audience=self.TARGET_AUDIENCE + ) + + assert credentials._signer.key_id == SERVICE_ACCOUNT_INFO["private_key_id"] + assert credentials.service_account_email == SERVICE_ACCOUNT_INFO["client_email"] + assert credentials._token_uri == SERVICE_ACCOUNT_INFO["token_uri"] + assert credentials._target_audience == self.TARGET_AUDIENCE + + def test_from_service_account_file(self): + info = SERVICE_ACCOUNT_INFO.copy() + + credentials = service_account.IDTokenCredentials.from_service_account_file( + SERVICE_ACCOUNT_JSON_FILE, target_audience=self.TARGET_AUDIENCE + ) + + assert credentials.service_account_email == info["client_email"] + assert credentials._signer.key_id == info["private_key_id"] + assert credentials._token_uri == info["token_uri"] + assert credentials._target_audience == self.TARGET_AUDIENCE + + def test_default_state(self): + credentials = self.make_credentials() + assert not credentials.valid + # Expiration hasn't been set yet + assert not credentials.expired + + def test_sign_bytes(self): + credentials = self.make_credentials() + to_sign = b"123" + signature = credentials.sign_bytes(to_sign) + assert crypt.verify_signature(to_sign, signature, PUBLIC_CERT_BYTES) + + def test_signer(self): + credentials = self.make_credentials() + assert isinstance(credentials.signer, crypt.Signer) + + def test_signer_email(self): + credentials = self.make_credentials() + assert credentials.signer_email == self.SERVICE_ACCOUNT_EMAIL + + def test_with_target_audience(self): + credentials = self.make_credentials() + new_credentials = credentials.with_target_audience("https://new.example.com") + assert new_credentials._target_audience == "https://new.example.com" + + def test_with_quota_project(self): + credentials = self.make_credentials() + new_credentials = credentials.with_quota_project("project-foo") + assert new_credentials._quota_project_id == "project-foo" + + def test__make_authorization_grant_assertion(self): + credentials = self.make_credentials() + token = credentials._make_authorization_grant_assertion() + payload = jwt.decode(token, PUBLIC_CERT_BYTES) + assert payload["iss"] == self.SERVICE_ACCOUNT_EMAIL + assert payload["aud"] == service_account._GOOGLE_OAUTH2_TOKEN_ENDPOINT + assert payload["target_audience"] == self.TARGET_AUDIENCE + + @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True) + def test_refresh_success(self, id_token_jwt_grant): + credentials = self.make_credentials() + token = "token" + id_token_jwt_grant.return_value = ( + token, + _helpers.utcnow() + datetime.timedelta(seconds=500), + {}, + ) + request = mock.create_autospec(transport.Request, instance=True) + + # Refresh credentials + credentials.refresh(request) + + # Check jwt grant call. + assert id_token_jwt_grant.called + + called_request, token_uri, assertion = id_token_jwt_grant.call_args[0] + assert called_request == request + assert token_uri == credentials._token_uri + assert jwt.decode(assertion, PUBLIC_CERT_BYTES) + # No further assertion done on the token, as there are separate tests + # for checking the authorization grant assertion. + + # Check that the credentials have the token. + assert credentials.token == token + + # Check that the credentials are valid (have a token and are not + # expired) + assert credentials.valid + + @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True) + def test_before_request_refreshes(self, id_token_jwt_grant): + credentials = self.make_credentials() + token = "token" + id_token_jwt_grant.return_value = ( + token, + _helpers.utcnow() + datetime.timedelta(seconds=500), + None, + ) + request = mock.create_autospec(transport.Request, instance=True) + + # Credentials should start as invalid + assert not credentials.valid + + # before_request should cause a refresh + credentials.before_request(request, "GET", "http://example.com?a=1#3", {}) + + # The refresh endpoint should've been called. + assert id_token_jwt_grant.called + + # Credentials should now be valid. + assert credentials.valid diff --git a/contrib/python/google-auth/py2/tests/oauth2/test_sts.py b/contrib/python/google-auth/py2/tests/oauth2/test_sts.py new file mode 100644 index 0000000000..e8e008df5d --- /dev/null +++ b/contrib/python/google-auth/py2/tests/oauth2/test_sts.py @@ -0,0 +1,395 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json + +import mock +import pytest +from six.moves import http_client +from six.moves import urllib + +from google.auth import exceptions +from google.auth import transport +from google.oauth2 import sts +from google.oauth2 import utils + +CLIENT_ID = "username" +CLIENT_SECRET = "password" +# Base64 encoding of "username:password" +BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ=" + + +class TestStsClient(object): + GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange" + RESOURCE = "https://api.example.com/" + AUDIENCE = "urn:example:cooperation-context" + SCOPES = ["scope1", "scope2"] + REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token" + SUBJECT_TOKEN = "HEADER.SUBJECT_TOKEN_PAYLOAD.SIGNATURE" + SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt" + ACTOR_TOKEN = "HEADER.ACTOR_TOKEN_PAYLOAD.SIGNATURE" + ACTOR_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt" + TOKEN_EXCHANGE_ENDPOINT = "https://example.com/token.oauth2" + ADDON_HEADERS = {"x-client-version": "0.1.2"} + ADDON_OPTIONS = {"additional": {"non-standard": ["options"], "other": "some-value"}} + SUCCESS_RESPONSE = { + "access_token": "ACCESS_TOKEN", + "issued_token_type": "urn:ietf:params:oauth:token-type:access_token", + "token_type": "Bearer", + "expires_in": 3600, + "scope": "scope1 scope2", + } + ERROR_RESPONSE = { + "error": "invalid_request", + "error_description": "Invalid subject token", + "error_uri": "https://tools.ietf.org/html/rfc6749", + } + CLIENT_AUTH_BASIC = utils.ClientAuthentication( + utils.ClientAuthType.basic, CLIENT_ID, CLIENT_SECRET + ) + CLIENT_AUTH_REQUEST_BODY = utils.ClientAuthentication( + utils.ClientAuthType.request_body, CLIENT_ID, CLIENT_SECRET + ) + + @classmethod + def make_client(cls, client_auth=None): + return sts.Client(cls.TOKEN_EXCHANGE_ENDPOINT, client_auth) + + @classmethod + def make_mock_request(cls, data, status=http_client.OK): + response = mock.create_autospec(transport.Response, instance=True) + response.status = status + response.data = json.dumps(data).encode("utf-8") + + request = mock.create_autospec(transport.Request) + request.return_value = response + + return request + + @classmethod + def assert_request_kwargs(cls, request_kwargs, headers, request_data): + """Asserts the request was called with the expected parameters. + """ + assert request_kwargs["url"] == cls.TOKEN_EXCHANGE_ENDPOINT + assert request_kwargs["method"] == "POST" + assert request_kwargs["headers"] == headers + assert request_kwargs["body"] is not None + body_tuples = urllib.parse.parse_qsl(request_kwargs["body"]) + for (k, v) in body_tuples: + assert v.decode("utf-8") == request_data[k.decode("utf-8")] + assert len(body_tuples) == len(request_data.keys()) + + def test_exchange_token_full_success_without_auth(self): + """Test token exchange success without client authentication using full + parameters. + """ + client = self.make_client() + headers = self.ADDON_HEADERS.copy() + headers["Content-Type"] = "application/x-www-form-urlencoded" + request_data = { + "grant_type": self.GRANT_TYPE, + "resource": self.RESOURCE, + "audience": self.AUDIENCE, + "scope": " ".join(self.SCOPES), + "requested_token_type": self.REQUESTED_TOKEN_TYPE, + "subject_token": self.SUBJECT_TOKEN, + "subject_token_type": self.SUBJECT_TOKEN_TYPE, + "actor_token": self.ACTOR_TOKEN, + "actor_token_type": self.ACTOR_TOKEN_TYPE, + "options": urllib.parse.quote(json.dumps(self.ADDON_OPTIONS)), + } + request = self.make_mock_request( + status=http_client.OK, data=self.SUCCESS_RESPONSE + ) + + response = client.exchange_token( + request, + self.GRANT_TYPE, + self.SUBJECT_TOKEN, + self.SUBJECT_TOKEN_TYPE, + self.RESOURCE, + self.AUDIENCE, + self.SCOPES, + self.REQUESTED_TOKEN_TYPE, + self.ACTOR_TOKEN, + self.ACTOR_TOKEN_TYPE, + self.ADDON_OPTIONS, + self.ADDON_HEADERS, + ) + + self.assert_request_kwargs(request.call_args[1], headers, request_data) + assert response == self.SUCCESS_RESPONSE + + def test_exchange_token_partial_success_without_auth(self): + """Test token exchange success without client authentication using + partial (required only) parameters. + """ + client = self.make_client() + headers = {"Content-Type": "application/x-www-form-urlencoded"} + request_data = { + "grant_type": self.GRANT_TYPE, + "audience": self.AUDIENCE, + "requested_token_type": self.REQUESTED_TOKEN_TYPE, + "subject_token": self.SUBJECT_TOKEN, + "subject_token_type": self.SUBJECT_TOKEN_TYPE, + } + request = self.make_mock_request( + status=http_client.OK, data=self.SUCCESS_RESPONSE + ) + + response = client.exchange_token( + request, + grant_type=self.GRANT_TYPE, + subject_token=self.SUBJECT_TOKEN, + subject_token_type=self.SUBJECT_TOKEN_TYPE, + audience=self.AUDIENCE, + requested_token_type=self.REQUESTED_TOKEN_TYPE, + ) + + self.assert_request_kwargs(request.call_args[1], headers, request_data) + assert response == self.SUCCESS_RESPONSE + + def test_exchange_token_non200_without_auth(self): + """Test token exchange without client auth responding with non-200 status. + """ + client = self.make_client() + request = self.make_mock_request( + status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE + ) + + with pytest.raises(exceptions.OAuthError) as excinfo: + client.exchange_token( + request, + self.GRANT_TYPE, + self.SUBJECT_TOKEN, + self.SUBJECT_TOKEN_TYPE, + self.RESOURCE, + self.AUDIENCE, + self.SCOPES, + self.REQUESTED_TOKEN_TYPE, + self.ACTOR_TOKEN, + self.ACTOR_TOKEN_TYPE, + self.ADDON_OPTIONS, + self.ADDON_HEADERS, + ) + + assert excinfo.match( + r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749" + ) + + def test_exchange_token_full_success_with_basic_auth(self): + """Test token exchange success with basic client authentication using full + parameters. + """ + client = self.make_client(self.CLIENT_AUTH_BASIC) + headers = self.ADDON_HEADERS.copy() + headers["Content-Type"] = "application/x-www-form-urlencoded" + headers["Authorization"] = "Basic {}".format(BASIC_AUTH_ENCODING) + request_data = { + "grant_type": self.GRANT_TYPE, + "resource": self.RESOURCE, + "audience": self.AUDIENCE, + "scope": " ".join(self.SCOPES), + "requested_token_type": self.REQUESTED_TOKEN_TYPE, + "subject_token": self.SUBJECT_TOKEN, + "subject_token_type": self.SUBJECT_TOKEN_TYPE, + "actor_token": self.ACTOR_TOKEN, + "actor_token_type": self.ACTOR_TOKEN_TYPE, + "options": urllib.parse.quote(json.dumps(self.ADDON_OPTIONS)), + } + request = self.make_mock_request( + status=http_client.OK, data=self.SUCCESS_RESPONSE + ) + + response = client.exchange_token( + request, + self.GRANT_TYPE, + self.SUBJECT_TOKEN, + self.SUBJECT_TOKEN_TYPE, + self.RESOURCE, + self.AUDIENCE, + self.SCOPES, + self.REQUESTED_TOKEN_TYPE, + self.ACTOR_TOKEN, + self.ACTOR_TOKEN_TYPE, + self.ADDON_OPTIONS, + self.ADDON_HEADERS, + ) + + self.assert_request_kwargs(request.call_args[1], headers, request_data) + assert response == self.SUCCESS_RESPONSE + + def test_exchange_token_partial_success_with_basic_auth(self): + """Test token exchange success with basic client authentication using + partial (required only) parameters. + """ + client = self.make_client(self.CLIENT_AUTH_BASIC) + headers = { + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING), + } + request_data = { + "grant_type": self.GRANT_TYPE, + "audience": self.AUDIENCE, + "requested_token_type": self.REQUESTED_TOKEN_TYPE, + "subject_token": self.SUBJECT_TOKEN, + "subject_token_type": self.SUBJECT_TOKEN_TYPE, + } + request = self.make_mock_request( + status=http_client.OK, data=self.SUCCESS_RESPONSE + ) + + response = client.exchange_token( + request, + grant_type=self.GRANT_TYPE, + subject_token=self.SUBJECT_TOKEN, + subject_token_type=self.SUBJECT_TOKEN_TYPE, + audience=self.AUDIENCE, + requested_token_type=self.REQUESTED_TOKEN_TYPE, + ) + + self.assert_request_kwargs(request.call_args[1], headers, request_data) + assert response == self.SUCCESS_RESPONSE + + def test_exchange_token_non200_with_basic_auth(self): + """Test token exchange with basic client auth responding with non-200 + status. + """ + client = self.make_client(self.CLIENT_AUTH_BASIC) + request = self.make_mock_request( + status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE + ) + + with pytest.raises(exceptions.OAuthError) as excinfo: + client.exchange_token( + request, + self.GRANT_TYPE, + self.SUBJECT_TOKEN, + self.SUBJECT_TOKEN_TYPE, + self.RESOURCE, + self.AUDIENCE, + self.SCOPES, + self.REQUESTED_TOKEN_TYPE, + self.ACTOR_TOKEN, + self.ACTOR_TOKEN_TYPE, + self.ADDON_OPTIONS, + self.ADDON_HEADERS, + ) + + assert excinfo.match( + r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749" + ) + + def test_exchange_token_full_success_with_reqbody_auth(self): + """Test token exchange success with request body client authenticaiton + using full parameters. + """ + client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY) + headers = self.ADDON_HEADERS.copy() + headers["Content-Type"] = "application/x-www-form-urlencoded" + request_data = { + "grant_type": self.GRANT_TYPE, + "resource": self.RESOURCE, + "audience": self.AUDIENCE, + "scope": " ".join(self.SCOPES), + "requested_token_type": self.REQUESTED_TOKEN_TYPE, + "subject_token": self.SUBJECT_TOKEN, + "subject_token_type": self.SUBJECT_TOKEN_TYPE, + "actor_token": self.ACTOR_TOKEN, + "actor_token_type": self.ACTOR_TOKEN_TYPE, + "options": urllib.parse.quote(json.dumps(self.ADDON_OPTIONS)), + "client_id": CLIENT_ID, + "client_secret": CLIENT_SECRET, + } + request = self.make_mock_request( + status=http_client.OK, data=self.SUCCESS_RESPONSE + ) + + response = client.exchange_token( + request, + self.GRANT_TYPE, + self.SUBJECT_TOKEN, + self.SUBJECT_TOKEN_TYPE, + self.RESOURCE, + self.AUDIENCE, + self.SCOPES, + self.REQUESTED_TOKEN_TYPE, + self.ACTOR_TOKEN, + self.ACTOR_TOKEN_TYPE, + self.ADDON_OPTIONS, + self.ADDON_HEADERS, + ) + + self.assert_request_kwargs(request.call_args[1], headers, request_data) + assert response == self.SUCCESS_RESPONSE + + def test_exchange_token_partial_success_with_reqbody_auth(self): + """Test token exchange success with request body client authentication + using partial (required only) parameters. + """ + client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY) + headers = {"Content-Type": "application/x-www-form-urlencoded"} + request_data = { + "grant_type": self.GRANT_TYPE, + "audience": self.AUDIENCE, + "requested_token_type": self.REQUESTED_TOKEN_TYPE, + "subject_token": self.SUBJECT_TOKEN, + "subject_token_type": self.SUBJECT_TOKEN_TYPE, + "client_id": CLIENT_ID, + "client_secret": CLIENT_SECRET, + } + request = self.make_mock_request( + status=http_client.OK, data=self.SUCCESS_RESPONSE + ) + + response = client.exchange_token( + request, + grant_type=self.GRANT_TYPE, + subject_token=self.SUBJECT_TOKEN, + subject_token_type=self.SUBJECT_TOKEN_TYPE, + audience=self.AUDIENCE, + requested_token_type=self.REQUESTED_TOKEN_TYPE, + ) + + self.assert_request_kwargs(request.call_args[1], headers, request_data) + assert response == self.SUCCESS_RESPONSE + + def test_exchange_token_non200_with_reqbody_auth(self): + """Test token exchange with POST request body client auth responding + with non-200 status. + """ + client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY) + request = self.make_mock_request( + status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE + ) + + with pytest.raises(exceptions.OAuthError) as excinfo: + client.exchange_token( + request, + self.GRANT_TYPE, + self.SUBJECT_TOKEN, + self.SUBJECT_TOKEN_TYPE, + self.RESOURCE, + self.AUDIENCE, + self.SCOPES, + self.REQUESTED_TOKEN_TYPE, + self.ACTOR_TOKEN, + self.ACTOR_TOKEN_TYPE, + self.ADDON_OPTIONS, + self.ADDON_HEADERS, + ) + + assert excinfo.match( + r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749" + ) diff --git a/contrib/python/google-auth/py2/tests/oauth2/test_utils.py b/contrib/python/google-auth/py2/tests/oauth2/test_utils.py new file mode 100644 index 0000000000..6de9ff5337 --- /dev/null +++ b/contrib/python/google-auth/py2/tests/oauth2/test_utils.py @@ -0,0 +1,264 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json + +import pytest + +from google.auth import exceptions +from google.oauth2 import utils + + +CLIENT_ID = "username" +CLIENT_SECRET = "password" +# Base64 encoding of "username:password" +BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ=" +# Base64 encoding of "username:" +BASIC_AUTH_ENCODING_SECRETLESS = "dXNlcm5hbWU6" + + +class AuthHandler(utils.OAuthClientAuthHandler): + def __init__(self, client_auth=None): + super(AuthHandler, self).__init__(client_auth) + + def apply_client_authentication_options( + self, headers, request_body=None, bearer_token=None + ): + return super(AuthHandler, self).apply_client_authentication_options( + headers, request_body, bearer_token + ) + + +class TestClientAuthentication(object): + @classmethod + def make_client_auth(cls, client_secret=None): + return utils.ClientAuthentication( + utils.ClientAuthType.basic, CLIENT_ID, client_secret + ) + + def test_initialization_with_client_secret(self): + client_auth = self.make_client_auth(CLIENT_SECRET) + + assert client_auth.client_auth_type == utils.ClientAuthType.basic + assert client_auth.client_id == CLIENT_ID + assert client_auth.client_secret == CLIENT_SECRET + + def test_initialization_no_client_secret(self): + client_auth = self.make_client_auth() + + assert client_auth.client_auth_type == utils.ClientAuthType.basic + assert client_auth.client_id == CLIENT_ID + assert client_auth.client_secret is None + + +class TestOAuthClientAuthHandler(object): + CLIENT_AUTH_BASIC = utils.ClientAuthentication( + utils.ClientAuthType.basic, CLIENT_ID, CLIENT_SECRET + ) + CLIENT_AUTH_BASIC_SECRETLESS = utils.ClientAuthentication( + utils.ClientAuthType.basic, CLIENT_ID + ) + CLIENT_AUTH_REQUEST_BODY = utils.ClientAuthentication( + utils.ClientAuthType.request_body, CLIENT_ID, CLIENT_SECRET + ) + CLIENT_AUTH_REQUEST_BODY_SECRETLESS = utils.ClientAuthentication( + utils.ClientAuthType.request_body, CLIENT_ID + ) + + @classmethod + def make_oauth_client_auth_handler(cls, client_auth=None): + return AuthHandler(client_auth) + + def test_apply_client_authentication_options_none(self): + headers = {"Content-Type": "application/json"} + request_body = {"foo": "bar"} + auth_handler = self.make_oauth_client_auth_handler() + + auth_handler.apply_client_authentication_options(headers, request_body) + + assert headers == {"Content-Type": "application/json"} + assert request_body == {"foo": "bar"} + + def test_apply_client_authentication_options_basic(self): + headers = {"Content-Type": "application/json"} + request_body = {"foo": "bar"} + auth_handler = self.make_oauth_client_auth_handler(self.CLIENT_AUTH_BASIC) + + auth_handler.apply_client_authentication_options(headers, request_body) + + assert headers == { + "Content-Type": "application/json", + "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING), + } + assert request_body == {"foo": "bar"} + + def test_apply_client_authentication_options_basic_nosecret(self): + headers = {"Content-Type": "application/json"} + request_body = {"foo": "bar"} + auth_handler = self.make_oauth_client_auth_handler( + self.CLIENT_AUTH_BASIC_SECRETLESS + ) + + auth_handler.apply_client_authentication_options(headers, request_body) + + assert headers == { + "Content-Type": "application/json", + "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING_SECRETLESS), + } + assert request_body == {"foo": "bar"} + + def test_apply_client_authentication_options_request_body(self): + headers = {"Content-Type": "application/json"} + request_body = {"foo": "bar"} + auth_handler = self.make_oauth_client_auth_handler( + self.CLIENT_AUTH_REQUEST_BODY + ) + + auth_handler.apply_client_authentication_options(headers, request_body) + + assert headers == {"Content-Type": "application/json"} + assert request_body == { + "foo": "bar", + "client_id": CLIENT_ID, + "client_secret": CLIENT_SECRET, + } + + def test_apply_client_authentication_options_request_body_nosecret(self): + headers = {"Content-Type": "application/json"} + request_body = {"foo": "bar"} + auth_handler = self.make_oauth_client_auth_handler( + self.CLIENT_AUTH_REQUEST_BODY_SECRETLESS + ) + + auth_handler.apply_client_authentication_options(headers, request_body) + + assert headers == {"Content-Type": "application/json"} + assert request_body == { + "foo": "bar", + "client_id": CLIENT_ID, + "client_secret": "", + } + + def test_apply_client_authentication_options_request_body_no_body(self): + headers = {"Content-Type": "application/json"} + auth_handler = self.make_oauth_client_auth_handler( + self.CLIENT_AUTH_REQUEST_BODY + ) + + with pytest.raises(exceptions.OAuthError) as excinfo: + auth_handler.apply_client_authentication_options(headers) + + assert excinfo.match(r"HTTP request does not support request-body") + + def test_apply_client_authentication_options_bearer_token(self): + bearer_token = "ACCESS_TOKEN" + headers = {"Content-Type": "application/json"} + request_body = {"foo": "bar"} + auth_handler = self.make_oauth_client_auth_handler() + + auth_handler.apply_client_authentication_options( + headers, request_body, bearer_token + ) + + assert headers == { + "Content-Type": "application/json", + "Authorization": "Bearer {}".format(bearer_token), + } + assert request_body == {"foo": "bar"} + + def test_apply_client_authentication_options_bearer_and_basic(self): + bearer_token = "ACCESS_TOKEN" + headers = {"Content-Type": "application/json"} + request_body = {"foo": "bar"} + auth_handler = self.make_oauth_client_auth_handler(self.CLIENT_AUTH_BASIC) + + auth_handler.apply_client_authentication_options( + headers, request_body, bearer_token + ) + + # Bearer token should have higher priority. + assert headers == { + "Content-Type": "application/json", + "Authorization": "Bearer {}".format(bearer_token), + } + assert request_body == {"foo": "bar"} + + def test_apply_client_authentication_options_bearer_and_request_body(self): + bearer_token = "ACCESS_TOKEN" + headers = {"Content-Type": "application/json"} + request_body = {"foo": "bar"} + auth_handler = self.make_oauth_client_auth_handler( + self.CLIENT_AUTH_REQUEST_BODY + ) + + auth_handler.apply_client_authentication_options( + headers, request_body, bearer_token + ) + + # Bearer token should have higher priority. + assert headers == { + "Content-Type": "application/json", + "Authorization": "Bearer {}".format(bearer_token), + } + assert request_body == {"foo": "bar"} + + +def test__handle_error_response_code_only(): + error_resp = {"error": "unsupported_grant_type"} + response_data = json.dumps(error_resp) + + with pytest.raises(exceptions.OAuthError) as excinfo: + utils.handle_error_response(response_data) + + assert excinfo.match(r"Error code unsupported_grant_type") + + +def test__handle_error_response_code_description(): + error_resp = { + "error": "unsupported_grant_type", + "error_description": "The provided grant_type is unsupported", + } + response_data = json.dumps(error_resp) + + with pytest.raises(exceptions.OAuthError) as excinfo: + utils.handle_error_response(response_data) + + assert excinfo.match( + r"Error code unsupported_grant_type: The provided grant_type is unsupported" + ) + + +def test__handle_error_response_code_description_uri(): + error_resp = { + "error": "unsupported_grant_type", + "error_description": "The provided grant_type is unsupported", + "error_uri": "https://tools.ietf.org/html/rfc6749", + } + response_data = json.dumps(error_resp) + + with pytest.raises(exceptions.OAuthError) as excinfo: + utils.handle_error_response(response_data) + + assert excinfo.match( + r"Error code unsupported_grant_type: The provided grant_type is unsupported - https://tools.ietf.org/html/rfc6749" + ) + + +def test__handle_error_response_non_json(): + response_data = "Oops, something wrong happened" + + with pytest.raises(exceptions.OAuthError) as excinfo: + utils.handle_error_response(response_data) + + assert excinfo.match(r"Oops, something wrong happened") |