aboutsummaryrefslogtreecommitdiffstats
path: root/library/python
diff options
context:
space:
mode:
authoralexv-smirnov <alex@ydb.tech>2023-10-18 11:29:37 +0300
committeralexv-smirnov <alex@ydb.tech>2023-10-18 13:06:09 +0300
commit9ae2b50e805245bff2d8be04123f4e7f08806324 (patch)
treed97eaff30d6ea46d9c957ddbb57573f9044b5523 /library/python
parent18dc72ed9a86762cd037f5e41fb79cec458b3c2c (diff)
downloadydb-9ae2b50e805245bff2d8be04123f4e7f08806324.tar.gz
Move swag from devtools/ to library/python/testing/
Diffstat (limited to 'library/python')
-rw-r--r--library/python/testing/swag/daemon.py297
-rw-r--r--library/python/testing/swag/gdb.py59
-rw-r--r--library/python/testing/swag/lib/ya.make20
-rw-r--r--library/python/testing/swag/pathutil.py26
-rw-r--r--library/python/testing/swag/ports.py30
-rw-r--r--library/python/testing/swag/proto_traversals.py74
6 files changed, 506 insertions, 0 deletions
diff --git a/library/python/testing/swag/daemon.py b/library/python/testing/swag/daemon.py
new file mode 100644
index 0000000000..e59bedb5e7
--- /dev/null
+++ b/library/python/testing/swag/daemon.py
@@ -0,0 +1,297 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import os
+import sys
+import signal
+import tempfile
+import shutil
+
+try:
+ from . import gdb
+except ValueError:
+ import gdb
+
+from yatest.common import process, output_path, TimeoutError, cores
+
MAX_IO_LEN = 1024 * 10  # read back at most the last 10 KiB of stdout/stderr per report
GYGABYTES = 1 << 30  # one gigabyte in bytes (identifier spelling is a historical typo, kept for compatibility)

logger = logging.getLogger(__name__)  # module-level logger shared by Daemon and helpers
+
+
def run_daemon(command, check_exit_code=True, shell=False, timeout=5, cwd=None,
               env=None, stdin=None, stdout=None, stderr=None, creationflags=0):
    """Construct a Daemon for *command*, start it and return the running instance.

    All parameters are forwarded to Daemon unchanged; Daemon.run() returns
    self, which makes the chained form below equivalent to start-then-return.
    """
    return Daemon(command, check_exit_code, shell, timeout, cwd,
                  env, stdin, stdout, stderr, creationflags).run()
+
+
def get_free_space(path):
    """Return the number of bytes available to unprivileged users on the
    filesystem containing *path* (free blocks times fragment size)."""
    fs_stats = os.statvfs(path)
    return fs_stats.f_frsize * fs_stats.f_bavail
+
+
class DaemonError(RuntimeError):
    """Raised when a managed daemon fails to start, stop, or exits badly.

    The exception message aggregates the failure reason plus whatever
    diagnostics were supplied: exit code, stdout and stderr tails.
    """

    def __init__(self, message, stdout=None, stderr=None, exit_code=None):
        parts = ["Daemon failed with message: %s." % (message,)]
        if exit_code is not None:
            parts.append("Process exit_code = %s." % (exit_code,))
        if stdout is not None:
            parts.append("Stdout: %s" % (stdout,))
        if stderr is not None:
            parts.append("Stderr: %s" % (stderr,))
        super(DaemonError, self).__init__('\n'.join(parts))
