diff options
author | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
---|---|---|
committer | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
commit | 06e5c21a835c0e923506c4ff27929f34e00761c2 (patch) | |
tree | 75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /contrib/python/stack-data/stack_data/core.py | |
parent | 03f024c4412e3aa613bb543cf1660176320ba8f4 (diff) | |
download | ydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz |
fix ya.make
Diffstat (limited to 'contrib/python/stack-data/stack_data/core.py')
-rw-r--r-- | contrib/python/stack-data/stack_data/core.py | 882 |
1 files changed, 0 insertions, 882 deletions
diff --git a/contrib/python/stack-data/stack_data/core.py b/contrib/python/stack-data/stack_data/core.py deleted file mode 100644 index 97313fe333..0000000000 --- a/contrib/python/stack-data/stack_data/core.py +++ /dev/null @@ -1,882 +0,0 @@ -import ast -import html -import os -import sys -from collections import defaultdict, Counter -from textwrap import dedent -from types import FrameType, CodeType, TracebackType -from typing import ( - Iterator, List, Tuple, Optional, NamedTuple, - Any, Iterable, Callable, Union, - Sequence) -from typing import Mapping - -import executing -from asttokens.util import Token -from executing import only -from pure_eval import Evaluator, is_expression_interesting -from stack_data.utils import ( - truncate, unique_in_order, line_range, - frame_and_lineno, iter_stack, collapse_repeated, group_by_key_func, - cached_property, is_frame, _pygmented_with_ranges, assert_) - -RangeInLine = NamedTuple('RangeInLine', - [('start', int), - ('end', int), - ('data', Any)]) -RangeInLine.__doc__ = """ -Represents a range of characters within one line of source code, -and some associated data. - -Typically this will be converted to a pair of markers by markers_from_ranges. -""" - -MarkerInLine = NamedTuple('MarkerInLine', - [('position', int), - ('is_start', bool), - ('string', str)]) -MarkerInLine.__doc__ = """ -A string that is meant to be inserted at a given position in a line of source code. -For example, this could be an ANSI code or the opening or closing of an HTML tag. -is_start should be True if this is the first of a pair such as the opening of an HTML tag. -This will help to sort and insert markers correctly. - -Typically this would be created from a RangeInLine by markers_from_ranges. -Then use Line.render to insert the markers correctly. -""" - - -class Variable( - NamedTuple('_Variable', - [('name', str), - ('nodes', Sequence[ast.AST]), - ('value', Any)]) -): - """ - An expression that appears one or more times in source code and its associated value. - This will usually be a variable but it can be any expression evaluated by pure_eval. - - name is the source text of the expression. - - nodes is a list of equivalent nodes representing the same expression. - - value is the safely evaluated value of the expression. - """ - __hash__ = object.__hash__ - __eq__ = object.__eq__ - - -class Source(executing.Source): - """ - The source code of a single file and associated metadata. - - In addition to the attributes from the base class executing.Source, - if .tree is not None, meaning this is valid Python code, objects have: - - pieces: a list of Piece objects - - tokens_by_lineno: a defaultdict(list) mapping line numbers to lists of tokens. - - Don't construct this class. Get an instance from frame_info.source. - """ - - def __init__(self, *args, **kwargs): - super(Source, self).__init__(*args, **kwargs) - if self.tree: - self.asttokens() - - @cached_property - def pieces(self) -> List[range]: - if not self.tree: - return [ - range(i, i + 1) - for i in range(1, len(self.lines) + 1) - ] - return list(self._clean_pieces()) - - @cached_property - def tokens_by_lineno(self) -> Mapping[int, List[Token]]: - if not self.tree: - raise AttributeError("This file doesn't contain valid Python, so .tokens_by_lineno doesn't exist") - return group_by_key_func( - self.asttokens().tokens, - lambda tok: tok.start[0], - ) - - def _clean_pieces(self) -> Iterator[range]: - pieces = self._raw_split_into_pieces(self.tree, 1, len(self.lines) + 1) - pieces = [ - (start, end) - for (start, end) in pieces - if end > start - ] - - starts = [start for start, end in pieces[1:]] - ends = [end for start, end in pieces[:-1]] - if starts != ends: - joins = list(map(set, zip(starts, ends))) - mismatches = [s for s in joins if len(s) > 1] - raise AssertionError("Pieces mismatches: %s" % mismatches) - - def is_blank(i): - try: - return not self.lines[i - 1].strip() - except IndexError: - return False - - for start, end in pieces: - while is_blank(start): - start += 1 - while is_blank(end - 1): - end -= 1 - if start < end: - yield range(start, end) - - def _raw_split_into_pieces( - self, - stmt: ast.AST, - start: int, - end: int, - ) -> Iterator[Tuple[int, int]]: - self.asttokens() - - for name, body in ast.iter_fields(stmt): - if ( - isinstance(body, list) and body and - isinstance(body[0], (ast.stmt, ast.ExceptHandler)) - ): - for rang, group in sorted(group_by_key_func(body, line_range).items()): - sub_stmt = group[0] - for inner_start, inner_end in self._raw_split_into_pieces(sub_stmt, *rang): - if start < inner_start: - yield start, inner_start - if inner_start < inner_end: - yield inner_start, inner_end - start = inner_end - - yield start, end - - -class Options: - """ - Configuration for FrameInfo, either in the constructor or the .stack_data classmethod. - These all determine which Lines and gaps are produced by FrameInfo.lines. - - before and after are the number of pieces of context to include in a frame - in addition to the executing piece. - - include_signature is whether to include the function signature as a piece in a frame. - - If a piece (other than the executing piece) has more than max_lines_per_piece lines, - it will be truncated with a gap in the middle. - """ - def __init__( - self, *, - before: int = 3, - after: int = 1, - include_signature: bool = False, - max_lines_per_piece: int = 6, - pygments_formatter=None - ): - self.before = before - self.after = after - self.include_signature = include_signature - self.max_lines_per_piece = max_lines_per_piece - self.pygments_formatter = pygments_formatter - - def __repr__(self): - keys = sorted(self.__dict__) - items = ("{}={!r}".format(k, self.__dict__[k]) for k in keys) - return "{}({})".format(type(self).__name__, ", ".join(items)) - - -class LineGap(object): - """ - A singleton representing one or more lines of source code that were skipped - in FrameInfo.lines. - - LINE_GAP can be created in two ways: - - by truncating a piece of context that's too long. - - immediately after the signature piece if Options.include_signature is true - and the following piece isn't already part of the included pieces. - """ - def __repr__(self): - return "LINE_GAP" - - -LINE_GAP = LineGap() - - -class Line(object): - """ - A single line of source code for a particular stack frame. - - Typically this is obtained from FrameInfo.lines. - Since that list may also contain LINE_GAP, you should first check - that this is really a Line before using it. - - Attributes: - - frame_info - - lineno: the 1-based line number within the file - - text: the raw source of this line. For displaying text, see .render() instead. - - leading_indent: the number of leading spaces that should probably be stripped. - This attribute is set within FrameInfo.lines. If you construct this class - directly you should probably set it manually (at least to 0). - - is_current: whether this is the line currently being executed by the interpreter - within this frame. - - tokens: a list of source tokens in this line - - There are several helpers for constructing RangeInLines which can be converted to markers - using markers_from_ranges which can be passed to .render(): - - token_ranges - - variable_ranges - - executing_node_ranges - - range_from_node - """ - def __init__( - self, - frame_info: 'FrameInfo', - lineno: int, - ): - self.frame_info = frame_info - self.lineno = lineno - self.text = frame_info.source.lines[lineno - 1] # type: str - self.leading_indent = None # type: Optional[int] - - def __repr__(self): - return "<{self.__class__.__name__} {self.lineno} (current={self.is_current}) " \ - "{self.text!r} of {self.frame_info.filename}>".format(self=self) - - @property - def is_current(self) -> bool: - """ - Whether this is the line currently being executed by the interpreter - within this frame. - """ - return self.lineno == self.frame_info.lineno - - @property - def tokens(self) -> List[Token]: - """ - A list of source tokens in this line. - The tokens are Token objects from asttokens: - https://asttokens.readthedocs.io/en/latest/api-index.html#asttokens.util.Token - """ - return self.frame_info.source.tokens_by_lineno[self.lineno] - - @cached_property - def token_ranges(self) -> List[RangeInLine]: - """ - A list of RangeInLines for each token in .tokens, - where range.data is a Token object from asttokens: - https://asttokens.readthedocs.io/en/latest/api-index.html#asttokens.util.Token - """ - return [ - RangeInLine( - token.start[1], - token.end[1], - token, - ) - for token in self.tokens - ] - - @cached_property - def variable_ranges(self) -> List[RangeInLine]: - """ - A list of RangeInLines for each Variable that appears at least partially in this line. - The data attribute of the range is a pair (variable, node) where node is the particular - AST node from the list variable.nodes that corresponds to this range. - """ - return [ - self.range_from_node(node, (variable, node)) - for variable, node in self.frame_info.variables_by_lineno[self.lineno] - ] - - @cached_property - def executing_node_ranges(self) -> List[RangeInLine]: - """ - A list of one or zero RangeInLines for the executing node of this frame. - The list will have one element if the node can be found and it overlaps this line. - """ - return self._raw_executing_node_ranges( - self.frame_info._executing_node_common_indent - ) - - def _raw_executing_node_ranges(self, common_indent=0) -> List[RangeInLine]: - ex = self.frame_info.executing - node = ex.node - if node: - rang = self.range_from_node(node, ex, common_indent) - if rang: - return [rang] - return [] - - def range_from_node( - self, node: ast.AST, data: Any, common_indent: int = 0 - ) -> Optional[RangeInLine]: - """ - If the given node overlaps with this line, return a RangeInLine - with the correct start and end and the given data. - Otherwise, return None. - """ - start, end = line_range(node) - end -= 1 - if not (start <= self.lineno <= end): - return None - if start == self.lineno: - try: - range_start = node.first_token.start[1] - except AttributeError: - range_start = node.col_offset - else: - range_start = 0 - - range_start = max(range_start, common_indent) - - if end == self.lineno: - try: - range_end = node.last_token.end[1] - except AttributeError: - try: - range_end = node.end_col_offset - except AttributeError: - return None - else: - range_end = len(self.text) - - return RangeInLine(range_start, range_end, data) - - def render( - self, - markers: Iterable[MarkerInLine] = (), - *, - strip_leading_indent: bool = True, - pygmented: bool = False, - escape_html: bool = False - ) -> str: - """ - Produces a string for display consisting of .text - with the .strings of each marker inserted at the correct positions. - If strip_leading_indent is true (the default) then leading spaces - common to all lines in this frame will be excluded. - """ - if pygmented and self.frame_info.scope: - assert_(not markers, ValueError("Cannot use pygmented with markers")) - start_line, lines = self.frame_info._pygmented_scope_lines - result = lines[self.lineno - start_line] - if strip_leading_indent: - result = result.replace(self.text[:self.leading_indent], "", 1) - return result - - text = self.text - - # This just makes the loop below simpler - markers = list(markers) + [MarkerInLine(position=len(text), is_start=False, string='')] - - markers.sort(key=lambda t: t[:2]) - - parts = [] - if strip_leading_indent: - start = self.leading_indent - else: - start = 0 - original_start = start - - for marker in markers: - text_part = text[start:marker.position] - if escape_html: - text_part = html.escape(text_part) - parts.append(text_part) - parts.append(marker.string) - - # Ensure that start >= leading_indent - start = max(marker.position, original_start) - return ''.join(parts) - - -def markers_from_ranges( - ranges: Iterable[RangeInLine], - converter: Callable[[RangeInLine], Optional[Tuple[str, str]]], -) -> List[MarkerInLine]: - """ - Helper to create MarkerInLines given some RangeInLines. - converter should be a function accepting a RangeInLine returning - either None (which is ignored) or a pair of strings which - are used to create two markers included in the returned list. - """ - markers = [] - for rang in ranges: - converted = converter(rang) - if converted is None: - continue - - start_string, end_string = converted - if not (isinstance(start_string, str) and isinstance(end_string, str)): - raise TypeError("converter should return None or a pair of strings") - - markers += [ - MarkerInLine(position=rang.start, is_start=True, string=start_string), - MarkerInLine(position=rang.end, is_start=False, string=end_string), - ] - return markers - - -def style_with_executing_node(style, modifier): - from pygments.styles import get_style_by_name - if isinstance(style, str): - style = get_style_by_name(style) - - class NewStyle(style): - for_executing_node = True - - styles = { - **style.styles, - **{ - k.ExecutingNode: v + " " + modifier - for k, v in style.styles.items() - } - } - - return NewStyle - - -class RepeatedFrames: - """ - A sequence of consecutive stack frames which shouldn't be displayed because - the same code and line number were repeated many times in the stack, e.g. - because of deep recursion. - - Attributes: - - frames: list of raw frame or traceback objects - - frame_keys: list of tuples (frame.f_code, lineno) extracted from the frame objects. - It's this information from the frames that is used to determine - whether two frames should be considered similar (i.e. repeating). - - description: A string briefly describing frame_keys - """ - def __init__( - self, - frames: List[Union[FrameType, TracebackType]], - frame_keys: List[Tuple[CodeType, int]], - ): - self.frames = frames - self.frame_keys = frame_keys - - @cached_property - def description(self) -> str: - """ - A string briefly describing the repeated frames, e.g. - my_function at line 10 (100 times) - """ - counts = sorted(Counter(self.frame_keys).items(), - key=lambda item: (-item[1], item[0][0].co_name)) - return ', '.join( - '{name} at line {lineno} ({count} times)'.format( - name=Source.for_filename(code.co_filename).code_qualname(code), - lineno=lineno, - count=count, - ) - for (code, lineno), count in counts - ) - - def __repr__(self): - return '<{self.__class__.__name__} {self.description}>'.format(self=self) - - -class FrameInfo(object): - """ - Information about a frame! - Pass either a frame object or a traceback object, - and optionally an Options object to configure. - - Or use the classmethod FrameInfo.stack_data() for an iterator of FrameInfo and - RepeatedFrames objects. - - Attributes: - - frame: an actual stack frame object, either frame_or_tb or frame_or_tb.tb_frame - - options - - code: frame.f_code - - source: a Source object - - filename: a hopefully absolute file path derived from code.co_filename - - scope: the AST node of the innermost function, class or module being executed - - lines: a list of Line/LineGap objects to display, determined by options - - executing: an Executing object from the `executing` library, which has: - - .node: the AST node being executed in this frame, or None if it's unknown - - .statements: a set of one or more candidate statements (AST nodes, probably just one) - currently being executed in this frame. - - .code_qualname(): the __qualname__ of the function or class being executed, - or just the code name. - - Properties returning one or more pieces of source code (ranges of lines): - - scope_pieces: all the pieces in the scope - - included_pieces: a subset of scope_pieces determined by options - - executing_piece: the piece currently being executed in this frame - - Properties returning lists of Variable objects: - - variables: all variables in the scope - - variables_by_lineno: variables organised into lines - - variables_in_lines: variables contained within FrameInfo.lines - - variables_in_executing_piece: variables contained within FrameInfo.executing_piece - """ - def __init__( - self, - frame_or_tb: Union[FrameType, TracebackType], - options: Optional[Options] = None, - ): - self.executing = Source.executing(frame_or_tb) - frame, self.lineno = frame_and_lineno(frame_or_tb) - self.frame = frame - self.code = frame.f_code - self.options = options or Options() # type: Options - self.source = self.executing.source # type: Source - - def __repr__(self): - return "{self.__class__.__name__}({self.frame})".format(self=self) - - @classmethod - def stack_data( - cls, - frame_or_tb: Union[FrameType, TracebackType], - options: Optional[Options] = None, - *, - collapse_repeated_frames: bool = True - ) -> Iterator[Union['FrameInfo', RepeatedFrames]]: - """ - An iterator of FrameInfo and RepeatedFrames objects representing - a full traceback or stack. Similar consecutive frames are collapsed into RepeatedFrames - objects, so always check what type of object has been yielded. - - Pass either a frame object or a traceback object, - and optionally an Options object to configure. - """ - stack = list(iter_stack(frame_or_tb)) - - # Reverse the stack from a frame so that it's in the same order - # as the order from a traceback, which is the order of a printed - # traceback when read top to bottom (most recent call last) - if is_frame(frame_or_tb): - stack = stack[::-1] - - def mapper(f): - return cls(f, options) - - if not collapse_repeated_frames: - yield from map(mapper, stack) - return - - def _frame_key(x): - frame, lineno = frame_and_lineno(x) - return frame.f_code, lineno - - yield from collapse_repeated( - stack, - mapper=mapper, - collapser=RepeatedFrames, - key=_frame_key, - ) - - @cached_property - def scope_pieces(self) -> List[range]: - """ - All the pieces (ranges of lines) contained in this object's .scope, - unless there is no .scope (because the source isn't valid Python syntax) - in which case it returns all the pieces in the source file, each containing one line. - """ - if not self.scope: - return self.source.pieces - - scope_start, scope_end = line_range(self.scope) - return [ - piece - for piece in self.source.pieces - if scope_start <= piece.start and piece.stop <= scope_end - ] - - @cached_property - def filename(self) -> str: - """ - A hopefully absolute file path derived from .code.co_filename, - the current working directory, and sys.path. - Code based on ipython. - """ - result = self.code.co_filename - - if ( - os.path.isabs(result) or - ( - result.startswith("<") and - result.endswith(">") - ) - ): - return result - - # Try to make the filename absolute by trying all - # sys.path entries (which is also what linecache does) - # as well as the current working directory - for dirname in ["."] + list(sys.path): - try: - fullname = os.path.join(dirname, result) - if os.path.isfile(fullname): - return os.path.abspath(fullname) - except Exception: - # Just in case that sys.path contains very - # strange entries... - pass - - return result - - @cached_property - def executing_piece(self) -> range: - """ - The piece (range of lines) containing the line currently being executed - by the interpreter in this frame. - """ - return only( - piece - for piece in self.scope_pieces - if self.lineno in piece - ) - - @cached_property - def included_pieces(self) -> List[range]: - """ - The list of pieces (ranges of lines) to display for this frame. - Consists of .executing_piece, surrounding context pieces - determined by .options.before and .options.after, - and the function signature if a function is being executed and - .options.include_signature is True (in which case this might not - be a contiguous range of pieces). - Always a subset of .scope_pieces. - """ - scope_pieces = self.scope_pieces - if not self.scope_pieces: - return [] - - pos = scope_pieces.index(self.executing_piece) - pieces_start = max(0, pos - self.options.before) - pieces_end = pos + 1 + self.options.after - pieces = scope_pieces[pieces_start:pieces_end] - - if ( - self.options.include_signature - and not self.code.co_name.startswith('<') - and isinstance(self.scope, (ast.FunctionDef, ast.AsyncFunctionDef)) - and pieces_start > 0 - ): - pieces.insert(0, scope_pieces[0]) - - return pieces - - @cached_property - def _executing_node_common_indent(self) -> int: - """ - The common minimal indentation shared by the markers intended - for an exception node that spans multiple lines. - - Intended to be used only internally. - """ - indents = [] - lines = [line for line in self.lines if isinstance(line, Line)] - - for line in lines: - for rang in line._raw_executing_node_ranges(): - begin_text = len(line.text) - len(line.text.lstrip()) - indent = max(rang.start, begin_text) - indents.append(indent) - - return min(indents) if indents else 0 - - @cached_property - def lines(self) -> List[Union[Line, LineGap]]: - """ - A list of lines to display, determined by options. - The objects yielded either have type Line or are the singleton LINE_GAP. - Always check the type that you're dealing with when iterating. - - LINE_GAP can be created in two ways: - - by truncating a piece of context that's too long, determined by - .options.max_lines_per_piece - - immediately after the signature piece if Options.include_signature is true - and the following piece isn't already part of the included pieces. - - The Line objects are all within the ranges from .included_pieces. - """ - pieces = self.included_pieces - if not pieces: - return [] - - result = [] - for i, piece in enumerate(pieces): - if ( - i == 1 - and self.scope - and pieces[0] == self.scope_pieces[0] - and pieces[1] != self.scope_pieces[1] - ): - result.append(LINE_GAP) - - lines = [Line(self, i) for i in piece] # type: List[Line] - if piece != self.executing_piece: - lines = truncate( - lines, - max_length=self.options.max_lines_per_piece, - middle=[LINE_GAP], - ) - result.extend(lines) - - real_lines = [ - line - for line in result - if isinstance(line, Line) - ] - - text = "\n".join( - line.text - for line in real_lines - ) - dedented_lines = dedent(text).splitlines() - leading_indent = len(real_lines[0].text) - len(dedented_lines[0]) - for line in real_lines: - line.leading_indent = leading_indent - - return result - - @cached_property - def scope(self) -> Optional[ast.AST]: - """ - The AST node of the innermost function, class or module being executed. - """ - if not self.source.tree or not self.executing.statements: - return None - - stmt = list(self.executing.statements)[0] - while True: - # Get the parent first in case the original statement is already - # a function definition, e.g. if we're calling a decorator - # In that case we still want the surrounding scope, not that function - stmt = stmt.parent - if isinstance(stmt, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef, ast.Module)): - return stmt - - @cached_property - def _pygmented_scope_lines(self) -> Optional[Tuple[int, List[str]]]: - # noinspection PyUnresolvedReferences - from pygments.formatters import HtmlFormatter - - formatter = self.options.pygments_formatter - scope = self.scope - assert_(formatter, ValueError("Must set a pygments formatter in Options")) - assert_(scope) - - if isinstance(formatter, HtmlFormatter): - formatter.nowrap = True - - atok = self.source.asttokens() - node = self.executing.node - if node and getattr(formatter.style, "for_executing_node", False): - scope_start = atok.get_text_range(scope)[0] - start, end = atok.get_text_range(node) - start -= scope_start - end -= scope_start - ranges = [(start, end)] - else: - ranges = [] - - code = atok.get_text(scope) - lines = _pygmented_with_ranges(formatter, code, ranges) - - start_line = line_range(scope)[0] - - return start_line, lines - - @cached_property - def variables(self) -> List[Variable]: - """ - All Variable objects whose nodes are contained within .scope - and whose values could be safely evaluated by pure_eval. - """ - if not self.scope: - return [] - - evaluator = Evaluator.from_frame(self.frame) - scope = self.scope - node_values = [ - pair - for pair in evaluator.find_expressions(scope) - if is_expression_interesting(*pair) - ] # type: List[Tuple[ast.AST, Any]] - - if isinstance(scope, (ast.FunctionDef, ast.AsyncFunctionDef)): - for node in ast.walk(scope.args): - if not isinstance(node, ast.arg): - continue - name = node.arg - try: - value = evaluator.names[name] - except KeyError: - pass - else: - node_values.append((node, value)) - - # Group equivalent nodes together - def get_text(n): - if isinstance(n, ast.arg): - return n.arg - else: - return self.source.asttokens().get_text(n) - - def normalise_node(n): - try: - # Add parens to avoid syntax errors for multiline expressions - return ast.parse('(' + get_text(n) + ')') - except Exception: - return n - - grouped = group_by_key_func( - node_values, - lambda nv: ast.dump(normalise_node(nv[0])), - ) - - result = [] - for group in grouped.values(): - nodes, values = zip(*group) - value = values[0] - text = get_text(nodes[0]) - if not text: - continue - result.append(Variable(text, nodes, value)) - - return result - - @cached_property - def variables_by_lineno(self) -> Mapping[int, List[Tuple[Variable, ast.AST]]]: - """ - A mapping from 1-based line numbers to lists of pairs: - - A Variable object - - A specific AST node from the variable's .nodes list that's - in the line at that line number. - """ - result = defaultdict(list) - for var in self.variables: - for node in var.nodes: - for lineno in range(*line_range(node)): - result[lineno].append((var, node)) - return result - - @cached_property - def variables_in_lines(self) -> List[Variable]: - """ - A list of Variable objects contained within the lines returned by .lines. - """ - return unique_in_order( - var - for line in self.lines - if isinstance(line, Line) - for var, node in self.variables_by_lineno[line.lineno] - ) - - @cached_property - def variables_in_executing_piece(self) -> List[Variable]: - """ - A list of Variable objects contained within the lines - in the range returned by .executing_piece. - """ - return unique_in_order( - var - for lineno in self.executing_piece - for var, node in self.variables_by_lineno[lineno] - ) |