diff options
author | nkozlovskiy <nmk@ydb.tech> | 2023-09-29 12:24:06 +0300 |
---|---|---|
committer | nkozlovskiy <nmk@ydb.tech> | 2023-09-29 12:41:34 +0300 |
commit | e0e3e1717e3d33762ce61950504f9637a6e669ed (patch) | |
tree | bca3ff6939b10ed60c3d5c12439963a1146b9711 /contrib/python/stack-data/stack_data | |
parent | 38f2c5852db84c7b4d83adfcb009eb61541d1ccd (diff) | |
download | ydb-e0e3e1717e3d33762ce61950504f9637a6e669ed.tar.gz |
add ydb deps
Diffstat (limited to 'contrib/python/stack-data/stack_data')
-rw-r--r-- | contrib/python/stack-data/stack_data/__init__.py | 10 | ||||
-rw-r--r-- | contrib/python/stack-data/stack_data/core.py | 926 | ||||
-rw-r--r-- | contrib/python/stack-data/stack_data/formatting.py | 234 | ||||
-rw-r--r-- | contrib/python/stack-data/stack_data/py.typed | 1 | ||||
-rw-r--r-- | contrib/python/stack-data/stack_data/serializing.py | 201 | ||||
-rw-r--r-- | contrib/python/stack-data/stack_data/utils.py | 184 | ||||
-rw-r--r-- | contrib/python/stack-data/stack_data/version.py | 1 |
7 files changed, 1557 insertions, 0 deletions
diff --git a/contrib/python/stack-data/stack_data/__init__.py b/contrib/python/stack-data/stack_data/__init__.py new file mode 100644 index 0000000000..828121483c --- /dev/null +++ b/contrib/python/stack-data/stack_data/__init__.py @@ -0,0 +1,10 @@ +from .core import Source, FrameInfo, markers_from_ranges, Options, LINE_GAP, Line, Variable, RangeInLine, \ + RepeatedFrames, MarkerInLine, style_with_executing_node, BlankLineRange, BlankLines +from .formatting import Formatter +from .serializing import Serializer + +try: + from .version import __version__ +except ImportError: + # version.py is auto-generated with the git tag when building + __version__ = "???" diff --git a/contrib/python/stack-data/stack_data/core.py b/contrib/python/stack-data/stack_data/core.py new file mode 100644 index 0000000000..88e060392a --- /dev/null +++ b/contrib/python/stack-data/stack_data/core.py @@ -0,0 +1,926 @@ +import ast +import html +import os +import sys +from collections import defaultdict, Counter +from enum import Enum +from textwrap import dedent +from types import FrameType, CodeType, TracebackType +from typing import ( + Iterator, List, Tuple, Optional, NamedTuple, + Any, Iterable, Callable, Union, + Sequence) +from typing import Mapping + +import executing +from asttokens.util import Token +from executing import only +from pure_eval import Evaluator, is_expression_interesting +from stack_data.utils import ( + truncate, unique_in_order, line_range, + frame_and_lineno, iter_stack, collapse_repeated, group_by_key_func, + cached_property, is_frame, _pygmented_with_ranges, assert_) + +RangeInLine = NamedTuple('RangeInLine', + [('start', int), + ('end', int), + ('data', Any)]) +RangeInLine.__doc__ = """ +Represents a range of characters within one line of source code, +and some associated data. + +Typically this will be converted to a pair of markers by markers_from_ranges. 
+""" + +MarkerInLine = NamedTuple('MarkerInLine', + [('position', int), + ('is_start', bool), + ('string', str)]) +MarkerInLine.__doc__ = """ +A string that is meant to be inserted at a given position in a line of source code. +For example, this could be an ANSI code or the opening or closing of an HTML tag. +is_start should be True if this is the first of a pair such as the opening of an HTML tag. +This will help to sort and insert markers correctly. + +Typically this would be created from a RangeInLine by markers_from_ranges. +Then use Line.render to insert the markers correctly. +""" + + +class BlankLines(Enum): + """The values are intended to correspond to the following behaviour: + HIDDEN: blank lines are not shown in the output + VISIBLE: blank lines are visible in the output + SINGLE: any consecutive blank lines are shown as a single blank line + in the output. This option requires the line number to be shown. + For a single blank line, the corresponding line number is shown. + Two or more consecutive blank lines are shown as a single blank + line in the output with a custom string shown instead of a + specific line number. + """ + HIDDEN = 1 + VISIBLE = 2 + SINGLE=3 + +class Variable( + NamedTuple('_Variable', + [('name', str), + ('nodes', Sequence[ast.AST]), + ('value', Any)]) +): + """ + An expression that appears one or more times in source code and its associated value. + This will usually be a variable but it can be any expression evaluated by pure_eval. + - name is the source text of the expression. + - nodes is a list of equivalent nodes representing the same expression. + - value is the safely evaluated value of the expression. + """ + __hash__ = object.__hash__ + __eq__ = object.__eq__ + + +class Source(executing.Source): + """ + The source code of a single file and associated metadata. 
+ + In addition to the attributes from the base class executing.Source, + if .tree is not None, meaning this is valid Python code, objects have: + - pieces: a list of Piece objects + - tokens_by_lineno: a defaultdict(list) mapping line numbers to lists of tokens. + + Don't construct this class. Get an instance from frame_info.source. + """ + + @cached_property + def pieces(self) -> List[range]: + if not self.tree: + return [ + range(i, i + 1) + for i in range(1, len(self.lines) + 1) + ] + return list(self._clean_pieces()) + + @cached_property + def tokens_by_lineno(self) -> Mapping[int, List[Token]]: + if not self.tree: + raise AttributeError("This file doesn't contain valid Python, so .tokens_by_lineno doesn't exist") + return group_by_key_func( + self.asttokens().tokens, + lambda tok: tok.start[0], + ) + + def _clean_pieces(self) -> Iterator[range]: + pieces = self._raw_split_into_pieces(self.tree, 1, len(self.lines) + 1) + pieces = [ + (start, end) + for (start, end) in pieces + if end > start + ] + + # Combine overlapping pieces, i.e. consecutive pieces where the end of the first + # is greater than the start of the second. + # This can happen when two statements are on the same line separated by a semicolon. 
+ new_pieces = pieces[:1] + for (start, end) in pieces[1:]: + (last_start, last_end) = new_pieces[-1] + if start < last_end: + assert start == last_end - 1 + assert ';' in self.lines[start - 1] + new_pieces[-1] = (last_start, end) + else: + new_pieces.append((start, end)) + pieces = new_pieces + + starts = [start for start, end in pieces[1:]] + ends = [end for start, end in pieces[:-1]] + if starts != ends: + joins = list(map(set, zip(starts, ends))) + mismatches = [s for s in joins if len(s) > 1] + raise AssertionError("Pieces mismatches: %s" % mismatches) + + def is_blank(i): + try: + return not self.lines[i - 1].strip() + except IndexError: + return False + + for start, end in pieces: + while is_blank(start): + start += 1 + while is_blank(end - 1): + end -= 1 + if start < end: + yield range(start, end) + + def _raw_split_into_pieces( + self, + stmt: ast.AST, + start: int, + end: int, + ) -> Iterator[Tuple[int, int]]: + for name, body in ast.iter_fields(stmt): + if ( + isinstance(body, list) and body and + isinstance(body[0], (ast.stmt, ast.ExceptHandler, getattr(ast, 'match_case', ()))) + ): + for rang, group in sorted(group_by_key_func(body, self.line_range).items()): + sub_stmt = group[0] + for inner_start, inner_end in self._raw_split_into_pieces(sub_stmt, *rang): + if start < inner_start: + yield start, inner_start + if inner_start < inner_end: + yield inner_start, inner_end + start = inner_end + + yield start, end + + def line_range(self, node: ast.AST) -> Tuple[int, int]: + return line_range(self.asttext(), node) + + +class Options: + """ + Configuration for FrameInfo, either in the constructor or the .stack_data classmethod. + These all determine which Lines and gaps are produced by FrameInfo.lines. + + before and after are the number of pieces of context to include in a frame + in addition to the executing piece. + + include_signature is whether to include the function signature as a piece in a frame. 
+ + If a piece (other than the executing piece) has more than max_lines_per_piece lines, + it will be truncated with a gap in the middle. + """ + def __init__( + self, *, + before: int = 3, + after: int = 1, + include_signature: bool = False, + max_lines_per_piece: int = 6, + pygments_formatter=None, + blank_lines = BlankLines.HIDDEN + ): + self.before = before + self.after = after + self.include_signature = include_signature + self.max_lines_per_piece = max_lines_per_piece + self.pygments_formatter = pygments_formatter + self.blank_lines = blank_lines + + def __repr__(self): + keys = sorted(self.__dict__) + items = ("{}={!r}".format(k, self.__dict__[k]) for k in keys) + return "{}({})".format(type(self).__name__, ", ".join(items)) + + +class LineGap(object): + """ + A singleton representing one or more lines of source code that were skipped + in FrameInfo.lines. + + LINE_GAP can be created in two ways: + - by truncating a piece of context that's too long. + - immediately after the signature piece if Options.include_signature is true + and the following piece isn't already part of the included pieces. + """ + def __repr__(self): + return "LINE_GAP" + + +LINE_GAP = LineGap() + + +class BlankLineRange: + """ + Records the line number range for blank lines gaps between pieces. + For a single blank line, begin_lineno == end_lineno. + """ + def __init__(self, begin_lineno: int, end_lineno: int): + self.begin_lineno = begin_lineno + self.end_lineno = end_lineno + + +class Line(object): + """ + A single line of source code for a particular stack frame. + + Typically this is obtained from FrameInfo.lines. + Since that list may also contain LINE_GAP, you should first check + that this is really a Line before using it. + + Attributes: + - frame_info + - lineno: the 1-based line number within the file + - text: the raw source of this line. For displaying text, see .render() instead. + - leading_indent: the number of leading spaces that should probably be stripped. 
+ This attribute is set within FrameInfo.lines. If you construct this class + directly you should probably set it manually (at least to 0). + - is_current: whether this is the line currently being executed by the interpreter + within this frame. + - tokens: a list of source tokens in this line + + There are several helpers for constructing RangeInLines which can be converted to markers + using markers_from_ranges which can be passed to .render(): + - token_ranges + - variable_ranges + - executing_node_ranges + - range_from_node + """ + def __init__( + self, + frame_info: 'FrameInfo', + lineno: int, + ): + self.frame_info = frame_info + self.lineno = lineno + self.text = frame_info.source.lines[lineno - 1] # type: str + self.leading_indent = None # type: Optional[int] + + def __repr__(self): + return "<{self.__class__.__name__} {self.lineno} (current={self.is_current}) " \ + "{self.text!r} of {self.frame_info.filename}>".format(self=self) + + @property + def is_current(self) -> bool: + """ + Whether this is the line currently being executed by the interpreter + within this frame. + """ + return self.lineno == self.frame_info.lineno + + @property + def tokens(self) -> List[Token]: + """ + A list of source tokens in this line. + The tokens are Token objects from asttokens: + https://asttokens.readthedocs.io/en/latest/api-index.html#asttokens.util.Token + """ + return self.frame_info.source.tokens_by_lineno[self.lineno] + + @cached_property + def token_ranges(self) -> List[RangeInLine]: + """ + A list of RangeInLines for each token in .tokens, + where range.data is a Token object from asttokens: + https://asttokens.readthedocs.io/en/latest/api-index.html#asttokens.util.Token + """ + return [ + RangeInLine( + token.start[1], + token.end[1], + token, + ) + for token in self.tokens + ] + + @cached_property + def variable_ranges(self) -> List[RangeInLine]: + """ + A list of RangeInLines for each Variable that appears at least partially in this line. 
+ The data attribute of the range is a pair (variable, node) where node is the particular + AST node from the list variable.nodes that corresponds to this range. + """ + return [ + self.range_from_node(node, (variable, node)) + for variable, node in self.frame_info.variables_by_lineno[self.lineno] + ] + + @cached_property + def executing_node_ranges(self) -> List[RangeInLine]: + """ + A list of one or zero RangeInLines for the executing node of this frame. + The list will have one element if the node can be found and it overlaps this line. + """ + return self._raw_executing_node_ranges( + self.frame_info._executing_node_common_indent + ) + + def _raw_executing_node_ranges(self, common_indent=0) -> List[RangeInLine]: + ex = self.frame_info.executing + node = ex.node + if node: + rang = self.range_from_node(node, ex, common_indent) + if rang: + return [rang] + return [] + + def range_from_node( + self, node: ast.AST, data: Any, common_indent: int = 0 + ) -> Optional[RangeInLine]: + """ + If the given node overlaps with this line, return a RangeInLine + with the correct start and end and the given data. + Otherwise, return None. + """ + atext = self.frame_info.source.asttext() + (start, range_start), (end, range_end) = atext.get_text_positions(node, padded=False) + + if not (start <= self.lineno <= end): + return None + + if start != self.lineno: + range_start = common_indent + + if end != self.lineno: + range_end = len(self.text) + + if range_start == range_end == 0: + # This is an empty line. If it were included, it would result + # in a value of zero for the common indentation assigned to + # a block of code. 
+ return None + + return RangeInLine(range_start, range_end, data) + + def render( + self, + markers: Iterable[MarkerInLine] = (), + *, + strip_leading_indent: bool = True, + pygmented: bool = False, + escape_html: bool = False + ) -> str: + """ + Produces a string for display consisting of .text + with the .strings of each marker inserted at the correct positions. + If strip_leading_indent is true (the default) then leading spaces + common to all lines in this frame will be excluded. + """ + if pygmented and self.frame_info.scope: + assert_(not markers, ValueError("Cannot use pygmented with markers")) + start_line, lines = self.frame_info._pygmented_scope_lines + result = lines[self.lineno - start_line] + if strip_leading_indent: + result = result.replace(self.text[:self.leading_indent], "", 1) + return result + + text = self.text + + # This just makes the loop below simpler + markers = list(markers) + [MarkerInLine(position=len(text), is_start=False, string='')] + + markers.sort(key=lambda t: t[:2]) + + parts = [] + if strip_leading_indent: + start = self.leading_indent + else: + start = 0 + original_start = start + + for marker in markers: + text_part = text[start:marker.position] + if escape_html: + text_part = html.escape(text_part) + parts.append(text_part) + parts.append(marker.string) + + # Ensure that start >= leading_indent + start = max(marker.position, original_start) + return ''.join(parts) + + +def markers_from_ranges( + ranges: Iterable[RangeInLine], + converter: Callable[[RangeInLine], Optional[Tuple[str, str]]], +) -> List[MarkerInLine]: + """ + Helper to create MarkerInLines given some RangeInLines. + converter should be a function accepting a RangeInLine returning + either None (which is ignored) or a pair of strings which + are used to create two markers included in the returned list. 
+ """ + markers = [] + for rang in ranges: + converted = converter(rang) + if converted is None: + continue + + start_string, end_string = converted + if not (isinstance(start_string, str) and isinstance(end_string, str)): + raise TypeError("converter should return None or a pair of strings") + + markers += [ + MarkerInLine(position=rang.start, is_start=True, string=start_string), + MarkerInLine(position=rang.end, is_start=False, string=end_string), + ] + return markers + + +def style_with_executing_node(style, modifier): + from pygments.styles import get_style_by_name + if isinstance(style, str): + style = get_style_by_name(style) + + class NewStyle(style): + for_executing_node = True + + styles = { + **style.styles, + **{ + k.ExecutingNode: v + " " + modifier + for k, v in style.styles.items() + } + } + + return NewStyle + + +class RepeatedFrames: + """ + A sequence of consecutive stack frames which shouldn't be displayed because + the same code and line number were repeated many times in the stack, e.g. + because of deep recursion. + + Attributes: + - frames: list of raw frame or traceback objects + - frame_keys: list of tuples (frame.f_code, lineno) extracted from the frame objects. + It's this information from the frames that is used to determine + whether two frames should be considered similar (i.e. repeating). + - description: A string briefly describing frame_keys + """ + def __init__( + self, + frames: List[Union[FrameType, TracebackType]], + frame_keys: List[Tuple[CodeType, int]], + ): + self.frames = frames + self.frame_keys = frame_keys + + @cached_property + def description(self) -> str: + """ + A string briefly describing the repeated frames, e.g. 
+ my_function at line 10 (100 times) + """ + counts = sorted(Counter(self.frame_keys).items(), + key=lambda item: (-item[1], item[0][0].co_name)) + return ', '.join( + '{name} at line {lineno} ({count} times)'.format( + name=Source.for_filename(code.co_filename).code_qualname(code), + lineno=lineno, + count=count, + ) + for (code, lineno), count in counts + ) + + def __repr__(self): + return '<{self.__class__.__name__} {self.description}>'.format(self=self) + + +class FrameInfo(object): + """ + Information about a frame! + Pass either a frame object or a traceback object, + and optionally an Options object to configure. + + Or use the classmethod FrameInfo.stack_data() for an iterator of FrameInfo and + RepeatedFrames objects. + + Attributes: + - frame: an actual stack frame object, either frame_or_tb or frame_or_tb.tb_frame + - options + - code: frame.f_code + - source: a Source object + - filename: a hopefully absolute file path derived from code.co_filename + - scope: the AST node of the innermost function, class or module being executed + - lines: a list of Line/LineGap objects to display, determined by options + - executing: an Executing object from the `executing` library, which has: + - .node: the AST node being executed in this frame, or None if it's unknown + - .statements: a set of one or more candidate statements (AST nodes, probably just one) + currently being executed in this frame. + - .code_qualname(): the __qualname__ of the function or class being executed, + or just the code name. 
+ + Properties returning one or more pieces of source code (ranges of lines): + - scope_pieces: all the pieces in the scope + - included_pieces: a subset of scope_pieces determined by options + - executing_piece: the piece currently being executed in this frame + + Properties returning lists of Variable objects: + - variables: all variables in the scope + - variables_by_lineno: variables organised into lines + - variables_in_lines: variables contained within FrameInfo.lines + - variables_in_executing_piece: variables contained within FrameInfo.executing_piece + """ + def __init__( + self, + frame_or_tb: Union[FrameType, TracebackType], + options: Optional[Options] = None, + ): + self.executing = Source.executing(frame_or_tb) + frame, self.lineno = frame_and_lineno(frame_or_tb) + self.frame = frame + self.code = frame.f_code + self.options = options or Options() # type: Options + self.source = self.executing.source # type: Source + + + def __repr__(self): + return "{self.__class__.__name__}({self.frame})".format(self=self) + + @classmethod + def stack_data( + cls, + frame_or_tb: Union[FrameType, TracebackType], + options: Optional[Options] = None, + *, + collapse_repeated_frames: bool = True + ) -> Iterator[Union['FrameInfo', RepeatedFrames]]: + """ + An iterator of FrameInfo and RepeatedFrames objects representing + a full traceback or stack. Similar consecutive frames are collapsed into RepeatedFrames + objects, so always check what type of object has been yielded. + + Pass either a frame object or a traceback object, + and optionally an Options object to configure. 
+ """ + stack = list(iter_stack(frame_or_tb)) + + # Reverse the stack from a frame so that it's in the same order + # as the order from a traceback, which is the order of a printed + # traceback when read top to bottom (most recent call last) + if is_frame(frame_or_tb): + stack = stack[::-1] + + def mapper(f): + return cls(f, options) + + if not collapse_repeated_frames: + yield from map(mapper, stack) + return + + def _frame_key(x): + frame, lineno = frame_and_lineno(x) + return frame.f_code, lineno + + yield from collapse_repeated( + stack, + mapper=mapper, + collapser=RepeatedFrames, + key=_frame_key, + ) + + @cached_property + def scope_pieces(self) -> List[range]: + """ + All the pieces (ranges of lines) contained in this object's .scope, + unless there is no .scope (because the source isn't valid Python syntax) + in which case it returns all the pieces in the source file, each containing one line. + """ + if not self.scope: + return self.source.pieces + + scope_start, scope_end = self.source.line_range(self.scope) + return [ + piece + for piece in self.source.pieces + if scope_start <= piece.start and piece.stop <= scope_end + ] + + @cached_property + def filename(self) -> str: + """ + A hopefully absolute file path derived from .code.co_filename, + the current working directory, and sys.path. + Code based on ipython. + """ + result = self.code.co_filename + + if ( + os.path.isabs(result) or + ( + result.startswith("<") and + result.endswith(">") + ) + ): + return result + + # Try to make the filename absolute by trying all + # sys.path entries (which is also what linecache does) + # as well as the current working directory + for dirname in ["."] + list(sys.path): + try: + fullname = os.path.join(dirname, result) + if os.path.isfile(fullname): + return os.path.abspath(fullname) + except Exception: + # Just in case that sys.path contains very + # strange entries... 
+ pass + + return result + + @cached_property + def executing_piece(self) -> range: + """ + The piece (range of lines) containing the line currently being executed + by the interpreter in this frame. + """ + return only( + piece + for piece in self.scope_pieces + if self.lineno in piece + ) + + @cached_property + def included_pieces(self) -> List[range]: + """ + The list of pieces (ranges of lines) to display for this frame. + Consists of .executing_piece, surrounding context pieces + determined by .options.before and .options.after, + and the function signature if a function is being executed and + .options.include_signature is True (in which case this might not + be a contiguous range of pieces). + Always a subset of .scope_pieces. + """ + scope_pieces = self.scope_pieces + if not self.scope_pieces: + return [] + + pos = scope_pieces.index(self.executing_piece) + pieces_start = max(0, pos - self.options.before) + pieces_end = pos + 1 + self.options.after + pieces = scope_pieces[pieces_start:pieces_end] + + if ( + self.options.include_signature + and not self.code.co_name.startswith('<') + and isinstance(self.scope, (ast.FunctionDef, ast.AsyncFunctionDef)) + and pieces_start > 0 + ): + pieces.insert(0, scope_pieces[0]) + + return pieces + + @cached_property + def _executing_node_common_indent(self) -> int: + """ + The common minimal indentation shared by the markers intended + for an exception node that spans multiple lines. + + Intended to be used only internally. + """ + indents = [] + lines = [line for line in self.lines if isinstance(line, Line)] + + for line in lines: + for rang in line._raw_executing_node_ranges(): + begin_text = len(line.text) - len(line.text.lstrip()) + indent = max(rang.start, begin_text) + indents.append(indent) + + if len(indents) <= 1: + return 0 + + return min(indents[1:]) + + @cached_property + def lines(self) -> List[Union[Line, LineGap, BlankLineRange]]: + """ + A list of lines to display, determined by options. 
+ The objects yielded either have type Line, BlankLineRange + or are the singleton LINE_GAP. + Always check the type that you're dealing with when iterating. + + LINE_GAP can be created in two ways: + - by truncating a piece of context that's too long, determined by + .options.max_lines_per_piece + - immediately after the signature piece if Options.include_signature is true + and the following piece isn't already part of the included pieces. + + The Line objects are all within the ranges from .included_pieces. + """ + pieces = self.included_pieces + if not pieces: + return [] + + add_empty_lines = self.options.blank_lines in (BlankLines.VISIBLE, BlankLines.SINGLE) + prev_piece = None + result = [] + for i, piece in enumerate(pieces): + if ( + i == 1 + and self.scope + and pieces[0] == self.scope_pieces[0] + and pieces[1] != self.scope_pieces[1] + ): + result.append(LINE_GAP) + elif prev_piece and add_empty_lines and piece.start > prev_piece.stop: + if self.options.blank_lines == BlankLines.SINGLE: + result.append(BlankLineRange(prev_piece.stop, piece.start-1)) + else: # BlankLines.VISIBLE + for lineno in range(prev_piece.stop, piece.start): + result.append(Line(self, lineno)) + + lines = [Line(self, i) for i in piece] # type: List[Line] + if piece != self.executing_piece: + lines = truncate( + lines, + max_length=self.options.max_lines_per_piece, + middle=[LINE_GAP], + ) + result.extend(lines) + prev_piece = piece + + real_lines = [ + line + for line in result + if isinstance(line, Line) + ] + + text = "\n".join( + line.text + for line in real_lines + ) + dedented_lines = dedent(text).splitlines() + leading_indent = len(real_lines[0].text) - len(dedented_lines[0]) + for line in real_lines: + line.leading_indent = leading_indent + return result + + @cached_property + def scope(self) -> Optional[ast.AST]: + """ + The AST node of the innermost function, class or module being executed. 
+ """ + if not self.source.tree or not self.executing.statements: + return None + + stmt = list(self.executing.statements)[0] + while True: + # Get the parent first in case the original statement is already + # a function definition, e.g. if we're calling a decorator + # In that case we still want the surrounding scope, not that function + stmt = stmt.parent + if isinstance(stmt, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef, ast.Module)): + return stmt + + @cached_property + def _pygmented_scope_lines(self) -> Optional[Tuple[int, List[str]]]: + # noinspection PyUnresolvedReferences + from pygments.formatters import HtmlFormatter + + formatter = self.options.pygments_formatter + scope = self.scope + assert_(formatter, ValueError("Must set a pygments formatter in Options")) + assert_(scope) + + if isinstance(formatter, HtmlFormatter): + formatter.nowrap = True + + atext = self.source.asttext() + node = self.executing.node + if node and getattr(formatter.style, "for_executing_node", False): + scope_start = atext.get_text_range(scope)[0] + start, end = atext.get_text_range(node) + start -= scope_start + end -= scope_start + ranges = [(start, end)] + else: + ranges = [] + + code = atext.get_text(scope) + lines = _pygmented_with_ranges(formatter, code, ranges) + + start_line = self.source.line_range(scope)[0] + + return start_line, lines + + @cached_property + def variables(self) -> List[Variable]: + """ + All Variable objects whose nodes are contained within .scope + and whose values could be safely evaluated by pure_eval. 
+ """ + if not self.scope: + return [] + + evaluator = Evaluator.from_frame(self.frame) + scope = self.scope + node_values = [ + pair + for pair in evaluator.find_expressions(scope) + if is_expression_interesting(*pair) + ] # type: List[Tuple[ast.AST, Any]] + + if isinstance(scope, (ast.FunctionDef, ast.AsyncFunctionDef)): + for node in ast.walk(scope.args): + if not isinstance(node, ast.arg): + continue + name = node.arg + try: + value = evaluator.names[name] + except KeyError: + pass + else: + node_values.append((node, value)) + + # Group equivalent nodes together + def get_text(n): + if isinstance(n, ast.arg): + return n.arg + else: + return self.source.asttext().get_text(n) + + def normalise_node(n): + try: + # Add parens to avoid syntax errors for multiline expressions + return ast.parse('(' + get_text(n) + ')') + except Exception: + return n + + grouped = group_by_key_func( + node_values, + lambda nv: ast.dump(normalise_node(nv[0])), + ) + + result = [] + for group in grouped.values(): + nodes, values = zip(*group) + value = values[0] + text = get_text(nodes[0]) + if not text: + continue + result.append(Variable(text, nodes, value)) + + return result + + @cached_property + def variables_by_lineno(self) -> Mapping[int, List[Tuple[Variable, ast.AST]]]: + """ + A mapping from 1-based line numbers to lists of pairs: + - A Variable object + - A specific AST node from the variable's .nodes list that's + in the line at that line number. + """ + result = defaultdict(list) + for var in self.variables: + for node in var.nodes: + for lineno in range(*self.source.line_range(node)): + result[lineno].append((var, node)) + return result + + @cached_property + def variables_in_lines(self) -> List[Variable]: + """ + A list of Variable objects contained within the lines returned by .lines. 
class Formatter:
    """
    Renders frames, tracebacks and exceptions as lines of text.

    The ``format_*`` methods return or yield strings and the ``print_*``
    methods write those strings to a file (``sys.stderr`` by default).
    Each piece of output comes from its own small method so that subclasses
    can override just the part they want to change.
    """

    def __init__(
        self, *,
        options=None,
        pygmented=False,
        show_executing_node=True,
        pygments_formatter_cls=None,
        pygments_formatter_kwargs=None,
        pygments_style="monokai",
        executing_node_modifier="bg:#005080",
        executing_node_underline="^",
        current_line_indicator="-->",
        line_gap_string="(...)",
        line_number_gap_string=":",
        line_number_format_string="{:4} | ",
        show_variables=False,
        use_code_qualname=True,
        show_linenos=True,
        strip_leading_indent=True,
        html=False,
        chain=True,
        collapse_repeated_frames=True
    ):
        """
        All arguments are keyword-only.

        options: ``Options`` handed to ``FrameInfo``; a new ``Options()`` is
            created when omitted.
        pygmented: passed to ``Line.render``.  When true and ``options`` has
            no ``pygments_formatter`` yet, one is built from
            ``pygments_formatter_cls`` (``Terminal256Formatter`` when None),
            ``pygments_style`` and ``pygments_formatter_kwargs`` and is
            stored on ``options`` (the caller's object is modified).
        show_executing_node: when pygmented, ``pygments_style`` is combined
            with ``executing_node_modifier`` via ``style_with_executing_node``;
            otherwise ``format_line`` underlines the executing node with
            ``executing_node_underline``.
        current_line_indicator: prefix of the current line; other lines get
            the same number of spaces.  A falsy value is stored as ``""``.
        line_gap_string: text emitted for a ``LINE_GAP`` entry.
        line_number_gap_string, line_number_format_string: used when
            rendering line numbers and ranges of blank lines.
        show_variables: append the frame's variables after its source lines.
        use_code_qualname: use ``executing.code_qualname()`` rather than
            ``code.co_name`` in frame headers.
        show_linenos: prefix each source line with its line number.
        strip_leading_indent, html: passed to ``Line.render`` as
            ``strip_leading_indent`` and ``escape_html``.
        chain: also format ``__cause__`` / ``__context__`` exceptions.
        collapse_repeated_frames: passed to ``FrameInfo.stack_data``.

        Raises ValueError if ``executing_node_underline`` is not exactly one
        character, or if ``show_linenos`` is false while
        ``options.blank_lines`` is ``BlankLines.SINGLE``.
        """
        if options is None:
            options = Options()

        if pygmented and not options.pygments_formatter:
            if show_executing_node:
                pygments_style = style_with_executing_node(
                    pygments_style, executing_node_modifier
                )

            if pygments_formatter_cls is None:
                # Imported lazily so pygments is only needed when it is used.
                from pygments.formatters.terminal256 import Terminal256Formatter \
                    as pygments_formatter_cls

            options.pygments_formatter = pygments_formatter_cls(
                style=pygments_style,
                **(pygments_formatter_kwargs or {})
            )

        self.pygmented = pygmented
        self.show_executing_node = show_executing_node
        assert_(
            len(executing_node_underline) == 1,
            ValueError("executing_node_underline must be a single character"),
        )
        self.executing_node_underline = executing_node_underline
        self.current_line_indicator = current_line_indicator or ""
        self.line_gap_string = line_gap_string
        self.line_number_gap_string = line_number_gap_string
        self.line_number_format_string = line_number_format_string
        self.show_variables = show_variables
        self.show_linenos = show_linenos
        self.use_code_qualname = use_code_qualname
        self.strip_leading_indent = strip_leading_indent
        self.html = html
        self.chain = chain
        self.options = options
        self.collapse_repeated_frames = collapse_repeated_frames
        if not self.show_linenos and self.options.blank_lines == BlankLines.SINGLE:
            raise ValueError(
                "BlankLines.SINGLE option can only be used when show_linenos=True"
            )

    def set_hook(self):
        """Install this formatter as ``sys.excepthook``."""
        def excepthook(_etype, evalue, _tb):
            self.print_exception(evalue)

        sys.excepthook = excepthook

    def print_exception(self, e=None, *, file=None):
        """Print the lines of ``format_exception(e)`` to ``file``."""
        self.print_lines(self.format_exception(e), file=file)

    def print_stack(self, frame_or_tb=None, *, file=None):
        """
        Print the lines of ``format_stack(frame_or_tb)`` to ``file``.

        Without an argument the stack of the caller of this method is used.
        """
        if frame_or_tb is None:
            frame_or_tb = inspect.currentframe().f_back

        self.print_lines(self.format_stack(frame_or_tb), file=file)

    def print_lines(self, lines, *, file=None):
        """
        Write each string in ``lines`` to ``file`` (``sys.stderr`` when None).

        Nothing is appended: the strings carry their own newlines.
        """
        if file is None:
            file = sys.stderr
        for line in lines:
            print(line, file=file, end="")

    def format_exception(self, e=None) -> Iterable[str]:
        """
        Yield the text for exception ``e``, defaulting to the exception
        currently being handled (``sys.exc_info()[1]``).

        When ``self.chain`` is true the ``__cause__``, or else the
        unsuppressed ``__context__``, is formatted first, followed by the
        matching separator message from the ``traceback`` module.
        """
        if e is None:
            e = sys.exc_info()[1]

        if e is None:
            # No exception was given and none is being handled.  Emit only
            # the final line for ``None`` instead of failing on
            # ``None.__cause__`` below.
            yield from traceback.format_exception_only(type(e), e)
            return

        if self.chain:
            if e.__cause__ is not None:
                yield from self.format_exception(e.__cause__)
                yield traceback._cause_message
            elif (e.__context__ is not None
                  and not e.__suppress_context__):
                yield from self.format_exception(e.__context__)
                yield traceback._context_message

        tb = e.__traceback__
        if tb is not None:
            # An exception that was never raised has no traceback.  Passing
            # None to format_stack would render the *current* call stack,
            # which has nothing to do with this exception, so the header and
            # the frames are skipped in that case.
            yield 'Traceback (most recent call last):\n'
            yield from self.format_stack(tb)
        yield from traceback.format_exception_only(type(e), e)

    def format_stack(self, frame_or_tb=None) -> Iterable[str]:
        """
        Yield the text for every entry of ``FrameInfo.stack_data`` for the
        given frame or traceback.

        When ``frame_or_tb`` is None the frame calling into this generator is
        used.  Because this is a generator, that lookup happens when
        iteration starts, not when the method is called.
        """
        if frame_or_tb is None:
            frame_or_tb = inspect.currentframe().f_back

        yield from self.format_stack_data(
            FrameInfo.stack_data(
                frame_or_tb,
                self.options,
                collapse_repeated_frames=self.collapse_repeated_frames,
            )
        )

    def format_stack_data(
        self, stack: "Iterable[Union[FrameInfo, RepeatedFrames]]"
    ) -> Iterable[str]:
        """
        Yield the text for each item of ``stack``: ``FrameInfo`` items go
        through ``format_frame``, anything else through
        ``format_repeated_frames``.
        """
        for item in stack:
            if isinstance(item, FrameInfo):
                yield from self.format_frame(item)
            else:
                yield self.format_repeated_frames(item)

    def format_repeated_frames(self, repeated_frames: "RepeatedFrames") -> str:
        """Return the one-line placeholder for a run of collapsed frames."""
        return ' [... skipping similar frames: {}]\n'.format(
            repeated_frames.description
        )

    def format_frame(
        self, frame: "Union[FrameInfo, FrameType, TracebackType]"
    ) -> Iterable[str]:
        """
        Yield the header, the source lines and (if ``show_variables``) the
        variables of one frame.

        Anything that is not already a ``FrameInfo`` is wrapped in
        ``FrameInfo(frame, self.options)`` first.
        """
        if not isinstance(frame, FrameInfo):
            frame = FrameInfo(frame, self.options)

        yield self.format_frame_header(frame)

        for line in frame.lines:
            if isinstance(line, Line):
                yield self.format_line(line)
            elif isinstance(line, BlankLineRange):
                yield self.format_blank_lines_linenumbers(line)
            else:
                assert_(line is LINE_GAP)
                yield self.line_gap_string + "\n"

        if self.show_variables:
            try:
                yield from self.format_variables(frame)
            except Exception:
                # Best effort: a failure while collecting variables must not
                # prevent the rest of the traceback from being shown.
                pass

    def format_frame_header(self, frame_info: "FrameInfo") -> str:
        """Return the ``File "...", line N, in name`` line for a frame."""
        return ' File "{frame_info.filename}", line {frame_info.lineno}, in {name}\n'.format(
            frame_info=frame_info,
            name=(
                frame_info.executing.code_qualname()
                if self.use_code_qualname else
                frame_info.code.co_name
            ),
        )

    def format_line(self, line: "Line") -> str:
        """
        Return the text for one source line, newline included.

        The line is prefixed with the current-line indicator (or matching
        padding) and, if ``show_linenos``, its formatted line number.  When
        the executing node is shown without pygments, one extra row of
        ``executing_node_underline`` characters is appended for every
        non-empty executing range on the line.
        """
        result = ""
        if self.current_line_indicator:
            if line.is_current:
                result = self.current_line_indicator
            else:
                result = " " * len(self.current_line_indicator)
            result += " "
        else:
            result = " "

        if self.show_linenos:
            result += self.line_number_format_string.format(line.lineno)

        # Everything before the source text; underline rows are padded by its
        # length so that they line up with the code above them.
        prefix = result

        result += line.render(
            pygmented=self.pygmented,
            escape_html=self.html,
            strip_leading_indent=self.strip_leading_indent,
        ) + "\n"

        if self.show_executing_node and not self.pygmented:
            for line_range in line.executing_node_ranges:
                start = line_range.start - line.leading_indent
                end = line_range.end - line.leading_indent
                # if end <= start, we have an empty line inside a highlighted
                # block of code. In this case, we need to avoid inserting
                # an extra blank line with no markers present.
                if end > start:
                    result += (
                        " " * (start + len(prefix))
                        + self.executing_node_underline * (end - start)
                        + "\n"
                    )
        return result

    def format_blank_lines_linenumbers(self, blank_line):
        """
        Return the text standing in for a range of blank lines.

        A single blank line shows its line number; a longer range shows
        ``line_number_gap_string`` instead.
        """
        if self.current_line_indicator:
            result = " " * len(self.current_line_indicator) + " "
        else:
            result = " "
        if blank_line.begin_lineno == blank_line.end_lineno:
            return result + self.line_number_format_string.format(blank_line.begin_lineno) + "\n"
        return result + " {}\n".format(self.line_number_gap_string)

    def format_variables(self, frame_info: "FrameInfo") -> Iterable[str]:
        """Yield one ``name = value`` line per variable, sorted by name."""
        for var in sorted(frame_info.variables, key=lambda v: v.name):
            try:
                yield self.format_variable(var) + "\n"
            except Exception:
                # Best effort: a variable that cannot be formatted (for
                # example because its repr raises) is left out.
                pass

    def format_variable(self, var: "Variable") -> str:
        """Return ``"<name> = <formatted value>"`` for one variable."""
        return "{} = {}".format(
            var.name,
            self.format_variable_value(var.value),
        )

    def format_variable_value(self, value) -> str:
        """Return the text shown for a variable's value (its ``repr``)."""
        return repr(value)