+
+
class Daemon(object):
    """Manages the life cycle of one daemonized subprocess in tests.

    The process is spawned via yatest's ``process.execute`` with
    stdin/stdout/stderr redirected to files under ``cwd`` so their tails can
    be read back for diagnostics.  ``stop()`` tries a graceful SIGINT first
    and falls back to SIGKILL.  Subclasses may override the ``before_*`` /
    ``after_*`` hooks, ``required_args`` and ``check_run``.
    """

    def __init__(self, command, check_exit_code=True, shell=False, timeout=5, cwd=None,
                 env=None, stdin=None, stdout=None, stderr=None, creationflags=0):
        if cwd is None:
            cwd = tempfile.mkdtemp()
        self.cwd = cwd

        # delete=False keeps the redirection files around for post-mortem inspection.
        self.stdoutf = stdout or tempfile.NamedTemporaryFile(dir=self.cwd, prefix="stdout_", delete=False)
        self.stderrf = stderr or tempfile.NamedTemporaryFile(dir=self.cwd, prefix="stderr_", delete=False)
        self.stdinf = stdin or tempfile.NamedTemporaryFile(dir=self.cwd, prefix="stdin_", delete=False)

        self.cmd = command
        # Accept either a prepared argv list or a whitespace-separated command
        # string (basestring is needed on py2 to also match unicode strings).
        if sys.version_info.major > 2:
            _basestring = str
        else:
            _basestring = basestring  # noqa: F821 -- py2 only
        if isinstance(command, _basestring):
            self.cmd = [arg for arg in command.split() if arg]
        self.daemon = None
        self.name = os.path.basename(self.cmd[0])

        self._shell = shell
        self._env = env
        self._creationflags = creationflags

        self._check_exit_code = check_exit_code
        self._timeout = timeout

    def before_start(self):
        """User hook, called right before the process is spawned."""
        pass

    def after_start(self):
        """User hook, called after the process is confirmed running."""
        pass

    def before_stop(self):
        """User hook, called before the process is asked to stop."""
        pass

    def after_stop(self):
        """User hook, called after the process has stopped."""
        pass

    def is_alive(self):
        """Return True when the process has been started and is still running."""
        return self.daemon is not None and bool(self.daemon.running)

    def required_args(self):
        """Extra argv entries inserted right after the executable; override as needed."""
        return []

    def check_run(self):
        """This function checks that daemon is running. By default it
        checks only the process status. But you can override it to
        check your binary specific marks like 'port is busy' and
        others."""
        return self.is_alive()

    def run(self):
        """Start the daemon and wait up to ``timeout`` for check_run() to pass.

        :return: self, so calls can be chained
        :raises DaemonError: when the process is already running or dies
            before it comes up
        """
        if self.check_run():
            logger.error("Can't run %s.\nProcess already started" % self.cmd)
            raise DaemonError("daemon already started.")

        try:
            self.before_start()
        except Exception:
            logger.exception("Exception in user hook before_start")
        self.daemon = process.execute(self.cmd[:1] + self.required_args() + self.cmd[1:],
                                      False,
                                      shell=self._shell,
                                      cwd=self.cwd,
                                      env=self._env,
                                      stdin=self.stdinf,
                                      stdout=self.stdoutf,
                                      stderr=self.stderrf,
                                      creationflags=self._creationflags,
                                      wait=False)
        stdout, stderr = self.__communicate()
        timeout_reason_msg = "Failed to execute '{cmd}'.\n\tstdout: {out}\n\tstderr: {err}".format(
            cmd=" ".join(self.cmd),
            out=stdout,
            err=stderr)
        try:
            process.wait_for(self.check_run, self._timeout, timeout_reason_msg, sleep_time=0.1)
        except process.TimeoutError:
            self.raise_on_death(timeout_reason_msg)

        if not self.is_alive():
            self.raise_on_death("WHY? %s %s" % (self.daemon, self.daemon.running))

        try:
            self.after_start()
        except Exception as e:
            msg = "Exception in user hook after_start. Exception: %s" % str(e)
            logger.exception(msg)

        return self

    def raise_on_death(self, additional_text=""):
        """Collect whatever post-mortem info is available and raise DaemonError."""
        stdout = "[NO STDOUT]"
        stderr = "[NO STDERR]"

        # Bug fix: the guard previously tested self.stdinf instead of
        # self.stderrf, even though the read-back below needs both output files.
        if self.stdoutf and self.stderrf:
            stdout, stderr = self.__communicate()
        # Bug fix: getattr() without a default raises AttributeError for a
        # daemon object lacking .process instead of skipping the check.
        if self.daemon and getattr(self.daemon, "process", None):
            self.check_coredump()

        raise DaemonError(
            Daemon.__log_failed(
                "process {} unexpectedly finished. \n\n {}".format(self.cmd, additional_text),
                stdout,
                stderr
            )
        )

    def check_coredump(self):
        """Look for a core dump left by the process; save and trace it if found.

        Best effort: every failure is logged, never propagated.
        """
        try:
            core_file = cores.recover_core_dump_file(self.cmd[0], self.cwd, self.daemon.process.pid)
            if core_file:
                logger.debug(core_file + " found, maybe this is our coredump file")
                self.save_coredump(core_file)
            else:
                logger.debug("Core dump file was not found")
        except Exception as e:
            logger.warn("While checking coredump: " + str(e))

    def save_coredump(self, core_file):
        """Dump backtraces for *core_file* and copy it to the test output dir.

        The copy is skipped when less than 5 GB would be left free on the
        target filesystem.
        """
        output_core_dir = output_path("cores")
        shared_core_file = os.path.join(output_core_dir, os.path.basename(core_file))
        if not os.path.isdir(output_core_dir):
            os.mkdir(output_core_dir)

        short_bt, _ = gdb.dump_traceback(executable=self.cmd[0], core_file=core_file,
                                         output_file=shared_core_file + ".trace.txt")
        if short_bt:
            logger.error("Short backtrace = \n" + "=" * 80 + "\n" + short_bt + "\n" + "=" * 80)

        space_left = float(get_free_space(output_core_dir))
        if space_left > 5 * GYGABYTES:
            shutil.copy2(
                core_file,
                shared_core_file
            )
            os.chmod(shared_core_file, 0o755)
            logger.debug("Saved to " + output_core_dir)

        else:
            logger.error("Not enough space left on device (%s GB). Won't save %s file" % (float(space_left / GYGABYTES), core_file))

    def stop(self, kill=False):
        """Stop the daemon: SIGINT first (unless *kill*), then SIGKILL.

        :param kill: when True, skip the graceful SIGINT phase
        :raises DaemonError: when the process already died with a non-zero
            exit code, cannot be stopped, or (with check_exit_code) exited
            non-zero after a graceful stop
        """
        # Robustness fix: previously calling stop() before run() dereferenced
        # self.daemon (None) and died with AttributeError.
        if self.daemon is None:
            raise DaemonError("daemon was not started.")

        if not self.is_alive() and self.daemon.exit_code == 0:
            return

        if not self.is_alive():
            stdout, stderr = self.__communicate()
            self.check_coredump()
            try:
                self.after_stop()
            except Exception:
                logger.exception("Exception in user hook after_stop.")

            raise DaemonError(
                Daemon.__log_failed(
                    "process {} unexpectedly finished with exit code {}.".format(self.cmd, self.daemon.exit_code),
                    stdout,
                    stderr
                ),
                exit_code=self.daemon.exit_code
            )

        try:
            self.before_stop()
        except Exception:
            logger.exception("Exception in user hook before_stop.")

        # Bug fix: the tuple was unpacked as (stderr, stdout) although
        # __communicate() returns (stdout, stderr), swapping the streams
        # in the timeout message.
        stdout, stderr = self.__communicate()
        timeout_reason_msg = "Cannot stop {cmd}.\n\tstdout: {out}\n\tstderr: {err}".format(
            cmd=" ".join(self.cmd),
            out=stdout,
            err=stderr)
        if not kill:
            self.daemon.process.send_signal(signal.SIGINT)
            try:  # soft wait for. trying to kill with sigint
                process.wait_for(lambda: not self.is_alive(), self._timeout, timeout_reason_msg, sleep_time=0.1)
            except TimeoutError:
                pass

        is_killed = False
        if self.is_alive():
            self.daemon.process.send_signal(signal.SIGKILL)
            is_killed = True

        process.wait_for(lambda: not self.is_alive(), self._timeout, timeout_reason_msg, sleep_time=0.1)

        try:
            self.after_stop()
        except Exception:
            logger.exception("Exception in user hook after_stop")

        if self.daemon.running:
            stdout, stderr = self.__communicate()
            msg = "cannot stop daemon {cmd}\n\tstdout: {out}\n\tstderr: {err}".format(
                cmd=' '.join(self.cmd),
                out=stdout,
                err=stderr
            )
            logger.error(msg)
            raise DaemonError(msg, stdout=stdout, stderr=stderr, exit_code=self.daemon.exit_code)

        stdout, stderr = self.__communicate()
        logger.debug(
            "Process stopped: {cmd}.\n\tstdout:\n{out}\n\tstderr:\n{err}".format(
                cmd=" ".join(self.cmd),
                out=stdout,
                err=stderr
            )
        )
        if not is_killed:
            self.check_coredump()
            if self._check_exit_code and self.daemon.exit_code != 0:
                stdout, stderr = self.__communicate()
                raise DaemonError("Bad exit_code.", stdout=stdout, stderr=stderr, exit_code=self.daemon.exit_code)
        else:
            logger.warning("Exit code is not checked, cos binary was stopped by sigkill")

    def _read_io(self, file_obj):
        """Return up to the last MAX_IO_LEN bytes written to *file_obj*.

        NOTE(review): uses file_obj.tell() as a proxy for "bytes written so
        far" to avoid seeking before the start of the file -- assumes the
        file object's position tracks the write end; confirm for caller-
        supplied stdout/stderr objects.
        """
        file_obj.flush()

        cur_pos = file_obj.tell()
        seek_pos_from_end = max(-cur_pos, -MAX_IO_LEN)
        file_obj.seek(seek_pos_from_end, os.SEEK_END)
        return file_obj.read()

    def __communicate(self):
        """Return the (stdout_tail, stderr_tail) pair of the redirected output."""
        stderr = self._read_io(self.stderrf)
        stdout = self._read_io(self.stdoutf)
        return stdout, stderr

    @staticmethod
    def __log_failed(msg, stdout, stderr):
        # Bug fix: the parameters were declared (msg, stderr, stdout) while
        # both call sites pass (msg, stdout, stderr), so the streams were
        # swapped in every report; also log the composed message, not just msg.
        final_msg = '{msg}\nstdout: {out}\nstderr: {err}'.format(
            msg=msg,
            out=stdout,
            err=stderr)
        logger.error(final_msg)
        return final_msg
