diff options
author | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:16:14 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:43:30 +0300 |
commit | b8cf9e88f4c5c64d9406af533d8948deb050d695 (patch) | |
tree | 218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py3/twisted/trial/_dist | |
parent | 523f645a83a0ec97a0332dbc3863bb354c92a328 (diff) | |
download | ydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz |
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/trial/_dist')
11 files changed, 1933 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py3/twisted/trial/_dist/__init__.py b/contrib/python/Twisted/py3/twisted/trial/_dist/__init__.py new file mode 100644 index 0000000000..502e840fef --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/trial/_dist/__init__.py @@ -0,0 +1,47 @@ +# -*- test-case-name: twisted.trial._dist.test -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +This package implements the distributed Trial test runner: + + - The L{twisted.trial._dist.disttrial} module implements a test runner which + runs in a manager process and can launch additional worker processes in + which to run tests and gather up results from all of them. + + - The L{twisted.trial._dist.options} module defines command line options used + to configure the distributed test runner. + + - The L{twisted.trial._dist.managercommands} module defines AMP commands + which are sent from worker processes back to the manager process to report + the results of tests. + + - The L{twisted.trial._dist.workercommands} module defines AMP commands which + are sent from the manager process to the worker processes to control the + execution of tests there. + + - The L{twisted.trial._dist.distreporter} module defines a proxy for + L{twisted.trial.itrial.IReporter} which enforces the typical requirement + that results be passed to a reporter for only one test at a time, allowing + any reporter to be used with despite disttrial's simultaneously running + tests. + + - The L{twisted.trial._dist.workerreporter} module implements a + L{twisted.trial.itrial.IReporter} which is used by worker processes and + reports results back to the manager process using AMP commands. + + - The L{twisted.trial._dist.workertrial} module is a runnable script which is + the main point for worker processes. + + - The L{twisted.trial._dist.worker} process defines the manager's AMP + protocol for accepting results from worker processes and a process protocol + for use running workers as local child processes (as opposed to + distributing them to another host). + +@since: 12.3 +""" + +# File descriptors numbers used to set up pipes with the worker. +_WORKER_AMP_STDIN = 3 + +_WORKER_AMP_STDOUT = 4 diff --git a/contrib/python/Twisted/py3/twisted/trial/_dist/distreporter.py b/contrib/python/Twisted/py3/twisted/trial/_dist/distreporter.py new file mode 100644 index 0000000000..3a45cc4806 --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/trial/_dist/distreporter.py @@ -0,0 +1,90 @@ +# -*- test-case-name: twisted.trial._dist.test.test_distreporter -*- +# +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +The reporter is not made to support concurrent test running, so we will +hold test results in here and only send them to the reporter once the +test is over. + +@since: 12.3 +""" + +from types import TracebackType +from typing import Optional, Tuple, Union + +from zope.interface import implementer + +from twisted.python.components import proxyForInterface +from twisted.python.failure import Failure +from ..itrial import IReporter, ITestCase + +ReporterFailure = Union[Failure, Tuple[type, Exception, TracebackType]] + + +@implementer(IReporter) +class DistReporter(proxyForInterface(IReporter)): # type: ignore[misc] + """ + See module docstring. + """ + + def __init__(self, original): + super().__init__(original) + self.running = {} + + def startTest(self, test): + """ + Queue test starting. + """ + self.running[test.id()] = [] + self.running[test.id()].append((self.original.startTest, test)) + + def addFailure(self, test: ITestCase, fail: ReporterFailure) -> None: + """ + Queue adding a failure. + """ + self.running[test.id()].append((self.original.addFailure, test, fail)) + + def addError(self, test: ITestCase, error: ReporterFailure) -> None: + """ + Queue error adding. + """ + self.running[test.id()].append((self.original.addError, test, error)) + + def addSkip(self, test, reason): + """ + Queue adding a skip. + """ + self.running[test.id()].append((self.original.addSkip, test, reason)) + + def addUnexpectedSuccess(self, test, todo=None): + """ + Queue adding an unexpected success. + """ + self.running[test.id()].append((self.original.addUnexpectedSuccess, test, todo)) + + def addExpectedFailure( + self, test: ITestCase, error: ReporterFailure, todo: Optional[str] = None + ) -> None: + """ + Queue adding an expected failure. + """ + self.running[test.id()].append( + (self.original.addExpectedFailure, test, error, todo) + ) + + def addSuccess(self, test): + """ + Queue adding a success. + """ + self.running[test.id()].append((self.original.addSuccess, test)) + + def stopTest(self, test): + """ + Queue stopping the test, then unroll the queue. + """ + self.running[test.id()].append((self.original.stopTest, test)) + for step in self.running[test.id()]: + step[0](*step[1:]) + del self.running[test.id()] diff --git a/contrib/python/Twisted/py3/twisted/trial/_dist/disttrial.py b/contrib/python/Twisted/py3/twisted/trial/_dist/disttrial.py new file mode 100644 index 0000000000..bad82a8814 --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/trial/_dist/disttrial.py @@ -0,0 +1,512 @@ +# -*- test-case-name: twisted.trial._dist.test.test_disttrial -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +This module contains the trial distributed runner, the management class +responsible for coordinating all of trial's behavior at the highest level. + +@since: 12.3 +""" + +import os +import sys +from functools import partial +from os.path import isabs +from typing import ( + Any, + Awaitable, + Callable, + Iterable, + List, + Optional, + Sequence, + TextIO, + Union, + cast, +) +from unittest import TestCase, TestSuite + +from attrs import define, field, frozen +from attrs.converters import default_if_none + +from twisted.internet.defer import Deferred, DeferredList, gatherResults +from twisted.internet.interfaces import IReactorCore, IReactorProcess +from twisted.logger import Logger +from twisted.python.failure import Failure +from twisted.python.filepath import FilePath +from twisted.python.lockfile import FilesystemLock +from twisted.python.modules import theSystemPath +from .._asyncrunner import _iterateTests +from ..itrial import IReporter, ITestCase +from ..reporter import UncleanWarningsReporterWrapper +from ..runner import TestHolder +from ..util import _unusedTestDirectory, openTestLog +from . import _WORKER_AMP_STDIN, _WORKER_AMP_STDOUT +from .distreporter import DistReporter +from .functional import countingCalls, discardResult, iterateWhile, takeWhile +from .worker import LocalWorker, LocalWorkerAMP, WorkerAction + + +class IDistTrialReactor(IReactorCore, IReactorProcess): + """ + The reactor interfaces required by disttrial. + """ + + +def _defaultReactor() -> IDistTrialReactor: + """ + Get the default reactor, ensuring it is suitable for use with disttrial. + """ + import twisted.internet.reactor as defaultReactor + + if all( + [ + IReactorCore.providedBy(defaultReactor), + IReactorProcess.providedBy(defaultReactor), + ] + ): + # If it provides each of the interfaces then it provides the + # intersection interface. cast it to make it easier to talk about + # later on. + return cast(IDistTrialReactor, defaultReactor) + + raise TypeError("Reactor does not provide the right interfaces") + + +@frozen +class WorkerPoolConfig: + """ + Configuration parameters for a pool of test-running workers. + + @ivar numWorkers: The number of workers in the pool. + + @ivar workingDirectory: A directory in which working directories for each + of the workers will be created. + + @ivar workerArguments: Extra arguments to pass the worker process in its + argv. + + @ivar logFile: The basename of the overall test log file. + """ + + numWorkers: int + workingDirectory: FilePath[Any] + workerArguments: Sequence[str] + logFile: str + + +@define +class StartedWorkerPool: + """ + A pool of workers which have already been started. + + @ivar workingDirectory: A directory holding the working directories for + each of the workers. + + @ivar testDirLock: An object representing the cooperative lock this pool + holds on its working directory. + + @ivar testLog: The open overall test log file. + + @ivar workers: Objects corresponding to the worker child processes and + adapting between process-related interfaces and C{IProtocol}. + + @ivar ampWorkers: AMP protocol instances corresponding to the worker child + processes. + """ + + workingDirectory: FilePath[Any] + testDirLock: FilesystemLock + testLog: TextIO + workers: List[LocalWorker] + ampWorkers: List[LocalWorkerAMP] + + _logger = Logger() + + async def run(self, workerAction: WorkerAction[Any]) -> None: + """ + Run an action on all of the workers in the pool. + """ + await gatherResults( + discardResult(workerAction(worker)) for worker in self.ampWorkers + ) + return None + + async def join(self) -> None: + """ + Shut down all of the workers in the pool. + + The pool is unusable after this method is called. + """ + results = await DeferredList( + [Deferred.fromCoroutine(worker.exit()) for worker in self.workers], + consumeErrors=True, + ) + for n, (succeeded, failure) in enumerate(results): + if not succeeded: + self._logger.failure(f"joining disttrial worker #{n} failed", failure) + + del self.workers[:] + del self.ampWorkers[:] + self.testLog.close() + self.testDirLock.unlock() + + +@frozen +class WorkerPool: + """ + Manage a fixed-size collection of child processes which can run tests. + + @ivar _config: Configuration for the precise way in which the pool is run. + """ + + _config: WorkerPoolConfig + + def _createLocalWorkers( + self, + protocols: Iterable[LocalWorkerAMP], + workingDirectory: FilePath[Any], + logFile: TextIO, + ) -> List[LocalWorker]: + """ + Create local worker protocol instances and return them. + + @param protocols: The process/protocol adapters to use for the created + workers. + + @param workingDirectory: The base path in which we should run the + workers. + + @param logFile: The test log, for workers to write to. + + @return: A list of C{quantity} C{LocalWorker} instances. + """ + return [ + LocalWorker(protocol, workingDirectory.child(str(x)), logFile) + for x, protocol in enumerate(protocols) + ] + + def _launchWorkerProcesses(self, spawner, protocols, arguments): + """ + Spawn processes from a list of process protocols. + + @param spawner: A C{IReactorProcess.spawnProcess} implementation. + + @param protocols: An iterable of C{ProcessProtocol} instances. + + @param arguments: Extra arguments passed to the processes. + """ + workertrialPath = theSystemPath["twisted.trial._dist.workertrial"].filePath.path + childFDs = { + 0: "w", + 1: "r", + 2: "r", + _WORKER_AMP_STDIN: "w", + _WORKER_AMP_STDOUT: "r", + } + environ = os.environ.copy() + # Add an environment variable containing the raw sys.path, to be used + # by subprocesses to try to make it identical to the parent's. + environ["PYTHONPATH"] = os.pathsep.join(sys.path) + for worker in protocols: + args = [sys.executable, workertrialPath] + args.extend(arguments) + spawner(worker, sys.executable, args=args, childFDs=childFDs, env=environ) + + async def start(self, reactor: IReactorProcess) -> StartedWorkerPool: + """ + Launch all of the workers for this pool. + + @return: A started pool object that can run jobs using the workers. + """ + testDir, testDirLock = _unusedTestDirectory( + self._config.workingDirectory, + ) + + if isabs(self._config.logFile): + # Open a log file wherever the user asked. + testLogPath = FilePath(self._config.logFile) + else: + # Open a log file in the chosen working directory (not necessarily + # the same as our configured working directory, if that path was + # in use). + testLogPath = testDir.preauthChild(self._config.logFile) + testLog = openTestLog(testLogPath) + + ampWorkers = [LocalWorkerAMP() for x in range(self._config.numWorkers)] + workers = self._createLocalWorkers( + ampWorkers, + testDir, + testLog, + ) + self._launchWorkerProcesses( + reactor.spawnProcess, + workers, + self._config.workerArguments, + ) + + return StartedWorkerPool( + testDir, + testDirLock, + testLog, + workers, + ampWorkers, + ) + + +def shouldContinue(untilFailure: bool, result: IReporter) -> bool: + """ + Determine whether the test suite should be iterated again. + + @param untilFailure: C{True} if the suite is supposed to run until + failure. + + @param result: The test result of the test suite iteration which just + completed. + """ + return untilFailure and result.wasSuccessful() + + +async def runTests( + pool: StartedWorkerPool, + testCases: Iterable[ITestCase], + result: DistReporter, + driveWorker: Callable[ + [DistReporter, Sequence[ITestCase], LocalWorkerAMP], Awaitable[None] + ], +) -> None: + try: + # Run the tests using the worker pool. + await pool.run(partial(driveWorker, result, testCases)) + except Exception: + # Exceptions from test code are handled somewhere else. An + # exception here is a bug in the runner itself. The only + # convenient place to put it is in the result, though. + result.original.addError(TestHolder("<runTests>"), Failure()) + + +@define +class DistTrialRunner: + """ + A specialized runner for distributed trial. The runner launches a number of + local worker processes which will run tests. + + @ivar _maxWorkers: the number of workers to be spawned. + + @ivar _exitFirst: ``True`` to stop the run as soon as a test case fails. + ``False`` to run through the whole suite and report all of the results + at the end. + + @ivar stream: stream which the reporter will use. + + @ivar _reporterFactory: the reporter class to be used. + """ + + _distReporterFactory = DistReporter + _logger = Logger() + + # accepts a `realtime` keyword argument which we can't annotate, so punt + # on the argument annotation + _reporterFactory: Callable[..., IReporter] + _maxWorkers: int + _workerArguments: List[str] + _exitFirst: bool = False + _reactor: IDistTrialReactor = field( + # mypy doesn't understand the converter + default=None, + converter=default_if_none(factory=_defaultReactor), # type: ignore [misc] + ) + # mypy doesn't understand the converter + stream: TextIO = field(default=None, converter=default_if_none(sys.stdout)) # type: ignore [misc] + + _tracebackFormat: str = "default" + _realTimeErrors: bool = False + _uncleanWarnings: bool = False + _logfile: str = "test.log" + _workingDirectory: str = "_trial_temp" + _workerPoolFactory: Callable[[WorkerPoolConfig], WorkerPool] = WorkerPool + + def _makeResult(self) -> DistReporter: + """ + Make reporter factory, and wrap it with a L{DistReporter}. + """ + reporter = self._reporterFactory( + self.stream, self._tracebackFormat, realtime=self._realTimeErrors + ) + if self._uncleanWarnings: + reporter = UncleanWarningsReporterWrapper(reporter) + return self._distReporterFactory(reporter) + + def writeResults(self, result): + """ + Write test run final outcome to result. + + @param result: A C{TestResult} which will print errors and the summary. + """ + result.done() + + async def _driveWorker( + self, + result: DistReporter, + testCases: Sequence[ITestCase], + worker: LocalWorkerAMP, + ) -> None: + """ + Drive a L{LocalWorkerAMP} instance, iterating the tests and calling + C{run} for every one of them. + + @param worker: The L{LocalWorkerAMP} to drive. + + @param result: The global L{DistReporter} instance. + + @param testCases: The global list of tests to iterate. + + @return: A coroutine that completes after all of the tests have + completed. + """ + + async def task(case): + try: + await worker.run(case, result) + except Exception: + result.original.addError(case, Failure()) + + for case in testCases: + await task(case) + + async def runAsync( + self, + suite: Union[TestCase, TestSuite], + untilFailure: bool = False, + ) -> DistReporter: + """ + Spawn local worker processes and load tests. After that, run them. + + @param suite: A test or suite to be run. + + @param untilFailure: If C{True}, continue to run the tests until they + fail. + + @return: A coroutine that completes with the test result. + """ + + # Realize a concrete set of tests to run. + testCases = list(_iterateTests(suite)) + + # Create a worker pool to use to execute them. + poolStarter = self._workerPoolFactory( + WorkerPoolConfig( + # Don't make it larger than is useful or allowed. + min(len(testCases), self._maxWorkers), + FilePath(self._workingDirectory), + self._workerArguments, + self._logfile, + ), + ) + + # Announce that we're beginning. countTestCases result is preferred + # (over len(testCases)) because testCases may contain synthetic cases + # for error reporting purposes. + self.stream.write(f"Running {suite.countTestCases()} tests.\n") + + # Start the worker pool. + startedPool = await poolStarter.start(self._reactor) + + # The condition that will determine whether the test run repeats. + condition = partial(shouldContinue, untilFailure) + + # A function that will run the whole suite once. + @countingCalls + async def runAndReport(n: int) -> DistReporter: + if untilFailure: + # If and only if we're running the suite more than once, + # provide a report about which run this is. + self.stream.write(f"Test Pass {n + 1}\n") + + result = self._makeResult() + + if self._exitFirst: + # Keep giving out tests as long as the result object has only + # seen success. + casesCondition = lambda _: result.original.wasSuccessful() + else: + casesCondition = lambda _: True + + await runTests( + startedPool, + takeWhile(casesCondition, testCases), + result, + self._driveWorker, + ) + self.writeResults(result) + return result + + try: + # Start submitting tests to workers in the pool. Perhaps repeat + # the whole test suite more than once, if appropriate for our + # configuration. + return await iterateWhile(condition, runAndReport) + finally: + # Shut down the worker pool. + await startedPool.join() + + def _run(self, test: Union[TestCase, TestSuite], untilFailure: bool) -> IReporter: + result: Union[Failure, DistReporter, None] = None + reactorStopping: bool = False + testsInProgress: Deferred[object] + + def capture(r: Union[Failure, DistReporter]) -> None: + nonlocal result + result = r + + def maybeStopTests() -> Optional[Deferred[object]]: + nonlocal reactorStopping + reactorStopping = True + if result is None: + testsInProgress.cancel() + return testsInProgress + return None + + def maybeStopReactor(result: object) -> object: + if not reactorStopping: + self._reactor.stop() + return result + + self._reactor.addSystemEventTrigger("before", "shutdown", maybeStopTests) + + testsInProgress = ( + Deferred.fromCoroutine(self.runAsync(test, untilFailure)) + .addBoth(capture) + .addBoth(maybeStopReactor) + ) + + self._reactor.run() + + if isinstance(result, Failure): + result.raiseException() + + # mypy can't see that raiseException raises an exception so we can + # only get here if result is not a Failure, so tell mypy result is + # certainly a DistReporter at this point. + assert isinstance(result, DistReporter), f"{result} is not DistReporter" + + # Unwrap the DistReporter to give the caller some regular IReporter + # object. DistReporter isn't type annotated correctly so fix it here. + return cast(IReporter, result.original) + + def run(self, test: Union[TestCase, TestSuite]) -> IReporter: + """ + Run a reactor and a test suite. + + @param test: The test or suite to run. + """ + return self._run(test, untilFailure=False) + + def runUntilFailure(self, test: Union[TestCase, TestSuite]) -> IReporter: + """ + Run the tests with local worker processes until they fail. + + @param test: The test or suite to run. + """ + return self._run(test, untilFailure=True) diff --git a/contrib/python/Twisted/py3/twisted/trial/_dist/functional.py b/contrib/python/Twisted/py3/twisted/trial/_dist/functional.py new file mode 100644 index 0000000000..3db4dca5de --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/trial/_dist/functional.py @@ -0,0 +1,125 @@ +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +General functional-style helpers for disttrial. +""" + +from functools import partial, wraps +from typing import Awaitable, Callable, Iterable, Optional, TypeVar + +from twisted.internet.defer import Deferred, succeed + +_A = TypeVar("_A") +_B = TypeVar("_B") +_C = TypeVar("_C") + + +def fromOptional(default: _A, optional: Optional[_A]) -> _A: + """ + Get a definite value from an optional value. + + @param default: The value to return if the optional value is missing. + + @param optional: The optional value to return if it exists. + """ + if optional is None: + return default + return optional + + +def takeWhile(condition: Callable[[_A], bool], xs: Iterable[_A]) -> Iterable[_A]: + """ + :return: An iterable over C{xs} that stops when C{condition} returns + ``False`` based on the value of iterated C{xs}. + """ + for x in xs: + if condition(x): + yield x + else: + break + + +async def sequence(a: Awaitable[_A], b: Awaitable[_B]) -> _B: + """ + Wait for one action to complete and then another. + + If either action fails, failure is propagated. If the first action fails, + the second action is not waited on. + """ + await a + return await b + + +def flip(f: Callable[[_A, _B], _C]) -> Callable[[_B, _A], _C]: + """ + Create a function like another but with the order of the first two + arguments flipped. + """ + + @wraps(f) + def g(b, a): + return f(a, b) + + return g + + +def compose(fx: Callable[[_B], _C], fy: Callable[[_A], _B]) -> Callable[[_A], _C]: + """ + Create a function that calls one function with an argument and then + another function with the result of the first function. + """ + + @wraps(fx) + @wraps(fy) + def g(a): + return fx(fy(a)) + + return g + + +# Discard the result of an awaitable and substitute None in its place. +# +# Ignore the `Cannot infer type argument 1 of "compose"` +# https://github.com/python/mypy/issues/6220 +discardResult: Callable[[Awaitable[_A]], Deferred[None]] = compose( # type: ignore[misc] + Deferred.fromCoroutine, + partial(flip(sequence), succeed(None)), +) + + +async def iterateWhile( + predicate: Callable[[_A], bool], + action: Callable[[], Awaitable[_A]], +) -> _A: + """ + Call a function repeatedly until its result fails to satisfy a predicate. + + @param predicate: The check to apply. + + @param action: The function to call. + + @return: The result of C{action} which did not satisfy C{predicate}. + """ + while True: + result = await action() + if not predicate(result): + return result + + +def countingCalls(f: Callable[[int], _A]) -> Callable[[], _A]: + """ + Wrap a function with another that automatically passes an integer counter + of the number of calls that have gone through the wrapper. + """ + counter = 0 + + def g() -> _A: + nonlocal counter + try: + result = f(counter) + finally: + counter += 1 + return result + + return g diff --git a/contrib/python/Twisted/py3/twisted/trial/_dist/managercommands.py b/contrib/python/Twisted/py3/twisted/trial/_dist/managercommands.py new file mode 100644 index 0000000000..4f3080a24f --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/trial/_dist/managercommands.py @@ -0,0 +1,89 @@ +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Commands for reporting test success of failure to the manager. + +@since: 12.3 +""" + +from twisted.protocols.amp import Boolean, Command, Integer, Unicode + +NativeString = Unicode + + +class AddSuccess(Command): + """ + Add a success. + """ + + arguments = [(b"testName", NativeString())] + response = [(b"success", Boolean())] + + +class AddError(Command): + """ + Add an error. + """ + + arguments = [ + (b"testName", NativeString()), + (b"errorClass", NativeString()), + (b"errorStreamId", Integer()), + (b"framesStreamId", Integer()), + ] + response = [(b"success", Boolean())] + + +class AddFailure(Command): + """ + Add a failure. + """ + + arguments = [ + (b"testName", NativeString()), + (b"failStreamId", Integer()), + (b"failClass", NativeString()), + (b"framesStreamId", Integer()), + ] + response = [(b"success", Boolean())] + + +class AddSkip(Command): + """ + Add a skip. + """ + + arguments = [(b"testName", NativeString()), (b"reason", NativeString())] + response = [(b"success", Boolean())] + + +class AddExpectedFailure(Command): + """ + Add an expected failure. + """ + + arguments = [ + (b"testName", NativeString()), + (b"errorStreamId", Integer()), + (b"todo", NativeString()), + ] + response = [(b"success", Boolean())] + + +class AddUnexpectedSuccess(Command): + """ + Add an unexpected success. + """ + + arguments = [(b"testName", NativeString()), (b"todo", NativeString())] + response = [(b"success", Boolean())] + + +class TestWrite(Command): + """ + Write test log. + """ + + arguments = [(b"out", NativeString())] + response = [(b"success", Boolean())] diff --git a/contrib/python/Twisted/py3/twisted/trial/_dist/options.py b/contrib/python/Twisted/py3/twisted/trial/_dist/options.py new file mode 100644 index 0000000000..19f9a5f6cf --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/trial/_dist/options.py @@ -0,0 +1,28 @@ +# -*- test-case-name: twisted.trial._dist.test.test_options -*- +# +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Options handling specific to trial's workers. + +@since: 12.3 +""" + +from twisted.application.app import ReactorSelectionMixin +from twisted.python.filepath import FilePath +from twisted.python.usage import Options +from twisted.scripts.trial import _BasicOptions + + +class WorkerOptions(_BasicOptions, Options, ReactorSelectionMixin): + """ + Options forwarded to the trial distributed worker. + """ + + def coverdir(self): + """ + Return a L{FilePath} representing the directory into which coverage + results should be written. + """ + return FilePath("coverage") diff --git a/contrib/python/Twisted/py3/twisted/trial/_dist/stream.py b/contrib/python/Twisted/py3/twisted/trial/_dist/stream.py new file mode 100644 index 0000000000..a53fd4ab21 --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/trial/_dist/stream.py @@ -0,0 +1,100 @@ +""" +Buffer byte streams. +""" + +from itertools import count +from typing import Dict, Iterator, List, TypeVar + +from attrs import Factory, define + +from twisted.protocols.amp import AMP, Command, Integer, String as Bytes + +T = TypeVar("T") + + +class StreamOpen(Command): + """ + Open a new stream. + """ + + response = [(b"streamId", Integer())] + + +class StreamWrite(Command): + """ + Write a chunk of data to a stream. + """ + + arguments = [ + (b"streamId", Integer()), + (b"data", Bytes()), + ] + + +@define +class StreamReceiver: + """ + Buffering de-multiplexing byte stream receiver. + """ + + _counter: Iterator[int] = count() + _streams: Dict[int, List[bytes]] = Factory(dict) + + def open(self) -> int: + """ + Open a new stream and return its unique identifier. + """ + newId = next(self._counter) + self._streams[newId] = [] + return newId + + def write(self, streamId: int, chunk: bytes) -> None: + """ + Write to an open stream using its unique identifier. + + @raise KeyError: If there is no such open stream. + """ + self._streams[streamId].append(chunk) + + def finish(self, streamId: int) -> List[bytes]: + """ + Indicate an open stream may receive no further data and return all of + its current contents. + + @raise KeyError: If there is no such open stream. + """ + return self._streams.pop(streamId) + + +def chunk(data: bytes, chunkSize: int) -> Iterator[bytes]: + """ + Break a byte string into pieces of no more than ``chunkSize`` length. + + @param data: The byte string. + + @param chunkSize: The maximum length of the resulting pieces. All pieces + except possibly the last will be this length. + + @return: The pieces. + """ + pos = 0 + while pos < len(data): + yield data[pos : pos + chunkSize] + pos += chunkSize + + +async def stream(amp: AMP, chunks: Iterator[bytes]) -> int: + """ + Send the given stream chunks, one by one, over the given connection. + + The chunks are sent using L{StreamWrite} over a stream opened using + L{StreamOpen}. + + @return: The identifier of the stream over which the chunks were sent. + """ + streamId = (await amp.callRemote(StreamOpen))["streamId"] + assert isinstance(streamId, int) + + for oneChunk in chunks: + await amp.callRemote(StreamWrite, streamId=streamId, data=oneChunk) + return streamId diff --git a/contrib/python/Twisted/py3/twisted/trial/_dist/worker.py b/contrib/python/Twisted/py3/twisted/trial/_dist/worker.py new file mode 100644 index 0000000000..77e502173a --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/trial/_dist/worker.py @@ -0,0 +1,465 @@ +# -*- test-case-name: twisted.trial._dist.test.test_worker -*- +# +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +This module implements the worker classes. + +@since: 12.3 +""" + +import os +from typing import Any, Awaitable, Callable, Dict, List, Optional, TextIO, TypeVar +from unittest import TestCase + +from zope.interface import implementer + +from attrs import frozen +from typing_extensions import Protocol, TypedDict + +from twisted.internet.defer import Deferred, DeferredList +from twisted.internet.error import ProcessDone +from twisted.internet.interfaces import IAddress, ITransport +from twisted.internet.protocol import ProcessProtocol +from twisted.logger import Logger +from twisted.protocols.amp import AMP +from twisted.python.failure import Failure +from twisted.python.filepath import FilePath +from twisted.python.reflect import namedObject +from twisted.trial._dist import ( + _WORKER_AMP_STDIN, + _WORKER_AMP_STDOUT, + managercommands, + workercommands, +) +from twisted.trial._dist.workerreporter import WorkerReporter +from twisted.trial.reporter import TestResult +from twisted.trial.runner import TestLoader, TrialSuite +from twisted.trial.unittest import Todo +from .stream import StreamOpen, StreamReceiver, StreamWrite + + +@frozen(auto_exc=False) +class WorkerException(Exception): + """ + An exception was reported by a test running in a worker process. + + @ivar message: An error message describing the exception. + """ + + message: str + + +class RunResult(TypedDict): + """ + Represent the result of a L{workercommands.Run} command. + """ + + success: bool + + +class Worker(Protocol): + """ + An object that can run actions. + """ + + async def run(self, case: TestCase, result: TestResult) -> RunResult: + """ + Run a test case. + """ + + +_T = TypeVar("_T") +WorkerAction = Callable[[Worker], Awaitable[_T]] + + +class WorkerProtocol(AMP): + """ + The worker-side trial distributed protocol. + """ + + logger = Logger() + + def __init__(self, forceGarbageCollection=False): + self._loader = TestLoader() + self._result = WorkerReporter(self) + self._forceGarbageCollection = forceGarbageCollection + + @workercommands.Run.responder + async def run(self, testCase: str) -> RunResult: + """ + Run a test case by name. + """ + with self._result.gatherReportingResults() as results: + case = self._loader.loadByName(testCase) + suite = TrialSuite([case], self._forceGarbageCollection) + suite.run(self._result) + + allSucceeded = True + for success, result in await DeferredList(results, consumeErrors=True): + if success: + # Nothing to do here, proceed to the next result. + continue + + # There was some error reporting a result to the peer. + allSucceeded = False + + # We can try to report the error but since something has already + # gone wrong we shouldn't be extremely confident that this will + # succeed. So we will also log it (and any errors reporting *it*) + # to our local log. + self.logger.failure( + "Result reporting for {id} failed", + # The DeferredList type annotation assumes all results succeed + failure=result, # type: ignore[arg-type] + id=testCase, + ) + try: + await self._result.addErrorFallible( + testCase, + # The DeferredList type annotation assumes all results succeed + result, # type: ignore[arg-type] + ) + except BaseException: + # We failed to report the failure to the peer. It doesn't + # seem very likely that reporting this new failure to the peer + # will succeed so just log it locally. + self.logger.failure( + "Additionally, reporting the reporting failure failed." + ) + + return {"success": allSucceeded} + + @workercommands.Start.responder + def start(self, directory): + """ + Set up the worker, moving into given directory for tests to run in + them. + """ + os.chdir(directory) + return {"success": True} + + +class LocalWorkerAMP(AMP): + """ + Local implementation of the manager commands. + """ + + def __init__(self, boxReceiver=None, locator=None): + super().__init__(boxReceiver, locator) + self._streams = StreamReceiver() + + @StreamOpen.responder + def streamOpen(self): + return {"streamId": self._streams.open()} + + @StreamWrite.responder + def streamWrite(self, streamId, data): + self._streams.write(streamId, data) + return {} + + @managercommands.AddSuccess.responder + def addSuccess(self, testName): + """ + Add a success to the reporter. + """ + self._result.addSuccess(self._testCase) + return {"success": True} + + def _buildFailure( + self, + error: WorkerException, + errorClass: str, + frames: List[str], + ) -> Failure: + """ + Helper to build a C{Failure} with some traceback. + + @param error: An C{Exception} instance. + + @param errorClass: The class name of the C{error} class. + + @param frames: A flat list of strings representing the information need + to approximatively rebuild C{Failure} frames. + + @return: A L{Failure} instance with enough information about a test + error. + """ + errorType = namedObject(errorClass) + failure = Failure(error, errorType) + for i in range(0, len(frames), 3): + failure.frames.append( + (frames[i], frames[i + 1], int(frames[i + 2]), [], []) + ) + return failure + + @managercommands.AddError.responder + def addError( + self, + testName: str, + errorClass: str, + errorStreamId: int, + framesStreamId: int, + ) -> Dict[str, bool]: + """ + Add an error to the reporter. + + @param errorStreamId: The identifier of a stream over which the text + of this error was previously completely sent to the peer. + + @param framesStreamId: The identifier of a stream over which the lines + of the traceback for this error were previously completely sent to + the peer. + + @param error: A message describing the error. + """ + error = b"".join(self._streams.finish(errorStreamId)).decode("utf-8") + frames = [ + frame.decode("utf-8") for frame in self._streams.finish(framesStreamId) + ] + # Wrap the error message in ``WorkerException`` because it is not + # possible to transfer arbitrary exception values over the AMP + # connection to the main process but we must give *some* Exception + # (not a str) to the test result object. + failure = self._buildFailure(WorkerException(error), errorClass, frames) + self._result.addError(self._testCase, failure) + return {"success": True} + + @managercommands.AddFailure.responder + def addFailure( + self, + testName: str, + failStreamId: int, + failClass: str, + framesStreamId: int, + ) -> Dict[str, bool]: + """ + Add a failure to the reporter. + + @param failStreamId: The identifier of a stream over which the text of + this failure was previously completely sent to the peer. + + @param framesStreamId: The identifier of a stream over which the lines + of the traceback for this error were previously completely sent to the + peer. + """ + fail = b"".join(self._streams.finish(failStreamId)).decode("utf-8") + frames = [ + frame.decode("utf-8") for frame in self._streams.finish(framesStreamId) + ] + # See addError for info about use of WorkerException here. + failure = self._buildFailure(WorkerException(fail), failClass, frames) + self._result.addFailure(self._testCase, failure) + return {"success": True} + + @managercommands.AddSkip.responder + def addSkip(self, testName, reason): + """ + Add a skip to the reporter. + """ + self._result.addSkip(self._testCase, reason) + return {"success": True} + + @managercommands.AddExpectedFailure.responder + def addExpectedFailure( + self, testName: str, errorStreamId: int, todo: Optional[str] + ) -> Dict[str, bool]: + """ + Add an expected failure to the reporter. + + @param errorStreamId: The identifier of a stream over which the text + of this error was previously completely sent to the peer. + """ + error = b"".join(self._streams.finish(errorStreamId)).decode("utf-8") + _todo = Todo("<unknown>" if todo is None else todo) + self._result.addExpectedFailure(self._testCase, error, _todo) + return {"success": True} + + @managercommands.AddUnexpectedSuccess.responder + def addUnexpectedSuccess(self, testName, todo): + """ + Add an unexpected success to the reporter. + """ + self._result.addUnexpectedSuccess(self._testCase, todo) + return {"success": True} + + @managercommands.TestWrite.responder + def testWrite(self, out): + """ + Print test output from the worker. + """ + self._testStream.write(out + "\n") + self._testStream.flush() + return {"success": True} + + async def run(self, testCase: TestCase, result: TestResult) -> RunResult: + """ + Run a test. + """ + self._testCase = testCase + self._result = result + self._result.startTest(testCase) + testCaseId = testCase.id() + try: + return await self.callRemote(workercommands.Run, testCase=testCaseId) # type: ignore[no-any-return] + finally: + self._result.stopTest(testCase) + + def setTestStream(self, stream): + """ + Set the stream used to log output from tests. + """ + self._testStream = stream + + +@implementer(IAddress) +class LocalWorkerAddress: + """ + A L{IAddress} implementation meant to provide stub addresses for + L{ITransport.getPeer} and L{ITransport.getHost}. + """ + + +@implementer(ITransport) +class LocalWorkerTransport: + """ + A stub transport implementation used to support L{AMP} over a + L{ProcessProtocol} transport. + """ + + def __init__(self, transport): + self._transport = transport + + def write(self, data): + """ + Forward data to transport. + """ + self._transport.writeToChild(_WORKER_AMP_STDIN, data) + + def writeSequence(self, sequence): + """ + Emulate C{writeSequence} by iterating data in the C{sequence}. + """ + for data in sequence: + self._transport.writeToChild(_WORKER_AMP_STDIN, data) + + def loseConnection(self): + """ + Closes the transport. + """ + self._transport.loseConnection() + + def getHost(self): + """ + Return a L{LocalWorkerAddress} instance. + """ + return LocalWorkerAddress() + + def getPeer(self): + """ + Return a L{LocalWorkerAddress} instance. + """ + return LocalWorkerAddress() + + +class NotRunning(Exception): + """ + An operation was attempted on a worker process which is not running. + """ + + +class LocalWorker(ProcessProtocol): + """ + Local process worker protocol. This worker runs as a local process and + communicates via stdin/out. + + @ivar _ampProtocol: The L{AMP} protocol instance used to communicate with + the worker. + + @ivar _logDirectory: The directory where logs will reside. + + @ivar _logFile: The main log file for tests output. + """ + + def __init__( + self, + ampProtocol: LocalWorkerAMP, + logDirectory: FilePath[Any], + logFile: TextIO, + ): + self._ampProtocol = ampProtocol + self._logDirectory = logDirectory + self._logFile = logFile + self.endDeferred: Deferred[None] = Deferred() + + async def exit(self) -> None: + """ + Cause the worker process to exit. + """ + if self.transport is None: + raise NotRunning() + + endDeferred = self.endDeferred + self.transport.closeChildFD(_WORKER_AMP_STDIN) + try: + await endDeferred + except ProcessDone: + pass + + def connectionMade(self): + """ + When connection is made, create the AMP protocol instance. + """ + self._ampProtocol.makeConnection(LocalWorkerTransport(self.transport)) + self._logDirectory.makedirs(ignoreExistingDirectory=True) + self._outLog = self._logDirectory.child("out.log").open("w") + self._errLog = self._logDirectory.child("err.log").open("w") + self._ampProtocol.setTestStream(self._logFile) + d = self._ampProtocol.callRemote( + workercommands.Start, + directory=self._logDirectory.path, + ) + # Ignore the potential errors, the test suite will fail properly and it + # would just print garbage. + d.addErrback(lambda x: None) + + def connectionLost(self, reason): + """ + On connection lost, close the log files that we're managing for stdin + and stdout. + """ + self._outLog.close() + self._errLog.close() + self.transport = None + + def processEnded(self, reason: Failure) -> None: + """ + When the process closes, call C{connectionLost} for cleanup purposes + and forward the information to the C{_ampProtocol}. + """ + self.connectionLost(reason) + self._ampProtocol.connectionLost(reason) + self.endDeferred.callback(reason) + + def outReceived(self, data): + """ + Send data received from stdout to log. + """ + + self._outLog.write(data) + + def errReceived(self, data): + """ + Write error data to log. + """ + self._errLog.write(data) + + def childDataReceived(self, childFD, data): + """ + Handle data received on the specific pipe for the C{_ampProtocol}. + """ + if childFD == _WORKER_AMP_STDOUT: + self._ampProtocol.dataReceived(data) + else: + ProcessProtocol.childDataReceived(self, childFD, data) diff --git a/contrib/python/Twisted/py3/twisted/trial/_dist/workercommands.py b/contrib/python/Twisted/py3/twisted/trial/_dist/workercommands.py new file mode 100644 index 0000000000..f7d8d26b2e --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/trial/_dist/workercommands.py @@ -0,0 +1,30 @@ +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Commands for telling a worker to load tests or run tests. + +@since: 12.3 +""" + +from twisted.protocols.amp import Boolean, Command, Unicode + +NativeString = Unicode + + +class Run(Command): + """ + Run a test. + """ + + arguments = [(b"testCase", NativeString())] + response = [(b"success", Boolean())] + + +class Start(Command): + """ + Set up the worker process, giving the running directory. + """ + + arguments = [(b"directory", NativeString())] + response = [(b"success", Boolean())] diff --git a/contrib/python/Twisted/py3/twisted/trial/_dist/workerreporter.py b/contrib/python/Twisted/py3/twisted/trial/_dist/workerreporter.py new file mode 100644 index 0000000000..5f2d7a0cab --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/trial/_dist/workerreporter.py @@ -0,0 +1,354 @@ +# -*- test-case-name: twisted.trial._dist.test.test_workerreporter -*- +# +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Test reporter forwarding test results over trial distributed AMP commands. + +@since: 12.3 +""" + +from types import TracebackType +from typing import Callable, List, Optional, Sequence, Type, TypeVar +from unittest import TestCase as PyUnitTestCase + +from attrs import Factory, define +from typing_extensions import Literal + +from twisted.internet.defer import Deferred, maybeDeferred +from twisted.protocols.amp import AMP, MAX_VALUE_LENGTH +from twisted.python.failure import Failure +from twisted.python.reflect import qual +from twisted.trial._dist import managercommands +from twisted.trial.reporter import TestResult +from ..reporter import TrialFailure +from .stream import chunk, stream + +T = TypeVar("T") + + +async def addError( + amp: AMP, testName: str, errorClass: str, error: str, frames: List[str] +) -> None: + """ + Send an error to the worker manager over an AMP connection. + + First the pieces which can be large are streamed over the connection. + Then, L{managercommands.AddError} is called with the rest of the + information and the stream IDs. + + :param amp: The connection to use. + :param testName: The name (or ID) of the test the error relates to. + :param errorClass: The fully qualified name of the error type. + :param error: The string representation of the error. + :param frames: The lines of the traceback associated with the error. + """ + + errorStreamId = await stream(amp, chunk(error.encode("utf-8"), MAX_VALUE_LENGTH)) + framesStreamId = await stream(amp, (frame.encode("utf-8") for frame in frames)) + + await amp.callRemote( + managercommands.AddError, + testName=testName, + errorClass=errorClass, + errorStreamId=errorStreamId, + framesStreamId=framesStreamId, + ) + + +async def addFailure( + amp: AMP, testName: str, fail: str, failClass: str, frames: List[str] +) -> None: + """ + Like L{addError} but for failures. + + :param amp: See L{addError} + :param testName: See L{addError} + :param failClass: The fully qualified name of the exception associated + with the failure. + :param fail: The string representation of the failure. + :param frames: The lines of the traceback associated with the error. + """ + failStreamId = await stream(amp, chunk(fail.encode("utf-8"), MAX_VALUE_LENGTH)) + framesStreamId = await stream(amp, (frame.encode("utf-8") for frame in frames)) + + await amp.callRemote( + managercommands.AddFailure, + testName=testName, + failClass=failClass, + failStreamId=failStreamId, + framesStreamId=framesStreamId, + ) + + +async def addExpectedFailure(amp: AMP, testName: str, error: str, todo: str) -> None: + """ + Like L{addError} but for expected failures. + + :param amp: See L{addError} + :param testName: See L{addError} + :param error: The string representation of the expected failure. + :param todo: The string description of the expectation. + """ + errorStreamId = await stream(amp, chunk(error.encode("utf-8"), MAX_VALUE_LENGTH)) + + await amp.callRemote( + managercommands.AddExpectedFailure, + testName=testName, + errorStreamId=errorStreamId, + todo=todo, + ) + + +@define +class ReportingResults: + """ + A mutable container for the result of sending test results back to the + parent process. + + Since it is possible for these sends to fail asynchronously but the + L{TestResult} protocol is not well suited for asynchronous result + reporting, results are collected on an instance of this class and when the + runner believes the test is otherwise complete, it can collect the results + and do something with any errors. + + :ivar _reporter: The L{WorkerReporter} this object is associated with. + This is the object doing the result reporting. + + :ivar _results: A list of L{Deferred} instances representing the results + of reporting operations. This is expected to grow over the course of + the test run and then be inspected by the runner once the test is + over. The public interface to this list is via the context manager + interface. + """ + + _reporter: "WorkerReporter" + _results: List[Deferred[object]] = Factory(list) + + def __enter__(self) -> Sequence[Deferred[object]]: + """ + Begin a new reportable context in which results can be collected. + + :return: A sequence which will contain the L{Deferred} instances + representing the results of all test result reporting that happens + while the context manager is active. The sequence is extended as + the test runs so its value should not be consumed until the test + is over. + """ + return self._results + + def __exit__( + self, + excType: Type[BaseException], + excValue: BaseException, + excTraceback: TracebackType, + ) -> Literal[False]: + """ + End the reportable context. + """ + self._reporter._reporting = None + return False + + def record(self, result: Deferred[object]) -> None: + """ + Record a L{Deferred} instance representing one test result reporting + operation. + """ + self._results.append(result) + + +class WorkerReporter(TestResult): + """ + Reporter for trial's distributed workers. We send things not through a + stream, but through an C{AMP} protocol's C{callRemote} method. + + @ivar _DEFAULT_TODO: Default message for expected failures and + unexpected successes, used only if a C{Todo} is not provided. + + @ivar _reporting: When a "result reporting" context is active, the + corresponding context manager. Otherwise, L{None}. + """ + + _DEFAULT_TODO = "Test expected to fail" + + ampProtocol: AMP + _reporting: Optional[ReportingResults] = None + + def __init__(self, ampProtocol): + """ + @param ampProtocol: The communication channel with the trial + distributed manager which collects all test results. + """ + super().__init__() + self.ampProtocol = ampProtocol + + def gatherReportingResults(self) -> ReportingResults: + """ + Get a "result reporting" context manager. + + In a "result reporting" context, asynchronous test result reporting + methods may be used safely. Their results (in particular, failures) + are available from the context manager. + """ + self._reporting = ReportingResults(self) + return self._reporting + + def _getFailure(self, error: TrialFailure) -> Failure: + """ + Convert a C{sys.exc_info()}-style tuple to a L{Failure}, if necessary. + """ + if isinstance(error, tuple): + return Failure(error[1], error[0], error[2]) + return error + + def _getFrames(self, failure: Failure) -> List[str]: + """ + Extract frames from a C{Failure} instance. + """ + frames: List[str] = [] + for frame in failure.frames: + # The code object's name, the code object's filename, and the line + # number. + frames.extend([frame[0], frame[1], str(frame[2])]) + return frames + + def _call(self, f: Callable[[], T]) -> None: + """ + Call L{f} if and only if a "result reporting" context is active. + + @param f: A function to call. Its result is accumulated into the + result reporting context. It may return a L{Deferred} or a + coroutine or synchronously raise an exception or return a result + value. + + @raise ValueError: If no result reporting context is active. + """ + if self._reporting is not None: + self._reporting.record(maybeDeferred(f)) + else: + raise ValueError( + "Cannot call command outside of reporting context manager." + ) + + def addSuccess(self, test: PyUnitTestCase) -> None: + """ + Send a success to the parent process. + + This must be called in context managed by L{gatherReportingResults}. + """ + super().addSuccess(test) + testName = test.id() + self._call( + lambda: self.ampProtocol.callRemote( + managercommands.AddSuccess, testName=testName + ) + ) + + async def addErrorFallible(self, testName: str, errorObj: TrialFailure) -> None: + """ + Attempt to report an error to the parent process. + + Unlike L{addError} this can fail asynchronously. This version is for + infrastructure code that can apply its own failure handling. + + @return: A L{Deferred} that fires with the result of the attempt. + """ + failure = self._getFailure(errorObj) + errorStr = failure.getErrorMessage() + errorClass = qual(failure.type) + frames = self._getFrames(failure) + await addError( + self.ampProtocol, + testName, + errorClass, + errorStr, + frames, + ) + + def addError(self, test: PyUnitTestCase, error: TrialFailure) -> None: + """ + Send an error to the parent process. + """ + super().addError(test, error) + testName = test.id() + self._call(lambda: self.addErrorFallible(testName, error)) + + def addFailure(self, test: PyUnitTestCase, fail: TrialFailure) -> None: + """ + Send a Failure over. + """ + super().addFailure(test, fail) + testName = test.id() + failure = self._getFailure(fail) + failureMessage = failure.getErrorMessage() + failClass = qual(failure.type) + frames = self._getFrames(failure) + self._call( + lambda: addFailure( + self.ampProtocol, + testName, + failureMessage, + failClass, + frames, + ), + ) + + def addSkip(self, test, reason): + """ + Send a skip over. + """ + super().addSkip(test, reason) + reason = str(reason) + testName = test.id() + self._call( + lambda: self.ampProtocol.callRemote( + managercommands.AddSkip, testName=testName, reason=reason + ) + ) + + def _getTodoReason(self, todo): + """ + Get the reason for a C{Todo}. + + If C{todo} is L{None}, return a sensible default. + """ + if todo is None: + return self._DEFAULT_TODO + else: + return todo.reason + + def addExpectedFailure(self, test, error, todo=None): + """ + Send an expected failure over. + """ + super().addExpectedFailure(test, error, todo) + errorMessage = error.getErrorMessage() + testName = test.id() + self._call( + lambda: addExpectedFailure( + self.ampProtocol, + testName=testName, + error=errorMessage, + todo=self._getTodoReason(todo), + ) + ) + + def addUnexpectedSuccess(self, test, todo=None): + """ + Send an unexpected success over. + """ + super().addUnexpectedSuccess(test, todo) + testName = test.id() + self._call( + lambda: self.ampProtocol.callRemote( + managercommands.AddUnexpectedSuccess, + testName=testName, + todo=self._getTodoReason(todo), + ) + ) + + def printSummary(self): + """ + I{Don't} print a summary + """ diff --git a/contrib/python/Twisted/py3/twisted/trial/_dist/workertrial.py b/contrib/python/Twisted/py3/twisted/trial/_dist/workertrial.py new file mode 100644 index 0000000000..847dbc51a6 --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/trial/_dist/workertrial.py @@ -0,0 +1,93 @@ +# -*- test-case-name: twisted.trial._dist.test.test_workertrial -*- +# +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Implementation of C{AMP} worker commands, and main executable entry point for +the workers. + +@since: 12.3 +""" + +import errno +import os +import sys + +from twisted.internet.protocol import FileWrapper +from twisted.python.log import startLoggingWithObserver, textFromEventDict +from twisted.trial._dist import _WORKER_AMP_STDIN, _WORKER_AMP_STDOUT +from twisted.trial._dist.options import WorkerOptions + + +class WorkerLogObserver: + """ + A log observer that forward its output to a C{AMP} protocol. + """ + + def __init__(self, protocol): + """ + @param protocol: a connected C{AMP} protocol instance. + @type protocol: C{AMP} + """ + self.protocol = protocol + + def emit(self, eventDict): + """ + Produce a log output. + """ + from twisted.trial._dist import managercommands + + text = textFromEventDict(eventDict) + if text is None: + return + self.protocol.callRemote(managercommands.TestWrite, out=text) + + +def main(_fdopen=os.fdopen): + """ + Main function to be run if __name__ == "__main__". + + @param _fdopen: If specified, the function to use in place of C{os.fdopen}. + @type _fdopen: C{callable} + """ + config = WorkerOptions() + config.parseOptions() + + from twisted.trial._dist.worker import WorkerProtocol + + workerProtocol = WorkerProtocol(config["force-gc"]) + + protocolIn = _fdopen(_WORKER_AMP_STDIN, "rb") + protocolOut = _fdopen(_WORKER_AMP_STDOUT, "wb") + workerProtocol.makeConnection(FileWrapper(protocolOut)) + + observer = WorkerLogObserver(workerProtocol) + startLoggingWithObserver(observer.emit, False) + + while True: + try: + r = protocolIn.read(1) + except OSError as e: + if e.args[0] == errno.EINTR: + continue + else: + raise + if r == b"": + break + else: + workerProtocol.dataReceived(r) + protocolOut.flush() + sys.stdout.flush() + sys.stderr.flush() + + if config.tracer: + sys.settrace(None) + results = config.tracer.results() + results.write_results( + show_missing=True, summary=False, coverdir=config.coverdir().path + ) + + +if __name__ == "__main__": + main() |