diff options
author | shadchin <shadchin@yandex-team.ru> | 2022-02-10 16:44:39 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:44:39 +0300 |
commit | e9656aae26e0358d5378e5b63dcac5c8dbe0e4d0 (patch) | |
tree | 64175d5cadab313b3e7039ebaa06c5bc3295e274 /contrib/python/pytest/py3/_pytest/main.py | |
parent | 2598ef1d0aee359b4b6d5fdd1758916d5907d04f (diff) | |
download | ydb-e9656aae26e0358d5378e5b63dcac5c8dbe0e4d0.tar.gz |
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/main.py')
-rw-r--r-- | contrib/python/pytest/py3/_pytest/main.py | 1148 |
1 files changed, 574 insertions, 574 deletions
diff --git a/contrib/python/pytest/py3/_pytest/main.py b/contrib/python/pytest/py3/_pytest/main.py index 1d76ea62bf..41a33d4494 100644 --- a/contrib/python/pytest/py3/_pytest/main.py +++ b/contrib/python/pytest/py3/_pytest/main.py @@ -1,69 +1,69 @@ -"""Core implementation of the testing process: init, session, runtest loop.""" -import argparse -import fnmatch +"""Core implementation of the testing process: init, session, runtest loop.""" +import argparse +import fnmatch import functools -import importlib +import importlib import os import sys -from pathlib import Path -from typing import Callable -from typing import Dict -from typing import FrozenSet -from typing import Iterator -from typing import List -from typing import Optional -from typing import overload -from typing import Sequence -from typing import Set -from typing import Tuple -from typing import Type -from typing import TYPE_CHECKING -from typing import Union +from pathlib import Path +from typing import Callable +from typing import Dict +from typing import FrozenSet +from typing import Iterator +from typing import List +from typing import Optional +from typing import overload +from typing import Sequence +from typing import Set +from typing import Tuple +from typing import Type +from typing import TYPE_CHECKING +from typing import Union import attr import py import _pytest._code from _pytest import nodes -from _pytest.compat import final -from _pytest.config import Config +from _pytest.compat import final +from _pytest.config import Config from _pytest.config import directory_arg -from _pytest.config import ExitCode +from _pytest.config import ExitCode from _pytest.config import hookimpl -from _pytest.config import PytestPluginManager +from _pytest.config import PytestPluginManager from _pytest.config import UsageError -from _pytest.config.argparsing import Parser -from _pytest.fixtures import FixtureManager +from _pytest.config.argparsing import Parser +from _pytest.fixtures import FixtureManager from _pytest.outcomes import exit -from _pytest.pathlib import absolutepath -from _pytest.pathlib import bestrelpath -from _pytest.pathlib import visit -from _pytest.reports import CollectReport -from _pytest.reports import TestReport +from _pytest.pathlib import absolutepath +from _pytest.pathlib import bestrelpath +from _pytest.pathlib import visit +from _pytest.reports import CollectReport +from _pytest.reports import TestReport from _pytest.runner import collect_one_node -from _pytest.runner import SetupState +from _pytest.runner import SetupState -if TYPE_CHECKING: - from typing_extensions import Literal +if TYPE_CHECKING: + from typing_extensions import Literal - -def pytest_addoption(parser: Parser) -> None: + +def pytest_addoption(parser: Parser) -> None: parser.addini( "norecursedirs", "directory patterns to avoid for recursion", type="args", - default=[ - "*.egg", - ".*", - "_darcs", - "build", - "CVS", - "dist", - "node_modules", - "venv", - "{arch}", - ], + default=[ + "*.egg", + ".*", + "_darcs", + "build", + "CVS", + "dist", + "node_modules", + "venv", + "{arch}", + ], ) parser.addini( "testpaths", @@ -80,21 +80,21 @@ def pytest_addoption(parser: Parser) -> None: dest="maxfail", const=1, help="exit instantly on first error or failed test.", - ) - group = parser.getgroup("pytest-warnings") - group.addoption( - "-W", - "--pythonwarnings", - action="append", - help="set which warnings to report, see -W option of python itself.", - ) - parser.addini( - "filterwarnings", - type="linelist", - help="Each line specifies a pattern for " - "warnings.filterwarnings. " - "Processed after -W/--pythonwarnings.", - ) + ) + group = parser.getgroup("pytest-warnings") + group.addoption( + "-W", + "--pythonwarnings", + action="append", + help="set which warnings to report, see -W option of python itself.", + ) + parser.addini( + "filterwarnings", + type="linelist", + help="Each line specifies a pattern for " + "warnings.filterwarnings. " + "Processed after -W/--pythonwarnings.", + ) group._addoption( "--maxfail", metavar="num", @@ -105,19 +105,19 @@ def pytest_addoption(parser: Parser) -> None: help="exit after first num failures or errors.", ) group._addoption( - "--strict-config", - action="store_true", - help="any warnings encountered while parsing the `pytest` section of the configuration file raise errors.", - ) - group._addoption( - "--strict-markers", + "--strict-config", action="store_true", - help="markers not registered in the `markers` section of the configuration file raise errors.", + help="any warnings encountered while parsing the `pytest` section of the configuration file raise errors.", + ) + group._addoption( + "--strict-markers", + action="store_true", + help="markers not registered in the `markers` section of the configuration file raise errors.", + ) + group._addoption( + "--strict", action="store_true", help="(deprecated) alias to --strict-markers.", ) group._addoption( - "--strict", action="store_true", help="(deprecated) alias to --strict-markers.", - ) - group._addoption( "-c", metavar="file", type=str, @@ -145,10 +145,10 @@ def pytest_addoption(parser: Parser) -> None: group.addoption( "--collectonly", "--collect-only", - "--co", + "--co", action="store_true", help="only collect tests, don't execute them.", - ) + ) group.addoption( "--pyargs", action="store_true", @@ -161,16 +161,16 @@ def pytest_addoption(parser: Parser) -> None: help="ignore path during collection (multi-allowed).", ) group.addoption( - "--ignore-glob", - action="append", - metavar="path", - help="ignore path pattern during collection (multi-allowed).", - ) - group.addoption( + "--ignore-glob", + action="append", + metavar="path", + help="ignore path pattern during collection (multi-allowed).", + ) + group.addoption( "--deselect", action="append", metavar="nodeid_prefix", - help="deselect item (via node id prefix) during collection (multi-allowed).", + help="deselect item (via node id prefix) during collection (multi-allowed).", ) group.addoption( "--confcutdir", @@ -202,21 +202,21 @@ def pytest_addoption(parser: Parser) -> None: default=False, help="Don't ignore tests in a local virtualenv directory", ) - group.addoption( - "--import-mode", - default="prepend", - choices=["prepend", "append", "importlib"], - dest="importmode", - help="prepend/append to sys.path when importing test modules and conftest files, " - "default is to prepend.", - ) + group.addoption( + "--import-mode", + default="prepend", + choices=["prepend", "append", "importlib"], + dest="importmode", + help="prepend/append to sys.path when importing test modules and conftest files, " + "default is to prepend.", + ) group = parser.getgroup("debugconfig", "test session debugging and configuration") group.addoption( "--basetemp", dest="basetemp", default=None, - type=validate_basetemp, + type=validate_basetemp, metavar="dir", help=( "base temporary directory for this test run." @@ -225,40 +225,40 @@ def pytest_addoption(parser: Parser) -> None: ) -def validate_basetemp(path: str) -> str: - # GH 7119 - msg = "basetemp must not be empty, the current working directory or any parent directory of it" - - # empty path - if not path: - raise argparse.ArgumentTypeError(msg) - - def is_ancestor(base: Path, query: Path) -> bool: - """Return whether query is an ancestor of base.""" - if base == query: - return True - for parent in base.parents: - if parent == query: - return True - return False - - # check if path is an ancestor of cwd - if is_ancestor(Path.cwd(), Path(path).absolute()): - raise argparse.ArgumentTypeError(msg) - - # check symlinks for ancestors - if is_ancestor(Path.cwd().resolve(), Path(path).resolve()): - raise argparse.ArgumentTypeError(msg) - - return path - - -def wrap_session( - config: Config, doit: Callable[[Config, "Session"], Optional[Union[int, ExitCode]]] -) -> Union[int, ExitCode]: - """Skeleton command line program.""" - session = Session.from_config(config) - session.exitstatus = ExitCode.OK +def validate_basetemp(path: str) -> str: + # GH 7119 + msg = "basetemp must not be empty, the current working directory or any parent directory of it" + + # empty path + if not path: + raise argparse.ArgumentTypeError(msg) + + def is_ancestor(base: Path, query: Path) -> bool: + """Return whether query is an ancestor of base.""" + if base == query: + return True + for parent in base.parents: + if parent == query: + return True + return False + + # check if path is an ancestor of cwd + if is_ancestor(Path.cwd(), Path(path).absolute()): + raise argparse.ArgumentTypeError(msg) + + # check symlinks for ancestors + if is_ancestor(Path.cwd().resolve(), Path(path).resolve()): + raise argparse.ArgumentTypeError(msg) + + return path + + +def wrap_session( + config: Config, doit: Callable[[Config, "Session"], Optional[Union[int, ExitCode]]] +) -> Union[int, ExitCode]: + """Skeleton command line program.""" + session = Session.from_config(config) + session.exitstatus = ExitCode.OK initstate = 0 try: try: @@ -268,77 +268,77 @@ def wrap_session( initstate = 2 session.exitstatus = doit(config, session) or 0 except UsageError: - session.exitstatus = ExitCode.USAGE_ERROR + session.exitstatus = ExitCode.USAGE_ERROR raise except Failed: - session.exitstatus = ExitCode.TESTS_FAILED - except (KeyboardInterrupt, exit.Exception): - excinfo = _pytest._code.ExceptionInfo.from_current() - exitstatus: Union[int, ExitCode] = ExitCode.INTERRUPTED - if isinstance(excinfo.value, exit.Exception): + session.exitstatus = ExitCode.TESTS_FAILED + except (KeyboardInterrupt, exit.Exception): + excinfo = _pytest._code.ExceptionInfo.from_current() + exitstatus: Union[int, ExitCode] = ExitCode.INTERRUPTED + if isinstance(excinfo.value, exit.Exception): if excinfo.value.returncode is not None: exitstatus = excinfo.value.returncode - if initstate < 2: - sys.stderr.write(f"{excinfo.typename}: {excinfo.value.msg}\n") + if initstate < 2: + sys.stderr.write(f"{excinfo.typename}: {excinfo.value.msg}\n") config.hook.pytest_keyboard_interrupt(excinfo=excinfo) session.exitstatus = exitstatus - except BaseException: - session.exitstatus = ExitCode.INTERNAL_ERROR - excinfo = _pytest._code.ExceptionInfo.from_current() - try: - config.notify_exception(excinfo, config.option) - except exit.Exception as exc: - if exc.returncode is not None: - session.exitstatus = exc.returncode - sys.stderr.write("{}: {}\n".format(type(exc).__name__, exc)) - else: - if isinstance(excinfo.value, SystemExit): - sys.stderr.write("mainloop: caught unexpected SystemExit!\n") + except BaseException: + session.exitstatus = ExitCode.INTERNAL_ERROR + excinfo = _pytest._code.ExceptionInfo.from_current() + try: + config.notify_exception(excinfo, config.option) + except exit.Exception as exc: + if exc.returncode is not None: + session.exitstatus = exc.returncode + sys.stderr.write("{}: {}\n".format(type(exc).__name__, exc)) + else: + if isinstance(excinfo.value, SystemExit): + sys.stderr.write("mainloop: caught unexpected SystemExit!\n") finally: - # Explicitly break reference cycle. - excinfo = None # type: ignore + # Explicitly break reference cycle. + excinfo = None # type: ignore session.startdir.chdir() if initstate >= 2: - try: - config.hook.pytest_sessionfinish( - session=session, exitstatus=session.exitstatus - ) - except exit.Exception as exc: - if exc.returncode is not None: - session.exitstatus = exc.returncode - sys.stderr.write("{}: {}\n".format(type(exc).__name__, exc)) + try: + config.hook.pytest_sessionfinish( + session=session, exitstatus=session.exitstatus + ) + except exit.Exception as exc: + if exc.returncode is not None: + session.exitstatus = exc.returncode + sys.stderr.write("{}: {}\n".format(type(exc).__name__, exc)) config._ensure_unconfigure() return session.exitstatus -def pytest_cmdline_main(config: Config) -> Union[int, ExitCode]: +def pytest_cmdline_main(config: Config) -> Union[int, ExitCode]: return wrap_session(config, _main) -def _main(config: Config, session: "Session") -> Optional[Union[int, ExitCode]]: - """Default command line protocol for initialization, session, - running tests and reporting.""" +def _main(config: Config, session: "Session") -> Optional[Union[int, ExitCode]]: + """Default command line protocol for initialization, session, + running tests and reporting.""" config.hook.pytest_collection(session=session) config.hook.pytest_runtestloop(session=session) if session.testsfailed: - return ExitCode.TESTS_FAILED + return ExitCode.TESTS_FAILED elif session.testscollected == 0: - return ExitCode.NO_TESTS_COLLECTED - return None + return ExitCode.NO_TESTS_COLLECTED + return None -def pytest_collection(session: "Session") -> None: - session.perform_collect() +def pytest_collection(session: "Session") -> None: + session.perform_collect() -def pytest_runtestloop(session: "Session") -> bool: +def pytest_runtestloop(session: "Session") -> bool: if session.testsfailed and not session.config.option.continue_on_collection_errors: - raise session.Interrupted( - "%d error%s during collection" - % (session.testsfailed, "s" if session.testsfailed != 1 else "") - ) + raise session.Interrupted( + "%d error%s during collection" + % (session.testsfailed, "s" if session.testsfailed != 1 else "") + ) if session.config.option.collectonly: return True @@ -353,9 +353,9 @@ def pytest_runtestloop(session: "Session") -> bool: return True -def _in_venv(path: py.path.local) -> bool: - """Attempt to detect if ``path`` is the root of a Virtual Environment by - checking for the existence of the appropriate activate script.""" +def _in_venv(path: py.path.local) -> bool: + """Attempt to detect if ``path`` is the root of a Virtual Environment by + checking for the existence of the appropriate activate script.""" bindir = path.join("Scripts" if sys.platform.startswith("win") else "bin") if not bindir.isdir(): return False @@ -370,7 +370,7 @@ def _in_venv(path: py.path.local) -> bool: return any([fname.basename in activates for fname in bindir.listdir()]) -def pytest_ignore_collect(path: py.path.local, config: Config) -> Optional[bool]: +def pytest_ignore_collect(path: py.path.local, config: Config) -> Optional[bool]: ignore_paths = config._getconftest_pathlist("collect_ignore", path=path.dirpath()) ignore_paths = ignore_paths or [] excludeopt = config.getoption("ignore") @@ -380,24 +380,24 @@ def pytest_ignore_collect(path: py.path.local, config: Config) -> Optional[bool] if py.path.local(path) in ignore_paths: return True - ignore_globs = config._getconftest_pathlist( - "collect_ignore_glob", path=path.dirpath() - ) - ignore_globs = ignore_globs or [] - excludeglobopt = config.getoption("ignore_glob") - if excludeglobopt: - ignore_globs.extend([py.path.local(x) for x in excludeglobopt]) - - if any(fnmatch.fnmatch(str(path), str(glob)) for glob in ignore_globs): - return True - + ignore_globs = config._getconftest_pathlist( + "collect_ignore_glob", path=path.dirpath() + ) + ignore_globs = ignore_globs or [] + excludeglobopt = config.getoption("ignore_glob") + if excludeglobopt: + ignore_globs.extend([py.path.local(x) for x in excludeglobopt]) + + if any(fnmatch.fnmatch(str(path), str(glob)) for glob in ignore_globs): + return True + allow_in_venv = config.getoption("collect_in_virtualenv") if not allow_in_venv and _in_venv(path): return True - return None + return None -def pytest_collection_modifyitems(items: List[nodes.Item], config: Config) -> None: +def pytest_collection_modifyitems(items: List[nodes.Item], config: Config) -> None: deselect_prefixes = tuple(config.getoption("deselect") or []) if not deselect_prefixes: return @@ -415,92 +415,92 @@ def pytest_collection_modifyitems(items: List[nodes.Item], config: Config) -> No items[:] = remaining -class FSHookProxy: - def __init__(self, pm: PytestPluginManager, remove_mods) -> None: - self.pm = pm - self.remove_mods = remove_mods +class FSHookProxy: + def __init__(self, pm: PytestPluginManager, remove_mods) -> None: + self.pm = pm + self.remove_mods = remove_mods + + def __getattr__(self, name: str): + x = self.pm.subset_hook_caller(name, remove_plugins=self.remove_mods) + self.__dict__[name] = x + return x - def __getattr__(self, name: str): - x = self.pm.subset_hook_caller(name, remove_plugins=self.remove_mods) - self.__dict__[name] = x - return x - class Interrupted(KeyboardInterrupt): - """Signals that the test run was interrupted.""" + """Signals that the test run was interrupted.""" - __module__ = "builtins" # For py3. + __module__ = "builtins" # For py3. class Failed(Exception): - """Signals a stop as failed test run.""" + """Signals a stop as failed test run.""" @attr.s -class _bestrelpath_cache(Dict[Path, str]): - path = attr.ib(type=Path) +class _bestrelpath_cache(Dict[Path, str]): + path = attr.ib(type=Path) - def __missing__(self, path: Path) -> str: - r = bestrelpath(self.path, path) + def __missing__(self, path: Path) -> str: + r = bestrelpath(self.path, path) self[path] = r return r -@final +@final class Session(nodes.FSCollector): Interrupted = Interrupted Failed = Failed - # Set on the session by runner.pytest_sessionstart. - _setupstate: SetupState - # Set on the session by fixtures.pytest_sessionstart. - _fixturemanager: FixtureManager - exitstatus: Union[int, ExitCode] - - def __init__(self, config: Config) -> None: - super().__init__( - config.rootdir, parent=None, config=config, session=self, nodeid="" + # Set on the session by runner.pytest_sessionstart. + _setupstate: SetupState + # Set on the session by fixtures.pytest_sessionstart. + _fixturemanager: FixtureManager + exitstatus: Union[int, ExitCode] + + def __init__(self, config: Config) -> None: + super().__init__( + config.rootdir, parent=None, config=config, session=self, nodeid="" ) self.testsfailed = 0 self.testscollected = 0 - self.shouldstop: Union[bool, str] = False - self.shouldfail: Union[bool, str] = False + self.shouldstop: Union[bool, str] = False + self.shouldfail: Union[bool, str] = False self.trace = config.trace.root.get("collection") - self.startdir = config.invocation_dir - self._initialpaths: FrozenSet[py.path.local] = frozenset() - - self._bestrelpathcache: Dict[Path, str] = _bestrelpath_cache(config.rootpath) - + self.startdir = config.invocation_dir + self._initialpaths: FrozenSet[py.path.local] = frozenset() + + self._bestrelpathcache: Dict[Path, str] = _bestrelpath_cache(config.rootpath) + self.config.pluginmanager.register(self, name="session") - @classmethod - def from_config(cls, config: Config) -> "Session": - session: Session = cls._create(config) - return session - - def __repr__(self) -> str: - return "<%s %s exitstatus=%r testsfailed=%d testscollected=%d>" % ( - self.__class__.__name__, - self.name, - getattr(self, "exitstatus", "<UNSET>"), - self.testsfailed, - self.testscollected, - ) - - def _node_location_to_relpath(self, node_path: Path) -> str: - # bestrelpath is a quite slow function. + @classmethod + def from_config(cls, config: Config) -> "Session": + session: Session = cls._create(config) + return session + + def __repr__(self) -> str: + return "<%s %s exitstatus=%r testsfailed=%d testscollected=%d>" % ( + self.__class__.__name__, + self.name, + getattr(self, "exitstatus", "<UNSET>"), + self.testsfailed, + self.testscollected, + ) + + def _node_location_to_relpath(self, node_path: Path) -> str: + # bestrelpath is a quite slow function. return self._bestrelpathcache[node_path] @hookimpl(tryfirst=True) - def pytest_collectstart(self) -> None: + def pytest_collectstart(self) -> None: if self.shouldfail: raise self.Failed(self.shouldfail) if self.shouldstop: raise self.Interrupted(self.shouldstop) @hookimpl(tryfirst=True) - def pytest_runtest_logreport( - self, report: Union[TestReport, CollectReport] - ) -> None: + def pytest_runtest_logreport( + self, report: Union[TestReport, CollectReport] + ) -> None: if report.failed and not hasattr(report, "wasxfail"): self.testsfailed += 1 maxfail = self.config.getvalue("maxfail") @@ -509,296 +509,296 @@ class Session(nodes.FSCollector): pytest_collectreport = pytest_runtest_logreport - def isinitpath(self, path: py.path.local) -> bool: + def isinitpath(self, path: py.path.local) -> bool: return path in self._initialpaths - def gethookproxy(self, fspath: py.path.local): - # Check if we have the common case of running - # hooks with all conftest.py files. - pm = self.config.pluginmanager - my_conftestmodules = pm._getconftestmodules( - fspath, self.config.getoption("importmode") - ) - remove_mods = pm._conftest_plugins.difference(my_conftestmodules) - if remove_mods: - # One or more conftests are not in use at this fspath. - proxy = FSHookProxy(pm, remove_mods) - else: - # All plugins are active for this fspath. - proxy = self.config.hook - return proxy - - def _recurse(self, direntry: "os.DirEntry[str]") -> bool: - if direntry.name == "__pycache__": - return False - path = py.path.local(direntry.path) - ihook = self.gethookproxy(path.dirpath()) - if ihook.pytest_ignore_collect(path=path, config=self.config): - return False - norecursepatterns = self.config.getini("norecursedirs") - if any(path.check(fnmatch=pat) for pat in norecursepatterns): - return False - return True - - def _collectfile( - self, path: py.path.local, handle_dupes: bool = True - ) -> Sequence[nodes.Collector]: - assert ( - path.isfile() - ), "{!r} is not a file (isdir={!r}, exists={!r}, islink={!r})".format( - path, path.isdir(), path.exists(), path.islink() - ) - ihook = self.gethookproxy(path) - if not self.isinitpath(path): - if ihook.pytest_ignore_collect(path=path, config=self.config): - return () - - if handle_dupes: - keepduplicates = self.config.getoption("keepduplicates") - if not keepduplicates: - duplicate_paths = self.config.pluginmanager._duplicatepaths - if path in duplicate_paths: - return () - else: - duplicate_paths.add(path) - - return ihook.pytest_collect_file(path=path, parent=self) # type: ignore[no-any-return] - - @overload - def perform_collect( - self, args: Optional[Sequence[str]] = ..., genitems: "Literal[True]" = ... - ) -> Sequence[nodes.Item]: - ... - - @overload - def perform_collect( - self, args: Optional[Sequence[str]] = ..., genitems: bool = ... - ) -> Sequence[Union[nodes.Item, nodes.Collector]]: - ... - - def perform_collect( - self, args: Optional[Sequence[str]] = None, genitems: bool = True - ) -> Sequence[Union[nodes.Item, nodes.Collector]]: - """Perform the collection phase for this session. - - This is called by the default - :func:`pytest_collection <_pytest.hookspec.pytest_collection>` hook - implementation; see the documentation of this hook for more details. - For testing purposes, it may also be called directly on a fresh - ``Session``. - - This function normally recursively expands any collectors collected - from the session to their items, and only items are returned. For - testing purposes, this may be suppressed by passing ``genitems=False``, - in which case the return value contains these collectors unexpanded, - and ``session.items`` is empty. - """ - if args is None: - args = self.config.args - - self.trace("perform_collect", self, args) - self.trace.root.indent += 1 - - self._notfound: List[Tuple[str, Sequence[nodes.Collector]]] = [] - self._initial_parts: List[Tuple[py.path.local, List[str]]] = [] - self.items: List[nodes.Item] = [] - + def gethookproxy(self, fspath: py.path.local): + # Check if we have the common case of running + # hooks with all conftest.py files. + pm = self.config.pluginmanager + my_conftestmodules = pm._getconftestmodules( + fspath, self.config.getoption("importmode") + ) + remove_mods = pm._conftest_plugins.difference(my_conftestmodules) + if remove_mods: + # One or more conftests are not in use at this fspath. + proxy = FSHookProxy(pm, remove_mods) + else: + # All plugins are active for this fspath. + proxy = self.config.hook + return proxy + + def _recurse(self, direntry: "os.DirEntry[str]") -> bool: + if direntry.name == "__pycache__": + return False + path = py.path.local(direntry.path) + ihook = self.gethookproxy(path.dirpath()) + if ihook.pytest_ignore_collect(path=path, config=self.config): + return False + norecursepatterns = self.config.getini("norecursedirs") + if any(path.check(fnmatch=pat) for pat in norecursepatterns): + return False + return True + + def _collectfile( + self, path: py.path.local, handle_dupes: bool = True + ) -> Sequence[nodes.Collector]: + assert ( + path.isfile() + ), "{!r} is not a file (isdir={!r}, exists={!r}, islink={!r})".format( + path, path.isdir(), path.exists(), path.islink() + ) + ihook = self.gethookproxy(path) + if not self.isinitpath(path): + if ihook.pytest_ignore_collect(path=path, config=self.config): + return () + + if handle_dupes: + keepduplicates = self.config.getoption("keepduplicates") + if not keepduplicates: + duplicate_paths = self.config.pluginmanager._duplicatepaths + if path in duplicate_paths: + return () + else: + duplicate_paths.add(path) + + return ihook.pytest_collect_file(path=path, parent=self) # type: ignore[no-any-return] + + @overload + def perform_collect( + self, args: Optional[Sequence[str]] = ..., genitems: "Literal[True]" = ... + ) -> Sequence[nodes.Item]: + ... + + @overload + def perform_collect( + self, args: Optional[Sequence[str]] = ..., genitems: bool = ... + ) -> Sequence[Union[nodes.Item, nodes.Collector]]: + ... + + def perform_collect( + self, args: Optional[Sequence[str]] = None, genitems: bool = True + ) -> Sequence[Union[nodes.Item, nodes.Collector]]: + """Perform the collection phase for this session. + + This is called by the default + :func:`pytest_collection <_pytest.hookspec.pytest_collection>` hook + implementation; see the documentation of this hook for more details. + For testing purposes, it may also be called directly on a fresh + ``Session``. + + This function normally recursively expands any collectors collected + from the session to their items, and only items are returned. For + testing purposes, this may be suppressed by passing ``genitems=False``, + in which case the return value contains these collectors unexpanded, + and ``session.items`` is empty. + """ + if args is None: + args = self.config.args + + self.trace("perform_collect", self, args) + self.trace.root.indent += 1 + + self._notfound: List[Tuple[str, Sequence[nodes.Collector]]] = [] + self._initial_parts: List[Tuple[py.path.local, List[str]]] = [] + self.items: List[nodes.Item] = [] + hook = self.config.hook - - items: Sequence[Union[nodes.Item, nodes.Collector]] = self.items + + items: Sequence[Union[nodes.Item, nodes.Collector]] = self.items try: - initialpaths: List[py.path.local] = [] - for arg in args: - fspath, parts = resolve_collection_argument( - self.config.invocation_params.dir, - arg, - as_pypath=self.config.option.pyargs, - ) - self._initial_parts.append((fspath, parts)) - initialpaths.append(fspath) - self._initialpaths = frozenset(initialpaths) - rep = collect_one_node(self) - self.ihook.pytest_collectreport(report=rep) - self.trace.root.indent -= 1 - if self._notfound: - errors = [] - for arg, cols in self._notfound: - line = f"(no name {arg!r} in any of {cols!r})" - errors.append(f"not found: {arg}\n{line}") - raise UsageError(*errors) - if not genitems: - items = rep.result - else: - if rep.passed: - for node in rep.result: - self.items.extend(self.genitems(node)) - + initialpaths: List[py.path.local] = [] + for arg in args: + fspath, parts = resolve_collection_argument( + self.config.invocation_params.dir, + arg, + as_pypath=self.config.option.pyargs, + ) + self._initial_parts.append((fspath, parts)) + initialpaths.append(fspath) + self._initialpaths = frozenset(initialpaths) + rep = collect_one_node(self) + self.ihook.pytest_collectreport(report=rep) + self.trace.root.indent -= 1 + if self._notfound: + errors = [] + for arg, cols in self._notfound: + line = f"(no name {arg!r} in any of {cols!r})" + errors.append(f"not found: {arg}\n{line}") + raise UsageError(*errors) + if not genitems: + items = rep.result + else: + if rep.passed: + for node in rep.result: + self.items.extend(self.genitems(node)) + self.config.pluginmanager.check_pending() hook.pytest_collection_modifyitems( session=self, config=self.config, items=items ) finally: hook.pytest_collection_finish(session=self) - + self.testscollected = len(items) return items - def collect(self) -> Iterator[Union[nodes.Item, nodes.Collector]]: - from _pytest.python import Package - - # Keep track of any collected nodes in here, so we don't duplicate fixtures. - node_cache1: Dict[py.path.local, Sequence[nodes.Collector]] = {} - node_cache2: Dict[ - Tuple[Type[nodes.Collector], py.path.local], nodes.Collector - ] = ({}) - - # Keep track of any collected collectors in matchnodes paths, so they - # are not collected more than once. - matchnodes_cache: Dict[Tuple[Type[nodes.Collector], str], CollectReport] = ({}) - - # Dirnames of pkgs with dunder-init files. - pkg_roots: Dict[str, Package] = {} - - for argpath, names in self._initial_parts: - self.trace("processing argument", (argpath, names)) - self.trace.root.indent += 1 - - # Start with a Session root, and delve to argpath item (dir or file) - # and stack all Packages found on the way. - # No point in finding packages when collecting doctests. - if not self.config.getoption("doctestmodules", False): - pm = self.config.pluginmanager - for parent in reversed(argpath.parts()): - if pm._confcutdir and pm._confcutdir.relto(parent): - break - - if parent.isdir(): - pkginit = parent.join("__init__.py") - if pkginit.isfile() and pkginit not in node_cache1: + def collect(self) -> Iterator[Union[nodes.Item, nodes.Collector]]: + from _pytest.python import Package + + # Keep track of any collected nodes in here, so we don't duplicate fixtures. + node_cache1: Dict[py.path.local, Sequence[nodes.Collector]] = {} + node_cache2: Dict[ + Tuple[Type[nodes.Collector], py.path.local], nodes.Collector + ] = ({}) + + # Keep track of any collected collectors in matchnodes paths, so they + # are not collected more than once. + matchnodes_cache: Dict[Tuple[Type[nodes.Collector], str], CollectReport] = ({}) + + # Dirnames of pkgs with dunder-init files. + pkg_roots: Dict[str, Package] = {} + + for argpath, names in self._initial_parts: + self.trace("processing argument", (argpath, names)) + self.trace.root.indent += 1 + + # Start with a Session root, and delve to argpath item (dir or file) + # and stack all Packages found on the way. + # No point in finding packages when collecting doctests. + if not self.config.getoption("doctestmodules", False): + pm = self.config.pluginmanager + for parent in reversed(argpath.parts()): + if pm._confcutdir and pm._confcutdir.relto(parent): + break + + if parent.isdir(): + pkginit = parent.join("__init__.py") + if pkginit.isfile() and pkginit not in node_cache1: col = self._collectfile(pkginit, handle_dupes=False) if col: if isinstance(col[0], Package): - pkg_roots[str(parent)] = col[0] - node_cache1[col[0].fspath] = [col[0]] - - # If it's a directory argument, recurse and look for any Subpackages. - # Let the Package collector deal with subnodes, don't collect here. - if argpath.check(dir=1): - assert not names, "invalid arg {!r}".format((argpath, names)) - - seen_dirs: Set[py.path.local] = set() - for direntry in visit(str(argpath), self._recurse): - if not direntry.is_file(): - continue - - path = py.path.local(direntry.path) - dirpath = path.dirpath() - - if dirpath not in seen_dirs: - # Collect packages first. - seen_dirs.add(dirpath) - pkginit = dirpath.join("__init__.py") - if pkginit.exists(): - for x in self._collectfile(pkginit): - yield x - if isinstance(x, Package): - pkg_roots[str(dirpath)] = x - if str(dirpath) in pkg_roots: - # Do not collect packages here. - continue - - for x in self._collectfile(path): - key = (type(x), x.fspath) - if key in node_cache2: - yield node_cache2[key] - else: - node_cache2[key] = x - yield x + pkg_roots[str(parent)] = col[0] + node_cache1[col[0].fspath] = [col[0]] + + # If it's a directory argument, recurse and look for any Subpackages. + # Let the Package collector deal with subnodes, don't collect here. + if argpath.check(dir=1): + assert not names, "invalid arg {!r}".format((argpath, names)) + + seen_dirs: Set[py.path.local] = set() + for direntry in visit(str(argpath), self._recurse): + if not direntry.is_file(): + continue + + path = py.path.local(direntry.path) + dirpath = path.dirpath() + + if dirpath not in seen_dirs: + # Collect packages first. + seen_dirs.add(dirpath) + pkginit = dirpath.join("__init__.py") + if pkginit.exists(): + for x in self._collectfile(pkginit): + yield x + if isinstance(x, Package): + pkg_roots[str(dirpath)] = x + if str(dirpath) in pkg_roots: + # Do not collect packages here. + continue + + for x in self._collectfile(path): + key = (type(x), x.fspath) + if key in node_cache2: + yield node_cache2[key] + else: + node_cache2[key] = x + yield x else: - assert argpath.check(file=1) - - if argpath in node_cache1: - col = node_cache1[argpath] - else: - collect_root = pkg_roots.get(argpath.dirname, self) - col = collect_root._collectfile(argpath, handle_dupes=False) - if col: - node_cache1[argpath] = col - - matching = [] - work: List[ - Tuple[Sequence[Union[nodes.Item, nodes.Collector]], Sequence[str]] - ] = [(col, names)] - while work: - self.trace("matchnodes", col, names) - self.trace.root.indent += 1 - - matchnodes, matchnames = work.pop() - for node in matchnodes: - if not matchnames: - matching.append(node) - continue - if not isinstance(node, nodes.Collector): - continue - key = (type(node), node.nodeid) - if key in matchnodes_cache: - rep = matchnodes_cache[key] - else: - rep = collect_one_node(node) - matchnodes_cache[key] = rep - if rep.passed: - submatchnodes = [] - for r in rep.result: - # TODO: Remove parametrized workaround once collection structure contains - # parametrization. - if ( - r.name == matchnames[0] - or r.name.split("[")[0] == matchnames[0] - ): - submatchnodes.append(r) - if submatchnodes: - work.append((submatchnodes, matchnames[1:])) - # XXX Accept IDs that don't have "()" for class instances. - elif len(rep.result) == 1 and rep.result[0].name == "()": - work.append((rep.result, matchnames)) - else: - # Report collection failures here to avoid failing to run some test - # specified in the command line because the module could not be - # imported (#134). - node.ihook.pytest_collectreport(report=rep) - - self.trace("matchnodes finished -> ", len(matching), "nodes") - self.trace.root.indent -= 1 - - if not matching: - report_arg = "::".join((str(argpath), *names)) - self._notfound.append((report_arg, col)) - continue - - # If __init__.py was the only file requested, then the matched - # node will be the corresponding Package (by default), and the - # first yielded item will be the __init__ Module itself, so - # just use that. If this special case isn't taken, then all the - # files in the package will be yielded. - if argpath.basename == "__init__.py" and isinstance( - matching[0], Package - ): - try: - yield next(iter(matching[0].collect())) - except StopIteration: - # The package collects nothing with only an __init__.py - # file in it, which gets ignored by the default - # "python_files" option. - pass - continue - - yield from matching - - self.trace.root.indent -= 1 - - def genitems( - self, node: Union[nodes.Item, nodes.Collector] - ) -> Iterator[nodes.Item]: + assert argpath.check(file=1) + + if argpath in node_cache1: + col = node_cache1[argpath] + else: + collect_root = pkg_roots.get(argpath.dirname, self) + col = collect_root._collectfile(argpath, handle_dupes=False) + if col: + node_cache1[argpath] = col + + matching = [] + work: List[ + Tuple[Sequence[Union[nodes.Item, nodes.Collector]], Sequence[str]] + ] = [(col, names)] + while work: + self.trace("matchnodes", col, names) + self.trace.root.indent += 1 + + matchnodes, matchnames = work.pop() + for node in matchnodes: + if not matchnames: + matching.append(node) + continue + if not isinstance(node, nodes.Collector): + continue + key = (type(node), node.nodeid) + if key in matchnodes_cache: + rep = matchnodes_cache[key] + else: + rep = collect_one_node(node) + matchnodes_cache[key] = rep + if rep.passed: + submatchnodes = [] + for r in rep.result: + # TODO: Remove parametrized workaround once collection structure contains + # parametrization. + if ( + r.name == matchnames[0] + or r.name.split("[")[0] == matchnames[0] + ): + submatchnodes.append(r) + if submatchnodes: + work.append((submatchnodes, matchnames[1:])) + # XXX Accept IDs that don't have "()" for class instances. + elif len(rep.result) == 1 and rep.result[0].name == "()": + work.append((rep.result, matchnames)) + else: + # Report collection failures here to avoid failing to run some test + # specified in the command line because the module could not be + # imported (#134). + node.ihook.pytest_collectreport(report=rep) + + self.trace("matchnodes finished -> ", len(matching), "nodes") + self.trace.root.indent -= 1 + + if not matching: + report_arg = "::".join((str(argpath), *names)) + self._notfound.append((report_arg, col)) + continue + + # If __init__.py was the only file requested, then the matched + # node will be the corresponding Package (by default), and the + # first yielded item will be the __init__ Module itself, so + # just use that. If this special case isn't taken, then all the + # files in the package will be yielded. + if argpath.basename == "__init__.py" and isinstance( + matching[0], Package + ): + try: + yield next(iter(matching[0].collect())) + except StopIteration: + # The package collects nothing with only an __init__.py + # file in it, which gets ignored by the default + # "python_files" option. + pass + continue + + yield from matching + + self.trace.root.indent -= 1 + + def genitems( + self, node: Union[nodes.Item, nodes.Collector] + ) -> Iterator[nodes.Item]: self.trace("genitems", node) if isinstance(node, nodes.Item): node.ihook.pytest_itemcollected(item=node) @@ -808,69 +808,69 @@ class Session(nodes.FSCollector): rep = collect_one_node(node) if rep.passed: for subnode in rep.result: - yield from self.genitems(subnode) + yield from self.genitems(subnode) node.ihook.pytest_collectreport(report=rep) - - -def search_pypath(module_name: str) -> str: - """Search sys.path for the given a dotted module name, and return its file system path.""" - try: - spec = importlib.util.find_spec(module_name) - # AttributeError: looks like package module, but actually filename - # ImportError: module does not exist - # ValueError: not a module name - except (AttributeError, ImportError, ValueError): - return module_name - if spec is None or spec.origin is None or spec.origin == "namespace": - return module_name - elif spec.submodule_search_locations: - return os.path.dirname(spec.origin) - else: - return spec.origin - - -def resolve_collection_argument( - invocation_path: Path, arg: str, *, as_pypath: bool = False -) -> Tuple[py.path.local, List[str]]: - """Parse path arguments optionally containing selection parts and return (fspath, names). - - Command-line arguments can point to files and/or directories, and optionally contain - parts for specific tests selection, for example: - - "pkg/tests/test_foo.py::TestClass::test_foo" - - This function ensures the path exists, and returns a tuple: - - (py.path.path("/full/path/to/pkg/tests/test_foo.py"), ["TestClass", "test_foo"]) - - When as_pypath is True, expects that the command-line argument actually contains - module paths instead of file-system paths: - - "pkg.tests.test_foo::TestClass::test_foo" - - In which case we search sys.path for a matching module, and then return the *path* to the - found module. - - If the path doesn't exist, raise UsageError. - If the path is a directory and selection parts are present, raise UsageError. - """ - strpath, *parts = str(arg).split("::") - if as_pypath: - strpath = search_pypath(strpath) - fspath = invocation_path / strpath - fspath = absolutepath(fspath) - if not fspath.exists(): - msg = ( - "module or package not found: {arg} (missing __init__.py?)" - if as_pypath - else "file or directory not found: {arg}" - ) - raise UsageError(msg.format(arg=arg)) - if parts and fspath.is_dir(): - msg = ( - "package argument cannot contain :: selection parts: {arg}" - if as_pypath - else "directory argument cannot contain :: selection parts: {arg}" - ) - raise UsageError(msg.format(arg=arg)) - return py.path.local(str(fspath)), parts + + +def search_pypath(module_name: str) -> str: + """Search sys.path for the given a dotted module name, and return its file system path.""" + try: + spec = importlib.util.find_spec(module_name) + # AttributeError: looks like package module, but actually filename + # ImportError: module does not exist + # ValueError: not a module name + except (AttributeError, ImportError, ValueError): + return module_name + if spec is None or spec.origin is None or spec.origin == "namespace": + return module_name + elif spec.submodule_search_locations: + return os.path.dirname(spec.origin) + else: + return spec.origin + + +def resolve_collection_argument( + invocation_path: Path, arg: str, *, as_pypath: bool = False +) -> Tuple[py.path.local, List[str]]: + """Parse path arguments optionally containing selection parts and return (fspath, names). + + Command-line arguments can point to files and/or directories, and optionally contain + parts for specific tests selection, for example: + + "pkg/tests/test_foo.py::TestClass::test_foo" + + This function ensures the path exists, and returns a tuple: + + (py.path.path("/full/path/to/pkg/tests/test_foo.py"), ["TestClass", "test_foo"]) + + When as_pypath is True, expects that the command-line argument actually contains + module paths instead of file-system paths: + + "pkg.tests.test_foo::TestClass::test_foo" + + In which case we search sys.path for a matching module, and then return the *path* to the + found module. + + If the path doesn't exist, raise UsageError. + If the path is a directory and selection parts are present, raise UsageError. + """ + strpath, *parts = str(arg).split("::") + if as_pypath: + strpath = search_pypath(strpath) + fspath = invocation_path / strpath + fspath = absolutepath(fspath) + if not fspath.exists(): + msg = ( + "module or package not found: {arg} (missing __init__.py?)" + if as_pypath + else "file or directory not found: {arg}" + ) + raise UsageError(msg.format(arg=arg)) + if parts and fspath.is_dir(): + msg = ( + "package argument cannot contain :: selection parts: {arg}" + if as_pypath + else "directory argument cannot contain :: selection parts: {arg}" + ) + raise UsageError(msg.format(arg=arg)) + return py.path.local(str(fspath)), parts |