diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-03-25 09:11:17 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-03-25 09:17:48 +0300 |
commit | 4624e4cfd95649270db02616edde8d0ca249b63d (patch) | |
tree | 1c8a43f50533ca759d137f258e42862e8cf5e80f /contrib/python/requests-oauthlib/tests | |
parent | d2d971701bd8377ead5f973c96be81042774bd2a (diff) | |
download | ydb-4624e4cfd95649270db02616edde8d0ca249b63d.tar.gz |
Intermediate changes
Diffstat (limited to 'contrib/python/requests-oauthlib/tests')
7 files changed, 247 insertions, 44 deletions
diff --git a/contrib/python/requests-oauthlib/tests/examples/base.py b/contrib/python/requests-oauthlib/tests/examples/base.py new file mode 100644 index 0000000000..2efa5dd746 --- /dev/null +++ b/contrib/python/requests-oauthlib/tests/examples/base.py @@ -0,0 +1,106 @@ +import os.path +import os +import subprocess +import shlex +import shutil +from selenium import webdriver +from selenium.webdriver.common.by import By +from selenium.webdriver.common.keys import Keys +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.support.wait import WebDriverWait + + +cwd = os.path.dirname(os.path.realpath(__file__)) + + +class Sample(): + def setUp(self): + super().setUp() + self.proc = None + self.outputs = [] + + def tearDown(self): + super().tearDown() + if self.proc is not None: + self.proc.stdin.close() + self.proc.stdout.close() + self.proc.kill() + + def replaceVariables(self, filein ,fileout, vars): + with open(filein, "rt") as fin: + with open(fileout, "wt") as fout: + for line in fin: + for k, v in vars.items(): + line = line.replace(k, v) + fout.write(line) + + def run_sample(self, filepath, variables): + inpath = os.path.join(cwd, "..", "..", "docs", "examples", filepath) + outpath = os.path.join(cwd, "tmp_{}".format(filepath)) + self.replaceVariables(inpath, outpath, variables) + + self.proc = subprocess.Popen( + [shutil.which("python"), + outpath], + text=True, bufsize=1, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE + ) + + def write(self, string): + self.proc.stdin.write(string) + self.proc.stdin.flush() + + def wait_for_pattern(self, pattern): + try: + while True: + line = self.proc.stdout.readline() + self.outputs.append(line) + if pattern in line: + return line + except subprocess.TimeoutExpired: + self.assertTrue(False, "timeout when looking for output") + + def wait_for_end(self): + try: + outs, err = self.proc.communicate(timeout=10) + self.outputs += filter(lambda x: x != '', outs.split('\n')) + except subprocess.TimeoutExpired: + self.assertTrue(False, "timeout when looking for output") + return self.outputs[-1] + + + +class Browser(): + def setUp(self): + super().setUp() + options = webdriver.ChromeOptions() + options.add_argument("--headless=new") + self.driver = webdriver.Chrome(options=options) + self.user_username = os.environ.get("AUTH0_USERNAME") + self.user_password = os.environ.get("AUTH0_PASSWORD") + + if not self.user_username or not self.user_password: + self.skipTest("auth0 is not configured properly") + + def tearDown(self): + super().tearDown() + self.driver.quit() + + def authorize_auth0(self, authorize_url, expected_redirect_uri): + self.driver.get(authorize_url) + username = self.driver.find_element(By.ID, "username") + password = self.driver.find_element(By.ID, "password") + + wait = WebDriverWait(self.driver, timeout=2) + wait.until(lambda d : username.is_displayed()) + wait.until(lambda d : password.is_displayed()) + + username.clear() + username.send_keys(self.user_username) + password.send_keys(self.user_password) + username.send_keys(Keys.RETURN) + + wait.until(EC.url_contains(expected_redirect_uri)) + return self.driver.current_url + diff --git a/contrib/python/requests-oauthlib/tests/examples/test_native_spa_pkce_auth0.py b/contrib/python/requests-oauthlib/tests/examples/test_native_spa_pkce_auth0.py new file mode 100644 index 0000000000..6ff41e251c --- /dev/null +++ b/contrib/python/requests-oauthlib/tests/examples/test_native_spa_pkce_auth0.py @@ -0,0 +1,39 @@ +import os +import unittest + +from . import base + +class TestNativeAuth0Test(base.Sample, base.Browser, unittest.TestCase): + def setUp(self): + super().setUp() + self.client_id = os.environ.get("AUTH0_PKCE_CLIENT_ID") + self.idp_domain = os.environ.get("AUTH0_DOMAIN") + + if not self.client_id or not self.idp_domain: + self.skipTest("native auth0 is not configured properly") + + def test_login(self): + # redirect_uri is http:// + os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = "1" + + self.run_sample( + "native_spa_pkce_auth0.py", { + "OAUTH_CLIENT_ID": self.client_id, + "OAUTH_IDP_DOMAIN": self.idp_domain, + } + ) + authorize_url = self.wait_for_pattern("https://") + redirect_uri = self.authorize_auth0(authorize_url, "http://") + self.write(redirect_uri) + last_line = self.wait_for_end() + + import ast + response = ast.literal_eval(last_line) + self.assertIn("access_token", response) + self.assertIn("id_token", response) + self.assertIn("scope", response) + self.assertIn("openid", response["scope"]) + self.assertIn("expires_in", response) + self.assertIn("expires_at", response) + self.assertIn("token_type", response) + self.assertEqual("Bearer", response["token_type"]) diff --git a/contrib/python/requests-oauthlib/tests/test_compliance_fixes.py b/contrib/python/requests-oauthlib/tests/test_compliance_fixes.py index 5c90d52660..c5166bdb2f 100644 --- a/contrib/python/requests-oauthlib/tests/test_compliance_fixes.py +++ b/contrib/python/requests-oauthlib/tests/test_compliance_fixes.py @@ -1,14 +1,10 @@ -from __future__ import unicode_literals from unittest import TestCase import requests import requests_mock import time -try: - from urlparse import urlparse, parse_qs -except ImportError: - from urllib.parse import urlparse, parse_qs +from urllib.parse import urlparse, parse_qs from oauthlib.oauth2.rfc6749.errors import InvalidGrantError from requests_oauthlib import OAuth2Session @@ -332,3 +328,60 @@ class EbayComplianceFixTest(TestCase): authorization_response="https://i.b/?code=hello", ) assert token["token_type"] == "Bearer" + + +def access_and_refresh_token_request_compliance_fix_test(session, client_secret): + def _non_compliant_header(url, headers, body): + headers["X-Client-Secret"] = client_secret + return url, headers, body + + session.register_compliance_hook("access_token_request", _non_compliant_header) + session.register_compliance_hook("refresh_token_request", _non_compliant_header) + return session + + +class RefreshTokenRequestComplianceFixTest(TestCase): + value_to_test_for = "value_to_test_for" + + def setUp(self): + mocker = requests_mock.Mocker() + mocker.post( + "https://example.com/token", + request_headers={"X-Client-Secret": self.value_to_test_for}, + json={ + "access_token": "this is the access token", + "expires_in": 7200, + "token_type": "Bearer", + }, + headers={"Content-Type": "application/json"}, + ) + mocker.post( + "https://example.com/refresh", + request_headers={"X-Client-Secret": self.value_to_test_for}, + json={ + "access_token": "this is the access token", + "expires_in": 7200, + "token_type": "Bearer", + }, + headers={"Content-Type": "application/json"}, + ) + mocker.start() + self.addCleanup(mocker.stop) + + session = OAuth2Session() + self.fixed_session = access_and_refresh_token_request_compliance_fix_test( + session, self.value_to_test_for + ) + + def test_access_token(self): + token = self.fixed_session.fetch_token( + "https://example.com/token", + authorization_response="https://i.b/?code=hello", + ) + assert token["token_type"] == "Bearer" + + def test_refresh_token(self): + token = self.fixed_session.refresh_token( + "https://example.com/refresh", + ) + assert token["token_type"] == "Bearer" diff --git a/contrib/python/requests-oauthlib/tests/test_core.py b/contrib/python/requests-oauthlib/tests/test_core.py index 6892e9f1ce..09cd0f0212 100644 --- a/contrib/python/requests-oauthlib/tests/test_core.py +++ b/contrib/python/requests-oauthlib/tests/test_core.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- -from __future__ import unicode_literals import requests import requests_oauthlib import oauthlib @@ -7,10 +6,7 @@ import os.path from io import StringIO import unittest -try: - import mock -except ImportError: - from unittest import mock +from unittest import mock @mock.patch("oauthlib.oauth1.rfc5849.generate_timestamp") diff --git a/contrib/python/requests-oauthlib/tests/test_oauth1_session.py b/contrib/python/requests-oauthlib/tests/test_oauth1_session.py index 1dd2b2f158..b3c8c70483 100644 --- a/contrib/python/requests-oauthlib/tests/test_oauth1_session.py +++ b/contrib/python/requests-oauthlib/tests/test_oauth1_session.py @@ -1,19 +1,13 @@ -from __future__ import unicode_literals, print_function import unittest -import sys import requests from io import StringIO +from unittest import mock from oauthlib.oauth1 import SIGNATURE_TYPE_QUERY, SIGNATURE_TYPE_BODY from oauthlib.oauth1 import SIGNATURE_RSA, SIGNATURE_PLAINTEXT from requests_oauthlib import OAuth1Session try: - import mock -except ImportError: - from unittest import mock - -try: import cryptography except ImportError: cryptography = None @@ -23,11 +17,6 @@ try: except ImportError: jwt = None -if sys.version[0] == "3": - unicode_type = str -else: - unicode_type = unicode - TEST_RSA_KEY = ( "-----BEGIN RSA PRIVATE KEY-----\n" @@ -165,8 +154,8 @@ class OAuth1SessionTest(unittest.TestCase): self.assertEqual(resp["oauth_token"], "foo") self.assertEqual(resp["oauth_verifier"], "bar") for k, v in resp.items(): - self.assertIsInstance(k, unicode_type) - self.assertIsInstance(v, unicode_type) + self.assertIsInstance(k, str) + self.assertIsInstance(v, str) def test_fetch_request_token(self): auth = OAuth1Session("foo") @@ -174,8 +163,8 @@ class OAuth1SessionTest(unittest.TestCase): resp = auth.fetch_request_token("https://example.com/token") self.assertEqual(resp["oauth_token"], "foo") for k, v in resp.items(): - self.assertIsInstance(k, unicode_type) - self.assertIsInstance(v, unicode_type) + self.assertIsInstance(k, str) + self.assertIsInstance(v, str) def test_fetch_request_token_with_optional_arguments(self): auth = OAuth1Session("foo") @@ -185,8 +174,8 @@ class OAuth1SessionTest(unittest.TestCase): ) self.assertEqual(resp["oauth_token"], "foo") for k, v in resp.items(): - self.assertIsInstance(k, unicode_type) - self.assertIsInstance(v, unicode_type) + self.assertIsInstance(k, str) + self.assertIsInstance(v, str) def test_fetch_access_token(self): auth = OAuth1Session("foo", verifier="bar") @@ -194,8 +183,8 @@ class OAuth1SessionTest(unittest.TestCase): resp = auth.fetch_access_token("https://example.com/token") self.assertEqual(resp["oauth_token"], "foo") for k, v in resp.items(): - self.assertIsInstance(k, unicode_type) - self.assertIsInstance(v, unicode_type) + self.assertIsInstance(k, str) + self.assertIsInstance(v, str) def test_fetch_access_token_with_optional_arguments(self): auth = OAuth1Session("foo", verifier="bar") @@ -205,8 +194,8 @@ class OAuth1SessionTest(unittest.TestCase): ) self.assertEqual(resp["oauth_token"], "foo") for k, v in resp.items(): - self.assertIsInstance(k, unicode_type) - self.assertIsInstance(v, unicode_type) + self.assertIsInstance(k, str) + self.assertIsInstance(v, str) def _test_fetch_access_token_raises_error(self, auth): """Assert that an error is being raised whenever there's no verifier @@ -308,12 +297,6 @@ class OAuth1SessionTest(unittest.TestCase): generate_nonce.return_value = "abc" generate_timestamp.return_value = "123" - signature = ( - "OAuth " - 'oauth_nonce="abc", oauth_timestamp="123", oauth_version="1.0", ' - 'oauth_signature_method="RSA-SHA1", oauth_consumer_key="foo", ' - 'oauth_verifier="bar", oauth_signature="{sig}"' - ).format(sig=TEST_RSA_OAUTH_SIGNATURE) sess = OAuth1Session( "key", "secret", diff --git a/contrib/python/requests-oauthlib/tests/test_oauth2_auth.py b/contrib/python/requests-oauthlib/tests/test_oauth2_auth.py index accb561ef6..69ed6f6647 100644 --- a/contrib/python/requests-oauthlib/tests/test_oauth2_auth.py +++ b/contrib/python/requests-oauthlib/tests/test_oauth2_auth.py @@ -1,4 +1,3 @@ -from __future__ import unicode_literals import unittest from oauthlib.oauth2 import WebApplicationClient, MobileApplicationClient diff --git a/contrib/python/requests-oauthlib/tests/test_oauth2_session.py b/contrib/python/requests-oauthlib/tests/test_oauth2_session.py index cfc6236855..7e3e63c57a 100644 --- a/contrib/python/requests-oauthlib/tests/test_oauth2_session.py +++ b/contrib/python/requests-oauthlib/tests/test_oauth2_session.py @@ -1,4 +1,3 @@ -from __future__ import unicode_literals import json import time import tempfile @@ -8,10 +7,7 @@ from base64 import b64encode from copy import deepcopy from unittest import TestCase -try: - import mock -except ImportError: - from unittest import mock +from unittest import mock from oauthlib.common import urlencode from oauthlib.oauth2 import TokenExpiredError, OAuth2Error @@ -124,6 +120,27 @@ class OAuth2SessionTest(TestCase): self.assertIn(self.client_id, auth_url) self.assertIn("response_type=token", auth_url) + def test_pkce_authorization_url(self): + url = "https://example.com/authorize?foo=bar" + + web = WebApplicationClient(self.client_id) + s = OAuth2Session(client=web, pkce="S256") + auth_url, state = s.authorization_url(url) + self.assertIn(state, auth_url) + self.assertIn(self.client_id, auth_url) + self.assertIn("response_type=code", auth_url) + self.assertIn("code_challenge=", auth_url) + self.assertIn("code_challenge_method=S256", auth_url) + + mobile = MobileApplicationClient(self.client_id) + s = OAuth2Session(client=mobile, pkce="S256") + auth_url, state = s.authorization_url(url) + self.assertIn(state, auth_url) + self.assertIn(self.client_id, auth_url) + self.assertIn("response_type=token", auth_url) + self.assertIn("code_challenge=", auth_url) + self.assertIn("code_challenge_method=S256", auth_url) + @mock.patch("time.time", new=lambda: fake_time) def test_refresh_token_request(self): self.expired_token = dict(self.token) @@ -424,6 +441,16 @@ class OAuth2SessionTest(TestCase): authorization_response="https://i.b/no-state?code=abc", ) + @mock.patch("time.time", new=lambda: fake_time) + def test_pkce_web_app_fetch_token(self): + url = "https://example.com/token" + + web = WebApplicationClient(self.client_id, code=CODE) + sess = OAuth2Session(client=web, token=self.token, pkce="S256") + sess.send = fake_token(self.token) + sess._code_verifier = "foobar" + self.assertEqual(sess.fetch_token(url), self.token) + def test_client_id_proxy(self): sess = OAuth2Session("test-id") self.assertEqual(sess.client_id, "test-id") |