log = logging.getLogger(__name__)


class Serializer:
    """
    Converts frames, tracebacks and exceptions into plain dicts and lists.

    The structure mirrors the traceback: ``format_exception`` returns one
    dict per exception in the chain, each holding a list of frame dicts,
    which in turn hold a list of line dicts.  Each piece is produced by its
    own method so that subclasses can override it.
    """

    def __init__(
        self,
        *,
        options=None,
        pygmented=False,
        show_executing_node=True,
        pygments_formatter_cls=None,
        pygments_formatter_kwargs=None,
        pygments_style="monokai",
        executing_node_modifier="bg:#005080",
        use_code_qualname=True,
        strip_leading_indent=True,
        html=False,
        chain=True,
        collapse_repeated_frames=True,
        show_variables=False,
    ):
        """
        All arguments are keyword-only.

        options: ``Options`` handed to ``FrameInfo``; a new ``Options()`` is
            created when omitted.
        pygmented: passed to ``Line.render``.  When true and ``options`` has
            no ``pygments_formatter`` yet, one is built from
            ``pygments_formatter_cls``, ``pygments_style`` and
            ``pygments_formatter_kwargs`` and is stored on ``options`` (the
            caller's object is modified).  The default formatter class is
            ``HtmlFormatter`` when ``html`` is true, ``Terminal256Formatter``
            otherwise.
        show_executing_node: when building a pygments formatter, combine
            ``pygments_style`` with ``executing_node_modifier`` via
            ``style_with_executing_node``.
        use_code_qualname: use ``executing.code_qualname()`` rather than
            ``code.co_name`` as the frame name.
        strip_leading_indent: passed to ``Line.render``.
        html: escape line text and variable names/values for HTML.
        chain: also serialize ``__cause__`` / ``__context__`` exceptions.
        collapse_repeated_frames: passed to ``FrameInfo.stack_data``.
        show_variables: add a ``"variables"`` list to every frame dict.
        """
        if options is None:
            options = Options()

        if pygmented and not options.pygments_formatter:
            if show_executing_node:
                pygments_style = style_with_executing_node(
                    pygments_style, executing_node_modifier
                )

            if pygments_formatter_cls is None:
                # Imported lazily so pygments is only needed when it is used.
                if html:
                    from pygments.formatters.html import (
                        HtmlFormatter as pygments_formatter_cls,
                    )
                else:
                    from pygments.formatters.terminal256 import (
                        Terminal256Formatter as pygments_formatter_cls,
                    )

            options.pygments_formatter = pygments_formatter_cls(
                style=pygments_style,
                **(pygments_formatter_kwargs or {})
            )

        self.pygmented = pygmented
        self.use_code_qualname = use_code_qualname
        self.strip_leading_indent = strip_leading_indent
        self.html = html
        self.chain = chain
        self.options = options
        self.collapse_repeated_frames = collapse_repeated_frames
        self.show_variables = show_variables

    def format_exception(self, e=None) -> List[dict]:
        """
        Return one dict (see ``format_traceback_part``) per exception in the
        chain ending at ``e``, oldest first.

        ``e`` defaults to the exception currently being handled.  When
        ``self.chain`` is true the ``__cause__``, or else the unsuppressed
        ``__context__``, is serialized first and the ``"tail"`` of its last
        entry is set to the stripped separator message from the
        ``traceback`` module.  Returns an empty list when there is no
        exception to serialize.
        """
        if e is None:
            e = sys.exc_info()[1]

        if e is None:
            # Nothing was given and nothing is being handled; previously this
            # failed on ``None.__cause__`` below.
            return []

        result = []

        if self.chain:
            if e.__cause__ is not None:
                result = self.format_exception(e.__cause__)
                result[-1]["tail"] = traceback._cause_message.strip()
            elif e.__context__ is not None and not e.__suppress_context__:
                result = self.format_exception(e.__context__)
                result[-1]["tail"] = traceback._context_message.strip()

        result.append(self.format_traceback_part(e))
        return result

    def format_traceback_part(self, e: BaseException) -> dict:
        """
        Return ``{"frames": [...], "exception": {"type", "message"},
        "tail": ""}`` for a single exception.

        The frames come from ``e.__traceback__``, falling back to the
        traceback of the exception currently being handled.
        """
        tb = e.__traceback__ or sys.exc_info()[2]
        # With no traceback at all there are no frames to report.  Passing
        # None on to format_stack would serialize the current call stack,
        # which is unrelated to this exception.
        frames = self.format_stack(tb) if tb is not None else []
        return dict(
            frames=frames,
            exception=dict(
                type=type(e).__name__,
                message=some_str(e),
            ),
            tail="",
        )

    def format_stack(self, frame_or_tb=None) -> List[dict]:
        """
        Return the list of dicts for every entry of ``FrameInfo.stack_data``
        for the given frame or traceback (default: the caller's frame).
        """
        if frame_or_tb is None:
            frame_or_tb = inspect.currentframe().f_back

        return list(
            self.format_stack_data(
                FrameInfo.stack_data(
                    frame_or_tb,
                    self.options,
                    collapse_repeated_frames=self.collapse_repeated_frames,
                )
            )
        )

    def format_stack_data(
        self, stack: "Iterable[Union[FrameInfo, RepeatedFrames]]"
    ) -> Iterable[dict]:
        """
        Yield a dict per stack item, tagged ``type="frame"`` or
        ``type="repeated_frames"``.  Frames rejected by
        ``should_include_frame`` are skipped.
        """
        for item in stack:
            if isinstance(item, FrameInfo):
                if not self.should_include_frame(item):
                    continue
                yield dict(type="frame", **self.format_frame(item))
            else:
                yield dict(type="repeated_frames", **self.format_repeated_frames(item))

    def format_repeated_frames(self, repeated_frames: "RepeatedFrames") -> dict:
        """
        Return ``{"frames": [{"name", "lineno", "count"}, ...]}`` summarising
        a run of collapsed frames.

        ``repeated_frames.frame_keys`` is read as ``(code, lineno)`` pairs.
        Entries are ordered by descending count, then by function name.
        """
        counts = sorted(
            Counter(repeated_frames.frame_keys).items(),
            key=lambda item: (-item[1], item[0][0].co_name),
        )
        return dict(
            frames=[
                dict(
                    name=code.co_name,
                    lineno=lineno,
                    count=count,
                )
                for (code, lineno), count in counts
            ]
        )

    def format_frame(
        self, frame: "Union[FrameInfo, FrameType, TracebackType]"
    ) -> dict:
        """
        Return ``{"name", "filename", "lineno", "lines"}`` for one frame,
        plus ``"variables"`` when ``show_variables`` is set.

        Anything that is not already a ``FrameInfo`` is wrapped in
        ``FrameInfo(frame, self.options)`` first.
        """
        if not isinstance(frame, FrameInfo):
            frame = FrameInfo(frame, self.options)

        result = dict(
            name=(
                frame.executing.code_qualname()
                if self.use_code_qualname
                else frame.code.co_name
            ),
            filename=frame.filename,
            lineno=frame.lineno,
            lines=list(self.format_lines(frame.lines)),
        )
        if self.show_variables:
            result["variables"] = list(self.format_variables(frame))
        return result

    def format_lines(self, lines):
        """
        Yield a dict per entry of ``lines``: ``type="line"`` with the fields
        of ``format_line`` for ``Line`` objects, ``type="line_gap"`` for
        anything else.
        """
        for line in lines:
            if isinstance(line, Line):
                yield dict(type="line", **self.format_line(line))
            else:
                yield dict(type="line_gap")

    def format_line(self, line: "Line") -> dict:
        """Return ``{"is_current", "lineno", "text"}`` for one source line."""
        return dict(
            is_current=line.is_current,
            lineno=line.lineno,
            text=line.render(
                pygmented=self.pygmented,
                escape_html=self.html,
                strip_leading_indent=self.strip_leading_indent,
            ),
        )

    def format_variables(self, frame_info: "FrameInfo") -> Iterable[dict]:
        """
        Yield a dict per variable of the frame, sorted by name.

        Any exception raised while collecting or formatting the variables is
        logged and ends the iteration; it is not propagated.
        """
        try:
            for var in sorted(frame_info.variables, key=lambda v: v.name):
                yield self.format_variable(var)
        except Exception:  # pragma: no cover
            log.exception("Error in getting frame variables")

    def format_variable(self, var: "Variable") -> dict:
        """Return ``{"name", "value"}`` for one variable."""
        return dict(
            name=self.format_variable_part(var.name),
            value=self.format_variable_part(self.format_variable_value(var.value)),
        )

    def format_variable_part(self, text):
        """Return ``text``, HTML-escaped when ``self.html`` is set."""
        if self.html:
            return escape_html(text)
        else:
            return text

    def format_variable_value(self, value) -> str:
        """Return the text shown for a variable's value (its ``repr``)."""
        return repr(value)

    def should_include_frame(self, frame_info: "FrameInfo") -> bool:
        """Hook for subclasses to leave frames out; includes every frame."""
        return True  # pragma: no cover
