aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/websocket-client/py3/websocket/_url.py
diff options
context:
space:
mode:
authorAlexSm <alex@ydb.tech>2023-12-21 15:05:38 +0100
committerGitHub <noreply@github.com>2023-12-21 15:05:38 +0100
commite98bcbc74422492351c51646dba3849a138a8ffc (patch)
tree38ad7a09b1f9c201ce8a7e3d69f2017388769224 /contrib/python/websocket-client/py3/websocket/_url.py
parent559d7083cd8378cb25b9e966dedcca21d413e338 (diff)
downloadydb-e98bcbc74422492351c51646dba3849a138a8ffc.tar.gz
Import libs 1 (#590)
* Import libs 1 * Add new file without extension * Add file missed in export config
Diffstat (limited to 'contrib/python/websocket-client/py3/websocket/_url.py')
-rw-r--r--contrib/python/websocket-client/py3/websocket/_url.py46
1 files changed, 32 insertions, 14 deletions
diff --git a/contrib/python/websocket-client/py3/websocket/_url.py b/contrib/python/websocket-client/py3/websocket/_url.py
index a330615485..7d53830e19 100644
--- a/contrib/python/websocket-client/py3/websocket/_url.py
+++ b/contrib/python/websocket-client/py3/websocket/_url.py
@@ -1,7 +1,6 @@
import os
import socket
import struct
-
from typing import Optional
from urllib.parse import unquote, urlparse
@@ -68,7 +67,7 @@ def parse_url(url: str) -> tuple:
resource = "/"
if parsed.query:
- resource += "?" + parsed.query
+ resource += f"?{parsed.query}"
return hostname, port, resource, is_secure
@@ -94,9 +93,9 @@ def _is_subnet_address(hostname: str) -> bool:
def _is_address_in_network(ip: str, net: str) -> bool:
- ipaddr = struct.unpack('!I', socket.inet_aton(ip))[0]
- netaddr, netmask = net.split('/')
- netaddr = struct.unpack('!I', socket.inet_aton(netaddr))[0]
+ ipaddr: int = struct.unpack("!I", socket.inet_aton(ip))[0]
+ netaddr, netmask = net.split("/")
+ netaddr: int = struct.unpack("!I", socket.inet_aton(netaddr))[0]
netmask = (0xFFFFFFFF << (32 - int(netmask))) & 0xFFFFFFFF
return ipaddr & netmask == netaddr
@@ -104,27 +103,40 @@ def _is_address_in_network(ip: str, net: str) -> bool:
def _is_no_proxy_host(hostname: str, no_proxy: Optional[list]) -> bool:
if not no_proxy:
- v = os.environ.get("no_proxy", os.environ.get("NO_PROXY", "")).replace(" ", "")
- if v:
+ if v := os.environ.get("no_proxy", os.environ.get("NO_PROXY", "")).replace(
+ " ", ""
+ ):
no_proxy = v.split(",")
if not no_proxy:
no_proxy = DEFAULT_NO_PROXY_HOST
- if '*' in no_proxy:
+ if "*" in no_proxy:
return True
if hostname in no_proxy:
return True
if _is_ip_address(hostname):
- return any([_is_address_in_network(hostname, subnet) for subnet in no_proxy if _is_subnet_address(subnet)])
- for domain in [domain for domain in no_proxy if domain.startswith('.')]:
+ return any(
+ [
+ _is_address_in_network(hostname, subnet)
+ for subnet in no_proxy
+ if _is_subnet_address(subnet)
+ ]
+ )
+ for domain in [domain for domain in no_proxy if domain.startswith(".")]:
if hostname.endswith(domain):
return True
return False
def get_proxy_info(
- hostname: str, is_secure: bool, proxy_host: Optional[str] = None, proxy_port: int = 0, proxy_auth: Optional[tuple] = None,
- no_proxy: Optional[list] = None, proxy_type: str = 'http') -> tuple:
+ hostname: str,
+ is_secure: bool,
+ proxy_host: Optional[str] = None,
+ proxy_port: int = 0,
+ proxy_auth: Optional[tuple] = None,
+ no_proxy: Optional[list] = None,
+ proxy_type: str = "http",
+) -> tuple:
"""
Try to retrieve proxy host and port from environment
if not provided in options.
@@ -160,10 +172,16 @@ def get_proxy_info(
return proxy_host, port, auth
env_key = "https_proxy" if is_secure else "http_proxy"
- value = os.environ.get(env_key, os.environ.get(env_key.upper(), "")).replace(" ", "")
+ value = os.environ.get(env_key, os.environ.get(env_key.upper(), "")).replace(
+ " ", ""
+ )
if value:
proxy = urlparse(value)
- auth = (unquote(proxy.username), unquote(proxy.password)) if proxy.username else None
+ auth = (
+ (unquote(proxy.username), unquote(proxy.password))
+ if proxy.username
+ else None
+ )
return proxy.hostname, proxy.port, auth
return None, 0, None