# coding: utf-8
import collections
import functools
import math
import os
import re
import sys
from . import config
import yatest_lib.tools
SEP = '/'
TEST_MOD_PREFIX = '__tests__.'
class SubtestInfo(object):
skipped_prefix = '[SKIPPED] '
def from_str(cls, s):
if s.startswith(SubtestInfo.skipped_prefix):
s = s[len(SubtestInfo.skipped_prefix) :]
skipped = True
skipped = False
return SubtestInfo(*s.rsplit(TEST_SUBTEST_SEPARATOR, 1), skipped=skipped)
def __init__(self, test, subtest="", skipped=False, **kwargs):
self.test = test
self.subtest = subtest
self.skipped = skipped
for key, value in kwargs.iteritems():
setattr(self, key, value)
def __str__(self):
s = ''
if self.skipped:
s += SubtestInfo.skipped_prefix
return s + TEST_SUBTEST_SEPARATOR.join([self.test, self.subtest])
def __repr__(self):
return str(self)
class Status(object):
SKIPPED = -100
FLAKY = -1
'good': GOOD,
'fail': FAIL,
'xfail': XFAIL,
'xpass': XPASS,
'missing': MISSING,
'crashed': CRASHED,
'skipped': SKIPPED,
'flaky': FLAKY,
'not_launched': NOT_LAUNCHED,
'timeout': TIMEOUT,
'diff': CANON_DIFF,
TO_STR = {
GOOD: 'good',
FAIL: 'fail',
XFAIL: 'xfail',
XPASS: 'xpass',
MISSING: 'missing',
CRASHED: 'crashed',
SKIPPED: 'skipped',
FLAKY: 'flaky',
NOT_LAUNCHED: 'not_launched',
TIMEOUT: 'timeout',
CANON_DIFF: 'diff',
class Test(object):
def __init__(self, name, path, status=None, comment=None, subtests=None):
self.name = name
self.path = path
self.status = status
self.comment = comment
self.subtests = subtests or []
def __eq__(self, other):
if not isinstance(other, Test):
return False
return self.name == other.name and self.path == other.path
def __str__(self):
return "Test [{} {}] - {} - {}".format(self.name, self.path, self.status, self.comment)
def __repr__(self):
return str(self)
def add_subtest(self, subtest):
def setup_status(self, status, comment):
self.status = Status.BY_NAME[status or 'good']
if len(self.subtests) != 0:
self.status = max(self.status, max(s.status for s in self.subtests))
self.comment = comment
def subtests_by_status(self, status):
return [x.status for x in self.subtests].count(status)
# TODO: extract color theme logic from ya
'test_name': 'light-blue',
'test_project_path': 'dark-blue',
'test_dir_desc': 'dark-magenta',
'test_binary_path': 'light-gray',
# XXX: remove me
class YaCtx(object):
ya_ctx = YaCtx()
TRACE_FILE_NAME = "ytest.report.trace"
def lazy(func):
memory = {}
def wrapper(*args):
# Disabling caching in test mode
if config.is_test_mode():
return func(*args)
return memory[args]
except KeyError:
memory[args] = func(*args)
return memory[args]
return wrapper
def _get_mtab():
if os.path.exists("/etc/mtab"):
with open("/etc/mtab") as afile:
data = afile.read()
return [line.split(" ") for line in data.split("\n") if line]
return []
def get_max_filename_length(dirname):
Return maximum filename length for the filesystem
if sys.platform.startswith("linux"):
# Linux user's may work on mounted ecryptfs filesystem
# which has filename length limitations
for entry in _get_mtab():
mounted_dir, filesystem = entry[1], entry[2]
# http://unix.stackexchange.com/questions/32795/what-is-the-maximum-allowed-filename-and-folder-size-with-ecryptfs
if filesystem == "ecryptfs" and dirname and dirname.startswith(mounted_dir):
return 140
# default maximum filename length for most filesystems
return 255
def get_unique_file_path(dir_path, filename, cache=collections.defaultdict(set)):
Get unique filename in dir with proper filename length, using given filename/dir.
File/dir won't be created (thread nonsafe)
:param dir_path: path to dir
:param filename: original filename
:return: unique filename
max_suffix = 10000
# + 1 symbol for dot before suffix
tail_length = int(round(math.log(max_suffix, 10))) + 1
# truncate filename length in accordance with filesystem limitations
filename, extension = os.path.splitext(filename)
if sys.platform.startswith("win"):
# Trying to fit into MAX_PATH if it's possible.
# Remove after DEVTOOLS-1646
max_path = 260
filename_len = len(dir_path) + len(extension) + tail_length + len(os.sep)
if filename_len < max_path:
filename = yatest_lib.tools.trim_string(filename, max_path - filename_len)
filename = (
yatest_lib.tools.trim_string(filename, get_max_filename_length(dir_path) - tail_length - len(extension))
+ extension
candidate = os.path.join(dir_path, filename)
key = dir_path + filename
counter = sorted(
while os.path.exists(candidate):
counter += 1
assert counter < max_suffix
candidate = os.path.join(dir_path, filename + ".{}".format(counter))
return candidate
def escape_for_fnmatch(s):
return s.replace("[", "[").replace("]", "]")
def get_python_cmd(opts=None, use_huge=True, suite=None):
if opts and getattr(opts, 'flags', {}).get("USE_ARCADIA_PYTHON") == "no":
return ["python"]
if suite and not suite._use_arcadia_python:
return ["python"]
if use_huge:
return ["$(PYTHON)/python"]
ymake_path = opts.ymake_bin if opts and getattr(opts, 'ymake_bin', None) else "$(YMAKE)/ymake"
return [ymake_path, "--python"]
def normalize_name(name):
replacements = [
("\\", "\\\\"),
("\n", "\\n"),
("\t", "\\t"),
("\r", "\\r"),
for from_, to in replacements:
name = name.replace(from_, to)
return name
def normalize_filename(filename):
Replace invalid for file names characters with string equivalents
:param some_string: string to be converted to a valid file name
:return: valid file name
not_allowed_pattern = r"[\[\]\/:*?\"\'<>|+\0\\\s\x0b\x0c]"
filename = re.sub(not_allowed_pattern, ".", filename)
return re.sub(r"\.{2,}", ".", filename)
def get_test_log_file_path(output_dir, class_name, test_name, extension="log"):
get test log file path, platform dependant
:param output_dir: dir where log file should be placed
:param class_name: test class name
:param test_name: test name
:return: test log file name
if os.name == "nt":
# don't add class name to the log's filename
# to reduce it's length on windows
filename = test_name
filename = "{}.{}".format(class_name, test_name)
if not filename:
filename = "test"
filename += "." + extension
filename = normalize_filename(filename)
return get_unique_file_path(output_dir, filename)
def split_node_id(nodeid, test_suffix=None):
path, possible_open_bracket, params = nodeid.partition('[')
separator = "::"
test_name = None
if separator in path:
path, test_name = path.split(separator, 1)
path = _unify_path(path)
class_name = os.path.basename(path)
if test_name is None:
test_name = class_name
if test_suffix:
test_name += "::" + test_suffix
if separator in test_name:
klass_name, test_name = test_name.split(separator, 1)
if not test_suffix:
# test suffix is used for flakes and pep8, no need to add class_name as it's === class_name
class_name += separator + klass_name
if separator in test_name:
test_name = test_name.split(separator)[-1]
test_name += possible_open_bracket + params
return yatest_lib.tools.to_utf8(class_name), yatest_lib.tools.to_utf8(test_name)
def _suffix_test_modules_tree():
root = {}
for module in sys.extra_modules:
if not module.startswith(TEST_MOD_PREFIX):
module = module[len(TEST_MOD_PREFIX) :]
node = root
for name in reversed(module.split('.')):
if name == '__init__':
node = node.setdefault(name, {})
return root
def _conftest_load_policy_is_local(path):
return SEP in path and getattr(sys, "is_standalone_binary", False)
class MissingTestModule(Exception):
# If CONFTEST_LOAD_POLICY==LOCAL the path parameters is a true test file path. Something like
# /-B/taxi/uservices/services/alt/gen/tests/build/services/alt/validation/test_generated_files.py
# If CONFTEST_LOAD_POLICY is not LOCAL the path parameter is a module name with '.py' extension added. Example:
# validation.test_generated_files.py
# To make test names independent of the CONFTEST_LOAD_POLICY value replace path by module name if possible.
def _unify_path(path):
py_ext = ".py"
path = path.strip()
if _conftest_load_policy_is_local(path) and path.endswith(py_ext):
# Try to find best match for path as a module among test modules and use it as a class name.
# This is the only way to unify different CONFTEST_LOAD_POLICY modes
suff_tree = _suffix_test_modules_tree()
node, res = suff_tree, []
assert path.endswith(py_ext), path
parts = path[: -len(py_ext)].split(SEP)
# Use SEP as trailing terminator to make an extra step
# and find a proper match when parts is a full matching path
for p in reversed([SEP] + parts):
if p in node:
node = node[p]
if res:
return '.'.join(reversed(res)) + py_ext
# Top level test module
if TEST_MOD_PREFIX + p in sys.extra_modules:
return p + py_ext
# Unknown module - raise an error
raise MissingTestModule("Can't find proper module for '{}' path among: {}".format(path, suff_tree))
return path
def colorize_pytest_error(text):
error_prefix = "E "
blocks = [text]
while True:
text = blocks.pop()
err_start = text.find(error_prefix, 1)
if err_start == -1:
return ''.join(blocks + [text])
for pos in range(err_start + 1, len(text) - 1):
if text[pos] == '\n':
if not text[pos + 1 :].startswith(error_prefix):
err_end = pos + 1
err_end = len(text)
bt, error, tail = text[:err_start], text[err_start:err_end], text[err_end:]
filters = [
# File path, line number and function name
re.compile(r"^(.*?):(\d+): in (\S+)", flags=re.MULTILINE),
r"[[unimp]]\1[[rst]]:[[alt2]]\2[[rst]]: in [[alt1]]\3[[rst]]",
for regex, substitution in filters:
bt = regex.sub(substitution, bt)
blocks.append('[[bad]]' + error)