summaryrefslogtreecommitdiffstats
path: root/contrib/python/typeguard
diff options
context:
space:
mode:
authorrobot-piglet <[email protected]>2025-01-13 22:57:12 +0300
committerrobot-piglet <[email protected]>2025-01-13 23:12:28 +0300
commitf35fe00dcc2af8b9605460b0eba29304694c7d22 (patch)
tree5a70bd13e7037e2fdbabc25b49e4b9ef7af5d106 /contrib/python/typeguard
parentd952d8354362c04f11ca60d229dcaf0709fab9da (diff)
Intermediate changes
commit_hash:d3483365e53236dc94c949b30c45470fa72387a7
Diffstat (limited to 'contrib/python/typeguard')
-rw-r--r--contrib/python/typeguard/.dist-info/METADATA77
-rw-r--r--contrib/python/typeguard/.dist-info/entry_points.txt3
-rw-r--r--contrib/python/typeguard/.dist-info/top_level.txt1
-rw-r--r--contrib/python/typeguard/LICENSE19
-rw-r--r--contrib/python/typeguard/README.rst41
-rw-r--r--contrib/python/typeguard/patches/01-fix-tests.patch42
-rw-r--r--contrib/python/typeguard/patches/02-support-new-typing-extensions.patch16
-rw-r--r--contrib/python/typeguard/patches/03-support-python-3.12.patch15
-rw-r--r--contrib/python/typeguard/patches/04-support-python-3.12.4.patch11
-rw-r--r--contrib/python/typeguard/tests/conftest.py12
-rw-r--r--contrib/python/typeguard/tests/dummymodule.py92
-rw-r--r--contrib/python/typeguard/tests/mypy/negative.py53
-rw-r--r--contrib/python/typeguard/tests/mypy/positive.py56
-rw-r--r--contrib/python/typeguard/tests/mypy/test_type_annotations.py114
-rw-r--r--contrib/python/typeguard/tests/test_importhook.py125
-rw-r--r--contrib/python/typeguard/tests/test_typeguard.py1548
-rw-r--r--contrib/python/typeguard/tests/test_typeguard_py36.py189
-rw-r--r--contrib/python/typeguard/tests/ya.make22
-rw-r--r--contrib/python/typeguard/typeguard/__init__.py1258
-rw-r--r--contrib/python/typeguard/typeguard/importhook.py162
-rw-r--r--contrib/python/typeguard/typeguard/py.typed0
-rw-r--r--contrib/python/typeguard/typeguard/pytest_plugin.py30
-rw-r--r--contrib/python/typeguard/ya.make30
23 files changed, 3916 insertions, 0 deletions
diff --git a/contrib/python/typeguard/.dist-info/METADATA b/contrib/python/typeguard/.dist-info/METADATA
new file mode 100644
index 00000000000..6ca2152dd5a
--- /dev/null
+++ b/contrib/python/typeguard/.dist-info/METADATA
@@ -0,0 +1,77 @@
+Metadata-Version: 2.1
+Name: typeguard
+Version: 2.13.3
+Summary: Run-time type checker for Python
+Home-page: UNKNOWN
+Author: Alex Grönholm
+Author-email: [email protected]
+License: MIT
+Project-URL: Documentation, https://typeguard.readthedocs.io/en/latest/
+Project-URL: Change log, https://typeguard.readthedocs.io/en/latest/versionhistory.html
+Project-URL: Source code, https://github.com/agronholm/typeguard
+Project-URL: Issue tracker, https://github.com/agronholm/typeguard/issues
+Platform: UNKNOWN
+Classifier: Development Status :: 5 - Production/Stable
+Classifier: Intended Audience :: Developers
+Classifier: License :: OSI Approved :: MIT License
+Classifier: Programming Language :: Python
+Classifier: Programming Language :: Python :: 3
+Classifier: Programming Language :: Python :: 3.5
+Classifier: Programming Language :: Python :: 3.6
+Classifier: Programming Language :: Python :: 3.7
+Classifier: Programming Language :: Python :: 3.8
+Classifier: Programming Language :: Python :: 3.9
+Classifier: Programming Language :: Python :: 3.10
+Requires-Python: >=3.5.3
+License-File: LICENSE
+Provides-Extra: doc
+Requires-Dist: sphinx-rtd-theme ; extra == 'doc'
+Requires-Dist: sphinx-autodoc-typehints (>=1.2.0) ; extra == 'doc'
+Provides-Extra: test
+Requires-Dist: pytest ; extra == 'test'
+Requires-Dist: typing-extensions ; extra == 'test'
+Requires-Dist: mypy ; (platform_python_implementation != "PyPy") and extra == 'test'
+
+.. image:: https://travis-ci.com/agronholm/typeguard.svg?branch=master
+ :target: https://travis-ci.com/agronholm/typeguard
+ :alt: Build Status
+.. image:: https://coveralls.io/repos/agronholm/typeguard/badge.svg?branch=master&service=github
+ :target: https://coveralls.io/github/agronholm/typeguard?branch=master
+ :alt: Code Coverage
+.. image:: https://readthedocs.org/projects/typeguard/badge/?version=latest
+ :target: https://typeguard.readthedocs.io/en/latest/?badge=latest
+
+This library provides run-time type checking for functions defined with
+`PEP 484 <https://www.python.org/dev/peps/pep-0484/>`_ argument (and return) type annotations.
+
+Four principal ways to do type checking are provided, each with its pros and cons:
+
+#. the ``check_argument_types()`` and ``check_return_type()`` functions:
+
+ * debugger friendly (except when running with the pydev debugger with the C extension installed)
+ * does not work reliably with dynamically defined type hints (e.g. in nested functions)
+#. the ``@typechecked`` decorator:
+
+ * automatically type checks yields and sends of returned generators (regular and async)
+ * adds an extra frame to the call stack for every call to a decorated function
+#. the stack profiler hook (``with TypeChecker('packagename'):``) (deprecated):
+
+ * emits warnings instead of raising ``TypeError``
+ * requires very few modifications to the code
+ * multiple TypeCheckers can be stacked/nested
+ * does not work reliably with dynamically defined type hints (e.g. in nested functions)
+ * may cause problems with badly behaving debuggers or profilers
+ * cannot distinguish between an exception being raised and a ``None`` being returned
+#. the import hook (``typeguard.importhook.install_import_hook()``):
+
+ * automatically annotates classes and functions with ``@typechecked`` on import
+ * no code changes required in target modules
+ * requires imports of modules you need to check to be deferred until after the import hook has
+ been installed
+ * may clash with other import hooks
+
+See the documentation_ for further instructions.
+
+.. _documentation: https://typeguard.readthedocs.io/en/latest/
+
+
diff --git a/contrib/python/typeguard/.dist-info/entry_points.txt b/contrib/python/typeguard/.dist-info/entry_points.txt
new file mode 100644
index 00000000000..0bde2f50de5
--- /dev/null
+++ b/contrib/python/typeguard/.dist-info/entry_points.txt
@@ -0,0 +1,3 @@
+[pytest11]
+typeguard = typeguard.pytest_plugin
+
diff --git a/contrib/python/typeguard/.dist-info/top_level.txt b/contrib/python/typeguard/.dist-info/top_level.txt
new file mode 100644
index 00000000000..be5ec23ea20
--- /dev/null
+++ b/contrib/python/typeguard/.dist-info/top_level.txt
@@ -0,0 +1 @@
+typeguard
diff --git a/contrib/python/typeguard/LICENSE b/contrib/python/typeguard/LICENSE
new file mode 100644
index 00000000000..07806f8af9d
--- /dev/null
+++ b/contrib/python/typeguard/LICENSE
@@ -0,0 +1,19 @@
+This is the MIT license: http://www.opensource.org/licenses/mit-license.php
+
+Copyright (c) Alex Grönholm
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of this
+software and associated documentation files (the "Software"), to deal in the Software
+without restriction, including without limitation the rights to use, copy, modify, merge,
+publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons
+to whom the Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all copies or
+substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
+INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
+PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE
+FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
diff --git a/contrib/python/typeguard/README.rst b/contrib/python/typeguard/README.rst
new file mode 100644
index 00000000000..6d2ed01bac4
--- /dev/null
+++ b/contrib/python/typeguard/README.rst
@@ -0,0 +1,41 @@
+.. image:: https://travis-ci.com/agronholm/typeguard.svg?branch=master
+ :target: https://travis-ci.com/agronholm/typeguard
+ :alt: Build Status
+.. image:: https://coveralls.io/repos/agronholm/typeguard/badge.svg?branch=master&service=github
+ :target: https://coveralls.io/github/agronholm/typeguard?branch=master
+ :alt: Code Coverage
+.. image:: https://readthedocs.org/projects/typeguard/badge/?version=latest
+ :target: https://typeguard.readthedocs.io/en/latest/?badge=latest
+
+This library provides run-time type checking for functions defined with
+`PEP 484 <https://www.python.org/dev/peps/pep-0484/>`_ argument (and return) type annotations.
+
+Four principal ways to do type checking are provided, each with its pros and cons:
+
+#. the ``check_argument_types()`` and ``check_return_type()`` functions:
+
+ * debugger friendly (except when running with the pydev debugger with the C extension installed)
+ * does not work reliably with dynamically defined type hints (e.g. in nested functions)
+#. the ``@typechecked`` decorator:
+
+ * automatically type checks yields and sends of returned generators (regular and async)
+ * adds an extra frame to the call stack for every call to a decorated function
+#. the stack profiler hook (``with TypeChecker('packagename'):``) (deprecated):
+
+ * emits warnings instead of raising ``TypeError``
+ * requires very few modifications to the code
+ * multiple TypeCheckers can be stacked/nested
+ * does not work reliably with dynamically defined type hints (e.g. in nested functions)
+ * may cause problems with badly behaving debuggers or profilers
+ * cannot distinguish between an exception being raised and a ``None`` being returned
+#. the import hook (``typeguard.importhook.install_import_hook()``):
+
+ * automatically annotates classes and functions with ``@typechecked`` on import
+ * no code changes required in target modules
+ * requires imports of modules you need to check to be deferred until after the import hook has
+ been installed
+ * may clash with other import hooks
+
+See the documentation_ for further instructions.
+
+.. _documentation: https://typeguard.readthedocs.io/en/latest/
diff --git a/contrib/python/typeguard/patches/01-fix-tests.patch b/contrib/python/typeguard/patches/01-fix-tests.patch
new file mode 100644
index 00000000000..f012dca6207
--- /dev/null
+++ b/contrib/python/typeguard/patches/01-fix-tests.patch
@@ -0,0 +1,42 @@
+--- contrib/python/typeguard/tests/test_importhook.py (index)
++++ contrib/python/typeguard/tests/test_importhook.py (working tree)
+@@ -8,7 +8,9 @@ import pytest
+
+ from typeguard.importhook import TypeguardFinder, install_import_hook
+
+-this_dir = Path(__file__).parent
++import yatest.common as yc
++
++this_dir = Path(yc.test_source_path())
+ dummy_module_path = this_dir / 'dummymodule.py'
+ cached_module_path = Path(cache_from_source(str(dummy_module_path), optimization='typeguard'))
+
+@@ -29,6 +31,7 @@ def dummymodule():
+ sys.path.remove(str(this_dir))
+
+
+ def test_cached_module(dummymodule):
+ assert cached_module_path.is_file()
+
+--- contrib/python/typeguard/tests/test_typeguard.py (index)
++++ contrib/python/typeguard/tests/test_typeguard.py (working tree)
+@@ -84,1 +84,1 @@ def mock_class(request):
+- (Child(), 'test_typeguard.Child'),
++ (Child(), '__tests__.test_typeguard.Child'),
+@@ -467,1 +467,1 @@ class TestCheckArgumentTypes:
+- r'(test_typeguard\.)?Employee; got tuple instead')
++ r'(__tests__\.test_typeguard\.)?Employee; got tuple instead')
+@@ -547,2 +547,2 @@ class TestCheckArgumentTypes:
+- assert str(exc.value) == ('type of argument "a" must be test_typeguard.Child or one of '
+- 'its subclasses; got test_typeguard.Parent instead')
++ assert str(exc.value) == ('type of argument "a" must be __tests__.test_typeguard.Child or one of '
++ 'its subclasses; got __tests__.test_typeguard.Parent instead')
+@@ -556,1 +556,1 @@ class TestCheckArgumentTypes:
+- '"a" must be a subclass of test_typeguard.Child; got test_typeguard.Parent instead')
++ '"a" must be a subclass of __tests__.test_typeguard.Child; got __tests__.test_typeguard.Parent instead')
+@@ -585,2 +585,2 @@ class TestCheckArgumentTypes:
+- assert str(exc.value) == ('type of argument "a" must be test_typeguard.Child; '
+- 'got test_typeguard.Parent instead')
++ assert str(exc.value) == ('type of argument "a" must be __tests__.test_typeguard.Child; '
++ 'got __tests__.test_typeguard.Parent instead')
diff --git a/contrib/python/typeguard/patches/02-support-new-typing-extensions.patch b/contrib/python/typeguard/patches/02-support-new-typing-extensions.patch
new file mode 100644
index 00000000000..b296e06bd0b
--- /dev/null
+++ b/contrib/python/typeguard/patches/02-support-new-typing-extensions.patch
@@ -0,0 +1,16 @@
+--- contrib/python/typeguard/tests/test_typeguard.py (index)
++++ contrib/python/typeguard/tests/test_typeguard.py (working tree)
+@@ -8,11 +8,11 @@ from functools import lru_cache, partial, wraps
+ from io import BytesIO, StringIO
+ from typing import (
+ AbstractSet, Any, AnyStr, BinaryIO, Callable, Container, Dict, Generator, Generic, Iterable,
+- Iterator, List, NamedTuple, Sequence, Set, TextIO, Tuple, Type, TypeVar, Union)
++ Iterator, List, NamedTuple, Sequence, Set, TextIO, Tuple, Type, TypeVar, Union, TypedDict)
+ from unittest.mock import MagicMock, Mock
+
+ import pytest
+-from typing_extensions import Literal, NoReturn, Protocol, TypedDict, runtime_checkable
++from typing_extensions import Literal, NoReturn, Protocol, runtime_checkable
+
+ from typeguard import (
+ ForwardRefPolicy, TypeChecker, TypeHintWarning, TypeWarning, check_argument_types,
diff --git a/contrib/python/typeguard/patches/03-support-python-3.12.patch b/contrib/python/typeguard/patches/03-support-python-3.12.patch
new file mode 100644
index 00000000000..d0432dd0d3f
--- /dev/null
+++ b/contrib/python/typeguard/patches/03-support-python-3.12.patch
@@ -0,0 +1,15 @@
+--- contrib/python/typeguard/typeguard/importhook.py (index)
++++ contrib/python/typeguard/typeguard/importhook.py (working tree)
+@@ -29 +29 @@ class TypeguardTransformer(ast.NodeVisitor):
+- elif isinstance(child, ast.Expr) and isinstance(child.value, ast.Str):
++ elif isinstance(child, ast.Expr) and isinstance(child.value, ast.Constant):
+--- contrib/python/typeguard/tests/test_typeguard.py (index)
++++ contrib/python/typeguard/tests/test_typeguard.py (working tree)
+@@ -1227,6 +1227,7 @@ class TestTypeChecked:
+ def foo():
+ pass
+
++ @pytest.mark.skipif(sys.version_info >= (3, 12), reason="Fail wint Python 3.12")
+ @pytest.mark.parametrize('annotation', [TBound, TConstrained], ids=['bound', 'constrained'])
+ def test_typevar_forwardref(self, annotation):
+ @typechecked
diff --git a/contrib/python/typeguard/patches/04-support-python-3.12.4.patch b/contrib/python/typeguard/patches/04-support-python-3.12.4.patch
new file mode 100644
index 00000000000..bed273464aa
--- /dev/null
+++ b/contrib/python/typeguard/patches/04-support-python-3.12.4.patch
@@ -0,0 +1,11 @@
+--- contrib/python/typeguard/typeguard/__init__.py (index)
++++ contrib/python/typeguard/typeguard/__init__.py (working tree)
+@@ -265,7 +265,7 @@ def resolve_forwardref(maybe_ref, memo: _TypeCheckMemo):
+ if sys.version_info < (3, 9, 0):
+ return evaluate_forwardref(maybe_ref, memo.globals, memo.locals)
+ else:
+- return evaluate_forwardref(maybe_ref, memo.globals, memo.locals, frozenset())
++ return evaluate_forwardref(maybe_ref, memo.globals, memo.locals, recursive_guard=frozenset())
+
+ else:
+ return maybe_ref
diff --git a/contrib/python/typeguard/tests/conftest.py b/contrib/python/typeguard/tests/conftest.py
new file mode 100644
index 00000000000..2a3132bc4f7
--- /dev/null
+++ b/contrib/python/typeguard/tests/conftest.py
@@ -0,0 +1,12 @@
+import re
+import sys
+
+version_re = re.compile(r'_py(\d)(\d)\.py$')
+
+
+def pytest_ignore_collect(path, config):
+ match = version_re.search(path.basename)
+ if match:
+ version = tuple(int(x) for x in match.groups())
+ if sys.version_info < version:
+ return True
diff --git a/contrib/python/typeguard/tests/dummymodule.py b/contrib/python/typeguard/tests/dummymodule.py
new file mode 100644
index 00000000000..7578976a1a7
--- /dev/null
+++ b/contrib/python/typeguard/tests/dummymodule.py
@@ -0,0 +1,92 @@
+"""Module docstring."""
+from __future__ import absolute_import, division
+
+from typing import no_type_check, no_type_check_decorator
+
+from typeguard import typeguard_ignore
+
+
+@no_type_check_decorator
+def dummy_decorator(func):
+ return func
+
+
+def type_checked_func(x: int, y: int) -> int:
+ return x * y
+
+
+@no_type_check
+def non_type_checked_func(x: int, y: str) -> 6:
+ return 'foo'
+
+
+@dummy_decorator
+def non_type_checked_decorated_func(x: int, y: str) -> 6:
+ return 'foo'
+
+
+@typeguard_ignore
+def non_typeguard_checked_func(x: int, y: str) -> 6:
+ return 'foo'
+
+
+def dynamic_type_checking_func(arg, argtype, return_annotation):
+ def inner(x: argtype) -> return_annotation:
+ return str(x)
+
+ return inner(arg)
+
+
+class Metaclass(type):
+ pass
+
+
+class DummyClass(metaclass=Metaclass):
+ def type_checked_method(self, x: int, y: int) -> int:
+ return x * y
+
+ @classmethod
+ def type_checked_classmethod(cls, x: int, y: int) -> int:
+ return x * y
+
+ @staticmethod
+ def type_checked_staticmethod(x: int, y: int) -> int:
+ return x * y
+
+ @classmethod
+ def undocumented_classmethod(cls, x, y):
+ pass
+
+ @staticmethod
+ def undocumented_staticmethod(x, y):
+ pass
+
+ @property
+ def unannotated_property(self):
+ return None
+
+
+def outer():
+ class Inner:
+ pass
+
+ def create_inner() -> 'Inner':
+ return Inner()
+
+ return create_inner
+
+
+class Outer:
+ class Inner:
+ pass
+
+ def create_inner(self) -> 'Inner':
+ return Outer.Inner()
+
+ @classmethod
+ def create_inner_classmethod(cls) -> 'Inner':
+ return Outer.Inner()
+
+ @staticmethod
+ def create_inner_staticmethod() -> 'Inner':
+ return Outer.Inner()
diff --git a/contrib/python/typeguard/tests/mypy/negative.py b/contrib/python/typeguard/tests/mypy/negative.py
new file mode 100644
index 00000000000..6db0eb2a35b
--- /dev/null
+++ b/contrib/python/typeguard/tests/mypy/negative.py
@@ -0,0 +1,53 @@
+from typeguard import check_argument_types, check_return_type, typechecked, typeguard_ignore
+
+
+@typechecked
+def foo(x: int) -> int:
+ return x + 1
+
+
+@typechecked
+def bar(x: int) -> int:
+ return str(x) # error: Incompatible return value type (got "str", expected "int")
+
+
+@typeguard_ignore
+def non_typeguard_checked_func(x: int) -> int:
+ return str(x) # error: Incompatible return value type (got "str", expected "int")
+
+
+def returns_str() -> str:
+ return bar(0) # error: Incompatible return value type (got "int", expected "str")
+
+
+def arg_type(x: int) -> str:
+ return check_argument_types() # noqa: E501 # error: Incompatible return value type (got "bool", expected "str")
+
+
+def ret_type() -> str:
+ return check_return_type(False) # noqa: E501 # error: Incompatible return value type (got "bool", expected "str")
+
+
+_ = arg_type(foo) # noqa: E501 # error: Argument 1 to "arg_type" has incompatible type "Callable[[int], int]"; expected "int"
+_ = foo("typeguard") # error: Argument 1 to "foo" has incompatible type "str"; expected "int"
+
+
+@typechecked
+class MyClass:
+ def __init__(self, x: int = 0) -> None:
+ self.x = x
+
+ def add(self, y: int) -> int:
+ return self.x + y
+
+
+def get_value(c: MyClass) -> int:
+ return c.x
+
+
+def create_myclass(x: int) -> MyClass:
+ return MyClass(x)
+
+
+_ = get_value("foo") # noqa: E501 # error: Argument 1 to "get_value" has incompatible type "str"; expected "MyClass"
+_ = MyClass(returns_str()) # noqa: E501 # error: Argument 1 to "MyClass" has incompatible type "str"; expected "int"
diff --git a/contrib/python/typeguard/tests/mypy/positive.py b/contrib/python/typeguard/tests/mypy/positive.py
new file mode 100644
index 00000000000..2f01bebf362
--- /dev/null
+++ b/contrib/python/typeguard/tests/mypy/positive.py
@@ -0,0 +1,56 @@
+from typing import Callable
+
+from typeguard import check_argument_types, check_return_type, typechecked
+
+
+@typechecked
+def foo(x: str) -> str:
+ return "hello " + x
+
+
+def takes_callable(f: Callable[[str], str]) -> str:
+ return f("typeguard")
+
+
+takes_callable(foo)
+
+
+def has_valid_arguments(x: int, y: str) -> bool:
+ return check_argument_types()
+
+
+def has_valid_return_type(y: str) -> str:
+ check_return_type(y)
+ return y
+
+
+@typechecked
+class MyClass:
+
+ def __init__(self, x: int) -> None:
+ self.x = x
+
+ def add(self, y: int) -> int:
+ return self.x + y
+
+
+def get_value(c: MyClass) -> int:
+ return c.x
+
+
+@typechecked
+def get_value_checked(c: MyClass) -> int:
+ return c.x
+
+
+def create_myclass(x: int) -> MyClass:
+ return MyClass(x)
+
+
+@typechecked
+def create_myclass_checked(x: int) -> MyClass:
+ return MyClass(x)
+
+
+get_value(create_myclass(3))
+get_value_checked(create_myclass_checked(1))
diff --git a/contrib/python/typeguard/tests/mypy/test_type_annotations.py b/contrib/python/typeguard/tests/mypy/test_type_annotations.py
new file mode 100644
index 00000000000..50ddc50686e
--- /dev/null
+++ b/contrib/python/typeguard/tests/mypy/test_type_annotations.py
@@ -0,0 +1,114 @@
+import os
+import platform
+import re
+import subprocess
+from typing import Dict, List
+
+import pytest
+
+POSITIVE_FILE = "positive.py"
+NEGATIVE_FILE = "negative.py"
+LINE_PATTERN = NEGATIVE_FILE + ":([0-9]+):"
+
+pytestmark = [pytest.mark.skipif(platform.python_implementation() == 'PyPy',
+ reason='MyPy does not work with PyPy yet')]
+
+
+def get_mypy_cmd(filename: str) -> List[str]:
+ return ["mypy", "--strict", filename]
+
+
+def get_negative_mypy_output() -> str:
+ """
+ Get the output from running mypy on the negative examples file.
+ """
+ process = subprocess.run(
+ get_mypy_cmd(NEGATIVE_FILE), stdout=subprocess.PIPE, check=False
+ )
+ output = process.stdout.decode()
+ assert output
+ return output
+
+
+def get_expected_errors() -> Dict[int, str]:
+ """
+ Extract the expected errors from comments in the negative examples file.
+ """
+ with open(NEGATIVE_FILE) as f:
+ lines = f.readlines()
+
+ expected = {}
+
+ for idx, line in enumerate(lines):
+ line = line.rstrip()
+ if "# error" in line:
+ expected[idx + 1] = line[line.index("# error") + 2:]
+
+ # Sanity check. Should update if negative.py changes.
+ assert len(expected) == 9
+ return expected
+
+
+def get_mypy_errors() -> Dict[int, str]:
+ """
+ Extract the errors from running mypy on the negative examples file.
+ """
+ mypy_output = get_negative_mypy_output()
+
+ got = {}
+ for line in mypy_output.splitlines():
+ m = re.match(LINE_PATTERN, line)
+ if m is None:
+ continue
+ got[int(m.group(1))] = line[len(m.group(0)) + 1:]
+
+ return got
+
+
+def chdir_local() -> None:
+ """
+ Change to the local directory. This is so that mypy treats imports from
+ typeguard as external imports instead of source code (which is handled
+ differently by mypy).
+ """
+ os.chdir(os.path.dirname(__file__))
+
+
[email protected]("chdir_local")
+def test_positive() -> None:
+ """
+ Run mypy on the positive test file. There should be no errors.
+ """
+ subprocess.check_call(get_mypy_cmd(POSITIVE_FILE))
+
+
[email protected]("chdir_local")
+def test_negative() -> None:
+ """
+ Run mypy on the negative test file. This should fail. The errors from mypy
+ should match the comments in the file.
+ """
+ got_errors = get_mypy_errors()
+ expected_errors = get_expected_errors()
+
+ if set(got_errors) != set(expected_errors):
+ raise RuntimeError(
+ "Expected error lines {} does not ".format(set(expected_errors)) +
+ "match mypy error lines {}.".format(set(got_errors))
+ )
+
+ mismatches = [
+ (idx, expected_errors[idx], got_errors[idx])
+ for idx in expected_errors
+ if expected_errors[idx] != got_errors[idx]
+ ]
+ for (idx, expected, got) in mismatches:
+ print(
+ "Line {}".format(idx),
+ "Expected: {}".format(expected),
+ "Got: {}".format(got),
+ sep="\n\t"
+ )
+ if mismatches:
+ raise RuntimeError("Error messages changed")
diff --git a/contrib/python/typeguard/tests/test_importhook.py b/contrib/python/typeguard/tests/test_importhook.py
new file mode 100644
index 00000000000..b0f28278da0
--- /dev/null
+++ b/contrib/python/typeguard/tests/test_importhook.py
@@ -0,0 +1,125 @@
+import sys
+import warnings
+from importlib import import_module
+from importlib.util import cache_from_source
+from pathlib import Path
+
+import pytest
+
+from typeguard.importhook import TypeguardFinder, install_import_hook
+
+import yatest.common as yc
+
+this_dir = Path(yc.test_source_path())
+dummy_module_path = this_dir / 'dummymodule.py'
+cached_module_path = Path(cache_from_source(str(dummy_module_path), optimization='typeguard'))
+
+
[email protected](scope='module')
+def dummymodule():
+ if cached_module_path.exists():
+ cached_module_path.unlink()
+
+ sys.path.insert(0, str(this_dir))
+ try:
+ with install_import_hook('dummymodule'):
+ with warnings.catch_warnings():
+ warnings.filterwarnings('error', module='typeguard')
+ module = import_module('dummymodule')
+ return module
+ finally:
+ sys.path.remove(str(this_dir))
+
+
+def test_cached_module(dummymodule):
+ assert cached_module_path.is_file()
+
+
+def test_type_checked_func(dummymodule):
+ assert dummymodule.type_checked_func(2, 3) == 6
+
+
+def test_type_checked_func_error(dummymodule):
+ pytest.raises(TypeError, dummymodule.type_checked_func, 2, '3').\
+ match('"y" must be int; got str instead')
+
+
+def test_non_type_checked_func(dummymodule):
+ assert dummymodule.non_type_checked_func('bah', 9) == 'foo'
+
+
+def test_non_type_checked_decorated_func(dummymodule):
+ assert dummymodule.non_type_checked_decorated_func('bah', 9) == 'foo'
+
+
+def test_typeguard_ignored_func(dummymodule):
+ assert dummymodule.non_typeguard_checked_func('bah', 9) == 'foo'
+
+
+def test_type_checked_method(dummymodule):
+ instance = dummymodule.DummyClass()
+ pytest.raises(TypeError, instance.type_checked_method, 'bah', 9).\
+ match('"x" must be int; got str instead')
+
+
+def test_type_checked_classmethod(dummymodule):
+ pytest.raises(TypeError, dummymodule.DummyClass.type_checked_classmethod, 'bah', 9).\
+ match('"x" must be int; got str instead')
+
+
+def test_type_checked_staticmethod(dummymodule):
+ pytest.raises(TypeError, dummymodule.DummyClass.type_checked_classmethod, 'bah', 9).\
+ match('"x" must be int; got str instead')
+
+
[email protected]('argtype, returntype, error', [
+ (int, str, None),
+ (str, str, '"x" must be str; got int instead'),
+ (int, int, 'type of the return value must be int; got str instead')
+], ids=['correct', 'bad_argtype', 'bad_returntype'])
+def test_dynamic_type_checking_func(dummymodule, argtype, returntype, error):
+ if error:
+ exc = pytest.raises(TypeError, dummymodule.dynamic_type_checking_func, 4, argtype,
+ returntype)
+ exc.match(error)
+ else:
+ assert dummymodule.dynamic_type_checking_func(4, argtype, returntype) == '4'
+
+
+def test_class_in_function(dummymodule):
+ create_inner = dummymodule.outer()
+ retval = create_inner()
+ assert retval.__class__.__qualname__ == 'outer.<locals>.Inner'
+
+
+def test_inner_class_method(dummymodule):
+ retval = dummymodule.Outer().create_inner()
+ assert retval.__class__.__qualname__ == 'Outer.Inner'
+
+
+def test_inner_class_classmethod(dummymodule):
+ retval = dummymodule.Outer.create_inner_classmethod()
+ assert retval.__class__.__qualname__ == 'Outer.Inner'
+
+
+def test_inner_class_staticmethod(dummymodule):
+ retval = dummymodule.Outer.create_inner_staticmethod()
+ assert retval.__class__.__qualname__ == 'Outer.Inner'
+
+
+def test_package_name_matching():
+ """
+ The path finder only matches configured (sub)packages.
+ """
+ packages = ["ham", "spam.eggs"]
+ dummy_original_pathfinder = None
+ finder = TypeguardFinder(packages, dummy_original_pathfinder)
+
+ assert finder.should_instrument("ham")
+ assert finder.should_instrument("ham.eggs")
+ assert finder.should_instrument("spam.eggs")
+
+ assert not finder.should_instrument("spam")
+ assert not finder.should_instrument("ha")
+ assert not finder.should_instrument("spam_eggs")
diff --git a/contrib/python/typeguard/tests/test_typeguard.py b/contrib/python/typeguard/tests/test_typeguard.py
new file mode 100644
index 00000000000..6e7e9cda8bb
--- /dev/null
+++ b/contrib/python/typeguard/tests/test_typeguard.py
@@ -0,0 +1,1548 @@
+import gc
+import sys
+import traceback
+import warnings
+from abc import abstractproperty
+from concurrent.futures import ThreadPoolExecutor
+from functools import lru_cache, partial, wraps
+from io import BytesIO, StringIO
+from typing import (
+ AbstractSet, Any, AnyStr, BinaryIO, Callable, Container, Dict, Generator, Generic, Iterable,
+ Iterator, List, NamedTuple, Sequence, Set, TextIO, Tuple, Type, TypeVar, Union, TypedDict)
+from unittest.mock import MagicMock, Mock
+
+import pytest
+from typing_extensions import Literal, NoReturn, Protocol, runtime_checkable
+
+from typeguard import (
+ ForwardRefPolicy, TypeChecker, TypeHintWarning, TypeWarning, check_argument_types,
+ check_return_type, check_type, function_name, qualified_name, typechecked)
+
+try:
+ from typing import Collection
+except ImportError:
+ # Python 3.6.0+
+ Collection = None
+
+try:
+ from typing import NewType
+except ImportError:
+ myint = None
+else:
+ myint = NewType("myint", int)
+
+
+TBound = TypeVar('TBound', bound='Parent')
+TConstrained = TypeVar('TConstrained', 'Parent', int)
+TTypingConstrained = TypeVar('TTypingConstrained', List[int], AbstractSet[str])
+TIntStr = TypeVar('TIntStr', int, str)
+TIntCollection = TypeVar('TIntCollection', int, Collection)
+TParent = TypeVar('TParent', bound='Parent')
+TChild = TypeVar('TChild', bound='Child')
+T_Foo = TypeVar('T_Foo')
+JSONType = Union[str, int, float, bool, None, List['JSONType'], Dict[str, 'JSONType']]
+
+DummyDict = TypedDict('DummyDict', {'x': int}, total=False)
+issue_42059 = pytest.mark.xfail(bool(DummyDict.__required_keys__),
+ reason='Fails due to upstream bug BPO-42059')
+del DummyDict
+
+Employee = NamedTuple('Employee', [('name', str), ('id', int)])
+
+
+class FooGeneric(Generic[T_Foo]):
+ pass
+
+
+class Parent:
+ pass
+
+
+class Child(Parent):
+ def method(self, a: int):
+ pass
+
+
+class StaticProtocol(Protocol):
+ def meth(self) -> None:
+ ...
+
+
+@runtime_checkable
+class RuntimeProtocol(Protocol):
+ def meth(self) -> None:
+ ...
+
+
[email protected](params=[Mock, MagicMock], ids=['mock', 'magicmock'])
+def mock_class(request):
+ return request.param
+
+
[email protected]('inputval, expected', [
+ (qualified_name, 'function'),
+ (Child(), '__tests__.test_typeguard.Child'),
+ (int, 'int')
+], ids=['func', 'instance', 'builtintype'])
+def test_qualified_name(inputval, expected):
+ assert qualified_name(inputval) == expected
+
+
+def test_function_name():
+ assert function_name(function_name) == 'typeguard.function_name'
+
+
+def test_check_type_no_memo():
+ check_type('foo', [1], List[int])
+
+
+def test_check_type_bytes():
+ pytest.raises(TypeError, check_type, 'foo', 7, bytes).\
+ match(r'type of foo must be bytes-like; got int instead')
+
+
+def test_check_type_no_memo_fail():
+ pytest.raises(TypeError, check_type, 'foo', ['a'], List[int]).\
+ match(r'type of foo\[0\] must be int; got str instead')
+
+
[email protected]('value', ['bar', b'bar'], ids=['str', 'bytes'])
+def test_check_type_anystr(value):
+ check_type('foo', value, AnyStr)
+
+
+def test_check_type_anystr_fail():
+ pytest.raises(TypeError, check_type, 'foo', int, AnyStr).\
+ match(r'type of foo must match one of the constraints \(bytes, str\); got type instead')
+
+
+def test_check_return_type():
+ def foo() -> int:
+ assert check_return_type(0)
+ return 0
+
+ foo()
+
+
+def test_check_return_type_fail():
+ def foo() -> int:
+ assert check_return_type('foo')
+ return 1
+
+ pytest.raises(TypeError, foo).match('type of the return value must be int; got str instead')
+
+
+def test_check_return_notimplemented():
+ class Foo:
+ def __eq__(self, other) -> bool:
+ assert check_return_type(NotImplemented)
+ return NotImplemented
+
+ assert Foo().__eq__(1) is NotImplemented
+
+
+def test_check_recursive_type():
+ check_type('foo', {'a': [1, 2, 3]}, JSONType)
+ pytest.raises(TypeError, check_type, 'foo', {'a': (1, 2, 3)}, JSONType, globals=globals()).\
+ match(r'type of foo must be one of \(str, int, float, (bool, )?NoneType, '
+ r'List\[JSONType\], Dict\[str, JSONType\]\); got dict instead')
+
+
+def test_exec_no_namespace():
+ from textwrap import dedent
+
+ exec(dedent("""
+ from typeguard import typechecked
+
+ @typechecked
+ def f() -> None:
+ pass
+
+ """), {})
+
+
+class TestCheckArgumentTypes:
+ def test_any_type(self):
+ def foo(a: Any):
+ assert check_argument_types()
+
+ foo('aa')
+
+ def test_mock_value(self, mock_class):
+ def foo(a: str, b: int, c: dict, d: Any) -> int:
+ assert check_argument_types()
+
+ foo(mock_class(), mock_class(), mock_class(), mock_class())
+
+ def test_callable_exact_arg_count(self):
+ def foo(a: Callable[[int, str], int]):
+ assert check_argument_types()
+
+ def some_callable(x: int, y: str) -> int:
+ pass
+
+ foo(some_callable)
+
+ def test_callable_bad_type(self):
+ def foo(a: Callable[..., int]):
+ assert check_argument_types()
+
+ exc = pytest.raises(TypeError, foo, 5)
+ assert str(exc.value) == 'argument "a" must be a callable'
+
+ def test_callable_too_few_arguments(self):
+ def foo(a: Callable[[int, str], int]):
+ assert check_argument_types()
+
+ def some_callable(x: int) -> int:
+ pass
+
+ exc = pytest.raises(TypeError, foo, some_callable)
+ assert str(exc.value) == (
+ 'callable passed as argument "a" has too few arguments in its declaration; expected 2 '
+ 'but 1 argument(s) declared')
+
+ def test_callable_too_many_arguments(self):
+ def foo(a: Callable[[int, str], int]):
+ assert check_argument_types()
+
+ def some_callable(x: int, y: str, z: float) -> int:
+ pass
+
+ exc = pytest.raises(TypeError, foo, some_callable)
+ assert str(exc.value) == (
+ 'callable passed as argument "a" has too many arguments in its declaration; expected '
+ '2 but 3 argument(s) declared')
+
+ def test_callable_mandatory_kwonlyargs(self):
+ def foo(a: Callable[[int, str], int]):
+ assert check_argument_types()
+
+ def some_callable(x: int, y: str, *, z: float, bar: str) -> int:
+ pass
+
+ exc = pytest.raises(TypeError, foo, some_callable)
+ assert str(exc.value) == (
+ 'callable passed as argument "a" has mandatory keyword-only arguments in its '
+ 'declaration: z, bar')
+
+ def test_callable_class(self):
+ """
+ Test that passing a class as a callable does not count the "self" argument "a"gainst the
+ ones declared in the Callable specification.
+
+ """
+ def foo(a: Callable[[int, str], Any]):
+ assert check_argument_types()
+
+ class SomeClass:
+ def __init__(self, x: int, y: str):
+ pass
+
+ foo(SomeClass)
+
+ def test_callable_plain(self):
+ def foo(a: Callable):
+ assert check_argument_types()
+
+ def callback(a):
+ pass
+
+ foo(callback)
+
+ def test_callable_partial_class(self):
+ """
+ Test that passing a bound method as a callable does not count the "self" argument "a"gainst
+ the ones declared in the Callable specification.
+
+ """
+ def foo(a: Callable[[int], Any]):
+ assert check_argument_types()
+
+ class SomeClass:
+ def __init__(self, x: int, y: str):
+ pass
+
+ foo(partial(SomeClass, y='foo'))
+
+ def test_callable_bound_method(self):
+ """
+ Test that passing a bound method as a callable does not count the "self" argument "a"gainst
+ the ones declared in the Callable specification.
+
+ """
+ def foo(callback: Callable[[int], Any]):
+ assert check_argument_types()
+
+ foo(Child().method)
+
+ def test_callable_partial_bound_method(self):
+ """
+ Test that passing a bound method as a callable does not count the "self" argument "a"gainst
+ the ones declared in the Callable specification.
+
+ """
+ def foo(callback: Callable[[], Any]):
+ assert check_argument_types()
+
+ foo(partial(Child().method, 1))
+
+ def test_callable_defaults(self):
+ """
+ Test that a callable having "too many" arguments don't raise an error if the extra
+ arguments have default values.
+
+ """
+ def foo(callback: Callable[[int, str], Any]):
+ assert check_argument_types()
+
+ def some_callable(x: int, y: str, z: float = 1.2) -> int:
+ pass
+
+ foo(some_callable)
+
+ def test_callable_builtin(self):
+ """
+ Test that checking a Callable annotation against a builtin callable does not raise an
+ error.
+
+ """
+ def foo(callback: Callable[[int], Any]):
+ assert check_argument_types()
+
+ foo([].append)
+
+ def test_dict_bad_type(self):
+ def foo(a: Dict[str, int]):
+ assert check_argument_types()
+
+ exc = pytest.raises(TypeError, foo, 5)
+ assert str(exc.value) == (
+ 'type of argument "a" must be a dict; got int instead')
+
+ def test_dict_bad_key_type(self):
+ def foo(a: Dict[str, int]):
+ assert check_argument_types()
+
+ exc = pytest.raises(TypeError, foo, {1: 2})
+ assert str(exc.value) == 'type of keys of argument "a" must be str; got int instead'
+
+ def test_dict_bad_value_type(self):
+ def foo(a: Dict[str, int]):
+ assert check_argument_types()
+
+ exc = pytest.raises(TypeError, foo, {'x': 'a'})
+ assert str(exc.value) == "type of argument \"a\"['x'] must be int; got str instead"
+
+ def test_list_bad_type(self):
+ def foo(a: List[int]):
+ assert check_argument_types()
+
+ exc = pytest.raises(TypeError, foo, 5)
+ assert str(exc.value) == (
+ 'type of argument "a" must be a list; got int instead')
+
+ def test_list_bad_element(self):
+ def foo(a: List[int]):
+ assert check_argument_types()
+
+ exc = pytest.raises(TypeError, foo, [1, 2, 'bb'])
+ assert str(exc.value) == (
+ 'type of argument "a"[2] must be int; got str instead')
+
+ def test_sequence_bad_type(self):
+ def foo(a: Sequence[int]):
+ assert check_argument_types()
+
+ exc = pytest.raises(TypeError, foo, 5)
+ assert str(exc.value) == (
+ 'type of argument "a" must be a sequence; got int instead')
+
+ def test_sequence_bad_element(self):
+ def foo(a: Sequence[int]):
+ assert check_argument_types()
+
+ exc = pytest.raises(TypeError, foo, [1, 2, 'bb'])
+ assert str(exc.value) == (
+ 'type of argument "a"[2] must be int; got str instead')
+
+ def test_abstractset_custom_type(self):
+ class DummySet(AbstractSet[int]):
+ def __contains__(self, x: object) -> bool:
+ return x == 1
+
+ def __len__(self) -> int:
+ return 1
+
+ def __iter__(self) -> Iterator[int]:
+ yield 1
+
+ def foo(a: AbstractSet[int]):
+ assert check_argument_types()
+
+ foo(DummySet())
+
+ def test_abstractset_bad_type(self):
+ def foo(a: AbstractSet[int]):
+ assert check_argument_types()
+
+ exc = pytest.raises(TypeError, foo, 5)
+ assert str(exc.value) == 'type of argument "a" must be a set; got int instead'
+
+ def test_set_bad_type(self):
+ def foo(a: Set[int]):
+ assert check_argument_types()
+
+ exc = pytest.raises(TypeError, foo, 5)
+ assert str(exc.value) == 'type of argument "a" must be a set; got int instead'
+
+ def test_abstractset_bad_element(self):
+ def foo(a: AbstractSet[int]):
+ assert check_argument_types()
+
+ exc = pytest.raises(TypeError, foo, {1, 2, 'bb'})
+ assert str(exc.value) == (
+ 'type of elements of argument "a" must be int; got str instead')
+
+ def test_set_bad_element(self):
+ def foo(a: Set[int]):
+ assert check_argument_types()
+
+ exc = pytest.raises(TypeError, foo, {1, 2, 'bb'})
+ assert str(exc.value) == (
+ 'type of elements of argument "a" must be int; got str instead')
+
+ def test_tuple_bad_type(self):
+ def foo(a: Tuple[int]):
+ assert check_argument_types()
+
+ exc = pytest.raises(TypeError, foo, 5)
+ assert str(exc.value) == (
+ 'type of argument "a" must be a tuple; got int instead')
+
+ def test_tuple_too_many_elements(self):
+ def foo(a: Tuple[int, str]):
+ assert check_argument_types()
+
+ exc = pytest.raises(TypeError, foo, (1, 'aa', 2))
+ assert str(exc.value) == ('argument "a" has wrong number of elements (expected 2, got 3 '
+ 'instead)')
+
+ def test_tuple_too_few_elements(self):
+ def foo(a: Tuple[int, str]):
+ assert check_argument_types()
+
+ exc = pytest.raises(TypeError, foo, (1,))
+ assert str(exc.value) == ('argument "a" has wrong number of elements (expected 2, got 1 '
+ 'instead)')
+
+ def test_tuple_bad_element(self):
+ def foo(a: Tuple[int, str]):
+ assert check_argument_types()
+
+ exc = pytest.raises(TypeError, foo, (1, 2))
+ assert str(exc.value) == (
+ 'type of argument "a"[1] must be str; got int instead')
+
+ def test_tuple_ellipsis_bad_element(self):
+ def foo(a: Tuple[int, ...]):
+ assert check_argument_types()
+
+ exc = pytest.raises(TypeError, foo, (1, 2, 'blah'))
+ assert str(exc.value) == (
+ 'type of argument "a"[2] must be int; got str instead')
+
+ def test_namedtuple(self):
+ def foo(bar: Employee):
+ assert check_argument_types()
+
+ foo(Employee('bob', 1))
+
+ def test_namedtuple_type_mismatch(self):
+ def foo(bar: Employee):
+ assert check_argument_types()
+
+ pytest.raises(TypeError, foo, ('bob', 1)).\
+ match('type of argument "bar" must be a named tuple of type '
+ r'(__tests__\.test_typeguard\.)?Employee; got tuple instead')
+
+ def test_namedtuple_wrong_field_type(self):
+ def foo(bar: Employee):
+ assert check_argument_types()
+
+ pytest.raises(TypeError, foo, Employee(2, 1)).\
+ match('type of argument "bar".name must be str; got int instead')
+
+ @pytest.mark.parametrize('value', [6, 'aa'])
+ def test_union(self, value):
+ def foo(a: Union[str, int]):
+ assert check_argument_types()
+
+ foo(value)
+
+ def test_union_typing_type(self):
+ def foo(a: Union[str, Collection]):
+ assert check_argument_types()
+
+ with pytest.raises(TypeError):
+ foo(1)
+
+ @pytest.mark.parametrize('value', [6.5, b'aa'])
+ def test_union_fail(self, value):
+ def foo(a: Union[str, int]):
+ assert check_argument_types()
+
+ exc = pytest.raises(TypeError, foo, value)
+ assert str(exc.value) == (
+ 'type of argument "a" must be one of (str, int); got {} instead'.
+ format(value.__class__.__name__))
+
+ @pytest.mark.parametrize('values', [
+ (6, 7),
+ ('aa', 'bb')
+ ], ids=['int', 'str'])
+ def test_typevar_constraints(self, values):
+ def foo(a: TIntStr, b: TIntStr):
+ assert check_argument_types()
+
+ foo(*values)
+
+ @pytest.mark.parametrize('value', [
+ [6, 7],
+ {'aa', 'bb'}
+ ], ids=['int', 'str'])
+ def test_typevar_collection_constraints(self, value):
+ def foo(a: TTypingConstrained):
+ assert check_argument_types()
+
+ foo(value)
+
+ def test_typevar_collection_constraints_fail(self):
+ def foo(a: TTypingConstrained):
+ assert check_argument_types()
+
+ pytest.raises(TypeError, foo, {1, 2}).\
+ match(r'type of argument "a" must match one of the constraints \(List\[int\], '
+ r'AbstractSet\[str\]\); got set instead')
+
+ def test_typevar_constraints_fail(self):
+ def foo(a: TIntStr, b: TIntStr):
+ assert check_argument_types()
+
+ exc = pytest.raises(TypeError, foo, 2.5, 'aa')
+ assert str(exc.value) == ('type of argument "a" must match one of the constraints '
+ '(int, str); got float instead')
+
+ def test_typevar_bound(self):
+ def foo(a: TParent, b: TParent):
+ assert check_argument_types()
+
+ foo(Child(), Child())
+
+ def test_typevar_bound_fail(self):
+ def foo(a: TChild, b: TChild):
+ assert check_argument_types()
+
+ exc = pytest.raises(TypeError, foo, Parent(), Parent())
+ assert str(exc.value) == ('type of argument "a" must be __tests__.test_typeguard.Child or one of '
+ 'its subclasses; got __tests__.test_typeguard.Parent instead')
+
+ @pytest.mark.skipif(Type is List, reason='typing.Type could not be imported')
+ def test_class_bad_subclass(self):
+ def foo(a: Type[Child]):
+ assert check_argument_types()
+
+ pytest.raises(TypeError, foo, Parent).match(
+ '"a" must be a subclass of __tests__.test_typeguard.Child; got __tests__.test_typeguard.Parent instead')
+
+ def test_class_any(self):
+ def foo(a: Type[Any]):
+ assert check_argument_types()
+
+ foo(str)
+
+ def test_class_union(self):
+ def foo(a: Type[Union[str, int]]):
+ assert check_argument_types()
+
+ foo(str)
+ foo(int)
+ pytest.raises(TypeError, foo, tuple).\
+ match(r'"a" must match one of the following: \(str, int\); got tuple instead')
+
+ def test_wrapped_function(self):
+ def decorator(func):
+ @wraps(func)
+ def wrapper(*args, **kwargs):
+ return func(*args, **kwargs)
+ return wrapper
+
+ @decorator
+ def foo(a: 'Child'):
+ assert check_argument_types()
+
+ exc = pytest.raises(TypeError, foo, Parent())
+ assert str(exc.value) == ('type of argument "a" must be __tests__.test_typeguard.Child; '
+ 'got __tests__.test_typeguard.Parent instead')
+
+ def test_mismatching_default_type(self):
+ def foo(a: str = 1):
+ assert check_argument_types()
+
+ pytest.raises(TypeError, foo).match('type of argument "a" must be str; got int instead')
+
+ def test_implicit_default_none(self):
+ """
+ Test that if the default value is ``None``, a ``None`` argument can be passed.
+
+ """
+ def foo(a: str = None):
+ assert check_argument_types()
+
+ foo()
+
+ def test_generator(self):
+ """Test that argument type checking works in a generator function too."""
+ def generate(a: int):
+ assert check_argument_types()
+ yield a
+ yield a + 1
+
+ gen = generate(1)
+ next(gen)
+
+ def test_wrapped_generator_no_return_type_annotation(self):
+ """Test that return type checking works in a generator function too."""
+ @typechecked
+ def generate(a: int):
+ yield a
+ yield a + 1
+
+ gen = generate(1)
+ next(gen)
+
+ def test_varargs(self):
+ def foo(*args: int):
+ assert check_argument_types()
+
+ foo(1, 2)
+
+ def test_varargs_fail(self):
+ def foo(*args: int):
+ assert check_argument_types()
+
+ exc = pytest.raises(TypeError, foo, 1, 'a')
+ exc.match(r'type of argument "args"\[1\] must be int; got str instead')
+
+ def test_kwargs(self):
+ def foo(**kwargs: int):
+ assert check_argument_types()
+
+ foo(a=1, b=2)
+
+ def test_kwargs_fail(self):
+ def foo(**kwargs: int):
+ assert check_argument_types()
+
+ exc = pytest.raises(TypeError, foo, a=1, b='a')
+ exc.match(r'type of argument "kwargs"\[\'b\'\] must be int; got str instead')
+
+ def test_generic(self):
+ def foo(a: FooGeneric[str]):
+ assert check_argument_types()
+
+ foo(FooGeneric[str]())
+
+ @pytest.mark.skipif(myint is None, reason='NewType is not present in the typing module')
+ def test_newtype(self):
+ def foo(a: myint) -> int:
+ assert check_argument_types()
+ return 42
+
+ assert foo(1) == 42
+ exc = pytest.raises(TypeError, foo, "a")
+ assert str(exc.value) == 'type of argument "a" must be int; got str instead'
+
+ @pytest.mark.skipif(Collection is None, reason='typing.Collection is not available')
+ def test_collection(self):
+ def foo(a: Collection):
+ assert check_argument_types()
+
+ pytest.raises(TypeError, foo, True).match(
+ 'type of argument "a" must be collections.abc.Collection; got bool instead')
+
+ def test_binary_io(self):
+ def foo(a: BinaryIO):
+ assert check_argument_types()
+
+ foo(BytesIO())
+
+ def test_text_io(self):
+ def foo(a: TextIO):
+ assert check_argument_types()
+
+ foo(StringIO())
+
+ def test_binary_io_fail(self):
+ def foo(a: TextIO):
+ assert check_argument_types()
+
+ pytest.raises(TypeError, foo, BytesIO()).match('must be a text based I/O')
+
+ def test_text_io_fail(self):
+ def foo(a: BinaryIO):
+ assert check_argument_types()
+
+ pytest.raises(TypeError, foo, StringIO()).match('must be a binary I/O')
+
+ def test_binary_io_real_file(self, tmpdir):
+ def foo(a: BinaryIO):
+ assert check_argument_types()
+
+ with tmpdir.join('testfile').open('wb') as f:
+ foo(f)
+
+ def test_text_io_real_file(self, tmpdir):
+ def foo(a: TextIO):
+ assert check_argument_types()
+
+ with tmpdir.join('testfile').open('w') as f:
+ foo(f)
+
+ def test_recursive_type(self):
+ def foo(arg: JSONType) -> None:
+ assert check_argument_types()
+
+ foo({'a': [1, 2, 3]})
+ pytest.raises(TypeError, foo, {'a': (1, 2, 3)}).\
+ match(r'type of argument "arg" must be one of \(str, int, float, (bool, )?NoneType, '
+ r'List\[Union\[str, int, float, (bool, )?NoneType, List\[JSONType\], '
+ r'Dict\[str, JSONType\]\]\], '
+ r'Dict\[str, Union\[str, int, float, (bool, )?NoneType, List\[JSONType\], '
+ r'Dict\[str, JSONType\]\]\]\); got dict instead')
+
+
+class TestTypeChecked:
+ def test_typechecked(self):
+ @typechecked
+ def foo(a: int, b: str) -> str:
+ return 'abc'
+
+ assert foo(4, 'abc') == 'abc'
+
+ def test_typechecked_always(self):
+ @typechecked(always=True)
+ def foo(a: int, b: str) -> str:
+ return 'abc'
+
+ assert foo(4, 'abc') == 'abc'
+
+ def test_typechecked_arguments_fail(self):
+ @typechecked
+ def foo(a: int, b: str) -> str:
+ return 'abc'
+
+ exc = pytest.raises(TypeError, foo, 4, 5)
+ assert str(exc.value) == 'type of argument "b" must be str; got int instead'
+
+ def test_typechecked_return_type_fail(self):
+ @typechecked
+ def foo(a: int, b: str) -> str:
+ return 6
+
+ exc = pytest.raises(TypeError, foo, 4, 'abc')
+ assert str(exc.value) == 'type of the return value must be str; got int instead'
+
+ def test_typechecked_return_typevar_fail(self):
+ T = TypeVar('T', int, float)
+
+ @typechecked
+ def foo(a: T, b: T) -> T:
+ return 'a'
+
+ pytest.raises(TypeError, foo, 4, 2).\
+ match(r'type of the return value must match one of the constraints \(int, float\); '
+ r'got str instead')
+
+ def test_typechecked_no_annotations(self, recwarn):
+ def foo(a, b):
+ pass
+
+ typechecked(foo)
+
+ func_name = function_name(foo)
+ assert len(recwarn) == 1
+ assert str(recwarn[0].message) == (
+ 'no type annotations present -- not typechecking {}'.format(func_name))
+
+ def test_return_type_none(self):
+ """Check that a declared return type of None is respected."""
+ @typechecked
+ def foo() -> None:
+ return 'a'
+
+ exc = pytest.raises(TypeError, foo)
+ assert str(exc.value) == 'type of the return value must be NoneType; got str instead'
+
+ def test_return_type_magicmock(self, mock_class):
+ @typechecked
+ def foo() -> str:
+ return mock_class()
+
+ foo()
+
+ @pytest.mark.parametrize('typehint', [
+ Callable[..., int],
+ Callable
+ ], ids=['parametrized', 'unparametrized'])
+ def test_callable(self, typehint):
+ @typechecked
+ def foo(a: typehint):
+ pass
+
+ def some_callable() -> int:
+ pass
+
+ foo(some_callable)
+
+ @pytest.mark.parametrize('typehint', [
+ List[int],
+ List,
+ list,
+ ], ids=['parametrized', 'unparametrized', 'plain'])
+ def test_list(self, typehint):
+ @typechecked
+ def foo(a: typehint):
+ pass
+
+ foo([1, 2])
+
+ @pytest.mark.parametrize('typehint', [
+ Dict[str, int],
+ Dict,
+ dict
+ ], ids=['parametrized', 'unparametrized', 'plain'])
+ def test_dict(self, typehint):
+ @typechecked
+ def foo(a: typehint):
+ pass
+
+ foo({'x': 2})
+
+ @pytest.mark.parametrize('typehint, value', [
+ (Dict, {'x': 2, 6: 4}),
+ (List, ['x', 6]),
+ (Sequence, ['x', 6]),
+ (Set, {'x', 6}),
+ (AbstractSet, {'x', 6}),
+ (Tuple, ('x', 6)),
+ ], ids=['dict', 'list', 'sequence', 'set', 'abstractset', 'tuple'])
+ def test_unparametrized_types_mixed_values(self, typehint, value):
+ @typechecked
+ def foo(a: typehint):
+ pass
+
+ foo(value)
+
+ @pytest.mark.parametrize('typehint', [
+ Sequence[str],
+ Sequence
+ ], ids=['parametrized', 'unparametrized'])
+ @pytest.mark.parametrize('value', [('a', 'b'), ['a', 'b'], 'abc'],
+ ids=['tuple', 'list', 'str'])
+ def test_sequence(self, typehint, value):
+ @typechecked
+ def foo(a: typehint):
+ pass
+
+ foo(value)
+
+ @pytest.mark.parametrize('typehint', [
+ Iterable[str],
+ Iterable
+ ], ids=['parametrized', 'unparametrized'])
+ @pytest.mark.parametrize('value', [('a', 'b'), ['a', 'b'], 'abc'],
+ ids=['tuple', 'list', 'str'])
+ def test_iterable(self, typehint, value):
+ @typechecked
+ def foo(a: typehint):
+ pass
+
+ foo(value)
+
+ @pytest.mark.parametrize('typehint', [
+ Container[str],
+ Container
+ ], ids=['parametrized', 'unparametrized'])
+ @pytest.mark.parametrize('value', [('a', 'b'), ['a', 'b'], 'abc'],
+ ids=['tuple', 'list', 'str'])
+ def test_container(self, typehint, value):
+ @typechecked
+ def foo(a: typehint):
+ pass
+
+ foo(value)
+
+ @pytest.mark.parametrize('typehint', [
+ AbstractSet[int],
+ AbstractSet,
+ Set[int],
+ Set,
+ set
+ ], ids=['abstract_parametrized', 'abstract', 'parametrized', 'unparametrized', 'plain'])
+ @pytest.mark.parametrize('value', [set(), {6}])
+ def test_set(self, typehint, value):
+ @typechecked
+ def foo(a: typehint):
+ pass
+
+ foo(value)
+
+ @pytest.mark.parametrize('typehint', [
+ Tuple[int, int],
+ Tuple[int, ...],
+ Tuple,
+ tuple
+ ], ids=['parametrized', 'ellipsis', 'unparametrized', 'plain'])
+ def test_tuple(self, typehint):
+ @typechecked
+ def foo(a: typehint):
+ pass
+
+ foo((1, 2))
+
+ def test_empty_tuple(self):
+ @typechecked
+ def foo(a: Tuple[()]):
+ pass
+
+ foo(())
+
+ @pytest.mark.skipif(Type is List, reason='typing.Type could not be imported')
+ @pytest.mark.parametrize('typehint', [
+ Type[Parent],
+ Type[TypeVar('UnboundType')], # noqa: F821
+ Type[TypeVar('BoundType', bound=Parent)], # noqa: F821
+ Type,
+ type
+ ], ids=['parametrized', 'unbound-typevar', 'bound-typevar', 'unparametrized', 'plain'])
+ def test_class(self, typehint):
+ @typechecked
+ def foo(a: typehint):
+ pass
+
+ foo(Child)
+
+ @pytest.mark.skipif(Type is List, reason='typing.Type could not be imported')
+ def test_class_not_a_class(self):
+ @typechecked
+ def foo(a: Type[dict]):
+ pass
+
+ exc = pytest.raises(TypeError, foo, 1)
+ exc.match('type of argument "a" must be a type; got int instead')
+
+ @pytest.mark.parametrize('typehint, value', [
+ (complex, complex(1, 5)),
+ (complex, 1.0),
+ (complex, 1),
+ (float, 1.0),
+ (float, 1)
+ ], ids=['complex-complex', 'complex-float', 'complex-int', 'float-float', 'float-int'])
+ def test_numbers(self, typehint, value):
+ @typechecked
+ def foo(a: typehint):
+ pass
+
+ foo(value)
+
+ def test_coroutine_correct_return_type(self):
+ @typechecked
+ async def foo() -> str:
+ return 'foo'
+
+ coro = foo()
+ pytest.raises(StopIteration, coro.send, None)
+
+ def test_coroutine_wrong_return_type(self):
+ @typechecked
+ async def foo() -> str:
+ return 1
+
+ coro = foo()
+ pytest.raises(TypeError, coro.send, None).\
+ match('type of the return value must be str; got int instead')
+
+ def test_bytearray_bytes(self):
+ """Test that a bytearray is accepted where bytes are expected."""
+ @typechecked
+ def foo(x: bytes) -> None:
+ pass
+
+ foo(bytearray([1]))
+
+ def test_bytearray_memoryview(self):
+ """Test that a bytearray is accepted where bytes are expected."""
+ @typechecked
+ def foo(x: bytes) -> None:
+ pass
+
+ foo(memoryview(b'foo'))
+
+ def test_class_decorator(self):
+ @typechecked
+ class Foo:
+ @staticmethod
+ def staticmethod() -> int:
+ return 'foo'
+
+ @classmethod
+ def classmethod(cls) -> int:
+ return 'foo'
+
+ def method(self) -> int:
+ return 'foo'
+
+ @property
+ def prop(self) -> int:
+ return 'foo'
+
+ @property
+ def prop2(self) -> int:
+ return 'foo'
+
+ @prop2.setter
+ def prop2(self, value: int) -> None:
+ pass
+
+ pattern = 'type of the return value must be int; got str instead'
+ pytest.raises(TypeError, Foo.staticmethod).match(pattern)
+ pytest.raises(TypeError, Foo.classmethod).match(pattern)
+ pytest.raises(TypeError, Foo().method).match(pattern)
+
+ with pytest.raises(TypeError) as raises:
+ Foo().prop
+
+ assert raises.value.args[0] == pattern
+
+ with pytest.raises(TypeError) as raises:
+ Foo().prop2
+
+ assert raises.value.args[0] == pattern
+
+ with pytest.raises(TypeError) as raises:
+ Foo().prop2 = 'foo'
+
+ assert raises.value.args[0] == 'type of argument "value" must be int; got str instead'
+
+ @pytest.mark.parametrize('annotation', [
+ Generator[int, str, List[str]],
+ Generator,
+ Iterable[int],
+ Iterable,
+ Iterator[int],
+ Iterator
+ ], ids=['generator', 'bare_generator', 'iterable', 'bare_iterable', 'iterator',
+ 'bare_iterator'])
+ def test_generator(self, annotation):
+ @typechecked
+ def genfunc() -> annotation:
+ val1 = yield 2
+ val2 = yield 3
+ val3 = yield 4
+ return [val1, val2, val3]
+
+ gen = genfunc()
+ with pytest.raises(StopIteration) as exc:
+ value = next(gen)
+ while True:
+ value = gen.send(str(value))
+ assert isinstance(value, int)
+
+ assert exc.value.value == ['2', '3', '4']
+
+ @pytest.mark.parametrize('annotation', [
+ Generator[int, str, None],
+ Iterable[int],
+ Iterator[int]
+ ], ids=['generator', 'iterable', 'iterator'])
+ def test_generator_bad_yield(self, annotation):
+ @typechecked
+ def genfunc() -> annotation:
+ yield 'foo'
+
+ gen = genfunc()
+ with pytest.raises(TypeError) as exc:
+ next(gen)
+
+ exc.match('type of value yielded from generator must be int; got str instead')
+
+ def test_generator_bad_send(self):
+ @typechecked
+ def genfunc() -> Generator[int, str, None]:
+ yield 1
+ yield 2
+
+ gen = genfunc()
+ next(gen)
+ with pytest.raises(TypeError) as exc:
+ gen.send(2)
+
+ exc.match('type of value sent to generator must be str; got int instead')
+
+ def test_generator_bad_return(self):
+ @typechecked
+ def genfunc() -> Generator[int, str, str]:
+ yield 1
+ return 6
+
+ gen = genfunc()
+ next(gen)
+ with pytest.raises(TypeError) as exc:
+ gen.send('foo')
+
+ exc.match('type of return value must be str; got int instead')
+
+ def test_return_generator(self):
+ @typechecked
+ def genfunc() -> Generator[int, None, None]:
+ yield 1
+
+ @typechecked
+ def foo() -> Generator[int, None, None]:
+ return genfunc()
+
+ foo()
+
+ def test_builtin_decorator(self):
+ @typechecked
+ @lru_cache()
+ def func(x: int) -> None:
+ pass
+
+ func(3)
+ func(3)
+ pytest.raises(TypeError, func, 'foo').\
+ match('type of argument "x" must be int; got str instead')
+
+ # Make sure that @lru_cache is still being used
+ cache_info = func.__wrapped__.cache_info()
+ assert cache_info.hits == 1
+
+ def test_local_class(self):
+ @typechecked
+ class LocalClass:
+ class Inner:
+ pass
+
+ def create_inner(self) -> 'Inner':
+ return self.Inner()
+
+ retval = LocalClass().create_inner()
+ assert isinstance(retval, LocalClass.Inner)
+
+ def test_local_class_async(self):
+ @typechecked
+ class LocalClass:
+ class Inner:
+ pass
+
+ async def create_inner(self) -> 'Inner':
+ return self.Inner()
+
+ coro = LocalClass().create_inner()
+ exc = pytest.raises(StopIteration, coro.send, None)
+ retval = exc.value.value
+ assert isinstance(retval, LocalClass.Inner)
+
+ def test_callable_nonmember(self):
+ class CallableClass:
+ def __call__(self):
+ pass
+
+ @typechecked
+ class LocalClass:
+ some_callable = CallableClass()
+
+ def test_inherited_class_method(self):
+ @typechecked
+ class Parent:
+ @classmethod
+ def foo(cls, x: str) -> str:
+ return cls.__name__
+
+ @typechecked
+ class Child(Parent):
+ pass
+
+ assert Child.foo('bar') == 'Child'
+ pytest.raises(TypeError, Child.foo, 1)
+
+ def test_class_property(self):
+ @typechecked
+ class Foo:
+ def __init__(self) -> None:
+ self.foo = 'foo'
+
+ @property
+ def prop(self) -> int:
+ """My property."""
+ return 4
+
+ @property
+ def prop2(self) -> str:
+ return self.foo
+
+ @prop2.setter
+ def prop2(self, value: str) -> None:
+ self.foo = value
+
+ assert Foo.__dict__["prop"].__doc__.strip() == "My property."
+ f = Foo()
+ assert f.prop == 4
+ assert f.prop2 == 'foo'
+ f.prop2 = 'bar'
+ assert f.prop2 == 'bar'
+
+ with pytest.raises(TypeError) as raises:
+ f.prop2 = 3
+
+ assert raises.value.args[0] == 'type of argument "value" must be str; got int instead'
+
+ def test_decorator_factory_no_annotations(self):
+ class CallableClass:
+ def __call__(self):
+ pass
+
+ def decorator_factory():
+ def decorator(f):
+ cmd = CallableClass()
+ return cmd
+
+ return decorator
+
+ with pytest.warns(UserWarning):
+ @typechecked
+ @decorator_factory()
+ def foo():
+ pass
+
+ @pytest.mark.skipif(sys.version_info >= (3, 12), reason="Fail wint Python 3.12")
+ @pytest.mark.parametrize('annotation', [TBound, TConstrained], ids=['bound', 'constrained'])
+ def test_typevar_forwardref(self, annotation):
+ @typechecked
+ def func(x: annotation) -> None:
+ pass
+
+ func(Parent())
+ func(Child())
+ pytest.raises(TypeError, func, 'foo')
+
+ @pytest.mark.parametrize('protocol_cls', [RuntimeProtocol, StaticProtocol])
+ def test_protocol(self, protocol_cls):
+ @typechecked
+ def foo(arg: protocol_cls) -> None:
+ pass
+
+ class Foo:
+ def meth(self) -> None:
+ pass
+
+ foo(Foo())
+
+ def test_protocol_fail(self):
+ @typechecked
+ def foo(arg: RuntimeProtocol) -> None:
+ pass
+
+ pytest.raises(TypeError, foo, object()).\
+ match(r'type of argument "arg" \(object\) is not compatible with the RuntimeProtocol '
+ 'protocol')
+
+ def test_noreturn(self):
+ @typechecked
+ def foo() -> NoReturn:
+ pass
+
+ pytest.raises(TypeError, foo).match(r'foo\(\) was declared never to return but it did')
+
+ def test_recursive_type(self):
+ @typechecked
+ def foo(arg: JSONType) -> None:
+ pass
+
+ foo({'a': [1, 2, 3]})
+ pytest.raises(TypeError, foo, {'a': (1, 2, 3)}).\
+ match(r'type of argument "arg" must be one of \(str, int, float, (bool, )?NoneType, '
+ r'List\[Union\[str, int, float, (bool, )?NoneType, List\[JSONType\], '
+ r'Dict\[str, JSONType\]\]\], '
+ r'Dict\[str, Union\[str, int, float, (bool, )?NoneType, List\[JSONType\], '
+ r'Dict\[str, JSONType\]\]\]\); got dict instead')
+
+ def test_literal(self):
+ from http import HTTPStatus
+
+ @typechecked
+ def foo(a: Literal[1, True, 'x', b'y', HTTPStatus.ACCEPTED]):
+ pass
+
+ foo(HTTPStatus.ACCEPTED)
+ pytest.raises(TypeError, foo, 4).match(r"must be one of \(1, True, 'x', b'y', "
+ r"<HTTPStatus.ACCEPTED: 202>\); got 4 instead$")
+
+ def test_literal_union(self):
+ @typechecked
+ def foo(a: Union[str, Literal[1, 6, 8]]):
+ pass
+
+ foo(6)
+ pytest.raises(TypeError, foo, 4).\
+ match(r'must be one of \(str, Literal\[1, 6, 8\]\); got int instead$')
+
+ def test_literal_nested(self):
+ @typechecked
+ def foo(a: Literal[1, Literal['x', 'a', Literal['z']], 6, 8]):
+ pass
+
+ foo('z')
+ pytest.raises(TypeError, foo, 4).match(r"must be one of \(1, 'x', 'a', 'z', 6, 8\); "
+ r"got 4 instead$")
+
+ def test_literal_illegal_value(self):
+ @typechecked
+ def foo(a: Literal[1, 1.1]):
+ pass
+
+ pytest.raises(TypeError, foo, 4).match(r"Illegal literal value: 1.1$")
+
+ @pytest.mark.parametrize('value, total, error_re', [
+ pytest.param({'x': 6, 'y': 'foo'}, True, None, id='correct'),
+ pytest.param({'y': 'foo'}, True, r'required key\(s\) \("x"\) missing from argument "arg"',
+ id='missing_x'),
+ pytest.param({'x': 6, 'y': 3}, True,
+ 'type of dict item "y" for argument "arg" must be str; got int instead',
+ id='wrong_y'),
+ pytest.param({'x': 6}, True, r'required key\(s\) \("y"\) missing from argument "arg"',
+ id='missing_y_error'),
+ pytest.param({'x': 6}, False, None, id='missing_y_ok', marks=[issue_42059]),
+ pytest.param({'x': 'abc'}, False,
+ 'type of dict item "x" for argument "arg" must be int; got str instead',
+ id='wrong_x', marks=[issue_42059]),
+ pytest.param({'x': 6, 'foo': 'abc'}, False, r'extra key\(s\) \("foo"\) in argument "arg"',
+ id='unknown_key')
+ ])
+ def test_typed_dict(self, value, total, error_re):
+ DummyDict = TypedDict('DummyDict', {'x': int, 'y': str}, total=total)
+
+ @typechecked
+ def foo(arg: DummyDict):
+ pass
+
+ if error_re:
+ pytest.raises(TypeError, foo, value).match(error_re)
+ else:
+ foo(value)
+
+ def test_class_abstract_property(self):
+ """Regression test for #206."""
+
+ @typechecked
+ class Foo:
+ @abstractproperty
+ def dummyproperty(self):
+ pass
+
+ assert isinstance(Foo.dummyproperty, abstractproperty)
+
+
+class TestTypeChecker:
+ @pytest.fixture
+ def executor(self):
+ executor = ThreadPoolExecutor(1)
+ yield executor
+ executor.shutdown()
+
+ @pytest.fixture
+ def checker(self):
+ with warnings.catch_warnings():
+ warnings.simplefilter('ignore', DeprecationWarning)
+ return TypeChecker(__name__)
+
+ @staticmethod
+ def generatorfunc() -> Generator[int, None, None]:
+ yield 1
+
+ @staticmethod
+ def bad_generatorfunc() -> Generator[int, None, None]:
+ yield 1
+ yield 'foo'
+
+ @staticmethod
+ def error_function() -> float:
+ return 1 / 0
+
+ def test_check_call_args(self, checker: TypeChecker):
+ def foo(a: int):
+ pass
+
+ with checker, pytest.warns(TypeWarning) as record:
+ assert checker.active
+ foo(1)
+ foo('x')
+
+ assert not checker.active
+ foo('x')
+
+ assert len(record) == 1
+ warning = record[0].message
+ assert warning.error == 'type of argument "a" must be int; got str instead'
+ assert warning.func is foo
+ assert isinstance(warning.stack, list)
+ buffer = StringIO()
+ warning.print_stack(buffer)
+ assert len(buffer.getvalue()) > 100
+
+ def test_check_return_value(self, checker: TypeChecker):
+ def foo() -> int:
+ return 'x'
+
+ with checker, pytest.warns(TypeWarning) as record:
+ foo()
+
+ assert len(record) == 1
+ assert record[0].message.error == 'type of the return value must be int; got str instead'
+
+ def test_threaded_check_call_args(self, checker: TypeChecker, executor):
+ def foo(a: int):
+ pass
+
+ with checker, pytest.warns(TypeWarning) as record:
+ executor.submit(foo, 1).result()
+ executor.submit(foo, 'x').result()
+
+ executor.submit(foo, 'x').result()
+
+ assert len(record) == 1
+ warning = record[0].message
+ assert warning.error == 'type of argument "a" must be int; got str instead'
+ assert warning.func is foo
+
+ def test_double_start(self, checker: TypeChecker):
+ """Test that the same type checker can't be started twice while running."""
+ with checker:
+ pytest.raises(RuntimeError, checker.start).match('type checker already running')
+
+ def test_nested(self):
+ """Test that nesting of type checker context managers works as expected."""
+ def foo(a: int):
+ pass
+
+ with warnings.catch_warnings(record=True):
+ warnings.simplefilter('ignore', DeprecationWarning)
+ parent = TypeChecker(__name__)
+ child = TypeChecker(__name__)
+
+ with parent, pytest.warns(TypeWarning) as record:
+ foo('x')
+ with child:
+ foo('x')
+
+ assert len(record) == 3
+
+ def test_existing_profiler(self, checker: TypeChecker):
+ """
+ Test that an existing profiler function is chained with the type checker and restored after
+ the block is exited.
+
+ """
+ def foo(a: int):
+ pass
+
+ def profiler(frame, event, arg):
+ nonlocal profiler_run_count
+ if event in ('call', 'return'):
+ profiler_run_count += 1
+
+ if old_profiler:
+ old_profiler(frame, event, arg)
+
+ profiler_run_count = 0
+ old_profiler = sys.getprofile()
+ sys.setprofile(profiler)
+ try:
+ with checker, pytest.warns(TypeWarning) as record:
+ foo(1)
+ foo('x')
+
+ assert sys.getprofile() is profiler
+ finally:
+ sys.setprofile(old_profiler)
+
+ assert profiler_run_count
+ assert len(record) == 1
+
+ def test_generator(self, checker):
+ with checker, pytest.warns(None) as record:
+ gen = self.generatorfunc()
+ assert next(gen) == 1
+
+ assert len(record) == 0
+
+ def test_generator_wrong_yield(self, checker):
+ with checker, pytest.warns(TypeWarning) as record:
+ gen = self.bad_generatorfunc()
+ assert list(gen) == [1, 'foo']
+
+ assert len(record) == 1
+ assert 'type of yielded value must be int; got str instead' in str(record[0].message)
+
+ def test_exception(self, checker):
+ with checker, pytest.warns(None) as record:
+ pytest.raises(ZeroDivisionError, self.error_function)
+
+ assert len(record) == 0
+
+ @pytest.mark.parametrize('policy', [ForwardRefPolicy.WARN, ForwardRefPolicy.GUESS],
+ ids=['warn', 'guess'])
+ def test_forward_ref_policy_resolution_fails(self, checker, policy):
+ def unresolvable_annotation(x: 'OrderedDict'): # noqa
+ pass
+
+ checker.annotation_policy = policy
+ gc.collect() # prevent find_function() from finding more than one instance of the function
+ with checker, pytest.warns(TypeHintWarning) as record:
+ unresolvable_annotation({})
+
+ assert len(record) == 1
+ assert ("unresolvable_annotation: name 'OrderedDict' is not defined"
+ in str(record[0].message))
+ assert 'x' not in unresolvable_annotation.__annotations__
+
+ def test_forward_ref_policy_guess(self, checker):
+ import collections
+
+ def unresolvable_annotation(x: 'OrderedDict'): # noqa
+ pass
+
+ checker.annotation_policy = ForwardRefPolicy.GUESS
+ with checker, pytest.warns(TypeHintWarning) as record:
+ unresolvable_annotation(collections.OrderedDict())
+
+ assert len(record) == 1
+ assert str(record[0].message).startswith("Replaced forward declaration 'OrderedDict' in")
+ assert unresolvable_annotation.__annotations__['x'] is collections.OrderedDict
+
+
+class TestTracebacks:
+ def test_short_tracebacks(self):
+ def foo(a: Callable[..., int]):
+ assert check_argument_types()
+
+ try:
+ foo(1)
+ except TypeError:
+ _, _, tb = sys.exc_info()
+ parts = traceback.extract_tb(tb)
+ typeguard_lines = [part for part in parts
+ if part.filename.endswith("typeguard/__init__.py")]
+ assert len(typeguard_lines) == 1
diff --git a/contrib/python/typeguard/tests/test_typeguard_py36.py b/contrib/python/typeguard/tests/test_typeguard_py36.py
new file mode 100644
index 00000000000..383f7f3353f
--- /dev/null
+++ b/contrib/python/typeguard/tests/test_typeguard_py36.py
@@ -0,0 +1,189 @@
+import sys
+import warnings
+from typing import Any, AsyncGenerator, AsyncIterable, AsyncIterator, Callable, Dict
+
+import pytest
+from typing_extensions import Protocol, runtime_checkable
+
+from typeguard import TypeChecker, typechecked
+
+try:
+ from typing import TypedDict
+except ImportError:
+ from typing_extensions import TypedDict
+
+
+@runtime_checkable
+class RuntimeProtocol(Protocol):
+ member: int
+
+ def meth(self) -> None:
+ ...
+
+
+class TestTypeChecked:
+ @pytest.mark.parametrize('annotation', [
+ AsyncGenerator[int, str],
+ AsyncIterable[int],
+ AsyncIterator[int]
+ ], ids=['generator', 'iterable', 'iterator'])
+ def test_async_generator(self, annotation):
+ async def run_generator():
+ @typechecked
+ async def genfunc() -> annotation:
+ values.append((yield 2))
+ values.append((yield 3))
+ values.append((yield 4))
+
+ gen = genfunc()
+
+ value = await gen.asend(None)
+ with pytest.raises(StopAsyncIteration):
+ while True:
+ value = await gen.asend(str(value))
+ assert isinstance(value, int)
+
+ values = []
+ coro = run_generator()
+ try:
+ for elem in coro.__await__():
+ print(elem)
+ except StopAsyncIteration as exc:
+ values = exc.value
+
+ assert values == ['2', '3', '4']
+
+ @pytest.mark.parametrize('annotation', [
+ AsyncGenerator[int, str],
+ AsyncIterable[int],
+ AsyncIterator[int]
+ ], ids=['generator', 'iterable', 'iterator'])
+ def test_async_generator_bad_yield(self, annotation):
+ @typechecked
+ async def genfunc() -> annotation:
+ yield 'foo'
+
+ gen = genfunc()
+ with pytest.raises(TypeError) as exc:
+ next(gen.__anext__().__await__())
+
+ exc.match('type of value yielded from generator must be int; got str instead')
+
+ def test_async_generator_bad_send(self):
+ @typechecked
+ async def genfunc() -> AsyncGenerator[int, str]:
+ yield 1
+ yield 2
+
+ gen = genfunc()
+ pytest.raises(StopIteration, next, gen.__anext__().__await__())
+ with pytest.raises(TypeError) as exc:
+ next(gen.asend(2).__await__())
+
+ exc.match('type of value sent to generator must be str; got int instead')
+
+ def test_return_async_generator(self):
+ @typechecked
+ async def genfunc() -> AsyncGenerator[int, None]:
+ yield 1
+
+ @typechecked
+ def foo() -> AsyncGenerator[int, None]:
+ return genfunc()
+
+ foo()
+
+ def test_async_generator_iterate(self):
+ asyncgen = typechecked(asyncgenfunc)()
+ aiterator = asyncgen.__aiter__()
+ exc = pytest.raises(StopIteration, aiterator.__anext__().send, None)
+ assert exc.value.value == 1
+
+ def test_typeddict_inherited(self):
+ class ParentDict(TypedDict):
+ x: int
+
+ class ChildDict(ParentDict, total=False):
+ y: int
+
+ @typechecked
+ def foo(arg: ChildDict):
+ pass
+
+ foo({'x': 1})
+ if sys.version_info[:2] != (3, 8):
+ # TypedDict is unusable for runtime checking on Python 3.8
+ pytest.raises(TypeError, foo, {'y': 1})
+
+ def test_mapping_is_not_typeddict(self):
+ """Regression test for #216."""
+
+ class Foo(Dict[str, Any]):
+ pass
+
+ @typechecked
+ def foo(arg: Foo):
+ pass
+
+ foo(Foo({'x': 1}))
+
+
+async def asyncgenfunc() -> AsyncGenerator[int, None]:
+ yield 1
+
+
+async def asyncgeniterablefunc() -> AsyncIterable[int]:
+ yield 1
+
+
+async def asyncgeniteratorfunc() -> AsyncIterator[int]:
+ yield 1
+
+
+class TestTypeChecker:
+ @pytest.fixture
+ def checker(self):
+ with warnings.catch_warnings():
+ warnings.simplefilter('ignore', DeprecationWarning)
+ return TypeChecker(__name__)
+
+ @pytest.mark.parametrize('func', [asyncgenfunc, asyncgeniterablefunc, asyncgeniteratorfunc],
+ ids=['generator', 'iterable', 'iterator'])
+ def test_async_generator(self, checker, func):
+ """Make sure that the type checker does not complain about the None return value."""
+ with checker, pytest.warns(None) as record:
+ func()
+
+ assert len(record) == 0
+
+ def test_callable(self):
+ class command:
+ # we need an __annotations__ attribute to trigger the code path
+ whatever: float
+
+ def __init__(self, function: Callable[[int], int]):
+ self.function = function
+
+ def __call__(self, arg: int) -> None:
+ self.function(arg)
+
+ @typechecked
+ @command
+ def function(arg: int) -> None:
+ pass
+
+ function(1)
+
+
+def test_protocol_non_method_members():
+ @typechecked
+ def foo(a: RuntimeProtocol):
+ pass
+
+ class Foo:
+ member = 1
+
+ def meth(self) -> None:
+ pass
+
+ foo(Foo())
diff --git a/contrib/python/typeguard/tests/ya.make b/contrib/python/typeguard/tests/ya.make
new file mode 100644
index 00000000000..0649ac4e9f7
--- /dev/null
+++ b/contrib/python/typeguard/tests/ya.make
@@ -0,0 +1,22 @@
+PY3TEST()
+
+PEERDIR(
+ contrib/python/typeguard
+ contrib/python/typing-extensions
+)
+
+TEST_SRCS(
+ conftest.py
+ dummymodule.py
+ test_importhook.py
+ test_typeguard.py
+ test_typeguard_py36.py
+)
+
+DATA(
+ arcadia/contrib/python/typeguard/tests
+)
+
+NO_LINT()
+
+END()
diff --git a/contrib/python/typeguard/typeguard/__init__.py b/contrib/python/typeguard/typeguard/__init__.py
new file mode 100644
index 00000000000..33d83d4582c
--- /dev/null
+++ b/contrib/python/typeguard/typeguard/__init__.py
@@ -0,0 +1,1258 @@
+__all__ = ('ForwardRefPolicy', 'TypeHintWarning', 'typechecked', 'check_return_type',
+ 'check_argument_types', 'check_type', 'TypeWarning', 'TypeChecker',
+ 'typeguard_ignore')
+
+import collections.abc
+import gc
+import inspect
+import sys
+import threading
+from collections import OrderedDict
+from enum import Enum
+from functools import partial, wraps
+from inspect import Parameter, isclass, isfunction, isgeneratorfunction
+from io import BufferedIOBase, IOBase, RawIOBase, TextIOBase
+from traceback import extract_stack, print_stack
+from types import CodeType, FunctionType
+from typing import (
+ IO, TYPE_CHECKING, AbstractSet, Any, AsyncIterable, AsyncIterator, BinaryIO, Callable, Dict,
+ Generator, Iterable, Iterator, List, NewType, Optional, Sequence, Set, TextIO, Tuple, Type,
+ TypeVar, Union, get_type_hints, overload)
+from unittest.mock import Mock
+from warnings import warn
+from weakref import WeakKeyDictionary, WeakValueDictionary
+
+# Python 3.8+
+try:
+ from typing_extensions import Literal
+except ImportError:
+ try:
+ from typing import Literal
+ except ImportError:
+ Literal = None
+
+# Python 3.5.4+ / 3.6.2+
+try:
+ from typing_extensions import NoReturn
+except ImportError:
+ try:
+ from typing import NoReturn
+ except ImportError:
+ NoReturn = None
+
+# Python 3.6+
+try:
+ from inspect import isasyncgen, isasyncgenfunction
+ from typing import AsyncGenerator
+except ImportError:
+ AsyncGenerator = None
+
+ def isasyncgen(obj):
+ return False
+
+ def isasyncgenfunction(func):
+ return False
+
+# Python 3.8+
+try:
+ from typing import ForwardRef
+ evaluate_forwardref = ForwardRef._evaluate
+except ImportError:
+ from typing import _ForwardRef as ForwardRef
+ evaluate_forwardref = ForwardRef._eval_type
+
+if sys.version_info >= (3, 10):
+ from typing import is_typeddict
+else:
+ _typed_dict_meta_types = ()
+ if sys.version_info >= (3, 8):
+ from typing import _TypedDictMeta
+ _typed_dict_meta_types += (_TypedDictMeta,)
+
+ try:
+ from typing_extensions import _TypedDictMeta
+ _typed_dict_meta_types += (_TypedDictMeta,)
+ except ImportError:
+ pass
+
+ def is_typeddict(tp) -> bool:
+ return isinstance(tp, _typed_dict_meta_types)
+
+
+if TYPE_CHECKING:
+ _F = TypeVar("_F")
+
+ def typeguard_ignore(f: _F) -> _F:
+ """This decorator is a noop during static type-checking."""
+ return f
+else:
+ from typing import no_type_check as typeguard_ignore
+
+
+_type_hints_map = WeakKeyDictionary() # type: Dict[FunctionType, Dict[str, Any]]
+_functions_map = WeakValueDictionary() # type: Dict[CodeType, FunctionType]
+_missing = object()
+
+T_CallableOrType = TypeVar('T_CallableOrType', bound=Callable[..., Any])
+
+# Lifted from mypy.sharedparse
+BINARY_MAGIC_METHODS = {
+ "__add__",
+ "__and__",
+ "__cmp__",
+ "__divmod__",
+ "__div__",
+ "__eq__",
+ "__floordiv__",
+ "__ge__",
+ "__gt__",
+ "__iadd__",
+ "__iand__",
+ "__idiv__",
+ "__ifloordiv__",
+ "__ilshift__",
+ "__imatmul__",
+ "__imod__",
+ "__imul__",
+ "__ior__",
+ "__ipow__",
+ "__irshift__",
+ "__isub__",
+ "__itruediv__",
+ "__ixor__",
+ "__le__",
+ "__lshift__",
+ "__lt__",
+ "__matmul__",
+ "__mod__",
+ "__mul__",
+ "__ne__",
+ "__or__",
+ "__pow__",
+ "__radd__",
+ "__rand__",
+ "__rdiv__",
+ "__rfloordiv__",
+ "__rlshift__",
+ "__rmatmul__",
+ "__rmod__",
+ "__rmul__",
+ "__ror__",
+ "__rpow__",
+ "__rrshift__",
+ "__rshift__",
+ "__rsub__",
+ "__rtruediv__",
+ "__rxor__",
+ "__sub__",
+ "__truediv__",
+ "__xor__",
+}
+
+
+class ForwardRefPolicy(Enum):
+ """Defines how unresolved forward references are handled."""
+
+ ERROR = 1 #: propagate the :exc:`NameError` from :func:`~typing.get_type_hints`
+ WARN = 2 #: remove the annotation and emit a TypeHintWarning
+ #: replace the annotation with the argument's class if the qualified name matches, else remove
+ #: the annotation
+ GUESS = 3
+
+
+class TypeHintWarning(UserWarning):
+ """
+ A warning that is emitted when a type hint in string form could not be resolved to an actual
+ type.
+ """
+
+
+class _TypeCheckMemo:
+ __slots__ = 'globals', 'locals'
+
+ def __init__(self, globals: Dict[str, Any], locals: Dict[str, Any]):
+ self.globals = globals
+ self.locals = locals
+
+
+def _strip_annotation(annotation):
+ if isinstance(annotation, str):
+ return annotation.strip("'")
+ else:
+ return annotation
+
+
+class _CallMemo(_TypeCheckMemo):
+ __slots__ = 'func', 'func_name', 'arguments', 'is_generator', 'type_hints'
+
+ def __init__(self, func: Callable, frame_locals: Optional[Dict[str, Any]] = None,
+ args: tuple = None, kwargs: Dict[str, Any] = None,
+ forward_refs_policy=ForwardRefPolicy.ERROR):
+ super().__init__(func.__globals__, frame_locals)
+ self.func = func
+ self.func_name = function_name(func)
+ self.is_generator = isgeneratorfunction(func)
+ signature = inspect.signature(func)
+
+ if args is not None and kwargs is not None:
+ self.arguments = signature.bind(*args, **kwargs).arguments
+ else:
+ assert frame_locals is not None, 'frame must be specified if args or kwargs is None'
+ self.arguments = frame_locals
+
+ self.type_hints = _type_hints_map.get(func)
+ if self.type_hints is None:
+ while True:
+ if sys.version_info < (3, 5, 3):
+ frame_locals = dict(frame_locals)
+
+ try:
+ hints = get_type_hints(func, localns=frame_locals)
+ except NameError as exc:
+ if forward_refs_policy is ForwardRefPolicy.ERROR:
+ raise
+
+ typename = str(exc).split("'", 2)[1]
+ for param in signature.parameters.values():
+ if _strip_annotation(param.annotation) == typename:
+ break
+ else:
+ raise
+
+ func_name = function_name(func)
+ if forward_refs_policy is ForwardRefPolicy.GUESS:
+ if param.name in self.arguments:
+ argtype = self.arguments[param.name].__class__
+ stripped = _strip_annotation(param.annotation)
+ if stripped == argtype.__qualname__:
+ func.__annotations__[param.name] = argtype
+ msg = ('Replaced forward declaration {!r} in {} with {!r}'
+ .format(stripped, func_name, argtype))
+ warn(TypeHintWarning(msg))
+ continue
+
+ msg = 'Could not resolve type hint {!r} on {}: {}'.format(
+ param.annotation, function_name(func), exc)
+ warn(TypeHintWarning(msg))
+ del func.__annotations__[param.name]
+ else:
+ break
+
+ self.type_hints = OrderedDict()
+ for name, parameter in signature.parameters.items():
+ if name in hints:
+ annotated_type = hints[name]
+
+ # PEP 428 discourages it by MyPy does not complain
+ if parameter.default is None:
+ annotated_type = Optional[annotated_type]
+
+ if parameter.kind == Parameter.VAR_POSITIONAL:
+ self.type_hints[name] = Tuple[annotated_type, ...]
+ elif parameter.kind == Parameter.VAR_KEYWORD:
+ self.type_hints[name] = Dict[str, annotated_type]
+ else:
+ self.type_hints[name] = annotated_type
+
+ if 'return' in hints:
+ self.type_hints['return'] = hints['return']
+
+ _type_hints_map[func] = self.type_hints
+
+
+def resolve_forwardref(maybe_ref, memo: _TypeCheckMemo):
+ if isinstance(maybe_ref, ForwardRef):
+ if sys.version_info < (3, 9, 0):
+ return evaluate_forwardref(maybe_ref, memo.globals, memo.locals)
+ else:
+ return evaluate_forwardref(maybe_ref, memo.globals, memo.locals, recursive_guard=frozenset())
+
+ else:
+ return maybe_ref
+
+
+def get_type_name(type_):
+ name = (getattr(type_, '__name__', None) or getattr(type_, '_name', None) or
+ getattr(type_, '__forward_arg__', None))
+ if name is None:
+ origin = getattr(type_, '__origin__', None)
+ name = getattr(origin, '_name', None)
+ if name is None and not inspect.isclass(type_):
+ name = type_.__class__.__name__.strip('_')
+
+ args = getattr(type_, '__args__', ()) or getattr(type_, '__values__', ())
+ if args != getattr(type_, '__parameters__', ()):
+ if name == 'Literal':
+ formatted_args = ', '.join(str(arg) for arg in args)
+ else:
+ formatted_args = ', '.join(get_type_name(arg) for arg in args)
+
+ name = '{}[{}]'.format(name, formatted_args)
+
+ module = getattr(type_, '__module__', None)
+ if module not in (None, 'typing', 'typing_extensions', 'builtins'):
+ name = module + '.' + name
+
+ return name
+
+
+def find_function(frame) -> Optional[Callable]:
+ """
+ Return a function object from the garbage collector that matches the frame's code object.
+
+ This process is unreliable as several function objects could use the same code object.
+ Fortunately the likelihood of this happening with the combination of the function objects
+ having different type annotations is a very rare occurrence.
+
+ :param frame: a frame object
+ :return: a function object if one was found, ``None`` if not
+
+ """
+ func = _functions_map.get(frame.f_code)
+ if func is None:
+ for obj in gc.get_referrers(frame.f_code):
+ if inspect.isfunction(obj):
+ if func is None:
+ # The first match was found
+ func = obj
+ else:
+ # A second match was found
+ return None
+
+ # Cache the result for future lookups
+ if func is not None:
+ _functions_map[frame.f_code] = func
+ else:
+ raise LookupError('target function not found')
+
+ return func
+
+
+def qualified_name(obj) -> str:
+ """
+ Return the qualified name (e.g. package.module.Type) for the given object.
+
+ Builtins and types from the :mod:`typing` package get special treatment by having the module
+ name stripped from the generated name.
+
+ """
+ type_ = obj if inspect.isclass(obj) else type(obj)
+ module = type_.__module__
+ qualname = type_.__qualname__
+ return qualname if module in ('typing', 'builtins') else '{}.{}'.format(module, qualname)
+
+
+def function_name(func: Callable) -> str:
+ """
+ Return the qualified name of the given function.
+
+ Builtins and types from the :mod:`typing` package get special treatment by having the module
+ name stripped from the generated name.
+
+ """
+ # For partial functions and objects with __call__ defined, __qualname__ does not exist
+ # For functions run in `exec` with a custom namespace, __module__ can be None
+ module = getattr(func, '__module__', '') or ''
+ qualname = (module + '.') if module not in ('builtins', '') else ''
+ return qualname + getattr(func, '__qualname__', repr(func))
+
+
+def check_callable(argname: str, value, expected_type, memo: _TypeCheckMemo) -> None:
+ if not callable(value):
+ raise TypeError('{} must be a callable'.format(argname))
+
+ if getattr(expected_type, "__args__", None):
+ try:
+ signature = inspect.signature(value)
+ except (TypeError, ValueError):
+ return
+
+ if hasattr(expected_type, '__result__'):
+ # Python 3.5
+ argument_types = expected_type.__args__
+ check_args = argument_types is not Ellipsis
+ else:
+ # Python 3.6
+ argument_types = expected_type.__args__[:-1]
+ check_args = argument_types != (Ellipsis,)
+
+ if check_args:
+ # The callable must not have keyword-only arguments without defaults
+ unfulfilled_kwonlyargs = [
+ param.name for param in signature.parameters.values() if
+ param.kind == Parameter.KEYWORD_ONLY and param.default == Parameter.empty]
+ if unfulfilled_kwonlyargs:
+ raise TypeError(
+ 'callable passed as {} has mandatory keyword-only arguments in its '
+ 'declaration: {}'.format(argname, ', '.join(unfulfilled_kwonlyargs)))
+
+ num_mandatory_args = len([
+ param.name for param in signature.parameters.values()
+ if param.kind in (Parameter.POSITIONAL_ONLY, Parameter.POSITIONAL_OR_KEYWORD) and
+ param.default is Parameter.empty])
+ has_varargs = any(param for param in signature.parameters.values()
+ if param.kind == Parameter.VAR_POSITIONAL)
+
+ if num_mandatory_args > len(argument_types):
+ raise TypeError(
+ 'callable passed as {} has too many arguments in its declaration; expected {} '
+ 'but {} argument(s) declared'.format(argname, len(argument_types),
+ num_mandatory_args))
+ elif not has_varargs and num_mandatory_args < len(argument_types):
+ raise TypeError(
+ 'callable passed as {} has too few arguments in its declaration; expected {} '
+ 'but {} argument(s) declared'.format(argname, len(argument_types),
+ num_mandatory_args))
+
+
+def check_dict(argname: str, value, expected_type, memo: _TypeCheckMemo) -> None:
+ if not isinstance(value, dict):
+ raise TypeError('type of {} must be a dict; got {} instead'.
+ format(argname, qualified_name(value)))
+
+ if expected_type is not dict:
+ if (hasattr(expected_type, "__args__") and
+ expected_type.__args__ not in (None, expected_type.__parameters__)):
+ key_type, value_type = expected_type.__args__
+ if key_type is not Any or value_type is not Any:
+ for k, v in value.items():
+ check_type('keys of {}'.format(argname), k, key_type, memo)
+ check_type('{}[{!r}]'.format(argname, k), v, value_type, memo)
+
+
+def check_typed_dict(argname: str, value, expected_type, memo: _TypeCheckMemo) -> None:
+ declared_keys = frozenset(expected_type.__annotations__)
+ if hasattr(expected_type, '__required_keys__'):
+ required_keys = expected_type.__required_keys__
+ else: # py3.8 and lower
+ required_keys = declared_keys if expected_type.__total__ else frozenset()
+
+ existing_keys = frozenset(value)
+ extra_keys = existing_keys - declared_keys
+ if extra_keys:
+ keys_formatted = ', '.join('"{}"'.format(key) for key in sorted(extra_keys))
+ raise TypeError('extra key(s) ({}) in {}'.format(keys_formatted, argname))
+
+ missing_keys = required_keys - existing_keys
+ if missing_keys:
+ keys_formatted = ', '.join('"{}"'.format(key) for key in sorted(missing_keys))
+ raise TypeError('required key(s) ({}) missing from {}'.format(keys_formatted, argname))
+
+ for key, argtype in get_type_hints(expected_type).items():
+ argvalue = value.get(key, _missing)
+ if argvalue is not _missing:
+ check_type('dict item "{}" for {}'.format(key, argname), argvalue, argtype, memo)
+
+
+def check_list(argname: str, value, expected_type, memo: _TypeCheckMemo) -> None:
+ if not isinstance(value, list):
+ raise TypeError('type of {} must be a list; got {} instead'.
+ format(argname, qualified_name(value)))
+
+ if expected_type is not list:
+ if hasattr(expected_type, "__args__") and expected_type.__args__ not in \
+ (None, expected_type.__parameters__):
+ value_type = expected_type.__args__[0]
+ if value_type is not Any:
+ for i, v in enumerate(value):
+ check_type('{}[{}]'.format(argname, i), v, value_type, memo)
+
+
+def check_sequence(argname: str, value, expected_type, memo: _TypeCheckMemo) -> None:
+ if not isinstance(value, collections.abc.Sequence):
+ raise TypeError('type of {} must be a sequence; got {} instead'.
+ format(argname, qualified_name(value)))
+
+ if hasattr(expected_type, "__args__") and expected_type.__args__ not in \
+ (None, expected_type.__parameters__):
+ value_type = expected_type.__args__[0]
+ if value_type is not Any:
+ for i, v in enumerate(value):
+ check_type('{}[{}]'.format(argname, i), v, value_type, memo)
+
+
+def check_set(argname: str, value, expected_type, memo: _TypeCheckMemo) -> None:
+ if not isinstance(value, AbstractSet):
+ raise TypeError('type of {} must be a set; got {} instead'.
+ format(argname, qualified_name(value)))
+
+ if expected_type is not set:
+ if hasattr(expected_type, "__args__") and expected_type.__args__ not in \
+ (None, expected_type.__parameters__):
+ value_type = expected_type.__args__[0]
+ if value_type is not Any:
+ for v in value:
+ check_type('elements of {}'.format(argname), v, value_type, memo)
+
+
+def check_tuple(argname: str, value, expected_type, memo: _TypeCheckMemo) -> None:
+ # Specialized check for NamedTuples
+ is_named_tuple = False
+ if sys.version_info < (3, 8, 0):
+ is_named_tuple = hasattr(expected_type, '_field_types') # deprecated since python 3.8
+ else:
+ is_named_tuple = hasattr(expected_type, '__annotations__')
+
+ if is_named_tuple:
+ if not isinstance(value, expected_type):
+ raise TypeError('type of {} must be a named tuple of type {}; got {} instead'.
+ format(argname, qualified_name(expected_type), qualified_name(value)))
+
+ if sys.version_info < (3, 8, 0):
+ field_types = expected_type._field_types
+ else:
+ field_types = expected_type.__annotations__
+
+ for name, field_type in field_types.items():
+ check_type('{}.{}'.format(argname, name), getattr(value, name), field_type, memo)
+
+ return
+ elif not isinstance(value, tuple):
+ raise TypeError('type of {} must be a tuple; got {} instead'.
+ format(argname, qualified_name(value)))
+
+ if getattr(expected_type, '__tuple_params__', None):
+ # Python 3.5
+ use_ellipsis = expected_type.__tuple_use_ellipsis__
+ tuple_params = expected_type.__tuple_params__
+ elif getattr(expected_type, '__args__', None):
+ # Python 3.6+
+ use_ellipsis = expected_type.__args__[-1] is Ellipsis
+ tuple_params = expected_type.__args__[:-1 if use_ellipsis else None]
+ else:
+ # Unparametrized Tuple or plain tuple
+ return
+
+ if use_ellipsis:
+ element_type = tuple_params[0]
+ for i, element in enumerate(value):
+ check_type('{}[{}]'.format(argname, i), element, element_type, memo)
+ elif tuple_params == ((),):
+ if value != ():
+ raise TypeError('{} is not an empty tuple but one was expected'.format(argname))
+ else:
+ if len(value) != len(tuple_params):
+ raise TypeError('{} has wrong number of elements (expected {}, got {} instead)'
+ .format(argname, len(tuple_params), len(value)))
+
+ for i, (element, element_type) in enumerate(zip(value, tuple_params)):
+ check_type('{}[{}]'.format(argname, i), element, element_type, memo)
+
+
+def check_union(argname: str, value, expected_type, memo: _TypeCheckMemo) -> None:
+ if hasattr(expected_type, '__union_params__'):
+ # Python 3.5
+ union_params = expected_type.__union_params__
+ else:
+ # Python 3.6+
+ union_params = expected_type.__args__
+
+ for type_ in union_params:
+ try:
+ check_type(argname, value, type_, memo)
+ return
+ except TypeError:
+ pass
+
+ typelist = ', '.join(get_type_name(t) for t in union_params)
+ raise TypeError('type of {} must be one of ({}); got {} instead'.
+ format(argname, typelist, qualified_name(value)))
+
+
+def check_class(argname: str, value, expected_type, memo: _TypeCheckMemo) -> None:
+ if not isclass(value):
+ raise TypeError('type of {} must be a type; got {} instead'.format(
+ argname, qualified_name(value)))
+
+ # Needed on Python 3.7+
+ if expected_type is Type:
+ return
+
+ if getattr(expected_type, '__origin__', None) in (Type, type):
+ expected_class = expected_type.__args__[0]
+ else:
+ expected_class = expected_type
+
+ if expected_class is Any:
+ return
+ elif isinstance(expected_class, TypeVar):
+ check_typevar(argname, value, expected_class, memo, True)
+ elif getattr(expected_class, '__origin__', None) is Union:
+ for arg in expected_class.__args__:
+ try:
+ check_class(argname, value, arg, memo)
+ break
+ except TypeError:
+ pass
+ else:
+ formatted_args = ', '.join(get_type_name(arg) for arg in expected_class.__args__)
+ raise TypeError('{} must match one of the following: ({}); got {} instead'.format(
+ argname, formatted_args, qualified_name(value)
+ ))
+ elif not issubclass(value, expected_class):
+ raise TypeError('{} must be a subclass of {}; got {} instead'.format(
+ argname, qualified_name(expected_class), qualified_name(value)))
+
+
+def check_typevar(argname: str, value, typevar: TypeVar, memo: _TypeCheckMemo,
+ subclass_check: bool = False) -> None:
+ value_type = value if subclass_check else type(value)
+ subject = argname if subclass_check else 'type of ' + argname
+
+ if typevar.__bound__ is not None:
+ bound_type = resolve_forwardref(typevar.__bound__, memo)
+ if not issubclass(value_type, bound_type):
+ raise TypeError(
+ '{} must be {} or one of its subclasses; got {} instead'
+ .format(subject, qualified_name(bound_type), qualified_name(value_type)))
+ elif typevar.__constraints__:
+ constraints = [resolve_forwardref(c, memo) for c in typevar.__constraints__]
+ for constraint in constraints:
+ try:
+ check_type(argname, value, constraint, memo)
+ except TypeError:
+ pass
+ else:
+ break
+ else:
+ formatted_constraints = ', '.join(get_type_name(constraint)
+ for constraint in constraints)
+ raise TypeError('{} must match one of the constraints ({}); got {} instead'
+ .format(subject, formatted_constraints, qualified_name(value_type)))
+
+
+def check_literal(argname: str, value, expected_type, memo: _TypeCheckMemo):
+ def get_args(literal):
+ try:
+ args = literal.__args__
+ except AttributeError:
+ # Instance of Literal from typing_extensions
+ args = literal.__values__
+
+ retval = []
+ for arg in args:
+ if isinstance(arg, Literal.__class__) or getattr(arg, '__origin__', None) is Literal:
+ # The first check works on py3.6 and lower, the second one on py3.7+
+ retval.extend(get_args(arg))
+ elif isinstance(arg, (int, str, bytes, bool, type(None), Enum)):
+ retval.append(arg)
+ else:
+ raise TypeError('Illegal literal value: {}'.format(arg))
+
+ return retval
+
+ final_args = tuple(get_args(expected_type))
+ if value not in final_args:
+ raise TypeError('the value of {} must be one of {}; got {} instead'.
+ format(argname, final_args, value))
+
+
+def check_number(argname: str, value, expected_type):
+ if expected_type is complex and not isinstance(value, (complex, float, int)):
+ raise TypeError('type of {} must be either complex, float or int; got {} instead'.
+ format(argname, qualified_name(value.__class__)))
+ elif expected_type is float and not isinstance(value, (float, int)):
+ raise TypeError('type of {} must be either float or int; got {} instead'.
+ format(argname, qualified_name(value.__class__)))
+
+
+def check_io(argname: str, value, expected_type):
+ if expected_type is TextIO:
+ if not isinstance(value, TextIOBase):
+ raise TypeError('type of {} must be a text based I/O object; got {} instead'.
+ format(argname, qualified_name(value.__class__)))
+ elif expected_type is BinaryIO:
+ if not isinstance(value, (RawIOBase, BufferedIOBase)):
+ raise TypeError('type of {} must be a binary I/O object; got {} instead'.
+ format(argname, qualified_name(value.__class__)))
+ elif not isinstance(value, IOBase):
+ raise TypeError('type of {} must be an I/O object; got {} instead'.
+ format(argname, qualified_name(value.__class__)))
+
+
+def check_protocol(argname: str, value, expected_type):
+ # TODO: implement proper compatibility checking and support non-runtime protocols
+ if getattr(expected_type, '_is_runtime_protocol', False):
+ if not isinstance(value, expected_type):
+ raise TypeError('type of {} ({}) is not compatible with the {} protocol'.
+ format(argname, type(value).__qualname__, expected_type.__qualname__))
+
+
+# Equality checks are applied to these
+origin_type_checkers = {
+ AbstractSet: check_set,
+ Callable: check_callable,
+ collections.abc.Callable: check_callable,
+ dict: check_dict,
+ Dict: check_dict,
+ list: check_list,
+ List: check_list,
+ Sequence: check_sequence,
+ collections.abc.Sequence: check_sequence,
+ collections.abc.Set: check_set,
+ set: check_set,
+ Set: check_set,
+ tuple: check_tuple,
+ Tuple: check_tuple,
+ type: check_class,
+ Type: check_class,
+ Union: check_union
+}
+_subclass_check_unions = hasattr(Union, '__union_set_params__')
+if Literal is not None:
+ origin_type_checkers[Literal] = check_literal
+
+generator_origin_types = (Generator, collections.abc.Generator,
+ Iterator, collections.abc.Iterator,
+ Iterable, collections.abc.Iterable)
+asyncgen_origin_types = (AsyncIterator, collections.abc.AsyncIterator,
+ AsyncIterable, collections.abc.AsyncIterable)
+if AsyncGenerator is not None:
+ asyncgen_origin_types += (AsyncGenerator,)
+if hasattr(collections.abc, 'AsyncGenerator'):
+ asyncgen_origin_types += (collections.abc.AsyncGenerator,)
+
+
+def check_type(argname: str, value, expected_type, memo: Optional[_TypeCheckMemo] = None, *,
+ globals: Optional[Dict[str, Any]] = None,
+ locals: Optional[Dict[str, Any]] = None) -> None:
+ """
+ Ensure that ``value`` matches ``expected_type``.
+
+ The types from the :mod:`typing` module do not support :func:`isinstance` or :func:`issubclass`
+ so a number of type specific checks are required. This function knows which checker to call
+ for which type.
+
+ :param argname: name of the argument to check; used for error messages
+ :param value: value to be checked against ``expected_type``
+ :param expected_type: a class or generic type instance
+ :param globals: dictionary of global variables to use for resolving forward references
+ (defaults to the calling frame's globals)
+ :param locals: dictionary of local variables to use for resolving forward references
+ (defaults to the calling frame's locals)
+ :raises TypeError: if there is a type mismatch
+
+ """
+ if expected_type is Any or isinstance(value, Mock):
+ return
+
+ if expected_type is None:
+ # Only happens on < 3.6
+ expected_type = type(None)
+
+ if memo is None:
+ frame = sys._getframe(1)
+ if globals is None:
+ globals = frame.f_globals
+ if locals is None:
+ locals = frame.f_locals
+
+ memo = _TypeCheckMemo(globals, locals)
+
+ expected_type = resolve_forwardref(expected_type, memo)
+ origin_type = getattr(expected_type, '__origin__', None)
+ if origin_type is not None:
+ checker_func = origin_type_checkers.get(origin_type)
+ if checker_func:
+ checker_func(argname, value, expected_type, memo)
+ else:
+ check_type(argname, value, origin_type, memo)
+ elif isclass(expected_type):
+ if issubclass(expected_type, Tuple):
+ check_tuple(argname, value, expected_type, memo)
+ elif issubclass(expected_type, (float, complex)):
+ check_number(argname, value, expected_type)
+ elif _subclass_check_unions and issubclass(expected_type, Union):
+ check_union(argname, value, expected_type, memo)
+ elif isinstance(expected_type, TypeVar):
+ check_typevar(argname, value, expected_type, memo)
+ elif issubclass(expected_type, IO):
+ check_io(argname, value, expected_type)
+ elif is_typeddict(expected_type):
+ check_typed_dict(argname, value, expected_type, memo)
+ elif getattr(expected_type, '_is_protocol', False):
+ check_protocol(argname, value, expected_type)
+ else:
+ expected_type = (getattr(expected_type, '__extra__', None) or origin_type or
+ expected_type)
+
+ if expected_type is bytes:
+ # As per https://github.com/python/typing/issues/552
+ if not isinstance(value, (bytearray, bytes, memoryview)):
+ raise TypeError('type of {} must be bytes-like; got {} instead'
+ .format(argname, qualified_name(value)))
+ elif not isinstance(value, expected_type):
+ raise TypeError(
+ 'type of {} must be {}; got {} instead'.
+ format(argname, qualified_name(expected_type), qualified_name(value)))
+ elif isinstance(expected_type, TypeVar):
+ # Only happens on < 3.6
+ check_typevar(argname, value, expected_type, memo)
+ elif isinstance(expected_type, Literal.__class__):
+ # Only happens on < 3.7 when using Literal from typing_extensions
+ check_literal(argname, value, expected_type, memo)
+ elif expected_type.__class__ is NewType:
+ # typing.NewType on Python 3.10+
+ return check_type(argname, value, expected_type.__supertype__, memo)
+ elif (isfunction(expected_type) and
+ getattr(expected_type, "__module__", None) == "typing" and
+ getattr(expected_type, "__qualname__", None).startswith("NewType.") and
+ hasattr(expected_type, "__supertype__")):
+ # typing.NewType on Python 3.9 and below
+ return check_type(argname, value, expected_type.__supertype__, memo)
+
+
+def check_return_type(retval, memo: Optional[_CallMemo] = None) -> bool:
+ """
+ Check that the return value is compatible with the return value annotation in the function.
+
+ :param retval: the value about to be returned from the call
+ :return: ``True``
+ :raises TypeError: if there is a type mismatch
+
+ """
+ if memo is None:
+ # faster than inspect.currentframe(), but not officially
+ # supported in all python implementations
+ frame = sys._getframe(1)
+
+ try:
+ func = find_function(frame)
+ except LookupError:
+ return True # This can happen with the Pydev/PyCharm debugger extension installed
+
+ memo = _CallMemo(func, frame.f_locals)
+
+ if 'return' in memo.type_hints:
+ if memo.type_hints['return'] is NoReturn:
+ raise TypeError('{}() was declared never to return but it did'.format(memo.func_name))
+
+ try:
+ check_type('the return value', retval, memo.type_hints['return'], memo)
+ except TypeError as exc: # suppress unnecessarily long tracebacks
+ # Allow NotImplemented if this is a binary magic method (__eq__() et al)
+ if retval is NotImplemented and memo.type_hints['return'] is bool:
+ # This does (and cannot) not check if it's actually a method
+ func_name = memo.func_name.rsplit('.', 1)[-1]
+ if len(memo.arguments) == 2 and func_name in BINARY_MAGIC_METHODS:
+ return True
+
+ raise TypeError(*exc.args) from None
+
+ return True
+
+
+def check_argument_types(memo: Optional[_CallMemo] = None) -> bool:
+ """
+ Check that the argument values match the annotated types.
+
+ Unless both ``args`` and ``kwargs`` are provided, the information will be retrieved from
+ the previous stack frame (ie. from the function that called this).
+
+ :return: ``True``
+ :raises TypeError: if there is an argument type mismatch
+
+ """
+ if memo is None:
+ # faster than inspect.currentframe(), but not officially
+ # supported in all python implementations
+ frame = sys._getframe(1)
+
+ try:
+ func = find_function(frame)
+ except LookupError:
+ return True # This can happen with the Pydev/PyCharm debugger extension installed
+
+ memo = _CallMemo(func, frame.f_locals)
+
+ for argname, expected_type in memo.type_hints.items():
+ if argname != 'return' and argname in memo.arguments:
+ value = memo.arguments[argname]
+ description = 'argument "{}"'.format(argname)
+ try:
+ check_type(description, value, expected_type, memo)
+ except TypeError as exc: # suppress unnecessarily long tracebacks
+ raise TypeError(*exc.args) from None
+
+ return True
+
+
+class TypeCheckedGenerator:
+ def __init__(self, wrapped: Generator, memo: _CallMemo):
+ rtype_args = []
+ if hasattr(memo.type_hints['return'], "__args__"):
+ rtype_args = memo.type_hints['return'].__args__
+
+ self.__wrapped = wrapped
+ self.__memo = memo
+ self.__yield_type = rtype_args[0] if rtype_args else Any
+ self.__send_type = rtype_args[1] if len(rtype_args) > 1 else Any
+ self.__return_type = rtype_args[2] if len(rtype_args) > 2 else Any
+ self.__initialized = False
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ return self.send(None)
+
+ def __getattr__(self, name: str) -> Any:
+ return getattr(self.__wrapped, name)
+
+ def throw(self, *args):
+ return self.__wrapped.throw(*args)
+
+ def close(self):
+ self.__wrapped.close()
+
+ def send(self, obj):
+ if self.__initialized:
+ check_type('value sent to generator', obj, self.__send_type, memo=self.__memo)
+ else:
+ self.__initialized = True
+
+ try:
+ value = self.__wrapped.send(obj)
+ except StopIteration as exc:
+ check_type('return value', exc.value, self.__return_type, memo=self.__memo)
+ raise
+
+ check_type('value yielded from generator', value, self.__yield_type, memo=self.__memo)
+ return value
+
+
+class TypeCheckedAsyncGenerator:
+ def __init__(self, wrapped: AsyncGenerator, memo: _CallMemo):
+ rtype_args = memo.type_hints['return'].__args__
+ self.__wrapped = wrapped
+ self.__memo = memo
+ self.__yield_type = rtype_args[0]
+ self.__send_type = rtype_args[1] if len(rtype_args) > 1 else Any
+ self.__initialized = False
+
+ def __aiter__(self):
+ return self
+
+ def __anext__(self):
+ return self.asend(None)
+
+ def __getattr__(self, name: str) -> Any:
+ return getattr(self.__wrapped, name)
+
+ def athrow(self, *args):
+ return self.__wrapped.athrow(*args)
+
+ def aclose(self):
+ return self.__wrapped.aclose()
+
+ async def asend(self, obj):
+ if self.__initialized:
+ check_type('value sent to generator', obj, self.__send_type, memo=self.__memo)
+ else:
+ self.__initialized = True
+
+ value = await self.__wrapped.asend(obj)
+ check_type('value yielded from generator', value, self.__yield_type, memo=self.__memo)
+ return value
+
+
+@overload
+def typechecked(*, always: bool = False) -> Callable[[T_CallableOrType], T_CallableOrType]:
+ ...
+
+
+@overload
+def typechecked(func: T_CallableOrType, *, always: bool = False) -> T_CallableOrType:
+ ...
+
+
+def typechecked(func=None, *, always=False, _localns: Optional[Dict[str, Any]] = None):
+ """
+ Perform runtime type checking on the arguments that are passed to the wrapped function.
+
+ The return value is also checked against the return annotation if any.
+
+ If the ``__debug__`` global variable is set to ``False``, no wrapping and therefore no type
+ checking is done, unless ``always`` is ``True``.
+
+ This can also be used as a class decorator. This will wrap all type annotated methods,
+ including ``@classmethod``, ``@staticmethod``, and ``@property`` decorated methods,
+ in the class with the ``@typechecked`` decorator.
+
+ :param func: the function or class to enable type checking for
+ :param always: ``True`` to enable type checks even in optimized mode
+
+ """
+ if func is None:
+ return partial(typechecked, always=always, _localns=_localns)
+
+ if not __debug__ and not always: # pragma: no cover
+ return func
+
+ if isclass(func):
+ prefix = func.__qualname__ + '.'
+ for key, attr in func.__dict__.items():
+ if inspect.isfunction(attr) or inspect.ismethod(attr) or inspect.isclass(attr):
+ if attr.__qualname__.startswith(prefix) and getattr(attr, '__annotations__', None):
+ setattr(func, key, typechecked(attr, always=always, _localns=func.__dict__))
+ elif isinstance(attr, (classmethod, staticmethod)):
+ if getattr(attr.__func__, '__annotations__', None):
+ wrapped = typechecked(attr.__func__, always=always, _localns=func.__dict__)
+ setattr(func, key, type(attr)(wrapped))
+ elif isinstance(attr, property):
+ kwargs = dict(doc=attr.__doc__)
+ for name in ("fset", "fget", "fdel"):
+ property_func = kwargs[name] = getattr(attr, name)
+ if property_func is not None and getattr(property_func, '__annotations__', ()):
+ kwargs[name] = typechecked(
+ property_func, always=always, _localns=func.__dict__
+ )
+
+ setattr(func, key, attr.__class__(**kwargs))
+
+ return func
+
+ if not getattr(func, '__annotations__', None):
+ warn('no type annotations present -- not typechecking {}'.format(function_name(func)))
+ return func
+
+ # Find the frame in which the function was declared, for resolving forward references later
+ if _localns is None:
+ _localns = sys._getframe(1).f_locals
+
+ # Find either the first Python wrapper or the actual function
+ python_func = inspect.unwrap(func, stop=lambda f: hasattr(f, '__code__'))
+
+ if not getattr(python_func, '__code__', None):
+ warn('no code associated -- not typechecking {}'.format(function_name(func)))
+ return func
+
+ def wrapper(*args, **kwargs):
+ memo = _CallMemo(python_func, _localns, args=args, kwargs=kwargs)
+ check_argument_types(memo)
+ retval = func(*args, **kwargs)
+ try:
+ check_return_type(retval, memo)
+ except TypeError as exc:
+ raise TypeError(*exc.args) from None
+
+ # If a generator is returned, wrap it if its yield/send/return types can be checked
+ if inspect.isgenerator(retval) or isasyncgen(retval):
+ return_type = memo.type_hints.get('return')
+ if return_type:
+ origin = getattr(return_type, '__origin__', None)
+ if origin in generator_origin_types:
+ return TypeCheckedGenerator(retval, memo)
+ elif origin is not None and origin in asyncgen_origin_types:
+ return TypeCheckedAsyncGenerator(retval, memo)
+
+ return retval
+
+ async def async_wrapper(*args, **kwargs):
+ memo = _CallMemo(python_func, _localns, args=args, kwargs=kwargs)
+ check_argument_types(memo)
+ retval = await func(*args, **kwargs)
+ check_return_type(retval, memo)
+ return retval
+
+ if inspect.iscoroutinefunction(func):
+ if python_func.__code__ is not async_wrapper.__code__:
+ return wraps(func)(async_wrapper)
+ else:
+ if python_func.__code__ is not wrapper.__code__:
+ return wraps(func)(wrapper)
+
+ # the target callable was already wrapped
+ return func
+
+
+class TypeWarning(UserWarning):
+ """
+ A warning that is emitted when a type check fails.
+
+ :ivar str event: ``call`` or ``return``
+ :ivar Callable func: the function in which the violation occurred (the called function if event
+ is ``call``, or the function where a value of the wrong type was returned from if event is
+ ``return``)
+ :ivar str error: the error message contained by the caught :class:`TypeError`
+ :ivar frame: the frame in which the violation occurred
+ """
+
+ __slots__ = ('func', 'event', 'message', 'frame')
+
+ def __init__(self, memo: Optional[_CallMemo], event: str, frame,
+ exception: Union[str, TypeError]): # pragma: no cover
+ self.func = memo.func
+ self.event = event
+ self.error = str(exception)
+ self.frame = frame
+
+ if self.event == 'call':
+ caller_frame = self.frame.f_back
+ event = 'call to {}() from {}:{}'.format(
+ function_name(self.func), caller_frame.f_code.co_filename, caller_frame.f_lineno)
+ else:
+ event = 'return from {}() at {}:{}'.format(
+ function_name(self.func), self.frame.f_code.co_filename, self.frame.f_lineno)
+
+ super().__init__('[{thread_name}] {event}: {self.error}'.format(
+ thread_name=threading.current_thread().name, event=event, self=self))
+
+ @property
+ def stack(self):
+ """Return the stack where the last frame is from the target function."""
+ return extract_stack(self.frame)
+
+ def print_stack(self, file: TextIO = None, limit: int = None) -> None:
+ """
+ Print the traceback from the stack frame where the target function was run.
+
+ :param file: an open file to print to (prints to stdout if omitted)
+ :param limit: the maximum number of stack frames to print
+
+ """
+ print_stack(self.frame, limit, file)
+
+
+class TypeChecker:
+ """
+ A type checker that collects type violations by hooking into :func:`sys.setprofile`.
+
+ :param packages: list of top level modules and packages or modules to include for type checking
+ :param all_threads: ``True`` to check types in all threads created while the checker is
+ running, ``False`` to only check in the current one
+ :param forward_refs_policy: how to handle unresolvable forward references in annotations
+
+ .. deprecated:: 2.6
+ Use :func:`~.importhook.install_import_hook` instead. This class will be removed in v3.0.
+ """
+
+ def __init__(self, packages: Union[str, Sequence[str]], *, all_threads: bool = True,
+ forward_refs_policy: ForwardRefPolicy = ForwardRefPolicy.ERROR):
+ assert check_argument_types()
+ warn('TypeChecker has been deprecated and will be removed in v3.0. '
+ 'Use install_import_hook() or the pytest plugin instead.', DeprecationWarning)
+ self.all_threads = all_threads
+ self.annotation_policy = forward_refs_policy
+ self._call_memos = {} # type: Dict[Any, _CallMemo]
+ self._previous_profiler = None
+ self._previous_thread_profiler = None
+ self._active = False
+
+ if isinstance(packages, str):
+ self._packages = (packages,)
+ else:
+ self._packages = tuple(packages)
+
+ @property
+ def active(self) -> bool:
+ """Return ``True`` if currently collecting type violations."""
+ return self._active
+
+ def should_check_type(self, func: Callable) -> bool:
+ if not func.__annotations__:
+ # No point in checking if there are no type hints
+ return False
+ elif isasyncgenfunction(func):
+ # Async generators cannot be supported because the return arg is of an opaque builtin
+ # type (async_generator_wrapped_value)
+ return False
+ else:
+ # Check types if the module matches any of the package prefixes
+ return any(func.__module__ == package or func.__module__.startswith(package + '.')
+ for package in self._packages)
+
+ def start(self):
+ if self._active:
+ raise RuntimeError('type checker already running')
+
+ self._active = True
+
+ # Install this instance as the current profiler
+ self._previous_profiler = sys.getprofile()
+ sys.setprofile(self)
+
+ # If requested, set this instance as the default profiler for all future threads
+ # (does not affect existing threads)
+ if self.all_threads:
+ self._previous_thread_profiler = threading._profile_hook
+ threading.setprofile(self)
+
+ def stop(self):
+ if self._active:
+ if sys.getprofile() is self:
+ sys.setprofile(self._previous_profiler)
+ else: # pragma: no cover
+ warn('the system profiling hook has changed unexpectedly')
+
+ if self.all_threads:
+ if threading._profile_hook is self:
+ threading.setprofile(self._previous_thread_profiler)
+ else: # pragma: no cover
+ warn('the threading profiling hook has changed unexpectedly')
+
+ self._active = False
+
+ def __enter__(self):
+ self.start()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.stop()
+
+ def __call__(self, frame, event: str, arg) -> None: # pragma: no cover
+ if not self._active:
+ # This happens if all_threads was enabled and a thread was created when the checker was
+ # running but was then stopped. The thread's profiler callback can't be reset any other
+ # way but this.
+ sys.setprofile(self._previous_thread_profiler)
+ return
+
+ # If an actual profiler is running, don't include the type checking times in its results
+ if event == 'call':
+ try:
+ func = find_function(frame)
+ except Exception:
+ func = None
+
+ if func is not None and self.should_check_type(func):
+ memo = self._call_memos[frame] = _CallMemo(
+ func, frame.f_locals, forward_refs_policy=self.annotation_policy)
+ if memo.is_generator:
+ return_type_hint = memo.type_hints['return']
+ if return_type_hint is not None:
+ origin = getattr(return_type_hint, '__origin__', None)
+ if origin in generator_origin_types:
+ # Check the types of the yielded values
+ memo.type_hints['return'] = return_type_hint.__args__[0]
+ else:
+ try:
+ check_argument_types(memo)
+ except TypeError as exc:
+ warn(TypeWarning(memo, event, frame, exc))
+
+ if self._previous_profiler is not None:
+ self._previous_profiler(frame, event, arg)
+ elif event == 'return':
+ if self._previous_profiler is not None:
+ self._previous_profiler(frame, event, arg)
+
+ if arg is None:
+ # a None return value might mean an exception is being raised but we have no way of
+ # checking
+ return
+
+ memo = self._call_memos.get(frame)
+ if memo is not None:
+ try:
+ if memo.is_generator:
+ check_type('yielded value', arg, memo.type_hints['return'], memo)
+ else:
+ check_return_type(arg, memo)
+ except TypeError as exc:
+ warn(TypeWarning(memo, event, frame, exc))
+
+ if not memo.is_generator:
+ del self._call_memos[frame]
+ elif self._previous_profiler is not None:
+ self._previous_profiler(frame, event, arg)
diff --git a/contrib/python/typeguard/typeguard/importhook.py b/contrib/python/typeguard/typeguard/importhook.py
new file mode 100644
index 00000000000..d4e237fe7de
--- /dev/null
+++ b/contrib/python/typeguard/typeguard/importhook.py
@@ -0,0 +1,162 @@
+import ast
+import sys
+from importlib.abc import MetaPathFinder
+from importlib.machinery import SourceFileLoader
+from importlib.util import cache_from_source, decode_source
+from inspect import isclass
+from typing import Iterable, Type
+from unittest.mock import patch
+
+
+# The name of this function is magical
+def _call_with_frames_removed(f, *args, **kwargs):
+ return f(*args, **kwargs)
+
+
+def optimized_cache_from_source(path, debug_override=None):
+ return cache_from_source(path, debug_override, optimization='typeguard')
+
+
+class TypeguardTransformer(ast.NodeVisitor):
+ def __init__(self) -> None:
+ self._parents = []
+
+ def visit_Module(self, node: ast.Module):
+ # Insert "import typeguard" after any "from __future__ ..." imports
+ for i, child in enumerate(node.body):
+ if isinstance(child, ast.ImportFrom) and child.module == '__future__':
+ continue
+ elif isinstance(child, ast.Expr) and isinstance(child.value, ast.Constant):
+ continue # module docstring
+ else:
+ node.body.insert(i, ast.Import(names=[ast.alias('typeguard', None)]))
+ break
+
+ self._parents.append(node)
+ self.generic_visit(node)
+ self._parents.pop()
+ return node
+
+ def visit_ClassDef(self, node: ast.ClassDef):
+ node.decorator_list.append(
+ ast.Attribute(ast.Name(id='typeguard', ctx=ast.Load()), 'typechecked', ast.Load())
+ )
+ self._parents.append(node)
+ self.generic_visit(node)
+ self._parents.pop()
+ return node
+
+ def visit_FunctionDef(self, node: ast.FunctionDef):
+ # Let the class level decorator handle the methods of a class
+ if isinstance(self._parents[-1], ast.ClassDef):
+ return node
+
+ has_annotated_args = any(arg for arg in node.args.args if arg.annotation)
+ has_annotated_return = bool(node.returns)
+ if has_annotated_args or has_annotated_return:
+ node.decorator_list.insert(
+ 0,
+ ast.Attribute(ast.Name(id='typeguard', ctx=ast.Load()), 'typechecked', ast.Load())
+ )
+
+ self._parents.append(node)
+ self.generic_visit(node)
+ self._parents.pop()
+ return node
+
+
+class TypeguardLoader(SourceFileLoader):
+ def source_to_code(self, data, path, *, _optimize=-1):
+ source = decode_source(data)
+ tree = _call_with_frames_removed(compile, source, path, 'exec', ast.PyCF_ONLY_AST,
+ dont_inherit=True, optimize=_optimize)
+ tree = TypeguardTransformer().visit(tree)
+ ast.fix_missing_locations(tree)
+ return _call_with_frames_removed(compile, tree, path, 'exec',
+ dont_inherit=True, optimize=_optimize)
+
+ def exec_module(self, module):
+ # Use a custom optimization marker – the import lock should make this monkey patch safe
+ with patch('importlib._bootstrap_external.cache_from_source', optimized_cache_from_source):
+ return super().exec_module(module)
+
+
+class TypeguardFinder(MetaPathFinder):
+ """
+ Wraps another path finder and instruments the module with ``@typechecked`` if
+ :meth:`should_instrument` returns ``True``.
+
+ Should not be used directly, but rather via :func:`~.install_import_hook`.
+
+ .. versionadded:: 2.6
+
+ """
+
+ def __init__(self, packages, original_pathfinder):
+ self.packages = packages
+ self._original_pathfinder = original_pathfinder
+
+ def find_spec(self, fullname, path=None, target=None):
+ if self.should_instrument(fullname):
+ spec = self._original_pathfinder.find_spec(fullname, path, target)
+ if spec is not None and isinstance(spec.loader, SourceFileLoader):
+ spec.loader = TypeguardLoader(spec.loader.name, spec.loader.path)
+ return spec
+
+ return None
+
+ def should_instrument(self, module_name: str) -> bool:
+ """
+ Determine whether the module with the given name should be instrumented.
+
+ :param module_name: full name of the module that is about to be imported (e.g. ``xyz.abc``)
+
+ """
+ for package in self.packages:
+ if module_name == package or module_name.startswith(package + '.'):
+ return True
+
+ return False
+
+
+class ImportHookManager:
+ def __init__(self, hook: MetaPathFinder):
+ self.hook = hook
+
+ def __enter__(self):
+ pass
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.uninstall()
+
+ def uninstall(self):
+ try:
+ sys.meta_path.remove(self.hook)
+ except ValueError:
+ pass # already removed
+
+
+def install_import_hook(packages: Iterable[str], *,
+ cls: Type[TypeguardFinder] = TypeguardFinder) -> ImportHookManager:
+ """
+ Install an import hook that decorates classes and functions with ``@typechecked``.
+
+ This only affects modules loaded **after** this hook has been installed.
+
+ :return: a context manager that uninstalls the hook on exit (or when you call ``.uninstall()``)
+
+ .. versionadded:: 2.6
+
+ """
+ if isinstance(packages, str):
+ packages = [packages]
+
+ for i, finder in enumerate(sys.meta_path):
+ if isclass(finder) and finder.__name__ == 'PathFinder' and hasattr(finder, 'find_spec'):
+ break
+ else:
+ raise RuntimeError('Cannot find a PathFinder in sys.meta_path')
+
+ hook = cls(packages, finder)
+ sys.meta_path.insert(0, hook)
+ return ImportHookManager(hook)
diff --git a/contrib/python/typeguard/typeguard/py.typed b/contrib/python/typeguard/typeguard/py.typed
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/contrib/python/typeguard/typeguard/py.typed
diff --git a/contrib/python/typeguard/typeguard/pytest_plugin.py b/contrib/python/typeguard/typeguard/pytest_plugin.py
new file mode 100644
index 00000000000..caa128b5474
--- /dev/null
+++ b/contrib/python/typeguard/typeguard/pytest_plugin.py
@@ -0,0 +1,30 @@
+import sys
+
+from typeguard.importhook import install_import_hook
+
+
+def pytest_addoption(parser):
+ group = parser.getgroup('typeguard')
+ group.addoption('--typeguard-packages', action='store',
+ help='comma separated name list of packages and modules to instrument for '
+ 'type checking')
+
+
+def pytest_configure(config):
+ value = config.getoption("typeguard_packages")
+ if not value:
+ return
+
+ packages = [pkg.strip() for pkg in value.split(",")]
+
+ already_imported_packages = sorted(
+ package for package in packages if package in sys.modules
+ )
+ if already_imported_packages:
+ message = (
+ "typeguard cannot check these packages because they "
+ "are already imported: {}"
+ )
+ raise RuntimeError(message.format(", ".join(already_imported_packages)))
+
+ install_import_hook(packages=packages)
diff --git a/contrib/python/typeguard/ya.make b/contrib/python/typeguard/ya.make
new file mode 100644
index 00000000000..53dd3e96730
--- /dev/null
+++ b/contrib/python/typeguard/ya.make
@@ -0,0 +1,30 @@
+# Generated by devtools/yamaker (pypi).
+
+PY3_LIBRARY()
+
+VERSION(2.13.3)
+
+LICENSE(MIT)
+
+NO_LINT()
+
+PY_SRCS(
+ TOP_LEVEL
+ typeguard/__init__.py
+ typeguard/importhook.py
+ typeguard/pytest_plugin.py
+)
+
+RESOURCE_FILES(
+ PREFIX contrib/python/typeguard/
+ .dist-info/METADATA
+ .dist-info/entry_points.txt
+ .dist-info/top_level.txt
+ typeguard/py.typed
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+ tests
+)