diff options
author | alexv-smirnov <alex@ydb.tech> | 2023-12-01 12:02:50 +0300 |
---|---|---|
committer | alexv-smirnov <alex@ydb.tech> | 2023-12-01 13:28:10 +0300 |
commit | 0e578a4c44d4abd539d9838347b9ebafaca41dfb (patch) | |
tree | a0c1969c37f818c830ebeff9c077eacf30be6ef8 /contrib/python/oauthlib/tests/oauth2 | |
parent | 84f2d3d4cc985e63217cff149bd2e6d67ae6fe22 (diff) | |
download | ydb-0e578a4c44d4abd539d9838347b9ebafaca41dfb.tar.gz |
Change "ya.make"
Diffstat (limited to 'contrib/python/oauthlib/tests/oauth2')
35 files changed, 4813 insertions, 0 deletions
diff --git a/contrib/python/oauthlib/tests/oauth2/__init__.py b/contrib/python/oauthlib/tests/oauth2/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/__init__.py diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/__init__.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/__init__.py diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/__init__.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/__init__.py diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_backend_application.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_backend_application.py new file mode 100644 index 0000000000..c1489ac7c6 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_backend_application.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +import os +from unittest.mock import patch + +from oauthlib import signals +from oauthlib.oauth2 import BackendApplicationClient + +from tests.unittest import TestCase + + +@patch('time.time', new=lambda: 1000) +class BackendApplicationClientTest(TestCase): + + client_id = "someclientid" + client_secret = 'someclientsecret' + scope = ["/profile"] + kwargs = { + "some": "providers", + "require": "extra arguments" + } + + body = "not=empty" + + body_up = "not=empty&grant_type=client_credentials" + body_kwargs = body_up + "&some=providers&require=extra+arguments" + + token_json = ('{ "access_token":"2YotnFZFEjr1zCsicMWpAA",' + ' "token_type":"example",' + ' "expires_in":3600,' + ' "scope":"/profile",' + ' "example_parameter":"example_value"}') + token = { + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 3600, + "expires_at": 4600, + "scope": ["/profile"], + "example_parameter": "example_value" + } + + def test_request_body(self): + client = BackendApplicationClient(self.client_id) + + # Basic, no extra arguments + body = client.prepare_request_body(body=self.body) + self.assertFormBodyEqual(body, self.body_up) + + rclient = BackendApplicationClient(self.client_id) + body = rclient.prepare_request_body(body=self.body) + self.assertFormBodyEqual(body, self.body_up) + + # With extra parameters + body = client.prepare_request_body(body=self.body, **self.kwargs) + self.assertFormBodyEqual(body, self.body_kwargs) + + def test_parse_token_response(self): + client = BackendApplicationClient(self.client_id) + + # Parse code and state + response = client.parse_request_body_response(self.token_json, scope=self.scope) + self.assertEqual(response, self.token) + self.assertEqual(client.access_token, response.get("access_token")) + self.assertEqual(client.refresh_token, response.get("refresh_token")) + self.assertEqual(client.token_type, response.get("token_type")) + + # Mismatching state + self.assertRaises(Warning, client.parse_request_body_response, self.token_json, scope="invalid") + os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] = '3' + token = client.parse_request_body_response(self.token_json, scope="invalid") + self.assertTrue(token.scope_changed) + + scope_changes_recorded = [] + def record_scope_change(sender, message, old, new): + scope_changes_recorded.append((message, old, new)) + + signals.scope_changed.connect(record_scope_change) + try: + client.parse_request_body_response(self.token_json, scope="invalid") + self.assertEqual(len(scope_changes_recorded), 1) + message, old, new = scope_changes_recorded[0] + self.assertEqual(message, 'Scope has changed from "invalid" to "/profile".') + self.assertEqual(old, ['invalid']) + self.assertEqual(new, ['/profile']) + finally: + signals.scope_changed.disconnect(record_scope_change) + del os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_base.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_base.py new file mode 100644 index 0000000000..70a22834c3 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_base.py @@ -0,0 +1,355 @@ +# -*- coding: utf-8 -*- +import datetime + +from oauthlib import common +from oauthlib.oauth2 import Client, InsecureTransportError, TokenExpiredError +from oauthlib.oauth2.rfc6749 import utils +from oauthlib.oauth2.rfc6749.clients import AUTH_HEADER, BODY, URI_QUERY + +from tests.unittest import TestCase + + +class ClientTest(TestCase): + + client_id = "someclientid" + uri = "https://example.com/path?query=world" + body = "not=empty" + headers = {} + access_token = "token" + mac_key = "secret" + + bearer_query = uri + "&access_token=" + access_token + bearer_header = { + "Authorization": "Bearer " + access_token + } + bearer_body = body + "&access_token=" + access_token + + mac_00_header = { + "Authorization": 'MAC id="' + access_token + '", nonce="0:abc123",' + + ' bodyhash="Yqyso8r3hR5Nm1ZFv+6AvNHrxjE=",' + + ' mac="0X6aACoBY0G6xgGZVJ1IeE8dF9k="' + } + mac_01_header = { + "Authorization": 'MAC id="' + access_token + '", ts="123456789",' + + ' nonce="abc123", mac="Xuk+9oqaaKyhitkgh1CD0xrI6+s="' + } + + def test_add_bearer_token(self): + """Test a number of bearer token placements""" + + # Invalid token type + client = Client(self.client_id, token_type="invalid") + self.assertRaises(ValueError, client.add_token, self.uri) + + # Case-insensitive token type + client = Client(self.client_id, access_token=self.access_token, token_type="bEAreR") + uri, headers, body = client.add_token(self.uri, body=self.body, + headers=self.headers) + self.assertURLEqual(uri, self.uri) + self.assertFormBodyEqual(body, self.body) + self.assertEqual(headers, self.bearer_header) + + # Non-HTTPS + insecure_uri = 'http://example.com/path?query=world' + client = Client(self.client_id, access_token=self.access_token, token_type="Bearer") + self.assertRaises(InsecureTransportError, client.add_token, insecure_uri, + body=self.body, + headers=self.headers) + + # Missing access token + client = Client(self.client_id) + self.assertRaises(ValueError, client.add_token, self.uri) + + # Expired token + expired = 523549800 + expired_token = { + 'expires_at': expired, + } + client = Client(self.client_id, token=expired_token, access_token=self.access_token, token_type="Bearer") + self.assertRaises(TokenExpiredError, client.add_token, self.uri, + body=self.body, headers=self.headers) + + # The default token placement, bearer in auth header + client = Client(self.client_id, access_token=self.access_token) + uri, headers, body = client.add_token(self.uri, body=self.body, + headers=self.headers) + self.assertURLEqual(uri, self.uri) + self.assertFormBodyEqual(body, self.body) + self.assertEqual(headers, self.bearer_header) + + # Setting default placements of tokens + client = Client(self.client_id, access_token=self.access_token, + default_token_placement=AUTH_HEADER) + uri, headers, body = client.add_token(self.uri, body=self.body, + headers=self.headers) + self.assertURLEqual(uri, self.uri) + self.assertFormBodyEqual(body, self.body) + self.assertEqual(headers, self.bearer_header) + + client = Client(self.client_id, access_token=self.access_token, + default_token_placement=URI_QUERY) + uri, headers, body = client.add_token(self.uri, body=self.body, + headers=self.headers) + self.assertURLEqual(uri, self.bearer_query) + self.assertFormBodyEqual(body, self.body) + self.assertEqual(headers, self.headers) + + client = Client(self.client_id, access_token=self.access_token, + default_token_placement=BODY) + uri, headers, body = client.add_token(self.uri, body=self.body, + headers=self.headers) + self.assertURLEqual(uri, self.uri) + self.assertFormBodyEqual(body, self.bearer_body) + self.assertEqual(headers, self.headers) + + # Asking for specific placement in the add_token method + client = Client(self.client_id, access_token=self.access_token) + uri, headers, body = client.add_token(self.uri, body=self.body, + headers=self.headers, token_placement=AUTH_HEADER) + self.assertURLEqual(uri, self.uri) + self.assertFormBodyEqual(body, self.body) + self.assertEqual(headers, self.bearer_header) + + client = Client(self.client_id, access_token=self.access_token) + uri, headers, body = client.add_token(self.uri, body=self.body, + headers=self.headers, token_placement=URI_QUERY) + self.assertURLEqual(uri, self.bearer_query) + self.assertFormBodyEqual(body, self.body) + self.assertEqual(headers, self.headers) + + client = Client(self.client_id, access_token=self.access_token) + uri, headers, body = client.add_token(self.uri, body=self.body, + headers=self.headers, token_placement=BODY) + self.assertURLEqual(uri, self.uri) + self.assertFormBodyEqual(body, self.bearer_body) + self.assertEqual(headers, self.headers) + + # Invalid token placement + client = Client(self.client_id, access_token=self.access_token) + self.assertRaises(ValueError, client.add_token, self.uri, body=self.body, + headers=self.headers, token_placement="invalid") + + client = Client(self.client_id, access_token=self.access_token, + default_token_placement="invalid") + self.assertRaises(ValueError, client.add_token, self.uri, body=self.body, + headers=self.headers) + + def test_add_mac_token(self): + # Missing access token + client = Client(self.client_id, token_type="MAC") + self.assertRaises(ValueError, client.add_token, self.uri) + + # Invalid hash algorithm + client = Client(self.client_id, token_type="MAC", + access_token=self.access_token, mac_key=self.mac_key, + mac_algorithm="hmac-sha-2") + self.assertRaises(ValueError, client.add_token, self.uri) + + orig_generate_timestamp = common.generate_timestamp + orig_generate_nonce = common.generate_nonce + orig_generate_age = utils.generate_age + self.addCleanup(setattr, common, 'generage_timestamp', orig_generate_timestamp) + self.addCleanup(setattr, common, 'generage_nonce', orig_generate_nonce) + self.addCleanup(setattr, utils, 'generate_age', orig_generate_age) + common.generate_timestamp = lambda: '123456789' + common.generate_nonce = lambda: 'abc123' + utils.generate_age = lambda *args: 0 + + # Add the Authorization header (draft 00) + client = Client(self.client_id, token_type="MAC", + access_token=self.access_token, mac_key=self.mac_key, + mac_algorithm="hmac-sha-1") + uri, headers, body = client.add_token(self.uri, body=self.body, + headers=self.headers, issue_time=datetime.datetime.now()) + self.assertEqual(uri, self.uri) + self.assertEqual(body, self.body) + self.assertEqual(headers, self.mac_00_header) + # Non-HTTPS + insecure_uri = 'http://example.com/path?query=world' + self.assertRaises(InsecureTransportError, client.add_token, insecure_uri, + body=self.body, + headers=self.headers, + issue_time=datetime.datetime.now()) + # Expired Token + expired = 523549800 + expired_token = { + 'expires_at': expired, + } + client = Client(self.client_id, token=expired_token, token_type="MAC", + access_token=self.access_token, mac_key=self.mac_key, + mac_algorithm="hmac-sha-1") + self.assertRaises(TokenExpiredError, client.add_token, self.uri, + body=self.body, + headers=self.headers, + issue_time=datetime.datetime.now()) + + # Add the Authorization header (draft 01) + client = Client(self.client_id, token_type="MAC", + access_token=self.access_token, mac_key=self.mac_key, + mac_algorithm="hmac-sha-1") + uri, headers, body = client.add_token(self.uri, body=self.body, + headers=self.headers, draft=1) + self.assertEqual(uri, self.uri) + self.assertEqual(body, self.body) + self.assertEqual(headers, self.mac_01_header) + # Non-HTTPS + insecure_uri = 'http://example.com/path?query=world' + self.assertRaises(InsecureTransportError, client.add_token, insecure_uri, + body=self.body, + headers=self.headers, + draft=1) + # Expired Token + expired = 523549800 + expired_token = { + 'expires_at': expired, + } + client = Client(self.client_id, token=expired_token, token_type="MAC", + access_token=self.access_token, mac_key=self.mac_key, + mac_algorithm="hmac-sha-1") + self.assertRaises(TokenExpiredError, client.add_token, self.uri, + body=self.body, + headers=self.headers, + draft=1) + + def test_revocation_request(self): + client = Client(self.client_id) + + url = 'https://example.com/revoke' + token = 'foobar' + + # Valid request + u, h, b = client.prepare_token_revocation_request(url, token) + self.assertEqual(u, url) + self.assertEqual(h, {'Content-Type': 'application/x-www-form-urlencoded'}) + self.assertEqual(b, 'token=%s&token_type_hint=access_token' % token) + + # Non-HTTPS revocation endpoint + self.assertRaises(InsecureTransportError, + client.prepare_token_revocation_request, + 'http://example.com/revoke', token) + + + u, h, b = client.prepare_token_revocation_request( + url, token, token_type_hint='refresh_token') + self.assertEqual(u, url) + self.assertEqual(h, {'Content-Type': 'application/x-www-form-urlencoded'}) + self.assertEqual(b, 'token=%s&token_type_hint=refresh_token' % token) + + # JSONP + u, h, b = client.prepare_token_revocation_request( + url, token, callback='hello.world') + self.assertURLEqual(u, url + '?callback=hello.world&token=%s&token_type_hint=access_token' % token) + self.assertEqual(h, {'Content-Type': 'application/x-www-form-urlencoded'}) + self.assertEqual(b, '') + + def test_prepare_authorization_request(self): + redirect_url = 'https://example.com/callback/' + scopes = 'read' + auth_url = 'https://example.com/authorize/' + state = 'fake_state' + + client = Client(self.client_id, redirect_url=redirect_url, scope=scopes, state=state) + + # Non-HTTPS + self.assertRaises(InsecureTransportError, + client.prepare_authorization_request, 'http://example.com/authorize/') + + # NotImplementedError + self.assertRaises(NotImplementedError, client.prepare_authorization_request, auth_url) + + def test_prepare_token_request(self): + redirect_url = 'https://example.com/callback/' + scopes = 'read' + token_url = 'https://example.com/token/' + state = 'fake_state' + + client = Client(self.client_id, scope=scopes, state=state) + + # Non-HTTPS + self.assertRaises(InsecureTransportError, + client.prepare_token_request, 'http://example.com/token/') + + # NotImplementedError + self.assertRaises(NotImplementedError, client.prepare_token_request, token_url) + + def test_prepare_refresh_token_request(self): + client = Client(self.client_id) + + url = 'https://example.com/revoke' + token = 'foobar' + scope = 'extra_scope' + + u, h, b = client.prepare_refresh_token_request(url, token) + self.assertEqual(u, url) + self.assertEqual(h, {'Content-Type': 'application/x-www-form-urlencoded'}) + self.assertFormBodyEqual(b, 'grant_type=refresh_token&refresh_token=%s' % token) + + # Non-HTTPS revocation endpoint + self.assertRaises(InsecureTransportError, + client.prepare_refresh_token_request, + 'http://example.com/revoke', token) + + # provide extra scope + u, h, b = client.prepare_refresh_token_request(url, token, scope=scope) + self.assertEqual(u, url) + self.assertEqual(h, {'Content-Type': 'application/x-www-form-urlencoded'}) + self.assertFormBodyEqual(b, 'grant_type=refresh_token&scope={}&refresh_token={}'.format(scope, token)) + + # provide scope while init + client = Client(self.client_id, scope=scope) + u, h, b = client.prepare_refresh_token_request(url, token, scope=scope) + self.assertEqual(u, url) + self.assertEqual(h, {'Content-Type': 'application/x-www-form-urlencoded'}) + self.assertFormBodyEqual(b, 'grant_type=refresh_token&scope={}&refresh_token={}'.format(scope, token)) + + def test_parse_token_response_invalid_expires_at(self): + token_json = ('{ "access_token":"2YotnFZFEjr1zCsicMWpAA",' + ' "token_type":"example",' + ' "expires_at":"2006-01-02T15:04:05Z",' + ' "scope":"/profile",' + ' "example_parameter":"example_value"}') + token = { + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_at": "2006-01-02T15:04:05Z", + "scope": ["/profile"], + "example_parameter": "example_value" + } + + client = Client(self.client_id) + + # Parse code and state + response = client.parse_request_body_response(token_json, scope=["/profile"]) + self.assertEqual(response, token) + self.assertEqual(None, client._expires_at) + self.assertEqual(client.access_token, response.get("access_token")) + self.assertEqual(client.refresh_token, response.get("refresh_token")) + self.assertEqual(client.token_type, response.get("token_type")) + + + def test_create_code_verifier_min_length(self): + client = Client(self.client_id) + length = 43 + code_verifier = client.create_code_verifier(length=length) + self.assertEqual(client.code_verifier, code_verifier) + + def test_create_code_verifier_max_length(self): + client = Client(self.client_id) + length = 128 + code_verifier = client.create_code_verifier(length=length) + self.assertEqual(client.code_verifier, code_verifier) + + def test_create_code_challenge_plain(self): + client = Client(self.client_id) + code_verifier = client.create_code_verifier(length=128) + code_challenge_plain = client.create_code_challenge(code_verifier=code_verifier) + + # if no code_challenge_method specified, code_challenge = code_verifier + self.assertEqual(code_challenge_plain, client.code_verifier) + self.assertEqual(client.code_challenge_method, "plain") + + def test_create_code_challenge_s256(self): + client = Client(self.client_id) + code_verifier = client.create_code_verifier(length=128) + code_challenge_s256 = client.create_code_challenge(code_verifier=code_verifier, code_challenge_method='S256') + self.assertEqual(code_challenge_s256, client.code_challenge) diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_legacy_application.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_legacy_application.py new file mode 100644 index 0000000000..b5a18194b7 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_legacy_application.py @@ -0,0 +1,140 @@ +# -*- coding: utf-8 -*- +import os +import urllib.parse as urlparse +from unittest.mock import patch + +from oauthlib import signals +from oauthlib.oauth2 import LegacyApplicationClient + +from tests.unittest import TestCase + + +@patch('time.time', new=lambda: 1000) +class LegacyApplicationClientTest(TestCase): + + client_id = "someclientid" + client_secret = 'someclientsecret' + scope = ["/profile"] + kwargs = { + "some": "providers", + "require": "extra arguments" + } + + username = "user_username" + password = "user_password" + body = "not=empty" + + body_up = "not=empty&grant_type=password&username={}&password={}".format(username, password) + body_kwargs = body_up + "&some=providers&require=extra+arguments" + + token_json = ('{ "access_token":"2YotnFZFEjr1zCsicMWpAA",' + ' "token_type":"example",' + ' "expires_in":3600,' + ' "scope":"/profile",' + ' "refresh_token":"tGzv3JOkF0XG5Qx2TlKWIA",' + ' "example_parameter":"example_value"}') + token = { + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 3600, + "expires_at": 4600, + "scope": scope, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "example_parameter": "example_value" + } + + def test_request_body(self): + client = LegacyApplicationClient(self.client_id) + + # Basic, no extra arguments + body = client.prepare_request_body(self.username, self.password, + body=self.body) + self.assertFormBodyEqual(body, self.body_up) + + # With extra parameters + body = client.prepare_request_body(self.username, self.password, + body=self.body, **self.kwargs) + self.assertFormBodyEqual(body, self.body_kwargs) + + def test_parse_token_response(self): + client = LegacyApplicationClient(self.client_id) + + # Parse code and state + response = client.parse_request_body_response(self.token_json, scope=self.scope) + self.assertEqual(response, self.token) + self.assertEqual(client.access_token, response.get("access_token")) + self.assertEqual(client.refresh_token, response.get("refresh_token")) + self.assertEqual(client.token_type, response.get("token_type")) + + # Mismatching state + self.assertRaises(Warning, client.parse_request_body_response, self.token_json, scope="invalid") + os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] = '5' + token = client.parse_request_body_response(self.token_json, scope="invalid") + self.assertTrue(token.scope_changed) + + scope_changes_recorded = [] + def record_scope_change(sender, message, old, new): + scope_changes_recorded.append((message, old, new)) + + signals.scope_changed.connect(record_scope_change) + try: + client.parse_request_body_response(self.token_json, scope="invalid") + self.assertEqual(len(scope_changes_recorded), 1) + message, old, new = scope_changes_recorded[0] + self.assertEqual(message, 'Scope has changed from "invalid" to "/profile".') + self.assertEqual(old, ['invalid']) + self.assertEqual(new, ['/profile']) + finally: + signals.scope_changed.disconnect(record_scope_change) + del os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] + + def test_prepare_request_body(self): + """ + see issue #585 + https://github.com/oauthlib/oauthlib/issues/585 + """ + client = LegacyApplicationClient(self.client_id) + + # scenario 1, default behavior to not include `client_id` + r1 = client.prepare_request_body(username=self.username, password=self.password) + self.assertIn(r1, ('grant_type=password&username={}&password={}'.format(self.username, self.password), + 'grant_type=password&password={}&username={}'.format(self.password, self.username), + )) + + # scenario 2, include `client_id` in the body + r2 = client.prepare_request_body(username=self.username, password=self.password, include_client_id=True) + r2_params = dict(urlparse.parse_qsl(r2, keep_blank_values=True)) + self.assertEqual(len(r2_params.keys()), 4) + self.assertEqual(r2_params['grant_type'], 'password') + self.assertEqual(r2_params['username'], self.username) + self.assertEqual(r2_params['password'], self.password) + self.assertEqual(r2_params['client_id'], self.client_id) + + # scenario 3, include `client_id` + `client_secret` in the body + r3 = client.prepare_request_body(username=self.username, password=self.password, include_client_id=True, client_secret=self.client_secret) + r3_params = dict(urlparse.parse_qsl(r3, keep_blank_values=True)) + self.assertEqual(len(r3_params.keys()), 5) + self.assertEqual(r3_params['grant_type'], 'password') + self.assertEqual(r3_params['username'], self.username) + self.assertEqual(r3_params['password'], self.password) + self.assertEqual(r3_params['client_id'], self.client_id) + self.assertEqual(r3_params['client_secret'], self.client_secret) + + # scenario 4, `client_secret` is an empty string + r4 = client.prepare_request_body(username=self.username, password=self.password, include_client_id=True, client_secret='') + r4_params = dict(urlparse.parse_qsl(r4, keep_blank_values=True)) + self.assertEqual(len(r4_params.keys()), 5) + self.assertEqual(r4_params['grant_type'], 'password') + self.assertEqual(r4_params['username'], self.username) + self.assertEqual(r4_params['password'], self.password) + self.assertEqual(r4_params['client_id'], self.client_id) + self.assertEqual(r4_params['client_secret'], '') + + # scenario 4b`,` client_secret is `None` + r4b = client.prepare_request_body(username=self.username, password=self.password, include_client_id=True, client_secret=None) + r4b_params = dict(urlparse.parse_qsl(r4b, keep_blank_values=True)) + self.assertEqual(len(r4b_params.keys()), 4) + self.assertEqual(r4b_params['grant_type'], 'password') + self.assertEqual(r4b_params['username'], self.username) + self.assertEqual(r4b_params['password'], self.password) + self.assertEqual(r4b_params['client_id'], self.client_id) diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_mobile_application.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_mobile_application.py new file mode 100644 index 0000000000..c40950c978 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_mobile_application.py @@ -0,0 +1,111 @@ +# -*- coding: utf-8 -*- +import os +from unittest.mock import patch + +from oauthlib import signals +from oauthlib.oauth2 import MobileApplicationClient + +from tests.unittest import TestCase + + +@patch('time.time', new=lambda: 1000) +class MobileApplicationClientTest(TestCase): + + client_id = "someclientid" + uri = "https://example.com/path?query=world" + uri_id = uri + "&response_type=token&client_id=" + client_id + uri_redirect = uri_id + "&redirect_uri=http%3A%2F%2Fmy.page.com%2Fcallback" + redirect_uri = "http://my.page.com/callback" + scope = ["/profile"] + state = "xyz" + uri_scope = uri_id + "&scope=%2Fprofile" + uri_state = uri_id + "&state=" + state + kwargs = { + "some": "providers", + "require": "extra arguments" + } + uri_kwargs = uri_id + "&some=providers&require=extra+arguments" + + code = "zzzzaaaa" + + response_uri = ('https://client.example.com/cb?#' + 'access_token=2YotnFZFEjr1zCsicMWpAA&' + 'token_type=example&' + 'expires_in=3600&' + 'scope=%2Fprofile&' + 'example_parameter=example_value') + token = { + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 3600, + "expires_at": 4600, + "scope": scope, + "example_parameter": "example_value" + } + + def test_implicit_token_uri(self): + client = MobileApplicationClient(self.client_id) + + # Basic, no extra arguments + uri = client.prepare_request_uri(self.uri) + self.assertURLEqual(uri, self.uri_id) + + # With redirection uri + uri = client.prepare_request_uri(self.uri, redirect_uri=self.redirect_uri) + self.assertURLEqual(uri, self.uri_redirect) + + # With scope + uri = client.prepare_request_uri(self.uri, scope=self.scope) + self.assertURLEqual(uri, self.uri_scope) + + # With state + uri = client.prepare_request_uri(self.uri, state=self.state) + self.assertURLEqual(uri, self.uri_state) + + # With extra parameters through kwargs + uri = client.prepare_request_uri(self.uri, **self.kwargs) + self.assertURLEqual(uri, self.uri_kwargs) + + def test_populate_attributes(self): + + client = MobileApplicationClient(self.client_id) + + response_uri = (self.response_uri + "&code=EVIL-CODE") + + client.parse_request_uri_response(response_uri, scope=self.scope) + + # We must not accidentally pick up any further security + # credentials at this point. + self.assertIsNone(client.code) + + def test_parse_token_response(self): + client = MobileApplicationClient(self.client_id) + + # Parse code and state + response = client.parse_request_uri_response(self.response_uri, scope=self.scope) + self.assertEqual(response, self.token) + self.assertEqual(client.access_token, response.get("access_token")) + self.assertEqual(client.refresh_token, response.get("refresh_token")) + self.assertEqual(client.token_type, response.get("token_type")) + + # Mismatching scope + self.assertRaises(Warning, client.parse_request_uri_response, self.response_uri, scope="invalid") + os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] = '4' + token = client.parse_request_uri_response(self.response_uri, scope='invalid') + self.assertTrue(token.scope_changed) + + scope_changes_recorded = [] + def record_scope_change(sender, message, old, new): + scope_changes_recorded.append((message, old, new)) + + signals.scope_changed.connect(record_scope_change) + try: + client.parse_request_uri_response(self.response_uri, scope="invalid") + self.assertEqual(len(scope_changes_recorded), 1) + message, old, new = scope_changes_recorded[0] + self.assertEqual(message, 'Scope has changed from "invalid" to "/profile".') + self.assertEqual(old, ['invalid']) + self.assertEqual(new, ['/profile']) + finally: + signals.scope_changed.disconnect(record_scope_change) + del os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_service_application.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_service_application.py new file mode 100644 index 0000000000..b97d8554ed --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_service_application.py @@ -0,0 +1,185 @@ +# -*- coding: utf-8 -*- +import os +from time import time +from unittest.mock import patch + +import jwt + +from oauthlib.common import Request +from oauthlib.oauth2 import ServiceApplicationClient + +from tests.unittest import TestCase + + +class ServiceApplicationClientTest(TestCase): + + gt = ServiceApplicationClient.grant_type + + private_key = """ +-----BEGIN RSA PRIVATE KEY----- +MIICXgIBAAKBgQDk1/bxyS8Q8jiheHeYYp/4rEKJopeQRRKKpZI4s5i+UPwVpupG +AlwXWfzXwSMaKPAoKJNdu7tqKRniqst5uoHXw98gj0x7zamu0Ck1LtQ4c7pFMVah +5IYGhBi2E9ycNS329W27nJPWNCbESTu7snVlG8V8mfvGGg3xNjTMO7IdrwIDAQAB +AoGBAOQ2KuH8S5+OrsL4K+wfjoCi6MfxCUyqVU9GxocdM1m30WyWRFMEz2nKJ8fR +p3vTD4w8yplTOhcoXdQZl0kRoaDzrcYkm2VvJtQRrX7dKFT8dR8D/Tr7dNQLOXfC +DY6xveQczE7qt7Vk7lp4FqmxBsaaEuokt78pOOjywZoInjZhAkEA9wz3zoZNT0/i +rf6qv2qTIeieUB035N3dyw6f1BGSWYaXSuerDCD/J1qZbAPKKhyHZbVawFt3UMhe +542UftBaxQJBAO0iJy1I8GQjGnS7B3yvyH3CcLYGy296+XO/2xKp/d/ty1OIeovx +C60pLNwuFNF3z9d2GVQAdoQ89hUkOtjZLeMCQQD0JO6oPHUeUjYT+T7ImAv7UKVT +Suy30sKjLzqoGw1kR+wv7C5PeDRvscs4wa4CW9s6mjSrMDkDrmCLuJDtmf55AkEA +kmaMg2PNrjUR51F0zOEFycaaqXbGcFwe1/xx9zLmHzMDXd4bsnwt9kk+fe0hQzVS +JzatanQit3+feev1PN3QewJAWv4RZeavEUhKv+kLe95Yd0su7lTLVduVgh4v5yLT +Ga6FHdjGPcfajt+nrpB1n8UQBEH9ZxniokR/IPvdMlxqXA== +-----END RSA PRIVATE KEY----- +""" + + public_key = """ +-----BEGIN PUBLIC KEY----- +MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDk1/bxyS8Q8jiheHeYYp/4rEKJ +opeQRRKKpZI4s5i+UPwVpupGAlwXWfzXwSMaKPAoKJNdu7tqKRniqst5uoHXw98g +j0x7zamu0Ck1LtQ4c7pFMVah5IYGhBi2E9ycNS329W27nJPWNCbESTu7snVlG8V8 +mfvGGg3xNjTMO7IdrwIDAQAB +-----END PUBLIC KEY----- +""" + + subject = 'resource-owner@provider.com' + + issuer = 'the-client@provider.com' + + audience = 'https://provider.com/token' + + client_id = "someclientid" + scope = ["/profile"] + kwargs = { + "some": "providers", + "require": "extra arguments" + } + + body = "isnot=empty" + + body_up = "not=empty&grant_type=%s" % gt + body_kwargs = body_up + "&some=providers&require=extra+arguments" + + token_json = ('{ "access_token":"2YotnFZFEjr1zCsicMWpAA",' + ' "token_type":"example",' + ' "expires_in":3600,' + ' "scope":"/profile",' + ' "example_parameter":"example_value"}') + token = { + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 3600, + "scope": ["/profile"], + "example_parameter": "example_value" + } + + @patch('time.time') + def test_request_body(self, t): + t.return_value = time() + self.token['expires_at'] = self.token['expires_in'] + t.return_value + + client = ServiceApplicationClient( + self.client_id, private_key=self.private_key) + + # Basic with min required params + body = client.prepare_request_body(issuer=self.issuer, + subject=self.subject, + audience=self.audience, + body=self.body) + r = Request('https://a.b', body=body) + self.assertEqual(r.isnot, 'empty') + self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type) + + claim = jwt.decode(r.assertion, self.public_key, audience=self.audience, algorithms=['RS256']) + + self.assertEqual(claim['iss'], self.issuer) + # audience verification is handled during decode now + self.assertEqual(claim['sub'], self.subject) + self.assertEqual(claim['iat'], int(t.return_value)) + self.assertNotIn('nbf', claim) + self.assertNotIn('jti', claim) + + # Missing issuer parameter + self.assertRaises(ValueError, client.prepare_request_body, + issuer=None, subject=self.subject, audience=self.audience, body=self.body) + + # Missing subject parameter + self.assertRaises(ValueError, client.prepare_request_body, + issuer=self.issuer, subject=None, audience=self.audience, body=self.body) + + # Missing audience parameter + self.assertRaises(ValueError, client.prepare_request_body, + issuer=self.issuer, subject=self.subject, audience=None, body=self.body) + + # Optional kwargs + not_before = time() - 3600 + jwt_id = '8zd15df4s35f43sd' + body = client.prepare_request_body(issuer=self.issuer, + subject=self.subject, + audience=self.audience, + body=self.body, + not_before=not_before, + jwt_id=jwt_id) + + r = Request('https://a.b', body=body) + self.assertEqual(r.isnot, 'empty') + self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type) + + claim = jwt.decode(r.assertion, self.public_key, audience=self.audience, algorithms=['RS256']) + + self.assertEqual(claim['iss'], self.issuer) + # audience verification is handled during decode now + self.assertEqual(claim['sub'], self.subject) + self.assertEqual(claim['iat'], int(t.return_value)) + self.assertEqual(claim['nbf'], not_before) + self.assertEqual(claim['jti'], jwt_id) + + @patch('time.time') + def test_request_body_no_initial_private_key(self, t): + t.return_value = time() + self.token['expires_at'] = self.token['expires_in'] + t.return_value + + client = ServiceApplicationClient( + self.client_id, private_key=None) + + # Basic with private key provided + body = client.prepare_request_body(issuer=self.issuer, + subject=self.subject, + audience=self.audience, + body=self.body, + private_key=self.private_key) + r = Request('https://a.b', body=body) + self.assertEqual(r.isnot, 'empty') + self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type) + + claim = jwt.decode(r.assertion, self.public_key, audience=self.audience, algorithms=['RS256']) + + self.assertEqual(claim['iss'], self.issuer) + # audience verification is handled during decode now + self.assertEqual(claim['sub'], self.subject) + self.assertEqual(claim['iat'], int(t.return_value)) + + # No private key provided + self.assertRaises(ValueError, client.prepare_request_body, + issuer=self.issuer, subject=self.subject, audience=self.audience, body=self.body) + + @patch('time.time') + def test_parse_token_response(self, t): + t.return_value = time() + self.token['expires_at'] = self.token['expires_in'] + t.return_value + + client = ServiceApplicationClient(self.client_id) + + # Parse code and state + response = client.parse_request_body_response(self.token_json, scope=self.scope) + self.assertEqual(response, self.token) + self.assertEqual(client.access_token, response.get("access_token")) + self.assertEqual(client.refresh_token, response.get("refresh_token")) + self.assertEqual(client.token_type, response.get("token_type")) + + # Mismatching state + self.assertRaises(Warning, client.parse_request_body_response, self.token_json, scope="invalid") + os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] = '2' + token = client.parse_request_body_response(self.token_json, scope="invalid") + self.assertTrue(token.scope_changed) + del os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_web_application.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_web_application.py new file mode 100644 index 0000000000..7a71121512 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_web_application.py @@ -0,0 +1,269 @@ +# -*- coding: utf-8 -*- +import os +import urllib.parse as urlparse +import warnings +from unittest.mock import patch + +from oauthlib import common, signals +from oauthlib.oauth2 import ( + BackendApplicationClient, Client, LegacyApplicationClient, + MobileApplicationClient, WebApplicationClient, +) +from oauthlib.oauth2.rfc6749 import errors, utils +from oauthlib.oauth2.rfc6749.clients import AUTH_HEADER, BODY, URI_QUERY + +from tests.unittest import TestCase + + +@patch('time.time', new=lambda: 1000) +class WebApplicationClientTest(TestCase): + + client_id = "someclientid" + client_secret = 'someclientsecret' + uri = "https://example.com/path?query=world" + uri_id = uri + "&response_type=code&client_id=" + client_id + uri_redirect = uri_id + "&redirect_uri=http%3A%2F%2Fmy.page.com%2Fcallback" + redirect_uri = "http://my.page.com/callback" + code_verifier = "code_verifier" + scope = ["/profile"] + state = "xyz" + code_challenge = "code_challenge" + code_challenge_method = "S256" + uri_scope = uri_id + "&scope=%2Fprofile" + uri_state = uri_id + "&state=" + state + uri_code_challenge = uri_id + "&code_challenge=" + code_challenge + "&code_challenge_method=" + code_challenge_method + uri_code_challenge_method = uri_id + "&code_challenge=" + code_challenge + "&code_challenge_method=plain" + kwargs = { + "some": "providers", + "require": "extra arguments" + } + uri_kwargs = uri_id + "&some=providers&require=extra+arguments" + uri_authorize_code = uri_redirect + "&scope=%2Fprofile&state=" + state + + code = "zzzzaaaa" + body = "not=empty" + + body_code = "not=empty&grant_type=authorization_code&code={}&client_id={}".format(code, client_id) + body_redirect = body_code + "&redirect_uri=http%3A%2F%2Fmy.page.com%2Fcallback" + body_code_verifier = body_code + "&code_verifier=code_verifier" + body_kwargs = body_code + "&some=providers&require=extra+arguments" + + response_uri = "https://client.example.com/cb?code=zzzzaaaa&state=xyz" + response = {"code": "zzzzaaaa", "state": "xyz"} + + token_json = ('{ "access_token":"2YotnFZFEjr1zCsicMWpAA",' + ' "token_type":"example",' + ' "expires_in":3600,' + ' "scope":"/profile",' + ' "refresh_token":"tGzv3JOkF0XG5Qx2TlKWIA",' + ' "example_parameter":"example_value"}') + token = { + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 3600, + "expires_at": 4600, + "scope": scope, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "example_parameter": "example_value" + } + + def test_auth_grant_uri(self): + client = WebApplicationClient(self.client_id) + + # Basic, no extra arguments + uri = client.prepare_request_uri(self.uri) + self.assertURLEqual(uri, self.uri_id) + + # With redirection uri + uri = client.prepare_request_uri(self.uri, redirect_uri=self.redirect_uri) + self.assertURLEqual(uri, self.uri_redirect) + + # With scope + uri = client.prepare_request_uri(self.uri, scope=self.scope) + self.assertURLEqual(uri, self.uri_scope) + + # With state + uri = client.prepare_request_uri(self.uri, state=self.state) + self.assertURLEqual(uri, self.uri_state) + + # with code_challenge and code_challenge_method + uri = client.prepare_request_uri(self.uri, code_challenge=self.code_challenge, code_challenge_method=self.code_challenge_method) + self.assertURLEqual(uri, self.uri_code_challenge) + + # with no code_challenge_method + uri = client.prepare_request_uri(self.uri, code_challenge=self.code_challenge) + self.assertURLEqual(uri, self.uri_code_challenge_method) + + # With extra parameters through kwargs + uri = client.prepare_request_uri(self.uri, **self.kwargs) + self.assertURLEqual(uri, self.uri_kwargs) + + def test_request_body(self): + client = WebApplicationClient(self.client_id, code=self.code) + + # Basic, no extra arguments + body = client.prepare_request_body(body=self.body) + self.assertFormBodyEqual(body, self.body_code) + + rclient = WebApplicationClient(self.client_id) + body = rclient.prepare_request_body(code=self.code, body=self.body) + self.assertFormBodyEqual(body, self.body_code) + + # With redirection uri + body = client.prepare_request_body(body=self.body, redirect_uri=self.redirect_uri) + self.assertFormBodyEqual(body, self.body_redirect) + + # With code verifier + body = client.prepare_request_body(body=self.body, code_verifier=self.code_verifier) + self.assertFormBodyEqual(body, self.body_code_verifier) + + # With extra parameters + body = client.prepare_request_body(body=self.body, **self.kwargs) + self.assertFormBodyEqual(body, self.body_kwargs) + + def test_parse_grant_uri_response(self): + client = WebApplicationClient(self.client_id) + + # Parse code and state + response = client.parse_request_uri_response(self.response_uri, state=self.state) + self.assertEqual(response, self.response) + self.assertEqual(client.code, self.code) + + # Mismatching state + self.assertRaises(errors.MismatchingStateError, + client.parse_request_uri_response, + self.response_uri, + state="invalid") + + def test_populate_attributes(self): + + client = WebApplicationClient(self.client_id) + + response_uri = (self.response_uri + + "&access_token=EVIL-TOKEN" + "&refresh_token=EVIL-TOKEN" + "&mac_key=EVIL-KEY") + + client.parse_request_uri_response(response_uri, self.state) + + self.assertEqual(client.code, self.code) + + # We must not accidentally pick up any further security + # credentials at this point. + self.assertIsNone(client.access_token) + self.assertIsNone(client.refresh_token) + self.assertIsNone(client.mac_key) + + def test_parse_token_response(self): + client = WebApplicationClient(self.client_id) + + # Parse code and state + response = client.parse_request_body_response(self.token_json, scope=self.scope) + self.assertEqual(response, self.token) + self.assertEqual(client.access_token, response.get("access_token")) + self.assertEqual(client.refresh_token, response.get("refresh_token")) + self.assertEqual(client.token_type, response.get("token_type")) + + # Mismatching state + self.assertRaises(Warning, client.parse_request_body_response, self.token_json, scope="invalid") + os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] = '1' + token = client.parse_request_body_response(self.token_json, scope="invalid") + self.assertTrue(token.scope_changed) + + scope_changes_recorded = [] + def record_scope_change(sender, message, old, new): + scope_changes_recorded.append((message, old, new)) + + signals.scope_changed.connect(record_scope_change) + try: + client.parse_request_body_response(self.token_json, scope="invalid") + self.assertEqual(len(scope_changes_recorded), 1) + message, old, new = scope_changes_recorded[0] + self.assertEqual(message, 'Scope has changed from "invalid" to "/profile".') + self.assertEqual(old, ['invalid']) + self.assertEqual(new, ['/profile']) + finally: + signals.scope_changed.disconnect(record_scope_change) + del os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] + + def test_prepare_authorization_requeset(self): + client = WebApplicationClient(self.client_id) + + url, header, body = client.prepare_authorization_request( + self.uri, redirect_url=self.redirect_uri, state=self.state, scope=self.scope) + self.assertURLEqual(url, self.uri_authorize_code) + # verify default header and body only + self.assertEqual(header, {'Content-Type': 'application/x-www-form-urlencoded'}) + self.assertEqual(body, '') + + def test_prepare_request_body(self): + """ + see issue #585 + https://github.com/oauthlib/oauthlib/issues/585 + + `prepare_request_body` should support the following scenarios: + 1. Include client_id alone in the body (default) + 2. Include client_id and client_secret in auth and not include them in the body (RFC preferred solution) + 3. Include client_id and client_secret in the body (RFC alternative solution) + 4. Include client_id in the body and an empty string for client_secret. + """ + client = WebApplicationClient(self.client_id) + + # scenario 1, default behavior to include `client_id` + r1 = client.prepare_request_body() + self.assertEqual(r1, 'grant_type=authorization_code&client_id=%s' % self.client_id) + + r1b = client.prepare_request_body(include_client_id=True) + self.assertEqual(r1b, 'grant_type=authorization_code&client_id=%s' % self.client_id) + + # scenario 2, do not include `client_id` in the body, so it can be sent in auth. + r2 = client.prepare_request_body(include_client_id=False) + self.assertEqual(r2, 'grant_type=authorization_code') + + # scenario 3, Include client_id and client_secret in the body (RFC alternative solution) + # the order of kwargs being appended is not guaranteed. for brevity, check the 2 permutations instead of sorting + r3 = client.prepare_request_body(client_secret=self.client_secret) + r3_params = dict(urlparse.parse_qsl(r3, keep_blank_values=True)) + self.assertEqual(len(r3_params.keys()), 3) + self.assertEqual(r3_params['grant_type'], 'authorization_code') + self.assertEqual(r3_params['client_id'], self.client_id) + self.assertEqual(r3_params['client_secret'], self.client_secret) + + r3b = client.prepare_request_body(include_client_id=True, client_secret=self.client_secret) + r3b_params = dict(urlparse.parse_qsl(r3b, keep_blank_values=True)) + self.assertEqual(len(r3b_params.keys()), 3) + self.assertEqual(r3b_params['grant_type'], 'authorization_code') + self.assertEqual(r3b_params['client_id'], self.client_id) + self.assertEqual(r3b_params['client_secret'], self.client_secret) + + # scenario 4, `client_secret` is an empty string + r4 = client.prepare_request_body(include_client_id=True, client_secret='') + r4_params = dict(urlparse.parse_qsl(r4, keep_blank_values=True)) + self.assertEqual(len(r4_params.keys()), 3) + self.assertEqual(r4_params['grant_type'], 'authorization_code') + self.assertEqual(r4_params['client_id'], self.client_id) + self.assertEqual(r4_params['client_secret'], '') + + # scenario 4b, `client_secret` is `None` + r4b = client.prepare_request_body(include_client_id=True, client_secret=None) + r4b_params = dict(urlparse.parse_qsl(r4b, keep_blank_values=True)) + self.assertEqual(len(r4b_params.keys()), 2) + self.assertEqual(r4b_params['grant_type'], 'authorization_code') + self.assertEqual(r4b_params['client_id'], self.client_id) + + # scenario Warnings + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") # catch all + + # warning1 - raise a DeprecationWarning if a `client_id` is submitted + rWarnings1 = client.prepare_request_body(client_id=self.client_id) + self.assertEqual(len(w), 1) + self.assertIsInstance(w[0].message, DeprecationWarning) + + # testing the exact warning message in Python2&Python3 is a pain + + # scenario Exceptions + # exception1 - raise a ValueError if the a different `client_id` is submitted + with self.assertRaises(ValueError) as cm: + client.prepare_request_body(client_id='different_client_id') + # testing the exact exception message in Python2&Python3 is a pain diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/__init__.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/__init__.py diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_base_endpoint.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_base_endpoint.py new file mode 100644 index 0000000000..b1af6c3306 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_base_endpoint.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- +from oauthlib.oauth2 import ( + FatalClientError, OAuth2Error, RequestValidator, Server, +) +from oauthlib.oauth2.rfc6749 import ( + BaseEndpoint, catch_errors_and_unavailability, +) + +from tests.unittest import TestCase + + +class BaseEndpointTest(TestCase): + + def test_default_config(self): + endpoint = BaseEndpoint() + self.assertFalse(endpoint.catch_errors) + self.assertTrue(endpoint.available) + endpoint.catch_errors = True + self.assertTrue(endpoint.catch_errors) + endpoint.available = False + self.assertFalse(endpoint.available) + + def test_error_catching(self): + validator = RequestValidator() + server = Server(validator) + server.catch_errors = True + h, b, s = server.create_token_response( + 'https://example.com', body='grant_type=authorization_code&code=abc' + ) + self.assertIn("server_error", b) + self.assertEqual(s, 500) + + def test_unavailability(self): + validator = RequestValidator() + server = Server(validator) + server.available = False + h, b, s = server.create_authorization_response('https://example.com') + self.assertIn("temporarily_unavailable", b) + self.assertEqual(s, 503) + + def test_wrapper(self): + + class TestServer(Server): + + @catch_errors_and_unavailability + def throw_error(self, uri): + raise ValueError() + + @catch_errors_and_unavailability + def throw_oauth_error(self, uri): + raise OAuth2Error() + + @catch_errors_and_unavailability + def throw_fatal_oauth_error(self, uri): + raise FatalClientError() + + validator = RequestValidator() + server = TestServer(validator) + + server.catch_errors = True + h, b, s = server.throw_error('a') + self.assertIn("server_error", b) + self.assertEqual(s, 500) + + server.available = False + h, b, s = server.throw_error('a') + self.assertIn("temporarily_unavailable", b) + self.assertEqual(s, 503) + + server.available = True + self.assertRaises(OAuth2Error, server.throw_oauth_error, 'a') + self.assertRaises(FatalClientError, server.throw_fatal_oauth_error, 'a') + server.catch_errors = False + self.assertRaises(OAuth2Error, server.throw_oauth_error, 'a') + self.assertRaises(FatalClientError, server.throw_fatal_oauth_error, 'a') diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_client_authentication.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_client_authentication.py new file mode 100644 index 0000000000..0659ee0d25 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_client_authentication.py @@ -0,0 +1,162 @@ +"""Client authentication tests across all endpoints. + +Client authentication in OAuth2 serve two purposes, to authenticate +confidential clients and to ensure public clients are in fact public. The +latter is achieved with authenticate_client_id and the former with +authenticate_client. + +We make sure authentication is done by requiring a client object to be set +on the request object with a client_id parameter. The client_id attribute +prevents this check from being circumvented with a client form parameter. +""" +import json +from unittest import mock + +from oauthlib.oauth2 import ( + BackendApplicationServer, LegacyApplicationServer, MobileApplicationServer, + RequestValidator, WebApplicationServer, +) + +from tests.unittest import TestCase + +from .test_utils import get_fragment_credentials + + +class ClientAuthenticationTest(TestCase): + + def inspect_client(self, request, refresh_token=False): + if not request.client or not request.client.client_id: + raise ValueError() + return 'abc' + + def setUp(self): + self.validator = mock.MagicMock(spec=RequestValidator) + self.validator.is_pkce_required.return_value = False + self.validator.get_code_challenge.return_value = None + self.validator.get_default_redirect_uri.return_value = 'http://i.b./path' + self.web = WebApplicationServer(self.validator, + token_generator=self.inspect_client) + self.mobile = MobileApplicationServer(self.validator, + token_generator=self.inspect_client) + self.legacy = LegacyApplicationServer(self.validator, + token_generator=self.inspect_client) + self.backend = BackendApplicationServer(self.validator, + token_generator=self.inspect_client) + self.token_uri = 'http://example.com/path' + self.auth_uri = 'http://example.com/path?client_id=abc&response_type=token' + # should be base64 but no added value in this unittest + self.basicauth_client_creds = {"Authorization": "john:doe"} + self.basicauth_client_id = {"Authorization": "john:"} + + def set_client(self, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def set_client_id(self, client_id, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def basicauth_authenticate_client(self, request): + assert "Authorization" in request.headers + assert "john:doe" in request.headers["Authorization"] + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def test_client_id_authentication(self): + token_uri = 'http://example.com/path' + + # authorization code grant + self.validator.authenticate_client.return_value = False + self.validator.authenticate_client_id.return_value = False + _, body, _ = self.web.create_token_response(token_uri, + body='grant_type=authorization_code&code=mock') + self.assertEqual(json.loads(body)['error'], 'invalid_client') + + self.validator.authenticate_client_id.return_value = True + self.validator.authenticate_client.side_effect = self.set_client + _, body, _ = self.web.create_token_response(token_uri, + body='grant_type=authorization_code&code=mock') + self.assertIn('access_token', json.loads(body)) + + # implicit grant + auth_uri = 'http://example.com/path?client_id=abc&response_type=token' + self.assertRaises(ValueError, self.mobile.create_authorization_response, + auth_uri, scopes=['random']) + + self.validator.validate_client_id.side_effect = self.set_client_id + h, _, s = self.mobile.create_authorization_response(auth_uri, scopes=['random']) + self.assertEqual(302, s) + self.assertIn('Location', h) + self.assertIn('access_token', get_fragment_credentials(h['Location'])) + + def test_basicauth_web(self): + self.validator.authenticate_client.side_effect = self.basicauth_authenticate_client + _, body, _ = self.web.create_token_response( + self.token_uri, + body='grant_type=authorization_code&code=mock', + headers=self.basicauth_client_creds + ) + self.assertIn('access_token', json.loads(body)) + + def test_basicauth_legacy(self): + self.validator.authenticate_client.side_effect = self.basicauth_authenticate_client + _, body, _ = self.legacy.create_token_response( + self.token_uri, + body='grant_type=password&username=abc&password=secret', + headers=self.basicauth_client_creds + ) + self.assertIn('access_token', json.loads(body)) + + def test_basicauth_backend(self): + self.validator.authenticate_client.side_effect = self.basicauth_authenticate_client + _, body, _ = self.backend.create_token_response( + self.token_uri, + body='grant_type=client_credentials', + headers=self.basicauth_client_creds + ) + self.assertIn('access_token', json.loads(body)) + + def test_basicauth_revoke(self): + self.validator.authenticate_client.side_effect = self.basicauth_authenticate_client + + # legacy or any other uses the same RevocationEndpoint + _, body, status = self.legacy.create_revocation_response( + self.token_uri, + body='token=foobar', + headers=self.basicauth_client_creds + ) + self.assertEqual(status, 200, body) + + def test_basicauth_introspect(self): + self.validator.authenticate_client.side_effect = self.basicauth_authenticate_client + + # legacy or any other uses the same IntrospectEndpoint + _, body, status = self.legacy.create_introspect_response( + self.token_uri, + body='token=foobar', + headers=self.basicauth_client_creds + ) + self.assertEqual(status, 200, body) + + def test_custom_authentication(self): + token_uri = 'http://example.com/path' + + # authorization code grant + self.assertRaises(NotImplementedError, + self.web.create_token_response, token_uri, + body='grant_type=authorization_code&code=mock') + + # password grant + self.validator.authenticate_client.return_value = True + self.assertRaises(NotImplementedError, + self.legacy.create_token_response, token_uri, + body='grant_type=password&username=abc&password=secret') + + # client credentials grant + self.validator.authenticate_client.return_value = True + self.assertRaises(NotImplementedError, + self.backend.create_token_response, token_uri, + body='grant_type=client_credentials') diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_credentials_preservation.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_credentials_preservation.py new file mode 100644 index 0000000000..32c770ccb7 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_credentials_preservation.py @@ -0,0 +1,128 @@ +"""Ensure credentials are preserved through the authorization. + +The Authorization Code Grant will need to preserve state as well as redirect +uri and the Implicit Grant will need to preserve state. +""" +import json +from unittest import mock + +from oauthlib.oauth2 import ( + MobileApplicationServer, RequestValidator, WebApplicationServer, +) +from oauthlib.oauth2.rfc6749 import errors + +from tests.unittest import TestCase + +from .test_utils import get_fragment_credentials, get_query_credentials + + +class PreservationTest(TestCase): + + DEFAULT_REDIRECT_URI = 'http://i.b./path' + + def setUp(self): + self.validator = mock.MagicMock(spec=RequestValidator) + self.validator.get_default_redirect_uri.return_value = self.DEFAULT_REDIRECT_URI + self.validator.get_code_challenge.return_value = None + self.validator.authenticate_client.side_effect = self.set_client + self.web = WebApplicationServer(self.validator) + self.mobile = MobileApplicationServer(self.validator) + + def set_client(self, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def test_state_preservation(self): + auth_uri = 'http://example.com/path?state=xyz&client_id=abc&response_type=' + + # authorization grant + h, _, s = self.web.create_authorization_response( + auth_uri + 'code', scopes=['random']) + self.assertEqual(s, 302) + self.assertIn('Location', h) + self.assertEqual(get_query_credentials(h['Location'])['state'][0], 'xyz') + + # implicit grant + h, _, s = self.mobile.create_authorization_response( + auth_uri + 'token', scopes=['random']) + self.assertEqual(s, 302) + self.assertIn('Location', h) + self.assertEqual(get_fragment_credentials(h['Location'])['state'][0], 'xyz') + + def test_redirect_uri_preservation(self): + auth_uri = 'http://example.com/path?redirect_uri=http%3A%2F%2Fi.b%2Fpath&client_id=abc' + redirect_uri = 'http://i.b/path' + token_uri = 'http://example.com/path' + + # authorization grant + h, _, s = self.web.create_authorization_response( + auth_uri + '&response_type=code', scopes=['random']) + self.assertEqual(s, 302) + self.assertIn('Location', h) + self.assertTrue(h['Location'].startswith(redirect_uri)) + + # confirm_redirect_uri should return false if the redirect uri + # was given in the authorization but not in the token request. + self.validator.confirm_redirect_uri.return_value = False + code = get_query_credentials(h['Location'])['code'][0] + _, body, _ = self.web.create_token_response(token_uri, + body='grant_type=authorization_code&code=%s' % code) + self.assertEqual(json.loads(body)['error'], 'invalid_request') + + # implicit grant + h, _, s = self.mobile.create_authorization_response( + auth_uri + '&response_type=token', scopes=['random']) + self.assertEqual(s, 302) + self.assertIn('Location', h) + self.assertTrue(h['Location'].startswith(redirect_uri)) + + def test_invalid_redirect_uri(self): + auth_uri = 'http://example.com/path?redirect_uri=http%3A%2F%2Fi.b%2Fpath&client_id=abc' + self.validator.validate_redirect_uri.return_value = False + + # authorization grant + self.assertRaises(errors.MismatchingRedirectURIError, + self.web.create_authorization_response, + auth_uri + '&response_type=code', scopes=['random']) + + # implicit grant + self.assertRaises(errors.MismatchingRedirectURIError, + self.mobile.create_authorization_response, + auth_uri + '&response_type=token', scopes=['random']) + + def test_default_uri(self): + auth_uri = 'http://example.com/path?state=xyz&client_id=abc' + + self.validator.get_default_redirect_uri.return_value = None + + # authorization grant + self.assertRaises(errors.MissingRedirectURIError, + self.web.create_authorization_response, + auth_uri + '&response_type=code', scopes=['random']) + + # implicit grant + self.assertRaises(errors.MissingRedirectURIError, + self.mobile.create_authorization_response, + auth_uri + '&response_type=token', scopes=['random']) + + def test_default_uri_in_token(self): + auth_uri = 'http://example.com/path?state=xyz&client_id=abc' + token_uri = 'http://example.com/path' + + # authorization grant + h, _, s = self.web.create_authorization_response( + auth_uri + '&response_type=code', scopes=['random']) + self.assertEqual(s, 302) + self.assertIn('Location', h) + self.assertTrue(h['Location'].startswith(self.DEFAULT_REDIRECT_URI)) + + # confirm_redirect_uri should return true if the redirect uri + # was not given in the authorization AND not in the token request. + self.validator.confirm_redirect_uri.return_value = True + code = get_query_credentials(h['Location'])['code'][0] + self.validator.validate_code.return_value = True + _, body, s = self.web.create_token_response(token_uri, + body='grant_type=authorization_code&code=%s' % code) + self.assertEqual(s, 200) + self.assertEqual(self.validator.confirm_redirect_uri.call_args[0][2], self.DEFAULT_REDIRECT_URI) diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_error_responses.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_error_responses.py new file mode 100644 index 0000000000..f61595e213 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_error_responses.py @@ -0,0 +1,491 @@ +"""Ensure the correct error responses are returned for all defined error types. +""" +import json +from unittest import mock + +from oauthlib.common import urlencode +from oauthlib.oauth2 import ( + BackendApplicationServer, LegacyApplicationServer, MobileApplicationServer, + RequestValidator, WebApplicationServer, +) +from oauthlib.oauth2.rfc6749 import errors + +from tests.unittest import TestCase + + +class ErrorResponseTest(TestCase): + + def set_client(self, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def setUp(self): + self.validator = mock.MagicMock(spec=RequestValidator) + self.validator.get_default_redirect_uri.return_value = None + self.validator.get_code_challenge.return_value = None + self.web = WebApplicationServer(self.validator) + self.mobile = MobileApplicationServer(self.validator) + self.legacy = LegacyApplicationServer(self.validator) + self.backend = BackendApplicationServer(self.validator) + + def test_invalid_redirect_uri(self): + uri = 'https://example.com/authorize?response_type={0}&client_id=foo&redirect_uri=wrong' + + # Authorization code grant + self.assertRaises(errors.InvalidRedirectURIError, + self.web.validate_authorization_request, uri.format('code')) + self.assertRaises(errors.InvalidRedirectURIError, + self.web.create_authorization_response, uri.format('code'), scopes=['foo']) + + # Implicit grant + self.assertRaises(errors.InvalidRedirectURIError, + self.mobile.validate_authorization_request, uri.format('token')) + self.assertRaises(errors.InvalidRedirectURIError, + self.mobile.create_authorization_response, uri.format('token'), scopes=['foo']) + + def test_invalid_default_redirect_uri(self): + uri = 'https://example.com/authorize?response_type={0}&client_id=foo' + self.validator.get_default_redirect_uri.return_value = "wrong" + + # Authorization code grant + self.assertRaises(errors.InvalidRedirectURIError, + self.web.validate_authorization_request, uri.format('code')) + self.assertRaises(errors.InvalidRedirectURIError, + self.web.create_authorization_response, uri.format('code'), scopes=['foo']) + + # Implicit grant + self.assertRaises(errors.InvalidRedirectURIError, + self.mobile.validate_authorization_request, uri.format('token')) + self.assertRaises(errors.InvalidRedirectURIError, + self.mobile.create_authorization_response, uri.format('token'), scopes=['foo']) + + def test_missing_redirect_uri(self): + uri = 'https://example.com/authorize?response_type={0}&client_id=foo' + + # Authorization code grant + self.assertRaises(errors.MissingRedirectURIError, + self.web.validate_authorization_request, uri.format('code')) + self.assertRaises(errors.MissingRedirectURIError, + self.web.create_authorization_response, uri.format('code'), scopes=['foo']) + + # Implicit grant + self.assertRaises(errors.MissingRedirectURIError, + self.mobile.validate_authorization_request, uri.format('token')) + self.assertRaises(errors.MissingRedirectURIError, + self.mobile.create_authorization_response, uri.format('token'), scopes=['foo']) + + def test_mismatching_redirect_uri(self): + uri = 'https://example.com/authorize?response_type={0}&client_id=foo&redirect_uri=https%3A%2F%2Fi.b%2Fback' + + # Authorization code grant + self.validator.validate_redirect_uri.return_value = False + self.assertRaises(errors.MismatchingRedirectURIError, + self.web.validate_authorization_request, uri.format('code')) + self.assertRaises(errors.MismatchingRedirectURIError, + self.web.create_authorization_response, uri.format('code'), scopes=['foo']) + + # Implicit grant + self.assertRaises(errors.MismatchingRedirectURIError, + self.mobile.validate_authorization_request, uri.format('token')) + self.assertRaises(errors.MismatchingRedirectURIError, + self.mobile.create_authorization_response, uri.format('token'), scopes=['foo']) + + def test_missing_client_id(self): + uri = 'https://example.com/authorize?response_type={0}&redirect_uri=https%3A%2F%2Fi.b%2Fback' + + # Authorization code grant + self.validator.validate_redirect_uri.return_value = False + self.assertRaises(errors.MissingClientIdError, + self.web.validate_authorization_request, uri.format('code')) + self.assertRaises(errors.MissingClientIdError, + self.web.create_authorization_response, uri.format('code'), scopes=['foo']) + + # Implicit grant + self.assertRaises(errors.MissingClientIdError, + self.mobile.validate_authorization_request, uri.format('token')) + self.assertRaises(errors.MissingClientIdError, + self.mobile.create_authorization_response, uri.format('token'), scopes=['foo']) + + def test_invalid_client_id(self): + uri = 'https://example.com/authorize?response_type={0}&client_id=foo&redirect_uri=https%3A%2F%2Fi.b%2Fback' + + # Authorization code grant + self.validator.validate_client_id.return_value = False + self.assertRaises(errors.InvalidClientIdError, + self.web.validate_authorization_request, uri.format('code')) + self.assertRaises(errors.InvalidClientIdError, + self.web.create_authorization_response, uri.format('code'), scopes=['foo']) + + # Implicit grant + self.assertRaises(errors.InvalidClientIdError, + self.mobile.validate_authorization_request, uri.format('token')) + self.assertRaises(errors.InvalidClientIdError, + self.mobile.create_authorization_response, uri.format('token'), scopes=['foo']) + + def test_empty_parameter(self): + uri = 'https://example.com/authorize?client_id=foo&redirect_uri=https%3A%2F%2Fi.b%2Fback&response_type=code&' + + # Authorization code grant + self.assertRaises(errors.InvalidRequestFatalError, + self.web.validate_authorization_request, uri) + + # Implicit grant + self.assertRaises(errors.InvalidRequestFatalError, + self.mobile.validate_authorization_request, uri) + + def test_invalid_request(self): + self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb' + token_uri = 'https://i.b/token' + + invalid_bodies = [ + # duplicate params + 'grant_type=authorization_code&client_id=nope&client_id=nope&code=foo' + ] + for body in invalid_bodies: + _, body, _ = self.web.create_token_response(token_uri, + body=body) + self.assertEqual('invalid_request', json.loads(body)['error']) + + # Password credentials grant + invalid_bodies = [ + # duplicate params + 'grant_type=password&username=foo&username=bar&password=baz' + # missing username + 'grant_type=password&password=baz' + # missing password + 'grant_type=password&username=foo' + ] + self.validator.authenticate_client.side_effect = self.set_client + for body in invalid_bodies: + _, body, _ = self.legacy.create_token_response(token_uri, + body=body) + self.assertEqual('invalid_request', json.loads(body)['error']) + + # Client credentials grant + invalid_bodies = [ + # duplicate params + 'grant_type=client_credentials&scope=foo&scope=bar' + ] + for body in invalid_bodies: + _, body, _ = self.backend.create_token_response(token_uri, + body=body) + self.assertEqual('invalid_request', json.loads(body)['error']) + + def test_invalid_request_duplicate_params(self): + self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb' + uri = 'https://i.b/auth?client_id=foo&client_id=bar&response_type={0}' + description = 'Duplicate client_id parameter.' + + # Authorization code + self.assertRaisesRegex(errors.InvalidRequestFatalError, + description, + self.web.validate_authorization_request, + uri.format('code')) + self.assertRaisesRegex(errors.InvalidRequestFatalError, + description, + self.web.create_authorization_response, + uri.format('code'), scopes=['foo']) + + # Implicit grant + self.assertRaisesRegex(errors.InvalidRequestFatalError, + description, + self.mobile.validate_authorization_request, + uri.format('token')) + self.assertRaisesRegex(errors.InvalidRequestFatalError, + description, + self.mobile.create_authorization_response, + uri.format('token'), scopes=['foo']) + + def test_invalid_request_missing_response_type(self): + + self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb' + + uri = 'https://i.b/auth?client_id=foo' + + # Authorization code + self.assertRaises(errors.MissingResponseTypeError, + self.web.validate_authorization_request, + uri.format('code')) + h, _, s = self.web.create_authorization_response(uri, scopes=['foo']) + self.assertEqual(s, 302) + self.assertIn('Location', h) + self.assertIn('error=invalid_request', h['Location']) + + # Implicit grant + self.assertRaises(errors.MissingResponseTypeError, + self.mobile.validate_authorization_request, + uri.format('token')) + h, _, s = self.mobile.create_authorization_response(uri, scopes=['foo']) + self.assertEqual(s, 302) + self.assertIn('Location', h) + self.assertIn('error=invalid_request', h['Location']) + + def test_unauthorized_client(self): + self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb' + self.validator.validate_grant_type.return_value = False + self.validator.validate_response_type.return_value = False + self.validator.authenticate_client.side_effect = self.set_client + token_uri = 'https://i.b/token' + + # Authorization code grant + self.assertRaises(errors.UnauthorizedClientError, + self.web.validate_authorization_request, + 'https://i.b/auth?response_type=code&client_id=foo') + _, body, _ = self.web.create_token_response(token_uri, + body='grant_type=authorization_code&code=foo') + self.assertEqual('unauthorized_client', json.loads(body)['error']) + + # Implicit grant + self.assertRaises(errors.UnauthorizedClientError, + self.mobile.validate_authorization_request, + 'https://i.b/auth?response_type=token&client_id=foo') + + # Password credentials grant + _, body, _ = self.legacy.create_token_response(token_uri, + body='grant_type=password&username=foo&password=bar') + self.assertEqual('unauthorized_client', json.loads(body)['error']) + + # Client credentials grant + _, body, _ = self.backend.create_token_response(token_uri, + body='grant_type=client_credentials') + self.assertEqual('unauthorized_client', json.loads(body)['error']) + + def test_access_denied(self): + self.validator.authenticate_client.side_effect = self.set_client + self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb' + self.validator.confirm_redirect_uri.return_value = False + token_uri = 'https://i.b/token' + # Authorization code grant + _, body, _ = self.web.create_token_response(token_uri, + body='grant_type=authorization_code&code=foo') + self.assertEqual('invalid_request', json.loads(body)['error']) + + def test_access_denied_no_default_redirecturi(self): + self.validator.authenticate_client.side_effect = self.set_client + self.validator.get_default_redirect_uri.return_value = None + token_uri = 'https://i.b/token' + # Authorization code grant + _, body, _ = self.web.create_token_response(token_uri, + body='grant_type=authorization_code&code=foo') + self.assertEqual('invalid_request', json.loads(body)['error']) + + def test_unsupported_response_type(self): + self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb' + + # Authorization code grant + self.assertRaises(errors.UnsupportedResponseTypeError, + self.web.validate_authorization_request, + 'https://i.b/auth?response_type=foo&client_id=foo') + + # Implicit grant + self.assertRaises(errors.UnsupportedResponseTypeError, + self.mobile.validate_authorization_request, + 'https://i.b/auth?response_type=foo&client_id=foo') + + def test_invalid_scope(self): + self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb' + self.validator.validate_scopes.return_value = False + self.validator.authenticate_client.side_effect = self.set_client + + # Authorization code grant + self.assertRaises(errors.InvalidScopeError, + self.web.validate_authorization_request, + 'https://i.b/auth?response_type=code&client_id=foo') + + # Implicit grant + self.assertRaises(errors.InvalidScopeError, + self.mobile.validate_authorization_request, + 'https://i.b/auth?response_type=token&client_id=foo') + + # Password credentials grant + _, body, _ = self.legacy.create_token_response( + 'https://i.b/token', + body='grant_type=password&username=foo&password=bar') + self.assertEqual('invalid_scope', json.loads(body)['error']) + + # Client credentials grant + _, body, _ = self.backend.create_token_response( + 'https://i.b/token', + body='grant_type=client_credentials') + self.assertEqual('invalid_scope', json.loads(body)['error']) + + def test_server_error(self): + def raise_error(*args, **kwargs): + raise ValueError() + + self.validator.validate_client_id.side_effect = raise_error + self.validator.authenticate_client.side_effect = raise_error + self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb' + + # Authorization code grant + self.web.catch_errors = True + _, _, s = self.web.create_authorization_response( + 'https://i.b/auth?client_id=foo&response_type=code', + scopes=['foo']) + self.assertEqual(s, 500) + _, _, s = self.web.create_token_response( + 'https://i.b/token', + body='grant_type=authorization_code&code=foo', + scopes=['foo']) + self.assertEqual(s, 500) + + # Implicit grant + self.mobile.catch_errors = True + _, _, s = self.mobile.create_authorization_response( + 'https://i.b/auth?client_id=foo&response_type=token', + scopes=['foo']) + self.assertEqual(s, 500) + + # Password credentials grant + self.legacy.catch_errors = True + _, _, s = self.legacy.create_token_response( + 'https://i.b/token', + body='grant_type=password&username=foo&password=foo') + self.assertEqual(s, 500) + + # Client credentials grant + self.backend.catch_errors = True + _, _, s = self.backend.create_token_response( + 'https://i.b/token', + body='grant_type=client_credentials') + self.assertEqual(s, 500) + + def test_temporarily_unavailable(self): + # Authorization code grant + self.web.available = False + _, _, s = self.web.create_authorization_response( + 'https://i.b/auth?client_id=foo&response_type=code', + scopes=['foo']) + self.assertEqual(s, 503) + _, _, s = self.web.create_token_response( + 'https://i.b/token', + body='grant_type=authorization_code&code=foo', + scopes=['foo']) + self.assertEqual(s, 503) + + # Implicit grant + self.mobile.available = False + _, _, s = self.mobile.create_authorization_response( + 'https://i.b/auth?client_id=foo&response_type=token', + scopes=['foo']) + self.assertEqual(s, 503) + + # Password credentials grant + self.legacy.available = False + _, _, s = self.legacy.create_token_response( + 'https://i.b/token', + body='grant_type=password&username=foo&password=foo') + self.assertEqual(s, 503) + + # Client credentials grant + self.backend.available = False + _, _, s = self.backend.create_token_response( + 'https://i.b/token', + body='grant_type=client_credentials') + self.assertEqual(s, 503) + + def test_invalid_client(self): + self.validator.authenticate_client.return_value = False + self.validator.authenticate_client_id.return_value = False + + # Authorization code grant + _, body, _ = self.web.create_token_response('https://i.b/token', + body='grant_type=authorization_code&code=foo') + self.assertEqual('invalid_client', json.loads(body)['error']) + + # Password credentials grant + _, body, _ = self.legacy.create_token_response('https://i.b/token', + body='grant_type=password&username=foo&password=bar') + self.assertEqual('invalid_client', json.loads(body)['error']) + + # Client credentials grant + _, body, _ = self.legacy.create_token_response('https://i.b/token', + body='grant_type=client_credentials') + self.assertEqual('invalid_client', json.loads(body)['error']) + + def test_invalid_grant(self): + self.validator.authenticate_client.side_effect = self.set_client + + # Authorization code grant + self.validator.validate_code.return_value = False + _, body, _ = self.web.create_token_response('https://i.b/token', + body='grant_type=authorization_code&code=foo') + self.assertEqual('invalid_grant', json.loads(body)['error']) + + # Password credentials grant + self.validator.validate_user.return_value = False + _, body, _ = self.legacy.create_token_response('https://i.b/token', + body='grant_type=password&username=foo&password=bar') + self.assertEqual('invalid_grant', json.loads(body)['error']) + + def test_unsupported_grant_type(self): + self.validator.authenticate_client.side_effect = self.set_client + + # Authorization code grant + _, body, _ = self.web.create_token_response('https://i.b/token', + body='grant_type=bar&code=foo') + self.assertEqual('unsupported_grant_type', json.loads(body)['error']) + + # Password credentials grant + _, body, _ = self.legacy.create_token_response('https://i.b/token', + body='grant_type=bar&username=foo&password=bar') + self.assertEqual('unsupported_grant_type', json.loads(body)['error']) + + # Client credentials grant + _, body, _ = self.backend.create_token_response('https://i.b/token', + body='grant_type=bar') + self.assertEqual('unsupported_grant_type', json.loads(body)['error']) + + def test_invalid_request_method(self): + test_methods = ['GET', 'pUt', 'dEleTe', 'paTcH'] + test_methods = test_methods + [x.lower() for x in test_methods] + [x.upper() for x in test_methods] + for method in test_methods: + self.validator.authenticate_client.side_effect = self.set_client + + uri = "http://i/b/token/" + try: + _, body, s = self.web.create_token_response(uri, + body='grant_type=access_token&code=123', http_method=method) + self.fail('This should have failed with InvalidRequestError') + except errors.InvalidRequestError as ire: + self.assertIn('Unsupported request method', ire.description) + + try: + _, body, s = self.legacy.create_token_response(uri, + body='grant_type=access_token&code=123', http_method=method) + self.fail('This should have failed with InvalidRequestError') + except errors.InvalidRequestError as ire: + self.assertIn('Unsupported request method', ire.description) + + try: + _, body, s = self.backend.create_token_response(uri, + body='grant_type=access_token&code=123', http_method=method) + self.fail('This should have failed with InvalidRequestError') + except errors.InvalidRequestError as ire: + self.assertIn('Unsupported request method', ire.description) + + def test_invalid_post_request(self): + self.validator.authenticate_client.side_effect = self.set_client + for param in ['token', 'secret', 'code', 'foo']: + uri = 'https://i/b/token?' + urlencode([(param, 'secret')]) + try: + _, body, s = self.web.create_token_response(uri, + body='grant_type=access_token&code=123') + self.fail('This should have failed with InvalidRequestError') + except errors.InvalidRequestError as ire: + self.assertIn('URL query parameters are not allowed', ire.description) + + try: + _, body, s = self.legacy.create_token_response(uri, + body='grant_type=access_token&code=123') + self.fail('This should have failed with InvalidRequestError') + except errors.InvalidRequestError as ire: + self.assertIn('URL query parameters are not allowed', ire.description) + + try: + _, body, s = self.backend.create_token_response(uri, + body='grant_type=access_token&code=123') + self.fail('This should have failed with InvalidRequestError') + except errors.InvalidRequestError as ire: + self.assertIn('URL query parameters are not allowed', ire.description) diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_extra_credentials.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_extra_credentials.py new file mode 100644 index 0000000000..97aaf86dff --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_extra_credentials.py @@ -0,0 +1,69 @@ +"""Ensure extra credentials can be supplied for inclusion in tokens. +""" +from unittest import mock + +from oauthlib.oauth2 import ( + BackendApplicationServer, LegacyApplicationServer, MobileApplicationServer, + RequestValidator, WebApplicationServer, +) + +from tests.unittest import TestCase + + +class ExtraCredentialsTest(TestCase): + + def set_client(self, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def setUp(self): + self.validator = mock.MagicMock(spec=RequestValidator) + self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb' + self.web = WebApplicationServer(self.validator) + self.mobile = MobileApplicationServer(self.validator) + self.legacy = LegacyApplicationServer(self.validator) + self.backend = BackendApplicationServer(self.validator) + + def test_post_authorization_request(self): + def save_code(client_id, token, request): + self.assertEqual('creds', request.extra) + + def save_token(token, request): + self.assertEqual('creds', request.extra) + + # Authorization code grant + self.validator.save_authorization_code.side_effect = save_code + self.web.create_authorization_response( + 'https://i.b/auth?client_id=foo&response_type=code', + scopes=['foo'], + credentials={'extra': 'creds'}) + + # Implicit grant + self.validator.save_bearer_token.side_effect = save_token + self.mobile.create_authorization_response( + 'https://i.b/auth?client_id=foo&response_type=token', + scopes=['foo'], + credentials={'extra': 'creds'}) + + def test_token_request(self): + def save_token(token, request): + self.assertIn('extra', token) + + self.validator.save_bearer_token.side_effect = save_token + self.validator.authenticate_client.side_effect = self.set_client + + # Authorization code grant + self.web.create_token_response('https://i.b/token', + body='grant_type=authorization_code&code=foo', + credentials={'extra': 'creds'}) + + # Password credentials grant + self.legacy.create_token_response('https://i.b/token', + body='grant_type=password&username=foo&password=bar', + credentials={'extra': 'creds'}) + + # Client credentials grant + self.backend.create_token_response('https://i.b/token', + body='grant_type=client_credentials', + credentials={'extra': 'creds'}) diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_introspect_endpoint.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_introspect_endpoint.py new file mode 100644 index 0000000000..6d3d119a3b --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_introspect_endpoint.py @@ -0,0 +1,168 @@ +# -*- coding: utf-8 -*- +from json import loads +from unittest.mock import MagicMock + +from oauthlib.common import urlencode +from oauthlib.oauth2 import IntrospectEndpoint, RequestValidator + +from tests.unittest import TestCase + + +class IntrospectEndpointTest(TestCase): + + def setUp(self): + self.validator = MagicMock(wraps=RequestValidator()) + self.validator.client_authentication_required.return_value = True + self.validator.authenticate_client.return_value = True + self.validator.validate_bearer_token.return_value = True + self.validator.introspect_token.return_value = {} + self.endpoint = IntrospectEndpoint(self.validator) + + self.uri = 'should_not_matter' + self.headers = { + 'Content-Type': 'application/x-www-form-urlencoded', + } + self.resp_h = { + 'Cache-Control': 'no-store', + 'Content-Type': 'application/json', + 'Pragma': 'no-cache' + } + self.resp_b = { + "active": True + } + + def test_introspect_token(self): + for token_type in ('access_token', 'refresh_token', 'invalid'): + body = urlencode([('token', 'foo'), + ('token_type_hint', token_type)]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b), self.resp_b) + self.assertEqual(s, 200) + + def test_introspect_token_nohint(self): + # don't specify token_type_hint + body = urlencode([('token', 'foo')]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b), self.resp_b) + self.assertEqual(s, 200) + + def test_introspect_token_false(self): + self.validator.introspect_token.return_value = None + body = urlencode([('token', 'foo')]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b), {"active": False}) + self.assertEqual(s, 200) + + def test_introspect_token_claims(self): + self.validator.introspect_token.return_value = {"foo": "bar"} + body = urlencode([('token', 'foo')]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b), {"active": True, "foo": "bar"}) + self.assertEqual(s, 200) + + def test_introspect_token_claims_spoof_active(self): + self.validator.introspect_token.return_value = {"foo": "bar", "active": False} + body = urlencode([('token', 'foo')]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b), {"active": True, "foo": "bar"}) + self.assertEqual(s, 200) + + def test_introspect_token_client_authentication_failed(self): + self.validator.authenticate_client.return_value = False + body = urlencode([('token', 'foo'), + ('token_type_hint', 'access_token')]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, { + 'Content-Type': 'application/json', + 'Cache-Control': 'no-store', + 'Pragma': 'no-cache', + "WWW-Authenticate": 'Bearer error="invalid_client"' + }) + self.assertEqual(loads(b)['error'], 'invalid_client') + self.assertEqual(s, 401) + + def test_introspect_token_public_client_authentication(self): + self.validator.client_authentication_required.return_value = False + self.validator.authenticate_client_id.return_value = True + for token_type in ('access_token', 'refresh_token', 'invalid'): + body = urlencode([('token', 'foo'), + ('token_type_hint', token_type)]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b), self.resp_b) + self.assertEqual(s, 200) + + def test_introspect_token_public_client_authentication_failed(self): + self.validator.client_authentication_required.return_value = False + self.validator.authenticate_client_id.return_value = False + body = urlencode([('token', 'foo'), + ('token_type_hint', 'access_token')]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, { + 'Content-Type': 'application/json', + 'Cache-Control': 'no-store', + 'Pragma': 'no-cache', + "WWW-Authenticate": 'Bearer error="invalid_client"' + }) + self.assertEqual(loads(b)['error'], 'invalid_client') + self.assertEqual(s, 401) + + def test_introspect_unsupported_token(self): + endpoint = IntrospectEndpoint(self.validator, + supported_token_types=['access_token']) + body = urlencode([('token', 'foo'), + ('token_type_hint', 'refresh_token')]) + h, b, s = endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b)['error'], 'unsupported_token_type') + self.assertEqual(s, 400) + + h, b, s = endpoint.create_introspect_response(self.uri, + headers=self.headers, body='') + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b)['error'], 'invalid_request') + self.assertEqual(s, 400) + + def test_introspect_invalid_request_method(self): + endpoint = IntrospectEndpoint(self.validator, + supported_token_types=['access_token']) + test_methods = ['GET', 'pUt', 'dEleTe', 'paTcH'] + test_methods = test_methods + [x.lower() for x in test_methods] + [x.upper() for x in test_methods] + for method in test_methods: + body = urlencode([('token', 'foo'), + ('token_type_hint', 'refresh_token')]) + h, b, s = endpoint.create_introspect_response(self.uri, + http_method = method, headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b)['error'], 'invalid_request') + self.assertIn('Unsupported request method', loads(b)['error_description']) + self.assertEqual(s, 400) + + def test_introspect_bad_post_request(self): + endpoint = IntrospectEndpoint(self.validator, + supported_token_types=['access_token']) + for param in ['token', 'secret', 'code', 'foo']: + uri = 'http://some.endpoint?' + urlencode([(param, 'secret')]) + body = urlencode([('token', 'foo'), + ('token_type_hint', 'access_token')]) + h, b, s = endpoint.create_introspect_response( + uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b)['error'], 'invalid_request') + self.assertIn('query parameters are not allowed', loads(b)['error_description']) + self.assertEqual(s, 400) diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_metadata.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_metadata.py new file mode 100644 index 0000000000..1f5b912100 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_metadata.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- +import json + +from oauthlib.oauth2 import MetadataEndpoint, Server, TokenEndpoint + +from tests.unittest import TestCase + + +class MetadataEndpointTest(TestCase): + def setUp(self): + self.metadata = { + "issuer": 'https://foo.bar' + } + + def test_openid_oauth2_preconfigured(self): + default_claims = { + "issuer": 'https://foo.bar', + "authorization_endpoint": "https://foo.bar/authorize", + "revocation_endpoint": "https://foo.bar/revoke", + "introspection_endpoint": "https://foo.bar/introspect", + "token_endpoint": "https://foo.bar/token" + } + from oauthlib.oauth2 import Server as OAuth2Server + from oauthlib.openid import Server as OpenIDServer + + endpoint = OAuth2Server(None) + metadata = MetadataEndpoint([endpoint], default_claims) + oauth2_claims = metadata.claims + + endpoint = OpenIDServer(None) + metadata = MetadataEndpoint([endpoint], default_claims) + openid_claims = metadata.claims + + # Pure OAuth2 Authorization Metadata are similar with OpenID but + # response_type not! (OIDC contains "id_token" and hybrid flows) + del oauth2_claims['response_types_supported'] + del openid_claims['response_types_supported'] + + self.maxDiff = None + self.assertEqual(openid_claims, oauth2_claims) + + def test_create_metadata_response(self): + endpoint = TokenEndpoint(None, None, grant_types={"password": None}) + metadata = MetadataEndpoint([endpoint], { + "issuer": 'https://foo.bar', + "token_endpoint": "https://foo.bar/token" + }) + headers, body, status = metadata.create_metadata_response('/', 'GET') + assert headers == { + 'Content-Type': 'application/json', + 'Access-Control-Allow-Origin': '*', + } + claims = json.loads(body) + assert claims['issuer'] == 'https://foo.bar' + + def test_token_endpoint(self): + endpoint = TokenEndpoint(None, None, grant_types={"password": None}) + metadata = MetadataEndpoint([endpoint], { + "issuer": 'https://foo.bar', + "token_endpoint": "https://foo.bar/token" + }) + self.assertIn("grant_types_supported", metadata.claims) + self.assertEqual(metadata.claims["grant_types_supported"], ["password"]) + + def test_token_endpoint_overridden(self): + endpoint = TokenEndpoint(None, None, grant_types={"password": None}) + metadata = MetadataEndpoint([endpoint], { + "issuer": 'https://foo.bar', + "token_endpoint": "https://foo.bar/token", + "grant_types_supported": ["pass_word_special_provider"] + }) + self.assertIn("grant_types_supported", metadata.claims) + self.assertEqual(metadata.claims["grant_types_supported"], ["pass_word_special_provider"]) + + def test_mandatory_fields(self): + metadata = MetadataEndpoint([], self.metadata) + self.assertIn("issuer", metadata.claims) + self.assertEqual(metadata.claims["issuer"], 'https://foo.bar') + + def test_server_metadata(self): + endpoint = Server(None) + metadata = MetadataEndpoint([endpoint], { + "issuer": 'https://foo.bar', + "authorization_endpoint": "https://foo.bar/authorize", + "introspection_endpoint": "https://foo.bar/introspect", + "revocation_endpoint": "https://foo.bar/revoke", + "token_endpoint": "https://foo.bar/token", + "jwks_uri": "https://foo.bar/certs", + "scopes_supported": ["email", "profile"] + }) + expected_claims = { + "issuer": "https://foo.bar", + "authorization_endpoint": "https://foo.bar/authorize", + "introspection_endpoint": "https://foo.bar/introspect", + "revocation_endpoint": "https://foo.bar/revoke", + "token_endpoint": "https://foo.bar/token", + "jwks_uri": "https://foo.bar/certs", + "scopes_supported": ["email", "profile"], + "grant_types_supported": [ + "authorization_code", + "password", + "client_credentials", + "refresh_token", + "implicit" + ], + "token_endpoint_auth_methods_supported": [ + "client_secret_post", + "client_secret_basic" + ], + "response_types_supported": [ + "code", + "token" + ], + "response_modes_supported": [ + "query", + "fragment" + ], + "code_challenge_methods_supported": [ + "plain", + "S256" + ], + "revocation_endpoint_auth_methods_supported": [ + "client_secret_post", + "client_secret_basic" + ], + "introspection_endpoint_auth_methods_supported": [ + "client_secret_post", + "client_secret_basic" + ] + } + + def sort_list(claims): + for k in claims.keys(): + claims[k] = sorted(claims[k]) + + sort_list(metadata.claims) + sort_list(expected_claims) + self.assertEqual(sorted(metadata.claims.items()), sorted(expected_claims.items())) + + def test_metadata_validate_issuer(self): + with self.assertRaises(ValueError): + endpoint = TokenEndpoint( + None, None, grant_types={"password": None}, + ) + metadata = MetadataEndpoint([endpoint], { + "issuer": 'http://foo.bar', + "token_endpoint": "https://foo.bar/token", + }) diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_resource_owner_association.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_resource_owner_association.py new file mode 100644 index 0000000000..04533888e9 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_resource_owner_association.py @@ -0,0 +1,108 @@ +"""Ensure all tokens are associated with a resource owner. +""" +import json +from unittest import mock + +from oauthlib.oauth2 import ( + BackendApplicationServer, LegacyApplicationServer, MobileApplicationServer, + RequestValidator, WebApplicationServer, +) + +from tests.unittest import TestCase + +from .test_utils import get_fragment_credentials, get_query_credentials + + +class ResourceOwnerAssociationTest(TestCase): + + auth_uri = 'http://example.com/path?client_id=abc' + token_uri = 'http://example.com/path' + + def set_client(self, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def set_user(self, client_id, code, client, request): + request.user = 'test' + return True + + def set_user_from_username(self, username, password, client, request): + request.user = 'test' + return True + + def set_user_from_credentials(self, request): + request.user = 'test' + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def inspect_client(self, request, refresh_token=False): + if not request.user: + raise ValueError() + return 'abc' + + def setUp(self): + self.validator = mock.MagicMock(spec=RequestValidator) + self.validator.get_default_redirect_uri.return_value = 'http://i.b./path' + self.validator.get_code_challenge.return_value = None + self.validator.authenticate_client.side_effect = self.set_client + self.web = WebApplicationServer(self.validator, + token_generator=self.inspect_client) + self.mobile = MobileApplicationServer(self.validator, + token_generator=self.inspect_client) + self.legacy = LegacyApplicationServer(self.validator, + token_generator=self.inspect_client) + self.backend = BackendApplicationServer(self.validator, + token_generator=self.inspect_client) + + def test_web_application(self): + # TODO: code generator + intercept test + h, _, s = self.web.create_authorization_response( + self.auth_uri + '&response_type=code', + credentials={'user': 'test'}, scopes=['random']) + self.assertEqual(s, 302) + self.assertIn('Location', h) + code = get_query_credentials(h['Location'])['code'][0] + self.assertRaises(ValueError, + self.web.create_token_response, self.token_uri, + body='grant_type=authorization_code&code=%s' % code) + + self.validator.validate_code.side_effect = self.set_user + _, body, _ = self.web.create_token_response(self.token_uri, + body='grant_type=authorization_code&code=%s' % code) + self.assertEqual(json.loads(body)['access_token'], 'abc') + + def test_mobile_application(self): + self.assertRaises(ValueError, + self.mobile.create_authorization_response, + self.auth_uri + '&response_type=token') + + h, _, s = self.mobile.create_authorization_response( + self.auth_uri + '&response_type=token', + credentials={'user': 'test'}, scopes=['random']) + self.assertEqual(s, 302) + self.assertIn('Location', h) + self.assertEqual(get_fragment_credentials(h['Location'])['access_token'][0], 'abc') + + def test_legacy_application(self): + body = 'grant_type=password&username=abc&password=secret' + self.assertRaises(ValueError, + self.legacy.create_token_response, + self.token_uri, body=body) + + self.validator.validate_user.side_effect = self.set_user_from_username + _, body, _ = self.legacy.create_token_response( + self.token_uri, body=body) + self.assertEqual(json.loads(body)['access_token'], 'abc') + + def test_backend_application(self): + body = 'grant_type=client_credentials' + self.assertRaises(ValueError, + self.backend.create_token_response, + self.token_uri, body=body) + + self.validator.authenticate_client.side_effect = self.set_user_from_credentials + _, body, _ = self.backend.create_token_response( + self.token_uri, body=body) + self.assertEqual(json.loads(body)['access_token'], 'abc') diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_revocation_endpoint.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_revocation_endpoint.py new file mode 100644 index 0000000000..338dbd91fa --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_revocation_endpoint.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- +from json import loads +from unittest.mock import MagicMock + +from oauthlib.common import urlencode +from oauthlib.oauth2 import RequestValidator, RevocationEndpoint + +from tests.unittest import TestCase + + +class RevocationEndpointTest(TestCase): + + def setUp(self): + self.validator = MagicMock(wraps=RequestValidator()) + self.validator.client_authentication_required.return_value = True + self.validator.authenticate_client.return_value = True + self.validator.revoke_token.return_value = True + self.endpoint = RevocationEndpoint(self.validator) + + self.uri = 'https://example.com/revoke_token' + self.headers = { + 'Content-Type': 'application/x-www-form-urlencoded', + } + self.resp_h = { + 'Cache-Control': 'no-store', + 'Content-Type': 'application/json', + 'Pragma': 'no-cache' + } + + def test_revoke_token(self): + for token_type in ('access_token', 'refresh_token', 'invalid'): + body = urlencode([('token', 'foo'), + ('token_type_hint', token_type)]) + h, b, s = self.endpoint.create_revocation_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, {}) + self.assertEqual(b, '') + self.assertEqual(s, 200) + + # don't specify token_type_hint + body = urlencode([('token', 'foo')]) + h, b, s = self.endpoint.create_revocation_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, {}) + self.assertEqual(b, '') + self.assertEqual(s, 200) + + def test_revoke_token_client_authentication_failed(self): + self.validator.authenticate_client.return_value = False + body = urlencode([('token', 'foo'), + ('token_type_hint', 'access_token')]) + h, b, s = self.endpoint.create_revocation_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, { + 'Content-Type': 'application/json', + 'Cache-Control': 'no-store', + 'Pragma': 'no-cache', + "WWW-Authenticate": 'Bearer error="invalid_client"' + }) + self.assertEqual(loads(b)['error'], 'invalid_client') + self.assertEqual(s, 401) + + def test_revoke_token_public_client_authentication(self): + self.validator.client_authentication_required.return_value = False + self.validator.authenticate_client_id.return_value = True + for token_type in ('access_token', 'refresh_token', 'invalid'): + body = urlencode([('token', 'foo'), + ('token_type_hint', token_type)]) + h, b, s = self.endpoint.create_revocation_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, {}) + self.assertEqual(b, '') + self.assertEqual(s, 200) + + def test_revoke_token_public_client_authentication_failed(self): + self.validator.client_authentication_required.return_value = False + self.validator.authenticate_client_id.return_value = False + body = urlencode([('token', 'foo'), + ('token_type_hint', 'access_token')]) + h, b, s = self.endpoint.create_revocation_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, { + 'Content-Type': 'application/json', + 'Cache-Control': 'no-store', + 'Pragma': 'no-cache', + "WWW-Authenticate": 'Bearer error="invalid_client"' + }) + self.assertEqual(loads(b)['error'], 'invalid_client') + self.assertEqual(s, 401) + + def test_revoke_with_callback(self): + endpoint = RevocationEndpoint(self.validator, enable_jsonp=True) + callback = 'package.hello_world' + for token_type in ('access_token', 'refresh_token', 'invalid'): + body = urlencode([('token', 'foo'), + ('token_type_hint', token_type), + ('callback', callback)]) + h, b, s = endpoint.create_revocation_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, {}) + self.assertEqual(b, callback + '();') + self.assertEqual(s, 200) + + def test_revoke_unsupported_token(self): + endpoint = RevocationEndpoint(self.validator, + supported_token_types=['access_token']) + body = urlencode([('token', 'foo'), + ('token_type_hint', 'refresh_token')]) + h, b, s = endpoint.create_revocation_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b)['error'], 'unsupported_token_type') + self.assertEqual(s, 400) + + h, b, s = endpoint.create_revocation_response(self.uri, + headers=self.headers, body='') + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b)['error'], 'invalid_request') + self.assertEqual(s, 400) + + def test_revoke_invalid_request_method(self): + endpoint = RevocationEndpoint(self.validator, + supported_token_types=['access_token']) + test_methods = ['GET', 'pUt', 'dEleTe', 'paTcH'] + test_methods = test_methods + [x.lower() for x in test_methods] + [x.upper() for x in test_methods] + for method in test_methods: + body = urlencode([('token', 'foo'), + ('token_type_hint', 'refresh_token')]) + h, b, s = endpoint.create_revocation_response(self.uri, + http_method = method, headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b)['error'], 'invalid_request') + self.assertIn('Unsupported request method', loads(b)['error_description']) + self.assertEqual(s, 400) + + def test_revoke_bad_post_request(self): + endpoint = RevocationEndpoint(self.validator, + supported_token_types=['access_token']) + for param in ['token', 'secret', 'code', 'foo']: + uri = 'http://some.endpoint?' + urlencode([(param, 'secret')]) + body = urlencode([('token', 'foo'), + ('token_type_hint', 'access_token')]) + h, b, s = endpoint.create_revocation_response(uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b)['error'], 'invalid_request') + self.assertIn('query parameters are not allowed', loads(b)['error_description']) + self.assertEqual(s, 400) diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_scope_handling.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_scope_handling.py new file mode 100644 index 0000000000..4c87d9c7c8 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_scope_handling.py @@ -0,0 +1,193 @@ +"""Ensure scope is preserved across authorization. + +Fairly trivial in all grants except the Authorization Code Grant where scope +need to be persisted temporarily in an authorization code. +""" +import json +from unittest import mock + +from oauthlib.oauth2 import ( + BackendApplicationServer, LegacyApplicationServer, MobileApplicationServer, + RequestValidator, Server, WebApplicationServer, +) + +from tests.unittest import TestCase + +from .test_utils import get_fragment_credentials, get_query_credentials + + +class TestScopeHandling(TestCase): + + DEFAULT_REDIRECT_URI = 'http://i.b./path' + + def set_scopes(self, scopes): + def set_request_scopes(client_id, code, client, request): + request.scopes = scopes + return True + return set_request_scopes + + def set_user(self, request): + request.user = 'foo' + request.client_id = 'bar' + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def set_client(self, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def setUp(self): + self.validator = mock.MagicMock(spec=RequestValidator) + self.validator.get_default_redirect_uri.return_value = TestScopeHandling.DEFAULT_REDIRECT_URI + self.validator.get_code_challenge.return_value = None + self.validator.authenticate_client.side_effect = self.set_client + self.server = Server(self.validator) + self.web = WebApplicationServer(self.validator) + self.mobile = MobileApplicationServer(self.validator) + self.legacy = LegacyApplicationServer(self.validator) + self.backend = BackendApplicationServer(self.validator) + + def test_scope_extraction(self): + scopes = ( + ('images', ['images']), + ('images+videos', ['images', 'videos']), + ('images+videos+openid', ['images', 'videos', 'openid']), + ('http%3A%2f%2fa.b%2fvideos', ['http://a.b/videos']), + ('http%3A%2f%2fa.b%2fvideos+pics', ['http://a.b/videos', 'pics']), + ('pics+http%3A%2f%2fa.b%2fvideos', ['pics', 'http://a.b/videos']), + ('http%3A%2f%2fa.b%2fvideos+https%3A%2f%2fc.d%2Fsecret', ['http://a.b/videos', 'https://c.d/secret']), + ) + + uri = 'http://example.com/path?client_id=abc&scope=%s&response_type=%s' + for scope, correct_scopes in scopes: + scopes, _ = self.web.validate_authorization_request( + uri % (scope, 'code')) + self.assertCountEqual(scopes, correct_scopes) + scopes, _ = self.mobile.validate_authorization_request( + uri % (scope, 'token')) + self.assertCountEqual(scopes, correct_scopes) + scopes, _ = self.server.validate_authorization_request( + uri % (scope, 'code')) + self.assertCountEqual(scopes, correct_scopes) + + def test_scope_preservation(self): + scope = 'pics+http%3A%2f%2fa.b%2fvideos' + decoded_scope = 'pics http://a.b/videos' + auth_uri = 'http://example.com/path?client_id=abc&response_type=' + token_uri = 'http://example.com/path' + + # authorization grant + for backend_server_type in ['web', 'server']: + h, _, s = getattr(self, backend_server_type).create_authorization_response( + auth_uri + 'code', scopes=decoded_scope.split(' ')) + self.validator.validate_code.side_effect = self.set_scopes(decoded_scope.split(' ')) + self.assertEqual(s, 302) + self.assertIn('Location', h) + code = get_query_credentials(h['Location'])['code'][0] + _, body, _ = getattr(self, backend_server_type).create_token_response(token_uri, + body='client_id=me&redirect_uri=http://back.to/me&grant_type=authorization_code&code=%s' % code) + self.assertEqual(json.loads(body)['scope'], decoded_scope) + + # implicit grant + for backend_server_type in ['mobile', 'server']: + h, _, s = getattr(self, backend_server_type).create_authorization_response( + auth_uri + 'token', scopes=decoded_scope.split(' ')) + self.assertEqual(s, 302) + self.assertIn('Location', h) + self.assertEqual(get_fragment_credentials(h['Location'])['scope'][0], decoded_scope) + + # resource owner password credentials grant + for backend_server_type in ['legacy', 'server']: + body = 'grant_type=password&username=abc&password=secret&scope=%s' + + _, body, _ = getattr(self, backend_server_type).create_token_response(token_uri, + body=body % scope) + self.assertEqual(json.loads(body)['scope'], decoded_scope) + + # client credentials grant + for backend_server_type in ['backend', 'server']: + body = 'grant_type=client_credentials&scope=%s' + self.validator.authenticate_client.side_effect = self.set_user + _, body, _ = getattr(self, backend_server_type).create_token_response(token_uri, + body=body % scope) + self.assertEqual(json.loads(body)['scope'], decoded_scope) + + def test_scope_changed(self): + scope = 'pics+http%3A%2f%2fa.b%2fvideos' + scopes = ['images', 'http://a.b/videos'] + decoded_scope = 'images http://a.b/videos' + auth_uri = 'http://example.com/path?client_id=abc&response_type=' + token_uri = 'http://example.com/path' + + # authorization grant + h, _, s = self.web.create_authorization_response( + auth_uri + 'code', scopes=scopes) + self.assertEqual(s, 302) + self.assertIn('Location', h) + code = get_query_credentials(h['Location'])['code'][0] + self.validator.validate_code.side_effect = self.set_scopes(scopes) + _, body, _ = self.web.create_token_response(token_uri, + body='grant_type=authorization_code&code=%s' % code) + self.assertEqual(json.loads(body)['scope'], decoded_scope) + + # implicit grant + self.validator.validate_scopes.side_effect = self.set_scopes(scopes) + h, _, s = self.mobile.create_authorization_response( + auth_uri + 'token', scopes=scopes) + self.assertEqual(s, 302) + self.assertIn('Location', h) + self.assertEqual(get_fragment_credentials(h['Location'])['scope'][0], decoded_scope) + + # resource owner password credentials grant + self.validator.validate_scopes.side_effect = self.set_scopes(scopes) + body = 'grant_type=password&username=abc&password=secret&scope=%s' + _, body, _ = self.legacy.create_token_response(token_uri, + body=body % scope) + self.assertEqual(json.loads(body)['scope'], decoded_scope) + + # client credentials grant + self.validator.validate_scopes.side_effect = self.set_scopes(scopes) + self.validator.authenticate_client.side_effect = self.set_user + body = 'grant_type=client_credentials&scope=%s' + _, body, _ = self.backend.create_token_response(token_uri, + body=body % scope) + + self.assertEqual(json.loads(body)['scope'], decoded_scope) + + def test_invalid_scope(self): + scope = 'pics+http%3A%2f%2fa.b%2fvideos' + auth_uri = 'http://example.com/path?client_id=abc&response_type=' + token_uri = 'http://example.com/path' + + self.validator.validate_scopes.return_value = False + + # authorization grant + h, _, s = self.web.create_authorization_response( + auth_uri + 'code', scopes=['invalid']) + self.assertEqual(s, 302) + self.assertIn('Location', h) + error = get_query_credentials(h['Location'])['error'][0] + self.assertEqual(error, 'invalid_scope') + + # implicit grant + h, _, s = self.mobile.create_authorization_response( + auth_uri + 'token', scopes=['invalid']) + self.assertEqual(s, 302) + self.assertIn('Location', h) + error = get_fragment_credentials(h['Location'])['error'][0] + self.assertEqual(error, 'invalid_scope') + + # resource owner password credentials grant + body = 'grant_type=password&username=abc&password=secret&scope=%s' + _, body, _ = self.legacy.create_token_response(token_uri, + body=body % scope) + self.assertEqual(json.loads(body)['error'], 'invalid_scope') + + # client credentials grant + self.validator.authenticate_client.side_effect = self.set_user + body = 'grant_type=client_credentials&scope=%s' + _, body, _ = self.backend.create_token_response(token_uri, + body=body % scope) + self.assertEqual(json.loads(body)['error'], 'invalid_scope') diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_utils.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_utils.py new file mode 100644 index 0000000000..5eae1956f4 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_utils.py @@ -0,0 +1,11 @@ +import urllib.parse as urlparse + + +def get_query_credentials(uri): + return urlparse.parse_qs(urlparse.urlparse(uri).query, + keep_blank_values=True) + + +def get_fragment_credentials(uri): + return urlparse.parse_qs(urlparse.urlparse(uri).fragment, + keep_blank_values=True) diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/__init__.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/__init__.py diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_authorization_code.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_authorization_code.py new file mode 100644 index 0000000000..77e1a81b46 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_authorization_code.py @@ -0,0 +1,382 @@ +# -*- coding: utf-8 -*- +import json +from unittest import mock + +from oauthlib.common import Request +from oauthlib.oauth2.rfc6749 import errors +from oauthlib.oauth2.rfc6749.grant_types import ( + AuthorizationCodeGrant, authorization_code, +) +from oauthlib.oauth2.rfc6749.tokens import BearerToken + +from tests.unittest import TestCase + + +class AuthorizationCodeGrantTest(TestCase): + + def setUp(self): + self.request = Request('http://a.b/path') + self.request.scopes = ('hello', 'world') + self.request.expires_in = 1800 + self.request.client = 'batman' + self.request.client_id = 'abcdef' + self.request.code = '1234' + self.request.response_type = 'code' + self.request.grant_type = 'authorization_code' + self.request.redirect_uri = 'https://a.b/cb' + + self.mock_validator = mock.MagicMock() + self.mock_validator.is_pkce_required.return_value = False + self.mock_validator.get_code_challenge.return_value = None + self.mock_validator.is_origin_allowed.return_value = False + self.mock_validator.authenticate_client.side_effect = self.set_client + self.auth = AuthorizationCodeGrant(request_validator=self.mock_validator) + + def set_client(self, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def setup_validators(self): + self.authval1, self.authval2 = mock.Mock(), mock.Mock() + self.authval1.return_value = {} + self.authval2.return_value = {} + self.tknval1, self.tknval2 = mock.Mock(), mock.Mock() + self.tknval1.return_value = None + self.tknval2.return_value = None + self.auth.custom_validators.pre_token.append(self.tknval1) + self.auth.custom_validators.post_token.append(self.tknval2) + self.auth.custom_validators.pre_auth.append(self.authval1) + self.auth.custom_validators.post_auth.append(self.authval2) + + def test_custom_auth_validators(self): + self.setup_validators() + + bearer = BearerToken(self.mock_validator) + self.auth.create_authorization_response(self.request, bearer) + self.assertTrue(self.authval1.called) + self.assertTrue(self.authval2.called) + self.assertFalse(self.tknval1.called) + self.assertFalse(self.tknval2.called) + + def test_custom_token_validators(self): + self.setup_validators() + + bearer = BearerToken(self.mock_validator) + self.auth.create_token_response(self.request, bearer) + self.assertTrue(self.tknval1.called) + self.assertTrue(self.tknval2.called) + self.assertFalse(self.authval1.called) + self.assertFalse(self.authval2.called) + + def test_create_authorization_grant(self): + bearer = BearerToken(self.mock_validator) + self.request.response_mode = 'query' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + grant = dict(Request(h['Location']).uri_query_params) + self.assertIn('code', grant) + self.assertTrue(self.mock_validator.validate_redirect_uri.called) + self.assertTrue(self.mock_validator.validate_response_type.called) + self.assertTrue(self.mock_validator.validate_scopes.called) + + def test_create_authorization_grant_no_scopes(self): + bearer = BearerToken(self.mock_validator) + self.request.response_mode = 'query' + self.request.scopes = [] + self.auth.create_authorization_response(self.request, bearer) + + def test_create_authorization_grant_state(self): + self.request.state = 'abc' + self.request.redirect_uri = None + self.request.response_mode = 'query' + self.mock_validator.get_default_redirect_uri.return_value = 'https://a.b/cb' + bearer = BearerToken(self.mock_validator) + h, b, s = self.auth.create_authorization_response(self.request, bearer) + grant = dict(Request(h['Location']).uri_query_params) + self.assertIn('code', grant) + self.assertIn('state', grant) + self.assertFalse(self.mock_validator.validate_redirect_uri.called) + self.assertTrue(self.mock_validator.get_default_redirect_uri.called) + self.assertTrue(self.mock_validator.validate_response_type.called) + self.assertTrue(self.mock_validator.validate_scopes.called) + + @mock.patch('oauthlib.common.generate_token') + def test_create_authorization_response(self, generate_token): + generate_token.return_value = 'abc' + bearer = BearerToken(self.mock_validator) + self.request.response_mode = 'query' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], 'https://a.b/cb?code=abc') + self.request.response_mode = 'fragment' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], 'https://a.b/cb#code=abc') + + def test_create_token_response(self): + bearer = BearerToken(self.mock_validator) + + h, token, s = self.auth.create_token_response(self.request, bearer) + token = json.loads(token) + self.assertEqual(self.mock_validator.save_token.call_count, 1) + self.assertIn('access_token', token) + self.assertIn('refresh_token', token) + self.assertIn('expires_in', token) + self.assertIn('scope', token) + self.assertTrue(self.mock_validator.client_authentication_required.called) + self.assertTrue(self.mock_validator.authenticate_client.called) + self.assertTrue(self.mock_validator.validate_code.called) + self.assertTrue(self.mock_validator.confirm_redirect_uri.called) + self.assertTrue(self.mock_validator.validate_grant_type.called) + self.assertTrue(self.mock_validator.invalidate_authorization_code.called) + + def test_create_token_response_without_refresh_token(self): + self.auth.refresh_token = False # Not to issue refresh token. + + bearer = BearerToken(self.mock_validator) + h, token, s = self.auth.create_token_response(self.request, bearer) + token = json.loads(token) + self.assertEqual(self.mock_validator.save_token.call_count, 1) + self.assertIn('access_token', token) + self.assertNotIn('refresh_token', token) + self.assertIn('expires_in', token) + self.assertIn('scope', token) + self.assertTrue(self.mock_validator.client_authentication_required.called) + self.assertTrue(self.mock_validator.authenticate_client.called) + self.assertTrue(self.mock_validator.validate_code.called) + self.assertTrue(self.mock_validator.confirm_redirect_uri.called) + self.assertTrue(self.mock_validator.validate_grant_type.called) + self.assertTrue(self.mock_validator.invalidate_authorization_code.called) + + def test_invalid_request(self): + del self.request.code + self.assertRaises(errors.InvalidRequestError, self.auth.validate_token_request, + self.request) + + def test_invalid_request_duplicates(self): + request = mock.MagicMock(wraps=self.request) + request.grant_type = 'authorization_code' + request.duplicate_params = ['client_id'] + self.assertRaises(errors.InvalidRequestError, self.auth.validate_token_request, + request) + + def test_authentication_required(self): + """ + ensure client_authentication_required() is properly called + """ + self.auth.validate_token_request(self.request) + self.mock_validator.client_authentication_required.assert_called_once_with(self.request) + + def test_authenticate_client(self): + self.mock_validator.authenticate_client.side_effect = None + self.mock_validator.authenticate_client.return_value = False + self.assertRaises(errors.InvalidClientError, self.auth.validate_token_request, + self.request) + + def test_client_id_missing(self): + self.mock_validator.authenticate_client.side_effect = None + request = mock.MagicMock(wraps=self.request) + request.grant_type = 'authorization_code' + del request.client.client_id + self.assertRaises(NotImplementedError, self.auth.validate_token_request, + request) + + def test_invalid_grant(self): + self.request.client = 'batman' + self.mock_validator.authenticate_client = self.set_client + self.mock_validator.validate_code.return_value = False + self.assertRaises(errors.InvalidGrantError, + self.auth.validate_token_request, self.request) + + def test_invalid_grant_type(self): + self.request.grant_type = 'foo' + self.assertRaises(errors.UnsupportedGrantTypeError, + self.auth.validate_token_request, self.request) + + def test_authenticate_client_id(self): + self.mock_validator.client_authentication_required.return_value = False + self.mock_validator.authenticate_client_id.return_value = False + self.request.state = 'abc' + self.assertRaises(errors.InvalidClientError, + self.auth.validate_token_request, self.request) + + def test_invalid_redirect_uri(self): + self.mock_validator.confirm_redirect_uri.return_value = False + self.assertRaises(errors.MismatchingRedirectURIError, + self.auth.validate_token_request, self.request) + + # PKCE validate_authorization_request + def test_pkce_challenge_missing(self): + self.mock_validator.is_pkce_required.return_value = True + self.assertRaises(errors.MissingCodeChallengeError, + self.auth.validate_authorization_request, self.request) + + def test_pkce_default_method(self): + for required in [True, False]: + self.mock_validator.is_pkce_required.return_value = required + self.request.code_challenge = "present" + _, ri = self.auth.validate_authorization_request(self.request) + self.assertIn("code_challenge", ri) + self.assertIn("code_challenge_method", ri) + self.assertEqual(ri["code_challenge"], "present") + self.assertEqual(ri["code_challenge_method"], "plain") + + def test_pkce_wrong_method(self): + for required in [True, False]: + self.mock_validator.is_pkce_required.return_value = required + self.request.code_challenge = "present" + self.request.code_challenge_method = "foobar" + self.assertRaises(errors.UnsupportedCodeChallengeMethodError, + self.auth.validate_authorization_request, self.request) + + # PKCE validate_token_request + def test_pkce_verifier_missing(self): + self.mock_validator.is_pkce_required.return_value = True + self.assertRaises(errors.MissingCodeVerifierError, + self.auth.validate_token_request, self.request) + + # PKCE validate_token_request + def test_pkce_required_verifier_missing_challenge_missing(self): + self.mock_validator.is_pkce_required.return_value = True + self.request.code_verifier = None + self.mock_validator.get_code_challenge.return_value = None + self.assertRaises(errors.MissingCodeVerifierError, + self.auth.validate_token_request, self.request) + + def test_pkce_required_verifier_missing_challenge_valid(self): + self.mock_validator.is_pkce_required.return_value = True + self.request.code_verifier = None + self.mock_validator.get_code_challenge.return_value = "foo" + self.assertRaises(errors.MissingCodeVerifierError, + self.auth.validate_token_request, self.request) + + def test_pkce_required_verifier_valid_challenge_missing(self): + self.mock_validator.is_pkce_required.return_value = True + self.request.code_verifier = "foobar" + self.mock_validator.get_code_challenge.return_value = None + self.assertRaises(errors.InvalidGrantError, + self.auth.validate_token_request, self.request) + + def test_pkce_required_verifier_valid_challenge_valid_method_valid(self): + self.mock_validator.is_pkce_required.return_value = True + self.request.code_verifier = "foobar" + self.mock_validator.get_code_challenge.return_value = "foobar" + self.mock_validator.get_code_challenge_method.return_value = "plain" + self.auth.validate_token_request(self.request) + + def test_pkce_required_verifier_invalid_challenge_valid_method_valid(self): + self.mock_validator.is_pkce_required.return_value = True + self.request.code_verifier = "foobar" + self.mock_validator.get_code_challenge.return_value = "raboof" + self.mock_validator.get_code_challenge_method.return_value = "plain" + self.assertRaises(errors.InvalidGrantError, + self.auth.validate_token_request, self.request) + + def test_pkce_required_verifier_valid_challenge_valid_method_wrong(self): + self.mock_validator.is_pkce_required.return_value = True + self.request.code_verifier = "present" + self.mock_validator.get_code_challenge.return_value = "foobar" + self.mock_validator.get_code_challenge_method.return_value = "cryptic_method" + self.assertRaises(errors.ServerError, + self.auth.validate_token_request, self.request) + + def test_pkce_verifier_valid_challenge_valid_method_missing(self): + self.mock_validator.is_pkce_required.return_value = True + self.request.code_verifier = "present" + self.mock_validator.get_code_challenge.return_value = "foobar" + self.mock_validator.get_code_challenge_method.return_value = None + self.assertRaises(errors.InvalidGrantError, + self.auth.validate_token_request, self.request) + + def test_pkce_optional_verifier_valid_challenge_missing(self): + self.mock_validator.is_pkce_required.return_value = False + self.request.code_verifier = "present" + self.mock_validator.get_code_challenge.return_value = None + self.auth.validate_token_request(self.request) + + def test_pkce_optional_verifier_missing_challenge_valid(self): + self.mock_validator.is_pkce_required.return_value = False + self.request.code_verifier = None + self.mock_validator.get_code_challenge.return_value = "foobar" + self.assertRaises(errors.MissingCodeVerifierError, + self.auth.validate_token_request, self.request) + + # PKCE functions + def test_wrong_code_challenge_method_plain(self): + self.assertFalse(authorization_code.code_challenge_method_plain("foo", "bar")) + + def test_correct_code_challenge_method_plain(self): + self.assertTrue(authorization_code.code_challenge_method_plain("foo", "foo")) + + def test_wrong_code_challenge_method_s256(self): + self.assertFalse(authorization_code.code_challenge_method_s256("foo", "bar")) + + def test_correct_code_challenge_method_s256(self): + # "abcd" as verifier gives a '+' to base64 + self.assertTrue( + authorization_code.code_challenge_method_s256("abcd", + "iNQmb9TmM40TuEX88olXnSCciXgjuSF9o-Fhk28DFYk") + ) + # "/" as verifier gives a '/' and '+' to base64 + self.assertTrue( + authorization_code.code_challenge_method_s256("/", + "il7asoJjJEMhngUeSt4tHVu8Zxx4EFG_FDeJfL3-oPE") + ) + # Example from PKCE RFCE + self.assertTrue( + authorization_code.code_challenge_method_s256("dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk", + "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM") + ) + + def test_code_modifier_called(self): + bearer = BearerToken(self.mock_validator) + code_modifier = mock.MagicMock(wraps=lambda grant, *a: grant) + self.auth.register_code_modifier(code_modifier) + self.auth.create_authorization_response(self.request, bearer) + code_modifier.assert_called_once() + + def test_hybrid_token_save(self): + bearer = BearerToken(self.mock_validator) + self.auth.register_code_modifier( + lambda grant, *a: dict(list(grant.items()) + [('access_token', 1)]) + ) + self.auth.create_authorization_response(self.request, bearer) + self.mock_validator.save_token.assert_called_once() + + # CORS + + def test_create_cors_headers(self): + bearer = BearerToken(self.mock_validator) + self.request.headers['origin'] = 'https://foo.bar' + self.mock_validator.is_origin_allowed.return_value = True + + headers = self.auth.create_token_response(self.request, bearer)[0] + self.assertEqual( + headers['Access-Control-Allow-Origin'], 'https://foo.bar' + ) + self.mock_validator.is_origin_allowed.assert_called_once_with( + 'abcdef', 'https://foo.bar', self.request + ) + + def test_create_cors_headers_no_origin(self): + bearer = BearerToken(self.mock_validator) + headers = self.auth.create_token_response(self.request, bearer)[0] + self.assertNotIn('Access-Control-Allow-Origin', headers) + self.mock_validator.is_origin_allowed.assert_not_called() + + def test_create_cors_headers_insecure_origin(self): + bearer = BearerToken(self.mock_validator) + self.request.headers['origin'] = 'http://foo.bar' + + headers = self.auth.create_token_response(self.request, bearer)[0] + self.assertNotIn('Access-Control-Allow-Origin', headers) + self.mock_validator.is_origin_allowed.assert_not_called() + + def test_create_cors_headers_invalid_origin(self): + bearer = BearerToken(self.mock_validator) + self.request.headers['origin'] = 'https://foo.bar' + self.mock_validator.is_origin_allowed.return_value = False + + headers = self.auth.create_token_response(self.request, bearer)[0] + self.assertNotIn('Access-Control-Allow-Origin', headers) + self.mock_validator.is_origin_allowed.assert_called_once_with( + 'abcdef', 'https://foo.bar', self.request + ) diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_client_credentials.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_client_credentials.py new file mode 100644 index 0000000000..e9559c7931 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_client_credentials.py @@ -0,0 +1,76 @@ +# -*- coding: utf-8 -*- +import json +from unittest import mock + +from oauthlib.common import Request +from oauthlib.oauth2.rfc6749.grant_types import ClientCredentialsGrant +from oauthlib.oauth2.rfc6749.tokens import BearerToken + +from tests.unittest import TestCase + + +class ClientCredentialsGrantTest(TestCase): + + def setUp(self): + mock_client = mock.MagicMock() + mock_client.user.return_value = 'mocked user' + self.request = Request('http://a.b/path') + self.request.grant_type = 'client_credentials' + self.request.client = mock_client + self.request.scopes = ('mocked', 'scopes') + self.mock_validator = mock.MagicMock() + self.auth = ClientCredentialsGrant( + request_validator=self.mock_validator) + + def test_custom_auth_validators_unsupported(self): + authval1, authval2 = mock.Mock(), mock.Mock() + expected = ('ClientCredentialsGrant does not support authorization ' + 'validators. Use token validators instead.') + with self.assertRaises(ValueError) as caught: + ClientCredentialsGrant(self.mock_validator, pre_auth=[authval1]) + self.assertEqual(caught.exception.args[0], expected) + with self.assertRaises(ValueError) as caught: + ClientCredentialsGrant(self.mock_validator, post_auth=[authval2]) + self.assertEqual(caught.exception.args[0], expected) + with self.assertRaises(AttributeError): + self.auth.custom_validators.pre_auth.append(authval1) + with self.assertRaises(AttributeError): + self.auth.custom_validators.pre_auth.append(authval2) + + def test_custom_token_validators(self): + tknval1, tknval2 = mock.Mock(), mock.Mock() + self.auth.custom_validators.pre_token.append(tknval1) + self.auth.custom_validators.post_token.append(tknval2) + + bearer = BearerToken(self.mock_validator) + self.auth.create_token_response(self.request, bearer) + self.assertTrue(tknval1.called) + self.assertTrue(tknval2.called) + + def test_create_token_response(self): + bearer = BearerToken(self.mock_validator) + headers, body, status_code = self.auth.create_token_response( + self.request, bearer) + token = json.loads(body) + self.assertEqual(self.mock_validator.save_token.call_count, 1) + self.assertIn('access_token', token) + self.assertIn('token_type', token) + self.assertIn('expires_in', token) + self.assertIn('Content-Type', headers) + self.assertEqual(headers['Content-Type'], 'application/json') + + def test_error_response(self): + bearer = BearerToken(self.mock_validator) + self.mock_validator.authenticate_client.return_value = False + headers, body, status_code = self.auth.create_token_response( + self.request, bearer) + self.assertEqual(self.mock_validator.save_token.call_count, 0) + error_msg = json.loads(body) + self.assertIn('error', error_msg) + self.assertEqual(error_msg['error'], 'invalid_client') + self.assertIn('Content-Type', headers) + self.assertEqual(headers['Content-Type'], 'application/json') + + def test_validate_token_response(self): + # wrong grant type, scope + pass diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_implicit.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_implicit.py new file mode 100644 index 0000000000..1fb71a1dc9 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_implicit.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +from unittest import mock + +from oauthlib.common import Request +from oauthlib.oauth2.rfc6749.grant_types import ImplicitGrant +from oauthlib.oauth2.rfc6749.tokens import BearerToken + +from tests.unittest import TestCase + + +class ImplicitGrantTest(TestCase): + + def setUp(self): + mock_client = mock.MagicMock() + mock_client.user.return_value = 'mocked user' + self.request = Request('http://a.b/path') + self.request.scopes = ('hello', 'world') + self.request.client = mock_client + self.request.client_id = 'abcdef' + self.request.response_type = 'token' + self.request.state = 'xyz' + self.request.redirect_uri = 'https://b.c/p' + + self.mock_validator = mock.MagicMock() + self.auth = ImplicitGrant(request_validator=self.mock_validator) + + @mock.patch('oauthlib.common.generate_token') + def test_create_token_response(self, generate_token): + generate_token.return_value = '1234' + bearer = BearerToken(self.mock_validator, expires_in=1800) + h, b, s = self.auth.create_token_response(self.request, bearer) + correct_uri = 'https://b.c/p#access_token=1234&token_type=Bearer&expires_in=1800&state=xyz&scope=hello+world' + self.assertEqual(s, 302) + self.assertURLEqual(h['Location'], correct_uri, parse_fragment=True) + self.assertEqual(self.mock_validator.save_token.call_count, 1) + + correct_uri = 'https://b.c/p?access_token=1234&token_type=Bearer&expires_in=1800&state=xyz&scope=hello+world' + self.request.response_mode = 'query' + h, b, s = self.auth.create_token_response(self.request, bearer) + self.assertURLEqual(h['Location'], correct_uri) + + def test_custom_validators(self): + self.authval1, self.authval2 = mock.Mock(), mock.Mock() + self.tknval1, self.tknval2 = mock.Mock(), mock.Mock() + for val in (self.authval1, self.authval2): + val.return_value = {} + for val in (self.tknval1, self.tknval2): + val.return_value = None + self.auth.custom_validators.pre_token.append(self.tknval1) + self.auth.custom_validators.post_token.append(self.tknval2) + self.auth.custom_validators.pre_auth.append(self.authval1) + self.auth.custom_validators.post_auth.append(self.authval2) + + bearer = BearerToken(self.mock_validator) + self.auth.create_token_response(self.request, bearer) + self.assertTrue(self.tknval1.called) + self.assertTrue(self.tknval2.called) + self.assertTrue(self.authval1.called) + self.assertTrue(self.authval2.called) + + def test_error_response(self): + pass diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_refresh_token.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_refresh_token.py new file mode 100644 index 0000000000..581f2a4d6a --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_refresh_token.py @@ -0,0 +1,211 @@ +# -*- coding: utf-8 -*- +import json +from unittest import mock + +from oauthlib.common import Request +from oauthlib.oauth2.rfc6749 import errors +from oauthlib.oauth2.rfc6749.grant_types import RefreshTokenGrant +from oauthlib.oauth2.rfc6749.tokens import BearerToken + +from tests.unittest import TestCase + + +class RefreshTokenGrantTest(TestCase): + + def setUp(self): + mock_client = mock.MagicMock() + mock_client.user.return_value = 'mocked user' + self.request = Request('http://a.b/path') + self.request.grant_type = 'refresh_token' + self.request.refresh_token = 'lsdkfhj230' + self.request.client_id = 'abcdef' + self.request.client = mock_client + self.request.scope = 'foo' + self.mock_validator = mock.MagicMock() + self.auth = RefreshTokenGrant( + request_validator=self.mock_validator) + + def test_create_token_response(self): + self.mock_validator.get_original_scopes.return_value = ['foo', 'bar'] + bearer = BearerToken(self.mock_validator) + headers, body, status_code = self.auth.create_token_response( + self.request, bearer) + token = json.loads(body) + self.assertEqual(self.mock_validator.save_token.call_count, 1) + self.assertIn('access_token', token) + self.assertIn('token_type', token) + self.assertIn('expires_in', token) + self.assertEqual(token['scope'], 'foo') + + def test_custom_auth_validators_unsupported(self): + authval1, authval2 = mock.Mock(), mock.Mock() + expected = ('RefreshTokenGrant does not support authorization ' + 'validators. Use token validators instead.') + with self.assertRaises(ValueError) as caught: + RefreshTokenGrant(self.mock_validator, pre_auth=[authval1]) + self.assertEqual(caught.exception.args[0], expected) + with self.assertRaises(ValueError) as caught: + RefreshTokenGrant(self.mock_validator, post_auth=[authval2]) + self.assertEqual(caught.exception.args[0], expected) + with self.assertRaises(AttributeError): + self.auth.custom_validators.pre_auth.append(authval1) + with self.assertRaises(AttributeError): + self.auth.custom_validators.pre_auth.append(authval2) + + def test_custom_token_validators(self): + tknval1, tknval2 = mock.Mock(), mock.Mock() + self.auth.custom_validators.pre_token.append(tknval1) + self.auth.custom_validators.post_token.append(tknval2) + + bearer = BearerToken(self.mock_validator) + self.auth.create_token_response(self.request, bearer) + self.assertTrue(tknval1.called) + self.assertTrue(tknval2.called) + + def test_create_token_inherit_scope(self): + self.request.scope = None + self.mock_validator.get_original_scopes.return_value = ['foo', 'bar'] + bearer = BearerToken(self.mock_validator) + headers, body, status_code = self.auth.create_token_response( + self.request, bearer) + token = json.loads(body) + self.assertEqual(self.mock_validator.save_token.call_count, 1) + self.assertIn('access_token', token) + self.assertIn('token_type', token) + self.assertIn('expires_in', token) + self.assertEqual(token['scope'], 'foo bar') + + def test_create_token_within_original_scope(self): + self.mock_validator.get_original_scopes.return_value = ['baz'] + self.mock_validator.is_within_original_scope.return_value = True + bearer = BearerToken(self.mock_validator) + headers, body, status_code = self.auth.create_token_response( + self.request, bearer) + token = json.loads(body) + self.assertEqual(self.mock_validator.save_token.call_count, 1) + self.assertIn('access_token', token) + self.assertIn('token_type', token) + self.assertIn('expires_in', token) + self.assertEqual(token['scope'], 'foo') + + def test_invalid_scope(self): + self.mock_validator.get_original_scopes.return_value = ['baz'] + self.mock_validator.is_within_original_scope.return_value = False + bearer = BearerToken(self.mock_validator) + headers, body, status_code = self.auth.create_token_response( + self.request, bearer) + token = json.loads(body) + self.assertEqual(self.mock_validator.save_token.call_count, 0) + self.assertEqual(token['error'], 'invalid_scope') + self.assertEqual(status_code, 400) + + def test_invalid_token(self): + self.mock_validator.validate_refresh_token.return_value = False + bearer = BearerToken(self.mock_validator) + headers, body, status_code = self.auth.create_token_response( + self.request, bearer) + token = json.loads(body) + self.assertEqual(self.mock_validator.save_token.call_count, 0) + self.assertEqual(token['error'], 'invalid_grant') + self.assertEqual(status_code, 400) + + def test_invalid_client(self): + self.mock_validator.authenticate_client.return_value = False + bearer = BearerToken(self.mock_validator) + headers, body, status_code = self.auth.create_token_response( + self.request, bearer) + token = json.loads(body) + self.assertEqual(self.mock_validator.save_token.call_count, 0) + self.assertEqual(token['error'], 'invalid_client') + self.assertEqual(status_code, 401) + + def test_authentication_required(self): + """ + ensure client_authentication_required() is properly called + """ + self.mock_validator.authenticate_client.return_value = False + self.mock_validator.authenticate_client_id.return_value = False + self.request.code = 'waffles' + self.assertRaises(errors.InvalidClientError, self.auth.validate_token_request, + self.request) + self.mock_validator.client_authentication_required.assert_called_once_with(self.request) + + def test_invalid_grant_type(self): + self.request.grant_type = 'wrong_type' + self.assertRaises(errors.UnsupportedGrantTypeError, + self.auth.validate_token_request, self.request) + + def test_authenticate_client_id(self): + self.mock_validator.client_authentication_required.return_value = False + self.request.refresh_token = mock.MagicMock() + self.mock_validator.authenticate_client_id.return_value = False + self.assertRaises(errors.InvalidClientError, + self.auth.validate_token_request, self.request) + + def test_invalid_refresh_token(self): + # invalid refresh token + self.mock_validator.authenticate_client_id.return_value = True + self.mock_validator.validate_refresh_token.return_value = False + self.assertRaises(errors.InvalidGrantError, + self.auth.validate_token_request, self.request) + # no token provided + del self.request.refresh_token + self.assertRaises(errors.InvalidRequestError, + self.auth.validate_token_request, self.request) + + def test_invalid_scope_original_scopes_empty(self): + self.mock_validator.validate_refresh_token.return_value = True + self.mock_validator.is_within_original_scope.return_value = False + self.assertRaises(errors.InvalidScopeError, + self.auth.validate_token_request, self.request) + + def test_valid_token_request(self): + self.request.scope = 'foo bar' + self.mock_validator.get_original_scopes = mock.Mock() + self.mock_validator.get_original_scopes.return_value = 'foo bar baz' + self.auth.validate_token_request(self.request) + self.assertEqual(self.request.scopes, self.request.scope.split()) + # all ok but without request.scope + del self.request.scope + self.auth.validate_token_request(self.request) + self.assertEqual(self.request.scopes, 'foo bar baz'.split()) + + # CORS + + def test_create_cors_headers(self): + bearer = BearerToken(self.mock_validator) + self.request.headers['origin'] = 'https://foo.bar' + self.mock_validator.is_origin_allowed.return_value = True + + headers = self.auth.create_token_response(self.request, bearer)[0] + self.assertEqual( + headers['Access-Control-Allow-Origin'], 'https://foo.bar' + ) + self.mock_validator.is_origin_allowed.assert_called_once_with( + 'abcdef', 'https://foo.bar', self.request + ) + + def test_create_cors_headers_no_origin(self): + bearer = BearerToken(self.mock_validator) + headers = self.auth.create_token_response(self.request, bearer)[0] + self.assertNotIn('Access-Control-Allow-Origin', headers) + self.mock_validator.is_origin_allowed.assert_not_called() + + def test_create_cors_headers_insecure_origin(self): + bearer = BearerToken(self.mock_validator) + self.request.headers['origin'] = 'http://foo.bar' + + headers = self.auth.create_token_response(self.request, bearer)[0] + self.assertNotIn('Access-Control-Allow-Origin', headers) + self.mock_validator.is_origin_allowed.assert_not_called() + + def test_create_cors_headers_invalid_origin(self): + bearer = BearerToken(self.mock_validator) + self.request.headers['origin'] = 'https://foo.bar' + self.mock_validator.is_origin_allowed.return_value = False + + headers = self.auth.create_token_response(self.request, bearer)[0] + self.assertNotIn('Access-Control-Allow-Origin', headers) + self.mock_validator.is_origin_allowed.assert_called_once_with( + 'abcdef', 'https://foo.bar', self.request + ) diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_resource_owner_password.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_resource_owner_password.py new file mode 100644 index 0000000000..294e27be35 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_resource_owner_password.py @@ -0,0 +1,156 @@ +# -*- coding: utf-8 -*- +import json +from unittest import mock + +from oauthlib.common import Request +from oauthlib.oauth2.rfc6749 import errors +from oauthlib.oauth2.rfc6749.grant_types import ( + ResourceOwnerPasswordCredentialsGrant, +) +from oauthlib.oauth2.rfc6749.tokens import BearerToken + +from tests.unittest import TestCase + + +class ResourceOwnerPasswordCredentialsGrantTest(TestCase): + + def setUp(self): + mock_client = mock.MagicMock() + mock_client.user.return_value = 'mocked user' + self.request = Request('http://a.b/path') + self.request.grant_type = 'password' + self.request.username = 'john' + self.request.password = 'doe' + self.request.client = mock_client + self.request.scopes = ('mocked', 'scopes') + self.mock_validator = mock.MagicMock() + self.auth = ResourceOwnerPasswordCredentialsGrant( + request_validator=self.mock_validator) + + def set_client(self, request, *args, **kwargs): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + def test_create_token_response(self): + bearer = BearerToken(self.mock_validator) + headers, body, status_code = self.auth.create_token_response( + self.request, bearer) + token = json.loads(body) + self.assertEqual(self.mock_validator.save_token.call_count, 1) + self.assertIn('access_token', token) + self.assertIn('token_type', token) + self.assertIn('expires_in', token) + self.assertIn('refresh_token', token) + # ensure client_authentication_required() is properly called + self.mock_validator.client_authentication_required.assert_called_once_with(self.request) + # fail client authentication + self.mock_validator.reset_mock() + self.mock_validator.validate_user.return_value = True + self.mock_validator.authenticate_client.return_value = False + status_code = self.auth.create_token_response(self.request, bearer)[2] + self.assertEqual(status_code, 401) + self.assertEqual(self.mock_validator.save_token.call_count, 0) + + # mock client_authentication_required() returning False then fail + self.mock_validator.reset_mock() + self.mock_validator.client_authentication_required.return_value = False + self.mock_validator.authenticate_client_id.return_value = False + status_code = self.auth.create_token_response(self.request, bearer)[2] + self.assertEqual(status_code, 401) + self.assertEqual(self.mock_validator.save_token.call_count, 0) + + def test_create_token_response_without_refresh_token(self): + # self.auth.refresh_token = False so we don't generate a refresh token + self.auth = ResourceOwnerPasswordCredentialsGrant( + request_validator=self.mock_validator, refresh_token=False) + bearer = BearerToken(self.mock_validator) + headers, body, status_code = self.auth.create_token_response( + self.request, bearer) + token = json.loads(body) + self.assertEqual(self.mock_validator.save_token.call_count, 1) + self.assertIn('access_token', token) + self.assertIn('token_type', token) + self.assertIn('expires_in', token) + # ensure no refresh token is generated + self.assertNotIn('refresh_token', token) + # ensure client_authentication_required() is properly called + self.mock_validator.client_authentication_required.assert_called_once_with(self.request) + # fail client authentication + self.mock_validator.reset_mock() + self.mock_validator.validate_user.return_value = True + self.mock_validator.authenticate_client.return_value = False + status_code = self.auth.create_token_response(self.request, bearer)[2] + self.assertEqual(status_code, 401) + self.assertEqual(self.mock_validator.save_token.call_count, 0) + # mock client_authentication_required() returning False then fail + self.mock_validator.reset_mock() + self.mock_validator.client_authentication_required.return_value = False + self.mock_validator.authenticate_client_id.return_value = False + status_code = self.auth.create_token_response(self.request, bearer)[2] + self.assertEqual(status_code, 401) + self.assertEqual(self.mock_validator.save_token.call_count, 0) + + def test_custom_auth_validators_unsupported(self): + authval1, authval2 = mock.Mock(), mock.Mock() + expected = ('ResourceOwnerPasswordCredentialsGrant does not ' + 'support authorization validators. Use token ' + 'validators instead.') + with self.assertRaises(ValueError) as caught: + ResourceOwnerPasswordCredentialsGrant(self.mock_validator, + pre_auth=[authval1]) + self.assertEqual(caught.exception.args[0], expected) + with self.assertRaises(ValueError) as caught: + ResourceOwnerPasswordCredentialsGrant(self.mock_validator, + post_auth=[authval2]) + self.assertEqual(caught.exception.args[0], expected) + with self.assertRaises(AttributeError): + self.auth.custom_validators.pre_auth.append(authval1) + with self.assertRaises(AttributeError): + self.auth.custom_validators.pre_auth.append(authval2) + + def test_custom_token_validators(self): + tknval1, tknval2 = mock.Mock(), mock.Mock() + self.auth.custom_validators.pre_token.append(tknval1) + self.auth.custom_validators.post_token.append(tknval2) + + bearer = BearerToken(self.mock_validator) + self.auth.create_token_response(self.request, bearer) + self.assertTrue(tknval1.called) + self.assertTrue(tknval2.called) + + def test_error_response(self): + pass + + def test_scopes(self): + pass + + def test_invalid_request_missing_params(self): + del self.request.grant_type + self.assertRaises(errors.InvalidRequestError, self.auth.validate_token_request, + self.request) + + def test_invalid_request_duplicates(self): + request = mock.MagicMock(wraps=self.request) + request.duplicate_params = ['scope'] + self.assertRaises(errors.InvalidRequestError, self.auth.validate_token_request, + request) + + def test_invalid_grant_type(self): + self.request.grant_type = 'foo' + self.assertRaises(errors.UnsupportedGrantTypeError, + self.auth.validate_token_request, self.request) + + def test_invalid_user(self): + self.mock_validator.validate_user.return_value = False + self.assertRaises(errors.InvalidGrantError, self.auth.validate_token_request, + self.request) + + def test_client_id_missing(self): + del self.request.client.client_id + self.assertRaises(NotImplementedError, self.auth.validate_token_request, + self.request) + + def test_valid_token_request(self): + self.mock_validator.validate_grant_type.return_value = True + self.auth.validate_token_request(self.request) diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/test_parameters.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_parameters.py new file mode 100644 index 0000000000..cd8c9e952b --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_parameters.py @@ -0,0 +1,304 @@ +from unittest.mock import patch + +from oauthlib import signals +from oauthlib.oauth2.rfc6749.errors import * +from oauthlib.oauth2.rfc6749.parameters import * + +from tests.unittest import TestCase + + +@patch('time.time', new=lambda: 1000) +class ParameterTests(TestCase): + + state = 'xyz' + auth_base = { + 'uri': 'https://server.example.com/authorize', + 'client_id': 's6BhdRkqt3', + 'redirect_uri': 'https://client.example.com/cb', + 'state': state, + 'scope': 'photos' + } + list_scope = ['list', 'of', 'scopes'] + + auth_grant = {'response_type': 'code'} + auth_grant_pkce = {'response_type': 'code', 'code_challenge': "code_challenge", + 'code_challenge_method': 'code_challenge_method'} + auth_grant_list_scope = {} + auth_implicit = {'response_type': 'token', 'extra': 'extra'} + auth_implicit_list_scope = {} + + def setUp(self): + self.auth_grant.update(self.auth_base) + self.auth_grant_pkce.update(self.auth_base) + self.auth_implicit.update(self.auth_base) + self.auth_grant_list_scope.update(self.auth_grant) + self.auth_grant_list_scope['scope'] = self.list_scope + self.auth_implicit_list_scope.update(self.auth_implicit) + self.auth_implicit_list_scope['scope'] = self.list_scope + + auth_base_uri = ('https://server.example.com/authorize?response_type={0}' + '&client_id=s6BhdRkqt3&redirect_uri=https%3A%2F%2F' + 'client.example.com%2Fcb&scope={1}&state={2}{3}') + + auth_base_uri_pkce = ('https://server.example.com/authorize?response_type={0}' + '&client_id=s6BhdRkqt3&redirect_uri=https%3A%2F%2F' + 'client.example.com%2Fcb&scope={1}&state={2}{3}&code_challenge={4}' + '&code_challenge_method={5}') + + auth_grant_uri = auth_base_uri.format('code', 'photos', state, '') + auth_grant_uri_pkce = auth_base_uri_pkce.format('code', 'photos', state, '', 'code_challenge', + 'code_challenge_method') + auth_grant_uri_list_scope = auth_base_uri.format('code', 'list+of+scopes', state, '') + auth_implicit_uri = auth_base_uri.format('token', 'photos', state, '&extra=extra') + auth_implicit_uri_list_scope = auth_base_uri.format('token', 'list+of+scopes', state, '&extra=extra') + + grant_body = { + 'grant_type': 'authorization_code', + 'code': 'SplxlOBeZQQYbYS6WxSbIA', + 'redirect_uri': 'https://client.example.com/cb' + } + grant_body_pkce = { + 'grant_type': 'authorization_code', + 'code': 'SplxlOBeZQQYbYS6WxSbIA', + 'redirect_uri': 'https://client.example.com/cb', + 'code_verifier': 'code_verifier' + } + grant_body_scope = {'scope': 'photos'} + grant_body_list_scope = {'scope': list_scope} + auth_grant_body = ('grant_type=authorization_code&' + 'code=SplxlOBeZQQYbYS6WxSbIA&' + 'redirect_uri=https%3A%2F%2Fclient.example.com%2Fcb') + auth_grant_body_pkce = ('grant_type=authorization_code&' + 'code=SplxlOBeZQQYbYS6WxSbIA&' + 'redirect_uri=https%3A%2F%2Fclient.example.com%2Fcb' + '&code_verifier=code_verifier') + auth_grant_body_scope = auth_grant_body + '&scope=photos' + auth_grant_body_list_scope = auth_grant_body + '&scope=list+of+scopes' + + pwd_body = { + 'grant_type': 'password', + 'username': 'johndoe', + 'password': 'A3ddj3w' + } + password_body = 'grant_type=password&username=johndoe&password=A3ddj3w' + + cred_grant = {'grant_type': 'client_credentials'} + cred_body = 'grant_type=client_credentials' + + grant_response = 'https://client.example.com/cb?code=SplxlOBeZQQYbYS6WxSbIA&state=xyz' + grant_dict = {'code': 'SplxlOBeZQQYbYS6WxSbIA', 'state': state} + + error_nocode = 'https://client.example.com/cb?state=xyz' + error_nostate = 'https://client.example.com/cb?code=SplxlOBeZQQYbYS6WxSbIA' + error_wrongstate = 'https://client.example.com/cb?code=SplxlOBeZQQYbYS6WxSbIA&state=abc' + error_denied = 'https://client.example.com/cb?error=access_denied&state=xyz' + error_invalid = 'https://client.example.com/cb?error=invalid_request&state=xyz' + + implicit_base = 'https://example.com/cb#access_token=2YotnFZFEjr1zCsicMWpAA&scope=abc&' + implicit_response = implicit_base + 'state={}&token_type=example&expires_in=3600'.format(state) + implicit_notype = implicit_base + 'state={}&expires_in=3600'.format(state) + implicit_wrongstate = implicit_base + 'state={}&token_type=exampleexpires_in=3600'.format('invalid') + implicit_nostate = implicit_base + 'token_type=example&expires_in=3600' + implicit_notoken = 'https://example.com/cb#state=xyz&token_type=example&expires_in=3600' + + implicit_dict = { + 'access_token': '2YotnFZFEjr1zCsicMWpAA', + 'state': state, + 'token_type': 'example', + 'expires_in': 3600, + 'expires_at': 4600, + 'scope': ['abc'] + } + + json_response = ('{ "access_token": "2YotnFZFEjr1zCsicMWpAA",' + ' "token_type": "example",' + ' "expires_in": 3600,' + ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",' + ' "example_parameter": "example_value",' + ' "scope":"abc def"}') + json_response_noscope = ('{ "access_token": "2YotnFZFEjr1zCsicMWpAA",' + ' "token_type": "example",' + ' "expires_in": 3600,' + ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",' + ' "example_parameter": "example_value" }') + json_response_noexpire = ('{ "access_token": "2YotnFZFEjr1zCsicMWpAA",' + ' "token_type": "example",' + ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",' + ' "example_parameter": "example_value"}') + json_response_expirenull = ('{ "access_token": "2YotnFZFEjr1zCsicMWpAA",' + ' "token_type": "example",' + ' "expires_in": null,' + ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",' + ' "example_parameter": "example_value"}') + + json_custom_error = '{ "error": "incorrect_client_credentials" }' + json_error = '{ "error": "access_denied" }' + + json_notoken = ('{ "token_type": "example",' + ' "expires_in": 3600,' + ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",' + ' "example_parameter": "example_value" }') + + json_notype = ('{ "access_token": "2YotnFZFEjr1zCsicMWpAA",' + ' "expires_in": 3600,' + ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",' + ' "example_parameter": "example_value" }') + + json_dict = { + 'access_token': '2YotnFZFEjr1zCsicMWpAA', + 'token_type': 'example', + 'expires_in': 3600, + 'expires_at': 4600, + 'refresh_token': 'tGzv3JOkF0XG5Qx2TlKWIA', + 'example_parameter': 'example_value', + 'scope': ['abc', 'def'] + } + + json_noscope_dict = { + 'access_token': '2YotnFZFEjr1zCsicMWpAA', + 'token_type': 'example', + 'expires_in': 3600, + 'expires_at': 4600, + 'refresh_token': 'tGzv3JOkF0XG5Qx2TlKWIA', + 'example_parameter': 'example_value' + } + + json_noexpire_dict = { + 'access_token': '2YotnFZFEjr1zCsicMWpAA', + 'token_type': 'example', + 'refresh_token': 'tGzv3JOkF0XG5Qx2TlKWIA', + 'example_parameter': 'example_value' + } + + json_notype_dict = { + 'access_token': '2YotnFZFEjr1zCsicMWpAA', + 'expires_in': 3600, + 'expires_at': 4600, + 'refresh_token': 'tGzv3JOkF0XG5Qx2TlKWIA', + 'example_parameter': 'example_value', + } + + url_encoded_response = ('access_token=2YotnFZFEjr1zCsicMWpAA' + '&token_type=example' + '&expires_in=3600' + '&refresh_token=tGzv3JOkF0XG5Qx2TlKWIA' + '&example_parameter=example_value' + '&scope=abc def') + + url_encoded_error = 'error=access_denied' + + url_encoded_notoken = ('token_type=example' + '&expires_in=3600' + '&refresh_token=tGzv3JOkF0XG5Qx2TlKWIA' + '&example_parameter=example_value') + + + def test_prepare_grant_uri(self): + """Verify correct authorization URI construction.""" + self.assertURLEqual(prepare_grant_uri(**self.auth_grant), self.auth_grant_uri) + self.assertURLEqual(prepare_grant_uri(**self.auth_grant_list_scope), self.auth_grant_uri_list_scope) + self.assertURLEqual(prepare_grant_uri(**self.auth_implicit), self.auth_implicit_uri) + self.assertURLEqual(prepare_grant_uri(**self.auth_implicit_list_scope), self.auth_implicit_uri_list_scope) + self.assertURLEqual(prepare_grant_uri(**self.auth_grant_pkce), self.auth_grant_uri_pkce) + + def test_prepare_token_request(self): + """Verify correct access token request body construction.""" + self.assertFormBodyEqual(prepare_token_request(**self.grant_body), self.auth_grant_body) + self.assertFormBodyEqual(prepare_token_request(**self.pwd_body), self.password_body) + self.assertFormBodyEqual(prepare_token_request(**self.cred_grant), self.cred_body) + self.assertFormBodyEqual(prepare_token_request(**self.grant_body_pkce), self.auth_grant_body_pkce) + + def test_grant_response(self): + """Verify correct parameter parsing and validation for auth code responses.""" + params = parse_authorization_code_response(self.grant_response) + self.assertEqual(params, self.grant_dict) + params = parse_authorization_code_response(self.grant_response, state=self.state) + self.assertEqual(params, self.grant_dict) + + self.assertRaises(MissingCodeError, parse_authorization_code_response, + self.error_nocode) + self.assertRaises(AccessDeniedError, parse_authorization_code_response, + self.error_denied) + self.assertRaises(InvalidRequestFatalError, parse_authorization_code_response, + self.error_invalid) + self.assertRaises(MismatchingStateError, parse_authorization_code_response, + self.error_nostate, state=self.state) + self.assertRaises(MismatchingStateError, parse_authorization_code_response, + self.error_wrongstate, state=self.state) + + def test_implicit_token_response(self): + """Verify correct parameter parsing and validation for implicit responses.""" + self.assertEqual(parse_implicit_response(self.implicit_response), + self.implicit_dict) + self.assertRaises(MissingTokenError, parse_implicit_response, + self.implicit_notoken) + self.assertRaises(ValueError, parse_implicit_response, + self.implicit_nostate, state=self.state) + self.assertRaises(ValueError, parse_implicit_response, + self.implicit_wrongstate, state=self.state) + + def test_custom_json_error(self): + self.assertRaises(CustomOAuth2Error, parse_token_response, self.json_custom_error) + + def test_json_token_response(self): + """Verify correct parameter parsing and validation for token responses. """ + self.assertEqual(parse_token_response(self.json_response), self.json_dict) + self.assertRaises(AccessDeniedError, parse_token_response, self.json_error) + self.assertRaises(MissingTokenError, parse_token_response, self.json_notoken) + + self.assertEqual(parse_token_response(self.json_response_noscope, + scope=['all', 'the', 'scopes']), self.json_noscope_dict) + self.assertEqual(parse_token_response(self.json_response_noexpire), self.json_noexpire_dict) + self.assertEqual(parse_token_response(self.json_response_expirenull), self.json_noexpire_dict) + + scope_changes_recorded = [] + def record_scope_change(sender, message, old, new): + scope_changes_recorded.append((message, old, new)) + + os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] = '1' + signals.scope_changed.connect(record_scope_change) + try: + parse_token_response(self.json_response, scope='aaa') + self.assertEqual(len(scope_changes_recorded), 1) + message, old, new = scope_changes_recorded[0] + for scope in new + old: + self.assertIn(scope, message) + self.assertEqual(old, ['aaa']) + self.assertEqual(set(new), {'abc', 'def'}) + finally: + signals.scope_changed.disconnect(record_scope_change) + del os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] + + + def test_json_token_notype(self): + """Verify strict token type parsing only when configured. """ + self.assertEqual(parse_token_response(self.json_notype), self.json_notype_dict) + try: + os.environ['OAUTHLIB_STRICT_TOKEN_TYPE'] = '1' + self.assertRaises(MissingTokenTypeError, parse_token_response, self.json_notype) + finally: + del os.environ['OAUTHLIB_STRICT_TOKEN_TYPE'] + + def test_url_encoded_token_response(self): + """Verify fallback parameter parsing and validation for token responses. """ + self.assertEqual(parse_token_response(self.url_encoded_response), self.json_dict) + self.assertRaises(AccessDeniedError, parse_token_response, self.url_encoded_error) + self.assertRaises(MissingTokenError, parse_token_response, self.url_encoded_notoken) + + scope_changes_recorded = [] + def record_scope_change(sender, message, old, new): + scope_changes_recorded.append((message, old, new)) + + os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] = '1' + signals.scope_changed.connect(record_scope_change) + try: + token = parse_token_response(self.url_encoded_response, scope='aaa') + self.assertEqual(len(scope_changes_recorded), 1) + message, old, new = scope_changes_recorded[0] + for scope in new + old: + self.assertIn(scope, message) + self.assertEqual(old, ['aaa']) + self.assertEqual(set(new), {'abc', 'def'}) + finally: + signals.scope_changed.disconnect(record_scope_change) + del os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/test_request_validator.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_request_validator.py new file mode 100644 index 0000000000..7a8d06b668 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_request_validator.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +from oauthlib.oauth2 import RequestValidator + +from tests.unittest import TestCase + + +class RequestValidatorTest(TestCase): + + def test_method_contracts(self): + v = RequestValidator() + self.assertRaises(NotImplementedError, v.authenticate_client, 'r') + self.assertRaises(NotImplementedError, v.authenticate_client_id, + 'client_id', 'r') + self.assertRaises(NotImplementedError, v.confirm_redirect_uri, + 'client_id', 'code', 'redirect_uri', 'client', 'request') + self.assertRaises(NotImplementedError, v.get_default_redirect_uri, + 'client_id', 'request') + self.assertRaises(NotImplementedError, v.get_default_scopes, + 'client_id', 'request') + self.assertRaises(NotImplementedError, v.get_original_scopes, + 'refresh_token', 'request') + self.assertFalse(v.is_within_original_scope( + ['scope'], 'refresh_token', 'request')) + self.assertRaises(NotImplementedError, v.invalidate_authorization_code, + 'client_id', 'code', 'request') + self.assertRaises(NotImplementedError, v.save_authorization_code, + 'client_id', 'code', 'request') + self.assertRaises(NotImplementedError, v.save_bearer_token, + 'token', 'request') + self.assertRaises(NotImplementedError, v.validate_bearer_token, + 'token', 'scopes', 'request') + self.assertRaises(NotImplementedError, v.validate_client_id, + 'client_id', 'request') + self.assertRaises(NotImplementedError, v.validate_code, + 'client_id', 'code', 'client', 'request') + self.assertRaises(NotImplementedError, v.validate_grant_type, + 'client_id', 'grant_type', 'client', 'request') + self.assertRaises(NotImplementedError, v.validate_redirect_uri, + 'client_id', 'redirect_uri', 'request') + self.assertRaises(NotImplementedError, v.validate_refresh_token, + 'refresh_token', 'client', 'request') + self.assertRaises(NotImplementedError, v.validate_response_type, + 'client_id', 'response_type', 'client', 'request') + self.assertRaises(NotImplementedError, v.validate_scopes, + 'client_id', 'scopes', 'client', 'request') + self.assertRaises(NotImplementedError, v.validate_user, + 'username', 'password', 'client', 'request') + self.assertTrue(v.client_authentication_required('r')) + self.assertFalse( + v.is_origin_allowed('client_id', 'https://foo.bar', 'r') + ) diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/test_server.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_server.py new file mode 100644 index 0000000000..94af37e56b --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_server.py @@ -0,0 +1,391 @@ +# -*- coding: utf-8 -*- +import json +from unittest import mock + +from oauthlib import common +from oauthlib.oauth2.rfc6749 import errors, tokens +from oauthlib.oauth2.rfc6749.endpoints import Server +from oauthlib.oauth2.rfc6749.endpoints.authorization import ( + AuthorizationEndpoint, +) +from oauthlib.oauth2.rfc6749.endpoints.resource import ResourceEndpoint +from oauthlib.oauth2.rfc6749.endpoints.token import TokenEndpoint +from oauthlib.oauth2.rfc6749.grant_types import ( + AuthorizationCodeGrant, ClientCredentialsGrant, ImplicitGrant, + ResourceOwnerPasswordCredentialsGrant, +) + +from tests.unittest import TestCase + + +class AuthorizationEndpointTest(TestCase): + + def setUp(self): + self.mock_validator = mock.MagicMock() + self.mock_validator.get_code_challenge.return_value = None + self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock()) + auth_code = AuthorizationCodeGrant( + request_validator=self.mock_validator) + auth_code.save_authorization_code = mock.MagicMock() + implicit = ImplicitGrant( + request_validator=self.mock_validator) + implicit.save_token = mock.MagicMock() + + response_types = { + 'code': auth_code, + 'token': implicit, + 'none': auth_code + } + self.expires_in = 1800 + token = tokens.BearerToken( + self.mock_validator, + expires_in=self.expires_in + ) + self.endpoint = AuthorizationEndpoint( + default_response_type='code', + default_token_type=token, + response_types=response_types + ) + + @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') + def test_authorization_grant(self): + uri = 'http://i.b/l?response_type=code&client_id=me&scope=all+of+them&state=xyz' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me?code=abc&state=xyz') + + @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') + def test_implicit_grant(self): + uri = 'http://i.b/l?response_type=token&client_id=me&scope=all+of+them&state=xyz' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me#access_token=abc&expires_in=' + str(self.expires_in) + '&token_type=Bearer&state=xyz&scope=all+of+them', parse_fragment=True) + + def test_none_grant(self): + uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them&state=xyz' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me?state=xyz', parse_fragment=True) + self.assertIsNone(body) + self.assertEqual(status_code, 302) + + # and without the state parameter + uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me', parse_fragment=True) + self.assertIsNone(body) + self.assertEqual(status_code, 302) + + def test_missing_type(self): + uri = 'http://i.b/l?client_id=me&scope=all+of+them' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + self.mock_validator.validate_request = mock.MagicMock( + side_effect=errors.InvalidRequestError()) + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me?error=invalid_request&error_description=Missing+response_type+parameter.') + + def test_invalid_type(self): + uri = 'http://i.b/l?response_type=invalid&client_id=me&scope=all+of+them' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + self.mock_validator.validate_request = mock.MagicMock( + side_effect=errors.UnsupportedResponseTypeError()) + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me?error=unsupported_response_type') + + +class TokenEndpointTest(TestCase): + + def setUp(self): + def set_user(request): + request.user = mock.MagicMock() + request.client = mock.MagicMock() + request.client.client_id = 'mocked_client_id' + return True + + self.mock_validator = mock.MagicMock() + self.mock_validator.authenticate_client.side_effect = set_user + self.mock_validator.get_code_challenge.return_value = None + self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock()) + auth_code = AuthorizationCodeGrant( + request_validator=self.mock_validator) + password = ResourceOwnerPasswordCredentialsGrant( + request_validator=self.mock_validator) + client = ClientCredentialsGrant( + request_validator=self.mock_validator) + supported_types = { + 'authorization_code': auth_code, + 'password': password, + 'client_credentials': client, + } + self.expires_in = 1800 + token = tokens.BearerToken( + self.mock_validator, + expires_in=self.expires_in + ) + self.endpoint = TokenEndpoint( + 'authorization_code', + default_token_type=token, + grant_types=supported_types + ) + + @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') + def test_authorization_grant(self): + body = 'grant_type=authorization_code&code=abc&scope=all+of+them' + headers, body, status_code = self.endpoint.create_token_response( + '', body=body) + token = { + 'token_type': 'Bearer', + 'expires_in': self.expires_in, + 'access_token': 'abc', + 'refresh_token': 'abc', + 'scope': 'all of them' + } + self.assertEqual(json.loads(body), token) + + body = 'grant_type=authorization_code&code=abc' + headers, body, status_code = self.endpoint.create_token_response( + '', body=body) + token = { + 'token_type': 'Bearer', + 'expires_in': self.expires_in, + 'access_token': 'abc', + 'refresh_token': 'abc' + } + self.assertEqual(json.loads(body), token) + + # try with additional custom variables + body = 'grant_type=authorization_code&code=abc&state=foobar' + headers, body, status_code = self.endpoint.create_token_response( + '', body=body) + self.assertEqual(json.loads(body), token) + + @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') + def test_password_grant(self): + body = 'grant_type=password&username=a&password=hello&scope=all+of+them' + headers, body, status_code = self.endpoint.create_token_response( + '', body=body) + token = { + 'token_type': 'Bearer', + 'expires_in': self.expires_in, + 'access_token': 'abc', + 'refresh_token': 'abc', + 'scope': 'all of them', + } + self.assertEqual(json.loads(body), token) + + @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') + def test_client_grant(self): + body = 'grant_type=client_credentials&scope=all+of+them' + headers, body, status_code = self.endpoint.create_token_response( + '', body=body) + token = { + 'token_type': 'Bearer', + 'expires_in': self.expires_in, + 'access_token': 'abc', + 'scope': 'all of them', + } + self.assertEqual(json.loads(body), token) + + def test_missing_type(self): + _, body, _ = self.endpoint.create_token_response('', body='') + token = {'error': 'unsupported_grant_type'} + self.assertEqual(json.loads(body), token) + + def test_invalid_type(self): + body = 'grant_type=invalid' + _, body, _ = self.endpoint.create_token_response('', body=body) + token = {'error': 'unsupported_grant_type'} + self.assertEqual(json.loads(body), token) + + +class SignedTokenEndpointTest(TestCase): + + def setUp(self): + self.expires_in = 1800 + + def set_user(request): + request.user = mock.MagicMock() + request.client = mock.MagicMock() + request.client.client_id = 'mocked_client_id' + return True + + self.mock_validator = mock.MagicMock() + self.mock_validator.get_code_challenge.return_value = None + self.mock_validator.authenticate_client.side_effect = set_user + self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock()) + + self.private_pem = """ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEA6TtDhWGwzEOWZP6m/zHoZnAPLABfetvoMPmxPGjFjtDuMRPv +EvI1sbixZBjBtdnc5rTtHUUQ25Am3JzwPRGo5laMGbj1pPyCPxlVi9LK82HQNX0B +YK7tZtVfDHElQA7F4v3j9d3rad4O9/n+lyGIQ0tT7yQcBm2A8FEaP0bZYCLMjwMN +WfaVLE8eXHyv+MfpNNLI9wttLxygKYM48I3NwsFuJgOa/KuodXaAmf8pJnx8t1Wn +nxvaYXFiUn/TxmhM/qhemPa6+0nqq+aWV5eT7xn4K/ghLgNs09v6Yge0pmPl9Oz+ ++bjJ+aKRnAmwCOY8/5U5EilAiUOeBoO9+8OXtwIDAQABAoIBAGFTTbXXMkPK4HN8 +oItVdDlrAanG7hECuz3UtFUVE3upS/xG6TjqweVLwRqYCh2ssDXFwjy4mXRGDzF4 +e/e/6s9Txlrlh/w1MtTJ6ZzTdcViR9RKOczysjZ7S5KRlI3KnGFAuWPcG2SuOWjZ +dZfzcj1Crd/ZHajBAVFHRsCo/ATVNKbTRprFfb27xKpQ2BwH/GG781sLE3ZVNIhs +aRRaED4622kI1E/WXws2qQMqbFKzo0m1tPbLb3Z89WgZJ/tRQwuDype1Vfm7k6oX +xfbp3948qSe/yWKRlMoPkleji/WxPkSIalzWSAi9ziN/0Uzhe65FURgrfHL3XR1A +B8UR+aECgYEA7NPQZV4cAikk02Hv65JgISofqV49P8MbLXk8sdnI1n7Mj10TgzU3 +lyQGDEX4hqvT0bTXe4KAOxQZx9wumu05ejfzhdtSsEm6ptGHyCdmYDQeV0C/pxDX +JNCK8XgMku2370XG0AnyBCT7NGlgtDcNCQufcesF2gEuoKiXg6Zjo7sCgYEA/Bzs +9fWGZZnSsMSBSW2OYbFuhF3Fne0HcxXQHipl0Rujc/9g0nccwqKGizn4fGOE7a8F +usQgJoeGcinL7E9OEP/uQ9VX1C9RNVjIxP1O5/Guw1zjxQQYetOvbPhN2QhD1Ye7 +0TRKrW1BapcjwLpFQlVg1ZeTPOi5lv24W/wX9jUCgYEAkrMSX/hPuTbrTNVZ3L6r +NV/2hN+PaTPeXei/pBuXwOaCqDurnpcUfFcgN/IP5LwDVd+Dq0pHTFFDNv45EFbq +R77o5n3ZVsIVEMiyJ1XgoK8oLDw7e61+15smtjT69Piz+09pu+ytMcwGn4y3Dmsb +dALzHYnL8iLRU0ubrz0ec4kCgYAJiVKRTzNBPptQom49h85d9ac3jJCAE8o3WTjh +Gzt0uHXrWlqgO280EY/DTnMOyXjqwLcXxHlu26uDP/99tdY/IF8z46sJ1KxetzgI +84f7kBHLRAU9m5UNeFpnZdEUB5MBTbwWAsNcYgiabpMkpCcghjg+fBhOsoLqqjhC +CnwhjQKBgQDkv0QTdyBU84TE8J0XY3eLQwXbrvG2yD5A2ntN3PyxGEneX5WTJGMZ +xJxwaFYQiDS3b9E7b8Q5dg8qa5Y1+epdhx3cuQAWPm+AoHKshDfbRve4txBDQAqh +c6MxSWgsa+2Ld5SWSNbGtpPcmEM3Fl5ttMCNCKtNc0UE16oHwaPAIw== +-----END RSA PRIVATE KEY----- + """ + + self.public_pem = """ +-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA6TtDhWGwzEOWZP6m/zHo +ZnAPLABfetvoMPmxPGjFjtDuMRPvEvI1sbixZBjBtdnc5rTtHUUQ25Am3JzwPRGo +5laMGbj1pPyCPxlVi9LK82HQNX0BYK7tZtVfDHElQA7F4v3j9d3rad4O9/n+lyGI +Q0tT7yQcBm2A8FEaP0bZYCLMjwMNWfaVLE8eXHyv+MfpNNLI9wttLxygKYM48I3N +wsFuJgOa/KuodXaAmf8pJnx8t1WnnxvaYXFiUn/TxmhM/qhemPa6+0nqq+aWV5eT +7xn4K/ghLgNs09v6Yge0pmPl9Oz++bjJ+aKRnAmwCOY8/5U5EilAiUOeBoO9+8OX +twIDAQAB +-----END PUBLIC KEY----- + """ + + signed_token = tokens.signed_token_generator(self.private_pem, + user_id=123) + self.endpoint = Server( + self.mock_validator, + token_expires_in=self.expires_in, + token_generator=signed_token, + refresh_token_generator=tokens.random_token_generator + ) + + @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') + def test_authorization_grant(self): + body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc&scope=all+of+them' + headers, body, status_code = self.endpoint.create_token_response( + '', body=body) + body = json.loads(body) + token = { + 'token_type': 'Bearer', + 'expires_in': self.expires_in, + 'access_token': body['access_token'], + 'refresh_token': 'abc', + 'scope': 'all of them' + } + self.assertEqual(body, token) + + body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc' + headers, body, status_code = self.endpoint.create_token_response( + '', body=body) + body = json.loads(body) + token = { + 'token_type': 'Bearer', + 'expires_in': self.expires_in, + 'access_token': body['access_token'], + 'refresh_token': 'abc' + } + self.assertEqual(body, token) + + # try with additional custom variables + body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc&state=foobar' + headers, body, status_code = self.endpoint.create_token_response( + '', body=body) + body = json.loads(body) + token = { + 'token_type': 'Bearer', + 'expires_in': self.expires_in, + 'access_token': body['access_token'], + 'refresh_token': 'abc' + } + self.assertEqual(body, token) + + @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') + def test_password_grant(self): + body = 'grant_type=password&username=a&password=hello&scope=all+of+them' + headers, body, status_code = self.endpoint.create_token_response( + '', body=body) + body = json.loads(body) + token = { + 'token_type': 'Bearer', + 'expires_in': self.expires_in, + 'access_token': body['access_token'], + 'refresh_token': 'abc', + 'scope': 'all of them', + } + self.assertEqual(body, token) + + @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') + def test_scopes_and_user_id_stored_in_access_token(self): + body = 'grant_type=password&username=a&password=hello&scope=all+of+them' + headers, body, status_code = self.endpoint.create_token_response( + '', body=body) + + access_token = json.loads(body)['access_token'] + + claims = common.verify_signed_token(self.public_pem, access_token) + + self.assertEqual(claims['scope'], 'all of them') + self.assertEqual(claims['user_id'], 123) + + @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') + def test_client_grant(self): + body = 'grant_type=client_credentials&scope=all+of+them' + headers, body, status_code = self.endpoint.create_token_response( + '', body=body) + body = json.loads(body) + token = { + 'token_type': 'Bearer', + 'expires_in': self.expires_in, + 'access_token': body['access_token'], + 'scope': 'all of them', + } + self.assertEqual(body, token) + + def test_missing_type(self): + _, body, _ = self.endpoint.create_token_response('', body='client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&code=abc') + token = {'error': 'unsupported_grant_type'} + self.assertEqual(json.loads(body), token) + + def test_invalid_type(self): + body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=invalid&code=abc' + _, body, _ = self.endpoint.create_token_response('', body=body) + token = {'error': 'unsupported_grant_type'} + self.assertEqual(json.loads(body), token) + + +class ResourceEndpointTest(TestCase): + + def setUp(self): + self.mock_validator = mock.MagicMock() + self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock()) + token = tokens.BearerToken(request_validator=self.mock_validator) + self.endpoint = ResourceEndpoint( + default_token='Bearer', + token_types={'Bearer': token} + ) + + def test_defaults(self): + uri = 'http://a.b/path?some=query' + self.mock_validator.validate_bearer_token.return_value = False + valid, request = self.endpoint.verify_request(uri) + self.assertFalse(valid) + self.assertEqual(request.token_type, 'Bearer') diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/test_tokens.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_tokens.py new file mode 100644 index 0000000000..fa6b1c092c --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_tokens.py @@ -0,0 +1,170 @@ +from unittest import mock + +from oauthlib.common import Request +from oauthlib.oauth2.rfc6749.tokens import ( + BearerToken, prepare_bearer_body, prepare_bearer_headers, + prepare_bearer_uri, prepare_mac_header, +) + +from tests.unittest import TestCase + + +class TokenTest(TestCase): + + # MAC without body/payload or extension + mac_plain = { + 'token': 'h480djs93hd8', + 'uri': 'http://example.com/resource/1?b=1&a=2', + 'key': '489dks293j39', + 'http_method': 'GET', + 'nonce': '264095:dj83hs9s', + 'hash_algorithm': 'hmac-sha-1' + } + auth_plain = { + 'Authorization': 'MAC id="h480djs93hd8", nonce="264095:dj83hs9s",' + ' mac="SLDJd4mg43cjQfElUs3Qub4L6xE="' + } + + # MAC with body/payload, no extension + mac_body = { + 'token': 'jd93dh9dh39D', + 'uri': 'http://example.com/request', + 'key': '8yfrufh348h', + 'http_method': 'POST', + 'nonce': '273156:di3hvdf8', + 'hash_algorithm': 'hmac-sha-1', + 'body': 'hello=world%21' + } + auth_body = { + 'Authorization': 'MAC id="jd93dh9dh39D", nonce="273156:di3hvdf8",' + ' bodyhash="k9kbtCIy0CkI3/FEfpS/oIDjk6k=", mac="W7bdMZbv9UWOTadASIQHagZyirA="' + } + + # MAC with body/payload and extension + mac_both = { + 'token': 'h480djs93hd8', + 'uri': 'http://example.com/request?b5=%3D%253D&a3=a&c%40=&a2=r%20b&c2&a3=2+q', + 'key': '489dks293j39', + 'http_method': 'GET', + 'nonce': '264095:7d8f3e4a', + 'hash_algorithm': 'hmac-sha-1', + 'body': 'Hello World!', + 'ext': 'a,b,c' + } + auth_both = { + 'Authorization': 'MAC id="h480djs93hd8", nonce="264095:7d8f3e4a",' + ' bodyhash="Lve95gjOVATpfV8EL5X4nxwjKHE=", ext="a,b,c",' + ' mac="Z3C2DojEopRDIC88/imW8Ez853g="' + } + + # Bearer + token = 'vF9dft4qmT' + uri = 'http://server.example.com/resource' + bearer_headers = { + 'Authorization': 'Bearer vF9dft4qmT' + } + valid_bearer_header_lowercase = {"Authorization": "bearer vF9dft4qmT"} + fake_bearer_headers = [ + {'Authorization': 'Beaver vF9dft4qmT'}, + {'Authorization': 'BeavervF9dft4qmT'}, + {'Authorization': 'Beaver vF9dft4qmT'}, + {'Authorization': 'BearerF9dft4qmT'}, + {'Authorization': 'Bearer vF9d ft4qmT'}, + ] + valid_header_with_multiple_spaces = {'Authorization': 'Bearer vF9dft4qmT'} + bearer_body = 'access_token=vF9dft4qmT' + bearer_uri = 'http://server.example.com/resource?access_token=vF9dft4qmT' + + def _mocked_validate_bearer_token(self, token, scopes, request): + if not token: + return False + return True + + def test_prepare_mac_header(self): + """Verify mac signatures correctness + + TODO: verify hmac-sha-256 + """ + self.assertEqual(prepare_mac_header(**self.mac_plain), self.auth_plain) + self.assertEqual(prepare_mac_header(**self.mac_body), self.auth_body) + self.assertEqual(prepare_mac_header(**self.mac_both), self.auth_both) + + def test_prepare_bearer_request(self): + """Verify proper addition of bearer tokens to requests. + + They may be represented as query components in body or URI or + in a Bearer authorization header. + """ + self.assertEqual(prepare_bearer_headers(self.token), self.bearer_headers) + self.assertEqual(prepare_bearer_body(self.token), self.bearer_body) + self.assertEqual(prepare_bearer_uri(self.token, uri=self.uri), self.bearer_uri) + + def test_valid_bearer_is_validated(self): + request_validator = mock.MagicMock() + request_validator.validate_bearer_token = self._mocked_validate_bearer_token + + request = Request("/", headers=self.bearer_headers) + result = BearerToken(request_validator=request_validator).validate_request( + request + ) + self.assertTrue(result) + + def test_lowercase_bearer_is_validated(self): + request_validator = mock.MagicMock() + request_validator.validate_bearer_token = self._mocked_validate_bearer_token + + request = Request("/", headers=self.valid_bearer_header_lowercase) + result = BearerToken(request_validator=request_validator).validate_request( + request + ) + self.assertTrue(result) + + def test_fake_bearer_is_not_validated(self): + request_validator = mock.MagicMock() + request_validator.validate_bearer_token = self._mocked_validate_bearer_token + + for fake_header in self.fake_bearer_headers: + request = Request("/", headers=fake_header) + result = BearerToken(request_validator=request_validator).validate_request( + request + ) + + self.assertFalse(result) + + def test_header_with_multispaces_is_validated(self): + request_validator = mock.MagicMock() + request_validator.validate_bearer_token = self._mocked_validate_bearer_token + + request = Request("/", headers=self.valid_header_with_multiple_spaces) + result = BearerToken(request_validator=request_validator).validate_request( + request + ) + + self.assertTrue(result) + + def test_estimate_type(self): + request_validator = mock.MagicMock() + request_validator.validate_bearer_token = self._mocked_validate_bearer_token + request = Request("/", headers=self.bearer_headers) + result = BearerToken(request_validator=request_validator).estimate_type(request) + self.assertEqual(result, 9) + + def test_estimate_type_with_fake_header_returns_type_0(self): + request_validator = mock.MagicMock() + request_validator.validate_bearer_token = self._mocked_validate_bearer_token + + for fake_header in self.fake_bearer_headers: + request = Request("/", headers=fake_header) + result = BearerToken(request_validator=request_validator).estimate_type( + request + ) + + if ( + fake_header["Authorization"].count(" ") == 2 + and fake_header["Authorization"].split()[0] == "Bearer" + ): + # If we're dealing with the header containing 2 spaces, it will be recognized + # as a Bearer valid header, the token itself will be invalid by the way. + self.assertEqual(result, 9) + else: + self.assertEqual(result, 0) diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/test_utils.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_utils.py new file mode 100644 index 0000000000..3299591926 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_utils.py @@ -0,0 +1,100 @@ +import datetime +import os + +from oauthlib.oauth2.rfc6749.utils import ( + escape, generate_age, host_from_uri, is_secure_transport, list_to_scope, + params_from_uri, scope_to_list, +) + +from tests.unittest import TestCase + + +class ScopeObject: + """ + Fixture for testing list_to_scope()/scope_to_list() with objects other + than regular strings. + """ + def __init__(self, scope): + self.scope = scope + + def __str__(self): + return self.scope + + +class UtilsTests(TestCase): + + def test_escape(self): + """Assert that we are only escaping unicode""" + self.assertRaises(ValueError, escape, b"I am a string type. Not a unicode type.") + self.assertEqual(escape("I am a unicode type."), "I%20am%20a%20unicode%20type.") + + def test_host_from_uri(self): + """Test if hosts and ports are properly extracted from URIs. + + This should be done according to the MAC Authentication spec. + Defaults ports should be provided when none is present in the URI. + """ + self.assertEqual(host_from_uri('http://a.b-c.com:8080'), ('a.b-c.com', '8080')) + self.assertEqual(host_from_uri('https://a.b.com:8080'), ('a.b.com', '8080')) + self.assertEqual(host_from_uri('http://www.example.com'), ('www.example.com', '80')) + self.assertEqual(host_from_uri('https://www.example.com'), ('www.example.com', '443')) + + def test_is_secure_transport(self): + """Test check secure uri.""" + if 'OAUTHLIB_INSECURE_TRANSPORT' in os.environ: + del os.environ['OAUTHLIB_INSECURE_TRANSPORT'] + + self.assertTrue(is_secure_transport('https://example.com')) + self.assertFalse(is_secure_transport('http://example.com')) + + os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1' + self.assertTrue(is_secure_transport('http://example.com')) + del os.environ['OAUTHLIB_INSECURE_TRANSPORT'] + + def test_params_from_uri(self): + self.assertEqual(params_from_uri('http://i.b/?foo=bar&g&scope=a+d'), + {'foo': 'bar', 'g': '', 'scope': ['a', 'd']}) + + def test_generate_age(self): + issue_time = datetime.datetime.now() - datetime.timedelta( + days=3, minutes=1, seconds=4) + self.assertGreater(float(generate_age(issue_time)), 259263.0) + + def test_list_to_scope(self): + expected = 'foo bar baz' + + string_list = ['foo', 'bar', 'baz'] + self.assertEqual(list_to_scope(string_list), expected) + + string_tuple = ('foo', 'bar', 'baz') + self.assertEqual(list_to_scope(string_tuple), expected) + + obj_list = [ScopeObject('foo'), ScopeObject('bar'), ScopeObject('baz')] + self.assertEqual(list_to_scope(obj_list), expected) + + set_list = set(string_list) + set_scope = list_to_scope(set_list) + assert len(set_scope.split(' ')) == 3 + for x in string_list: + assert x in set_scope + + self.assertRaises(ValueError, list_to_scope, object()) + + def test_scope_to_list(self): + expected = ['foo', 'bar', 'baz'] + + string_scopes = 'foo bar baz ' + self.assertEqual(scope_to_list(string_scopes), expected) + + string_list_scopes = ['foo', 'bar', 'baz'] + self.assertEqual(scope_to_list(string_list_scopes), expected) + + tuple_list_scopes = ('foo', 'bar', 'baz') + self.assertEqual(scope_to_list(tuple_list_scopes), expected) + + obj_list_scopes = [ScopeObject('foo'), ScopeObject('bar'), ScopeObject('baz')] + self.assertEqual(scope_to_list(obj_list_scopes), expected) + + set_list_scopes = set(string_list_scopes) + set_list = scope_to_list(set_list_scopes) + self.assertEqual(sorted(set_list), sorted(string_list_scopes)) diff --git a/contrib/python/oauthlib/tests/oauth2/rfc8628/__init__.py b/contrib/python/oauthlib/tests/oauth2/rfc8628/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc8628/__init__.py diff --git a/contrib/python/oauthlib/tests/oauth2/rfc8628/clients/__init__.py b/contrib/python/oauthlib/tests/oauth2/rfc8628/clients/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc8628/clients/__init__.py diff --git a/contrib/python/oauthlib/tests/oauth2/rfc8628/clients/test_device.py b/contrib/python/oauthlib/tests/oauth2/rfc8628/clients/test_device.py new file mode 100644 index 0000000000..725dea2a92 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc8628/clients/test_device.py @@ -0,0 +1,63 @@ +import os +from unittest.mock import patch + +from oauthlib import signals +from oauthlib.oauth2 import DeviceClient + +from tests.unittest import TestCase + + +class DeviceClientTest(TestCase): + + client_id = "someclientid" + kwargs = { + "some": "providers", + "require": "extra arguments" + } + + client_secret = "asecret" + + device_code = "somedevicecode" + + scope = ["profile", "email"] + + body = "not=empty" + + body_up = "not=empty&grant_type=urn:ietf:params:oauth:grant-type:device_code" + body_code = body_up + "&device_code=somedevicecode" + body_kwargs = body_code + "&some=providers&require=extra+arguments" + + uri = "https://example.com/path?query=world" + uri_id = uri + "&client_id=" + client_id + uri_grant = uri_id + "&grant_type=urn:ietf:params:oauth:grant-type:device_code" + uri_secret = uri_grant + "&client_secret=asecret" + uri_scope = uri_secret + "&scope=profile+email" + + def test_request_body(self): + client = DeviceClient(self.client_id) + + # Basic, no extra arguments + body = client.prepare_request_body(self.device_code, body=self.body) + self.assertFormBodyEqual(body, self.body_code) + + rclient = DeviceClient(self.client_id) + body = rclient.prepare_request_body(self.device_code, body=self.body) + self.assertFormBodyEqual(body, self.body_code) + + # With extra parameters + body = client.prepare_request_body( + self.device_code, body=self.body, **self.kwargs) + self.assertFormBodyEqual(body, self.body_kwargs) + + def test_request_uri(self): + client = DeviceClient(self.client_id) + + uri = client.prepare_request_uri(self.uri) + self.assertURLEqual(uri, self.uri_grant) + + client = DeviceClient(self.client_id, client_secret=self.client_secret) + uri = client.prepare_request_uri(self.uri) + self.assertURLEqual(uri, self.uri_secret) + + uri = client.prepare_request_uri(self.uri, scope=self.scope) + self.assertURLEqual(uri, self.uri_scope) |