aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/websocket-client/websocket/tests
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-12-09 18:25:21 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-12-09 19:18:57 +0300
commit13374e0884578812cda7697d0c5680122db59a37 (patch)
tree30a022eb841035299deb2b8c393b2902f0c21735 /contrib/python/websocket-client/websocket/tests
parentc7ade6d3bf7cd492235a61b77153351e422a28f3 (diff)
downloadydb-13374e0884578812cda7697d0c5680122db59a37.tar.gz
Intermediate changes
commit_hash:034150f557268506d7bc0cbd8b5becf65f765593
Diffstat (limited to 'contrib/python/websocket-client/websocket/tests')
-rw-r--r--contrib/python/websocket-client/websocket/tests/__init__.py0
-rw-r--r--contrib/python/websocket-client/websocket/tests/data/header01.txt6
-rw-r--r--contrib/python/websocket-client/websocket/tests/data/header02.txt6
-rw-r--r--contrib/python/websocket-client/websocket/tests/data/header03.txt8
-rw-r--r--contrib/python/websocket-client/websocket/tests/echo-server.py23
-rw-r--r--contrib/python/websocket-client/websocket/tests/test_abnf.py125
-rw-r--r--contrib/python/websocket-client/websocket/tests/test_app.py352
-rw-r--r--contrib/python/websocket-client/websocket/tests/test_cookiejar.py123
-rw-r--r--contrib/python/websocket-client/websocket/tests/test_http.py371
-rw-r--r--contrib/python/websocket-client/websocket/tests/test_url.py464
-rw-r--r--contrib/python/websocket-client/websocket/tests/test_websocket.py498
11 files changed, 1976 insertions, 0 deletions
diff --git a/contrib/python/websocket-client/websocket/tests/__init__.py b/contrib/python/websocket-client/websocket/tests/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/contrib/python/websocket-client/websocket/tests/__init__.py
diff --git a/contrib/python/websocket-client/websocket/tests/data/header01.txt b/contrib/python/websocket-client/websocket/tests/data/header01.txt
new file mode 100644
index 0000000000..d44d24c205
--- /dev/null
+++ b/contrib/python/websocket-client/websocket/tests/data/header01.txt
@@ -0,0 +1,6 @@
+HTTP/1.1 101 WebSocket Protocol Handshake
+Connection: Upgrade
+Upgrade: WebSocket
+Sec-WebSocket-Accept: Kxep+hNu9n51529fGidYu7a3wO0=
+some_header: something
+
diff --git a/contrib/python/websocket-client/websocket/tests/data/header02.txt b/contrib/python/websocket-client/websocket/tests/data/header02.txt
new file mode 100644
index 0000000000..f481de928a
--- /dev/null
+++ b/contrib/python/websocket-client/websocket/tests/data/header02.txt
@@ -0,0 +1,6 @@
+HTTP/1.1 101 WebSocket Protocol Handshake
+Connection: Upgrade
+Upgrade WebSocket
+Sec-WebSocket-Accept: Kxep+hNu9n51529fGidYu7a3wO0=
+some_header: something
+
diff --git a/contrib/python/websocket-client/websocket/tests/data/header03.txt b/contrib/python/websocket-client/websocket/tests/data/header03.txt
new file mode 100644
index 0000000000..1a81dc70ce
--- /dev/null
+++ b/contrib/python/websocket-client/websocket/tests/data/header03.txt
@@ -0,0 +1,8 @@
+HTTP/1.1 101 WebSocket Protocol Handshake
+Connection: Upgrade, Keep-Alive
+Upgrade: WebSocket
+Sec-WebSocket-Accept: Kxep+hNu9n51529fGidYu7a3wO0=
+Set-Cookie: Token=ABCDE
+Set-Cookie: Token=FGHIJ
+some_header: something
+
diff --git a/contrib/python/websocket-client/websocket/tests/echo-server.py b/contrib/python/websocket-client/websocket/tests/echo-server.py
new file mode 100644
index 0000000000..5d1e87087b
--- /dev/null
+++ b/contrib/python/websocket-client/websocket/tests/echo-server.py
@@ -0,0 +1,23 @@
+#!/usr/bin/env python
+
+# From https://github.com/aaugustin/websockets/blob/main/example/echo.py
+
+import asyncio
+import os
+
+import websockets
+
+LOCAL_WS_SERVER_PORT = int(os.environ.get("LOCAL_WS_SERVER_PORT", "8765"))
+
+
+async def echo(websocket):
+ async for message in websocket:
+ await websocket.send(message)
+
+
+async def main():
+ async with websockets.serve(echo, "localhost", LOCAL_WS_SERVER_PORT):
+ await asyncio.Future() # run forever
+
+
+asyncio.run(main())
diff --git a/contrib/python/websocket-client/websocket/tests/test_abnf.py b/contrib/python/websocket-client/websocket/tests/test_abnf.py
new file mode 100644
index 0000000000..a749f13bd5
--- /dev/null
+++ b/contrib/python/websocket-client/websocket/tests/test_abnf.py
@@ -0,0 +1,125 @@
+# -*- coding: utf-8 -*-
+#
+import unittest
+
+from websocket._abnf import ABNF, frame_buffer
+from websocket._exceptions import WebSocketProtocolException
+
+"""
+test_abnf.py
+websocket - WebSocket client library for Python
+
+Copyright 2024 engn33r
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+
+class ABNFTest(unittest.TestCase):
+ def test_init(self):
+ a = ABNF(0, 0, 0, 0, opcode=ABNF.OPCODE_PING)
+ self.assertEqual(a.fin, 0)
+ self.assertEqual(a.rsv1, 0)
+ self.assertEqual(a.rsv2, 0)
+ self.assertEqual(a.rsv3, 0)
+ self.assertEqual(a.opcode, 9)
+ self.assertEqual(a.data, "")
+ a_bad = ABNF(0, 1, 0, 0, opcode=77)
+ self.assertEqual(a_bad.rsv1, 1)
+ self.assertEqual(a_bad.opcode, 77)
+
+ def test_validate(self):
+ a_invalid_ping = ABNF(0, 0, 0, 0, opcode=ABNF.OPCODE_PING)
+ self.assertRaises(
+ WebSocketProtocolException,
+ a_invalid_ping.validate,
+ skip_utf8_validation=False,
+ )
+ a_bad_rsv_value = ABNF(0, 1, 0, 0, opcode=ABNF.OPCODE_TEXT)
+ self.assertRaises(
+ WebSocketProtocolException,
+ a_bad_rsv_value.validate,
+ skip_utf8_validation=False,
+ )
+ a_bad_opcode = ABNF(0, 0, 0, 0, opcode=77)
+ self.assertRaises(
+ WebSocketProtocolException,
+ a_bad_opcode.validate,
+ skip_utf8_validation=False,
+ )
+ a_bad_close_frame = ABNF(0, 0, 0, 0, opcode=ABNF.OPCODE_CLOSE, data=b"\x01")
+ self.assertRaises(
+ WebSocketProtocolException,
+ a_bad_close_frame.validate,
+ skip_utf8_validation=False,
+ )
+ a_bad_close_frame_2 = ABNF(
+ 0, 0, 0, 0, opcode=ABNF.OPCODE_CLOSE, data=b"\x01\x8a\xaa\xff\xdd"
+ )
+ self.assertRaises(
+ WebSocketProtocolException,
+ a_bad_close_frame_2.validate,
+ skip_utf8_validation=False,
+ )
+ a_bad_close_frame_3 = ABNF(
+ 0, 0, 0, 0, opcode=ABNF.OPCODE_CLOSE, data=b"\x03\xe7"
+ )
+ self.assertRaises(
+ WebSocketProtocolException,
+ a_bad_close_frame_3.validate,
+ skip_utf8_validation=True,
+ )
+
+ def test_mask(self):
+ abnf_none_data = ABNF(
+ 0, 0, 0, 0, opcode=ABNF.OPCODE_PING, mask_value=1, data=None
+ )
+ bytes_val = b"aaaa"
+ self.assertEqual(abnf_none_data._get_masked(bytes_val), bytes_val)
+ abnf_str_data = ABNF(
+ 0, 0, 0, 0, opcode=ABNF.OPCODE_PING, mask_value=1, data="a"
+ )
+ self.assertEqual(abnf_str_data._get_masked(bytes_val), b"aaaa\x00")
+
+ def test_format(self):
+ abnf_bad_rsv_bits = ABNF(2, 0, 0, 0, opcode=ABNF.OPCODE_TEXT)
+ self.assertRaises(ValueError, abnf_bad_rsv_bits.format)
+ abnf_bad_opcode = ABNF(0, 0, 0, 0, opcode=5)
+ self.assertRaises(ValueError, abnf_bad_opcode.format)
+ abnf_length_10 = ABNF(0, 0, 0, 0, opcode=ABNF.OPCODE_TEXT, data="abcdefghij")
+ self.assertEqual(b"\x01", abnf_length_10.format()[0].to_bytes(1, "big"))
+ self.assertEqual(b"\x8a", abnf_length_10.format()[1].to_bytes(1, "big"))
+ self.assertEqual("fin=0 opcode=1 data=abcdefghij", abnf_length_10.__str__())
+ abnf_length_20 = ABNF(
+ 0, 0, 0, 0, opcode=ABNF.OPCODE_BINARY, data="abcdefghijabcdefghij"
+ )
+ self.assertEqual(b"\x02", abnf_length_20.format()[0].to_bytes(1, "big"))
+ self.assertEqual(b"\x94", abnf_length_20.format()[1].to_bytes(1, "big"))
+ abnf_no_mask = ABNF(
+ 0, 0, 0, 0, opcode=ABNF.OPCODE_TEXT, mask_value=0, data=b"\x01\x8a\xcc"
+ )
+ self.assertEqual(b"\x01\x03\x01\x8a\xcc", abnf_no_mask.format())
+
+ def test_frame_buffer(self):
+ fb = frame_buffer(0, True)
+ self.assertEqual(fb.recv, 0)
+ self.assertEqual(fb.skip_utf8_validation, True)
+ fb.clear
+ self.assertEqual(fb.header, None)
+ self.assertEqual(fb.length, None)
+ self.assertEqual(fb.mask_value, None)
+ self.assertEqual(fb.has_mask(), False)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/contrib/python/websocket-client/websocket/tests/test_app.py b/contrib/python/websocket-client/websocket/tests/test_app.py
new file mode 100644
index 0000000000..18eace5442
--- /dev/null
+++ b/contrib/python/websocket-client/websocket/tests/test_app.py
@@ -0,0 +1,352 @@
+# -*- coding: utf-8 -*-
+#
+import os
+import os.path
+import ssl
+import threading
+import unittest
+
+import websocket as ws
+
+"""
+test_app.py
+websocket - WebSocket client library for Python
+
+Copyright 2024 engn33r
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+# Skip test to access the internet unless TEST_WITH_INTERNET == 1
+TEST_WITH_INTERNET = os.environ.get("TEST_WITH_INTERNET", "0") == "1"
+# Skip tests relying on local websockets server unless LOCAL_WS_SERVER_PORT != -1
+LOCAL_WS_SERVER_PORT = os.environ.get("LOCAL_WS_SERVER_PORT", "-1")
+TEST_WITH_LOCAL_SERVER = LOCAL_WS_SERVER_PORT != "-1"
+TRACEABLE = True
+
+
+class WebSocketAppTest(unittest.TestCase):
+ class NotSetYet:
+ """A marker class for signalling that a value hasn't been set yet."""
+
+ def setUp(self):
+ ws.enableTrace(TRACEABLE)
+
+ WebSocketAppTest.keep_running_open = WebSocketAppTest.NotSetYet()
+ WebSocketAppTest.keep_running_close = WebSocketAppTest.NotSetYet()
+ WebSocketAppTest.get_mask_key_id = WebSocketAppTest.NotSetYet()
+ WebSocketAppTest.on_error_data = WebSocketAppTest.NotSetYet()
+
+ def tearDown(self):
+ WebSocketAppTest.keep_running_open = WebSocketAppTest.NotSetYet()
+ WebSocketAppTest.keep_running_close = WebSocketAppTest.NotSetYet()
+ WebSocketAppTest.get_mask_key_id = WebSocketAppTest.NotSetYet()
+ WebSocketAppTest.on_error_data = WebSocketAppTest.NotSetYet()
+
+ def close(self):
+ pass
+
+ @unittest.skipUnless(
+ TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled"
+ )
+ def test_keep_running(self):
+ """A WebSocketApp should keep running as long as its self.keep_running
+ is not False (in the boolean context).
+ """
+
+ def on_open(self, *args, **kwargs):
+ """Set the keep_running flag for later inspection and immediately
+ close the connection.
+ """
+ self.send("hello!")
+ WebSocketAppTest.keep_running_open = self.keep_running
+ self.keep_running = False
+
+ def on_message(_, message):
+ print(message)
+ self.close()
+
+ def on_close(self, *args, **kwargs):
+ """Set the keep_running flag for the test to use."""
+ WebSocketAppTest.keep_running_close = self.keep_running
+
+ app = ws.WebSocketApp(
+ f"ws://127.0.0.1:{LOCAL_WS_SERVER_PORT}",
+ on_open=on_open,
+ on_close=on_close,
+ on_message=on_message,
+ )
+ app.run_forever()
+
+ # @unittest.skipUnless(TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled")
+ @unittest.skipUnless(False, "Test disabled for now (requires rel)")
+ def test_run_forever_dispatcher(self):
+ """A WebSocketApp should keep running as long as its self.keep_running
+ is not False (in the boolean context).
+ """
+
+ def on_open(self, *args, **kwargs):
+ """Send a message, receive, and send one more"""
+ self.send("hello!")
+ self.recv()
+ self.send("goodbye!")
+
+ def on_message(_, message):
+ print(message)
+ self.close()
+
+ app = ws.WebSocketApp(
+ f"ws://127.0.0.1:{LOCAL_WS_SERVER_PORT}",
+ on_open=on_open,
+ on_message=on_message,
+ )
+ app.run_forever(dispatcher="Dispatcher") # doesn't work
+
+ # app.run_forever(dispatcher=rel) # would work
+ # rel.dispatch()
+
+ @unittest.skipUnless(
+ TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled"
+ )
+ def test_run_forever_teardown_clean_exit(self):
+ """The WebSocketApp.run_forever() method should return `False` when the application ends gracefully."""
+ app = ws.WebSocketApp(f"ws://127.0.0.1:{LOCAL_WS_SERVER_PORT}")
+ threading.Timer(interval=0.2, function=app.close).start()
+ teardown = app.run_forever()
+ self.assertEqual(teardown, False)
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def test_sock_mask_key(self):
+ """A WebSocketApp should forward the received mask_key function down
+ to the actual socket.
+ """
+
+ def my_mask_key_func():
+ return "\x00\x00\x00\x00"
+
+ app = ws.WebSocketApp(
+ "wss://api-pub.bitfinex.com/ws/1", get_mask_key=my_mask_key_func
+ )
+
+ # if numpy is installed, this assertion fail
+ # Note: We can't use 'is' for comparing the functions directly, need to use 'id'.
+ self.assertEqual(id(app.get_mask_key), id(my_mask_key_func))
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def test_invalid_ping_interval_ping_timeout(self):
+ """Test exception handling if ping_interval < ping_timeout"""
+
+ def on_ping(app, _):
+ print("Got a ping!")
+ app.close()
+
+ def on_pong(app, _):
+ print("Got a pong! No need to respond")
+ app.close()
+
+ app = ws.WebSocketApp(
+ "wss://api-pub.bitfinex.com/ws/1", on_ping=on_ping, on_pong=on_pong
+ )
+ self.assertRaises(
+ ws.WebSocketException,
+ app.run_forever,
+ ping_interval=1,
+ ping_timeout=2,
+ sslopt={"cert_reqs": ssl.CERT_NONE},
+ )
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def test_ping_interval(self):
+ """Test WebSocketApp proper ping functionality"""
+
+ def on_ping(app, _):
+ print("Got a ping!")
+ app.close()
+
+ def on_pong(app, _):
+ print("Got a pong! No need to respond")
+ app.close()
+
+ app = ws.WebSocketApp(
+ "wss://api-pub.bitfinex.com/ws/1", on_ping=on_ping, on_pong=on_pong
+ )
+ app.run_forever(
+ ping_interval=2, ping_timeout=1, sslopt={"cert_reqs": ssl.CERT_NONE}
+ )
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def test_opcode_close(self):
+ """Test WebSocketApp close opcode"""
+
+ app = ws.WebSocketApp("wss://tsock.us1.twilio.com/v3/wsconnect")
+ app.run_forever(ping_interval=2, ping_timeout=1, ping_payload="Ping payload")
+
+ # This is commented out because the URL no longer responds in the expected way
+ # @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ # def testOpcodeBinary(self):
+ # """ Test WebSocketApp binary opcode
+ # """
+ # app = ws.WebSocketApp('wss://streaming.vn.teslamotors.com/streaming/')
+ # app.run_forever(ping_interval=2, ping_timeout=1, ping_payload="Ping payload")
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def test_bad_ping_interval(self):
+ """A WebSocketApp handling of negative ping_interval"""
+ app = ws.WebSocketApp("wss://api-pub.bitfinex.com/ws/1")
+ self.assertRaises(
+ ws.WebSocketException,
+ app.run_forever,
+ ping_interval=-5,
+ sslopt={"cert_reqs": ssl.CERT_NONE},
+ )
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def test_bad_ping_timeout(self):
+ """A WebSocketApp handling of negative ping_timeout"""
+ app = ws.WebSocketApp("wss://api-pub.bitfinex.com/ws/1")
+ self.assertRaises(
+ ws.WebSocketException,
+ app.run_forever,
+ ping_timeout=-3,
+ sslopt={"cert_reqs": ssl.CERT_NONE},
+ )
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def test_close_status_code(self):
+ """Test extraction of close frame status code and close reason in WebSocketApp"""
+
+ def on_close(wsapp, close_status_code, close_msg):
+ print("on_close reached")
+
+ app = ws.WebSocketApp(
+ "wss://tsock.us1.twilio.com/v3/wsconnect", on_close=on_close
+ )
+ closeframe = ws.ABNF(
+ opcode=ws.ABNF.OPCODE_CLOSE, data=b"\x03\xe8no-init-from-client"
+ )
+ self.assertEqual([1000, "no-init-from-client"], app._get_close_args(closeframe))
+
+ closeframe = ws.ABNF(opcode=ws.ABNF.OPCODE_CLOSE, data=b"")
+ self.assertEqual([None, None], app._get_close_args(closeframe))
+
+ app2 = ws.WebSocketApp("wss://tsock.us1.twilio.com/v3/wsconnect")
+ closeframe = ws.ABNF(opcode=ws.ABNF.OPCODE_CLOSE, data=b"")
+ self.assertEqual([None, None], app2._get_close_args(closeframe))
+
+ self.assertRaises(
+ ws.WebSocketConnectionClosedException,
+ app.send,
+ data="test if connection is closed",
+ )
+
+ @unittest.skipUnless(
+ TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled"
+ )
+ def test_callback_function_exception(self):
+ """Test callback function exception handling"""
+
+ exc = None
+ passed_app = None
+
+ def on_open(app):
+ raise RuntimeError("Callback failed")
+
+ def on_error(app, err):
+ nonlocal passed_app
+ passed_app = app
+ nonlocal exc
+ exc = err
+
+ def on_pong(app, _):
+ app.close()
+
+ app = ws.WebSocketApp(
+ f"ws://127.0.0.1:{LOCAL_WS_SERVER_PORT}",
+ on_open=on_open,
+ on_error=on_error,
+ on_pong=on_pong,
+ )
+ app.run_forever(ping_interval=2, ping_timeout=1)
+
+ self.assertEqual(passed_app, app)
+ self.assertIsInstance(exc, RuntimeError)
+ self.assertEqual(str(exc), "Callback failed")
+
+ @unittest.skipUnless(
+ TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled"
+ )
+ def test_callback_method_exception(self):
+ """Test callback method exception handling"""
+
+ class Callbacks:
+ def __init__(self):
+ self.exc = None
+ self.passed_app = None
+ self.app = ws.WebSocketApp(
+ f"ws://127.0.0.1:{LOCAL_WS_SERVER_PORT}",
+ on_open=self.on_open,
+ on_error=self.on_error,
+ on_pong=self.on_pong,
+ )
+ self.app.run_forever(ping_interval=2, ping_timeout=1)
+
+ def on_open(self, _):
+ raise RuntimeError("Callback failed")
+
+ def on_error(self, app, err):
+ self.passed_app = app
+ self.exc = err
+
+ def on_pong(self, app, _):
+ app.close()
+
+ callbacks = Callbacks()
+
+ self.assertEqual(callbacks.passed_app, callbacks.app)
+ self.assertIsInstance(callbacks.exc, RuntimeError)
+ self.assertEqual(str(callbacks.exc), "Callback failed")
+
+ @unittest.skipUnless(
+ TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled"
+ )
+ def test_reconnect(self):
+ """Test reconnect"""
+ pong_count = 0
+ exc = None
+
+ def on_error(_, err):
+ nonlocal exc
+ exc = err
+
+ def on_pong(app, _):
+ nonlocal pong_count
+ pong_count += 1
+ if pong_count == 1:
+ # First pong, shutdown socket, enforce read error
+ app.sock.shutdown()
+ if pong_count >= 2:
+ # Got second pong after reconnect
+ app.close()
+
+ app = ws.WebSocketApp(
+ f"ws://127.0.0.1:{LOCAL_WS_SERVER_PORT}", on_pong=on_pong, on_error=on_error
+ )
+ app.run_forever(ping_interval=2, ping_timeout=1, reconnect=3)
+
+ self.assertEqual(pong_count, 2)
+ self.assertIsInstance(exc, ws.WebSocketTimeoutException)
+ self.assertEqual(str(exc), "ping/pong timed out")
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/contrib/python/websocket-client/websocket/tests/test_cookiejar.py b/contrib/python/websocket-client/websocket/tests/test_cookiejar.py
new file mode 100644
index 0000000000..67eddb627a
--- /dev/null
+++ b/contrib/python/websocket-client/websocket/tests/test_cookiejar.py
@@ -0,0 +1,123 @@
+import unittest
+
+from websocket._cookiejar import SimpleCookieJar
+
+"""
+test_cookiejar.py
+websocket - WebSocket client library for Python
+
+Copyright 2024 engn33r
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+
+class CookieJarTest(unittest.TestCase):
+ def test_add(self):
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.add("")
+ self.assertFalse(
+ cookie_jar.jar, "Cookie with no domain should not be added to the jar"
+ )
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.add("a=b")
+ self.assertFalse(
+ cookie_jar.jar, "Cookie with no domain should not be added to the jar"
+ )
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.add("a=b; domain=.abc")
+ self.assertTrue(".abc" in cookie_jar.jar)
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.add("a=b; domain=abc")
+ self.assertTrue(".abc" in cookie_jar.jar)
+ self.assertTrue("abc" not in cookie_jar.jar)
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.add("a=b; c=d; domain=abc")
+ self.assertEqual(cookie_jar.get("abc"), "a=b; c=d")
+ self.assertEqual(cookie_jar.get(None), "")
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.add("a=b; c=d; domain=abc")
+ cookie_jar.add("e=f; domain=abc")
+ self.assertEqual(cookie_jar.get("abc"), "a=b; c=d; e=f")
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.add("a=b; c=d; domain=abc")
+ cookie_jar.add("e=f; domain=.abc")
+ self.assertEqual(cookie_jar.get("abc"), "a=b; c=d; e=f")
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.add("a=b; c=d; domain=abc")
+ cookie_jar.add("e=f; domain=xyz")
+ self.assertEqual(cookie_jar.get("abc"), "a=b; c=d")
+ self.assertEqual(cookie_jar.get("xyz"), "e=f")
+ self.assertEqual(cookie_jar.get("something"), "")
+
+ def test_set(self):
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.set("a=b")
+ self.assertFalse(
+ cookie_jar.jar, "Cookie with no domain should not be added to the jar"
+ )
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.set("a=b; domain=.abc")
+ self.assertTrue(".abc" in cookie_jar.jar)
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.set("a=b; domain=abc")
+ self.assertTrue(".abc" in cookie_jar.jar)
+ self.assertTrue("abc" not in cookie_jar.jar)
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.set("a=b; c=d; domain=abc")
+ self.assertEqual(cookie_jar.get("abc"), "a=b; c=d")
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.set("a=b; c=d; domain=abc")
+ cookie_jar.set("e=f; domain=abc")
+ self.assertEqual(cookie_jar.get("abc"), "e=f")
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.set("a=b; c=d; domain=abc")
+ cookie_jar.set("e=f; domain=.abc")
+ self.assertEqual(cookie_jar.get("abc"), "e=f")
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.set("a=b; c=d; domain=abc")
+ cookie_jar.set("e=f; domain=xyz")
+ self.assertEqual(cookie_jar.get("abc"), "a=b; c=d")
+ self.assertEqual(cookie_jar.get("xyz"), "e=f")
+ self.assertEqual(cookie_jar.get("something"), "")
+
+ def test_get(self):
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.set("a=b; c=d; domain=abc.com")
+ self.assertEqual(cookie_jar.get("abc.com"), "a=b; c=d")
+ self.assertEqual(cookie_jar.get("x.abc.com"), "a=b; c=d")
+ self.assertEqual(cookie_jar.get("abc.com.es"), "")
+ self.assertEqual(cookie_jar.get("xabc.com"), "")
+
+ cookie_jar.set("a=b; c=d; domain=.abc.com")
+ self.assertEqual(cookie_jar.get("abc.com"), "a=b; c=d")
+ self.assertEqual(cookie_jar.get("x.abc.com"), "a=b; c=d")
+ self.assertEqual(cookie_jar.get("abc.com.es"), "")
+ self.assertEqual(cookie_jar.get("xabc.com"), "")
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/contrib/python/websocket-client/websocket/tests/test_http.py b/contrib/python/websocket-client/websocket/tests/test_http.py
new file mode 100644
index 0000000000..72465c2205
--- /dev/null
+++ b/contrib/python/websocket-client/websocket/tests/test_http.py
@@ -0,0 +1,371 @@
+# -*- coding: utf-8 -*-
+#
+import os
+import os.path
+import socket
+import ssl
+import unittest
+
+import websocket
+from websocket._exceptions import WebSocketProxyException, WebSocketException
+from websocket._http import (
+ _get_addrinfo_list,
+ _start_proxied_socket,
+ _tunnel,
+ connect,
+ proxy_info,
+ read_headers,
+ HAVE_PYTHON_SOCKS,
+)
+
+"""
+test_http.py
+websocket - WebSocket client library for Python
+
+Copyright 2024 engn33r
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+try:
+ from python_socks._errors import ProxyConnectionError, ProxyError, ProxyTimeoutError
+except:
+ from websocket._http import ProxyConnectionError, ProxyError, ProxyTimeoutError
+
+# Skip test to access the internet unless TEST_WITH_INTERNET == 1
+TEST_WITH_INTERNET = os.environ.get("TEST_WITH_INTERNET", "0") == "1"
+TEST_WITH_PROXY = os.environ.get("TEST_WITH_PROXY", "0") == "1"
+# Skip tests relying on local websockets server unless LOCAL_WS_SERVER_PORT != -1
+LOCAL_WS_SERVER_PORT = os.environ.get("LOCAL_WS_SERVER_PORT", "-1")
+TEST_WITH_LOCAL_SERVER = LOCAL_WS_SERVER_PORT != "-1"
+
+
+class SockMock:
+ def __init__(self):
+ self.data = []
+ self.sent = []
+
+ def add_packet(self, data):
+ self.data.append(data)
+
+ def gettimeout(self):
+ return None
+
+ def recv(self, bufsize):
+ if self.data:
+ e = self.data.pop(0)
+ if isinstance(e, Exception):
+ raise e
+ if len(e) > bufsize:
+ self.data.insert(0, e[bufsize:])
+ return e[:bufsize]
+
+ def send(self, data):
+ self.sent.append(data)
+ return len(data)
+
+ def close(self):
+ pass
+
+
+class HeaderSockMock(SockMock):
+ def __init__(self, fname):
+ SockMock.__init__(self)
+ import yatest.common as yc
+ path = os.path.join(os.path.dirname(yc.source_path(__file__)), fname)
+ with open(path, "rb") as f:
+ self.add_packet(f.read())
+
+
+class OptsList:
+ def __init__(self):
+ self.timeout = 1
+ self.sockopt = []
+ self.sslopt = {"cert_reqs": ssl.CERT_NONE}
+
+
+class HttpTest(unittest.TestCase):
+ def test_read_header(self):
+ status, header, _ = read_headers(HeaderSockMock("data/header01.txt"))
+ self.assertEqual(status, 101)
+ self.assertEqual(header["connection"], "Upgrade")
+ # header02.txt is intentionally malformed
+ self.assertRaises(
+ WebSocketException, read_headers, HeaderSockMock("data/header02.txt")
+ )
+
+ def test_tunnel(self):
+ self.assertRaises(
+ WebSocketProxyException,
+ _tunnel,
+ HeaderSockMock("data/header01.txt"),
+ "example.com",
+ 80,
+ ("username", "password"),
+ )
+ self.assertRaises(
+ WebSocketProxyException,
+ _tunnel,
+ HeaderSockMock("data/header02.txt"),
+ "example.com",
+ 80,
+ ("username", "password"),
+ )
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def test_connect(self):
+ # Not currently testing an actual proxy connection, so just check whether proxy errors are raised. This requires internet for a DNS lookup
+ if HAVE_PYTHON_SOCKS:
+ # Need this check, otherwise case where python_socks is not installed triggers
+ # websocket._exceptions.WebSocketException: Python Socks is needed for SOCKS proxying but is not available
+ self.assertRaises(
+ (ProxyTimeoutError, OSError),
+ _start_proxied_socket,
+ "wss://example.com",
+ OptsList(),
+ proxy_info(
+ http_proxy_host="example.com",
+ http_proxy_port="8080",
+ proxy_type="socks4",
+ http_proxy_timeout=1,
+ ),
+ )
+ self.assertRaises(
+ (ProxyTimeoutError, OSError),
+ _start_proxied_socket,
+ "wss://example.com",
+ OptsList(),
+ proxy_info(
+ http_proxy_host="example.com",
+ http_proxy_port="8080",
+ proxy_type="socks4a",
+ http_proxy_timeout=1,
+ ),
+ )
+ self.assertRaises(
+ (ProxyTimeoutError, OSError),
+ _start_proxied_socket,
+ "wss://example.com",
+ OptsList(),
+ proxy_info(
+ http_proxy_host="example.com",
+ http_proxy_port="8080",
+ proxy_type="socks5",
+ http_proxy_timeout=1,
+ ),
+ )
+ self.assertRaises(
+ (ProxyTimeoutError, OSError),
+ _start_proxied_socket,
+ "wss://example.com",
+ OptsList(),
+ proxy_info(
+ http_proxy_host="example.com",
+ http_proxy_port="8080",
+ proxy_type="socks5h",
+ http_proxy_timeout=1,
+ ),
+ )
+ self.assertRaises(
+ ProxyConnectionError,
+ connect,
+ "wss://example.com",
+ OptsList(),
+ proxy_info(
+ http_proxy_host="127.0.0.1",
+ http_proxy_port=9999,
+ proxy_type="socks4",
+ http_proxy_timeout=1,
+ ),
+ None,
+ )
+
+ self.assertRaises(
+ TypeError,
+ _get_addrinfo_list,
+ None,
+ 80,
+ True,
+ proxy_info(
+ http_proxy_host="127.0.0.1", http_proxy_port="9999", proxy_type="http"
+ ),
+ )
+ self.assertRaises(
+ TypeError,
+ _get_addrinfo_list,
+ None,
+ 80,
+ True,
+ proxy_info(
+ http_proxy_host="127.0.0.1", http_proxy_port="9999", proxy_type="http"
+ ),
+ )
+ self.assertRaises(
+ socket.timeout,
+ connect,
+ "wss://google.com",
+ OptsList(),
+ proxy_info(
+ http_proxy_host="8.8.8.8",
+ http_proxy_port=9999,
+ proxy_type="http",
+ http_proxy_timeout=1,
+ ),
+ None,
+ )
+ self.assertEqual(
+ connect(
+ "wss://google.com",
+ OptsList(),
+ proxy_info(
+ http_proxy_host="8.8.8.8", http_proxy_port=8080, proxy_type="http"
+ ),
+ True,
+ ),
+ (True, ("google.com", 443, "/")),
+ )
+ # The following test fails on Mac OS with a gaierror, not an OverflowError
+ # self.assertRaises(OverflowError, connect, "wss://example.com", OptsList(), proxy_info(http_proxy_host="127.0.0.1", http_proxy_port=99999, proxy_type="socks4", timeout=2), False)
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ @unittest.skipUnless(
+ TEST_WITH_PROXY, "This test requires a HTTP proxy to be running on port 8899"
+ )
+ @unittest.skipUnless(
+ TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled"
+ )
+ def test_proxy_connect(self):
+ ws = websocket.WebSocket()
+ ws.connect(
+ f"ws://127.0.0.1:{LOCAL_WS_SERVER_PORT}",
+ http_proxy_host="127.0.0.1",
+ http_proxy_port="8899",
+ proxy_type="http",
+ )
+ ws.send("Hello, Server")
+ server_response = ws.recv()
+ self.assertEqual(server_response, "Hello, Server")
+ # self.assertEqual(_start_proxied_socket("wss://api.bitfinex.com/ws/2", OptsList(), proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8899", proxy_type="http"))[1], ("api.bitfinex.com", 443, '/ws/2'))
+ self.assertEqual(
+ _get_addrinfo_list(
+ "api.bitfinex.com",
+ 443,
+ True,
+ proxy_info(
+ http_proxy_host="127.0.0.1",
+ http_proxy_port="8899",
+ proxy_type="http",
+ ),
+ ),
+ (
+ socket.getaddrinfo(
+ "127.0.0.1", 8899, 0, socket.SOCK_STREAM, socket.SOL_TCP
+ ),
+ True,
+ None,
+ ),
+ )
+ self.assertEqual(
+ connect(
+ "wss://api.bitfinex.com/ws/2",
+ OptsList(),
+ proxy_info(
+ http_proxy_host="127.0.0.1", http_proxy_port=8899, proxy_type="http"
+ ),
+ None,
+ )[1],
+ ("api.bitfinex.com", 443, "/ws/2"),
+ )
+ # TODO: Test SOCKS4 and SOCK5 proxies with unit tests
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def test_sslopt(self):
+ ssloptions = {
+ "check_hostname": False,
+ "server_hostname": "ServerName",
+ "ssl_version": ssl.PROTOCOL_TLS_CLIENT,
+ "ciphers": "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256:\
+ TLS_AES_128_GCM_SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:\
+ ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES256-GCM-SHA384:\
+ ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:\
+ DHE-RSA-CHACHA20-POLY1305:ECDHE-ECDSA-AES128-GCM-SHA256:\
+ ECDHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES128-GCM-SHA256:\
+ ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:\
+ DHE-RSA-AES256-SHA256:ECDHE-ECDSA-AES128-SHA256:\
+ ECDHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA256:\
+ ECDHE-ECDSA-AES256-SHA:ECDHE-RSA-AES256-SHA",
+ "ecdh_curve": "prime256v1",
+ }
+ ws_ssl1 = websocket.WebSocket(sslopt=ssloptions)
+ ws_ssl1.connect("wss://api.bitfinex.com/ws/2")
+ ws_ssl1.send("Hello")
+ ws_ssl1.close()
+
+ ws_ssl2 = websocket.WebSocket(sslopt={"check_hostname": True})
+ ws_ssl2.connect("wss://api.bitfinex.com/ws/2")
+ ws_ssl2.close
+
+ def test_proxy_info(self):
+ self.assertEqual(
+ proxy_info(
+ http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http"
+ ).proxy_protocol,
+ "http",
+ )
+ self.assertRaises(
+ ProxyError,
+ proxy_info,
+ http_proxy_host="127.0.0.1",
+ http_proxy_port="8080",
+ proxy_type="badval",
+ )
+ self.assertEqual(
+ proxy_info(
+ http_proxy_host="example.com", http_proxy_port="8080", proxy_type="http"
+ ).proxy_host,
+ "example.com",
+ )
+ self.assertEqual(
+ proxy_info(
+ http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http"
+ ).proxy_port,
+ "8080",
+ )
+ self.assertEqual(
+ proxy_info(
+ http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http"
+ ).auth,
+ None,
+ )
+ self.assertEqual(
+ proxy_info(
+ http_proxy_host="127.0.0.1",
+ http_proxy_port="8080",
+ proxy_type="http",
+ http_proxy_auth=("my_username123", "my_pass321"),
+ ).auth[0],
+ "my_username123",
+ )
+ self.assertEqual(
+ proxy_info(
+ http_proxy_host="127.0.0.1",
+ http_proxy_port="8080",
+ proxy_type="http",
+ http_proxy_auth=("my_username123", "my_pass321"),
+ ).auth[1],
+ "my_pass321",
+ )
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/contrib/python/websocket-client/websocket/tests/test_url.py b/contrib/python/websocket-client/websocket/tests/test_url.py
new file mode 100644
index 0000000000..110fdfad70
--- /dev/null
+++ b/contrib/python/websocket-client/websocket/tests/test_url.py
@@ -0,0 +1,464 @@
+# -*- coding: utf-8 -*-
+#
+import os
+import unittest
+
+from websocket._url import (
+ _is_address_in_network,
+ _is_no_proxy_host,
+ get_proxy_info,
+ parse_url,
+)
+from websocket._exceptions import WebSocketProxyException
+
+"""
+test_url.py
+websocket - WebSocket client library for Python
+
+Copyright 2024 engn33r
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+
+class UrlTest(unittest.TestCase):
+ def test_address_in_network(self):
+ self.assertTrue(_is_address_in_network("127.0.0.1", "127.0.0.0/8"))
+ self.assertTrue(_is_address_in_network("127.1.0.1", "127.0.0.0/8"))
+ self.assertFalse(_is_address_in_network("127.1.0.1", "127.0.0.0/24"))
+
+ def test_parse_url(self):
+ p = parse_url("ws://www.example.com/r")
+ self.assertEqual(p[0], "www.example.com")
+ self.assertEqual(p[1], 80)
+ self.assertEqual(p[2], "/r")
+ self.assertEqual(p[3], False)
+
+ p = parse_url("ws://www.example.com/r/")
+ self.assertEqual(p[0], "www.example.com")
+ self.assertEqual(p[1], 80)
+ self.assertEqual(p[2], "/r/")
+ self.assertEqual(p[3], False)
+
+ p = parse_url("ws://www.example.com/")
+ self.assertEqual(p[0], "www.example.com")
+ self.assertEqual(p[1], 80)
+ self.assertEqual(p[2], "/")
+ self.assertEqual(p[3], False)
+
+ p = parse_url("ws://www.example.com")
+ self.assertEqual(p[0], "www.example.com")
+ self.assertEqual(p[1], 80)
+ self.assertEqual(p[2], "/")
+ self.assertEqual(p[3], False)
+
+ p = parse_url("ws://www.example.com:8080/r")
+ self.assertEqual(p[0], "www.example.com")
+ self.assertEqual(p[1], 8080)
+ self.assertEqual(p[2], "/r")
+ self.assertEqual(p[3], False)
+
+ p = parse_url("ws://www.example.com:8080/")
+ self.assertEqual(p[0], "www.example.com")
+ self.assertEqual(p[1], 8080)
+ self.assertEqual(p[2], "/")
+ self.assertEqual(p[3], False)
+
+ p = parse_url("ws://www.example.com:8080")
+ self.assertEqual(p[0], "www.example.com")
+ self.assertEqual(p[1], 8080)
+ self.assertEqual(p[2], "/")
+ self.assertEqual(p[3], False)
+
+ p = parse_url("wss://www.example.com:8080/r")
+ self.assertEqual(p[0], "www.example.com")
+ self.assertEqual(p[1], 8080)
+ self.assertEqual(p[2], "/r")
+ self.assertEqual(p[3], True)
+
+ p = parse_url("wss://www.example.com:8080/r?key=value")
+ self.assertEqual(p[0], "www.example.com")
+ self.assertEqual(p[1], 8080)
+ self.assertEqual(p[2], "/r?key=value")
+ self.assertEqual(p[3], True)
+
+ self.assertRaises(ValueError, parse_url, "http://www.example.com/r")
+
+ p = parse_url("ws://[2a03:4000:123:83::3]/r")
+ self.assertEqual(p[0], "2a03:4000:123:83::3")
+ self.assertEqual(p[1], 80)
+ self.assertEqual(p[2], "/r")
+ self.assertEqual(p[3], False)
+
+ p = parse_url("ws://[2a03:4000:123:83::3]:8080/r")
+ self.assertEqual(p[0], "2a03:4000:123:83::3")
+ self.assertEqual(p[1], 8080)
+ self.assertEqual(p[2], "/r")
+ self.assertEqual(p[3], False)
+
+ p = parse_url("wss://[2a03:4000:123:83::3]/r")
+ self.assertEqual(p[0], "2a03:4000:123:83::3")
+ self.assertEqual(p[1], 443)
+ self.assertEqual(p[2], "/r")
+ self.assertEqual(p[3], True)
+
+ p = parse_url("wss://[2a03:4000:123:83::3]:8080/r")
+ self.assertEqual(p[0], "2a03:4000:123:83::3")
+ self.assertEqual(p[1], 8080)
+ self.assertEqual(p[2], "/r")
+ self.assertEqual(p[3], True)
+
+
+class IsNoProxyHostTest(unittest.TestCase):
+ def setUp(self):
+ self.no_proxy = os.environ.get("no_proxy", None)
+ if "no_proxy" in os.environ:
+ del os.environ["no_proxy"]
+
+ def tearDown(self):
+ if self.no_proxy:
+ os.environ["no_proxy"] = self.no_proxy
+ elif "no_proxy" in os.environ:
+ del os.environ["no_proxy"]
+
+ def test_match_all(self):
+ self.assertTrue(_is_no_proxy_host("any.websocket.org", ["*"]))
+ self.assertTrue(_is_no_proxy_host("192.168.0.1", ["*"]))
+ self.assertFalse(_is_no_proxy_host("192.168.0.1", ["192.168.1.1"]))
+ self.assertFalse(
+ _is_no_proxy_host("any.websocket.org", ["other.websocket.org"])
+ )
+ self.assertTrue(
+ _is_no_proxy_host("any.websocket.org", ["other.websocket.org", "*"])
+ )
+ os.environ["no_proxy"] = "*"
+ self.assertTrue(_is_no_proxy_host("any.websocket.org", None))
+ self.assertTrue(_is_no_proxy_host("192.168.0.1", None))
+ os.environ["no_proxy"] = "other.websocket.org, *"
+ self.assertTrue(_is_no_proxy_host("any.websocket.org", None))
+
+ def test_ip_address(self):
+ self.assertTrue(_is_no_proxy_host("127.0.0.1", ["127.0.0.1"]))
+ self.assertFalse(_is_no_proxy_host("127.0.0.2", ["127.0.0.1"]))
+ self.assertTrue(
+ _is_no_proxy_host("127.0.0.1", ["other.websocket.org", "127.0.0.1"])
+ )
+ self.assertFalse(
+ _is_no_proxy_host("127.0.0.2", ["other.websocket.org", "127.0.0.1"])
+ )
+ os.environ["no_proxy"] = "127.0.0.1"
+ self.assertTrue(_is_no_proxy_host("127.0.0.1", None))
+ self.assertFalse(_is_no_proxy_host("127.0.0.2", None))
+ os.environ["no_proxy"] = "other.websocket.org, 127.0.0.1"
+ self.assertTrue(_is_no_proxy_host("127.0.0.1", None))
+ self.assertFalse(_is_no_proxy_host("127.0.0.2", None))
+
+ def test_ip_address_in_range(self):
+ self.assertTrue(_is_no_proxy_host("127.0.0.1", ["127.0.0.0/8"]))
+ self.assertTrue(_is_no_proxy_host("127.0.0.2", ["127.0.0.0/8"]))
+ self.assertFalse(_is_no_proxy_host("127.1.0.1", ["127.0.0.0/24"]))
+ os.environ["no_proxy"] = "127.0.0.0/8"
+ self.assertTrue(_is_no_proxy_host("127.0.0.1", None))
+ self.assertTrue(_is_no_proxy_host("127.0.0.2", None))
+ os.environ["no_proxy"] = "127.0.0.0/24"
+ self.assertFalse(_is_no_proxy_host("127.1.0.1", None))
+
+ def test_hostname_match(self):
+ self.assertTrue(_is_no_proxy_host("my.websocket.org", ["my.websocket.org"]))
+ self.assertTrue(
+ _is_no_proxy_host(
+ "my.websocket.org", ["other.websocket.org", "my.websocket.org"]
+ )
+ )
+ self.assertFalse(_is_no_proxy_host("my.websocket.org", ["other.websocket.org"]))
+ os.environ["no_proxy"] = "my.websocket.org"
+ self.assertTrue(_is_no_proxy_host("my.websocket.org", None))
+ self.assertFalse(_is_no_proxy_host("other.websocket.org", None))
+ os.environ["no_proxy"] = "other.websocket.org, my.websocket.org"
+ self.assertTrue(_is_no_proxy_host("my.websocket.org", None))
+
+ def test_hostname_match_domain(self):
+ self.assertTrue(_is_no_proxy_host("any.websocket.org", [".websocket.org"]))
+ self.assertTrue(_is_no_proxy_host("my.other.websocket.org", [".websocket.org"]))
+ self.assertTrue(
+ _is_no_proxy_host(
+ "any.websocket.org", ["my.websocket.org", ".websocket.org"]
+ )
+ )
+ self.assertFalse(_is_no_proxy_host("any.websocket.com", [".websocket.org"]))
+ os.environ["no_proxy"] = ".websocket.org"
+ self.assertTrue(_is_no_proxy_host("any.websocket.org", None))
+ self.assertTrue(_is_no_proxy_host("my.other.websocket.org", None))
+ self.assertFalse(_is_no_proxy_host("any.websocket.com", None))
+ os.environ["no_proxy"] = "my.websocket.org, .websocket.org"
+ self.assertTrue(_is_no_proxy_host("any.websocket.org", None))
+
+
+class ProxyInfoTest(unittest.TestCase):
+ def setUp(self):
+ self.http_proxy = os.environ.get("http_proxy", None)
+ self.https_proxy = os.environ.get("https_proxy", None)
+ self.no_proxy = os.environ.get("no_proxy", None)
+ if "http_proxy" in os.environ:
+ del os.environ["http_proxy"]
+ if "https_proxy" in os.environ:
+ del os.environ["https_proxy"]
+ if "no_proxy" in os.environ:
+ del os.environ["no_proxy"]
+
+ def tearDown(self):
+ if self.http_proxy:
+ os.environ["http_proxy"] = self.http_proxy
+ elif "http_proxy" in os.environ:
+ del os.environ["http_proxy"]
+
+ if self.https_proxy:
+ os.environ["https_proxy"] = self.https_proxy
+ elif "https_proxy" in os.environ:
+ del os.environ["https_proxy"]
+
+ if self.no_proxy:
+ os.environ["no_proxy"] = self.no_proxy
+ elif "no_proxy" in os.environ:
+ del os.environ["no_proxy"]
+
+ def test_proxy_from_args(self):
+ self.assertRaises(
+ WebSocketProxyException,
+ get_proxy_info,
+ "echo.websocket.events",
+ False,
+ proxy_host="localhost",
+ )
+ self.assertEqual(
+ get_proxy_info(
+ "echo.websocket.events", False, proxy_host="localhost", proxy_port=3128
+ ),
+ ("localhost", 3128, None),
+ )
+ self.assertEqual(
+ get_proxy_info(
+ "echo.websocket.events", True, proxy_host="localhost", proxy_port=3128
+ ),
+ ("localhost", 3128, None),
+ )
+
+ self.assertEqual(
+ get_proxy_info(
+ "echo.websocket.events",
+ False,
+ proxy_host="localhost",
+ proxy_port=9001,
+ proxy_auth=("a", "b"),
+ ),
+ ("localhost", 9001, ("a", "b")),
+ )
+ self.assertEqual(
+ get_proxy_info(
+ "echo.websocket.events",
+ False,
+ proxy_host="localhost",
+ proxy_port=3128,
+ proxy_auth=("a", "b"),
+ ),
+ ("localhost", 3128, ("a", "b")),
+ )
+ self.assertEqual(
+ get_proxy_info(
+ "echo.websocket.events",
+ True,
+ proxy_host="localhost",
+ proxy_port=8765,
+ proxy_auth=("a", "b"),
+ ),
+ ("localhost", 8765, ("a", "b")),
+ )
+ self.assertEqual(
+ get_proxy_info(
+ "echo.websocket.events",
+ True,
+ proxy_host="localhost",
+ proxy_port=3128,
+ proxy_auth=("a", "b"),
+ ),
+ ("localhost", 3128, ("a", "b")),
+ )
+
+ self.assertEqual(
+ get_proxy_info(
+ "echo.websocket.events",
+ True,
+ proxy_host="localhost",
+ proxy_port=3128,
+ no_proxy=["example.com"],
+ proxy_auth=("a", "b"),
+ ),
+ ("localhost", 3128, ("a", "b")),
+ )
+ self.assertEqual(
+ get_proxy_info(
+ "echo.websocket.events",
+ True,
+ proxy_host="localhost",
+ proxy_port=3128,
+ no_proxy=["echo.websocket.events"],
+ proxy_auth=("a", "b"),
+ ),
+ (None, 0, None),
+ )
+
+ self.assertEqual(
+ get_proxy_info(
+ "echo.websocket.events",
+ True,
+ proxy_host="localhost",
+ proxy_port=3128,
+ no_proxy=[".websocket.events"],
+ ),
+ (None, 0, None),
+ )
+
+ def test_proxy_from_env(self):
+ os.environ["http_proxy"] = "http://localhost/"
+ self.assertEqual(
+ get_proxy_info("echo.websocket.events", False), ("localhost", None, None)
+ )
+ os.environ["http_proxy"] = "http://localhost:3128/"
+ self.assertEqual(
+ get_proxy_info("echo.websocket.events", False), ("localhost", 3128, None)
+ )
+
+ os.environ["http_proxy"] = "http://localhost/"
+ os.environ["https_proxy"] = "http://localhost2/"
+ self.assertEqual(
+ get_proxy_info("echo.websocket.events", False), ("localhost", None, None)
+ )
+ os.environ["http_proxy"] = "http://localhost:3128/"
+ os.environ["https_proxy"] = "http://localhost2:3128/"
+ self.assertEqual(
+ get_proxy_info("echo.websocket.events", False), ("localhost", 3128, None)
+ )
+
+ os.environ["http_proxy"] = "http://localhost/"
+ os.environ["https_proxy"] = "http://localhost2/"
+ self.assertEqual(
+ get_proxy_info("echo.websocket.events", True), ("localhost2", None, None)
+ )
+ os.environ["http_proxy"] = "http://localhost:3128/"
+ os.environ["https_proxy"] = "http://localhost2:3128/"
+ self.assertEqual(
+ get_proxy_info("echo.websocket.events", True), ("localhost2", 3128, None)
+ )
+
+ os.environ["http_proxy"] = ""
+ os.environ["https_proxy"] = "http://localhost2/"
+ self.assertEqual(
+ get_proxy_info("echo.websocket.events", True), ("localhost2", None, None)
+ )
+ self.assertEqual(
+ get_proxy_info("echo.websocket.events", False), (None, 0, None)
+ )
+ os.environ["http_proxy"] = ""
+ os.environ["https_proxy"] = "http://localhost2:3128/"
+ self.assertEqual(
+ get_proxy_info("echo.websocket.events", True), ("localhost2", 3128, None)
+ )
+ self.assertEqual(
+ get_proxy_info("echo.websocket.events", False), (None, 0, None)
+ )
+
+ os.environ["http_proxy"] = "http://localhost/"
+ os.environ["https_proxy"] = ""
+ self.assertEqual(get_proxy_info("echo.websocket.events", True), (None, 0, None))
+ self.assertEqual(
+ get_proxy_info("echo.websocket.events", False), ("localhost", None, None)
+ )
+ os.environ["http_proxy"] = "http://localhost:3128/"
+ os.environ["https_proxy"] = ""
+ self.assertEqual(get_proxy_info("echo.websocket.events", True), (None, 0, None))
+ self.assertEqual(
+ get_proxy_info("echo.websocket.events", False), ("localhost", 3128, None)
+ )
+
+ os.environ["http_proxy"] = "http://a:b@localhost/"
+ self.assertEqual(
+ get_proxy_info("echo.websocket.events", False),
+ ("localhost", None, ("a", "b")),
+ )
+ os.environ["http_proxy"] = "http://a:b@localhost:3128/"
+ self.assertEqual(
+ get_proxy_info("echo.websocket.events", False),
+ ("localhost", 3128, ("a", "b")),
+ )
+
+ os.environ["http_proxy"] = "http://a:b@localhost/"
+ os.environ["https_proxy"] = "http://a:b@localhost2/"
+ self.assertEqual(
+ get_proxy_info("echo.websocket.events", False),
+ ("localhost", None, ("a", "b")),
+ )
+ os.environ["http_proxy"] = "http://a:b@localhost:3128/"
+ os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
+ self.assertEqual(
+ get_proxy_info("echo.websocket.events", False),
+ ("localhost", 3128, ("a", "b")),
+ )
+
+ os.environ["http_proxy"] = "http://a:b@localhost/"
+ os.environ["https_proxy"] = "http://a:b@localhost2/"
+ self.assertEqual(
+ get_proxy_info("echo.websocket.events", True),
+ ("localhost2", None, ("a", "b")),
+ )
+ os.environ["http_proxy"] = "http://a:b@localhost:3128/"
+ os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
+ self.assertEqual(
+ get_proxy_info("echo.websocket.events", True),
+ ("localhost2", 3128, ("a", "b")),
+ )
+
+ os.environ[
+ "http_proxy"
+ ] = "http://john%40example.com:P%40SSWORD@localhost:3128/"
+ os.environ[
+ "https_proxy"
+ ] = "http://john%40example.com:P%40SSWORD@localhost2:3128/"
+ self.assertEqual(
+ get_proxy_info("echo.websocket.events", True),
+ ("localhost2", 3128, ("john@example.com", "P@SSWORD")),
+ )
+
+ os.environ["http_proxy"] = "http://a:b@localhost/"
+ os.environ["https_proxy"] = "http://a:b@localhost2/"
+ os.environ["no_proxy"] = "example1.com,example2.com"
+ self.assertEqual(
+ get_proxy_info("example.1.com", True), ("localhost2", None, ("a", "b"))
+ )
+ os.environ["http_proxy"] = "http://a:b@localhost:3128/"
+ os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
+ os.environ["no_proxy"] = "example1.com,example2.com, echo.websocket.events"
+ self.assertEqual(get_proxy_info("echo.websocket.events", True), (None, 0, None))
+ os.environ["http_proxy"] = "http://a:b@localhost:3128/"
+ os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
+ os.environ["no_proxy"] = "example1.com,example2.com, .websocket.events"
+ self.assertEqual(get_proxy_info("echo.websocket.events", True), (None, 0, None))
+
+ os.environ["http_proxy"] = "http://a:b@localhost:3128/"
+ os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
+ os.environ["no_proxy"] = "127.0.0.0/8, 192.168.0.0/16"
+ self.assertEqual(get_proxy_info("127.0.0.1", False), (None, 0, None))
+ self.assertEqual(get_proxy_info("192.168.1.1", False), (None, 0, None))
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/contrib/python/websocket-client/websocket/tests/test_websocket.py b/contrib/python/websocket-client/websocket/tests/test_websocket.py
new file mode 100644
index 0000000000..892312a2db
--- /dev/null
+++ b/contrib/python/websocket-client/websocket/tests/test_websocket.py
@@ -0,0 +1,498 @@
+# -*- coding: utf-8 -*-
+#
+import os
+import os.path
+import socket
+import unittest
+from base64 import decodebytes as base64decode
+
+import websocket as ws
+from websocket._exceptions import WebSocketBadStatusException, WebSocketAddressException
+from websocket._handshake import _create_sec_websocket_key
+from websocket._handshake import _validate as _validate_header
+from websocket._http import read_headers
+from websocket._utils import validate_utf8
+
+"""
+test_websocket.py
+websocket - WebSocket client library for Python
+
+Copyright 2024 engn33r
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+try:
+ import ssl
+except ImportError:
+ # dummy class of SSLError for ssl none-support environment.
+ class SSLError(Exception):
+ pass
+
+
+# Skip test to access the internet unless TEST_WITH_INTERNET == 1
+TEST_WITH_INTERNET = os.environ.get("TEST_WITH_INTERNET", "0") == "1"
+# Skip tests relying on local websockets server unless LOCAL_WS_SERVER_PORT != -1
+LOCAL_WS_SERVER_PORT = os.environ.get("LOCAL_WS_SERVER_PORT", "-1")
+TEST_WITH_LOCAL_SERVER = LOCAL_WS_SERVER_PORT != "-1"
+TRACEABLE = True
+
+
+def create_mask_key(_):
+ return "abcd"
+
+
+class SockMock:
+ def __init__(self):
+ self.data = []
+ self.sent = []
+
+ def add_packet(self, data):
+ self.data.append(data)
+
+ def gettimeout(self):
+ return None
+
+ def recv(self, bufsize):
+ if self.data:
+ e = self.data.pop(0)
+ if isinstance(e, Exception):
+ raise e
+ if len(e) > bufsize:
+ self.data.insert(0, e[bufsize:])
+ return e[:bufsize]
+
+ def send(self, data):
+ self.sent.append(data)
+ return len(data)
+
+ def close(self):
+ pass
+
+
+class HeaderSockMock(SockMock):
+ def __init__(self, fname):
+ SockMock.__init__(self)
+ import yatest.common as yc
+ path = os.path.join(os.path.dirname(yc.source_path(__file__)), fname)
+ with open(path, "rb") as f:
+ self.add_packet(f.read())
+
+
+class WebSocketTest(unittest.TestCase):
+ def setUp(self):
+ ws.enableTrace(TRACEABLE)
+
+ def tearDown(self):
+ pass
+
+ def test_default_timeout(self):
+ self.assertEqual(ws.getdefaulttimeout(), None)
+ ws.setdefaulttimeout(10)
+ self.assertEqual(ws.getdefaulttimeout(), 10)
+ ws.setdefaulttimeout(None)
+
+ def test_ws_key(self):
+ key = _create_sec_websocket_key()
+ self.assertTrue(key != 24)
+ self.assertTrue("¥n" not in key)
+
+ def test_nonce(self):
+ """WebSocket key should be a random 16-byte nonce."""
+ key = _create_sec_websocket_key()
+ nonce = base64decode(key.encode("utf-8"))
+ self.assertEqual(16, len(nonce))
+
+ def test_ws_utils(self):
+ key = "c6b8hTg4EeGb2gQMztV1/g=="
+ required_header = {
+ "upgrade": "websocket",
+ "connection": "upgrade",
+ "sec-websocket-accept": "Kxep+hNu9n51529fGidYu7a3wO0=",
+ }
+ self.assertEqual(_validate_header(required_header, key, None), (True, None))
+
+ header = required_header.copy()
+ header["upgrade"] = "http"
+ self.assertEqual(_validate_header(header, key, None), (False, None))
+ del header["upgrade"]
+ self.assertEqual(_validate_header(header, key, None), (False, None))
+
+ header = required_header.copy()
+ header["connection"] = "something"
+ self.assertEqual(_validate_header(header, key, None), (False, None))
+ del header["connection"]
+ self.assertEqual(_validate_header(header, key, None), (False, None))
+
+ header = required_header.copy()
+ header["sec-websocket-accept"] = "something"
+ self.assertEqual(_validate_header(header, key, None), (False, None))
+ del header["sec-websocket-accept"]
+ self.assertEqual(_validate_header(header, key, None), (False, None))
+
+ header = required_header.copy()
+ header["sec-websocket-protocol"] = "sub1"
+ self.assertEqual(
+ _validate_header(header, key, ["sub1", "sub2"]), (True, "sub1")
+ )
+ # This case will print out a logging error using the error() function, but that is expected
+ self.assertEqual(_validate_header(header, key, ["sub2", "sub3"]), (False, None))
+
+ header = required_header.copy()
+ header["sec-websocket-protocol"] = "sUb1"
+ self.assertEqual(
+ _validate_header(header, key, ["Sub1", "suB2"]), (True, "sub1")
+ )
+
+ header = required_header.copy()
+ # This case will print out a logging error using the error() function, but that is expected
+ self.assertEqual(_validate_header(header, key, ["Sub1", "suB2"]), (False, None))
+
+ def test_read_header(self):
+ status, header, _ = read_headers(HeaderSockMock("data/header01.txt"))
+ self.assertEqual(status, 101)
+ self.assertEqual(header["connection"], "Upgrade")
+
+ status, header, _ = read_headers(HeaderSockMock("data/header03.txt"))
+ self.assertEqual(status, 101)
+ self.assertEqual(header["connection"], "Upgrade, Keep-Alive")
+
+ HeaderSockMock("data/header02.txt")
+ self.assertRaises(
+ ws.WebSocketException, read_headers, HeaderSockMock("data/header02.txt")
+ )
+
+ def test_send(self):
+ # TODO: add longer frame data
+ sock = ws.WebSocket()
+ sock.set_mask_key(create_mask_key)
+ s = sock.sock = HeaderSockMock("data/header01.txt")
+ sock.send("Hello")
+ self.assertEqual(s.sent[0], b"\x81\x85abcd)\x07\x0f\x08\x0e")
+
+ sock.send("こんにちは")
+ self.assertEqual(
+ s.sent[1],
+ b"\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc",
+ )
+
+ # sock.send("x" * 5000)
+ # self.assertEqual(s.sent[1], b'\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc")
+
+ self.assertEqual(sock.send_binary(b"1111111111101"), 19)
+
+ def test_recv(self):
+ # TODO: add longer frame data
+ sock = ws.WebSocket()
+ s = sock.sock = SockMock()
+ something = (
+ b"\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc"
+ )
+ s.add_packet(something)
+ data = sock.recv()
+ self.assertEqual(data, "こんにちは")
+
+ s.add_packet(b"\x81\x85abcd)\x07\x0f\x08\x0e")
+ data = sock.recv()
+ self.assertEqual(data, "Hello")
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def test_iter(self):
+ count = 2
+ s = ws.create_connection("wss://api.bitfinex.com/ws/2")
+ s.send('{"event": "subscribe", "channel": "ticker"}')
+ for _ in s:
+ count -= 1
+ if count == 0:
+ break
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def test_next(self):
+ sock = ws.create_connection("wss://api.bitfinex.com/ws/2")
+ self.assertEqual(str, type(next(sock)))
+
+ def test_internal_recv_strict(self):
+ sock = ws.WebSocket()
+ s = sock.sock = SockMock()
+ s.add_packet(b"foo")
+ s.add_packet(socket.timeout())
+ s.add_packet(b"bar")
+ # s.add_packet(SSLError("The read operation timed out"))
+ s.add_packet(b"baz")
+ with self.assertRaises(ws.WebSocketTimeoutException):
+ sock.frame_buffer.recv_strict(9)
+ # with self.assertRaises(SSLError):
+ # data = sock._recv_strict(9)
+ data = sock.frame_buffer.recv_strict(9)
+ self.assertEqual(data, b"foobarbaz")
+ with self.assertRaises(ws.WebSocketConnectionClosedException):
+ sock.frame_buffer.recv_strict(1)
+
+ def test_recv_timeout(self):
+ sock = ws.WebSocket()
+ s = sock.sock = SockMock()
+ s.add_packet(b"\x81")
+ s.add_packet(socket.timeout())
+ s.add_packet(b"\x8dabcd\x29\x07\x0f\x08\x0e")
+ s.add_packet(socket.timeout())
+ s.add_packet(b"\x4e\x43\x33\x0e\x10\x0f\x00\x40")
+ with self.assertRaises(ws.WebSocketTimeoutException):
+ sock.recv()
+ with self.assertRaises(ws.WebSocketTimeoutException):
+ sock.recv()
+ data = sock.recv()
+ self.assertEqual(data, "Hello, World!")
+ with self.assertRaises(ws.WebSocketConnectionClosedException):
+ sock.recv()
+
+ def test_recv_with_simple_fragmentation(self):
+ sock = ws.WebSocket()
+ s = sock.sock = SockMock()
+ # OPCODE=TEXT, FIN=0, MSG="Brevity is "
+ s.add_packet(b"\x01\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C")
+ # OPCODE=CONT, FIN=1, MSG="the soul of wit"
+ s.add_packet(b"\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17")
+ data = sock.recv()
+ self.assertEqual(data, "Brevity is the soul of wit")
+ with self.assertRaises(ws.WebSocketConnectionClosedException):
+ sock.recv()
+
+ def test_recv_with_fire_event_of_fragmentation(self):
+ sock = ws.WebSocket(fire_cont_frame=True)
+ s = sock.sock = SockMock()
+ # OPCODE=TEXT, FIN=0, MSG="Brevity is "
+ s.add_packet(b"\x01\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C")
+ # OPCODE=CONT, FIN=0, MSG="Brevity is "
+ s.add_packet(b"\x00\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C")
+ # OPCODE=CONT, FIN=1, MSG="the soul of wit"
+ s.add_packet(b"\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17")
+
+ _, data = sock.recv_data()
+ self.assertEqual(data, b"Brevity is ")
+ _, data = sock.recv_data()
+ self.assertEqual(data, b"Brevity is ")
+ _, data = sock.recv_data()
+ self.assertEqual(data, b"the soul of wit")
+
+ # OPCODE=CONT, FIN=0, MSG="Brevity is "
+ s.add_packet(b"\x80\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C")
+
+ with self.assertRaises(ws.WebSocketException):
+ sock.recv_data()
+
+ with self.assertRaises(ws.WebSocketConnectionClosedException):
+ sock.recv()
+
+ def test_close(self):
+ sock = ws.WebSocket()
+ sock.connected = True
+ sock.close
+
+ sock = ws.WebSocket()
+ s = sock.sock = SockMock()
+ sock.connected = True
+ s.add_packet(b"\x88\x80\x17\x98p\x84")
+ sock.recv()
+ self.assertEqual(sock.connected, False)
+
+ def test_recv_cont_fragmentation(self):
+ sock = ws.WebSocket()
+ s = sock.sock = SockMock()
+ # OPCODE=CONT, FIN=1, MSG="the soul of wit"
+ s.add_packet(b"\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17")
+ self.assertRaises(ws.WebSocketException, sock.recv)
+
+ def test_recv_with_prolonged_fragmentation(self):
+ sock = ws.WebSocket()
+ s = sock.sock = SockMock()
+ # OPCODE=TEXT, FIN=0, MSG="Once more unto the breach, "
+ s.add_packet(
+ b"\x01\x9babcd.\x0c\x00\x01A\x0f\x0c\x16\x04B\x16\n\x15\rC\x10\t\x07C\x06\x13\x07\x02\x07\tNC"
+ )
+ # OPCODE=CONT, FIN=0, MSG="dear friends, "
+ s.add_packet(b"\x00\x8eabcd\x05\x07\x02\x16A\x04\x11\r\x04\x0c\x07\x17MB")
+ # OPCODE=CONT, FIN=1, MSG="once more"
+ s.add_packet(b"\x80\x89abcd\x0e\x0c\x00\x01A\x0f\x0c\x16\x04")
+ data = sock.recv()
+ self.assertEqual(data, "Once more unto the breach, dear friends, once more")
+ with self.assertRaises(ws.WebSocketConnectionClosedException):
+ sock.recv()
+
+ def test_recv_with_fragmentation_and_control_frame(self):
+ sock = ws.WebSocket()
+ sock.set_mask_key(create_mask_key)
+ s = sock.sock = SockMock()
+ # OPCODE=TEXT, FIN=0, MSG="Too much "
+ s.add_packet(b"\x01\x89abcd5\r\x0cD\x0c\x17\x00\x0cA")
+ # OPCODE=PING, FIN=1, MSG="Please PONG this"
+ s.add_packet(b"\x89\x90abcd1\x0e\x06\x05\x12\x07C4.,$D\x15\n\n\x17")
+ # OPCODE=CONT, FIN=1, MSG="of a good thing"
+ s.add_packet(b"\x80\x8fabcd\x0e\x04C\x05A\x05\x0c\x0b\x05B\x17\x0c\x08\x0c\x04")
+ data = sock.recv()
+ self.assertEqual(data, "Too much of a good thing")
+ with self.assertRaises(ws.WebSocketConnectionClosedException):
+ sock.recv()
+ self.assertEqual(
+ s.sent[0], b"\x8a\x90abcd1\x0e\x06\x05\x12\x07C4.,$D\x15\n\n\x17"
+ )
+
+ @unittest.skipUnless(
+ TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled"
+ )
+ def test_websocket(self):
+ s = ws.create_connection(f"ws://127.0.0.1:{LOCAL_WS_SERVER_PORT}")
+ self.assertNotEqual(s, None)
+ s.send("Hello, World")
+ result = s.next()
+ s.fileno()
+ self.assertEqual(result, "Hello, World")
+
+ s.send("こにゃにゃちは、世界")
+ result = s.recv()
+ self.assertEqual(result, "こにゃにゃちは、世界")
+ self.assertRaises(ValueError, s.send_close, -1, "")
+ s.close()
+
+ @unittest.skipUnless(
+ TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled"
+ )
+ def test_ping_pong(self):
+ s = ws.create_connection(f"ws://127.0.0.1:{LOCAL_WS_SERVER_PORT}")
+ self.assertNotEqual(s, None)
+ s.ping("Hello")
+ s.pong("Hi")
+ s.close()
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def test_support_redirect(self):
+ s = ws.WebSocket()
+ self.assertRaises(WebSocketBadStatusException, s.connect, "ws://google.com/")
+ # Need to find a URL that has a redirect code leading to a websocket
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def test_secure_websocket(self):
+ s = ws.create_connection("wss://api.bitfinex.com/ws/2")
+ self.assertNotEqual(s, None)
+ self.assertTrue(isinstance(s.sock, ssl.SSLSocket))
+ self.assertEqual(s.getstatus(), 101)
+ self.assertNotEqual(s.getheaders(), None)
+ s.settimeout(10)
+ self.assertEqual(s.gettimeout(), 10)
+ self.assertEqual(s.getsubprotocol(), None)
+ s.abort()
+
+ @unittest.skipUnless(
+ TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled"
+ )
+ def test_websocket_with_custom_header(self):
+ s = ws.create_connection(
+ f"ws://127.0.0.1:{LOCAL_WS_SERVER_PORT}",
+ headers={"User-Agent": "PythonWebsocketClient"},
+ )
+ self.assertNotEqual(s, None)
+ self.assertEqual(s.getsubprotocol(), None)
+ s.send("Hello, World")
+ result = s.recv()
+ self.assertEqual(result, "Hello, World")
+ self.assertRaises(ValueError, s.close, -1, "")
+ s.close()
+
+ @unittest.skipUnless(
+ TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled"
+ )
+ def test_after_close(self):
+ s = ws.create_connection(f"ws://127.0.0.1:{LOCAL_WS_SERVER_PORT}")
+ self.assertNotEqual(s, None)
+ s.close()
+ self.assertRaises(ws.WebSocketConnectionClosedException, s.send, "Hello")
+ self.assertRaises(ws.WebSocketConnectionClosedException, s.recv)
+
+
+class SockOptTest(unittest.TestCase):
+ @unittest.skipUnless(
+ TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled"
+ )
+ def test_sockopt(self):
+ sockopt = ((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),)
+ s = ws.create_connection(
+ f"ws://127.0.0.1:{LOCAL_WS_SERVER_PORT}", sockopt=sockopt
+ )
+ self.assertNotEqual(
+ s.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY), 0
+ )
+ s.close()
+
+
+class UtilsTest(unittest.TestCase):
+ def test_utf8_validator(self):
+ state = validate_utf8(b"\xf0\x90\x80\x80")
+ self.assertEqual(state, True)
+ state = validate_utf8(
+ b"\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5\xed\xa0\x80edited"
+ )
+ self.assertEqual(state, False)
+ state = validate_utf8(b"")
+ self.assertEqual(state, True)
+
+
+class HandshakeTest(unittest.TestCase):
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def test_http_ssl(self):
+ websock1 = ws.WebSocket(
+ sslopt={"cert_chain": ssl.get_default_verify_paths().capath},
+ enable_multithread=False,
+ )
+ self.assertRaises(ValueError, websock1.connect, "wss://api.bitfinex.com/ws/2")
+ websock2 = ws.WebSocket(sslopt={"certfile": "myNonexistentCertFile"})
+ self.assertRaises(
+ FileNotFoundError, websock2.connect, "wss://api.bitfinex.com/ws/2"
+ )
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def test_manual_headers(self):
+ websock3 = ws.WebSocket(
+ sslopt={
+ "ca_certs": ssl.get_default_verify_paths().cafile,
+ "ca_cert_path": ssl.get_default_verify_paths().capath,
+ }
+ )
+ self.assertRaises(
+ WebSocketBadStatusException,
+ websock3.connect,
+ "wss://api.bitfinex.com/ws/2",
+ cookie="chocolate",
+ origin="testing_websockets.com",
+ host="echo.websocket.events/websocket-client-test",
+ subprotocols=["testproto"],
+ connection="Upgrade",
+ header={
+ "CustomHeader1": "123",
+ "Cookie": "TestValue",
+ "Sec-WebSocket-Key": "k9kFAUWNAMmf5OEMfTlOEA==",
+ "Sec-WebSocket-Protocol": "newprotocol",
+ },
+ )
+
+ def test_ipv6(self):
+ websock2 = ws.WebSocket()
+ self.assertRaises(ValueError, websock2.connect, "2001:4860:4860::8888")
+
+ def test_bad_urls(self):
+ websock3 = ws.WebSocket()
+ self.assertRaises(ValueError, websock3.connect, "ws//example.com")
+ self.assertRaises(WebSocketAddressException, websock3.connect, "ws://example")
+ self.assertRaises(ValueError, websock3.connect, "example.com")
+
+
+if __name__ == "__main__":
+ unittest.main()