diff options
author | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:16:14 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:43:30 +0300 |
commit | b8cf9e88f4c5c64d9406af533d8948deb050d695 (patch) | |
tree | 218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py2/twisted/trial/_dist/disttrial.py | |
parent | 523f645a83a0ec97a0332dbc3863bb354c92a328 (diff) | |
download | ydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz |
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py2/twisted/trial/_dist/disttrial.py')
-rw-r--r-- | contrib/python/Twisted/py2/twisted/trial/_dist/disttrial.py | 258 |
1 files changed, 258 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py2/twisted/trial/_dist/disttrial.py b/contrib/python/Twisted/py2/twisted/trial/_dist/disttrial.py new file mode 100644 index 0000000000..96875b4a85 --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/trial/_dist/disttrial.py @@ -0,0 +1,258 @@ +# -*- test-case-name: twisted.trial._dist.test.test_disttrial -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +This module contains the trial distributed runner, the management class +responsible for coordinating all of trial's behavior at the highest level. + +@since: 12.3 +""" + +import os +import sys + +from twisted.python.filepath import FilePath +from twisted.python.modules import theSystemPath +from twisted.internet.defer import DeferredList +from twisted.internet.task import cooperate + +from twisted.trial.util import _unusedTestDirectory +from twisted.trial._asyncrunner import _iterateTests +from twisted.trial._dist.worker import LocalWorker, LocalWorkerAMP +from twisted.trial._dist.distreporter import DistReporter +from twisted.trial.reporter import UncleanWarningsReporterWrapper +from twisted.trial._dist import _WORKER_AMP_STDIN, _WORKER_AMP_STDOUT + + + +class DistTrialRunner(object): + """ + A specialized runner for distributed trial. The runner launches a number of + local worker processes which will run tests. + + @ivar _workerNumber: the number of workers to be spawned. + @type _workerNumber: C{int} + + @ivar _stream: stream which the reporter will use. + + @ivar _reporterFactory: the reporter class to be used. + """ + _distReporterFactory = DistReporter + + def _makeResult(self): + """ + Make reporter factory, and wrap it with a L{DistReporter}. + """ + reporter = self._reporterFactory(self._stream, self._tbformat, + realtime=self._rterrors) + if self._uncleanWarnings: + reporter = UncleanWarningsReporterWrapper(reporter) + return self._distReporterFactory(reporter) + + + def __init__(self, reporterFactory, workerNumber, workerArguments, + stream=None, + tracebackFormat='default', + realTimeErrors=False, + uncleanWarnings=False, + logfile='test.log', + workingDirectory='_trial_temp'): + self._workerNumber = workerNumber + self._workerArguments = workerArguments + self._reporterFactory = reporterFactory + if stream is None: + stream = sys.stdout + self._stream = stream + self._tbformat = tracebackFormat + self._rterrors = realTimeErrors + self._uncleanWarnings = uncleanWarnings + self._result = None + self._workingDirectory = workingDirectory + self._logFile = logfile + self._logFileObserver = None + self._logFileObject = None + self._logWarnings = False + + + def writeResults(self, result): + """ + Write test run final outcome to result. + + @param result: A C{TestResult} which will print errors and the summary. + """ + result.done() + + + def createLocalWorkers(self, protocols, workingDirectory): + """ + Create local worker protocol instances and return them. + + @param protocols: An iterable of L{LocalWorkerAMP} instances. + + @param workingDirectory: The base path in which we should run the + workers. + @type workingDirectory: C{str} + + @return: A list of C{quantity} C{LocalWorker} instances. + """ + return [LocalWorker(protocol, + os.path.join(workingDirectory, str(x)), + self._logFile) + for x, protocol in enumerate(protocols)] + + + def launchWorkerProcesses(self, spawner, protocols, arguments): + """ + Spawn processes from a list of process protocols. + + @param spawner: A C{IReactorProcess.spawnProcess} implementation. + + @param protocols: An iterable of C{ProcessProtocol} instances. + + @param arguments: Extra arguments passed to the processes. + """ + workertrialPath = theSystemPath[ + 'twisted.trial._dist.workertrial'].filePath.path + childFDs = {0: 'w', 1: 'r', 2: 'r', _WORKER_AMP_STDIN: 'w', + _WORKER_AMP_STDOUT: 'r'} + environ = os.environ.copy() + # Add an environment variable containing the raw sys.path, to be used by + # subprocesses to make sure it's identical to the parent. See + # workertrial._setupPath. + environ['TRIAL_PYTHONPATH'] = os.pathsep.join(sys.path) + for worker in protocols: + args = [sys.executable, workertrialPath] + args.extend(arguments) + spawner(worker, sys.executable, args=args, childFDs=childFDs, + env=environ) + + + def _driveWorker(self, worker, result, testCases, cooperate): + """ + Drive a L{LocalWorkerAMP} instance, iterating the tests and calling + C{run} for every one of them. + + @param worker: The L{LocalWorkerAMP} to drive. + + @param result: The global L{DistReporter} instance. + + @param testCases: The global list of tests to iterate. + + @param cooperate: The cooperate function to use, to be customized in + tests. + @type cooperate: C{function} + + @return: A C{Deferred} firing when all the tests are finished. + """ + + def resultErrback(error, case): + result.original.addFailure(case, error) + return error + + def task(case): + d = worker.run(case, result) + d.addErrback(resultErrback, case) + return d + + return cooperate(task(case) for case in testCases).whenDone() + + + def run(self, suite, reactor=None, cooperate=cooperate, + untilFailure=False): + """ + Spawn local worker processes and load tests. After that, run them. + + @param suite: A tests suite to be run. + + @param reactor: The reactor to use, to be customized in tests. + @type reactor: A provider of + L{twisted.internet.interfaces.IReactorProcess} + + @param cooperate: The cooperate function to use, to be customized in + tests. + @type cooperate: C{function} + + @param untilFailure: If C{True}, continue to run the tests until they + fail. + @type untilFailure: C{bool}. + + @return: The test result. + @rtype: L{DistReporter} + """ + if reactor is None: + from twisted.internet import reactor + result = self._makeResult() + count = suite.countTestCases() + self._stream.write("Running %d tests.\n" % (count,)) + + if not count: + # Take a shortcut if there is no test + suite.run(result.original) + self.writeResults(result) + return result + + testDir, testDirLock = _unusedTestDirectory( + FilePath(self._workingDirectory)) + workerNumber = min(count, self._workerNumber) + ampWorkers = [LocalWorkerAMP() for x in range(workerNumber)] + workers = self.createLocalWorkers(ampWorkers, testDir.path) + processEndDeferreds = [worker.endDeferred for worker in workers] + self.launchWorkerProcesses(reactor.spawnProcess, workers, + self._workerArguments) + + def runTests(): + testCases = iter(list(_iterateTests(suite))) + + workerDeferreds = [] + for worker in ampWorkers: + workerDeferreds.append( + self._driveWorker(worker, result, testCases, + cooperate=cooperate)) + return DeferredList(workerDeferreds, consumeErrors=True, + fireOnOneErrback=True) + + stopping = [] + + def nextRun(ign): + self.writeResults(result) + if not untilFailure: + return + if not result.wasSuccessful(): + return + d = runTests() + return d.addCallback(nextRun) + + def stop(ign): + testDirLock.unlock() + if not stopping: + stopping.append(None) + reactor.stop() + + def beforeShutDown(): + if not stopping: + stopping.append(None) + d = DeferredList(processEndDeferreds, consumeErrors=True) + return d.addCallback(continueShutdown) + + def continueShutdown(ign): + self.writeResults(result) + return ign + + d = runTests() + d.addCallback(nextRun) + d.addBoth(stop) + + reactor.addSystemEventTrigger('before', 'shutdown', beforeShutDown) + reactor.run() + + return result + + + def runUntilFailure(self, suite): + """ + Run the tests with local worker processes until they fail. + + @param suite: A tests suite to be run. + """ + return self.run(suite, untilFailure=True) |