aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/websocket-client/py2/websocket/tests
diff options
context:
space:
mode:
authoralexv-smirnov <alex@ydb.tech>2023-12-01 12:02:50 +0300
committeralexv-smirnov <alex@ydb.tech>2023-12-01 13:28:10 +0300
commit0e578a4c44d4abd539d9838347b9ebafaca41dfb (patch)
treea0c1969c37f818c830ebeff9c077eacf30be6ef8 /contrib/python/websocket-client/py2/websocket/tests
parent84f2d3d4cc985e63217cff149bd2e6d67ae6fe22 (diff)
downloadydb-0e578a4c44d4abd539d9838347b9ebafaca41dfb.tar.gz
Change "ya.make"
Diffstat (limited to 'contrib/python/websocket-client/py2/websocket/tests')
-rw-r--r--contrib/python/websocket-client/py2/websocket/tests/__init__.py0
-rw-r--r--contrib/python/websocket-client/py2/websocket/tests/data/header01.txt6
-rw-r--r--contrib/python/websocket-client/py2/websocket/tests/data/header02.txt6
-rw-r--r--contrib/python/websocket-client/py2/websocket/tests/data/header03.txt6
-rw-r--r--contrib/python/websocket-client/py2/websocket/tests/test_abnf.py77
-rw-r--r--contrib/python/websocket-client/py2/websocket/tests/test_app.py137
-rw-r--r--contrib/python/websocket-client/py2/websocket/tests/test_cookiejar.py117
-rw-r--r--contrib/python/websocket-client/py2/websocket/tests/test_http.py110
-rw-r--r--contrib/python/websocket-client/py2/websocket/tests/test_url.py309
-rw-r--r--contrib/python/websocket-client/py2/websocket/tests/test_websocket.py434
10 files changed, 1202 insertions, 0 deletions
diff --git a/contrib/python/websocket-client/py2/websocket/tests/__init__.py b/contrib/python/websocket-client/py2/websocket/tests/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/contrib/python/websocket-client/py2/websocket/tests/__init__.py
diff --git a/contrib/python/websocket-client/py2/websocket/tests/data/header01.txt b/contrib/python/websocket-client/py2/websocket/tests/data/header01.txt
new file mode 100644
index 0000000000..d44d24c205
--- /dev/null
+++ b/contrib/python/websocket-client/py2/websocket/tests/data/header01.txt
@@ -0,0 +1,6 @@
+HTTP/1.1 101 WebSocket Protocol Handshake
+Connection: Upgrade
+Upgrade: WebSocket
+Sec-WebSocket-Accept: Kxep+hNu9n51529fGidYu7a3wO0=
+some_header: something
+
diff --git a/contrib/python/websocket-client/py2/websocket/tests/data/header02.txt b/contrib/python/websocket-client/py2/websocket/tests/data/header02.txt
new file mode 100644
index 0000000000..f481de928a
--- /dev/null
+++ b/contrib/python/websocket-client/py2/websocket/tests/data/header02.txt
@@ -0,0 +1,6 @@
+HTTP/1.1 101 WebSocket Protocol Handshake
+Connection: Upgrade
+Upgrade WebSocket
+Sec-WebSocket-Accept: Kxep+hNu9n51529fGidYu7a3wO0=
+some_header: something
+
diff --git a/contrib/python/websocket-client/py2/websocket/tests/data/header03.txt b/contrib/python/websocket-client/py2/websocket/tests/data/header03.txt
new file mode 100644
index 0000000000..012b7d18dd
--- /dev/null
+++ b/contrib/python/websocket-client/py2/websocket/tests/data/header03.txt
@@ -0,0 +1,6 @@
+HTTP/1.1 101 WebSocket Protocol Handshake
+Connection: Upgrade, Keep-Alive
+Upgrade: WebSocket
+Sec-WebSocket-Accept: Kxep+hNu9n51529fGidYu7a3wO0=
+some_header: something
+
diff --git a/contrib/python/websocket-client/py2/websocket/tests/test_abnf.py b/contrib/python/websocket-client/py2/websocket/tests/test_abnf.py
new file mode 100644
index 0000000000..acce020682
--- /dev/null
+++ b/contrib/python/websocket-client/py2/websocket/tests/test_abnf.py
@@ -0,0 +1,77 @@
+# -*- coding: utf-8 -*-
+#
+"""
+websocket - WebSocket client library for Python
+
+Copyright (C) 2010 Hiroki Ohtani(liris)
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+"""
+
+import os
+import websocket as ws
+from websocket._abnf import *
+import sys
+sys.path[0:0] = [""]
+
+if sys.version_info[0] == 2 and sys.version_info[1] < 7:
+ import unittest2 as unittest
+else:
+ import unittest
+
+
+class ABNFTest(unittest.TestCase):
+
+ def testInit(self):
+ a = ABNF(0,0,0,0, opcode=ABNF.OPCODE_PING)
+ self.assertEqual(a.fin, 0)
+ self.assertEqual(a.rsv1, 0)
+ self.assertEqual(a.rsv2, 0)
+ self.assertEqual(a.rsv3, 0)
+ self.assertEqual(a.opcode, 9)
+ self.assertEqual(a.data, '')
+ a_bad = ABNF(0,1,0,0, opcode=77)
+ self.assertEqual(a_bad.rsv1, 1)
+ self.assertEqual(a_bad.opcode, 77)
+
+ def testValidate(self):
+ a = ABNF(0,0,0,0, opcode=ABNF.OPCODE_PING)
+ self.assertRaises(ws.WebSocketProtocolException, a.validate)
+ a_bad = ABNF(0,1,0,0, opcode=77)
+ self.assertRaises(ws.WebSocketProtocolException, a_bad.validate)
+ a_close = ABNF(0,1,0,0, opcode=ABNF.OPCODE_CLOSE, data="abcdefgh1234567890abcdefgh1234567890abcdefgh1234567890abcdefgh1234567890")
+ self.assertRaises(ws.WebSocketProtocolException, a_close.validate)
+
+# This caused an error in the Python 2.7 Github Actions build
+# Uncomment test case when Python 2 support no longer wanted
+# def testMask(self):
+# ab = ABNF(0,0,0,0, opcode=ABNF.OPCODE_PING)
+# bytes_val = bytes("aaaa", 'utf-8')
+# self.assertEqual(ab._get_masked(bytes_val), bytes_val)
+
+ def testFrameBuffer(self):
+ fb = frame_buffer(0, True)
+ self.assertEqual(fb.recv, 0)
+ self.assertEqual(fb.skip_utf8_validation, True)
+ fb.clear
+ self.assertEqual(fb.header, None)
+ self.assertEqual(fb.length, None)
+ self.assertEqual(fb.mask, None)
+ self.assertEqual(fb.has_mask(), False)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/contrib/python/websocket-client/py2/websocket/tests/test_app.py b/contrib/python/websocket-client/py2/websocket/tests/test_app.py
new file mode 100644
index 0000000000..e5a739008e
--- /dev/null
+++ b/contrib/python/websocket-client/py2/websocket/tests/test_app.py
@@ -0,0 +1,137 @@
+# -*- coding: utf-8 -*-
+#
+"""
+websocket - WebSocket client library for Python
+
+Copyright (C) 2010 Hiroki Ohtani(liris)
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+"""
+
+import os
+import os.path
+import websocket as ws
+import sys
+sys.path[0:0] = [""]
+
+try:
+ import ssl
+except ImportError:
+ HAVE_SSL = False
+
+if sys.version_info[0] == 2 and sys.version_info[1] < 7:
+ import unittest2 as unittest
+else:
+ import unittest
+
+# Skip test to access the internet.
+TEST_WITH_INTERNET = os.environ.get('TEST_WITH_INTERNET', '0') == '1'
+TRACEABLE = True
+
+
+class WebSocketAppTest(unittest.TestCase):
+
+ class NotSetYet(object):
+ """ A marker class for signalling that a value hasn't been set yet.
+ """
+
+ def setUp(self):
+ ws.enableTrace(TRACEABLE)
+
+ WebSocketAppTest.keep_running_open = WebSocketAppTest.NotSetYet()
+ WebSocketAppTest.keep_running_close = WebSocketAppTest.NotSetYet()
+ WebSocketAppTest.get_mask_key_id = WebSocketAppTest.NotSetYet()
+
+ def tearDown(self):
+ WebSocketAppTest.keep_running_open = WebSocketAppTest.NotSetYet()
+ WebSocketAppTest.keep_running_close = WebSocketAppTest.NotSetYet()
+ WebSocketAppTest.get_mask_key_id = WebSocketAppTest.NotSetYet()
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def testKeepRunning(self):
+ """ A WebSocketApp should keep running as long as its self.keep_running
+ is not False (in the boolean context).
+ """
+
+ def on_open(self, *args, **kwargs):
+ """ Set the keep_running flag for later inspection and immediately
+ close the connection.
+ """
+ WebSocketAppTest.keep_running_open = self.keep_running
+
+ self.close()
+
+ def on_close(self, *args, **kwargs):
+ """ Set the keep_running flag for the test to use.
+ """
+ WebSocketAppTest.keep_running_close = self.keep_running
+
+ app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, on_close=on_close)
+ app.run_forever()
+
+ # if numpy is installed, this assertion fail
+ # self.assertFalse(isinstance(WebSocketAppTest.keep_running_open,
+ # WebSocketAppTest.NotSetYet))
+
+ # self.assertFalse(isinstance(WebSocketAppTest.keep_running_close,
+ # WebSocketAppTest.NotSetYet))
+
+ # self.assertEqual(True, WebSocketAppTest.keep_running_open)
+ # self.assertEqual(False, WebSocketAppTest.keep_running_close)
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def testSockMaskKey(self):
+ """ A WebSocketApp should forward the received mask_key function down
+ to the actual socket.
+ """
+
+ def my_mask_key_func():
+ pass
+
+ def on_open(self, *args, **kwargs):
+ """ Set the value so the test can use it later on and immediately
+ close the connection.
+ """
+ WebSocketAppTest.get_mask_key_id = id(self.get_mask_key)
+ self.close()
+
+ app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, get_mask_key=my_mask_key_func)
+ app.run_forever()
+
+ # if numpy is installed, this assertion fail
+ # Note: We can't use 'is' for comparing the functions directly, need to use 'id'.
+ # self.assertEqual(WebSocketAppTest.get_mask_key_id, id(my_mask_key_func))
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def testPingInterval(self):
+ """ A WebSocketApp should ping regularly
+ """
+
+ def on_ping(app, msg):
+ print("Got a ping!")
+ app.close()
+
+ def on_pong(app, msg):
+ print("Got a pong! No need to respond")
+ app.close()
+
+ app = ws.WebSocketApp('wss://api-pub.bitfinex.com/ws/1', on_ping=on_ping, on_pong=on_pong)
+ app.run_forever(ping_interval=2, ping_timeout=1) # , sslopt={"cert_reqs": ssl.CERT_NONE}
+ self.assertRaises(ws.WebSocketException, app.run_forever, ping_interval=2, ping_timeout=3, sslopt={"cert_reqs": ssl.CERT_NONE})
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/contrib/python/websocket-client/py2/websocket/tests/test_cookiejar.py b/contrib/python/websocket-client/py2/websocket/tests/test_cookiejar.py
new file mode 100644
index 0000000000..fc66e58b0e
--- /dev/null
+++ b/contrib/python/websocket-client/py2/websocket/tests/test_cookiejar.py
@@ -0,0 +1,117 @@
+"""
+
+"""
+
+"""
+websocket - WebSocket client library for Python
+
+Copyright (C) 2010 Hiroki Ohtani(liris)
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+"""
+import unittest
+
+from websocket._cookiejar import SimpleCookieJar
+
+
+class CookieJarTest(unittest.TestCase):
+ def testAdd(self):
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.add("")
+ self.assertFalse(cookie_jar.jar, "Cookie with no domain should not be added to the jar")
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.add("a=b")
+ self.assertFalse(cookie_jar.jar, "Cookie with no domain should not be added to the jar")
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.add("a=b; domain=.abc")
+ self.assertTrue(".abc" in cookie_jar.jar)
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.add("a=b; domain=abc")
+ self.assertTrue(".abc" in cookie_jar.jar)
+ self.assertTrue("abc" not in cookie_jar.jar)
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.add("a=b; c=d; domain=abc")
+ self.assertEqual(cookie_jar.get("abc"), "a=b; c=d")
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.add("a=b; c=d; domain=abc")
+ cookie_jar.add("e=f; domain=abc")
+ self.assertEqual(cookie_jar.get("abc"), "a=b; c=d; e=f")
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.add("a=b; c=d; domain=abc")
+ cookie_jar.add("e=f; domain=.abc")
+ self.assertEqual(cookie_jar.get("abc"), "a=b; c=d; e=f")
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.add("a=b; c=d; domain=abc")
+ cookie_jar.add("e=f; domain=xyz")
+ self.assertEqual(cookie_jar.get("abc"), "a=b; c=d")
+ self.assertEqual(cookie_jar.get("xyz"), "e=f")
+ self.assertEqual(cookie_jar.get("something"), "")
+
+ def testSet(self):
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.set("a=b")
+ self.assertFalse(cookie_jar.jar, "Cookie with no domain should not be added to the jar")
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.set("a=b; domain=.abc")
+ self.assertTrue(".abc" in cookie_jar.jar)
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.set("a=b; domain=abc")
+ self.assertTrue(".abc" in cookie_jar.jar)
+ self.assertTrue("abc" not in cookie_jar.jar)
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.set("a=b; c=d; domain=abc")
+ self.assertEqual(cookie_jar.get("abc"), "a=b; c=d")
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.set("a=b; c=d; domain=abc")
+ cookie_jar.set("e=f; domain=abc")
+ self.assertEqual(cookie_jar.get("abc"), "e=f")
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.set("a=b; c=d; domain=abc")
+ cookie_jar.set("e=f; domain=.abc")
+ self.assertEqual(cookie_jar.get("abc"), "e=f")
+
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.set("a=b; c=d; domain=abc")
+ cookie_jar.set("e=f; domain=xyz")
+ self.assertEqual(cookie_jar.get("abc"), "a=b; c=d")
+ self.assertEqual(cookie_jar.get("xyz"), "e=f")
+ self.assertEqual(cookie_jar.get("something"), "")
+
+ def testGet(self):
+ cookie_jar = SimpleCookieJar()
+ cookie_jar.set("a=b; c=d; domain=abc.com")
+ self.assertEqual(cookie_jar.get("abc.com"), "a=b; c=d")
+ self.assertEqual(cookie_jar.get("x.abc.com"), "a=b; c=d")
+ self.assertEqual(cookie_jar.get("abc.com.es"), "")
+ self.assertEqual(cookie_jar.get("xabc.com"), "")
+
+ cookie_jar.set("a=b; c=d; domain=.abc.com")
+ self.assertEqual(cookie_jar.get("abc.com"), "a=b; c=d")
+ self.assertEqual(cookie_jar.get("x.abc.com"), "a=b; c=d")
+ self.assertEqual(cookie_jar.get("abc.com.es"), "")
+ self.assertEqual(cookie_jar.get("xabc.com"), "")
diff --git a/contrib/python/websocket-client/py2/websocket/tests/test_http.py b/contrib/python/websocket-client/py2/websocket/tests/test_http.py
new file mode 100644
index 0000000000..f08bd0c91c
--- /dev/null
+++ b/contrib/python/websocket-client/py2/websocket/tests/test_http.py
@@ -0,0 +1,110 @@
+# -*- coding: utf-8 -*-
+#
+"""
+websocket - WebSocket client library for Python
+
+Copyright (C) 2010 Hiroki Ohtani(liris)
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+"""
+
+import os
+import os.path
+import websocket as ws
+from websocket._http import proxy_info, read_headers, _open_proxied_socket, _tunnel
+import sys
+sys.path[0:0] = [""]
+
+if sys.version_info[0] == 2 and sys.version_info[1] < 7:
+ import unittest2 as unittest
+else:
+ import unittest
+
+
+class SockMock(object):
+ def __init__(self):
+ self.data = []
+ self.sent = []
+
+ def add_packet(self, data):
+ self.data.append(data)
+
+ def gettimeout(self):
+ return None
+
+ def recv(self, bufsize):
+ if self.data:
+ e = self.data.pop(0)
+ if isinstance(e, Exception):
+ raise e
+ if len(e) > bufsize:
+ self.data.insert(0, e[bufsize:])
+ return e[:bufsize]
+
+ def send(self, data):
+ self.sent.append(data)
+ return len(data)
+
+ def close(self):
+ pass
+
+
+class HeaderSockMock(SockMock):
+
+ def __init__(self, fname):
+ SockMock.__init__(self)
+ import yatest.common
+ path = yatest.common.source_path(os.path.join('contrib/python/websocket-client/py2/websocket/tests', fname))
+ with open(path, "rb") as f:
+ self.add_packet(f.read())
+
+
+class OptsList():
+
+ def __init__(self):
+ self.timeout = 0
+ self.sockopt = []
+
+
+class HttpTest(unittest.TestCase):
+
+ def testReadHeader(self):
+ status, header, status_message = read_headers(HeaderSockMock("data/header01.txt"))
+ self.assertEqual(status, 101)
+ self.assertEqual(header["connection"], "Upgrade")
+ # header02.txt is intentionally malformed
+ self.assertRaises(ws.WebSocketException, read_headers, HeaderSockMock("data/header02.txt"))
+
+ def testTunnel(self):
+ self.assertRaises(ws.WebSocketProxyException, _tunnel, HeaderSockMock("data/header01.txt"), "example.com", 80, ("username", "password"))
+ self.assertRaises(ws.WebSocketProxyException, _tunnel, HeaderSockMock("data/header02.txt"), "example.com", 80, ("username", "password"))
+
+ def _testConnect(self):
+ # Not currently testing an actual proxy connection, so just check whether TypeError is raised
+ self.assertRaises(TypeError, _open_proxied_socket, "wss://example.com", OptsList(), proxy_info(http_proxy_host="example.com", http_proxy_port="8080", proxy_type="http"))
+ self.assertRaises(TypeError, _open_proxied_socket, "wss://example.com", OptsList(), proxy_info(http_proxy_host="example.com", http_proxy_port="8080", proxy_type="socks4"))
+ self.assertRaises(TypeError, _open_proxied_socket, "wss://example.com", OptsList(), proxy_info(http_proxy_host="example.com", http_proxy_port="8080", proxy_type="socks5h"))
+
+ def testProxyInfo(self):
+ self.assertEqual(proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http").type, "http")
+ self.assertRaises(ValueError, proxy_info, http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="badval")
+ self.assertEqual(proxy_info(http_proxy_host="example.com", http_proxy_port="8080", proxy_type="http").host, "example.com")
+ self.assertEqual(proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http").port, "8080")
+ self.assertEqual(proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http").auth, None)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/contrib/python/websocket-client/py2/websocket/tests/test_url.py b/contrib/python/websocket-client/py2/websocket/tests/test_url.py
new file mode 100644
index 0000000000..b1d8e06f23
--- /dev/null
+++ b/contrib/python/websocket-client/py2/websocket/tests/test_url.py
@@ -0,0 +1,309 @@
+# -*- coding: utf-8 -*-
+#
+"""
+websocket - WebSocket client library for Python
+
+Copyright (C) 2010 Hiroki Ohtani(liris)
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+"""
+
+import sys
+import os
+
+from websocket._url import get_proxy_info, parse_url, _is_address_in_network, _is_no_proxy_host
+
+if sys.version_info[0] == 2 and sys.version_info[1] < 7:
+ import unittest2 as unittest
+else:
+ import unittest
+sys.path[0:0] = [""]
+
+
+class UrlTest(unittest.TestCase):
+
+ def test_address_in_network(self):
+ self.assertTrue(_is_address_in_network('127.0.0.1', '127.0.0.0/8'))
+ self.assertTrue(_is_address_in_network('127.1.0.1', '127.0.0.0/8'))
+ self.assertFalse(_is_address_in_network('127.1.0.1', '127.0.0.0/24'))
+
+ def testParseUrl(self):
+ p = parse_url("ws://www.example.com/r")
+ self.assertEqual(p[0], "www.example.com")
+ self.assertEqual(p[1], 80)
+ self.assertEqual(p[2], "/r")
+ self.assertEqual(p[3], False)
+
+ p = parse_url("ws://www.example.com/r/")
+ self.assertEqual(p[0], "www.example.com")
+ self.assertEqual(p[1], 80)
+ self.assertEqual(p[2], "/r/")
+ self.assertEqual(p[3], False)
+
+ p = parse_url("ws://www.example.com/")
+ self.assertEqual(p[0], "www.example.com")
+ self.assertEqual(p[1], 80)
+ self.assertEqual(p[2], "/")
+ self.assertEqual(p[3], False)
+
+ p = parse_url("ws://www.example.com")
+ self.assertEqual(p[0], "www.example.com")
+ self.assertEqual(p[1], 80)
+ self.assertEqual(p[2], "/")
+ self.assertEqual(p[3], False)
+
+ p = parse_url("ws://www.example.com:8080/r")
+ self.assertEqual(p[0], "www.example.com")
+ self.assertEqual(p[1], 8080)
+ self.assertEqual(p[2], "/r")
+ self.assertEqual(p[3], False)
+
+ p = parse_url("ws://www.example.com:8080/")
+ self.assertEqual(p[0], "www.example.com")
+ self.assertEqual(p[1], 8080)
+ self.assertEqual(p[2], "/")
+ self.assertEqual(p[3], False)
+
+ p = parse_url("ws://www.example.com:8080")
+ self.assertEqual(p[0], "www.example.com")
+ self.assertEqual(p[1], 8080)
+ self.assertEqual(p[2], "/")
+ self.assertEqual(p[3], False)
+
+ p = parse_url("wss://www.example.com:8080/r")
+ self.assertEqual(p[0], "www.example.com")
+ self.assertEqual(p[1], 8080)
+ self.assertEqual(p[2], "/r")
+ self.assertEqual(p[3], True)
+
+ p = parse_url("wss://www.example.com:8080/r?key=value")
+ self.assertEqual(p[0], "www.example.com")
+ self.assertEqual(p[1], 8080)
+ self.assertEqual(p[2], "/r?key=value")
+ self.assertEqual(p[3], True)
+
+ self.assertRaises(ValueError, parse_url, "http://www.example.com/r")
+
+ if sys.version_info[0] == 2 and sys.version_info[1] < 7:
+ return
+
+ p = parse_url("ws://[2a03:4000:123:83::3]/r")
+ self.assertEqual(p[0], "2a03:4000:123:83::3")
+ self.assertEqual(p[1], 80)
+ self.assertEqual(p[2], "/r")
+ self.assertEqual(p[3], False)
+
+ p = parse_url("ws://[2a03:4000:123:83::3]:8080/r")
+ self.assertEqual(p[0], "2a03:4000:123:83::3")
+ self.assertEqual(p[1], 8080)
+ self.assertEqual(p[2], "/r")
+ self.assertEqual(p[3], False)
+
+ p = parse_url("wss://[2a03:4000:123:83::3]/r")
+ self.assertEqual(p[0], "2a03:4000:123:83::3")
+ self.assertEqual(p[1], 443)
+ self.assertEqual(p[2], "/r")
+ self.assertEqual(p[3], True)
+
+ p = parse_url("wss://[2a03:4000:123:83::3]:8080/r")
+ self.assertEqual(p[0], "2a03:4000:123:83::3")
+ self.assertEqual(p[1], 8080)
+ self.assertEqual(p[2], "/r")
+ self.assertEqual(p[3], True)
+
+
+class IsNoProxyHostTest(unittest.TestCase):
+ def setUp(self):
+ self.no_proxy = os.environ.get("no_proxy", None)
+ if "no_proxy" in os.environ:
+ del os.environ["no_proxy"]
+
+ def tearDown(self):
+ if self.no_proxy:
+ os.environ["no_proxy"] = self.no_proxy
+ elif "no_proxy" in os.environ:
+ del os.environ["no_proxy"]
+
+ def testMatchAll(self):
+ self.assertTrue(_is_no_proxy_host("any.websocket.org", ['*']))
+ self.assertTrue(_is_no_proxy_host("192.168.0.1", ['*']))
+ self.assertTrue(_is_no_proxy_host("any.websocket.org", ['other.websocket.org', '*']))
+ os.environ['no_proxy'] = '*'
+ self.assertTrue(_is_no_proxy_host("any.websocket.org", None))
+ self.assertTrue(_is_no_proxy_host("192.168.0.1", None))
+ os.environ['no_proxy'] = 'other.websocket.org, *'
+ self.assertTrue(_is_no_proxy_host("any.websocket.org", None))
+
+ def testIpAddress(self):
+ self.assertTrue(_is_no_proxy_host("127.0.0.1", ['127.0.0.1']))
+ self.assertFalse(_is_no_proxy_host("127.0.0.2", ['127.0.0.1']))
+ self.assertTrue(_is_no_proxy_host("127.0.0.1", ['other.websocket.org', '127.0.0.1']))
+ self.assertFalse(_is_no_proxy_host("127.0.0.2", ['other.websocket.org', '127.0.0.1']))
+ os.environ['no_proxy'] = '127.0.0.1'
+ self.assertTrue(_is_no_proxy_host("127.0.0.1", None))
+ self.assertFalse(_is_no_proxy_host("127.0.0.2", None))
+ os.environ['no_proxy'] = 'other.websocket.org, 127.0.0.1'
+ self.assertTrue(_is_no_proxy_host("127.0.0.1", None))
+ self.assertFalse(_is_no_proxy_host("127.0.0.2", None))
+
+ def testIpAddressInRange(self):
+ self.assertTrue(_is_no_proxy_host("127.0.0.1", ['127.0.0.0/8']))
+ self.assertTrue(_is_no_proxy_host("127.0.0.2", ['127.0.0.0/8']))
+ self.assertFalse(_is_no_proxy_host("127.1.0.1", ['127.0.0.0/24']))
+ os.environ['no_proxy'] = '127.0.0.0/8'
+ self.assertTrue(_is_no_proxy_host("127.0.0.1", None))
+ self.assertTrue(_is_no_proxy_host("127.0.0.2", None))
+ os.environ['no_proxy'] = '127.0.0.0/24'
+ self.assertFalse(_is_no_proxy_host("127.1.0.1", None))
+
+ def testHostnameMatch(self):
+ self.assertTrue(_is_no_proxy_host("my.websocket.org", ['my.websocket.org']))
+ self.assertTrue(_is_no_proxy_host("my.websocket.org", ['other.websocket.org', 'my.websocket.org']))
+ self.assertFalse(_is_no_proxy_host("my.websocket.org", ['other.websocket.org']))
+ os.environ['no_proxy'] = 'my.websocket.org'
+ self.assertTrue(_is_no_proxy_host("my.websocket.org", None))
+ self.assertFalse(_is_no_proxy_host("other.websocket.org", None))
+ os.environ['no_proxy'] = 'other.websocket.org, my.websocket.org'
+ self.assertTrue(_is_no_proxy_host("my.websocket.org", None))
+
+ def testHostnameMatchDomain(self):
+ self.assertTrue(_is_no_proxy_host("any.websocket.org", ['.websocket.org']))
+ self.assertTrue(_is_no_proxy_host("my.other.websocket.org", ['.websocket.org']))
+ self.assertTrue(_is_no_proxy_host("any.websocket.org", ['my.websocket.org', '.websocket.org']))
+ self.assertFalse(_is_no_proxy_host("any.websocket.com", ['.websocket.org']))
+ os.environ['no_proxy'] = '.websocket.org'
+ self.assertTrue(_is_no_proxy_host("any.websocket.org", None))
+ self.assertTrue(_is_no_proxy_host("my.other.websocket.org", None))
+ self.assertFalse(_is_no_proxy_host("any.websocket.com", None))
+ os.environ['no_proxy'] = 'my.websocket.org, .websocket.org'
+ self.assertTrue(_is_no_proxy_host("any.websocket.org", None))
+
+
+class ProxyInfoTest(unittest.TestCase):
+ def setUp(self):
+ self.http_proxy = os.environ.get("http_proxy", None)
+ self.https_proxy = os.environ.get("https_proxy", None)
+ self.no_proxy = os.environ.get("no_proxy", None)
+ if "http_proxy" in os.environ:
+ del os.environ["http_proxy"]
+ if "https_proxy" in os.environ:
+ del os.environ["https_proxy"]
+ if "no_proxy" in os.environ:
+ del os.environ["no_proxy"]
+
+ def tearDown(self):
+ if self.http_proxy:
+ os.environ["http_proxy"] = self.http_proxy
+ elif "http_proxy" in os.environ:
+ del os.environ["http_proxy"]
+
+ if self.https_proxy:
+ os.environ["https_proxy"] = self.https_proxy
+ elif "https_proxy" in os.environ:
+ del os.environ["https_proxy"]
+
+ if self.no_proxy:
+ os.environ["no_proxy"] = self.no_proxy
+ elif "no_proxy" in os.environ:
+ del os.environ["no_proxy"]
+
+ def testProxyFromArgs(self):
+ self.assertEqual(get_proxy_info("echo.websocket.org", False, proxy_host="localhost"), ("localhost", 0, None))
+ self.assertEqual(get_proxy_info("echo.websocket.org", False, proxy_host="localhost", proxy_port=3128),
+ ("localhost", 3128, None))
+ self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost"), ("localhost", 0, None))
+ self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128),
+ ("localhost", 3128, None))
+
+ self.assertEqual(get_proxy_info("echo.websocket.org", False, proxy_host="localhost", proxy_auth=("a", "b")),
+ ("localhost", 0, ("a", "b")))
+ self.assertEqual(
+ get_proxy_info("echo.websocket.org", False, proxy_host="localhost", proxy_port=3128, proxy_auth=("a", "b")),
+ ("localhost", 3128, ("a", "b")))
+ self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_auth=("a", "b")),
+ ("localhost", 0, ("a", "b")))
+ self.assertEqual(
+ get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128, proxy_auth=("a", "b")),
+ ("localhost", 3128, ("a", "b")))
+
+ self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128,
+ no_proxy=["example.com"], proxy_auth=("a", "b")),
+ ("localhost", 3128, ("a", "b")))
+ self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128,
+ no_proxy=["echo.websocket.org"], proxy_auth=("a", "b")),
+ (None, 0, None))
+
+ def testProxyFromEnv(self):
+ os.environ["http_proxy"] = "http://localhost/"
+ self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, None))
+ os.environ["http_proxy"] = "http://localhost:3128/"
+ self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, None))
+
+ os.environ["http_proxy"] = "http://localhost/"
+ os.environ["https_proxy"] = "http://localhost2/"
+ self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, None))
+ os.environ["http_proxy"] = "http://localhost:3128/"
+ os.environ["https_proxy"] = "http://localhost2:3128/"
+ self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, None))
+
+ os.environ["http_proxy"] = "http://localhost/"
+ os.environ["https_proxy"] = "http://localhost2/"
+ self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", None, None))
+ os.environ["http_proxy"] = "http://localhost:3128/"
+ os.environ["https_proxy"] = "http://localhost2:3128/"
+ self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", 3128, None))
+
+ os.environ["http_proxy"] = "http://a:b@localhost/"
+ self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, ("a", "b")))
+ os.environ["http_proxy"] = "http://a:b@localhost:3128/"
+ self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, ("a", "b")))
+
+ os.environ["http_proxy"] = "http://a:b@localhost/"
+ os.environ["https_proxy"] = "http://a:b@localhost2/"
+ self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, ("a", "b")))
+ os.environ["http_proxy"] = "http://a:b@localhost:3128/"
+ os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
+ self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, ("a", "b")))
+
+ os.environ["http_proxy"] = "http://a:b@localhost/"
+ os.environ["https_proxy"] = "http://a:b@localhost2/"
+ self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", None, ("a", "b")))
+ os.environ["http_proxy"] = "http://a:b@localhost:3128/"
+ os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
+ self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", 3128, ("a", "b")))
+
+ os.environ["http_proxy"] = "http://a:b@localhost/"
+ os.environ["https_proxy"] = "http://a:b@localhost2/"
+ os.environ["no_proxy"] = "example1.com,example2.com"
+ self.assertEqual(get_proxy_info("example.1.com", True), ("localhost2", None, ("a", "b")))
+ os.environ["http_proxy"] = "http://a:b@localhost:3128/"
+ os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
+ os.environ["no_proxy"] = "example1.com,example2.com, echo.websocket.org"
+ self.assertEqual(get_proxy_info("echo.websocket.org", True), (None, 0, None))
+ os.environ["http_proxy"] = "http://a:b@localhost:3128/"
+ os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
+ os.environ["no_proxy"] = "example1.com,example2.com, .websocket.org"
+ self.assertEqual(get_proxy_info("echo.websocket.org", True), (None, 0, None))
+
+ os.environ["http_proxy"] = "http://a:b@localhost:3128/"
+ os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
+ os.environ["no_proxy"] = "127.0.0.0/8, 192.168.0.0/16"
+ self.assertEqual(get_proxy_info("127.0.0.1", False), (None, 0, None))
+ self.assertEqual(get_proxy_info("192.168.1.1", False), (None, 0, None))
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/contrib/python/websocket-client/py2/websocket/tests/test_websocket.py b/contrib/python/websocket-client/py2/websocket/tests/test_websocket.py
new file mode 100644
index 0000000000..b1b66b8a71
--- /dev/null
+++ b/contrib/python/websocket-client/py2/websocket/tests/test_websocket.py
@@ -0,0 +1,434 @@
+# -*- coding: utf-8 -*-
+#
+"""
+
+"""
+
+"""
+websocket - WebSocket client library for Python
+
+Copyright (C) 2010 Hiroki Ohtani(liris)
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+"""
+
+import sys
+sys.path[0:0] = [""]
+
+import os
+import os.path
+import socket
+
+import six
+
+# websocket-client
+import websocket as ws
+from websocket._handshake import _create_sec_websocket_key, \
+ _validate as _validate_header
+from websocket._http import read_headers
+from websocket._utils import validate_utf8
+
+if six.PY3:
+ from base64 import decodebytes as base64decode
+else:
+ from base64 import decodestring as base64decode
+
+if sys.version_info[0] == 2 and sys.version_info[1] < 7:
+ import unittest2 as unittest
+else:
+ import unittest
+
+try:
+ from ssl import SSLError
+except ImportError:
+ # dummy class of SSLError for ssl none-support environment.
+ class SSLError(Exception):
+ pass
+
+# Skip test to access the internet.
+TEST_WITH_INTERNET = os.environ.get('TEST_WITH_INTERNET', '0') == '1'
+TRACEABLE = True
+
+
+def create_mask_key(_):
+ return "abcd"
+
+
+class SockMock(object):
+ def __init__(self):
+ self.data = []
+ self.sent = []
+
+ def add_packet(self, data):
+ self.data.append(data)
+
+ def gettimeout(self):
+ return None
+
+ def recv(self, bufsize):
+ if self.data:
+ e = self.data.pop(0)
+ if isinstance(e, Exception):
+ raise e
+ if len(e) > bufsize:
+ self.data.insert(0, e[bufsize:])
+ return e[:bufsize]
+
+ def send(self, data):
+ self.sent.append(data)
+ return len(data)
+
+ def close(self):
+ pass
+
+
+class HeaderSockMock(SockMock):
+
+ def __init__(self, fname):
+ SockMock.__init__(self)
+ import yatest.common
+ path = yatest.common.source_path(os.path.join('contrib/python/websocket-client/py2/websocket/tests', fname))
+ with open(path, "rb") as f:
+ self.add_packet(f.read())
+
+
+class WebSocketTest(unittest.TestCase):
+ def setUp(self):
+ ws.enableTrace(TRACEABLE)
+
+ def tearDown(self):
+ pass
+
+ def testDefaultTimeout(self):
+ self.assertEqual(ws.getdefaulttimeout(), None)
+ ws.setdefaulttimeout(10)
+ self.assertEqual(ws.getdefaulttimeout(), 10)
+ ws.setdefaulttimeout(None)
+
+ def testWSKey(self):
+ key = _create_sec_websocket_key()
+ self.assertTrue(key != 24)
+ self.assertTrue(six.u("¥n") not in key)
+
+ def testNonce(self):
+ """ WebSocket key should be a random 16-byte nonce.
+ """
+ key = _create_sec_websocket_key()
+ nonce = base64decode(key.encode("utf-8"))
+ self.assertEqual(16, len(nonce))
+
+ def testWsUtils(self):
+ key = "c6b8hTg4EeGb2gQMztV1/g=="
+ required_header = {
+ "upgrade": "websocket",
+ "connection": "upgrade",
+ "sec-websocket-accept": "Kxep+hNu9n51529fGidYu7a3wO0="}
+ self.assertEqual(_validate_header(required_header, key, None), (True, None))
+
+ header = required_header.copy()
+ header["upgrade"] = "http"
+ self.assertEqual(_validate_header(header, key, None), (False, None))
+ del header["upgrade"]
+ self.assertEqual(_validate_header(header, key, None), (False, None))
+
+ header = required_header.copy()
+ header["connection"] = "something"
+ self.assertEqual(_validate_header(header, key, None), (False, None))
+ del header["connection"]
+ self.assertEqual(_validate_header(header, key, None), (False, None))
+
+ header = required_header.copy()
+ header["sec-websocket-accept"] = "something"
+ self.assertEqual(_validate_header(header, key, None), (False, None))
+ del header["sec-websocket-accept"]
+ self.assertEqual(_validate_header(header, key, None), (False, None))
+
+ header = required_header.copy()
+ header["sec-websocket-protocol"] = "sub1"
+ self.assertEqual(_validate_header(header, key, ["sub1", "sub2"]), (True, "sub1"))
+ self.assertEqual(_validate_header(header, key, ["sub2", "sub3"]), (False, None))
+
+ header = required_header.copy()
+ header["sec-websocket-protocol"] = "sUb1"
+ self.assertEqual(_validate_header(header, key, ["Sub1", "suB2"]), (True, "sub1"))
+
+ header = required_header.copy()
+ self.assertEqual(_validate_header(header, key, ["Sub1", "suB2"]), (False, None))
+
+ def testReadHeader(self):
+ status, header, status_message = read_headers(HeaderSockMock("data/header01.txt"))
+ self.assertEqual(status, 101)
+ self.assertEqual(header["connection"], "Upgrade")
+
+ status, header, status_message = read_headers(HeaderSockMock("data/header03.txt"))
+ self.assertEqual(status, 101)
+ self.assertEqual(header["connection"], "Upgrade, Keep-Alive")
+
+ HeaderSockMock("data/header02.txt")
+ self.assertRaises(ws.WebSocketException, read_headers, HeaderSockMock("data/header02.txt"))
+
+ def testSend(self):
+ # TODO: add longer frame data
+ sock = ws.WebSocket()
+ sock.set_mask_key(create_mask_key)
+ s = sock.sock = HeaderSockMock("data/header01.txt")
+ sock.send("Hello")
+ self.assertEqual(s.sent[0], six.b("\x81\x85abcd)\x07\x0f\x08\x0e"))
+
+ sock.send("こんにちは")
+ self.assertEqual(s.sent[1], six.b("\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc"))
+
+ sock.send(u"こんにちは")
+ self.assertEqual(s.sent[1], six.b("\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc"))
+
+# sock.send("x" * 5000)
+# self.assertEqual(s.sent[1], six.b("\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc"))
+
+ self.assertEqual(sock.send_binary(b'1111111111101'), 19)
+
+ def testRecv(self):
+ # TODO: add longer frame data
+ sock = ws.WebSocket()
+ s = sock.sock = SockMock()
+ something = six.b("\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc")
+ s.add_packet(something)
+ data = sock.recv()
+ self.assertEqual(data, "こんにちは")
+
+ s.add_packet(six.b("\x81\x85abcd)\x07\x0f\x08\x0e"))
+ data = sock.recv()
+ self.assertEqual(data, "Hello")
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def testIter(self):
+ count = 2
+ for _ in ws.create_connection('wss://stream.meetup.com/2/rsvps'):
+ count -= 1
+ if count == 0:
+ break
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def testNext(self):
+ sock = ws.create_connection('wss://stream.meetup.com/2/rsvps')
+ self.assertEqual(str, type(next(sock)))
+
+ def testInternalRecvStrict(self):
+ sock = ws.WebSocket()
+ s = sock.sock = SockMock()
+ s.add_packet(six.b("foo"))
+ s.add_packet(socket.timeout())
+ s.add_packet(six.b("bar"))
+ # s.add_packet(SSLError("The read operation timed out"))
+ s.add_packet(six.b("baz"))
+ with self.assertRaises(ws.WebSocketTimeoutException):
+ sock.frame_buffer.recv_strict(9)
+ # if six.PY2:
+ # with self.assertRaises(ws.WebSocketTimeoutException):
+ # data = sock._recv_strict(9)
+ # else:
+ # with self.assertRaises(SSLError):
+ # data = sock._recv_strict(9)
+ data = sock.frame_buffer.recv_strict(9)
+ self.assertEqual(data, six.b("foobarbaz"))
+ with self.assertRaises(ws.WebSocketConnectionClosedException):
+ sock.frame_buffer.recv_strict(1)
+
+ def testRecvTimeout(self):
+ sock = ws.WebSocket()
+ s = sock.sock = SockMock()
+ s.add_packet(six.b("\x81"))
+ s.add_packet(socket.timeout())
+ s.add_packet(six.b("\x8dabcd\x29\x07\x0f\x08\x0e"))
+ s.add_packet(socket.timeout())
+ s.add_packet(six.b("\x4e\x43\x33\x0e\x10\x0f\x00\x40"))
+ with self.assertRaises(ws.WebSocketTimeoutException):
+ sock.recv()
+ with self.assertRaises(ws.WebSocketTimeoutException):
+ sock.recv()
+ data = sock.recv()
+ self.assertEqual(data, "Hello, World!")
+ with self.assertRaises(ws.WebSocketConnectionClosedException):
+ sock.recv()
+
+ def testRecvWithSimpleFragmentation(self):
+ sock = ws.WebSocket()
+ s = sock.sock = SockMock()
+ # OPCODE=TEXT, FIN=0, MSG="Brevity is "
+ s.add_packet(six.b("\x01\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C"))
+ # OPCODE=CONT, FIN=1, MSG="the soul of wit"
+ s.add_packet(six.b("\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17"))
+ data = sock.recv()
+ self.assertEqual(data, "Brevity is the soul of wit")
+ with self.assertRaises(ws.WebSocketConnectionClosedException):
+ sock.recv()
+
+ def testRecvWithFireEventOfFragmentation(self):
+ sock = ws.WebSocket(fire_cont_frame=True)
+ s = sock.sock = SockMock()
+ # OPCODE=TEXT, FIN=0, MSG="Brevity is "
+ s.add_packet(six.b("\x01\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C"))
+ # OPCODE=CONT, FIN=0, MSG="Brevity is "
+ s.add_packet(six.b("\x00\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C"))
+ # OPCODE=CONT, FIN=1, MSG="the soul of wit"
+ s.add_packet(six.b("\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17"))
+
+ _, data = sock.recv_data()
+ self.assertEqual(data, six.b("Brevity is "))
+ _, data = sock.recv_data()
+ self.assertEqual(data, six.b("Brevity is "))
+ _, data = sock.recv_data()
+ self.assertEqual(data, six.b("the soul of wit"))
+
+ # OPCODE=CONT, FIN=0, MSG="Brevity is "
+ s.add_packet(six.b("\x80\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C"))
+
+ with self.assertRaises(ws.WebSocketException):
+ sock.recv_data()
+
+ with self.assertRaises(ws.WebSocketConnectionClosedException):
+ sock.recv()
+
+ def testClose(self):
+ sock = ws.WebSocket()
+ sock.sock = SockMock()
+ sock.connected = True
+ sock.close()
+ self.assertEqual(sock.connected, False)
+
+ sock = ws.WebSocket()
+ s = sock.sock = SockMock()
+ sock.connected = True
+ s.add_packet(six.b('\x88\x80\x17\x98p\x84'))
+ sock.recv()
+ self.assertEqual(sock.connected, False)
+
+ def testRecvContFragmentation(self):
+ sock = ws.WebSocket()
+ s = sock.sock = SockMock()
+ # OPCODE=CONT, FIN=1, MSG="the soul of wit"
+ s.add_packet(six.b("\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17"))
+ self.assertRaises(ws.WebSocketException, sock.recv)
+
+ def testRecvWithProlongedFragmentation(self):
+ sock = ws.WebSocket()
+ s = sock.sock = SockMock()
+ # OPCODE=TEXT, FIN=0, MSG="Once more unto the breach, "
+ s.add_packet(six.b("\x01\x9babcd.\x0c\x00\x01A\x0f\x0c\x16\x04B\x16\n\x15"
+ "\rC\x10\t\x07C\x06\x13\x07\x02\x07\tNC"))
+ # OPCODE=CONT, FIN=0, MSG="dear friends, "
+ s.add_packet(six.b("\x00\x8eabcd\x05\x07\x02\x16A\x04\x11\r\x04\x0c\x07"
+ "\x17MB"))
+ # OPCODE=CONT, FIN=1, MSG="once more"
+ s.add_packet(six.b("\x80\x89abcd\x0e\x0c\x00\x01A\x0f\x0c\x16\x04"))
+ data = sock.recv()
+ self.assertEqual(
+ data,
+ "Once more unto the breach, dear friends, once more")
+ with self.assertRaises(ws.WebSocketConnectionClosedException):
+ sock.recv()
+
+ def testRecvWithFragmentationAndControlFrame(self):
+ sock = ws.WebSocket()
+ sock.set_mask_key(create_mask_key)
+ s = sock.sock = SockMock()
+ # OPCODE=TEXT, FIN=0, MSG="Too much "
+ s.add_packet(six.b("\x01\x89abcd5\r\x0cD\x0c\x17\x00\x0cA"))
+ # OPCODE=PING, FIN=1, MSG="Please PONG this"
+ s.add_packet(six.b("\x89\x90abcd1\x0e\x06\x05\x12\x07C4.,$D\x15\n\n\x17"))
+ # OPCODE=CONT, FIN=1, MSG="of a good thing"
+ s.add_packet(six.b("\x80\x8fabcd\x0e\x04C\x05A\x05\x0c\x0b\x05B\x17\x0c"
+ "\x08\x0c\x04"))
+ data = sock.recv()
+ self.assertEqual(data, "Too much of a good thing")
+ with self.assertRaises(ws.WebSocketConnectionClosedException):
+ sock.recv()
+ self.assertEqual(
+ s.sent[0],
+ six.b("\x8a\x90abcd1\x0e\x06\x05\x12\x07C4.,$D\x15\n\n\x17"))
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def testWebSocket(self):
+ s = ws.create_connection("ws://echo.websocket.org/")
+ self.assertNotEqual(s, None)
+ s.send("Hello, World")
+ result = s.recv()
+ self.assertEqual(result, "Hello, World")
+
+ s.send(u"こにゃにゃちは、世界")
+ result = s.recv()
+ self.assertEqual(result, "こにゃにゃちは、世界")
+ self.assertRaises(ValueError, s.send_close, -1, "")
+ s.close()
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def testPingPong(self):
+ s = ws.create_connection("ws://echo.websocket.org/")
+ self.assertNotEqual(s, None)
+ s.ping("Hello")
+ s.pong("Hi")
+ s.close()
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def testSecureWebSocket(self):
+ import ssl
+ s = ws.create_connection("wss://api.bitfinex.com/ws/2")
+ self.assertNotEqual(s, None)
+ self.assertTrue(isinstance(s.sock, ssl.SSLSocket))
+ self.assertEqual(s.getstatus(), 101)
+ self.assertNotEqual(s.getheaders(), None)
+ s.close()
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def testWebSocketWithCustomHeader(self):
+ s = ws.create_connection("ws://echo.websocket.org/",
+ headers={"User-Agent": "PythonWebsocketClient"})
+ self.assertNotEqual(s, None)
+ s.send("Hello, World")
+ result = s.recv()
+ self.assertEqual(result, "Hello, World")
+ self.assertRaises(ValueError, s.close, -1, "")
+ s.close()
+
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def testAfterClose(self):
+ s = ws.create_connection("ws://echo.websocket.org/")
+ self.assertNotEqual(s, None)
+ s.close()
+ self.assertRaises(ws.WebSocketConnectionClosedException, s.send, "Hello")
+ self.assertRaises(ws.WebSocketConnectionClosedException, s.recv)
+
+
+class SockOptTest(unittest.TestCase):
+ @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
+ def testSockOpt(self):
+ sockopt = ((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),)
+ s = ws.create_connection("ws://echo.websocket.org", sockopt=sockopt)
+ self.assertNotEqual(s.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY), 0)
+ s.close()
+
+
+class UtilsTest(unittest.TestCase):
+ def testUtf8Validator(self):
+ state = validate_utf8(six.b('\xf0\x90\x80\x80'))
+ self.assertEqual(state, True)
+ state = validate_utf8(six.b('\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5\xed\xa0\x80edited'))
+ self.assertEqual(state, False)
+ state = validate_utf8(six.b(''))
+ self.assertEqual(state, True)
+
+
+if __name__ == "__main__":
+ unittest.main()