diff --git a/library/python/testing/swag/gdb.py b/library/python/testing/swag/gdb.py
new file mode 100644
index 0000000000..b9a3694d90
--- /dev/null
+++ b/library/python/testing/swag/gdb.py
@@ -0,0 +1,59 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+
+import yatest.common
+
+logger = logging.getLogger(__name__)
+
# gdb invocation templates: the short dump prints the current thread's full
# backtrace only; the long dump prints full backtraces for every thread.
SHORT_DUMP_COMMAND = "{gdb} {executable} {core_file} --eval-command='backtrace full' --batch -q"
LONG_DUMP_COMMAND = "{gdb} {executable} {core_file} --eval-command='thread apply all bt full' --batch -q"
+
+
def get_gdb():
    """Return the path to the gdb binary provided by the yatest environment."""
    return yatest.common.gdb_path()
+
+
def run_gdb_command(command, stdout_file, stderr_file):
    """Run a gdb *command* line through the shell, capturing its output.

    stdout and stderr are written to the given file paths; a non-zero gdb
    exit code raises (check_exit_code=True).
    """
    logger.debug("Running gdb command %s" % command)

    with open(stdout_file, "w") as out_stream:
        with open(stderr_file, "w") as err_stream:
            yatest.common.process.execute(
                command,
                shell=True,
                wait=True,
                check_exit_code=True,
                stdout=out_stream,
                stderr=err_stream,
            )
