diff options
author | arcadia-devtools <arcadia-devtools@yandex-team.ru> | 2022-02-09 12:00:52 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 15:58:17 +0300 |
commit | 8e1413fed79d1e8036e65228af6c93399ccf5502 (patch) | |
tree | 502c9df7b2614d20541c7a2d39d390e9a51877cc /contrib/python/pytest/py3/_pytest/nodes.py | |
parent | 6b813c17d56d1d05f92c61ddc347d0e4d358fe85 (diff) | |
download | ydb-8e1413fed79d1e8036e65228af6c93399ccf5502.tar.gz |
intermediate changes
ref:614ed510ddd3cdf86a8c5dbf19afd113397e0172
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/nodes.py')
-rw-r--r-- | contrib/python/pytest/py3/_pytest/nodes.py | 471 |
1 files changed, 234 insertions, 237 deletions
diff --git a/contrib/python/pytest/py3/_pytest/nodes.py b/contrib/python/pytest/py3/_pytest/nodes.py index 6f22a8daaa..27434fb6a6 100644 --- a/contrib/python/pytest/py3/_pytest/nodes.py +++ b/contrib/python/pytest/py3/_pytest/nodes.py @@ -1,121 +1,143 @@ import os import warnings -from functools import lru_cache -from typing import Any -from typing import Dict +from pathlib import Path +from typing import Callable +from typing import Iterable +from typing import Iterator from typing import List from typing import Optional +from typing import overload from typing import Set from typing import Tuple +from typing import Type +from typing import TYPE_CHECKING +from typing import TypeVar from typing import Union import py import _pytest._code -from _pytest._code.code import ExceptionChainRepr +from _pytest._code import getfslineno from _pytest._code.code import ExceptionInfo -from _pytest._code.code import ReprExceptionInfo -from _pytest._code.source import getfslineno +from _pytest._code.code import TerminalRepr from _pytest.compat import cached_property -from _pytest.compat import TYPE_CHECKING from _pytest.config import Config from _pytest.config import ConftestImportFailure -from _pytest.config import PytestPluginManager -from _pytest.deprecated import NODE_USE_FROM_PARENT -from _pytest.fixtures import FixtureDef -from _pytest.fixtures import FixtureLookupError -from _pytest.fixtures import FixtureLookupErrorRepr +from _pytest.deprecated import FSCOLLECTOR_GETHOOKPROXY_ISINITPATH from _pytest.mark.structures import Mark from _pytest.mark.structures import MarkDecorator from _pytest.mark.structures import NodeKeywords from _pytest.outcomes import fail -from _pytest.pathlib import Path +from _pytest.pathlib import absolutepath from _pytest.store import Store if TYPE_CHECKING: # Imported here due to circular import. - from _pytest.main import Session # noqa: F401 + from _pytest.main import Session + from _pytest._code.code import _TracebackStyle + SEP = "/" tracebackcutdir = py.path.local(_pytest.__file__).dirpath() -@lru_cache(maxsize=None) -def _splitnode(nodeid): - """Split a nodeid into constituent 'parts'. +def iterparentnodeids(nodeid: str) -> Iterator[str]: + """Return the parent node IDs of a given node ID, inclusive. - Node IDs are strings, and can be things like: - '' - 'testing/code' - 'testing/code/test_excinfo.py' - 'testing/code/test_excinfo.py::TestFormattedExcinfo' + For the node ID - Return values are lists e.g. - [] - ['testing', 'code'] - ['testing', 'code', 'test_excinfo.py'] - ['testing', 'code', 'test_excinfo.py', 'TestFormattedExcinfo'] - """ - if nodeid == "": - # If there is no root node at all, return an empty list so the caller's logic can remain sane - return () - parts = nodeid.split(SEP) - # Replace single last element 'test_foo.py::Bar' with multiple elements 'test_foo.py', 'Bar' - parts[-1:] = parts[-1].split("::") - # Convert parts into a tuple to avoid possible errors with caching of a mutable type - return tuple(parts) + "testing/code/test_excinfo.py::TestFormattedExcinfo::test_repr_source" + the result would be -def ischildnode(baseid, nodeid): - """Return True if the nodeid is a child node of the baseid. + "" + "testing" + "testing/code" + "testing/code/test_excinfo.py" + "testing/code/test_excinfo.py::TestFormattedExcinfo" + "testing/code/test_excinfo.py::TestFormattedExcinfo::test_repr_source" - E.g. 'foo/bar::Baz' is a child of 'foo', 'foo/bar' and 'foo/bar::Baz', but not of 'foo/blorp' + Note that :: parts are only considered at the last / component. """ - base_parts = _splitnode(baseid) - node_parts = _splitnode(nodeid) - if len(node_parts) < len(base_parts): - return False - return node_parts[: len(base_parts)] == base_parts + pos = 0 + sep = SEP + yield "" + while True: + at = nodeid.find(sep, pos) + if at == -1 and sep == SEP: + sep = "::" + elif at == -1: + if nodeid: + yield nodeid + break + else: + if at: + yield nodeid[:at] + pos = at + len(sep) + + +_NodeType = TypeVar("_NodeType", bound="Node") class NodeMeta(type): def __call__(self, *k, **kw): - warnings.warn(NODE_USE_FROM_PARENT.format(name=self.__name__), stacklevel=2) - return super().__call__(*k, **kw) + msg = ( + "Direct construction of {name} has been deprecated, please use {name}.from_parent.\n" + "See " + "https://docs.pytest.org/en/stable/deprecations.html#node-construction-changed-to-node-from-parent" + " for more details." + ).format(name=self.__name__) + fail(msg, pytrace=False) def _create(self, *k, **kw): return super().__call__(*k, **kw) class Node(metaclass=NodeMeta): - """ base class for Collector and Item the test collection tree. - Collector subclasses have children, Items are terminal nodes.""" + """Base class for Collector and Item, the components of the test + collection tree. + + Collector subclasses have children; Items are leaf nodes. + """ + + # Use __slots__ to make attribute access faster. + # Note that __dict__ is still available. + __slots__ = ( + "name", + "parent", + "config", + "session", + "fspath", + "_nodeid", + "_store", + "__dict__", + ) def __init__( self, name: str, - parent: Optional["Node"] = None, + parent: "Optional[Node]" = None, config: Optional[Config] = None, - session: Optional["Session"] = None, + session: "Optional[Session]" = None, fspath: Optional[py.path.local] = None, nodeid: Optional[str] = None, ) -> None: - #: a unique name within the scope of the parent node + #: A unique name within the scope of the parent node. self.name = name - #: the parent collector node. + #: The parent collector node. self.parent = parent - #: the pytest config object + #: The pytest config object. if config: - self.config = config + self.config: Config = config else: if not parent: raise TypeError("config or parent must be provided") self.config = parent.config - #: the session this node is part of + #: The pytest session this node is part of. if session: self.session = session else: @@ -123,20 +145,17 @@ class Node(metaclass=NodeMeta): raise TypeError("session or parent must be provided") self.session = parent.session - #: filesystem path where this node was collected from (can be None) + #: Filesystem path where this node was collected from (can be None). self.fspath = fspath or getattr(parent, "fspath", None) - #: keywords/markers collected from all scopes + #: Keywords/markers collected from all scopes. self.keywords = NodeKeywords(self) - #: the marker objects belonging to this node - self.own_markers = [] # type: List[Mark] - - #: allow adding of extra keywords to use for matching - self.extra_keyword_matches = set() # type: Set[str] + #: The marker objects belonging to this node. + self.own_markers: List[Mark] = [] - # used for storing artificial fixturedefs for direct parametrization - self._name2pseudofixturedef = {} # type: Dict[str, FixtureDef] + #: Allow adding of extra keywords to use for matching. + self.extra_keyword_matches: Set[str] = set() if nodeid is not None: assert "::()" not in nodeid @@ -154,15 +173,15 @@ class Node(metaclass=NodeMeta): @classmethod def from_parent(cls, parent: "Node", **kw): - """ - Public Constructor for Nodes + """Public constructor for Nodes. This indirection got introduced in order to enable removing the fragile logic from the node constructors. - Subclasses can use ``super().from_parent(...)`` when overriding the construction + Subclasses can use ``super().from_parent(...)`` when overriding the + construction. - :param parent: the parent node of this test Node + :param parent: The parent node of this Node. """ if "config" in kw: raise TypeError("config is not a valid argument for from_parent") @@ -172,64 +191,67 @@ class Node(metaclass=NodeMeta): @property def ihook(self): - """ fspath sensitive hook proxy used to call pytest hooks""" + """fspath-sensitive hook proxy used to call pytest hooks.""" return self.session.gethookproxy(self.fspath) - def __repr__(self): + def __repr__(self) -> str: return "<{} {}>".format(self.__class__.__name__, getattr(self, "name", None)) - def warn(self, warning): - """Issue a warning for this item. + def warn(self, warning: Warning) -> None: + """Issue a warning for this Node. - Warnings will be displayed after the test session, unless explicitly suppressed + Warnings will be displayed after the test session, unless explicitly suppressed. - :param Warning warning: the warning instance to issue. Must be a subclass of PytestWarning. + :param Warning warning: + The warning instance to issue. - :raise ValueError: if ``warning`` instance is not a subclass of PytestWarning. + :raises ValueError: If ``warning`` instance is not a subclass of Warning. Example usage: .. code-block:: python node.warn(PytestWarning("some message")) + node.warn(UserWarning("some message")) + .. versionchanged:: 6.2 + Any subclass of :class:`Warning` is now accepted, rather than only + :class:`PytestWarning <pytest.PytestWarning>` subclasses. """ - from _pytest.warning_types import PytestWarning - - if not isinstance(warning, PytestWarning): + # enforce type checks here to avoid getting a generic type error later otherwise. + if not isinstance(warning, Warning): raise ValueError( - "warning must be an instance of PytestWarning or subclass, got {!r}".format( + "warning must be an instance of Warning or subclass, got {!r}".format( warning ) ) path, lineno = get_fslocation_from_item(self) + assert lineno is not None warnings.warn_explicit( - warning, - category=None, - filename=str(path), - lineno=lineno + 1 if lineno is not None else None, + warning, category=None, filename=str(path), lineno=lineno + 1, ) - # methods for ordering nodes + # Methods for ordering nodes. + @property - def nodeid(self): - """ a ::-separated string denoting its collection tree address. """ + def nodeid(self) -> str: + """A ::-separated string denoting its collection tree address.""" return self._nodeid - def __hash__(self): - return hash(self.nodeid) + def __hash__(self) -> int: + return hash(self._nodeid) - def setup(self): + def setup(self) -> None: pass - def teardown(self): + def teardown(self) -> None: pass - def listchain(self): - """ return list of all parent collectors up to self, - starting from root of collection tree. """ + def listchain(self) -> List["Node"]: + """Return list of all parent collectors up to self, starting from + the root of collection tree.""" chain = [] - item = self # type: Optional[Node] + item: Optional[Node] = self while item is not None: chain.append(item) item = item.parent @@ -239,12 +261,10 @@ class Node(metaclass=NodeMeta): def add_marker( self, marker: Union[str, MarkDecorator], append: bool = True ) -> None: - """dynamically add a marker object to the node. + """Dynamically add a marker object to the node. - :type marker: ``str`` or ``pytest.mark.*`` object - :param marker: - ``append=True`` whether to append the marker, - if ``False`` insert at position ``0``. + :param append: + Whether to append the marker, or prepend it. """ from _pytest.mark import MARK_GEN @@ -254,78 +274,93 @@ class Node(metaclass=NodeMeta): marker_ = getattr(MARK_GEN, marker) else: raise ValueError("is not a string or pytest.mark.* Marker") - self.keywords[marker_.name] = marker + self.keywords[marker_.name] = marker_ if append: self.own_markers.append(marker_.mark) else: self.own_markers.insert(0, marker_.mark) - def iter_markers(self, name=None): - """ - :param name: if given, filter the results by the name attribute + def iter_markers(self, name: Optional[str] = None) -> Iterator[Mark]: + """Iterate over all markers of the node. - iterate over all markers of the node + :param name: If given, filter the results by the name attribute. """ return (x[1] for x in self.iter_markers_with_node(name=name)) - def iter_markers_with_node(self, name=None): - """ - :param name: if given, filter the results by the name attribute + def iter_markers_with_node( + self, name: Optional[str] = None + ) -> Iterator[Tuple["Node", Mark]]: + """Iterate over all markers of the node. - iterate over all markers of the node - returns sequence of tuples (node, mark) + :param name: If given, filter the results by the name attribute. + :returns: An iterator of (node, mark) tuples. """ for node in reversed(self.listchain()): for mark in node.own_markers: if name is None or getattr(mark, "name", None) == name: yield node, mark - def get_closest_marker(self, name, default=None): - """return the first marker matching the name, from closest (for example function) to farther level (for example - module level). + @overload + def get_closest_marker(self, name: str) -> Optional[Mark]: + ... + + @overload + def get_closest_marker(self, name: str, default: Mark) -> Mark: + ... + + def get_closest_marker( + self, name: str, default: Optional[Mark] = None + ) -> Optional[Mark]: + """Return the first marker matching the name, from closest (for + example function) to farther level (for example module level). - :param default: fallback return value of no marker was found - :param name: name to filter by + :param default: Fallback return value if no marker was found. + :param name: Name to filter by. """ return next(self.iter_markers(name=name), default) - def listextrakeywords(self): - """ Return a set of all extra keywords in self and any parents.""" - extra_keywords = set() # type: Set[str] + def listextrakeywords(self) -> Set[str]: + """Return a set of all extra keywords in self and any parents.""" + extra_keywords: Set[str] = set() for item in self.listchain(): extra_keywords.update(item.extra_keyword_matches) return extra_keywords - def listnames(self): + def listnames(self) -> List[str]: return [x.name for x in self.listchain()] - def addfinalizer(self, fin): - """ register a function to be called when this node is finalized. + def addfinalizer(self, fin: Callable[[], object]) -> None: + """Register a function to be called when this node is finalized. This method can only be called when this node is active in a setup chain, for example during self.setup(). """ self.session._setupstate.addfinalizer(fin, self) - def getparent(self, cls): - """ get the next parent node (including ourself) - which is an instance of the given class""" - current = self # type: Optional[Node] + def getparent(self, cls: Type[_NodeType]) -> Optional[_NodeType]: + """Get the next parent node (including self) which is an instance of + the given class.""" + current: Optional[Node] = self while current and not isinstance(current, cls): current = current.parent + assert current is None or isinstance(current, cls) return current - def _prunetraceback(self, excinfo): + def _prunetraceback(self, excinfo: ExceptionInfo[BaseException]) -> None: pass def _repr_failure_py( - self, excinfo: ExceptionInfo[BaseException], style=None, - ) -> Union[str, ReprExceptionInfo, ExceptionChainRepr, FixtureLookupErrorRepr]: + self, + excinfo: ExceptionInfo[BaseException], + style: "Optional[_TracebackStyle]" = None, + ) -> TerminalRepr: + from _pytest.fixtures import FixtureLookupError + if isinstance(excinfo.value, ConftestImportFailure): excinfo = ExceptionInfo(excinfo.value.excinfo) if isinstance(excinfo.value, fail.Exception): if not excinfo.value.pytrace: - return str(excinfo.value) + style = "value" if isinstance(excinfo.value, FixtureLookupError): return excinfo.value.formatrepr() if self.config.getoption("fulltrace", False): @@ -356,7 +391,7 @@ class Node(metaclass=NodeMeta): # It will be better to just always display paths relative to invocation_dir, but # this requires a lot of plumbing (#6428). try: - abspath = Path(os.getcwd()) != Path(self.config.invocation_dir) + abspath = Path(os.getcwd()) != self.config.invocation_params.dir except OSError: abspath = True @@ -370,49 +405,59 @@ class Node(metaclass=NodeMeta): ) def repr_failure( - self, excinfo, style=None - ) -> Union[str, ReprExceptionInfo, ExceptionChainRepr, FixtureLookupErrorRepr]: + self, + excinfo: ExceptionInfo[BaseException], + style: "Optional[_TracebackStyle]" = None, + ) -> Union[str, TerminalRepr]: + """Return a representation of a collection or test failure. + + :param excinfo: Exception information for the failure. + """ return self._repr_failure_py(excinfo, style) def get_fslocation_from_item( - item: "Item", + node: "Node", ) -> Tuple[Union[str, py.path.local], Optional[int]]: - """Tries to extract the actual location from an item, depending on available attributes: + """Try to extract the actual location from a node, depending on available attributes: - * "fslocation": a pair (path, lineno) - * "obj": a Python object that the item wraps. + * "location": a pair (path, lineno) + * "obj": a Python object that the node wraps. * "fspath": just a path - :rtype: a tuple of (str|LocalPath, int) with filename and line number. + :rtype: A tuple of (str|py.path.local, int) with filename and line number. """ - try: - return item.location[:2] - except AttributeError: - pass - obj = getattr(item, "obj", None) + # See Item.location. + location: Optional[Tuple[str, Optional[int], str]] = getattr(node, "location", None) + if location is not None: + return location[:2] + obj = getattr(node, "obj", None) if obj is not None: return getfslineno(obj) - return getattr(item, "fspath", "unknown location"), -1 + return getattr(node, "fspath", "unknown location"), -1 class Collector(Node): - """ Collector instances create children through collect() - and thus iteratively build a tree. - """ + """Collector instances create children through collect() and thus + iteratively build a tree.""" class CollectError(Exception): - """ an error during collection, contains a custom message. """ + """An error during collection, contains a custom message.""" - def collect(self): - """ returns a list of children (items and collectors) - for this collection node. - """ + def collect(self) -> Iterable[Union["Item", "Collector"]]: + """Return a list of children (items and collectors) for this + collection node.""" raise NotImplementedError("abstract") - def repr_failure(self, excinfo): - """ represent a collection failure. """ - if excinfo.errisinstance(self.CollectError) and not self.config.getoption( + # TODO: This omits the style= parameter which breaks Liskov Substitution. + def repr_failure( # type: ignore[override] + self, excinfo: ExceptionInfo[BaseException] + ) -> Union[str, TerminalRepr]: + """Return a representation of a collection failure. + + :param excinfo: Exception information for the failure. + """ + if isinstance(excinfo.value, self.CollectError) and not self.config.getoption( "fulltrace", False ): exc = excinfo.value @@ -426,7 +471,7 @@ class Collector(Node): return self._repr_failure_py(excinfo, style=tbstyle) - def _prunetraceback(self, excinfo): + def _prunetraceback(self, excinfo: ExceptionInfo[BaseException]) -> None: if hasattr(self, "fspath"): traceback = excinfo.traceback ntraceback = traceback.cut(path=self.fspath) @@ -441,23 +486,14 @@ def _check_initialpaths_for_relpath(session, fspath): return fspath.relto(initial_path) -class FSHookProxy: - def __init__( - self, fspath: py.path.local, pm: PytestPluginManager, remove_mods - ) -> None: - self.fspath = fspath - self.pm = pm - self.remove_mods = remove_mods - - def __getattr__(self, name: str): - x = self.pm.subset_hook_caller(name, remove_plugins=self.remove_mods) - self.__dict__[name] = x - return x - - class FSCollector(Collector): def __init__( - self, fspath: py.path.local, parent=None, config=None, session=None, nodeid=None + self, + fspath: py.path.local, + parent=None, + config: Optional[Config] = None, + session: Optional["Session"] = None, + nodeid: Optional[str] = None, ) -> None: name = fspath.basename if parent is not None: @@ -479,91 +515,56 @@ class FSCollector(Collector): super().__init__(name, parent, config, session, nodeid=nodeid, fspath=fspath) - self._norecursepatterns = self.config.getini("norecursedirs") - @classmethod def from_parent(cls, parent, *, fspath, **kw): - """ - The public constructor - """ + """The public constructor.""" return super().from_parent(parent=parent, fspath=fspath, **kw) - def _gethookproxy(self, fspath: py.path.local): - # check if we have the common case of running - # hooks with all conftest.py files - pm = self.config.pluginmanager - my_conftestmodules = pm._getconftestmodules(fspath) - remove_mods = pm._conftest_plugins.difference(my_conftestmodules) - if remove_mods: - # one or more conftests are not in use at this fspath - proxy = FSHookProxy(fspath, pm, remove_mods) - else: - # all plugins are active for this fspath - proxy = self.config.hook - return proxy - - def _recurse(self, dirpath: py.path.local) -> bool: - if dirpath.basename == "__pycache__": - return False - ihook = self._gethookproxy(dirpath.dirpath()) - if ihook.pytest_ignore_collect(path=dirpath, config=self.config): - return False - for pat in self._norecursepatterns: - if dirpath.check(fnmatch=pat): - return False - ihook = self._gethookproxy(dirpath) - ihook.pytest_collect_directory(path=dirpath, parent=self) - return True - - def _collectfile(self, path, handle_dupes=True): - assert ( - path.isfile() - ), "{!r} is not a file (isdir={!r}, exists={!r}, islink={!r})".format( - path, path.isdir(), path.exists(), path.islink() - ) - ihook = self.gethookproxy(path) - if not self.isinitpath(path): - if ihook.pytest_ignore_collect(path=path, config=self.config): - return () - - if handle_dupes: - keepduplicates = self.config.getoption("keepduplicates") - if not keepduplicates: - duplicate_paths = self.config.pluginmanager._duplicatepaths - if path in duplicate_paths: - return () - else: - duplicate_paths.add(path) + def gethookproxy(self, fspath: py.path.local): + warnings.warn(FSCOLLECTOR_GETHOOKPROXY_ISINITPATH, stacklevel=2) + return self.session.gethookproxy(fspath) - return ihook.pytest_collect_file(path=path, parent=self) + def isinitpath(self, path: py.path.local) -> bool: + warnings.warn(FSCOLLECTOR_GETHOOKPROXY_ISINITPATH, stacklevel=2) + return self.session.isinitpath(path) class File(FSCollector): - """ base class for collecting tests from a file. """ + """Base class for collecting tests from a file. + + :ref:`non-python tests`. + """ class Item(Node): - """ a basic test invocation item. Note that for a single function - there might be multiple test invocation items. + """A basic test invocation item. + + Note that for a single function there might be multiple test invocation items. """ nextitem = None - def __init__(self, name, parent=None, config=None, session=None, nodeid=None): + def __init__( + self, + name, + parent=None, + config: Optional[Config] = None, + session: Optional["Session"] = None, + nodeid: Optional[str] = None, + ) -> None: super().__init__(name, parent, config, session, nodeid=nodeid) - self._report_sections = [] # type: List[Tuple[str, str, str]] + self._report_sections: List[Tuple[str, str, str]] = [] - #: user properties is a list of tuples (name, value) that holds user - #: defined properties for this test. - self.user_properties = [] # type: List[Tuple[str, Any]] + #: A list of tuples (name, value) that holds user defined properties + #: for this test. + self.user_properties: List[Tuple[str, object]] = [] def runtest(self) -> None: raise NotImplementedError("runtest must be implemented by Item subclass") def add_report_section(self, when: str, key: str, content: str) -> None: - """ - Adds a new report section, similar to what's done internally to add stdout and - stderr captured output:: + """Add a new report section, similar to what's done internally to add + stdout and stderr captured output:: item.add_report_section("call", "stdout", "report section contents") @@ -572,7 +573,6 @@ class Item(Node): :param str key: Name of the section, can be customized at will. Pytest uses ``"stdout"`` and ``"stderr"`` internally. - :param str content: The full contents as a string. """ @@ -585,10 +585,7 @@ class Item(Node): @cached_property def location(self) -> Tuple[str, Optional[int], str]: location = self.reportinfo() - if isinstance(location[0], py.path.local): - fspath = location[0] - else: - fspath = py.path.local(location[0]) + fspath = absolutepath(str(location[0])) relfspath = self.session._node_location_to_relpath(fspath) assert type(location[2]) is str return (relfspath, location[1], location[2]) |