diff options
author | alexv-smirnov <alex@ydb.tech> | 2023-12-01 12:02:50 +0300 |
---|---|---|
committer | alexv-smirnov <alex@ydb.tech> | 2023-12-01 13:28:10 +0300 |
commit | 0e578a4c44d4abd539d9838347b9ebafaca41dfb (patch) | |
tree | a0c1969c37f818c830ebeff9c077eacf30be6ef8 /contrib/python/websocket-client/py3/websocket/tests | |
parent | 84f2d3d4cc985e63217cff149bd2e6d67ae6fe22 (diff) | |
download | ydb-0e578a4c44d4abd539d9838347b9ebafaca41dfb.tar.gz |
Change "ya.make"
Diffstat (limited to 'contrib/python/websocket-client/py3/websocket/tests')
10 files changed, 1476 insertions, 0 deletions
diff --git a/contrib/python/websocket-client/py3/websocket/tests/__init__.py b/contrib/python/websocket-client/py3/websocket/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/contrib/python/websocket-client/py3/websocket/tests/__init__.py diff --git a/contrib/python/websocket-client/py3/websocket/tests/data/header01.txt b/contrib/python/websocket-client/py3/websocket/tests/data/header01.txt new file mode 100644 index 0000000000..d44d24c205 --- /dev/null +++ b/contrib/python/websocket-client/py3/websocket/tests/data/header01.txt @@ -0,0 +1,6 @@ +HTTP/1.1 101 WebSocket Protocol Handshake +Connection: Upgrade +Upgrade: WebSocket +Sec-WebSocket-Accept: Kxep+hNu9n51529fGidYu7a3wO0= +some_header: something + diff --git a/contrib/python/websocket-client/py3/websocket/tests/data/header02.txt b/contrib/python/websocket-client/py3/websocket/tests/data/header02.txt new file mode 100644 index 0000000000..f481de928a --- /dev/null +++ b/contrib/python/websocket-client/py3/websocket/tests/data/header02.txt @@ -0,0 +1,6 @@ +HTTP/1.1 101 WebSocket Protocol Handshake +Connection: Upgrade +Upgrade WebSocket +Sec-WebSocket-Accept: Kxep+hNu9n51529fGidYu7a3wO0= +some_header: something + diff --git a/contrib/python/websocket-client/py3/websocket/tests/data/header03.txt b/contrib/python/websocket-client/py3/websocket/tests/data/header03.txt new file mode 100644 index 0000000000..1a81dc70ce --- /dev/null +++ b/contrib/python/websocket-client/py3/websocket/tests/data/header03.txt @@ -0,0 +1,8 @@ +HTTP/1.1 101 WebSocket Protocol Handshake +Connection: Upgrade, Keep-Alive +Upgrade: WebSocket +Sec-WebSocket-Accept: Kxep+hNu9n51529fGidYu7a3wO0= +Set-Cookie: Token=ABCDE +Set-Cookie: Token=FGHIJ +some_header: something + diff --git a/contrib/python/websocket-client/py3/websocket/tests/test_abnf.py b/contrib/python/websocket-client/py3/websocket/tests/test_abnf.py new file mode 100644 index 0000000000..dbf9b636a3 --- /dev/null +++ b/contrib/python/websocket-client/py3/websocket/tests/test_abnf.py @@ -0,0 +1,89 @@ +# -*- coding: utf-8 -*- +# +import websocket as ws +from websocket._abnf import * +import unittest + +""" +test_abnf.py +websocket - WebSocket client library for Python + +Copyright 2023 engn33r + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + + +class ABNFTest(unittest.TestCase): + + def testInit(self): + a = ABNF(0,0,0,0, opcode=ABNF.OPCODE_PING) + self.assertEqual(a.fin, 0) + self.assertEqual(a.rsv1, 0) + self.assertEqual(a.rsv2, 0) + self.assertEqual(a.rsv3, 0) + self.assertEqual(a.opcode, 9) + self.assertEqual(a.data, '') + a_bad = ABNF(0,1,0,0, opcode=77) + self.assertEqual(a_bad.rsv1, 1) + self.assertEqual(a_bad.opcode, 77) + + def testValidate(self): + a_invalid_ping = ABNF(0,0,0,0, opcode=ABNF.OPCODE_PING) + self.assertRaises(ws._exceptions.WebSocketProtocolException, a_invalid_ping.validate, skip_utf8_validation=False) + a_bad_rsv_value = ABNF(0,1,0,0, opcode=ABNF.OPCODE_TEXT) + self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_rsv_value.validate, skip_utf8_validation=False) + a_bad_opcode = ABNF(0,0,0,0, opcode=77) + self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_opcode.validate, skip_utf8_validation=False) + a_bad_close_frame = ABNF(0,0,0,0, opcode=ABNF.OPCODE_CLOSE, data=b'\x01') + self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_close_frame.validate, skip_utf8_validation=False) + a_bad_close_frame_2 = ABNF(0,0,0,0, opcode=ABNF.OPCODE_CLOSE, data=b'\x01\x8a\xaa\xff\xdd') + self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_close_frame_2.validate, skip_utf8_validation=False) + a_bad_close_frame_3 = ABNF(0,0,0,0, opcode=ABNF.OPCODE_CLOSE, data=b'\x03\xe7') + self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_close_frame_3.validate, skip_utf8_validation=True) + + def testMask(self): + abnf_none_data = ABNF(0,0,0,0, opcode=ABNF.OPCODE_PING, mask=1, data=None) + bytes_val = b"aaaa" + self.assertEqual(abnf_none_data._get_masked(bytes_val), bytes_val) + abnf_str_data = ABNF(0,0,0,0, opcode=ABNF.OPCODE_PING, mask=1, data="a") + self.assertEqual(abnf_str_data._get_masked(bytes_val), b'aaaa\x00') + + def testFormat(self): + abnf_bad_rsv_bits = ABNF(2,0,0,0, opcode=ABNF.OPCODE_TEXT) + self.assertRaises(ValueError, abnf_bad_rsv_bits.format) + abnf_bad_opcode = ABNF(0,0,0,0, opcode=5) + self.assertRaises(ValueError, abnf_bad_opcode.format) + abnf_length_10 = ABNF(0,0,0,0, opcode=ABNF.OPCODE_TEXT, data="abcdefghij") + self.assertEqual(b'\x01', abnf_length_10.format()[0].to_bytes(1, 'big')) + self.assertEqual(b'\x8a', abnf_length_10.format()[1].to_bytes(1, 'big')) + self.assertEqual("fin=0 opcode=1 data=abcdefghij", abnf_length_10.__str__()) + abnf_length_20 = ABNF(0,0,0,0, opcode=ABNF.OPCODE_BINARY, data="abcdefghijabcdefghij") + self.assertEqual(b'\x02', abnf_length_20.format()[0].to_bytes(1, 'big')) + self.assertEqual(b'\x94', abnf_length_20.format()[1].to_bytes(1, 'big')) + abnf_no_mask = ABNF(0,0,0,0, opcode=ABNF.OPCODE_TEXT, mask=0, data=b'\x01\x8a\xcc') + self.assertEqual(b'\x01\x03\x01\x8a\xcc', abnf_no_mask.format()) + + def testFrameBuffer(self): + fb = frame_buffer(0, True) + self.assertEqual(fb.recv, 0) + self.assertEqual(fb.skip_utf8_validation, True) + fb.clear + self.assertEqual(fb.header, None) + self.assertEqual(fb.length, None) + self.assertEqual(fb.mask, None) + self.assertEqual(fb.has_mask(), False) + + +if __name__ == "__main__": + unittest.main() diff --git a/contrib/python/websocket-client/py3/websocket/tests/test_app.py b/contrib/python/websocket-client/py3/websocket/tests/test_app.py new file mode 100644 index 0000000000..ff90a0aa87 --- /dev/null +++ b/contrib/python/websocket-client/py3/websocket/tests/test_app.py @@ -0,0 +1,299 @@ +# -*- coding: utf-8 -*- +# +import os +import os.path +import threading +import websocket as ws +import ssl +import unittest + +""" +test_app.py +websocket - WebSocket client library for Python + +Copyright 2023 engn33r + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +# Skip test to access the internet unless TEST_WITH_INTERNET == 1 +TEST_WITH_INTERNET = os.environ.get('TEST_WITH_INTERNET', '0') == '1' +# Skip tests relying on local websockets server unless LOCAL_WS_SERVER_PORT != -1 +LOCAL_WS_SERVER_PORT = os.environ.get('LOCAL_WS_SERVER_PORT', '-1') +TEST_WITH_LOCAL_SERVER = LOCAL_WS_SERVER_PORT != '-1' +TRACEABLE = True + + +class WebSocketAppTest(unittest.TestCase): + + class NotSetYet: + """ A marker class for signalling that a value hasn't been set yet. + """ + + def setUp(self): + ws.enableTrace(TRACEABLE) + + WebSocketAppTest.keep_running_open = WebSocketAppTest.NotSetYet() + WebSocketAppTest.keep_running_close = WebSocketAppTest.NotSetYet() + WebSocketAppTest.get_mask_key_id = WebSocketAppTest.NotSetYet() + WebSocketAppTest.on_error_data = WebSocketAppTest.NotSetYet() + + def tearDown(self): + WebSocketAppTest.keep_running_open = WebSocketAppTest.NotSetYet() + WebSocketAppTest.keep_running_close = WebSocketAppTest.NotSetYet() + WebSocketAppTest.get_mask_key_id = WebSocketAppTest.NotSetYet() + WebSocketAppTest.on_error_data = WebSocketAppTest.NotSetYet() + + @unittest.skipUnless(TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled") + def testKeepRunning(self): + """ A WebSocketApp should keep running as long as its self.keep_running + is not False (in the boolean context). + """ + + def on_open(self, *args, **kwargs): + """ Set the keep_running flag for later inspection and immediately + close the connection. + """ + self.send("hello!") + WebSocketAppTest.keep_running_open = self.keep_running + self.keep_running = False + + def on_message(wsapp, message): + print(message) + self.close() + + def on_close(self, *args, **kwargs): + """ Set the keep_running flag for the test to use. + """ + WebSocketAppTest.keep_running_close = self.keep_running + + app = ws.WebSocketApp('ws://127.0.0.1:' + LOCAL_WS_SERVER_PORT, on_open=on_open, on_close=on_close, on_message=on_message) + app.run_forever() + +# @unittest.skipUnless(TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled") + @unittest.skipUnless(False, "Test disabled for now (requires rel)") + def testRunForeverDispatcher(self): + """ A WebSocketApp should keep running as long as its self.keep_running + is not False (in the boolean context). + """ + + def on_open(self, *args, **kwargs): + """ Send a message, receive, and send one more + """ + self.send("hello!") + self.recv() + self.send("goodbye!") + + def on_message(wsapp, message): + print(message) + self.close() + + app = ws.WebSocketApp('ws://127.0.0.1:' + LOCAL_WS_SERVER_PORT, on_open=on_open, on_message=on_message) + app.run_forever(dispatcher="Dispatcher") # doesn't work +# app.run_forever(dispatcher=rel) # would work +# rel.dispatch() + + @unittest.skipUnless(TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled") + def testRunForeverTeardownCleanExit(self): + """ The WebSocketApp.run_forever() method should return `False` when the application ends gracefully. + """ + app = ws.WebSocketApp('ws://127.0.0.1:' + LOCAL_WS_SERVER_PORT) + threading.Timer(interval=0.2, function=app.close).start() + teardown = app.run_forever() + self.assertEqual(teardown, False) + + @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") + def testSockMaskKey(self): + """ A WebSocketApp should forward the received mask_key function down + to the actual socket. + """ + + def my_mask_key_func(): + return "\x00\x00\x00\x00" + + app = ws.WebSocketApp('wss://api-pub.bitfinex.com/ws/1', get_mask_key=my_mask_key_func) + + # if numpy is installed, this assertion fail + # Note: We can't use 'is' for comparing the functions directly, need to use 'id'. + self.assertEqual(id(app.get_mask_key), id(my_mask_key_func)) + + @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") + def testInvalidPingIntervalPingTimeout(self): + """ Test exception handling if ping_interval < ping_timeout + """ + + def on_ping(app, msg): + print("Got a ping!") + app.close() + + def on_pong(app, msg): + print("Got a pong! No need to respond") + app.close() + + app = ws.WebSocketApp('wss://api-pub.bitfinex.com/ws/1', on_ping=on_ping, on_pong=on_pong) + self.assertRaises(ws.WebSocketException, app.run_forever, ping_interval=1, ping_timeout=2, sslopt={"cert_reqs": ssl.CERT_NONE}) + + @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") + def testPingInterval(self): + """ Test WebSocketApp proper ping functionality + """ + + def on_ping(app, msg): + print("Got a ping!") + app.close() + + def on_pong(app, msg): + print("Got a pong! No need to respond") + app.close() + + app = ws.WebSocketApp('wss://api-pub.bitfinex.com/ws/1', on_ping=on_ping, on_pong=on_pong) + app.run_forever(ping_interval=2, ping_timeout=1, sslopt={"cert_reqs": ssl.CERT_NONE}) + + @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") + def testOpcodeClose(self): + """ Test WebSocketApp close opcode + """ + + app = ws.WebSocketApp('wss://tsock.us1.twilio.com/v3/wsconnect') + app.run_forever(ping_interval=2, ping_timeout=1, ping_payload="Ping payload") + + # This is commented out because the URL no longer responds in the expected way + # @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") + # def testOpcodeBinary(self): + # """ Test WebSocketApp binary opcode + # """ + # app = ws.WebSocketApp('wss://streaming.vn.teslamotors.com/streaming/') + # app.run_forever(ping_interval=2, ping_timeout=1, ping_payload="Ping payload") + + @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") + def testBadPingInterval(self): + """ A WebSocketApp handling of negative ping_interval + """ + app = ws.WebSocketApp('wss://api-pub.bitfinex.com/ws/1') + self.assertRaises(ws.WebSocketException, app.run_forever, ping_interval=-5, sslopt={"cert_reqs": ssl.CERT_NONE}) + + @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") + def testBadPingTimeout(self): + """ A WebSocketApp handling of negative ping_timeout + """ + app = ws.WebSocketApp('wss://api-pub.bitfinex.com/ws/1') + self.assertRaises(ws.WebSocketException, app.run_forever, ping_timeout=-3, sslopt={"cert_reqs": ssl.CERT_NONE}) + + @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") + def testCloseStatusCode(self): + """ Test extraction of close frame status code and close reason in WebSocketApp + """ + def on_close(wsapp, close_status_code, close_msg): + print("on_close reached") + + app = ws.WebSocketApp('wss://tsock.us1.twilio.com/v3/wsconnect', on_close=on_close) + closeframe = ws.ABNF(opcode=ws.ABNF.OPCODE_CLOSE, data=b'\x03\xe8no-init-from-client') + self.assertEqual([1000, 'no-init-from-client'], app._get_close_args(closeframe)) + + closeframe = ws.ABNF(opcode=ws.ABNF.OPCODE_CLOSE, data=b'') + self.assertEqual([None, None], app._get_close_args(closeframe)) + + app2 = ws.WebSocketApp('wss://tsock.us1.twilio.com/v3/wsconnect') + closeframe = ws.ABNF(opcode=ws.ABNF.OPCODE_CLOSE, data=b'') + self.assertEqual([None, None], app2._get_close_args(closeframe)) + + self.assertRaises(ws.WebSocketConnectionClosedException, app.send, data="test if connection is closed") + + @unittest.skipUnless(TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled") + def testCallbackFunctionException(self): + """ Test callback function exception handling """ + + exc = None + passed_app = None + + def on_open(app): + raise RuntimeError("Callback failed") + + def on_error(app, err): + nonlocal passed_app + passed_app = app + nonlocal exc + exc = err + + def on_pong(app, msg): + app.close() + + app = ws.WebSocketApp('ws://127.0.0.1:' + LOCAL_WS_SERVER_PORT, on_open=on_open, on_error=on_error, on_pong=on_pong) + app.run_forever(ping_interval=2, ping_timeout=1) + + self.assertEqual(passed_app, app) + self.assertIsInstance(exc, RuntimeError) + self.assertEqual(str(exc), "Callback failed") + + @unittest.skipUnless(TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled") + def testCallbackMethodException(self): + """ Test callback method exception handling """ + + class Callbacks: + def __init__(self): + self.exc = None + self.passed_app = None + self.app = ws.WebSocketApp( + 'ws://127.0.0.1:' + LOCAL_WS_SERVER_PORT, + on_open=self.on_open, + on_error=self.on_error, + on_pong=self.on_pong + ) + self.app.run_forever(ping_interval=2, ping_timeout=1) + + def on_open(self, app): + raise RuntimeError("Callback failed") + + def on_error(self, app, err): + self.passed_app = app + self.exc = err + + def on_pong(self, app, msg): + app.close() + + callbacks = Callbacks() + + self.assertEqual(callbacks.passed_app, callbacks.app) + self.assertIsInstance(callbacks.exc, RuntimeError) + self.assertEqual(str(callbacks.exc), "Callback failed") + + @unittest.skipUnless(TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled") + def testReconnect(self): + """ Test reconnect """ + pong_count = 0 + exc = None + + def on_error(app, err): + nonlocal exc + exc = err + + def on_pong(app, msg): + nonlocal pong_count + pong_count += 1 + if pong_count == 1: + # First pong, shutdown socket, enforce read error + app.sock.shutdown() + if pong_count >= 2: + # Got second pong after reconnect + app.close() + + app = ws.WebSocketApp('ws://127.0.0.1:' + LOCAL_WS_SERVER_PORT, on_pong=on_pong, on_error=on_error) + app.run_forever(ping_interval=2, ping_timeout=1, reconnect=3) + + self.assertEqual(pong_count, 2) + self.assertIsInstance(exc, ws.WebSocketTimeoutException) + self.assertEqual(str(exc), "ping/pong timed out") + + +if __name__ == "__main__": + unittest.main() diff --git a/contrib/python/websocket-client/py3/websocket/tests/test_cookiejar.py b/contrib/python/websocket-client/py3/websocket/tests/test_cookiejar.py new file mode 100644 index 0000000000..8f835e9e7c --- /dev/null +++ b/contrib/python/websocket-client/py3/websocket/tests/test_cookiejar.py @@ -0,0 +1,116 @@ +import unittest +from websocket._cookiejar import SimpleCookieJar + +""" +test_cookiejar.py +websocket - WebSocket client library for Python + +Copyright 2023 engn33r + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + + +class CookieJarTest(unittest.TestCase): + def testAdd(self): + cookie_jar = SimpleCookieJar() + cookie_jar.add("") + self.assertFalse(cookie_jar.jar, "Cookie with no domain should not be added to the jar") + + cookie_jar = SimpleCookieJar() + cookie_jar.add("a=b") + self.assertFalse(cookie_jar.jar, "Cookie with no domain should not be added to the jar") + + cookie_jar = SimpleCookieJar() + cookie_jar.add("a=b; domain=.abc") + self.assertTrue(".abc" in cookie_jar.jar) + + cookie_jar = SimpleCookieJar() + cookie_jar.add("a=b; domain=abc") + self.assertTrue(".abc" in cookie_jar.jar) + self.assertTrue("abc" not in cookie_jar.jar) + + cookie_jar = SimpleCookieJar() + cookie_jar.add("a=b; c=d; domain=abc") + self.assertEqual(cookie_jar.get("abc"), "a=b; c=d") + self.assertEqual(cookie_jar.get(None), "") + + cookie_jar = SimpleCookieJar() + cookie_jar.add("a=b; c=d; domain=abc") + cookie_jar.add("e=f; domain=abc") + self.assertEqual(cookie_jar.get("abc"), "a=b; c=d; e=f") + + cookie_jar = SimpleCookieJar() + cookie_jar.add("a=b; c=d; domain=abc") + cookie_jar.add("e=f; domain=.abc") + self.assertEqual(cookie_jar.get("abc"), "a=b; c=d; e=f") + + cookie_jar = SimpleCookieJar() + cookie_jar.add("a=b; c=d; domain=abc") + cookie_jar.add("e=f; domain=xyz") + self.assertEqual(cookie_jar.get("abc"), "a=b; c=d") + self.assertEqual(cookie_jar.get("xyz"), "e=f") + self.assertEqual(cookie_jar.get("something"), "") + + def testSet(self): + cookie_jar = SimpleCookieJar() + cookie_jar.set("a=b") + self.assertFalse(cookie_jar.jar, "Cookie with no domain should not be added to the jar") + + cookie_jar = SimpleCookieJar() + cookie_jar.set("a=b; domain=.abc") + self.assertTrue(".abc" in cookie_jar.jar) + + cookie_jar = SimpleCookieJar() + cookie_jar.set("a=b; domain=abc") + self.assertTrue(".abc" in cookie_jar.jar) + self.assertTrue("abc" not in cookie_jar.jar) + + cookie_jar = SimpleCookieJar() + cookie_jar.set("a=b; c=d; domain=abc") + self.assertEqual(cookie_jar.get("abc"), "a=b; c=d") + + cookie_jar = SimpleCookieJar() + cookie_jar.set("a=b; c=d; domain=abc") + cookie_jar.set("e=f; domain=abc") + self.assertEqual(cookie_jar.get("abc"), "e=f") + + cookie_jar = SimpleCookieJar() + cookie_jar.set("a=b; c=d; domain=abc") + cookie_jar.set("e=f; domain=.abc") + self.assertEqual(cookie_jar.get("abc"), "e=f") + + cookie_jar = SimpleCookieJar() + cookie_jar.set("a=b; c=d; domain=abc") + cookie_jar.set("e=f; domain=xyz") + self.assertEqual(cookie_jar.get("abc"), "a=b; c=d") + self.assertEqual(cookie_jar.get("xyz"), "e=f") + self.assertEqual(cookie_jar.get("something"), "") + + def testGet(self): + cookie_jar = SimpleCookieJar() + cookie_jar.set("a=b; c=d; domain=abc.com") + self.assertEqual(cookie_jar.get("abc.com"), "a=b; c=d") + self.assertEqual(cookie_jar.get("x.abc.com"), "a=b; c=d") + self.assertEqual(cookie_jar.get("abc.com.es"), "") + self.assertEqual(cookie_jar.get("xabc.com"), "") + + cookie_jar.set("a=b; c=d; domain=.abc.com") + self.assertEqual(cookie_jar.get("abc.com"), "a=b; c=d") + self.assertEqual(cookie_jar.get("x.abc.com"), "a=b; c=d") + self.assertEqual(cookie_jar.get("abc.com.es"), "") + self.assertEqual(cookie_jar.get("xabc.com"), "") + + +if __name__ == "__main__": + unittest.main() diff --git a/contrib/python/websocket-client/py3/websocket/tests/test_http.py b/contrib/python/websocket-client/py3/websocket/tests/test_http.py new file mode 100644 index 0000000000..456279f288 --- /dev/null +++ b/contrib/python/websocket-client/py3/websocket/tests/test_http.py @@ -0,0 +1,177 @@ +# -*- coding: utf-8 -*- +# +import os +import os.path +import websocket as ws +from websocket._http import proxy_info, read_headers, _start_proxied_socket, _tunnel, _get_addrinfo_list, connect +import unittest +import ssl +import websocket +import socket + +""" +test_http.py +websocket - WebSocket client library for Python + +Copyright 2023 engn33r + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +try: + from python_socks._errors import ProxyError, ProxyTimeoutError, ProxyConnectionError +except: + from websocket._http import ProxyError, ProxyTimeoutError, ProxyConnectionError + +# Skip test to access the internet unless TEST_WITH_INTERNET == 1 +TEST_WITH_INTERNET = os.environ.get('TEST_WITH_INTERNET', '0') == '1' +TEST_WITH_PROXY = os.environ.get('TEST_WITH_PROXY', '0') == '1' +# Skip tests relying on local websockets server unless LOCAL_WS_SERVER_PORT != -1 +LOCAL_WS_SERVER_PORT = os.environ.get('LOCAL_WS_SERVER_PORT', '-1') +TEST_WITH_LOCAL_SERVER = LOCAL_WS_SERVER_PORT != '-1' + + +class SockMock: + def __init__(self): + self.data = [] + self.sent = [] + + def add_packet(self, data): + self.data.append(data) + + def gettimeout(self): + return None + + def recv(self, bufsize): + if self.data: + e = self.data.pop(0) + if isinstance(e, Exception): + raise e + if len(e) > bufsize: + self.data.insert(0, e[bufsize:]) + return e[:bufsize] + + def send(self, data): + self.sent.append(data) + return len(data) + + def close(self): + pass + + +class HeaderSockMock(SockMock): + + def __init__(self, fname): + SockMock.__init__(self) + import yatest.common + path = yatest.common.source_path(os.path.join('contrib/python/websocket-client/py3/websocket/tests', fname)) + with open(path, "rb") as f: + self.add_packet(f.read()) + + +class OptsList(): + + def __init__(self): + self.timeout = 1 + self.sockopt = [] + self.sslopt = {"cert_reqs": ssl.CERT_NONE} + + +class HttpTest(unittest.TestCase): + + def testReadHeader(self): + status, header, status_message = read_headers(HeaderSockMock("data/header01.txt")) + self.assertEqual(status, 101) + self.assertEqual(header["connection"], "Upgrade") + # header02.txt is intentionally malformed + self.assertRaises(ws.WebSocketException, read_headers, HeaderSockMock("data/header02.txt")) + + def testTunnel(self): + self.assertRaises(ws.WebSocketProxyException, _tunnel, HeaderSockMock("data/header01.txt"), "example.com", 80, ("username", "password")) + self.assertRaises(ws.WebSocketProxyException, _tunnel, HeaderSockMock("data/header02.txt"), "example.com", 80, ("username", "password")) + + @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") + def testConnect(self): + # Not currently testing an actual proxy connection, so just check whether proxy errors are raised. This requires internet for a DNS lookup + if ws._http.HAVE_PYTHON_SOCKS: + # Need this check, otherwise case where python_socks is not installed triggers + # websocket._exceptions.WebSocketException: Python Socks is needed for SOCKS proxying but is not available + self.assertRaises((ProxyTimeoutError, OSError), _start_proxied_socket, "wss://example.com", OptsList(), proxy_info(http_proxy_host="example.com", http_proxy_port="8080", proxy_type="socks4", http_proxy_timeout=1)) + self.assertRaises((ProxyTimeoutError, OSError), _start_proxied_socket, "wss://example.com", OptsList(), proxy_info(http_proxy_host="example.com", http_proxy_port="8080", proxy_type="socks4a", http_proxy_timeout=1)) + self.assertRaises((ProxyTimeoutError, OSError), _start_proxied_socket, "wss://example.com", OptsList(), proxy_info(http_proxy_host="example.com", http_proxy_port="8080", proxy_type="socks5", http_proxy_timeout=1)) + self.assertRaises((ProxyTimeoutError, OSError), _start_proxied_socket, "wss://example.com", OptsList(), proxy_info(http_proxy_host="example.com", http_proxy_port="8080", proxy_type="socks5h", http_proxy_timeout=1)) + self.assertRaises(ProxyConnectionError, connect, "wss://example.com", OptsList(), proxy_info(http_proxy_host="127.0.0.1", http_proxy_port=9999, proxy_type="socks4", http_proxy_timeout=1), None) + + self.assertRaises(TypeError, _get_addrinfo_list, None, 80, True, proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="9999", proxy_type="http")) + self.assertRaises(TypeError, _get_addrinfo_list, None, 80, True, proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="9999", proxy_type="http")) + self.assertRaises(socket.timeout, connect, "wss://google.com", OptsList(), proxy_info(http_proxy_host="8.8.8.8", http_proxy_port=9999, proxy_type="http", http_proxy_timeout=1), None) + self.assertEqual( + connect("wss://google.com", OptsList(), proxy_info(http_proxy_host="8.8.8.8", http_proxy_port=8080, proxy_type="http"), True), + (True, ("google.com", 443, "/"))) + # The following test fails on Mac OS with a gaierror, not an OverflowError + # self.assertRaises(OverflowError, connect, "wss://example.com", OptsList(), proxy_info(http_proxy_host="127.0.0.1", http_proxy_port=99999, proxy_type="socks4", timeout=2), False) + + @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") + @unittest.skipUnless(TEST_WITH_PROXY, "This test requires a HTTP proxy to be running on port 8899") + @unittest.skipUnless(TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled") + def testProxyConnect(self): + ws = websocket.WebSocket() + ws.connect("ws://127.0.0.1:" + LOCAL_WS_SERVER_PORT, http_proxy_host="127.0.0.1", http_proxy_port="8899", proxy_type="http") + ws.send("Hello, Server") + server_response = ws.recv() + self.assertEqual(server_response, "Hello, Server") + # self.assertEqual(_start_proxied_socket("wss://api.bitfinex.com/ws/2", OptsList(), proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8899", proxy_type="http"))[1], ("api.bitfinex.com", 443, '/ws/2')) + self.assertEqual(_get_addrinfo_list("api.bitfinex.com", 443, True, proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8899", proxy_type="http")), + (socket.getaddrinfo("127.0.0.1", 8899, 0, socket.SOCK_STREAM, socket.SOL_TCP), True, None)) + self.assertEqual(connect("wss://api.bitfinex.com/ws/2", OptsList(), proxy_info(http_proxy_host="127.0.0.1", http_proxy_port=8899, proxy_type="http"), None)[1], ("api.bitfinex.com", 443, '/ws/2')) + # TODO: Test SOCKS4 and SOCK5 proxies with unit tests + + @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") + def testSSLopt(self): + ssloptions = { + "check_hostname": False, + "server_hostname": "ServerName", + "ssl_version": ssl.PROTOCOL_TLS_CLIENT, + "ciphers": "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256:\ + TLS_AES_128_GCM_SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:\ + ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES256-GCM-SHA384:\ + ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:\ + DHE-RSA-CHACHA20-POLY1305:ECDHE-ECDSA-AES128-GCM-SHA256:\ + ECDHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES128-GCM-SHA256:\ + ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:\ + DHE-RSA-AES256-SHA256:ECDHE-ECDSA-AES128-SHA256:\ + ECDHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA256:\ + ECDHE-ECDSA-AES256-SHA:ECDHE-RSA-AES256-SHA", + "ecdh_curve": "prime256v1" + } + ws_ssl1 = websocket.WebSocket(sslopt=ssloptions) + ws_ssl1.connect("wss://api.bitfinex.com/ws/2") + ws_ssl1.send("Hello") + ws_ssl1.close() + + ws_ssl2 = websocket.WebSocket(sslopt={"check_hostname": True}) + ws_ssl2.connect("wss://api.bitfinex.com/ws/2") + ws_ssl2.close + + def testProxyInfo(self): + self.assertEqual(proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http").proxy_protocol, "http") + self.assertRaises(ProxyError, proxy_info, http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="badval") + self.assertEqual(proxy_info(http_proxy_host="example.com", http_proxy_port="8080", proxy_type="http").proxy_host, "example.com") + self.assertEqual(proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http").proxy_port, "8080") + self.assertEqual(proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http").auth, None) + self.assertEqual(proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http", http_proxy_auth=("my_username123", "my_pass321")).auth[0], "my_username123") + self.assertEqual(proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http", http_proxy_auth=("my_username123", "my_pass321")).auth[1], "my_pass321") + + +if __name__ == "__main__": + unittest.main() diff --git a/contrib/python/websocket-client/py3/websocket/tests/test_url.py b/contrib/python/websocket-client/py3/websocket/tests/test_url.py new file mode 100644 index 0000000000..a74dd7669d --- /dev/null +++ b/contrib/python/websocket-client/py3/websocket/tests/test_url.py @@ -0,0 +1,319 @@ +# -*- coding: utf-8 -*- +# +import os +import unittest +from websocket._url import get_proxy_info, parse_url, _is_address_in_network, _is_no_proxy_host + +""" +test_url.py +websocket - WebSocket client library for Python + +Copyright 2023 engn33r + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + + +class UrlTest(unittest.TestCase): + + def test_address_in_network(self): + self.assertTrue(_is_address_in_network('127.0.0.1', '127.0.0.0/8')) + self.assertTrue(_is_address_in_network('127.1.0.1', '127.0.0.0/8')) + self.assertFalse(_is_address_in_network('127.1.0.1', '127.0.0.0/24')) + + def testParseUrl(self): + p = parse_url("ws://www.example.com/r") + self.assertEqual(p[0], "www.example.com") + self.assertEqual(p[1], 80) + self.assertEqual(p[2], "/r") + self.assertEqual(p[3], False) + + p = parse_url("ws://www.example.com/r/") + self.assertEqual(p[0], "www.example.com") + self.assertEqual(p[1], 80) + self.assertEqual(p[2], "/r/") + self.assertEqual(p[3], False) + + p = parse_url("ws://www.example.com/") + self.assertEqual(p[0], "www.example.com") + self.assertEqual(p[1], 80) + self.assertEqual(p[2], "/") + self.assertEqual(p[3], False) + + p = parse_url("ws://www.example.com") + self.assertEqual(p[0], "www.example.com") + self.assertEqual(p[1], 80) + self.assertEqual(p[2], "/") + self.assertEqual(p[3], False) + + p = parse_url("ws://www.example.com:8080/r") + self.assertEqual(p[0], "www.example.com") + self.assertEqual(p[1], 8080) + self.assertEqual(p[2], "/r") + self.assertEqual(p[3], False) + + p = parse_url("ws://www.example.com:8080/") + self.assertEqual(p[0], "www.example.com") + self.assertEqual(p[1], 8080) + self.assertEqual(p[2], "/") + self.assertEqual(p[3], False) + + p = parse_url("ws://www.example.com:8080") + self.assertEqual(p[0], "www.example.com") + self.assertEqual(p[1], 8080) + self.assertEqual(p[2], "/") + self.assertEqual(p[3], False) + + p = parse_url("wss://www.example.com:8080/r") + self.assertEqual(p[0], "www.example.com") + self.assertEqual(p[1], 8080) + self.assertEqual(p[2], "/r") + self.assertEqual(p[3], True) + + p = parse_url("wss://www.example.com:8080/r?key=value") + self.assertEqual(p[0], "www.example.com") + self.assertEqual(p[1], 8080) + self.assertEqual(p[2], "/r?key=value") + self.assertEqual(p[3], True) + + self.assertRaises(ValueError, parse_url, "http://www.example.com/r") + + p = parse_url("ws://[2a03:4000:123:83::3]/r") + self.assertEqual(p[0], "2a03:4000:123:83::3") + self.assertEqual(p[1], 80) + self.assertEqual(p[2], "/r") + self.assertEqual(p[3], False) + + p = parse_url("ws://[2a03:4000:123:83::3]:8080/r") + self.assertEqual(p[0], "2a03:4000:123:83::3") + self.assertEqual(p[1], 8080) + self.assertEqual(p[2], "/r") + self.assertEqual(p[3], False) + + p = parse_url("wss://[2a03:4000:123:83::3]/r") + self.assertEqual(p[0], "2a03:4000:123:83::3") + self.assertEqual(p[1], 443) + self.assertEqual(p[2], "/r") + self.assertEqual(p[3], True) + + p = parse_url("wss://[2a03:4000:123:83::3]:8080/r") + self.assertEqual(p[0], "2a03:4000:123:83::3") + self.assertEqual(p[1], 8080) + self.assertEqual(p[2], "/r") + self.assertEqual(p[3], True) + + +class IsNoProxyHostTest(unittest.TestCase): + def setUp(self): + self.no_proxy = os.environ.get("no_proxy", None) + if "no_proxy" in os.environ: + del os.environ["no_proxy"] + + def tearDown(self): + if self.no_proxy: + os.environ["no_proxy"] = self.no_proxy + elif "no_proxy" in os.environ: + del os.environ["no_proxy"] + + def testMatchAll(self): + self.assertTrue(_is_no_proxy_host("any.websocket.org", ['*'])) + self.assertTrue(_is_no_proxy_host("192.168.0.1", ['*'])) + self.assertTrue(_is_no_proxy_host("any.websocket.org", ['other.websocket.org', '*'])) + os.environ['no_proxy'] = '*' + self.assertTrue(_is_no_proxy_host("any.websocket.org", None)) + self.assertTrue(_is_no_proxy_host("192.168.0.1", None)) + os.environ['no_proxy'] = 'other.websocket.org, *' + self.assertTrue(_is_no_proxy_host("any.websocket.org", None)) + + def testIpAddress(self): + self.assertTrue(_is_no_proxy_host("127.0.0.1", ['127.0.0.1'])) + self.assertFalse(_is_no_proxy_host("127.0.0.2", ['127.0.0.1'])) + self.assertTrue(_is_no_proxy_host("127.0.0.1", ['other.websocket.org', '127.0.0.1'])) + self.assertFalse(_is_no_proxy_host("127.0.0.2", ['other.websocket.org', '127.0.0.1'])) + os.environ['no_proxy'] = '127.0.0.1' + self.assertTrue(_is_no_proxy_host("127.0.0.1", None)) + self.assertFalse(_is_no_proxy_host("127.0.0.2", None)) + os.environ['no_proxy'] = 'other.websocket.org, 127.0.0.1' + self.assertTrue(_is_no_proxy_host("127.0.0.1", None)) + self.assertFalse(_is_no_proxy_host("127.0.0.2", None)) + + def testIpAddressInRange(self): + self.assertTrue(_is_no_proxy_host("127.0.0.1", ['127.0.0.0/8'])) + self.assertTrue(_is_no_proxy_host("127.0.0.2", ['127.0.0.0/8'])) + self.assertFalse(_is_no_proxy_host("127.1.0.1", ['127.0.0.0/24'])) + os.environ['no_proxy'] = '127.0.0.0/8' + self.assertTrue(_is_no_proxy_host("127.0.0.1", None)) + self.assertTrue(_is_no_proxy_host("127.0.0.2", None)) + os.environ['no_proxy'] = '127.0.0.0/24' + self.assertFalse(_is_no_proxy_host("127.1.0.1", None)) + + def testHostnameMatch(self): + self.assertTrue(_is_no_proxy_host("my.websocket.org", ['my.websocket.org'])) + self.assertTrue(_is_no_proxy_host("my.websocket.org", ['other.websocket.org', 'my.websocket.org'])) + self.assertFalse(_is_no_proxy_host("my.websocket.org", ['other.websocket.org'])) + os.environ['no_proxy'] = 'my.websocket.org' + self.assertTrue(_is_no_proxy_host("my.websocket.org", None)) + self.assertFalse(_is_no_proxy_host("other.websocket.org", None)) + os.environ['no_proxy'] = 'other.websocket.org, my.websocket.org' + self.assertTrue(_is_no_proxy_host("my.websocket.org", None)) + + def testHostnameMatchDomain(self): + self.assertTrue(_is_no_proxy_host("any.websocket.org", ['.websocket.org'])) + self.assertTrue(_is_no_proxy_host("my.other.websocket.org", ['.websocket.org'])) + self.assertTrue(_is_no_proxy_host("any.websocket.org", ['my.websocket.org', '.websocket.org'])) + self.assertFalse(_is_no_proxy_host("any.websocket.com", ['.websocket.org'])) + os.environ['no_proxy'] = '.websocket.org' + self.assertTrue(_is_no_proxy_host("any.websocket.org", None)) + self.assertTrue(_is_no_proxy_host("my.other.websocket.org", None)) + self.assertFalse(_is_no_proxy_host("any.websocket.com", None)) + os.environ['no_proxy'] = 'my.websocket.org, .websocket.org' + self.assertTrue(_is_no_proxy_host("any.websocket.org", None)) + + +class ProxyInfoTest(unittest.TestCase): + def setUp(self): + self.http_proxy = os.environ.get("http_proxy", None) + self.https_proxy = os.environ.get("https_proxy", None) + self.no_proxy = os.environ.get("no_proxy", None) + if "http_proxy" in os.environ: + del os.environ["http_proxy"] + if "https_proxy" in os.environ: + del os.environ["https_proxy"] + if "no_proxy" in os.environ: + del os.environ["no_proxy"] + + def tearDown(self): + if self.http_proxy: + os.environ["http_proxy"] = self.http_proxy + elif "http_proxy" in os.environ: + del os.environ["http_proxy"] + + if self.https_proxy: + os.environ["https_proxy"] = self.https_proxy + elif "https_proxy" in os.environ: + del os.environ["https_proxy"] + + if self.no_proxy: + os.environ["no_proxy"] = self.no_proxy + elif "no_proxy" in os.environ: + del os.environ["no_proxy"] + + def testProxyFromArgs(self): + self.assertEqual(get_proxy_info("echo.websocket.events", False, proxy_host="localhost"), ("localhost", 0, None)) + self.assertEqual(get_proxy_info("echo.websocket.events", False, proxy_host="localhost", proxy_port=3128), + ("localhost", 3128, None)) + self.assertEqual(get_proxy_info("echo.websocket.events", True, proxy_host="localhost"), ("localhost", 0, None)) + self.assertEqual(get_proxy_info("echo.websocket.events", True, proxy_host="localhost", proxy_port=3128), + ("localhost", 3128, None)) + + self.assertEqual(get_proxy_info("echo.websocket.events", False, proxy_host="localhost", proxy_auth=("a", "b")), + ("localhost", 0, ("a", "b"))) + self.assertEqual( + get_proxy_info("echo.websocket.events", False, proxy_host="localhost", proxy_port=3128, proxy_auth=("a", "b")), + ("localhost", 3128, ("a", "b"))) + self.assertEqual(get_proxy_info("echo.websocket.events", True, proxy_host="localhost", proxy_auth=("a", "b")), + ("localhost", 0, ("a", "b"))) + self.assertEqual( + get_proxy_info("echo.websocket.events", True, proxy_host="localhost", proxy_port=3128, proxy_auth=("a", "b")), + ("localhost", 3128, ("a", "b"))) + + self.assertEqual(get_proxy_info("echo.websocket.events", True, proxy_host="localhost", proxy_port=3128, + no_proxy=["example.com"], proxy_auth=("a", "b")), + ("localhost", 3128, ("a", "b"))) + self.assertEqual(get_proxy_info("echo.websocket.events", True, proxy_host="localhost", proxy_port=3128, + no_proxy=["echo.websocket.events"], proxy_auth=("a", "b")), + (None, 0, None)) + + def testProxyFromEnv(self): + os.environ["http_proxy"] = "http://localhost/" + self.assertEqual(get_proxy_info("echo.websocket.events", False), ("localhost", None, None)) + os.environ["http_proxy"] = "http://localhost:3128/" + self.assertEqual(get_proxy_info("echo.websocket.events", False), ("localhost", 3128, None)) + + os.environ["http_proxy"] = "http://localhost/" + os.environ["https_proxy"] = "http://localhost2/" + self.assertEqual(get_proxy_info("echo.websocket.events", False), ("localhost", None, None)) + os.environ["http_proxy"] = "http://localhost:3128/" + os.environ["https_proxy"] = "http://localhost2:3128/" + self.assertEqual(get_proxy_info("echo.websocket.events", False), ("localhost", 3128, None)) + + os.environ["http_proxy"] = "http://localhost/" + os.environ["https_proxy"] = "http://localhost2/" + self.assertEqual(get_proxy_info("echo.websocket.events", True), ("localhost2", None, None)) + os.environ["http_proxy"] = "http://localhost:3128/" + os.environ["https_proxy"] = "http://localhost2:3128/" + self.assertEqual(get_proxy_info("echo.websocket.events", True), ("localhost2", 3128, None)) + + os.environ["http_proxy"] = "" + os.environ["https_proxy"] = "http://localhost2/" + self.assertEqual(get_proxy_info("echo.websocket.events", True), ("localhost2", None, None)) + self.assertEqual(get_proxy_info("echo.websocket.events", False), (None, 0, None)) + os.environ["http_proxy"] = "" + os.environ["https_proxy"] = "http://localhost2:3128/" + self.assertEqual(get_proxy_info("echo.websocket.events", True), ("localhost2", 3128, None)) + self.assertEqual(get_proxy_info("echo.websocket.events", False), (None, 0, None)) + + os.environ["http_proxy"] = "http://localhost/" + os.environ["https_proxy"] = "" + self.assertEqual(get_proxy_info("echo.websocket.events", True), (None, 0, None)) + self.assertEqual(get_proxy_info("echo.websocket.events", False), ("localhost", None, None)) + os.environ["http_proxy"] = "http://localhost:3128/" + os.environ["https_proxy"] = "" + self.assertEqual(get_proxy_info("echo.websocket.events", True), (None, 0, None)) + self.assertEqual(get_proxy_info("echo.websocket.events", False), ("localhost", 3128, None)) + + os.environ["http_proxy"] = "http://a:b@localhost/" + self.assertEqual(get_proxy_info("echo.websocket.events", False), ("localhost", None, ("a", "b"))) + os.environ["http_proxy"] = "http://a:b@localhost:3128/" + self.assertEqual(get_proxy_info("echo.websocket.events", False), ("localhost", 3128, ("a", "b"))) + + os.environ["http_proxy"] = "http://a:b@localhost/" + os.environ["https_proxy"] = "http://a:b@localhost2/" + self.assertEqual(get_proxy_info("echo.websocket.events", False), ("localhost", None, ("a", "b"))) + os.environ["http_proxy"] = "http://a:b@localhost:3128/" + os.environ["https_proxy"] = "http://a:b@localhost2:3128/" + self.assertEqual(get_proxy_info("echo.websocket.events", False), ("localhost", 3128, ("a", "b"))) + + os.environ["http_proxy"] = "http://a:b@localhost/" + os.environ["https_proxy"] = "http://a:b@localhost2/" + self.assertEqual(get_proxy_info("echo.websocket.events", True), ("localhost2", None, ("a", "b"))) + os.environ["http_proxy"] = "http://a:b@localhost:3128/" + os.environ["https_proxy"] = "http://a:b@localhost2:3128/" + self.assertEqual(get_proxy_info("echo.websocket.events", True), ("localhost2", 3128, ("a", "b"))) + + os.environ["http_proxy"] = "http://john%40example.com:P%40SSWORD@localhost:3128/" + os.environ["https_proxy"] = "http://john%40example.com:P%40SSWORD@localhost2:3128/" + self.assertEqual(get_proxy_info("echo.websocket.events", True), ("localhost2", 3128, ("john@example.com", "P@SSWORD"))) + + os.environ["http_proxy"] = "http://a:b@localhost/" + os.environ["https_proxy"] = "http://a:b@localhost2/" + os.environ["no_proxy"] = "example1.com,example2.com" + self.assertEqual(get_proxy_info("example.1.com", True), ("localhost2", None, ("a", "b"))) + os.environ["http_proxy"] = "http://a:b@localhost:3128/" + os.environ["https_proxy"] = "http://a:b@localhost2:3128/" + os.environ["no_proxy"] = "example1.com,example2.com, echo.websocket.events" + self.assertEqual(get_proxy_info("echo.websocket.events", True), (None, 0, None)) + os.environ["http_proxy"] = "http://a:b@localhost:3128/" + os.environ["https_proxy"] = "http://a:b@localhost2:3128/" + os.environ["no_proxy"] = "example1.com,example2.com, .websocket.events" + self.assertEqual(get_proxy_info("echo.websocket.events", True), (None, 0, None)) + + os.environ["http_proxy"] = "http://a:b@localhost:3128/" + os.environ["https_proxy"] = "http://a:b@localhost2:3128/" + os.environ["no_proxy"] = "127.0.0.0/8, 192.168.0.0/16" + self.assertEqual(get_proxy_info("127.0.0.1", False), (None, 0, None)) + self.assertEqual(get_proxy_info("192.168.1.1", False), (None, 0, None)) + + +if __name__ == "__main__": + unittest.main() diff --git a/contrib/python/websocket-client/py3/websocket/tests/test_websocket.py b/contrib/python/websocket-client/py3/websocket/tests/test_websocket.py new file mode 100644 index 0000000000..54555c8b6c --- /dev/null +++ b/contrib/python/websocket-client/py3/websocket/tests/test_websocket.py @@ -0,0 +1,456 @@ +# -*- coding: utf-8 -*- +# +import os +import os.path +import socket +import websocket as ws +import unittest +from websocket._handshake import _create_sec_websocket_key, \ + _validate as _validate_header +from websocket._http import read_headers +from websocket._utils import validate_utf8 +from base64 import decodebytes as base64decode + +""" +test_websocket.py +websocket - WebSocket client library for Python + +Copyright 2023 engn33r + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +try: + import ssl + from ssl import SSLError +except ImportError: + # dummy class of SSLError for ssl none-support environment. + class SSLError(Exception): + pass + +# Skip test to access the internet unless TEST_WITH_INTERNET == 1 +TEST_WITH_INTERNET = os.environ.get('TEST_WITH_INTERNET', '0') == '1' +# Skip tests relying on local websockets server unless LOCAL_WS_SERVER_PORT != -1 +LOCAL_WS_SERVER_PORT = os.environ.get('LOCAL_WS_SERVER_PORT', '-1') +TEST_WITH_LOCAL_SERVER = LOCAL_WS_SERVER_PORT != '-1' +TRACEABLE = True + + +def create_mask_key(_): + return "abcd" + + +class SockMock: + def __init__(self): + self.data = [] + self.sent = [] + + def add_packet(self, data): + self.data.append(data) + + def gettimeout(self): + return None + + def recv(self, bufsize): + if self.data: + e = self.data.pop(0) + if isinstance(e, Exception): + raise e + if len(e) > bufsize: + self.data.insert(0, e[bufsize:]) + return e[:bufsize] + + def send(self, data): + self.sent.append(data) + return len(data) + + def close(self): + pass + + +class HeaderSockMock(SockMock): + + def __init__(self, fname): + SockMock.__init__(self) + import yatest.common + path = yatest.common.source_path(os.path.join('contrib/python/websocket-client/py3/websocket/tests', fname)) + with open(path, "rb") as f: + self.add_packet(f.read()) + + +class WebSocketTest(unittest.TestCase): + def setUp(self): + ws.enableTrace(TRACEABLE) + + def tearDown(self): + pass + + def testDefaultTimeout(self): + self.assertEqual(ws.getdefaulttimeout(), None) + ws.setdefaulttimeout(10) + self.assertEqual(ws.getdefaulttimeout(), 10) + ws.setdefaulttimeout(None) + + def testWSKey(self): + key = _create_sec_websocket_key() + self.assertTrue(key != 24) + self.assertTrue(str("¥n") not in key) + + def testNonce(self): + """ WebSocket key should be a random 16-byte nonce. + """ + key = _create_sec_websocket_key() + nonce = base64decode(key.encode("utf-8")) + self.assertEqual(16, len(nonce)) + + def testWsUtils(self): + key = "c6b8hTg4EeGb2gQMztV1/g==" + required_header = { + "upgrade": "websocket", + "connection": "upgrade", + "sec-websocket-accept": "Kxep+hNu9n51529fGidYu7a3wO0="} + self.assertEqual(_validate_header(required_header, key, None), (True, None)) + + header = required_header.copy() + header["upgrade"] = "http" + self.assertEqual(_validate_header(header, key, None), (False, None)) + del header["upgrade"] + self.assertEqual(_validate_header(header, key, None), (False, None)) + + header = required_header.copy() + header["connection"] = "something" + self.assertEqual(_validate_header(header, key, None), (False, None)) + del header["connection"] + self.assertEqual(_validate_header(header, key, None), (False, None)) + + header = required_header.copy() + header["sec-websocket-accept"] = "something" + self.assertEqual(_validate_header(header, key, None), (False, None)) + del header["sec-websocket-accept"] + self.assertEqual(_validate_header(header, key, None), (False, None)) + + header = required_header.copy() + header["sec-websocket-protocol"] = "sub1" + self.assertEqual(_validate_header(header, key, ["sub1", "sub2"]), (True, "sub1")) + # This case will print out a logging error using the error() function, but that is expected + self.assertEqual(_validate_header(header, key, ["sub2", "sub3"]), (False, None)) + + header = required_header.copy() + header["sec-websocket-protocol"] = "sUb1" + self.assertEqual(_validate_header(header, key, ["Sub1", "suB2"]), (True, "sub1")) + + header = required_header.copy() + # This case will print out a logging error using the error() function, but that is expected + self.assertEqual(_validate_header(header, key, ["Sub1", "suB2"]), (False, None)) + + def testReadHeader(self): + status, header, status_message = read_headers(HeaderSockMock("data/header01.txt")) + self.assertEqual(status, 101) + self.assertEqual(header["connection"], "Upgrade") + + status, header, status_message = read_headers(HeaderSockMock("data/header03.txt")) + self.assertEqual(status, 101) + self.assertEqual(header["connection"], "Upgrade, Keep-Alive") + + HeaderSockMock("data/header02.txt") + self.assertRaises(ws.WebSocketException, read_headers, HeaderSockMock("data/header02.txt")) + + def testSend(self): + # TODO: add longer frame data + sock = ws.WebSocket() + sock.set_mask_key(create_mask_key) + s = sock.sock = HeaderSockMock("data/header01.txt") + sock.send("Hello") + self.assertEqual(s.sent[0], b'\x81\x85abcd)\x07\x0f\x08\x0e') + + sock.send("こんにちは") + self.assertEqual(s.sent[1], b'\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc') + +# sock.send("x" * 5000) +# self.assertEqual(s.sent[1], b'\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc") + + self.assertEqual(sock.send_binary(b'1111111111101'), 19) + + def testRecv(self): + # TODO: add longer frame data + sock = ws.WebSocket() + s = sock.sock = SockMock() + something = b'\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc' + s.add_packet(something) + data = sock.recv() + self.assertEqual(data, "こんにちは") + + s.add_packet(b'\x81\x85abcd)\x07\x0f\x08\x0e') + data = sock.recv() + self.assertEqual(data, "Hello") + + @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") + def testIter(self): + count = 2 + s = ws.create_connection('wss://api.bitfinex.com/ws/2') + s.send('{"event": "subscribe", "channel": "ticker"}') + for _ in s: + count -= 1 + if count == 0: + break + + @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") + def testNext(self): + sock = ws.create_connection('wss://api.bitfinex.com/ws/2') + self.assertEqual(str, type(next(sock))) + + def testInternalRecvStrict(self): + sock = ws.WebSocket() + s = sock.sock = SockMock() + s.add_packet(b'foo') + s.add_packet(socket.timeout()) + s.add_packet(b'bar') + # s.add_packet(SSLError("The read operation timed out")) + s.add_packet(b'baz') + with self.assertRaises(ws.WebSocketTimeoutException): + sock.frame_buffer.recv_strict(9) + # with self.assertRaises(SSLError): + # data = sock._recv_strict(9) + data = sock.frame_buffer.recv_strict(9) + self.assertEqual(data, b'foobarbaz') + with self.assertRaises(ws.WebSocketConnectionClosedException): + sock.frame_buffer.recv_strict(1) + + def testRecvTimeout(self): + sock = ws.WebSocket() + s = sock.sock = SockMock() + s.add_packet(b'\x81') + s.add_packet(socket.timeout()) + s.add_packet(b'\x8dabcd\x29\x07\x0f\x08\x0e') + s.add_packet(socket.timeout()) + s.add_packet(b'\x4e\x43\x33\x0e\x10\x0f\x00\x40') + with self.assertRaises(ws.WebSocketTimeoutException): + sock.recv() + with self.assertRaises(ws.WebSocketTimeoutException): + sock.recv() + data = sock.recv() + self.assertEqual(data, "Hello, World!") + with self.assertRaises(ws.WebSocketConnectionClosedException): + sock.recv() + + def testRecvWithSimpleFragmentation(self): + sock = ws.WebSocket() + s = sock.sock = SockMock() + # OPCODE=TEXT, FIN=0, MSG="Brevity is " + s.add_packet(b'\x01\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C') + # OPCODE=CONT, FIN=1, MSG="the soul of wit" + s.add_packet(b'\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17') + data = sock.recv() + self.assertEqual(data, "Brevity is the soul of wit") + with self.assertRaises(ws.WebSocketConnectionClosedException): + sock.recv() + + def testRecvWithFireEventOfFragmentation(self): + sock = ws.WebSocket(fire_cont_frame=True) + s = sock.sock = SockMock() + # OPCODE=TEXT, FIN=0, MSG="Brevity is " + s.add_packet(b'\x01\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C') + # OPCODE=CONT, FIN=0, MSG="Brevity is " + s.add_packet(b'\x00\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C') + # OPCODE=CONT, FIN=1, MSG="the soul of wit" + s.add_packet(b'\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17') + + _, data = sock.recv_data() + self.assertEqual(data, b'Brevity is ') + _, data = sock.recv_data() + self.assertEqual(data, b'Brevity is ') + _, data = sock.recv_data() + self.assertEqual(data, b'the soul of wit') + + # OPCODE=CONT, FIN=0, MSG="Brevity is " + s.add_packet(b'\x80\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C') + + with self.assertRaises(ws.WebSocketException): + sock.recv_data() + + with self.assertRaises(ws.WebSocketConnectionClosedException): + sock.recv() + + def testClose(self): + sock = ws.WebSocket() + sock.connected = True + sock.close + + sock = ws.WebSocket() + s = sock.sock = SockMock() + sock.connected = True + s.add_packet(b'\x88\x80\x17\x98p\x84') + sock.recv() + self.assertEqual(sock.connected, False) + + def testRecvContFragmentation(self): + sock = ws.WebSocket() + s = sock.sock = SockMock() + # OPCODE=CONT, FIN=1, MSG="the soul of wit" + s.add_packet(b'\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17') + self.assertRaises(ws.WebSocketException, sock.recv) + + def testRecvWithProlongedFragmentation(self): + sock = ws.WebSocket() + s = sock.sock = SockMock() + # OPCODE=TEXT, FIN=0, MSG="Once more unto the breach, " + s.add_packet(b'\x01\x9babcd.\x0c\x00\x01A\x0f\x0c\x16\x04B\x16\n\x15\rC\x10\t\x07C\x06\x13\x07\x02\x07\tNC') + # OPCODE=CONT, FIN=0, MSG="dear friends, " + s.add_packet(b'\x00\x8eabcd\x05\x07\x02\x16A\x04\x11\r\x04\x0c\x07\x17MB') + # OPCODE=CONT, FIN=1, MSG="once more" + s.add_packet(b'\x80\x89abcd\x0e\x0c\x00\x01A\x0f\x0c\x16\x04') + data = sock.recv() + self.assertEqual( + data, + "Once more unto the breach, dear friends, once more") + with self.assertRaises(ws.WebSocketConnectionClosedException): + sock.recv() + + def testRecvWithFragmentationAndControlFrame(self): + sock = ws.WebSocket() + sock.set_mask_key(create_mask_key) + s = sock.sock = SockMock() + # OPCODE=TEXT, FIN=0, MSG="Too much " + s.add_packet(b'\x01\x89abcd5\r\x0cD\x0c\x17\x00\x0cA') + # OPCODE=PING, FIN=1, MSG="Please PONG this" + s.add_packet(b'\x89\x90abcd1\x0e\x06\x05\x12\x07C4.,$D\x15\n\n\x17') + # OPCODE=CONT, FIN=1, MSG="of a good thing" + s.add_packet(b'\x80\x8fabcd\x0e\x04C\x05A\x05\x0c\x0b\x05B\x17\x0c\x08\x0c\x04') + data = sock.recv() + self.assertEqual(data, "Too much of a good thing") + with self.assertRaises(ws.WebSocketConnectionClosedException): + sock.recv() + self.assertEqual( + s.sent[0], + b'\x8a\x90abcd1\x0e\x06\x05\x12\x07C4.,$D\x15\n\n\x17') + + @unittest.skipUnless(TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled") + def testWebSocket(self): + s = ws.create_connection("ws://127.0.0.1:" + LOCAL_WS_SERVER_PORT) + self.assertNotEqual(s, None) + s.send("Hello, World") + result = s.next() + s.fileno() + self.assertEqual(result, "Hello, World") + + s.send("こにゃにゃちは、世界") + result = s.recv() + self.assertEqual(result, "こにゃにゃちは、世界") + self.assertRaises(ValueError, s.send_close, -1, "") + s.close() + + @unittest.skipUnless(TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled") + def testPingPong(self): + s = ws.create_connection("ws://127.0.0.1:" + LOCAL_WS_SERVER_PORT) + self.assertNotEqual(s, None) + s.ping("Hello") + s.pong("Hi") + s.close() + + @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") + def testSupportRedirect(self): + s = ws.WebSocket() + self.assertRaises(ws._exceptions.WebSocketBadStatusException, s.connect, "ws://google.com/") + # Need to find a URL that has a redirect code leading to a websocket + + @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") + def testSecureWebSocket(self): + import ssl + s = ws.create_connection("wss://api.bitfinex.com/ws/2") + self.assertNotEqual(s, None) + self.assertTrue(isinstance(s.sock, ssl.SSLSocket)) + self.assertEqual(s.getstatus(), 101) + self.assertNotEqual(s.getheaders(), None) + s.settimeout(10) + self.assertEqual(s.gettimeout(), 10) + self.assertEqual(s.getsubprotocol(), None) + s.abort() + + @unittest.skipUnless(TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled") + def testWebSocketWithCustomHeader(self): + s = ws.create_connection("ws://127.0.0.1:" + LOCAL_WS_SERVER_PORT, + headers={"User-Agent": "PythonWebsocketClient"}) + self.assertNotEqual(s, None) + self.assertEqual(s.getsubprotocol(), None) + s.send("Hello, World") + result = s.recv() + self.assertEqual(result, "Hello, World") + self.assertRaises(ValueError, s.close, -1, "") + s.close() + + @unittest.skipUnless(TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled") + def testAfterClose(self): + s = ws.create_connection("ws://127.0.0.1:" + LOCAL_WS_SERVER_PORT) + self.assertNotEqual(s, None) + s.close() + self.assertRaises(ws.WebSocketConnectionClosedException, s.send, "Hello") + self.assertRaises(ws.WebSocketConnectionClosedException, s.recv) + + +class SockOptTest(unittest.TestCase): + @unittest.skipUnless(TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled") + def testSockOpt(self): + sockopt = ((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),) + s = ws.create_connection("ws://127.0.0.1:" + LOCAL_WS_SERVER_PORT, sockopt=sockopt) + self.assertNotEqual(s.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY), 0) + s.close() + + +class UtilsTest(unittest.TestCase): + def testUtf8Validator(self): + state = validate_utf8(b'\xf0\x90\x80\x80') + self.assertEqual(state, True) + state = validate_utf8(b'\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5\xed\xa0\x80edited') + self.assertEqual(state, False) + state = validate_utf8(b'') + self.assertEqual(state, True) + + +class HandshakeTest(unittest.TestCase): + @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") + def test_http_SSL(self): + websock1 = ws.WebSocket(sslopt={"cert_chain": ssl.get_default_verify_paths().capath}, enable_multithread=False) + self.assertRaises(ValueError, + websock1.connect, "wss://api.bitfinex.com/ws/2") + websock2 = ws.WebSocket(sslopt={"certfile": "myNonexistentCertFile"}) + self.assertRaises(FileNotFoundError, + websock2.connect, "wss://api.bitfinex.com/ws/2") + + @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") + def testManualHeaders(self): + websock3 = ws.WebSocket(sslopt={"ca_certs": ssl.get_default_verify_paths().cafile, + "ca_cert_path": ssl.get_default_verify_paths().capath}) + self.assertRaises(ws._exceptions.WebSocketBadStatusException, + websock3.connect, "wss://api.bitfinex.com/ws/2", cookie="chocolate", + origin="testing_websockets.com", + host="echo.websocket.events/websocket-client-test", + subprotocols=["testproto"], + connection="Upgrade", + header={"CustomHeader1":"123", + "Cookie":"TestValue", + "Sec-WebSocket-Key":"k9kFAUWNAMmf5OEMfTlOEA==", + "Sec-WebSocket-Protocol":"newprotocol"}) + + def testIPv6(self): + websock2 = ws.WebSocket() + self.assertRaises(ValueError, websock2.connect, "2001:4860:4860::8888") + + def testBadURLs(self): + websock3 = ws.WebSocket() + self.assertRaises(ValueError, websock3.connect, "ws//example.com") + self.assertRaises(ws.WebSocketAddressException, websock3.connect, "ws://example") + self.assertRaises(ValueError, websock3.connect, "example.com") + + +if __name__ == "__main__": + unittest.main() |