aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/websocket-client/websocket/tests/test_websocket.py
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-12-09 18:25:21 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-12-09 19:18:57 +0300
commit13374e0884578812cda7697d0c5680122db59a37 (patch)
tree30a022eb841035299deb2b8c393b2902f0c21735 /contrib/python/websocket-client/websocket/tests/test_websocket.py
parentc7ade6d3bf7cd492235a61b77153351e422a28f3 (diff)
downloadydb-13374e0884578812cda7697d0c5680122db59a37.tar.gz
Intermediate changes
commit_hash:034150f557268506d7bc0cbd8b5becf65f765593
Diffstat (limited to 'contrib/python/websocket-client/websocket/tests/test_websocket.py')
-rw-r--r--contrib/python/websocket-client/websocket/tests/test_websocket.py498
1 files changed, 498 insertions, 0 deletions
diff --git a/contrib/python/websocket-client/websocket/tests/test_websocket.py b/contrib/python/websocket-client/websocket/tests/test_websocket.py
new file mode 100644
index 0000000000..892312a2db
--- /dev/null
+++ b/contrib/python/websocket-client/websocket/tests/test_websocket.py
@@ -0,0 +1,498 @@
+# -*- coding: utf-8 -*-
+#
+import os
+import os.path
+import socket
+import unittest
+from base64 import decodebytes as base64decode
+
+import websocket as ws
+from websocket._exceptions import WebSocketBadStatusException, WebSocketAddressException
+from websocket._handshake import _create_sec_websocket_key
+from websocket._handshake import _validate as _validate_header
+from websocket._http import read_headers
+from websocket._utils import validate_utf8
+
+"""
+test_websocket.py
+websocket - WebSocket client library for Python
+
+Copyright 2024 engn33r
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+try:
+ import ssl
+except ImportError:
+ # dummy class of SSLError for ssl none-support environment.
+ class SSLError(Exception):
+ pass
+
+
+# Skip test to access the internet unless TEST_WITH_INTERNET == 1
+TEST_WITH_INTERNET = os.environ.get("TEST_WITH_INTERNET", "0") == "1"
+# Skip tests relying on local websockets server unless LOCAL_WS_SERVER_PORT != -1
+LOCAL_WS_SERVER_PORT = os.environ.get("LOCAL_WS_SERVER_PORT", "-1")
+TEST_WITH_LOCAL_SERVER = LOCAL_WS_SERVER_PORT != "-1"
+TRACEABLE = True
+
+
+def create_mask_key(_):
+ return "abcd"
+
+
+class SockMock:
+ def __init__(self):
+ self.data = []
+ self.sent = []
+
+ def add_packet(self, data):
+ self.data.append(data)
+
+ def gettimeout(self):
+ return None
+
+ def recv(self, bufsize):
+ if self.data:
+ e = self.data.pop(0)
+ if isinstance(e, Exception):
+ raise e
+ if len(e) > bufsize:
+ self.data.insert(0, e[bufsize:])
+ return e[:bufsize]
+
+ def send(self, data):
+ self.sent.append(data)
+ return len(data)
+
+ def close(self):
+ pass
+
+
+class HeaderSockMock(SockMock):
+ def __init__(self, fname):
+ SockMock.__init__(self)
+ import yatest.common as yc
+ path = os.path.join(os.path.dirname(yc.source_path(__file__)), fname)
+ with open(path, "rb") as f:
+ self.add_packet(f.read())
+
+
+class WebSocketTest(unittest.TestCase):
+ def setUp(self):
+ ws.enableTrace(TRACEABLE)
+
+ def tearDown(self):
+ pass
+
+ def test_default_timeout(self):
+ self.assertEqual(ws.getdefaulttimeout(), None)
+ ws.setdefaulttimeout(10)
+ self.assertEqual(ws.getdefaulttimeout(), 10)
+ ws.setdefaulttimeout(None)
+
+ def test_ws_key(self):
+ key = _create_sec_websocket_key()
+ self.assertTrue(key != 24)
+ self.assertTrue("¥n" not in key)
+
+ def test_nonce(self):
+ """WebSocket key should be a random 16-byte nonce."""
+ key = _create_sec_websocket_key()
+ nonce = base64decode(key.encode("utf-8"))
+ self.assertEqual(16, len(nonce))
+
+ def test_ws_utils(self):
+ key = "c6b8hTg4EeGb2gQMztV1/g=="
+ required_header = {
+ "upgrade": "websocket",
+ "connection": "upgrade",
+ "sec-websocket-accept": "Kxep+hNu9n51529fGidYu7a3wO0=",
+ }
+ self.assertEqual(_validate_header(required_header, key, None), (True, None))
+
+ header = required_header.copy()
+ header["upgrade"] = "http"
+ self.assertEqual(_validate_header(header, key, None), (False, None))
+ del header["upgrade"]
+ self.assertEqual(_validate_header(header, key, None), (False, None))
+
+ header = required_header.copy()
+ header["connection"] = "something"
+ self.assertEqual(_validate_header(header, key, None), (False, None))
+ del header["connection"]
+ self.assertEqual(_validate_header(header, key, None), (False, None))
+
+ header = required_header.copy()
+ header["sec-websocket-accept"] = "something"
+ self.assertEqual(_validate_header(header, key, None), (False, None))
+ del header["sec-websocket-accept"]
+ self.assertEqual(_validate_header(header, key, None), (False, None))
+
+ header = required_header.copy()
+ header["sec-websocket-protocol"] = "sub1"
+ self.assertEqual(
+ _validate_header(header, key, ["sub1", "sub2"]), (True, "sub1")
+ )
+ # This case will print out a logging error using the error() function, but that is expected
+ self.assertEqual(_validate_header(header, key, ["sub2", "sub3"]), (False, None))
+
+ header = required_header.copy()
+ header["sec-websocket-protocol"] = "sUb1"
+ self.assertEqual(
+ _validate_header(header, key, ["Sub1", "suB2"]), (True, "sub1")
+ )
+
+ header = required_header.copy()
+ # This case will print out a logging error using the error() function, but that is expected
+ self.assertEqual(_validate_header(header, key, ["Sub1", "suB2"]), (False, None))
+
+ def test_read_header(self):
+ status, header, _ = read_headers(HeaderSockMock("data/header01.txt"))
+ self.assertEqual(status, 101)
+ self.assertEqual(header["connection"], "Upgrade")
+
+ status, header, _ = read_headers(HeaderSockMock("data/header03.txt"))
+ self.assertEqual(status, 101)
+ self.assertEqual(header["connection"], "Upgrade, Keep-Alive")
+
+ HeaderSockMock("data/header02.txt")
+ self.assertRaises(
+ ws.WebSocketException, read_headers, HeaderSockMock("data/header02.txt")
+ )
+
+ def test_send(self):
+ # TODO: add longer frame data
+ sock = ws.WebSocket()
+ sock.set_mask_key(create_mask_key)
+ s = sock.sock = HeaderSockMock("data/header01.txt")
+ sock.send("Hello")
+ self.assertEqual(s.sent[0], b"\x81\x85abcd)\x07\x0f\x08\x0e")
+
+ sock.send("こんにちは")
+ self.assertEqual(
+ s.sent[1],
+ b"\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc",
+ )
+
+ # sock.send("x" * 5000)
+ # self.assertEqual(s.sent[1], b'\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc")
+
+ self.assertEqual(sock.send_binary(b"1111111111101"), 19)
+
+ def test_recv(self):
+ # TODO: add longer frame data
+ sock = ws.WebSocket()
+ s = sock.sock = SockMock()
+ something = (
+ b"\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc"
+ )
+ s.add_packet(something)
+ data = sock.recv()
+ self.assertEqual(data, "こんにちは")
+
+ s.add_packet(b"\x81\x85abcd)\x07\x0f\x08\x0e")
+ data = sock.recv()
+ self.assertEqual(data, "Hello")
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def test_iter(self):
+ count = 2
+ s = ws.create_connection("wss://api.bitfinex.com/ws/2")
+ s.send('{"event": "subscribe", "channel": "ticker"}')
+ for _ in s:
+ count -= 1
+ if count == 0:
+ break
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def test_next(self):
+ sock = ws.create_connection("wss://api.bitfinex.com/ws/2")
+ self.assertEqual(str, type(next(sock)))
+
+ def test_internal_recv_strict(self):
+ sock = ws.WebSocket()
+ s = sock.sock = SockMock()
+ s.add_packet(b"foo")
+ s.add_packet(socket.timeout())
+ s.add_packet(b"bar")
+ # s.add_packet(SSLError("The read operation timed out"))
+ s.add_packet(b"baz")
+ with self.assertRaises(ws.WebSocketTimeoutException):
+ sock.frame_buffer.recv_strict(9)
+ # with self.assertRaises(SSLError):
+ # data = sock._recv_strict(9)
+ data = sock.frame_buffer.recv_strict(9)
+ self.assertEqual(data, b"foobarbaz")
+ with self.assertRaises(ws.WebSocketConnectionClosedException):
+ sock.frame_buffer.recv_strict(1)
+
+ def test_recv_timeout(self):
+ sock = ws.WebSocket()
+ s = sock.sock = SockMock()
+ s.add_packet(b"\x81")
+ s.add_packet(socket.timeout())
+ s.add_packet(b"\x8dabcd\x29\x07\x0f\x08\x0e")
+ s.add_packet(socket.timeout())
+ s.add_packet(b"\x4e\x43\x33\x0e\x10\x0f\x00\x40")
+ with self.assertRaises(ws.WebSocketTimeoutException):
+ sock.recv()
+ with self.assertRaises(ws.WebSocketTimeoutException):
+ sock.recv()
+ data = sock.recv()
+ self.assertEqual(data, "Hello, World!")
+ with self.assertRaises(ws.WebSocketConnectionClosedException):
+ sock.recv()
+
+ def test_recv_with_simple_fragmentation(self):
+ sock = ws.WebSocket()
+ s = sock.sock = SockMock()
+ # OPCODE=TEXT, FIN=0, MSG="Brevity is "
+ s.add_packet(b"\x01\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C")
+ # OPCODE=CONT, FIN=1, MSG="the soul of wit"
+ s.add_packet(b"\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17")
+ data = sock.recv()
+ self.assertEqual(data, "Brevity is the soul of wit")
+ with self.assertRaises(ws.WebSocketConnectionClosedException):
+ sock.recv()
+
+ def test_recv_with_fire_event_of_fragmentation(self):
+ sock = ws.WebSocket(fire_cont_frame=True)
+ s = sock.sock = SockMock()
+ # OPCODE=TEXT, FIN=0, MSG="Brevity is "
+ s.add_packet(b"\x01\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C")
+ # OPCODE=CONT, FIN=0, MSG="Brevity is "
+ s.add_packet(b"\x00\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C")
+ # OPCODE=CONT, FIN=1, MSG="the soul of wit"
+ s.add_packet(b"\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17")
+
+ _, data = sock.recv_data()
+ self.assertEqual(data, b"Brevity is ")
+ _, data = sock.recv_data()
+ self.assertEqual(data, b"Brevity is ")
+ _, data = sock.recv_data()
+ self.assertEqual(data, b"the soul of wit")
+
+ # OPCODE=CONT, FIN=0, MSG="Brevity is "
+ s.add_packet(b"\x80\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C")
+
+ with self.assertRaises(ws.WebSocketException):
+ sock.recv_data()
+
+ with self.assertRaises(ws.WebSocketConnectionClosedException):
+ sock.recv()
+
+ def test_close(self):
+ sock = ws.WebSocket()
+ sock.connected = True
+ sock.close
+
+ sock = ws.WebSocket()
+ s = sock.sock = SockMock()
+ sock.connected = True
+ s.add_packet(b"\x88\x80\x17\x98p\x84")
+ sock.recv()
+ self.assertEqual(sock.connected, False)
+
+ def test_recv_cont_fragmentation(self):
+ sock = ws.WebSocket()
+ s = sock.sock = SockMock()
+ # OPCODE=CONT, FIN=1, MSG="the soul of wit"
+ s.add_packet(b"\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17")
+ self.assertRaises(ws.WebSocketException, sock.recv)
+
+ def test_recv_with_prolonged_fragmentation(self):
+ sock = ws.WebSocket()
+ s = sock.sock = SockMock()
+ # OPCODE=TEXT, FIN=0, MSG="Once more unto the breach, "
+ s.add_packet(
+ b"\x01\x9babcd.\x0c\x00\x01A\x0f\x0c\x16\x04B\x16\n\x15\rC\x10\t\x07C\x06\x13\x07\x02\x07\tNC"
+ )
+ # OPCODE=CONT, FIN=0, MSG="dear friends, "
+ s.add_packet(b"\x00\x8eabcd\x05\x07\x02\x16A\x04\x11\r\x04\x0c\x07\x17MB")
+ # OPCODE=CONT, FIN=1, MSG="once more"
+ s.add_packet(b"\x80\x89abcd\x0e\x0c\x00\x01A\x0f\x0c\x16\x04")
+ data = sock.recv()
+ self.assertEqual(data, "Once more unto the breach, dear friends, once more")
+ with self.assertRaises(ws.WebSocketConnectionClosedException):
+ sock.recv()
+
+ def test_recv_with_fragmentation_and_control_frame(self):
+ sock = ws.WebSocket()
+ sock.set_mask_key(create_mask_key)
+ s = sock.sock = SockMock()
+ # OPCODE=TEXT, FIN=0, MSG="Too much "
+ s.add_packet(b"\x01\x89abcd5\r\x0cD\x0c\x17\x00\x0cA")
+ # OPCODE=PING, FIN=1, MSG="Please PONG this"
+ s.add_packet(b"\x89\x90abcd1\x0e\x06\x05\x12\x07C4.,$D\x15\n\n\x17")
+ # OPCODE=CONT, FIN=1, MSG="of a good thing"
+ s.add_packet(b"\x80\x8fabcd\x0e\x04C\x05A\x05\x0c\x0b\x05B\x17\x0c\x08\x0c\x04")
+ data = sock.recv()
+ self.assertEqual(data, "Too much of a good thing")
+ with self.assertRaises(ws.WebSocketConnectionClosedException):
+ sock.recv()
+ self.assertEqual(
+ s.sent[0], b"\x8a\x90abcd1\x0e\x06\x05\x12\x07C4.,$D\x15\n\n\x17"
+ )
+
+ @unittest.skipUnless(
+ TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled"
+ )
+ def test_websocket(self):
+ s = ws.create_connection(f"ws://127.0.0.1:{LOCAL_WS_SERVER_PORT}")
+ self.assertNotEqual(s, None)
+ s.send("Hello, World")
+ result = s.next()
+ s.fileno()
+ self.assertEqual(result, "Hello, World")
+
+ s.send("こにゃにゃちは、世界")
+ result = s.recv()
+ self.assertEqual(result, "こにゃにゃちは、世界")
+ self.assertRaises(ValueError, s.send_close, -1, "")
+ s.close()
+
+ @unittest.skipUnless(
+ TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled"
+ )
+ def test_ping_pong(self):
+ s = ws.create_connection(f"ws://127.0.0.1:{LOCAL_WS_SERVER_PORT}")
+ self.assertNotEqual(s, None)
+ s.ping("Hello")
+ s.pong("Hi")
+ s.close()
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def test_support_redirect(self):
+ s = ws.WebSocket()
+ self.assertRaises(WebSocketBadStatusException, s.connect, "ws://google.com/")
+ # Need to find a URL that has a redirect code leading to a websocket
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def test_secure_websocket(self):
+ s = ws.create_connection("wss://api.bitfinex.com/ws/2")
+ self.assertNotEqual(s, None)
+ self.assertTrue(isinstance(s.sock, ssl.SSLSocket))
+ self.assertEqual(s.getstatus(), 101)
+ self.assertNotEqual(s.getheaders(), None)
+ s.settimeout(10)
+ self.assertEqual(s.gettimeout(), 10)
+ self.assertEqual(s.getsubprotocol(), None)
+ s.abort()
+
+ @unittest.skipUnless(
+ TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled"
+ )
+ def test_websocket_with_custom_header(self):
+ s = ws.create_connection(
+ f"ws://127.0.0.1:{LOCAL_WS_SERVER_PORT}",
+ headers={"User-Agent": "PythonWebsocketClient"},
+ )
+ self.assertNotEqual(s, None)
+ self.assertEqual(s.getsubprotocol(), None)
+ s.send("Hello, World")
+ result = s.recv()
+ self.assertEqual(result, "Hello, World")
+ self.assertRaises(ValueError, s.close, -1, "")
+ s.close()
+
+ @unittest.skipUnless(
+ TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled"
+ )
+ def test_after_close(self):
+ s = ws.create_connection(f"ws://127.0.0.1:{LOCAL_WS_SERVER_PORT}")
+ self.assertNotEqual(s, None)
+ s.close()
+ self.assertRaises(ws.WebSocketConnectionClosedException, s.send, "Hello")
+ self.assertRaises(ws.WebSocketConnectionClosedException, s.recv)
+
+
+class SockOptTest(unittest.TestCase):
+ @unittest.skipUnless(
+ TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled"
+ )
+ def test_sockopt(self):
+ sockopt = ((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),)
+ s = ws.create_connection(
+ f"ws://127.0.0.1:{LOCAL_WS_SERVER_PORT}", sockopt=sockopt
+ )
+ self.assertNotEqual(
+ s.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY), 0
+ )
+ s.close()
+
+
+class UtilsTest(unittest.TestCase):
+ def test_utf8_validator(self):
+ state = validate_utf8(b"\xf0\x90\x80\x80")
+ self.assertEqual(state, True)
+ state = validate_utf8(
+ b"\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5\xed\xa0\x80edited"
+ )
+ self.assertEqual(state, False)
+ state = validate_utf8(b"")
+ self.assertEqual(state, True)
+
+
+class HandshakeTest(unittest.TestCase):
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def test_http_ssl(self):
+ websock1 = ws.WebSocket(
+ sslopt={"cert_chain": ssl.get_default_verify_paths().capath},
+ enable_multithread=False,
+ )
+ self.assertRaises(ValueError, websock1.connect, "wss://api.bitfinex.com/ws/2")
+ websock2 = ws.WebSocket(sslopt={"certfile": "myNonexistentCertFile"})
+ self.assertRaises(
+ FileNotFoundError, websock2.connect, "wss://api.bitfinex.com/ws/2"
+ )
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def test_manual_headers(self):
+ websock3 = ws.WebSocket(
+ sslopt={
+ "ca_certs": ssl.get_default_verify_paths().cafile,
+ "ca_cert_path": ssl.get_default_verify_paths().capath,
+ }
+ )
+ self.assertRaises(
+ WebSocketBadStatusException,
+ websock3.connect,
+ "wss://api.bitfinex.com/ws/2",
+ cookie="chocolate",
+ origin="testing_websockets.com",
+ host="echo.websocket.events/websocket-client-test",
+ subprotocols=["testproto"],
+ connection="Upgrade",
+ header={
+ "CustomHeader1": "123",
+ "Cookie": "TestValue",
+ "Sec-WebSocket-Key": "k9kFAUWNAMmf5OEMfTlOEA==",
+ "Sec-WebSocket-Protocol": "newprotocol",
+ },
+ )
+
+ def test_ipv6(self):
+ websock2 = ws.WebSocket()
+ self.assertRaises(ValueError, websock2.connect, "2001:4860:4860::8888")
+
+ def test_bad_urls(self):
+ websock3 = ws.WebSocket()
+ self.assertRaises(ValueError, websock3.connect, "ws//example.com")
+ self.assertRaises(WebSocketAddressException, websock3.connect, "ws://example")
+ self.assertRaises(ValueError, websock3.connect, "example.com")
+
+
+if __name__ == "__main__":
+ unittest.main()