diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-12-09 18:25:21 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-12-09 19:18:57 +0300 |
commit | 13374e0884578812cda7697d0c5680122db59a37 (patch) | |
tree | 30a022eb841035299deb2b8c393b2902f0c21735 /contrib/python/websocket-client/websocket/_app.py | |
parent | c7ade6d3bf7cd492235a61b77153351e422a28f3 (diff) | |
download | ydb-13374e0884578812cda7697d0c5680122db59a37.tar.gz |
Intermediate changes
commit_hash:034150f557268506d7bc0cbd8b5becf65f765593
Diffstat (limited to 'contrib/python/websocket-client/websocket/_app.py')
-rw-r--r-- | contrib/python/websocket-client/websocket/_app.py | 677 |
1 files changed, 677 insertions, 0 deletions
diff --git a/contrib/python/websocket-client/websocket/_app.py b/contrib/python/websocket-client/websocket/_app.py new file mode 100644 index 0000000000..9fee76546b --- /dev/null +++ b/contrib/python/websocket-client/websocket/_app.py @@ -0,0 +1,677 @@ +import inspect +import selectors +import socket +import threading +import time +from typing import Any, Callable, Optional, Union + +from . import _logging +from ._abnf import ABNF +from ._core import WebSocket, getdefaulttimeout +from ._exceptions import ( + WebSocketConnectionClosedException, + WebSocketException, + WebSocketTimeoutException, +) +from ._ssl_compat import SSLEOFError +from ._url import parse_url + +""" +_app.py +websocket - WebSocket client library for Python + +Copyright 2024 engn33r + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +__all__ = ["WebSocketApp"] + +RECONNECT = 0 + + +def setReconnect(reconnectInterval: int) -> None: + global RECONNECT + RECONNECT = reconnectInterval + + +class DispatcherBase: + """ + DispatcherBase + """ + + def __init__(self, app: Any, ping_timeout: Union[float, int, None]) -> None: + self.app = app + self.ping_timeout = ping_timeout + + def timeout(self, seconds: Union[float, int, None], callback: Callable) -> None: + time.sleep(seconds) + callback() + + def reconnect(self, seconds: int, reconnector: Callable) -> None: + try: + _logging.info( + f"reconnect() - retrying in {seconds} seconds [{len(inspect.stack())} frames in stack]" + ) + time.sleep(seconds) + reconnector(reconnecting=True) + except KeyboardInterrupt as e: + _logging.info(f"User exited {e}") + raise e + + +class Dispatcher(DispatcherBase): + """ + Dispatcher + """ + + def read( + self, + sock: socket.socket, + read_callback: Callable, + check_callback: Callable, + ) -> None: + sel = selectors.DefaultSelector() + sel.register(self.app.sock.sock, selectors.EVENT_READ) + try: + while self.app.keep_running: + if sel.select(self.ping_timeout): + if not read_callback(): + break + check_callback() + finally: + sel.close() + + +class SSLDispatcher(DispatcherBase): + """ + SSLDispatcher + """ + + def read( + self, + sock: socket.socket, + read_callback: Callable, + check_callback: Callable, + ) -> None: + sock = self.app.sock.sock + sel = selectors.DefaultSelector() + sel.register(sock, selectors.EVENT_READ) + try: + while self.app.keep_running: + if self.select(sock, sel): + if not read_callback(): + break + check_callback() + finally: + sel.close() + + def select(self, sock, sel: selectors.DefaultSelector): + sock = self.app.sock.sock + if sock.pending(): + return [ + sock, + ] + + r = sel.select(self.ping_timeout) + + if len(r) > 0: + return r[0][0] + + +class WrappedDispatcher: + """ + WrappedDispatcher + """ + + def __init__(self, app, ping_timeout: Union[float, int, None], dispatcher) -> None: + self.app = app + self.ping_timeout = ping_timeout + self.dispatcher = dispatcher + dispatcher.signal(2, dispatcher.abort) # keyboard interrupt + + def read( + self, + sock: socket.socket, + read_callback: Callable, + check_callback: Callable, + ) -> None: + self.dispatcher.read(sock, read_callback) + self.ping_timeout and self.timeout(self.ping_timeout, check_callback) + + def timeout(self, seconds: float, callback: Callable) -> None: + self.dispatcher.timeout(seconds, callback) + + def reconnect(self, seconds: int, reconnector: Callable) -> None: + self.timeout(seconds, reconnector) + + +class WebSocketApp: + """ + Higher level of APIs are provided. The interface is like JavaScript WebSocket object. + """ + + def __init__( + self, + url: str, + header: Union[list, dict, Callable, None] = None, + on_open: Optional[Callable[[WebSocket], None]] = None, + on_reconnect: Optional[Callable[[WebSocket], None]] = None, + on_message: Optional[Callable[[WebSocket, Any], None]] = None, + on_error: Optional[Callable[[WebSocket, Any], None]] = None, + on_close: Optional[Callable[[WebSocket, Any, Any], None]] = None, + on_ping: Optional[Callable] = None, + on_pong: Optional[Callable] = None, + on_cont_message: Optional[Callable] = None, + keep_running: bool = True, + get_mask_key: Optional[Callable] = None, + cookie: Optional[str] = None, + subprotocols: Optional[list] = None, + on_data: Optional[Callable] = None, + socket: Optional[socket.socket] = None, + ) -> None: + """ + WebSocketApp initialization + + Parameters + ---------- + url: str + Websocket url. + header: list or dict or Callable + Custom header for websocket handshake. + If the parameter is a callable object, it is called just before the connection attempt. + The returned dict or list is used as custom header value. + This could be useful in order to properly setup timestamp dependent headers. + on_open: function + Callback object which is called at opening websocket. + on_open has one argument. + The 1st argument is this class object. + on_reconnect: function + Callback object which is called at reconnecting websocket. + on_reconnect has one argument. + The 1st argument is this class object. + on_message: function + Callback object which is called when received data. + on_message has 2 arguments. + The 1st argument is this class object. + The 2nd argument is utf-8 data received from the server. + on_error: function + Callback object which is called when we get error. + on_error has 2 arguments. + The 1st argument is this class object. + The 2nd argument is exception object. + on_close: function + Callback object which is called when connection is closed. + on_close has 3 arguments. + The 1st argument is this class object. + The 2nd argument is close_status_code. + The 3rd argument is close_msg. + on_cont_message: function + Callback object which is called when a continuation + frame is received. + on_cont_message has 3 arguments. + The 1st argument is this class object. + The 2nd argument is utf-8 string which we get from the server. + The 3rd argument is continue flag. if 0, the data continue + to next frame data + on_data: function + Callback object which is called when a message received. + This is called before on_message or on_cont_message, + and then on_message or on_cont_message is called. + on_data has 4 argument. + The 1st argument is this class object. + The 2nd argument is utf-8 string which we get from the server. + The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came. + The 4th argument is continue flag. If 0, the data continue + keep_running: bool + This parameter is obsolete and ignored. + get_mask_key: function + A callable function to get new mask keys, see the + WebSocket.set_mask_key's docstring for more information. + cookie: str + Cookie value. + subprotocols: list + List of available sub protocols. Default is None. + socket: socket + Pre-initialized stream socket. + """ + self.url = url + self.header = header if header is not None else [] + self.cookie = cookie + + self.on_open = on_open + self.on_reconnect = on_reconnect + self.on_message = on_message + self.on_data = on_data + self.on_error = on_error + self.on_close = on_close + self.on_ping = on_ping + self.on_pong = on_pong + self.on_cont_message = on_cont_message + self.keep_running = False + self.get_mask_key = get_mask_key + self.sock: Optional[WebSocket] = None + self.last_ping_tm = float(0) + self.last_pong_tm = float(0) + self.ping_thread: Optional[threading.Thread] = None + self.stop_ping: Optional[threading.Event] = None + self.ping_interval = float(0) + self.ping_timeout: Union[float, int, None] = None + self.ping_payload = "" + self.subprotocols = subprotocols + self.prepared_socket = socket + self.has_errored = False + self.has_done_teardown = False + self.has_done_teardown_lock = threading.Lock() + + def send(self, data: Union[bytes, str], opcode: int = ABNF.OPCODE_TEXT) -> None: + """ + send message + + Parameters + ---------- + data: str + Message to send. If you set opcode to OPCODE_TEXT, + data must be utf-8 string or unicode. + opcode: int + Operation code of data. Default is OPCODE_TEXT. + """ + + if not self.sock or self.sock.send(data, opcode) == 0: + raise WebSocketConnectionClosedException("Connection is already closed.") + + def send_text(self, text_data: str) -> None: + """ + Sends UTF-8 encoded text. + """ + if not self.sock or self.sock.send(text_data, ABNF.OPCODE_TEXT) == 0: + raise WebSocketConnectionClosedException("Connection is already closed.") + + def send_bytes(self, data: Union[bytes, bytearray]) -> None: + """ + Sends a sequence of bytes. + """ + if not self.sock or self.sock.send(data, ABNF.OPCODE_BINARY) == 0: + raise WebSocketConnectionClosedException("Connection is already closed.") + + def close(self, **kwargs) -> None: + """ + Close websocket connection. + """ + self.keep_running = False + if self.sock: + self.sock.close(**kwargs) + self.sock = None + + def _start_ping_thread(self) -> None: + self.last_ping_tm = self.last_pong_tm = float(0) + self.stop_ping = threading.Event() + self.ping_thread = threading.Thread(target=self._send_ping) + self.ping_thread.daemon = True + self.ping_thread.start() + + def _stop_ping_thread(self) -> None: + if self.stop_ping: + self.stop_ping.set() + if self.ping_thread and self.ping_thread.is_alive(): + self.ping_thread.join(3) + self.last_ping_tm = self.last_pong_tm = float(0) + + def _send_ping(self) -> None: + if self.stop_ping.wait(self.ping_interval) or self.keep_running is False: + return + while not self.stop_ping.wait(self.ping_interval) and self.keep_running is True: + if self.sock: + self.last_ping_tm = time.time() + try: + _logging.debug("Sending ping") + self.sock.ping(self.ping_payload) + except Exception as e: + _logging.debug(f"Failed to send ping: {e}") + + def run_forever( + self, + sockopt: tuple = None, + sslopt: dict = None, + ping_interval: Union[float, int] = 0, + ping_timeout: Union[float, int, None] = None, + ping_payload: str = "", + http_proxy_host: str = None, + http_proxy_port: Union[int, str] = None, + http_no_proxy: list = None, + http_proxy_auth: tuple = None, + http_proxy_timeout: Optional[float] = None, + skip_utf8_validation: bool = False, + host: str = None, + origin: str = None, + dispatcher=None, + suppress_origin: bool = False, + proxy_type: str = None, + reconnect: int = None, + ) -> bool: + """ + Run event loop for WebSocket framework. + + This loop is an infinite loop and is alive while websocket is available. + + Parameters + ---------- + sockopt: tuple + Values for socket.setsockopt. + sockopt must be tuple + and each element is argument of sock.setsockopt. + sslopt: dict + Optional dict object for ssl socket option. + ping_interval: int or float + Automatically send "ping" command + every specified period (in seconds). + If set to 0, no ping is sent periodically. + ping_timeout: int or float + Timeout (in seconds) if the pong message is not received. + ping_payload: str + Payload message to send with each ping. + http_proxy_host: str + HTTP proxy host name. + http_proxy_port: int or str + HTTP proxy port. If not set, set to 80. + http_no_proxy: list + Whitelisted host names that don't use the proxy. + http_proxy_timeout: int or float + HTTP proxy timeout, default is 60 sec as per python-socks. + http_proxy_auth: tuple + HTTP proxy auth information. tuple of username and password. Default is None. + skip_utf8_validation: bool + skip utf8 validation. + host: str + update host header. + origin: str + update origin header. + dispatcher: Dispatcher object + customize reading data from socket. + suppress_origin: bool + suppress outputting origin header. + proxy_type: str + type of proxy from: http, socks4, socks4a, socks5, socks5h + reconnect: int + delay interval when reconnecting + + Returns + ------- + teardown: bool + False if the `WebSocketApp` is closed or caught KeyboardInterrupt, + True if any other exception was raised during a loop. + """ + + if reconnect is None: + reconnect = RECONNECT + + if ping_timeout is not None and ping_timeout <= 0: + raise WebSocketException("Ensure ping_timeout > 0") + if ping_interval is not None and ping_interval < 0: + raise WebSocketException("Ensure ping_interval >= 0") + if ping_timeout and ping_interval and ping_interval <= ping_timeout: + raise WebSocketException("Ensure ping_interval > ping_timeout") + if not sockopt: + sockopt = () + if not sslopt: + sslopt = {} + if self.sock: + raise WebSocketException("socket is already opened") + + self.ping_interval = ping_interval + self.ping_timeout = ping_timeout + self.ping_payload = ping_payload + self.has_done_teardown = False + self.keep_running = True + + def teardown(close_frame: ABNF = None): + """ + Tears down the connection. + + Parameters + ---------- + close_frame: ABNF frame + If close_frame is set, the on_close handler is invoked + with the statusCode and reason from the provided frame. + """ + + # teardown() is called in many code paths to ensure resources are cleaned up and on_close is fired. + # To ensure the work is only done once, we use this bool and lock. + with self.has_done_teardown_lock: + if self.has_done_teardown: + return + self.has_done_teardown = True + + self._stop_ping_thread() + self.keep_running = False + if self.sock: + self.sock.close() + close_status_code, close_reason = self._get_close_args( + close_frame if close_frame else None + ) + self.sock = None + + # Finally call the callback AFTER all teardown is complete + self._callback(self.on_close, close_status_code, close_reason) + + def setSock(reconnecting: bool = False) -> None: + if reconnecting and self.sock: + self.sock.shutdown() + + self.sock = WebSocket( + self.get_mask_key, + sockopt=sockopt, + sslopt=sslopt, + fire_cont_frame=self.on_cont_message is not None, + skip_utf8_validation=skip_utf8_validation, + enable_multithread=True, + ) + + self.sock.settimeout(getdefaulttimeout()) + try: + header = self.header() if callable(self.header) else self.header + + self.sock.connect( + self.url, + header=header, + cookie=self.cookie, + http_proxy_host=http_proxy_host, + http_proxy_port=http_proxy_port, + http_no_proxy=http_no_proxy, + http_proxy_auth=http_proxy_auth, + http_proxy_timeout=http_proxy_timeout, + subprotocols=self.subprotocols, + host=host, + origin=origin, + suppress_origin=suppress_origin, + proxy_type=proxy_type, + socket=self.prepared_socket, + ) + + _logging.info("Websocket connected") + + if self.ping_interval: + self._start_ping_thread() + + if reconnecting and self.on_reconnect: + self._callback(self.on_reconnect) + else: + self._callback(self.on_open) + + dispatcher.read(self.sock.sock, read, check) + except ( + WebSocketConnectionClosedException, + ConnectionRefusedError, + KeyboardInterrupt, + SystemExit, + Exception, + ) as e: + handleDisconnect(e, reconnecting) + + def read() -> bool: + if not self.keep_running: + return teardown() + + try: + op_code, frame = self.sock.recv_data_frame(True) + except ( + WebSocketConnectionClosedException, + KeyboardInterrupt, + SSLEOFError, + ) as e: + if custom_dispatcher: + return handleDisconnect(e, bool(reconnect)) + else: + raise e + + if op_code == ABNF.OPCODE_CLOSE: + return teardown(frame) + elif op_code == ABNF.OPCODE_PING: + self._callback(self.on_ping, frame.data) + elif op_code == ABNF.OPCODE_PONG: + self.last_pong_tm = time.time() + self._callback(self.on_pong, frame.data) + elif op_code == ABNF.OPCODE_CONT and self.on_cont_message: + self._callback(self.on_data, frame.data, frame.opcode, frame.fin) + self._callback(self.on_cont_message, frame.data, frame.fin) + else: + data = frame.data + if op_code == ABNF.OPCODE_TEXT and not skip_utf8_validation: + data = data.decode("utf-8") + self._callback(self.on_data, data, frame.opcode, True) + self._callback(self.on_message, data) + + return True + + def check() -> bool: + if self.ping_timeout: + has_timeout_expired = ( + time.time() - self.last_ping_tm > self.ping_timeout + ) + has_pong_not_arrived_after_last_ping = ( + self.last_pong_tm - self.last_ping_tm < 0 + ) + has_pong_arrived_too_late = ( + self.last_pong_tm - self.last_ping_tm > self.ping_timeout + ) + + if ( + self.last_ping_tm + and has_timeout_expired + and ( + has_pong_not_arrived_after_last_ping + or has_pong_arrived_too_late + ) + ): + raise WebSocketTimeoutException("ping/pong timed out") + return True + + def handleDisconnect( + e: Union[ + WebSocketConnectionClosedException, + ConnectionRefusedError, + KeyboardInterrupt, + SystemExit, + Exception, + ], + reconnecting: bool = False, + ) -> bool: + self.has_errored = True + self._stop_ping_thread() + if not reconnecting: + self._callback(self.on_error, e) + + if isinstance(e, (KeyboardInterrupt, SystemExit)): + teardown() + # Propagate further + raise + + if reconnect: + _logging.info(f"{e} - reconnect") + if custom_dispatcher: + _logging.debug( + f"Calling custom dispatcher reconnect [{len(inspect.stack())} frames in stack]" + ) + dispatcher.reconnect(reconnect, setSock) + else: + _logging.error(f"{e} - goodbye") + teardown() + + custom_dispatcher = bool(dispatcher) + dispatcher = self.create_dispatcher( + ping_timeout, dispatcher, parse_url(self.url)[3] + ) + + try: + setSock() + if not custom_dispatcher and reconnect: + while self.keep_running: + _logging.debug( + f"Calling dispatcher reconnect [{len(inspect.stack())} frames in stack]" + ) + dispatcher.reconnect(reconnect, setSock) + except (KeyboardInterrupt, Exception) as e: + _logging.info(f"tearing down on exception {e}") + teardown() + finally: + if not custom_dispatcher: + # Ensure teardown was called before returning from run_forever + teardown() + + return self.has_errored + + def create_dispatcher( + self, + ping_timeout: Union[float, int, None], + dispatcher: Optional[DispatcherBase] = None, + is_ssl: bool = False, + ) -> Union[Dispatcher, SSLDispatcher, WrappedDispatcher]: + if dispatcher: # If custom dispatcher is set, use WrappedDispatcher + return WrappedDispatcher(self, ping_timeout, dispatcher) + timeout = ping_timeout or 10 + if is_ssl: + return SSLDispatcher(self, timeout) + return Dispatcher(self, timeout) + + def _get_close_args(self, close_frame: ABNF) -> list: + """ + _get_close_args extracts the close code and reason from the close body + if it exists (RFC6455 says WebSocket Connection Close Code is optional) + """ + # Need to catch the case where close_frame is None + # Otherwise the following if statement causes an error + if not self.on_close or not close_frame: + return [None, None] + + # Extract close frame status code + if close_frame.data and len(close_frame.data) >= 2: + close_status_code = 256 * int(close_frame.data[0]) + int( + close_frame.data[1] + ) + reason = close_frame.data[2:] + if isinstance(reason, bytes): + reason = reason.decode("utf-8") + return [close_status_code, reason] + else: + # Most likely reached this because len(close_frame_data.data) < 2 + return [None, None] + + def _callback(self, callback, *args) -> None: + if callback: + try: + callback(self, *args) + + except Exception as e: + _logging.error(f"error from callback {callback}: {e}") + if self.on_error: + self.on_error(self, e) |