aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py2/twisted/trial/_asynctest.py
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2023-11-26 18:16:14 +0300
committershmel1k <shmel1k@ydb.tech>2023-11-26 18:43:30 +0300
commitb8cf9e88f4c5c64d9406af533d8948deb050d695 (patch)
tree218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py2/twisted/trial/_asynctest.py
parent523f645a83a0ec97a0332dbc3863bb354c92a328 (diff)
downloadydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py2/twisted/trial/_asynctest.py')
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/_asynctest.py405
1 files changed, 405 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py2/twisted/trial/_asynctest.py b/contrib/python/Twisted/py2/twisted/trial/_asynctest.py
new file mode 100644
index 0000000000..c7a7090cf3
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/_asynctest.py
@@ -0,0 +1,405 @@
+# -*- test-case-name: twisted.trial.test -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Things likely to be used by writers of unit tests.
+
+Maintainer: Jonathan Lange
+"""
+
+from __future__ import division, absolute_import
+
+import inspect
+import warnings
+
+from zope.interface import implementer
+
+# We can't import reactor at module-level because this code runs before trial
+# installs a user-specified reactor, installing the default reactor and
+# breaking reactor installation. See also #6047.
+from twisted.internet import defer, utils
+from twisted.python import failure
+
+from twisted.trial import itrial, util
+from twisted.trial._synctest import (
+ FailTest, SkipTest, SynchronousTestCase)
+
+_wait_is_running = []
+
+@implementer(itrial.ITestCase)
+class TestCase(SynchronousTestCase):
+ """
+ A unit test. The atom of the unit testing universe.
+
+ This class extends L{SynchronousTestCase} which extends C{unittest.TestCase}
+ from the standard library. The main feature is the ability to return
+ C{Deferred}s from tests and fixture methods and to have the suite wait for
+ those C{Deferred}s to fire. Also provides new assertions such as
+ L{assertFailure}.
+
+ @ivar timeout: A real number of seconds. If set, the test will
+ raise an error if it takes longer than C{timeout} seconds.
+ If not set, util.DEFAULT_TIMEOUT_DURATION is used.
+ """
+
+ def __init__(self, methodName='runTest'):
+ """
+ Construct an asynchronous test case for C{methodName}.
+
+ @param methodName: The name of a method on C{self}. This method should
+ be a unit test. That is, it should be a short method that calls some of
+ the assert* methods. If C{methodName} is unspecified,
+ L{SynchronousTestCase.runTest} will be used as the test method. This is
+ mostly useful for testing Trial.
+ """
+ super(TestCase, self).__init__(methodName)
+
+
+ def assertFailure(self, deferred, *expectedFailures):
+ """
+ Fail if C{deferred} does not errback with one of C{expectedFailures}.
+ Returns the original Deferred with callbacks added. You will need
+ to return this Deferred from your test case.
+ """
+ def _cb(ignore):
+ raise self.failureException(
+ "did not catch an error, instead got %r" % (ignore,))
+
+ def _eb(failure):
+ if failure.check(*expectedFailures):
+ return failure.value
+ else:
+ output = ('\nExpected: %r\nGot:\n%s'
+ % (expectedFailures, str(failure)))
+ raise self.failureException(output)
+ return deferred.addCallbacks(_cb, _eb)
+ failUnlessFailure = assertFailure
+
+
+ def _run(self, methodName, result):
+ from twisted.internet import reactor
+ timeout = self.getTimeout()
+ def onTimeout(d):
+ e = defer.TimeoutError("%r (%s) still running at %s secs"
+ % (self, methodName, timeout))
+ f = failure.Failure(e)
+ # try to errback the deferred that the test returns (for no gorram
+ # reason) (see issue1005 and test_errorPropagation in
+ # test_deferred)
+ try:
+ d.errback(f)
+ except defer.AlreadyCalledError:
+ # if the deferred has been called already but the *back chain
+ # is still unfinished, crash the reactor and report timeout
+ # error ourself.
+ reactor.crash()
+ self._timedOut = True # see self._wait
+ todo = self.getTodo()
+ if todo is not None and todo.expected(f):
+ result.addExpectedFailure(self, f, todo)
+ else:
+ result.addError(self, f)
+ onTimeout = utils.suppressWarnings(
+ onTimeout, util.suppress(category=DeprecationWarning))
+ method = getattr(self, methodName)
+ if inspect.isgeneratorfunction(method):
+ exc = TypeError(
+ '%r is a generator function and therefore will never run' % (
+ method,))
+ return defer.fail(exc)
+ d = defer.maybeDeferred(
+ utils.runWithWarningsSuppressed, self._getSuppress(), method)
+ call = reactor.callLater(timeout, onTimeout, d)
+ d.addBoth(lambda x : call.active() and call.cancel() or x)
+ return d
+
+
+ def __call__(self, *args, **kwargs):
+ return self.run(*args, **kwargs)
+
+
+ def deferSetUp(self, ignored, result):
+ d = self._run('setUp', result)
+ d.addCallbacks(self.deferTestMethod, self._ebDeferSetUp,
+ callbackArgs=(result,),
+ errbackArgs=(result,))
+ return d
+
+
+ def _ebDeferSetUp(self, failure, result):
+ if failure.check(SkipTest):
+ result.addSkip(self, self._getSkipReason(self.setUp, failure.value))
+ else:
+ result.addError(self, failure)
+ if failure.check(KeyboardInterrupt):
+ result.stop()
+ return self.deferRunCleanups(None, result)
+
+
+ def deferTestMethod(self, ignored, result):
+ d = self._run(self._testMethodName, result)
+ d.addCallbacks(self._cbDeferTestMethod, self._ebDeferTestMethod,
+ callbackArgs=(result,),
+ errbackArgs=(result,))
+ d.addBoth(self.deferRunCleanups, result)
+ d.addBoth(self.deferTearDown, result)
+ return d
+
+
+ def _cbDeferTestMethod(self, ignored, result):
+ if self.getTodo() is not None:
+ result.addUnexpectedSuccess(self, self.getTodo())
+ else:
+ self._passed = True
+ return ignored
+
+
+ def _ebDeferTestMethod(self, f, result):
+ todo = self.getTodo()
+ if todo is not None and todo.expected(f):
+ result.addExpectedFailure(self, f, todo)
+ elif f.check(self.failureException, FailTest):
+ result.addFailure(self, f)
+ elif f.check(KeyboardInterrupt):
+ result.addError(self, f)
+ result.stop()
+ elif f.check(SkipTest):
+ result.addSkip(
+ self,
+ self._getSkipReason(getattr(self, self._testMethodName), f.value))
+ else:
+ result.addError(self, f)
+
+
+ def deferTearDown(self, ignored, result):
+ d = self._run('tearDown', result)
+ d.addErrback(self._ebDeferTearDown, result)
+ return d
+
+
+ def _ebDeferTearDown(self, failure, result):
+ result.addError(self, failure)
+ if failure.check(KeyboardInterrupt):
+ result.stop()
+ self._passed = False
+
+
+ def deferRunCleanups(self, ignored, result):
+ """
+ Run any scheduled cleanups and report errors (if any to the result
+ object.
+ """
+ d = self._runCleanups()
+ d.addCallback(self._cbDeferRunCleanups, result)
+ return d
+
+
+ def _cbDeferRunCleanups(self, cleanupResults, result):
+ for flag, testFailure in cleanupResults:
+ if flag == defer.FAILURE:
+ result.addError(self, testFailure)
+ if testFailure.check(KeyboardInterrupt):
+ result.stop()
+ self._passed = False
+
+
+ def _cleanUp(self, result):
+ try:
+ clean = util._Janitor(self, result).postCaseCleanup()
+ if not clean:
+ self._passed = False
+ except:
+ result.addError(self, failure.Failure())
+ self._passed = False
+ for error in self._observer.getErrors():
+ result.addError(self, error)
+ self._passed = False
+ self.flushLoggedErrors()
+ self._removeObserver()
+ if self._passed:
+ result.addSuccess(self)
+
+
+ def _classCleanUp(self, result):
+ try:
+ util._Janitor(self, result).postClassCleanup()
+ except:
+ result.addError(self, failure.Failure())
+
+
+ def _makeReactorMethod(self, name):
+ """
+ Create a method which wraps the reactor method C{name}. The new
+ method issues a deprecation warning and calls the original.
+ """
+ def _(*a, **kw):
+ warnings.warn("reactor.%s cannot be used inside unit tests. "
+ "In the future, using %s will fail the test and may "
+ "crash or hang the test run."
+ % (name, name),
+ stacklevel=2, category=DeprecationWarning)
+ return self._reactorMethods[name](*a, **kw)
+ return _
+
+
+ def _deprecateReactor(self, reactor):
+ """
+ Deprecate C{iterate}, C{crash} and C{stop} on C{reactor}. That is,
+ each method is wrapped in a function that issues a deprecation
+ warning, then calls the original.
+
+ @param reactor: The Twisted reactor.
+ """
+ self._reactorMethods = {}
+ for name in ['crash', 'iterate', 'stop']:
+ self._reactorMethods[name] = getattr(reactor, name)
+ setattr(reactor, name, self._makeReactorMethod(name))
+
+
+ def _undeprecateReactor(self, reactor):
+ """
+ Restore the deprecated reactor methods. Undoes what
+ L{_deprecateReactor} did.
+
+ @param reactor: The Twisted reactor.
+ """
+ for name, method in self._reactorMethods.items():
+ setattr(reactor, name, method)
+ self._reactorMethods = {}
+
+
+ def _runCleanups(self):
+ """
+ Run the cleanups added with L{addCleanup} in order.
+
+ @return: A C{Deferred} that fires when all cleanups are run.
+ """
+ def _makeFunction(f, args, kwargs):
+ return lambda: f(*args, **kwargs)
+ callables = []
+ while len(self._cleanups) > 0:
+ f, args, kwargs = self._cleanups.pop()
+ callables.append(_makeFunction(f, args, kwargs))
+ return util._runSequentially(callables)
+
+
+ def _runFixturesAndTest(self, result):
+ """
+ Really run C{setUp}, the test method, and C{tearDown}. Any of these may
+ return L{defer.Deferred}s. After they complete, do some reactor cleanup.
+
+ @param result: A L{TestResult} object.
+ """
+ from twisted.internet import reactor
+ self._deprecateReactor(reactor)
+ self._timedOut = False
+ try:
+ d = self.deferSetUp(None, result)
+ try:
+ self._wait(d)
+ finally:
+ self._cleanUp(result)
+ self._classCleanUp(result)
+ finally:
+ self._undeprecateReactor(reactor)
+
+
+ def addCleanup(self, f, *args, **kwargs):
+ """
+ Extend the base cleanup feature with support for cleanup functions which
+ return Deferreds.
+
+ If the function C{f} returns a Deferred, C{TestCase} will wait until the
+ Deferred has fired before proceeding to the next function.
+ """
+ return super(TestCase, self).addCleanup(f, *args, **kwargs)
+
+
+ def getSuppress(self):
+ return self._getSuppress()
+
+
+ def getTimeout(self):
+ """
+ Returns the timeout value set on this test. Checks on the instance
+ first, then the class, then the module, then packages. As soon as it
+ finds something with a C{timeout} attribute, returns that. Returns
+ L{util.DEFAULT_TIMEOUT_DURATION} if it cannot find anything. See
+ L{TestCase} docstring for more details.
+ """
+ timeout = util.acquireAttribute(self._parents, 'timeout',
+ util.DEFAULT_TIMEOUT_DURATION)
+ try:
+ return float(timeout)
+ except (ValueError, TypeError):
+ # XXX -- this is here because sometimes people will have methods
+ # called 'timeout', or set timeout to 'orange', or something
+ # Particularly, test_news.NewsTestCase and ReactorCoreTestCase
+ # both do this.
+ warnings.warn("'timeout' attribute needs to be a number.",
+ category=DeprecationWarning)
+ return util.DEFAULT_TIMEOUT_DURATION
+
+
+ def _wait(self, d, running=_wait_is_running):
+ """Take a Deferred that only ever callbacks. Block until it happens.
+ """
+ if running:
+ raise RuntimeError("_wait is not reentrant")
+
+ from twisted.internet import reactor
+ results = []
+ def append(any):
+ if results is not None:
+ results.append(any)
+ def crash(ign):
+ if results is not None:
+ reactor.crash()
+ crash = utils.suppressWarnings(
+ crash, util.suppress(message=r'reactor\.crash cannot be used.*',
+ category=DeprecationWarning))
+ def stop():
+ reactor.crash()
+ stop = utils.suppressWarnings(
+ stop, util.suppress(message=r'reactor\.crash cannot be used.*',
+ category=DeprecationWarning))
+
+ running.append(None)
+ try:
+ d.addBoth(append)
+ if results:
+ # d might have already been fired, in which case append is
+ # called synchronously. Avoid any reactor stuff.
+ return
+ d.addBoth(crash)
+ reactor.stop = stop
+ try:
+ reactor.run()
+ finally:
+ del reactor.stop
+
+ # If the reactor was crashed elsewhere due to a timeout, hopefully
+ # that crasher also reported an error. Just return.
+ # _timedOut is most likely to be set when d has fired but hasn't
+ # completed its callback chain (see self._run)
+ if results or self._timedOut: #defined in run() and _run()
+ return
+
+ # If the timeout didn't happen, and we didn't get a result or
+ # a failure, then the user probably aborted the test, so let's
+ # just raise KeyboardInterrupt.
+
+ # FIXME: imagine this:
+ # web/test/test_webclient.py:
+ # exc = self.assertRaises(error.Error, wait, method(url))
+ #
+ # wait() will raise KeyboardInterrupt, and assertRaises will
+ # swallow it. Therefore, wait() raising KeyboardInterrupt is
+ # insufficient to stop trial. A suggested solution is to have
+ # this code set a "stop trial" flag, or otherwise notify trial
+ # that it should really try to stop as soon as possible.
+ raise KeyboardInterrupt()
+ finally:
+ results = None
+ running.pop()