aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/requests-oauthlib/tests
diff options
context:
space:
mode:
authoralexv-smirnov <alex@ydb.tech>2023-12-01 12:02:50 +0300
committeralexv-smirnov <alex@ydb.tech>2023-12-01 13:28:10 +0300
commit0e578a4c44d4abd539d9838347b9ebafaca41dfb (patch)
treea0c1969c37f818c830ebeff9c077eacf30be6ef8 /contrib/python/requests-oauthlib/tests
parent84f2d3d4cc985e63217cff149bd2e6d67ae6fe22 (diff)
downloadydb-0e578a4c44d4abd539d9838347b9ebafaca41dfb.tar.gz
Change "ya.make"
Diffstat (limited to 'contrib/python/requests-oauthlib/tests')
-rw-r--r--contrib/python/requests-oauthlib/tests/__init__.py0
-rw-r--r--contrib/python/requests-oauthlib/tests/test.bin1
-rw-r--r--contrib/python/requests-oauthlib/tests/test_compliance_fixes.py334
-rw-r--r--contrib/python/requests-oauthlib/tests/test_core.py170
-rw-r--r--contrib/python/requests-oauthlib/tests/test_oauth1_session.py348
-rw-r--r--contrib/python/requests-oauthlib/tests/test_oauth2_auth.py54
-rw-r--r--contrib/python/requests-oauthlib/tests/test_oauth2_session.py527
-rw-r--r--contrib/python/requests-oauthlib/tests/ya.make28
8 files changed, 1462 insertions, 0 deletions
diff --git a/contrib/python/requests-oauthlib/tests/__init__.py b/contrib/python/requests-oauthlib/tests/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/contrib/python/requests-oauthlib/tests/__init__.py
diff --git a/contrib/python/requests-oauthlib/tests/test.bin b/contrib/python/requests-oauthlib/tests/test.bin
new file mode 100644
index 00000000000..b00d4f4796b
--- /dev/null
+++ b/contrib/python/requests-oauthlib/tests/test.bin
@@ -0,0 +1 @@
+¥Æ \ No newline at end of file
diff --git a/contrib/python/requests-oauthlib/tests/test_compliance_fixes.py b/contrib/python/requests-oauthlib/tests/test_compliance_fixes.py
new file mode 100644
index 00000000000..5c90d526608
--- /dev/null
+++ b/contrib/python/requests-oauthlib/tests/test_compliance_fixes.py
@@ -0,0 +1,334 @@
+from __future__ import unicode_literals
+from unittest import TestCase
+
+import requests
+import requests_mock
+import time
+
+try:
+ from urlparse import urlparse, parse_qs
+except ImportError:
+ from urllib.parse import urlparse, parse_qs
+
+from oauthlib.oauth2.rfc6749.errors import InvalidGrantError
+from requests_oauthlib import OAuth2Session
+from requests_oauthlib.compliance_fixes import facebook_compliance_fix
+from requests_oauthlib.compliance_fixes import fitbit_compliance_fix
+from requests_oauthlib.compliance_fixes import mailchimp_compliance_fix
+from requests_oauthlib.compliance_fixes import weibo_compliance_fix
+from requests_oauthlib.compliance_fixes import slack_compliance_fix
+from requests_oauthlib.compliance_fixes import instagram_compliance_fix
+from requests_oauthlib.compliance_fixes import plentymarkets_compliance_fix
+from requests_oauthlib.compliance_fixes import ebay_compliance_fix
+
+
+class FacebookComplianceFixTest(TestCase):
+ def setUp(self):
+ mocker = requests_mock.Mocker()
+ mocker.post(
+ "https://graph.facebook.com/oauth/access_token",
+ text="access_token=urlencoded",
+ headers={"Content-Type": "text/plain"},
+ )
+ mocker.start()
+ self.addCleanup(mocker.stop)
+
+ facebook = OAuth2Session("someclientid", redirect_uri="https://i.b")
+ self.session = facebook_compliance_fix(facebook)
+
+ def test_fetch_access_token(self):
+ token = self.session.fetch_token(
+ "https://graph.facebook.com/oauth/access_token",
+ client_secret="someclientsecret",
+ authorization_response="https://i.b/?code=hello",
+ )
+ self.assertEqual(token, {"access_token": "urlencoded", "token_type": "Bearer"})
+
+
+class FitbitComplianceFixTest(TestCase):
+ def setUp(self):
+ self.mocker = requests_mock.Mocker()
+ self.mocker.post(
+ "https://api.fitbit.com/oauth2/token",
+ json={"errors": [{"errorType": "invalid_grant"}]},
+ )
+ self.mocker.start()
+ self.addCleanup(self.mocker.stop)
+
+ fitbit = OAuth2Session("someclientid", redirect_uri="https://i.b")
+ self.session = fitbit_compliance_fix(fitbit)
+
+ def test_fetch_access_token(self):
+ self.assertRaises(
+ InvalidGrantError,
+ self.session.fetch_token,
+ "https://api.fitbit.com/oauth2/token",
+ client_secret="someclientsecret",
+ authorization_response="https://i.b/?code=hello",
+ )
+
+ self.mocker.post(
+ "https://api.fitbit.com/oauth2/token", json={"access_token": "fitbit"}
+ )
+
+ token = self.session.fetch_token(
+ "https://api.fitbit.com/oauth2/token", client_secret="good"
+ )
+
+ self.assertEqual(token, {"access_token": "fitbit"})
+
+ def test_refresh_token(self):
+ self.assertRaises(
+ InvalidGrantError,
+ self.session.refresh_token,
+ "https://api.fitbit.com/oauth2/token",
+ auth=requests.auth.HTTPBasicAuth("someclientid", "someclientsecret"),
+ )
+
+ self.mocker.post(
+ "https://api.fitbit.com/oauth2/token",
+ json={"access_token": "access", "refresh_token": "refresh"},
+ )
+
+ token = self.session.refresh_token(
+ "https://api.fitbit.com/oauth2/token",
+ auth=requests.auth.HTTPBasicAuth("someclientid", "someclientsecret"),
+ )
+
+ self.assertEqual(token["access_token"], "access")
+ self.assertEqual(token["refresh_token"], "refresh")
+
+
+class MailChimpComplianceFixTest(TestCase):
+ def setUp(self):
+ mocker = requests_mock.Mocker()
+ mocker.post(
+ "https://login.mailchimp.com/oauth2/token",
+ json={"access_token": "mailchimp", "expires_in": 0, "scope": None},
+ )
+ mocker.start()
+ self.addCleanup(mocker.stop)
+
+ mailchimp = OAuth2Session("someclientid", redirect_uri="https://i.b")
+ self.session = mailchimp_compliance_fix(mailchimp)
+
+ def test_fetch_access_token(self):
+ token = self.session.fetch_token(
+ "https://login.mailchimp.com/oauth2/token",
+ client_secret="someclientsecret",
+ authorization_response="https://i.b/?code=hello",
+ )
+ # Times should be close
+ approx_expires_at = time.time() + 3600
+ actual_expires_at = token.pop("expires_at")
+ self.assertAlmostEqual(actual_expires_at, approx_expires_at, places=2)
+
+ # Other token values exact
+ self.assertEqual(token, {"access_token": "mailchimp", "expires_in": 3600})
+
+ # And no scope at all
+ self.assertNotIn("scope", token)
+
+
+class WeiboComplianceFixTest(TestCase):
+ def setUp(self):
+ mocker = requests_mock.Mocker()
+ mocker.post(
+ "https://api.weibo.com/oauth2/access_token", json={"access_token": "weibo"}
+ )
+ mocker.start()
+ self.addCleanup(mocker.stop)
+
+ weibo = OAuth2Session("someclientid", redirect_uri="https://i.b")
+ self.session = weibo_compliance_fix(weibo)
+
+ def test_fetch_access_token(self):
+ token = self.session.fetch_token(
+ "https://api.weibo.com/oauth2/access_token",
+ client_secret="someclientsecret",
+ authorization_response="https://i.b/?code=hello",
+ )
+ self.assertEqual(token, {"access_token": "weibo", "token_type": "Bearer"})
+
+
+class SlackComplianceFixTest(TestCase):
+ def setUp(self):
+ mocker = requests_mock.Mocker()
+ mocker.post(
+ "https://slack.com/api/oauth.access",
+ json={"access_token": "xoxt-23984754863-2348975623103", "scope": "read"},
+ )
+ for method in ("GET", "POST"):
+ mocker.request(
+ method=method,
+ url="https://slack.com/api/auth.test",
+ json={
+ "ok": True,
+ "url": "https://myteam.slack.com/",
+ "team": "My Team",
+ "user": "cal",
+ "team_id": "T12345",
+ "user_id": "U12345",
+ },
+ )
+ mocker.start()
+ self.addCleanup(mocker.stop)
+
+ slack = OAuth2Session("someclientid", redirect_uri="https://i.b")
+ self.session = slack_compliance_fix(slack)
+
+ def test_protected_request(self):
+ self.session.token = {"access_token": "dummy-access-token"}
+ response = self.session.get("https://slack.com/api/auth.test")
+ url = response.request.url
+ query = parse_qs(urlparse(url).query)
+ self.assertNotIn("token", query)
+ body = response.request.body
+ data = parse_qs(body)
+ self.assertEqual(data["token"], ["dummy-access-token"])
+
+ def test_protected_request_override_token_get(self):
+ self.session.token = {"access_token": "dummy-access-token"}
+ response = self.session.get(
+ "https://slack.com/api/auth.test", data={"token": "different-token"}
+ )
+ url = response.request.url
+ query = parse_qs(urlparse(url).query)
+ self.assertNotIn("token", query)
+ body = response.request.body
+ data = parse_qs(body)
+ self.assertEqual(data["token"], ["different-token"])
+
+ def test_protected_request_override_token_post(self):
+ self.session.token = {"access_token": "dummy-access-token"}
+ response = self.session.post(
+ "https://slack.com/api/auth.test", data={"token": "different-token"}
+ )
+ url = response.request.url
+ query = parse_qs(urlparse(url).query)
+ self.assertNotIn("token", query)
+ body = response.request.body
+ data = parse_qs(body)
+ self.assertEqual(data["token"], ["different-token"])
+
+ def test_protected_request_override_token_url(self):
+ self.session.token = {"access_token": "dummy-access-token"}
+ response = self.session.get(
+ "https://slack.com/api/auth.test?token=different-token"
+ )
+ url = response.request.url
+ query = parse_qs(urlparse(url).query)
+ self.assertEqual(query["token"], ["different-token"])
+ self.assertIsNone(response.request.body)
+
+
+class InstagramComplianceFixTest(TestCase):
+ def setUp(self):
+ mocker = requests_mock.Mocker()
+ mocker.request(
+ method="GET",
+ url="https://api.instagram.com/v1/users/self",
+ json={
+ "data": {
+ "id": "1574083",
+ "username": "snoopdogg",
+ "full_name": "Snoop Dogg",
+ "profile_picture": "http://distillery.s3.amazonaws.com/profiles/profile_1574083_75sq_1295469061.jpg",
+ "bio": "This is my bio",
+ "website": "http://snoopdogg.com",
+ "is_business": False,
+ "counts": {"media": 1320, "follows": 420, "followed_by": 3410},
+ }
+ },
+ )
+ mocker.start()
+ self.addCleanup(mocker.stop)
+
+ instagram = OAuth2Session("someclientid", redirect_uri="https://i.b")
+ self.session = instagram_compliance_fix(instagram)
+
+ def test_protected_request(self):
+ self.session.token = {"access_token": "dummy-access-token"}
+ response = self.session.get("https://api.instagram.com/v1/users/self")
+ url = response.request.url
+ query = parse_qs(urlparse(url).query)
+ self.assertIn("access_token", query)
+ self.assertEqual(query["access_token"], ["dummy-access-token"])
+
+ def test_protected_request_dont_override(self):
+ """check that if the access_token param
+ already exist we don't override it"""
+ self.session.token = {"access_token": "dummy-access-token"}
+ response = self.session.get(
+ "https://api.instagram.com/v1/users/self?access_token=correct-access-token"
+ )
+ url = response.request.url
+ query = parse_qs(urlparse(url).query)
+ self.assertIn("access_token", query)
+ self.assertEqual(query["access_token"], ["correct-access-token"])
+
+
+class PlentymarketsComplianceFixTest(TestCase):
+ def setUp(self):
+ mocker = requests_mock.Mocker()
+ mocker.post(
+ "https://shop.plentymarkets-cloud02.com",
+ json={
+ "accessToken": "ecUN1r8KhJewMCdLAmpHOdZ4O0ofXKB9zf6CXK61",
+ "tokenType": "Bearer",
+ "expiresIn": 86400,
+ "refreshToken": "iG2kBGIjcXaRE4xmTVUnv7xwxX7XMcWCHqJmFaSX",
+ },
+ headers={"Content-Type": "application/json"},
+ )
+ mocker.start()
+ self.addCleanup(mocker.stop)
+
+ plentymarkets = OAuth2Session("someclientid", redirect_uri="https://i.b")
+ self.session = plentymarkets_compliance_fix(plentymarkets)
+
+ def test_fetch_access_token(self):
+ token = self.session.fetch_token(
+ "https://shop.plentymarkets-cloud02.com",
+ authorization_response="https://i.b/?code=hello",
+ )
+
+ approx_expires_at = time.time() + 86400
+ actual_expires_at = token.pop("expires_at")
+ self.assertAlmostEqual(actual_expires_at, approx_expires_at, places=2)
+
+ self.assertEqual(
+ token,
+ {
+ "access_token": "ecUN1r8KhJewMCdLAmpHOdZ4O0ofXKB9zf6CXK61",
+ "expires_in": 86400,
+ "token_type": "Bearer",
+ "refresh_token": "iG2kBGIjcXaRE4xmTVUnv7xwxX7XMcWCHqJmFaSX",
+ },
+ )
+
+
+class EbayComplianceFixTest(TestCase):
+ def setUp(self):
+ mocker = requests_mock.Mocker()
+ mocker.post(
+ "https://api.ebay.com/identity/v1/oauth2/token",
+ json={
+ "access_token": "this is the access token",
+ "expires_in": 7200,
+ "token_type": "Application Access Token",
+ },
+ headers={"Content-Type": "application/json"},
+ )
+ mocker.start()
+ self.addCleanup(mocker.stop)
+
+ session = OAuth2Session()
+ self.fixed_session = ebay_compliance_fix(session)
+
+ def test_fetch_access_token(self):
+ token = self.fixed_session.fetch_token(
+ "https://api.ebay.com/identity/v1/oauth2/token",
+ authorization_response="https://i.b/?code=hello",
+ )
+ assert token["token_type"] == "Bearer"
diff --git a/contrib/python/requests-oauthlib/tests/test_core.py b/contrib/python/requests-oauthlib/tests/test_core.py
new file mode 100644
index 00000000000..6892e9f1ce8
--- /dev/null
+++ b/contrib/python/requests-oauthlib/tests/test_core.py
@@ -0,0 +1,170 @@
+# -*- coding: utf-8 -*-
+from __future__ import unicode_literals
+import requests
+import requests_oauthlib
+import oauthlib
+import os.path
+from io import StringIO
+import unittest
+
+try:
+ import mock
+except ImportError:
+ from unittest import mock
+
+
+@mock.patch("oauthlib.oauth1.rfc5849.generate_timestamp")
+@mock.patch("oauthlib.oauth1.rfc5849.generate_nonce")
+class OAuth1Test(unittest.TestCase):
+ def testFormEncoded(self, generate_nonce, generate_timestamp):
+ """OAuth1 assumes form encoded if content type is not specified."""
+ generate_nonce.return_value = "abc"
+ generate_timestamp.return_value = "1"
+ oauth = requests_oauthlib.OAuth1("client_key")
+ headers = {"Content-type": "application/x-www-form-urlencoded"}
+ r = requests.Request(
+ method="POST",
+ url="http://a.b/path?query=retain",
+ auth=oauth,
+ data="this=really&is=&+form=encoded",
+ headers=headers,
+ )
+ a = r.prepare()
+
+ self.assertEqual(a.url, "http://a.b/path?query=retain")
+ self.assertEqual(a.body, b"this=really&is=&+form=encoded")
+ self.assertEqual(
+ a.headers.get("Content-Type"), b"application/x-www-form-urlencoded"
+ )
+
+ # guess content-type
+ r = requests.Request(
+ method="POST",
+ url="http://a.b/path?query=retain",
+ auth=oauth,
+ data="this=really&is=&+form=encoded",
+ )
+ b = r.prepare()
+ self.assertEqual(b.url, "http://a.b/path?query=retain")
+ self.assertEqual(b.body, b"this=really&is=&+form=encoded")
+ self.assertEqual(
+ b.headers.get("Content-Type"), b"application/x-www-form-urlencoded"
+ )
+
+ self.assertEqual(a.headers.get("Authorization"), b.headers.get("Authorization"))
+
+ def testNonFormEncoded(self, generate_nonce, generate_timestamp):
+ """OAuth signature only depend on body if it is form encoded."""
+ generate_nonce.return_value = "abc"
+ generate_timestamp.return_value = "1"
+ oauth = requests_oauthlib.OAuth1("client_key")
+
+ r = requests.Request(
+ method="POST",
+ url="http://a.b/path?query=retain",
+ auth=oauth,
+ data="this really is not form encoded",
+ )
+ a = r.prepare()
+
+ r = requests.Request(
+ method="POST", url="http://a.b/path?query=retain", auth=oauth
+ )
+ b = r.prepare()
+
+ self.assertEqual(a.headers.get("Authorization"), b.headers.get("Authorization"))
+
+ r = requests.Request(
+ method="POST",
+ url="http://a.b/path?query=retain",
+ auth=oauth,
+ files={"test": StringIO("hello")},
+ )
+ c = r.prepare()
+
+ self.assertEqual(b.headers.get("Authorization"), c.headers.get("Authorization"))
+
+ @unittest.skip("test uses real http://httpbin.org")
+ def testCanPostBinaryData(self, generate_nonce, generate_timestamp):
+ """
+ Test we can post binary data. Should prevent regression of the
+ UnicodeDecodeError issue.
+ """
+ generate_nonce.return_value = "abc"
+ generate_timestamp.return_value = "1"
+ oauth = requests_oauthlib.OAuth1("client_key")
+ import yatest.common
+ dirname = yatest.common.test_source_path()
+ fname = os.path.join(dirname, "test.bin")
+
+ with open(fname, "rb") as f:
+ r = requests.post(
+ "http://httpbin.org/post",
+ data={"hi": "there"},
+ files={"media": (os.path.basename(f.name), f)},
+ headers={"content-type": "application/octet-stream"},
+ auth=oauth,
+ )
+ self.assertEqual(r.status_code, 200)
+
+ @unittest.skip("test uses real http://httpbin.org")
+ def test_url_is_native_str(self, generate_nonce, generate_timestamp):
+ """
+ Test that the URL is always a native string.
+ """
+ generate_nonce.return_value = "abc"
+ generate_timestamp.return_value = "1"
+ oauth = requests_oauthlib.OAuth1("client_key")
+
+ r = requests.get("http://httpbin.org/get", auth=oauth)
+ self.assertIsInstance(r.request.url, str)
+
+ @unittest.skip("test uses real http://httpbin.org")
+ def test_content_type_override(self, generate_nonce, generate_timestamp):
+ """
+ Content type should only be guessed if none is given.
+ """
+ generate_nonce.return_value = "abc"
+ generate_timestamp.return_value = "1"
+ oauth = requests_oauthlib.OAuth1("client_key")
+ data = "a"
+ r = requests.post("http://httpbin.org/get", data=data, auth=oauth)
+ self.assertEqual(
+ r.request.headers.get("Content-Type"), b"application/x-www-form-urlencoded"
+ )
+ r = requests.post(
+ "http://httpbin.org/get",
+ auth=oauth,
+ data=data,
+ headers={"Content-type": "application/json"},
+ )
+ self.assertEqual(r.request.headers.get("Content-Type"), b"application/json")
+
+ def test_register_client_class(self, generate_timestamp, generate_nonce):
+ class ClientSubclass(oauthlib.oauth1.Client):
+ pass
+
+ self.assertTrue(hasattr(requests_oauthlib.OAuth1, "client_class"))
+
+ self.assertEqual(requests_oauthlib.OAuth1.client_class, oauthlib.oauth1.Client)
+
+ normal = requests_oauthlib.OAuth1("client_key")
+
+ self.assertIsInstance(normal.client, oauthlib.oauth1.Client)
+ self.assertNotIsInstance(normal.client, ClientSubclass)
+
+ requests_oauthlib.OAuth1.client_class = ClientSubclass
+
+ self.assertEqual(requests_oauthlib.OAuth1.client_class, ClientSubclass)
+
+ custom = requests_oauthlib.OAuth1("client_key")
+
+ self.assertIsInstance(custom.client, oauthlib.oauth1.Client)
+ self.assertIsInstance(custom.client, ClientSubclass)
+
+ overridden = requests_oauthlib.OAuth1(
+ "client_key", client_class=oauthlib.oauth1.Client
+ )
+
+ self.assertIsInstance(overridden.client, oauthlib.oauth1.Client)
+ self.assertNotIsInstance(normal.client, ClientSubclass)
diff --git a/contrib/python/requests-oauthlib/tests/test_oauth1_session.py b/contrib/python/requests-oauthlib/tests/test_oauth1_session.py
new file mode 100644
index 00000000000..1dd2b2f1586
--- /dev/null
+++ b/contrib/python/requests-oauthlib/tests/test_oauth1_session.py
@@ -0,0 +1,348 @@
+from __future__ import unicode_literals, print_function
+import unittest
+import sys
+import requests
+from io import StringIO
+
+from oauthlib.oauth1 import SIGNATURE_TYPE_QUERY, SIGNATURE_TYPE_BODY
+from oauthlib.oauth1 import SIGNATURE_RSA, SIGNATURE_PLAINTEXT
+from requests_oauthlib import OAuth1Session
+
+try:
+ import mock
+except ImportError:
+ from unittest import mock
+
+try:
+ import cryptography
+except ImportError:
+ cryptography = None
+
+try:
+ import jwt
+except ImportError:
+ jwt = None
+
+if sys.version[0] == "3":
+ unicode_type = str
+else:
+ unicode_type = unicode
+
+
+TEST_RSA_KEY = (
+ "-----BEGIN RSA PRIVATE KEY-----\n"
+ "MIIEogIBAAKCAQEApF1JaMSN8TEsh4N4O/5SpEAVLivJyLH+Cgl3OQBPGgJkt8cg\n"
+ "49oasl+5iJS+VdrILxWM9/JCJyURpUuslX4Eb4eUBtQ0x5BaPa8+S2NLdGTaL7nB\n"
+ "OO8o8n0C5FEUU+qlEip79KE8aqOj+OC44VsIquSmOvWIQD26n3fCVlgwoRBD1gzz\n"
+ "sDOeaSyzpKrZR851Kh6rEmF2qjJ8jt6EkxMsRNACmBomzgA4M1TTsisSUO87444p\n"
+ "e35Z4/n5c735o2fZMrGgMwiJNh7rT8SYxtIkxngioiGnwkxGQxQ4NzPAHg+XSY0J\n"
+ "04pNm7KqTkgtxyrqOANJLIjXlR+U9SQ90NjHVQIDAQABAoIBABuBPOKaWcJt3yzC\n"
+ "NGGduoif7KtwSnEaUA+v69KPGa2Zju8uFHPssKD+4dZYRc2qMeunKJLpaGaSjnRh\n"
+ "yHyvvOBJCN1nr3lhz6gY5kzJTfwpUFXCOPJlGy4Q+2Xnp4YvcvYqQ9n5DVovDiZ8\n"
+ "vJOBn16xqpudMPLHIa7D5LJ8SY76HBjE+imTXw1EShdh5TOV9bmPFQqH6JFzowRH\n"
+ "hyH2DPHuyHJj6cl8FyqJw5lVWzG3n6Prvk7bYHsjmGjurN35UsumNAp6VouNyUP1\n"
+ "RAEcUJega49aIs6/FJ0ENJzQjlsAzVbTleHkpez2aIok+wsWJGJ4SVxAjADOWAaZ\n"
+ "uEJPc3UCgYEA1g4ZGrXOuo75p9/MRIepXGpBWxip4V7B9XmO9WzPCv8nMorJntWB\n"
+ "msYV1I01aITxadHatO4Gl2xLniNkDyrEQzJ7w38RQgsVK+CqbnC0K9N77QPbHeC1\n"
+ "YQd9RCNyUohOimKvb7jyv798FBU1GO5QI2eNgfnnfteSVXhD2iOoTOsCgYEAxJJ+\n"
+ "8toxJdnLa0uUsAbql6zeNXGbUBMzu3FomKlyuWuq841jS2kIalaO/TRj5hbnE45j\n"
+ "mCjeLgTVO6Ach3Wfk4zrqajqfFJ0zUg/Wexp49lC3RWiV4icBb85Q6bzeJD9Dn9v\n"
+ "hjpfWVkczf/NeA1fGH/pcgfkT6Dm706GFFttLL8CgYBl/HeXk1H47xAiHO4dJKnb\n"
+ "v0B+X8To/RXamF01r+8BpUoOubOQetdyX7ic+d6deuHu8i6LD/GSCeYJZYFR/KVg\n"
+ "AtiW757QYalnq3ZogkhFrVCZP8IRfTPOFBxp752TlyAcrSI7T9pQ47IBe4094KXM\n"
+ "CJWSfPgAJkOxd0iU0XJpmwKBgGfQxuMTgSlwYRKFlD1zKap5TdID8fbUbVnth0Q5\n"
+ "GbH7vwlp/qrxCdS/aj0n0irOpbOaW9ccnlrHiqY25VpVMLYIkt3DrDOEiNNx+KNR\n"
+ "TItdTwbcSiTYrS4L0/56ydM/H6bsfsXxRjI18hSJqMZiqXqS84OZz2aOn+h7HCzc\n"
+ "LEiZAoGASk20wFvilpRKHq79xxFWiDUPHi0x0pp82dYIEntGQkKUWkbSlhgf3MAi\n"
+ "5NEQTDmXdnB+rVeWIvEi+BXfdnNgdn8eC4zSdtF4sIAhYr5VWZo0WVWDhT7u2ccv\n"
+ "ZBFymiz8lo3gN57wGUCi9pbZqzV1+ZppX6YTNDdDCE0q+KO3Cec=\n"
+ "-----END RSA PRIVATE KEY-----"
+)
+
+TEST_RSA_OAUTH_SIGNATURE = (
+ "j8WF8PGjojT82aUDd2EL%2Bz7HCoHInFzWUpiEKMCy%2BJ2cYHWcBS7mXlmFDLgAKV0"
+ "P%2FyX4TrpXODYnJ6dRWdfghqwDpi%2FlQmB2jxCiGMdJoYxh3c5zDf26gEbGdP6D7O"
+ "Ssp5HUnzH6sNkmVjuE%2FxoJcHJdc23H6GhOs7VJ2LWNdbhKWP%2FMMlTrcoQDn8lz"
+ "%2Fb24WsJ6ae1txkUzpFOOlLM8aTdNtGL4OtsubOlRhNqnAFq93FyhXg0KjzUyIZzmMX"
+ "9Vx90jTks5QeBGYcLE0Op2iHb2u%2FO%2BEgdwFchgEwE5LgMUyHUI4F3Wglp28yHOAM"
+ "jPkI%2FkWMvpxtMrU3Z3KN31WQ%3D%3D"
+)
+
+
+class OAuth1SessionTest(unittest.TestCase):
+ def test_signature_types(self):
+ def verify_signature(getter):
+ def fake_send(r, **kwargs):
+ signature = getter(r)
+ if isinstance(signature, bytes):
+ signature = signature.decode("utf-8")
+ self.assertIn("oauth_signature", signature)
+ resp = mock.MagicMock(spec=requests.Response)
+ resp.cookies = []
+ return resp
+
+ return fake_send
+
+ header = OAuth1Session("foo")
+ header.send = verify_signature(lambda r: r.headers["Authorization"])
+ header.post("https://i.b")
+
+ query = OAuth1Session("foo", signature_type=SIGNATURE_TYPE_QUERY)
+ query.send = verify_signature(lambda r: r.url)
+ query.post("https://i.b")
+
+ body = OAuth1Session("foo", signature_type=SIGNATURE_TYPE_BODY)
+ headers = {"Content-Type": "application/x-www-form-urlencoded"}
+ body.send = verify_signature(lambda r: r.body)
+ body.post("https://i.b", headers=headers, data="")
+
+ @mock.patch("oauthlib.oauth1.rfc5849.generate_timestamp")
+ @mock.patch("oauthlib.oauth1.rfc5849.generate_nonce")
+ def test_signature_methods(self, generate_nonce, generate_timestamp):
+ if not cryptography:
+ raise unittest.SkipTest("cryptography module is required")
+ if not jwt:
+ raise unittest.SkipTest("pyjwt module is required")
+
+ generate_nonce.return_value = "abc"
+ generate_timestamp.return_value = "123"
+
+ signature = 'OAuth oauth_nonce="abc", oauth_timestamp="123", oauth_version="1.0", oauth_signature_method="HMAC-SHA1", oauth_consumer_key="foo", oauth_signature="h2sRqLArjhlc5p3FTkuNogVHlKE%3D"'
+ auth = OAuth1Session("foo")
+ auth.send = self.verify_signature(signature)
+ auth.post("https://i.b")
+
+ signature = 'OAuth oauth_nonce="abc", oauth_timestamp="123", oauth_version="1.0", oauth_signature_method="PLAINTEXT", oauth_consumer_key="foo", oauth_signature="%26"'
+ auth = OAuth1Session("foo", signature_method=SIGNATURE_PLAINTEXT)
+ auth.send = self.verify_signature(signature)
+ auth.post("https://i.b")
+
+ signature = (
+ "OAuth "
+ 'oauth_nonce="abc", oauth_timestamp="123", oauth_version="1.0", '
+ 'oauth_signature_method="RSA-SHA1", oauth_consumer_key="foo", '
+ 'oauth_signature="{sig}"'
+ ).format(sig=TEST_RSA_OAUTH_SIGNATURE)
+ auth = OAuth1Session(
+ "foo", signature_method=SIGNATURE_RSA, rsa_key=TEST_RSA_KEY
+ )
+ auth.send = self.verify_signature(signature)
+ auth.post("https://i.b")
+
+ @mock.patch("oauthlib.oauth1.rfc5849.generate_timestamp")
+ @mock.patch("oauthlib.oauth1.rfc5849.generate_nonce")
+ def test_binary_upload(self, generate_nonce, generate_timestamp):
+ generate_nonce.return_value = "abc"
+ generate_timestamp.return_value = "123"
+ fake_xml = StringIO("hello world")
+ headers = {"Content-Type": "application/xml"}
+ signature = 'OAuth oauth_nonce="abc", oauth_timestamp="123", oauth_version="1.0", oauth_signature_method="HMAC-SHA1", oauth_consumer_key="foo", oauth_signature="h2sRqLArjhlc5p3FTkuNogVHlKE%3D"'
+ auth = OAuth1Session("foo")
+ auth.send = self.verify_signature(signature)
+ auth.post("https://i.b", headers=headers, files=[("fake", fake_xml)])
+
+ @mock.patch("oauthlib.oauth1.rfc5849.generate_timestamp")
+ @mock.patch("oauthlib.oauth1.rfc5849.generate_nonce")
+ def test_nonascii(self, generate_nonce, generate_timestamp):
+ generate_nonce.return_value = "abc"
+ generate_timestamp.return_value = "123"
+ signature = 'OAuth oauth_nonce="abc", oauth_timestamp="123", oauth_version="1.0", oauth_signature_method="HMAC-SHA1", oauth_consumer_key="foo", oauth_signature="W0haoue5IZAZoaJiYCtfqwMf8x8%3D"'
+ auth = OAuth1Session("foo")
+ auth.send = self.verify_signature(signature)
+ auth.post("https://i.b?cjk=%E5%95%A6%E5%95%A6")
+
+ def test_authorization_url(self):
+ auth = OAuth1Session("foo")
+ url = "https://example.comm/authorize"
+ token = "asluif023sf"
+ auth_url = auth.authorization_url(url, request_token=token)
+ self.assertEqual(auth_url, url + "?oauth_token=" + token)
+
+ def test_parse_response_url(self):
+ url = "https://i.b/callback?oauth_token=foo&oauth_verifier=bar"
+ auth = OAuth1Session("foo")
+ resp = auth.parse_authorization_response(url)
+ self.assertEqual(resp["oauth_token"], "foo")
+ self.assertEqual(resp["oauth_verifier"], "bar")
+ for k, v in resp.items():
+ self.assertIsInstance(k, unicode_type)
+ self.assertIsInstance(v, unicode_type)
+
+ def test_fetch_request_token(self):
+ auth = OAuth1Session("foo")
+ auth.send = self.fake_body("oauth_token=foo")
+ resp = auth.fetch_request_token("https://example.com/token")
+ self.assertEqual(resp["oauth_token"], "foo")
+ for k, v in resp.items():
+ self.assertIsInstance(k, unicode_type)
+ self.assertIsInstance(v, unicode_type)
+
+ def test_fetch_request_token_with_optional_arguments(self):
+ auth = OAuth1Session("foo")
+ auth.send = self.fake_body("oauth_token=foo")
+ resp = auth.fetch_request_token(
+ "https://example.com/token", verify=False, stream=True
+ )
+ self.assertEqual(resp["oauth_token"], "foo")
+ for k, v in resp.items():
+ self.assertIsInstance(k, unicode_type)
+ self.assertIsInstance(v, unicode_type)
+
+ def test_fetch_access_token(self):
+ auth = OAuth1Session("foo", verifier="bar")
+ auth.send = self.fake_body("oauth_token=foo")
+ resp = auth.fetch_access_token("https://example.com/token")
+ self.assertEqual(resp["oauth_token"], "foo")
+ for k, v in resp.items():
+ self.assertIsInstance(k, unicode_type)
+ self.assertIsInstance(v, unicode_type)
+
+ def test_fetch_access_token_with_optional_arguments(self):
+ auth = OAuth1Session("foo", verifier="bar")
+ auth.send = self.fake_body("oauth_token=foo")
+ resp = auth.fetch_access_token(
+ "https://example.com/token", verify=False, stream=True
+ )
+ self.assertEqual(resp["oauth_token"], "foo")
+ for k, v in resp.items():
+ self.assertIsInstance(k, unicode_type)
+ self.assertIsInstance(v, unicode_type)
+
+ def _test_fetch_access_token_raises_error(self, auth):
+ """Assert that an error is being raised whenever there's no verifier
+ passed in to the client.
+ """
+ auth.send = self.fake_body("oauth_token=foo")
+ with self.assertRaises(ValueError) as cm:
+ auth.fetch_access_token("https://example.com/token")
+ self.assertEqual("No client verifier has been set.", str(cm.exception))
+
+ def test_fetch_token_invalid_response(self):
+ auth = OAuth1Session("foo")
+ auth.send = self.fake_body("not valid urlencoded response!")
+ self.assertRaises(
+ ValueError, auth.fetch_request_token, "https://example.com/token"
+ )
+
+ for code in (400, 401, 403):
+ auth.send = self.fake_body("valid=response", code)
+ with self.assertRaises(ValueError) as cm:
+ auth.fetch_request_token("https://example.com/token")
+ self.assertEqual(cm.exception.status_code, code)
+ self.assertIsInstance(cm.exception.response, requests.Response)
+
+ def test_fetch_access_token_missing_verifier(self):
+ self._test_fetch_access_token_raises_error(OAuth1Session("foo"))
+
+ def test_fetch_access_token_has_verifier_is_none(self):
+ auth = OAuth1Session("foo")
+ del auth._client.client.verifier
+ self._test_fetch_access_token_raises_error(auth)
+
+ def test_token_proxy_set(self):
+ token = {
+ "oauth_token": "fake-key",
+ "oauth_token_secret": "fake-secret",
+ "oauth_verifier": "fake-verifier",
+ }
+ sess = OAuth1Session("foo")
+ self.assertIsNone(sess._client.client.resource_owner_key)
+ self.assertIsNone(sess._client.client.resource_owner_secret)
+ self.assertIsNone(sess._client.client.verifier)
+ self.assertEqual(sess.token, {})
+
+ sess.token = token
+ self.assertEqual(sess._client.client.resource_owner_key, "fake-key")
+ self.assertEqual(sess._client.client.resource_owner_secret, "fake-secret")
+ self.assertEqual(sess._client.client.verifier, "fake-verifier")
+
+ def test_token_proxy_get(self):
+ token = {
+ "oauth_token": "fake-key",
+ "oauth_token_secret": "fake-secret",
+ "oauth_verifier": "fake-verifier",
+ }
+ sess = OAuth1Session(
+ "foo",
+ resource_owner_key=token["oauth_token"],
+ resource_owner_secret=token["oauth_token_secret"],
+ verifier=token["oauth_verifier"],
+ )
+ self.assertEqual(sess.token, token)
+
+ sess._client.client.resource_owner_key = "different-key"
+ token["oauth_token"] = "different-key"
+
+ self.assertEqual(sess.token, token)
+
+ def test_authorized_false(self):
+ sess = OAuth1Session("foo")
+ self.assertIs(sess.authorized, False)
+
+ def test_authorized_false_rsa(self):
+ signature = (
+ "OAuth "
+ 'oauth_nonce="abc", oauth_timestamp="123", oauth_version="1.0", '
+ 'oauth_signature_method="RSA-SHA1", oauth_consumer_key="foo", '
+ 'oauth_signature="{sig}"'
+ ).format(sig=TEST_RSA_OAUTH_SIGNATURE)
+ sess = OAuth1Session(
+ "foo", signature_method=SIGNATURE_RSA, rsa_key=TEST_RSA_KEY
+ )
+ sess.send = self.verify_signature(signature)
+ self.assertIs(sess.authorized, False)
+
+ def test_authorized_true(self):
+ sess = OAuth1Session("key", "secret", verifier="bar")
+ sess.send = self.fake_body("oauth_token=foo&oauth_token_secret=bar")
+ sess.fetch_access_token("https://example.com/token")
+ self.assertIs(sess.authorized, True)
+
+ @mock.patch("oauthlib.oauth1.rfc5849.generate_timestamp")
+ @mock.patch("oauthlib.oauth1.rfc5849.generate_nonce")
+ def test_authorized_true_rsa(self, generate_nonce, generate_timestamp):
+ if not cryptography:
+ raise unittest.SkipTest("cryptography module is required")
+ if not jwt:
+ raise unittest.SkipTest("pyjwt module is required")
+
+ generate_nonce.return_value = "abc"
+ generate_timestamp.return_value = "123"
+ signature = (
+ "OAuth "
+ 'oauth_nonce="abc", oauth_timestamp="123", oauth_version="1.0", '
+ 'oauth_signature_method="RSA-SHA1", oauth_consumer_key="foo", '
+ 'oauth_verifier="bar", oauth_signature="{sig}"'
+ ).format(sig=TEST_RSA_OAUTH_SIGNATURE)
+ sess = OAuth1Session(
+ "key",
+ "secret",
+ signature_method=SIGNATURE_RSA,
+ rsa_key=TEST_RSA_KEY,
+ verifier="bar",
+ )
+ sess.send = self.fake_body("oauth_token=foo&oauth_token_secret=bar")
+ sess.fetch_access_token("https://example.com/token")
+ self.assertIs(sess.authorized, True)
+
+ def verify_signature(self, signature):
+ def fake_send(r, **kwargs):
+ auth_header = r.headers["Authorization"]
+ if isinstance(auth_header, bytes):
+ auth_header = auth_header.decode("utf-8")
+ self.assertEqual(auth_header, signature)
+ resp = mock.MagicMock(spec=requests.Response)
+ resp.cookies = []
+ return resp
+
+ return fake_send
+
+ def fake_body(self, body, status_code=200):
+ def fake_send(r, **kwargs):
+ resp = mock.MagicMock(spec=requests.Response)
+ resp.cookies = []
+ resp.text = body
+ resp.status_code = status_code
+ return resp
+
+ return fake_send
diff --git a/contrib/python/requests-oauthlib/tests/test_oauth2_auth.py b/contrib/python/requests-oauthlib/tests/test_oauth2_auth.py
new file mode 100644
index 00000000000..accb561ef6c
--- /dev/null
+++ b/contrib/python/requests-oauthlib/tests/test_oauth2_auth.py
@@ -0,0 +1,54 @@
+from __future__ import unicode_literals
+import unittest
+
+from oauthlib.oauth2 import WebApplicationClient, MobileApplicationClient
+from oauthlib.oauth2 import LegacyApplicationClient, BackendApplicationClient
+from requests import Request
+from requests_oauthlib import OAuth2
+
+
+class OAuth2AuthTest(unittest.TestCase):
+ def setUp(self):
+ self.token = {
+ "token_type": "Bearer",
+ "access_token": "asdfoiw37850234lkjsdfsdf",
+ "expires_in": "3600",
+ }
+ self.client_id = "foo"
+ self.clients = [
+ WebApplicationClient(self.client_id),
+ MobileApplicationClient(self.client_id),
+ LegacyApplicationClient(self.client_id),
+ BackendApplicationClient(self.client_id),
+ ]
+
+ def test_add_token_to_url(self):
+ url = "https://example.com/resource?foo=bar"
+ new_url = url + "&access_token=" + self.token["access_token"]
+ for client in self.clients:
+ client.default_token_placement = "query"
+ auth = OAuth2(client=client, token=self.token)
+ r = Request("GET", url, auth=auth).prepare()
+ self.assertEqual(r.url, new_url)
+
+ def test_add_token_to_headers(self):
+ token = "Bearer " + self.token["access_token"]
+ for client in self.clients:
+ auth = OAuth2(client=client, token=self.token)
+ r = Request("GET", "https://i.b", auth=auth).prepare()
+ self.assertEqual(r.headers["Authorization"], token)
+
+ def test_add_token_to_body(self):
+ body = "foo=bar"
+ new_body = body + "&access_token=" + self.token["access_token"]
+ for client in self.clients:
+ client.default_token_placement = "body"
+ auth = OAuth2(client=client, token=self.token)
+ r = Request("GET", "https://i.b", data=body, auth=auth).prepare()
+ self.assertEqual(r.body, new_body)
+
+ def test_add_nonexisting_token(self):
+ for client in self.clients:
+ auth = OAuth2(client=client)
+ r = Request("GET", "https://i.b", auth=auth)
+ self.assertRaises(ValueError, r.prepare)
diff --git a/contrib/python/requests-oauthlib/tests/test_oauth2_session.py b/contrib/python/requests-oauthlib/tests/test_oauth2_session.py
new file mode 100644
index 00000000000..cfc6236855c
--- /dev/null
+++ b/contrib/python/requests-oauthlib/tests/test_oauth2_session.py
@@ -0,0 +1,527 @@
+from __future__ import unicode_literals
+import json
+import time
+import tempfile
+import shutil
+import os
+from base64 import b64encode
+from copy import deepcopy
+from unittest import TestCase
+
+try:
+ import mock
+except ImportError:
+ from unittest import mock
+
+from oauthlib.common import urlencode
+from oauthlib.oauth2 import TokenExpiredError, OAuth2Error
+from oauthlib.oauth2 import MismatchingStateError
+from oauthlib.oauth2 import WebApplicationClient, MobileApplicationClient
+from oauthlib.oauth2 import LegacyApplicationClient, BackendApplicationClient
+from requests_oauthlib import OAuth2Session, TokenUpdated
+import requests
+
+from requests.auth import _basic_auth_str
+
+
+fake_time = time.time()
+CODE = "asdf345xdf"
+
+
+def fake_token(token):
+ def fake_send(r, **kwargs):
+ resp = mock.MagicMock()
+ resp.text = json.dumps(token)
+ return resp
+
+ return fake_send
+
+
+class OAuth2SessionTest(TestCase):
+ def setUp(self):
+ self.token = {
+ "token_type": "Bearer",
+ "access_token": "asdfoiw37850234lkjsdfsdf",
+ "refresh_token": "sldvafkjw34509s8dfsdf",
+ "expires_in": 3600,
+ "expires_at": fake_time + 3600,
+ }
+ # use someclientid:someclientsecret to easily differentiate between client and user credentials
+ # these are the values used in oauthlib tests
+ self.client_id = "someclientid"
+ self.client_secret = "someclientsecret"
+ self.user_username = "user_username"
+ self.user_password = "user_password"
+ self.client_WebApplication = WebApplicationClient(self.client_id, code=CODE)
+ self.client_LegacyApplication = LegacyApplicationClient(self.client_id)
+ self.client_BackendApplication = BackendApplicationClient(self.client_id)
+ self.client_MobileApplication = MobileApplicationClient(self.client_id)
+ self.clients = [
+ self.client_WebApplication,
+ self.client_LegacyApplication,
+ self.client_BackendApplication,
+ ]
+ self.all_clients = self.clients + [self.client_MobileApplication]
+
+ def test_add_token(self):
+ token = "Bearer " + self.token["access_token"]
+
+ def verifier(r, **kwargs):
+ auth_header = r.headers.get(str("Authorization"), None)
+ self.assertEqual(auth_header, token)
+ resp = mock.MagicMock()
+ resp.cookes = []
+ return resp
+
+ for client in self.all_clients:
+ sess = OAuth2Session(client=client, token=self.token)
+ sess.send = verifier
+ sess.get("https://i.b")
+
+ def test_mtls(self):
+ cert = (
+ "testsomething.example-client.pem",
+ "testsomething.example-client-key.pem",
+ )
+
+ def verifier(r, **kwargs):
+ self.assertIn("cert", kwargs)
+ self.assertEqual(cert, kwargs["cert"])
+ self.assertIn("client_id=" + self.client_id, r.body)
+ resp = mock.MagicMock()
+ resp.text = json.dumps(self.token)
+ return resp
+
+ for client in self.clients:
+ sess = OAuth2Session(client=client)
+ sess.send = verifier
+
+ if isinstance(client, LegacyApplicationClient):
+ sess.fetch_token(
+ "https://i.b",
+ include_client_id=True,
+ cert=cert,
+ username="username1",
+ password="password1",
+ )
+ else:
+ sess.fetch_token("https://i.b", include_client_id=True, cert=cert)
+
+ def test_authorization_url(self):
+ url = "https://example.com/authorize?foo=bar"
+
+ web = WebApplicationClient(self.client_id)
+ s = OAuth2Session(client=web)
+ auth_url, state = s.authorization_url(url)
+ self.assertIn(state, auth_url)
+ self.assertIn(self.client_id, auth_url)
+ self.assertIn("response_type=code", auth_url)
+
+ mobile = MobileApplicationClient(self.client_id)
+ s = OAuth2Session(client=mobile)
+ auth_url, state = s.authorization_url(url)
+ self.assertIn(state, auth_url)
+ self.assertIn(self.client_id, auth_url)
+ self.assertIn("response_type=token", auth_url)
+
+ @mock.patch("time.time", new=lambda: fake_time)
+ def test_refresh_token_request(self):
+ self.expired_token = dict(self.token)
+ self.expired_token["expires_in"] = "-1"
+ del self.expired_token["expires_at"]
+
+ def fake_refresh(r, **kwargs):
+ if "/refresh" in r.url:
+ self.assertNotIn("Authorization", r.headers)
+ resp = mock.MagicMock()
+ resp.text = json.dumps(self.token)
+ return resp
+
+ # No auto refresh setup
+ for client in self.clients:
+ sess = OAuth2Session(client=client, token=self.expired_token)
+ self.assertRaises(TokenExpiredError, sess.get, "https://i.b")
+
+ # Auto refresh but no auto update
+ for client in self.clients:
+ sess = OAuth2Session(
+ client=client,
+ token=self.expired_token,
+ auto_refresh_url="https://i.b/refresh",
+ )
+ sess.send = fake_refresh
+ self.assertRaises(TokenUpdated, sess.get, "https://i.b")
+
+ # Auto refresh and auto update
+ def token_updater(token):
+ self.assertEqual(token, self.token)
+
+ for client in self.clients:
+ sess = OAuth2Session(
+ client=client,
+ token=self.expired_token,
+ auto_refresh_url="https://i.b/refresh",
+ token_updater=token_updater,
+ )
+ sess.send = fake_refresh
+ sess.get("https://i.b")
+
+ def fake_refresh_with_auth(r, **kwargs):
+ if "/refresh" in r.url:
+ self.assertIn("Authorization", r.headers)
+ encoded = b64encode(
+ "{client_id}:{client_secret}".format(
+ client_id=self.client_id, client_secret=self.client_secret
+ ).encode("latin1")
+ )
+ content = "Basic {encoded}".format(encoded=encoded.decode("latin1"))
+ self.assertEqual(r.headers["Authorization"], content)
+ resp = mock.MagicMock()
+ resp.text = json.dumps(self.token)
+ return resp
+
+ for client in self.clients:
+ sess = OAuth2Session(
+ client=client,
+ token=self.expired_token,
+ auto_refresh_url="https://i.b/refresh",
+ token_updater=token_updater,
+ )
+ sess.send = fake_refresh_with_auth
+ sess.get(
+ "https://i.b",
+ client_id=self.client_id,
+ client_secret=self.client_secret,
+ )
+
+ @mock.patch("time.time", new=lambda: fake_time)
+ def test_token_from_fragment(self):
+ mobile = MobileApplicationClient(self.client_id)
+ response_url = "https://i.b/callback#" + urlencode(self.token.items())
+ sess = OAuth2Session(client=mobile)
+ self.assertEqual(sess.token_from_fragment(response_url), self.token)
+
+ @mock.patch("time.time", new=lambda: fake_time)
+ def test_fetch_token(self):
+ url = "https://example.com/token"
+
+ for client in self.clients:
+ sess = OAuth2Session(client=client, token=self.token)
+ sess.send = fake_token(self.token)
+ if isinstance(client, LegacyApplicationClient):
+ # this client requires a username+password
+ # if unset, an error will be raised
+ self.assertRaises(ValueError, sess.fetch_token, url)
+ self.assertRaises(
+ ValueError, sess.fetch_token, url, username="username1"
+ )
+ self.assertRaises(
+ ValueError, sess.fetch_token, url, password="password1"
+ )
+ # otherwise it will pass
+ self.assertEqual(
+ sess.fetch_token(url, username="username1", password="password1"),
+ self.token,
+ )
+ else:
+ self.assertEqual(sess.fetch_token(url), self.token)
+
+ error = {"error": "invalid_request"}
+ for client in self.clients:
+ sess = OAuth2Session(client=client, token=self.token)
+ sess.send = fake_token(error)
+ if isinstance(client, LegacyApplicationClient):
+ # this client requires a username+password
+ # if unset, an error will be raised
+ self.assertRaises(ValueError, sess.fetch_token, url)
+ self.assertRaises(
+ ValueError, sess.fetch_token, url, username="username1"
+ )
+ self.assertRaises(
+ ValueError, sess.fetch_token, url, password="password1"
+ )
+ # otherwise it will pass
+ self.assertRaises(
+ OAuth2Error,
+ sess.fetch_token,
+ url,
+ username="username1",
+ password="password1",
+ )
+ else:
+ self.assertRaises(OAuth2Error, sess.fetch_token, url)
+
+ # there are different scenarios in which the `client_id` can be specified
+ # reference `oauthlib.tests.oauth2.rfc6749.clients.test_web_application.WebApplicationClientTest.test_prepare_request_body`
+ # this only needs to test WebApplicationClient
+ client = self.client_WebApplication
+ client.tester = True
+
+ # this should be a tuple of (r.url, r.body, r.headers.get('Authorization'))
+ _fetch_history = []
+
+ def fake_token_history(token):
+ def fake_send(r, **kwargs):
+ resp = mock.MagicMock()
+ resp.text = json.dumps(token)
+ _fetch_history.append(
+ (r.url, r.body, r.headers.get("Authorization", None))
+ )
+ return resp
+
+ return fake_send
+
+ sess = OAuth2Session(client=client, token=self.token)
+ sess.send = fake_token_history(self.token)
+ expected_auth_header = _basic_auth_str(self.client_id, self.client_secret)
+
+ # scenario 1 - default request
+ # this should send the `client_id` in the headers, as that is recommended by the RFC
+ self.assertEqual(
+ sess.fetch_token(url, client_secret="someclientsecret"), self.token
+ )
+ self.assertEqual(len(_fetch_history), 1)
+ self.assertNotIn(
+ "client_id", _fetch_history[0][1]
+ ) # no `client_id` in the body
+ self.assertNotIn(
+ "client_secret", _fetch_history[0][1]
+ ) # no `client_secret` in the body
+ self.assertEqual(
+ _fetch_history[0][2], expected_auth_header
+ ) # ensure a Basic Authorization header
+
+ # scenario 2 - force the `client_id` into the body
+ self.assertEqual(
+ sess.fetch_token(
+ url, client_secret="someclientsecret", include_client_id=True
+ ),
+ self.token,
+ )
+ self.assertEqual(len(_fetch_history), 2)
+ self.assertIn("client_id=%s" % self.client_id, _fetch_history[1][1])
+ self.assertIn("client_secret=%s" % self.client_secret, _fetch_history[1][1])
+ self.assertEqual(
+ _fetch_history[1][2], None
+ ) # ensure NO Basic Authorization header
+
+ # scenario 3 - send in an auth object
+ auth = requests.auth.HTTPBasicAuth(self.client_id, self.client_secret)
+ self.assertEqual(sess.fetch_token(url, auth=auth), self.token)
+ self.assertEqual(len(_fetch_history), 3)
+ self.assertNotIn(
+ "client_id", _fetch_history[2][1]
+ ) # no `client_id` in the body
+ self.assertNotIn(
+ "client_secret", _fetch_history[2][1]
+ ) # no `client_secret` in the body
+ self.assertEqual(
+ _fetch_history[2][2], expected_auth_header
+ ) # ensure a Basic Authorization header
+
+ # scenario 4 - send in a username/password combo
+ # this should send the `client_id` in the headers, like scenario 1
+ self.assertEqual(
+ sess.fetch_token(
+ url, username=self.user_username, password=self.user_password
+ ),
+ self.token,
+ )
+ self.assertEqual(len(_fetch_history), 4)
+ self.assertNotIn(
+ "client_id", _fetch_history[3][1]
+ ) # no `client_id` in the body
+ self.assertNotIn(
+ "client_secret", _fetch_history[3][1]
+ ) # no `client_secret` in the body
+ self.assertEqual(
+ _fetch_history[0][2], expected_auth_header
+ ) # ensure a Basic Authorization header
+ self.assertIn("username=%s" % self.user_username, _fetch_history[3][1])
+ self.assertIn("password=%s" % self.user_password, _fetch_history[3][1])
+
+ # scenario 5 - send data in `params` and not in `data` for providers
+ # that expect data in URL
+ self.assertEqual(
+ sess.fetch_token(url, client_secret="somesecret", force_querystring=True),
+ self.token,
+ )
+ self.assertIn("code=%s" % CODE, _fetch_history[4][0])
+
+ # some quick tests for valid ways of supporting `client_secret`
+
+ # scenario 2b - force the `client_id` into the body; but the `client_secret` is `None`
+ self.assertEqual(
+ sess.fetch_token(url, client_secret=None, include_client_id=True),
+ self.token,
+ )
+ self.assertEqual(len(_fetch_history), 6)
+ self.assertIn("client_id=%s" % self.client_id, _fetch_history[5][1])
+ self.assertNotIn(
+ "client_secret=", _fetch_history[5][1]
+ ) # no `client_secret` in the body
+ self.assertEqual(
+ _fetch_history[5][2], None
+ ) # ensure NO Basic Authorization header
+
+ # scenario 2c - force the `client_id` into the body; but the `client_secret` is an empty string
+ self.assertEqual(
+ sess.fetch_token(url, client_secret="", include_client_id=True), self.token
+ )
+ self.assertEqual(len(_fetch_history), 7)
+ self.assertIn("client_id=%s" % self.client_id, _fetch_history[6][1])
+ self.assertIn("client_secret=", _fetch_history[6][1])
+ self.assertEqual(
+ _fetch_history[6][2], None
+ ) # ensure NO Basic Authorization header
+
+ def test_cleans_previous_token_before_fetching_new_one(self):
+ """Makes sure the previous token is cleaned before fetching a new one.
+
+ The reason behind it is that, if the previous token is expired, this
+ method shouldn't fail with a TokenExpiredError, since it's attempting
+ to get a new one (which shouldn't be expired).
+
+ """
+ new_token = deepcopy(self.token)
+ past = time.time() - 7200
+ now = time.time()
+ self.token["expires_at"] = past
+ new_token["expires_at"] = now + 3600
+ url = "https://example.com/token"
+
+ with mock.patch("time.time", lambda: now):
+ for client in self.clients:
+ sess = OAuth2Session(client=client, token=self.token)
+ sess.send = fake_token(new_token)
+ if isinstance(client, LegacyApplicationClient):
+ # this client requires a username+password
+ # if unset, an error will be raised
+ self.assertRaises(ValueError, sess.fetch_token, url)
+ self.assertRaises(
+ ValueError, sess.fetch_token, url, username="username1"
+ )
+ self.assertRaises(
+ ValueError, sess.fetch_token, url, password="password1"
+ )
+ # otherwise it will pass
+ self.assertEqual(
+ sess.fetch_token(
+ url, username="username1", password="password1"
+ ),
+ new_token,
+ )
+ else:
+ self.assertEqual(sess.fetch_token(url), new_token)
+
+ def test_web_app_fetch_token(self):
+ # Ensure the state parameter is used, see issue #105.
+ client = OAuth2Session("someclientid", state="somestate")
+ self.assertRaises(
+ MismatchingStateError,
+ client.fetch_token,
+ "https://i.b/token",
+ authorization_response="https://i.b/no-state?code=abc",
+ )
+
+ def test_client_id_proxy(self):
+ sess = OAuth2Session("test-id")
+ self.assertEqual(sess.client_id, "test-id")
+ sess.client_id = "different-id"
+ self.assertEqual(sess.client_id, "different-id")
+ sess._client.client_id = "something-else"
+ self.assertEqual(sess.client_id, "something-else")
+ del sess.client_id
+ self.assertIsNone(sess.client_id)
+
+ def test_access_token_proxy(self):
+ sess = OAuth2Session("test-id")
+ self.assertIsNone(sess.access_token)
+ sess.access_token = "test-token"
+ self.assertEqual(sess.access_token, "test-token")
+ sess._client.access_token = "different-token"
+ self.assertEqual(sess.access_token, "different-token")
+ del sess.access_token
+ self.assertIsNone(sess.access_token)
+
+ def test_token_proxy(self):
+ token = {"access_token": "test-access"}
+ sess = OAuth2Session("test-id", token=token)
+ self.assertEqual(sess.access_token, "test-access")
+ self.assertEqual(sess.token, token)
+ token["access_token"] = "something-else"
+ sess.token = token
+ self.assertEqual(sess.access_token, "something-else")
+ self.assertEqual(sess.token, token)
+ sess._client.access_token = "different-token"
+ token["access_token"] = "different-token"
+ self.assertEqual(sess.access_token, "different-token")
+ self.assertEqual(sess.token, token)
+ # can't delete token attribute
+ with self.assertRaises(AttributeError):
+ del sess.token
+
+ def test_authorized_false(self):
+ sess = OAuth2Session("someclientid")
+ self.assertFalse(sess.authorized)
+
+ @mock.patch("time.time", new=lambda: fake_time)
+ def test_authorized_true(self):
+ def fake_token(token):
+ def fake_send(r, **kwargs):
+ resp = mock.MagicMock()
+ resp.text = json.dumps(token)
+ return resp
+
+ return fake_send
+
+ url = "https://example.com/token"
+
+ for client in self.clients:
+ sess = OAuth2Session(client=client)
+ sess.send = fake_token(self.token)
+ self.assertFalse(sess.authorized)
+ if isinstance(client, LegacyApplicationClient):
+ # this client requires a username+password
+ # if unset, an error will be raised
+ self.assertRaises(ValueError, sess.fetch_token, url)
+ self.assertRaises(
+ ValueError, sess.fetch_token, url, username="username1"
+ )
+ self.assertRaises(
+ ValueError, sess.fetch_token, url, password="password1"
+ )
+ # otherwise it will pass
+ sess.fetch_token(url, username="username1", password="password1")
+ else:
+ sess.fetch_token(url)
+ self.assertTrue(sess.authorized)
+
+
+class OAuth2SessionNetrcTest(OAuth2SessionTest):
+ """Ensure that there is no magic auth handling.
+
+ By default, requests sessions have magic handling of netrc files,
+ which is undesirable for this library because it will take
+ precedence over manually set authentication headers.
+ """
+
+ def setUp(self):
+ # Set up a temporary home directory
+ self.homedir = tempfile.mkdtemp()
+ self.prehome = os.environ.get("HOME", None)
+ os.environ["HOME"] = self.homedir
+
+ # Write a .netrc file that will cause problems
+ netrc_loc = os.path.expanduser("~/.netrc")
+ with open(netrc_loc, "w") as f:
+ f.write("machine i.b\n" " password abc123\n" " login spam@eggs.co\n")
+
+ super(OAuth2SessionNetrcTest, self).setUp()
+
+ def tearDown(self):
+ super(OAuth2SessionNetrcTest, self).tearDown()
+
+ if self.prehome is not None:
+ os.environ["HOME"] = self.prehome
+ shutil.rmtree(self.homedir)
diff --git a/contrib/python/requests-oauthlib/tests/ya.make b/contrib/python/requests-oauthlib/tests/ya.make
new file mode 100644
index 00000000000..a8f7328ae79
--- /dev/null
+++ b/contrib/python/requests-oauthlib/tests/ya.make
@@ -0,0 +1,28 @@
+PY3TEST()
+
+PEERDIR(
+ contrib/python/requests-oauthlib
+ contrib/python/requests-mock
+)
+
+# These tests use real http://httpbin.org that is why they are disabled:
+# testCanPostBinaryData
+# test_url_is_native_str
+# test_content_type_override
+
+TEST_SRCS(
+ __init__.py
+ test_compliance_fixes.py
+ test_core.py
+ test_oauth1_session.py
+ test_oauth2_auth.py
+ test_oauth2_session.py
+)
+
+DATA(
+ arcadia/contrib/python/requests-oauthlib/tests
+)
+
+NO_LINT()
+
+END()