+
+
def dump_traceback(executable, core_file, output_file):
    """
    Dump short and full backtraces for a core file, if possible.

    :param executable: binary for gdb
    :param core_file: core file for gdb
    :param output_file: file to dump the short traceback to; the full
        traceback goes to <output_file + ".full">

    :return: string tuple (short_backtrace, full_backtrace); both are empty
        strings when gdb could not be run
    """
    output_file_full = output_file + ".full"
    try:
        gdb_binary = get_gdb()
        run_gdb_command(
            SHORT_DUMP_COMMAND.format(gdb=gdb_binary, executable=executable, core_file=core_file),
            output_file,
            output_file + '.err')
        run_gdb_command(
            LONG_DUMP_COMMAND.format(gdb=gdb_binary, executable=executable, core_file=core_file),
            output_file_full,
            output_file_full + '.err')
    except Exception:
        logger.exception("Failed to print trace")
        return '', ''

    with open(output_file) as short_stream:
        short_backtrace = short_stream.read()
    with open(output_file_full) as full_stream:
        full_backtrace = full_stream.read()
    return short_backtrace, full_backtrace
diff --git a/library/python/testing/swag/lib/ya.make b/library/python/testing/swag/lib/ya.make
new file mode 100644
index 0000000000..e3e7cfa300
--- /dev/null
+++ b/library/python/testing/swag/lib/ya.make
@@ -0,0 +1,20 @@
# Python 2/3 compatible library packaging the swag test helpers.
PY23_LIBRARY()

