diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2025-05-05 12:31:52 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2025-05-05 12:41:33 +0300 |
commit | 6ff49ec58061f642c3a2f83c61eba12820787dfc (patch) | |
tree | c733ec9bdb15ed280080d31dea8725bfec717acd /contrib/python/pytest/py3/_pytest/pytester.py | |
parent | eefca8305c6a545cc6b16dca3eb0d91dcef2adcd (diff) | |
download | ydb-6ff49ec58061f642c3a2f83c61eba12820787dfc.tar.gz |
Intermediate changes
commit_hash:8b3bb826b17db8329ed1221f545c0645f12c552d
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/pytester.py')
-rw-r--r-- | contrib/python/pytest/py3/_pytest/pytester.py | 110 |
1 files changed, 48 insertions, 62 deletions
diff --git a/contrib/python/pytest/py3/_pytest/pytester.py b/contrib/python/pytest/py3/_pytest/pytester.py index 0771065e065..00bd7b02cbd 100644 --- a/contrib/python/pytest/py3/_pytest/pytester.py +++ b/contrib/python/pytest/py3/_pytest/pytester.py @@ -2,28 +2,32 @@ PYTEST_DONT_REWRITE """ + import collections.abc import contextlib +from fnmatch import fnmatch import gc import importlib +from io import StringIO import locale import os +from pathlib import Path import platform import re import shutil import subprocess import sys import traceback -from fnmatch import fnmatch -from io import StringIO -from pathlib import Path from typing import Any from typing import Callable from typing import Dict +from typing import Final +from typing import final from typing import Generator from typing import IO from typing import Iterable from typing import List +from typing import Literal from typing import Optional from typing import overload from typing import Sequence @@ -40,7 +44,6 @@ from iniconfig import SectionWrapper from _pytest import timing from _pytest._code import Source from _pytest.capture import _get_multicapture -from _pytest.compat import final from _pytest.compat import NOTSET from _pytest.compat import NotSetType from _pytest.config import _PluggyPlugin @@ -61,7 +64,6 @@ from _pytest.outcomes import fail from _pytest.outcomes import importorskip from _pytest.outcomes import skip from _pytest.pathlib import bestrelpath -from _pytest.pathlib import copytree from _pytest.pathlib import make_numbered_dir from _pytest.reports import CollectReport from _pytest.reports import TestReport @@ -70,9 +72,6 @@ from _pytest.warning_types import PytestWarning if TYPE_CHECKING: - from typing_extensions import Final - from typing_extensions import Literal - import pexpect @@ -124,13 +123,18 @@ def pytest_configure(config: Config) -> None: class LsofFdLeakChecker: def get_open_files(self) -> List[Tuple[str, str]]: + if sys.version_info >= (3, 11): + # New in Python 3.11, ignores utf-8 mode + encoding = locale.getencoding() + else: + encoding = locale.getpreferredencoding(False) out = subprocess.run( ("lsof", "-Ffn0", "-p", str(os.getpid())), stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, check=True, text=True, - encoding=locale.getpreferredencoding(False), + encoding=encoding, ).stdout def isopen(line: str) -> bool: @@ -163,29 +167,31 @@ class LsofFdLeakChecker: else: return True - @hookimpl(hookwrapper=True, tryfirst=True) - def pytest_runtest_protocol(self, item: Item) -> Generator[None, None, None]: + @hookimpl(wrapper=True, tryfirst=True) + def pytest_runtest_protocol(self, item: Item) -> Generator[None, object, object]: lines1 = self.get_open_files() - yield - if hasattr(sys, "pypy_version_info"): - gc.collect() - lines2 = self.get_open_files() - - new_fds = {t[0] for t in lines2} - {t[0] for t in lines1} - leaked_files = [t for t in lines2 if t[0] in new_fds] - if leaked_files: - error = [ - "***** %s FD leakage detected" % len(leaked_files), - *(str(f) for f in leaked_files), - "*** Before:", - *(str(f) for f in lines1), - "*** After:", - *(str(f) for f in lines2), - "***** %s FD leakage detected" % len(leaked_files), - "*** function %s:%s: %s " % item.location, - "See issue #2366", - ] - item.warn(PytestWarning("\n".join(error))) + try: + return (yield) + finally: + if hasattr(sys, "pypy_version_info"): + gc.collect() + lines2 = self.get_open_files() + + new_fds = {t[0] for t in lines2} - {t[0] for t in lines1} + leaked_files = [t for t in lines2 if t[0] in new_fds] + if leaked_files: + error = [ + "***** %s FD leakage detected" % len(leaked_files), + *(str(f) for f in leaked_files), + "*** Before:", + *(str(f) for f in lines1), + "*** After:", + *(str(f) for f in lines2), + "***** %s FD leakage detected" % len(leaked_files), + "*** function {}:{}: {} ".format(*item.location), + "See issue #2366", + ] + item.warn(PytestWarning("\n".join(error))) # used at least by pytest-xdist plugin @@ -371,14 +377,12 @@ class HookRecorder: values.append(rep) if not values: raise ValueError( - "could not find test report matching %r: " - "no test reports at all!" % (inamepart,) + f"could not find test report matching {inamepart!r}: " + "no test reports at all!" ) if len(values) > 1: raise ValueError( - "found 2 or more testreports matching {!r}: {}".format( - inamepart, values - ) + f"found 2 or more testreports matching {inamepart!r}: {values}" ) return values[0] @@ -626,14 +630,6 @@ class RunResult: ) -class CwdSnapshot: - def __init__(self) -> None: - self.__saved = os.getcwd() - - def restore(self) -> None: - os.chdir(self.__saved) - - class SysModulesSnapshot: def __init__(self, preserve: Optional[Callable[[str], bool]] = None) -> None: self.__preserve = preserve @@ -697,15 +693,14 @@ class Pytester: #: be added to the list. The type of items to add to the list depends on #: the method using them so refer to them for details. self.plugins: List[Union[str, _PluggyPlugin]] = [] - self._cwd_snapshot = CwdSnapshot() self._sys_path_snapshot = SysPathsSnapshot() self._sys_modules_snapshot = self.__take_sys_modules_snapshot() - self.chdir() self._request.addfinalizer(self._finalize) self._method = self._request.config.getoption("--runpytest") self._test_tmproot = tmp_path_factory.mktemp(f"tmp-{name}", numbered=True) self._monkeypatch = mp = monkeypatch + self.chdir() mp.setenv("PYTEST_DEBUG_TEMPROOT", str(self._test_tmproot)) # Ensure no unexpected caching via tox. mp.delenv("TOX_ENV_DIR", raising=False) @@ -736,7 +731,6 @@ class Pytester: """ self._sys_modules_snapshot.restore() self._sys_path_snapshot.restore() - self._cwd_snapshot.restore() def __take_sys_modules_snapshot(self) -> SysModulesSnapshot: # Some zope modules used by twisted-related tests keep internal state @@ -761,7 +755,7 @@ class Pytester: This is done automatically upon instantiation. """ - os.chdir(self.path) + self._monkeypatch.chdir(self.path) def _makefile( self, @@ -813,7 +807,6 @@ class Pytester: The first created file. Examples: - .. code-block:: python pytester.makefile(".txt", "line1", "line2") @@ -830,7 +823,7 @@ class Pytester: return self._makefile(ext, args, kwargs) def makeconftest(self, source: str) -> Path: - """Write a contest.py file. + """Write a conftest.py file. :param source: The contents. :returns: The conftest.py file. @@ -867,7 +860,6 @@ class Pytester: existing files. Examples: - .. code-block:: python def test_something(pytester): @@ -887,7 +879,6 @@ class Pytester: existing files. Examples: - .. code-block:: python def test_something(pytester): @@ -973,7 +964,7 @@ class Pytester: example_path = example_dir.joinpath(name) if example_path.is_dir() and not example_path.joinpath("__init__.py").is_file(): - copytree(example_path, self.path) + shutil.copytree(example_path, self.path, symlinks=True, dirs_exist_ok=True) return self.path elif example_path.is_file(): result = self.path.joinpath(example_path.name) @@ -1050,7 +1041,7 @@ class Pytester: The calling test instance (class containing the test method) must provide a ``.getrunner()`` method which should return a runner which can run the test protocol for a single item, e.g. - :py:func:`_pytest.runner.runtestprotocol`. + ``_pytest.runner.runtestprotocol``. """ # used from runner functional tests item = self.getitem(source) @@ -1277,9 +1268,7 @@ class Pytester: for item in items: if item.name == funcname: return item - assert 0, "{!r} item not found in module:\n{}\nitems: {}".format( - funcname, source, items - ) + assert 0, f"{funcname!r} item not found in module:\n{source}\nitems: {items}" def getitems(self, source: Union[str, "os.PathLike[str]"]) -> List[Item]: """Return all test items collected from the module. @@ -1401,7 +1390,7 @@ class Pytester: :param stdin: Optional standard input. - - If it is :py:attr:`CLOSE_STDIN` (Default), then this method calls + - If it is ``CLOSE_STDIN`` (Default), then this method calls :py:class:`subprocess.Popen` with ``stdin=subprocess.PIPE``, and the standard input is closed immediately after the new command is started. @@ -1438,10 +1427,7 @@ class Pytester: def handle_timeout() -> None: __tracebackhide__ = True - timeout_message = ( - "{seconds} second timeout expired running:" - " {command}".format(seconds=timeout, command=cmdargs) - ) + timeout_message = f"{timeout} second timeout expired running: {cmdargs}" popen.kill() popen.wait() |