summaryrefslogtreecommitdiffstats
path: root/contrib/python/websocket-client/websocket/_url.py
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/python/websocket-client/websocket/_url.py')
-rw-r--r--contrib/python/websocket-client/websocket/_url.py190
1 files changed, 190 insertions, 0 deletions
diff --git a/contrib/python/websocket-client/websocket/_url.py b/contrib/python/websocket-client/websocket/_url.py
new file mode 100644
index 00000000000..902131710ba
--- /dev/null
+++ b/contrib/python/websocket-client/websocket/_url.py
@@ -0,0 +1,190 @@
+import os
+import socket
+import struct
+from typing import Optional
+from urllib.parse import unquote, urlparse
+from ._exceptions import WebSocketProxyException
+
+"""
+_url.py
+websocket - WebSocket client library for Python
+
+Copyright 2024 engn33r
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+__all__ = ["parse_url", "get_proxy_info"]
+
+
+def parse_url(url: str) -> tuple:
+ """
+ parse url and the result is tuple of
+ (hostname, port, resource path and the flag of secure mode)
+
+ Parameters
+ ----------
+ url: str
+ url string.
+ """
+ if ":" not in url:
+ raise ValueError("url is invalid")
+
+ scheme, url = url.split(":", 1)
+
+ parsed = urlparse(url, scheme="http")
+ if parsed.hostname:
+ hostname = parsed.hostname
+ else:
+ raise ValueError("hostname is invalid")
+ port = 0
+ if parsed.port:
+ port = parsed.port
+
+ is_secure = False
+ if scheme == "ws":
+ if not port:
+ port = 80
+ elif scheme == "wss":
+ is_secure = True
+ if not port:
+ port = 443
+ else:
+ raise ValueError("scheme %s is invalid" % scheme)
+
+ if parsed.path:
+ resource = parsed.path
+ else:
+ resource = "/"
+
+ if parsed.query:
+ resource += f"?{parsed.query}"
+
+ return hostname, port, resource, is_secure
+
+
+DEFAULT_NO_PROXY_HOST = ["localhost", "127.0.0.1"]
+
+
+def _is_ip_address(addr: str) -> bool:
+ try:
+ socket.inet_aton(addr)
+ except socket.error:
+ return False
+ else:
+ return True
+
+
+def _is_subnet_address(hostname: str) -> bool:
+ try:
+ addr, netmask = hostname.split("/")
+ return _is_ip_address(addr) and 0 <= int(netmask) < 32
+ except ValueError:
+ return False
+
+
+def _is_address_in_network(ip: str, net: str) -> bool:
+ ipaddr: int = struct.unpack("!I", socket.inet_aton(ip))[0]
+ netaddr, netmask = net.split("/")
+ netaddr: int = struct.unpack("!I", socket.inet_aton(netaddr))[0]
+
+ netmask = (0xFFFFFFFF << (32 - int(netmask))) & 0xFFFFFFFF
+ return ipaddr & netmask == netaddr
+
+
+def _is_no_proxy_host(hostname: str, no_proxy: Optional[list]) -> bool:
+ if not no_proxy:
+ if v := os.environ.get("no_proxy", os.environ.get("NO_PROXY", "")).replace(
+ " ", ""
+ ):
+ no_proxy = v.split(",")
+ if not no_proxy:
+ no_proxy = DEFAULT_NO_PROXY_HOST
+
+ if "*" in no_proxy:
+ return True
+ if hostname in no_proxy:
+ return True
+ if _is_ip_address(hostname):
+ return any(
+ [
+ _is_address_in_network(hostname, subnet)
+ for subnet in no_proxy
+ if _is_subnet_address(subnet)
+ ]
+ )
+ for domain in [domain for domain in no_proxy if domain.startswith(".")]:
+ if hostname.endswith(domain):
+ return True
+ return False
+
+
+def get_proxy_info(
+ hostname: str,
+ is_secure: bool,
+ proxy_host: Optional[str] = None,
+ proxy_port: int = 0,
+ proxy_auth: Optional[tuple] = None,
+ no_proxy: Optional[list] = None,
+ proxy_type: str = "http",
+) -> tuple:
+ """
+ Try to retrieve proxy host and port from environment
+ if not provided in options.
+ Result is (proxy_host, proxy_port, proxy_auth).
+ proxy_auth is tuple of username and password
+ of proxy authentication information.
+
+ Parameters
+ ----------
+ hostname: str
+ Websocket server name.
+ is_secure: bool
+ Is the connection secure? (wss) looks for "https_proxy" in env
+ instead of "http_proxy"
+ proxy_host: str
+ http proxy host name.
+ proxy_port: str or int
+ http proxy port.
+ no_proxy: list
+ Whitelisted host names that don't use the proxy.
+ proxy_auth: tuple
+ HTTP proxy auth information. Tuple of username and password. Default is None.
+ proxy_type: str
+ Specify the proxy protocol (http, socks4, socks4a, socks5, socks5h). Default is "http".
+ Use socks4a or socks5h if you want to send DNS requests through the proxy.
+ """
+ if _is_no_proxy_host(hostname, no_proxy):
+ return None, 0, None
+
+ if proxy_host:
+ if not proxy_port:
+ raise WebSocketProxyException("Cannot use port 0 when proxy_host specified")
+ port = proxy_port
+ auth = proxy_auth
+ return proxy_host, port, auth
+
+ env_key = "https_proxy" if is_secure else "http_proxy"
+ value = os.environ.get(env_key, os.environ.get(env_key.upper(), "")).replace(
+ " ", ""
+ )
+ if value:
+ proxy = urlparse(value)
+ auth = (
+ (unquote(proxy.username), unquote(proxy.password))
+ if proxy.username
+ else None
+ )
+ return proxy.hostname, proxy.port, auth
+
+ return None, 0, None