diff options
author | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:16:14 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:43:30 +0300 |
commit | b8cf9e88f4c5c64d9406af533d8948deb050d695 (patch) | |
tree | 218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py2/twisted/trial/_dist | |
parent | 523f645a83a0ec97a0332dbc3863bb354c92a328 (diff) | |
download | ydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz |
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py2/twisted/trial/_dist')
9 files changed, 1143 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/__init__.py b/contrib/python/Twisted/py2/twisted/trial/_dist/__init__.py new file mode 100644 index 0000000000..502e840fef --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/_dist/__init__.py @@ -0,0 +1,47 @@ +# -*- test-case-name: twisted.trial._dist.test -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +This package implements the distributed Trial test runner: + + - The L{twisted.trial._dist.disttrial} module implements a test runner which + runs in a manager process and can launch additional worker processes in + which to run tests and gather up results from all of them. + + - The L{twisted.trial._dist.options} module defines command line options used + to configure the distributed test runner. + + - The L{twisted.trial._dist.managercommands} module defines AMP commands + which are sent from worker processes back to the manager process to report + the results of tests. + + - The L{twisted.trial._dist.workercommands} module defines AMP commands which + are sent from the manager process to the worker processes to control the + execution of tests there. + + - The L{twisted.trial._dist.distreporter} module defines a proxy for + L{twisted.trial.itrial.IReporter} which enforces the typical requirement + that results be passed to a reporter for only one test at a time, allowing + any reporter to be used with despite disttrial's simultaneously running + tests. + + - The L{twisted.trial._dist.workerreporter} module implements a + L{twisted.trial.itrial.IReporter} which is used by worker processes and + reports results back to the manager process using AMP commands. + + - The L{twisted.trial._dist.workertrial} module is a runnable script which is + the main point for worker processes. + + - The L{twisted.trial._dist.worker} process defines the manager's AMP + protocol for accepting results from worker processes and a process protocol + for use running workers as local child processes (as opposed to + distributing them to another host). + +@since: 12.3 +""" + +# File descriptors numbers used to set up pipes with the worker. +_WORKER_AMP_STDIN = 3 + +_WORKER_AMP_STDOUT = 4 diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/distreporter.py b/contrib/python/Twisted/py2/twisted/trial/_dist/distreporter.py new file mode 100644 index 0000000000..6648b7a0aa --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/_dist/distreporter.py @@ -0,0 +1,93 @@ +# -*- test-case-name: twisted.trial._dist.test.test_distreporter -*- +# +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +The reporter is not made to support concurrent test running, so we will +hold test results in here and only send them to the reporter once the +test is over. + +@since: 12.3 +""" + +from zope.interface import implementer +from twisted.trial.itrial import IReporter +from twisted.python.components import proxyForInterface + + + +@implementer(IReporter) +class DistReporter(proxyForInterface(IReporter)): + """ + See module docstring. + """ + + def __init__(self, original): + super(DistReporter, self).__init__(original) + self.running = {} + + + def startTest(self, test): + """ + Queue test starting. + """ + self.running[test.id()] = [] + self.running[test.id()].append((self.original.startTest, test)) + + + def addFailure(self, test, fail): + """ + Queue adding a failure. + """ + self.running[test.id()].append((self.original.addFailure, + test, fail)) + + + def addError(self, test, error): + """ + Queue error adding. + """ + self.running[test.id()].append((self.original.addError, + test, error)) + + + def addSkip(self, test, reason): + """ + Queue adding a skip. + """ + self.running[test.id()].append((self.original.addSkip, + test, reason)) + + + def addUnexpectedSuccess(self, test, todo=None): + """ + Queue adding an unexpected success. + """ + self.running[test.id()].append((self.original.addUnexpectedSuccess, + test, todo)) + + + def addExpectedFailure(self, test, error, todo=None): + """ + Queue adding an unexpected failure. + """ + self.running[test.id()].append((self.original.addExpectedFailure, + test, error, todo)) + + + def addSuccess(self, test): + """ + Queue adding a success. + """ + self.running[test.id()].append((self.original.addSuccess, test)) + + + def stopTest(self, test): + """ + Queue stopping the test, then unroll the queue. + """ + self.running[test.id()].append((self.original.stopTest, test)) + for step in self.running[test.id()]: + step[0](*step[1:]) + del self.running[test.id()] diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/disttrial.py b/contrib/python/Twisted/py2/twisted/trial/_dist/disttrial.py new file mode 100644 index 0000000000..96875b4a85 --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/_dist/disttrial.py @@ -0,0 +1,258 @@ +# -*- test-case-name: twisted.trial._dist.test.test_disttrial -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +This module contains the trial distributed runner, the management class +responsible for coordinating all of trial's behavior at the highest level. + +@since: 12.3 +""" + +import os +import sys + +from twisted.python.filepath import FilePath +from twisted.python.modules import theSystemPath +from twisted.internet.defer import DeferredList +from twisted.internet.task import cooperate + +from twisted.trial.util import _unusedTestDirectory +from twisted.trial._asyncrunner import _iterateTests +from twisted.trial._dist.worker import LocalWorker, LocalWorkerAMP +from twisted.trial._dist.distreporter import DistReporter +from twisted.trial.reporter import UncleanWarningsReporterWrapper +from twisted.trial._dist import _WORKER_AMP_STDIN, _WORKER_AMP_STDOUT + + + +class DistTrialRunner(object): + """ + A specialized runner for distributed trial. The runner launches a number of + local worker processes which will run tests. + + @ivar _workerNumber: the number of workers to be spawned. + @type _workerNumber: C{int} + + @ivar _stream: stream which the reporter will use. + + @ivar _reporterFactory: the reporter class to be used. + """ + _distReporterFactory = DistReporter + + def _makeResult(self): + """ + Make reporter factory, and wrap it with a L{DistReporter}. + """ + reporter = self._reporterFactory(self._stream, self._tbformat, + realtime=self._rterrors) + if self._uncleanWarnings: + reporter = UncleanWarningsReporterWrapper(reporter) + return self._distReporterFactory(reporter) + + + def __init__(self, reporterFactory, workerNumber, workerArguments, + stream=None, + tracebackFormat='default', + realTimeErrors=False, + uncleanWarnings=False, + logfile='test.log', + workingDirectory='_trial_temp'): + self._workerNumber = workerNumber + self._workerArguments = workerArguments + self._reporterFactory = reporterFactory + if stream is None: + stream = sys.stdout + self._stream = stream + self._tbformat = tracebackFormat + self._rterrors = realTimeErrors + self._uncleanWarnings = uncleanWarnings + self._result = None + self._workingDirectory = workingDirectory + self._logFile = logfile + self._logFileObserver = None + self._logFileObject = None + self._logWarnings = False + + + def writeResults(self, result): + """ + Write test run final outcome to result. + + @param result: A C{TestResult} which will print errors and the summary. + """ + result.done() + + + def createLocalWorkers(self, protocols, workingDirectory): + """ + Create local worker protocol instances and return them. + + @param protocols: An iterable of L{LocalWorkerAMP} instances. + + @param workingDirectory: The base path in which we should run the + workers. + @type workingDirectory: C{str} + + @return: A list of C{quantity} C{LocalWorker} instances. + """ + return [LocalWorker(protocol, + os.path.join(workingDirectory, str(x)), + self._logFile) + for x, protocol in enumerate(protocols)] + + + def launchWorkerProcesses(self, spawner, protocols, arguments): + """ + Spawn processes from a list of process protocols. + + @param spawner: A C{IReactorProcess.spawnProcess} implementation. + + @param protocols: An iterable of C{ProcessProtocol} instances. + + @param arguments: Extra arguments passed to the processes. + """ + workertrialPath = theSystemPath[ + 'twisted.trial._dist.workertrial'].filePath.path + childFDs = {0: 'w', 1: 'r', 2: 'r', _WORKER_AMP_STDIN: 'w', + _WORKER_AMP_STDOUT: 'r'} + environ = os.environ.copy() + # Add an environment variable containing the raw sys.path, to be used by + # subprocesses to make sure it's identical to the parent. See + # workertrial._setupPath. + environ['TRIAL_PYTHONPATH'] = os.pathsep.join(sys.path) + for worker in protocols: + args = [sys.executable, workertrialPath] + args.extend(arguments) + spawner(worker, sys.executable, args=args, childFDs=childFDs, + env=environ) + + + def _driveWorker(self, worker, result, testCases, cooperate): + """ + Drive a L{LocalWorkerAMP} instance, iterating the tests and calling + C{run} for every one of them. + + @param worker: The L{LocalWorkerAMP} to drive. + + @param result: The global L{DistReporter} instance. + + @param testCases: The global list of tests to iterate. + + @param cooperate: The cooperate function to use, to be customized in + tests. + @type cooperate: C{function} + + @return: A C{Deferred} firing when all the tests are finished. + """ + + def resultErrback(error, case): + result.original.addFailure(case, error) + return error + + def task(case): + d = worker.run(case, result) + d.addErrback(resultErrback, case) + return d + + return cooperate(task(case) for case in testCases).whenDone() + + + def run(self, suite, reactor=None, cooperate=cooperate, + untilFailure=False): + """ + Spawn local worker processes and load tests. After that, run them. + + @param suite: A tests suite to be run. + + @param reactor: The reactor to use, to be customized in tests. + @type reactor: A provider of + L{twisted.internet.interfaces.IReactorProcess} + + @param cooperate: The cooperate function to use, to be customized in + tests. + @type cooperate: C{function} + + @param untilFailure: If C{True}, continue to run the tests until they + fail. + @type untilFailure: C{bool}. + + @return: The test result. + @rtype: L{DistReporter} + """ + if reactor is None: + from twisted.internet import reactor + result = self._makeResult() + count = suite.countTestCases() + self._stream.write("Running %d tests.\n" % (count,)) + + if not count: + # Take a shortcut if there is no test + suite.run(result.original) + self.writeResults(result) + return result + + testDir, testDirLock = _unusedTestDirectory( + FilePath(self._workingDirectory)) + workerNumber = min(count, self._workerNumber) + ampWorkers = [LocalWorkerAMP() for x in range(workerNumber)] + workers = self.createLocalWorkers(ampWorkers, testDir.path) + processEndDeferreds = [worker.endDeferred for worker in workers] + self.launchWorkerProcesses(reactor.spawnProcess, workers, + self._workerArguments) + + def runTests(): + testCases = iter(list(_iterateTests(suite))) + + workerDeferreds = [] + for worker in ampWorkers: + workerDeferreds.append( + self._driveWorker(worker, result, testCases, + cooperate=cooperate)) + return DeferredList(workerDeferreds, consumeErrors=True, + fireOnOneErrback=True) + + stopping = [] + + def nextRun(ign): + self.writeResults(result) + if not untilFailure: + return + if not result.wasSuccessful(): + return + d = runTests() + return d.addCallback(nextRun) + + def stop(ign): + testDirLock.unlock() + if not stopping: + stopping.append(None) + reactor.stop() + + def beforeShutDown(): + if not stopping: + stopping.append(None) + d = DeferredList(processEndDeferreds, consumeErrors=True) + return d.addCallback(continueShutdown) + + def continueShutdown(ign): + self.writeResults(result) + return ign + + d = runTests() + d.addCallback(nextRun) + d.addBoth(stop) + + reactor.addSystemEventTrigger('before', 'shutdown', beforeShutDown) + reactor.run() + + return result + + + def runUntilFailure(self, suite): + """ + Run the tests with local worker processes until they fail. + + @param suite: A tests suite to be run. + """ + return self.run(suite, untilFailure=True) diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/managercommands.py b/contrib/python/Twisted/py2/twisted/trial/_dist/managercommands.py new file mode 100644 index 0000000000..4e76b81ddf --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/_dist/managercommands.py @@ -0,0 +1,86 @@ +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Commands for reporting test success of failure to the manager. + +@since: 12.3 +""" + +from twisted.protocols.amp import Command, String, Boolean, ListOf, Unicode +from twisted.python.compat import _PY3 + +NativeString = Unicode if _PY3 else String + + + +class AddSuccess(Command): + """ + Add a success. + """ + arguments = [(b'testName', NativeString())] + response = [(b'success', Boolean())] + + + +class AddError(Command): + """ + Add an error. + """ + arguments = [(b'testName', NativeString()), + (b'error', NativeString()), + (b'errorClass', NativeString()), + (b'frames', ListOf(NativeString()))] + response = [(b'success', Boolean())] + + + +class AddFailure(Command): + """ + Add a failure. + """ + arguments = [(b'testName', NativeString()), + (b'fail', NativeString()), + (b'failClass', NativeString()), + (b'frames', ListOf(NativeString()))] + response = [(b'success', Boolean())] + + + +class AddSkip(Command): + """ + Add a skip. + """ + arguments = [(b'testName', NativeString()), + (b'reason', NativeString())] + response = [(b'success', Boolean())] + + + +class AddExpectedFailure(Command): + """ + Add an expected failure. + """ + arguments = [(b'testName', NativeString()), + (b'error', NativeString()), + (b'todo', NativeString())] + response = [(b'success', Boolean())] + + + +class AddUnexpectedSuccess(Command): + """ + Add an unexpected success. + """ + arguments = [(b'testName', NativeString()), + (b'todo', NativeString())] + response = [(b'success', Boolean())] + + + +class TestWrite(Command): + """ + Write test log. + """ + arguments = [(b'out', NativeString())] + response = [(b'success', Boolean())] diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/options.py b/contrib/python/Twisted/py2/twisted/trial/_dist/options.py new file mode 100644 index 0000000000..ee5ccd887a --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/_dist/options.py @@ -0,0 +1,30 @@ +# -*- test-case-name: twisted.trial._dist.test.test_options -*- +# +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Options handling specific to trial's workers. + +@since: 12.3 +""" + +from twisted.python.filepath import FilePath +from twisted.python.usage import Options +from twisted.scripts.trial import _BasicOptions +from twisted.application.app import ReactorSelectionMixin + + + +class WorkerOptions(_BasicOptions, Options, ReactorSelectionMixin): + """ + Options forwarded to the trial distributed worker. + """ + + + def coverdir(self): + """ + Return a L{FilePath} representing the directory into which coverage + results should be written. + """ + return FilePath('coverage') diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/worker.py b/contrib/python/Twisted/py2/twisted/trial/_dist/worker.py new file mode 100644 index 0000000000..ef13c069d6 --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/_dist/worker.py @@ -0,0 +1,333 @@ +# -*- test-case-name: twisted.trial._dist.test.test_worker -*- +# +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +This module implements the worker classes. + +@since: 12.3 +""" + +import os + +from zope.interface import implementer + +from twisted.internet.protocol import ProcessProtocol +from twisted.internet.interfaces import ITransport, IAddress +from twisted.internet.defer import Deferred +from twisted.protocols.amp import AMP +from twisted.python.failure import Failure +from twisted.python.reflect import namedObject +from twisted.trial.unittest import Todo +from twisted.trial.runner import TrialSuite, TestLoader +from twisted.trial._dist import workercommands, managercommands +from twisted.trial._dist import _WORKER_AMP_STDIN, _WORKER_AMP_STDOUT +from twisted.trial._dist.workerreporter import WorkerReporter + + + +class WorkerProtocol(AMP): + """ + The worker-side trial distributed protocol. + """ + + def __init__(self, forceGarbageCollection=False): + self._loader = TestLoader() + self._result = WorkerReporter(self) + self._forceGarbageCollection = forceGarbageCollection + + + def run(self, testCase): + """ + Run a test case by name. + """ + case = self._loader.loadByName(testCase) + suite = TrialSuite([case], self._forceGarbageCollection) + suite.run(self._result) + return {'success': True} + + workercommands.Run.responder(run) + + + def start(self, directory): + """ + Set up the worker, moving into given directory for tests to run in + them. + """ + os.chdir(directory) + return {'success': True} + + workercommands.Start.responder(start) + + + +class LocalWorkerAMP(AMP): + """ + Local implementation of the manager commands. + """ + + def addSuccess(self, testName): + """ + Add a success to the reporter. + """ + self._result.addSuccess(self._testCase) + return {'success': True} + + managercommands.AddSuccess.responder(addSuccess) + + + def _buildFailure(self, error, errorClass, frames): + """ + Helper to build a C{Failure} with some traceback. + + @param error: An C{Exception} instance. + + @param error: The class name of the C{error} class. + + @param frames: A flat list of strings representing the information need + to approximatively rebuild C{Failure} frames. + + @return: A L{Failure} instance with enough information about a test + error. + """ + errorType = namedObject(errorClass) + failure = Failure(error, errorType) + for i in range(0, len(frames), 3): + failure.frames.append( + (frames[i], frames[i + 1], int(frames[i + 2]), [], [])) + return failure + + + def addError(self, testName, error, errorClass, frames): + """ + Add an error to the reporter. + """ + failure = self._buildFailure(error, errorClass, frames) + self._result.addError(self._testCase, failure) + return {'success': True} + + managercommands.AddError.responder(addError) + + + def addFailure(self, testName, fail, failClass, frames): + """ + Add a failure to the reporter. + """ + failure = self._buildFailure(fail, failClass, frames) + self._result.addFailure(self._testCase, failure) + return {'success': True} + + managercommands.AddFailure.responder(addFailure) + + + def addSkip(self, testName, reason): + """ + Add a skip to the reporter. + """ + self._result.addSkip(self._testCase, reason) + return {'success': True} + + managercommands.AddSkip.responder(addSkip) + + + def addExpectedFailure(self, testName, error, todo): + """ + Add an expected failure to the reporter. + """ + _todo = Todo(todo) + self._result.addExpectedFailure(self._testCase, error, _todo) + return {'success': True} + + managercommands.AddExpectedFailure.responder(addExpectedFailure) + + + def addUnexpectedSuccess(self, testName, todo): + """ + Add an unexpected success to the reporter. + """ + self._result.addUnexpectedSuccess(self._testCase, todo) + return {'success': True} + + managercommands.AddUnexpectedSuccess.responder(addUnexpectedSuccess) + + + def testWrite(self, out): + """ + Print test output from the worker. + """ + self._testStream.write(out + '\n') + self._testStream.flush() + return {'success': True} + + managercommands.TestWrite.responder(testWrite) + + + def _stopTest(self, result): + """ + Stop the current running test case, forwarding the result. + """ + self._result.stopTest(self._testCase) + return result + + + def run(self, testCase, result): + """ + Run a test. + """ + self._testCase = testCase + self._result = result + self._result.startTest(testCase) + testCaseId = testCase.id() + d = self.callRemote(workercommands.Run, testCase=testCaseId) + return d.addCallback(self._stopTest) + + + def setTestStream(self, stream): + """ + Set the stream used to log output from tests. + """ + self._testStream = stream + + + +@implementer(IAddress) +class LocalWorkerAddress(object): + """ + A L{IAddress} implementation meant to provide stub addresses for + L{ITransport.getPeer} and L{ITransport.getHost}. + """ + + + +@implementer(ITransport) +class LocalWorkerTransport(object): + """ + A stub transport implementation used to support L{AMP} over a + L{ProcessProtocol} transport. + """ + + def __init__(self, transport): + self._transport = transport + + + def write(self, data): + """ + Forward data to transport. + """ + self._transport.writeToChild(_WORKER_AMP_STDIN, data) + + + def writeSequence(self, sequence): + """ + Emulate C{writeSequence} by iterating data in the C{sequence}. + """ + for data in sequence: + self._transport.writeToChild(_WORKER_AMP_STDIN, data) + + + def loseConnection(self): + """ + Closes the transport. + """ + self._transport.loseConnection() + + + def getHost(self): + """ + Return a L{LocalWorkerAddress} instance. + """ + return LocalWorkerAddress() + + + def getPeer(self): + """ + Return a L{LocalWorkerAddress} instance. + """ + return LocalWorkerAddress() + + + +class LocalWorker(ProcessProtocol): + """ + Local process worker protocol. This worker runs as a local process and + communicates via stdin/out. + + @ivar _ampProtocol: The L{AMP} protocol instance used to communicate with + the worker. + + @ivar _logDirectory: The directory where logs will reside. + + @ivar _logFile: The name of the main log file for tests output. + """ + + def __init__(self, ampProtocol, logDirectory, logFile): + self._ampProtocol = ampProtocol + self._logDirectory = logDirectory + self._logFile = logFile + self.endDeferred = Deferred() + + + def connectionMade(self): + """ + When connection is made, create the AMP protocol instance. + """ + self._ampProtocol.makeConnection(LocalWorkerTransport(self.transport)) + if not os.path.exists(self._logDirectory): + os.makedirs(self._logDirectory) + self._outLog = open(os.path.join(self._logDirectory, 'out.log'), 'wb') + self._errLog = open(os.path.join(self._logDirectory, 'err.log'), 'wb') + self._testLog = open( + os.path.join(self._logDirectory, self._logFile), 'w') + self._ampProtocol.setTestStream(self._testLog) + logDirectory = self._logDirectory + d = self._ampProtocol.callRemote(workercommands.Start, + directory=logDirectory) + # Ignore the potential errors, the test suite will fail properly and it + # would just print garbage. + d.addErrback(lambda x: None) + + + def connectionLost(self, reason): + """ + On connection lost, close the log files that we're managing for stdin + and stdout. + """ + self._outLog.close() + self._errLog.close() + self._testLog.close() + + + def processEnded(self, reason): + """ + When the process closes, call C{connectionLost} for cleanup purposes + and forward the information to the C{_ampProtocol}. + """ + self.connectionLost(reason) + self._ampProtocol.connectionLost(reason) + self.endDeferred.callback(reason) + + + def outReceived(self, data): + """ + Send data received from stdout to log. + """ + + self._outLog.write(data) + + + def errReceived(self, data): + """ + Write error data to log. + """ + self._errLog.write(data) + + + def childDataReceived(self, childFD, data): + """ + Handle data received on the specific pipe for the C{_ampProtocol}. + """ + if childFD == _WORKER_AMP_STDOUT: + self._ampProtocol.dataReceived(data) + else: + ProcessProtocol.childDataReceived(self, childFD, data) diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/workercommands.py b/contrib/python/Twisted/py2/twisted/trial/_dist/workercommands.py new file mode 100644 index 0000000000..517cd67025 --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/_dist/workercommands.py @@ -0,0 +1,31 @@ +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Commands for telling a worker to load tests or run tests. + +@since: 12.3 +""" + +from twisted.protocols.amp import Command, String, Boolean, Unicode +from twisted.python.compat import _PY3 + +NativeString = Unicode if _PY3 else String + + + +class Run(Command): + """ + Run a test. + """ + arguments = [(b'testCase', NativeString())] + response = [(b'success', Boolean())] + + + +class Start(Command): + """ + Set up the worker process, giving the running directory. + """ + arguments = [(b'directory', NativeString())] + response = [(b'success', Boolean())] diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/workerreporter.py b/contrib/python/Twisted/py2/twisted/trial/_dist/workerreporter.py new file mode 100644 index 0000000000..ce82c59994 --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/_dist/workerreporter.py @@ -0,0 +1,154 @@ +# -*- test-case-name: twisted.trial._dist.test.test_workerreporter -*- +# +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Test reporter forwarding test results over trial distributed AMP commands. + +@since: 12.3 +""" + +from twisted.python.failure import Failure +from twisted.python.reflect import qual +from twisted.trial.reporter import TestResult +from twisted.trial._dist import managercommands + + + +class WorkerReporter(TestResult): + """ + Reporter for trial's distributed workers. We send things not through a + stream, but through an C{AMP} protocol's C{callRemote} method. + + @ivar _DEFAULT_TODO: Default message for expected failures and + unexpected successes, used only if a C{Todo} is not provided. + """ + + _DEFAULT_TODO = 'Test expected to fail' + + def __init__(self, ampProtocol): + """ + @param ampProtocol: The communication channel with the trial + distributed manager which collects all test results. + @type ampProtocol: C{AMP} + """ + super(WorkerReporter, self).__init__() + self.ampProtocol = ampProtocol + + + def _getFailure(self, error): + """ + Convert a C{sys.exc_info()}-style tuple to a L{Failure}, if necessary. + """ + if isinstance(error, tuple): + return Failure(error[1], error[0], error[2]) + return error + + + def _getFrames(self, failure): + """ + Extract frames from a C{Failure} instance. + """ + frames = [] + for frame in failure.frames: + frames.extend([frame[0], frame[1], str(frame[2])]) + return frames + + + def addSuccess(self, test): + """ + Send a success over. + """ + super(WorkerReporter, self).addSuccess(test) + testName = test.id() + self.ampProtocol.callRemote(managercommands.AddSuccess, + testName=testName) + + + def addError(self, test, error): + """ + Send an error over. + """ + super(WorkerReporter, self).addError(test, error) + testName = test.id() + failure = self._getFailure(error) + error = failure.getErrorMessage() + errorClass = qual(failure.type) + frames = [frame for frame in self._getFrames(failure)] + self.ampProtocol.callRemote(managercommands.AddError, + testName=testName, + error=error, + errorClass=errorClass, + frames=frames) + + + def addFailure(self, test, fail): + """ + Send a Failure over. + """ + super(WorkerReporter, self).addFailure(test, fail) + testName = test.id() + failure = self._getFailure(fail) + fail = failure.getErrorMessage() + failClass = qual(failure.type) + frames = [frame for frame in self._getFrames(failure)] + self.ampProtocol.callRemote(managercommands.AddFailure, + testName=testName, + fail=fail, + failClass=failClass, + frames=frames) + + + def addSkip(self, test, reason): + """ + Send a skip over. + """ + super(WorkerReporter, self).addSkip(test, reason) + reason = str(reason) + testName = test.id() + self.ampProtocol.callRemote(managercommands.AddSkip, + testName=testName, + reason=reason) + + + def _getTodoReason(self, todo): + """ + Get the reason for a C{Todo}. + + If C{todo} is L{None}, return a sensible default. + """ + if todo is None: + return self._DEFAULT_TODO + else: + return todo.reason + + + def addExpectedFailure(self, test, error, todo=None): + """ + Send an expected failure over. + """ + super(WorkerReporter, self).addExpectedFailure(test, error, todo) + errorMessage = error.getErrorMessage() + testName = test.id() + self.ampProtocol.callRemote(managercommands.AddExpectedFailure, + testName=testName, + error=errorMessage, + todo=self._getTodoReason(todo)) + + + def addUnexpectedSuccess(self, test, todo=None): + """ + Send an unexpected success over. + """ + super(WorkerReporter, self).addUnexpectedSuccess(test, todo) + testName = test.id() + self.ampProtocol.callRemote(managercommands.AddUnexpectedSuccess, + testName=testName, + todo=self._getTodoReason(todo)) + + + def printSummary(self): + """ + I{Don't} print a summary + """ diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/workertrial.py b/contrib/python/Twisted/py2/twisted/trial/_dist/workertrial.py new file mode 100644 index 0000000000..dd14dd61ef --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/_dist/workertrial.py @@ -0,0 +1,111 @@ +# -*- test-case-name: twisted.trial._dist.test.test_workertrial -*- +# +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Implementation of C{AMP} worker commands, and main executable entry point for +the workers. + +@since: 12.3 +""" + +import sys +import os +import errno + + + +def _setupPath(environ): + """ + Override C{sys.path} with what the parent passed in B{TRIAL_PYTHONPATH}. + + @see: twisted.trial._dist.disttrial.DistTrialRunner.launchWorkerProcesses + """ + if 'TRIAL_PYTHONPATH' in environ: + sys.path[:] = environ['TRIAL_PYTHONPATH'].split(os.pathsep) + + +_setupPath(os.environ) + + +from twisted.internet.protocol import FileWrapper +from twisted.python.log import startLoggingWithObserver, textFromEventDict +from twisted.trial._dist.options import WorkerOptions +from twisted.trial._dist import _WORKER_AMP_STDIN, _WORKER_AMP_STDOUT + + + +class WorkerLogObserver(object): + """ + A log observer that forward its output to a C{AMP} protocol. + """ + + def __init__(self, protocol): + """ + @param protocol: a connected C{AMP} protocol instance. + @type protocol: C{AMP} + """ + self.protocol = protocol + + + def emit(self, eventDict): + """ + Produce a log output. + """ + from twisted.trial._dist import managercommands + text = textFromEventDict(eventDict) + if text is None: + return + self.protocol.callRemote(managercommands.TestWrite, out=text) + + + +def main(_fdopen=os.fdopen): + """ + Main function to be run if __name__ == "__main__". + + @param _fdopen: If specified, the function to use in place of C{os.fdopen}. + @param _fdopen: C{callable} + """ + config = WorkerOptions() + config.parseOptions() + + from twisted.trial._dist.worker import WorkerProtocol + workerProtocol = WorkerProtocol(config['force-gc']) + + protocolIn = _fdopen(_WORKER_AMP_STDIN, 'rb') + protocolOut = _fdopen(_WORKER_AMP_STDOUT, 'wb') + workerProtocol.makeConnection(FileWrapper(protocolOut)) + + observer = WorkerLogObserver(workerProtocol) + startLoggingWithObserver(observer.emit, False) + + while True: + try: + r = protocolIn.read(1) + except IOError as e: + if e.args[0] == errno.EINTR: + if sys.version_info < (3, 0): + sys.exc_clear() + continue + else: + raise + if r == b'': + break + else: + workerProtocol.dataReceived(r) + protocolOut.flush() + sys.stdout.flush() + sys.stderr.flush() + + if config.tracer: + sys.settrace(None) + results = config.tracer.results() + results.write_results(show_missing=True, summary=False, + coverdir=config.coverdir().path) + + + +if __name__ == '__main__': + main() |