diff options
author | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:16:14 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:43:30 +0300 |
commit | b8cf9e88f4c5c64d9406af533d8948deb050d695 (patch) | |
tree | 218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py3/twisted/trial/_synctest.py | |
parent | 523f645a83a0ec97a0332dbc3863bb354c92a328 (diff) | |
download | ydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz |
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/trial/_synctest.py')
-rw-r--r-- | contrib/python/Twisted/py3/twisted/trial/_synctest.py | 1464 |
1 files changed, 1464 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py3/twisted/trial/_synctest.py b/contrib/python/Twisted/py3/twisted/trial/_synctest.py new file mode 100644 index 0000000000..2cffc2c79e --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/trial/_synctest.py @@ -0,0 +1,1464 @@ +# -*- test-case-name: twisted.trial.test -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Things likely to be used by writers of unit tests. + +Maintainer: Jonathan Lange +""" + + +import inspect +import os +import sys +import tempfile +import types +import unittest as pyunit +import warnings +from dis import findlinestarts as _findlinestarts +from typing import ( + Any, + Callable, + Coroutine, + Generator, + Iterable, + List, + NoReturn, + Optional, + Tuple, + Type, + TypeVar, + Union, +) + +# Python 2.7 and higher has skip support built-in +from unittest import SkipTest + +from attrs import frozen +from typing_extensions import ParamSpec + +from twisted.internet.defer import Deferred, ensureDeferred +from twisted.python import failure, log, monkey +from twisted.python.deprecate import ( + DEPRECATION_WARNING_FORMAT, + getDeprecationWarningString, + getVersionString, + warnAboutFunction, +) +from twisted.python.reflect import fullyQualifiedName +from twisted.python.util import runWithWarningsSuppressed +from twisted.trial import itrial, util + +_P = ParamSpec("_P") +T = TypeVar("T") + + +class FailTest(AssertionError): + """ + Raised to indicate the current test has failed to pass. + """ + + +@frozen +class Todo: + """ + Internal object used to mark a L{TestCase} as 'todo'. Tests marked 'todo' + are reported differently in Trial L{TestResult}s. If todo'd tests fail, + they do not fail the suite and the errors are reported in a separate + category. If todo'd tests succeed, Trial L{TestResult}s will report an + unexpected success. + + @ivar reason: A string explaining why the test is marked 'todo' + + @ivar errors: An iterable of exception types that the test is expected to + raise. If one of these errors is raised by the test, it will be + trapped. Raising any other kind of error will fail the test. If + L{None} then all errors will be trapped. + """ + + reason: str + errors: Optional[Iterable[Type[BaseException]]] = None + + def __repr__(self) -> str: + return f"<Todo reason={self.reason!r} errors={self.errors!r}>" + + def expected(self, failure): + """ + @param failure: A L{twisted.python.failure.Failure}. + + @return: C{True} if C{failure} is expected, C{False} otherwise. + """ + if self.errors is None: + return True + for error in self.errors: + if failure.check(error): + return True + return False + + +def makeTodo( + value: Union[ + str, Tuple[Union[Type[BaseException], Iterable[Type[BaseException]]], str] + ] +) -> Todo: + """ + Return a L{Todo} object built from C{value}. + + If C{value} is a string, return a Todo that expects any exception with + C{value} as a reason. If C{value} is a tuple, the second element is used + as the reason and the first element as the excepted error(s). + + @param value: A string or a tuple of C{(errors, reason)}, where C{errors} + is either a single exception class or an iterable of exception classes. + + @return: A L{Todo} object. + """ + if isinstance(value, str): + return Todo(reason=value) + if isinstance(value, tuple): + errors, reason = value + if isinstance(errors, type): + iterableErrors: Iterable[Type[BaseException]] = [errors] + else: + iterableErrors = errors + return Todo(reason=reason, errors=iterableErrors) + + +class _Warning: + """ + A L{_Warning} instance represents one warning emitted through the Python + warning system (L{warnings}). This is used to insulate callers of + L{_collectWarnings} from changes to the Python warnings system which might + otherwise require changes to the warning objects that function passes to + the observer object it accepts. + + @ivar message: The string which was passed as the message parameter to + L{warnings.warn}. + + @ivar category: The L{Warning} subclass which was passed as the category + parameter to L{warnings.warn}. + + @ivar filename: The name of the file containing the definition of the code + object which was C{stacklevel} frames above the call to + L{warnings.warn}, where C{stacklevel} is the value of the C{stacklevel} + parameter passed to L{warnings.warn}. + + @ivar lineno: The source line associated with the active instruction of the + code object object which was C{stacklevel} frames above the call to + L{warnings.warn}, where C{stacklevel} is the value of the C{stacklevel} + parameter passed to L{warnings.warn}. + """ + + def __init__(self, message, category, filename, lineno): + self.message = message + self.category = category + self.filename = filename + self.lineno = lineno + + +def _setWarningRegistryToNone(modules): + """ + Disable the per-module cache for every module found in C{modules}, typically + C{sys.modules}. + + @param modules: Dictionary of modules, typically sys.module dict + """ + for v in list(modules.values()): + if v is not None: + try: + v.__warningregistry__ = None + except BaseException: + # Don't specify a particular exception type to handle in case + # some wacky object raises some wacky exception in response to + # the setattr attempt. + pass + + +def _collectWarnings(observeWarning, f, *args, **kwargs): + """ + Call C{f} with C{args} positional arguments and C{kwargs} keyword arguments + and collect all warnings which are emitted as a result in a list. + + @param observeWarning: A callable which will be invoked with a L{_Warning} + instance each time a warning is emitted. + + @return: The return value of C{f(*args, **kwargs)}. + """ + + def showWarning(message, category, filename, lineno, file=None, line=None): + assert isinstance(message, Warning) + observeWarning(_Warning(str(message), category, filename, lineno)) + + # Disable the per-module cache for every module otherwise if the warning + # which the caller is expecting us to collect was already emitted it won't + # be re-emitted by the call to f which happens below. + _setWarningRegistryToNone(sys.modules) + + origFilters = warnings.filters[:] + origShow = warnings.showwarning + warnings.simplefilter("always") + try: + warnings.showwarning = showWarning + result = f(*args, **kwargs) + finally: + warnings.filters[:] = origFilters + warnings.showwarning = origShow + return result + + +class UnsupportedTrialFeature(Exception): + """A feature of twisted.trial was used that pyunit cannot support.""" + + +class PyUnitResultAdapter: + """ + Wrap a C{TestResult} from the standard library's C{unittest} so that it + supports the extended result types from Trial, and also supports + L{twisted.python.failure.Failure}s being passed to L{addError} and + L{addFailure}. + """ + + def __init__(self, original): + """ + @param original: A C{TestResult} instance from C{unittest}. + """ + self.original = original + + def _exc_info(self, err): + return util.excInfoOrFailureToExcInfo(err) + + def startTest(self, method): + self.original.startTest(method) + + def stopTest(self, method): + self.original.stopTest(method) + + def addFailure(self, test, fail): + self.original.addFailure(test, self._exc_info(fail)) + + def addError(self, test, error): + self.original.addError(test, self._exc_info(error)) + + def _unsupported(self, test, feature, info): + self.original.addFailure( + test, + (UnsupportedTrialFeature, UnsupportedTrialFeature(feature, info), None), + ) + + def addSkip(self, test, reason): + """ + Report the skip as a failure. + """ + self.original.addSkip(test, reason) + + def addUnexpectedSuccess(self, test, todo=None): + """ + Report the unexpected success as a failure. + """ + self._unsupported(test, "unexpected success", todo) + + def addExpectedFailure(self, test, error): + """ + Report the expected failure (i.e. todo) as a failure. + """ + self._unsupported(test, "expected failure", error) + + def addSuccess(self, test): + self.original.addSuccess(test) + + def upDownError(self, method, error, warn, printStatus): + pass + + +class _AssertRaisesContext: + """ + A helper for implementing C{assertRaises}. This is a context manager and a + helper method to support the non-context manager version of + C{assertRaises}. + + @ivar _testCase: See C{testCase} parameter of C{__init__} + + @ivar _expected: See C{expected} parameter of C{__init__} + + @ivar _returnValue: The value returned by the callable being tested (only + when not being used as a context manager). + + @ivar _expectedName: A short string describing the expected exception + (usually the name of the exception class). + + @ivar exception: The exception which was raised by the function being + tested (if it raised one). + """ + + def __init__(self, testCase, expected): + """ + @param testCase: The L{TestCase} instance which is used to raise a + test-failing exception when that is necessary. + + @param expected: The exception type expected to be raised. + """ + self._testCase = testCase + self._expected = expected + self._returnValue = None + try: + self._expectedName = self._expected.__name__ + except AttributeError: + self._expectedName = str(self._expected) + + def _handle(self, obj): + """ + Call the given object using this object as a context manager. + + @param obj: The object to call and which is expected to raise some + exception. + @type obj: L{object} + + @return: Whatever exception is raised by C{obj()}. + @rtype: L{BaseException} + """ + with self as context: + self._returnValue = obj() + return context.exception + + def __enter__(self): + return self + + def __exit__(self, exceptionType, exceptionValue, traceback): + """ + Check exit exception against expected exception. + """ + # No exception raised. + if exceptionType is None: + self._testCase.fail( + "{} not raised ({} returned)".format( + self._expectedName, self._returnValue + ) + ) + + if not isinstance(exceptionValue, exceptionType): + # Support some Python 2.6 ridiculousness. Exceptions raised using + # the C API appear here as the arguments you might pass to the + # exception class to create an exception instance. So... do that + # to turn them into the instances. + if isinstance(exceptionValue, tuple): + exceptionValue = exceptionType(*exceptionValue) + else: + exceptionValue = exceptionType(exceptionValue) + + # Store exception so that it can be access from context. + self.exception = exceptionValue + + # Wrong exception raised. + if not issubclass(exceptionType, self._expected): + reason = failure.Failure(exceptionValue, exceptionType, traceback) + self._testCase.fail( + "{} raised instead of {}:\n {}".format( + fullyQualifiedName(exceptionType), + self._expectedName, + reason.getTraceback(), + ), + ) + + # All good. + return True + + +class _Assertions(pyunit.TestCase): + """ + Replaces many of the built-in TestCase assertions. In general, these + assertions provide better error messages and are easier to use in + callbacks. + """ + + def fail(self, msg: Optional[object] = None) -> NoReturn: + """ + Absolutely fail the test. Do not pass go, do not collect $200. + + @param msg: the message that will be displayed as the reason for the + failure + """ + raise self.failureException(msg) + + def assertFalse(self, condition, msg=None): + """ + Fail the test if C{condition} evaluates to True. + + @param condition: any object that defines __nonzero__ + """ + super().assertFalse(condition, msg) + return condition + + assertNot = assertFalse + failUnlessFalse = assertFalse + failIf = assertFalse + + def assertTrue(self, condition, msg=None): + """ + Fail the test if C{condition} evaluates to False. + + @param condition: any object that defines __nonzero__ + """ + super().assertTrue(condition, msg) + return condition + + assert_ = assertTrue + failUnlessTrue = assertTrue + failUnless = assertTrue + + def assertRaises(self, exception, f=None, *args, **kwargs): + """ + Fail the test unless calling the function C{f} with the given + C{args} and C{kwargs} raises C{exception}. The failure will report + the traceback and call stack of the unexpected exception. + + @param exception: exception type that is to be expected + @param f: the function to call + + @return: If C{f} is L{None}, a context manager which will make an + assertion about the exception raised from the suite it manages. If + C{f} is not L{None}, the exception raised by C{f}. + + @raise self.failureException: Raised if the function call does + not raise an exception or if it raises an exception of a + different type. + """ + context = _AssertRaisesContext(self, exception) + if f is None: + return context + + return context._handle(lambda: f(*args, **kwargs)) + + # unittest.TestCase.assertRaises() is defined with 4 arguments + # but we define it with 5 arguments. So we need to tell mypy + # to ignore the following assignment to failUnlessRaises + failUnlessRaises = assertRaises # type: ignore[assignment] + + def assertEqual(self, first, second, msg=None): + """ + Fail the test if C{first} and C{second} are not equal. + + @param msg: A string describing the failure that's included in the + exception. + """ + super().assertEqual(first, second, msg) + return first + + failUnlessEqual = assertEqual + failUnlessEquals = assertEqual + assertEquals = assertEqual + + def assertIs(self, first, second, msg=None): + """ + Fail the test if C{first} is not C{second}. This is an + obect-identity-equality test, not an object equality + (i.e. C{__eq__}) test. + + @param msg: if msg is None, then the failure message will be + '%r is not %r' % (first, second) + """ + if first is not second: + raise self.failureException(msg or f"{first!r} is not {second!r}") + return first + + failUnlessIdentical = assertIs + assertIdentical = assertIs + + def assertIsNot(self, first, second, msg=None): + """ + Fail the test if C{first} is C{second}. This is an + obect-identity-equality test, not an object equality + (i.e. C{__eq__}) test. + + @param msg: if msg is None, then the failure message will be + '%r is %r' % (first, second) + """ + if first is second: + raise self.failureException(msg or f"{first!r} is {second!r}") + return first + + failIfIdentical = assertIsNot + assertNotIdentical = assertIsNot + + def assertNotEqual(self, first, second, msg=None): + """ + Fail the test if C{first} == C{second}. + + @param msg: if msg is None, then the failure message will be + '%r == %r' % (first, second) + """ + if not first != second: + raise self.failureException(msg or f"{first!r} == {second!r}") + return first + + assertNotEquals = assertNotEqual + failIfEquals = assertNotEqual + failIfEqual = assertNotEqual + + def assertIn(self, containee, container, msg=None): + """ + Fail the test if C{containee} is not found in C{container}. + + @param containee: the value that should be in C{container} + @param container: a sequence type, or in the case of a mapping type, + will follow semantics of 'if key in dict.keys()' + @param msg: if msg is None, then the failure message will be + '%r not in %r' % (first, second) + """ + if containee not in container: + raise self.failureException(msg or f"{containee!r} not in {container!r}") + return containee + + failUnlessIn = assertIn + + def assertNotIn(self, containee, container, msg=None): + """ + Fail the test if C{containee} is found in C{container}. + + @param containee: the value that should not be in C{container} + @param container: a sequence type, or in the case of a mapping type, + will follow semantics of 'if key in dict.keys()' + @param msg: if msg is None, then the failure message will be + '%r in %r' % (first, second) + """ + if containee in container: + raise self.failureException(msg or f"{containee!r} in {container!r}") + return containee + + failIfIn = assertNotIn + + def assertNotAlmostEqual(self, first, second, places=7, msg=None, delta=None): + """ + Fail if the two objects are equal as determined by their + difference rounded to the given number of decimal places + (default 7) and comparing to zero. + + @note: decimal places (from zero) is usually not the same + as significant digits (measured from the most + significant digit). + + @note: included for compatibility with PyUnit test cases + """ + if round(second - first, places) == 0: + raise self.failureException( + msg or f"{first!r} == {second!r} within {places!r} places" + ) + return first + + assertNotAlmostEquals = assertNotAlmostEqual # type:ignore[assignment] + failIfAlmostEqual = assertNotAlmostEqual # type:ignore[assignment] + failIfAlmostEquals = assertNotAlmostEqual + + def assertAlmostEqual(self, first, second, places=7, msg=None, delta=None): + """ + Fail if the two objects are unequal as determined by their + difference rounded to the given number of decimal places + (default 7) and comparing to zero. + + @note: decimal places (from zero) is usually not the same + as significant digits (measured from the most + significant digit). + + @note: included for compatibility with PyUnit test cases + """ + if round(second - first, places) != 0: + raise self.failureException( + msg or f"{first!r} != {second!r} within {places!r} places" + ) + return first + + assertAlmostEquals = assertAlmostEqual # type:ignore[assignment] + failUnlessAlmostEqual = assertAlmostEqual # type:ignore[assignment] + + def assertApproximates(self, first, second, tolerance, msg=None): + """ + Fail if C{first} - C{second} > C{tolerance} + + @param msg: if msg is None, then the failure message will be + '%r ~== %r' % (first, second) + """ + if abs(first - second) > tolerance: + raise self.failureException(msg or f"{first} ~== {second}") + return first + + failUnlessApproximates = assertApproximates + + def assertSubstring(self, substring, astring, msg=None): + """ + Fail if C{substring} does not exist within C{astring}. + """ + return self.failUnlessIn(substring, astring, msg) + + failUnlessSubstring = assertSubstring + + def assertNotSubstring(self, substring, astring, msg=None): + """ + Fail if C{astring} contains C{substring}. + """ + return self.failIfIn(substring, astring, msg) + + failIfSubstring = assertNotSubstring + + def assertWarns(self, category, message, filename, f, *args, **kwargs): + """ + Fail if the given function doesn't generate the specified warning when + called. It calls the function, checks the warning, and forwards the + result of the function if everything is fine. + + @param category: the category of the warning to check. + @param message: the output message of the warning to check. + @param filename: the filename where the warning should come from. + @param f: the function which is supposed to generate the warning. + @type f: any callable. + @param args: the arguments to C{f}. + @param kwargs: the keywords arguments to C{f}. + + @return: the result of the original function C{f}. + """ + warningsShown = [] + result = _collectWarnings(warningsShown.append, f, *args, **kwargs) + + if not warningsShown: + self.fail("No warnings emitted") + first = warningsShown[0] + for other in warningsShown[1:]: + if (other.message, other.category) != (first.message, first.category): + self.fail("Can't handle different warnings") + self.assertEqual(first.message, message) + self.assertIdentical(first.category, category) + + # Use starts with because of .pyc/.pyo issues. + self.assertTrue( + filename.startswith(first.filename), + f"Warning in {first.filename!r}, expected {filename!r}", + ) + + # It would be nice to be able to check the line number as well, but + # different configurations actually end up reporting different line + # numbers (generally the variation is only 1 line, but that's enough + # to fail the test erroneously...). + # self.assertEqual(lineno, xxx) + + return result + + failUnlessWarns = assertWarns + + def assertIsInstance(self, instance, classOrTuple, message=None): + """ + Fail if C{instance} is not an instance of the given class or of + one of the given classes. + + @param instance: the object to test the type (first argument of the + C{isinstance} call). + @type instance: any. + @param classOrTuple: the class or classes to test against (second + argument of the C{isinstance} call). + @type classOrTuple: class, type, or tuple. + + @param message: Custom text to include in the exception text if the + assertion fails. + """ + if not isinstance(instance, classOrTuple): + if message is None: + suffix = "" + else: + suffix = ": " + message + self.fail(f"{instance!r} is not an instance of {classOrTuple}{suffix}") + + failUnlessIsInstance = assertIsInstance + + def assertNotIsInstance(self, instance, classOrTuple): + """ + Fail if C{instance} is an instance of the given class or of one of the + given classes. + + @param instance: the object to test the type (first argument of the + C{isinstance} call). + @type instance: any. + @param classOrTuple: the class or classes to test against (second + argument of the C{isinstance} call). + @type classOrTuple: class, type, or tuple. + """ + if isinstance(instance, classOrTuple): + self.fail(f"{instance!r} is an instance of {classOrTuple}") + + failIfIsInstance = assertNotIsInstance + + def successResultOf( + self, + deferred: Union[ + Coroutine[Deferred[T], Any, T], + Generator[Deferred[T], Any, T], + Deferred[T], + ], + ) -> T: + """ + Return the current success result of C{deferred} or raise + C{self.failureException}. + + @param deferred: A L{Deferred<twisted.internet.defer.Deferred>} or + I{coroutine} which has a success result. + + For a L{Deferred<twisted.internet.defer.Deferred>} this means + L{Deferred.callback<twisted.internet.defer.Deferred.callback>} or + L{Deferred.errback<twisted.internet.defer.Deferred.errback>} has + been called on it and it has reached the end of its callback chain + and the last callback or errback returned a + non-L{failure.Failure}. + + For a I{coroutine} this means all awaited values have a success + result. + + @raise SynchronousTestCase.failureException: If the + L{Deferred<twisted.internet.defer.Deferred>} has no result or has a + failure result. + + @return: The result of C{deferred}. + """ + deferred = ensureDeferred(deferred) + results: List[Union[T, failure.Failure]] = [] + deferred.addBoth(results.append) + + if not results: + self.fail( + "Success result expected on {!r}, found no result instead".format( + deferred + ) + ) + + result = results[0] + + if isinstance(result, failure.Failure): + self.fail( + "Success result expected on {!r}, " + "found failure result instead:\n{}".format( + deferred, result.getTraceback() + ) + ) + return result + + def failureResultOf(self, deferred, *expectedExceptionTypes): + """ + Return the current failure result of C{deferred} or raise + C{self.failureException}. + + @param deferred: A L{Deferred<twisted.internet.defer.Deferred>} which + has a failure result. This means + L{Deferred.callback<twisted.internet.defer.Deferred.callback>} or + L{Deferred.errback<twisted.internet.defer.Deferred.errback>} has + been called on it and it has reached the end of its callback chain + and the last callback or errback raised an exception or returned a + L{failure.Failure}. + @type deferred: L{Deferred<twisted.internet.defer.Deferred>} + + @param expectedExceptionTypes: Exception types to expect - if + provided, and the exception wrapped by the failure result is + not one of the types provided, then this test will fail. + + @raise SynchronousTestCase.failureException: If the + L{Deferred<twisted.internet.defer.Deferred>} has no result, has a + success result, or has an unexpected failure result. + + @return: The failure result of C{deferred}. + @rtype: L{failure.Failure} + """ + deferred = ensureDeferred(deferred) + result = [] + deferred.addBoth(result.append) + + if not result: + self.fail( + "Failure result expected on {!r}, found no result instead".format( + deferred + ) + ) + + result = result[0] + + if not isinstance(result, failure.Failure): + self.fail( + "Failure result expected on {!r}, " + "found success result ({!r}) instead".format(deferred, result) + ) + + if expectedExceptionTypes and not result.check(*expectedExceptionTypes): + expectedString = " or ".join( + [".".join((t.__module__, t.__name__)) for t in expectedExceptionTypes] + ) + + self.fail( + "Failure of type ({}) expected on {!r}, " + "found type {!r} instead: {}".format( + expectedString, deferred, result.type, result.getTraceback() + ) + ) + + return result + + def assertNoResult(self, deferred): + """ + Assert that C{deferred} does not have a result at this point. + + If the assertion succeeds, then the result of C{deferred} is left + unchanged. Otherwise, any L{failure.Failure} result is swallowed. + + @param deferred: A L{Deferred<twisted.internet.defer.Deferred>} without + a result. This means that neither + L{Deferred.callback<twisted.internet.defer.Deferred.callback>} nor + L{Deferred.errback<twisted.internet.defer.Deferred.errback>} has + been called, or that the + L{Deferred<twisted.internet.defer.Deferred>} is waiting on another + L{Deferred<twisted.internet.defer.Deferred>} for a result. + @type deferred: L{Deferred<twisted.internet.defer.Deferred>} + + @raise SynchronousTestCase.failureException: If the + L{Deferred<twisted.internet.defer.Deferred>} has a result. + """ + deferred = ensureDeferred(deferred) + result = [] + + def cb(res): + result.append(res) + return res + + deferred.addBoth(cb) + + if result: + # If there is already a failure, the self.fail below will + # report it, so swallow it in the deferred + deferred.addErrback(lambda _: None) + self.fail( + "No result expected on {!r}, found {!r} instead".format( + deferred, result[0] + ) + ) + + +class _LogObserver: + """ + Observes the Twisted logs and catches any errors. + + @ivar _errors: A C{list} of L{Failure} instances which were received as + error events from the Twisted logging system. + + @ivar _added: A C{int} giving the number of times C{_add} has been called + less the number of times C{_remove} has been called; used to only add + this observer to the Twisted logging since once, regardless of the + number of calls to the add method. + + @ivar _ignored: A C{list} of exception types which will not be recorded. + """ + + def __init__(self): + self._errors = [] + self._added = 0 + self._ignored = [] + + def _add(self): + if self._added == 0: + log.addObserver(self.gotEvent) + self._added += 1 + + def _remove(self): + self._added -= 1 + if self._added == 0: + log.removeObserver(self.gotEvent) + + def _ignoreErrors(self, *errorTypes): + """ + Do not store any errors with any of the given types. + """ + self._ignored.extend(errorTypes) + + def _clearIgnores(self): + """ + Stop ignoring any errors we might currently be ignoring. + """ + self._ignored = [] + + def flushErrors(self, *errorTypes): + """ + Flush errors from the list of caught errors. If no arguments are + specified, remove all errors. If arguments are specified, only remove + errors of those types from the stored list. + """ + if errorTypes: + flushed = [] + remainder = [] + for f in self._errors: + if f.check(*errorTypes): + flushed.append(f) + else: + remainder.append(f) + self._errors = remainder + else: + flushed = self._errors + self._errors = [] + return flushed + + def getErrors(self): + """ + Return a list of errors caught by this observer. + """ + return self._errors + + def gotEvent(self, event): + """ + The actual observer method. Called whenever a message is logged. + + @param event: A dictionary containing the log message. Actual + structure undocumented (see source for L{twisted.python.log}). + """ + if event.get("isError", False) and "failure" in event: + f = event["failure"] + if len(self._ignored) == 0 or not f.check(*self._ignored): + self._errors.append(f) + + +_logObserver = _LogObserver() + + +class SynchronousTestCase(_Assertions): + """ + A unit test. The atom of the unit testing universe. + + This class extends C{unittest.TestCase} from the standard library. A number + of convenient testing helpers are added, including logging and warning + integration, monkey-patching support, and more. + + To write a unit test, subclass C{SynchronousTestCase} and define a method + (say, 'test_foo') on the subclass. To run the test, instantiate your + subclass with the name of the method, and call L{run} on the instance, + passing a L{TestResult} object. + + The C{trial} script will automatically find any C{SynchronousTestCase} + subclasses defined in modules beginning with 'test_' and construct test + cases for all methods beginning with 'test'. + + If an error is logged during the test run, the test will fail with an + error. See L{log.err}. + + @ivar failureException: An exception class, defaulting to C{FailTest}. If + the test method raises this exception, it will be reported as a failure, + rather than an exception. All of the assertion methods raise this if the + assertion fails. + + @ivar skip: L{None} or a string explaining why this test is to be + skipped. If defined, the test will not be run. Instead, it will be + reported to the result object as 'skipped' (if the C{TestResult} supports + skipping). + + @ivar todo: L{None}, a string or a tuple of C{(errors, reason)} where + C{errors} is either an exception class or an iterable of exception + classes, and C{reason} is a string. See L{Todo} or L{makeTodo} for more + information. + + @ivar suppress: L{None} or a list of tuples of C{(args, kwargs)} to be + passed to C{warnings.filterwarnings}. Use these to suppress warnings + raised in a test. Useful for testing deprecated code. See also + L{util.suppress}. + """ + + failureException = FailTest + + def __init__(self, methodName="runTest"): + super().__init__(methodName) + self._passed = False + self._cleanups = [] + self._testMethodName = methodName + testMethod = getattr(self, methodName) + self._parents = [testMethod, self, sys.modules.get(self.__class__.__module__)] + + def __eq__(self, other: object) -> bool: + """ + Override the comparison defined by the base TestCase which considers + instances of the same class with the same _testMethodName to be + equal. Since trial puts TestCase instances into a set, that + definition of comparison makes it impossible to run the same test + method twice. Most likely, trial should stop using a set to hold + tests, but until it does, this is necessary on Python 2.6. -exarkun + """ + if isinstance(other, SynchronousTestCase): + return self is other + else: + return NotImplemented + + def __hash__(self): + return hash((self.__class__, self._testMethodName)) + + def shortDescription(self): + desc = super().shortDescription() + if desc is None: + return self._testMethodName + return desc + + def getSkip(self) -> Tuple[bool, Optional[str]]: + """ + Return the skip reason set on this test, if any is set. Checks on the + instance first, then the class, then the module, then packages. As + soon as it finds something with a C{skip} attribute, returns that in + a tuple (L{True}, L{str}). + If the C{skip} attribute does not exist, look for C{__unittest_skip__} + and C{__unittest_skip_why__} attributes which are set by the standard + library L{unittest.skip} function. + Returns (L{False}, L{None}) if it cannot find anything. + See L{TestCase} docstring for more details. + """ + skipReason = util.acquireAttribute(self._parents, "skip", None) + doSkip = skipReason is not None + if skipReason is None: + doSkip = getattr(self, "__unittest_skip__", False) + if doSkip: + skipReason = getattr(self, "__unittest_skip_why__", "") + return (doSkip, skipReason) + + def getTodo(self): + """ + Return a L{Todo} object if the test is marked todo. Checks on the + instance first, then the class, then the module, then packages. As + soon as it finds something with a C{todo} attribute, returns that. + Returns L{None} if it cannot find anything. See L{TestCase} docstring + for more details. + """ + todo = util.acquireAttribute(self._parents, "todo", None) + if todo is None: + return None + return makeTodo(todo) + + def runTest(self): + """ + If no C{methodName} argument is passed to the constructor, L{run} will + treat this method as the thing with the actual test inside. + """ + + def run(self, result): + """ + Run the test case, storing the results in C{result}. + + First runs C{setUp} on self, then runs the test method (defined in the + constructor), then runs C{tearDown}. As with the standard library + L{unittest.TestCase}, the return value of these methods is disregarded. + In particular, returning a L{Deferred<twisted.internet.defer.Deferred>} + has no special additional consequences. + + @param result: A L{TestResult} object. + """ + log.msg("--> %s <--" % (self.id())) + new_result = itrial.IReporter(result, None) + if new_result is None: + result = PyUnitResultAdapter(result) + else: + result = new_result + result.startTest(self) + (doSkip, skipReason) = self.getSkip() + if doSkip: # don't run test methods that are marked as .skip + result.addSkip(self, skipReason) + result.stopTest(self) + return + + self._passed = False + self._warnings = [] + + self._installObserver() + # All the code inside _runFixturesAndTest will be run such that warnings + # emitted by it will be collected and retrievable by flushWarnings. + _collectWarnings(self._warnings.append, self._runFixturesAndTest, result) + + # Any collected warnings which the test method didn't flush get + # re-emitted so they'll be logged or show up on stdout or whatever. + for w in self.flushWarnings(): + try: + warnings.warn_explicit(**w) + except BaseException: + result.addError(self, failure.Failure()) + + result.stopTest(self) + + # f should be a positional only argument but that is a breaking change + # see https://github.com/twisted/twisted/issues/11967 + def addCleanup( # type: ignore[override] + self, f: Callable[_P, object], *args: _P.args, **kwargs: _P.kwargs + ) -> None: + """ + Add the given function to a list of functions to be called after the + test has run, but before C{tearDown}. + + Functions will be run in reverse order of being added. This helps + ensure that tear down complements set up. + + As with all aspects of L{SynchronousTestCase}, Deferreds are not + supported in cleanup functions. + """ + self._cleanups.append((f, args, kwargs)) + + def patch(self, obj, attribute, value): + """ + Monkey patch an object for the duration of the test. + + The monkey patch will be reverted at the end of the test using the + L{addCleanup} mechanism. + + The L{monkey.MonkeyPatcher} is returned so that users can restore and + re-apply the monkey patch within their tests. + + @param obj: The object to monkey patch. + @param attribute: The name of the attribute to change. + @param value: The value to set the attribute to. + @return: A L{monkey.MonkeyPatcher} object. + """ + monkeyPatch = monkey.MonkeyPatcher((obj, attribute, value)) + monkeyPatch.patch() + self.addCleanup(monkeyPatch.restore) + return monkeyPatch + + def flushLoggedErrors(self, *errorTypes): + """ + Remove stored errors received from the log. + + C{TestCase} stores each error logged during the run of the test and + reports them as errors during the cleanup phase (after C{tearDown}). + + @param errorTypes: If unspecified, flush all errors. Otherwise, only + flush errors that match the given types. + + @return: A list of failures that have been removed. + """ + return self._observer.flushErrors(*errorTypes) + + def flushWarnings(self, offendingFunctions=None): + """ + Remove stored warnings from the list of captured warnings and return + them. + + @param offendingFunctions: If L{None}, all warnings issued during the + currently running test will be flushed. Otherwise, only warnings + which I{point} to a function included in this list will be flushed. + All warnings include a filename and source line number; if these + parts of a warning point to a source line which is part of a + function, then the warning I{points} to that function. + @type offendingFunctions: L{None} or L{list} of functions or methods. + + @raise ValueError: If C{offendingFunctions} is not L{None} and includes + an object which is not a L{types.FunctionType} or + L{types.MethodType} instance. + + @return: A C{list}, each element of which is a C{dict} giving + information about one warning which was flushed by this call. The + keys of each C{dict} are: + + - C{'message'}: The string which was passed as the I{message} + parameter to L{warnings.warn}. + + - C{'category'}: The warning subclass which was passed as the + I{category} parameter to L{warnings.warn}. + + - C{'filename'}: The name of the file containing the definition + of the code object which was C{stacklevel} frames above the + call to L{warnings.warn}, where C{stacklevel} is the value of + the C{stacklevel} parameter passed to L{warnings.warn}. + + - C{'lineno'}: The source line associated with the active + instruction of the code object object which was C{stacklevel} + frames above the call to L{warnings.warn}, where + C{stacklevel} is the value of the C{stacklevel} parameter + passed to L{warnings.warn}. + """ + if offendingFunctions is None: + toFlush = self._warnings[:] + self._warnings[:] = [] + else: + toFlush = [] + for aWarning in self._warnings: + for aFunction in offendingFunctions: + if not isinstance( + aFunction, (types.FunctionType, types.MethodType) + ): + raise ValueError(f"{aFunction!r} is not a function or method") + + # inspect.getabsfile(aFunction) sometimes returns a + # filename which disagrees with the filename the warning + # system generates. This seems to be because a + # function's code object doesn't deal with source files + # being renamed. inspect.getabsfile(module) seems + # better (or at least agrees with the warning system + # more often), and does some normalization for us which + # is desirable. inspect.getmodule() is attractive, but + # somewhat broken in Python < 2.6. See Python bug 4845. + aModule = sys.modules[aFunction.__module__] + filename = inspect.getabsfile(aModule) + + if filename != os.path.normcase(aWarning.filename): + continue + lineNumbers = [ + lineNumber + for _, lineNumber in _findlinestarts(aFunction.__code__) + ] + if not (min(lineNumbers) <= aWarning.lineno <= max(lineNumbers)): + continue + # The warning points to this function, flush it and move on + # to the next warning. + toFlush.append(aWarning) + break + # Remove everything which is being flushed. + list(map(self._warnings.remove, toFlush)) + + return [ + { + "message": w.message, + "category": w.category, + "filename": w.filename, + "lineno": w.lineno, + } + for w in toFlush + ] + + def getDeprecatedModuleAttribute(self, moduleName, name, version, message=None): + """ + Retrieve a module attribute which should have been deprecated, + and assert that we saw the appropriate deprecation warning. + + @type moduleName: C{str} + @param moduleName: Fully-qualified Python name of the module containing + the deprecated attribute; if called from the same module as the + attributes are being deprecated in, using the C{__name__} global can + be helpful + + @type name: C{str} + @param name: Attribute name which we expect to be deprecated + + @param version: The first L{version<twisted.python.versions.Version>} that + the module attribute was deprecated. + + @type message: C{str} + @param message: (optional) The expected deprecation message for the module attribute + + @return: The given attribute from the named module + + @raise FailTest: if no warnings were emitted on getattr, or if the + L{DeprecationWarning} emitted did not produce the canonical + please-use-something-else message that is standard for Twisted + deprecations according to the given version and replacement. + + @since: Twisted 21.2.0 + """ + fqpn = moduleName + "." + name + module = sys.modules[moduleName] + attr = getattr(module, name) + warningsShown = self.flushWarnings([self.getDeprecatedModuleAttribute]) + if len(warningsShown) == 0: + self.fail(f"{fqpn} is not deprecated.") + + observedWarning = warningsShown[0]["message"] + expectedWarning = DEPRECATION_WARNING_FORMAT % { + "fqpn": fqpn, + "version": getVersionString(version), + } + if message is not None: + expectedWarning = expectedWarning + ": " + message + self.assert_( + observedWarning.startswith(expectedWarning), + f"Expected {observedWarning!r} to start with {expectedWarning!r}", + ) + + return attr + + def callDeprecated(self, version, f, *args, **kwargs): + """ + Call a function that should have been deprecated at a specific version + and in favor of a specific alternative, and assert that it was thusly + deprecated. + + @param version: A 2-sequence of (since, replacement), where C{since} is + a the first L{version<incremental.Version>} that C{f} + should have been deprecated since, and C{replacement} is a suggested + replacement for the deprecated functionality, as described by + L{twisted.python.deprecate.deprecated}. If there is no suggested + replacement, this parameter may also be simply a + L{version<incremental.Version>} by itself. + + @param f: The deprecated function to call. + + @param args: The arguments to pass to C{f}. + + @param kwargs: The keyword arguments to pass to C{f}. + + @return: Whatever C{f} returns. + + @raise Exception: Whatever C{f} raises. If any exception is + raised by C{f}, though, no assertions will be made about emitted + deprecations. + + @raise FailTest: if no warnings were emitted by C{f}, or if the + L{DeprecationWarning} emitted did not produce the canonical + please-use-something-else message that is standard for Twisted + deprecations according to the given version and replacement. + """ + result = f(*args, **kwargs) + warningsShown = self.flushWarnings([self.callDeprecated]) + try: + info = list(version) + except TypeError: + since = version + replacement = None + else: + [since, replacement] = info + + if len(warningsShown) == 0: + self.fail(f"{f!r} is not deprecated.") + + observedWarning = warningsShown[0]["message"] + expectedWarning = getDeprecationWarningString(f, since, replacement=replacement) + self.assertEqual(expectedWarning, observedWarning) + + return result + + def mktemp(self): + """ + Create a new path name which can be used for a new file or directory. + + The result is a relative path that is guaranteed to be unique within the + current working directory. The parent of the path will exist, but the + path will not. + + For a temporary directory call os.mkdir on the path. For a temporary + file just create the file (e.g. by opening the path for writing and then + closing it). + + @return: The newly created path + @rtype: C{str} + """ + MAX_FILENAME = 32 # some platforms limit lengths of filenames + base = os.path.join( + self.__class__.__module__[:MAX_FILENAME], + self.__class__.__name__[:MAX_FILENAME], + self._testMethodName[:MAX_FILENAME], + ) + if not os.path.exists(base): + os.makedirs(base) + # With 3.11 or older mkdtemp returns a relative path. + # With newer it is absolute. + # Here we make sure we always handle a relative path. + # See https://github.com/python/cpython/issues/51574 + dirname = os.path.relpath(tempfile.mkdtemp("", "", base)) + return os.path.join(dirname, "temp") + + def _getSuppress(self): + """ + Returns any warning suppressions set for this test. Checks on the + instance first, then the class, then the module, then packages. As + soon as it finds something with a C{suppress} attribute, returns that. + Returns any empty list (i.e. suppress no warnings) if it cannot find + anything. See L{TestCase} docstring for more details. + """ + return util.acquireAttribute(self._parents, "suppress", []) + + def _getSkipReason(self, method, skip): + """ + Return the reason to use for skipping a test method. + + @param method: The method which produced the skip. + @param skip: A L{unittest.SkipTest} instance raised by C{method}. + """ + if len(skip.args) > 0: + return skip.args[0] + + warnAboutFunction( + method, + "Do not raise unittest.SkipTest with no arguments! Give a reason " + "for skipping tests!", + ) + return skip + + def _run(self, suppress, todo, method, result): + """ + Run a single method, either a test method or fixture. + + @param suppress: Any warnings to suppress, as defined by the C{suppress} + attribute on this method, test case, or the module it is defined in. + + @param todo: Any expected failure or failures, as defined by the C{todo} + attribute on this method, test case, or the module it is defined in. + + @param method: The method to run. + + @param result: The TestResult instance to which to report results. + + @return: C{True} if the method fails and no further method/fixture calls + should be made, C{False} otherwise. + """ + if inspect.isgeneratorfunction(method): + exc = TypeError( + "{!r} is a generator function and therefore will never run".format( + method + ) + ) + result.addError(self, failure.Failure(exc)) + return True + try: + runWithWarningsSuppressed(suppress, method) + except SkipTest as e: + result.addSkip(self, self._getSkipReason(method, e)) + except BaseException: + reason = failure.Failure() + if todo is None or not todo.expected(reason): + if reason.check(self.failureException): + addResult = result.addFailure + else: + addResult = result.addError + addResult(self, reason) + else: + result.addExpectedFailure(self, reason, todo) + else: + return False + return True + + def _runFixturesAndTest(self, result): + """ + Run C{setUp}, a test method, test cleanups, and C{tearDown}. + + @param result: The TestResult instance to which to report results. + """ + suppress = self._getSuppress() + try: + if self._run(suppress, None, self.setUp, result): + return + + todo = self.getTodo() + method = getattr(self, self._testMethodName) + failed = self._run(suppress, todo, method, result) + finally: + self._runCleanups(result) + + if todo and not failed: + result.addUnexpectedSuccess(self, todo) + + if self._run(suppress, None, self.tearDown, result): + failed = True + + for error in self._observer.getErrors(): + result.addError(self, error) + failed = True + self._observer.flushErrors() + self._removeObserver() + + if not (failed or todo): + result.addSuccess(self) + + def _runCleanups(self, result): + """ + Synchronously run any cleanups which have been added. + """ + while len(self._cleanups) > 0: + f, args, kwargs = self._cleanups.pop() + try: + f(*args, **kwargs) + except BaseException: + f = failure.Failure() + result.addError(self, f) + + def _installObserver(self): + self._observer = _logObserver + self._observer._add() + + def _removeObserver(self): + self._observer._remove() |