aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/pytest/py3/_pytest/pytester.py
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2025-05-05 12:31:52 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2025-05-05 12:41:33 +0300
commit6ff49ec58061f642c3a2f83c61eba12820787dfc (patch)
treec733ec9bdb15ed280080d31dea8725bfec717acd /contrib/python/pytest/py3/_pytest/pytester.py
parenteefca8305c6a545cc6b16dca3eb0d91dcef2adcd (diff)
downloadydb-6ff49ec58061f642c3a2f83c61eba12820787dfc.tar.gz
Intermediate changes
commit_hash:8b3bb826b17db8329ed1221f545c0645f12c552d
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/pytester.py')
-rw-r--r--contrib/python/pytest/py3/_pytest/pytester.py110
1 files changed, 48 insertions, 62 deletions
diff --git a/contrib/python/pytest/py3/_pytest/pytester.py b/contrib/python/pytest/py3/_pytest/pytester.py
index 0771065e065..00bd7b02cbd 100644
--- a/contrib/python/pytest/py3/_pytest/pytester.py
+++ b/contrib/python/pytest/py3/_pytest/pytester.py
@@ -2,28 +2,32 @@
PYTEST_DONT_REWRITE
"""
+
import collections.abc
import contextlib
+from fnmatch import fnmatch
import gc
import importlib
+from io import StringIO
import locale
import os
+from pathlib import Path
import platform
import re
import shutil
import subprocess
import sys
import traceback
-from fnmatch import fnmatch
-from io import StringIO
-from pathlib import Path
from typing import Any
from typing import Callable
from typing import Dict
+from typing import Final
+from typing import final
from typing import Generator
from typing import IO
from typing import Iterable
from typing import List
+from typing import Literal
from typing import Optional
from typing import overload
from typing import Sequence
@@ -40,7 +44,6 @@ from iniconfig import SectionWrapper
from _pytest import timing
from _pytest._code import Source
from _pytest.capture import _get_multicapture
-from _pytest.compat import final
from _pytest.compat import NOTSET
from _pytest.compat import NotSetType
from _pytest.config import _PluggyPlugin
@@ -61,7 +64,6 @@ from _pytest.outcomes import fail
from _pytest.outcomes import importorskip
from _pytest.outcomes import skip
from _pytest.pathlib import bestrelpath
-from _pytest.pathlib import copytree
from _pytest.pathlib import make_numbered_dir
from _pytest.reports import CollectReport
from _pytest.reports import TestReport
@@ -70,9 +72,6 @@ from _pytest.warning_types import PytestWarning
if TYPE_CHECKING:
- from typing_extensions import Final
- from typing_extensions import Literal
-
import pexpect
@@ -124,13 +123,18 @@ def pytest_configure(config: Config) -> None:
class LsofFdLeakChecker:
def get_open_files(self) -> List[Tuple[str, str]]:
+ if sys.version_info >= (3, 11):
+ # New in Python 3.11, ignores utf-8 mode
+ encoding = locale.getencoding()
+ else:
+ encoding = locale.getpreferredencoding(False)
out = subprocess.run(
("lsof", "-Ffn0", "-p", str(os.getpid())),
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
check=True,
text=True,
- encoding=locale.getpreferredencoding(False),
+ encoding=encoding,
).stdout
def isopen(line: str) -> bool:
@@ -163,29 +167,31 @@ class LsofFdLeakChecker:
else:
return True
- @hookimpl(hookwrapper=True, tryfirst=True)
- def pytest_runtest_protocol(self, item: Item) -> Generator[None, None, None]:
+ @hookimpl(wrapper=True, tryfirst=True)
+ def pytest_runtest_protocol(self, item: Item) -> Generator[None, object, object]:
lines1 = self.get_open_files()
- yield
- if hasattr(sys, "pypy_version_info"):
- gc.collect()
- lines2 = self.get_open_files()
-
- new_fds = {t[0] for t in lines2} - {t[0] for t in lines1}
- leaked_files = [t for t in lines2 if t[0] in new_fds]
- if leaked_files:
- error = [
- "***** %s FD leakage detected" % len(leaked_files),
- *(str(f) for f in leaked_files),
- "*** Before:",
- *(str(f) for f in lines1),
- "*** After:",
- *(str(f) for f in lines2),
- "***** %s FD leakage detected" % len(leaked_files),
- "*** function %s:%s: %s " % item.location,
- "See issue #2366",
- ]
- item.warn(PytestWarning("\n".join(error)))
+ try:
+ return (yield)
+ finally:
+ if hasattr(sys, "pypy_version_info"):
+ gc.collect()
+ lines2 = self.get_open_files()
+
+ new_fds = {t[0] for t in lines2} - {t[0] for t in lines1}
+ leaked_files = [t for t in lines2 if t[0] in new_fds]
+ if leaked_files:
+ error = [
+ "***** %s FD leakage detected" % len(leaked_files),
+ *(str(f) for f in leaked_files),
+ "*** Before:",
+ *(str(f) for f in lines1),
+ "*** After:",
+ *(str(f) for f in lines2),
+ "***** %s FD leakage detected" % len(leaked_files),
+ "*** function {}:{}: {} ".format(*item.location),
+ "See issue #2366",
+ ]
+ item.warn(PytestWarning("\n".join(error)))
# used at least by pytest-xdist plugin
@@ -371,14 +377,12 @@ class HookRecorder:
values.append(rep)
if not values:
raise ValueError(
- "could not find test report matching %r: "
- "no test reports at all!" % (inamepart,)
+ f"could not find test report matching {inamepart!r}: "
+ "no test reports at all!"
)
if len(values) > 1:
raise ValueError(
- "found 2 or more testreports matching {!r}: {}".format(
- inamepart, values
- )
+ f"found 2 or more testreports matching {inamepart!r}: {values}"
)
return values[0]
@@ -626,14 +630,6 @@ class RunResult:
)
-class CwdSnapshot:
- def __init__(self) -> None:
- self.__saved = os.getcwd()
-
- def restore(self) -> None:
- os.chdir(self.__saved)
-
-
class SysModulesSnapshot:
def __init__(self, preserve: Optional[Callable[[str], bool]] = None) -> None:
self.__preserve = preserve
@@ -697,15 +693,14 @@ class Pytester:
#: be added to the list. The type of items to add to the list depends on
#: the method using them so refer to them for details.
self.plugins: List[Union[str, _PluggyPlugin]] = []
- self._cwd_snapshot = CwdSnapshot()
self._sys_path_snapshot = SysPathsSnapshot()
self._sys_modules_snapshot = self.__take_sys_modules_snapshot()
- self.chdir()
self._request.addfinalizer(self._finalize)
self._method = self._request.config.getoption("--runpytest")
self._test_tmproot = tmp_path_factory.mktemp(f"tmp-{name}", numbered=True)
self._monkeypatch = mp = monkeypatch
+ self.chdir()
mp.setenv("PYTEST_DEBUG_TEMPROOT", str(self._test_tmproot))
# Ensure no unexpected caching via tox.
mp.delenv("TOX_ENV_DIR", raising=False)
@@ -736,7 +731,6 @@ class Pytester:
"""
self._sys_modules_snapshot.restore()
self._sys_path_snapshot.restore()
- self._cwd_snapshot.restore()
def __take_sys_modules_snapshot(self) -> SysModulesSnapshot:
# Some zope modules used by twisted-related tests keep internal state
@@ -761,7 +755,7 @@ class Pytester:
This is done automatically upon instantiation.
"""
- os.chdir(self.path)
+ self._monkeypatch.chdir(self.path)
def _makefile(
self,
@@ -813,7 +807,6 @@ class Pytester:
The first created file.
Examples:
-
.. code-block:: python
pytester.makefile(".txt", "line1", "line2")
@@ -830,7 +823,7 @@ class Pytester:
return self._makefile(ext, args, kwargs)
def makeconftest(self, source: str) -> Path:
- """Write a contest.py file.
+ """Write a conftest.py file.
:param source: The contents.
:returns: The conftest.py file.
@@ -867,7 +860,6 @@ class Pytester:
existing files.
Examples:
-
.. code-block:: python
def test_something(pytester):
@@ -887,7 +879,6 @@ class Pytester:
existing files.
Examples:
-
.. code-block:: python
def test_something(pytester):
@@ -973,7 +964,7 @@ class Pytester:
example_path = example_dir.joinpath(name)
if example_path.is_dir() and not example_path.joinpath("__init__.py").is_file():
- copytree(example_path, self.path)
+ shutil.copytree(example_path, self.path, symlinks=True, dirs_exist_ok=True)
return self.path
elif example_path.is_file():
result = self.path.joinpath(example_path.name)
@@ -1050,7 +1041,7 @@ class Pytester:
The calling test instance (class containing the test method) must
provide a ``.getrunner()`` method which should return a runner which
can run the test protocol for a single item, e.g.
- :py:func:`_pytest.runner.runtestprotocol`.
+ ``_pytest.runner.runtestprotocol``.
"""
# used from runner functional tests
item = self.getitem(source)
@@ -1277,9 +1268,7 @@ class Pytester:
for item in items:
if item.name == funcname:
return item
- assert 0, "{!r} item not found in module:\n{}\nitems: {}".format(
- funcname, source, items
- )
+ assert 0, f"{funcname!r} item not found in module:\n{source}\nitems: {items}"
def getitems(self, source: Union[str, "os.PathLike[str]"]) -> List[Item]:
"""Return all test items collected from the module.
@@ -1401,7 +1390,7 @@ class Pytester:
:param stdin:
Optional standard input.
- - If it is :py:attr:`CLOSE_STDIN` (Default), then this method calls
+ - If it is ``CLOSE_STDIN`` (Default), then this method calls
:py:class:`subprocess.Popen` with ``stdin=subprocess.PIPE``, and
the standard input is closed immediately after the new command is
started.
@@ -1438,10 +1427,7 @@ class Pytester:
def handle_timeout() -> None:
__tracebackhide__ = True
- timeout_message = (
- "{seconds} second timeout expired running:"
- " {command}".format(seconds=timeout, command=cmdargs)
- )
+ timeout_message = f"{timeout} second timeout expired running: {cmdargs}"
popen.kill()
popen.wait()