aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/websocket-client/py3/websocket/_handshake.py
diff options
context:
space:
mode:
authorAlexSm <alex@ydb.tech>2023-12-21 15:05:38 +0100
committerGitHub <noreply@github.com>2023-12-21 15:05:38 +0100
commite98bcbc74422492351c51646dba3849a138a8ffc (patch)
tree38ad7a09b1f9c201ce8a7e3d69f2017388769224 /contrib/python/websocket-client/py3/websocket/_handshake.py
parent559d7083cd8378cb25b9e966dedcca21d413e338 (diff)
downloadydb-e98bcbc74422492351c51646dba3849a138a8ffc.tar.gz
Import libs 1 (#590)
* Import libs 1 * Add new file without extension * Add file missed in export config
Diffstat (limited to 'contrib/python/websocket-client/py3/websocket/_handshake.py')
-rw-r--r--contrib/python/websocket-client/py3/websocket/_handshake.py109
1 files changed, 57 insertions, 52 deletions
diff --git a/contrib/python/websocket-client/py3/websocket/_handshake.py b/contrib/python/websocket-client/py3/websocket/_handshake.py
index a94d3030c3..e63dc97931 100644
--- a/contrib/python/websocket-client/py3/websocket/_handshake.py
+++ b/contrib/python/websocket-client/py3/websocket/_handshake.py
@@ -20,7 +20,8 @@ import hashlib
import hmac
import os
from base64 import encodebytes as base64encode
-from http import client as HTTPStatus
+from http import HTTPStatus
+
from ._cookiejar import SimpleCookieJar
from ._exceptions import *
from ._http import *
@@ -32,14 +33,19 @@ __all__ = ["handshake_response", "handshake", "SUPPORTED_REDIRECT_STATUSES"]
# websocket supported version.
VERSION = 13
-SUPPORTED_REDIRECT_STATUSES = (HTTPStatus.MOVED_PERMANENTLY, HTTPStatus.FOUND, HTTPStatus.SEE_OTHER, HTTPStatus.TEMPORARY_REDIRECT, HTTPStatus.PERMANENT_REDIRECT)
+SUPPORTED_REDIRECT_STATUSES = (
+ HTTPStatus.MOVED_PERMANENTLY,
+ HTTPStatus.FOUND,
+ HTTPStatus.SEE_OTHER,
+ HTTPStatus.TEMPORARY_REDIRECT,
+ HTTPStatus.PERMANENT_REDIRECT,
+)
SUCCESS_STATUSES = SUPPORTED_REDIRECT_STATUSES + (HTTPStatus.SWITCHING_PROTOCOLS,)
CookieJar = SimpleCookieJar()
class handshake_response:
-
def __init__(self, status: int, headers: dict, subprotocol):
self.status = status
self.headers = headers
@@ -47,7 +53,9 @@ class handshake_response:
CookieJar.add(headers.get("set-cookie"))
-def handshake(sock, url: str, hostname: str, port: int, resource: str, **options):
+def handshake(
+ sock, url: str, hostname: str, port: int, resource: str, **options
+) -> handshake_response:
headers, key = _get_handshake_headers(resource, url, hostname, port, options)
header_str = "\r\n".join(headers)
@@ -66,74 +74,64 @@ def handshake(sock, url: str, hostname: str, port: int, resource: str, **options
def _pack_hostname(hostname: str) -> str:
# IPv6 address
- if ':' in hostname:
- return '[' + hostname + ']'
-
+ if ":" in hostname:
+ return f"[{hostname}]"
return hostname
-def _get_handshake_headers(resource: str, url: str, host: str, port: int, options: dict):
- headers = [
- "GET {resource} HTTP/1.1".format(resource=resource),
- "Upgrade: websocket"
- ]
- if port == 80 or port == 443:
+def _get_handshake_headers(
+ resource: str, url: str, host: str, port: int, options: dict
+) -> tuple:
+ headers = [f"GET {resource} HTTP/1.1", "Upgrade: websocket"]
+ if port in [80, 443]:
hostport = _pack_hostname(host)
else:
- hostport = "{h}:{p}".format(h=_pack_hostname(host), p=port)
+ hostport = f"{_pack_hostname(host)}:{port}"
if options.get("host"):
- headers.append("Host: {h}".format(h=options["host"]))
+ headers.append(f'Host: {options["host"]}')
else:
- headers.append("Host: {hp}".format(hp=hostport))
+ headers.append(f"Host: {hostport}")
# scheme indicates whether http or https is used in Origin
# The same approach is used in parse_url of _url.py to set default port
scheme, url = url.split(":", 1)
if not options.get("suppress_origin"):
if "origin" in options and options["origin"] is not None:
- headers.append("Origin: {origin}".format(origin=options["origin"]))
+ headers.append(f'Origin: {options["origin"]}')
elif scheme == "wss":
- headers.append("Origin: https://{hp}".format(hp=hostport))
+ headers.append(f"Origin: https://{hostport}")
else:
- headers.append("Origin: http://{hp}".format(hp=hostport))
+ headers.append(f"Origin: http://{hostport}")
key = _create_sec_websocket_key()
# Append Sec-WebSocket-Key & Sec-WebSocket-Version if not manually specified
- if not options.get('header') or 'Sec-WebSocket-Key' not in options['header']:
- headers.append("Sec-WebSocket-Key: {key}".format(key=key))
+ if not options.get("header") or "Sec-WebSocket-Key" not in options["header"]:
+ headers.append(f"Sec-WebSocket-Key: {key}")
else:
- key = options['header']['Sec-WebSocket-Key']
+ key = options["header"]["Sec-WebSocket-Key"]
- if not options.get('header') or 'Sec-WebSocket-Version' not in options['header']:
- headers.append("Sec-WebSocket-Version: {version}".format(version=VERSION))
+ if not options.get("header") or "Sec-WebSocket-Version" not in options["header"]:
+ headers.append(f"Sec-WebSocket-Version: {VERSION}")
- if not options.get('connection'):
- headers.append('Connection: Upgrade')
+ if not options.get("connection"):
+ headers.append("Connection: Upgrade")
else:
- headers.append(options['connection'])
+ headers.append(options["connection"])
- subprotocols = options.get("subprotocols")
- if subprotocols:
- headers.append("Sec-WebSocket-Protocol: {protocols}".format(protocols=",".join(subprotocols)))
+ if subprotocols := options.get("subprotocols"):
+ headers.append(f'Sec-WebSocket-Protocol: {",".join(subprotocols)}')
- header = options.get("header")
- if header:
+ if header := options.get("header"):
if isinstance(header, dict):
- header = [
- ": ".join([k, v])
- for k, v in header.items()
- if v is not None
- ]
+ header = [": ".join([k, v]) for k, v in header.items() if v is not None]
headers.extend(header)
server_cookie = CookieJar.get(host)
client_cookie = options.get("cookie", None)
- cookie = "; ".join(filter(None, [server_cookie, client_cookie]))
-
- if cookie:
- headers.append("Cookie: {cookie}".format(cookie=cookie))
+ if cookie := "; ".join(filter(None, [server_cookie, client_cookie])):
+ headers.append(f"Cookie: {cookie}")
headers.extend(("", ""))
return headers, key
@@ -142,12 +140,20 @@ def _get_handshake_headers(resource: str, url: str, host: str, port: int, option
def _get_resp_headers(sock, success_statuses: tuple = SUCCESS_STATUSES) -> tuple:
status, resp_headers, status_message = read_headers(sock)
if status not in success_statuses:
- content_len = resp_headers.get('content-length')
+ content_len = resp_headers.get("content-length")
if content_len:
- response_body = sock.recv(int(content_len)) # read the body of the HTTP error message response and include it in the exception
+ response_body = sock.recv(
+ int(content_len)
+ ) # read the body of the HTTP error message response and include it in the exception
else:
response_body = None
- raise WebSocketBadStatusException("Handshake status {status} {message} -+-+- {headers} -+-+- {body}".format(status=status, message=status_message, headers=resp_headers, body=response_body), status, status_message, resp_headers, response_body)
+ raise WebSocketBadStatusException(
+ f"Handshake status {status} {status_message} -+-+- {resp_headers} -+-+- {response_body}",
+ status,
+ status_message,
+ resp_headers,
+ response_body,
+ )
return status, resp_headers
@@ -157,20 +163,20 @@ _HEADERS_TO_CHECK = {
}
-def _validate(headers, key: str, subprotocols):
+def _validate(headers, key: str, subprotocols) -> tuple:
subproto = None
for k, v in _HEADERS_TO_CHECK.items():
r = headers.get(k, None)
if not r:
return False, None
- r = [x.strip().lower() for x in r.split(',')]
+ r = [x.strip().lower() for x in r.split(",")]
if v not in r:
return False, None
if subprotocols:
subproto = headers.get("sec-websocket-protocol", None)
if not subproto or subproto.lower() not in [s.lower() for s in subprotocols]:
- error("Invalid subprotocol: " + str(subprotocols))
+ error(f"Invalid subprotocol: {subprotocols}")
return False, None
subproto = subproto.lower()
@@ -180,13 +186,12 @@ def _validate(headers, key: str, subprotocols):
result = result.lower()
if isinstance(result, str):
- result = result.encode('utf-8')
+ result = result.encode("utf-8")
- value = (key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").encode('utf-8')
+ value = f"{key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11".encode("utf-8")
hashed = base64encode(hashlib.sha1(value).digest()).strip().lower()
- success = hmac.compare_digest(hashed, result)
- if success:
+ if hmac.compare_digest(hashed, result):
return True, subproto
else:
return False, None
@@ -194,4 +199,4 @@ def _validate(headers, key: str, subprotocols):
def _create_sec_websocket_key() -> str:
randomness = os.urandom(16)
- return base64encode(randomness).decode('utf-8').strip()
+ return base64encode(randomness).decode("utf-8").strip()