diff options
author | arcadia-devtools <arcadia-devtools@yandex-team.ru> | 2022-02-09 12:00:52 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 15:58:17 +0300 |
commit | 8e1413fed79d1e8036e65228af6c93399ccf5502 (patch) | |
tree | 502c9df7b2614d20541c7a2d39d390e9a51877cc /contrib/python/pytest/py3/_pytest/main.py | |
parent | 6b813c17d56d1d05f92c61ddc347d0e4d358fe85 (diff) | |
download | ydb-8e1413fed79d1e8036e65228af6c93399ccf5502.tar.gz |
intermediate changes
ref:614ed510ddd3cdf86a8c5dbf19afd113397e0172
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/main.py')
-rw-r--r-- | contrib/python/pytest/py3/_pytest/main.py | 737 |
1 files changed, 464 insertions, 273 deletions
diff --git a/contrib/python/pytest/py3/_pytest/main.py b/contrib/python/pytest/py3/_pytest/main.py index 61eb7ca74c..41a33d4494 100644 --- a/contrib/python/pytest/py3/_pytest/main.py +++ b/contrib/python/pytest/py3/_pytest/main.py @@ -1,16 +1,23 @@ -""" core implementation of testing process: init, session, runtest loop. """ +"""Core implementation of the testing process: init, session, runtest loop.""" +import argparse import fnmatch import functools import importlib import os import sys +from pathlib import Path from typing import Callable from typing import Dict from typing import FrozenSet +from typing import Iterator from typing import List from typing import Optional +from typing import overload from typing import Sequence +from typing import Set from typing import Tuple +from typing import Type +from typing import TYPE_CHECKING from typing import Union import attr @@ -18,32 +25,45 @@ import py import _pytest._code from _pytest import nodes -from _pytest.compat import TYPE_CHECKING +from _pytest.compat import final from _pytest.config import Config from _pytest.config import directory_arg from _pytest.config import ExitCode from _pytest.config import hookimpl +from _pytest.config import PytestPluginManager from _pytest.config import UsageError +from _pytest.config.argparsing import Parser from _pytest.fixtures import FixtureManager from _pytest.outcomes import exit +from _pytest.pathlib import absolutepath +from _pytest.pathlib import bestrelpath +from _pytest.pathlib import visit from _pytest.reports import CollectReport +from _pytest.reports import TestReport from _pytest.runner import collect_one_node from _pytest.runner import SetupState if TYPE_CHECKING: - from typing import Type from typing_extensions import Literal - from _pytest.python import Package - -def pytest_addoption(parser): +def pytest_addoption(parser: Parser) -> None: parser.addini( "norecursedirs", "directory patterns to avoid for recursion", type="args", - default=[".*", "build", "dist", "CVS", "_darcs", "{arch}", "*.egg", "venv"], + default=[ + "*.egg", + ".*", + "_darcs", + "build", + "CVS", + "dist", + "node_modules", + "venv", + "{arch}", + ], ) parser.addini( "testpaths", @@ -61,6 +81,20 @@ def pytest_addoption(parser): const=1, help="exit instantly on first error or failed test.", ) + group = parser.getgroup("pytest-warnings") + group.addoption( + "-W", + "--pythonwarnings", + action="append", + help="set which warnings to report, see -W option of python itself.", + ) + parser.addini( + "filterwarnings", + type="linelist", + help="Each line specifies a pattern for " + "warnings.filterwarnings. " + "Processed after -W/--pythonwarnings.", + ) group._addoption( "--maxfail", metavar="num", @@ -71,12 +105,19 @@ def pytest_addoption(parser): help="exit after first num failures or errors.", ) group._addoption( + "--strict-config", + action="store_true", + help="any warnings encountered while parsing the `pytest` section of the configuration file raise errors.", + ) + group._addoption( "--strict-markers", - "--strict", action="store_true", help="markers not registered in the `markers` section of the configuration file raise errors.", ) group._addoption( + "--strict", action="store_true", help="(deprecated) alias to --strict-markers.", + ) + group._addoption( "-c", metavar="file", type=str, @@ -161,12 +202,21 @@ def pytest_addoption(parser): default=False, help="Don't ignore tests in a local virtualenv directory", ) + group.addoption( + "--import-mode", + default="prepend", + choices=["prepend", "append", "importlib"], + dest="importmode", + help="prepend/append to sys.path when importing test modules and conftest files, " + "default is to prepend.", + ) group = parser.getgroup("debugconfig", "test session debugging and configuration") group.addoption( "--basetemp", dest="basetemp", default=None, + type=validate_basetemp, metavar="dir", help=( "base temporary directory for this test run." @@ -175,10 +225,38 @@ def pytest_addoption(parser): ) +def validate_basetemp(path: str) -> str: + # GH 7119 + msg = "basetemp must not be empty, the current working directory or any parent directory of it" + + # empty path + if not path: + raise argparse.ArgumentTypeError(msg) + + def is_ancestor(base: Path, query: Path) -> bool: + """Return whether query is an ancestor of base.""" + if base == query: + return True + for parent in base.parents: + if parent == query: + return True + return False + + # check if path is an ancestor of cwd + if is_ancestor(Path.cwd(), Path(path).absolute()): + raise argparse.ArgumentTypeError(msg) + + # check symlinks for ancestors + if is_ancestor(Path.cwd().resolve(), Path(path).resolve()): + raise argparse.ArgumentTypeError(msg) + + return path + + def wrap_session( config: Config, doit: Callable[[Config, "Session"], Optional[Union[int, ExitCode]]] ) -> Union[int, ExitCode]: - """Skeleton command line program""" + """Skeleton command line program.""" session = Session.from_config(config) session.exitstatus = ExitCode.OK initstate = 0 @@ -196,17 +274,15 @@ def wrap_session( session.exitstatus = ExitCode.TESTS_FAILED except (KeyboardInterrupt, exit.Exception): excinfo = _pytest._code.ExceptionInfo.from_current() - exitstatus = ExitCode.INTERRUPTED # type: Union[int, ExitCode] + exitstatus: Union[int, ExitCode] = ExitCode.INTERRUPTED if isinstance(excinfo.value, exit.Exception): if excinfo.value.returncode is not None: exitstatus = excinfo.value.returncode if initstate < 2: - sys.stderr.write( - "{}: {}\n".format(excinfo.typename, excinfo.value.msg) - ) + sys.stderr.write(f"{excinfo.typename}: {excinfo.value.msg}\n") config.hook.pytest_keyboard_interrupt(excinfo=excinfo) session.exitstatus = exitstatus - except: # noqa + except BaseException: session.exitstatus = ExitCode.INTERNAL_ERROR excinfo = _pytest._code.ExceptionInfo.from_current() try: @@ -216,7 +292,7 @@ def wrap_session( session.exitstatus = exc.returncode sys.stderr.write("{}: {}\n".format(type(exc).__name__, exc)) else: - if excinfo.errisinstance(SystemExit): + if isinstance(excinfo.value, SystemExit): sys.stderr.write("mainloop: caught unexpected SystemExit!\n") finally: @@ -236,13 +312,13 @@ def wrap_session( return session.exitstatus -def pytest_cmdline_main(config): +def pytest_cmdline_main(config: Config) -> Union[int, ExitCode]: return wrap_session(config, _main) def _main(config: Config, session: "Session") -> Optional[Union[int, ExitCode]]: - """ default command line protocol for initialization, session, - running tests and reporting. """ + """Default command line protocol for initialization, session, + running tests and reporting.""" config.hook.pytest_collection(session=session) config.hook.pytest_runtestloop(session=session) @@ -253,11 +329,11 @@ def _main(config: Config, session: "Session") -> Optional[Union[int, ExitCode]]: return None -def pytest_collection(session): - return session.perform_collect() +def pytest_collection(session: "Session") -> None: + session.perform_collect() -def pytest_runtestloop(session): +def pytest_runtestloop(session: "Session") -> bool: if session.testsfailed and not session.config.option.continue_on_collection_errors: raise session.Interrupted( "%d error%s during collection" @@ -277,9 +353,9 @@ def pytest_runtestloop(session): return True -def _in_venv(path): - """Attempts to detect if ``path`` is the root of a Virtual Environment by - checking for the existence of the appropriate activate script""" +def _in_venv(path: py.path.local) -> bool: + """Attempt to detect if ``path`` is the root of a Virtual Environment by + checking for the existence of the appropriate activate script.""" bindir = path.join("Scripts" if sys.platform.startswith("win") else "bin") if not bindir.isdir(): return False @@ -294,9 +370,7 @@ def _in_venv(path): return any([fname.basename in activates for fname in bindir.listdir()]) -def pytest_ignore_collect( - path: py.path.local, config: Config -) -> "Optional[Literal[True]]": +def pytest_ignore_collect(path: py.path.local, config: Config) -> Optional[bool]: ignore_paths = config._getconftest_pathlist("collect_ignore", path=path.dirpath()) ignore_paths = ignore_paths or [] excludeopt = config.getoption("ignore") @@ -323,7 +397,7 @@ def pytest_ignore_collect( return None -def pytest_collection_modifyitems(items, config): +def pytest_collection_modifyitems(items: List[nodes.Item], config: Config) -> None: deselect_prefixes = tuple(config.getoption("deselect") or []) if not deselect_prefixes: return @@ -341,76 +415,69 @@ def pytest_collection_modifyitems(items, config): items[:] = remaining -class NoMatch(Exception): - """ raised if matching cannot locate a matching names. """ +class FSHookProxy: + def __init__(self, pm: PytestPluginManager, remove_mods) -> None: + self.pm = pm + self.remove_mods = remove_mods + + def __getattr__(self, name: str): + x = self.pm.subset_hook_caller(name, remove_plugins=self.remove_mods) + self.__dict__[name] = x + return x class Interrupted(KeyboardInterrupt): - """ signals an interrupted test run. """ + """Signals that the test run was interrupted.""" - __module__ = "builtins" # for py3 + __module__ = "builtins" # For py3. class Failed(Exception): - """ signals a stop as failed test run. """ + """Signals a stop as failed test run.""" @attr.s -class _bestrelpath_cache(dict): - path = attr.ib(type=py.path.local) +class _bestrelpath_cache(Dict[Path, str]): + path = attr.ib(type=Path) - def __missing__(self, path: py.path.local) -> str: - r = self.path.bestrelpath(path) # type: str + def __missing__(self, path: Path) -> str: + r = bestrelpath(self.path, path) self[path] = r return r +@final class Session(nodes.FSCollector): Interrupted = Interrupted Failed = Failed # Set on the session by runner.pytest_sessionstart. - _setupstate = None # type: SetupState + _setupstate: SetupState # Set on the session by fixtures.pytest_sessionstart. - _fixturemanager = None # type: FixtureManager - exitstatus = None # type: Union[int, ExitCode] + _fixturemanager: FixtureManager + exitstatus: Union[int, ExitCode] def __init__(self, config: Config) -> None: - nodes.FSCollector.__init__( - self, config.rootdir, parent=None, config=config, session=self, nodeid="" + super().__init__( + config.rootdir, parent=None, config=config, session=self, nodeid="" ) self.testsfailed = 0 self.testscollected = 0 - self.shouldstop = False - self.shouldfail = False + self.shouldstop: Union[bool, str] = False + self.shouldfail: Union[bool, str] = False self.trace = config.trace.root.get("collection") self.startdir = config.invocation_dir - self._initialpaths = frozenset() # type: FrozenSet[py.path.local] - - # Keep track of any collected nodes in here, so we don't duplicate fixtures - self._collection_node_cache1 = ( - {} - ) # type: Dict[py.path.local, Sequence[nodes.Collector]] - self._collection_node_cache2 = ( - {} - ) # type: Dict[Tuple[Type[nodes.Collector], py.path.local], nodes.Collector] - self._collection_node_cache3 = ( - {} - ) # type: Dict[Tuple[Type[nodes.Collector], str], CollectReport] - - # Dirnames of pkgs with dunder-init files. - self._collection_pkg_roots = {} # type: Dict[py.path.local, Package] + self._initialpaths: FrozenSet[py.path.local] = frozenset() - self._bestrelpathcache = _bestrelpath_cache( - config.rootdir - ) # type: Dict[py.path.local, str] + self._bestrelpathcache: Dict[Path, str] = _bestrelpath_cache(config.rootpath) self.config.pluginmanager.register(self, name="session") @classmethod - def from_config(cls, config): - return cls._create(config) + def from_config(cls, config: Config) -> "Session": + session: Session = cls._create(config) + return session - def __repr__(self): + def __repr__(self) -> str: return "<%s %s exitstatus=%r testsfailed=%d testscollected=%d>" % ( self.__class__.__name__, self.name, @@ -419,19 +486,21 @@ class Session(nodes.FSCollector): self.testscollected, ) - def _node_location_to_relpath(self, node_path: py.path.local) -> str: - # bestrelpath is a quite slow function + def _node_location_to_relpath(self, node_path: Path) -> str: + # bestrelpath is a quite slow function. return self._bestrelpathcache[node_path] @hookimpl(tryfirst=True) - def pytest_collectstart(self): + def pytest_collectstart(self) -> None: if self.shouldfail: raise self.Failed(self.shouldfail) if self.shouldstop: raise self.Interrupted(self.shouldstop) @hookimpl(tryfirst=True) - def pytest_runtest_logreport(self, report): + def pytest_runtest_logreport( + self, report: Union[TestReport, CollectReport] + ) -> None: if report.failed and not hasattr(report, "wasxfail"): self.testsfailed += 1 maxfail = self.config.getvalue("maxfail") @@ -440,238 +509,296 @@ class Session(nodes.FSCollector): pytest_collectreport = pytest_runtest_logreport - def isinitpath(self, path): + def isinitpath(self, path: py.path.local) -> bool: return path in self._initialpaths def gethookproxy(self, fspath: py.path.local): - return super()._gethookproxy(fspath) + # Check if we have the common case of running + # hooks with all conftest.py files. + pm = self.config.pluginmanager + my_conftestmodules = pm._getconftestmodules( + fspath, self.config.getoption("importmode") + ) + remove_mods = pm._conftest_plugins.difference(my_conftestmodules) + if remove_mods: + # One or more conftests are not in use at this fspath. + proxy = FSHookProxy(pm, remove_mods) + else: + # All plugins are active for this fspath. + proxy = self.config.hook + return proxy + + def _recurse(self, direntry: "os.DirEntry[str]") -> bool: + if direntry.name == "__pycache__": + return False + path = py.path.local(direntry.path) + ihook = self.gethookproxy(path.dirpath()) + if ihook.pytest_ignore_collect(path=path, config=self.config): + return False + norecursepatterns = self.config.getini("norecursedirs") + if any(path.check(fnmatch=pat) for pat in norecursepatterns): + return False + return True + + def _collectfile( + self, path: py.path.local, handle_dupes: bool = True + ) -> Sequence[nodes.Collector]: + assert ( + path.isfile() + ), "{!r} is not a file (isdir={!r}, exists={!r}, islink={!r})".format( + path, path.isdir(), path.exists(), path.islink() + ) + ihook = self.gethookproxy(path) + if not self.isinitpath(path): + if ihook.pytest_ignore_collect(path=path, config=self.config): + return () + + if handle_dupes: + keepduplicates = self.config.getoption("keepduplicates") + if not keepduplicates: + duplicate_paths = self.config.pluginmanager._duplicatepaths + if path in duplicate_paths: + return () + else: + duplicate_paths.add(path) + + return ihook.pytest_collect_file(path=path, parent=self) # type: ignore[no-any-return] + + @overload + def perform_collect( + self, args: Optional[Sequence[str]] = ..., genitems: "Literal[True]" = ... + ) -> Sequence[nodes.Item]: + ... + + @overload + def perform_collect( + self, args: Optional[Sequence[str]] = ..., genitems: bool = ... + ) -> Sequence[Union[nodes.Item, nodes.Collector]]: + ... + + def perform_collect( + self, args: Optional[Sequence[str]] = None, genitems: bool = True + ) -> Sequence[Union[nodes.Item, nodes.Collector]]: + """Perform the collection phase for this session. + + This is called by the default + :func:`pytest_collection <_pytest.hookspec.pytest_collection>` hook + implementation; see the documentation of this hook for more details. + For testing purposes, it may also be called directly on a fresh + ``Session``. + + This function normally recursively expands any collectors collected + from the session to their items, and only items are returned. For + testing purposes, this may be suppressed by passing ``genitems=False``, + in which case the return value contains these collectors unexpanded, + and ``session.items`` is empty. + """ + if args is None: + args = self.config.args + + self.trace("perform_collect", self, args) + self.trace.root.indent += 1 + + self._notfound: List[Tuple[str, Sequence[nodes.Collector]]] = [] + self._initial_parts: List[Tuple[py.path.local, List[str]]] = [] + self.items: List[nodes.Item] = [] - def perform_collect(self, args=None, genitems=True): hook = self.config.hook + + items: Sequence[Union[nodes.Item, nodes.Collector]] = self.items try: - items = self._perform_collect(args, genitems) + initialpaths: List[py.path.local] = [] + for arg in args: + fspath, parts = resolve_collection_argument( + self.config.invocation_params.dir, + arg, + as_pypath=self.config.option.pyargs, + ) + self._initial_parts.append((fspath, parts)) + initialpaths.append(fspath) + self._initialpaths = frozenset(initialpaths) + rep = collect_one_node(self) + self.ihook.pytest_collectreport(report=rep) + self.trace.root.indent -= 1 + if self._notfound: + errors = [] + for arg, cols in self._notfound: + line = f"(no name {arg!r} in any of {cols!r})" + errors.append(f"not found: {arg}\n{line}") + raise UsageError(*errors) + if not genitems: + items = rep.result + else: + if rep.passed: + for node in rep.result: + self.items.extend(self.genitems(node)) + self.config.pluginmanager.check_pending() hook.pytest_collection_modifyitems( session=self, config=self.config, items=items ) finally: hook.pytest_collection_finish(session=self) + self.testscollected = len(items) return items - def _perform_collect(self, args, genitems): - if args is None: - args = self.config.args - self.trace("perform_collect", self, args) - self.trace.root.indent += 1 - self._notfound = [] - initialpaths = [] # type: List[py.path.local] - self._initial_parts = [] # type: List[Tuple[py.path.local, List[str]]] - self.items = items = [] - for arg in args: - fspath, parts = self._parsearg(arg) - self._initial_parts.append((fspath, parts)) - initialpaths.append(fspath) - self._initialpaths = frozenset(initialpaths) - rep = collect_one_node(self) - self.ihook.pytest_collectreport(report=rep) - self.trace.root.indent -= 1 - if self._notfound: - errors = [] - for arg, exc in self._notfound: - line = "(no name {!r} in any of {!r})".format(arg, exc.args[0]) - errors.append("not found: {}\n{}".format(arg, line)) - raise UsageError(*errors) - if not genitems: - return rep.result - else: - if rep.passed: - for node in rep.result: - self.items.extend(self.genitems(node)) - return items + def collect(self) -> Iterator[Union[nodes.Item, nodes.Collector]]: + from _pytest.python import Package - def collect(self): - for fspath, parts in self._initial_parts: - self.trace("processing argument", (fspath, parts)) - self.trace.root.indent += 1 - try: - yield from self._collect(fspath, parts) - except NoMatch as exc: - report_arg = "::".join((str(fspath), *parts)) - # we are inside a make_report hook so - # we cannot directly pass through the exception - self._notfound.append((report_arg, exc)) + # Keep track of any collected nodes in here, so we don't duplicate fixtures. + node_cache1: Dict[py.path.local, Sequence[nodes.Collector]] = {} + node_cache2: Dict[ + Tuple[Type[nodes.Collector], py.path.local], nodes.Collector + ] = ({}) - self.trace.root.indent -= 1 - self._collection_node_cache1.clear() - self._collection_node_cache2.clear() - self._collection_node_cache3.clear() - self._collection_pkg_roots.clear() + # Keep track of any collected collectors in matchnodes paths, so they + # are not collected more than once. + matchnodes_cache: Dict[Tuple[Type[nodes.Collector], str], CollectReport] = ({}) - def _collect(self, argpath, names): - from _pytest.python import Package + # Dirnames of pkgs with dunder-init files. + pkg_roots: Dict[str, Package] = {} + + for argpath, names in self._initial_parts: + self.trace("processing argument", (argpath, names)) + self.trace.root.indent += 1 - # Start with a Session root, and delve to argpath item (dir or file) - # and stack all Packages found on the way. - # No point in finding packages when collecting doctests - if not self.config.getoption("doctestmodules", False): - pm = self.config.pluginmanager - for parent in reversed(argpath.parts()): - if pm._confcutdir and pm._confcutdir.relto(parent): - break - - if parent.isdir(): - pkginit = parent.join("__init__.py") - if pkginit.isfile(): - if pkginit not in self._collection_node_cache1: + # Start with a Session root, and delve to argpath item (dir or file) + # and stack all Packages found on the way. + # No point in finding packages when collecting doctests. + if not self.config.getoption("doctestmodules", False): + pm = self.config.pluginmanager + for parent in reversed(argpath.parts()): + if pm._confcutdir and pm._confcutdir.relto(parent): + break + + if parent.isdir(): + pkginit = parent.join("__init__.py") + if pkginit.isfile() and pkginit not in node_cache1: col = self._collectfile(pkginit, handle_dupes=False) if col: if isinstance(col[0], Package): - self._collection_pkg_roots[parent] = col[0] - # always store a list in the cache, matchnodes expects it - self._collection_node_cache1[col[0].fspath] = [col[0]] - - # If it's a directory argument, recurse and look for any Subpackages. - # Let the Package collector deal with subnodes, don't collect here. - if argpath.check(dir=1): - assert not names, "invalid arg {!r}".format((argpath, names)) - - seen_dirs = set() - for path in argpath.visit( - fil=self._visit_filter, rec=self._recurse, bf=True, sort=True - ): - dirpath = path.dirpath() - if dirpath not in seen_dirs: - # Collect packages first. - seen_dirs.add(dirpath) - pkginit = dirpath.join("__init__.py") - if pkginit.exists(): - for x in self._collectfile(pkginit): + pkg_roots[str(parent)] = col[0] + node_cache1[col[0].fspath] = [col[0]] + + # If it's a directory argument, recurse and look for any Subpackages. + # Let the Package collector deal with subnodes, don't collect here. + if argpath.check(dir=1): + assert not names, "invalid arg {!r}".format((argpath, names)) + + seen_dirs: Set[py.path.local] = set() + for direntry in visit(str(argpath), self._recurse): + if not direntry.is_file(): + continue + + path = py.path.local(direntry.path) + dirpath = path.dirpath() + + if dirpath not in seen_dirs: + # Collect packages first. + seen_dirs.add(dirpath) + pkginit = dirpath.join("__init__.py") + if pkginit.exists(): + for x in self._collectfile(pkginit): + yield x + if isinstance(x, Package): + pkg_roots[str(dirpath)] = x + if str(dirpath) in pkg_roots: + # Do not collect packages here. + continue + + for x in self._collectfile(path): + key = (type(x), x.fspath) + if key in node_cache2: + yield node_cache2[key] + else: + node_cache2[key] = x yield x - if isinstance(x, Package): - self._collection_pkg_roots[dirpath] = x - if dirpath in self._collection_pkg_roots: - # Do not collect packages here. + else: + assert argpath.check(file=1) + + if argpath in node_cache1: + col = node_cache1[argpath] + else: + collect_root = pkg_roots.get(argpath.dirname, self) + col = collect_root._collectfile(argpath, handle_dupes=False) + if col: + node_cache1[argpath] = col + + matching = [] + work: List[ + Tuple[Sequence[Union[nodes.Item, nodes.Collector]], Sequence[str]] + ] = [(col, names)] + while work: + self.trace("matchnodes", col, names) + self.trace.root.indent += 1 + + matchnodes, matchnames = work.pop() + for node in matchnodes: + if not matchnames: + matching.append(node) + continue + if not isinstance(node, nodes.Collector): + continue + key = (type(node), node.nodeid) + if key in matchnodes_cache: + rep = matchnodes_cache[key] + else: + rep = collect_one_node(node) + matchnodes_cache[key] = rep + if rep.passed: + submatchnodes = [] + for r in rep.result: + # TODO: Remove parametrized workaround once collection structure contains + # parametrization. + if ( + r.name == matchnames[0] + or r.name.split("[")[0] == matchnames[0] + ): + submatchnodes.append(r) + if submatchnodes: + work.append((submatchnodes, matchnames[1:])) + # XXX Accept IDs that don't have "()" for class instances. + elif len(rep.result) == 1 and rep.result[0].name == "()": + work.append((rep.result, matchnames)) + else: + # Report collection failures here to avoid failing to run some test + # specified in the command line because the module could not be + # imported (#134). + node.ihook.pytest_collectreport(report=rep) + + self.trace("matchnodes finished -> ", len(matching), "nodes") + self.trace.root.indent -= 1 + + if not matching: + report_arg = "::".join((str(argpath), *names)) + self._notfound.append((report_arg, col)) continue - for x in self._collectfile(path): - key = (type(x), x.fspath) - if key in self._collection_node_cache2: - yield self._collection_node_cache2[key] - else: - self._collection_node_cache2[key] = x - yield x - else: - assert argpath.check(file=1) + # If __init__.py was the only file requested, then the matched + # node will be the corresponding Package (by default), and the + # first yielded item will be the __init__ Module itself, so + # just use that. If this special case isn't taken, then all the + # files in the package will be yielded. + if argpath.basename == "__init__.py" and isinstance( + matching[0], Package + ): + try: + yield next(iter(matching[0].collect())) + except StopIteration: + # The package collects nothing with only an __init__.py + # file in it, which gets ignored by the default + # "python_files" option. + pass + continue - if argpath in self._collection_node_cache1: - col = self._collection_node_cache1[argpath] - else: - collect_root = self._collection_pkg_roots.get(argpath.dirname, self) - col = collect_root._collectfile(argpath, handle_dupes=False) - if col: - self._collection_node_cache1[argpath] = col - m = self.matchnodes(col, names) - # If __init__.py was the only file requested, then the matched node will be - # the corresponding Package, and the first yielded item will be the __init__ - # Module itself, so just use that. If this special case isn't taken, then all - # the files in the package will be yielded. - if argpath.basename == "__init__.py": - try: - yield next(m[0].collect()) - except StopIteration: - # The package collects nothing with only an __init__.py - # file in it, which gets ignored by the default - # "python_files" option. - pass - return - yield from m - - @staticmethod - def _visit_filter(f): - return f.check(file=1) - - def _tryconvertpyarg(self, x): - """Convert a dotted module name to path.""" - try: - spec = importlib.util.find_spec(x) - # AttributeError: looks like package module, but actually filename - # ImportError: module does not exist - # ValueError: not a module name - except (AttributeError, ImportError, ValueError): - return x - if spec is None or spec.origin in {None, "namespace"}: - return x - elif spec.submodule_search_locations: - return os.path.dirname(spec.origin) - else: - return spec.origin - - def _parsearg(self, arg): - """ return (fspath, names) tuple after checking the file exists. """ - strpath, *parts = str(arg).split("::") - if self.config.option.pyargs: - strpath = self._tryconvertpyarg(strpath) - relpath = strpath.replace("/", os.sep) - fspath = self.config.invocation_dir.join(relpath, abs=True) - if not fspath.check(): - if self.config.option.pyargs: - raise UsageError( - "file or package not found: " + arg + " (missing __init__.py?)" - ) - raise UsageError("file not found: " + arg) - fspath = fspath.realpath() - return (fspath, parts) + yield from matching - def matchnodes(self, matching, names): - self.trace("matchnodes", matching, names) - self.trace.root.indent += 1 - nodes = self._matchnodes(matching, names) - num = len(nodes) - self.trace("matchnodes finished -> ", num, "nodes") - self.trace.root.indent -= 1 - if num == 0: - raise NoMatch(matching, names[:1]) - return nodes - - def _matchnodes(self, matching, names): - if not matching or not names: - return matching - name = names[0] - assert name - nextnames = names[1:] - resultnodes = [] - for node in matching: - if isinstance(node, nodes.Item): - if not names: - resultnodes.append(node) - continue - assert isinstance(node, nodes.Collector) - key = (type(node), node.nodeid) - if key in self._collection_node_cache3: - rep = self._collection_node_cache3[key] - else: - rep = collect_one_node(node) - self._collection_node_cache3[key] = rep - if rep.passed: - has_matched = False - for x in rep.result: - # TODO: remove parametrized workaround once collection structure contains parametrization - if x.name == name or x.name.split("[")[0] == name: - resultnodes.extend(self.matchnodes([x], nextnames)) - has_matched = True - # XXX accept IDs that don't have "()" for class instances - if not has_matched and len(rep.result) == 1 and x.name == "()": - nextnames.insert(0, name) - resultnodes.extend(self.matchnodes([x], nextnames)) - else: - # report collection failures here to avoid failing to run some test - # specified in the command line because the module could not be - # imported (#134) - node.ihook.pytest_collectreport(report=rep) - return resultnodes + self.trace.root.indent -= 1 - def genitems(self, node): + def genitems( + self, node: Union[nodes.Item, nodes.Collector] + ) -> Iterator[nodes.Item]: self.trace("genitems", node) if isinstance(node, nodes.Item): node.ihook.pytest_itemcollected(item=node) @@ -683,3 +810,67 @@ class Session(nodes.FSCollector): for subnode in rep.result: yield from self.genitems(subnode) node.ihook.pytest_collectreport(report=rep) + + +def search_pypath(module_name: str) -> str: + """Search sys.path for the given a dotted module name, and return its file system path.""" + try: + spec = importlib.util.find_spec(module_name) + # AttributeError: looks like package module, but actually filename + # ImportError: module does not exist + # ValueError: not a module name + except (AttributeError, ImportError, ValueError): + return module_name + if spec is None or spec.origin is None or spec.origin == "namespace": + return module_name + elif spec.submodule_search_locations: + return os.path.dirname(spec.origin) + else: + return spec.origin + + +def resolve_collection_argument( + invocation_path: Path, arg: str, *, as_pypath: bool = False +) -> Tuple[py.path.local, List[str]]: + """Parse path arguments optionally containing selection parts and return (fspath, names). + + Command-line arguments can point to files and/or directories, and optionally contain + parts for specific tests selection, for example: + + "pkg/tests/test_foo.py::TestClass::test_foo" + + This function ensures the path exists, and returns a tuple: + + (py.path.path("/full/path/to/pkg/tests/test_foo.py"), ["TestClass", "test_foo"]) + + When as_pypath is True, expects that the command-line argument actually contains + module paths instead of file-system paths: + + "pkg.tests.test_foo::TestClass::test_foo" + + In which case we search sys.path for a matching module, and then return the *path* to the + found module. + + If the path doesn't exist, raise UsageError. + If the path is a directory and selection parts are present, raise UsageError. + """ + strpath, *parts = str(arg).split("::") + if as_pypath: + strpath = search_pypath(strpath) + fspath = invocation_path / strpath + fspath = absolutepath(fspath) + if not fspath.exists(): + msg = ( + "module or package not found: {arg} (missing __init__.py?)" + if as_pypath + else "file or directory not found: {arg}" + ) + raise UsageError(msg.format(arg=arg)) + if parts and fspath.is_dir(): + msg = ( + "package argument cannot contain :: selection parts: {arg}" + if as_pypath + else "directory argument cannot contain :: selection parts: {arg}" + ) + raise UsageError(msg.format(arg=arg)) + return py.path.local(str(fspath)), parts |