diff options
author | deshevoy <deshevoy@yandex-team.ru> | 2022-02-10 16:46:56 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:56 +0300 |
commit | e988f30484abe5fdeedcc7a5d3c226c01a21800c (patch) | |
tree | 0a217b173aabb57b7e51f8a169989b1a3e0309fe /contrib/python/pytest/py3/_pytest/pytester.py | |
parent | 33ee501c05d3f24036ae89766a858930ae66c548 (diff) | |
download | ydb-e988f30484abe5fdeedcc7a5d3c226c01a21800c.tar.gz |
Restoring authorship annotation for <deshevoy@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/pytester.py')
-rw-r--r-- | contrib/python/pytest/py3/_pytest/pytester.py | 1522 |
1 files changed, 761 insertions, 761 deletions
diff --git a/contrib/python/pytest/py3/_pytest/pytester.py b/contrib/python/pytest/py3/_pytest/pytester.py index 31259d1bdc..5c67e559fc 100644 --- a/contrib/python/pytest/py3/_pytest/pytester.py +++ b/contrib/python/pytest/py3/_pytest/pytester.py @@ -4,16 +4,16 @@ PYTEST_DONT_REWRITE """ import collections.abc import contextlib -import gc +import gc import importlib -import os -import platform -import re +import os +import platform +import re import shutil -import subprocess -import sys -import traceback -from fnmatch import fnmatch +import subprocess +import sys +import traceback +from fnmatch import fnmatch from io import StringIO from pathlib import Path from typing import Any @@ -30,15 +30,15 @@ from typing import Tuple from typing import Type from typing import TYPE_CHECKING from typing import Union -from weakref import WeakKeyDictionary - +from weakref import WeakKeyDictionary + import attr -import py +import py from iniconfig import IniConfig from iniconfig import SectionWrapper - + from _pytest import timing -from _pytest._code import Source +from _pytest._code import Source from _pytest.capture import _get_multicapture from _pytest.compat import final from _pytest.config import _PluggyPlugin @@ -51,7 +51,7 @@ from _pytest.config.argparsing import Parser from _pytest.deprecated import check_ispytest from _pytest.fixtures import fixture from _pytest.fixtures import FixtureRequest -from _pytest.main import Session +from _pytest.main import Session from _pytest.monkeypatch import MonkeyPatch from _pytest.nodes import Collector from _pytest.nodes import Item @@ -63,7 +63,7 @@ from _pytest.reports import CollectReport from _pytest.reports import TestReport from _pytest.tmpdir import TempPathFactory from _pytest.warning_types import PytestWarning - + if TYPE_CHECKING: from typing_extensions import Literal @@ -73,48 +73,48 @@ if TYPE_CHECKING: pytest_plugins = ["pytester_assertions"] -IGNORE_PAM = [ # filenames added when obtaining details about the current user +IGNORE_PAM = [ # filenames added when obtaining details about the current user "/var/lib/sss/mc/passwd" -] - - +] + + def pytest_addoption(parser: Parser) -> None: - parser.addoption( - "--lsof", - action="store_true", - dest="lsof", - default=False, - help="run FD checks if lsof is available", - ) - - parser.addoption( - "--runpytest", - default="inprocess", - dest="runpytest", - choices=("inprocess", "subprocess"), - help=( - "run pytest sub runs in tests using an 'inprocess' " - "or 'subprocess' (python -m main) method" - ), - ) - - parser.addini( - "pytester_example_dir", help="directory to take the pytester example files from" - ) - - + parser.addoption( + "--lsof", + action="store_true", + dest="lsof", + default=False, + help="run FD checks if lsof is available", + ) + + parser.addoption( + "--runpytest", + default="inprocess", + dest="runpytest", + choices=("inprocess", "subprocess"), + help=( + "run pytest sub runs in tests using an 'inprocess' " + "or 'subprocess' (python -m main) method" + ), + ) + + parser.addini( + "pytester_example_dir", help="directory to take the pytester example files from" + ) + + def pytest_configure(config: Config) -> None: - if config.getvalue("lsof"): - checker = LsofFdLeakChecker() - if checker.matching_platform(): - config.pluginmanager.register(checker) - + if config.getvalue("lsof"): + checker = LsofFdLeakChecker() + if checker.matching_platform(): + config.pluginmanager.register(checker) + config.addinivalue_line( "markers", "pytester_example_path(*path_segments): join the given path " "segments to `pytester_example_dir` for this test.", ) - + class LsofFdLeakChecker: def get_open_files(self) -> List[Tuple[str, str]]: @@ -125,48 +125,48 @@ class LsofFdLeakChecker: check=True, universal_newlines=True, ).stdout - + def isopen(line: str) -> bool: - return line.startswith("f") and ( - "deleted" not in line - and "mem" not in line - and "txt" not in line - and "cwd" not in line - ) - - open_files = [] - - for line in out.split("\n"): - if isopen(line): - fields = line.split("\0") - fd = fields[0][1:] - filename = fields[1][1:] - if filename in IGNORE_PAM: - continue - if filename.startswith("/"): - open_files.append((fd, filename)) - - return open_files - + return line.startswith("f") and ( + "deleted" not in line + and "mem" not in line + and "txt" not in line + and "cwd" not in line + ) + + open_files = [] + + for line in out.split("\n"): + if isopen(line): + fields = line.split("\0") + fd = fields[0][1:] + filename = fields[1][1:] + if filename in IGNORE_PAM: + continue + if filename.startswith("/"): + open_files.append((fd, filename)) + + return open_files + def matching_platform(self) -> bool: - try: + try: subprocess.run(("lsof", "-v"), check=True) except (OSError, subprocess.CalledProcessError): - return False - else: - return True - + return False + else: + return True + @hookimpl(hookwrapper=True, tryfirst=True) def pytest_runtest_protocol(self, item: Item) -> Generator[None, None, None]: - lines1 = self.get_open_files() - yield - if hasattr(sys, "pypy_version_info"): - gc.collect() - lines2 = self.get_open_files() - - new_fds = {t[0] for t in lines2} - {t[0] for t in lines1} - leaked_files = [t for t in lines2 if t[0] in new_fds] - if leaked_files: + lines1 = self.get_open_files() + yield + if hasattr(sys, "pypy_version_info"): + gc.collect() + lines2 = self.get_open_files() + + new_fds = {t[0] for t in lines2} - {t[0] for t in lines1} + leaked_files = [t for t in lines2 if t[0] in new_fds] + if leaked_files: error = [ "***** %s FD leakage detected" % len(leaked_files), *(str(f) for f in leaked_files), @@ -179,116 +179,116 @@ class LsofFdLeakChecker: "See issue #2366", ] item.warn(PytestWarning("\n".join(error))) - - -# used at least by pytest-xdist plugin - - + + +# used at least by pytest-xdist plugin + + @fixture def _pytest(request: FixtureRequest) -> "PytestArg": - """Return a helper which offers a gethookrecorder(hook) method which - returns a HookRecorder instance which helps to make assertions about called + """Return a helper which offers a gethookrecorder(hook) method which + returns a HookRecorder instance which helps to make assertions about called hooks.""" - return PytestArg(request) - - + return PytestArg(request) + + class PytestArg: def __init__(self, request: FixtureRequest) -> None: self._request = request - + def gethookrecorder(self, hook) -> "HookRecorder": - hookrecorder = HookRecorder(hook._pm) + hookrecorder = HookRecorder(hook._pm) self._request.addfinalizer(hookrecorder.finish_recording) - return hookrecorder - - + return hookrecorder + + def get_public_names(values: Iterable[str]) -> List[str]: - """Only return names from iterator values without a leading underscore.""" - return [x for x in values if x[0] != "_"] - - + """Only return names from iterator values without a leading underscore.""" + return [x for x in values if x[0] != "_"] + + class ParsedCall: def __init__(self, name: str, kwargs) -> None: - self.__dict__.update(kwargs) - self._name = name - + self.__dict__.update(kwargs) + self._name = name + def __repr__(self) -> str: - d = self.__dict__.copy() - del d["_name"] + d = self.__dict__.copy() + del d["_name"] return f"<ParsedCall {self._name!r}(**{d!r})>" - + if TYPE_CHECKING: # The class has undetermined attributes, this tells mypy about it. def __getattr__(self, key: str): ... - + class HookRecorder: - """Record all hooks called in a plugin manager. - - This wraps all the hook calls in the plugin manager, recording each call - before propagating the normal calls. - """ - + """Record all hooks called in a plugin manager. + + This wraps all the hook calls in the plugin manager, recording each call + before propagating the normal calls. + """ + def __init__(self, pluginmanager: PytestPluginManager) -> None: - self._pluginmanager = pluginmanager + self._pluginmanager = pluginmanager self.calls: List[ParsedCall] = [] self.ret: Optional[Union[int, ExitCode]] = None - + def before(hook_name: str, hook_impls, kwargs) -> None: - self.calls.append(ParsedCall(hook_name, kwargs)) - + self.calls.append(ParsedCall(hook_name, kwargs)) + def after(outcome, hook_name: str, hook_impls, kwargs) -> None: - pass - - self._undo_wrapping = pluginmanager.add_hookcall_monitoring(before, after) - + pass + + self._undo_wrapping = pluginmanager.add_hookcall_monitoring(before, after) + def finish_recording(self) -> None: - self._undo_wrapping() - + self._undo_wrapping() + def getcalls(self, names: Union[str, Iterable[str]]) -> List[ParsedCall]: - if isinstance(names, str): - names = names.split() - return [call for call in self.calls if call._name in names] - + if isinstance(names, str): + names = names.split() + return [call for call in self.calls if call._name in names] + def assert_contains(self, entries: Sequence[Tuple[str, str]]) -> None: - __tracebackhide__ = True - i = 0 - entries = list(entries) - backlocals = sys._getframe(1).f_locals - while entries: - name, check = entries.pop(0) - for ind, call in enumerate(self.calls[i:]): - if call._name == name: - print("NAMEMATCH", name, call) - if eval(check, backlocals, call.__dict__): - print("CHECKERMATCH", repr(check), "->", call) - else: - print("NOCHECKERMATCH", repr(check), "-", call) - continue - i += ind + 1 - break - print("NONAMEMATCH", name, "with", call) - else: + __tracebackhide__ = True + i = 0 + entries = list(entries) + backlocals = sys._getframe(1).f_locals + while entries: + name, check = entries.pop(0) + for ind, call in enumerate(self.calls[i:]): + if call._name == name: + print("NAMEMATCH", name, call) + if eval(check, backlocals, call.__dict__): + print("CHECKERMATCH", repr(check), "->", call) + else: + print("NOCHECKERMATCH", repr(check), "-", call) + continue + i += ind + 1 + break + print("NONAMEMATCH", name, "with", call) + else: fail(f"could not find {name!r} check {check!r}") - + def popcall(self, name: str) -> ParsedCall: - __tracebackhide__ = True - for i, call in enumerate(self.calls): - if call._name == name: - del self.calls[i] - return call + __tracebackhide__ = True + for i, call in enumerate(self.calls): + if call._name == name: + del self.calls[i] + return call lines = [f"could not find call {name!r}, in:"] - lines.extend([" %s" % x for x in self.calls]) + lines.extend([" %s" % x for x in self.calls]) fail("\n".join(lines)) - + def getcall(self, name: str) -> ParsedCall: - values = self.getcalls(name) - assert len(values) == 1, (name, values) - return values[0] - - # functionality for test reports - + values = self.getcalls(name) + assert len(values) == 1, (name, values) + return values[0] + + # functionality for test reports + @overload def getreports( self, names: "Literal['pytest_collectreport']", @@ -318,10 +318,10 @@ class HookRecorder: "pytest_runtest_logreport", ), ) -> Sequence[Union[CollectReport, TestReport]]: - return [x.report for x in self.getcalls(names)] - - def matchreport( - self, + return [x.report for x in self.getcalls(names)] + + def matchreport( + self, inamepart: str = "", names: Union[str, Iterable[str]] = ( "pytest_runtest_logreport", @@ -330,28 +330,28 @@ class HookRecorder: when: Optional[str] = None, ) -> Union[CollectReport, TestReport]: """Return a testreport whose dotted import path matches.""" - values = [] - for rep in self.getreports(names=names): + values = [] + for rep in self.getreports(names=names): if not when and rep.when != "call" and rep.passed: # setup/teardown passing reports - let's ignore those - continue + continue if when and rep.when != when: continue - if not inamepart or inamepart in rep.nodeid.split("::"): - values.append(rep) - if not values: - raise ValueError( - "could not find test report matching %r: " - "no test reports at all!" % (inamepart,) - ) - if len(values) > 1: - raise ValueError( + if not inamepart or inamepart in rep.nodeid.split("::"): + values.append(rep) + if not values: + raise ValueError( + "could not find test report matching %r: " + "no test reports at all!" % (inamepart,) + ) + if len(values) > 1: + raise ValueError( "found 2 or more testreports matching {!r}: {}".format( inamepart, values ) - ) - return values[0] - + ) + return values[0] + @overload def getfailures( self, names: "Literal['pytest_collectreport']", @@ -381,11 +381,11 @@ class HookRecorder: "pytest_runtest_logreport", ), ) -> Sequence[Union[CollectReport, TestReport]]: - return [rep for rep in self.getreports(names) if rep.failed] - + return [rep for rep in self.getreports(names) if rep.failed] + def getfailedcollections(self) -> Sequence[CollectReport]: - return self.getfailures("pytest_collectreport") - + return self.getfailures("pytest_collectreport") + def listoutcomes( self, ) -> Tuple[ @@ -393,46 +393,46 @@ class HookRecorder: Sequence[Union[CollectReport, TestReport]], Sequence[Union[CollectReport, TestReport]], ]: - passed = [] - skipped = [] - failed = [] + passed = [] + skipped = [] + failed = [] for rep in self.getreports( ("pytest_collectreport", "pytest_runtest_logreport") ): - if rep.passed: + if rep.passed: if rep.when == "call": assert isinstance(rep, TestReport) - passed.append(rep) - elif rep.skipped: - skipped.append(rep) + passed.append(rep) + elif rep.skipped: + skipped.append(rep) else: assert rep.failed, f"Unexpected outcome: {rep!r}" - failed.append(rep) - return passed, skipped, failed - + failed.append(rep) + return passed, skipped, failed + def countoutcomes(self) -> List[int]: - return [len(x) for x in self.listoutcomes()] - + return [len(x) for x in self.listoutcomes()] + def assertoutcome(self, passed: int = 0, skipped: int = 0, failed: int = 0) -> None: __tracebackhide__ = True from _pytest.pytester_assertions import assertoutcome - + outcomes = self.listoutcomes() assertoutcome( outcomes, passed=passed, skipped=skipped, failed=failed, ) def clear(self) -> None: - self.calls[:] = [] - - + self.calls[:] = [] + + @fixture def linecomp() -> "LineComp": """A :class: `LineComp` instance for checking that an input linearly contains a sequence of strings.""" - return LineComp() - - + return LineComp() + + @fixture(name="LineMatcher") def LineMatcher_fixture(request: FixtureRequest) -> Type["LineMatcher"]: """A reference to the :class: `LineMatcher`. @@ -440,18 +440,18 @@ def LineMatcher_fixture(request: FixtureRequest) -> Type["LineMatcher"]: This is instantiable with a list of lines (without their trailing newlines). This is useful for testing large texts, such as the output of commands. """ - return LineMatcher - - + return LineMatcher + + @fixture def pytester(request: FixtureRequest, tmp_path_factory: TempPathFactory) -> "Pytester": """ Facilities to write tests/configuration files, execute pytest in isolation, and match against expected output, perfect for black-box testing of pytest plugins. - + It attempts to isolate the test run from external factors as much as possible, modifying the current working directory to ``path`` and environment variables during initialization. - + It is particularly useful for testing plugins. It is similar to the :fixture:`tmp_path` fixture but provides methods which aid in testing pytest itself. """ @@ -491,11 +491,11 @@ def _config_for_test() -> Generator[Config, None, None]: rex_session_duration = re.compile(r"\d+\.\d\ds") # Regex to match all the counts and phrases in the summary line: "34 passed, 111 skipped". rex_outcome = re.compile(r"(\d+) (\w+)") - - + + class RunResult: """The result of running a command.""" - + def __init__( self, ret: Union[int, ExitCode], @@ -508,21 +508,21 @@ class RunResult: """The return value.""" except ValueError: self.ret = ret - self.outlines = outlines + self.outlines = outlines """List of lines captured from stdout.""" - self.errlines = errlines + self.errlines = errlines """List of lines captured from stderr.""" - self.stdout = LineMatcher(outlines) + self.stdout = LineMatcher(outlines) """:class:`LineMatcher` of stdout. Use e.g. :func:`str(stdout) <LineMatcher.__str__()>` to reconstruct stdout, or the commonly used :func:`stdout.fnmatch_lines() <LineMatcher.fnmatch_lines()>` method. """ - self.stderr = LineMatcher(errlines) + self.stderr = LineMatcher(errlines) """:class:`LineMatcher` of stderr.""" - self.duration = duration + self.duration = duration """Duration in seconds.""" - + def __repr__(self) -> str: return ( "<RunResult ret=%s len(stdout.lines)=%d len(stderr.lines)=%d duration=%.2fs>" @@ -531,14 +531,14 @@ class RunResult: def parseoutcomes(self) -> Dict[str, int]: """Return a dictionary of outcome noun -> count from parsing the terminal - output that the test process produced. - + output that the test process produced. + The returned nouns will always be in plural form:: ======= 1 failed, 1 passed, 1 warning, 1 error in 0.13s ==== Will return ``{"failed": 1, "passed": 1, "warnings": 1, "errors": 1}``. - """ + """ return self.parse_summary_nouns(self.outlines) @classmethod @@ -553,19 +553,19 @@ class RunResult: """ for line in reversed(lines): if rex_session_duration.search(line): - outcomes = rex_outcome.findall(line) + outcomes = rex_outcome.findall(line) ret = {noun: int(count) for (count, noun) in outcomes} break else: raise ValueError("Pytest terminal summary report not found") - + to_plural = { "warning": "warnings", "error": "errors", } return {to_plural.get(k, k): v for k, v in ret.items()} - def assert_outcomes( + def assert_outcomes( self, passed: int = 0, skipped: int = 0, @@ -574,11 +574,11 @@ class RunResult: xpassed: int = 0, xfailed: int = 0, ) -> None: - """Assert that the specified outcomes appear with the respective + """Assert that the specified outcomes appear with the respective numbers (0 means it didn't occur) in the text output from a test run.""" __tracebackhide__ = True from _pytest.pytester_assertions import assert_outcomes - + outcomes = self.parseoutcomes() assert_outcomes( outcomes, @@ -589,65 +589,65 @@ class RunResult: xpassed=xpassed, xfailed=xfailed, ) - - + + class CwdSnapshot: def __init__(self) -> None: - self.__saved = os.getcwd() - + self.__saved = os.getcwd() + def restore(self) -> None: - os.chdir(self.__saved) - - + os.chdir(self.__saved) + + class SysModulesSnapshot: def __init__(self, preserve: Optional[Callable[[str], bool]] = None) -> None: - self.__preserve = preserve - self.__saved = dict(sys.modules) - + self.__preserve = preserve + self.__saved = dict(sys.modules) + def restore(self) -> None: - if self.__preserve: - self.__saved.update( - (k, m) for k, m in sys.modules.items() if self.__preserve(k) - ) - sys.modules.clear() - sys.modules.update(self.__saved) - - + if self.__preserve: + self.__saved.update( + (k, m) for k, m in sys.modules.items() if self.__preserve(k) + ) + sys.modules.clear() + sys.modules.update(self.__saved) + + class SysPathsSnapshot: def __init__(self) -> None: - self.__saved = list(sys.path), list(sys.meta_path) - + self.__saved = list(sys.path), list(sys.meta_path) + def restore(self) -> None: - sys.path[:], sys.meta_path[:] = self.__saved - - + sys.path[:], sys.meta_path[:] = self.__saved + + @final class Pytester: """ Facilities to write tests/configuration files, execute pytest in isolation, and match against expected output, perfect for black-box testing of pytest plugins. - + It attempts to isolate the test run from external factors as much as possible, modifying the current working directory to ``path`` and environment variables during initialization. - - Attributes: - + + Attributes: + :ivar Path path: temporary directory path used to create files/run tests from, etc. - + :ivar plugins: A list of plugins to use with :py:meth:`parseconfig` and - :py:meth:`runpytest`. Initially this is an empty list but plugins can - be added to the list. The type of items to add to the list depends on - the method using them so refer to them for details. - """ - + :py:meth:`runpytest`. Initially this is an empty list but plugins can + be added to the list. The type of items to add to the list depends on + the method using them so refer to them for details. + """ + __test__ = False CLOSE_STDIN = object - class TimeoutExpired(Exception): - pass - + class TimeoutExpired(Exception): + pass + def __init__( self, request: FixtureRequest, @@ -667,14 +667,14 @@ class Pytester: self._name = name self._path: Path = tmp_path_factory.mktemp(name, numbered=True) self.plugins: List[Union[str, _PluggyPlugin]] = [] - self._cwd_snapshot = CwdSnapshot() - self._sys_path_snapshot = SysPathsSnapshot() - self._sys_modules_snapshot = self.__take_sys_modules_snapshot() - self.chdir() + self._cwd_snapshot = CwdSnapshot() + self._sys_path_snapshot = SysPathsSnapshot() + self._sys_modules_snapshot = self.__take_sys_modules_snapshot() + self.chdir() self._request.addfinalizer(self._finalize) self._method = self._request.config.getoption("--runpytest") self._test_tmproot = tmp_path_factory.mktemp(f"tmp-{name}", numbered=True) - + self._monkeypatch = mp = MonkeyPatch() mp.setenv("PYTEST_DEBUG_TEMPROOT", str(self._test_tmproot)) # Ensure no unexpected caching via tox. @@ -692,48 +692,48 @@ class Pytester: def path(self) -> Path: """Temporary directory where files are created and pytest is executed.""" return self._path - + def __repr__(self) -> str: return f"<Pytester {self.path!r}>" - + def _finalize(self) -> None: """ Clean up global state artifacts. - - Some methods modify the global interpreter state and this tries to + + Some methods modify the global interpreter state and this tries to clean this up. It does not remove the temporary directory however so - it can be looked at after the test run has finished. - """ - self._sys_modules_snapshot.restore() - self._sys_path_snapshot.restore() - self._cwd_snapshot.restore() + it can be looked at after the test run has finished. + """ + self._sys_modules_snapshot.restore() + self._sys_path_snapshot.restore() + self._cwd_snapshot.restore() self._monkeypatch.undo() - + def __take_sys_modules_snapshot(self) -> SysModulesSnapshot: # Some zope modules used by twisted-related tests keep internal state - # and can't be deleted; we had some trouble in the past with + # and can't be deleted; we had some trouble in the past with # `zope.interface` for example. # # Preserve readline due to https://bugs.python.org/issue41033. # pexpect issues a SIGWINCH. - def preserve_module(name): + def preserve_module(name): return name.startswith(("zope", "readline")) - - return SysModulesSnapshot(preserve=preserve_module) - + + return SysModulesSnapshot(preserve=preserve_module) + def make_hook_recorder(self, pluginmanager: PytestPluginManager) -> HookRecorder: - """Create a new :py:class:`HookRecorder` for a PluginManager.""" - pluginmanager.reprec = reprec = HookRecorder(pluginmanager) + """Create a new :py:class:`HookRecorder` for a PluginManager.""" + pluginmanager.reprec = reprec = HookRecorder(pluginmanager) self._request.addfinalizer(reprec.finish_recording) - return reprec - + return reprec + def chdir(self) -> None: - """Cd into the temporary directory. - - This is done automatically upon instantiation. - """ + """Cd into the temporary directory. + + This is done automatically upon instantiation. + """ os.chdir(self.path) - + def _makefile( self, ext: str, @@ -742,30 +742,30 @@ class Pytester: encoding: str = "utf-8", ) -> Path: items = list(files.items()) - + def to_text(s: Union[Any, bytes]) -> str: return s.decode(encoding) if isinstance(s, bytes) else str(s) - + if lines: source = "\n".join(to_text(x) for x in lines) basename = self._name - items.insert(0, (basename, source)) - - ret = None - for basename, value in items: + items.insert(0, (basename, source)) + + ret = None + for basename, value in items: p = self.path.joinpath(basename).with_suffix(ext) p.parent.mkdir(parents=True, exist_ok=True) source_ = Source(value) source = "\n".join(to_text(line) for line in source_.lines) p.write_text(source.strip(), encoding=encoding) - if ret is None: - ret = p + if ret is None: + ret = p assert ret is not None - return ret - + return ret + def makefile(self, ext: str, *args: str, **kwargs: str) -> Path: r"""Create new file(s) in the test directory. - + :param str ext: The extension the file(s) should use, including the dot, e.g. `.py`. :param args: @@ -775,31 +775,31 @@ class Pytester: :param kwargs: Each keyword is the name of a file, while the value of it will be written as contents of the file. - - Examples: - - .. code-block:: python - + + Examples: + + .. code-block:: python + pytester.makefile(".txt", "line1", "line2") - + pytester.makefile(".ini", pytest="[pytest]\naddopts=-rs\n") - - """ - return self._makefile(ext, args, kwargs) - + + """ + return self._makefile(ext, args, kwargs) + def makeconftest(self, source: str) -> Path: - """Write a contest.py file with 'source' as contents.""" - return self.makepyfile(conftest=source) - + """Write a contest.py file with 'source' as contents.""" + return self.makepyfile(conftest=source) + def makeini(self, source: str) -> Path: - """Write a tox.ini file with 'source' as contents.""" - return self.makefile(".ini", tox=source) - + """Write a tox.ini file with 'source' as contents.""" + return self.makefile(".ini", tox=source) + def getinicfg(self, source: str) -> SectionWrapper: - """Return the pytest section from the tox.ini config file.""" - p = self.makeini(source) + """Return the pytest section from the tox.ini config file.""" + p = self.makeini(source) return IniConfig(str(p))["pytest"] - + def makepyprojecttoml(self, source: str) -> Path: """Write a pyproject.toml file with 'source' as contents. @@ -825,8 +825,8 @@ class Pytester: # At this point, both 'test_something.py' & 'custom.py' exist in the test directory. """ - return self._makefile(".py", args, kwargs) - + return self._makefile(".py", args, kwargs) + def maketxtfile(self, *args, **kwargs) -> Path: r"""Shortcut for .makefile() with a .txt extension. @@ -845,38 +845,38 @@ class Pytester: # At this point, both 'test_something.txt' & 'custom.txt' exist in the test directory. """ - return self._makefile(".txt", args, kwargs) - + return self._makefile(".txt", args, kwargs) + def syspathinsert( self, path: Optional[Union[str, "os.PathLike[str]"]] = None ) -> None: - """Prepend a directory to sys.path, defaults to :py:attr:`tmpdir`. - - This is undone automatically when this object dies at the end of each - test. - """ - if path is None: + """Prepend a directory to sys.path, defaults to :py:attr:`tmpdir`. + + This is undone automatically when this object dies at the end of each + test. + """ + if path is None: path = self.path - + self._monkeypatch.syspath_prepend(str(path)) - + def mkdir(self, name: str) -> Path: - """Create a new (sub)directory.""" + """Create a new (sub)directory.""" p = self.path / name p.mkdir() return p - + def mkpydir(self, name: str) -> Path: - """Create a new python package. - - This creates a (sub)directory with an empty ``__init__.py`` file so it + """Create a new python package. + + This creates a (sub)directory with an empty ``__init__.py`` file so it gets recognised as a Python package. - """ + """ p = self.path / name p.mkdir() p.joinpath("__init__.py").touch() - return p - + return p + def copy_example(self, name: Optional[str] = None) -> Path: """Copy file from project's directory into the testdir. @@ -885,30 +885,30 @@ class Pytester: """ example_dir = self._request.config.getini("pytester_example_dir") - if example_dir is None: - raise ValueError("pytester_example_dir is unset, can't copy examples") + if example_dir is None: + raise ValueError("pytester_example_dir is unset, can't copy examples") example_dir = Path(str(self._request.config.rootdir)) / example_dir - + for extra_element in self._request.node.iter_markers("pytester_example_path"): - assert extra_element.args + assert extra_element.args example_dir = example_dir.joinpath(*extra_element.args) - - if name is None: + + if name is None: func_name = self._name - maybe_dir = example_dir / func_name - maybe_file = example_dir / (func_name + ".py") - + maybe_dir = example_dir / func_name + maybe_file = example_dir / (func_name + ".py") + if maybe_dir.is_dir(): - example_path = maybe_dir + example_path = maybe_dir elif maybe_file.is_file(): - example_path = maybe_file - else: - raise LookupError( + example_path = maybe_file + else: + raise LookupError( f"{func_name} can't be found as module or package in {example_dir}" - ) - else: + ) + else: example_path = example_dir.joinpath(name) - + if example_path.is_dir() and not example_path.joinpath("__init__.py").is_file(): # TODO: py.path.local.copy can copy files to existing directories, # while with shutil.copytree the destination directory cannot exist, @@ -918,119 +918,119 @@ class Pytester: elif example_path.is_file(): result = self.path.joinpath(example_path.name) shutil.copy(example_path, result) - return result - else: - raise LookupError( + return result + else: + raise LookupError( f'example "{example_path}" is not found as a file or directory' - ) - - Session = Session - + ) + + Session = Session + def getnode( self, config: Config, arg: Union[str, "os.PathLike[str]"] ) -> Optional[Union[Collector, Item]]: - """Return the collection node of a file. - + """Return the collection node of a file. + :param _pytest.config.Config config: A pytest config. See :py:meth:`parseconfig` and :py:meth:`parseconfigure` for creating it. :param py.path.local arg: Path to the file. - """ + """ session = Session.from_config(config) - assert "::" not in str(arg) - p = py.path.local(arg) - config.hook.pytest_sessionstart(session=session) - res = session.perform_collect([str(p)], genitems=False)[0] + assert "::" not in str(arg) + p = py.path.local(arg) + config.hook.pytest_sessionstart(session=session) + res = session.perform_collect([str(p)], genitems=False)[0] config.hook.pytest_sessionfinish(session=session, exitstatus=ExitCode.OK) - return res - + return res + def getpathnode(self, path: Union[str, "os.PathLike[str]"]): - """Return the collection node of a file. - - This is like :py:meth:`getnode` but uses :py:meth:`parseconfigure` to - create the (configured) pytest Config instance. - + """Return the collection node of a file. + + This is like :py:meth:`getnode` but uses :py:meth:`parseconfigure` to + create the (configured) pytest Config instance. + :param py.path.local path: Path to the file. - """ + """ path = py.path.local(path) - config = self.parseconfigure(path) + config = self.parseconfigure(path) session = Session.from_config(config) - x = session.fspath.bestrelpath(path) - config.hook.pytest_sessionstart(session=session) - res = session.perform_collect([x], genitems=False)[0] + x = session.fspath.bestrelpath(path) + config.hook.pytest_sessionstart(session=session) + res = session.perform_collect([x], genitems=False)[0] config.hook.pytest_sessionfinish(session=session, exitstatus=ExitCode.OK) - return res - + return res + def genitems(self, colitems: Sequence[Union[Item, Collector]]) -> List[Item]: - """Generate all test items from a collection node. - - This recurses into the collection node and returns a list of all the - test items contained within. - """ - session = colitems[0].session + """Generate all test items from a collection node. + + This recurses into the collection node and returns a list of all the + test items contained within. + """ + session = colitems[0].session result: List[Item] = [] - for colitem in colitems: - result.extend(session.genitems(colitem)) - return result - + for colitem in colitems: + result.extend(session.genitems(colitem)) + return result + def runitem(self, source: str) -> Any: - """Run the "test_func" Item. - - The calling test instance (class containing the test method) must - provide a ``.getrunner()`` method which should return a runner which - can run the test protocol for a single item, e.g. - :py:func:`_pytest.runner.runtestprotocol`. - """ - # used from runner functional tests - item = self.getitem(source) - # the test class where we are called from wants to provide the runner + """Run the "test_func" Item. + + The calling test instance (class containing the test method) must + provide a ``.getrunner()`` method which should return a runner which + can run the test protocol for a single item, e.g. + :py:func:`_pytest.runner.runtestprotocol`. + """ + # used from runner functional tests + item = self.getitem(source) + # the test class where we are called from wants to provide the runner testclassinstance = self._request.instance - runner = testclassinstance.getrunner() - return runner(item) - + runner = testclassinstance.getrunner() + return runner(item) + def inline_runsource(self, source: str, *cmdlineargs) -> HookRecorder: - """Run a test module in process using ``pytest.main()``. - - This run writes "source" into a temporary file and runs - ``pytest.main()`` on it, returning a :py:class:`HookRecorder` instance - for the result. - + """Run a test module in process using ``pytest.main()``. + + This run writes "source" into a temporary file and runs + ``pytest.main()`` on it, returning a :py:class:`HookRecorder` instance + for the result. + :param source: The source code of the test module. - + :param cmdlineargs: Any extra command line arguments to use. - + :returns: :py:class:`HookRecorder` instance of the result. - """ - p = self.makepyfile(source) - values = list(cmdlineargs) + [p] - return self.inline_run(*values) - + """ + p = self.makepyfile(source) + values = list(cmdlineargs) + [p] + return self.inline_run(*values) + def inline_genitems(self, *args) -> Tuple[List[Item], HookRecorder]: - """Run ``pytest.main(['--collectonly'])`` in-process. - - Runs the :py:func:`pytest.main` function to run all of pytest inside - the test process itself like :py:meth:`inline_run`, but returns a - tuple of the collected items and a :py:class:`HookRecorder` instance. - """ - rec = self.inline_run("--collect-only", *args) - items = [x.item for x in rec.getcalls("pytest_itemcollected")] - return items, rec - + """Run ``pytest.main(['--collectonly'])`` in-process. + + Runs the :py:func:`pytest.main` function to run all of pytest inside + the test process itself like :py:meth:`inline_run`, but returns a + tuple of the collected items and a :py:class:`HookRecorder` instance. + """ + rec = self.inline_run("--collect-only", *args) + items = [x.item for x in rec.getcalls("pytest_itemcollected")] + return items, rec + def inline_run( self, *args: Union[str, "os.PathLike[str]"], plugins=(), no_reraise_ctrlc: bool = False, ) -> HookRecorder: - """Run ``pytest.main()`` in-process, returning a HookRecorder. - - Runs the :py:func:`pytest.main` function to run all of pytest inside - the test process itself. This means it can return a - :py:class:`HookRecorder` instance which gives more detailed results - from that run than can be done by matching stdout/stderr from - :py:meth:`runpytest`. - + """Run ``pytest.main()`` in-process, returning a HookRecorder. + + Runs the :py:func:`pytest.main` function to run all of pytest inside + the test process itself. This means it can return a + :py:class:`HookRecorder` instance which gives more detailed results + from that run than can be done by matching stdout/stderr from + :py:meth:`runpytest`. + :param args: Command line arguments to pass to :py:func:`pytest.main`. :param plugins: @@ -1045,99 +1045,99 @@ class Pytester: # properly between file creation and inline_run (especially if imports # are interspersed with file creation) importlib.invalidate_caches() - + plugins = list(plugins) - finalizers = [] - try: - # Any sys.module or sys.path changes done while running pytest - # inline should be reverted after the test run completes to avoid - # clashing with later inline tests run within the same pytest test, - # e.g. just because they use matching test module names. - finalizers.append(self.__take_sys_modules_snapshot().restore) - finalizers.append(SysPathsSnapshot().restore) - - # Important note: - # - our tests should not leave any other references/registrations - # laying around other than possibly loaded test modules - # referenced from sys.modules, as nothing will clean those up - # automatically - - rec = [] - + finalizers = [] + try: + # Any sys.module or sys.path changes done while running pytest + # inline should be reverted after the test run completes to avoid + # clashing with later inline tests run within the same pytest test, + # e.g. just because they use matching test module names. + finalizers.append(self.__take_sys_modules_snapshot().restore) + finalizers.append(SysPathsSnapshot().restore) + + # Important note: + # - our tests should not leave any other references/registrations + # laying around other than possibly loaded test modules + # referenced from sys.modules, as nothing will clean those up + # automatically + + rec = [] + class Collect: def pytest_configure(x, config: Config) -> None: - rec.append(self.make_hook_recorder(config.pluginmanager)) - - plugins.append(Collect()) + rec.append(self.make_hook_recorder(config.pluginmanager)) + + plugins.append(Collect()) ret = main([str(x) for x in args], plugins=plugins) - if len(rec) == 1: - reprec = rec.pop() - else: - + if len(rec) == 1: + reprec = rec.pop() + else: + class reprec: # type: ignore - pass - + pass + reprec.ret = ret # type: ignore - + # Typically we reraise keyboard interrupts from the child run # because it's our user requesting interruption of the testing. if ret == ExitCode.INTERRUPTED and not no_reraise_ctrlc: - calls = reprec.getcalls("pytest_keyboard_interrupt") - if calls and calls[-1].excinfo.type == KeyboardInterrupt: - raise KeyboardInterrupt() - return reprec - finally: - for finalizer in finalizers: - finalizer() - + calls = reprec.getcalls("pytest_keyboard_interrupt") + if calls and calls[-1].excinfo.type == KeyboardInterrupt: + raise KeyboardInterrupt() + return reprec + finally: + for finalizer in finalizers: + finalizer() + def runpytest_inprocess( self, *args: Union[str, "os.PathLike[str]"], **kwargs: Any ) -> RunResult: - """Return result of running pytest in-process, providing a similar + """Return result of running pytest in-process, providing a similar interface to what self.runpytest() provides.""" syspathinsert = kwargs.pop("syspathinsert", False) - + if syspathinsert: - self.syspathinsert() + self.syspathinsert() now = timing.time() capture = _get_multicapture("sys") - capture.start_capturing() - try: - try: - reprec = self.inline_run(*args, **kwargs) - except SystemExit as e: + capture.start_capturing() + try: + try: + reprec = self.inline_run(*args, **kwargs) + except SystemExit as e: ret = e.args[0] try: ret = ExitCode(e.args[0]) except ValueError: pass - + class reprec: # type: ignore ret = ret - - except Exception: - traceback.print_exc() - + + except Exception: + traceback.print_exc() + class reprec: # type: ignore ret = ExitCode(3) - - finally: - out, err = capture.readouterr() - capture.stop_capturing() - sys.stdout.write(out) - sys.stderr.write(err) - + + finally: + out, err = capture.readouterr() + capture.stop_capturing() + sys.stdout.write(out) + sys.stderr.write(err) + assert reprec.ret is not None res = RunResult( reprec.ret, out.splitlines(), err.splitlines(), timing.time() - now ) res.reprec = reprec # type: ignore - return res - + return res + def runpytest( self, *args: Union[str, "os.PathLike[str]"], **kwargs: Any ) -> RunResult: - """Run pytest inline or in a subprocess, depending on the command line + """Run pytest inline or in a subprocess, depending on the command line option "--runpytest" and return a :py:class:`RunResult`.""" new_args = self._ensure_basetemp(args) if self._method == "inprocess": @@ -1145,128 +1145,128 @@ class Pytester: elif self._method == "subprocess": return self.runpytest_subprocess(*new_args, **kwargs) raise RuntimeError(f"Unrecognized runpytest option: {self._method}") - + def _ensure_basetemp( self, args: Sequence[Union[str, "os.PathLike[str]"]] ) -> List[Union[str, "os.PathLike[str]"]]: new_args = list(args) for x in new_args: if str(x).startswith("--basetemp"): - break - else: + break + else: new_args.append("--basetemp=%s" % self.path.parent.joinpath("basetemp")) return new_args - + def parseconfig(self, *args: Union[str, "os.PathLike[str]"]) -> Config: - """Return a new pytest Config instance from given commandline args. - - This invokes the pytest bootstrapping code in _pytest.config to create - a new :py:class:`_pytest.core.PluginManager` and call the - pytest_cmdline_parse hook to create a new - :py:class:`_pytest.config.Config` instance. - - If :py:attr:`plugins` has been populated they should be plugin modules - to be registered with the PluginManager. - """ - import _pytest.config - + """Return a new pytest Config instance from given commandline args. + + This invokes the pytest bootstrapping code in _pytest.config to create + a new :py:class:`_pytest.core.PluginManager` and call the + pytest_cmdline_parse hook to create a new + :py:class:`_pytest.config.Config` instance. + + If :py:attr:`plugins` has been populated they should be plugin modules + to be registered with the PluginManager. + """ + import _pytest.config + new_args = self._ensure_basetemp(args) new_args = [str(x) for x in new_args] config = _pytest.config._prepareconfig(new_args, self.plugins) # type: ignore[arg-type] - # we don't know what the test will do with this half-setup config - # object and thus we make sure it gets unconfigured properly in any - # case (otherwise capturing could still be active, for example) + # we don't know what the test will do with this half-setup config + # object and thus we make sure it gets unconfigured properly in any + # case (otherwise capturing could still be active, for example) self._request.addfinalizer(config._ensure_unconfigure) - return config - + return config + def parseconfigure(self, *args: Union[str, "os.PathLike[str]"]) -> Config: - """Return a new pytest configured Config instance. - + """Return a new pytest configured Config instance. + Returns a new :py:class:`_pytest.config.Config` instance like - :py:meth:`parseconfig`, but also calls the pytest_configure hook. - """ - config = self.parseconfig(*args) - config._do_configure() - return config - + :py:meth:`parseconfig`, but also calls the pytest_configure hook. + """ + config = self.parseconfig(*args) + config._do_configure() + return config + def getitem(self, source: str, funcname: str = "test_func") -> Item: - """Return the test item for a test function. - + """Return the test item for a test function. + Writes the source to a python file and runs pytest's collection on - the resulting module, returning the test item for the requested - function name. - + the resulting module, returning the test item for the requested + function name. + :param source: The module source. :param funcname: The name of the test function for which to return a test item. - """ - items = self.getitems(source) - for item in items: - if item.name == funcname: - return item + """ + items = self.getitems(source) + for item in items: + if item.name == funcname: + return item assert 0, "{!r} item not found in module:\n{}\nitems: {}".format( funcname, source, items - ) - + ) + def getitems(self, source: str) -> List[Item]: - """Return all test items collected from the module. - + """Return all test items collected from the module. + Writes the source to a Python file and runs pytest's collection on - the resulting module, returning all test items contained within. - """ - modcol = self.getmodulecol(source) - return self.genitems([modcol]) - + the resulting module, returning all test items contained within. + """ + modcol = self.getmodulecol(source) + return self.genitems([modcol]) + def getmodulecol( self, source: Union[str, Path], configargs=(), *, withinit: bool = False ): - """Return the module collection node for ``source``. - + """Return the module collection node for ``source``. + Writes ``source`` to a file using :py:meth:`makepyfile` and then - runs the pytest collection on it, returning the collection node for the - test module. - + runs the pytest collection on it, returning the collection node for the + test module. + :param source: The source code of the module to collect. - + :param configargs: Any extra arguments to pass to :py:meth:`parseconfigure`. - + :param withinit: Whether to also write an ``__init__.py`` file to the same directory to ensure it is a package. - """ - if isinstance(source, Path): + """ + if isinstance(source, Path): path = self.path.joinpath(source) - assert not withinit, "not supported for paths" - else: + assert not withinit, "not supported for paths" + else: kw = {self._name: str(source)} - path = self.makepyfile(**kw) - if withinit: - self.makepyfile(__init__="#") - self.config = config = self.parseconfigure(path, *configargs) - return self.getnode(config, path) - + path = self.makepyfile(**kw) + if withinit: + self.makepyfile(__init__="#") + self.config = config = self.parseconfigure(path, *configargs) + return self.getnode(config, path) + def collect_by_name( self, modcol: Collector, name: str ) -> Optional[Union[Item, Collector]]: - """Return the collection node for name from the module collection. - + """Return the collection node for name from the module collection. + Searchs a module collection node for a collection node matching the given name. - + :param modcol: A module collection node; see :py:meth:`getmodulecol`. :param name: The name of the node to return. - """ - if modcol not in self._mod_collections: - self._mod_collections[modcol] = list(modcol.collect()) - for colitem in self._mod_collections[modcol]: - if colitem.name == name: - return colitem + """ + if modcol not in self._mod_collections: + self._mod_collections[modcol] = list(modcol.collect()) + for colitem in self._mod_collections[modcol]: + if colitem.name == name: + return colitem return None - + def popen( self, cmdargs, @@ -1275,26 +1275,26 @@ class Pytester: stdin=CLOSE_STDIN, **kw, ): - """Invoke subprocess.Popen. - + """Invoke subprocess.Popen. + Calls subprocess.Popen making sure the current working directory is in the PYTHONPATH. - - You probably want to use :py:meth:`run` instead. - """ - env = os.environ.copy() - env["PYTHONPATH"] = os.pathsep.join( - filter(None, [os.getcwd(), env.get("PYTHONPATH", "")]) - ) - kw["env"] = env - + + You probably want to use :py:meth:`run` instead. + """ + env = os.environ.copy() + env["PYTHONPATH"] = os.pathsep.join( + filter(None, [os.getcwd(), env.get("PYTHONPATH", "")]) + ) + kw["env"] = env + if stdin is self.CLOSE_STDIN: kw["stdin"] = subprocess.PIPE elif isinstance(stdin, bytes): kw["stdin"] = subprocess.PIPE else: kw["stdin"] = stdin - + popen = subprocess.Popen(cmdargs, stdout=stdout, stderr=stderr, **kw) if stdin is self.CLOSE_STDIN: assert popen.stdin is not None @@ -1303,18 +1303,18 @@ class Pytester: assert popen.stdin is not None popen.stdin.write(stdin) - return popen - + return popen + def run( self, *cmdargs: Union[str, "os.PathLike[str]"], timeout: Optional[float] = None, stdin=CLOSE_STDIN, ) -> RunResult: - """Run a command with arguments. - - Run a process using subprocess.Popen saving the stdout and stderr. - + """Run a command with arguments. + + Run a process using subprocess.Popen saving the stdout and stderr. + :param cmdargs: The sequence of arguments to pass to `subprocess.Popen()`, with path-like objects being converted to ``str`` automatically. @@ -1326,11 +1326,11 @@ class Pytester: the pipe, otherwise it is passed through to ``popen``. Defaults to ``CLOSE_STDIN``, which translates to using a pipe (``subprocess.PIPE``) that gets closed. - + :rtype: RunResult - """ - __tracebackhide__ = True - + """ + __tracebackhide__ = True + # TODO: Remove type ignore in next mypy release. # https://github.com/python/typeshed/pull/4582 cmdargs = tuple( @@ -1338,125 +1338,125 @@ class Pytester: ) p1 = self.path.joinpath("stdout") p2 = self.path.joinpath("stderr") - print("running:", *cmdargs) + print("running:", *cmdargs) print(" in:", Path.cwd()) with p1.open("w", encoding="utf8") as f1, p2.open("w", encoding="utf8") as f2: now = timing.time() - popen = self.popen( + popen = self.popen( cmdargs, stdin=stdin, stdout=f1, stderr=f2, close_fds=(sys.platform != "win32"), - ) + ) if popen.stdin is not None: popen.stdin.close() - + def handle_timeout() -> None: - __tracebackhide__ = True - - timeout_message = ( - "{seconds} second timeout expired running:" - " {command}".format(seconds=timeout, command=cmdargs) - ) - - popen.kill() - popen.wait() - raise self.TimeoutExpired(timeout_message) - - if timeout is None: - ret = popen.wait() + __tracebackhide__ = True + + timeout_message = ( + "{seconds} second timeout expired running:" + " {command}".format(seconds=timeout, command=cmdargs) + ) + + popen.kill() + popen.wait() + raise self.TimeoutExpired(timeout_message) + + if timeout is None: + ret = popen.wait() else: - try: - ret = popen.wait(timeout) - except subprocess.TimeoutExpired: - handle_timeout() + try: + ret = popen.wait(timeout) + except subprocess.TimeoutExpired: + handle_timeout() with p1.open(encoding="utf8") as f1, p2.open(encoding="utf8") as f2: - out = f1.read().splitlines() - err = f2.read().splitlines() + out = f1.read().splitlines() + err = f2.read().splitlines() - self._dump_lines(out, sys.stdout) - self._dump_lines(err, sys.stderr) + self._dump_lines(out, sys.stdout) + self._dump_lines(err, sys.stderr) with contextlib.suppress(ValueError): ret = ExitCode(ret) return RunResult(ret, out, err, timing.time() - now) - - def _dump_lines(self, lines, fp): - try: - for line in lines: - print(line, file=fp) - except UnicodeEncodeError: + + def _dump_lines(self, lines, fp): + try: + for line in lines: + print(line, file=fp) + except UnicodeEncodeError: print(f"couldn't print to {fp} because of encoding") - + def _getpytestargs(self) -> Tuple[str, ...]: - return sys.executable, "-mpytest" - + return sys.executable, "-mpytest" + def runpython(self, script) -> RunResult: - """Run a python script using sys.executable as interpreter. - + """Run a python script using sys.executable as interpreter. + :rtype: RunResult - """ - return self.run(sys.executable, script) - - def runpython_c(self, command): + """ + return self.run(sys.executable, script) + + def runpython_c(self, command): """Run python -c "command". :rtype: RunResult """ - return self.run(sys.executable, "-c", command) - + return self.run(sys.executable, "-c", command) + def runpytest_subprocess(self, *args, timeout: Optional[float] = None) -> RunResult: - """Run pytest as a subprocess with given arguments. - - Any plugins added to the :py:attr:`plugins` list will be added using the - ``-p`` command line option. Additionally ``--basetemp`` is used to put - any temporary files and directories in a numbered directory prefixed - with "runpytest-" to not conflict with the normal numbered pytest - location for temporary files and directories. - + """Run pytest as a subprocess with given arguments. + + Any plugins added to the :py:attr:`plugins` list will be added using the + ``-p`` command line option. Additionally ``--basetemp`` is used to put + any temporary files and directories in a numbered directory prefixed + with "runpytest-" to not conflict with the normal numbered pytest + location for temporary files and directories. + :param args: The sequence of arguments to pass to the pytest subprocess. :param timeout: The period in seconds after which to timeout and raise :py:class:`Pytester.TimeoutExpired`. - + :rtype: RunResult - """ - __tracebackhide__ = True + """ + __tracebackhide__ = True p = make_numbered_dir(root=self.path, prefix="runpytest-", mode=0o700) - args = ("--basetemp=%s" % p,) + args - plugins = [x for x in self.plugins if isinstance(x, str)] - if plugins: - args = ("-p", plugins[0]) + args - args = self._getpytestargs() + args + args = ("--basetemp=%s" % p,) + args + plugins = [x for x in self.plugins if isinstance(x, str)] + if plugins: + args = ("-p", plugins[0]) + args + args = self._getpytestargs() + args return self.run(*args, timeout=timeout) - + def spawn_pytest( self, string: str, expect_timeout: float = 10.0 ) -> "pexpect.spawn": - """Run pytest using pexpect. - - This makes sure to use the right pytest and sets up the temporary - directory locations. - - The pexpect child is returned. - """ + """Run pytest using pexpect. + + This makes sure to use the right pytest and sets up the temporary + directory locations. + + The pexpect child is returned. + """ basetemp = self.path / "temp-pexpect" basetemp.mkdir(mode=0o700) - invoke = " ".join(map(str, self._getpytestargs())) + invoke = " ".join(map(str, self._getpytestargs())) cmd = f"{invoke} --basetemp={basetemp} {string}" - return self.spawn(cmd, expect_timeout=expect_timeout) - + return self.spawn(cmd, expect_timeout=expect_timeout) + def spawn(self, cmd: str, expect_timeout: float = 10.0) -> "pexpect.spawn": - """Run a command using pexpect. - - The pexpect child is returned. - """ + """Run a command using pexpect. + + The pexpect child is returned. + """ pexpect = importorskip("pexpect", "3.0") - if hasattr(sys, "pypy_version_info") and "64" in platform.machine(): + if hasattr(sys, "pypy_version_info") and "64" in platform.machine(): skip("pypy-64 bit not supported") if not hasattr(pexpect, "spawn"): skip("pexpect.spawn not available") @@ -1464,27 +1464,27 @@ class Pytester: child = pexpect.spawn(cmd, logfile=logfile, timeout=expect_timeout) self._request.addfinalizer(logfile.close) - return child - - + return child + + class LineComp: def __init__(self) -> None: self.stringio = StringIO() """:class:`python:io.StringIO()` instance used for input.""" - + def assert_contains_lines(self, lines2: Sequence[str]) -> None: """Assert that ``lines2`` are contained (linearly) in :attr:`stringio`'s value. - + Lines are matched using :func:`LineMatcher.fnmatch_lines`. - """ - __tracebackhide__ = True - val = self.stringio.getvalue() - self.stringio.truncate(0) - self.stringio.seek(0) - lines1 = val.split("\n") + """ + __tracebackhide__ = True + val = self.stringio.getvalue() + self.stringio.truncate(0) + self.stringio.seek(0) + lines1 = val.split("\n") LineMatcher(lines1).fnmatch_lines(lines2) - - + + @final @attr.s(repr=False, str=False, init=False) class Testdir: @@ -1697,19 +1697,19 @@ class Testdir: class LineMatcher: - """Flexible matching of text. - - This is a convenience class to test large texts like the output of - commands. - - The constructor takes a list of lines without their trailing newlines, i.e. - ``text.splitlines()``. - """ - + """Flexible matching of text. + + This is a convenience class to test large texts like the output of + commands. + + The constructor takes a list of lines without their trailing newlines, i.e. + ``text.splitlines()``. + """ + def __init__(self, lines: List[str]) -> None: - self.lines = lines + self.lines = lines self._log_output: List[str] = [] - + def __str__(self) -> str: """Return the entire original text. @@ -1719,90 +1719,90 @@ class LineMatcher: return "\n".join(self.lines) def _getlines(self, lines2: Union[str, Sequence[str], Source]) -> Sequence[str]: - if isinstance(lines2, str): - lines2 = Source(lines2) - if isinstance(lines2, Source): - lines2 = lines2.strip().lines - return lines2 - + if isinstance(lines2, str): + lines2 = Source(lines2) + if isinstance(lines2, Source): + lines2 = lines2.strip().lines + return lines2 + def fnmatch_lines_random(self, lines2: Sequence[str]) -> None: """Check lines exist in the output in any order (using :func:`python:fnmatch.fnmatch`).""" __tracebackhide__ = True - self._match_lines_random(lines2, fnmatch) - + self._match_lines_random(lines2, fnmatch) + def re_match_lines_random(self, lines2: Sequence[str]) -> None: """Check lines exist in the output in any order (using :func:`python:re.match`).""" __tracebackhide__ = True self._match_lines_random(lines2, lambda name, pat: bool(re.match(pat, name))) - + def _match_lines_random( self, lines2: Sequence[str], match_func: Callable[[str, str], bool] ) -> None: __tracebackhide__ = True - lines2 = self._getlines(lines2) - for line in lines2: - for x in self.lines: - if line == x or match_func(x, line): - self._log("matched: ", repr(line)) - break - else: + lines2 = self._getlines(lines2) + for line in lines2: + for x in self.lines: + if line == x or match_func(x, line): + self._log("matched: ", repr(line)) + break + else: msg = "line %r not found in output" % line self._log(msg) self._fail(msg) - + def get_lines_after(self, fnline: str) -> Sequence[str]: - """Return all lines following the given line in the text. - - The given line can contain glob wildcards. - """ - for i, line in enumerate(self.lines): - if fnline == line or fnmatch(line, fnline): - return self.lines[i + 1 :] - raise ValueError("line %r not found in output" % fnline) - + """Return all lines following the given line in the text. + + The given line can contain glob wildcards. + """ + for i, line in enumerate(self.lines): + if fnline == line or fnmatch(line, fnline): + return self.lines[i + 1 :] + raise ValueError("line %r not found in output" % fnline) + def _log(self, *args) -> None: self._log_output.append(" ".join(str(x) for x in args)) - - @property + + @property def _log_text(self) -> str: - return "\n".join(self._log_output) - + return "\n".join(self._log_output) + def fnmatch_lines( self, lines2: Sequence[str], *, consecutive: bool = False ) -> None: """Check lines exist in the output (using :func:`python:fnmatch.fnmatch`). - - The argument is a list of lines which have to match and can use glob - wildcards. If they do not match a pytest.fail() is called. The + + The argument is a list of lines which have to match and can use glob + wildcards. If they do not match a pytest.fail() is called. The matches and non-matches are also shown as part of the error message. - + :param lines2: String patterns to match. :param consecutive: Match lines consecutively? - """ - __tracebackhide__ = True + """ + __tracebackhide__ = True self._match_lines(lines2, fnmatch, "fnmatch", consecutive=consecutive) - + def re_match_lines( self, lines2: Sequence[str], *, consecutive: bool = False ) -> None: """Check lines exist in the output (using :func:`python:re.match`). - - The argument is a list of lines which have to match using ``re.match``. - If they do not match a pytest.fail() is called. - + + The argument is a list of lines which have to match using ``re.match``. + If they do not match a pytest.fail() is called. + The matches and non-matches are also shown as part of the error message. - + :param lines2: string patterns to match. :param consecutive: match lines consecutively? - """ - __tracebackhide__ = True + """ + __tracebackhide__ = True self._match_lines( lines2, lambda name, pat: bool(re.match(pat, name)), "re.match", consecutive=consecutive, ) - + def _match_lines( self, lines2: Sequence[str], @@ -1811,8 +1811,8 @@ class LineMatcher: *, consecutive: bool = False, ) -> None: - """Underlying implementation of ``fnmatch_lines`` and ``re_match_lines``. - + """Underlying implementation of ``fnmatch_lines`` and ``re_match_lines``. + :param Sequence[str] lines2: List of string patterns to match. The actual format depends on ``match_func``. @@ -1825,31 +1825,31 @@ class LineMatcher: when a match occurs. :param consecutive: Match lines consecutively? - """ + """ if not isinstance(lines2, collections.abc.Sequence): raise TypeError("invalid type for lines2: {}".format(type(lines2).__name__)) - lines2 = self._getlines(lines2) - lines1 = self.lines[:] - extralines = [] - __tracebackhide__ = True + lines2 = self._getlines(lines2) + lines1 = self.lines[:] + extralines = [] + __tracebackhide__ = True wnick = len(match_nickname) + 1 started = False - for line in lines2: - nomatchprinted = False - while lines1: - nextline = lines1.pop(0) - if line == nextline: - self._log("exact match:", repr(line)) + for line in lines2: + nomatchprinted = False + while lines1: + nextline = lines1.pop(0) + if line == nextline: + self._log("exact match:", repr(line)) started = True - break - elif match_func(nextline, line): - self._log("%s:" % match_nickname, repr(line)) + break + elif match_func(nextline, line): + self._log("%s:" % match_nickname, repr(line)) self._log( "{:>{width}}".format("with:", width=wnick), repr(nextline) ) started = True - break - else: + break + else: if consecutive and started: msg = f"no consecutive match: {line!r}" self._log(msg) @@ -1857,14 +1857,14 @@ class LineMatcher: "{:>{width}}".format("with:", width=wnick), repr(nextline) ) self._fail(msg) - if not nomatchprinted: + if not nomatchprinted: self._log( "{:>{width}}".format("nomatch:", width=wnick), repr(line) ) - nomatchprinted = True + nomatchprinted = True self._log("{:>{width}}".format("and:", width=wnick), repr(nextline)) - extralines.append(nextline) - else: + extralines.append(nextline) + else: msg = f"remains unmatched: {line!r}" self._log(msg) self._fail(msg) |