T = TypeVar('T')
R = TypeVar('R')


def truncate(seq, max_length: int, middle):
    """
    Shorten ``seq`` to at most ``max_length`` items by replacing its centre
    with ``middle``.

    ``seq`` and ``middle`` must be sequences of the same kind (both strings,
    both lists, ...).  A sequence that already fits is returned unchanged.
    Otherwise the result keeps the start and the end of ``seq``, with the
    start getting the extra item when the remaining room is odd.  If
    ``middle`` alone does not fit, ``middle`` cut down to ``max_length`` is
    returned.

    >>> truncate("abcdefghij", 7, "...")
    'ab...ij'
    >>> truncate("abcdef", 4, "...")
    'a...'
    """
    if len(seq) <= max_length:
        return seq

    room = max_length - len(middle)
    if room <= 0:
        return middle[:max(max_length, 0)]

    right = room // 2
    left = room - right
    # Slice from an explicit index: ``seq[-right:]`` would be the whole
    # sequence when ``right`` is 0.
    return seq[:left] + middle + seq[len(seq) - right:]


def unique_in_order(it: Iterable[T]) -> List[T]:
    """Return the distinct items of ``it`` in order of first appearance."""
    return list(OrderedDict.fromkeys(it))


def line_range(atok: "ASTText", node: ast.AST) -> Tuple[int, int]:
    """
    Returns a pair of numbers representing a half open range
    (i.e. suitable as arguments to the `range()` builtin)
    of line numbers of the given AST nodes.
    """
    # ``ast.match_case`` only exists on newer Python versions; the empty
    # tuple makes the isinstance check false everywhere else.
    if isinstance(node, getattr(ast, "match_case", ())):
        # Span from the start of the pattern to the end of the last
        # statement of the case body.
        start, _end = line_range(atok, node.pattern)
        _start, end = line_range(atok, node.body[-1])
        return start, end
    else:
        (start, _), (end, _) = atok.get_text_positions(node, padded=False)
        return start, end + 1


