diff options
author | AlexSm <alex@ydb.tech> | 2023-12-21 15:05:38 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-21 15:05:38 +0100 |
commit | e98bcbc74422492351c51646dba3849a138a8ffc (patch) | |
tree | 38ad7a09b1f9c201ce8a7e3d69f2017388769224 /contrib/python/websocket-client/py3/websocket/_app.py | |
parent | 559d7083cd8378cb25b9e966dedcca21d413e338 (diff) | |
download | ydb-e98bcbc74422492351c51646dba3849a138a8ffc.tar.gz |
Import libs 1 (#590)
* Import libs 1
* Add new file without extension
* Add file missed in export config
Diffstat (limited to 'contrib/python/websocket-client/py3/websocket/_app.py')
-rw-r--r-- | contrib/python/websocket-client/py3/websocket/_app.py | 283 |
1 files changed, 195 insertions, 88 deletions
diff --git a/contrib/python/websocket-client/py3/websocket/_app.py b/contrib/python/websocket-client/py3/websocket/_app.py index 13f8bd5634..4d8af3b5b1 100644 --- a/contrib/python/websocket-client/py3/websocket/_app.py +++ b/contrib/python/websocket-client/py3/websocket/_app.py @@ -1,18 +1,19 @@ import inspect import selectors import socket -import sys import threading import time -import traceback - from typing import Any, Callable, Optional, Union from . import _logging from ._abnf import ABNF -from ._url import parse_url from ._core import WebSocket, getdefaulttimeout -from ._exceptions import * +from ._exceptions import ( + WebSocketConnectionClosedException, + WebSocketException, + WebSocketTimeoutException, +) +from ._url import parse_url """ _app.py @@ -47,22 +48,24 @@ class DispatcherBase: """ DispatcherBase """ - def __init__(self, app: Any, ping_timeout: float) -> None: + + def __init__(self, app: Any, ping_timeout: Union[float, int, None]) -> None: self.app = app self.ping_timeout = ping_timeout - def timeout(self, seconds: int, callback: Callable) -> None: + def timeout(self, seconds: Union[float, int, None], callback: Callable) -> None: time.sleep(seconds) callback() def reconnect(self, seconds: int, reconnector: Callable) -> None: try: - _logging.info("reconnect() - retrying in {seconds_count} seconds [{frame_count} frames in stack]".format( - seconds_count=seconds, frame_count=len(inspect.stack()))) + _logging.info( + f"reconnect() - retrying in {seconds} seconds [{len(inspect.stack())} frames in stack]" + ) time.sleep(seconds) reconnector(reconnecting=True) except KeyboardInterrupt as e: - _logging.info("User exited {err}".format(err=e)) + _logging.info(f"User exited {e}") raise e @@ -70,13 +73,18 @@ class Dispatcher(DispatcherBase): """ Dispatcher """ - def read(self, sock: socket.socket, read_callback: Callable, check_callback: Callable) -> None: + + def read( + self, + sock: socket.socket, + read_callback: Callable, + check_callback: Callable, + ) -> None: sel = selectors.DefaultSelector() sel.register(self.app.sock.sock, selectors.EVENT_READ) try: while self.app.keep_running: - r = sel.select(self.ping_timeout) - if r: + if sel.select(self.ping_timeout): if not read_callback(): break check_callback() @@ -88,24 +96,31 @@ class SSLDispatcher(DispatcherBase): """ SSLDispatcher """ - def read(self, sock: socket.socket, read_callback: Callable, check_callback: Callable) -> None: + + def read( + self, + sock: socket.socket, + read_callback: Callable, + check_callback: Callable, + ) -> None: sock = self.app.sock.sock sel = selectors.DefaultSelector() sel.register(sock, selectors.EVENT_READ) try: while self.app.keep_running: - r = self.select(sock, sel) - if r: + if self.select(sock, sel): if not read_callback(): break check_callback() finally: sel.close() - def select(self, sock, sel:selectors.DefaultSelector): + def select(self, sock, sel: selectors.DefaultSelector): sock = self.app.sock.sock if sock.pending(): - return [sock,] + return [ + sock, + ] r = sel.select(self.ping_timeout) @@ -117,17 +132,23 @@ class WrappedDispatcher: """ WrappedDispatcher """ - def __init__(self, app, ping_timeout: float, dispatcher: Dispatcher) -> None: + + def __init__(self, app, ping_timeout: Union[float, int, None], dispatcher) -> None: self.app = app self.ping_timeout = ping_timeout self.dispatcher = dispatcher dispatcher.signal(2, dispatcher.abort) # keyboard interrupt - def read(self, sock: socket.socket, read_callback: Callable, check_callback: Callable) -> None: + def read( + self, + sock: socket.socket, + read_callback: Callable, + check_callback: Callable, + ) -> None: self.dispatcher.read(sock, read_callback) self.ping_timeout and self.timeout(self.ping_timeout, check_callback) - def timeout(self, seconds: int, callback: Callable) -> None: + def timeout(self, seconds: float, callback: Callable) -> None: self.dispatcher.timeout(seconds, callback) def reconnect(self, seconds: int, reconnector: Callable) -> None: @@ -139,14 +160,24 @@ class WebSocketApp: Higher level of APIs are provided. The interface is like JavaScript WebSocket object. """ - def __init__(self, url: str, header: Union[list, dict, Callable] = None, - on_open: Callable = None, on_message: Callable = None, on_error: Callable = None, - on_close: Callable = None, on_ping: Callable = None, on_pong: Callable = None, - on_cont_message: Callable = None, - keep_running: bool = True, get_mask_key: Callable = None, cookie: str = None, - subprotocols: list = None, - on_data: Callable = None, - socket: socket.socket = None) -> None: + def __init__( + self, + url: str, + header: Union[list, dict, Callable, None] = None, + on_open: Optional[Callable[[WebSocket], None]] = None, + on_message: Optional[Callable[[WebSocket, Any], None]] = None, + on_error: Optional[Callable[[WebSocket, Any], None]] = None, + on_close: Optional[Callable[[WebSocket, Any, Any], None]] = None, + on_ping: Optional[Callable] = None, + on_pong: Optional[Callable] = None, + on_cont_message: Optional[Callable] = None, + keep_running: bool = True, + get_mask_key: Optional[Callable] = None, + cookie: Optional[str] = None, + subprotocols: Optional[list] = None, + on_data: Optional[Callable] = None, + socket: Optional[socket.socket] = None, + ) -> None: """ WebSocketApp initialization @@ -222,13 +253,13 @@ class WebSocketApp: self.on_cont_message = on_cont_message self.keep_running = False self.get_mask_key = get_mask_key - self.sock = None - self.last_ping_tm = 0 - self.last_pong_tm = 0 - self.ping_thread = None - self.stop_ping = None - self.ping_interval = 0 - self.ping_timeout = None + self.sock: Optional[WebSocket] = None + self.last_ping_tm = float(0) + self.last_pong_tm = float(0) + self.ping_thread: Optional[threading.Thread] = None + self.stop_ping: Optional[threading.Event] = None + self.ping_interval = float(0) + self.ping_timeout: Union[float, int, None] = None self.ping_payload = "" self.subprotocols = subprotocols self.prepared_socket = socket @@ -236,7 +267,7 @@ class WebSocketApp: self.has_done_teardown = False self.has_done_teardown_lock = threading.Lock() - def send(self, data: str, opcode: int = ABNF.OPCODE_TEXT) -> None: + def send(self, data: Union[bytes, str], opcode: int = ABNF.OPCODE_TEXT) -> None: """ send message @@ -250,8 +281,21 @@ class WebSocketApp: """ if not self.sock or self.sock.send(data, opcode) == 0: - raise WebSocketConnectionClosedException( - "Connection is already closed.") + raise WebSocketConnectionClosedException("Connection is already closed.") + + def send_text(self, text_data: str) -> None: + """ + Sends UTF-8 encoded text. + """ + if not self.sock or self.sock.send(text_data, ABNF.OPCODE_TEXT) == 0: + raise WebSocketConnectionClosedException("Connection is already closed.") + + def send_bytes(self, data: Union[bytes, bytearray]) -> None: + """ + Sends a sequence of bytes. + """ + if not self.sock or self.sock.send(data, ABNF.OPCODE_BINARY) == 0: + raise WebSocketConnectionClosedException("Connection is already closed.") def close(self, **kwargs) -> None: """ @@ -263,7 +307,7 @@ class WebSocketApp: self.sock = None def _start_ping_thread(self) -> None: - self.last_ping_tm = self.last_pong_tm = 0 + self.last_ping_tm = self.last_pong_tm = float(0) self.stop_ping = threading.Event() self.ping_thread = threading.Thread(target=self._send_ping) self.ping_thread.daemon = True @@ -274,7 +318,7 @@ class WebSocketApp: self.stop_ping.set() if self.ping_thread and self.ping_thread.is_alive(): self.ping_thread.join(3) - self.last_ping_tm = self.last_pong_tm = 0 + self.last_ping_tm = self.last_pong_tm = float(0) def _send_ping(self) -> None: if self.stop_ping.wait(self.ping_interval) or self.keep_running is False: @@ -286,17 +330,28 @@ class WebSocketApp: _logging.debug("Sending ping") self.sock.ping(self.ping_payload) except Exception as e: - _logging.debug("Failed to send ping: {err}".format(err=e)) - - def run_forever(self, sockopt: tuple = None, sslopt: dict = None, - ping_interval: float = 0, ping_timeout: Optional[float] = None, - ping_payload: str = "", - http_proxy_host: str = None, http_proxy_port: Union[int, str] = None, - http_no_proxy: list = None, http_proxy_auth: tuple = None, - http_proxy_timeout: float = None, - skip_utf8_validation: bool = False, - host: str = None, origin: str = None, dispatcher: Dispatcher = None, - suppress_origin: bool = False, proxy_type: str = None, reconnect: int = None) -> bool: + _logging.debug(f"Failed to send ping: {e}") + + def run_forever( + self, + sockopt: tuple = None, + sslopt: dict = None, + ping_interval: Union[float, int] = 0, + ping_timeout: Union[float, int, None] = None, + ping_payload: str = "", + http_proxy_host: str = None, + http_proxy_port: Union[int, str] = None, + http_no_proxy: list = None, + http_proxy_auth: tuple = None, + http_proxy_timeout: Optional[float] = None, + skip_utf8_validation: bool = False, + host: str = None, + origin: str = None, + dispatcher=None, + suppress_origin: bool = False, + proxy_type: str = None, + reconnect: int = None, + ) -> bool: """ Run event loop for WebSocket framework. @@ -360,7 +415,7 @@ class WebSocketApp: if ping_timeout and ping_interval and ping_interval <= ping_timeout: raise WebSocketException("Ensure ping_interval > ping_timeout") if not sockopt: - sockopt = [] + sockopt = () if not sslopt: sslopt = {} if self.sock: @@ -394,7 +449,8 @@ class WebSocketApp: if self.sock: self.sock.close() close_status_code, close_reason = self._get_close_args( - close_frame if close_frame else None) + close_frame if close_frame else None + ) self.sock = None # Finally call the callback AFTER all teardown is complete @@ -405,24 +461,34 @@ class WebSocketApp: self.sock.shutdown() self.sock = WebSocket( - self.get_mask_key, sockopt=sockopt, sslopt=sslopt, + self.get_mask_key, + sockopt=sockopt, + sslopt=sslopt, fire_cont_frame=self.on_cont_message is not None, skip_utf8_validation=skip_utf8_validation, - enable_multithread=True) + enable_multithread=True, + ) self.sock.settimeout(getdefaulttimeout()) try: - header = self.header() if callable(self.header) else self.header self.sock.connect( - self.url, header=header, cookie=self.cookie, + self.url, + header=header, + cookie=self.cookie, http_proxy_host=http_proxy_host, - http_proxy_port=http_proxy_port, http_no_proxy=http_no_proxy, - http_proxy_auth=http_proxy_auth, http_proxy_timeout=http_proxy_timeout, + http_proxy_port=http_proxy_port, + http_no_proxy=http_no_proxy, + http_proxy_auth=http_proxy_auth, + http_proxy_timeout=http_proxy_timeout, subprotocols=self.subprotocols, - host=host, origin=origin, suppress_origin=suppress_origin, - proxy_type=proxy_type, socket=self.prepared_socket) + host=host, + origin=origin, + suppress_origin=suppress_origin, + proxy_type=proxy_type, + socket=self.prepared_socket, + ) _logging.info("Websocket connected") @@ -432,7 +498,13 @@ class WebSocketApp: self._callback(self.on_open) dispatcher.read(self.sock.sock, read, check) - except (WebSocketConnectionClosedException, ConnectionRefusedError, KeyboardInterrupt, SystemExit, Exception) as e: + except ( + WebSocketConnectionClosedException, + ConnectionRefusedError, + KeyboardInterrupt, + SystemExit, + Exception, + ) as e: handleDisconnect(e, reconnecting) def read() -> bool: @@ -441,7 +513,10 @@ class WebSocketApp: try: op_code, frame = self.sock.recv_data_frame(True) - except (WebSocketConnectionClosedException, KeyboardInterrupt) as e: + except ( + WebSocketConnectionClosedException, + KeyboardInterrupt, + ) as e: if custom_dispatcher: return handleDisconnect(e) else: @@ -455,10 +530,8 @@ class WebSocketApp: self.last_pong_tm = time.time() self._callback(self.on_pong, frame.data) elif op_code == ABNF.OPCODE_CONT and self.on_cont_message: - self._callback(self.on_data, frame.data, - frame.opcode, frame.fin) - self._callback(self.on_cont_message, - frame.data, frame.fin) + self._callback(self.on_data, frame.data, frame.opcode, frame.fin) + self._callback(self.on_cont_message, frame.data, frame.fin) else: data = frame.data if op_code == ABNF.OPCODE_TEXT and not skip_utf8_validation: @@ -469,18 +542,38 @@ class WebSocketApp: return True def check() -> bool: - if (self.ping_timeout): - has_timeout_expired = time.time() - self.last_ping_tm > self.ping_timeout - has_pong_not_arrived_after_last_ping = self.last_pong_tm - self.last_ping_tm < 0 - has_pong_arrived_too_late = self.last_pong_tm - self.last_ping_tm > self.ping_timeout - - if (self.last_ping_tm and - has_timeout_expired and - (has_pong_not_arrived_after_last_ping or has_pong_arrived_too_late)): + if self.ping_timeout: + has_timeout_expired = ( + time.time() - self.last_ping_tm > self.ping_timeout + ) + has_pong_not_arrived_after_last_ping = ( + self.last_pong_tm - self.last_ping_tm < 0 + ) + has_pong_arrived_too_late = ( + self.last_pong_tm - self.last_ping_tm > self.ping_timeout + ) + + if ( + self.last_ping_tm + and has_timeout_expired + and ( + has_pong_not_arrived_after_last_ping + or has_pong_arrived_too_late + ) + ): raise WebSocketTimeoutException("ping/pong timed out") return True - def handleDisconnect(e: Exception, reconnecting: bool = False) -> bool: + def handleDisconnect( + e: Union[ + WebSocketConnectionClosedException, + ConnectionRefusedError, + KeyboardInterrupt, + SystemExit, + Exception, + ], + reconnecting: bool = False, + ) -> bool: self.has_errored = True self._stop_ping_thread() if not reconnecting: @@ -492,25 +585,31 @@ class WebSocketApp: raise if reconnect: - _logging.info("{err} - reconnect".format(err=e)) + _logging.info(f"{e} - reconnect") if custom_dispatcher: - _logging.debug("Calling custom dispatcher reconnect [{frame_count} frames in stack]".format(frame_count=len(inspect.stack()))) + _logging.debug( + f"Calling custom dispatcher reconnect [{len(inspect.stack())} frames in stack]" + ) dispatcher.reconnect(reconnect, setSock) else: - _logging.error("{err} - goodbye".format(err=e)) + _logging.error(f"{e} - goodbye") teardown() custom_dispatcher = bool(dispatcher) - dispatcher = self.create_dispatcher(ping_timeout, dispatcher, parse_url(self.url)[3]) + dispatcher = self.create_dispatcher( + ping_timeout, dispatcher, parse_url(self.url)[3] + ) try: setSock() if not custom_dispatcher and reconnect: while self.keep_running: - _logging.debug("Calling dispatcher reconnect [{frame_count} frames in stack]".format(frame_count=len(inspect.stack()))) + _logging.debug( + f"Calling dispatcher reconnect [{len(inspect.stack())} frames in stack]" + ) dispatcher.reconnect(reconnect, setSock) except (KeyboardInterrupt, Exception) as e: - _logging.info("tearing down on exception {err}".format(err=e)) + _logging.info(f"tearing down on exception {e}") teardown() finally: if not custom_dispatcher: @@ -519,13 +618,17 @@ class WebSocketApp: return self.has_errored - def create_dispatcher(self, ping_timeout: int, dispatcher: Dispatcher = None, is_ssl: bool = False) -> DispatcherBase: + def create_dispatcher( + self, + ping_timeout: Union[float, int, None], + dispatcher: Optional[DispatcherBase] = None, + is_ssl: bool = False, + ) -> Union[Dispatcher, SSLDispatcher, WrappedDispatcher]: if dispatcher: # If custom dispatcher is set, use WrappedDispatcher return WrappedDispatcher(self, ping_timeout, dispatcher) timeout = ping_timeout or 10 if is_ssl: return SSLDispatcher(self, timeout) - return Dispatcher(self, timeout) def _get_close_args(self, close_frame: ABNF) -> list: @@ -540,8 +643,12 @@ class WebSocketApp: # Extract close frame status code if close_frame.data and len(close_frame.data) >= 2: - close_status_code = 256 * close_frame.data[0] + close_frame.data[1] - reason = close_frame.data[2:].decode('utf-8') + close_status_code = 256 * int(close_frame.data[0]) + int( + close_frame.data[1] + ) + reason = close_frame.data[2:] + if isinstance(reason, bytes): + reason = reason.decode("utf-8") return [close_status_code, reason] else: # Most likely reached this because len(close_frame_data.data) < 2 @@ -553,6 +660,6 @@ class WebSocketApp: callback(self, *args) except Exception as e: - _logging.error("error from callback {callback}: {err}".format(callback=callback, err=e)) + _logging.error(f"error from callback {callback}: {e}") if self.on_error: self.on_error(self, e) |