diff options
author | shadchin <shadchin@yandex-team.ru> | 2022-02-10 16:44:30 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:44:30 +0300 |
commit | 2598ef1d0aee359b4b6d5fdd1758916d5907d04f (patch) | |
tree | 012bb94d777798f1f56ac1cec429509766d05181 /contrib/python/pytest/py3/_pytest/compat.py | |
parent | 6751af0b0c1b952fede40b19b71da8025b5d8bcf (diff) | |
download | ydb-2598ef1d0aee359b4b6d5fdd1758916d5907d04f.tar.gz |
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/compat.py')
-rw-r--r-- | contrib/python/pytest/py3/_pytest/compat.py | 484 |
1 files changed, 242 insertions, 242 deletions
diff --git a/contrib/python/pytest/py3/_pytest/compat.py b/contrib/python/pytest/py3/_pytest/compat.py index c23cc962ce..8c74996c1d 100644 --- a/contrib/python/pytest/py3/_pytest/compat.py +++ b/contrib/python/pytest/py3/_pytest/compat.py @@ -1,51 +1,51 @@ -"""Python version compatibility code.""" -import enum +"""Python version compatibility code.""" +import enum import functools import inspect import re import sys from contextlib import contextmanager -from inspect import Parameter -from inspect import signature -from pathlib import Path -from typing import Any -from typing import Callable -from typing import Generic -from typing import Optional -from typing import Tuple -from typing import TYPE_CHECKING -from typing import TypeVar -from typing import Union - -import attr +from inspect import Parameter +from inspect import signature +from pathlib import Path +from typing import Any +from typing import Callable +from typing import Generic +from typing import Optional +from typing import Tuple +from typing import TYPE_CHECKING +from typing import TypeVar +from typing import Union + +import attr from _pytest.outcomes import fail from _pytest.outcomes import TEST_OUTCOME -if TYPE_CHECKING: - from typing import NoReturn - from typing_extensions import Final +if TYPE_CHECKING: + from typing import NoReturn + from typing_extensions import Final -_T = TypeVar("_T") -_S = TypeVar("_S") +_T = TypeVar("_T") +_S = TypeVar("_S") -# fmt: off -# Singleton type for NOTSET, as described in: -# https://www.python.org/dev/peps/pep-0484/#support-for-singleton-types-in-unions -class NotSetType(enum.Enum): - token = 0 -NOTSET: "Final" = NotSetType.token # noqa: E305 -# fmt: on +# fmt: off +# Singleton type for NOTSET, as described in: +# https://www.python.org/dev/peps/pep-0484/#support-for-singleton-types-in-unions +class NotSetType(enum.Enum): + token = 0 +NOTSET: "Final" = NotSetType.token # noqa: E305 +# fmt: on -if sys.version_info >= (3, 8): - from importlib import metadata as importlib_metadata -else: - import importlib_metadata # noqa: F401 - - -def _format_args(func: Callable[..., Any]) -> str: +if sys.version_info >= (3, 8): + from importlib import metadata as importlib_metadata +else: + import importlib_metadata # noqa: F401 + + +def _format_args(func: Callable[..., Any]) -> str: return str(signature(func)) @@ -53,87 +53,87 @@ def _format_args(func: Callable[..., Any]) -> str: REGEX_TYPE = type(re.compile("")) -def is_generator(func: object) -> bool: +def is_generator(func: object) -> bool: genfunc = inspect.isgeneratorfunction(func) return genfunc and not iscoroutinefunction(func) -def iscoroutinefunction(func: object) -> bool: - """Return True if func is a coroutine function (a function defined with async - def syntax, and doesn't contain yield), or a function decorated with - @asyncio.coroutine. +def iscoroutinefunction(func: object) -> bool: + """Return True if func is a coroutine function (a function defined with async + def syntax, and doesn't contain yield), or a function decorated with + @asyncio.coroutine. - Note: copied and modified from Python 3.5's builtin couroutines.py to avoid - importing asyncio directly, which in turns also initializes the "logging" - module as a side-effect (see issue #8). + Note: copied and modified from Python 3.5's builtin couroutines.py to avoid + importing asyncio directly, which in turns also initializes the "logging" + module as a side-effect (see issue #8). """ - return inspect.iscoroutinefunction(func) or getattr(func, "_is_coroutine", False) - + return inspect.iscoroutinefunction(func) or getattr(func, "_is_coroutine", False) + + +def is_async_function(func: object) -> bool: + """Return True if the given function seems to be an async function or + an async generator.""" + return iscoroutinefunction(func) or inspect.isasyncgenfunction(func) -def is_async_function(func: object) -> bool: - """Return True if the given function seems to be an async function or - an async generator.""" - return iscoroutinefunction(func) or inspect.isasyncgenfunction(func) - -def getlocation(function, curdir: Optional[str] = None) -> str: +def getlocation(function, curdir: Optional[str] = None) -> str: function = get_real_func(function) - fn = Path(inspect.getfile(function)) + fn = Path(inspect.getfile(function)) lineno = function.__code__.co_firstlineno - if curdir is not None: - try: - relfn = fn.relative_to(curdir) - except ValueError: - pass - else: - return "%s:%d" % (relfn, lineno + 1) + if curdir is not None: + try: + relfn = fn.relative_to(curdir) + except ValueError: + pass + else: + return "%s:%d" % (relfn, lineno + 1) return "%s:%d" % (fn, lineno + 1) -def num_mock_patch_args(function) -> int: - """Return number of arguments used up by mock arguments (if any).""" +def num_mock_patch_args(function) -> int: + """Return number of arguments used up by mock arguments (if any).""" patchings = getattr(function, "patchings", None) if not patchings: return 0 - mock_sentinel = getattr(sys.modules.get("mock"), "DEFAULT", object()) - ut_mock_sentinel = getattr(sys.modules.get("unittest.mock"), "DEFAULT", object()) - - return len( - [ - p - for p in patchings - if not p.attribute_name - and (p.new is mock_sentinel or p.new is ut_mock_sentinel) - ] - ) - - -def getfuncargnames( - function: Callable[..., Any], - *, - name: str = "", - is_method: bool = False, - cls: Optional[type] = None, -) -> Tuple[str, ...]: - """Return the names of a function's mandatory arguments. - - Should return the names of all function arguments that: - * Aren't bound to an instance or type as in instance or class methods. - * Don't have default values. - * Aren't bound with functools.partial. - * Aren't replaced with mocks. + mock_sentinel = getattr(sys.modules.get("mock"), "DEFAULT", object()) + ut_mock_sentinel = getattr(sys.modules.get("unittest.mock"), "DEFAULT", object()) + + return len( + [ + p + for p in patchings + if not p.attribute_name + and (p.new is mock_sentinel or p.new is ut_mock_sentinel) + ] + ) + + +def getfuncargnames( + function: Callable[..., Any], + *, + name: str = "", + is_method: bool = False, + cls: Optional[type] = None, +) -> Tuple[str, ...]: + """Return the names of a function's mandatory arguments. + + Should return the names of all function arguments that: + * Aren't bound to an instance or type as in instance or class methods. + * Don't have default values. + * Aren't bound with functools.partial. + * Aren't replaced with mocks. The is_method and cls arguments indicate that the function should be treated as a bound method even though it's not unless, only in the case of cls, the function is a static method. - The name parameter should be the original name in which the function was collected. - """ - # TODO(RonnyPfannschmidt): This function should be refactored when we - # revisit fixtures. The fixture mechanism should ask the node for - # the fixture names, and not try to obtain directly from the - # function object well after collection has occurred. + The name parameter should be the original name in which the function was collected. + """ + # TODO(RonnyPfannschmidt): This function should be refactored when we + # revisit fixtures. The fixture mechanism should ask the node for + # the fixture names, and not try to obtain directly from the + # function object well after collection has occurred. # The parameters attribute of a Signature object contains an # ordered mapping of parameter names to Parameter instances. This @@ -143,7 +143,7 @@ def getfuncargnames( parameters = signature(function).parameters except (ValueError, TypeError) as e: fail( - f"Could not determine arguments of {function!r}: {e}", pytrace=False, + f"Could not determine arguments of {function!r}: {e}", pytrace=False, ) arg_names = tuple( @@ -155,14 +155,14 @@ def getfuncargnames( ) and p.default is Parameter.empty ) - if not name: - name = function.__name__ - + if not name: + name = function.__name__ + # If this function should be treated as a bound method even though # it's passed as an unbound method or function, remove the first # parameter name. if is_method or ( - cls and not isinstance(cls.__dict__.get(name, None), staticmethod) + cls and not isinstance(cls.__dict__.get(name, None), staticmethod) ): arg_names = arg_names[1:] # Remove any names that will be replaced with mocks. @@ -171,21 +171,21 @@ def getfuncargnames( return arg_names -if sys.version_info < (3, 7): +if sys.version_info < (3, 7): - @contextmanager - def nullcontext(): - yield + @contextmanager + def nullcontext(): + yield - -else: - from contextlib import nullcontext as nullcontext # noqa: F401 - - -def get_default_arg_names(function: Callable[..., Any]) -> Tuple[str, ...]: - # Note: this code intentionally mirrors the code at the beginning of - # getfuncargnames, to get the arguments which were excluded from its result - # because they had default values. + +else: + from contextlib import nullcontext as nullcontext # noqa: F401 + + +def get_default_arg_names(function: Callable[..., Any]) -> Tuple[str, ...]: + # Note: this code intentionally mirrors the code at the beginning of + # getfuncargnames, to get the arguments which were excluded from its result + # because they had default values. return tuple( p.name for p in signature(function).parameters.values() @@ -194,64 +194,64 @@ def get_default_arg_names(function: Callable[..., Any]) -> Tuple[str, ...]: ) -_non_printable_ascii_translate_table = { - i: f"\\x{i:02x}" for i in range(128) if i not in range(32, 127) -} -_non_printable_ascii_translate_table.update( - {ord("\t"): "\\t", ord("\r"): "\\r", ord("\n"): "\\n"} -) +_non_printable_ascii_translate_table = { + i: f"\\x{i:02x}" for i in range(128) if i not in range(32, 127) +} +_non_printable_ascii_translate_table.update( + {ord("\t"): "\\t", ord("\r"): "\\r", ord("\n"): "\\n"} +) + + +def _translate_non_printable(s: str) -> str: + return s.translate(_non_printable_ascii_translate_table) + + +STRING_TYPES = bytes, str -def _translate_non_printable(s: str) -> str: - return s.translate(_non_printable_ascii_translate_table) +def _bytes_to_ascii(val: bytes) -> str: + return val.decode("ascii", "backslashreplace") -STRING_TYPES = bytes, str +def ascii_escaped(val: Union[bytes, str]) -> str: + r"""If val is pure ASCII, return it as an str, otherwise, escape + bytes objects into a sequence of escaped bytes: + b'\xc3\xb4\xc5\xd6' -> r'\xc3\xb4\xc5\xd6' -def _bytes_to_ascii(val: bytes) -> str: - return val.decode("ascii", "backslashreplace") + and escapes unicode objects into a sequence of escaped unicode + ids, e.g.: + r'4\nV\U00043efa\x0eMXWB\x1e\u3028\u15fd\xcd\U0007d944' -def ascii_escaped(val: Union[bytes, str]) -> str: - r"""If val is pure ASCII, return it as an str, otherwise, escape - bytes objects into a sequence of escaped bytes: + Note: + The obvious "v.decode('unicode-escape')" will return + valid UTF-8 unicode if it finds them in bytes, but we + want to return escaped bytes for any byte, even if they match + a UTF-8 string. + """ + if isinstance(val, bytes): + ret = _bytes_to_ascii(val) + else: + ret = val + return ret - b'\xc3\xb4\xc5\xd6' -> r'\xc3\xb4\xc5\xd6' - and escapes unicode objects into a sequence of escaped unicode - ids, e.g.: - - r'4\nV\U00043efa\x0eMXWB\x1e\u3028\u15fd\xcd\U0007d944' - - Note: - The obvious "v.decode('unicode-escape')" will return - valid UTF-8 unicode if it finds them in bytes, but we - want to return escaped bytes for any byte, even if they match - a UTF-8 string. - """ - if isinstance(val, bytes): - ret = _bytes_to_ascii(val) - else: - ret = val - return ret - - -@attr.s -class _PytestWrapper: +@attr.s +class _PytestWrapper: """Dummy wrapper around a function object for internal use only. - Used to correctly unwrap the underlying function object when we are - creating fixtures, because we wrap the function object ourselves with a - decorator to issue warnings when the fixture function is called directly. + Used to correctly unwrap the underlying function object when we are + creating fixtures, because we wrap the function object ourselves with a + decorator to issue warnings when the fixture function is called directly. """ - obj = attr.ib() + obj = attr.ib() def get_real_func(obj): - """Get the real function object of the (possibly) wrapped object by - functools.wraps or functools.partial.""" + """Get the real function object of the (possibly) wrapped object by + functools.wraps or functools.partial.""" start_obj = obj for i in range(100): # __pytest_wrapped__ is set by @pytest.fixture when wrapping the fixture function @@ -266,11 +266,11 @@ def get_real_func(obj): break obj = new_obj else: - from _pytest._io.saferepr import saferepr - + from _pytest._io.saferepr import saferepr + raise ValueError( ("could not find real function of {start}\nstopped at {current}").format( - start=saferepr(start_obj), current=saferepr(obj) + start=saferepr(start_obj), current=saferepr(obj) ) ) if isinstance(obj, functools.partial): @@ -279,13 +279,13 @@ def get_real_func(obj): def get_real_method(obj, holder): - """Attempt to obtain the real function object that might be wrapping - ``obj``, while at the same time returning a bound method to ``holder`` if - the original object was a bound method.""" + """Attempt to obtain the real function object that might be wrapping + ``obj``, while at the same time returning a bound method to ``holder`` if + the original object was a bound method.""" try: is_method = hasattr(obj, "__func__") obj = get_real_func(obj) - except Exception: # pragma: no cover + except Exception: # pragma: no cover return obj if is_method and hasattr(obj, "__get__") and callable(obj.__get__): obj = obj.__get__(holder) @@ -299,14 +299,14 @@ def getimfunc(func): return func -def safe_getattr(object: Any, name: str, default: Any) -> Any: - """Like getattr but return default upon any Exception or any OutcomeException. +def safe_getattr(object: Any, name: str, default: Any) -> Any: + """Like getattr but return default upon any Exception or any OutcomeException. Attribute access can potentially fail for 'evil' Python objects. See issue #214. - It catches OutcomeException because of #2490 (issue #580), new outcomes - are derived from BaseException instead of Exception (for more details - check #2707). + It catches OutcomeException because of #2490 (issue #580), new outcomes + are derived from BaseException instead of Exception (for more details + check #2707). """ try: return getattr(object, name, default) @@ -314,87 +314,87 @@ def safe_getattr(object: Any, name: str, default: Any) -> Any: return default -def safe_isclass(obj: object) -> bool: +def safe_isclass(obj: object) -> bool: """Ignore any exception via isinstance on Python 3.""" try: - return inspect.isclass(obj) + return inspect.isclass(obj) except Exception: return False -if TYPE_CHECKING: - if sys.version_info >= (3, 8): - from typing import final as final - else: - from typing_extensions import final as final -elif sys.version_info >= (3, 8): - from typing import final as final -else: - - def final(f): - return f - - -if sys.version_info >= (3, 8): - from functools import cached_property as cached_property -else: - from typing import overload - from typing import Type - - class cached_property(Generic[_S, _T]): - __slots__ = ("func", "__doc__") - - def __init__(self, func: Callable[[_S], _T]) -> None: - self.func = func - self.__doc__ = func.__doc__ - - @overload - def __get__( - self, instance: None, owner: Optional[Type[_S]] = ... - ) -> "cached_property[_S, _T]": - ... - - @overload - def __get__(self, instance: _S, owner: Optional[Type[_S]] = ...) -> _T: - ... - - def __get__(self, instance, owner=None): - if instance is None: - return self - value = instance.__dict__[self.func.__name__] = self.func(instance) - return value - - -# Perform exhaustiveness checking. -# -# Consider this example: -# -# MyUnion = Union[int, str] -# -# def handle(x: MyUnion) -> int { -# if isinstance(x, int): -# return 1 -# elif isinstance(x, str): -# return 2 -# else: -# raise Exception('unreachable') -# -# Now suppose we add a new variant: -# -# MyUnion = Union[int, str, bytes] -# -# After doing this, we must remember ourselves to go and update the handle -# function to handle the new variant. -# -# With `assert_never` we can do better: -# -# // raise Exception('unreachable') -# return assert_never(x) -# -# Now, if we forget to handle the new variant, the type-checker will emit a -# compile-time error, instead of the runtime error we would have gotten -# previously. -# -# This also work for Enums (if you use `is` to compare) and Literals. -def assert_never(value: "NoReturn") -> "NoReturn": - assert False, "Unhandled value: {} ({})".format(value, type(value).__name__) +if TYPE_CHECKING: + if sys.version_info >= (3, 8): + from typing import final as final + else: + from typing_extensions import final as final +elif sys.version_info >= (3, 8): + from typing import final as final +else: + + def final(f): + return f + + +if sys.version_info >= (3, 8): + from functools import cached_property as cached_property +else: + from typing import overload + from typing import Type + + class cached_property(Generic[_S, _T]): + __slots__ = ("func", "__doc__") + + def __init__(self, func: Callable[[_S], _T]) -> None: + self.func = func + self.__doc__ = func.__doc__ + + @overload + def __get__( + self, instance: None, owner: Optional[Type[_S]] = ... + ) -> "cached_property[_S, _T]": + ... + + @overload + def __get__(self, instance: _S, owner: Optional[Type[_S]] = ...) -> _T: + ... + + def __get__(self, instance, owner=None): + if instance is None: + return self + value = instance.__dict__[self.func.__name__] = self.func(instance) + return value + + +# Perform exhaustiveness checking. +# +# Consider this example: +# +# MyUnion = Union[int, str] +# +# def handle(x: MyUnion) -> int { +# if isinstance(x, int): +# return 1 +# elif isinstance(x, str): +# return 2 +# else: +# raise Exception('unreachable') +# +# Now suppose we add a new variant: +# +# MyUnion = Union[int, str, bytes] +# +# After doing this, we must remember ourselves to go and update the handle +# function to handle the new variant. +# +# With `assert_never` we can do better: +# +# // raise Exception('unreachable') +# return assert_never(x) +# +# Now, if we forget to handle the new variant, the type-checker will emit a +# compile-time error, instead of the runtime error we would have gotten +# previously. +# +# This also work for Enums (if you use `is` to compare) and Literals. +def assert_never(value: "NoReturn") -> "NoReturn": + assert False, "Unhandled value: {} ({})".format(value, type(value).__name__) |