diff options
author | AlexSm <alex@ydb.tech> | 2023-12-21 15:05:38 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-21 15:05:38 +0100 |
commit | e98bcbc74422492351c51646dba3849a138a8ffc (patch) | |
tree | 38ad7a09b1f9c201ce8a7e3d69f2017388769224 /contrib/python/websocket-client/py3/websocket/_handshake.py | |
parent | 559d7083cd8378cb25b9e966dedcca21d413e338 (diff) | |
download | ydb-e98bcbc74422492351c51646dba3849a138a8ffc.tar.gz |
Import libs 1 (#590)
* Import libs 1
* Add new file without extension
* Add file missed in export config
Diffstat (limited to 'contrib/python/websocket-client/py3/websocket/_handshake.py')
-rw-r--r-- | contrib/python/websocket-client/py3/websocket/_handshake.py | 109 |
1 files changed, 57 insertions, 52 deletions
diff --git a/contrib/python/websocket-client/py3/websocket/_handshake.py b/contrib/python/websocket-client/py3/websocket/_handshake.py index a94d3030c3..e63dc97931 100644 --- a/contrib/python/websocket-client/py3/websocket/_handshake.py +++ b/contrib/python/websocket-client/py3/websocket/_handshake.py @@ -20,7 +20,8 @@ import hashlib import hmac import os from base64 import encodebytes as base64encode -from http import client as HTTPStatus +from http import HTTPStatus + from ._cookiejar import SimpleCookieJar from ._exceptions import * from ._http import * @@ -32,14 +33,19 @@ __all__ = ["handshake_response", "handshake", "SUPPORTED_REDIRECT_STATUSES"] # websocket supported version. VERSION = 13 -SUPPORTED_REDIRECT_STATUSES = (HTTPStatus.MOVED_PERMANENTLY, HTTPStatus.FOUND, HTTPStatus.SEE_OTHER, HTTPStatus.TEMPORARY_REDIRECT, HTTPStatus.PERMANENT_REDIRECT) +SUPPORTED_REDIRECT_STATUSES = ( + HTTPStatus.MOVED_PERMANENTLY, + HTTPStatus.FOUND, + HTTPStatus.SEE_OTHER, + HTTPStatus.TEMPORARY_REDIRECT, + HTTPStatus.PERMANENT_REDIRECT, +) SUCCESS_STATUSES = SUPPORTED_REDIRECT_STATUSES + (HTTPStatus.SWITCHING_PROTOCOLS,) CookieJar = SimpleCookieJar() class handshake_response: - def __init__(self, status: int, headers: dict, subprotocol): self.status = status self.headers = headers @@ -47,7 +53,9 @@ class handshake_response: CookieJar.add(headers.get("set-cookie")) -def handshake(sock, url: str, hostname: str, port: int, resource: str, **options): +def handshake( + sock, url: str, hostname: str, port: int, resource: str, **options +) -> handshake_response: headers, key = _get_handshake_headers(resource, url, hostname, port, options) header_str = "\r\n".join(headers) @@ -66,74 +74,64 @@ def handshake(sock, url: str, hostname: str, port: int, resource: str, **options def _pack_hostname(hostname: str) -> str: # IPv6 address - if ':' in hostname: - return '[' + hostname + ']' - + if ":" in hostname: + return f"[{hostname}]" return hostname -def _get_handshake_headers(resource: str, url: str, host: str, port: int, options: dict): - headers = [ - "GET {resource} HTTP/1.1".format(resource=resource), - "Upgrade: websocket" - ] - if port == 80 or port == 443: +def _get_handshake_headers( + resource: str, url: str, host: str, port: int, options: dict +) -> tuple: + headers = [f"GET {resource} HTTP/1.1", "Upgrade: websocket"] + if port in [80, 443]: hostport = _pack_hostname(host) else: - hostport = "{h}:{p}".format(h=_pack_hostname(host), p=port) + hostport = f"{_pack_hostname(host)}:{port}" if options.get("host"): - headers.append("Host: {h}".format(h=options["host"])) + headers.append(f'Host: {options["host"]}') else: - headers.append("Host: {hp}".format(hp=hostport)) + headers.append(f"Host: {hostport}") # scheme indicates whether http or https is used in Origin # The same approach is used in parse_url of _url.py to set default port scheme, url = url.split(":", 1) if not options.get("suppress_origin"): if "origin" in options and options["origin"] is not None: - headers.append("Origin: {origin}".format(origin=options["origin"])) + headers.append(f'Origin: {options["origin"]}') elif scheme == "wss": - headers.append("Origin: https://{hp}".format(hp=hostport)) + headers.append(f"Origin: https://{hostport}") else: - headers.append("Origin: http://{hp}".format(hp=hostport)) + headers.append(f"Origin: http://{hostport}") key = _create_sec_websocket_key() # Append Sec-WebSocket-Key & Sec-WebSocket-Version if not manually specified - if not options.get('header') or 'Sec-WebSocket-Key' not in options['header']: - headers.append("Sec-WebSocket-Key: {key}".format(key=key)) + if not options.get("header") or "Sec-WebSocket-Key" not in options["header"]: + headers.append(f"Sec-WebSocket-Key: {key}") else: - key = options['header']['Sec-WebSocket-Key'] + key = options["header"]["Sec-WebSocket-Key"] - if not options.get('header') or 'Sec-WebSocket-Version' not in options['header']: - headers.append("Sec-WebSocket-Version: {version}".format(version=VERSION)) + if not options.get("header") or "Sec-WebSocket-Version" not in options["header"]: + headers.append(f"Sec-WebSocket-Version: {VERSION}") - if not options.get('connection'): - headers.append('Connection: Upgrade') + if not options.get("connection"): + headers.append("Connection: Upgrade") else: - headers.append(options['connection']) + headers.append(options["connection"]) - subprotocols = options.get("subprotocols") - if subprotocols: - headers.append("Sec-WebSocket-Protocol: {protocols}".format(protocols=",".join(subprotocols))) + if subprotocols := options.get("subprotocols"): + headers.append(f'Sec-WebSocket-Protocol: {",".join(subprotocols)}') - header = options.get("header") - if header: + if header := options.get("header"): if isinstance(header, dict): - header = [ - ": ".join([k, v]) - for k, v in header.items() - if v is not None - ] + header = [": ".join([k, v]) for k, v in header.items() if v is not None] headers.extend(header) server_cookie = CookieJar.get(host) client_cookie = options.get("cookie", None) - cookie = "; ".join(filter(None, [server_cookie, client_cookie])) - - if cookie: - headers.append("Cookie: {cookie}".format(cookie=cookie)) + if cookie := "; ".join(filter(None, [server_cookie, client_cookie])): + headers.append(f"Cookie: {cookie}") headers.extend(("", "")) return headers, key @@ -142,12 +140,20 @@ def _get_handshake_headers(resource: str, url: str, host: str, port: int, option def _get_resp_headers(sock, success_statuses: tuple = SUCCESS_STATUSES) -> tuple: status, resp_headers, status_message = read_headers(sock) if status not in success_statuses: - content_len = resp_headers.get('content-length') + content_len = resp_headers.get("content-length") if content_len: - response_body = sock.recv(int(content_len)) # read the body of the HTTP error message response and include it in the exception + response_body = sock.recv( + int(content_len) + ) # read the body of the HTTP error message response and include it in the exception else: response_body = None - raise WebSocketBadStatusException("Handshake status {status} {message} -+-+- {headers} -+-+- {body}".format(status=status, message=status_message, headers=resp_headers, body=response_body), status, status_message, resp_headers, response_body) + raise WebSocketBadStatusException( + f"Handshake status {status} {status_message} -+-+- {resp_headers} -+-+- {response_body}", + status, + status_message, + resp_headers, + response_body, + ) return status, resp_headers @@ -157,20 +163,20 @@ _HEADERS_TO_CHECK = { } -def _validate(headers, key: str, subprotocols): +def _validate(headers, key: str, subprotocols) -> tuple: subproto = None for k, v in _HEADERS_TO_CHECK.items(): r = headers.get(k, None) if not r: return False, None - r = [x.strip().lower() for x in r.split(',')] + r = [x.strip().lower() for x in r.split(",")] if v not in r: return False, None if subprotocols: subproto = headers.get("sec-websocket-protocol", None) if not subproto or subproto.lower() not in [s.lower() for s in subprotocols]: - error("Invalid subprotocol: " + str(subprotocols)) + error(f"Invalid subprotocol: {subprotocols}") return False, None subproto = subproto.lower() @@ -180,13 +186,12 @@ def _validate(headers, key: str, subprotocols): result = result.lower() if isinstance(result, str): - result = result.encode('utf-8') + result = result.encode("utf-8") - value = (key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").encode('utf-8') + value = f"{key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11".encode("utf-8") hashed = base64encode(hashlib.sha1(value).digest()).strip().lower() - success = hmac.compare_digest(hashed, result) - if success: + if hmac.compare_digest(hashed, result): return True, subproto else: return False, None @@ -194,4 +199,4 @@ def _validate(headers, key: str, subprotocols): def _create_sec_websocket_key() -> str: randomness = os.urandom(16) - return base64encode(randomness).decode('utf-8').strip() + return base64encode(randomness).decode("utf-8").strip() |