diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-12-09 18:25:21 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-12-09 19:18:57 +0300 |
commit | 13374e0884578812cda7697d0c5680122db59a37 (patch) | |
tree | 30a022eb841035299deb2b8c393b2902f0c21735 /contrib/python/websocket-client/websocket/_wsdump.py | |
parent | c7ade6d3bf7cd492235a61b77153351e422a28f3 (diff) | |
download | ydb-13374e0884578812cda7697d0c5680122db59a37.tar.gz |
Intermediate changes
commit_hash:034150f557268506d7bc0cbd8b5becf65f765593
Diffstat (limited to 'contrib/python/websocket-client/websocket/_wsdump.py')
-rw-r--r-- | contrib/python/websocket-client/websocket/_wsdump.py | 244 |
1 files changed, 244 insertions, 0 deletions
diff --git a/contrib/python/websocket-client/websocket/_wsdump.py b/contrib/python/websocket-client/websocket/_wsdump.py new file mode 100644 index 0000000000..d4d76dc509 --- /dev/null +++ b/contrib/python/websocket-client/websocket/_wsdump.py @@ -0,0 +1,244 @@ +#!/usr/bin/env python3 + +""" +wsdump.py +websocket - WebSocket client library for Python + +Copyright 2024 engn33r + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import argparse +import code +import gzip +import ssl +import sys +import threading +import time +import zlib +from urllib.parse import urlparse + +import websocket + +try: + import readline +except ImportError: + pass + + +def get_encoding() -> str: + encoding = getattr(sys.stdin, "encoding", "") + if not encoding: + return "utf-8" + else: + return encoding.lower() + + +OPCODE_DATA = (websocket.ABNF.OPCODE_TEXT, websocket.ABNF.OPCODE_BINARY) +ENCODING = get_encoding() + + +class VAction(argparse.Action): + def __call__( + self, + parser: argparse.Namespace, + args: tuple, + values: str, + option_string: str = None, + ) -> None: + if values is None: + values = "1" + try: + values = int(values) + except ValueError: + values = values.count("v") + 1 + setattr(args, self.dest, values) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="WebSocket Simple Dump Tool") + parser.add_argument( + "url", metavar="ws_url", help="websocket url. ex. ws://echo.websocket.events/" + ) + parser.add_argument("-p", "--proxy", help="proxy url. ex. http://127.0.0.1:8080") + parser.add_argument( + "-v", + "--verbose", + default=0, + nargs="?", + action=VAction, + dest="verbose", + help="set verbose mode. If set to 1, show opcode. " + "If set to 2, enable to trace websocket module", + ) + parser.add_argument( + "-n", "--nocert", action="store_true", help="Ignore invalid SSL cert" + ) + parser.add_argument("-r", "--raw", action="store_true", help="raw output") + parser.add_argument("-s", "--subprotocols", nargs="*", help="Set subprotocols") + parser.add_argument("-o", "--origin", help="Set origin") + parser.add_argument( + "--eof-wait", + default=0, + type=int, + help="wait time(second) after 'EOF' received.", + ) + parser.add_argument("-t", "--text", help="Send initial text") + parser.add_argument( + "--timings", action="store_true", help="Print timings in seconds" + ) + parser.add_argument("--headers", help="Set custom headers. Use ',' as separator") + + return parser.parse_args() + + +class RawInput: + def raw_input(self, prompt: str = "") -> str: + line = input(prompt) + + if ENCODING and ENCODING != "utf-8" and not isinstance(line, str): + line = line.decode(ENCODING).encode("utf-8") + elif isinstance(line, str): + line = line.encode("utf-8") + + return line + + +class InteractiveConsole(RawInput, code.InteractiveConsole): + def write(self, data: str) -> None: + sys.stdout.write("\033[2K\033[E") + # sys.stdout.write("\n") + sys.stdout.write("\033[34m< " + data + "\033[39m") + sys.stdout.write("\n> ") + sys.stdout.flush() + + def read(self) -> str: + return self.raw_input("> ") + + +class NonInteractive(RawInput): + def write(self, data: str) -> None: + sys.stdout.write(data) + sys.stdout.write("\n") + sys.stdout.flush() + + def read(self) -> str: + return self.raw_input("") + + +def main() -> None: + start_time = time.time() + args = parse_args() + if args.verbose > 1: + websocket.enableTrace(True) + options = {} + if args.proxy: + p = urlparse(args.proxy) + options["http_proxy_host"] = p.hostname + options["http_proxy_port"] = p.port + if args.origin: + options["origin"] = args.origin + if args.subprotocols: + options["subprotocols"] = args.subprotocols + opts = {} + if args.nocert: + opts = {"cert_reqs": ssl.CERT_NONE, "check_hostname": False} + if args.headers: + options["header"] = list(map(str.strip, args.headers.split(","))) + ws = websocket.create_connection(args.url, sslopt=opts, **options) + if args.raw: + console = NonInteractive() + else: + console = InteractiveConsole() + print("Press Ctrl+C to quit") + + def recv() -> tuple: + try: + frame = ws.recv_frame() + except websocket.WebSocketException: + return websocket.ABNF.OPCODE_CLOSE, "" + if not frame: + raise websocket.WebSocketException(f"Not a valid frame {frame}") + elif frame.opcode in OPCODE_DATA: + return frame.opcode, frame.data + elif frame.opcode == websocket.ABNF.OPCODE_CLOSE: + ws.send_close() + return frame.opcode, "" + elif frame.opcode == websocket.ABNF.OPCODE_PING: + ws.pong(frame.data) + return frame.opcode, frame.data + + return frame.opcode, frame.data + + def recv_ws() -> None: + while True: + opcode, data = recv() + msg = None + if opcode == websocket.ABNF.OPCODE_TEXT and isinstance(data, bytes): + data = str(data, "utf-8") + if ( + isinstance(data, bytes) and len(data) > 2 and data[:2] == b"\037\213" + ): # gzip magick + try: + data = "[gzip] " + str(gzip.decompress(data), "utf-8") + except: + pass + elif isinstance(data, bytes): + try: + data = "[zlib] " + str( + zlib.decompress(data, -zlib.MAX_WBITS), "utf-8" + ) + except: + pass + + if isinstance(data, bytes): + data = repr(data) + + if args.verbose: + msg = f"{websocket.ABNF.OPCODE_MAP.get(opcode)}: {data}" + else: + msg = data + + if msg is not None: + if args.timings: + console.write(f"{time.time() - start_time}: {msg}") + else: + console.write(msg) + + if opcode == websocket.ABNF.OPCODE_CLOSE: + break + + thread = threading.Thread(target=recv_ws) + thread.daemon = True + thread.start() + + if args.text: + ws.send(args.text) + + while True: + try: + message = console.read() + ws.send(message) + except KeyboardInterrupt: + return + except EOFError: + time.sleep(args.eof_wait) + return + + +if __name__ == "__main__": + try: + main() + except Exception as e: + print(e) |