diff options
author | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:16:14 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:43:30 +0300 |
commit | b8cf9e88f4c5c64d9406af533d8948deb050d695 (patch) | |
tree | 218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py2/twisted/trial | |
parent | 523f645a83a0ec97a0332dbc3863bb354c92a328 (diff) | |
download | ydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz |
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py2/twisted/trial')
19 files changed, 6300 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py2/twisted/trial/__init__.py b/contrib/python/Twisted/py2/twisted/trial/__init__.py new file mode 100644 index 0000000000..5faaa99ddd --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/__init__.py @@ -0,0 +1,50 @@ +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. +# +# Maintainer: Jonathan Lange + +""" +Twisted Trial: Asynchronous unit testing framework. + +Trial extends Python's builtin C{unittest} to provide support for asynchronous +tests. + +Trial strives to be compatible with other Python xUnit testing frameworks. +"Compatibility" is a difficult things to define. In practice, it means that: + + - L{twisted.trial.unittest.TestCase} objects should be able to be used by + other test runners without those runners requiring special support for + Trial tests. + + - Tests that subclass the standard library C{TestCase} and don't do anything + "too weird" should be able to be discoverable and runnable by the Trial + test runner without the authors of those tests having to jump through + hoops. + + - Tests that implement the interface provided by the standard library + C{TestCase} should be runnable by the Trial runner. + + - The Trial test runner and Trial L{unittest.TestCase} objects ought to be + able to use standard library C{TestResult} objects, and third party + C{TestResult} objects based on the standard library. + +This list is not necessarily exhaustive -- compatibility is hard to define. +Contributors who discover more helpful ways of defining compatibility are +encouraged to update this document. + + +Examples: + +B{Timeouts} for tests should be implemented in the runner. If this is done, +then timeouts could work for third-party TestCase objects as well as for +L{twisted.trial.unittest.TestCase} objects. Further, Twisted C{TestCase} +objects will run in other runners without timing out. +See U{http://twistedmatrix.com/trac/ticket/2675}. + +Running tests in a temporary directory should be a feature of the test case, +because often tests themselves rely on this behaviour. If the feature is +implemented in the runner, then tests will change behaviour (possibly +breaking) when run in a different test runner. Further, many tests don't even +care about the filesystem. +See U{http://twistedmatrix.com/trac/ticket/2916}. +""" diff --git a/contrib/python/Twisted/py2/twisted/trial/__main__.py b/contrib/python/Twisted/py2/twisted/trial/__main__.py new file mode 100644 index 0000000000..fe5534efd4 --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/__main__.py @@ -0,0 +1,10 @@ +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +if __name__ == '__main__': + from pkg_resources import load_entry_point + import sys + + sys.exit( + load_entry_point('Twisted', 'console_scripts', 'trial')() + ) diff --git a/contrib/python/Twisted/py2/twisted/trial/_asyncrunner.py b/contrib/python/Twisted/py2/twisted/trial/_asyncrunner.py new file mode 100644 index 0000000000..15e4a1add5 --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/_asyncrunner.py @@ -0,0 +1,185 @@ +# -*- test-case-name: twisted.trial.test -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Infrastructure for test running and suites. +""" + +from __future__ import division, absolute_import + +import doctest +import gc + +from twisted.python import components + +from twisted.trial import itrial, reporter +from twisted.trial._synctest import _logObserver + +pyunit = __import__('unittest') + +from zope.interface import implementer + + + +class TestSuite(pyunit.TestSuite): + """ + Extend the standard library's C{TestSuite} with a consistently overrideable + C{run} method. + """ + + def run(self, result): + """ + Call C{run} on every member of the suite. + """ + for test in self._tests: + if result.shouldStop: + break + test(result) + return result + + + +@implementer(itrial.ITestCase) +class TestDecorator(components.proxyForInterface(itrial.ITestCase, + "_originalTest")): + """ + Decorator for test cases. + + @param _originalTest: The wrapped instance of test. + @type _originalTest: A provider of L{itrial.ITestCase} + """ + + def __call__(self, result): + """ + Run the unit test. + + @param result: A TestResult object. + """ + return self.run(result) + + + def run(self, result): + """ + Run the unit test. + + @param result: A TestResult object. + """ + return self._originalTest.run( + reporter._AdaptedReporter(result, self.__class__)) + + + +def _clearSuite(suite): + """ + Clear all tests from C{suite}. + + This messes with the internals of C{suite}. In particular, it assumes that + the suite keeps all of its tests in a list in an instance variable called + C{_tests}. + """ + suite._tests = [] + + + +def decorate(test, decorator): + """ + Decorate all test cases in C{test} with C{decorator}. + + C{test} can be a test case or a test suite. If it is a test suite, then the + structure of the suite is preserved. + + L{decorate} tries to preserve the class of the test suites it finds, but + assumes the presence of the C{_tests} attribute on the suite. + + @param test: The C{TestCase} or C{TestSuite} to decorate. + + @param decorator: A unary callable used to decorate C{TestCase}s. + + @return: A decorated C{TestCase} or a C{TestSuite} containing decorated + C{TestCase}s. + """ + + try: + tests = iter(test) + except TypeError: + return decorator(test) + + # At this point, we know that 'test' is a test suite. + _clearSuite(test) + + for case in tests: + test.addTest(decorate(case, decorator)) + return test + + + +class _PyUnitTestCaseAdapter(TestDecorator): + """ + Adapt from pyunit.TestCase to ITestCase. + """ + + + +class _BrokenIDTestCaseAdapter(_PyUnitTestCaseAdapter): + """ + Adapter for pyunit-style C{TestCase} subclasses that have undesirable id() + methods. That is C{unittest.FunctionTestCase} and C{unittest.DocTestCase}. + """ + + def id(self): + """ + Return the fully-qualified Python name of the doctest. + """ + testID = self._originalTest.shortDescription() + if testID is not None: + return testID + return self._originalTest.id() + + + +class _ForceGarbageCollectionDecorator(TestDecorator): + """ + Forces garbage collection to be run before and after the test. Any errors + logged during the post-test collection are added to the test result as + errors. + """ + + def run(self, result): + gc.collect() + TestDecorator.run(self, result) + _logObserver._add() + gc.collect() + for error in _logObserver.getErrors(): + result.addError(self, error) + _logObserver.flushErrors() + _logObserver._remove() + + +components.registerAdapter( + _PyUnitTestCaseAdapter, pyunit.TestCase, itrial.ITestCase) + + +components.registerAdapter( + _BrokenIDTestCaseAdapter, pyunit.FunctionTestCase, itrial.ITestCase) + + +_docTestCase = getattr(doctest, 'DocTestCase', None) +if _docTestCase: + components.registerAdapter( + _BrokenIDTestCaseAdapter, _docTestCase, itrial.ITestCase) + + + +def _iterateTests(testSuiteOrCase): + """ + Iterate through all of the test cases in C{testSuiteOrCase}. + """ + try: + suite = iter(testSuiteOrCase) + except TypeError: + yield testSuiteOrCase + else: + for test in suite: + for subtest in _iterateTests(test): + yield subtest diff --git a/contrib/python/Twisted/py2/twisted/trial/_asynctest.py b/contrib/python/Twisted/py2/twisted/trial/_asynctest.py new file mode 100644 index 0000000000..c7a7090cf3 --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/_asynctest.py @@ -0,0 +1,405 @@ +# -*- test-case-name: twisted.trial.test -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Things likely to be used by writers of unit tests. + +Maintainer: Jonathan Lange +""" + +from __future__ import division, absolute_import + +import inspect +import warnings + +from zope.interface import implementer + +# We can't import reactor at module-level because this code runs before trial +# installs a user-specified reactor, installing the default reactor and +# breaking reactor installation. See also #6047. +from twisted.internet import defer, utils +from twisted.python import failure + +from twisted.trial import itrial, util +from twisted.trial._synctest import ( + FailTest, SkipTest, SynchronousTestCase) + +_wait_is_running = [] + +@implementer(itrial.ITestCase) +class TestCase(SynchronousTestCase): + """ + A unit test. The atom of the unit testing universe. + + This class extends L{SynchronousTestCase} which extends C{unittest.TestCase} + from the standard library. The main feature is the ability to return + C{Deferred}s from tests and fixture methods and to have the suite wait for + those C{Deferred}s to fire. Also provides new assertions such as + L{assertFailure}. + + @ivar timeout: A real number of seconds. If set, the test will + raise an error if it takes longer than C{timeout} seconds. + If not set, util.DEFAULT_TIMEOUT_DURATION is used. + """ + + def __init__(self, methodName='runTest'): + """ + Construct an asynchronous test case for C{methodName}. + + @param methodName: The name of a method on C{self}. This method should + be a unit test. That is, it should be a short method that calls some of + the assert* methods. If C{methodName} is unspecified, + L{SynchronousTestCase.runTest} will be used as the test method. This is + mostly useful for testing Trial. + """ + super(TestCase, self).__init__(methodName) + + + def assertFailure(self, deferred, *expectedFailures): + """ + Fail if C{deferred} does not errback with one of C{expectedFailures}. + Returns the original Deferred with callbacks added. You will need + to return this Deferred from your test case. + """ + def _cb(ignore): + raise self.failureException( + "did not catch an error, instead got %r" % (ignore,)) + + def _eb(failure): + if failure.check(*expectedFailures): + return failure.value + else: + output = ('\nExpected: %r\nGot:\n%s' + % (expectedFailures, str(failure))) + raise self.failureException(output) + return deferred.addCallbacks(_cb, _eb) + failUnlessFailure = assertFailure + + + def _run(self, methodName, result): + from twisted.internet import reactor + timeout = self.getTimeout() + def onTimeout(d): + e = defer.TimeoutError("%r (%s) still running at %s secs" + % (self, methodName, timeout)) + f = failure.Failure(e) + # try to errback the deferred that the test returns (for no gorram + # reason) (see issue1005 and test_errorPropagation in + # test_deferred) + try: + d.errback(f) + except defer.AlreadyCalledError: + # if the deferred has been called already but the *back chain + # is still unfinished, crash the reactor and report timeout + # error ourself. + reactor.crash() + self._timedOut = True # see self._wait + todo = self.getTodo() + if todo is not None and todo.expected(f): + result.addExpectedFailure(self, f, todo) + else: + result.addError(self, f) + onTimeout = utils.suppressWarnings( + onTimeout, util.suppress(category=DeprecationWarning)) + method = getattr(self, methodName) + if inspect.isgeneratorfunction(method): + exc = TypeError( + '%r is a generator function and therefore will never run' % ( + method,)) + return defer.fail(exc) + d = defer.maybeDeferred( + utils.runWithWarningsSuppressed, self._getSuppress(), method) + call = reactor.callLater(timeout, onTimeout, d) + d.addBoth(lambda x : call.active() and call.cancel() or x) + return d + + + def __call__(self, *args, **kwargs): + return self.run(*args, **kwargs) + + + def deferSetUp(self, ignored, result): + d = self._run('setUp', result) + d.addCallbacks(self.deferTestMethod, self._ebDeferSetUp, + callbackArgs=(result,), + errbackArgs=(result,)) + return d + + + def _ebDeferSetUp(self, failure, result): + if failure.check(SkipTest): + result.addSkip(self, self._getSkipReason(self.setUp, failure.value)) + else: + result.addError(self, failure) + if failure.check(KeyboardInterrupt): + result.stop() + return self.deferRunCleanups(None, result) + + + def deferTestMethod(self, ignored, result): + d = self._run(self._testMethodName, result) + d.addCallbacks(self._cbDeferTestMethod, self._ebDeferTestMethod, + callbackArgs=(result,), + errbackArgs=(result,)) + d.addBoth(self.deferRunCleanups, result) + d.addBoth(self.deferTearDown, result) + return d + + + def _cbDeferTestMethod(self, ignored, result): + if self.getTodo() is not None: + result.addUnexpectedSuccess(self, self.getTodo()) + else: + self._passed = True + return ignored + + + def _ebDeferTestMethod(self, f, result): + todo = self.getTodo() + if todo is not None and todo.expected(f): + result.addExpectedFailure(self, f, todo) + elif f.check(self.failureException, FailTest): + result.addFailure(self, f) + elif f.check(KeyboardInterrupt): + result.addError(self, f) + result.stop() + elif f.check(SkipTest): + result.addSkip( + self, + self._getSkipReason(getattr(self, self._testMethodName), f.value)) + else: + result.addError(self, f) + + + def deferTearDown(self, ignored, result): + d = self._run('tearDown', result) + d.addErrback(self._ebDeferTearDown, result) + return d + + + def _ebDeferTearDown(self, failure, result): + result.addError(self, failure) + if failure.check(KeyboardInterrupt): + result.stop() + self._passed = False + + + def deferRunCleanups(self, ignored, result): + """ + Run any scheduled cleanups and report errors (if any to the result + object. + """ + d = self._runCleanups() + d.addCallback(self._cbDeferRunCleanups, result) + return d + + + def _cbDeferRunCleanups(self, cleanupResults, result): + for flag, testFailure in cleanupResults: + if flag == defer.FAILURE: + result.addError(self, testFailure) + if testFailure.check(KeyboardInterrupt): + result.stop() + self._passed = False + + + def _cleanUp(self, result): + try: + clean = util._Janitor(self, result).postCaseCleanup() + if not clean: + self._passed = False + except: + result.addError(self, failure.Failure()) + self._passed = False + for error in self._observer.getErrors(): + result.addError(self, error) + self._passed = False + self.flushLoggedErrors() + self._removeObserver() + if self._passed: + result.addSuccess(self) + + + def _classCleanUp(self, result): + try: + util._Janitor(self, result).postClassCleanup() + except: + result.addError(self, failure.Failure()) + + + def _makeReactorMethod(self, name): + """ + Create a method which wraps the reactor method C{name}. The new + method issues a deprecation warning and calls the original. + """ + def _(*a, **kw): + warnings.warn("reactor.%s cannot be used inside unit tests. " + "In the future, using %s will fail the test and may " + "crash or hang the test run." + % (name, name), + stacklevel=2, category=DeprecationWarning) + return self._reactorMethods[name](*a, **kw) + return _ + + + def _deprecateReactor(self, reactor): + """ + Deprecate C{iterate}, C{crash} and C{stop} on C{reactor}. That is, + each method is wrapped in a function that issues a deprecation + warning, then calls the original. + + @param reactor: The Twisted reactor. + """ + self._reactorMethods = {} + for name in ['crash', 'iterate', 'stop']: + self._reactorMethods[name] = getattr(reactor, name) + setattr(reactor, name, self._makeReactorMethod(name)) + + + def _undeprecateReactor(self, reactor): + """ + Restore the deprecated reactor methods. Undoes what + L{_deprecateReactor} did. + + @param reactor: The Twisted reactor. + """ + for name, method in self._reactorMethods.items(): + setattr(reactor, name, method) + self._reactorMethods = {} + + + def _runCleanups(self): + """ + Run the cleanups added with L{addCleanup} in order. + + @return: A C{Deferred} that fires when all cleanups are run. + """ + def _makeFunction(f, args, kwargs): + return lambda: f(*args, **kwargs) + callables = [] + while len(self._cleanups) > 0: + f, args, kwargs = self._cleanups.pop() + callables.append(_makeFunction(f, args, kwargs)) + return util._runSequentially(callables) + + + def _runFixturesAndTest(self, result): + """ + Really run C{setUp}, the test method, and C{tearDown}. Any of these may + return L{defer.Deferred}s. After they complete, do some reactor cleanup. + + @param result: A L{TestResult} object. + """ + from twisted.internet import reactor + self._deprecateReactor(reactor) + self._timedOut = False + try: + d = self.deferSetUp(None, result) + try: + self._wait(d) + finally: + self._cleanUp(result) + self._classCleanUp(result) + finally: + self._undeprecateReactor(reactor) + + + def addCleanup(self, f, *args, **kwargs): + """ + Extend the base cleanup feature with support for cleanup functions which + return Deferreds. + + If the function C{f} returns a Deferred, C{TestCase} will wait until the + Deferred has fired before proceeding to the next function. + """ + return super(TestCase, self).addCleanup(f, *args, **kwargs) + + + def getSuppress(self): + return self._getSuppress() + + + def getTimeout(self): + """ + Returns the timeout value set on this test. Checks on the instance + first, then the class, then the module, then packages. As soon as it + finds something with a C{timeout} attribute, returns that. Returns + L{util.DEFAULT_TIMEOUT_DURATION} if it cannot find anything. See + L{TestCase} docstring for more details. + """ + timeout = util.acquireAttribute(self._parents, 'timeout', + util.DEFAULT_TIMEOUT_DURATION) + try: + return float(timeout) + except (ValueError, TypeError): + # XXX -- this is here because sometimes people will have methods + # called 'timeout', or set timeout to 'orange', or something + # Particularly, test_news.NewsTestCase and ReactorCoreTestCase + # both do this. + warnings.warn("'timeout' attribute needs to be a number.", + category=DeprecationWarning) + return util.DEFAULT_TIMEOUT_DURATION + + + def _wait(self, d, running=_wait_is_running): + """Take a Deferred that only ever callbacks. Block until it happens. + """ + if running: + raise RuntimeError("_wait is not reentrant") + + from twisted.internet import reactor + results = [] + def append(any): + if results is not None: + results.append(any) + def crash(ign): + if results is not None: + reactor.crash() + crash = utils.suppressWarnings( + crash, util.suppress(message=r'reactor\.crash cannot be used.*', + category=DeprecationWarning)) + def stop(): + reactor.crash() + stop = utils.suppressWarnings( + stop, util.suppress(message=r'reactor\.crash cannot be used.*', + category=DeprecationWarning)) + + running.append(None) + try: + d.addBoth(append) + if results: + # d might have already been fired, in which case append is + # called synchronously. Avoid any reactor stuff. + return + d.addBoth(crash) + reactor.stop = stop + try: + reactor.run() + finally: + del reactor.stop + + # If the reactor was crashed elsewhere due to a timeout, hopefully + # that crasher also reported an error. Just return. + # _timedOut is most likely to be set when d has fired but hasn't + # completed its callback chain (see self._run) + if results or self._timedOut: #defined in run() and _run() + return + + # If the timeout didn't happen, and we didn't get a result or + # a failure, then the user probably aborted the test, so let's + # just raise KeyboardInterrupt. + + # FIXME: imagine this: + # web/test/test_webclient.py: + # exc = self.assertRaises(error.Error, wait, method(url)) + # + # wait() will raise KeyboardInterrupt, and assertRaises will + # swallow it. Therefore, wait() raising KeyboardInterrupt is + # insufficient to stop trial. A suggested solution is to have + # this code set a "stop trial" flag, or otherwise notify trial + # that it should really try to stop as soon as possible. + raise KeyboardInterrupt() + finally: + results = None + running.pop() diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/__init__.py b/contrib/python/Twisted/py2/twisted/trial/_dist/__init__.py new file mode 100644 index 0000000000..502e840fef --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/_dist/__init__.py @@ -0,0 +1,47 @@ +# -*- test-case-name: twisted.trial._dist.test -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +This package implements the distributed Trial test runner: + + - The L{twisted.trial._dist.disttrial} module implements a test runner which + runs in a manager process and can launch additional worker processes in + which to run tests and gather up results from all of them. + + - The L{twisted.trial._dist.options} module defines command line options used + to configure the distributed test runner. + + - The L{twisted.trial._dist.managercommands} module defines AMP commands + which are sent from worker processes back to the manager process to report + the results of tests. + + - The L{twisted.trial._dist.workercommands} module defines AMP commands which + are sent from the manager process to the worker processes to control the + execution of tests there. + + - The L{twisted.trial._dist.distreporter} module defines a proxy for + L{twisted.trial.itrial.IReporter} which enforces the typical requirement + that results be passed to a reporter for only one test at a time, allowing + any reporter to be used with despite disttrial's simultaneously running + tests. + + - The L{twisted.trial._dist.workerreporter} module implements a + L{twisted.trial.itrial.IReporter} which is used by worker processes and + reports results back to the manager process using AMP commands. + + - The L{twisted.trial._dist.workertrial} module is a runnable script which is + the main point for worker processes. + + - The L{twisted.trial._dist.worker} process defines the manager's AMP + protocol for accepting results from worker processes and a process protocol + for use running workers as local child processes (as opposed to + distributing them to another host). + +@since: 12.3 +""" + +# File descriptors numbers used to set up pipes with the worker. +_WORKER_AMP_STDIN = 3 + +_WORKER_AMP_STDOUT = 4 diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/distreporter.py b/contrib/python/Twisted/py2/twisted/trial/_dist/distreporter.py new file mode 100644 index 0000000000..6648b7a0aa --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/_dist/distreporter.py @@ -0,0 +1,93 @@ +# -*- test-case-name: twisted.trial._dist.test.test_distreporter -*- +# +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +The reporter is not made to support concurrent test running, so we will +hold test results in here and only send them to the reporter once the +test is over. + +@since: 12.3 +""" + +from zope.interface import implementer +from twisted.trial.itrial import IReporter +from twisted.python.components import proxyForInterface + + + +@implementer(IReporter) +class DistReporter(proxyForInterface(IReporter)): + """ + See module docstring. + """ + + def __init__(self, original): + super(DistReporter, self).__init__(original) + self.running = {} + + + def startTest(self, test): + """ + Queue test starting. + """ + self.running[test.id()] = [] + self.running[test.id()].append((self.original.startTest, test)) + + + def addFailure(self, test, fail): + """ + Queue adding a failure. + """ + self.running[test.id()].append((self.original.addFailure, + test, fail)) + + + def addError(self, test, error): + """ + Queue error adding. + """ + self.running[test.id()].append((self.original.addError, + test, error)) + + + def addSkip(self, test, reason): + """ + Queue adding a skip. + """ + self.running[test.id()].append((self.original.addSkip, + test, reason)) + + + def addUnexpectedSuccess(self, test, todo=None): + """ + Queue adding an unexpected success. + """ + self.running[test.id()].append((self.original.addUnexpectedSuccess, + test, todo)) + + + def addExpectedFailure(self, test, error, todo=None): + """ + Queue adding an unexpected failure. + """ + self.running[test.id()].append((self.original.addExpectedFailure, + test, error, todo)) + + + def addSuccess(self, test): + """ + Queue adding a success. + """ + self.running[test.id()].append((self.original.addSuccess, test)) + + + def stopTest(self, test): + """ + Queue stopping the test, then unroll the queue. + """ + self.running[test.id()].append((self.original.stopTest, test)) + for step in self.running[test.id()]: + step[0](*step[1:]) + del self.running[test.id()] diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/disttrial.py b/contrib/python/Twisted/py2/twisted/trial/_dist/disttrial.py new file mode 100644 index 0000000000..96875b4a85 --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/_dist/disttrial.py @@ -0,0 +1,258 @@ +# -*- test-case-name: twisted.trial._dist.test.test_disttrial -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +This module contains the trial distributed runner, the management class +responsible for coordinating all of trial's behavior at the highest level. + +@since: 12.3 +""" + +import os +import sys + +from twisted.python.filepath import FilePath +from twisted.python.modules import theSystemPath +from twisted.internet.defer import DeferredList +from twisted.internet.task import cooperate + +from twisted.trial.util import _unusedTestDirectory +from twisted.trial._asyncrunner import _iterateTests +from twisted.trial._dist.worker import LocalWorker, LocalWorkerAMP +from twisted.trial._dist.distreporter import DistReporter +from twisted.trial.reporter import UncleanWarningsReporterWrapper +from twisted.trial._dist import _WORKER_AMP_STDIN, _WORKER_AMP_STDOUT + + + +class DistTrialRunner(object): + """ + A specialized runner for distributed trial. The runner launches a number of + local worker processes which will run tests. + + @ivar _workerNumber: the number of workers to be spawned. + @type _workerNumber: C{int} + + @ivar _stream: stream which the reporter will use. + + @ivar _reporterFactory: the reporter class to be used. + """ + _distReporterFactory = DistReporter + + def _makeResult(self): + """ + Make reporter factory, and wrap it with a L{DistReporter}. + """ + reporter = self._reporterFactory(self._stream, self._tbformat, + realtime=self._rterrors) + if self._uncleanWarnings: + reporter = UncleanWarningsReporterWrapper(reporter) + return self._distReporterFactory(reporter) + + + def __init__(self, reporterFactory, workerNumber, workerArguments, + stream=None, + tracebackFormat='default', + realTimeErrors=False, + uncleanWarnings=False, + logfile='test.log', + workingDirectory='_trial_temp'): + self._workerNumber = workerNumber + self._workerArguments = workerArguments + self._reporterFactory = reporterFactory + if stream is None: + stream = sys.stdout + self._stream = stream + self._tbformat = tracebackFormat + self._rterrors = realTimeErrors + self._uncleanWarnings = uncleanWarnings + self._result = None + self._workingDirectory = workingDirectory + self._logFile = logfile + self._logFileObserver = None + self._logFileObject = None + self._logWarnings = False + + + def writeResults(self, result): + """ + Write test run final outcome to result. + + @param result: A C{TestResult} which will print errors and the summary. + """ + result.done() + + + def createLocalWorkers(self, protocols, workingDirectory): + """ + Create local worker protocol instances and return them. + + @param protocols: An iterable of L{LocalWorkerAMP} instances. + + @param workingDirectory: The base path in which we should run the + workers. + @type workingDirectory: C{str} + + @return: A list of C{quantity} C{LocalWorker} instances. + """ + return [LocalWorker(protocol, + os.path.join(workingDirectory, str(x)), + self._logFile) + for x, protocol in enumerate(protocols)] + + + def launchWorkerProcesses(self, spawner, protocols, arguments): + """ + Spawn processes from a list of process protocols. + + @param spawner: A C{IReactorProcess.spawnProcess} implementation. + + @param protocols: An iterable of C{ProcessProtocol} instances. + + @param arguments: Extra arguments passed to the processes. + """ + workertrialPath = theSystemPath[ + 'twisted.trial._dist.workertrial'].filePath.path + childFDs = {0: 'w', 1: 'r', 2: 'r', _WORKER_AMP_STDIN: 'w', + _WORKER_AMP_STDOUT: 'r'} + environ = os.environ.copy() + # Add an environment variable containing the raw sys.path, to be used by + # subprocesses to make sure it's identical to the parent. See + # workertrial._setupPath. + environ['TRIAL_PYTHONPATH'] = os.pathsep.join(sys.path) + for worker in protocols: + args = [sys.executable, workertrialPath] + args.extend(arguments) + spawner(worker, sys.executable, args=args, childFDs=childFDs, + env=environ) + + + def _driveWorker(self, worker, result, testCases, cooperate): + """ + Drive a L{LocalWorkerAMP} instance, iterating the tests and calling + C{run} for every one of them. + + @param worker: The L{LocalWorkerAMP} to drive. + + @param result: The global L{DistReporter} instance. + + @param testCases: The global list of tests to iterate. + + @param cooperate: The cooperate function to use, to be customized in + tests. + @type cooperate: C{function} + + @return: A C{Deferred} firing when all the tests are finished. + """ + + def resultErrback(error, case): + result.original.addFailure(case, error) + return error + + def task(case): + d = worker.run(case, result) + d.addErrback(resultErrback, case) + return d + + return cooperate(task(case) for case in testCases).whenDone() + + + def run(self, suite, reactor=None, cooperate=cooperate, + untilFailure=False): + """ + Spawn local worker processes and load tests. After that, run them. + + @param suite: A tests suite to be run. + + @param reactor: The reactor to use, to be customized in tests. + @type reactor: A provider of + L{twisted.internet.interfaces.IReactorProcess} + + @param cooperate: The cooperate function to use, to be customized in + tests. + @type cooperate: C{function} + + @param untilFailure: If C{True}, continue to run the tests until they + fail. + @type untilFailure: C{bool}. + + @return: The test result. + @rtype: L{DistReporter} + """ + if reactor is None: + from twisted.internet import reactor + result = self._makeResult() + count = suite.countTestCases() + self._stream.write("Running %d tests.\n" % (count,)) + + if not count: + # Take a shortcut if there is no test + suite.run(result.original) + self.writeResults(result) + return result + + testDir, testDirLock = _unusedTestDirectory( + FilePath(self._workingDirectory)) + workerNumber = min(count, self._workerNumber) + ampWorkers = [LocalWorkerAMP() for x in range(workerNumber)] + workers = self.createLocalWorkers(ampWorkers, testDir.path) + processEndDeferreds = [worker.endDeferred for worker in workers] + self.launchWorkerProcesses(reactor.spawnProcess, workers, + self._workerArguments) + + def runTests(): + testCases = iter(list(_iterateTests(suite))) + + workerDeferreds = [] + for worker in ampWorkers: + workerDeferreds.append( + self._driveWorker(worker, result, testCases, + cooperate=cooperate)) + return DeferredList(workerDeferreds, consumeErrors=True, + fireOnOneErrback=True) + + stopping = [] + + def nextRun(ign): + self.writeResults(result) + if not untilFailure: + return + if not result.wasSuccessful(): + return + d = runTests() + return d.addCallback(nextRun) + + def stop(ign): + testDirLock.unlock() + if not stopping: + stopping.append(None) + reactor.stop() + + def beforeShutDown(): + if not stopping: + stopping.append(None) + d = DeferredList(processEndDeferreds, consumeErrors=True) + return d.addCallback(continueShutdown) + + def continueShutdown(ign): + self.writeResults(result) + return ign + + d = runTests() + d.addCallback(nextRun) + d.addBoth(stop) + + reactor.addSystemEventTrigger('before', 'shutdown', beforeShutDown) + reactor.run() + + return result + + + def runUntilFailure(self, suite): + """ + Run the tests with local worker processes until they fail. + + @param suite: A tests suite to be run. + """ + return self.run(suite, untilFailure=True) diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/managercommands.py b/contrib/python/Twisted/py2/twisted/trial/_dist/managercommands.py new file mode 100644 index 0000000000..4e76b81ddf --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/_dist/managercommands.py @@ -0,0 +1,86 @@ +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Commands for reporting test success of failure to the manager. + +@since: 12.3 +""" + +from twisted.protocols.amp import Command, String, Boolean, ListOf, Unicode +from twisted.python.compat import _PY3 + +NativeString = Unicode if _PY3 else String + + + +class AddSuccess(Command): + """ + Add a success. + """ + arguments = [(b'testName', NativeString())] + response = [(b'success', Boolean())] + + + +class AddError(Command): + """ + Add an error. + """ + arguments = [(b'testName', NativeString()), + (b'error', NativeString()), + (b'errorClass', NativeString()), + (b'frames', ListOf(NativeString()))] + response = [(b'success', Boolean())] + + + +class AddFailure(Command): + """ + Add a failure. + """ + arguments = [(b'testName', NativeString()), + (b'fail', NativeString()), + (b'failClass', NativeString()), + (b'frames', ListOf(NativeString()))] + response = [(b'success', Boolean())] + + + +class AddSkip(Command): + """ + Add a skip. + """ + arguments = [(b'testName', NativeString()), + (b'reason', NativeString())] + response = [(b'success', Boolean())] + + + +class AddExpectedFailure(Command): + """ + Add an expected failure. + """ + arguments = [(b'testName', NativeString()), + (b'error', NativeString()), + (b'todo', NativeString())] + response = [(b'success', Boolean())] + + + +class AddUnexpectedSuccess(Command): + """ + Add an unexpected success. + """ + arguments = [(b'testName', NativeString()), + (b'todo', NativeString())] + response = [(b'success', Boolean())] + + + +class TestWrite(Command): + """ + Write test log. + """ + arguments = [(b'out', NativeString())] + response = [(b'success', Boolean())] diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/options.py b/contrib/python/Twisted/py2/twisted/trial/_dist/options.py new file mode 100644 index 0000000000..ee5ccd887a --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/_dist/options.py @@ -0,0 +1,30 @@ +# -*- test-case-name: twisted.trial._dist.test.test_options -*- +# +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Options handling specific to trial's workers. + +@since: 12.3 +""" + +from twisted.python.filepath import FilePath +from twisted.python.usage import Options +from twisted.scripts.trial import _BasicOptions +from twisted.application.app import ReactorSelectionMixin + + + +class WorkerOptions(_BasicOptions, Options, ReactorSelectionMixin): + """ + Options forwarded to the trial distributed worker. + """ + + + def coverdir(self): + """ + Return a L{FilePath} representing the directory into which coverage + results should be written. + """ + return FilePath('coverage') diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/worker.py b/contrib/python/Twisted/py2/twisted/trial/_dist/worker.py new file mode 100644 index 0000000000..ef13c069d6 --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/_dist/worker.py @@ -0,0 +1,333 @@ +# -*- test-case-name: twisted.trial._dist.test.test_worker -*- +# +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +This module implements the worker classes. + +@since: 12.3 +""" + +import os + +from zope.interface import implementer + +from twisted.internet.protocol import ProcessProtocol +from twisted.internet.interfaces import ITransport, IAddress +from twisted.internet.defer import Deferred +from twisted.protocols.amp import AMP +from twisted.python.failure import Failure +from twisted.python.reflect import namedObject +from twisted.trial.unittest import Todo +from twisted.trial.runner import TrialSuite, TestLoader +from twisted.trial._dist import workercommands, managercommands +from twisted.trial._dist import _WORKER_AMP_STDIN, _WORKER_AMP_STDOUT +from twisted.trial._dist.workerreporter import WorkerReporter + + + +class WorkerProtocol(AMP): + """ + The worker-side trial distributed protocol. + """ + + def __init__(self, forceGarbageCollection=False): + self._loader = TestLoader() + self._result = WorkerReporter(self) + self._forceGarbageCollection = forceGarbageCollection + + + def run(self, testCase): + """ + Run a test case by name. + """ + case = self._loader.loadByName(testCase) + suite = TrialSuite([case], self._forceGarbageCollection) + suite.run(self._result) + return {'success': True} + + workercommands.Run.responder(run) + + + def start(self, directory): + """ + Set up the worker, moving into given directory for tests to run in + them. + """ + os.chdir(directory) + return {'success': True} + + workercommands.Start.responder(start) + + + +class LocalWorkerAMP(AMP): + """ + Local implementation of the manager commands. + """ + + def addSuccess(self, testName): + """ + Add a success to the reporter. + """ + self._result.addSuccess(self._testCase) + return {'success': True} + + managercommands.AddSuccess.responder(addSuccess) + + + def _buildFailure(self, error, errorClass, frames): + """ + Helper to build a C{Failure} with some traceback. + + @param error: An C{Exception} instance. + + @param error: The class name of the C{error} class. + + @param frames: A flat list of strings representing the information need + to approximatively rebuild C{Failure} frames. + + @return: A L{Failure} instance with enough information about a test + error. + """ + errorType = namedObject(errorClass) + failure = Failure(error, errorType) + for i in range(0, len(frames), 3): + failure.frames.append( + (frames[i], frames[i + 1], int(frames[i + 2]), [], [])) + return failure + + + def addError(self, testName, error, errorClass, frames): + """ + Add an error to the reporter. + """ + failure = self._buildFailure(error, errorClass, frames) + self._result.addError(self._testCase, failure) + return {'success': True} + + managercommands.AddError.responder(addError) + + + def addFailure(self, testName, fail, failClass, frames): + """ + Add a failure to the reporter. + """ + failure = self._buildFailure(fail, failClass, frames) + self._result.addFailure(self._testCase, failure) + return {'success': True} + + managercommands.AddFailure.responder(addFailure) + + + def addSkip(self, testName, reason): + """ + Add a skip to the reporter. + """ + self._result.addSkip(self._testCase, reason) + return {'success': True} + + managercommands.AddSkip.responder(addSkip) + + + def addExpectedFailure(self, testName, error, todo): + """ + Add an expected failure to the reporter. + """ + _todo = Todo(todo) + self._result.addExpectedFailure(self._testCase, error, _todo) + return {'success': True} + + managercommands.AddExpectedFailure.responder(addExpectedFailure) + + + def addUnexpectedSuccess(self, testName, todo): + """ + Add an unexpected success to the reporter. + """ + self._result.addUnexpectedSuccess(self._testCase, todo) + return {'success': True} + + managercommands.AddUnexpectedSuccess.responder(addUnexpectedSuccess) + + + def testWrite(self, out): + """ + Print test output from the worker. + """ + self._testStream.write(out + '\n') + self._testStream.flush() + return {'success': True} + + managercommands.TestWrite.responder(testWrite) + + + def _stopTest(self, result): + """ + Stop the current running test case, forwarding the result. + """ + self._result.stopTest(self._testCase) + return result + + + def run(self, testCase, result): + """ + Run a test. + """ + self._testCase = testCase + self._result = result + self._result.startTest(testCase) + testCaseId = testCase.id() + d = self.callRemote(workercommands.Run, testCase=testCaseId) + return d.addCallback(self._stopTest) + + + def setTestStream(self, stream): + """ + Set the stream used to log output from tests. + """ + self._testStream = stream + + + +@implementer(IAddress) +class LocalWorkerAddress(object): + """ + A L{IAddress} implementation meant to provide stub addresses for + L{ITransport.getPeer} and L{ITransport.getHost}. + """ + + + +@implementer(ITransport) +class LocalWorkerTransport(object): + """ + A stub transport implementation used to support L{AMP} over a + L{ProcessProtocol} transport. + """ + + def __init__(self, transport): + self._transport = transport + + + def write(self, data): + """ + Forward data to transport. + """ + self._transport.writeToChild(_WORKER_AMP_STDIN, data) + + + def writeSequence(self, sequence): + """ + Emulate C{writeSequence} by iterating data in the C{sequence}. + """ + for data in sequence: + self._transport.writeToChild(_WORKER_AMP_STDIN, data) + + + def loseConnection(self): + """ + Closes the transport. + """ + self._transport.loseConnection() + + + def getHost(self): + """ + Return a L{LocalWorkerAddress} instance. + """ + return LocalWorkerAddress() + + + def getPeer(self): + """ + Return a L{LocalWorkerAddress} instance. + """ + return LocalWorkerAddress() + + + +class LocalWorker(ProcessProtocol): + """ + Local process worker protocol. This worker runs as a local process and + communicates via stdin/out. + + @ivar _ampProtocol: The L{AMP} protocol instance used to communicate with + the worker. + + @ivar _logDirectory: The directory where logs will reside. + + @ivar _logFile: The name of the main log file for tests output. + """ + + def __init__(self, ampProtocol, logDirectory, logFile): + self._ampProtocol = ampProtocol + self._logDirectory = logDirectory + self._logFile = logFile + self.endDeferred = Deferred() + + + def connectionMade(self): + """ + When connection is made, create the AMP protocol instance. + """ + self._ampProtocol.makeConnection(LocalWorkerTransport(self.transport)) + if not os.path.exists(self._logDirectory): + os.makedirs(self._logDirectory) + self._outLog = open(os.path.join(self._logDirectory, 'out.log'), 'wb') + self._errLog = open(os.path.join(self._logDirectory, 'err.log'), 'wb') + self._testLog = open( + os.path.join(self._logDirectory, self._logFile), 'w') + self._ampProtocol.setTestStream(self._testLog) + logDirectory = self._logDirectory + d = self._ampProtocol.callRemote(workercommands.Start, + directory=logDirectory) + # Ignore the potential errors, the test suite will fail properly and it + # would just print garbage. + d.addErrback(lambda x: None) + + + def connectionLost(self, reason): + """ + On connection lost, close the log files that we're managing for stdin + and stdout. + """ + self._outLog.close() + self._errLog.close() + self._testLog.close() + + + def processEnded(self, reason): + """ + When the process closes, call C{connectionLost} for cleanup purposes + and forward the information to the C{_ampProtocol}. + """ + self.connectionLost(reason) + self._ampProtocol.connectionLost(reason) + self.endDeferred.callback(reason) + + + def outReceived(self, data): + """ + Send data received from stdout to log. + """ + + self._outLog.write(data) + + + def errReceived(self, data): + """ + Write error data to log. + """ + self._errLog.write(data) + + + def childDataReceived(self, childFD, data): + """ + Handle data received on the specific pipe for the C{_ampProtocol}. + """ + if childFD == _WORKER_AMP_STDOUT: + self._ampProtocol.dataReceived(data) + else: + ProcessProtocol.childDataReceived(self, childFD, data) diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/workercommands.py b/contrib/python/Twisted/py2/twisted/trial/_dist/workercommands.py new file mode 100644 index 0000000000..517cd67025 --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/_dist/workercommands.py @@ -0,0 +1,31 @@ +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Commands for telling a worker to load tests or run tests. + +@since: 12.3 +""" + +from twisted.protocols.amp import Command, String, Boolean, Unicode +from twisted.python.compat import _PY3 + +NativeString = Unicode if _PY3 else String + + + +class Run(Command): + """ + Run a test. + """ + arguments = [(b'testCase', NativeString())] + response = [(b'success', Boolean())] + + + +class Start(Command): + """ + Set up the worker process, giving the running directory. + """ + arguments = [(b'directory', NativeString())] + response = [(b'success', Boolean())] diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/workerreporter.py b/contrib/python/Twisted/py2/twisted/trial/_dist/workerreporter.py new file mode 100644 index 0000000000..ce82c59994 --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/_dist/workerreporter.py @@ -0,0 +1,154 @@ +# -*- test-case-name: twisted.trial._dist.test.test_workerreporter -*- +# +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Test reporter forwarding test results over trial distributed AMP commands. + +@since: 12.3 +""" + +from twisted.python.failure import Failure +from twisted.python.reflect import qual +from twisted.trial.reporter import TestResult +from twisted.trial._dist import managercommands + + + +class WorkerReporter(TestResult): + """ + Reporter for trial's distributed workers. We send things not through a + stream, but through an C{AMP} protocol's C{callRemote} method. + + @ivar _DEFAULT_TODO: Default message for expected failures and + unexpected successes, used only if a C{Todo} is not provided. + """ + + _DEFAULT_TODO = 'Test expected to fail' + + def __init__(self, ampProtocol): + """ + @param ampProtocol: The communication channel with the trial + distributed manager which collects all test results. + @type ampProtocol: C{AMP} + """ + super(WorkerReporter, self).__init__() + self.ampProtocol = ampProtocol + + + def _getFailure(self, error): + """ + Convert a C{sys.exc_info()}-style tuple to a L{Failure}, if necessary. + """ + if isinstance(error, tuple): + return Failure(error[1], error[0], error[2]) + return error + + + def _getFrames(self, failure): + """ + Extract frames from a C{Failure} instance. + """ + frames = [] + for frame in failure.frames: + frames.extend([frame[0], frame[1], str(frame[2])]) + return frames + + + def addSuccess(self, test): + """ + Send a success over. + """ + super(WorkerReporter, self).addSuccess(test) + testName = test.id() + self.ampProtocol.callRemote(managercommands.AddSuccess, + testName=testName) + + + def addError(self, test, error): + """ + Send an error over. + """ + super(WorkerReporter, self).addError(test, error) + testName = test.id() + failure = self._getFailure(error) + error = failure.getErrorMessage() + errorClass = qual(failure.type) + frames = [frame for frame in self._getFrames(failure)] + self.ampProtocol.callRemote(managercommands.AddError, + testName=testName, + error=error, + errorClass=errorClass, + frames=frames) + + + def addFailure(self, test, fail): + """ + Send a Failure over. + """ + super(WorkerReporter, self).addFailure(test, fail) + testName = test.id() + failure = self._getFailure(fail) + fail = failure.getErrorMessage() + failClass = qual(failure.type) + frames = [frame for frame in self._getFrames(failure)] + self.ampProtocol.callRemote(managercommands.AddFailure, + testName=testName, + fail=fail, + failClass=failClass, + frames=frames) + + + def addSkip(self, test, reason): + """ + Send a skip over. + """ + super(WorkerReporter, self).addSkip(test, reason) + reason = str(reason) + testName = test.id() + self.ampProtocol.callRemote(managercommands.AddSkip, + testName=testName, + reason=reason) + + + def _getTodoReason(self, todo): + """ + Get the reason for a C{Todo}. + + If C{todo} is L{None}, return a sensible default. + """ + if todo is None: + return self._DEFAULT_TODO + else: + return todo.reason + + + def addExpectedFailure(self, test, error, todo=None): + """ + Send an expected failure over. + """ + super(WorkerReporter, self).addExpectedFailure(test, error, todo) + errorMessage = error.getErrorMessage() + testName = test.id() + self.ampProtocol.callRemote(managercommands.AddExpectedFailure, + testName=testName, + error=errorMessage, + todo=self._getTodoReason(todo)) + + + def addUnexpectedSuccess(self, test, todo=None): + """ + Send an unexpected success over. + """ + super(WorkerReporter, self).addUnexpectedSuccess(test, todo) + testName = test.id() + self.ampProtocol.callRemote(managercommands.AddUnexpectedSuccess, + testName=testName, + todo=self._getTodoReason(todo)) + + + def printSummary(self): + """ + I{Don't} print a summary + """ diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/workertrial.py b/contrib/python/Twisted/py2/twisted/trial/_dist/workertrial.py new file mode 100644 index 0000000000..dd14dd61ef --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/_dist/workertrial.py @@ -0,0 +1,111 @@ +# -*- test-case-name: twisted.trial._dist.test.test_workertrial -*- +# +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Implementation of C{AMP} worker commands, and main executable entry point for +the workers. + +@since: 12.3 +""" + +import sys +import os +import errno + + + +def _setupPath(environ): + """ + Override C{sys.path} with what the parent passed in B{TRIAL_PYTHONPATH}. + + @see: twisted.trial._dist.disttrial.DistTrialRunner.launchWorkerProcesses + """ + if 'TRIAL_PYTHONPATH' in environ: + sys.path[:] = environ['TRIAL_PYTHONPATH'].split(os.pathsep) + + +_setupPath(os.environ) + + +from twisted.internet.protocol import FileWrapper +from twisted.python.log import startLoggingWithObserver, textFromEventDict +from twisted.trial._dist.options import WorkerOptions +from twisted.trial._dist import _WORKER_AMP_STDIN, _WORKER_AMP_STDOUT + + + +class WorkerLogObserver(object): + """ + A log observer that forward its output to a C{AMP} protocol. + """ + + def __init__(self, protocol): + """ + @param protocol: a connected C{AMP} protocol instance. + @type protocol: C{AMP} + """ + self.protocol = protocol + + + def emit(self, eventDict): + """ + Produce a log output. + """ + from twisted.trial._dist import managercommands + text = textFromEventDict(eventDict) + if text is None: + return + self.protocol.callRemote(managercommands.TestWrite, out=text) + + + +def main(_fdopen=os.fdopen): + """ + Main function to be run if __name__ == "__main__". + + @param _fdopen: If specified, the function to use in place of C{os.fdopen}. + @param _fdopen: C{callable} + """ + config = WorkerOptions() + config.parseOptions() + + from twisted.trial._dist.worker import WorkerProtocol + workerProtocol = WorkerProtocol(config['force-gc']) + + protocolIn = _fdopen(_WORKER_AMP_STDIN, 'rb') + protocolOut = _fdopen(_WORKER_AMP_STDOUT, 'wb') + workerProtocol.makeConnection(FileWrapper(protocolOut)) + + observer = WorkerLogObserver(workerProtocol) + startLoggingWithObserver(observer.emit, False) + + while True: + try: + r = protocolIn.read(1) + except IOError as e: + if e.args[0] == errno.EINTR: + if sys.version_info < (3, 0): + sys.exc_clear() + continue + else: + raise + if r == b'': + break + else: + workerProtocol.dataReceived(r) + protocolOut.flush() + sys.stdout.flush() + sys.stderr.flush() + + if config.tracer: + sys.settrace(None) + results = config.tracer.results() + results.write_results(show_missing=True, summary=False, + coverdir=config.coverdir().path) + + + +if __name__ == '__main__': + main() diff --git a/contrib/python/Twisted/py2/twisted/trial/_synctest.py b/contrib/python/Twisted/py2/twisted/trial/_synctest.py new file mode 100644 index 0000000000..2a76ca8e22 --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/_synctest.py @@ -0,0 +1,1416 @@ +# -*- test-case-name: twisted.trial.test -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Things likely to be used by writers of unit tests. + +Maintainer: Jonathan Lange +""" + +from __future__ import division, absolute_import + +import inspect +import os, warnings, sys, tempfile, types +from dis import findlinestarts as _findlinestarts + +from twisted.python import failure, log, monkey +from twisted.python.reflect import fullyQualifiedName +from twisted.python.util import runWithWarningsSuppressed +from twisted.python.deprecate import ( + getDeprecationWarningString, warnAboutFunction +) +from twisted.internet.defer import ensureDeferred + +from twisted.trial import itrial, util + +import unittest as pyunit + +# Python 2.7 and higher has skip support built-in +SkipTest = pyunit.SkipTest + + + +class FailTest(AssertionError): + """ + Raised to indicate the current test has failed to pass. + """ + + + +class Todo(object): + """ + Internal object used to mark a L{TestCase} as 'todo'. Tests marked 'todo' + are reported differently in Trial L{TestResult}s. If todo'd tests fail, + they do not fail the suite and the errors are reported in a separate + category. If todo'd tests succeed, Trial L{TestResult}s will report an + unexpected success. + """ + + def __init__(self, reason, errors=None): + """ + @param reason: A string explaining why the test is marked 'todo' + + @param errors: An iterable of exception types that the test is + expected to raise. If one of these errors is raised by the test, it + will be trapped. Raising any other kind of error will fail the test. + If L{None} is passed, then all errors will be trapped. + """ + self.reason = reason + self.errors = errors + + + def __repr__(self): + return "<Todo reason=%r errors=%r>" % (self.reason, self.errors) + + + def expected(self, failure): + """ + @param failure: A L{twisted.python.failure.Failure}. + + @return: C{True} if C{failure} is expected, C{False} otherwise. + """ + if self.errors is None: + return True + for error in self.errors: + if failure.check(error): + return True + return False + + + +def makeTodo(value): + """ + Return a L{Todo} object built from C{value}. + + If C{value} is a string, return a Todo that expects any exception with + C{value} as a reason. If C{value} is a tuple, the second element is used + as the reason and the first element as the excepted error(s). + + @param value: A string or a tuple of C{(errors, reason)}, where C{errors} + is either a single exception class or an iterable of exception classes. + + @return: A L{Todo} object. + """ + if isinstance(value, str): + return Todo(reason=value) + if isinstance(value, tuple): + errors, reason = value + try: + errors = list(errors) + except TypeError: + errors = [errors] + return Todo(reason=reason, errors=errors) + + + +class _Warning(object): + """ + A L{_Warning} instance represents one warning emitted through the Python + warning system (L{warnings}). This is used to insulate callers of + L{_collectWarnings} from changes to the Python warnings system which might + otherwise require changes to the warning objects that function passes to + the observer object it accepts. + + @ivar message: The string which was passed as the message parameter to + L{warnings.warn}. + + @ivar category: The L{Warning} subclass which was passed as the category + parameter to L{warnings.warn}. + + @ivar filename: The name of the file containing the definition of the code + object which was C{stacklevel} frames above the call to + L{warnings.warn}, where C{stacklevel} is the value of the C{stacklevel} + parameter passed to L{warnings.warn}. + + @ivar lineno: The source line associated with the active instruction of the + code object object which was C{stacklevel} frames above the call to + L{warnings.warn}, where C{stacklevel} is the value of the C{stacklevel} + parameter passed to L{warnings.warn}. + """ + def __init__(self, message, category, filename, lineno): + self.message = message + self.category = category + self.filename = filename + self.lineno = lineno + + + +def _setWarningRegistryToNone(modules): + """ + Disable the per-module cache for every module found in C{modules}, typically + C{sys.modules}. + + @param modules: Dictionary of modules, typically sys.module dict + """ + for v in list(modules.values()): + if v is not None: + try: + v.__warningregistry__ = None + except: + # Don't specify a particular exception type to handle in case + # some wacky object raises some wacky exception in response to + # the setattr attempt. + pass + + + +def _collectWarnings(observeWarning, f, *args, **kwargs): + """ + Call C{f} with C{args} positional arguments and C{kwargs} keyword arguments + and collect all warnings which are emitted as a result in a list. + + @param observeWarning: A callable which will be invoked with a L{_Warning} + instance each time a warning is emitted. + + @return: The return value of C{f(*args, **kwargs)}. + """ + def showWarning(message, category, filename, lineno, file=None, line=None): + assert isinstance(message, Warning) + observeWarning(_Warning( + str(message), category, filename, lineno)) + + # Disable the per-module cache for every module otherwise if the warning + # which the caller is expecting us to collect was already emitted it won't + # be re-emitted by the call to f which happens below. + _setWarningRegistryToNone(sys.modules) + + origFilters = warnings.filters[:] + origShow = warnings.showwarning + warnings.simplefilter('always') + try: + warnings.showwarning = showWarning + result = f(*args, **kwargs) + finally: + warnings.filters[:] = origFilters + warnings.showwarning = origShow + return result + + + +class UnsupportedTrialFeature(Exception): + """A feature of twisted.trial was used that pyunit cannot support.""" + + + +class PyUnitResultAdapter(object): + """ + Wrap a C{TestResult} from the standard library's C{unittest} so that it + supports the extended result types from Trial, and also supports + L{twisted.python.failure.Failure}s being passed to L{addError} and + L{addFailure}. + """ + + def __init__(self, original): + """ + @param original: A C{TestResult} instance from C{unittest}. + """ + self.original = original + + + def _exc_info(self, err): + return util.excInfoOrFailureToExcInfo(err) + + + def startTest(self, method): + self.original.startTest(method) + + + def stopTest(self, method): + self.original.stopTest(method) + + + def addFailure(self, test, fail): + self.original.addFailure(test, self._exc_info(fail)) + + + def addError(self, test, error): + self.original.addError(test, self._exc_info(error)) + + + def _unsupported(self, test, feature, info): + self.original.addFailure( + test, + (UnsupportedTrialFeature, + UnsupportedTrialFeature(feature, info), + None)) + + + def addSkip(self, test, reason): + """ + Report the skip as a failure. + """ + self.original.addSkip(test, reason) + + + def addUnexpectedSuccess(self, test, todo=None): + """ + Report the unexpected success as a failure. + """ + self._unsupported(test, 'unexpected success', todo) + + + def addExpectedFailure(self, test, error): + """ + Report the expected failure (i.e. todo) as a failure. + """ + self._unsupported(test, 'expected failure', error) + + + def addSuccess(self, test): + self.original.addSuccess(test) + + + def upDownError(self, method, error, warn, printStatus): + pass + + + +class _AssertRaisesContext(object): + """ + A helper for implementing C{assertRaises}. This is a context manager and a + helper method to support the non-context manager version of + C{assertRaises}. + + @ivar _testCase: See C{testCase} parameter of C{__init__} + + @ivar _expected: See C{expected} parameter of C{__init__} + + @ivar _returnValue: The value returned by the callable being tested (only + when not being used as a context manager). + + @ivar _expectedName: A short string describing the expected exception + (usually the name of the exception class). + + @ivar exception: The exception which was raised by the function being + tested (if it raised one). + """ + + def __init__(self, testCase, expected): + """ + @param testCase: The L{TestCase} instance which is used to raise a + test-failing exception when that is necessary. + + @param expected: The exception type expected to be raised. + """ + self._testCase = testCase + self._expected = expected + self._returnValue = None + try: + self._expectedName = self._expected.__name__ + except AttributeError: + self._expectedName = str(self._expected) + + + def _handle(self, obj): + """ + Call the given object using this object as a context manager. + + @param obj: The object to call and which is expected to raise some + exception. + @type obj: L{object} + + @return: Whatever exception is raised by C{obj()}. + @rtype: L{BaseException} + """ + with self as context: + self._returnValue = obj() + return context.exception + + + def __enter__(self): + return self + + + def __exit__(self, exceptionType, exceptionValue, traceback): + """ + Check exit exception against expected exception. + """ + # No exception raised. + if exceptionType is None: + self._testCase.fail( + "{0} not raised ({1} returned)".format( + self._expectedName, self._returnValue) + ) + + if not isinstance(exceptionValue, exceptionType): + # Support some Python 2.6 ridiculousness. Exceptions raised using + # the C API appear here as the arguments you might pass to the + # exception class to create an exception instance. So... do that + # to turn them into the instances. + if isinstance(exceptionValue, tuple): + exceptionValue = exceptionType(*exceptionValue) + else: + exceptionValue = exceptionType(exceptionValue) + + # Store exception so that it can be access from context. + self.exception = exceptionValue + + # Wrong exception raised. + if not issubclass(exceptionType, self._expected): + reason = failure.Failure(exceptionValue, exceptionType, traceback) + self._testCase.fail( + "{0} raised instead of {1}:\n {2}".format( + fullyQualifiedName(exceptionType), + self._expectedName, reason.getTraceback()), + ) + + # All good. + return True + + + +class _Assertions(pyunit.TestCase, object): + """ + Replaces many of the built-in TestCase assertions. In general, these + assertions provide better error messages and are easier to use in + callbacks. + """ + + def fail(self, msg=None): + """ + Absolutely fail the test. Do not pass go, do not collect $200. + + @param msg: the message that will be displayed as the reason for the + failure + """ + raise self.failureException(msg) + + + def assertFalse(self, condition, msg=None): + """ + Fail the test if C{condition} evaluates to True. + + @param condition: any object that defines __nonzero__ + """ + super(_Assertions, self).assertFalse(condition, msg) + return condition + assertNot = failUnlessFalse = failIf = assertFalse + + + def assertTrue(self, condition, msg=None): + """ + Fail the test if C{condition} evaluates to False. + + @param condition: any object that defines __nonzero__ + """ + super(_Assertions, self).assertTrue(condition, msg) + return condition + assert_ = failUnlessTrue = failUnless = assertTrue + + + def assertRaises(self, exception, f=None, *args, **kwargs): + """ + Fail the test unless calling the function C{f} with the given + C{args} and C{kwargs} raises C{exception}. The failure will report + the traceback and call stack of the unexpected exception. + + @param exception: exception type that is to be expected + @param f: the function to call + + @return: If C{f} is L{None}, a context manager which will make an + assertion about the exception raised from the suite it manages. If + C{f} is not L{None}, the exception raised by C{f}. + + @raise self.failureException: Raised if the function call does + not raise an exception or if it raises an exception of a + different type. + """ + context = _AssertRaisesContext(self, exception) + if f is None: + return context + + return context._handle(lambda: f(*args, **kwargs)) + failUnlessRaises = assertRaises + + + def assertEqual(self, first, second, msg=None): + """ + Fail the test if C{first} and C{second} are not equal. + + @param msg: A string describing the failure that's included in the + exception. + """ + super(_Assertions, self).assertEqual(first, second, msg) + return first + failUnlessEqual = failUnlessEquals = assertEquals = assertEqual + + + def assertIs(self, first, second, msg=None): + """ + Fail the test if C{first} is not C{second}. This is an + obect-identity-equality test, not an object equality + (i.e. C{__eq__}) test. + + @param msg: if msg is None, then the failure message will be + '%r is not %r' % (first, second) + """ + if first is not second: + raise self.failureException(msg or '%r is not %r' % (first, second)) + return first + failUnlessIdentical = assertIdentical = assertIs + + + def assertIsNot(self, first, second, msg=None): + """ + Fail the test if C{first} is C{second}. This is an + obect-identity-equality test, not an object equality + (i.e. C{__eq__}) test. + + @param msg: if msg is None, then the failure message will be + '%r is %r' % (first, second) + """ + if first is second: + raise self.failureException(msg or '%r is %r' % (first, second)) + return first + failIfIdentical = assertNotIdentical = assertIsNot + + + def assertNotEqual(self, first, second, msg=None): + """ + Fail the test if C{first} == C{second}. + + @param msg: if msg is None, then the failure message will be + '%r == %r' % (first, second) + """ + if not first != second: + raise self.failureException(msg or '%r == %r' % (first, second)) + return first + assertNotEquals = failIfEquals = failIfEqual = assertNotEqual + + + def assertIn(self, containee, container, msg=None): + """ + Fail the test if C{containee} is not found in C{container}. + + @param containee: the value that should be in C{container} + @param container: a sequence type, or in the case of a mapping type, + will follow semantics of 'if key in dict.keys()' + @param msg: if msg is None, then the failure message will be + '%r not in %r' % (first, second) + """ + if containee not in container: + raise self.failureException(msg or "%r not in %r" + % (containee, container)) + return containee + failUnlessIn = assertIn + + + def assertNotIn(self, containee, container, msg=None): + """ + Fail the test if C{containee} is found in C{container}. + + @param containee: the value that should not be in C{container} + @param container: a sequence type, or in the case of a mapping type, + will follow semantics of 'if key in dict.keys()' + @param msg: if msg is None, then the failure message will be + '%r in %r' % (first, second) + """ + if containee in container: + raise self.failureException(msg or "%r in %r" + % (containee, container)) + return containee + failIfIn = assertNotIn + + + def assertNotAlmostEqual(self, first, second, places=7, msg=None): + """ + Fail if the two objects are equal as determined by their + difference rounded to the given number of decimal places + (default 7) and comparing to zero. + + @note: decimal places (from zero) is usually not the same + as significant digits (measured from the most + significant digit). + + @note: included for compatibility with PyUnit test cases + """ + if round(second-first, places) == 0: + raise self.failureException(msg or '%r == %r within %r places' + % (first, second, places)) + return first + assertNotAlmostEquals = failIfAlmostEqual = assertNotAlmostEqual + failIfAlmostEquals = assertNotAlmostEqual + + + def assertAlmostEqual(self, first, second, places=7, msg=None): + """ + Fail if the two objects are unequal as determined by their + difference rounded to the given number of decimal places + (default 7) and comparing to zero. + + @note: decimal places (from zero) is usually not the same + as significant digits (measured from the most + significant digit). + + @note: included for compatibility with PyUnit test cases + """ + if round(second-first, places) != 0: + raise self.failureException(msg or '%r != %r within %r places' + % (first, second, places)) + return first + assertAlmostEquals = failUnlessAlmostEqual = assertAlmostEqual + failUnlessAlmostEquals = assertAlmostEqual + + + def assertApproximates(self, first, second, tolerance, msg=None): + """ + Fail if C{first} - C{second} > C{tolerance} + + @param msg: if msg is None, then the failure message will be + '%r ~== %r' % (first, second) + """ + if abs(first - second) > tolerance: + raise self.failureException(msg or "%s ~== %s" % (first, second)) + return first + failUnlessApproximates = assertApproximates + + + def assertSubstring(self, substring, astring, msg=None): + """ + Fail if C{substring} does not exist within C{astring}. + """ + return self.failUnlessIn(substring, astring, msg) + failUnlessSubstring = assertSubstring + + + def assertNotSubstring(self, substring, astring, msg=None): + """ + Fail if C{astring} contains C{substring}. + """ + return self.failIfIn(substring, astring, msg) + failIfSubstring = assertNotSubstring + + + def assertWarns(self, category, message, filename, f, + *args, **kwargs): + """ + Fail if the given function doesn't generate the specified warning when + called. It calls the function, checks the warning, and forwards the + result of the function if everything is fine. + + @param category: the category of the warning to check. + @param message: the output message of the warning to check. + @param filename: the filename where the warning should come from. + @param f: the function which is supposed to generate the warning. + @type f: any callable. + @param args: the arguments to C{f}. + @param kwargs: the keywords arguments to C{f}. + + @return: the result of the original function C{f}. + """ + warningsShown = [] + result = _collectWarnings(warningsShown.append, f, *args, **kwargs) + + if not warningsShown: + self.fail("No warnings emitted") + first = warningsShown[0] + for other in warningsShown[1:]: + if ((other.message, other.category) + != (first.message, first.category)): + self.fail("Can't handle different warnings") + self.assertEqual(first.message, message) + self.assertIdentical(first.category, category) + + # Use starts with because of .pyc/.pyo issues. + self.assertTrue( + filename.startswith(first.filename), + 'Warning in %r, expected %r' % (first.filename, filename)) + + # It would be nice to be able to check the line number as well, but + # different configurations actually end up reporting different line + # numbers (generally the variation is only 1 line, but that's enough + # to fail the test erroneously...). + # self.assertEqual(lineno, xxx) + + return result + failUnlessWarns = assertWarns + + + def assertIsInstance(self, instance, classOrTuple, message=None): + """ + Fail if C{instance} is not an instance of the given class or of + one of the given classes. + + @param instance: the object to test the type (first argument of the + C{isinstance} call). + @type instance: any. + @param classOrTuple: the class or classes to test against (second + argument of the C{isinstance} call). + @type classOrTuple: class, type, or tuple. + + @param message: Custom text to include in the exception text if the + assertion fails. + """ + if not isinstance(instance, classOrTuple): + if message is None: + suffix = "" + else: + suffix = ": " + message + self.fail("%r is not an instance of %s%s" % ( + instance, classOrTuple, suffix)) + failUnlessIsInstance = assertIsInstance + + + def assertNotIsInstance(self, instance, classOrTuple): + """ + Fail if C{instance} is an instance of the given class or of one of the + given classes. + + @param instance: the object to test the type (first argument of the + C{isinstance} call). + @type instance: any. + @param classOrTuple: the class or classes to test against (second + argument of the C{isinstance} call). + @type classOrTuple: class, type, or tuple. + """ + if isinstance(instance, classOrTuple): + self.fail("%r is an instance of %s" % (instance, classOrTuple)) + failIfIsInstance = assertNotIsInstance + + + def successResultOf(self, deferred): + """ + Return the current success result of C{deferred} or raise + C{self.failureException}. + + @param deferred: A L{Deferred<twisted.internet.defer.Deferred>} which + has a success result. This means + L{Deferred.callback<twisted.internet.defer.Deferred.callback>} or + L{Deferred.errback<twisted.internet.defer.Deferred.errback>} has + been called on it and it has reached the end of its callback chain + and the last callback or errback returned a non-L{failure.Failure}. + @type deferred: L{Deferred<twisted.internet.defer.Deferred>} + + @raise SynchronousTestCase.failureException: If the + L{Deferred<twisted.internet.defer.Deferred>} has no result or has a + failure result. + + @return: The result of C{deferred}. + """ + deferred = ensureDeferred(deferred) + result = [] + deferred.addBoth(result.append) + + if not result: + self.fail( + "Success result expected on {!r}, found no result instead" + .format(deferred) + ) + + result = result[0] + + if isinstance(result, failure.Failure): + self.fail( + "Success result expected on {!r}, " + "found failure result instead:\n{}" + .format(deferred, result.getTraceback()) + ) + + return result + + + def failureResultOf(self, deferred, *expectedExceptionTypes): + """ + Return the current failure result of C{deferred} or raise + C{self.failureException}. + + @param deferred: A L{Deferred<twisted.internet.defer.Deferred>} which + has a failure result. This means + L{Deferred.callback<twisted.internet.defer.Deferred.callback>} or + L{Deferred.errback<twisted.internet.defer.Deferred.errback>} has + been called on it and it has reached the end of its callback chain + and the last callback or errback raised an exception or returned a + L{failure.Failure}. + @type deferred: L{Deferred<twisted.internet.defer.Deferred>} + + @param expectedExceptionTypes: Exception types to expect - if + provided, and the exception wrapped by the failure result is + not one of the types provided, then this test will fail. + + @raise SynchronousTestCase.failureException: If the + L{Deferred<twisted.internet.defer.Deferred>} has no result, has a + success result, or has an unexpected failure result. + + @return: The failure result of C{deferred}. + @rtype: L{failure.Failure} + """ + deferred = ensureDeferred(deferred) + result = [] + deferred.addBoth(result.append) + + if not result: + self.fail( + "Failure result expected on {!r}, found no result instead" + .format(deferred) + ) + + result = result[0] + + if not isinstance(result, failure.Failure): + self.fail( + "Failure result expected on {!r}, " + "found success result ({!r}) instead" + .format(deferred, result) + ) + + if ( + expectedExceptionTypes and + not result.check(*expectedExceptionTypes) + ): + expectedString = " or ".join([ + ".".join((t.__module__, t.__name__)) + for t in expectedExceptionTypes + ]) + + self.fail( + "Failure of type ({}) expected on {!r}, " + "found type {!r} instead: {}" + .format( + expectedString, deferred, result.type, + result.getTraceback() + ) + ) + + return result + + + def assertNoResult(self, deferred): + """ + Assert that C{deferred} does not have a result at this point. + + If the assertion succeeds, then the result of C{deferred} is left + unchanged. Otherwise, any L{failure.Failure} result is swallowed. + + @param deferred: A L{Deferred<twisted.internet.defer.Deferred>} without + a result. This means that neither + L{Deferred.callback<twisted.internet.defer.Deferred.callback>} nor + L{Deferred.errback<twisted.internet.defer.Deferred.errback>} has + been called, or that the + L{Deferred<twisted.internet.defer.Deferred>} is waiting on another + L{Deferred<twisted.internet.defer.Deferred>} for a result. + @type deferred: L{Deferred<twisted.internet.defer.Deferred>} + + @raise SynchronousTestCase.failureException: If the + L{Deferred<twisted.internet.defer.Deferred>} has a result. + """ + deferred = ensureDeferred(deferred) + result = [] + + def cb(res): + result.append(res) + return res + + deferred.addBoth(cb) + + if result: + # If there is already a failure, the self.fail below will + # report it, so swallow it in the deferred + deferred.addErrback(lambda _: None) + self.fail( + "No result expected on {!r}, found {!r} instead" + .format(deferred, result[0]) + ) + + + def assertRegex(self, text, regex, msg=None): + """ + Fail the test if a C{regexp} search of C{text} fails. + + @param text: Text which is under test. + @type text: L{str} + + @param regex: A regular expression object or a string containing a + regular expression suitable for use by re.search(). + @type regex: L{str} or L{re.RegexObject} + + @param msg: Text used as the error message on failure. + @type msg: L{str} + """ + if sys.version_info[:2] > (2, 7): + super(_Assertions, self).assertRegex(text, regex, msg) + else: + # Python 2.7 has unittest.assertRegexpMatches() which was + # renamed to unittest.assertRegex() in Python 3.2 + super(_Assertions, self).assertRegexpMatches(text, regex, msg) + + + +class _LogObserver(object): + """ + Observes the Twisted logs and catches any errors. + + @ivar _errors: A C{list} of L{Failure} instances which were received as + error events from the Twisted logging system. + + @ivar _added: A C{int} giving the number of times C{_add} has been called + less the number of times C{_remove} has been called; used to only add + this observer to the Twisted logging since once, regardless of the + number of calls to the add method. + + @ivar _ignored: A C{list} of exception types which will not be recorded. + """ + + def __init__(self): + self._errors = [] + self._added = 0 + self._ignored = [] + + + def _add(self): + if self._added == 0: + log.addObserver(self.gotEvent) + self._added += 1 + + + def _remove(self): + self._added -= 1 + if self._added == 0: + log.removeObserver(self.gotEvent) + + + def _ignoreErrors(self, *errorTypes): + """ + Do not store any errors with any of the given types. + """ + self._ignored.extend(errorTypes) + + + def _clearIgnores(self): + """ + Stop ignoring any errors we might currently be ignoring. + """ + self._ignored = [] + + + def flushErrors(self, *errorTypes): + """ + Flush errors from the list of caught errors. If no arguments are + specified, remove all errors. If arguments are specified, only remove + errors of those types from the stored list. + """ + if errorTypes: + flushed = [] + remainder = [] + for f in self._errors: + if f.check(*errorTypes): + flushed.append(f) + else: + remainder.append(f) + self._errors = remainder + else: + flushed = self._errors + self._errors = [] + return flushed + + + def getErrors(self): + """ + Return a list of errors caught by this observer. + """ + return self._errors + + + def gotEvent(self, event): + """ + The actual observer method. Called whenever a message is logged. + + @param event: A dictionary containing the log message. Actual + structure undocumented (see source for L{twisted.python.log}). + """ + if event.get('isError', False) and 'failure' in event: + f = event['failure'] + if len(self._ignored) == 0 or not f.check(*self._ignored): + self._errors.append(f) + + + +_logObserver = _LogObserver() + + +class SynchronousTestCase(_Assertions): + """ + A unit test. The atom of the unit testing universe. + + This class extends C{unittest.TestCase} from the standard library. A number + of convenient testing helpers are added, including logging and warning + integration, monkey-patching support, and more. + + To write a unit test, subclass C{SynchronousTestCase} and define a method + (say, 'test_foo') on the subclass. To run the test, instantiate your + subclass with the name of the method, and call L{run} on the instance, + passing a L{TestResult} object. + + The C{trial} script will automatically find any C{SynchronousTestCase} + subclasses defined in modules beginning with 'test_' and construct test + cases for all methods beginning with 'test'. + + If an error is logged during the test run, the test will fail with an + error. See L{log.err}. + + @ivar failureException: An exception class, defaulting to C{FailTest}. If + the test method raises this exception, it will be reported as a failure, + rather than an exception. All of the assertion methods raise this if the + assertion fails. + + @ivar skip: L{None} or a string explaining why this test is to be + skipped. If defined, the test will not be run. Instead, it will be + reported to the result object as 'skipped' (if the C{TestResult} supports + skipping). + + @ivar todo: L{None}, a string or a tuple of C{(errors, reason)} where + C{errors} is either an exception class or an iterable of exception + classes, and C{reason} is a string. See L{Todo} or L{makeTodo} for more + information. + + @ivar suppress: L{None} or a list of tuples of C{(args, kwargs)} to be + passed to C{warnings.filterwarnings}. Use these to suppress warnings + raised in a test. Useful for testing deprecated code. See also + L{util.suppress}. + """ + failureException = FailTest + + def __init__(self, methodName='runTest'): + super(SynchronousTestCase, self).__init__(methodName) + self._passed = False + self._cleanups = [] + self._testMethodName = methodName + testMethod = getattr(self, methodName) + self._parents = [ + testMethod, self, sys.modules.get(self.__class__.__module__)] + + + def __eq__(self, other): + """ + Override the comparison defined by the base TestCase which considers + instances of the same class with the same _testMethodName to be + equal. Since trial puts TestCase instances into a set, that + definition of comparison makes it impossible to run the same test + method twice. Most likely, trial should stop using a set to hold + tests, but until it does, this is necessary on Python 2.6. -exarkun + """ + return self is other + + + def __ne__(self, other): + return self is not other + + + def __hash__(self): + return hash((self.__class__, self._testMethodName)) + + + def shortDescription(self): + desc = super(SynchronousTestCase, self).shortDescription() + if desc is None: + return self._testMethodName + return desc + + + def getSkip(self): + """ + Return the skip reason set on this test, if any is set. Checks on the + instance first, then the class, then the module, then packages. As + soon as it finds something with a C{skip} attribute, returns that. + Returns L{None} if it cannot find anything. See L{TestCase} docstring + for more details. + """ + return util.acquireAttribute(self._parents, 'skip', None) + + + def getTodo(self): + """ + Return a L{Todo} object if the test is marked todo. Checks on the + instance first, then the class, then the module, then packages. As + soon as it finds something with a C{todo} attribute, returns that. + Returns L{None} if it cannot find anything. See L{TestCase} docstring + for more details. + """ + todo = util.acquireAttribute(self._parents, 'todo', None) + if todo is None: + return None + return makeTodo(todo) + + + def runTest(self): + """ + If no C{methodName} argument is passed to the constructor, L{run} will + treat this method as the thing with the actual test inside. + """ + + + def run(self, result): + """ + Run the test case, storing the results in C{result}. + + First runs C{setUp} on self, then runs the test method (defined in the + constructor), then runs C{tearDown}. As with the standard library + L{unittest.TestCase}, the return value of these methods is disregarded. + In particular, returning a L{Deferred<twisted.internet.defer.Deferred>} + has no special additional consequences. + + @param result: A L{TestResult} object. + """ + log.msg("--> %s <--" % (self.id())) + new_result = itrial.IReporter(result, None) + if new_result is None: + result = PyUnitResultAdapter(result) + else: + result = new_result + result.startTest(self) + if self.getSkip(): # don't run test methods that are marked as .skip + result.addSkip(self, self.getSkip()) + result.stopTest(self) + return + + self._passed = False + self._warnings = [] + + self._installObserver() + # All the code inside _runFixturesAndTest will be run such that warnings + # emitted by it will be collected and retrievable by flushWarnings. + _collectWarnings(self._warnings.append, self._runFixturesAndTest, result) + + # Any collected warnings which the test method didn't flush get + # re-emitted so they'll be logged or show up on stdout or whatever. + for w in self.flushWarnings(): + try: + warnings.warn_explicit(**w) + except: + result.addError(self, failure.Failure()) + + result.stopTest(self) + + + def addCleanup(self, f, *args, **kwargs): + """ + Add the given function to a list of functions to be called after the + test has run, but before C{tearDown}. + + Functions will be run in reverse order of being added. This helps + ensure that tear down complements set up. + + As with all aspects of L{SynchronousTestCase}, Deferreds are not + supported in cleanup functions. + """ + self._cleanups.append((f, args, kwargs)) + + + def patch(self, obj, attribute, value): + """ + Monkey patch an object for the duration of the test. + + The monkey patch will be reverted at the end of the test using the + L{addCleanup} mechanism. + + The L{monkey.MonkeyPatcher} is returned so that users can restore and + re-apply the monkey patch within their tests. + + @param obj: The object to monkey patch. + @param attribute: The name of the attribute to change. + @param value: The value to set the attribute to. + @return: A L{monkey.MonkeyPatcher} object. + """ + monkeyPatch = monkey.MonkeyPatcher((obj, attribute, value)) + monkeyPatch.patch() + self.addCleanup(monkeyPatch.restore) + return monkeyPatch + + + def flushLoggedErrors(self, *errorTypes): + """ + Remove stored errors received from the log. + + C{TestCase} stores each error logged during the run of the test and + reports them as errors during the cleanup phase (after C{tearDown}). + + @param *errorTypes: If unspecified, flush all errors. Otherwise, only + flush errors that match the given types. + + @return: A list of failures that have been removed. + """ + return self._observer.flushErrors(*errorTypes) + + + def flushWarnings(self, offendingFunctions=None): + """ + Remove stored warnings from the list of captured warnings and return + them. + + @param offendingFunctions: If L{None}, all warnings issued during the + currently running test will be flushed. Otherwise, only warnings + which I{point} to a function included in this list will be flushed. + All warnings include a filename and source line number; if these + parts of a warning point to a source line which is part of a + function, then the warning I{points} to that function. + @type offendingFunctions: L{None} or L{list} of functions or methods. + + @raise ValueError: If C{offendingFunctions} is not L{None} and includes + an object which is not a L{types.FunctionType} or + L{types.MethodType} instance. + + @return: A C{list}, each element of which is a C{dict} giving + information about one warning which was flushed by this call. The + keys of each C{dict} are: + + - C{'message'}: The string which was passed as the I{message} + parameter to L{warnings.warn}. + + - C{'category'}: The warning subclass which was passed as the + I{category} parameter to L{warnings.warn}. + + - C{'filename'}: The name of the file containing the definition + of the code object which was C{stacklevel} frames above the + call to L{warnings.warn}, where C{stacklevel} is the value of + the C{stacklevel} parameter passed to L{warnings.warn}. + + - C{'lineno'}: The source line associated with the active + instruction of the code object object which was C{stacklevel} + frames above the call to L{warnings.warn}, where + C{stacklevel} is the value of the C{stacklevel} parameter + passed to L{warnings.warn}. + """ + if offendingFunctions is None: + toFlush = self._warnings[:] + self._warnings[:] = [] + else: + toFlush = [] + for aWarning in self._warnings: + for aFunction in offendingFunctions: + if not isinstance(aFunction, ( + types.FunctionType, types.MethodType)): + raise ValueError("%r is not a function or method" % ( + aFunction,)) + + # inspect.getabsfile(aFunction) sometimes returns a + # filename which disagrees with the filename the warning + # system generates. This seems to be because a + # function's code object doesn't deal with source files + # being renamed. inspect.getabsfile(module) seems + # better (or at least agrees with the warning system + # more often), and does some normalization for us which + # is desirable. inspect.getmodule() is attractive, but + # somewhat broken in Python < 2.6. See Python bug 4845. + aModule = sys.modules[aFunction.__module__] + filename = inspect.getabsfile(aModule) + + if filename != os.path.normcase(aWarning.filename): + continue + lineStarts = list(_findlinestarts(aFunction.__code__)) + first = lineStarts[0][1] + last = lineStarts[-1][1] + if not (first <= aWarning.lineno <= last): + continue + # The warning points to this function, flush it and move on + # to the next warning. + toFlush.append(aWarning) + break + # Remove everything which is being flushed. + list(map(self._warnings.remove, toFlush)) + + return [ + {'message': w.message, 'category': w.category, + 'filename': w.filename, 'lineno': w.lineno} + for w in toFlush] + + + def callDeprecated(self, version, f, *args, **kwargs): + """ + Call a function that should have been deprecated at a specific version + and in favor of a specific alternative, and assert that it was thusly + deprecated. + + @param version: A 2-sequence of (since, replacement), where C{since} is + a the first L{version<incremental.Version>} that C{f} + should have been deprecated since, and C{replacement} is a suggested + replacement for the deprecated functionality, as described by + L{twisted.python.deprecate.deprecated}. If there is no suggested + replacement, this parameter may also be simply a + L{version<incremental.Version>} by itself. + + @param f: The deprecated function to call. + + @param args: The arguments to pass to C{f}. + + @param kwargs: The keyword arguments to pass to C{f}. + + @return: Whatever C{f} returns. + + @raise: Whatever C{f} raises. If any exception is + raised by C{f}, though, no assertions will be made about emitted + deprecations. + + @raise FailTest: if no warnings were emitted by C{f}, or if the + L{DeprecationWarning} emitted did not produce the canonical + please-use-something-else message that is standard for Twisted + deprecations according to the given version and replacement. + """ + result = f(*args, **kwargs) + warningsShown = self.flushWarnings([self.callDeprecated]) + try: + info = list(version) + except TypeError: + since = version + replacement = None + else: + [since, replacement] = info + + if len(warningsShown) == 0: + self.fail('%r is not deprecated.' % (f,)) + + observedWarning = warningsShown[0]['message'] + expectedWarning = getDeprecationWarningString( + f, since, replacement=replacement) + self.assertEqual(expectedWarning, observedWarning) + + return result + + + def mktemp(self): + """ + Create a new path name which can be used for a new file or directory. + + The result is a relative path that is guaranteed to be unique within the + current working directory. The parent of the path will exist, but the + path will not. + + For a temporary directory call os.mkdir on the path. For a temporary + file just create the file (e.g. by opening the path for writing and then + closing it). + + @return: The newly created path + @rtype: C{str} + """ + MAX_FILENAME = 32 # some platforms limit lengths of filenames + base = os.path.join(self.__class__.__module__[:MAX_FILENAME], + self.__class__.__name__[:MAX_FILENAME], + self._testMethodName[:MAX_FILENAME]) + if not os.path.exists(base): + os.makedirs(base) + dirname = tempfile.mkdtemp('', '', base) + return os.path.join(dirname, 'temp') + + + def _getSuppress(self): + """ + Returns any warning suppressions set for this test. Checks on the + instance first, then the class, then the module, then packages. As + soon as it finds something with a C{suppress} attribute, returns that. + Returns any empty list (i.e. suppress no warnings) if it cannot find + anything. See L{TestCase} docstring for more details. + """ + return util.acquireAttribute(self._parents, 'suppress', []) + + + def _getSkipReason(self, method, skip): + """ + Return the reason to use for skipping a test method. + + @param method: The method which produced the skip. + @param skip: A L{unittest.SkipTest} instance raised by C{method}. + """ + if len(skip.args) > 0: + return skip.args[0] + + warnAboutFunction( + method, + "Do not raise unittest.SkipTest with no arguments! Give a reason " + "for skipping tests!") + return skip + + + def _run(self, suppress, todo, method, result): + """ + Run a single method, either a test method or fixture. + + @param suppress: Any warnings to suppress, as defined by the C{suppress} + attribute on this method, test case, or the module it is defined in. + + @param todo: Any expected failure or failures, as defined by the C{todo} + attribute on this method, test case, or the module it is defined in. + + @param method: The method to run. + + @param result: The TestResult instance to which to report results. + + @return: C{True} if the method fails and no further method/fixture calls + should be made, C{False} otherwise. + """ + if inspect.isgeneratorfunction(method): + exc = TypeError( + '%r is a generator function and therefore will never run' % ( + method,)) + result.addError(self, failure.Failure(exc)) + return True + try: + runWithWarningsSuppressed(suppress, method) + except SkipTest as e: + result.addSkip(self, self._getSkipReason(method, e)) + except: + reason = failure.Failure() + if todo is None or not todo.expected(reason): + if reason.check(self.failureException): + addResult = result.addFailure + else: + addResult = result.addError + addResult(self, reason) + else: + result.addExpectedFailure(self, reason, todo) + else: + return False + return True + + + def _runFixturesAndTest(self, result): + """ + Run C{setUp}, a test method, test cleanups, and C{tearDown}. + + @param result: The TestResult instance to which to report results. + """ + suppress = self._getSuppress() + try: + if self._run(suppress, None, self.setUp, result): + return + + todo = self.getTodo() + method = getattr(self, self._testMethodName) + failed = self._run(suppress, todo, method, result) + finally: + self._runCleanups(result) + + if todo and not failed: + result.addUnexpectedSuccess(self, todo) + + if self._run(suppress, None, self.tearDown, result): + failed = True + + for error in self._observer.getErrors(): + result.addError(self, error) + failed = True + self._observer.flushErrors() + self._removeObserver() + + if not (failed or todo): + result.addSuccess(self) + + + def _runCleanups(self, result): + """ + Synchronously run any cleanups which have been added. + """ + while len(self._cleanups) > 0: + f, args, kwargs = self._cleanups.pop() + try: + f(*args, **kwargs) + except: + f = failure.Failure() + result.addError(self, f) + + + def _installObserver(self): + self._observer = _logObserver + self._observer._add() + + + def _removeObserver(self): + self._observer._remove() diff --git a/contrib/python/Twisted/py2/twisted/trial/itrial.py b/contrib/python/Twisted/py2/twisted/trial/itrial.py new file mode 100644 index 0000000000..186b7e5b76 --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/itrial.py @@ -0,0 +1,259 @@ +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Interfaces for Trial. + +Maintainer: Jonathan Lange +""" + +from __future__ import division, absolute_import + +import zope.interface as zi +from zope.interface import Attribute + + +class ITestCase(zi.Interface): + """ + The interface that a test case must implement in order to be used in Trial. + """ + + failureException = zi.Attribute( + "The exception class that is raised by failed assertions") + + + def __call__(result): + """ + Run the test. Should always do exactly the same thing as run(). + """ + + + def countTestCases(): + """ + Return the number of tests in this test case. Usually 1. + """ + + + def id(): + """ + Return a unique identifier for the test, usually the fully-qualified + Python name. + """ + + + def run(result): + """ + Run the test, storing the results in C{result}. + + @param result: A L{TestResult}. + """ + + + def shortDescription(): + """ + Return a short description of the test. + """ + + + +class IReporter(zi.Interface): + """ + I report results from a run of a test suite. + """ + + stream = zi.Attribute( + "Deprecated in Twisted 8.0. " + "The io-stream that this reporter will write to") + tbformat = zi.Attribute("Either 'default', 'brief', or 'verbose'") + args = zi.Attribute( + "Additional string argument passed from the command line") + shouldStop = zi.Attribute( + """ + A boolean indicating that this reporter would like the test run to stop. + """) + separator = Attribute( + "Deprecated in Twisted 8.0. " + "A value which will occasionally be passed to the L{write} method.") + testsRun = Attribute( + """ + The number of tests that seem to have been run according to this + reporter. + """) + + + def startTest(method): + """ + Report the beginning of a run of a single test method. + + @param method: an object that is adaptable to ITestMethod + """ + + + def stopTest(method): + """ + Report the status of a single test method + + @param method: an object that is adaptable to ITestMethod + """ + + + def startSuite(name): + """ + Deprecated in Twisted 8.0. + + Suites which wish to appear in reporter output should call this + before running their tests. + """ + + + def endSuite(name): + """ + Deprecated in Twisted 8.0. + + Called at the end of a suite, if and only if that suite has called + C{startSuite}. + """ + + + def cleanupErrors(errs): + """ + Deprecated in Twisted 8.0. + + Called when the reactor has been left in a 'dirty' state + + @param errs: a list of L{twisted.python.failure.Failure}s + """ + + + def upDownError(userMeth, warn=True, printStatus=True): + """ + Deprecated in Twisted 8.0. + + Called when an error occurs in a setUp* or tearDown* method + + @param warn: indicates whether or not the reporter should emit a + warning about the error + @type warn: Boolean + @param printStatus: indicates whether or not the reporter should + print the name of the method and the status + message appropriate for the type of error + @type printStatus: Boolean + """ + + + def addSuccess(test): + """ + Record that test passed. + """ + + + def addError(test, error): + """ + Record that a test has raised an unexpected exception. + + @param test: The test that has raised an error. + @param error: The error that the test raised. It will either be a + three-tuple in the style of C{sys.exc_info()} or a + L{Failure<twisted.python.failure.Failure>} object. + """ + + + def addFailure(test, failure): + """ + Record that a test has failed with the given failure. + + @param test: The test that has failed. + @param failure: The failure that the test failed with. It will + either be a three-tuple in the style of C{sys.exc_info()} + or a L{Failure<twisted.python.failure.Failure>} object. + """ + + + def addExpectedFailure(test, failure, todo=None): + """ + Record that the given test failed, and was expected to do so. + + In Twisted 15.5 and prior, C{todo} was a mandatory parameter. + + @type test: L{unittest.TestCase} + @param test: The test which this is about. + @type error: L{failure.Failure} + @param error: The error which this test failed with. + @type todo: L{unittest.Todo} + @param todo: The reason for the test's TODO status. If L{None}, a + generic reason is used. + """ + + + def addUnexpectedSuccess(test, todo=None): + """ + Record that the given test failed, and was expected to do so. + + In Twisted 15.5 and prior, C{todo} was a mandatory parameter. + + @type test: L{unittest.TestCase} + @param test: The test which this is about. + @type todo: L{unittest.Todo} + @param todo: The reason for the test's TODO status. If L{None}, a + generic reason is used. + """ + + + def addSkip(test, reason): + """ + Record that a test has been skipped for the given reason. + + @param test: The test that has been skipped. + @param reason: An object that the test case has specified as the reason + for skipping the test. + """ + + + def printSummary(): + """ + Deprecated in Twisted 8.0, use L{done} instead. + + Present a summary of the test results. + """ + + + def printErrors(): + """ + Deprecated in Twisted 8.0, use L{done} instead. + + Present the errors that have occurred during the test run. This method + will be called after all tests have been run. + """ + + + def write(string): + """ + Deprecated in Twisted 8.0, use L{done} instead. + + Display a string to the user, without appending a new line. + """ + + + def writeln(string): + """ + Deprecated in Twisted 8.0, use L{done} instead. + + Display a string to the user, appending a new line. + """ + + def wasSuccessful(): + """ + Return a boolean indicating whether all test results that were reported + to this reporter were successful or not. + """ + + + def done(): + """ + Called when the test run is complete. + + This gives the result object an opportunity to display a summary of + information to the user. Once you have called C{done} on an + L{IReporter} object, you should assume that the L{IReporter} object is + no longer usable. + """ diff --git a/contrib/python/Twisted/py2/twisted/trial/reporter.py b/contrib/python/Twisted/py2/twisted/trial/reporter.py new file mode 100644 index 0000000000..eb48c7b99c --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/reporter.py @@ -0,0 +1,1322 @@ +# -*- test-case-name: twisted.trial.test.test_reporter -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. +# +# Maintainer: Jonathan Lange + +""" +Defines classes that handle the results of tests. +""" + +from __future__ import division, absolute_import + +import sys +import os +import time +import warnings +import unittest as pyunit + +from collections import OrderedDict + +from zope.interface import implementer + +from twisted.python import reflect, log +from twisted.python.components import proxyForInterface +from twisted.python.failure import Failure +from twisted.python.util import untilConcludes +from twisted.python.compat import _PY3, items +from twisted.trial import itrial, util + +try: + from subunit import TestProtocolClient +except ImportError: + TestProtocolClient = None + + + +def _makeTodo(value): + """ + Return a L{Todo} object built from C{value}. + + This is a synonym for L{twisted.trial.unittest.makeTodo}, but imported + locally to avoid circular imports. + + @param value: A string or a tuple of C{(errors, reason)}, where C{errors} + is either a single exception class or an iterable of exception classes. + + @return: A L{Todo} object. + """ + from twisted.trial.unittest import makeTodo + return makeTodo(value) + + + +class BrokenTestCaseWarning(Warning): + """ + Emitted as a warning when an exception occurs in one of setUp or tearDown. + """ + + + +class SafeStream(object): + """ + Wraps a stream object so that all C{write} calls are wrapped in + L{untilConcludes<twisted.python.util.untilConcludes>}. + """ + + def __init__(self, original): + self.original = original + + + def __getattr__(self, name): + return getattr(self.original, name) + + + def write(self, *a, **kw): + return untilConcludes(self.original.write, *a, **kw) + + + +@implementer(itrial.IReporter) +class TestResult(pyunit.TestResult, object): + """ + Accumulates the results of several L{twisted.trial.unittest.TestCase}s. + + @ivar successes: count the number of successes achieved by the test run. + @type successes: C{int} + """ + + # Used when no todo provided to addExpectedFailure or addUnexpectedSuccess. + _DEFAULT_TODO = 'Test expected to fail' + + def __init__(self): + super(TestResult, self).__init__() + self.skips = [] + self.expectedFailures = [] + self.unexpectedSuccesses = [] + self.successes = 0 + self._timings = [] + + + def __repr__(self): + return ('<%s run=%d errors=%d failures=%d todos=%d dones=%d skips=%d>' + % (reflect.qual(self.__class__), self.testsRun, + len(self.errors), len(self.failures), + len(self.expectedFailures), len(self.skips), + len(self.unexpectedSuccesses))) + + + def _getTime(self): + return time.time() + + + def _getFailure(self, error): + """ + Convert a C{sys.exc_info()}-style tuple to a L{Failure}, if necessary. + """ + if isinstance(error, tuple): + return Failure(error[1], error[0], error[2]) + return error + + + def startTest(self, test): + """ + This must be called before the given test is commenced. + + @type test: L{pyunit.TestCase} + """ + super(TestResult, self).startTest(test) + self._testStarted = self._getTime() + + + def stopTest(self, test): + """ + This must be called after the given test is completed. + + @type test: L{pyunit.TestCase} + """ + super(TestResult, self).stopTest(test) + self._lastTime = self._getTime() - self._testStarted + + + def addFailure(self, test, fail): + """ + Report a failed assertion for the given test. + + @type test: L{pyunit.TestCase} + @type fail: L{Failure} or L{tuple} + """ + self.failures.append((test, self._getFailure(fail))) + + + def addError(self, test, error): + """ + Report an error that occurred while running the given test. + + @type test: L{pyunit.TestCase} + @type error: L{Failure} or L{tuple} + """ + self.errors.append((test, self._getFailure(error))) + + + def addSkip(self, test, reason): + """ + Report that the given test was skipped. + + In Trial, tests can be 'skipped'. Tests are skipped mostly because + there is some platform or configuration issue that prevents them from + being run correctly. + + @type test: L{pyunit.TestCase} + @type reason: L{str} + """ + self.skips.append((test, reason)) + + + def addUnexpectedSuccess(self, test, todo=None): + """ + Report that the given test succeeded against expectations. + + In Trial, tests can be marked 'todo'. That is, they are expected to + fail. When a test that is expected to fail instead succeeds, it should + call this method to report the unexpected success. + + @type test: L{pyunit.TestCase} + @type todo: L{unittest.Todo}, or L{None}, in which case a default todo + message is provided. + """ + if todo is None: + todo = _makeTodo(self._DEFAULT_TODO) + self.unexpectedSuccesses.append((test, todo)) + + + def addExpectedFailure(self, test, error, todo=None): + """ + Report that the given test failed, and was expected to do so. + + In Trial, tests can be marked 'todo'. That is, they are expected to + fail. + + @type test: L{pyunit.TestCase} + @type error: L{Failure} + @type todo: L{unittest.Todo}, or L{None}, in which case a default todo + message is provided. + """ + if todo is None: + todo = _makeTodo(self._DEFAULT_TODO) + self.expectedFailures.append((test, error, todo)) + + + def addSuccess(self, test): + """ + Report that the given test succeeded. + + @type test: L{pyunit.TestCase} + """ + self.successes += 1 + + + def wasSuccessful(self): + """ + Report whether or not this test suite was successful or not. + + The behaviour of this method changed in L{pyunit} in Python 3.4 to + fail if there are any errors, failures, or unexpected successes. + Previous to 3.4, it was only if there were errors or failures. This + method implements the old behaviour for backwards compatibility reasons, + checking just for errors and failures. + + @rtype: L{bool} + """ + return len(self.failures) == len(self.errors) == 0 + + + def done(self): + """ + The test suite has finished running. + """ + + + +@implementer(itrial.IReporter) +class TestResultDecorator(proxyForInterface(itrial.IReporter, + "_originalReporter")): + """ + Base class for TestResult decorators. + + @ivar _originalReporter: The wrapped instance of reporter. + @type _originalReporter: A provider of L{itrial.IReporter} + """ + + + +@implementer(itrial.IReporter) +class UncleanWarningsReporterWrapper(TestResultDecorator): + """ + A wrapper for a reporter that converts L{util.DirtyReactorAggregateError}s + to warnings. + """ + + def addError(self, test, error): + """ + If the error is a L{util.DirtyReactorAggregateError}, instead of + reporting it as a normal error, throw a warning. + """ + + if (isinstance(error, Failure) + and error.check(util.DirtyReactorAggregateError)): + warnings.warn(error.getErrorMessage()) + else: + self._originalReporter.addError(test, error) + + + +@implementer(itrial.IReporter) +class _ExitWrapper(TestResultDecorator): + """ + A wrapper for a reporter that causes the reporter to stop after + unsuccessful tests. + """ + + def addError(self, *args, **kwargs): + self.shouldStop = True + return self._originalReporter.addError(*args, **kwargs) + + + def addFailure(self, *args, **kwargs): + self.shouldStop = True + return self._originalReporter.addFailure(*args, **kwargs) + + + +class _AdaptedReporter(TestResultDecorator): + """ + TestResult decorator that makes sure that addError only gets tests that + have been adapted with a particular test adapter. + """ + + def __init__(self, original, testAdapter): + """ + Construct an L{_AdaptedReporter}. + + @param original: An {itrial.IReporter}. + @param testAdapter: A callable that returns an L{itrial.ITestCase}. + """ + TestResultDecorator.__init__(self, original) + self.testAdapter = testAdapter + + + def addError(self, test, error): + """ + See L{itrial.IReporter}. + """ + test = self.testAdapter(test) + return self._originalReporter.addError(test, error) + + + def addExpectedFailure(self, test, failure, todo=None): + """ + See L{itrial.IReporter}. + + @type test: A L{pyunit.TestCase}. + @type failure: A L{failure.Failure} or L{exceptions.AssertionError} + @type todo: A L{unittest.Todo} or None + + When C{todo} is L{None} a generic C{unittest.Todo} is built. + + L{pyunit.TestCase}'s C{run()} calls this with 3 positional arguments + (without C{todo}). + """ + return self._originalReporter.addExpectedFailure( + self.testAdapter(test), failure, todo) + + + def addFailure(self, test, failure): + """ + See L{itrial.IReporter}. + """ + test = self.testAdapter(test) + return self._originalReporter.addFailure(test, failure) + + + def addSkip(self, test, skip): + """ + See L{itrial.IReporter}. + """ + test = self.testAdapter(test) + return self._originalReporter.addSkip(test, skip) + + + def addUnexpectedSuccess(self, test, todo=None): + """ + See L{itrial.IReporter}. + + @type test: A L{pyunit.TestCase}. + @type todo: A L{unittest.Todo} or None + + When C{todo} is L{None} a generic C{unittest.Todo} is built. + + L{pyunit.TestCase}'s C{run()} calls this with 2 positional arguments + (without C{todo}). + """ + test = self.testAdapter(test) + return self._originalReporter.addUnexpectedSuccess(test, todo) + + + def startTest(self, test): + """ + See L{itrial.IReporter}. + """ + return self._originalReporter.startTest(self.testAdapter(test)) + + + def stopTest(self, test): + """ + See L{itrial.IReporter}. + """ + return self._originalReporter.stopTest(self.testAdapter(test)) + + + +@implementer(itrial.IReporter) +class Reporter(TestResult): + """ + A basic L{TestResult} with support for writing to a stream. + + @ivar _startTime: The time when the first test was started. It defaults to + L{None}, which means that no test was actually launched. + @type _startTime: C{float} or L{None} + + @ivar _warningCache: A C{set} of tuples of warning message (file, line, + text, category) which have already been written to the output stream + during the currently executing test. This is used to avoid writing + duplicates of the same warning to the output stream. + @type _warningCache: C{set} + + @ivar _publisher: The log publisher which will be observed for warning + events. + @type _publisher: L{twisted.python.log.LogPublisher} + """ + + _separator = '-' * 79 + _doubleSeparator = '=' * 79 + + def __init__(self, stream=sys.stdout, tbformat='default', realtime=False, + publisher=None): + super(Reporter, self).__init__() + self._stream = SafeStream(stream) + self.tbformat = tbformat + self.realtime = realtime + self._startTime = None + self._warningCache = set() + + # Start observing log events so as to be able to report warnings. + self._publisher = publisher + if publisher is not None: + publisher.addObserver(self._observeWarnings) + + + def _observeWarnings(self, event): + """ + Observe warning events and write them to C{self._stream}. + + This method is a log observer which will be registered with + C{self._publisher.addObserver}. + + @param event: A C{dict} from the logging system. If it has a + C{'warning'} key, a logged warning will be extracted from it and + possibly written to C{self.stream}. + """ + if 'warning' in event: + key = (event['filename'], event['lineno'], + event['category'].split('.')[-1], + str(event['warning'])) + if key not in self._warningCache: + self._warningCache.add(key) + self._stream.write('%s:%s: %s: %s\n' % key) + + + def startTest(self, test): + """ + Called when a test begins to run. Records the time when it was first + called and resets the warning cache. + + @param test: L{ITestCase} + """ + super(Reporter, self).startTest(test) + if self._startTime is None: + self._startTime = self._getTime() + self._warningCache = set() + + + def addFailure(self, test, fail): + """ + Called when a test fails. If C{realtime} is set, then it prints the + error to the stream. + + @param test: L{ITestCase} that failed. + @param fail: L{failure.Failure} containing the error. + """ + super(Reporter, self).addFailure(test, fail) + if self.realtime: + fail = self.failures[-1][1] # guarantee it's a Failure + self._write(self._formatFailureTraceback(fail)) + + + def addError(self, test, error): + """ + Called when a test raises an error. If C{realtime} is set, then it + prints the error to the stream. + + @param test: L{ITestCase} that raised the error. + @param error: L{failure.Failure} containing the error. + """ + error = self._getFailure(error) + super(Reporter, self).addError(test, error) + if self.realtime: + error = self.errors[-1][1] # guarantee it's a Failure + self._write(self._formatFailureTraceback(error)) + + + def _write(self, format, *args): + """ + Safely write to the reporter's stream. + + @param format: A format string to write. + @param *args: The arguments for the format string. + """ + s = str(format) + assert isinstance(s, type('')) + if args: + self._stream.write(s % args) + else: + self._stream.write(s) + untilConcludes(self._stream.flush) + + + def _writeln(self, format, *args): + """ + Safely write a line to the reporter's stream. Newline is appended to + the format string. + + @param format: A format string to write. + @param *args: The arguments for the format string. + """ + self._write(format, *args) + self._write('\n') + + + def upDownError(self, method, error, warn, printStatus): + super(Reporter, self).upDownError(method, error, warn, printStatus) + if warn: + tbStr = self._formatFailureTraceback(error) + log.msg(tbStr) + msg = ("caught exception in %s, your TestCase is broken\n\n%s" + % (method, tbStr)) + warnings.warn(msg, BrokenTestCaseWarning, stacklevel=2) + + + def cleanupErrors(self, errs): + super(Reporter, self).cleanupErrors(errs) + warnings.warn("%s\n%s" % ("REACTOR UNCLEAN! traceback(s) follow: ", + self._formatFailureTraceback(errs)), + BrokenTestCaseWarning) + + + def _trimFrames(self, frames): + """ + Trim frames to remove internal paths. + + When a C{SynchronousTestCase} method fails synchronously, the stack + looks like this: + - [0]: C{SynchronousTestCase._run} + - [1]: C{util.runWithWarningsSuppressed} + - [2:-2]: code in the test method which failed + - [-1]: C{_synctest.fail} + + When a C{TestCase} method fails synchronously, the stack looks like + this: + - [0]: C{defer.maybeDeferred} + - [1]: C{utils.runWithWarningsSuppressed} + - [2]: C{utils.runWithWarningsSuppressed} + - [3:-2]: code in the test method which failed + - [-1]: C{_synctest.fail} + + When a method fails inside a C{Deferred} (i.e., when the test method + returns a C{Deferred}, and that C{Deferred}'s errback fires), the stack + captured inside the resulting C{Failure} looks like this: + - [0]: C{defer.Deferred._runCallbacks} + - [1:-2]: code in the testmethod which failed + - [-1]: C{_synctest.fail} + + As a result, we want to trim either [maybeDeferred, runWWS, runWWS] or + [Deferred._runCallbacks] or [SynchronousTestCase._run, runWWS] from the + front, and trim the [unittest.fail] from the end. + + There is also another case, when the test method is badly defined and + contains extra arguments. + + If it doesn't recognize one of these cases, it just returns the + original frames. + + @param frames: The C{list} of frames from the test failure. + + @return: The C{list} of frames to display. + """ + newFrames = list(frames) + + if len(frames) < 2: + return newFrames + + firstMethod = newFrames[0][0] + firstFile = os.path.splitext(os.path.basename(newFrames[0][1]))[0] + + secondMethod = newFrames[1][0] + secondFile = os.path.splitext(os.path.basename(newFrames[1][1]))[0] + + syncCase = (("_run", "_synctest"), + ("runWithWarningsSuppressed", "util")) + asyncCase = (("maybeDeferred", "defer"), + ("runWithWarningsSuppressed", "utils")) + + twoFrames = ((firstMethod, firstFile), (secondMethod, secondFile)) + + if _PY3: + # On PY3, we have an extra frame which is reraising the exception + for frame in newFrames: + frameFile = os.path.splitext(os.path.basename(frame[1]))[0] + if frameFile == "compat" and frame[0] == "reraise": + # If it's in the compat module and is reraise, BLAM IT + newFrames.pop(newFrames.index(frame)) + + if twoFrames == syncCase: + newFrames = newFrames[2:] + elif twoFrames == asyncCase: + newFrames = newFrames[3:] + elif (firstMethod, firstFile) == ("_runCallbacks", "defer"): + newFrames = newFrames[1:] + + if not newFrames: + # The method fails before getting called, probably an argument + # problem + return newFrames + + last = newFrames[-1] + if (last[0].startswith('fail') + and os.path.splitext(os.path.basename(last[1]))[0] == '_synctest'): + newFrames = newFrames[:-1] + + return newFrames + + + def _formatFailureTraceback(self, fail): + if isinstance(fail, str): + return fail.rstrip() + '\n' + fail.frames, frames = self._trimFrames(fail.frames), fail.frames + result = fail.getTraceback(detail=self.tbformat, + elideFrameworkCode=True) + fail.frames = frames + return result + + + def _groupResults(self, results, formatter): + """ + Group tests together based on their results. + + @param results: An iterable of tuples of two or more elements. The + first element of each tuple is a test case. The remaining + elements describe the outcome of that test case. + + @param formatter: A callable which turns a test case result into a + string. The elements after the first of the tuples in + C{results} will be passed as positional arguments to + C{formatter}. + + @return: A C{list} of two-tuples. The first element of each tuple + is a unique string describing one result from at least one of + the test cases in C{results}. The second element is a list of + the test cases which had that result. + """ + groups = OrderedDict() + for content in results: + case = content[0] + outcome = content[1:] + key = formatter(*outcome) + groups.setdefault(key, []).append(case) + return items(groups) + + + def _printResults(self, flavor, errors, formatter): + """ + Print a group of errors to the stream. + + @param flavor: A string indicating the kind of error (e.g. 'TODO'). + @param errors: A list of errors, often L{failure.Failure}s, but + sometimes 'todo' errors. + @param formatter: A callable that knows how to format the errors. + """ + for reason, cases in self._groupResults(errors, formatter): + self._writeln(self._doubleSeparator) + self._writeln(flavor) + self._write(reason) + self._writeln('') + for case in cases: + self._writeln(case.id()) + + + def _printExpectedFailure(self, error, todo): + return 'Reason: %r\n%s' % (todo.reason, + self._formatFailureTraceback(error)) + + + def _printUnexpectedSuccess(self, todo): + ret = 'Reason: %r\n' % (todo.reason,) + if todo.errors: + ret += 'Expected errors: %s\n' % (', '.join(todo.errors),) + return ret + + + def _printErrors(self): + """ + Print all of the non-success results to the stream in full. + """ + self._write('\n') + self._printResults('[SKIPPED]', self.skips, lambda x: '%s\n' % x) + self._printResults('[TODO]', self.expectedFailures, + self._printExpectedFailure) + self._printResults('[FAIL]', self.failures, + self._formatFailureTraceback) + self._printResults('[ERROR]', self.errors, + self._formatFailureTraceback) + self._printResults('[SUCCESS!?!]', self.unexpectedSuccesses, + self._printUnexpectedSuccess) + + + def _getSummary(self): + """ + Return a formatted count of tests status results. + """ + summaries = [] + for stat in ("skips", "expectedFailures", "failures", "errors", + "unexpectedSuccesses"): + num = len(getattr(self, stat)) + if num: + summaries.append('%s=%d' % (stat, num)) + if self.successes: + summaries.append('successes=%d' % (self.successes,)) + summary = (summaries and ' (' + ', '.join(summaries) + ')') or '' + return summary + + + def _printSummary(self): + """ + Print a line summarising the test results to the stream. + """ + summary = self._getSummary() + if self.wasSuccessful(): + status = "PASSED" + else: + status = "FAILED" + self._write("%s%s\n", status, summary) + + + def done(self): + """ + Summarize the result of the test run. + + The summary includes a report of all of the errors, todos, skips and + so forth that occurred during the run. It also includes the number of + tests that were run and how long it took to run them (not including + load time). + + Expects that C{_printErrors}, C{_writeln}, C{_write}, C{_printSummary} + and C{_separator} are all implemented. + """ + if self._publisher is not None: + self._publisher.removeObserver(self._observeWarnings) + self._printErrors() + self._writeln(self._separator) + if self._startTime is not None: + self._writeln('Ran %d tests in %.3fs', self.testsRun, + time.time() - self._startTime) + self._write('\n') + self._printSummary() + + + +class MinimalReporter(Reporter): + """ + A minimalist reporter that prints only a summary of the test result, in + the form of (timeTaken, #tests, #tests, #errors, #failures, #skips). + """ + + def _printErrors(self): + """ + Don't print a detailed summary of errors. We only care about the + counts. + """ + + + def _printSummary(self): + """ + Print out a one-line summary of the form: + '%(runtime) %(number_of_tests) %(number_of_tests) %(num_errors) + %(num_failures) %(num_skips)' + """ + numTests = self.testsRun + if self._startTime is not None: + timing = self._getTime() - self._startTime + else: + timing = 0 + t = (timing, numTests, numTests, + len(self.errors), len(self.failures), len(self.skips)) + self._writeln(' '.join(map(str, t))) + + + +class TextReporter(Reporter): + """ + Simple reporter that prints a single character for each test as it runs, + along with the standard Trial summary text. + """ + + def addSuccess(self, test): + super(TextReporter, self).addSuccess(test) + self._write('.') + + + def addError(self, *args): + super(TextReporter, self).addError(*args) + self._write('E') + + + def addFailure(self, *args): + super(TextReporter, self).addFailure(*args) + self._write('F') + + + def addSkip(self, *args): + super(TextReporter, self).addSkip(*args) + self._write('S') + + + def addExpectedFailure(self, *args): + super(TextReporter, self).addExpectedFailure(*args) + self._write('T') + + + def addUnexpectedSuccess(self, *args): + super(TextReporter, self).addUnexpectedSuccess(*args) + self._write('!') + + + +class VerboseTextReporter(Reporter): + """ + A verbose reporter that prints the name of each test as it is running. + + Each line is printed with the name of the test, followed by the result of + that test. + """ + + # This is actually the bwverbose option + + def startTest(self, tm): + self._write('%s ... ', tm.id()) + super(VerboseTextReporter, self).startTest(tm) + + + def addSuccess(self, test): + super(VerboseTextReporter, self).addSuccess(test) + self._write('[OK]') + + + def addError(self, *args): + super(VerboseTextReporter, self).addError(*args) + self._write('[ERROR]') + + + def addFailure(self, *args): + super(VerboseTextReporter, self).addFailure(*args) + self._write('[FAILURE]') + + + def addSkip(self, *args): + super(VerboseTextReporter, self).addSkip(*args) + self._write('[SKIPPED]') + + + def addExpectedFailure(self, *args): + super(VerboseTextReporter, self).addExpectedFailure(*args) + self._write('[TODO]') + + + def addUnexpectedSuccess(self, *args): + super(VerboseTextReporter, self).addUnexpectedSuccess(*args) + self._write('[SUCCESS!?!]') + + + def stopTest(self, test): + super(VerboseTextReporter, self).stopTest(test) + self._write('\n') + + + +class TimingTextReporter(VerboseTextReporter): + """ + Prints out each test as it is running, followed by the time taken for each + test to run. + """ + + def stopTest(self, method): + """ + Mark the test as stopped, and write the time it took to run the test + to the stream. + """ + super(TimingTextReporter, self).stopTest(method) + self._write("(%.03f secs)\n" % self._lastTime) + + + +class _AnsiColorizer(object): + """ + A colorizer is an object that loosely wraps around a stream, allowing + callers to write text to the stream in a particular color. + + Colorizer classes must implement C{supported()} and C{write(text, color)}. + """ + _colors = dict(black=30, red=31, green=32, yellow=33, + blue=34, magenta=35, cyan=36, white=37) + + def __init__(self, stream): + self.stream = stream + + + def supported(cls, stream=sys.stdout): + """ + A class method that returns True if the current platform supports + coloring terminal output using this method. Returns False otherwise. + """ + if not stream.isatty(): + return False # auto color only on TTYs + try: + import curses + except ImportError: + return False + else: + try: + try: + return curses.tigetnum("colors") > 2 + except curses.error: + curses.setupterm() + return curses.tigetnum("colors") > 2 + except: + # guess false in case of error + return False + supported = classmethod(supported) + + + def write(self, text, color): + """ + Write the given text to the stream in the given color. + + @param text: Text to be written to the stream. + + @param color: A string label for a color. e.g. 'red', 'white'. + """ + color = self._colors[color] + self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text)) + + + +class _Win32Colorizer(object): + """ + See _AnsiColorizer docstring. + """ + def __init__(self, stream): + from win32console import GetStdHandle, STD_OUTPUT_HANDLE, \ + FOREGROUND_RED, FOREGROUND_BLUE, FOREGROUND_GREEN, \ + FOREGROUND_INTENSITY + red, green, blue, bold = (FOREGROUND_RED, FOREGROUND_GREEN, + FOREGROUND_BLUE, FOREGROUND_INTENSITY) + self.stream = stream + self.screenBuffer = GetStdHandle(STD_OUTPUT_HANDLE) + self._colors = { + 'normal': red | green | blue, + 'red': red | bold, + 'green': green | bold, + 'blue': blue | bold, + 'yellow': red | green | bold, + 'magenta': red | blue | bold, + 'cyan': green | blue | bold, + 'white': red | green | blue | bold + } + + + def supported(cls, stream=sys.stdout): + try: + import win32console + screenBuffer = win32console.GetStdHandle( + win32console.STD_OUTPUT_HANDLE) + except ImportError: + return False + import pywintypes + try: + screenBuffer.SetConsoleTextAttribute( + win32console.FOREGROUND_RED | + win32console.FOREGROUND_GREEN | + win32console.FOREGROUND_BLUE) + except pywintypes.error: + return False + else: + return True + supported = classmethod(supported) + + + def write(self, text, color): + color = self._colors[color] + self.screenBuffer.SetConsoleTextAttribute(color) + self.stream.write(text) + self.screenBuffer.SetConsoleTextAttribute(self._colors['normal']) + + + +class _NullColorizer(object): + """ + See _AnsiColorizer docstring. + """ + def __init__(self, stream): + self.stream = stream + + + def supported(cls, stream=sys.stdout): + return True + supported = classmethod(supported) + + + def write(self, text, color): + self.stream.write(text) + + + +@implementer(itrial.IReporter) +class SubunitReporter(object): + """ + Reports test output via Subunit. + + @ivar _subunit: The subunit protocol client that we are wrapping. + + @ivar _successful: An internal variable, used to track whether we have + received only successful results. + + @since: 10.0 + """ + + def __init__(self, stream=sys.stdout, tbformat='default', + realtime=False, publisher=None): + """ + Construct a L{SubunitReporter}. + + @param stream: A file-like object representing the stream to print + output to. Defaults to stdout. + @param tbformat: The format for tracebacks. Ignored, since subunit + always uses Python's standard format. + @param realtime: Whether or not to print exceptions in the middle + of the test results. Ignored, since subunit always does this. + @param publisher: The log publisher which will be preserved for + reporting events. Ignored, as it's not relevant to subunit. + """ + if TestProtocolClient is None: + raise Exception("Subunit not available") + self._subunit = TestProtocolClient(stream) + self._successful = True + + + def done(self): + """ + Record that the entire test suite run is finished. + + We do nothing, since a summary clause is irrelevant to the subunit + protocol. + """ + pass + + + def shouldStop(self): + """ + Whether or not the test runner should stop running tests. + """ + return self._subunit.shouldStop + shouldStop = property(shouldStop) + + + def stop(self): + """ + Signal that the test runner should stop running tests. + """ + return self._subunit.stop() + + + def wasSuccessful(self): + """ + Has the test run been successful so far? + + @return: C{True} if we have received no reports of errors or failures, + C{False} otherwise. + """ + # Subunit has a bug in its implementation of wasSuccessful, see + # https://bugs.edge.launchpad.net/subunit/+bug/491090, so we can't + # simply forward it on. + return self._successful + + + def startTest(self, test): + """ + Record that C{test} has started. + """ + return self._subunit.startTest(test) + + + def stopTest(self, test): + """ + Record that C{test} has completed. + """ + return self._subunit.stopTest(test) + + + def addSuccess(self, test): + """ + Record that C{test} was successful. + """ + return self._subunit.addSuccess(test) + + + def addSkip(self, test, reason): + """ + Record that C{test} was skipped for C{reason}. + + Some versions of subunit don't have support for addSkip. In those + cases, the skip is reported as a success. + + @param test: A unittest-compatible C{TestCase}. + @param reason: The reason for it being skipped. The C{str()} of this + object will be included in the subunit output stream. + """ + addSkip = getattr(self._subunit, 'addSkip', None) + if addSkip is None: + self.addSuccess(test) + else: + self._subunit.addSkip(test, reason) + + + def addError(self, test, err): + """ + Record that C{test} failed with an unexpected error C{err}. + + Also marks the run as being unsuccessful, causing + L{SubunitReporter.wasSuccessful} to return C{False}. + """ + self._successful = False + return self._subunit.addError( + test, util.excInfoOrFailureToExcInfo(err)) + + + def addFailure(self, test, err): + """ + Record that C{test} failed an assertion with the error C{err}. + + Also marks the run as being unsuccessful, causing + L{SubunitReporter.wasSuccessful} to return C{False}. + """ + self._successful = False + return self._subunit.addFailure( + test, util.excInfoOrFailureToExcInfo(err)) + + + def addExpectedFailure(self, test, failure, todo): + """ + Record an expected failure from a test. + + Some versions of subunit do not implement this. For those versions, we + record a success. + """ + failure = util.excInfoOrFailureToExcInfo(failure) + addExpectedFailure = getattr(self._subunit, 'addExpectedFailure', None) + if addExpectedFailure is None: + self.addSuccess(test) + else: + addExpectedFailure(test, failure) + + + def addUnexpectedSuccess(self, test, todo=None): + """ + Record an unexpected success. + + Since subunit has no way of expressing this concept, we record a + success on the subunit stream. + """ + # Not represented in pyunit/subunit. + self.addSuccess(test) + + + +class TreeReporter(Reporter): + """ + Print out the tests in the form a tree. + + Tests are indented according to which class and module they belong. + Results are printed in ANSI color. + """ + + currentLine = '' + indent = ' ' + columns = 79 + + FAILURE = 'red' + ERROR = 'red' + TODO = 'blue' + SKIP = 'blue' + TODONE = 'red' + SUCCESS = 'green' + + def __init__(self, stream=sys.stdout, *args, **kwargs): + super(TreeReporter, self).__init__(stream, *args, **kwargs) + self._lastTest = [] + for colorizer in [_Win32Colorizer, _AnsiColorizer, _NullColorizer]: + if colorizer.supported(stream): + self._colorizer = colorizer(stream) + break + + + def getDescription(self, test): + """ + Return the name of the method which 'test' represents. This is + what gets displayed in the leaves of the tree. + + e.g. getDescription(TestCase('test_foo')) ==> test_foo + """ + return test.id().split('.')[-1] + + + def addSuccess(self, test): + super(TreeReporter, self).addSuccess(test) + self.endLine('[OK]', self.SUCCESS) + + + def addError(self, *args): + super(TreeReporter, self).addError(*args) + self.endLine('[ERROR]', self.ERROR) + + + def addFailure(self, *args): + super(TreeReporter, self).addFailure(*args) + self.endLine('[FAIL]', self.FAILURE) + + + def addSkip(self, *args): + super(TreeReporter, self).addSkip(*args) + self.endLine('[SKIPPED]', self.SKIP) + + + def addExpectedFailure(self, *args): + super(TreeReporter, self).addExpectedFailure(*args) + self.endLine('[TODO]', self.TODO) + + + def addUnexpectedSuccess(self, *args): + super(TreeReporter, self).addUnexpectedSuccess(*args) + self.endLine('[SUCCESS!?!]', self.TODONE) + + + def _write(self, format, *args): + if args: + format = format % args + self.currentLine = format + super(TreeReporter, self)._write(self.currentLine) + + + def _getPreludeSegments(self, testID): + """ + Return a list of all non-leaf segments to display in the tree. + + Normally this is the module and class name. + """ + segments = testID.split('.')[:-1] + if len(segments) == 0: + return segments + segments = [ + seg for seg in ('.'.join(segments[:-1]), segments[-1]) + if len(seg) > 0] + return segments + + + def _testPrelude(self, testID): + """ + Write the name of the test to the stream, indenting it appropriately. + + If the test is the first test in a new 'branch' of the tree, also + write all of the parents in that branch. + """ + segments = self._getPreludeSegments(testID) + indentLevel = 0 + for seg in segments: + if indentLevel < len(self._lastTest): + if seg != self._lastTest[indentLevel]: + self._write('%s%s\n' % (self.indent * indentLevel, seg)) + else: + self._write('%s%s\n' % (self.indent * indentLevel, seg)) + indentLevel += 1 + self._lastTest = segments + + + def cleanupErrors(self, errs): + self._colorizer.write(' cleanup errors', self.ERROR) + self.endLine('[ERROR]', self.ERROR) + super(TreeReporter, self).cleanupErrors(errs) + + + def upDownError(self, method, error, warn, printStatus): + self._colorizer.write(" %s" % method, self.ERROR) + if printStatus: + self.endLine('[ERROR]', self.ERROR) + super(TreeReporter, self).upDownError(method, error, warn, printStatus) + + + def startTest(self, test): + """ + Called when C{test} starts. Writes the tests name to the stream using + a tree format. + """ + self._testPrelude(test.id()) + self._write('%s%s ... ' % (self.indent * (len(self._lastTest)), + self.getDescription(test))) + super(TreeReporter, self).startTest(test) + + + def endLine(self, message, color): + """ + Print 'message' in the given color. + + @param message: A string message, usually '[OK]' or something similar. + @param color: A string color, 'red', 'green' and so forth. + """ + spaces = ' ' * (self.columns - len(self.currentLine) - len(message)) + super(TreeReporter, self)._write(spaces) + self._colorizer.write(message, color) + super(TreeReporter, self)._write("\n") + + + def _printSummary(self): + """ + Print a line summarising the test results to the stream, and color the + status result. + """ + summary = self._getSummary() + if self.wasSuccessful(): + status = "PASSED" + color = self.SUCCESS + else: + status = "FAILED" + color = self.FAILURE + self._colorizer.write(status, color) + self._write("%s\n", summary) diff --git a/contrib/python/Twisted/py2/twisted/trial/runner.py b/contrib/python/Twisted/py2/twisted/trial/runner.py new file mode 100644 index 0000000000..220c34ba70 --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/runner.py @@ -0,0 +1,1064 @@ +# -*- test-case-name: twisted.trial.test.test_runner -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +A miscellany of code used to run Trial tests. + +Maintainer: Jonathan Lange +""" + +from __future__ import absolute_import, division + +__all__ = [ + 'TestSuite', + + 'DestructiveTestSuite', 'ErrorHolder', 'LoggedSuite', + 'TestHolder', 'TestLoader', 'TrialRunner', 'TrialSuite', + + 'filenameToModule', 'isPackage', 'isPackageDirectory', 'isTestCase', + 'name', 'samefile', 'NOT_IN_TEST', + ] + +import doctest +import inspect +import os +import sys +import time +import types +import warnings + +from twisted.python import reflect, log, failure, modules, filepath +from twisted.python.compat import _PY3, _PY35PLUS + +from twisted.internet import defer +from twisted.trial import util, unittest +from twisted.trial.itrial import ITestCase +from twisted.trial.reporter import _ExitWrapper, UncleanWarningsReporterWrapper +from twisted.trial._asyncrunner import _ForceGarbageCollectionDecorator, _iterateTests +from twisted.trial._synctest import _logObserver + +# These are imported so that they remain in the public API for t.trial.runner +from twisted.trial.unittest import TestSuite + +from zope.interface import implementer + +pyunit = __import__('unittest') + + + +def isPackage(module): + """Given an object return True if the object looks like a package""" + if not isinstance(module, types.ModuleType): + return False + basename = os.path.splitext(os.path.basename(module.__file__))[0] + return basename == '__init__' + + +def isPackageDirectory(dirname): + """ + Is the directory at path 'dirname' a Python package directory? + Returns the name of the __init__ file (it may have a weird extension) + if dirname is a package directory. Otherwise, returns False + """ + def _getSuffixes(): + if _PY3: + import importlib + return importlib.machinery.all_suffixes() + else: + import imp + return list(zip(*imp.get_suffixes()))[0] + + + for ext in _getSuffixes(): + initFile = '__init__' + ext + if os.path.exists(os.path.join(dirname, initFile)): + return initFile + return False + + +def samefile(filename1, filename2): + """ + A hacky implementation of C{os.path.samefile}. Used by L{filenameToModule} + when the platform doesn't provide C{os.path.samefile}. Do not use this. + """ + return os.path.abspath(filename1) == os.path.abspath(filename2) + + +def filenameToModule(fn): + """ + Given a filename, do whatever possible to return a module object matching + that file. + + If the file in question is a module in Python path, properly import and + return that module. Otherwise, load the source manually. + + @param fn: A filename. + @return: A module object. + @raise ValueError: If C{fn} does not exist. + """ + if not os.path.exists(fn): + raise ValueError("%r doesn't exist" % (fn,)) + try: + ret = reflect.namedAny(reflect.filenameToModuleName(fn)) + except (ValueError, AttributeError): + # Couldn't find module. The file 'fn' is not in PYTHONPATH + return _importFromFile(fn) + + # >=3.7 has __file__ attribute as None, previously __file__ was not present + if getattr(ret, "__file__", None) is None: + # This isn't a Python module in a package, so import it from a file + return _importFromFile(fn) + + # ensure that the loaded module matches the file + retFile = os.path.splitext(ret.__file__)[0] + '.py' + # not all platforms (e.g. win32) have os.path.samefile + same = getattr(os.path, 'samefile', samefile) + if os.path.isfile(fn) and not same(fn, retFile): + del sys.modules[ret.__name__] + ret = _importFromFile(fn) + return ret + + +def _importFromFile(fn, moduleName=None): + fn = _resolveDirectory(fn) + if not moduleName: + moduleName = os.path.splitext(os.path.split(fn)[-1])[0] + if moduleName in sys.modules: + return sys.modules[moduleName] + if _PY35PLUS: + import importlib + + spec = importlib.util.spec_from_file_location(moduleName, fn) + if not spec: + raise SyntaxError(fn) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + sys.modules[moduleName] = module + else: + import imp + + with open(fn, 'r') as fd: + module = imp.load_source(moduleName, fn, fd) + return module + + +def _resolveDirectory(fn): + if os.path.isdir(fn): + initFile = isPackageDirectory(fn) + if initFile: + fn = os.path.join(fn, initFile) + else: + raise ValueError('%r is not a package directory' % (fn,)) + return fn + + +def _getMethodNameInClass(method): + """ + Find the attribute name on the method's class which refers to the method. + + For some methods, notably decorators which have not had __name__ set correctly: + + getattr(method.im_class, method.__name__) != method + """ + if getattr(method.im_class, method.__name__, object()) != method: + for alias in dir(method.im_class): + if getattr(method.im_class, alias, object()) == method: + return alias + return method.__name__ + + +class DestructiveTestSuite(TestSuite): + """ + A test suite which remove the tests once run, to minimize memory usage. + """ + + def run(self, result): + """ + Almost the same as L{TestSuite.run}, but with C{self._tests} being + empty at the end. + """ + while self._tests: + if result.shouldStop: + break + test = self._tests.pop(0) + test(result) + return result + + + +# When an error occurs outside of any test, the user will see this string +# in place of a test's name. +NOT_IN_TEST = "<not in test>" + + + +class LoggedSuite(TestSuite): + """ + Any errors logged in this suite will be reported to the L{TestResult} + object. + """ + + def run(self, result): + """ + Run the suite, storing all errors in C{result}. If an error is logged + while no tests are running, then it will be added as an error to + C{result}. + + @param result: A L{TestResult} object. + """ + observer = _logObserver + observer._add() + super(LoggedSuite, self).run(result) + observer._remove() + for error in observer.getErrors(): + result.addError(TestHolder(NOT_IN_TEST), error) + observer.flushErrors() + + + +class TrialSuite(TestSuite): + """ + Suite to wrap around every single test in a C{trial} run. Used internally + by Trial to set up things necessary for Trial tests to work, regardless of + what context they are run in. + """ + + def __init__(self, tests=(), forceGarbageCollection=False): + if forceGarbageCollection: + newTests = [] + for test in tests: + test = unittest.decorate( + test, _ForceGarbageCollectionDecorator) + newTests.append(test) + tests = newTests + suite = LoggedSuite(tests) + super(TrialSuite, self).__init__([suite]) + + + def _bail(self): + from twisted.internet import reactor + d = defer.Deferred() + reactor.addSystemEventTrigger('after', 'shutdown', + lambda: d.callback(None)) + reactor.fireSystemEvent('shutdown') # radix's suggestion + # As long as TestCase does crap stuff with the reactor we need to + # manually shutdown the reactor here, and that requires util.wait + # :( + # so that the shutdown event completes + unittest.TestCase('mktemp')._wait(d) + + def run(self, result): + try: + TestSuite.run(self, result) + finally: + self._bail() + + +def name(thing): + """ + @param thing: an object from modules (instance of PythonModule, + PythonAttribute), a TestCase subclass, or an instance of a TestCase. + """ + if isTestCase(thing): + # TestCase subclass + theName = reflect.qual(thing) + else: + # thing from trial, or thing from modules. + # this monstrosity exists so that modules' objects do not have to + # implement id(). -jml + try: + theName = thing.id() + except AttributeError: + theName = thing.name + return theName + + +def isTestCase(obj): + """ + @return: C{True} if C{obj} is a class that contains test cases, C{False} + otherwise. Used to find all the tests in a module. + """ + try: + return issubclass(obj, pyunit.TestCase) + except TypeError: + return False + + + +@implementer(ITestCase) +class TestHolder(object): + """ + Placeholder for a L{TestCase} inside a reporter. As far as a L{TestResult} + is concerned, this looks exactly like a unit test. + """ + + failureException = None + + def __init__(self, description): + """ + @param description: A string to be displayed L{TestResult}. + """ + self.description = description + + + def __call__(self, result): + return self.run(result) + + + def id(self): + return self.description + + + def countTestCases(self): + return 0 + + + def run(self, result): + """ + This test is just a placeholder. Run the test successfully. + + @param result: The C{TestResult} to store the results in. + @type result: L{twisted.trial.itrial.IReporter}. + """ + result.startTest(self) + result.addSuccess(self) + result.stopTest(self) + + + def shortDescription(self): + return self.description + + + +class ErrorHolder(TestHolder): + """ + Used to insert arbitrary errors into a test suite run. Provides enough + methods to look like a C{TestCase}, however, when it is run, it simply adds + an error to the C{TestResult}. The most common use-case is for when a + module fails to import. + """ + + def __init__(self, description, error): + """ + @param description: A string used by C{TestResult}s to identify this + error. Generally, this is the name of a module that failed to import. + + @param error: The error to be added to the result. Can be an `exc_info` + tuple or a L{twisted.python.failure.Failure}. + """ + super(ErrorHolder, self).__init__(description) + self.error = util.excInfoOrFailureToExcInfo(error) + + + def __repr__(self): + return "<ErrorHolder description=%r error=%r>" % ( + self.description, self.error[1]) + + + def run(self, result): + """ + Run the test, reporting the error. + + @param result: The C{TestResult} to store the results in. + @type result: L{twisted.trial.itrial.IReporter}. + """ + result.startTest(self) + result.addError(self, self.error) + result.stopTest(self) + + + +class TestLoader(object): + """ + I find tests inside function, modules, files -- whatever -- then return + them wrapped inside a Test (either a L{TestSuite} or a L{TestCase}). + + @ivar methodPrefix: A string prefix. C{TestLoader} will assume that all the + methods in a class that begin with C{methodPrefix} are test cases. + + @ivar modulePrefix: A string prefix. Every module in a package that begins + with C{modulePrefix} is considered a module full of tests. + + @ivar forceGarbageCollection: A flag applied to each C{TestCase} loaded. + See L{unittest.TestCase} for more information. + + @ivar sorter: A key function used to sort C{TestCase}s, test classes, + modules and packages. + + @ivar suiteFactory: A callable which is passed a list of tests (which + themselves may be suites of tests). Must return a test suite. + """ + + methodPrefix = 'test' + modulePrefix = 'test_' + + def __init__(self): + self.suiteFactory = TestSuite + self.sorter = name + self._importErrors = [] + + def sort(self, xs): + """ + Sort the given things using L{sorter}. + + @param xs: A list of test cases, class or modules. + """ + return sorted(xs, key=self.sorter) + + def findTestClasses(self, module): + """Given a module, return all Trial test classes""" + classes = [] + for name, val in inspect.getmembers(module): + if isTestCase(val): + classes.append(val) + return self.sort(classes) + + def findByName(self, name): + """ + Return a Python object given a string describing it. + + @param name: a string which may be either a filename or a + fully-qualified Python name. + + @return: If C{name} is a filename, return the module. If C{name} is a + fully-qualified Python name, return the object it refers to. + """ + if os.path.exists(name): + return filenameToModule(name) + return reflect.namedAny(name) + + def loadModule(self, module): + """ + Return a test suite with all the tests from a module. + + Included are TestCase subclasses and doctests listed in the module's + __doctests__ module. If that's not good for you, put a function named + either C{testSuite} or C{test_suite} in your module that returns a + TestSuite, and I'll use the results of that instead. + + If C{testSuite} and C{test_suite} are both present, then I'll use + C{testSuite}. + """ + ## XXX - should I add an optional parameter to disable the check for + ## a custom suite. + ## OR, should I add another method + if not isinstance(module, types.ModuleType): + raise TypeError("%r is not a module" % (module,)) + if hasattr(module, 'testSuite'): + return module.testSuite() + elif hasattr(module, 'test_suite'): + return module.test_suite() + suite = self.suiteFactory() + for testClass in self.findTestClasses(module): + suite.addTest(self.loadClass(testClass)) + if not hasattr(module, '__doctests__'): + return suite + docSuite = self.suiteFactory() + for docTest in module.__doctests__: + docSuite.addTest(self.loadDoctests(docTest)) + return self.suiteFactory([suite, docSuite]) + loadTestsFromModule = loadModule + + def loadClass(self, klass): + """ + Given a class which contains test cases, return a sorted list of + C{TestCase} instances. + """ + if not (isinstance(klass, type) or isinstance(klass, types.ClassType)): + raise TypeError("%r is not a class" % (klass,)) + if not isTestCase(klass): + raise ValueError("%r is not a test case" % (klass,)) + names = self.getTestCaseNames(klass) + tests = self.sort([self._makeCase(klass, self.methodPrefix+name) + for name in names]) + return self.suiteFactory(tests) + loadTestsFromTestCase = loadClass + + def getTestCaseNames(self, klass): + """ + Given a class that contains C{TestCase}s, return a list of names of + methods that probably contain tests. + """ + return reflect.prefixedMethodNames(klass, self.methodPrefix) + + def loadMethod(self, method): + """ + Given a method of a C{TestCase} that represents a test, return a + C{TestCase} instance for that test. + """ + if not isinstance(method, types.MethodType): + raise TypeError("%r not a method" % (method,)) + return self._makeCase(method.im_class, _getMethodNameInClass(method)) + + def _makeCase(self, klass, methodName): + return klass(methodName) + + def loadPackage(self, package, recurse=False): + """ + Load tests from a module object representing a package, and return a + TestSuite containing those tests. + + Tests are only loaded from modules whose name begins with 'test_' + (or whatever C{modulePrefix} is set to). + + @param package: a types.ModuleType object (or reasonable facsimile + obtained by importing) which may contain tests. + + @param recurse: A boolean. If True, inspect modules within packages + within the given package (and so on), otherwise, only inspect modules + in the package itself. + + @raise: TypeError if 'package' is not a package. + + @return: a TestSuite created with my suiteFactory, containing all the + tests. + """ + if not isPackage(package): + raise TypeError("%r is not a package" % (package,)) + pkgobj = modules.getModule(package.__name__) + if recurse: + discovery = pkgobj.walkModules() + else: + discovery = pkgobj.iterModules() + discovered = [] + for disco in discovery: + if disco.name.split(".")[-1].startswith(self.modulePrefix): + discovered.append(disco) + suite = self.suiteFactory() + for modinfo in self.sort(discovered): + try: + module = modinfo.load() + except: + thingToAdd = ErrorHolder(modinfo.name, failure.Failure()) + else: + thingToAdd = self.loadModule(module) + suite.addTest(thingToAdd) + return suite + + def loadDoctests(self, module): + """ + Return a suite of tests for all the doctests defined in C{module}. + + @param module: A module object or a module name. + """ + if isinstance(module, str): + try: + module = reflect.namedAny(module) + except: + return ErrorHolder(module, failure.Failure()) + if not inspect.ismodule(module): + warnings.warn("trial only supports doctesting modules") + return + extraArgs = {} + + # Work around Python issue2604: DocTestCase.tearDown clobbers globs + def saveGlobals(test): + """ + Save C{test.globs} and replace it with a copy so that if + necessary, the original will be available for the next test + run. + """ + test._savedGlobals = getattr(test, '_savedGlobals', test.globs) + test.globs = test._savedGlobals.copy() + extraArgs['setUp'] = saveGlobals + return doctest.DocTestSuite(module, **extraArgs) + + def loadAnything(self, thing, recurse=False, parent=None, qualName=None): + """ + Given a Python object, return whatever tests that are in it. Whatever + 'in' might mean. + + @param thing: A Python object. A module, method, class or package. + @param recurse: Whether or not to look in subpackages of packages. + Defaults to False. + + @param parent: For compatibility with the Python 3 loader, does + nothing. + @param qualname: For compatibility with the Python 3 loader, does + nothing. + + @return: A C{TestCase} or C{TestSuite}. + """ + if isinstance(thing, types.ModuleType): + if isPackage(thing): + return self.loadPackage(thing, recurse) + return self.loadModule(thing) + elif isinstance(thing, types.ClassType): + return self.loadClass(thing) + elif isinstance(thing, type): + return self.loadClass(thing) + elif isinstance(thing, types.MethodType): + return self.loadMethod(thing) + raise TypeError("No loader for %r. Unrecognized type" % (thing,)) + + def loadByName(self, name, recurse=False): + """ + Given a string representing a Python object, return whatever tests + are in that object. + + If C{name} is somehow inaccessible (e.g. the module can't be imported, + there is no Python object with that name etc) then return an + L{ErrorHolder}. + + @param name: The fully-qualified name of a Python object. + """ + try: + thing = self.findByName(name) + except: + return ErrorHolder(name, failure.Failure()) + return self.loadAnything(thing, recurse) + + loadTestsFromName = loadByName + + def loadByNames(self, names, recurse=False): + """ + Construct a TestSuite containing all the tests found in 'names', where + names is a list of fully qualified python names and/or filenames. The + suite returned will have no duplicate tests, even if the same object + is named twice. + """ + things = [] + errors = [] + for name in names: + try: + things.append(self.findByName(name)) + except: + errors.append(ErrorHolder(name, failure.Failure())) + suites = [self.loadAnything(thing, recurse) + for thing in self._uniqueTests(things)] + suites.extend(errors) + return self.suiteFactory(suites) + + + def _uniqueTests(self, things): + """ + Gather unique suite objects from loaded things. This will guarantee + uniqueness of inherited methods on TestCases which would otherwise hash + to same value and collapse to one test unexpectedly if using simpler + means: e.g. set(). + """ + seen = set() + for thing in things: + if isinstance(thing, types.MethodType): + thing = (thing, thing.im_class) + else: + thing = (thing,) + + if thing not in seen: + yield thing[0] + seen.add(thing) + + + +class Py3TestLoader(TestLoader): + """ + A test loader finds tests from the functions, modules, and files that is + asked to and loads them into a L{TestSuite} or L{TestCase}. + + See L{TestLoader} for further details. + """ + + def loadFile(self, fileName, recurse=False): + """ + Load a file, and then the tests in that file. + + @param fileName: The file name to load. + @param recurse: A boolean. If True, inspect modules within packages + within the given package (and so on), otherwise, only inspect + modules in the package itself. + """ + from importlib.machinery import SourceFileLoader + + name = reflect.filenameToModuleName(fileName) + try: + module = SourceFileLoader(name, fileName).load_module() + return self.loadAnything(module, recurse=recurse) + except OSError: + raise ValueError("{} is not a Python file.".format(fileName)) + + + def findByName(self, _name, recurse=False): + """ + Find and load tests, given C{name}. + + This partially duplicates the logic in C{unittest.loader.TestLoader}. + + @param name: The qualified name of the thing to load. + @param recurse: A boolean. If True, inspect modules within packages + within the given package (and so on), otherwise, only inspect + modules in the package itself. + """ + if os.sep in _name: + # It's a file, try and get the module name for this file. + name = reflect.filenameToModuleName(_name) + + try: + # Try and import it, if it's on the path. + # CAVEAT: If you have two twisteds, and you try and import the + # one NOT on your path, it'll load the one on your path. But + # that's silly, nobody should do that, and existing Trial does + # that anyway. + __import__(name) + except ImportError: + # If we can't import it, look for one NOT on the path. + return self.loadFile(_name, recurse=recurse) + + else: + name = _name + + obj = parent = remaining = None + + for searchName, remainingName in _qualNameWalker(name): + # Walk down the qualified name, trying to import a module. For + # example, `twisted.test.test_paths.FilePathTests` would try + # the full qualified name, then just up to test_paths, and then + # just up to test, and so forth. + # This gets us the highest level thing which is a module. + try: + obj = reflect.namedModule(searchName) + # If we reach here, we have successfully found a module. + # obj will be the module, and remaining will be the remaining + # part of the qualified name. + remaining = remainingName + break + + except ImportError: + # Check to see where the ImportError happened. If it happened + # in this file, ignore it. + tb = sys.exc_info()[2] + + # Walk down to the deepest frame, where it actually happened. + while tb.tb_next is not None: + tb = tb.tb_next + + # Get the filename that the ImportError originated in. + filenameWhereHappened = tb.tb_frame.f_code.co_filename + + # If it originated in the reflect file, then it's because it + # doesn't exist. If it originates elsewhere, it's because an + # ImportError happened in a module that does exist. + if filenameWhereHappened != reflect.__file__: + raise + + if remaining == "": + raise reflect.ModuleNotFound( + "The module {} does not exist.".format(name) + ) + + if obj is None: + # If it's none here, we didn't get to import anything. + # Try something drastic. + obj = reflect.namedAny(name) + remaining = name.split(".")[len(".".split(obj.__name__))+1:] + + try: + for part in remaining: + # Walk down the remaining modules. Hold on to the parent for + # methods, as on Python 3, you can no longer get the parent + # class from just holding onto the method. + parent, obj = obj, getattr(obj, part) + except AttributeError: + raise AttributeError("{} does not exist.".format(name)) + + return self.loadAnything(obj, parent=parent, qualName=remaining, + recurse=recurse) + + + def loadAnything(self, obj, recurse=False, parent=None, qualName=None): + """ + Load absolutely anything (as long as that anything is a module, + package, class, or method (with associated parent class and qualname). + + @param obj: The object to load. + @param recurse: A boolean. If True, inspect modules within packages + within the given package (and so on), otherwise, only inspect + modules in the package itself. + @param parent: If C{obj} is a method, this is the parent class of the + method. C{qualName} is also required. + @param qualName: If C{obj} is a method, this a list containing is the + qualified name of the method. C{parent} is also required. + """ + if isinstance(obj, types.ModuleType): + # It looks like a module + if isPackage(obj): + # It's a package, so recurse down it. + return self.loadPackage(obj, recurse=recurse) + # Otherwise get all the tests in the module. + return self.loadTestsFromModule(obj) + elif isinstance(obj, type) and issubclass(obj, pyunit.TestCase): + # We've found a raw test case, get the tests from it. + return self.loadTestsFromTestCase(obj) + elif (isinstance(obj, types.FunctionType) and + isinstance(parent, type) and + issubclass(parent, pyunit.TestCase)): + # We've found a method, and its parent is a TestCase. Instantiate + # it with the name of the method we want. + name = qualName[-1] + inst = parent(name) + + # Sanity check to make sure that the method we have got from the + # test case is the same one as was passed in. This doesn't actually + # use the function we passed in, because reasons. + assert getattr(inst, inst._testMethodName).__func__ == obj + + return inst + elif isinstance(obj, TestSuite): + # We've found a test suite. + return obj + else: + raise TypeError("don't know how to make test from: %s" % (obj,)) + + + def loadByName(self, name, recurse=False): + """ + Load some tests by name. + + @param name: The qualified name for the test to load. + @param recurse: A boolean. If True, inspect modules within packages + within the given package (and so on), otherwise, only inspect + modules in the package itself. + """ + try: + return self.suiteFactory([self.findByName(name, recurse=recurse)]) + except: + return self.suiteFactory([ErrorHolder(name, failure.Failure())]) + + + def loadByNames(self, names, recurse=False): + """ + Load some tests by a list of names. + + @param names: A L{list} of qualified names. + @param recurse: A boolean. If True, inspect modules within packages + within the given package (and so on), otherwise, only inspect + modules in the package itself. + """ + things = [] + errors = [] + for name in names: + try: + things.append(self.loadByName(name, recurse=recurse)) + except: + errors.append(ErrorHolder(name, failure.Failure())) + things.extend(errors) + return self.suiteFactory(self._uniqueTests(things)) + + + def loadClass(self, klass): + """ + Given a class which contains test cases, return a list of L{TestCase}s. + + @param klass: The class to load tests from. + """ + if not isinstance(klass, type): + raise TypeError("%r is not a class" % (klass,)) + if not isTestCase(klass): + raise ValueError("%r is not a test case" % (klass,)) + names = self.getTestCaseNames(klass) + tests = self.sort([self._makeCase(klass, self.methodPrefix+name) + for name in names]) + return self.suiteFactory(tests) + + + def loadMethod(self, method): + raise NotImplementedError("Can't happen on Py3") + + + def _uniqueTests(self, things): + """ + Gather unique suite objects from loaded things. This will guarantee + uniqueness of inherited methods on TestCases which would otherwise hash + to same value and collapse to one test unexpectedly if using simpler + means: e.g. set(). + """ + seen = set() + for testthing in things: + testthings = testthing._tests + for thing in testthings: + # This is horrible. + if str(thing) not in seen: + yield thing + seen.add(str(thing)) + + +def _qualNameWalker(qualName): + """ + Given a Python qualified name, this function yields a 2-tuple of the most + specific qualified name first, followed by the next-most-specific qualified + name, and so on, paired with the remainder of the qualified name. + + @param qualName: A Python qualified name. + @type qualName: L{str} + """ + # Yield what we were just given + yield (qualName, []) + + # If they want more, split the qualified name up + qualParts = qualName.split(".") + + for index in range(1, len(qualParts)): + # This code here will produce, from the example walker.texas.ranger: + # (walker.texas, ["ranger"]) + # (walker, ["texas", "ranger"]) + yield (".".join(qualParts[:-index]), qualParts[-index:]) + + +if _PY3: + del TestLoader + TestLoader = Py3TestLoader + + + +class TrialRunner(object): + """ + A specialised runner that the trial front end uses. + """ + + DEBUG = 'debug' + DRY_RUN = 'dry-run' + + def _setUpTestdir(self): + self._tearDownLogFile() + currentDir = os.getcwd() + base = filepath.FilePath(self.workingDirectory) + testdir, self._testDirLock = util._unusedTestDirectory(base) + os.chdir(testdir.path) + return currentDir + + + def _tearDownTestdir(self, oldDir): + os.chdir(oldDir) + self._testDirLock.unlock() + + + _log = log + def _makeResult(self): + reporter = self.reporterFactory(self.stream, self.tbformat, + self.rterrors, self._log) + if self._exitFirst: + reporter = _ExitWrapper(reporter) + if self.uncleanWarnings: + reporter = UncleanWarningsReporterWrapper(reporter) + return reporter + + def __init__(self, reporterFactory, + mode=None, + logfile='test.log', + stream=sys.stdout, + profile=False, + tracebackFormat='default', + realTimeErrors=False, + uncleanWarnings=False, + workingDirectory=None, + forceGarbageCollection=False, + debugger=None, + exitFirst=False): + self.reporterFactory = reporterFactory + self.logfile = logfile + self.mode = mode + self.stream = stream + self.tbformat = tracebackFormat + self.rterrors = realTimeErrors + self.uncleanWarnings = uncleanWarnings + self._result = None + self.workingDirectory = workingDirectory or '_trial_temp' + self._logFileObserver = None + self._logFileObject = None + self._forceGarbageCollection = forceGarbageCollection + self.debugger = debugger + self._exitFirst = exitFirst + if profile: + self.run = util.profiled(self.run, 'profile.data') + + def _tearDownLogFile(self): + if self._logFileObserver is not None: + log.removeObserver(self._logFileObserver.emit) + self._logFileObserver = None + if self._logFileObject is not None: + self._logFileObject.close() + self._logFileObject = None + + def _setUpLogFile(self): + self._tearDownLogFile() + if self.logfile == '-': + logFile = sys.stdout + else: + logFile = open(self.logfile, 'a') + self._logFileObject = logFile + self._logFileObserver = log.FileLogObserver(logFile) + log.startLoggingWithObserver(self._logFileObserver.emit, 0) + + + def run(self, test): + """ + Run the test or suite and return a result object. + """ + test = unittest.decorate(test, ITestCase) + return self._runWithoutDecoration(test, self._forceGarbageCollection) + + + def _runWithoutDecoration(self, test, forceGarbageCollection=False): + """ + Private helper that runs the given test but doesn't decorate it. + """ + result = self._makeResult() + # decorate the suite with reactor cleanup and log starting + # This should move out of the runner and be presumed to be + # present + suite = TrialSuite([test], forceGarbageCollection) + startTime = time.time() + if self.mode == self.DRY_RUN: + for single in _iterateTests(suite): + result.startTest(single) + result.addSuccess(single) + result.stopTest(single) + else: + if self.mode == self.DEBUG: + run = lambda: self.debugger.runcall(suite.run, result) + else: + run = lambda: suite.run(result) + + oldDir = self._setUpTestdir() + try: + self._setUpLogFile() + run() + finally: + self._tearDownLogFile() + self._tearDownTestdir(oldDir) + + endTime = time.time() + done = getattr(result, 'done', None) + if done is None: + warnings.warn( + "%s should implement done() but doesn't. Falling back to " + "printErrors() and friends." % reflect.qual(result.__class__), + category=DeprecationWarning, stacklevel=3) + result.printErrors() + result.writeln(result.separator) + result.writeln('Ran %d tests in %.3fs', result.testsRun, + endTime - startTime) + result.write('\n') + result.printSummary() + else: + result.done() + return result + + + def runUntilFailure(self, test): + """ + Repeatedly run C{test} until it fails. + """ + count = 0 + while True: + count += 1 + self.stream.write("Test Pass %d\n" % (count,)) + if count == 1: + result = self.run(test) + else: + result = self._runWithoutDecoration(test) + if result.testsRun == 0: + break + if not result.wasSuccessful(): + break + return result diff --git a/contrib/python/Twisted/py2/twisted/trial/unittest.py b/contrib/python/Twisted/py2/twisted/trial/unittest.py new file mode 100644 index 0000000000..d3f60346a6 --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/unittest.py @@ -0,0 +1,35 @@ +# -*- test-case-name: twisted.trial.test -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Things likely to be used by writers of unit tests. +""" + +from __future__ import division, absolute_import + +# Define the public API from the two implementation modules +from twisted.trial._synctest import ( + FailTest, SkipTest, SynchronousTestCase, PyUnitResultAdapter, Todo, + makeTodo) +from twisted.trial._asynctest import TestCase +from twisted.trial._asyncrunner import ( + TestSuite, TestDecorator, decorate) + +# Further obscure the origins of these objects, to reduce surprise (and this is +# what the values were before code got shuffled around between files, but was +# otherwise unchanged). +FailTest.__module__ = SkipTest.__module__ = __name__ + +__all__ = [ + 'decorate', + 'FailTest', + 'makeTodo', + 'PyUnitResultAdapter', + 'SkipTest', + 'SynchronousTestCase', + 'TestCase', + 'TestDecorator', + 'TestSuite', + 'Todo', + ] diff --git a/contrib/python/Twisted/py2/twisted/trial/util.py b/contrib/python/Twisted/py2/twisted/trial/util.py new file mode 100644 index 0000000000..4f07112d32 --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/util.py @@ -0,0 +1,411 @@ +# -*- test-case-name: twisted.trial.test.test_util -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. +# + +""" +A collection of utility functions and classes, used internally by Trial. + +This code is for Trial's internal use. Do NOT use this code if you are writing +tests. It is subject to change at the Trial maintainer's whim. There is +nothing here in this module for you to use unless you are maintaining Trial. + +Any non-Trial Twisted code that uses this module will be shot. + +Maintainer: Jonathan Lange + +@var DEFAULT_TIMEOUT_DURATION: The default timeout which will be applied to + asynchronous (ie, Deferred-returning) test methods, in seconds. +""" + +from __future__ import division, absolute_import, print_function + +from random import randrange + +from twisted.internet import defer, utils, interfaces +from twisted.python.failure import Failure +from twisted.python.filepath import FilePath +from twisted.python.lockfile import FilesystemLock + +__all__ = [ + 'DEFAULT_TIMEOUT_DURATION', + + 'excInfoOrFailureToExcInfo', 'suppress', 'acquireAttribute'] + +DEFAULT_TIMEOUT = object() +DEFAULT_TIMEOUT_DURATION = 120.0 + + + +class DirtyReactorAggregateError(Exception): + """ + Passed to L{twisted.trial.itrial.IReporter.addError} when the reactor is + left in an unclean state after a test. + + @ivar delayedCalls: The L{DelayedCall<twisted.internet.base.DelayedCall>} + objects which weren't cleaned up. + @ivar selectables: The selectables which weren't cleaned up. + """ + + def __init__(self, delayedCalls, selectables=None): + self.delayedCalls = delayedCalls + self.selectables = selectables + + + def __str__(self): + """ + Return a multi-line message describing all of the unclean state. + """ + msg = "Reactor was unclean." + if self.delayedCalls: + msg += ("\nDelayedCalls: (set " + "twisted.internet.base.DelayedCall.debug = True to " + "debug)\n") + msg += "\n".join(map(str, self.delayedCalls)) + if self.selectables: + msg += "\nSelectables:\n" + msg += "\n".join(map(str, self.selectables)) + return msg + + + +class _Janitor(object): + """ + The guy that cleans up after you. + + @ivar test: The L{TestCase} to report errors about. + @ivar result: The L{IReporter} to report errors to. + @ivar reactor: The reactor to use. If None, the global reactor + will be used. + """ + def __init__(self, test, result, reactor=None): + """ + @param test: See L{_Janitor.test}. + @param result: See L{_Janitor.result}. + @param reactor: See L{_Janitor.reactor}. + """ + self.test = test + self.result = result + self.reactor = reactor + + + def postCaseCleanup(self): + """ + Called by L{unittest.TestCase} after a test to catch any logged errors + or pending L{DelayedCall<twisted.internet.base.DelayedCall>}s. + """ + calls = self._cleanPending() + if calls: + aggregate = DirtyReactorAggregateError(calls) + self.result.addError(self.test, Failure(aggregate)) + return False + return True + + + def postClassCleanup(self): + """ + Called by L{unittest.TestCase} after the last test in a C{TestCase} + subclass. Ensures the reactor is clean by murdering the threadpool, + catching any pending + L{DelayedCall<twisted.internet.base.DelayedCall>}s, open sockets etc. + """ + selectables = self._cleanReactor() + calls = self._cleanPending() + if selectables or calls: + aggregate = DirtyReactorAggregateError(calls, selectables) + self.result.addError(self.test, Failure(aggregate)) + self._cleanThreads() + + + def _getReactor(self): + """ + Get either the passed-in reactor or the global reactor. + """ + if self.reactor is not None: + reactor = self.reactor + else: + from twisted.internet import reactor + return reactor + + + def _cleanPending(self): + """ + Cancel all pending calls and return their string representations. + """ + reactor = self._getReactor() + + # flush short-range timers + reactor.iterate(0) + reactor.iterate(0) + + delayedCallStrings = [] + for p in reactor.getDelayedCalls(): + if p.active(): + delayedString = str(p) + p.cancel() + else: + print("WEIRDNESS! pending timed call not active!") + delayedCallStrings.append(delayedString) + return delayedCallStrings + _cleanPending = utils.suppressWarnings( + _cleanPending, (('ignore',), {'category': DeprecationWarning, + 'message': + r'reactor\.iterate cannot be used.*'})) + + def _cleanThreads(self): + reactor = self._getReactor() + if interfaces.IReactorThreads.providedBy(reactor): + if reactor.threadpool is not None: + # Stop the threadpool now so that a new one is created. + # This improves test isolation somewhat (although this is a + # post class cleanup hook, so it's only isolating classes + # from each other, not methods from each other). + reactor._stopThreadPool() + + + def _cleanReactor(self): + """ + Remove all selectables from the reactor, kill any of them that were + processes, and return their string representation. + """ + reactor = self._getReactor() + selectableStrings = [] + for sel in reactor.removeAll(): + if interfaces.IProcessTransport.providedBy(sel): + sel.signalProcess('KILL') + selectableStrings.append(repr(sel)) + return selectableStrings + + + +_DEFAULT = object() +def acquireAttribute(objects, attr, default=_DEFAULT): + """ + Go through the list 'objects' sequentially until we find one which has + attribute 'attr', then return the value of that attribute. If not found, + return 'default' if set, otherwise, raise AttributeError. + """ + for obj in objects: + if hasattr(obj, attr): + return getattr(obj, attr) + if default is not _DEFAULT: + return default + raise AttributeError('attribute %r not found in %r' % (attr, objects)) + + + +def excInfoOrFailureToExcInfo(err): + """ + Coerce a Failure to an _exc_info, if err is a Failure. + + @param err: Either a tuple such as returned by L{sys.exc_info} or a + L{Failure} object. + @return: A tuple like the one returned by L{sys.exc_info}. e.g. + C{exception_type, exception_object, traceback_object}. + """ + if isinstance(err, Failure): + # Unwrap the Failure into an exc_info tuple. + err = (err.type, err.value, err.getTracebackObject()) + return err + + + +def suppress(action='ignore', **kwarg): + """ + Sets up the .suppress tuple properly, pass options to this method as you + would the stdlib warnings.filterwarnings() + + So, to use this with a .suppress magic attribute you would do the + following: + + >>> from twisted.trial import unittest, util + >>> import warnings + >>> + >>> class TestFoo(unittest.TestCase): + ... def testFooBar(self): + ... warnings.warn("i am deprecated", DeprecationWarning) + ... testFooBar.suppress = [util.suppress(message='i am deprecated')] + ... + >>> + + Note that as with the todo and timeout attributes: the module level + attribute acts as a default for the class attribute which acts as a default + for the method attribute. The suppress attribute can be overridden at any + level by specifying C{.suppress = []} + """ + return ((action,), kwarg) + + + +# This should be deleted, and replaced with twisted.application's code; see +# #6016: +def profiled(f, outputFile): + def _(*args, **kwargs): + import profile + prof = profile.Profile() + try: + result = prof.runcall(f, *args, **kwargs) + prof.dump_stats(outputFile) + except SystemExit: + pass + prof.print_stats() + return result + return _ + + + +@defer.inlineCallbacks +def _runSequentially(callables, stopOnFirstError=False): + """ + Run the given callables one after the other. If a callable returns a + Deferred, wait until it has finished before running the next callable. + + @param callables: An iterable of callables that take no parameters. + + @param stopOnFirstError: If True, then stop running callables as soon as + one raises an exception or fires an errback. False by default. + + @return: A L{Deferred} that fires a list of C{(flag, value)} tuples. Each + tuple will be either C{(SUCCESS, <return value>)} or C{(FAILURE, + <Failure>)}. + """ + results = [] + for f in callables: + d = defer.maybeDeferred(f) + try: + thing = yield d + results.append((defer.SUCCESS, thing)) + except Exception: + results.append((defer.FAILURE, Failure())) + if stopOnFirstError: + break + defer.returnValue(results) + + + +class _NoTrialMarker(Exception): + """ + No trial marker file could be found. + + Raised when trial attempts to remove a trial temporary working directory + that does not contain a marker file. + """ + + + +def _removeSafely(path): + """ + Safely remove a path, recursively. + + If C{path} does not contain a node named C{_trial_marker}, a + L{_NoTrialMarker} exception is raised and the path is not removed. + """ + if not path.child(b'_trial_marker').exists(): + raise _NoTrialMarker( + '%r is not a trial temporary path, refusing to remove it' + % (path,)) + try: + path.remove() + except OSError as e: + print ("could not remove %r, caught OSError [Errno %s]: %s" + % (path, e.errno, e.strerror)) + try: + newPath = FilePath(b'_trial_temp_old' + + str(randrange(10000000)).encode("utf-8")) + path.moveTo(newPath) + except OSError as e: + print ("could not rename path, caught OSError [Errno %s]: %s" + % (e.errno,e.strerror)) + raise + + + +class _WorkingDirectoryBusy(Exception): + """ + A working directory was specified to the runner, but another test run is + currently using that directory. + """ + + + +def _unusedTestDirectory(base): + """ + Find an unused directory named similarly to C{base}. + + Once a directory is found, it will be locked and a marker dropped into it + to identify it as a trial temporary directory. + + @param base: A template path for the discovery process. If this path + exactly cannot be used, a path which varies only in a suffix of the + basename will be used instead. + @type base: L{FilePath} + + @return: A two-tuple. The first element is a L{FilePath} representing the + directory which was found and created. The second element is a locked + L{FilesystemLock<twisted.python.lockfile.FilesystemLock>}. Another + call to C{_unusedTestDirectory} will not be able to reused the + same name until the lock is released, either explicitly or by this + process exiting. + """ + counter = 0 + while True: + if counter: + testdir = base.sibling('%s-%d' % (base.basename(), counter)) + else: + testdir = base + + testDirLock = FilesystemLock(testdir.path + '.lock') + if testDirLock.lock(): + # It is not in use + if testdir.exists(): + # It exists though - delete it + _removeSafely(testdir) + + # Create it anew and mark it as ours so the next _removeSafely on + # it succeeds. + testdir.makedirs() + testdir.child(b'_trial_marker').setContent(b'') + return testdir, testDirLock + else: + # It is in use + if base.basename() == '_trial_temp': + counter += 1 + else: + raise _WorkingDirectoryBusy() + + + +def _listToPhrase(things, finalDelimiter, delimiter=', '): + """ + Produce a string containing each thing in C{things}, + separated by a C{delimiter}, with the last couple being separated + by C{finalDelimiter} + + @param things: The elements of the resulting phrase + @type things: L{list} or L{tuple} + + @param finalDelimiter: What to put between the last two things + (typically 'and' or 'or') + @type finalDelimiter: L{str} + + @param delimiter: The separator to use between each thing, + not including the last two. Should typically include a trailing space. + @type delimiter: L{str} + + @return: The resulting phrase + @rtype: L{str} + """ + if not isinstance(things, (list, tuple)): + raise TypeError("Things must be a list or a tuple") + if not things: + return '' + if len(things) == 1: + return str(things[0]) + if len(things) == 2: + return "%s %s %s" % (str(things[0]), finalDelimiter, str(things[1])) + else: + strThings = [] + for thing in things: + strThings.append(str(thing)) + return "%s%s%s %s" % (delimiter.join(strThings[:-1]), + delimiter, finalDelimiter, strThings[-1]) |