aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/src/Lib/unittest/mock.py
diff options
context:
space:
mode:
authorshadchin <shadchin@yandex-team.ru>2022-02-10 16:44:30 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:44:30 +0300
commit2598ef1d0aee359b4b6d5fdd1758916d5907d04f (patch)
tree012bb94d777798f1f56ac1cec429509766d05181 /contrib/tools/python3/src/Lib/unittest/mock.py
parent6751af0b0c1b952fede40b19b71da8025b5d8bcf (diff)
downloadydb-2598ef1d0aee359b4b6d5fdd1758916d5907d04f.tar.gz
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/tools/python3/src/Lib/unittest/mock.py')
-rw-r--r--contrib/tools/python3/src/Lib/unittest/mock.py1406
1 files changed, 703 insertions, 703 deletions
diff --git a/contrib/tools/python3/src/Lib/unittest/mock.py b/contrib/tools/python3/src/Lib/unittest/mock.py
index 5c78274d09..5c6141571e 100644
--- a/contrib/tools/python3/src/Lib/unittest/mock.py
+++ b/contrib/tools/python3/src/Lib/unittest/mock.py
@@ -13,7 +13,7 @@ __all__ = (
'ANY',
'call',
'create_autospec',
- 'AsyncMock',
+ 'AsyncMock',
'FILTER_DIR',
'NonCallableMock',
'NonCallableMagicMock',
@@ -23,16 +23,16 @@ __all__ = (
)
-import asyncio
-import contextlib
-import io
+import asyncio
+import contextlib
+import io
import inspect
import pprint
import sys
import builtins
-from asyncio import iscoroutinefunction
-from types import CodeType, ModuleType, MethodType
-from unittest.util import safe_repr
+from asyncio import iscoroutinefunction
+from types import CodeType, ModuleType, MethodType
+from unittest.util import safe_repr
from functools import wraps, partial
@@ -44,21 +44,21 @@ FILTER_DIR = True
# Without this, the __class__ properties wouldn't be set correctly
_safe_super = super
-def _is_async_obj(obj):
- if _is_instance_mock(obj) and not isinstance(obj, AsyncMock):
- return False
- if hasattr(obj, '__func__'):
- obj = getattr(obj, '__func__')
- return iscoroutinefunction(obj) or inspect.isawaitable(obj)
-
-
-def _is_async_func(func):
- if getattr(func, '__code__', None):
- return iscoroutinefunction(func)
- else:
- return False
-
-
+def _is_async_obj(obj):
+ if _is_instance_mock(obj) and not isinstance(obj, AsyncMock):
+ return False
+ if hasattr(obj, '__func__'):
+ obj = getattr(obj, '__func__')
+ return iscoroutinefunction(obj) or inspect.isawaitable(obj)
+
+
+def _is_async_func(func):
+ if getattr(func, '__code__', None):
+ return iscoroutinefunction(func)
+ else:
+ return False
+
+
def _is_instance_mock(obj):
# can't use isinstance on Mock objects because they override __class__
# The base class for all mocks is NonCallableMock
@@ -67,20 +67,20 @@ def _is_instance_mock(obj):
def _is_exception(obj):
return (
- isinstance(obj, BaseException) or
- isinstance(obj, type) and issubclass(obj, BaseException)
+ isinstance(obj, BaseException) or
+ isinstance(obj, type) and issubclass(obj, BaseException)
)
-def _extract_mock(obj):
- # Autospecced functions will return a FunctionType with "mock" attribute
- # which is the actual mock object that needs to be used.
- if isinstance(obj, FunctionTypes) and hasattr(obj, 'mock'):
- return obj.mock
- else:
- return obj
-
-
+def _extract_mock(obj):
+ # Autospecced functions will return a FunctionType with "mock" attribute
+ # which is the actual mock object that needs to be used.
+ if isinstance(obj, FunctionTypes) and hasattr(obj, 'mock'):
+ return obj.mock
+ else:
+ return obj
+
+
def _get_signature_object(func, as_instance, eat_self):
"""
Given an arbitrary, possibly callable object, try to create a suitable
@@ -89,7 +89,7 @@ def _get_signature_object(func, as_instance, eat_self):
"""
if isinstance(func, type) and not as_instance:
# If it's a type and should be modelled as a type, use __init__.
- func = func.__init__
+ func = func.__init__
# Skip the `self` argument in __init__
eat_self = True
elif not isinstance(func, FunctionTypes):
@@ -115,7 +115,7 @@ def _check_signature(func, mock, skipfirst, instance=False):
if sig is None:
return
func, sig = sig
- def checksig(self, /, *args, **kwargs):
+ def checksig(self, /, *args, **kwargs):
sig.bind(*args, **kwargs)
_copy_func_details(func, checksig)
type(mock)._mock_check_sig = checksig
@@ -138,8 +138,8 @@ def _copy_func_details(func, funcopy):
def _callable(obj):
if isinstance(obj, type):
return True
- if isinstance(obj, (staticmethod, classmethod, MethodType)):
- return _callable(obj.__func__)
+ if isinstance(obj, (staticmethod, classmethod, MethodType)):
+ return _callable(obj.__func__)
if getattr(obj, '__call__', None) is not None:
return True
return False
@@ -242,33 +242,33 @@ def _setup_func(funcopy, mock, sig):
mock._mock_delegate = funcopy
-def _setup_async_mock(mock):
- mock._is_coroutine = asyncio.coroutines._is_coroutine
- mock.await_count = 0
- mock.await_args = None
- mock.await_args_list = _CallList()
-
- # Mock is not configured yet so the attributes are set
- # to a function and then the corresponding mock helper function
- # is called when the helper is accessed similar to _setup_func.
- def wrapper(attr, /, *args, **kwargs):
- return getattr(mock.mock, attr)(*args, **kwargs)
-
- for attribute in ('assert_awaited',
- 'assert_awaited_once',
- 'assert_awaited_with',
- 'assert_awaited_once_with',
- 'assert_any_await',
- 'assert_has_awaits',
- 'assert_not_awaited'):
-
- # setattr(mock, attribute, wrapper) causes late binding
- # hence attribute will always be the last value in the loop
- # Use partial(wrapper, attribute) to ensure the attribute is bound
- # correctly.
- setattr(mock, attribute, partial(wrapper, attribute))
-
-
+def _setup_async_mock(mock):
+ mock._is_coroutine = asyncio.coroutines._is_coroutine
+ mock.await_count = 0
+ mock.await_args = None
+ mock.await_args_list = _CallList()
+
+ # Mock is not configured yet so the attributes are set
+ # to a function and then the corresponding mock helper function
+ # is called when the helper is accessed similar to _setup_func.
+ def wrapper(attr, /, *args, **kwargs):
+ return getattr(mock.mock, attr)(*args, **kwargs)
+
+ for attribute in ('assert_awaited',
+ 'assert_awaited_once',
+ 'assert_awaited_with',
+ 'assert_awaited_once_with',
+ 'assert_any_await',
+ 'assert_has_awaits',
+ 'assert_not_awaited'):
+
+ # setattr(mock, attribute, wrapper) causes late binding
+ # hence attribute will always be the last value in the loop
+ # Use partial(wrapper, attribute) to ensure the attribute is bound
+ # correctly.
+ setattr(mock, attribute, partial(wrapper, attribute))
+
+
def _is_magic(name):
return '__%s__' % name[2:-2] == name
@@ -354,7 +354,7 @@ class _CallList(list):
def _check_and_set_parent(parent, value, name, new_name):
- value = _extract_mock(value)
+ value = _extract_mock(value)
if not _is_instance_mock(value):
return False
@@ -389,7 +389,7 @@ class _MockIter(object):
class Base(object):
_mock_return_value = DEFAULT
_mock_side_effect = None
- def __init__(self, /, *args, **kwargs):
+ def __init__(self, /, *args, **kwargs):
pass
@@ -397,19 +397,19 @@ class Base(object):
class NonCallableMock(Base):
"""A non-callable version of `Mock`"""
- def __new__(cls, /, *args, **kw):
+ def __new__(cls, /, *args, **kw):
# every instance has its own class
# so we can create magic methods on the
# class without stomping on other mocks
- bases = (cls,)
- if not issubclass(cls, AsyncMockMixin):
- # Check if spec is an async object or function
- bound_args = _MOCK_SIG.bind_partial(cls, *args, **kw).arguments
- spec_arg = bound_args.get('spec_set', bound_args.get('spec'))
- if spec_arg is not None and _is_async_obj(spec_arg):
- bases = (AsyncMockMixin, cls)
- new = type(cls.__name__, bases, {'__doc__': cls.__doc__})
- instance = _safe_super(NonCallableMock, cls).__new__(new)
+ bases = (cls,)
+ if not issubclass(cls, AsyncMockMixin):
+ # Check if spec is an async object or function
+ bound_args = _MOCK_SIG.bind_partial(cls, *args, **kw).arguments
+ spec_arg = bound_args.get('spec_set', bound_args.get('spec'))
+ if spec_arg is not None and _is_async_obj(spec_arg):
+ bases = (AsyncMockMixin, cls)
+ new = type(cls.__name__, bases, {'__doc__': cls.__doc__})
+ instance = _safe_super(NonCallableMock, cls).__new__(new)
return instance
@@ -463,13 +463,13 @@ class NonCallableMock(Base):
Attach a mock as an attribute of this one, replacing its name and
parent. Calls to the attached mock will be recorded in the
`method_calls` and `mock_calls` attributes of this one."""
- inner_mock = _extract_mock(mock)
-
- inner_mock._mock_parent = None
- inner_mock._mock_new_parent = None
- inner_mock._mock_name = ''
- inner_mock._mock_new_name = None
+ inner_mock = _extract_mock(mock)
+ inner_mock._mock_parent = None
+ inner_mock._mock_new_parent = None
+ inner_mock._mock_name = ''
+ inner_mock._mock_new_name = None
+
setattr(self, attribute, mock)
@@ -486,17 +486,17 @@ class NonCallableMock(Base):
_eat_self=False):
_spec_class = None
_spec_signature = None
- _spec_asyncs = []
-
- for attr in dir(spec):
- if iscoroutinefunction(getattr(spec, attr, None)):
- _spec_asyncs.append(attr)
+ _spec_asyncs = []
+ for attr in dir(spec):
+ if iscoroutinefunction(getattr(spec, attr, None)):
+ _spec_asyncs.append(attr)
+
if spec is not None and not _is_list(spec):
if isinstance(spec, type):
_spec_class = spec
else:
- _spec_class = type(spec)
+ _spec_class = type(spec)
res = _get_signature_object(spec,
_spec_as_instance, _eat_self)
_spec_signature = res and res[1]
@@ -508,7 +508,7 @@ class NonCallableMock(Base):
__dict__['_spec_set'] = spec_set
__dict__['_spec_signature'] = _spec_signature
__dict__['_mock_methods'] = spec
- __dict__['_spec_asyncs'] = _spec_asyncs
+ __dict__['_spec_asyncs'] = _spec_asyncs
def __get_return_value(self):
ret = self._mock_return_value
@@ -593,14 +593,14 @@ class NonCallableMock(Base):
for child in self._mock_children.values():
if isinstance(child, _SpecState) or child is _deleted:
continue
- child.reset_mock(visited, return_value=return_value, side_effect=side_effect)
+ child.reset_mock(visited, return_value=return_value, side_effect=side_effect)
ret = self._mock_return_value
if _is_instance_mock(ret) and ret is not self:
ret.reset_mock(visited)
- def configure_mock(self, /, **kwargs):
+ def configure_mock(self, /, **kwargs):
"""Set attributes on the mock through keyword arguments.
Attributes plus return values and side effects can be set on child
@@ -632,8 +632,8 @@ class NonCallableMock(Base):
raise AttributeError(name)
if not self._mock_unsafe:
if name.startswith(('assert', 'assret')):
- raise AttributeError("Attributes cannot start with 'assert' "
- "or 'assret'")
+ raise AttributeError("Attributes cannot start with 'assert' "
+ "or 'assret'")
result = self._mock_children.get(name)
if result is _deleted:
@@ -669,7 +669,7 @@ class NonCallableMock(Base):
dot = '.'
if _name_list == ['()']:
dot = ''
-
+
while _parent is not None:
last = _parent
@@ -717,14 +717,14 @@ class NonCallableMock(Base):
extras = self._mock_methods or []
from_type = dir(type(self))
from_dict = list(self.__dict__)
- from_child_mocks = [
- m_name for m_name, m_value in self._mock_children.items()
- if m_value is not _deleted]
+ from_child_mocks = [
+ m_name for m_name, m_value in self._mock_children.items()
+ if m_value is not _deleted]
from_type = [e for e in from_type if not e.startswith('_')]
from_dict = [e for e in from_dict if not e.startswith('_') or
_is_magic(e)]
- return sorted(set(extras + from_type + from_dict + from_child_mocks))
+ return sorted(set(extras + from_type + from_dict + from_child_mocks))
def __setattr__(self, name, value):
@@ -776,7 +776,7 @@ class NonCallableMock(Base):
obj = self._mock_children.get(name, _missing)
if name in self.__dict__:
- _safe_super(NonCallableMock, self).__delattr__(name)
+ _safe_super(NonCallableMock, self).__delattr__(name)
elif obj is _deleted:
raise AttributeError(name)
if obj is not _missing:
@@ -789,47 +789,47 @@ class NonCallableMock(Base):
return _format_call_signature(name, args, kwargs)
- def _format_mock_failure_message(self, args, kwargs, action='call'):
- message = 'expected %s not found.\nExpected: %s\nActual: %s'
+ def _format_mock_failure_message(self, args, kwargs, action='call'):
+ message = 'expected %s not found.\nExpected: %s\nActual: %s'
expected_string = self._format_mock_call_signature(args, kwargs)
call_args = self.call_args
actual_string = self._format_mock_call_signature(*call_args)
- return message % (action, expected_string, actual_string)
-
-
- def _get_call_signature_from_name(self, name):
- """
- * If call objects are asserted against a method/function like obj.meth1
- then there could be no name for the call object to lookup. Hence just
- return the spec_signature of the method/function being asserted against.
- * If the name is not empty then remove () and split by '.' to get
- list of names to iterate through the children until a potential
- match is found. A child mock is created only during attribute access
- so if we get a _SpecState then no attributes of the spec were accessed
- and can be safely exited.
- """
- if not name:
- return self._spec_signature
-
- sig = None
- names = name.replace('()', '').split('.')
- children = self._mock_children
-
- for name in names:
- child = children.get(name)
- if child is None or isinstance(child, _SpecState):
- break
- else:
- # If an autospecced object is attached using attach_mock the
- # child would be a function with mock object as attribute from
- # which signature has to be derived.
- child = _extract_mock(child)
- children = child._mock_children
- sig = child._spec_signature
-
- return sig
-
-
+ return message % (action, expected_string, actual_string)
+
+
+ def _get_call_signature_from_name(self, name):
+ """
+ * If call objects are asserted against a method/function like obj.meth1
+ then there could be no name for the call object to lookup. Hence just
+ return the spec_signature of the method/function being asserted against.
+ * If the name is not empty then remove () and split by '.' to get
+ list of names to iterate through the children until a potential
+ match is found. A child mock is created only during attribute access
+ so if we get a _SpecState then no attributes of the spec were accessed
+ and can be safely exited.
+ """
+ if not name:
+ return self._spec_signature
+
+ sig = None
+ names = name.replace('()', '').split('.')
+ children = self._mock_children
+
+ for name in names:
+ child = children.get(name)
+ if child is None or isinstance(child, _SpecState):
+ break
+ else:
+ # If an autospecced object is attached using attach_mock the
+ # child would be a function with mock object as attribute from
+ # which signature has to be derived.
+ child = _extract_mock(child)
+ children = child._mock_children
+ sig = child._spec_signature
+
+ return sig
+
+
def _call_matcher(self, _call):
"""
Given a call (or simply an (args, kwargs) tuple), return a
@@ -837,12 +837,12 @@ class NonCallableMock(Base):
This is a best effort method which relies on the spec's signature,
if available, or falls back on the arguments themselves.
"""
-
- if isinstance(_call, tuple) and len(_call) > 2:
- sig = self._get_call_signature_from_name(_call[0])
- else:
- sig = self._spec_signature
-
+
+ if isinstance(_call, tuple) and len(_call) > 2:
+ sig = self._get_call_signature_from_name(_call[0])
+ else:
+ sig = self._spec_signature
+
if sig is not None:
if len(_call) == 2:
name = ''
@@ -850,71 +850,71 @@ class NonCallableMock(Base):
else:
name, args, kwargs = _call
try:
- bound_call = sig.bind(*args, **kwargs)
- return call(name, bound_call.args, bound_call.kwargs)
+ bound_call = sig.bind(*args, **kwargs)
+ return call(name, bound_call.args, bound_call.kwargs)
except TypeError as e:
return e.with_traceback(None)
else:
return _call
- def assert_not_called(self):
+ def assert_not_called(self):
"""assert that the mock was never called.
"""
if self.call_count != 0:
- msg = ("Expected '%s' to not have been called. Called %s times.%s"
- % (self._mock_name or 'mock',
- self.call_count,
- self._calls_repr()))
+ msg = ("Expected '%s' to not have been called. Called %s times.%s"
+ % (self._mock_name or 'mock',
+ self.call_count,
+ self._calls_repr()))
raise AssertionError(msg)
- def assert_called(self):
+ def assert_called(self):
"""assert that the mock was called at least once
"""
if self.call_count == 0:
msg = ("Expected '%s' to have been called." %
- (self._mock_name or 'mock'))
+ (self._mock_name or 'mock'))
raise AssertionError(msg)
- def assert_called_once(self):
+ def assert_called_once(self):
"""assert that the mock was called only once.
"""
if not self.call_count == 1:
- msg = ("Expected '%s' to have been called once. Called %s times.%s"
- % (self._mock_name or 'mock',
- self.call_count,
- self._calls_repr()))
+ msg = ("Expected '%s' to have been called once. Called %s times.%s"
+ % (self._mock_name or 'mock',
+ self.call_count,
+ self._calls_repr()))
raise AssertionError(msg)
- def assert_called_with(self, /, *args, **kwargs):
- """assert that the last call was made with the specified arguments.
+ def assert_called_with(self, /, *args, **kwargs):
+ """assert that the last call was made with the specified arguments.
Raises an AssertionError if the args and keyword args passed in are
different to the last call to the mock."""
if self.call_args is None:
expected = self._format_mock_call_signature(args, kwargs)
- actual = 'not called.'
- error_message = ('expected call not found.\nExpected: %s\nActual: %s'
- % (expected, actual))
- raise AssertionError(error_message)
+ actual = 'not called.'
+ error_message = ('expected call not found.\nExpected: %s\nActual: %s'
+ % (expected, actual))
+ raise AssertionError(error_message)
def _error_message():
msg = self._format_mock_failure_message(args, kwargs)
return msg
- expected = self._call_matcher(_Call((args, kwargs), two=True))
+ expected = self._call_matcher(_Call((args, kwargs), two=True))
actual = self._call_matcher(self.call_args)
- if actual != expected:
+ if actual != expected:
cause = expected if isinstance(expected, Exception) else None
raise AssertionError(_error_message()) from cause
- def assert_called_once_with(self, /, *args, **kwargs):
+ def assert_called_once_with(self, /, *args, **kwargs):
"""assert that the mock was called exactly once and that that call was
with the specified arguments."""
if not self.call_count == 1:
- msg = ("Expected '%s' to be called once. Called %s times.%s"
- % (self._mock_name or 'mock',
- self.call_count,
- self._calls_repr()))
+ msg = ("Expected '%s' to be called once. Called %s times.%s"
+ % (self._mock_name or 'mock',
+ self.call_count,
+ self._calls_repr()))
raise AssertionError(msg)
return self.assert_called_with(*args, **kwargs)
@@ -930,21 +930,21 @@ class NonCallableMock(Base):
If `any_order` is True then the calls can be in any order, but
they must all appear in `mock_calls`."""
expected = [self._call_matcher(c) for c in calls]
- cause = next((e for e in expected if isinstance(e, Exception)), None)
+ cause = next((e for e in expected if isinstance(e, Exception)), None)
all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls)
if not any_order:
if expected not in all_calls:
- if cause is None:
- problem = 'Calls not found.'
- else:
- problem = ('Error processing expected calls.\n'
- 'Errors: {}').format(
- [e if isinstance(e, Exception) else None
- for e in expected])
+ if cause is None:
+ problem = 'Calls not found.'
+ else:
+ problem = ('Error processing expected calls.\n'
+ 'Errors: {}').format(
+ [e if isinstance(e, Exception) else None
+ for e in expected])
raise AssertionError(
- f'{problem}\n'
- f'Expected: {_CallList(calls)}'
- f'{self._calls_repr(prefix="Actual").rstrip(".")}'
+ f'{problem}\n'
+ f'Expected: {_CallList(calls)}'
+ f'{self._calls_repr(prefix="Actual").rstrip(".")}'
) from cause
return
@@ -958,29 +958,29 @@ class NonCallableMock(Base):
not_found.append(kall)
if not_found:
raise AssertionError(
- '%r does not contain all of %r in its call list, '
- 'found %r instead' % (self._mock_name or 'mock',
- tuple(not_found), all_calls)
+ '%r does not contain all of %r in its call list, '
+ 'found %r instead' % (self._mock_name or 'mock',
+ tuple(not_found), all_calls)
) from cause
- def assert_any_call(self, /, *args, **kwargs):
+ def assert_any_call(self, /, *args, **kwargs):
"""assert the mock has been called with the specified arguments.
The assert passes if the mock has *ever* been called, unlike
`assert_called_with` and `assert_called_once_with` that only pass if
the call is the most recent one."""
- expected = self._call_matcher(_Call((args, kwargs), two=True))
- cause = expected if isinstance(expected, Exception) else None
+ expected = self._call_matcher(_Call((args, kwargs), two=True))
+ cause = expected if isinstance(expected, Exception) else None
actual = [self._call_matcher(c) for c in self.call_args_list]
- if cause or expected not in _AnyComparer(actual):
+ if cause or expected not in _AnyComparer(actual):
expected_string = self._format_mock_call_signature(args, kwargs)
raise AssertionError(
'%s call not found' % expected_string
) from cause
- def _get_child_mock(self, /, **kw):
+ def _get_child_mock(self, /, **kw):
"""Create the child mocks for attributes and return value.
By default child mocks will be the same type as the parent.
Subclasses of Mock may want to override this to customize the way
@@ -988,68 +988,68 @@ class NonCallableMock(Base):
For non-callable mocks the callable variant will be used (rather than
any custom subclass)."""
- _new_name = kw.get("_new_name")
- if _new_name in self.__dict__['_spec_asyncs']:
- return AsyncMock(**kw)
-
- if self._mock_sealed:
- attribute = f".{kw['name']}" if "name" in kw else "()"
- mock_name = self._extract_mock_name() + attribute
- raise AttributeError(mock_name)
-
+ _new_name = kw.get("_new_name")
+ if _new_name in self.__dict__['_spec_asyncs']:
+ return AsyncMock(**kw)
+
+ if self._mock_sealed:
+ attribute = f".{kw['name']}" if "name" in kw else "()"
+ mock_name = self._extract_mock_name() + attribute
+ raise AttributeError(mock_name)
+
_type = type(self)
- if issubclass(_type, MagicMock) and _new_name in _async_method_magics:
- # Any asynchronous magic becomes an AsyncMock
- klass = AsyncMock
- elif issubclass(_type, AsyncMockMixin):
- if (_new_name in _all_sync_magics or
- self._mock_methods and _new_name in self._mock_methods):
- # Any synchronous method on AsyncMock becomes a MagicMock
- klass = MagicMock
- else:
- klass = AsyncMock
- elif not issubclass(_type, CallableMixin):
+ if issubclass(_type, MagicMock) and _new_name in _async_method_magics:
+ # Any asynchronous magic becomes an AsyncMock
+ klass = AsyncMock
+ elif issubclass(_type, AsyncMockMixin):
+ if (_new_name in _all_sync_magics or
+ self._mock_methods and _new_name in self._mock_methods):
+ # Any synchronous method on AsyncMock becomes a MagicMock
+ klass = MagicMock
+ else:
+ klass = AsyncMock
+ elif not issubclass(_type, CallableMixin):
if issubclass(_type, NonCallableMagicMock):
klass = MagicMock
- elif issubclass(_type, NonCallableMock):
+ elif issubclass(_type, NonCallableMock):
klass = Mock
else:
klass = _type.__mro__[1]
return klass(**kw)
- def _calls_repr(self, prefix="Calls"):
- """Renders self.mock_calls as a string.
-
- Example: "\nCalls: [call(1), call(2)]."
-
- If self.mock_calls is empty, an empty string is returned. The
- output will be truncated if very long.
- """
- if not self.mock_calls:
- return ""
- return f"\n{prefix}: {safe_repr(self.mock_calls)}."
-
-
-_MOCK_SIG = inspect.signature(NonCallableMock.__init__)
-
-
-class _AnyComparer(list):
- """A list which checks if it contains a call which may have an
- argument of ANY, flipping the components of item and self from
- their traditional locations so that ANY is guaranteed to be on
- the left."""
- def __contains__(self, item):
- for _call in self:
- assert len(item) == len(_call)
- if all([
- expected == actual
- for expected, actual in zip(item, _call)
- ]):
- return True
- return False
-
-
+ def _calls_repr(self, prefix="Calls"):
+ """Renders self.mock_calls as a string.
+
+ Example: "\nCalls: [call(1), call(2)]."
+
+ If self.mock_calls is empty, an empty string is returned. The
+ output will be truncated if very long.
+ """
+ if not self.mock_calls:
+ return ""
+ return f"\n{prefix}: {safe_repr(self.mock_calls)}."
+
+
+_MOCK_SIG = inspect.signature(NonCallableMock.__init__)
+
+
+class _AnyComparer(list):
+ """A list which checks if it contains a call which may have an
+ argument of ANY, flipping the components of item and self from
+ their traditional locations so that ANY is guaranteed to be on
+ the left."""
+ def __contains__(self, item):
+ for _call in self:
+ assert len(item) == len(_call)
+ if all([
+ expected == actual
+ for expected, actual in zip(item, _call)
+ ]):
+ return True
+ return False
+
+
def _try_iter(obj):
if obj is None:
return obj
@@ -1079,29 +1079,29 @@ class CallableMixin(Base):
self.side_effect = side_effect
- def _mock_check_sig(self, /, *args, **kwargs):
+ def _mock_check_sig(self, /, *args, **kwargs):
# stub method that can be replaced with one with a specific signature
pass
- def __call__(self, /, *args, **kwargs):
+ def __call__(self, /, *args, **kwargs):
# can't use self in-case a function / method we are mocking uses self
# in the signature
- self._mock_check_sig(*args, **kwargs)
- self._increment_mock_call(*args, **kwargs)
- return self._mock_call(*args, **kwargs)
-
+ self._mock_check_sig(*args, **kwargs)
+ self._increment_mock_call(*args, **kwargs)
+ return self._mock_call(*args, **kwargs)
- def _mock_call(self, /, *args, **kwargs):
- return self._execute_mock_call(*args, **kwargs)
- def _increment_mock_call(self, /, *args, **kwargs):
+ def _mock_call(self, /, *args, **kwargs):
+ return self._execute_mock_call(*args, **kwargs)
+
+ def _increment_mock_call(self, /, *args, **kwargs):
self.called = True
self.call_count += 1
# handle call_args
- # needs to be set here so assertions on call arguments pass before
- # execution in the case of awaited calls
+ # needs to be set here so assertions on call arguments pass before
+ # execution in the case of awaited calls
_call = _Call((args, kwargs), two=True)
self.call_args = _call
self.call_args_list.append(_call)
@@ -1141,9 +1141,9 @@ class CallableMixin(Base):
# follow the parental chain:
_new_parent = _new_parent._mock_new_parent
- def _execute_mock_call(self, /, *args, **kwargs):
- # separate from _increment_mock_call so that awaited functions are
- # executed separately from their call, also AsyncMock overrides this method
+ def _execute_mock_call(self, /, *args, **kwargs):
+ # separate from _increment_mock_call so that awaited functions are
+ # executed separately from their call, also AsyncMock overrides this method
effect = self.side_effect
if effect is not None:
@@ -1288,8 +1288,8 @@ class _patch(object):
def __call__(self, func):
if isinstance(func, type):
return self.decorate_class(func)
- if inspect.iscoroutinefunction(func):
- return self.decorate_async_callable(func)
+ if inspect.iscoroutinefunction(func):
+ return self.decorate_async_callable(func)
return self.decorate_callable(func)
@@ -1307,51 +1307,51 @@ class _patch(object):
return klass
- @contextlib.contextmanager
- def decoration_helper(self, patched, args, keywargs):
- extra_args = []
- with contextlib.ExitStack() as exit_stack:
- for patching in patched.patchings:
- arg = exit_stack.enter_context(patching)
- if patching.attribute_name is not None:
- keywargs.update(arg)
- elif patching.new is DEFAULT:
- extra_args.append(arg)
-
- args += tuple(extra_args)
- yield (args, keywargs)
-
-
+ @contextlib.contextmanager
+ def decoration_helper(self, patched, args, keywargs):
+ extra_args = []
+ with contextlib.ExitStack() as exit_stack:
+ for patching in patched.patchings:
+ arg = exit_stack.enter_context(patching)
+ if patching.attribute_name is not None:
+ keywargs.update(arg)
+ elif patching.new is DEFAULT:
+ extra_args.append(arg)
+
+ args += tuple(extra_args)
+ yield (args, keywargs)
+
+
def decorate_callable(self, func):
- # NB. Keep the method in sync with decorate_async_callable()
+ # NB. Keep the method in sync with decorate_async_callable()
if hasattr(func, 'patchings'):
func.patchings.append(self)
return func
@wraps(func)
def patched(*args, **keywargs):
- with self.decoration_helper(patched,
- args,
- keywargs) as (newargs, newkeywargs):
- return func(*newargs, **newkeywargs)
-
- patched.patchings = [self]
- return patched
-
-
- def decorate_async_callable(self, func):
- # NB. Keep the method in sync with decorate_callable()
- if hasattr(func, 'patchings'):
- func.patchings.append(self)
- return func
-
- @wraps(func)
- async def patched(*args, **keywargs):
- with self.decoration_helper(patched,
- args,
- keywargs) as (newargs, newkeywargs):
- return await func(*newargs, **newkeywargs)
-
+ with self.decoration_helper(patched,
+ args,
+ keywargs) as (newargs, newkeywargs):
+ return func(*newargs, **newkeywargs)
+
+ patched.patchings = [self]
+ return patched
+
+
+ def decorate_async_callable(self, func):
+ # NB. Keep the method in sync with decorate_callable()
+ if hasattr(func, 'patchings'):
+ func.patchings.append(self)
+ return func
+
+ @wraps(func)
+ async def patched(*args, **keywargs):
+ with self.decoration_helper(patched,
+ args,
+ keywargs) as (newargs, newkeywargs):
+ return await func(*newargs, **newkeywargs)
+
patched.patchings = [self]
return patched
@@ -1424,10 +1424,10 @@ class _patch(object):
if isinstance(original, type):
# If we're patching out a class and there is a spec
inherit = True
- if spec is None and _is_async_obj(original):
- Klass = AsyncMock
- else:
- Klass = MagicMock
+ if spec is None and _is_async_obj(original):
+ Klass = AsyncMock
+ else:
+ Klass = MagicMock
_kwargs = {}
if new_callable is not None:
Klass = new_callable
@@ -1439,9 +1439,9 @@ class _patch(object):
not_callable = '__call__' not in this_spec
else:
not_callable = not callable(this_spec)
- if _is_async_obj(this_spec):
- Klass = AsyncMock
- elif not_callable:
+ if _is_async_obj(this_spec):
+ Klass = AsyncMock
+ elif not_callable:
Klass = NonCallableMagicMock
if spec is not None:
@@ -1496,23 +1496,23 @@ class _patch(object):
self.temp_original = original
self.is_local = local
- self._exit_stack = contextlib.ExitStack()
- try:
- setattr(self.target, self.attribute, new_attr)
- if self.attribute_name is not None:
- extra_args = {}
- if self.new is DEFAULT:
- extra_args[self.attribute_name] = new
- for patching in self.additional_patchers:
- arg = self._exit_stack.enter_context(patching)
- if patching.new is DEFAULT:
- extra_args.update(arg)
- return extra_args
-
- return new
- except:
- if not self.__exit__(*sys.exc_info()):
- raise
+ self._exit_stack = contextlib.ExitStack()
+ try:
+ setattr(self.target, self.attribute, new_attr)
+ if self.attribute_name is not None:
+ extra_args = {}
+ if self.new is DEFAULT:
+ extra_args[self.attribute_name] = new
+ for patching in self.additional_patchers:
+ arg = self._exit_stack.enter_context(patching)
+ if patching.new is DEFAULT:
+ extra_args.update(arg)
+ return extra_args
+
+ return new
+ except:
+ if not self.__exit__(*sys.exc_info()):
+ raise
def __exit__(self, *exc_info):
"""Undo the patch."""
@@ -1530,9 +1530,9 @@ class _patch(object):
del self.temp_original
del self.is_local
del self.target
- exit_stack = self._exit_stack
- del self._exit_stack
- return exit_stack.__exit__(*exc_info)
+ exit_stack = self._exit_stack
+ del self._exit_stack
+ return exit_stack.__exit__(*exc_info)
def start(self):
@@ -1548,9 +1548,9 @@ class _patch(object):
self._active_patches.remove(self)
except ValueError:
# If the patch hasn't been started this will fail
- return None
+ return None
- return self.__exit__(None, None, None)
+ return self.__exit__(None, None, None)
@@ -1582,10 +1582,10 @@ def _patch_object(
When used as a class decorator `patch.object` honours `patch.TEST_PREFIX`
for choosing which methods to wrap.
"""
- if type(target) is str:
- raise TypeError(
- f"{target!r} must be the actual object to be patched, not a str"
- )
+ if type(target) is str:
+ raise TypeError(
+ f"{target!r} must be the actual object to be patched, not a str"
+ )
getter = lambda: target
return _patch(
getter, attribute, new, spec, create,
@@ -1652,9 +1652,9 @@ def patch(
is patched with a `new` object. When the function/with statement exits
the patch is undone.
- If `new` is omitted, then the target is replaced with an
- `AsyncMock if the patched object is an async function or a
- `MagicMock` otherwise. If `patch` is used as a decorator and `new` is
+ If `new` is omitted, then the target is replaced with an
+ `AsyncMock if the patched object is an async function or a
+ `MagicMock` otherwise. If `patch` is used as a decorator and `new` is
omitted, the created mock is passed in as an extra argument to the
decorated function. If `patch` is used as a context manager the created
mock is returned by the context manager.
@@ -1672,8 +1672,8 @@ def patch(
patch to pass in the object being mocked as the spec/spec_set object.
`new_callable` allows you to specify a different class, or callable object,
- that will be called to create the `new` object. By default `AsyncMock` is
- used for async functions and `MagicMock` for the rest.
+ that will be called to create the `new` object. By default `AsyncMock` is
+ used for async functions and `MagicMock` for the rest.
A more powerful form of `spec` is `autospec`. If you set `autospec=True`
then the mock will be created with a spec from the object being replaced.
@@ -1707,8 +1707,8 @@ def patch(
"as"; very useful if `patch` is creating a mock object for you.
`patch` takes arbitrary keyword arguments. These will be passed to
- `AsyncMock` if the patched object is asynchronous, to `MagicMock`
- otherwise or to `new_callable` if specified.
+ `AsyncMock` if the patched object is asynchronous, to `MagicMock`
+ otherwise or to `new_callable` if specified.
`patch.dict(...)`, `patch.multiple(...)` and `patch.object(...)` are
available for alternate use-cases.
@@ -1786,7 +1786,7 @@ class _patch_dict(object):
def __enter__(self):
"""Patch the dict."""
self._patch_dict()
- return self.in_dict
+ return self.in_dict
def _patch_dict(self):
@@ -1832,29 +1832,29 @@ class _patch_dict(object):
def __exit__(self, *args):
"""Unpatch the dict."""
- if self._original is not None:
- self._unpatch_dict()
+ if self._original is not None:
+ self._unpatch_dict()
return False
- def start(self):
- """Activate a patch, returning any created mock."""
- result = self.__enter__()
- _patch._active_patches.append(self)
- return result
-
-
- def stop(self):
- """Stop an active patch."""
- try:
- _patch._active_patches.remove(self)
- except ValueError:
- # If the patch hasn't been started this will fail
- return None
-
- return self.__exit__(None, None, None)
-
-
+ def start(self):
+ """Activate a patch, returning any created mock."""
+ result = self.__enter__()
+ _patch._active_patches.append(self)
+ return result
+
+
+ def stop(self):
+ """Stop an active patch."""
+ try:
+ _patch._active_patches.remove(self)
+ except ValueError:
+ # If the patch hasn't been started this will fail
+ return None
+
+ return self.__exit__(None, None, None)
+
+
def _clear_dict(in_dict):
try:
in_dict.clear()
@@ -1886,10 +1886,10 @@ magic_methods = (
# because there is no idivmod
"divmod rdivmod neg pos abs invert "
"complex int float index "
- "round trunc floor ceil "
+ "round trunc floor ceil "
"bool next "
- "fspath "
- "aiter "
+ "fspath "
+ "aiter "
)
numerics = (
@@ -1913,7 +1913,7 @@ _non_defaults = {
def _get_method(name, func):
"Turns a callable object (like a mock) into a real function"
- def method(self, /, *args, **kw):
+ def method(self, /, *args, **kw):
return func(self, *args, **kw)
method.__name__ = name
return method
@@ -1924,15 +1924,15 @@ _magics = {
' '.join([magic_methods, numerics, inplace, right]).split()
}
-# Magic methods used for async `with` statements
-_async_method_magics = {"__aenter__", "__aexit__", "__anext__"}
-# Magic methods that are only used with async calls but are synchronous functions themselves
-_sync_async_magics = {"__aiter__"}
-_async_magics = _async_method_magics | _sync_async_magics
-
-_all_sync_magics = _magics | _non_defaults
-_all_magics = _all_sync_magics | _async_magics
+# Magic methods used for async `with` statements
+_async_method_magics = {"__aenter__", "__aexit__", "__anext__"}
+# Magic methods that are only used with async calls but are synchronous functions themselves
+_sync_async_magics = {"__aiter__"}
+_async_magics = _async_method_magics | _sync_async_magics
+_all_sync_magics = _magics | _non_defaults
+_all_magics = _all_sync_magics | _async_magics
+
_unsupported_magics = {
'__getattr__', '__setattr__',
'__init__', '__new__', '__prepare__',
@@ -1944,7 +1944,7 @@ _calculate_return_value = {
'__hash__': lambda self: object.__hash__(self),
'__str__': lambda self: object.__str__(self),
'__sizeof__': lambda self: object.__sizeof__(self),
- '__fspath__': lambda self: f"{type(self).__name__}/{self._extract_mock_name()}/{id(self)}",
+ '__fspath__': lambda self: f"{type(self).__name__}/{self._extract_mock_name()}/{id(self)}",
}
_return_values = {
@@ -1960,7 +1960,7 @@ _return_values = {
'__float__': 1.0,
'__bool__': True,
'__index__': 1,
- '__aexit__': False,
+ '__aexit__': False,
}
@@ -1993,19 +1993,19 @@ def _get_iter(self):
return iter(ret_val)
return __iter__
-def _get_async_iter(self):
- def __aiter__():
- ret_val = self.__aiter__._mock_return_value
- if ret_val is DEFAULT:
- return _AsyncIterator(iter([]))
- return _AsyncIterator(iter(ret_val))
- return __aiter__
-
+def _get_async_iter(self):
+ def __aiter__():
+ ret_val = self.__aiter__._mock_return_value
+ if ret_val is DEFAULT:
+ return _AsyncIterator(iter([]))
+ return _AsyncIterator(iter(ret_val))
+ return __aiter__
+
_side_effect_methods = {
'__eq__': _get_eq,
'__ne__': _get_ne,
'__iter__': _get_iter,
- '__aiter__': _get_async_iter
+ '__aiter__': _get_async_iter
}
@@ -2016,9 +2016,9 @@ def _set_return_value(mock, method, name):
method.return_value = fixed
return
- return_calculator = _calculate_return_value.get(name)
- if return_calculator is not None:
- return_value = return_calculator(mock)
+ return_calculator = _calculate_return_value.get(name)
+ if return_calculator is not None:
+ return_value = return_calculator(mock)
method.return_value = return_value
return
@@ -2028,22 +2028,22 @@ def _set_return_value(mock, method, name):
-class MagicMixin(Base):
- def __init__(self, /, *args, **kw):
+class MagicMixin(Base):
+ def __init__(self, /, *args, **kw):
self._mock_set_magics() # make magic work for kwargs in init
_safe_super(MagicMixin, self).__init__(*args, **kw)
self._mock_set_magics() # fix magic broken by upper level init
def _mock_set_magics(self):
- orig_magics = _magics | _async_method_magics
- these_magics = orig_magics
+ orig_magics = _magics | _async_method_magics
+ these_magics = orig_magics
if getattr(self, "_mock_methods", None) is not None:
- these_magics = orig_magics.intersection(self._mock_methods)
+ these_magics = orig_magics.intersection(self._mock_methods)
remove_magics = set()
- remove_magics = orig_magics - these_magics
+ remove_magics = orig_magics - these_magics
for entry in remove_magics:
if entry in type(self).__dict__:
@@ -2071,11 +2071,11 @@ class NonCallableMagicMock(MagicMixin, NonCallableMock):
self._mock_set_magics()
-class AsyncMagicMixin(MagicMixin):
- def __init__(self, /, *args, **kw):
- self._mock_set_magics() # make magic work for kwargs in init
- _safe_super(AsyncMagicMixin, self).__init__(*args, **kw)
- self._mock_set_magics() # fix magic broken by upper level init
+class AsyncMagicMixin(MagicMixin):
+ def __init__(self, /, *args, **kw):
+ self._mock_set_magics() # make magic work for kwargs in init
+ _safe_super(AsyncMagicMixin, self).__init__(*args, **kw)
+ self._mock_set_magics() # fix magic broken by upper level init
class MagicMock(MagicMixin, Mock):
"""
@@ -2099,7 +2099,7 @@ class MagicMock(MagicMixin, Mock):
-class MagicProxy(Base):
+class MagicProxy(Base):
def __init__(self, name, parent):
self.name = name
self.parent = parent
@@ -2117,232 +2117,232 @@ class MagicProxy(Base):
return self.create_mock()
-class AsyncMockMixin(Base):
- await_count = _delegating_property('await_count')
- await_args = _delegating_property('await_args')
- await_args_list = _delegating_property('await_args_list')
-
- def __init__(self, /, *args, **kwargs):
- super().__init__(*args, **kwargs)
- # iscoroutinefunction() checks _is_coroutine property to say if an
- # object is a coroutine. Without this check it looks to see if it is a
- # function/method, which in this case it is not (since it is an
- # AsyncMock).
- # It is set through __dict__ because when spec_set is True, this
- # attribute is likely undefined.
- self.__dict__['_is_coroutine'] = asyncio.coroutines._is_coroutine
- self.__dict__['_mock_await_count'] = 0
- self.__dict__['_mock_await_args'] = None
- self.__dict__['_mock_await_args_list'] = _CallList()
- code_mock = NonCallableMock(spec_set=CodeType)
- code_mock.co_flags = inspect.CO_COROUTINE
- self.__dict__['__code__'] = code_mock
-
- async def _execute_mock_call(self, /, *args, **kwargs):
- # This is nearly just like super(), except for special handling
- # of coroutines
-
- _call = _Call((args, kwargs), two=True)
- self.await_count += 1
- self.await_args = _call
- self.await_args_list.append(_call)
-
- effect = self.side_effect
- if effect is not None:
- if _is_exception(effect):
- raise effect
- elif not _callable(effect):
- try:
- result = next(effect)
- except StopIteration:
- # It is impossible to propogate a StopIteration
- # through coroutines because of PEP 479
- raise StopAsyncIteration
- if _is_exception(result):
- raise result
- elif iscoroutinefunction(effect):
- result = await effect(*args, **kwargs)
- else:
- result = effect(*args, **kwargs)
-
- if result is not DEFAULT:
- return result
-
- if self._mock_return_value is not DEFAULT:
- return self.return_value
-
- if self._mock_wraps is not None:
- if iscoroutinefunction(self._mock_wraps):
- return await self._mock_wraps(*args, **kwargs)
- return self._mock_wraps(*args, **kwargs)
-
- return self.return_value
-
- def assert_awaited(self):
- """
- Assert that the mock was awaited at least once.
- """
- if self.await_count == 0:
- msg = f"Expected {self._mock_name or 'mock'} to have been awaited."
- raise AssertionError(msg)
-
- def assert_awaited_once(self):
- """
- Assert that the mock was awaited exactly once.
- """
- if not self.await_count == 1:
- msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once."
- f" Awaited {self.await_count} times.")
- raise AssertionError(msg)
-
- def assert_awaited_with(self, /, *args, **kwargs):
- """
- Assert that the last await was with the specified arguments.
- """
- if self.await_args is None:
- expected = self._format_mock_call_signature(args, kwargs)
- raise AssertionError(f'Expected await: {expected}\nNot awaited')
-
- def _error_message():
- msg = self._format_mock_failure_message(args, kwargs, action='await')
- return msg
-
- expected = self._call_matcher(_Call((args, kwargs), two=True))
- actual = self._call_matcher(self.await_args)
- if actual != expected:
- cause = expected if isinstance(expected, Exception) else None
- raise AssertionError(_error_message()) from cause
-
- def assert_awaited_once_with(self, /, *args, **kwargs):
- """
- Assert that the mock was awaited exactly once and with the specified
- arguments.
- """
- if not self.await_count == 1:
- msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once."
- f" Awaited {self.await_count} times.")
- raise AssertionError(msg)
- return self.assert_awaited_with(*args, **kwargs)
-
- def assert_any_await(self, /, *args, **kwargs):
- """
- Assert the mock has ever been awaited with the specified arguments.
- """
- expected = self._call_matcher(_Call((args, kwargs), two=True))
- cause = expected if isinstance(expected, Exception) else None
- actual = [self._call_matcher(c) for c in self.await_args_list]
- if cause or expected not in _AnyComparer(actual):
- expected_string = self._format_mock_call_signature(args, kwargs)
- raise AssertionError(
- '%s await not found' % expected_string
- ) from cause
-
- def assert_has_awaits(self, calls, any_order=False):
- """
- Assert the mock has been awaited with the specified calls.
- The :attr:`await_args_list` list is checked for the awaits.
-
- If `any_order` is False (the default) then the awaits must be
- sequential. There can be extra calls before or after the
- specified awaits.
-
- If `any_order` is True then the awaits can be in any order, but
- they must all appear in :attr:`await_args_list`.
- """
- expected = [self._call_matcher(c) for c in calls]
- cause = next((e for e in expected if isinstance(e, Exception)), None)
- all_awaits = _CallList(self._call_matcher(c) for c in self.await_args_list)
- if not any_order:
- if expected not in all_awaits:
- if cause is None:
- problem = 'Awaits not found.'
- else:
- problem = ('Error processing expected awaits.\n'
- 'Errors: {}').format(
- [e if isinstance(e, Exception) else None
- for e in expected])
- raise AssertionError(
- f'{problem}\n'
- f'Expected: {_CallList(calls)}\n'
- f'Actual: {self.await_args_list}'
- ) from cause
- return
-
- all_awaits = list(all_awaits)
-
- not_found = []
- for kall in expected:
- try:
- all_awaits.remove(kall)
- except ValueError:
- not_found.append(kall)
- if not_found:
- raise AssertionError(
- '%r not all found in await list' % (tuple(not_found),)
- ) from cause
-
- def assert_not_awaited(self):
- """
- Assert that the mock was never awaited.
- """
- if self.await_count != 0:
- msg = (f"Expected {self._mock_name or 'mock'} to not have been awaited."
- f" Awaited {self.await_count} times.")
- raise AssertionError(msg)
-
- def reset_mock(self, /, *args, **kwargs):
- """
- See :func:`.Mock.reset_mock()`
- """
- super().reset_mock(*args, **kwargs)
- self.await_count = 0
- self.await_args = None
- self.await_args_list = _CallList()
-
-
-class AsyncMock(AsyncMockMixin, AsyncMagicMixin, Mock):
- """
- Enhance :class:`Mock` with features allowing to mock
- an async function.
-
- The :class:`AsyncMock` object will behave so the object is
- recognized as an async function, and the result of a call is an awaitable:
-
- >>> mock = AsyncMock()
- >>> iscoroutinefunction(mock)
- True
- >>> inspect.isawaitable(mock())
- True
-
-
- The result of ``mock()`` is an async function which will have the outcome
- of ``side_effect`` or ``return_value``:
-
- - if ``side_effect`` is a function, the async function will return the
- result of that function,
- - if ``side_effect`` is an exception, the async function will raise the
- exception,
- - if ``side_effect`` is an iterable, the async function will return the
- next value of the iterable, however, if the sequence of result is
- exhausted, ``StopIteration`` is raised immediately,
- - if ``side_effect`` is not defined, the async function will return the
- value defined by ``return_value``, hence, by default, the async function
- returns a new :class:`AsyncMock` object.
-
- If the outcome of ``side_effect`` or ``return_value`` is an async function,
- the mock async function obtained when the mock object is called will be this
- async function itself (and not an async function returning an async
- function).
-
- The test author can also specify a wrapped object with ``wraps``. In this
- case, the :class:`Mock` object behavior is the same as with an
- :class:`.Mock` object: the wrapped object may have methods
- defined as async function functions.
-
- Based on Martin Richard's asynctest project.
- """
-
-
+class AsyncMockMixin(Base):
+ await_count = _delegating_property('await_count')
+ await_args = _delegating_property('await_args')
+ await_args_list = _delegating_property('await_args_list')
+
+ def __init__(self, /, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ # iscoroutinefunction() checks _is_coroutine property to say if an
+ # object is a coroutine. Without this check it looks to see if it is a
+ # function/method, which in this case it is not (since it is an
+ # AsyncMock).
+ # It is set through __dict__ because when spec_set is True, this
+ # attribute is likely undefined.
+ self.__dict__['_is_coroutine'] = asyncio.coroutines._is_coroutine
+ self.__dict__['_mock_await_count'] = 0
+ self.__dict__['_mock_await_args'] = None
+ self.__dict__['_mock_await_args_list'] = _CallList()
+ code_mock = NonCallableMock(spec_set=CodeType)
+ code_mock.co_flags = inspect.CO_COROUTINE
+ self.__dict__['__code__'] = code_mock
+
+ async def _execute_mock_call(self, /, *args, **kwargs):
+ # This is nearly just like super(), except for special handling
+ # of coroutines
+
+ _call = _Call((args, kwargs), two=True)
+ self.await_count += 1
+ self.await_args = _call
+ self.await_args_list.append(_call)
+
+ effect = self.side_effect
+ if effect is not None:
+ if _is_exception(effect):
+ raise effect
+ elif not _callable(effect):
+ try:
+ result = next(effect)
+ except StopIteration:
+ # It is impossible to propogate a StopIteration
+ # through coroutines because of PEP 479
+ raise StopAsyncIteration
+ if _is_exception(result):
+ raise result
+ elif iscoroutinefunction(effect):
+ result = await effect(*args, **kwargs)
+ else:
+ result = effect(*args, **kwargs)
+
+ if result is not DEFAULT:
+ return result
+
+ if self._mock_return_value is not DEFAULT:
+ return self.return_value
+
+ if self._mock_wraps is not None:
+ if iscoroutinefunction(self._mock_wraps):
+ return await self._mock_wraps(*args, **kwargs)
+ return self._mock_wraps(*args, **kwargs)
+
+ return self.return_value
+
+ def assert_awaited(self):
+ """
+ Assert that the mock was awaited at least once.
+ """
+ if self.await_count == 0:
+ msg = f"Expected {self._mock_name or 'mock'} to have been awaited."
+ raise AssertionError(msg)
+
+ def assert_awaited_once(self):
+ """
+ Assert that the mock was awaited exactly once.
+ """
+ if not self.await_count == 1:
+ msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once."
+ f" Awaited {self.await_count} times.")
+ raise AssertionError(msg)
+
+ def assert_awaited_with(self, /, *args, **kwargs):
+ """
+ Assert that the last await was with the specified arguments.
+ """
+ if self.await_args is None:
+ expected = self._format_mock_call_signature(args, kwargs)
+ raise AssertionError(f'Expected await: {expected}\nNot awaited')
+
+ def _error_message():
+ msg = self._format_mock_failure_message(args, kwargs, action='await')
+ return msg
+
+ expected = self._call_matcher(_Call((args, kwargs), two=True))
+ actual = self._call_matcher(self.await_args)
+ if actual != expected:
+ cause = expected if isinstance(expected, Exception) else None
+ raise AssertionError(_error_message()) from cause
+
+ def assert_awaited_once_with(self, /, *args, **kwargs):
+ """
+ Assert that the mock was awaited exactly once and with the specified
+ arguments.
+ """
+ if not self.await_count == 1:
+ msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once."
+ f" Awaited {self.await_count} times.")
+ raise AssertionError(msg)
+ return self.assert_awaited_with(*args, **kwargs)
+
+ def assert_any_await(self, /, *args, **kwargs):
+ """
+ Assert the mock has ever been awaited with the specified arguments.
+ """
+ expected = self._call_matcher(_Call((args, kwargs), two=True))
+ cause = expected if isinstance(expected, Exception) else None
+ actual = [self._call_matcher(c) for c in self.await_args_list]
+ if cause or expected not in _AnyComparer(actual):
+ expected_string = self._format_mock_call_signature(args, kwargs)
+ raise AssertionError(
+ '%s await not found' % expected_string
+ ) from cause
+
+ def assert_has_awaits(self, calls, any_order=False):
+ """
+ Assert the mock has been awaited with the specified calls.
+ The :attr:`await_args_list` list is checked for the awaits.
+
+ If `any_order` is False (the default) then the awaits must be
+ sequential. There can be extra calls before or after the
+ specified awaits.
+
+ If `any_order` is True then the awaits can be in any order, but
+ they must all appear in :attr:`await_args_list`.
+ """
+ expected = [self._call_matcher(c) for c in calls]
+ cause = next((e for e in expected if isinstance(e, Exception)), None)
+ all_awaits = _CallList(self._call_matcher(c) for c in self.await_args_list)
+ if not any_order:
+ if expected not in all_awaits:
+ if cause is None:
+ problem = 'Awaits not found.'
+ else:
+ problem = ('Error processing expected awaits.\n'
+ 'Errors: {}').format(
+ [e if isinstance(e, Exception) else None
+ for e in expected])
+ raise AssertionError(
+ f'{problem}\n'
+ f'Expected: {_CallList(calls)}\n'
+ f'Actual: {self.await_args_list}'
+ ) from cause
+ return
+
+ all_awaits = list(all_awaits)
+
+ not_found = []
+ for kall in expected:
+ try:
+ all_awaits.remove(kall)
+ except ValueError:
+ not_found.append(kall)
+ if not_found:
+ raise AssertionError(
+ '%r not all found in await list' % (tuple(not_found),)
+ ) from cause
+
+ def assert_not_awaited(self):
+ """
+ Assert that the mock was never awaited.
+ """
+ if self.await_count != 0:
+ msg = (f"Expected {self._mock_name or 'mock'} to not have been awaited."
+ f" Awaited {self.await_count} times.")
+ raise AssertionError(msg)
+
+ def reset_mock(self, /, *args, **kwargs):
+ """
+ See :func:`.Mock.reset_mock()`
+ """
+ super().reset_mock(*args, **kwargs)
+ self.await_count = 0
+ self.await_args = None
+ self.await_args_list = _CallList()
+
+
+class AsyncMock(AsyncMockMixin, AsyncMagicMixin, Mock):
+ """
+ Enhance :class:`Mock` with features allowing to mock
+ an async function.
+
+ The :class:`AsyncMock` object will behave so the object is
+ recognized as an async function, and the result of a call is an awaitable:
+
+ >>> mock = AsyncMock()
+ >>> iscoroutinefunction(mock)
+ True
+ >>> inspect.isawaitable(mock())
+ True
+
+
+ The result of ``mock()`` is an async function which will have the outcome
+ of ``side_effect`` or ``return_value``:
+
+ - if ``side_effect`` is a function, the async function will return the
+ result of that function,
+ - if ``side_effect`` is an exception, the async function will raise the
+ exception,
+ - if ``side_effect`` is an iterable, the async function will return the
+ next value of the iterable, however, if the sequence of result is
+ exhausted, ``StopIteration`` is raised immediately,
+ - if ``side_effect`` is not defined, the async function will return the
+ value defined by ``return_value``, hence, by default, the async function
+ returns a new :class:`AsyncMock` object.
+
+ If the outcome of ``side_effect`` or ``return_value`` is an async function,
+ the mock async function obtained when the mock object is called will be this
+ async function itself (and not an async function returning an async
+ function).
+
+ The test author can also specify a wrapped object with ``wraps``. In this
+ case, the :class:`Mock` object behavior is the same as with an
+ :class:`.Mock` object: the wrapped object may have methods
+ defined as async function functions.
+
+ Based on Martin Richard's asynctest project.
+ """
+
+
class _ANY(object):
"A helper object that compares equal to everything."
@@ -2364,7 +2364,7 @@ def _format_call_signature(name, args, kwargs):
formatted_args = ''
args_string = ', '.join([repr(arg) for arg in args])
kwargs_string = ', '.join([
- '%s=%r' % (key, value) for key, value in kwargs.items()
+ '%s=%r' % (key, value) for key, value in kwargs.items()
])
if args_string:
formatted_args = args_string
@@ -2439,7 +2439,7 @@ class _Call(tuple):
try:
len_other = len(other)
except TypeError:
- return NotImplemented
+ return NotImplemented
self_name = ''
if len(self) == 2:
@@ -2491,7 +2491,7 @@ class _Call(tuple):
__ne__ = object.__ne__
- def __call__(self, /, *args, **kwargs):
+ def __call__(self, /, *args, **kwargs):
if self._mock_name is None:
return _Call(('', args, kwargs), name='()')
@@ -2506,28 +2506,28 @@ class _Call(tuple):
return _Call(name=name, parent=self, from_kall=False)
- def __getattribute__(self, attr):
- if attr in tuple.__dict__:
- raise AttributeError
- return tuple.__getattribute__(self, attr)
-
-
- def _get_call_arguments(self):
- if len(self) == 2:
- args, kwargs = self
- else:
- name, args, kwargs = self
-
- return args, kwargs
-
- @property
- def args(self):
- return self._get_call_arguments()[0]
-
- @property
- def kwargs(self):
- return self._get_call_arguments()[1]
-
+ def __getattribute__(self, attr):
+ if attr in tuple.__dict__:
+ raise AttributeError
+ return tuple.__getattribute__(self, attr)
+
+
+ def _get_call_arguments(self):
+ if len(self) == 2:
+ args, kwargs = self
+ else:
+ name, args, kwargs = self
+
+ return args, kwargs
+
+ @property
+ def args(self):
+ return self._get_call_arguments()[0]
+
+ @property
+ def kwargs(self):
+ return self._get_call_arguments()[1]
+
def __repr__(self):
if not self._mock_from_kall:
name = self._mock_name or 'call'
@@ -2590,7 +2590,7 @@ def create_autospec(spec, spec_set=False, instance=False, _parent=None,
spec = type(spec)
is_type = isinstance(spec, type)
- is_async_func = _is_async_func(spec)
+ is_async_func = _is_async_func(spec)
_kwargs = {'spec': spec}
if spec_set:
_kwargs = {'spec_set': spec}
@@ -2607,11 +2607,11 @@ def create_autospec(spec, spec_set=False, instance=False, _parent=None,
# descriptors don't have a spec
# because we don't know what type they return
_kwargs = {}
- elif is_async_func:
- if instance:
- raise RuntimeError("Instance can not be True when create_autospec "
- "is mocking an async function")
- Klass = AsyncMock
+ elif is_async_func:
+ if instance:
+ raise RuntimeError("Instance can not be True when create_autospec "
+ "is mocking an async function")
+ Klass = AsyncMock
elif not _callable(spec):
Klass = NonCallableMagicMock
elif is_type and instance and not _instance_callable(spec):
@@ -2631,8 +2631,8 @@ def create_autospec(spec, spec_set=False, instance=False, _parent=None,
# should only happen at the top level because we don't
# recurse for functions
mock = _set_signature(mock, spec)
- if is_async_func:
- _setup_async_mock(mock)
+ if is_async_func:
+ _setup_async_mock(mock)
else:
_check_signature(spec, mock, is_type, instance)
@@ -2676,13 +2676,13 @@ def create_autospec(spec, spec_set=False, instance=False, _parent=None,
skipfirst = _must_skip(spec, entry, is_type)
kwargs['_eat_self'] = skipfirst
- if iscoroutinefunction(original):
- child_klass = AsyncMock
- else:
- child_klass = MagicMock
- new = child_klass(parent=parent, name=entry, _new_name=entry,
- _new_parent=parent,
- **kwargs)
+ if iscoroutinefunction(original):
+ child_klass = AsyncMock
+ else:
+ child_klass = MagicMock
+ new = child_klass(parent=parent, name=entry, _new_name=entry,
+ _new_parent=parent,
+ **kwargs)
mock._mock_children[entry] = new
_check_signature(original, new, skipfirst=skipfirst)
@@ -2713,14 +2713,14 @@ def _must_skip(spec, entry, is_type):
continue
if isinstance(result, (staticmethod, classmethod)):
return False
- elif isinstance(result, FunctionTypes):
+ elif isinstance(result, FunctionTypes):
# Normal method => skip if looked up on type
# (if looked up on instance, self is already skipped)
return is_type
else:
return False
- # function is a dynamically provided attribute
+ # function is a dynamically provided attribute
return is_type
@@ -2747,11 +2747,11 @@ FunctionTypes = (
file_spec = None
-def _to_stream(read_data):
- if isinstance(read_data, bytes):
- return io.BytesIO(read_data)
+def _to_stream(read_data):
+ if isinstance(read_data, bytes):
+ return io.BytesIO(read_data)
else:
- return io.StringIO(read_data)
+ return io.StringIO(read_data)
def mock_open(mock=None, read_data=''):
@@ -2766,23 +2766,23 @@ def mock_open(mock=None, read_data=''):
`read_data` is a string for the `read`, `readline` and `readlines` of the
file handle to return. This is an empty string by default.
"""
- _read_data = _to_stream(read_data)
- _state = [_read_data, None]
-
+ _read_data = _to_stream(read_data)
+ _state = [_read_data, None]
+
def _readlines_side_effect(*args, **kwargs):
if handle.readlines.return_value is not None:
return handle.readlines.return_value
- return _state[0].readlines(*args, **kwargs)
+ return _state[0].readlines(*args, **kwargs)
def _read_side_effect(*args, **kwargs):
if handle.read.return_value is not None:
return handle.read.return_value
- return _state[0].read(*args, **kwargs)
+ return _state[0].read(*args, **kwargs)
- def _readline_side_effect(*args, **kwargs):
+ def _readline_side_effect(*args, **kwargs):
yield from _iter_side_effect()
while True:
- yield _state[0].readline(*args, **kwargs)
+ yield _state[0].readline(*args, **kwargs)
def _iter_side_effect():
if handle.readline.return_value is not None:
@@ -2791,11 +2791,11 @@ def mock_open(mock=None, read_data=''):
for line in _state[0]:
yield line
- def _next_side_effect():
- if handle.readline.return_value is not None:
- return handle.readline.return_value
- return next(_state[0])
-
+ def _next_side_effect():
+ if handle.readline.return_value is not None:
+ return handle.readline.return_value
+ return next(_state[0])
+
global file_spec
if file_spec is None:
import _io
@@ -2817,10 +2817,10 @@ def mock_open(mock=None, read_data=''):
handle.readline.side_effect = _state[1]
handle.readlines.side_effect = _readlines_side_effect
handle.__iter__.side_effect = _iter_side_effect
- handle.__next__.side_effect = _next_side_effect
+ handle.__next__.side_effect = _next_side_effect
def reset_data(*args, **kwargs):
- _state[0] = _to_stream(read_data)
+ _state[0] = _to_stream(read_data)
if handle.readline.side_effect == _state[1]:
# Only reset the side effect if the user hasn't overridden it.
_state[1] = _readline_side_effect()
@@ -2841,10 +2841,10 @@ class PropertyMock(Mock):
Fetching a `PropertyMock` instance from an object calls the mock, with
no args. Setting it calls the mock with the value being set.
"""
- def _get_child_mock(self, /, **kwargs):
+ def _get_child_mock(self, /, **kwargs):
return MagicMock(**kwargs)
- def __get__(self, obj, obj_type=None):
+ def __get__(self, obj, obj_type=None):
return self()
def __set__(self, obj, val):
self(val)
@@ -2868,25 +2868,25 @@ def seal(mock):
continue
if not isinstance(m, NonCallableMock):
continue
- if isinstance(m._mock_children.get(attr), _SpecState):
- continue
+ if isinstance(m._mock_children.get(attr), _SpecState):
+ continue
if m._mock_new_parent is mock:
seal(m)
-
-
-class _AsyncIterator:
- """
- Wraps an iterator in an asynchronous iterator.
- """
- def __init__(self, iterator):
- self.iterator = iterator
- code_mock = NonCallableMock(spec_set=CodeType)
- code_mock.co_flags = inspect.CO_ITERABLE_COROUTINE
- self.__dict__['__code__'] = code_mock
-
- async def __anext__(self):
- try:
- return next(self.iterator)
- except StopIteration:
- pass
- raise StopAsyncIteration
+
+
+class _AsyncIterator:
+ """
+ Wraps an iterator in an asynchronous iterator.
+ """
+ def __init__(self, iterator):
+ self.iterator = iterator
+ code_mock = NonCallableMock(spec_set=CodeType)
+ code_mock.co_flags = inspect.CO_ITERABLE_COROUTINE
+ self.__dict__['__code__'] = code_mock
+
+ async def __anext__(self):
+ try:
+ return next(self.iterator)
+ except StopIteration:
+ pass
+ raise StopAsyncIteration