diff options
author | alexv-smirnov <alex@ydb.tech> | 2023-12-01 12:02:50 +0300 |
---|---|---|
committer | alexv-smirnov <alex@ydb.tech> | 2023-12-01 13:28:10 +0300 |
commit | 0e578a4c44d4abd539d9838347b9ebafaca41dfb (patch) | |
tree | a0c1969c37f818c830ebeff9c077eacf30be6ef8 /contrib/python/requests-oauthlib/tests | |
parent | 84f2d3d4cc985e63217cff149bd2e6d67ae6fe22 (diff) | |
download | ydb-0e578a4c44d4abd539d9838347b9ebafaca41dfb.tar.gz |
Change "ya.make"
Diffstat (limited to 'contrib/python/requests-oauthlib/tests')
8 files changed, 1462 insertions, 0 deletions
diff --git a/contrib/python/requests-oauthlib/tests/__init__.py b/contrib/python/requests-oauthlib/tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/contrib/python/requests-oauthlib/tests/__init__.py diff --git a/contrib/python/requests-oauthlib/tests/test.bin b/contrib/python/requests-oauthlib/tests/test.bin new file mode 100644 index 00000000000..b00d4f4796b --- /dev/null +++ b/contrib/python/requests-oauthlib/tests/test.bin @@ -0,0 +1 @@ +¥Æ
\ No newline at end of file diff --git a/contrib/python/requests-oauthlib/tests/test_compliance_fixes.py b/contrib/python/requests-oauthlib/tests/test_compliance_fixes.py new file mode 100644 index 00000000000..5c90d526608 --- /dev/null +++ b/contrib/python/requests-oauthlib/tests/test_compliance_fixes.py @@ -0,0 +1,334 @@ +from __future__ import unicode_literals +from unittest import TestCase + +import requests +import requests_mock +import time + +try: + from urlparse import urlparse, parse_qs +except ImportError: + from urllib.parse import urlparse, parse_qs + +from oauthlib.oauth2.rfc6749.errors import InvalidGrantError +from requests_oauthlib import OAuth2Session +from requests_oauthlib.compliance_fixes import facebook_compliance_fix +from requests_oauthlib.compliance_fixes import fitbit_compliance_fix +from requests_oauthlib.compliance_fixes import mailchimp_compliance_fix +from requests_oauthlib.compliance_fixes import weibo_compliance_fix +from requests_oauthlib.compliance_fixes import slack_compliance_fix +from requests_oauthlib.compliance_fixes import instagram_compliance_fix +from requests_oauthlib.compliance_fixes import plentymarkets_compliance_fix +from requests_oauthlib.compliance_fixes import ebay_compliance_fix + + +class FacebookComplianceFixTest(TestCase): + def setUp(self): + mocker = requests_mock.Mocker() + mocker.post( + "https://graph.facebook.com/oauth/access_token", + text="access_token=urlencoded", + headers={"Content-Type": "text/plain"}, + ) + mocker.start() + self.addCleanup(mocker.stop) + + facebook = OAuth2Session("someclientid", redirect_uri="https://i.b") + self.session = facebook_compliance_fix(facebook) + + def test_fetch_access_token(self): + token = self.session.fetch_token( + "https://graph.facebook.com/oauth/access_token", + client_secret="someclientsecret", + authorization_response="https://i.b/?code=hello", + ) + self.assertEqual(token, {"access_token": "urlencoded", "token_type": "Bearer"}) + + +class FitbitComplianceFixTest(TestCase): + def setUp(self): + self.mocker = requests_mock.Mocker() + self.mocker.post( + "https://api.fitbit.com/oauth2/token", + json={"errors": [{"errorType": "invalid_grant"}]}, + ) + self.mocker.start() + self.addCleanup(self.mocker.stop) + + fitbit = OAuth2Session("someclientid", redirect_uri="https://i.b") + self.session = fitbit_compliance_fix(fitbit) + + def test_fetch_access_token(self): + self.assertRaises( + InvalidGrantError, + self.session.fetch_token, + "https://api.fitbit.com/oauth2/token", + client_secret="someclientsecret", + authorization_response="https://i.b/?code=hello", + ) + + self.mocker.post( + "https://api.fitbit.com/oauth2/token", json={"access_token": "fitbit"} + ) + + token = self.session.fetch_token( + "https://api.fitbit.com/oauth2/token", client_secret="good" + ) + + self.assertEqual(token, {"access_token": "fitbit"}) + + def test_refresh_token(self): + self.assertRaises( + InvalidGrantError, + self.session.refresh_token, + "https://api.fitbit.com/oauth2/token", + auth=requests.auth.HTTPBasicAuth("someclientid", "someclientsecret"), + ) + + self.mocker.post( + "https://api.fitbit.com/oauth2/token", + json={"access_token": "access", "refresh_token": "refresh"}, + ) + + token = self.session.refresh_token( + "https://api.fitbit.com/oauth2/token", + auth=requests.auth.HTTPBasicAuth("someclientid", "someclientsecret"), + ) + + self.assertEqual(token["access_token"], "access") + self.assertEqual(token["refresh_token"], "refresh") + + +class MailChimpComplianceFixTest(TestCase): + def setUp(self): + mocker = requests_mock.Mocker() + mocker.post( + "https://login.mailchimp.com/oauth2/token", + json={"access_token": "mailchimp", "expires_in": 0, "scope": None}, + ) + mocker.start() + self.addCleanup(mocker.stop) + + mailchimp = OAuth2Session("someclientid", redirect_uri="https://i.b") + self.session = mailchimp_compliance_fix(mailchimp) + + def test_fetch_access_token(self): + token = self.session.fetch_token( + "https://login.mailchimp.com/oauth2/token", + client_secret="someclientsecret", + authorization_response="https://i.b/?code=hello", + ) + # Times should be close + approx_expires_at = time.time() + 3600 + actual_expires_at = token.pop("expires_at") + self.assertAlmostEqual(actual_expires_at, approx_expires_at, places=2) + + # Other token values exact + self.assertEqual(token, {"access_token": "mailchimp", "expires_in": 3600}) + + # And no scope at all + self.assertNotIn("scope", token) + + +class WeiboComplianceFixTest(TestCase): + def setUp(self): + mocker = requests_mock.Mocker() + mocker.post( + "https://api.weibo.com/oauth2/access_token", json={"access_token": "weibo"} + ) + mocker.start() + self.addCleanup(mocker.stop) + + weibo = OAuth2Session("someclientid", redirect_uri="https://i.b") + self.session = weibo_compliance_fix(weibo) + + def test_fetch_access_token(self): + token = self.session.fetch_token( + "https://api.weibo.com/oauth2/access_token", + client_secret="someclientsecret", + authorization_response="https://i.b/?code=hello", + ) + self.assertEqual(token, {"access_token": "weibo", "token_type": "Bearer"}) + + +class SlackComplianceFixTest(TestCase): + def setUp(self): + mocker = requests_mock.Mocker() + mocker.post( + "https://slack.com/api/oauth.access", + json={"access_token": "xoxt-23984754863-2348975623103", "scope": "read"}, + ) + for method in ("GET", "POST"): + mocker.request( + method=method, + url="https://slack.com/api/auth.test", + json={ + "ok": True, + "url": "https://myteam.slack.com/", + "team": "My Team", + "user": "cal", + "team_id": "T12345", + "user_id": "U12345", + }, + ) + mocker.start() + self.addCleanup(mocker.stop) + + slack = OAuth2Session("someclientid", redirect_uri="https://i.b") + self.session = slack_compliance_fix(slack) + + def test_protected_request(self): + self.session.token = {"access_token": "dummy-access-token"} + response = self.session.get("https://slack.com/api/auth.test") + url = response.request.url + query = parse_qs(urlparse(url).query) + self.assertNotIn("token", query) + body = response.request.body + data = parse_qs(body) + self.assertEqual(data["token"], ["dummy-access-token"]) + + def test_protected_request_override_token_get(self): + self.session.token = {"access_token": "dummy-access-token"} + response = self.session.get( + "https://slack.com/api/auth.test", data={"token": "different-token"} + ) + url = response.request.url + query = parse_qs(urlparse(url).query) + self.assertNotIn("token", query) + body = response.request.body + data = parse_qs(body) + self.assertEqual(data["token"], ["different-token"]) + + def test_protected_request_override_token_post(self): + self.session.token = {"access_token": "dummy-access-token"} + response = self.session.post( + "https://slack.com/api/auth.test", data={"token": "different-token"} + ) + url = response.request.url + query = parse_qs(urlparse(url).query) + self.assertNotIn("token", query) + body = response.request.body + data = parse_qs(body) + self.assertEqual(data["token"], ["different-token"]) + + def test_protected_request_override_token_url(self): + self.session.token = {"access_token": "dummy-access-token"} + response = self.session.get( + "https://slack.com/api/auth.test?token=different-token" + ) + url = response.request.url + query = parse_qs(urlparse(url).query) + self.assertEqual(query["token"], ["different-token"]) + self.assertIsNone(response.request.body) + + +class InstagramComplianceFixTest(TestCase): + def setUp(self): + mocker = requests_mock.Mocker() + mocker.request( + method="GET", + url="https://api.instagram.com/v1/users/self", + json={ + "data": { + "id": "1574083", + "username": "snoopdogg", + "full_name": "Snoop Dogg", + "profile_picture": "http://distillery.s3.amazonaws.com/profiles/profile_1574083_75sq_1295469061.jpg", + "bio": "This is my bio", + "website": "http://snoopdogg.com", + "is_business": False, + "counts": {"media": 1320, "follows": 420, "followed_by": 3410}, + } + }, + ) + mocker.start() + self.addCleanup(mocker.stop) + + instagram = OAuth2Session("someclientid", redirect_uri="https://i.b") + self.session = instagram_compliance_fix(instagram) + + def test_protected_request(self): + self.session.token = {"access_token": "dummy-access-token"} + response = self.session.get("https://api.instagram.com/v1/users/self") + url = response.request.url + query = parse_qs(urlparse(url).query) + self.assertIn("access_token", query) + self.assertEqual(query["access_token"], ["dummy-access-token"]) + + def test_protected_request_dont_override(self): + """check that if the access_token param + already exist we don't override it""" + self.session.token = {"access_token": "dummy-access-token"} + response = self.session.get( + "https://api.instagram.com/v1/users/self?access_token=correct-access-token" + ) + url = response.request.url + query = parse_qs(urlparse(url).query) + self.assertIn("access_token", query) + self.assertEqual(query["access_token"], ["correct-access-token"]) + + +class PlentymarketsComplianceFixTest(TestCase): + def setUp(self): + mocker = requests_mock.Mocker() + mocker.post( + "https://shop.plentymarkets-cloud02.com", + json={ + "accessToken": "ecUN1r8KhJewMCdLAmpHOdZ4O0ofXKB9zf6CXK61", + "tokenType": "Bearer", + "expiresIn": 86400, + "refreshToken": "iG2kBGIjcXaRE4xmTVUnv7xwxX7XMcWCHqJmFaSX", + }, + headers={"Content-Type": "application/json"}, + ) + mocker.start() + self.addCleanup(mocker.stop) + + plentymarkets = OAuth2Session("someclientid", redirect_uri="https://i.b") + self.session = plentymarkets_compliance_fix(plentymarkets) + + def test_fetch_access_token(self): + token = self.session.fetch_token( + "https://shop.plentymarkets-cloud02.com", + authorization_response="https://i.b/?code=hello", + ) + + approx_expires_at = time.time() + 86400 + actual_expires_at = token.pop("expires_at") + self.assertAlmostEqual(actual_expires_at, approx_expires_at, places=2) + + self.assertEqual( + token, + { + "access_token": "ecUN1r8KhJewMCdLAmpHOdZ4O0ofXKB9zf6CXK61", + "expires_in": 86400, + "token_type": "Bearer", + "refresh_token": "iG2kBGIjcXaRE4xmTVUnv7xwxX7XMcWCHqJmFaSX", + }, + ) + + +class EbayComplianceFixTest(TestCase): + def setUp(self): + mocker = requests_mock.Mocker() + mocker.post( + "https://api.ebay.com/identity/v1/oauth2/token", + json={ + "access_token": "this is the access token", + "expires_in": 7200, + "token_type": "Application Access Token", + }, + headers={"Content-Type": "application/json"}, + ) + mocker.start() + self.addCleanup(mocker.stop) + + session = OAuth2Session() + self.fixed_session = ebay_compliance_fix(session) + + def test_fetch_access_token(self): + token = self.fixed_session.fetch_token( + "https://api.ebay.com/identity/v1/oauth2/token", + authorization_response="https://i.b/?code=hello", + ) + assert token["token_type"] == "Bearer" diff --git a/contrib/python/requests-oauthlib/tests/test_core.py b/contrib/python/requests-oauthlib/tests/test_core.py new file mode 100644 index 00000000000..6892e9f1ce8 --- /dev/null +++ b/contrib/python/requests-oauthlib/tests/test_core.py @@ -0,0 +1,170 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals +import requests +import requests_oauthlib +import oauthlib +import os.path +from io import StringIO +import unittest + +try: + import mock +except ImportError: + from unittest import mock + + +@mock.patch("oauthlib.oauth1.rfc5849.generate_timestamp") +@mock.patch("oauthlib.oauth1.rfc5849.generate_nonce") +class OAuth1Test(unittest.TestCase): + def testFormEncoded(self, generate_nonce, generate_timestamp): + """OAuth1 assumes form encoded if content type is not specified.""" + generate_nonce.return_value = "abc" + generate_timestamp.return_value = "1" + oauth = requests_oauthlib.OAuth1("client_key") + headers = {"Content-type": "application/x-www-form-urlencoded"} + r = requests.Request( + method="POST", + url="http://a.b/path?query=retain", + auth=oauth, + data="this=really&is=&+form=encoded", + headers=headers, + ) + a = r.prepare() + + self.assertEqual(a.url, "http://a.b/path?query=retain") + self.assertEqual(a.body, b"this=really&is=&+form=encoded") + self.assertEqual( + a.headers.get("Content-Type"), b"application/x-www-form-urlencoded" + ) + + # guess content-type + r = requests.Request( + method="POST", + url="http://a.b/path?query=retain", + auth=oauth, + data="this=really&is=&+form=encoded", + ) + b = r.prepare() + self.assertEqual(b.url, "http://a.b/path?query=retain") + self.assertEqual(b.body, b"this=really&is=&+form=encoded") + self.assertEqual( + b.headers.get("Content-Type"), b"application/x-www-form-urlencoded" + ) + + self.assertEqual(a.headers.get("Authorization"), b.headers.get("Authorization")) + + def testNonFormEncoded(self, generate_nonce, generate_timestamp): + """OAuth signature only depend on body if it is form encoded.""" + generate_nonce.return_value = "abc" + generate_timestamp.return_value = "1" + oauth = requests_oauthlib.OAuth1("client_key") + + r = requests.Request( + method="POST", + url="http://a.b/path?query=retain", + auth=oauth, + data="this really is not form encoded", + ) + a = r.prepare() + + r = requests.Request( + method="POST", url="http://a.b/path?query=retain", auth=oauth + ) + b = r.prepare() + + self.assertEqual(a.headers.get("Authorization"), b.headers.get("Authorization")) + + r = requests.Request( + method="POST", + url="http://a.b/path?query=retain", + auth=oauth, + files={"test": StringIO("hello")}, + ) + c = r.prepare() + + self.assertEqual(b.headers.get("Authorization"), c.headers.get("Authorization")) + + @unittest.skip("test uses real http://httpbin.org") + def testCanPostBinaryData(self, generate_nonce, generate_timestamp): + """ + Test we can post binary data. Should prevent regression of the + UnicodeDecodeError issue. + """ + generate_nonce.return_value = "abc" + generate_timestamp.return_value = "1" + oauth = requests_oauthlib.OAuth1("client_key") + import yatest.common + dirname = yatest.common.test_source_path() + fname = os.path.join(dirname, "test.bin") + + with open(fname, "rb") as f: + r = requests.post( + "http://httpbin.org/post", + data={"hi": "there"}, + files={"media": (os.path.basename(f.name), f)}, + headers={"content-type": "application/octet-stream"}, + auth=oauth, + ) + self.assertEqual(r.status_code, 200) + + @unittest.skip("test uses real http://httpbin.org") + def test_url_is_native_str(self, generate_nonce, generate_timestamp): + """ + Test that the URL is always a native string. + """ + generate_nonce.return_value = "abc" + generate_timestamp.return_value = "1" + oauth = requests_oauthlib.OAuth1("client_key") + + r = requests.get("http://httpbin.org/get", auth=oauth) + self.assertIsInstance(r.request.url, str) + + @unittest.skip("test uses real http://httpbin.org") + def test_content_type_override(self, generate_nonce, generate_timestamp): + """ + Content type should only be guessed if none is given. + """ + generate_nonce.return_value = "abc" + generate_timestamp.return_value = "1" + oauth = requests_oauthlib.OAuth1("client_key") + data = "a" + r = requests.post("http://httpbin.org/get", data=data, auth=oauth) + self.assertEqual( + r.request.headers.get("Content-Type"), b"application/x-www-form-urlencoded" + ) + r = requests.post( + "http://httpbin.org/get", + auth=oauth, + data=data, + headers={"Content-type": "application/json"}, + ) + self.assertEqual(r.request.headers.get("Content-Type"), b"application/json") + + def test_register_client_class(self, generate_timestamp, generate_nonce): + class ClientSubclass(oauthlib.oauth1.Client): + pass + + self.assertTrue(hasattr(requests_oauthlib.OAuth1, "client_class")) + + self.assertEqual(requests_oauthlib.OAuth1.client_class, oauthlib.oauth1.Client) + + normal = requests_oauthlib.OAuth1("client_key") + + self.assertIsInstance(normal.client, oauthlib.oauth1.Client) + self.assertNotIsInstance(normal.client, ClientSubclass) + + requests_oauthlib.OAuth1.client_class = ClientSubclass + + self.assertEqual(requests_oauthlib.OAuth1.client_class, ClientSubclass) + + custom = requests_oauthlib.OAuth1("client_key") + + self.assertIsInstance(custom.client, oauthlib.oauth1.Client) + self.assertIsInstance(custom.client, ClientSubclass) + + overridden = requests_oauthlib.OAuth1( + "client_key", client_class=oauthlib.oauth1.Client + ) + + self.assertIsInstance(overridden.client, oauthlib.oauth1.Client) + self.assertNotIsInstance(normal.client, ClientSubclass) diff --git a/contrib/python/requests-oauthlib/tests/test_oauth1_session.py b/contrib/python/requests-oauthlib/tests/test_oauth1_session.py new file mode 100644 index 00000000000..1dd2b2f1586 --- /dev/null +++ b/contrib/python/requests-oauthlib/tests/test_oauth1_session.py @@ -0,0 +1,348 @@ +from __future__ import unicode_literals, print_function +import unittest +import sys +import requests +from io import StringIO + +from oauthlib.oauth1 import SIGNATURE_TYPE_QUERY, SIGNATURE_TYPE_BODY +from oauthlib.oauth1 import SIGNATURE_RSA, SIGNATURE_PLAINTEXT +from requests_oauthlib import OAuth1Session + +try: + import mock +except ImportError: + from unittest import mock + +try: + import cryptography +except ImportError: + cryptography = None + +try: + import jwt +except ImportError: + jwt = None + +if sys.version[0] == "3": + unicode_type = str +else: + unicode_type = unicode + + +TEST_RSA_KEY = ( + "-----BEGIN RSA PRIVATE KEY-----\n" + "MIIEogIBAAKCAQEApF1JaMSN8TEsh4N4O/5SpEAVLivJyLH+Cgl3OQBPGgJkt8cg\n" + "49oasl+5iJS+VdrILxWM9/JCJyURpUuslX4Eb4eUBtQ0x5BaPa8+S2NLdGTaL7nB\n" + "OO8o8n0C5FEUU+qlEip79KE8aqOj+OC44VsIquSmOvWIQD26n3fCVlgwoRBD1gzz\n" + "sDOeaSyzpKrZR851Kh6rEmF2qjJ8jt6EkxMsRNACmBomzgA4M1TTsisSUO87444p\n" + "e35Z4/n5c735o2fZMrGgMwiJNh7rT8SYxtIkxngioiGnwkxGQxQ4NzPAHg+XSY0J\n" + "04pNm7KqTkgtxyrqOANJLIjXlR+U9SQ90NjHVQIDAQABAoIBABuBPOKaWcJt3yzC\n" + "NGGduoif7KtwSnEaUA+v69KPGa2Zju8uFHPssKD+4dZYRc2qMeunKJLpaGaSjnRh\n" + "yHyvvOBJCN1nr3lhz6gY5kzJTfwpUFXCOPJlGy4Q+2Xnp4YvcvYqQ9n5DVovDiZ8\n" + "vJOBn16xqpudMPLHIa7D5LJ8SY76HBjE+imTXw1EShdh5TOV9bmPFQqH6JFzowRH\n" + "hyH2DPHuyHJj6cl8FyqJw5lVWzG3n6Prvk7bYHsjmGjurN35UsumNAp6VouNyUP1\n" + "RAEcUJega49aIs6/FJ0ENJzQjlsAzVbTleHkpez2aIok+wsWJGJ4SVxAjADOWAaZ\n" + "uEJPc3UCgYEA1g4ZGrXOuo75p9/MRIepXGpBWxip4V7B9XmO9WzPCv8nMorJntWB\n" + "msYV1I01aITxadHatO4Gl2xLniNkDyrEQzJ7w38RQgsVK+CqbnC0K9N77QPbHeC1\n" + "YQd9RCNyUohOimKvb7jyv798FBU1GO5QI2eNgfnnfteSVXhD2iOoTOsCgYEAxJJ+\n" + "8toxJdnLa0uUsAbql6zeNXGbUBMzu3FomKlyuWuq841jS2kIalaO/TRj5hbnE45j\n" + "mCjeLgTVO6Ach3Wfk4zrqajqfFJ0zUg/Wexp49lC3RWiV4icBb85Q6bzeJD9Dn9v\n" + "hjpfWVkczf/NeA1fGH/pcgfkT6Dm706GFFttLL8CgYBl/HeXk1H47xAiHO4dJKnb\n" + "v0B+X8To/RXamF01r+8BpUoOubOQetdyX7ic+d6deuHu8i6LD/GSCeYJZYFR/KVg\n" + "AtiW757QYalnq3ZogkhFrVCZP8IRfTPOFBxp752TlyAcrSI7T9pQ47IBe4094KXM\n" + "CJWSfPgAJkOxd0iU0XJpmwKBgGfQxuMTgSlwYRKFlD1zKap5TdID8fbUbVnth0Q5\n" + "GbH7vwlp/qrxCdS/aj0n0irOpbOaW9ccnlrHiqY25VpVMLYIkt3DrDOEiNNx+KNR\n" + "TItdTwbcSiTYrS4L0/56ydM/H6bsfsXxRjI18hSJqMZiqXqS84OZz2aOn+h7HCzc\n" + "LEiZAoGASk20wFvilpRKHq79xxFWiDUPHi0x0pp82dYIEntGQkKUWkbSlhgf3MAi\n" + "5NEQTDmXdnB+rVeWIvEi+BXfdnNgdn8eC4zSdtF4sIAhYr5VWZo0WVWDhT7u2ccv\n" + "ZBFymiz8lo3gN57wGUCi9pbZqzV1+ZppX6YTNDdDCE0q+KO3Cec=\n" + "-----END RSA PRIVATE KEY-----" +) + +TEST_RSA_OAUTH_SIGNATURE = ( + "j8WF8PGjojT82aUDd2EL%2Bz7HCoHInFzWUpiEKMCy%2BJ2cYHWcBS7mXlmFDLgAKV0" + "P%2FyX4TrpXODYnJ6dRWdfghqwDpi%2FlQmB2jxCiGMdJoYxh3c5zDf26gEbGdP6D7O" + "Ssp5HUnzH6sNkmVjuE%2FxoJcHJdc23H6GhOs7VJ2LWNdbhKWP%2FMMlTrcoQDn8lz" + "%2Fb24WsJ6ae1txkUzpFOOlLM8aTdNtGL4OtsubOlRhNqnAFq93FyhXg0KjzUyIZzmMX" + "9Vx90jTks5QeBGYcLE0Op2iHb2u%2FO%2BEgdwFchgEwE5LgMUyHUI4F3Wglp28yHOAM" + "jPkI%2FkWMvpxtMrU3Z3KN31WQ%3D%3D" +) + + +class OAuth1SessionTest(unittest.TestCase): + def test_signature_types(self): + def verify_signature(getter): + def fake_send(r, **kwargs): + signature = getter(r) + if isinstance(signature, bytes): + signature = signature.decode("utf-8") + self.assertIn("oauth_signature", signature) + resp = mock.MagicMock(spec=requests.Response) + resp.cookies = [] + return resp + + return fake_send + + header = OAuth1Session("foo") + header.send = verify_signature(lambda r: r.headers["Authorization"]) + header.post("https://i.b") + + query = OAuth1Session("foo", signature_type=SIGNATURE_TYPE_QUERY) + query.send = verify_signature(lambda r: r.url) + query.post("https://i.b") + + body = OAuth1Session("foo", signature_type=SIGNATURE_TYPE_BODY) + headers = {"Content-Type": "application/x-www-form-urlencoded"} + body.send = verify_signature(lambda r: r.body) + body.post("https://i.b", headers=headers, data="") + + @mock.patch("oauthlib.oauth1.rfc5849.generate_timestamp") + @mock.patch("oauthlib.oauth1.rfc5849.generate_nonce") + def test_signature_methods(self, generate_nonce, generate_timestamp): + if not cryptography: + raise unittest.SkipTest("cryptography module is required") + if not jwt: + raise unittest.SkipTest("pyjwt module is required") + + generate_nonce.return_value = "abc" + generate_timestamp.return_value = "123" + + signature = 'OAuth oauth_nonce="abc", oauth_timestamp="123", oauth_version="1.0", oauth_signature_method="HMAC-SHA1", oauth_consumer_key="foo", oauth_signature="h2sRqLArjhlc5p3FTkuNogVHlKE%3D"' + auth = OAuth1Session("foo") + auth.send = self.verify_signature(signature) + auth.post("https://i.b") + + signature = 'OAuth oauth_nonce="abc", oauth_timestamp="123", oauth_version="1.0", oauth_signature_method="PLAINTEXT", oauth_consumer_key="foo", oauth_signature="%26"' + auth = OAuth1Session("foo", signature_method=SIGNATURE_PLAINTEXT) + auth.send = self.verify_signature(signature) + auth.post("https://i.b") + + signature = ( + "OAuth " + 'oauth_nonce="abc", oauth_timestamp="123", oauth_version="1.0", ' + 'oauth_signature_method="RSA-SHA1", oauth_consumer_key="foo", ' + 'oauth_signature="{sig}"' + ).format(sig=TEST_RSA_OAUTH_SIGNATURE) + auth = OAuth1Session( + "foo", signature_method=SIGNATURE_RSA, rsa_key=TEST_RSA_KEY + ) + auth.send = self.verify_signature(signature) + auth.post("https://i.b") + + @mock.patch("oauthlib.oauth1.rfc5849.generate_timestamp") + @mock.patch("oauthlib.oauth1.rfc5849.generate_nonce") + def test_binary_upload(self, generate_nonce, generate_timestamp): + generate_nonce.return_value = "abc" + generate_timestamp.return_value = "123" + fake_xml = StringIO("hello world") + headers = {"Content-Type": "application/xml"} + signature = 'OAuth oauth_nonce="abc", oauth_timestamp="123", oauth_version="1.0", oauth_signature_method="HMAC-SHA1", oauth_consumer_key="foo", oauth_signature="h2sRqLArjhlc5p3FTkuNogVHlKE%3D"' + auth = OAuth1Session("foo") + auth.send = self.verify_signature(signature) + auth.post("https://i.b", headers=headers, files=[("fake", fake_xml)]) + + @mock.patch("oauthlib.oauth1.rfc5849.generate_timestamp") + @mock.patch("oauthlib.oauth1.rfc5849.generate_nonce") + def test_nonascii(self, generate_nonce, generate_timestamp): + generate_nonce.return_value = "abc" + generate_timestamp.return_value = "123" + signature = 'OAuth oauth_nonce="abc", oauth_timestamp="123", oauth_version="1.0", oauth_signature_method="HMAC-SHA1", oauth_consumer_key="foo", oauth_signature="W0haoue5IZAZoaJiYCtfqwMf8x8%3D"' + auth = OAuth1Session("foo") + auth.send = self.verify_signature(signature) + auth.post("https://i.b?cjk=%E5%95%A6%E5%95%A6") + + def test_authorization_url(self): + auth = OAuth1Session("foo") + url = "https://example.comm/authorize" + token = "asluif023sf" + auth_url = auth.authorization_url(url, request_token=token) + self.assertEqual(auth_url, url + "?oauth_token=" + token) + + def test_parse_response_url(self): + url = "https://i.b/callback?oauth_token=foo&oauth_verifier=bar" + auth = OAuth1Session("foo") + resp = auth.parse_authorization_response(url) + self.assertEqual(resp["oauth_token"], "foo") + self.assertEqual(resp["oauth_verifier"], "bar") + for k, v in resp.items(): + self.assertIsInstance(k, unicode_type) + self.assertIsInstance(v, unicode_type) + + def test_fetch_request_token(self): + auth = OAuth1Session("foo") + auth.send = self.fake_body("oauth_token=foo") + resp = auth.fetch_request_token("https://example.com/token") + self.assertEqual(resp["oauth_token"], "foo") + for k, v in resp.items(): + self.assertIsInstance(k, unicode_type) + self.assertIsInstance(v, unicode_type) + + def test_fetch_request_token_with_optional_arguments(self): + auth = OAuth1Session("foo") + auth.send = self.fake_body("oauth_token=foo") + resp = auth.fetch_request_token( + "https://example.com/token", verify=False, stream=True + ) + self.assertEqual(resp["oauth_token"], "foo") + for k, v in resp.items(): + self.assertIsInstance(k, unicode_type) + self.assertIsInstance(v, unicode_type) + + def test_fetch_access_token(self): + auth = OAuth1Session("foo", verifier="bar") + auth.send = self.fake_body("oauth_token=foo") + resp = auth.fetch_access_token("https://example.com/token") + self.assertEqual(resp["oauth_token"], "foo") + for k, v in resp.items(): + self.assertIsInstance(k, unicode_type) + self.assertIsInstance(v, unicode_type) + + def test_fetch_access_token_with_optional_arguments(self): + auth = OAuth1Session("foo", verifier="bar") + auth.send = self.fake_body("oauth_token=foo") + resp = auth.fetch_access_token( + "https://example.com/token", verify=False, stream=True + ) + self.assertEqual(resp["oauth_token"], "foo") + for k, v in resp.items(): + self.assertIsInstance(k, unicode_type) + self.assertIsInstance(v, unicode_type) + + def _test_fetch_access_token_raises_error(self, auth): + """Assert that an error is being raised whenever there's no verifier + passed in to the client. + """ + auth.send = self.fake_body("oauth_token=foo") + with self.assertRaises(ValueError) as cm: + auth.fetch_access_token("https://example.com/token") + self.assertEqual("No client verifier has been set.", str(cm.exception)) + + def test_fetch_token_invalid_response(self): + auth = OAuth1Session("foo") + auth.send = self.fake_body("not valid urlencoded response!") + self.assertRaises( + ValueError, auth.fetch_request_token, "https://example.com/token" + ) + + for code in (400, 401, 403): + auth.send = self.fake_body("valid=response", code) + with self.assertRaises(ValueError) as cm: + auth.fetch_request_token("https://example.com/token") + self.assertEqual(cm.exception.status_code, code) + self.assertIsInstance(cm.exception.response, requests.Response) + + def test_fetch_access_token_missing_verifier(self): + self._test_fetch_access_token_raises_error(OAuth1Session("foo")) + + def test_fetch_access_token_has_verifier_is_none(self): + auth = OAuth1Session("foo") + del auth._client.client.verifier + self._test_fetch_access_token_raises_error(auth) + + def test_token_proxy_set(self): + token = { + "oauth_token": "fake-key", + "oauth_token_secret": "fake-secret", + "oauth_verifier": "fake-verifier", + } + sess = OAuth1Session("foo") + self.assertIsNone(sess._client.client.resource_owner_key) + self.assertIsNone(sess._client.client.resource_owner_secret) + self.assertIsNone(sess._client.client.verifier) + self.assertEqual(sess.token, {}) + + sess.token = token + self.assertEqual(sess._client.client.resource_owner_key, "fake-key") + self.assertEqual(sess._client.client.resource_owner_secret, "fake-secret") + self.assertEqual(sess._client.client.verifier, "fake-verifier") + + def test_token_proxy_get(self): + token = { + "oauth_token": "fake-key", + "oauth_token_secret": "fake-secret", + "oauth_verifier": "fake-verifier", + } + sess = OAuth1Session( + "foo", + resource_owner_key=token["oauth_token"], + resource_owner_secret=token["oauth_token_secret"], + verifier=token["oauth_verifier"], + ) + self.assertEqual(sess.token, token) + + sess._client.client.resource_owner_key = "different-key" + token["oauth_token"] = "different-key" + + self.assertEqual(sess.token, token) + + def test_authorized_false(self): + sess = OAuth1Session("foo") + self.assertIs(sess.authorized, False) + + def test_authorized_false_rsa(self): + signature = ( + "OAuth " + 'oauth_nonce="abc", oauth_timestamp="123", oauth_version="1.0", ' + 'oauth_signature_method="RSA-SHA1", oauth_consumer_key="foo", ' + 'oauth_signature="{sig}"' + ).format(sig=TEST_RSA_OAUTH_SIGNATURE) + sess = OAuth1Session( + "foo", signature_method=SIGNATURE_RSA, rsa_key=TEST_RSA_KEY + ) + sess.send = self.verify_signature(signature) + self.assertIs(sess.authorized, False) + + def test_authorized_true(self): + sess = OAuth1Session("key", "secret", verifier="bar") + sess.send = self.fake_body("oauth_token=foo&oauth_token_secret=bar") + sess.fetch_access_token("https://example.com/token") + self.assertIs(sess.authorized, True) + + @mock.patch("oauthlib.oauth1.rfc5849.generate_timestamp") + @mock.patch("oauthlib.oauth1.rfc5849.generate_nonce") + def test_authorized_true_rsa(self, generate_nonce, generate_timestamp): + if not cryptography: + raise unittest.SkipTest("cryptography module is required") + if not jwt: + raise unittest.SkipTest("pyjwt module is required") + + generate_nonce.return_value = "abc" + generate_timestamp.return_value = "123" + signature = ( + "OAuth " + 'oauth_nonce="abc", oauth_timestamp="123", oauth_version="1.0", ' + 'oauth_signature_method="RSA-SHA1", oauth_consumer_key="foo", ' + 'oauth_verifier="bar", oauth_signature="{sig}"' + ).format(sig=TEST_RSA_OAUTH_SIGNATURE) + sess = OAuth1Session( + "key", + "secret", + signature_method=SIGNATURE_RSA, + rsa_key=TEST_RSA_KEY, + verifier="bar", + ) + sess.send = self.fake_body("oauth_token=foo&oauth_token_secret=bar") + sess.fetch_access_token("https://example.com/token") + self.assertIs(sess.authorized, True) + + def verify_signature(self, signature): + def fake_send(r, **kwargs): + auth_header = r.headers["Authorization"] + if isinstance(auth_header, bytes): + auth_header = auth_header.decode("utf-8") + self.assertEqual(auth_header, signature) + resp = mock.MagicMock(spec=requests.Response) + resp.cookies = [] + return resp + + return fake_send + + def fake_body(self, body, status_code=200): + def fake_send(r, **kwargs): + resp = mock.MagicMock(spec=requests.Response) + resp.cookies = [] + resp.text = body + resp.status_code = status_code + return resp + + return fake_send diff --git a/contrib/python/requests-oauthlib/tests/test_oauth2_auth.py b/contrib/python/requests-oauthlib/tests/test_oauth2_auth.py new file mode 100644 index 00000000000..accb561ef6c --- /dev/null +++ b/contrib/python/requests-oauthlib/tests/test_oauth2_auth.py @@ -0,0 +1,54 @@ +from __future__ import unicode_literals +import unittest + +from oauthlib.oauth2 import WebApplicationClient, MobileApplicationClient +from oauthlib.oauth2 import LegacyApplicationClient, BackendApplicationClient +from requests import Request +from requests_oauthlib import OAuth2 + + +class OAuth2AuthTest(unittest.TestCase): + def setUp(self): + self.token = { + "token_type": "Bearer", + "access_token": "asdfoiw37850234lkjsdfsdf", + "expires_in": "3600", + } + self.client_id = "foo" + self.clients = [ + WebApplicationClient(self.client_id), + MobileApplicationClient(self.client_id), + LegacyApplicationClient(self.client_id), + BackendApplicationClient(self.client_id), + ] + + def test_add_token_to_url(self): + url = "https://example.com/resource?foo=bar" + new_url = url + "&access_token=" + self.token["access_token"] + for client in self.clients: + client.default_token_placement = "query" + auth = OAuth2(client=client, token=self.token) + r = Request("GET", url, auth=auth).prepare() + self.assertEqual(r.url, new_url) + + def test_add_token_to_headers(self): + token = "Bearer " + self.token["access_token"] + for client in self.clients: + auth = OAuth2(client=client, token=self.token) + r = Request("GET", "https://i.b", auth=auth).prepare() + self.assertEqual(r.headers["Authorization"], token) + + def test_add_token_to_body(self): + body = "foo=bar" + new_body = body + "&access_token=" + self.token["access_token"] + for client in self.clients: + client.default_token_placement = "body" + auth = OAuth2(client=client, token=self.token) + r = Request("GET", "https://i.b", data=body, auth=auth).prepare() + self.assertEqual(r.body, new_body) + + def test_add_nonexisting_token(self): + for client in self.clients: + auth = OAuth2(client=client) + r = Request("GET", "https://i.b", auth=auth) + self.assertRaises(ValueError, r.prepare) diff --git a/contrib/python/requests-oauthlib/tests/test_oauth2_session.py b/contrib/python/requests-oauthlib/tests/test_oauth2_session.py new file mode 100644 index 00000000000..cfc6236855c --- /dev/null +++ b/contrib/python/requests-oauthlib/tests/test_oauth2_session.py @@ -0,0 +1,527 @@ +from __future__ import unicode_literals +import json +import time +import tempfile +import shutil +import os +from base64 import b64encode +from copy import deepcopy +from unittest import TestCase + +try: + import mock +except ImportError: + from unittest import mock + +from oauthlib.common import urlencode +from oauthlib.oauth2 import TokenExpiredError, OAuth2Error +from oauthlib.oauth2 import MismatchingStateError +from oauthlib.oauth2 import WebApplicationClient, MobileApplicationClient +from oauthlib.oauth2 import LegacyApplicationClient, BackendApplicationClient +from requests_oauthlib import OAuth2Session, TokenUpdated +import requests + +from requests.auth import _basic_auth_str + + +fake_time = time.time() +CODE = "asdf345xdf" + + +def fake_token(token): + def fake_send(r, **kwargs): + resp = mock.MagicMock() + resp.text = json.dumps(token) + return resp + + return fake_send + + +class OAuth2SessionTest(TestCase): + def setUp(self): + self.token = { + "token_type": "Bearer", + "access_token": "asdfoiw37850234lkjsdfsdf", + "refresh_token": "sldvafkjw34509s8dfsdf", + "expires_in": 3600, + "expires_at": fake_time + 3600, + } + # use someclientid:someclientsecret to easily differentiate between client and user credentials + # these are the values used in oauthlib tests + self.client_id = "someclientid" + self.client_secret = "someclientsecret" + self.user_username = "user_username" + self.user_password = "user_password" + self.client_WebApplication = WebApplicationClient(self.client_id, code=CODE) + self.client_LegacyApplication = LegacyApplicationClient(self.client_id) + self.client_BackendApplication = BackendApplicationClient(self.client_id) + self.client_MobileApplication = MobileApplicationClient(self.client_id) + self.clients = [ + self.client_WebApplication, + self.client_LegacyApplication, + self.client_BackendApplication, + ] + self.all_clients = self.clients + [self.client_MobileApplication] + + def test_add_token(self): + token = "Bearer " + self.token["access_token"] + + def verifier(r, **kwargs): + auth_header = r.headers.get(str("Authorization"), None) + self.assertEqual(auth_header, token) + resp = mock.MagicMock() + resp.cookes = [] + return resp + + for client in self.all_clients: + sess = OAuth2Session(client=client, token=self.token) + sess.send = verifier + sess.get("https://i.b") + + def test_mtls(self): + cert = ( + "testsomething.example-client.pem", + "testsomething.example-client-key.pem", + ) + + def verifier(r, **kwargs): + self.assertIn("cert", kwargs) + self.assertEqual(cert, kwargs["cert"]) + self.assertIn("client_id=" + self.client_id, r.body) + resp = mock.MagicMock() + resp.text = json.dumps(self.token) + return resp + + for client in self.clients: + sess = OAuth2Session(client=client) + sess.send = verifier + + if isinstance(client, LegacyApplicationClient): + sess.fetch_token( + "https://i.b", + include_client_id=True, + cert=cert, + username="username1", + password="password1", + ) + else: + sess.fetch_token("https://i.b", include_client_id=True, cert=cert) + + def test_authorization_url(self): + url = "https://example.com/authorize?foo=bar" + + web = WebApplicationClient(self.client_id) + s = OAuth2Session(client=web) + auth_url, state = s.authorization_url(url) + self.assertIn(state, auth_url) + self.assertIn(self.client_id, auth_url) + self.assertIn("response_type=code", auth_url) + + mobile = MobileApplicationClient(self.client_id) + s = OAuth2Session(client=mobile) + auth_url, state = s.authorization_url(url) + self.assertIn(state, auth_url) + self.assertIn(self.client_id, auth_url) + self.assertIn("response_type=token", auth_url) + + @mock.patch("time.time", new=lambda: fake_time) + def test_refresh_token_request(self): + self.expired_token = dict(self.token) + self.expired_token["expires_in"] = "-1" + del self.expired_token["expires_at"] + + def fake_refresh(r, **kwargs): + if "/refresh" in r.url: + self.assertNotIn("Authorization", r.headers) + resp = mock.MagicMock() + resp.text = json.dumps(self.token) + return resp + + # No auto refresh setup + for client in self.clients: + sess = OAuth2Session(client=client, token=self.expired_token) + self.assertRaises(TokenExpiredError, sess.get, "https://i.b") + + # Auto refresh but no auto update + for client in self.clients: + sess = OAuth2Session( + client=client, + token=self.expired_token, + auto_refresh_url="https://i.b/refresh", + ) + sess.send = fake_refresh + self.assertRaises(TokenUpdated, sess.get, "https://i.b") + + # Auto refresh and auto update + def token_updater(token): + self.assertEqual(token, self.token) + + for client in self.clients: + sess = OAuth2Session( + client=client, + token=self.expired_token, + auto_refresh_url="https://i.b/refresh", + token_updater=token_updater, + ) + sess.send = fake_refresh + sess.get("https://i.b") + + def fake_refresh_with_auth(r, **kwargs): + if "/refresh" in r.url: + self.assertIn("Authorization", r.headers) + encoded = b64encode( + "{client_id}:{client_secret}".format( + client_id=self.client_id, client_secret=self.client_secret + ).encode("latin1") + ) + content = "Basic {encoded}".format(encoded=encoded.decode("latin1")) + self.assertEqual(r.headers["Authorization"], content) + resp = mock.MagicMock() + resp.text = json.dumps(self.token) + return resp + + for client in self.clients: + sess = OAuth2Session( + client=client, + token=self.expired_token, + auto_refresh_url="https://i.b/refresh", + token_updater=token_updater, + ) + sess.send = fake_refresh_with_auth + sess.get( + "https://i.b", + client_id=self.client_id, + client_secret=self.client_secret, + ) + + @mock.patch("time.time", new=lambda: fake_time) + def test_token_from_fragment(self): + mobile = MobileApplicationClient(self.client_id) + response_url = "https://i.b/callback#" + urlencode(self.token.items()) + sess = OAuth2Session(client=mobile) + self.assertEqual(sess.token_from_fragment(response_url), self.token) + + @mock.patch("time.time", new=lambda: fake_time) + def test_fetch_token(self): + url = "https://example.com/token" + + for client in self.clients: + sess = OAuth2Session(client=client, token=self.token) + sess.send = fake_token(self.token) + if isinstance(client, LegacyApplicationClient): + # this client requires a username+password + # if unset, an error will be raised + self.assertRaises(ValueError, sess.fetch_token, url) + self.assertRaises( + ValueError, sess.fetch_token, url, username="username1" + ) + self.assertRaises( + ValueError, sess.fetch_token, url, password="password1" + ) + # otherwise it will pass + self.assertEqual( + sess.fetch_token(url, username="username1", password="password1"), + self.token, + ) + else: + self.assertEqual(sess.fetch_token(url), self.token) + + error = {"error": "invalid_request"} + for client in self.clients: + sess = OAuth2Session(client=client, token=self.token) + sess.send = fake_token(error) + if isinstance(client, LegacyApplicationClient): + # this client requires a username+password + # if unset, an error will be raised + self.assertRaises(ValueError, sess.fetch_token, url) + self.assertRaises( + ValueError, sess.fetch_token, url, username="username1" + ) + self.assertRaises( + ValueError, sess.fetch_token, url, password="password1" + ) + # otherwise it will pass + self.assertRaises( + OAuth2Error, + sess.fetch_token, + url, + username="username1", + password="password1", + ) + else: + self.assertRaises(OAuth2Error, sess.fetch_token, url) + + # there are different scenarios in which the `client_id` can be specified + # reference `oauthlib.tests.oauth2.rfc6749.clients.test_web_application.WebApplicationClientTest.test_prepare_request_body` + # this only needs to test WebApplicationClient + client = self.client_WebApplication + client.tester = True + + # this should be a tuple of (r.url, r.body, r.headers.get('Authorization')) + _fetch_history = [] + + def fake_token_history(token): + def fake_send(r, **kwargs): + resp = mock.MagicMock() + resp.text = json.dumps(token) + _fetch_history.append( + (r.url, r.body, r.headers.get("Authorization", None)) + ) + return resp + + return fake_send + + sess = OAuth2Session(client=client, token=self.token) + sess.send = fake_token_history(self.token) + expected_auth_header = _basic_auth_str(self.client_id, self.client_secret) + + # scenario 1 - default request + # this should send the `client_id` in the headers, as that is recommended by the RFC + self.assertEqual( + sess.fetch_token(url, client_secret="someclientsecret"), self.token + ) + self.assertEqual(len(_fetch_history), 1) + self.assertNotIn( + "client_id", _fetch_history[0][1] + ) # no `client_id` in the body + self.assertNotIn( + "client_secret", _fetch_history[0][1] + ) # no `client_secret` in the body + self.assertEqual( + _fetch_history[0][2], expected_auth_header + ) # ensure a Basic Authorization header + + # scenario 2 - force the `client_id` into the body + self.assertEqual( + sess.fetch_token( + url, client_secret="someclientsecret", include_client_id=True + ), + self.token, + ) + self.assertEqual(len(_fetch_history), 2) + self.assertIn("client_id=%s" % self.client_id, _fetch_history[1][1]) + self.assertIn("client_secret=%s" % self.client_secret, _fetch_history[1][1]) + self.assertEqual( + _fetch_history[1][2], None + ) # ensure NO Basic Authorization header + + # scenario 3 - send in an auth object + auth = requests.auth.HTTPBasicAuth(self.client_id, self.client_secret) + self.assertEqual(sess.fetch_token(url, auth=auth), self.token) + self.assertEqual(len(_fetch_history), 3) + self.assertNotIn( + "client_id", _fetch_history[2][1] + ) # no `client_id` in the body + self.assertNotIn( + "client_secret", _fetch_history[2][1] + ) # no `client_secret` in the body + self.assertEqual( + _fetch_history[2][2], expected_auth_header + ) # ensure a Basic Authorization header + + # scenario 4 - send in a username/password combo + # this should send the `client_id` in the headers, like scenario 1 + self.assertEqual( + sess.fetch_token( + url, username=self.user_username, password=self.user_password + ), + self.token, + ) + self.assertEqual(len(_fetch_history), 4) + self.assertNotIn( + "client_id", _fetch_history[3][1] + ) # no `client_id` in the body + self.assertNotIn( + "client_secret", _fetch_history[3][1] + ) # no `client_secret` in the body + self.assertEqual( + _fetch_history[0][2], expected_auth_header + ) # ensure a Basic Authorization header + self.assertIn("username=%s" % self.user_username, _fetch_history[3][1]) + self.assertIn("password=%s" % self.user_password, _fetch_history[3][1]) + + # scenario 5 - send data in `params` and not in `data` for providers + # that expect data in URL + self.assertEqual( + sess.fetch_token(url, client_secret="somesecret", force_querystring=True), + self.token, + ) + self.assertIn("code=%s" % CODE, _fetch_history[4][0]) + + # some quick tests for valid ways of supporting `client_secret` + + # scenario 2b - force the `client_id` into the body; but the `client_secret` is `None` + self.assertEqual( + sess.fetch_token(url, client_secret=None, include_client_id=True), + self.token, + ) + self.assertEqual(len(_fetch_history), 6) + self.assertIn("client_id=%s" % self.client_id, _fetch_history[5][1]) + self.assertNotIn( + "client_secret=", _fetch_history[5][1] + ) # no `client_secret` in the body + self.assertEqual( + _fetch_history[5][2], None + ) # ensure NO Basic Authorization header + + # scenario 2c - force the `client_id` into the body; but the `client_secret` is an empty string + self.assertEqual( + sess.fetch_token(url, client_secret="", include_client_id=True), self.token + ) + self.assertEqual(len(_fetch_history), 7) + self.assertIn("client_id=%s" % self.client_id, _fetch_history[6][1]) + self.assertIn("client_secret=", _fetch_history[6][1]) + self.assertEqual( + _fetch_history[6][2], None + ) # ensure NO Basic Authorization header + + def test_cleans_previous_token_before_fetching_new_one(self): + """Makes sure the previous token is cleaned before fetching a new one. + + The reason behind it is that, if the previous token is expired, this + method shouldn't fail with a TokenExpiredError, since it's attempting + to get a new one (which shouldn't be expired). + + """ + new_token = deepcopy(self.token) + past = time.time() - 7200 + now = time.time() + self.token["expires_at"] = past + new_token["expires_at"] = now + 3600 + url = "https://example.com/token" + + with mock.patch("time.time", lambda: now): + for client in self.clients: + sess = OAuth2Session(client=client, token=self.token) + sess.send = fake_token(new_token) + if isinstance(client, LegacyApplicationClient): + # this client requires a username+password + # if unset, an error will be raised + self.assertRaises(ValueError, sess.fetch_token, url) + self.assertRaises( + ValueError, sess.fetch_token, url, username="username1" + ) + self.assertRaises( + ValueError, sess.fetch_token, url, password="password1" + ) + # otherwise it will pass + self.assertEqual( + sess.fetch_token( + url, username="username1", password="password1" + ), + new_token, + ) + else: + self.assertEqual(sess.fetch_token(url), new_token) + + def test_web_app_fetch_token(self): + # Ensure the state parameter is used, see issue #105. + client = OAuth2Session("someclientid", state="somestate") + self.assertRaises( + MismatchingStateError, + client.fetch_token, + "https://i.b/token", + authorization_response="https://i.b/no-state?code=abc", + ) + + def test_client_id_proxy(self): + sess = OAuth2Session("test-id") + self.assertEqual(sess.client_id, "test-id") + sess.client_id = "different-id" + self.assertEqual(sess.client_id, "different-id") + sess._client.client_id = "something-else" + self.assertEqual(sess.client_id, "something-else") + del sess.client_id + self.assertIsNone(sess.client_id) + + def test_access_token_proxy(self): + sess = OAuth2Session("test-id") + self.assertIsNone(sess.access_token) + sess.access_token = "test-token" + self.assertEqual(sess.access_token, "test-token") + sess._client.access_token = "different-token" + self.assertEqual(sess.access_token, "different-token") + del sess.access_token + self.assertIsNone(sess.access_token) + + def test_token_proxy(self): + token = {"access_token": "test-access"} + sess = OAuth2Session("test-id", token=token) + self.assertEqual(sess.access_token, "test-access") + self.assertEqual(sess.token, token) + token["access_token"] = "something-else" + sess.token = token + self.assertEqual(sess.access_token, "something-else") + self.assertEqual(sess.token, token) + sess._client.access_token = "different-token" + token["access_token"] = "different-token" + self.assertEqual(sess.access_token, "different-token") + self.assertEqual(sess.token, token) + # can't delete token attribute + with self.assertRaises(AttributeError): + del sess.token + + def test_authorized_false(self): + sess = OAuth2Session("someclientid") + self.assertFalse(sess.authorized) + + @mock.patch("time.time", new=lambda: fake_time) + def test_authorized_true(self): + def fake_token(token): + def fake_send(r, **kwargs): + resp = mock.MagicMock() + resp.text = json.dumps(token) + return resp + + return fake_send + + url = "https://example.com/token" + + for client in self.clients: + sess = OAuth2Session(client=client) + sess.send = fake_token(self.token) + self.assertFalse(sess.authorized) + if isinstance(client, LegacyApplicationClient): + # this client requires a username+password + # if unset, an error will be raised + self.assertRaises(ValueError, sess.fetch_token, url) + self.assertRaises( + ValueError, sess.fetch_token, url, username="username1" + ) + self.assertRaises( + ValueError, sess.fetch_token, url, password="password1" + ) + # otherwise it will pass + sess.fetch_token(url, username="username1", password="password1") + else: + sess.fetch_token(url) + self.assertTrue(sess.authorized) + + +class OAuth2SessionNetrcTest(OAuth2SessionTest): + """Ensure that there is no magic auth handling. + + By default, requests sessions have magic handling of netrc files, + which is undesirable for this library because it will take + precedence over manually set authentication headers. + """ + + def setUp(self): + # Set up a temporary home directory + self.homedir = tempfile.mkdtemp() + self.prehome = os.environ.get("HOME", None) + os.environ["HOME"] = self.homedir + + # Write a .netrc file that will cause problems + netrc_loc = os.path.expanduser("~/.netrc") + with open(netrc_loc, "w") as f: + f.write("machine i.b\n" " password abc123\n" " login spam@eggs.co\n") + + super(OAuth2SessionNetrcTest, self).setUp() + + def tearDown(self): + super(OAuth2SessionNetrcTest, self).tearDown() + + if self.prehome is not None: + os.environ["HOME"] = self.prehome + shutil.rmtree(self.homedir) diff --git a/contrib/python/requests-oauthlib/tests/ya.make b/contrib/python/requests-oauthlib/tests/ya.make new file mode 100644 index 00000000000..a8f7328ae79 --- /dev/null +++ b/contrib/python/requests-oauthlib/tests/ya.make @@ -0,0 +1,28 @@ +PY3TEST() + +PEERDIR( + contrib/python/requests-oauthlib + contrib/python/requests-mock +) + +# These tests use real http://httpbin.org that is why they are disabled: +# testCanPostBinaryData +# test_url_is_native_str +# test_content_type_override + +TEST_SRCS( + __init__.py + test_compliance_fixes.py + test_core.py + test_oauth1_session.py + test_oauth2_auth.py + test_oauth2_session.py +) + +DATA( + arcadia/contrib/python/requests-oauthlib/tests +) + +NO_LINT() + +END() |