diff options
author | shadchin <shadchin@yandex-team.ru> | 2022-02-10 16:44:39 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:44:39 +0300 |
commit | e9656aae26e0358d5378e5b63dcac5c8dbe0e4d0 (patch) | |
tree | 64175d5cadab313b3e7039ebaa06c5bc3295e274 /contrib/python/pytest/py3/_pytest/terminal.py | |
parent | 2598ef1d0aee359b4b6d5fdd1758916d5907d04f (diff) | |
download | ydb-e9656aae26e0358d5378e5b63dcac5c8dbe0e4d0.tar.gz |
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/terminal.py')
-rw-r--r-- | contrib/python/pytest/py3/_pytest/terminal.py | 1792 |
1 files changed, 896 insertions, 896 deletions
diff --git a/contrib/python/pytest/py3/_pytest/terminal.py b/contrib/python/pytest/py3/_pytest/terminal.py index 075fed3f03..fbfb09aecf 100644 --- a/contrib/python/pytest/py3/_pytest/terminal.py +++ b/contrib/python/pytest/py3/_pytest/terminal.py @@ -1,93 +1,93 @@ -"""Terminal reporting of the full testing process. +"""Terminal reporting of the full testing process. This is a good source for looking at the various reporting hooks. """ import argparse -import datetime -import inspect +import datetime +import inspect import platform import sys -import warnings -from collections import Counter -from functools import partial -from pathlib import Path -from typing import Any -from typing import Callable -from typing import cast -from typing import Dict -from typing import Generator -from typing import List -from typing import Mapping -from typing import Optional -from typing import Sequence -from typing import Set -from typing import TextIO -from typing import Tuple -from typing import TYPE_CHECKING -from typing import Union +import warnings +from collections import Counter +from functools import partial +from pathlib import Path +from typing import Any +from typing import Callable +from typing import cast +from typing import Dict +from typing import Generator +from typing import List +from typing import Mapping +from typing import Optional +from typing import Sequence +from typing import Set +from typing import TextIO +from typing import Tuple +from typing import TYPE_CHECKING +from typing import Union import attr import pluggy import py -import _pytest._version +import _pytest._version from _pytest import nodes -from _pytest import timing -from _pytest._code import ExceptionInfo -from _pytest._code.code import ExceptionRepr -from _pytest._io.wcwidth import wcswidth -from _pytest.compat import final -from _pytest.config import _PluggyPlugin -from _pytest.config import Config -from _pytest.config import ExitCode -from _pytest.config import hookimpl -from _pytest.config.argparsing import Parser -from _pytest.nodes import Item -from _pytest.nodes import Node -from _pytest.pathlib import absolutepath -from _pytest.pathlib import bestrelpath -from _pytest.reports import BaseReport -from _pytest.reports import CollectReport -from _pytest.reports import TestReport - -if TYPE_CHECKING: - from typing_extensions import Literal - - from _pytest.main import Session - - -REPORT_COLLECTING_RESOLUTION = 0.5 - -KNOWN_TYPES = ( - "failed", - "passed", - "skipped", - "deselected", - "xfailed", - "xpassed", - "warnings", - "error", -) - -_REPORTCHARS_DEFAULT = "fE" - - +from _pytest import timing +from _pytest._code import ExceptionInfo +from _pytest._code.code import ExceptionRepr +from _pytest._io.wcwidth import wcswidth +from _pytest.compat import final +from _pytest.config import _PluggyPlugin +from _pytest.config import Config +from _pytest.config import ExitCode +from _pytest.config import hookimpl +from _pytest.config.argparsing import Parser +from _pytest.nodes import Item +from _pytest.nodes import Node +from _pytest.pathlib import absolutepath +from _pytest.pathlib import bestrelpath +from _pytest.reports import BaseReport +from _pytest.reports import CollectReport +from _pytest.reports import TestReport + +if TYPE_CHECKING: + from typing_extensions import Literal + + from _pytest.main import Session + + +REPORT_COLLECTING_RESOLUTION = 0.5 + +KNOWN_TYPES = ( + "failed", + "passed", + "skipped", + "deselected", + "xfailed", + "xpassed", + "warnings", + "error", +) + +_REPORTCHARS_DEFAULT = "fE" + + class MoreQuietAction(argparse.Action): - """A modified copy of the argparse count action which counts down and updates - the legacy quiet attribute at the same time. + """A modified copy of the argparse count action which counts down and updates + the legacy quiet attribute at the same time. - Used to unify verbosity handling. + Used to unify verbosity handling. """ - def __init__( - self, - option_strings: Sequence[str], - dest: str, - default: object = None, - required: bool = False, - help: Optional[str] = None, - ) -> None: - super().__init__( + def __init__( + self, + option_strings: Sequence[str], + dest: str, + default: object = None, + required: bool = False, + help: Optional[str] = None, + ) -> None: + super().__init__( option_strings=option_strings, dest=dest, nargs=0, @@ -96,20 +96,20 @@ class MoreQuietAction(argparse.Action): help=help, ) - def __call__( - self, - parser: argparse.ArgumentParser, - namespace: argparse.Namespace, - values: Union[str, Sequence[object], None], - option_string: Optional[str] = None, - ) -> None: + def __call__( + self, + parser: argparse.ArgumentParser, + namespace: argparse.Namespace, + values: Union[str, Sequence[object], None], + option_string: Optional[str] = None, + ) -> None: new_count = getattr(namespace, self.dest, 0) - 1 setattr(namespace, self.dest, new_count) # todo Deprecate config.quiet namespace.quiet = getattr(namespace, "quiet", 0) + 1 -def pytest_addoption(parser: Parser) -> None: +def pytest_addoption(parser: Parser) -> None: group = parser.getgroup("terminal reporting", "reporting", after="general") group._addoption( "-v", @@ -118,47 +118,47 @@ def pytest_addoption(parser: Parser) -> None: default=0, dest="verbose", help="increase verbosity.", - ) + ) + group._addoption( + "--no-header", + action="store_true", + default=False, + dest="no_header", + help="disable header", + ) + group._addoption( + "--no-summary", + action="store_true", + default=False, + dest="no_summary", + help="disable summary", + ) group._addoption( - "--no-header", - action="store_true", - default=False, - dest="no_header", - help="disable header", - ) - group._addoption( - "--no-summary", - action="store_true", - default=False, - dest="no_summary", - help="disable summary", - ) - group._addoption( "-q", "--quiet", action=MoreQuietAction, default=0, dest="verbose", help="decrease verbosity.", - ) + ) group._addoption( - "--verbosity", - dest="verbose", - type=int, - default=0, - help="set verbosity. Default is 0.", + "--verbosity", + dest="verbose", + type=int, + default=0, + help="set verbosity. Default is 0.", ) group._addoption( "-r", action="store", dest="reportchars", - default=_REPORTCHARS_DEFAULT, + default=_REPORTCHARS_DEFAULT, metavar="chars", - help="show extra test summary info as specified by chars: (f)ailed, " - "(E)rror, (s)kipped, (x)failed, (X)passed, " - "(p)assed, (P)assed with output, (a)ll except passed (p/P), or (A)ll. " - "(w)arnings are enabled by default (see --disable-warnings), " - "'N' can be used to reset the list. (default: 'fE').", + help="show extra test summary info as specified by chars: (f)ailed, " + "(E)rror, (s)kipped, (x)failed, (X)passed, " + "(p)assed, (P)assed with output, (a)ll except passed (p/P), or (A)ll. " + "(w)arnings are enabled by default (see --disable-warnings), " + "'N' can be used to reset the list. (default: 'fE').", ) group._addoption( "--disable-warnings", @@ -210,21 +210,21 @@ def pytest_addoption(parser: Parser) -> None: choices=["yes", "no", "auto"], help="color terminal output (yes/no/auto).", ) - group._addoption( - "--code-highlight", - default="yes", - choices=["yes", "no"], - help="Whether code should be highlighted (only if --color is also enabled)", - ) + group._addoption( + "--code-highlight", + default="yes", + choices=["yes", "no"], + help="Whether code should be highlighted (only if --color is also enabled)", + ) parser.addini( "console_output_style", - help='console output: "classic", or with additional progress information ("progress" (percentage) | "count").', + help='console output: "classic", or with additional progress information ("progress" (percentage) | "count").', default="progress", ) -def pytest_configure(config: Config) -> None: +def pytest_configure(config: Config) -> None: reporter = TerminalReporter(config, sys.stdout) config.pluginmanager.register(reporter, "terminalreporter") if config.option.debug or config.option.traceconfig: @@ -236,174 +236,174 @@ def pytest_configure(config: Config) -> None: config.trace.root.setprocessor("pytest:config", mywriter) -def getreportopt(config: Config) -> str: - reportchars: str = config.option.reportchars - - old_aliases = {"F", "S"} +def getreportopt(config: Config) -> str: + reportchars: str = config.option.reportchars + + old_aliases = {"F", "S"} reportopts = "" - for char in reportchars: - if char in old_aliases: - char = char.lower() - if char == "a": - reportopts = "sxXEf" - elif char == "A": - reportopts = "PpsxXEf" - elif char == "N": - reportopts = "" - elif char not in reportopts: - reportopts += char - - if not config.option.disable_warnings and "w" not in reportopts: - reportopts = "w" + reportopts - elif config.option.disable_warnings and "w" in reportopts: - reportopts = reportopts.replace("w", "") - + for char in reportchars: + if char in old_aliases: + char = char.lower() + if char == "a": + reportopts = "sxXEf" + elif char == "A": + reportopts = "PpsxXEf" + elif char == "N": + reportopts = "" + elif char not in reportopts: + reportopts += char + + if not config.option.disable_warnings and "w" not in reportopts: + reportopts = "w" + reportopts + elif config.option.disable_warnings and "w" in reportopts: + reportopts = reportopts.replace("w", "") + return reportopts -@hookimpl(trylast=True) # after _pytest.runner -def pytest_report_teststatus(report: BaseReport) -> Tuple[str, str, str]: - letter = "F" +@hookimpl(trylast=True) # after _pytest.runner +def pytest_report_teststatus(report: BaseReport) -> Tuple[str, str, str]: + letter = "F" if report.passed: letter = "." elif report.skipped: letter = "s" - outcome: str = report.outcome - if report.when in ("collect", "setup", "teardown") and outcome == "failed": - outcome = "error" - letter = "E" + outcome: str = report.outcome + if report.when in ("collect", "setup", "teardown") and outcome == "failed": + outcome = "error" + letter = "E" + + return outcome, letter, outcome.upper() + - return outcome, letter, outcome.upper() - - @attr.s -class WarningReport: - """Simple structure to hold warnings information captured by ``pytest_warning_recorded``. +class WarningReport: + """Simple structure to hold warnings information captured by ``pytest_warning_recorded``. - :ivar str message: - User friendly message about the warning. - :ivar str|None nodeid: - nodeid that generated the warning (see ``get_location``). + :ivar str message: + User friendly message about the warning. + :ivar str|None nodeid: + nodeid that generated the warning (see ``get_location``). :ivar tuple|py.path.local fslocation: - File system location of the source of the warning (see ``get_location``). + File system location of the source of the warning (see ``get_location``). """ - message = attr.ib(type=str) - nodeid = attr.ib(type=Optional[str], default=None) - fslocation = attr.ib( - type=Optional[Union[Tuple[str, int], py.path.local]], default=None - ) - count_towards_summary = True + message = attr.ib(type=str) + nodeid = attr.ib(type=Optional[str], default=None) + fslocation = attr.ib( + type=Optional[Union[Tuple[str, int], py.path.local]], default=None + ) + count_towards_summary = True - def get_location(self, config: Config) -> Optional[str]: - """Return the more user-friendly information about the location of a warning, or None.""" + def get_location(self, config: Config) -> Optional[str]: + """Return the more user-friendly information about the location of a warning, or None.""" if self.nodeid: return self.nodeid if self.fslocation: if isinstance(self.fslocation, tuple) and len(self.fslocation) >= 2: filename, linenum = self.fslocation[:2] - relpath = bestrelpath( - config.invocation_params.dir, absolutepath(filename) - ) - return f"{relpath}:{linenum}" + relpath = bestrelpath( + config.invocation_params.dir, absolutepath(filename) + ) + return f"{relpath}:{linenum}" else: return str(self.fslocation) return None -@final -class TerminalReporter: - def __init__(self, config: Config, file: Optional[TextIO] = None) -> None: +@final +class TerminalReporter: + def __init__(self, config: Config, file: Optional[TextIO] = None) -> None: import _pytest.config self.config = config self._numcollected = 0 - self._session: Optional[Session] = None - self._showfspath: Optional[bool] = None - - self.stats: Dict[str, List[Any]] = {} - self._main_color: Optional[str] = None - self._known_types: Optional[List[str]] = None - self.startdir = config.invocation_dir - self.startpath = config.invocation_params.dir + self._session: Optional[Session] = None + self._showfspath: Optional[bool] = None + + self.stats: Dict[str, List[Any]] = {} + self._main_color: Optional[str] = None + self._known_types: Optional[List[str]] = None + self.startdir = config.invocation_dir + self.startpath = config.invocation_params.dir if file is None: file = sys.stdout self._tw = _pytest.config.create_terminal_writer(config, file) self._screen_width = self._tw.fullwidth - self.currentfspath: Union[None, Path, str, int] = None + self.currentfspath: Union[None, Path, str, int] = None self.reportchars = getreportopt(config) self.hasmarkup = self._tw.hasmarkup self.isatty = file.isatty() - self._progress_nodeids_reported: Set[str] = set() + self._progress_nodeids_reported: Set[str] = set() self._show_progress_info = self._determine_show_progress_info() - self._collect_report_last_write: Optional[float] = None - self._already_displayed_warnings: Optional[int] = None - self._keyboardinterrupt_memo: Optional[ExceptionRepr] = None + self._collect_report_last_write: Optional[float] = None + self._already_displayed_warnings: Optional[int] = None + self._keyboardinterrupt_memo: Optional[ExceptionRepr] = None - def _determine_show_progress_info(self) -> "Literal['progress', 'count', False]": - """Return whether we should display progress information based on the current config.""" + def _determine_show_progress_info(self) -> "Literal['progress', 'count', False]": + """Return whether we should display progress information based on the current config.""" # do not show progress if we are not capturing output (#3038) - if self.config.getoption("capture", "no") == "no": + if self.config.getoption("capture", "no") == "no": return False # do not show progress if we are showing fixture setup/teardown - if self.config.getoption("setupshow", False): + if self.config.getoption("setupshow", False): + return False + cfg: str = self.config.getini("console_output_style") + if cfg == "progress": + return "progress" + elif cfg == "count": + return "count" + else: return False - cfg: str = self.config.getini("console_output_style") - if cfg == "progress": - return "progress" - elif cfg == "count": - return "count" - else: - return False - - @property - def verbosity(self) -> int: - verbosity: int = self.config.option.verbose - return verbosity - - @property - def showheader(self) -> bool: - return self.verbosity >= 0 - - @property - def no_header(self) -> bool: - return bool(self.config.option.no_header) - - @property - def no_summary(self) -> bool: - return bool(self.config.option.no_summary) - - @property - def showfspath(self) -> bool: - if self._showfspath is None: - return self.verbosity >= 0 - return self._showfspath - - @showfspath.setter - def showfspath(self, value: Optional[bool]) -> None: - self._showfspath = value - - @property - def showlongtestinfo(self) -> bool: - return self.verbosity > 0 - - def hasopt(self, char: str) -> bool: + + @property + def verbosity(self) -> int: + verbosity: int = self.config.option.verbose + return verbosity + + @property + def showheader(self) -> bool: + return self.verbosity >= 0 + + @property + def no_header(self) -> bool: + return bool(self.config.option.no_header) + + @property + def no_summary(self) -> bool: + return bool(self.config.option.no_summary) + + @property + def showfspath(self) -> bool: + if self._showfspath is None: + return self.verbosity >= 0 + return self._showfspath + + @showfspath.setter + def showfspath(self, value: Optional[bool]) -> None: + self._showfspath = value + + @property + def showlongtestinfo(self) -> bool: + return self.verbosity > 0 + + def hasopt(self, char: str) -> bool: char = {"xfailed": "x", "skipped": "s"}.get(char, char) return char in self.reportchars - def write_fspath_result(self, nodeid: str, res, **markup: bool) -> None: - fspath = self.config.rootpath / nodeid.split("::")[0] - if self.currentfspath is None or fspath != self.currentfspath: + def write_fspath_result(self, nodeid: str, res, **markup: bool) -> None: + fspath = self.config.rootpath / nodeid.split("::")[0] + if self.currentfspath is None or fspath != self.currentfspath: if self.currentfspath is not None and self._show_progress_info: self._write_progress_information_filling_space() self.currentfspath = fspath - relfspath = bestrelpath(self.startpath, fspath) + relfspath = bestrelpath(self.startpath, fspath) self._tw.line() - self._tw.write(relfspath + " ") - self._tw.write(res, flush=True, **markup) + self._tw.write(relfspath + " ") + self._tw.write(res, flush=True, **markup) - def write_ensure_prefix(self, prefix: str, extra: str = "", **kwargs) -> None: + def write_ensure_prefix(self, prefix: str, extra: str = "", **kwargs) -> None: if self.currentfspath != prefix: self._tw.line() self.currentfspath = prefix @@ -412,28 +412,28 @@ class TerminalReporter: self._tw.write(extra, **kwargs) self.currentfspath = -2 - def ensure_newline(self) -> None: + def ensure_newline(self) -> None: if self.currentfspath: self._tw.line() self.currentfspath = None - def write(self, content: str, *, flush: bool = False, **markup: bool) -> None: - self._tw.write(content, flush=flush, **markup) + def write(self, content: str, *, flush: bool = False, **markup: bool) -> None: + self._tw.write(content, flush=flush, **markup) + + def flush(self) -> None: + self._tw.flush() - def flush(self) -> None: - self._tw.flush() - - def write_line(self, line: Union[str, bytes], **markup: bool) -> None: - if not isinstance(line, str): - line = str(line, errors="replace") + def write_line(self, line: Union[str, bytes], **markup: bool) -> None: + if not isinstance(line, str): + line = str(line, errors="replace") self.ensure_newline() self._tw.line(line, **markup) - def rewrite(self, line: str, **markup: bool) -> None: - """Rewinds the terminal cursor to the beginning and writes the given line. + def rewrite(self, line: str, **markup: bool) -> None: + """Rewinds the terminal cursor to the beginning and writes the given line. - :param erase: - If True, will also add spaces until the full terminal width to ensure + :param erase: + If True, will also add spaces until the full terminal width to ensure previous lines are properly erased. The rest of the keyword arguments are markup instructions. @@ -447,36 +447,36 @@ class TerminalReporter: line = str(line) self._tw.write("\r" + line + fill, **markup) - def write_sep( - self, - sep: str, - title: Optional[str] = None, - fullwidth: Optional[int] = None, - **markup: bool, - ) -> None: + def write_sep( + self, + sep: str, + title: Optional[str] = None, + fullwidth: Optional[int] = None, + **markup: bool, + ) -> None: self.ensure_newline() - self._tw.sep(sep, title, fullwidth, **markup) + self._tw.sep(sep, title, fullwidth, **markup) - def section(self, title: str, sep: str = "=", **kw: bool) -> None: + def section(self, title: str, sep: str = "=", **kw: bool) -> None: self._tw.sep(sep, title, **kw) - def line(self, msg: str, **kw: bool) -> None: + def line(self, msg: str, **kw: bool) -> None: self._tw.line(msg, **kw) - def _add_stats(self, category: str, items: Sequence[Any]) -> None: - set_main_color = category not in self.stats - self.stats.setdefault(category, []).extend(items) - if set_main_color: - self._set_main_color() - - def pytest_internalerror(self, excrepr: ExceptionRepr) -> bool: - for line in str(excrepr).split("\n"): + def _add_stats(self, category: str, items: Sequence[Any]) -> None: + set_main_color = category not in self.stats + self.stats.setdefault(category, []).extend(items) + if set_main_color: + self._set_main_color() + + def pytest_internalerror(self, excrepr: ExceptionRepr) -> bool: + for line in str(excrepr).split("\n"): self.write_line("INTERNALERROR> " + line) - return True + return True - def pytest_warning_recorded( - self, warning_message: warnings.WarningMessage, nodeid: str, - ) -> None: + def pytest_warning_recorded( + self, warning_message: warnings.WarningMessage, nodeid: str, + ) -> None: from _pytest.warnings import warning_record_to_str fslocation = warning_message.filename, warning_message.lineno @@ -485,54 +485,54 @@ class TerminalReporter: warning_report = WarningReport( fslocation=fslocation, message=message, nodeid=nodeid ) - self._add_stats("warnings", [warning_report]) + self._add_stats("warnings", [warning_report]) - def pytest_plugin_registered(self, plugin: _PluggyPlugin) -> None: + def pytest_plugin_registered(self, plugin: _PluggyPlugin) -> None: if self.config.option.traceconfig: - msg = f"PLUGIN registered: {plugin}" - # XXX This event may happen during setup/teardown time + msg = f"PLUGIN registered: {plugin}" + # XXX This event may happen during setup/teardown time # which unfortunately captures our output here - # which garbles our output if we use self.write_line. + # which garbles our output if we use self.write_line. self.write_line(msg) - def pytest_deselected(self, items: Sequence[Item]) -> None: - self._add_stats("deselected", items) + def pytest_deselected(self, items: Sequence[Item]) -> None: + self._add_stats("deselected", items) - def pytest_runtest_logstart( - self, nodeid: str, location: Tuple[str, Optional[int], str] - ) -> None: - # Ensure that the path is printed before the - # 1st test of a module starts running. + def pytest_runtest_logstart( + self, nodeid: str, location: Tuple[str, Optional[int], str] + ) -> None: + # Ensure that the path is printed before the + # 1st test of a module starts running. if self.showlongtestinfo: line = self._locationline(nodeid, *location) self.write_ensure_prefix(line, "") - self.flush() + self.flush() elif self.showfspath: - self.write_fspath_result(nodeid, "") - self.flush() + self.write_fspath_result(nodeid, "") + self.flush() - def pytest_runtest_logreport(self, report: TestReport) -> None: - self._tests_ran = True + def pytest_runtest_logreport(self, report: TestReport) -> None: + self._tests_ran = True rep = report - res: Tuple[ - str, str, Union[str, Tuple[str, Mapping[str, bool]]] - ] = self.config.hook.pytest_report_teststatus(report=rep, config=self.config) + res: Tuple[ + str, str, Union[str, Tuple[str, Mapping[str, bool]]] + ] = self.config.hook.pytest_report_teststatus(report=rep, config=self.config) category, letter, word = res - if not isinstance(word, tuple): - markup = None - else: + if not isinstance(word, tuple): + markup = None + else: word, markup = word - self._add_stats(category, [rep]) + self._add_stats(category, [rep]) if not letter and not word: - # Probably passed setup/teardown. + # Probably passed setup/teardown. return running_xdist = hasattr(rep, "node") if markup is None: - was_xfail = hasattr(report, "wasxfail") - if rep.passed and not was_xfail: + was_xfail = hasattr(report, "wasxfail") + if rep.passed and not was_xfail: markup = {"green": True} - elif rep.passed and was_xfail: - markup = {"yellow": True} + elif rep.passed and was_xfail: + markup = {"yellow": True} elif rep.failed: markup = {"red": True} elif rep.skipped: @@ -540,27 +540,27 @@ class TerminalReporter: else: markup = {} if self.verbosity <= 0: - self._tw.write(letter, **markup) + self._tw.write(letter, **markup) else: self._progress_nodeids_reported.add(rep.nodeid) line = self._locationline(rep.nodeid, *rep.location) if not running_xdist: self.write_ensure_prefix(line, word, **markup) - if rep.skipped or hasattr(report, "wasxfail"): - available_width = ( - (self._tw.fullwidth - self._tw.width_of_current_line) - - len(" [100%]") - - 1 - ) - reason = _get_raw_skip_reason(rep) - reason_ = _format_trimmed(" ({})", reason, available_width) - if reason and reason_ is not None: - self._tw.write(reason_) + if rep.skipped or hasattr(report, "wasxfail"): + available_width = ( + (self._tw.fullwidth - self._tw.width_of_current_line) + - len(" [100%]") + - 1 + ) + reason = _get_raw_skip_reason(rep) + reason_ = _format_trimmed(" ({})", reason, available_width) + if reason and reason_ is not None: + self._tw.write(reason_) if self._show_progress_info: self._write_progress_information_filling_space() else: self.ensure_newline() - self._tw.write("[%s]" % rep.node.gateway.id) + self._tw.write("[%s]" % rep.node.gateway.id) if self._show_progress_info: self._tw.write( self._get_progress_information_message() + " ", cyan=True @@ -570,91 +570,91 @@ class TerminalReporter: self._tw.write(word, **markup) self._tw.write(" " + line) self.currentfspath = -2 - self.flush() - - @property - def _is_last_item(self) -> bool: - assert self._session is not None - return len(self._progress_nodeids_reported) == self._session.testscollected - - def pytest_runtest_logfinish(self, nodeid: str) -> None: - assert self._session - if self.verbosity <= 0 and self._show_progress_info: - if self._show_progress_info == "count": - num_tests = self._session.testscollected - progress_length = len(" [{}/{}]".format(str(num_tests), str(num_tests))) - else: - progress_length = len(" [100%]") + self.flush() + + @property + def _is_last_item(self) -> bool: + assert self._session is not None + return len(self._progress_nodeids_reported) == self._session.testscollected + + def pytest_runtest_logfinish(self, nodeid: str) -> None: + assert self._session + if self.verbosity <= 0 and self._show_progress_info: + if self._show_progress_info == "count": + num_tests = self._session.testscollected + progress_length = len(" [{}/{}]".format(str(num_tests), str(num_tests))) + else: + progress_length = len(" [100%]") self._progress_nodeids_reported.add(nodeid) - - if self._is_last_item: + + if self._is_last_item: self._write_progress_information_filling_space() else: - main_color, _ = self._get_main_color() + main_color, _ = self._get_main_color() w = self._width_of_current_line past_edge = w + progress_length + 1 >= self._screen_width if past_edge: msg = self._get_progress_information_message() - self._tw.write(msg + "\n", **{main_color: True}) + self._tw.write(msg + "\n", **{main_color: True}) - def _get_progress_information_message(self) -> str: - assert self._session + def _get_progress_information_message(self) -> str: + assert self._session collected = self._session.testscollected - if self._show_progress_info == "count": + if self._show_progress_info == "count": if collected: progress = self._progress_nodeids_reported counter_format = "{{:{}d}}".format(len(str(collected))) - format_string = f" [{counter_format}/{{}}]" + format_string = f" [{counter_format}/{{}}]" return format_string.format(len(progress), collected) - return f" [ {collected} / {collected} ]" + return f" [ {collected} / {collected} ]" else: if collected: - return " [{:3d}%]".format( - len(self._progress_nodeids_reported) * 100 // collected - ) + return " [{:3d}%]".format( + len(self._progress_nodeids_reported) * 100 // collected + ) return " [100%]" - def _write_progress_information_filling_space(self) -> None: - color, _ = self._get_main_color() + def _write_progress_information_filling_space(self) -> None: + color, _ = self._get_main_color() msg = self._get_progress_information_message() w = self._width_of_current_line fill = self._tw.fullwidth - w - 1 - self.write(msg.rjust(fill), flush=True, **{color: True}) + self.write(msg.rjust(fill), flush=True, **{color: True}) @property - def _width_of_current_line(self) -> int: - """Return the width of the current line.""" - return self._tw.width_of_current_line + def _width_of_current_line(self) -> int: + """Return the width of the current line.""" + return self._tw.width_of_current_line - def pytest_collection(self) -> None: + def pytest_collection(self) -> None: if self.isatty: if self.config.option.verbose >= 0: - self.write("collecting ... ", flush=True, bold=True) - self._collect_report_last_write = timing.time() + self.write("collecting ... ", flush=True, bold=True) + self._collect_report_last_write = timing.time() elif self.config.option.verbose >= 1: - self.write("collecting ... ", flush=True, bold=True) + self.write("collecting ... ", flush=True, bold=True) - def pytest_collectreport(self, report: CollectReport) -> None: + def pytest_collectreport(self, report: CollectReport) -> None: if report.failed: - self._add_stats("error", [report]) + self._add_stats("error", [report]) elif report.skipped: - self._add_stats("skipped", [report]) - items = [x for x in report.result if isinstance(x, Item)] + self._add_stats("skipped", [report]) + items = [x for x in report.result if isinstance(x, Item)] self._numcollected += len(items) if self.isatty: self.report_collect() - def report_collect(self, final: bool = False) -> None: + def report_collect(self, final: bool = False) -> None: if self.config.option.verbose < 0: return if not final: # Only write "collecting" report every 0.5s. - t = timing.time() + t = timing.time() if ( self._collect_report_last_write is not None - and self._collect_report_last_write > t - REPORT_COLLECTING_RESOLUTION + and self._collect_report_last_write > t - REPORT_COLLECTING_RESOLUTION ): return self._collect_report_last_write = t @@ -662,7 +662,7 @@ class TerminalReporter: errors = len(self.stats.get("error", [])) skipped = len(self.stats.get("skipped", [])) deselected = len(self.stats.get("deselected", [])) - selected = self._numcollected - errors - skipped - deselected + selected = self._numcollected - errors - skipped - deselected if final: line = "collected " else: @@ -671,13 +671,13 @@ class TerminalReporter: str(self._numcollected) + " item" + ("" if self._numcollected == 1 else "s") ) if errors: - line += " / %d error%s" % (errors, "s" if errors != 1 else "") + line += " / %d error%s" % (errors, "s" if errors != 1 else "") if deselected: line += " / %d deselected" % deselected if skipped: line += " / %d skipped" % skipped - if self._numcollected > selected > 0: - line += " / %d selected" % selected + if self._numcollected > selected > 0: + line += " / %d selected" % selected if self.isatty: self.rewrite(line, bold=True, erase=True) if final: @@ -685,96 +685,96 @@ class TerminalReporter: else: self.write_line(line) - @hookimpl(trylast=True) - def pytest_sessionstart(self, session: "Session") -> None: + @hookimpl(trylast=True) + def pytest_sessionstart(self, session: "Session") -> None: self._session = session - self._sessionstarttime = timing.time() + self._sessionstarttime = timing.time() if not self.showheader: return self.write_sep("=", "test session starts", bold=True) verinfo = platform.python_version() - if not self.no_header: - msg = f"platform {sys.platform} -- Python {verinfo}" - pypy_version_info = getattr(sys, "pypy_version_info", None) - if pypy_version_info: - verinfo = ".".join(map(str, pypy_version_info[:3])) - msg += "[pypy-{}-{}]".format(verinfo, pypy_version_info[3]) - msg += ", pytest-{}, py-{}, pluggy-{}".format( - _pytest._version.version, py.__version__, pluggy.__version__ - ) - if ( - self.verbosity > 0 - or self.config.option.debug - or getattr(self.config.option, "pastebin", None) - ): - msg += " -- " + str(sys.executable) - self.write_line(msg) - lines = self.config.hook.pytest_report_header( - config=self.config, startdir=self.startdir - ) - self._write_report_lines_from_hooks(lines) - - def _write_report_lines_from_hooks( - self, lines: Sequence[Union[str, Sequence[str]]] - ) -> None: - for line_or_lines in reversed(lines): - if isinstance(line_or_lines, str): - self.write_line(line_or_lines) - else: - for line in line_or_lines: - self.write_line(line) - - def pytest_report_header(self, config: Config) -> List[str]: - line = "rootdir: %s" % config.rootpath - - if config.inipath: - line += ", configfile: " + bestrelpath(config.rootpath, config.inipath) - - testpaths: List[str] = config.getini("testpaths") - if config.invocation_params.dir == config.rootpath and config.args == testpaths: - line += ", testpaths: {}".format(", ".join(testpaths)) - - result = [line] - + if not self.no_header: + msg = f"platform {sys.platform} -- Python {verinfo}" + pypy_version_info = getattr(sys, "pypy_version_info", None) + if pypy_version_info: + verinfo = ".".join(map(str, pypy_version_info[:3])) + msg += "[pypy-{}-{}]".format(verinfo, pypy_version_info[3]) + msg += ", pytest-{}, py-{}, pluggy-{}".format( + _pytest._version.version, py.__version__, pluggy.__version__ + ) + if ( + self.verbosity > 0 + or self.config.option.debug + or getattr(self.config.option, "pastebin", None) + ): + msg += " -- " + str(sys.executable) + self.write_line(msg) + lines = self.config.hook.pytest_report_header( + config=self.config, startdir=self.startdir + ) + self._write_report_lines_from_hooks(lines) + + def _write_report_lines_from_hooks( + self, lines: Sequence[Union[str, Sequence[str]]] + ) -> None: + for line_or_lines in reversed(lines): + if isinstance(line_or_lines, str): + self.write_line(line_or_lines) + else: + for line in line_or_lines: + self.write_line(line) + + def pytest_report_header(self, config: Config) -> List[str]: + line = "rootdir: %s" % config.rootpath + + if config.inipath: + line += ", configfile: " + bestrelpath(config.rootpath, config.inipath) + + testpaths: List[str] = config.getini("testpaths") + if config.invocation_params.dir == config.rootpath and config.args == testpaths: + line += ", testpaths: {}".format(", ".join(testpaths)) + + result = [line] + plugininfo = config.pluginmanager.list_plugin_distinfo() if plugininfo: - result.append("plugins: %s" % ", ".join(_plugin_nameversions(plugininfo))) - return result + result.append("plugins: %s" % ", ".join(_plugin_nameversions(plugininfo))) + return result - def pytest_collection_finish(self, session: "Session") -> None: - self.report_collect(True) + def pytest_collection_finish(self, session: "Session") -> None: + self.report_collect(True) lines = self.config.hook.pytest_report_collectionfinish( config=self.config, startdir=self.startdir, items=session.items ) self._write_report_lines_from_hooks(lines) - if self.config.getoption("collectonly"): - if session.items: - if self.config.option.verbose > -1: - self._tw.line("") - self._printcollecteditems(session.items) - - failed = self.stats.get("failed") - if failed: - self._tw.sep("!", "collection failures") - for rep in failed: - rep.toterminal(self._tw) - - def _printcollecteditems(self, items: Sequence[Item]) -> None: - # To print out items and their parent collectors + if self.config.getoption("collectonly"): + if session.items: + if self.config.option.verbose > -1: + self._tw.line("") + self._printcollecteditems(session.items) + + failed = self.stats.get("failed") + if failed: + self._tw.sep("!", "collection failures") + for rep in failed: + rep.toterminal(self._tw) + + def _printcollecteditems(self, items: Sequence[Item]) -> None: + # To print out items and their parent collectors # we take care to leave out Instances aka () - # because later versions are going to get rid of them anyway. + # because later versions are going to get rid of them anyway. if self.config.option.verbose < 0: if self.config.option.verbose < -1: - counts = Counter(item.nodeid.split("::", 1)[0] for item in items) + counts = Counter(item.nodeid.split("::", 1)[0] for item in items) for name, count in sorted(counts.items()): self._tw.line("%s: %d" % (name, count)) else: for item in items: self._tw.line(item.nodeid) return - stack: List[Node] = [] + stack: List[Node] = [] indent = "" for item in items: needed_collectors = item.listchain()[1:] # strip root node @@ -787,63 +787,63 @@ class TerminalReporter: if col.name == "()": # Skip Instances. continue indent = (len(stack) - 1) * " " - self._tw.line(f"{indent}{col}") - if self.config.option.verbose >= 1: - obj = getattr(col, "obj", None) - doc = inspect.getdoc(obj) if obj else None - if doc: - for line in doc.splitlines(): - self._tw.line("{}{}".format(indent + " ", line)) - - @hookimpl(hookwrapper=True) - def pytest_sessionfinish( - self, session: "Session", exitstatus: Union[int, ExitCode] - ): + self._tw.line(f"{indent}{col}") + if self.config.option.verbose >= 1: + obj = getattr(col, "obj", None) + doc = inspect.getdoc(obj) if obj else None + if doc: + for line in doc.splitlines(): + self._tw.line("{}{}".format(indent + " ", line)) + + @hookimpl(hookwrapper=True) + def pytest_sessionfinish( + self, session: "Session", exitstatus: Union[int, ExitCode] + ): outcome = yield outcome.get_result() self._tw.line("") summary_exit_codes = ( - ExitCode.OK, - ExitCode.TESTS_FAILED, - ExitCode.INTERRUPTED, - ExitCode.USAGE_ERROR, - ExitCode.NO_TESTS_COLLECTED, + ExitCode.OK, + ExitCode.TESTS_FAILED, + ExitCode.INTERRUPTED, + ExitCode.USAGE_ERROR, + ExitCode.NO_TESTS_COLLECTED, ) - if exitstatus in summary_exit_codes and not self.no_summary: + if exitstatus in summary_exit_codes and not self.no_summary: self.config.hook.pytest_terminal_summary( - terminalreporter=self, exitstatus=exitstatus, config=self.config + terminalreporter=self, exitstatus=exitstatus, config=self.config ) - if session.shouldfail: - self.write_sep("!", str(session.shouldfail), red=True) - if exitstatus == ExitCode.INTERRUPTED: + if session.shouldfail: + self.write_sep("!", str(session.shouldfail), red=True) + if exitstatus == ExitCode.INTERRUPTED: self._report_keyboardinterrupt() - self._keyboardinterrupt_memo = None - elif session.shouldstop: - self.write_sep("!", str(session.shouldstop), red=True) + self._keyboardinterrupt_memo = None + elif session.shouldstop: + self.write_sep("!", str(session.shouldstop), red=True) self.summary_stats() - @hookimpl(hookwrapper=True) - def pytest_terminal_summary(self) -> Generator[None, None, None]: + @hookimpl(hookwrapper=True) + def pytest_terminal_summary(self) -> Generator[None, None, None]: self.summary_errors() self.summary_failures() self.summary_warnings() - self.summary_passes() + self.summary_passes() yield - self.short_test_summary() + self.short_test_summary() # Display any extra warnings from teardown here (if any). self.summary_warnings() - def pytest_keyboard_interrupt(self, excinfo: ExceptionInfo[BaseException]) -> None: + def pytest_keyboard_interrupt(self, excinfo: ExceptionInfo[BaseException]) -> None: self._keyboardinterrupt_memo = excinfo.getrepr(funcargs=True) - def pytest_unconfigure(self) -> None: - if self._keyboardinterrupt_memo is not None: + def pytest_unconfigure(self) -> None: + if self._keyboardinterrupt_memo is not None: self._report_keyboardinterrupt() - def _report_keyboardinterrupt(self) -> None: + def _report_keyboardinterrupt(self) -> None: excrepr = self._keyboardinterrupt_memo - assert excrepr is not None - assert excrepr.reprcrash is not None + assert excrepr is not None + assert excrepr.reprcrash is not None msg = excrepr.reprcrash.message self.write_sep("!", msg) if "KeyboardInterrupt" in msg: @@ -852,7 +852,7 @@ class TerminalReporter: else: excrepr.reprcrash.toterminal(self._tw) self._tw.line( - "(to show a full traceback on KeyboardInterrupt use --full-trace)", + "(to show a full traceback on KeyboardInterrupt use --full-trace)", yellow=True, ) @@ -866,23 +866,23 @@ class TerminalReporter: line += "[".join(values) return line - # collect_fspath comes from testid which has a "/"-normalized path. + # collect_fspath comes from testid which has a "/"-normalized path. if fspath: res = mkrel(nodeid) if self.verbosity >= 2 and nodeid.split("::")[0] != fspath.replace( "\\", nodes.SEP ): - res += " <- " + bestrelpath(self.startpath, fspath) + res += " <- " + bestrelpath(self.startpath, fspath) else: res = "[location]" return res + " " def _getfailureheadline(self, rep): - head_line = rep.head_line - if head_line: - return head_line - return "test session" # XXX? + head_line = rep.head_line + if head_line: + return head_line + return "test session" # XXX? def _getcrashline(self, rep): try: @@ -894,94 +894,94 @@ class TerminalReporter: return "" # - # Summaries for sessionfinish. + # Summaries for sessionfinish. # - def getreports(self, name: str): + def getreports(self, name: str): values = [] for x in self.stats.get(name, []): if not hasattr(x, "_pdbshown"): values.append(x) return values - def summary_warnings(self) -> None: + def summary_warnings(self) -> None: if self.hasopt("w"): - all_warnings: Optional[List[WarningReport]] = self.stats.get("warnings") + all_warnings: Optional[List[WarningReport]] = self.stats.get("warnings") if not all_warnings: return - final = self._already_displayed_warnings is not None + final = self._already_displayed_warnings is not None if final: - warning_reports = all_warnings[self._already_displayed_warnings :] + warning_reports = all_warnings[self._already_displayed_warnings :] else: - warning_reports = all_warnings - self._already_displayed_warnings = len(warning_reports) - if not warning_reports: + warning_reports = all_warnings + self._already_displayed_warnings = len(warning_reports) + if not warning_reports: return - reports_grouped_by_message: Dict[str, List[WarningReport]] = {} - for wr in warning_reports: - reports_grouped_by_message.setdefault(wr.message, []).append(wr) - - def collapsed_location_report(reports: List[WarningReport]) -> str: - locations = [] - for w in reports: - location = w.get_location(self.config) - if location: - locations.append(location) - - if len(locations) < 10: - return "\n".join(map(str, locations)) - - counts_by_filename = Counter( - str(loc).split("::", 1)[0] for loc in locations - ) - return "\n".join( - "{}: {} warning{}".format(k, v, "s" if v > 1 else "") - for k, v in counts_by_filename.items() - ) - + reports_grouped_by_message: Dict[str, List[WarningReport]] = {} + for wr in warning_reports: + reports_grouped_by_message.setdefault(wr.message, []).append(wr) + + def collapsed_location_report(reports: List[WarningReport]) -> str: + locations = [] + for w in reports: + location = w.get_location(self.config) + if location: + locations.append(location) + + if len(locations) < 10: + return "\n".join(map(str, locations)) + + counts_by_filename = Counter( + str(loc).split("::", 1)[0] for loc in locations + ) + return "\n".join( + "{}: {} warning{}".format(k, v, "s" if v > 1 else "") + for k, v in counts_by_filename.items() + ) + title = "warnings summary (final)" if final else "warnings summary" self.write_sep("=", title, yellow=True, bold=False) - for message, message_reports in reports_grouped_by_message.items(): - maybe_location = collapsed_location_report(message_reports) - if maybe_location: - self._tw.line(maybe_location) - lines = message.splitlines() - indented = "\n".join(" " + x for x in lines) - message = indented.rstrip() - else: - message = message.rstrip() - self._tw.line(message) + for message, message_reports in reports_grouped_by_message.items(): + maybe_location = collapsed_location_report(message_reports) + if maybe_location: + self._tw.line(maybe_location) + lines = message.splitlines() + indented = "\n".join(" " + x for x in lines) + message = indented.rstrip() + else: + message = message.rstrip() + self._tw.line(message) self._tw.line() - self._tw.line("-- Docs: https://docs.pytest.org/en/stable/warnings.html") + self._tw.line("-- Docs: https://docs.pytest.org/en/stable/warnings.html") - def summary_passes(self) -> None: + def summary_passes(self) -> None: if self.config.option.tbstyle != "no": if self.hasopt("P"): - reports: List[TestReport] = self.getreports("passed") + reports: List[TestReport] = self.getreports("passed") if not reports: return self.write_sep("=", "PASSES") for rep in reports: if rep.sections: msg = self._getfailureheadline(rep) - self.write_sep("_", msg, green=True, bold=True) + self.write_sep("_", msg, green=True, bold=True) self._outrep_summary(rep) - self._handle_teardown_sections(rep.nodeid) - - def _get_teardown_reports(self, nodeid: str) -> List[TestReport]: - reports = self.getreports("") - return [ - report - for report in reports - if report.when == "teardown" and report.nodeid == nodeid - ] - - def _handle_teardown_sections(self, nodeid: str) -> None: - for report in self._get_teardown_reports(nodeid): - self.print_teardown_sections(report) - - def print_teardown_sections(self, rep: TestReport) -> None: + self._handle_teardown_sections(rep.nodeid) + + def _get_teardown_reports(self, nodeid: str) -> List[TestReport]: + reports = self.getreports("") + return [ + report + for report in reports + if report.when == "teardown" and report.nodeid == nodeid + ] + + def _handle_teardown_sections(self, nodeid: str) -> None: + for report in self._get_teardown_reports(nodeid): + self.print_teardown_sections(report) + + def print_teardown_sections(self, rep: TestReport) -> None: showcapture = self.config.option.showcapture if showcapture == "no": return @@ -994,39 +994,39 @@ class TerminalReporter: content = content[:-1] self._tw.line(content) - def summary_failures(self) -> None: + def summary_failures(self) -> None: if self.config.option.tbstyle != "no": - reports: List[BaseReport] = self.getreports("failed") + reports: List[BaseReport] = self.getreports("failed") if not reports: return self.write_sep("=", "FAILURES") - if self.config.option.tbstyle == "line": - for rep in reports: + if self.config.option.tbstyle == "line": + for rep in reports: line = self._getcrashline(rep) self.write_line(line) - else: - for rep in reports: + else: + for rep in reports: msg = self._getfailureheadline(rep) self.write_sep("_", msg, red=True, bold=True) self._outrep_summary(rep) - self._handle_teardown_sections(rep.nodeid) + self._handle_teardown_sections(rep.nodeid) - def summary_errors(self) -> None: + def summary_errors(self) -> None: if self.config.option.tbstyle != "no": - reports: List[BaseReport] = self.getreports("error") + reports: List[BaseReport] = self.getreports("error") if not reports: return self.write_sep("=", "ERRORS") for rep in self.stats["error"]: msg = self._getfailureheadline(rep) - if rep.when == "collect": + if rep.when == "collect": msg = "ERROR collecting " + msg - else: - msg = f"ERROR at {rep.when} of {msg}" + else: + msg = f"ERROR at {rep.when} of {msg}" self.write_sep("_", msg, red=True, bold=True) self._outrep_summary(rep) - def _outrep_summary(self, rep: BaseReport) -> None: + def _outrep_summary(self, rep: BaseReport) -> None: rep.toterminal(self._tw) showcapture = self.config.option.showcapture if showcapture == "no": @@ -1039,367 +1039,367 @@ class TerminalReporter: content = content[:-1] self._tw.line(content) - def summary_stats(self) -> None: - if self.verbosity < -1: - return - - session_duration = timing.time() - self._sessionstarttime - (parts, main_color) = self.build_summary_stats_line() - line_parts = [] - - display_sep = self.verbosity >= 0 - if display_sep: - fullwidth = self._tw.fullwidth - for text, markup in parts: - with_markup = self._tw.markup(text, **markup) - if display_sep: - fullwidth += len(with_markup) - len(text) - line_parts.append(with_markup) - msg = ", ".join(line_parts) - - main_markup = {main_color: True} - duration = " in {}".format(format_session_duration(session_duration)) - duration_with_markup = self._tw.markup(duration, **main_markup) - if display_sep: - fullwidth += len(duration_with_markup) - len(duration) - msg += duration_with_markup - - if display_sep: - markup_for_end_sep = self._tw.markup("", **main_markup) - if markup_for_end_sep.endswith("\x1b[0m"): - markup_for_end_sep = markup_for_end_sep[:-4] - fullwidth += len(markup_for_end_sep) - msg += markup_for_end_sep - - if display_sep: - self.write_sep("=", msg, fullwidth=fullwidth, **main_markup) - else: - self.write_line(msg, **main_markup) - - def short_test_summary(self) -> None: - if not self.reportchars: - return - - def show_simple(stat, lines: List[str]) -> None: - failed = self.stats.get(stat, []) - if not failed: - return - termwidth = self._tw.fullwidth - config = self.config - for rep in failed: - line = _get_line_with_reprcrash_message(config, rep, termwidth) - lines.append(line) - - def show_xfailed(lines: List[str]) -> None: - xfailed = self.stats.get("xfailed", []) - for rep in xfailed: - verbose_word = rep._get_verbose_word(self.config) - pos = _get_pos(self.config, rep) - lines.append(f"{verbose_word} {pos}") - reason = rep.wasxfail - if reason: - lines.append(" " + str(reason)) - - def show_xpassed(lines: List[str]) -> None: - xpassed = self.stats.get("xpassed", []) - for rep in xpassed: - verbose_word = rep._get_verbose_word(self.config) - pos = _get_pos(self.config, rep) - reason = rep.wasxfail - lines.append(f"{verbose_word} {pos} {reason}") - - def show_skipped(lines: List[str]) -> None: - skipped: List[CollectReport] = self.stats.get("skipped", []) - fskips = _folded_skips(self.startpath, skipped) if skipped else [] - if not fskips: - return - verbose_word = skipped[0]._get_verbose_word(self.config) - for num, fspath, lineno, reason in fskips: - if reason.startswith("Skipped: "): - reason = reason[9:] - if lineno is not None: - lines.append( - "%s [%d] %s:%d: %s" - % (verbose_word, num, fspath, lineno, reason) - ) - else: - lines.append("%s [%d] %s: %s" % (verbose_word, num, fspath, reason)) - - REPORTCHAR_ACTIONS: Mapping[str, Callable[[List[str]], None]] = { - "x": show_xfailed, - "X": show_xpassed, - "f": partial(show_simple, "failed"), - "s": show_skipped, - "p": partial(show_simple, "passed"), - "E": partial(show_simple, "error"), - } - - lines: List[str] = [] - for char in self.reportchars: - action = REPORTCHAR_ACTIONS.get(char) - if action: # skipping e.g. "P" (passed with output) here. - action(lines) - - if lines: - self.write_sep("=", "short test summary info") - for line in lines: - self.write_line(line) - - def _get_main_color(self) -> Tuple[str, List[str]]: - if self._main_color is None or self._known_types is None or self._is_last_item: - self._set_main_color() - assert self._main_color - assert self._known_types - return self._main_color, self._known_types - - def _determine_main_color(self, unknown_type_seen: bool) -> str: - stats = self.stats - if "failed" in stats or "error" in stats: - main_color = "red" - elif "warnings" in stats or "xpassed" in stats or unknown_type_seen: - main_color = "yellow" - elif "passed" in stats or not self._is_last_item: - main_color = "green" - else: - main_color = "yellow" - return main_color - - def _set_main_color(self) -> None: - unknown_types: List[str] = [] - for found_type in self.stats.keys(): - if found_type: # setup/teardown reports have an empty key, ignore them - if found_type not in KNOWN_TYPES and found_type not in unknown_types: - unknown_types.append(found_type) - self._known_types = list(KNOWN_TYPES) + unknown_types - self._main_color = self._determine_main_color(bool(unknown_types)) - - def build_summary_stats_line(self) -> Tuple[List[Tuple[str, Dict[str, bool]]], str]: - """ - Build the parts used in the last summary stats line. - - The summary stats line is the line shown at the end, "=== 12 passed, 2 errors in Xs===". - - This function builds a list of the "parts" that make up for the text in that line, in - the example above it would be: - - [ - ("12 passed", {"green": True}), - ("2 errors", {"red": True} - ] - - That last dict for each line is a "markup dictionary", used by TerminalWriter to - color output. - - The final color of the line is also determined by this function, and is the second - element of the returned tuple. - """ - if self.config.getoption("collectonly"): - return self._build_collect_only_summary_stats_line() - else: - return self._build_normal_summary_stats_line() - - def _get_reports_to_display(self, key: str) -> List[Any]: - """Get test/collection reports for the given status key, such as `passed` or `error`.""" - reports = self.stats.get(key, []) - return [x for x in reports if getattr(x, "count_towards_summary", True)] - - def _build_normal_summary_stats_line( - self, - ) -> Tuple[List[Tuple[str, Dict[str, bool]]], str]: - main_color, known_types = self._get_main_color() - parts = [] - - for key in known_types: - reports = self._get_reports_to_display(key) - if reports: - count = len(reports) - color = _color_for_type.get(key, _color_for_type_default) - markup = {color: True, "bold": color == main_color} - parts.append(("%d %s" % pluralize(count, key), markup)) - - if not parts: - parts = [("no tests ran", {_color_for_type_default: True})] - - return parts, main_color - - def _build_collect_only_summary_stats_line( - self, - ) -> Tuple[List[Tuple[str, Dict[str, bool]]], str]: - deselected = len(self._get_reports_to_display("deselected")) - errors = len(self._get_reports_to_display("error")) - - if self._numcollected == 0: - parts = [("no tests collected", {"yellow": True})] - main_color = "yellow" - - elif deselected == 0: - main_color = "green" - collected_output = "%d %s collected" % pluralize(self._numcollected, "test") - parts = [(collected_output, {main_color: True})] - else: - all_tests_were_deselected = self._numcollected == deselected - if all_tests_were_deselected: - main_color = "yellow" - collected_output = f"no tests collected ({deselected} deselected)" - else: - main_color = "green" - selected = self._numcollected - deselected - collected_output = f"{selected}/{self._numcollected} tests collected ({deselected} deselected)" - - parts = [(collected_output, {main_color: True})] - - if errors: - main_color = _color_for_type["error"] - parts += [("%d %s" % pluralize(errors, "error"), {main_color: True})] - - return parts, main_color - - -def _get_pos(config: Config, rep: BaseReport): - nodeid = config.cwd_relative_nodeid(rep.nodeid) - return nodeid - - -def _format_trimmed(format: str, msg: str, available_width: int) -> Optional[str]: - """Format msg into format, ellipsizing it if doesn't fit in available_width. - - Returns None if even the ellipsis can't fit. - """ - # Only use the first line. - i = msg.find("\n") - if i != -1: - msg = msg[:i] - - ellipsis = "..." - format_width = wcswidth(format.format("")) - if format_width + len(ellipsis) > available_width: - return None - - if format_width + wcswidth(msg) > available_width: - available_width -= len(ellipsis) - msg = msg[:available_width] - while format_width + wcswidth(msg) > available_width: - msg = msg[:-1] - msg += ellipsis - - return format.format(msg) - - -def _get_line_with_reprcrash_message( - config: Config, rep: BaseReport, termwidth: int -) -> str: - """Get summary line for a report, trying to add reprcrash message.""" - verbose_word = rep._get_verbose_word(config) - pos = _get_pos(config, rep) - - line = f"{verbose_word} {pos}" - line_width = wcswidth(line) - + def summary_stats(self) -> None: + if self.verbosity < -1: + return + + session_duration = timing.time() - self._sessionstarttime + (parts, main_color) = self.build_summary_stats_line() + line_parts = [] + + display_sep = self.verbosity >= 0 + if display_sep: + fullwidth = self._tw.fullwidth + for text, markup in parts: + with_markup = self._tw.markup(text, **markup) + if display_sep: + fullwidth += len(with_markup) - len(text) + line_parts.append(with_markup) + msg = ", ".join(line_parts) + + main_markup = {main_color: True} + duration = " in {}".format(format_session_duration(session_duration)) + duration_with_markup = self._tw.markup(duration, **main_markup) + if display_sep: + fullwidth += len(duration_with_markup) - len(duration) + msg += duration_with_markup + + if display_sep: + markup_for_end_sep = self._tw.markup("", **main_markup) + if markup_for_end_sep.endswith("\x1b[0m"): + markup_for_end_sep = markup_for_end_sep[:-4] + fullwidth += len(markup_for_end_sep) + msg += markup_for_end_sep + + if display_sep: + self.write_sep("=", msg, fullwidth=fullwidth, **main_markup) + else: + self.write_line(msg, **main_markup) + + def short_test_summary(self) -> None: + if not self.reportchars: + return + + def show_simple(stat, lines: List[str]) -> None: + failed = self.stats.get(stat, []) + if not failed: + return + termwidth = self._tw.fullwidth + config = self.config + for rep in failed: + line = _get_line_with_reprcrash_message(config, rep, termwidth) + lines.append(line) + + def show_xfailed(lines: List[str]) -> None: + xfailed = self.stats.get("xfailed", []) + for rep in xfailed: + verbose_word = rep._get_verbose_word(self.config) + pos = _get_pos(self.config, rep) + lines.append(f"{verbose_word} {pos}") + reason = rep.wasxfail + if reason: + lines.append(" " + str(reason)) + + def show_xpassed(lines: List[str]) -> None: + xpassed = self.stats.get("xpassed", []) + for rep in xpassed: + verbose_word = rep._get_verbose_word(self.config) + pos = _get_pos(self.config, rep) + reason = rep.wasxfail + lines.append(f"{verbose_word} {pos} {reason}") + + def show_skipped(lines: List[str]) -> None: + skipped: List[CollectReport] = self.stats.get("skipped", []) + fskips = _folded_skips(self.startpath, skipped) if skipped else [] + if not fskips: + return + verbose_word = skipped[0]._get_verbose_word(self.config) + for num, fspath, lineno, reason in fskips: + if reason.startswith("Skipped: "): + reason = reason[9:] + if lineno is not None: + lines.append( + "%s [%d] %s:%d: %s" + % (verbose_word, num, fspath, lineno, reason) + ) + else: + lines.append("%s [%d] %s: %s" % (verbose_word, num, fspath, reason)) + + REPORTCHAR_ACTIONS: Mapping[str, Callable[[List[str]], None]] = { + "x": show_xfailed, + "X": show_xpassed, + "f": partial(show_simple, "failed"), + "s": show_skipped, + "p": partial(show_simple, "passed"), + "E": partial(show_simple, "error"), + } + + lines: List[str] = [] + for char in self.reportchars: + action = REPORTCHAR_ACTIONS.get(char) + if action: # skipping e.g. "P" (passed with output) here. + action(lines) + + if lines: + self.write_sep("=", "short test summary info") + for line in lines: + self.write_line(line) + + def _get_main_color(self) -> Tuple[str, List[str]]: + if self._main_color is None or self._known_types is None or self._is_last_item: + self._set_main_color() + assert self._main_color + assert self._known_types + return self._main_color, self._known_types + + def _determine_main_color(self, unknown_type_seen: bool) -> str: + stats = self.stats + if "failed" in stats or "error" in stats: + main_color = "red" + elif "warnings" in stats or "xpassed" in stats or unknown_type_seen: + main_color = "yellow" + elif "passed" in stats or not self._is_last_item: + main_color = "green" + else: + main_color = "yellow" + return main_color + + def _set_main_color(self) -> None: + unknown_types: List[str] = [] + for found_type in self.stats.keys(): + if found_type: # setup/teardown reports have an empty key, ignore them + if found_type not in KNOWN_TYPES and found_type not in unknown_types: + unknown_types.append(found_type) + self._known_types = list(KNOWN_TYPES) + unknown_types + self._main_color = self._determine_main_color(bool(unknown_types)) + + def build_summary_stats_line(self) -> Tuple[List[Tuple[str, Dict[str, bool]]], str]: + """ + Build the parts used in the last summary stats line. + + The summary stats line is the line shown at the end, "=== 12 passed, 2 errors in Xs===". + + This function builds a list of the "parts" that make up for the text in that line, in + the example above it would be: + + [ + ("12 passed", {"green": True}), + ("2 errors", {"red": True} + ] + + That last dict for each line is a "markup dictionary", used by TerminalWriter to + color output. + + The final color of the line is also determined by this function, and is the second + element of the returned tuple. + """ + if self.config.getoption("collectonly"): + return self._build_collect_only_summary_stats_line() + else: + return self._build_normal_summary_stats_line() + + def _get_reports_to_display(self, key: str) -> List[Any]: + """Get test/collection reports for the given status key, such as `passed` or `error`.""" + reports = self.stats.get(key, []) + return [x for x in reports if getattr(x, "count_towards_summary", True)] + + def _build_normal_summary_stats_line( + self, + ) -> Tuple[List[Tuple[str, Dict[str, bool]]], str]: + main_color, known_types = self._get_main_color() + parts = [] + + for key in known_types: + reports = self._get_reports_to_display(key) + if reports: + count = len(reports) + color = _color_for_type.get(key, _color_for_type_default) + markup = {color: True, "bold": color == main_color} + parts.append(("%d %s" % pluralize(count, key), markup)) + + if not parts: + parts = [("no tests ran", {_color_for_type_default: True})] + + return parts, main_color + + def _build_collect_only_summary_stats_line( + self, + ) -> Tuple[List[Tuple[str, Dict[str, bool]]], str]: + deselected = len(self._get_reports_to_display("deselected")) + errors = len(self._get_reports_to_display("error")) + + if self._numcollected == 0: + parts = [("no tests collected", {"yellow": True})] + main_color = "yellow" + + elif deselected == 0: + main_color = "green" + collected_output = "%d %s collected" % pluralize(self._numcollected, "test") + parts = [(collected_output, {main_color: True})] + else: + all_tests_were_deselected = self._numcollected == deselected + if all_tests_were_deselected: + main_color = "yellow" + collected_output = f"no tests collected ({deselected} deselected)" + else: + main_color = "green" + selected = self._numcollected - deselected + collected_output = f"{selected}/{self._numcollected} tests collected ({deselected} deselected)" + + parts = [(collected_output, {main_color: True})] + + if errors: + main_color = _color_for_type["error"] + parts += [("%d %s" % pluralize(errors, "error"), {main_color: True})] + + return parts, main_color + + +def _get_pos(config: Config, rep: BaseReport): + nodeid = config.cwd_relative_nodeid(rep.nodeid) + return nodeid + + +def _format_trimmed(format: str, msg: str, available_width: int) -> Optional[str]: + """Format msg into format, ellipsizing it if doesn't fit in available_width. + + Returns None if even the ellipsis can't fit. + """ + # Only use the first line. + i = msg.find("\n") + if i != -1: + msg = msg[:i] + + ellipsis = "..." + format_width = wcswidth(format.format("")) + if format_width + len(ellipsis) > available_width: + return None + + if format_width + wcswidth(msg) > available_width: + available_width -= len(ellipsis) + msg = msg[:available_width] + while format_width + wcswidth(msg) > available_width: + msg = msg[:-1] + msg += ellipsis + + return format.format(msg) + + +def _get_line_with_reprcrash_message( + config: Config, rep: BaseReport, termwidth: int +) -> str: + """Get summary line for a report, trying to add reprcrash message.""" + verbose_word = rep._get_verbose_word(config) + pos = _get_pos(config, rep) + + line = f"{verbose_word} {pos}" + line_width = wcswidth(line) + try: - # Type ignored intentionally -- possible AttributeError expected. - msg = rep.longrepr.reprcrash.message # type: ignore[union-attr] - except AttributeError: - pass - else: - available_width = termwidth - line_width - msg = _format_trimmed(" - {}", msg, available_width) - if msg is not None: - line += msg - - return line - - -def _folded_skips( - startpath: Path, skipped: Sequence[CollectReport], -) -> List[Tuple[int, str, Optional[int], str]]: - d: Dict[Tuple[str, Optional[int], str], List[CollectReport]] = {} - for event in skipped: - assert event.longrepr is not None - assert isinstance(event.longrepr, tuple), (event, event.longrepr) - assert len(event.longrepr) == 3, (event, event.longrepr) - fspath, lineno, reason = event.longrepr - # For consistency, report all fspaths in relative form. - fspath = bestrelpath(startpath, Path(fspath)) - keywords = getattr(event, "keywords", {}) - # Folding reports with global pytestmark variable. - # This is a workaround, because for now we cannot identify the scope of a skip marker - # TODO: Revisit after marks scope would be fixed. - if ( - event.when == "setup" - and "skip" in keywords - and "pytestmark" not in keywords - ): - key: Tuple[str, Optional[int], str] = (fspath, None, reason) - else: - key = (fspath, lineno, reason) - d.setdefault(key, []).append(event) - values: List[Tuple[int, str, Optional[int], str]] = [] - for key, events in d.items(): - values.append((len(events), *key)) - return values - - -_color_for_type = { - "failed": "red", - "error": "red", - "warnings": "yellow", - "passed": "green", -} -_color_for_type_default = "yellow" - - -def pluralize(count: int, noun: str) -> Tuple[int, str]: - # No need to pluralize words such as `failed` or `passed`. - if noun not in ["error", "warnings", "test"]: - return count, noun - - # The `warnings` key is plural. To avoid API breakage, we keep it that way but - # set it to singular here so we can determine plurality in the same way as we do - # for `error`. - noun = noun.replace("warnings", "warning") - - return count, noun + "s" if count != 1 else noun - - -def _plugin_nameversions(plugininfo) -> List[str]: - values: List[str] = [] + # Type ignored intentionally -- possible AttributeError expected. + msg = rep.longrepr.reprcrash.message # type: ignore[union-attr] + except AttributeError: + pass + else: + available_width = termwidth - line_width + msg = _format_trimmed(" - {}", msg, available_width) + if msg is not None: + line += msg + + return line + + +def _folded_skips( + startpath: Path, skipped: Sequence[CollectReport], +) -> List[Tuple[int, str, Optional[int], str]]: + d: Dict[Tuple[str, Optional[int], str], List[CollectReport]] = {} + for event in skipped: + assert event.longrepr is not None + assert isinstance(event.longrepr, tuple), (event, event.longrepr) + assert len(event.longrepr) == 3, (event, event.longrepr) + fspath, lineno, reason = event.longrepr + # For consistency, report all fspaths in relative form. + fspath = bestrelpath(startpath, Path(fspath)) + keywords = getattr(event, "keywords", {}) + # Folding reports with global pytestmark variable. + # This is a workaround, because for now we cannot identify the scope of a skip marker + # TODO: Revisit after marks scope would be fixed. + if ( + event.when == "setup" + and "skip" in keywords + and "pytestmark" not in keywords + ): + key: Tuple[str, Optional[int], str] = (fspath, None, reason) + else: + key = (fspath, lineno, reason) + d.setdefault(key, []).append(event) + values: List[Tuple[int, str, Optional[int], str]] = [] + for key, events in d.items(): + values.append((len(events), *key)) + return values + + +_color_for_type = { + "failed": "red", + "error": "red", + "warnings": "yellow", + "passed": "green", +} +_color_for_type_default = "yellow" + + +def pluralize(count: int, noun: str) -> Tuple[int, str]: + # No need to pluralize words such as `failed` or `passed`. + if noun not in ["error", "warnings", "test"]: + return count, noun + + # The `warnings` key is plural. To avoid API breakage, we keep it that way but + # set it to singular here so we can determine plurality in the same way as we do + # for `error`. + noun = noun.replace("warnings", "warning") + + return count, noun + "s" if count != 1 else noun + + +def _plugin_nameversions(plugininfo) -> List[str]: + values: List[str] = [] for plugin, dist in plugininfo: - # Gets us name and version! + # Gets us name and version! name = "{dist.project_name}-{dist.version}".format(dist=dist) - # Questionable convenience, but it keeps things short. + # Questionable convenience, but it keeps things short. if name.startswith("pytest-"): name = name[7:] - # We decided to print python package names they can have more than one plugin. + # We decided to print python package names they can have more than one plugin. if name not in values: values.append(name) return values - - -def format_session_duration(seconds: float) -> str: - """Format the given seconds in a human readable manner to show in the final summary.""" - if seconds < 60: - return f"{seconds:.2f}s" - else: - dt = datetime.timedelta(seconds=int(seconds)) - return f"{seconds:.2f}s ({dt})" - - -def _get_raw_skip_reason(report: TestReport) -> str: - """Get the reason string of a skip/xfail/xpass test report. - - The string is just the part given by the user. - """ - if hasattr(report, "wasxfail"): - reason = cast(str, report.wasxfail) - if reason.startswith("reason: "): - reason = reason[len("reason: ") :] - return reason - else: - assert report.skipped - assert isinstance(report.longrepr, tuple) - _, _, reason = report.longrepr - if reason.startswith("Skipped: "): - reason = reason[len("Skipped: ") :] - elif reason == "Skipped": - reason = "" - return reason + + +def format_session_duration(seconds: float) -> str: + """Format the given seconds in a human readable manner to show in the final summary.""" + if seconds < 60: + return f"{seconds:.2f}s" + else: + dt = datetime.timedelta(seconds=int(seconds)) + return f"{seconds:.2f}s ({dt})" + + +def _get_raw_skip_reason(report: TestReport) -> str: + """Get the reason string of a skip/xfail/xpass test report. + + The string is just the part given by the user. + """ + if hasattr(report, "wasxfail"): + reason = cast(str, report.wasxfail) + if reason.startswith("reason: "): + reason = reason[len("reason: ") :] + return reason + else: + assert report.skipped + assert isinstance(report.longrepr, tuple) + _, _, reason = report.longrepr + if reason.startswith("Skipped: "): + reason = reason[len("Skipped: ") :] + elif reason == "Skipped": + reason = "" + return reason |