aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/stack-data/stack_data
diff options
context:
space:
mode:
authormonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
committermonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
commit06e5c21a835c0e923506c4ff27929f34e00761c2 (patch)
tree75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /contrib/python/stack-data/stack_data
parent03f024c4412e3aa613bb543cf1660176320ba8f4 (diff)
downloadydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz
fix ya.make
Diffstat (limited to 'contrib/python/stack-data/stack_data')
-rw-r--r--contrib/python/stack-data/stack_data/__init__.py10
-rw-r--r--contrib/python/stack-data/stack_data/core.py882
-rw-r--r--contrib/python/stack-data/stack_data/formatting.py209
-rw-r--r--contrib/python/stack-data/stack_data/serializing.py200
-rw-r--r--contrib/python/stack-data/stack_data/utils.py172
-rw-r--r--contrib/python/stack-data/stack_data/version.py1
6 files changed, 0 insertions, 1474 deletions
diff --git a/contrib/python/stack-data/stack_data/__init__.py b/contrib/python/stack-data/stack_data/__init__.py
deleted file mode 100644
index a75f8846ac..0000000000
--- a/contrib/python/stack-data/stack_data/__init__.py
+++ /dev/null
@@ -1,10 +0,0 @@
-from .core import Source, FrameInfo, markers_from_ranges, Options, LINE_GAP, Line, Variable, RangeInLine, \
- RepeatedFrames, MarkerInLine, style_with_executing_node
-from .formatting import Formatter
-from .serializing import Serializer
-
-try:
- from .version import __version__
-except ImportError:
- # version.py is auto-generated with the git tag when building
- __version__ = "???"
diff --git a/contrib/python/stack-data/stack_data/core.py b/contrib/python/stack-data/stack_data/core.py
deleted file mode 100644
index 97313fe333..0000000000
--- a/contrib/python/stack-data/stack_data/core.py
+++ /dev/null
@@ -1,882 +0,0 @@
-import ast
-import html
-import os
-import sys
-from collections import defaultdict, Counter
-from textwrap import dedent
-from types import FrameType, CodeType, TracebackType
-from typing import (
- Iterator, List, Tuple, Optional, NamedTuple,
- Any, Iterable, Callable, Union,
- Sequence)
-from typing import Mapping
-
-import executing
-from asttokens.util import Token
-from executing import only
-from pure_eval import Evaluator, is_expression_interesting
-from stack_data.utils import (
- truncate, unique_in_order, line_range,
- frame_and_lineno, iter_stack, collapse_repeated, group_by_key_func,
- cached_property, is_frame, _pygmented_with_ranges, assert_)
-
-RangeInLine = NamedTuple('RangeInLine',
- [('start', int),
- ('end', int),
- ('data', Any)])
-RangeInLine.__doc__ = """
-Represents a range of characters within one line of source code,
-and some associated data.
-
-Typically this will be converted to a pair of markers by markers_from_ranges.
-"""
-
-MarkerInLine = NamedTuple('MarkerInLine',
- [('position', int),
- ('is_start', bool),
- ('string', str)])
-MarkerInLine.__doc__ = """
-A string that is meant to be inserted at a given position in a line of source code.
-For example, this could be an ANSI code or the opening or closing of an HTML tag.
-is_start should be True if this is the first of a pair such as the opening of an HTML tag.
-This will help to sort and insert markers correctly.
-
-Typically this would be created from a RangeInLine by markers_from_ranges.
-Then use Line.render to insert the markers correctly.
-"""
-
-
-class Variable(
- NamedTuple('_Variable',
- [('name', str),
- ('nodes', Sequence[ast.AST]),
- ('value', Any)])
-):
- """
- An expression that appears one or more times in source code and its associated value.
- This will usually be a variable but it can be any expression evaluated by pure_eval.
- - name is the source text of the expression.
- - nodes is a list of equivalent nodes representing the same expression.
- - value is the safely evaluated value of the expression.
- """
- __hash__ = object.__hash__
- __eq__ = object.__eq__
-
-
-class Source(executing.Source):
- """
- The source code of a single file and associated metadata.
-
- In addition to the attributes from the base class executing.Source,
- if .tree is not None, meaning this is valid Python code, objects have:
- - pieces: a list of Piece objects
- - tokens_by_lineno: a defaultdict(list) mapping line numbers to lists of tokens.
-
- Don't construct this class. Get an instance from frame_info.source.
- """
-
- def __init__(self, *args, **kwargs):
- super(Source, self).__init__(*args, **kwargs)
- if self.tree:
- self.asttokens()
-
- @cached_property
- def pieces(self) -> List[range]:
- if not self.tree:
- return [
- range(i, i + 1)
- for i in range(1, len(self.lines) + 1)
- ]
- return list(self._clean_pieces())
-
- @cached_property
- def tokens_by_lineno(self) -> Mapping[int, List[Token]]:
- if not self.tree:
- raise AttributeError("This file doesn't contain valid Python, so .tokens_by_lineno doesn't exist")
- return group_by_key_func(
- self.asttokens().tokens,
- lambda tok: tok.start[0],
- )
-
- def _clean_pieces(self) -> Iterator[range]:
- pieces = self._raw_split_into_pieces(self.tree, 1, len(self.lines) + 1)
- pieces = [
- (start, end)
- for (start, end) in pieces
- if end > start
- ]
-
- starts = [start for start, end in pieces[1:]]
- ends = [end for start, end in pieces[:-1]]
- if starts != ends:
- joins = list(map(set, zip(starts, ends)))
- mismatches = [s for s in joins if len(s) > 1]
- raise AssertionError("Pieces mismatches: %s" % mismatches)
-
- def is_blank(i):
- try:
- return not self.lines[i - 1].strip()
- except IndexError:
- return False
-
- for start, end in pieces:
- while is_blank(start):
- start += 1
- while is_blank(end - 1):
- end -= 1
- if start < end:
- yield range(start, end)
-
- def _raw_split_into_pieces(
- self,
- stmt: ast.AST,
- start: int,
- end: int,
- ) -> Iterator[Tuple[int, int]]:
- self.asttokens()
-
- for name, body in ast.iter_fields(stmt):
- if (
- isinstance(body, list) and body and
- isinstance(body[0], (ast.stmt, ast.ExceptHandler))
- ):
- for rang, group in sorted(group_by_key_func(body, line_range).items()):
- sub_stmt = group[0]
- for inner_start, inner_end in self._raw_split_into_pieces(sub_stmt, *rang):
- if start < inner_start:
- yield start, inner_start
- if inner_start < inner_end:
- yield inner_start, inner_end
- start = inner_end
-
- yield start, end
-
-
-class Options:
- """
- Configuration for FrameInfo, either in the constructor or the .stack_data classmethod.
- These all determine which Lines and gaps are produced by FrameInfo.lines.
-
- before and after are the number of pieces of context to include in a frame
- in addition to the executing piece.
-
- include_signature is whether to include the function signature as a piece in a frame.
-
- If a piece (other than the executing piece) has more than max_lines_per_piece lines,
- it will be truncated with a gap in the middle.
- """
- def __init__(
- self, *,
- before: int = 3,
- after: int = 1,
- include_signature: bool = False,
- max_lines_per_piece: int = 6,
- pygments_formatter=None
- ):
- self.before = before
- self.after = after
- self.include_signature = include_signature
- self.max_lines_per_piece = max_lines_per_piece
- self.pygments_formatter = pygments_formatter
-
- def __repr__(self):
- keys = sorted(self.__dict__)
- items = ("{}={!r}".format(k, self.__dict__[k]) for k in keys)
- return "{}({})".format(type(self).__name__, ", ".join(items))
-
-
-class LineGap(object):
- """
- A singleton representing one or more lines of source code that were skipped
- in FrameInfo.lines.
-
- LINE_GAP can be created in two ways:
- - by truncating a piece of context that's too long.
- - immediately after the signature piece if Options.include_signature is true
- and the following piece isn't already part of the included pieces.
- """
- def __repr__(self):
- return "LINE_GAP"
-
-
-LINE_GAP = LineGap()
-
-
-class Line(object):
- """
- A single line of source code for a particular stack frame.
-
- Typically this is obtained from FrameInfo.lines.
- Since that list may also contain LINE_GAP, you should first check
- that this is really a Line before using it.
-
- Attributes:
- - frame_info
- - lineno: the 1-based line number within the file
- - text: the raw source of this line. For displaying text, see .render() instead.
- - leading_indent: the number of leading spaces that should probably be stripped.
- This attribute is set within FrameInfo.lines. If you construct this class
- directly you should probably set it manually (at least to 0).
- - is_current: whether this is the line currently being executed by the interpreter
- within this frame.
- - tokens: a list of source tokens in this line
-
- There are several helpers for constructing RangeInLines which can be converted to markers
- using markers_from_ranges which can be passed to .render():
- - token_ranges
- - variable_ranges
- - executing_node_ranges
- - range_from_node
- """
- def __init__(
- self,
- frame_info: 'FrameInfo',
- lineno: int,
- ):
- self.frame_info = frame_info
- self.lineno = lineno
- self.text = frame_info.source.lines[lineno - 1] # type: str
- self.leading_indent = None # type: Optional[int]
-
- def __repr__(self):
- return "<{self.__class__.__name__} {self.lineno} (current={self.is_current}) " \
- "{self.text!r} of {self.frame_info.filename}>".format(self=self)
-
- @property
- def is_current(self) -> bool:
- """
- Whether this is the line currently being executed by the interpreter
- within this frame.
- """
- return self.lineno == self.frame_info.lineno
-
- @property
- def tokens(self) -> List[Token]:
- """
- A list of source tokens in this line.
- The tokens are Token objects from asttokens:
- https://asttokens.readthedocs.io/en/latest/api-index.html#asttokens.util.Token
- """
- return self.frame_info.source.tokens_by_lineno[self.lineno]
-
- @cached_property
- def token_ranges(self) -> List[RangeInLine]:
- """
- A list of RangeInLines for each token in .tokens,
- where range.data is a Token object from asttokens:
- https://asttokens.readthedocs.io/en/latest/api-index.html#asttokens.util.Token
- """
- return [
- RangeInLine(
- token.start[1],
- token.end[1],
- token,
- )
- for token in self.tokens
- ]
-
- @cached_property
- def variable_ranges(self) -> List[RangeInLine]:
- """
- A list of RangeInLines for each Variable that appears at least partially in this line.
- The data attribute of the range is a pair (variable, node) where node is the particular
- AST node from the list variable.nodes that corresponds to this range.
- """
- return [
- self.range_from_node(node, (variable, node))
- for variable, node in self.frame_info.variables_by_lineno[self.lineno]
- ]
-
- @cached_property
- def executing_node_ranges(self) -> List[RangeInLine]:
- """
- A list of one or zero RangeInLines for the executing node of this frame.
- The list will have one element if the node can be found and it overlaps this line.
- """
- return self._raw_executing_node_ranges(
- self.frame_info._executing_node_common_indent
- )
-
- def _raw_executing_node_ranges(self, common_indent=0) -> List[RangeInLine]:
- ex = self.frame_info.executing
- node = ex.node
- if node:
- rang = self.range_from_node(node, ex, common_indent)
- if rang:
- return [rang]
- return []
-
- def range_from_node(
- self, node: ast.AST, data: Any, common_indent: int = 0
- ) -> Optional[RangeInLine]:
- """
- If the given node overlaps with this line, return a RangeInLine
- with the correct start and end and the given data.
- Otherwise, return None.
- """
- start, end = line_range(node)
- end -= 1
- if not (start <= self.lineno <= end):
- return None
- if start == self.lineno:
- try:
- range_start = node.first_token.start[1]
- except AttributeError:
- range_start = node.col_offset
- else:
- range_start = 0
-
- range_start = max(range_start, common_indent)
-
- if end == self.lineno:
- try:
- range_end = node.last_token.end[1]
- except AttributeError:
- try:
- range_end = node.end_col_offset
- except AttributeError:
- return None
- else:
- range_end = len(self.text)
-
- return RangeInLine(range_start, range_end, data)
-
- def render(
- self,
- markers: Iterable[MarkerInLine] = (),
- *,
- strip_leading_indent: bool = True,
- pygmented: bool = False,
- escape_html: bool = False
- ) -> str:
- """
- Produces a string for display consisting of .text
- with the .strings of each marker inserted at the correct positions.
- If strip_leading_indent is true (the default) then leading spaces
- common to all lines in this frame will be excluded.
- """
- if pygmented and self.frame_info.scope:
- assert_(not markers, ValueError("Cannot use pygmented with markers"))
- start_line, lines = self.frame_info._pygmented_scope_lines
- result = lines[self.lineno - start_line]
- if strip_leading_indent:
- result = result.replace(self.text[:self.leading_indent], "", 1)
- return result
-
- text = self.text
-
- # This just makes the loop below simpler
- markers = list(markers) + [MarkerInLine(position=len(text), is_start=False, string='')]
-
- markers.sort(key=lambda t: t[:2])
-
- parts = []
- if strip_leading_indent:
- start = self.leading_indent
- else:
- start = 0
- original_start = start
-
- for marker in markers:
- text_part = text[start:marker.position]
- if escape_html:
- text_part = html.escape(text_part)
- parts.append(text_part)
- parts.append(marker.string)
-
- # Ensure that start >= leading_indent
- start = max(marker.position, original_start)
- return ''.join(parts)
-
-
-def markers_from_ranges(
- ranges: Iterable[RangeInLine],
- converter: Callable[[RangeInLine], Optional[Tuple[str, str]]],
-) -> List[MarkerInLine]:
- """
- Helper to create MarkerInLines given some RangeInLines.
- converter should be a function accepting a RangeInLine returning
- either None (which is ignored) or a pair of strings which
- are used to create two markers included in the returned list.
- """
- markers = []
- for rang in ranges:
- converted = converter(rang)
- if converted is None:
- continue
-
- start_string, end_string = converted
- if not (isinstance(start_string, str) and isinstance(end_string, str)):
- raise TypeError("converter should return None or a pair of strings")
-
- markers += [
- MarkerInLine(position=rang.start, is_start=True, string=start_string),
- MarkerInLine(position=rang.end, is_start=False, string=end_string),
- ]
- return markers
-
-
-def style_with_executing_node(style, modifier):
- from pygments.styles import get_style_by_name
- if isinstance(style, str):
- style = get_style_by_name(style)
-
- class NewStyle(style):
- for_executing_node = True
-
- styles = {
- **style.styles,
- **{
- k.ExecutingNode: v + " " + modifier
- for k, v in style.styles.items()
- }
- }
-
- return NewStyle
-
-
-class RepeatedFrames:
- """
- A sequence of consecutive stack frames which shouldn't be displayed because
- the same code and line number were repeated many times in the stack, e.g.
- because of deep recursion.
-
- Attributes:
- - frames: list of raw frame or traceback objects
- - frame_keys: list of tuples (frame.f_code, lineno) extracted from the frame objects.
- It's this information from the frames that is used to determine
- whether two frames should be considered similar (i.e. repeating).
- - description: A string briefly describing frame_keys
- """
- def __init__(
- self,
- frames: List[Union[FrameType, TracebackType]],
- frame_keys: List[Tuple[CodeType, int]],
- ):
- self.frames = frames
- self.frame_keys = frame_keys
-
- @cached_property
- def description(self) -> str:
- """
- A string briefly describing the repeated frames, e.g.
- my_function at line 10 (100 times)
- """
- counts = sorted(Counter(self.frame_keys).items(),
- key=lambda item: (-item[1], item[0][0].co_name))
- return ', '.join(
- '{name} at line {lineno} ({count} times)'.format(
- name=Source.for_filename(code.co_filename).code_qualname(code),
- lineno=lineno,
- count=count,
- )
- for (code, lineno), count in counts
- )
-
- def __repr__(self):
- return '<{self.__class__.__name__} {self.description}>'.format(self=self)
-
-
-class FrameInfo(object):
- """
- Information about a frame!
- Pass either a frame object or a traceback object,
- and optionally an Options object to configure.
-
- Or use the classmethod FrameInfo.stack_data() for an iterator of FrameInfo and
- RepeatedFrames objects.
-
- Attributes:
- - frame: an actual stack frame object, either frame_or_tb or frame_or_tb.tb_frame
- - options
- - code: frame.f_code
- - source: a Source object
- - filename: a hopefully absolute file path derived from code.co_filename
- - scope: the AST node of the innermost function, class or module being executed
- - lines: a list of Line/LineGap objects to display, determined by options
- - executing: an Executing object from the `executing` library, which has:
- - .node: the AST node being executed in this frame, or None if it's unknown
- - .statements: a set of one or more candidate statements (AST nodes, probably just one)
- currently being executed in this frame.
- - .code_qualname(): the __qualname__ of the function or class being executed,
- or just the code name.
-
- Properties returning one or more pieces of source code (ranges of lines):
- - scope_pieces: all the pieces in the scope
- - included_pieces: a subset of scope_pieces determined by options
- - executing_piece: the piece currently being executed in this frame
-
- Properties returning lists of Variable objects:
- - variables: all variables in the scope
- - variables_by_lineno: variables organised into lines
- - variables_in_lines: variables contained within FrameInfo.lines
- - variables_in_executing_piece: variables contained within FrameInfo.executing_piece
- """
- def __init__(
- self,
- frame_or_tb: Union[FrameType, TracebackType],
- options: Optional[Options] = None,
- ):
- self.executing = Source.executing(frame_or_tb)
- frame, self.lineno = frame_and_lineno(frame_or_tb)
- self.frame = frame
- self.code = frame.f_code
- self.options = options or Options() # type: Options
- self.source = self.executing.source # type: Source
-
- def __repr__(self):
- return "{self.__class__.__name__}({self.frame})".format(self=self)
-
- @classmethod
- def stack_data(
- cls,
- frame_or_tb: Union[FrameType, TracebackType],
- options: Optional[Options] = None,
- *,
- collapse_repeated_frames: bool = True
- ) -> Iterator[Union['FrameInfo', RepeatedFrames]]:
- """
- An iterator of FrameInfo and RepeatedFrames objects representing
- a full traceback or stack. Similar consecutive frames are collapsed into RepeatedFrames
- objects, so always check what type of object has been yielded.
-
- Pass either a frame object or a traceback object,
- and optionally an Options object to configure.
- """
- stack = list(iter_stack(frame_or_tb))
-
- # Reverse the stack from a frame so that it's in the same order
- # as the order from a traceback, which is the order of a printed
- # traceback when read top to bottom (most recent call last)
- if is_frame(frame_or_tb):
- stack = stack[::-1]
-
- def mapper(f):
- return cls(f, options)
-
- if not collapse_repeated_frames:
- yield from map(mapper, stack)
- return
-
- def _frame_key(x):
- frame, lineno = frame_and_lineno(x)
- return frame.f_code, lineno
-
- yield from collapse_repeated(
- stack,
- mapper=mapper,
- collapser=RepeatedFrames,
- key=_frame_key,
- )
-
- @cached_property
- def scope_pieces(self) -> List[range]:
- """
- All the pieces (ranges of lines) contained in this object's .scope,
- unless there is no .scope (because the source isn't valid Python syntax)
- in which case it returns all the pieces in the source file, each containing one line.
- """
- if not self.scope:
- return self.source.pieces
-
- scope_start, scope_end = line_range(self.scope)
- return [
- piece
- for piece in self.source.pieces
- if scope_start <= piece.start and piece.stop <= scope_end
- ]
-
- @cached_property
- def filename(self) -> str:
- """
- A hopefully absolute file path derived from .code.co_filename,
- the current working directory, and sys.path.
- Code based on ipython.
- """
- result = self.code.co_filename
-
- if (
- os.path.isabs(result) or
- (
- result.startswith("<") and
- result.endswith(">")
- )
- ):
- return result
-
- # Try to make the filename absolute by trying all
- # sys.path entries (which is also what linecache does)
- # as well as the current working directory
- for dirname in ["."] + list(sys.path):
- try:
- fullname = os.path.join(dirname, result)
- if os.path.isfile(fullname):
- return os.path.abspath(fullname)
- except Exception:
- # Just in case that sys.path contains very
- # strange entries...
- pass
-
- return result
-
- @cached_property
- def executing_piece(self) -> range:
- """
- The piece (range of lines) containing the line currently being executed
- by the interpreter in this frame.
- """
- return only(
- piece
- for piece in self.scope_pieces
- if self.lineno in piece
- )
-
- @cached_property
- def included_pieces(self) -> List[range]:
- """
- The list of pieces (ranges of lines) to display for this frame.
- Consists of .executing_piece, surrounding context pieces
- determined by .options.before and .options.after,
- and the function signature if a function is being executed and
- .options.include_signature is True (in which case this might not
- be a contiguous range of pieces).
- Always a subset of .scope_pieces.
- """
- scope_pieces = self.scope_pieces
- if not self.scope_pieces:
- return []
-
- pos = scope_pieces.index(self.executing_piece)
- pieces_start = max(0, pos - self.options.before)
- pieces_end = pos + 1 + self.options.after
- pieces = scope_pieces[pieces_start:pieces_end]
-
- if (
- self.options.include_signature
- and not self.code.co_name.startswith('<')
- and isinstance(self.scope, (ast.FunctionDef, ast.AsyncFunctionDef))
- and pieces_start > 0
- ):
- pieces.insert(0, scope_pieces[0])
-
- return pieces
-
- @cached_property
- def _executing_node_common_indent(self) -> int:
- """
- The common minimal indentation shared by the markers intended
- for an exception node that spans multiple lines.
-
- Intended to be used only internally.
- """
- indents = []
- lines = [line for line in self.lines if isinstance(line, Line)]
-
- for line in lines:
- for rang in line._raw_executing_node_ranges():
- begin_text = len(line.text) - len(line.text.lstrip())
- indent = max(rang.start, begin_text)
- indents.append(indent)
-
- return min(indents) if indents else 0
-
- @cached_property
- def lines(self) -> List[Union[Line, LineGap]]:
- """
- A list of lines to display, determined by options.
- The objects yielded either have type Line or are the singleton LINE_GAP.
- Always check the type that you're dealing with when iterating.
-
- LINE_GAP can be created in two ways:
- - by truncating a piece of context that's too long, determined by
- .options.max_lines_per_piece
- - immediately after the signature piece if Options.include_signature is true
- and the following piece isn't already part of the included pieces.
-
- The Line objects are all within the ranges from .included_pieces.
- """
- pieces = self.included_pieces
- if not pieces:
- return []
-
- result = []
- for i, piece in enumerate(pieces):
- if (
- i == 1
- and self.scope
- and pieces[0] == self.scope_pieces[0]
- and pieces[1] != self.scope_pieces[1]
- ):
- result.append(LINE_GAP)
-
- lines = [Line(self, i) for i in piece] # type: List[Line]
- if piece != self.executing_piece:
- lines = truncate(
- lines,
- max_length=self.options.max_lines_per_piece,
- middle=[LINE_GAP],
- )
- result.extend(lines)
-
- real_lines = [
- line
- for line in result
- if isinstance(line, Line)
- ]
-
- text = "\n".join(
- line.text
- for line in real_lines
- )
- dedented_lines = dedent(text).splitlines()
- leading_indent = len(real_lines[0].text) - len(dedented_lines[0])
- for line in real_lines:
- line.leading_indent = leading_indent
-
- return result
-
- @cached_property
- def scope(self) -> Optional[ast.AST]:
- """
- The AST node of the innermost function, class or module being executed.
- """
- if not self.source.tree or not self.executing.statements:
- return None
-
- stmt = list(self.executing.statements)[0]
- while True:
- # Get the parent first in case the original statement is already
- # a function definition, e.g. if we're calling a decorator
- # In that case we still want the surrounding scope, not that function
- stmt = stmt.parent
- if isinstance(stmt, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef, ast.Module)):
- return stmt
-
- @cached_property
- def _pygmented_scope_lines(self) -> Optional[Tuple[int, List[str]]]:
- # noinspection PyUnresolvedReferences
- from pygments.formatters import HtmlFormatter
-
- formatter = self.options.pygments_formatter
- scope = self.scope
- assert_(formatter, ValueError("Must set a pygments formatter in Options"))
- assert_(scope)
-
- if isinstance(formatter, HtmlFormatter):
- formatter.nowrap = True
-
- atok = self.source.asttokens()
- node = self.executing.node
- if node and getattr(formatter.style, "for_executing_node", False):
- scope_start = atok.get_text_range(scope)[0]
- start, end = atok.get_text_range(node)
- start -= scope_start
- end -= scope_start
- ranges = [(start, end)]
- else:
- ranges = []
-
- code = atok.get_text(scope)
- lines = _pygmented_with_ranges(formatter, code, ranges)
-
- start_line = line_range(scope)[0]
-
- return start_line, lines
-
- @cached_property
- def variables(self) -> List[Variable]:
- """
- All Variable objects whose nodes are contained within .scope
- and whose values could be safely evaluated by pure_eval.
- """
- if not self.scope:
- return []
-
- evaluator = Evaluator.from_frame(self.frame)
- scope = self.scope
- node_values = [
- pair
- for pair in evaluator.find_expressions(scope)
- if is_expression_interesting(*pair)
- ] # type: List[Tuple[ast.AST, Any]]
-
- if isinstance(scope, (ast.FunctionDef, ast.AsyncFunctionDef)):
- for node in ast.walk(scope.args):
- if not isinstance(node, ast.arg):
- continue
- name = node.arg
- try:
- value = evaluator.names[name]
- except KeyError:
- pass
- else:
- node_values.append((node, value))
-
- # Group equivalent nodes together
- def get_text(n):
- if isinstance(n, ast.arg):
- return n.arg
- else:
- return self.source.asttokens().get_text(n)
-
- def normalise_node(n):
- try:
- # Add parens to avoid syntax errors for multiline expressions
- return ast.parse('(' + get_text(n) + ')')
- except Exception:
- return n
-
- grouped = group_by_key_func(
- node_values,
- lambda nv: ast.dump(normalise_node(nv[0])),
- )
-
- result = []
- for group in grouped.values():
- nodes, values = zip(*group)
- value = values[0]
- text = get_text(nodes[0])
- if not text:
- continue
- result.append(Variable(text, nodes, value))
-
- return result
-
- @cached_property
- def variables_by_lineno(self) -> Mapping[int, List[Tuple[Variable, ast.AST]]]:
- """
- A mapping from 1-based line numbers to lists of pairs:
- - A Variable object
- - A specific AST node from the variable's .nodes list that's
- in the line at that line number.
- """
- result = defaultdict(list)
- for var in self.variables:
- for node in var.nodes:
- for lineno in range(*line_range(node)):
- result[lineno].append((var, node))
- return result
-
- @cached_property
- def variables_in_lines(self) -> List[Variable]:
- """
- A list of Variable objects contained within the lines returned by .lines.
- """
- return unique_in_order(
- var
- for line in self.lines
- if isinstance(line, Line)
- for var, node in self.variables_by_lineno[line.lineno]
- )
-
- @cached_property
- def variables_in_executing_piece(self) -> List[Variable]:
- """
- A list of Variable objects contained within the lines
- in the range returned by .executing_piece.
- """
- return unique_in_order(
- var
- for lineno in self.executing_piece
- for var, node in self.variables_by_lineno[lineno]
- )
diff --git a/contrib/python/stack-data/stack_data/formatting.py b/contrib/python/stack-data/stack_data/formatting.py
deleted file mode 100644
index 7c5c0e5fc5..0000000000
--- a/contrib/python/stack-data/stack_data/formatting.py
+++ /dev/null
@@ -1,209 +0,0 @@
-import inspect
-import sys
-import traceback
-from types import FrameType, TracebackType
-from typing import Union, Iterable
-
-from stack_data import style_with_executing_node, Options, Line, FrameInfo, LINE_GAP, Variable, RepeatedFrames
-from stack_data.utils import assert_
-
-
-class Formatter:
- def __init__(
- self, *,
- options=None,
- pygmented=False,
- show_executing_node=True,
- pygments_formatter_cls=None,
- pygments_formatter_kwargs=None,
- pygments_style="monokai",
- executing_node_modifier="bg:#005080",
- executing_node_underline="^",
- current_line_indicator="-->",
- line_gap_string="(...)",
- show_variables=False,
- use_code_qualname=True,
- show_linenos=True,
- strip_leading_indent=True,
- html=False,
- chain=True,
- collapse_repeated_frames=True
- ):
- if options is None:
- options = Options()
-
- if pygmented and not options.pygments_formatter:
- if show_executing_node:
- pygments_style = style_with_executing_node(
- pygments_style, executing_node_modifier
- )
-
- if pygments_formatter_cls is None:
- from pygments.formatters.terminal256 import Terminal256Formatter \
- as pygments_formatter_cls
-
- options.pygments_formatter = pygments_formatter_cls(
- style=pygments_style,
- **pygments_formatter_kwargs or {},
- )
-
- self.pygmented = pygmented
- self.show_executing_node = show_executing_node
- assert_(
- len(executing_node_underline) == 1,
- ValueError("executing_node_underline must be a single character"),
- )
- self.executing_node_underline = executing_node_underline
- self.current_line_indicator = current_line_indicator or ""
- self.line_gap_string = line_gap_string
- self.show_variables = show_variables
- self.show_linenos = show_linenos
- self.use_code_qualname = use_code_qualname
- self.strip_leading_indent = strip_leading_indent
- self.html = html
- self.chain = chain
- self.options = options
- self.collapse_repeated_frames = collapse_repeated_frames
-
- def set_hook(self):
- def excepthook(_etype, evalue, _tb):
- self.print_exception(evalue)
-
- sys.excepthook = excepthook
-
- def print_exception(self, e=None, *, file=None):
- self.print_lines(self.format_exception(e), file=file)
-
- def print_stack(self, frame_or_tb=None, *, file=None):
- if frame_or_tb is None:
- frame_or_tb = inspect.currentframe().f_back
-
- self.print_lines(self.format_stack(frame_or_tb), file=file)
-
- def print_lines(self, lines, *, file=None):
- if file is None:
- file = sys.stderr
- for line in lines:
- print(line, file=file, end="")
-
- def format_exception(self, e=None) -> Iterable[str]:
- if e is None:
- e = sys.exc_info()[1]
-
- if self.chain:
- if e.__cause__ is not None:
- yield from self.format_exception(e.__cause__)
- yield traceback._cause_message
- elif (e.__context__ is not None
- and not e.__suppress_context__):
- yield from self.format_exception(e.__context__)
- yield traceback._context_message
-
- yield 'Traceback (most recent call last):\n'
- yield from self.format_stack(e.__traceback__)
- yield from traceback.format_exception_only(type(e), e)
-
- def format_stack(self, frame_or_tb=None) -> Iterable[str]:
- if frame_or_tb is None:
- frame_or_tb = inspect.currentframe().f_back
-
- yield from self.format_stack_data(
- FrameInfo.stack_data(
- frame_or_tb,
- self.options,
- collapse_repeated_frames=self.collapse_repeated_frames,
- )
- )
-
- def format_stack_data(
- self, stack: Iterable[Union[FrameInfo, RepeatedFrames]]
- ) -> Iterable[str]:
- for item in stack:
- if isinstance(item, FrameInfo):
- yield from self.format_frame(item)
- else:
- yield self.format_repeated_frames(item)
-
- def format_repeated_frames(self, repeated_frames: RepeatedFrames) -> str:
- return ' [... skipping similar frames: {}]\n'.format(
- repeated_frames.description
- )
-
- def format_frame(self, frame: Union[FrameInfo, FrameType, TracebackType]) -> Iterable[str]:
- if not isinstance(frame, FrameInfo):
- frame = FrameInfo(frame, self.options)
-
- yield self.format_frame_header(frame)
-
- for line in frame.lines:
- if isinstance(line, Line):
- yield self.format_line(line)
- else:
- assert_(line is LINE_GAP)
- yield self.line_gap_string + "\n"
-
- if self.show_variables:
- try:
- yield from self.format_variables(frame)
- except Exception:
- pass
-
- def format_frame_header(self, frame_info: FrameInfo) -> str:
- return ' File "{frame_info.filename}", line {frame_info.lineno}, in {name}\n'.format(
- frame_info=frame_info,
- name=(
- frame_info.executing.code_qualname()
- if self.use_code_qualname else
- frame_info.code.co_name
- ),
- )
-
- def format_line(self, line: Line) -> str:
- result = ""
- if self.current_line_indicator:
- if line.is_current:
- result = self.current_line_indicator
- else:
- result = " " * len(self.current_line_indicator)
- result += " "
-
- if self.show_linenos:
- result += "{:4} | ".format(line.lineno)
-
- result = result or " "
-
- prefix = result
-
- result += line.render(
- pygmented=self.pygmented,
- escape_html=self.html,
- strip_leading_indent=self.strip_leading_indent,
- ) + "\n"
-
- if self.show_executing_node and not self.pygmented:
- for line_range in line.executing_node_ranges:
- start = line_range.start - line.leading_indent
- end = line_range.end - line.leading_indent
- result += (
- " " * (start + len(prefix))
- + self.executing_node_underline * (end - start)
- + "\n"
- )
-
- return result
-
- def format_variables(self, frame_info: FrameInfo) -> Iterable[str]:
- for var in sorted(frame_info.variables, key=lambda v: v.name):
- try:
- yield self.format_variable(var) + "\n"
- except Exception:
- pass
-
- def format_variable(self, var: Variable) -> str:
- return "{} = {}".format(
- var.name,
- self.format_variable_value(var.value),
- )
-
- def format_variable_value(self, value) -> str:
- return repr(value)
diff --git a/contrib/python/stack-data/stack_data/serializing.py b/contrib/python/stack-data/stack_data/serializing.py
deleted file mode 100644
index fb67d2906a..0000000000
--- a/contrib/python/stack-data/stack_data/serializing.py
+++ /dev/null
@@ -1,200 +0,0 @@
-import inspect
-import logging
-import sys
-import traceback
-from collections import Counter
-from html import escape as escape_html
-from types import FrameType, TracebackType
-from typing import Union, Iterable, List
-
-from stack_data import (
- style_with_executing_node,
- Options,
- Line,
- FrameInfo,
- Variable,
- RepeatedFrames,
-)
-
-log = logging.getLogger(__name__)
-
-
-class Serializer:
- def __init__(
- self,
- *,
- options=None,
- pygmented=False,
- show_executing_node=True,
- pygments_formatter_cls=None,
- pygments_formatter_kwargs=None,
- pygments_style="monokai",
- executing_node_modifier="bg:#005080",
- use_code_qualname=True,
- strip_leading_indent=True,
- html=False,
- chain=True,
- collapse_repeated_frames=True,
- show_variables=False,
- ):
- if options is None:
- options = Options()
-
- if pygmented and not options.pygments_formatter:
- if show_executing_node:
- pygments_style = style_with_executing_node(
- pygments_style, executing_node_modifier
- )
-
- if pygments_formatter_cls is None:
- if html:
- from pygments.formatters.html import (
- HtmlFormatter as pygments_formatter_cls,
- )
- else:
- from pygments.formatters.terminal256 import (
- Terminal256Formatter as pygments_formatter_cls,
- )
-
- options.pygments_formatter = pygments_formatter_cls(
- style=pygments_style,
- **pygments_formatter_kwargs or {},
- )
-
- self.pygmented = pygmented
- self.use_code_qualname = use_code_qualname
- self.strip_leading_indent = strip_leading_indent
- self.html = html
- self.chain = chain
- self.options = options
- self.collapse_repeated_frames = collapse_repeated_frames
- self.show_variables = show_variables
-
- def format_exception(self, e=None) -> List[dict]:
- if e is None:
- e = sys.exc_info()[1]
-
- result = []
-
- if self.chain:
- if e.__cause__ is not None:
- result = self.format_exception(e.__cause__)
- result[-1]["tail"] = traceback._cause_message.strip()
- elif e.__context__ is not None and not e.__suppress_context__:
- result = self.format_exception(e.__context__)
- result[-1]["tail"] = traceback._context_message.strip()
-
- result.append(self.format_traceback_part(e))
- return result
-
- def format_traceback_part(self, e: BaseException) -> dict:
- return dict(
- frames=self.format_stack(e.__traceback__ or sys.exc_info()[2]),
- exception=dict(
- type=type(e).__name__,
- message=traceback._some_str(e),
- ),
- tail="",
- )
-
- def format_stack(self, frame_or_tb=None) -> List[dict]:
- if frame_or_tb is None:
- frame_or_tb = inspect.currentframe().f_back
-
- return list(
- self.format_stack_data(
- FrameInfo.stack_data(
- frame_or_tb,
- self.options,
- collapse_repeated_frames=self.collapse_repeated_frames,
- )
- )
- )
-
- def format_stack_data(
- self, stack: Iterable[Union[FrameInfo, RepeatedFrames]]
- ) -> Iterable[dict]:
- for item in stack:
- if isinstance(item, FrameInfo):
- if not self.should_include_frame(item):
- continue
- yield dict(type="frame", **self.format_frame(item))
- else:
- yield dict(type="repeated_frames", **self.format_repeated_frames(item))
-
- def format_repeated_frames(self, repeated_frames: RepeatedFrames) -> dict:
- counts = sorted(
- Counter(repeated_frames.frame_keys).items(),
- key=lambda item: (-item[1], item[0][0].co_name),
- )
- return dict(
- frames=[
- dict(
- name=code.co_name,
- lineno=lineno,
- count=count,
- )
- for (code, lineno), count in counts
- ]
- )
-
- def format_frame(self, frame: Union[FrameInfo, FrameType, TracebackType]) -> dict:
- if not isinstance(frame, FrameInfo):
- frame = FrameInfo(frame, self.options)
-
- result = dict(
- name=(
- frame.executing.code_qualname()
- if self.use_code_qualname
- else frame.code.co_name
- ),
- filename=frame.filename,
- lineno=frame.lineno,
- lines=list(self.format_lines(frame.lines)),
- )
- if self.show_variables:
- result["variables"] = list(self.format_variables(frame))
- return result
-
- def format_lines(self, lines):
- for line in lines:
- if isinstance(line, Line):
- yield dict(type="line", **self.format_line(line))
- else:
- yield dict(type="line_gap")
-
- def format_line(self, line: Line) -> dict:
- return dict(
- is_current=line.is_current,
- lineno=line.lineno,
- text=line.render(
- pygmented=self.pygmented,
- escape_html=self.html,
- strip_leading_indent=self.strip_leading_indent,
- ),
- )
-
- def format_variables(self, frame_info: FrameInfo) -> Iterable[dict]:
- try:
- for var in sorted(frame_info.variables, key=lambda v: v.name):
- yield self.format_variable(var)
- except Exception: # pragma: no cover
- log.exception("Error in getting frame variables")
-
- def format_variable(self, var: Variable) -> dict:
- return dict(
- name=self.format_variable_part(var.name),
- value=self.format_variable_part(self.format_variable_value(var.value)),
- )
-
- def format_variable_part(self, text):
- if self.html:
- return escape_html(text)
- else:
- return text
-
- def format_variable_value(self, value) -> str:
- return repr(value)
-
- def should_include_frame(self, frame_info: FrameInfo) -> bool:
- return True # pragma: no cover
diff --git a/contrib/python/stack-data/stack_data/utils.py b/contrib/python/stack-data/stack_data/utils.py
deleted file mode 100644
index 71d55eadc1..0000000000
--- a/contrib/python/stack-data/stack_data/utils.py
+++ /dev/null
@@ -1,172 +0,0 @@
-import ast
-import itertools
-import types
-from collections import OrderedDict, Counter, defaultdict
-from types import FrameType, TracebackType
-from typing import (
- Iterator, List, Tuple, Iterable, Callable, Union,
- TypeVar, Mapping,
-)
-
-T = TypeVar('T')
-R = TypeVar('R')
-
-
-def truncate(seq, max_length: int, middle):
- if len(seq) > max_length:
- right = (max_length - len(middle)) // 2
- left = max_length - len(middle) - right
- seq = seq[:left] + middle + seq[-right:]
- return seq
-
-
-def unique_in_order(it: Iterable[T]) -> List[T]:
- return list(OrderedDict.fromkeys(it))
-
-
-def line_range(node: ast.AST) -> Tuple[int, int]:
- """
- Returns a pair of numbers representing a half open range
- (i.e. suitable as arguments to the `range()` builtin)
- of line numbers of the given AST nodes.
- """
- try:
- return (
- node.first_token.start[0],
- node.last_token.end[0] + 1,
- )
- except AttributeError:
- return (
- node.lineno,
- getattr(node, "end_lineno", node.lineno) + 1,
- )
-
-
-def highlight_unique(lst: List[T]) -> Iterator[Tuple[T, bool]]:
- counts = Counter(lst)
-
- for is_common, group in itertools.groupby(lst, key=lambda x: counts[x] > 3):
- if is_common:
- group = list(group)
- highlighted = [False] * len(group)
-
- def highlight_index(f):
- try:
- i = f()
- except ValueError:
- return None
- highlighted[i] = True
- return i
-
- for item in set(group):
- first = highlight_index(lambda: group.index(item))
- if first is not None:
- highlight_index(lambda: group.index(item, first + 1))
- highlight_index(lambda: -1 - group[::-1].index(item))
- else:
- highlighted = itertools.repeat(True)
-
- yield from zip(group, highlighted)
-
-
-def identity(x: T) -> T:
- return x
-
-
-def collapse_repeated(lst, *, collapser, mapper=identity, key=identity):
- keyed = list(map(key, lst))
- for is_highlighted, group in itertools.groupby(
- zip(lst, highlight_unique(keyed)),
- key=lambda t: t[1][1],
- ):
- original_group, highlighted_group = zip(*group)
- if is_highlighted:
- yield from map(mapper, original_group)
- else:
- keyed_group, _ = zip(*highlighted_group)
- yield collapser(list(original_group), list(keyed_group))
-
-
-def is_frame(frame_or_tb: Union[FrameType, TracebackType]) -> bool:
- assert_(isinstance(frame_or_tb, (types.FrameType, types.TracebackType)))
- return isinstance(frame_or_tb, (types.FrameType,))
-
-
-def iter_stack(frame_or_tb: Union[FrameType, TracebackType]) -> Iterator[Union[FrameType, TracebackType]]:
- while frame_or_tb:
- yield frame_or_tb
- if is_frame(frame_or_tb):
- frame_or_tb = frame_or_tb.f_back
- else:
- frame_or_tb = frame_or_tb.tb_next
-
-
-def frame_and_lineno(frame_or_tb: Union[FrameType, TracebackType]) -> Tuple[FrameType, int]:
- if is_frame(frame_or_tb):
- return frame_or_tb, frame_or_tb.f_lineno
- else:
- return frame_or_tb.tb_frame, frame_or_tb.tb_lineno
-
-
-def group_by_key_func(iterable: Iterable[T], key_func: Callable[[T], R]) -> Mapping[R, List[T]]:
- # noinspection PyUnresolvedReferences
- """
- Create a dictionary from an iterable such that the keys are the result of evaluating a key function on elements
- of the iterable and the values are lists of elements all of which correspond to the key.
-
- >>> def si(d): return sorted(d.items())
- >>> si(group_by_key_func("a bb ccc d ee fff".split(), len))
- [(1, ['a', 'd']), (2, ['bb', 'ee']), (3, ['ccc', 'fff'])]
- >>> si(group_by_key_func([-1, 0, 1, 3, 6, 8, 9, 2], lambda x: x % 2))
- [(0, [0, 6, 8, 2]), (1, [-1, 1, 3, 9])]
- """
- result = defaultdict(list)
- for item in iterable:
- result[key_func(item)].append(item)
- return result
-
-
-class cached_property(object):
- """
- A property that is only computed once per instance and then replaces itself
- with an ordinary attribute. Deleting the attribute resets the property.
-
- Based on https://github.com/pydanny/cached-property/blob/master/cached_property.py
- """
-
- def __init__(self, func):
- self.__doc__ = func.__doc__
- self.func = func
-
- def cached_property_wrapper(self, obj, _cls):
- if obj is None:
- return self
-
- value = obj.__dict__[self.func.__name__] = self.func(obj)
- return value
-
- __get__ = cached_property_wrapper
-
-
-def _pygmented_with_ranges(formatter, code, ranges):
- import pygments
- from pygments.lexers import get_lexer_by_name
-
- class MyLexer(type(get_lexer_by_name("python3"))):
- def get_tokens(self, text):
- length = 0
- for ttype, value in super().get_tokens(text):
- if any(start <= length < end for start, end in ranges):
- ttype = ttype.ExecutingNode
- length += len(value)
- yield ttype, value
-
- lexer = MyLexer(stripnl=False)
- return pygments.highlight(code, lexer, formatter).splitlines()
-
-
-def assert_(condition, error=""):
- if not condition:
- if isinstance(error, str):
- error = AssertionError(error)
- raise error
diff --git a/contrib/python/stack-data/stack_data/version.py b/contrib/python/stack-data/stack_data/version.py
deleted file mode 100644
index 0404d81037..0000000000
--- a/contrib/python/stack-data/stack_data/version.py
+++ /dev/null
@@ -1 +0,0 @@
-__version__ = '0.3.0'