aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/oauthlib/tests/oauth2
diff options
context:
space:
mode:
authoralexv-smirnov <alex@ydb.tech>2023-12-01 12:02:50 +0300
committeralexv-smirnov <alex@ydb.tech>2023-12-01 13:28:10 +0300
commit0e578a4c44d4abd539d9838347b9ebafaca41dfb (patch)
treea0c1969c37f818c830ebeff9c077eacf30be6ef8 /contrib/python/oauthlib/tests/oauth2
parent84f2d3d4cc985e63217cff149bd2e6d67ae6fe22 (diff)
downloadydb-0e578a4c44d4abd539d9838347b9ebafaca41dfb.tar.gz
Change "ya.make"
Diffstat (limited to 'contrib/python/oauthlib/tests/oauth2')
-rw-r--r--contrib/python/oauthlib/tests/oauth2/__init__.py0
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/__init__.py0
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/clients/__init__.py0
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_backend_application.py86
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_base.py355
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_legacy_application.py140
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_mobile_application.py111
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_service_application.py185
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_web_application.py269
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/__init__.py0
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_base_endpoint.py75
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_client_authentication.py162
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_credentials_preservation.py128
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_error_responses.py491
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_extra_credentials.py69
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_introspect_endpoint.py168
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_metadata.py148
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_resource_owner_association.py108
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_revocation_endpoint.py148
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_scope_handling.py193
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_utils.py11
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/__init__.py0
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_authorization_code.py382
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_client_credentials.py76
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_implicit.py62
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_refresh_token.py211
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_resource_owner_password.py156
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/test_parameters.py304
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/test_request_validator.py51
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/test_server.py391
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/test_tokens.py170
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc6749/test_utils.py100
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc8628/__init__.py0
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc8628/clients/__init__.py0
-rw-r--r--contrib/python/oauthlib/tests/oauth2/rfc8628/clients/test_device.py63
35 files changed, 4813 insertions, 0 deletions
diff --git a/contrib/python/oauthlib/tests/oauth2/__init__.py b/contrib/python/oauthlib/tests/oauth2/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/__init__.py
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/__init__.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/__init__.py
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/__init__.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/__init__.py
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_backend_application.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_backend_application.py
new file mode 100644
index 0000000000..c1489ac7c6
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_backend_application.py
@@ -0,0 +1,86 @@
+# -*- coding: utf-8 -*-
+import os
+from unittest.mock import patch
+
+from oauthlib import signals
+from oauthlib.oauth2 import BackendApplicationClient
+
+from tests.unittest import TestCase
+
+
+@patch('time.time', new=lambda: 1000)
+class BackendApplicationClientTest(TestCase):
+
+ client_id = "someclientid"
+ client_secret = 'someclientsecret'
+ scope = ["/profile"]
+ kwargs = {
+ "some": "providers",
+ "require": "extra arguments"
+ }
+
+ body = "not=empty"
+
+ body_up = "not=empty&grant_type=client_credentials"
+ body_kwargs = body_up + "&some=providers&require=extra+arguments"
+
+ token_json = ('{ "access_token":"2YotnFZFEjr1zCsicMWpAA",'
+ ' "token_type":"example",'
+ ' "expires_in":3600,'
+ ' "scope":"/profile",'
+ ' "example_parameter":"example_value"}')
+ token = {
+ "access_token": "2YotnFZFEjr1zCsicMWpAA",
+ "token_type": "example",
+ "expires_in": 3600,
+ "expires_at": 4600,
+ "scope": ["/profile"],
+ "example_parameter": "example_value"
+ }
+
+ def test_request_body(self):
+ client = BackendApplicationClient(self.client_id)
+
+ # Basic, no extra arguments
+ body = client.prepare_request_body(body=self.body)
+ self.assertFormBodyEqual(body, self.body_up)
+
+ rclient = BackendApplicationClient(self.client_id)
+ body = rclient.prepare_request_body(body=self.body)
+ self.assertFormBodyEqual(body, self.body_up)
+
+ # With extra parameters
+ body = client.prepare_request_body(body=self.body, **self.kwargs)
+ self.assertFormBodyEqual(body, self.body_kwargs)
+
+ def test_parse_token_response(self):
+ client = BackendApplicationClient(self.client_id)
+
+ # Parse code and state
+ response = client.parse_request_body_response(self.token_json, scope=self.scope)
+ self.assertEqual(response, self.token)
+ self.assertEqual(client.access_token, response.get("access_token"))
+ self.assertEqual(client.refresh_token, response.get("refresh_token"))
+ self.assertEqual(client.token_type, response.get("token_type"))
+
+ # Mismatching state
+ self.assertRaises(Warning, client.parse_request_body_response, self.token_json, scope="invalid")
+ os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] = '3'
+ token = client.parse_request_body_response(self.token_json, scope="invalid")
+ self.assertTrue(token.scope_changed)
+
+ scope_changes_recorded = []
+ def record_scope_change(sender, message, old, new):
+ scope_changes_recorded.append((message, old, new))
+
+ signals.scope_changed.connect(record_scope_change)
+ try:
+ client.parse_request_body_response(self.token_json, scope="invalid")
+ self.assertEqual(len(scope_changes_recorded), 1)
+ message, old, new = scope_changes_recorded[0]
+ self.assertEqual(message, 'Scope has changed from "invalid" to "/profile".')
+ self.assertEqual(old, ['invalid'])
+ self.assertEqual(new, ['/profile'])
+ finally:
+ signals.scope_changed.disconnect(record_scope_change)
+ del os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE']
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_base.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_base.py
new file mode 100644
index 0000000000..70a22834c3
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_base.py
@@ -0,0 +1,355 @@
+# -*- coding: utf-8 -*-
+import datetime
+
+from oauthlib import common
+from oauthlib.oauth2 import Client, InsecureTransportError, TokenExpiredError
+from oauthlib.oauth2.rfc6749 import utils
+from oauthlib.oauth2.rfc6749.clients import AUTH_HEADER, BODY, URI_QUERY
+
+from tests.unittest import TestCase
+
+
+class ClientTest(TestCase):
+
+ client_id = "someclientid"
+ uri = "https://example.com/path?query=world"
+ body = "not=empty"
+ headers = {}
+ access_token = "token"
+ mac_key = "secret"
+
+ bearer_query = uri + "&access_token=" + access_token
+ bearer_header = {
+ "Authorization": "Bearer " + access_token
+ }
+ bearer_body = body + "&access_token=" + access_token
+
+ mac_00_header = {
+ "Authorization": 'MAC id="' + access_token + '", nonce="0:abc123",' +
+ ' bodyhash="Yqyso8r3hR5Nm1ZFv+6AvNHrxjE=",' +
+ ' mac="0X6aACoBY0G6xgGZVJ1IeE8dF9k="'
+ }
+ mac_01_header = {
+ "Authorization": 'MAC id="' + access_token + '", ts="123456789",' +
+ ' nonce="abc123", mac="Xuk+9oqaaKyhitkgh1CD0xrI6+s="'
+ }
+
+ def test_add_bearer_token(self):
+ """Test a number of bearer token placements"""
+
+ # Invalid token type
+ client = Client(self.client_id, token_type="invalid")
+ self.assertRaises(ValueError, client.add_token, self.uri)
+
+ # Case-insensitive token type
+ client = Client(self.client_id, access_token=self.access_token, token_type="bEAreR")
+ uri, headers, body = client.add_token(self.uri, body=self.body,
+ headers=self.headers)
+ self.assertURLEqual(uri, self.uri)
+ self.assertFormBodyEqual(body, self.body)
+ self.assertEqual(headers, self.bearer_header)
+
+ # Non-HTTPS
+ insecure_uri = 'http://example.com/path?query=world'
+ client = Client(self.client_id, access_token=self.access_token, token_type="Bearer")
+ self.assertRaises(InsecureTransportError, client.add_token, insecure_uri,
+ body=self.body,
+ headers=self.headers)
+
+ # Missing access token
+ client = Client(self.client_id)
+ self.assertRaises(ValueError, client.add_token, self.uri)
+
+ # Expired token
+ expired = 523549800
+ expired_token = {
+ 'expires_at': expired,
+ }
+ client = Client(self.client_id, token=expired_token, access_token=self.access_token, token_type="Bearer")
+ self.assertRaises(TokenExpiredError, client.add_token, self.uri,
+ body=self.body, headers=self.headers)
+
+ # The default token placement, bearer in auth header
+ client = Client(self.client_id, access_token=self.access_token)
+ uri, headers, body = client.add_token(self.uri, body=self.body,
+ headers=self.headers)
+ self.assertURLEqual(uri, self.uri)
+ self.assertFormBodyEqual(body, self.body)
+ self.assertEqual(headers, self.bearer_header)
+
+ # Setting default placements of tokens
+ client = Client(self.client_id, access_token=self.access_token,
+ default_token_placement=AUTH_HEADER)
+ uri, headers, body = client.add_token(self.uri, body=self.body,
+ headers=self.headers)
+ self.assertURLEqual(uri, self.uri)
+ self.assertFormBodyEqual(body, self.body)
+ self.assertEqual(headers, self.bearer_header)
+
+ client = Client(self.client_id, access_token=self.access_token,
+ default_token_placement=URI_QUERY)
+ uri, headers, body = client.add_token(self.uri, body=self.body,
+ headers=self.headers)
+ self.assertURLEqual(uri, self.bearer_query)
+ self.assertFormBodyEqual(body, self.body)
+ self.assertEqual(headers, self.headers)
+
+ client = Client(self.client_id, access_token=self.access_token,
+ default_token_placement=BODY)
+ uri, headers, body = client.add_token(self.uri, body=self.body,
+ headers=self.headers)
+ self.assertURLEqual(uri, self.uri)
+ self.assertFormBodyEqual(body, self.bearer_body)
+ self.assertEqual(headers, self.headers)
+
+ # Asking for specific placement in the add_token method
+ client = Client(self.client_id, access_token=self.access_token)
+ uri, headers, body = client.add_token(self.uri, body=self.body,
+ headers=self.headers, token_placement=AUTH_HEADER)
+ self.assertURLEqual(uri, self.uri)
+ self.assertFormBodyEqual(body, self.body)
+ self.assertEqual(headers, self.bearer_header)
+
+ client = Client(self.client_id, access_token=self.access_token)
+ uri, headers, body = client.add_token(self.uri, body=self.body,
+ headers=self.headers, token_placement=URI_QUERY)
+ self.assertURLEqual(uri, self.bearer_query)
+ self.assertFormBodyEqual(body, self.body)
+ self.assertEqual(headers, self.headers)
+
+ client = Client(self.client_id, access_token=self.access_token)
+ uri, headers, body = client.add_token(self.uri, body=self.body,
+ headers=self.headers, token_placement=BODY)
+ self.assertURLEqual(uri, self.uri)
+ self.assertFormBodyEqual(body, self.bearer_body)
+ self.assertEqual(headers, self.headers)
+
+ # Invalid token placement
+ client = Client(self.client_id, access_token=self.access_token)
+ self.assertRaises(ValueError, client.add_token, self.uri, body=self.body,
+ headers=self.headers, token_placement="invalid")
+
+ client = Client(self.client_id, access_token=self.access_token,
+ default_token_placement="invalid")
+ self.assertRaises(ValueError, client.add_token, self.uri, body=self.body,
+ headers=self.headers)
+
+ def test_add_mac_token(self):
+ # Missing access token
+ client = Client(self.client_id, token_type="MAC")
+ self.assertRaises(ValueError, client.add_token, self.uri)
+
+ # Invalid hash algorithm
+ client = Client(self.client_id, token_type="MAC",
+ access_token=self.access_token, mac_key=self.mac_key,
+ mac_algorithm="hmac-sha-2")
+ self.assertRaises(ValueError, client.add_token, self.uri)
+
+ orig_generate_timestamp = common.generate_timestamp
+ orig_generate_nonce = common.generate_nonce
+ orig_generate_age = utils.generate_age
+ self.addCleanup(setattr, common, 'generage_timestamp', orig_generate_timestamp)
+ self.addCleanup(setattr, common, 'generage_nonce', orig_generate_nonce)
+ self.addCleanup(setattr, utils, 'generate_age', orig_generate_age)
+ common.generate_timestamp = lambda: '123456789'
+ common.generate_nonce = lambda: 'abc123'
+ utils.generate_age = lambda *args: 0
+
+ # Add the Authorization header (draft 00)
+ client = Client(self.client_id, token_type="MAC",
+ access_token=self.access_token, mac_key=self.mac_key,
+ mac_algorithm="hmac-sha-1")
+ uri, headers, body = client.add_token(self.uri, body=self.body,
+ headers=self.headers, issue_time=datetime.datetime.now())
+ self.assertEqual(uri, self.uri)
+ self.assertEqual(body, self.body)
+ self.assertEqual(headers, self.mac_00_header)
+ # Non-HTTPS
+ insecure_uri = 'http://example.com/path?query=world'
+ self.assertRaises(InsecureTransportError, client.add_token, insecure_uri,
+ body=self.body,
+ headers=self.headers,
+ issue_time=datetime.datetime.now())
+ # Expired Token
+ expired = 523549800
+ expired_token = {
+ 'expires_at': expired,
+ }
+ client = Client(self.client_id, token=expired_token, token_type="MAC",
+ access_token=self.access_token, mac_key=self.mac_key,
+ mac_algorithm="hmac-sha-1")
+ self.assertRaises(TokenExpiredError, client.add_token, self.uri,
+ body=self.body,
+ headers=self.headers,
+ issue_time=datetime.datetime.now())
+
+ # Add the Authorization header (draft 01)
+ client = Client(self.client_id, token_type="MAC",
+ access_token=self.access_token, mac_key=self.mac_key,
+ mac_algorithm="hmac-sha-1")
+ uri, headers, body = client.add_token(self.uri, body=self.body,
+ headers=self.headers, draft=1)
+ self.assertEqual(uri, self.uri)
+ self.assertEqual(body, self.body)
+ self.assertEqual(headers, self.mac_01_header)
+ # Non-HTTPS
+ insecure_uri = 'http://example.com/path?query=world'
+ self.assertRaises(InsecureTransportError, client.add_token, insecure_uri,
+ body=self.body,
+ headers=self.headers,
+ draft=1)
+ # Expired Token
+ expired = 523549800
+ expired_token = {
+ 'expires_at': expired,
+ }
+ client = Client(self.client_id, token=expired_token, token_type="MAC",
+ access_token=self.access_token, mac_key=self.mac_key,
+ mac_algorithm="hmac-sha-1")
+ self.assertRaises(TokenExpiredError, client.add_token, self.uri,
+ body=self.body,
+ headers=self.headers,
+ draft=1)
+
+ def test_revocation_request(self):
+ client = Client(self.client_id)
+
+ url = 'https://example.com/revoke'
+ token = 'foobar'
+
+ # Valid request
+ u, h, b = client.prepare_token_revocation_request(url, token)
+ self.assertEqual(u, url)
+ self.assertEqual(h, {'Content-Type': 'application/x-www-form-urlencoded'})
+ self.assertEqual(b, 'token=%s&token_type_hint=access_token' % token)
+
+ # Non-HTTPS revocation endpoint
+ self.assertRaises(InsecureTransportError,
+ client.prepare_token_revocation_request,
+ 'http://example.com/revoke', token)
+
+
+ u, h, b = client.prepare_token_revocation_request(
+ url, token, token_type_hint='refresh_token')
+ self.assertEqual(u, url)
+ self.assertEqual(h, {'Content-Type': 'application/x-www-form-urlencoded'})
+ self.assertEqual(b, 'token=%s&token_type_hint=refresh_token' % token)
+
+ # JSONP
+ u, h, b = client.prepare_token_revocation_request(
+ url, token, callback='hello.world')
+ self.assertURLEqual(u, url + '?callback=hello.world&token=%s&token_type_hint=access_token' % token)
+ self.assertEqual(h, {'Content-Type': 'application/x-www-form-urlencoded'})
+ self.assertEqual(b, '')
+
+ def test_prepare_authorization_request(self):
+ redirect_url = 'https://example.com/callback/'
+ scopes = 'read'
+ auth_url = 'https://example.com/authorize/'
+ state = 'fake_state'
+
+ client = Client(self.client_id, redirect_url=redirect_url, scope=scopes, state=state)
+
+ # Non-HTTPS
+ self.assertRaises(InsecureTransportError,
+ client.prepare_authorization_request, 'http://example.com/authorize/')
+
+ # NotImplementedError
+ self.assertRaises(NotImplementedError, client.prepare_authorization_request, auth_url)
+
+ def test_prepare_token_request(self):
+ redirect_url = 'https://example.com/callback/'
+ scopes = 'read'
+ token_url = 'https://example.com/token/'
+ state = 'fake_state'
+
+ client = Client(self.client_id, scope=scopes, state=state)
+
+ # Non-HTTPS
+ self.assertRaises(InsecureTransportError,
+ client.prepare_token_request, 'http://example.com/token/')
+
+ # NotImplementedError
+ self.assertRaises(NotImplementedError, client.prepare_token_request, token_url)
+
+ def test_prepare_refresh_token_request(self):
+ client = Client(self.client_id)
+
+ url = 'https://example.com/revoke'
+ token = 'foobar'
+ scope = 'extra_scope'
+
+ u, h, b = client.prepare_refresh_token_request(url, token)
+ self.assertEqual(u, url)
+ self.assertEqual(h, {'Content-Type': 'application/x-www-form-urlencoded'})
+ self.assertFormBodyEqual(b, 'grant_type=refresh_token&refresh_token=%s' % token)
+
+ # Non-HTTPS revocation endpoint
+ self.assertRaises(InsecureTransportError,
+ client.prepare_refresh_token_request,
+ 'http://example.com/revoke', token)
+
+ # provide extra scope
+ u, h, b = client.prepare_refresh_token_request(url, token, scope=scope)
+ self.assertEqual(u, url)
+ self.assertEqual(h, {'Content-Type': 'application/x-www-form-urlencoded'})
+ self.assertFormBodyEqual(b, 'grant_type=refresh_token&scope={}&refresh_token={}'.format(scope, token))
+
+ # provide scope while init
+ client = Client(self.client_id, scope=scope)
+ u, h, b = client.prepare_refresh_token_request(url, token, scope=scope)
+ self.assertEqual(u, url)
+ self.assertEqual(h, {'Content-Type': 'application/x-www-form-urlencoded'})
+ self.assertFormBodyEqual(b, 'grant_type=refresh_token&scope={}&refresh_token={}'.format(scope, token))
+
+ def test_parse_token_response_invalid_expires_at(self):
+ token_json = ('{ "access_token":"2YotnFZFEjr1zCsicMWpAA",'
+ ' "token_type":"example",'
+ ' "expires_at":"2006-01-02T15:04:05Z",'
+ ' "scope":"/profile",'
+ ' "example_parameter":"example_value"}')
+ token = {
+ "access_token": "2YotnFZFEjr1zCsicMWpAA",
+ "token_type": "example",
+ "expires_at": "2006-01-02T15:04:05Z",
+ "scope": ["/profile"],
+ "example_parameter": "example_value"
+ }
+
+ client = Client(self.client_id)
+
+ # Parse code and state
+ response = client.parse_request_body_response(token_json, scope=["/profile"])
+ self.assertEqual(response, token)
+ self.assertEqual(None, client._expires_at)
+ self.assertEqual(client.access_token, response.get("access_token"))
+ self.assertEqual(client.refresh_token, response.get("refresh_token"))
+ self.assertEqual(client.token_type, response.get("token_type"))
+
+
+ def test_create_code_verifier_min_length(self):
+ client = Client(self.client_id)
+ length = 43
+ code_verifier = client.create_code_verifier(length=length)
+ self.assertEqual(client.code_verifier, code_verifier)
+
+ def test_create_code_verifier_max_length(self):
+ client = Client(self.client_id)
+ length = 128
+ code_verifier = client.create_code_verifier(length=length)
+ self.assertEqual(client.code_verifier, code_verifier)
+
+ def test_create_code_challenge_plain(self):
+ client = Client(self.client_id)
+ code_verifier = client.create_code_verifier(length=128)
+ code_challenge_plain = client.create_code_challenge(code_verifier=code_verifier)
+
+ # if no code_challenge_method specified, code_challenge = code_verifier
+ self.assertEqual(code_challenge_plain, client.code_verifier)
+ self.assertEqual(client.code_challenge_method, "plain")
+
+ def test_create_code_challenge_s256(self):
+ client = Client(self.client_id)
+ code_verifier = client.create_code_verifier(length=128)
+ code_challenge_s256 = client.create_code_challenge(code_verifier=code_verifier, code_challenge_method='S256')
+ self.assertEqual(code_challenge_s256, client.code_challenge)
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_legacy_application.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_legacy_application.py
new file mode 100644
index 0000000000..b5a18194b7
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_legacy_application.py
@@ -0,0 +1,140 @@
+# -*- coding: utf-8 -*-
+import os
+import urllib.parse as urlparse
+from unittest.mock import patch
+
+from oauthlib import signals
+from oauthlib.oauth2 import LegacyApplicationClient
+
+from tests.unittest import TestCase
+
+
+@patch('time.time', new=lambda: 1000)
+class LegacyApplicationClientTest(TestCase):
+
+ client_id = "someclientid"
+ client_secret = 'someclientsecret'
+ scope = ["/profile"]
+ kwargs = {
+ "some": "providers",
+ "require": "extra arguments"
+ }
+
+ username = "user_username"
+ password = "user_password"
+ body = "not=empty"
+
+ body_up = "not=empty&grant_type=password&username={}&password={}".format(username, password)
+ body_kwargs = body_up + "&some=providers&require=extra+arguments"
+
+ token_json = ('{ "access_token":"2YotnFZFEjr1zCsicMWpAA",'
+ ' "token_type":"example",'
+ ' "expires_in":3600,'
+ ' "scope":"/profile",'
+ ' "refresh_token":"tGzv3JOkF0XG5Qx2TlKWIA",'
+ ' "example_parameter":"example_value"}')
+ token = {
+ "access_token": "2YotnFZFEjr1zCsicMWpAA",
+ "token_type": "example",
+ "expires_in": 3600,
+ "expires_at": 4600,
+ "scope": scope,
+ "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",
+ "example_parameter": "example_value"
+ }
+
+ def test_request_body(self):
+ client = LegacyApplicationClient(self.client_id)
+
+ # Basic, no extra arguments
+ body = client.prepare_request_body(self.username, self.password,
+ body=self.body)
+ self.assertFormBodyEqual(body, self.body_up)
+
+ # With extra parameters
+ body = client.prepare_request_body(self.username, self.password,
+ body=self.body, **self.kwargs)
+ self.assertFormBodyEqual(body, self.body_kwargs)
+
+ def test_parse_token_response(self):
+ client = LegacyApplicationClient(self.client_id)
+
+ # Parse code and state
+ response = client.parse_request_body_response(self.token_json, scope=self.scope)
+ self.assertEqual(response, self.token)
+ self.assertEqual(client.access_token, response.get("access_token"))
+ self.assertEqual(client.refresh_token, response.get("refresh_token"))
+ self.assertEqual(client.token_type, response.get("token_type"))
+
+ # Mismatching state
+ self.assertRaises(Warning, client.parse_request_body_response, self.token_json, scope="invalid")
+ os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] = '5'
+ token = client.parse_request_body_response(self.token_json, scope="invalid")
+ self.assertTrue(token.scope_changed)
+
+ scope_changes_recorded = []
+ def record_scope_change(sender, message, old, new):
+ scope_changes_recorded.append((message, old, new))
+
+ signals.scope_changed.connect(record_scope_change)
+ try:
+ client.parse_request_body_response(self.token_json, scope="invalid")
+ self.assertEqual(len(scope_changes_recorded), 1)
+ message, old, new = scope_changes_recorded[0]
+ self.assertEqual(message, 'Scope has changed from "invalid" to "/profile".')
+ self.assertEqual(old, ['invalid'])
+ self.assertEqual(new, ['/profile'])
+ finally:
+ signals.scope_changed.disconnect(record_scope_change)
+ del os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE']
+
+ def test_prepare_request_body(self):
+ """
+ see issue #585
+ https://github.com/oauthlib/oauthlib/issues/585
+ """
+ client = LegacyApplicationClient(self.client_id)
+
+ # scenario 1, default behavior to not include `client_id`
+ r1 = client.prepare_request_body(username=self.username, password=self.password)
+ self.assertIn(r1, ('grant_type=password&username={}&password={}'.format(self.username, self.password),
+ 'grant_type=password&password={}&username={}'.format(self.password, self.username),
+ ))
+
+ # scenario 2, include `client_id` in the body
+ r2 = client.prepare_request_body(username=self.username, password=self.password, include_client_id=True)
+ r2_params = dict(urlparse.parse_qsl(r2, keep_blank_values=True))
+ self.assertEqual(len(r2_params.keys()), 4)
+ self.assertEqual(r2_params['grant_type'], 'password')
+ self.assertEqual(r2_params['username'], self.username)
+ self.assertEqual(r2_params['password'], self.password)
+ self.assertEqual(r2_params['client_id'], self.client_id)
+
+ # scenario 3, include `client_id` + `client_secret` in the body
+ r3 = client.prepare_request_body(username=self.username, password=self.password, include_client_id=True, client_secret=self.client_secret)
+ r3_params = dict(urlparse.parse_qsl(r3, keep_blank_values=True))
+ self.assertEqual(len(r3_params.keys()), 5)
+ self.assertEqual(r3_params['grant_type'], 'password')
+ self.assertEqual(r3_params['username'], self.username)
+ self.assertEqual(r3_params['password'], self.password)
+ self.assertEqual(r3_params['client_id'], self.client_id)
+ self.assertEqual(r3_params['client_secret'], self.client_secret)
+
+ # scenario 4, `client_secret` is an empty string
+ r4 = client.prepare_request_body(username=self.username, password=self.password, include_client_id=True, client_secret='')
+ r4_params = dict(urlparse.parse_qsl(r4, keep_blank_values=True))
+ self.assertEqual(len(r4_params.keys()), 5)
+ self.assertEqual(r4_params['grant_type'], 'password')
+ self.assertEqual(r4_params['username'], self.username)
+ self.assertEqual(r4_params['password'], self.password)
+ self.assertEqual(r4_params['client_id'], self.client_id)
+ self.assertEqual(r4_params['client_secret'], '')
+
+ # scenario 4b`,` client_secret is `None`
+ r4b = client.prepare_request_body(username=self.username, password=self.password, include_client_id=True, client_secret=None)
+ r4b_params = dict(urlparse.parse_qsl(r4b, keep_blank_values=True))
+ self.assertEqual(len(r4b_params.keys()), 4)
+ self.assertEqual(r4b_params['grant_type'], 'password')
+ self.assertEqual(r4b_params['username'], self.username)
+ self.assertEqual(r4b_params['password'], self.password)
+ self.assertEqual(r4b_params['client_id'], self.client_id)
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_mobile_application.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_mobile_application.py
new file mode 100644
index 0000000000..c40950c978
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_mobile_application.py
@@ -0,0 +1,111 @@
+# -*- coding: utf-8 -*-
+import os
+from unittest.mock import patch
+
+from oauthlib import signals
+from oauthlib.oauth2 import MobileApplicationClient
+
+from tests.unittest import TestCase
+
+
+@patch('time.time', new=lambda: 1000)
+class MobileApplicationClientTest(TestCase):
+
+ client_id = "someclientid"
+ uri = "https://example.com/path?query=world"
+ uri_id = uri + "&response_type=token&client_id=" + client_id
+ uri_redirect = uri_id + "&redirect_uri=http%3A%2F%2Fmy.page.com%2Fcallback"
+ redirect_uri = "http://my.page.com/callback"
+ scope = ["/profile"]
+ state = "xyz"
+ uri_scope = uri_id + "&scope=%2Fprofile"
+ uri_state = uri_id + "&state=" + state
+ kwargs = {
+ "some": "providers",
+ "require": "extra arguments"
+ }
+ uri_kwargs = uri_id + "&some=providers&require=extra+arguments"
+
+ code = "zzzzaaaa"
+
+ response_uri = ('https://client.example.com/cb?#'
+ 'access_token=2YotnFZFEjr1zCsicMWpAA&'
+ 'token_type=example&'
+ 'expires_in=3600&'
+ 'scope=%2Fprofile&'
+ 'example_parameter=example_value')
+ token = {
+ "access_token": "2YotnFZFEjr1zCsicMWpAA",
+ "token_type": "example",
+ "expires_in": 3600,
+ "expires_at": 4600,
+ "scope": scope,
+ "example_parameter": "example_value"
+ }
+
+ def test_implicit_token_uri(self):
+ client = MobileApplicationClient(self.client_id)
+
+ # Basic, no extra arguments
+ uri = client.prepare_request_uri(self.uri)
+ self.assertURLEqual(uri, self.uri_id)
+
+ # With redirection uri
+ uri = client.prepare_request_uri(self.uri, redirect_uri=self.redirect_uri)
+ self.assertURLEqual(uri, self.uri_redirect)
+
+ # With scope
+ uri = client.prepare_request_uri(self.uri, scope=self.scope)
+ self.assertURLEqual(uri, self.uri_scope)
+
+ # With state
+ uri = client.prepare_request_uri(self.uri, state=self.state)
+ self.assertURLEqual(uri, self.uri_state)
+
+ # With extra parameters through kwargs
+ uri = client.prepare_request_uri(self.uri, **self.kwargs)
+ self.assertURLEqual(uri, self.uri_kwargs)
+
+ def test_populate_attributes(self):
+
+ client = MobileApplicationClient(self.client_id)
+
+ response_uri = (self.response_uri + "&code=EVIL-CODE")
+
+ client.parse_request_uri_response(response_uri, scope=self.scope)
+
+ # We must not accidentally pick up any further security
+ # credentials at this point.
+ self.assertIsNone(client.code)
+
+ def test_parse_token_response(self):
+ client = MobileApplicationClient(self.client_id)
+
+ # Parse code and state
+ response = client.parse_request_uri_response(self.response_uri, scope=self.scope)
+ self.assertEqual(response, self.token)
+ self.assertEqual(client.access_token, response.get("access_token"))
+ self.assertEqual(client.refresh_token, response.get("refresh_token"))
+ self.assertEqual(client.token_type, response.get("token_type"))
+
+ # Mismatching scope
+ self.assertRaises(Warning, client.parse_request_uri_response, self.response_uri, scope="invalid")
+ os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] = '4'
+ token = client.parse_request_uri_response(self.response_uri, scope='invalid')
+ self.assertTrue(token.scope_changed)
+
+ scope_changes_recorded = []
+ def record_scope_change(sender, message, old, new):
+ scope_changes_recorded.append((message, old, new))
+
+ signals.scope_changed.connect(record_scope_change)
+ try:
+ client.parse_request_uri_response(self.response_uri, scope="invalid")
+ self.assertEqual(len(scope_changes_recorded), 1)
+ message, old, new = scope_changes_recorded[0]
+ self.assertEqual(message, 'Scope has changed from "invalid" to "/profile".')
+ self.assertEqual(old, ['invalid'])
+ self.assertEqual(new, ['/profile'])
+ finally:
+ signals.scope_changed.disconnect(record_scope_change)
+ del os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE']
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_service_application.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_service_application.py
new file mode 100644
index 0000000000..b97d8554ed
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_service_application.py
@@ -0,0 +1,185 @@
+# -*- coding: utf-8 -*-
+import os
+from time import time
+from unittest.mock import patch
+
+import jwt
+
+from oauthlib.common import Request
+from oauthlib.oauth2 import ServiceApplicationClient
+
+from tests.unittest import TestCase
+
+
+class ServiceApplicationClientTest(TestCase):
+
+ gt = ServiceApplicationClient.grant_type
+
+ private_key = """
+-----BEGIN RSA PRIVATE KEY-----
+MIICXgIBAAKBgQDk1/bxyS8Q8jiheHeYYp/4rEKJopeQRRKKpZI4s5i+UPwVpupG
+AlwXWfzXwSMaKPAoKJNdu7tqKRniqst5uoHXw98gj0x7zamu0Ck1LtQ4c7pFMVah
+5IYGhBi2E9ycNS329W27nJPWNCbESTu7snVlG8V8mfvGGg3xNjTMO7IdrwIDAQAB
+AoGBAOQ2KuH8S5+OrsL4K+wfjoCi6MfxCUyqVU9GxocdM1m30WyWRFMEz2nKJ8fR
+p3vTD4w8yplTOhcoXdQZl0kRoaDzrcYkm2VvJtQRrX7dKFT8dR8D/Tr7dNQLOXfC
+DY6xveQczE7qt7Vk7lp4FqmxBsaaEuokt78pOOjywZoInjZhAkEA9wz3zoZNT0/i
+rf6qv2qTIeieUB035N3dyw6f1BGSWYaXSuerDCD/J1qZbAPKKhyHZbVawFt3UMhe
+542UftBaxQJBAO0iJy1I8GQjGnS7B3yvyH3CcLYGy296+XO/2xKp/d/ty1OIeovx
+C60pLNwuFNF3z9d2GVQAdoQ89hUkOtjZLeMCQQD0JO6oPHUeUjYT+T7ImAv7UKVT
+Suy30sKjLzqoGw1kR+wv7C5PeDRvscs4wa4CW9s6mjSrMDkDrmCLuJDtmf55AkEA
+kmaMg2PNrjUR51F0zOEFycaaqXbGcFwe1/xx9zLmHzMDXd4bsnwt9kk+fe0hQzVS
+JzatanQit3+feev1PN3QewJAWv4RZeavEUhKv+kLe95Yd0su7lTLVduVgh4v5yLT
+Ga6FHdjGPcfajt+nrpB1n8UQBEH9ZxniokR/IPvdMlxqXA==
+-----END RSA PRIVATE KEY-----
+"""
+
+ public_key = """
+-----BEGIN PUBLIC KEY-----
+MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDk1/bxyS8Q8jiheHeYYp/4rEKJ
+opeQRRKKpZI4s5i+UPwVpupGAlwXWfzXwSMaKPAoKJNdu7tqKRniqst5uoHXw98g
+j0x7zamu0Ck1LtQ4c7pFMVah5IYGhBi2E9ycNS329W27nJPWNCbESTu7snVlG8V8
+mfvGGg3xNjTMO7IdrwIDAQAB
+-----END PUBLIC KEY-----
+"""
+
+ subject = 'resource-owner@provider.com'
+
+ issuer = 'the-client@provider.com'
+
+ audience = 'https://provider.com/token'
+
+ client_id = "someclientid"
+ scope = ["/profile"]
+ kwargs = {
+ "some": "providers",
+ "require": "extra arguments"
+ }
+
+ body = "isnot=empty"
+
+ body_up = "not=empty&grant_type=%s" % gt
+ body_kwargs = body_up + "&some=providers&require=extra+arguments"
+
+ token_json = ('{ "access_token":"2YotnFZFEjr1zCsicMWpAA",'
+ ' "token_type":"example",'
+ ' "expires_in":3600,'
+ ' "scope":"/profile",'
+ ' "example_parameter":"example_value"}')
+ token = {
+ "access_token": "2YotnFZFEjr1zCsicMWpAA",
+ "token_type": "example",
+ "expires_in": 3600,
+ "scope": ["/profile"],
+ "example_parameter": "example_value"
+ }
+
+ @patch('time.time')
+ def test_request_body(self, t):
+ t.return_value = time()
+ self.token['expires_at'] = self.token['expires_in'] + t.return_value
+
+ client = ServiceApplicationClient(
+ self.client_id, private_key=self.private_key)
+
+ # Basic with min required params
+ body = client.prepare_request_body(issuer=self.issuer,
+ subject=self.subject,
+ audience=self.audience,
+ body=self.body)
+ r = Request('https://a.b', body=body)
+ self.assertEqual(r.isnot, 'empty')
+ self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type)
+
+ claim = jwt.decode(r.assertion, self.public_key, audience=self.audience, algorithms=['RS256'])
+
+ self.assertEqual(claim['iss'], self.issuer)
+ # audience verification is handled during decode now
+ self.assertEqual(claim['sub'], self.subject)
+ self.assertEqual(claim['iat'], int(t.return_value))
+ self.assertNotIn('nbf', claim)
+ self.assertNotIn('jti', claim)
+
+ # Missing issuer parameter
+ self.assertRaises(ValueError, client.prepare_request_body,
+ issuer=None, subject=self.subject, audience=self.audience, body=self.body)
+
+ # Missing subject parameter
+ self.assertRaises(ValueError, client.prepare_request_body,
+ issuer=self.issuer, subject=None, audience=self.audience, body=self.body)
+
+ # Missing audience parameter
+ self.assertRaises(ValueError, client.prepare_request_body,
+ issuer=self.issuer, subject=self.subject, audience=None, body=self.body)
+
+ # Optional kwargs
+ not_before = time() - 3600
+ jwt_id = '8zd15df4s35f43sd'
+ body = client.prepare_request_body(issuer=self.issuer,
+ subject=self.subject,
+ audience=self.audience,
+ body=self.body,
+ not_before=not_before,
+ jwt_id=jwt_id)
+
+ r = Request('https://a.b', body=body)
+ self.assertEqual(r.isnot, 'empty')
+ self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type)
+
+ claim = jwt.decode(r.assertion, self.public_key, audience=self.audience, algorithms=['RS256'])
+
+ self.assertEqual(claim['iss'], self.issuer)
+ # audience verification is handled during decode now
+ self.assertEqual(claim['sub'], self.subject)
+ self.assertEqual(claim['iat'], int(t.return_value))
+ self.assertEqual(claim['nbf'], not_before)
+ self.assertEqual(claim['jti'], jwt_id)
+
+ @patch('time.time')
+ def test_request_body_no_initial_private_key(self, t):
+ t.return_value = time()
+ self.token['expires_at'] = self.token['expires_in'] + t.return_value
+
+ client = ServiceApplicationClient(
+ self.client_id, private_key=None)
+
+ # Basic with private key provided
+ body = client.prepare_request_body(issuer=self.issuer,
+ subject=self.subject,
+ audience=self.audience,
+ body=self.body,
+ private_key=self.private_key)
+ r = Request('https://a.b', body=body)
+ self.assertEqual(r.isnot, 'empty')
+ self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type)
+
+ claim = jwt.decode(r.assertion, self.public_key, audience=self.audience, algorithms=['RS256'])
+
+ self.assertEqual(claim['iss'], self.issuer)
+ # audience verification is handled during decode now
+ self.assertEqual(claim['sub'], self.subject)
+ self.assertEqual(claim['iat'], int(t.return_value))
+
+ # No private key provided
+ self.assertRaises(ValueError, client.prepare_request_body,
+ issuer=self.issuer, subject=self.subject, audience=self.audience, body=self.body)
+
+ @patch('time.time')
+ def test_parse_token_response(self, t):
+ t.return_value = time()
+ self.token['expires_at'] = self.token['expires_in'] + t.return_value
+
+ client = ServiceApplicationClient(self.client_id)
+
+ # Parse code and state
+ response = client.parse_request_body_response(self.token_json, scope=self.scope)
+ self.assertEqual(response, self.token)
+ self.assertEqual(client.access_token, response.get("access_token"))
+ self.assertEqual(client.refresh_token, response.get("refresh_token"))
+ self.assertEqual(client.token_type, response.get("token_type"))
+
+ # Mismatching state
+ self.assertRaises(Warning, client.parse_request_body_response, self.token_json, scope="invalid")
+ os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] = '2'
+ token = client.parse_request_body_response(self.token_json, scope="invalid")
+ self.assertTrue(token.scope_changed)
+ del os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE']
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_web_application.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_web_application.py
new file mode 100644
index 0000000000..7a71121512
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_web_application.py
@@ -0,0 +1,269 @@
+# -*- coding: utf-8 -*-
+import os
+import urllib.parse as urlparse
+import warnings
+from unittest.mock import patch
+
+from oauthlib import common, signals
+from oauthlib.oauth2 import (
+ BackendApplicationClient, Client, LegacyApplicationClient,
+ MobileApplicationClient, WebApplicationClient,
+)
+from oauthlib.oauth2.rfc6749 import errors, utils
+from oauthlib.oauth2.rfc6749.clients import AUTH_HEADER, BODY, URI_QUERY
+
+from tests.unittest import TestCase
+
+
+@patch('time.time', new=lambda: 1000)
+class WebApplicationClientTest(TestCase):
+
+ client_id = "someclientid"
+ client_secret = 'someclientsecret'
+ uri = "https://example.com/path?query=world"
+ uri_id = uri + "&response_type=code&client_id=" + client_id
+ uri_redirect = uri_id + "&redirect_uri=http%3A%2F%2Fmy.page.com%2Fcallback"
+ redirect_uri = "http://my.page.com/callback"
+ code_verifier = "code_verifier"
+ scope = ["/profile"]
+ state = "xyz"
+ code_challenge = "code_challenge"
+ code_challenge_method = "S256"
+ uri_scope = uri_id + "&scope=%2Fprofile"
+ uri_state = uri_id + "&state=" + state
+ uri_code_challenge = uri_id + "&code_challenge=" + code_challenge + "&code_challenge_method=" + code_challenge_method
+ uri_code_challenge_method = uri_id + "&code_challenge=" + code_challenge + "&code_challenge_method=plain"
+ kwargs = {
+ "some": "providers",
+ "require": "extra arguments"
+ }
+ uri_kwargs = uri_id + "&some=providers&require=extra+arguments"
+ uri_authorize_code = uri_redirect + "&scope=%2Fprofile&state=" + state
+
+ code = "zzzzaaaa"
+ body = "not=empty"
+
+ body_code = "not=empty&grant_type=authorization_code&code={}&client_id={}".format(code, client_id)
+ body_redirect = body_code + "&redirect_uri=http%3A%2F%2Fmy.page.com%2Fcallback"
+ body_code_verifier = body_code + "&code_verifier=code_verifier"
+ body_kwargs = body_code + "&some=providers&require=extra+arguments"
+
+ response_uri = "https://client.example.com/cb?code=zzzzaaaa&state=xyz"
+ response = {"code": "zzzzaaaa", "state": "xyz"}
+
+ token_json = ('{ "access_token":"2YotnFZFEjr1zCsicMWpAA",'
+ ' "token_type":"example",'
+ ' "expires_in":3600,'
+ ' "scope":"/profile",'
+ ' "refresh_token":"tGzv3JOkF0XG5Qx2TlKWIA",'
+ ' "example_parameter":"example_value"}')
+ token = {
+ "access_token": "2YotnFZFEjr1zCsicMWpAA",
+ "token_type": "example",
+ "expires_in": 3600,
+ "expires_at": 4600,
+ "scope": scope,
+ "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",
+ "example_parameter": "example_value"
+ }
+
+ def test_auth_grant_uri(self):
+ client = WebApplicationClient(self.client_id)
+
+ # Basic, no extra arguments
+ uri = client.prepare_request_uri(self.uri)
+ self.assertURLEqual(uri, self.uri_id)
+
+ # With redirection uri
+ uri = client.prepare_request_uri(self.uri, redirect_uri=self.redirect_uri)
+ self.assertURLEqual(uri, self.uri_redirect)
+
+ # With scope
+ uri = client.prepare_request_uri(self.uri, scope=self.scope)
+ self.assertURLEqual(uri, self.uri_scope)
+
+ # With state
+ uri = client.prepare_request_uri(self.uri, state=self.state)
+ self.assertURLEqual(uri, self.uri_state)
+
+ # with code_challenge and code_challenge_method
+ uri = client.prepare_request_uri(self.uri, code_challenge=self.code_challenge, code_challenge_method=self.code_challenge_method)
+ self.assertURLEqual(uri, self.uri_code_challenge)
+
+ # with no code_challenge_method
+ uri = client.prepare_request_uri(self.uri, code_challenge=self.code_challenge)
+ self.assertURLEqual(uri, self.uri_code_challenge_method)
+
+ # With extra parameters through kwargs
+ uri = client.prepare_request_uri(self.uri, **self.kwargs)
+ self.assertURLEqual(uri, self.uri_kwargs)
+
+ def test_request_body(self):
+ client = WebApplicationClient(self.client_id, code=self.code)
+
+ # Basic, no extra arguments
+ body = client.prepare_request_body(body=self.body)
+ self.assertFormBodyEqual(body, self.body_code)
+
+ rclient = WebApplicationClient(self.client_id)
+ body = rclient.prepare_request_body(code=self.code, body=self.body)
+ self.assertFormBodyEqual(body, self.body_code)
+
+ # With redirection uri
+ body = client.prepare_request_body(body=self.body, redirect_uri=self.redirect_uri)
+ self.assertFormBodyEqual(body, self.body_redirect)
+
+ # With code verifier
+ body = client.prepare_request_body(body=self.body, code_verifier=self.code_verifier)
+ self.assertFormBodyEqual(body, self.body_code_verifier)
+
+ # With extra parameters
+ body = client.prepare_request_body(body=self.body, **self.kwargs)
+ self.assertFormBodyEqual(body, self.body_kwargs)
+
+ def test_parse_grant_uri_response(self):
+ client = WebApplicationClient(self.client_id)
+
+ # Parse code and state
+ response = client.parse_request_uri_response(self.response_uri, state=self.state)
+ self.assertEqual(response, self.response)
+ self.assertEqual(client.code, self.code)
+
+ # Mismatching state
+ self.assertRaises(errors.MismatchingStateError,
+ client.parse_request_uri_response,
+ self.response_uri,
+ state="invalid")
+
+ def test_populate_attributes(self):
+
+ client = WebApplicationClient(self.client_id)
+
+ response_uri = (self.response_uri +
+ "&access_token=EVIL-TOKEN"
+ "&refresh_token=EVIL-TOKEN"
+ "&mac_key=EVIL-KEY")
+
+ client.parse_request_uri_response(response_uri, self.state)
+
+ self.assertEqual(client.code, self.code)
+
+ # We must not accidentally pick up any further security
+ # credentials at this point.
+ self.assertIsNone(client.access_token)
+ self.assertIsNone(client.refresh_token)
+ self.assertIsNone(client.mac_key)
+
+ def test_parse_token_response(self):
+ client = WebApplicationClient(self.client_id)
+
+ # Parse code and state
+ response = client.parse_request_body_response(self.token_json, scope=self.scope)
+ self.assertEqual(response, self.token)
+ self.assertEqual(client.access_token, response.get("access_token"))
+ self.assertEqual(client.refresh_token, response.get("refresh_token"))
+ self.assertEqual(client.token_type, response.get("token_type"))
+
+ # Mismatching state
+ self.assertRaises(Warning, client.parse_request_body_response, self.token_json, scope="invalid")
+ os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] = '1'
+ token = client.parse_request_body_response(self.token_json, scope="invalid")
+ self.assertTrue(token.scope_changed)
+
+ scope_changes_recorded = []
+ def record_scope_change(sender, message, old, new):
+ scope_changes_recorded.append((message, old, new))
+
+ signals.scope_changed.connect(record_scope_change)
+ try:
+ client.parse_request_body_response(self.token_json, scope="invalid")
+ self.assertEqual(len(scope_changes_recorded), 1)
+ message, old, new = scope_changes_recorded[0]
+ self.assertEqual(message, 'Scope has changed from "invalid" to "/profile".')
+ self.assertEqual(old, ['invalid'])
+ self.assertEqual(new, ['/profile'])
+ finally:
+ signals.scope_changed.disconnect(record_scope_change)
+ del os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE']
+
+ def test_prepare_authorization_requeset(self):
+ client = WebApplicationClient(self.client_id)
+
+ url, header, body = client.prepare_authorization_request(
+ self.uri, redirect_url=self.redirect_uri, state=self.state, scope=self.scope)
+ self.assertURLEqual(url, self.uri_authorize_code)
+ # verify default header and body only
+ self.assertEqual(header, {'Content-Type': 'application/x-www-form-urlencoded'})
+ self.assertEqual(body, '')
+
+ def test_prepare_request_body(self):
+ """
+ see issue #585
+ https://github.com/oauthlib/oauthlib/issues/585
+
+ `prepare_request_body` should support the following scenarios:
+ 1. Include client_id alone in the body (default)
+ 2. Include client_id and client_secret in auth and not include them in the body (RFC preferred solution)
+ 3. Include client_id and client_secret in the body (RFC alternative solution)
+ 4. Include client_id in the body and an empty string for client_secret.
+ """
+ client = WebApplicationClient(self.client_id)
+
+ # scenario 1, default behavior to include `client_id`
+ r1 = client.prepare_request_body()
+ self.assertEqual(r1, 'grant_type=authorization_code&client_id=%s' % self.client_id)
+
+ r1b = client.prepare_request_body(include_client_id=True)
+ self.assertEqual(r1b, 'grant_type=authorization_code&client_id=%s' % self.client_id)
+
+ # scenario 2, do not include `client_id` in the body, so it can be sent in auth.
+ r2 = client.prepare_request_body(include_client_id=False)
+ self.assertEqual(r2, 'grant_type=authorization_code')
+
+ # scenario 3, Include client_id and client_secret in the body (RFC alternative solution)
+ # the order of kwargs being appended is not guaranteed. for brevity, check the 2 permutations instead of sorting
+ r3 = client.prepare_request_body(client_secret=self.client_secret)
+ r3_params = dict(urlparse.parse_qsl(r3, keep_blank_values=True))
+ self.assertEqual(len(r3_params.keys()), 3)
+ self.assertEqual(r3_params['grant_type'], 'authorization_code')
+ self.assertEqual(r3_params['client_id'], self.client_id)
+ self.assertEqual(r3_params['client_secret'], self.client_secret)
+
+ r3b = client.prepare_request_body(include_client_id=True, client_secret=self.client_secret)
+ r3b_params = dict(urlparse.parse_qsl(r3b, keep_blank_values=True))
+ self.assertEqual(len(r3b_params.keys()), 3)
+ self.assertEqual(r3b_params['grant_type'], 'authorization_code')
+ self.assertEqual(r3b_params['client_id'], self.client_id)
+ self.assertEqual(r3b_params['client_secret'], self.client_secret)
+
+ # scenario 4, `client_secret` is an empty string
+ r4 = client.prepare_request_body(include_client_id=True, client_secret='')
+ r4_params = dict(urlparse.parse_qsl(r4, keep_blank_values=True))
+ self.assertEqual(len(r4_params.keys()), 3)
+ self.assertEqual(r4_params['grant_type'], 'authorization_code')
+ self.assertEqual(r4_params['client_id'], self.client_id)
+ self.assertEqual(r4_params['client_secret'], '')
+
+ # scenario 4b, `client_secret` is `None`
+ r4b = client.prepare_request_body(include_client_id=True, client_secret=None)
+ r4b_params = dict(urlparse.parse_qsl(r4b, keep_blank_values=True))
+ self.assertEqual(len(r4b_params.keys()), 2)
+ self.assertEqual(r4b_params['grant_type'], 'authorization_code')
+ self.assertEqual(r4b_params['client_id'], self.client_id)
+
+ # scenario Warnings
+ with warnings.catch_warnings(record=True) as w:
+ warnings.simplefilter("always") # catch all
+
+ # warning1 - raise a DeprecationWarning if a `client_id` is submitted
+ rWarnings1 = client.prepare_request_body(client_id=self.client_id)
+ self.assertEqual(len(w), 1)
+ self.assertIsInstance(w[0].message, DeprecationWarning)
+
+ # testing the exact warning message in Python2&Python3 is a pain
+
+ # scenario Exceptions
+ # exception1 - raise a ValueError if the a different `client_id` is submitted
+ with self.assertRaises(ValueError) as cm:
+ client.prepare_request_body(client_id='different_client_id')
+ # testing the exact exception message in Python2&Python3 is a pain
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/__init__.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/__init__.py
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_base_endpoint.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_base_endpoint.py
new file mode 100644
index 0000000000..b1af6c3306
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_base_endpoint.py
@@ -0,0 +1,75 @@
+# -*- coding: utf-8 -*-
+from oauthlib.oauth2 import (
+ FatalClientError, OAuth2Error, RequestValidator, Server,
+)
+from oauthlib.oauth2.rfc6749 import (
+ BaseEndpoint, catch_errors_and_unavailability,
+)
+
+from tests.unittest import TestCase
+
+
+class BaseEndpointTest(TestCase):
+
+ def test_default_config(self):
+ endpoint = BaseEndpoint()
+ self.assertFalse(endpoint.catch_errors)
+ self.assertTrue(endpoint.available)
+ endpoint.catch_errors = True
+ self.assertTrue(endpoint.catch_errors)
+ endpoint.available = False
+ self.assertFalse(endpoint.available)
+
+ def test_error_catching(self):
+ validator = RequestValidator()
+ server = Server(validator)
+ server.catch_errors = True
+ h, b, s = server.create_token_response(
+ 'https://example.com', body='grant_type=authorization_code&code=abc'
+ )
+ self.assertIn("server_error", b)
+ self.assertEqual(s, 500)
+
+ def test_unavailability(self):
+ validator = RequestValidator()
+ server = Server(validator)
+ server.available = False
+ h, b, s = server.create_authorization_response('https://example.com')
+ self.assertIn("temporarily_unavailable", b)
+ self.assertEqual(s, 503)
+
+ def test_wrapper(self):
+
+ class TestServer(Server):
+
+ @catch_errors_and_unavailability
+ def throw_error(self, uri):
+ raise ValueError()
+
+ @catch_errors_and_unavailability
+ def throw_oauth_error(self, uri):
+ raise OAuth2Error()
+
+ @catch_errors_and_unavailability
+ def throw_fatal_oauth_error(self, uri):
+ raise FatalClientError()
+
+ validator = RequestValidator()
+ server = TestServer(validator)
+
+ server.catch_errors = True
+ h, b, s = server.throw_error('a')
+ self.assertIn("server_error", b)
+ self.assertEqual(s, 500)
+
+ server.available = False
+ h, b, s = server.throw_error('a')
+ self.assertIn("temporarily_unavailable", b)
+ self.assertEqual(s, 503)
+
+ server.available = True
+ self.assertRaises(OAuth2Error, server.throw_oauth_error, 'a')
+ self.assertRaises(FatalClientError, server.throw_fatal_oauth_error, 'a')
+ server.catch_errors = False
+ self.assertRaises(OAuth2Error, server.throw_oauth_error, 'a')
+ self.assertRaises(FatalClientError, server.throw_fatal_oauth_error, 'a')
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_client_authentication.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_client_authentication.py
new file mode 100644
index 0000000000..0659ee0d25
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_client_authentication.py
@@ -0,0 +1,162 @@
+"""Client authentication tests across all endpoints.
+
+Client authentication in OAuth2 serve two purposes, to authenticate
+confidential clients and to ensure public clients are in fact public. The
+latter is achieved with authenticate_client_id and the former with
+authenticate_client.
+
+We make sure authentication is done by requiring a client object to be set
+on the request object with a client_id parameter. The client_id attribute
+prevents this check from being circumvented with a client form parameter.
+"""
+import json
+from unittest import mock
+
+from oauthlib.oauth2 import (
+ BackendApplicationServer, LegacyApplicationServer, MobileApplicationServer,
+ RequestValidator, WebApplicationServer,
+)
+
+from tests.unittest import TestCase
+
+from .test_utils import get_fragment_credentials
+
+
+class ClientAuthenticationTest(TestCase):
+
+ def inspect_client(self, request, refresh_token=False):
+ if not request.client or not request.client.client_id:
+ raise ValueError()
+ return 'abc'
+
+ def setUp(self):
+ self.validator = mock.MagicMock(spec=RequestValidator)
+ self.validator.is_pkce_required.return_value = False
+ self.validator.get_code_challenge.return_value = None
+ self.validator.get_default_redirect_uri.return_value = 'http://i.b./path'
+ self.web = WebApplicationServer(self.validator,
+ token_generator=self.inspect_client)
+ self.mobile = MobileApplicationServer(self.validator,
+ token_generator=self.inspect_client)
+ self.legacy = LegacyApplicationServer(self.validator,
+ token_generator=self.inspect_client)
+ self.backend = BackendApplicationServer(self.validator,
+ token_generator=self.inspect_client)
+ self.token_uri = 'http://example.com/path'
+ self.auth_uri = 'http://example.com/path?client_id=abc&response_type=token'
+ # should be base64 but no added value in this unittest
+ self.basicauth_client_creds = {"Authorization": "john:doe"}
+ self.basicauth_client_id = {"Authorization": "john:"}
+
+ def set_client(self, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def set_client_id(self, client_id, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def basicauth_authenticate_client(self, request):
+ assert "Authorization" in request.headers
+ assert "john:doe" in request.headers["Authorization"]
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def test_client_id_authentication(self):
+ token_uri = 'http://example.com/path'
+
+ # authorization code grant
+ self.validator.authenticate_client.return_value = False
+ self.validator.authenticate_client_id.return_value = False
+ _, body, _ = self.web.create_token_response(token_uri,
+ body='grant_type=authorization_code&code=mock')
+ self.assertEqual(json.loads(body)['error'], 'invalid_client')
+
+ self.validator.authenticate_client_id.return_value = True
+ self.validator.authenticate_client.side_effect = self.set_client
+ _, body, _ = self.web.create_token_response(token_uri,
+ body='grant_type=authorization_code&code=mock')
+ self.assertIn('access_token', json.loads(body))
+
+ # implicit grant
+ auth_uri = 'http://example.com/path?client_id=abc&response_type=token'
+ self.assertRaises(ValueError, self.mobile.create_authorization_response,
+ auth_uri, scopes=['random'])
+
+ self.validator.validate_client_id.side_effect = self.set_client_id
+ h, _, s = self.mobile.create_authorization_response(auth_uri, scopes=['random'])
+ self.assertEqual(302, s)
+ self.assertIn('Location', h)
+ self.assertIn('access_token', get_fragment_credentials(h['Location']))
+
+ def test_basicauth_web(self):
+ self.validator.authenticate_client.side_effect = self.basicauth_authenticate_client
+ _, body, _ = self.web.create_token_response(
+ self.token_uri,
+ body='grant_type=authorization_code&code=mock',
+ headers=self.basicauth_client_creds
+ )
+ self.assertIn('access_token', json.loads(body))
+
+ def test_basicauth_legacy(self):
+ self.validator.authenticate_client.side_effect = self.basicauth_authenticate_client
+ _, body, _ = self.legacy.create_token_response(
+ self.token_uri,
+ body='grant_type=password&username=abc&password=secret',
+ headers=self.basicauth_client_creds
+ )
+ self.assertIn('access_token', json.loads(body))
+
+ def test_basicauth_backend(self):
+ self.validator.authenticate_client.side_effect = self.basicauth_authenticate_client
+ _, body, _ = self.backend.create_token_response(
+ self.token_uri,
+ body='grant_type=client_credentials',
+ headers=self.basicauth_client_creds
+ )
+ self.assertIn('access_token', json.loads(body))
+
+ def test_basicauth_revoke(self):
+ self.validator.authenticate_client.side_effect = self.basicauth_authenticate_client
+
+ # legacy or any other uses the same RevocationEndpoint
+ _, body, status = self.legacy.create_revocation_response(
+ self.token_uri,
+ body='token=foobar',
+ headers=self.basicauth_client_creds
+ )
+ self.assertEqual(status, 200, body)
+
+ def test_basicauth_introspect(self):
+ self.validator.authenticate_client.side_effect = self.basicauth_authenticate_client
+
+ # legacy or any other uses the same IntrospectEndpoint
+ _, body, status = self.legacy.create_introspect_response(
+ self.token_uri,
+ body='token=foobar',
+ headers=self.basicauth_client_creds
+ )
+ self.assertEqual(status, 200, body)
+
+ def test_custom_authentication(self):
+ token_uri = 'http://example.com/path'
+
+ # authorization code grant
+ self.assertRaises(NotImplementedError,
+ self.web.create_token_response, token_uri,
+ body='grant_type=authorization_code&code=mock')
+
+ # password grant
+ self.validator.authenticate_client.return_value = True
+ self.assertRaises(NotImplementedError,
+ self.legacy.create_token_response, token_uri,
+ body='grant_type=password&username=abc&password=secret')
+
+ # client credentials grant
+ self.validator.authenticate_client.return_value = True
+ self.assertRaises(NotImplementedError,
+ self.backend.create_token_response, token_uri,
+ body='grant_type=client_credentials')
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_credentials_preservation.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_credentials_preservation.py
new file mode 100644
index 0000000000..32c770ccb7
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_credentials_preservation.py
@@ -0,0 +1,128 @@
+"""Ensure credentials are preserved through the authorization.
+
+The Authorization Code Grant will need to preserve state as well as redirect
+uri and the Implicit Grant will need to preserve state.
+"""
+import json
+from unittest import mock
+
+from oauthlib.oauth2 import (
+ MobileApplicationServer, RequestValidator, WebApplicationServer,
+)
+from oauthlib.oauth2.rfc6749 import errors
+
+from tests.unittest import TestCase
+
+from .test_utils import get_fragment_credentials, get_query_credentials
+
+
+class PreservationTest(TestCase):
+
+ DEFAULT_REDIRECT_URI = 'http://i.b./path'
+
+ def setUp(self):
+ self.validator = mock.MagicMock(spec=RequestValidator)
+ self.validator.get_default_redirect_uri.return_value = self.DEFAULT_REDIRECT_URI
+ self.validator.get_code_challenge.return_value = None
+ self.validator.authenticate_client.side_effect = self.set_client
+ self.web = WebApplicationServer(self.validator)
+ self.mobile = MobileApplicationServer(self.validator)
+
+ def set_client(self, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def test_state_preservation(self):
+ auth_uri = 'http://example.com/path?state=xyz&client_id=abc&response_type='
+
+ # authorization grant
+ h, _, s = self.web.create_authorization_response(
+ auth_uri + 'code', scopes=['random'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertEqual(get_query_credentials(h['Location'])['state'][0], 'xyz')
+
+ # implicit grant
+ h, _, s = self.mobile.create_authorization_response(
+ auth_uri + 'token', scopes=['random'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertEqual(get_fragment_credentials(h['Location'])['state'][0], 'xyz')
+
+ def test_redirect_uri_preservation(self):
+ auth_uri = 'http://example.com/path?redirect_uri=http%3A%2F%2Fi.b%2Fpath&client_id=abc'
+ redirect_uri = 'http://i.b/path'
+ token_uri = 'http://example.com/path'
+
+ # authorization grant
+ h, _, s = self.web.create_authorization_response(
+ auth_uri + '&response_type=code', scopes=['random'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertTrue(h['Location'].startswith(redirect_uri))
+
+ # confirm_redirect_uri should return false if the redirect uri
+ # was given in the authorization but not in the token request.
+ self.validator.confirm_redirect_uri.return_value = False
+ code = get_query_credentials(h['Location'])['code'][0]
+ _, body, _ = self.web.create_token_response(token_uri,
+ body='grant_type=authorization_code&code=%s' % code)
+ self.assertEqual(json.loads(body)['error'], 'invalid_request')
+
+ # implicit grant
+ h, _, s = self.mobile.create_authorization_response(
+ auth_uri + '&response_type=token', scopes=['random'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertTrue(h['Location'].startswith(redirect_uri))
+
+ def test_invalid_redirect_uri(self):
+ auth_uri = 'http://example.com/path?redirect_uri=http%3A%2F%2Fi.b%2Fpath&client_id=abc'
+ self.validator.validate_redirect_uri.return_value = False
+
+ # authorization grant
+ self.assertRaises(errors.MismatchingRedirectURIError,
+ self.web.create_authorization_response,
+ auth_uri + '&response_type=code', scopes=['random'])
+
+ # implicit grant
+ self.assertRaises(errors.MismatchingRedirectURIError,
+ self.mobile.create_authorization_response,
+ auth_uri + '&response_type=token', scopes=['random'])
+
+ def test_default_uri(self):
+ auth_uri = 'http://example.com/path?state=xyz&client_id=abc'
+
+ self.validator.get_default_redirect_uri.return_value = None
+
+ # authorization grant
+ self.assertRaises(errors.MissingRedirectURIError,
+ self.web.create_authorization_response,
+ auth_uri + '&response_type=code', scopes=['random'])
+
+ # implicit grant
+ self.assertRaises(errors.MissingRedirectURIError,
+ self.mobile.create_authorization_response,
+ auth_uri + '&response_type=token', scopes=['random'])
+
+ def test_default_uri_in_token(self):
+ auth_uri = 'http://example.com/path?state=xyz&client_id=abc'
+ token_uri = 'http://example.com/path'
+
+ # authorization grant
+ h, _, s = self.web.create_authorization_response(
+ auth_uri + '&response_type=code', scopes=['random'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertTrue(h['Location'].startswith(self.DEFAULT_REDIRECT_URI))
+
+ # confirm_redirect_uri should return true if the redirect uri
+ # was not given in the authorization AND not in the token request.
+ self.validator.confirm_redirect_uri.return_value = True
+ code = get_query_credentials(h['Location'])['code'][0]
+ self.validator.validate_code.return_value = True
+ _, body, s = self.web.create_token_response(token_uri,
+ body='grant_type=authorization_code&code=%s' % code)
+ self.assertEqual(s, 200)
+ self.assertEqual(self.validator.confirm_redirect_uri.call_args[0][2], self.DEFAULT_REDIRECT_URI)
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_error_responses.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_error_responses.py
new file mode 100644
index 0000000000..f61595e213
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_error_responses.py
@@ -0,0 +1,491 @@
+"""Ensure the correct error responses are returned for all defined error types.
+"""
+import json
+from unittest import mock
+
+from oauthlib.common import urlencode
+from oauthlib.oauth2 import (
+ BackendApplicationServer, LegacyApplicationServer, MobileApplicationServer,
+ RequestValidator, WebApplicationServer,
+)
+from oauthlib.oauth2.rfc6749 import errors
+
+from tests.unittest import TestCase
+
+
+class ErrorResponseTest(TestCase):
+
+ def set_client(self, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def setUp(self):
+ self.validator = mock.MagicMock(spec=RequestValidator)
+ self.validator.get_default_redirect_uri.return_value = None
+ self.validator.get_code_challenge.return_value = None
+ self.web = WebApplicationServer(self.validator)
+ self.mobile = MobileApplicationServer(self.validator)
+ self.legacy = LegacyApplicationServer(self.validator)
+ self.backend = BackendApplicationServer(self.validator)
+
+ def test_invalid_redirect_uri(self):
+ uri = 'https://example.com/authorize?response_type={0}&client_id=foo&redirect_uri=wrong'
+
+ # Authorization code grant
+ self.assertRaises(errors.InvalidRedirectURIError,
+ self.web.validate_authorization_request, uri.format('code'))
+ self.assertRaises(errors.InvalidRedirectURIError,
+ self.web.create_authorization_response, uri.format('code'), scopes=['foo'])
+
+ # Implicit grant
+ self.assertRaises(errors.InvalidRedirectURIError,
+ self.mobile.validate_authorization_request, uri.format('token'))
+ self.assertRaises(errors.InvalidRedirectURIError,
+ self.mobile.create_authorization_response, uri.format('token'), scopes=['foo'])
+
+ def test_invalid_default_redirect_uri(self):
+ uri = 'https://example.com/authorize?response_type={0}&client_id=foo'
+ self.validator.get_default_redirect_uri.return_value = "wrong"
+
+ # Authorization code grant
+ self.assertRaises(errors.InvalidRedirectURIError,
+ self.web.validate_authorization_request, uri.format('code'))
+ self.assertRaises(errors.InvalidRedirectURIError,
+ self.web.create_authorization_response, uri.format('code'), scopes=['foo'])
+
+ # Implicit grant
+ self.assertRaises(errors.InvalidRedirectURIError,
+ self.mobile.validate_authorization_request, uri.format('token'))
+ self.assertRaises(errors.InvalidRedirectURIError,
+ self.mobile.create_authorization_response, uri.format('token'), scopes=['foo'])
+
+ def test_missing_redirect_uri(self):
+ uri = 'https://example.com/authorize?response_type={0}&client_id=foo'
+
+ # Authorization code grant
+ self.assertRaises(errors.MissingRedirectURIError,
+ self.web.validate_authorization_request, uri.format('code'))
+ self.assertRaises(errors.MissingRedirectURIError,
+ self.web.create_authorization_response, uri.format('code'), scopes=['foo'])
+
+ # Implicit grant
+ self.assertRaises(errors.MissingRedirectURIError,
+ self.mobile.validate_authorization_request, uri.format('token'))
+ self.assertRaises(errors.MissingRedirectURIError,
+ self.mobile.create_authorization_response, uri.format('token'), scopes=['foo'])
+
+ def test_mismatching_redirect_uri(self):
+ uri = 'https://example.com/authorize?response_type={0}&client_id=foo&redirect_uri=https%3A%2F%2Fi.b%2Fback'
+
+ # Authorization code grant
+ self.validator.validate_redirect_uri.return_value = False
+ self.assertRaises(errors.MismatchingRedirectURIError,
+ self.web.validate_authorization_request, uri.format('code'))
+ self.assertRaises(errors.MismatchingRedirectURIError,
+ self.web.create_authorization_response, uri.format('code'), scopes=['foo'])
+
+ # Implicit grant
+ self.assertRaises(errors.MismatchingRedirectURIError,
+ self.mobile.validate_authorization_request, uri.format('token'))
+ self.assertRaises(errors.MismatchingRedirectURIError,
+ self.mobile.create_authorization_response, uri.format('token'), scopes=['foo'])
+
+ def test_missing_client_id(self):
+ uri = 'https://example.com/authorize?response_type={0}&redirect_uri=https%3A%2F%2Fi.b%2Fback'
+
+ # Authorization code grant
+ self.validator.validate_redirect_uri.return_value = False
+ self.assertRaises(errors.MissingClientIdError,
+ self.web.validate_authorization_request, uri.format('code'))
+ self.assertRaises(errors.MissingClientIdError,
+ self.web.create_authorization_response, uri.format('code'), scopes=['foo'])
+
+ # Implicit grant
+ self.assertRaises(errors.MissingClientIdError,
+ self.mobile.validate_authorization_request, uri.format('token'))
+ self.assertRaises(errors.MissingClientIdError,
+ self.mobile.create_authorization_response, uri.format('token'), scopes=['foo'])
+
+ def test_invalid_client_id(self):
+ uri = 'https://example.com/authorize?response_type={0}&client_id=foo&redirect_uri=https%3A%2F%2Fi.b%2Fback'
+
+ # Authorization code grant
+ self.validator.validate_client_id.return_value = False
+ self.assertRaises(errors.InvalidClientIdError,
+ self.web.validate_authorization_request, uri.format('code'))
+ self.assertRaises(errors.InvalidClientIdError,
+ self.web.create_authorization_response, uri.format('code'), scopes=['foo'])
+
+ # Implicit grant
+ self.assertRaises(errors.InvalidClientIdError,
+ self.mobile.validate_authorization_request, uri.format('token'))
+ self.assertRaises(errors.InvalidClientIdError,
+ self.mobile.create_authorization_response, uri.format('token'), scopes=['foo'])
+
+ def test_empty_parameter(self):
+ uri = 'https://example.com/authorize?client_id=foo&redirect_uri=https%3A%2F%2Fi.b%2Fback&response_type=code&'
+
+ # Authorization code grant
+ self.assertRaises(errors.InvalidRequestFatalError,
+ self.web.validate_authorization_request, uri)
+
+ # Implicit grant
+ self.assertRaises(errors.InvalidRequestFatalError,
+ self.mobile.validate_authorization_request, uri)
+
+ def test_invalid_request(self):
+ self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
+ token_uri = 'https://i.b/token'
+
+ invalid_bodies = [
+ # duplicate params
+ 'grant_type=authorization_code&client_id=nope&client_id=nope&code=foo'
+ ]
+ for body in invalid_bodies:
+ _, body, _ = self.web.create_token_response(token_uri,
+ body=body)
+ self.assertEqual('invalid_request', json.loads(body)['error'])
+
+ # Password credentials grant
+ invalid_bodies = [
+ # duplicate params
+ 'grant_type=password&username=foo&username=bar&password=baz'
+ # missing username
+ 'grant_type=password&password=baz'
+ # missing password
+ 'grant_type=password&username=foo'
+ ]
+ self.validator.authenticate_client.side_effect = self.set_client
+ for body in invalid_bodies:
+ _, body, _ = self.legacy.create_token_response(token_uri,
+ body=body)
+ self.assertEqual('invalid_request', json.loads(body)['error'])
+
+ # Client credentials grant
+ invalid_bodies = [
+ # duplicate params
+ 'grant_type=client_credentials&scope=foo&scope=bar'
+ ]
+ for body in invalid_bodies:
+ _, body, _ = self.backend.create_token_response(token_uri,
+ body=body)
+ self.assertEqual('invalid_request', json.loads(body)['error'])
+
+ def test_invalid_request_duplicate_params(self):
+ self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
+ uri = 'https://i.b/auth?client_id=foo&client_id=bar&response_type={0}'
+ description = 'Duplicate client_id parameter.'
+
+ # Authorization code
+ self.assertRaisesRegex(errors.InvalidRequestFatalError,
+ description,
+ self.web.validate_authorization_request,
+ uri.format('code'))
+ self.assertRaisesRegex(errors.InvalidRequestFatalError,
+ description,
+ self.web.create_authorization_response,
+ uri.format('code'), scopes=['foo'])
+
+ # Implicit grant
+ self.assertRaisesRegex(errors.InvalidRequestFatalError,
+ description,
+ self.mobile.validate_authorization_request,
+ uri.format('token'))
+ self.assertRaisesRegex(errors.InvalidRequestFatalError,
+ description,
+ self.mobile.create_authorization_response,
+ uri.format('token'), scopes=['foo'])
+
+ def test_invalid_request_missing_response_type(self):
+
+ self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
+
+ uri = 'https://i.b/auth?client_id=foo'
+
+ # Authorization code
+ self.assertRaises(errors.MissingResponseTypeError,
+ self.web.validate_authorization_request,
+ uri.format('code'))
+ h, _, s = self.web.create_authorization_response(uri, scopes=['foo'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertIn('error=invalid_request', h['Location'])
+
+ # Implicit grant
+ self.assertRaises(errors.MissingResponseTypeError,
+ self.mobile.validate_authorization_request,
+ uri.format('token'))
+ h, _, s = self.mobile.create_authorization_response(uri, scopes=['foo'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertIn('error=invalid_request', h['Location'])
+
+ def test_unauthorized_client(self):
+ self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
+ self.validator.validate_grant_type.return_value = False
+ self.validator.validate_response_type.return_value = False
+ self.validator.authenticate_client.side_effect = self.set_client
+ token_uri = 'https://i.b/token'
+
+ # Authorization code grant
+ self.assertRaises(errors.UnauthorizedClientError,
+ self.web.validate_authorization_request,
+ 'https://i.b/auth?response_type=code&client_id=foo')
+ _, body, _ = self.web.create_token_response(token_uri,
+ body='grant_type=authorization_code&code=foo')
+ self.assertEqual('unauthorized_client', json.loads(body)['error'])
+
+ # Implicit grant
+ self.assertRaises(errors.UnauthorizedClientError,
+ self.mobile.validate_authorization_request,
+ 'https://i.b/auth?response_type=token&client_id=foo')
+
+ # Password credentials grant
+ _, body, _ = self.legacy.create_token_response(token_uri,
+ body='grant_type=password&username=foo&password=bar')
+ self.assertEqual('unauthorized_client', json.loads(body)['error'])
+
+ # Client credentials grant
+ _, body, _ = self.backend.create_token_response(token_uri,
+ body='grant_type=client_credentials')
+ self.assertEqual('unauthorized_client', json.loads(body)['error'])
+
+ def test_access_denied(self):
+ self.validator.authenticate_client.side_effect = self.set_client
+ self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
+ self.validator.confirm_redirect_uri.return_value = False
+ token_uri = 'https://i.b/token'
+ # Authorization code grant
+ _, body, _ = self.web.create_token_response(token_uri,
+ body='grant_type=authorization_code&code=foo')
+ self.assertEqual('invalid_request', json.loads(body)['error'])
+
+ def test_access_denied_no_default_redirecturi(self):
+ self.validator.authenticate_client.side_effect = self.set_client
+ self.validator.get_default_redirect_uri.return_value = None
+ token_uri = 'https://i.b/token'
+ # Authorization code grant
+ _, body, _ = self.web.create_token_response(token_uri,
+ body='grant_type=authorization_code&code=foo')
+ self.assertEqual('invalid_request', json.loads(body)['error'])
+
+ def test_unsupported_response_type(self):
+ self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
+
+ # Authorization code grant
+ self.assertRaises(errors.UnsupportedResponseTypeError,
+ self.web.validate_authorization_request,
+ 'https://i.b/auth?response_type=foo&client_id=foo')
+
+ # Implicit grant
+ self.assertRaises(errors.UnsupportedResponseTypeError,
+ self.mobile.validate_authorization_request,
+ 'https://i.b/auth?response_type=foo&client_id=foo')
+
+ def test_invalid_scope(self):
+ self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
+ self.validator.validate_scopes.return_value = False
+ self.validator.authenticate_client.side_effect = self.set_client
+
+ # Authorization code grant
+ self.assertRaises(errors.InvalidScopeError,
+ self.web.validate_authorization_request,
+ 'https://i.b/auth?response_type=code&client_id=foo')
+
+ # Implicit grant
+ self.assertRaises(errors.InvalidScopeError,
+ self.mobile.validate_authorization_request,
+ 'https://i.b/auth?response_type=token&client_id=foo')
+
+ # Password credentials grant
+ _, body, _ = self.legacy.create_token_response(
+ 'https://i.b/token',
+ body='grant_type=password&username=foo&password=bar')
+ self.assertEqual('invalid_scope', json.loads(body)['error'])
+
+ # Client credentials grant
+ _, body, _ = self.backend.create_token_response(
+ 'https://i.b/token',
+ body='grant_type=client_credentials')
+ self.assertEqual('invalid_scope', json.loads(body)['error'])
+
+ def test_server_error(self):
+ def raise_error(*args, **kwargs):
+ raise ValueError()
+
+ self.validator.validate_client_id.side_effect = raise_error
+ self.validator.authenticate_client.side_effect = raise_error
+ self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
+
+ # Authorization code grant
+ self.web.catch_errors = True
+ _, _, s = self.web.create_authorization_response(
+ 'https://i.b/auth?client_id=foo&response_type=code',
+ scopes=['foo'])
+ self.assertEqual(s, 500)
+ _, _, s = self.web.create_token_response(
+ 'https://i.b/token',
+ body='grant_type=authorization_code&code=foo',
+ scopes=['foo'])
+ self.assertEqual(s, 500)
+
+ # Implicit grant
+ self.mobile.catch_errors = True
+ _, _, s = self.mobile.create_authorization_response(
+ 'https://i.b/auth?client_id=foo&response_type=token',
+ scopes=['foo'])
+ self.assertEqual(s, 500)
+
+ # Password credentials grant
+ self.legacy.catch_errors = True
+ _, _, s = self.legacy.create_token_response(
+ 'https://i.b/token',
+ body='grant_type=password&username=foo&password=foo')
+ self.assertEqual(s, 500)
+
+ # Client credentials grant
+ self.backend.catch_errors = True
+ _, _, s = self.backend.create_token_response(
+ 'https://i.b/token',
+ body='grant_type=client_credentials')
+ self.assertEqual(s, 500)
+
+ def test_temporarily_unavailable(self):
+ # Authorization code grant
+ self.web.available = False
+ _, _, s = self.web.create_authorization_response(
+ 'https://i.b/auth?client_id=foo&response_type=code',
+ scopes=['foo'])
+ self.assertEqual(s, 503)
+ _, _, s = self.web.create_token_response(
+ 'https://i.b/token',
+ body='grant_type=authorization_code&code=foo',
+ scopes=['foo'])
+ self.assertEqual(s, 503)
+
+ # Implicit grant
+ self.mobile.available = False
+ _, _, s = self.mobile.create_authorization_response(
+ 'https://i.b/auth?client_id=foo&response_type=token',
+ scopes=['foo'])
+ self.assertEqual(s, 503)
+
+ # Password credentials grant
+ self.legacy.available = False
+ _, _, s = self.legacy.create_token_response(
+ 'https://i.b/token',
+ body='grant_type=password&username=foo&password=foo')
+ self.assertEqual(s, 503)
+
+ # Client credentials grant
+ self.backend.available = False
+ _, _, s = self.backend.create_token_response(
+ 'https://i.b/token',
+ body='grant_type=client_credentials')
+ self.assertEqual(s, 503)
+
+ def test_invalid_client(self):
+ self.validator.authenticate_client.return_value = False
+ self.validator.authenticate_client_id.return_value = False
+
+ # Authorization code grant
+ _, body, _ = self.web.create_token_response('https://i.b/token',
+ body='grant_type=authorization_code&code=foo')
+ self.assertEqual('invalid_client', json.loads(body)['error'])
+
+ # Password credentials grant
+ _, body, _ = self.legacy.create_token_response('https://i.b/token',
+ body='grant_type=password&username=foo&password=bar')
+ self.assertEqual('invalid_client', json.loads(body)['error'])
+
+ # Client credentials grant
+ _, body, _ = self.legacy.create_token_response('https://i.b/token',
+ body='grant_type=client_credentials')
+ self.assertEqual('invalid_client', json.loads(body)['error'])
+
+ def test_invalid_grant(self):
+ self.validator.authenticate_client.side_effect = self.set_client
+
+ # Authorization code grant
+ self.validator.validate_code.return_value = False
+ _, body, _ = self.web.create_token_response('https://i.b/token',
+ body='grant_type=authorization_code&code=foo')
+ self.assertEqual('invalid_grant', json.loads(body)['error'])
+
+ # Password credentials grant
+ self.validator.validate_user.return_value = False
+ _, body, _ = self.legacy.create_token_response('https://i.b/token',
+ body='grant_type=password&username=foo&password=bar')
+ self.assertEqual('invalid_grant', json.loads(body)['error'])
+
+ def test_unsupported_grant_type(self):
+ self.validator.authenticate_client.side_effect = self.set_client
+
+ # Authorization code grant
+ _, body, _ = self.web.create_token_response('https://i.b/token',
+ body='grant_type=bar&code=foo')
+ self.assertEqual('unsupported_grant_type', json.loads(body)['error'])
+
+ # Password credentials grant
+ _, body, _ = self.legacy.create_token_response('https://i.b/token',
+ body='grant_type=bar&username=foo&password=bar')
+ self.assertEqual('unsupported_grant_type', json.loads(body)['error'])
+
+ # Client credentials grant
+ _, body, _ = self.backend.create_token_response('https://i.b/token',
+ body='grant_type=bar')
+ self.assertEqual('unsupported_grant_type', json.loads(body)['error'])
+
+ def test_invalid_request_method(self):
+ test_methods = ['GET', 'pUt', 'dEleTe', 'paTcH']
+ test_methods = test_methods + [x.lower() for x in test_methods] + [x.upper() for x in test_methods]
+ for method in test_methods:
+ self.validator.authenticate_client.side_effect = self.set_client
+
+ uri = "http://i/b/token/"
+ try:
+ _, body, s = self.web.create_token_response(uri,
+ body='grant_type=access_token&code=123', http_method=method)
+ self.fail('This should have failed with InvalidRequestError')
+ except errors.InvalidRequestError as ire:
+ self.assertIn('Unsupported request method', ire.description)
+
+ try:
+ _, body, s = self.legacy.create_token_response(uri,
+ body='grant_type=access_token&code=123', http_method=method)
+ self.fail('This should have failed with InvalidRequestError')
+ except errors.InvalidRequestError as ire:
+ self.assertIn('Unsupported request method', ire.description)
+
+ try:
+ _, body, s = self.backend.create_token_response(uri,
+ body='grant_type=access_token&code=123', http_method=method)
+ self.fail('This should have failed with InvalidRequestError')
+ except errors.InvalidRequestError as ire:
+ self.assertIn('Unsupported request method', ire.description)
+
+ def test_invalid_post_request(self):
+ self.validator.authenticate_client.side_effect = self.set_client
+ for param in ['token', 'secret', 'code', 'foo']:
+ uri = 'https://i/b/token?' + urlencode([(param, 'secret')])
+ try:
+ _, body, s = self.web.create_token_response(uri,
+ body='grant_type=access_token&code=123')
+ self.fail('This should have failed with InvalidRequestError')
+ except errors.InvalidRequestError as ire:
+ self.assertIn('URL query parameters are not allowed', ire.description)
+
+ try:
+ _, body, s = self.legacy.create_token_response(uri,
+ body='grant_type=access_token&code=123')
+ self.fail('This should have failed with InvalidRequestError')
+ except errors.InvalidRequestError as ire:
+ self.assertIn('URL query parameters are not allowed', ire.description)
+
+ try:
+ _, body, s = self.backend.create_token_response(uri,
+ body='grant_type=access_token&code=123')
+ self.fail('This should have failed with InvalidRequestError')
+ except errors.InvalidRequestError as ire:
+ self.assertIn('URL query parameters are not allowed', ire.description)
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_extra_credentials.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_extra_credentials.py
new file mode 100644
index 0000000000..97aaf86dff
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_extra_credentials.py
@@ -0,0 +1,69 @@
+"""Ensure extra credentials can be supplied for inclusion in tokens.
+"""
+from unittest import mock
+
+from oauthlib.oauth2 import (
+ BackendApplicationServer, LegacyApplicationServer, MobileApplicationServer,
+ RequestValidator, WebApplicationServer,
+)
+
+from tests.unittest import TestCase
+
+
+class ExtraCredentialsTest(TestCase):
+
+ def set_client(self, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def setUp(self):
+ self.validator = mock.MagicMock(spec=RequestValidator)
+ self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
+ self.web = WebApplicationServer(self.validator)
+ self.mobile = MobileApplicationServer(self.validator)
+ self.legacy = LegacyApplicationServer(self.validator)
+ self.backend = BackendApplicationServer(self.validator)
+
+ def test_post_authorization_request(self):
+ def save_code(client_id, token, request):
+ self.assertEqual('creds', request.extra)
+
+ def save_token(token, request):
+ self.assertEqual('creds', request.extra)
+
+ # Authorization code grant
+ self.validator.save_authorization_code.side_effect = save_code
+ self.web.create_authorization_response(
+ 'https://i.b/auth?client_id=foo&response_type=code',
+ scopes=['foo'],
+ credentials={'extra': 'creds'})
+
+ # Implicit grant
+ self.validator.save_bearer_token.side_effect = save_token
+ self.mobile.create_authorization_response(
+ 'https://i.b/auth?client_id=foo&response_type=token',
+ scopes=['foo'],
+ credentials={'extra': 'creds'})
+
+ def test_token_request(self):
+ def save_token(token, request):
+ self.assertIn('extra', token)
+
+ self.validator.save_bearer_token.side_effect = save_token
+ self.validator.authenticate_client.side_effect = self.set_client
+
+ # Authorization code grant
+ self.web.create_token_response('https://i.b/token',
+ body='grant_type=authorization_code&code=foo',
+ credentials={'extra': 'creds'})
+
+ # Password credentials grant
+ self.legacy.create_token_response('https://i.b/token',
+ body='grant_type=password&username=foo&password=bar',
+ credentials={'extra': 'creds'})
+
+ # Client credentials grant
+ self.backend.create_token_response('https://i.b/token',
+ body='grant_type=client_credentials',
+ credentials={'extra': 'creds'})
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_introspect_endpoint.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_introspect_endpoint.py
new file mode 100644
index 0000000000..6d3d119a3b
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_introspect_endpoint.py
@@ -0,0 +1,168 @@
+# -*- coding: utf-8 -*-
+from json import loads
+from unittest.mock import MagicMock
+
+from oauthlib.common import urlencode
+from oauthlib.oauth2 import IntrospectEndpoint, RequestValidator
+
+from tests.unittest import TestCase
+
+
+class IntrospectEndpointTest(TestCase):
+
+ def setUp(self):
+ self.validator = MagicMock(wraps=RequestValidator())
+ self.validator.client_authentication_required.return_value = True
+ self.validator.authenticate_client.return_value = True
+ self.validator.validate_bearer_token.return_value = True
+ self.validator.introspect_token.return_value = {}
+ self.endpoint = IntrospectEndpoint(self.validator)
+
+ self.uri = 'should_not_matter'
+ self.headers = {
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ }
+ self.resp_h = {
+ 'Cache-Control': 'no-store',
+ 'Content-Type': 'application/json',
+ 'Pragma': 'no-cache'
+ }
+ self.resp_b = {
+ "active": True
+ }
+
+ def test_introspect_token(self):
+ for token_type in ('access_token', 'refresh_token', 'invalid'):
+ body = urlencode([('token', 'foo'),
+ ('token_type_hint', token_type)])
+ h, b, s = self.endpoint.create_introspect_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, self.resp_h)
+ self.assertEqual(loads(b), self.resp_b)
+ self.assertEqual(s, 200)
+
+ def test_introspect_token_nohint(self):
+ # don't specify token_type_hint
+ body = urlencode([('token', 'foo')])
+ h, b, s = self.endpoint.create_introspect_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, self.resp_h)
+ self.assertEqual(loads(b), self.resp_b)
+ self.assertEqual(s, 200)
+
+ def test_introspect_token_false(self):
+ self.validator.introspect_token.return_value = None
+ body = urlencode([('token', 'foo')])
+ h, b, s = self.endpoint.create_introspect_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, self.resp_h)
+ self.assertEqual(loads(b), {"active": False})
+ self.assertEqual(s, 200)
+
+ def test_introspect_token_claims(self):
+ self.validator.introspect_token.return_value = {"foo": "bar"}
+ body = urlencode([('token', 'foo')])
+ h, b, s = self.endpoint.create_introspect_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, self.resp_h)
+ self.assertEqual(loads(b), {"active": True, "foo": "bar"})
+ self.assertEqual(s, 200)
+
+ def test_introspect_token_claims_spoof_active(self):
+ self.validator.introspect_token.return_value = {"foo": "bar", "active": False}
+ body = urlencode([('token', 'foo')])
+ h, b, s = self.endpoint.create_introspect_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, self.resp_h)
+ self.assertEqual(loads(b), {"active": True, "foo": "bar"})
+ self.assertEqual(s, 200)
+
+ def test_introspect_token_client_authentication_failed(self):
+ self.validator.authenticate_client.return_value = False
+ body = urlencode([('token', 'foo'),
+ ('token_type_hint', 'access_token')])
+ h, b, s = self.endpoint.create_introspect_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, {
+ 'Content-Type': 'application/json',
+ 'Cache-Control': 'no-store',
+ 'Pragma': 'no-cache',
+ "WWW-Authenticate": 'Bearer error="invalid_client"'
+ })
+ self.assertEqual(loads(b)['error'], 'invalid_client')
+ self.assertEqual(s, 401)
+
+ def test_introspect_token_public_client_authentication(self):
+ self.validator.client_authentication_required.return_value = False
+ self.validator.authenticate_client_id.return_value = True
+ for token_type in ('access_token', 'refresh_token', 'invalid'):
+ body = urlencode([('token', 'foo'),
+ ('token_type_hint', token_type)])
+ h, b, s = self.endpoint.create_introspect_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, self.resp_h)
+ self.assertEqual(loads(b), self.resp_b)
+ self.assertEqual(s, 200)
+
+ def test_introspect_token_public_client_authentication_failed(self):
+ self.validator.client_authentication_required.return_value = False
+ self.validator.authenticate_client_id.return_value = False
+ body = urlencode([('token', 'foo'),
+ ('token_type_hint', 'access_token')])
+ h, b, s = self.endpoint.create_introspect_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, {
+ 'Content-Type': 'application/json',
+ 'Cache-Control': 'no-store',
+ 'Pragma': 'no-cache',
+ "WWW-Authenticate": 'Bearer error="invalid_client"'
+ })
+ self.assertEqual(loads(b)['error'], 'invalid_client')
+ self.assertEqual(s, 401)
+
+ def test_introspect_unsupported_token(self):
+ endpoint = IntrospectEndpoint(self.validator,
+ supported_token_types=['access_token'])
+ body = urlencode([('token', 'foo'),
+ ('token_type_hint', 'refresh_token')])
+ h, b, s = endpoint.create_introspect_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, self.resp_h)
+ self.assertEqual(loads(b)['error'], 'unsupported_token_type')
+ self.assertEqual(s, 400)
+
+ h, b, s = endpoint.create_introspect_response(self.uri,
+ headers=self.headers, body='')
+ self.assertEqual(h, self.resp_h)
+ self.assertEqual(loads(b)['error'], 'invalid_request')
+ self.assertEqual(s, 400)
+
+ def test_introspect_invalid_request_method(self):
+ endpoint = IntrospectEndpoint(self.validator,
+ supported_token_types=['access_token'])
+ test_methods = ['GET', 'pUt', 'dEleTe', 'paTcH']
+ test_methods = test_methods + [x.lower() for x in test_methods] + [x.upper() for x in test_methods]
+ for method in test_methods:
+ body = urlencode([('token', 'foo'),
+ ('token_type_hint', 'refresh_token')])
+ h, b, s = endpoint.create_introspect_response(self.uri,
+ http_method = method, headers=self.headers, body=body)
+ self.assertEqual(h, self.resp_h)
+ self.assertEqual(loads(b)['error'], 'invalid_request')
+ self.assertIn('Unsupported request method', loads(b)['error_description'])
+ self.assertEqual(s, 400)
+
+ def test_introspect_bad_post_request(self):
+ endpoint = IntrospectEndpoint(self.validator,
+ supported_token_types=['access_token'])
+ for param in ['token', 'secret', 'code', 'foo']:
+ uri = 'http://some.endpoint?' + urlencode([(param, 'secret')])
+ body = urlencode([('token', 'foo'),
+ ('token_type_hint', 'access_token')])
+ h, b, s = endpoint.create_introspect_response(
+ uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, self.resp_h)
+ self.assertEqual(loads(b)['error'], 'invalid_request')
+ self.assertIn('query parameters are not allowed', loads(b)['error_description'])
+ self.assertEqual(s, 400)
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_metadata.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_metadata.py
new file mode 100644
index 0000000000..1f5b912100
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_metadata.py
@@ -0,0 +1,148 @@
+# -*- coding: utf-8 -*-
+import json
+
+from oauthlib.oauth2 import MetadataEndpoint, Server, TokenEndpoint
+
+from tests.unittest import TestCase
+
+
+class MetadataEndpointTest(TestCase):
+ def setUp(self):
+ self.metadata = {
+ "issuer": 'https://foo.bar'
+ }
+
+ def test_openid_oauth2_preconfigured(self):
+ default_claims = {
+ "issuer": 'https://foo.bar',
+ "authorization_endpoint": "https://foo.bar/authorize",
+ "revocation_endpoint": "https://foo.bar/revoke",
+ "introspection_endpoint": "https://foo.bar/introspect",
+ "token_endpoint": "https://foo.bar/token"
+ }
+ from oauthlib.oauth2 import Server as OAuth2Server
+ from oauthlib.openid import Server as OpenIDServer
+
+ endpoint = OAuth2Server(None)
+ metadata = MetadataEndpoint([endpoint], default_claims)
+ oauth2_claims = metadata.claims
+
+ endpoint = OpenIDServer(None)
+ metadata = MetadataEndpoint([endpoint], default_claims)
+ openid_claims = metadata.claims
+
+ # Pure OAuth2 Authorization Metadata are similar with OpenID but
+ # response_type not! (OIDC contains "id_token" and hybrid flows)
+ del oauth2_claims['response_types_supported']
+ del openid_claims['response_types_supported']
+
+ self.maxDiff = None
+ self.assertEqual(openid_claims, oauth2_claims)
+
+ def test_create_metadata_response(self):
+ endpoint = TokenEndpoint(None, None, grant_types={"password": None})
+ metadata = MetadataEndpoint([endpoint], {
+ "issuer": 'https://foo.bar',
+ "token_endpoint": "https://foo.bar/token"
+ })
+ headers, body, status = metadata.create_metadata_response('/', 'GET')
+ assert headers == {
+ 'Content-Type': 'application/json',
+ 'Access-Control-Allow-Origin': '*',
+ }
+ claims = json.loads(body)
+ assert claims['issuer'] == 'https://foo.bar'
+
+ def test_token_endpoint(self):
+ endpoint = TokenEndpoint(None, None, grant_types={"password": None})
+ metadata = MetadataEndpoint([endpoint], {
+ "issuer": 'https://foo.bar',
+ "token_endpoint": "https://foo.bar/token"
+ })
+ self.assertIn("grant_types_supported", metadata.claims)
+ self.assertEqual(metadata.claims["grant_types_supported"], ["password"])
+
+ def test_token_endpoint_overridden(self):
+ endpoint = TokenEndpoint(None, None, grant_types={"password": None})
+ metadata = MetadataEndpoint([endpoint], {
+ "issuer": 'https://foo.bar',
+ "token_endpoint": "https://foo.bar/token",
+ "grant_types_supported": ["pass_word_special_provider"]
+ })
+ self.assertIn("grant_types_supported", metadata.claims)
+ self.assertEqual(metadata.claims["grant_types_supported"], ["pass_word_special_provider"])
+
+ def test_mandatory_fields(self):
+ metadata = MetadataEndpoint([], self.metadata)
+ self.assertIn("issuer", metadata.claims)
+ self.assertEqual(metadata.claims["issuer"], 'https://foo.bar')
+
+ def test_server_metadata(self):
+ endpoint = Server(None)
+ metadata = MetadataEndpoint([endpoint], {
+ "issuer": 'https://foo.bar',
+ "authorization_endpoint": "https://foo.bar/authorize",
+ "introspection_endpoint": "https://foo.bar/introspect",
+ "revocation_endpoint": "https://foo.bar/revoke",
+ "token_endpoint": "https://foo.bar/token",
+ "jwks_uri": "https://foo.bar/certs",
+ "scopes_supported": ["email", "profile"]
+ })
+ expected_claims = {
+ "issuer": "https://foo.bar",
+ "authorization_endpoint": "https://foo.bar/authorize",
+ "introspection_endpoint": "https://foo.bar/introspect",
+ "revocation_endpoint": "https://foo.bar/revoke",
+ "token_endpoint": "https://foo.bar/token",
+ "jwks_uri": "https://foo.bar/certs",
+ "scopes_supported": ["email", "profile"],
+ "grant_types_supported": [
+ "authorization_code",
+ "password",
+ "client_credentials",
+ "refresh_token",
+ "implicit"
+ ],
+ "token_endpoint_auth_methods_supported": [
+ "client_secret_post",
+ "client_secret_basic"
+ ],
+ "response_types_supported": [
+ "code",
+ "token"
+ ],
+ "response_modes_supported": [
+ "query",
+ "fragment"
+ ],
+ "code_challenge_methods_supported": [
+ "plain",
+ "S256"
+ ],
+ "revocation_endpoint_auth_methods_supported": [
+ "client_secret_post",
+ "client_secret_basic"
+ ],
+ "introspection_endpoint_auth_methods_supported": [
+ "client_secret_post",
+ "client_secret_basic"
+ ]
+ }
+
+ def sort_list(claims):
+ for k in claims.keys():
+ claims[k] = sorted(claims[k])
+
+ sort_list(metadata.claims)
+ sort_list(expected_claims)
+ self.assertEqual(sorted(metadata.claims.items()), sorted(expected_claims.items()))
+
+ def test_metadata_validate_issuer(self):
+ with self.assertRaises(ValueError):
+ endpoint = TokenEndpoint(
+ None, None, grant_types={"password": None},
+ )
+ metadata = MetadataEndpoint([endpoint], {
+ "issuer": 'http://foo.bar',
+ "token_endpoint": "https://foo.bar/token",
+ })
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_resource_owner_association.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_resource_owner_association.py
new file mode 100644
index 0000000000..04533888e9
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_resource_owner_association.py
@@ -0,0 +1,108 @@
+"""Ensure all tokens are associated with a resource owner.
+"""
+import json
+from unittest import mock
+
+from oauthlib.oauth2 import (
+ BackendApplicationServer, LegacyApplicationServer, MobileApplicationServer,
+ RequestValidator, WebApplicationServer,
+)
+
+from tests.unittest import TestCase
+
+from .test_utils import get_fragment_credentials, get_query_credentials
+
+
+class ResourceOwnerAssociationTest(TestCase):
+
+ auth_uri = 'http://example.com/path?client_id=abc'
+ token_uri = 'http://example.com/path'
+
+ def set_client(self, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def set_user(self, client_id, code, client, request):
+ request.user = 'test'
+ return True
+
+ def set_user_from_username(self, username, password, client, request):
+ request.user = 'test'
+ return True
+
+ def set_user_from_credentials(self, request):
+ request.user = 'test'
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def inspect_client(self, request, refresh_token=False):
+ if not request.user:
+ raise ValueError()
+ return 'abc'
+
+ def setUp(self):
+ self.validator = mock.MagicMock(spec=RequestValidator)
+ self.validator.get_default_redirect_uri.return_value = 'http://i.b./path'
+ self.validator.get_code_challenge.return_value = None
+ self.validator.authenticate_client.side_effect = self.set_client
+ self.web = WebApplicationServer(self.validator,
+ token_generator=self.inspect_client)
+ self.mobile = MobileApplicationServer(self.validator,
+ token_generator=self.inspect_client)
+ self.legacy = LegacyApplicationServer(self.validator,
+ token_generator=self.inspect_client)
+ self.backend = BackendApplicationServer(self.validator,
+ token_generator=self.inspect_client)
+
+ def test_web_application(self):
+ # TODO: code generator + intercept test
+ h, _, s = self.web.create_authorization_response(
+ self.auth_uri + '&response_type=code',
+ credentials={'user': 'test'}, scopes=['random'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ code = get_query_credentials(h['Location'])['code'][0]
+ self.assertRaises(ValueError,
+ self.web.create_token_response, self.token_uri,
+ body='grant_type=authorization_code&code=%s' % code)
+
+ self.validator.validate_code.side_effect = self.set_user
+ _, body, _ = self.web.create_token_response(self.token_uri,
+ body='grant_type=authorization_code&code=%s' % code)
+ self.assertEqual(json.loads(body)['access_token'], 'abc')
+
+ def test_mobile_application(self):
+ self.assertRaises(ValueError,
+ self.mobile.create_authorization_response,
+ self.auth_uri + '&response_type=token')
+
+ h, _, s = self.mobile.create_authorization_response(
+ self.auth_uri + '&response_type=token',
+ credentials={'user': 'test'}, scopes=['random'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertEqual(get_fragment_credentials(h['Location'])['access_token'][0], 'abc')
+
+ def test_legacy_application(self):
+ body = 'grant_type=password&username=abc&password=secret'
+ self.assertRaises(ValueError,
+ self.legacy.create_token_response,
+ self.token_uri, body=body)
+
+ self.validator.validate_user.side_effect = self.set_user_from_username
+ _, body, _ = self.legacy.create_token_response(
+ self.token_uri, body=body)
+ self.assertEqual(json.loads(body)['access_token'], 'abc')
+
+ def test_backend_application(self):
+ body = 'grant_type=client_credentials'
+ self.assertRaises(ValueError,
+ self.backend.create_token_response,
+ self.token_uri, body=body)
+
+ self.validator.authenticate_client.side_effect = self.set_user_from_credentials
+ _, body, _ = self.backend.create_token_response(
+ self.token_uri, body=body)
+ self.assertEqual(json.loads(body)['access_token'], 'abc')
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_revocation_endpoint.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_revocation_endpoint.py
new file mode 100644
index 0000000000..338dbd91fa
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_revocation_endpoint.py
@@ -0,0 +1,148 @@
+# -*- coding: utf-8 -*-
+from json import loads
+from unittest.mock import MagicMock
+
+from oauthlib.common import urlencode
+from oauthlib.oauth2 import RequestValidator, RevocationEndpoint
+
+from tests.unittest import TestCase
+
+
+class RevocationEndpointTest(TestCase):
+
+ def setUp(self):
+ self.validator = MagicMock(wraps=RequestValidator())
+ self.validator.client_authentication_required.return_value = True
+ self.validator.authenticate_client.return_value = True
+ self.validator.revoke_token.return_value = True
+ self.endpoint = RevocationEndpoint(self.validator)
+
+ self.uri = 'https://example.com/revoke_token'
+ self.headers = {
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ }
+ self.resp_h = {
+ 'Cache-Control': 'no-store',
+ 'Content-Type': 'application/json',
+ 'Pragma': 'no-cache'
+ }
+
+ def test_revoke_token(self):
+ for token_type in ('access_token', 'refresh_token', 'invalid'):
+ body = urlencode([('token', 'foo'),
+ ('token_type_hint', token_type)])
+ h, b, s = self.endpoint.create_revocation_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, {})
+ self.assertEqual(b, '')
+ self.assertEqual(s, 200)
+
+ # don't specify token_type_hint
+ body = urlencode([('token', 'foo')])
+ h, b, s = self.endpoint.create_revocation_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, {})
+ self.assertEqual(b, '')
+ self.assertEqual(s, 200)
+
+ def test_revoke_token_client_authentication_failed(self):
+ self.validator.authenticate_client.return_value = False
+ body = urlencode([('token', 'foo'),
+ ('token_type_hint', 'access_token')])
+ h, b, s = self.endpoint.create_revocation_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, {
+ 'Content-Type': 'application/json',
+ 'Cache-Control': 'no-store',
+ 'Pragma': 'no-cache',
+ "WWW-Authenticate": 'Bearer error="invalid_client"'
+ })
+ self.assertEqual(loads(b)['error'], 'invalid_client')
+ self.assertEqual(s, 401)
+
+ def test_revoke_token_public_client_authentication(self):
+ self.validator.client_authentication_required.return_value = False
+ self.validator.authenticate_client_id.return_value = True
+ for token_type in ('access_token', 'refresh_token', 'invalid'):
+ body = urlencode([('token', 'foo'),
+ ('token_type_hint', token_type)])
+ h, b, s = self.endpoint.create_revocation_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, {})
+ self.assertEqual(b, '')
+ self.assertEqual(s, 200)
+
+ def test_revoke_token_public_client_authentication_failed(self):
+ self.validator.client_authentication_required.return_value = False
+ self.validator.authenticate_client_id.return_value = False
+ body = urlencode([('token', 'foo'),
+ ('token_type_hint', 'access_token')])
+ h, b, s = self.endpoint.create_revocation_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, {
+ 'Content-Type': 'application/json',
+ 'Cache-Control': 'no-store',
+ 'Pragma': 'no-cache',
+ "WWW-Authenticate": 'Bearer error="invalid_client"'
+ })
+ self.assertEqual(loads(b)['error'], 'invalid_client')
+ self.assertEqual(s, 401)
+
+ def test_revoke_with_callback(self):
+ endpoint = RevocationEndpoint(self.validator, enable_jsonp=True)
+ callback = 'package.hello_world'
+ for token_type in ('access_token', 'refresh_token', 'invalid'):
+ body = urlencode([('token', 'foo'),
+ ('token_type_hint', token_type),
+ ('callback', callback)])
+ h, b, s = endpoint.create_revocation_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, {})
+ self.assertEqual(b, callback + '();')
+ self.assertEqual(s, 200)
+
+ def test_revoke_unsupported_token(self):
+ endpoint = RevocationEndpoint(self.validator,
+ supported_token_types=['access_token'])
+ body = urlencode([('token', 'foo'),
+ ('token_type_hint', 'refresh_token')])
+ h, b, s = endpoint.create_revocation_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, self.resp_h)
+ self.assertEqual(loads(b)['error'], 'unsupported_token_type')
+ self.assertEqual(s, 400)
+
+ h, b, s = endpoint.create_revocation_response(self.uri,
+ headers=self.headers, body='')
+ self.assertEqual(h, self.resp_h)
+ self.assertEqual(loads(b)['error'], 'invalid_request')
+ self.assertEqual(s, 400)
+
+ def test_revoke_invalid_request_method(self):
+ endpoint = RevocationEndpoint(self.validator,
+ supported_token_types=['access_token'])
+ test_methods = ['GET', 'pUt', 'dEleTe', 'paTcH']
+ test_methods = test_methods + [x.lower() for x in test_methods] + [x.upper() for x in test_methods]
+ for method in test_methods:
+ body = urlencode([('token', 'foo'),
+ ('token_type_hint', 'refresh_token')])
+ h, b, s = endpoint.create_revocation_response(self.uri,
+ http_method = method, headers=self.headers, body=body)
+ self.assertEqual(h, self.resp_h)
+ self.assertEqual(loads(b)['error'], 'invalid_request')
+ self.assertIn('Unsupported request method', loads(b)['error_description'])
+ self.assertEqual(s, 400)
+
+ def test_revoke_bad_post_request(self):
+ endpoint = RevocationEndpoint(self.validator,
+ supported_token_types=['access_token'])
+ for param in ['token', 'secret', 'code', 'foo']:
+ uri = 'http://some.endpoint?' + urlencode([(param, 'secret')])
+ body = urlencode([('token', 'foo'),
+ ('token_type_hint', 'access_token')])
+ h, b, s = endpoint.create_revocation_response(uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, self.resp_h)
+ self.assertEqual(loads(b)['error'], 'invalid_request')
+ self.assertIn('query parameters are not allowed', loads(b)['error_description'])
+ self.assertEqual(s, 400)
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_scope_handling.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_scope_handling.py
new file mode 100644
index 0000000000..4c87d9c7c8
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_scope_handling.py
@@ -0,0 +1,193 @@
+"""Ensure scope is preserved across authorization.
+
+Fairly trivial in all grants except the Authorization Code Grant where scope
+need to be persisted temporarily in an authorization code.
+"""
+import json
+from unittest import mock
+
+from oauthlib.oauth2 import (
+ BackendApplicationServer, LegacyApplicationServer, MobileApplicationServer,
+ RequestValidator, Server, WebApplicationServer,
+)
+
+from tests.unittest import TestCase
+
+from .test_utils import get_fragment_credentials, get_query_credentials
+
+
+class TestScopeHandling(TestCase):
+
+ DEFAULT_REDIRECT_URI = 'http://i.b./path'
+
+ def set_scopes(self, scopes):
+ def set_request_scopes(client_id, code, client, request):
+ request.scopes = scopes
+ return True
+ return set_request_scopes
+
+ def set_user(self, request):
+ request.user = 'foo'
+ request.client_id = 'bar'
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def set_client(self, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def setUp(self):
+ self.validator = mock.MagicMock(spec=RequestValidator)
+ self.validator.get_default_redirect_uri.return_value = TestScopeHandling.DEFAULT_REDIRECT_URI
+ self.validator.get_code_challenge.return_value = None
+ self.validator.authenticate_client.side_effect = self.set_client
+ self.server = Server(self.validator)
+ self.web = WebApplicationServer(self.validator)
+ self.mobile = MobileApplicationServer(self.validator)
+ self.legacy = LegacyApplicationServer(self.validator)
+ self.backend = BackendApplicationServer(self.validator)
+
+ def test_scope_extraction(self):
+ scopes = (
+ ('images', ['images']),
+ ('images+videos', ['images', 'videos']),
+ ('images+videos+openid', ['images', 'videos', 'openid']),
+ ('http%3A%2f%2fa.b%2fvideos', ['http://a.b/videos']),
+ ('http%3A%2f%2fa.b%2fvideos+pics', ['http://a.b/videos', 'pics']),
+ ('pics+http%3A%2f%2fa.b%2fvideos', ['pics', 'http://a.b/videos']),
+ ('http%3A%2f%2fa.b%2fvideos+https%3A%2f%2fc.d%2Fsecret', ['http://a.b/videos', 'https://c.d/secret']),
+ )
+
+ uri = 'http://example.com/path?client_id=abc&scope=%s&response_type=%s'
+ for scope, correct_scopes in scopes:
+ scopes, _ = self.web.validate_authorization_request(
+ uri % (scope, 'code'))
+ self.assertCountEqual(scopes, correct_scopes)
+ scopes, _ = self.mobile.validate_authorization_request(
+ uri % (scope, 'token'))
+ self.assertCountEqual(scopes, correct_scopes)
+ scopes, _ = self.server.validate_authorization_request(
+ uri % (scope, 'code'))
+ self.assertCountEqual(scopes, correct_scopes)
+
+ def test_scope_preservation(self):
+ scope = 'pics+http%3A%2f%2fa.b%2fvideos'
+ decoded_scope = 'pics http://a.b/videos'
+ auth_uri = 'http://example.com/path?client_id=abc&response_type='
+ token_uri = 'http://example.com/path'
+
+ # authorization grant
+ for backend_server_type in ['web', 'server']:
+ h, _, s = getattr(self, backend_server_type).create_authorization_response(
+ auth_uri + 'code', scopes=decoded_scope.split(' '))
+ self.validator.validate_code.side_effect = self.set_scopes(decoded_scope.split(' '))
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ code = get_query_credentials(h['Location'])['code'][0]
+ _, body, _ = getattr(self, backend_server_type).create_token_response(token_uri,
+ body='client_id=me&redirect_uri=http://back.to/me&grant_type=authorization_code&code=%s' % code)
+ self.assertEqual(json.loads(body)['scope'], decoded_scope)
+
+ # implicit grant
+ for backend_server_type in ['mobile', 'server']:
+ h, _, s = getattr(self, backend_server_type).create_authorization_response(
+ auth_uri + 'token', scopes=decoded_scope.split(' '))
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertEqual(get_fragment_credentials(h['Location'])['scope'][0], decoded_scope)
+
+ # resource owner password credentials grant
+ for backend_server_type in ['legacy', 'server']:
+ body = 'grant_type=password&username=abc&password=secret&scope=%s'
+
+ _, body, _ = getattr(self, backend_server_type).create_token_response(token_uri,
+ body=body % scope)
+ self.assertEqual(json.loads(body)['scope'], decoded_scope)
+
+ # client credentials grant
+ for backend_server_type in ['backend', 'server']:
+ body = 'grant_type=client_credentials&scope=%s'
+ self.validator.authenticate_client.side_effect = self.set_user
+ _, body, _ = getattr(self, backend_server_type).create_token_response(token_uri,
+ body=body % scope)
+ self.assertEqual(json.loads(body)['scope'], decoded_scope)
+
+ def test_scope_changed(self):
+ scope = 'pics+http%3A%2f%2fa.b%2fvideos'
+ scopes = ['images', 'http://a.b/videos']
+ decoded_scope = 'images http://a.b/videos'
+ auth_uri = 'http://example.com/path?client_id=abc&response_type='
+ token_uri = 'http://example.com/path'
+
+ # authorization grant
+ h, _, s = self.web.create_authorization_response(
+ auth_uri + 'code', scopes=scopes)
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ code = get_query_credentials(h['Location'])['code'][0]
+ self.validator.validate_code.side_effect = self.set_scopes(scopes)
+ _, body, _ = self.web.create_token_response(token_uri,
+ body='grant_type=authorization_code&code=%s' % code)
+ self.assertEqual(json.loads(body)['scope'], decoded_scope)
+
+ # implicit grant
+ self.validator.validate_scopes.side_effect = self.set_scopes(scopes)
+ h, _, s = self.mobile.create_authorization_response(
+ auth_uri + 'token', scopes=scopes)
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertEqual(get_fragment_credentials(h['Location'])['scope'][0], decoded_scope)
+
+ # resource owner password credentials grant
+ self.validator.validate_scopes.side_effect = self.set_scopes(scopes)
+ body = 'grant_type=password&username=abc&password=secret&scope=%s'
+ _, body, _ = self.legacy.create_token_response(token_uri,
+ body=body % scope)
+ self.assertEqual(json.loads(body)['scope'], decoded_scope)
+
+ # client credentials grant
+ self.validator.validate_scopes.side_effect = self.set_scopes(scopes)
+ self.validator.authenticate_client.side_effect = self.set_user
+ body = 'grant_type=client_credentials&scope=%s'
+ _, body, _ = self.backend.create_token_response(token_uri,
+ body=body % scope)
+
+ self.assertEqual(json.loads(body)['scope'], decoded_scope)
+
+ def test_invalid_scope(self):
+ scope = 'pics+http%3A%2f%2fa.b%2fvideos'
+ auth_uri = 'http://example.com/path?client_id=abc&response_type='
+ token_uri = 'http://example.com/path'
+
+ self.validator.validate_scopes.return_value = False
+
+ # authorization grant
+ h, _, s = self.web.create_authorization_response(
+ auth_uri + 'code', scopes=['invalid'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ error = get_query_credentials(h['Location'])['error'][0]
+ self.assertEqual(error, 'invalid_scope')
+
+ # implicit grant
+ h, _, s = self.mobile.create_authorization_response(
+ auth_uri + 'token', scopes=['invalid'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ error = get_fragment_credentials(h['Location'])['error'][0]
+ self.assertEqual(error, 'invalid_scope')
+
+ # resource owner password credentials grant
+ body = 'grant_type=password&username=abc&password=secret&scope=%s'
+ _, body, _ = self.legacy.create_token_response(token_uri,
+ body=body % scope)
+ self.assertEqual(json.loads(body)['error'], 'invalid_scope')
+
+ # client credentials grant
+ self.validator.authenticate_client.side_effect = self.set_user
+ body = 'grant_type=client_credentials&scope=%s'
+ _, body, _ = self.backend.create_token_response(token_uri,
+ body=body % scope)
+ self.assertEqual(json.loads(body)['error'], 'invalid_scope')
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_utils.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_utils.py
new file mode 100644
index 0000000000..5eae1956f4
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_utils.py
@@ -0,0 +1,11 @@
+import urllib.parse as urlparse
+
+
+def get_query_credentials(uri):
+ return urlparse.parse_qs(urlparse.urlparse(uri).query,
+ keep_blank_values=True)
+
+
+def get_fragment_credentials(uri):
+ return urlparse.parse_qs(urlparse.urlparse(uri).fragment,
+ keep_blank_values=True)
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/__init__.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/__init__.py
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_authorization_code.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_authorization_code.py
new file mode 100644
index 0000000000..77e1a81b46
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_authorization_code.py
@@ -0,0 +1,382 @@
+# -*- coding: utf-8 -*-
+import json
+from unittest import mock
+
+from oauthlib.common import Request
+from oauthlib.oauth2.rfc6749 import errors
+from oauthlib.oauth2.rfc6749.grant_types import (
+ AuthorizationCodeGrant, authorization_code,
+)
+from oauthlib.oauth2.rfc6749.tokens import BearerToken
+
+from tests.unittest import TestCase
+
+
+class AuthorizationCodeGrantTest(TestCase):
+
+ def setUp(self):
+ self.request = Request('http://a.b/path')
+ self.request.scopes = ('hello', 'world')
+ self.request.expires_in = 1800
+ self.request.client = 'batman'
+ self.request.client_id = 'abcdef'
+ self.request.code = '1234'
+ self.request.response_type = 'code'
+ self.request.grant_type = 'authorization_code'
+ self.request.redirect_uri = 'https://a.b/cb'
+
+ self.mock_validator = mock.MagicMock()
+ self.mock_validator.is_pkce_required.return_value = False
+ self.mock_validator.get_code_challenge.return_value = None
+ self.mock_validator.is_origin_allowed.return_value = False
+ self.mock_validator.authenticate_client.side_effect = self.set_client
+ self.auth = AuthorizationCodeGrant(request_validator=self.mock_validator)
+
+ def set_client(self, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def setup_validators(self):
+ self.authval1, self.authval2 = mock.Mock(), mock.Mock()
+ self.authval1.return_value = {}
+ self.authval2.return_value = {}
+ self.tknval1, self.tknval2 = mock.Mock(), mock.Mock()
+ self.tknval1.return_value = None
+ self.tknval2.return_value = None
+ self.auth.custom_validators.pre_token.append(self.tknval1)
+ self.auth.custom_validators.post_token.append(self.tknval2)
+ self.auth.custom_validators.pre_auth.append(self.authval1)
+ self.auth.custom_validators.post_auth.append(self.authval2)
+
+ def test_custom_auth_validators(self):
+ self.setup_validators()
+
+ bearer = BearerToken(self.mock_validator)
+ self.auth.create_authorization_response(self.request, bearer)
+ self.assertTrue(self.authval1.called)
+ self.assertTrue(self.authval2.called)
+ self.assertFalse(self.tknval1.called)
+ self.assertFalse(self.tknval2.called)
+
+ def test_custom_token_validators(self):
+ self.setup_validators()
+
+ bearer = BearerToken(self.mock_validator)
+ self.auth.create_token_response(self.request, bearer)
+ self.assertTrue(self.tknval1.called)
+ self.assertTrue(self.tknval2.called)
+ self.assertFalse(self.authval1.called)
+ self.assertFalse(self.authval2.called)
+
+ def test_create_authorization_grant(self):
+ bearer = BearerToken(self.mock_validator)
+ self.request.response_mode = 'query'
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ grant = dict(Request(h['Location']).uri_query_params)
+ self.assertIn('code', grant)
+ self.assertTrue(self.mock_validator.validate_redirect_uri.called)
+ self.assertTrue(self.mock_validator.validate_response_type.called)
+ self.assertTrue(self.mock_validator.validate_scopes.called)
+
+ def test_create_authorization_grant_no_scopes(self):
+ bearer = BearerToken(self.mock_validator)
+ self.request.response_mode = 'query'
+ self.request.scopes = []
+ self.auth.create_authorization_response(self.request, bearer)
+
+ def test_create_authorization_grant_state(self):
+ self.request.state = 'abc'
+ self.request.redirect_uri = None
+ self.request.response_mode = 'query'
+ self.mock_validator.get_default_redirect_uri.return_value = 'https://a.b/cb'
+ bearer = BearerToken(self.mock_validator)
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ grant = dict(Request(h['Location']).uri_query_params)
+ self.assertIn('code', grant)
+ self.assertIn('state', grant)
+ self.assertFalse(self.mock_validator.validate_redirect_uri.called)
+ self.assertTrue(self.mock_validator.get_default_redirect_uri.called)
+ self.assertTrue(self.mock_validator.validate_response_type.called)
+ self.assertTrue(self.mock_validator.validate_scopes.called)
+
+ @mock.patch('oauthlib.common.generate_token')
+ def test_create_authorization_response(self, generate_token):
+ generate_token.return_value = 'abc'
+ bearer = BearerToken(self.mock_validator)
+ self.request.response_mode = 'query'
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertURLEqual(h['Location'], 'https://a.b/cb?code=abc')
+ self.request.response_mode = 'fragment'
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertURLEqual(h['Location'], 'https://a.b/cb#code=abc')
+
+ def test_create_token_response(self):
+ bearer = BearerToken(self.mock_validator)
+
+ h, token, s = self.auth.create_token_response(self.request, bearer)
+ token = json.loads(token)
+ self.assertEqual(self.mock_validator.save_token.call_count, 1)
+ self.assertIn('access_token', token)
+ self.assertIn('refresh_token', token)
+ self.assertIn('expires_in', token)
+ self.assertIn('scope', token)
+ self.assertTrue(self.mock_validator.client_authentication_required.called)
+ self.assertTrue(self.mock_validator.authenticate_client.called)
+ self.assertTrue(self.mock_validator.validate_code.called)
+ self.assertTrue(self.mock_validator.confirm_redirect_uri.called)
+ self.assertTrue(self.mock_validator.validate_grant_type.called)
+ self.assertTrue(self.mock_validator.invalidate_authorization_code.called)
+
+ def test_create_token_response_without_refresh_token(self):
+ self.auth.refresh_token = False # Not to issue refresh token.
+
+ bearer = BearerToken(self.mock_validator)
+ h, token, s = self.auth.create_token_response(self.request, bearer)
+ token = json.loads(token)
+ self.assertEqual(self.mock_validator.save_token.call_count, 1)
+ self.assertIn('access_token', token)
+ self.assertNotIn('refresh_token', token)
+ self.assertIn('expires_in', token)
+ self.assertIn('scope', token)
+ self.assertTrue(self.mock_validator.client_authentication_required.called)
+ self.assertTrue(self.mock_validator.authenticate_client.called)
+ self.assertTrue(self.mock_validator.validate_code.called)
+ self.assertTrue(self.mock_validator.confirm_redirect_uri.called)
+ self.assertTrue(self.mock_validator.validate_grant_type.called)
+ self.assertTrue(self.mock_validator.invalidate_authorization_code.called)
+
+ def test_invalid_request(self):
+ del self.request.code
+ self.assertRaises(errors.InvalidRequestError, self.auth.validate_token_request,
+ self.request)
+
+ def test_invalid_request_duplicates(self):
+ request = mock.MagicMock(wraps=self.request)
+ request.grant_type = 'authorization_code'
+ request.duplicate_params = ['client_id']
+ self.assertRaises(errors.InvalidRequestError, self.auth.validate_token_request,
+ request)
+
+ def test_authentication_required(self):
+ """
+ ensure client_authentication_required() is properly called
+ """
+ self.auth.validate_token_request(self.request)
+ self.mock_validator.client_authentication_required.assert_called_once_with(self.request)
+
+ def test_authenticate_client(self):
+ self.mock_validator.authenticate_client.side_effect = None
+ self.mock_validator.authenticate_client.return_value = False
+ self.assertRaises(errors.InvalidClientError, self.auth.validate_token_request,
+ self.request)
+
+ def test_client_id_missing(self):
+ self.mock_validator.authenticate_client.side_effect = None
+ request = mock.MagicMock(wraps=self.request)
+ request.grant_type = 'authorization_code'
+ del request.client.client_id
+ self.assertRaises(NotImplementedError, self.auth.validate_token_request,
+ request)
+
+ def test_invalid_grant(self):
+ self.request.client = 'batman'
+ self.mock_validator.authenticate_client = self.set_client
+ self.mock_validator.validate_code.return_value = False
+ self.assertRaises(errors.InvalidGrantError,
+ self.auth.validate_token_request, self.request)
+
+ def test_invalid_grant_type(self):
+ self.request.grant_type = 'foo'
+ self.assertRaises(errors.UnsupportedGrantTypeError,
+ self.auth.validate_token_request, self.request)
+
+ def test_authenticate_client_id(self):
+ self.mock_validator.client_authentication_required.return_value = False
+ self.mock_validator.authenticate_client_id.return_value = False
+ self.request.state = 'abc'
+ self.assertRaises(errors.InvalidClientError,
+ self.auth.validate_token_request, self.request)
+
+ def test_invalid_redirect_uri(self):
+ self.mock_validator.confirm_redirect_uri.return_value = False
+ self.assertRaises(errors.MismatchingRedirectURIError,
+ self.auth.validate_token_request, self.request)
+
+ # PKCE validate_authorization_request
+ def test_pkce_challenge_missing(self):
+ self.mock_validator.is_pkce_required.return_value = True
+ self.assertRaises(errors.MissingCodeChallengeError,
+ self.auth.validate_authorization_request, self.request)
+
+ def test_pkce_default_method(self):
+ for required in [True, False]:
+ self.mock_validator.is_pkce_required.return_value = required
+ self.request.code_challenge = "present"
+ _, ri = self.auth.validate_authorization_request(self.request)
+ self.assertIn("code_challenge", ri)
+ self.assertIn("code_challenge_method", ri)
+ self.assertEqual(ri["code_challenge"], "present")
+ self.assertEqual(ri["code_challenge_method"], "plain")
+
+ def test_pkce_wrong_method(self):
+ for required in [True, False]:
+ self.mock_validator.is_pkce_required.return_value = required
+ self.request.code_challenge = "present"
+ self.request.code_challenge_method = "foobar"
+ self.assertRaises(errors.UnsupportedCodeChallengeMethodError,
+ self.auth.validate_authorization_request, self.request)
+
+ # PKCE validate_token_request
+ def test_pkce_verifier_missing(self):
+ self.mock_validator.is_pkce_required.return_value = True
+ self.assertRaises(errors.MissingCodeVerifierError,
+ self.auth.validate_token_request, self.request)
+
+ # PKCE validate_token_request
+ def test_pkce_required_verifier_missing_challenge_missing(self):
+ self.mock_validator.is_pkce_required.return_value = True
+ self.request.code_verifier = None
+ self.mock_validator.get_code_challenge.return_value = None
+ self.assertRaises(errors.MissingCodeVerifierError,
+ self.auth.validate_token_request, self.request)
+
+ def test_pkce_required_verifier_missing_challenge_valid(self):
+ self.mock_validator.is_pkce_required.return_value = True
+ self.request.code_verifier = None
+ self.mock_validator.get_code_challenge.return_value = "foo"
+ self.assertRaises(errors.MissingCodeVerifierError,
+ self.auth.validate_token_request, self.request)
+
+ def test_pkce_required_verifier_valid_challenge_missing(self):
+ self.mock_validator.is_pkce_required.return_value = True
+ self.request.code_verifier = "foobar"
+ self.mock_validator.get_code_challenge.return_value = None
+ self.assertRaises(errors.InvalidGrantError,
+ self.auth.validate_token_request, self.request)
+
+ def test_pkce_required_verifier_valid_challenge_valid_method_valid(self):
+ self.mock_validator.is_pkce_required.return_value = True
+ self.request.code_verifier = "foobar"
+ self.mock_validator.get_code_challenge.return_value = "foobar"
+ self.mock_validator.get_code_challenge_method.return_value = "plain"
+ self.auth.validate_token_request(self.request)
+
+ def test_pkce_required_verifier_invalid_challenge_valid_method_valid(self):
+ self.mock_validator.is_pkce_required.return_value = True
+ self.request.code_verifier = "foobar"
+ self.mock_validator.get_code_challenge.return_value = "raboof"
+ self.mock_validator.get_code_challenge_method.return_value = "plain"
+ self.assertRaises(errors.InvalidGrantError,
+ self.auth.validate_token_request, self.request)
+
+ def test_pkce_required_verifier_valid_challenge_valid_method_wrong(self):
+ self.mock_validator.is_pkce_required.return_value = True
+ self.request.code_verifier = "present"
+ self.mock_validator.get_code_challenge.return_value = "foobar"
+ self.mock_validator.get_code_challenge_method.return_value = "cryptic_method"
+ self.assertRaises(errors.ServerError,
+ self.auth.validate_token_request, self.request)
+
+ def test_pkce_verifier_valid_challenge_valid_method_missing(self):
+ self.mock_validator.is_pkce_required.return_value = True
+ self.request.code_verifier = "present"
+ self.mock_validator.get_code_challenge.return_value = "foobar"
+ self.mock_validator.get_code_challenge_method.return_value = None
+ self.assertRaises(errors.InvalidGrantError,
+ self.auth.validate_token_request, self.request)
+
+ def test_pkce_optional_verifier_valid_challenge_missing(self):
+ self.mock_validator.is_pkce_required.return_value = False
+ self.request.code_verifier = "present"
+ self.mock_validator.get_code_challenge.return_value = None
+ self.auth.validate_token_request(self.request)
+
+ def test_pkce_optional_verifier_missing_challenge_valid(self):
+ self.mock_validator.is_pkce_required.return_value = False
+ self.request.code_verifier = None
+ self.mock_validator.get_code_challenge.return_value = "foobar"
+ self.assertRaises(errors.MissingCodeVerifierError,
+ self.auth.validate_token_request, self.request)
+
+ # PKCE functions
+ def test_wrong_code_challenge_method_plain(self):
+ self.assertFalse(authorization_code.code_challenge_method_plain("foo", "bar"))
+
+ def test_correct_code_challenge_method_plain(self):
+ self.assertTrue(authorization_code.code_challenge_method_plain("foo", "foo"))
+
+ def test_wrong_code_challenge_method_s256(self):
+ self.assertFalse(authorization_code.code_challenge_method_s256("foo", "bar"))
+
+ def test_correct_code_challenge_method_s256(self):
+ # "abcd" as verifier gives a '+' to base64
+ self.assertTrue(
+ authorization_code.code_challenge_method_s256("abcd",
+ "iNQmb9TmM40TuEX88olXnSCciXgjuSF9o-Fhk28DFYk")
+ )
+ # "/" as verifier gives a '/' and '+' to base64
+ self.assertTrue(
+ authorization_code.code_challenge_method_s256("/",
+ "il7asoJjJEMhngUeSt4tHVu8Zxx4EFG_FDeJfL3-oPE")
+ )
+ # Example from PKCE RFCE
+ self.assertTrue(
+ authorization_code.code_challenge_method_s256("dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk",
+ "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM")
+ )
+
+ def test_code_modifier_called(self):
+ bearer = BearerToken(self.mock_validator)
+ code_modifier = mock.MagicMock(wraps=lambda grant, *a: grant)
+ self.auth.register_code_modifier(code_modifier)
+ self.auth.create_authorization_response(self.request, bearer)
+ code_modifier.assert_called_once()
+
+ def test_hybrid_token_save(self):
+ bearer = BearerToken(self.mock_validator)
+ self.auth.register_code_modifier(
+ lambda grant, *a: dict(list(grant.items()) + [('access_token', 1)])
+ )
+ self.auth.create_authorization_response(self.request, bearer)
+ self.mock_validator.save_token.assert_called_once()
+
+ # CORS
+
+ def test_create_cors_headers(self):
+ bearer = BearerToken(self.mock_validator)
+ self.request.headers['origin'] = 'https://foo.bar'
+ self.mock_validator.is_origin_allowed.return_value = True
+
+ headers = self.auth.create_token_response(self.request, bearer)[0]
+ self.assertEqual(
+ headers['Access-Control-Allow-Origin'], 'https://foo.bar'
+ )
+ self.mock_validator.is_origin_allowed.assert_called_once_with(
+ 'abcdef', 'https://foo.bar', self.request
+ )
+
+ def test_create_cors_headers_no_origin(self):
+ bearer = BearerToken(self.mock_validator)
+ headers = self.auth.create_token_response(self.request, bearer)[0]
+ self.assertNotIn('Access-Control-Allow-Origin', headers)
+ self.mock_validator.is_origin_allowed.assert_not_called()
+
+ def test_create_cors_headers_insecure_origin(self):
+ bearer = BearerToken(self.mock_validator)
+ self.request.headers['origin'] = 'http://foo.bar'
+
+ headers = self.auth.create_token_response(self.request, bearer)[0]
+ self.assertNotIn('Access-Control-Allow-Origin', headers)
+ self.mock_validator.is_origin_allowed.assert_not_called()
+
+ def test_create_cors_headers_invalid_origin(self):
+ bearer = BearerToken(self.mock_validator)
+ self.request.headers['origin'] = 'https://foo.bar'
+ self.mock_validator.is_origin_allowed.return_value = False
+
+ headers = self.auth.create_token_response(self.request, bearer)[0]
+ self.assertNotIn('Access-Control-Allow-Origin', headers)
+ self.mock_validator.is_origin_allowed.assert_called_once_with(
+ 'abcdef', 'https://foo.bar', self.request
+ )
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_client_credentials.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_client_credentials.py
new file mode 100644
index 0000000000..e9559c7931
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_client_credentials.py
@@ -0,0 +1,76 @@
+# -*- coding: utf-8 -*-
+import json
+from unittest import mock
+
+from oauthlib.common import Request
+from oauthlib.oauth2.rfc6749.grant_types import ClientCredentialsGrant
+from oauthlib.oauth2.rfc6749.tokens import BearerToken
+
+from tests.unittest import TestCase
+
+
+class ClientCredentialsGrantTest(TestCase):
+
+ def setUp(self):
+ mock_client = mock.MagicMock()
+ mock_client.user.return_value = 'mocked user'
+ self.request = Request('http://a.b/path')
+ self.request.grant_type = 'client_credentials'
+ self.request.client = mock_client
+ self.request.scopes = ('mocked', 'scopes')
+ self.mock_validator = mock.MagicMock()
+ self.auth = ClientCredentialsGrant(
+ request_validator=self.mock_validator)
+
+ def test_custom_auth_validators_unsupported(self):
+ authval1, authval2 = mock.Mock(), mock.Mock()
+ expected = ('ClientCredentialsGrant does not support authorization '
+ 'validators. Use token validators instead.')
+ with self.assertRaises(ValueError) as caught:
+ ClientCredentialsGrant(self.mock_validator, pre_auth=[authval1])
+ self.assertEqual(caught.exception.args[0], expected)
+ with self.assertRaises(ValueError) as caught:
+ ClientCredentialsGrant(self.mock_validator, post_auth=[authval2])
+ self.assertEqual(caught.exception.args[0], expected)
+ with self.assertRaises(AttributeError):
+ self.auth.custom_validators.pre_auth.append(authval1)
+ with self.assertRaises(AttributeError):
+ self.auth.custom_validators.pre_auth.append(authval2)
+
+ def test_custom_token_validators(self):
+ tknval1, tknval2 = mock.Mock(), mock.Mock()
+ self.auth.custom_validators.pre_token.append(tknval1)
+ self.auth.custom_validators.post_token.append(tknval2)
+
+ bearer = BearerToken(self.mock_validator)
+ self.auth.create_token_response(self.request, bearer)
+ self.assertTrue(tknval1.called)
+ self.assertTrue(tknval2.called)
+
+ def test_create_token_response(self):
+ bearer = BearerToken(self.mock_validator)
+ headers, body, status_code = self.auth.create_token_response(
+ self.request, bearer)
+ token = json.loads(body)
+ self.assertEqual(self.mock_validator.save_token.call_count, 1)
+ self.assertIn('access_token', token)
+ self.assertIn('token_type', token)
+ self.assertIn('expires_in', token)
+ self.assertIn('Content-Type', headers)
+ self.assertEqual(headers['Content-Type'], 'application/json')
+
+ def test_error_response(self):
+ bearer = BearerToken(self.mock_validator)
+ self.mock_validator.authenticate_client.return_value = False
+ headers, body, status_code = self.auth.create_token_response(
+ self.request, bearer)
+ self.assertEqual(self.mock_validator.save_token.call_count, 0)
+ error_msg = json.loads(body)
+ self.assertIn('error', error_msg)
+ self.assertEqual(error_msg['error'], 'invalid_client')
+ self.assertIn('Content-Type', headers)
+ self.assertEqual(headers['Content-Type'], 'application/json')
+
+ def test_validate_token_response(self):
+ # wrong grant type, scope
+ pass
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_implicit.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_implicit.py
new file mode 100644
index 0000000000..1fb71a1dc9
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_implicit.py
@@ -0,0 +1,62 @@
+# -*- coding: utf-8 -*-
+from unittest import mock
+
+from oauthlib.common import Request
+from oauthlib.oauth2.rfc6749.grant_types import ImplicitGrant
+from oauthlib.oauth2.rfc6749.tokens import BearerToken
+
+from tests.unittest import TestCase
+
+
+class ImplicitGrantTest(TestCase):
+
+ def setUp(self):
+ mock_client = mock.MagicMock()
+ mock_client.user.return_value = 'mocked user'
+ self.request = Request('http://a.b/path')
+ self.request.scopes = ('hello', 'world')
+ self.request.client = mock_client
+ self.request.client_id = 'abcdef'
+ self.request.response_type = 'token'
+ self.request.state = 'xyz'
+ self.request.redirect_uri = 'https://b.c/p'
+
+ self.mock_validator = mock.MagicMock()
+ self.auth = ImplicitGrant(request_validator=self.mock_validator)
+
+ @mock.patch('oauthlib.common.generate_token')
+ def test_create_token_response(self, generate_token):
+ generate_token.return_value = '1234'
+ bearer = BearerToken(self.mock_validator, expires_in=1800)
+ h, b, s = self.auth.create_token_response(self.request, bearer)
+ correct_uri = 'https://b.c/p#access_token=1234&token_type=Bearer&expires_in=1800&state=xyz&scope=hello+world'
+ self.assertEqual(s, 302)
+ self.assertURLEqual(h['Location'], correct_uri, parse_fragment=True)
+ self.assertEqual(self.mock_validator.save_token.call_count, 1)
+
+ correct_uri = 'https://b.c/p?access_token=1234&token_type=Bearer&expires_in=1800&state=xyz&scope=hello+world'
+ self.request.response_mode = 'query'
+ h, b, s = self.auth.create_token_response(self.request, bearer)
+ self.assertURLEqual(h['Location'], correct_uri)
+
+ def test_custom_validators(self):
+ self.authval1, self.authval2 = mock.Mock(), mock.Mock()
+ self.tknval1, self.tknval2 = mock.Mock(), mock.Mock()
+ for val in (self.authval1, self.authval2):
+ val.return_value = {}
+ for val in (self.tknval1, self.tknval2):
+ val.return_value = None
+ self.auth.custom_validators.pre_token.append(self.tknval1)
+ self.auth.custom_validators.post_token.append(self.tknval2)
+ self.auth.custom_validators.pre_auth.append(self.authval1)
+ self.auth.custom_validators.post_auth.append(self.authval2)
+
+ bearer = BearerToken(self.mock_validator)
+ self.auth.create_token_response(self.request, bearer)
+ self.assertTrue(self.tknval1.called)
+ self.assertTrue(self.tknval2.called)
+ self.assertTrue(self.authval1.called)
+ self.assertTrue(self.authval2.called)
+
+ def test_error_response(self):
+ pass
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_refresh_token.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_refresh_token.py
new file mode 100644
index 0000000000..581f2a4d6a
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_refresh_token.py
@@ -0,0 +1,211 @@
+# -*- coding: utf-8 -*-
+import json
+from unittest import mock
+
+from oauthlib.common import Request
+from oauthlib.oauth2.rfc6749 import errors
+from oauthlib.oauth2.rfc6749.grant_types import RefreshTokenGrant
+from oauthlib.oauth2.rfc6749.tokens import BearerToken
+
+from tests.unittest import TestCase
+
+
+class RefreshTokenGrantTest(TestCase):
+
+ def setUp(self):
+ mock_client = mock.MagicMock()
+ mock_client.user.return_value = 'mocked user'
+ self.request = Request('http://a.b/path')
+ self.request.grant_type = 'refresh_token'
+ self.request.refresh_token = 'lsdkfhj230'
+ self.request.client_id = 'abcdef'
+ self.request.client = mock_client
+ self.request.scope = 'foo'
+ self.mock_validator = mock.MagicMock()
+ self.auth = RefreshTokenGrant(
+ request_validator=self.mock_validator)
+
+ def test_create_token_response(self):
+ self.mock_validator.get_original_scopes.return_value = ['foo', 'bar']
+ bearer = BearerToken(self.mock_validator)
+ headers, body, status_code = self.auth.create_token_response(
+ self.request, bearer)
+ token = json.loads(body)
+ self.assertEqual(self.mock_validator.save_token.call_count, 1)
+ self.assertIn('access_token', token)
+ self.assertIn('token_type', token)
+ self.assertIn('expires_in', token)
+ self.assertEqual(token['scope'], 'foo')
+
+ def test_custom_auth_validators_unsupported(self):
+ authval1, authval2 = mock.Mock(), mock.Mock()
+ expected = ('RefreshTokenGrant does not support authorization '
+ 'validators. Use token validators instead.')
+ with self.assertRaises(ValueError) as caught:
+ RefreshTokenGrant(self.mock_validator, pre_auth=[authval1])
+ self.assertEqual(caught.exception.args[0], expected)
+ with self.assertRaises(ValueError) as caught:
+ RefreshTokenGrant(self.mock_validator, post_auth=[authval2])
+ self.assertEqual(caught.exception.args[0], expected)
+ with self.assertRaises(AttributeError):
+ self.auth.custom_validators.pre_auth.append(authval1)
+ with self.assertRaises(AttributeError):
+ self.auth.custom_validators.pre_auth.append(authval2)
+
+ def test_custom_token_validators(self):
+ tknval1, tknval2 = mock.Mock(), mock.Mock()
+ self.auth.custom_validators.pre_token.append(tknval1)
+ self.auth.custom_validators.post_token.append(tknval2)
+
+ bearer = BearerToken(self.mock_validator)
+ self.auth.create_token_response(self.request, bearer)
+ self.assertTrue(tknval1.called)
+ self.assertTrue(tknval2.called)
+
+ def test_create_token_inherit_scope(self):
+ self.request.scope = None
+ self.mock_validator.get_original_scopes.return_value = ['foo', 'bar']
+ bearer = BearerToken(self.mock_validator)
+ headers, body, status_code = self.auth.create_token_response(
+ self.request, bearer)
+ token = json.loads(body)
+ self.assertEqual(self.mock_validator.save_token.call_count, 1)
+ self.assertIn('access_token', token)
+ self.assertIn('token_type', token)
+ self.assertIn('expires_in', token)
+ self.assertEqual(token['scope'], 'foo bar')
+
+ def test_create_token_within_original_scope(self):
+ self.mock_validator.get_original_scopes.return_value = ['baz']
+ self.mock_validator.is_within_original_scope.return_value = True
+ bearer = BearerToken(self.mock_validator)
+ headers, body, status_code = self.auth.create_token_response(
+ self.request, bearer)
+ token = json.loads(body)
+ self.assertEqual(self.mock_validator.save_token.call_count, 1)
+ self.assertIn('access_token', token)
+ self.assertIn('token_type', token)
+ self.assertIn('expires_in', token)
+ self.assertEqual(token['scope'], 'foo')
+
+ def test_invalid_scope(self):
+ self.mock_validator.get_original_scopes.return_value = ['baz']
+ self.mock_validator.is_within_original_scope.return_value = False
+ bearer = BearerToken(self.mock_validator)
+ headers, body, status_code = self.auth.create_token_response(
+ self.request, bearer)
+ token = json.loads(body)
+ self.assertEqual(self.mock_validator.save_token.call_count, 0)
+ self.assertEqual(token['error'], 'invalid_scope')
+ self.assertEqual(status_code, 400)
+
+ def test_invalid_token(self):
+ self.mock_validator.validate_refresh_token.return_value = False
+ bearer = BearerToken(self.mock_validator)
+ headers, body, status_code = self.auth.create_token_response(
+ self.request, bearer)
+ token = json.loads(body)
+ self.assertEqual(self.mock_validator.save_token.call_count, 0)
+ self.assertEqual(token['error'], 'invalid_grant')
+ self.assertEqual(status_code, 400)
+
+ def test_invalid_client(self):
+ self.mock_validator.authenticate_client.return_value = False
+ bearer = BearerToken(self.mock_validator)
+ headers, body, status_code = self.auth.create_token_response(
+ self.request, bearer)
+ token = json.loads(body)
+ self.assertEqual(self.mock_validator.save_token.call_count, 0)
+ self.assertEqual(token['error'], 'invalid_client')
+ self.assertEqual(status_code, 401)
+
+ def test_authentication_required(self):
+ """
+ ensure client_authentication_required() is properly called
+ """
+ self.mock_validator.authenticate_client.return_value = False
+ self.mock_validator.authenticate_client_id.return_value = False
+ self.request.code = 'waffles'
+ self.assertRaises(errors.InvalidClientError, self.auth.validate_token_request,
+ self.request)
+ self.mock_validator.client_authentication_required.assert_called_once_with(self.request)
+
+ def test_invalid_grant_type(self):
+ self.request.grant_type = 'wrong_type'
+ self.assertRaises(errors.UnsupportedGrantTypeError,
+ self.auth.validate_token_request, self.request)
+
+ def test_authenticate_client_id(self):
+ self.mock_validator.client_authentication_required.return_value = False
+ self.request.refresh_token = mock.MagicMock()
+ self.mock_validator.authenticate_client_id.return_value = False
+ self.assertRaises(errors.InvalidClientError,
+ self.auth.validate_token_request, self.request)
+
+ def test_invalid_refresh_token(self):
+ # invalid refresh token
+ self.mock_validator.authenticate_client_id.return_value = True
+ self.mock_validator.validate_refresh_token.return_value = False
+ self.assertRaises(errors.InvalidGrantError,
+ self.auth.validate_token_request, self.request)
+ # no token provided
+ del self.request.refresh_token
+ self.assertRaises(errors.InvalidRequestError,
+ self.auth.validate_token_request, self.request)
+
+ def test_invalid_scope_original_scopes_empty(self):
+ self.mock_validator.validate_refresh_token.return_value = True
+ self.mock_validator.is_within_original_scope.return_value = False
+ self.assertRaises(errors.InvalidScopeError,
+ self.auth.validate_token_request, self.request)
+
+ def test_valid_token_request(self):
+ self.request.scope = 'foo bar'
+ self.mock_validator.get_original_scopes = mock.Mock()
+ self.mock_validator.get_original_scopes.return_value = 'foo bar baz'
+ self.auth.validate_token_request(self.request)
+ self.assertEqual(self.request.scopes, self.request.scope.split())
+ # all ok but without request.scope
+ del self.request.scope
+ self.auth.validate_token_request(self.request)
+ self.assertEqual(self.request.scopes, 'foo bar baz'.split())
+
+ # CORS
+
+ def test_create_cors_headers(self):
+ bearer = BearerToken(self.mock_validator)
+ self.request.headers['origin'] = 'https://foo.bar'
+ self.mock_validator.is_origin_allowed.return_value = True
+
+ headers = self.auth.create_token_response(self.request, bearer)[0]
+ self.assertEqual(
+ headers['Access-Control-Allow-Origin'], 'https://foo.bar'
+ )
+ self.mock_validator.is_origin_allowed.assert_called_once_with(
+ 'abcdef', 'https://foo.bar', self.request
+ )
+
+ def test_create_cors_headers_no_origin(self):
+ bearer = BearerToken(self.mock_validator)
+ headers = self.auth.create_token_response(self.request, bearer)[0]
+ self.assertNotIn('Access-Control-Allow-Origin', headers)
+ self.mock_validator.is_origin_allowed.assert_not_called()
+
+ def test_create_cors_headers_insecure_origin(self):
+ bearer = BearerToken(self.mock_validator)
+ self.request.headers['origin'] = 'http://foo.bar'
+
+ headers = self.auth.create_token_response(self.request, bearer)[0]
+ self.assertNotIn('Access-Control-Allow-Origin', headers)
+ self.mock_validator.is_origin_allowed.assert_not_called()
+
+ def test_create_cors_headers_invalid_origin(self):
+ bearer = BearerToken(self.mock_validator)
+ self.request.headers['origin'] = 'https://foo.bar'
+ self.mock_validator.is_origin_allowed.return_value = False
+
+ headers = self.auth.create_token_response(self.request, bearer)[0]
+ self.assertNotIn('Access-Control-Allow-Origin', headers)
+ self.mock_validator.is_origin_allowed.assert_called_once_with(
+ 'abcdef', 'https://foo.bar', self.request
+ )
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_resource_owner_password.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_resource_owner_password.py
new file mode 100644
index 0000000000..294e27be35
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_resource_owner_password.py
@@ -0,0 +1,156 @@
+# -*- coding: utf-8 -*-
+import json
+from unittest import mock
+
+from oauthlib.common import Request
+from oauthlib.oauth2.rfc6749 import errors
+from oauthlib.oauth2.rfc6749.grant_types import (
+ ResourceOwnerPasswordCredentialsGrant,
+)
+from oauthlib.oauth2.rfc6749.tokens import BearerToken
+
+from tests.unittest import TestCase
+
+
+class ResourceOwnerPasswordCredentialsGrantTest(TestCase):
+
+ def setUp(self):
+ mock_client = mock.MagicMock()
+ mock_client.user.return_value = 'mocked user'
+ self.request = Request('http://a.b/path')
+ self.request.grant_type = 'password'
+ self.request.username = 'john'
+ self.request.password = 'doe'
+ self.request.client = mock_client
+ self.request.scopes = ('mocked', 'scopes')
+ self.mock_validator = mock.MagicMock()
+ self.auth = ResourceOwnerPasswordCredentialsGrant(
+ request_validator=self.mock_validator)
+
+ def set_client(self, request, *args, **kwargs):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def test_create_token_response(self):
+ bearer = BearerToken(self.mock_validator)
+ headers, body, status_code = self.auth.create_token_response(
+ self.request, bearer)
+ token = json.loads(body)
+ self.assertEqual(self.mock_validator.save_token.call_count, 1)
+ self.assertIn('access_token', token)
+ self.assertIn('token_type', token)
+ self.assertIn('expires_in', token)
+ self.assertIn('refresh_token', token)
+ # ensure client_authentication_required() is properly called
+ self.mock_validator.client_authentication_required.assert_called_once_with(self.request)
+ # fail client authentication
+ self.mock_validator.reset_mock()
+ self.mock_validator.validate_user.return_value = True
+ self.mock_validator.authenticate_client.return_value = False
+ status_code = self.auth.create_token_response(self.request, bearer)[2]
+ self.assertEqual(status_code, 401)
+ self.assertEqual(self.mock_validator.save_token.call_count, 0)
+
+ # mock client_authentication_required() returning False then fail
+ self.mock_validator.reset_mock()
+ self.mock_validator.client_authentication_required.return_value = False
+ self.mock_validator.authenticate_client_id.return_value = False
+ status_code = self.auth.create_token_response(self.request, bearer)[2]
+ self.assertEqual(status_code, 401)
+ self.assertEqual(self.mock_validator.save_token.call_count, 0)
+
+ def test_create_token_response_without_refresh_token(self):
+ # self.auth.refresh_token = False so we don't generate a refresh token
+ self.auth = ResourceOwnerPasswordCredentialsGrant(
+ request_validator=self.mock_validator, refresh_token=False)
+ bearer = BearerToken(self.mock_validator)
+ headers, body, status_code = self.auth.create_token_response(
+ self.request, bearer)
+ token = json.loads(body)
+ self.assertEqual(self.mock_validator.save_token.call_count, 1)
+ self.assertIn('access_token', token)
+ self.assertIn('token_type', token)
+ self.assertIn('expires_in', token)
+ # ensure no refresh token is generated
+ self.assertNotIn('refresh_token', token)
+ # ensure client_authentication_required() is properly called
+ self.mock_validator.client_authentication_required.assert_called_once_with(self.request)
+ # fail client authentication
+ self.mock_validator.reset_mock()
+ self.mock_validator.validate_user.return_value = True
+ self.mock_validator.authenticate_client.return_value = False
+ status_code = self.auth.create_token_response(self.request, bearer)[2]
+ self.assertEqual(status_code, 401)
+ self.assertEqual(self.mock_validator.save_token.call_count, 0)
+ # mock client_authentication_required() returning False then fail
+ self.mock_validator.reset_mock()
+ self.mock_validator.client_authentication_required.return_value = False
+ self.mock_validator.authenticate_client_id.return_value = False
+ status_code = self.auth.create_token_response(self.request, bearer)[2]
+ self.assertEqual(status_code, 401)
+ self.assertEqual(self.mock_validator.save_token.call_count, 0)
+
+ def test_custom_auth_validators_unsupported(self):
+ authval1, authval2 = mock.Mock(), mock.Mock()
+ expected = ('ResourceOwnerPasswordCredentialsGrant does not '
+ 'support authorization validators. Use token '
+ 'validators instead.')
+ with self.assertRaises(ValueError) as caught:
+ ResourceOwnerPasswordCredentialsGrant(self.mock_validator,
+ pre_auth=[authval1])
+ self.assertEqual(caught.exception.args[0], expected)
+ with self.assertRaises(ValueError) as caught:
+ ResourceOwnerPasswordCredentialsGrant(self.mock_validator,
+ post_auth=[authval2])
+ self.assertEqual(caught.exception.args[0], expected)
+ with self.assertRaises(AttributeError):
+ self.auth.custom_validators.pre_auth.append(authval1)
+ with self.assertRaises(AttributeError):
+ self.auth.custom_validators.pre_auth.append(authval2)
+
+ def test_custom_token_validators(self):
+ tknval1, tknval2 = mock.Mock(), mock.Mock()
+ self.auth.custom_validators.pre_token.append(tknval1)
+ self.auth.custom_validators.post_token.append(tknval2)
+
+ bearer = BearerToken(self.mock_validator)
+ self.auth.create_token_response(self.request, bearer)
+ self.assertTrue(tknval1.called)
+ self.assertTrue(tknval2.called)
+
+ def test_error_response(self):
+ pass
+
+ def test_scopes(self):
+ pass
+
+ def test_invalid_request_missing_params(self):
+ del self.request.grant_type
+ self.assertRaises(errors.InvalidRequestError, self.auth.validate_token_request,
+ self.request)
+
+ def test_invalid_request_duplicates(self):
+ request = mock.MagicMock(wraps=self.request)
+ request.duplicate_params = ['scope']
+ self.assertRaises(errors.InvalidRequestError, self.auth.validate_token_request,
+ request)
+
+ def test_invalid_grant_type(self):
+ self.request.grant_type = 'foo'
+ self.assertRaises(errors.UnsupportedGrantTypeError,
+ self.auth.validate_token_request, self.request)
+
+ def test_invalid_user(self):
+ self.mock_validator.validate_user.return_value = False
+ self.assertRaises(errors.InvalidGrantError, self.auth.validate_token_request,
+ self.request)
+
+ def test_client_id_missing(self):
+ del self.request.client.client_id
+ self.assertRaises(NotImplementedError, self.auth.validate_token_request,
+ self.request)
+
+ def test_valid_token_request(self):
+ self.mock_validator.validate_grant_type.return_value = True
+ self.auth.validate_token_request(self.request)
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/test_parameters.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_parameters.py
new file mode 100644
index 0000000000..cd8c9e952b
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_parameters.py
@@ -0,0 +1,304 @@
+from unittest.mock import patch
+
+from oauthlib import signals
+from oauthlib.oauth2.rfc6749.errors import *
+from oauthlib.oauth2.rfc6749.parameters import *
+
+from tests.unittest import TestCase
+
+
+@patch('time.time', new=lambda: 1000)
+class ParameterTests(TestCase):
+
+ state = 'xyz'
+ auth_base = {
+ 'uri': 'https://server.example.com/authorize',
+ 'client_id': 's6BhdRkqt3',
+ 'redirect_uri': 'https://client.example.com/cb',
+ 'state': state,
+ 'scope': 'photos'
+ }
+ list_scope = ['list', 'of', 'scopes']
+
+ auth_grant = {'response_type': 'code'}
+ auth_grant_pkce = {'response_type': 'code', 'code_challenge': "code_challenge",
+ 'code_challenge_method': 'code_challenge_method'}
+ auth_grant_list_scope = {}
+ auth_implicit = {'response_type': 'token', 'extra': 'extra'}
+ auth_implicit_list_scope = {}
+
+ def setUp(self):
+ self.auth_grant.update(self.auth_base)
+ self.auth_grant_pkce.update(self.auth_base)
+ self.auth_implicit.update(self.auth_base)
+ self.auth_grant_list_scope.update(self.auth_grant)
+ self.auth_grant_list_scope['scope'] = self.list_scope
+ self.auth_implicit_list_scope.update(self.auth_implicit)
+ self.auth_implicit_list_scope['scope'] = self.list_scope
+
+ auth_base_uri = ('https://server.example.com/authorize?response_type={0}'
+ '&client_id=s6BhdRkqt3&redirect_uri=https%3A%2F%2F'
+ 'client.example.com%2Fcb&scope={1}&state={2}{3}')
+
+ auth_base_uri_pkce = ('https://server.example.com/authorize?response_type={0}'
+ '&client_id=s6BhdRkqt3&redirect_uri=https%3A%2F%2F'
+ 'client.example.com%2Fcb&scope={1}&state={2}{3}&code_challenge={4}'
+ '&code_challenge_method={5}')
+
+ auth_grant_uri = auth_base_uri.format('code', 'photos', state, '')
+ auth_grant_uri_pkce = auth_base_uri_pkce.format('code', 'photos', state, '', 'code_challenge',
+ 'code_challenge_method')
+ auth_grant_uri_list_scope = auth_base_uri.format('code', 'list+of+scopes', state, '')
+ auth_implicit_uri = auth_base_uri.format('token', 'photos', state, '&extra=extra')
+ auth_implicit_uri_list_scope = auth_base_uri.format('token', 'list+of+scopes', state, '&extra=extra')
+
+ grant_body = {
+ 'grant_type': 'authorization_code',
+ 'code': 'SplxlOBeZQQYbYS6WxSbIA',
+ 'redirect_uri': 'https://client.example.com/cb'
+ }
+ grant_body_pkce = {
+ 'grant_type': 'authorization_code',
+ 'code': 'SplxlOBeZQQYbYS6WxSbIA',
+ 'redirect_uri': 'https://client.example.com/cb',
+ 'code_verifier': 'code_verifier'
+ }
+ grant_body_scope = {'scope': 'photos'}
+ grant_body_list_scope = {'scope': list_scope}
+ auth_grant_body = ('grant_type=authorization_code&'
+ 'code=SplxlOBeZQQYbYS6WxSbIA&'
+ 'redirect_uri=https%3A%2F%2Fclient.example.com%2Fcb')
+ auth_grant_body_pkce = ('grant_type=authorization_code&'
+ 'code=SplxlOBeZQQYbYS6WxSbIA&'
+ 'redirect_uri=https%3A%2F%2Fclient.example.com%2Fcb'
+ '&code_verifier=code_verifier')
+ auth_grant_body_scope = auth_grant_body + '&scope=photos'
+ auth_grant_body_list_scope = auth_grant_body + '&scope=list+of+scopes'
+
+ pwd_body = {
+ 'grant_type': 'password',
+ 'username': 'johndoe',
+ 'password': 'A3ddj3w'
+ }
+ password_body = 'grant_type=password&username=johndoe&password=A3ddj3w'
+
+ cred_grant = {'grant_type': 'client_credentials'}
+ cred_body = 'grant_type=client_credentials'
+
+ grant_response = 'https://client.example.com/cb?code=SplxlOBeZQQYbYS6WxSbIA&state=xyz'
+ grant_dict = {'code': 'SplxlOBeZQQYbYS6WxSbIA', 'state': state}
+
+ error_nocode = 'https://client.example.com/cb?state=xyz'
+ error_nostate = 'https://client.example.com/cb?code=SplxlOBeZQQYbYS6WxSbIA'
+ error_wrongstate = 'https://client.example.com/cb?code=SplxlOBeZQQYbYS6WxSbIA&state=abc'
+ error_denied = 'https://client.example.com/cb?error=access_denied&state=xyz'
+ error_invalid = 'https://client.example.com/cb?error=invalid_request&state=xyz'
+
+ implicit_base = 'https://example.com/cb#access_token=2YotnFZFEjr1zCsicMWpAA&scope=abc&'
+ implicit_response = implicit_base + 'state={}&token_type=example&expires_in=3600'.format(state)
+ implicit_notype = implicit_base + 'state={}&expires_in=3600'.format(state)
+ implicit_wrongstate = implicit_base + 'state={}&token_type=exampleexpires_in=3600'.format('invalid')
+ implicit_nostate = implicit_base + 'token_type=example&expires_in=3600'
+ implicit_notoken = 'https://example.com/cb#state=xyz&token_type=example&expires_in=3600'
+
+ implicit_dict = {
+ 'access_token': '2YotnFZFEjr1zCsicMWpAA',
+ 'state': state,
+ 'token_type': 'example',
+ 'expires_in': 3600,
+ 'expires_at': 4600,
+ 'scope': ['abc']
+ }
+
+ json_response = ('{ "access_token": "2YotnFZFEjr1zCsicMWpAA",'
+ ' "token_type": "example",'
+ ' "expires_in": 3600,'
+ ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",'
+ ' "example_parameter": "example_value",'
+ ' "scope":"abc def"}')
+ json_response_noscope = ('{ "access_token": "2YotnFZFEjr1zCsicMWpAA",'
+ ' "token_type": "example",'
+ ' "expires_in": 3600,'
+ ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",'
+ ' "example_parameter": "example_value" }')
+ json_response_noexpire = ('{ "access_token": "2YotnFZFEjr1zCsicMWpAA",'
+ ' "token_type": "example",'
+ ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",'
+ ' "example_parameter": "example_value"}')
+ json_response_expirenull = ('{ "access_token": "2YotnFZFEjr1zCsicMWpAA",'
+ ' "token_type": "example",'
+ ' "expires_in": null,'
+ ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",'
+ ' "example_parameter": "example_value"}')
+
+ json_custom_error = '{ "error": "incorrect_client_credentials" }'
+ json_error = '{ "error": "access_denied" }'
+
+ json_notoken = ('{ "token_type": "example",'
+ ' "expires_in": 3600,'
+ ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",'
+ ' "example_parameter": "example_value" }')
+
+ json_notype = ('{ "access_token": "2YotnFZFEjr1zCsicMWpAA",'
+ ' "expires_in": 3600,'
+ ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",'
+ ' "example_parameter": "example_value" }')
+
+ json_dict = {
+ 'access_token': '2YotnFZFEjr1zCsicMWpAA',
+ 'token_type': 'example',
+ 'expires_in': 3600,
+ 'expires_at': 4600,
+ 'refresh_token': 'tGzv3JOkF0XG5Qx2TlKWIA',
+ 'example_parameter': 'example_value',
+ 'scope': ['abc', 'def']
+ }
+
+ json_noscope_dict = {
+ 'access_token': '2YotnFZFEjr1zCsicMWpAA',
+ 'token_type': 'example',
+ 'expires_in': 3600,
+ 'expires_at': 4600,
+ 'refresh_token': 'tGzv3JOkF0XG5Qx2TlKWIA',
+ 'example_parameter': 'example_value'
+ }
+
+ json_noexpire_dict = {
+ 'access_token': '2YotnFZFEjr1zCsicMWpAA',
+ 'token_type': 'example',
+ 'refresh_token': 'tGzv3JOkF0XG5Qx2TlKWIA',
+ 'example_parameter': 'example_value'
+ }
+
+ json_notype_dict = {
+ 'access_token': '2YotnFZFEjr1zCsicMWpAA',
+ 'expires_in': 3600,
+ 'expires_at': 4600,
+ 'refresh_token': 'tGzv3JOkF0XG5Qx2TlKWIA',
+ 'example_parameter': 'example_value',
+ }
+
+ url_encoded_response = ('access_token=2YotnFZFEjr1zCsicMWpAA'
+ '&token_type=example'
+ '&expires_in=3600'
+ '&refresh_token=tGzv3JOkF0XG5Qx2TlKWIA'
+ '&example_parameter=example_value'
+ '&scope=abc def')
+
+ url_encoded_error = 'error=access_denied'
+
+ url_encoded_notoken = ('token_type=example'
+ '&expires_in=3600'
+ '&refresh_token=tGzv3JOkF0XG5Qx2TlKWIA'
+ '&example_parameter=example_value')
+
+
+ def test_prepare_grant_uri(self):
+ """Verify correct authorization URI construction."""
+ self.assertURLEqual(prepare_grant_uri(**self.auth_grant), self.auth_grant_uri)
+ self.assertURLEqual(prepare_grant_uri(**self.auth_grant_list_scope), self.auth_grant_uri_list_scope)
+ self.assertURLEqual(prepare_grant_uri(**self.auth_implicit), self.auth_implicit_uri)
+ self.assertURLEqual(prepare_grant_uri(**self.auth_implicit_list_scope), self.auth_implicit_uri_list_scope)
+ self.assertURLEqual(prepare_grant_uri(**self.auth_grant_pkce), self.auth_grant_uri_pkce)
+
+ def test_prepare_token_request(self):
+ """Verify correct access token request body construction."""
+ self.assertFormBodyEqual(prepare_token_request(**self.grant_body), self.auth_grant_body)
+ self.assertFormBodyEqual(prepare_token_request(**self.pwd_body), self.password_body)
+ self.assertFormBodyEqual(prepare_token_request(**self.cred_grant), self.cred_body)
+ self.assertFormBodyEqual(prepare_token_request(**self.grant_body_pkce), self.auth_grant_body_pkce)
+
+ def test_grant_response(self):
+ """Verify correct parameter parsing and validation for auth code responses."""
+ params = parse_authorization_code_response(self.grant_response)
+ self.assertEqual(params, self.grant_dict)
+ params = parse_authorization_code_response(self.grant_response, state=self.state)
+ self.assertEqual(params, self.grant_dict)
+
+ self.assertRaises(MissingCodeError, parse_authorization_code_response,
+ self.error_nocode)
+ self.assertRaises(AccessDeniedError, parse_authorization_code_response,
+ self.error_denied)
+ self.assertRaises(InvalidRequestFatalError, parse_authorization_code_response,
+ self.error_invalid)
+ self.assertRaises(MismatchingStateError, parse_authorization_code_response,
+ self.error_nostate, state=self.state)
+ self.assertRaises(MismatchingStateError, parse_authorization_code_response,
+ self.error_wrongstate, state=self.state)
+
+ def test_implicit_token_response(self):
+ """Verify correct parameter parsing and validation for implicit responses."""
+ self.assertEqual(parse_implicit_response(self.implicit_response),
+ self.implicit_dict)
+ self.assertRaises(MissingTokenError, parse_implicit_response,
+ self.implicit_notoken)
+ self.assertRaises(ValueError, parse_implicit_response,
+ self.implicit_nostate, state=self.state)
+ self.assertRaises(ValueError, parse_implicit_response,
+ self.implicit_wrongstate, state=self.state)
+
+ def test_custom_json_error(self):
+ self.assertRaises(CustomOAuth2Error, parse_token_response, self.json_custom_error)
+
+ def test_json_token_response(self):
+ """Verify correct parameter parsing and validation for token responses. """
+ self.assertEqual(parse_token_response(self.json_response), self.json_dict)
+ self.assertRaises(AccessDeniedError, parse_token_response, self.json_error)
+ self.assertRaises(MissingTokenError, parse_token_response, self.json_notoken)
+
+ self.assertEqual(parse_token_response(self.json_response_noscope,
+ scope=['all', 'the', 'scopes']), self.json_noscope_dict)
+ self.assertEqual(parse_token_response(self.json_response_noexpire), self.json_noexpire_dict)
+ self.assertEqual(parse_token_response(self.json_response_expirenull), self.json_noexpire_dict)
+
+ scope_changes_recorded = []
+ def record_scope_change(sender, message, old, new):
+ scope_changes_recorded.append((message, old, new))
+
+ os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] = '1'
+ signals.scope_changed.connect(record_scope_change)
+ try:
+ parse_token_response(self.json_response, scope='aaa')
+ self.assertEqual(len(scope_changes_recorded), 1)
+ message, old, new = scope_changes_recorded[0]
+ for scope in new + old:
+ self.assertIn(scope, message)
+ self.assertEqual(old, ['aaa'])
+ self.assertEqual(set(new), {'abc', 'def'})
+ finally:
+ signals.scope_changed.disconnect(record_scope_change)
+ del os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE']
+
+
+ def test_json_token_notype(self):
+ """Verify strict token type parsing only when configured. """
+ self.assertEqual(parse_token_response(self.json_notype), self.json_notype_dict)
+ try:
+ os.environ['OAUTHLIB_STRICT_TOKEN_TYPE'] = '1'
+ self.assertRaises(MissingTokenTypeError, parse_token_response, self.json_notype)
+ finally:
+ del os.environ['OAUTHLIB_STRICT_TOKEN_TYPE']
+
+ def test_url_encoded_token_response(self):
+ """Verify fallback parameter parsing and validation for token responses. """
+ self.assertEqual(parse_token_response(self.url_encoded_response), self.json_dict)
+ self.assertRaises(AccessDeniedError, parse_token_response, self.url_encoded_error)
+ self.assertRaises(MissingTokenError, parse_token_response, self.url_encoded_notoken)
+
+ scope_changes_recorded = []
+ def record_scope_change(sender, message, old, new):
+ scope_changes_recorded.append((message, old, new))
+
+ os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] = '1'
+ signals.scope_changed.connect(record_scope_change)
+ try:
+ token = parse_token_response(self.url_encoded_response, scope='aaa')
+ self.assertEqual(len(scope_changes_recorded), 1)
+ message, old, new = scope_changes_recorded[0]
+ for scope in new + old:
+ self.assertIn(scope, message)
+ self.assertEqual(old, ['aaa'])
+ self.assertEqual(set(new), {'abc', 'def'})
+ finally:
+ signals.scope_changed.disconnect(record_scope_change)
+ del os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE']
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/test_request_validator.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_request_validator.py
new file mode 100644
index 0000000000..7a8d06b668
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_request_validator.py
@@ -0,0 +1,51 @@
+# -*- coding: utf-8 -*-
+from oauthlib.oauth2 import RequestValidator
+
+from tests.unittest import TestCase
+
+
+class RequestValidatorTest(TestCase):
+
+ def test_method_contracts(self):
+ v = RequestValidator()
+ self.assertRaises(NotImplementedError, v.authenticate_client, 'r')
+ self.assertRaises(NotImplementedError, v.authenticate_client_id,
+ 'client_id', 'r')
+ self.assertRaises(NotImplementedError, v.confirm_redirect_uri,
+ 'client_id', 'code', 'redirect_uri', 'client', 'request')
+ self.assertRaises(NotImplementedError, v.get_default_redirect_uri,
+ 'client_id', 'request')
+ self.assertRaises(NotImplementedError, v.get_default_scopes,
+ 'client_id', 'request')
+ self.assertRaises(NotImplementedError, v.get_original_scopes,
+ 'refresh_token', 'request')
+ self.assertFalse(v.is_within_original_scope(
+ ['scope'], 'refresh_token', 'request'))
+ self.assertRaises(NotImplementedError, v.invalidate_authorization_code,
+ 'client_id', 'code', 'request')
+ self.assertRaises(NotImplementedError, v.save_authorization_code,
+ 'client_id', 'code', 'request')
+ self.assertRaises(NotImplementedError, v.save_bearer_token,
+ 'token', 'request')
+ self.assertRaises(NotImplementedError, v.validate_bearer_token,
+ 'token', 'scopes', 'request')
+ self.assertRaises(NotImplementedError, v.validate_client_id,
+ 'client_id', 'request')
+ self.assertRaises(NotImplementedError, v.validate_code,
+ 'client_id', 'code', 'client', 'request')
+ self.assertRaises(NotImplementedError, v.validate_grant_type,
+ 'client_id', 'grant_type', 'client', 'request')
+ self.assertRaises(NotImplementedError, v.validate_redirect_uri,
+ 'client_id', 'redirect_uri', 'request')
+ self.assertRaises(NotImplementedError, v.validate_refresh_token,
+ 'refresh_token', 'client', 'request')
+ self.assertRaises(NotImplementedError, v.validate_response_type,
+ 'client_id', 'response_type', 'client', 'request')
+ self.assertRaises(NotImplementedError, v.validate_scopes,
+ 'client_id', 'scopes', 'client', 'request')
+ self.assertRaises(NotImplementedError, v.validate_user,
+ 'username', 'password', 'client', 'request')
+ self.assertTrue(v.client_authentication_required('r'))
+ self.assertFalse(
+ v.is_origin_allowed('client_id', 'https://foo.bar', 'r')
+ )
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/test_server.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_server.py
new file mode 100644
index 0000000000..94af37e56b
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_server.py
@@ -0,0 +1,391 @@
+# -*- coding: utf-8 -*-
+import json
+from unittest import mock
+
+from oauthlib import common
+from oauthlib.oauth2.rfc6749 import errors, tokens
+from oauthlib.oauth2.rfc6749.endpoints import Server
+from oauthlib.oauth2.rfc6749.endpoints.authorization import (
+ AuthorizationEndpoint,
+)
+from oauthlib.oauth2.rfc6749.endpoints.resource import ResourceEndpoint
+from oauthlib.oauth2.rfc6749.endpoints.token import TokenEndpoint
+from oauthlib.oauth2.rfc6749.grant_types import (
+ AuthorizationCodeGrant, ClientCredentialsGrant, ImplicitGrant,
+ ResourceOwnerPasswordCredentialsGrant,
+)
+
+from tests.unittest import TestCase
+
+
+class AuthorizationEndpointTest(TestCase):
+
+ def setUp(self):
+ self.mock_validator = mock.MagicMock()
+ self.mock_validator.get_code_challenge.return_value = None
+ self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock())
+ auth_code = AuthorizationCodeGrant(
+ request_validator=self.mock_validator)
+ auth_code.save_authorization_code = mock.MagicMock()
+ implicit = ImplicitGrant(
+ request_validator=self.mock_validator)
+ implicit.save_token = mock.MagicMock()
+
+ response_types = {
+ 'code': auth_code,
+ 'token': implicit,
+ 'none': auth_code
+ }
+ self.expires_in = 1800
+ token = tokens.BearerToken(
+ self.mock_validator,
+ expires_in=self.expires_in
+ )
+ self.endpoint = AuthorizationEndpoint(
+ default_response_type='code',
+ default_token_type=token,
+ response_types=response_types
+ )
+
+ @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
+ def test_authorization_grant(self):
+ uri = 'http://i.b/l?response_type=code&client_id=me&scope=all+of+them&state=xyz'
+ uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
+ headers, body, status_code = self.endpoint.create_authorization_response(
+ uri, scopes=['all', 'of', 'them'])
+ self.assertIn('Location', headers)
+ self.assertURLEqual(headers['Location'], 'http://back.to/me?code=abc&state=xyz')
+
+ @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
+ def test_implicit_grant(self):
+ uri = 'http://i.b/l?response_type=token&client_id=me&scope=all+of+them&state=xyz'
+ uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
+ headers, body, status_code = self.endpoint.create_authorization_response(
+ uri, scopes=['all', 'of', 'them'])
+ self.assertIn('Location', headers)
+ self.assertURLEqual(headers['Location'], 'http://back.to/me#access_token=abc&expires_in=' + str(self.expires_in) + '&token_type=Bearer&state=xyz&scope=all+of+them', parse_fragment=True)
+
+ def test_none_grant(self):
+ uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them&state=xyz'
+ uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
+ headers, body, status_code = self.endpoint.create_authorization_response(
+ uri, scopes=['all', 'of', 'them'])
+ self.assertIn('Location', headers)
+ self.assertURLEqual(headers['Location'], 'http://back.to/me?state=xyz', parse_fragment=True)
+ self.assertIsNone(body)
+ self.assertEqual(status_code, 302)
+
+ # and without the state parameter
+ uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them'
+ uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
+ headers, body, status_code = self.endpoint.create_authorization_response(
+ uri, scopes=['all', 'of', 'them'])
+ self.assertIn('Location', headers)
+ self.assertURLEqual(headers['Location'], 'http://back.to/me', parse_fragment=True)
+ self.assertIsNone(body)
+ self.assertEqual(status_code, 302)
+
+ def test_missing_type(self):
+ uri = 'http://i.b/l?client_id=me&scope=all+of+them'
+ uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
+ self.mock_validator.validate_request = mock.MagicMock(
+ side_effect=errors.InvalidRequestError())
+ headers, body, status_code = self.endpoint.create_authorization_response(
+ uri, scopes=['all', 'of', 'them'])
+ self.assertIn('Location', headers)
+ self.assertURLEqual(headers['Location'], 'http://back.to/me?error=invalid_request&error_description=Missing+response_type+parameter.')
+
+ def test_invalid_type(self):
+ uri = 'http://i.b/l?response_type=invalid&client_id=me&scope=all+of+them'
+ uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
+ self.mock_validator.validate_request = mock.MagicMock(
+ side_effect=errors.UnsupportedResponseTypeError())
+ headers, body, status_code = self.endpoint.create_authorization_response(
+ uri, scopes=['all', 'of', 'them'])
+ self.assertIn('Location', headers)
+ self.assertURLEqual(headers['Location'], 'http://back.to/me?error=unsupported_response_type')
+
+
+class TokenEndpointTest(TestCase):
+
+ def setUp(self):
+ def set_user(request):
+ request.user = mock.MagicMock()
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked_client_id'
+ return True
+
+ self.mock_validator = mock.MagicMock()
+ self.mock_validator.authenticate_client.side_effect = set_user
+ self.mock_validator.get_code_challenge.return_value = None
+ self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock())
+ auth_code = AuthorizationCodeGrant(
+ request_validator=self.mock_validator)
+ password = ResourceOwnerPasswordCredentialsGrant(
+ request_validator=self.mock_validator)
+ client = ClientCredentialsGrant(
+ request_validator=self.mock_validator)
+ supported_types = {
+ 'authorization_code': auth_code,
+ 'password': password,
+ 'client_credentials': client,
+ }
+ self.expires_in = 1800
+ token = tokens.BearerToken(
+ self.mock_validator,
+ expires_in=self.expires_in
+ )
+ self.endpoint = TokenEndpoint(
+ 'authorization_code',
+ default_token_type=token,
+ grant_types=supported_types
+ )
+
+ @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
+ def test_authorization_grant(self):
+ body = 'grant_type=authorization_code&code=abc&scope=all+of+them'
+ headers, body, status_code = self.endpoint.create_token_response(
+ '', body=body)
+ token = {
+ 'token_type': 'Bearer',
+ 'expires_in': self.expires_in,
+ 'access_token': 'abc',
+ 'refresh_token': 'abc',
+ 'scope': 'all of them'
+ }
+ self.assertEqual(json.loads(body), token)
+
+ body = 'grant_type=authorization_code&code=abc'
+ headers, body, status_code = self.endpoint.create_token_response(
+ '', body=body)
+ token = {
+ 'token_type': 'Bearer',
+ 'expires_in': self.expires_in,
+ 'access_token': 'abc',
+ 'refresh_token': 'abc'
+ }
+ self.assertEqual(json.loads(body), token)
+
+ # try with additional custom variables
+ body = 'grant_type=authorization_code&code=abc&state=foobar'
+ headers, body, status_code = self.endpoint.create_token_response(
+ '', body=body)
+ self.assertEqual(json.loads(body), token)
+
+ @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
+ def test_password_grant(self):
+ body = 'grant_type=password&username=a&password=hello&scope=all+of+them'
+ headers, body, status_code = self.endpoint.create_token_response(
+ '', body=body)
+ token = {
+ 'token_type': 'Bearer',
+ 'expires_in': self.expires_in,
+ 'access_token': 'abc',
+ 'refresh_token': 'abc',
+ 'scope': 'all of them',
+ }
+ self.assertEqual(json.loads(body), token)
+
+ @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
+ def test_client_grant(self):
+ body = 'grant_type=client_credentials&scope=all+of+them'
+ headers, body, status_code = self.endpoint.create_token_response(
+ '', body=body)
+ token = {
+ 'token_type': 'Bearer',
+ 'expires_in': self.expires_in,
+ 'access_token': 'abc',
+ 'scope': 'all of them',
+ }
+ self.assertEqual(json.loads(body), token)
+
+ def test_missing_type(self):
+ _, body, _ = self.endpoint.create_token_response('', body='')
+ token = {'error': 'unsupported_grant_type'}
+ self.assertEqual(json.loads(body), token)
+
+ def test_invalid_type(self):
+ body = 'grant_type=invalid'
+ _, body, _ = self.endpoint.create_token_response('', body=body)
+ token = {'error': 'unsupported_grant_type'}
+ self.assertEqual(json.loads(body), token)
+
+
+class SignedTokenEndpointTest(TestCase):
+
+ def setUp(self):
+ self.expires_in = 1800
+
+ def set_user(request):
+ request.user = mock.MagicMock()
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked_client_id'
+ return True
+
+ self.mock_validator = mock.MagicMock()
+ self.mock_validator.get_code_challenge.return_value = None
+ self.mock_validator.authenticate_client.side_effect = set_user
+ self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock())
+
+ self.private_pem = """
+-----BEGIN RSA PRIVATE KEY-----
+MIIEpAIBAAKCAQEA6TtDhWGwzEOWZP6m/zHoZnAPLABfetvoMPmxPGjFjtDuMRPv
+EvI1sbixZBjBtdnc5rTtHUUQ25Am3JzwPRGo5laMGbj1pPyCPxlVi9LK82HQNX0B
+YK7tZtVfDHElQA7F4v3j9d3rad4O9/n+lyGIQ0tT7yQcBm2A8FEaP0bZYCLMjwMN
+WfaVLE8eXHyv+MfpNNLI9wttLxygKYM48I3NwsFuJgOa/KuodXaAmf8pJnx8t1Wn
+nxvaYXFiUn/TxmhM/qhemPa6+0nqq+aWV5eT7xn4K/ghLgNs09v6Yge0pmPl9Oz+
++bjJ+aKRnAmwCOY8/5U5EilAiUOeBoO9+8OXtwIDAQABAoIBAGFTTbXXMkPK4HN8
+oItVdDlrAanG7hECuz3UtFUVE3upS/xG6TjqweVLwRqYCh2ssDXFwjy4mXRGDzF4
+e/e/6s9Txlrlh/w1MtTJ6ZzTdcViR9RKOczysjZ7S5KRlI3KnGFAuWPcG2SuOWjZ
+dZfzcj1Crd/ZHajBAVFHRsCo/ATVNKbTRprFfb27xKpQ2BwH/GG781sLE3ZVNIhs
+aRRaED4622kI1E/WXws2qQMqbFKzo0m1tPbLb3Z89WgZJ/tRQwuDype1Vfm7k6oX
+xfbp3948qSe/yWKRlMoPkleji/WxPkSIalzWSAi9ziN/0Uzhe65FURgrfHL3XR1A
+B8UR+aECgYEA7NPQZV4cAikk02Hv65JgISofqV49P8MbLXk8sdnI1n7Mj10TgzU3
+lyQGDEX4hqvT0bTXe4KAOxQZx9wumu05ejfzhdtSsEm6ptGHyCdmYDQeV0C/pxDX
+JNCK8XgMku2370XG0AnyBCT7NGlgtDcNCQufcesF2gEuoKiXg6Zjo7sCgYEA/Bzs
+9fWGZZnSsMSBSW2OYbFuhF3Fne0HcxXQHipl0Rujc/9g0nccwqKGizn4fGOE7a8F
+usQgJoeGcinL7E9OEP/uQ9VX1C9RNVjIxP1O5/Guw1zjxQQYetOvbPhN2QhD1Ye7
+0TRKrW1BapcjwLpFQlVg1ZeTPOi5lv24W/wX9jUCgYEAkrMSX/hPuTbrTNVZ3L6r
+NV/2hN+PaTPeXei/pBuXwOaCqDurnpcUfFcgN/IP5LwDVd+Dq0pHTFFDNv45EFbq
+R77o5n3ZVsIVEMiyJ1XgoK8oLDw7e61+15smtjT69Piz+09pu+ytMcwGn4y3Dmsb
+dALzHYnL8iLRU0ubrz0ec4kCgYAJiVKRTzNBPptQom49h85d9ac3jJCAE8o3WTjh
+Gzt0uHXrWlqgO280EY/DTnMOyXjqwLcXxHlu26uDP/99tdY/IF8z46sJ1KxetzgI
+84f7kBHLRAU9m5UNeFpnZdEUB5MBTbwWAsNcYgiabpMkpCcghjg+fBhOsoLqqjhC
+CnwhjQKBgQDkv0QTdyBU84TE8J0XY3eLQwXbrvG2yD5A2ntN3PyxGEneX5WTJGMZ
+xJxwaFYQiDS3b9E7b8Q5dg8qa5Y1+epdhx3cuQAWPm+AoHKshDfbRve4txBDQAqh
+c6MxSWgsa+2Ld5SWSNbGtpPcmEM3Fl5ttMCNCKtNc0UE16oHwaPAIw==
+-----END RSA PRIVATE KEY-----
+ """
+
+ self.public_pem = """
+-----BEGIN PUBLIC KEY-----
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA6TtDhWGwzEOWZP6m/zHo
+ZnAPLABfetvoMPmxPGjFjtDuMRPvEvI1sbixZBjBtdnc5rTtHUUQ25Am3JzwPRGo
+5laMGbj1pPyCPxlVi9LK82HQNX0BYK7tZtVfDHElQA7F4v3j9d3rad4O9/n+lyGI
+Q0tT7yQcBm2A8FEaP0bZYCLMjwMNWfaVLE8eXHyv+MfpNNLI9wttLxygKYM48I3N
+wsFuJgOa/KuodXaAmf8pJnx8t1WnnxvaYXFiUn/TxmhM/qhemPa6+0nqq+aWV5eT
+7xn4K/ghLgNs09v6Yge0pmPl9Oz++bjJ+aKRnAmwCOY8/5U5EilAiUOeBoO9+8OX
+twIDAQAB
+-----END PUBLIC KEY-----
+ """
+
+ signed_token = tokens.signed_token_generator(self.private_pem,
+ user_id=123)
+ self.endpoint = Server(
+ self.mock_validator,
+ token_expires_in=self.expires_in,
+ token_generator=signed_token,
+ refresh_token_generator=tokens.random_token_generator
+ )
+
+ @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
+ def test_authorization_grant(self):
+ body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc&scope=all+of+them'
+ headers, body, status_code = self.endpoint.create_token_response(
+ '', body=body)
+ body = json.loads(body)
+ token = {
+ 'token_type': 'Bearer',
+ 'expires_in': self.expires_in,
+ 'access_token': body['access_token'],
+ 'refresh_token': 'abc',
+ 'scope': 'all of them'
+ }
+ self.assertEqual(body, token)
+
+ body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc'
+ headers, body, status_code = self.endpoint.create_token_response(
+ '', body=body)
+ body = json.loads(body)
+ token = {
+ 'token_type': 'Bearer',
+ 'expires_in': self.expires_in,
+ 'access_token': body['access_token'],
+ 'refresh_token': 'abc'
+ }
+ self.assertEqual(body, token)
+
+ # try with additional custom variables
+ body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc&state=foobar'
+ headers, body, status_code = self.endpoint.create_token_response(
+ '', body=body)
+ body = json.loads(body)
+ token = {
+ 'token_type': 'Bearer',
+ 'expires_in': self.expires_in,
+ 'access_token': body['access_token'],
+ 'refresh_token': 'abc'
+ }
+ self.assertEqual(body, token)
+
+ @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
+ def test_password_grant(self):
+ body = 'grant_type=password&username=a&password=hello&scope=all+of+them'
+ headers, body, status_code = self.endpoint.create_token_response(
+ '', body=body)
+ body = json.loads(body)
+ token = {
+ 'token_type': 'Bearer',
+ 'expires_in': self.expires_in,
+ 'access_token': body['access_token'],
+ 'refresh_token': 'abc',
+ 'scope': 'all of them',
+ }
+ self.assertEqual(body, token)
+
+ @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
+ def test_scopes_and_user_id_stored_in_access_token(self):
+ body = 'grant_type=password&username=a&password=hello&scope=all+of+them'
+ headers, body, status_code = self.endpoint.create_token_response(
+ '', body=body)
+
+ access_token = json.loads(body)['access_token']
+
+ claims = common.verify_signed_token(self.public_pem, access_token)
+
+ self.assertEqual(claims['scope'], 'all of them')
+ self.assertEqual(claims['user_id'], 123)
+
+ @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
+ def test_client_grant(self):
+ body = 'grant_type=client_credentials&scope=all+of+them'
+ headers, body, status_code = self.endpoint.create_token_response(
+ '', body=body)
+ body = json.loads(body)
+ token = {
+ 'token_type': 'Bearer',
+ 'expires_in': self.expires_in,
+ 'access_token': body['access_token'],
+ 'scope': 'all of them',
+ }
+ self.assertEqual(body, token)
+
+ def test_missing_type(self):
+ _, body, _ = self.endpoint.create_token_response('', body='client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&code=abc')
+ token = {'error': 'unsupported_grant_type'}
+ self.assertEqual(json.loads(body), token)
+
+ def test_invalid_type(self):
+ body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=invalid&code=abc'
+ _, body, _ = self.endpoint.create_token_response('', body=body)
+ token = {'error': 'unsupported_grant_type'}
+ self.assertEqual(json.loads(body), token)
+
+
+class ResourceEndpointTest(TestCase):
+
+ def setUp(self):
+ self.mock_validator = mock.MagicMock()
+ self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock())
+ token = tokens.BearerToken(request_validator=self.mock_validator)
+ self.endpoint = ResourceEndpoint(
+ default_token='Bearer',
+ token_types={'Bearer': token}
+ )
+
+ def test_defaults(self):
+ uri = 'http://a.b/path?some=query'
+ self.mock_validator.validate_bearer_token.return_value = False
+ valid, request = self.endpoint.verify_request(uri)
+ self.assertFalse(valid)
+ self.assertEqual(request.token_type, 'Bearer')
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/test_tokens.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_tokens.py
new file mode 100644
index 0000000000..fa6b1c092c
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_tokens.py
@@ -0,0 +1,170 @@
+from unittest import mock
+
+from oauthlib.common import Request
+from oauthlib.oauth2.rfc6749.tokens import (
+ BearerToken, prepare_bearer_body, prepare_bearer_headers,
+ prepare_bearer_uri, prepare_mac_header,
+)
+
+from tests.unittest import TestCase
+
+
+class TokenTest(TestCase):
+
+ # MAC without body/payload or extension
+ mac_plain = {
+ 'token': 'h480djs93hd8',
+ 'uri': 'http://example.com/resource/1?b=1&a=2',
+ 'key': '489dks293j39',
+ 'http_method': 'GET',
+ 'nonce': '264095:dj83hs9s',
+ 'hash_algorithm': 'hmac-sha-1'
+ }
+ auth_plain = {
+ 'Authorization': 'MAC id="h480djs93hd8", nonce="264095:dj83hs9s",'
+ ' mac="SLDJd4mg43cjQfElUs3Qub4L6xE="'
+ }
+
+ # MAC with body/payload, no extension
+ mac_body = {
+ 'token': 'jd93dh9dh39D',
+ 'uri': 'http://example.com/request',
+ 'key': '8yfrufh348h',
+ 'http_method': 'POST',
+ 'nonce': '273156:di3hvdf8',
+ 'hash_algorithm': 'hmac-sha-1',
+ 'body': 'hello=world%21'
+ }
+ auth_body = {
+ 'Authorization': 'MAC id="jd93dh9dh39D", nonce="273156:di3hvdf8",'
+ ' bodyhash="k9kbtCIy0CkI3/FEfpS/oIDjk6k=", mac="W7bdMZbv9UWOTadASIQHagZyirA="'
+ }
+
+ # MAC with body/payload and extension
+ mac_both = {
+ 'token': 'h480djs93hd8',
+ 'uri': 'http://example.com/request?b5=%3D%253D&a3=a&c%40=&a2=r%20b&c2&a3=2+q',
+ 'key': '489dks293j39',
+ 'http_method': 'GET',
+ 'nonce': '264095:7d8f3e4a',
+ 'hash_algorithm': 'hmac-sha-1',
+ 'body': 'Hello World!',
+ 'ext': 'a,b,c'
+ }
+ auth_both = {
+ 'Authorization': 'MAC id="h480djs93hd8", nonce="264095:7d8f3e4a",'
+ ' bodyhash="Lve95gjOVATpfV8EL5X4nxwjKHE=", ext="a,b,c",'
+ ' mac="Z3C2DojEopRDIC88/imW8Ez853g="'
+ }
+
+ # Bearer
+ token = 'vF9dft4qmT'
+ uri = 'http://server.example.com/resource'
+ bearer_headers = {
+ 'Authorization': 'Bearer vF9dft4qmT'
+ }
+ valid_bearer_header_lowercase = {"Authorization": "bearer vF9dft4qmT"}
+ fake_bearer_headers = [
+ {'Authorization': 'Beaver vF9dft4qmT'},
+ {'Authorization': 'BeavervF9dft4qmT'},
+ {'Authorization': 'Beaver vF9dft4qmT'},
+ {'Authorization': 'BearerF9dft4qmT'},
+ {'Authorization': 'Bearer vF9d ft4qmT'},
+ ]
+ valid_header_with_multiple_spaces = {'Authorization': 'Bearer vF9dft4qmT'}
+ bearer_body = 'access_token=vF9dft4qmT'
+ bearer_uri = 'http://server.example.com/resource?access_token=vF9dft4qmT'
+
+ def _mocked_validate_bearer_token(self, token, scopes, request):
+ if not token:
+ return False
+ return True
+
+ def test_prepare_mac_header(self):
+ """Verify mac signatures correctness
+
+ TODO: verify hmac-sha-256
+ """
+ self.assertEqual(prepare_mac_header(**self.mac_plain), self.auth_plain)
+ self.assertEqual(prepare_mac_header(**self.mac_body), self.auth_body)
+ self.assertEqual(prepare_mac_header(**self.mac_both), self.auth_both)
+
+ def test_prepare_bearer_request(self):
+ """Verify proper addition of bearer tokens to requests.
+
+ They may be represented as query components in body or URI or
+ in a Bearer authorization header.
+ """
+ self.assertEqual(prepare_bearer_headers(self.token), self.bearer_headers)
+ self.assertEqual(prepare_bearer_body(self.token), self.bearer_body)
+ self.assertEqual(prepare_bearer_uri(self.token, uri=self.uri), self.bearer_uri)
+
+ def test_valid_bearer_is_validated(self):
+ request_validator = mock.MagicMock()
+ request_validator.validate_bearer_token = self._mocked_validate_bearer_token
+
+ request = Request("/", headers=self.bearer_headers)
+ result = BearerToken(request_validator=request_validator).validate_request(
+ request
+ )
+ self.assertTrue(result)
+
+ def test_lowercase_bearer_is_validated(self):
+ request_validator = mock.MagicMock()
+ request_validator.validate_bearer_token = self._mocked_validate_bearer_token
+
+ request = Request("/", headers=self.valid_bearer_header_lowercase)
+ result = BearerToken(request_validator=request_validator).validate_request(
+ request
+ )
+ self.assertTrue(result)
+
+ def test_fake_bearer_is_not_validated(self):
+ request_validator = mock.MagicMock()
+ request_validator.validate_bearer_token = self._mocked_validate_bearer_token
+
+ for fake_header in self.fake_bearer_headers:
+ request = Request("/", headers=fake_header)
+ result = BearerToken(request_validator=request_validator).validate_request(
+ request
+ )
+
+ self.assertFalse(result)
+
+ def test_header_with_multispaces_is_validated(self):
+ request_validator = mock.MagicMock()
+ request_validator.validate_bearer_token = self._mocked_validate_bearer_token
+
+ request = Request("/", headers=self.valid_header_with_multiple_spaces)
+ result = BearerToken(request_validator=request_validator).validate_request(
+ request
+ )
+
+ self.assertTrue(result)
+
+ def test_estimate_type(self):
+ request_validator = mock.MagicMock()
+ request_validator.validate_bearer_token = self._mocked_validate_bearer_token
+ request = Request("/", headers=self.bearer_headers)
+ result = BearerToken(request_validator=request_validator).estimate_type(request)
+ self.assertEqual(result, 9)
+
+ def test_estimate_type_with_fake_header_returns_type_0(self):
+ request_validator = mock.MagicMock()
+ request_validator.validate_bearer_token = self._mocked_validate_bearer_token
+
+ for fake_header in self.fake_bearer_headers:
+ request = Request("/", headers=fake_header)
+ result = BearerToken(request_validator=request_validator).estimate_type(
+ request
+ )
+
+ if (
+ fake_header["Authorization"].count(" ") == 2
+ and fake_header["Authorization"].split()[0] == "Bearer"
+ ):
+ # If we're dealing with the header containing 2 spaces, it will be recognized
+ # as a Bearer valid header, the token itself will be invalid by the way.
+ self.assertEqual(result, 9)
+ else:
+ self.assertEqual(result, 0)
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/test_utils.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_utils.py
new file mode 100644
index 0000000000..3299591926
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_utils.py
@@ -0,0 +1,100 @@
+import datetime
+import os
+
+from oauthlib.oauth2.rfc6749.utils import (
+ escape, generate_age, host_from_uri, is_secure_transport, list_to_scope,
+ params_from_uri, scope_to_list,
+)
+
+from tests.unittest import TestCase
+
+
+class ScopeObject:
+ """
+ Fixture for testing list_to_scope()/scope_to_list() with objects other
+ than regular strings.
+ """
+ def __init__(self, scope):
+ self.scope = scope
+
+ def __str__(self):
+ return self.scope
+
+
+class UtilsTests(TestCase):
+
+ def test_escape(self):
+ """Assert that we are only escaping unicode"""
+ self.assertRaises(ValueError, escape, b"I am a string type. Not a unicode type.")
+ self.assertEqual(escape("I am a unicode type."), "I%20am%20a%20unicode%20type.")
+
+ def test_host_from_uri(self):
+ """Test if hosts and ports are properly extracted from URIs.
+
+ This should be done according to the MAC Authentication spec.
+ Defaults ports should be provided when none is present in the URI.
+ """
+ self.assertEqual(host_from_uri('http://a.b-c.com:8080'), ('a.b-c.com', '8080'))
+ self.assertEqual(host_from_uri('https://a.b.com:8080'), ('a.b.com', '8080'))
+ self.assertEqual(host_from_uri('http://www.example.com'), ('www.example.com', '80'))
+ self.assertEqual(host_from_uri('https://www.example.com'), ('www.example.com', '443'))
+
+ def test_is_secure_transport(self):
+ """Test check secure uri."""
+ if 'OAUTHLIB_INSECURE_TRANSPORT' in os.environ:
+ del os.environ['OAUTHLIB_INSECURE_TRANSPORT']
+
+ self.assertTrue(is_secure_transport('https://example.com'))
+ self.assertFalse(is_secure_transport('http://example.com'))
+
+ os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
+ self.assertTrue(is_secure_transport('http://example.com'))
+ del os.environ['OAUTHLIB_INSECURE_TRANSPORT']
+
+ def test_params_from_uri(self):
+ self.assertEqual(params_from_uri('http://i.b/?foo=bar&g&scope=a+d'),
+ {'foo': 'bar', 'g': '', 'scope': ['a', 'd']})
+
+ def test_generate_age(self):
+ issue_time = datetime.datetime.now() - datetime.timedelta(
+ days=3, minutes=1, seconds=4)
+ self.assertGreater(float(generate_age(issue_time)), 259263.0)
+
+ def test_list_to_scope(self):
+ expected = 'foo bar baz'
+
+ string_list = ['foo', 'bar', 'baz']
+ self.assertEqual(list_to_scope(string_list), expected)
+
+ string_tuple = ('foo', 'bar', 'baz')
+ self.assertEqual(list_to_scope(string_tuple), expected)
+
+ obj_list = [ScopeObject('foo'), ScopeObject('bar'), ScopeObject('baz')]
+ self.assertEqual(list_to_scope(obj_list), expected)
+
+ set_list = set(string_list)
+ set_scope = list_to_scope(set_list)
+ assert len(set_scope.split(' ')) == 3
+ for x in string_list:
+ assert x in set_scope
+
+ self.assertRaises(ValueError, list_to_scope, object())
+
+ def test_scope_to_list(self):
+ expected = ['foo', 'bar', 'baz']
+
+ string_scopes = 'foo bar baz '
+ self.assertEqual(scope_to_list(string_scopes), expected)
+
+ string_list_scopes = ['foo', 'bar', 'baz']
+ self.assertEqual(scope_to_list(string_list_scopes), expected)
+
+ tuple_list_scopes = ('foo', 'bar', 'baz')
+ self.assertEqual(scope_to_list(tuple_list_scopes), expected)
+
+ obj_list_scopes = [ScopeObject('foo'), ScopeObject('bar'), ScopeObject('baz')]
+ self.assertEqual(scope_to_list(obj_list_scopes), expected)
+
+ set_list_scopes = set(string_list_scopes)
+ set_list = scope_to_list(set_list_scopes)
+ self.assertEqual(sorted(set_list), sorted(string_list_scopes))
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc8628/__init__.py b/contrib/python/oauthlib/tests/oauth2/rfc8628/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc8628/__init__.py
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc8628/clients/__init__.py b/contrib/python/oauthlib/tests/oauth2/rfc8628/clients/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc8628/clients/__init__.py
diff --git a/contrib/python/oauthlib/tests/oauth2/rfc8628/clients/test_device.py b/contrib/python/oauthlib/tests/oauth2/rfc8628/clients/test_device.py
new file mode 100644
index 0000000000..725dea2a92
--- /dev/null
+++ b/contrib/python/oauthlib/tests/oauth2/rfc8628/clients/test_device.py
@@ -0,0 +1,63 @@
+import os
+from unittest.mock import patch
+
+from oauthlib import signals
+from oauthlib.oauth2 import DeviceClient
+
+from tests.unittest import TestCase
+
+
+class DeviceClientTest(TestCase):
+
+ client_id = "someclientid"
+ kwargs = {
+ "some": "providers",
+ "require": "extra arguments"
+ }
+
+ client_secret = "asecret"
+
+ device_code = "somedevicecode"
+
+ scope = ["profile", "email"]
+
+ body = "not=empty"
+
+ body_up = "not=empty&grant_type=urn:ietf:params:oauth:grant-type:device_code"
+ body_code = body_up + "&device_code=somedevicecode"
+ body_kwargs = body_code + "&some=providers&require=extra+arguments"
+
+ uri = "https://example.com/path?query=world"
+ uri_id = uri + "&client_id=" + client_id
+ uri_grant = uri_id + "&grant_type=urn:ietf:params:oauth:grant-type:device_code"
+ uri_secret = uri_grant + "&client_secret=asecret"
+ uri_scope = uri_secret + "&scope=profile+email"
+
+ def test_request_body(self):
+ client = DeviceClient(self.client_id)
+
+ # Basic, no extra arguments
+ body = client.prepare_request_body(self.device_code, body=self.body)
+ self.assertFormBodyEqual(body, self.body_code)
+
+ rclient = DeviceClient(self.client_id)
+ body = rclient.prepare_request_body(self.device_code, body=self.body)
+ self.assertFormBodyEqual(body, self.body_code)
+
+ # With extra parameters
+ body = client.prepare_request_body(
+ self.device_code, body=self.body, **self.kwargs)
+ self.assertFormBodyEqual(body, self.body_kwargs)
+
+ def test_request_uri(self):
+ client = DeviceClient(self.client_id)
+
+ uri = client.prepare_request_uri(self.uri)
+ self.assertURLEqual(uri, self.uri_grant)
+
+ client = DeviceClient(self.client_id, client_secret=self.client_secret)
+ uri = client.prepare_request_uri(self.uri)
+ self.assertURLEqual(uri, self.uri_secret)
+
+ uri = client.prepare_request_uri(self.uri, scope=self.scope)
+ self.assertURLEqual(uri, self.uri_scope)