PEERDIR(
    contrib/python/protobuf
    library/python/testing/yatest_common
)

# Sources live one directory up; this ya.make only packages them.
SRCDIR(library/python/testing/swag)

PY_SRCS(
    NAMESPACE library.python.testing.swag

    daemon.py
    gdb.py
    pathutil.py
    ports.py
    proto_traversals.py
)

END()
diff --git a/library/python/testing/swag/pathutil.py b/library/python/testing/swag/pathutil.py
new file mode 100644
index 0000000000..3f7910eff3
--- /dev/null
+++ b/library/python/testing/swag/pathutil.py
@@ -0,0 +1,26 @@
+import os
+import tempfile
+
+
def get_valid_filename(filename, dirname):
    """Create and return a previously unused file path inside *dirname*.

    Starts with *filename* and appends "_<n>" suffixes until a free name is
    found, then creates the file via os.mknod and returns its full path.
    """
    candidate, suffix = filename, 0
    while os.path.exists(os.path.join(dirname, candidate)):
        candidate = "%s_%d" % (filename, suffix)
        suffix += 1
    chosen_path = os.path.join(dirname, candidate)
    os.mknod(chosen_path)
    return chosen_path
+
+
def get_valid_tmpdir(name, tmp_dir):
    """Create and return a fresh subdirectory of *tmp_dir*.

    Starts with *name* and appends "_<n>" suffixes until an unused directory
    name is found; the directory is created before returning its path.
    """
    candidate, suffix = name, 0
    while os.path.exists(os.path.join(tmp_dir, candidate)):
        candidate = "%s_%d" % (name, suffix)
        suffix += 1
    result = os.path.join(tmp_dir, candidate)
    os.mkdir(result)
    return result
+
+
def get_base_tmpdir(name):
    """Create a unique directory named after *name* under the system temp dir."""
    return get_valid_tmpdir(name, tempfile.gettempdir())
diff --git a/library/python/testing/swag/ports.py b/library/python/testing/swag/ports.py
new file mode 100644
index 0000000000..1bf790d06b
--- /dev/null
+++ b/library/python/testing/swag/ports.py
@@ -0,0 +1,30 @@
+import random
+import socket
+
# Number of random candidate ports find_free_port probes before giving up.
ATTEMPTS = 25
# range 10000-10199 is reserved for Skynet on Sandbox machines
MIN_PORT = 10200
MAX_PORT = 25000
+
+
def is_port_open(host, port):
    """Return True when (host, port) is free, i.e. nothing accepts TCP there.

    connect_ex() returning non-zero means the connection failed, so the port
    is considered available.

    Bug fix: the socket was never closed, leaking one file descriptor per
    call (find_free_port may call this dozens of times).
    """
    probe = socket.socket(socket.AF_INET)
    try:
        return probe.connect_ex((host, port)) != 0
    finally:
        probe.close()
+
+
def find_free_port(range_start=MIN_PORT, range_end=MAX_PORT, attempts=ATTEMPTS):
    """
    Finds free port

    :param range_start: start of range
    :param range_end: end of range
    :param attempts: number of tries to find free port

    :return: some open port in a given range
    :raises RuntimeError: when none of the sampled ports was free
    """
    candidates = [random.randint(range_start, range_end) for _ in range(attempts)]
    # Probe in reverse creation order -- identical to the pop()-from-the-end loop.
    for port in reversed(candidates):
        if is_port_open('', port):
            return port
    raise RuntimeError('Could not find free port in range = ' + str((range_start, range_end)))
