aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/pytest/py3/_pytest/runner.py
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /contrib/python/pytest/py3/_pytest/runner.py
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/runner.py')
-rw-r--r--contrib/python/pytest/py3/_pytest/runner.py386
1 files changed, 386 insertions, 0 deletions
diff --git a/contrib/python/pytest/py3/_pytest/runner.py b/contrib/python/pytest/py3/_pytest/runner.py
new file mode 100644
index 0000000000..412ea44a87
--- /dev/null
+++ b/contrib/python/pytest/py3/_pytest/runner.py
@@ -0,0 +1,386 @@
+""" basic collect and runtest protocol implementations """
+import bdb
+import os
+import sys
+from time import time
+from typing import Callable
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import attr
+
+from .reports import CollectErrorRepr
+from .reports import CollectReport
+from .reports import TestReport
+from _pytest._code.code import ExceptionChainRepr
+from _pytest._code.code import ExceptionInfo
+from _pytest.compat import TYPE_CHECKING
+from _pytest.nodes import Collector
+from _pytest.nodes import Node
+from _pytest.outcomes import Exit
+from _pytest.outcomes import Skipped
+from _pytest.outcomes import TEST_OUTCOME
+
+if TYPE_CHECKING:
+ from typing import Type
+ from typing_extensions import Literal
+
+#
+# pytest plugin hooks
+
+
+def pytest_addoption(parser):
+ group = parser.getgroup("terminal reporting", "reporting", after="general")
+ group.addoption(
+ "--durations",
+ action="store",
+ type=int,
+ default=None,
+ metavar="N",
+ help="show N slowest setup/test durations (N=0 for all).",
+ )
+
+
+def pytest_terminal_summary(terminalreporter):
+ durations = terminalreporter.config.option.durations
+ verbose = terminalreporter.config.getvalue("verbose")
+ if durations is None:
+ return
+ tr = terminalreporter
+ dlist = []
+ for replist in tr.stats.values():
+ for rep in replist:
+ if hasattr(rep, "duration"):
+ dlist.append(rep)
+ if not dlist:
+ return
+ dlist.sort(key=lambda x: x.duration)
+ dlist.reverse()
+ if not durations:
+ tr.write_sep("=", "slowest test durations")
+ else:
+ tr.write_sep("=", "slowest %s test durations" % durations)
+ dlist = dlist[:durations]
+
+ for rep in dlist:
+ if verbose < 2 and rep.duration < 0.005:
+ tr.write_line("")
+ tr.write_line("(0.00 durations hidden. Use -vv to show these durations.)")
+ break
+ tr.write_line("{:02.2f}s {:<8} {}".format(rep.duration, rep.when, rep.nodeid))
+
+
+def pytest_sessionstart(session):
+ session._setupstate = SetupState()
+
+
+def pytest_sessionfinish(session):
+ session._setupstate.teardown_all()
+
+
+def pytest_runtest_protocol(item, nextitem):
+ item.ihook.pytest_runtest_logstart(nodeid=item.nodeid, location=item.location)
+ runtestprotocol(item, nextitem=nextitem)
+ item.ihook.pytest_runtest_logfinish(nodeid=item.nodeid, location=item.location)
+ return True
+
+
+def runtestprotocol(item, log=True, nextitem=None):
+ hasrequest = hasattr(item, "_request")
+ if hasrequest and not item._request:
+ item._initrequest()
+ rep = call_and_report(item, "setup", log)
+ reports = [rep]
+ if rep.passed:
+ if item.config.getoption("setupshow", False):
+ show_test_item(item)
+ if not item.config.getoption("setuponly", False):
+ reports.append(call_and_report(item, "call", log))
+ reports.append(call_and_report(item, "teardown", log, nextitem=nextitem))
+ # after all teardown hooks have been called
+ # want funcargs and request info to go away
+ if hasrequest:
+ item._request = False
+ item.funcargs = None
+ return reports
+
+
+def show_test_item(item):
+ """Show test function, parameters and the fixtures of the test item."""
+ tw = item.config.get_terminal_writer()
+ tw.line()
+ tw.write(" " * 8)
+ tw.write(item.nodeid)
+ used_fixtures = sorted(getattr(item, "fixturenames", []))
+ if used_fixtures:
+ tw.write(" (fixtures used: {})".format(", ".join(used_fixtures)))
+
+
+def pytest_runtest_setup(item):
+ _update_current_test_var(item, "setup")
+ item.session._setupstate.prepare(item)
+
+
+def pytest_runtest_call(item):
+ _update_current_test_var(item, "call")
+ try:
+ del sys.last_type
+ del sys.last_value
+ del sys.last_traceback
+ except AttributeError:
+ pass
+ try:
+ item.runtest()
+ except Exception as e:
+ # Store trace info to allow postmortem debugging
+ sys.last_type = type(e)
+ sys.last_value = e
+ assert e.__traceback__ is not None
+ # Skip *this* frame
+ sys.last_traceback = e.__traceback__.tb_next
+ raise e
+
+
+def pytest_runtest_teardown(item, nextitem):
+ _update_current_test_var(item, "teardown")
+ item.session._setupstate.teardown_exact(item, nextitem)
+ _update_current_test_var(item, None)
+
+
+def _update_current_test_var(item, when):
+ """
+ Update PYTEST_CURRENT_TEST to reflect the current item and stage.
+
+ If ``when`` is None, delete PYTEST_CURRENT_TEST from the environment.
+ """
+ var_name = "PYTEST_CURRENT_TEST"
+ if when:
+ value = "{} ({})".format(item.nodeid, when)
+ # don't allow null bytes on environment variables (see #2644, #2957)
+ value = value.replace("\x00", "(null)")
+ os.environ[var_name] = value
+ else:
+ os.environ.pop(var_name)
+
+
+def pytest_report_teststatus(report):
+ if report.when in ("setup", "teardown"):
+ if report.failed:
+ # category, shortletter, verbose-word
+ return "error", "E", "ERROR"
+ elif report.skipped:
+ return "skipped", "s", "SKIPPED"
+ else:
+ return "", "", ""
+
+
+#
+# Implementation
+
+
+def call_and_report(
+ item, when: "Literal['setup', 'call', 'teardown']", log=True, **kwds
+):
+ call = call_runtest_hook(item, when, **kwds)
+ hook = item.ihook
+ report = hook.pytest_runtest_makereport(item=item, call=call)
+ if log:
+ hook.pytest_runtest_logreport(report=report)
+ if check_interactive_exception(call, report):
+ hook.pytest_exception_interact(node=item, call=call, report=report)
+ return report
+
+
+def check_interactive_exception(call, report):
+ return call.excinfo and not (
+ hasattr(report, "wasxfail")
+ or call.excinfo.errisinstance(Skipped)
+ or call.excinfo.errisinstance(bdb.BdbQuit)
+ )
+
+
+def call_runtest_hook(item, when: "Literal['setup', 'call', 'teardown']", **kwds):
+ if when == "setup":
+ ihook = item.ihook.pytest_runtest_setup
+ elif when == "call":
+ ihook = item.ihook.pytest_runtest_call
+ elif when == "teardown":
+ ihook = item.ihook.pytest_runtest_teardown
+ else:
+ assert False, "Unhandled runtest hook case: {}".format(when)
+ reraise = (Exit,) # type: Tuple[Type[BaseException], ...]
+ if not item.config.getoption("usepdb", False):
+ reraise += (KeyboardInterrupt,)
+ return CallInfo.from_call(
+ lambda: ihook(item=item, **kwds), when=when, reraise=reraise
+ )
+
+
+@attr.s(repr=False)
+class CallInfo:
+ """ Result/Exception info a function invocation. """
+
+ _result = attr.ib()
+ excinfo = attr.ib(type=Optional[ExceptionInfo])
+ start = attr.ib()
+ stop = attr.ib()
+ when = attr.ib()
+
+ @property
+ def result(self):
+ if self.excinfo is not None:
+ raise AttributeError("{!r} has no valid result".format(self))
+ return self._result
+
+ @classmethod
+ def from_call(cls, func, when, reraise=None) -> "CallInfo":
+ #: context of invocation: one of "setup", "call",
+ #: "teardown", "memocollect"
+ start = time()
+ excinfo = None
+ try:
+ result = func()
+ except: # noqa
+ excinfo = ExceptionInfo.from_current()
+ if reraise is not None and excinfo.errisinstance(reraise):
+ raise
+ result = None
+ stop = time()
+ return cls(start=start, stop=stop, when=when, result=result, excinfo=excinfo)
+
+ def __repr__(self):
+ if self.excinfo is None:
+ return "<CallInfo when={!r} result: {!r}>".format(self.when, self._result)
+ return "<CallInfo when={!r} excinfo={!r}>".format(self.when, self.excinfo)
+
+
+def pytest_runtest_makereport(item, call):
+ return TestReport.from_item_and_call(item, call)
+
+
+def pytest_make_collect_report(collector: Collector) -> CollectReport:
+ call = CallInfo.from_call(lambda: list(collector.collect()), "collect")
+ longrepr = None
+ if not call.excinfo:
+ outcome = "passed"
+ else:
+ skip_exceptions = [Skipped]
+ unittest = sys.modules.get("unittest")
+ if unittest is not None:
+ # Type ignored because unittest is loaded dynamically.
+ skip_exceptions.append(unittest.SkipTest) # type: ignore
+ if call.excinfo.errisinstance(tuple(skip_exceptions)):
+ outcome = "skipped"
+ r_ = collector._repr_failure_py(call.excinfo, "line")
+ assert isinstance(r_, ExceptionChainRepr), repr(r_)
+ r = r_.reprcrash
+ assert r
+ longrepr = (str(r.path), r.lineno, r.message)
+ else:
+ outcome = "failed"
+ errorinfo = collector.repr_failure(call.excinfo)
+ if not hasattr(errorinfo, "toterminal"):
+ errorinfo = CollectErrorRepr(errorinfo)
+ longrepr = errorinfo
+ rep = CollectReport(
+ collector.nodeid, outcome, longrepr, getattr(call, "result", None)
+ )
+ rep.call = call # type: ignore # see collect_one_node
+ return rep
+
+
+class SetupState:
+ """ shared state for setting up/tearing down test items or collectors. """
+
+ def __init__(self):
+ self.stack = [] # type: List[Node]
+ self._finalizers = {} # type: Dict[Node, List[Callable[[], None]]]
+
+ def addfinalizer(self, finalizer, colitem):
+ """ attach a finalizer to the given colitem. """
+ assert colitem and not isinstance(colitem, tuple)
+ assert callable(finalizer)
+ # assert colitem in self.stack # some unit tests don't setup stack :/
+ self._finalizers.setdefault(colitem, []).append(finalizer)
+
+ def _pop_and_teardown(self):
+ colitem = self.stack.pop()
+ self._teardown_with_finalization(colitem)
+
+ def _callfinalizers(self, colitem):
+ finalizers = self._finalizers.pop(colitem, None)
+ exc = None
+ while finalizers:
+ fin = finalizers.pop()
+ try:
+ fin()
+ except TEST_OUTCOME as e:
+ # XXX Only first exception will be seen by user,
+ # ideally all should be reported.
+ if exc is None:
+ exc = e
+ if exc:
+ raise exc
+
+ def _teardown_with_finalization(self, colitem):
+ self._callfinalizers(colitem)
+ colitem.teardown()
+ for colitem in self._finalizers:
+ assert colitem in self.stack
+
+ def teardown_all(self):
+ while self.stack:
+ self._pop_and_teardown()
+ for key in list(self._finalizers):
+ self._teardown_with_finalization(key)
+ assert not self._finalizers
+
+ def teardown_exact(self, item, nextitem):
+ needed_collectors = nextitem and nextitem.listchain() or []
+ self._teardown_towards(needed_collectors)
+
+ def _teardown_towards(self, needed_collectors):
+ exc = None
+ while self.stack:
+ if self.stack == needed_collectors[: len(self.stack)]:
+ break
+ try:
+ self._pop_and_teardown()
+ except TEST_OUTCOME as e:
+ # XXX Only first exception will be seen by user,
+ # ideally all should be reported.
+ if exc is None:
+ exc = e
+ if exc:
+ raise exc
+
+ def prepare(self, colitem):
+ """ setup objects along the collector chain to the test-method
+ and teardown previously setup objects."""
+ needed_collectors = colitem.listchain()
+ self._teardown_towards(needed_collectors)
+
+ # check if the last collection node has raised an error
+ for col in self.stack:
+ if hasattr(col, "_prepare_exc"):
+ exc = col._prepare_exc
+ raise exc
+ for col in needed_collectors[len(self.stack) :]:
+ self.stack.append(col)
+ try:
+ col.setup()
+ except TEST_OUTCOME as e:
+ col._prepare_exc = e
+ raise e
+
+
+def collect_one_node(collector):
+ ihook = collector.ihook
+ ihook.pytest_collectstart(collector=collector)
+ rep = ihook.pytest_make_collect_report(collector=collector)
+ call = rep.__dict__.pop("call", None)
+ if call and check_interactive_exception(call, rep):
+ ihook.pytest_exception_interact(node=collector, call=call, report=rep)
+ return rep