summaryrefslogtreecommitdiffstats
path: root/contrib/python/websocket-client/websocket/tests/test_socket.py
diff options
context:
space:
mode:
authorrobot-piglet <[email protected]>2025-10-22 11:36:42 +0300
committerrobot-piglet <[email protected]>2025-10-22 12:14:27 +0300
commit6a490d481992dac77fa8785bb4d6e6cafea36fa3 (patch)
tree28dac4c8ea239eedadc726b73a16514223e56389 /contrib/python/websocket-client/websocket/tests/test_socket.py
parentd924ab94175835dc15b389ee8969ff0ddfd35930 (diff)
Intermediate changes
commit_hash:6bfda3fd45ff19cb21e3edc6e8b7dad337978a7e
Diffstat (limited to 'contrib/python/websocket-client/websocket/tests/test_socket.py')
-rw-r--r--contrib/python/websocket-client/websocket/tests/test_socket.py357
1 files changed, 357 insertions, 0 deletions
diff --git a/contrib/python/websocket-client/websocket/tests/test_socket.py b/contrib/python/websocket-client/websocket/tests/test_socket.py
new file mode 100644
index 00000000000..5b8b65bd6b5
--- /dev/null
+++ b/contrib/python/websocket-client/websocket/tests/test_socket.py
@@ -0,0 +1,357 @@
+# -*- coding: utf-8 -*-
+import errno
+import socket
+import unittest
+from unittest.mock import Mock, patch, MagicMock
+import time
+
+from websocket._socket import recv, recv_line, send, DEFAULT_SOCKET_OPTION
+from websocket._ssl_compat import (
+ SSLError,
+ SSLEOFError,
+ SSLWantWriteError,
+ SSLWantReadError,
+)
+from websocket._exceptions import (
+ WebSocketTimeoutException,
+ WebSocketConnectionClosedException,
+)
+
+"""
+test_socket.py
+websocket - WebSocket client library for Python
+
+Copyright 2025 engn33r
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+class SocketTest(unittest.TestCase):
+ def test_default_socket_option(self):
+ """Test DEFAULT_SOCKET_OPTION contains expected options"""
+ self.assertIsInstance(DEFAULT_SOCKET_OPTION, list)
+ self.assertGreater(len(DEFAULT_SOCKET_OPTION), 0)
+
+ # Should contain TCP_NODELAY option
+ tcp_nodelay_found = any(
+ opt[1] == socket.TCP_NODELAY for opt in DEFAULT_SOCKET_OPTION
+ )
+ self.assertTrue(tcp_nodelay_found)
+
+ def test_recv_normal(self):
+ """Test normal recv operation"""
+ mock_sock = Mock()
+ mock_sock.recv.return_value = b"test data"
+
+ result = recv(mock_sock, 9)
+
+ self.assertEqual(result, b"test data")
+ mock_sock.recv.assert_called_once_with(9)
+
+ def test_recv_timeout_error(self):
+ """Test recv with TimeoutError"""
+ mock_sock = Mock()
+ mock_sock.recv.side_effect = TimeoutError("Connection timed out")
+
+ with self.assertRaises(WebSocketTimeoutException) as cm:
+ recv(mock_sock, 9)
+
+ self.assertEqual(str(cm.exception), "Connection timed out")
+
+ def test_recv_socket_timeout(self):
+ """Test recv with socket.timeout"""
+ mock_sock = Mock()
+ timeout_exc = socket.timeout("Socket timed out")
+ timeout_exc.args = ("Socket timed out",)
+ mock_sock.recv.side_effect = timeout_exc
+ mock_sock.gettimeout.return_value = 30.0
+
+ with self.assertRaises(WebSocketTimeoutException) as cm:
+ recv(mock_sock, 9)
+
+ # In Python 3.10+, socket.timeout is a subclass of TimeoutError
+ # so it's caught by the TimeoutError handler with hardcoded message
+ # In Python 3.9, socket.timeout is caught by socket.timeout handler
+ # which preserves the original message
+ import sys
+
+ if sys.version_info >= (3, 10):
+ self.assertEqual(str(cm.exception), "Connection timed out")
+ else:
+ self.assertEqual(str(cm.exception), "Socket timed out")
+
+ def test_recv_ssl_timeout(self):
+ """Test recv with SSL timeout error"""
+ mock_sock = Mock()
+ ssl_exc = SSLError("The operation timed out")
+ ssl_exc.args = ("The operation timed out",)
+ mock_sock.recv.side_effect = ssl_exc
+
+ with self.assertRaises(WebSocketTimeoutException) as cm:
+ recv(mock_sock, 9)
+
+ self.assertEqual(str(cm.exception), "The operation timed out")
+
+ def test_recv_ssl_non_timeout_error(self):
+ """Test recv with SSL non-timeout error"""
+ mock_sock = Mock()
+ ssl_exc = SSLError("SSL certificate error")
+ ssl_exc.args = ("SSL certificate error",)
+ mock_sock.recv.side_effect = ssl_exc
+
+ # Should re-raise the original SSL error
+ with self.assertRaises(SSLError):
+ recv(mock_sock, 9)
+
+ def test_recv_empty_response(self):
+ """Test recv with empty response (connection closed)"""
+ mock_sock = Mock()
+ mock_sock.recv.return_value = b""
+
+ with self.assertRaises(WebSocketConnectionClosedException) as cm:
+ recv(mock_sock, 9)
+
+ self.assertEqual(str(cm.exception), "Connection to remote host was lost.")
+
+ def test_recv_ssl_want_read_error(self):
+ """Test recv with SSLWantReadError (should retry)"""
+ mock_sock = Mock()
+
+ # First call raises SSLWantReadError, second call succeeds
+ mock_sock.recv.side_effect = [SSLWantReadError(), b"data after retry"]
+
+ with patch("selectors.DefaultSelector") as mock_selector_class:
+ mock_selector = Mock()
+ mock_selector_class.return_value = mock_selector
+ mock_selector.select.return_value = [True] # Ready to read
+
+ result = recv(mock_sock, 100)
+
+ self.assertEqual(result, b"data after retry")
+ mock_selector.register.assert_called()
+ mock_selector.close.assert_called()
+
+ def test_recv_ssl_want_read_timeout(self):
+ """Test recv with SSLWantReadError that times out"""
+ mock_sock = Mock()
+ mock_sock.recv.side_effect = SSLWantReadError()
+ mock_sock.gettimeout.return_value = 1.0
+
+ with patch("selectors.DefaultSelector") as mock_selector_class:
+ mock_selector = Mock()
+ mock_selector_class.return_value = mock_selector
+ mock_selector.select.return_value = [] # Timeout
+
+ with self.assertRaises(WebSocketTimeoutException):
+ recv(mock_sock, 100)
+
+ def test_recv_line(self):
+ """Test recv_line functionality"""
+ mock_sock = Mock()
+
+ # Mock recv to return one character at a time
+ recv_calls = [b"H", b"e", b"l", b"l", b"o", b"\n"]
+
+ with patch("websocket._socket.recv", side_effect=recv_calls) as mock_recv:
+ result = recv_line(mock_sock)
+
+ self.assertEqual(result, b"Hello\n")
+ self.assertEqual(mock_recv.call_count, 6)
+
+ def test_send_normal(self):
+ """Test normal send operation"""
+ mock_sock = Mock()
+ mock_sock.send.return_value = 9
+ mock_sock.gettimeout.return_value = 30.0
+
+ result = send(mock_sock, b"test data")
+
+ self.assertEqual(result, 9)
+ mock_sock.send.assert_called_with(b"test data")
+
+ def test_send_zero_timeout(self):
+ """Test send with zero timeout (non-blocking)"""
+ mock_sock = Mock()
+ mock_sock.send.return_value = 9
+ mock_sock.gettimeout.return_value = 0
+
+ result = send(mock_sock, b"test data")
+
+ self.assertEqual(result, 9)
+ mock_sock.send.assert_called_once_with(b"test data")
+
+ def test_send_ssl_eof_error(self):
+ """Test send with SSLEOFError"""
+ mock_sock = Mock()
+ mock_sock.gettimeout.return_value = 30.0
+ mock_sock.send.side_effect = SSLEOFError("Connection closed")
+
+ with self.assertRaises(WebSocketConnectionClosedException) as cm:
+ send(mock_sock, b"test data")
+
+ self.assertEqual(str(cm.exception), "socket is already closed.")
+
+ def test_send_ssl_want_write_error(self):
+ """Test send with SSLWantWriteError (should retry)"""
+ mock_sock = Mock()
+ mock_sock.gettimeout.return_value = 30.0
+
+ # First call raises SSLWantWriteError, second call succeeds
+ mock_sock.send.side_effect = [SSLWantWriteError(), 9]
+
+ with patch("selectors.DefaultSelector") as mock_selector_class:
+ mock_selector = Mock()
+ mock_selector_class.return_value = mock_selector
+ mock_selector.select.return_value = [True] # Ready to write
+
+ result = send(mock_sock, b"test data")
+
+ self.assertEqual(result, 9)
+ mock_selector.register.assert_called()
+ mock_selector.close.assert_called()
+
+ def test_send_socket_eagain_error(self):
+ """Test send with EAGAIN error (should retry)"""
+ mock_sock = Mock()
+ mock_sock.gettimeout.return_value = 30.0
+
+ # Create socket error with EAGAIN
+ eagain_error = socket.error("Resource temporarily unavailable")
+ eagain_error.errno = errno.EAGAIN
+ eagain_error.args = (errno.EAGAIN, "Resource temporarily unavailable")
+
+ # First call raises EAGAIN, second call succeeds
+ mock_sock.send.side_effect = [eagain_error, 9]
+
+ with patch("selectors.DefaultSelector") as mock_selector_class:
+ mock_selector = Mock()
+ mock_selector_class.return_value = mock_selector
+ mock_selector.select.return_value = [True] # Ready to write
+
+ result = send(mock_sock, b"test data")
+
+ self.assertEqual(result, 9)
+
+ def test_send_socket_ewouldblock_error(self):
+ """Test send with EWOULDBLOCK error (should retry)"""
+ mock_sock = Mock()
+ mock_sock.gettimeout.return_value = 30.0
+
+ # Create socket error with EWOULDBLOCK
+ ewouldblock_error = socket.error("Operation would block")
+ ewouldblock_error.errno = errno.EWOULDBLOCK
+ ewouldblock_error.args = (errno.EWOULDBLOCK, "Operation would block")
+
+ # First call raises EWOULDBLOCK, second call succeeds
+ mock_sock.send.side_effect = [ewouldblock_error, 9]
+
+ with patch("selectors.DefaultSelector") as mock_selector_class:
+ mock_selector = Mock()
+ mock_selector_class.return_value = mock_selector
+ mock_selector.select.return_value = [True] # Ready to write
+
+ result = send(mock_sock, b"test data")
+
+ self.assertEqual(result, 9)
+
+ def test_send_socket_other_error(self):
+ """Test send with other socket error (should raise)"""
+ mock_sock = Mock()
+ mock_sock.gettimeout.return_value = 30.0
+
+ # Create socket error with different errno
+ other_error = socket.error("Connection reset by peer")
+ other_error.errno = errno.ECONNRESET
+ other_error.args = (errno.ECONNRESET, "Connection reset by peer")
+
+ mock_sock.send.side_effect = other_error
+
+ with self.assertRaises(socket.error):
+ send(mock_sock, b"test data")
+
+ def test_send_socket_error_no_errno(self):
+ """Test send with socket error that has no errno"""
+ mock_sock = Mock()
+ mock_sock.gettimeout.return_value = 30.0
+
+ # Create socket error without errno attribute
+ no_errno_error = socket.error("Generic socket error")
+ no_errno_error.args = ("Generic socket error",)
+
+ mock_sock.send.side_effect = no_errno_error
+
+ with self.assertRaises(socket.error):
+ send(mock_sock, b"test data")
+
+ def test_send_write_timeout(self):
+ """Test send write operation timeout"""
+ mock_sock = Mock()
+ mock_sock.gettimeout.return_value = 30.0
+
+ # First call raises EAGAIN
+ eagain_error = socket.error("Resource temporarily unavailable")
+ eagain_error.errno = errno.EAGAIN
+ eagain_error.args = (errno.EAGAIN, "Resource temporarily unavailable")
+
+ mock_sock.send.side_effect = eagain_error
+
+ with patch("selectors.DefaultSelector") as mock_selector_class:
+ mock_selector = Mock()
+ mock_selector_class.return_value = mock_selector
+ mock_selector.select.return_value = [] # Timeout - nothing ready
+
+ result = send(mock_sock, b"test data")
+
+ # Should return 0 when write times out
+ self.assertEqual(result, 0)
+
+ def test_send_string_data(self):
+ """Test send with string data (should be encoded)"""
+ mock_sock = Mock()
+ mock_sock.send.return_value = 9
+ mock_sock.gettimeout.return_value = 30.0
+
+ result = send(mock_sock, "test data")
+
+ self.assertEqual(result, 9)
+ mock_sock.send.assert_called_with(b"test data")
+
+ def test_send_partial_send_retry(self):
+ """Test send retry mechanism"""
+ mock_sock = Mock()
+ mock_sock.gettimeout.return_value = 30.0
+
+ # Create a scenario where send succeeds after selector retry
+ eagain_error = socket.error("Resource temporarily unavailable")
+ eagain_error.errno = errno.EAGAIN
+ eagain_error.args = (errno.EAGAIN, "Resource temporarily unavailable")
+
+ # Mock the internal _send function behavior
+ mock_sock.send.side_effect = [eagain_error, 9]
+
+ with patch("selectors.DefaultSelector") as mock_selector_class:
+ mock_selector = Mock()
+ mock_selector_class.return_value = mock_selector
+ mock_selector.select.return_value = [True] # Socket ready for writing
+
+ result = send(mock_sock, b"test data")
+
+ self.assertEqual(result, 9)
+ # Verify selector was used for retry mechanism
+ mock_selector.register.assert_called()
+ mock_selector.select.assert_called()
+ mock_selector.close.assert_called()
+
+
+if __name__ == "__main__":
+ unittest.main()