diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2025-05-05 12:31:52 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2025-05-05 12:41:33 +0300 |
commit | 6ff49ec58061f642c3a2f83c61eba12820787dfc (patch) | |
tree | c733ec9bdb15ed280080d31dea8725bfec717acd /contrib/python/pytest/py3/_pytest/fixtures.py | |
parent | eefca8305c6a545cc6b16dca3eb0d91dcef2adcd (diff) | |
download | ydb-6ff49ec58061f642c3a2f83c61eba12820787dfc.tar.gz |
Intermediate changes
commit_hash:8b3bb826b17db8329ed1221f545c0645f12c552d
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/fixtures.py')
-rw-r--r-- | contrib/python/pytest/py3/_pytest/fixtures.py | 752 |
1 files changed, 392 insertions, 360 deletions
diff --git a/contrib/python/pytest/py3/_pytest/fixtures.py b/contrib/python/pytest/py3/_pytest/fixtures.py index 0462504efaf..206fd084ae4 100644 --- a/contrib/python/pytest/py3/_pytest/fixtures.py +++ b/contrib/python/pytest/py3/_pytest/fixtures.py @@ -1,18 +1,19 @@ +import abc +from collections import defaultdict +from collections import deque +from contextlib import suppress import dataclasses import functools import inspect import os -import sys -import warnings -from collections import defaultdict -from collections import deque -from contextlib import suppress from pathlib import Path -from types import TracebackType +from typing import AbstractSet from typing import Any from typing import Callable from typing import cast from typing import Dict +from typing import Final +from typing import final from typing import Generator from typing import Generic from typing import Iterable @@ -21,13 +22,14 @@ from typing import List from typing import MutableMapping from typing import NoReturn from typing import Optional +from typing import overload from typing import Sequence from typing import Set from typing import Tuple -from typing import Type from typing import TYPE_CHECKING from typing import TypeVar from typing import Union +import warnings import _pytest from _pytest import nodes @@ -35,10 +37,8 @@ from _pytest._code import getfslineno from _pytest._code.code import FormattedExcinfo from _pytest._code.code import TerminalRepr from _pytest._io import TerminalWriter -from _pytest.compat import _format_args from _pytest.compat import _PytestWrapper from _pytest.compat import assert_never -from _pytest.compat import final from _pytest.compat import get_real_func from _pytest.compat import get_real_method from _pytest.compat import getfuncargnames @@ -47,12 +47,12 @@ from _pytest.compat import getlocation from _pytest.compat import is_generator from _pytest.compat import NOTSET from _pytest.compat import NotSetType -from _pytest.compat import overload from _pytest.compat import safe_getattr from _pytest.config import _PluggyPlugin from _pytest.config import Config from _pytest.config.argparsing import Parser from _pytest.deprecated import check_ispytest +from _pytest.deprecated import MARKED_FIXTURE from _pytest.deprecated import YIELD_FIXTURE from _pytest.mark import Mark from _pytest.mark import ParameterSet @@ -62,17 +62,17 @@ from _pytest.outcomes import skip from _pytest.outcomes import TEST_OUTCOME from _pytest.pathlib import absolutepath from _pytest.pathlib import bestrelpath +from _pytest.scope import _ScopeName from _pytest.scope import HIGH_SCOPES from _pytest.scope import Scope -from _pytest.stash import StashKey if TYPE_CHECKING: from typing import Deque - from _pytest.scope import _ScopeName from _pytest.main import Session from _pytest.python import CallSpec2 + from _pytest.python import Function from _pytest.python import Metafunc @@ -97,8 +97,8 @@ _FixtureCachedResult = Union[ None, # Cache key. object, - # Exc info if raised. - Tuple[Type[BaseException], BaseException, TracebackType], + # Exception if raised. + BaseException, ], ] @@ -120,9 +120,8 @@ def get_scope_package( from _pytest.python import Package current: Optional[Union[nodes.Item, nodes.Collector]] = node - fixture_package_name = "{}/{}".format(fixturedef.baseid, "__init__.py") while current and ( - not isinstance(current, Package) or fixture_package_name != current.nodeid + not isinstance(current, Package) or current.nodeid != fixturedef.baseid ): current = current.parent # type: ignore[assignment] if current is None: @@ -136,7 +135,9 @@ def get_scope_node( import _pytest.python if scope is Scope.Function: - return node.getparent(nodes.Item) + # Type ignored because this is actually safe, see: + # https://github.com/python/mypy/issues/4717 + return node.getparent(nodes.Item) # type: ignore[type-abstract] elif scope is Scope.Class: return node.getparent(_pytest.python.Class) elif scope is Scope.Module: @@ -149,80 +150,6 @@ def get_scope_node( assert_never(scope) -# Used for storing artificial fixturedefs for direct parametrization. -name2pseudofixturedef_key = StashKey[Dict[str, "FixtureDef[Any]"]]() - - -def add_funcarg_pseudo_fixture_def( - collector: nodes.Collector, metafunc: "Metafunc", fixturemanager: "FixtureManager" -) -> None: - # This function will transform all collected calls to functions - # if they use direct funcargs (i.e. direct parametrization) - # because we want later test execution to be able to rely on - # an existing FixtureDef structure for all arguments. - # XXX we can probably avoid this algorithm if we modify CallSpec2 - # to directly care for creating the fixturedefs within its methods. - if not metafunc._calls[0].funcargs: - # This function call does not have direct parametrization. - return - # Collect funcargs of all callspecs into a list of values. - arg2params: Dict[str, List[object]] = {} - arg2scope: Dict[str, Scope] = {} - for callspec in metafunc._calls: - for argname, argvalue in callspec.funcargs.items(): - assert argname not in callspec.params - callspec.params[argname] = argvalue - arg2params_list = arg2params.setdefault(argname, []) - callspec.indices[argname] = len(arg2params_list) - arg2params_list.append(argvalue) - if argname not in arg2scope: - scope = callspec._arg2scope.get(argname, Scope.Function) - arg2scope[argname] = scope - callspec.funcargs.clear() - - # Register artificial FixtureDef's so that later at test execution - # time we can rely on a proper FixtureDef to exist for fixture setup. - arg2fixturedefs = metafunc._arg2fixturedefs - for argname, valuelist in arg2params.items(): - # If we have a scope that is higher than function, we need - # to make sure we only ever create an according fixturedef on - # a per-scope basis. We thus store and cache the fixturedef on the - # node related to the scope. - scope = arg2scope[argname] - node = None - if scope is not Scope.Function: - node = get_scope_node(collector, scope) - if node is None: - assert scope is Scope.Class and isinstance( - collector, _pytest.python.Module - ) - # Use module-level collector for class-scope (for now). - node = collector - if node is None: - name2pseudofixturedef = None - else: - default: Dict[str, FixtureDef[Any]] = {} - name2pseudofixturedef = node.stash.setdefault( - name2pseudofixturedef_key, default - ) - if name2pseudofixturedef is not None and argname in name2pseudofixturedef: - arg2fixturedefs[argname] = [name2pseudofixturedef[argname]] - else: - fixturedef = FixtureDef( - fixturemanager=fixturemanager, - baseid="", - argname=argname, - func=get_direct_param_fixture_func, - scope=arg2scope[argname], - params=valuelist, - unittest=False, - ids=None, - ) - arg2fixturedefs[argname] = [fixturedef] - if name2pseudofixturedef is not None: - name2pseudofixturedef[argname] = fixturedef - - def getfixturemarker(obj: object) -> Optional["FixtureFunctionMarker"]: """Return fixturemarker or None if it doesn't exist or raised exceptions.""" @@ -232,11 +159,17 @@ def getfixturemarker(obj: object) -> Optional["FixtureFunctionMarker"]: ) -# Parametrized fixture key, helper alias for code below. -_Key = Tuple[object, ...] +@dataclasses.dataclass(frozen=True) +class FixtureArgKey: + argname: str + param_index: int + scoped_item_path: Optional[Path] + item_cls: Optional[type] -def get_parametrized_fixture_keys(item: nodes.Item, scope: Scope) -> Iterator[_Key]: +def get_parametrized_fixture_keys( + item: nodes.Item, scope: Scope +) -> Iterator[FixtureArgKey]: """Return list of keys for all parametrized arguments which match the specified scope.""" assert scope is not Scope.Function @@ -246,24 +179,28 @@ def get_parametrized_fixture_keys(item: nodes.Item, scope: Scope) -> Iterator[_K pass else: cs: CallSpec2 = callspec - # cs.indices.items() is random order of argnames. Need to + # cs.indices is random order of argnames. Need to # sort this so that different calls to # get_parametrized_fixture_keys will be deterministic. - for argname, param_index in sorted(cs.indices.items()): + for argname in sorted(cs.indices): if cs._arg2scope[argname] != scope: continue + + item_cls = None if scope is Scope.Session: - key: _Key = (argname, param_index) + scoped_item_path = None elif scope is Scope.Package: - key = (argname, param_index, item.path.parent) + scoped_item_path = item.path elif scope is Scope.Module: - key = (argname, param_index, item.path) + scoped_item_path = item.path elif scope is Scope.Class: + scoped_item_path = item.path item_cls = item.cls # type: ignore[attr-defined] - key = (argname, param_index, item.path, item_cls) else: assert_never(scope) - yield key + + param_index = cs.indices[argname] + yield FixtureArgKey(argname, param_index, scoped_item_path, item_cls) # Algorithm for sorting on a per-parametrized resource setup basis. @@ -273,19 +210,17 @@ def get_parametrized_fixture_keys(item: nodes.Item, scope: Scope) -> Iterator[_K def reorder_items(items: Sequence[nodes.Item]) -> List[nodes.Item]: - argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[_Key, None]]] = {} - items_by_argkey: Dict[Scope, Dict[_Key, Deque[nodes.Item]]] = {} + argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[FixtureArgKey, None]]] = {} + items_by_argkey: Dict[Scope, Dict[FixtureArgKey, Deque[nodes.Item]]] = {} for scope in HIGH_SCOPES: - d: Dict[nodes.Item, Dict[_Key, None]] = {} - argkeys_cache[scope] = d - item_d: Dict[_Key, Deque[nodes.Item]] = defaultdict(deque) - items_by_argkey[scope] = item_d + scoped_argkeys_cache = argkeys_cache[scope] = {} + scoped_items_by_argkey = items_by_argkey[scope] = defaultdict(deque) for item in items: keys = dict.fromkeys(get_parametrized_fixture_keys(item, scope), None) if keys: - d[item] = keys + scoped_argkeys_cache[item] = keys for key in keys: - item_d[key].append(item) + scoped_items_by_argkey[key].append(item) items_dict = dict.fromkeys(items, None) return list( reorder_items_atscope(items_dict, argkeys_cache, items_by_argkey, Scope.Session) @@ -294,8 +229,8 @@ def reorder_items(items: Sequence[nodes.Item]) -> List[nodes.Item]: def fix_cache_order( item: nodes.Item, - argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[_Key, None]]], - items_by_argkey: Dict[Scope, Dict[_Key, "Deque[nodes.Item]"]], + argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[FixtureArgKey, None]]], + items_by_argkey: Dict[Scope, Dict[FixtureArgKey, "Deque[nodes.Item]"]], ) -> None: for scope in HIGH_SCOPES: for key in argkeys_cache[scope].get(item, []): @@ -304,13 +239,13 @@ def fix_cache_order( def reorder_items_atscope( items: Dict[nodes.Item, None], - argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[_Key, None]]], - items_by_argkey: Dict[Scope, Dict[_Key, "Deque[nodes.Item]"]], + argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[FixtureArgKey, None]]], + items_by_argkey: Dict[Scope, Dict[FixtureArgKey, "Deque[nodes.Item]"]], scope: Scope, ) -> Dict[nodes.Item, None]: if scope is Scope.Function or len(items) < 3: return items - ignore: Set[Optional[_Key]] = set() + ignore: Set[Optional[FixtureArgKey]] = set() items_deque = deque(items) items_done: Dict[nodes.Item, None] = {} scoped_items_by_argkey = items_by_argkey[scope] @@ -348,21 +283,35 @@ def reorder_items_atscope( return items_done -def get_direct_param_fixture_func(request: "FixtureRequest") -> Any: - return request.param +@dataclasses.dataclass(frozen=True) +class FuncFixtureInfo: + """Fixture-related information for a fixture-requesting item (e.g. test + function). + This is used to examine the fixtures which an item requests statically + (known during collection). This includes autouse fixtures, fixtures + requested by the `usefixtures` marker, fixtures requested in the function + parameters, and the transitive closure of these. + + An item may also request fixtures dynamically (using `request.getfixturevalue`); + these are not reflected here. + """ -@dataclasses.dataclass -class FuncFixtureInfo: __slots__ = ("argnames", "initialnames", "names_closure", "name2fixturedefs") - # Original function argument names. + # Fixture names that the item requests directly by function parameters. argnames: Tuple[str, ...] - # Argnames that function immediately requires. These include argnames + - # fixture names specified via usefixtures and via autouse=True in fixture - # definitions. + # Fixture names that the item immediately requires. These include + # argnames + fixture names specified via usefixtures and via autouse=True in + # fixture definitions. initialnames: Tuple[str, ...] + # The transitive closure of the fixture names that the item requires. + # Note: can't include dynamic dependencies (`request.getfixturevalue` calls). names_closure: List[str] + # A map from a fixture name in the transitive closure to the FixtureDefs + # matching the name which are applicable to this function. + # There may be multiple overriding fixtures with the same name. The + # sequence is ordered from furthest to closes to the function. name2fixturedefs: Dict[str, Sequence["FixtureDef[Any]"]] def prune_dependency_tree(self) -> None: @@ -393,25 +342,45 @@ class FuncFixtureInfo: self.names_closure[:] = sorted(closure, key=self.names_closure.index) -class FixtureRequest: - """A request for a fixture from a test or fixture function. +class FixtureRequest(abc.ABC): + """The type of the ``request`` fixture. - A request object gives access to the requesting test context and has - an optional ``param`` attribute in case the fixture is parametrized - indirectly. + A request object gives access to the requesting test context and has a + ``param`` attribute in case the fixture is parametrized. """ - def __init__(self, pyfuncitem, *, _ispytest: bool = False) -> None: + def __init__( + self, + pyfuncitem: "Function", + fixturename: Optional[str], + arg2fixturedefs: Dict[str, Sequence["FixtureDef[Any]"]], + arg2index: Dict[str, int], + fixture_defs: Dict[str, "FixtureDef[Any]"], + *, + _ispytest: bool = False, + ) -> None: check_ispytest(_ispytest) - self._pyfuncitem = pyfuncitem #: Fixture for which this request is being performed. - self.fixturename: Optional[str] = None - self._scope = Scope.Function - self._fixture_defs: Dict[str, FixtureDef[Any]] = {} - fixtureinfo: FuncFixtureInfo = pyfuncitem._fixtureinfo - self._arg2fixturedefs = fixtureinfo.name2fixturedefs.copy() - self._arg2index: Dict[str, int] = {} - self._fixturemanager: FixtureManager = pyfuncitem.session._fixturemanager + self.fixturename: Final = fixturename + self._pyfuncitem: Final = pyfuncitem + # The FixtureDefs for each fixture name requested by this item. + # Starts from the statically-known fixturedefs resolved during + # collection. Dynamically requested fixtures (using + # `request.getfixturevalue("foo")`) are added dynamically. + self._arg2fixturedefs: Final = arg2fixturedefs + # A fixture may override another fixture with the same name, e.g. a fixture + # in a module can override a fixture in a conftest, a fixture in a class can + # override a fixture in the module, and so on. + # An overriding fixture can request its own name; in this case it gets + # the value of the fixture it overrides, one level up. + # The _arg2index state keeps the current depth in the overriding chain. + # The fixturedefs list in _arg2fixturedefs for a given name is ordered from + # furthest to closest, so we use negative indexing -1, -2, ... to go from + # last to first. + self._arg2index: Final = arg2index + # The evaluated argnames so far, mapping to the FixtureDef they resolved + # to. + self._fixture_defs: Final = fixture_defs # Notes on the type of `param`: # -`request.param` is only defined in parametrized fixtures, and will raise # AttributeError otherwise. Python typing has no notion of "undefined", so @@ -423,37 +392,31 @@ class FixtureRequest: self.param: Any @property - def scope(self) -> "_ScopeName": + def _fixturemanager(self) -> "FixtureManager": + return self._pyfuncitem.session._fixturemanager + + @property + @abc.abstractmethod + def _scope(self) -> Scope: + raise NotImplementedError() + + @property + def scope(self) -> _ScopeName: """Scope string, one of "function", "class", "module", "package", "session".""" return self._scope.value @property def fixturenames(self) -> List[str]: """Names of all active fixtures in this request.""" - result = list(self._pyfuncitem._fixtureinfo.names_closure) + result = list(self._pyfuncitem.fixturenames) result.extend(set(self._fixture_defs).difference(result)) return result @property + @abc.abstractmethod def node(self): """Underlying collection node (depends on current request scope).""" - scope = self._scope - if scope is Scope.Function: - # This might also be a non-function Item despite its attribute name. - node: Optional[Union[nodes.Item, nodes.Collector]] = self._pyfuncitem - elif scope is Scope.Package: - # FIXME: _fixturedef is not defined on FixtureRequest (this class), - # but on FixtureRequest (a subclass). - node = get_scope_package(self._pyfuncitem, self._fixturedef) # type: ignore[attr-defined] - else: - node = get_scope_node(self._pyfuncitem, scope) - if node is None and scope is Scope.Class: - # Fallback to function item itself. - node = self._pyfuncitem - assert node, 'Could not obtain a node for scope "{}" for function {!r}'.format( - scope, self._pyfuncitem - ) - return node + raise NotImplementedError() def _getnextfixturedef(self, argname: str) -> "FixtureDef[Any]": fixturedefs = self._arg2fixturedefs.get(argname, None) @@ -464,12 +427,17 @@ class FixtureRequest: assert self._pyfuncitem.parent is not None parentid = self._pyfuncitem.parent.nodeid fixturedefs = self._fixturemanager.getfixturedefs(argname, parentid) - # TODO: Fix this type ignore. Either add assert or adjust types. - # Can this be None here? - self._arg2fixturedefs[argname] = fixturedefs # type: ignore[assignment] - # fixturedefs list is immutable so we maintain a decreasing index. + if fixturedefs is not None: + self._arg2fixturedefs[argname] = fixturedefs + # No fixtures defined with this name. + if fixturedefs is None: + raise FixtureLookupError(argname, self) + # The are no fixtures with this name applicable for the function. + if not fixturedefs: + raise FixtureLookupError(argname, self) index = self._arg2index.get(argname, 0) - 1 - if fixturedefs is None or (-index > len(fixturedefs)): + # The fixture requested its own name, but no remaining to override. + if -index > len(fixturedefs): raise FixtureLookupError(argname, self) self._arg2index[argname] = index return fixturedefs[index] @@ -502,7 +470,7 @@ class FixtureRequest: """Instance (can be None) on which test function was collected.""" # unittest support hack, see _pytest.unittest.TestCaseFunction. try: - return self._pyfuncitem._testcase + return self._pyfuncitem._testcase # type: ignore[attr-defined] except AttributeError: function = getattr(self, "function", None) return getattr(function, "__self__", None) @@ -512,15 +480,16 @@ class FixtureRequest: """Python module object where the test function was collected.""" if self.scope not in ("function", "class", "module"): raise AttributeError(f"module not available in {self.scope}-scoped context") - return self._pyfuncitem.getparent(_pytest.python.Module).obj + mod = self._pyfuncitem.getparent(_pytest.python.Module) + assert mod is not None + return mod.obj @property def path(self) -> Path: """Path where the test function was collected.""" if self.scope not in ("function", "class", "module", "package"): raise AttributeError(f"path not available in {self.scope}-scoped context") - # TODO: Remove ignore once _pyfuncitem is properly typed. - return self._pyfuncitem.path # type: ignore + return self._pyfuncitem.path @property def keywords(self) -> MutableMapping[str, Any]: @@ -533,11 +502,11 @@ class FixtureRequest: """Pytest session object.""" return self._pyfuncitem.session # type: ignore[no-any-return] + @abc.abstractmethod def addfinalizer(self, finalizer: Callable[[], object]) -> None: """Add finalizer/teardown function to be called without arguments after the last test within the requesting test context finished execution.""" - # XXX usually this method is shadowed by fixturedef specific ones. - self.node.addfinalizer(finalizer) + raise NotImplementedError() def applymarker(self, marker: Union[str, MarkDecorator]) -> None: """Apply a marker to a single test function invocation. @@ -558,13 +527,6 @@ class FixtureRequest: """ raise self._fixturemanager.FixtureLookupError(None, self, msg) - def _fillfixtures(self) -> None: - item = self._pyfuncitem - fixturenames = getattr(item, "fixturenames", self.fixturenames) - for argname in fixturenames: - if argname not in item.funcargs: - item.funcargs[argname] = self.getfixturevalue(argname) - def getfixturevalue(self, argname: str) -> Any: """Dynamically run a named fixture function. @@ -592,9 +554,8 @@ class FixtureRequest: def _get_active_fixturedef( self, argname: str ) -> Union["FixtureDef[object]", PseudoFixtureDef[object]]: - try: - return self._fixture_defs[argname] - except KeyError: + fixturedef = self._fixture_defs.get(argname) + if fixturedef is None: try: fixturedef = self._getnextfixturedef(argname) except FixtureLookupError: @@ -602,10 +563,8 @@ class FixtureRequest: cached_result = (self, [0], None) return PseudoFixtureDef(cached_result, Scope.Function) raise - # Remove indent to prevent the python3 exception - # from leaking into the call. - self._compute_fixture_value(fixturedef) - self._fixture_defs[argname] = fixturedef + self._compute_fixture_value(fixturedef) + self._fixture_defs[argname] = fixturedef return fixturedef def _get_fixturestack(self) -> List["FixtureDef[Any]"]: @@ -648,13 +607,9 @@ class FixtureRequest: fixtures_not_supported = getattr(funcitem, "nofuncargs", False) if has_params and fixtures_not_supported: msg = ( - "{name} does not support fixtures, maybe unittest.TestCase subclass?\n" - "Node id: {nodeid}\n" - "Function type: {typename}" - ).format( - name=funcitem.name, - nodeid=funcitem.nodeid, - typename=type(funcitem).__name__, + f"{funcitem.name} does not support fixtures, maybe unittest.TestCase subclass?\n" + f"Node id: {funcitem.nodeid}\n" + f"Function type: {type(funcitem).__name__}" ) fail(msg, pytrace=False) if has_params: @@ -698,7 +653,97 @@ class FixtureRequest: self, fixturedef: "FixtureDef[object]", subrequest: "SubRequest" ) -> None: # If fixture function failed it might have registered finalizers. - subrequest.node.addfinalizer(lambda: fixturedef.finish(request=subrequest)) + finalizer = functools.partial(fixturedef.finish, request=subrequest) + subrequest.node.addfinalizer(finalizer) + + +@final +class TopRequest(FixtureRequest): + """The type of the ``request`` fixture in a test function.""" + + def __init__(self, pyfuncitem: "Function", *, _ispytest: bool = False) -> None: + super().__init__( + fixturename=None, + pyfuncitem=pyfuncitem, + arg2fixturedefs=pyfuncitem._fixtureinfo.name2fixturedefs.copy(), + arg2index={}, + fixture_defs={}, + _ispytest=_ispytest, + ) + + @property + def _scope(self) -> Scope: + return Scope.Function + + @property + def node(self): + return self._pyfuncitem + + def __repr__(self) -> str: + return "<FixtureRequest for %r>" % (self.node) + + def _fillfixtures(self) -> None: + item = self._pyfuncitem + for argname in item.fixturenames: + if argname not in item.funcargs: + item.funcargs[argname] = self.getfixturevalue(argname) + + def addfinalizer(self, finalizer: Callable[[], object]) -> None: + self.node.addfinalizer(finalizer) + + +@final +class SubRequest(FixtureRequest): + """The type of the ``request`` fixture in a fixture function requested + (transitively) by a test function.""" + + def __init__( + self, + request: FixtureRequest, + scope: Scope, + param: Any, + param_index: int, + fixturedef: "FixtureDef[object]", + *, + _ispytest: bool = False, + ) -> None: + super().__init__( + pyfuncitem=request._pyfuncitem, + fixturename=fixturedef.argname, + fixture_defs=request._fixture_defs, + arg2fixturedefs=request._arg2fixturedefs, + arg2index=request._arg2index, + _ispytest=_ispytest, + ) + self._parent_request: Final[FixtureRequest] = request + self._scope_field: Final = scope + self._fixturedef: Final = fixturedef + if param is not NOTSET: + self.param = param + self.param_index: Final = param_index + + def __repr__(self) -> str: + return f"<SubRequest {self.fixturename!r} for {self._pyfuncitem!r}>" + + @property + def _scope(self) -> Scope: + return self._scope_field + + @property + def node(self): + scope = self._scope + if scope is Scope.Function: + # This might also be a non-function Item despite its attribute name. + node: Optional[Union[nodes.Item, nodes.Collector]] = self._pyfuncitem + elif scope is Scope.Package: + node = get_scope_package(self._pyfuncitem, self._fixturedef) + else: + node = get_scope_node(self._pyfuncitem, scope) + if node is None and scope is Scope.Class: + # Fallback to function item itself. + node = self._pyfuncitem + assert node, f'Could not obtain a node for scope "{scope}" for function {self._pyfuncitem!r}' + return node def _check_scope( self, @@ -728,48 +773,13 @@ class FixtureRequest: p = bestrelpath(session.path, fs) else: p = fs - args = _format_args(factory) - lines.append("%s:%d: def %s%s" % (p, lineno + 1, factory.__name__, args)) + lines.append( + "%s:%d: def %s%s" + % (p, lineno + 1, factory.__name__, inspect.signature(factory)) + ) return lines - def __repr__(self) -> str: - return "<FixtureRequest for %r>" % (self.node) - - -@final -class SubRequest(FixtureRequest): - """A sub request for handling getting a fixture from a test function/fixture.""" - - def __init__( - self, - request: "FixtureRequest", - scope: Scope, - param: Any, - param_index: int, - fixturedef: "FixtureDef[object]", - *, - _ispytest: bool = False, - ) -> None: - check_ispytest(_ispytest) - self._parent_request = request - self.fixturename = fixturedef.argname - if param is not NOTSET: - self.param = param - self.param_index = param_index - self._scope = scope - self._fixturedef = fixturedef - self._pyfuncitem = request._pyfuncitem - self._fixture_defs = request._fixture_defs - self._arg2fixturedefs = request._arg2fixturedefs - self._arg2index = request._arg2index - self._fixturemanager = request._fixturemanager - - def __repr__(self) -> str: - return f"<SubRequest {self.fixturename!r} for {self._pyfuncitem!r}>" - def addfinalizer(self, finalizer: Callable[[], object]) -> None: - """Add finalizer/teardown function to be called without arguments after - the last test within the requesting test context finished execution.""" self._fixturedef.addfinalizer(finalizer) def _schedule_finalizers( @@ -778,7 +788,10 @@ class SubRequest(FixtureRequest): # If the executing fixturedef was not explicitly requested in the argument list (via # getfixturevalue inside the fixture call) then ensure this fixture def will be finished # first. - if fixturedef.argname not in self.fixturenames: + if ( + fixturedef.argname not in self._fixture_defs + and fixturedef.argname not in self._pyfuncitem.fixturenames + ): fixturedef.addfinalizer( functools.partial(self._fixturedef.finish, request=self) ) @@ -825,14 +838,16 @@ class FixtureLookupError(LookupError): if msg is None: fm = self.request._fixturemanager available = set() - parentid = self.request._pyfuncitem.parent.nodeid + parent = self.request._pyfuncitem.parent + assert parent is not None + parentid = parent.nodeid for name, fixturedefs in fm._arg2fixturedefs.items(): faclist = list(fm._matchfactories(fixturedefs, parentid)) if faclist: available.add(name) if self.argname in available: - msg = " recursive dependency involving fixture '{}' detected".format( - self.argname + msg = ( + f" recursive dependency involving fixture '{self.argname}' detected" ) else: msg = f"fixture '{self.argname}' not found" @@ -916,25 +931,23 @@ def _teardown_yield_fixture(fixturefunc, it) -> None: def _eval_scope_callable( - scope_callable: "Callable[[str, Config], _ScopeName]", + scope_callable: Callable[[str, Config], _ScopeName], fixture_name: str, config: Config, -) -> "_ScopeName": +) -> _ScopeName: try: # Type ignored because there is no typing mechanism to specify # keyword arguments, currently. result = scope_callable(fixture_name=fixture_name, config=config) # type: ignore[call-arg] except Exception as e: raise TypeError( - "Error evaluating {} while defining fixture '{}'.\n" - "Expected a function with the signature (*, fixture_name, config)".format( - scope_callable, fixture_name - ) + f"Error evaluating {scope_callable} while defining fixture '{fixture_name}'.\n" + "Expected a function with the signature (*, fixture_name, config)" ) from e if not isinstance(result, str): fail( - "Expected {} to return a 'str' while defining fixture '{}', but it returned:\n" - "{!r}".format(scope_callable, fixture_name, result), + f"Expected {scope_callable} to return a 'str' while defining fixture '{fixture_name}', but it returned:\n" + f"{result!r}", pytrace=False, ) return result @@ -942,7 +955,11 @@ def _eval_scope_callable( @final class FixtureDef(Generic[FixtureValue]): - """A container for a fixture definition.""" + """A container for a fixture definition. + + Note: At this time, only explicitly documented fields and methods are + considered public stable API. + """ def __init__( self, @@ -950,13 +967,16 @@ class FixtureDef(Generic[FixtureValue]): baseid: Optional[str], argname: str, func: "_FixtureFunc[FixtureValue]", - scope: Union[Scope, "_ScopeName", Callable[[str, Config], "_ScopeName"], None], + scope: Union[Scope, _ScopeName, Callable[[str, Config], _ScopeName], None], params: Optional[Sequence[object]], unittest: bool = False, ids: Optional[ Union[Tuple[Optional[object], ...], Callable[[Any], Optional[object]]] ] = None, + *, + _ispytest: bool = False, ) -> None: + check_ispytest(_ispytest) self._fixturemanager = fixturemanager # The "base" node ID for the fixture. # @@ -972,15 +992,15 @@ class FixtureDef(Generic[FixtureValue]): # directory path relative to the rootdir. # # For other plugins, the baseid is the empty string (always matches). - self.baseid = baseid or "" + self.baseid: Final = baseid or "" # Whether the fixture was found from a node or a conftest in the # collection tree. Will be false for fixtures defined in non-conftest # plugins. - self.has_location = baseid is not None + self.has_location: Final = baseid is not None # The fixture factory function. - self.func = func + self.func: Final = func # The name by which the fixture may be requested. - self.argname = argname + self.argname: Final = argname if scope is None: scope = Scope.Function elif callable(scope): @@ -989,26 +1009,24 @@ class FixtureDef(Generic[FixtureValue]): scope = Scope.from_user( scope, descr=f"Fixture '{func.__name__}'", where=baseid ) - self._scope = scope + self._scope: Final = scope # If the fixture is directly parametrized, the parameter values. - self.params: Optional[Sequence[object]] = params + self.params: Final = params # If the fixture is directly parametrized, a tuple of explicit IDs to # assign to the parameter values, or a callable to generate an ID given # a parameter value. - self.ids = ids + self.ids: Final = ids # The names requested by the fixtures. - self.argnames = getfuncargnames(func, name=argname, is_method=unittest) + self.argnames: Final = getfuncargnames(func, name=argname, is_method=unittest) # Whether the fixture was collected from a unittest TestCase class. - # Note that it really only makes sense to define autouse fixtures in - # unittest TestCases. - self.unittest = unittest + self.unittest: Final = unittest # If the fixture was executed, the current value of the fixture. # Can change if the fixture is executed with different parameters. self.cached_result: Optional[_FixtureCachedResult[FixtureValue]] = None - self._finalizers: List[Callable[[], object]] = [] + self._finalizers: Final[List[Callable[[], object]]] = [] @property - def scope(self) -> "_ScopeName": + def scope(self) -> _ScopeName: """Scope string, one of "function", "class", "module", "package", "session".""" return self._scope.value @@ -1036,7 +1054,7 @@ class FixtureDef(Generic[FixtureValue]): # value and remove all finalizers because they may be bound methods # which will keep instances alive. self.cached_result = None - self._finalizers = [] + self._finalizers.clear() def execute(self, request: SubRequest) -> FixtureValue: # Get required arguments and register our own finish() @@ -1050,13 +1068,13 @@ class FixtureDef(Generic[FixtureValue]): my_cache_key = self.cache_key(request) if self.cached_result is not None: + cache_key = self.cached_result[1] # note: comparison with `==` can fail (or be expensive) for e.g. # numpy arrays (#6497). - cache_key = self.cached_result[1] if my_cache_key is cache_key: if self.cached_result[2] is not None: - _, val, tb = self.cached_result[2] - raise val.with_traceback(tb) + exc = self.cached_result[2] + raise exc else: result = self.cached_result[0] return result @@ -1073,9 +1091,7 @@ class FixtureDef(Generic[FixtureValue]): return request.param_index if not hasattr(request, "param") else request.param def __repr__(self) -> str: - return "<FixtureDef argname={!r} scope={!r} baseid={!r}>".format( - self.argname, self.scope, self.baseid - ) + return f"<FixtureDef argname={self.argname!r} scope={self.scope!r} baseid={self.baseid!r}>" def resolve_fixture_function( @@ -1096,7 +1112,8 @@ def resolve_fixture_function( # Handle the case where fixture is defined not in a test class, but some other class # (for example a plugin class with a fixture), see #2270. if hasattr(fixturefunc, "__self__") and not isinstance( - request.instance, fixturefunc.__self__.__class__ # type: ignore[union-attr] + request.instance, + fixturefunc.__self__.__class__, # type: ignore[union-attr] ): return fixturefunc fixturefunc = getimfunc(fixturedef.func) @@ -1121,35 +1138,18 @@ def pytest_fixture_setup( my_cache_key = fixturedef.cache_key(request) try: result = call_fixture_func(fixturefunc, request, kwargs) - except TEST_OUTCOME: - exc_info = sys.exc_info() - assert exc_info[0] is not None - if isinstance( - exc_info[1], skip.Exception - ) and not fixturefunc.__name__.startswith("xunit_setup"): - exc_info[1]._use_item_location = True # type: ignore[attr-defined] - fixturedef.cached_result = (None, my_cache_key, exc_info) + except TEST_OUTCOME as e: + if isinstance(e, skip.Exception): + # The test requested a fixture which caused a skip. + # Don't show the fixture as the skip location, as then the user + # wouldn't know which test skipped. + e._use_item_location = True + fixturedef.cached_result = (None, my_cache_key, e) raise fixturedef.cached_result = (result, my_cache_key, None) return result -def _ensure_immutable_ids( - ids: Optional[Union[Sequence[Optional[object]], Callable[[Any], Optional[object]]]] -) -> Optional[Union[Tuple[Optional[object], ...], Callable[[Any], Optional[object]]]]: - if ids is None: - return None - if callable(ids): - return ids - return tuple(ids) - - -def _params_converter( - params: Optional[Iterable[object]], -) -> Optional[Tuple[object, ...]]: - return tuple(params) if params is not None else None - - def wrap_function_to_error_out_if_called_directly( function: FixtureFunction, fixture_marker: "FixtureFunctionMarker", @@ -1196,18 +1196,19 @@ class FixtureFunctionMarker: if getattr(function, "_pytestfixturefunction", False): raise ValueError( - "fixture is being applied more than once to the same function" + f"@pytest.fixture is being applied more than once to the same function {function.__name__!r}" ) + if hasattr(function, "pytestmark"): + warnings.warn(MARKED_FIXTURE, stacklevel=2) + function = wrap_function_to_error_out_if_called_directly(function, self) name = self.name or function.__name__ if name == "request": location = getlocation(function) fail( - "'request' is a reserved word for fixtures, use another name:\n {}".format( - location - ), + f"'request' is a reserved word for fixtures, use another name:\n {location}", pytrace=False, ) @@ -1373,6 +1374,31 @@ def pytest_addoption(parser: Parser) -> None: ) +def _get_direct_parametrize_args(node: nodes.Node) -> Set[str]: + """Return all direct parametrization arguments of a node, so we don't + mistake them for fixtures. + + Check https://github.com/pytest-dev/pytest/issues/5036. + + These things are done later as well when dealing with parametrization + so this could be improved. + """ + parametrize_argnames: Set[str] = set() + for marker in node.iter_markers(name="parametrize"): + if not marker.kwargs.get("indirect", False): + p_argnames, _ = ParameterSet._parse_parametrize_args( + *marker.args, **marker.kwargs + ) + parametrize_argnames.update(p_argnames) + return parametrize_argnames + + +def deduplicate_names(*seqs: Iterable[str]) -> Tuple[str, ...]: + """De-duplicate the sequence of names while keeping the original order.""" + # Ideally we would use a set, but it does not preserve insertion order. + return tuple(dict.fromkeys(name for seq in seqs for name in seq)) + + class FixtureManager: """pytest fixture definitions and information is stored and managed from this class. @@ -1410,70 +1436,75 @@ class FixtureManager: def __init__(self, session: "Session") -> None: self.session = session self.config: Config = session.config - self._arg2fixturedefs: Dict[str, List[FixtureDef[Any]]] = {} - self._holderobjseen: Set[object] = set() + # Maps a fixture name (argname) to all of the FixtureDefs in the test + # suite/plugins defined with this name. Populated by parsefactories(). + # TODO: The order of the FixtureDefs list of each arg is significant, + # explain. + self._arg2fixturedefs: Final[Dict[str, List[FixtureDef[Any]]]] = {} + self._holderobjseen: Final[Set[object]] = set() # A mapping from a nodeid to a list of autouse fixtures it defines. - self._nodeid_autousenames: Dict[str, List[str]] = { + self._nodeid_autousenames: Final[Dict[str, List[str]]] = { "": self.config.getini("usefixtures"), } session.config.pluginmanager.register(self, "funcmanage") - def _get_direct_parametrize_args(self, node: nodes.Node) -> List[str]: - """Return all direct parametrization arguments of a node, so we don't - mistake them for fixtures. + def getfixtureinfo( + self, + node: nodes.Item, + func: Optional[Callable[..., object]], + cls: Optional[type], + ) -> FuncFixtureInfo: + """Calculate the :class:`FuncFixtureInfo` for an item. - Check https://github.com/pytest-dev/pytest/issues/5036. + If ``func`` is None, or if the item sets an attribute + ``nofuncargs = True``, then ``func`` is not examined at all. - These things are done later as well when dealing with parametrization - so this could be improved. + :param node: + The item requesting the fixtures. + :param func: + The item's function. + :param cls: + If the function is a method, the method's class. """ - parametrize_argnames: List[str] = [] - for marker in node.iter_markers(name="parametrize"): - if not marker.kwargs.get("indirect", False): - p_argnames, _ = ParameterSet._parse_parametrize_args( - *marker.args, **marker.kwargs - ) - parametrize_argnames.extend(p_argnames) - - return parametrize_argnames - - def getfixtureinfo( - self, node: nodes.Node, func, cls, funcargs: bool = True - ) -> FuncFixtureInfo: - if funcargs and not getattr(node, "nofuncargs", False): + if func is not None and not getattr(node, "nofuncargs", False): argnames = getfuncargnames(func, name=node.name, cls=cls) else: argnames = () + usefixturesnames = self._getusefixturesnames(node) + autousenames = self._getautousenames(node.nodeid) + initialnames = deduplicate_names(autousenames, usefixturesnames, argnames) - usefixtures = tuple( - arg for mark in node.iter_markers(name="usefixtures") for arg in mark.args - ) - initialnames = usefixtures + argnames - fm = node.session._fixturemanager - initialnames, names_closure, arg2fixturedefs = fm.getfixtureclosure( - initialnames, node, ignore_args=self._get_direct_parametrize_args(node) + direct_parametrize_args = _get_direct_parametrize_args(node) + + names_closure, arg2fixturedefs = self.getfixtureclosure( + parentnode=node, + initialnames=initialnames, + ignore_args=direct_parametrize_args, ) + return FuncFixtureInfo(argnames, initialnames, names_closure, arg2fixturedefs) - def pytest_plugin_registered(self, plugin: _PluggyPlugin) -> None: - nodeid = None - try: - p = absolutepath(plugin.__file__) # type: ignore[attr-defined] - except AttributeError: - pass + def pytest_plugin_registered(self, plugin: _PluggyPlugin, plugin_name: str) -> None: + # Fixtures defined in conftest plugins are only visible to within the + # conftest's directory. This is unlike fixtures in non-conftest plugins + # which have global visibility. So for conftests, construct the base + # nodeid from the plugin name (which is the conftest path). + if plugin_name and plugin_name.endswith("conftest.py"): + # Note: we explicitly do *not* use `plugin.__file__` here -- The + # difference is that plugin_name has the correct capitalization on + # case-insensitive systems (Windows) and other normalization issues + # (issue #11816). + conftestpath = absolutepath(plugin_name) + try: + nodeid = str(conftestpath.parent.relative_to(self.config.rootpath)) + except ValueError: + nodeid = "" + if nodeid == ".": + nodeid = "" + if os.sep != nodes.SEP: + nodeid = nodeid.replace(os.sep, nodes.SEP) else: - # Construct the base nodeid which is later used to check - # what fixtures are visible for particular tests (as denoted - # by their test id). - if p.name.startswith("conftest.py"): - try: - nodeid = str(p.parent.relative_to(self.config.rootpath)) - except ValueError: - nodeid = "" - if nodeid == ".": - nodeid = "" - if os.sep != nodes.SEP: - nodeid = nodeid.replace(os.sep, nodes.SEP) + nodeid = None self.parsefactories(plugin, nodeid) @@ -1484,12 +1515,17 @@ class FixtureManager: if basenames: yield from basenames + def _getusefixturesnames(self, node: nodes.Item) -> Iterator[str]: + """Return the names of usefixtures fixtures applicable to node.""" + for mark in node.iter_markers(name="usefixtures"): + yield from mark.args + def getfixtureclosure( self, - fixturenames: Tuple[str, ...], parentnode: nodes.Node, - ignore_args: Sequence[str] = (), - ) -> Tuple[Tuple[str, ...], List[str], Dict[str, Sequence[FixtureDef[Any]]]]: + initialnames: Tuple[str, ...], + ignore_args: AbstractSet[str], + ) -> Tuple[List[str], Dict[str, Sequence[FixtureDef[Any]]]]: # Collect the closure of all fixtures, starting with the given # fixturenames as the initial set. As we have to visit all # factory definitions anyway, we also return an arg2fixturedefs @@ -1498,19 +1534,7 @@ class FixtureManager: # (discovering matching fixtures for a given name/node is expensive). parentid = parentnode.nodeid - fixturenames_closure = list(self._getautousenames(parentid)) - - def merge(otherlist: Iterable[str]) -> None: - for arg in otherlist: - if arg not in fixturenames_closure: - fixturenames_closure.append(arg) - - merge(fixturenames) - - # At this point, fixturenames_closure contains what we call "initialnames", - # which is a set of fixturenames the function immediately requests. We - # need to return it as well, so save this. - initialnames = tuple(fixturenames_closure) + fixturenames_closure = list(initialnames) arg2fixturedefs: Dict[str, Sequence[FixtureDef[Any]]] = {} lastlen = -1 @@ -1524,7 +1548,9 @@ class FixtureManager: fixturedefs = self.getfixturedefs(argname, parentid) if fixturedefs: arg2fixturedefs[argname] = fixturedefs - merge(fixturedefs[-1].argnames) + for arg in fixturedefs[-1].argnames: + if arg not in fixturenames_closure: + fixturenames_closure.append(arg) def sort_by_scope(arg_name: str) -> Scope: try: @@ -1535,7 +1561,7 @@ class FixtureManager: return fixturedefs[-1]._scope fixturenames_closure.sort(key=sort_by_scope, reverse=True) - return initialnames, fixturenames_closure, arg2fixturedefs + return fixturenames_closure, arg2fixturedefs def pytest_generate_tests(self, metafunc: "Metafunc") -> None: """Generate new tests based on parametrized fixtures used by the given metafunc""" @@ -1671,6 +1697,7 @@ class FixtureManager: params=marker.params, unittest=unittest, ids=marker.ids, + _ispytest=True, ) faclist = self._arg2fixturedefs.setdefault(name, []) @@ -1692,11 +1719,16 @@ class FixtureManager: def getfixturedefs( self, argname: str, nodeid: str ) -> Optional[Sequence[FixtureDef[Any]]]: - """Get a list of fixtures which are applicable to the given node id. + """Get FixtureDefs for a fixture name which are applicable + to a given node. + + Returns None if there are no fixtures at all defined with the given + name. (This is different from the case in which there are fixtures + with the given name, but none applicable to the node. In this case, + an empty result is returned). - :param str argname: Name of the fixture to search for. - :param str nodeid: Full node id of the requesting test. - :rtype: Sequence[FixtureDef] + :param argname: Name of the fixture to search for. + :param nodeid: Full node id of the requesting test. """ try: fixturedefs = self._arg2fixturedefs[argname] |