# -*- coding: utf-8 -*-
import json
from unittest import mock
from oauthlib import common
from oauthlib.oauth2.rfc6749 import errors, tokens
from oauthlib.oauth2.rfc6749.endpoints import Server
from oauthlib.oauth2.rfc6749.endpoints.authorization import (
AuthorizationEndpoint,
)
from oauthlib.oauth2.rfc6749.endpoints.resource import ResourceEndpoint
from oauthlib.oauth2.rfc6749.endpoints.token import TokenEndpoint
from oauthlib.oauth2.rfc6749.grant_types import (
AuthorizationCodeGrant, ClientCredentialsGrant, ImplicitGrant,
ResourceOwnerPasswordCredentialsGrant,
)
from tests.unittest import TestCase
class AuthorizationEndpointTest(TestCase):
def setUp(self):
self.mock_validator = mock.MagicMock()
self.mock_validator.get_code_challenge.return_value = None
self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock())
auth_code = AuthorizationCodeGrant(
request_validator=self.mock_validator)
auth_code.save_authorization_code = mock.MagicMock()
implicit = ImplicitGrant(
request_validator=self.mock_validator)
implicit.save_token = mock.MagicMock()
response_types = {
'code': auth_code,
'token': implicit,
'none': auth_code
}
self.expires_in = 1800
token = tokens.BearerToken(
self.mock_validator,
expires_in=self.expires_in
)
self.endpoint = AuthorizationEndpoint(
default_response_type='code',
default_token_type=token,
response_types=response_types
)
@mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
def test_authorization_grant(self):
uri = 'http://i.b/l?response_type=code&client_id=me&scope=all+of+them&state=xyz'
uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
headers, body, status_code = self.endpoint.create_authorization_response(
uri, scopes=['all', 'of', 'them'])
self.assertIn('Location', headers)
self.assertURLEqual(headers['Location'], 'http://back.to/me?code=abc&state=xyz')
@mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
def test_implicit_grant(self):
uri = 'http://i.b/l?response_type=token&client_id=me&scope=all+of+them&state=xyz'
uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
headers, body, status_code = self.endpoint.create_authorization_response(
uri, scopes=['all', 'of', 'them'])
self.assertIn('Location', headers)
self.assertURLEqual(headers['Location'], 'http://back.to/me#access_token=abc&expires_in=' + str(self.expires_in) + '&token_type=Bearer&state=xyz&scope=all+of+them', parse_fragment=True)
def test_none_grant(self):
uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them&state=xyz'
uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
headers, body, status_code = self.endpoint.create_authorization_response(
uri, scopes=['all', 'of', 'them'])
self.assertIn('Location', headers)
self.assertURLEqual(headers['Location'], 'http://back.to/me?state=xyz', parse_fragment=True)
self.assertIsNone(body)
self.assertEqual(status_code, 302)
# and without the state parameter
uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them'
uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
headers, body, status_code = self.endpoint.create_authorization_response(
uri, scopes=['all', 'of', 'them'])
self.assertIn('Location', headers)
self.assertURLEqual(headers['Location'], 'http://back.to/me', parse_fragment=True)
self.assertIsNone(body)
self.assertEqual(status_code, 302)
def test_missing_type(self):
uri = 'http://i.b/l?client_id=me&scope=all+of+them'
uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
self.mock_validator.validate_request = mock.MagicMock(
side_effect=errors.InvalidRequestError())
headers, body, status_code = self.endpoint.create_authorization_response(
uri, scopes=['all', 'of', 'them'])
self.assertIn('Location', headers)
self.assertURLEqual(headers['Location'], 'http://back.to/me?error=invalid_request&error_description=Missing+response_type+parameter.')
def test_invalid_type(self):
uri = 'http://i.b/l?response_type=invalid&client_id=me&scope=all+of+them'
uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
self.mock_validator.validate_request = mock.MagicMock(
side_effect=errors.UnsupportedResponseTypeError())
headers, body, status_code = self.endpoint.create_authorization_response(
uri, scopes=['all', 'of', 'them'])
self.assertIn('Location', headers)
self.assertURLEqual(headers['Location'], 'http://back.to/me?error=unsupported_response_type')
class TokenEndpointTest(TestCase):
def setUp(self):
def set_user(request):
request.user = mock.MagicMock()
request.client = mock.MagicMock()
request.client.client_id = 'mocked_client_id'
return True
self.mock_validator = mock.MagicMock()
self.mock_validator.authenticate_client.side_effect = set_user
self.mock_validator.get_code_challenge.return_value = None
self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock())
auth_code = AuthorizationCodeGrant(
request_validator=self.mock_validator)
password = ResourceOwnerPasswordCredentialsGrant(
request_validator=self.mock_validator)
client = ClientCredentialsGrant(
request_validator=self.mock_validator)
supported_types = {
'authorization_code': auth_code,
'password': password,
'client_credentials': client,
}
self.expires_in = 1800
token = tokens.BearerToken(
self.mock_validator,
expires_in=self.expires_in
)
self.endpoint = TokenEndpoint(
'authorization_code',
default_token_type=token,
grant_types=supported_types
)
@mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
def test_authorization_grant(self):
body = 'grant_type=authorization_code&code=abc&scope=all+of+them'
headers, body, status_code = self.endpoint.create_token_response(
'', body=body)
token = {
'token_type': 'Bearer',
'expires_in': self.expires_in,
'access_token': 'abc',
'refresh_token': 'abc',
'scope': 'all of them'
}
self.assertEqual(json.loads(body), token)
body = 'grant_type=authorization_code&code=abc'
headers, body, status_code = self.endpoint.create_token_response(
'', body=body)
token = {
'token_type': 'Bearer',
'expires_in': self.expires_in,
'access_token': 'abc',
'refresh_token': 'abc'
}
self.assertEqual(json.loads(body), token)
# try with additional custom variables
body = 'grant_type=authorization_code&code=abc&state=foobar'
headers, body, status_code = self.endpoint.create_token_response(
'', body=body)
self.assertEqual(json.loads(body), token)
@mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
def test_password_grant(self):
body = 'grant_type=password&username=a&password=hello&scope=all+of+them'
headers, body, status_code = self.endpoint.create_token_response(
'', body=body)
token = {
'token_type': 'Bearer',
'expires_in': self.expires_in,
'access_token': 'abc',
'refresh_token': 'abc',
'scope': 'all of them',
}
self.assertEqual(json.loads(body), token)
@mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
def test_client_grant(self):
body = 'grant_type=client_credentials&scope=all+of+them'
headers, body, status_code = self.endpoint.create_token_response(
'', body=body)
token = {
'token_type': 'Bearer',
'expires_in': self.expires_in,
'access_token': 'abc',
'scope': 'all of them',
}
self.assertEqual(json.loads(body), token)
def test_missing_type(self):
_, body, _ = self.endpoint.create_token_response('', body='')
token = {'error': 'unsupported_grant_type'}
self.assertEqual(json.loads(body), token)
def test_invalid_type(self):
body = 'grant_type=invalid'
_, body, _ = self.endpoint.create_token_response('', body=body)
token = {'error': 'unsupported_grant_type'}
self.assertEqual(json.loads(body), token)
class SignedTokenEndpointTest(TestCase):
def setUp(self):
self.expires_in = 1800
def set_user(request):
request.user = mock.MagicMock()
request.client = mock.MagicMock()
request.client.client_id = 'mocked_client_id'
return True
self.mock_validator = mock.MagicMock()
self.mock_validator.get_code_challenge.return_value = None
self.mock_validator.authenticate_client.side_effect = set_user
self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock())
self.private_pem = """
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEA6TtDhWGwzEOWZP6m/zHoZnAPLABfetvoMPmxPGjFjtDuMRPv
EvI1sbixZBjBtdnc5rTtHUUQ25Am3JzwPRGo5laMGbj1pPyCPxlVi9LK82HQNX0B
YK7tZtVfDHElQA7F4v3j9d3rad4O9/n+lyGIQ0tT7yQcBm2A8FEaP0bZYCLMjwMN
WfaVLE8eXHyv+MfpNNLI9wttLxygKYM48I3NwsFuJgOa/KuodXaAmf8pJnx8t1Wn
nxvaYXFiUn/TxmhM/qhemPa6+0nqq+aWV5eT7xn4K/ghLgNs09v6Yge0pmPl9Oz+
+bjJ+aKRnAmwCOY8/5U5EilAiUOeBoO9+8OXtwIDAQABAoIBAGFTTbXXMkPK4HN8
oItVdDlrAanG7hECuz3UtFUVE3upS/xG6TjqweVLwRqYCh2ssDXFwjy4mXRGDzF4
e/e/6s9Txlrlh/w1MtTJ6ZzTdcViR9RKOczysjZ7S5KRlI3KnGFAuWPcG2SuOWjZ
dZfzcj1Crd/ZHajBAVFHRsCo/ATVNKbTRprFfb27xKpQ2BwH/GG781sLE3ZVNIhs
aRRaED4622kI1E/WXws2qQMqbFKzo0m1tPbLb3Z89WgZJ/tRQwuDype1Vfm7k6oX
xfbp3948qSe/yWKRlMoPkleji/WxPkSIalzWSAi9ziN/0Uzhe65FURgrfHL3XR1A
B8UR+aECgYEA7NPQZV4cAikk02Hv65JgISofqV49P8MbLXk8sdnI1n7Mj10TgzU3
lyQGDEX4hqvT0bTXe4KAOxQZx9wumu05ejfzhdtSsEm6ptGHyCdmYDQeV0C/pxDX
JNCK8XgMku2370XG0AnyBCT7NGlgtDcNCQufcesF2gEuoKiXg6Zjo7sCgYEA/Bzs
9fWGZZnSsMSBSW2OYbFuhF3Fne0HcxXQHipl0Rujc/9g0nccwqKGizn4fGOE7a8F
usQgJoeGcinL7E9OEP/uQ9VX1C9RNVjIxP1O5/Guw1zjxQQYetOvbPhN2QhD1Ye7
0TRKrW1BapcjwLpFQlVg1ZeTPOi5lv24W/wX9jUCgYEAkrMSX/hPuTbrTNVZ3L6r
NV/2hN+PaTPeXei/pBuXwOaCqDurnpcUfFcgN/IP5LwDVd+Dq0pHTFFDNv45EFbq
R77o5n3ZVsIVEMiyJ1XgoK8oLDw7e61+15smtjT69Piz+09pu+ytMcwGn4y3Dmsb
dALzHYnL8iLRU0ubrz0ec4kCgYAJiVKRTzNBPptQom49h85d9ac3jJCAE8o3WTjh
Gzt0uHXrWlqgO280EY/DTnMOyXjqwLcXxHlu26uDP/99tdY/IF8z46sJ1KxetzgI
84f7kBHLRAU9m5UNeFpnZdEUB5MBTbwWAsNcYgiabpMkpCcghjg+fBhOsoLqqjhC
CnwhjQKBgQDkv0QTdyBU84TE8J0XY3eLQwXbrvG2yD5A2ntN3PyxGEneX5WTJGMZ
xJxwaFYQiDS3b9E7b8Q5dg8qa5Y1+epdhx3cuQAWPm+AoHKshDfbRve4txBDQAqh
c6MxSWgsa+2Ld5SWSNbGtpPcmEM3Fl5ttMCNCKtNc0UE16oHwaPAIw==
-----END RSA PRIVATE KEY-----
"""
self.public_pem = """
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA6TtDhWGwzEOWZP6m/zHo
ZnAPLABfetvoMPmxPGjFjtDuMRPvEvI1sbixZBjBtdnc5rTtHUUQ25Am3JzwPRGo
5laMGbj1pPyCPxlVi9LK82HQNX0BYK7tZtVfDHElQA7F4v3j9d3rad4O9/n+lyGI
Q0tT7yQcBm2A8FEaP0bZYCLMjwMNWfaVLE8eXHyv+MfpNNLI9wttLxygKYM48I3N
wsFuJgOa/KuodXaAmf8pJnx8t1WnnxvaYXFiUn/TxmhM/qhemPa6+0nqq+aWV5eT
7xn4K/ghLgNs09v6Yge0pmPl9Oz++bjJ+aKRnAmwCOY8/5U5EilAiUOeBoO9+8OX
twIDAQAB
-----END PUBLIC KEY-----
"""
signed_token = tokens.signed_token_generator(self.private_pem,
user_id=123)
self.endpoint = Server(
self.mock_validator,
token_expires_in=self.expires_in,
token_generator=signed_token,
refresh_token_generator=tokens.random_token_generator
)
@mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
def test_authorization_grant(self):
body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc&scope=all+of+them'
headers, body, status_code = self.endpoint.create_token_response(
'', body=body)
body = json.loads(body)
token = {
'token_type': 'Bearer',
'expires_in': self.expires_in,
'access_token': body['access_token'],
'refresh_token': 'abc',
'scope': 'all of them'
}
self.assertEqual(body, token)
body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc'
headers, body, status_code = self.endpoint.create_token_response(
'', body=body)
body = json.loads(body)
token = {
'token_type': 'Bearer',
'expires_in': self.expires_in,
'access_token': body['access_token'],
'refresh_token': 'abc'
}
self.assertEqual(body, token)
# try with additional custom variables
body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc&state=foobar'
headers, body, status_code = self.endpoint.create_token_response(
'', body=body)
body = json.loads(body)
token = {
'token_type': 'Bearer',
'expires_in': self.expires_in,
'access_token': body['access_token'],
'refresh_token': 'abc'
}
self.assertEqual(body, token)
@mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
def test_password_grant(self):
body = 'grant_type=password&username=a&password=hello&scope=all+of+them'
headers, body, status_code = self.endpoint.create_token_response(
'', body=body)
body = json.loads(body)
token = {
'token_type': 'Bearer',
'expires_in': self.expires_in,
'access_token': body['access_token'],
'refresh_token': 'abc',
'scope': 'all of them',
}
self.assertEqual(body, token)
@mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
def test_scopes_and_user_id_stored_in_access_token(self):
body = 'grant_type=password&username=a&password=hello&scope=all+of+them'
headers, body, status_code = self.endpoint.create_token_response(
'', body=body)
access_token = json.loads(body)['access_token']
claims = common.verify_signed_token(self.public_pem, access_token)
self.assertEqual(claims['scope'], 'all of them')
self.assertEqual(claims['user_id'], 123)
@mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
def test_client_grant(self):
body = 'grant_type=client_credentials&scope=all+of+them'
headers, body, status_code = self.endpoint.create_token_response(
'', body=body)
body = json.loads(body)
token = {
'token_type': 'Bearer',
'expires_in': self.expires_in,
'access_token': body['access_token'],
'scope': 'all of them',
}
self.assertEqual(body, token)
def test_missing_type(self):
_, body, _ = self.endpoint.create_token_response('', body='client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&code=abc')
token = {'error': 'unsupported_grant_type'}
self.assertEqual(json.loads(body), token)
def test_invalid_type(self):
body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=invalid&code=abc'
_, body, _ = self.endpoint.create_token_response('', body=body)
token = {'error': 'unsupported_grant_type'}
self.assertEqual(json.loads(body), token)
class ResourceEndpointTest(TestCase):
def setUp(self):
self.mock_validator = mock.MagicMock()
self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock())
token = tokens.BearerToken(request_validator=self.mock_validator)
self.endpoint = ResourceEndpoint(
default_token='Bearer',
token_types={'Bearer': token}
)
def test_defaults(self):
uri = 'http://a.b/path?some=query'
self.mock_validator.validate_bearer_token.return_value = False
valid, request = self.endpoint.verify_request(uri)
self.assertFalse(valid)
self.assertEqual(request.token_type, 'Bearer')