aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/websocket-client/py3/websocket/_wsdump.py
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-12-09 18:25:21 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-12-09 19:18:57 +0300
commit13374e0884578812cda7697d0c5680122db59a37 (patch)
tree30a022eb841035299deb2b8c393b2902f0c21735 /contrib/python/websocket-client/py3/websocket/_wsdump.py
parentc7ade6d3bf7cd492235a61b77153351e422a28f3 (diff)
downloadydb-13374e0884578812cda7697d0c5680122db59a37.tar.gz
Intermediate changes
commit_hash:034150f557268506d7bc0cbd8b5becf65f765593
Diffstat (limited to 'contrib/python/websocket-client/py3/websocket/_wsdump.py')
-rw-r--r--contrib/python/websocket-client/py3/websocket/_wsdump.py244
1 files changed, 0 insertions, 244 deletions
diff --git a/contrib/python/websocket-client/py3/websocket/_wsdump.py b/contrib/python/websocket-client/py3/websocket/_wsdump.py
deleted file mode 100644
index d4d76dc509e..00000000000
--- a/contrib/python/websocket-client/py3/websocket/_wsdump.py
+++ /dev/null
@@ -1,244 +0,0 @@
-#!/usr/bin/env python3
-
-"""
-wsdump.py
-websocket - WebSocket client library for Python
-
-Copyright 2024 engn33r
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-"""
-
-import argparse
-import code
-import gzip
-import ssl
-import sys
-import threading
-import time
-import zlib
-from urllib.parse import urlparse
-
-import websocket
-
-try:
- import readline
-except ImportError:
- pass
-
-
-def get_encoding() -> str:
- encoding = getattr(sys.stdin, "encoding", "")
- if not encoding:
- return "utf-8"
- else:
- return encoding.lower()
-
-
-OPCODE_DATA = (websocket.ABNF.OPCODE_TEXT, websocket.ABNF.OPCODE_BINARY)
-ENCODING = get_encoding()
-
-
-class VAction(argparse.Action):
- def __call__(
- self,
- parser: argparse.Namespace,
- args: tuple,
- values: str,
- option_string: str = None,
- ) -> None:
- if values is None:
- values = "1"
- try:
- values = int(values)
- except ValueError:
- values = values.count("v") + 1
- setattr(args, self.dest, values)
-
-
-def parse_args() -> argparse.Namespace:
- parser = argparse.ArgumentParser(description="WebSocket Simple Dump Tool")
- parser.add_argument(
- "url", metavar="ws_url", help="websocket url. ex. ws://echo.websocket.events/"
- )
- parser.add_argument("-p", "--proxy", help="proxy url. ex. http://127.0.0.1:8080")
- parser.add_argument(
- "-v",
- "--verbose",
- default=0,
- nargs="?",
- action=VAction,
- dest="verbose",
- help="set verbose mode. If set to 1, show opcode. "
- "If set to 2, enable to trace websocket module",
- )
- parser.add_argument(
- "-n", "--nocert", action="store_true", help="Ignore invalid SSL cert"
- )
- parser.add_argument("-r", "--raw", action="store_true", help="raw output")
- parser.add_argument("-s", "--subprotocols", nargs="*", help="Set subprotocols")
- parser.add_argument("-o", "--origin", help="Set origin")
- parser.add_argument(
- "--eof-wait",
- default=0,
- type=int,
- help="wait time(second) after 'EOF' received.",
- )
- parser.add_argument("-t", "--text", help="Send initial text")
- parser.add_argument(
- "--timings", action="store_true", help="Print timings in seconds"
- )
- parser.add_argument("--headers", help="Set custom headers. Use ',' as separator")
-
- return parser.parse_args()
-
-
-class RawInput:
- def raw_input(self, prompt: str = "") -> str:
- line = input(prompt)
-
- if ENCODING and ENCODING != "utf-8" and not isinstance(line, str):
- line = line.decode(ENCODING).encode("utf-8")
- elif isinstance(line, str):
- line = line.encode("utf-8")
-
- return line
-
-
-class InteractiveConsole(RawInput, code.InteractiveConsole):
- def write(self, data: str) -> None:
- sys.stdout.write("\033[2K\033[E")
- # sys.stdout.write("\n")
- sys.stdout.write("\033[34m< " + data + "\033[39m")
- sys.stdout.write("\n> ")
- sys.stdout.flush()
-
- def read(self) -> str:
- return self.raw_input("> ")
-
-
-class NonInteractive(RawInput):
- def write(self, data: str) -> None:
- sys.stdout.write(data)
- sys.stdout.write("\n")
- sys.stdout.flush()
-
- def read(self) -> str:
- return self.raw_input("")
-
-
-def main() -> None:
- start_time = time.time()
- args = parse_args()
- if args.verbose > 1:
- websocket.enableTrace(True)
- options = {}
- if args.proxy:
- p = urlparse(args.proxy)
- options["http_proxy_host"] = p.hostname
- options["http_proxy_port"] = p.port
- if args.origin:
- options["origin"] = args.origin
- if args.subprotocols:
- options["subprotocols"] = args.subprotocols
- opts = {}
- if args.nocert:
- opts = {"cert_reqs": ssl.CERT_NONE, "check_hostname": False}
- if args.headers:
- options["header"] = list(map(str.strip, args.headers.split(",")))
- ws = websocket.create_connection(args.url, sslopt=opts, **options)
- if args.raw:
- console = NonInteractive()
- else:
- console = InteractiveConsole()
- print("Press Ctrl+C to quit")
-
- def recv() -> tuple:
- try:
- frame = ws.recv_frame()
- except websocket.WebSocketException:
- return websocket.ABNF.OPCODE_CLOSE, ""
- if not frame:
- raise websocket.WebSocketException(f"Not a valid frame {frame}")
- elif frame.opcode in OPCODE_DATA:
- return frame.opcode, frame.data
- elif frame.opcode == websocket.ABNF.OPCODE_CLOSE:
- ws.send_close()
- return frame.opcode, ""
- elif frame.opcode == websocket.ABNF.OPCODE_PING:
- ws.pong(frame.data)
- return frame.opcode, frame.data
-
- return frame.opcode, frame.data
-
- def recv_ws() -> None:
- while True:
- opcode, data = recv()
- msg = None
- if opcode == websocket.ABNF.OPCODE_TEXT and isinstance(data, bytes):
- data = str(data, "utf-8")
- if (
- isinstance(data, bytes) and len(data) > 2 and data[:2] == b"\037\213"
- ): # gzip magick
- try:
- data = "[gzip] " + str(gzip.decompress(data), "utf-8")
- except:
- pass
- elif isinstance(data, bytes):
- try:
- data = "[zlib] " + str(
- zlib.decompress(data, -zlib.MAX_WBITS), "utf-8"
- )
- except:
- pass
-
- if isinstance(data, bytes):
- data = repr(data)
-
- if args.verbose:
- msg = f"{websocket.ABNF.OPCODE_MAP.get(opcode)}: {data}"
- else:
- msg = data
-
- if msg is not None:
- if args.timings:
- console.write(f"{time.time() - start_time}: {msg}")
- else:
- console.write(msg)
-
- if opcode == websocket.ABNF.OPCODE_CLOSE:
- break
-
- thread = threading.Thread(target=recv_ws)
- thread.daemon = True
- thread.start()
-
- if args.text:
- ws.send(args.text)
-
- while True:
- try:
- message = console.read()
- ws.send(message)
- except KeyboardInterrupt:
- return
- except EOFError:
- time.sleep(args.eof_wait)
- return
-
-
-if __name__ == "__main__":
- try:
- main()
- except Exception as e:
- print(e)