aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/google-auth/py2/tests/oauth2
diff options
context:
space:
mode:
authoralexv-smirnov <alex@ydb.tech>2023-12-01 12:02:50 +0300
committeralexv-smirnov <alex@ydb.tech>2023-12-01 13:28:10 +0300
commit0e578a4c44d4abd539d9838347b9ebafaca41dfb (patch)
treea0c1969c37f818c830ebeff9c077eacf30be6ef8 /contrib/python/google-auth/py2/tests/oauth2
parent84f2d3d4cc985e63217cff149bd2e6d67ae6fe22 (diff)
downloadydb-0e578a4c44d4abd539d9838347b9ebafaca41dfb.tar.gz
Change "ya.make"
Diffstat (limited to 'contrib/python/google-auth/py2/tests/oauth2')
-rw-r--r--contrib/python/google-auth/py2/tests/oauth2/__init__.py0
-rw-r--r--contrib/python/google-auth/py2/tests/oauth2/test__client.py330
-rw-r--r--contrib/python/google-auth/py2/tests/oauth2/test_challenges.py132
-rw-r--r--contrib/python/google-auth/py2/tests/oauth2/test_credentials.py876
-rw-r--r--contrib/python/google-auth/py2/tests/oauth2/test_id_token.py228
-rw-r--r--contrib/python/google-auth/py2/tests/oauth2/test_reauth.py308
-rw-r--r--contrib/python/google-auth/py2/tests/oauth2/test_service_account.py433
-rw-r--r--contrib/python/google-auth/py2/tests/oauth2/test_sts.py395
-rw-r--r--contrib/python/google-auth/py2/tests/oauth2/test_utils.py264
9 files changed, 2966 insertions, 0 deletions
diff --git a/contrib/python/google-auth/py2/tests/oauth2/__init__.py b/contrib/python/google-auth/py2/tests/oauth2/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/contrib/python/google-auth/py2/tests/oauth2/__init__.py
diff --git a/contrib/python/google-auth/py2/tests/oauth2/test__client.py b/contrib/python/google-auth/py2/tests/oauth2/test__client.py
new file mode 100644
index 0000000000..1dba2523e7
--- /dev/null
+++ b/contrib/python/google-auth/py2/tests/oauth2/test__client.py
@@ -0,0 +1,330 @@
+# Copyright 2016 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import json
+import os
+
+import mock
+import pytest
+import six
+from six.moves import http_client
+from six.moves import urllib
+
+from google.auth import _helpers
+from google.auth import crypt
+from google.auth import exceptions
+from google.auth import jwt
+from google.auth import transport
+from google.oauth2 import _client
+
+
+import yatest.common
+DATA_DIR = os.path.join(yatest.common.test_source_path(), "data")
+
+with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
+ PRIVATE_KEY_BYTES = fh.read()
+
+SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")
+
+SCOPES_AS_LIST = [
+ "https://www.googleapis.com/auth/pubsub",
+ "https://www.googleapis.com/auth/logging.write",
+]
+SCOPES_AS_STRING = (
+ "https://www.googleapis.com/auth/pubsub"
+ " https://www.googleapis.com/auth/logging.write"
+)
+
+
+def test__handle_error_response():
+ response_data = {"error": "help", "error_description": "I'm alive"}
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ _client._handle_error_response(response_data)
+
+ assert excinfo.match(r"help: I\'m alive")
+
+
+def test__handle_error_response_non_json():
+ response_data = {"foo": "bar"}
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ _client._handle_error_response(response_data)
+
+ assert excinfo.match(r"{\"foo\": \"bar\"}")
+
+
+@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+def test__parse_expiry(unused_utcnow):
+ result = _client._parse_expiry({"expires_in": 500})
+ assert result == datetime.datetime.min + datetime.timedelta(seconds=500)
+
+
+def test__parse_expiry_none():
+ assert _client._parse_expiry({}) is None
+
+
+def make_request(response_data, status=http_client.OK):
+ response = mock.create_autospec(transport.Response, instance=True)
+ response.status = status
+ response.data = json.dumps(response_data).encode("utf-8")
+ request = mock.create_autospec(transport.Request)
+ request.return_value = response
+ return request
+
+
+def test__token_endpoint_request():
+ request = make_request({"test": "response"})
+
+ result = _client._token_endpoint_request(
+ request, "http://example.com", {"test": "params"}
+ )
+
+ # Check request call
+ request.assert_called_with(
+ method="POST",
+ url="http://example.com",
+ headers={"Content-Type": "application/x-www-form-urlencoded"},
+ body="test=params".encode("utf-8"),
+ )
+
+ # Check result
+ assert result == {"test": "response"}
+
+
+def test__token_endpoint_request_use_json():
+ request = make_request({"test": "response"})
+
+ result = _client._token_endpoint_request(
+ request,
+ "http://example.com",
+ {"test": "params"},
+ access_token="access_token",
+ use_json=True,
+ )
+
+ # Check request call
+ request.assert_called_with(
+ method="POST",
+ url="http://example.com",
+ headers={
+ "Content-Type": "application/json",
+ "Authorization": "Bearer access_token",
+ },
+ body=b'{"test": "params"}',
+ )
+
+ # Check result
+ assert result == {"test": "response"}
+
+
+def test__token_endpoint_request_error():
+ request = make_request({}, status=http_client.BAD_REQUEST)
+
+ with pytest.raises(exceptions.RefreshError):
+ _client._token_endpoint_request(request, "http://example.com", {})
+
+
+def test__token_endpoint_request_internal_failure_error():
+ request = make_request(
+ {"error_description": "internal_failure"}, status=http_client.BAD_REQUEST
+ )
+
+ with pytest.raises(exceptions.RefreshError):
+ _client._token_endpoint_request(
+ request, "http://example.com", {"error_description": "internal_failure"}
+ )
+
+ request = make_request(
+ {"error": "internal_failure"}, status=http_client.BAD_REQUEST
+ )
+
+ with pytest.raises(exceptions.RefreshError):
+ _client._token_endpoint_request(
+ request, "http://example.com", {"error": "internal_failure"}
+ )
+
+
+def verify_request_params(request, params):
+ request_body = request.call_args[1]["body"].decode("utf-8")
+ request_params = urllib.parse.parse_qs(request_body)
+
+ for key, value in six.iteritems(params):
+ assert request_params[key][0] == value
+
+
+@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+def test_jwt_grant(utcnow):
+ request = make_request(
+ {"access_token": "token", "expires_in": 500, "extra": "data"}
+ )
+
+ token, expiry, extra_data = _client.jwt_grant(
+ request, "http://example.com", "assertion_value"
+ )
+
+ # Check request call
+ verify_request_params(
+ request, {"grant_type": _client._JWT_GRANT_TYPE, "assertion": "assertion_value"}
+ )
+
+ # Check result
+ assert token == "token"
+ assert expiry == utcnow() + datetime.timedelta(seconds=500)
+ assert extra_data["extra"] == "data"
+
+
+def test_jwt_grant_no_access_token():
+ request = make_request(
+ {
+ # No access token.
+ "expires_in": 500,
+ "extra": "data",
+ }
+ )
+
+ with pytest.raises(exceptions.RefreshError):
+ _client.jwt_grant(request, "http://example.com", "assertion_value")
+
+
+def test_id_token_jwt_grant():
+ now = _helpers.utcnow()
+ id_token_expiry = _helpers.datetime_to_secs(now)
+ id_token = jwt.encode(SIGNER, {"exp": id_token_expiry}).decode("utf-8")
+ request = make_request({"id_token": id_token, "extra": "data"})
+
+ token, expiry, extra_data = _client.id_token_jwt_grant(
+ request, "http://example.com", "assertion_value"
+ )
+
+ # Check request call
+ verify_request_params(
+ request, {"grant_type": _client._JWT_GRANT_TYPE, "assertion": "assertion_value"}
+ )
+
+ # Check result
+ assert token == id_token
+ # JWT does not store microseconds
+ now = now.replace(microsecond=0)
+ assert expiry == now
+ assert extra_data["extra"] == "data"
+
+
+def test_id_token_jwt_grant_no_access_token():
+ request = make_request(
+ {
+ # No access token.
+ "expires_in": 500,
+ "extra": "data",
+ }
+ )
+
+ with pytest.raises(exceptions.RefreshError):
+ _client.id_token_jwt_grant(request, "http://example.com", "assertion_value")
+
+
+@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+def test_refresh_grant(unused_utcnow):
+ request = make_request(
+ {
+ "access_token": "token",
+ "refresh_token": "new_refresh_token",
+ "expires_in": 500,
+ "extra": "data",
+ }
+ )
+
+ token, refresh_token, expiry, extra_data = _client.refresh_grant(
+ request,
+ "http://example.com",
+ "refresh_token",
+ "client_id",
+ "client_secret",
+ rapt_token="rapt_token",
+ )
+
+ # Check request call
+ verify_request_params(
+ request,
+ {
+ "grant_type": _client._REFRESH_GRANT_TYPE,
+ "refresh_token": "refresh_token",
+ "client_id": "client_id",
+ "client_secret": "client_secret",
+ "rapt": "rapt_token",
+ },
+ )
+
+ # Check result
+ assert token == "token"
+ assert refresh_token == "new_refresh_token"
+ assert expiry == datetime.datetime.min + datetime.timedelta(seconds=500)
+ assert extra_data["extra"] == "data"
+
+
+@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+def test_refresh_grant_with_scopes(unused_utcnow):
+ request = make_request(
+ {
+ "access_token": "token",
+ "refresh_token": "new_refresh_token",
+ "expires_in": 500,
+ "extra": "data",
+ "scope": SCOPES_AS_STRING,
+ }
+ )
+
+ token, refresh_token, expiry, extra_data = _client.refresh_grant(
+ request,
+ "http://example.com",
+ "refresh_token",
+ "client_id",
+ "client_secret",
+ SCOPES_AS_LIST,
+ )
+
+ # Check request call.
+ verify_request_params(
+ request,
+ {
+ "grant_type": _client._REFRESH_GRANT_TYPE,
+ "refresh_token": "refresh_token",
+ "client_id": "client_id",
+ "client_secret": "client_secret",
+ "scope": SCOPES_AS_STRING,
+ },
+ )
+
+ # Check result.
+ assert token == "token"
+ assert refresh_token == "new_refresh_token"
+ assert expiry == datetime.datetime.min + datetime.timedelta(seconds=500)
+ assert extra_data["extra"] == "data"
+
+
+def test_refresh_grant_no_access_token():
+ request = make_request(
+ {
+ # No access token.
+ "refresh_token": "new_refresh_token",
+ "expires_in": 500,
+ "extra": "data",
+ }
+ )
+
+ with pytest.raises(exceptions.RefreshError):
+ _client.refresh_grant(
+ request, "http://example.com", "refresh_token", "client_id", "client_secret"
+ )
diff --git a/contrib/python/google-auth/py2/tests/oauth2/test_challenges.py b/contrib/python/google-auth/py2/tests/oauth2/test_challenges.py
new file mode 100644
index 0000000000..019b908dae
--- /dev/null
+++ b/contrib/python/google-auth/py2/tests/oauth2/test_challenges.py
@@ -0,0 +1,132 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests for the reauth module."""
+
+import base64
+import sys
+
+import mock
+import pytest
+import pyu2f
+
+from google.auth import exceptions
+from google.oauth2 import challenges
+
+
+def test_get_user_password():
+ with mock.patch("getpass.getpass", return_value="foo"):
+ assert challenges.get_user_password("") == "foo"
+
+
+def test_security_key():
+ metadata = {
+ "status": "READY",
+ "challengeId": 2,
+ "challengeType": "SECURITY_KEY",
+ "securityKey": {
+ "applicationId": "security_key_application_id",
+ "challenges": [
+ {
+ "keyHandle": "some_key",
+ "challenge": base64.urlsafe_b64encode(
+ "some_challenge".encode("ascii")
+ ).decode("ascii"),
+ }
+ ],
+ },
+ }
+ mock_key = mock.Mock()
+
+ challenge = challenges.SecurityKeyChallenge()
+
+ # Test the case that security key challenge is passed.
+ with mock.patch("pyu2f.model.RegisteredKey", return_value=mock_key):
+ with mock.patch(
+ "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate"
+ ) as mock_authenticate:
+ mock_authenticate.return_value = "security key response"
+ assert challenge.name == "SECURITY_KEY"
+ assert challenge.is_locally_eligible
+ assert challenge.obtain_challenge_input(metadata) == {
+ "securityKey": "security key response"
+ }
+ mock_authenticate.assert_called_with(
+ "security_key_application_id",
+ [{"key": mock_key, "challenge": b"some_challenge"}],
+ print_callback=sys.stderr.write,
+ )
+
+ # Test various types of exceptions.
+ with mock.patch("pyu2f.model.RegisteredKey", return_value=mock_key):
+ with mock.patch(
+ "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate"
+ ) as mock_authenticate:
+ mock_authenticate.side_effect = pyu2f.errors.U2FError(
+ pyu2f.errors.U2FError.DEVICE_INELIGIBLE
+ )
+ assert challenge.obtain_challenge_input(metadata) is None
+
+ with mock.patch(
+ "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate"
+ ) as mock_authenticate:
+ mock_authenticate.side_effect = pyu2f.errors.U2FError(
+ pyu2f.errors.U2FError.TIMEOUT
+ )
+ assert challenge.obtain_challenge_input(metadata) is None
+
+ with mock.patch(
+ "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate"
+ ) as mock_authenticate:
+ mock_authenticate.side_effect = pyu2f.errors.U2FError(
+ pyu2f.errors.U2FError.BAD_REQUEST
+ )
+ with pytest.raises(pyu2f.errors.U2FError):
+ challenge.obtain_challenge_input(metadata)
+
+ with mock.patch(
+ "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate"
+ ) as mock_authenticate:
+ mock_authenticate.side_effect = pyu2f.errors.NoDeviceFoundError()
+ assert challenge.obtain_challenge_input(metadata) is None
+
+ with mock.patch(
+ "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate"
+ ) as mock_authenticate:
+ mock_authenticate.side_effect = pyu2f.errors.UnsupportedVersionException()
+ with pytest.raises(pyu2f.errors.UnsupportedVersionException):
+ challenge.obtain_challenge_input(metadata)
+
+ with mock.patch.dict("sys.modules"):
+ sys.modules["pyu2f"] = None
+ with pytest.raises(exceptions.ReauthFailError) as excinfo:
+ challenge.obtain_challenge_input(metadata)
+ assert excinfo.match(r"pyu2f dependency is required")
+
+
+@mock.patch("getpass.getpass", return_value="foo")
+def test_password_challenge(getpass_mock):
+ challenge = challenges.PasswordChallenge()
+
+ with mock.patch("getpass.getpass", return_value="foo"):
+ assert challenge.is_locally_eligible
+ assert challenge.name == "PASSWORD"
+ assert challenges.PasswordChallenge().obtain_challenge_input({}) == {
+ "credential": "foo"
+ }
+
+ with mock.patch("getpass.getpass", return_value=None):
+ assert challenges.PasswordChallenge().obtain_challenge_input({}) == {
+ "credential": " "
+ }
diff --git a/contrib/python/google-auth/py2/tests/oauth2/test_credentials.py b/contrib/python/google-auth/py2/tests/oauth2/test_credentials.py
new file mode 100644
index 0000000000..5c21ebe547
--- /dev/null
+++ b/contrib/python/google-auth/py2/tests/oauth2/test_credentials.py
@@ -0,0 +1,876 @@
+# Copyright 2016 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import json
+import os
+import pickle
+import sys
+
+import mock
+import pytest
+
+from google.auth import _helpers
+from google.auth import exceptions
+from google.auth import transport
+from google.oauth2 import credentials
+
+
+import yatest.common
+DATA_DIR = os.path.join(yatest.common.test_source_path(), "data")
+
+AUTH_USER_JSON_FILE = os.path.join(DATA_DIR, "authorized_user.json")
+
+with open(AUTH_USER_JSON_FILE, "r") as fh:
+ AUTH_USER_INFO = json.load(fh)
+
+
+class TestCredentials(object):
+ TOKEN_URI = "https://example.com/oauth2/token"
+ REFRESH_TOKEN = "refresh_token"
+ RAPT_TOKEN = "rapt_token"
+ CLIENT_ID = "client_id"
+ CLIENT_SECRET = "client_secret"
+
+ @classmethod
+ def make_credentials(cls):
+ return credentials.Credentials(
+ token=None,
+ refresh_token=cls.REFRESH_TOKEN,
+ token_uri=cls.TOKEN_URI,
+ client_id=cls.CLIENT_ID,
+ client_secret=cls.CLIENT_SECRET,
+ rapt_token=cls.RAPT_TOKEN,
+ )
+
+ def test_default_state(self):
+ credentials = self.make_credentials()
+ assert not credentials.valid
+ # Expiration hasn't been set yet
+ assert not credentials.expired
+ # Scopes aren't required for these credentials
+ assert not credentials.requires_scopes
+ # Test properties
+ assert credentials.refresh_token == self.REFRESH_TOKEN
+ assert credentials.token_uri == self.TOKEN_URI
+ assert credentials.client_id == self.CLIENT_ID
+ assert credentials.client_secret == self.CLIENT_SECRET
+ assert credentials.rapt_token == self.RAPT_TOKEN
+ assert credentials.refresh_handler is None
+
+ def test_refresh_handler_setter_and_getter(self):
+ scopes = ["email", "profile"]
+ original_refresh_handler = mock.Mock(return_value=("ACCESS_TOKEN_1", None))
+ updated_refresh_handler = mock.Mock(return_value=("ACCESS_TOKEN_2", None))
+ creds = credentials.Credentials(
+ token=None,
+ refresh_token=None,
+ token_uri=None,
+ client_id=None,
+ client_secret=None,
+ rapt_token=None,
+ scopes=scopes,
+ default_scopes=None,
+ refresh_handler=original_refresh_handler,
+ )
+
+ assert creds.refresh_handler is original_refresh_handler
+
+ creds.refresh_handler = updated_refresh_handler
+
+ assert creds.refresh_handler is updated_refresh_handler
+
+ creds.refresh_handler = None
+
+ assert creds.refresh_handler is None
+
+ def test_invalid_refresh_handler(self):
+ scopes = ["email", "profile"]
+ with pytest.raises(TypeError) as excinfo:
+ credentials.Credentials(
+ token=None,
+ refresh_token=None,
+ token_uri=None,
+ client_id=None,
+ client_secret=None,
+ rapt_token=None,
+ scopes=scopes,
+ default_scopes=None,
+ refresh_handler=object(),
+ )
+
+ assert excinfo.match("The provided refresh_handler is not a callable or None.")
+
+ @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
+ @mock.patch(
+ "google.auth._helpers.utcnow",
+ return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
+ )
+ def test_refresh_success(self, unused_utcnow, refresh_grant):
+ token = "token"
+ new_rapt_token = "new_rapt_token"
+ expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
+ grant_response = {"id_token": mock.sentinel.id_token}
+ refresh_grant.return_value = (
+ # Access token
+ token,
+ # New refresh token
+ None,
+ # Expiry,
+ expiry,
+ # Extra data
+ grant_response,
+ # rapt_token
+ new_rapt_token,
+ )
+
+ request = mock.create_autospec(transport.Request)
+ credentials = self.make_credentials()
+
+ # Refresh credentials
+ credentials.refresh(request)
+
+ # Check jwt grant call.
+ refresh_grant.assert_called_with(
+ request,
+ self.TOKEN_URI,
+ self.REFRESH_TOKEN,
+ self.CLIENT_ID,
+ self.CLIENT_SECRET,
+ None,
+ self.RAPT_TOKEN,
+ )
+
+ # Check that the credentials have the token and expiry
+ assert credentials.token == token
+ assert credentials.expiry == expiry
+ assert credentials.id_token == mock.sentinel.id_token
+ assert credentials.rapt_token == new_rapt_token
+
+ # Check that the credentials are valid (have a token and are not
+ # expired)
+ assert credentials.valid
+
+ def test_refresh_no_refresh_token(self):
+ request = mock.create_autospec(transport.Request)
+ credentials_ = credentials.Credentials(token=None, refresh_token=None)
+
+ with pytest.raises(exceptions.RefreshError, match="necessary fields"):
+ credentials_.refresh(request)
+
+ request.assert_not_called()
+
+ @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
+ @mock.patch(
+ "google.auth._helpers.utcnow",
+ return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
+ )
+ def test_refresh_with_refresh_token_and_refresh_handler(
+ self, unused_utcnow, refresh_grant
+ ):
+ token = "token"
+ new_rapt_token = "new_rapt_token"
+ expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
+ grant_response = {"id_token": mock.sentinel.id_token}
+ refresh_grant.return_value = (
+ # Access token
+ token,
+ # New refresh token
+ None,
+ # Expiry,
+ expiry,
+ # Extra data
+ grant_response,
+ # rapt_token
+ new_rapt_token,
+ )
+
+ refresh_handler = mock.Mock()
+ request = mock.create_autospec(transport.Request)
+ creds = credentials.Credentials(
+ token=None,
+ refresh_token=self.REFRESH_TOKEN,
+ token_uri=self.TOKEN_URI,
+ client_id=self.CLIENT_ID,
+ client_secret=self.CLIENT_SECRET,
+ rapt_token=self.RAPT_TOKEN,
+ refresh_handler=refresh_handler,
+ )
+
+ # Refresh credentials
+ creds.refresh(request)
+
+ # Check jwt grant call.
+ refresh_grant.assert_called_with(
+ request,
+ self.TOKEN_URI,
+ self.REFRESH_TOKEN,
+ self.CLIENT_ID,
+ self.CLIENT_SECRET,
+ None,
+ self.RAPT_TOKEN,
+ )
+
+ # Check that the credentials have the token and expiry
+ assert creds.token == token
+ assert creds.expiry == expiry
+ assert creds.id_token == mock.sentinel.id_token
+ assert creds.rapt_token == new_rapt_token
+
+ # Check that the credentials are valid (have a token and are not
+ # expired)
+ assert creds.valid
+
+ # Assert refresh handler not called as the refresh token has
+ # higher priority.
+ refresh_handler.assert_not_called()
+
+ @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+ def test_refresh_with_refresh_handler_success_scopes(self, unused_utcnow):
+ expected_expiry = datetime.datetime.min + datetime.timedelta(seconds=2800)
+ refresh_handler = mock.Mock(return_value=("ACCESS_TOKEN", expected_expiry))
+ scopes = ["email", "profile"]
+ default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
+ request = mock.create_autospec(transport.Request)
+ creds = credentials.Credentials(
+ token=None,
+ refresh_token=None,
+ token_uri=None,
+ client_id=None,
+ client_secret=None,
+ rapt_token=None,
+ scopes=scopes,
+ default_scopes=default_scopes,
+ refresh_handler=refresh_handler,
+ )
+
+ creds.refresh(request)
+
+ assert creds.token == "ACCESS_TOKEN"
+ assert creds.expiry == expected_expiry
+ assert creds.valid
+ assert not creds.expired
+ # Confirm refresh handler called with the expected arguments.
+ refresh_handler.assert_called_with(request, scopes=scopes)
+
+ @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+ def test_refresh_with_refresh_handler_success_default_scopes(self, unused_utcnow):
+ expected_expiry = datetime.datetime.min + datetime.timedelta(seconds=2800)
+ original_refresh_handler = mock.Mock(
+ return_value=("UNUSED_TOKEN", expected_expiry)
+ )
+ refresh_handler = mock.Mock(return_value=("ACCESS_TOKEN", expected_expiry))
+ default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
+ request = mock.create_autospec(transport.Request)
+ creds = credentials.Credentials(
+ token=None,
+ refresh_token=None,
+ token_uri=None,
+ client_id=None,
+ client_secret=None,
+ rapt_token=None,
+ scopes=None,
+ default_scopes=default_scopes,
+ refresh_handler=original_refresh_handler,
+ )
+
+ # Test newly set refresh_handler is used instead of the original one.
+ creds.refresh_handler = refresh_handler
+ creds.refresh(request)
+
+ assert creds.token == "ACCESS_TOKEN"
+ assert creds.expiry == expected_expiry
+ assert creds.valid
+ assert not creds.expired
+ # default_scopes should be used since no developer provided scopes
+ # are provided.
+ refresh_handler.assert_called_with(request, scopes=default_scopes)
+
+ @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+ def test_refresh_with_refresh_handler_invalid_token(self, unused_utcnow):
+ expected_expiry = datetime.datetime.min + datetime.timedelta(seconds=2800)
+ # Simulate refresh handler does not return a valid token.
+ refresh_handler = mock.Mock(return_value=(None, expected_expiry))
+ scopes = ["email", "profile"]
+ default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
+ request = mock.create_autospec(transport.Request)
+ creds = credentials.Credentials(
+ token=None,
+ refresh_token=None,
+ token_uri=None,
+ client_id=None,
+ client_secret=None,
+ rapt_token=None,
+ scopes=scopes,
+ default_scopes=default_scopes,
+ refresh_handler=refresh_handler,
+ )
+
+ with pytest.raises(
+ exceptions.RefreshError, match="returned token is not a string"
+ ):
+ creds.refresh(request)
+
+ assert creds.token is None
+ assert creds.expiry is None
+ assert not creds.valid
+ # Confirm refresh handler called with the expected arguments.
+ refresh_handler.assert_called_with(request, scopes=scopes)
+
+ def test_refresh_with_refresh_handler_invalid_expiry(self):
+ # Simulate refresh handler returns expiration time in an invalid unit.
+ refresh_handler = mock.Mock(return_value=("TOKEN", 2800))
+ scopes = ["email", "profile"]
+ default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
+ request = mock.create_autospec(transport.Request)
+ creds = credentials.Credentials(
+ token=None,
+ refresh_token=None,
+ token_uri=None,
+ client_id=None,
+ client_secret=None,
+ rapt_token=None,
+ scopes=scopes,
+ default_scopes=default_scopes,
+ refresh_handler=refresh_handler,
+ )
+
+ with pytest.raises(
+ exceptions.RefreshError, match="returned expiry is not a datetime object"
+ ):
+ creds.refresh(request)
+
+ assert creds.token is None
+ assert creds.expiry is None
+ assert not creds.valid
+ # Confirm refresh handler called with the expected arguments.
+ refresh_handler.assert_called_with(request, scopes=scopes)
+
+ @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+ def test_refresh_with_refresh_handler_expired_token(self, unused_utcnow):
+ expected_expiry = datetime.datetime.min + _helpers.CLOCK_SKEW
+ # Simulate refresh handler returns an expired token.
+ refresh_handler = mock.Mock(return_value=("TOKEN", expected_expiry))
+ scopes = ["email", "profile"]
+ default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
+ request = mock.create_autospec(transport.Request)
+ creds = credentials.Credentials(
+ token=None,
+ refresh_token=None,
+ token_uri=None,
+ client_id=None,
+ client_secret=None,
+ rapt_token=None,
+ scopes=scopes,
+ default_scopes=default_scopes,
+ refresh_handler=refresh_handler,
+ )
+
+ with pytest.raises(exceptions.RefreshError, match="already expired"):
+ creds.refresh(request)
+
+ assert creds.token is None
+ assert creds.expiry is None
+ assert not creds.valid
+ # Confirm refresh handler called with the expected arguments.
+ refresh_handler.assert_called_with(request, scopes=scopes)
+
+ @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
+ @mock.patch(
+ "google.auth._helpers.utcnow",
+ return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
+ )
+ def test_credentials_with_scopes_requested_refresh_success(
+ self, unused_utcnow, refresh_grant
+ ):
+ scopes = ["email", "profile"]
+ default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
+ token = "token"
+ new_rapt_token = "new_rapt_token"
+ expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
+ grant_response = {"id_token": mock.sentinel.id_token, "scope": "email profile"}
+ refresh_grant.return_value = (
+ # Access token
+ token,
+ # New refresh token
+ None,
+ # Expiry,
+ expiry,
+ # Extra data
+ grant_response,
+ # rapt token
+ new_rapt_token,
+ )
+
+ request = mock.create_autospec(transport.Request)
+ creds = credentials.Credentials(
+ token=None,
+ refresh_token=self.REFRESH_TOKEN,
+ token_uri=self.TOKEN_URI,
+ client_id=self.CLIENT_ID,
+ client_secret=self.CLIENT_SECRET,
+ scopes=scopes,
+ default_scopes=default_scopes,
+ rapt_token=self.RAPT_TOKEN,
+ )
+
+ # Refresh credentials
+ creds.refresh(request)
+
+ # Check jwt grant call.
+ refresh_grant.assert_called_with(
+ request,
+ self.TOKEN_URI,
+ self.REFRESH_TOKEN,
+ self.CLIENT_ID,
+ self.CLIENT_SECRET,
+ scopes,
+ self.RAPT_TOKEN,
+ )
+
+ # Check that the credentials have the token and expiry
+ assert creds.token == token
+ assert creds.expiry == expiry
+ assert creds.id_token == mock.sentinel.id_token
+ assert creds.has_scopes(scopes)
+ assert creds.rapt_token == new_rapt_token
+
+ # Check that the credentials are valid (have a token and are not
+ # expired.)
+ assert creds.valid
+
+ @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
+ @mock.patch(
+ "google.auth._helpers.utcnow",
+ return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
+ )
+ def test_credentials_with_only_default_scopes_requested(
+ self, unused_utcnow, refresh_grant
+ ):
+ default_scopes = ["email", "profile"]
+ token = "token"
+ new_rapt_token = "new_rapt_token"
+ expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
+ grant_response = {"id_token": mock.sentinel.id_token}
+ refresh_grant.return_value = (
+ # Access token
+ token,
+ # New refresh token
+ None,
+ # Expiry,
+ expiry,
+ # Extra data
+ grant_response,
+ # rapt token
+ new_rapt_token,
+ )
+
+ request = mock.create_autospec(transport.Request)
+ creds = credentials.Credentials(
+ token=None,
+ refresh_token=self.REFRESH_TOKEN,
+ token_uri=self.TOKEN_URI,
+ client_id=self.CLIENT_ID,
+ client_secret=self.CLIENT_SECRET,
+ default_scopes=default_scopes,
+ rapt_token=self.RAPT_TOKEN,
+ )
+
+ # Refresh credentials
+ creds.refresh(request)
+
+ # Check jwt grant call.
+ refresh_grant.assert_called_with(
+ request,
+ self.TOKEN_URI,
+ self.REFRESH_TOKEN,
+ self.CLIENT_ID,
+ self.CLIENT_SECRET,
+ default_scopes,
+ self.RAPT_TOKEN,
+ )
+
+ # Check that the credentials have the token and expiry
+ assert creds.token == token
+ assert creds.expiry == expiry
+ assert creds.id_token == mock.sentinel.id_token
+ assert creds.has_scopes(default_scopes)
+ assert creds.rapt_token == new_rapt_token
+
+ # Check that the credentials are valid (have a token and are not
+ # expired.)
+ assert creds.valid
+
+ @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
+ @mock.patch(
+ "google.auth._helpers.utcnow",
+ return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
+ )
+ def test_credentials_with_scopes_returned_refresh_success(
+ self, unused_utcnow, refresh_grant
+ ):
+ scopes = ["email", "profile"]
+ token = "token"
+ new_rapt_token = "new_rapt_token"
+ expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
+ grant_response = {
+ "id_token": mock.sentinel.id_token,
+ "scopes": " ".join(scopes),
+ }
+ refresh_grant.return_value = (
+ # Access token
+ token,
+ # New refresh token
+ None,
+ # Expiry,
+ expiry,
+ # Extra data
+ grant_response,
+ # rapt token
+ new_rapt_token,
+ )
+
+ request = mock.create_autospec(transport.Request)
+ creds = credentials.Credentials(
+ token=None,
+ refresh_token=self.REFRESH_TOKEN,
+ token_uri=self.TOKEN_URI,
+ client_id=self.CLIENT_ID,
+ client_secret=self.CLIENT_SECRET,
+ scopes=scopes,
+ rapt_token=self.RAPT_TOKEN,
+ )
+
+ # Refresh credentials
+ creds.refresh(request)
+
+ # Check jwt grant call.
+ refresh_grant.assert_called_with(
+ request,
+ self.TOKEN_URI,
+ self.REFRESH_TOKEN,
+ self.CLIENT_ID,
+ self.CLIENT_SECRET,
+ scopes,
+ self.RAPT_TOKEN,
+ )
+
+ # Check that the credentials have the token and expiry
+ assert creds.token == token
+ assert creds.expiry == expiry
+ assert creds.id_token == mock.sentinel.id_token
+ assert creds.has_scopes(scopes)
+ assert creds.rapt_token == new_rapt_token
+
+ # Check that the credentials are valid (have a token and are not
+ # expired.)
+ assert creds.valid
+
+ @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
+ @mock.patch(
+ "google.auth._helpers.utcnow",
+ return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
+ )
+ def test_credentials_with_scopes_refresh_failure_raises_refresh_error(
+ self, unused_utcnow, refresh_grant
+ ):
+ scopes = ["email", "profile"]
+ scopes_returned = ["email"]
+ token = "token"
+ new_rapt_token = "new_rapt_token"
+ expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
+ grant_response = {
+ "id_token": mock.sentinel.id_token,
+ "scope": " ".join(scopes_returned),
+ }
+ refresh_grant.return_value = (
+ # Access token
+ token,
+ # New refresh token
+ None,
+ # Expiry,
+ expiry,
+ # Extra data
+ grant_response,
+ # rapt token
+ new_rapt_token,
+ )
+
+ request = mock.create_autospec(transport.Request)
+ creds = credentials.Credentials(
+ token=None,
+ refresh_token=self.REFRESH_TOKEN,
+ token_uri=self.TOKEN_URI,
+ client_id=self.CLIENT_ID,
+ client_secret=self.CLIENT_SECRET,
+ scopes=scopes,
+ rapt_token=self.RAPT_TOKEN,
+ )
+
+ # Refresh credentials
+ with pytest.raises(
+ exceptions.RefreshError, match="Not all requested scopes were granted"
+ ):
+ creds.refresh(request)
+
+ # Check jwt grant call.
+ refresh_grant.assert_called_with(
+ request,
+ self.TOKEN_URI,
+ self.REFRESH_TOKEN,
+ self.CLIENT_ID,
+ self.CLIENT_SECRET,
+ scopes,
+ self.RAPT_TOKEN,
+ )
+
+ # Check that the credentials have the token and expiry
+ assert creds.token == token
+ assert creds.expiry == expiry
+ assert creds.id_token == mock.sentinel.id_token
+ assert creds.has_scopes(scopes)
+ assert creds.rapt_token == new_rapt_token
+
+ # Check that the credentials are valid (have a token and are not
+ # expired.)
+ assert creds.valid
+
+ def test_apply_with_quota_project_id(self):
+ creds = credentials.Credentials(
+ token="token",
+ refresh_token=self.REFRESH_TOKEN,
+ token_uri=self.TOKEN_URI,
+ client_id=self.CLIENT_ID,
+ client_secret=self.CLIENT_SECRET,
+ quota_project_id="quota-project-123",
+ )
+
+ headers = {}
+ creds.apply(headers)
+ assert headers["x-goog-user-project"] == "quota-project-123"
+ assert "token" in headers["authorization"]
+
+ def test_apply_with_no_quota_project_id(self):
+ creds = credentials.Credentials(
+ token="token",
+ refresh_token=self.REFRESH_TOKEN,
+ token_uri=self.TOKEN_URI,
+ client_id=self.CLIENT_ID,
+ client_secret=self.CLIENT_SECRET,
+ )
+
+ headers = {}
+ creds.apply(headers)
+ assert "x-goog-user-project" not in headers
+ assert "token" in headers["authorization"]
+
+ def test_with_quota_project(self):
+ creds = credentials.Credentials(
+ token="token",
+ refresh_token=self.REFRESH_TOKEN,
+ token_uri=self.TOKEN_URI,
+ client_id=self.CLIENT_ID,
+ client_secret=self.CLIENT_SECRET,
+ quota_project_id="quota-project-123",
+ )
+
+ new_creds = creds.with_quota_project("new-project-456")
+ assert new_creds.quota_project_id == "new-project-456"
+ headers = {}
+ creds.apply(headers)
+ assert "x-goog-user-project" in headers
+
+ def test_from_authorized_user_info(self):
+ info = AUTH_USER_INFO.copy()
+
+ creds = credentials.Credentials.from_authorized_user_info(info)
+ assert creds.client_secret == info["client_secret"]
+ assert creds.client_id == info["client_id"]
+ assert creds.refresh_token == info["refresh_token"]
+ assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
+ assert creds.scopes is None
+
+ scopes = ["email", "profile"]
+ creds = credentials.Credentials.from_authorized_user_info(info, scopes)
+ assert creds.client_secret == info["client_secret"]
+ assert creds.client_id == info["client_id"]
+ assert creds.refresh_token == info["refresh_token"]
+ assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
+ assert creds.scopes == scopes
+
+ info["scopes"] = "email" # single non-array scope from file
+ creds = credentials.Credentials.from_authorized_user_info(info)
+ assert creds.scopes == [info["scopes"]]
+
+ info["scopes"] = ["email", "profile"] # array scope from file
+ creds = credentials.Credentials.from_authorized_user_info(info)
+ assert creds.scopes == info["scopes"]
+
+ expiry = datetime.datetime(2020, 8, 14, 15, 54, 1)
+ info["expiry"] = expiry.isoformat() + "Z"
+ creds = credentials.Credentials.from_authorized_user_info(info)
+ assert creds.expiry == expiry
+ assert creds.expired
+
+ def test_from_authorized_user_file(self):
+ info = AUTH_USER_INFO.copy()
+
+ creds = credentials.Credentials.from_authorized_user_file(AUTH_USER_JSON_FILE)
+ assert creds.client_secret == info["client_secret"]
+ assert creds.client_id == info["client_id"]
+ assert creds.refresh_token == info["refresh_token"]
+ assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
+ assert creds.scopes is None
+
+ scopes = ["email", "profile"]
+ creds = credentials.Credentials.from_authorized_user_file(
+ AUTH_USER_JSON_FILE, scopes
+ )
+ assert creds.client_secret == info["client_secret"]
+ assert creds.client_id == info["client_id"]
+ assert creds.refresh_token == info["refresh_token"]
+ assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
+ assert creds.scopes == scopes
+
+ def test_to_json(self):
+ info = AUTH_USER_INFO.copy()
+ expiry = datetime.datetime(2020, 8, 14, 15, 54, 1)
+ info["expiry"] = expiry.isoformat() + "Z"
+ creds = credentials.Credentials.from_authorized_user_info(info)
+ assert creds.expiry == expiry
+
+ # Test with no `strip` arg
+ json_output = creds.to_json()
+ json_asdict = json.loads(json_output)
+ assert json_asdict.get("token") == creds.token
+ assert json_asdict.get("refresh_token") == creds.refresh_token
+ assert json_asdict.get("token_uri") == creds.token_uri
+ assert json_asdict.get("client_id") == creds.client_id
+ assert json_asdict.get("scopes") == creds.scopes
+ assert json_asdict.get("client_secret") == creds.client_secret
+ assert json_asdict.get("expiry") == info["expiry"]
+
+ # Test with a `strip` arg
+ json_output = creds.to_json(strip=["client_secret"])
+ json_asdict = json.loads(json_output)
+ assert json_asdict.get("token") == creds.token
+ assert json_asdict.get("refresh_token") == creds.refresh_token
+ assert json_asdict.get("token_uri") == creds.token_uri
+ assert json_asdict.get("client_id") == creds.client_id
+ assert json_asdict.get("scopes") == creds.scopes
+ assert json_asdict.get("client_secret") is None
+
+ # Test with no expiry
+ creds.expiry = None
+ json_output = creds.to_json()
+ json_asdict = json.loads(json_output)
+ assert json_asdict.get("expiry") is None
+
+ def test_pickle_and_unpickle(self):
+ creds = self.make_credentials()
+ unpickled = pickle.loads(pickle.dumps(creds))
+
+ # make sure attributes aren't lost during pickling
+ assert list(creds.__dict__).sort() == list(unpickled.__dict__).sort()
+
+ for attr in list(creds.__dict__):
+ assert getattr(creds, attr) == getattr(unpickled, attr)
+
+ def test_pickle_and_unpickle_with_refresh_handler(self):
+ expected_expiry = _helpers.utcnow() + datetime.timedelta(seconds=2800)
+ refresh_handler = mock.Mock(return_value=("TOKEN", expected_expiry))
+
+ creds = credentials.Credentials(
+ token=None,
+ refresh_token=None,
+ token_uri=None,
+ client_id=None,
+ client_secret=None,
+ rapt_token=None,
+ refresh_handler=refresh_handler,
+ )
+ unpickled = pickle.loads(pickle.dumps(creds))
+
+ # make sure attributes aren't lost during pickling
+ assert list(creds.__dict__).sort() == list(unpickled.__dict__).sort()
+
+ for attr in list(creds.__dict__):
+ # For the _refresh_handler property, the unpickled creds should be
+ # set to None.
+ if attr == "_refresh_handler":
+ assert getattr(unpickled, attr) is None
+ else:
+ assert getattr(creds, attr) == getattr(unpickled, attr)
+
+ def test_pickle_with_missing_attribute(self):
+ creds = self.make_credentials()
+
+ # remove an optional attribute before pickling
+ # this mimics a pickle created with a previous class definition with
+ # fewer attributes
+ del creds.__dict__["_quota_project_id"]
+
+ unpickled = pickle.loads(pickle.dumps(creds))
+
+ # Attribute should be initialized by `__setstate__`
+ assert unpickled.quota_project_id is None
+
+ # pickles are not compatible across versions
+ @pytest.mark.skipif(
+ sys.version_info < (3, 5),
+ reason="pickle file can only be loaded with Python >= 3.5",
+ )
+ def test_unpickle_old_credentials_pickle(self):
+ # make sure a credentials file pickled with an older
+ # library version (google-auth==1.5.1) can be unpickled
+ with open(
+ os.path.join(DATA_DIR, "old_oauth_credentials_py3.pickle"), "rb"
+ ) as f:
+ credentials = pickle.load(f)
+ assert credentials.quota_project_id is None
+
+
+class TestUserAccessTokenCredentials(object):
+ def test_instance(self):
+ cred = credentials.UserAccessTokenCredentials()
+ assert cred._account is None
+
+ cred = cred.with_account("account")
+ assert cred._account == "account"
+
+ @mock.patch("google.auth._cloud_sdk.get_auth_access_token", autospec=True)
+ def test_refresh(self, get_auth_access_token):
+ get_auth_access_token.return_value = "access_token"
+ cred = credentials.UserAccessTokenCredentials()
+ cred.refresh(None)
+ assert cred.token == "access_token"
+
+ def test_with_quota_project(self):
+ cred = credentials.UserAccessTokenCredentials()
+ quota_project_cred = cred.with_quota_project("project-foo")
+
+ assert quota_project_cred._quota_project_id == "project-foo"
+ assert quota_project_cred._account == cred._account
+
+ @mock.patch(
+ "google.oauth2.credentials.UserAccessTokenCredentials.apply", autospec=True
+ )
+ @mock.patch(
+ "google.oauth2.credentials.UserAccessTokenCredentials.refresh", autospec=True
+ )
+ def test_before_request(self, refresh, apply):
+ cred = credentials.UserAccessTokenCredentials()
+ cred.before_request(mock.Mock(), "GET", "https://example.com", {})
+ refresh.assert_called()
+ apply.assert_called()
diff --git a/contrib/python/google-auth/py2/tests/oauth2/test_id_token.py b/contrib/python/google-auth/py2/tests/oauth2/test_id_token.py
new file mode 100644
index 0000000000..9576f2562a
--- /dev/null
+++ b/contrib/python/google-auth/py2/tests/oauth2/test_id_token.py
@@ -0,0 +1,228 @@
+# Copyright 2014 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+
+import mock
+import pytest
+
+from google.auth import environment_vars
+from google.auth import exceptions
+from google.auth import transport
+import google.auth.compute_engine._metadata
+from google.oauth2 import id_token
+from google.oauth2 import service_account
+
+import yatest.common
+SERVICE_ACCOUNT_FILE = os.path.join(
+ yatest.common.test_source_path(), "data/service_account.json"
+)
+
+
+def make_request(status, data=None):
+ response = mock.create_autospec(transport.Response, instance=True)
+ response.status = status
+
+ if data is not None:
+ response.data = json.dumps(data).encode("utf-8")
+
+ request = mock.create_autospec(transport.Request)
+ request.return_value = response
+ return request
+
+
+def test__fetch_certs_success():
+ certs = {"1": "cert"}
+ request = make_request(200, certs)
+
+ returned_certs = id_token._fetch_certs(request, mock.sentinel.cert_url)
+
+ request.assert_called_once_with(mock.sentinel.cert_url, method="GET")
+ assert returned_certs == certs
+
+
+def test__fetch_certs_failure():
+ request = make_request(404)
+
+ with pytest.raises(exceptions.TransportError):
+ id_token._fetch_certs(request, mock.sentinel.cert_url)
+
+ request.assert_called_once_with(mock.sentinel.cert_url, method="GET")
+
+
+@mock.patch("google.auth.jwt.decode", autospec=True)
+@mock.patch("google.oauth2.id_token._fetch_certs", autospec=True)
+def test_verify_token(_fetch_certs, decode):
+ result = id_token.verify_token(mock.sentinel.token, mock.sentinel.request)
+
+ assert result == decode.return_value
+ _fetch_certs.assert_called_once_with(
+ mock.sentinel.request, id_token._GOOGLE_OAUTH2_CERTS_URL
+ )
+ decode.assert_called_once_with(
+ mock.sentinel.token, certs=_fetch_certs.return_value, audience=None
+ )
+
+
+@mock.patch("google.auth.jwt.decode", autospec=True)
+@mock.patch("google.oauth2.id_token._fetch_certs", autospec=True)
+def test_verify_token_args(_fetch_certs, decode):
+ result = id_token.verify_token(
+ mock.sentinel.token,
+ mock.sentinel.request,
+ audience=mock.sentinel.audience,
+ certs_url=mock.sentinel.certs_url,
+ )
+
+ assert result == decode.return_value
+ _fetch_certs.assert_called_once_with(mock.sentinel.request, mock.sentinel.certs_url)
+ decode.assert_called_once_with(
+ mock.sentinel.token,
+ certs=_fetch_certs.return_value,
+ audience=mock.sentinel.audience,
+ )
+
+
+@mock.patch("google.oauth2.id_token.verify_token", autospec=True)
+def test_verify_oauth2_token(verify_token):
+ verify_token.return_value = {"iss": "accounts.google.com"}
+ result = id_token.verify_oauth2_token(
+ mock.sentinel.token, mock.sentinel.request, audience=mock.sentinel.audience
+ )
+
+ assert result == verify_token.return_value
+ verify_token.assert_called_once_with(
+ mock.sentinel.token,
+ mock.sentinel.request,
+ audience=mock.sentinel.audience,
+ certs_url=id_token._GOOGLE_OAUTH2_CERTS_URL,
+ )
+
+
+@mock.patch("google.oauth2.id_token.verify_token", autospec=True)
+def test_verify_oauth2_token_invalid_iss(verify_token):
+ verify_token.return_value = {"iss": "invalid_issuer"}
+
+ with pytest.raises(exceptions.GoogleAuthError):
+ id_token.verify_oauth2_token(
+ mock.sentinel.token, mock.sentinel.request, audience=mock.sentinel.audience
+ )
+
+
+@mock.patch("google.oauth2.id_token.verify_token", autospec=True)
+def test_verify_firebase_token(verify_token):
+ result = id_token.verify_firebase_token(
+ mock.sentinel.token, mock.sentinel.request, audience=mock.sentinel.audience
+ )
+
+ assert result == verify_token.return_value
+ verify_token.assert_called_once_with(
+ mock.sentinel.token,
+ mock.sentinel.request,
+ audience=mock.sentinel.audience,
+ certs_url=id_token._GOOGLE_APIS_CERTS_URL,
+ )
+
+
+def test_fetch_id_token_from_metadata_server(monkeypatch):
+ monkeypatch.delenv(environment_vars.CREDENTIALS, raising=False)
+
+ def mock_init(self, request, audience, use_metadata_identity_endpoint):
+ assert use_metadata_identity_endpoint
+ self.token = "id_token"
+
+ with mock.patch("google.auth.compute_engine._metadata.ping", return_value=True):
+ with mock.patch.multiple(
+ google.auth.compute_engine.IDTokenCredentials,
+ __init__=mock_init,
+ refresh=mock.Mock(),
+ ):
+ request = mock.Mock()
+ token = id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
+ assert token == "id_token"
+
+
+def test_fetch_id_token_from_explicit_cred_json_file(monkeypatch):
+ monkeypatch.setenv(environment_vars.CREDENTIALS, SERVICE_ACCOUNT_FILE)
+
+ def mock_refresh(self, request):
+ self.token = "id_token"
+
+ with mock.patch.object(service_account.IDTokenCredentials, "refresh", mock_refresh):
+ request = mock.Mock()
+ token = id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
+ assert token == "id_token"
+
+
+def test_fetch_id_token_no_cred_exists(monkeypatch):
+ monkeypatch.delenv(environment_vars.CREDENTIALS, raising=False)
+
+ with mock.patch(
+ "google.auth.compute_engine._metadata.ping",
+ side_effect=exceptions.TransportError(),
+ ):
+ with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
+ request = mock.Mock()
+ id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
+ assert excinfo.match(
+ r"Neither metadata server or valid service account credentials are found."
+ )
+
+ with mock.patch("google.auth.compute_engine._metadata.ping", return_value=False):
+ with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
+ request = mock.Mock()
+ id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
+ assert excinfo.match(
+ r"Neither metadata server or valid service account credentials are found."
+ )
+
+
+def test_fetch_id_token_invalid_cred_file_type(monkeypatch):
+ user_credentials_file = os.path.join(
+ yatest.common.test_source_path(), "data/authorized_user.json"
+ )
+ monkeypatch.setenv(environment_vars.CREDENTIALS, user_credentials_file)
+
+ with mock.patch("google.auth.compute_engine._metadata.ping", return_value=False):
+ with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
+ request = mock.Mock()
+ id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
+ assert excinfo.match(
+ r"Neither metadata server or valid service account credentials are found."
+ )
+
+
+def test_fetch_id_token_invalid_json(monkeypatch):
+ not_json_file = os.path.join(yatest.common.test_source_path(), "data/public_cert.pem")
+ monkeypatch.setenv(environment_vars.CREDENTIALS, not_json_file)
+
+ with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
+ request = mock.Mock()
+ id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
+ assert excinfo.match(
+ r"GOOGLE_APPLICATION_CREDENTIALS is not valid service account credentials."
+ )
+
+
+def test_fetch_id_token_invalid_cred_path(monkeypatch):
+ not_json_file = os.path.join(yatest.common.test_source_path(), "data/not_exists.json")
+ monkeypatch.setenv(environment_vars.CREDENTIALS, not_json_file)
+
+ with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
+ request = mock.Mock()
+ id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
+ assert excinfo.match(
+ r"GOOGLE_APPLICATION_CREDENTIALS path is either not found or invalid."
+ )
diff --git a/contrib/python/google-auth/py2/tests/oauth2/test_reauth.py b/contrib/python/google-auth/py2/tests/oauth2/test_reauth.py
new file mode 100644
index 0000000000..7876986873
--- /dev/null
+++ b/contrib/python/google-auth/py2/tests/oauth2/test_reauth.py
@@ -0,0 +1,308 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy
+
+import mock
+import pytest
+
+from google.auth import exceptions
+from google.oauth2 import reauth
+
+
+MOCK_REQUEST = mock.Mock()
+CHALLENGES_RESPONSE_TEMPLATE = {
+ "status": "CHALLENGE_REQUIRED",
+ "sessionId": "123",
+ "challenges": [
+ {
+ "status": "READY",
+ "challengeId": 1,
+ "challengeType": "PASSWORD",
+ "securityKey": {},
+ }
+ ],
+}
+CHALLENGES_RESPONSE_AUTHENTICATED = {
+ "status": "AUTHENTICATED",
+ "sessionId": "123",
+ "encodedProofOfReauthToken": "new_rapt_token",
+}
+
+
+class MockChallenge(object):
+ def __init__(self, name, locally_eligible, challenge_input):
+ self.name = name
+ self.is_locally_eligible = locally_eligible
+ self.challenge_input = challenge_input
+
+ def obtain_challenge_input(self, metadata):
+ return self.challenge_input
+
+
+def _test_is_interactive():
+ with mock.patch("sys.stdin.isatty", return_value=True):
+ assert reauth.is_interactive()
+
+
+def test__get_challenges():
+ with mock.patch(
+ "google.oauth2._client._token_endpoint_request"
+ ) as mock_token_endpoint_request:
+ reauth._get_challenges(MOCK_REQUEST, ["SAML"], "token")
+ mock_token_endpoint_request.assert_called_with(
+ MOCK_REQUEST,
+ reauth._REAUTH_API + ":start",
+ {"supportedChallengeTypes": ["SAML"]},
+ access_token="token",
+ use_json=True,
+ )
+
+
+def test__get_challenges_with_scopes():
+ with mock.patch(
+ "google.oauth2._client._token_endpoint_request"
+ ) as mock_token_endpoint_request:
+ reauth._get_challenges(
+ MOCK_REQUEST, ["SAML"], "token", requested_scopes=["scope"]
+ )
+ mock_token_endpoint_request.assert_called_with(
+ MOCK_REQUEST,
+ reauth._REAUTH_API + ":start",
+ {
+ "supportedChallengeTypes": ["SAML"],
+ "oauthScopesForDomainPolicyLookup": ["scope"],
+ },
+ access_token="token",
+ use_json=True,
+ )
+
+
+def test__send_challenge_result():
+ with mock.patch(
+ "google.oauth2._client._token_endpoint_request"
+ ) as mock_token_endpoint_request:
+ reauth._send_challenge_result(
+ MOCK_REQUEST, "123", "1", {"credential": "password"}, "token"
+ )
+ mock_token_endpoint_request.assert_called_with(
+ MOCK_REQUEST,
+ reauth._REAUTH_API + "/123:continue",
+ {
+ "sessionId": "123",
+ "challengeId": "1",
+ "action": "RESPOND",
+ "proposalResponse": {"credential": "password"},
+ },
+ access_token="token",
+ use_json=True,
+ )
+
+
+def test__run_next_challenge_not_ready():
+ challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE)
+ challenges_response["challenges"][0]["status"] = "STATUS_UNSPECIFIED"
+ assert (
+ reauth._run_next_challenge(challenges_response, MOCK_REQUEST, "token") is None
+ )
+
+
+def test__run_next_challenge_not_supported():
+ challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE)
+ challenges_response["challenges"][0]["challengeType"] = "CHALLENGE_TYPE_UNSPECIFIED"
+ with pytest.raises(exceptions.ReauthFailError) as excinfo:
+ reauth._run_next_challenge(challenges_response, MOCK_REQUEST, "token")
+ assert excinfo.match(r"Unsupported challenge type CHALLENGE_TYPE_UNSPECIFIED")
+
+
+def test__run_next_challenge_not_locally_eligible():
+ mock_challenge = MockChallenge("PASSWORD", False, "challenge_input")
+ with mock.patch(
+ "google.oauth2.challenges.AVAILABLE_CHALLENGES", {"PASSWORD": mock_challenge}
+ ):
+ with pytest.raises(exceptions.ReauthFailError) as excinfo:
+ reauth._run_next_challenge(
+ CHALLENGES_RESPONSE_TEMPLATE, MOCK_REQUEST, "token"
+ )
+ assert excinfo.match(r"Challenge PASSWORD is not locally eligible")
+
+
+def test__run_next_challenge_no_challenge_input():
+ mock_challenge = MockChallenge("PASSWORD", True, None)
+ with mock.patch(
+ "google.oauth2.challenges.AVAILABLE_CHALLENGES", {"PASSWORD": mock_challenge}
+ ):
+ assert (
+ reauth._run_next_challenge(
+ CHALLENGES_RESPONSE_TEMPLATE, MOCK_REQUEST, "token"
+ )
+ is None
+ )
+
+
+def test__run_next_challenge_success():
+ mock_challenge = MockChallenge("PASSWORD", True, {"credential": "password"})
+ with mock.patch(
+ "google.oauth2.challenges.AVAILABLE_CHALLENGES", {"PASSWORD": mock_challenge}
+ ):
+ with mock.patch(
+ "google.oauth2.reauth._send_challenge_result"
+ ) as mock_send_challenge_result:
+ reauth._run_next_challenge(
+ CHALLENGES_RESPONSE_TEMPLATE, MOCK_REQUEST, "token"
+ )
+ mock_send_challenge_result.assert_called_with(
+ MOCK_REQUEST, "123", 1, {"credential": "password"}, "token"
+ )
+
+
+def test__obtain_rapt_authenticated():
+ with mock.patch(
+ "google.oauth2.reauth._get_challenges",
+ return_value=CHALLENGES_RESPONSE_AUTHENTICATED,
+ ):
+ assert reauth._obtain_rapt(MOCK_REQUEST, "token", None) == "new_rapt_token"
+
+
+def test__obtain_rapt_authenticated_after_run_next_challenge():
+ with mock.patch(
+ "google.oauth2.reauth._get_challenges",
+ return_value=CHALLENGES_RESPONSE_TEMPLATE,
+ ):
+ with mock.patch(
+ "google.oauth2.reauth._run_next_challenge",
+ side_effect=[
+ CHALLENGES_RESPONSE_TEMPLATE,
+ CHALLENGES_RESPONSE_AUTHENTICATED,
+ ],
+ ):
+ with mock.patch("google.oauth2.reauth.is_interactive", return_value=True):
+ assert (
+ reauth._obtain_rapt(MOCK_REQUEST, "token", None) == "new_rapt_token"
+ )
+
+
+def test__obtain_rapt_unsupported_status():
+ challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE)
+ challenges_response["status"] = "STATUS_UNSPECIFIED"
+ with mock.patch(
+ "google.oauth2.reauth._get_challenges", return_value=challenges_response
+ ):
+ with pytest.raises(exceptions.ReauthFailError) as excinfo:
+ reauth._obtain_rapt(MOCK_REQUEST, "token", None)
+ assert excinfo.match(r"API error: STATUS_UNSPECIFIED")
+
+
+def test__obtain_rapt_not_interactive():
+ with mock.patch(
+ "google.oauth2.reauth._get_challenges",
+ return_value=CHALLENGES_RESPONSE_TEMPLATE,
+ ):
+ with mock.patch("google.oauth2.reauth.is_interactive", return_value=False):
+ with pytest.raises(exceptions.ReauthFailError) as excinfo:
+ reauth._obtain_rapt(MOCK_REQUEST, "token", None)
+ assert excinfo.match(r"not in an interactive session")
+
+
+def test__obtain_rapt_not_authenticated():
+ with mock.patch(
+ "google.oauth2.reauth._get_challenges",
+ return_value=CHALLENGES_RESPONSE_TEMPLATE,
+ ):
+ with mock.patch("google.oauth2.reauth.RUN_CHALLENGE_RETRY_LIMIT", 0):
+ with pytest.raises(exceptions.ReauthFailError) as excinfo:
+ reauth._obtain_rapt(MOCK_REQUEST, "token", None)
+ assert excinfo.match(r"Reauthentication failed")
+
+
+def test_get_rapt_token():
+ with mock.patch(
+ "google.oauth2._client.refresh_grant", return_value=("token", None, None, None)
+ ) as mock_refresh_grant:
+ with mock.patch(
+ "google.oauth2.reauth._obtain_rapt", return_value="new_rapt_token"
+ ) as mock_obtain_rapt:
+ assert (
+ reauth.get_rapt_token(
+ MOCK_REQUEST,
+ "client_id",
+ "client_secret",
+ "refresh_token",
+ "token_uri",
+ )
+ == "new_rapt_token"
+ )
+ mock_refresh_grant.assert_called_with(
+ request=MOCK_REQUEST,
+ client_id="client_id",
+ client_secret="client_secret",
+ refresh_token="refresh_token",
+ token_uri="token_uri",
+ scopes=[reauth._REAUTH_SCOPE],
+ )
+ mock_obtain_rapt.assert_called_with(
+ MOCK_REQUEST, "token", requested_scopes=None
+ )
+
+
+def test_refresh_grant_failed():
+ with mock.patch(
+ "google.oauth2._client._token_endpoint_request_no_throw"
+ ) as mock_token_request:
+ mock_token_request.return_value = (False, {"error": "Bad request"})
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ reauth.refresh_grant(
+ MOCK_REQUEST,
+ "token_uri",
+ "refresh_token",
+ "client_id",
+ "client_secret",
+ scopes=["foo", "bar"],
+ rapt_token="rapt_token",
+ )
+ assert excinfo.match(r"Bad request")
+ mock_token_request.assert_called_with(
+ MOCK_REQUEST,
+ "token_uri",
+ {
+ "grant_type": "refresh_token",
+ "client_id": "client_id",
+ "client_secret": "client_secret",
+ "refresh_token": "refresh_token",
+ "scope": "foo bar",
+ "rapt": "rapt_token",
+ },
+ )
+
+
+def test_refresh_grant_success():
+ with mock.patch(
+ "google.oauth2._client._token_endpoint_request_no_throw"
+ ) as mock_token_request:
+ mock_token_request.side_effect = [
+ (False, {"error": "invalid_grant", "error_subtype": "rapt_required"}),
+ (True, {"access_token": "access_token"}),
+ ]
+ with mock.patch(
+ "google.oauth2.reauth.get_rapt_token", return_value="new_rapt_token"
+ ):
+ assert reauth.refresh_grant(
+ MOCK_REQUEST, "token_uri", "refresh_token", "client_id", "client_secret"
+ ) == (
+ "access_token",
+ "refresh_token",
+ None,
+ {"access_token": "access_token"},
+ "new_rapt_token",
+ )
diff --git a/contrib/python/google-auth/py2/tests/oauth2/test_service_account.py b/contrib/python/google-auth/py2/tests/oauth2/test_service_account.py
new file mode 100644
index 0000000000..a5d59dd713
--- /dev/null
+++ b/contrib/python/google-auth/py2/tests/oauth2/test_service_account.py
@@ -0,0 +1,433 @@
+# Copyright 2016 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import json
+import os
+
+import mock
+
+from google.auth import _helpers
+from google.auth import crypt
+from google.auth import jwt
+from google.auth import transport
+from google.oauth2 import service_account
+
+
+import yatest.common
+DATA_DIR = os.path.join(yatest.common.test_source_path(), "data")
+
+with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
+ PRIVATE_KEY_BYTES = fh.read()
+
+with open(os.path.join(DATA_DIR, "public_cert.pem"), "rb") as fh:
+ PUBLIC_CERT_BYTES = fh.read()
+
+with open(os.path.join(DATA_DIR, "other_cert.pem"), "rb") as fh:
+ OTHER_CERT_BYTES = fh.read()
+
+SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json")
+
+with open(SERVICE_ACCOUNT_JSON_FILE, "r") as fh:
+ SERVICE_ACCOUNT_INFO = json.load(fh)
+
+SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")
+
+
+class TestCredentials(object):
+ SERVICE_ACCOUNT_EMAIL = "service-account@example.com"
+ TOKEN_URI = "https://example.com/oauth2/token"
+
+ @classmethod
+ def make_credentials(cls):
+ return service_account.Credentials(
+ SIGNER, cls.SERVICE_ACCOUNT_EMAIL, cls.TOKEN_URI
+ )
+
+ def test_from_service_account_info(self):
+ credentials = service_account.Credentials.from_service_account_info(
+ SERVICE_ACCOUNT_INFO
+ )
+
+ assert credentials._signer.key_id == SERVICE_ACCOUNT_INFO["private_key_id"]
+ assert credentials.service_account_email == SERVICE_ACCOUNT_INFO["client_email"]
+ assert credentials._token_uri == SERVICE_ACCOUNT_INFO["token_uri"]
+
+ def test_from_service_account_info_args(self):
+ info = SERVICE_ACCOUNT_INFO.copy()
+ scopes = ["email", "profile"]
+ subject = "subject"
+ additional_claims = {"meta": "data"}
+
+ credentials = service_account.Credentials.from_service_account_info(
+ info, scopes=scopes, subject=subject, additional_claims=additional_claims
+ )
+
+ assert credentials.service_account_email == info["client_email"]
+ assert credentials.project_id == info["project_id"]
+ assert credentials._signer.key_id == info["private_key_id"]
+ assert credentials._token_uri == info["token_uri"]
+ assert credentials._scopes == scopes
+ assert credentials._subject == subject
+ assert credentials._additional_claims == additional_claims
+
+ def test_from_service_account_file(self):
+ info = SERVICE_ACCOUNT_INFO.copy()
+
+ credentials = service_account.Credentials.from_service_account_file(
+ SERVICE_ACCOUNT_JSON_FILE
+ )
+
+ assert credentials.service_account_email == info["client_email"]
+ assert credentials.project_id == info["project_id"]
+ assert credentials._signer.key_id == info["private_key_id"]
+ assert credentials._token_uri == info["token_uri"]
+
+ def test_from_service_account_file_args(self):
+ info = SERVICE_ACCOUNT_INFO.copy()
+ scopes = ["email", "profile"]
+ subject = "subject"
+ additional_claims = {"meta": "data"}
+
+ credentials = service_account.Credentials.from_service_account_file(
+ SERVICE_ACCOUNT_JSON_FILE,
+ subject=subject,
+ scopes=scopes,
+ additional_claims=additional_claims,
+ )
+
+ assert credentials.service_account_email == info["client_email"]
+ assert credentials.project_id == info["project_id"]
+ assert credentials._signer.key_id == info["private_key_id"]
+ assert credentials._token_uri == info["token_uri"]
+ assert credentials._scopes == scopes
+ assert credentials._subject == subject
+ assert credentials._additional_claims == additional_claims
+
+ def test_default_state(self):
+ credentials = self.make_credentials()
+ assert not credentials.valid
+ # Expiration hasn't been set yet
+ assert not credentials.expired
+ # Scopes haven't been specified yet
+ assert credentials.requires_scopes
+
+ def test_sign_bytes(self):
+ credentials = self.make_credentials()
+ to_sign = b"123"
+ signature = credentials.sign_bytes(to_sign)
+ assert crypt.verify_signature(to_sign, signature, PUBLIC_CERT_BYTES)
+
+ def test_signer(self):
+ credentials = self.make_credentials()
+ assert isinstance(credentials.signer, crypt.Signer)
+
+ def test_signer_email(self):
+ credentials = self.make_credentials()
+ assert credentials.signer_email == self.SERVICE_ACCOUNT_EMAIL
+
+ def test_create_scoped(self):
+ credentials = self.make_credentials()
+ scopes = ["email", "profile"]
+ credentials = credentials.with_scopes(scopes)
+ assert credentials._scopes == scopes
+
+ def test_with_claims(self):
+ credentials = self.make_credentials()
+ new_credentials = credentials.with_claims({"meep": "moop"})
+ assert new_credentials._additional_claims == {"meep": "moop"}
+
+ def test_with_quota_project(self):
+ credentials = self.make_credentials()
+ new_credentials = credentials.with_quota_project("new-project-456")
+ assert new_credentials.quota_project_id == "new-project-456"
+ hdrs = {}
+ new_credentials.apply(hdrs, token="tok")
+ assert "x-goog-user-project" in hdrs
+
+ def test__make_authorization_grant_assertion(self):
+ credentials = self.make_credentials()
+ token = credentials._make_authorization_grant_assertion()
+ payload = jwt.decode(token, PUBLIC_CERT_BYTES)
+ assert payload["iss"] == self.SERVICE_ACCOUNT_EMAIL
+ assert payload["aud"] == service_account._GOOGLE_OAUTH2_TOKEN_ENDPOINT
+
+ def test__make_authorization_grant_assertion_scoped(self):
+ credentials = self.make_credentials()
+ scopes = ["email", "profile"]
+ credentials = credentials.with_scopes(scopes)
+ token = credentials._make_authorization_grant_assertion()
+ payload = jwt.decode(token, PUBLIC_CERT_BYTES)
+ assert payload["scope"] == "email profile"
+
+ def test__make_authorization_grant_assertion_subject(self):
+ credentials = self.make_credentials()
+ subject = "user@example.com"
+ credentials = credentials.with_subject(subject)
+ token = credentials._make_authorization_grant_assertion()
+ payload = jwt.decode(token, PUBLIC_CERT_BYTES)
+ assert payload["sub"] == subject
+
+ def test_apply_with_quota_project_id(self):
+ credentials = service_account.Credentials(
+ SIGNER,
+ self.SERVICE_ACCOUNT_EMAIL,
+ self.TOKEN_URI,
+ quota_project_id="quota-project-123",
+ )
+
+ headers = {}
+ credentials.apply(headers, token="token")
+
+ assert headers["x-goog-user-project"] == "quota-project-123"
+ assert "token" in headers["authorization"]
+
+ def test_apply_with_no_quota_project_id(self):
+ credentials = service_account.Credentials(
+ SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI
+ )
+
+ headers = {}
+ credentials.apply(headers, token="token")
+
+ assert "x-goog-user-project" not in headers
+ assert "token" in headers["authorization"]
+
+ @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
+ def test__create_self_signed_jwt(self, jwt):
+ credentials = service_account.Credentials(
+ SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI
+ )
+
+ audience = "https://pubsub.googleapis.com"
+ credentials._create_self_signed_jwt(audience)
+ jwt.from_signing_credentials.assert_called_once_with(credentials, audience)
+
+ @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
+ def test__create_self_signed_jwt_with_user_scopes(self, jwt):
+ credentials = service_account.Credentials(
+ SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI, scopes=["foo"]
+ )
+
+ audience = "https://pubsub.googleapis.com"
+ credentials._create_self_signed_jwt(audience)
+
+ # JWT should not be created if there are user-defined scopes
+ jwt.from_signing_credentials.assert_not_called()
+
+ @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
+ def test_refresh_success(self, jwt_grant):
+ credentials = self.make_credentials()
+ token = "token"
+ jwt_grant.return_value = (
+ token,
+ _helpers.utcnow() + datetime.timedelta(seconds=500),
+ {},
+ )
+ request = mock.create_autospec(transport.Request, instance=True)
+
+ # Refresh credentials
+ credentials.refresh(request)
+
+ # Check jwt grant call.
+ assert jwt_grant.called
+
+ called_request, token_uri, assertion = jwt_grant.call_args[0]
+ assert called_request == request
+ assert token_uri == credentials._token_uri
+ assert jwt.decode(assertion, PUBLIC_CERT_BYTES)
+ # No further assertion done on the token, as there are separate tests
+ # for checking the authorization grant assertion.
+
+ # Check that the credentials have the token.
+ assert credentials.token == token
+
+ # Check that the credentials are valid (have a token and are not
+ # expired)
+ assert credentials.valid
+
+ @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
+ def test_before_request_refreshes(self, jwt_grant):
+ credentials = self.make_credentials()
+ token = "token"
+ jwt_grant.return_value = (
+ token,
+ _helpers.utcnow() + datetime.timedelta(seconds=500),
+ None,
+ )
+ request = mock.create_autospec(transport.Request, instance=True)
+
+ # Credentials should start as invalid
+ assert not credentials.valid
+
+ # before_request should cause a refresh
+ credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
+
+ # The refresh endpoint should've been called.
+ assert jwt_grant.called
+
+ # Credentials should now be valid.
+ assert credentials.valid
+
+ @mock.patch("google.auth.jwt.Credentials._make_jwt")
+ def test_refresh_with_jwt_credentials(self, make_jwt):
+ credentials = self.make_credentials()
+ credentials._create_self_signed_jwt("https://pubsub.googleapis.com")
+
+ request = mock.create_autospec(transport.Request, instance=True)
+
+ token = "token"
+ expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
+ make_jwt.return_value = (token, expiry)
+
+ # Credentials should start as invalid
+ assert not credentials.valid
+
+ # before_request should cause a refresh
+ credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
+
+ # Credentials should now be valid.
+ assert credentials.valid
+
+ # Assert make_jwt was called
+ assert make_jwt.called_once()
+
+ assert credentials.token == token
+ assert credentials.expiry == expiry
+
+
+class TestIDTokenCredentials(object):
+ SERVICE_ACCOUNT_EMAIL = "service-account@example.com"
+ TOKEN_URI = "https://example.com/oauth2/token"
+ TARGET_AUDIENCE = "https://example.com"
+
+ @classmethod
+ def make_credentials(cls):
+ return service_account.IDTokenCredentials(
+ SIGNER, cls.SERVICE_ACCOUNT_EMAIL, cls.TOKEN_URI, cls.TARGET_AUDIENCE
+ )
+
+ def test_from_service_account_info(self):
+ credentials = service_account.IDTokenCredentials.from_service_account_info(
+ SERVICE_ACCOUNT_INFO, target_audience=self.TARGET_AUDIENCE
+ )
+
+ assert credentials._signer.key_id == SERVICE_ACCOUNT_INFO["private_key_id"]
+ assert credentials.service_account_email == SERVICE_ACCOUNT_INFO["client_email"]
+ assert credentials._token_uri == SERVICE_ACCOUNT_INFO["token_uri"]
+ assert credentials._target_audience == self.TARGET_AUDIENCE
+
+ def test_from_service_account_file(self):
+ info = SERVICE_ACCOUNT_INFO.copy()
+
+ credentials = service_account.IDTokenCredentials.from_service_account_file(
+ SERVICE_ACCOUNT_JSON_FILE, target_audience=self.TARGET_AUDIENCE
+ )
+
+ assert credentials.service_account_email == info["client_email"]
+ assert credentials._signer.key_id == info["private_key_id"]
+ assert credentials._token_uri == info["token_uri"]
+ assert credentials._target_audience == self.TARGET_AUDIENCE
+
+ def test_default_state(self):
+ credentials = self.make_credentials()
+ assert not credentials.valid
+ # Expiration hasn't been set yet
+ assert not credentials.expired
+
+ def test_sign_bytes(self):
+ credentials = self.make_credentials()
+ to_sign = b"123"
+ signature = credentials.sign_bytes(to_sign)
+ assert crypt.verify_signature(to_sign, signature, PUBLIC_CERT_BYTES)
+
+ def test_signer(self):
+ credentials = self.make_credentials()
+ assert isinstance(credentials.signer, crypt.Signer)
+
+ def test_signer_email(self):
+ credentials = self.make_credentials()
+ assert credentials.signer_email == self.SERVICE_ACCOUNT_EMAIL
+
+ def test_with_target_audience(self):
+ credentials = self.make_credentials()
+ new_credentials = credentials.with_target_audience("https://new.example.com")
+ assert new_credentials._target_audience == "https://new.example.com"
+
+ def test_with_quota_project(self):
+ credentials = self.make_credentials()
+ new_credentials = credentials.with_quota_project("project-foo")
+ assert new_credentials._quota_project_id == "project-foo"
+
+ def test__make_authorization_grant_assertion(self):
+ credentials = self.make_credentials()
+ token = credentials._make_authorization_grant_assertion()
+ payload = jwt.decode(token, PUBLIC_CERT_BYTES)
+ assert payload["iss"] == self.SERVICE_ACCOUNT_EMAIL
+ assert payload["aud"] == service_account._GOOGLE_OAUTH2_TOKEN_ENDPOINT
+ assert payload["target_audience"] == self.TARGET_AUDIENCE
+
+ @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
+ def test_refresh_success(self, id_token_jwt_grant):
+ credentials = self.make_credentials()
+ token = "token"
+ id_token_jwt_grant.return_value = (
+ token,
+ _helpers.utcnow() + datetime.timedelta(seconds=500),
+ {},
+ )
+ request = mock.create_autospec(transport.Request, instance=True)
+
+ # Refresh credentials
+ credentials.refresh(request)
+
+ # Check jwt grant call.
+ assert id_token_jwt_grant.called
+
+ called_request, token_uri, assertion = id_token_jwt_grant.call_args[0]
+ assert called_request == request
+ assert token_uri == credentials._token_uri
+ assert jwt.decode(assertion, PUBLIC_CERT_BYTES)
+ # No further assertion done on the token, as there are separate tests
+ # for checking the authorization grant assertion.
+
+ # Check that the credentials have the token.
+ assert credentials.token == token
+
+ # Check that the credentials are valid (have a token and are not
+ # expired)
+ assert credentials.valid
+
+ @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
+ def test_before_request_refreshes(self, id_token_jwt_grant):
+ credentials = self.make_credentials()
+ token = "token"
+ id_token_jwt_grant.return_value = (
+ token,
+ _helpers.utcnow() + datetime.timedelta(seconds=500),
+ None,
+ )
+ request = mock.create_autospec(transport.Request, instance=True)
+
+ # Credentials should start as invalid
+ assert not credentials.valid
+
+ # before_request should cause a refresh
+ credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
+
+ # The refresh endpoint should've been called.
+ assert id_token_jwt_grant.called
+
+ # Credentials should now be valid.
+ assert credentials.valid
diff --git a/contrib/python/google-auth/py2/tests/oauth2/test_sts.py b/contrib/python/google-auth/py2/tests/oauth2/test_sts.py
new file mode 100644
index 0000000000..e8e008df5d
--- /dev/null
+++ b/contrib/python/google-auth/py2/tests/oauth2/test_sts.py
@@ -0,0 +1,395 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+import mock
+import pytest
+from six.moves import http_client
+from six.moves import urllib
+
+from google.auth import exceptions
+from google.auth import transport
+from google.oauth2 import sts
+from google.oauth2 import utils
+
+CLIENT_ID = "username"
+CLIENT_SECRET = "password"
+# Base64 encoding of "username:password"
+BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
+
+
+class TestStsClient(object):
+ GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
+ RESOURCE = "https://api.example.com/"
+ AUDIENCE = "urn:example:cooperation-context"
+ SCOPES = ["scope1", "scope2"]
+ REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
+ SUBJECT_TOKEN = "HEADER.SUBJECT_TOKEN_PAYLOAD.SIGNATURE"
+ SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
+ ACTOR_TOKEN = "HEADER.ACTOR_TOKEN_PAYLOAD.SIGNATURE"
+ ACTOR_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
+ TOKEN_EXCHANGE_ENDPOINT = "https://example.com/token.oauth2"
+ ADDON_HEADERS = {"x-client-version": "0.1.2"}
+ ADDON_OPTIONS = {"additional": {"non-standard": ["options"], "other": "some-value"}}
+ SUCCESS_RESPONSE = {
+ "access_token": "ACCESS_TOKEN",
+ "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
+ "token_type": "Bearer",
+ "expires_in": 3600,
+ "scope": "scope1 scope2",
+ }
+ ERROR_RESPONSE = {
+ "error": "invalid_request",
+ "error_description": "Invalid subject token",
+ "error_uri": "https://tools.ietf.org/html/rfc6749",
+ }
+ CLIENT_AUTH_BASIC = utils.ClientAuthentication(
+ utils.ClientAuthType.basic, CLIENT_ID, CLIENT_SECRET
+ )
+ CLIENT_AUTH_REQUEST_BODY = utils.ClientAuthentication(
+ utils.ClientAuthType.request_body, CLIENT_ID, CLIENT_SECRET
+ )
+
+ @classmethod
+ def make_client(cls, client_auth=None):
+ return sts.Client(cls.TOKEN_EXCHANGE_ENDPOINT, client_auth)
+
+ @classmethod
+ def make_mock_request(cls, data, status=http_client.OK):
+ response = mock.create_autospec(transport.Response, instance=True)
+ response.status = status
+ response.data = json.dumps(data).encode("utf-8")
+
+ request = mock.create_autospec(transport.Request)
+ request.return_value = response
+
+ return request
+
+ @classmethod
+ def assert_request_kwargs(cls, request_kwargs, headers, request_data):
+ """Asserts the request was called with the expected parameters.
+ """
+ assert request_kwargs["url"] == cls.TOKEN_EXCHANGE_ENDPOINT
+ assert request_kwargs["method"] == "POST"
+ assert request_kwargs["headers"] == headers
+ assert request_kwargs["body"] is not None
+ body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
+ for (k, v) in body_tuples:
+ assert v.decode("utf-8") == request_data[k.decode("utf-8")]
+ assert len(body_tuples) == len(request_data.keys())
+
+ def test_exchange_token_full_success_without_auth(self):
+ """Test token exchange success without client authentication using full
+ parameters.
+ """
+ client = self.make_client()
+ headers = self.ADDON_HEADERS.copy()
+ headers["Content-Type"] = "application/x-www-form-urlencoded"
+ request_data = {
+ "grant_type": self.GRANT_TYPE,
+ "resource": self.RESOURCE,
+ "audience": self.AUDIENCE,
+ "scope": " ".join(self.SCOPES),
+ "requested_token_type": self.REQUESTED_TOKEN_TYPE,
+ "subject_token": self.SUBJECT_TOKEN,
+ "subject_token_type": self.SUBJECT_TOKEN_TYPE,
+ "actor_token": self.ACTOR_TOKEN,
+ "actor_token_type": self.ACTOR_TOKEN_TYPE,
+ "options": urllib.parse.quote(json.dumps(self.ADDON_OPTIONS)),
+ }
+ request = self.make_mock_request(
+ status=http_client.OK, data=self.SUCCESS_RESPONSE
+ )
+
+ response = client.exchange_token(
+ request,
+ self.GRANT_TYPE,
+ self.SUBJECT_TOKEN,
+ self.SUBJECT_TOKEN_TYPE,
+ self.RESOURCE,
+ self.AUDIENCE,
+ self.SCOPES,
+ self.REQUESTED_TOKEN_TYPE,
+ self.ACTOR_TOKEN,
+ self.ACTOR_TOKEN_TYPE,
+ self.ADDON_OPTIONS,
+ self.ADDON_HEADERS,
+ )
+
+ self.assert_request_kwargs(request.call_args[1], headers, request_data)
+ assert response == self.SUCCESS_RESPONSE
+
+ def test_exchange_token_partial_success_without_auth(self):
+ """Test token exchange success without client authentication using
+ partial (required only) parameters.
+ """
+ client = self.make_client()
+ headers = {"Content-Type": "application/x-www-form-urlencoded"}
+ request_data = {
+ "grant_type": self.GRANT_TYPE,
+ "audience": self.AUDIENCE,
+ "requested_token_type": self.REQUESTED_TOKEN_TYPE,
+ "subject_token": self.SUBJECT_TOKEN,
+ "subject_token_type": self.SUBJECT_TOKEN_TYPE,
+ }
+ request = self.make_mock_request(
+ status=http_client.OK, data=self.SUCCESS_RESPONSE
+ )
+
+ response = client.exchange_token(
+ request,
+ grant_type=self.GRANT_TYPE,
+ subject_token=self.SUBJECT_TOKEN,
+ subject_token_type=self.SUBJECT_TOKEN_TYPE,
+ audience=self.AUDIENCE,
+ requested_token_type=self.REQUESTED_TOKEN_TYPE,
+ )
+
+ self.assert_request_kwargs(request.call_args[1], headers, request_data)
+ assert response == self.SUCCESS_RESPONSE
+
+ def test_exchange_token_non200_without_auth(self):
+ """Test token exchange without client auth responding with non-200 status.
+ """
+ client = self.make_client()
+ request = self.make_mock_request(
+ status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
+ )
+
+ with pytest.raises(exceptions.OAuthError) as excinfo:
+ client.exchange_token(
+ request,
+ self.GRANT_TYPE,
+ self.SUBJECT_TOKEN,
+ self.SUBJECT_TOKEN_TYPE,
+ self.RESOURCE,
+ self.AUDIENCE,
+ self.SCOPES,
+ self.REQUESTED_TOKEN_TYPE,
+ self.ACTOR_TOKEN,
+ self.ACTOR_TOKEN_TYPE,
+ self.ADDON_OPTIONS,
+ self.ADDON_HEADERS,
+ )
+
+ assert excinfo.match(
+ r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
+ )
+
+ def test_exchange_token_full_success_with_basic_auth(self):
+ """Test token exchange success with basic client authentication using full
+ parameters.
+ """
+ client = self.make_client(self.CLIENT_AUTH_BASIC)
+ headers = self.ADDON_HEADERS.copy()
+ headers["Content-Type"] = "application/x-www-form-urlencoded"
+ headers["Authorization"] = "Basic {}".format(BASIC_AUTH_ENCODING)
+ request_data = {
+ "grant_type": self.GRANT_TYPE,
+ "resource": self.RESOURCE,
+ "audience": self.AUDIENCE,
+ "scope": " ".join(self.SCOPES),
+ "requested_token_type": self.REQUESTED_TOKEN_TYPE,
+ "subject_token": self.SUBJECT_TOKEN,
+ "subject_token_type": self.SUBJECT_TOKEN_TYPE,
+ "actor_token": self.ACTOR_TOKEN,
+ "actor_token_type": self.ACTOR_TOKEN_TYPE,
+ "options": urllib.parse.quote(json.dumps(self.ADDON_OPTIONS)),
+ }
+ request = self.make_mock_request(
+ status=http_client.OK, data=self.SUCCESS_RESPONSE
+ )
+
+ response = client.exchange_token(
+ request,
+ self.GRANT_TYPE,
+ self.SUBJECT_TOKEN,
+ self.SUBJECT_TOKEN_TYPE,
+ self.RESOURCE,
+ self.AUDIENCE,
+ self.SCOPES,
+ self.REQUESTED_TOKEN_TYPE,
+ self.ACTOR_TOKEN,
+ self.ACTOR_TOKEN_TYPE,
+ self.ADDON_OPTIONS,
+ self.ADDON_HEADERS,
+ )
+
+ self.assert_request_kwargs(request.call_args[1], headers, request_data)
+ assert response == self.SUCCESS_RESPONSE
+
+ def test_exchange_token_partial_success_with_basic_auth(self):
+ """Test token exchange success with basic client authentication using
+ partial (required only) parameters.
+ """
+ client = self.make_client(self.CLIENT_AUTH_BASIC)
+ headers = {
+ "Content-Type": "application/x-www-form-urlencoded",
+ "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
+ }
+ request_data = {
+ "grant_type": self.GRANT_TYPE,
+ "audience": self.AUDIENCE,
+ "requested_token_type": self.REQUESTED_TOKEN_TYPE,
+ "subject_token": self.SUBJECT_TOKEN,
+ "subject_token_type": self.SUBJECT_TOKEN_TYPE,
+ }
+ request = self.make_mock_request(
+ status=http_client.OK, data=self.SUCCESS_RESPONSE
+ )
+
+ response = client.exchange_token(
+ request,
+ grant_type=self.GRANT_TYPE,
+ subject_token=self.SUBJECT_TOKEN,
+ subject_token_type=self.SUBJECT_TOKEN_TYPE,
+ audience=self.AUDIENCE,
+ requested_token_type=self.REQUESTED_TOKEN_TYPE,
+ )
+
+ self.assert_request_kwargs(request.call_args[1], headers, request_data)
+ assert response == self.SUCCESS_RESPONSE
+
+ def test_exchange_token_non200_with_basic_auth(self):
+ """Test token exchange with basic client auth responding with non-200
+ status.
+ """
+ client = self.make_client(self.CLIENT_AUTH_BASIC)
+ request = self.make_mock_request(
+ status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
+ )
+
+ with pytest.raises(exceptions.OAuthError) as excinfo:
+ client.exchange_token(
+ request,
+ self.GRANT_TYPE,
+ self.SUBJECT_TOKEN,
+ self.SUBJECT_TOKEN_TYPE,
+ self.RESOURCE,
+ self.AUDIENCE,
+ self.SCOPES,
+ self.REQUESTED_TOKEN_TYPE,
+ self.ACTOR_TOKEN,
+ self.ACTOR_TOKEN_TYPE,
+ self.ADDON_OPTIONS,
+ self.ADDON_HEADERS,
+ )
+
+ assert excinfo.match(
+ r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
+ )
+
+ def test_exchange_token_full_success_with_reqbody_auth(self):
+ """Test token exchange success with request body client authenticaiton
+ using full parameters.
+ """
+ client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY)
+ headers = self.ADDON_HEADERS.copy()
+ headers["Content-Type"] = "application/x-www-form-urlencoded"
+ request_data = {
+ "grant_type": self.GRANT_TYPE,
+ "resource": self.RESOURCE,
+ "audience": self.AUDIENCE,
+ "scope": " ".join(self.SCOPES),
+ "requested_token_type": self.REQUESTED_TOKEN_TYPE,
+ "subject_token": self.SUBJECT_TOKEN,
+ "subject_token_type": self.SUBJECT_TOKEN_TYPE,
+ "actor_token": self.ACTOR_TOKEN,
+ "actor_token_type": self.ACTOR_TOKEN_TYPE,
+ "options": urllib.parse.quote(json.dumps(self.ADDON_OPTIONS)),
+ "client_id": CLIENT_ID,
+ "client_secret": CLIENT_SECRET,
+ }
+ request = self.make_mock_request(
+ status=http_client.OK, data=self.SUCCESS_RESPONSE
+ )
+
+ response = client.exchange_token(
+ request,
+ self.GRANT_TYPE,
+ self.SUBJECT_TOKEN,
+ self.SUBJECT_TOKEN_TYPE,
+ self.RESOURCE,
+ self.AUDIENCE,
+ self.SCOPES,
+ self.REQUESTED_TOKEN_TYPE,
+ self.ACTOR_TOKEN,
+ self.ACTOR_TOKEN_TYPE,
+ self.ADDON_OPTIONS,
+ self.ADDON_HEADERS,
+ )
+
+ self.assert_request_kwargs(request.call_args[1], headers, request_data)
+ assert response == self.SUCCESS_RESPONSE
+
+ def test_exchange_token_partial_success_with_reqbody_auth(self):
+ """Test token exchange success with request body client authentication
+ using partial (required only) parameters.
+ """
+ client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY)
+ headers = {"Content-Type": "application/x-www-form-urlencoded"}
+ request_data = {
+ "grant_type": self.GRANT_TYPE,
+ "audience": self.AUDIENCE,
+ "requested_token_type": self.REQUESTED_TOKEN_TYPE,
+ "subject_token": self.SUBJECT_TOKEN,
+ "subject_token_type": self.SUBJECT_TOKEN_TYPE,
+ "client_id": CLIENT_ID,
+ "client_secret": CLIENT_SECRET,
+ }
+ request = self.make_mock_request(
+ status=http_client.OK, data=self.SUCCESS_RESPONSE
+ )
+
+ response = client.exchange_token(
+ request,
+ grant_type=self.GRANT_TYPE,
+ subject_token=self.SUBJECT_TOKEN,
+ subject_token_type=self.SUBJECT_TOKEN_TYPE,
+ audience=self.AUDIENCE,
+ requested_token_type=self.REQUESTED_TOKEN_TYPE,
+ )
+
+ self.assert_request_kwargs(request.call_args[1], headers, request_data)
+ assert response == self.SUCCESS_RESPONSE
+
+ def test_exchange_token_non200_with_reqbody_auth(self):
+ """Test token exchange with POST request body client auth responding
+ with non-200 status.
+ """
+ client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY)
+ request = self.make_mock_request(
+ status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
+ )
+
+ with pytest.raises(exceptions.OAuthError) as excinfo:
+ client.exchange_token(
+ request,
+ self.GRANT_TYPE,
+ self.SUBJECT_TOKEN,
+ self.SUBJECT_TOKEN_TYPE,
+ self.RESOURCE,
+ self.AUDIENCE,
+ self.SCOPES,
+ self.REQUESTED_TOKEN_TYPE,
+ self.ACTOR_TOKEN,
+ self.ACTOR_TOKEN_TYPE,
+ self.ADDON_OPTIONS,
+ self.ADDON_HEADERS,
+ )
+
+ assert excinfo.match(
+ r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
+ )
diff --git a/contrib/python/google-auth/py2/tests/oauth2/test_utils.py b/contrib/python/google-auth/py2/tests/oauth2/test_utils.py
new file mode 100644
index 0000000000..6de9ff5337
--- /dev/null
+++ b/contrib/python/google-auth/py2/tests/oauth2/test_utils.py
@@ -0,0 +1,264 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+import pytest
+
+from google.auth import exceptions
+from google.oauth2 import utils
+
+
+CLIENT_ID = "username"
+CLIENT_SECRET = "password"
+# Base64 encoding of "username:password"
+BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
+# Base64 encoding of "username:"
+BASIC_AUTH_ENCODING_SECRETLESS = "dXNlcm5hbWU6"
+
+
+class AuthHandler(utils.OAuthClientAuthHandler):
+ def __init__(self, client_auth=None):
+ super(AuthHandler, self).__init__(client_auth)
+
+ def apply_client_authentication_options(
+ self, headers, request_body=None, bearer_token=None
+ ):
+ return super(AuthHandler, self).apply_client_authentication_options(
+ headers, request_body, bearer_token
+ )
+
+
+class TestClientAuthentication(object):
+ @classmethod
+ def make_client_auth(cls, client_secret=None):
+ return utils.ClientAuthentication(
+ utils.ClientAuthType.basic, CLIENT_ID, client_secret
+ )
+
+ def test_initialization_with_client_secret(self):
+ client_auth = self.make_client_auth(CLIENT_SECRET)
+
+ assert client_auth.client_auth_type == utils.ClientAuthType.basic
+ assert client_auth.client_id == CLIENT_ID
+ assert client_auth.client_secret == CLIENT_SECRET
+
+ def test_initialization_no_client_secret(self):
+ client_auth = self.make_client_auth()
+
+ assert client_auth.client_auth_type == utils.ClientAuthType.basic
+ assert client_auth.client_id == CLIENT_ID
+ assert client_auth.client_secret is None
+
+
+class TestOAuthClientAuthHandler(object):
+ CLIENT_AUTH_BASIC = utils.ClientAuthentication(
+ utils.ClientAuthType.basic, CLIENT_ID, CLIENT_SECRET
+ )
+ CLIENT_AUTH_BASIC_SECRETLESS = utils.ClientAuthentication(
+ utils.ClientAuthType.basic, CLIENT_ID
+ )
+ CLIENT_AUTH_REQUEST_BODY = utils.ClientAuthentication(
+ utils.ClientAuthType.request_body, CLIENT_ID, CLIENT_SECRET
+ )
+ CLIENT_AUTH_REQUEST_BODY_SECRETLESS = utils.ClientAuthentication(
+ utils.ClientAuthType.request_body, CLIENT_ID
+ )
+
+ @classmethod
+ def make_oauth_client_auth_handler(cls, client_auth=None):
+ return AuthHandler(client_auth)
+
+ def test_apply_client_authentication_options_none(self):
+ headers = {"Content-Type": "application/json"}
+ request_body = {"foo": "bar"}
+ auth_handler = self.make_oauth_client_auth_handler()
+
+ auth_handler.apply_client_authentication_options(headers, request_body)
+
+ assert headers == {"Content-Type": "application/json"}
+ assert request_body == {"foo": "bar"}
+
+ def test_apply_client_authentication_options_basic(self):
+ headers = {"Content-Type": "application/json"}
+ request_body = {"foo": "bar"}
+ auth_handler = self.make_oauth_client_auth_handler(self.CLIENT_AUTH_BASIC)
+
+ auth_handler.apply_client_authentication_options(headers, request_body)
+
+ assert headers == {
+ "Content-Type": "application/json",
+ "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
+ }
+ assert request_body == {"foo": "bar"}
+
+ def test_apply_client_authentication_options_basic_nosecret(self):
+ headers = {"Content-Type": "application/json"}
+ request_body = {"foo": "bar"}
+ auth_handler = self.make_oauth_client_auth_handler(
+ self.CLIENT_AUTH_BASIC_SECRETLESS
+ )
+
+ auth_handler.apply_client_authentication_options(headers, request_body)
+
+ assert headers == {
+ "Content-Type": "application/json",
+ "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING_SECRETLESS),
+ }
+ assert request_body == {"foo": "bar"}
+
+ def test_apply_client_authentication_options_request_body(self):
+ headers = {"Content-Type": "application/json"}
+ request_body = {"foo": "bar"}
+ auth_handler = self.make_oauth_client_auth_handler(
+ self.CLIENT_AUTH_REQUEST_BODY
+ )
+
+ auth_handler.apply_client_authentication_options(headers, request_body)
+
+ assert headers == {"Content-Type": "application/json"}
+ assert request_body == {
+ "foo": "bar",
+ "client_id": CLIENT_ID,
+ "client_secret": CLIENT_SECRET,
+ }
+
+ def test_apply_client_authentication_options_request_body_nosecret(self):
+ headers = {"Content-Type": "application/json"}
+ request_body = {"foo": "bar"}
+ auth_handler = self.make_oauth_client_auth_handler(
+ self.CLIENT_AUTH_REQUEST_BODY_SECRETLESS
+ )
+
+ auth_handler.apply_client_authentication_options(headers, request_body)
+
+ assert headers == {"Content-Type": "application/json"}
+ assert request_body == {
+ "foo": "bar",
+ "client_id": CLIENT_ID,
+ "client_secret": "",
+ }
+
+ def test_apply_client_authentication_options_request_body_no_body(self):
+ headers = {"Content-Type": "application/json"}
+ auth_handler = self.make_oauth_client_auth_handler(
+ self.CLIENT_AUTH_REQUEST_BODY
+ )
+
+ with pytest.raises(exceptions.OAuthError) as excinfo:
+ auth_handler.apply_client_authentication_options(headers)
+
+ assert excinfo.match(r"HTTP request does not support request-body")
+
+ def test_apply_client_authentication_options_bearer_token(self):
+ bearer_token = "ACCESS_TOKEN"
+ headers = {"Content-Type": "application/json"}
+ request_body = {"foo": "bar"}
+ auth_handler = self.make_oauth_client_auth_handler()
+
+ auth_handler.apply_client_authentication_options(
+ headers, request_body, bearer_token
+ )
+
+ assert headers == {
+ "Content-Type": "application/json",
+ "Authorization": "Bearer {}".format(bearer_token),
+ }
+ assert request_body == {"foo": "bar"}
+
+ def test_apply_client_authentication_options_bearer_and_basic(self):
+ bearer_token = "ACCESS_TOKEN"
+ headers = {"Content-Type": "application/json"}
+ request_body = {"foo": "bar"}
+ auth_handler = self.make_oauth_client_auth_handler(self.CLIENT_AUTH_BASIC)
+
+ auth_handler.apply_client_authentication_options(
+ headers, request_body, bearer_token
+ )
+
+ # Bearer token should have higher priority.
+ assert headers == {
+ "Content-Type": "application/json",
+ "Authorization": "Bearer {}".format(bearer_token),
+ }
+ assert request_body == {"foo": "bar"}
+
+ def test_apply_client_authentication_options_bearer_and_request_body(self):
+ bearer_token = "ACCESS_TOKEN"
+ headers = {"Content-Type": "application/json"}
+ request_body = {"foo": "bar"}
+ auth_handler = self.make_oauth_client_auth_handler(
+ self.CLIENT_AUTH_REQUEST_BODY
+ )
+
+ auth_handler.apply_client_authentication_options(
+ headers, request_body, bearer_token
+ )
+
+ # Bearer token should have higher priority.
+ assert headers == {
+ "Content-Type": "application/json",
+ "Authorization": "Bearer {}".format(bearer_token),
+ }
+ assert request_body == {"foo": "bar"}
+
+
+def test__handle_error_response_code_only():
+ error_resp = {"error": "unsupported_grant_type"}
+ response_data = json.dumps(error_resp)
+
+ with pytest.raises(exceptions.OAuthError) as excinfo:
+ utils.handle_error_response(response_data)
+
+ assert excinfo.match(r"Error code unsupported_grant_type")
+
+
+def test__handle_error_response_code_description():
+ error_resp = {
+ "error": "unsupported_grant_type",
+ "error_description": "The provided grant_type is unsupported",
+ }
+ response_data = json.dumps(error_resp)
+
+ with pytest.raises(exceptions.OAuthError) as excinfo:
+ utils.handle_error_response(response_data)
+
+ assert excinfo.match(
+ r"Error code unsupported_grant_type: The provided grant_type is unsupported"
+ )
+
+
+def test__handle_error_response_code_description_uri():
+ error_resp = {
+ "error": "unsupported_grant_type",
+ "error_description": "The provided grant_type is unsupported",
+ "error_uri": "https://tools.ietf.org/html/rfc6749",
+ }
+ response_data = json.dumps(error_resp)
+
+ with pytest.raises(exceptions.OAuthError) as excinfo:
+ utils.handle_error_response(response_data)
+
+ assert excinfo.match(
+ r"Error code unsupported_grant_type: The provided grant_type is unsupported - https://tools.ietf.org/html/rfc6749"
+ )
+
+
+def test__handle_error_response_non_json():
+ response_data = "Oops, something wrong happened"
+
+ with pytest.raises(exceptions.OAuthError) as excinfo:
+ utils.handle_error_response(response_data)
+
+ assert excinfo.match(r"Oops, something wrong happened")