diff options
author | robot-piglet <[email protected]> | 2025-09-05 12:45:40 +0300 |
---|---|---|
committer | robot-piglet <[email protected]> | 2025-09-05 13:32:17 +0300 |
commit | 22568739dbd9bf62b94abb305c082a53b21e5aef (patch) | |
tree | 7833f8119b1292867b256356e25ee8593dccac75 /contrib/python/oauthlib/tests | |
parent | 2db41830cd5fce47e0bac5493c1e1472908cc4c5 (diff) |
Intermediate changes
commit_hash:39273c986cf85d42ec97d1388aaaf324a02a04bc
Diffstat (limited to 'contrib/python/oauthlib/tests')
21 files changed, 616 insertions, 78 deletions
diff --git a/contrib/python/oauthlib/tests/oauth1/rfc5849/endpoints/test_base.py b/contrib/python/oauthlib/tests/oauth1/rfc5849/endpoints/test_base.py index e87f359baa4..792aaccf4a9 100644 --- a/contrib/python/oauthlib/tests/oauth1/rfc5849/endpoints/test_base.py +++ b/contrib/python/oauthlib/tests/oauth1/rfc5849/endpoints/test_base.py @@ -304,7 +304,7 @@ class ClientValidator(RequestValidator): def validate_timestamp_and_nonce(self, client_key, timestamp, nonce, request, request_token=None, access_token=None): resource_owner_key = request_token if request_token else access_token - return not (client_key, nonce, timestamp, resource_owner_key) in self.nonces + return (client_key, nonce, timestamp, resource_owner_key) not in self.nonces def validate_client_key(self, client_key): return client_key in self.clients diff --git a/contrib/python/oauthlib/tests/oauth1/rfc5849/test_signatures.py b/contrib/python/oauthlib/tests/oauth1/rfc5849/test_signatures.py index 2d4735eafd2..0b1dc0046bd 100644 --- a/contrib/python/oauthlib/tests/oauth1/rfc5849/test_signatures.py +++ b/contrib/python/oauthlib/tests/oauth1/rfc5849/test_signatures.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +from jwt import InvalidKeyError from oauthlib.oauth1.rfc5849.signature import ( base_string_uri, collect_parameters, normalize_parameters, sign_hmac_sha1_with_client, sign_hmac_sha256_with_client, @@ -82,12 +83,13 @@ class SignatureTests(TestCase): # ==== Example test vector ======================================= - eg_signature_base_string =\ - 'POST&http%3A%2F%2Fexample.com%2Frequest&a2%3Dr%2520b%26a3%3D2%2520q' \ - '%26a3%3Da%26b5%3D%253D%25253D%26c%2540%3D%26c2%3D%26oauth_consumer_' \ - 'key%3D9djdj82h48djs9d2%26oauth_nonce%3D7d8f3e4a%26oauth_signature_m' \ - 'ethod%3DHMAC-SHA1%26oauth_timestamp%3D137131201%26oauth_token%3Dkkk' \ + eg_signature_base_string = ( + 'POST&http%3A%2F%2Fexample.com%2Frequest&a2%3Dr%2520b%26a3%3D2%2520q' + '%26a3%3Da%26b5%3D%253D%25253D%26c%2540%3D%26c2%3D%26oauth_consumer_' + 'key%3D9djdj82h48djs9d2%26oauth_nonce%3D7d8f3e4a%26oauth_signature_m' + 'ethod%3DHMAC-SHA1%26oauth_timestamp%3D137131201%26oauth_token%3Dkkk' '9d7dh3k39sjv7' + ) # The _signature base string_ above is copied from the end of # RFC 5849 section 3.4.1.1. @@ -101,11 +103,11 @@ class SignatureTests(TestCase): eg_base_string_uri = 'http://example.com/request' - eg_normalized_parameters =\ - 'a2=r%20b&a3=2%20q&a3=a&b5=%3D%253D&c%40=&c2=&oauth_consumer_key=9dj' \ - 'dj82h48djs9d2&oauth_nonce=7d8f3e4a&oauth_signature_method=HMAC-SHA1' \ + eg_normalized_parameters = ( + 'a2=r%20b&a3=2%20q&a3=a&b5=%3D%253D&c%40=&c2=&oauth_consumer_key=9dj' + 'dj82h48djs9d2&oauth_nonce=7d8f3e4a&oauth_signature_method=HMAC-SHA1' '&oauth_timestamp=137131201&oauth_token=kkk9d7dh3k39sjv7' - + ) # The above _normalized parameters_ corresponds to the parameters below. # # The parameters below is copied from the table at the end of @@ -133,12 +135,12 @@ class SignatureTests(TestCase): eg_body = 'c2&a3=2+q' - eg_authorization_header =\ - 'OAuth realm="Example", oauth_consumer_key="9djdj82h48djs9d2",' \ - ' oauth_token="kkk9d7dh3k39sjv7", oauth_signature_method="HMAC-SHA1",' \ - ' oauth_timestamp="137131201", oauth_nonce="7d8f3e4a",' \ + eg_authorization_header = ( + 'OAuth realm="Example", oauth_consumer_key="9djdj82h48djs9d2",' + ' oauth_token="kkk9d7dh3k39sjv7", oauth_signature_method="HMAC-SHA1",' + ' oauth_timestamp="137131201", oauth_nonce="7d8f3e4a",' ' oauth_signature="djosJKDKJSD8743243%2Fjdk33klY%3D"' - + ) # ==== Signature base string calculating function tests ========== def test_signature_base_string(self): @@ -465,10 +467,10 @@ class SignatureTests(TestCase): expected_signature_hmac_sha256 = \ 'wdfdHUKXHbOnOGZP8WFAWMSAmWzN3EVBWWgXGlC/Eo4=' - expected_signature_hmac_sha512 = \ - 'u/vlyZFDxOWOZ9UUXwRBJHvq8/T4jCA74ocRmn2ECnjUBTAeJiZIRU8hDTjS88Tz' \ + expected_signature_hmac_sha512 = ( + 'u/vlyZFDxOWOZ9UUXwRBJHvq8/T4jCA74ocRmn2ECnjUBTAeJiZIRU8hDTjS88Tz' '1fGONffMpdZxUkUTW3k1kg==' - + ) def test_sign_hmac_sha1_with_client(self): """ Test sign and verify with HMAC-SHA1. @@ -632,21 +634,21 @@ GLYT3Jw1Lfb1bbuck9Y0JsRJO7uydWUbxXyZ+8YaDfE2NMw7sh2vAgMBAAE= # Note: the "echo -n" is needed to remove the last newline character, which # most text editors will add. - expected_signature_rsa_sha1 = \ - 'mFY2KOEnlYWsTvUA+5kxuBIcvBYXu+ljw9ttVJQxKduMueGSVPCB1tK1PlqVLK738' \ - 'HK0t19ecBJfb6rMxUwrriw+MlBO+jpojkZIWccw1J4cAb4qu4M81DbpUAq4j/1w/Q' \ + expected_signature_rsa_sha1 = ( + 'mFY2KOEnlYWsTvUA+5kxuBIcvBYXu+ljw9ttVJQxKduMueGSVPCB1tK1PlqVLK738' + 'HK0t19ecBJfb6rMxUwrriw+MlBO+jpojkZIWccw1J4cAb4qu4M81DbpUAq4j/1w/Q' 'yTR4TWCODlEfN7Zfgy8+pf+TjiXfIwRC1jEWbuL1E=' - - expected_signature_rsa_sha256 = \ - 'jqKl6m0WS69tiVJV8ZQ6aQEfJqISoZkiPBXRv6Al2+iFSaDpfeXjYm+Hbx6m1azR' \ - 'drZ/35PM3cvuid3LwW/siAkzb0xQcGnTyAPH8YcGWzmnKGY7LsB7fkqThchNxvRK' \ + ) + expected_signature_rsa_sha256 = ( + 'jqKl6m0WS69tiVJV8ZQ6aQEfJqISoZkiPBXRv6Al2+iFSaDpfeXjYm+Hbx6m1azR' + 'drZ/35PM3cvuid3LwW/siAkzb0xQcGnTyAPH8YcGWzmnKGY7LsB7fkqThchNxvRK' '/N7s9M1WMnfZZ+1dQbbwtTs1TG1+iexUcV7r3M7Heec=' - - expected_signature_rsa_sha512 = \ - 'jL1CnjlsNd25qoZVHZ2oJft47IRYTjpF5CvCUjL3LY0NTnbEeVhE4amWXUFBe9GL' \ - 'DWdUh/79ZWNOrCirBFIP26cHLApjYdt4ZG7EVK0/GubS2v8wT1QPRsog8zyiMZkm' \ + ) + expected_signature_rsa_sha512 = ( + 'jL1CnjlsNd25qoZVHZ2oJft47IRYTjpF5CvCUjL3LY0NTnbEeVhE4amWXUFBe9GL' + 'DWdUh/79ZWNOrCirBFIP26cHLApjYdt4ZG7EVK0/GubS2v8wT1QPRsog8zyiMZkm' 'g4JXdWCGXG8YRvRJTg+QKhXuXwS6TcMNakrgzgFIVhA=' - + ) def test_sign_rsa_sha1_with_client(self): """ Test sign and verify with RSA-SHA1. @@ -764,12 +766,17 @@ MmgDHR2tt8KeYTSgfU+BAkBcaVF91EQ7VXhvyABNYjeYP7lU7orOgdWMa/zbLXSU # Signing needs a private key - for bad_value in [None, '', 'foobar']: + for bad_value in [None, '']: self.assertRaises(ValueError, sign_rsa_sha1_with_client, self.eg_signature_base_string, MockClient(rsa_key=bad_value)) + self.assertRaises((InvalidKeyError, ValueError), + sign_rsa_sha1_with_client, + self.eg_signature_base_string, + MockClient(rsa_key='foobar')) + self.assertRaises(AttributeError, sign_rsa_sha1_with_client, self.eg_signature_base_string, diff --git a/contrib/python/oauthlib/tests/oauth1/rfc5849/test_utils.py b/contrib/python/oauthlib/tests/oauth1/rfc5849/test_utils.py index 013c71a910d..2212890819f 100644 --- a/contrib/python/oauthlib/tests/oauth1/rfc5849/test_utils.py +++ b/contrib/python/oauthlib/tests/oauth1/rfc5849/test_utils.py @@ -53,11 +53,11 @@ class UtilsTests(TestCase): # The following is an isolated test function used to test the filter_params decorator. @filter_params def special_test_function(params, realm=None): - """ I am a special test function """ + """I am a special test function""" return 'OAuth ' + ','.join(['='.join([k, v]) for k, v in params]) # check that the docstring got through - self.assertEqual(special_test_function.__doc__, " I am a special test function ") + self.assertEqual(special_test_function.__doc__, "I am a special test function") # Check that the decorator filtering works as per design. # Any param that does not start with 'oauth' diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_base.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_base.py index 70a22834c31..b0970f2d46a 100644 --- a/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_base.py +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_base.py @@ -1,5 +1,7 @@ # -*- coding: utf-8 -*- import datetime +import json +from unittest.mock import patch from oauthlib import common from oauthlib.oauth2 import Client, InsecureTransportError, TokenExpiredError @@ -302,31 +304,6 @@ class ClientTest(TestCase): self.assertEqual(h, {'Content-Type': 'application/x-www-form-urlencoded'}) self.assertFormBodyEqual(b, 'grant_type=refresh_token&scope={}&refresh_token={}'.format(scope, token)) - def test_parse_token_response_invalid_expires_at(self): - token_json = ('{ "access_token":"2YotnFZFEjr1zCsicMWpAA",' - ' "token_type":"example",' - ' "expires_at":"2006-01-02T15:04:05Z",' - ' "scope":"/profile",' - ' "example_parameter":"example_value"}') - token = { - "access_token": "2YotnFZFEjr1zCsicMWpAA", - "token_type": "example", - "expires_at": "2006-01-02T15:04:05Z", - "scope": ["/profile"], - "example_parameter": "example_value" - } - - client = Client(self.client_id) - - # Parse code and state - response = client.parse_request_body_response(token_json, scope=["/profile"]) - self.assertEqual(response, token) - self.assertEqual(None, client._expires_at) - self.assertEqual(client.access_token, response.get("access_token")) - self.assertEqual(client.refresh_token, response.get("refresh_token")) - self.assertEqual(client.token_type, response.get("token_type")) - - def test_create_code_verifier_min_length(self): client = Client(self.client_id) length = 43 @@ -339,6 +316,12 @@ class ClientTest(TestCase): code_verifier = client.create_code_verifier(length=length) self.assertEqual(client.code_verifier, code_verifier) + def test_create_code_verifier_length(self): + client = Client(self.client_id) + length = 96 + code_verifier = client.create_code_verifier(length=length) + self.assertEqual(len(code_verifier), length) + def test_create_code_challenge_plain(self): client = Client(self.client_id) code_verifier = client.create_code_verifier(length=128) @@ -353,3 +336,43 @@ class ClientTest(TestCase): code_verifier = client.create_code_verifier(length=128) code_challenge_s256 = client.create_code_challenge(code_verifier=code_verifier, code_challenge_method='S256') self.assertEqual(code_challenge_s256, client.code_challenge) + + def test_parse_token_response_expires_at_types(self): + for title, fieldjson, expected, generated in [ + ('int', 1661185148, 1661185148, 1661185148), + ('float', 1661185148.6437678, 1661185148.6437678, 1661185148.6437678), + ('str', "\"2006-01-02T15:04:05Z\"", "2006-01-02T15:04:05Z", None), + ('str-as-int', "\"1661185148\"", 1661185148, 1661185148), + ('str-as-float', "\"1661185148.42\"", 1661185148.42, 1661185148.42), + ]: + with self.subTest(msg=title): + token_json = ('{{ "access_token":"2YotnFZFEjr1zCsicMWpAA",' + ' "token_type":"example",' + ' "expires_at":{expires_at},' + ' "scope":"/profile",' + ' "example_parameter":"example_value"}}'.format(expires_at=fieldjson)) + + client = Client(self.client_id) + response = client.parse_request_body_response(token_json, scope=["/profile"]) + + self.assertEqual(response['expires_at'], expected, "response attribute wrong") + self.assertEqual(client.expires_at, expected, "client attribute wrong") + if generated: + self.assertEqual(client._expires_at, generated, "internal expiration wrong") + + @patch('time.time') + def test_parse_token_response_generated_expires_at_is_int(self, t): + t.return_value = 1661185148.6437678 + expected_expires_at = round(t.return_value) + 3600 + token_json = ('{ "access_token":"2YotnFZFEjr1zCsicMWpAA",' + ' "token_type":"example",' + ' "expires_in":3600,' + ' "scope":"/profile",' + ' "example_parameter":"example_value"}') + + client = Client(self.client_id) + + response = client.parse_request_body_response(token_json, scope=["/profile"]) + + self.assertEqual(response['expires_at'], expected_expires_at) + self.assertEqual(client._expires_at, expected_expires_at) diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_service_application.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_service_application.py index b97d8554edc..84361d8bc26 100644 --- a/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_service_application.py +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_service_application.py @@ -166,7 +166,7 @@ mfvGGg3xNjTMO7IdrwIDAQAB @patch('time.time') def test_parse_token_response(self, t): t.return_value = time() - self.token['expires_at'] = self.token['expires_in'] + t.return_value + self.token['expires_at'] = self.token['expires_in'] + round(t.return_value) client = ServiceApplicationClient(self.client_id) diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_web_application.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_web_application.py index 7a711215125..f9a4c9d115c 100644 --- a/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_web_application.py +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/clients/test_web_application.py @@ -252,18 +252,34 @@ class WebApplicationClientTest(TestCase): self.assertEqual(r4b_params['client_id'], self.client_id) # scenario Warnings - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("always") # catch all - - # warning1 - raise a DeprecationWarning if a `client_id` is submitted - rWarnings1 = client.prepare_request_body(client_id=self.client_id) - self.assertEqual(len(w), 1) - self.assertIsInstance(w[0].message, DeprecationWarning) - + # warning1 - raise a DeprecationWarning if a `client_id` is submitted + with self.assertWarns(DeprecationWarning): + client.prepare_request_body(client_id=self.client_id) # testing the exact warning message in Python2&Python3 is a pain # scenario Exceptions # exception1 - raise a ValueError if the a different `client_id` is submitted - with self.assertRaises(ValueError) as cm: + with self.assertWarns(DeprecationWarning), self.assertRaises(ValueError): client.prepare_request_body(client_id='different_client_id') # testing the exact exception message in Python2&Python3 is a pain + + def test_expires_in_as_str(self): + """ + see regression issue #906 + """ + + client = WebApplicationClient( + client_id="dummy", + token={"access_token": "xyz", "expires_in": "3600"} + ) + self.assertIsNotNone(client) + client = WebApplicationClient( + client_id="dummy", + token={"access_token": "xyz", "expires_in": 3600} + ) + self.assertIsNotNone(client) + client = WebApplicationClient( + client_id="dummy", + token={"access_token": "xyz", "expires_in": 3600.12} + ) + self.assertIsNotNone(client) diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_metadata.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_metadata.py index 1f5b912100c..c36d94bfcf7 100644 --- a/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_metadata.py +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/endpoints/test_metadata.py @@ -20,8 +20,8 @@ class MetadataEndpointTest(TestCase): "introspection_endpoint": "https://foo.bar/introspect", "token_endpoint": "https://foo.bar/token" } - from oauthlib.oauth2 import Server as OAuth2Server - from oauthlib.openid import Server as OpenIDServer + from oauthlib.oauth2 import Server as OAuth2Server # noqa: PLC0415 + from oauthlib.openid import Server as OpenIDServer # noqa: PLC0415 endpoint = OAuth2Server(None) metadata = MetadataEndpoint([endpoint], default_claims) @@ -98,6 +98,7 @@ class MetadataEndpointTest(TestCase): "scopes_supported": ["email", "profile"], "grant_types_supported": [ "authorization_code", + "urn:ietf:params:oauth:grant-type:device_code", "password", "client_credentials", "refresh_token", @@ -130,8 +131,8 @@ class MetadataEndpointTest(TestCase): } def sort_list(claims): - for k in claims.keys(): - claims[k] = sorted(claims[k]) + for key, value in claims.items(): + claims[key] = sorted(value) sort_list(metadata.claims) sort_list(expected_claims) diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_refresh_token.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_refresh_token.py index 581f2a4d6ab..f963444a145 100644 --- a/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_refresh_token.py +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/grant_types/test_refresh_token.py @@ -130,6 +130,22 @@ class RefreshTokenGrantTest(TestCase): self.request) self.mock_validator.client_authentication_required.assert_called_once_with(self.request) + + def test_authentication_required_populate_client_id(self): + """ + Make sure that request.client_id is populated from + request.client.client_id if None. + + """ + self.mock_validator.client_authentication_required.return_value = True + self.mock_validator.authenticate_client.return_value = True + # self.mock_validator.authenticate_client_id.return_value = False + # self.request.code = 'waffles' + self.request.client_id = None + self.request.client.client_id = 'foobar' + self.auth.validate_token_request(self.request) + self.request.client_id = 'foobar' + def test_invalid_grant_type(self): self.request.grant_type = 'wrong_type' self.assertRaises(errors.UnsupportedGrantTypeError, @@ -168,7 +184,7 @@ class RefreshTokenGrantTest(TestCase): # all ok but without request.scope del self.request.scope self.auth.validate_token_request(self.request) - self.assertEqual(self.request.scopes, 'foo bar baz'.split()) + self.assertEqual(self.request.scopes, ['foo', 'bar', 'baz']) # CORS diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/test_parameters.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_parameters.py index cd8c9e952ba..bd8a8b61ac1 100644 --- a/contrib/python/oauthlib/tests/oauth2/rfc6749/test_parameters.py +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_parameters.py @@ -302,3 +302,30 @@ class ParameterTests(TestCase): finally: signals.scope_changed.disconnect(record_scope_change) del os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] + + + def test_parse_expires(self): + for title, arg, expected in [ + ('none', (None, None), (None, None, None)), + ('expires_in only', (3600, None), (3600, 4600, 4600)), + ('expires_in and expires_at', (3600, 200), (3600, 200, 200)), + ('expires_in and expires_at float', (3600, 200.42), (3600, 200.42, 200.42)), + ('expires_in and expires_at str-int', (3600, "200"), (3600, 200, 200)), + ('expires_in and expires_at str-float', (3600, "200.42"), (3600, 200.42, 200.42)), + ('expires_in float only', (3600.12, None), (3600, 4600, 4600)), + ('expires_in float and expires_at', (3600.12, 200), (3600, 200, 200)), + ('expires_in float and expires_at float', (3600.12, 200.42), (3600, 200.42, 200.42)), + ('expires_in float and expires_at str-int', (3600.12, "200"), (3600, 200, 200)), + ('expires_in float and expires_at str-float', (3600.12, "200.42"), (3600, 200.42, 200.42)), + ('expires_in str only', ("3600", None), (3600, 4600, 4600)), + ('expires_in str and expires_at', ("3600", 200), (3600, 200, 200)), + ('expires_in str and expires_at float', ("3600", 200.42), (3600, 200.42, 200.42)), + ('expires_in str and expires_at str-int', ("3600", "200"), (3600, 200, 200)), + ('expires_in str and expires_at str-float', ("3600", "200.42"), (3600, 200.42, 200.42)), + ]: + with self.subTest(msg=title): + params = { + "expires_in": arg[0], + "expires_at": arg[1] + } + self.assertEqual(expected, parse_expires(params)) diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/test_tokens.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_tokens.py index fa6b1c092c8..ec8efca4acc 100644 --- a/contrib/python/oauthlib/tests/oauth2/rfc6749/test_tokens.py +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_tokens.py @@ -76,7 +76,7 @@ class TokenTest(TestCase): bearer_uri = 'http://server.example.com/resource?access_token=vF9dft4qmT' def _mocked_validate_bearer_token(self, token, scopes, request): - if not token: + if not token: # noqa: SIM103 return False return True diff --git a/contrib/python/oauthlib/tests/oauth2/rfc6749/test_utils.py b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_utils.py index 3299591926f..8417fe56705 100644 --- a/contrib/python/oauthlib/tests/oauth2/rfc6749/test_utils.py +++ b/contrib/python/oauthlib/tests/oauth2/rfc6749/test_utils.py @@ -78,7 +78,7 @@ class UtilsTests(TestCase): for x in string_list: assert x in set_scope - self.assertRaises(ValueError, list_to_scope, object()) + self.assertRaises(ValueError, list_to_scope, object()) def test_scope_to_list(self): expected = ['foo', 'bar', 'baz'] diff --git a/contrib/python/oauthlib/tests/oauth2/rfc8628/endpoints/__init__.py b/contrib/python/oauthlib/tests/oauth2/rfc8628/endpoints/__init__.py new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc8628/endpoints/__init__.py diff --git a/contrib/python/oauthlib/tests/oauth2/rfc8628/endpoints/test_device_application_server.py b/contrib/python/oauthlib/tests/oauth2/rfc8628/endpoints/test_device_application_server.py new file mode 100644 index 00000000000..f0436754a5e --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc8628/endpoints/test_device_application_server.py @@ -0,0 +1,26 @@ +import json +from unittest import TestCase, mock + +from oauthlib.common import Request, urlencode +from oauthlib.oauth2.rfc6749 import errors +from oauthlib.oauth2.rfc8628.endpoints.pre_configured import DeviceApplicationServer +from oauthlib.oauth2.rfc8628.request_validator import RequestValidator + + +def test_server_set_up_device_endpoint_instance_attributes_correctly(): + """ + Simple test that just instantiates DeviceApplicationServer + and asserts the important attributes are present + """ + validator = mock.MagicMock(spec=RequestValidator) + validator.get_default_redirect_uri.return_value = None + validator.get_code_challenge.return_value = None + + verification_uri = "test.com/device" + verification_uri_complete = "test.com/device?user_code=123" + device = DeviceApplicationServer(validator, verification_uri=verification_uri, verification_uri_complete=verification_uri_complete) + device_vars = vars(device) + assert device_vars["_verification_uri_complete"] == "test.com/device?user_code=123" + assert device_vars["_verification_uri"] == "test.com/device" + assert device_vars["_expires_in"] == 1800 + assert device_vars["_interval"] == 5 diff --git a/contrib/python/oauthlib/tests/oauth2/rfc8628/endpoints/test_error_responses.py b/contrib/python/oauthlib/tests/oauth2/rfc8628/endpoints/test_error_responses.py new file mode 100644 index 00000000000..c799cc81af2 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc8628/endpoints/test_error_responses.py @@ -0,0 +1,95 @@ +import json +from unittest import TestCase, mock + +from oauthlib.common import Request, urlencode +from oauthlib.oauth2.rfc6749 import errors +from oauthlib.oauth2.rfc8628.endpoints.pre_configured import DeviceApplicationServer +from oauthlib.oauth2.rfc8628.request_validator import RequestValidator + + +class ErrorResponseTest(TestCase): + def set_client(self, request): + request.client = mock.MagicMock() + request.client.client_id = "mocked" + return True + + def build_request(self, uri="https://example.com/device_authorize", client_id="foo"): + body = "" + if client_id: + body = f"client_id={client_id}" + return Request( + uri, + http_method="POST", + body=body, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + + def assert_request_raises(self, error, request): + """Test that the request fails similarly on the validation and response endpoint.""" + self.assertRaises( + error, + self.device.validate_device_authorization_request, + request, + ) + self.assertRaises( + error, + self.device.create_device_authorization_response, + uri=request.uri, + http_method=request.http_method, + body=request.body, + headers=request.headers, + ) + + def setUp(self): + self.validator = mock.MagicMock(spec=RequestValidator) + self.validator.get_default_redirect_uri.return_value = None + self.validator.get_code_challenge.return_value = None + self.device = DeviceApplicationServer(self.validator, "https://example.com/verify") + + def test_missing_client_id(self): + # Device code grant + request = self.build_request(client_id=None) + self.assert_request_raises(errors.MissingClientIdError, request) + + def test_empty_client_id(self): + # Device code grant + self.assertRaises( + errors.MissingClientIdError, + self.device.create_device_authorization_response, + "https://i.l/", + "POST", + "client_id=", + {"Content-Type": "application/x-www-form-urlencoded"}, + ) + + def test_invalid_client_id(self): + request = self.build_request(client_id="foo") + # Device code grant + self.validator.validate_client_id.return_value = False + self.assert_request_raises(errors.InvalidClientIdError, request) + + def test_duplicate_client_id(self): + request = self.build_request() + request.body = "client_id=foo&client_id=bar" + # Device code grant + self.validator.validate_client_id.return_value = False + self.assert_request_raises(errors.InvalidRequestFatalError, request) + + def test_unauthenticated_confidential_client(self): + self.validator.client_authentication_required.return_value = True + self.validator.authenticate_client.return_value = False + request = self.build_request() + self.assert_request_raises(errors.InvalidClientError, request) + + def test_unauthenticated_public_client(self): + self.validator.client_authentication_required.return_value = False + self.validator.authenticate_client_id.return_value = False + request = self.build_request() + self.assert_request_raises(errors.InvalidClientError, request) + + def test_duplicate_scope_parameter(self): + request = self.build_request() + request.body = "client_id=foo&scope=foo&scope=bar" + # Device code grant + self.validator.validate_client_id.return_value = False + self.assert_request_raises(errors.InvalidRequestFatalError, request) diff --git a/contrib/python/oauthlib/tests/oauth2/rfc8628/grant_types/__init__.py b/contrib/python/oauthlib/tests/oauth2/rfc8628/grant_types/__init__.py new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc8628/grant_types/__init__.py diff --git a/contrib/python/oauthlib/tests/oauth2/rfc8628/grant_types/test_device_code.py b/contrib/python/oauthlib/tests/oauth2/rfc8628/grant_types/test_device_code.py new file mode 100644 index 00000000000..da0592f7426 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc8628/grant_types/test_device_code.py @@ -0,0 +1,172 @@ +import json +from unittest import mock +import pytest + +from oauthlib import common + +from oauthlib.oauth2.rfc8628.grant_types import DeviceCodeGrant +from oauthlib.oauth2.rfc6749.tokens import BearerToken + +def create_request(body: str = "") -> common.Request: + request = common.Request("http://a.b/path", body=body or None) + request.scopes = ("hello", "world") + request.expires_in = 1800 + request.client = "batman" + request.client_id = "abcdef" + request.code = "1234" + request.response_type = "code" + request.grant_type = "urn:ietf:params:oauth:grant-type:device_code" + request.redirect_uri = "https://a.b/" + return request + + +def create_device_code_grant(mock_validator: mock.MagicMock) -> DeviceCodeGrant: + return DeviceCodeGrant(request_validator=mock_validator) + + +def test_custom_auth_validators_unsupported(): + custom_validator = mock.Mock() + validator = mock.MagicMock() + + expected = ( + "DeviceCodeGrant does not " + "support authorization validators. Use token validators instead." + ) + with pytest.raises(ValueError, match=expected): + DeviceCodeGrant(validator, pre_auth=[custom_validator]) + + with pytest.raises(ValueError, match=expected): + DeviceCodeGrant(validator, post_auth=[custom_validator]) + + expected = "'tuple' object has no attribute 'append'" + auth = DeviceCodeGrant(validator) + with pytest.raises(AttributeError, match=expected): + auth.custom_validators.pre_auth.append(custom_validator) + + +def test_custom_pre_and_post_token_validators(): + client = mock.MagicMock() + + validator = mock.MagicMock() + pre_token_validator = mock.Mock() + post_token_validator = mock.Mock() + + request: common.Request = create_request() + request.client = client + + auth = DeviceCodeGrant(validator) + + auth.custom_validators.pre_token.append(pre_token_validator) + auth.custom_validators.post_token.append(post_token_validator) + + bearer = BearerToken(validator) + auth.create_token_response(request, bearer) + + pre_token_validator.assert_called() + post_token_validator.assert_called() + + +def test_create_token_response(): + validator = mock.MagicMock() + request: common.Request = create_request() + request.client = mock.Mock() + + auth = DeviceCodeGrant(validator) + + bearer = BearerToken(validator) + + headers, body, status_code = auth.create_token_response(request, bearer) + token = json.loads(body) + + assert headers == { + "Content-Type": "application/json", + "Cache-Control": "no-store", + "Pragma": "no-cache", + } + + # when a custom token generator callable isn't used + # the random generator is used as default for the access token + assert token == { + "access_token": mock.ANY, + "expires_in": 3600, + "token_type": "Bearer", + "scope": "hello world", + "refresh_token": mock.ANY, + } + + assert status_code == 200 + + validator.save_token.assert_called_once() + + +def test_invalid_client_error(): + validator = mock.MagicMock() + request: common.Request = create_request() + request.client = mock.Mock() + + auth = DeviceCodeGrant(validator) + bearer = BearerToken(validator) + + validator.authenticate_client.return_value = False + + headers, body, status_code = auth.create_token_response(request, bearer) + body = json.loads(body) + + assert headers == { + "Content-Type": "application/json", + "Cache-Control": "no-store", + "Pragma": "no-cache", + "WWW-Authenticate": 'Bearer error="invalid_client"', + } + assert body == {"error": "invalid_client"} + assert status_code == 401 + + validator.save_token.assert_not_called() + + +def test_invalid_grant_type_error(): + validator = mock.MagicMock() + request: common.Request = create_request() + request.client = mock.Mock() + + request.grant_type = "not_device_code" + + auth = DeviceCodeGrant(validator) + bearer = BearerToken(validator) + + headers, body, status_code = auth.create_token_response(request, bearer) + body = json.loads(body) + + assert headers == { + "Content-Type": "application/json", + "Cache-Control": "no-store", + "Pragma": "no-cache", + } + assert body == {"error": "unsupported_grant_type"} + assert status_code == 400 + + validator.save_token.assert_not_called() + + +def test_duplicate_params_error(): + validator = mock.MagicMock() + request: common.Request = create_request( + "client_id=123&scope=openid&scope=openid" + ) + request.client = mock.Mock() + + auth = DeviceCodeGrant(validator) + bearer = BearerToken(validator) + + headers, body, status_code = auth.create_token_response(request, bearer) + body = json.loads(body) + + assert headers == { + "Content-Type": "application/json", + "Cache-Control": "no-store", + "Pragma": "no-cache", + } + assert body == {"error": "invalid_request", "error_description": "Duplicate scope parameter."} + assert status_code == 400 + + validator.save_token.assert_not_called() diff --git a/contrib/python/oauthlib/tests/oauth2/rfc8628/test_server.py b/contrib/python/oauthlib/tests/oauth2/rfc8628/test_server.py new file mode 100644 index 00000000000..52025032849 --- /dev/null +++ b/contrib/python/oauthlib/tests/oauth2/rfc8628/test_server.py @@ -0,0 +1,113 @@ +import json +from unittest import mock + +from oauthlib.oauth2.rfc8628.endpoints import DeviceAuthorizationEndpoint +from oauthlib.oauth2.rfc8628.request_validator import RequestValidator + +from tests.unittest import TestCase + + +class DeviceAuthorizationEndpointTest(TestCase): + def _configure_endpoint( + self, interval=None, verification_uri_complete=None, user_code_generator=None + ): + self.endpoint = DeviceAuthorizationEndpoint( + request_validator=mock.MagicMock(spec=RequestValidator), + verification_uri=self.verification_uri, + interval=interval, + verification_uri_complete=verification_uri_complete, + user_code_generator=user_code_generator, + ) + + def setUp(self): + self.request_validator = mock.MagicMock(spec=RequestValidator) + self.verification_uri = "http://i.b/l/verify" + self.uri = "http://i.b/l" + self.http_method = "POST" + self.body = "client_id=abc" + self.headers = {"Content-Type": "application/x-www-form-urlencoded"} + self._configure_endpoint() + + def response_payload(self): + return self.uri, self.http_method, self.body, self.headers + + @mock.patch("oauthlib.oauth2.rfc8628.endpoints.device_authorization.generate_token") + def test_device_authorization_grant(self, generate_token): + generate_token.side_effect = ["abc", "def"] + _, body, status_code = self.endpoint.create_device_authorization_response( + *self.response_payload() + ) + expected_payload = { + "verification_uri": "http://i.b/l/verify", + "user_code": "abc", + "device_code": "def", + "expires_in": 1800, + } + self.assertEqual(200, status_code) + self.assertEqual(body, expected_payload) + + @mock.patch( + "oauthlib.oauth2.rfc8628.endpoints.device_authorization.generate_token", + lambda: "abc", + ) + def test_device_authorization_grant_interval(self): + self._configure_endpoint(interval=5) + _, body, _ = self.endpoint.create_device_authorization_response(*self.response_payload()) + self.assertEqual(5, body["interval"]) + + @mock.patch( + "oauthlib.oauth2.rfc8628.endpoints.device_authorization.generate_token", + lambda: "abc", + ) + def test_device_authorization_grant_interval_with_zero(self): + self._configure_endpoint(interval=0) + _, body, _ = self.endpoint.create_device_authorization_response(*self.response_payload()) + self.assertEqual(0, body["interval"]) + + @mock.patch( + "oauthlib.oauth2.rfc8628.endpoints.device_authorization.generate_token", + lambda: "abc", + ) + def test_device_authorization_grant_verify_url_complete_string(self): + self._configure_endpoint(verification_uri_complete="http://i.l/v?user_code={user_code}") + _, body, _ = self.endpoint.create_device_authorization_response(*self.response_payload()) + self.assertEqual( + "http://i.l/v?user_code=abc", + body["verification_uri_complete"], + ) + + @mock.patch( + "oauthlib.oauth2.rfc8628.endpoints.device_authorization.generate_token", + lambda: "abc", + ) + def test_device_authorization_grant_verify_url_complete_callable(self): + self._configure_endpoint(verification_uri_complete=lambda u: f"http://i.l/v?user_code={u}") + _, body, _ = self.endpoint.create_device_authorization_response(*self.response_payload()) + self.assertEqual( + "http://i.l/v?user_code=abc", + body["verification_uri_complete"], + ) + + @mock.patch( + "oauthlib.oauth2.rfc8628.endpoints.device_authorization.generate_token", + lambda: "abc", + ) + def test_device_authorization_grant_user_gode_generator(self): + def user_code(): + """ + A friendly user code the device can display and the user + can type in. It's up to the device how + this code should be displayed. e.g 123-456 + """ + return "123456" + + self._configure_endpoint( + verification_uri_complete=lambda u: f"http://i.l/v?user_code={u}", + user_code_generator=user_code, + ) + + _, body, _ = self.endpoint.create_device_authorization_response(*self.response_payload()) + self.assertEqual( + "http://i.l/v?user_code=123456", + body["verification_uri_complete"], + ) diff --git a/contrib/python/oauthlib/tests/openid/connect/core/endpoints/test_openid_connect_params_handling.py b/contrib/python/oauthlib/tests/openid/connect/core/endpoints/test_openid_connect_params_handling.py index c55136fbf1e..5b04edff6bf 100644 --- a/contrib/python/oauthlib/tests/openid/connect/core/endpoints/test_openid_connect_params_handling.py +++ b/contrib/python/oauthlib/tests/openid/connect/core/endpoints/test_openid_connect_params_handling.py @@ -28,7 +28,8 @@ class OpenIDConnectEndpointTest(TestCase): 'redirect_uri': 'https://a.b/cb', 'response_type': 'code', 'client_id': 'abcdef', - 'scope': 'hello openid' + 'scope': 'hello openid', + 'ui_locales': 'en-US' } self.url = 'http://a.b/path?' + urlencode(params) @@ -76,3 +77,4 @@ class OpenIDConnectEndpointTest(TestCase): self.assertEqual(creds['prompt'], {'consent'}) self.assertEqual(creds['nonce'], 'abcd') self.assertEqual(creds['display'], 'touch') + self.assertEqual(creds['ui_locales'], ['en-US']) diff --git a/contrib/python/oauthlib/tests/openid/connect/core/endpoints/test_refresh_token.py b/contrib/python/oauthlib/tests/openid/connect/core/endpoints/test_refresh_token.py new file mode 100644 index 00000000000..9161f5a5f67 --- /dev/null +++ b/contrib/python/oauthlib/tests/openid/connect/core/endpoints/test_refresh_token.py @@ -0,0 +1,32 @@ +"""Ensure that the server correctly uses the OIDC flavor of +the Refresh token grant type when appropriate. + +When the OpenID scope is provided, the refresh token response +should include a fresh ID token. +""" +import json +from unittest import mock + +from oauthlib.openid import RequestValidator +from oauthlib.openid.connect.core.endpoints.pre_configured import Server + +from tests.unittest import TestCase + + +class TestRefreshToken(TestCase): + + def setUp(self): + self.validator = mock.MagicMock(spec=RequestValidator) + self.validator.get_id_token.return_value='id_token' + + self.server = Server(self.validator) + + def test_refresh_token_with_openid(self): + request_body = 'scope=openid+test_scope&grant_type=refresh_token&refresh_token=abc' + headers, body, status = self.server.create_token_response('', body=request_body) + self.assertIn('id_token', json.loads(body)) + + def test_refresh_token_no_openid(self): + request_body = 'scope=test_scope&grant_type=refresh_token&refresh_token=abc' + headers, body, status = self.server.create_token_response('', body=request_body) + self.assertNotIn('id_token', json.loads(body)) diff --git a/contrib/python/oauthlib/tests/test_uri_validate.py b/contrib/python/oauthlib/tests/test_uri_validate.py index 6a9f8ea60bc..f1ac404e04c 100644 --- a/contrib/python/oauthlib/tests/test_uri_validate.py +++ b/contrib/python/oauthlib/tests/test_uri_validate.py @@ -1,4 +1,6 @@ +from datetime import datetime import unittest + from oauthlib.uri_validate import is_absolute_uri from tests.unittest import TestCase @@ -76,7 +78,6 @@ class UriValidateTest(TestCase): self.assertIsNone(is_absolute_uri('http://[abcd:efgh::1]/')) def test_recursive_regex(self): - from datetime import datetime t0 = datetime.now() is_absolute_uri('http://[::::::::::::::::::::::::::]/path') t1 = datetime.now() diff --git a/contrib/python/oauthlib/tests/ya.make b/contrib/python/oauthlib/tests/ya.make index b207e5ea638..041befa192e 100644 --- a/contrib/python/oauthlib/tests/ya.make +++ b/contrib/python/oauthlib/tests/ya.make @@ -2,7 +2,6 @@ PY3TEST() PEERDIR( contrib/python/oauthlib - contrib/python/mock contrib/python/PyJWT contrib/python/blinker ) @@ -63,12 +62,19 @@ TEST_SRCS( oauth2/rfc8628/__init__.py oauth2/rfc8628/clients/__init__.py oauth2/rfc8628/clients/test_device.py + oauth2/rfc8628/endpoints/__init__.py + oauth2/rfc8628/endpoints/test_device_application_server.py + oauth2/rfc8628/endpoints/test_error_responses.py + oauth2/rfc8628/grant_types/__init__.py + oauth2/rfc8628/grant_types/test_device_code.py + oauth2/rfc8628/test_server.py openid/__init__.py openid/connect/__init__.py openid/connect/core/__init__.py openid/connect/core/endpoints/__init__.py openid/connect/core/endpoints/test_claims_handling.py openid/connect/core/endpoints/test_openid_connect_params_handling.py + openid/connect/core/endpoints/test_refresh_token.py openid/connect/core/endpoints/test_userinfo_endpoint.py openid/connect/core/grant_types/__init__.py openid/connect/core/grant_types/test_authorization_code.py @@ -76,6 +82,7 @@ TEST_SRCS( openid/connect/core/grant_types/test_dispatchers.py openid/connect/core/grant_types/test_hybrid.py openid/connect/core/grant_types/test_implicit.py + openid/connect/core/grant_types/test_refresh_token.py openid/connect/core/test_request_validator.py openid/connect/core/test_server.py openid/connect/core/test_tokens.py |