diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-12-09 18:25:21 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-12-09 19:18:57 +0300 |
commit | 13374e0884578812cda7697d0c5680122db59a37 (patch) | |
tree | 30a022eb841035299deb2b8c393b2902f0c21735 /contrib/python/websocket-client/py3/websocket/_wsdump.py | |
parent | c7ade6d3bf7cd492235a61b77153351e422a28f3 (diff) | |
download | ydb-13374e0884578812cda7697d0c5680122db59a37.tar.gz |
Intermediate changes
commit_hash:034150f557268506d7bc0cbd8b5becf65f765593
Diffstat (limited to 'contrib/python/websocket-client/py3/websocket/_wsdump.py')
-rw-r--r-- | contrib/python/websocket-client/py3/websocket/_wsdump.py | 244 |
1 files changed, 0 insertions, 244 deletions
diff --git a/contrib/python/websocket-client/py3/websocket/_wsdump.py b/contrib/python/websocket-client/py3/websocket/_wsdump.py deleted file mode 100644 index d4d76dc509e..00000000000 --- a/contrib/python/websocket-client/py3/websocket/_wsdump.py +++ /dev/null @@ -1,244 +0,0 @@ -#!/usr/bin/env python3 - -""" -wsdump.py -websocket - WebSocket client library for Python - -Copyright 2024 engn33r - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -import argparse -import code -import gzip -import ssl -import sys -import threading -import time -import zlib -from urllib.parse import urlparse - -import websocket - -try: - import readline -except ImportError: - pass - - -def get_encoding() -> str: - encoding = getattr(sys.stdin, "encoding", "") - if not encoding: - return "utf-8" - else: - return encoding.lower() - - -OPCODE_DATA = (websocket.ABNF.OPCODE_TEXT, websocket.ABNF.OPCODE_BINARY) -ENCODING = get_encoding() - - -class VAction(argparse.Action): - def __call__( - self, - parser: argparse.Namespace, - args: tuple, - values: str, - option_string: str = None, - ) -> None: - if values is None: - values = "1" - try: - values = int(values) - except ValueError: - values = values.count("v") + 1 - setattr(args, self.dest, values) - - -def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="WebSocket Simple Dump Tool") - parser.add_argument( - "url", metavar="ws_url", help="websocket url. ex. ws://echo.websocket.events/" - ) - parser.add_argument("-p", "--proxy", help="proxy url. ex. http://127.0.0.1:8080") - parser.add_argument( - "-v", - "--verbose", - default=0, - nargs="?", - action=VAction, - dest="verbose", - help="set verbose mode. If set to 1, show opcode. " - "If set to 2, enable to trace websocket module", - ) - parser.add_argument( - "-n", "--nocert", action="store_true", help="Ignore invalid SSL cert" - ) - parser.add_argument("-r", "--raw", action="store_true", help="raw output") - parser.add_argument("-s", "--subprotocols", nargs="*", help="Set subprotocols") - parser.add_argument("-o", "--origin", help="Set origin") - parser.add_argument( - "--eof-wait", - default=0, - type=int, - help="wait time(second) after 'EOF' received.", - ) - parser.add_argument("-t", "--text", help="Send initial text") - parser.add_argument( - "--timings", action="store_true", help="Print timings in seconds" - ) - parser.add_argument("--headers", help="Set custom headers. Use ',' as separator") - - return parser.parse_args() - - -class RawInput: - def raw_input(self, prompt: str = "") -> str: - line = input(prompt) - - if ENCODING and ENCODING != "utf-8" and not isinstance(line, str): - line = line.decode(ENCODING).encode("utf-8") - elif isinstance(line, str): - line = line.encode("utf-8") - - return line - - -class InteractiveConsole(RawInput, code.InteractiveConsole): - def write(self, data: str) -> None: - sys.stdout.write("\033[2K\033[E") - # sys.stdout.write("\n") - sys.stdout.write("\033[34m< " + data + "\033[39m") - sys.stdout.write("\n> ") - sys.stdout.flush() - - def read(self) -> str: - return self.raw_input("> ") - - -class NonInteractive(RawInput): - def write(self, data: str) -> None: - sys.stdout.write(data) - sys.stdout.write("\n") - sys.stdout.flush() - - def read(self) -> str: - return self.raw_input("") - - -def main() -> None: - start_time = time.time() - args = parse_args() - if args.verbose > 1: - websocket.enableTrace(True) - options = {} - if args.proxy: - p = urlparse(args.proxy) - options["http_proxy_host"] = p.hostname - options["http_proxy_port"] = p.port - if args.origin: - options["origin"] = args.origin - if args.subprotocols: - options["subprotocols"] = args.subprotocols - opts = {} - if args.nocert: - opts = {"cert_reqs": ssl.CERT_NONE, "check_hostname": False} - if args.headers: - options["header"] = list(map(str.strip, args.headers.split(","))) - ws = websocket.create_connection(args.url, sslopt=opts, **options) - if args.raw: - console = NonInteractive() - else: - console = InteractiveConsole() - print("Press Ctrl+C to quit") - - def recv() -> tuple: - try: - frame = ws.recv_frame() - except websocket.WebSocketException: - return websocket.ABNF.OPCODE_CLOSE, "" - if not frame: - raise websocket.WebSocketException(f"Not a valid frame {frame}") - elif frame.opcode in OPCODE_DATA: - return frame.opcode, frame.data - elif frame.opcode == websocket.ABNF.OPCODE_CLOSE: - ws.send_close() - return frame.opcode, "" - elif frame.opcode == websocket.ABNF.OPCODE_PING: - ws.pong(frame.data) - return frame.opcode, frame.data - - return frame.opcode, frame.data - - def recv_ws() -> None: - while True: - opcode, data = recv() - msg = None - if opcode == websocket.ABNF.OPCODE_TEXT and isinstance(data, bytes): - data = str(data, "utf-8") - if ( - isinstance(data, bytes) and len(data) > 2 and data[:2] == b"\037\213" - ): # gzip magick - try: - data = "[gzip] " + str(gzip.decompress(data), "utf-8") - except: - pass - elif isinstance(data, bytes): - try: - data = "[zlib] " + str( - zlib.decompress(data, -zlib.MAX_WBITS), "utf-8" - ) - except: - pass - - if isinstance(data, bytes): - data = repr(data) - - if args.verbose: - msg = f"{websocket.ABNF.OPCODE_MAP.get(opcode)}: {data}" - else: - msg = data - - if msg is not None: - if args.timings: - console.write(f"{time.time() - start_time}: {msg}") - else: - console.write(msg) - - if opcode == websocket.ABNF.OPCODE_CLOSE: - break - - thread = threading.Thread(target=recv_ws) - thread.daemon = True - thread.start() - - if args.text: - ws.send(args.text) - - while True: - try: - message = console.read() - ws.send(message) - except KeyboardInterrupt: - return - except EOFError: - time.sleep(args.eof_wait) - return - - -if __name__ == "__main__": - try: - main() - except Exception as e: - print(e) |