aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/pytest/py3/_pytest/nodes.py
diff options
context:
space:
mode:
authordeshevoy <deshevoy@yandex-team.ru>2022-02-10 16:46:56 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:56 +0300
commite988f30484abe5fdeedcc7a5d3c226c01a21800c (patch)
tree0a217b173aabb57b7e51f8a169989b1a3e0309fe /contrib/python/pytest/py3/_pytest/nodes.py
parent33ee501c05d3f24036ae89766a858930ae66c548 (diff)
downloadydb-e988f30484abe5fdeedcc7a5d3c226c01a21800c.tar.gz
Restoring authorship annotation for <deshevoy@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/nodes.py')
-rw-r--r--contrib/python/pytest/py3/_pytest/nodes.py460
1 files changed, 230 insertions, 230 deletions
diff --git a/contrib/python/pytest/py3/_pytest/nodes.py b/contrib/python/pytest/py3/_pytest/nodes.py
index 27434fb6a6..a4a4b5b57c 100644
--- a/contrib/python/pytest/py3/_pytest/nodes.py
+++ b/contrib/python/pytest/py3/_pytest/nodes.py
@@ -1,5 +1,5 @@
-import os
-import warnings
+import os
+import warnings
from pathlib import Path
from typing import Callable
from typing import Iterable
@@ -13,10 +13,10 @@ from typing import Type
from typing import TYPE_CHECKING
from typing import TypeVar
from typing import Union
-
-import py
-
-import _pytest._code
+
+import py
+
+import _pytest._code
from _pytest._code import getfslineno
from _pytest._code.code import ExceptionInfo
from _pytest._code.code import TerminalRepr
@@ -26,40 +26,40 @@ from _pytest.config import ConftestImportFailure
from _pytest.deprecated import FSCOLLECTOR_GETHOOKPROXY_ISINITPATH
from _pytest.mark.structures import Mark
from _pytest.mark.structures import MarkDecorator
-from _pytest.mark.structures import NodeKeywords
-from _pytest.outcomes import fail
+from _pytest.mark.structures import NodeKeywords
+from _pytest.outcomes import fail
from _pytest.pathlib import absolutepath
from _pytest.store import Store
-
+
if TYPE_CHECKING:
# Imported here due to circular import.
from _pytest.main import Session
from _pytest._code.code import _TracebackStyle
-SEP = "/"
-
-tracebackcutdir = py.path.local(_pytest.__file__).dirpath()
-
-
+SEP = "/"
+
+tracebackcutdir = py.path.local(_pytest.__file__).dirpath()
+
+
def iterparentnodeids(nodeid: str) -> Iterator[str]:
"""Return the parent node IDs of a given node ID, inclusive.
-
+
For the node ID
-
+
"testing/code/test_excinfo.py::TestFormattedExcinfo::test_repr_source"
-
+
the result would be
-
+
""
"testing"
"testing/code"
"testing/code/test_excinfo.py"
"testing/code/test_excinfo.py::TestFormattedExcinfo"
"testing/code/test_excinfo.py::TestFormattedExcinfo::test_repr_source"
-
+
Note that :: parts are only considered at the last / component.
- """
+ """
pos = 0
sep = SEP
yield ""
@@ -75,8 +75,8 @@ def iterparentnodeids(nodeid: str) -> Iterator[str]:
if at:
yield nodeid[:at]
pos = at + len(sep)
-
-
+
+
_NodeType = TypeVar("_NodeType", bound="Node")
@@ -97,7 +97,7 @@ class NodeMeta(type):
class Node(metaclass=NodeMeta):
"""Base class for Collector and Item, the components of the test
collection tree.
-
+
Collector subclasses have children; Items are leaf nodes.
"""
@@ -114,7 +114,7 @@ class Node(metaclass=NodeMeta):
"__dict__",
)
- def __init__(
+ def __init__(
self,
name: str,
parent: "Optional[Node]" = None,
@@ -124,11 +124,11 @@ class Node(metaclass=NodeMeta):
nodeid: Optional[str] = None,
) -> None:
#: A unique name within the scope of the parent node.
- self.name = name
-
+ self.name = name
+
#: The parent collector node.
- self.parent = parent
-
+ self.parent = parent
+
#: The pytest config object.
if config:
self.config: Config = config
@@ -136,7 +136,7 @@ class Node(metaclass=NodeMeta):
if not parent:
raise TypeError("config or parent must be provided")
self.config = parent.config
-
+
#: The pytest session this node is part of.
if session:
self.session = session
@@ -144,29 +144,29 @@ class Node(metaclass=NodeMeta):
if not parent:
raise TypeError("session or parent must be provided")
self.session = parent.session
-
+
#: Filesystem path where this node was collected from (can be None).
- self.fspath = fspath or getattr(parent, "fspath", None)
-
+ self.fspath = fspath or getattr(parent, "fspath", None)
+
#: Keywords/markers collected from all scopes.
- self.keywords = NodeKeywords(self)
-
+ self.keywords = NodeKeywords(self)
+
#: The marker objects belonging to this node.
self.own_markers: List[Mark] = []
-
+
#: Allow adding of extra keywords to use for matching.
self.extra_keyword_matches: Set[str] = set()
-
- if nodeid is not None:
- assert "::()" not in nodeid
- self._nodeid = nodeid
- else:
+
+ if nodeid is not None:
+ assert "::()" not in nodeid
+ self._nodeid = nodeid
+ else:
if not self.parent:
raise TypeError("nodeid or parent must be provided")
- self._nodeid = self.parent.nodeid
- if self.name != "()":
- self._nodeid += "::" + self.name
-
+ self._nodeid = self.parent.nodeid
+ if self.name != "()":
+ self._nodeid += "::" + self.name
+
# A place where plugins can store information on the node for their
# own use. Currently only intended for internal plugins.
self._store = Store()
@@ -189,121 +189,121 @@ class Node(metaclass=NodeMeta):
raise TypeError("session is not a valid argument for from_parent")
return cls._create(parent=parent, **kw)
- @property
- def ihook(self):
+ @property
+ def ihook(self):
"""fspath-sensitive hook proxy used to call pytest hooks."""
- return self.session.gethookproxy(self.fspath)
-
+ return self.session.gethookproxy(self.fspath)
+
def __repr__(self) -> str:
return "<{} {}>".format(self.__class__.__name__, getattr(self, "name", None))
-
+
def warn(self, warning: Warning) -> None:
"""Issue a warning for this Node.
-
+
Warnings will be displayed after the test session, unless explicitly suppressed.
-
+
:param Warning warning:
The warning instance to issue.
-
+
:raises ValueError: If ``warning`` instance is not a subclass of Warning.
-
+
Example usage:
-
- .. code-block:: python
-
- node.warn(PytestWarning("some message"))
+
+ .. code-block:: python
+
+ node.warn(PytestWarning("some message"))
node.warn(UserWarning("some message"))
-
+
.. versionchanged:: 6.2
Any subclass of :class:`Warning` is now accepted, rather than only
:class:`PytestWarning <pytest.PytestWarning>` subclasses.
- """
+ """
# enforce type checks here to avoid getting a generic type error later otherwise.
if not isinstance(warning, Warning):
- raise ValueError(
+ raise ValueError(
"warning must be an instance of Warning or subclass, got {!r}".format(
- warning
- )
- )
- path, lineno = get_fslocation_from_item(self)
+ warning
+ )
+ )
+ path, lineno = get_fslocation_from_item(self)
assert lineno is not None
- warnings.warn_explicit(
+ warnings.warn_explicit(
warning, category=None, filename=str(path), lineno=lineno + 1,
- )
-
+ )
+
# Methods for ordering nodes.
- @property
+ @property
def nodeid(self) -> str:
"""A ::-separated string denoting its collection tree address."""
- return self._nodeid
-
+ return self._nodeid
+
def __hash__(self) -> int:
return hash(self._nodeid)
-
+
def setup(self) -> None:
- pass
-
+ pass
+
def teardown(self) -> None:
- pass
-
+ pass
+
def listchain(self) -> List["Node"]:
"""Return list of all parent collectors up to self, starting from
the root of collection tree."""
- chain = []
+ chain = []
item: Optional[Node] = self
- while item is not None:
- chain.append(item)
- item = item.parent
- chain.reverse()
- return chain
-
+ while item is not None:
+ chain.append(item)
+ item = item.parent
+ chain.reverse()
+ return chain
+
def add_marker(
self, marker: Union[str, MarkDecorator], append: bool = True
) -> None:
"""Dynamically add a marker object to the node.
-
+
:param append:
Whether to append the marker, or prepend it.
- """
+ """
from _pytest.mark import MARK_GEN
-
+
if isinstance(marker, MarkDecorator):
marker_ = marker
elif isinstance(marker, str):
marker_ = getattr(MARK_GEN, marker)
else:
- raise ValueError("is not a string or pytest.mark.* Marker")
+ raise ValueError("is not a string or pytest.mark.* Marker")
self.keywords[marker_.name] = marker_
- if append:
+ if append:
self.own_markers.append(marker_.mark)
- else:
+ else:
self.own_markers.insert(0, marker_.mark)
-
+
def iter_markers(self, name: Optional[str] = None) -> Iterator[Mark]:
"""Iterate over all markers of the node.
-
+
:param name: If given, filter the results by the name attribute.
- """
- return (x[1] for x in self.iter_markers_with_node(name=name))
-
+ """
+ return (x[1] for x in self.iter_markers_with_node(name=name))
+
def iter_markers_with_node(
self, name: Optional[str] = None
) -> Iterator[Tuple["Node", Mark]]:
"""Iterate over all markers of the node.
-
+
:param name: If given, filter the results by the name attribute.
:returns: An iterator of (node, mark) tuples.
- """
- for node in reversed(self.listchain()):
- for mark in node.own_markers:
- if name is None or getattr(mark, "name", None) == name:
- yield node, mark
-
+ """
+ for node in reversed(self.listchain()):
+ for mark in node.own_markers:
+ if name is None or getattr(mark, "name", None) == name:
+ yield node, mark
+
@overload
def get_closest_marker(self, name: str) -> Optional[Mark]:
...
-
+
@overload
def get_closest_marker(self, name: str, default: Mark) -> Mark:
...
@@ -316,39 +316,39 @@ class Node(metaclass=NodeMeta):
:param default: Fallback return value if no marker was found.
:param name: Name to filter by.
- """
- return next(self.iter_markers(name=name), default)
-
+ """
+ return next(self.iter_markers(name=name), default)
+
def listextrakeywords(self) -> Set[str]:
"""Return a set of all extra keywords in self and any parents."""
extra_keywords: Set[str] = set()
- for item in self.listchain():
- extra_keywords.update(item.extra_keyword_matches)
- return extra_keywords
-
+ for item in self.listchain():
+ extra_keywords.update(item.extra_keyword_matches)
+ return extra_keywords
+
def listnames(self) -> List[str]:
- return [x.name for x in self.listchain()]
-
+ return [x.name for x in self.listchain()]
+
def addfinalizer(self, fin: Callable[[], object]) -> None:
"""Register a function to be called when this node is finalized.
-
- This method can only be called when this node is active
- in a setup chain, for example during self.setup().
- """
- self.session._setupstate.addfinalizer(fin, self)
-
+
+ This method can only be called when this node is active
+ in a setup chain, for example during self.setup().
+ """
+ self.session._setupstate.addfinalizer(fin, self)
+
def getparent(self, cls: Type[_NodeType]) -> Optional[_NodeType]:
"""Get the next parent node (including self) which is an instance of
the given class."""
current: Optional[Node] = self
- while current and not isinstance(current, cls):
- current = current.parent
+ while current and not isinstance(current, cls):
+ current = current.parent
assert current is None or isinstance(current, cls)
- return current
-
+ return current
+
def _prunetraceback(self, excinfo: ExceptionInfo[BaseException]) -> None:
- pass
-
+ pass
+
def _repr_failure_py(
self,
excinfo: ExceptionInfo[BaseException],
@@ -359,51 +359,51 @@ class Node(metaclass=NodeMeta):
if isinstance(excinfo.value, ConftestImportFailure):
excinfo = ExceptionInfo(excinfo.value.excinfo)
if isinstance(excinfo.value, fail.Exception):
- if not excinfo.value.pytrace:
+ if not excinfo.value.pytrace:
style = "value"
if isinstance(excinfo.value, FixtureLookupError):
- return excinfo.value.formatrepr()
+ return excinfo.value.formatrepr()
if self.config.getoption("fulltrace", False):
- style = "long"
- else:
- tb = _pytest._code.Traceback([excinfo.traceback[-1]])
- self._prunetraceback(excinfo)
- if len(excinfo.traceback) == 0:
- excinfo.traceback = tb
- if style == "auto":
- style = "long"
- # XXX should excinfo.getrepr record all data and toterminal() process it?
- if style is None:
+ style = "long"
+ else:
+ tb = _pytest._code.Traceback([excinfo.traceback[-1]])
+ self._prunetraceback(excinfo)
+ if len(excinfo.traceback) == 0:
+ excinfo.traceback = tb
+ if style == "auto":
+ style = "long"
+ # XXX should excinfo.getrepr record all data and toterminal() process it?
+ if style is None:
if self.config.getoption("tbstyle", "auto") == "short":
- style = "short"
- else:
- style = "long"
-
+ style = "short"
+ else:
+ style = "long"
+
if self.config.getoption("verbose", 0) > 1:
- truncate_locals = False
- else:
- truncate_locals = True
-
+ truncate_locals = False
+ else:
+ truncate_locals = True
+
# excinfo.getrepr() formats paths relative to the CWD if `abspath` is False.
# It is possible for a fixture/test to change the CWD while this code runs, which
# would then result in the user seeing confusing paths in the failure message.
# To fix this, if the CWD changed, always display the full absolute path.
# It will be better to just always display paths relative to invocation_dir, but
# this requires a lot of plumbing (#6428).
- try:
+ try:
abspath = Path(os.getcwd()) != self.config.invocation_params.dir
- except OSError:
- abspath = True
-
- return excinfo.getrepr(
- funcargs=True,
- abspath=abspath,
+ except OSError:
+ abspath = True
+
+ return excinfo.getrepr(
+ funcargs=True,
+ abspath=abspath,
showlocals=self.config.getoption("showlocals", False),
- style=style,
+ style=style,
tbfilter=False, # pruned already, or in --fulltrace mode.
- truncate_locals=truncate_locals,
- )
-
+ truncate_locals=truncate_locals,
+ )
+
def repr_failure(
self,
excinfo: ExceptionInfo[BaseException],
@@ -414,41 +414,41 @@ class Node(metaclass=NodeMeta):
:param excinfo: Exception information for the failure.
"""
return self._repr_failure_py(excinfo, style)
-
-
+
+
def get_fslocation_from_item(
node: "Node",
) -> Tuple[Union[str, py.path.local], Optional[int]]:
"""Try to extract the actual location from a node, depending on available attributes:
-
+
* "location": a pair (path, lineno)
* "obj": a Python object that the node wraps.
- * "fspath": just a path
-
+ * "fspath": just a path
+
:rtype: A tuple of (str|py.path.local, int) with filename and line number.
- """
+ """
# See Item.location.
location: Optional[Tuple[str, Optional[int], str]] = getattr(node, "location", None)
if location is not None:
return location[:2]
obj = getattr(node, "obj", None)
- if obj is not None:
- return getfslineno(obj)
+ if obj is not None:
+ return getfslineno(obj)
return getattr(node, "fspath", "unknown location"), -1
-
-
-class Collector(Node):
+
+
+class Collector(Node):
"""Collector instances create children through collect() and thus
iteratively build a tree."""
-
- class CollectError(Exception):
+
+ class CollectError(Exception):
"""An error during collection, contains a custom message."""
-
+
def collect(self) -> Iterable[Union["Item", "Collector"]]:
"""Return a list of children (items and collectors) for this
collection node."""
- raise NotImplementedError("abstract")
-
+ raise NotImplementedError("abstract")
+
# TODO: This omits the style= parameter which breaks Liskov Substitution.
def repr_failure( # type: ignore[override]
self, excinfo: ExceptionInfo[BaseException]
@@ -460,9 +460,9 @@ class Collector(Node):
if isinstance(excinfo.value, self.CollectError) and not self.config.getoption(
"fulltrace", False
):
- exc = excinfo.value
- return str(exc.args[0])
-
+ exc = excinfo.value
+ return str(exc.args[0])
+
# Respect explicit tbstyle option, but default to "short"
# (_repr_failure_py uses "long" with "fulltrace" option always).
tbstyle = self.config.getoption("tbstyle", "auto")
@@ -472,21 +472,21 @@ class Collector(Node):
return self._repr_failure_py(excinfo, style=tbstyle)
def _prunetraceback(self, excinfo: ExceptionInfo[BaseException]) -> None:
- if hasattr(self, "fspath"):
- traceback = excinfo.traceback
- ntraceback = traceback.cut(path=self.fspath)
- if ntraceback == traceback:
- ntraceback = ntraceback.cut(excludepath=tracebackcutdir)
- excinfo.traceback = ntraceback.filter()
-
-
-def _check_initialpaths_for_relpath(session, fspath):
- for initial_path in session._initialpaths:
- if fspath.common(initial_path) == initial_path:
- return fspath.relto(initial_path)
-
-
-class FSCollector(Collector):
+ if hasattr(self, "fspath"):
+ traceback = excinfo.traceback
+ ntraceback = traceback.cut(path=self.fspath)
+ if ntraceback == traceback:
+ ntraceback = ntraceback.cut(excludepath=tracebackcutdir)
+ excinfo.traceback = ntraceback.filter()
+
+
+def _check_initialpaths_for_relpath(session, fspath):
+ for initial_path in session._initialpaths:
+ if fspath.common(initial_path) == initial_path:
+ return fspath.relto(initial_path)
+
+
+class FSCollector(Collector):
def __init__(
self,
fspath: py.path.local,
@@ -495,24 +495,24 @@ class FSCollector(Collector):
session: Optional["Session"] = None,
nodeid: Optional[str] = None,
) -> None:
- name = fspath.basename
- if parent is not None:
- rel = fspath.relto(parent.fspath)
- if rel:
- name = rel
- name = name.replace(os.sep, SEP)
- self.fspath = fspath
-
- session = session or parent.session
-
- if nodeid is None:
- nodeid = self.fspath.relto(session.config.rootdir)
-
- if not nodeid:
- nodeid = _check_initialpaths_for_relpath(session, fspath)
- if nodeid and os.sep != SEP:
- nodeid = nodeid.replace(os.sep, SEP)
-
+ name = fspath.basename
+ if parent is not None:
+ rel = fspath.relto(parent.fspath)
+ if rel:
+ name = rel
+ name = name.replace(os.sep, SEP)
+ self.fspath = fspath
+
+ session = session or parent.session
+
+ if nodeid is None:
+ nodeid = self.fspath.relto(session.config.rootdir)
+
+ if not nodeid:
+ nodeid = _check_initialpaths_for_relpath(session, fspath)
+ if nodeid and os.sep != SEP:
+ nodeid = nodeid.replace(os.sep, SEP)
+
super().__init__(name, parent, config, session, nodeid=nodeid, fspath=fspath)
@classmethod
@@ -528,22 +528,22 @@ class FSCollector(Collector):
warnings.warn(FSCOLLECTOR_GETHOOKPROXY_ISINITPATH, stacklevel=2)
return self.session.isinitpath(path)
-
+
class File(FSCollector):
"""Base class for collecting tests from a file.
-
+
:ref:`non-python tests`.
"""
class Item(Node):
"""A basic test invocation item.
-
+
Note that for a single function there might be multiple test invocation items.
- """
-
- nextitem = None
-
+ """
+
+ nextitem = None
+
def __init__(
self,
name,
@@ -554,34 +554,34 @@ class Item(Node):
) -> None:
super().__init__(name, parent, config, session, nodeid=nodeid)
self._report_sections: List[Tuple[str, str, str]] = []
-
+
#: A list of tuples (name, value) that holds user defined properties
#: for this test.
self.user_properties: List[Tuple[str, object]] = []
-
+
def runtest(self) -> None:
raise NotImplementedError("runtest must be implemented by Item subclass")
def add_report_section(self, when: str, key: str, content: str) -> None:
"""Add a new report section, similar to what's done internally to add
stdout and stderr captured output::
-
- item.add_report_section("call", "stdout", "report section contents")
-
- :param str when:
- One of the possible capture states, ``"setup"``, ``"call"``, ``"teardown"``.
- :param str key:
- Name of the section, can be customized at will. Pytest uses ``"stdout"`` and
- ``"stderr"`` internally.
- :param str content:
- The full contents as a string.
- """
- if content:
- self._report_sections.append((when, key, content))
-
+
+ item.add_report_section("call", "stdout", "report section contents")
+
+ :param str when:
+ One of the possible capture states, ``"setup"``, ``"call"``, ``"teardown"``.
+ :param str key:
+ Name of the section, can be customized at will. Pytest uses ``"stdout"`` and
+ ``"stderr"`` internally.
+ :param str content:
+ The full contents as a string.
+ """
+ if content:
+ self._report_sections.append((when, key, content))
+
def reportinfo(self) -> Tuple[Union[py.path.local, str], Optional[int], str]:
- return self.fspath, None, ""
-
+ return self.fspath, None, ""
+
@cached_property
def location(self) -> Tuple[str, Optional[int], str]:
location = self.reportinfo()