diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /contrib/python/pytest/py3/_pytest/_code | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/_code')
-rw-r--r-- | contrib/python/pytest/py3/_pytest/_code/__init__.py | 10 | ||||
-rw-r--r-- | contrib/python/pytest/py3/_pytest/_code/code.py | 1191 | ||||
-rw-r--r-- | contrib/python/pytest/py3/_pytest/_code/source.py | 416 |
3 files changed, 1617 insertions, 0 deletions
diff --git a/contrib/python/pytest/py3/_pytest/_code/__init__.py b/contrib/python/pytest/py3/_pytest/_code/__init__.py new file mode 100644 index 0000000000..370e41dc9f --- /dev/null +++ b/contrib/python/pytest/py3/_pytest/_code/__init__.py @@ -0,0 +1,10 @@ +""" python inspection/code generation API """ +from .code import Code # noqa +from .code import ExceptionInfo # noqa +from .code import filter_traceback # noqa +from .code import Frame # noqa +from .code import getrawcode # noqa +from .code import Traceback # noqa +from .source import compile_ as compile # noqa +from .source import getfslineno # noqa +from .source import Source # noqa diff --git a/contrib/python/pytest/py3/_pytest/_code/code.py b/contrib/python/pytest/py3/_pytest/_code/code.py new file mode 100644 index 0000000000..965074c924 --- /dev/null +++ b/contrib/python/pytest/py3/_pytest/_code/code.py @@ -0,0 +1,1191 @@ +import inspect +import re +import sys +import traceback +from inspect import CO_VARARGS +from inspect import CO_VARKEYWORDS +from io import StringIO +from traceback import format_exception_only +from types import CodeType +from types import FrameType +from types import TracebackType +from typing import Any +from typing import Callable +from typing import Dict +from typing import Generic +from typing import Iterable +from typing import List +from typing import Optional +from typing import Pattern +from typing import Sequence +from typing import Set +from typing import Tuple +from typing import TypeVar +from typing import Union +from weakref import ref + +import attr +import pluggy +import py + +import _pytest +from _pytest._io import TerminalWriter +from _pytest._io.saferepr import safeformat +from _pytest._io.saferepr import saferepr +from _pytest.compat import ATTRS_EQ_FIELD +from _pytest.compat import overload +from _pytest.compat import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import Type + from typing_extensions import Literal + from weakref import ReferenceType # noqa: F401 + + from _pytest._code import Source + + _TracebackStyle = Literal["long", "short", "line", "no", "native"] + + +class Code: + """ wrapper around Python code objects """ + + def __init__(self, rawcode) -> None: + if not hasattr(rawcode, "co_filename"): + rawcode = getrawcode(rawcode) + if not isinstance(rawcode, CodeType): + raise TypeError("not a code object: {!r}".format(rawcode)) + self.filename = rawcode.co_filename + self.firstlineno = rawcode.co_firstlineno - 1 + self.name = rawcode.co_name + self.raw = rawcode + + def __eq__(self, other): + return self.raw == other.raw + + # Ignore type because of https://github.com/python/mypy/issues/4266. + __hash__ = None # type: ignore + + def __ne__(self, other): + return not self == other + + @property + def path(self) -> Union[py.path.local, str]: + """ return a path object pointing to source code (or a str in case + of OSError / non-existing file). + """ + if not self.raw.co_filename: + return "" + try: + p = py.path.local(self.raw.co_filename) + # maybe don't try this checking + if not p.check(): + raise OSError("py.path check failed.") + return p + except OSError: + # XXX maybe try harder like the weird logic + # in the standard lib [linecache.updatecache] does? + return self.raw.co_filename + + @property + def fullsource(self) -> Optional["Source"]: + """ return a _pytest._code.Source object for the full source file of the code + """ + from _pytest._code import source + + full, _ = source.findsource(self.raw) + return full + + def source(self) -> "Source": + """ return a _pytest._code.Source object for the code object's source only + """ + # return source only for that part of code + import _pytest._code + + return _pytest._code.Source(self.raw) + + def getargs(self, var: bool = False) -> Tuple[str, ...]: + """ return a tuple with the argument names for the code object + + if 'var' is set True also return the names of the variable and + keyword arguments when present + """ + # handfull shortcut for getting args + raw = self.raw + argcount = raw.co_argcount + if var: + argcount += raw.co_flags & CO_VARARGS + argcount += raw.co_flags & CO_VARKEYWORDS + return raw.co_varnames[:argcount] + + +class Frame: + """Wrapper around a Python frame holding f_locals and f_globals + in which expressions can be evaluated.""" + + def __init__(self, frame: FrameType) -> None: + self.lineno = frame.f_lineno - 1 + self.f_globals = frame.f_globals + self.f_locals = frame.f_locals + self.raw = frame + self.code = Code(frame.f_code) + + @property + def statement(self) -> "Source": + """ statement this frame is at """ + import _pytest._code + + if self.code.fullsource is None: + return _pytest._code.Source("") + return self.code.fullsource.getstatement(self.lineno) + + def eval(self, code, **vars): + """ evaluate 'code' in the frame + + 'vars' are optional additional local variables + + returns the result of the evaluation + """ + f_locals = self.f_locals.copy() + f_locals.update(vars) + return eval(code, self.f_globals, f_locals) + + def exec_(self, code, **vars) -> None: + """ exec 'code' in the frame + + 'vars' are optional; additional local variables + """ + f_locals = self.f_locals.copy() + f_locals.update(vars) + exec(code, self.f_globals, f_locals) + + def repr(self, object: object) -> str: + """ return a 'safe' (non-recursive, one-line) string repr for 'object' + """ + return saferepr(object) + + def is_true(self, object): + return object + + def getargs(self, var: bool = False): + """ return a list of tuples (name, value) for all arguments + + if 'var' is set True also include the variable and keyword + arguments when present + """ + retval = [] + for arg in self.code.getargs(var): + try: + retval.append((arg, self.f_locals[arg])) + except KeyError: + pass # this can occur when using Psyco + return retval + + +class TracebackEntry: + """ a single entry in a traceback """ + + _repr_style = None # type: Optional[Literal["short", "long"]] + exprinfo = None + + def __init__(self, rawentry: TracebackType, excinfo=None) -> None: + self._excinfo = excinfo + self._rawentry = rawentry + self.lineno = rawentry.tb_lineno - 1 + + def set_repr_style(self, mode: "Literal['short', 'long']") -> None: + assert mode in ("short", "long") + self._repr_style = mode + + @property + def frame(self) -> Frame: + return Frame(self._rawentry.tb_frame) + + @property + def relline(self) -> int: + return self.lineno - self.frame.code.firstlineno + + def __repr__(self) -> str: + return "<TracebackEntry %s:%d>" % (self.frame.code.path, self.lineno + 1) + + @property + def statement(self) -> "Source": + """ _pytest._code.Source object for the current statement """ + source = self.frame.code.fullsource + assert source is not None + return source.getstatement(self.lineno) + + @property + def path(self): + """ path to the source code """ + return self.frame.code.path + + @property + def locals(self) -> Dict[str, Any]: + """ locals of underlying frame """ + return self.frame.f_locals + + def getfirstlinesource(self) -> int: + return self.frame.code.firstlineno + + def getsource(self, astcache=None) -> Optional["Source"]: + """ return failing source code. """ + # we use the passed in astcache to not reparse asttrees + # within exception info printing + from _pytest._code.source import getstatementrange_ast + + source = self.frame.code.fullsource + if source is None: + return None + key = astnode = None + if astcache is not None: + key = self.frame.code.path + if key is not None: + astnode = astcache.get(key, None) + start = self.getfirstlinesource() + try: + astnode, _, end = getstatementrange_ast( + self.lineno, source, astnode=astnode + ) + except SyntaxError: + end = self.lineno + 1 + else: + if key is not None: + astcache[key] = astnode + return source[start:end] + + source = property(getsource) + + def ishidden(self): + """ return True if the current frame has a var __tracebackhide__ + resolving to True. + + If __tracebackhide__ is a callable, it gets called with the + ExceptionInfo instance and can decide whether to hide the traceback. + + mostly for internal use + """ + f = self.frame + tbh = f.f_locals.get( + "__tracebackhide__", f.f_globals.get("__tracebackhide__", False) + ) + if tbh and callable(tbh): + return tbh(None if self._excinfo is None else self._excinfo()) + return tbh + + def __str__(self) -> str: + try: + fn = str(self.path) + except py.error.Error: + fn = "???" + name = self.frame.code.name + try: + line = str(self.statement).lstrip() + except KeyboardInterrupt: + raise + except: # noqa + line = "???" + return " File %r:%d in %s\n %s\n" % (fn, self.lineno + 1, name, line) + + @property + def name(self) -> str: + """ co_name of underlying code """ + return self.frame.code.raw.co_name + + +class Traceback(List[TracebackEntry]): + """ Traceback objects encapsulate and offer higher level + access to Traceback entries. + """ + + def __init__( + self, + tb: Union[TracebackType, Iterable[TracebackEntry]], + excinfo: Optional["ReferenceType[ExceptionInfo]"] = None, + ) -> None: + """ initialize from given python traceback object and ExceptionInfo """ + self._excinfo = excinfo + if isinstance(tb, TracebackType): + + def f(cur: TracebackType) -> Iterable[TracebackEntry]: + cur_ = cur # type: Optional[TracebackType] + while cur_ is not None: + yield TracebackEntry(cur_, excinfo=excinfo) + cur_ = cur_.tb_next + + super().__init__(f(tb)) + else: + super().__init__(tb) + + def cut( + self, + path=None, + lineno: Optional[int] = None, + firstlineno: Optional[int] = None, + excludepath=None, + ) -> "Traceback": + """ return a Traceback instance wrapping part of this Traceback + + by providing any combination of path, lineno and firstlineno, the + first frame to start the to-be-returned traceback is determined + + this allows cutting the first part of a Traceback instance e.g. + for formatting reasons (removing some uninteresting bits that deal + with handling of the exception/traceback) + """ + for x in self: + code = x.frame.code + codepath = code.path + if ( + (path is None or codepath == path) + and ( + excludepath is None + or not isinstance(codepath, py.path.local) + or not codepath.relto(excludepath) + ) + and (lineno is None or x.lineno == lineno) + and (firstlineno is None or x.frame.code.firstlineno == firstlineno) + ): + return Traceback(x._rawentry, self._excinfo) + return self + + @overload + def __getitem__(self, key: int) -> TracebackEntry: + raise NotImplementedError() + + @overload # noqa: F811 + def __getitem__(self, key: slice) -> "Traceback": # noqa: F811 + raise NotImplementedError() + + def __getitem__( # noqa: F811 + self, key: Union[int, slice] + ) -> Union[TracebackEntry, "Traceback"]: + if isinstance(key, slice): + return self.__class__(super().__getitem__(key)) + else: + return super().__getitem__(key) + + def filter( + self, fn: Callable[[TracebackEntry], bool] = lambda x: not x.ishidden() + ) -> "Traceback": + """ return a Traceback instance with certain items removed + + fn is a function that gets a single argument, a TracebackEntry + instance, and should return True when the item should be added + to the Traceback, False when not + + by default this removes all the TracebackEntries which are hidden + (see ishidden() above) + """ + return Traceback(filter(fn, self), self._excinfo) + + def getcrashentry(self) -> TracebackEntry: + """ return last non-hidden traceback entry that lead + to the exception of a traceback. + """ + for i in range(-1, -len(self) - 1, -1): + entry = self[i] + if not entry.ishidden(): + return entry + return self[-1] + + def recursionindex(self) -> Optional[int]: + """ return the index of the frame/TracebackEntry where recursion + originates if appropriate, None if no recursion occurred + """ + cache = {} # type: Dict[Tuple[Any, int, int], List[Dict[str, Any]]] + for i, entry in enumerate(self): + # id for the code.raw is needed to work around + # the strange metaprogramming in the decorator lib from pypi + # which generates code objects that have hash/value equality + # XXX needs a test + key = entry.frame.code.path, id(entry.frame.code.raw), entry.lineno + # print "checking for recursion at", key + values = cache.setdefault(key, []) + if values: + f = entry.frame + loc = f.f_locals + for otherloc in values: + if f.is_true( + f.eval( + co_equal, + __recursioncache_locals_1=loc, + __recursioncache_locals_2=otherloc, + ) + ): + return i + values.append(entry.frame.f_locals) + return None + + +co_equal = compile( + "__recursioncache_locals_1 == __recursioncache_locals_2", "?", "eval" +) + + +_E = TypeVar("_E", bound=BaseException) + + +@attr.s(repr=False) +class ExceptionInfo(Generic[_E]): + """ wraps sys.exc_info() objects and offers + help for navigating the traceback. + """ + + _assert_start_repr = "AssertionError('assert " + + _excinfo = attr.ib(type=Optional[Tuple["Type[_E]", "_E", TracebackType]]) + _striptext = attr.ib(type=str, default="") + _traceback = attr.ib(type=Optional[Traceback], default=None) + + @classmethod + def from_exc_info( + cls, + exc_info: Tuple["Type[_E]", "_E", TracebackType], + exprinfo: Optional[str] = None, + ) -> "ExceptionInfo[_E]": + """returns an ExceptionInfo for an existing exc_info tuple. + + .. warning:: + + Experimental API + + + :param exprinfo: a text string helping to determine if we should + strip ``AssertionError`` from the output, defaults + to the exception message/``__str__()`` + """ + _striptext = "" + if exprinfo is None and isinstance(exc_info[1], AssertionError): + exprinfo = getattr(exc_info[1], "msg", None) + if exprinfo is None: + exprinfo = saferepr(exc_info[1]) + if exprinfo and exprinfo.startswith(cls._assert_start_repr): + _striptext = "AssertionError: " + + return cls(exc_info, _striptext) + + @classmethod + def from_current( + cls, exprinfo: Optional[str] = None + ) -> "ExceptionInfo[BaseException]": + """returns an ExceptionInfo matching the current traceback + + .. warning:: + + Experimental API + + + :param exprinfo: a text string helping to determine if we should + strip ``AssertionError`` from the output, defaults + to the exception message/``__str__()`` + """ + tup = sys.exc_info() + assert tup[0] is not None, "no current exception" + assert tup[1] is not None, "no current exception" + assert tup[2] is not None, "no current exception" + exc_info = (tup[0], tup[1], tup[2]) + return ExceptionInfo.from_exc_info(exc_info, exprinfo) + + @classmethod + def for_later(cls) -> "ExceptionInfo[_E]": + """return an unfilled ExceptionInfo + """ + return cls(None) + + def fill_unfilled(self, exc_info: Tuple["Type[_E]", _E, TracebackType]) -> None: + """fill an unfilled ExceptionInfo created with for_later()""" + assert self._excinfo is None, "ExceptionInfo was already filled" + self._excinfo = exc_info + + @property + def type(self) -> "Type[_E]": + """the exception class""" + assert ( + self._excinfo is not None + ), ".type can only be used after the context manager exits" + return self._excinfo[0] + + @property + def value(self) -> _E: + """the exception value""" + assert ( + self._excinfo is not None + ), ".value can only be used after the context manager exits" + return self._excinfo[1] + + @property + def tb(self) -> TracebackType: + """the exception raw traceback""" + assert ( + self._excinfo is not None + ), ".tb can only be used after the context manager exits" + return self._excinfo[2] + + @property + def typename(self) -> str: + """the type name of the exception""" + assert ( + self._excinfo is not None + ), ".typename can only be used after the context manager exits" + return self.type.__name__ + + @property + def traceback(self) -> Traceback: + """the traceback""" + if self._traceback is None: + self._traceback = Traceback(self.tb, excinfo=ref(self)) + return self._traceback + + @traceback.setter + def traceback(self, value: Traceback) -> None: + self._traceback = value + + def __repr__(self) -> str: + if self._excinfo is None: + return "<ExceptionInfo for raises contextmanager>" + return "<{} {} tblen={}>".format( + self.__class__.__name__, saferepr(self._excinfo[1]), len(self.traceback) + ) + + def exconly(self, tryshort: bool = False) -> str: + """ return the exception as a string + + when 'tryshort' resolves to True, and the exception is a + _pytest._code._AssertionError, only the actual exception part of + the exception representation is returned (so 'AssertionError: ' is + removed from the beginning) + """ + lines = format_exception_only(self.type, self.value) + text = "".join(lines) + text = text.rstrip() + if tryshort: + if text.startswith(self._striptext): + text = text[len(self._striptext) :] + return text + + def errisinstance( + self, exc: Union["Type[BaseException]", Tuple["Type[BaseException]", ...]] + ) -> bool: + """ return True if the exception is an instance of exc """ + return isinstance(self.value, exc) + + def _getreprcrash(self) -> "ReprFileLocation": + exconly = self.exconly(tryshort=True) + entry = self.traceback.getcrashentry() + path, lineno = entry.frame.code.raw.co_filename, entry.lineno + return ReprFileLocation(path, lineno + 1, exconly) + + def getrepr( + self, + showlocals: bool = False, + style: "_TracebackStyle" = "long", + abspath: bool = False, + tbfilter: bool = True, + funcargs: bool = False, + truncate_locals: bool = True, + chain: bool = True, + ) -> Union["ReprExceptionInfo", "ExceptionChainRepr"]: + """ + Return str()able representation of this exception info. + + :param bool showlocals: + Show locals per traceback entry. + Ignored if ``style=="native"``. + + :param str style: long|short|no|native traceback style + + :param bool abspath: + If paths should be changed to absolute or left unchanged. + + :param bool tbfilter: + Hide entries that contain a local variable ``__tracebackhide__==True``. + Ignored if ``style=="native"``. + + :param bool funcargs: + Show fixtures ("funcargs" for legacy purposes) per traceback entry. + + :param bool truncate_locals: + With ``showlocals==True``, make sure locals can be safely represented as strings. + + :param bool chain: if chained exceptions in Python 3 should be shown. + + .. versionchanged:: 3.9 + + Added the ``chain`` parameter. + """ + if style == "native": + return ReprExceptionInfo( + ReprTracebackNative( + traceback.format_exception( + self.type, self.value, self.traceback[0]._rawentry + ) + ), + self._getreprcrash(), + ) + + fmt = FormattedExcinfo( + showlocals=showlocals, + style=style, + abspath=abspath, + tbfilter=tbfilter, + funcargs=funcargs, + truncate_locals=truncate_locals, + chain=chain, + ) + return fmt.repr_excinfo(self) + + def match(self, regexp: "Union[str, Pattern]") -> "Literal[True]": + """ + Check whether the regular expression `regexp` matches the string + representation of the exception using :func:`python:re.search`. + If it matches `True` is returned. + If it doesn't match an `AssertionError` is raised. + """ + __tracebackhide__ = True + assert re.search( + regexp, str(self.value) + ), "Pattern {!r} does not match {!r}".format(regexp, str(self.value)) + # Return True to allow for "assert excinfo.match()". + return True + + +@attr.s +class FormattedExcinfo: + """ presenting information about failing Functions and Generators. """ + + # for traceback entries + flow_marker = ">" + fail_marker = "E" + + showlocals = attr.ib(type=bool, default=False) + style = attr.ib(type="_TracebackStyle", default="long") + abspath = attr.ib(type=bool, default=True) + tbfilter = attr.ib(type=bool, default=True) + funcargs = attr.ib(type=bool, default=False) + truncate_locals = attr.ib(type=bool, default=True) + chain = attr.ib(type=bool, default=True) + astcache = attr.ib(default=attr.Factory(dict), init=False, repr=False) + + def _getindent(self, source: "Source") -> int: + # figure out indent for given source + try: + s = str(source.getstatement(len(source) - 1)) + except KeyboardInterrupt: + raise + except: # noqa + try: + s = str(source[-1]) + except KeyboardInterrupt: + raise + except: # noqa + return 0 + return 4 + (len(s) - len(s.lstrip())) + + def _getentrysource(self, entry: TracebackEntry) -> Optional["Source"]: + source = entry.getsource(self.astcache) + if source is not None: + source = source.deindent() + return source + + def repr_args(self, entry: TracebackEntry) -> Optional["ReprFuncArgs"]: + if self.funcargs: + args = [] + for argname, argvalue in entry.frame.getargs(var=True): + args.append((argname, saferepr(argvalue))) + return ReprFuncArgs(args) + return None + + def get_source( + self, + source: "Source", + line_index: int = -1, + excinfo: Optional[ExceptionInfo] = None, + short: bool = False, + ) -> List[str]: + """ return formatted and marked up source lines. """ + import _pytest._code + + lines = [] + if source is None or line_index >= len(source.lines): + source = _pytest._code.Source("???") + line_index = 0 + if line_index < 0: + line_index += len(source) + space_prefix = " " + if short: + lines.append(space_prefix + source.lines[line_index].strip()) + else: + for line in source.lines[:line_index]: + lines.append(space_prefix + line) + lines.append(self.flow_marker + " " + source.lines[line_index]) + for line in source.lines[line_index + 1 :]: + lines.append(space_prefix + line) + if excinfo is not None: + indent = 4 if short else self._getindent(source) + lines.extend(self.get_exconly(excinfo, indent=indent, markall=True)) + return lines + + def get_exconly( + self, excinfo: ExceptionInfo, indent: int = 4, markall: bool = False + ) -> List[str]: + lines = [] + indentstr = " " * indent + # get the real exception information out + exlines = excinfo.exconly(tryshort=True).split("\n") + failindent = self.fail_marker + indentstr[1:] + for line in exlines: + lines.append(failindent + line) + if not markall: + failindent = indentstr + return lines + + def repr_locals(self, locals: Dict[str, object]) -> Optional["ReprLocals"]: + if self.showlocals: + lines = [] + keys = [loc for loc in locals if loc[0] != "@"] + keys.sort() + for name in keys: + value = locals[name] + if name == "__builtins__": + lines.append("__builtins__ = <builtins>") + else: + # This formatting could all be handled by the + # _repr() function, which is only reprlib.Repr in + # disguise, so is very configurable. + if self.truncate_locals: + str_repr = saferepr(value) + else: + str_repr = safeformat(value) + # if len(str_repr) < 70 or not isinstance(value, + # (list, tuple, dict)): + lines.append("{:<10} = {}".format(name, str_repr)) + # else: + # self._line("%-10s =\\" % (name,)) + # # XXX + # pprint.pprint(value, stream=self.excinfowriter) + return ReprLocals(lines) + return None + + def repr_traceback_entry( + self, entry: TracebackEntry, excinfo: Optional[ExceptionInfo] = None + ) -> "ReprEntry": + import _pytest._code + + source = self._getentrysource(entry) + if source is None: + source = _pytest._code.Source("???") + line_index = 0 + else: + line_index = entry.lineno - entry.getfirstlinesource() + + lines = [] # type: List[str] + style = entry._repr_style if entry._repr_style is not None else self.style + if style in ("short", "long"): + short = style == "short" + reprargs = self.repr_args(entry) if not short else None + s = self.get_source(source, line_index, excinfo, short=short) + lines.extend(s) + if short: + message = "in %s" % (entry.name) + else: + message = excinfo and excinfo.typename or "" + path = self._makepath(entry.path) + reprfileloc = ReprFileLocation(path, entry.lineno + 1, message) + localsrepr = self.repr_locals(entry.locals) + return ReprEntry(lines, reprargs, localsrepr, reprfileloc, style) + if excinfo: + lines.extend(self.get_exconly(excinfo, indent=4)) + return ReprEntry(lines, None, None, None, style) + + def _makepath(self, path): + if not self.abspath: + try: + np = py.path.local().bestrelpath(path) + except OSError: + return path + if len(np) < len(str(path)): + path = np + return path + + def repr_traceback(self, excinfo: ExceptionInfo) -> "ReprTraceback": + traceback = excinfo.traceback + if self.tbfilter: + traceback = traceback.filter() + + if excinfo.errisinstance(RecursionError): + traceback, extraline = self._truncate_recursive_traceback(traceback) + else: + extraline = None + + last = traceback[-1] + entries = [] + for index, entry in enumerate(traceback): + einfo = (last == entry) and excinfo or None + reprentry = self.repr_traceback_entry(entry, einfo) + entries.append(reprentry) + return ReprTraceback(entries, extraline, style=self.style) + + def _truncate_recursive_traceback( + self, traceback: Traceback + ) -> Tuple[Traceback, Optional[str]]: + """ + Truncate the given recursive traceback trying to find the starting point + of the recursion. + + The detection is done by going through each traceback entry and finding the + point in which the locals of the frame are equal to the locals of a previous frame (see ``recursionindex()``. + + Handle the situation where the recursion process might raise an exception (for example + comparing numpy arrays using equality raises a TypeError), in which case we do our best to + warn the user of the error and show a limited traceback. + """ + try: + recursionindex = traceback.recursionindex() + except Exception as e: + max_frames = 10 + extraline = ( + "!!! Recursion error detected, but an error occurred locating the origin of recursion.\n" + " The following exception happened when comparing locals in the stack frame:\n" + " {exc_type}: {exc_msg}\n" + " Displaying first and last {max_frames} stack frames out of {total}." + ).format( + exc_type=type(e).__name__, + exc_msg=str(e), + max_frames=max_frames, + total=len(traceback), + ) # type: Optional[str] + # Type ignored because adding two instaces of a List subtype + # currently incorrectly has type List instead of the subtype. + traceback = traceback[:max_frames] + traceback[-max_frames:] # type: ignore + else: + if recursionindex is not None: + extraline = "!!! Recursion detected (same locals & position)" + traceback = traceback[: recursionindex + 1] + else: + extraline = None + + return traceback, extraline + + def repr_excinfo(self, excinfo: ExceptionInfo) -> "ExceptionChainRepr": + repr_chain = ( + [] + ) # type: List[Tuple[ReprTraceback, Optional[ReprFileLocation], Optional[str]]] + e = excinfo.value + excinfo_ = excinfo # type: Optional[ExceptionInfo] + descr = None + seen = set() # type: Set[int] + while e is not None and id(e) not in seen: + seen.add(id(e)) + if excinfo_: + reprtraceback = self.repr_traceback(excinfo_) + reprcrash = excinfo_._getreprcrash() # type: Optional[ReprFileLocation] + else: + # fallback to native repr if the exception doesn't have a traceback: + # ExceptionInfo objects require a full traceback to work + reprtraceback = ReprTracebackNative( + traceback.format_exception(type(e), e, None) + ) + reprcrash = None + + repr_chain += [(reprtraceback, reprcrash, descr)] + if e.__cause__ is not None and self.chain: + e = e.__cause__ + excinfo_ = ( + ExceptionInfo((type(e), e, e.__traceback__)) + if e.__traceback__ + else None + ) + descr = "The above exception was the direct cause of the following exception:" + elif ( + e.__context__ is not None and not e.__suppress_context__ and self.chain + ): + e = e.__context__ + excinfo_ = ( + ExceptionInfo((type(e), e, e.__traceback__)) + if e.__traceback__ + else None + ) + descr = "During handling of the above exception, another exception occurred:" + else: + e = None + repr_chain.reverse() + return ExceptionChainRepr(repr_chain) + + +@attr.s(**{ATTRS_EQ_FIELD: False}) # type: ignore +class TerminalRepr: + def __str__(self) -> str: + # FYI this is called from pytest-xdist's serialization of exception + # information. + io = StringIO() + tw = TerminalWriter(file=io) + self.toterminal(tw) + return io.getvalue().strip() + + def __repr__(self) -> str: + return "<{} instance at {:0x}>".format(self.__class__, id(self)) + + def toterminal(self, tw: TerminalWriter) -> None: + raise NotImplementedError() + + +@attr.s(**{ATTRS_EQ_FIELD: False}) # type: ignore +class ExceptionRepr(TerminalRepr): + def __attrs_post_init__(self): + self.sections = [] # type: List[Tuple[str, str, str]] + + def addsection(self, name: str, content: str, sep: str = "-") -> None: + self.sections.append((name, content, sep)) + + def toterminal(self, tw: TerminalWriter) -> None: + for name, content, sep in self.sections: + tw.sep(sep, name) + tw.line(content) + + +@attr.s(**{ATTRS_EQ_FIELD: False}) # type: ignore +class ExceptionChainRepr(ExceptionRepr): + chain = attr.ib( + type=Sequence[ + Tuple["ReprTraceback", Optional["ReprFileLocation"], Optional[str]] + ] + ) + + def __attrs_post_init__(self): + super().__attrs_post_init__() + # reprcrash and reprtraceback of the outermost (the newest) exception + # in the chain + self.reprtraceback = self.chain[-1][0] + self.reprcrash = self.chain[-1][1] + + def toterminal(self, tw: TerminalWriter) -> None: + for element in self.chain: + element[0].toterminal(tw) + if element[2] is not None: + tw.line("") + tw.line(element[2], yellow=True) + super().toterminal(tw) + + +@attr.s(**{ATTRS_EQ_FIELD: False}) # type: ignore +class ReprExceptionInfo(ExceptionRepr): + reprtraceback = attr.ib(type="ReprTraceback") + reprcrash = attr.ib(type="ReprFileLocation") + + def toterminal(self, tw: TerminalWriter) -> None: + self.reprtraceback.toterminal(tw) + super().toterminal(tw) + + +@attr.s(**{ATTRS_EQ_FIELD: False}) # type: ignore +class ReprTraceback(TerminalRepr): + reprentries = attr.ib(type=Sequence[Union["ReprEntry", "ReprEntryNative"]]) + extraline = attr.ib(type=Optional[str]) + style = attr.ib(type="_TracebackStyle") + + entrysep = "_ " + + def toterminal(self, tw: TerminalWriter) -> None: + # the entries might have different styles + for i, entry in enumerate(self.reprentries): + if entry.style == "long": + tw.line("") + entry.toterminal(tw) + if i < len(self.reprentries) - 1: + next_entry = self.reprentries[i + 1] + if ( + entry.style == "long" + or entry.style == "short" + and next_entry.style == "long" + ): + tw.sep(self.entrysep) + + if self.extraline: + tw.line(self.extraline) + + +class ReprTracebackNative(ReprTraceback): + def __init__(self, tblines: Sequence[str]) -> None: + self.style = "native" + self.reprentries = [ReprEntryNative(tblines)] + self.extraline = None + + +@attr.s(**{ATTRS_EQ_FIELD: False}) # type: ignore +class ReprEntryNative(TerminalRepr): + lines = attr.ib(type=Sequence[str]) + style = "native" # type: _TracebackStyle + + def toterminal(self, tw: TerminalWriter) -> None: + tw.write("".join(self.lines)) + + +@attr.s(**{ATTRS_EQ_FIELD: False}) # type: ignore +class ReprEntry(TerminalRepr): + lines = attr.ib(type=Sequence[str]) + reprfuncargs = attr.ib(type=Optional["ReprFuncArgs"]) + reprlocals = attr.ib(type=Optional["ReprLocals"]) + reprfileloc = attr.ib(type=Optional["ReprFileLocation"]) + style = attr.ib(type="_TracebackStyle") + + def _write_entry_lines(self, tw: TerminalWriter) -> None: + """Writes the source code portions of a list of traceback entries with syntax highlighting. + + Usually entries are lines like these: + + " x = 1" + "> assert x == 2" + "E assert 1 == 2" + + This function takes care of rendering the "source" portions of it (the lines without + the "E" prefix) using syntax highlighting, taking care to not highlighting the ">" + character, as doing so might break line continuations. + """ + + indent_size = 4 + + def is_fail(line): + return line.startswith("{} ".format(FormattedExcinfo.fail_marker)) + + if not self.lines: + return + + # separate indents and source lines that are not failures: we want to + # highlight the code but not the indentation, which may contain markers + # such as "> assert 0" + indents = [] + source_lines = [] + for line in self.lines: + if not is_fail(line): + indents.append(line[:indent_size]) + source_lines.append(line[indent_size:]) + + tw._write_source(source_lines, indents) + + # failure lines are always completely red and bold + for line in (x for x in self.lines if is_fail(x)): + tw.line(line, bold=True, red=True) + + def toterminal(self, tw: TerminalWriter) -> None: + if self.style == "short": + assert self.reprfileloc is not None + self.reprfileloc.toterminal(tw) + self._write_entry_lines(tw) + if self.reprlocals: + self.reprlocals.toterminal(tw, indent=" " * 8) + return + + if self.reprfuncargs: + self.reprfuncargs.toterminal(tw) + + self._write_entry_lines(tw) + + if self.reprlocals: + tw.line("") + self.reprlocals.toterminal(tw) + if self.reprfileloc: + if self.lines: + tw.line("") + self.reprfileloc.toterminal(tw) + + def __str__(self) -> str: + return "{}\n{}\n{}".format( + "\n".join(self.lines), self.reprlocals, self.reprfileloc + ) + + +@attr.s(**{ATTRS_EQ_FIELD: False}) # type: ignore +class ReprFileLocation(TerminalRepr): + path = attr.ib(type=str, converter=str) + lineno = attr.ib(type=int) + message = attr.ib(type=str) + + def toterminal(self, tw: TerminalWriter) -> None: + # filename and lineno output for each entry, + # using an output format that most editors understand + msg = self.message + i = msg.find("\n") + if i != -1: + msg = msg[:i] + tw.write(self.path, bold=True, red=True) + tw.line(":{}: {}".format(self.lineno, msg)) + + +@attr.s(**{ATTRS_EQ_FIELD: False}) # type: ignore +class ReprLocals(TerminalRepr): + lines = attr.ib(type=Sequence[str]) + + def toterminal(self, tw: TerminalWriter, indent="") -> None: + for line in self.lines: + tw.line(indent + line) + + +@attr.s(**{ATTRS_EQ_FIELD: False}) # type: ignore +class ReprFuncArgs(TerminalRepr): + args = attr.ib(type=Sequence[Tuple[str, object]]) + + def toterminal(self, tw: TerminalWriter) -> None: + if self.args: + linesofar = "" + for name, value in self.args: + ns = "{} = {}".format(name, value) + if len(ns) + len(linesofar) + 2 > tw.fullwidth: + if linesofar: + tw.line(linesofar) + linesofar = ns + else: + if linesofar: + linesofar += ", " + ns + else: + linesofar = ns + if linesofar: + tw.line(linesofar) + tw.line("") + + +def getrawcode(obj, trycall: bool = True): + """ return code object for given function. """ + try: + return obj.__code__ + except AttributeError: + obj = getattr(obj, "f_code", obj) + obj = getattr(obj, "__code__", obj) + if trycall and not hasattr(obj, "co_firstlineno"): + if hasattr(obj, "__call__") and not inspect.isclass(obj): + x = getrawcode(obj.__call__, trycall=False) + if hasattr(x, "co_firstlineno"): + return x + return obj + + +# relative paths that we use to filter traceback entries from appearing to the user; +# see filter_traceback +# note: if we need to add more paths than what we have now we should probably use a list +# for better maintenance + +_PLUGGY_DIR = py.path.local(pluggy.__file__.rstrip("oc")) +# pluggy is either a package or a single module depending on the version +if _PLUGGY_DIR.basename == "__init__.py": + _PLUGGY_DIR = _PLUGGY_DIR.dirpath() +_PYTEST_DIR = py.path.local(_pytest.__file__).dirpath() +_PY_DIR = py.path.local(py.__file__).dirpath() + + +def filter_traceback(entry: TracebackEntry) -> bool: + """Return True if a TracebackEntry instance should be removed from tracebacks: + * dynamically generated code (no code to show up for it); + * internal traceback from pytest or its internal libraries, py and pluggy. + """ + # entry.path might sometimes return a str object when the entry + # points to dynamically generated code + # see https://bitbucket.org/pytest-dev/py/issues/71 + raw_filename = entry.frame.code.raw.co_filename + is_generated = "<" in raw_filename and ">" in raw_filename + if is_generated: + return False + # entry.path might point to a non-existing file, in which case it will + # also return a str object. see #1133 + p = py.path.local(entry.path) + return ( + not p.relto(_PLUGGY_DIR) and not p.relto(_PYTEST_DIR) and not p.relto(_PY_DIR) + ) diff --git a/contrib/python/pytest/py3/_pytest/_code/source.py b/contrib/python/pytest/py3/_pytest/_code/source.py new file mode 100644 index 0000000000..28c11e5d5e --- /dev/null +++ b/contrib/python/pytest/py3/_pytest/_code/source.py @@ -0,0 +1,416 @@ +import ast +import inspect +import linecache +import sys +import textwrap +import tokenize +import warnings +from bisect import bisect_right +from types import CodeType +from types import FrameType +from typing import Any +from typing import Iterator +from typing import List +from typing import Optional +from typing import Sequence +from typing import Tuple +from typing import Union + +import py + +from _pytest.compat import get_real_func +from _pytest.compat import overload +from _pytest.compat import TYPE_CHECKING + +if TYPE_CHECKING: + from typing_extensions import Literal + + +class Source: + """ an immutable object holding a source code fragment, + possibly deindenting it. + """ + + _compilecounter = 0 + + def __init__(self, *parts, **kwargs) -> None: + self.lines = lines = [] # type: List[str] + de = kwargs.get("deindent", True) + for part in parts: + if not part: + partlines = [] # type: List[str] + elif isinstance(part, Source): + partlines = part.lines + elif isinstance(part, (tuple, list)): + partlines = [x.rstrip("\n") for x in part] + elif isinstance(part, str): + partlines = part.split("\n") + else: + partlines = getsource(part, deindent=de).lines + if de: + partlines = deindent(partlines) + lines.extend(partlines) + + def __eq__(self, other): + try: + return self.lines == other.lines + except AttributeError: + if isinstance(other, str): + return str(self) == other + return False + + # Ignore type because of https://github.com/python/mypy/issues/4266. + __hash__ = None # type: ignore + + @overload + def __getitem__(self, key: int) -> str: + raise NotImplementedError() + + @overload # noqa: F811 + def __getitem__(self, key: slice) -> "Source": # noqa: F811 + raise NotImplementedError() + + def __getitem__(self, key: Union[int, slice]) -> Union[str, "Source"]: # noqa: F811 + if isinstance(key, int): + return self.lines[key] + else: + if key.step not in (None, 1): + raise IndexError("cannot slice a Source with a step") + newsource = Source() + newsource.lines = self.lines[key.start : key.stop] + return newsource + + def __iter__(self) -> Iterator[str]: + return iter(self.lines) + + def __len__(self) -> int: + return len(self.lines) + + def strip(self) -> "Source": + """ return new source object with trailing + and leading blank lines removed. + """ + start, end = 0, len(self) + while start < end and not self.lines[start].strip(): + start += 1 + while end > start and not self.lines[end - 1].strip(): + end -= 1 + source = Source() + source.lines[:] = self.lines[start:end] + return source + + def putaround( + self, before: str = "", after: str = "", indent: str = " " * 4 + ) -> "Source": + """ return a copy of the source object with + 'before' and 'after' wrapped around it. + """ + beforesource = Source(before) + aftersource = Source(after) + newsource = Source() + lines = [(indent + line) for line in self.lines] + newsource.lines = beforesource.lines + lines + aftersource.lines + return newsource + + def indent(self, indent: str = " " * 4) -> "Source": + """ return a copy of the source object with + all lines indented by the given indent-string. + """ + newsource = Source() + newsource.lines = [(indent + line) for line in self.lines] + return newsource + + def getstatement(self, lineno: int) -> "Source": + """ return Source statement which contains the + given linenumber (counted from 0). + """ + start, end = self.getstatementrange(lineno) + return self[start:end] + + def getstatementrange(self, lineno: int) -> Tuple[int, int]: + """ return (start, end) tuple which spans the minimal + statement region which containing the given lineno. + """ + if not (0 <= lineno < len(self)): + raise IndexError("lineno out of range") + ast, start, end = getstatementrange_ast(lineno, self) + return start, end + + def deindent(self) -> "Source": + """return a new source object deindented.""" + newsource = Source() + newsource.lines[:] = deindent(self.lines) + return newsource + + def isparseable(self, deindent: bool = True) -> bool: + """ return True if source is parseable, heuristically + deindenting it by default. + """ + if deindent: + source = str(self.deindent()) + else: + source = str(self) + try: + ast.parse(source) + except (SyntaxError, ValueError, TypeError): + return False + else: + return True + + def __str__(self) -> str: + return "\n".join(self.lines) + + @overload + def compile( + self, + filename: Optional[str] = ..., + mode: str = ..., + flag: "Literal[0]" = ..., + dont_inherit: int = ..., + _genframe: Optional[FrameType] = ..., + ) -> CodeType: + raise NotImplementedError() + + @overload # noqa: F811 + def compile( # noqa: F811 + self, + filename: Optional[str] = ..., + mode: str = ..., + flag: int = ..., + dont_inherit: int = ..., + _genframe: Optional[FrameType] = ..., + ) -> Union[CodeType, ast.AST]: + raise NotImplementedError() + + def compile( # noqa: F811 + self, + filename: Optional[str] = None, + mode: str = "exec", + flag: int = 0, + dont_inherit: int = 0, + _genframe: Optional[FrameType] = None, + ) -> Union[CodeType, ast.AST]: + """ return compiled code object. if filename is None + invent an artificial filename which displays + the source/line position of the caller frame. + """ + if not filename or py.path.local(filename).check(file=0): + if _genframe is None: + _genframe = sys._getframe(1) # the caller + fn, lineno = _genframe.f_code.co_filename, _genframe.f_lineno + base = "<%d-codegen " % self._compilecounter + self.__class__._compilecounter += 1 + if not filename: + filename = base + "%s:%d>" % (fn, lineno) + else: + filename = base + "%r %s:%d>" % (filename, fn, lineno) + source = "\n".join(self.lines) + "\n" + try: + co = compile(source, filename, mode, flag) + except SyntaxError as ex: + # re-represent syntax errors from parsing python strings + msglines = self.lines[: ex.lineno] + if ex.offset: + msglines.append(" " * ex.offset + "^") + msglines.append("(code was compiled probably from here: %s)" % filename) + newex = SyntaxError("\n".join(msglines)) + newex.offset = ex.offset + newex.lineno = ex.lineno + newex.text = ex.text + raise newex + else: + if flag & ast.PyCF_ONLY_AST: + assert isinstance(co, ast.AST) + return co + assert isinstance(co, CodeType) + lines = [(x + "\n") for x in self.lines] + # Type ignored because linecache.cache is private. + linecache.cache[filename] = (1, None, lines, filename) # type: ignore + return co + + +# +# public API shortcut functions +# + + +@overload +def compile_( + source: Union[str, bytes, ast.mod, ast.AST], + filename: Optional[str] = ..., + mode: str = ..., + flags: "Literal[0]" = ..., + dont_inherit: int = ..., +) -> CodeType: + raise NotImplementedError() + + +@overload # noqa: F811 +def compile_( # noqa: F811 + source: Union[str, bytes, ast.mod, ast.AST], + filename: Optional[str] = ..., + mode: str = ..., + flags: int = ..., + dont_inherit: int = ..., +) -> Union[CodeType, ast.AST]: + raise NotImplementedError() + + +def compile_( # noqa: F811 + source: Union[str, bytes, ast.mod, ast.AST], + filename: Optional[str] = None, + mode: str = "exec", + flags: int = 0, + dont_inherit: int = 0, +) -> Union[CodeType, ast.AST]: + """ compile the given source to a raw code object, + and maintain an internal cache which allows later + retrieval of the source code for the code object + and any recursively created code objects. + """ + if isinstance(source, ast.AST): + # XXX should Source support having AST? + assert filename is not None + co = compile(source, filename, mode, flags, dont_inherit) + assert isinstance(co, (CodeType, ast.AST)) + return co + _genframe = sys._getframe(1) # the caller + s = Source(source) + return s.compile(filename, mode, flags, _genframe=_genframe) + + +def getfslineno(obj: Any) -> Tuple[Union[str, py.path.local], int]: + """ Return source location (path, lineno) for the given object. + If the source cannot be determined return ("", -1). + + The line number is 0-based. + """ + from .code import Code + + # xxx let decorators etc specify a sane ordering + # NOTE: this used to be done in _pytest.compat.getfslineno, initially added + # in 6ec13a2b9. It ("place_as") appears to be something very custom. + obj = get_real_func(obj) + if hasattr(obj, "place_as"): + obj = obj.place_as + + try: + code = Code(obj) + except TypeError: + try: + fn = inspect.getsourcefile(obj) or inspect.getfile(obj) + except TypeError: + return "", -1 + + fspath = fn and py.path.local(fn) or "" + lineno = -1 + if fspath: + try: + _, lineno = findsource(obj) + except IOError: + pass + return fspath, lineno + else: + return code.path, code.firstlineno + + +# +# helper functions +# + + +def findsource(obj) -> Tuple[Optional[Source], int]: + try: + sourcelines, lineno = inspect.findsource(obj) + except Exception: + return None, -1 + source = Source() + source.lines = [line.rstrip() for line in sourcelines] + return source, lineno + + +def getsource(obj, **kwargs) -> Source: + from .code import getrawcode + + obj = getrawcode(obj) + try: + strsrc = inspect.getsource(obj) + except IndentationError: + strsrc = '"Buggy python version consider upgrading, cannot get source"' + assert isinstance(strsrc, str) + return Source(strsrc, **kwargs) + + +def deindent(lines: Sequence[str]) -> List[str]: + return textwrap.dedent("\n".join(lines)).splitlines() + + +def get_statement_startend2(lineno: int, node: ast.AST) -> Tuple[int, Optional[int]]: + import ast + + # flatten all statements and except handlers into one lineno-list + # AST's line numbers start indexing at 1 + values = [] # type: List[int] + for x in ast.walk(node): + if isinstance(x, (ast.stmt, ast.ExceptHandler)): + values.append(x.lineno - 1) + for name in ("finalbody", "orelse"): + val = getattr(x, name, None) # type: Optional[List[ast.stmt]] + if val: + # treat the finally/orelse part as its own statement + values.append(val[0].lineno - 1 - 1) + values.sort() + insert_index = bisect_right(values, lineno) + start = values[insert_index - 1] + if insert_index >= len(values): + end = None + else: + end = values[insert_index] + return start, end + + +def getstatementrange_ast( + lineno: int, + source: Source, + assertion: bool = False, + astnode: Optional[ast.AST] = None, +) -> Tuple[ast.AST, int, int]: + if astnode is None: + content = str(source) + # See #4260: + # don't produce duplicate warnings when compiling source to find ast + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + astnode = ast.parse(content, "source", "exec") + + start, end = get_statement_startend2(lineno, astnode) + # we need to correct the end: + # - ast-parsing strips comments + # - there might be empty lines + # - we might have lesser indented code blocks at the end + if end is None: + end = len(source.lines) + + if end > start + 1: + # make sure we don't span differently indented code blocks + # by using the BlockFinder helper used which inspect.getsource() uses itself + block_finder = inspect.BlockFinder() + # if we start with an indented line, put blockfinder to "started" mode + block_finder.started = source.lines[start][0].isspace() + it = ((x + "\n") for x in source.lines[start:end]) + try: + for tok in tokenize.generate_tokens(lambda: next(it)): + block_finder.tokeneater(*tok) + except (inspect.EndOfBlock, IndentationError): + end = block_finder.last + start + except Exception: + pass + + # the end might still point to a comment or empty line, correct it + while end: + line = source.lines[end - 1].lstrip() + if line.startswith("#") or not line: + end -= 1 + else: + break + return astnode, start, end |