def highlight_unique(lst: List[T]) -> Iterator[Tuple[T, bool]]:
    """
    Yield ``(item, flag)`` for every item of ``lst``, in order.

    Items whose value occurs at most 3 times in the whole list are always
    flagged True.  The remaining items are handled per consecutive run:
    within a run, only the first two occurrences and the last occurrence of
    each distinct value are flagged True, the others False.
    """
    counts = Counter(lst)

    for is_common, group in itertools.groupby(lst, key=lambda x: counts[x] > 3):
        if is_common:
            group = list(group)
            highlighted = [False] * len(group)

            def highlight_index(f):
                # ``f`` computes an index into ``group``; a ValueError means
                # there is no such occurrence and nothing is flagged.
                try:
                    i = f()
                except ValueError:
                    return None
                highlighted[i] = True
                return i

            for item in set(group):
                first = highlight_index(lambda: group.index(item))
                if first is not None:
                    highlight_index(lambda: group.index(item, first + 1))
                highlight_index(lambda: -1 - group[::-1].index(item))
        else:
            highlighted = itertools.repeat(True)

        yield from zip(group, highlighted)


def identity(x: T) -> T:
    """Return ``x`` unchanged."""
    return x


def collapse_repeated(lst, *, collapser, mapper=identity, key=identity):
    """
    Yield the items of ``lst`` with heavily repeated stretches collapsed.

    ``key`` is applied to every item and the keys are passed through
    ``highlight_unique``.  Items flagged True are yielded as
    ``mapper(item)``.  Each consecutive run of items flagged False is
    replaced by the single value ``collapser(items, keys)``, where both
    arguments are lists covering that run.
    """
    keyed = list(map(key, lst))
    for is_highlighted, group in itertools.groupby(
            zip(lst, highlight_unique(keyed)),
            key=lambda t: t[1][1],
    ):
        original_group, highlighted_group = zip(*group)
        if is_highlighted:
            yield from map(mapper, original_group)
        else:
            keyed_group, _ = zip(*highlighted_group)
            yield collapser(list(original_group), list(keyed_group))


