aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/pytest/py3/_pytest/terminal.py
diff options
context:
space:
mode:
authordeshevoy <deshevoy@yandex-team.ru>2022-02-10 16:46:56 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:56 +0300
commite988f30484abe5fdeedcc7a5d3c226c01a21800c (patch)
tree0a217b173aabb57b7e51f8a169989b1a3e0309fe /contrib/python/pytest/py3/_pytest/terminal.py
parent33ee501c05d3f24036ae89766a858930ae66c548 (diff)
downloadydb-e988f30484abe5fdeedcc7a5d3c226c01a21800c.tar.gz
Restoring authorship annotation for <deshevoy@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/terminal.py')
-rw-r--r--contrib/python/pytest/py3/_pytest/terminal.py1060
1 files changed, 530 insertions, 530 deletions
diff --git a/contrib/python/pytest/py3/_pytest/terminal.py b/contrib/python/pytest/py3/_pytest/terminal.py
index fbfb09aecf..7e317310e1 100644
--- a/contrib/python/pytest/py3/_pytest/terminal.py
+++ b/contrib/python/pytest/py3/_pytest/terminal.py
@@ -1,12 +1,12 @@
"""Terminal reporting of the full testing process.
-
-This is a good source for looking at the various reporting hooks.
-"""
-import argparse
+
+This is a good source for looking at the various reporting hooks.
+"""
+import argparse
import datetime
import inspect
-import platform
-import sys
+import platform
+import sys
import warnings
from collections import Counter
from functools import partial
@@ -25,13 +25,13 @@ from typing import TextIO
from typing import Tuple
from typing import TYPE_CHECKING
from typing import Union
-
-import attr
-import pluggy
-import py
-
+
+import attr
+import pluggy
+import py
+
import _pytest._version
-from _pytest import nodes
+from _pytest import nodes
from _pytest import timing
from _pytest._code import ExceptionInfo
from _pytest._code.code import ExceptionRepr
@@ -49,7 +49,7 @@ from _pytest.pathlib import bestrelpath
from _pytest.reports import BaseReport
from _pytest.reports import CollectReport
from _pytest.reports import TestReport
-
+
if TYPE_CHECKING:
from typing_extensions import Literal
@@ -57,7 +57,7 @@ if TYPE_CHECKING:
REPORT_COLLECTING_RESOLUTION = 0.5
-
+
KNOWN_TYPES = (
"failed",
"passed",
@@ -72,13 +72,13 @@ KNOWN_TYPES = (
_REPORTCHARS_DEFAULT = "fE"
-class MoreQuietAction(argparse.Action):
+class MoreQuietAction(argparse.Action):
"""A modified copy of the argparse count action which counts down and updates
the legacy quiet attribute at the same time.
-
+
Used to unify verbosity handling.
- """
-
+ """
+
def __init__(
self,
option_strings: Sequence[str],
@@ -88,14 +88,14 @@ class MoreQuietAction(argparse.Action):
help: Optional[str] = None,
) -> None:
super().__init__(
- option_strings=option_strings,
- dest=dest,
- nargs=0,
- default=default,
- required=required,
- help=help,
- )
-
+ option_strings=option_strings,
+ dest=dest,
+ nargs=0,
+ default=default,
+ required=required,
+ help=help,
+ )
+
def __call__(
self,
parser: argparse.ArgumentParser,
@@ -103,23 +103,23 @@ class MoreQuietAction(argparse.Action):
values: Union[str, Sequence[object], None],
option_string: Optional[str] = None,
) -> None:
- new_count = getattr(namespace, self.dest, 0) - 1
- setattr(namespace, self.dest, new_count)
- # todo Deprecate config.quiet
- namespace.quiet = getattr(namespace, "quiet", 0) + 1
-
-
+ new_count = getattr(namespace, self.dest, 0) - 1
+ setattr(namespace, self.dest, new_count)
+ # todo Deprecate config.quiet
+ namespace.quiet = getattr(namespace, "quiet", 0) + 1
+
+
def pytest_addoption(parser: Parser) -> None:
- group = parser.getgroup("terminal reporting", "reporting", after="general")
- group._addoption(
- "-v",
- "--verbose",
- action="count",
- default=0,
- dest="verbose",
- help="increase verbosity.",
+ group = parser.getgroup("terminal reporting", "reporting", after="general")
+ group._addoption(
+ "-v",
+ "--verbose",
+ action="count",
+ default=0,
+ dest="verbose",
+ help="increase verbosity.",
)
- group._addoption(
+ group._addoption(
"--no-header",
action="store_true",
default=False,
@@ -134,113 +134,113 @@ def pytest_addoption(parser: Parser) -> None:
help="disable summary",
)
group._addoption(
- "-q",
- "--quiet",
- action=MoreQuietAction,
- default=0,
- dest="verbose",
- help="decrease verbosity.",
+ "-q",
+ "--quiet",
+ action=MoreQuietAction,
+ default=0,
+ dest="verbose",
+ help="decrease verbosity.",
)
- group._addoption(
+ group._addoption(
"--verbosity",
dest="verbose",
type=int,
default=0,
help="set verbosity. Default is 0.",
- )
- group._addoption(
- "-r",
- action="store",
- dest="reportchars",
+ )
+ group._addoption(
+ "-r",
+ action="store",
+ dest="reportchars",
default=_REPORTCHARS_DEFAULT,
- metavar="chars",
+ metavar="chars",
help="show extra test summary info as specified by chars: (f)ailed, "
"(E)rror, (s)kipped, (x)failed, (X)passed, "
"(p)assed, (P)assed with output, (a)ll except passed (p/P), or (A)ll. "
"(w)arnings are enabled by default (see --disable-warnings), "
"'N' can be used to reset the list. (default: 'fE').",
- )
- group._addoption(
- "--disable-warnings",
- "--disable-pytest-warnings",
- default=False,
- dest="disable_warnings",
- action="store_true",
- help="disable warnings summary",
- )
- group._addoption(
- "-l",
- "--showlocals",
- action="store_true",
- dest="showlocals",
- default=False,
- help="show locals in tracebacks (disabled by default).",
- )
- group._addoption(
- "--tb",
- metavar="style",
- action="store",
- dest="tbstyle",
- default="auto",
- choices=["auto", "long", "short", "no", "line", "native"],
- help="traceback print mode (auto/long/short/line/native/no).",
- )
- group._addoption(
- "--show-capture",
- action="store",
- dest="showcapture",
- choices=["no", "stdout", "stderr", "log", "all"],
- default="all",
- help="Controls how captured stdout/stderr/log is shown on failed tests. "
- "Default is 'all'.",
- )
- group._addoption(
- "--fulltrace",
- "--full-trace",
- action="store_true",
- default=False,
- help="don't cut any tracebacks (default is to cut).",
- )
- group._addoption(
- "--color",
- metavar="color",
- action="store",
- dest="color",
- default="auto",
- choices=["yes", "no", "auto"],
- help="color terminal output (yes/no/auto).",
- )
+ )
+ group._addoption(
+ "--disable-warnings",
+ "--disable-pytest-warnings",
+ default=False,
+ dest="disable_warnings",
+ action="store_true",
+ help="disable warnings summary",
+ )
+ group._addoption(
+ "-l",
+ "--showlocals",
+ action="store_true",
+ dest="showlocals",
+ default=False,
+ help="show locals in tracebacks (disabled by default).",
+ )
+ group._addoption(
+ "--tb",
+ metavar="style",
+ action="store",
+ dest="tbstyle",
+ default="auto",
+ choices=["auto", "long", "short", "no", "line", "native"],
+ help="traceback print mode (auto/long/short/line/native/no).",
+ )
+ group._addoption(
+ "--show-capture",
+ action="store",
+ dest="showcapture",
+ choices=["no", "stdout", "stderr", "log", "all"],
+ default="all",
+ help="Controls how captured stdout/stderr/log is shown on failed tests. "
+ "Default is 'all'.",
+ )
+ group._addoption(
+ "--fulltrace",
+ "--full-trace",
+ action="store_true",
+ default=False,
+ help="don't cut any tracebacks (default is to cut).",
+ )
+ group._addoption(
+ "--color",
+ metavar="color",
+ action="store",
+ dest="color",
+ default="auto",
+ choices=["yes", "no", "auto"],
+ help="color terminal output (yes/no/auto).",
+ )
group._addoption(
"--code-highlight",
default="yes",
choices=["yes", "no"],
help="Whether code should be highlighted (only if --color is also enabled)",
)
-
- parser.addini(
- "console_output_style",
+
+ parser.addini(
+ "console_output_style",
help='console output: "classic", or with additional progress information ("progress" (percentage) | "count").',
- default="progress",
- )
-
-
+ default="progress",
+ )
+
+
def pytest_configure(config: Config) -> None:
- reporter = TerminalReporter(config, sys.stdout)
- config.pluginmanager.register(reporter, "terminalreporter")
- if config.option.debug or config.option.traceconfig:
-
- def mywriter(tags, args):
- msg = " ".join(map(str, args))
- reporter.write_line("[traceconfig] " + msg)
-
- config.trace.root.setprocessor("pytest:config", mywriter)
-
-
+ reporter = TerminalReporter(config, sys.stdout)
+ config.pluginmanager.register(reporter, "terminalreporter")
+ if config.option.debug or config.option.traceconfig:
+
+ def mywriter(tags, args):
+ msg = " ".join(map(str, args))
+ reporter.write_line("[traceconfig] " + msg)
+
+ config.trace.root.setprocessor("pytest:config", mywriter)
+
+
def getreportopt(config: Config) -> str:
reportchars: str = config.option.reportchars
old_aliases = {"F", "S"}
- reportopts = ""
+ reportopts = ""
for char in reportchars:
if char in old_aliases:
char = char.lower()
@@ -258,97 +258,97 @@ def getreportopt(config: Config) -> str:
elif config.option.disable_warnings and "w" in reportopts:
reportopts = reportopts.replace("w", "")
- return reportopts
-
-
+ return reportopts
+
+
@hookimpl(trylast=True) # after _pytest.runner
def pytest_report_teststatus(report: BaseReport) -> Tuple[str, str, str]:
letter = "F"
- if report.passed:
- letter = "."
- elif report.skipped:
- letter = "s"
-
+ if report.passed:
+ letter = "."
+ elif report.skipped:
+ letter = "s"
+
outcome: str = report.outcome
if report.when in ("collect", "setup", "teardown") and outcome == "failed":
outcome = "error"
letter = "E"
-
+
return outcome, letter, outcome.upper()
-@attr.s
+@attr.s
class WarningReport:
"""Simple structure to hold warnings information captured by ``pytest_warning_recorded``.
-
+
:ivar str message:
User friendly message about the warning.
:ivar str|None nodeid:
nodeid that generated the warning (see ``get_location``).
- :ivar tuple|py.path.local fslocation:
+ :ivar tuple|py.path.local fslocation:
File system location of the source of the warning (see ``get_location``).
- """
-
+ """
+
message = attr.ib(type=str)
nodeid = attr.ib(type=Optional[str], default=None)
fslocation = attr.ib(
type=Optional[Union[Tuple[str, int], py.path.local]], default=None
)
count_towards_summary = True
-
+
def get_location(self, config: Config) -> Optional[str]:
"""Return the more user-friendly information about the location of a warning, or None."""
- if self.nodeid:
- return self.nodeid
- if self.fslocation:
- if isinstance(self.fslocation, tuple) and len(self.fslocation) >= 2:
- filename, linenum = self.fslocation[:2]
+ if self.nodeid:
+ return self.nodeid
+ if self.fslocation:
+ if isinstance(self.fslocation, tuple) and len(self.fslocation) >= 2:
+ filename, linenum = self.fslocation[:2]
relpath = bestrelpath(
config.invocation_params.dir, absolutepath(filename)
)
return f"{relpath}:{linenum}"
- else:
- return str(self.fslocation)
- return None
-
-
+ else:
+ return str(self.fslocation)
+ return None
+
+
@final
class TerminalReporter:
def __init__(self, config: Config, file: Optional[TextIO] = None) -> None:
- import _pytest.config
-
- self.config = config
- self._numcollected = 0
+ import _pytest.config
+
+ self.config = config
+ self._numcollected = 0
self._session: Optional[Session] = None
self._showfspath: Optional[bool] = None
-
+
self.stats: Dict[str, List[Any]] = {}
self._main_color: Optional[str] = None
self._known_types: Optional[List[str]] = None
self.startdir = config.invocation_dir
self.startpath = config.invocation_params.dir
- if file is None:
- file = sys.stdout
- self._tw = _pytest.config.create_terminal_writer(config, file)
- self._screen_width = self._tw.fullwidth
+ if file is None:
+ file = sys.stdout
+ self._tw = _pytest.config.create_terminal_writer(config, file)
+ self._screen_width = self._tw.fullwidth
self.currentfspath: Union[None, Path, str, int] = None
- self.reportchars = getreportopt(config)
- self.hasmarkup = self._tw.hasmarkup
- self.isatty = file.isatty()
+ self.reportchars = getreportopt(config)
+ self.hasmarkup = self._tw.hasmarkup
+ self.isatty = file.isatty()
self._progress_nodeids_reported: Set[str] = set()
- self._show_progress_info = self._determine_show_progress_info()
+ self._show_progress_info = self._determine_show_progress_info()
self._collect_report_last_write: Optional[float] = None
self._already_displayed_warnings: Optional[int] = None
self._keyboardinterrupt_memo: Optional[ExceptionRepr] = None
-
+
def _determine_show_progress_info(self) -> "Literal['progress', 'count', False]":
"""Return whether we should display progress information based on the current config."""
- # do not show progress if we are not capturing output (#3038)
+ # do not show progress if we are not capturing output (#3038)
if self.config.getoption("capture", "no") == "no":
- return False
- # do not show progress if we are showing fixture setup/teardown
+ return False
+ # do not show progress if we are showing fixture setup/teardown
if self.config.getoption("setupshow", False):
- return False
+ return False
cfg: str = self.config.getini("console_output_style")
if cfg == "progress":
return "progress"
@@ -356,7 +356,7 @@ class TerminalReporter:
return "count"
else:
return False
-
+
@property
def verbosity(self) -> int:
verbosity: int = self.config.option.verbose
@@ -389,64 +389,64 @@ class TerminalReporter:
return self.verbosity > 0
def hasopt(self, char: str) -> bool:
- char = {"xfailed": "x", "skipped": "s"}.get(char, char)
- return char in self.reportchars
-
+ char = {"xfailed": "x", "skipped": "s"}.get(char, char)
+ return char in self.reportchars
+
def write_fspath_result(self, nodeid: str, res, **markup: bool) -> None:
fspath = self.config.rootpath / nodeid.split("::")[0]
if self.currentfspath is None or fspath != self.currentfspath:
- if self.currentfspath is not None and self._show_progress_info:
- self._write_progress_information_filling_space()
- self.currentfspath = fspath
+ if self.currentfspath is not None and self._show_progress_info:
+ self._write_progress_information_filling_space()
+ self.currentfspath = fspath
relfspath = bestrelpath(self.startpath, fspath)
- self._tw.line()
+ self._tw.line()
self._tw.write(relfspath + " ")
self._tw.write(res, flush=True, **markup)
-
+
def write_ensure_prefix(self, prefix: str, extra: str = "", **kwargs) -> None:
- if self.currentfspath != prefix:
- self._tw.line()
- self.currentfspath = prefix
- self._tw.write(prefix)
- if extra:
- self._tw.write(extra, **kwargs)
- self.currentfspath = -2
-
+ if self.currentfspath != prefix:
+ self._tw.line()
+ self.currentfspath = prefix
+ self._tw.write(prefix)
+ if extra:
+ self._tw.write(extra, **kwargs)
+ self.currentfspath = -2
+
def ensure_newline(self) -> None:
- if self.currentfspath:
- self._tw.line()
- self.currentfspath = None
-
+ if self.currentfspath:
+ self._tw.line()
+ self.currentfspath = None
+
def write(self, content: str, *, flush: bool = False, **markup: bool) -> None:
self._tw.write(content, flush=flush, **markup)
-
+
def flush(self) -> None:
self._tw.flush()
def write_line(self, line: Union[str, bytes], **markup: bool) -> None:
if not isinstance(line, str):
line = str(line, errors="replace")
- self.ensure_newline()
- self._tw.line(line, **markup)
-
+ self.ensure_newline()
+ self._tw.line(line, **markup)
+
def rewrite(self, line: str, **markup: bool) -> None:
"""Rewinds the terminal cursor to the beginning and writes the given line.
-
+
:param erase:
If True, will also add spaces until the full terminal width to ensure
- previous lines are properly erased.
-
- The rest of the keyword arguments are markup instructions.
- """
- erase = markup.pop("erase", False)
- if erase:
- fill_count = self._tw.fullwidth - len(line) - 1
- fill = " " * fill_count
- else:
- fill = ""
- line = str(line)
- self._tw.write("\r" + line + fill, **markup)
-
+ previous lines are properly erased.
+
+ The rest of the keyword arguments are markup instructions.
+ """
+ erase = markup.pop("erase", False)
+ if erase:
+ fill_count = self._tw.fullwidth - len(line) - 1
+ fill = " " * fill_count
+ else:
+ fill = ""
+ line = str(line)
+ self._tw.write("\r" + line + fill, **markup)
+
def write_sep(
self,
sep: str,
@@ -454,15 +454,15 @@ class TerminalReporter:
fullwidth: Optional[int] = None,
**markup: bool,
) -> None:
- self.ensure_newline()
+ self.ensure_newline()
self._tw.sep(sep, title, fullwidth, **markup)
-
+
def section(self, title: str, sep: str = "=", **kw: bool) -> None:
- self._tw.sep(sep, title, **kw)
-
+ self._tw.sep(sep, title, **kw)
+
def line(self, msg: str, **kw: bool) -> None:
- self._tw.line(msg, **kw)
-
+ self._tw.line(msg, **kw)
+
def _add_stats(self, category: str, items: Sequence[Any]) -> None:
set_main_color = category not in self.stats
self.stats.setdefault(category, []).extend(items)
@@ -471,81 +471,81 @@ class TerminalReporter:
def pytest_internalerror(self, excrepr: ExceptionRepr) -> bool:
for line in str(excrepr).split("\n"):
- self.write_line("INTERNALERROR> " + line)
+ self.write_line("INTERNALERROR> " + line)
return True
-
+
def pytest_warning_recorded(
self, warning_message: warnings.WarningMessage, nodeid: str,
) -> None:
- from _pytest.warnings import warning_record_to_str
-
- fslocation = warning_message.filename, warning_message.lineno
- message = warning_record_to_str(warning_message)
-
- warning_report = WarningReport(
- fslocation=fslocation, message=message, nodeid=nodeid
- )
+ from _pytest.warnings import warning_record_to_str
+
+ fslocation = warning_message.filename, warning_message.lineno
+ message = warning_record_to_str(warning_message)
+
+ warning_report = WarningReport(
+ fslocation=fslocation, message=message, nodeid=nodeid
+ )
self._add_stats("warnings", [warning_report])
-
+
def pytest_plugin_registered(self, plugin: _PluggyPlugin) -> None:
- if self.config.option.traceconfig:
+ if self.config.option.traceconfig:
msg = f"PLUGIN registered: {plugin}"
# XXX This event may happen during setup/teardown time
- # which unfortunately captures our output here
+ # which unfortunately captures our output here
# which garbles our output if we use self.write_line.
- self.write_line(msg)
-
+ self.write_line(msg)
+
def pytest_deselected(self, items: Sequence[Item]) -> None:
self._add_stats("deselected", items)
-
+
def pytest_runtest_logstart(
self, nodeid: str, location: Tuple[str, Optional[int], str]
) -> None:
# Ensure that the path is printed before the
# 1st test of a module starts running.
- if self.showlongtestinfo:
- line = self._locationline(nodeid, *location)
- self.write_ensure_prefix(line, "")
+ if self.showlongtestinfo:
+ line = self._locationline(nodeid, *location)
+ self.write_ensure_prefix(line, "")
self.flush()
- elif self.showfspath:
+ elif self.showfspath:
self.write_fspath_result(nodeid, "")
self.flush()
-
+
def pytest_runtest_logreport(self, report: TestReport) -> None:
self._tests_ran = True
- rep = report
+ rep = report
res: Tuple[
str, str, Union[str, Tuple[str, Mapping[str, bool]]]
] = self.config.hook.pytest_report_teststatus(report=rep, config=self.config)
- category, letter, word = res
+ category, letter, word = res
if not isinstance(word, tuple):
markup = None
else:
- word, markup = word
+ word, markup = word
self._add_stats(category, [rep])
- if not letter and not word:
+ if not letter and not word:
# Probably passed setup/teardown.
- return
- running_xdist = hasattr(rep, "node")
- if markup is None:
+ return
+ running_xdist = hasattr(rep, "node")
+ if markup is None:
was_xfail = hasattr(report, "wasxfail")
if rep.passed and not was_xfail:
- markup = {"green": True}
+ markup = {"green": True}
elif rep.passed and was_xfail:
markup = {"yellow": True}
- elif rep.failed:
- markup = {"red": True}
- elif rep.skipped:
- markup = {"yellow": True}
- else:
- markup = {}
- if self.verbosity <= 0:
+ elif rep.failed:
+ markup = {"red": True}
+ elif rep.skipped:
+ markup = {"yellow": True}
+ else:
+ markup = {}
+ if self.verbosity <= 0:
self._tw.write(letter, **markup)
- else:
- self._progress_nodeids_reported.add(rep.nodeid)
- line = self._locationline(rep.nodeid, *rep.location)
- if not running_xdist:
- self.write_ensure_prefix(line, word, **markup)
+ else:
+ self._progress_nodeids_reported.add(rep.nodeid)
+ line = self._locationline(rep.nodeid, *rep.location)
+ if not running_xdist:
+ self.write_ensure_prefix(line, word, **markup)
if rep.skipped or hasattr(report, "wasxfail"):
available_width = (
(self._tw.fullwidth - self._tw.width_of_current_line)
@@ -556,22 +556,22 @@ class TerminalReporter:
reason_ = _format_trimmed(" ({})", reason, available_width)
if reason and reason_ is not None:
self._tw.write(reason_)
- if self._show_progress_info:
- self._write_progress_information_filling_space()
- else:
- self.ensure_newline()
+ if self._show_progress_info:
+ self._write_progress_information_filling_space()
+ else:
+ self.ensure_newline()
self._tw.write("[%s]" % rep.node.gateway.id)
- if self._show_progress_info:
- self._tw.write(
- self._get_progress_information_message() + " ", cyan=True
- )
- else:
- self._tw.write(" ")
- self._tw.write(word, **markup)
- self._tw.write(" " + line)
- self.currentfspath = -2
+ if self._show_progress_info:
+ self._tw.write(
+ self._get_progress_information_message() + " ", cyan=True
+ )
+ else:
+ self._tw.write(" ")
+ self._tw.write(word, **markup)
+ self._tw.write(" " + line)
+ self.currentfspath = -2
self.flush()
-
+
@property
def _is_last_item(self) -> bool:
assert self._session is not None
@@ -585,114 +585,114 @@ class TerminalReporter:
progress_length = len(" [{}/{}]".format(str(num_tests), str(num_tests)))
else:
progress_length = len(" [100%]")
-
- self._progress_nodeids_reported.add(nodeid)
+
+ self._progress_nodeids_reported.add(nodeid)
if self._is_last_item:
- self._write_progress_information_filling_space()
- else:
+ self._write_progress_information_filling_space()
+ else:
main_color, _ = self._get_main_color()
- w = self._width_of_current_line
- past_edge = w + progress_length + 1 >= self._screen_width
- if past_edge:
- msg = self._get_progress_information_message()
+ w = self._width_of_current_line
+ past_edge = w + progress_length + 1 >= self._screen_width
+ if past_edge:
+ msg = self._get_progress_information_message()
self._tw.write(msg + "\n", **{main_color: True})
-
+
def _get_progress_information_message(self) -> str:
assert self._session
- collected = self._session.testscollected
+ collected = self._session.testscollected
if self._show_progress_info == "count":
- if collected:
- progress = self._progress_nodeids_reported
- counter_format = "{{:{}d}}".format(len(str(collected)))
+ if collected:
+ progress = self._progress_nodeids_reported
+ counter_format = "{{:{}d}}".format(len(str(collected)))
format_string = f" [{counter_format}/{{}}]"
- return format_string.format(len(progress), collected)
+ return format_string.format(len(progress), collected)
return f" [ {collected} / {collected} ]"
- else:
- if collected:
+ else:
+ if collected:
return " [{:3d}%]".format(
len(self._progress_nodeids_reported) * 100 // collected
)
- return " [100%]"
-
+ return " [100%]"
+
def _write_progress_information_filling_space(self) -> None:
color, _ = self._get_main_color()
- msg = self._get_progress_information_message()
- w = self._width_of_current_line
- fill = self._tw.fullwidth - w - 1
+ msg = self._get_progress_information_message()
+ w = self._width_of_current_line
+ fill = self._tw.fullwidth - w - 1
self.write(msg.rjust(fill), flush=True, **{color: True})
-
- @property
+
+ @property
def _width_of_current_line(self) -> int:
"""Return the width of the current line."""
return self._tw.width_of_current_line
-
+
def pytest_collection(self) -> None:
- if self.isatty:
- if self.config.option.verbose >= 0:
+ if self.isatty:
+ if self.config.option.verbose >= 0:
self.write("collecting ... ", flush=True, bold=True)
self._collect_report_last_write = timing.time()
- elif self.config.option.verbose >= 1:
+ elif self.config.option.verbose >= 1:
self.write("collecting ... ", flush=True, bold=True)
-
+
def pytest_collectreport(self, report: CollectReport) -> None:
- if report.failed:
+ if report.failed:
self._add_stats("error", [report])
- elif report.skipped:
+ elif report.skipped:
self._add_stats("skipped", [report])
items = [x for x in report.result if isinstance(x, Item)]
- self._numcollected += len(items)
- if self.isatty:
- self.report_collect()
-
+ self._numcollected += len(items)
+ if self.isatty:
+ self.report_collect()
+
def report_collect(self, final: bool = False) -> None:
- if self.config.option.verbose < 0:
- return
-
- if not final:
- # Only write "collecting" report every 0.5s.
+ if self.config.option.verbose < 0:
+ return
+
+ if not final:
+ # Only write "collecting" report every 0.5s.
t = timing.time()
- if (
- self._collect_report_last_write is not None
+ if (
+ self._collect_report_last_write is not None
and self._collect_report_last_write > t - REPORT_COLLECTING_RESOLUTION
- ):
- return
- self._collect_report_last_write = t
-
- errors = len(self.stats.get("error", []))
- skipped = len(self.stats.get("skipped", []))
- deselected = len(self.stats.get("deselected", []))
+ ):
+ return
+ self._collect_report_last_write = t
+
+ errors = len(self.stats.get("error", []))
+ skipped = len(self.stats.get("skipped", []))
+ deselected = len(self.stats.get("deselected", []))
selected = self._numcollected - errors - skipped - deselected
- if final:
- line = "collected "
- else:
- line = "collecting "
- line += (
- str(self._numcollected) + " item" + ("" if self._numcollected == 1 else "s")
- )
- if errors:
+ if final:
+ line = "collected "
+ else:
+ line = "collecting "
+ line += (
+ str(self._numcollected) + " item" + ("" if self._numcollected == 1 else "s")
+ )
+ if errors:
line += " / %d error%s" % (errors, "s" if errors != 1 else "")
- if deselected:
- line += " / %d deselected" % deselected
- if skipped:
- line += " / %d skipped" % skipped
+ if deselected:
+ line += " / %d deselected" % deselected
+ if skipped:
+ line += " / %d skipped" % skipped
if self._numcollected > selected > 0:
line += " / %d selected" % selected
- if self.isatty:
- self.rewrite(line, bold=True, erase=True)
- if final:
- self.write("\n")
- else:
- self.write_line(line)
-
+ if self.isatty:
+ self.rewrite(line, bold=True, erase=True)
+ if final:
+ self.write("\n")
+ else:
+ self.write_line(line)
+
@hookimpl(trylast=True)
def pytest_sessionstart(self, session: "Session") -> None:
- self._session = session
+ self._session = session
self._sessionstarttime = timing.time()
- if not self.showheader:
- return
- self.write_sep("=", "test session starts", bold=True)
- verinfo = platform.python_version()
+ if not self.showheader:
+ return
+ self.write_sep("=", "test session starts", bold=True)
+ verinfo = platform.python_version()
if not self.no_header:
msg = f"platform {sys.platform} -- Python {verinfo}"
pypy_version_info = getattr(sys, "pypy_version_info", None)
@@ -713,7 +713,7 @@ class TerminalReporter:
config=self.config, startdir=self.startdir
)
self._write_report_lines_from_hooks(lines)
-
+
def _write_report_lines_from_hooks(
self, lines: Sequence[Union[str, Sequence[str]]]
) -> None:
@@ -723,32 +723,32 @@ class TerminalReporter:
else:
for line in line_or_lines:
self.write_line(line)
-
+
def pytest_report_header(self, config: Config) -> List[str]:
line = "rootdir: %s" % config.rootpath
if config.inipath:
line += ", configfile: " + bestrelpath(config.rootpath, config.inipath)
-
+
testpaths: List[str] = config.getini("testpaths")
if config.invocation_params.dir == config.rootpath and config.args == testpaths:
line += ", testpaths: {}".format(", ".join(testpaths))
result = [line]
- plugininfo = config.pluginmanager.list_plugin_distinfo()
- if plugininfo:
+ plugininfo = config.pluginmanager.list_plugin_distinfo()
+ if plugininfo:
result.append("plugins: %s" % ", ".join(_plugin_nameversions(plugininfo)))
return result
-
+
def pytest_collection_finish(self, session: "Session") -> None:
self.report_collect(True)
-
- lines = self.config.hook.pytest_report_collectionfinish(
- config=self.config, startdir=self.startdir, items=session.items
- )
- self._write_report_lines_from_hooks(lines)
-
+
+ lines = self.config.hook.pytest_report_collectionfinish(
+ config=self.config, startdir=self.startdir, items=session.items
+ )
+ self._write_report_lines_from_hooks(lines)
+
if self.config.getoption("collectonly"):
if session.items:
if self.config.option.verbose > -1:
@@ -763,30 +763,30 @@ class TerminalReporter:
def _printcollecteditems(self, items: Sequence[Item]) -> None:
# To print out items and their parent collectors
- # we take care to leave out Instances aka ()
+ # we take care to leave out Instances aka ()
# because later versions are going to get rid of them anyway.
- if self.config.option.verbose < 0:
- if self.config.option.verbose < -1:
+ if self.config.option.verbose < 0:
+ if self.config.option.verbose < -1:
counts = Counter(item.nodeid.split("::", 1)[0] for item in items)
- for name, count in sorted(counts.items()):
- self._tw.line("%s: %d" % (name, count))
- else:
- for item in items:
- self._tw.line(item.nodeid)
- return
+ for name, count in sorted(counts.items()):
+ self._tw.line("%s: %d" % (name, count))
+ else:
+ for item in items:
+ self._tw.line(item.nodeid)
+ return
stack: List[Node] = []
- indent = ""
- for item in items:
- needed_collectors = item.listchain()[1:] # strip root node
- while stack:
- if stack == needed_collectors[: len(stack)]:
- break
- stack.pop()
- for col in needed_collectors[len(stack) :]:
- stack.append(col)
- if col.name == "()": # Skip Instances.
- continue
- indent = (len(stack) - 1) * " "
+ indent = ""
+ for item in items:
+ needed_collectors = item.listchain()[1:] # strip root node
+ while stack:
+ if stack == needed_collectors[: len(stack)]:
+ break
+ stack.pop()
+ for col in needed_collectors[len(stack) :]:
+ stack.append(col)
+ if col.name == "()": # Skip Instances.
+ continue
+ indent = (len(stack) - 1) * " "
self._tw.line(f"{indent}{col}")
if self.config.option.verbose >= 1:
obj = getattr(col, "obj", None)
@@ -794,134 +794,134 @@ class TerminalReporter:
if doc:
for line in doc.splitlines():
self._tw.line("{}{}".format(indent + " ", line))
-
+
@hookimpl(hookwrapper=True)
def pytest_sessionfinish(
self, session: "Session", exitstatus: Union[int, ExitCode]
):
- outcome = yield
- outcome.get_result()
- self._tw.line("")
- summary_exit_codes = (
+ outcome = yield
+ outcome.get_result()
+ self._tw.line("")
+ summary_exit_codes = (
ExitCode.OK,
ExitCode.TESTS_FAILED,
ExitCode.INTERRUPTED,
ExitCode.USAGE_ERROR,
ExitCode.NO_TESTS_COLLECTED,
- )
+ )
if exitstatus in summary_exit_codes and not self.no_summary:
- self.config.hook.pytest_terminal_summary(
+ self.config.hook.pytest_terminal_summary(
terminalreporter=self, exitstatus=exitstatus, config=self.config
- )
+ )
if session.shouldfail:
self.write_sep("!", str(session.shouldfail), red=True)
if exitstatus == ExitCode.INTERRUPTED:
- self._report_keyboardinterrupt()
+ self._report_keyboardinterrupt()
self._keyboardinterrupt_memo = None
elif session.shouldstop:
self.write_sep("!", str(session.shouldstop), red=True)
- self.summary_stats()
-
+ self.summary_stats()
+
@hookimpl(hookwrapper=True)
def pytest_terminal_summary(self) -> Generator[None, None, None]:
- self.summary_errors()
- self.summary_failures()
- self.summary_warnings()
+ self.summary_errors()
+ self.summary_failures()
+ self.summary_warnings()
self.summary_passes()
- yield
+ yield
self.short_test_summary()
- # Display any extra warnings from teardown here (if any).
- self.summary_warnings()
-
+ # Display any extra warnings from teardown here (if any).
+ self.summary_warnings()
+
def pytest_keyboard_interrupt(self, excinfo: ExceptionInfo[BaseException]) -> None:
- self._keyboardinterrupt_memo = excinfo.getrepr(funcargs=True)
-
+ self._keyboardinterrupt_memo = excinfo.getrepr(funcargs=True)
+
def pytest_unconfigure(self) -> None:
if self._keyboardinterrupt_memo is not None:
- self._report_keyboardinterrupt()
-
+ self._report_keyboardinterrupt()
+
def _report_keyboardinterrupt(self) -> None:
- excrepr = self._keyboardinterrupt_memo
+ excrepr = self._keyboardinterrupt_memo
assert excrepr is not None
assert excrepr.reprcrash is not None
- msg = excrepr.reprcrash.message
- self.write_sep("!", msg)
- if "KeyboardInterrupt" in msg:
- if self.config.option.fulltrace:
- excrepr.toterminal(self._tw)
- else:
- excrepr.reprcrash.toterminal(self._tw)
- self._tw.line(
+ msg = excrepr.reprcrash.message
+ self.write_sep("!", msg)
+ if "KeyboardInterrupt" in msg:
+ if self.config.option.fulltrace:
+ excrepr.toterminal(self._tw)
+ else:
+ excrepr.reprcrash.toterminal(self._tw)
+ self._tw.line(
"(to show a full traceback on KeyboardInterrupt use --full-trace)",
- yellow=True,
- )
-
- def _locationline(self, nodeid, fspath, lineno, domain):
- def mkrel(nodeid):
- line = self.config.cwd_relative_nodeid(nodeid)
- if domain and line.endswith(domain):
- line = line[: -len(domain)]
- values = domain.split("[")
- values[0] = values[0].replace(".", "::") # don't replace '.' in params
- line += "[".join(values)
- return line
-
+ yellow=True,
+ )
+
+ def _locationline(self, nodeid, fspath, lineno, domain):
+ def mkrel(nodeid):
+ line = self.config.cwd_relative_nodeid(nodeid)
+ if domain and line.endswith(domain):
+ line = line[: -len(domain)]
+ values = domain.split("[")
+ values[0] = values[0].replace(".", "::") # don't replace '.' in params
+ line += "[".join(values)
+ return line
+
# collect_fspath comes from testid which has a "/"-normalized path.
-
- if fspath:
- res = mkrel(nodeid)
- if self.verbosity >= 2 and nodeid.split("::")[0] != fspath.replace(
- "\\", nodes.SEP
- ):
+
+ if fspath:
+ res = mkrel(nodeid)
+ if self.verbosity >= 2 and nodeid.split("::")[0] != fspath.replace(
+ "\\", nodes.SEP
+ ):
res += " <- " + bestrelpath(self.startpath, fspath)
- else:
- res = "[location]"
- return res + " "
-
- def _getfailureheadline(self, rep):
+ else:
+ res = "[location]"
+ return res + " "
+
+ def _getfailureheadline(self, rep):
head_line = rep.head_line
if head_line:
return head_line
return "test session" # XXX?
-
- def _getcrashline(self, rep):
- try:
- return str(rep.longrepr.reprcrash)
- except AttributeError:
- try:
- return str(rep.longrepr)[:50]
- except AttributeError:
- return ""
-
- #
+
+ def _getcrashline(self, rep):
+ try:
+ return str(rep.longrepr.reprcrash)
+ except AttributeError:
+ try:
+ return str(rep.longrepr)[:50]
+ except AttributeError:
+ return ""
+
+ #
# Summaries for sessionfinish.
- #
+ #
def getreports(self, name: str):
- values = []
- for x in self.stats.get(name, []):
- if not hasattr(x, "_pdbshown"):
- values.append(x)
- return values
-
+ values = []
+ for x in self.stats.get(name, []):
+ if not hasattr(x, "_pdbshown"):
+ values.append(x)
+ return values
+
def summary_warnings(self) -> None:
- if self.hasopt("w"):
+ if self.hasopt("w"):
all_warnings: Optional[List[WarningReport]] = self.stats.get("warnings")
- if not all_warnings:
- return
-
+ if not all_warnings:
+ return
+
final = self._already_displayed_warnings is not None
- if final:
+ if final:
warning_reports = all_warnings[self._already_displayed_warnings :]
- else:
+ else:
warning_reports = all_warnings
self._already_displayed_warnings = len(warning_reports)
if not warning_reports:
- return
-
+ return
+
reports_grouped_by_message: Dict[str, List[WarningReport]] = {}
for wr in warning_reports:
reports_grouped_by_message.setdefault(wr.message, []).append(wr)
-
+
def collapsed_location_report(reports: List[WarningReport]) -> str:
locations = []
for w in reports:
@@ -940,8 +940,8 @@ class TerminalReporter:
for k, v in counts_by_filename.items()
)
- title = "warnings summary (final)" if final else "warnings summary"
- self.write_sep("=", title, yellow=True, bold=False)
+ title = "warnings summary (final)" if final else "warnings summary"
+ self.write_sep("=", title, yellow=True, bold=False)
for message, message_reports in reports_grouped_by_message.items():
maybe_location = collapsed_location_report(message_reports)
if maybe_location:
@@ -952,23 +952,23 @@ class TerminalReporter:
else:
message = message.rstrip()
self._tw.line(message)
- self._tw.line()
+ self._tw.line()
self._tw.line("-- Docs: https://docs.pytest.org/en/stable/warnings.html")
-
+
def summary_passes(self) -> None:
- if self.config.option.tbstyle != "no":
- if self.hasopt("P"):
+ if self.config.option.tbstyle != "no":
+ if self.hasopt("P"):
reports: List[TestReport] = self.getreports("passed")
- if not reports:
- return
- self.write_sep("=", "PASSES")
- for rep in reports:
- if rep.sections:
- msg = self._getfailureheadline(rep)
+ if not reports:
+ return
+ self.write_sep("=", "PASSES")
+ for rep in reports:
+ if rep.sections:
+ msg = self._getfailureheadline(rep)
self.write_sep("_", msg, green=True, bold=True)
- self._outrep_summary(rep)
+ self._outrep_summary(rep)
self._handle_teardown_sections(rep.nodeid)
-
+
def _get_teardown_reports(self, nodeid: str) -> List[TestReport]:
reports = self.getreports("")
return [
@@ -982,63 +982,63 @@ class TerminalReporter:
self.print_teardown_sections(report)
def print_teardown_sections(self, rep: TestReport) -> None:
- showcapture = self.config.option.showcapture
- if showcapture == "no":
- return
- for secname, content in rep.sections:
- if showcapture != "all" and showcapture not in secname:
- continue
- if "teardown" in secname:
- self._tw.sep("-", secname)
- if content[-1:] == "\n":
- content = content[:-1]
- self._tw.line(content)
-
+ showcapture = self.config.option.showcapture
+ if showcapture == "no":
+ return
+ for secname, content in rep.sections:
+ if showcapture != "all" and showcapture not in secname:
+ continue
+ if "teardown" in secname:
+ self._tw.sep("-", secname)
+ if content[-1:] == "\n":
+ content = content[:-1]
+ self._tw.line(content)
+
def summary_failures(self) -> None:
- if self.config.option.tbstyle != "no":
+ if self.config.option.tbstyle != "no":
reports: List[BaseReport] = self.getreports("failed")
- if not reports:
- return
- self.write_sep("=", "FAILURES")
+ if not reports:
+ return
+ self.write_sep("=", "FAILURES")
if self.config.option.tbstyle == "line":
for rep in reports:
- line = self._getcrashline(rep)
- self.write_line(line)
+ line = self._getcrashline(rep)
+ self.write_line(line)
else:
for rep in reports:
- msg = self._getfailureheadline(rep)
- self.write_sep("_", msg, red=True, bold=True)
- self._outrep_summary(rep)
+ msg = self._getfailureheadline(rep)
+ self.write_sep("_", msg, red=True, bold=True)
+ self._outrep_summary(rep)
self._handle_teardown_sections(rep.nodeid)
-
+
def summary_errors(self) -> None:
- if self.config.option.tbstyle != "no":
+ if self.config.option.tbstyle != "no":
reports: List[BaseReport] = self.getreports("error")
- if not reports:
- return
- self.write_sep("=", "ERRORS")
- for rep in self.stats["error"]:
- msg = self._getfailureheadline(rep)
+ if not reports:
+ return
+ self.write_sep("=", "ERRORS")
+ for rep in self.stats["error"]:
+ msg = self._getfailureheadline(rep)
if rep.when == "collect":
- msg = "ERROR collecting " + msg
+ msg = "ERROR collecting " + msg
else:
msg = f"ERROR at {rep.when} of {msg}"
- self.write_sep("_", msg, red=True, bold=True)
- self._outrep_summary(rep)
-
+ self.write_sep("_", msg, red=True, bold=True)
+ self._outrep_summary(rep)
+
def _outrep_summary(self, rep: BaseReport) -> None:
- rep.toterminal(self._tw)
- showcapture = self.config.option.showcapture
- if showcapture == "no":
- return
- for secname, content in rep.sections:
- if showcapture != "all" and showcapture not in secname:
- continue
- self._tw.sep("-", secname)
- if content[-1:] == "\n":
- content = content[:-1]
- self._tw.line(content)
-
+ rep.toterminal(self._tw)
+ showcapture = self.config.option.showcapture
+ if showcapture == "no":
+ return
+ for secname, content in rep.sections:
+ if showcapture != "all" and showcapture not in secname:
+ continue
+ self._tw.sep("-", secname)
+ if content[-1:] == "\n":
+ content = content[:-1]
+ self._tw.line(content)
+
def summary_stats(self) -> None:
if self.verbosity < -1:
return
@@ -1046,7 +1046,7 @@ class TerminalReporter:
session_duration = timing.time() - self._sessionstarttime
(parts, main_color) = self.build_summary_stats_line()
line_parts = []
-
+
display_sep = self.verbosity >= 0
if display_sep:
fullwidth = self._tw.fullwidth
@@ -1056,7 +1056,7 @@ class TerminalReporter:
fullwidth += len(with_markup) - len(text)
line_parts.append(with_markup)
msg = ", ".join(line_parts)
-
+
main_markup = {main_color: True}
duration = " in {}".format(format_session_duration(session_duration))
duration_with_markup = self._tw.markup(duration, **main_markup)
@@ -1079,7 +1079,7 @@ class TerminalReporter:
def short_test_summary(self) -> None:
if not self.reportchars:
return
-
+
def show_simple(stat, lines: List[str]) -> None:
failed = self.stats.get(stat, [])
if not failed:
@@ -1295,7 +1295,7 @@ def _get_line_with_reprcrash_message(
line = f"{verbose_word} {pos}"
line_width = wcswidth(line)
- try:
+ try:
# Type ignored intentionally -- possible AttributeError expected.
msg = rep.longrepr.reprcrash.message # type: ignore[union-attr]
except AttributeError:
@@ -1305,9 +1305,9 @@ def _get_line_with_reprcrash_message(
msg = _format_trimmed(" - {}", msg, available_width)
if msg is not None:
line += msg
-
+
return line
-
+
def _folded_skips(
startpath: Path, skipped: Sequence[CollectReport],
@@ -1346,33 +1346,33 @@ _color_for_type = {
"passed": "green",
}
_color_for_type_default = "yellow"
-
-
+
+
def pluralize(count: int, noun: str) -> Tuple[int, str]:
# No need to pluralize words such as `failed` or `passed`.
if noun not in ["error", "warnings", "test"]:
return count, noun
-
+
# The `warnings` key is plural. To avoid API breakage, we keep it that way but
# set it to singular here so we can determine plurality in the same way as we do
# for `error`.
noun = noun.replace("warnings", "warning")
-
+
return count, noun + "s" if count != 1 else noun
-
+
def _plugin_nameversions(plugininfo) -> List[str]:
values: List[str] = []
- for plugin, dist in plugininfo:
+ for plugin, dist in plugininfo:
# Gets us name and version!
- name = "{dist.project_name}-{dist.version}".format(dist=dist)
+ name = "{dist.project_name}-{dist.version}".format(dist=dist)
# Questionable convenience, but it keeps things short.
- if name.startswith("pytest-"):
- name = name[7:]
+ if name.startswith("pytest-"):
+ name = name[7:]
# We decided to print python package names they can have more than one plugin.
- if name not in values:
- values.append(name)
- return values
+ if name not in values:
+ values.append(name)
+ return values
def format_session_duration(seconds: float) -> str: