aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/trial/_synctest.py
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2023-11-26 18:16:14 +0300
committershmel1k <shmel1k@ydb.tech>2023-11-26 18:43:30 +0300
commitb8cf9e88f4c5c64d9406af533d8948deb050d695 (patch)
tree218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py3/twisted/trial/_synctest.py
parent523f645a83a0ec97a0332dbc3863bb354c92a328 (diff)
downloadydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/trial/_synctest.py')
-rw-r--r--contrib/python/Twisted/py3/twisted/trial/_synctest.py1464
1 files changed, 1464 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py3/twisted/trial/_synctest.py b/contrib/python/Twisted/py3/twisted/trial/_synctest.py
new file mode 100644
index 0000000000..2cffc2c79e
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/trial/_synctest.py
@@ -0,0 +1,1464 @@
+# -*- test-case-name: twisted.trial.test -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Things likely to be used by writers of unit tests.
+
+Maintainer: Jonathan Lange
+"""
+
+
+import inspect
+import os
+import sys
+import tempfile
+import types
+import unittest as pyunit
+import warnings
+from dis import findlinestarts as _findlinestarts
+from typing import (
+ Any,
+ Callable,
+ Coroutine,
+ Generator,
+ Iterable,
+ List,
+ NoReturn,
+ Optional,
+ Tuple,
+ Type,
+ TypeVar,
+ Union,
+)
+
+# Python 2.7 and higher has skip support built-in
+from unittest import SkipTest
+
+from attrs import frozen
+from typing_extensions import ParamSpec
+
+from twisted.internet.defer import Deferred, ensureDeferred
+from twisted.python import failure, log, monkey
+from twisted.python.deprecate import (
+ DEPRECATION_WARNING_FORMAT,
+ getDeprecationWarningString,
+ getVersionString,
+ warnAboutFunction,
+)
+from twisted.python.reflect import fullyQualifiedName
+from twisted.python.util import runWithWarningsSuppressed
+from twisted.trial import itrial, util
+
+_P = ParamSpec("_P")
+T = TypeVar("T")
+
+
+class FailTest(AssertionError):
+ """
+ Raised to indicate the current test has failed to pass.
+ """
+
+
+@frozen
+class Todo:
+ """
+ Internal object used to mark a L{TestCase} as 'todo'. Tests marked 'todo'
+ are reported differently in Trial L{TestResult}s. If todo'd tests fail,
+ they do not fail the suite and the errors are reported in a separate
+ category. If todo'd tests succeed, Trial L{TestResult}s will report an
+ unexpected success.
+
+ @ivar reason: A string explaining why the test is marked 'todo'
+
+ @ivar errors: An iterable of exception types that the test is expected to
+ raise. If one of these errors is raised by the test, it will be
+ trapped. Raising any other kind of error will fail the test. If
+ L{None} then all errors will be trapped.
+ """
+
+ reason: str
+ errors: Optional[Iterable[Type[BaseException]]] = None
+
+ def __repr__(self) -> str:
+ return f"<Todo reason={self.reason!r} errors={self.errors!r}>"
+
+ def expected(self, failure):
+ """
+ @param failure: A L{twisted.python.failure.Failure}.
+
+ @return: C{True} if C{failure} is expected, C{False} otherwise.
+ """
+ if self.errors is None:
+ return True
+ for error in self.errors:
+ if failure.check(error):
+ return True
+ return False
+
+
+def makeTodo(
+ value: Union[
+ str, Tuple[Union[Type[BaseException], Iterable[Type[BaseException]]], str]
+ ]
+) -> Todo:
+ """
+ Return a L{Todo} object built from C{value}.
+
+ If C{value} is a string, return a Todo that expects any exception with
+ C{value} as a reason. If C{value} is a tuple, the second element is used
+ as the reason and the first element as the excepted error(s).
+
+ @param value: A string or a tuple of C{(errors, reason)}, where C{errors}
+ is either a single exception class or an iterable of exception classes.
+
+ @return: A L{Todo} object.
+ """
+ if isinstance(value, str):
+ return Todo(reason=value)
+ if isinstance(value, tuple):
+ errors, reason = value
+ if isinstance(errors, type):
+ iterableErrors: Iterable[Type[BaseException]] = [errors]
+ else:
+ iterableErrors = errors
+ return Todo(reason=reason, errors=iterableErrors)
+
+
+class _Warning:
+ """
+ A L{_Warning} instance represents one warning emitted through the Python
+ warning system (L{warnings}). This is used to insulate callers of
+ L{_collectWarnings} from changes to the Python warnings system which might
+ otherwise require changes to the warning objects that function passes to
+ the observer object it accepts.
+
+ @ivar message: The string which was passed as the message parameter to
+ L{warnings.warn}.
+
+ @ivar category: The L{Warning} subclass which was passed as the category
+ parameter to L{warnings.warn}.
+
+ @ivar filename: The name of the file containing the definition of the code
+ object which was C{stacklevel} frames above the call to
+ L{warnings.warn}, where C{stacklevel} is the value of the C{stacklevel}
+ parameter passed to L{warnings.warn}.
+
+ @ivar lineno: The source line associated with the active instruction of the
+ code object object which was C{stacklevel} frames above the call to
+ L{warnings.warn}, where C{stacklevel} is the value of the C{stacklevel}
+ parameter passed to L{warnings.warn}.
+ """
+
+ def __init__(self, message, category, filename, lineno):
+ self.message = message
+ self.category = category
+ self.filename = filename
+ self.lineno = lineno
+
+
+def _setWarningRegistryToNone(modules):
+ """
+ Disable the per-module cache for every module found in C{modules}, typically
+ C{sys.modules}.
+
+ @param modules: Dictionary of modules, typically sys.module dict
+ """
+ for v in list(modules.values()):
+ if v is not None:
+ try:
+ v.__warningregistry__ = None
+ except BaseException:
+ # Don't specify a particular exception type to handle in case
+ # some wacky object raises some wacky exception in response to
+ # the setattr attempt.
+ pass
+
+
+def _collectWarnings(observeWarning, f, *args, **kwargs):
+ """
+ Call C{f} with C{args} positional arguments and C{kwargs} keyword arguments
+ and collect all warnings which are emitted as a result in a list.
+
+ @param observeWarning: A callable which will be invoked with a L{_Warning}
+ instance each time a warning is emitted.
+
+ @return: The return value of C{f(*args, **kwargs)}.
+ """
+
+ def showWarning(message, category, filename, lineno, file=None, line=None):
+ assert isinstance(message, Warning)
+ observeWarning(_Warning(str(message), category, filename, lineno))
+
+ # Disable the per-module cache for every module otherwise if the warning
+ # which the caller is expecting us to collect was already emitted it won't
+ # be re-emitted by the call to f which happens below.
+ _setWarningRegistryToNone(sys.modules)
+
+ origFilters = warnings.filters[:]
+ origShow = warnings.showwarning
+ warnings.simplefilter("always")
+ try:
+ warnings.showwarning = showWarning
+ result = f(*args, **kwargs)
+ finally:
+ warnings.filters[:] = origFilters
+ warnings.showwarning = origShow
+ return result
+
+
+class UnsupportedTrialFeature(Exception):
+ """A feature of twisted.trial was used that pyunit cannot support."""
+
+
+class PyUnitResultAdapter:
+ """
+ Wrap a C{TestResult} from the standard library's C{unittest} so that it
+ supports the extended result types from Trial, and also supports
+ L{twisted.python.failure.Failure}s being passed to L{addError} and
+ L{addFailure}.
+ """
+
+ def __init__(self, original):
+ """
+ @param original: A C{TestResult} instance from C{unittest}.
+ """
+ self.original = original
+
+ def _exc_info(self, err):
+ return util.excInfoOrFailureToExcInfo(err)
+
+ def startTest(self, method):
+ self.original.startTest(method)
+
+ def stopTest(self, method):
+ self.original.stopTest(method)
+
+ def addFailure(self, test, fail):
+ self.original.addFailure(test, self._exc_info(fail))
+
+ def addError(self, test, error):
+ self.original.addError(test, self._exc_info(error))
+
+ def _unsupported(self, test, feature, info):
+ self.original.addFailure(
+ test,
+ (UnsupportedTrialFeature, UnsupportedTrialFeature(feature, info), None),
+ )
+
+ def addSkip(self, test, reason):
+ """
+ Report the skip as a failure.
+ """
+ self.original.addSkip(test, reason)
+
+ def addUnexpectedSuccess(self, test, todo=None):
+ """
+ Report the unexpected success as a failure.
+ """
+ self._unsupported(test, "unexpected success", todo)
+
+ def addExpectedFailure(self, test, error):
+ """
+ Report the expected failure (i.e. todo) as a failure.
+ """
+ self._unsupported(test, "expected failure", error)
+
+ def addSuccess(self, test):
+ self.original.addSuccess(test)
+
+ def upDownError(self, method, error, warn, printStatus):
+ pass
+
+
+class _AssertRaisesContext:
+ """
+ A helper for implementing C{assertRaises}. This is a context manager and a
+ helper method to support the non-context manager version of
+ C{assertRaises}.
+
+ @ivar _testCase: See C{testCase} parameter of C{__init__}
+
+ @ivar _expected: See C{expected} parameter of C{__init__}
+
+ @ivar _returnValue: The value returned by the callable being tested (only
+ when not being used as a context manager).
+
+ @ivar _expectedName: A short string describing the expected exception
+ (usually the name of the exception class).
+
+ @ivar exception: The exception which was raised by the function being
+ tested (if it raised one).
+ """
+
+ def __init__(self, testCase, expected):
+ """
+ @param testCase: The L{TestCase} instance which is used to raise a
+ test-failing exception when that is necessary.
+
+ @param expected: The exception type expected to be raised.
+ """
+ self._testCase = testCase
+ self._expected = expected
+ self._returnValue = None
+ try:
+ self._expectedName = self._expected.__name__
+ except AttributeError:
+ self._expectedName = str(self._expected)
+
+ def _handle(self, obj):
+ """
+ Call the given object using this object as a context manager.
+
+ @param obj: The object to call and which is expected to raise some
+ exception.
+ @type obj: L{object}
+
+ @return: Whatever exception is raised by C{obj()}.
+ @rtype: L{BaseException}
+ """
+ with self as context:
+ self._returnValue = obj()
+ return context.exception
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exceptionType, exceptionValue, traceback):
+ """
+ Check exit exception against expected exception.
+ """
+ # No exception raised.
+ if exceptionType is None:
+ self._testCase.fail(
+ "{} not raised ({} returned)".format(
+ self._expectedName, self._returnValue
+ )
+ )
+
+ if not isinstance(exceptionValue, exceptionType):
+ # Support some Python 2.6 ridiculousness. Exceptions raised using
+ # the C API appear here as the arguments you might pass to the
+ # exception class to create an exception instance. So... do that
+ # to turn them into the instances.
+ if isinstance(exceptionValue, tuple):
+ exceptionValue = exceptionType(*exceptionValue)
+ else:
+ exceptionValue = exceptionType(exceptionValue)
+
+ # Store exception so that it can be access from context.
+ self.exception = exceptionValue
+
+ # Wrong exception raised.
+ if not issubclass(exceptionType, self._expected):
+ reason = failure.Failure(exceptionValue, exceptionType, traceback)
+ self._testCase.fail(
+ "{} raised instead of {}:\n {}".format(
+ fullyQualifiedName(exceptionType),
+ self._expectedName,
+ reason.getTraceback(),
+ ),
+ )
+
+ # All good.
+ return True
+
+
+class _Assertions(pyunit.TestCase):
+ """
+ Replaces many of the built-in TestCase assertions. In general, these
+ assertions provide better error messages and are easier to use in
+ callbacks.
+ """
+
+ def fail(self, msg: Optional[object] = None) -> NoReturn:
+ """
+ Absolutely fail the test. Do not pass go, do not collect $200.
+
+ @param msg: the message that will be displayed as the reason for the
+ failure
+ """
+ raise self.failureException(msg)
+
+ def assertFalse(self, condition, msg=None):
+ """
+ Fail the test if C{condition} evaluates to True.
+
+ @param condition: any object that defines __nonzero__
+ """
+ super().assertFalse(condition, msg)
+ return condition
+
+ assertNot = assertFalse
+ failUnlessFalse = assertFalse
+ failIf = assertFalse
+
+ def assertTrue(self, condition, msg=None):
+ """
+ Fail the test if C{condition} evaluates to False.
+
+ @param condition: any object that defines __nonzero__
+ """
+ super().assertTrue(condition, msg)
+ return condition
+
+ assert_ = assertTrue
+ failUnlessTrue = assertTrue
+ failUnless = assertTrue
+
+ def assertRaises(self, exception, f=None, *args, **kwargs):
+ """
+ Fail the test unless calling the function C{f} with the given
+ C{args} and C{kwargs} raises C{exception}. The failure will report
+ the traceback and call stack of the unexpected exception.
+
+ @param exception: exception type that is to be expected
+ @param f: the function to call
+
+ @return: If C{f} is L{None}, a context manager which will make an
+ assertion about the exception raised from the suite it manages. If
+ C{f} is not L{None}, the exception raised by C{f}.
+
+ @raise self.failureException: Raised if the function call does
+ not raise an exception or if it raises an exception of a
+ different type.
+ """
+ context = _AssertRaisesContext(self, exception)
+ if f is None:
+ return context
+
+ return context._handle(lambda: f(*args, **kwargs))
+
+ # unittest.TestCase.assertRaises() is defined with 4 arguments
+ # but we define it with 5 arguments. So we need to tell mypy
+ # to ignore the following assignment to failUnlessRaises
+ failUnlessRaises = assertRaises # type: ignore[assignment]
+
+ def assertEqual(self, first, second, msg=None):
+ """
+ Fail the test if C{first} and C{second} are not equal.
+
+ @param msg: A string describing the failure that's included in the
+ exception.
+ """
+ super().assertEqual(first, second, msg)
+ return first
+
+ failUnlessEqual = assertEqual
+ failUnlessEquals = assertEqual
+ assertEquals = assertEqual
+
+ def assertIs(self, first, second, msg=None):
+ """
+ Fail the test if C{first} is not C{second}. This is an
+ obect-identity-equality test, not an object equality
+ (i.e. C{__eq__}) test.
+
+ @param msg: if msg is None, then the failure message will be
+ '%r is not %r' % (first, second)
+ """
+ if first is not second:
+ raise self.failureException(msg or f"{first!r} is not {second!r}")
+ return first
+
+ failUnlessIdentical = assertIs
+ assertIdentical = assertIs
+
+ def assertIsNot(self, first, second, msg=None):
+ """
+ Fail the test if C{first} is C{second}. This is an
+ obect-identity-equality test, not an object equality
+ (i.e. C{__eq__}) test.
+
+ @param msg: if msg is None, then the failure message will be
+ '%r is %r' % (first, second)
+ """
+ if first is second:
+ raise self.failureException(msg or f"{first!r} is {second!r}")
+ return first
+
+ failIfIdentical = assertIsNot
+ assertNotIdentical = assertIsNot
+
+ def assertNotEqual(self, first, second, msg=None):
+ """
+ Fail the test if C{first} == C{second}.
+
+ @param msg: if msg is None, then the failure message will be
+ '%r == %r' % (first, second)
+ """
+ if not first != second:
+ raise self.failureException(msg or f"{first!r} == {second!r}")
+ return first
+
+ assertNotEquals = assertNotEqual
+ failIfEquals = assertNotEqual
+ failIfEqual = assertNotEqual
+
+ def assertIn(self, containee, container, msg=None):
+ """
+ Fail the test if C{containee} is not found in C{container}.
+
+ @param containee: the value that should be in C{container}
+ @param container: a sequence type, or in the case of a mapping type,
+ will follow semantics of 'if key in dict.keys()'
+ @param msg: if msg is None, then the failure message will be
+ '%r not in %r' % (first, second)
+ """
+ if containee not in container:
+ raise self.failureException(msg or f"{containee!r} not in {container!r}")
+ return containee
+
+ failUnlessIn = assertIn
+
+ def assertNotIn(self, containee, container, msg=None):
+ """
+ Fail the test if C{containee} is found in C{container}.
+
+ @param containee: the value that should not be in C{container}
+ @param container: a sequence type, or in the case of a mapping type,
+ will follow semantics of 'if key in dict.keys()'
+ @param msg: if msg is None, then the failure message will be
+ '%r in %r' % (first, second)
+ """
+ if containee in container:
+ raise self.failureException(msg or f"{containee!r} in {container!r}")
+ return containee
+
+ failIfIn = assertNotIn
+
+ def assertNotAlmostEqual(self, first, second, places=7, msg=None, delta=None):
+ """
+ Fail if the two objects are equal as determined by their
+ difference rounded to the given number of decimal places
+ (default 7) and comparing to zero.
+
+ @note: decimal places (from zero) is usually not the same
+ as significant digits (measured from the most
+ significant digit).
+
+ @note: included for compatibility with PyUnit test cases
+ """
+ if round(second - first, places) == 0:
+ raise self.failureException(
+ msg or f"{first!r} == {second!r} within {places!r} places"
+ )
+ return first
+
+ assertNotAlmostEquals = assertNotAlmostEqual # type:ignore[assignment]
+ failIfAlmostEqual = assertNotAlmostEqual # type:ignore[assignment]
+ failIfAlmostEquals = assertNotAlmostEqual
+
+ def assertAlmostEqual(self, first, second, places=7, msg=None, delta=None):
+ """
+ Fail if the two objects are unequal as determined by their
+ difference rounded to the given number of decimal places
+ (default 7) and comparing to zero.
+
+ @note: decimal places (from zero) is usually not the same
+ as significant digits (measured from the most
+ significant digit).
+
+ @note: included for compatibility with PyUnit test cases
+ """
+ if round(second - first, places) != 0:
+ raise self.failureException(
+ msg or f"{first!r} != {second!r} within {places!r} places"
+ )
+ return first
+
+ assertAlmostEquals = assertAlmostEqual # type:ignore[assignment]
+ failUnlessAlmostEqual = assertAlmostEqual # type:ignore[assignment]
+
+ def assertApproximates(self, first, second, tolerance, msg=None):
+ """
+ Fail if C{first} - C{second} > C{tolerance}
+
+ @param msg: if msg is None, then the failure message will be
+ '%r ~== %r' % (first, second)
+ """
+ if abs(first - second) > tolerance:
+ raise self.failureException(msg or f"{first} ~== {second}")
+ return first
+
+ failUnlessApproximates = assertApproximates
+
+ def assertSubstring(self, substring, astring, msg=None):
+ """
+ Fail if C{substring} does not exist within C{astring}.
+ """
+ return self.failUnlessIn(substring, astring, msg)
+
+ failUnlessSubstring = assertSubstring
+
+ def assertNotSubstring(self, substring, astring, msg=None):
+ """
+ Fail if C{astring} contains C{substring}.
+ """
+ return self.failIfIn(substring, astring, msg)
+
+ failIfSubstring = assertNotSubstring
+
+ def assertWarns(self, category, message, filename, f, *args, **kwargs):
+ """
+ Fail if the given function doesn't generate the specified warning when
+ called. It calls the function, checks the warning, and forwards the
+ result of the function if everything is fine.
+
+ @param category: the category of the warning to check.
+ @param message: the output message of the warning to check.
+ @param filename: the filename where the warning should come from.
+ @param f: the function which is supposed to generate the warning.
+ @type f: any callable.
+ @param args: the arguments to C{f}.
+ @param kwargs: the keywords arguments to C{f}.
+
+ @return: the result of the original function C{f}.
+ """
+ warningsShown = []
+ result = _collectWarnings(warningsShown.append, f, *args, **kwargs)
+
+ if not warningsShown:
+ self.fail("No warnings emitted")
+ first = warningsShown[0]
+ for other in warningsShown[1:]:
+ if (other.message, other.category) != (first.message, first.category):
+ self.fail("Can't handle different warnings")
+ self.assertEqual(first.message, message)
+ self.assertIdentical(first.category, category)
+
+ # Use starts with because of .pyc/.pyo issues.
+ self.assertTrue(
+ filename.startswith(first.filename),
+ f"Warning in {first.filename!r}, expected {filename!r}",
+ )
+
+ # It would be nice to be able to check the line number as well, but
+ # different configurations actually end up reporting different line
+ # numbers (generally the variation is only 1 line, but that's enough
+ # to fail the test erroneously...).
+ # self.assertEqual(lineno, xxx)
+
+ return result
+
+ failUnlessWarns = assertWarns
+
+ def assertIsInstance(self, instance, classOrTuple, message=None):
+ """
+ Fail if C{instance} is not an instance of the given class or of
+ one of the given classes.
+
+ @param instance: the object to test the type (first argument of the
+ C{isinstance} call).
+ @type instance: any.
+ @param classOrTuple: the class or classes to test against (second
+ argument of the C{isinstance} call).
+ @type classOrTuple: class, type, or tuple.
+
+ @param message: Custom text to include in the exception text if the
+ assertion fails.
+ """
+ if not isinstance(instance, classOrTuple):
+ if message is None:
+ suffix = ""
+ else:
+ suffix = ": " + message
+ self.fail(f"{instance!r} is not an instance of {classOrTuple}{suffix}")
+
+ failUnlessIsInstance = assertIsInstance
+
+ def assertNotIsInstance(self, instance, classOrTuple):
+ """
+ Fail if C{instance} is an instance of the given class or of one of the
+ given classes.
+
+ @param instance: the object to test the type (first argument of the
+ C{isinstance} call).
+ @type instance: any.
+ @param classOrTuple: the class or classes to test against (second
+ argument of the C{isinstance} call).
+ @type classOrTuple: class, type, or tuple.
+ """
+ if isinstance(instance, classOrTuple):
+ self.fail(f"{instance!r} is an instance of {classOrTuple}")
+
+ failIfIsInstance = assertNotIsInstance
+
+ def successResultOf(
+ self,
+ deferred: Union[
+ Coroutine[Deferred[T], Any, T],
+ Generator[Deferred[T], Any, T],
+ Deferred[T],
+ ],
+ ) -> T:
+ """
+ Return the current success result of C{deferred} or raise
+ C{self.failureException}.
+
+ @param deferred: A L{Deferred<twisted.internet.defer.Deferred>} or
+ I{coroutine} which has a success result.
+
+ For a L{Deferred<twisted.internet.defer.Deferred>} this means
+ L{Deferred.callback<twisted.internet.defer.Deferred.callback>} or
+ L{Deferred.errback<twisted.internet.defer.Deferred.errback>} has
+ been called on it and it has reached the end of its callback chain
+ and the last callback or errback returned a
+ non-L{failure.Failure}.
+
+ For a I{coroutine} this means all awaited values have a success
+ result.
+
+ @raise SynchronousTestCase.failureException: If the
+ L{Deferred<twisted.internet.defer.Deferred>} has no result or has a
+ failure result.
+
+ @return: The result of C{deferred}.
+ """
+ deferred = ensureDeferred(deferred)
+ results: List[Union[T, failure.Failure]] = []
+ deferred.addBoth(results.append)
+
+ if not results:
+ self.fail(
+ "Success result expected on {!r}, found no result instead".format(
+ deferred
+ )
+ )
+
+ result = results[0]
+
+ if isinstance(result, failure.Failure):
+ self.fail(
+ "Success result expected on {!r}, "
+ "found failure result instead:\n{}".format(
+ deferred, result.getTraceback()
+ )
+ )
+ return result
+
+ def failureResultOf(self, deferred, *expectedExceptionTypes):
+ """
+ Return the current failure result of C{deferred} or raise
+ C{self.failureException}.
+
+ @param deferred: A L{Deferred<twisted.internet.defer.Deferred>} which
+ has a failure result. This means
+ L{Deferred.callback<twisted.internet.defer.Deferred.callback>} or
+ L{Deferred.errback<twisted.internet.defer.Deferred.errback>} has
+ been called on it and it has reached the end of its callback chain
+ and the last callback or errback raised an exception or returned a
+ L{failure.Failure}.
+ @type deferred: L{Deferred<twisted.internet.defer.Deferred>}
+
+ @param expectedExceptionTypes: Exception types to expect - if
+ provided, and the exception wrapped by the failure result is
+ not one of the types provided, then this test will fail.
+
+ @raise SynchronousTestCase.failureException: If the
+ L{Deferred<twisted.internet.defer.Deferred>} has no result, has a
+ success result, or has an unexpected failure result.
+
+ @return: The failure result of C{deferred}.
+ @rtype: L{failure.Failure}
+ """
+ deferred = ensureDeferred(deferred)
+ result = []
+ deferred.addBoth(result.append)
+
+ if not result:
+ self.fail(
+ "Failure result expected on {!r}, found no result instead".format(
+ deferred
+ )
+ )
+
+ result = result[0]
+
+ if not isinstance(result, failure.Failure):
+ self.fail(
+ "Failure result expected on {!r}, "
+ "found success result ({!r}) instead".format(deferred, result)
+ )
+
+ if expectedExceptionTypes and not result.check(*expectedExceptionTypes):
+ expectedString = " or ".join(
+ [".".join((t.__module__, t.__name__)) for t in expectedExceptionTypes]
+ )
+
+ self.fail(
+ "Failure of type ({}) expected on {!r}, "
+ "found type {!r} instead: {}".format(
+ expectedString, deferred, result.type, result.getTraceback()
+ )
+ )
+
+ return result
+
+ def assertNoResult(self, deferred):
+ """
+ Assert that C{deferred} does not have a result at this point.
+
+ If the assertion succeeds, then the result of C{deferred} is left
+ unchanged. Otherwise, any L{failure.Failure} result is swallowed.
+
+ @param deferred: A L{Deferred<twisted.internet.defer.Deferred>} without
+ a result. This means that neither
+ L{Deferred.callback<twisted.internet.defer.Deferred.callback>} nor
+ L{Deferred.errback<twisted.internet.defer.Deferred.errback>} has
+ been called, or that the
+ L{Deferred<twisted.internet.defer.Deferred>} is waiting on another
+ L{Deferred<twisted.internet.defer.Deferred>} for a result.
+ @type deferred: L{Deferred<twisted.internet.defer.Deferred>}
+
+ @raise SynchronousTestCase.failureException: If the
+ L{Deferred<twisted.internet.defer.Deferred>} has a result.
+ """
+ deferred = ensureDeferred(deferred)
+ result = []
+
+ def cb(res):
+ result.append(res)
+ return res
+
+ deferred.addBoth(cb)
+
+ if result:
+ # If there is already a failure, the self.fail below will
+ # report it, so swallow it in the deferred
+ deferred.addErrback(lambda _: None)
+ self.fail(
+ "No result expected on {!r}, found {!r} instead".format(
+ deferred, result[0]
+ )
+ )
+
+
+class _LogObserver:
+ """
+ Observes the Twisted logs and catches any errors.
+
+ @ivar _errors: A C{list} of L{Failure} instances which were received as
+ error events from the Twisted logging system.
+
+ @ivar _added: A C{int} giving the number of times C{_add} has been called
+ less the number of times C{_remove} has been called; used to only add
+ this observer to the Twisted logging since once, regardless of the
+ number of calls to the add method.
+
+ @ivar _ignored: A C{list} of exception types which will not be recorded.
+ """
+
+ def __init__(self):
+ self._errors = []
+ self._added = 0
+ self._ignored = []
+
+ def _add(self):
+ if self._added == 0:
+ log.addObserver(self.gotEvent)
+ self._added += 1
+
+ def _remove(self):
+ self._added -= 1
+ if self._added == 0:
+ log.removeObserver(self.gotEvent)
+
+ def _ignoreErrors(self, *errorTypes):
+ """
+ Do not store any errors with any of the given types.
+ """
+ self._ignored.extend(errorTypes)
+
+ def _clearIgnores(self):
+ """
+ Stop ignoring any errors we might currently be ignoring.
+ """
+ self._ignored = []
+
+ def flushErrors(self, *errorTypes):
+ """
+ Flush errors from the list of caught errors. If no arguments are
+ specified, remove all errors. If arguments are specified, only remove
+ errors of those types from the stored list.
+ """
+ if errorTypes:
+ flushed = []
+ remainder = []
+ for f in self._errors:
+ if f.check(*errorTypes):
+ flushed.append(f)
+ else:
+ remainder.append(f)
+ self._errors = remainder
+ else:
+ flushed = self._errors
+ self._errors = []
+ return flushed
+
+ def getErrors(self):
+ """
+ Return a list of errors caught by this observer.
+ """
+ return self._errors
+
+ def gotEvent(self, event):
+ """
+ The actual observer method. Called whenever a message is logged.
+
+ @param event: A dictionary containing the log message. Actual
+ structure undocumented (see source for L{twisted.python.log}).
+ """
+ if event.get("isError", False) and "failure" in event:
+ f = event["failure"]
+ if len(self._ignored) == 0 or not f.check(*self._ignored):
+ self._errors.append(f)
+
+
+_logObserver = _LogObserver()
+
+
+class SynchronousTestCase(_Assertions):
+ """
+ A unit test. The atom of the unit testing universe.
+
+ This class extends C{unittest.TestCase} from the standard library. A number
+ of convenient testing helpers are added, including logging and warning
+ integration, monkey-patching support, and more.
+
+ To write a unit test, subclass C{SynchronousTestCase} and define a method
+ (say, 'test_foo') on the subclass. To run the test, instantiate your
+ subclass with the name of the method, and call L{run} on the instance,
+ passing a L{TestResult} object.
+
+ The C{trial} script will automatically find any C{SynchronousTestCase}
+ subclasses defined in modules beginning with 'test_' and construct test
+ cases for all methods beginning with 'test'.
+
+ If an error is logged during the test run, the test will fail with an
+ error. See L{log.err}.
+
+ @ivar failureException: An exception class, defaulting to C{FailTest}. If
+ the test method raises this exception, it will be reported as a failure,
+ rather than an exception. All of the assertion methods raise this if the
+ assertion fails.
+
+ @ivar skip: L{None} or a string explaining why this test is to be
+ skipped. If defined, the test will not be run. Instead, it will be
+ reported to the result object as 'skipped' (if the C{TestResult} supports
+ skipping).
+
+ @ivar todo: L{None}, a string or a tuple of C{(errors, reason)} where
+ C{errors} is either an exception class or an iterable of exception
+ classes, and C{reason} is a string. See L{Todo} or L{makeTodo} for more
+ information.
+
+ @ivar suppress: L{None} or a list of tuples of C{(args, kwargs)} to be
+ passed to C{warnings.filterwarnings}. Use these to suppress warnings
+ raised in a test. Useful for testing deprecated code. See also
+ L{util.suppress}.
+ """
+
+ failureException = FailTest
+
+ def __init__(self, methodName="runTest"):
+ super().__init__(methodName)
+ self._passed = False
+ self._cleanups = []
+ self._testMethodName = methodName
+ testMethod = getattr(self, methodName)
+ self._parents = [testMethod, self, sys.modules.get(self.__class__.__module__)]
+
+ def __eq__(self, other: object) -> bool:
+ """
+ Override the comparison defined by the base TestCase which considers
+ instances of the same class with the same _testMethodName to be
+ equal. Since trial puts TestCase instances into a set, that
+ definition of comparison makes it impossible to run the same test
+ method twice. Most likely, trial should stop using a set to hold
+ tests, but until it does, this is necessary on Python 2.6. -exarkun
+ """
+ if isinstance(other, SynchronousTestCase):
+ return self is other
+ else:
+ return NotImplemented
+
+ def __hash__(self):
+ return hash((self.__class__, self._testMethodName))
+
+ def shortDescription(self):
+ desc = super().shortDescription()
+ if desc is None:
+ return self._testMethodName
+ return desc
+
+ def getSkip(self) -> Tuple[bool, Optional[str]]:
+ """
+ Return the skip reason set on this test, if any is set. Checks on the
+ instance first, then the class, then the module, then packages. As
+ soon as it finds something with a C{skip} attribute, returns that in
+ a tuple (L{True}, L{str}).
+ If the C{skip} attribute does not exist, look for C{__unittest_skip__}
+ and C{__unittest_skip_why__} attributes which are set by the standard
+ library L{unittest.skip} function.
+ Returns (L{False}, L{None}) if it cannot find anything.
+ See L{TestCase} docstring for more details.
+ """
+ skipReason = util.acquireAttribute(self._parents, "skip", None)
+ doSkip = skipReason is not None
+ if skipReason is None:
+ doSkip = getattr(self, "__unittest_skip__", False)
+ if doSkip:
+ skipReason = getattr(self, "__unittest_skip_why__", "")
+ return (doSkip, skipReason)
+
+ def getTodo(self):
+ """
+ Return a L{Todo} object if the test is marked todo. Checks on the
+ instance first, then the class, then the module, then packages. As
+ soon as it finds something with a C{todo} attribute, returns that.
+ Returns L{None} if it cannot find anything. See L{TestCase} docstring
+ for more details.
+ """
+ todo = util.acquireAttribute(self._parents, "todo", None)
+ if todo is None:
+ return None
+ return makeTodo(todo)
+
+ def runTest(self):
+ """
+ If no C{methodName} argument is passed to the constructor, L{run} will
+ treat this method as the thing with the actual test inside.
+ """
+
+ def run(self, result):
+ """
+ Run the test case, storing the results in C{result}.
+
+ First runs C{setUp} on self, then runs the test method (defined in the
+ constructor), then runs C{tearDown}. As with the standard library
+ L{unittest.TestCase}, the return value of these methods is disregarded.
+ In particular, returning a L{Deferred<twisted.internet.defer.Deferred>}
+ has no special additional consequences.
+
+ @param result: A L{TestResult} object.
+ """
+ log.msg("--> %s <--" % (self.id()))
+ new_result = itrial.IReporter(result, None)
+ if new_result is None:
+ result = PyUnitResultAdapter(result)
+ else:
+ result = new_result
+ result.startTest(self)
+ (doSkip, skipReason) = self.getSkip()
+ if doSkip: # don't run test methods that are marked as .skip
+ result.addSkip(self, skipReason)
+ result.stopTest(self)
+ return
+
+ self._passed = False
+ self._warnings = []
+
+ self._installObserver()
+ # All the code inside _runFixturesAndTest will be run such that warnings
+ # emitted by it will be collected and retrievable by flushWarnings.
+ _collectWarnings(self._warnings.append, self._runFixturesAndTest, result)
+
+ # Any collected warnings which the test method didn't flush get
+ # re-emitted so they'll be logged or show up on stdout or whatever.
+ for w in self.flushWarnings():
+ try:
+ warnings.warn_explicit(**w)
+ except BaseException:
+ result.addError(self, failure.Failure())
+
+ result.stopTest(self)
+
+ # f should be a positional only argument but that is a breaking change
+ # see https://github.com/twisted/twisted/issues/11967
+ def addCleanup( # type: ignore[override]
+ self, f: Callable[_P, object], *args: _P.args, **kwargs: _P.kwargs
+ ) -> None:
+ """
+ Add the given function to a list of functions to be called after the
+ test has run, but before C{tearDown}.
+
+ Functions will be run in reverse order of being added. This helps
+ ensure that tear down complements set up.
+
+ As with all aspects of L{SynchronousTestCase}, Deferreds are not
+ supported in cleanup functions.
+ """
+ self._cleanups.append((f, args, kwargs))
+
+ def patch(self, obj, attribute, value):
+ """
+ Monkey patch an object for the duration of the test.
+
+ The monkey patch will be reverted at the end of the test using the
+ L{addCleanup} mechanism.
+
+ The L{monkey.MonkeyPatcher} is returned so that users can restore and
+ re-apply the monkey patch within their tests.
+
+ @param obj: The object to monkey patch.
+ @param attribute: The name of the attribute to change.
+ @param value: The value to set the attribute to.
+ @return: A L{monkey.MonkeyPatcher} object.
+ """
+ monkeyPatch = monkey.MonkeyPatcher((obj, attribute, value))
+ monkeyPatch.patch()
+ self.addCleanup(monkeyPatch.restore)
+ return monkeyPatch
+
+ def flushLoggedErrors(self, *errorTypes):
+ """
+ Remove stored errors received from the log.
+
+ C{TestCase} stores each error logged during the run of the test and
+ reports them as errors during the cleanup phase (after C{tearDown}).
+
+ @param errorTypes: If unspecified, flush all errors. Otherwise, only
+ flush errors that match the given types.
+
+ @return: A list of failures that have been removed.
+ """
+ return self._observer.flushErrors(*errorTypes)
+
+ def flushWarnings(self, offendingFunctions=None):
+ """
+ Remove stored warnings from the list of captured warnings and return
+ them.
+
+ @param offendingFunctions: If L{None}, all warnings issued during the
+ currently running test will be flushed. Otherwise, only warnings
+ which I{point} to a function included in this list will be flushed.
+ All warnings include a filename and source line number; if these
+ parts of a warning point to a source line which is part of a
+ function, then the warning I{points} to that function.
+ @type offendingFunctions: L{None} or L{list} of functions or methods.
+
+ @raise ValueError: If C{offendingFunctions} is not L{None} and includes
+ an object which is not a L{types.FunctionType} or
+ L{types.MethodType} instance.
+
+ @return: A C{list}, each element of which is a C{dict} giving
+ information about one warning which was flushed by this call. The
+ keys of each C{dict} are:
+
+ - C{'message'}: The string which was passed as the I{message}
+ parameter to L{warnings.warn}.
+
+ - C{'category'}: The warning subclass which was passed as the
+ I{category} parameter to L{warnings.warn}.
+
+ - C{'filename'}: The name of the file containing the definition
+ of the code object which was C{stacklevel} frames above the
+ call to L{warnings.warn}, where C{stacklevel} is the value of
+ the C{stacklevel} parameter passed to L{warnings.warn}.
+
+ - C{'lineno'}: The source line associated with the active
+ instruction of the code object object which was C{stacklevel}
+ frames above the call to L{warnings.warn}, where
+ C{stacklevel} is the value of the C{stacklevel} parameter
+ passed to L{warnings.warn}.
+ """
+ if offendingFunctions is None:
+ toFlush = self._warnings[:]
+ self._warnings[:] = []
+ else:
+ toFlush = []
+ for aWarning in self._warnings:
+ for aFunction in offendingFunctions:
+ if not isinstance(
+ aFunction, (types.FunctionType, types.MethodType)
+ ):
+ raise ValueError(f"{aFunction!r} is not a function or method")
+
+ # inspect.getabsfile(aFunction) sometimes returns a
+ # filename which disagrees with the filename the warning
+ # system generates. This seems to be because a
+ # function's code object doesn't deal with source files
+ # being renamed. inspect.getabsfile(module) seems
+ # better (or at least agrees with the warning system
+ # more often), and does some normalization for us which
+ # is desirable. inspect.getmodule() is attractive, but
+ # somewhat broken in Python < 2.6. See Python bug 4845.
+ aModule = sys.modules[aFunction.__module__]
+ filename = inspect.getabsfile(aModule)
+
+ if filename != os.path.normcase(aWarning.filename):
+ continue
+ lineNumbers = [
+ lineNumber
+ for _, lineNumber in _findlinestarts(aFunction.__code__)
+ ]
+ if not (min(lineNumbers) <= aWarning.lineno <= max(lineNumbers)):
+ continue
+ # The warning points to this function, flush it and move on
+ # to the next warning.
+ toFlush.append(aWarning)
+ break
+ # Remove everything which is being flushed.
+ list(map(self._warnings.remove, toFlush))
+
+ return [
+ {
+ "message": w.message,
+ "category": w.category,
+ "filename": w.filename,
+ "lineno": w.lineno,
+ }
+ for w in toFlush
+ ]
+
+ def getDeprecatedModuleAttribute(self, moduleName, name, version, message=None):
+ """
+ Retrieve a module attribute which should have been deprecated,
+ and assert that we saw the appropriate deprecation warning.
+
+ @type moduleName: C{str}
+ @param moduleName: Fully-qualified Python name of the module containing
+ the deprecated attribute; if called from the same module as the
+ attributes are being deprecated in, using the C{__name__} global can
+ be helpful
+
+ @type name: C{str}
+ @param name: Attribute name which we expect to be deprecated
+
+ @param version: The first L{version<twisted.python.versions.Version>} that
+ the module attribute was deprecated.
+
+ @type message: C{str}
+ @param message: (optional) The expected deprecation message for the module attribute
+
+ @return: The given attribute from the named module
+
+ @raise FailTest: if no warnings were emitted on getattr, or if the
+ L{DeprecationWarning} emitted did not produce the canonical
+ please-use-something-else message that is standard for Twisted
+ deprecations according to the given version and replacement.
+
+ @since: Twisted 21.2.0
+ """
+ fqpn = moduleName + "." + name
+ module = sys.modules[moduleName]
+ attr = getattr(module, name)
+ warningsShown = self.flushWarnings([self.getDeprecatedModuleAttribute])
+ if len(warningsShown) == 0:
+ self.fail(f"{fqpn} is not deprecated.")
+
+ observedWarning = warningsShown[0]["message"]
+ expectedWarning = DEPRECATION_WARNING_FORMAT % {
+ "fqpn": fqpn,
+ "version": getVersionString(version),
+ }
+ if message is not None:
+ expectedWarning = expectedWarning + ": " + message
+ self.assert_(
+ observedWarning.startswith(expectedWarning),
+ f"Expected {observedWarning!r} to start with {expectedWarning!r}",
+ )
+
+ return attr
+
+ def callDeprecated(self, version, f, *args, **kwargs):
+ """
+ Call a function that should have been deprecated at a specific version
+ and in favor of a specific alternative, and assert that it was thusly
+ deprecated.
+
+ @param version: A 2-sequence of (since, replacement), where C{since} is
+ a the first L{version<incremental.Version>} that C{f}
+ should have been deprecated since, and C{replacement} is a suggested
+ replacement for the deprecated functionality, as described by
+ L{twisted.python.deprecate.deprecated}. If there is no suggested
+ replacement, this parameter may also be simply a
+ L{version<incremental.Version>} by itself.
+
+ @param f: The deprecated function to call.
+
+ @param args: The arguments to pass to C{f}.
+
+ @param kwargs: The keyword arguments to pass to C{f}.
+
+ @return: Whatever C{f} returns.
+
+ @raise Exception: Whatever C{f} raises. If any exception is
+ raised by C{f}, though, no assertions will be made about emitted
+ deprecations.
+
+ @raise FailTest: if no warnings were emitted by C{f}, or if the
+ L{DeprecationWarning} emitted did not produce the canonical
+ please-use-something-else message that is standard for Twisted
+ deprecations according to the given version and replacement.
+ """
+ result = f(*args, **kwargs)
+ warningsShown = self.flushWarnings([self.callDeprecated])
+ try:
+ info = list(version)
+ except TypeError:
+ since = version
+ replacement = None
+ else:
+ [since, replacement] = info
+
+ if len(warningsShown) == 0:
+ self.fail(f"{f!r} is not deprecated.")
+
+ observedWarning = warningsShown[0]["message"]
+ expectedWarning = getDeprecationWarningString(f, since, replacement=replacement)
+ self.assertEqual(expectedWarning, observedWarning)
+
+ return result
+
+ def mktemp(self):
+ """
+ Create a new path name which can be used for a new file or directory.
+
+ The result is a relative path that is guaranteed to be unique within the
+ current working directory. The parent of the path will exist, but the
+ path will not.
+
+ For a temporary directory call os.mkdir on the path. For a temporary
+ file just create the file (e.g. by opening the path for writing and then
+ closing it).
+
+ @return: The newly created path
+ @rtype: C{str}
+ """
+ MAX_FILENAME = 32 # some platforms limit lengths of filenames
+ base = os.path.join(
+ self.__class__.__module__[:MAX_FILENAME],
+ self.__class__.__name__[:MAX_FILENAME],
+ self._testMethodName[:MAX_FILENAME],
+ )
+ if not os.path.exists(base):
+ os.makedirs(base)
+ # With 3.11 or older mkdtemp returns a relative path.
+ # With newer it is absolute.
+ # Here we make sure we always handle a relative path.
+ # See https://github.com/python/cpython/issues/51574
+ dirname = os.path.relpath(tempfile.mkdtemp("", "", base))
+ return os.path.join(dirname, "temp")
+
+ def _getSuppress(self):
+ """
+ Returns any warning suppressions set for this test. Checks on the
+ instance first, then the class, then the module, then packages. As
+ soon as it finds something with a C{suppress} attribute, returns that.
+ Returns any empty list (i.e. suppress no warnings) if it cannot find
+ anything. See L{TestCase} docstring for more details.
+ """
+ return util.acquireAttribute(self._parents, "suppress", [])
+
+ def _getSkipReason(self, method, skip):
+ """
+ Return the reason to use for skipping a test method.
+
+ @param method: The method which produced the skip.
+ @param skip: A L{unittest.SkipTest} instance raised by C{method}.
+ """
+ if len(skip.args) > 0:
+ return skip.args[0]
+
+ warnAboutFunction(
+ method,
+ "Do not raise unittest.SkipTest with no arguments! Give a reason "
+ "for skipping tests!",
+ )
+ return skip
+
+ def _run(self, suppress, todo, method, result):
+ """
+ Run a single method, either a test method or fixture.
+
+ @param suppress: Any warnings to suppress, as defined by the C{suppress}
+ attribute on this method, test case, or the module it is defined in.
+
+ @param todo: Any expected failure or failures, as defined by the C{todo}
+ attribute on this method, test case, or the module it is defined in.
+
+ @param method: The method to run.
+
+ @param result: The TestResult instance to which to report results.
+
+ @return: C{True} if the method fails and no further method/fixture calls
+ should be made, C{False} otherwise.
+ """
+ if inspect.isgeneratorfunction(method):
+ exc = TypeError(
+ "{!r} is a generator function and therefore will never run".format(
+ method
+ )
+ )
+ result.addError(self, failure.Failure(exc))
+ return True
+ try:
+ runWithWarningsSuppressed(suppress, method)
+ except SkipTest as e:
+ result.addSkip(self, self._getSkipReason(method, e))
+ except BaseException:
+ reason = failure.Failure()
+ if todo is None or not todo.expected(reason):
+ if reason.check(self.failureException):
+ addResult = result.addFailure
+ else:
+ addResult = result.addError
+ addResult(self, reason)
+ else:
+ result.addExpectedFailure(self, reason, todo)
+ else:
+ return False
+ return True
+
+ def _runFixturesAndTest(self, result):
+ """
+ Run C{setUp}, a test method, test cleanups, and C{tearDown}.
+
+ @param result: The TestResult instance to which to report results.
+ """
+ suppress = self._getSuppress()
+ try:
+ if self._run(suppress, None, self.setUp, result):
+ return
+
+ todo = self.getTodo()
+ method = getattr(self, self._testMethodName)
+ failed = self._run(suppress, todo, method, result)
+ finally:
+ self._runCleanups(result)
+
+ if todo and not failed:
+ result.addUnexpectedSuccess(self, todo)
+
+ if self._run(suppress, None, self.tearDown, result):
+ failed = True
+
+ for error in self._observer.getErrors():
+ result.addError(self, error)
+ failed = True
+ self._observer.flushErrors()
+ self._removeObserver()
+
+ if not (failed or todo):
+ result.addSuccess(self)
+
+ def _runCleanups(self, result):
+ """
+ Synchronously run any cleanups which have been added.
+ """
+ while len(self._cleanups) > 0:
+ f, args, kwargs = self._cleanups.pop()
+ try:
+ f(*args, **kwargs)
+ except BaseException:
+ f = failure.Failure()
+ result.addError(self, f)
+
+ def _installObserver(self):
+ self._observer = _logObserver
+ self._observer._add()
+
+ def _removeObserver(self):
+ self._observer._remove()