def is_frame(frame_or_tb: Union[FrameType, TracebackType]) -> bool:
    """
    Return True for a frame object and False for a traceback object.

    Raises AssertionError for anything else.
    """
    assert_(isinstance(frame_or_tb, (types.FrameType, types.TracebackType)))
    return isinstance(frame_or_tb, (types.FrameType,))


def iter_stack(frame_or_tb: Union[FrameType, TracebackType]) -> Iterator[Union[FrameType, TracebackType]]:
    """
    Yield ``frame_or_tb`` and then each following entry, following
    ``f_back`` for frames and ``tb_next`` for tracebacks.
    """
    while frame_or_tb:
        yield frame_or_tb
        if is_frame(frame_or_tb):
            frame_or_tb = frame_or_tb.f_back
        else:
            frame_or_tb = frame_or_tb.tb_next


def frame_and_lineno(frame_or_tb: Union[FrameType, TracebackType]) -> Tuple[FrameType, int]:
    """
    Return ``(frame, line number)`` for a frame (``f_lineno``) or a
    traceback (``tb_frame``, ``tb_lineno``).
    """
    if is_frame(frame_or_tb):
        return frame_or_tb, frame_or_tb.f_lineno
    else:
        return frame_or_tb.tb_frame, frame_or_tb.tb_lineno


def group_by_key_func(iterable: Iterable[T], key_func: Callable[[T], R]) -> Mapping[R, List[T]]:
    # noinspection PyUnresolvedReferences
    """
    Create a dictionary from an iterable such that the keys are the result of evaluating a key function on elements
    of the iterable and the values are lists of elements all of which correspond to the key.

    >>> def si(d): return sorted(d.items())
    >>> si(group_by_key_func("a bb ccc d ee fff".split(), len))
    [(1, ['a', 'd']), (2, ['bb', 'ee']), (3, ['ccc', 'fff'])]
    >>> si(group_by_key_func([-1, 0, 1, 3, 6, 8, 9, 2], lambda x: x % 2))
    [(0, [0, 6, 8, 2]), (1, [-1, 1, 3, 9])]
    """
    result = defaultdict(list)
    for item in iterable:
        result[key_func(item)].append(item)
    return result


class cached_property(object):
    """
    A property that is only computed once per instance and then replaces itself
    with an ordinary attribute. Deleting the attribute resets the property.

    Based on https://github.com/pydanny/cached-property/blob/master/cached_property.py
    """

    def __init__(self, func):
        self.__doc__ = func.__doc__
        self.func = func

    def cached_property_wrapper(self, obj, _cls):
        # Accessed on the class rather than an instance: return the
        # descriptor itself.
        if obj is None:
            return self

        # Store the value in the instance ``__dict__`` under the function's
        # name, so later lookups find the plain attribute and never reach
        # this descriptor again.
        value = obj.__dict__[self.func.__name__] = self.func(obj)
        return value

    __get__ = cached_property_wrapper


def _pygmented_with_ranges(formatter, code, ranges):
    """
    Highlight ``code`` with pygments and return the output split into lines.

    ``ranges`` is an iterable of ``(start, end)`` character offsets into
    ``code``.  Every token that starts inside one of those half-open ranges
    gets its token type replaced by the ``ExecutingNode`` sub-type, so that
    ``formatter``'s style can render it differently.  If highlighting fails,
    the lines of the plain ``code`` are returned instead.
    """
    import pygments
    from pygments.lexers import get_lexer_by_name

    class MyLexer(type(get_lexer_by_name("python3"))):
        def get_tokens(self, text):
            # Offset of the current token: total length of all tokens
            # produced before it.
            length = 0
            for ttype, value in super().get_tokens(text):
                if any(start <= length < end for start, end in ranges):
                    ttype = ttype.ExecutingNode
                length += len(value)
                yield ttype, value

    lexer = MyLexer(stripnl=False)
    try:
        highlighted = pygments.highlight(code, lexer, formatter)
    except Exception:
        # When pygments fails, prefer code without highlighting over crashing
        highlighted = code
    return highlighted.splitlines()


def assert_(condition, error=""):
    """
    Raise ``error`` unless ``condition`` is truthy.

    ``error`` may be an exception to raise as-is, or a string, which is
    wrapped in an ``AssertionError``.
    """
    if not condition:
        if isinstance(error, str):
            error = AssertionError(error)
        raise error


# Copied from the standard traceback module pre-3.11
def some_str(value):
    """
    Return ``str(value)``, or a ``<unprintable ... object>`` placeholder if
    the conversion raises.
    """
    try:
        return str(value)
    except BaseException:
        # Deliberately catches everything, exactly like the bare ``except``
        # this replaces: whatever ``__str__`` raises is swallowed.
        return '<unprintable %s object>' % type(value).__name__