author | alexv-smirnov <alex@ydb.tech> | 2023-10-18 11:29:37 +0300
---|---|---
committer | alexv-smirnov <alex@ydb.tech> | 2023-10-18 13:06:09 +0300
commit | 9ae2b50e805245bff2d8be04123f4e7f08806324 (patch) |
tree | d97eaff30d6ea46d9c957ddbb57573f9044b5523 | /library/python
parent | 18dc72ed9a86762cd037f5e41fb79cec458b3c2c (diff) |
download | ydb-9ae2b50e805245bff2d8be04123f4e7f08806324.tar.gz |
Move swag from devtools/ to library/python/testing/
Diffstat (limited to 'library/python')
-rw-r--r-- | library/python/testing/swag/daemon.py | 297
-rw-r--r-- | library/python/testing/swag/gdb.py | 59
-rw-r--r-- | library/python/testing/swag/lib/ya.make | 20
-rw-r--r-- | library/python/testing/swag/pathutil.py | 26
-rw-r--r-- | library/python/testing/swag/ports.py | 30
-rw-r--r-- | library/python/testing/swag/proto_traversals.py | 74
6 files changed, 506 insertions, 0 deletions
diff --git a/library/python/testing/swag/daemon.py b/library/python/testing/swag/daemon.py
new file mode 100644
index 0000000000..e59bedb5e7
--- /dev/null
+++ b/library/python/testing/swag/daemon.py
@@ -0,0 +1,297 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import os
+import sys
+import signal
+import tempfile
+import shutil
+
+try:
+    from . import gdb
+except (ImportError, ValueError):  # fixed: Python 3 raises ImportError here, not ValueError
+    import gdb
+
+from yatest.common import process, output_path, TimeoutError, cores
+
+MAX_IO_LEN = 1024 * 10
+GIGABYTES = 1 << 30
+
+logger = logging.getLogger(__name__)
+
+
+def run_daemon(command, check_exit_code=True, shell=False, timeout=5, cwd=None,
+               env=None, stdin=None, stdout=None, stderr=None, creationflags=0):
+    daemon = Daemon(command, check_exit_code, shell, timeout, cwd, env, stdin, stdout, stderr, creationflags)
+    daemon.run()
+    return daemon
+
+
+def get_free_space(path):
+    stats = os.statvfs(path)
+    return stats.f_bavail * stats.f_frsize
+
+
+class DaemonError(RuntimeError):
+    def __init__(self, message, stdout=None, stderr=None, exit_code=None):
+        lst = [
+            "Daemon failed with message: {message}.".format(message=message),
+        ]
+        if exit_code is not None:
+            lst.append(
+                "Process exit_code = {exit_code}.".format(exit_code=exit_code)
+            )
+        if stdout is not None:
+            lst.append(
+                "Stdout: {stdout}".format(stdout=stdout)
+            )
+        if stderr is not None:
+            lst.append(
+                "Stderr: {stderr}".format(stderr=stderr)
+            )
+
+        super(DaemonError, self).__init__('\n'.join(lst))
+
+
+class Daemon(object):
+    def __init__(self, command, check_exit_code=True, shell=False, timeout=5, cwd=None,
+                 env=None, stdin=None, stdout=None, stderr=None, creationflags=0):
+        if cwd is None:
+            cwd = tempfile.mkdtemp()
+        self.cwd = cwd
+
+        self.stdoutf = stdout or tempfile.NamedTemporaryFile(dir=self.cwd, prefix="stdout_", delete=False)
+        self.stderrf = stderr or tempfile.NamedTemporaryFile(dir=self.cwd, prefix="stderr_", delete=False)
+        self.stdinf = stdin or tempfile.NamedTemporaryFile(dir=self.cwd, prefix="stdin_", delete=False)
+
+        self.cmd = command
+        if sys.version_info.major > 2:
+            _basestring = str
+        else:
+            _basestring = basestring  # noqa: F821 (Python 2 only)
+        if isinstance(command, _basestring):
+            self.cmd = [arg for arg in command.split() if arg]
+        self.daemon = None
+        self.name = os.path.basename(self.cmd[0])
+
+        self._shell = shell
+        self._env = env
+        self._creationflags = creationflags
+
+        self._check_exit_code = check_exit_code
+        self._timeout = timeout
+
+    def before_start(self):
+        pass
+
+    def after_start(self):
+        pass
+
+    def before_stop(self):
+        pass
+
+    def after_stop(self):
+        pass
+
+    def is_alive(self):
+        return self.daemon and self.daemon.running
+
+    def required_args(self):
+        return []
+
+    def check_run(self):
+        """This function checks that the daemon is running. By default it
+        checks only the process status, but you can override it to check
+        binary-specific marks like 'port is busy' and others."""
+        return self.is_alive()
+
+    def run(self):
+        if self.check_run():
+            logger.error("Can't run %s.\nProcess already started" % self.cmd)
+            raise DaemonError("daemon already started.")
+
+        try:
+            self.before_start()
+        except Exception:
+            logger.exception("Exception in user hook before_start")
+        self.daemon = process.execute(self.cmd[:1] + self.required_args() + self.cmd[1:],
+                                      False,
+                                      shell=self._shell,
+                                      cwd=self.cwd,
+                                      env=self._env,
+                                      stdin=self.stdinf,
+                                      stdout=self.stdoutf,
+                                      stderr=self.stderrf,
+                                      creationflags=self._creationflags,
+                                      wait=False)
+        stdout, stderr = self.__communicate()
+        timeout_reason_msg = "Failed to execute '{cmd}'.\n\tstdout: {out}\n\tstderr: {err}".format(
+            cmd=" ".join(self.cmd),
+            out=stdout,
+            err=stderr)
+        try:
+            process.wait_for(self.check_run, self._timeout, timeout_reason_msg, sleep_time=0.1)
+        except process.TimeoutError:
+            self.raise_on_death(timeout_reason_msg)
+
+        if not self.is_alive():
+            self.raise_on_death("WHY? %s %s" % (self.daemon, self.daemon.running))
+
+        try:
+            self.after_start()
+        except Exception as e:
+            msg = "Exception in user hook after_start. Exception: %s" % str(e)
+            logger.exception(msg)
+
+        return self
+
+    def raise_on_death(self, additional_text=""):
+        stdout = "[NO STDOUT]"
+        stderr = "[NO STDERR]"
+
+        if self.stdoutf and self.stderrf:  # fixed: checked stdinf before, although stderr is what gets read
+            stdout, stderr = self.__communicate()
+        if self.daemon and getattr(self.daemon, "process", None):  # fixed: default avoids AttributeError
+            self.check_coredump()
+
+        raise DaemonError(
+            Daemon.__log_failed(
+                "process {} unexpectedly finished. \n\n {}".format(self.cmd, additional_text),
+                stdout,
+                stderr
+            )
+        )
+
+    def check_coredump(self):
+        try:
+            core_file = cores.recover_core_dump_file(self.cmd[0], self.cwd, self.daemon.process.pid)
+            if core_file:
+                logger.debug(core_file + " found, maybe this is our coredump file")
+                self.save_coredump(core_file)
+            else:
+                logger.debug("Core dump file was not found")
+        except Exception as e:
+            logger.warning("While checking coredump: " + str(e))
+
+    def save_coredump(self, core_file):
+        output_core_dir = output_path("cores")
+        shared_core_file = os.path.join(output_core_dir, os.path.basename(core_file))
+        if not os.path.isdir(output_core_dir):
+            os.mkdir(output_core_dir)
+
+        short_bt, _ = gdb.dump_traceback(executable=self.cmd[0], core_file=core_file,
+                                         output_file=shared_core_file + ".trace.txt")
+        if short_bt:
+            logger.error("Short backtrace = \n" + "=" * 80 + "\n" + short_bt + "\n" + "=" * 80)
+
+        space_left = float(get_free_space(output_core_dir))
+        if space_left > 5 * GIGABYTES:
+            shutil.copy2(
+                core_file,
+                shared_core_file
+            )
+            os.chmod(shared_core_file, 0o755)
+            logger.debug("Saved to " + output_core_dir)
+        else:
+            logger.error("Not enough space left on device (%s GB). Won't save %s file"
+                         % (space_left / GIGABYTES, core_file))
+
+    def stop(self, kill=False):
+        if not self.is_alive() and self.daemon.exit_code == 0:
+            return
+
+        if not self.is_alive():
+            stdout, stderr = self.__communicate()
+            self.check_coredump()
+            try:
+                self.after_stop()
+            except Exception:
+                logger.exception("Exception in user hook after_stop.")
+
+            raise DaemonError(
+                Daemon.__log_failed(
+                    "process {} unexpectedly finished with exit code {}.".format(self.cmd, self.daemon.exit_code),
+                    stdout,
+                    stderr
+                ),
+                exit_code=self.daemon.exit_code
+            )
+
+        try:
+            self.before_stop()
+        except Exception:
+            logger.exception("Exception in user hook before_stop.")
+
+        stdout, stderr = self.__communicate()  # fixed: stdout and stderr were swapped here
+        timeout_reason_msg = "Cannot stop {cmd}.\n\tstdout: {out}\n\tstderr: {err}".format(
+            cmd=" ".join(self.cmd),
+            out=stdout,
+            err=stderr)
+        if not kill:
+            self.daemon.process.send_signal(signal.SIGINT)
+            try:  # soft stop: give the process a chance to exit on SIGINT first
+                process.wait_for(lambda: not self.is_alive(), self._timeout, timeout_reason_msg, sleep_time=0.1)
+            except TimeoutError:
+                pass
+
+        is_killed = False
+        if self.is_alive():
+            self.daemon.process.send_signal(signal.SIGKILL)
+            is_killed = True
+
+        process.wait_for(lambda: not self.is_alive(), self._timeout, timeout_reason_msg, sleep_time=0.1)
+
+        try:
+            self.after_stop()
+        except Exception:
+            logger.exception("Exception in user hook after_stop")
+
+        if self.daemon.running:
+            stdout, stderr = self.__communicate()
+            msg = "cannot stop daemon {cmd}\n\tstdout: {out}\n\tstderr: {err}".format(
+                cmd=' '.join(self.cmd),
+                out=stdout,
+                err=stderr
+            )
+            logger.error(msg)
+            raise DaemonError(msg, stdout=stdout, stderr=stderr, exit_code=self.daemon.exit_code)
+
+        stdout, stderr = self.__communicate()
+        logger.debug(
+            "Process stopped: {cmd}.\n\tstdout:\n{out}\n\tstderr:\n{err}".format(
+                cmd=" ".join(self.cmd),
+                out=stdout,
+                err=stderr
+            )
+        )
+        if not is_killed:
+            self.check_coredump()
+            if self._check_exit_code and self.daemon.exit_code != 0:
+                stdout, stderr = self.__communicate()
+                raise DaemonError("Bad exit_code.", stdout=stdout, stderr=stderr, exit_code=self.daemon.exit_code)
+        else:
+            logger.warning("Exit code is not checked, because the binary was stopped by SIGKILL")
+
+    def _read_io(self, file_obj):
+        file_obj.flush()
+
+        cur_pos = file_obj.tell()
+        seek_pos_from_end = max(-cur_pos, -MAX_IO_LEN)
+        file_obj.seek(seek_pos_from_end, os.SEEK_END)
+        return file_obj.read()
+
+    def __communicate(self):
+        stderr = self._read_io(self.stderrf)
+        stdout = self._read_io(self.stdoutf)
+        return stdout, stderr
+
+    @staticmethod
+    def __log_failed(msg, stdout, stderr):  # fixed: parameter order now matches both call sites
+        final_msg = '{msg}\nstdout: {out}\nstderr: {err}'.format(
+            msg=msg,
+            out=stdout,
+            err=stderr)
+        logger.error(final_msg)  # fixed: previously logged msg without the captured output
+        return final_msg
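For orientation, a minimal usage sketch for this daemon helper, assuming a yatest runtime. The server binary, its port, and the subclass are hypothetical; run_daemon, Daemon, check_run, is_alive, stop and ports.is_port_open come from the modules in this commit.

    from library.python.testing.swag import ports
    from library.python.testing.swag.daemon import Daemon, run_daemon

    # Start a (hypothetical) server binary; stdout/stderr go to temp files under cwd.
    daemon = run_daemon("my_server --port 12345", timeout=10)
    try:
        assert daemon.is_alive()
    finally:
        # stop() sends SIGINT first and falls back to SIGKILL on timeout;
        # stop(kill=True) skips SIGINT and the exit-code check.
        daemon.stop()

    # check_run() is the documented override point for binary-specific readiness;
    # a port-based variant might look like this:
    class MyServerDaemon(Daemon):
        def check_run(self):
            # is_port_open() returns True while the port is still free, so the
            # server counts as started once its port is taken.
            return self.is_alive() and not ports.is_port_open("localhost", 12345)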
diff --git a/library/python/testing/swag/gdb.py b/library/python/testing/swag/gdb.py
new file mode 100644
index 0000000000..b9a3694d90
--- /dev/null
+++ b/library/python/testing/swag/gdb.py
@@ -0,0 +1,59 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+
+import yatest.common
+
+logger = logging.getLogger(__name__)
+
+SHORT_DUMP_COMMAND = "{gdb} {executable} {core_file} --eval-command='backtrace full' --batch -q"
+LONG_DUMP_COMMAND = "{gdb} {executable} {core_file} --eval-command='thread apply all bt full' --batch -q"
+
+
+def get_gdb():
+    return yatest.common.gdb_path()
+
+
+def run_gdb_command(command, stdout_file, stderr_file):
+    logger.debug("Running gdb command %s" % command)
+
+    with open(stdout_file, "w") as out, open(stderr_file, "w") as err:
+        yatest.common.process.execute(
+            command,
+            check_exit_code=True,
+            wait=True,
+            shell=True,
+            stdout=out,
+            stderr=err
+        )
+
+
+def dump_traceback(executable, core_file, output_file):
+    """
+    Dumps a traceback if possible.
+
+    :param executable: binary for gdb
+    :param core_file: core file for gdb
+    :param output_file: file to dump the short traceback to; the full traceback goes to <output_file + ".full">
+
+    :return: string tuple (short_backtrace, full_backtrace)
+    """
+    try:
+        gdb = get_gdb()
+        short_dump_command = SHORT_DUMP_COMMAND.format(gdb=gdb, executable=executable, core_file=core_file)
+        long_dump_command = LONG_DUMP_COMMAND.format(gdb=gdb, executable=executable, core_file=core_file)
+        run_gdb_command(short_dump_command, output_file, output_file + '.err')
+        output_file_full = output_file + ".full"
+        output_file_full_err = output_file_full + '.err'
+        run_gdb_command(long_dump_command, output_file_full, output_file_full_err)
+    except Exception:
+        logger.exception("Failed to print trace")
+        return '', ''
+
+    short_backtrace = ''
+    full_backtrace = ''
+    with open(output_file) as o, open(output_file_full) as e:
+        short_backtrace = o.read()
+        full_backtrace = e.read()
+    return short_backtrace, full_backtrace
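A sketch of calling dump_traceback directly (daemon.py above calls it from save_coredump). The executable and core paths are placeholders, and gdb is resolved through yatest.common.gdb_path(), so this only runs inside the test harness.

    from library.python.testing.swag import gdb

    short_bt, full_bt = gdb.dump_traceback(
        executable="bin/my_server",          # placeholder: binary that produced the core
        core_file="cores/core.my_server.1",  # placeholder: recovered core file
        output_file="my_server.trace.txt",   # short trace; the full one lands in *.full
    )
    if short_bt:
        print(short_bt)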
diff --git a/library/python/testing/swag/lib/ya.make b/library/python/testing/swag/lib/ya.make
new file mode 100644
index 0000000000..e3e7cfa300
--- /dev/null
+++ b/library/python/testing/swag/lib/ya.make
@@ -0,0 +1,20 @@
+PY23_LIBRARY()
+
+PEERDIR(
+    contrib/python/protobuf
+    library/python/testing/yatest_common
+)
+
+SRCDIR(library/python/testing/swag)
+
+PY_SRCS(
+    NAMESPACE library.python.testing.swag
+
+    daemon.py
+    gdb.py
+    pathutil.py
+    ports.py
+    proto_traversals.py
+)
+
+END()
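For context, a consumer would pull these helpers in through a PEERDIR on the lib/ directory above (the sources themselves live one level up, via SRCDIR). The test target below is illustrative, not part of this commit.

    PY3TEST()

    PEERDIR(
        library/python/testing/swag/lib
    )

    TEST_SRCS(test_with_daemon.py)

    END()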
"w") as err: + yatest.common.process.execute( + command, + check_exit_code=True, + wait=True, + shell=True, + stdout=out, + stderr=err + ) + + +def dump_traceback(executable, core_file, output_file): + """ + Dumps traceback if its possible + + :param executable: binary for gdb + :param core_file: core file for gdb + :param output_file: file to dump traceback to, also dump full traceback to <output_file + ".full"> + + :return: string tuple (short_backtrace, full_backtrace) + """ + try: + gdb = get_gdb() + short_dump_command = SHORT_DUMP_COMMAND.format(gdb=gdb, executable=executable, core_file=core_file) + long_dump_command = LONG_DUMP_COMMAND.format(gdb=gdb, executable=executable, core_file=core_file) + run_gdb_command(short_dump_command, output_file, output_file + '.err') + output_file_full = output_file + ".full" + output_file_full_err = output_file_full + '.err' + run_gdb_command(long_dump_command, output_file_full, output_file_full_err) + except Exception: + logger.exception("Failed to print trace") + return '', '' + + short_backtrace = '' + full_backtrace = '' + with open(output_file) as o, open(output_file_full) as e: + short_backtrace = o.read() + full_backtrace = e.read() + return short_backtrace, full_backtrace diff --git a/library/python/testing/swag/lib/ya.make b/library/python/testing/swag/lib/ya.make new file mode 100644 index 0000000000..e3e7cfa300 --- /dev/null +++ b/library/python/testing/swag/lib/ya.make @@ -0,0 +1,20 @@ +PY23_LIBRARY() + +PEERDIR( + contrib/python/protobuf + library/python/testing/yatest_common +) + +SRCDIR(library/python/testing/swag) + +PY_SRCS( + NAMESPACE library.python.testing.swag + + daemon.py + gdb.py + pathutil.py + ports.py + proto_traversals.py +) + +END() diff --git a/library/python/testing/swag/pathutil.py b/library/python/testing/swag/pathutil.py new file mode 100644 index 0000000000..3f7910eff3 --- /dev/null +++ b/library/python/testing/swag/pathutil.py @@ -0,0 +1,26 @@ +import os +import tempfile + + +def get_valid_filename(filename, dirname): + current_file, counter = filename, 0 + while os.path.exists(os.path.join(dirname, current_file)): + current_file = "%s_%d" % (filename, counter) + counter += 1 + valid_path = os.path.join(dirname, current_file) + os.mknod(valid_path) + return valid_path + + +def get_valid_tmpdir(name, tmp_dir): + current_dir, counter = name, 0 + while os.path.exists(os.path.join(tmp_dir, current_dir)): + current_dir = "%s_%d" % (name, counter) + counter += 1 + os.mkdir(os.path.join(tmp_dir, current_dir)) + return os.path.join(tmp_dir, current_dir) + + +def get_base_tmpdir(name): + tmppath = tempfile.gettempdir() + return get_valid_tmpdir(name, tmppath) diff --git a/library/python/testing/swag/ports.py b/library/python/testing/swag/ports.py new file mode 100644 index 0000000000..1bf790d06b --- /dev/null +++ b/library/python/testing/swag/ports.py @@ -0,0 +1,30 @@ +import random +import socket + +ATTEMPTS = 25 +# range 10000-10199 is reserved for Skynet on Sandbox machines +MIN_PORT = 10200 +MAX_PORT = 25000 + + +def is_port_open(host, port): + _socket = socket.socket(socket.AF_INET) + return _socket.connect_ex((host, port)) != 0 + + +def find_free_port(range_start=MIN_PORT, range_end=MAX_PORT, attempts=ATTEMPTS): + """ + Finds free port + + :param range_start: start of range + :param range_end: end of range + :param attempts: number of tries to find free port + + :return: some open port in a given range + """ + ports = [random.randint(range_start, range_end) for _ in range(attempts)] + while ports: + port = ports.pop() 
diff --git a/library/python/testing/swag/proto_traversals.py b/library/python/testing/swag/proto_traversals.py
new file mode 100644
index 0000000000..2c521dec91
--- /dev/null
+++ b/library/python/testing/swag/proto_traversals.py
@@ -0,0 +1,74 @@
+import copy
+
+from google.protobuf.descriptor import FieldDescriptor as fdescriptor
+
+"""Recursive tree traversals for protobuf. Each message
+is a node, each field is a leaf. A function walks through
+the proto and does something in each node."""
+
+
+def search(proto, fname=None, ftype=None):
+    for desc, obj in proto.ListFields():
+        if desc.name == fname and (ftype is None or ftype == desc.type):
+            return (obj, desc, proto)
+        if desc.type == fdescriptor.TYPE_MESSAGE:
+            objs = obj if desc.label == fdescriptor.LABEL_REPEATED else [obj]
+            for one_obj in objs:
+                found = search(one_obj, fname, ftype)  # fixed: keep searching the remaining
+                if found is not None:                  # siblings instead of returning the
+                    return found                       # first (possibly None) recursion result
+    return None
+
+
+def search_and_process(proto, return_func=lambda params, child_values=None: params,
+                       recalc_params_func=lambda proto, obj, desc, params: params,
+                       params=None):
+    """Searches and processes each node. Recalcs params on each step and passes them
+    down the tree. Each leaf computes its return value from the params and passes it up;
+    nodes compute theirs from the current params and the children's return values.
+
+    Args:
+    * proto -- current node; to run through some proto, put its message object here
+    * return_func -- function computing the return value; takes the current (recalced
+      for the current node) params and the list of return values of the node's
+      children; for leaves the second parameter is None
+    * recalc_params_func -- function to recalc params in a node; takes the root proto,
+      the current object (or objects for repeated fields), the current field
+      descriptor and the params; returns the new params value
+    * params -- initial value of params"""
+    if proto is None:
+        return None
+
+    return_values = []
+    for desc, obj in proto.ListFields():
+        params = copy.deepcopy(params)
+        if desc.type == fdescriptor.TYPE_MESSAGE:
+            objs = obj if desc.label == fdescriptor.LABEL_REPEATED else [obj]
+            params = recalc_params_func(proto, obj, desc, params)
+            for one_obj in objs:
+                return_values.append(search_and_process(one_obj, return_func,
+                                                        recalc_params_func, params))
+        else:
+            return_values.append(return_func(recalc_params_func(proto, obj, desc, params), None))
+    return return_func(params, return_values)
+
+
+def search_and_process_descriptors(proto_desc,
+                                   return_func=lambda params, child_values=None: params,
+                                   recalc_params_func=lambda desc, params: params,
+                                   params=None):
+    """Same as search_and_process (except that recalc_params_func also runs on the root
+    proto), but processes each node of the PROTOBUF DESCRIPTOR instead of each node of
+    the protobuf message."""
+    if proto_desc is None:  # fixed: check before recalc_params_func touches proto_desc
+        return None
+
+    params = copy.deepcopy(params)
+    params = recalc_params_func(proto_desc, params)
+
+    if hasattr(proto_desc, "type") and proto_desc.type != fdescriptor.TYPE_MESSAGE:
+        return return_func(params, None)
+
+    return_values = []
+    for field_desc in proto_desc.fields:
+        desc = field_desc if field_desc.message_type is None else field_desc.message_type
+        return_values.append(search_and_process_descriptors(desc, return_func,
+                                                            recalc_params_func, params))
+
+    return return_func(params, return_values)
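To make the traversal contract concrete, a small self-contained sketch driving search() and search_and_process() over a stock protobuf message; FileDescriptorProto ships with the protobuf package (a PEERDIR of this library), everything else comes from the module above.

    from google.protobuf import descriptor_pb2

    from library.python.testing.swag import proto_traversals

    msg = descriptor_pb2.FileDescriptorProto(name="example.proto")
    msg.message_type.add(name="MyMessage")

    # Depth-first search for the first set field called "name".
    found = proto_traversals.search(msg, fname="name")
    if found is not None:
        value, field_desc, parent = found  # -> ("example.proto", <descriptor>, msg)

    # Count all set leaf fields: leaves return 1, inner nodes sum their children.
    total = proto_traversals.search_and_process(
        msg,
        return_func=lambda params, child_values=None: (
            1 if child_values is None else sum(child_values)),
        params=0,
    )
    assert total == 2  # msg.name and message_type[0].name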