# coding: utf-8
import base64
import errno
import re
import sys
import os
import logging
import fnmatch
import json
import time
import traceback
import collections
import signal
import inspect
import warnings
import attr
import faulthandler
import py
import pytest
import six
import _pytest
import _pytest._io
import _pytest.main
import _pytest.mark
import _pytest.outcomes
import _pytest.reports
import _pytest.skipping
from _pytest.warning_types import PytestUnhandledCoroutineWarning
from yatest_lib import test_splitter
import yatest.common as yatest_common
from library.python.pytest.plugins.metrics import test_metrics
try:
import resource
except ImportError:
resource = None
try:
import library.python.pytest.yatest_tools as tools
except ImportError:
# fallback for pytest script mode
import yatest_tools as tools
try:
from library.python import filelock
except ImportError:
filelock = None
import yatest_lib.tools
import yatest_lib.external as canon
import yatest_lib.ya
from library.python.pytest import context
console_logger = logging.getLogger("console")
yatest_logger = logging.getLogger("ya.test")
_pytest.main.EXIT_NOTESTSCOLLECTED = 0
SHUTDOWN_REQUESTED = False
pytest_config = None
def configure_pdb_on_demand():
import signal
if hasattr(signal, "SIGUSR1"):
def on_signal(*args):
import ipdb
ipdb.set_trace()
signal.signal(signal.SIGUSR1, on_signal)
class CustomImporter(object):
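    """Meta path finder that materializes missing __init__.py files.

    For every registered root, when an imported name maps to an existing
    directory that has no __init__.py, an empty one is created so the regular
    import machinery can treat the directory as a package. The finder never
    loads modules itself - it always returns None.
    """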
def __init__(self, roots):
self._roots = roots
    def find_spec(self, fullname, path, target=None):
        # Python 3 consults find_spec first and skips find_module when it is defined,
        # so delegate to find_module to keep creating the missing __init__.py files.
        self.find_module(fullname, path)
        return None
def find_module(self, fullname, package_path=None):
for path in self._roots:
full_path = self._get_module_path(path, fullname)
if os.path.exists(full_path) and os.path.isdir(full_path) and not os.path.exists(os.path.join(full_path, "__init__.py")):
open(os.path.join(full_path, "__init__.py"), "w").close()
return None
def _get_module_path(self, path, fullname):
return os.path.join(path, *fullname.split('.'))
class YaTestLoggingFileHandler(logging.FileHandler):
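    """Marker subclass: lets setup_logging() find and close the handlers it installed earlier."""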
pass
class _TokenFilterFormatter(logging.Formatter):
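    """Formatter that masks values of *TOKEN environment variables with [SECRET]."""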
def __init__(self, fmt):
super(_TokenFilterFormatter, self).__init__(fmt)
        self._replacements = []
        for k, v in os.environ.items():
            if k.endswith('TOKEN') and v:
                self._replacements.append(v)
        self._replacements = sorted(self._replacements)
def _filter(self, s):
for r in self._replacements:
s = s.replace(r, "[SECRET]")
return s
def format(self, record):
return self._filter(super(_TokenFilterFormatter, self).format(record))
def setup_logging(log_path, level=logging.DEBUG, *other_logs):
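    """Drop previously installed YaTestLoggingFileHandler handlers from the root logger
    and attach a token-masking file handler for log_path and every extra log file."""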
logs = [log_path] + list(other_logs)
root_logger = logging.getLogger()
for i in range(len(root_logger.handlers) - 1, -1, -1):
if isinstance(root_logger.handlers[i], YaTestLoggingFileHandler):
root_logger.handlers.pop(i).close()
root_logger.setLevel(level)
for log_file in logs:
file_handler = YaTestLoggingFileHandler(log_file)
log_format = '%(asctime)s - %(levelname)s - %(name)s - %(funcName)s: %(message)s'
file_handler.setFormatter(_TokenFilterFormatter(log_format))
file_handler.setLevel(level)
root_logger.addHandler(file_handler)
def pytest_addoption(parser):
parser.addoption("--build-root", action="store", dest="build_root", default="", help="path to the build root")
parser.addoption("--dep-root", action="append", dest="dep_roots", default=[], help="path to the dep build roots")
parser.addoption("--source-root", action="store", dest="source_root", default="", help="path to the source root")
parser.addoption("--data-root", action="store", dest="data_root", default="", help="path to the arcadia_tests_data root")
parser.addoption("--output-dir", action="store", dest="output_dir", default="", help="path to the test output dir")
parser.addoption("--python-path", action="store", dest="python_path", default="", help="path the canonical python binary")
parser.addoption("--valgrind-path", action="store", dest="valgrind_path", default="", help="path the canonical valgring binary")
parser.addoption("--test-filter", action="append", dest="test_filter", default=None, help="test filter")
parser.addoption("--test-file-filter", action="store", dest="test_file_filter", default=None, help="test file filter")
parser.addoption("--test-param", action="append", dest="test_params", default=None, help="test parameters")
parser.addoption("--test-log-level", action="store", dest="test_log_level", choices=["critical", "error", "warning", "info", "debug"], default="debug", help="test log level")
parser.addoption("--mode", action="store", choices=[yatest_lib.ya.RunMode.List, yatest_lib.ya.RunMode.Run], dest="mode", default=yatest_lib.ya.RunMode.Run, help="testing mode")
parser.addoption("--test-list-file", action="store", dest="test_list_file")
parser.addoption("--modulo", default=1, type=int)
parser.addoption("--modulo-index", default=0, type=int)
parser.addoption("--partition-mode", default='SEQUENTIAL', help="Split tests according to partitoin mode")
parser.addoption("--split-by-tests", action='store_true', help="Split test execution by tests instead of suites", default=False)
parser.addoption("--project-path", action="store", default="", help="path to CMakeList where test is declared")
parser.addoption("--build-type", action="store", default="", help="build type")
parser.addoption("--flags", action="append", dest="flags", default=[], help="build flags (-D)")
parser.addoption("--sanitize", action="store", default="", help="sanitize mode")
parser.addoption("--test-stderr", action="store_true", default=False, help="test stderr")
parser.addoption("--test-debug", action="store_true", default=False, help="test debug mode")
parser.addoption("--root-dir", action="store", default=None)
parser.addoption("--ya-trace", action="store", dest="ya_trace_path", default=None, help="path to ya trace report")
parser.addoption("--ya-version", action="store", dest="ya_version", default=0, type=int, help="allows to be compatible with ya and the new changes in ya-dev")
parser.addoption(
"--test-suffix", action="store", dest="test_suffix", default=None, help="add suffix to every test name"
)
parser.addoption("--gdb-path", action="store", dest="gdb_path", default="", help="path the canonical gdb binary")
parser.addoption("--collect-cores", action="store_true", dest="collect_cores", default=False, help="allows core dump file recovering during test")
parser.addoption("--sanitizer-extra-checks", action="store_true", dest="sanitizer_extra_checks", default=False, help="enables extra checks for tests built with sanitizers")
parser.addoption("--report-deselected", action="store_true", dest="report_deselected", default=False, help="report deselected tests to the trace file")
parser.addoption("--pdb-on-sigusr1", action="store_true", default=False, help="setup pdb.set_trace on SIGUSR1")
parser.addoption("--test-tool-bin", help="Path to test_tool")
parser.addoption("--test-list-path", dest="test_list_path", action="store", help="path to test list", default="")
def from_ya_test():
return "YA_TEST_RUNNER" in os.environ
@pytest.hookimpl(tryfirst=True)
def pytest_configure(config):
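    """Build the yatest_lib.ya.Ya run context, configure logging and sys.path,
    create the trace reporter and install fault handlers plus the graceful
    shutdown signal handler."""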
global pytest_config
pytest_config = config
config.option.continue_on_collection_errors = True
config.from_ya_test = from_ya_test()
config.test_logs = collections.defaultdict(dict)
test_metrics.metrics = {}
config.suite_metrics = {}
config.configure_timestamp = time.time()
context = {
"project_path": config.option.project_path,
"test_stderr": config.option.test_stderr,
"test_debug": config.option.test_debug,
"build_type": config.option.build_type,
"test_traceback": config.option.tbstyle,
"flags": config.option.flags,
"sanitize": config.option.sanitize,
}
if config.option.collectonly:
config.option.mode = yatest_lib.ya.RunMode.List
config.ya = yatest_lib.ya.Ya(
config.option.mode,
config.option.source_root,
config.option.build_root,
config.option.dep_roots,
config.option.output_dir,
config.option.test_params,
context,
config.option.python_path,
config.option.valgrind_path,
config.option.gdb_path,
config.option.data_root,
)
config.option.test_log_level = {
"critical": logging.CRITICAL,
"error": logging.ERROR,
"warning": logging.WARN,
"info": logging.INFO,
"debug": logging.DEBUG,
}[config.option.test_log_level]
if not config.option.collectonly:
setup_logging(os.path.join(config.ya.output_dir, "run.log"), config.option.test_log_level)
config.current_item_nodeid = None
config.current_test_name = None
config.test_cores_count = 0
config.collect_cores = config.option.collect_cores
config.sanitizer_extra_checks = config.option.sanitizer_extra_checks
try:
config.test_tool_bin = config.option.test_tool_bin
except AttributeError:
logging.info("test_tool_bin not specified")
if config.sanitizer_extra_checks:
for envvar in ['LSAN_OPTIONS', 'ASAN_OPTIONS']:
if envvar in os.environ:
os.environ.pop(envvar)
if envvar + '_ORIGINAL' in os.environ:
os.environ[envvar] = os.environ[envvar + '_ORIGINAL']
extra_sys_path = []
# Arcadia paths from the test DEPENDS section of ya.make
extra_sys_path.append(os.path.join(config.option.source_root, config.option.project_path))
    # Build root is required for correct import of protobufs, because imports are relative to the root
# (like import devtools.dummy_arcadia.protos.lib.my_proto_pb2)
extra_sys_path.append(config.option.build_root)
for path in config.option.dep_roots:
if os.path.isabs(path):
extra_sys_path.append(path)
else:
extra_sys_path.append(os.path.join(config.option.source_root, path))
sys_path_set = set(sys.path)
for path in extra_sys_path:
if path not in sys_path_set:
sys.path.append(path)
sys_path_set.add(path)
os.environ["PYTHONPATH"] = os.pathsep.join(sys.path)
if not config.option.collectonly:
if config.option.ya_trace_path:
config.ya_trace_reporter = TraceReportGenerator(config.option.ya_trace_path)
else:
config.ya_trace_reporter = DryTraceReportGenerator(config.option.ya_trace_path)
config.ya_version = config.option.ya_version
sys.meta_path.append(CustomImporter([config.option.build_root] + [os.path.join(config.option.build_root, dep) for dep in config.option.dep_roots]))
if config.option.pdb_on_sigusr1:
configure_pdb_on_demand()
yatest_common.runtime._set_ya_config(config=config)
# Dump python backtrace in case of any errors
faulthandler.enable()
if hasattr(signal, "SIGQUIT"):
        # SIGQUIT is used by test_tool to tear down tests which overrun the timeout
faulthandler.register(signal.SIGQUIT, chain=True)
if hasattr(signal, "SIGUSR2"):
signal.signal(signal.SIGUSR2, _graceful_shutdown)
session_should_exit = False
def _graceful_shutdown_on_log(should_exit):
if should_exit:
pytest.exit("Graceful shutdown requested")
def pytest_runtest_logreport(report):
_graceful_shutdown_on_log(session_should_exit)
def pytest_runtest_logstart(nodeid, location):
_graceful_shutdown_on_log(session_should_exit)
def pytest_runtest_logfinish(nodeid, location):
_graceful_shutdown_on_log(session_should_exit)
def _graceful_shutdown(*args):
global session_should_exit
session_should_exit = True
try:
import library.python.coverage
library.python.coverage.stop_coverage_tracing()
except ImportError:
pass
traceback.print_stack(file=sys.stderr)
capman = pytest_config.pluginmanager.getplugin("capturemanager")
capman.suspend(in_=True)
_graceful_shutdown_on_log(not capman.is_globally_capturing())
def _get_rusage():
return resource and resource.getrusage(resource.RUSAGE_SELF)
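# Reports per-test deltas of getrusage() counters as ya metrics;
# memory counters are scaled to bytes to match util/system/rusage.cpp.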
def _collect_test_rusage(item):
if resource and hasattr(item, "rusage"):
finish_rusage = _get_rusage()
ya_inst = pytest_config.ya
def add_metric(attr_name, metric_name=None, modifier=None):
if not metric_name:
metric_name = attr_name
if not modifier:
modifier = lambda x: x
if hasattr(item.rusage, attr_name):
ya_inst.set_metric_value(metric_name, modifier(getattr(finish_rusage, attr_name) - getattr(item.rusage, attr_name)))
for args in [
("ru_maxrss", "ru_rss", lambda x: x*1024), # to be the same as in util/system/rusage.cpp
("ru_utime",),
("ru_stime",),
("ru_ixrss", None, lambda x: x*1024),
("ru_idrss", None, lambda x: x*1024),
("ru_isrss", None, lambda x: x*1024),
("ru_majflt", "ru_major_pagefaults"),
("ru_minflt", "ru_minor_pagefaults"),
("ru_nswap",),
("ru_inblock",),
("ru_oublock",),
("ru_msgsnd",),
("ru_msgrcv",),
("ru_nsignals",),
("ru_nvcsw",),
("ru_nivcsw",),
]:
add_metric(*args)
def _get_item_tags(item):
tags = []
for key, value in item.keywords.items():
if key == 'pytestmark' and isinstance(value, list):
for mark in value:
tags.append(mark.name)
elif isinstance(value, _pytest.mark.MarkDecorator):
tags.append(key)
return tags
def pytest_runtest_setup(item):
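    """Record the rusage baseline, switch logging to the per-test log file and
    pre-register the test in the trace as crashed so an aborted run is still reported."""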
item.rusage = _get_rusage()
pytest_config.test_cores_count = 0
pytest_config.current_item_nodeid = item.nodeid
class_name, test_name = tools.split_node_id(item.nodeid)
test_log_path = tools.get_test_log_file_path(pytest_config.ya.output_dir, class_name, test_name)
setup_logging(
os.path.join(pytest_config.ya.output_dir, "run.log"),
pytest_config.option.test_log_level,
test_log_path
)
pytest_config.test_logs[item.nodeid]['log'] = test_log_path
pytest_config.test_logs[item.nodeid]['logsdir'] = pytest_config.ya.output_dir
pytest_config.current_test_log_path = test_log_path
pytest_config.current_test_name = "{}::{}".format(class_name, test_name)
separator = "#" * 100
yatest_logger.info(separator)
yatest_logger.info(test_name)
yatest_logger.info(separator)
yatest_logger.info("Test setup")
test_item = CrashedTestItem(item.nodeid, pytest_config.option.test_suffix)
pytest_config.ya_trace_reporter.on_start_test_class(test_item)
pytest_config.ya_trace_reporter.on_start_test_case(test_item)
def pytest_runtest_teardown(item, nextitem):
yatest_logger.info("Test teardown")
def pytest_runtest_call(item):
class_name, test_name = tools.split_node_id(item.nodeid)
yatest_logger.info("Test call (class_name: %s, test_name: %s)", class_name, test_name)
def pytest_deselected(items):
config = pytest_config
if config.option.report_deselected:
for item in items:
deselected_item = DeselectedTestItem(item.nodeid, config.option.test_suffix)
config.ya_trace_reporter.on_start_test_class(deselected_item)
config.ya_trace_reporter.on_finish_test_case(deselected_item)
config.ya_trace_reporter.on_finish_test_class(deselected_item)
@pytest.mark.trylast
def pytest_collection_modifyitems(items, config):
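    """Filter the collected items, split them into the requested chunk
    (--modulo / --modulo-index) and either register the chunk with the trace
    reporter (Run mode) or dump the test list (List mode)."""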
def filter_items(filters):
filtered_items = []
deselected_items = []
for item in items:
canonical_node_id = str(CustomTestItem(item.nodeid, pytest_config.option.test_suffix))
matched = False
for flt in filters:
if "::" not in flt and "*" not in flt:
flt += "*" # add support for filtering by module name
if canonical_node_id.endswith(flt) or fnmatch.fnmatch(tools.escape_for_fnmatch(canonical_node_id), tools.escape_for_fnmatch(flt)):
matched = True
if matched:
filtered_items.append(item)
else:
deselected_items.append(item)
config.hook.pytest_deselected(items=deselected_items)
items[:] = filtered_items
def filter_by_full_name(filters):
        filter_set = set(filters)
filtered_items = []
deselected_items = []
for item in items:
if item.nodeid in filter_set:
filtered_items.append(item)
else:
deselected_items.append(item)
config.hook.pytest_deselected(items=deselected_items)
items[:] = filtered_items
# XXX - check to be removed when tests for peerdirs don't run
for item in items:
if not item.nodeid:
item._nodeid = os.path.basename(item.location[0])
if os.path.exists(config.option.test_list_path):
with open(config.option.test_list_path, 'r') as afile:
chunks = json.load(afile)
filters = chunks[config.option.modulo_index]
filter_by_full_name(filters)
else:
if config.option.test_filter:
filter_items(config.option.test_filter)
partition_mode = config.option.partition_mode
modulo = config.option.modulo
if modulo > 1:
items[:] = sorted(items, key=lambda item: item.nodeid)
modulo_index = config.option.modulo_index
split_by_tests = config.option.split_by_tests
items_by_classes = {}
res = []
for item in items:
if item.nodeid.count("::") == 2 and not split_by_tests:
class_name = item.nodeid.rsplit("::", 1)[0]
if class_name not in items_by_classes:
items_by_classes[class_name] = []
res.append(items_by_classes[class_name])
items_by_classes[class_name].append(item)
else:
res.append([item])
chunk_items = test_splitter.get_splitted_tests(res, modulo, modulo_index, partition_mode, is_sorted=True)
items[:] = []
for item in chunk_items:
items.extend(item)
yatest_logger.info("Modulo %s tests are: %s", modulo_index, chunk_items)
if config.option.mode == yatest_lib.ya.RunMode.Run:
for item in items:
test_item = NotLaunchedTestItem(item.nodeid, config.option.test_suffix)
config.ya_trace_reporter.on_start_test_class(test_item)
config.ya_trace_reporter.on_finish_test_case(test_item)
config.ya_trace_reporter.on_finish_test_class(test_item)
elif config.option.mode == yatest_lib.ya.RunMode.List:
tests = []
for item in items:
item = CustomTestItem(item.nodeid, pytest_config.option.test_suffix, item.keywords)
record = {
"class": item.class_name,
"test": item.test_name,
"tags": _get_item_tags(item),
}
tests.append(record)
if config.option.test_list_file:
with open(config.option.test_list_file, 'w') as afile:
json.dump(tests, afile)
# TODO prettyboy remove after test_tool release - currently it's required for backward compatibility
sys.stderr.write(json.dumps(tests))
def pytest_collectreport(report):
if not report.passed:
if hasattr(pytest_config, 'ya_trace_reporter'):
test_item = TestItem(report, None, pytest_config.option.test_suffix)
pytest_config.ya_trace_reporter.on_error(test_item)
else:
sys.stderr.write(yatest_lib.tools.to_utf8(report.longrepr))
@pytest.mark.tryfirst
def pytest_pyfunc_call(pyfuncitem):
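    """Call the test function with its resolved fixture arguments and keep the
    return value on the item (pyfuncitem.retval) for later canonization;
    coroutine functions are skipped since no async plugin is bundled here."""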
testfunction = pyfuncitem.obj
iscoroutinefunction = getattr(inspect, "iscoroutinefunction", None)
if iscoroutinefunction is not None and iscoroutinefunction(testfunction):
msg = "Coroutine functions are not natively supported and have been skipped.\n"
msg += "You need to install a suitable plugin for your async framework, for example:\n"
msg += " - pytest-asyncio\n"
msg += " - pytest-trio\n"
msg += " - pytest-tornasync"
warnings.warn(PytestUnhandledCoroutineWarning(msg.format(pyfuncitem.nodeid)))
_pytest.outcomes.skip(msg="coroutine function and no async plugin installed (see warnings)")
funcargs = pyfuncitem.funcargs
testargs = {arg: funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames}
pyfuncitem.retval = testfunction(**testargs)
return True
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_makereport(item, call):
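    """Hook wrapper: after the report is built, forward it to the ya trace
    reporter, mapping the setup/call/teardown phases to trace events and
    recording the suite startup metric once."""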
def logreport(report, result, call):
test_item = TestItem(report, result, pytest_config.option.test_suffix)
if not pytest_config.suite_metrics and context.Ctx.get("YA_PYTEST_START_TIMESTAMP"):
pytest_config.suite_metrics["pytest_startup_duration"] = call.start - context.Ctx["YA_PYTEST_START_TIMESTAMP"]
pytest_config.ya_trace_reporter.dump_suite_metrics()
pytest_config.ya_trace_reporter.on_log_report(test_item)
if report.outcome == "failed":
yatest_logger.error(report.longrepr)
if report.when == "call":
_collect_test_rusage(item)
pytest_config.ya_trace_reporter.on_finish_test_case(test_item)
elif report.when == "setup":
pytest_config.ya_trace_reporter.on_start_test_class(test_item)
if report.outcome != "passed":
pytest_config.ya_trace_reporter.on_start_test_case(test_item)
pytest_config.ya_trace_reporter.on_finish_test_case(test_item)
else:
pytest_config.ya_trace_reporter.on_start_test_case(test_item)
elif report.when == "teardown":
if report.outcome == "failed":
pytest_config.ya_trace_reporter.on_start_test_case(test_item)
pytest_config.ya_trace_reporter.on_finish_test_case(test_item)
else:
pytest_config.ya_trace_reporter.on_finish_test_case(test_item, duration_only=True)
pytest_config.ya_trace_reporter.on_finish_test_class(test_item)
outcome = yield
rep = outcome.get_result()
result = None
if hasattr(item, 'retval') and item.retval is not None:
result = item.retval
if not pytest_config.from_ya_test:
ti = TestItem(rep, result, pytest_config.option.test_suffix)
tr = pytest_config.pluginmanager.getplugin('terminalreporter')
tr.write_line("{} - Validating canonical data is not supported when running standalone binary".format(ti), yellow=True, bold=True)
logreport(rep, result, call)
def pytest_make_parametrize_id(config, val, argname):
# Avoid <, > symbols in canondata file names
if inspect.isfunction(val) and val.__name__ == "<lambda>":
return str(argname)
return None
def get_formatted_error(report):
if isinstance(report.longrepr, tuple):
text = ""
for entry in report.longrepr:
text += colorize(entry)
else:
text = colorize(report.longrepr)
text = yatest_lib.tools.to_utf8(text)
return text
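# colorize() renders a failure report with ya markup: for long traceback styles it reuses
# pytest's terminal writer, for the short style it splits the traceback from the "E " error
# lines and wraps paths, line numbers and function names in [[...]] markers.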
def colorize(longrepr):
# use default pytest colorization
if pytest_config.option.tbstyle != "short":
io = py.io.TextIO()
if six.PY2:
writer = py.io.TerminalWriter(file=io)
else:
writer = _pytest._io.TerminalWriter(file=io)
# enable colorization
writer.hasmarkup = True
if hasattr(longrepr, 'reprtraceback') and hasattr(longrepr.reprtraceback, 'toterminal'):
longrepr.reprtraceback.toterminal(writer)
return io.getvalue().strip()
return yatest_lib.tools.to_utf8(longrepr)
text = yatest_lib.tools.to_utf8(longrepr)
pos = text.find("E ")
if pos == -1:
return text
bt, error = text[:pos], text[pos:]
filters = [
# File path, line number and function name
(re.compile(r"^(.*?):(\d+): in (\S+)", flags=re.MULTILINE), r"[[unimp]]\1[[rst]]:[[alt2]]\2[[rst]]: in [[alt1]]\3[[rst]]"),
]
for regex, substitution in filters:
bt = regex.sub(substitution, bt)
return "{}[[bad]]{}".format(bt, error)
class TestItem(object):
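    """Adapter over a pytest report exposing ya-style status, error markup,
    duration, keywords and the canonizable result."""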
def __init__(self, report, result, test_suffix):
self._result = result
self.nodeid = report.nodeid
self._class_name, self._test_name = tools.split_node_id(self.nodeid, test_suffix)
self._error = ""
self._status = None
self._process_report(report)
        self._duration = getattr(report, 'duration', 0)
self._keywords = getattr(report, "keywords", {})
def _process_report(self, report):
if report.longrepr:
self.set_error(report)
if hasattr(report, 'when') and report.when != "call":
self.set_error(report.when + " failed:\n" + self._error)
report_teststatus = _pytest.skipping.pytest_report_teststatus(report)
if report_teststatus is not None:
report_teststatus = report_teststatus[0]
if report_teststatus == 'xfailed':
self._status = 'xfail'
self.set_error(report.wasxfail, 'imp')
elif report_teststatus == 'xpassed':
self._status = 'xpass'
self.set_error("Test unexpectedly passed")
elif report.skipped:
self._status = 'skipped'
self.set_error(yatest_lib.tools.to_utf8(report.longrepr[-1]))
elif report.passed:
self._status = 'good'
else:
self._status = 'fail'
@property
def status(self):
return self._status
def set_status(self, status):
self._status = status
@property
def test_name(self):
return tools.normalize_name(self._test_name)
@property
def class_name(self):
return tools.normalize_name(self._class_name)
@property
def error(self):
return self._error
def set_error(self, entry, marker='bad'):
if isinstance(entry, _pytest.reports.BaseReport):
self._error = get_formatted_error(entry)
else:
self._error = "[[{}]]{}".format(yatest_lib.tools.to_str(marker), yatest_lib.tools.to_str(entry))
@property
def duration(self):
return self._duration
@property
def result(self):
if 'not_canonize' in self._keywords:
return None
return self._result
@property
def keywords(self):
return self._keywords
def __str__(self):
return "{}::{}".format(self.class_name, self.test_name)
class CustomTestItem(TestItem):
def __init__(self, nodeid, test_suffix, keywords=None):
self._result = None
self.nodeid = nodeid
self._class_name, self._test_name = tools.split_node_id(nodeid, test_suffix)
self._duration = 0
self._error = ""
self._keywords = keywords if keywords is not None else {}
class NotLaunchedTestItem(CustomTestItem):
def __init__(self, nodeid, test_suffix):
super(NotLaunchedTestItem, self).__init__(nodeid, test_suffix)
self._status = "not_launched"
class CrashedTestItem(CustomTestItem):
def __init__(self, nodeid, test_suffix):
super(CrashedTestItem, self).__init__(nodeid, test_suffix)
self._status = "crashed"
class DeselectedTestItem(CustomTestItem):
def __init__(self, nodeid, test_suffix):
super(DeselectedTestItem, self).__init__(nodeid, test_suffix)
self._status = "deselected"
class TraceReportGenerator(object):
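    """Writes ya trace events (test-started, subtest-started, subtest-finished,
    chunk_event, ...) as JSON lines to the file consumed by ya's test machinery
    and guards the trace against respawned binaries and leaked fork() children."""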
def __init__(self, out_file_path):
self._filename = out_file_path
self._file = open(out_file_path, 'w')
self._wreckage_filename = out_file_path + '.wreckage'
self._test_messages = {}
self._test_duration = {}
        # Some machinery to avoid data corruption due to sloppy fork()
self._current_test = (None, None)
self._pid = os.getpid()
self._check_intricate_respawn()
def _check_intricate_respawn(self):
pid_file = self._filename + '.pid'
try:
# python2 doesn't support open(f, 'x')
afile = os.fdopen(os.open(pid_file, os.O_WRONLY | os.O_EXCL | os.O_CREAT), 'w')
afile.write(str(self._pid))
afile.close()
return
except OSError as e:
if e.errno != errno.EEXIST:
raise
# Looks like the test binary was respawned
if from_ya_test():
try:
with open(pid_file) as afile:
prev_pid = afile.read()
except Exception as e:
prev_pid = '(failed to obtain previous pid: {})'.format(e)
parts = [
"Aborting test run: test machinery found that the test binary {} has already been run before.".format(sys.executable),
"Looks like test has incorrect respawn/relaunch logic within test binary.",
"Test should not try to restart itself - this is a poorly designed test case that leads to errors and could corrupt internal test machinery files.",
"Debug info: previous pid:{} current:{}".format(prev_pid, self._pid),
]
msg = '\n'.join(parts)
yatest_logger.error(msg)
if filelock:
lock = filelock.FileLock(self._wreckage_filename + '.lock')
lock.acquire()
with open(self._wreckage_filename, 'a') as afile:
self._file = afile
self._dump_trace('chunk_event', {"errors": [('fail', '[[bad]]' + msg)]})
raise Exception(msg)
else:
# Test binary is launched without `ya make -t`'s testing machinery - don't rely on clean environment
pass
def on_start_test_class(self, test_item):
pytest_config.ya.set_test_item_node_id(test_item.nodeid)
class_name = test_item.class_name.decode('utf-8') if sys.version_info[0] < 3 else test_item.class_name
self._current_test = (class_name, None)
self.trace('test-started', {'class': class_name})
def on_finish_test_class(self, test_item):
pytest_config.ya.set_test_item_node_id(test_item.nodeid)
self.trace('test-finished', {'class': test_item.class_name.decode('utf-8') if sys.version_info[0] < 3 else test_item.class_name})
def on_start_test_case(self, test_item):
class_name = yatest_lib.tools.to_utf8(test_item.class_name)
subtest_name = yatest_lib.tools.to_utf8(test_item.test_name)
message = {
'class': class_name,
'subtest': subtest_name,
}
if test_item.nodeid in pytest_config.test_logs:
message['logs'] = pytest_config.test_logs[test_item.nodeid]
pytest_config.ya.set_test_item_node_id(test_item.nodeid)
self._current_test = (class_name, subtest_name)
self.trace('subtest-started', message)
def on_finish_test_case(self, test_item, duration_only=False):
if test_item.result is not None:
try:
result = canon.serialize(test_item.result)
except Exception as e:
yatest_logger.exception("Error while serializing test results")
test_item.set_error("Invalid test result: {}".format(e))
test_item.set_status("fail")
result = None
else:
result = None
if duration_only and test_item.nodeid in self._test_messages: # add teardown time
message = self._test_messages[test_item.nodeid]
else:
comment = self._test_messages[test_item.nodeid]['comment'] if test_item.nodeid in self._test_messages else ''
comment += self._get_comment(test_item)
message = {
'class': yatest_lib.tools.to_utf8(test_item.class_name),
'subtest': yatest_lib.tools.to_utf8(test_item.test_name),
'status': test_item.status,
'comment': comment,
'result': result,
'metrics': test_metrics.get(test_item.nodeid),
'is_diff_test': 'diff_test' in test_item.keywords,
'tags': _get_item_tags(test_item),
}
if test_item.nodeid in pytest_config.test_logs:
message['logs'] = pytest_config.test_logs[test_item.nodeid]
message['time'] = self._test_duration.get(test_item.nodeid, test_item.duration)
self.trace('subtest-finished', message)
self._test_messages[test_item.nodeid] = message
def dump_suite_metrics(self):
message = {"metrics": pytest_config.suite_metrics}
self.trace("chunk_event", message)
def on_error(self, test_item):
self.trace('chunk_event', {"errors": [(test_item.status, self._get_comment(test_item))]})
def on_log_report(self, test_item):
if test_item.nodeid in self._test_duration:
self._test_duration[test_item.nodeid] += test_item._duration
else:
self._test_duration[test_item.nodeid] = test_item._duration
@staticmethod
def _get_comment(test_item):
msg = yatest_lib.tools.to_utf8(test_item.error)
if not msg:
return ""
return msg + "[[rst]]"
def _dump_trace(self, name, value):
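        """Append a single JSON event line to the trace file.

        An illustrative record (values are made up):
        {"timestamp": 1700000000.0, "name": "subtest-finished", "value": {"status": "good"}}
        """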
event = {
'timestamp': time.time(),
'value': value,
'name': name
}
data = yatest_lib.tools.to_str(json.dumps(event, ensure_ascii=False))
self._file.write(data + '\n')
self._file.flush()
def _check_sloppy_fork(self, name, value):
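        """Guard against trace corruption caused by leaked fork() children.

        If the current pid differs from the pid that created this reporter, the
        event is redirected to the wreckage file and the process terminates with
        exit code 38 instead of writing to the shared trace file.
        """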
if self._pid == os.getpid():
return
yatest_logger.error("Skip tracing to avoid data corruption, name = %s, value = %s", name, value)
try:
# Lock wreckage tracefile to avoid race if multiple tests use fork sloppily
if filelock:
lock = filelock.FileLock(self._wreckage_filename + '.lock')
lock.acquire()
with open(self._wreckage_filename, 'a') as afile:
self._file = afile
parts = [
"It looks like you have leaked process - it could corrupt internal test machinery files.",
"Usually it happens when you casually use fork() without os._exit(),",
"which results in two pytest processes running at the same time.",
"Pid of the original pytest's process is {}, however current process has {} pid.".format(self._pid, os.getpid()),
]
if self._current_test[1]:
parts.append("Most likely the problem is in '{}' test.".format(self._current_test))
else:
parts.append("Most likely new process was created before any test was launched (during the import stage?).")
if value.get('comment'):
comment = value.get('comment', '').strip()
# multiline comment
newline_required = '\n' if '\n' in comment else ''
parts.append("Debug info: name = '{}' comment:{}{}".format(name, newline_required, comment))
else:
val_str = json.dumps(value, ensure_ascii=False).encode('utf-8')
parts.append("Debug info: name = '{}' value = '{}'".format(name, base64.b64encode(val_str)))
msg = "[[bad]]{}".format('\n'.join(parts))
class_name, subtest_name = self._current_test
if subtest_name:
data = {
'class': class_name,
'subtest': subtest_name,
'status': 'fail',
'comment': msg,
}
# overwrite original status
self._dump_trace('subtest-finished', data)
else:
self._dump_trace('chunk_event', {"errors": [('fail', msg)]})
except Exception as e:
yatest_logger.exception(e)
finally:
os._exit(38)
def trace(self, name, value):
self._check_sloppy_fork(name, value)
self._dump_trace(name, value)
class DryTraceReportGenerator(TraceReportGenerator):
"""
    Trace report generator that writes no trace events (used when no --ya-trace path is given).
"""
def __init__(self, *args, **kwargs):
self._test_messages = {}
self._test_duration = {}
def trace(self, name, value):
pass