aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/stack-data/stack_data
diff options
context:
space:
mode:
authorrobot-contrib <robot-contrib@yandex-team.ru>2022-05-18 00:43:36 +0300
committerrobot-contrib <robot-contrib@yandex-team.ru>2022-05-18 00:43:36 +0300
commit9e5f436a8b2a27bcc7802e443ea3ef3e41a82a75 (patch)
tree78b522cab9f76336e62064d4d8ff7c897659b20e /contrib/python/stack-data/stack_data
parent8113a823ffca6451bb5ff8f0334560885a939a24 (diff)
downloadydb-9e5f436a8b2a27bcc7802e443ea3ef3e41a82a75.tar.gz
Update contrib/python/ipython/py3 to 8.3.0
ref:e84342d4d30476f9148137f37fd0c6405fd36f55
Diffstat (limited to 'contrib/python/stack-data/stack_data')
-rw-r--r--contrib/python/stack-data/stack_data/__init__.py9
-rw-r--r--contrib/python/stack-data/stack_data/core.py882
-rw-r--r--contrib/python/stack-data/stack_data/formatting.py206
-rw-r--r--contrib/python/stack-data/stack_data/utils.py172
-rw-r--r--contrib/python/stack-data/stack_data/version.py1
5 files changed, 1270 insertions, 0 deletions
diff --git a/contrib/python/stack-data/stack_data/__init__.py b/contrib/python/stack-data/stack_data/__init__.py
new file mode 100644
index 0000000000..e9bc429e62
--- /dev/null
+++ b/contrib/python/stack-data/stack_data/__init__.py
@@ -0,0 +1,9 @@
+from .core import Source, FrameInfo, markers_from_ranges, Options, LINE_GAP, Line, Variable, RangeInLine, \
+ RepeatedFrames, MarkerInLine, style_with_executing_node
+from .formatting import Formatter
+
+try:
+ from .version import __version__
+except ImportError:
+ # version.py is auto-generated with the git tag when building
+ __version__ = "???"
diff --git a/contrib/python/stack-data/stack_data/core.py b/contrib/python/stack-data/stack_data/core.py
new file mode 100644
index 0000000000..97313fe333
--- /dev/null
+++ b/contrib/python/stack-data/stack_data/core.py
@@ -0,0 +1,882 @@
+import ast
+import html
+import os
+import sys
+from collections import defaultdict, Counter
+from textwrap import dedent
+from types import FrameType, CodeType, TracebackType
+from typing import (
+ Iterator, List, Tuple, Optional, NamedTuple,
+ Any, Iterable, Callable, Union,
+ Sequence)
+from typing import Mapping
+
+import executing
+from asttokens.util import Token
+from executing import only
+from pure_eval import Evaluator, is_expression_interesting
+from stack_data.utils import (
+ truncate, unique_in_order, line_range,
+ frame_and_lineno, iter_stack, collapse_repeated, group_by_key_func,
+ cached_property, is_frame, _pygmented_with_ranges, assert_)
+
+RangeInLine = NamedTuple('RangeInLine',
+ [('start', int),
+ ('end', int),
+ ('data', Any)])
+RangeInLine.__doc__ = """
+Represents a range of characters within one line of source code,
+and some associated data.
+
+Typically this will be converted to a pair of markers by markers_from_ranges.
+"""
+
+MarkerInLine = NamedTuple('MarkerInLine',
+ [('position', int),
+ ('is_start', bool),
+ ('string', str)])
+MarkerInLine.__doc__ = """
+A string that is meant to be inserted at a given position in a line of source code.
+For example, this could be an ANSI code or the opening or closing of an HTML tag.
+is_start should be True if this is the first of a pair such as the opening of an HTML tag.
+This will help to sort and insert markers correctly.
+
+Typically this would be created from a RangeInLine by markers_from_ranges.
+Then use Line.render to insert the markers correctly.
+"""
+
+
+class Variable(
+ NamedTuple('_Variable',
+ [('name', str),
+ ('nodes', Sequence[ast.AST]),
+ ('value', Any)])
+):
+ """
+ An expression that appears one or more times in source code and its associated value.
+ This will usually be a variable but it can be any expression evaluated by pure_eval.
+ - name is the source text of the expression.
+ - nodes is a list of equivalent nodes representing the same expression.
+ - value is the safely evaluated value of the expression.
+ """
+ __hash__ = object.__hash__
+ __eq__ = object.__eq__
+
+
+class Source(executing.Source):
+ """
+ The source code of a single file and associated metadata.
+
+ In addition to the attributes from the base class executing.Source,
+ if .tree is not None, meaning this is valid Python code, objects have:
+ - pieces: a list of Piece objects
+ - tokens_by_lineno: a defaultdict(list) mapping line numbers to lists of tokens.
+
+ Don't construct this class. Get an instance from frame_info.source.
+ """
+
+ def __init__(self, *args, **kwargs):
+ super(Source, self).__init__(*args, **kwargs)
+ if self.tree:
+ self.asttokens()
+
+ @cached_property
+ def pieces(self) -> List[range]:
+ if not self.tree:
+ return [
+ range(i, i + 1)
+ for i in range(1, len(self.lines) + 1)
+ ]
+ return list(self._clean_pieces())
+
+ @cached_property
+ def tokens_by_lineno(self) -> Mapping[int, List[Token]]:
+ if not self.tree:
+ raise AttributeError("This file doesn't contain valid Python, so .tokens_by_lineno doesn't exist")
+ return group_by_key_func(
+ self.asttokens().tokens,
+ lambda tok: tok.start[0],
+ )
+
+ def _clean_pieces(self) -> Iterator[range]:
+ pieces = self._raw_split_into_pieces(self.tree, 1, len(self.lines) + 1)
+ pieces = [
+ (start, end)
+ for (start, end) in pieces
+ if end > start
+ ]
+
+ starts = [start for start, end in pieces[1:]]
+ ends = [end for start, end in pieces[:-1]]
+ if starts != ends:
+ joins = list(map(set, zip(starts, ends)))
+ mismatches = [s for s in joins if len(s) > 1]
+ raise AssertionError("Pieces mismatches: %s" % mismatches)
+
+ def is_blank(i):
+ try:
+ return not self.lines[i - 1].strip()
+ except IndexError:
+ return False
+
+ for start, end in pieces:
+ while is_blank(start):
+ start += 1
+ while is_blank(end - 1):
+ end -= 1
+ if start < end:
+ yield range(start, end)
+
+ def _raw_split_into_pieces(
+ self,
+ stmt: ast.AST,
+ start: int,
+ end: int,
+ ) -> Iterator[Tuple[int, int]]:
+ self.asttokens()
+
+ for name, body in ast.iter_fields(stmt):
+ if (
+ isinstance(body, list) and body and
+ isinstance(body[0], (ast.stmt, ast.ExceptHandler))
+ ):
+ for rang, group in sorted(group_by_key_func(body, line_range).items()):
+ sub_stmt = group[0]
+ for inner_start, inner_end in self._raw_split_into_pieces(sub_stmt, *rang):
+ if start < inner_start:
+ yield start, inner_start
+ if inner_start < inner_end:
+ yield inner_start, inner_end
+ start = inner_end
+
+ yield start, end
+
+
+class Options:
+ """
+ Configuration for FrameInfo, either in the constructor or the .stack_data classmethod.
+ These all determine which Lines and gaps are produced by FrameInfo.lines.
+
+ before and after are the number of pieces of context to include in a frame
+ in addition to the executing piece.
+
+ include_signature is whether to include the function signature as a piece in a frame.
+
+ If a piece (other than the executing piece) has more than max_lines_per_piece lines,
+ it will be truncated with a gap in the middle.
+ """
+ def __init__(
+ self, *,
+ before: int = 3,
+ after: int = 1,
+ include_signature: bool = False,
+ max_lines_per_piece: int = 6,
+ pygments_formatter=None
+ ):
+ self.before = before
+ self.after = after
+ self.include_signature = include_signature
+ self.max_lines_per_piece = max_lines_per_piece
+ self.pygments_formatter = pygments_formatter
+
+ def __repr__(self):
+ keys = sorted(self.__dict__)
+ items = ("{}={!r}".format(k, self.__dict__[k]) for k in keys)
+ return "{}({})".format(type(self).__name__, ", ".join(items))
+
+
+class LineGap(object):
+ """
+ A singleton representing one or more lines of source code that were skipped
+ in FrameInfo.lines.
+
+ LINE_GAP can be created in two ways:
+ - by truncating a piece of context that's too long.
+ - immediately after the signature piece if Options.include_signature is true
+ and the following piece isn't already part of the included pieces.
+ """
+ def __repr__(self):
+ return "LINE_GAP"
+
+
+LINE_GAP = LineGap()
+
+
+class Line(object):
+ """
+ A single line of source code for a particular stack frame.
+
+ Typically this is obtained from FrameInfo.lines.
+ Since that list may also contain LINE_GAP, you should first check
+ that this is really a Line before using it.
+
+ Attributes:
+ - frame_info
+ - lineno: the 1-based line number within the file
+ - text: the raw source of this line. For displaying text, see .render() instead.
+ - leading_indent: the number of leading spaces that should probably be stripped.
+ This attribute is set within FrameInfo.lines. If you construct this class
+ directly you should probably set it manually (at least to 0).
+ - is_current: whether this is the line currently being executed by the interpreter
+ within this frame.
+ - tokens: a list of source tokens in this line
+
+ There are several helpers for constructing RangeInLines which can be converted to markers
+ using markers_from_ranges which can be passed to .render():
+ - token_ranges
+ - variable_ranges
+ - executing_node_ranges
+ - range_from_node
+ """
+ def __init__(
+ self,
+ frame_info: 'FrameInfo',
+ lineno: int,
+ ):
+ self.frame_info = frame_info
+ self.lineno = lineno
+ self.text = frame_info.source.lines[lineno - 1] # type: str
+ self.leading_indent = None # type: Optional[int]
+
+ def __repr__(self):
+ return "<{self.__class__.__name__} {self.lineno} (current={self.is_current}) " \
+ "{self.text!r} of {self.frame_info.filename}>".format(self=self)
+
+ @property
+ def is_current(self) -> bool:
+ """
+ Whether this is the line currently being executed by the interpreter
+ within this frame.
+ """
+ return self.lineno == self.frame_info.lineno
+
+ @property
+ def tokens(self) -> List[Token]:
+ """
+ A list of source tokens in this line.
+ The tokens are Token objects from asttokens:
+ https://asttokens.readthedocs.io/en/latest/api-index.html#asttokens.util.Token
+ """
+ return self.frame_info.source.tokens_by_lineno[self.lineno]
+
+ @cached_property
+ def token_ranges(self) -> List[RangeInLine]:
+ """
+ A list of RangeInLines for each token in .tokens,
+ where range.data is a Token object from asttokens:
+ https://asttokens.readthedocs.io/en/latest/api-index.html#asttokens.util.Token
+ """
+ return [
+ RangeInLine(
+ token.start[1],
+ token.end[1],
+ token,
+ )
+ for token in self.tokens
+ ]
+
+ @cached_property
+ def variable_ranges(self) -> List[RangeInLine]:
+ """
+ A list of RangeInLines for each Variable that appears at least partially in this line.
+ The data attribute of the range is a pair (variable, node) where node is the particular
+ AST node from the list variable.nodes that corresponds to this range.
+ """
+ return [
+ self.range_from_node(node, (variable, node))
+ for variable, node in self.frame_info.variables_by_lineno[self.lineno]
+ ]
+
+ @cached_property
+ def executing_node_ranges(self) -> List[RangeInLine]:
+ """
+ A list of one or zero RangeInLines for the executing node of this frame.
+ The list will have one element if the node can be found and it overlaps this line.
+ """
+ return self._raw_executing_node_ranges(
+ self.frame_info._executing_node_common_indent
+ )
+
+ def _raw_executing_node_ranges(self, common_indent=0) -> List[RangeInLine]:
+ ex = self.frame_info.executing
+ node = ex.node
+ if node:
+ rang = self.range_from_node(node, ex, common_indent)
+ if rang:
+ return [rang]
+ return []
+
+ def range_from_node(
+ self, node: ast.AST, data: Any, common_indent: int = 0
+ ) -> Optional[RangeInLine]:
+ """
+ If the given node overlaps with this line, return a RangeInLine
+ with the correct start and end and the given data.
+ Otherwise, return None.
+ """
+ start, end = line_range(node)
+ end -= 1
+ if not (start <= self.lineno <= end):
+ return None
+ if start == self.lineno:
+ try:
+ range_start = node.first_token.start[1]
+ except AttributeError:
+ range_start = node.col_offset
+ else:
+ range_start = 0
+
+ range_start = max(range_start, common_indent)
+
+ if end == self.lineno:
+ try:
+ range_end = node.last_token.end[1]
+ except AttributeError:
+ try:
+ range_end = node.end_col_offset
+ except AttributeError:
+ return None
+ else:
+ range_end = len(self.text)
+
+ return RangeInLine(range_start, range_end, data)
+
+ def render(
+ self,
+ markers: Iterable[MarkerInLine] = (),
+ *,
+ strip_leading_indent: bool = True,
+ pygmented: bool = False,
+ escape_html: bool = False
+ ) -> str:
+ """
+ Produces a string for display consisting of .text
+ with the .strings of each marker inserted at the correct positions.
+ If strip_leading_indent is true (the default) then leading spaces
+ common to all lines in this frame will be excluded.
+ """
+ if pygmented and self.frame_info.scope:
+ assert_(not markers, ValueError("Cannot use pygmented with markers"))
+ start_line, lines = self.frame_info._pygmented_scope_lines
+ result = lines[self.lineno - start_line]
+ if strip_leading_indent:
+ result = result.replace(self.text[:self.leading_indent], "", 1)
+ return result
+
+ text = self.text
+
+ # This just makes the loop below simpler
+ markers = list(markers) + [MarkerInLine(position=len(text), is_start=False, string='')]
+
+ markers.sort(key=lambda t: t[:2])
+
+ parts = []
+ if strip_leading_indent:
+ start = self.leading_indent
+ else:
+ start = 0
+ original_start = start
+
+ for marker in markers:
+ text_part = text[start:marker.position]
+ if escape_html:
+ text_part = html.escape(text_part)
+ parts.append(text_part)
+ parts.append(marker.string)
+
+ # Ensure that start >= leading_indent
+ start = max(marker.position, original_start)
+ return ''.join(parts)
+
+
+def markers_from_ranges(
+ ranges: Iterable[RangeInLine],
+ converter: Callable[[RangeInLine], Optional[Tuple[str, str]]],
+) -> List[MarkerInLine]:
+ """
+ Helper to create MarkerInLines given some RangeInLines.
+ converter should be a function accepting a RangeInLine returning
+ either None (which is ignored) or a pair of strings which
+ are used to create two markers included in the returned list.
+ """
+ markers = []
+ for rang in ranges:
+ converted = converter(rang)
+ if converted is None:
+ continue
+
+ start_string, end_string = converted
+ if not (isinstance(start_string, str) and isinstance(end_string, str)):
+ raise TypeError("converter should return None or a pair of strings")
+
+ markers += [
+ MarkerInLine(position=rang.start, is_start=True, string=start_string),
+ MarkerInLine(position=rang.end, is_start=False, string=end_string),
+ ]
+ return markers
+
+
+def style_with_executing_node(style, modifier):
+ from pygments.styles import get_style_by_name
+ if isinstance(style, str):
+ style = get_style_by_name(style)
+
+ class NewStyle(style):
+ for_executing_node = True
+
+ styles = {
+ **style.styles,
+ **{
+ k.ExecutingNode: v + " " + modifier
+ for k, v in style.styles.items()
+ }
+ }
+
+ return NewStyle
+
+
+class RepeatedFrames:
+ """
+ A sequence of consecutive stack frames which shouldn't be displayed because
+ the same code and line number were repeated many times in the stack, e.g.
+ because of deep recursion.
+
+ Attributes:
+ - frames: list of raw frame or traceback objects
+ - frame_keys: list of tuples (frame.f_code, lineno) extracted from the frame objects.
+ It's this information from the frames that is used to determine
+ whether two frames should be considered similar (i.e. repeating).
+ - description: A string briefly describing frame_keys
+ """
+ def __init__(
+ self,
+ frames: List[Union[FrameType, TracebackType]],
+ frame_keys: List[Tuple[CodeType, int]],
+ ):
+ self.frames = frames
+ self.frame_keys = frame_keys
+
+ @cached_property
+ def description(self) -> str:
+ """
+ A string briefly describing the repeated frames, e.g.
+ my_function at line 10 (100 times)
+ """
+ counts = sorted(Counter(self.frame_keys).items(),
+ key=lambda item: (-item[1], item[0][0].co_name))
+ return ', '.join(
+ '{name} at line {lineno} ({count} times)'.format(
+ name=Source.for_filename(code.co_filename).code_qualname(code),
+ lineno=lineno,
+ count=count,
+ )
+ for (code, lineno), count in counts
+ )
+
+ def __repr__(self):
+ return '<{self.__class__.__name__} {self.description}>'.format(self=self)
+
+
+class FrameInfo(object):
+ """
+ Information about a frame!
+ Pass either a frame object or a traceback object,
+ and optionally an Options object to configure.
+
+ Or use the classmethod FrameInfo.stack_data() for an iterator of FrameInfo and
+ RepeatedFrames objects.
+
+ Attributes:
+ - frame: an actual stack frame object, either frame_or_tb or frame_or_tb.tb_frame
+ - options
+ - code: frame.f_code
+ - source: a Source object
+ - filename: a hopefully absolute file path derived from code.co_filename
+ - scope: the AST node of the innermost function, class or module being executed
+ - lines: a list of Line/LineGap objects to display, determined by options
+ - executing: an Executing object from the `executing` library, which has:
+ - .node: the AST node being executed in this frame, or None if it's unknown
+ - .statements: a set of one or more candidate statements (AST nodes, probably just one)
+ currently being executed in this frame.
+ - .code_qualname(): the __qualname__ of the function or class being executed,
+ or just the code name.
+
+ Properties returning one or more pieces of source code (ranges of lines):
+ - scope_pieces: all the pieces in the scope
+ - included_pieces: a subset of scope_pieces determined by options
+ - executing_piece: the piece currently being executed in this frame
+
+ Properties returning lists of Variable objects:
+ - variables: all variables in the scope
+ - variables_by_lineno: variables organised into lines
+ - variables_in_lines: variables contained within FrameInfo.lines
+ - variables_in_executing_piece: variables contained within FrameInfo.executing_piece
+ """
+ def __init__(
+ self,
+ frame_or_tb: Union[FrameType, TracebackType],
+ options: Optional[Options] = None,
+ ):
+ self.executing = Source.executing(frame_or_tb)
+ frame, self.lineno = frame_and_lineno(frame_or_tb)
+ self.frame = frame
+ self.code = frame.f_code
+ self.options = options or Options() # type: Options
+ self.source = self.executing.source # type: Source
+
+ def __repr__(self):
+ return "{self.__class__.__name__}({self.frame})".format(self=self)
+
+ @classmethod
+ def stack_data(
+ cls,
+ frame_or_tb: Union[FrameType, TracebackType],
+ options: Optional[Options] = None,
+ *,
+ collapse_repeated_frames: bool = True
+ ) -> Iterator[Union['FrameInfo', RepeatedFrames]]:
+ """
+ An iterator of FrameInfo and RepeatedFrames objects representing
+ a full traceback or stack. Similar consecutive frames are collapsed into RepeatedFrames
+ objects, so always check what type of object has been yielded.
+
+ Pass either a frame object or a traceback object,
+ and optionally an Options object to configure.
+ """
+ stack = list(iter_stack(frame_or_tb))
+
+ # Reverse the stack from a frame so that it's in the same order
+ # as the order from a traceback, which is the order of a printed
+ # traceback when read top to bottom (most recent call last)
+ if is_frame(frame_or_tb):
+ stack = stack[::-1]
+
+ def mapper(f):
+ return cls(f, options)
+
+ if not collapse_repeated_frames:
+ yield from map(mapper, stack)
+ return
+
+ def _frame_key(x):
+ frame, lineno = frame_and_lineno(x)
+ return frame.f_code, lineno
+
+ yield from collapse_repeated(
+ stack,
+ mapper=mapper,
+ collapser=RepeatedFrames,
+ key=_frame_key,
+ )
+
+ @cached_property
+ def scope_pieces(self) -> List[range]:
+ """
+ All the pieces (ranges of lines) contained in this object's .scope,
+ unless there is no .scope (because the source isn't valid Python syntax)
+ in which case it returns all the pieces in the source file, each containing one line.
+ """
+ if not self.scope:
+ return self.source.pieces
+
+ scope_start, scope_end = line_range(self.scope)
+ return [
+ piece
+ for piece in self.source.pieces
+ if scope_start <= piece.start and piece.stop <= scope_end
+ ]
+
+ @cached_property
+ def filename(self) -> str:
+ """
+ A hopefully absolute file path derived from .code.co_filename,
+ the current working directory, and sys.path.
+ Code based on ipython.
+ """
+ result = self.code.co_filename
+
+ if (
+ os.path.isabs(result) or
+ (
+ result.startswith("<") and
+ result.endswith(">")
+ )
+ ):
+ return result
+
+ # Try to make the filename absolute by trying all
+ # sys.path entries (which is also what linecache does)
+ # as well as the current working directory
+ for dirname in ["."] + list(sys.path):
+ try:
+ fullname = os.path.join(dirname, result)
+ if os.path.isfile(fullname):
+ return os.path.abspath(fullname)
+ except Exception:
+ # Just in case that sys.path contains very
+ # strange entries...
+ pass
+
+ return result
+
+ @cached_property
+ def executing_piece(self) -> range:
+ """
+ The piece (range of lines) containing the line currently being executed
+ by the interpreter in this frame.
+ """
+ return only(
+ piece
+ for piece in self.scope_pieces
+ if self.lineno in piece
+ )
+
+ @cached_property
+ def included_pieces(self) -> List[range]:
+ """
+ The list of pieces (ranges of lines) to display for this frame.
+ Consists of .executing_piece, surrounding context pieces
+ determined by .options.before and .options.after,
+ and the function signature if a function is being executed and
+ .options.include_signature is True (in which case this might not
+ be a contiguous range of pieces).
+ Always a subset of .scope_pieces.
+ """
+ scope_pieces = self.scope_pieces
+ if not self.scope_pieces:
+ return []
+
+ pos = scope_pieces.index(self.executing_piece)
+ pieces_start = max(0, pos - self.options.before)
+ pieces_end = pos + 1 + self.options.after
+ pieces = scope_pieces[pieces_start:pieces_end]
+
+ if (
+ self.options.include_signature
+ and not self.code.co_name.startswith('<')
+ and isinstance(self.scope, (ast.FunctionDef, ast.AsyncFunctionDef))
+ and pieces_start > 0
+ ):
+ pieces.insert(0, scope_pieces[0])
+
+ return pieces
+
+ @cached_property
+ def _executing_node_common_indent(self) -> int:
+ """
+ The common minimal indentation shared by the markers intended
+ for an exception node that spans multiple lines.
+
+ Intended to be used only internally.
+ """
+ indents = []
+ lines = [line for line in self.lines if isinstance(line, Line)]
+
+ for line in lines:
+ for rang in line._raw_executing_node_ranges():
+ begin_text = len(line.text) - len(line.text.lstrip())
+ indent = max(rang.start, begin_text)
+ indents.append(indent)
+
+ return min(indents) if indents else 0
+
+ @cached_property
+ def lines(self) -> List[Union[Line, LineGap]]:
+ """
+ A list of lines to display, determined by options.
+ The objects yielded either have type Line or are the singleton LINE_GAP.
+ Always check the type that you're dealing with when iterating.
+
+ LINE_GAP can be created in two ways:
+ - by truncating a piece of context that's too long, determined by
+ .options.max_lines_per_piece
+ - immediately after the signature piece if Options.include_signature is true
+ and the following piece isn't already part of the included pieces.
+
+ The Line objects are all within the ranges from .included_pieces.
+ """
+ pieces = self.included_pieces
+ if not pieces:
+ return []
+
+ result = []
+ for i, piece in enumerate(pieces):
+ if (
+ i == 1
+ and self.scope
+ and pieces[0] == self.scope_pieces[0]
+ and pieces[1] != self.scope_pieces[1]
+ ):
+ result.append(LINE_GAP)
+
+ lines = [Line(self, i) for i in piece] # type: List[Line]
+ if piece != self.executing_piece:
+ lines = truncate(
+ lines,
+ max_length=self.options.max_lines_per_piece,
+ middle=[LINE_GAP],
+ )
+ result.extend(lines)
+
+ real_lines = [
+ line
+ for line in result
+ if isinstance(line, Line)
+ ]
+
+ text = "\n".join(
+ line.text
+ for line in real_lines
+ )
+ dedented_lines = dedent(text).splitlines()
+ leading_indent = len(real_lines[0].text) - len(dedented_lines[0])
+ for line in real_lines:
+ line.leading_indent = leading_indent
+
+ return result
+
+ @cached_property
+ def scope(self) -> Optional[ast.AST]:
+ """
+ The AST node of the innermost function, class or module being executed.
+ """
+ if not self.source.tree or not self.executing.statements:
+ return None
+
+ stmt = list(self.executing.statements)[0]
+ while True:
+ # Get the parent first in case the original statement is already
+ # a function definition, e.g. if we're calling a decorator
+ # In that case we still want the surrounding scope, not that function
+ stmt = stmt.parent
+ if isinstance(stmt, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef, ast.Module)):
+ return stmt
+
+ @cached_property
+ def _pygmented_scope_lines(self) -> Optional[Tuple[int, List[str]]]:
+ # noinspection PyUnresolvedReferences
+ from pygments.formatters import HtmlFormatter
+
+ formatter = self.options.pygments_formatter
+ scope = self.scope
+ assert_(formatter, ValueError("Must set a pygments formatter in Options"))
+ assert_(scope)
+
+ if isinstance(formatter, HtmlFormatter):
+ formatter.nowrap = True
+
+ atok = self.source.asttokens()
+ node = self.executing.node
+ if node and getattr(formatter.style, "for_executing_node", False):
+ scope_start = atok.get_text_range(scope)[0]
+ start, end = atok.get_text_range(node)
+ start -= scope_start
+ end -= scope_start
+ ranges = [(start, end)]
+ else:
+ ranges = []
+
+ code = atok.get_text(scope)
+ lines = _pygmented_with_ranges(formatter, code, ranges)
+
+ start_line = line_range(scope)[0]
+
+ return start_line, lines
+
+ @cached_property
+ def variables(self) -> List[Variable]:
+ """
+ All Variable objects whose nodes are contained within .scope
+ and whose values could be safely evaluated by pure_eval.
+ """
+ if not self.scope:
+ return []
+
+ evaluator = Evaluator.from_frame(self.frame)
+ scope = self.scope
+ node_values = [
+ pair
+ for pair in evaluator.find_expressions(scope)
+ if is_expression_interesting(*pair)
+ ] # type: List[Tuple[ast.AST, Any]]
+
+ if isinstance(scope, (ast.FunctionDef, ast.AsyncFunctionDef)):
+ for node in ast.walk(scope.args):
+ if not isinstance(node, ast.arg):
+ continue
+ name = node.arg
+ try:
+ value = evaluator.names[name]
+ except KeyError:
+ pass
+ else:
+ node_values.append((node, value))
+
+ # Group equivalent nodes together
+ def get_text(n):
+ if isinstance(n, ast.arg):
+ return n.arg
+ else:
+ return self.source.asttokens().get_text(n)
+
+ def normalise_node(n):
+ try:
+ # Add parens to avoid syntax errors for multiline expressions
+ return ast.parse('(' + get_text(n) + ')')
+ except Exception:
+ return n
+
+ grouped = group_by_key_func(
+ node_values,
+ lambda nv: ast.dump(normalise_node(nv[0])),
+ )
+
+ result = []
+ for group in grouped.values():
+ nodes, values = zip(*group)
+ value = values[0]
+ text = get_text(nodes[0])
+ if not text:
+ continue
+ result.append(Variable(text, nodes, value))
+
+ return result
+
+ @cached_property
+ def variables_by_lineno(self) -> Mapping[int, List[Tuple[Variable, ast.AST]]]:
+ """
+ A mapping from 1-based line numbers to lists of pairs:
+ - A Variable object
+ - A specific AST node from the variable's .nodes list that's
+ in the line at that line number.
+ """
+ result = defaultdict(list)
+ for var in self.variables:
+ for node in var.nodes:
+ for lineno in range(*line_range(node)):
+ result[lineno].append((var, node))
+ return result
+
+ @cached_property
+ def variables_in_lines(self) -> List[Variable]:
+ """
+ A list of Variable objects contained within the lines returned by .lines.
+ """
+ return unique_in_order(
+ var
+ for line in self.lines
+ if isinstance(line, Line)
+ for var, node in self.variables_by_lineno[line.lineno]
+ )
+
+ @cached_property
+ def variables_in_executing_piece(self) -> List[Variable]:
+ """
+ A list of Variable objects contained within the lines
+ in the range returned by .executing_piece.
+ """
+ return unique_in_order(
+ var
+ for lineno in self.executing_piece
+ for var, node in self.variables_by_lineno[lineno]
+ )
diff --git a/contrib/python/stack-data/stack_data/formatting.py b/contrib/python/stack-data/stack_data/formatting.py
new file mode 100644
index 0000000000..b37da99b62
--- /dev/null
+++ b/contrib/python/stack-data/stack_data/formatting.py
@@ -0,0 +1,206 @@
+import inspect
+import sys
+import traceback
+from types import FrameType, TracebackType
+from typing import Union, Iterable
+
+from stack_data import style_with_executing_node, Options, Line, FrameInfo, LINE_GAP, Variable, RepeatedFrames
+from stack_data.utils import assert_
+
+
+class Formatter:
+ def __init__(
+ self, *,
+ options=Options(),
+ pygmented=False,
+ show_executing_node=True,
+ pygments_formatter_cls=None,
+ pygments_formatter_kwargs=None,
+ pygments_style="monokai",
+ executing_node_modifier="bg:#005080",
+ executing_node_underline="^",
+ current_line_indicator="-->",
+ line_gap_string="(...)",
+ show_variables=False,
+ use_code_qualname=True,
+ show_linenos=True,
+ strip_leading_indent=True,
+ html=False,
+ chain=True,
+ collapse_repeated_frames=True
+ ):
+ if pygmented and not options.pygments_formatter:
+ if show_executing_node:
+ pygments_style = style_with_executing_node(
+ pygments_style, executing_node_modifier
+ )
+
+ if pygments_formatter_cls is None:
+ from pygments.formatters.terminal256 import Terminal256Formatter \
+ as pygments_formatter_cls
+
+ options.pygments_formatter = pygments_formatter_cls(
+ style=pygments_style,
+ **pygments_formatter_kwargs or {},
+ )
+
+ self.pygmented = pygmented
+ self.show_executing_node = show_executing_node
+ assert_(
+ len(executing_node_underline) == 1,
+ ValueError("executing_node_underline must be a single character"),
+ )
+ self.executing_node_underline = executing_node_underline
+ self.current_line_indicator = current_line_indicator or ""
+ self.line_gap_string = line_gap_string
+ self.show_variables = show_variables
+ self.show_linenos = show_linenos
+ self.use_code_qualname = use_code_qualname
+ self.strip_leading_indent = strip_leading_indent
+ self.html = html
+ self.chain = chain
+ self.options = options
+ self.collapse_repeated_frames = collapse_repeated_frames
+
+ def set_hook(self):
+ def excepthook(_etype, evalue, _tb):
+ self.print_exception(evalue)
+
+ sys.excepthook = excepthook
+
+ def print_exception(self, e=None, *, file=None):
+ self.print_lines(self.format_exception(e), file=file)
+
+ def print_stack(self, frame_or_tb=None, *, file=None):
+ if frame_or_tb is None:
+ frame_or_tb = inspect.currentframe().f_back
+
+ self.print_lines(self.format_stack(frame_or_tb), file=file)
+
+ def print_lines(self, lines, *, file=None):
+ if file is None:
+ file = sys.stderr
+ for line in lines:
+ print(line, file=file, end="")
+
+ def format_exception(self, e=None) -> Iterable[str]:
+ if e is None:
+ e = sys.exc_info()[1]
+
+ if self.chain:
+ if e.__cause__ is not None:
+ yield from self.format_exception(e.__cause__)
+ yield traceback._cause_message
+ elif (e.__context__ is not None
+ and not e.__suppress_context__):
+ yield from self.format_exception(e.__context__)
+ yield traceback._context_message
+
+ yield 'Traceback (most recent call last):\n'
+ yield from self.format_stack(e.__traceback__)
+ yield from traceback.format_exception_only(type(e), e)
+
+ def format_stack(self, frame_or_tb=None) -> Iterable[str]:
+ if frame_or_tb is None:
+ frame_or_tb = inspect.currentframe().f_back
+
+ yield from self.format_stack_data(
+ FrameInfo.stack_data(
+ frame_or_tb,
+ self.options,
+ collapse_repeated_frames=self.collapse_repeated_frames,
+ )
+ )
+
+ def format_stack_data(
+ self, stack: Iterable[Union[FrameInfo, RepeatedFrames]]
+ ) -> Iterable[str]:
+ for item in stack:
+ if isinstance(item, FrameInfo):
+ yield from self.format_frame(item)
+ else:
+ yield self.format_repeated_frames(item)
+
+ def format_repeated_frames(self, repeated_frames: RepeatedFrames) -> str:
+ return ' [... skipping similar frames: {}]\n'.format(
+ repeated_frames.description
+ )
+
+ def format_frame(self, frame: Union[FrameInfo, FrameType, TracebackType]) -> Iterable[str]:
+ if not isinstance(frame, FrameInfo):
+ frame = FrameInfo(frame, self.options)
+
+ yield self.format_frame_header(frame)
+
+ for line in frame.lines:
+ if isinstance(line, Line):
+ yield self.format_line(line)
+ else:
+ assert_(line is LINE_GAP)
+ yield self.line_gap_string + "\n"
+
+ if self.show_variables:
+ try:
+ yield from self.format_variables(frame)
+ except Exception:
+ pass
+
+ def format_frame_header(self, frame_info: FrameInfo) -> str:
+ return ' File "{frame_info.filename}", line {frame_info.lineno}, in {name}\n'.format(
+ frame_info=frame_info,
+ name=(
+ frame_info.executing.code_qualname()
+ if self.use_code_qualname else
+ frame_info.code.co_name
+ ),
+ )
+
+ def format_line(self, line: Line) -> str:
+ result = ""
+ if self.current_line_indicator:
+ if line.is_current:
+ result = self.current_line_indicator
+ else:
+ result = " " * len(self.current_line_indicator)
+ result += " "
+
+ if self.show_linenos:
+ result += "{:4} | ".format(line.lineno)
+
+ result = result or " "
+
+ prefix = result
+
+ result += line.render(
+ pygmented=self.pygmented,
+ escape_html=self.html,
+ strip_leading_indent=self.strip_leading_indent,
+ ) + "\n"
+
+ if self.show_executing_node and not self.pygmented:
+ for line_range in line.executing_node_ranges:
+ start = line_range.start - line.leading_indent
+ end = line_range.end - line.leading_indent
+ result += (
+ " " * (start + len(prefix))
+ + self.executing_node_underline * (end - start)
+ + "\n"
+ )
+
+ return result
+
+ def format_variables(self, frame_info: FrameInfo) -> Iterable[str]:
+ for var in sorted(frame_info.variables, key=lambda v: v.name):
+ try:
+ yield self.format_variable(var) + "\n"
+ except Exception:
+ pass
+
+ def format_variable(self, var: Variable) -> str:
+ return "{} = {}".format(
+ var.name,
+ self.format_variable_value(var.value),
+ )
+
+ def format_variable_value(self, value) -> str:
+ return repr(value)
diff --git a/contrib/python/stack-data/stack_data/utils.py b/contrib/python/stack-data/stack_data/utils.py
new file mode 100644
index 0000000000..71d55eadc1
--- /dev/null
+++ b/contrib/python/stack-data/stack_data/utils.py
@@ -0,0 +1,172 @@
+import ast
+import itertools
+import types
+from collections import OrderedDict, Counter, defaultdict
+from types import FrameType, TracebackType
+from typing import (
+ Iterator, List, Tuple, Iterable, Callable, Union,
+ TypeVar, Mapping,
+)
+
+T = TypeVar('T')
+R = TypeVar('R')
+
+
+def truncate(seq, max_length: int, middle):
+ if len(seq) > max_length:
+ right = (max_length - len(middle)) // 2
+ left = max_length - len(middle) - right
+ seq = seq[:left] + middle + seq[-right:]
+ return seq
+
+
+def unique_in_order(it: Iterable[T]) -> List[T]:
+ return list(OrderedDict.fromkeys(it))
+
+
+def line_range(node: ast.AST) -> Tuple[int, int]:
+ """
+ Returns a pair of numbers representing a half open range
+ (i.e. suitable as arguments to the `range()` builtin)
+ of line numbers of the given AST nodes.
+ """
+ try:
+ return (
+ node.first_token.start[0],
+ node.last_token.end[0] + 1,
+ )
+ except AttributeError:
+ return (
+ node.lineno,
+ getattr(node, "end_lineno", node.lineno) + 1,
+ )
+
+
+def highlight_unique(lst: List[T]) -> Iterator[Tuple[T, bool]]:
+ counts = Counter(lst)
+
+ for is_common, group in itertools.groupby(lst, key=lambda x: counts[x] > 3):
+ if is_common:
+ group = list(group)
+ highlighted = [False] * len(group)
+
+ def highlight_index(f):
+ try:
+ i = f()
+ except ValueError:
+ return None
+ highlighted[i] = True
+ return i
+
+ for item in set(group):
+ first = highlight_index(lambda: group.index(item))
+ if first is not None:
+ highlight_index(lambda: group.index(item, first + 1))
+ highlight_index(lambda: -1 - group[::-1].index(item))
+ else:
+ highlighted = itertools.repeat(True)
+
+ yield from zip(group, highlighted)
+
+
+def identity(x: T) -> T:
+ return x
+
+
+def collapse_repeated(lst, *, collapser, mapper=identity, key=identity):
+ keyed = list(map(key, lst))
+ for is_highlighted, group in itertools.groupby(
+ zip(lst, highlight_unique(keyed)),
+ key=lambda t: t[1][1],
+ ):
+ original_group, highlighted_group = zip(*group)
+ if is_highlighted:
+ yield from map(mapper, original_group)
+ else:
+ keyed_group, _ = zip(*highlighted_group)
+ yield collapser(list(original_group), list(keyed_group))
+
+
+def is_frame(frame_or_tb: Union[FrameType, TracebackType]) -> bool:
+ assert_(isinstance(frame_or_tb, (types.FrameType, types.TracebackType)))
+ return isinstance(frame_or_tb, (types.FrameType,))
+
+
+def iter_stack(frame_or_tb: Union[FrameType, TracebackType]) -> Iterator[Union[FrameType, TracebackType]]:
+ while frame_or_tb:
+ yield frame_or_tb
+ if is_frame(frame_or_tb):
+ frame_or_tb = frame_or_tb.f_back
+ else:
+ frame_or_tb = frame_or_tb.tb_next
+
+
+def frame_and_lineno(frame_or_tb: Union[FrameType, TracebackType]) -> Tuple[FrameType, int]:
+ if is_frame(frame_or_tb):
+ return frame_or_tb, frame_or_tb.f_lineno
+ else:
+ return frame_or_tb.tb_frame, frame_or_tb.tb_lineno
+
+
+def group_by_key_func(iterable: Iterable[T], key_func: Callable[[T], R]) -> Mapping[R, List[T]]:
+ # noinspection PyUnresolvedReferences
+ """
+ Create a dictionary from an iterable such that the keys are the result of evaluating a key function on elements
+ of the iterable and the values are lists of elements all of which correspond to the key.
+
+ >>> def si(d): return sorted(d.items())
+ >>> si(group_by_key_func("a bb ccc d ee fff".split(), len))
+ [(1, ['a', 'd']), (2, ['bb', 'ee']), (3, ['ccc', 'fff'])]
+ >>> si(group_by_key_func([-1, 0, 1, 3, 6, 8, 9, 2], lambda x: x % 2))
+ [(0, [0, 6, 8, 2]), (1, [-1, 1, 3, 9])]
+ """
+ result = defaultdict(list)
+ for item in iterable:
+ result[key_func(item)].append(item)
+ return result
+
+
+class cached_property(object):
+ """
+ A property that is only computed once per instance and then replaces itself
+ with an ordinary attribute. Deleting the attribute resets the property.
+
+ Based on https://github.com/pydanny/cached-property/blob/master/cached_property.py
+ """
+
+ def __init__(self, func):
+ self.__doc__ = func.__doc__
+ self.func = func
+
+ def cached_property_wrapper(self, obj, _cls):
+ if obj is None:
+ return self
+
+ value = obj.__dict__[self.func.__name__] = self.func(obj)
+ return value
+
+ __get__ = cached_property_wrapper
+
+
+def _pygmented_with_ranges(formatter, code, ranges):
+ import pygments
+ from pygments.lexers import get_lexer_by_name
+
+ class MyLexer(type(get_lexer_by_name("python3"))):
+ def get_tokens(self, text):
+ length = 0
+ for ttype, value in super().get_tokens(text):
+ if any(start <= length < end for start, end in ranges):
+ ttype = ttype.ExecutingNode
+ length += len(value)
+ yield ttype, value
+
+ lexer = MyLexer(stripnl=False)
+ return pygments.highlight(code, lexer, formatter).splitlines()
+
+
+def assert_(condition, error=""):
+ if not condition:
+ if isinstance(error, str):
+ error = AssertionError(error)
+ raise error
diff --git a/contrib/python/stack-data/stack_data/version.py b/contrib/python/stack-data/stack_data/version.py
new file mode 100644
index 0000000000..7fd229a32b
--- /dev/null
+++ b/contrib/python/stack-data/stack_data/version.py
@@ -0,0 +1 @@
+__version__ = '0.2.0'