aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/pytest/py3/_pytest/main.py
diff options
context:
space:
mode:
authorshadchin <shadchin@yandex-team.ru>2022-02-10 16:44:39 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:44:39 +0300
commite9656aae26e0358d5378e5b63dcac5c8dbe0e4d0 (patch)
tree64175d5cadab313b3e7039ebaa06c5bc3295e274 /contrib/python/pytest/py3/_pytest/main.py
parent2598ef1d0aee359b4b6d5fdd1758916d5907d04f (diff)
downloadydb-e9656aae26e0358d5378e5b63dcac5c8dbe0e4d0.tar.gz
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/main.py')
-rw-r--r--contrib/python/pytest/py3/_pytest/main.py1148
1 files changed, 574 insertions, 574 deletions
diff --git a/contrib/python/pytest/py3/_pytest/main.py b/contrib/python/pytest/py3/_pytest/main.py
index 1d76ea62bf..41a33d4494 100644
--- a/contrib/python/pytest/py3/_pytest/main.py
+++ b/contrib/python/pytest/py3/_pytest/main.py
@@ -1,69 +1,69 @@
-"""Core implementation of the testing process: init, session, runtest loop."""
-import argparse
-import fnmatch
+"""Core implementation of the testing process: init, session, runtest loop."""
+import argparse
+import fnmatch
import functools
-import importlib
+import importlib
import os
import sys
-from pathlib import Path
-from typing import Callable
-from typing import Dict
-from typing import FrozenSet
-from typing import Iterator
-from typing import List
-from typing import Optional
-from typing import overload
-from typing import Sequence
-from typing import Set
-from typing import Tuple
-from typing import Type
-from typing import TYPE_CHECKING
-from typing import Union
+from pathlib import Path
+from typing import Callable
+from typing import Dict
+from typing import FrozenSet
+from typing import Iterator
+from typing import List
+from typing import Optional
+from typing import overload
+from typing import Sequence
+from typing import Set
+from typing import Tuple
+from typing import Type
+from typing import TYPE_CHECKING
+from typing import Union
import attr
import py
import _pytest._code
from _pytest import nodes
-from _pytest.compat import final
-from _pytest.config import Config
+from _pytest.compat import final
+from _pytest.config import Config
from _pytest.config import directory_arg
-from _pytest.config import ExitCode
+from _pytest.config import ExitCode
from _pytest.config import hookimpl
-from _pytest.config import PytestPluginManager
+from _pytest.config import PytestPluginManager
from _pytest.config import UsageError
-from _pytest.config.argparsing import Parser
-from _pytest.fixtures import FixtureManager
+from _pytest.config.argparsing import Parser
+from _pytest.fixtures import FixtureManager
from _pytest.outcomes import exit
-from _pytest.pathlib import absolutepath
-from _pytest.pathlib import bestrelpath
-from _pytest.pathlib import visit
-from _pytest.reports import CollectReport
-from _pytest.reports import TestReport
+from _pytest.pathlib import absolutepath
+from _pytest.pathlib import bestrelpath
+from _pytest.pathlib import visit
+from _pytest.reports import CollectReport
+from _pytest.reports import TestReport
from _pytest.runner import collect_one_node
-from _pytest.runner import SetupState
+from _pytest.runner import SetupState
-if TYPE_CHECKING:
- from typing_extensions import Literal
+if TYPE_CHECKING:
+ from typing_extensions import Literal
-
-def pytest_addoption(parser: Parser) -> None:
+
+def pytest_addoption(parser: Parser) -> None:
parser.addini(
"norecursedirs",
"directory patterns to avoid for recursion",
type="args",
- default=[
- "*.egg",
- ".*",
- "_darcs",
- "build",
- "CVS",
- "dist",
- "node_modules",
- "venv",
- "{arch}",
- ],
+ default=[
+ "*.egg",
+ ".*",
+ "_darcs",
+ "build",
+ "CVS",
+ "dist",
+ "node_modules",
+ "venv",
+ "{arch}",
+ ],
)
parser.addini(
"testpaths",
@@ -80,21 +80,21 @@ def pytest_addoption(parser: Parser) -> None:
dest="maxfail",
const=1,
help="exit instantly on first error or failed test.",
- )
- group = parser.getgroup("pytest-warnings")
- group.addoption(
- "-W",
- "--pythonwarnings",
- action="append",
- help="set which warnings to report, see -W option of python itself.",
- )
- parser.addini(
- "filterwarnings",
- type="linelist",
- help="Each line specifies a pattern for "
- "warnings.filterwarnings. "
- "Processed after -W/--pythonwarnings.",
- )
+ )
+ group = parser.getgroup("pytest-warnings")
+ group.addoption(
+ "-W",
+ "--pythonwarnings",
+ action="append",
+ help="set which warnings to report, see -W option of python itself.",
+ )
+ parser.addini(
+ "filterwarnings",
+ type="linelist",
+ help="Each line specifies a pattern for "
+ "warnings.filterwarnings. "
+ "Processed after -W/--pythonwarnings.",
+ )
group._addoption(
"--maxfail",
metavar="num",
@@ -105,19 +105,19 @@ def pytest_addoption(parser: Parser) -> None:
help="exit after first num failures or errors.",
)
group._addoption(
- "--strict-config",
- action="store_true",
- help="any warnings encountered while parsing the `pytest` section of the configuration file raise errors.",
- )
- group._addoption(
- "--strict-markers",
+ "--strict-config",
action="store_true",
- help="markers not registered in the `markers` section of the configuration file raise errors.",
+ help="any warnings encountered while parsing the `pytest` section of the configuration file raise errors.",
+ )
+ group._addoption(
+ "--strict-markers",
+ action="store_true",
+ help="markers not registered in the `markers` section of the configuration file raise errors.",
+ )
+ group._addoption(
+ "--strict", action="store_true", help="(deprecated) alias to --strict-markers.",
)
group._addoption(
- "--strict", action="store_true", help="(deprecated) alias to --strict-markers.",
- )
- group._addoption(
"-c",
metavar="file",
type=str,
@@ -145,10 +145,10 @@ def pytest_addoption(parser: Parser) -> None:
group.addoption(
"--collectonly",
"--collect-only",
- "--co",
+ "--co",
action="store_true",
help="only collect tests, don't execute them.",
- )
+ )
group.addoption(
"--pyargs",
action="store_true",
@@ -161,16 +161,16 @@ def pytest_addoption(parser: Parser) -> None:
help="ignore path during collection (multi-allowed).",
)
group.addoption(
- "--ignore-glob",
- action="append",
- metavar="path",
- help="ignore path pattern during collection (multi-allowed).",
- )
- group.addoption(
+ "--ignore-glob",
+ action="append",
+ metavar="path",
+ help="ignore path pattern during collection (multi-allowed).",
+ )
+ group.addoption(
"--deselect",
action="append",
metavar="nodeid_prefix",
- help="deselect item (via node id prefix) during collection (multi-allowed).",
+ help="deselect item (via node id prefix) during collection (multi-allowed).",
)
group.addoption(
"--confcutdir",
@@ -202,21 +202,21 @@ def pytest_addoption(parser: Parser) -> None:
default=False,
help="Don't ignore tests in a local virtualenv directory",
)
- group.addoption(
- "--import-mode",
- default="prepend",
- choices=["prepend", "append", "importlib"],
- dest="importmode",
- help="prepend/append to sys.path when importing test modules and conftest files, "
- "default is to prepend.",
- )
+ group.addoption(
+ "--import-mode",
+ default="prepend",
+ choices=["prepend", "append", "importlib"],
+ dest="importmode",
+ help="prepend/append to sys.path when importing test modules and conftest files, "
+ "default is to prepend.",
+ )
group = parser.getgroup("debugconfig", "test session debugging and configuration")
group.addoption(
"--basetemp",
dest="basetemp",
default=None,
- type=validate_basetemp,
+ type=validate_basetemp,
metavar="dir",
help=(
"base temporary directory for this test run."
@@ -225,40 +225,40 @@ def pytest_addoption(parser: Parser) -> None:
)
-def validate_basetemp(path: str) -> str:
- # GH 7119
- msg = "basetemp must not be empty, the current working directory or any parent directory of it"
-
- # empty path
- if not path:
- raise argparse.ArgumentTypeError(msg)
-
- def is_ancestor(base: Path, query: Path) -> bool:
- """Return whether query is an ancestor of base."""
- if base == query:
- return True
- for parent in base.parents:
- if parent == query:
- return True
- return False
-
- # check if path is an ancestor of cwd
- if is_ancestor(Path.cwd(), Path(path).absolute()):
- raise argparse.ArgumentTypeError(msg)
-
- # check symlinks for ancestors
- if is_ancestor(Path.cwd().resolve(), Path(path).resolve()):
- raise argparse.ArgumentTypeError(msg)
-
- return path
-
-
-def wrap_session(
- config: Config, doit: Callable[[Config, "Session"], Optional[Union[int, ExitCode]]]
-) -> Union[int, ExitCode]:
- """Skeleton command line program."""
- session = Session.from_config(config)
- session.exitstatus = ExitCode.OK
+def validate_basetemp(path: str) -> str:
+ # GH 7119
+ msg = "basetemp must not be empty, the current working directory or any parent directory of it"
+
+ # empty path
+ if not path:
+ raise argparse.ArgumentTypeError(msg)
+
+ def is_ancestor(base: Path, query: Path) -> bool:
+ """Return whether query is an ancestor of base."""
+ if base == query:
+ return True
+ for parent in base.parents:
+ if parent == query:
+ return True
+ return False
+
+ # check if path is an ancestor of cwd
+ if is_ancestor(Path.cwd(), Path(path).absolute()):
+ raise argparse.ArgumentTypeError(msg)
+
+ # check symlinks for ancestors
+ if is_ancestor(Path.cwd().resolve(), Path(path).resolve()):
+ raise argparse.ArgumentTypeError(msg)
+
+ return path
+
+
+def wrap_session(
+ config: Config, doit: Callable[[Config, "Session"], Optional[Union[int, ExitCode]]]
+) -> Union[int, ExitCode]:
+ """Skeleton command line program."""
+ session = Session.from_config(config)
+ session.exitstatus = ExitCode.OK
initstate = 0
try:
try:
@@ -268,77 +268,77 @@ def wrap_session(
initstate = 2
session.exitstatus = doit(config, session) or 0
except UsageError:
- session.exitstatus = ExitCode.USAGE_ERROR
+ session.exitstatus = ExitCode.USAGE_ERROR
raise
except Failed:
- session.exitstatus = ExitCode.TESTS_FAILED
- except (KeyboardInterrupt, exit.Exception):
- excinfo = _pytest._code.ExceptionInfo.from_current()
- exitstatus: Union[int, ExitCode] = ExitCode.INTERRUPTED
- if isinstance(excinfo.value, exit.Exception):
+ session.exitstatus = ExitCode.TESTS_FAILED
+ except (KeyboardInterrupt, exit.Exception):
+ excinfo = _pytest._code.ExceptionInfo.from_current()
+ exitstatus: Union[int, ExitCode] = ExitCode.INTERRUPTED
+ if isinstance(excinfo.value, exit.Exception):
if excinfo.value.returncode is not None:
exitstatus = excinfo.value.returncode
- if initstate < 2:
- sys.stderr.write(f"{excinfo.typename}: {excinfo.value.msg}\n")
+ if initstate < 2:
+ sys.stderr.write(f"{excinfo.typename}: {excinfo.value.msg}\n")
config.hook.pytest_keyboard_interrupt(excinfo=excinfo)
session.exitstatus = exitstatus
- except BaseException:
- session.exitstatus = ExitCode.INTERNAL_ERROR
- excinfo = _pytest._code.ExceptionInfo.from_current()
- try:
- config.notify_exception(excinfo, config.option)
- except exit.Exception as exc:
- if exc.returncode is not None:
- session.exitstatus = exc.returncode
- sys.stderr.write("{}: {}\n".format(type(exc).__name__, exc))
- else:
- if isinstance(excinfo.value, SystemExit):
- sys.stderr.write("mainloop: caught unexpected SystemExit!\n")
+ except BaseException:
+ session.exitstatus = ExitCode.INTERNAL_ERROR
+ excinfo = _pytest._code.ExceptionInfo.from_current()
+ try:
+ config.notify_exception(excinfo, config.option)
+ except exit.Exception as exc:
+ if exc.returncode is not None:
+ session.exitstatus = exc.returncode
+ sys.stderr.write("{}: {}\n".format(type(exc).__name__, exc))
+ else:
+ if isinstance(excinfo.value, SystemExit):
+ sys.stderr.write("mainloop: caught unexpected SystemExit!\n")
finally:
- # Explicitly break reference cycle.
- excinfo = None # type: ignore
+ # Explicitly break reference cycle.
+ excinfo = None # type: ignore
session.startdir.chdir()
if initstate >= 2:
- try:
- config.hook.pytest_sessionfinish(
- session=session, exitstatus=session.exitstatus
- )
- except exit.Exception as exc:
- if exc.returncode is not None:
- session.exitstatus = exc.returncode
- sys.stderr.write("{}: {}\n".format(type(exc).__name__, exc))
+ try:
+ config.hook.pytest_sessionfinish(
+ session=session, exitstatus=session.exitstatus
+ )
+ except exit.Exception as exc:
+ if exc.returncode is not None:
+ session.exitstatus = exc.returncode
+ sys.stderr.write("{}: {}\n".format(type(exc).__name__, exc))
config._ensure_unconfigure()
return session.exitstatus
-def pytest_cmdline_main(config: Config) -> Union[int, ExitCode]:
+def pytest_cmdline_main(config: Config) -> Union[int, ExitCode]:
return wrap_session(config, _main)
-def _main(config: Config, session: "Session") -> Optional[Union[int, ExitCode]]:
- """Default command line protocol for initialization, session,
- running tests and reporting."""
+def _main(config: Config, session: "Session") -> Optional[Union[int, ExitCode]]:
+ """Default command line protocol for initialization, session,
+ running tests and reporting."""
config.hook.pytest_collection(session=session)
config.hook.pytest_runtestloop(session=session)
if session.testsfailed:
- return ExitCode.TESTS_FAILED
+ return ExitCode.TESTS_FAILED
elif session.testscollected == 0:
- return ExitCode.NO_TESTS_COLLECTED
- return None
+ return ExitCode.NO_TESTS_COLLECTED
+ return None
-def pytest_collection(session: "Session") -> None:
- session.perform_collect()
+def pytest_collection(session: "Session") -> None:
+ session.perform_collect()
-def pytest_runtestloop(session: "Session") -> bool:
+def pytest_runtestloop(session: "Session") -> bool:
if session.testsfailed and not session.config.option.continue_on_collection_errors:
- raise session.Interrupted(
- "%d error%s during collection"
- % (session.testsfailed, "s" if session.testsfailed != 1 else "")
- )
+ raise session.Interrupted(
+ "%d error%s during collection"
+ % (session.testsfailed, "s" if session.testsfailed != 1 else "")
+ )
if session.config.option.collectonly:
return True
@@ -353,9 +353,9 @@ def pytest_runtestloop(session: "Session") -> bool:
return True
-def _in_venv(path: py.path.local) -> bool:
- """Attempt to detect if ``path`` is the root of a Virtual Environment by
- checking for the existence of the appropriate activate script."""
+def _in_venv(path: py.path.local) -> bool:
+ """Attempt to detect if ``path`` is the root of a Virtual Environment by
+ checking for the existence of the appropriate activate script."""
bindir = path.join("Scripts" if sys.platform.startswith("win") else "bin")
if not bindir.isdir():
return False
@@ -370,7 +370,7 @@ def _in_venv(path: py.path.local) -> bool:
return any([fname.basename in activates for fname in bindir.listdir()])
-def pytest_ignore_collect(path: py.path.local, config: Config) -> Optional[bool]:
+def pytest_ignore_collect(path: py.path.local, config: Config) -> Optional[bool]:
ignore_paths = config._getconftest_pathlist("collect_ignore", path=path.dirpath())
ignore_paths = ignore_paths or []
excludeopt = config.getoption("ignore")
@@ -380,24 +380,24 @@ def pytest_ignore_collect(path: py.path.local, config: Config) -> Optional[bool]
if py.path.local(path) in ignore_paths:
return True
- ignore_globs = config._getconftest_pathlist(
- "collect_ignore_glob", path=path.dirpath()
- )
- ignore_globs = ignore_globs or []
- excludeglobopt = config.getoption("ignore_glob")
- if excludeglobopt:
- ignore_globs.extend([py.path.local(x) for x in excludeglobopt])
-
- if any(fnmatch.fnmatch(str(path), str(glob)) for glob in ignore_globs):
- return True
-
+ ignore_globs = config._getconftest_pathlist(
+ "collect_ignore_glob", path=path.dirpath()
+ )
+ ignore_globs = ignore_globs or []
+ excludeglobopt = config.getoption("ignore_glob")
+ if excludeglobopt:
+ ignore_globs.extend([py.path.local(x) for x in excludeglobopt])
+
+ if any(fnmatch.fnmatch(str(path), str(glob)) for glob in ignore_globs):
+ return True
+
allow_in_venv = config.getoption("collect_in_virtualenv")
if not allow_in_venv and _in_venv(path):
return True
- return None
+ return None
-def pytest_collection_modifyitems(items: List[nodes.Item], config: Config) -> None:
+def pytest_collection_modifyitems(items: List[nodes.Item], config: Config) -> None:
deselect_prefixes = tuple(config.getoption("deselect") or [])
if not deselect_prefixes:
return
@@ -415,92 +415,92 @@ def pytest_collection_modifyitems(items: List[nodes.Item], config: Config) -> No
items[:] = remaining
-class FSHookProxy:
- def __init__(self, pm: PytestPluginManager, remove_mods) -> None:
- self.pm = pm
- self.remove_mods = remove_mods
+class FSHookProxy:
+ def __init__(self, pm: PytestPluginManager, remove_mods) -> None:
+ self.pm = pm
+ self.remove_mods = remove_mods
+
+ def __getattr__(self, name: str):
+ x = self.pm.subset_hook_caller(name, remove_plugins=self.remove_mods)
+ self.__dict__[name] = x
+ return x
- def __getattr__(self, name: str):
- x = self.pm.subset_hook_caller(name, remove_plugins=self.remove_mods)
- self.__dict__[name] = x
- return x
-
class Interrupted(KeyboardInterrupt):
- """Signals that the test run was interrupted."""
+ """Signals that the test run was interrupted."""
- __module__ = "builtins" # For py3.
+ __module__ = "builtins" # For py3.
class Failed(Exception):
- """Signals a stop as failed test run."""
+ """Signals a stop as failed test run."""
@attr.s
-class _bestrelpath_cache(Dict[Path, str]):
- path = attr.ib(type=Path)
+class _bestrelpath_cache(Dict[Path, str]):
+ path = attr.ib(type=Path)
- def __missing__(self, path: Path) -> str:
- r = bestrelpath(self.path, path)
+ def __missing__(self, path: Path) -> str:
+ r = bestrelpath(self.path, path)
self[path] = r
return r
-@final
+@final
class Session(nodes.FSCollector):
Interrupted = Interrupted
Failed = Failed
- # Set on the session by runner.pytest_sessionstart.
- _setupstate: SetupState
- # Set on the session by fixtures.pytest_sessionstart.
- _fixturemanager: FixtureManager
- exitstatus: Union[int, ExitCode]
-
- def __init__(self, config: Config) -> None:
- super().__init__(
- config.rootdir, parent=None, config=config, session=self, nodeid=""
+ # Set on the session by runner.pytest_sessionstart.
+ _setupstate: SetupState
+ # Set on the session by fixtures.pytest_sessionstart.
+ _fixturemanager: FixtureManager
+ exitstatus: Union[int, ExitCode]
+
+ def __init__(self, config: Config) -> None:
+ super().__init__(
+ config.rootdir, parent=None, config=config, session=self, nodeid=""
)
self.testsfailed = 0
self.testscollected = 0
- self.shouldstop: Union[bool, str] = False
- self.shouldfail: Union[bool, str] = False
+ self.shouldstop: Union[bool, str] = False
+ self.shouldfail: Union[bool, str] = False
self.trace = config.trace.root.get("collection")
- self.startdir = config.invocation_dir
- self._initialpaths: FrozenSet[py.path.local] = frozenset()
-
- self._bestrelpathcache: Dict[Path, str] = _bestrelpath_cache(config.rootpath)
-
+ self.startdir = config.invocation_dir
+ self._initialpaths: FrozenSet[py.path.local] = frozenset()
+
+ self._bestrelpathcache: Dict[Path, str] = _bestrelpath_cache(config.rootpath)
+
self.config.pluginmanager.register(self, name="session")
- @classmethod
- def from_config(cls, config: Config) -> "Session":
- session: Session = cls._create(config)
- return session
-
- def __repr__(self) -> str:
- return "<%s %s exitstatus=%r testsfailed=%d testscollected=%d>" % (
- self.__class__.__name__,
- self.name,
- getattr(self, "exitstatus", "<UNSET>"),
- self.testsfailed,
- self.testscollected,
- )
-
- def _node_location_to_relpath(self, node_path: Path) -> str:
- # bestrelpath is a quite slow function.
+ @classmethod
+ def from_config(cls, config: Config) -> "Session":
+ session: Session = cls._create(config)
+ return session
+
+ def __repr__(self) -> str:
+ return "<%s %s exitstatus=%r testsfailed=%d testscollected=%d>" % (
+ self.__class__.__name__,
+ self.name,
+ getattr(self, "exitstatus", "<UNSET>"),
+ self.testsfailed,
+ self.testscollected,
+ )
+
+ def _node_location_to_relpath(self, node_path: Path) -> str:
+ # bestrelpath is a quite slow function.
return self._bestrelpathcache[node_path]
@hookimpl(tryfirst=True)
- def pytest_collectstart(self) -> None:
+ def pytest_collectstart(self) -> None:
if self.shouldfail:
raise self.Failed(self.shouldfail)
if self.shouldstop:
raise self.Interrupted(self.shouldstop)
@hookimpl(tryfirst=True)
- def pytest_runtest_logreport(
- self, report: Union[TestReport, CollectReport]
- ) -> None:
+ def pytest_runtest_logreport(
+ self, report: Union[TestReport, CollectReport]
+ ) -> None:
if report.failed and not hasattr(report, "wasxfail"):
self.testsfailed += 1
maxfail = self.config.getvalue("maxfail")
@@ -509,296 +509,296 @@ class Session(nodes.FSCollector):
pytest_collectreport = pytest_runtest_logreport
- def isinitpath(self, path: py.path.local) -> bool:
+ def isinitpath(self, path: py.path.local) -> bool:
return path in self._initialpaths
- def gethookproxy(self, fspath: py.path.local):
- # Check if we have the common case of running
- # hooks with all conftest.py files.
- pm = self.config.pluginmanager
- my_conftestmodules = pm._getconftestmodules(
- fspath, self.config.getoption("importmode")
- )
- remove_mods = pm._conftest_plugins.difference(my_conftestmodules)
- if remove_mods:
- # One or more conftests are not in use at this fspath.
- proxy = FSHookProxy(pm, remove_mods)
- else:
- # All plugins are active for this fspath.
- proxy = self.config.hook
- return proxy
-
- def _recurse(self, direntry: "os.DirEntry[str]") -> bool:
- if direntry.name == "__pycache__":
- return False
- path = py.path.local(direntry.path)
- ihook = self.gethookproxy(path.dirpath())
- if ihook.pytest_ignore_collect(path=path, config=self.config):
- return False
- norecursepatterns = self.config.getini("norecursedirs")
- if any(path.check(fnmatch=pat) for pat in norecursepatterns):
- return False
- return True
-
- def _collectfile(
- self, path: py.path.local, handle_dupes: bool = True
- ) -> Sequence[nodes.Collector]:
- assert (
- path.isfile()
- ), "{!r} is not a file (isdir={!r}, exists={!r}, islink={!r})".format(
- path, path.isdir(), path.exists(), path.islink()
- )
- ihook = self.gethookproxy(path)
- if not self.isinitpath(path):
- if ihook.pytest_ignore_collect(path=path, config=self.config):
- return ()
-
- if handle_dupes:
- keepduplicates = self.config.getoption("keepduplicates")
- if not keepduplicates:
- duplicate_paths = self.config.pluginmanager._duplicatepaths
- if path in duplicate_paths:
- return ()
- else:
- duplicate_paths.add(path)
-
- return ihook.pytest_collect_file(path=path, parent=self) # type: ignore[no-any-return]
-
- @overload
- def perform_collect(
- self, args: Optional[Sequence[str]] = ..., genitems: "Literal[True]" = ...
- ) -> Sequence[nodes.Item]:
- ...
-
- @overload
- def perform_collect(
- self, args: Optional[Sequence[str]] = ..., genitems: bool = ...
- ) -> Sequence[Union[nodes.Item, nodes.Collector]]:
- ...
-
- def perform_collect(
- self, args: Optional[Sequence[str]] = None, genitems: bool = True
- ) -> Sequence[Union[nodes.Item, nodes.Collector]]:
- """Perform the collection phase for this session.
-
- This is called by the default
- :func:`pytest_collection <_pytest.hookspec.pytest_collection>` hook
- implementation; see the documentation of this hook for more details.
- For testing purposes, it may also be called directly on a fresh
- ``Session``.
-
- This function normally recursively expands any collectors collected
- from the session to their items, and only items are returned. For
- testing purposes, this may be suppressed by passing ``genitems=False``,
- in which case the return value contains these collectors unexpanded,
- and ``session.items`` is empty.
- """
- if args is None:
- args = self.config.args
-
- self.trace("perform_collect", self, args)
- self.trace.root.indent += 1
-
- self._notfound: List[Tuple[str, Sequence[nodes.Collector]]] = []
- self._initial_parts: List[Tuple[py.path.local, List[str]]] = []
- self.items: List[nodes.Item] = []
-
+ def gethookproxy(self, fspath: py.path.local):
+ # Check if we have the common case of running
+ # hooks with all conftest.py files.
+ pm = self.config.pluginmanager
+ my_conftestmodules = pm._getconftestmodules(
+ fspath, self.config.getoption("importmode")
+ )
+ remove_mods = pm._conftest_plugins.difference(my_conftestmodules)
+ if remove_mods:
+ # One or more conftests are not in use at this fspath.
+ proxy = FSHookProxy(pm, remove_mods)
+ else:
+ # All plugins are active for this fspath.
+ proxy = self.config.hook
+ return proxy
+
+ def _recurse(self, direntry: "os.DirEntry[str]") -> bool:
+ if direntry.name == "__pycache__":
+ return False
+ path = py.path.local(direntry.path)
+ ihook = self.gethookproxy(path.dirpath())
+ if ihook.pytest_ignore_collect(path=path, config=self.config):
+ return False
+ norecursepatterns = self.config.getini("norecursedirs")
+ if any(path.check(fnmatch=pat) for pat in norecursepatterns):
+ return False
+ return True
+
+ def _collectfile(
+ self, path: py.path.local, handle_dupes: bool = True
+ ) -> Sequence[nodes.Collector]:
+ assert (
+ path.isfile()
+ ), "{!r} is not a file (isdir={!r}, exists={!r}, islink={!r})".format(
+ path, path.isdir(), path.exists(), path.islink()
+ )
+ ihook = self.gethookproxy(path)
+ if not self.isinitpath(path):
+ if ihook.pytest_ignore_collect(path=path, config=self.config):
+ return ()
+
+ if handle_dupes:
+ keepduplicates = self.config.getoption("keepduplicates")
+ if not keepduplicates:
+ duplicate_paths = self.config.pluginmanager._duplicatepaths
+ if path in duplicate_paths:
+ return ()
+ else:
+ duplicate_paths.add(path)
+
+ return ihook.pytest_collect_file(path=path, parent=self) # type: ignore[no-any-return]
+
+ @overload
+ def perform_collect(
+ self, args: Optional[Sequence[str]] = ..., genitems: "Literal[True]" = ...
+ ) -> Sequence[nodes.Item]:
+ ...
+
+ @overload
+ def perform_collect(
+ self, args: Optional[Sequence[str]] = ..., genitems: bool = ...
+ ) -> Sequence[Union[nodes.Item, nodes.Collector]]:
+ ...
+
+ def perform_collect(
+ self, args: Optional[Sequence[str]] = None, genitems: bool = True
+ ) -> Sequence[Union[nodes.Item, nodes.Collector]]:
+ """Perform the collection phase for this session.
+
+ This is called by the default
+ :func:`pytest_collection <_pytest.hookspec.pytest_collection>` hook
+ implementation; see the documentation of this hook for more details.
+ For testing purposes, it may also be called directly on a fresh
+ ``Session``.
+
+ This function normally recursively expands any collectors collected
+ from the session to their items, and only items are returned. For
+ testing purposes, this may be suppressed by passing ``genitems=False``,
+ in which case the return value contains these collectors unexpanded,
+ and ``session.items`` is empty.
+ """
+ if args is None:
+ args = self.config.args
+
+ self.trace("perform_collect", self, args)
+ self.trace.root.indent += 1
+
+ self._notfound: List[Tuple[str, Sequence[nodes.Collector]]] = []
+ self._initial_parts: List[Tuple[py.path.local, List[str]]] = []
+ self.items: List[nodes.Item] = []
+
hook = self.config.hook
-
- items: Sequence[Union[nodes.Item, nodes.Collector]] = self.items
+
+ items: Sequence[Union[nodes.Item, nodes.Collector]] = self.items
try:
- initialpaths: List[py.path.local] = []
- for arg in args:
- fspath, parts = resolve_collection_argument(
- self.config.invocation_params.dir,
- arg,
- as_pypath=self.config.option.pyargs,
- )
- self._initial_parts.append((fspath, parts))
- initialpaths.append(fspath)
- self._initialpaths = frozenset(initialpaths)
- rep = collect_one_node(self)
- self.ihook.pytest_collectreport(report=rep)
- self.trace.root.indent -= 1
- if self._notfound:
- errors = []
- for arg, cols in self._notfound:
- line = f"(no name {arg!r} in any of {cols!r})"
- errors.append(f"not found: {arg}\n{line}")
- raise UsageError(*errors)
- if not genitems:
- items = rep.result
- else:
- if rep.passed:
- for node in rep.result:
- self.items.extend(self.genitems(node))
-
+ initialpaths: List[py.path.local] = []
+ for arg in args:
+ fspath, parts = resolve_collection_argument(
+ self.config.invocation_params.dir,
+ arg,
+ as_pypath=self.config.option.pyargs,
+ )
+ self._initial_parts.append((fspath, parts))
+ initialpaths.append(fspath)
+ self._initialpaths = frozenset(initialpaths)
+ rep = collect_one_node(self)
+ self.ihook.pytest_collectreport(report=rep)
+ self.trace.root.indent -= 1
+ if self._notfound:
+ errors = []
+ for arg, cols in self._notfound:
+ line = f"(no name {arg!r} in any of {cols!r})"
+ errors.append(f"not found: {arg}\n{line}")
+ raise UsageError(*errors)
+ if not genitems:
+ items = rep.result
+ else:
+ if rep.passed:
+ for node in rep.result:
+ self.items.extend(self.genitems(node))
+
self.config.pluginmanager.check_pending()
hook.pytest_collection_modifyitems(
session=self, config=self.config, items=items
)
finally:
hook.pytest_collection_finish(session=self)
-
+
self.testscollected = len(items)
return items
- def collect(self) -> Iterator[Union[nodes.Item, nodes.Collector]]:
- from _pytest.python import Package
-
- # Keep track of any collected nodes in here, so we don't duplicate fixtures.
- node_cache1: Dict[py.path.local, Sequence[nodes.Collector]] = {}
- node_cache2: Dict[
- Tuple[Type[nodes.Collector], py.path.local], nodes.Collector
- ] = ({})
-
- # Keep track of any collected collectors in matchnodes paths, so they
- # are not collected more than once.
- matchnodes_cache: Dict[Tuple[Type[nodes.Collector], str], CollectReport] = ({})
-
- # Dirnames of pkgs with dunder-init files.
- pkg_roots: Dict[str, Package] = {}
-
- for argpath, names in self._initial_parts:
- self.trace("processing argument", (argpath, names))
- self.trace.root.indent += 1
-
- # Start with a Session root, and delve to argpath item (dir or file)
- # and stack all Packages found on the way.
- # No point in finding packages when collecting doctests.
- if not self.config.getoption("doctestmodules", False):
- pm = self.config.pluginmanager
- for parent in reversed(argpath.parts()):
- if pm._confcutdir and pm._confcutdir.relto(parent):
- break
-
- if parent.isdir():
- pkginit = parent.join("__init__.py")
- if pkginit.isfile() and pkginit not in node_cache1:
+ def collect(self) -> Iterator[Union[nodes.Item, nodes.Collector]]:
+ from _pytest.python import Package
+
+ # Keep track of any collected nodes in here, so we don't duplicate fixtures.
+ node_cache1: Dict[py.path.local, Sequence[nodes.Collector]] = {}
+ node_cache2: Dict[
+ Tuple[Type[nodes.Collector], py.path.local], nodes.Collector
+ ] = ({})
+
+ # Keep track of any collected collectors in matchnodes paths, so they
+ # are not collected more than once.
+ matchnodes_cache: Dict[Tuple[Type[nodes.Collector], str], CollectReport] = ({})
+
+ # Dirnames of pkgs with dunder-init files.
+ pkg_roots: Dict[str, Package] = {}
+
+ for argpath, names in self._initial_parts:
+ self.trace("processing argument", (argpath, names))
+ self.trace.root.indent += 1
+
+ # Start with a Session root, and delve to argpath item (dir or file)
+ # and stack all Packages found on the way.
+ # No point in finding packages when collecting doctests.
+ if not self.config.getoption("doctestmodules", False):
+ pm = self.config.pluginmanager
+ for parent in reversed(argpath.parts()):
+ if pm._confcutdir and pm._confcutdir.relto(parent):
+ break
+
+ if parent.isdir():
+ pkginit = parent.join("__init__.py")
+ if pkginit.isfile() and pkginit not in node_cache1:
col = self._collectfile(pkginit, handle_dupes=False)
if col:
if isinstance(col[0], Package):
- pkg_roots[str(parent)] = col[0]
- node_cache1[col[0].fspath] = [col[0]]
-
- # If it's a directory argument, recurse and look for any Subpackages.
- # Let the Package collector deal with subnodes, don't collect here.
- if argpath.check(dir=1):
- assert not names, "invalid arg {!r}".format((argpath, names))
-
- seen_dirs: Set[py.path.local] = set()
- for direntry in visit(str(argpath), self._recurse):
- if not direntry.is_file():
- continue
-
- path = py.path.local(direntry.path)
- dirpath = path.dirpath()
-
- if dirpath not in seen_dirs:
- # Collect packages first.
- seen_dirs.add(dirpath)
- pkginit = dirpath.join("__init__.py")
- if pkginit.exists():
- for x in self._collectfile(pkginit):
- yield x
- if isinstance(x, Package):
- pkg_roots[str(dirpath)] = x
- if str(dirpath) in pkg_roots:
- # Do not collect packages here.
- continue
-
- for x in self._collectfile(path):
- key = (type(x), x.fspath)
- if key in node_cache2:
- yield node_cache2[key]
- else:
- node_cache2[key] = x
- yield x
+ pkg_roots[str(parent)] = col[0]
+ node_cache1[col[0].fspath] = [col[0]]
+
+ # If it's a directory argument, recurse and look for any Subpackages.
+ # Let the Package collector deal with subnodes, don't collect here.
+ if argpath.check(dir=1):
+ assert not names, "invalid arg {!r}".format((argpath, names))
+
+ seen_dirs: Set[py.path.local] = set()
+ for direntry in visit(str(argpath), self._recurse):
+ if not direntry.is_file():
+ continue
+
+ path = py.path.local(direntry.path)
+ dirpath = path.dirpath()
+
+ if dirpath not in seen_dirs:
+ # Collect packages first.
+ seen_dirs.add(dirpath)
+ pkginit = dirpath.join("__init__.py")
+ if pkginit.exists():
+ for x in self._collectfile(pkginit):
+ yield x
+ if isinstance(x, Package):
+ pkg_roots[str(dirpath)] = x
+ if str(dirpath) in pkg_roots:
+ # Do not collect packages here.
+ continue
+
+ for x in self._collectfile(path):
+ key = (type(x), x.fspath)
+ if key in node_cache2:
+ yield node_cache2[key]
+ else:
+ node_cache2[key] = x
+ yield x
else:
- assert argpath.check(file=1)
-
- if argpath in node_cache1:
- col = node_cache1[argpath]
- else:
- collect_root = pkg_roots.get(argpath.dirname, self)
- col = collect_root._collectfile(argpath, handle_dupes=False)
- if col:
- node_cache1[argpath] = col
-
- matching = []
- work: List[
- Tuple[Sequence[Union[nodes.Item, nodes.Collector]], Sequence[str]]
- ] = [(col, names)]
- while work:
- self.trace("matchnodes", col, names)
- self.trace.root.indent += 1
-
- matchnodes, matchnames = work.pop()
- for node in matchnodes:
- if not matchnames:
- matching.append(node)
- continue
- if not isinstance(node, nodes.Collector):
- continue
- key = (type(node), node.nodeid)
- if key in matchnodes_cache:
- rep = matchnodes_cache[key]
- else:
- rep = collect_one_node(node)
- matchnodes_cache[key] = rep
- if rep.passed:
- submatchnodes = []
- for r in rep.result:
- # TODO: Remove parametrized workaround once collection structure contains
- # parametrization.
- if (
- r.name == matchnames[0]
- or r.name.split("[")[0] == matchnames[0]
- ):
- submatchnodes.append(r)
- if submatchnodes:
- work.append((submatchnodes, matchnames[1:]))
- # XXX Accept IDs that don't have "()" for class instances.
- elif len(rep.result) == 1 and rep.result[0].name == "()":
- work.append((rep.result, matchnames))
- else:
- # Report collection failures here to avoid failing to run some test
- # specified in the command line because the module could not be
- # imported (#134).
- node.ihook.pytest_collectreport(report=rep)
-
- self.trace("matchnodes finished -> ", len(matching), "nodes")
- self.trace.root.indent -= 1
-
- if not matching:
- report_arg = "::".join((str(argpath), *names))
- self._notfound.append((report_arg, col))
- continue
-
- # If __init__.py was the only file requested, then the matched
- # node will be the corresponding Package (by default), and the
- # first yielded item will be the __init__ Module itself, so
- # just use that. If this special case isn't taken, then all the
- # files in the package will be yielded.
- if argpath.basename == "__init__.py" and isinstance(
- matching[0], Package
- ):
- try:
- yield next(iter(matching[0].collect()))
- except StopIteration:
- # The package collects nothing with only an __init__.py
- # file in it, which gets ignored by the default
- # "python_files" option.
- pass
- continue
-
- yield from matching
-
- self.trace.root.indent -= 1
-
- def genitems(
- self, node: Union[nodes.Item, nodes.Collector]
- ) -> Iterator[nodes.Item]:
+ assert argpath.check(file=1)
+
+ if argpath in node_cache1:
+ col = node_cache1[argpath]
+ else:
+ collect_root = pkg_roots.get(argpath.dirname, self)
+ col = collect_root._collectfile(argpath, handle_dupes=False)
+ if col:
+ node_cache1[argpath] = col
+
+ matching = []
+ work: List[
+ Tuple[Sequence[Union[nodes.Item, nodes.Collector]], Sequence[str]]
+ ] = [(col, names)]
+ while work:
+ self.trace("matchnodes", col, names)
+ self.trace.root.indent += 1
+
+ matchnodes, matchnames = work.pop()
+ for node in matchnodes:
+ if not matchnames:
+ matching.append(node)
+ continue
+ if not isinstance(node, nodes.Collector):
+ continue
+ key = (type(node), node.nodeid)
+ if key in matchnodes_cache:
+ rep = matchnodes_cache[key]
+ else:
+ rep = collect_one_node(node)
+ matchnodes_cache[key] = rep
+ if rep.passed:
+ submatchnodes = []
+ for r in rep.result:
+ # TODO: Remove parametrized workaround once collection structure contains
+ # parametrization.
+ if (
+ r.name == matchnames[0]
+ or r.name.split("[")[0] == matchnames[0]
+ ):
+ submatchnodes.append(r)
+ if submatchnodes:
+ work.append((submatchnodes, matchnames[1:]))
+ # XXX Accept IDs that don't have "()" for class instances.
+ elif len(rep.result) == 1 and rep.result[0].name == "()":
+ work.append((rep.result, matchnames))
+ else:
+ # Report collection failures here to avoid failing to run some test
+ # specified in the command line because the module could not be
+ # imported (#134).
+ node.ihook.pytest_collectreport(report=rep)
+
+ self.trace("matchnodes finished -> ", len(matching), "nodes")
+ self.trace.root.indent -= 1
+
+ if not matching:
+ report_arg = "::".join((str(argpath), *names))
+ self._notfound.append((report_arg, col))
+ continue
+
+ # If __init__.py was the only file requested, then the matched
+ # node will be the corresponding Package (by default), and the
+ # first yielded item will be the __init__ Module itself, so
+ # just use that. If this special case isn't taken, then all the
+ # files in the package will be yielded.
+ if argpath.basename == "__init__.py" and isinstance(
+ matching[0], Package
+ ):
+ try:
+ yield next(iter(matching[0].collect()))
+ except StopIteration:
+ # The package collects nothing with only an __init__.py
+ # file in it, which gets ignored by the default
+ # "python_files" option.
+ pass
+ continue
+
+ yield from matching
+
+ self.trace.root.indent -= 1
+
+ def genitems(
+ self, node: Union[nodes.Item, nodes.Collector]
+ ) -> Iterator[nodes.Item]:
self.trace("genitems", node)
if isinstance(node, nodes.Item):
node.ihook.pytest_itemcollected(item=node)
@@ -808,69 +808,69 @@ class Session(nodes.FSCollector):
rep = collect_one_node(node)
if rep.passed:
for subnode in rep.result:
- yield from self.genitems(subnode)
+ yield from self.genitems(subnode)
node.ihook.pytest_collectreport(report=rep)
-
-
-def search_pypath(module_name: str) -> str:
- """Search sys.path for the given a dotted module name, and return its file system path."""
- try:
- spec = importlib.util.find_spec(module_name)
- # AttributeError: looks like package module, but actually filename
- # ImportError: module does not exist
- # ValueError: not a module name
- except (AttributeError, ImportError, ValueError):
- return module_name
- if spec is None or spec.origin is None or spec.origin == "namespace":
- return module_name
- elif spec.submodule_search_locations:
- return os.path.dirname(spec.origin)
- else:
- return spec.origin
-
-
-def resolve_collection_argument(
- invocation_path: Path, arg: str, *, as_pypath: bool = False
-) -> Tuple[py.path.local, List[str]]:
- """Parse path arguments optionally containing selection parts and return (fspath, names).
-
- Command-line arguments can point to files and/or directories, and optionally contain
- parts for specific tests selection, for example:
-
- "pkg/tests/test_foo.py::TestClass::test_foo"
-
- This function ensures the path exists, and returns a tuple:
-
- (py.path.path("/full/path/to/pkg/tests/test_foo.py"), ["TestClass", "test_foo"])
-
- When as_pypath is True, expects that the command-line argument actually contains
- module paths instead of file-system paths:
-
- "pkg.tests.test_foo::TestClass::test_foo"
-
- In which case we search sys.path for a matching module, and then return the *path* to the
- found module.
-
- If the path doesn't exist, raise UsageError.
- If the path is a directory and selection parts are present, raise UsageError.
- """
- strpath, *parts = str(arg).split("::")
- if as_pypath:
- strpath = search_pypath(strpath)
- fspath = invocation_path / strpath
- fspath = absolutepath(fspath)
- if not fspath.exists():
- msg = (
- "module or package not found: {arg} (missing __init__.py?)"
- if as_pypath
- else "file or directory not found: {arg}"
- )
- raise UsageError(msg.format(arg=arg))
- if parts and fspath.is_dir():
- msg = (
- "package argument cannot contain :: selection parts: {arg}"
- if as_pypath
- else "directory argument cannot contain :: selection parts: {arg}"
- )
- raise UsageError(msg.format(arg=arg))
- return py.path.local(str(fspath)), parts
+
+
+def search_pypath(module_name: str) -> str:
+ """Search sys.path for the given a dotted module name, and return its file system path."""
+ try:
+ spec = importlib.util.find_spec(module_name)
+ # AttributeError: looks like package module, but actually filename
+ # ImportError: module does not exist
+ # ValueError: not a module name
+ except (AttributeError, ImportError, ValueError):
+ return module_name
+ if spec is None or spec.origin is None or spec.origin == "namespace":
+ return module_name
+ elif spec.submodule_search_locations:
+ return os.path.dirname(spec.origin)
+ else:
+ return spec.origin
+
+
+def resolve_collection_argument(
+ invocation_path: Path, arg: str, *, as_pypath: bool = False
+) -> Tuple[py.path.local, List[str]]:
+ """Parse path arguments optionally containing selection parts and return (fspath, names).
+
+ Command-line arguments can point to files and/or directories, and optionally contain
+ parts for specific tests selection, for example:
+
+ "pkg/tests/test_foo.py::TestClass::test_foo"
+
+ This function ensures the path exists, and returns a tuple:
+
+ (py.path.path("/full/path/to/pkg/tests/test_foo.py"), ["TestClass", "test_foo"])
+
+ When as_pypath is True, expects that the command-line argument actually contains
+ module paths instead of file-system paths:
+
+ "pkg.tests.test_foo::TestClass::test_foo"
+
+ In which case we search sys.path for a matching module, and then return the *path* to the
+ found module.
+
+ If the path doesn't exist, raise UsageError.
+ If the path is a directory and selection parts are present, raise UsageError.
+ """
+ strpath, *parts = str(arg).split("::")
+ if as_pypath:
+ strpath = search_pypath(strpath)
+ fspath = invocation_path / strpath
+ fspath = absolutepath(fspath)
+ if not fspath.exists():
+ msg = (
+ "module or package not found: {arg} (missing __init__.py?)"
+ if as_pypath
+ else "file or directory not found: {arg}"
+ )
+ raise UsageError(msg.format(arg=arg))
+ if parts and fspath.is_dir():
+ msg = (
+ "package argument cannot contain :: selection parts: {arg}"
+ if as_pypath
+ else "directory argument cannot contain :: selection parts: {arg}"
+ )
+ raise UsageError(msg.format(arg=arg))
+ return py.path.local(str(fspath)), parts