aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py2/twisted/trial
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2023-11-26 18:16:14 +0300
committershmel1k <shmel1k@ydb.tech>2023-11-26 18:43:30 +0300
commitb8cf9e88f4c5c64d9406af533d8948deb050d695 (patch)
tree218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py2/twisted/trial
parent523f645a83a0ec97a0332dbc3863bb354c92a328 (diff)
downloadydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py2/twisted/trial')
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/__init__.py50
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/__main__.py10
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/_asyncrunner.py185
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/_asynctest.py405
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/_dist/__init__.py47
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/_dist/distreporter.py93
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/_dist/disttrial.py258
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/_dist/managercommands.py86
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/_dist/options.py30
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/_dist/worker.py333
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/_dist/workercommands.py31
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/_dist/workerreporter.py154
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/_dist/workertrial.py111
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/_synctest.py1416
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/itrial.py259
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/reporter.py1322
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/runner.py1064
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/unittest.py35
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/util.py411
19 files changed, 6300 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py2/twisted/trial/__init__.py b/contrib/python/Twisted/py2/twisted/trial/__init__.py
new file mode 100644
index 0000000000..5faaa99ddd
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/__init__.py
@@ -0,0 +1,50 @@
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+#
+# Maintainer: Jonathan Lange
+
+"""
+Twisted Trial: Asynchronous unit testing framework.
+
+Trial extends Python's builtin C{unittest} to provide support for asynchronous
+tests.
+
+Trial strives to be compatible with other Python xUnit testing frameworks.
+"Compatibility" is a difficult things to define. In practice, it means that:
+
+ - L{twisted.trial.unittest.TestCase} objects should be able to be used by
+ other test runners without those runners requiring special support for
+ Trial tests.
+
+ - Tests that subclass the standard library C{TestCase} and don't do anything
+ "too weird" should be able to be discoverable and runnable by the Trial
+ test runner without the authors of those tests having to jump through
+ hoops.
+
+ - Tests that implement the interface provided by the standard library
+ C{TestCase} should be runnable by the Trial runner.
+
+ - The Trial test runner and Trial L{unittest.TestCase} objects ought to be
+ able to use standard library C{TestResult} objects, and third party
+ C{TestResult} objects based on the standard library.
+
+This list is not necessarily exhaustive -- compatibility is hard to define.
+Contributors who discover more helpful ways of defining compatibility are
+encouraged to update this document.
+
+
+Examples:
+
+B{Timeouts} for tests should be implemented in the runner. If this is done,
+then timeouts could work for third-party TestCase objects as well as for
+L{twisted.trial.unittest.TestCase} objects. Further, Twisted C{TestCase}
+objects will run in other runners without timing out.
+See U{http://twistedmatrix.com/trac/ticket/2675}.
+
+Running tests in a temporary directory should be a feature of the test case,
+because often tests themselves rely on this behaviour. If the feature is
+implemented in the runner, then tests will change behaviour (possibly
+breaking) when run in a different test runner. Further, many tests don't even
+care about the filesystem.
+See U{http://twistedmatrix.com/trac/ticket/2916}.
+"""
diff --git a/contrib/python/Twisted/py2/twisted/trial/__main__.py b/contrib/python/Twisted/py2/twisted/trial/__main__.py
new file mode 100644
index 0000000000..fe5534efd4
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/__main__.py
@@ -0,0 +1,10 @@
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+if __name__ == '__main__':
+ from pkg_resources import load_entry_point
+ import sys
+
+ sys.exit(
+ load_entry_point('Twisted', 'console_scripts', 'trial')()
+ )
diff --git a/contrib/python/Twisted/py2/twisted/trial/_asyncrunner.py b/contrib/python/Twisted/py2/twisted/trial/_asyncrunner.py
new file mode 100644
index 0000000000..15e4a1add5
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/_asyncrunner.py
@@ -0,0 +1,185 @@
+# -*- test-case-name: twisted.trial.test -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Infrastructure for test running and suites.
+"""
+
+from __future__ import division, absolute_import
+
+import doctest
+import gc
+
+from twisted.python import components
+
+from twisted.trial import itrial, reporter
+from twisted.trial._synctest import _logObserver
+
+pyunit = __import__('unittest')
+
+from zope.interface import implementer
+
+
+
+class TestSuite(pyunit.TestSuite):
+ """
+ Extend the standard library's C{TestSuite} with a consistently overrideable
+ C{run} method.
+ """
+
+ def run(self, result):
+ """
+ Call C{run} on every member of the suite.
+ """
+ for test in self._tests:
+ if result.shouldStop:
+ break
+ test(result)
+ return result
+
+
+
+@implementer(itrial.ITestCase)
+class TestDecorator(components.proxyForInterface(itrial.ITestCase,
+ "_originalTest")):
+ """
+ Decorator for test cases.
+
+ @param _originalTest: The wrapped instance of test.
+ @type _originalTest: A provider of L{itrial.ITestCase}
+ """
+
+ def __call__(self, result):
+ """
+ Run the unit test.
+
+ @param result: A TestResult object.
+ """
+ return self.run(result)
+
+
+ def run(self, result):
+ """
+ Run the unit test.
+
+ @param result: A TestResult object.
+ """
+ return self._originalTest.run(
+ reporter._AdaptedReporter(result, self.__class__))
+
+
+
+def _clearSuite(suite):
+ """
+ Clear all tests from C{suite}.
+
+ This messes with the internals of C{suite}. In particular, it assumes that
+ the suite keeps all of its tests in a list in an instance variable called
+ C{_tests}.
+ """
+ suite._tests = []
+
+
+
+def decorate(test, decorator):
+ """
+ Decorate all test cases in C{test} with C{decorator}.
+
+ C{test} can be a test case or a test suite. If it is a test suite, then the
+ structure of the suite is preserved.
+
+ L{decorate} tries to preserve the class of the test suites it finds, but
+ assumes the presence of the C{_tests} attribute on the suite.
+
+ @param test: The C{TestCase} or C{TestSuite} to decorate.
+
+ @param decorator: A unary callable used to decorate C{TestCase}s.
+
+ @return: A decorated C{TestCase} or a C{TestSuite} containing decorated
+ C{TestCase}s.
+ """
+
+ try:
+ tests = iter(test)
+ except TypeError:
+ return decorator(test)
+
+ # At this point, we know that 'test' is a test suite.
+ _clearSuite(test)
+
+ for case in tests:
+ test.addTest(decorate(case, decorator))
+ return test
+
+
+
+class _PyUnitTestCaseAdapter(TestDecorator):
+ """
+ Adapt from pyunit.TestCase to ITestCase.
+ """
+
+
+
+class _BrokenIDTestCaseAdapter(_PyUnitTestCaseAdapter):
+ """
+ Adapter for pyunit-style C{TestCase} subclasses that have undesirable id()
+ methods. That is C{unittest.FunctionTestCase} and C{unittest.DocTestCase}.
+ """
+
+ def id(self):
+ """
+ Return the fully-qualified Python name of the doctest.
+ """
+ testID = self._originalTest.shortDescription()
+ if testID is not None:
+ return testID
+ return self._originalTest.id()
+
+
+
+class _ForceGarbageCollectionDecorator(TestDecorator):
+ """
+ Forces garbage collection to be run before and after the test. Any errors
+ logged during the post-test collection are added to the test result as
+ errors.
+ """
+
+ def run(self, result):
+ gc.collect()
+ TestDecorator.run(self, result)
+ _logObserver._add()
+ gc.collect()
+ for error in _logObserver.getErrors():
+ result.addError(self, error)
+ _logObserver.flushErrors()
+ _logObserver._remove()
+
+
+components.registerAdapter(
+ _PyUnitTestCaseAdapter, pyunit.TestCase, itrial.ITestCase)
+
+
+components.registerAdapter(
+ _BrokenIDTestCaseAdapter, pyunit.FunctionTestCase, itrial.ITestCase)
+
+
+_docTestCase = getattr(doctest, 'DocTestCase', None)
+if _docTestCase:
+ components.registerAdapter(
+ _BrokenIDTestCaseAdapter, _docTestCase, itrial.ITestCase)
+
+
+
+def _iterateTests(testSuiteOrCase):
+ """
+ Iterate through all of the test cases in C{testSuiteOrCase}.
+ """
+ try:
+ suite = iter(testSuiteOrCase)
+ except TypeError:
+ yield testSuiteOrCase
+ else:
+ for test in suite:
+ for subtest in _iterateTests(test):
+ yield subtest
diff --git a/contrib/python/Twisted/py2/twisted/trial/_asynctest.py b/contrib/python/Twisted/py2/twisted/trial/_asynctest.py
new file mode 100644
index 0000000000..c7a7090cf3
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/_asynctest.py
@@ -0,0 +1,405 @@
+# -*- test-case-name: twisted.trial.test -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Things likely to be used by writers of unit tests.
+
+Maintainer: Jonathan Lange
+"""
+
+from __future__ import division, absolute_import
+
+import inspect
+import warnings
+
+from zope.interface import implementer
+
+# We can't import reactor at module-level because this code runs before trial
+# installs a user-specified reactor, installing the default reactor and
+# breaking reactor installation. See also #6047.
+from twisted.internet import defer, utils
+from twisted.python import failure
+
+from twisted.trial import itrial, util
+from twisted.trial._synctest import (
+ FailTest, SkipTest, SynchronousTestCase)
+
+_wait_is_running = []
+
+@implementer(itrial.ITestCase)
+class TestCase(SynchronousTestCase):
+ """
+ A unit test. The atom of the unit testing universe.
+
+ This class extends L{SynchronousTestCase} which extends C{unittest.TestCase}
+ from the standard library. The main feature is the ability to return
+ C{Deferred}s from tests and fixture methods and to have the suite wait for
+ those C{Deferred}s to fire. Also provides new assertions such as
+ L{assertFailure}.
+
+ @ivar timeout: A real number of seconds. If set, the test will
+ raise an error if it takes longer than C{timeout} seconds.
+ If not set, util.DEFAULT_TIMEOUT_DURATION is used.
+ """
+
+ def __init__(self, methodName='runTest'):
+ """
+ Construct an asynchronous test case for C{methodName}.
+
+ @param methodName: The name of a method on C{self}. This method should
+ be a unit test. That is, it should be a short method that calls some of
+ the assert* methods. If C{methodName} is unspecified,
+ L{SynchronousTestCase.runTest} will be used as the test method. This is
+ mostly useful for testing Trial.
+ """
+ super(TestCase, self).__init__(methodName)
+
+
+ def assertFailure(self, deferred, *expectedFailures):
+ """
+ Fail if C{deferred} does not errback with one of C{expectedFailures}.
+ Returns the original Deferred with callbacks added. You will need
+ to return this Deferred from your test case.
+ """
+ def _cb(ignore):
+ raise self.failureException(
+ "did not catch an error, instead got %r" % (ignore,))
+
+ def _eb(failure):
+ if failure.check(*expectedFailures):
+ return failure.value
+ else:
+ output = ('\nExpected: %r\nGot:\n%s'
+ % (expectedFailures, str(failure)))
+ raise self.failureException(output)
+ return deferred.addCallbacks(_cb, _eb)
+ failUnlessFailure = assertFailure
+
+
+ def _run(self, methodName, result):
+ from twisted.internet import reactor
+ timeout = self.getTimeout()
+ def onTimeout(d):
+ e = defer.TimeoutError("%r (%s) still running at %s secs"
+ % (self, methodName, timeout))
+ f = failure.Failure(e)
+ # try to errback the deferred that the test returns (for no gorram
+ # reason) (see issue1005 and test_errorPropagation in
+ # test_deferred)
+ try:
+ d.errback(f)
+ except defer.AlreadyCalledError:
+ # if the deferred has been called already but the *back chain
+ # is still unfinished, crash the reactor and report timeout
+ # error ourself.
+ reactor.crash()
+ self._timedOut = True # see self._wait
+ todo = self.getTodo()
+ if todo is not None and todo.expected(f):
+ result.addExpectedFailure(self, f, todo)
+ else:
+ result.addError(self, f)
+ onTimeout = utils.suppressWarnings(
+ onTimeout, util.suppress(category=DeprecationWarning))
+ method = getattr(self, methodName)
+ if inspect.isgeneratorfunction(method):
+ exc = TypeError(
+ '%r is a generator function and therefore will never run' % (
+ method,))
+ return defer.fail(exc)
+ d = defer.maybeDeferred(
+ utils.runWithWarningsSuppressed, self._getSuppress(), method)
+ call = reactor.callLater(timeout, onTimeout, d)
+ d.addBoth(lambda x : call.active() and call.cancel() or x)
+ return d
+
+
+ def __call__(self, *args, **kwargs):
+ return self.run(*args, **kwargs)
+
+
+ def deferSetUp(self, ignored, result):
+ d = self._run('setUp', result)
+ d.addCallbacks(self.deferTestMethod, self._ebDeferSetUp,
+ callbackArgs=(result,),
+ errbackArgs=(result,))
+ return d
+
+
+ def _ebDeferSetUp(self, failure, result):
+ if failure.check(SkipTest):
+ result.addSkip(self, self._getSkipReason(self.setUp, failure.value))
+ else:
+ result.addError(self, failure)
+ if failure.check(KeyboardInterrupt):
+ result.stop()
+ return self.deferRunCleanups(None, result)
+
+
+ def deferTestMethod(self, ignored, result):
+ d = self._run(self._testMethodName, result)
+ d.addCallbacks(self._cbDeferTestMethod, self._ebDeferTestMethod,
+ callbackArgs=(result,),
+ errbackArgs=(result,))
+ d.addBoth(self.deferRunCleanups, result)
+ d.addBoth(self.deferTearDown, result)
+ return d
+
+
+ def _cbDeferTestMethod(self, ignored, result):
+ if self.getTodo() is not None:
+ result.addUnexpectedSuccess(self, self.getTodo())
+ else:
+ self._passed = True
+ return ignored
+
+
+ def _ebDeferTestMethod(self, f, result):
+ todo = self.getTodo()
+ if todo is not None and todo.expected(f):
+ result.addExpectedFailure(self, f, todo)
+ elif f.check(self.failureException, FailTest):
+ result.addFailure(self, f)
+ elif f.check(KeyboardInterrupt):
+ result.addError(self, f)
+ result.stop()
+ elif f.check(SkipTest):
+ result.addSkip(
+ self,
+ self._getSkipReason(getattr(self, self._testMethodName), f.value))
+ else:
+ result.addError(self, f)
+
+
+ def deferTearDown(self, ignored, result):
+ d = self._run('tearDown', result)
+ d.addErrback(self._ebDeferTearDown, result)
+ return d
+
+
+ def _ebDeferTearDown(self, failure, result):
+ result.addError(self, failure)
+ if failure.check(KeyboardInterrupt):
+ result.stop()
+ self._passed = False
+
+
+ def deferRunCleanups(self, ignored, result):
+ """
+ Run any scheduled cleanups and report errors (if any to the result
+ object.
+ """
+ d = self._runCleanups()
+ d.addCallback(self._cbDeferRunCleanups, result)
+ return d
+
+
+ def _cbDeferRunCleanups(self, cleanupResults, result):
+ for flag, testFailure in cleanupResults:
+ if flag == defer.FAILURE:
+ result.addError(self, testFailure)
+ if testFailure.check(KeyboardInterrupt):
+ result.stop()
+ self._passed = False
+
+
+ def _cleanUp(self, result):
+ try:
+ clean = util._Janitor(self, result).postCaseCleanup()
+ if not clean:
+ self._passed = False
+ except:
+ result.addError(self, failure.Failure())
+ self._passed = False
+ for error in self._observer.getErrors():
+ result.addError(self, error)
+ self._passed = False
+ self.flushLoggedErrors()
+ self._removeObserver()
+ if self._passed:
+ result.addSuccess(self)
+
+
+ def _classCleanUp(self, result):
+ try:
+ util._Janitor(self, result).postClassCleanup()
+ except:
+ result.addError(self, failure.Failure())
+
+
+ def _makeReactorMethod(self, name):
+ """
+ Create a method which wraps the reactor method C{name}. The new
+ method issues a deprecation warning and calls the original.
+ """
+ def _(*a, **kw):
+ warnings.warn("reactor.%s cannot be used inside unit tests. "
+ "In the future, using %s will fail the test and may "
+ "crash or hang the test run."
+ % (name, name),
+ stacklevel=2, category=DeprecationWarning)
+ return self._reactorMethods[name](*a, **kw)
+ return _
+
+
+ def _deprecateReactor(self, reactor):
+ """
+ Deprecate C{iterate}, C{crash} and C{stop} on C{reactor}. That is,
+ each method is wrapped in a function that issues a deprecation
+ warning, then calls the original.
+
+ @param reactor: The Twisted reactor.
+ """
+ self._reactorMethods = {}
+ for name in ['crash', 'iterate', 'stop']:
+ self._reactorMethods[name] = getattr(reactor, name)
+ setattr(reactor, name, self._makeReactorMethod(name))
+
+
+ def _undeprecateReactor(self, reactor):
+ """
+ Restore the deprecated reactor methods. Undoes what
+ L{_deprecateReactor} did.
+
+ @param reactor: The Twisted reactor.
+ """
+ for name, method in self._reactorMethods.items():
+ setattr(reactor, name, method)
+ self._reactorMethods = {}
+
+
+ def _runCleanups(self):
+ """
+ Run the cleanups added with L{addCleanup} in order.
+
+ @return: A C{Deferred} that fires when all cleanups are run.
+ """
+ def _makeFunction(f, args, kwargs):
+ return lambda: f(*args, **kwargs)
+ callables = []
+ while len(self._cleanups) > 0:
+ f, args, kwargs = self._cleanups.pop()
+ callables.append(_makeFunction(f, args, kwargs))
+ return util._runSequentially(callables)
+
+
+ def _runFixturesAndTest(self, result):
+ """
+ Really run C{setUp}, the test method, and C{tearDown}. Any of these may
+ return L{defer.Deferred}s. After they complete, do some reactor cleanup.
+
+ @param result: A L{TestResult} object.
+ """
+ from twisted.internet import reactor
+ self._deprecateReactor(reactor)
+ self._timedOut = False
+ try:
+ d = self.deferSetUp(None, result)
+ try:
+ self._wait(d)
+ finally:
+ self._cleanUp(result)
+ self._classCleanUp(result)
+ finally:
+ self._undeprecateReactor(reactor)
+
+
+ def addCleanup(self, f, *args, **kwargs):
+ """
+ Extend the base cleanup feature with support for cleanup functions which
+ return Deferreds.
+
+ If the function C{f} returns a Deferred, C{TestCase} will wait until the
+ Deferred has fired before proceeding to the next function.
+ """
+ return super(TestCase, self).addCleanup(f, *args, **kwargs)
+
+
+ def getSuppress(self):
+ return self._getSuppress()
+
+
+ def getTimeout(self):
+ """
+ Returns the timeout value set on this test. Checks on the instance
+ first, then the class, then the module, then packages. As soon as it
+ finds something with a C{timeout} attribute, returns that. Returns
+ L{util.DEFAULT_TIMEOUT_DURATION} if it cannot find anything. See
+ L{TestCase} docstring for more details.
+ """
+ timeout = util.acquireAttribute(self._parents, 'timeout',
+ util.DEFAULT_TIMEOUT_DURATION)
+ try:
+ return float(timeout)
+ except (ValueError, TypeError):
+ # XXX -- this is here because sometimes people will have methods
+ # called 'timeout', or set timeout to 'orange', or something
+ # Particularly, test_news.NewsTestCase and ReactorCoreTestCase
+ # both do this.
+ warnings.warn("'timeout' attribute needs to be a number.",
+ category=DeprecationWarning)
+ return util.DEFAULT_TIMEOUT_DURATION
+
+
+ def _wait(self, d, running=_wait_is_running):
+ """Take a Deferred that only ever callbacks. Block until it happens.
+ """
+ if running:
+ raise RuntimeError("_wait is not reentrant")
+
+ from twisted.internet import reactor
+ results = []
+ def append(any):
+ if results is not None:
+ results.append(any)
+ def crash(ign):
+ if results is not None:
+ reactor.crash()
+ crash = utils.suppressWarnings(
+ crash, util.suppress(message=r'reactor\.crash cannot be used.*',
+ category=DeprecationWarning))
+ def stop():
+ reactor.crash()
+ stop = utils.suppressWarnings(
+ stop, util.suppress(message=r'reactor\.crash cannot be used.*',
+ category=DeprecationWarning))
+
+ running.append(None)
+ try:
+ d.addBoth(append)
+ if results:
+ # d might have already been fired, in which case append is
+ # called synchronously. Avoid any reactor stuff.
+ return
+ d.addBoth(crash)
+ reactor.stop = stop
+ try:
+ reactor.run()
+ finally:
+ del reactor.stop
+
+ # If the reactor was crashed elsewhere due to a timeout, hopefully
+ # that crasher also reported an error. Just return.
+ # _timedOut is most likely to be set when d has fired but hasn't
+ # completed its callback chain (see self._run)
+ if results or self._timedOut: #defined in run() and _run()
+ return
+
+ # If the timeout didn't happen, and we didn't get a result or
+ # a failure, then the user probably aborted the test, so let's
+ # just raise KeyboardInterrupt.
+
+ # FIXME: imagine this:
+ # web/test/test_webclient.py:
+ # exc = self.assertRaises(error.Error, wait, method(url))
+ #
+ # wait() will raise KeyboardInterrupt, and assertRaises will
+ # swallow it. Therefore, wait() raising KeyboardInterrupt is
+ # insufficient to stop trial. A suggested solution is to have
+ # this code set a "stop trial" flag, or otherwise notify trial
+ # that it should really try to stop as soon as possible.
+ raise KeyboardInterrupt()
+ finally:
+ results = None
+ running.pop()
diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/__init__.py b/contrib/python/Twisted/py2/twisted/trial/_dist/__init__.py
new file mode 100644
index 0000000000..502e840fef
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/_dist/__init__.py
@@ -0,0 +1,47 @@
+# -*- test-case-name: twisted.trial._dist.test -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+This package implements the distributed Trial test runner:
+
+ - The L{twisted.trial._dist.disttrial} module implements a test runner which
+ runs in a manager process and can launch additional worker processes in
+ which to run tests and gather up results from all of them.
+
+ - The L{twisted.trial._dist.options} module defines command line options used
+ to configure the distributed test runner.
+
+ - The L{twisted.trial._dist.managercommands} module defines AMP commands
+ which are sent from worker processes back to the manager process to report
+ the results of tests.
+
+ - The L{twisted.trial._dist.workercommands} module defines AMP commands which
+ are sent from the manager process to the worker processes to control the
+ execution of tests there.
+
+ - The L{twisted.trial._dist.distreporter} module defines a proxy for
+ L{twisted.trial.itrial.IReporter} which enforces the typical requirement
+ that results be passed to a reporter for only one test at a time, allowing
+ any reporter to be used with despite disttrial's simultaneously running
+ tests.
+
+ - The L{twisted.trial._dist.workerreporter} module implements a
+ L{twisted.trial.itrial.IReporter} which is used by worker processes and
+ reports results back to the manager process using AMP commands.
+
+ - The L{twisted.trial._dist.workertrial} module is a runnable script which is
+ the main point for worker processes.
+
+ - The L{twisted.trial._dist.worker} process defines the manager's AMP
+ protocol for accepting results from worker processes and a process protocol
+ for use running workers as local child processes (as opposed to
+ distributing them to another host).
+
+@since: 12.3
+"""
+
+# File descriptors numbers used to set up pipes with the worker.
+_WORKER_AMP_STDIN = 3
+
+_WORKER_AMP_STDOUT = 4
diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/distreporter.py b/contrib/python/Twisted/py2/twisted/trial/_dist/distreporter.py
new file mode 100644
index 0000000000..6648b7a0aa
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/_dist/distreporter.py
@@ -0,0 +1,93 @@
+# -*- test-case-name: twisted.trial._dist.test.test_distreporter -*-
+#
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+The reporter is not made to support concurrent test running, so we will
+hold test results in here and only send them to the reporter once the
+test is over.
+
+@since: 12.3
+"""
+
+from zope.interface import implementer
+from twisted.trial.itrial import IReporter
+from twisted.python.components import proxyForInterface
+
+
+
+@implementer(IReporter)
+class DistReporter(proxyForInterface(IReporter)):
+ """
+ See module docstring.
+ """
+
+ def __init__(self, original):
+ super(DistReporter, self).__init__(original)
+ self.running = {}
+
+
+ def startTest(self, test):
+ """
+ Queue test starting.
+ """
+ self.running[test.id()] = []
+ self.running[test.id()].append((self.original.startTest, test))
+
+
+ def addFailure(self, test, fail):
+ """
+ Queue adding a failure.
+ """
+ self.running[test.id()].append((self.original.addFailure,
+ test, fail))
+
+
+ def addError(self, test, error):
+ """
+ Queue error adding.
+ """
+ self.running[test.id()].append((self.original.addError,
+ test, error))
+
+
+ def addSkip(self, test, reason):
+ """
+ Queue adding a skip.
+ """
+ self.running[test.id()].append((self.original.addSkip,
+ test, reason))
+
+
+ def addUnexpectedSuccess(self, test, todo=None):
+ """
+ Queue adding an unexpected success.
+ """
+ self.running[test.id()].append((self.original.addUnexpectedSuccess,
+ test, todo))
+
+
+ def addExpectedFailure(self, test, error, todo=None):
+ """
+ Queue adding an unexpected failure.
+ """
+ self.running[test.id()].append((self.original.addExpectedFailure,
+ test, error, todo))
+
+
+ def addSuccess(self, test):
+ """
+ Queue adding a success.
+ """
+ self.running[test.id()].append((self.original.addSuccess, test))
+
+
+ def stopTest(self, test):
+ """
+ Queue stopping the test, then unroll the queue.
+ """
+ self.running[test.id()].append((self.original.stopTest, test))
+ for step in self.running[test.id()]:
+ step[0](*step[1:])
+ del self.running[test.id()]
diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/disttrial.py b/contrib/python/Twisted/py2/twisted/trial/_dist/disttrial.py
new file mode 100644
index 0000000000..96875b4a85
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/_dist/disttrial.py
@@ -0,0 +1,258 @@
+# -*- test-case-name: twisted.trial._dist.test.test_disttrial -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+This module contains the trial distributed runner, the management class
+responsible for coordinating all of trial's behavior at the highest level.
+
+@since: 12.3
+"""
+
+import os
+import sys
+
+from twisted.python.filepath import FilePath
+from twisted.python.modules import theSystemPath
+from twisted.internet.defer import DeferredList
+from twisted.internet.task import cooperate
+
+from twisted.trial.util import _unusedTestDirectory
+from twisted.trial._asyncrunner import _iterateTests
+from twisted.trial._dist.worker import LocalWorker, LocalWorkerAMP
+from twisted.trial._dist.distreporter import DistReporter
+from twisted.trial.reporter import UncleanWarningsReporterWrapper
+from twisted.trial._dist import _WORKER_AMP_STDIN, _WORKER_AMP_STDOUT
+
+
+
+class DistTrialRunner(object):
+ """
+ A specialized runner for distributed trial. The runner launches a number of
+ local worker processes which will run tests.
+
+ @ivar _workerNumber: the number of workers to be spawned.
+ @type _workerNumber: C{int}
+
+ @ivar _stream: stream which the reporter will use.
+
+ @ivar _reporterFactory: the reporter class to be used.
+ """
+ _distReporterFactory = DistReporter
+
+ def _makeResult(self):
+ """
+ Make reporter factory, and wrap it with a L{DistReporter}.
+ """
+ reporter = self._reporterFactory(self._stream, self._tbformat,
+ realtime=self._rterrors)
+ if self._uncleanWarnings:
+ reporter = UncleanWarningsReporterWrapper(reporter)
+ return self._distReporterFactory(reporter)
+
+
+ def __init__(self, reporterFactory, workerNumber, workerArguments,
+ stream=None,
+ tracebackFormat='default',
+ realTimeErrors=False,
+ uncleanWarnings=False,
+ logfile='test.log',
+ workingDirectory='_trial_temp'):
+ self._workerNumber = workerNumber
+ self._workerArguments = workerArguments
+ self._reporterFactory = reporterFactory
+ if stream is None:
+ stream = sys.stdout
+ self._stream = stream
+ self._tbformat = tracebackFormat
+ self._rterrors = realTimeErrors
+ self._uncleanWarnings = uncleanWarnings
+ self._result = None
+ self._workingDirectory = workingDirectory
+ self._logFile = logfile
+ self._logFileObserver = None
+ self._logFileObject = None
+ self._logWarnings = False
+
+
+ def writeResults(self, result):
+ """
+ Write test run final outcome to result.
+
+ @param result: A C{TestResult} which will print errors and the summary.
+ """
+ result.done()
+
+
+ def createLocalWorkers(self, protocols, workingDirectory):
+ """
+ Create local worker protocol instances and return them.
+
+ @param protocols: An iterable of L{LocalWorkerAMP} instances.
+
+ @param workingDirectory: The base path in which we should run the
+ workers.
+ @type workingDirectory: C{str}
+
+ @return: A list of C{quantity} C{LocalWorker} instances.
+ """
+ return [LocalWorker(protocol,
+ os.path.join(workingDirectory, str(x)),
+ self._logFile)
+ for x, protocol in enumerate(protocols)]
+
+
+ def launchWorkerProcesses(self, spawner, protocols, arguments):
+ """
+ Spawn processes from a list of process protocols.
+
+ @param spawner: A C{IReactorProcess.spawnProcess} implementation.
+
+ @param protocols: An iterable of C{ProcessProtocol} instances.
+
+ @param arguments: Extra arguments passed to the processes.
+ """
+ workertrialPath = theSystemPath[
+ 'twisted.trial._dist.workertrial'].filePath.path
+ childFDs = {0: 'w', 1: 'r', 2: 'r', _WORKER_AMP_STDIN: 'w',
+ _WORKER_AMP_STDOUT: 'r'}
+ environ = os.environ.copy()
+ # Add an environment variable containing the raw sys.path, to be used by
+ # subprocesses to make sure it's identical to the parent. See
+ # workertrial._setupPath.
+ environ['TRIAL_PYTHONPATH'] = os.pathsep.join(sys.path)
+ for worker in protocols:
+ args = [sys.executable, workertrialPath]
+ args.extend(arguments)
+ spawner(worker, sys.executable, args=args, childFDs=childFDs,
+ env=environ)
+
+
+ def _driveWorker(self, worker, result, testCases, cooperate):
+ """
+ Drive a L{LocalWorkerAMP} instance, iterating the tests and calling
+ C{run} for every one of them.
+
+ @param worker: The L{LocalWorkerAMP} to drive.
+
+ @param result: The global L{DistReporter} instance.
+
+ @param testCases: The global list of tests to iterate.
+
+ @param cooperate: The cooperate function to use, to be customized in
+ tests.
+ @type cooperate: C{function}
+
+ @return: A C{Deferred} firing when all the tests are finished.
+ """
+
+ def resultErrback(error, case):
+ result.original.addFailure(case, error)
+ return error
+
+ def task(case):
+ d = worker.run(case, result)
+ d.addErrback(resultErrback, case)
+ return d
+
+ return cooperate(task(case) for case in testCases).whenDone()
+
+
+ def run(self, suite, reactor=None, cooperate=cooperate,
+ untilFailure=False):
+ """
+ Spawn local worker processes and load tests. After that, run them.
+
+ @param suite: A tests suite to be run.
+
+ @param reactor: The reactor to use, to be customized in tests.
+ @type reactor: A provider of
+ L{twisted.internet.interfaces.IReactorProcess}
+
+ @param cooperate: The cooperate function to use, to be customized in
+ tests.
+ @type cooperate: C{function}
+
+ @param untilFailure: If C{True}, continue to run the tests until they
+ fail.
+ @type untilFailure: C{bool}.
+
+ @return: The test result.
+ @rtype: L{DistReporter}
+ """
+ if reactor is None:
+ from twisted.internet import reactor
+ result = self._makeResult()
+ count = suite.countTestCases()
+ self._stream.write("Running %d tests.\n" % (count,))
+
+ if not count:
+ # Take a shortcut if there is no test
+ suite.run(result.original)
+ self.writeResults(result)
+ return result
+
+ testDir, testDirLock = _unusedTestDirectory(
+ FilePath(self._workingDirectory))
+ workerNumber = min(count, self._workerNumber)
+ ampWorkers = [LocalWorkerAMP() for x in range(workerNumber)]
+ workers = self.createLocalWorkers(ampWorkers, testDir.path)
+ processEndDeferreds = [worker.endDeferred for worker in workers]
+ self.launchWorkerProcesses(reactor.spawnProcess, workers,
+ self._workerArguments)
+
+ def runTests():
+ testCases = iter(list(_iterateTests(suite)))
+
+ workerDeferreds = []
+ for worker in ampWorkers:
+ workerDeferreds.append(
+ self._driveWorker(worker, result, testCases,
+ cooperate=cooperate))
+ return DeferredList(workerDeferreds, consumeErrors=True,
+ fireOnOneErrback=True)
+
+ stopping = []
+
+ def nextRun(ign):
+ self.writeResults(result)
+ if not untilFailure:
+ return
+ if not result.wasSuccessful():
+ return
+ d = runTests()
+ return d.addCallback(nextRun)
+
+ def stop(ign):
+ testDirLock.unlock()
+ if not stopping:
+ stopping.append(None)
+ reactor.stop()
+
+ def beforeShutDown():
+ if not stopping:
+ stopping.append(None)
+ d = DeferredList(processEndDeferreds, consumeErrors=True)
+ return d.addCallback(continueShutdown)
+
+ def continueShutdown(ign):
+ self.writeResults(result)
+ return ign
+
+ d = runTests()
+ d.addCallback(nextRun)
+ d.addBoth(stop)
+
+ reactor.addSystemEventTrigger('before', 'shutdown', beforeShutDown)
+ reactor.run()
+
+ return result
+
+
+ def runUntilFailure(self, suite):
+ """
+ Run the tests with local worker processes until they fail.
+
+ @param suite: A tests suite to be run.
+ """
+ return self.run(suite, untilFailure=True)
diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/managercommands.py b/contrib/python/Twisted/py2/twisted/trial/_dist/managercommands.py
new file mode 100644
index 0000000000..4e76b81ddf
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/_dist/managercommands.py
@@ -0,0 +1,86 @@
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Commands for reporting test success of failure to the manager.
+
+@since: 12.3
+"""
+
+from twisted.protocols.amp import Command, String, Boolean, ListOf, Unicode
+from twisted.python.compat import _PY3
+
+NativeString = Unicode if _PY3 else String
+
+
+
+class AddSuccess(Command):
+ """
+ Add a success.
+ """
+ arguments = [(b'testName', NativeString())]
+ response = [(b'success', Boolean())]
+
+
+
+class AddError(Command):
+ """
+ Add an error.
+ """
+ arguments = [(b'testName', NativeString()),
+ (b'error', NativeString()),
+ (b'errorClass', NativeString()),
+ (b'frames', ListOf(NativeString()))]
+ response = [(b'success', Boolean())]
+
+
+
+class AddFailure(Command):
+ """
+ Add a failure.
+ """
+ arguments = [(b'testName', NativeString()),
+ (b'fail', NativeString()),
+ (b'failClass', NativeString()),
+ (b'frames', ListOf(NativeString()))]
+ response = [(b'success', Boolean())]
+
+
+
+class AddSkip(Command):
+ """
+ Add a skip.
+ """
+ arguments = [(b'testName', NativeString()),
+ (b'reason', NativeString())]
+ response = [(b'success', Boolean())]
+
+
+
+class AddExpectedFailure(Command):
+ """
+ Add an expected failure.
+ """
+ arguments = [(b'testName', NativeString()),
+ (b'error', NativeString()),
+ (b'todo', NativeString())]
+ response = [(b'success', Boolean())]
+
+
+
+class AddUnexpectedSuccess(Command):
+ """
+ Add an unexpected success.
+ """
+ arguments = [(b'testName', NativeString()),
+ (b'todo', NativeString())]
+ response = [(b'success', Boolean())]
+
+
+
+class TestWrite(Command):
+ """
+ Write test log.
+ """
+ arguments = [(b'out', NativeString())]
+ response = [(b'success', Boolean())]
diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/options.py b/contrib/python/Twisted/py2/twisted/trial/_dist/options.py
new file mode 100644
index 0000000000..ee5ccd887a
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/_dist/options.py
@@ -0,0 +1,30 @@
+# -*- test-case-name: twisted.trial._dist.test.test_options -*-
+#
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Options handling specific to trial's workers.
+
+@since: 12.3
+"""
+
+from twisted.python.filepath import FilePath
+from twisted.python.usage import Options
+from twisted.scripts.trial import _BasicOptions
+from twisted.application.app import ReactorSelectionMixin
+
+
+
+class WorkerOptions(_BasicOptions, Options, ReactorSelectionMixin):
+ """
+ Options forwarded to the trial distributed worker.
+ """
+
+
+ def coverdir(self):
+ """
+ Return a L{FilePath} representing the directory into which coverage
+ results should be written.
+ """
+ return FilePath('coverage')
diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/worker.py b/contrib/python/Twisted/py2/twisted/trial/_dist/worker.py
new file mode 100644
index 0000000000..ef13c069d6
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/_dist/worker.py
@@ -0,0 +1,333 @@
+# -*- test-case-name: twisted.trial._dist.test.test_worker -*-
+#
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+This module implements the worker classes.
+
+@since: 12.3
+"""
+
+import os
+
+from zope.interface import implementer
+
+from twisted.internet.protocol import ProcessProtocol
+from twisted.internet.interfaces import ITransport, IAddress
+from twisted.internet.defer import Deferred
+from twisted.protocols.amp import AMP
+from twisted.python.failure import Failure
+from twisted.python.reflect import namedObject
+from twisted.trial.unittest import Todo
+from twisted.trial.runner import TrialSuite, TestLoader
+from twisted.trial._dist import workercommands, managercommands
+from twisted.trial._dist import _WORKER_AMP_STDIN, _WORKER_AMP_STDOUT
+from twisted.trial._dist.workerreporter import WorkerReporter
+
+
+
+class WorkerProtocol(AMP):
+ """
+ The worker-side trial distributed protocol.
+ """
+
+ def __init__(self, forceGarbageCollection=False):
+ self._loader = TestLoader()
+ self._result = WorkerReporter(self)
+ self._forceGarbageCollection = forceGarbageCollection
+
+
+ def run(self, testCase):
+ """
+ Run a test case by name.
+ """
+ case = self._loader.loadByName(testCase)
+ suite = TrialSuite([case], self._forceGarbageCollection)
+ suite.run(self._result)
+ return {'success': True}
+
+ workercommands.Run.responder(run)
+
+
+ def start(self, directory):
+ """
+ Set up the worker, moving into given directory for tests to run in
+ them.
+ """
+ os.chdir(directory)
+ return {'success': True}
+
+ workercommands.Start.responder(start)
+
+
+
+class LocalWorkerAMP(AMP):
+ """
+ Local implementation of the manager commands.
+ """
+
+ def addSuccess(self, testName):
+ """
+ Add a success to the reporter.
+ """
+ self._result.addSuccess(self._testCase)
+ return {'success': True}
+
+ managercommands.AddSuccess.responder(addSuccess)
+
+
+ def _buildFailure(self, error, errorClass, frames):
+ """
+ Helper to build a C{Failure} with some traceback.
+
+ @param error: An C{Exception} instance.
+
+ @param error: The class name of the C{error} class.
+
+ @param frames: A flat list of strings representing the information need
+ to approximatively rebuild C{Failure} frames.
+
+ @return: A L{Failure} instance with enough information about a test
+ error.
+ """
+ errorType = namedObject(errorClass)
+ failure = Failure(error, errorType)
+ for i in range(0, len(frames), 3):
+ failure.frames.append(
+ (frames[i], frames[i + 1], int(frames[i + 2]), [], []))
+ return failure
+
+
+ def addError(self, testName, error, errorClass, frames):
+ """
+ Add an error to the reporter.
+ """
+ failure = self._buildFailure(error, errorClass, frames)
+ self._result.addError(self._testCase, failure)
+ return {'success': True}
+
+ managercommands.AddError.responder(addError)
+
+
+ def addFailure(self, testName, fail, failClass, frames):
+ """
+ Add a failure to the reporter.
+ """
+ failure = self._buildFailure(fail, failClass, frames)
+ self._result.addFailure(self._testCase, failure)
+ return {'success': True}
+
+ managercommands.AddFailure.responder(addFailure)
+
+
+ def addSkip(self, testName, reason):
+ """
+ Add a skip to the reporter.
+ """
+ self._result.addSkip(self._testCase, reason)
+ return {'success': True}
+
+ managercommands.AddSkip.responder(addSkip)
+
+
+ def addExpectedFailure(self, testName, error, todo):
+ """
+ Add an expected failure to the reporter.
+ """
+ _todo = Todo(todo)
+ self._result.addExpectedFailure(self._testCase, error, _todo)
+ return {'success': True}
+
+ managercommands.AddExpectedFailure.responder(addExpectedFailure)
+
+
+ def addUnexpectedSuccess(self, testName, todo):
+ """
+ Add an unexpected success to the reporter.
+ """
+ self._result.addUnexpectedSuccess(self._testCase, todo)
+ return {'success': True}
+
+ managercommands.AddUnexpectedSuccess.responder(addUnexpectedSuccess)
+
+
+ def testWrite(self, out):
+ """
+ Print test output from the worker.
+ """
+ self._testStream.write(out + '\n')
+ self._testStream.flush()
+ return {'success': True}
+
+ managercommands.TestWrite.responder(testWrite)
+
+
+ def _stopTest(self, result):
+ """
+ Stop the current running test case, forwarding the result.
+ """
+ self._result.stopTest(self._testCase)
+ return result
+
+
+ def run(self, testCase, result):
+ """
+ Run a test.
+ """
+ self._testCase = testCase
+ self._result = result
+ self._result.startTest(testCase)
+ testCaseId = testCase.id()
+ d = self.callRemote(workercommands.Run, testCase=testCaseId)
+ return d.addCallback(self._stopTest)
+
+
+ def setTestStream(self, stream):
+ """
+ Set the stream used to log output from tests.
+ """
+ self._testStream = stream
+
+
+
+@implementer(IAddress)
+class LocalWorkerAddress(object):
+ """
+ A L{IAddress} implementation meant to provide stub addresses for
+ L{ITransport.getPeer} and L{ITransport.getHost}.
+ """
+
+
+
+@implementer(ITransport)
+class LocalWorkerTransport(object):
+ """
+ A stub transport implementation used to support L{AMP} over a
+ L{ProcessProtocol} transport.
+ """
+
+ def __init__(self, transport):
+ self._transport = transport
+
+
+ def write(self, data):
+ """
+ Forward data to transport.
+ """
+ self._transport.writeToChild(_WORKER_AMP_STDIN, data)
+
+
+ def writeSequence(self, sequence):
+ """
+ Emulate C{writeSequence} by iterating data in the C{sequence}.
+ """
+ for data in sequence:
+ self._transport.writeToChild(_WORKER_AMP_STDIN, data)
+
+
+ def loseConnection(self):
+ """
+ Closes the transport.
+ """
+ self._transport.loseConnection()
+
+
+ def getHost(self):
+ """
+ Return a L{LocalWorkerAddress} instance.
+ """
+ return LocalWorkerAddress()
+
+
+ def getPeer(self):
+ """
+ Return a L{LocalWorkerAddress} instance.
+ """
+ return LocalWorkerAddress()
+
+
+
+class LocalWorker(ProcessProtocol):
+ """
+ Local process worker protocol. This worker runs as a local process and
+ communicates via stdin/out.
+
+ @ivar _ampProtocol: The L{AMP} protocol instance used to communicate with
+ the worker.
+
+ @ivar _logDirectory: The directory where logs will reside.
+
+ @ivar _logFile: The name of the main log file for tests output.
+ """
+
+ def __init__(self, ampProtocol, logDirectory, logFile):
+ self._ampProtocol = ampProtocol
+ self._logDirectory = logDirectory
+ self._logFile = logFile
+ self.endDeferred = Deferred()
+
+
+ def connectionMade(self):
+ """
+ When connection is made, create the AMP protocol instance.
+ """
+ self._ampProtocol.makeConnection(LocalWorkerTransport(self.transport))
+ if not os.path.exists(self._logDirectory):
+ os.makedirs(self._logDirectory)
+ self._outLog = open(os.path.join(self._logDirectory, 'out.log'), 'wb')
+ self._errLog = open(os.path.join(self._logDirectory, 'err.log'), 'wb')
+ self._testLog = open(
+ os.path.join(self._logDirectory, self._logFile), 'w')
+ self._ampProtocol.setTestStream(self._testLog)
+ logDirectory = self._logDirectory
+ d = self._ampProtocol.callRemote(workercommands.Start,
+ directory=logDirectory)
+ # Ignore the potential errors, the test suite will fail properly and it
+ # would just print garbage.
+ d.addErrback(lambda x: None)
+
+
+ def connectionLost(self, reason):
+ """
+ On connection lost, close the log files that we're managing for stdin
+ and stdout.
+ """
+ self._outLog.close()
+ self._errLog.close()
+ self._testLog.close()
+
+
+ def processEnded(self, reason):
+ """
+ When the process closes, call C{connectionLost} for cleanup purposes
+ and forward the information to the C{_ampProtocol}.
+ """
+ self.connectionLost(reason)
+ self._ampProtocol.connectionLost(reason)
+ self.endDeferred.callback(reason)
+
+
+ def outReceived(self, data):
+ """
+ Send data received from stdout to log.
+ """
+
+ self._outLog.write(data)
+
+
+ def errReceived(self, data):
+ """
+ Write error data to log.
+ """
+ self._errLog.write(data)
+
+
+ def childDataReceived(self, childFD, data):
+ """
+ Handle data received on the specific pipe for the C{_ampProtocol}.
+ """
+ if childFD == _WORKER_AMP_STDOUT:
+ self._ampProtocol.dataReceived(data)
+ else:
+ ProcessProtocol.childDataReceived(self, childFD, data)
diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/workercommands.py b/contrib/python/Twisted/py2/twisted/trial/_dist/workercommands.py
new file mode 100644
index 0000000000..517cd67025
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/_dist/workercommands.py
@@ -0,0 +1,31 @@
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Commands for telling a worker to load tests or run tests.
+
+@since: 12.3
+"""
+
+from twisted.protocols.amp import Command, String, Boolean, Unicode
+from twisted.python.compat import _PY3
+
+NativeString = Unicode if _PY3 else String
+
+
+
+class Run(Command):
+ """
+ Run a test.
+ """
+ arguments = [(b'testCase', NativeString())]
+ response = [(b'success', Boolean())]
+
+
+
+class Start(Command):
+ """
+ Set up the worker process, giving the running directory.
+ """
+ arguments = [(b'directory', NativeString())]
+ response = [(b'success', Boolean())]
diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/workerreporter.py b/contrib/python/Twisted/py2/twisted/trial/_dist/workerreporter.py
new file mode 100644
index 0000000000..ce82c59994
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/_dist/workerreporter.py
@@ -0,0 +1,154 @@
+# -*- test-case-name: twisted.trial._dist.test.test_workerreporter -*-
+#
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Test reporter forwarding test results over trial distributed AMP commands.
+
+@since: 12.3
+"""
+
+from twisted.python.failure import Failure
+from twisted.python.reflect import qual
+from twisted.trial.reporter import TestResult
+from twisted.trial._dist import managercommands
+
+
+
+class WorkerReporter(TestResult):
+ """
+ Reporter for trial's distributed workers. We send things not through a
+ stream, but through an C{AMP} protocol's C{callRemote} method.
+
+ @ivar _DEFAULT_TODO: Default message for expected failures and
+ unexpected successes, used only if a C{Todo} is not provided.
+ """
+
+ _DEFAULT_TODO = 'Test expected to fail'
+
+ def __init__(self, ampProtocol):
+ """
+ @param ampProtocol: The communication channel with the trial
+ distributed manager which collects all test results.
+ @type ampProtocol: C{AMP}
+ """
+ super(WorkerReporter, self).__init__()
+ self.ampProtocol = ampProtocol
+
+
+ def _getFailure(self, error):
+ """
+ Convert a C{sys.exc_info()}-style tuple to a L{Failure}, if necessary.
+ """
+ if isinstance(error, tuple):
+ return Failure(error[1], error[0], error[2])
+ return error
+
+
+ def _getFrames(self, failure):
+ """
+ Extract frames from a C{Failure} instance.
+ """
+ frames = []
+ for frame in failure.frames:
+ frames.extend([frame[0], frame[1], str(frame[2])])
+ return frames
+
+
+ def addSuccess(self, test):
+ """
+ Send a success over.
+ """
+ super(WorkerReporter, self).addSuccess(test)
+ testName = test.id()
+ self.ampProtocol.callRemote(managercommands.AddSuccess,
+ testName=testName)
+
+
+ def addError(self, test, error):
+ """
+ Send an error over.
+ """
+ super(WorkerReporter, self).addError(test, error)
+ testName = test.id()
+ failure = self._getFailure(error)
+ error = failure.getErrorMessage()
+ errorClass = qual(failure.type)
+ frames = [frame for frame in self._getFrames(failure)]
+ self.ampProtocol.callRemote(managercommands.AddError,
+ testName=testName,
+ error=error,
+ errorClass=errorClass,
+ frames=frames)
+
+
+ def addFailure(self, test, fail):
+ """
+ Send a Failure over.
+ """
+ super(WorkerReporter, self).addFailure(test, fail)
+ testName = test.id()
+ failure = self._getFailure(fail)
+ fail = failure.getErrorMessage()
+ failClass = qual(failure.type)
+ frames = [frame for frame in self._getFrames(failure)]
+ self.ampProtocol.callRemote(managercommands.AddFailure,
+ testName=testName,
+ fail=fail,
+ failClass=failClass,
+ frames=frames)
+
+
+ def addSkip(self, test, reason):
+ """
+ Send a skip over.
+ """
+ super(WorkerReporter, self).addSkip(test, reason)
+ reason = str(reason)
+ testName = test.id()
+ self.ampProtocol.callRemote(managercommands.AddSkip,
+ testName=testName,
+ reason=reason)
+
+
+ def _getTodoReason(self, todo):
+ """
+ Get the reason for a C{Todo}.
+
+ If C{todo} is L{None}, return a sensible default.
+ """
+ if todo is None:
+ return self._DEFAULT_TODO
+ else:
+ return todo.reason
+
+
+ def addExpectedFailure(self, test, error, todo=None):
+ """
+ Send an expected failure over.
+ """
+ super(WorkerReporter, self).addExpectedFailure(test, error, todo)
+ errorMessage = error.getErrorMessage()
+ testName = test.id()
+ self.ampProtocol.callRemote(managercommands.AddExpectedFailure,
+ testName=testName,
+ error=errorMessage,
+ todo=self._getTodoReason(todo))
+
+
+ def addUnexpectedSuccess(self, test, todo=None):
+ """
+ Send an unexpected success over.
+ """
+ super(WorkerReporter, self).addUnexpectedSuccess(test, todo)
+ testName = test.id()
+ self.ampProtocol.callRemote(managercommands.AddUnexpectedSuccess,
+ testName=testName,
+ todo=self._getTodoReason(todo))
+
+
+ def printSummary(self):
+ """
+ I{Don't} print a summary
+ """
diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/workertrial.py b/contrib/python/Twisted/py2/twisted/trial/_dist/workertrial.py
new file mode 100644
index 0000000000..dd14dd61ef
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/_dist/workertrial.py
@@ -0,0 +1,111 @@
+# -*- test-case-name: twisted.trial._dist.test.test_workertrial -*-
+#
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Implementation of C{AMP} worker commands, and main executable entry point for
+the workers.
+
+@since: 12.3
+"""
+
+import sys
+import os
+import errno
+
+
+
+def _setupPath(environ):
+ """
+ Override C{sys.path} with what the parent passed in B{TRIAL_PYTHONPATH}.
+
+ @see: twisted.trial._dist.disttrial.DistTrialRunner.launchWorkerProcesses
+ """
+ if 'TRIAL_PYTHONPATH' in environ:
+ sys.path[:] = environ['TRIAL_PYTHONPATH'].split(os.pathsep)
+
+
+_setupPath(os.environ)
+
+
+from twisted.internet.protocol import FileWrapper
+from twisted.python.log import startLoggingWithObserver, textFromEventDict
+from twisted.trial._dist.options import WorkerOptions
+from twisted.trial._dist import _WORKER_AMP_STDIN, _WORKER_AMP_STDOUT
+
+
+
+class WorkerLogObserver(object):
+ """
+ A log observer that forward its output to a C{AMP} protocol.
+ """
+
+ def __init__(self, protocol):
+ """
+ @param protocol: a connected C{AMP} protocol instance.
+ @type protocol: C{AMP}
+ """
+ self.protocol = protocol
+
+
+ def emit(self, eventDict):
+ """
+ Produce a log output.
+ """
+ from twisted.trial._dist import managercommands
+ text = textFromEventDict(eventDict)
+ if text is None:
+ return
+ self.protocol.callRemote(managercommands.TestWrite, out=text)
+
+
+
+def main(_fdopen=os.fdopen):
+ """
+ Main function to be run if __name__ == "__main__".
+
+ @param _fdopen: If specified, the function to use in place of C{os.fdopen}.
+ @param _fdopen: C{callable}
+ """
+ config = WorkerOptions()
+ config.parseOptions()
+
+ from twisted.trial._dist.worker import WorkerProtocol
+ workerProtocol = WorkerProtocol(config['force-gc'])
+
+ protocolIn = _fdopen(_WORKER_AMP_STDIN, 'rb')
+ protocolOut = _fdopen(_WORKER_AMP_STDOUT, 'wb')
+ workerProtocol.makeConnection(FileWrapper(protocolOut))
+
+ observer = WorkerLogObserver(workerProtocol)
+ startLoggingWithObserver(observer.emit, False)
+
+ while True:
+ try:
+ r = protocolIn.read(1)
+ except IOError as e:
+ if e.args[0] == errno.EINTR:
+ if sys.version_info < (3, 0):
+ sys.exc_clear()
+ continue
+ else:
+ raise
+ if r == b'':
+ break
+ else:
+ workerProtocol.dataReceived(r)
+ protocolOut.flush()
+ sys.stdout.flush()
+ sys.stderr.flush()
+
+ if config.tracer:
+ sys.settrace(None)
+ results = config.tracer.results()
+ results.write_results(show_missing=True, summary=False,
+ coverdir=config.coverdir().path)
+
+
+
+if __name__ == '__main__':
+ main()
diff --git a/contrib/python/Twisted/py2/twisted/trial/_synctest.py b/contrib/python/Twisted/py2/twisted/trial/_synctest.py
new file mode 100644
index 0000000000..2a76ca8e22
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/_synctest.py
@@ -0,0 +1,1416 @@
+# -*- test-case-name: twisted.trial.test -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Things likely to be used by writers of unit tests.
+
+Maintainer: Jonathan Lange
+"""
+
+from __future__ import division, absolute_import
+
+import inspect
+import os, warnings, sys, tempfile, types
+from dis import findlinestarts as _findlinestarts
+
+from twisted.python import failure, log, monkey
+from twisted.python.reflect import fullyQualifiedName
+from twisted.python.util import runWithWarningsSuppressed
+from twisted.python.deprecate import (
+ getDeprecationWarningString, warnAboutFunction
+)
+from twisted.internet.defer import ensureDeferred
+
+from twisted.trial import itrial, util
+
+import unittest as pyunit
+
+# Python 2.7 and higher has skip support built-in
+SkipTest = pyunit.SkipTest
+
+
+
+class FailTest(AssertionError):
+ """
+ Raised to indicate the current test has failed to pass.
+ """
+
+
+
+class Todo(object):
+ """
+ Internal object used to mark a L{TestCase} as 'todo'. Tests marked 'todo'
+ are reported differently in Trial L{TestResult}s. If todo'd tests fail,
+ they do not fail the suite and the errors are reported in a separate
+ category. If todo'd tests succeed, Trial L{TestResult}s will report an
+ unexpected success.
+ """
+
+ def __init__(self, reason, errors=None):
+ """
+ @param reason: A string explaining why the test is marked 'todo'
+
+ @param errors: An iterable of exception types that the test is
+ expected to raise. If one of these errors is raised by the test, it
+ will be trapped. Raising any other kind of error will fail the test.
+ If L{None} is passed, then all errors will be trapped.
+ """
+ self.reason = reason
+ self.errors = errors
+
+
+ def __repr__(self):
+ return "<Todo reason=%r errors=%r>" % (self.reason, self.errors)
+
+
+ def expected(self, failure):
+ """
+ @param failure: A L{twisted.python.failure.Failure}.
+
+ @return: C{True} if C{failure} is expected, C{False} otherwise.
+ """
+ if self.errors is None:
+ return True
+ for error in self.errors:
+ if failure.check(error):
+ return True
+ return False
+
+
+
+def makeTodo(value):
+ """
+ Return a L{Todo} object built from C{value}.
+
+ If C{value} is a string, return a Todo that expects any exception with
+ C{value} as a reason. If C{value} is a tuple, the second element is used
+ as the reason and the first element as the excepted error(s).
+
+ @param value: A string or a tuple of C{(errors, reason)}, where C{errors}
+ is either a single exception class or an iterable of exception classes.
+
+ @return: A L{Todo} object.
+ """
+ if isinstance(value, str):
+ return Todo(reason=value)
+ if isinstance(value, tuple):
+ errors, reason = value
+ try:
+ errors = list(errors)
+ except TypeError:
+ errors = [errors]
+ return Todo(reason=reason, errors=errors)
+
+
+
+class _Warning(object):
+ """
+ A L{_Warning} instance represents one warning emitted through the Python
+ warning system (L{warnings}). This is used to insulate callers of
+ L{_collectWarnings} from changes to the Python warnings system which might
+ otherwise require changes to the warning objects that function passes to
+ the observer object it accepts.
+
+ @ivar message: The string which was passed as the message parameter to
+ L{warnings.warn}.
+
+ @ivar category: The L{Warning} subclass which was passed as the category
+ parameter to L{warnings.warn}.
+
+ @ivar filename: The name of the file containing the definition of the code
+ object which was C{stacklevel} frames above the call to
+ L{warnings.warn}, where C{stacklevel} is the value of the C{stacklevel}
+ parameter passed to L{warnings.warn}.
+
+ @ivar lineno: The source line associated with the active instruction of the
+ code object object which was C{stacklevel} frames above the call to
+ L{warnings.warn}, where C{stacklevel} is the value of the C{stacklevel}
+ parameter passed to L{warnings.warn}.
+ """
+ def __init__(self, message, category, filename, lineno):
+ self.message = message
+ self.category = category
+ self.filename = filename
+ self.lineno = lineno
+
+
+
+def _setWarningRegistryToNone(modules):
+ """
+ Disable the per-module cache for every module found in C{modules}, typically
+ C{sys.modules}.
+
+ @param modules: Dictionary of modules, typically sys.module dict
+ """
+ for v in list(modules.values()):
+ if v is not None:
+ try:
+ v.__warningregistry__ = None
+ except:
+ # Don't specify a particular exception type to handle in case
+ # some wacky object raises some wacky exception in response to
+ # the setattr attempt.
+ pass
+
+
+
+def _collectWarnings(observeWarning, f, *args, **kwargs):
+ """
+ Call C{f} with C{args} positional arguments and C{kwargs} keyword arguments
+ and collect all warnings which are emitted as a result in a list.
+
+ @param observeWarning: A callable which will be invoked with a L{_Warning}
+ instance each time a warning is emitted.
+
+ @return: The return value of C{f(*args, **kwargs)}.
+ """
+ def showWarning(message, category, filename, lineno, file=None, line=None):
+ assert isinstance(message, Warning)
+ observeWarning(_Warning(
+ str(message), category, filename, lineno))
+
+ # Disable the per-module cache for every module otherwise if the warning
+ # which the caller is expecting us to collect was already emitted it won't
+ # be re-emitted by the call to f which happens below.
+ _setWarningRegistryToNone(sys.modules)
+
+ origFilters = warnings.filters[:]
+ origShow = warnings.showwarning
+ warnings.simplefilter('always')
+ try:
+ warnings.showwarning = showWarning
+ result = f(*args, **kwargs)
+ finally:
+ warnings.filters[:] = origFilters
+ warnings.showwarning = origShow
+ return result
+
+
+
+class UnsupportedTrialFeature(Exception):
+ """A feature of twisted.trial was used that pyunit cannot support."""
+
+
+
+class PyUnitResultAdapter(object):
+ """
+ Wrap a C{TestResult} from the standard library's C{unittest} so that it
+ supports the extended result types from Trial, and also supports
+ L{twisted.python.failure.Failure}s being passed to L{addError} and
+ L{addFailure}.
+ """
+
+ def __init__(self, original):
+ """
+ @param original: A C{TestResult} instance from C{unittest}.
+ """
+ self.original = original
+
+
+ def _exc_info(self, err):
+ return util.excInfoOrFailureToExcInfo(err)
+
+
+ def startTest(self, method):
+ self.original.startTest(method)
+
+
+ def stopTest(self, method):
+ self.original.stopTest(method)
+
+
+ def addFailure(self, test, fail):
+ self.original.addFailure(test, self._exc_info(fail))
+
+
+ def addError(self, test, error):
+ self.original.addError(test, self._exc_info(error))
+
+
+ def _unsupported(self, test, feature, info):
+ self.original.addFailure(
+ test,
+ (UnsupportedTrialFeature,
+ UnsupportedTrialFeature(feature, info),
+ None))
+
+
+ def addSkip(self, test, reason):
+ """
+ Report the skip as a failure.
+ """
+ self.original.addSkip(test, reason)
+
+
+ def addUnexpectedSuccess(self, test, todo=None):
+ """
+ Report the unexpected success as a failure.
+ """
+ self._unsupported(test, 'unexpected success', todo)
+
+
+ def addExpectedFailure(self, test, error):
+ """
+ Report the expected failure (i.e. todo) as a failure.
+ """
+ self._unsupported(test, 'expected failure', error)
+
+
+ def addSuccess(self, test):
+ self.original.addSuccess(test)
+
+
+ def upDownError(self, method, error, warn, printStatus):
+ pass
+
+
+
+class _AssertRaisesContext(object):
+ """
+ A helper for implementing C{assertRaises}. This is a context manager and a
+ helper method to support the non-context manager version of
+ C{assertRaises}.
+
+ @ivar _testCase: See C{testCase} parameter of C{__init__}
+
+ @ivar _expected: See C{expected} parameter of C{__init__}
+
+ @ivar _returnValue: The value returned by the callable being tested (only
+ when not being used as a context manager).
+
+ @ivar _expectedName: A short string describing the expected exception
+ (usually the name of the exception class).
+
+ @ivar exception: The exception which was raised by the function being
+ tested (if it raised one).
+ """
+
+ def __init__(self, testCase, expected):
+ """
+ @param testCase: The L{TestCase} instance which is used to raise a
+ test-failing exception when that is necessary.
+
+ @param expected: The exception type expected to be raised.
+ """
+ self._testCase = testCase
+ self._expected = expected
+ self._returnValue = None
+ try:
+ self._expectedName = self._expected.__name__
+ except AttributeError:
+ self._expectedName = str(self._expected)
+
+
+ def _handle(self, obj):
+ """
+ Call the given object using this object as a context manager.
+
+ @param obj: The object to call and which is expected to raise some
+ exception.
+ @type obj: L{object}
+
+ @return: Whatever exception is raised by C{obj()}.
+ @rtype: L{BaseException}
+ """
+ with self as context:
+ self._returnValue = obj()
+ return context.exception
+
+
+ def __enter__(self):
+ return self
+
+
+ def __exit__(self, exceptionType, exceptionValue, traceback):
+ """
+ Check exit exception against expected exception.
+ """
+ # No exception raised.
+ if exceptionType is None:
+ self._testCase.fail(
+ "{0} not raised ({1} returned)".format(
+ self._expectedName, self._returnValue)
+ )
+
+ if not isinstance(exceptionValue, exceptionType):
+ # Support some Python 2.6 ridiculousness. Exceptions raised using
+ # the C API appear here as the arguments you might pass to the
+ # exception class to create an exception instance. So... do that
+ # to turn them into the instances.
+ if isinstance(exceptionValue, tuple):
+ exceptionValue = exceptionType(*exceptionValue)
+ else:
+ exceptionValue = exceptionType(exceptionValue)
+
+ # Store exception so that it can be access from context.
+ self.exception = exceptionValue
+
+ # Wrong exception raised.
+ if not issubclass(exceptionType, self._expected):
+ reason = failure.Failure(exceptionValue, exceptionType, traceback)
+ self._testCase.fail(
+ "{0} raised instead of {1}:\n {2}".format(
+ fullyQualifiedName(exceptionType),
+ self._expectedName, reason.getTraceback()),
+ )
+
+ # All good.
+ return True
+
+
+
+class _Assertions(pyunit.TestCase, object):
+ """
+ Replaces many of the built-in TestCase assertions. In general, these
+ assertions provide better error messages and are easier to use in
+ callbacks.
+ """
+
+ def fail(self, msg=None):
+ """
+ Absolutely fail the test. Do not pass go, do not collect $200.
+
+ @param msg: the message that will be displayed as the reason for the
+ failure
+ """
+ raise self.failureException(msg)
+
+
+ def assertFalse(self, condition, msg=None):
+ """
+ Fail the test if C{condition} evaluates to True.
+
+ @param condition: any object that defines __nonzero__
+ """
+ super(_Assertions, self).assertFalse(condition, msg)
+ return condition
+ assertNot = failUnlessFalse = failIf = assertFalse
+
+
+ def assertTrue(self, condition, msg=None):
+ """
+ Fail the test if C{condition} evaluates to False.
+
+ @param condition: any object that defines __nonzero__
+ """
+ super(_Assertions, self).assertTrue(condition, msg)
+ return condition
+ assert_ = failUnlessTrue = failUnless = assertTrue
+
+
+ def assertRaises(self, exception, f=None, *args, **kwargs):
+ """
+ Fail the test unless calling the function C{f} with the given
+ C{args} and C{kwargs} raises C{exception}. The failure will report
+ the traceback and call stack of the unexpected exception.
+
+ @param exception: exception type that is to be expected
+ @param f: the function to call
+
+ @return: If C{f} is L{None}, a context manager which will make an
+ assertion about the exception raised from the suite it manages. If
+ C{f} is not L{None}, the exception raised by C{f}.
+
+ @raise self.failureException: Raised if the function call does
+ not raise an exception or if it raises an exception of a
+ different type.
+ """
+ context = _AssertRaisesContext(self, exception)
+ if f is None:
+ return context
+
+ return context._handle(lambda: f(*args, **kwargs))
+ failUnlessRaises = assertRaises
+
+
+ def assertEqual(self, first, second, msg=None):
+ """
+ Fail the test if C{first} and C{second} are not equal.
+
+ @param msg: A string describing the failure that's included in the
+ exception.
+ """
+ super(_Assertions, self).assertEqual(first, second, msg)
+ return first
+ failUnlessEqual = failUnlessEquals = assertEquals = assertEqual
+
+
+ def assertIs(self, first, second, msg=None):
+ """
+ Fail the test if C{first} is not C{second}. This is an
+ obect-identity-equality test, not an object equality
+ (i.e. C{__eq__}) test.
+
+ @param msg: if msg is None, then the failure message will be
+ '%r is not %r' % (first, second)
+ """
+ if first is not second:
+ raise self.failureException(msg or '%r is not %r' % (first, second))
+ return first
+ failUnlessIdentical = assertIdentical = assertIs
+
+
+ def assertIsNot(self, first, second, msg=None):
+ """
+ Fail the test if C{first} is C{second}. This is an
+ obect-identity-equality test, not an object equality
+ (i.e. C{__eq__}) test.
+
+ @param msg: if msg is None, then the failure message will be
+ '%r is %r' % (first, second)
+ """
+ if first is second:
+ raise self.failureException(msg or '%r is %r' % (first, second))
+ return first
+ failIfIdentical = assertNotIdentical = assertIsNot
+
+
+ def assertNotEqual(self, first, second, msg=None):
+ """
+ Fail the test if C{first} == C{second}.
+
+ @param msg: if msg is None, then the failure message will be
+ '%r == %r' % (first, second)
+ """
+ if not first != second:
+ raise self.failureException(msg or '%r == %r' % (first, second))
+ return first
+ assertNotEquals = failIfEquals = failIfEqual = assertNotEqual
+
+
+ def assertIn(self, containee, container, msg=None):
+ """
+ Fail the test if C{containee} is not found in C{container}.
+
+ @param containee: the value that should be in C{container}
+ @param container: a sequence type, or in the case of a mapping type,
+ will follow semantics of 'if key in dict.keys()'
+ @param msg: if msg is None, then the failure message will be
+ '%r not in %r' % (first, second)
+ """
+ if containee not in container:
+ raise self.failureException(msg or "%r not in %r"
+ % (containee, container))
+ return containee
+ failUnlessIn = assertIn
+
+
+ def assertNotIn(self, containee, container, msg=None):
+ """
+ Fail the test if C{containee} is found in C{container}.
+
+ @param containee: the value that should not be in C{container}
+ @param container: a sequence type, or in the case of a mapping type,
+ will follow semantics of 'if key in dict.keys()'
+ @param msg: if msg is None, then the failure message will be
+ '%r in %r' % (first, second)
+ """
+ if containee in container:
+ raise self.failureException(msg or "%r in %r"
+ % (containee, container))
+ return containee
+ failIfIn = assertNotIn
+
+
+ def assertNotAlmostEqual(self, first, second, places=7, msg=None):
+ """
+ Fail if the two objects are equal as determined by their
+ difference rounded to the given number of decimal places
+ (default 7) and comparing to zero.
+
+ @note: decimal places (from zero) is usually not the same
+ as significant digits (measured from the most
+ significant digit).
+
+ @note: included for compatibility with PyUnit test cases
+ """
+ if round(second-first, places) == 0:
+ raise self.failureException(msg or '%r == %r within %r places'
+ % (first, second, places))
+ return first
+ assertNotAlmostEquals = failIfAlmostEqual = assertNotAlmostEqual
+ failIfAlmostEquals = assertNotAlmostEqual
+
+
+ def assertAlmostEqual(self, first, second, places=7, msg=None):
+ """
+ Fail if the two objects are unequal as determined by their
+ difference rounded to the given number of decimal places
+ (default 7) and comparing to zero.
+
+ @note: decimal places (from zero) is usually not the same
+ as significant digits (measured from the most
+ significant digit).
+
+ @note: included for compatibility with PyUnit test cases
+ """
+ if round(second-first, places) != 0:
+ raise self.failureException(msg or '%r != %r within %r places'
+ % (first, second, places))
+ return first
+ assertAlmostEquals = failUnlessAlmostEqual = assertAlmostEqual
+ failUnlessAlmostEquals = assertAlmostEqual
+
+
+ def assertApproximates(self, first, second, tolerance, msg=None):
+ """
+ Fail if C{first} - C{second} > C{tolerance}
+
+ @param msg: if msg is None, then the failure message will be
+ '%r ~== %r' % (first, second)
+ """
+ if abs(first - second) > tolerance:
+ raise self.failureException(msg or "%s ~== %s" % (first, second))
+ return first
+ failUnlessApproximates = assertApproximates
+
+
+ def assertSubstring(self, substring, astring, msg=None):
+ """
+ Fail if C{substring} does not exist within C{astring}.
+ """
+ return self.failUnlessIn(substring, astring, msg)
+ failUnlessSubstring = assertSubstring
+
+
+ def assertNotSubstring(self, substring, astring, msg=None):
+ """
+ Fail if C{astring} contains C{substring}.
+ """
+ return self.failIfIn(substring, astring, msg)
+ failIfSubstring = assertNotSubstring
+
+
+ def assertWarns(self, category, message, filename, f,
+ *args, **kwargs):
+ """
+ Fail if the given function doesn't generate the specified warning when
+ called. It calls the function, checks the warning, and forwards the
+ result of the function if everything is fine.
+
+ @param category: the category of the warning to check.
+ @param message: the output message of the warning to check.
+ @param filename: the filename where the warning should come from.
+ @param f: the function which is supposed to generate the warning.
+ @type f: any callable.
+ @param args: the arguments to C{f}.
+ @param kwargs: the keywords arguments to C{f}.
+
+ @return: the result of the original function C{f}.
+ """
+ warningsShown = []
+ result = _collectWarnings(warningsShown.append, f, *args, **kwargs)
+
+ if not warningsShown:
+ self.fail("No warnings emitted")
+ first = warningsShown[0]
+ for other in warningsShown[1:]:
+ if ((other.message, other.category)
+ != (first.message, first.category)):
+ self.fail("Can't handle different warnings")
+ self.assertEqual(first.message, message)
+ self.assertIdentical(first.category, category)
+
+ # Use starts with because of .pyc/.pyo issues.
+ self.assertTrue(
+ filename.startswith(first.filename),
+ 'Warning in %r, expected %r' % (first.filename, filename))
+
+ # It would be nice to be able to check the line number as well, but
+ # different configurations actually end up reporting different line
+ # numbers (generally the variation is only 1 line, but that's enough
+ # to fail the test erroneously...).
+ # self.assertEqual(lineno, xxx)
+
+ return result
+ failUnlessWarns = assertWarns
+
+
+ def assertIsInstance(self, instance, classOrTuple, message=None):
+ """
+ Fail if C{instance} is not an instance of the given class or of
+ one of the given classes.
+
+ @param instance: the object to test the type (first argument of the
+ C{isinstance} call).
+ @type instance: any.
+ @param classOrTuple: the class or classes to test against (second
+ argument of the C{isinstance} call).
+ @type classOrTuple: class, type, or tuple.
+
+ @param message: Custom text to include in the exception text if the
+ assertion fails.
+ """
+ if not isinstance(instance, classOrTuple):
+ if message is None:
+ suffix = ""
+ else:
+ suffix = ": " + message
+ self.fail("%r is not an instance of %s%s" % (
+ instance, classOrTuple, suffix))
+ failUnlessIsInstance = assertIsInstance
+
+
+ def assertNotIsInstance(self, instance, classOrTuple):
+ """
+ Fail if C{instance} is an instance of the given class or of one of the
+ given classes.
+
+ @param instance: the object to test the type (first argument of the
+ C{isinstance} call).
+ @type instance: any.
+ @param classOrTuple: the class or classes to test against (second
+ argument of the C{isinstance} call).
+ @type classOrTuple: class, type, or tuple.
+ """
+ if isinstance(instance, classOrTuple):
+ self.fail("%r is an instance of %s" % (instance, classOrTuple))
+ failIfIsInstance = assertNotIsInstance
+
+
+ def successResultOf(self, deferred):
+ """
+ Return the current success result of C{deferred} or raise
+ C{self.failureException}.
+
+ @param deferred: A L{Deferred<twisted.internet.defer.Deferred>} which
+ has a success result. This means
+ L{Deferred.callback<twisted.internet.defer.Deferred.callback>} or
+ L{Deferred.errback<twisted.internet.defer.Deferred.errback>} has
+ been called on it and it has reached the end of its callback chain
+ and the last callback or errback returned a non-L{failure.Failure}.
+ @type deferred: L{Deferred<twisted.internet.defer.Deferred>}
+
+ @raise SynchronousTestCase.failureException: If the
+ L{Deferred<twisted.internet.defer.Deferred>} has no result or has a
+ failure result.
+
+ @return: The result of C{deferred}.
+ """
+ deferred = ensureDeferred(deferred)
+ result = []
+ deferred.addBoth(result.append)
+
+ if not result:
+ self.fail(
+ "Success result expected on {!r}, found no result instead"
+ .format(deferred)
+ )
+
+ result = result[0]
+
+ if isinstance(result, failure.Failure):
+ self.fail(
+ "Success result expected on {!r}, "
+ "found failure result instead:\n{}"
+ .format(deferred, result.getTraceback())
+ )
+
+ return result
+
+
+ def failureResultOf(self, deferred, *expectedExceptionTypes):
+ """
+ Return the current failure result of C{deferred} or raise
+ C{self.failureException}.
+
+ @param deferred: A L{Deferred<twisted.internet.defer.Deferred>} which
+ has a failure result. This means
+ L{Deferred.callback<twisted.internet.defer.Deferred.callback>} or
+ L{Deferred.errback<twisted.internet.defer.Deferred.errback>} has
+ been called on it and it has reached the end of its callback chain
+ and the last callback or errback raised an exception or returned a
+ L{failure.Failure}.
+ @type deferred: L{Deferred<twisted.internet.defer.Deferred>}
+
+ @param expectedExceptionTypes: Exception types to expect - if
+ provided, and the exception wrapped by the failure result is
+ not one of the types provided, then this test will fail.
+
+ @raise SynchronousTestCase.failureException: If the
+ L{Deferred<twisted.internet.defer.Deferred>} has no result, has a
+ success result, or has an unexpected failure result.
+
+ @return: The failure result of C{deferred}.
+ @rtype: L{failure.Failure}
+ """
+ deferred = ensureDeferred(deferred)
+ result = []
+ deferred.addBoth(result.append)
+
+ if not result:
+ self.fail(
+ "Failure result expected on {!r}, found no result instead"
+ .format(deferred)
+ )
+
+ result = result[0]
+
+ if not isinstance(result, failure.Failure):
+ self.fail(
+ "Failure result expected on {!r}, "
+ "found success result ({!r}) instead"
+ .format(deferred, result)
+ )
+
+ if (
+ expectedExceptionTypes and
+ not result.check(*expectedExceptionTypes)
+ ):
+ expectedString = " or ".join([
+ ".".join((t.__module__, t.__name__))
+ for t in expectedExceptionTypes
+ ])
+
+ self.fail(
+ "Failure of type ({}) expected on {!r}, "
+ "found type {!r} instead: {}"
+ .format(
+ expectedString, deferred, result.type,
+ result.getTraceback()
+ )
+ )
+
+ return result
+
+
+ def assertNoResult(self, deferred):
+ """
+ Assert that C{deferred} does not have a result at this point.
+
+ If the assertion succeeds, then the result of C{deferred} is left
+ unchanged. Otherwise, any L{failure.Failure} result is swallowed.
+
+ @param deferred: A L{Deferred<twisted.internet.defer.Deferred>} without
+ a result. This means that neither
+ L{Deferred.callback<twisted.internet.defer.Deferred.callback>} nor
+ L{Deferred.errback<twisted.internet.defer.Deferred.errback>} has
+ been called, or that the
+ L{Deferred<twisted.internet.defer.Deferred>} is waiting on another
+ L{Deferred<twisted.internet.defer.Deferred>} for a result.
+ @type deferred: L{Deferred<twisted.internet.defer.Deferred>}
+
+ @raise SynchronousTestCase.failureException: If the
+ L{Deferred<twisted.internet.defer.Deferred>} has a result.
+ """
+ deferred = ensureDeferred(deferred)
+ result = []
+
+ def cb(res):
+ result.append(res)
+ return res
+
+ deferred.addBoth(cb)
+
+ if result:
+ # If there is already a failure, the self.fail below will
+ # report it, so swallow it in the deferred
+ deferred.addErrback(lambda _: None)
+ self.fail(
+ "No result expected on {!r}, found {!r} instead"
+ .format(deferred, result[0])
+ )
+
+
+ def assertRegex(self, text, regex, msg=None):
+ """
+ Fail the test if a C{regexp} search of C{text} fails.
+
+ @param text: Text which is under test.
+ @type text: L{str}
+
+ @param regex: A regular expression object or a string containing a
+ regular expression suitable for use by re.search().
+ @type regex: L{str} or L{re.RegexObject}
+
+ @param msg: Text used as the error message on failure.
+ @type msg: L{str}
+ """
+ if sys.version_info[:2] > (2, 7):
+ super(_Assertions, self).assertRegex(text, regex, msg)
+ else:
+ # Python 2.7 has unittest.assertRegexpMatches() which was
+ # renamed to unittest.assertRegex() in Python 3.2
+ super(_Assertions, self).assertRegexpMatches(text, regex, msg)
+
+
+
+class _LogObserver(object):
+ """
+ Observes the Twisted logs and catches any errors.
+
+ @ivar _errors: A C{list} of L{Failure} instances which were received as
+ error events from the Twisted logging system.
+
+ @ivar _added: A C{int} giving the number of times C{_add} has been called
+ less the number of times C{_remove} has been called; used to only add
+ this observer to the Twisted logging since once, regardless of the
+ number of calls to the add method.
+
+ @ivar _ignored: A C{list} of exception types which will not be recorded.
+ """
+
+ def __init__(self):
+ self._errors = []
+ self._added = 0
+ self._ignored = []
+
+
+ def _add(self):
+ if self._added == 0:
+ log.addObserver(self.gotEvent)
+ self._added += 1
+
+
+ def _remove(self):
+ self._added -= 1
+ if self._added == 0:
+ log.removeObserver(self.gotEvent)
+
+
+ def _ignoreErrors(self, *errorTypes):
+ """
+ Do not store any errors with any of the given types.
+ """
+ self._ignored.extend(errorTypes)
+
+
+ def _clearIgnores(self):
+ """
+ Stop ignoring any errors we might currently be ignoring.
+ """
+ self._ignored = []
+
+
+ def flushErrors(self, *errorTypes):
+ """
+ Flush errors from the list of caught errors. If no arguments are
+ specified, remove all errors. If arguments are specified, only remove
+ errors of those types from the stored list.
+ """
+ if errorTypes:
+ flushed = []
+ remainder = []
+ for f in self._errors:
+ if f.check(*errorTypes):
+ flushed.append(f)
+ else:
+ remainder.append(f)
+ self._errors = remainder
+ else:
+ flushed = self._errors
+ self._errors = []
+ return flushed
+
+
+ def getErrors(self):
+ """
+ Return a list of errors caught by this observer.
+ """
+ return self._errors
+
+
+ def gotEvent(self, event):
+ """
+ The actual observer method. Called whenever a message is logged.
+
+ @param event: A dictionary containing the log message. Actual
+ structure undocumented (see source for L{twisted.python.log}).
+ """
+ if event.get('isError', False) and 'failure' in event:
+ f = event['failure']
+ if len(self._ignored) == 0 or not f.check(*self._ignored):
+ self._errors.append(f)
+
+
+
+_logObserver = _LogObserver()
+
+
+class SynchronousTestCase(_Assertions):
+ """
+ A unit test. The atom of the unit testing universe.
+
+ This class extends C{unittest.TestCase} from the standard library. A number
+ of convenient testing helpers are added, including logging and warning
+ integration, monkey-patching support, and more.
+
+ To write a unit test, subclass C{SynchronousTestCase} and define a method
+ (say, 'test_foo') on the subclass. To run the test, instantiate your
+ subclass with the name of the method, and call L{run} on the instance,
+ passing a L{TestResult} object.
+
+ The C{trial} script will automatically find any C{SynchronousTestCase}
+ subclasses defined in modules beginning with 'test_' and construct test
+ cases for all methods beginning with 'test'.
+
+ If an error is logged during the test run, the test will fail with an
+ error. See L{log.err}.
+
+ @ivar failureException: An exception class, defaulting to C{FailTest}. If
+ the test method raises this exception, it will be reported as a failure,
+ rather than an exception. All of the assertion methods raise this if the
+ assertion fails.
+
+ @ivar skip: L{None} or a string explaining why this test is to be
+ skipped. If defined, the test will not be run. Instead, it will be
+ reported to the result object as 'skipped' (if the C{TestResult} supports
+ skipping).
+
+ @ivar todo: L{None}, a string or a tuple of C{(errors, reason)} where
+ C{errors} is either an exception class or an iterable of exception
+ classes, and C{reason} is a string. See L{Todo} or L{makeTodo} for more
+ information.
+
+ @ivar suppress: L{None} or a list of tuples of C{(args, kwargs)} to be
+ passed to C{warnings.filterwarnings}. Use these to suppress warnings
+ raised in a test. Useful for testing deprecated code. See also
+ L{util.suppress}.
+ """
+ failureException = FailTest
+
+ def __init__(self, methodName='runTest'):
+ super(SynchronousTestCase, self).__init__(methodName)
+ self._passed = False
+ self._cleanups = []
+ self._testMethodName = methodName
+ testMethod = getattr(self, methodName)
+ self._parents = [
+ testMethod, self, sys.modules.get(self.__class__.__module__)]
+
+
+ def __eq__(self, other):
+ """
+ Override the comparison defined by the base TestCase which considers
+ instances of the same class with the same _testMethodName to be
+ equal. Since trial puts TestCase instances into a set, that
+ definition of comparison makes it impossible to run the same test
+ method twice. Most likely, trial should stop using a set to hold
+ tests, but until it does, this is necessary on Python 2.6. -exarkun
+ """
+ return self is other
+
+
+ def __ne__(self, other):
+ return self is not other
+
+
+ def __hash__(self):
+ return hash((self.__class__, self._testMethodName))
+
+
+ def shortDescription(self):
+ desc = super(SynchronousTestCase, self).shortDescription()
+ if desc is None:
+ return self._testMethodName
+ return desc
+
+
+ def getSkip(self):
+ """
+ Return the skip reason set on this test, if any is set. Checks on the
+ instance first, then the class, then the module, then packages. As
+ soon as it finds something with a C{skip} attribute, returns that.
+ Returns L{None} if it cannot find anything. See L{TestCase} docstring
+ for more details.
+ """
+ return util.acquireAttribute(self._parents, 'skip', None)
+
+
+ def getTodo(self):
+ """
+ Return a L{Todo} object if the test is marked todo. Checks on the
+ instance first, then the class, then the module, then packages. As
+ soon as it finds something with a C{todo} attribute, returns that.
+ Returns L{None} if it cannot find anything. See L{TestCase} docstring
+ for more details.
+ """
+ todo = util.acquireAttribute(self._parents, 'todo', None)
+ if todo is None:
+ return None
+ return makeTodo(todo)
+
+
+ def runTest(self):
+ """
+ If no C{methodName} argument is passed to the constructor, L{run} will
+ treat this method as the thing with the actual test inside.
+ """
+
+
+ def run(self, result):
+ """
+ Run the test case, storing the results in C{result}.
+
+ First runs C{setUp} on self, then runs the test method (defined in the
+ constructor), then runs C{tearDown}. As with the standard library
+ L{unittest.TestCase}, the return value of these methods is disregarded.
+ In particular, returning a L{Deferred<twisted.internet.defer.Deferred>}
+ has no special additional consequences.
+
+ @param result: A L{TestResult} object.
+ """
+ log.msg("--> %s <--" % (self.id()))
+ new_result = itrial.IReporter(result, None)
+ if new_result is None:
+ result = PyUnitResultAdapter(result)
+ else:
+ result = new_result
+ result.startTest(self)
+ if self.getSkip(): # don't run test methods that are marked as .skip
+ result.addSkip(self, self.getSkip())
+ result.stopTest(self)
+ return
+
+ self._passed = False
+ self._warnings = []
+
+ self._installObserver()
+ # All the code inside _runFixturesAndTest will be run such that warnings
+ # emitted by it will be collected and retrievable by flushWarnings.
+ _collectWarnings(self._warnings.append, self._runFixturesAndTest, result)
+
+ # Any collected warnings which the test method didn't flush get
+ # re-emitted so they'll be logged or show up on stdout or whatever.
+ for w in self.flushWarnings():
+ try:
+ warnings.warn_explicit(**w)
+ except:
+ result.addError(self, failure.Failure())
+
+ result.stopTest(self)
+
+
+ def addCleanup(self, f, *args, **kwargs):
+ """
+ Add the given function to a list of functions to be called after the
+ test has run, but before C{tearDown}.
+
+ Functions will be run in reverse order of being added. This helps
+ ensure that tear down complements set up.
+
+ As with all aspects of L{SynchronousTestCase}, Deferreds are not
+ supported in cleanup functions.
+ """
+ self._cleanups.append((f, args, kwargs))
+
+
+ def patch(self, obj, attribute, value):
+ """
+ Monkey patch an object for the duration of the test.
+
+ The monkey patch will be reverted at the end of the test using the
+ L{addCleanup} mechanism.
+
+ The L{monkey.MonkeyPatcher} is returned so that users can restore and
+ re-apply the monkey patch within their tests.
+
+ @param obj: The object to monkey patch.
+ @param attribute: The name of the attribute to change.
+ @param value: The value to set the attribute to.
+ @return: A L{monkey.MonkeyPatcher} object.
+ """
+ monkeyPatch = monkey.MonkeyPatcher((obj, attribute, value))
+ monkeyPatch.patch()
+ self.addCleanup(monkeyPatch.restore)
+ return monkeyPatch
+
+
+ def flushLoggedErrors(self, *errorTypes):
+ """
+ Remove stored errors received from the log.
+
+ C{TestCase} stores each error logged during the run of the test and
+ reports them as errors during the cleanup phase (after C{tearDown}).
+
+ @param *errorTypes: If unspecified, flush all errors. Otherwise, only
+ flush errors that match the given types.
+
+ @return: A list of failures that have been removed.
+ """
+ return self._observer.flushErrors(*errorTypes)
+
+
+ def flushWarnings(self, offendingFunctions=None):
+ """
+ Remove stored warnings from the list of captured warnings and return
+ them.
+
+ @param offendingFunctions: If L{None}, all warnings issued during the
+ currently running test will be flushed. Otherwise, only warnings
+ which I{point} to a function included in this list will be flushed.
+ All warnings include a filename and source line number; if these
+ parts of a warning point to a source line which is part of a
+ function, then the warning I{points} to that function.
+ @type offendingFunctions: L{None} or L{list} of functions or methods.
+
+ @raise ValueError: If C{offendingFunctions} is not L{None} and includes
+ an object which is not a L{types.FunctionType} or
+ L{types.MethodType} instance.
+
+ @return: A C{list}, each element of which is a C{dict} giving
+ information about one warning which was flushed by this call. The
+ keys of each C{dict} are:
+
+ - C{'message'}: The string which was passed as the I{message}
+ parameter to L{warnings.warn}.
+
+ - C{'category'}: The warning subclass which was passed as the
+ I{category} parameter to L{warnings.warn}.
+
+ - C{'filename'}: The name of the file containing the definition
+ of the code object which was C{stacklevel} frames above the
+ call to L{warnings.warn}, where C{stacklevel} is the value of
+ the C{stacklevel} parameter passed to L{warnings.warn}.
+
+ - C{'lineno'}: The source line associated with the active
+ instruction of the code object object which was C{stacklevel}
+ frames above the call to L{warnings.warn}, where
+ C{stacklevel} is the value of the C{stacklevel} parameter
+ passed to L{warnings.warn}.
+ """
+ if offendingFunctions is None:
+ toFlush = self._warnings[:]
+ self._warnings[:] = []
+ else:
+ toFlush = []
+ for aWarning in self._warnings:
+ for aFunction in offendingFunctions:
+ if not isinstance(aFunction, (
+ types.FunctionType, types.MethodType)):
+ raise ValueError("%r is not a function or method" % (
+ aFunction,))
+
+ # inspect.getabsfile(aFunction) sometimes returns a
+ # filename which disagrees with the filename the warning
+ # system generates. This seems to be because a
+ # function's code object doesn't deal with source files
+ # being renamed. inspect.getabsfile(module) seems
+ # better (or at least agrees with the warning system
+ # more often), and does some normalization for us which
+ # is desirable. inspect.getmodule() is attractive, but
+ # somewhat broken in Python < 2.6. See Python bug 4845.
+ aModule = sys.modules[aFunction.__module__]
+ filename = inspect.getabsfile(aModule)
+
+ if filename != os.path.normcase(aWarning.filename):
+ continue
+ lineStarts = list(_findlinestarts(aFunction.__code__))
+ first = lineStarts[0][1]
+ last = lineStarts[-1][1]
+ if not (first <= aWarning.lineno <= last):
+ continue
+ # The warning points to this function, flush it and move on
+ # to the next warning.
+ toFlush.append(aWarning)
+ break
+ # Remove everything which is being flushed.
+ list(map(self._warnings.remove, toFlush))
+
+ return [
+ {'message': w.message, 'category': w.category,
+ 'filename': w.filename, 'lineno': w.lineno}
+ for w in toFlush]
+
+
+ def callDeprecated(self, version, f, *args, **kwargs):
+ """
+ Call a function that should have been deprecated at a specific version
+ and in favor of a specific alternative, and assert that it was thusly
+ deprecated.
+
+ @param version: A 2-sequence of (since, replacement), where C{since} is
+ a the first L{version<incremental.Version>} that C{f}
+ should have been deprecated since, and C{replacement} is a suggested
+ replacement for the deprecated functionality, as described by
+ L{twisted.python.deprecate.deprecated}. If there is no suggested
+ replacement, this parameter may also be simply a
+ L{version<incremental.Version>} by itself.
+
+ @param f: The deprecated function to call.
+
+ @param args: The arguments to pass to C{f}.
+
+ @param kwargs: The keyword arguments to pass to C{f}.
+
+ @return: Whatever C{f} returns.
+
+ @raise: Whatever C{f} raises. If any exception is
+ raised by C{f}, though, no assertions will be made about emitted
+ deprecations.
+
+ @raise FailTest: if no warnings were emitted by C{f}, or if the
+ L{DeprecationWarning} emitted did not produce the canonical
+ please-use-something-else message that is standard for Twisted
+ deprecations according to the given version and replacement.
+ """
+ result = f(*args, **kwargs)
+ warningsShown = self.flushWarnings([self.callDeprecated])
+ try:
+ info = list(version)
+ except TypeError:
+ since = version
+ replacement = None
+ else:
+ [since, replacement] = info
+
+ if len(warningsShown) == 0:
+ self.fail('%r is not deprecated.' % (f,))
+
+ observedWarning = warningsShown[0]['message']
+ expectedWarning = getDeprecationWarningString(
+ f, since, replacement=replacement)
+ self.assertEqual(expectedWarning, observedWarning)
+
+ return result
+
+
+ def mktemp(self):
+ """
+ Create a new path name which can be used for a new file or directory.
+
+ The result is a relative path that is guaranteed to be unique within the
+ current working directory. The parent of the path will exist, but the
+ path will not.
+
+ For a temporary directory call os.mkdir on the path. For a temporary
+ file just create the file (e.g. by opening the path for writing and then
+ closing it).
+
+ @return: The newly created path
+ @rtype: C{str}
+ """
+ MAX_FILENAME = 32 # some platforms limit lengths of filenames
+ base = os.path.join(self.__class__.__module__[:MAX_FILENAME],
+ self.__class__.__name__[:MAX_FILENAME],
+ self._testMethodName[:MAX_FILENAME])
+ if not os.path.exists(base):
+ os.makedirs(base)
+ dirname = tempfile.mkdtemp('', '', base)
+ return os.path.join(dirname, 'temp')
+
+
+ def _getSuppress(self):
+ """
+ Returns any warning suppressions set for this test. Checks on the
+ instance first, then the class, then the module, then packages. As
+ soon as it finds something with a C{suppress} attribute, returns that.
+ Returns any empty list (i.e. suppress no warnings) if it cannot find
+ anything. See L{TestCase} docstring for more details.
+ """
+ return util.acquireAttribute(self._parents, 'suppress', [])
+
+
+ def _getSkipReason(self, method, skip):
+ """
+ Return the reason to use for skipping a test method.
+
+ @param method: The method which produced the skip.
+ @param skip: A L{unittest.SkipTest} instance raised by C{method}.
+ """
+ if len(skip.args) > 0:
+ return skip.args[0]
+
+ warnAboutFunction(
+ method,
+ "Do not raise unittest.SkipTest with no arguments! Give a reason "
+ "for skipping tests!")
+ return skip
+
+
+ def _run(self, suppress, todo, method, result):
+ """
+ Run a single method, either a test method or fixture.
+
+ @param suppress: Any warnings to suppress, as defined by the C{suppress}
+ attribute on this method, test case, or the module it is defined in.
+
+ @param todo: Any expected failure or failures, as defined by the C{todo}
+ attribute on this method, test case, or the module it is defined in.
+
+ @param method: The method to run.
+
+ @param result: The TestResult instance to which to report results.
+
+ @return: C{True} if the method fails and no further method/fixture calls
+ should be made, C{False} otherwise.
+ """
+ if inspect.isgeneratorfunction(method):
+ exc = TypeError(
+ '%r is a generator function and therefore will never run' % (
+ method,))
+ result.addError(self, failure.Failure(exc))
+ return True
+ try:
+ runWithWarningsSuppressed(suppress, method)
+ except SkipTest as e:
+ result.addSkip(self, self._getSkipReason(method, e))
+ except:
+ reason = failure.Failure()
+ if todo is None or not todo.expected(reason):
+ if reason.check(self.failureException):
+ addResult = result.addFailure
+ else:
+ addResult = result.addError
+ addResult(self, reason)
+ else:
+ result.addExpectedFailure(self, reason, todo)
+ else:
+ return False
+ return True
+
+
+ def _runFixturesAndTest(self, result):
+ """
+ Run C{setUp}, a test method, test cleanups, and C{tearDown}.
+
+ @param result: The TestResult instance to which to report results.
+ """
+ suppress = self._getSuppress()
+ try:
+ if self._run(suppress, None, self.setUp, result):
+ return
+
+ todo = self.getTodo()
+ method = getattr(self, self._testMethodName)
+ failed = self._run(suppress, todo, method, result)
+ finally:
+ self._runCleanups(result)
+
+ if todo and not failed:
+ result.addUnexpectedSuccess(self, todo)
+
+ if self._run(suppress, None, self.tearDown, result):
+ failed = True
+
+ for error in self._observer.getErrors():
+ result.addError(self, error)
+ failed = True
+ self._observer.flushErrors()
+ self._removeObserver()
+
+ if not (failed or todo):
+ result.addSuccess(self)
+
+
+ def _runCleanups(self, result):
+ """
+ Synchronously run any cleanups which have been added.
+ """
+ while len(self._cleanups) > 0:
+ f, args, kwargs = self._cleanups.pop()
+ try:
+ f(*args, **kwargs)
+ except:
+ f = failure.Failure()
+ result.addError(self, f)
+
+
+ def _installObserver(self):
+ self._observer = _logObserver
+ self._observer._add()
+
+
+ def _removeObserver(self):
+ self._observer._remove()
diff --git a/contrib/python/Twisted/py2/twisted/trial/itrial.py b/contrib/python/Twisted/py2/twisted/trial/itrial.py
new file mode 100644
index 0000000000..186b7e5b76
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/itrial.py
@@ -0,0 +1,259 @@
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Interfaces for Trial.
+
+Maintainer: Jonathan Lange
+"""
+
+from __future__ import division, absolute_import
+
+import zope.interface as zi
+from zope.interface import Attribute
+
+
+class ITestCase(zi.Interface):
+ """
+ The interface that a test case must implement in order to be used in Trial.
+ """
+
+ failureException = zi.Attribute(
+ "The exception class that is raised by failed assertions")
+
+
+ def __call__(result):
+ """
+ Run the test. Should always do exactly the same thing as run().
+ """
+
+
+ def countTestCases():
+ """
+ Return the number of tests in this test case. Usually 1.
+ """
+
+
+ def id():
+ """
+ Return a unique identifier for the test, usually the fully-qualified
+ Python name.
+ """
+
+
+ def run(result):
+ """
+ Run the test, storing the results in C{result}.
+
+ @param result: A L{TestResult}.
+ """
+
+
+ def shortDescription():
+ """
+ Return a short description of the test.
+ """
+
+
+
+class IReporter(zi.Interface):
+ """
+ I report results from a run of a test suite.
+ """
+
+ stream = zi.Attribute(
+ "Deprecated in Twisted 8.0. "
+ "The io-stream that this reporter will write to")
+ tbformat = zi.Attribute("Either 'default', 'brief', or 'verbose'")
+ args = zi.Attribute(
+ "Additional string argument passed from the command line")
+ shouldStop = zi.Attribute(
+ """
+ A boolean indicating that this reporter would like the test run to stop.
+ """)
+ separator = Attribute(
+ "Deprecated in Twisted 8.0. "
+ "A value which will occasionally be passed to the L{write} method.")
+ testsRun = Attribute(
+ """
+ The number of tests that seem to have been run according to this
+ reporter.
+ """)
+
+
+ def startTest(method):
+ """
+ Report the beginning of a run of a single test method.
+
+ @param method: an object that is adaptable to ITestMethod
+ """
+
+
+ def stopTest(method):
+ """
+ Report the status of a single test method
+
+ @param method: an object that is adaptable to ITestMethod
+ """
+
+
+ def startSuite(name):
+ """
+ Deprecated in Twisted 8.0.
+
+ Suites which wish to appear in reporter output should call this
+ before running their tests.
+ """
+
+
+ def endSuite(name):
+ """
+ Deprecated in Twisted 8.0.
+
+ Called at the end of a suite, if and only if that suite has called
+ C{startSuite}.
+ """
+
+
+ def cleanupErrors(errs):
+ """
+ Deprecated in Twisted 8.0.
+
+ Called when the reactor has been left in a 'dirty' state
+
+ @param errs: a list of L{twisted.python.failure.Failure}s
+ """
+
+
+ def upDownError(userMeth, warn=True, printStatus=True):
+ """
+ Deprecated in Twisted 8.0.
+
+ Called when an error occurs in a setUp* or tearDown* method
+
+ @param warn: indicates whether or not the reporter should emit a
+ warning about the error
+ @type warn: Boolean
+ @param printStatus: indicates whether or not the reporter should
+ print the name of the method and the status
+ message appropriate for the type of error
+ @type printStatus: Boolean
+ """
+
+
+ def addSuccess(test):
+ """
+ Record that test passed.
+ """
+
+
+ def addError(test, error):
+ """
+ Record that a test has raised an unexpected exception.
+
+ @param test: The test that has raised an error.
+ @param error: The error that the test raised. It will either be a
+ three-tuple in the style of C{sys.exc_info()} or a
+ L{Failure<twisted.python.failure.Failure>} object.
+ """
+
+
+ def addFailure(test, failure):
+ """
+ Record that a test has failed with the given failure.
+
+ @param test: The test that has failed.
+ @param failure: The failure that the test failed with. It will
+ either be a three-tuple in the style of C{sys.exc_info()}
+ or a L{Failure<twisted.python.failure.Failure>} object.
+ """
+
+
+ def addExpectedFailure(test, failure, todo=None):
+ """
+ Record that the given test failed, and was expected to do so.
+
+ In Twisted 15.5 and prior, C{todo} was a mandatory parameter.
+
+ @type test: L{unittest.TestCase}
+ @param test: The test which this is about.
+ @type error: L{failure.Failure}
+ @param error: The error which this test failed with.
+ @type todo: L{unittest.Todo}
+ @param todo: The reason for the test's TODO status. If L{None}, a
+ generic reason is used.
+ """
+
+
+ def addUnexpectedSuccess(test, todo=None):
+ """
+ Record that the given test failed, and was expected to do so.
+
+ In Twisted 15.5 and prior, C{todo} was a mandatory parameter.
+
+ @type test: L{unittest.TestCase}
+ @param test: The test which this is about.
+ @type todo: L{unittest.Todo}
+ @param todo: The reason for the test's TODO status. If L{None}, a
+ generic reason is used.
+ """
+
+
+ def addSkip(test, reason):
+ """
+ Record that a test has been skipped for the given reason.
+
+ @param test: The test that has been skipped.
+ @param reason: An object that the test case has specified as the reason
+ for skipping the test.
+ """
+
+
+ def printSummary():
+ """
+ Deprecated in Twisted 8.0, use L{done} instead.
+
+ Present a summary of the test results.
+ """
+
+
+ def printErrors():
+ """
+ Deprecated in Twisted 8.0, use L{done} instead.
+
+ Present the errors that have occurred during the test run. This method
+ will be called after all tests have been run.
+ """
+
+
+ def write(string):
+ """
+ Deprecated in Twisted 8.0, use L{done} instead.
+
+ Display a string to the user, without appending a new line.
+ """
+
+
+ def writeln(string):
+ """
+ Deprecated in Twisted 8.0, use L{done} instead.
+
+ Display a string to the user, appending a new line.
+ """
+
+ def wasSuccessful():
+ """
+ Return a boolean indicating whether all test results that were reported
+ to this reporter were successful or not.
+ """
+
+
+ def done():
+ """
+ Called when the test run is complete.
+
+ This gives the result object an opportunity to display a summary of
+ information to the user. Once you have called C{done} on an
+ L{IReporter} object, you should assume that the L{IReporter} object is
+ no longer usable.
+ """
diff --git a/contrib/python/Twisted/py2/twisted/trial/reporter.py b/contrib/python/Twisted/py2/twisted/trial/reporter.py
new file mode 100644
index 0000000000..eb48c7b99c
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/reporter.py
@@ -0,0 +1,1322 @@
+# -*- test-case-name: twisted.trial.test.test_reporter -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+#
+# Maintainer: Jonathan Lange
+
+"""
+Defines classes that handle the results of tests.
+"""
+
+from __future__ import division, absolute_import
+
+import sys
+import os
+import time
+import warnings
+import unittest as pyunit
+
+from collections import OrderedDict
+
+from zope.interface import implementer
+
+from twisted.python import reflect, log
+from twisted.python.components import proxyForInterface
+from twisted.python.failure import Failure
+from twisted.python.util import untilConcludes
+from twisted.python.compat import _PY3, items
+from twisted.trial import itrial, util
+
+try:
+ from subunit import TestProtocolClient
+except ImportError:
+ TestProtocolClient = None
+
+
+
+def _makeTodo(value):
+ """
+ Return a L{Todo} object built from C{value}.
+
+ This is a synonym for L{twisted.trial.unittest.makeTodo}, but imported
+ locally to avoid circular imports.
+
+ @param value: A string or a tuple of C{(errors, reason)}, where C{errors}
+ is either a single exception class or an iterable of exception classes.
+
+ @return: A L{Todo} object.
+ """
+ from twisted.trial.unittest import makeTodo
+ return makeTodo(value)
+
+
+
+class BrokenTestCaseWarning(Warning):
+ """
+ Emitted as a warning when an exception occurs in one of setUp or tearDown.
+ """
+
+
+
+class SafeStream(object):
+ """
+ Wraps a stream object so that all C{write} calls are wrapped in
+ L{untilConcludes<twisted.python.util.untilConcludes>}.
+ """
+
+ def __init__(self, original):
+ self.original = original
+
+
+ def __getattr__(self, name):
+ return getattr(self.original, name)
+
+
+ def write(self, *a, **kw):
+ return untilConcludes(self.original.write, *a, **kw)
+
+
+
+@implementer(itrial.IReporter)
+class TestResult(pyunit.TestResult, object):
+ """
+ Accumulates the results of several L{twisted.trial.unittest.TestCase}s.
+
+ @ivar successes: count the number of successes achieved by the test run.
+ @type successes: C{int}
+ """
+
+ # Used when no todo provided to addExpectedFailure or addUnexpectedSuccess.
+ _DEFAULT_TODO = 'Test expected to fail'
+
+ def __init__(self):
+ super(TestResult, self).__init__()
+ self.skips = []
+ self.expectedFailures = []
+ self.unexpectedSuccesses = []
+ self.successes = 0
+ self._timings = []
+
+
+ def __repr__(self):
+ return ('<%s run=%d errors=%d failures=%d todos=%d dones=%d skips=%d>'
+ % (reflect.qual(self.__class__), self.testsRun,
+ len(self.errors), len(self.failures),
+ len(self.expectedFailures), len(self.skips),
+ len(self.unexpectedSuccesses)))
+
+
+ def _getTime(self):
+ return time.time()
+
+
+ def _getFailure(self, error):
+ """
+ Convert a C{sys.exc_info()}-style tuple to a L{Failure}, if necessary.
+ """
+ if isinstance(error, tuple):
+ return Failure(error[1], error[0], error[2])
+ return error
+
+
+ def startTest(self, test):
+ """
+ This must be called before the given test is commenced.
+
+ @type test: L{pyunit.TestCase}
+ """
+ super(TestResult, self).startTest(test)
+ self._testStarted = self._getTime()
+
+
+ def stopTest(self, test):
+ """
+ This must be called after the given test is completed.
+
+ @type test: L{pyunit.TestCase}
+ """
+ super(TestResult, self).stopTest(test)
+ self._lastTime = self._getTime() - self._testStarted
+
+
+ def addFailure(self, test, fail):
+ """
+ Report a failed assertion for the given test.
+
+ @type test: L{pyunit.TestCase}
+ @type fail: L{Failure} or L{tuple}
+ """
+ self.failures.append((test, self._getFailure(fail)))
+
+
+ def addError(self, test, error):
+ """
+ Report an error that occurred while running the given test.
+
+ @type test: L{pyunit.TestCase}
+ @type error: L{Failure} or L{tuple}
+ """
+ self.errors.append((test, self._getFailure(error)))
+
+
+ def addSkip(self, test, reason):
+ """
+ Report that the given test was skipped.
+
+ In Trial, tests can be 'skipped'. Tests are skipped mostly because
+ there is some platform or configuration issue that prevents them from
+ being run correctly.
+
+ @type test: L{pyunit.TestCase}
+ @type reason: L{str}
+ """
+ self.skips.append((test, reason))
+
+
+ def addUnexpectedSuccess(self, test, todo=None):
+ """
+ Report that the given test succeeded against expectations.
+
+ In Trial, tests can be marked 'todo'. That is, they are expected to
+ fail. When a test that is expected to fail instead succeeds, it should
+ call this method to report the unexpected success.
+
+ @type test: L{pyunit.TestCase}
+ @type todo: L{unittest.Todo}, or L{None}, in which case a default todo
+ message is provided.
+ """
+ if todo is None:
+ todo = _makeTodo(self._DEFAULT_TODO)
+ self.unexpectedSuccesses.append((test, todo))
+
+
+ def addExpectedFailure(self, test, error, todo=None):
+ """
+ Report that the given test failed, and was expected to do so.
+
+ In Trial, tests can be marked 'todo'. That is, they are expected to
+ fail.
+
+ @type test: L{pyunit.TestCase}
+ @type error: L{Failure}
+ @type todo: L{unittest.Todo}, or L{None}, in which case a default todo
+ message is provided.
+ """
+ if todo is None:
+ todo = _makeTodo(self._DEFAULT_TODO)
+ self.expectedFailures.append((test, error, todo))
+
+
+ def addSuccess(self, test):
+ """
+ Report that the given test succeeded.
+
+ @type test: L{pyunit.TestCase}
+ """
+ self.successes += 1
+
+
+ def wasSuccessful(self):
+ """
+ Report whether or not this test suite was successful or not.
+
+ The behaviour of this method changed in L{pyunit} in Python 3.4 to
+ fail if there are any errors, failures, or unexpected successes.
+ Previous to 3.4, it was only if there were errors or failures. This
+ method implements the old behaviour for backwards compatibility reasons,
+ checking just for errors and failures.
+
+ @rtype: L{bool}
+ """
+ return len(self.failures) == len(self.errors) == 0
+
+
+ def done(self):
+ """
+ The test suite has finished running.
+ """
+
+
+
+@implementer(itrial.IReporter)
+class TestResultDecorator(proxyForInterface(itrial.IReporter,
+ "_originalReporter")):
+ """
+ Base class for TestResult decorators.
+
+ @ivar _originalReporter: The wrapped instance of reporter.
+ @type _originalReporter: A provider of L{itrial.IReporter}
+ """
+
+
+
+@implementer(itrial.IReporter)
+class UncleanWarningsReporterWrapper(TestResultDecorator):
+ """
+ A wrapper for a reporter that converts L{util.DirtyReactorAggregateError}s
+ to warnings.
+ """
+
+ def addError(self, test, error):
+ """
+ If the error is a L{util.DirtyReactorAggregateError}, instead of
+ reporting it as a normal error, throw a warning.
+ """
+
+ if (isinstance(error, Failure)
+ and error.check(util.DirtyReactorAggregateError)):
+ warnings.warn(error.getErrorMessage())
+ else:
+ self._originalReporter.addError(test, error)
+
+
+
+@implementer(itrial.IReporter)
+class _ExitWrapper(TestResultDecorator):
+ """
+ A wrapper for a reporter that causes the reporter to stop after
+ unsuccessful tests.
+ """
+
+ def addError(self, *args, **kwargs):
+ self.shouldStop = True
+ return self._originalReporter.addError(*args, **kwargs)
+
+
+ def addFailure(self, *args, **kwargs):
+ self.shouldStop = True
+ return self._originalReporter.addFailure(*args, **kwargs)
+
+
+
+class _AdaptedReporter(TestResultDecorator):
+ """
+ TestResult decorator that makes sure that addError only gets tests that
+ have been adapted with a particular test adapter.
+ """
+
+ def __init__(self, original, testAdapter):
+ """
+ Construct an L{_AdaptedReporter}.
+
+ @param original: An {itrial.IReporter}.
+ @param testAdapter: A callable that returns an L{itrial.ITestCase}.
+ """
+ TestResultDecorator.__init__(self, original)
+ self.testAdapter = testAdapter
+
+
+ def addError(self, test, error):
+ """
+ See L{itrial.IReporter}.
+ """
+ test = self.testAdapter(test)
+ return self._originalReporter.addError(test, error)
+
+
+ def addExpectedFailure(self, test, failure, todo=None):
+ """
+ See L{itrial.IReporter}.
+
+ @type test: A L{pyunit.TestCase}.
+ @type failure: A L{failure.Failure} or L{exceptions.AssertionError}
+ @type todo: A L{unittest.Todo} or None
+
+ When C{todo} is L{None} a generic C{unittest.Todo} is built.
+
+ L{pyunit.TestCase}'s C{run()} calls this with 3 positional arguments
+ (without C{todo}).
+ """
+ return self._originalReporter.addExpectedFailure(
+ self.testAdapter(test), failure, todo)
+
+
+ def addFailure(self, test, failure):
+ """
+ See L{itrial.IReporter}.
+ """
+ test = self.testAdapter(test)
+ return self._originalReporter.addFailure(test, failure)
+
+
+ def addSkip(self, test, skip):
+ """
+ See L{itrial.IReporter}.
+ """
+ test = self.testAdapter(test)
+ return self._originalReporter.addSkip(test, skip)
+
+
+ def addUnexpectedSuccess(self, test, todo=None):
+ """
+ See L{itrial.IReporter}.
+
+ @type test: A L{pyunit.TestCase}.
+ @type todo: A L{unittest.Todo} or None
+
+ When C{todo} is L{None} a generic C{unittest.Todo} is built.
+
+ L{pyunit.TestCase}'s C{run()} calls this with 2 positional arguments
+ (without C{todo}).
+ """
+ test = self.testAdapter(test)
+ return self._originalReporter.addUnexpectedSuccess(test, todo)
+
+
+ def startTest(self, test):
+ """
+ See L{itrial.IReporter}.
+ """
+ return self._originalReporter.startTest(self.testAdapter(test))
+
+
+ def stopTest(self, test):
+ """
+ See L{itrial.IReporter}.
+ """
+ return self._originalReporter.stopTest(self.testAdapter(test))
+
+
+
+@implementer(itrial.IReporter)
+class Reporter(TestResult):
+ """
+ A basic L{TestResult} with support for writing to a stream.
+
+ @ivar _startTime: The time when the first test was started. It defaults to
+ L{None}, which means that no test was actually launched.
+ @type _startTime: C{float} or L{None}
+
+ @ivar _warningCache: A C{set} of tuples of warning message (file, line,
+ text, category) which have already been written to the output stream
+ during the currently executing test. This is used to avoid writing
+ duplicates of the same warning to the output stream.
+ @type _warningCache: C{set}
+
+ @ivar _publisher: The log publisher which will be observed for warning
+ events.
+ @type _publisher: L{twisted.python.log.LogPublisher}
+ """
+
+ _separator = '-' * 79
+ _doubleSeparator = '=' * 79
+
+ def __init__(self, stream=sys.stdout, tbformat='default', realtime=False,
+ publisher=None):
+ super(Reporter, self).__init__()
+ self._stream = SafeStream(stream)
+ self.tbformat = tbformat
+ self.realtime = realtime
+ self._startTime = None
+ self._warningCache = set()
+
+ # Start observing log events so as to be able to report warnings.
+ self._publisher = publisher
+ if publisher is not None:
+ publisher.addObserver(self._observeWarnings)
+
+
+ def _observeWarnings(self, event):
+ """
+ Observe warning events and write them to C{self._stream}.
+
+ This method is a log observer which will be registered with
+ C{self._publisher.addObserver}.
+
+ @param event: A C{dict} from the logging system. If it has a
+ C{'warning'} key, a logged warning will be extracted from it and
+ possibly written to C{self.stream}.
+ """
+ if 'warning' in event:
+ key = (event['filename'], event['lineno'],
+ event['category'].split('.')[-1],
+ str(event['warning']))
+ if key not in self._warningCache:
+ self._warningCache.add(key)
+ self._stream.write('%s:%s: %s: %s\n' % key)
+
+
+ def startTest(self, test):
+ """
+ Called when a test begins to run. Records the time when it was first
+ called and resets the warning cache.
+
+ @param test: L{ITestCase}
+ """
+ super(Reporter, self).startTest(test)
+ if self._startTime is None:
+ self._startTime = self._getTime()
+ self._warningCache = set()
+
+
+ def addFailure(self, test, fail):
+ """
+ Called when a test fails. If C{realtime} is set, then it prints the
+ error to the stream.
+
+ @param test: L{ITestCase} that failed.
+ @param fail: L{failure.Failure} containing the error.
+ """
+ super(Reporter, self).addFailure(test, fail)
+ if self.realtime:
+ fail = self.failures[-1][1] # guarantee it's a Failure
+ self._write(self._formatFailureTraceback(fail))
+
+
+ def addError(self, test, error):
+ """
+ Called when a test raises an error. If C{realtime} is set, then it
+ prints the error to the stream.
+
+ @param test: L{ITestCase} that raised the error.
+ @param error: L{failure.Failure} containing the error.
+ """
+ error = self._getFailure(error)
+ super(Reporter, self).addError(test, error)
+ if self.realtime:
+ error = self.errors[-1][1] # guarantee it's a Failure
+ self._write(self._formatFailureTraceback(error))
+
+
+ def _write(self, format, *args):
+ """
+ Safely write to the reporter's stream.
+
+ @param format: A format string to write.
+ @param *args: The arguments for the format string.
+ """
+ s = str(format)
+ assert isinstance(s, type(''))
+ if args:
+ self._stream.write(s % args)
+ else:
+ self._stream.write(s)
+ untilConcludes(self._stream.flush)
+
+
+ def _writeln(self, format, *args):
+ """
+ Safely write a line to the reporter's stream. Newline is appended to
+ the format string.
+
+ @param format: A format string to write.
+ @param *args: The arguments for the format string.
+ """
+ self._write(format, *args)
+ self._write('\n')
+
+
+ def upDownError(self, method, error, warn, printStatus):
+ super(Reporter, self).upDownError(method, error, warn, printStatus)
+ if warn:
+ tbStr = self._formatFailureTraceback(error)
+ log.msg(tbStr)
+ msg = ("caught exception in %s, your TestCase is broken\n\n%s"
+ % (method, tbStr))
+ warnings.warn(msg, BrokenTestCaseWarning, stacklevel=2)
+
+
+ def cleanupErrors(self, errs):
+ super(Reporter, self).cleanupErrors(errs)
+ warnings.warn("%s\n%s" % ("REACTOR UNCLEAN! traceback(s) follow: ",
+ self._formatFailureTraceback(errs)),
+ BrokenTestCaseWarning)
+
+
+ def _trimFrames(self, frames):
+ """
+ Trim frames to remove internal paths.
+
+ When a C{SynchronousTestCase} method fails synchronously, the stack
+ looks like this:
+ - [0]: C{SynchronousTestCase._run}
+ - [1]: C{util.runWithWarningsSuppressed}
+ - [2:-2]: code in the test method which failed
+ - [-1]: C{_synctest.fail}
+
+ When a C{TestCase} method fails synchronously, the stack looks like
+ this:
+ - [0]: C{defer.maybeDeferred}
+ - [1]: C{utils.runWithWarningsSuppressed}
+ - [2]: C{utils.runWithWarningsSuppressed}
+ - [3:-2]: code in the test method which failed
+ - [-1]: C{_synctest.fail}
+
+ When a method fails inside a C{Deferred} (i.e., when the test method
+ returns a C{Deferred}, and that C{Deferred}'s errback fires), the stack
+ captured inside the resulting C{Failure} looks like this:
+ - [0]: C{defer.Deferred._runCallbacks}
+ - [1:-2]: code in the testmethod which failed
+ - [-1]: C{_synctest.fail}
+
+ As a result, we want to trim either [maybeDeferred, runWWS, runWWS] or
+ [Deferred._runCallbacks] or [SynchronousTestCase._run, runWWS] from the
+ front, and trim the [unittest.fail] from the end.
+
+ There is also another case, when the test method is badly defined and
+ contains extra arguments.
+
+ If it doesn't recognize one of these cases, it just returns the
+ original frames.
+
+ @param frames: The C{list} of frames from the test failure.
+
+ @return: The C{list} of frames to display.
+ """
+ newFrames = list(frames)
+
+ if len(frames) < 2:
+ return newFrames
+
+ firstMethod = newFrames[0][0]
+ firstFile = os.path.splitext(os.path.basename(newFrames[0][1]))[0]
+
+ secondMethod = newFrames[1][0]
+ secondFile = os.path.splitext(os.path.basename(newFrames[1][1]))[0]
+
+ syncCase = (("_run", "_synctest"),
+ ("runWithWarningsSuppressed", "util"))
+ asyncCase = (("maybeDeferred", "defer"),
+ ("runWithWarningsSuppressed", "utils"))
+
+ twoFrames = ((firstMethod, firstFile), (secondMethod, secondFile))
+
+ if _PY3:
+ # On PY3, we have an extra frame which is reraising the exception
+ for frame in newFrames:
+ frameFile = os.path.splitext(os.path.basename(frame[1]))[0]
+ if frameFile == "compat" and frame[0] == "reraise":
+ # If it's in the compat module and is reraise, BLAM IT
+ newFrames.pop(newFrames.index(frame))
+
+ if twoFrames == syncCase:
+ newFrames = newFrames[2:]
+ elif twoFrames == asyncCase:
+ newFrames = newFrames[3:]
+ elif (firstMethod, firstFile) == ("_runCallbacks", "defer"):
+ newFrames = newFrames[1:]
+
+ if not newFrames:
+ # The method fails before getting called, probably an argument
+ # problem
+ return newFrames
+
+ last = newFrames[-1]
+ if (last[0].startswith('fail')
+ and os.path.splitext(os.path.basename(last[1]))[0] == '_synctest'):
+ newFrames = newFrames[:-1]
+
+ return newFrames
+
+
+ def _formatFailureTraceback(self, fail):
+ if isinstance(fail, str):
+ return fail.rstrip() + '\n'
+ fail.frames, frames = self._trimFrames(fail.frames), fail.frames
+ result = fail.getTraceback(detail=self.tbformat,
+ elideFrameworkCode=True)
+ fail.frames = frames
+ return result
+
+
+ def _groupResults(self, results, formatter):
+ """
+ Group tests together based on their results.
+
+ @param results: An iterable of tuples of two or more elements. The
+ first element of each tuple is a test case. The remaining
+ elements describe the outcome of that test case.
+
+ @param formatter: A callable which turns a test case result into a
+ string. The elements after the first of the tuples in
+ C{results} will be passed as positional arguments to
+ C{formatter}.
+
+ @return: A C{list} of two-tuples. The first element of each tuple
+ is a unique string describing one result from at least one of
+ the test cases in C{results}. The second element is a list of
+ the test cases which had that result.
+ """
+ groups = OrderedDict()
+ for content in results:
+ case = content[0]
+ outcome = content[1:]
+ key = formatter(*outcome)
+ groups.setdefault(key, []).append(case)
+ return items(groups)
+
+
+ def _printResults(self, flavor, errors, formatter):
+ """
+ Print a group of errors to the stream.
+
+ @param flavor: A string indicating the kind of error (e.g. 'TODO').
+ @param errors: A list of errors, often L{failure.Failure}s, but
+ sometimes 'todo' errors.
+ @param formatter: A callable that knows how to format the errors.
+ """
+ for reason, cases in self._groupResults(errors, formatter):
+ self._writeln(self._doubleSeparator)
+ self._writeln(flavor)
+ self._write(reason)
+ self._writeln('')
+ for case in cases:
+ self._writeln(case.id())
+
+
+ def _printExpectedFailure(self, error, todo):
+ return 'Reason: %r\n%s' % (todo.reason,
+ self._formatFailureTraceback(error))
+
+
+ def _printUnexpectedSuccess(self, todo):
+ ret = 'Reason: %r\n' % (todo.reason,)
+ if todo.errors:
+ ret += 'Expected errors: %s\n' % (', '.join(todo.errors),)
+ return ret
+
+
+ def _printErrors(self):
+ """
+ Print all of the non-success results to the stream in full.
+ """
+ self._write('\n')
+ self._printResults('[SKIPPED]', self.skips, lambda x: '%s\n' % x)
+ self._printResults('[TODO]', self.expectedFailures,
+ self._printExpectedFailure)
+ self._printResults('[FAIL]', self.failures,
+ self._formatFailureTraceback)
+ self._printResults('[ERROR]', self.errors,
+ self._formatFailureTraceback)
+ self._printResults('[SUCCESS!?!]', self.unexpectedSuccesses,
+ self._printUnexpectedSuccess)
+
+
+ def _getSummary(self):
+ """
+ Return a formatted count of tests status results.
+ """
+ summaries = []
+ for stat in ("skips", "expectedFailures", "failures", "errors",
+ "unexpectedSuccesses"):
+ num = len(getattr(self, stat))
+ if num:
+ summaries.append('%s=%d' % (stat, num))
+ if self.successes:
+ summaries.append('successes=%d' % (self.successes,))
+ summary = (summaries and ' (' + ', '.join(summaries) + ')') or ''
+ return summary
+
+
+ def _printSummary(self):
+ """
+ Print a line summarising the test results to the stream.
+ """
+ summary = self._getSummary()
+ if self.wasSuccessful():
+ status = "PASSED"
+ else:
+ status = "FAILED"
+ self._write("%s%s\n", status, summary)
+
+
+ def done(self):
+ """
+ Summarize the result of the test run.
+
+ The summary includes a report of all of the errors, todos, skips and
+ so forth that occurred during the run. It also includes the number of
+ tests that were run and how long it took to run them (not including
+ load time).
+
+ Expects that C{_printErrors}, C{_writeln}, C{_write}, C{_printSummary}
+ and C{_separator} are all implemented.
+ """
+ if self._publisher is not None:
+ self._publisher.removeObserver(self._observeWarnings)
+ self._printErrors()
+ self._writeln(self._separator)
+ if self._startTime is not None:
+ self._writeln('Ran %d tests in %.3fs', self.testsRun,
+ time.time() - self._startTime)
+ self._write('\n')
+ self._printSummary()
+
+
+
+class MinimalReporter(Reporter):
+ """
+ A minimalist reporter that prints only a summary of the test result, in
+ the form of (timeTaken, #tests, #tests, #errors, #failures, #skips).
+ """
+
+ def _printErrors(self):
+ """
+ Don't print a detailed summary of errors. We only care about the
+ counts.
+ """
+
+
+ def _printSummary(self):
+ """
+ Print out a one-line summary of the form:
+ '%(runtime) %(number_of_tests) %(number_of_tests) %(num_errors)
+ %(num_failures) %(num_skips)'
+ """
+ numTests = self.testsRun
+ if self._startTime is not None:
+ timing = self._getTime() - self._startTime
+ else:
+ timing = 0
+ t = (timing, numTests, numTests,
+ len(self.errors), len(self.failures), len(self.skips))
+ self._writeln(' '.join(map(str, t)))
+
+
+
+class TextReporter(Reporter):
+ """
+ Simple reporter that prints a single character for each test as it runs,
+ along with the standard Trial summary text.
+ """
+
+ def addSuccess(self, test):
+ super(TextReporter, self).addSuccess(test)
+ self._write('.')
+
+
+ def addError(self, *args):
+ super(TextReporter, self).addError(*args)
+ self._write('E')
+
+
+ def addFailure(self, *args):
+ super(TextReporter, self).addFailure(*args)
+ self._write('F')
+
+
+ def addSkip(self, *args):
+ super(TextReporter, self).addSkip(*args)
+ self._write('S')
+
+
+ def addExpectedFailure(self, *args):
+ super(TextReporter, self).addExpectedFailure(*args)
+ self._write('T')
+
+
+ def addUnexpectedSuccess(self, *args):
+ super(TextReporter, self).addUnexpectedSuccess(*args)
+ self._write('!')
+
+
+
+class VerboseTextReporter(Reporter):
+ """
+ A verbose reporter that prints the name of each test as it is running.
+
+ Each line is printed with the name of the test, followed by the result of
+ that test.
+ """
+
+ # This is actually the bwverbose option
+
+ def startTest(self, tm):
+ self._write('%s ... ', tm.id())
+ super(VerboseTextReporter, self).startTest(tm)
+
+
+ def addSuccess(self, test):
+ super(VerboseTextReporter, self).addSuccess(test)
+ self._write('[OK]')
+
+
+ def addError(self, *args):
+ super(VerboseTextReporter, self).addError(*args)
+ self._write('[ERROR]')
+
+
+ def addFailure(self, *args):
+ super(VerboseTextReporter, self).addFailure(*args)
+ self._write('[FAILURE]')
+
+
+ def addSkip(self, *args):
+ super(VerboseTextReporter, self).addSkip(*args)
+ self._write('[SKIPPED]')
+
+
+ def addExpectedFailure(self, *args):
+ super(VerboseTextReporter, self).addExpectedFailure(*args)
+ self._write('[TODO]')
+
+
+ def addUnexpectedSuccess(self, *args):
+ super(VerboseTextReporter, self).addUnexpectedSuccess(*args)
+ self._write('[SUCCESS!?!]')
+
+
+ def stopTest(self, test):
+ super(VerboseTextReporter, self).stopTest(test)
+ self._write('\n')
+
+
+
+class TimingTextReporter(VerboseTextReporter):
+ """
+ Prints out each test as it is running, followed by the time taken for each
+ test to run.
+ """
+
+ def stopTest(self, method):
+ """
+ Mark the test as stopped, and write the time it took to run the test
+ to the stream.
+ """
+ super(TimingTextReporter, self).stopTest(method)
+ self._write("(%.03f secs)\n" % self._lastTime)
+
+
+
+class _AnsiColorizer(object):
+ """
+ A colorizer is an object that loosely wraps around a stream, allowing
+ callers to write text to the stream in a particular color.
+
+ Colorizer classes must implement C{supported()} and C{write(text, color)}.
+ """
+ _colors = dict(black=30, red=31, green=32, yellow=33,
+ blue=34, magenta=35, cyan=36, white=37)
+
+ def __init__(self, stream):
+ self.stream = stream
+
+
+ def supported(cls, stream=sys.stdout):
+ """
+ A class method that returns True if the current platform supports
+ coloring terminal output using this method. Returns False otherwise.
+ """
+ if not stream.isatty():
+ return False # auto color only on TTYs
+ try:
+ import curses
+ except ImportError:
+ return False
+ else:
+ try:
+ try:
+ return curses.tigetnum("colors") > 2
+ except curses.error:
+ curses.setupterm()
+ return curses.tigetnum("colors") > 2
+ except:
+ # guess false in case of error
+ return False
+ supported = classmethod(supported)
+
+
+ def write(self, text, color):
+ """
+ Write the given text to the stream in the given color.
+
+ @param text: Text to be written to the stream.
+
+ @param color: A string label for a color. e.g. 'red', 'white'.
+ """
+ color = self._colors[color]
+ self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text))
+
+
+
+class _Win32Colorizer(object):
+ """
+ See _AnsiColorizer docstring.
+ """
+ def __init__(self, stream):
+ from win32console import GetStdHandle, STD_OUTPUT_HANDLE, \
+ FOREGROUND_RED, FOREGROUND_BLUE, FOREGROUND_GREEN, \
+ FOREGROUND_INTENSITY
+ red, green, blue, bold = (FOREGROUND_RED, FOREGROUND_GREEN,
+ FOREGROUND_BLUE, FOREGROUND_INTENSITY)
+ self.stream = stream
+ self.screenBuffer = GetStdHandle(STD_OUTPUT_HANDLE)
+ self._colors = {
+ 'normal': red | green | blue,
+ 'red': red | bold,
+ 'green': green | bold,
+ 'blue': blue | bold,
+ 'yellow': red | green | bold,
+ 'magenta': red | blue | bold,
+ 'cyan': green | blue | bold,
+ 'white': red | green | blue | bold
+ }
+
+
+ def supported(cls, stream=sys.stdout):
+ try:
+ import win32console
+ screenBuffer = win32console.GetStdHandle(
+ win32console.STD_OUTPUT_HANDLE)
+ except ImportError:
+ return False
+ import pywintypes
+ try:
+ screenBuffer.SetConsoleTextAttribute(
+ win32console.FOREGROUND_RED |
+ win32console.FOREGROUND_GREEN |
+ win32console.FOREGROUND_BLUE)
+ except pywintypes.error:
+ return False
+ else:
+ return True
+ supported = classmethod(supported)
+
+
+ def write(self, text, color):
+ color = self._colors[color]
+ self.screenBuffer.SetConsoleTextAttribute(color)
+ self.stream.write(text)
+ self.screenBuffer.SetConsoleTextAttribute(self._colors['normal'])
+
+
+
+class _NullColorizer(object):
+ """
+ See _AnsiColorizer docstring.
+ """
+ def __init__(self, stream):
+ self.stream = stream
+
+
+ def supported(cls, stream=sys.stdout):
+ return True
+ supported = classmethod(supported)
+
+
+ def write(self, text, color):
+ self.stream.write(text)
+
+
+
+@implementer(itrial.IReporter)
+class SubunitReporter(object):
+ """
+ Reports test output via Subunit.
+
+ @ivar _subunit: The subunit protocol client that we are wrapping.
+
+ @ivar _successful: An internal variable, used to track whether we have
+ received only successful results.
+
+ @since: 10.0
+ """
+
+ def __init__(self, stream=sys.stdout, tbformat='default',
+ realtime=False, publisher=None):
+ """
+ Construct a L{SubunitReporter}.
+
+ @param stream: A file-like object representing the stream to print
+ output to. Defaults to stdout.
+ @param tbformat: The format for tracebacks. Ignored, since subunit
+ always uses Python's standard format.
+ @param realtime: Whether or not to print exceptions in the middle
+ of the test results. Ignored, since subunit always does this.
+ @param publisher: The log publisher which will be preserved for
+ reporting events. Ignored, as it's not relevant to subunit.
+ """
+ if TestProtocolClient is None:
+ raise Exception("Subunit not available")
+ self._subunit = TestProtocolClient(stream)
+ self._successful = True
+
+
+ def done(self):
+ """
+ Record that the entire test suite run is finished.
+
+ We do nothing, since a summary clause is irrelevant to the subunit
+ protocol.
+ """
+ pass
+
+
+ def shouldStop(self):
+ """
+ Whether or not the test runner should stop running tests.
+ """
+ return self._subunit.shouldStop
+ shouldStop = property(shouldStop)
+
+
+ def stop(self):
+ """
+ Signal that the test runner should stop running tests.
+ """
+ return self._subunit.stop()
+
+
+ def wasSuccessful(self):
+ """
+ Has the test run been successful so far?
+
+ @return: C{True} if we have received no reports of errors or failures,
+ C{False} otherwise.
+ """
+ # Subunit has a bug in its implementation of wasSuccessful, see
+ # https://bugs.edge.launchpad.net/subunit/+bug/491090, so we can't
+ # simply forward it on.
+ return self._successful
+
+
+ def startTest(self, test):
+ """
+ Record that C{test} has started.
+ """
+ return self._subunit.startTest(test)
+
+
+ def stopTest(self, test):
+ """
+ Record that C{test} has completed.
+ """
+ return self._subunit.stopTest(test)
+
+
+ def addSuccess(self, test):
+ """
+ Record that C{test} was successful.
+ """
+ return self._subunit.addSuccess(test)
+
+
+ def addSkip(self, test, reason):
+ """
+ Record that C{test} was skipped for C{reason}.
+
+ Some versions of subunit don't have support for addSkip. In those
+ cases, the skip is reported as a success.
+
+ @param test: A unittest-compatible C{TestCase}.
+ @param reason: The reason for it being skipped. The C{str()} of this
+ object will be included in the subunit output stream.
+ """
+ addSkip = getattr(self._subunit, 'addSkip', None)
+ if addSkip is None:
+ self.addSuccess(test)
+ else:
+ self._subunit.addSkip(test, reason)
+
+
+ def addError(self, test, err):
+ """
+ Record that C{test} failed with an unexpected error C{err}.
+
+ Also marks the run as being unsuccessful, causing
+ L{SubunitReporter.wasSuccessful} to return C{False}.
+ """
+ self._successful = False
+ return self._subunit.addError(
+ test, util.excInfoOrFailureToExcInfo(err))
+
+
+ def addFailure(self, test, err):
+ """
+ Record that C{test} failed an assertion with the error C{err}.
+
+ Also marks the run as being unsuccessful, causing
+ L{SubunitReporter.wasSuccessful} to return C{False}.
+ """
+ self._successful = False
+ return self._subunit.addFailure(
+ test, util.excInfoOrFailureToExcInfo(err))
+
+
+ def addExpectedFailure(self, test, failure, todo):
+ """
+ Record an expected failure from a test.
+
+ Some versions of subunit do not implement this. For those versions, we
+ record a success.
+ """
+ failure = util.excInfoOrFailureToExcInfo(failure)
+ addExpectedFailure = getattr(self._subunit, 'addExpectedFailure', None)
+ if addExpectedFailure is None:
+ self.addSuccess(test)
+ else:
+ addExpectedFailure(test, failure)
+
+
+ def addUnexpectedSuccess(self, test, todo=None):
+ """
+ Record an unexpected success.
+
+ Since subunit has no way of expressing this concept, we record a
+ success on the subunit stream.
+ """
+ # Not represented in pyunit/subunit.
+ self.addSuccess(test)
+
+
+
+class TreeReporter(Reporter):
+ """
+ Print out the tests in the form a tree.
+
+ Tests are indented according to which class and module they belong.
+ Results are printed in ANSI color.
+ """
+
+ currentLine = ''
+ indent = ' '
+ columns = 79
+
+ FAILURE = 'red'
+ ERROR = 'red'
+ TODO = 'blue'
+ SKIP = 'blue'
+ TODONE = 'red'
+ SUCCESS = 'green'
+
+ def __init__(self, stream=sys.stdout, *args, **kwargs):
+ super(TreeReporter, self).__init__(stream, *args, **kwargs)
+ self._lastTest = []
+ for colorizer in [_Win32Colorizer, _AnsiColorizer, _NullColorizer]:
+ if colorizer.supported(stream):
+ self._colorizer = colorizer(stream)
+ break
+
+
+ def getDescription(self, test):
+ """
+ Return the name of the method which 'test' represents. This is
+ what gets displayed in the leaves of the tree.
+
+ e.g. getDescription(TestCase('test_foo')) ==> test_foo
+ """
+ return test.id().split('.')[-1]
+
+
+ def addSuccess(self, test):
+ super(TreeReporter, self).addSuccess(test)
+ self.endLine('[OK]', self.SUCCESS)
+
+
+ def addError(self, *args):
+ super(TreeReporter, self).addError(*args)
+ self.endLine('[ERROR]', self.ERROR)
+
+
+ def addFailure(self, *args):
+ super(TreeReporter, self).addFailure(*args)
+ self.endLine('[FAIL]', self.FAILURE)
+
+
+ def addSkip(self, *args):
+ super(TreeReporter, self).addSkip(*args)
+ self.endLine('[SKIPPED]', self.SKIP)
+
+
+ def addExpectedFailure(self, *args):
+ super(TreeReporter, self).addExpectedFailure(*args)
+ self.endLine('[TODO]', self.TODO)
+
+
+ def addUnexpectedSuccess(self, *args):
+ super(TreeReporter, self).addUnexpectedSuccess(*args)
+ self.endLine('[SUCCESS!?!]', self.TODONE)
+
+
+ def _write(self, format, *args):
+ if args:
+ format = format % args
+ self.currentLine = format
+ super(TreeReporter, self)._write(self.currentLine)
+
+
+ def _getPreludeSegments(self, testID):
+ """
+ Return a list of all non-leaf segments to display in the tree.
+
+ Normally this is the module and class name.
+ """
+ segments = testID.split('.')[:-1]
+ if len(segments) == 0:
+ return segments
+ segments = [
+ seg for seg in ('.'.join(segments[:-1]), segments[-1])
+ if len(seg) > 0]
+ return segments
+
+
+ def _testPrelude(self, testID):
+ """
+ Write the name of the test to the stream, indenting it appropriately.
+
+ If the test is the first test in a new 'branch' of the tree, also
+ write all of the parents in that branch.
+ """
+ segments = self._getPreludeSegments(testID)
+ indentLevel = 0
+ for seg in segments:
+ if indentLevel < len(self._lastTest):
+ if seg != self._lastTest[indentLevel]:
+ self._write('%s%s\n' % (self.indent * indentLevel, seg))
+ else:
+ self._write('%s%s\n' % (self.indent * indentLevel, seg))
+ indentLevel += 1
+ self._lastTest = segments
+
+
+ def cleanupErrors(self, errs):
+ self._colorizer.write(' cleanup errors', self.ERROR)
+ self.endLine('[ERROR]', self.ERROR)
+ super(TreeReporter, self).cleanupErrors(errs)
+
+
+ def upDownError(self, method, error, warn, printStatus):
+ self._colorizer.write(" %s" % method, self.ERROR)
+ if printStatus:
+ self.endLine('[ERROR]', self.ERROR)
+ super(TreeReporter, self).upDownError(method, error, warn, printStatus)
+
+
+ def startTest(self, test):
+ """
+ Called when C{test} starts. Writes the tests name to the stream using
+ a tree format.
+ """
+ self._testPrelude(test.id())
+ self._write('%s%s ... ' % (self.indent * (len(self._lastTest)),
+ self.getDescription(test)))
+ super(TreeReporter, self).startTest(test)
+
+
+ def endLine(self, message, color):
+ """
+ Print 'message' in the given color.
+
+ @param message: A string message, usually '[OK]' or something similar.
+ @param color: A string color, 'red', 'green' and so forth.
+ """
+ spaces = ' ' * (self.columns - len(self.currentLine) - len(message))
+ super(TreeReporter, self)._write(spaces)
+ self._colorizer.write(message, color)
+ super(TreeReporter, self)._write("\n")
+
+
+ def _printSummary(self):
+ """
+ Print a line summarising the test results to the stream, and color the
+ status result.
+ """
+ summary = self._getSummary()
+ if self.wasSuccessful():
+ status = "PASSED"
+ color = self.SUCCESS
+ else:
+ status = "FAILED"
+ color = self.FAILURE
+ self._colorizer.write(status, color)
+ self._write("%s\n", summary)
diff --git a/contrib/python/Twisted/py2/twisted/trial/runner.py b/contrib/python/Twisted/py2/twisted/trial/runner.py
new file mode 100644
index 0000000000..220c34ba70
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/runner.py
@@ -0,0 +1,1064 @@
+# -*- test-case-name: twisted.trial.test.test_runner -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+A miscellany of code used to run Trial tests.
+
+Maintainer: Jonathan Lange
+"""
+
+from __future__ import absolute_import, division
+
+__all__ = [
+ 'TestSuite',
+
+ 'DestructiveTestSuite', 'ErrorHolder', 'LoggedSuite',
+ 'TestHolder', 'TestLoader', 'TrialRunner', 'TrialSuite',
+
+ 'filenameToModule', 'isPackage', 'isPackageDirectory', 'isTestCase',
+ 'name', 'samefile', 'NOT_IN_TEST',
+ ]
+
+import doctest
+import inspect
+import os
+import sys
+import time
+import types
+import warnings
+
+from twisted.python import reflect, log, failure, modules, filepath
+from twisted.python.compat import _PY3, _PY35PLUS
+
+from twisted.internet import defer
+from twisted.trial import util, unittest
+from twisted.trial.itrial import ITestCase
+from twisted.trial.reporter import _ExitWrapper, UncleanWarningsReporterWrapper
+from twisted.trial._asyncrunner import _ForceGarbageCollectionDecorator, _iterateTests
+from twisted.trial._synctest import _logObserver
+
+# These are imported so that they remain in the public API for t.trial.runner
+from twisted.trial.unittest import TestSuite
+
+from zope.interface import implementer
+
+pyunit = __import__('unittest')
+
+
+
+def isPackage(module):
+ """Given an object return True if the object looks like a package"""
+ if not isinstance(module, types.ModuleType):
+ return False
+ basename = os.path.splitext(os.path.basename(module.__file__))[0]
+ return basename == '__init__'
+
+
+def isPackageDirectory(dirname):
+ """
+ Is the directory at path 'dirname' a Python package directory?
+ Returns the name of the __init__ file (it may have a weird extension)
+ if dirname is a package directory. Otherwise, returns False
+ """
+ def _getSuffixes():
+ if _PY3:
+ import importlib
+ return importlib.machinery.all_suffixes()
+ else:
+ import imp
+ return list(zip(*imp.get_suffixes()))[0]
+
+
+ for ext in _getSuffixes():
+ initFile = '__init__' + ext
+ if os.path.exists(os.path.join(dirname, initFile)):
+ return initFile
+ return False
+
+
+def samefile(filename1, filename2):
+ """
+ A hacky implementation of C{os.path.samefile}. Used by L{filenameToModule}
+ when the platform doesn't provide C{os.path.samefile}. Do not use this.
+ """
+ return os.path.abspath(filename1) == os.path.abspath(filename2)
+
+
+def filenameToModule(fn):
+ """
+ Given a filename, do whatever possible to return a module object matching
+ that file.
+
+ If the file in question is a module in Python path, properly import and
+ return that module. Otherwise, load the source manually.
+
+ @param fn: A filename.
+ @return: A module object.
+ @raise ValueError: If C{fn} does not exist.
+ """
+ if not os.path.exists(fn):
+ raise ValueError("%r doesn't exist" % (fn,))
+ try:
+ ret = reflect.namedAny(reflect.filenameToModuleName(fn))
+ except (ValueError, AttributeError):
+ # Couldn't find module. The file 'fn' is not in PYTHONPATH
+ return _importFromFile(fn)
+
+ # >=3.7 has __file__ attribute as None, previously __file__ was not present
+ if getattr(ret, "__file__", None) is None:
+ # This isn't a Python module in a package, so import it from a file
+ return _importFromFile(fn)
+
+ # ensure that the loaded module matches the file
+ retFile = os.path.splitext(ret.__file__)[0] + '.py'
+ # not all platforms (e.g. win32) have os.path.samefile
+ same = getattr(os.path, 'samefile', samefile)
+ if os.path.isfile(fn) and not same(fn, retFile):
+ del sys.modules[ret.__name__]
+ ret = _importFromFile(fn)
+ return ret
+
+
+def _importFromFile(fn, moduleName=None):
+ fn = _resolveDirectory(fn)
+ if not moduleName:
+ moduleName = os.path.splitext(os.path.split(fn)[-1])[0]
+ if moduleName in sys.modules:
+ return sys.modules[moduleName]
+ if _PY35PLUS:
+ import importlib
+
+ spec = importlib.util.spec_from_file_location(moduleName, fn)
+ if not spec:
+ raise SyntaxError(fn)
+ module = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(module)
+ sys.modules[moduleName] = module
+ else:
+ import imp
+
+ with open(fn, 'r') as fd:
+ module = imp.load_source(moduleName, fn, fd)
+ return module
+
+
+def _resolveDirectory(fn):
+ if os.path.isdir(fn):
+ initFile = isPackageDirectory(fn)
+ if initFile:
+ fn = os.path.join(fn, initFile)
+ else:
+ raise ValueError('%r is not a package directory' % (fn,))
+ return fn
+
+
+def _getMethodNameInClass(method):
+ """
+ Find the attribute name on the method's class which refers to the method.
+
+ For some methods, notably decorators which have not had __name__ set correctly:
+
+ getattr(method.im_class, method.__name__) != method
+ """
+ if getattr(method.im_class, method.__name__, object()) != method:
+ for alias in dir(method.im_class):
+ if getattr(method.im_class, alias, object()) == method:
+ return alias
+ return method.__name__
+
+
+class DestructiveTestSuite(TestSuite):
+ """
+ A test suite which remove the tests once run, to minimize memory usage.
+ """
+
+ def run(self, result):
+ """
+ Almost the same as L{TestSuite.run}, but with C{self._tests} being
+ empty at the end.
+ """
+ while self._tests:
+ if result.shouldStop:
+ break
+ test = self._tests.pop(0)
+ test(result)
+ return result
+
+
+
+# When an error occurs outside of any test, the user will see this string
+# in place of a test's name.
+NOT_IN_TEST = "<not in test>"
+
+
+
+class LoggedSuite(TestSuite):
+ """
+ Any errors logged in this suite will be reported to the L{TestResult}
+ object.
+ """
+
+ def run(self, result):
+ """
+ Run the suite, storing all errors in C{result}. If an error is logged
+ while no tests are running, then it will be added as an error to
+ C{result}.
+
+ @param result: A L{TestResult} object.
+ """
+ observer = _logObserver
+ observer._add()
+ super(LoggedSuite, self).run(result)
+ observer._remove()
+ for error in observer.getErrors():
+ result.addError(TestHolder(NOT_IN_TEST), error)
+ observer.flushErrors()
+
+
+
+class TrialSuite(TestSuite):
+ """
+ Suite to wrap around every single test in a C{trial} run. Used internally
+ by Trial to set up things necessary for Trial tests to work, regardless of
+ what context they are run in.
+ """
+
+ def __init__(self, tests=(), forceGarbageCollection=False):
+ if forceGarbageCollection:
+ newTests = []
+ for test in tests:
+ test = unittest.decorate(
+ test, _ForceGarbageCollectionDecorator)
+ newTests.append(test)
+ tests = newTests
+ suite = LoggedSuite(tests)
+ super(TrialSuite, self).__init__([suite])
+
+
+ def _bail(self):
+ from twisted.internet import reactor
+ d = defer.Deferred()
+ reactor.addSystemEventTrigger('after', 'shutdown',
+ lambda: d.callback(None))
+ reactor.fireSystemEvent('shutdown') # radix's suggestion
+ # As long as TestCase does crap stuff with the reactor we need to
+ # manually shutdown the reactor here, and that requires util.wait
+ # :(
+ # so that the shutdown event completes
+ unittest.TestCase('mktemp')._wait(d)
+
+ def run(self, result):
+ try:
+ TestSuite.run(self, result)
+ finally:
+ self._bail()
+
+
+def name(thing):
+ """
+ @param thing: an object from modules (instance of PythonModule,
+ PythonAttribute), a TestCase subclass, or an instance of a TestCase.
+ """
+ if isTestCase(thing):
+ # TestCase subclass
+ theName = reflect.qual(thing)
+ else:
+ # thing from trial, or thing from modules.
+ # this monstrosity exists so that modules' objects do not have to
+ # implement id(). -jml
+ try:
+ theName = thing.id()
+ except AttributeError:
+ theName = thing.name
+ return theName
+
+
+def isTestCase(obj):
+ """
+ @return: C{True} if C{obj} is a class that contains test cases, C{False}
+ otherwise. Used to find all the tests in a module.
+ """
+ try:
+ return issubclass(obj, pyunit.TestCase)
+ except TypeError:
+ return False
+
+
+
+@implementer(ITestCase)
+class TestHolder(object):
+ """
+ Placeholder for a L{TestCase} inside a reporter. As far as a L{TestResult}
+ is concerned, this looks exactly like a unit test.
+ """
+
+ failureException = None
+
+ def __init__(self, description):
+ """
+ @param description: A string to be displayed L{TestResult}.
+ """
+ self.description = description
+
+
+ def __call__(self, result):
+ return self.run(result)
+
+
+ def id(self):
+ return self.description
+
+
+ def countTestCases(self):
+ return 0
+
+
+ def run(self, result):
+ """
+ This test is just a placeholder. Run the test successfully.
+
+ @param result: The C{TestResult} to store the results in.
+ @type result: L{twisted.trial.itrial.IReporter}.
+ """
+ result.startTest(self)
+ result.addSuccess(self)
+ result.stopTest(self)
+
+
+ def shortDescription(self):
+ return self.description
+
+
+
+class ErrorHolder(TestHolder):
+ """
+ Used to insert arbitrary errors into a test suite run. Provides enough
+ methods to look like a C{TestCase}, however, when it is run, it simply adds
+ an error to the C{TestResult}. The most common use-case is for when a
+ module fails to import.
+ """
+
+ def __init__(self, description, error):
+ """
+ @param description: A string used by C{TestResult}s to identify this
+ error. Generally, this is the name of a module that failed to import.
+
+ @param error: The error to be added to the result. Can be an `exc_info`
+ tuple or a L{twisted.python.failure.Failure}.
+ """
+ super(ErrorHolder, self).__init__(description)
+ self.error = util.excInfoOrFailureToExcInfo(error)
+
+
+ def __repr__(self):
+ return "<ErrorHolder description=%r error=%r>" % (
+ self.description, self.error[1])
+
+
+ def run(self, result):
+ """
+ Run the test, reporting the error.
+
+ @param result: The C{TestResult} to store the results in.
+ @type result: L{twisted.trial.itrial.IReporter}.
+ """
+ result.startTest(self)
+ result.addError(self, self.error)
+ result.stopTest(self)
+
+
+
+class TestLoader(object):
+ """
+ I find tests inside function, modules, files -- whatever -- then return
+ them wrapped inside a Test (either a L{TestSuite} or a L{TestCase}).
+
+ @ivar methodPrefix: A string prefix. C{TestLoader} will assume that all the
+ methods in a class that begin with C{methodPrefix} are test cases.
+
+ @ivar modulePrefix: A string prefix. Every module in a package that begins
+ with C{modulePrefix} is considered a module full of tests.
+
+ @ivar forceGarbageCollection: A flag applied to each C{TestCase} loaded.
+ See L{unittest.TestCase} for more information.
+
+ @ivar sorter: A key function used to sort C{TestCase}s, test classes,
+ modules and packages.
+
+ @ivar suiteFactory: A callable which is passed a list of tests (which
+ themselves may be suites of tests). Must return a test suite.
+ """
+
+ methodPrefix = 'test'
+ modulePrefix = 'test_'
+
+ def __init__(self):
+ self.suiteFactory = TestSuite
+ self.sorter = name
+ self._importErrors = []
+
+ def sort(self, xs):
+ """
+ Sort the given things using L{sorter}.
+
+ @param xs: A list of test cases, class or modules.
+ """
+ return sorted(xs, key=self.sorter)
+
+ def findTestClasses(self, module):
+ """Given a module, return all Trial test classes"""
+ classes = []
+ for name, val in inspect.getmembers(module):
+ if isTestCase(val):
+ classes.append(val)
+ return self.sort(classes)
+
+ def findByName(self, name):
+ """
+ Return a Python object given a string describing it.
+
+ @param name: a string which may be either a filename or a
+ fully-qualified Python name.
+
+ @return: If C{name} is a filename, return the module. If C{name} is a
+ fully-qualified Python name, return the object it refers to.
+ """
+ if os.path.exists(name):
+ return filenameToModule(name)
+ return reflect.namedAny(name)
+
+ def loadModule(self, module):
+ """
+ Return a test suite with all the tests from a module.
+
+ Included are TestCase subclasses and doctests listed in the module's
+ __doctests__ module. If that's not good for you, put a function named
+ either C{testSuite} or C{test_suite} in your module that returns a
+ TestSuite, and I'll use the results of that instead.
+
+ If C{testSuite} and C{test_suite} are both present, then I'll use
+ C{testSuite}.
+ """
+ ## XXX - should I add an optional parameter to disable the check for
+ ## a custom suite.
+ ## OR, should I add another method
+ if not isinstance(module, types.ModuleType):
+ raise TypeError("%r is not a module" % (module,))
+ if hasattr(module, 'testSuite'):
+ return module.testSuite()
+ elif hasattr(module, 'test_suite'):
+ return module.test_suite()
+ suite = self.suiteFactory()
+ for testClass in self.findTestClasses(module):
+ suite.addTest(self.loadClass(testClass))
+ if not hasattr(module, '__doctests__'):
+ return suite
+ docSuite = self.suiteFactory()
+ for docTest in module.__doctests__:
+ docSuite.addTest(self.loadDoctests(docTest))
+ return self.suiteFactory([suite, docSuite])
+ loadTestsFromModule = loadModule
+
+ def loadClass(self, klass):
+ """
+ Given a class which contains test cases, return a sorted list of
+ C{TestCase} instances.
+ """
+ if not (isinstance(klass, type) or isinstance(klass, types.ClassType)):
+ raise TypeError("%r is not a class" % (klass,))
+ if not isTestCase(klass):
+ raise ValueError("%r is not a test case" % (klass,))
+ names = self.getTestCaseNames(klass)
+ tests = self.sort([self._makeCase(klass, self.methodPrefix+name)
+ for name in names])
+ return self.suiteFactory(tests)
+ loadTestsFromTestCase = loadClass
+
+ def getTestCaseNames(self, klass):
+ """
+ Given a class that contains C{TestCase}s, return a list of names of
+ methods that probably contain tests.
+ """
+ return reflect.prefixedMethodNames(klass, self.methodPrefix)
+
+ def loadMethod(self, method):
+ """
+ Given a method of a C{TestCase} that represents a test, return a
+ C{TestCase} instance for that test.
+ """
+ if not isinstance(method, types.MethodType):
+ raise TypeError("%r not a method" % (method,))
+ return self._makeCase(method.im_class, _getMethodNameInClass(method))
+
+ def _makeCase(self, klass, methodName):
+ return klass(methodName)
+
+ def loadPackage(self, package, recurse=False):
+ """
+ Load tests from a module object representing a package, and return a
+ TestSuite containing those tests.
+
+ Tests are only loaded from modules whose name begins with 'test_'
+ (or whatever C{modulePrefix} is set to).
+
+ @param package: a types.ModuleType object (or reasonable facsimile
+ obtained by importing) which may contain tests.
+
+ @param recurse: A boolean. If True, inspect modules within packages
+ within the given package (and so on), otherwise, only inspect modules
+ in the package itself.
+
+ @raise: TypeError if 'package' is not a package.
+
+ @return: a TestSuite created with my suiteFactory, containing all the
+ tests.
+ """
+ if not isPackage(package):
+ raise TypeError("%r is not a package" % (package,))
+ pkgobj = modules.getModule(package.__name__)
+ if recurse:
+ discovery = pkgobj.walkModules()
+ else:
+ discovery = pkgobj.iterModules()
+ discovered = []
+ for disco in discovery:
+ if disco.name.split(".")[-1].startswith(self.modulePrefix):
+ discovered.append(disco)
+ suite = self.suiteFactory()
+ for modinfo in self.sort(discovered):
+ try:
+ module = modinfo.load()
+ except:
+ thingToAdd = ErrorHolder(modinfo.name, failure.Failure())
+ else:
+ thingToAdd = self.loadModule(module)
+ suite.addTest(thingToAdd)
+ return suite
+
+ def loadDoctests(self, module):
+ """
+ Return a suite of tests for all the doctests defined in C{module}.
+
+ @param module: A module object or a module name.
+ """
+ if isinstance(module, str):
+ try:
+ module = reflect.namedAny(module)
+ except:
+ return ErrorHolder(module, failure.Failure())
+ if not inspect.ismodule(module):
+ warnings.warn("trial only supports doctesting modules")
+ return
+ extraArgs = {}
+
+ # Work around Python issue2604: DocTestCase.tearDown clobbers globs
+ def saveGlobals(test):
+ """
+ Save C{test.globs} and replace it with a copy so that if
+ necessary, the original will be available for the next test
+ run.
+ """
+ test._savedGlobals = getattr(test, '_savedGlobals', test.globs)
+ test.globs = test._savedGlobals.copy()
+ extraArgs['setUp'] = saveGlobals
+ return doctest.DocTestSuite(module, **extraArgs)
+
+ def loadAnything(self, thing, recurse=False, parent=None, qualName=None):
+ """
+ Given a Python object, return whatever tests that are in it. Whatever
+ 'in' might mean.
+
+ @param thing: A Python object. A module, method, class or package.
+ @param recurse: Whether or not to look in subpackages of packages.
+ Defaults to False.
+
+ @param parent: For compatibility with the Python 3 loader, does
+ nothing.
+ @param qualname: For compatibility with the Python 3 loader, does
+ nothing.
+
+ @return: A C{TestCase} or C{TestSuite}.
+ """
+ if isinstance(thing, types.ModuleType):
+ if isPackage(thing):
+ return self.loadPackage(thing, recurse)
+ return self.loadModule(thing)
+ elif isinstance(thing, types.ClassType):
+ return self.loadClass(thing)
+ elif isinstance(thing, type):
+ return self.loadClass(thing)
+ elif isinstance(thing, types.MethodType):
+ return self.loadMethod(thing)
+ raise TypeError("No loader for %r. Unrecognized type" % (thing,))
+
+ def loadByName(self, name, recurse=False):
+ """
+ Given a string representing a Python object, return whatever tests
+ are in that object.
+
+ If C{name} is somehow inaccessible (e.g. the module can't be imported,
+ there is no Python object with that name etc) then return an
+ L{ErrorHolder}.
+
+ @param name: The fully-qualified name of a Python object.
+ """
+ try:
+ thing = self.findByName(name)
+ except:
+ return ErrorHolder(name, failure.Failure())
+ return self.loadAnything(thing, recurse)
+
+ loadTestsFromName = loadByName
+
+ def loadByNames(self, names, recurse=False):
+ """
+ Construct a TestSuite containing all the tests found in 'names', where
+ names is a list of fully qualified python names and/or filenames. The
+ suite returned will have no duplicate tests, even if the same object
+ is named twice.
+ """
+ things = []
+ errors = []
+ for name in names:
+ try:
+ things.append(self.findByName(name))
+ except:
+ errors.append(ErrorHolder(name, failure.Failure()))
+ suites = [self.loadAnything(thing, recurse)
+ for thing in self._uniqueTests(things)]
+ suites.extend(errors)
+ return self.suiteFactory(suites)
+
+
+ def _uniqueTests(self, things):
+ """
+ Gather unique suite objects from loaded things. This will guarantee
+ uniqueness of inherited methods on TestCases which would otherwise hash
+ to same value and collapse to one test unexpectedly if using simpler
+ means: e.g. set().
+ """
+ seen = set()
+ for thing in things:
+ if isinstance(thing, types.MethodType):
+ thing = (thing, thing.im_class)
+ else:
+ thing = (thing,)
+
+ if thing not in seen:
+ yield thing[0]
+ seen.add(thing)
+
+
+
+class Py3TestLoader(TestLoader):
+ """
+ A test loader finds tests from the functions, modules, and files that is
+ asked to and loads them into a L{TestSuite} or L{TestCase}.
+
+ See L{TestLoader} for further details.
+ """
+
+ def loadFile(self, fileName, recurse=False):
+ """
+ Load a file, and then the tests in that file.
+
+ @param fileName: The file name to load.
+ @param recurse: A boolean. If True, inspect modules within packages
+ within the given package (and so on), otherwise, only inspect
+ modules in the package itself.
+ """
+ from importlib.machinery import SourceFileLoader
+
+ name = reflect.filenameToModuleName(fileName)
+ try:
+ module = SourceFileLoader(name, fileName).load_module()
+ return self.loadAnything(module, recurse=recurse)
+ except OSError:
+ raise ValueError("{} is not a Python file.".format(fileName))
+
+
+ def findByName(self, _name, recurse=False):
+ """
+ Find and load tests, given C{name}.
+
+ This partially duplicates the logic in C{unittest.loader.TestLoader}.
+
+ @param name: The qualified name of the thing to load.
+ @param recurse: A boolean. If True, inspect modules within packages
+ within the given package (and so on), otherwise, only inspect
+ modules in the package itself.
+ """
+ if os.sep in _name:
+ # It's a file, try and get the module name for this file.
+ name = reflect.filenameToModuleName(_name)
+
+ try:
+ # Try and import it, if it's on the path.
+ # CAVEAT: If you have two twisteds, and you try and import the
+ # one NOT on your path, it'll load the one on your path. But
+ # that's silly, nobody should do that, and existing Trial does
+ # that anyway.
+ __import__(name)
+ except ImportError:
+ # If we can't import it, look for one NOT on the path.
+ return self.loadFile(_name, recurse=recurse)
+
+ else:
+ name = _name
+
+ obj = parent = remaining = None
+
+ for searchName, remainingName in _qualNameWalker(name):
+ # Walk down the qualified name, trying to import a module. For
+ # example, `twisted.test.test_paths.FilePathTests` would try
+ # the full qualified name, then just up to test_paths, and then
+ # just up to test, and so forth.
+ # This gets us the highest level thing which is a module.
+ try:
+ obj = reflect.namedModule(searchName)
+ # If we reach here, we have successfully found a module.
+ # obj will be the module, and remaining will be the remaining
+ # part of the qualified name.
+ remaining = remainingName
+ break
+
+ except ImportError:
+ # Check to see where the ImportError happened. If it happened
+ # in this file, ignore it.
+ tb = sys.exc_info()[2]
+
+ # Walk down to the deepest frame, where it actually happened.
+ while tb.tb_next is not None:
+ tb = tb.tb_next
+
+ # Get the filename that the ImportError originated in.
+ filenameWhereHappened = tb.tb_frame.f_code.co_filename
+
+ # If it originated in the reflect file, then it's because it
+ # doesn't exist. If it originates elsewhere, it's because an
+ # ImportError happened in a module that does exist.
+ if filenameWhereHappened != reflect.__file__:
+ raise
+
+ if remaining == "":
+ raise reflect.ModuleNotFound(
+ "The module {} does not exist.".format(name)
+ )
+
+ if obj is None:
+ # If it's none here, we didn't get to import anything.
+ # Try something drastic.
+ obj = reflect.namedAny(name)
+ remaining = name.split(".")[len(".".split(obj.__name__))+1:]
+
+ try:
+ for part in remaining:
+ # Walk down the remaining modules. Hold on to the parent for
+ # methods, as on Python 3, you can no longer get the parent
+ # class from just holding onto the method.
+ parent, obj = obj, getattr(obj, part)
+ except AttributeError:
+ raise AttributeError("{} does not exist.".format(name))
+
+ return self.loadAnything(obj, parent=parent, qualName=remaining,
+ recurse=recurse)
+
+
+ def loadAnything(self, obj, recurse=False, parent=None, qualName=None):
+ """
+ Load absolutely anything (as long as that anything is a module,
+ package, class, or method (with associated parent class and qualname).
+
+ @param obj: The object to load.
+ @param recurse: A boolean. If True, inspect modules within packages
+ within the given package (and so on), otherwise, only inspect
+ modules in the package itself.
+ @param parent: If C{obj} is a method, this is the parent class of the
+ method. C{qualName} is also required.
+ @param qualName: If C{obj} is a method, this a list containing is the
+ qualified name of the method. C{parent} is also required.
+ """
+ if isinstance(obj, types.ModuleType):
+ # It looks like a module
+ if isPackage(obj):
+ # It's a package, so recurse down it.
+ return self.loadPackage(obj, recurse=recurse)
+ # Otherwise get all the tests in the module.
+ return self.loadTestsFromModule(obj)
+ elif isinstance(obj, type) and issubclass(obj, pyunit.TestCase):
+ # We've found a raw test case, get the tests from it.
+ return self.loadTestsFromTestCase(obj)
+ elif (isinstance(obj, types.FunctionType) and
+ isinstance(parent, type) and
+ issubclass(parent, pyunit.TestCase)):
+ # We've found a method, and its parent is a TestCase. Instantiate
+ # it with the name of the method we want.
+ name = qualName[-1]
+ inst = parent(name)
+
+ # Sanity check to make sure that the method we have got from the
+ # test case is the same one as was passed in. This doesn't actually
+ # use the function we passed in, because reasons.
+ assert getattr(inst, inst._testMethodName).__func__ == obj
+
+ return inst
+ elif isinstance(obj, TestSuite):
+ # We've found a test suite.
+ return obj
+ else:
+ raise TypeError("don't know how to make test from: %s" % (obj,))
+
+
+ def loadByName(self, name, recurse=False):
+ """
+ Load some tests by name.
+
+ @param name: The qualified name for the test to load.
+ @param recurse: A boolean. If True, inspect modules within packages
+ within the given package (and so on), otherwise, only inspect
+ modules in the package itself.
+ """
+ try:
+ return self.suiteFactory([self.findByName(name, recurse=recurse)])
+ except:
+ return self.suiteFactory([ErrorHolder(name, failure.Failure())])
+
+
+ def loadByNames(self, names, recurse=False):
+ """
+ Load some tests by a list of names.
+
+ @param names: A L{list} of qualified names.
+ @param recurse: A boolean. If True, inspect modules within packages
+ within the given package (and so on), otherwise, only inspect
+ modules in the package itself.
+ """
+ things = []
+ errors = []
+ for name in names:
+ try:
+ things.append(self.loadByName(name, recurse=recurse))
+ except:
+ errors.append(ErrorHolder(name, failure.Failure()))
+ things.extend(errors)
+ return self.suiteFactory(self._uniqueTests(things))
+
+
+ def loadClass(self, klass):
+ """
+ Given a class which contains test cases, return a list of L{TestCase}s.
+
+ @param klass: The class to load tests from.
+ """
+ if not isinstance(klass, type):
+ raise TypeError("%r is not a class" % (klass,))
+ if not isTestCase(klass):
+ raise ValueError("%r is not a test case" % (klass,))
+ names = self.getTestCaseNames(klass)
+ tests = self.sort([self._makeCase(klass, self.methodPrefix+name)
+ for name in names])
+ return self.suiteFactory(tests)
+
+
+ def loadMethod(self, method):
+ raise NotImplementedError("Can't happen on Py3")
+
+
+ def _uniqueTests(self, things):
+ """
+ Gather unique suite objects from loaded things. This will guarantee
+ uniqueness of inherited methods on TestCases which would otherwise hash
+ to same value and collapse to one test unexpectedly if using simpler
+ means: e.g. set().
+ """
+ seen = set()
+ for testthing in things:
+ testthings = testthing._tests
+ for thing in testthings:
+ # This is horrible.
+ if str(thing) not in seen:
+ yield thing
+ seen.add(str(thing))
+
+
+def _qualNameWalker(qualName):
+ """
+ Given a Python qualified name, this function yields a 2-tuple of the most
+ specific qualified name first, followed by the next-most-specific qualified
+ name, and so on, paired with the remainder of the qualified name.
+
+ @param qualName: A Python qualified name.
+ @type qualName: L{str}
+ """
+ # Yield what we were just given
+ yield (qualName, [])
+
+ # If they want more, split the qualified name up
+ qualParts = qualName.split(".")
+
+ for index in range(1, len(qualParts)):
+ # This code here will produce, from the example walker.texas.ranger:
+ # (walker.texas, ["ranger"])
+ # (walker, ["texas", "ranger"])
+ yield (".".join(qualParts[:-index]), qualParts[-index:])
+
+
+if _PY3:
+ del TestLoader
+ TestLoader = Py3TestLoader
+
+
+
+class TrialRunner(object):
+ """
+ A specialised runner that the trial front end uses.
+ """
+
+ DEBUG = 'debug'
+ DRY_RUN = 'dry-run'
+
+ def _setUpTestdir(self):
+ self._tearDownLogFile()
+ currentDir = os.getcwd()
+ base = filepath.FilePath(self.workingDirectory)
+ testdir, self._testDirLock = util._unusedTestDirectory(base)
+ os.chdir(testdir.path)
+ return currentDir
+
+
+ def _tearDownTestdir(self, oldDir):
+ os.chdir(oldDir)
+ self._testDirLock.unlock()
+
+
+ _log = log
+ def _makeResult(self):
+ reporter = self.reporterFactory(self.stream, self.tbformat,
+ self.rterrors, self._log)
+ if self._exitFirst:
+ reporter = _ExitWrapper(reporter)
+ if self.uncleanWarnings:
+ reporter = UncleanWarningsReporterWrapper(reporter)
+ return reporter
+
+ def __init__(self, reporterFactory,
+ mode=None,
+ logfile='test.log',
+ stream=sys.stdout,
+ profile=False,
+ tracebackFormat='default',
+ realTimeErrors=False,
+ uncleanWarnings=False,
+ workingDirectory=None,
+ forceGarbageCollection=False,
+ debugger=None,
+ exitFirst=False):
+ self.reporterFactory = reporterFactory
+ self.logfile = logfile
+ self.mode = mode
+ self.stream = stream
+ self.tbformat = tracebackFormat
+ self.rterrors = realTimeErrors
+ self.uncleanWarnings = uncleanWarnings
+ self._result = None
+ self.workingDirectory = workingDirectory or '_trial_temp'
+ self._logFileObserver = None
+ self._logFileObject = None
+ self._forceGarbageCollection = forceGarbageCollection
+ self.debugger = debugger
+ self._exitFirst = exitFirst
+ if profile:
+ self.run = util.profiled(self.run, 'profile.data')
+
+ def _tearDownLogFile(self):
+ if self._logFileObserver is not None:
+ log.removeObserver(self._logFileObserver.emit)
+ self._logFileObserver = None
+ if self._logFileObject is not None:
+ self._logFileObject.close()
+ self._logFileObject = None
+
+ def _setUpLogFile(self):
+ self._tearDownLogFile()
+ if self.logfile == '-':
+ logFile = sys.stdout
+ else:
+ logFile = open(self.logfile, 'a')
+ self._logFileObject = logFile
+ self._logFileObserver = log.FileLogObserver(logFile)
+ log.startLoggingWithObserver(self._logFileObserver.emit, 0)
+
+
+ def run(self, test):
+ """
+ Run the test or suite and return a result object.
+ """
+ test = unittest.decorate(test, ITestCase)
+ return self._runWithoutDecoration(test, self._forceGarbageCollection)
+
+
+ def _runWithoutDecoration(self, test, forceGarbageCollection=False):
+ """
+ Private helper that runs the given test but doesn't decorate it.
+ """
+ result = self._makeResult()
+ # decorate the suite with reactor cleanup and log starting
+ # This should move out of the runner and be presumed to be
+ # present
+ suite = TrialSuite([test], forceGarbageCollection)
+ startTime = time.time()
+ if self.mode == self.DRY_RUN:
+ for single in _iterateTests(suite):
+ result.startTest(single)
+ result.addSuccess(single)
+ result.stopTest(single)
+ else:
+ if self.mode == self.DEBUG:
+ run = lambda: self.debugger.runcall(suite.run, result)
+ else:
+ run = lambda: suite.run(result)
+
+ oldDir = self._setUpTestdir()
+ try:
+ self._setUpLogFile()
+ run()
+ finally:
+ self._tearDownLogFile()
+ self._tearDownTestdir(oldDir)
+
+ endTime = time.time()
+ done = getattr(result, 'done', None)
+ if done is None:
+ warnings.warn(
+ "%s should implement done() but doesn't. Falling back to "
+ "printErrors() and friends." % reflect.qual(result.__class__),
+ category=DeprecationWarning, stacklevel=3)
+ result.printErrors()
+ result.writeln(result.separator)
+ result.writeln('Ran %d tests in %.3fs', result.testsRun,
+ endTime - startTime)
+ result.write('\n')
+ result.printSummary()
+ else:
+ result.done()
+ return result
+
+
+ def runUntilFailure(self, test):
+ """
+ Repeatedly run C{test} until it fails.
+ """
+ count = 0
+ while True:
+ count += 1
+ self.stream.write("Test Pass %d\n" % (count,))
+ if count == 1:
+ result = self.run(test)
+ else:
+ result = self._runWithoutDecoration(test)
+ if result.testsRun == 0:
+ break
+ if not result.wasSuccessful():
+ break
+ return result
diff --git a/contrib/python/Twisted/py2/twisted/trial/unittest.py b/contrib/python/Twisted/py2/twisted/trial/unittest.py
new file mode 100644
index 0000000000..d3f60346a6
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/unittest.py
@@ -0,0 +1,35 @@
+# -*- test-case-name: twisted.trial.test -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Things likely to be used by writers of unit tests.
+"""
+
+from __future__ import division, absolute_import
+
+# Define the public API from the two implementation modules
+from twisted.trial._synctest import (
+ FailTest, SkipTest, SynchronousTestCase, PyUnitResultAdapter, Todo,
+ makeTodo)
+from twisted.trial._asynctest import TestCase
+from twisted.trial._asyncrunner import (
+ TestSuite, TestDecorator, decorate)
+
+# Further obscure the origins of these objects, to reduce surprise (and this is
+# what the values were before code got shuffled around between files, but was
+# otherwise unchanged).
+FailTest.__module__ = SkipTest.__module__ = __name__
+
+__all__ = [
+ 'decorate',
+ 'FailTest',
+ 'makeTodo',
+ 'PyUnitResultAdapter',
+ 'SkipTest',
+ 'SynchronousTestCase',
+ 'TestCase',
+ 'TestDecorator',
+ 'TestSuite',
+ 'Todo',
+ ]
diff --git a/contrib/python/Twisted/py2/twisted/trial/util.py b/contrib/python/Twisted/py2/twisted/trial/util.py
new file mode 100644
index 0000000000..4f07112d32
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/util.py
@@ -0,0 +1,411 @@
+# -*- test-case-name: twisted.trial.test.test_util -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+#
+
+"""
+A collection of utility functions and classes, used internally by Trial.
+
+This code is for Trial's internal use. Do NOT use this code if you are writing
+tests. It is subject to change at the Trial maintainer's whim. There is
+nothing here in this module for you to use unless you are maintaining Trial.
+
+Any non-Trial Twisted code that uses this module will be shot.
+
+Maintainer: Jonathan Lange
+
+@var DEFAULT_TIMEOUT_DURATION: The default timeout which will be applied to
+ asynchronous (ie, Deferred-returning) test methods, in seconds.
+"""
+
+from __future__ import division, absolute_import, print_function
+
+from random import randrange
+
+from twisted.internet import defer, utils, interfaces
+from twisted.python.failure import Failure
+from twisted.python.filepath import FilePath
+from twisted.python.lockfile import FilesystemLock
+
+__all__ = [
+ 'DEFAULT_TIMEOUT_DURATION',
+
+ 'excInfoOrFailureToExcInfo', 'suppress', 'acquireAttribute']
+
+DEFAULT_TIMEOUT = object()
+DEFAULT_TIMEOUT_DURATION = 120.0
+
+
+
+class DirtyReactorAggregateError(Exception):
+ """
+ Passed to L{twisted.trial.itrial.IReporter.addError} when the reactor is
+ left in an unclean state after a test.
+
+ @ivar delayedCalls: The L{DelayedCall<twisted.internet.base.DelayedCall>}
+ objects which weren't cleaned up.
+ @ivar selectables: The selectables which weren't cleaned up.
+ """
+
+ def __init__(self, delayedCalls, selectables=None):
+ self.delayedCalls = delayedCalls
+ self.selectables = selectables
+
+
+ def __str__(self):
+ """
+ Return a multi-line message describing all of the unclean state.
+ """
+ msg = "Reactor was unclean."
+ if self.delayedCalls:
+ msg += ("\nDelayedCalls: (set "
+ "twisted.internet.base.DelayedCall.debug = True to "
+ "debug)\n")
+ msg += "\n".join(map(str, self.delayedCalls))
+ if self.selectables:
+ msg += "\nSelectables:\n"
+ msg += "\n".join(map(str, self.selectables))
+ return msg
+
+
+
+class _Janitor(object):
+ """
+ The guy that cleans up after you.
+
+ @ivar test: The L{TestCase} to report errors about.
+ @ivar result: The L{IReporter} to report errors to.
+ @ivar reactor: The reactor to use. If None, the global reactor
+ will be used.
+ """
+ def __init__(self, test, result, reactor=None):
+ """
+ @param test: See L{_Janitor.test}.
+ @param result: See L{_Janitor.result}.
+ @param reactor: See L{_Janitor.reactor}.
+ """
+ self.test = test
+ self.result = result
+ self.reactor = reactor
+
+
+ def postCaseCleanup(self):
+ """
+ Called by L{unittest.TestCase} after a test to catch any logged errors
+ or pending L{DelayedCall<twisted.internet.base.DelayedCall>}s.
+ """
+ calls = self._cleanPending()
+ if calls:
+ aggregate = DirtyReactorAggregateError(calls)
+ self.result.addError(self.test, Failure(aggregate))
+ return False
+ return True
+
+
+ def postClassCleanup(self):
+ """
+ Called by L{unittest.TestCase} after the last test in a C{TestCase}
+ subclass. Ensures the reactor is clean by murdering the threadpool,
+ catching any pending
+ L{DelayedCall<twisted.internet.base.DelayedCall>}s, open sockets etc.
+ """
+ selectables = self._cleanReactor()
+ calls = self._cleanPending()
+ if selectables or calls:
+ aggregate = DirtyReactorAggregateError(calls, selectables)
+ self.result.addError(self.test, Failure(aggregate))
+ self._cleanThreads()
+
+
+ def _getReactor(self):
+ """
+ Get either the passed-in reactor or the global reactor.
+ """
+ if self.reactor is not None:
+ reactor = self.reactor
+ else:
+ from twisted.internet import reactor
+ return reactor
+
+
+ def _cleanPending(self):
+ """
+ Cancel all pending calls and return their string representations.
+ """
+ reactor = self._getReactor()
+
+ # flush short-range timers
+ reactor.iterate(0)
+ reactor.iterate(0)
+
+ delayedCallStrings = []
+ for p in reactor.getDelayedCalls():
+ if p.active():
+ delayedString = str(p)
+ p.cancel()
+ else:
+ print("WEIRDNESS! pending timed call not active!")
+ delayedCallStrings.append(delayedString)
+ return delayedCallStrings
+ _cleanPending = utils.suppressWarnings(
+ _cleanPending, (('ignore',), {'category': DeprecationWarning,
+ 'message':
+ r'reactor\.iterate cannot be used.*'}))
+
+ def _cleanThreads(self):
+ reactor = self._getReactor()
+ if interfaces.IReactorThreads.providedBy(reactor):
+ if reactor.threadpool is not None:
+ # Stop the threadpool now so that a new one is created.
+ # This improves test isolation somewhat (although this is a
+ # post class cleanup hook, so it's only isolating classes
+ # from each other, not methods from each other).
+ reactor._stopThreadPool()
+
+
+ def _cleanReactor(self):
+ """
+ Remove all selectables from the reactor, kill any of them that were
+ processes, and return their string representation.
+ """
+ reactor = self._getReactor()
+ selectableStrings = []
+ for sel in reactor.removeAll():
+ if interfaces.IProcessTransport.providedBy(sel):
+ sel.signalProcess('KILL')
+ selectableStrings.append(repr(sel))
+ return selectableStrings
+
+
+
+_DEFAULT = object()
+def acquireAttribute(objects, attr, default=_DEFAULT):
+ """
+ Go through the list 'objects' sequentially until we find one which has
+ attribute 'attr', then return the value of that attribute. If not found,
+ return 'default' if set, otherwise, raise AttributeError.
+ """
+ for obj in objects:
+ if hasattr(obj, attr):
+ return getattr(obj, attr)
+ if default is not _DEFAULT:
+ return default
+ raise AttributeError('attribute %r not found in %r' % (attr, objects))
+
+
+
+def excInfoOrFailureToExcInfo(err):
+ """
+ Coerce a Failure to an _exc_info, if err is a Failure.
+
+ @param err: Either a tuple such as returned by L{sys.exc_info} or a
+ L{Failure} object.
+ @return: A tuple like the one returned by L{sys.exc_info}. e.g.
+ C{exception_type, exception_object, traceback_object}.
+ """
+ if isinstance(err, Failure):
+ # Unwrap the Failure into an exc_info tuple.
+ err = (err.type, err.value, err.getTracebackObject())
+ return err
+
+
+
+def suppress(action='ignore', **kwarg):
+ """
+ Sets up the .suppress tuple properly, pass options to this method as you
+ would the stdlib warnings.filterwarnings()
+
+ So, to use this with a .suppress magic attribute you would do the
+ following:
+
+ >>> from twisted.trial import unittest, util
+ >>> import warnings
+ >>>
+ >>> class TestFoo(unittest.TestCase):
+ ... def testFooBar(self):
+ ... warnings.warn("i am deprecated", DeprecationWarning)
+ ... testFooBar.suppress = [util.suppress(message='i am deprecated')]
+ ...
+ >>>
+
+ Note that as with the todo and timeout attributes: the module level
+ attribute acts as a default for the class attribute which acts as a default
+ for the method attribute. The suppress attribute can be overridden at any
+ level by specifying C{.suppress = []}
+ """
+ return ((action,), kwarg)
+
+
+
+# This should be deleted, and replaced with twisted.application's code; see
+# #6016:
+def profiled(f, outputFile):
+ def _(*args, **kwargs):
+ import profile
+ prof = profile.Profile()
+ try:
+ result = prof.runcall(f, *args, **kwargs)
+ prof.dump_stats(outputFile)
+ except SystemExit:
+ pass
+ prof.print_stats()
+ return result
+ return _
+
+
+
+@defer.inlineCallbacks
+def _runSequentially(callables, stopOnFirstError=False):
+ """
+ Run the given callables one after the other. If a callable returns a
+ Deferred, wait until it has finished before running the next callable.
+
+ @param callables: An iterable of callables that take no parameters.
+
+ @param stopOnFirstError: If True, then stop running callables as soon as
+ one raises an exception or fires an errback. False by default.
+
+ @return: A L{Deferred} that fires a list of C{(flag, value)} tuples. Each
+ tuple will be either C{(SUCCESS, <return value>)} or C{(FAILURE,
+ <Failure>)}.
+ """
+ results = []
+ for f in callables:
+ d = defer.maybeDeferred(f)
+ try:
+ thing = yield d
+ results.append((defer.SUCCESS, thing))
+ except Exception:
+ results.append((defer.FAILURE, Failure()))
+ if stopOnFirstError:
+ break
+ defer.returnValue(results)
+
+
+
+class _NoTrialMarker(Exception):
+ """
+ No trial marker file could be found.
+
+ Raised when trial attempts to remove a trial temporary working directory
+ that does not contain a marker file.
+ """
+
+
+
+def _removeSafely(path):
+ """
+ Safely remove a path, recursively.
+
+ If C{path} does not contain a node named C{_trial_marker}, a
+ L{_NoTrialMarker} exception is raised and the path is not removed.
+ """
+ if not path.child(b'_trial_marker').exists():
+ raise _NoTrialMarker(
+ '%r is not a trial temporary path, refusing to remove it'
+ % (path,))
+ try:
+ path.remove()
+ except OSError as e:
+ print ("could not remove %r, caught OSError [Errno %s]: %s"
+ % (path, e.errno, e.strerror))
+ try:
+ newPath = FilePath(b'_trial_temp_old' +
+ str(randrange(10000000)).encode("utf-8"))
+ path.moveTo(newPath)
+ except OSError as e:
+ print ("could not rename path, caught OSError [Errno %s]: %s"
+ % (e.errno,e.strerror))
+ raise
+
+
+
+class _WorkingDirectoryBusy(Exception):
+ """
+ A working directory was specified to the runner, but another test run is
+ currently using that directory.
+ """
+
+
+
+def _unusedTestDirectory(base):
+ """
+ Find an unused directory named similarly to C{base}.
+
+ Once a directory is found, it will be locked and a marker dropped into it
+ to identify it as a trial temporary directory.
+
+ @param base: A template path for the discovery process. If this path
+ exactly cannot be used, a path which varies only in a suffix of the
+ basename will be used instead.
+ @type base: L{FilePath}
+
+ @return: A two-tuple. The first element is a L{FilePath} representing the
+ directory which was found and created. The second element is a locked
+ L{FilesystemLock<twisted.python.lockfile.FilesystemLock>}. Another
+ call to C{_unusedTestDirectory} will not be able to reused the
+ same name until the lock is released, either explicitly or by this
+ process exiting.
+ """
+ counter = 0
+ while True:
+ if counter:
+ testdir = base.sibling('%s-%d' % (base.basename(), counter))
+ else:
+ testdir = base
+
+ testDirLock = FilesystemLock(testdir.path + '.lock')
+ if testDirLock.lock():
+ # It is not in use
+ if testdir.exists():
+ # It exists though - delete it
+ _removeSafely(testdir)
+
+ # Create it anew and mark it as ours so the next _removeSafely on
+ # it succeeds.
+ testdir.makedirs()
+ testdir.child(b'_trial_marker').setContent(b'')
+ return testdir, testDirLock
+ else:
+ # It is in use
+ if base.basename() == '_trial_temp':
+ counter += 1
+ else:
+ raise _WorkingDirectoryBusy()
+
+
+
+def _listToPhrase(things, finalDelimiter, delimiter=', '):
+ """
+ Produce a string containing each thing in C{things},
+ separated by a C{delimiter}, with the last couple being separated
+ by C{finalDelimiter}
+
+ @param things: The elements of the resulting phrase
+ @type things: L{list} or L{tuple}
+
+ @param finalDelimiter: What to put between the last two things
+ (typically 'and' or 'or')
+ @type finalDelimiter: L{str}
+
+ @param delimiter: The separator to use between each thing,
+ not including the last two. Should typically include a trailing space.
+ @type delimiter: L{str}
+
+ @return: The resulting phrase
+ @rtype: L{str}
+ """
+ if not isinstance(things, (list, tuple)):
+ raise TypeError("Things must be a list or a tuple")
+ if not things:
+ return ''
+ if len(things) == 1:
+ return str(things[0])
+ if len(things) == 2:
+ return "%s %s %s" % (str(things[0]), finalDelimiter, str(things[1]))
+ else:
+ strThings = []
+ for thing in things:
+ strThings.append(str(thing))
+ return "%s%s%s %s" % (delimiter.join(strThings[:-1]),
+ delimiter, finalDelimiter, strThings[-1])