aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/pytest/py3/_pytest/main.py
diff options
context:
space:
mode:
authorarcadia-devtools <arcadia-devtools@yandex-team.ru>2022-02-09 12:00:52 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 15:58:17 +0300
commit8e1413fed79d1e8036e65228af6c93399ccf5502 (patch)
tree502c9df7b2614d20541c7a2d39d390e9a51877cc /contrib/python/pytest/py3/_pytest/main.py
parent6b813c17d56d1d05f92c61ddc347d0e4d358fe85 (diff)
downloadydb-8e1413fed79d1e8036e65228af6c93399ccf5502.tar.gz
intermediate changes
ref:614ed510ddd3cdf86a8c5dbf19afd113397e0172
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/main.py')
-rw-r--r--contrib/python/pytest/py3/_pytest/main.py737
1 files changed, 464 insertions, 273 deletions
diff --git a/contrib/python/pytest/py3/_pytest/main.py b/contrib/python/pytest/py3/_pytest/main.py
index 61eb7ca74c..41a33d4494 100644
--- a/contrib/python/pytest/py3/_pytest/main.py
+++ b/contrib/python/pytest/py3/_pytest/main.py
@@ -1,16 +1,23 @@
-""" core implementation of testing process: init, session, runtest loop. """
+"""Core implementation of the testing process: init, session, runtest loop."""
+import argparse
import fnmatch
import functools
import importlib
import os
import sys
+from pathlib import Path
from typing import Callable
from typing import Dict
from typing import FrozenSet
+from typing import Iterator
from typing import List
from typing import Optional
+from typing import overload
from typing import Sequence
+from typing import Set
from typing import Tuple
+from typing import Type
+from typing import TYPE_CHECKING
from typing import Union
import attr
@@ -18,32 +25,45 @@ import py
import _pytest._code
from _pytest import nodes
-from _pytest.compat import TYPE_CHECKING
+from _pytest.compat import final
from _pytest.config import Config
from _pytest.config import directory_arg
from _pytest.config import ExitCode
from _pytest.config import hookimpl
+from _pytest.config import PytestPluginManager
from _pytest.config import UsageError
+from _pytest.config.argparsing import Parser
from _pytest.fixtures import FixtureManager
from _pytest.outcomes import exit
+from _pytest.pathlib import absolutepath
+from _pytest.pathlib import bestrelpath
+from _pytest.pathlib import visit
from _pytest.reports import CollectReport
+from _pytest.reports import TestReport
from _pytest.runner import collect_one_node
from _pytest.runner import SetupState
if TYPE_CHECKING:
- from typing import Type
from typing_extensions import Literal
- from _pytest.python import Package
-
-def pytest_addoption(parser):
+def pytest_addoption(parser: Parser) -> None:
parser.addini(
"norecursedirs",
"directory patterns to avoid for recursion",
type="args",
- default=[".*", "build", "dist", "CVS", "_darcs", "{arch}", "*.egg", "venv"],
+ default=[
+ "*.egg",
+ ".*",
+ "_darcs",
+ "build",
+ "CVS",
+ "dist",
+ "node_modules",
+ "venv",
+ "{arch}",
+ ],
)
parser.addini(
"testpaths",
@@ -61,6 +81,20 @@ def pytest_addoption(parser):
const=1,
help="exit instantly on first error or failed test.",
)
+ group = parser.getgroup("pytest-warnings")
+ group.addoption(
+ "-W",
+ "--pythonwarnings",
+ action="append",
+ help="set which warnings to report, see -W option of python itself.",
+ )
+ parser.addini(
+ "filterwarnings",
+ type="linelist",
+ help="Each line specifies a pattern for "
+ "warnings.filterwarnings. "
+ "Processed after -W/--pythonwarnings.",
+ )
group._addoption(
"--maxfail",
metavar="num",
@@ -71,12 +105,19 @@ def pytest_addoption(parser):
help="exit after first num failures or errors.",
)
group._addoption(
+ "--strict-config",
+ action="store_true",
+ help="any warnings encountered while parsing the `pytest` section of the configuration file raise errors.",
+ )
+ group._addoption(
"--strict-markers",
- "--strict",
action="store_true",
help="markers not registered in the `markers` section of the configuration file raise errors.",
)
group._addoption(
+ "--strict", action="store_true", help="(deprecated) alias to --strict-markers.",
+ )
+ group._addoption(
"-c",
metavar="file",
type=str,
@@ -161,12 +202,21 @@ def pytest_addoption(parser):
default=False,
help="Don't ignore tests in a local virtualenv directory",
)
+ group.addoption(
+ "--import-mode",
+ default="prepend",
+ choices=["prepend", "append", "importlib"],
+ dest="importmode",
+ help="prepend/append to sys.path when importing test modules and conftest files, "
+ "default is to prepend.",
+ )
group = parser.getgroup("debugconfig", "test session debugging and configuration")
group.addoption(
"--basetemp",
dest="basetemp",
default=None,
+ type=validate_basetemp,
metavar="dir",
help=(
"base temporary directory for this test run."
@@ -175,10 +225,38 @@ def pytest_addoption(parser):
)
+def validate_basetemp(path: str) -> str:
+ # GH 7119
+ msg = "basetemp must not be empty, the current working directory or any parent directory of it"
+
+ # empty path
+ if not path:
+ raise argparse.ArgumentTypeError(msg)
+
+ def is_ancestor(base: Path, query: Path) -> bool:
+ """Return whether query is an ancestor of base."""
+ if base == query:
+ return True
+ for parent in base.parents:
+ if parent == query:
+ return True
+ return False
+
+ # check if path is an ancestor of cwd
+ if is_ancestor(Path.cwd(), Path(path).absolute()):
+ raise argparse.ArgumentTypeError(msg)
+
+ # check symlinks for ancestors
+ if is_ancestor(Path.cwd().resolve(), Path(path).resolve()):
+ raise argparse.ArgumentTypeError(msg)
+
+ return path
+
+
def wrap_session(
config: Config, doit: Callable[[Config, "Session"], Optional[Union[int, ExitCode]]]
) -> Union[int, ExitCode]:
- """Skeleton command line program"""
+ """Skeleton command line program."""
session = Session.from_config(config)
session.exitstatus = ExitCode.OK
initstate = 0
@@ -196,17 +274,15 @@ def wrap_session(
session.exitstatus = ExitCode.TESTS_FAILED
except (KeyboardInterrupt, exit.Exception):
excinfo = _pytest._code.ExceptionInfo.from_current()
- exitstatus = ExitCode.INTERRUPTED # type: Union[int, ExitCode]
+ exitstatus: Union[int, ExitCode] = ExitCode.INTERRUPTED
if isinstance(excinfo.value, exit.Exception):
if excinfo.value.returncode is not None:
exitstatus = excinfo.value.returncode
if initstate < 2:
- sys.stderr.write(
- "{}: {}\n".format(excinfo.typename, excinfo.value.msg)
- )
+ sys.stderr.write(f"{excinfo.typename}: {excinfo.value.msg}\n")
config.hook.pytest_keyboard_interrupt(excinfo=excinfo)
session.exitstatus = exitstatus
- except: # noqa
+ except BaseException:
session.exitstatus = ExitCode.INTERNAL_ERROR
excinfo = _pytest._code.ExceptionInfo.from_current()
try:
@@ -216,7 +292,7 @@ def wrap_session(
session.exitstatus = exc.returncode
sys.stderr.write("{}: {}\n".format(type(exc).__name__, exc))
else:
- if excinfo.errisinstance(SystemExit):
+ if isinstance(excinfo.value, SystemExit):
sys.stderr.write("mainloop: caught unexpected SystemExit!\n")
finally:
@@ -236,13 +312,13 @@ def wrap_session(
return session.exitstatus
-def pytest_cmdline_main(config):
+def pytest_cmdline_main(config: Config) -> Union[int, ExitCode]:
return wrap_session(config, _main)
def _main(config: Config, session: "Session") -> Optional[Union[int, ExitCode]]:
- """ default command line protocol for initialization, session,
- running tests and reporting. """
+ """Default command line protocol for initialization, session,
+ running tests and reporting."""
config.hook.pytest_collection(session=session)
config.hook.pytest_runtestloop(session=session)
@@ -253,11 +329,11 @@ def _main(config: Config, session: "Session") -> Optional[Union[int, ExitCode]]:
return None
-def pytest_collection(session):
- return session.perform_collect()
+def pytest_collection(session: "Session") -> None:
+ session.perform_collect()
-def pytest_runtestloop(session):
+def pytest_runtestloop(session: "Session") -> bool:
if session.testsfailed and not session.config.option.continue_on_collection_errors:
raise session.Interrupted(
"%d error%s during collection"
@@ -277,9 +353,9 @@ def pytest_runtestloop(session):
return True
-def _in_venv(path):
- """Attempts to detect if ``path`` is the root of a Virtual Environment by
- checking for the existence of the appropriate activate script"""
+def _in_venv(path: py.path.local) -> bool:
+ """Attempt to detect if ``path`` is the root of a Virtual Environment by
+ checking for the existence of the appropriate activate script."""
bindir = path.join("Scripts" if sys.platform.startswith("win") else "bin")
if not bindir.isdir():
return False
@@ -294,9 +370,7 @@ def _in_venv(path):
return any([fname.basename in activates for fname in bindir.listdir()])
-def pytest_ignore_collect(
- path: py.path.local, config: Config
-) -> "Optional[Literal[True]]":
+def pytest_ignore_collect(path: py.path.local, config: Config) -> Optional[bool]:
ignore_paths = config._getconftest_pathlist("collect_ignore", path=path.dirpath())
ignore_paths = ignore_paths or []
excludeopt = config.getoption("ignore")
@@ -323,7 +397,7 @@ def pytest_ignore_collect(
return None
-def pytest_collection_modifyitems(items, config):
+def pytest_collection_modifyitems(items: List[nodes.Item], config: Config) -> None:
deselect_prefixes = tuple(config.getoption("deselect") or [])
if not deselect_prefixes:
return
@@ -341,76 +415,69 @@ def pytest_collection_modifyitems(items, config):
items[:] = remaining
-class NoMatch(Exception):
- """ raised if matching cannot locate a matching names. """
+class FSHookProxy:
+ def __init__(self, pm: PytestPluginManager, remove_mods) -> None:
+ self.pm = pm
+ self.remove_mods = remove_mods
+
+ def __getattr__(self, name: str):
+ x = self.pm.subset_hook_caller(name, remove_plugins=self.remove_mods)
+ self.__dict__[name] = x
+ return x
class Interrupted(KeyboardInterrupt):
- """ signals an interrupted test run. """
+ """Signals that the test run was interrupted."""
- __module__ = "builtins" # for py3
+ __module__ = "builtins" # For py3.
class Failed(Exception):
- """ signals a stop as failed test run. """
+ """Signals a stop as failed test run."""
@attr.s
-class _bestrelpath_cache(dict):
- path = attr.ib(type=py.path.local)
+class _bestrelpath_cache(Dict[Path, str]):
+ path = attr.ib(type=Path)
- def __missing__(self, path: py.path.local) -> str:
- r = self.path.bestrelpath(path) # type: str
+ def __missing__(self, path: Path) -> str:
+ r = bestrelpath(self.path, path)
self[path] = r
return r
+@final
class Session(nodes.FSCollector):
Interrupted = Interrupted
Failed = Failed
# Set on the session by runner.pytest_sessionstart.
- _setupstate = None # type: SetupState
+ _setupstate: SetupState
# Set on the session by fixtures.pytest_sessionstart.
- _fixturemanager = None # type: FixtureManager
- exitstatus = None # type: Union[int, ExitCode]
+ _fixturemanager: FixtureManager
+ exitstatus: Union[int, ExitCode]
def __init__(self, config: Config) -> None:
- nodes.FSCollector.__init__(
- self, config.rootdir, parent=None, config=config, session=self, nodeid=""
+ super().__init__(
+ config.rootdir, parent=None, config=config, session=self, nodeid=""
)
self.testsfailed = 0
self.testscollected = 0
- self.shouldstop = False
- self.shouldfail = False
+ self.shouldstop: Union[bool, str] = False
+ self.shouldfail: Union[bool, str] = False
self.trace = config.trace.root.get("collection")
self.startdir = config.invocation_dir
- self._initialpaths = frozenset() # type: FrozenSet[py.path.local]
-
- # Keep track of any collected nodes in here, so we don't duplicate fixtures
- self._collection_node_cache1 = (
- {}
- ) # type: Dict[py.path.local, Sequence[nodes.Collector]]
- self._collection_node_cache2 = (
- {}
- ) # type: Dict[Tuple[Type[nodes.Collector], py.path.local], nodes.Collector]
- self._collection_node_cache3 = (
- {}
- ) # type: Dict[Tuple[Type[nodes.Collector], str], CollectReport]
-
- # Dirnames of pkgs with dunder-init files.
- self._collection_pkg_roots = {} # type: Dict[py.path.local, Package]
+ self._initialpaths: FrozenSet[py.path.local] = frozenset()
- self._bestrelpathcache = _bestrelpath_cache(
- config.rootdir
- ) # type: Dict[py.path.local, str]
+ self._bestrelpathcache: Dict[Path, str] = _bestrelpath_cache(config.rootpath)
self.config.pluginmanager.register(self, name="session")
@classmethod
- def from_config(cls, config):
- return cls._create(config)
+ def from_config(cls, config: Config) -> "Session":
+ session: Session = cls._create(config)
+ return session
- def __repr__(self):
+ def __repr__(self) -> str:
return "<%s %s exitstatus=%r testsfailed=%d testscollected=%d>" % (
self.__class__.__name__,
self.name,
@@ -419,19 +486,21 @@ class Session(nodes.FSCollector):
self.testscollected,
)
- def _node_location_to_relpath(self, node_path: py.path.local) -> str:
- # bestrelpath is a quite slow function
+ def _node_location_to_relpath(self, node_path: Path) -> str:
+ # bestrelpath is a quite slow function.
return self._bestrelpathcache[node_path]
@hookimpl(tryfirst=True)
- def pytest_collectstart(self):
+ def pytest_collectstart(self) -> None:
if self.shouldfail:
raise self.Failed(self.shouldfail)
if self.shouldstop:
raise self.Interrupted(self.shouldstop)
@hookimpl(tryfirst=True)
- def pytest_runtest_logreport(self, report):
+ def pytest_runtest_logreport(
+ self, report: Union[TestReport, CollectReport]
+ ) -> None:
if report.failed and not hasattr(report, "wasxfail"):
self.testsfailed += 1
maxfail = self.config.getvalue("maxfail")
@@ -440,238 +509,296 @@ class Session(nodes.FSCollector):
pytest_collectreport = pytest_runtest_logreport
- def isinitpath(self, path):
+ def isinitpath(self, path: py.path.local) -> bool:
return path in self._initialpaths
def gethookproxy(self, fspath: py.path.local):
- return super()._gethookproxy(fspath)
+ # Check if we have the common case of running
+ # hooks with all conftest.py files.
+ pm = self.config.pluginmanager
+ my_conftestmodules = pm._getconftestmodules(
+ fspath, self.config.getoption("importmode")
+ )
+ remove_mods = pm._conftest_plugins.difference(my_conftestmodules)
+ if remove_mods:
+ # One or more conftests are not in use at this fspath.
+ proxy = FSHookProxy(pm, remove_mods)
+ else:
+ # All plugins are active for this fspath.
+ proxy = self.config.hook
+ return proxy
+
+ def _recurse(self, direntry: "os.DirEntry[str]") -> bool:
+ if direntry.name == "__pycache__":
+ return False
+ path = py.path.local(direntry.path)
+ ihook = self.gethookproxy(path.dirpath())
+ if ihook.pytest_ignore_collect(path=path, config=self.config):
+ return False
+ norecursepatterns = self.config.getini("norecursedirs")
+ if any(path.check(fnmatch=pat) for pat in norecursepatterns):
+ return False
+ return True
+
+ def _collectfile(
+ self, path: py.path.local, handle_dupes: bool = True
+ ) -> Sequence[nodes.Collector]:
+ assert (
+ path.isfile()
+ ), "{!r} is not a file (isdir={!r}, exists={!r}, islink={!r})".format(
+ path, path.isdir(), path.exists(), path.islink()
+ )
+ ihook = self.gethookproxy(path)
+ if not self.isinitpath(path):
+ if ihook.pytest_ignore_collect(path=path, config=self.config):
+ return ()
+
+ if handle_dupes:
+ keepduplicates = self.config.getoption("keepduplicates")
+ if not keepduplicates:
+ duplicate_paths = self.config.pluginmanager._duplicatepaths
+ if path in duplicate_paths:
+ return ()
+ else:
+ duplicate_paths.add(path)
+
+ return ihook.pytest_collect_file(path=path, parent=self) # type: ignore[no-any-return]
+
+ @overload
+ def perform_collect(
+ self, args: Optional[Sequence[str]] = ..., genitems: "Literal[True]" = ...
+ ) -> Sequence[nodes.Item]:
+ ...
+
+ @overload
+ def perform_collect(
+ self, args: Optional[Sequence[str]] = ..., genitems: bool = ...
+ ) -> Sequence[Union[nodes.Item, nodes.Collector]]:
+ ...
+
+ def perform_collect(
+ self, args: Optional[Sequence[str]] = None, genitems: bool = True
+ ) -> Sequence[Union[nodes.Item, nodes.Collector]]:
+ """Perform the collection phase for this session.
+
+ This is called by the default
+ :func:`pytest_collection <_pytest.hookspec.pytest_collection>` hook
+ implementation; see the documentation of this hook for more details.
+ For testing purposes, it may also be called directly on a fresh
+ ``Session``.
+
+ This function normally recursively expands any collectors collected
+ from the session to their items, and only items are returned. For
+ testing purposes, this may be suppressed by passing ``genitems=False``,
+ in which case the return value contains these collectors unexpanded,
+ and ``session.items`` is empty.
+ """
+ if args is None:
+ args = self.config.args
+
+ self.trace("perform_collect", self, args)
+ self.trace.root.indent += 1
+
+ self._notfound: List[Tuple[str, Sequence[nodes.Collector]]] = []
+ self._initial_parts: List[Tuple[py.path.local, List[str]]] = []
+ self.items: List[nodes.Item] = []
- def perform_collect(self, args=None, genitems=True):
hook = self.config.hook
+
+ items: Sequence[Union[nodes.Item, nodes.Collector]] = self.items
try:
- items = self._perform_collect(args, genitems)
+ initialpaths: List[py.path.local] = []
+ for arg in args:
+ fspath, parts = resolve_collection_argument(
+ self.config.invocation_params.dir,
+ arg,
+ as_pypath=self.config.option.pyargs,
+ )
+ self._initial_parts.append((fspath, parts))
+ initialpaths.append(fspath)
+ self._initialpaths = frozenset(initialpaths)
+ rep = collect_one_node(self)
+ self.ihook.pytest_collectreport(report=rep)
+ self.trace.root.indent -= 1
+ if self._notfound:
+ errors = []
+ for arg, cols in self._notfound:
+ line = f"(no name {arg!r} in any of {cols!r})"
+ errors.append(f"not found: {arg}\n{line}")
+ raise UsageError(*errors)
+ if not genitems:
+ items = rep.result
+ else:
+ if rep.passed:
+ for node in rep.result:
+ self.items.extend(self.genitems(node))
+
self.config.pluginmanager.check_pending()
hook.pytest_collection_modifyitems(
session=self, config=self.config, items=items
)
finally:
hook.pytest_collection_finish(session=self)
+
self.testscollected = len(items)
return items
- def _perform_collect(self, args, genitems):
- if args is None:
- args = self.config.args
- self.trace("perform_collect", self, args)
- self.trace.root.indent += 1
- self._notfound = []
- initialpaths = [] # type: List[py.path.local]
- self._initial_parts = [] # type: List[Tuple[py.path.local, List[str]]]
- self.items = items = []
- for arg in args:
- fspath, parts = self._parsearg(arg)
- self._initial_parts.append((fspath, parts))
- initialpaths.append(fspath)
- self._initialpaths = frozenset(initialpaths)
- rep = collect_one_node(self)
- self.ihook.pytest_collectreport(report=rep)
- self.trace.root.indent -= 1
- if self._notfound:
- errors = []
- for arg, exc in self._notfound:
- line = "(no name {!r} in any of {!r})".format(arg, exc.args[0])
- errors.append("not found: {}\n{}".format(arg, line))
- raise UsageError(*errors)
- if not genitems:
- return rep.result
- else:
- if rep.passed:
- for node in rep.result:
- self.items.extend(self.genitems(node))
- return items
+ def collect(self) -> Iterator[Union[nodes.Item, nodes.Collector]]:
+ from _pytest.python import Package
- def collect(self):
- for fspath, parts in self._initial_parts:
- self.trace("processing argument", (fspath, parts))
- self.trace.root.indent += 1
- try:
- yield from self._collect(fspath, parts)
- except NoMatch as exc:
- report_arg = "::".join((str(fspath), *parts))
- # we are inside a make_report hook so
- # we cannot directly pass through the exception
- self._notfound.append((report_arg, exc))
+ # Keep track of any collected nodes in here, so we don't duplicate fixtures.
+ node_cache1: Dict[py.path.local, Sequence[nodes.Collector]] = {}
+ node_cache2: Dict[
+ Tuple[Type[nodes.Collector], py.path.local], nodes.Collector
+ ] = ({})
- self.trace.root.indent -= 1
- self._collection_node_cache1.clear()
- self._collection_node_cache2.clear()
- self._collection_node_cache3.clear()
- self._collection_pkg_roots.clear()
+ # Keep track of any collected collectors in matchnodes paths, so they
+ # are not collected more than once.
+ matchnodes_cache: Dict[Tuple[Type[nodes.Collector], str], CollectReport] = ({})
- def _collect(self, argpath, names):
- from _pytest.python import Package
+ # Dirnames of pkgs with dunder-init files.
+ pkg_roots: Dict[str, Package] = {}
+
+ for argpath, names in self._initial_parts:
+ self.trace("processing argument", (argpath, names))
+ self.trace.root.indent += 1
- # Start with a Session root, and delve to argpath item (dir or file)
- # and stack all Packages found on the way.
- # No point in finding packages when collecting doctests
- if not self.config.getoption("doctestmodules", False):
- pm = self.config.pluginmanager
- for parent in reversed(argpath.parts()):
- if pm._confcutdir and pm._confcutdir.relto(parent):
- break
-
- if parent.isdir():
- pkginit = parent.join("__init__.py")
- if pkginit.isfile():
- if pkginit not in self._collection_node_cache1:
+ # Start with a Session root, and delve to argpath item (dir or file)
+ # and stack all Packages found on the way.
+ # No point in finding packages when collecting doctests.
+ if not self.config.getoption("doctestmodules", False):
+ pm = self.config.pluginmanager
+ for parent in reversed(argpath.parts()):
+ if pm._confcutdir and pm._confcutdir.relto(parent):
+ break
+
+ if parent.isdir():
+ pkginit = parent.join("__init__.py")
+ if pkginit.isfile() and pkginit not in node_cache1:
col = self._collectfile(pkginit, handle_dupes=False)
if col:
if isinstance(col[0], Package):
- self._collection_pkg_roots[parent] = col[0]
- # always store a list in the cache, matchnodes expects it
- self._collection_node_cache1[col[0].fspath] = [col[0]]
-
- # If it's a directory argument, recurse and look for any Subpackages.
- # Let the Package collector deal with subnodes, don't collect here.
- if argpath.check(dir=1):
- assert not names, "invalid arg {!r}".format((argpath, names))
-
- seen_dirs = set()
- for path in argpath.visit(
- fil=self._visit_filter, rec=self._recurse, bf=True, sort=True
- ):
- dirpath = path.dirpath()
- if dirpath not in seen_dirs:
- # Collect packages first.
- seen_dirs.add(dirpath)
- pkginit = dirpath.join("__init__.py")
- if pkginit.exists():
- for x in self._collectfile(pkginit):
+ pkg_roots[str(parent)] = col[0]
+ node_cache1[col[0].fspath] = [col[0]]
+
+ # If it's a directory argument, recurse and look for any Subpackages.
+ # Let the Package collector deal with subnodes, don't collect here.
+ if argpath.check(dir=1):
+ assert not names, "invalid arg {!r}".format((argpath, names))
+
+ seen_dirs: Set[py.path.local] = set()
+ for direntry in visit(str(argpath), self._recurse):
+ if not direntry.is_file():
+ continue
+
+ path = py.path.local(direntry.path)
+ dirpath = path.dirpath()
+
+ if dirpath not in seen_dirs:
+ # Collect packages first.
+ seen_dirs.add(dirpath)
+ pkginit = dirpath.join("__init__.py")
+ if pkginit.exists():
+ for x in self._collectfile(pkginit):
+ yield x
+ if isinstance(x, Package):
+ pkg_roots[str(dirpath)] = x
+ if str(dirpath) in pkg_roots:
+ # Do not collect packages here.
+ continue
+
+ for x in self._collectfile(path):
+ key = (type(x), x.fspath)
+ if key in node_cache2:
+ yield node_cache2[key]
+ else:
+ node_cache2[key] = x
yield x
- if isinstance(x, Package):
- self._collection_pkg_roots[dirpath] = x
- if dirpath in self._collection_pkg_roots:
- # Do not collect packages here.
+ else:
+ assert argpath.check(file=1)
+
+ if argpath in node_cache1:
+ col = node_cache1[argpath]
+ else:
+ collect_root = pkg_roots.get(argpath.dirname, self)
+ col = collect_root._collectfile(argpath, handle_dupes=False)
+ if col:
+ node_cache1[argpath] = col
+
+ matching = []
+ work: List[
+ Tuple[Sequence[Union[nodes.Item, nodes.Collector]], Sequence[str]]
+ ] = [(col, names)]
+ while work:
+ self.trace("matchnodes", col, names)
+ self.trace.root.indent += 1
+
+ matchnodes, matchnames = work.pop()
+ for node in matchnodes:
+ if not matchnames:
+ matching.append(node)
+ continue
+ if not isinstance(node, nodes.Collector):
+ continue
+ key = (type(node), node.nodeid)
+ if key in matchnodes_cache:
+ rep = matchnodes_cache[key]
+ else:
+ rep = collect_one_node(node)
+ matchnodes_cache[key] = rep
+ if rep.passed:
+ submatchnodes = []
+ for r in rep.result:
+ # TODO: Remove parametrized workaround once collection structure contains
+ # parametrization.
+ if (
+ r.name == matchnames[0]
+ or r.name.split("[")[0] == matchnames[0]
+ ):
+ submatchnodes.append(r)
+ if submatchnodes:
+ work.append((submatchnodes, matchnames[1:]))
+ # XXX Accept IDs that don't have "()" for class instances.
+ elif len(rep.result) == 1 and rep.result[0].name == "()":
+ work.append((rep.result, matchnames))
+ else:
+ # Report collection failures here to avoid failing to run some test
+ # specified in the command line because the module could not be
+ # imported (#134).
+ node.ihook.pytest_collectreport(report=rep)
+
+ self.trace("matchnodes finished -> ", len(matching), "nodes")
+ self.trace.root.indent -= 1
+
+ if not matching:
+ report_arg = "::".join((str(argpath), *names))
+ self._notfound.append((report_arg, col))
continue
- for x in self._collectfile(path):
- key = (type(x), x.fspath)
- if key in self._collection_node_cache2:
- yield self._collection_node_cache2[key]
- else:
- self._collection_node_cache2[key] = x
- yield x
- else:
- assert argpath.check(file=1)
+ # If __init__.py was the only file requested, then the matched
+ # node will be the corresponding Package (by default), and the
+ # first yielded item will be the __init__ Module itself, so
+ # just use that. If this special case isn't taken, then all the
+ # files in the package will be yielded.
+ if argpath.basename == "__init__.py" and isinstance(
+ matching[0], Package
+ ):
+ try:
+ yield next(iter(matching[0].collect()))
+ except StopIteration:
+ # The package collects nothing with only an __init__.py
+ # file in it, which gets ignored by the default
+ # "python_files" option.
+ pass
+ continue
- if argpath in self._collection_node_cache1:
- col = self._collection_node_cache1[argpath]
- else:
- collect_root = self._collection_pkg_roots.get(argpath.dirname, self)
- col = collect_root._collectfile(argpath, handle_dupes=False)
- if col:
- self._collection_node_cache1[argpath] = col
- m = self.matchnodes(col, names)
- # If __init__.py was the only file requested, then the matched node will be
- # the corresponding Package, and the first yielded item will be the __init__
- # Module itself, so just use that. If this special case isn't taken, then all
- # the files in the package will be yielded.
- if argpath.basename == "__init__.py":
- try:
- yield next(m[0].collect())
- except StopIteration:
- # The package collects nothing with only an __init__.py
- # file in it, which gets ignored by the default
- # "python_files" option.
- pass
- return
- yield from m
-
- @staticmethod
- def _visit_filter(f):
- return f.check(file=1)
-
- def _tryconvertpyarg(self, x):
- """Convert a dotted module name to path."""
- try:
- spec = importlib.util.find_spec(x)
- # AttributeError: looks like package module, but actually filename
- # ImportError: module does not exist
- # ValueError: not a module name
- except (AttributeError, ImportError, ValueError):
- return x
- if spec is None or spec.origin in {None, "namespace"}:
- return x
- elif spec.submodule_search_locations:
- return os.path.dirname(spec.origin)
- else:
- return spec.origin
-
- def _parsearg(self, arg):
- """ return (fspath, names) tuple after checking the file exists. """
- strpath, *parts = str(arg).split("::")
- if self.config.option.pyargs:
- strpath = self._tryconvertpyarg(strpath)
- relpath = strpath.replace("/", os.sep)
- fspath = self.config.invocation_dir.join(relpath, abs=True)
- if not fspath.check():
- if self.config.option.pyargs:
- raise UsageError(
- "file or package not found: " + arg + " (missing __init__.py?)"
- )
- raise UsageError("file not found: " + arg)
- fspath = fspath.realpath()
- return (fspath, parts)
+ yield from matching
- def matchnodes(self, matching, names):
- self.trace("matchnodes", matching, names)
- self.trace.root.indent += 1
- nodes = self._matchnodes(matching, names)
- num = len(nodes)
- self.trace("matchnodes finished -> ", num, "nodes")
- self.trace.root.indent -= 1
- if num == 0:
- raise NoMatch(matching, names[:1])
- return nodes
-
- def _matchnodes(self, matching, names):
- if not matching or not names:
- return matching
- name = names[0]
- assert name
- nextnames = names[1:]
- resultnodes = []
- for node in matching:
- if isinstance(node, nodes.Item):
- if not names:
- resultnodes.append(node)
- continue
- assert isinstance(node, nodes.Collector)
- key = (type(node), node.nodeid)
- if key in self._collection_node_cache3:
- rep = self._collection_node_cache3[key]
- else:
- rep = collect_one_node(node)
- self._collection_node_cache3[key] = rep
- if rep.passed:
- has_matched = False
- for x in rep.result:
- # TODO: remove parametrized workaround once collection structure contains parametrization
- if x.name == name or x.name.split("[")[0] == name:
- resultnodes.extend(self.matchnodes([x], nextnames))
- has_matched = True
- # XXX accept IDs that don't have "()" for class instances
- if not has_matched and len(rep.result) == 1 and x.name == "()":
- nextnames.insert(0, name)
- resultnodes.extend(self.matchnodes([x], nextnames))
- else:
- # report collection failures here to avoid failing to run some test
- # specified in the command line because the module could not be
- # imported (#134)
- node.ihook.pytest_collectreport(report=rep)
- return resultnodes
+ self.trace.root.indent -= 1
- def genitems(self, node):
+ def genitems(
+ self, node: Union[nodes.Item, nodes.Collector]
+ ) -> Iterator[nodes.Item]:
self.trace("genitems", node)
if isinstance(node, nodes.Item):
node.ihook.pytest_itemcollected(item=node)
@@ -683,3 +810,67 @@ class Session(nodes.FSCollector):
for subnode in rep.result:
yield from self.genitems(subnode)
node.ihook.pytest_collectreport(report=rep)
+
+
+def search_pypath(module_name: str) -> str:
+ """Search sys.path for the given a dotted module name, and return its file system path."""
+ try:
+ spec = importlib.util.find_spec(module_name)
+ # AttributeError: looks like package module, but actually filename
+ # ImportError: module does not exist
+ # ValueError: not a module name
+ except (AttributeError, ImportError, ValueError):
+ return module_name
+ if spec is None or spec.origin is None or spec.origin == "namespace":
+ return module_name
+ elif spec.submodule_search_locations:
+ return os.path.dirname(spec.origin)
+ else:
+ return spec.origin
+
+
+def resolve_collection_argument(
+ invocation_path: Path, arg: str, *, as_pypath: bool = False
+) -> Tuple[py.path.local, List[str]]:
+ """Parse path arguments optionally containing selection parts and return (fspath, names).
+
+ Command-line arguments can point to files and/or directories, and optionally contain
+ parts for specific tests selection, for example:
+
+ "pkg/tests/test_foo.py::TestClass::test_foo"
+
+ This function ensures the path exists, and returns a tuple:
+
+ (py.path.path("/full/path/to/pkg/tests/test_foo.py"), ["TestClass", "test_foo"])
+
+ When as_pypath is True, expects that the command-line argument actually contains
+ module paths instead of file-system paths:
+
+ "pkg.tests.test_foo::TestClass::test_foo"
+
+ In which case we search sys.path for a matching module, and then return the *path* to the
+ found module.
+
+ If the path doesn't exist, raise UsageError.
+ If the path is a directory and selection parts are present, raise UsageError.
+ """
+ strpath, *parts = str(arg).split("::")
+ if as_pypath:
+ strpath = search_pypath(strpath)
+ fspath = invocation_path / strpath
+ fspath = absolutepath(fspath)
+ if not fspath.exists():
+ msg = (
+ "module or package not found: {arg} (missing __init__.py?)"
+ if as_pypath
+ else "file or directory not found: {arg}"
+ )
+ raise UsageError(msg.format(arg=arg))
+ if parts and fspath.is_dir():
+ msg = (
+ "package argument cannot contain :: selection parts: {arg}"
+ if as_pypath
+ else "directory argument cannot contain :: selection parts: {arg}"
+ )
+ raise UsageError(msg.format(arg=arg))
+ return py.path.local(str(fspath)), parts