aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/pytest/py3/_pytest/main.py
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2025-05-05 12:31:52 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2025-05-05 12:41:33 +0300
commit6ff49ec58061f642c3a2f83c61eba12820787dfc (patch)
treec733ec9bdb15ed280080d31dea8725bfec717acd /contrib/python/pytest/py3/_pytest/main.py
parenteefca8305c6a545cc6b16dca3eb0d91dcef2adcd (diff)
downloadydb-6ff49ec58061f642c3a2f83c61eba12820787dfc.tar.gz
Intermediate changes
commit_hash:8b3bb826b17db8329ed1221f545c0645f12c552d
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/main.py')
-rw-r--r--contrib/python/pytest/py3/_pytest/main.py495
1 files changed, 299 insertions, 196 deletions
diff --git a/contrib/python/pytest/py3/_pytest/main.py b/contrib/python/pytest/py3/_pytest/main.py
index ea89a63fa1b..fd9dddfa318 100644
--- a/contrib/python/pytest/py3/_pytest/main.py
+++ b/contrib/python/pytest/py3/_pytest/main.py
@@ -1,29 +1,33 @@
"""Core implementation of the testing process: init, session, runtest loop."""
+
import argparse
import dataclasses
import fnmatch
import functools
import importlib
import os
-import sys
from pathlib import Path
+import sys
+from typing import AbstractSet
from typing import Callable
from typing import Dict
+from typing import final
from typing import FrozenSet
+from typing import Iterable
from typing import Iterator
from typing import List
+from typing import Literal
from typing import Optional
+from typing import overload
from typing import Sequence
-from typing import Set
from typing import Tuple
-from typing import Type
-from typing import TYPE_CHECKING
from typing import Union
+import warnings
+
+import pluggy
-import _pytest._code
from _pytest import nodes
-from _pytest.compat import final
-from _pytest.compat import overload
+import _pytest._code
from _pytest.config import Config
from _pytest.config import directory_arg
from _pytest.config import ExitCode
@@ -31,21 +35,19 @@ from _pytest.config import hookimpl
from _pytest.config import PytestPluginManager
from _pytest.config import UsageError
from _pytest.config.argparsing import Parser
+from _pytest.config.compat import PathAwareHookProxy
from _pytest.fixtures import FixtureManager
from _pytest.outcomes import exit
from _pytest.pathlib import absolutepath
from _pytest.pathlib import bestrelpath
from _pytest.pathlib import fnmatch_ex
from _pytest.pathlib import safe_exists
-from _pytest.pathlib import visit
+from _pytest.pathlib import scandir
from _pytest.reports import CollectReport
from _pytest.reports import TestReport
from _pytest.runner import collect_one_node
from _pytest.runner import SetupState
-
-
-if TYPE_CHECKING:
- from typing_extensions import Literal
+from _pytest.warning_types import PytestWarning
def pytest_addoption(parser: Parser) -> None:
@@ -377,7 +379,7 @@ def _in_venv(path: Path) -> bool:
def pytest_ignore_collect(collection_path: Path, config: Config) -> Optional[bool]:
ignore_paths = config._getconftest_pathlist(
- "collect_ignore", path=collection_path.parent, rootpath=config.rootpath
+ "collect_ignore", path=collection_path.parent
)
ignore_paths = ignore_paths or []
excludeopt = config.getoption("ignore")
@@ -388,7 +390,7 @@ def pytest_ignore_collect(collection_path: Path, config: Config) -> Optional[boo
return True
ignore_globs = config._getconftest_pathlist(
- "collect_ignore_glob", path=collection_path.parent, rootpath=config.rootpath
+ "collect_ignore_glob", path=collection_path.parent
)
ignore_globs = ignore_globs or []
excludeglobopt = config.getoption("ignore_glob")
@@ -410,6 +412,12 @@ def pytest_ignore_collect(collection_path: Path, config: Config) -> Optional[boo
return None
+def pytest_collect_directory(
+ path: Path, parent: nodes.Collector
+) -> Optional[nodes.Collector]:
+ return Dir.from_parent(parent, path=path)
+
+
def pytest_collection_modifyitems(items: List[nodes.Item], config: Config) -> None:
deselect_prefixes = tuple(config.getoption("deselect") or [])
if not deselect_prefixes:
@@ -429,11 +437,15 @@ def pytest_collection_modifyitems(items: List[nodes.Item], config: Config) -> No
class FSHookProxy:
- def __init__(self, pm: PytestPluginManager, remove_mods) -> None:
+ def __init__(
+ self,
+ pm: PytestPluginManager,
+ remove_mods: AbstractSet[object],
+ ) -> None:
self.pm = pm
self.remove_mods = remove_mods
- def __getattr__(self, name: str):
+ def __getattr__(self, name: str) -> pluggy.HookCaller:
x = self.pm.subset_hook_caller(name, remove_plugins=self.remove_mods)
self.__dict__[name] = x
return x
@@ -462,7 +474,60 @@ class _bestrelpath_cache(Dict[Path, str]):
@final
-class Session(nodes.FSCollector):
+class Dir(nodes.Directory):
+ """Collector of files in a file system directory.
+
+ .. versionadded:: 8.0
+
+ .. note::
+
+ Python directories with an `__init__.py` file are instead collected by
+ :class:`~pytest.Package` by default. Both are :class:`~pytest.Directory`
+ collectors.
+ """
+
+ @classmethod
+ def from_parent( # type: ignore[override]
+ cls,
+ parent: nodes.Collector, # type: ignore[override]
+ *,
+ path: Path,
+ ) -> "Dir":
+ """The public constructor.
+
+ :param parent: The parent collector of this Dir.
+ :param path: The directory's path.
+ """
+ return super().from_parent(parent=parent, path=path) # type: ignore[no-any-return]
+
+ def collect(self) -> Iterable[Union[nodes.Item, nodes.Collector]]:
+ config = self.config
+ col: Optional[nodes.Collector]
+ cols: Sequence[nodes.Collector]
+ ihook = self.ihook
+ for direntry in scandir(self.path):
+ if direntry.is_dir():
+ if direntry.name == "__pycache__":
+ continue
+ path = Path(direntry.path)
+ if not self.session.isinitpath(path, with_parents=True):
+ if ihook.pytest_ignore_collect(collection_path=path, config=config):
+ continue
+ col = ihook.pytest_collect_directory(path=path, parent=self)
+ if col is not None:
+ yield col
+
+ elif direntry.is_file():
+ path = Path(direntry.path)
+ if not self.session.isinitpath(path):
+ if ihook.pytest_ignore_collect(collection_path=path, config=config):
+ continue
+ cols = ihook.pytest_collect_file(file_path=path, parent=self)
+ yield from cols
+
+
+@final
+class Session(nodes.Collector):
"""The root of the collection tree.
``Session`` collects the initial paths given as arguments to pytest.
@@ -478,6 +543,7 @@ class Session(nodes.FSCollector):
def __init__(self, config: Config) -> None:
super().__init__(
+ name="",
path=config.rootpath,
fspath=None,
parent=None,
@@ -487,10 +553,15 @@ class Session(nodes.FSCollector):
)
self.testsfailed = 0
self.testscollected = 0
- self.shouldstop: Union[bool, str] = False
- self.shouldfail: Union[bool, str] = False
+ self._shouldstop: Union[bool, str] = False
+ self._shouldfail: Union[bool, str] = False
self.trace = config.trace.root.get("collection")
self._initialpaths: FrozenSet[Path] = frozenset()
+ self._initialpaths_with_parents: FrozenSet[Path] = frozenset()
+ self._notfound: List[Tuple[str, Sequence[nodes.Collector]]] = []
+ self._initial_parts: List[Tuple[Path, List[str]]] = []
+ self._collection_cache: Dict[nodes.Collector, CollectReport] = {}
+ self.items: List[nodes.Item] = []
self._bestrelpathcache: Dict[Path, str] = _bestrelpath_cache(config.rootpath)
@@ -511,6 +582,42 @@ class Session(nodes.FSCollector):
)
@property
+ def shouldstop(self) -> Union[bool, str]:
+ return self._shouldstop
+
+ @shouldstop.setter
+ def shouldstop(self, value: Union[bool, str]) -> None:
+ # The runner checks shouldfail and assumes that if it is set we are
+ # definitely stopping, so prevent unsetting it.
+ if value is False and self._shouldstop:
+ warnings.warn(
+ PytestWarning(
+ "session.shouldstop cannot be unset after it has been set; ignoring."
+ ),
+ stacklevel=2,
+ )
+ return
+ self._shouldstop = value
+
+ @property
+ def shouldfail(self) -> Union[bool, str]:
+ return self._shouldfail
+
+ @shouldfail.setter
+ def shouldfail(self, value: Union[bool, str]) -> None:
+ # The runner checks shouldfail and assumes that if it is set we are
+ # definitely stopping, so prevent unsetting it.
+ if value is False and self._shouldfail:
+ warnings.warn(
+ PytestWarning(
+ "session.shouldfail cannot be unset after it has been set; ignoring."
+ ),
+ stacklevel=2,
+ )
+ return
+ self._shouldfail = value
+
+ @property
def startpath(self) -> Path:
"""The path from which pytest was invoked.
@@ -541,65 +648,77 @@ class Session(nodes.FSCollector):
pytest_collectreport = pytest_runtest_logreport
- def isinitpath(self, path: Union[str, "os.PathLike[str]"]) -> bool:
+ def isinitpath(
+ self,
+ path: Union[str, "os.PathLike[str]"],
+ *,
+ with_parents: bool = False,
+ ) -> bool:
+ """Is path an initial path?
+
+ An initial path is a path explicitly given to pytest on the command
+ line.
+
+ :param with_parents:
+ If set, also return True if the path is a parent of an initial path.
+
+ .. versionchanged:: 8.0
+ Added the ``with_parents`` parameter.
+ """
# Optimization: Path(Path(...)) is much slower than isinstance.
path_ = path if isinstance(path, Path) else Path(path)
- return path_ in self._initialpaths
+ if with_parents:
+ return path_ in self._initialpaths_with_parents
+ else:
+ return path_ in self._initialpaths
- def gethookproxy(self, fspath: "os.PathLike[str]"):
+ def gethookproxy(self, fspath: "os.PathLike[str]") -> pluggy.HookRelay:
# Optimization: Path(Path(...)) is much slower than isinstance.
path = fspath if isinstance(fspath, Path) else Path(fspath)
pm = self.config.pluginmanager
# Check if we have the common case of running
# hooks with all conftest.py files.
- my_conftestmodules = pm._getconftestmodules(
- path,
- self.config.getoption("importmode"),
- rootpath=self.config.rootpath,
- )
+ my_conftestmodules = pm._getconftestmodules(path)
remove_mods = pm._conftest_plugins.difference(my_conftestmodules)
+ proxy: pluggy.HookRelay
if remove_mods:
- # One or more conftests are not in use at this fspath.
- from .config.compat import PathAwareHookProxy
-
- proxy = PathAwareHookProxy(FSHookProxy(pm, remove_mods))
+ # One or more conftests are not in use at this path.
+ proxy = PathAwareHookProxy(FSHookProxy(pm, remove_mods)) # type: ignore[arg-type,assignment]
else:
# All plugins are active for this fspath.
proxy = self.config.hook
return proxy
- def _recurse(self, direntry: "os.DirEntry[str]") -> bool:
- if direntry.name == "__pycache__":
- return False
- fspath = Path(direntry.path)
- ihook = self.gethookproxy(fspath.parent)
- if ihook.pytest_ignore_collect(collection_path=fspath, config=self.config):
- return False
- return True
-
- def _collectfile(
- self, fspath: Path, handle_dupes: bool = True
+ def _collect_path(
+ self,
+ path: Path,
+ path_cache: Dict[Path, Sequence[nodes.Collector]],
) -> Sequence[nodes.Collector]:
- assert (
- fspath.is_file()
- ), "{!r} is not a file (isdir={!r}, exists={!r}, islink={!r})".format(
- fspath, fspath.is_dir(), fspath.exists(), fspath.is_symlink()
- )
- ihook = self.gethookproxy(fspath)
- if not self.isinitpath(fspath):
- if ihook.pytest_ignore_collect(collection_path=fspath, config=self.config):
- return ()
+ """Create a Collector for the given path.
- if handle_dupes:
- keepduplicates = self.config.getoption("keepduplicates")
- if not keepduplicates:
- duplicate_paths = self.config.pluginmanager._duplicatepaths
- if fspath in duplicate_paths:
- return ()
- else:
- duplicate_paths.add(fspath)
+ `path_cache` makes it so the same Collectors are returned for the same
+ path.
+ """
+ if path in path_cache:
+ return path_cache[path]
+
+ if path.is_dir():
+ ihook = self.gethookproxy(path.parent)
+ col: Optional[nodes.Collector] = ihook.pytest_collect_directory(
+ path=path, parent=self
+ )
+ cols: Sequence[nodes.Collector] = (col,) if col is not None else ()
- return ihook.pytest_collect_file(file_path=fspath, parent=self) # type: ignore[no-any-return]
+ elif path.is_file():
+ ihook = self.gethookproxy(path)
+ cols = ihook.pytest_collect_file(file_path=path, parent=self)
+
+ else:
+ # Broken symlink or invalid/missing file.
+ cols = ()
+
+ path_cache[path] = cols
+ return cols
@overload
def perform_collect(
@@ -635,15 +754,16 @@ class Session(nodes.FSCollector):
self.trace("perform_collect", self, args)
self.trace.root.indent += 1
- self._notfound: List[Tuple[str, Sequence[nodes.Collector]]] = []
- self._initial_parts: List[Tuple[Path, List[str]]] = []
- self.items: List[nodes.Item] = []
-
hook = self.config.hook
+ self._notfound = []
+ self._initial_parts = []
+ self._collection_cache = {}
+ self.items = []
items: Sequence[Union[nodes.Item, nodes.Collector]] = self.items
try:
initialpaths: List[Path] = []
+ initialpaths_with_parents: List[Path] = []
for arg in args:
fspath, parts = resolve_collection_argument(
self.config.invocation_params.dir,
@@ -652,7 +772,11 @@ class Session(nodes.FSCollector):
)
self._initial_parts.append((fspath, parts))
initialpaths.append(fspath)
+ initialpaths_with_parents.append(fspath)
+ initialpaths_with_parents.extend(fspath.parents)
self._initialpaths = frozenset(initialpaths)
+ self._initialpaths_with_parents = frozenset(initialpaths_with_parents)
+
rep = collect_one_node(self)
self.ihook.pytest_collectreport(report=rep)
self.trace.root.indent -= 1
@@ -661,12 +785,13 @@ class Session(nodes.FSCollector):
for arg, collectors in self._notfound:
if collectors:
errors.append(
- f"not found: {arg}\n(no name {arg!r} in any of {collectors!r})"
+ f"not found: {arg}\n(no match in any of {collectors!r})"
)
else:
errors.append(f"found no collectors for {arg}")
raise UsageError(*errors)
+
if not genitems:
items = rep.result
else:
@@ -679,154 +804,126 @@ class Session(nodes.FSCollector):
session=self, config=self.config, items=items
)
finally:
+ self._notfound = []
+ self._initial_parts = []
+ self._collection_cache = {}
hook.pytest_collection_finish(session=self)
- self.testscollected = len(items)
- return items
+ if genitems:
+ self.testscollected = len(items)
- def collect(self) -> Iterator[Union[nodes.Item, nodes.Collector]]:
- from _pytest.python import Package
+ return items
- # Keep track of any collected nodes in here, so we don't duplicate fixtures.
- node_cache1: Dict[Path, Sequence[nodes.Collector]] = {}
- node_cache2: Dict[Tuple[Type[nodes.Collector], Path], nodes.Collector] = {}
+ def _collect_one_node(
+ self,
+ node: nodes.Collector,
+ handle_dupes: bool = True,
+ ) -> Tuple[CollectReport, bool]:
+ if node in self._collection_cache and handle_dupes:
+ rep = self._collection_cache[node]
+ return rep, True
+ else:
+ rep = collect_one_node(node)
+ self._collection_cache[node] = rep
+ return rep, False
- # Keep track of any collected collectors in matchnodes paths, so they
- # are not collected more than once.
- matchnodes_cache: Dict[Tuple[Type[nodes.Collector], str], CollectReport] = {}
+ def collect(self) -> Iterator[Union[nodes.Item, nodes.Collector]]:
+ # This is a cache for the root directories of the initial paths.
+ # We can't use collection_cache for Session because of its special
+ # role as the bootstrapping collector.
+ path_cache: Dict[Path, Sequence[nodes.Collector]] = {}
- # Directories of pkgs with dunder-init files.
- pkg_roots: Dict[Path, Package] = {}
+ pm = self.config.pluginmanager
for argpath, names in self._initial_parts:
self.trace("processing argument", (argpath, names))
self.trace.root.indent += 1
- # Start with a Session root, and delve to argpath item (dir or file)
- # and stack all Packages found on the way.
- # No point in finding packages when collecting doctests.
- if not self.config.getoption("doctestmodules", False):
- pm = self.config.pluginmanager
- for parent in (argpath, *argpath.parents):
- if not pm._is_in_confcutdir(argpath):
- break
-
- if parent.is_dir():
- pkginit = parent / "__init__.py"
- if pkginit.is_file() and pkginit not in node_cache1:
- col = self._collectfile(pkginit, handle_dupes=False)
- if col:
- if isinstance(col[0], Package):
- pkg_roots[parent] = col[0]
- node_cache1[col[0].path] = [col[0]]
-
- # If it's a directory argument, recurse and look for any Subpackages.
- # Let the Package collector deal with subnodes, don't collect here.
+ # resolve_collection_argument() ensures this.
if argpath.is_dir():
assert not names, f"invalid arg {(argpath, names)!r}"
- seen_dirs: Set[Path] = set()
- for direntry in visit(argpath, self._recurse):
- if not direntry.is_file():
- continue
-
- path = Path(direntry.path)
- dirpath = path.parent
-
- if dirpath not in seen_dirs:
- # Collect packages first.
- seen_dirs.add(dirpath)
- pkginit = dirpath / "__init__.py"
- if pkginit.exists():
- for x in self._collectfile(pkginit):
- yield x
- if isinstance(x, Package):
- pkg_roots[dirpath] = x
- if dirpath in pkg_roots:
- # Do not collect packages here.
- continue
+ # Match the argpath from the root, e.g.
+ # /a/b/c.py -> [/, /a, /a/b, /a/b/c.py]
+ paths = [*reversed(argpath.parents), argpath]
+ # Paths outside of the confcutdir should not be considered, unless
+ # it's the argpath itself.
+ while len(paths) > 1 and not pm._is_in_confcutdir(paths[0]):
+ paths = paths[1:]
+
+ # Start going over the parts from the root, collecting each level
+ # and discarding all nodes which don't match the level's part.
+ any_matched_in_initial_part = False
+ notfound_collectors = []
+ work: List[
+ Tuple[Union[nodes.Collector, nodes.Item], List[Union[Path, str]]]
+ ] = [(self, paths + names)]
+ while work:
+ matchnode, matchparts = work.pop()
+
+ # Pop'd all of the parts, this is a match.
+ if not matchparts:
+ yield matchnode
+ any_matched_in_initial_part = True
+ continue
- for x in self._collectfile(path):
- key2 = (type(x), x.path)
- if key2 in node_cache2:
- yield node_cache2[key2]
- else:
- node_cache2[key2] = x
- yield x
- else:
- assert argpath.is_file()
+ # Should have been matched by now, discard.
+ if not isinstance(matchnode, nodes.Collector):
+ continue
- if argpath in node_cache1:
- col = node_cache1[argpath]
+ # Collect this level of matching.
+ # Collecting Session (self) is done directly to avoid endless
+ # recursion to this function.
+ subnodes: Sequence[Union[nodes.Collector, nodes.Item]]
+ if isinstance(matchnode, Session):
+ assert isinstance(matchparts[0], Path)
+ subnodes = matchnode._collect_path(matchparts[0], path_cache)
else:
- collect_root = pkg_roots.get(argpath.parent, self)
- col = collect_root._collectfile(argpath, handle_dupes=False)
- if col:
- node_cache1[argpath] = col
-
- matching = []
- work: List[
- Tuple[Sequence[Union[nodes.Item, nodes.Collector]], Sequence[str]]
- ] = [(col, names)]
- while work:
- self.trace("matchnodes", col, names)
- self.trace.root.indent += 1
-
- matchnodes, matchnames = work.pop()
- for node in matchnodes:
- if not matchnames:
- matching.append(node)
- continue
- if not isinstance(node, nodes.Collector):
- continue
- key = (type(node), node.nodeid)
- if key in matchnodes_cache:
- rep = matchnodes_cache[key]
- else:
- rep = collect_one_node(node)
- matchnodes_cache[key] = rep
- if rep.passed:
- submatchnodes = []
- for r in rep.result:
- # TODO: Remove parametrized workaround once collection structure contains
- # parametrization.
- if (
- r.name == matchnames[0]
- or r.name.split("[")[0] == matchnames[0]
- ):
- submatchnodes.append(r)
- if submatchnodes:
- work.append((submatchnodes, matchnames[1:]))
- else:
- # Report collection failures here to avoid failing to run some test
- # specified in the command line because the module could not be
- # imported (#134).
- node.ihook.pytest_collectreport(report=rep)
-
- self.trace("matchnodes finished -> ", len(matching), "nodes")
- self.trace.root.indent -= 1
-
- if not matching:
- report_arg = "::".join((str(argpath), *names))
- self._notfound.append((report_arg, col))
- continue
+ # For backward compat, files given directly multiple
+ # times on the command line should not be deduplicated.
+ handle_dupes = not (
+ len(matchparts) == 1
+ and isinstance(matchparts[0], Path)
+ and matchparts[0].is_file()
+ )
+ rep, duplicate = self._collect_one_node(matchnode, handle_dupes)
+ if not duplicate and not rep.passed:
+ # Report collection failures here to avoid failing to
+ # run some test specified in the command line because
+ # the module could not be imported (#134).
+ matchnode.ihook.pytest_collectreport(report=rep)
+ if not rep.passed:
+ continue
+ subnodes = rep.result
+
+ # Prune this level.
+ any_matched_in_collector = False
+ for node in reversed(subnodes):
+ # Path part e.g. `/a/b/` in `/a/b/test_file.py::TestIt::test_it`.
+ if isinstance(matchparts[0], Path):
+ is_match = node.path == matchparts[0]
+ if sys.platform == "win32" and not is_match:
+ # In case the file paths do not match, fallback to samefile() to
+ # account for short-paths on Windows (#11895).
+ is_match = os.path.samefile(node.path, matchparts[0])
+ # Name part e.g. `TestIt` in `/a/b/test_file.py::TestIt::test_it`.
+ else:
+ # TODO: Remove parametrized workaround once collection structure contains
+ # parametrization.
+ is_match = (
+ node.name == matchparts[0]
+ or node.name.split("[")[0] == matchparts[0]
+ )
+ if is_match:
+ work.append((node, matchparts[1:]))
+ any_matched_in_collector = True
- # If __init__.py was the only file requested, then the matched
- # node will be the corresponding Package (by default), and the
- # first yielded item will be the __init__ Module itself, so
- # just use that. If this special case isn't taken, then all the
- # files in the package will be yielded.
- if argpath.name == "__init__.py" and isinstance(matching[0], Package):
- try:
- yield next(iter(matching[0].collect()))
- except StopIteration:
- # The package collects nothing with only an __init__.py
- # file in it, which gets ignored by the default
- # "python_files" option.
- pass
- continue
+ if not any_matched_in_collector:
+ notfound_collectors.append(matchnode)
- yield from matching
+ if not any_matched_in_initial_part:
+ report_arg = "::".join((str(argpath), *names))
+ self._notfound.append((report_arg, notfound_collectors))
self.trace.root.indent -= 1
@@ -839,11 +936,17 @@ class Session(nodes.FSCollector):
yield node
else:
assert isinstance(node, nodes.Collector)
- rep = collect_one_node(node)
+ keepduplicates = self.config.getoption("keepduplicates")
+ # For backward compat, dedup only applies to files.
+ handle_dupes = not (keepduplicates and isinstance(node, nodes.File))
+ rep, duplicate = self._collect_one_node(node, handle_dupes)
+ if duplicate and not keepduplicates:
+ return
if rep.passed:
for subnode in rep.result:
yield from self.genitems(subnode)
- node.ihook.pytest_collectreport(report=rep)
+ if not duplicate:
+ node.ihook.pytest_collectreport(report=rep)
def search_pypath(module_name: str) -> str: