From e0e3e1717e3d33762ce61950504f9637a6e669ed Mon Sep 17 00:00:00 2001
From: nkozlovskiy <nmk@ydb.tech>
Date: Fri, 29 Sep 2023 12:24:06 +0300
Subject: add ydb deps

---
 .../python3/src/Lib/asyncio/format_helpers.py      | 76 ++++++++++++++++++++++
 1 file changed, 76 insertions(+)
 create mode 100644 contrib/tools/python3/src/Lib/asyncio/format_helpers.py

(limited to 'contrib/tools/python3/src/Lib/asyncio/format_helpers.py')

diff --git a/contrib/tools/python3/src/Lib/asyncio/format_helpers.py b/contrib/tools/python3/src/Lib/asyncio/format_helpers.py
new file mode 100644
index 0000000000..27d11fd4fa
--- /dev/null
+++ b/contrib/tools/python3/src/Lib/asyncio/format_helpers.py
@@ -0,0 +1,76 @@
+import functools
+import inspect
+import reprlib
+import sys
+import traceback
+
+from . import constants
+
+
+def _get_function_source(func):
+    func = inspect.unwrap(func)
+    if inspect.isfunction(func):
+        code = func.__code__
+        return (code.co_filename, code.co_firstlineno)
+    if isinstance(func, functools.partial):
+        return _get_function_source(func.func)
+    if isinstance(func, functools.partialmethod):
+        return _get_function_source(func.func)
+    return None
+
+
+def _format_callback_source(func, args):
+    func_repr = _format_callback(func, args, None)
+    source = _get_function_source(func)
+    if source:
+        func_repr += f' at {source[0]}:{source[1]}'
+    return func_repr
+
+
+def _format_args_and_kwargs(args, kwargs):
+    """Format function arguments and keyword arguments.
+
+    Special case for a single parameter: ('hello',) is formatted as ('hello').
+    """
+    # use reprlib to limit the length of the output
+    items = []
+    if args:
+        items.extend(reprlib.repr(arg) for arg in args)
+    if kwargs:
+        items.extend(f'{k}={reprlib.repr(v)}' for k, v in kwargs.items())
+    return '({})'.format(', '.join(items))
+
+
+def _format_callback(func, args, kwargs, suffix=''):
+    if isinstance(func, functools.partial):
+        suffix = _format_args_and_kwargs(args, kwargs) + suffix
+        return _format_callback(func.func, func.args, func.keywords, suffix)
+
+    if hasattr(func, '__qualname__') and func.__qualname__:
+        func_repr = func.__qualname__
+    elif hasattr(func, '__name__') and func.__name__:
+        func_repr = func.__name__
+    else:
+        func_repr = repr(func)
+
+    func_repr += _format_args_and_kwargs(args, kwargs)
+    if suffix:
+        func_repr += suffix
+    return func_repr
+
+
+def extract_stack(f=None, limit=None):
+    """Replacement for traceback.extract_stack() that only does the
+    necessary work for asyncio debug mode.
+    """
+    if f is None:
+        f = sys._getframe().f_back
+    if limit is None:
+        # Limit the amount of work to a reasonable amount, as extract_stack()
+        # can be called for each coroutine and future in debug mode.
+        limit = constants.DEBUG_STACK_DEPTH
+    stack = traceback.StackSummary.extract(traceback.walk_stack(f),
+                                           limit=limit,
+                                           lookup_lines=False)
+    stack.reverse()
+    return stack
-- 
cgit v1.2.3