diff options
author | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
---|---|---|
committer | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
commit | 06e5c21a835c0e923506c4ff27929f34e00761c2 (patch) | |
tree | 75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /contrib/python/stack-data/stack_data/utils.py | |
parent | 03f024c4412e3aa613bb543cf1660176320ba8f4 (diff) | |
download | ydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz |
fix ya.make
Diffstat (limited to 'contrib/python/stack-data/stack_data/utils.py')
-rw-r--r-- | contrib/python/stack-data/stack_data/utils.py | 172 |
1 files changed, 0 insertions, 172 deletions
diff --git a/contrib/python/stack-data/stack_data/utils.py b/contrib/python/stack-data/stack_data/utils.py deleted file mode 100644 index 71d55eadc1..0000000000 --- a/contrib/python/stack-data/stack_data/utils.py +++ /dev/null @@ -1,172 +0,0 @@ -import ast -import itertools -import types -from collections import OrderedDict, Counter, defaultdict -from types import FrameType, TracebackType -from typing import ( - Iterator, List, Tuple, Iterable, Callable, Union, - TypeVar, Mapping, -) - -T = TypeVar('T') -R = TypeVar('R') - - -def truncate(seq, max_length: int, middle): - if len(seq) > max_length: - right = (max_length - len(middle)) // 2 - left = max_length - len(middle) - right - seq = seq[:left] + middle + seq[-right:] - return seq - - -def unique_in_order(it: Iterable[T]) -> List[T]: - return list(OrderedDict.fromkeys(it)) - - -def line_range(node: ast.AST) -> Tuple[int, int]: - """ - Returns a pair of numbers representing a half open range - (i.e. suitable as arguments to the `range()` builtin) - of line numbers of the given AST nodes. - """ - try: - return ( - node.first_token.start[0], - node.last_token.end[0] + 1, - ) - except AttributeError: - return ( - node.lineno, - getattr(node, "end_lineno", node.lineno) + 1, - ) - - -def highlight_unique(lst: List[T]) -> Iterator[Tuple[T, bool]]: - counts = Counter(lst) - - for is_common, group in itertools.groupby(lst, key=lambda x: counts[x] > 3): - if is_common: - group = list(group) - highlighted = [False] * len(group) - - def highlight_index(f): - try: - i = f() - except ValueError: - return None - highlighted[i] = True - return i - - for item in set(group): - first = highlight_index(lambda: group.index(item)) - if first is not None: - highlight_index(lambda: group.index(item, first + 1)) - highlight_index(lambda: -1 - group[::-1].index(item)) - else: - highlighted = itertools.repeat(True) - - yield from zip(group, highlighted) - - -def identity(x: T) -> T: - return x - - -def collapse_repeated(lst, *, collapser, mapper=identity, key=identity): - keyed = list(map(key, lst)) - for is_highlighted, group in itertools.groupby( - zip(lst, highlight_unique(keyed)), - key=lambda t: t[1][1], - ): - original_group, highlighted_group = zip(*group) - if is_highlighted: - yield from map(mapper, original_group) - else: - keyed_group, _ = zip(*highlighted_group) - yield collapser(list(original_group), list(keyed_group)) - - -def is_frame(frame_or_tb: Union[FrameType, TracebackType]) -> bool: - assert_(isinstance(frame_or_tb, (types.FrameType, types.TracebackType))) - return isinstance(frame_or_tb, (types.FrameType,)) - - -def iter_stack(frame_or_tb: Union[FrameType, TracebackType]) -> Iterator[Union[FrameType, TracebackType]]: - while frame_or_tb: - yield frame_or_tb - if is_frame(frame_or_tb): - frame_or_tb = frame_or_tb.f_back - else: - frame_or_tb = frame_or_tb.tb_next - - -def frame_and_lineno(frame_or_tb: Union[FrameType, TracebackType]) -> Tuple[FrameType, int]: - if is_frame(frame_or_tb): - return frame_or_tb, frame_or_tb.f_lineno - else: - return frame_or_tb.tb_frame, frame_or_tb.tb_lineno - - -def group_by_key_func(iterable: Iterable[T], key_func: Callable[[T], R]) -> Mapping[R, List[T]]: - # noinspection PyUnresolvedReferences - """ - Create a dictionary from an iterable such that the keys are the result of evaluating a key function on elements - of the iterable and the values are lists of elements all of which correspond to the key. - - >>> def si(d): return sorted(d.items()) - >>> si(group_by_key_func("a bb ccc d ee fff".split(), len)) - [(1, ['a', 'd']), (2, ['bb', 'ee']), (3, ['ccc', 'fff'])] - >>> si(group_by_key_func([-1, 0, 1, 3, 6, 8, 9, 2], lambda x: x % 2)) - [(0, [0, 6, 8, 2]), (1, [-1, 1, 3, 9])] - """ - result = defaultdict(list) - for item in iterable: - result[key_func(item)].append(item) - return result - - -class cached_property(object): - """ - A property that is only computed once per instance and then replaces itself - with an ordinary attribute. Deleting the attribute resets the property. - - Based on https://github.com/pydanny/cached-property/blob/master/cached_property.py - """ - - def __init__(self, func): - self.__doc__ = func.__doc__ - self.func = func - - def cached_property_wrapper(self, obj, _cls): - if obj is None: - return self - - value = obj.__dict__[self.func.__name__] = self.func(obj) - return value - - __get__ = cached_property_wrapper - - -def _pygmented_with_ranges(formatter, code, ranges): - import pygments - from pygments.lexers import get_lexer_by_name - - class MyLexer(type(get_lexer_by_name("python3"))): - def get_tokens(self, text): - length = 0 - for ttype, value in super().get_tokens(text): - if any(start <= length < end for start, end in ranges): - ttype = ttype.ExecutingNode - length += len(value) - yield ttype, value - - lexer = MyLexer(stripnl=False) - return pygments.highlight(code, lexer, formatter).splitlines() - - -def assert_(condition, error=""): - if not condition: - if isinstance(error, str): - error = AssertionError(error) - raise error |