diff options
| author | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 |
|---|---|---|
| committer | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 |
| commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
| tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /contrib/python/pytest/py2/_pytest/runner.py | |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'contrib/python/pytest/py2/_pytest/runner.py')
| -rw-r--r-- | contrib/python/pytest/py2/_pytest/runner.py | 376 |
1 files changed, 376 insertions, 0 deletions
diff --git a/contrib/python/pytest/py2/_pytest/runner.py b/contrib/python/pytest/py2/_pytest/runner.py new file mode 100644 index 00000000000..34ae9177388 --- /dev/null +++ b/contrib/python/pytest/py2/_pytest/runner.py @@ -0,0 +1,376 @@ +# -*- coding: utf-8 -*- +""" basic collect and runtest protocol implementations """ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import bdb +import os +import sys +from time import time + +import attr +import six + +from .reports import CollectErrorRepr +from .reports import CollectReport +from .reports import TestReport +from _pytest._code.code import ExceptionInfo +from _pytest.compat import safe_str +from _pytest.outcomes import Exit +from _pytest.outcomes import Skipped +from _pytest.outcomes import TEST_OUTCOME + +# +# pytest plugin hooks + + +def pytest_addoption(parser): + group = parser.getgroup("terminal reporting", "reporting", after="general") + group.addoption( + "--durations", + action="store", + type=int, + default=None, + metavar="N", + help="show N slowest setup/test durations (N=0 for all).", + ), + + +def pytest_terminal_summary(terminalreporter): + durations = terminalreporter.config.option.durations + verbose = terminalreporter.config.getvalue("verbose") + if durations is None: + return + tr = terminalreporter + dlist = [] + for replist in tr.stats.values(): + for rep in replist: + if hasattr(rep, "duration"): + dlist.append(rep) + if not dlist: + return + dlist.sort(key=lambda x: x.duration) + dlist.reverse() + if not durations: + tr.write_sep("=", "slowest test durations") + else: + tr.write_sep("=", "slowest %s test durations" % durations) + dlist = dlist[:durations] + + for rep in dlist: + if verbose < 2 and rep.duration < 0.005: + tr.write_line("") + tr.write_line("(0.00 durations hidden. Use -vv to show these durations.)") + break + tr.write_line("%02.2fs %-8s %s" % (rep.duration, rep.when, rep.nodeid)) + + +def pytest_sessionstart(session): + session._setupstate = SetupState() + + +def pytest_sessionfinish(session): + session._setupstate.teardown_all() + + +def pytest_runtest_protocol(item, nextitem): + item.ihook.pytest_runtest_logstart(nodeid=item.nodeid, location=item.location) + runtestprotocol(item, nextitem=nextitem) + item.ihook.pytest_runtest_logfinish(nodeid=item.nodeid, location=item.location) + return True + + +def runtestprotocol(item, log=True, nextitem=None): + hasrequest = hasattr(item, "_request") + if hasrequest and not item._request: + item._initrequest() + rep = call_and_report(item, "setup", log) + reports = [rep] + if rep.passed: + if item.config.getoption("setupshow", False): + show_test_item(item) + if not item.config.getoption("setuponly", False): + reports.append(call_and_report(item, "call", log)) + reports.append(call_and_report(item, "teardown", log, nextitem=nextitem)) + # after all teardown hooks have been called + # want funcargs and request info to go away + if hasrequest: + item._request = False + item.funcargs = None + return reports + + +def show_test_item(item): + """Show test function, parameters and the fixtures of the test item.""" + tw = item.config.get_terminal_writer() + tw.line() + tw.write(" " * 8) + tw.write(item._nodeid) + used_fixtures = sorted(item._fixtureinfo.name2fixturedefs.keys()) + if used_fixtures: + tw.write(" (fixtures used: {})".format(", ".join(used_fixtures))) + + +def pytest_runtest_setup(item): + _update_current_test_var(item, "setup") + item.session._setupstate.prepare(item) + + +def pytest_runtest_call(item): + _update_current_test_var(item, "call") + sys.last_type, sys.last_value, sys.last_traceback = (None, None, None) + try: + item.runtest() + except Exception: + # Store trace info to allow postmortem debugging + type, value, tb = sys.exc_info() + tb = tb.tb_next # Skip *this* frame + sys.last_type = type + sys.last_value = value + sys.last_traceback = tb + del type, value, tb # Get rid of these in this frame + raise + + +def pytest_runtest_teardown(item, nextitem): + _update_current_test_var(item, "teardown") + item.session._setupstate.teardown_exact(item, nextitem) + _update_current_test_var(item, None) + + +def _update_current_test_var(item, when): + """ + Update PYTEST_CURRENT_TEST to reflect the current item and stage. + + If ``when`` is None, delete PYTEST_CURRENT_TEST from the environment. + """ + var_name = "PYTEST_CURRENT_TEST" + if when: + value = "{} ({})".format(item.nodeid, when) + # don't allow null bytes on environment variables (see #2644, #2957) + value = value.replace("\x00", "(null)") + os.environ[var_name] = value + else: + os.environ.pop(var_name) + + +def pytest_report_teststatus(report): + if report.when in ("setup", "teardown"): + if report.failed: + # category, shortletter, verbose-word + return "error", "E", "ERROR" + elif report.skipped: + return "skipped", "s", "SKIPPED" + else: + return "", "", "" + + +# +# Implementation + + +def call_and_report(item, when, log=True, **kwds): + call = call_runtest_hook(item, when, **kwds) + hook = item.ihook + report = hook.pytest_runtest_makereport(item=item, call=call) + if log: + hook.pytest_runtest_logreport(report=report) + if check_interactive_exception(call, report): + hook.pytest_exception_interact(node=item, call=call, report=report) + return report + + +def check_interactive_exception(call, report): + return call.excinfo and not ( + hasattr(report, "wasxfail") + or call.excinfo.errisinstance(Skipped) + or call.excinfo.errisinstance(bdb.BdbQuit) + ) + + +def call_runtest_hook(item, when, **kwds): + hookname = "pytest_runtest_" + when + ihook = getattr(item.ihook, hookname) + reraise = (Exit,) + if not item.config.getoption("usepdb", False): + reraise += (KeyboardInterrupt,) + return CallInfo.from_call( + lambda: ihook(item=item, **kwds), when=when, reraise=reraise + ) + + [email protected](repr=False) +class CallInfo(object): + """ Result/Exception info a function invocation. """ + + _result = attr.ib() + # Optional[ExceptionInfo] + excinfo = attr.ib() + start = attr.ib() + stop = attr.ib() + when = attr.ib() + + @property + def result(self): + if self.excinfo is not None: + raise AttributeError("{!r} has no valid result".format(self)) + return self._result + + @classmethod + def from_call(cls, func, when, reraise=None): + #: context of invocation: one of "setup", "call", + #: "teardown", "memocollect" + start = time() + excinfo = None + try: + result = func() + except: # noqa + excinfo = ExceptionInfo.from_current() + if reraise is not None and excinfo.errisinstance(reraise): + raise + result = None + stop = time() + return cls(start=start, stop=stop, when=when, result=result, excinfo=excinfo) + + def __repr__(self): + if self.excinfo is not None: + status = "exception" + value = self.excinfo.value + else: + # TODO: investigate unification + value = repr(self._result) + status = "result" + return "<CallInfo when={when!r} {status}: {value}>".format( + when=self.when, value=safe_str(value), status=status + ) + + +def pytest_runtest_makereport(item, call): + return TestReport.from_item_and_call(item, call) + + +def pytest_make_collect_report(collector): + call = CallInfo.from_call(lambda: list(collector.collect()), "collect") + longrepr = None + if not call.excinfo: + outcome = "passed" + else: + from _pytest import nose + + skip_exceptions = (Skipped,) + nose.get_skip_exceptions() + if call.excinfo.errisinstance(skip_exceptions): + outcome = "skipped" + r = collector._repr_failure_py(call.excinfo, "line").reprcrash + longrepr = (str(r.path), r.lineno, r.message) + else: + outcome = "failed" + errorinfo = collector.repr_failure(call.excinfo) + if not hasattr(errorinfo, "toterminal"): + errorinfo = CollectErrorRepr(errorinfo) + longrepr = errorinfo + rep = CollectReport( + collector.nodeid, outcome, longrepr, getattr(call, "result", None) + ) + rep.call = call # see collect_one_node + return rep + + +class SetupState(object): + """ shared state for setting up/tearing down test items or collectors. """ + + def __init__(self): + self.stack = [] + self._finalizers = {} + + def addfinalizer(self, finalizer, colitem): + """ attach a finalizer to the given colitem. + if colitem is None, this will add a finalizer that + is called at the end of teardown_all(). + """ + assert colitem and not isinstance(colitem, tuple) + assert callable(finalizer) + # assert colitem in self.stack # some unit tests don't setup stack :/ + self._finalizers.setdefault(colitem, []).append(finalizer) + + def _pop_and_teardown(self): + colitem = self.stack.pop() + self._teardown_with_finalization(colitem) + + def _callfinalizers(self, colitem): + finalizers = self._finalizers.pop(colitem, None) + exc = None + while finalizers: + fin = finalizers.pop() + try: + fin() + except TEST_OUTCOME: + # XXX Only first exception will be seen by user, + # ideally all should be reported. + if exc is None: + exc = sys.exc_info() + if exc: + six.reraise(*exc) + + def _teardown_with_finalization(self, colitem): + self._callfinalizers(colitem) + if hasattr(colitem, "teardown"): + colitem.teardown() + for colitem in self._finalizers: + assert ( + colitem is None or colitem in self.stack or isinstance(colitem, tuple) + ) + + def teardown_all(self): + while self.stack: + self._pop_and_teardown() + for key in list(self._finalizers): + self._teardown_with_finalization(key) + assert not self._finalizers + + def teardown_exact(self, item, nextitem): + needed_collectors = nextitem and nextitem.listchain() or [] + self._teardown_towards(needed_collectors) + + def _teardown_towards(self, needed_collectors): + exc = None + while self.stack: + if self.stack == needed_collectors[: len(self.stack)]: + break + try: + self._pop_and_teardown() + except TEST_OUTCOME: + # XXX Only first exception will be seen by user, + # ideally all should be reported. + if exc is None: + exc = sys.exc_info() + if exc: + six.reraise(*exc) + + def prepare(self, colitem): + """ setup objects along the collector chain to the test-method + and teardown previously setup objects.""" + needed_collectors = colitem.listchain() + self._teardown_towards(needed_collectors) + + # check if the last collection node has raised an error + for col in self.stack: + if hasattr(col, "_prepare_exc"): + six.reraise(*col._prepare_exc) + for col in needed_collectors[len(self.stack) :]: + self.stack.append(col) + try: + col.setup() + except TEST_OUTCOME: + col._prepare_exc = sys.exc_info() + raise + + +def collect_one_node(collector): + ihook = collector.ihook + ihook.pytest_collectstart(collector=collector) + rep = ihook.pytest_make_collect_report(collector=collector) + call = rep.__dict__.pop("call", None) + if call and check_interactive_exception(call, rep): + ihook.pytest_exception_interact(node=collector, call=call, report=rep) + return rep |