diff --git a/library/python/testing/swag/proto_traversals.py b/library/python/testing/swag/proto_traversals.py
new file mode 100644
index 0000000000..2c521dec91
--- /dev/null
+++ b/library/python/testing/swag/proto_traversals.py
@@ -0,0 +1,74 @@
+import copy
+from google.protobuf.descriptor import FieldDescriptor as fdescriptor
+
+"""Recursive tree traversals for protobuf. Each message
+ is node, each field is leaf. Function walks through
+ proto and in each node do smth."""
+
+
def search(proto, fname=None, ftype=None):
    """Depth-first search for the first set field named *fname* in *proto*.

    :param proto: protobuf message to search (its ListFields() is walked)
    :param fname: field name to match
    :param ftype: optional FieldDescriptor type that must also match
    :return: (value, descriptor, containing_message) tuple, or None
    """
    for desc, obj in proto.ListFields():
        if desc.name == fname and (ftype is None or ftype == desc.type):
            return (obj, desc, proto)
        if desc.type == fdescriptor.TYPE_MESSAGE:
            objs = obj if desc.label == fdescriptor.LABEL_REPEATED else [obj]
            for one_obj in objs:
                # Bug fix: the original returned search(one_obj, ...)
                # unconditionally, so a miss (None) in the first message
                # subtree aborted the traversal instead of moving on to the
                # remaining repeated elements and sibling fields.
                found = search(one_obj, fname, ftype)
                if found is not None:
                    return found
    return None
+
+
def search_and_process(proto, return_func=lambda params, child_values=None: params,
                       recalc_params_func=lambda proto, obj, desc, params: params,
                       params=None):
    """Search and process each node. Recalc params on each step. Pass it down
    the tree. On each leaf calcs return value from param, and pass it up. Nodes
    calc return value with current param and childs return values.

    Args:
        * proto -- current node. to run through some proto, put its object here
        * return_func -- function that return value. takes current (recalced for current
        * node) param and list of return values for current node children.
        * for leafs second parametr is None
        * recalc_params_func -- function to recalc params in node. takes root proto,
        * current object (or objects for repeated fields), current
        * proto descriptor and param. return new param value
        * params -- initial values for params"""
    if proto is None:
        return None

    return_values = []
    for desc, obj in proto.ListFields():
        # Deep copy so mutations made further down the tree don't leak back up.
        # NOTE(review): params is re-bound on every iteration (and again below
        # for message fields), so later sibling fields and the final
        # return_func(params, ...) call observe values produced by earlier
        # iterations -- confirm this accumulation is intended rather than a
        # per-child reset.
        params = copy.deepcopy(params)
        if desc.type == fdescriptor.TYPE_MESSAGE:
            # Repeated message fields yield a list; normalize singular to [obj].
            objs = obj if desc.label == fdescriptor.LABEL_REPEATED else [obj]
            params = recalc_params_func(proto, obj, desc, params)
            for one_obj in objs:
                return_values.append(search_and_process(one_obj, return_func,
                                                        recalc_params_func, params))
        else:
            # Leaf field: by convention child_values is None.
            return_values.append(return_func(recalc_params_func(proto, obj, desc, params), None))
    return return_func(params, return_values)
+
+
def search_and_process_descriptors(proto_desc,
                                   return_func=lambda params, child_values=None: params,
                                   recalc_params_func=lambda desc, params: params,
                                   params=None):
    """Same as search_and_process (except that recalc_params is also run for
    the root proto), but walks the PROTOBUF DESCRIPTION (schema) instead of
    the nodes of a protobuf message instance."""
    params = copy.deepcopy(params)
    # NOTE(review): recalc runs before the None check below, so a None
    # proto_desc is still passed to recalc_params_func -- confirm intended.
    params = recalc_params_func(proto_desc, params)

    if proto_desc is None:
        return None
    elif hasattr(proto_desc, "type") and proto_desc.type != fdescriptor.TYPE_MESSAGE:
        # Leaf: a non-message field descriptor.
        return return_func(params, None)

    return_values = []
    for field_desc in proto_desc.fields:
        # Recurse into the message type for message fields, the field itself otherwise.
        desc = field_desc if field_desc.message_type is None else field_desc.message_type
        return_values.append(search_and_process_descriptors(desc, return_func,
                                                            recalc_params_func, params))

    return return_func(params, return_values)