aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/pytest/py3/_pytest/compat.py
diff options
context:
space:
mode:
authorshadchin <shadchin@yandex-team.ru>2022-02-10 16:44:30 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:44:30 +0300
commit2598ef1d0aee359b4b6d5fdd1758916d5907d04f (patch)
tree012bb94d777798f1f56ac1cec429509766d05181 /contrib/python/pytest/py3/_pytest/compat.py
parent6751af0b0c1b952fede40b19b71da8025b5d8bcf (diff)
downloadydb-2598ef1d0aee359b4b6d5fdd1758916d5907d04f.tar.gz
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/compat.py')
-rw-r--r--contrib/python/pytest/py3/_pytest/compat.py484
1 files changed, 242 insertions, 242 deletions
diff --git a/contrib/python/pytest/py3/_pytest/compat.py b/contrib/python/pytest/py3/_pytest/compat.py
index c23cc962ce..8c74996c1d 100644
--- a/contrib/python/pytest/py3/_pytest/compat.py
+++ b/contrib/python/pytest/py3/_pytest/compat.py
@@ -1,51 +1,51 @@
-"""Python version compatibility code."""
-import enum
+"""Python version compatibility code."""
+import enum
import functools
import inspect
import re
import sys
from contextlib import contextmanager
-from inspect import Parameter
-from inspect import signature
-from pathlib import Path
-from typing import Any
-from typing import Callable
-from typing import Generic
-from typing import Optional
-from typing import Tuple
-from typing import TYPE_CHECKING
-from typing import TypeVar
-from typing import Union
-
-import attr
+from inspect import Parameter
+from inspect import signature
+from pathlib import Path
+from typing import Any
+from typing import Callable
+from typing import Generic
+from typing import Optional
+from typing import Tuple
+from typing import TYPE_CHECKING
+from typing import TypeVar
+from typing import Union
+
+import attr
from _pytest.outcomes import fail
from _pytest.outcomes import TEST_OUTCOME
-if TYPE_CHECKING:
- from typing import NoReturn
- from typing_extensions import Final
+if TYPE_CHECKING:
+ from typing import NoReturn
+ from typing_extensions import Final
-_T = TypeVar("_T")
-_S = TypeVar("_S")
+_T = TypeVar("_T")
+_S = TypeVar("_S")
-# fmt: off
-# Singleton type for NOTSET, as described in:
-# https://www.python.org/dev/peps/pep-0484/#support-for-singleton-types-in-unions
-class NotSetType(enum.Enum):
- token = 0
-NOTSET: "Final" = NotSetType.token # noqa: E305
-# fmt: on
+# fmt: off
+# Singleton type for NOTSET, as described in:
+# https://www.python.org/dev/peps/pep-0484/#support-for-singleton-types-in-unions
+class NotSetType(enum.Enum):
+ token = 0
+NOTSET: "Final" = NotSetType.token # noqa: E305
+# fmt: on
-if sys.version_info >= (3, 8):
- from importlib import metadata as importlib_metadata
-else:
- import importlib_metadata # noqa: F401
-
-
-def _format_args(func: Callable[..., Any]) -> str:
+if sys.version_info >= (3, 8):
+ from importlib import metadata as importlib_metadata
+else:
+ import importlib_metadata # noqa: F401
+
+
+def _format_args(func: Callable[..., Any]) -> str:
return str(signature(func))
@@ -53,87 +53,87 @@ def _format_args(func: Callable[..., Any]) -> str:
REGEX_TYPE = type(re.compile(""))
-def is_generator(func: object) -> bool:
+def is_generator(func: object) -> bool:
genfunc = inspect.isgeneratorfunction(func)
return genfunc and not iscoroutinefunction(func)
-def iscoroutinefunction(func: object) -> bool:
- """Return True if func is a coroutine function (a function defined with async
- def syntax, and doesn't contain yield), or a function decorated with
- @asyncio.coroutine.
+def iscoroutinefunction(func: object) -> bool:
+ """Return True if func is a coroutine function (a function defined with async
+ def syntax, and doesn't contain yield), or a function decorated with
+ @asyncio.coroutine.
- Note: copied and modified from Python 3.5's builtin couroutines.py to avoid
- importing asyncio directly, which in turns also initializes the "logging"
- module as a side-effect (see issue #8).
+ Note: copied and modified from Python 3.5's builtin couroutines.py to avoid
+ importing asyncio directly, which in turns also initializes the "logging"
+ module as a side-effect (see issue #8).
"""
- return inspect.iscoroutinefunction(func) or getattr(func, "_is_coroutine", False)
-
+ return inspect.iscoroutinefunction(func) or getattr(func, "_is_coroutine", False)
+
+
+def is_async_function(func: object) -> bool:
+ """Return True if the given function seems to be an async function or
+ an async generator."""
+ return iscoroutinefunction(func) or inspect.isasyncgenfunction(func)
-def is_async_function(func: object) -> bool:
- """Return True if the given function seems to be an async function or
- an async generator."""
- return iscoroutinefunction(func) or inspect.isasyncgenfunction(func)
-
-def getlocation(function, curdir: Optional[str] = None) -> str:
+def getlocation(function, curdir: Optional[str] = None) -> str:
function = get_real_func(function)
- fn = Path(inspect.getfile(function))
+ fn = Path(inspect.getfile(function))
lineno = function.__code__.co_firstlineno
- if curdir is not None:
- try:
- relfn = fn.relative_to(curdir)
- except ValueError:
- pass
- else:
- return "%s:%d" % (relfn, lineno + 1)
+ if curdir is not None:
+ try:
+ relfn = fn.relative_to(curdir)
+ except ValueError:
+ pass
+ else:
+ return "%s:%d" % (relfn, lineno + 1)
return "%s:%d" % (fn, lineno + 1)
-def num_mock_patch_args(function) -> int:
- """Return number of arguments used up by mock arguments (if any)."""
+def num_mock_patch_args(function) -> int:
+ """Return number of arguments used up by mock arguments (if any)."""
patchings = getattr(function, "patchings", None)
if not patchings:
return 0
- mock_sentinel = getattr(sys.modules.get("mock"), "DEFAULT", object())
- ut_mock_sentinel = getattr(sys.modules.get("unittest.mock"), "DEFAULT", object())
-
- return len(
- [
- p
- for p in patchings
- if not p.attribute_name
- and (p.new is mock_sentinel or p.new is ut_mock_sentinel)
- ]
- )
-
-
-def getfuncargnames(
- function: Callable[..., Any],
- *,
- name: str = "",
- is_method: bool = False,
- cls: Optional[type] = None,
-) -> Tuple[str, ...]:
- """Return the names of a function's mandatory arguments.
-
- Should return the names of all function arguments that:
- * Aren't bound to an instance or type as in instance or class methods.
- * Don't have default values.
- * Aren't bound with functools.partial.
- * Aren't replaced with mocks.
+ mock_sentinel = getattr(sys.modules.get("mock"), "DEFAULT", object())
+ ut_mock_sentinel = getattr(sys.modules.get("unittest.mock"), "DEFAULT", object())
+
+ return len(
+ [
+ p
+ for p in patchings
+ if not p.attribute_name
+ and (p.new is mock_sentinel or p.new is ut_mock_sentinel)
+ ]
+ )
+
+
+def getfuncargnames(
+ function: Callable[..., Any],
+ *,
+ name: str = "",
+ is_method: bool = False,
+ cls: Optional[type] = None,
+) -> Tuple[str, ...]:
+ """Return the names of a function's mandatory arguments.
+
+ Should return the names of all function arguments that:
+ * Aren't bound to an instance or type as in instance or class methods.
+ * Don't have default values.
+ * Aren't bound with functools.partial.
+ * Aren't replaced with mocks.
The is_method and cls arguments indicate that the function should
be treated as a bound method even though it's not unless, only in
the case of cls, the function is a static method.
- The name parameter should be the original name in which the function was collected.
- """
- # TODO(RonnyPfannschmidt): This function should be refactored when we
- # revisit fixtures. The fixture mechanism should ask the node for
- # the fixture names, and not try to obtain directly from the
- # function object well after collection has occurred.
+ The name parameter should be the original name in which the function was collected.
+ """
+ # TODO(RonnyPfannschmidt): This function should be refactored when we
+ # revisit fixtures. The fixture mechanism should ask the node for
+ # the fixture names, and not try to obtain directly from the
+ # function object well after collection has occurred.
# The parameters attribute of a Signature object contains an
# ordered mapping of parameter names to Parameter instances. This
@@ -143,7 +143,7 @@ def getfuncargnames(
parameters = signature(function).parameters
except (ValueError, TypeError) as e:
fail(
- f"Could not determine arguments of {function!r}: {e}", pytrace=False,
+ f"Could not determine arguments of {function!r}: {e}", pytrace=False,
)
arg_names = tuple(
@@ -155,14 +155,14 @@ def getfuncargnames(
)
and p.default is Parameter.empty
)
- if not name:
- name = function.__name__
-
+ if not name:
+ name = function.__name__
+
# If this function should be treated as a bound method even though
# it's passed as an unbound method or function, remove the first
# parameter name.
if is_method or (
- cls and not isinstance(cls.__dict__.get(name, None), staticmethod)
+ cls and not isinstance(cls.__dict__.get(name, None), staticmethod)
):
arg_names = arg_names[1:]
# Remove any names that will be replaced with mocks.
@@ -171,21 +171,21 @@ def getfuncargnames(
return arg_names
-if sys.version_info < (3, 7):
+if sys.version_info < (3, 7):
- @contextmanager
- def nullcontext():
- yield
+ @contextmanager
+ def nullcontext():
+ yield
-
-else:
- from contextlib import nullcontext as nullcontext # noqa: F401
-
-
-def get_default_arg_names(function: Callable[..., Any]) -> Tuple[str, ...]:
- # Note: this code intentionally mirrors the code at the beginning of
- # getfuncargnames, to get the arguments which were excluded from its result
- # because they had default values.
+
+else:
+ from contextlib import nullcontext as nullcontext # noqa: F401
+
+
+def get_default_arg_names(function: Callable[..., Any]) -> Tuple[str, ...]:
+ # Note: this code intentionally mirrors the code at the beginning of
+ # getfuncargnames, to get the arguments which were excluded from its result
+ # because they had default values.
return tuple(
p.name
for p in signature(function).parameters.values()
@@ -194,64 +194,64 @@ def get_default_arg_names(function: Callable[..., Any]) -> Tuple[str, ...]:
)
-_non_printable_ascii_translate_table = {
- i: f"\\x{i:02x}" for i in range(128) if i not in range(32, 127)
-}
-_non_printable_ascii_translate_table.update(
- {ord("\t"): "\\t", ord("\r"): "\\r", ord("\n"): "\\n"}
-)
+_non_printable_ascii_translate_table = {
+ i: f"\\x{i:02x}" for i in range(128) if i not in range(32, 127)
+}
+_non_printable_ascii_translate_table.update(
+ {ord("\t"): "\\t", ord("\r"): "\\r", ord("\n"): "\\n"}
+)
+
+
+def _translate_non_printable(s: str) -> str:
+ return s.translate(_non_printable_ascii_translate_table)
+
+
+STRING_TYPES = bytes, str
-def _translate_non_printable(s: str) -> str:
- return s.translate(_non_printable_ascii_translate_table)
+def _bytes_to_ascii(val: bytes) -> str:
+ return val.decode("ascii", "backslashreplace")
-STRING_TYPES = bytes, str
+def ascii_escaped(val: Union[bytes, str]) -> str:
+ r"""If val is pure ASCII, return it as an str, otherwise, escape
+ bytes objects into a sequence of escaped bytes:
+ b'\xc3\xb4\xc5\xd6' -> r'\xc3\xb4\xc5\xd6'
-def _bytes_to_ascii(val: bytes) -> str:
- return val.decode("ascii", "backslashreplace")
+ and escapes unicode objects into a sequence of escaped unicode
+ ids, e.g.:
+ r'4\nV\U00043efa\x0eMXWB\x1e\u3028\u15fd\xcd\U0007d944'
-def ascii_escaped(val: Union[bytes, str]) -> str:
- r"""If val is pure ASCII, return it as an str, otherwise, escape
- bytes objects into a sequence of escaped bytes:
+ Note:
+ The obvious "v.decode('unicode-escape')" will return
+ valid UTF-8 unicode if it finds them in bytes, but we
+ want to return escaped bytes for any byte, even if they match
+ a UTF-8 string.
+ """
+ if isinstance(val, bytes):
+ ret = _bytes_to_ascii(val)
+ else:
+ ret = val
+ return ret
- b'\xc3\xb4\xc5\xd6' -> r'\xc3\xb4\xc5\xd6'
- and escapes unicode objects into a sequence of escaped unicode
- ids, e.g.:
-
- r'4\nV\U00043efa\x0eMXWB\x1e\u3028\u15fd\xcd\U0007d944'
-
- Note:
- The obvious "v.decode('unicode-escape')" will return
- valid UTF-8 unicode if it finds them in bytes, but we
- want to return escaped bytes for any byte, even if they match
- a UTF-8 string.
- """
- if isinstance(val, bytes):
- ret = _bytes_to_ascii(val)
- else:
- ret = val
- return ret
-
-
-@attr.s
-class _PytestWrapper:
+@attr.s
+class _PytestWrapper:
"""Dummy wrapper around a function object for internal use only.
- Used to correctly unwrap the underlying function object when we are
- creating fixtures, because we wrap the function object ourselves with a
- decorator to issue warnings when the fixture function is called directly.
+ Used to correctly unwrap the underlying function object when we are
+ creating fixtures, because we wrap the function object ourselves with a
+ decorator to issue warnings when the fixture function is called directly.
"""
- obj = attr.ib()
+ obj = attr.ib()
def get_real_func(obj):
- """Get the real function object of the (possibly) wrapped object by
- functools.wraps or functools.partial."""
+ """Get the real function object of the (possibly) wrapped object by
+ functools.wraps or functools.partial."""
start_obj = obj
for i in range(100):
# __pytest_wrapped__ is set by @pytest.fixture when wrapping the fixture function
@@ -266,11 +266,11 @@ def get_real_func(obj):
break
obj = new_obj
else:
- from _pytest._io.saferepr import saferepr
-
+ from _pytest._io.saferepr import saferepr
+
raise ValueError(
("could not find real function of {start}\nstopped at {current}").format(
- start=saferepr(start_obj), current=saferepr(obj)
+ start=saferepr(start_obj), current=saferepr(obj)
)
)
if isinstance(obj, functools.partial):
@@ -279,13 +279,13 @@ def get_real_func(obj):
def get_real_method(obj, holder):
- """Attempt to obtain the real function object that might be wrapping
- ``obj``, while at the same time returning a bound method to ``holder`` if
- the original object was a bound method."""
+ """Attempt to obtain the real function object that might be wrapping
+ ``obj``, while at the same time returning a bound method to ``holder`` if
+ the original object was a bound method."""
try:
is_method = hasattr(obj, "__func__")
obj = get_real_func(obj)
- except Exception: # pragma: no cover
+ except Exception: # pragma: no cover
return obj
if is_method and hasattr(obj, "__get__") and callable(obj.__get__):
obj = obj.__get__(holder)
@@ -299,14 +299,14 @@ def getimfunc(func):
return func
-def safe_getattr(object: Any, name: str, default: Any) -> Any:
- """Like getattr but return default upon any Exception or any OutcomeException.
+def safe_getattr(object: Any, name: str, default: Any) -> Any:
+ """Like getattr but return default upon any Exception or any OutcomeException.
Attribute access can potentially fail for 'evil' Python objects.
See issue #214.
- It catches OutcomeException because of #2490 (issue #580), new outcomes
- are derived from BaseException instead of Exception (for more details
- check #2707).
+ It catches OutcomeException because of #2490 (issue #580), new outcomes
+ are derived from BaseException instead of Exception (for more details
+ check #2707).
"""
try:
return getattr(object, name, default)
@@ -314,87 +314,87 @@ def safe_getattr(object: Any, name: str, default: Any) -> Any:
return default
-def safe_isclass(obj: object) -> bool:
+def safe_isclass(obj: object) -> bool:
"""Ignore any exception via isinstance on Python 3."""
try:
- return inspect.isclass(obj)
+ return inspect.isclass(obj)
except Exception:
return False
-if TYPE_CHECKING:
- if sys.version_info >= (3, 8):
- from typing import final as final
- else:
- from typing_extensions import final as final
-elif sys.version_info >= (3, 8):
- from typing import final as final
-else:
-
- def final(f):
- return f
-
-
-if sys.version_info >= (3, 8):
- from functools import cached_property as cached_property
-else:
- from typing import overload
- from typing import Type
-
- class cached_property(Generic[_S, _T]):
- __slots__ = ("func", "__doc__")
-
- def __init__(self, func: Callable[[_S], _T]) -> None:
- self.func = func
- self.__doc__ = func.__doc__
-
- @overload
- def __get__(
- self, instance: None, owner: Optional[Type[_S]] = ...
- ) -> "cached_property[_S, _T]":
- ...
-
- @overload
- def __get__(self, instance: _S, owner: Optional[Type[_S]] = ...) -> _T:
- ...
-
- def __get__(self, instance, owner=None):
- if instance is None:
- return self
- value = instance.__dict__[self.func.__name__] = self.func(instance)
- return value
-
-
-# Perform exhaustiveness checking.
-#
-# Consider this example:
-#
-# MyUnion = Union[int, str]
-#
-# def handle(x: MyUnion) -> int {
-# if isinstance(x, int):
-# return 1
-# elif isinstance(x, str):
-# return 2
-# else:
-# raise Exception('unreachable')
-#
-# Now suppose we add a new variant:
-#
-# MyUnion = Union[int, str, bytes]
-#
-# After doing this, we must remember ourselves to go and update the handle
-# function to handle the new variant.
-#
-# With `assert_never` we can do better:
-#
-# // raise Exception('unreachable')
-# return assert_never(x)
-#
-# Now, if we forget to handle the new variant, the type-checker will emit a
-# compile-time error, instead of the runtime error we would have gotten
-# previously.
-#
-# This also work for Enums (if you use `is` to compare) and Literals.
-def assert_never(value: "NoReturn") -> "NoReturn":
- assert False, "Unhandled value: {} ({})".format(value, type(value).__name__)
+if TYPE_CHECKING:
+ if sys.version_info >= (3, 8):
+ from typing import final as final
+ else:
+ from typing_extensions import final as final
+elif sys.version_info >= (3, 8):
+ from typing import final as final
+else:
+
+ def final(f):
+ return f
+
+
+if sys.version_info >= (3, 8):
+ from functools import cached_property as cached_property
+else:
+ from typing import overload
+ from typing import Type
+
+ class cached_property(Generic[_S, _T]):
+ __slots__ = ("func", "__doc__")
+
+ def __init__(self, func: Callable[[_S], _T]) -> None:
+ self.func = func
+ self.__doc__ = func.__doc__
+
+ @overload
+ def __get__(
+ self, instance: None, owner: Optional[Type[_S]] = ...
+ ) -> "cached_property[_S, _T]":
+ ...
+
+ @overload
+ def __get__(self, instance: _S, owner: Optional[Type[_S]] = ...) -> _T:
+ ...
+
+ def __get__(self, instance, owner=None):
+ if instance is None:
+ return self
+ value = instance.__dict__[self.func.__name__] = self.func(instance)
+ return value
+
+
+# Perform exhaustiveness checking.
+#
+# Consider this example:
+#
+# MyUnion = Union[int, str]
+#
+# def handle(x: MyUnion) -> int {
+# if isinstance(x, int):
+# return 1
+# elif isinstance(x, str):
+# return 2
+# else:
+# raise Exception('unreachable')
+#
+# Now suppose we add a new variant:
+#
+# MyUnion = Union[int, str, bytes]
+#
+# After doing this, we must remember ourselves to go and update the handle
+# function to handle the new variant.
+#
+# With `assert_never` we can do better:
+#
+# // raise Exception('unreachable')
+# return assert_never(x)
+#
+# Now, if we forget to handle the new variant, the type-checker will emit a
+# compile-time error, instead of the runtime error we would have gotten
+# previously.
+#
+# This also work for Enums (if you use `is` to compare) and Literals.
+def assert_never(value: "NoReturn") -> "NoReturn":
+ assert False, "Unhandled value: {} ({})".format(value, type(value).__name__)