aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/requests-oauthlib/tests
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-03-25 09:11:17 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-03-25 09:17:48 +0300
commit4624e4cfd95649270db02616edde8d0ca249b63d (patch)
tree1c8a43f50533ca759d137f258e42862e8cf5e80f /contrib/python/requests-oauthlib/tests
parentd2d971701bd8377ead5f973c96be81042774bd2a (diff)
downloadydb-4624e4cfd95649270db02616edde8d0ca249b63d.tar.gz
Intermediate changes
Diffstat (limited to 'contrib/python/requests-oauthlib/tests')
-rw-r--r--contrib/python/requests-oauthlib/tests/examples/base.py106
-rw-r--r--contrib/python/requests-oauthlib/tests/examples/test_native_spa_pkce_auth0.py39
-rw-r--r--contrib/python/requests-oauthlib/tests/test_compliance_fixes.py63
-rw-r--r--contrib/python/requests-oauthlib/tests/test_core.py6
-rw-r--r--contrib/python/requests-oauthlib/tests/test_oauth1_session.py39
-rw-r--r--contrib/python/requests-oauthlib/tests/test_oauth2_auth.py1
-rw-r--r--contrib/python/requests-oauthlib/tests/test_oauth2_session.py37
7 files changed, 247 insertions, 44 deletions
diff --git a/contrib/python/requests-oauthlib/tests/examples/base.py b/contrib/python/requests-oauthlib/tests/examples/base.py
new file mode 100644
index 0000000000..2efa5dd746
--- /dev/null
+++ b/contrib/python/requests-oauthlib/tests/examples/base.py
@@ -0,0 +1,106 @@
+import os.path
+import os
+import subprocess
+import shlex
+import shutil
+from selenium import webdriver
+from selenium.webdriver.common.by import By
+from selenium.webdriver.common.keys import Keys
+from selenium.webdriver.support import expected_conditions as EC
+from selenium.webdriver.support.wait import WebDriverWait
+
+
+cwd = os.path.dirname(os.path.realpath(__file__))
+
+
+class Sample():
+ def setUp(self):
+ super().setUp()
+ self.proc = None
+ self.outputs = []
+
+ def tearDown(self):
+ super().tearDown()
+ if self.proc is not None:
+ self.proc.stdin.close()
+ self.proc.stdout.close()
+ self.proc.kill()
+
+ def replaceVariables(self, filein ,fileout, vars):
+ with open(filein, "rt") as fin:
+ with open(fileout, "wt") as fout:
+ for line in fin:
+ for k, v in vars.items():
+ line = line.replace(k, v)
+ fout.write(line)
+
+ def run_sample(self, filepath, variables):
+ inpath = os.path.join(cwd, "..", "..", "docs", "examples", filepath)
+ outpath = os.path.join(cwd, "tmp_{}".format(filepath))
+ self.replaceVariables(inpath, outpath, variables)
+
+ self.proc = subprocess.Popen(
+ [shutil.which("python"),
+ outpath],
+ text=True, bufsize=1,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE
+ )
+
+ def write(self, string):
+ self.proc.stdin.write(string)
+ self.proc.stdin.flush()
+
+ def wait_for_pattern(self, pattern):
+ try:
+ while True:
+ line = self.proc.stdout.readline()
+ self.outputs.append(line)
+ if pattern in line:
+ return line
+ except subprocess.TimeoutExpired:
+ self.assertTrue(False, "timeout when looking for output")
+
+ def wait_for_end(self):
+ try:
+ outs, err = self.proc.communicate(timeout=10)
+ self.outputs += filter(lambda x: x != '', outs.split('\n'))
+ except subprocess.TimeoutExpired:
+ self.assertTrue(False, "timeout when looking for output")
+ return self.outputs[-1]
+
+
+
+class Browser():
+ def setUp(self):
+ super().setUp()
+ options = webdriver.ChromeOptions()
+ options.add_argument("--headless=new")
+ self.driver = webdriver.Chrome(options=options)
+ self.user_username = os.environ.get("AUTH0_USERNAME")
+ self.user_password = os.environ.get("AUTH0_PASSWORD")
+
+ if not self.user_username or not self.user_password:
+ self.skipTest("auth0 is not configured properly")
+
+ def tearDown(self):
+ super().tearDown()
+ self.driver.quit()
+
+ def authorize_auth0(self, authorize_url, expected_redirect_uri):
+ self.driver.get(authorize_url)
+ username = self.driver.find_element(By.ID, "username")
+ password = self.driver.find_element(By.ID, "password")
+
+ wait = WebDriverWait(self.driver, timeout=2)
+ wait.until(lambda d : username.is_displayed())
+ wait.until(lambda d : password.is_displayed())
+
+ username.clear()
+ username.send_keys(self.user_username)
+ password.send_keys(self.user_password)
+ username.send_keys(Keys.RETURN)
+
+ wait.until(EC.url_contains(expected_redirect_uri))
+ return self.driver.current_url
+
diff --git a/contrib/python/requests-oauthlib/tests/examples/test_native_spa_pkce_auth0.py b/contrib/python/requests-oauthlib/tests/examples/test_native_spa_pkce_auth0.py
new file mode 100644
index 0000000000..6ff41e251c
--- /dev/null
+++ b/contrib/python/requests-oauthlib/tests/examples/test_native_spa_pkce_auth0.py
@@ -0,0 +1,39 @@
+import os
+import unittest
+
+from . import base
+
+class TestNativeAuth0Test(base.Sample, base.Browser, unittest.TestCase):
+ def setUp(self):
+ super().setUp()
+ self.client_id = os.environ.get("AUTH0_PKCE_CLIENT_ID")
+ self.idp_domain = os.environ.get("AUTH0_DOMAIN")
+
+ if not self.client_id or not self.idp_domain:
+ self.skipTest("native auth0 is not configured properly")
+
+ def test_login(self):
+ # redirect_uri is http://
+ os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = "1"
+
+ self.run_sample(
+ "native_spa_pkce_auth0.py", {
+ "OAUTH_CLIENT_ID": self.client_id,
+ "OAUTH_IDP_DOMAIN": self.idp_domain,
+ }
+ )
+ authorize_url = self.wait_for_pattern("https://")
+ redirect_uri = self.authorize_auth0(authorize_url, "http://")
+ self.write(redirect_uri)
+ last_line = self.wait_for_end()
+
+ import ast
+ response = ast.literal_eval(last_line)
+ self.assertIn("access_token", response)
+ self.assertIn("id_token", response)
+ self.assertIn("scope", response)
+ self.assertIn("openid", response["scope"])
+ self.assertIn("expires_in", response)
+ self.assertIn("expires_at", response)
+ self.assertIn("token_type", response)
+ self.assertEqual("Bearer", response["token_type"])
diff --git a/contrib/python/requests-oauthlib/tests/test_compliance_fixes.py b/contrib/python/requests-oauthlib/tests/test_compliance_fixes.py
index 5c90d52660..c5166bdb2f 100644
--- a/contrib/python/requests-oauthlib/tests/test_compliance_fixes.py
+++ b/contrib/python/requests-oauthlib/tests/test_compliance_fixes.py
@@ -1,14 +1,10 @@
-from __future__ import unicode_literals
from unittest import TestCase
import requests
import requests_mock
import time
-try:
- from urlparse import urlparse, parse_qs
-except ImportError:
- from urllib.parse import urlparse, parse_qs
+from urllib.parse import urlparse, parse_qs
from oauthlib.oauth2.rfc6749.errors import InvalidGrantError
from requests_oauthlib import OAuth2Session
@@ -332,3 +328,60 @@ class EbayComplianceFixTest(TestCase):
authorization_response="https://i.b/?code=hello",
)
assert token["token_type"] == "Bearer"
+
+
+def access_and_refresh_token_request_compliance_fix_test(session, client_secret):
+ def _non_compliant_header(url, headers, body):
+ headers["X-Client-Secret"] = client_secret
+ return url, headers, body
+
+ session.register_compliance_hook("access_token_request", _non_compliant_header)
+ session.register_compliance_hook("refresh_token_request", _non_compliant_header)
+ return session
+
+
+class RefreshTokenRequestComplianceFixTest(TestCase):
+ value_to_test_for = "value_to_test_for"
+
+ def setUp(self):
+ mocker = requests_mock.Mocker()
+ mocker.post(
+ "https://example.com/token",
+ request_headers={"X-Client-Secret": self.value_to_test_for},
+ json={
+ "access_token": "this is the access token",
+ "expires_in": 7200,
+ "token_type": "Bearer",
+ },
+ headers={"Content-Type": "application/json"},
+ )
+ mocker.post(
+ "https://example.com/refresh",
+ request_headers={"X-Client-Secret": self.value_to_test_for},
+ json={
+ "access_token": "this is the access token",
+ "expires_in": 7200,
+ "token_type": "Bearer",
+ },
+ headers={"Content-Type": "application/json"},
+ )
+ mocker.start()
+ self.addCleanup(mocker.stop)
+
+ session = OAuth2Session()
+ self.fixed_session = access_and_refresh_token_request_compliance_fix_test(
+ session, self.value_to_test_for
+ )
+
+ def test_access_token(self):
+ token = self.fixed_session.fetch_token(
+ "https://example.com/token",
+ authorization_response="https://i.b/?code=hello",
+ )
+ assert token["token_type"] == "Bearer"
+
+ def test_refresh_token(self):
+ token = self.fixed_session.refresh_token(
+ "https://example.com/refresh",
+ )
+ assert token["token_type"] == "Bearer"
diff --git a/contrib/python/requests-oauthlib/tests/test_core.py b/contrib/python/requests-oauthlib/tests/test_core.py
index 6892e9f1ce..09cd0f0212 100644
--- a/contrib/python/requests-oauthlib/tests/test_core.py
+++ b/contrib/python/requests-oauthlib/tests/test_core.py
@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
-from __future__ import unicode_literals
import requests
import requests_oauthlib
import oauthlib
@@ -7,10 +6,7 @@ import os.path
from io import StringIO
import unittest
-try:
- import mock
-except ImportError:
- from unittest import mock
+from unittest import mock
@mock.patch("oauthlib.oauth1.rfc5849.generate_timestamp")
diff --git a/contrib/python/requests-oauthlib/tests/test_oauth1_session.py b/contrib/python/requests-oauthlib/tests/test_oauth1_session.py
index 1dd2b2f158..b3c8c70483 100644
--- a/contrib/python/requests-oauthlib/tests/test_oauth1_session.py
+++ b/contrib/python/requests-oauthlib/tests/test_oauth1_session.py
@@ -1,19 +1,13 @@
-from __future__ import unicode_literals, print_function
import unittest
-import sys
import requests
from io import StringIO
+from unittest import mock
from oauthlib.oauth1 import SIGNATURE_TYPE_QUERY, SIGNATURE_TYPE_BODY
from oauthlib.oauth1 import SIGNATURE_RSA, SIGNATURE_PLAINTEXT
from requests_oauthlib import OAuth1Session
try:
- import mock
-except ImportError:
- from unittest import mock
-
-try:
import cryptography
except ImportError:
cryptography = None
@@ -23,11 +17,6 @@ try:
except ImportError:
jwt = None
-if sys.version[0] == "3":
- unicode_type = str
-else:
- unicode_type = unicode
-
TEST_RSA_KEY = (
"-----BEGIN RSA PRIVATE KEY-----\n"
@@ -165,8 +154,8 @@ class OAuth1SessionTest(unittest.TestCase):
self.assertEqual(resp["oauth_token"], "foo")
self.assertEqual(resp["oauth_verifier"], "bar")
for k, v in resp.items():
- self.assertIsInstance(k, unicode_type)
- self.assertIsInstance(v, unicode_type)
+ self.assertIsInstance(k, str)
+ self.assertIsInstance(v, str)
def test_fetch_request_token(self):
auth = OAuth1Session("foo")
@@ -174,8 +163,8 @@ class OAuth1SessionTest(unittest.TestCase):
resp = auth.fetch_request_token("https://example.com/token")
self.assertEqual(resp["oauth_token"], "foo")
for k, v in resp.items():
- self.assertIsInstance(k, unicode_type)
- self.assertIsInstance(v, unicode_type)
+ self.assertIsInstance(k, str)
+ self.assertIsInstance(v, str)
def test_fetch_request_token_with_optional_arguments(self):
auth = OAuth1Session("foo")
@@ -185,8 +174,8 @@ class OAuth1SessionTest(unittest.TestCase):
)
self.assertEqual(resp["oauth_token"], "foo")
for k, v in resp.items():
- self.assertIsInstance(k, unicode_type)
- self.assertIsInstance(v, unicode_type)
+ self.assertIsInstance(k, str)
+ self.assertIsInstance(v, str)
def test_fetch_access_token(self):
auth = OAuth1Session("foo", verifier="bar")
@@ -194,8 +183,8 @@ class OAuth1SessionTest(unittest.TestCase):
resp = auth.fetch_access_token("https://example.com/token")
self.assertEqual(resp["oauth_token"], "foo")
for k, v in resp.items():
- self.assertIsInstance(k, unicode_type)
- self.assertIsInstance(v, unicode_type)
+ self.assertIsInstance(k, str)
+ self.assertIsInstance(v, str)
def test_fetch_access_token_with_optional_arguments(self):
auth = OAuth1Session("foo", verifier="bar")
@@ -205,8 +194,8 @@ class OAuth1SessionTest(unittest.TestCase):
)
self.assertEqual(resp["oauth_token"], "foo")
for k, v in resp.items():
- self.assertIsInstance(k, unicode_type)
- self.assertIsInstance(v, unicode_type)
+ self.assertIsInstance(k, str)
+ self.assertIsInstance(v, str)
def _test_fetch_access_token_raises_error(self, auth):
"""Assert that an error is being raised whenever there's no verifier
@@ -308,12 +297,6 @@ class OAuth1SessionTest(unittest.TestCase):
generate_nonce.return_value = "abc"
generate_timestamp.return_value = "123"
- signature = (
- "OAuth "
- 'oauth_nonce="abc", oauth_timestamp="123", oauth_version="1.0", '
- 'oauth_signature_method="RSA-SHA1", oauth_consumer_key="foo", '
- 'oauth_verifier="bar", oauth_signature="{sig}"'
- ).format(sig=TEST_RSA_OAUTH_SIGNATURE)
sess = OAuth1Session(
"key",
"secret",
diff --git a/contrib/python/requests-oauthlib/tests/test_oauth2_auth.py b/contrib/python/requests-oauthlib/tests/test_oauth2_auth.py
index accb561ef6..69ed6f6647 100644
--- a/contrib/python/requests-oauthlib/tests/test_oauth2_auth.py
+++ b/contrib/python/requests-oauthlib/tests/test_oauth2_auth.py
@@ -1,4 +1,3 @@
-from __future__ import unicode_literals
import unittest
from oauthlib.oauth2 import WebApplicationClient, MobileApplicationClient
diff --git a/contrib/python/requests-oauthlib/tests/test_oauth2_session.py b/contrib/python/requests-oauthlib/tests/test_oauth2_session.py
index cfc6236855..7e3e63c57a 100644
--- a/contrib/python/requests-oauthlib/tests/test_oauth2_session.py
+++ b/contrib/python/requests-oauthlib/tests/test_oauth2_session.py
@@ -1,4 +1,3 @@
-from __future__ import unicode_literals
import json
import time
import tempfile
@@ -8,10 +7,7 @@ from base64 import b64encode
from copy import deepcopy
from unittest import TestCase
-try:
- import mock
-except ImportError:
- from unittest import mock
+from unittest import mock
from oauthlib.common import urlencode
from oauthlib.oauth2 import TokenExpiredError, OAuth2Error
@@ -124,6 +120,27 @@ class OAuth2SessionTest(TestCase):
self.assertIn(self.client_id, auth_url)
self.assertIn("response_type=token", auth_url)
+ def test_pkce_authorization_url(self):
+ url = "https://example.com/authorize?foo=bar"
+
+ web = WebApplicationClient(self.client_id)
+ s = OAuth2Session(client=web, pkce="S256")
+ auth_url, state = s.authorization_url(url)
+ self.assertIn(state, auth_url)
+ self.assertIn(self.client_id, auth_url)
+ self.assertIn("response_type=code", auth_url)
+ self.assertIn("code_challenge=", auth_url)
+ self.assertIn("code_challenge_method=S256", auth_url)
+
+ mobile = MobileApplicationClient(self.client_id)
+ s = OAuth2Session(client=mobile, pkce="S256")
+ auth_url, state = s.authorization_url(url)
+ self.assertIn(state, auth_url)
+ self.assertIn(self.client_id, auth_url)
+ self.assertIn("response_type=token", auth_url)
+ self.assertIn("code_challenge=", auth_url)
+ self.assertIn("code_challenge_method=S256", auth_url)
+
@mock.patch("time.time", new=lambda: fake_time)
def test_refresh_token_request(self):
self.expired_token = dict(self.token)
@@ -424,6 +441,16 @@ class OAuth2SessionTest(TestCase):
authorization_response="https://i.b/no-state?code=abc",
)
+ @mock.patch("time.time", new=lambda: fake_time)
+ def test_pkce_web_app_fetch_token(self):
+ url = "https://example.com/token"
+
+ web = WebApplicationClient(self.client_id, code=CODE)
+ sess = OAuth2Session(client=web, token=self.token, pkce="S256")
+ sess.send = fake_token(self.token)
+ sess._code_verifier = "foobar"
+ self.assertEqual(sess.fetch_token(url), self.token)
+
def test_client_id_proxy(self):
sess = OAuth2Session("test-id")
self.assertEqual(sess.client_id, "test-id")