diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2025-05-05 12:31:52 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2025-05-05 12:41:33 +0300 |
commit | 6ff49ec58061f642c3a2f83c61eba12820787dfc (patch) | |
tree | c733ec9bdb15ed280080d31dea8725bfec717acd /contrib/python/pytest/py3/_pytest/main.py | |
parent | eefca8305c6a545cc6b16dca3eb0d91dcef2adcd (diff) | |
download | ydb-6ff49ec58061f642c3a2f83c61eba12820787dfc.tar.gz |
Intermediate changes
commit_hash:8b3bb826b17db8329ed1221f545c0645f12c552d
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/main.py')
-rw-r--r-- | contrib/python/pytest/py3/_pytest/main.py | 495 |
1 files changed, 299 insertions, 196 deletions
diff --git a/contrib/python/pytest/py3/_pytest/main.py b/contrib/python/pytest/py3/_pytest/main.py index ea89a63fa1b..fd9dddfa318 100644 --- a/contrib/python/pytest/py3/_pytest/main.py +++ b/contrib/python/pytest/py3/_pytest/main.py @@ -1,29 +1,33 @@ """Core implementation of the testing process: init, session, runtest loop.""" + import argparse import dataclasses import fnmatch import functools import importlib import os -import sys from pathlib import Path +import sys +from typing import AbstractSet from typing import Callable from typing import Dict +from typing import final from typing import FrozenSet +from typing import Iterable from typing import Iterator from typing import List +from typing import Literal from typing import Optional +from typing import overload from typing import Sequence -from typing import Set from typing import Tuple -from typing import Type -from typing import TYPE_CHECKING from typing import Union +import warnings + +import pluggy -import _pytest._code from _pytest import nodes -from _pytest.compat import final -from _pytest.compat import overload +import _pytest._code from _pytest.config import Config from _pytest.config import directory_arg from _pytest.config import ExitCode @@ -31,21 +35,19 @@ from _pytest.config import hookimpl from _pytest.config import PytestPluginManager from _pytest.config import UsageError from _pytest.config.argparsing import Parser +from _pytest.config.compat import PathAwareHookProxy from _pytest.fixtures import FixtureManager from _pytest.outcomes import exit from _pytest.pathlib import absolutepath from _pytest.pathlib import bestrelpath from _pytest.pathlib import fnmatch_ex from _pytest.pathlib import safe_exists -from _pytest.pathlib import visit +from _pytest.pathlib import scandir from _pytest.reports import CollectReport from _pytest.reports import TestReport from _pytest.runner import collect_one_node from _pytest.runner import SetupState - - -if TYPE_CHECKING: - from typing_extensions import Literal +from _pytest.warning_types import PytestWarning def pytest_addoption(parser: Parser) -> None: @@ -377,7 +379,7 @@ def _in_venv(path: Path) -> bool: def pytest_ignore_collect(collection_path: Path, config: Config) -> Optional[bool]: ignore_paths = config._getconftest_pathlist( - "collect_ignore", path=collection_path.parent, rootpath=config.rootpath + "collect_ignore", path=collection_path.parent ) ignore_paths = ignore_paths or [] excludeopt = config.getoption("ignore") @@ -388,7 +390,7 @@ def pytest_ignore_collect(collection_path: Path, config: Config) -> Optional[boo return True ignore_globs = config._getconftest_pathlist( - "collect_ignore_glob", path=collection_path.parent, rootpath=config.rootpath + "collect_ignore_glob", path=collection_path.parent ) ignore_globs = ignore_globs or [] excludeglobopt = config.getoption("ignore_glob") @@ -410,6 +412,12 @@ def pytest_ignore_collect(collection_path: Path, config: Config) -> Optional[boo return None +def pytest_collect_directory( + path: Path, parent: nodes.Collector +) -> Optional[nodes.Collector]: + return Dir.from_parent(parent, path=path) + + def pytest_collection_modifyitems(items: List[nodes.Item], config: Config) -> None: deselect_prefixes = tuple(config.getoption("deselect") or []) if not deselect_prefixes: @@ -429,11 +437,15 @@ def pytest_collection_modifyitems(items: List[nodes.Item], config: Config) -> No class FSHookProxy: - def __init__(self, pm: PytestPluginManager, remove_mods) -> None: + def __init__( + self, + pm: PytestPluginManager, + remove_mods: AbstractSet[object], + ) -> None: self.pm = pm self.remove_mods = remove_mods - def __getattr__(self, name: str): + def __getattr__(self, name: str) -> pluggy.HookCaller: x = self.pm.subset_hook_caller(name, remove_plugins=self.remove_mods) self.__dict__[name] = x return x @@ -462,7 +474,60 @@ class _bestrelpath_cache(Dict[Path, str]): @final -class Session(nodes.FSCollector): +class Dir(nodes.Directory): + """Collector of files in a file system directory. + + .. versionadded:: 8.0 + + .. note:: + + Python directories with an `__init__.py` file are instead collected by + :class:`~pytest.Package` by default. Both are :class:`~pytest.Directory` + collectors. + """ + + @classmethod + def from_parent( # type: ignore[override] + cls, + parent: nodes.Collector, # type: ignore[override] + *, + path: Path, + ) -> "Dir": + """The public constructor. + + :param parent: The parent collector of this Dir. + :param path: The directory's path. + """ + return super().from_parent(parent=parent, path=path) # type: ignore[no-any-return] + + def collect(self) -> Iterable[Union[nodes.Item, nodes.Collector]]: + config = self.config + col: Optional[nodes.Collector] + cols: Sequence[nodes.Collector] + ihook = self.ihook + for direntry in scandir(self.path): + if direntry.is_dir(): + if direntry.name == "__pycache__": + continue + path = Path(direntry.path) + if not self.session.isinitpath(path, with_parents=True): + if ihook.pytest_ignore_collect(collection_path=path, config=config): + continue + col = ihook.pytest_collect_directory(path=path, parent=self) + if col is not None: + yield col + + elif direntry.is_file(): + path = Path(direntry.path) + if not self.session.isinitpath(path): + if ihook.pytest_ignore_collect(collection_path=path, config=config): + continue + cols = ihook.pytest_collect_file(file_path=path, parent=self) + yield from cols + + +@final +class Session(nodes.Collector): """The root of the collection tree. ``Session`` collects the initial paths given as arguments to pytest. @@ -478,6 +543,7 @@ class Session(nodes.FSCollector): def __init__(self, config: Config) -> None: super().__init__( + name="", path=config.rootpath, fspath=None, parent=None, @@ -487,10 +553,15 @@ class Session(nodes.FSCollector): ) self.testsfailed = 0 self.testscollected = 0 - self.shouldstop: Union[bool, str] = False - self.shouldfail: Union[bool, str] = False + self._shouldstop: Union[bool, str] = False + self._shouldfail: Union[bool, str] = False self.trace = config.trace.root.get("collection") self._initialpaths: FrozenSet[Path] = frozenset() + self._initialpaths_with_parents: FrozenSet[Path] = frozenset() + self._notfound: List[Tuple[str, Sequence[nodes.Collector]]] = [] + self._initial_parts: List[Tuple[Path, List[str]]] = [] + self._collection_cache: Dict[nodes.Collector, CollectReport] = {} + self.items: List[nodes.Item] = [] self._bestrelpathcache: Dict[Path, str] = _bestrelpath_cache(config.rootpath) @@ -511,6 +582,42 @@ class Session(nodes.FSCollector): ) @property + def shouldstop(self) -> Union[bool, str]: + return self._shouldstop + + @shouldstop.setter + def shouldstop(self, value: Union[bool, str]) -> None: + # The runner checks shouldfail and assumes that if it is set we are + # definitely stopping, so prevent unsetting it. + if value is False and self._shouldstop: + warnings.warn( + PytestWarning( + "session.shouldstop cannot be unset after it has been set; ignoring." + ), + stacklevel=2, + ) + return + self._shouldstop = value + + @property + def shouldfail(self) -> Union[bool, str]: + return self._shouldfail + + @shouldfail.setter + def shouldfail(self, value: Union[bool, str]) -> None: + # The runner checks shouldfail and assumes that if it is set we are + # definitely stopping, so prevent unsetting it. + if value is False and self._shouldfail: + warnings.warn( + PytestWarning( + "session.shouldfail cannot be unset after it has been set; ignoring." + ), + stacklevel=2, + ) + return + self._shouldfail = value + + @property def startpath(self) -> Path: """The path from which pytest was invoked. @@ -541,65 +648,77 @@ class Session(nodes.FSCollector): pytest_collectreport = pytest_runtest_logreport - def isinitpath(self, path: Union[str, "os.PathLike[str]"]) -> bool: + def isinitpath( + self, + path: Union[str, "os.PathLike[str]"], + *, + with_parents: bool = False, + ) -> bool: + """Is path an initial path? + + An initial path is a path explicitly given to pytest on the command + line. + + :param with_parents: + If set, also return True if the path is a parent of an initial path. + + .. versionchanged:: 8.0 + Added the ``with_parents`` parameter. + """ # Optimization: Path(Path(...)) is much slower than isinstance. path_ = path if isinstance(path, Path) else Path(path) - return path_ in self._initialpaths + if with_parents: + return path_ in self._initialpaths_with_parents + else: + return path_ in self._initialpaths - def gethookproxy(self, fspath: "os.PathLike[str]"): + def gethookproxy(self, fspath: "os.PathLike[str]") -> pluggy.HookRelay: # Optimization: Path(Path(...)) is much slower than isinstance. path = fspath if isinstance(fspath, Path) else Path(fspath) pm = self.config.pluginmanager # Check if we have the common case of running # hooks with all conftest.py files. - my_conftestmodules = pm._getconftestmodules( - path, - self.config.getoption("importmode"), - rootpath=self.config.rootpath, - ) + my_conftestmodules = pm._getconftestmodules(path) remove_mods = pm._conftest_plugins.difference(my_conftestmodules) + proxy: pluggy.HookRelay if remove_mods: - # One or more conftests are not in use at this fspath. - from .config.compat import PathAwareHookProxy - - proxy = PathAwareHookProxy(FSHookProxy(pm, remove_mods)) + # One or more conftests are not in use at this path. + proxy = PathAwareHookProxy(FSHookProxy(pm, remove_mods)) # type: ignore[arg-type,assignment] else: # All plugins are active for this fspath. proxy = self.config.hook return proxy - def _recurse(self, direntry: "os.DirEntry[str]") -> bool: - if direntry.name == "__pycache__": - return False - fspath = Path(direntry.path) - ihook = self.gethookproxy(fspath.parent) - if ihook.pytest_ignore_collect(collection_path=fspath, config=self.config): - return False - return True - - def _collectfile( - self, fspath: Path, handle_dupes: bool = True + def _collect_path( + self, + path: Path, + path_cache: Dict[Path, Sequence[nodes.Collector]], ) -> Sequence[nodes.Collector]: - assert ( - fspath.is_file() - ), "{!r} is not a file (isdir={!r}, exists={!r}, islink={!r})".format( - fspath, fspath.is_dir(), fspath.exists(), fspath.is_symlink() - ) - ihook = self.gethookproxy(fspath) - if not self.isinitpath(fspath): - if ihook.pytest_ignore_collect(collection_path=fspath, config=self.config): - return () + """Create a Collector for the given path. - if handle_dupes: - keepduplicates = self.config.getoption("keepduplicates") - if not keepduplicates: - duplicate_paths = self.config.pluginmanager._duplicatepaths - if fspath in duplicate_paths: - return () - else: - duplicate_paths.add(fspath) + `path_cache` makes it so the same Collectors are returned for the same + path. + """ + if path in path_cache: + return path_cache[path] + + if path.is_dir(): + ihook = self.gethookproxy(path.parent) + col: Optional[nodes.Collector] = ihook.pytest_collect_directory( + path=path, parent=self + ) + cols: Sequence[nodes.Collector] = (col,) if col is not None else () - return ihook.pytest_collect_file(file_path=fspath, parent=self) # type: ignore[no-any-return] + elif path.is_file(): + ihook = self.gethookproxy(path) + cols = ihook.pytest_collect_file(file_path=path, parent=self) + + else: + # Broken symlink or invalid/missing file. + cols = () + + path_cache[path] = cols + return cols @overload def perform_collect( @@ -635,15 +754,16 @@ class Session(nodes.FSCollector): self.trace("perform_collect", self, args) self.trace.root.indent += 1 - self._notfound: List[Tuple[str, Sequence[nodes.Collector]]] = [] - self._initial_parts: List[Tuple[Path, List[str]]] = [] - self.items: List[nodes.Item] = [] - hook = self.config.hook + self._notfound = [] + self._initial_parts = [] + self._collection_cache = {} + self.items = [] items: Sequence[Union[nodes.Item, nodes.Collector]] = self.items try: initialpaths: List[Path] = [] + initialpaths_with_parents: List[Path] = [] for arg in args: fspath, parts = resolve_collection_argument( self.config.invocation_params.dir, @@ -652,7 +772,11 @@ class Session(nodes.FSCollector): ) self._initial_parts.append((fspath, parts)) initialpaths.append(fspath) + initialpaths_with_parents.append(fspath) + initialpaths_with_parents.extend(fspath.parents) self._initialpaths = frozenset(initialpaths) + self._initialpaths_with_parents = frozenset(initialpaths_with_parents) + rep = collect_one_node(self) self.ihook.pytest_collectreport(report=rep) self.trace.root.indent -= 1 @@ -661,12 +785,13 @@ class Session(nodes.FSCollector): for arg, collectors in self._notfound: if collectors: errors.append( - f"not found: {arg}\n(no name {arg!r} in any of {collectors!r})" + f"not found: {arg}\n(no match in any of {collectors!r})" ) else: errors.append(f"found no collectors for {arg}") raise UsageError(*errors) + if not genitems: items = rep.result else: @@ -679,154 +804,126 @@ class Session(nodes.FSCollector): session=self, config=self.config, items=items ) finally: + self._notfound = [] + self._initial_parts = [] + self._collection_cache = {} hook.pytest_collection_finish(session=self) - self.testscollected = len(items) - return items + if genitems: + self.testscollected = len(items) - def collect(self) -> Iterator[Union[nodes.Item, nodes.Collector]]: - from _pytest.python import Package + return items - # Keep track of any collected nodes in here, so we don't duplicate fixtures. - node_cache1: Dict[Path, Sequence[nodes.Collector]] = {} - node_cache2: Dict[Tuple[Type[nodes.Collector], Path], nodes.Collector] = {} + def _collect_one_node( + self, + node: nodes.Collector, + handle_dupes: bool = True, + ) -> Tuple[CollectReport, bool]: + if node in self._collection_cache and handle_dupes: + rep = self._collection_cache[node] + return rep, True + else: + rep = collect_one_node(node) + self._collection_cache[node] = rep + return rep, False - # Keep track of any collected collectors in matchnodes paths, so they - # are not collected more than once. - matchnodes_cache: Dict[Tuple[Type[nodes.Collector], str], CollectReport] = {} + def collect(self) -> Iterator[Union[nodes.Item, nodes.Collector]]: + # This is a cache for the root directories of the initial paths. + # We can't use collection_cache for Session because of its special + # role as the bootstrapping collector. + path_cache: Dict[Path, Sequence[nodes.Collector]] = {} - # Directories of pkgs with dunder-init files. - pkg_roots: Dict[Path, Package] = {} + pm = self.config.pluginmanager for argpath, names in self._initial_parts: self.trace("processing argument", (argpath, names)) self.trace.root.indent += 1 - # Start with a Session root, and delve to argpath item (dir or file) - # and stack all Packages found on the way. - # No point in finding packages when collecting doctests. - if not self.config.getoption("doctestmodules", False): - pm = self.config.pluginmanager - for parent in (argpath, *argpath.parents): - if not pm._is_in_confcutdir(argpath): - break - - if parent.is_dir(): - pkginit = parent / "__init__.py" - if pkginit.is_file() and pkginit not in node_cache1: - col = self._collectfile(pkginit, handle_dupes=False) - if col: - if isinstance(col[0], Package): - pkg_roots[parent] = col[0] - node_cache1[col[0].path] = [col[0]] - - # If it's a directory argument, recurse and look for any Subpackages. - # Let the Package collector deal with subnodes, don't collect here. + # resolve_collection_argument() ensures this. if argpath.is_dir(): assert not names, f"invalid arg {(argpath, names)!r}" - seen_dirs: Set[Path] = set() - for direntry in visit(argpath, self._recurse): - if not direntry.is_file(): - continue - - path = Path(direntry.path) - dirpath = path.parent - - if dirpath not in seen_dirs: - # Collect packages first. - seen_dirs.add(dirpath) - pkginit = dirpath / "__init__.py" - if pkginit.exists(): - for x in self._collectfile(pkginit): - yield x - if isinstance(x, Package): - pkg_roots[dirpath] = x - if dirpath in pkg_roots: - # Do not collect packages here. - continue + # Match the argpath from the root, e.g. + # /a/b/c.py -> [/, /a, /a/b, /a/b/c.py] + paths = [*reversed(argpath.parents), argpath] + # Paths outside of the confcutdir should not be considered, unless + # it's the argpath itself. + while len(paths) > 1 and not pm._is_in_confcutdir(paths[0]): + paths = paths[1:] + + # Start going over the parts from the root, collecting each level + # and discarding all nodes which don't match the level's part. + any_matched_in_initial_part = False + notfound_collectors = [] + work: List[ + Tuple[Union[nodes.Collector, nodes.Item], List[Union[Path, str]]] + ] = [(self, paths + names)] + while work: + matchnode, matchparts = work.pop() + + # Pop'd all of the parts, this is a match. + if not matchparts: + yield matchnode + any_matched_in_initial_part = True + continue - for x in self._collectfile(path): - key2 = (type(x), x.path) - if key2 in node_cache2: - yield node_cache2[key2] - else: - node_cache2[key2] = x - yield x - else: - assert argpath.is_file() + # Should have been matched by now, discard. + if not isinstance(matchnode, nodes.Collector): + continue - if argpath in node_cache1: - col = node_cache1[argpath] + # Collect this level of matching. + # Collecting Session (self) is done directly to avoid endless + # recursion to this function. + subnodes: Sequence[Union[nodes.Collector, nodes.Item]] + if isinstance(matchnode, Session): + assert isinstance(matchparts[0], Path) + subnodes = matchnode._collect_path(matchparts[0], path_cache) else: - collect_root = pkg_roots.get(argpath.parent, self) - col = collect_root._collectfile(argpath, handle_dupes=False) - if col: - node_cache1[argpath] = col - - matching = [] - work: List[ - Tuple[Sequence[Union[nodes.Item, nodes.Collector]], Sequence[str]] - ] = [(col, names)] - while work: - self.trace("matchnodes", col, names) - self.trace.root.indent += 1 - - matchnodes, matchnames = work.pop() - for node in matchnodes: - if not matchnames: - matching.append(node) - continue - if not isinstance(node, nodes.Collector): - continue - key = (type(node), node.nodeid) - if key in matchnodes_cache: - rep = matchnodes_cache[key] - else: - rep = collect_one_node(node) - matchnodes_cache[key] = rep - if rep.passed: - submatchnodes = [] - for r in rep.result: - # TODO: Remove parametrized workaround once collection structure contains - # parametrization. - if ( - r.name == matchnames[0] - or r.name.split("[")[0] == matchnames[0] - ): - submatchnodes.append(r) - if submatchnodes: - work.append((submatchnodes, matchnames[1:])) - else: - # Report collection failures here to avoid failing to run some test - # specified in the command line because the module could not be - # imported (#134). - node.ihook.pytest_collectreport(report=rep) - - self.trace("matchnodes finished -> ", len(matching), "nodes") - self.trace.root.indent -= 1 - - if not matching: - report_arg = "::".join((str(argpath), *names)) - self._notfound.append((report_arg, col)) - continue + # For backward compat, files given directly multiple + # times on the command line should not be deduplicated. + handle_dupes = not ( + len(matchparts) == 1 + and isinstance(matchparts[0], Path) + and matchparts[0].is_file() + ) + rep, duplicate = self._collect_one_node(matchnode, handle_dupes) + if not duplicate and not rep.passed: + # Report collection failures here to avoid failing to + # run some test specified in the command line because + # the module could not be imported (#134). + matchnode.ihook.pytest_collectreport(report=rep) + if not rep.passed: + continue + subnodes = rep.result + + # Prune this level. + any_matched_in_collector = False + for node in reversed(subnodes): + # Path part e.g. `/a/b/` in `/a/b/test_file.py::TestIt::test_it`. + if isinstance(matchparts[0], Path): + is_match = node.path == matchparts[0] + if sys.platform == "win32" and not is_match: + # In case the file paths do not match, fallback to samefile() to + # account for short-paths on Windows (#11895). + is_match = os.path.samefile(node.path, matchparts[0]) + # Name part e.g. `TestIt` in `/a/b/test_file.py::TestIt::test_it`. + else: + # TODO: Remove parametrized workaround once collection structure contains + # parametrization. + is_match = ( + node.name == matchparts[0] + or node.name.split("[")[0] == matchparts[0] + ) + if is_match: + work.append((node, matchparts[1:])) + any_matched_in_collector = True - # If __init__.py was the only file requested, then the matched - # node will be the corresponding Package (by default), and the - # first yielded item will be the __init__ Module itself, so - # just use that. If this special case isn't taken, then all the - # files in the package will be yielded. - if argpath.name == "__init__.py" and isinstance(matching[0], Package): - try: - yield next(iter(matching[0].collect())) - except StopIteration: - # The package collects nothing with only an __init__.py - # file in it, which gets ignored by the default - # "python_files" option. - pass - continue + if not any_matched_in_collector: + notfound_collectors.append(matchnode) - yield from matching + if not any_matched_in_initial_part: + report_arg = "::".join((str(argpath), *names)) + self._notfound.append((report_arg, notfound_collectors)) self.trace.root.indent -= 1 @@ -839,11 +936,17 @@ class Session(nodes.FSCollector): yield node else: assert isinstance(node, nodes.Collector) - rep = collect_one_node(node) + keepduplicates = self.config.getoption("keepduplicates") + # For backward compat, dedup only applies to files. + handle_dupes = not (keepduplicates and isinstance(node, nodes.File)) + rep, duplicate = self._collect_one_node(node, handle_dupes) + if duplicate and not keepduplicates: + return if rep.passed: for subnode in rep.result: yield from self.genitems(subnode) - node.ihook.pytest_collectreport(report=rep) + if not duplicate: + node.ihook.pytest_collectreport(report=rep) def search_pypath(module_name: str) -> str: |