aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/websocket-client/py3/websocket/_app.py
diff options
context:
space:
mode:
authorAlexSm <alex@ydb.tech>2023-12-21 15:05:38 +0100
committerGitHub <noreply@github.com>2023-12-21 15:05:38 +0100
commite98bcbc74422492351c51646dba3849a138a8ffc (patch)
tree38ad7a09b1f9c201ce8a7e3d69f2017388769224 /contrib/python/websocket-client/py3/websocket/_app.py
parent559d7083cd8378cb25b9e966dedcca21d413e338 (diff)
downloadydb-e98bcbc74422492351c51646dba3849a138a8ffc.tar.gz
Import libs 1 (#590)
* Import libs 1 * Add new file without extension * Add file missed in export config
Diffstat (limited to 'contrib/python/websocket-client/py3/websocket/_app.py')
-rw-r--r--contrib/python/websocket-client/py3/websocket/_app.py283
1 files changed, 195 insertions, 88 deletions
diff --git a/contrib/python/websocket-client/py3/websocket/_app.py b/contrib/python/websocket-client/py3/websocket/_app.py
index 13f8bd5634..4d8af3b5b1 100644
--- a/contrib/python/websocket-client/py3/websocket/_app.py
+++ b/contrib/python/websocket-client/py3/websocket/_app.py
@@ -1,18 +1,19 @@
import inspect
import selectors
import socket
-import sys
import threading
import time
-import traceback
-
from typing import Any, Callable, Optional, Union
from . import _logging
from ._abnf import ABNF
-from ._url import parse_url
from ._core import WebSocket, getdefaulttimeout
-from ._exceptions import *
+from ._exceptions import (
+ WebSocketConnectionClosedException,
+ WebSocketException,
+ WebSocketTimeoutException,
+)
+from ._url import parse_url
"""
_app.py
@@ -47,22 +48,24 @@ class DispatcherBase:
"""
DispatcherBase
"""
- def __init__(self, app: Any, ping_timeout: float) -> None:
+
+ def __init__(self, app: Any, ping_timeout: Union[float, int, None]) -> None:
self.app = app
self.ping_timeout = ping_timeout
- def timeout(self, seconds: int, callback: Callable) -> None:
+ def timeout(self, seconds: Union[float, int, None], callback: Callable) -> None:
time.sleep(seconds)
callback()
def reconnect(self, seconds: int, reconnector: Callable) -> None:
try:
- _logging.info("reconnect() - retrying in {seconds_count} seconds [{frame_count} frames in stack]".format(
- seconds_count=seconds, frame_count=len(inspect.stack())))
+ _logging.info(
+ f"reconnect() - retrying in {seconds} seconds [{len(inspect.stack())} frames in stack]"
+ )
time.sleep(seconds)
reconnector(reconnecting=True)
except KeyboardInterrupt as e:
- _logging.info("User exited {err}".format(err=e))
+ _logging.info(f"User exited {e}")
raise e
@@ -70,13 +73,18 @@ class Dispatcher(DispatcherBase):
"""
Dispatcher
"""
- def read(self, sock: socket.socket, read_callback: Callable, check_callback: Callable) -> None:
+
+ def read(
+ self,
+ sock: socket.socket,
+ read_callback: Callable,
+ check_callback: Callable,
+ ) -> None:
sel = selectors.DefaultSelector()
sel.register(self.app.sock.sock, selectors.EVENT_READ)
try:
while self.app.keep_running:
- r = sel.select(self.ping_timeout)
- if r:
+ if sel.select(self.ping_timeout):
if not read_callback():
break
check_callback()
@@ -88,24 +96,31 @@ class SSLDispatcher(DispatcherBase):
"""
SSLDispatcher
"""
- def read(self, sock: socket.socket, read_callback: Callable, check_callback: Callable) -> None:
+
+ def read(
+ self,
+ sock: socket.socket,
+ read_callback: Callable,
+ check_callback: Callable,
+ ) -> None:
sock = self.app.sock.sock
sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ)
try:
while self.app.keep_running:
- r = self.select(sock, sel)
- if r:
+ if self.select(sock, sel):
if not read_callback():
break
check_callback()
finally:
sel.close()
- def select(self, sock, sel:selectors.DefaultSelector):
+ def select(self, sock, sel: selectors.DefaultSelector):
sock = self.app.sock.sock
if sock.pending():
- return [sock,]
+ return [
+ sock,
+ ]
r = sel.select(self.ping_timeout)
@@ -117,17 +132,23 @@ class WrappedDispatcher:
"""
WrappedDispatcher
"""
- def __init__(self, app, ping_timeout: float, dispatcher: Dispatcher) -> None:
+
+ def __init__(self, app, ping_timeout: Union[float, int, None], dispatcher) -> None:
self.app = app
self.ping_timeout = ping_timeout
self.dispatcher = dispatcher
dispatcher.signal(2, dispatcher.abort) # keyboard interrupt
- def read(self, sock: socket.socket, read_callback: Callable, check_callback: Callable) -> None:
+ def read(
+ self,
+ sock: socket.socket,
+ read_callback: Callable,
+ check_callback: Callable,
+ ) -> None:
self.dispatcher.read(sock, read_callback)
self.ping_timeout and self.timeout(self.ping_timeout, check_callback)
- def timeout(self, seconds: int, callback: Callable) -> None:
+ def timeout(self, seconds: float, callback: Callable) -> None:
self.dispatcher.timeout(seconds, callback)
def reconnect(self, seconds: int, reconnector: Callable) -> None:
@@ -139,14 +160,24 @@ class WebSocketApp:
Higher level of APIs are provided. The interface is like JavaScript WebSocket object.
"""
- def __init__(self, url: str, header: Union[list, dict, Callable] = None,
- on_open: Callable = None, on_message: Callable = None, on_error: Callable = None,
- on_close: Callable = None, on_ping: Callable = None, on_pong: Callable = None,
- on_cont_message: Callable = None,
- keep_running: bool = True, get_mask_key: Callable = None, cookie: str = None,
- subprotocols: list = None,
- on_data: Callable = None,
- socket: socket.socket = None) -> None:
+ def __init__(
+ self,
+ url: str,
+ header: Union[list, dict, Callable, None] = None,
+ on_open: Optional[Callable[[WebSocket], None]] = None,
+ on_message: Optional[Callable[[WebSocket, Any], None]] = None,
+ on_error: Optional[Callable[[WebSocket, Any], None]] = None,
+ on_close: Optional[Callable[[WebSocket, Any, Any], None]] = None,
+ on_ping: Optional[Callable] = None,
+ on_pong: Optional[Callable] = None,
+ on_cont_message: Optional[Callable] = None,
+ keep_running: bool = True,
+ get_mask_key: Optional[Callable] = None,
+ cookie: Optional[str] = None,
+ subprotocols: Optional[list] = None,
+ on_data: Optional[Callable] = None,
+ socket: Optional[socket.socket] = None,
+ ) -> None:
"""
WebSocketApp initialization
@@ -222,13 +253,13 @@ class WebSocketApp:
self.on_cont_message = on_cont_message
self.keep_running = False
self.get_mask_key = get_mask_key
- self.sock = None
- self.last_ping_tm = 0
- self.last_pong_tm = 0
- self.ping_thread = None
- self.stop_ping = None
- self.ping_interval = 0
- self.ping_timeout = None
+ self.sock: Optional[WebSocket] = None
+ self.last_ping_tm = float(0)
+ self.last_pong_tm = float(0)
+ self.ping_thread: Optional[threading.Thread] = None
+ self.stop_ping: Optional[threading.Event] = None
+ self.ping_interval = float(0)
+ self.ping_timeout: Union[float, int, None] = None
self.ping_payload = ""
self.subprotocols = subprotocols
self.prepared_socket = socket
@@ -236,7 +267,7 @@ class WebSocketApp:
self.has_done_teardown = False
self.has_done_teardown_lock = threading.Lock()
- def send(self, data: str, opcode: int = ABNF.OPCODE_TEXT) -> None:
+ def send(self, data: Union[bytes, str], opcode: int = ABNF.OPCODE_TEXT) -> None:
"""
send message
@@ -250,8 +281,21 @@ class WebSocketApp:
"""
if not self.sock or self.sock.send(data, opcode) == 0:
- raise WebSocketConnectionClosedException(
- "Connection is already closed.")
+ raise WebSocketConnectionClosedException("Connection is already closed.")
+
+ def send_text(self, text_data: str) -> None:
+ """
+ Sends UTF-8 encoded text.
+ """
+ if not self.sock or self.sock.send(text_data, ABNF.OPCODE_TEXT) == 0:
+ raise WebSocketConnectionClosedException("Connection is already closed.")
+
+ def send_bytes(self, data: Union[bytes, bytearray]) -> None:
+ """
+ Sends a sequence of bytes.
+ """
+ if not self.sock or self.sock.send(data, ABNF.OPCODE_BINARY) == 0:
+ raise WebSocketConnectionClosedException("Connection is already closed.")
def close(self, **kwargs) -> None:
"""
@@ -263,7 +307,7 @@ class WebSocketApp:
self.sock = None
def _start_ping_thread(self) -> None:
- self.last_ping_tm = self.last_pong_tm = 0
+ self.last_ping_tm = self.last_pong_tm = float(0)
self.stop_ping = threading.Event()
self.ping_thread = threading.Thread(target=self._send_ping)
self.ping_thread.daemon = True
@@ -274,7 +318,7 @@ class WebSocketApp:
self.stop_ping.set()
if self.ping_thread and self.ping_thread.is_alive():
self.ping_thread.join(3)
- self.last_ping_tm = self.last_pong_tm = 0
+ self.last_ping_tm = self.last_pong_tm = float(0)
def _send_ping(self) -> None:
if self.stop_ping.wait(self.ping_interval) or self.keep_running is False:
@@ -286,17 +330,28 @@ class WebSocketApp:
_logging.debug("Sending ping")
self.sock.ping(self.ping_payload)
except Exception as e:
- _logging.debug("Failed to send ping: {err}".format(err=e))
-
- def run_forever(self, sockopt: tuple = None, sslopt: dict = None,
- ping_interval: float = 0, ping_timeout: Optional[float] = None,
- ping_payload: str = "",
- http_proxy_host: str = None, http_proxy_port: Union[int, str] = None,
- http_no_proxy: list = None, http_proxy_auth: tuple = None,
- http_proxy_timeout: float = None,
- skip_utf8_validation: bool = False,
- host: str = None, origin: str = None, dispatcher: Dispatcher = None,
- suppress_origin: bool = False, proxy_type: str = None, reconnect: int = None) -> bool:
+ _logging.debug(f"Failed to send ping: {e}")
+
+ def run_forever(
+ self,
+ sockopt: tuple = None,
+ sslopt: dict = None,
+ ping_interval: Union[float, int] = 0,
+ ping_timeout: Union[float, int, None] = None,
+ ping_payload: str = "",
+ http_proxy_host: str = None,
+ http_proxy_port: Union[int, str] = None,
+ http_no_proxy: list = None,
+ http_proxy_auth: tuple = None,
+ http_proxy_timeout: Optional[float] = None,
+ skip_utf8_validation: bool = False,
+ host: str = None,
+ origin: str = None,
+ dispatcher=None,
+ suppress_origin: bool = False,
+ proxy_type: str = None,
+ reconnect: int = None,
+ ) -> bool:
"""
Run event loop for WebSocket framework.
@@ -360,7 +415,7 @@ class WebSocketApp:
if ping_timeout and ping_interval and ping_interval <= ping_timeout:
raise WebSocketException("Ensure ping_interval > ping_timeout")
if not sockopt:
- sockopt = []
+ sockopt = ()
if not sslopt:
sslopt = {}
if self.sock:
@@ -394,7 +449,8 @@ class WebSocketApp:
if self.sock:
self.sock.close()
close_status_code, close_reason = self._get_close_args(
- close_frame if close_frame else None)
+ close_frame if close_frame else None
+ )
self.sock = None
# Finally call the callback AFTER all teardown is complete
@@ -405,24 +461,34 @@ class WebSocketApp:
self.sock.shutdown()
self.sock = WebSocket(
- self.get_mask_key, sockopt=sockopt, sslopt=sslopt,
+ self.get_mask_key,
+ sockopt=sockopt,
+ sslopt=sslopt,
fire_cont_frame=self.on_cont_message is not None,
skip_utf8_validation=skip_utf8_validation,
- enable_multithread=True)
+ enable_multithread=True,
+ )
self.sock.settimeout(getdefaulttimeout())
try:
-
header = self.header() if callable(self.header) else self.header
self.sock.connect(
- self.url, header=header, cookie=self.cookie,
+ self.url,
+ header=header,
+ cookie=self.cookie,
http_proxy_host=http_proxy_host,
- http_proxy_port=http_proxy_port, http_no_proxy=http_no_proxy,
- http_proxy_auth=http_proxy_auth, http_proxy_timeout=http_proxy_timeout,
+ http_proxy_port=http_proxy_port,
+ http_no_proxy=http_no_proxy,
+ http_proxy_auth=http_proxy_auth,
+ http_proxy_timeout=http_proxy_timeout,
subprotocols=self.subprotocols,
- host=host, origin=origin, suppress_origin=suppress_origin,
- proxy_type=proxy_type, socket=self.prepared_socket)
+ host=host,
+ origin=origin,
+ suppress_origin=suppress_origin,
+ proxy_type=proxy_type,
+ socket=self.prepared_socket,
+ )
_logging.info("Websocket connected")
@@ -432,7 +498,13 @@ class WebSocketApp:
self._callback(self.on_open)
dispatcher.read(self.sock.sock, read, check)
- except (WebSocketConnectionClosedException, ConnectionRefusedError, KeyboardInterrupt, SystemExit, Exception) as e:
+ except (
+ WebSocketConnectionClosedException,
+ ConnectionRefusedError,
+ KeyboardInterrupt,
+ SystemExit,
+ Exception,
+ ) as e:
handleDisconnect(e, reconnecting)
def read() -> bool:
@@ -441,7 +513,10 @@ class WebSocketApp:
try:
op_code, frame = self.sock.recv_data_frame(True)
- except (WebSocketConnectionClosedException, KeyboardInterrupt) as e:
+ except (
+ WebSocketConnectionClosedException,
+ KeyboardInterrupt,
+ ) as e:
if custom_dispatcher:
return handleDisconnect(e)
else:
@@ -455,10 +530,8 @@ class WebSocketApp:
self.last_pong_tm = time.time()
self._callback(self.on_pong, frame.data)
elif op_code == ABNF.OPCODE_CONT and self.on_cont_message:
- self._callback(self.on_data, frame.data,
- frame.opcode, frame.fin)
- self._callback(self.on_cont_message,
- frame.data, frame.fin)
+ self._callback(self.on_data, frame.data, frame.opcode, frame.fin)
+ self._callback(self.on_cont_message, frame.data, frame.fin)
else:
data = frame.data
if op_code == ABNF.OPCODE_TEXT and not skip_utf8_validation:
@@ -469,18 +542,38 @@ class WebSocketApp:
return True
def check() -> bool:
- if (self.ping_timeout):
- has_timeout_expired = time.time() - self.last_ping_tm > self.ping_timeout
- has_pong_not_arrived_after_last_ping = self.last_pong_tm - self.last_ping_tm < 0
- has_pong_arrived_too_late = self.last_pong_tm - self.last_ping_tm > self.ping_timeout
-
- if (self.last_ping_tm and
- has_timeout_expired and
- (has_pong_not_arrived_after_last_ping or has_pong_arrived_too_late)):
+ if self.ping_timeout:
+ has_timeout_expired = (
+ time.time() - self.last_ping_tm > self.ping_timeout
+ )
+ has_pong_not_arrived_after_last_ping = (
+ self.last_pong_tm - self.last_ping_tm < 0
+ )
+ has_pong_arrived_too_late = (
+ self.last_pong_tm - self.last_ping_tm > self.ping_timeout
+ )
+
+ if (
+ self.last_ping_tm
+ and has_timeout_expired
+ and (
+ has_pong_not_arrived_after_last_ping
+ or has_pong_arrived_too_late
+ )
+ ):
raise WebSocketTimeoutException("ping/pong timed out")
return True
- def handleDisconnect(e: Exception, reconnecting: bool = False) -> bool:
+ def handleDisconnect(
+ e: Union[
+ WebSocketConnectionClosedException,
+ ConnectionRefusedError,
+ KeyboardInterrupt,
+ SystemExit,
+ Exception,
+ ],
+ reconnecting: bool = False,
+ ) -> bool:
self.has_errored = True
self._stop_ping_thread()
if not reconnecting:
@@ -492,25 +585,31 @@ class WebSocketApp:
raise
if reconnect:
- _logging.info("{err} - reconnect".format(err=e))
+ _logging.info(f"{e} - reconnect")
if custom_dispatcher:
- _logging.debug("Calling custom dispatcher reconnect [{frame_count} frames in stack]".format(frame_count=len(inspect.stack())))
+ _logging.debug(
+ f"Calling custom dispatcher reconnect [{len(inspect.stack())} frames in stack]"
+ )
dispatcher.reconnect(reconnect, setSock)
else:
- _logging.error("{err} - goodbye".format(err=e))
+ _logging.error(f"{e} - goodbye")
teardown()
custom_dispatcher = bool(dispatcher)
- dispatcher = self.create_dispatcher(ping_timeout, dispatcher, parse_url(self.url)[3])
+ dispatcher = self.create_dispatcher(
+ ping_timeout, dispatcher, parse_url(self.url)[3]
+ )
try:
setSock()
if not custom_dispatcher and reconnect:
while self.keep_running:
- _logging.debug("Calling dispatcher reconnect [{frame_count} frames in stack]".format(frame_count=len(inspect.stack())))
+ _logging.debug(
+ f"Calling dispatcher reconnect [{len(inspect.stack())} frames in stack]"
+ )
dispatcher.reconnect(reconnect, setSock)
except (KeyboardInterrupt, Exception) as e:
- _logging.info("tearing down on exception {err}".format(err=e))
+ _logging.info(f"tearing down on exception {e}")
teardown()
finally:
if not custom_dispatcher:
@@ -519,13 +618,17 @@ class WebSocketApp:
return self.has_errored
- def create_dispatcher(self, ping_timeout: int, dispatcher: Dispatcher = None, is_ssl: bool = False) -> DispatcherBase:
+ def create_dispatcher(
+ self,
+ ping_timeout: Union[float, int, None],
+ dispatcher: Optional[DispatcherBase] = None,
+ is_ssl: bool = False,
+ ) -> Union[Dispatcher, SSLDispatcher, WrappedDispatcher]:
if dispatcher: # If custom dispatcher is set, use WrappedDispatcher
return WrappedDispatcher(self, ping_timeout, dispatcher)
timeout = ping_timeout or 10
if is_ssl:
return SSLDispatcher(self, timeout)
-
return Dispatcher(self, timeout)
def _get_close_args(self, close_frame: ABNF) -> list:
@@ -540,8 +643,12 @@ class WebSocketApp:
# Extract close frame status code
if close_frame.data and len(close_frame.data) >= 2:
- close_status_code = 256 * close_frame.data[0] + close_frame.data[1]
- reason = close_frame.data[2:].decode('utf-8')
+ close_status_code = 256 * int(close_frame.data[0]) + int(
+ close_frame.data[1]
+ )
+ reason = close_frame.data[2:]
+ if isinstance(reason, bytes):
+ reason = reason.decode("utf-8")
return [close_status_code, reason]
else:
# Most likely reached this because len(close_frame_data.data) < 2
@@ -553,6 +660,6 @@ class WebSocketApp:
callback(self, *args)
except Exception as e:
- _logging.error("error from callback {callback}: {err}".format(callback=callback, err=e))
+ _logging.error(f"error from callback {callback}: {e}")
if self.on_error:
self.on_error(self, e)