aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/pytest/py3/_pytest/_code
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /contrib/python/pytest/py3/_pytest/_code
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/_code')
-rw-r--r--contrib/python/pytest/py3/_pytest/_code/__init__.py10
-rw-r--r--contrib/python/pytest/py3/_pytest/_code/code.py1191
-rw-r--r--contrib/python/pytest/py3/_pytest/_code/source.py416
3 files changed, 1617 insertions, 0 deletions
diff --git a/contrib/python/pytest/py3/_pytest/_code/__init__.py b/contrib/python/pytest/py3/_pytest/_code/__init__.py
new file mode 100644
index 0000000000..370e41dc9f
--- /dev/null
+++ b/contrib/python/pytest/py3/_pytest/_code/__init__.py
@@ -0,0 +1,10 @@
+""" python inspection/code generation API """
+from .code import Code # noqa
+from .code import ExceptionInfo # noqa
+from .code import filter_traceback # noqa
+from .code import Frame # noqa
+from .code import getrawcode # noqa
+from .code import Traceback # noqa
+from .source import compile_ as compile # noqa
+from .source import getfslineno # noqa
+from .source import Source # noqa
diff --git a/contrib/python/pytest/py3/_pytest/_code/code.py b/contrib/python/pytest/py3/_pytest/_code/code.py
new file mode 100644
index 0000000000..965074c924
--- /dev/null
+++ b/contrib/python/pytest/py3/_pytest/_code/code.py
@@ -0,0 +1,1191 @@
+import inspect
+import re
+import sys
+import traceback
+from inspect import CO_VARARGS
+from inspect import CO_VARKEYWORDS
+from io import StringIO
+from traceback import format_exception_only
+from types import CodeType
+from types import FrameType
+from types import TracebackType
+from typing import Any
+from typing import Callable
+from typing import Dict
+from typing import Generic
+from typing import Iterable
+from typing import List
+from typing import Optional
+from typing import Pattern
+from typing import Sequence
+from typing import Set
+from typing import Tuple
+from typing import TypeVar
+from typing import Union
+from weakref import ref
+
+import attr
+import pluggy
+import py
+
+import _pytest
+from _pytest._io import TerminalWriter
+from _pytest._io.saferepr import safeformat
+from _pytest._io.saferepr import saferepr
+from _pytest.compat import ATTRS_EQ_FIELD
+from _pytest.compat import overload
+from _pytest.compat import TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from typing import Type
+ from typing_extensions import Literal
+ from weakref import ReferenceType # noqa: F401
+
+ from _pytest._code import Source
+
+ _TracebackStyle = Literal["long", "short", "line", "no", "native"]
+
+
+class Code:
+ """ wrapper around Python code objects """
+
+ def __init__(self, rawcode) -> None:
+ if not hasattr(rawcode, "co_filename"):
+ rawcode = getrawcode(rawcode)
+ if not isinstance(rawcode, CodeType):
+ raise TypeError("not a code object: {!r}".format(rawcode))
+ self.filename = rawcode.co_filename
+ self.firstlineno = rawcode.co_firstlineno - 1
+ self.name = rawcode.co_name
+ self.raw = rawcode
+
+ def __eq__(self, other):
+ return self.raw == other.raw
+
+ # Ignore type because of https://github.com/python/mypy/issues/4266.
+ __hash__ = None # type: ignore
+
+ def __ne__(self, other):
+ return not self == other
+
+ @property
+ def path(self) -> Union[py.path.local, str]:
+ """ return a path object pointing to source code (or a str in case
+ of OSError / non-existing file).
+ """
+ if not self.raw.co_filename:
+ return ""
+ try:
+ p = py.path.local(self.raw.co_filename)
+ # maybe don't try this checking
+ if not p.check():
+ raise OSError("py.path check failed.")
+ return p
+ except OSError:
+ # XXX maybe try harder like the weird logic
+ # in the standard lib [linecache.updatecache] does?
+ return self.raw.co_filename
+
+ @property
+ def fullsource(self) -> Optional["Source"]:
+ """ return a _pytest._code.Source object for the full source file of the code
+ """
+ from _pytest._code import source
+
+ full, _ = source.findsource(self.raw)
+ return full
+
+ def source(self) -> "Source":
+ """ return a _pytest._code.Source object for the code object's source only
+ """
+ # return source only for that part of code
+ import _pytest._code
+
+ return _pytest._code.Source(self.raw)
+
+ def getargs(self, var: bool = False) -> Tuple[str, ...]:
+ """ return a tuple with the argument names for the code object
+
+ if 'var' is set True also return the names of the variable and
+ keyword arguments when present
+ """
+ # handfull shortcut for getting args
+ raw = self.raw
+ argcount = raw.co_argcount
+ if var:
+ argcount += raw.co_flags & CO_VARARGS
+ argcount += raw.co_flags & CO_VARKEYWORDS
+ return raw.co_varnames[:argcount]
+
+
+class Frame:
+ """Wrapper around a Python frame holding f_locals and f_globals
+ in which expressions can be evaluated."""
+
+ def __init__(self, frame: FrameType) -> None:
+ self.lineno = frame.f_lineno - 1
+ self.f_globals = frame.f_globals
+ self.f_locals = frame.f_locals
+ self.raw = frame
+ self.code = Code(frame.f_code)
+
+ @property
+ def statement(self) -> "Source":
+ """ statement this frame is at """
+ import _pytest._code
+
+ if self.code.fullsource is None:
+ return _pytest._code.Source("")
+ return self.code.fullsource.getstatement(self.lineno)
+
+ def eval(self, code, **vars):
+ """ evaluate 'code' in the frame
+
+ 'vars' are optional additional local variables
+
+ returns the result of the evaluation
+ """
+ f_locals = self.f_locals.copy()
+ f_locals.update(vars)
+ return eval(code, self.f_globals, f_locals)
+
+ def exec_(self, code, **vars) -> None:
+ """ exec 'code' in the frame
+
+ 'vars' are optional; additional local variables
+ """
+ f_locals = self.f_locals.copy()
+ f_locals.update(vars)
+ exec(code, self.f_globals, f_locals)
+
+ def repr(self, object: object) -> str:
+ """ return a 'safe' (non-recursive, one-line) string repr for 'object'
+ """
+ return saferepr(object)
+
+ def is_true(self, object):
+ return object
+
+ def getargs(self, var: bool = False):
+ """ return a list of tuples (name, value) for all arguments
+
+ if 'var' is set True also include the variable and keyword
+ arguments when present
+ """
+ retval = []
+ for arg in self.code.getargs(var):
+ try:
+ retval.append((arg, self.f_locals[arg]))
+ except KeyError:
+ pass # this can occur when using Psyco
+ return retval
+
+
+class TracebackEntry:
+ """ a single entry in a traceback """
+
+ _repr_style = None # type: Optional[Literal["short", "long"]]
+ exprinfo = None
+
+ def __init__(self, rawentry: TracebackType, excinfo=None) -> None:
+ self._excinfo = excinfo
+ self._rawentry = rawentry
+ self.lineno = rawentry.tb_lineno - 1
+
+ def set_repr_style(self, mode: "Literal['short', 'long']") -> None:
+ assert mode in ("short", "long")
+ self._repr_style = mode
+
+ @property
+ def frame(self) -> Frame:
+ return Frame(self._rawentry.tb_frame)
+
+ @property
+ def relline(self) -> int:
+ return self.lineno - self.frame.code.firstlineno
+
+ def __repr__(self) -> str:
+ return "<TracebackEntry %s:%d>" % (self.frame.code.path, self.lineno + 1)
+
+ @property
+ def statement(self) -> "Source":
+ """ _pytest._code.Source object for the current statement """
+ source = self.frame.code.fullsource
+ assert source is not None
+ return source.getstatement(self.lineno)
+
+ @property
+ def path(self):
+ """ path to the source code """
+ return self.frame.code.path
+
+ @property
+ def locals(self) -> Dict[str, Any]:
+ """ locals of underlying frame """
+ return self.frame.f_locals
+
+ def getfirstlinesource(self) -> int:
+ return self.frame.code.firstlineno
+
+ def getsource(self, astcache=None) -> Optional["Source"]:
+ """ return failing source code. """
+ # we use the passed in astcache to not reparse asttrees
+ # within exception info printing
+ from _pytest._code.source import getstatementrange_ast
+
+ source = self.frame.code.fullsource
+ if source is None:
+ return None
+ key = astnode = None
+ if astcache is not None:
+ key = self.frame.code.path
+ if key is not None:
+ astnode = astcache.get(key, None)
+ start = self.getfirstlinesource()
+ try:
+ astnode, _, end = getstatementrange_ast(
+ self.lineno, source, astnode=astnode
+ )
+ except SyntaxError:
+ end = self.lineno + 1
+ else:
+ if key is not None:
+ astcache[key] = astnode
+ return source[start:end]
+
+ source = property(getsource)
+
+ def ishidden(self):
+ """ return True if the current frame has a var __tracebackhide__
+ resolving to True.
+
+ If __tracebackhide__ is a callable, it gets called with the
+ ExceptionInfo instance and can decide whether to hide the traceback.
+
+ mostly for internal use
+ """
+ f = self.frame
+ tbh = f.f_locals.get(
+ "__tracebackhide__", f.f_globals.get("__tracebackhide__", False)
+ )
+ if tbh and callable(tbh):
+ return tbh(None if self._excinfo is None else self._excinfo())
+ return tbh
+
+ def __str__(self) -> str:
+ try:
+ fn = str(self.path)
+ except py.error.Error:
+ fn = "???"
+ name = self.frame.code.name
+ try:
+ line = str(self.statement).lstrip()
+ except KeyboardInterrupt:
+ raise
+ except: # noqa
+ line = "???"
+ return " File %r:%d in %s\n %s\n" % (fn, self.lineno + 1, name, line)
+
+ @property
+ def name(self) -> str:
+ """ co_name of underlying code """
+ return self.frame.code.raw.co_name
+
+
+class Traceback(List[TracebackEntry]):
+ """ Traceback objects encapsulate and offer higher level
+ access to Traceback entries.
+ """
+
+ def __init__(
+ self,
+ tb: Union[TracebackType, Iterable[TracebackEntry]],
+ excinfo: Optional["ReferenceType[ExceptionInfo]"] = None,
+ ) -> None:
+ """ initialize from given python traceback object and ExceptionInfo """
+ self._excinfo = excinfo
+ if isinstance(tb, TracebackType):
+
+ def f(cur: TracebackType) -> Iterable[TracebackEntry]:
+ cur_ = cur # type: Optional[TracebackType]
+ while cur_ is not None:
+ yield TracebackEntry(cur_, excinfo=excinfo)
+ cur_ = cur_.tb_next
+
+ super().__init__(f(tb))
+ else:
+ super().__init__(tb)
+
+ def cut(
+ self,
+ path=None,
+ lineno: Optional[int] = None,
+ firstlineno: Optional[int] = None,
+ excludepath=None,
+ ) -> "Traceback":
+ """ return a Traceback instance wrapping part of this Traceback
+
+ by providing any combination of path, lineno and firstlineno, the
+ first frame to start the to-be-returned traceback is determined
+
+ this allows cutting the first part of a Traceback instance e.g.
+ for formatting reasons (removing some uninteresting bits that deal
+ with handling of the exception/traceback)
+ """
+ for x in self:
+ code = x.frame.code
+ codepath = code.path
+ if (
+ (path is None or codepath == path)
+ and (
+ excludepath is None
+ or not isinstance(codepath, py.path.local)
+ or not codepath.relto(excludepath)
+ )
+ and (lineno is None or x.lineno == lineno)
+ and (firstlineno is None or x.frame.code.firstlineno == firstlineno)
+ ):
+ return Traceback(x._rawentry, self._excinfo)
+ return self
+
+ @overload
+ def __getitem__(self, key: int) -> TracebackEntry:
+ raise NotImplementedError()
+
+ @overload # noqa: F811
+ def __getitem__(self, key: slice) -> "Traceback": # noqa: F811
+ raise NotImplementedError()
+
+ def __getitem__( # noqa: F811
+ self, key: Union[int, slice]
+ ) -> Union[TracebackEntry, "Traceback"]:
+ if isinstance(key, slice):
+ return self.__class__(super().__getitem__(key))
+ else:
+ return super().__getitem__(key)
+
+ def filter(
+ self, fn: Callable[[TracebackEntry], bool] = lambda x: not x.ishidden()
+ ) -> "Traceback":
+ """ return a Traceback instance with certain items removed
+
+ fn is a function that gets a single argument, a TracebackEntry
+ instance, and should return True when the item should be added
+ to the Traceback, False when not
+
+ by default this removes all the TracebackEntries which are hidden
+ (see ishidden() above)
+ """
+ return Traceback(filter(fn, self), self._excinfo)
+
+ def getcrashentry(self) -> TracebackEntry:
+ """ return last non-hidden traceback entry that lead
+ to the exception of a traceback.
+ """
+ for i in range(-1, -len(self) - 1, -1):
+ entry = self[i]
+ if not entry.ishidden():
+ return entry
+ return self[-1]
+
+ def recursionindex(self) -> Optional[int]:
+ """ return the index of the frame/TracebackEntry where recursion
+ originates if appropriate, None if no recursion occurred
+ """
+ cache = {} # type: Dict[Tuple[Any, int, int], List[Dict[str, Any]]]
+ for i, entry in enumerate(self):
+ # id for the code.raw is needed to work around
+ # the strange metaprogramming in the decorator lib from pypi
+ # which generates code objects that have hash/value equality
+ # XXX needs a test
+ key = entry.frame.code.path, id(entry.frame.code.raw), entry.lineno
+ # print "checking for recursion at", key
+ values = cache.setdefault(key, [])
+ if values:
+ f = entry.frame
+ loc = f.f_locals
+ for otherloc in values:
+ if f.is_true(
+ f.eval(
+ co_equal,
+ __recursioncache_locals_1=loc,
+ __recursioncache_locals_2=otherloc,
+ )
+ ):
+ return i
+ values.append(entry.frame.f_locals)
+ return None
+
+
+co_equal = compile(
+ "__recursioncache_locals_1 == __recursioncache_locals_2", "?", "eval"
+)
+
+
+_E = TypeVar("_E", bound=BaseException)
+
+
+@attr.s(repr=False)
+class ExceptionInfo(Generic[_E]):
+ """ wraps sys.exc_info() objects and offers
+ help for navigating the traceback.
+ """
+
+ _assert_start_repr = "AssertionError('assert "
+
+ _excinfo = attr.ib(type=Optional[Tuple["Type[_E]", "_E", TracebackType]])
+ _striptext = attr.ib(type=str, default="")
+ _traceback = attr.ib(type=Optional[Traceback], default=None)
+
+ @classmethod
+ def from_exc_info(
+ cls,
+ exc_info: Tuple["Type[_E]", "_E", TracebackType],
+ exprinfo: Optional[str] = None,
+ ) -> "ExceptionInfo[_E]":
+ """returns an ExceptionInfo for an existing exc_info tuple.
+
+ .. warning::
+
+ Experimental API
+
+
+ :param exprinfo: a text string helping to determine if we should
+ strip ``AssertionError`` from the output, defaults
+ to the exception message/``__str__()``
+ """
+ _striptext = ""
+ if exprinfo is None and isinstance(exc_info[1], AssertionError):
+ exprinfo = getattr(exc_info[1], "msg", None)
+ if exprinfo is None:
+ exprinfo = saferepr(exc_info[1])
+ if exprinfo and exprinfo.startswith(cls._assert_start_repr):
+ _striptext = "AssertionError: "
+
+ return cls(exc_info, _striptext)
+
+ @classmethod
+ def from_current(
+ cls, exprinfo: Optional[str] = None
+ ) -> "ExceptionInfo[BaseException]":
+ """returns an ExceptionInfo matching the current traceback
+
+ .. warning::
+
+ Experimental API
+
+
+ :param exprinfo: a text string helping to determine if we should
+ strip ``AssertionError`` from the output, defaults
+ to the exception message/``__str__()``
+ """
+ tup = sys.exc_info()
+ assert tup[0] is not None, "no current exception"
+ assert tup[1] is not None, "no current exception"
+ assert tup[2] is not None, "no current exception"
+ exc_info = (tup[0], tup[1], tup[2])
+ return ExceptionInfo.from_exc_info(exc_info, exprinfo)
+
+ @classmethod
+ def for_later(cls) -> "ExceptionInfo[_E]":
+ """return an unfilled ExceptionInfo
+ """
+ return cls(None)
+
+ def fill_unfilled(self, exc_info: Tuple["Type[_E]", _E, TracebackType]) -> None:
+ """fill an unfilled ExceptionInfo created with for_later()"""
+ assert self._excinfo is None, "ExceptionInfo was already filled"
+ self._excinfo = exc_info
+
+ @property
+ def type(self) -> "Type[_E]":
+ """the exception class"""
+ assert (
+ self._excinfo is not None
+ ), ".type can only be used after the context manager exits"
+ return self._excinfo[0]
+
+ @property
+ def value(self) -> _E:
+ """the exception value"""
+ assert (
+ self._excinfo is not None
+ ), ".value can only be used after the context manager exits"
+ return self._excinfo[1]
+
+ @property
+ def tb(self) -> TracebackType:
+ """the exception raw traceback"""
+ assert (
+ self._excinfo is not None
+ ), ".tb can only be used after the context manager exits"
+ return self._excinfo[2]
+
+ @property
+ def typename(self) -> str:
+ """the type name of the exception"""
+ assert (
+ self._excinfo is not None
+ ), ".typename can only be used after the context manager exits"
+ return self.type.__name__
+
+ @property
+ def traceback(self) -> Traceback:
+ """the traceback"""
+ if self._traceback is None:
+ self._traceback = Traceback(self.tb, excinfo=ref(self))
+ return self._traceback
+
+ @traceback.setter
+ def traceback(self, value: Traceback) -> None:
+ self._traceback = value
+
+ def __repr__(self) -> str:
+ if self._excinfo is None:
+ return "<ExceptionInfo for raises contextmanager>"
+ return "<{} {} tblen={}>".format(
+ self.__class__.__name__, saferepr(self._excinfo[1]), len(self.traceback)
+ )
+
+ def exconly(self, tryshort: bool = False) -> str:
+ """ return the exception as a string
+
+ when 'tryshort' resolves to True, and the exception is a
+ _pytest._code._AssertionError, only the actual exception part of
+ the exception representation is returned (so 'AssertionError: ' is
+ removed from the beginning)
+ """
+ lines = format_exception_only(self.type, self.value)
+ text = "".join(lines)
+ text = text.rstrip()
+ if tryshort:
+ if text.startswith(self._striptext):
+ text = text[len(self._striptext) :]
+ return text
+
+ def errisinstance(
+ self, exc: Union["Type[BaseException]", Tuple["Type[BaseException]", ...]]
+ ) -> bool:
+ """ return True if the exception is an instance of exc """
+ return isinstance(self.value, exc)
+
+ def _getreprcrash(self) -> "ReprFileLocation":
+ exconly = self.exconly(tryshort=True)
+ entry = self.traceback.getcrashentry()
+ path, lineno = entry.frame.code.raw.co_filename, entry.lineno
+ return ReprFileLocation(path, lineno + 1, exconly)
+
+ def getrepr(
+ self,
+ showlocals: bool = False,
+ style: "_TracebackStyle" = "long",
+ abspath: bool = False,
+ tbfilter: bool = True,
+ funcargs: bool = False,
+ truncate_locals: bool = True,
+ chain: bool = True,
+ ) -> Union["ReprExceptionInfo", "ExceptionChainRepr"]:
+ """
+ Return str()able representation of this exception info.
+
+ :param bool showlocals:
+ Show locals per traceback entry.
+ Ignored if ``style=="native"``.
+
+ :param str style: long|short|no|native traceback style
+
+ :param bool abspath:
+ If paths should be changed to absolute or left unchanged.
+
+ :param bool tbfilter:
+ Hide entries that contain a local variable ``__tracebackhide__==True``.
+ Ignored if ``style=="native"``.
+
+ :param bool funcargs:
+ Show fixtures ("funcargs" for legacy purposes) per traceback entry.
+
+ :param bool truncate_locals:
+ With ``showlocals==True``, make sure locals can be safely represented as strings.
+
+ :param bool chain: if chained exceptions in Python 3 should be shown.
+
+ .. versionchanged:: 3.9
+
+ Added the ``chain`` parameter.
+ """
+ if style == "native":
+ return ReprExceptionInfo(
+ ReprTracebackNative(
+ traceback.format_exception(
+ self.type, self.value, self.traceback[0]._rawentry
+ )
+ ),
+ self._getreprcrash(),
+ )
+
+ fmt = FormattedExcinfo(
+ showlocals=showlocals,
+ style=style,
+ abspath=abspath,
+ tbfilter=tbfilter,
+ funcargs=funcargs,
+ truncate_locals=truncate_locals,
+ chain=chain,
+ )
+ return fmt.repr_excinfo(self)
+
+ def match(self, regexp: "Union[str, Pattern]") -> "Literal[True]":
+ """
+ Check whether the regular expression `regexp` matches the string
+ representation of the exception using :func:`python:re.search`.
+ If it matches `True` is returned.
+ If it doesn't match an `AssertionError` is raised.
+ """
+ __tracebackhide__ = True
+ assert re.search(
+ regexp, str(self.value)
+ ), "Pattern {!r} does not match {!r}".format(regexp, str(self.value))
+ # Return True to allow for "assert excinfo.match()".
+ return True
+
+
+@attr.s
+class FormattedExcinfo:
+ """ presenting information about failing Functions and Generators. """
+
+ # for traceback entries
+ flow_marker = ">"
+ fail_marker = "E"
+
+ showlocals = attr.ib(type=bool, default=False)
+ style = attr.ib(type="_TracebackStyle", default="long")
+ abspath = attr.ib(type=bool, default=True)
+ tbfilter = attr.ib(type=bool, default=True)
+ funcargs = attr.ib(type=bool, default=False)
+ truncate_locals = attr.ib(type=bool, default=True)
+ chain = attr.ib(type=bool, default=True)
+ astcache = attr.ib(default=attr.Factory(dict), init=False, repr=False)
+
+ def _getindent(self, source: "Source") -> int:
+ # figure out indent for given source
+ try:
+ s = str(source.getstatement(len(source) - 1))
+ except KeyboardInterrupt:
+ raise
+ except: # noqa
+ try:
+ s = str(source[-1])
+ except KeyboardInterrupt:
+ raise
+ except: # noqa
+ return 0
+ return 4 + (len(s) - len(s.lstrip()))
+
+ def _getentrysource(self, entry: TracebackEntry) -> Optional["Source"]:
+ source = entry.getsource(self.astcache)
+ if source is not None:
+ source = source.deindent()
+ return source
+
+ def repr_args(self, entry: TracebackEntry) -> Optional["ReprFuncArgs"]:
+ if self.funcargs:
+ args = []
+ for argname, argvalue in entry.frame.getargs(var=True):
+ args.append((argname, saferepr(argvalue)))
+ return ReprFuncArgs(args)
+ return None
+
+ def get_source(
+ self,
+ source: "Source",
+ line_index: int = -1,
+ excinfo: Optional[ExceptionInfo] = None,
+ short: bool = False,
+ ) -> List[str]:
+ """ return formatted and marked up source lines. """
+ import _pytest._code
+
+ lines = []
+ if source is None or line_index >= len(source.lines):
+ source = _pytest._code.Source("???")
+ line_index = 0
+ if line_index < 0:
+ line_index += len(source)
+ space_prefix = " "
+ if short:
+ lines.append(space_prefix + source.lines[line_index].strip())
+ else:
+ for line in source.lines[:line_index]:
+ lines.append(space_prefix + line)
+ lines.append(self.flow_marker + " " + source.lines[line_index])
+ for line in source.lines[line_index + 1 :]:
+ lines.append(space_prefix + line)
+ if excinfo is not None:
+ indent = 4 if short else self._getindent(source)
+ lines.extend(self.get_exconly(excinfo, indent=indent, markall=True))
+ return lines
+
+ def get_exconly(
+ self, excinfo: ExceptionInfo, indent: int = 4, markall: bool = False
+ ) -> List[str]:
+ lines = []
+ indentstr = " " * indent
+ # get the real exception information out
+ exlines = excinfo.exconly(tryshort=True).split("\n")
+ failindent = self.fail_marker + indentstr[1:]
+ for line in exlines:
+ lines.append(failindent + line)
+ if not markall:
+ failindent = indentstr
+ return lines
+
+ def repr_locals(self, locals: Dict[str, object]) -> Optional["ReprLocals"]:
+ if self.showlocals:
+ lines = []
+ keys = [loc for loc in locals if loc[0] != "@"]
+ keys.sort()
+ for name in keys:
+ value = locals[name]
+ if name == "__builtins__":
+ lines.append("__builtins__ = <builtins>")
+ else:
+ # This formatting could all be handled by the
+ # _repr() function, which is only reprlib.Repr in
+ # disguise, so is very configurable.
+ if self.truncate_locals:
+ str_repr = saferepr(value)
+ else:
+ str_repr = safeformat(value)
+ # if len(str_repr) < 70 or not isinstance(value,
+ # (list, tuple, dict)):
+ lines.append("{:<10} = {}".format(name, str_repr))
+ # else:
+ # self._line("%-10s =\\" % (name,))
+ # # XXX
+ # pprint.pprint(value, stream=self.excinfowriter)
+ return ReprLocals(lines)
+ return None
+
+ def repr_traceback_entry(
+ self, entry: TracebackEntry, excinfo: Optional[ExceptionInfo] = None
+ ) -> "ReprEntry":
+ import _pytest._code
+
+ source = self._getentrysource(entry)
+ if source is None:
+ source = _pytest._code.Source("???")
+ line_index = 0
+ else:
+ line_index = entry.lineno - entry.getfirstlinesource()
+
+ lines = [] # type: List[str]
+ style = entry._repr_style if entry._repr_style is not None else self.style
+ if style in ("short", "long"):
+ short = style == "short"
+ reprargs = self.repr_args(entry) if not short else None
+ s = self.get_source(source, line_index, excinfo, short=short)
+ lines.extend(s)
+ if short:
+ message = "in %s" % (entry.name)
+ else:
+ message = excinfo and excinfo.typename or ""
+ path = self._makepath(entry.path)
+ reprfileloc = ReprFileLocation(path, entry.lineno + 1, message)
+ localsrepr = self.repr_locals(entry.locals)
+ return ReprEntry(lines, reprargs, localsrepr, reprfileloc, style)
+ if excinfo:
+ lines.extend(self.get_exconly(excinfo, indent=4))
+ return ReprEntry(lines, None, None, None, style)
+
+ def _makepath(self, path):
+ if not self.abspath:
+ try:
+ np = py.path.local().bestrelpath(path)
+ except OSError:
+ return path
+ if len(np) < len(str(path)):
+ path = np
+ return path
+
+ def repr_traceback(self, excinfo: ExceptionInfo) -> "ReprTraceback":
+ traceback = excinfo.traceback
+ if self.tbfilter:
+ traceback = traceback.filter()
+
+ if excinfo.errisinstance(RecursionError):
+ traceback, extraline = self._truncate_recursive_traceback(traceback)
+ else:
+ extraline = None
+
+ last = traceback[-1]
+ entries = []
+ for index, entry in enumerate(traceback):
+ einfo = (last == entry) and excinfo or None
+ reprentry = self.repr_traceback_entry(entry, einfo)
+ entries.append(reprentry)
+ return ReprTraceback(entries, extraline, style=self.style)
+
+ def _truncate_recursive_traceback(
+ self, traceback: Traceback
+ ) -> Tuple[Traceback, Optional[str]]:
+ """
+ Truncate the given recursive traceback trying to find the starting point
+ of the recursion.
+
+ The detection is done by going through each traceback entry and finding the
+ point in which the locals of the frame are equal to the locals of a previous frame (see ``recursionindex()``.
+
+ Handle the situation where the recursion process might raise an exception (for example
+ comparing numpy arrays using equality raises a TypeError), in which case we do our best to
+ warn the user of the error and show a limited traceback.
+ """
+ try:
+ recursionindex = traceback.recursionindex()
+ except Exception as e:
+ max_frames = 10
+ extraline = (
+ "!!! Recursion error detected, but an error occurred locating the origin of recursion.\n"
+ " The following exception happened when comparing locals in the stack frame:\n"
+ " {exc_type}: {exc_msg}\n"
+ " Displaying first and last {max_frames} stack frames out of {total}."
+ ).format(
+ exc_type=type(e).__name__,
+ exc_msg=str(e),
+ max_frames=max_frames,
+ total=len(traceback),
+ ) # type: Optional[str]
+ # Type ignored because adding two instaces of a List subtype
+ # currently incorrectly has type List instead of the subtype.
+ traceback = traceback[:max_frames] + traceback[-max_frames:] # type: ignore
+ else:
+ if recursionindex is not None:
+ extraline = "!!! Recursion detected (same locals & position)"
+ traceback = traceback[: recursionindex + 1]
+ else:
+ extraline = None
+
+ return traceback, extraline
+
+ def repr_excinfo(self, excinfo: ExceptionInfo) -> "ExceptionChainRepr":
+ repr_chain = (
+ []
+ ) # type: List[Tuple[ReprTraceback, Optional[ReprFileLocation], Optional[str]]]
+ e = excinfo.value
+ excinfo_ = excinfo # type: Optional[ExceptionInfo]
+ descr = None
+ seen = set() # type: Set[int]
+ while e is not None and id(e) not in seen:
+ seen.add(id(e))
+ if excinfo_:
+ reprtraceback = self.repr_traceback(excinfo_)
+ reprcrash = excinfo_._getreprcrash() # type: Optional[ReprFileLocation]
+ else:
+ # fallback to native repr if the exception doesn't have a traceback:
+ # ExceptionInfo objects require a full traceback to work
+ reprtraceback = ReprTracebackNative(
+ traceback.format_exception(type(e), e, None)
+ )
+ reprcrash = None
+
+ repr_chain += [(reprtraceback, reprcrash, descr)]
+ if e.__cause__ is not None and self.chain:
+ e = e.__cause__
+ excinfo_ = (
+ ExceptionInfo((type(e), e, e.__traceback__))
+ if e.__traceback__
+ else None
+ )
+ descr = "The above exception was the direct cause of the following exception:"
+ elif (
+ e.__context__ is not None and not e.__suppress_context__ and self.chain
+ ):
+ e = e.__context__
+ excinfo_ = (
+ ExceptionInfo((type(e), e, e.__traceback__))
+ if e.__traceback__
+ else None
+ )
+ descr = "During handling of the above exception, another exception occurred:"
+ else:
+ e = None
+ repr_chain.reverse()
+ return ExceptionChainRepr(repr_chain)
+
+
+@attr.s(**{ATTRS_EQ_FIELD: False}) # type: ignore
+class TerminalRepr:
+ def __str__(self) -> str:
+ # FYI this is called from pytest-xdist's serialization of exception
+ # information.
+ io = StringIO()
+ tw = TerminalWriter(file=io)
+ self.toterminal(tw)
+ return io.getvalue().strip()
+
+ def __repr__(self) -> str:
+ return "<{} instance at {:0x}>".format(self.__class__, id(self))
+
+ def toterminal(self, tw: TerminalWriter) -> None:
+ raise NotImplementedError()
+
+
+@attr.s(**{ATTRS_EQ_FIELD: False}) # type: ignore
+class ExceptionRepr(TerminalRepr):
+ def __attrs_post_init__(self):
+ self.sections = [] # type: List[Tuple[str, str, str]]
+
+ def addsection(self, name: str, content: str, sep: str = "-") -> None:
+ self.sections.append((name, content, sep))
+
+ def toterminal(self, tw: TerminalWriter) -> None:
+ for name, content, sep in self.sections:
+ tw.sep(sep, name)
+ tw.line(content)
+
+
+@attr.s(**{ATTRS_EQ_FIELD: False}) # type: ignore
+class ExceptionChainRepr(ExceptionRepr):
+ chain = attr.ib(
+ type=Sequence[
+ Tuple["ReprTraceback", Optional["ReprFileLocation"], Optional[str]]
+ ]
+ )
+
+ def __attrs_post_init__(self):
+ super().__attrs_post_init__()
+ # reprcrash and reprtraceback of the outermost (the newest) exception
+ # in the chain
+ self.reprtraceback = self.chain[-1][0]
+ self.reprcrash = self.chain[-1][1]
+
+ def toterminal(self, tw: TerminalWriter) -> None:
+ for element in self.chain:
+ element[0].toterminal(tw)
+ if element[2] is not None:
+ tw.line("")
+ tw.line(element[2], yellow=True)
+ super().toterminal(tw)
+
+
+@attr.s(**{ATTRS_EQ_FIELD: False}) # type: ignore
+class ReprExceptionInfo(ExceptionRepr):
+ reprtraceback = attr.ib(type="ReprTraceback")
+ reprcrash = attr.ib(type="ReprFileLocation")
+
+ def toterminal(self, tw: TerminalWriter) -> None:
+ self.reprtraceback.toterminal(tw)
+ super().toterminal(tw)
+
+
+@attr.s(**{ATTRS_EQ_FIELD: False}) # type: ignore
+class ReprTraceback(TerminalRepr):
+ reprentries = attr.ib(type=Sequence[Union["ReprEntry", "ReprEntryNative"]])
+ extraline = attr.ib(type=Optional[str])
+ style = attr.ib(type="_TracebackStyle")
+
+ entrysep = "_ "
+
+ def toterminal(self, tw: TerminalWriter) -> None:
+ # the entries might have different styles
+ for i, entry in enumerate(self.reprentries):
+ if entry.style == "long":
+ tw.line("")
+ entry.toterminal(tw)
+ if i < len(self.reprentries) - 1:
+ next_entry = self.reprentries[i + 1]
+ if (
+ entry.style == "long"
+ or entry.style == "short"
+ and next_entry.style == "long"
+ ):
+ tw.sep(self.entrysep)
+
+ if self.extraline:
+ tw.line(self.extraline)
+
+
+class ReprTracebackNative(ReprTraceback):
+ def __init__(self, tblines: Sequence[str]) -> None:
+ self.style = "native"
+ self.reprentries = [ReprEntryNative(tblines)]
+ self.extraline = None
+
+
+@attr.s(**{ATTRS_EQ_FIELD: False}) # type: ignore
+class ReprEntryNative(TerminalRepr):
+ lines = attr.ib(type=Sequence[str])
+ style = "native" # type: _TracebackStyle
+
+ def toterminal(self, tw: TerminalWriter) -> None:
+ tw.write("".join(self.lines))
+
+
+@attr.s(**{ATTRS_EQ_FIELD: False}) # type: ignore
+class ReprEntry(TerminalRepr):
+ lines = attr.ib(type=Sequence[str])
+ reprfuncargs = attr.ib(type=Optional["ReprFuncArgs"])
+ reprlocals = attr.ib(type=Optional["ReprLocals"])
+ reprfileloc = attr.ib(type=Optional["ReprFileLocation"])
+ style = attr.ib(type="_TracebackStyle")
+
+ def _write_entry_lines(self, tw: TerminalWriter) -> None:
+ """Writes the source code portions of a list of traceback entries with syntax highlighting.
+
+ Usually entries are lines like these:
+
+ " x = 1"
+ "> assert x == 2"
+ "E assert 1 == 2"
+
+ This function takes care of rendering the "source" portions of it (the lines without
+ the "E" prefix) using syntax highlighting, taking care to not highlighting the ">"
+ character, as doing so might break line continuations.
+ """
+
+ indent_size = 4
+
+ def is_fail(line):
+ return line.startswith("{} ".format(FormattedExcinfo.fail_marker))
+
+ if not self.lines:
+ return
+
+ # separate indents and source lines that are not failures: we want to
+ # highlight the code but not the indentation, which may contain markers
+ # such as "> assert 0"
+ indents = []
+ source_lines = []
+ for line in self.lines:
+ if not is_fail(line):
+ indents.append(line[:indent_size])
+ source_lines.append(line[indent_size:])
+
+ tw._write_source(source_lines, indents)
+
+ # failure lines are always completely red and bold
+ for line in (x for x in self.lines if is_fail(x)):
+ tw.line(line, bold=True, red=True)
+
+ def toterminal(self, tw: TerminalWriter) -> None:
+ if self.style == "short":
+ assert self.reprfileloc is not None
+ self.reprfileloc.toterminal(tw)
+ self._write_entry_lines(tw)
+ if self.reprlocals:
+ self.reprlocals.toterminal(tw, indent=" " * 8)
+ return
+
+ if self.reprfuncargs:
+ self.reprfuncargs.toterminal(tw)
+
+ self._write_entry_lines(tw)
+
+ if self.reprlocals:
+ tw.line("")
+ self.reprlocals.toterminal(tw)
+ if self.reprfileloc:
+ if self.lines:
+ tw.line("")
+ self.reprfileloc.toterminal(tw)
+
+ def __str__(self) -> str:
+ return "{}\n{}\n{}".format(
+ "\n".join(self.lines), self.reprlocals, self.reprfileloc
+ )
+
+
+@attr.s(**{ATTRS_EQ_FIELD: False}) # type: ignore
+class ReprFileLocation(TerminalRepr):
+ path = attr.ib(type=str, converter=str)
+ lineno = attr.ib(type=int)
+ message = attr.ib(type=str)
+
+ def toterminal(self, tw: TerminalWriter) -> None:
+ # filename and lineno output for each entry,
+ # using an output format that most editors understand
+ msg = self.message
+ i = msg.find("\n")
+ if i != -1:
+ msg = msg[:i]
+ tw.write(self.path, bold=True, red=True)
+ tw.line(":{}: {}".format(self.lineno, msg))
+
+
+@attr.s(**{ATTRS_EQ_FIELD: False}) # type: ignore
+class ReprLocals(TerminalRepr):
+ lines = attr.ib(type=Sequence[str])
+
+ def toterminal(self, tw: TerminalWriter, indent="") -> None:
+ for line in self.lines:
+ tw.line(indent + line)
+
+
+@attr.s(**{ATTRS_EQ_FIELD: False}) # type: ignore
+class ReprFuncArgs(TerminalRepr):
+ args = attr.ib(type=Sequence[Tuple[str, object]])
+
+ def toterminal(self, tw: TerminalWriter) -> None:
+ if self.args:
+ linesofar = ""
+ for name, value in self.args:
+ ns = "{} = {}".format(name, value)
+ if len(ns) + len(linesofar) + 2 > tw.fullwidth:
+ if linesofar:
+ tw.line(linesofar)
+ linesofar = ns
+ else:
+ if linesofar:
+ linesofar += ", " + ns
+ else:
+ linesofar = ns
+ if linesofar:
+ tw.line(linesofar)
+ tw.line("")
+
+
+def getrawcode(obj, trycall: bool = True):
+ """ return code object for given function. """
+ try:
+ return obj.__code__
+ except AttributeError:
+ obj = getattr(obj, "f_code", obj)
+ obj = getattr(obj, "__code__", obj)
+ if trycall and not hasattr(obj, "co_firstlineno"):
+ if hasattr(obj, "__call__") and not inspect.isclass(obj):
+ x = getrawcode(obj.__call__, trycall=False)
+ if hasattr(x, "co_firstlineno"):
+ return x
+ return obj
+
+
+# relative paths that we use to filter traceback entries from appearing to the user;
+# see filter_traceback
+# note: if we need to add more paths than what we have now we should probably use a list
+# for better maintenance
+
+_PLUGGY_DIR = py.path.local(pluggy.__file__.rstrip("oc"))
+# pluggy is either a package or a single module depending on the version
+if _PLUGGY_DIR.basename == "__init__.py":
+ _PLUGGY_DIR = _PLUGGY_DIR.dirpath()
+_PYTEST_DIR = py.path.local(_pytest.__file__).dirpath()
+_PY_DIR = py.path.local(py.__file__).dirpath()
+
+
+def filter_traceback(entry: TracebackEntry) -> bool:
+ """Return True if a TracebackEntry instance should be removed from tracebacks:
+ * dynamically generated code (no code to show up for it);
+ * internal traceback from pytest or its internal libraries, py and pluggy.
+ """
+ # entry.path might sometimes return a str object when the entry
+ # points to dynamically generated code
+ # see https://bitbucket.org/pytest-dev/py/issues/71
+ raw_filename = entry.frame.code.raw.co_filename
+ is_generated = "<" in raw_filename and ">" in raw_filename
+ if is_generated:
+ return False
+ # entry.path might point to a non-existing file, in which case it will
+ # also return a str object. see #1133
+ p = py.path.local(entry.path)
+ return (
+ not p.relto(_PLUGGY_DIR) and not p.relto(_PYTEST_DIR) and not p.relto(_PY_DIR)
+ )
diff --git a/contrib/python/pytest/py3/_pytest/_code/source.py b/contrib/python/pytest/py3/_pytest/_code/source.py
new file mode 100644
index 0000000000..28c11e5d5e
--- /dev/null
+++ b/contrib/python/pytest/py3/_pytest/_code/source.py
@@ -0,0 +1,416 @@
+import ast
+import inspect
+import linecache
+import sys
+import textwrap
+import tokenize
+import warnings
+from bisect import bisect_right
+from types import CodeType
+from types import FrameType
+from typing import Any
+from typing import Iterator
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+from typing import Union
+
+import py
+
+from _pytest.compat import get_real_func
+from _pytest.compat import overload
+from _pytest.compat import TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from typing_extensions import Literal
+
+
+class Source:
+ """ an immutable object holding a source code fragment,
+ possibly deindenting it.
+ """
+
+ _compilecounter = 0
+
+ def __init__(self, *parts, **kwargs) -> None:
+ self.lines = lines = [] # type: List[str]
+ de = kwargs.get("deindent", True)
+ for part in parts:
+ if not part:
+ partlines = [] # type: List[str]
+ elif isinstance(part, Source):
+ partlines = part.lines
+ elif isinstance(part, (tuple, list)):
+ partlines = [x.rstrip("\n") for x in part]
+ elif isinstance(part, str):
+ partlines = part.split("\n")
+ else:
+ partlines = getsource(part, deindent=de).lines
+ if de:
+ partlines = deindent(partlines)
+ lines.extend(partlines)
+
+ def __eq__(self, other):
+ try:
+ return self.lines == other.lines
+ except AttributeError:
+ if isinstance(other, str):
+ return str(self) == other
+ return False
+
+ # Ignore type because of https://github.com/python/mypy/issues/4266.
+ __hash__ = None # type: ignore
+
+ @overload
+ def __getitem__(self, key: int) -> str:
+ raise NotImplementedError()
+
+ @overload # noqa: F811
+ def __getitem__(self, key: slice) -> "Source": # noqa: F811
+ raise NotImplementedError()
+
+ def __getitem__(self, key: Union[int, slice]) -> Union[str, "Source"]: # noqa: F811
+ if isinstance(key, int):
+ return self.lines[key]
+ else:
+ if key.step not in (None, 1):
+ raise IndexError("cannot slice a Source with a step")
+ newsource = Source()
+ newsource.lines = self.lines[key.start : key.stop]
+ return newsource
+
+ def __iter__(self) -> Iterator[str]:
+ return iter(self.lines)
+
+ def __len__(self) -> int:
+ return len(self.lines)
+
+ def strip(self) -> "Source":
+ """ return new source object with trailing
+ and leading blank lines removed.
+ """
+ start, end = 0, len(self)
+ while start < end and not self.lines[start].strip():
+ start += 1
+ while end > start and not self.lines[end - 1].strip():
+ end -= 1
+ source = Source()
+ source.lines[:] = self.lines[start:end]
+ return source
+
+ def putaround(
+ self, before: str = "", after: str = "", indent: str = " " * 4
+ ) -> "Source":
+ """ return a copy of the source object with
+ 'before' and 'after' wrapped around it.
+ """
+ beforesource = Source(before)
+ aftersource = Source(after)
+ newsource = Source()
+ lines = [(indent + line) for line in self.lines]
+ newsource.lines = beforesource.lines + lines + aftersource.lines
+ return newsource
+
+ def indent(self, indent: str = " " * 4) -> "Source":
+ """ return a copy of the source object with
+ all lines indented by the given indent-string.
+ """
+ newsource = Source()
+ newsource.lines = [(indent + line) for line in self.lines]
+ return newsource
+
+ def getstatement(self, lineno: int) -> "Source":
+ """ return Source statement which contains the
+ given linenumber (counted from 0).
+ """
+ start, end = self.getstatementrange(lineno)
+ return self[start:end]
+
+ def getstatementrange(self, lineno: int) -> Tuple[int, int]:
+ """ return (start, end) tuple which spans the minimal
+ statement region which containing the given lineno.
+ """
+ if not (0 <= lineno < len(self)):
+ raise IndexError("lineno out of range")
+ ast, start, end = getstatementrange_ast(lineno, self)
+ return start, end
+
+ def deindent(self) -> "Source":
+ """return a new source object deindented."""
+ newsource = Source()
+ newsource.lines[:] = deindent(self.lines)
+ return newsource
+
+ def isparseable(self, deindent: bool = True) -> bool:
+ """ return True if source is parseable, heuristically
+ deindenting it by default.
+ """
+ if deindent:
+ source = str(self.deindent())
+ else:
+ source = str(self)
+ try:
+ ast.parse(source)
+ except (SyntaxError, ValueError, TypeError):
+ return False
+ else:
+ return True
+
+ def __str__(self) -> str:
+ return "\n".join(self.lines)
+
+ @overload
+ def compile(
+ self,
+ filename: Optional[str] = ...,
+ mode: str = ...,
+ flag: "Literal[0]" = ...,
+ dont_inherit: int = ...,
+ _genframe: Optional[FrameType] = ...,
+ ) -> CodeType:
+ raise NotImplementedError()
+
+ @overload # noqa: F811
+ def compile( # noqa: F811
+ self,
+ filename: Optional[str] = ...,
+ mode: str = ...,
+ flag: int = ...,
+ dont_inherit: int = ...,
+ _genframe: Optional[FrameType] = ...,
+ ) -> Union[CodeType, ast.AST]:
+ raise NotImplementedError()
+
+ def compile( # noqa: F811
+ self,
+ filename: Optional[str] = None,
+ mode: str = "exec",
+ flag: int = 0,
+ dont_inherit: int = 0,
+ _genframe: Optional[FrameType] = None,
+ ) -> Union[CodeType, ast.AST]:
+ """ return compiled code object. if filename is None
+ invent an artificial filename which displays
+ the source/line position of the caller frame.
+ """
+ if not filename or py.path.local(filename).check(file=0):
+ if _genframe is None:
+ _genframe = sys._getframe(1) # the caller
+ fn, lineno = _genframe.f_code.co_filename, _genframe.f_lineno
+ base = "<%d-codegen " % self._compilecounter
+ self.__class__._compilecounter += 1
+ if not filename:
+ filename = base + "%s:%d>" % (fn, lineno)
+ else:
+ filename = base + "%r %s:%d>" % (filename, fn, lineno)
+ source = "\n".join(self.lines) + "\n"
+ try:
+ co = compile(source, filename, mode, flag)
+ except SyntaxError as ex:
+ # re-represent syntax errors from parsing python strings
+ msglines = self.lines[: ex.lineno]
+ if ex.offset:
+ msglines.append(" " * ex.offset + "^")
+ msglines.append("(code was compiled probably from here: %s)" % filename)
+ newex = SyntaxError("\n".join(msglines))
+ newex.offset = ex.offset
+ newex.lineno = ex.lineno
+ newex.text = ex.text
+ raise newex
+ else:
+ if flag & ast.PyCF_ONLY_AST:
+ assert isinstance(co, ast.AST)
+ return co
+ assert isinstance(co, CodeType)
+ lines = [(x + "\n") for x in self.lines]
+ # Type ignored because linecache.cache is private.
+ linecache.cache[filename] = (1, None, lines, filename) # type: ignore
+ return co
+
+
+#
+# public API shortcut functions
+#
+
+
+@overload
+def compile_(
+ source: Union[str, bytes, ast.mod, ast.AST],
+ filename: Optional[str] = ...,
+ mode: str = ...,
+ flags: "Literal[0]" = ...,
+ dont_inherit: int = ...,
+) -> CodeType:
+ raise NotImplementedError()
+
+
+@overload # noqa: F811
+def compile_( # noqa: F811
+ source: Union[str, bytes, ast.mod, ast.AST],
+ filename: Optional[str] = ...,
+ mode: str = ...,
+ flags: int = ...,
+ dont_inherit: int = ...,
+) -> Union[CodeType, ast.AST]:
+ raise NotImplementedError()
+
+
+def compile_( # noqa: F811
+ source: Union[str, bytes, ast.mod, ast.AST],
+ filename: Optional[str] = None,
+ mode: str = "exec",
+ flags: int = 0,
+ dont_inherit: int = 0,
+) -> Union[CodeType, ast.AST]:
+ """ compile the given source to a raw code object,
+ and maintain an internal cache which allows later
+ retrieval of the source code for the code object
+ and any recursively created code objects.
+ """
+ if isinstance(source, ast.AST):
+ # XXX should Source support having AST?
+ assert filename is not None
+ co = compile(source, filename, mode, flags, dont_inherit)
+ assert isinstance(co, (CodeType, ast.AST))
+ return co
+ _genframe = sys._getframe(1) # the caller
+ s = Source(source)
+ return s.compile(filename, mode, flags, _genframe=_genframe)
+
+
+def getfslineno(obj: Any) -> Tuple[Union[str, py.path.local], int]:
+ """ Return source location (path, lineno) for the given object.
+ If the source cannot be determined return ("", -1).
+
+ The line number is 0-based.
+ """
+ from .code import Code
+
+ # xxx let decorators etc specify a sane ordering
+ # NOTE: this used to be done in _pytest.compat.getfslineno, initially added
+ # in 6ec13a2b9. It ("place_as") appears to be something very custom.
+ obj = get_real_func(obj)
+ if hasattr(obj, "place_as"):
+ obj = obj.place_as
+
+ try:
+ code = Code(obj)
+ except TypeError:
+ try:
+ fn = inspect.getsourcefile(obj) or inspect.getfile(obj)
+ except TypeError:
+ return "", -1
+
+ fspath = fn and py.path.local(fn) or ""
+ lineno = -1
+ if fspath:
+ try:
+ _, lineno = findsource(obj)
+ except IOError:
+ pass
+ return fspath, lineno
+ else:
+ return code.path, code.firstlineno
+
+
+#
+# helper functions
+#
+
+
+def findsource(obj) -> Tuple[Optional[Source], int]:
+ try:
+ sourcelines, lineno = inspect.findsource(obj)
+ except Exception:
+ return None, -1
+ source = Source()
+ source.lines = [line.rstrip() for line in sourcelines]
+ return source, lineno
+
+
+def getsource(obj, **kwargs) -> Source:
+ from .code import getrawcode
+
+ obj = getrawcode(obj)
+ try:
+ strsrc = inspect.getsource(obj)
+ except IndentationError:
+ strsrc = '"Buggy python version consider upgrading, cannot get source"'
+ assert isinstance(strsrc, str)
+ return Source(strsrc, **kwargs)
+
+
+def deindent(lines: Sequence[str]) -> List[str]:
+ return textwrap.dedent("\n".join(lines)).splitlines()
+
+
+def get_statement_startend2(lineno: int, node: ast.AST) -> Tuple[int, Optional[int]]:
+ import ast
+
+ # flatten all statements and except handlers into one lineno-list
+ # AST's line numbers start indexing at 1
+ values = [] # type: List[int]
+ for x in ast.walk(node):
+ if isinstance(x, (ast.stmt, ast.ExceptHandler)):
+ values.append(x.lineno - 1)
+ for name in ("finalbody", "orelse"):
+ val = getattr(x, name, None) # type: Optional[List[ast.stmt]]
+ if val:
+ # treat the finally/orelse part as its own statement
+ values.append(val[0].lineno - 1 - 1)
+ values.sort()
+ insert_index = bisect_right(values, lineno)
+ start = values[insert_index - 1]
+ if insert_index >= len(values):
+ end = None
+ else:
+ end = values[insert_index]
+ return start, end
+
+
+def getstatementrange_ast(
+ lineno: int,
+ source: Source,
+ assertion: bool = False,
+ astnode: Optional[ast.AST] = None,
+) -> Tuple[ast.AST, int, int]:
+ if astnode is None:
+ content = str(source)
+ # See #4260:
+ # don't produce duplicate warnings when compiling source to find ast
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore")
+ astnode = ast.parse(content, "source", "exec")
+
+ start, end = get_statement_startend2(lineno, astnode)
+ # we need to correct the end:
+ # - ast-parsing strips comments
+ # - there might be empty lines
+ # - we might have lesser indented code blocks at the end
+ if end is None:
+ end = len(source.lines)
+
+ if end > start + 1:
+ # make sure we don't span differently indented code blocks
+ # by using the BlockFinder helper used which inspect.getsource() uses itself
+ block_finder = inspect.BlockFinder()
+ # if we start with an indented line, put blockfinder to "started" mode
+ block_finder.started = source.lines[start][0].isspace()
+ it = ((x + "\n") for x in source.lines[start:end])
+ try:
+ for tok in tokenize.generate_tokens(lambda: next(it)):
+ block_finder.tokeneater(*tok)
+ except (inspect.EndOfBlock, IndentationError):
+ end = block_finder.last + start
+ except Exception:
+ pass
+
+ # the end might still point to a comment or empty line, correct it
+ while end:
+ line = source.lines[end - 1].lstrip()
+ if line.startswith("#") or not line:
+ end -= 1
+ else:
+ break
+ return astnode, start, end