aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py2/twisted/trial/_dist
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2023-11-26 18:16:14 +0300
committershmel1k <shmel1k@ydb.tech>2023-11-26 18:43:30 +0300
commitb8cf9e88f4c5c64d9406af533d8948deb050d695 (patch)
tree218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py2/twisted/trial/_dist
parent523f645a83a0ec97a0332dbc3863bb354c92a328 (diff)
downloadydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py2/twisted/trial/_dist')
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/_dist/__init__.py47
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/_dist/distreporter.py93
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/_dist/disttrial.py258
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/_dist/managercommands.py86
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/_dist/options.py30
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/_dist/worker.py333
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/_dist/workercommands.py31
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/_dist/workerreporter.py154
-rw-r--r--contrib/python/Twisted/py2/twisted/trial/_dist/workertrial.py111
9 files changed, 1143 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/__init__.py b/contrib/python/Twisted/py2/twisted/trial/_dist/__init__.py
new file mode 100644
index 0000000000..502e840fef
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/_dist/__init__.py
@@ -0,0 +1,47 @@
+# -*- test-case-name: twisted.trial._dist.test -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+This package implements the distributed Trial test runner:
+
+ - The L{twisted.trial._dist.disttrial} module implements a test runner which
+ runs in a manager process and can launch additional worker processes in
+ which to run tests and gather up results from all of them.
+
+ - The L{twisted.trial._dist.options} module defines command line options used
+ to configure the distributed test runner.
+
+ - The L{twisted.trial._dist.managercommands} module defines AMP commands
+ which are sent from worker processes back to the manager process to report
+ the results of tests.
+
+ - The L{twisted.trial._dist.workercommands} module defines AMP commands which
+ are sent from the manager process to the worker processes to control the
+ execution of tests there.
+
+ - The L{twisted.trial._dist.distreporter} module defines a proxy for
+ L{twisted.trial.itrial.IReporter} which enforces the typical requirement
+ that results be passed to a reporter for only one test at a time, allowing
+ any reporter to be used with despite disttrial's simultaneously running
+ tests.
+
+ - The L{twisted.trial._dist.workerreporter} module implements a
+ L{twisted.trial.itrial.IReporter} which is used by worker processes and
+ reports results back to the manager process using AMP commands.
+
+ - The L{twisted.trial._dist.workertrial} module is a runnable script which is
+ the main point for worker processes.
+
+ - The L{twisted.trial._dist.worker} process defines the manager's AMP
+ protocol for accepting results from worker processes and a process protocol
+ for use running workers as local child processes (as opposed to
+ distributing them to another host).
+
+@since: 12.3
+"""
+
+# File descriptors numbers used to set up pipes with the worker.
+_WORKER_AMP_STDIN = 3
+
+_WORKER_AMP_STDOUT = 4
diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/distreporter.py b/contrib/python/Twisted/py2/twisted/trial/_dist/distreporter.py
new file mode 100644
index 0000000000..6648b7a0aa
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/_dist/distreporter.py
@@ -0,0 +1,93 @@
+# -*- test-case-name: twisted.trial._dist.test.test_distreporter -*-
+#
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+The reporter is not made to support concurrent test running, so we will
+hold test results in here and only send them to the reporter once the
+test is over.
+
+@since: 12.3
+"""
+
+from zope.interface import implementer
+from twisted.trial.itrial import IReporter
+from twisted.python.components import proxyForInterface
+
+
+
+@implementer(IReporter)
+class DistReporter(proxyForInterface(IReporter)):
+ """
+ See module docstring.
+ """
+
+ def __init__(self, original):
+ super(DistReporter, self).__init__(original)
+ self.running = {}
+
+
+ def startTest(self, test):
+ """
+ Queue test starting.
+ """
+ self.running[test.id()] = []
+ self.running[test.id()].append((self.original.startTest, test))
+
+
+ def addFailure(self, test, fail):
+ """
+ Queue adding a failure.
+ """
+ self.running[test.id()].append((self.original.addFailure,
+ test, fail))
+
+
+ def addError(self, test, error):
+ """
+ Queue error adding.
+ """
+ self.running[test.id()].append((self.original.addError,
+ test, error))
+
+
+ def addSkip(self, test, reason):
+ """
+ Queue adding a skip.
+ """
+ self.running[test.id()].append((self.original.addSkip,
+ test, reason))
+
+
+ def addUnexpectedSuccess(self, test, todo=None):
+ """
+ Queue adding an unexpected success.
+ """
+ self.running[test.id()].append((self.original.addUnexpectedSuccess,
+ test, todo))
+
+
+ def addExpectedFailure(self, test, error, todo=None):
+ """
+ Queue adding an unexpected failure.
+ """
+ self.running[test.id()].append((self.original.addExpectedFailure,
+ test, error, todo))
+
+
+ def addSuccess(self, test):
+ """
+ Queue adding a success.
+ """
+ self.running[test.id()].append((self.original.addSuccess, test))
+
+
+ def stopTest(self, test):
+ """
+ Queue stopping the test, then unroll the queue.
+ """
+ self.running[test.id()].append((self.original.stopTest, test))
+ for step in self.running[test.id()]:
+ step[0](*step[1:])
+ del self.running[test.id()]
diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/disttrial.py b/contrib/python/Twisted/py2/twisted/trial/_dist/disttrial.py
new file mode 100644
index 0000000000..96875b4a85
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/_dist/disttrial.py
@@ -0,0 +1,258 @@
+# -*- test-case-name: twisted.trial._dist.test.test_disttrial -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+This module contains the trial distributed runner, the management class
+responsible for coordinating all of trial's behavior at the highest level.
+
+@since: 12.3
+"""
+
+import os
+import sys
+
+from twisted.python.filepath import FilePath
+from twisted.python.modules import theSystemPath
+from twisted.internet.defer import DeferredList
+from twisted.internet.task import cooperate
+
+from twisted.trial.util import _unusedTestDirectory
+from twisted.trial._asyncrunner import _iterateTests
+from twisted.trial._dist.worker import LocalWorker, LocalWorkerAMP
+from twisted.trial._dist.distreporter import DistReporter
+from twisted.trial.reporter import UncleanWarningsReporterWrapper
+from twisted.trial._dist import _WORKER_AMP_STDIN, _WORKER_AMP_STDOUT
+
+
+
+class DistTrialRunner(object):
+ """
+ A specialized runner for distributed trial. The runner launches a number of
+ local worker processes which will run tests.
+
+ @ivar _workerNumber: the number of workers to be spawned.
+ @type _workerNumber: C{int}
+
+ @ivar _stream: stream which the reporter will use.
+
+ @ivar _reporterFactory: the reporter class to be used.
+ """
+ _distReporterFactory = DistReporter
+
+ def _makeResult(self):
+ """
+ Make reporter factory, and wrap it with a L{DistReporter}.
+ """
+ reporter = self._reporterFactory(self._stream, self._tbformat,
+ realtime=self._rterrors)
+ if self._uncleanWarnings:
+ reporter = UncleanWarningsReporterWrapper(reporter)
+ return self._distReporterFactory(reporter)
+
+
+ def __init__(self, reporterFactory, workerNumber, workerArguments,
+ stream=None,
+ tracebackFormat='default',
+ realTimeErrors=False,
+ uncleanWarnings=False,
+ logfile='test.log',
+ workingDirectory='_trial_temp'):
+ self._workerNumber = workerNumber
+ self._workerArguments = workerArguments
+ self._reporterFactory = reporterFactory
+ if stream is None:
+ stream = sys.stdout
+ self._stream = stream
+ self._tbformat = tracebackFormat
+ self._rterrors = realTimeErrors
+ self._uncleanWarnings = uncleanWarnings
+ self._result = None
+ self._workingDirectory = workingDirectory
+ self._logFile = logfile
+ self._logFileObserver = None
+ self._logFileObject = None
+ self._logWarnings = False
+
+
+ def writeResults(self, result):
+ """
+ Write test run final outcome to result.
+
+ @param result: A C{TestResult} which will print errors and the summary.
+ """
+ result.done()
+
+
+ def createLocalWorkers(self, protocols, workingDirectory):
+ """
+ Create local worker protocol instances and return them.
+
+ @param protocols: An iterable of L{LocalWorkerAMP} instances.
+
+ @param workingDirectory: The base path in which we should run the
+ workers.
+ @type workingDirectory: C{str}
+
+ @return: A list of C{quantity} C{LocalWorker} instances.
+ """
+ return [LocalWorker(protocol,
+ os.path.join(workingDirectory, str(x)),
+ self._logFile)
+ for x, protocol in enumerate(protocols)]
+
+
+ def launchWorkerProcesses(self, spawner, protocols, arguments):
+ """
+ Spawn processes from a list of process protocols.
+
+ @param spawner: A C{IReactorProcess.spawnProcess} implementation.
+
+ @param protocols: An iterable of C{ProcessProtocol} instances.
+
+ @param arguments: Extra arguments passed to the processes.
+ """
+ workertrialPath = theSystemPath[
+ 'twisted.trial._dist.workertrial'].filePath.path
+ childFDs = {0: 'w', 1: 'r', 2: 'r', _WORKER_AMP_STDIN: 'w',
+ _WORKER_AMP_STDOUT: 'r'}
+ environ = os.environ.copy()
+ # Add an environment variable containing the raw sys.path, to be used by
+ # subprocesses to make sure it's identical to the parent. See
+ # workertrial._setupPath.
+ environ['TRIAL_PYTHONPATH'] = os.pathsep.join(sys.path)
+ for worker in protocols:
+ args = [sys.executable, workertrialPath]
+ args.extend(arguments)
+ spawner(worker, sys.executable, args=args, childFDs=childFDs,
+ env=environ)
+
+
+ def _driveWorker(self, worker, result, testCases, cooperate):
+ """
+ Drive a L{LocalWorkerAMP} instance, iterating the tests and calling
+ C{run} for every one of them.
+
+ @param worker: The L{LocalWorkerAMP} to drive.
+
+ @param result: The global L{DistReporter} instance.
+
+ @param testCases: The global list of tests to iterate.
+
+ @param cooperate: The cooperate function to use, to be customized in
+ tests.
+ @type cooperate: C{function}
+
+ @return: A C{Deferred} firing when all the tests are finished.
+ """
+
+ def resultErrback(error, case):
+ result.original.addFailure(case, error)
+ return error
+
+ def task(case):
+ d = worker.run(case, result)
+ d.addErrback(resultErrback, case)
+ return d
+
+ return cooperate(task(case) for case in testCases).whenDone()
+
+
+ def run(self, suite, reactor=None, cooperate=cooperate,
+ untilFailure=False):
+ """
+ Spawn local worker processes and load tests. After that, run them.
+
+ @param suite: A tests suite to be run.
+
+ @param reactor: The reactor to use, to be customized in tests.
+ @type reactor: A provider of
+ L{twisted.internet.interfaces.IReactorProcess}
+
+ @param cooperate: The cooperate function to use, to be customized in
+ tests.
+ @type cooperate: C{function}
+
+ @param untilFailure: If C{True}, continue to run the tests until they
+ fail.
+ @type untilFailure: C{bool}.
+
+ @return: The test result.
+ @rtype: L{DistReporter}
+ """
+ if reactor is None:
+ from twisted.internet import reactor
+ result = self._makeResult()
+ count = suite.countTestCases()
+ self._stream.write("Running %d tests.\n" % (count,))
+
+ if not count:
+ # Take a shortcut if there is no test
+ suite.run(result.original)
+ self.writeResults(result)
+ return result
+
+ testDir, testDirLock = _unusedTestDirectory(
+ FilePath(self._workingDirectory))
+ workerNumber = min(count, self._workerNumber)
+ ampWorkers = [LocalWorkerAMP() for x in range(workerNumber)]
+ workers = self.createLocalWorkers(ampWorkers, testDir.path)
+ processEndDeferreds = [worker.endDeferred for worker in workers]
+ self.launchWorkerProcesses(reactor.spawnProcess, workers,
+ self._workerArguments)
+
+ def runTests():
+ testCases = iter(list(_iterateTests(suite)))
+
+ workerDeferreds = []
+ for worker in ampWorkers:
+ workerDeferreds.append(
+ self._driveWorker(worker, result, testCases,
+ cooperate=cooperate))
+ return DeferredList(workerDeferreds, consumeErrors=True,
+ fireOnOneErrback=True)
+
+ stopping = []
+
+ def nextRun(ign):
+ self.writeResults(result)
+ if not untilFailure:
+ return
+ if not result.wasSuccessful():
+ return
+ d = runTests()
+ return d.addCallback(nextRun)
+
+ def stop(ign):
+ testDirLock.unlock()
+ if not stopping:
+ stopping.append(None)
+ reactor.stop()
+
+ def beforeShutDown():
+ if not stopping:
+ stopping.append(None)
+ d = DeferredList(processEndDeferreds, consumeErrors=True)
+ return d.addCallback(continueShutdown)
+
+ def continueShutdown(ign):
+ self.writeResults(result)
+ return ign
+
+ d = runTests()
+ d.addCallback(nextRun)
+ d.addBoth(stop)
+
+ reactor.addSystemEventTrigger('before', 'shutdown', beforeShutDown)
+ reactor.run()
+
+ return result
+
+
+ def runUntilFailure(self, suite):
+ """
+ Run the tests with local worker processes until they fail.
+
+ @param suite: A tests suite to be run.
+ """
+ return self.run(suite, untilFailure=True)
diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/managercommands.py b/contrib/python/Twisted/py2/twisted/trial/_dist/managercommands.py
new file mode 100644
index 0000000000..4e76b81ddf
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/_dist/managercommands.py
@@ -0,0 +1,86 @@
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Commands for reporting test success of failure to the manager.
+
+@since: 12.3
+"""
+
+from twisted.protocols.amp import Command, String, Boolean, ListOf, Unicode
+from twisted.python.compat import _PY3
+
+NativeString = Unicode if _PY3 else String
+
+
+
+class AddSuccess(Command):
+ """
+ Add a success.
+ """
+ arguments = [(b'testName', NativeString())]
+ response = [(b'success', Boolean())]
+
+
+
+class AddError(Command):
+ """
+ Add an error.
+ """
+ arguments = [(b'testName', NativeString()),
+ (b'error', NativeString()),
+ (b'errorClass', NativeString()),
+ (b'frames', ListOf(NativeString()))]
+ response = [(b'success', Boolean())]
+
+
+
+class AddFailure(Command):
+ """
+ Add a failure.
+ """
+ arguments = [(b'testName', NativeString()),
+ (b'fail', NativeString()),
+ (b'failClass', NativeString()),
+ (b'frames', ListOf(NativeString()))]
+ response = [(b'success', Boolean())]
+
+
+
+class AddSkip(Command):
+ """
+ Add a skip.
+ """
+ arguments = [(b'testName', NativeString()),
+ (b'reason', NativeString())]
+ response = [(b'success', Boolean())]
+
+
+
+class AddExpectedFailure(Command):
+ """
+ Add an expected failure.
+ """
+ arguments = [(b'testName', NativeString()),
+ (b'error', NativeString()),
+ (b'todo', NativeString())]
+ response = [(b'success', Boolean())]
+
+
+
+class AddUnexpectedSuccess(Command):
+ """
+ Add an unexpected success.
+ """
+ arguments = [(b'testName', NativeString()),
+ (b'todo', NativeString())]
+ response = [(b'success', Boolean())]
+
+
+
+class TestWrite(Command):
+ """
+ Write test log.
+ """
+ arguments = [(b'out', NativeString())]
+ response = [(b'success', Boolean())]
diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/options.py b/contrib/python/Twisted/py2/twisted/trial/_dist/options.py
new file mode 100644
index 0000000000..ee5ccd887a
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/_dist/options.py
@@ -0,0 +1,30 @@
+# -*- test-case-name: twisted.trial._dist.test.test_options -*-
+#
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Options handling specific to trial's workers.
+
+@since: 12.3
+"""
+
+from twisted.python.filepath import FilePath
+from twisted.python.usage import Options
+from twisted.scripts.trial import _BasicOptions
+from twisted.application.app import ReactorSelectionMixin
+
+
+
+class WorkerOptions(_BasicOptions, Options, ReactorSelectionMixin):
+ """
+ Options forwarded to the trial distributed worker.
+ """
+
+
+ def coverdir(self):
+ """
+ Return a L{FilePath} representing the directory into which coverage
+ results should be written.
+ """
+ return FilePath('coverage')
diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/worker.py b/contrib/python/Twisted/py2/twisted/trial/_dist/worker.py
new file mode 100644
index 0000000000..ef13c069d6
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/_dist/worker.py
@@ -0,0 +1,333 @@
+# -*- test-case-name: twisted.trial._dist.test.test_worker -*-
+#
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+This module implements the worker classes.
+
+@since: 12.3
+"""
+
+import os
+
+from zope.interface import implementer
+
+from twisted.internet.protocol import ProcessProtocol
+from twisted.internet.interfaces import ITransport, IAddress
+from twisted.internet.defer import Deferred
+from twisted.protocols.amp import AMP
+from twisted.python.failure import Failure
+from twisted.python.reflect import namedObject
+from twisted.trial.unittest import Todo
+from twisted.trial.runner import TrialSuite, TestLoader
+from twisted.trial._dist import workercommands, managercommands
+from twisted.trial._dist import _WORKER_AMP_STDIN, _WORKER_AMP_STDOUT
+from twisted.trial._dist.workerreporter import WorkerReporter
+
+
+
+class WorkerProtocol(AMP):
+ """
+ The worker-side trial distributed protocol.
+ """
+
+ def __init__(self, forceGarbageCollection=False):
+ self._loader = TestLoader()
+ self._result = WorkerReporter(self)
+ self._forceGarbageCollection = forceGarbageCollection
+
+
+ def run(self, testCase):
+ """
+ Run a test case by name.
+ """
+ case = self._loader.loadByName(testCase)
+ suite = TrialSuite([case], self._forceGarbageCollection)
+ suite.run(self._result)
+ return {'success': True}
+
+ workercommands.Run.responder(run)
+
+
+ def start(self, directory):
+ """
+ Set up the worker, moving into given directory for tests to run in
+ them.
+ """
+ os.chdir(directory)
+ return {'success': True}
+
+ workercommands.Start.responder(start)
+
+
+
+class LocalWorkerAMP(AMP):
+ """
+ Local implementation of the manager commands.
+ """
+
+ def addSuccess(self, testName):
+ """
+ Add a success to the reporter.
+ """
+ self._result.addSuccess(self._testCase)
+ return {'success': True}
+
+ managercommands.AddSuccess.responder(addSuccess)
+
+
+ def _buildFailure(self, error, errorClass, frames):
+ """
+ Helper to build a C{Failure} with some traceback.
+
+ @param error: An C{Exception} instance.
+
+ @param error: The class name of the C{error} class.
+
+ @param frames: A flat list of strings representing the information need
+ to approximatively rebuild C{Failure} frames.
+
+ @return: A L{Failure} instance with enough information about a test
+ error.
+ """
+ errorType = namedObject(errorClass)
+ failure = Failure(error, errorType)
+ for i in range(0, len(frames), 3):
+ failure.frames.append(
+ (frames[i], frames[i + 1], int(frames[i + 2]), [], []))
+ return failure
+
+
+ def addError(self, testName, error, errorClass, frames):
+ """
+ Add an error to the reporter.
+ """
+ failure = self._buildFailure(error, errorClass, frames)
+ self._result.addError(self._testCase, failure)
+ return {'success': True}
+
+ managercommands.AddError.responder(addError)
+
+
+ def addFailure(self, testName, fail, failClass, frames):
+ """
+ Add a failure to the reporter.
+ """
+ failure = self._buildFailure(fail, failClass, frames)
+ self._result.addFailure(self._testCase, failure)
+ return {'success': True}
+
+ managercommands.AddFailure.responder(addFailure)
+
+
+ def addSkip(self, testName, reason):
+ """
+ Add a skip to the reporter.
+ """
+ self._result.addSkip(self._testCase, reason)
+ return {'success': True}
+
+ managercommands.AddSkip.responder(addSkip)
+
+
+ def addExpectedFailure(self, testName, error, todo):
+ """
+ Add an expected failure to the reporter.
+ """
+ _todo = Todo(todo)
+ self._result.addExpectedFailure(self._testCase, error, _todo)
+ return {'success': True}
+
+ managercommands.AddExpectedFailure.responder(addExpectedFailure)
+
+
+ def addUnexpectedSuccess(self, testName, todo):
+ """
+ Add an unexpected success to the reporter.
+ """
+ self._result.addUnexpectedSuccess(self._testCase, todo)
+ return {'success': True}
+
+ managercommands.AddUnexpectedSuccess.responder(addUnexpectedSuccess)
+
+
+ def testWrite(self, out):
+ """
+ Print test output from the worker.
+ """
+ self._testStream.write(out + '\n')
+ self._testStream.flush()
+ return {'success': True}
+
+ managercommands.TestWrite.responder(testWrite)
+
+
+ def _stopTest(self, result):
+ """
+ Stop the current running test case, forwarding the result.
+ """
+ self._result.stopTest(self._testCase)
+ return result
+
+
+ def run(self, testCase, result):
+ """
+ Run a test.
+ """
+ self._testCase = testCase
+ self._result = result
+ self._result.startTest(testCase)
+ testCaseId = testCase.id()
+ d = self.callRemote(workercommands.Run, testCase=testCaseId)
+ return d.addCallback(self._stopTest)
+
+
+ def setTestStream(self, stream):
+ """
+ Set the stream used to log output from tests.
+ """
+ self._testStream = stream
+
+
+
+@implementer(IAddress)
+class LocalWorkerAddress(object):
+ """
+ A L{IAddress} implementation meant to provide stub addresses for
+ L{ITransport.getPeer} and L{ITransport.getHost}.
+ """
+
+
+
+@implementer(ITransport)
+class LocalWorkerTransport(object):
+ """
+ A stub transport implementation used to support L{AMP} over a
+ L{ProcessProtocol} transport.
+ """
+
+ def __init__(self, transport):
+ self._transport = transport
+
+
+ def write(self, data):
+ """
+ Forward data to transport.
+ """
+ self._transport.writeToChild(_WORKER_AMP_STDIN, data)
+
+
+ def writeSequence(self, sequence):
+ """
+ Emulate C{writeSequence} by iterating data in the C{sequence}.
+ """
+ for data in sequence:
+ self._transport.writeToChild(_WORKER_AMP_STDIN, data)
+
+
+ def loseConnection(self):
+ """
+ Closes the transport.
+ """
+ self._transport.loseConnection()
+
+
+ def getHost(self):
+ """
+ Return a L{LocalWorkerAddress} instance.
+ """
+ return LocalWorkerAddress()
+
+
+ def getPeer(self):
+ """
+ Return a L{LocalWorkerAddress} instance.
+ """
+ return LocalWorkerAddress()
+
+
+
+class LocalWorker(ProcessProtocol):
+ """
+ Local process worker protocol. This worker runs as a local process and
+ communicates via stdin/out.
+
+ @ivar _ampProtocol: The L{AMP} protocol instance used to communicate with
+ the worker.
+
+ @ivar _logDirectory: The directory where logs will reside.
+
+ @ivar _logFile: The name of the main log file for tests output.
+ """
+
+ def __init__(self, ampProtocol, logDirectory, logFile):
+ self._ampProtocol = ampProtocol
+ self._logDirectory = logDirectory
+ self._logFile = logFile
+ self.endDeferred = Deferred()
+
+
+ def connectionMade(self):
+ """
+ When connection is made, create the AMP protocol instance.
+ """
+ self._ampProtocol.makeConnection(LocalWorkerTransport(self.transport))
+ if not os.path.exists(self._logDirectory):
+ os.makedirs(self._logDirectory)
+ self._outLog = open(os.path.join(self._logDirectory, 'out.log'), 'wb')
+ self._errLog = open(os.path.join(self._logDirectory, 'err.log'), 'wb')
+ self._testLog = open(
+ os.path.join(self._logDirectory, self._logFile), 'w')
+ self._ampProtocol.setTestStream(self._testLog)
+ logDirectory = self._logDirectory
+ d = self._ampProtocol.callRemote(workercommands.Start,
+ directory=logDirectory)
+ # Ignore the potential errors, the test suite will fail properly and it
+ # would just print garbage.
+ d.addErrback(lambda x: None)
+
+
+ def connectionLost(self, reason):
+ """
+ On connection lost, close the log files that we're managing for stdin
+ and stdout.
+ """
+ self._outLog.close()
+ self._errLog.close()
+ self._testLog.close()
+
+
+ def processEnded(self, reason):
+ """
+ When the process closes, call C{connectionLost} for cleanup purposes
+ and forward the information to the C{_ampProtocol}.
+ """
+ self.connectionLost(reason)
+ self._ampProtocol.connectionLost(reason)
+ self.endDeferred.callback(reason)
+
+
+ def outReceived(self, data):
+ """
+ Send data received from stdout to log.
+ """
+
+ self._outLog.write(data)
+
+
+ def errReceived(self, data):
+ """
+ Write error data to log.
+ """
+ self._errLog.write(data)
+
+
+ def childDataReceived(self, childFD, data):
+ """
+ Handle data received on the specific pipe for the C{_ampProtocol}.
+ """
+ if childFD == _WORKER_AMP_STDOUT:
+ self._ampProtocol.dataReceived(data)
+ else:
+ ProcessProtocol.childDataReceived(self, childFD, data)
diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/workercommands.py b/contrib/python/Twisted/py2/twisted/trial/_dist/workercommands.py
new file mode 100644
index 0000000000..517cd67025
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/_dist/workercommands.py
@@ -0,0 +1,31 @@
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Commands for telling a worker to load tests or run tests.
+
+@since: 12.3
+"""
+
+from twisted.protocols.amp import Command, String, Boolean, Unicode
+from twisted.python.compat import _PY3
+
+NativeString = Unicode if _PY3 else String
+
+
+
+class Run(Command):
+ """
+ Run a test.
+ """
+ arguments = [(b'testCase', NativeString())]
+ response = [(b'success', Boolean())]
+
+
+
+class Start(Command):
+ """
+ Set up the worker process, giving the running directory.
+ """
+ arguments = [(b'directory', NativeString())]
+ response = [(b'success', Boolean())]
diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/workerreporter.py b/contrib/python/Twisted/py2/twisted/trial/_dist/workerreporter.py
new file mode 100644
index 0000000000..ce82c59994
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/_dist/workerreporter.py
@@ -0,0 +1,154 @@
+# -*- test-case-name: twisted.trial._dist.test.test_workerreporter -*-
+#
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Test reporter forwarding test results over trial distributed AMP commands.
+
+@since: 12.3
+"""
+
+from twisted.python.failure import Failure
+from twisted.python.reflect import qual
+from twisted.trial.reporter import TestResult
+from twisted.trial._dist import managercommands
+
+
+
+class WorkerReporter(TestResult):
+ """
+ Reporter for trial's distributed workers. We send things not through a
+ stream, but through an C{AMP} protocol's C{callRemote} method.
+
+ @ivar _DEFAULT_TODO: Default message for expected failures and
+ unexpected successes, used only if a C{Todo} is not provided.
+ """
+
+ _DEFAULT_TODO = 'Test expected to fail'
+
+ def __init__(self, ampProtocol):
+ """
+ @param ampProtocol: The communication channel with the trial
+ distributed manager which collects all test results.
+ @type ampProtocol: C{AMP}
+ """
+ super(WorkerReporter, self).__init__()
+ self.ampProtocol = ampProtocol
+
+
+ def _getFailure(self, error):
+ """
+ Convert a C{sys.exc_info()}-style tuple to a L{Failure}, if necessary.
+ """
+ if isinstance(error, tuple):
+ return Failure(error[1], error[0], error[2])
+ return error
+
+
+ def _getFrames(self, failure):
+ """
+ Extract frames from a C{Failure} instance.
+ """
+ frames = []
+ for frame in failure.frames:
+ frames.extend([frame[0], frame[1], str(frame[2])])
+ return frames
+
+
+ def addSuccess(self, test):
+ """
+ Send a success over.
+ """
+ super(WorkerReporter, self).addSuccess(test)
+ testName = test.id()
+ self.ampProtocol.callRemote(managercommands.AddSuccess,
+ testName=testName)
+
+
+ def addError(self, test, error):
+ """
+ Send an error over.
+ """
+ super(WorkerReporter, self).addError(test, error)
+ testName = test.id()
+ failure = self._getFailure(error)
+ error = failure.getErrorMessage()
+ errorClass = qual(failure.type)
+ frames = [frame for frame in self._getFrames(failure)]
+ self.ampProtocol.callRemote(managercommands.AddError,
+ testName=testName,
+ error=error,
+ errorClass=errorClass,
+ frames=frames)
+
+
+ def addFailure(self, test, fail):
+ """
+ Send a Failure over.
+ """
+ super(WorkerReporter, self).addFailure(test, fail)
+ testName = test.id()
+ failure = self._getFailure(fail)
+ fail = failure.getErrorMessage()
+ failClass = qual(failure.type)
+ frames = [frame for frame in self._getFrames(failure)]
+ self.ampProtocol.callRemote(managercommands.AddFailure,
+ testName=testName,
+ fail=fail,
+ failClass=failClass,
+ frames=frames)
+
+
+ def addSkip(self, test, reason):
+ """
+ Send a skip over.
+ """
+ super(WorkerReporter, self).addSkip(test, reason)
+ reason = str(reason)
+ testName = test.id()
+ self.ampProtocol.callRemote(managercommands.AddSkip,
+ testName=testName,
+ reason=reason)
+
+
+ def _getTodoReason(self, todo):
+ """
+ Get the reason for a C{Todo}.
+
+ If C{todo} is L{None}, return a sensible default.
+ """
+ if todo is None:
+ return self._DEFAULT_TODO
+ else:
+ return todo.reason
+
+
+ def addExpectedFailure(self, test, error, todo=None):
+ """
+ Send an expected failure over.
+ """
+ super(WorkerReporter, self).addExpectedFailure(test, error, todo)
+ errorMessage = error.getErrorMessage()
+ testName = test.id()
+ self.ampProtocol.callRemote(managercommands.AddExpectedFailure,
+ testName=testName,
+ error=errorMessage,
+ todo=self._getTodoReason(todo))
+
+
+ def addUnexpectedSuccess(self, test, todo=None):
+ """
+ Send an unexpected success over.
+ """
+ super(WorkerReporter, self).addUnexpectedSuccess(test, todo)
+ testName = test.id()
+ self.ampProtocol.callRemote(managercommands.AddUnexpectedSuccess,
+ testName=testName,
+ todo=self._getTodoReason(todo))
+
+
+ def printSummary(self):
+ """
+ I{Don't} print a summary
+ """
diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/workertrial.py b/contrib/python/Twisted/py2/twisted/trial/_dist/workertrial.py
new file mode 100644
index 0000000000..dd14dd61ef
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/trial/_dist/workertrial.py
@@ -0,0 +1,111 @@
+# -*- test-case-name: twisted.trial._dist.test.test_workertrial -*-
+#
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Implementation of C{AMP} worker commands, and main executable entry point for
+the workers.
+
+@since: 12.3
+"""
+
+import sys
+import os
+import errno
+
+
+
+def _setupPath(environ):
+ """
+ Override C{sys.path} with what the parent passed in B{TRIAL_PYTHONPATH}.
+
+ @see: twisted.trial._dist.disttrial.DistTrialRunner.launchWorkerProcesses
+ """
+ if 'TRIAL_PYTHONPATH' in environ:
+ sys.path[:] = environ['TRIAL_PYTHONPATH'].split(os.pathsep)
+
+
+_setupPath(os.environ)
+
+
+from twisted.internet.protocol import FileWrapper
+from twisted.python.log import startLoggingWithObserver, textFromEventDict
+from twisted.trial._dist.options import WorkerOptions
+from twisted.trial._dist import _WORKER_AMP_STDIN, _WORKER_AMP_STDOUT
+
+
+
+class WorkerLogObserver(object):
+ """
+ A log observer that forward its output to a C{AMP} protocol.
+ """
+
+ def __init__(self, protocol):
+ """
+ @param protocol: a connected C{AMP} protocol instance.
+ @type protocol: C{AMP}
+ """
+ self.protocol = protocol
+
+
+ def emit(self, eventDict):
+ """
+ Produce a log output.
+ """
+ from twisted.trial._dist import managercommands
+ text = textFromEventDict(eventDict)
+ if text is None:
+ return
+ self.protocol.callRemote(managercommands.TestWrite, out=text)
+
+
+
+def main(_fdopen=os.fdopen):
+ """
+ Main function to be run if __name__ == "__main__".
+
+ @param _fdopen: If specified, the function to use in place of C{os.fdopen}.
+ @param _fdopen: C{callable}
+ """
+ config = WorkerOptions()
+ config.parseOptions()
+
+ from twisted.trial._dist.worker import WorkerProtocol
+ workerProtocol = WorkerProtocol(config['force-gc'])
+
+ protocolIn = _fdopen(_WORKER_AMP_STDIN, 'rb')
+ protocolOut = _fdopen(_WORKER_AMP_STDOUT, 'wb')
+ workerProtocol.makeConnection(FileWrapper(protocolOut))
+
+ observer = WorkerLogObserver(workerProtocol)
+ startLoggingWithObserver(observer.emit, False)
+
+ while True:
+ try:
+ r = protocolIn.read(1)
+ except IOError as e:
+ if e.args[0] == errno.EINTR:
+ if sys.version_info < (3, 0):
+ sys.exc_clear()
+ continue
+ else:
+ raise
+ if r == b'':
+ break
+ else:
+ workerProtocol.dataReceived(r)
+ protocolOut.flush()
+ sys.stdout.flush()
+ sys.stderr.flush()
+
+ if config.tracer:
+ sys.settrace(None)
+ results = config.tracer.results()
+ results.write_results(show_missing=True, summary=False,
+ coverdir=config.coverdir().path)
+
+
+
+if __name__ == '__main__':
+ main()