aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/trial/_dist
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2023-11-26 18:16:14 +0300
committershmel1k <shmel1k@ydb.tech>2023-11-26 18:43:30 +0300
commitb8cf9e88f4c5c64d9406af533d8948deb050d695 (patch)
tree218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py3/twisted/trial/_dist
parent523f645a83a0ec97a0332dbc3863bb354c92a328 (diff)
downloadydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/trial/_dist')
-rw-r--r--contrib/python/Twisted/py3/twisted/trial/_dist/__init__.py47
-rw-r--r--contrib/python/Twisted/py3/twisted/trial/_dist/distreporter.py90
-rw-r--r--contrib/python/Twisted/py3/twisted/trial/_dist/disttrial.py512
-rw-r--r--contrib/python/Twisted/py3/twisted/trial/_dist/functional.py125
-rw-r--r--contrib/python/Twisted/py3/twisted/trial/_dist/managercommands.py89
-rw-r--r--contrib/python/Twisted/py3/twisted/trial/_dist/options.py28
-rw-r--r--contrib/python/Twisted/py3/twisted/trial/_dist/stream.py100
-rw-r--r--contrib/python/Twisted/py3/twisted/trial/_dist/worker.py465
-rw-r--r--contrib/python/Twisted/py3/twisted/trial/_dist/workercommands.py30
-rw-r--r--contrib/python/Twisted/py3/twisted/trial/_dist/workerreporter.py354
-rw-r--r--contrib/python/Twisted/py3/twisted/trial/_dist/workertrial.py93
11 files changed, 1933 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py3/twisted/trial/_dist/__init__.py b/contrib/python/Twisted/py3/twisted/trial/_dist/__init__.py
new file mode 100644
index 0000000000..502e840fef
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/trial/_dist/__init__.py
@@ -0,0 +1,47 @@
+# -*- test-case-name: twisted.trial._dist.test -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+This package implements the distributed Trial test runner:
+
+ - The L{twisted.trial._dist.disttrial} module implements a test runner which
+ runs in a manager process and can launch additional worker processes in
+ which to run tests and gather up results from all of them.
+
+ - The L{twisted.trial._dist.options} module defines command line options used
+ to configure the distributed test runner.
+
+ - The L{twisted.trial._dist.managercommands} module defines AMP commands
+ which are sent from worker processes back to the manager process to report
+ the results of tests.
+
+ - The L{twisted.trial._dist.workercommands} module defines AMP commands which
+ are sent from the manager process to the worker processes to control the
+ execution of tests there.
+
+ - The L{twisted.trial._dist.distreporter} module defines a proxy for
+ L{twisted.trial.itrial.IReporter} which enforces the typical requirement
+ that results be passed to a reporter for only one test at a time, allowing
+ any reporter to be used with despite disttrial's simultaneously running
+ tests.
+
+ - The L{twisted.trial._dist.workerreporter} module implements a
+ L{twisted.trial.itrial.IReporter} which is used by worker processes and
+ reports results back to the manager process using AMP commands.
+
+ - The L{twisted.trial._dist.workertrial} module is a runnable script which is
+ the main point for worker processes.
+
+ - The L{twisted.trial._dist.worker} process defines the manager's AMP
+ protocol for accepting results from worker processes and a process protocol
+ for use running workers as local child processes (as opposed to
+ distributing them to another host).
+
+@since: 12.3
+"""
+
+# File descriptors numbers used to set up pipes with the worker.
+_WORKER_AMP_STDIN = 3
+
+_WORKER_AMP_STDOUT = 4
diff --git a/contrib/python/Twisted/py3/twisted/trial/_dist/distreporter.py b/contrib/python/Twisted/py3/twisted/trial/_dist/distreporter.py
new file mode 100644
index 0000000000..3a45cc4806
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/trial/_dist/distreporter.py
@@ -0,0 +1,90 @@
+# -*- test-case-name: twisted.trial._dist.test.test_distreporter -*-
+#
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+The reporter is not made to support concurrent test running, so we will
+hold test results in here and only send them to the reporter once the
+test is over.
+
+@since: 12.3
+"""
+
+from types import TracebackType
+from typing import Optional, Tuple, Union
+
+from zope.interface import implementer
+
+from twisted.python.components import proxyForInterface
+from twisted.python.failure import Failure
+from ..itrial import IReporter, ITestCase
+
+ReporterFailure = Union[Failure, Tuple[type, Exception, TracebackType]]
+
+
+@implementer(IReporter)
+class DistReporter(proxyForInterface(IReporter)): # type: ignore[misc]
+ """
+ See module docstring.
+ """
+
+ def __init__(self, original):
+ super().__init__(original)
+ self.running = {}
+
+ def startTest(self, test):
+ """
+ Queue test starting.
+ """
+ self.running[test.id()] = []
+ self.running[test.id()].append((self.original.startTest, test))
+
+ def addFailure(self, test: ITestCase, fail: ReporterFailure) -> None:
+ """
+ Queue adding a failure.
+ """
+ self.running[test.id()].append((self.original.addFailure, test, fail))
+
+ def addError(self, test: ITestCase, error: ReporterFailure) -> None:
+ """
+ Queue error adding.
+ """
+ self.running[test.id()].append((self.original.addError, test, error))
+
+ def addSkip(self, test, reason):
+ """
+ Queue adding a skip.
+ """
+ self.running[test.id()].append((self.original.addSkip, test, reason))
+
+ def addUnexpectedSuccess(self, test, todo=None):
+ """
+ Queue adding an unexpected success.
+ """
+ self.running[test.id()].append((self.original.addUnexpectedSuccess, test, todo))
+
+ def addExpectedFailure(
+ self, test: ITestCase, error: ReporterFailure, todo: Optional[str] = None
+ ) -> None:
+ """
+ Queue adding an expected failure.
+ """
+ self.running[test.id()].append(
+ (self.original.addExpectedFailure, test, error, todo)
+ )
+
+ def addSuccess(self, test):
+ """
+ Queue adding a success.
+ """
+ self.running[test.id()].append((self.original.addSuccess, test))
+
+ def stopTest(self, test):
+ """
+ Queue stopping the test, then unroll the queue.
+ """
+ self.running[test.id()].append((self.original.stopTest, test))
+ for step in self.running[test.id()]:
+ step[0](*step[1:])
+ del self.running[test.id()]
diff --git a/contrib/python/Twisted/py3/twisted/trial/_dist/disttrial.py b/contrib/python/Twisted/py3/twisted/trial/_dist/disttrial.py
new file mode 100644
index 0000000000..bad82a8814
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/trial/_dist/disttrial.py
@@ -0,0 +1,512 @@
+# -*- test-case-name: twisted.trial._dist.test.test_disttrial -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+This module contains the trial distributed runner, the management class
+responsible for coordinating all of trial's behavior at the highest level.
+
+@since: 12.3
+"""
+
+import os
+import sys
+from functools import partial
+from os.path import isabs
+from typing import (
+ Any,
+ Awaitable,
+ Callable,
+ Iterable,
+ List,
+ Optional,
+ Sequence,
+ TextIO,
+ Union,
+ cast,
+)
+from unittest import TestCase, TestSuite
+
+from attrs import define, field, frozen
+from attrs.converters import default_if_none
+
+from twisted.internet.defer import Deferred, DeferredList, gatherResults
+from twisted.internet.interfaces import IReactorCore, IReactorProcess
+from twisted.logger import Logger
+from twisted.python.failure import Failure
+from twisted.python.filepath import FilePath
+from twisted.python.lockfile import FilesystemLock
+from twisted.python.modules import theSystemPath
+from .._asyncrunner import _iterateTests
+from ..itrial import IReporter, ITestCase
+from ..reporter import UncleanWarningsReporterWrapper
+from ..runner import TestHolder
+from ..util import _unusedTestDirectory, openTestLog
+from . import _WORKER_AMP_STDIN, _WORKER_AMP_STDOUT
+from .distreporter import DistReporter
+from .functional import countingCalls, discardResult, iterateWhile, takeWhile
+from .worker import LocalWorker, LocalWorkerAMP, WorkerAction
+
+
+class IDistTrialReactor(IReactorCore, IReactorProcess):
+ """
+ The reactor interfaces required by disttrial.
+ """
+
+
+def _defaultReactor() -> IDistTrialReactor:
+ """
+ Get the default reactor, ensuring it is suitable for use with disttrial.
+ """
+ import twisted.internet.reactor as defaultReactor
+
+ if all(
+ [
+ IReactorCore.providedBy(defaultReactor),
+ IReactorProcess.providedBy(defaultReactor),
+ ]
+ ):
+ # If it provides each of the interfaces then it provides the
+ # intersection interface. cast it to make it easier to talk about
+ # later on.
+ return cast(IDistTrialReactor, defaultReactor)
+
+ raise TypeError("Reactor does not provide the right interfaces")
+
+
+@frozen
+class WorkerPoolConfig:
+ """
+ Configuration parameters for a pool of test-running workers.
+
+ @ivar numWorkers: The number of workers in the pool.
+
+ @ivar workingDirectory: A directory in which working directories for each
+ of the workers will be created.
+
+ @ivar workerArguments: Extra arguments to pass the worker process in its
+ argv.
+
+ @ivar logFile: The basename of the overall test log file.
+ """
+
+ numWorkers: int
+ workingDirectory: FilePath[Any]
+ workerArguments: Sequence[str]
+ logFile: str
+
+
+@define
+class StartedWorkerPool:
+ """
+ A pool of workers which have already been started.
+
+ @ivar workingDirectory: A directory holding the working directories for
+ each of the workers.
+
+ @ivar testDirLock: An object representing the cooperative lock this pool
+ holds on its working directory.
+
+ @ivar testLog: The open overall test log file.
+
+ @ivar workers: Objects corresponding to the worker child processes and
+ adapting between process-related interfaces and C{IProtocol}.
+
+ @ivar ampWorkers: AMP protocol instances corresponding to the worker child
+ processes.
+ """
+
+ workingDirectory: FilePath[Any]
+ testDirLock: FilesystemLock
+ testLog: TextIO
+ workers: List[LocalWorker]
+ ampWorkers: List[LocalWorkerAMP]
+
+ _logger = Logger()
+
+ async def run(self, workerAction: WorkerAction[Any]) -> None:
+ """
+ Run an action on all of the workers in the pool.
+ """
+ await gatherResults(
+ discardResult(workerAction(worker)) for worker in self.ampWorkers
+ )
+ return None
+
+ async def join(self) -> None:
+ """
+ Shut down all of the workers in the pool.
+
+ The pool is unusable after this method is called.
+ """
+ results = await DeferredList(
+ [Deferred.fromCoroutine(worker.exit()) for worker in self.workers],
+ consumeErrors=True,
+ )
+ for n, (succeeded, failure) in enumerate(results):
+ if not succeeded:
+ self._logger.failure(f"joining disttrial worker #{n} failed", failure)
+
+ del self.workers[:]
+ del self.ampWorkers[:]
+ self.testLog.close()
+ self.testDirLock.unlock()
+
+
+@frozen
+class WorkerPool:
+ """
+ Manage a fixed-size collection of child processes which can run tests.
+
+ @ivar _config: Configuration for the precise way in which the pool is run.
+ """
+
+ _config: WorkerPoolConfig
+
+ def _createLocalWorkers(
+ self,
+ protocols: Iterable[LocalWorkerAMP],
+ workingDirectory: FilePath[Any],
+ logFile: TextIO,
+ ) -> List[LocalWorker]:
+ """
+ Create local worker protocol instances and return them.
+
+ @param protocols: The process/protocol adapters to use for the created
+ workers.
+
+ @param workingDirectory: The base path in which we should run the
+ workers.
+
+ @param logFile: The test log, for workers to write to.
+
+ @return: A list of C{quantity} C{LocalWorker} instances.
+ """
+ return [
+ LocalWorker(protocol, workingDirectory.child(str(x)), logFile)
+ for x, protocol in enumerate(protocols)
+ ]
+
+ def _launchWorkerProcesses(self, spawner, protocols, arguments):
+ """
+ Spawn processes from a list of process protocols.
+
+ @param spawner: A C{IReactorProcess.spawnProcess} implementation.
+
+ @param protocols: An iterable of C{ProcessProtocol} instances.
+
+ @param arguments: Extra arguments passed to the processes.
+ """
+ workertrialPath = theSystemPath["twisted.trial._dist.workertrial"].filePath.path
+ childFDs = {
+ 0: "w",
+ 1: "r",
+ 2: "r",
+ _WORKER_AMP_STDIN: "w",
+ _WORKER_AMP_STDOUT: "r",
+ }
+ environ = os.environ.copy()
+ # Add an environment variable containing the raw sys.path, to be used
+ # by subprocesses to try to make it identical to the parent's.
+ environ["PYTHONPATH"] = os.pathsep.join(sys.path)
+ for worker in protocols:
+ args = [sys.executable, workertrialPath]
+ args.extend(arguments)
+ spawner(worker, sys.executable, args=args, childFDs=childFDs, env=environ)
+
+ async def start(self, reactor: IReactorProcess) -> StartedWorkerPool:
+ """
+ Launch all of the workers for this pool.
+
+ @return: A started pool object that can run jobs using the workers.
+ """
+ testDir, testDirLock = _unusedTestDirectory(
+ self._config.workingDirectory,
+ )
+
+ if isabs(self._config.logFile):
+ # Open a log file wherever the user asked.
+ testLogPath = FilePath(self._config.logFile)
+ else:
+ # Open a log file in the chosen working directory (not necessarily
+ # the same as our configured working directory, if that path was
+ # in use).
+ testLogPath = testDir.preauthChild(self._config.logFile)
+ testLog = openTestLog(testLogPath)
+
+ ampWorkers = [LocalWorkerAMP() for x in range(self._config.numWorkers)]
+ workers = self._createLocalWorkers(
+ ampWorkers,
+ testDir,
+ testLog,
+ )
+ self._launchWorkerProcesses(
+ reactor.spawnProcess,
+ workers,
+ self._config.workerArguments,
+ )
+
+ return StartedWorkerPool(
+ testDir,
+ testDirLock,
+ testLog,
+ workers,
+ ampWorkers,
+ )
+
+
+def shouldContinue(untilFailure: bool, result: IReporter) -> bool:
+ """
+ Determine whether the test suite should be iterated again.
+
+ @param untilFailure: C{True} if the suite is supposed to run until
+ failure.
+
+ @param result: The test result of the test suite iteration which just
+ completed.
+ """
+ return untilFailure and result.wasSuccessful()
+
+
+async def runTests(
+ pool: StartedWorkerPool,
+ testCases: Iterable[ITestCase],
+ result: DistReporter,
+ driveWorker: Callable[
+ [DistReporter, Sequence[ITestCase], LocalWorkerAMP], Awaitable[None]
+ ],
+) -> None:
+ try:
+ # Run the tests using the worker pool.
+ await pool.run(partial(driveWorker, result, testCases))
+ except Exception:
+ # Exceptions from test code are handled somewhere else. An
+ # exception here is a bug in the runner itself. The only
+ # convenient place to put it is in the result, though.
+ result.original.addError(TestHolder("<runTests>"), Failure())
+
+
+@define
+class DistTrialRunner:
+ """
+ A specialized runner for distributed trial. The runner launches a number of
+ local worker processes which will run tests.
+
+ @ivar _maxWorkers: the number of workers to be spawned.
+
+ @ivar _exitFirst: ``True`` to stop the run as soon as a test case fails.
+ ``False`` to run through the whole suite and report all of the results
+ at the end.
+
+ @ivar stream: stream which the reporter will use.
+
+ @ivar _reporterFactory: the reporter class to be used.
+ """
+
+ _distReporterFactory = DistReporter
+ _logger = Logger()
+
+ # accepts a `realtime` keyword argument which we can't annotate, so punt
+ # on the argument annotation
+ _reporterFactory: Callable[..., IReporter]
+ _maxWorkers: int
+ _workerArguments: List[str]
+ _exitFirst: bool = False
+ _reactor: IDistTrialReactor = field(
+ # mypy doesn't understand the converter
+ default=None,
+ converter=default_if_none(factory=_defaultReactor), # type: ignore [misc]
+ )
+ # mypy doesn't understand the converter
+ stream: TextIO = field(default=None, converter=default_if_none(sys.stdout)) # type: ignore [misc]
+
+ _tracebackFormat: str = "default"
+ _realTimeErrors: bool = False
+ _uncleanWarnings: bool = False
+ _logfile: str = "test.log"
+ _workingDirectory: str = "_trial_temp"
+ _workerPoolFactory: Callable[[WorkerPoolConfig], WorkerPool] = WorkerPool
+
+ def _makeResult(self) -> DistReporter:
+ """
+ Make reporter factory, and wrap it with a L{DistReporter}.
+ """
+ reporter = self._reporterFactory(
+ self.stream, self._tracebackFormat, realtime=self._realTimeErrors
+ )
+ if self._uncleanWarnings:
+ reporter = UncleanWarningsReporterWrapper(reporter)
+ return self._distReporterFactory(reporter)
+
+ def writeResults(self, result):
+ """
+ Write test run final outcome to result.
+
+ @param result: A C{TestResult} which will print errors and the summary.
+ """
+ result.done()
+
+ async def _driveWorker(
+ self,
+ result: DistReporter,
+ testCases: Sequence[ITestCase],
+ worker: LocalWorkerAMP,
+ ) -> None:
+ """
+ Drive a L{LocalWorkerAMP} instance, iterating the tests and calling
+ C{run} for every one of them.
+
+ @param worker: The L{LocalWorkerAMP} to drive.
+
+ @param result: The global L{DistReporter} instance.
+
+ @param testCases: The global list of tests to iterate.
+
+ @return: A coroutine that completes after all of the tests have
+ completed.
+ """
+
+ async def task(case):
+ try:
+ await worker.run(case, result)
+ except Exception:
+ result.original.addError(case, Failure())
+
+ for case in testCases:
+ await task(case)
+
+ async def runAsync(
+ self,
+ suite: Union[TestCase, TestSuite],
+ untilFailure: bool = False,
+ ) -> DistReporter:
+ """
+ Spawn local worker processes and load tests. After that, run them.
+
+ @param suite: A test or suite to be run.
+
+ @param untilFailure: If C{True}, continue to run the tests until they
+ fail.
+
+ @return: A coroutine that completes with the test result.
+ """
+
+ # Realize a concrete set of tests to run.
+ testCases = list(_iterateTests(suite))
+
+ # Create a worker pool to use to execute them.
+ poolStarter = self._workerPoolFactory(
+ WorkerPoolConfig(
+ # Don't make it larger than is useful or allowed.
+ min(len(testCases), self._maxWorkers),
+ FilePath(self._workingDirectory),
+ self._workerArguments,
+ self._logfile,
+ ),
+ )
+
+ # Announce that we're beginning. countTestCases result is preferred
+ # (over len(testCases)) because testCases may contain synthetic cases
+ # for error reporting purposes.
+ self.stream.write(f"Running {suite.countTestCases()} tests.\n")
+
+ # Start the worker pool.
+ startedPool = await poolStarter.start(self._reactor)
+
+ # The condition that will determine whether the test run repeats.
+ condition = partial(shouldContinue, untilFailure)
+
+ # A function that will run the whole suite once.
+ @countingCalls
+ async def runAndReport(n: int) -> DistReporter:
+ if untilFailure:
+ # If and only if we're running the suite more than once,
+ # provide a report about which run this is.
+ self.stream.write(f"Test Pass {n + 1}\n")
+
+ result = self._makeResult()
+
+ if self._exitFirst:
+ # Keep giving out tests as long as the result object has only
+ # seen success.
+ casesCondition = lambda _: result.original.wasSuccessful()
+ else:
+ casesCondition = lambda _: True
+
+ await runTests(
+ startedPool,
+ takeWhile(casesCondition, testCases),
+ result,
+ self._driveWorker,
+ )
+ self.writeResults(result)
+ return result
+
+ try:
+ # Start submitting tests to workers in the pool. Perhaps repeat
+ # the whole test suite more than once, if appropriate for our
+ # configuration.
+ return await iterateWhile(condition, runAndReport)
+ finally:
+ # Shut down the worker pool.
+ await startedPool.join()
+
+ def _run(self, test: Union[TestCase, TestSuite], untilFailure: bool) -> IReporter:
+ result: Union[Failure, DistReporter, None] = None
+ reactorStopping: bool = False
+ testsInProgress: Deferred[object]
+
+ def capture(r: Union[Failure, DistReporter]) -> None:
+ nonlocal result
+ result = r
+
+ def maybeStopTests() -> Optional[Deferred[object]]:
+ nonlocal reactorStopping
+ reactorStopping = True
+ if result is None:
+ testsInProgress.cancel()
+ return testsInProgress
+ return None
+
+ def maybeStopReactor(result: object) -> object:
+ if not reactorStopping:
+ self._reactor.stop()
+ return result
+
+ self._reactor.addSystemEventTrigger("before", "shutdown", maybeStopTests)
+
+ testsInProgress = (
+ Deferred.fromCoroutine(self.runAsync(test, untilFailure))
+ .addBoth(capture)
+ .addBoth(maybeStopReactor)
+ )
+
+ self._reactor.run()
+
+ if isinstance(result, Failure):
+ result.raiseException()
+
+ # mypy can't see that raiseException raises an exception so we can
+ # only get here if result is not a Failure, so tell mypy result is
+ # certainly a DistReporter at this point.
+ assert isinstance(result, DistReporter), f"{result} is not DistReporter"
+
+ # Unwrap the DistReporter to give the caller some regular IReporter
+ # object. DistReporter isn't type annotated correctly so fix it here.
+ return cast(IReporter, result.original)
+
+ def run(self, test: Union[TestCase, TestSuite]) -> IReporter:
+ """
+ Run a reactor and a test suite.
+
+ @param test: The test or suite to run.
+ """
+ return self._run(test, untilFailure=False)
+
+ def runUntilFailure(self, test: Union[TestCase, TestSuite]) -> IReporter:
+ """
+ Run the tests with local worker processes until they fail.
+
+ @param test: The test or suite to run.
+ """
+ return self._run(test, untilFailure=True)
diff --git a/contrib/python/Twisted/py3/twisted/trial/_dist/functional.py b/contrib/python/Twisted/py3/twisted/trial/_dist/functional.py
new file mode 100644
index 0000000000..3db4dca5de
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/trial/_dist/functional.py
@@ -0,0 +1,125 @@
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+General functional-style helpers for disttrial.
+"""
+
+from functools import partial, wraps
+from typing import Awaitable, Callable, Iterable, Optional, TypeVar
+
+from twisted.internet.defer import Deferred, succeed
+
+_A = TypeVar("_A")
+_B = TypeVar("_B")
+_C = TypeVar("_C")
+
+
+def fromOptional(default: _A, optional: Optional[_A]) -> _A:
+ """
+ Get a definite value from an optional value.
+
+ @param default: The value to return if the optional value is missing.
+
+ @param optional: The optional value to return if it exists.
+ """
+ if optional is None:
+ return default
+ return optional
+
+
+def takeWhile(condition: Callable[[_A], bool], xs: Iterable[_A]) -> Iterable[_A]:
+ """
+ :return: An iterable over C{xs} that stops when C{condition} returns
+ ``False`` based on the value of iterated C{xs}.
+ """
+ for x in xs:
+ if condition(x):
+ yield x
+ else:
+ break
+
+
+async def sequence(a: Awaitable[_A], b: Awaitable[_B]) -> _B:
+ """
+ Wait for one action to complete and then another.
+
+ If either action fails, failure is propagated. If the first action fails,
+ the second action is not waited on.
+ """
+ await a
+ return await b
+
+
+def flip(f: Callable[[_A, _B], _C]) -> Callable[[_B, _A], _C]:
+ """
+ Create a function like another but with the order of the first two
+ arguments flipped.
+ """
+
+ @wraps(f)
+ def g(b, a):
+ return f(a, b)
+
+ return g
+
+
+def compose(fx: Callable[[_B], _C], fy: Callable[[_A], _B]) -> Callable[[_A], _C]:
+ """
+ Create a function that calls one function with an argument and then
+ another function with the result of the first function.
+ """
+
+ @wraps(fx)
+ @wraps(fy)
+ def g(a):
+ return fx(fy(a))
+
+ return g
+
+
+# Discard the result of an awaitable and substitute None in its place.
+#
+# Ignore the `Cannot infer type argument 1 of "compose"`
+# https://github.com/python/mypy/issues/6220
+discardResult: Callable[[Awaitable[_A]], Deferred[None]] = compose( # type: ignore[misc]
+ Deferred.fromCoroutine,
+ partial(flip(sequence), succeed(None)),
+)
+
+
+async def iterateWhile(
+ predicate: Callable[[_A], bool],
+ action: Callable[[], Awaitable[_A]],
+) -> _A:
+ """
+ Call a function repeatedly until its result fails to satisfy a predicate.
+
+ @param predicate: The check to apply.
+
+ @param action: The function to call.
+
+ @return: The result of C{action} which did not satisfy C{predicate}.
+ """
+ while True:
+ result = await action()
+ if not predicate(result):
+ return result
+
+
+def countingCalls(f: Callable[[int], _A]) -> Callable[[], _A]:
+ """
+ Wrap a function with another that automatically passes an integer counter
+ of the number of calls that have gone through the wrapper.
+ """
+ counter = 0
+
+ def g() -> _A:
+ nonlocal counter
+ try:
+ result = f(counter)
+ finally:
+ counter += 1
+ return result
+
+ return g
diff --git a/contrib/python/Twisted/py3/twisted/trial/_dist/managercommands.py b/contrib/python/Twisted/py3/twisted/trial/_dist/managercommands.py
new file mode 100644
index 0000000000..4f3080a24f
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/trial/_dist/managercommands.py
@@ -0,0 +1,89 @@
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Commands for reporting test success of failure to the manager.
+
+@since: 12.3
+"""
+
+from twisted.protocols.amp import Boolean, Command, Integer, Unicode
+
+NativeString = Unicode
+
+
+class AddSuccess(Command):
+ """
+ Add a success.
+ """
+
+ arguments = [(b"testName", NativeString())]
+ response = [(b"success", Boolean())]
+
+
+class AddError(Command):
+ """
+ Add an error.
+ """
+
+ arguments = [
+ (b"testName", NativeString()),
+ (b"errorClass", NativeString()),
+ (b"errorStreamId", Integer()),
+ (b"framesStreamId", Integer()),
+ ]
+ response = [(b"success", Boolean())]
+
+
+class AddFailure(Command):
+ """
+ Add a failure.
+ """
+
+ arguments = [
+ (b"testName", NativeString()),
+ (b"failStreamId", Integer()),
+ (b"failClass", NativeString()),
+ (b"framesStreamId", Integer()),
+ ]
+ response = [(b"success", Boolean())]
+
+
+class AddSkip(Command):
+ """
+ Add a skip.
+ """
+
+ arguments = [(b"testName", NativeString()), (b"reason", NativeString())]
+ response = [(b"success", Boolean())]
+
+
+class AddExpectedFailure(Command):
+ """
+ Add an expected failure.
+ """
+
+ arguments = [
+ (b"testName", NativeString()),
+ (b"errorStreamId", Integer()),
+ (b"todo", NativeString()),
+ ]
+ response = [(b"success", Boolean())]
+
+
+class AddUnexpectedSuccess(Command):
+ """
+ Add an unexpected success.
+ """
+
+ arguments = [(b"testName", NativeString()), (b"todo", NativeString())]
+ response = [(b"success", Boolean())]
+
+
+class TestWrite(Command):
+ """
+ Write test log.
+ """
+
+ arguments = [(b"out", NativeString())]
+ response = [(b"success", Boolean())]
diff --git a/contrib/python/Twisted/py3/twisted/trial/_dist/options.py b/contrib/python/Twisted/py3/twisted/trial/_dist/options.py
new file mode 100644
index 0000000000..19f9a5f6cf
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/trial/_dist/options.py
@@ -0,0 +1,28 @@
+# -*- test-case-name: twisted.trial._dist.test.test_options -*-
+#
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Options handling specific to trial's workers.
+
+@since: 12.3
+"""
+
+from twisted.application.app import ReactorSelectionMixin
+from twisted.python.filepath import FilePath
+from twisted.python.usage import Options
+from twisted.scripts.trial import _BasicOptions
+
+
+class WorkerOptions(_BasicOptions, Options, ReactorSelectionMixin):
+ """
+ Options forwarded to the trial distributed worker.
+ """
+
+ def coverdir(self):
+ """
+ Return a L{FilePath} representing the directory into which coverage
+ results should be written.
+ """
+ return FilePath("coverage")
diff --git a/contrib/python/Twisted/py3/twisted/trial/_dist/stream.py b/contrib/python/Twisted/py3/twisted/trial/_dist/stream.py
new file mode 100644
index 0000000000..a53fd4ab21
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/trial/_dist/stream.py
@@ -0,0 +1,100 @@
+"""
+Buffer byte streams.
+"""
+
+from itertools import count
+from typing import Dict, Iterator, List, TypeVar
+
+from attrs import Factory, define
+
+from twisted.protocols.amp import AMP, Command, Integer, String as Bytes
+
+T = TypeVar("T")
+
+
+class StreamOpen(Command):
+ """
+ Open a new stream.
+ """
+
+ response = [(b"streamId", Integer())]
+
+
+class StreamWrite(Command):
+ """
+ Write a chunk of data to a stream.
+ """
+
+ arguments = [
+ (b"streamId", Integer()),
+ (b"data", Bytes()),
+ ]
+
+
+@define
+class StreamReceiver:
+ """
+ Buffering de-multiplexing byte stream receiver.
+ """
+
+ _counter: Iterator[int] = count()
+ _streams: Dict[int, List[bytes]] = Factory(dict)
+
+ def open(self) -> int:
+ """
+ Open a new stream and return its unique identifier.
+ """
+ newId = next(self._counter)
+ self._streams[newId] = []
+ return newId
+
+ def write(self, streamId: int, chunk: bytes) -> None:
+ """
+ Write to an open stream using its unique identifier.
+
+ @raise KeyError: If there is no such open stream.
+ """
+ self._streams[streamId].append(chunk)
+
+ def finish(self, streamId: int) -> List[bytes]:
+ """
+ Indicate an open stream may receive no further data and return all of
+ its current contents.
+
+ @raise KeyError: If there is no such open stream.
+ """
+ return self._streams.pop(streamId)
+
+
+def chunk(data: bytes, chunkSize: int) -> Iterator[bytes]:
+ """
+ Break a byte string into pieces of no more than ``chunkSize`` length.
+
+ @param data: The byte string.
+
+ @param chunkSize: The maximum length of the resulting pieces. All pieces
+ except possibly the last will be this length.
+
+ @return: The pieces.
+ """
+ pos = 0
+ while pos < len(data):
+ yield data[pos : pos + chunkSize]
+ pos += chunkSize
+
+
+async def stream(amp: AMP, chunks: Iterator[bytes]) -> int:
+ """
+ Send the given stream chunks, one by one, over the given connection.
+
+ The chunks are sent using L{StreamWrite} over a stream opened using
+ L{StreamOpen}.
+
+ @return: The identifier of the stream over which the chunks were sent.
+ """
+ streamId = (await amp.callRemote(StreamOpen))["streamId"]
+ assert isinstance(streamId, int)
+
+ for oneChunk in chunks:
+ await amp.callRemote(StreamWrite, streamId=streamId, data=oneChunk)
+ return streamId
diff --git a/contrib/python/Twisted/py3/twisted/trial/_dist/worker.py b/contrib/python/Twisted/py3/twisted/trial/_dist/worker.py
new file mode 100644
index 0000000000..77e502173a
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/trial/_dist/worker.py
@@ -0,0 +1,465 @@
+# -*- test-case-name: twisted.trial._dist.test.test_worker -*-
+#
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+This module implements the worker classes.
+
+@since: 12.3
+"""
+
+import os
+from typing import Any, Awaitable, Callable, Dict, List, Optional, TextIO, TypeVar
+from unittest import TestCase
+
+from zope.interface import implementer
+
+from attrs import frozen
+from typing_extensions import Protocol, TypedDict
+
+from twisted.internet.defer import Deferred, DeferredList
+from twisted.internet.error import ProcessDone
+from twisted.internet.interfaces import IAddress, ITransport
+from twisted.internet.protocol import ProcessProtocol
+from twisted.logger import Logger
+from twisted.protocols.amp import AMP
+from twisted.python.failure import Failure
+from twisted.python.filepath import FilePath
+from twisted.python.reflect import namedObject
+from twisted.trial._dist import (
+ _WORKER_AMP_STDIN,
+ _WORKER_AMP_STDOUT,
+ managercommands,
+ workercommands,
+)
+from twisted.trial._dist.workerreporter import WorkerReporter
+from twisted.trial.reporter import TestResult
+from twisted.trial.runner import TestLoader, TrialSuite
+from twisted.trial.unittest import Todo
+from .stream import StreamOpen, StreamReceiver, StreamWrite
+
+
+@frozen(auto_exc=False)
+class WorkerException(Exception):
+ """
+ An exception was reported by a test running in a worker process.
+
+ @ivar message: An error message describing the exception.
+ """
+
+ message: str
+
+
+class RunResult(TypedDict):
+ """
+ Represent the result of a L{workercommands.Run} command.
+ """
+
+ success: bool
+
+
+class Worker(Protocol):
+ """
+ An object that can run actions.
+ """
+
+ async def run(self, case: TestCase, result: TestResult) -> RunResult:
+ """
+ Run a test case.
+ """
+
+
+_T = TypeVar("_T")
+WorkerAction = Callable[[Worker], Awaitable[_T]]
+
+
+class WorkerProtocol(AMP):
+ """
+ The worker-side trial distributed protocol.
+ """
+
+ logger = Logger()
+
+ def __init__(self, forceGarbageCollection=False):
+ self._loader = TestLoader()
+ self._result = WorkerReporter(self)
+ self._forceGarbageCollection = forceGarbageCollection
+
+ @workercommands.Run.responder
+ async def run(self, testCase: str) -> RunResult:
+ """
+ Run a test case by name.
+ """
+ with self._result.gatherReportingResults() as results:
+ case = self._loader.loadByName(testCase)
+ suite = TrialSuite([case], self._forceGarbageCollection)
+ suite.run(self._result)
+
+ allSucceeded = True
+ for success, result in await DeferredList(results, consumeErrors=True):
+ if success:
+ # Nothing to do here, proceed to the next result.
+ continue
+
+ # There was some error reporting a result to the peer.
+ allSucceeded = False
+
+ # We can try to report the error but since something has already
+ # gone wrong we shouldn't be extremely confident that this will
+ # succeed. So we will also log it (and any errors reporting *it*)
+ # to our local log.
+ self.logger.failure(
+ "Result reporting for {id} failed",
+ # The DeferredList type annotation assumes all results succeed
+ failure=result, # type: ignore[arg-type]
+ id=testCase,
+ )
+ try:
+ await self._result.addErrorFallible(
+ testCase,
+ # The DeferredList type annotation assumes all results succeed
+ result, # type: ignore[arg-type]
+ )
+ except BaseException:
+ # We failed to report the failure to the peer. It doesn't
+ # seem very likely that reporting this new failure to the peer
+ # will succeed so just log it locally.
+ self.logger.failure(
+ "Additionally, reporting the reporting failure failed."
+ )
+
+ return {"success": allSucceeded}
+
+ @workercommands.Start.responder
+ def start(self, directory):
+ """
+ Set up the worker, moving into given directory for tests to run in
+ them.
+ """
+ os.chdir(directory)
+ return {"success": True}
+
+
+class LocalWorkerAMP(AMP):
+ """
+ Local implementation of the manager commands.
+ """
+
+ def __init__(self, boxReceiver=None, locator=None):
+ super().__init__(boxReceiver, locator)
+ self._streams = StreamReceiver()
+
+ @StreamOpen.responder
+ def streamOpen(self):
+ return {"streamId": self._streams.open()}
+
+ @StreamWrite.responder
+ def streamWrite(self, streamId, data):
+ self._streams.write(streamId, data)
+ return {}
+
+ @managercommands.AddSuccess.responder
+ def addSuccess(self, testName):
+ """
+ Add a success to the reporter.
+ """
+ self._result.addSuccess(self._testCase)
+ return {"success": True}
+
+ def _buildFailure(
+ self,
+ error: WorkerException,
+ errorClass: str,
+ frames: List[str],
+ ) -> Failure:
+ """
+ Helper to build a C{Failure} with some traceback.
+
+ @param error: An C{Exception} instance.
+
+ @param errorClass: The class name of the C{error} class.
+
+ @param frames: A flat list of strings representing the information need
+ to approximatively rebuild C{Failure} frames.
+
+ @return: A L{Failure} instance with enough information about a test
+ error.
+ """
+ errorType = namedObject(errorClass)
+ failure = Failure(error, errorType)
+ for i in range(0, len(frames), 3):
+ failure.frames.append(
+ (frames[i], frames[i + 1], int(frames[i + 2]), [], [])
+ )
+ return failure
+
+ @managercommands.AddError.responder
+ def addError(
+ self,
+ testName: str,
+ errorClass: str,
+ errorStreamId: int,
+ framesStreamId: int,
+ ) -> Dict[str, bool]:
+ """
+ Add an error to the reporter.
+
+ @param errorStreamId: The identifier of a stream over which the text
+ of this error was previously completely sent to the peer.
+
+ @param framesStreamId: The identifier of a stream over which the lines
+ of the traceback for this error were previously completely sent to
+ the peer.
+
+ @param error: A message describing the error.
+ """
+ error = b"".join(self._streams.finish(errorStreamId)).decode("utf-8")
+ frames = [
+ frame.decode("utf-8") for frame in self._streams.finish(framesStreamId)
+ ]
+ # Wrap the error message in ``WorkerException`` because it is not
+ # possible to transfer arbitrary exception values over the AMP
+ # connection to the main process but we must give *some* Exception
+ # (not a str) to the test result object.
+ failure = self._buildFailure(WorkerException(error), errorClass, frames)
+ self._result.addError(self._testCase, failure)
+ return {"success": True}
+
+ @managercommands.AddFailure.responder
+ def addFailure(
+ self,
+ testName: str,
+ failStreamId: int,
+ failClass: str,
+ framesStreamId: int,
+ ) -> Dict[str, bool]:
+ """
+ Add a failure to the reporter.
+
+ @param failStreamId: The identifier of a stream over which the text of
+ this failure was previously completely sent to the peer.
+
+ @param framesStreamId: The identifier of a stream over which the lines
+ of the traceback for this error were previously completely sent to the
+ peer.
+ """
+ fail = b"".join(self._streams.finish(failStreamId)).decode("utf-8")
+ frames = [
+ frame.decode("utf-8") for frame in self._streams.finish(framesStreamId)
+ ]
+ # See addError for info about use of WorkerException here.
+ failure = self._buildFailure(WorkerException(fail), failClass, frames)
+ self._result.addFailure(self._testCase, failure)
+ return {"success": True}
+
+ @managercommands.AddSkip.responder
+ def addSkip(self, testName, reason):
+ """
+ Add a skip to the reporter.
+ """
+ self._result.addSkip(self._testCase, reason)
+ return {"success": True}
+
+ @managercommands.AddExpectedFailure.responder
+ def addExpectedFailure(
+ self, testName: str, errorStreamId: int, todo: Optional[str]
+ ) -> Dict[str, bool]:
+ """
+ Add an expected failure to the reporter.
+
+ @param errorStreamId: The identifier of a stream over which the text
+ of this error was previously completely sent to the peer.
+ """
+ error = b"".join(self._streams.finish(errorStreamId)).decode("utf-8")
+ _todo = Todo("<unknown>" if todo is None else todo)
+ self._result.addExpectedFailure(self._testCase, error, _todo)
+ return {"success": True}
+
+ @managercommands.AddUnexpectedSuccess.responder
+ def addUnexpectedSuccess(self, testName, todo):
+ """
+ Add an unexpected success to the reporter.
+ """
+ self._result.addUnexpectedSuccess(self._testCase, todo)
+ return {"success": True}
+
+ @managercommands.TestWrite.responder
+ def testWrite(self, out):
+ """
+ Print test output from the worker.
+ """
+ self._testStream.write(out + "\n")
+ self._testStream.flush()
+ return {"success": True}
+
+ async def run(self, testCase: TestCase, result: TestResult) -> RunResult:
+ """
+ Run a test.
+ """
+ self._testCase = testCase
+ self._result = result
+ self._result.startTest(testCase)
+ testCaseId = testCase.id()
+ try:
+ return await self.callRemote(workercommands.Run, testCase=testCaseId) # type: ignore[no-any-return]
+ finally:
+ self._result.stopTest(testCase)
+
+ def setTestStream(self, stream):
+ """
+ Set the stream used to log output from tests.
+ """
+ self._testStream = stream
+
+
+@implementer(IAddress)
+class LocalWorkerAddress:
+ """
+ A L{IAddress} implementation meant to provide stub addresses for
+ L{ITransport.getPeer} and L{ITransport.getHost}.
+ """
+
+
+@implementer(ITransport)
+class LocalWorkerTransport:
+ """
+ A stub transport implementation used to support L{AMP} over a
+ L{ProcessProtocol} transport.
+ """
+
+ def __init__(self, transport):
+ self._transport = transport
+
+ def write(self, data):
+ """
+ Forward data to transport.
+ """
+ self._transport.writeToChild(_WORKER_AMP_STDIN, data)
+
+ def writeSequence(self, sequence):
+ """
+ Emulate C{writeSequence} by iterating data in the C{sequence}.
+ """
+ for data in sequence:
+ self._transport.writeToChild(_WORKER_AMP_STDIN, data)
+
+ def loseConnection(self):
+ """
+ Closes the transport.
+ """
+ self._transport.loseConnection()
+
+ def getHost(self):
+ """
+ Return a L{LocalWorkerAddress} instance.
+ """
+ return LocalWorkerAddress()
+
+ def getPeer(self):
+ """
+ Return a L{LocalWorkerAddress} instance.
+ """
+ return LocalWorkerAddress()
+
+
+class NotRunning(Exception):
+ """
+ An operation was attempted on a worker process which is not running.
+ """
+
+
+class LocalWorker(ProcessProtocol):
+ """
+ Local process worker protocol. This worker runs as a local process and
+ communicates via stdin/out.
+
+ @ivar _ampProtocol: The L{AMP} protocol instance used to communicate with
+ the worker.
+
+ @ivar _logDirectory: The directory where logs will reside.
+
+ @ivar _logFile: The main log file for tests output.
+ """
+
+ def __init__(
+ self,
+ ampProtocol: LocalWorkerAMP,
+ logDirectory: FilePath[Any],
+ logFile: TextIO,
+ ):
+ self._ampProtocol = ampProtocol
+ self._logDirectory = logDirectory
+ self._logFile = logFile
+ self.endDeferred: Deferred[None] = Deferred()
+
+ async def exit(self) -> None:
+ """
+ Cause the worker process to exit.
+ """
+ if self.transport is None:
+ raise NotRunning()
+
+ endDeferred = self.endDeferred
+ self.transport.closeChildFD(_WORKER_AMP_STDIN)
+ try:
+ await endDeferred
+ except ProcessDone:
+ pass
+
+ def connectionMade(self):
+ """
+ When connection is made, create the AMP protocol instance.
+ """
+ self._ampProtocol.makeConnection(LocalWorkerTransport(self.transport))
+ self._logDirectory.makedirs(ignoreExistingDirectory=True)
+ self._outLog = self._logDirectory.child("out.log").open("w")
+ self._errLog = self._logDirectory.child("err.log").open("w")
+ self._ampProtocol.setTestStream(self._logFile)
+ d = self._ampProtocol.callRemote(
+ workercommands.Start,
+ directory=self._logDirectory.path,
+ )
+ # Ignore the potential errors, the test suite will fail properly and it
+ # would just print garbage.
+ d.addErrback(lambda x: None)
+
+ def connectionLost(self, reason):
+ """
+ On connection lost, close the log files that we're managing for stdin
+ and stdout.
+ """
+ self._outLog.close()
+ self._errLog.close()
+ self.transport = None
+
+ def processEnded(self, reason: Failure) -> None:
+ """
+ When the process closes, call C{connectionLost} for cleanup purposes
+ and forward the information to the C{_ampProtocol}.
+ """
+ self.connectionLost(reason)
+ self._ampProtocol.connectionLost(reason)
+ self.endDeferred.callback(reason)
+
+ def outReceived(self, data):
+ """
+ Send data received from stdout to log.
+ """
+
+ self._outLog.write(data)
+
+ def errReceived(self, data):
+ """
+ Write error data to log.
+ """
+ self._errLog.write(data)
+
+ def childDataReceived(self, childFD, data):
+ """
+ Handle data received on the specific pipe for the C{_ampProtocol}.
+ """
+ if childFD == _WORKER_AMP_STDOUT:
+ self._ampProtocol.dataReceived(data)
+ else:
+ ProcessProtocol.childDataReceived(self, childFD, data)
diff --git a/contrib/python/Twisted/py3/twisted/trial/_dist/workercommands.py b/contrib/python/Twisted/py3/twisted/trial/_dist/workercommands.py
new file mode 100644
index 0000000000..f7d8d26b2e
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/trial/_dist/workercommands.py
@@ -0,0 +1,30 @@
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Commands for telling a worker to load tests or run tests.
+
+@since: 12.3
+"""
+
+from twisted.protocols.amp import Boolean, Command, Unicode
+
+NativeString = Unicode
+
+
+class Run(Command):
+ """
+ Run a test.
+ """
+
+ arguments = [(b"testCase", NativeString())]
+ response = [(b"success", Boolean())]
+
+
+class Start(Command):
+ """
+ Set up the worker process, giving the running directory.
+ """
+
+ arguments = [(b"directory", NativeString())]
+ response = [(b"success", Boolean())]
diff --git a/contrib/python/Twisted/py3/twisted/trial/_dist/workerreporter.py b/contrib/python/Twisted/py3/twisted/trial/_dist/workerreporter.py
new file mode 100644
index 0000000000..5f2d7a0cab
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/trial/_dist/workerreporter.py
@@ -0,0 +1,354 @@
+# -*- test-case-name: twisted.trial._dist.test.test_workerreporter -*-
+#
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Test reporter forwarding test results over trial distributed AMP commands.
+
+@since: 12.3
+"""
+
+from types import TracebackType
+from typing import Callable, List, Optional, Sequence, Type, TypeVar
+from unittest import TestCase as PyUnitTestCase
+
+from attrs import Factory, define
+from typing_extensions import Literal
+
+from twisted.internet.defer import Deferred, maybeDeferred
+from twisted.protocols.amp import AMP, MAX_VALUE_LENGTH
+from twisted.python.failure import Failure
+from twisted.python.reflect import qual
+from twisted.trial._dist import managercommands
+from twisted.trial.reporter import TestResult
+from ..reporter import TrialFailure
+from .stream import chunk, stream
+
+T = TypeVar("T")
+
+
+async def addError(
+ amp: AMP, testName: str, errorClass: str, error: str, frames: List[str]
+) -> None:
+ """
+ Send an error to the worker manager over an AMP connection.
+
+ First the pieces which can be large are streamed over the connection.
+ Then, L{managercommands.AddError} is called with the rest of the
+ information and the stream IDs.
+
+ :param amp: The connection to use.
+ :param testName: The name (or ID) of the test the error relates to.
+ :param errorClass: The fully qualified name of the error type.
+ :param error: The string representation of the error.
+ :param frames: The lines of the traceback associated with the error.
+ """
+
+ errorStreamId = await stream(amp, chunk(error.encode("utf-8"), MAX_VALUE_LENGTH))
+ framesStreamId = await stream(amp, (frame.encode("utf-8") for frame in frames))
+
+ await amp.callRemote(
+ managercommands.AddError,
+ testName=testName,
+ errorClass=errorClass,
+ errorStreamId=errorStreamId,
+ framesStreamId=framesStreamId,
+ )
+
+
+async def addFailure(
+ amp: AMP, testName: str, fail: str, failClass: str, frames: List[str]
+) -> None:
+ """
+ Like L{addError} but for failures.
+
+ :param amp: See L{addError}
+ :param testName: See L{addError}
+ :param failClass: The fully qualified name of the exception associated
+ with the failure.
+ :param fail: The string representation of the failure.
+ :param frames: The lines of the traceback associated with the error.
+ """
+ failStreamId = await stream(amp, chunk(fail.encode("utf-8"), MAX_VALUE_LENGTH))
+ framesStreamId = await stream(amp, (frame.encode("utf-8") for frame in frames))
+
+ await amp.callRemote(
+ managercommands.AddFailure,
+ testName=testName,
+ failClass=failClass,
+ failStreamId=failStreamId,
+ framesStreamId=framesStreamId,
+ )
+
+
+async def addExpectedFailure(amp: AMP, testName: str, error: str, todo: str) -> None:
+ """
+ Like L{addError} but for expected failures.
+
+ :param amp: See L{addError}
+ :param testName: See L{addError}
+ :param error: The string representation of the expected failure.
+ :param todo: The string description of the expectation.
+ """
+ errorStreamId = await stream(amp, chunk(error.encode("utf-8"), MAX_VALUE_LENGTH))
+
+ await amp.callRemote(
+ managercommands.AddExpectedFailure,
+ testName=testName,
+ errorStreamId=errorStreamId,
+ todo=todo,
+ )
+
+
+@define
+class ReportingResults:
+ """
+ A mutable container for the result of sending test results back to the
+ parent process.
+
+ Since it is possible for these sends to fail asynchronously but the
+ L{TestResult} protocol is not well suited for asynchronous result
+ reporting, results are collected on an instance of this class and when the
+ runner believes the test is otherwise complete, it can collect the results
+ and do something with any errors.
+
+ :ivar _reporter: The L{WorkerReporter} this object is associated with.
+ This is the object doing the result reporting.
+
+ :ivar _results: A list of L{Deferred} instances representing the results
+ of reporting operations. This is expected to grow over the course of
+ the test run and then be inspected by the runner once the test is
+ over. The public interface to this list is via the context manager
+ interface.
+ """
+
+ _reporter: "WorkerReporter"
+ _results: List[Deferred[object]] = Factory(list)
+
+ def __enter__(self) -> Sequence[Deferred[object]]:
+ """
+ Begin a new reportable context in which results can be collected.
+
+ :return: A sequence which will contain the L{Deferred} instances
+ representing the results of all test result reporting that happens
+ while the context manager is active. The sequence is extended as
+ the test runs so its value should not be consumed until the test
+ is over.
+ """
+ return self._results
+
+ def __exit__(
+ self,
+ excType: Type[BaseException],
+ excValue: BaseException,
+ excTraceback: TracebackType,
+ ) -> Literal[False]:
+ """
+ End the reportable context.
+ """
+ self._reporter._reporting = None
+ return False
+
+ def record(self, result: Deferred[object]) -> None:
+ """
+ Record a L{Deferred} instance representing one test result reporting
+ operation.
+ """
+ self._results.append(result)
+
+
+class WorkerReporter(TestResult):
+ """
+ Reporter for trial's distributed workers. We send things not through a
+ stream, but through an C{AMP} protocol's C{callRemote} method.
+
+ @ivar _DEFAULT_TODO: Default message for expected failures and
+ unexpected successes, used only if a C{Todo} is not provided.
+
+ @ivar _reporting: When a "result reporting" context is active, the
+ corresponding context manager. Otherwise, L{None}.
+ """
+
+ _DEFAULT_TODO = "Test expected to fail"
+
+ ampProtocol: AMP
+ _reporting: Optional[ReportingResults] = None
+
+ def __init__(self, ampProtocol):
+ """
+ @param ampProtocol: The communication channel with the trial
+ distributed manager which collects all test results.
+ """
+ super().__init__()
+ self.ampProtocol = ampProtocol
+
+ def gatherReportingResults(self) -> ReportingResults:
+ """
+ Get a "result reporting" context manager.
+
+ In a "result reporting" context, asynchronous test result reporting
+ methods may be used safely. Their results (in particular, failures)
+ are available from the context manager.
+ """
+ self._reporting = ReportingResults(self)
+ return self._reporting
+
+ def _getFailure(self, error: TrialFailure) -> Failure:
+ """
+ Convert a C{sys.exc_info()}-style tuple to a L{Failure}, if necessary.
+ """
+ if isinstance(error, tuple):
+ return Failure(error[1], error[0], error[2])
+ return error
+
+ def _getFrames(self, failure: Failure) -> List[str]:
+ """
+ Extract frames from a C{Failure} instance.
+ """
+ frames: List[str] = []
+ for frame in failure.frames:
+ # The code object's name, the code object's filename, and the line
+ # number.
+ frames.extend([frame[0], frame[1], str(frame[2])])
+ return frames
+
+ def _call(self, f: Callable[[], T]) -> None:
+ """
+ Call L{f} if and only if a "result reporting" context is active.
+
+ @param f: A function to call. Its result is accumulated into the
+ result reporting context. It may return a L{Deferred} or a
+ coroutine or synchronously raise an exception or return a result
+ value.
+
+ @raise ValueError: If no result reporting context is active.
+ """
+ if self._reporting is not None:
+ self._reporting.record(maybeDeferred(f))
+ else:
+ raise ValueError(
+ "Cannot call command outside of reporting context manager."
+ )
+
+ def addSuccess(self, test: PyUnitTestCase) -> None:
+ """
+ Send a success to the parent process.
+
+ This must be called in context managed by L{gatherReportingResults}.
+ """
+ super().addSuccess(test)
+ testName = test.id()
+ self._call(
+ lambda: self.ampProtocol.callRemote(
+ managercommands.AddSuccess, testName=testName
+ )
+ )
+
+ async def addErrorFallible(self, testName: str, errorObj: TrialFailure) -> None:
+ """
+ Attempt to report an error to the parent process.
+
+ Unlike L{addError} this can fail asynchronously. This version is for
+ infrastructure code that can apply its own failure handling.
+
+ @return: A L{Deferred} that fires with the result of the attempt.
+ """
+ failure = self._getFailure(errorObj)
+ errorStr = failure.getErrorMessage()
+ errorClass = qual(failure.type)
+ frames = self._getFrames(failure)
+ await addError(
+ self.ampProtocol,
+ testName,
+ errorClass,
+ errorStr,
+ frames,
+ )
+
+ def addError(self, test: PyUnitTestCase, error: TrialFailure) -> None:
+ """
+ Send an error to the parent process.
+ """
+ super().addError(test, error)
+ testName = test.id()
+ self._call(lambda: self.addErrorFallible(testName, error))
+
+ def addFailure(self, test: PyUnitTestCase, fail: TrialFailure) -> None:
+ """
+ Send a Failure over.
+ """
+ super().addFailure(test, fail)
+ testName = test.id()
+ failure = self._getFailure(fail)
+ failureMessage = failure.getErrorMessage()
+ failClass = qual(failure.type)
+ frames = self._getFrames(failure)
+ self._call(
+ lambda: addFailure(
+ self.ampProtocol,
+ testName,
+ failureMessage,
+ failClass,
+ frames,
+ ),
+ )
+
+ def addSkip(self, test, reason):
+ """
+ Send a skip over.
+ """
+ super().addSkip(test, reason)
+ reason = str(reason)
+ testName = test.id()
+ self._call(
+ lambda: self.ampProtocol.callRemote(
+ managercommands.AddSkip, testName=testName, reason=reason
+ )
+ )
+
+ def _getTodoReason(self, todo):
+ """
+ Get the reason for a C{Todo}.
+
+ If C{todo} is L{None}, return a sensible default.
+ """
+ if todo is None:
+ return self._DEFAULT_TODO
+ else:
+ return todo.reason
+
+ def addExpectedFailure(self, test, error, todo=None):
+ """
+ Send an expected failure over.
+ """
+ super().addExpectedFailure(test, error, todo)
+ errorMessage = error.getErrorMessage()
+ testName = test.id()
+ self._call(
+ lambda: addExpectedFailure(
+ self.ampProtocol,
+ testName=testName,
+ error=errorMessage,
+ todo=self._getTodoReason(todo),
+ )
+ )
+
+ def addUnexpectedSuccess(self, test, todo=None):
+ """
+ Send an unexpected success over.
+ """
+ super().addUnexpectedSuccess(test, todo)
+ testName = test.id()
+ self._call(
+ lambda: self.ampProtocol.callRemote(
+ managercommands.AddUnexpectedSuccess,
+ testName=testName,
+ todo=self._getTodoReason(todo),
+ )
+ )
+
+ def printSummary(self):
+ """
+ I{Don't} print a summary
+ """
diff --git a/contrib/python/Twisted/py3/twisted/trial/_dist/workertrial.py b/contrib/python/Twisted/py3/twisted/trial/_dist/workertrial.py
new file mode 100644
index 0000000000..847dbc51a6
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/trial/_dist/workertrial.py
@@ -0,0 +1,93 @@
+# -*- test-case-name: twisted.trial._dist.test.test_workertrial -*-
+#
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Implementation of C{AMP} worker commands, and main executable entry point for
+the workers.
+
+@since: 12.3
+"""
+
+import errno
+import os
+import sys
+
+from twisted.internet.protocol import FileWrapper
+from twisted.python.log import startLoggingWithObserver, textFromEventDict
+from twisted.trial._dist import _WORKER_AMP_STDIN, _WORKER_AMP_STDOUT
+from twisted.trial._dist.options import WorkerOptions
+
+
+class WorkerLogObserver:
+ """
+ A log observer that forward its output to a C{AMP} protocol.
+ """
+
+ def __init__(self, protocol):
+ """
+ @param protocol: a connected C{AMP} protocol instance.
+ @type protocol: C{AMP}
+ """
+ self.protocol = protocol
+
+ def emit(self, eventDict):
+ """
+ Produce a log output.
+ """
+ from twisted.trial._dist import managercommands
+
+ text = textFromEventDict(eventDict)
+ if text is None:
+ return
+ self.protocol.callRemote(managercommands.TestWrite, out=text)
+
+
+def main(_fdopen=os.fdopen):
+ """
+ Main function to be run if __name__ == "__main__".
+
+ @param _fdopen: If specified, the function to use in place of C{os.fdopen}.
+ @type _fdopen: C{callable}
+ """
+ config = WorkerOptions()
+ config.parseOptions()
+
+ from twisted.trial._dist.worker import WorkerProtocol
+
+ workerProtocol = WorkerProtocol(config["force-gc"])
+
+ protocolIn = _fdopen(_WORKER_AMP_STDIN, "rb")
+ protocolOut = _fdopen(_WORKER_AMP_STDOUT, "wb")
+ workerProtocol.makeConnection(FileWrapper(protocolOut))
+
+ observer = WorkerLogObserver(workerProtocol)
+ startLoggingWithObserver(observer.emit, False)
+
+ while True:
+ try:
+ r = protocolIn.read(1)
+ except OSError as e:
+ if e.args[0] == errno.EINTR:
+ continue
+ else:
+ raise
+ if r == b"":
+ break
+ else:
+ workerProtocol.dataReceived(r)
+ protocolOut.flush()
+ sys.stdout.flush()
+ sys.stderr.flush()
+
+ if config.tracer:
+ sys.settrace(None)
+ results = config.tracer.results()
+ results.write_results(
+ show_missing=True, summary=False, coverdir=config.coverdir().path
+ )
+
+
+if __name__ == "__main__":
+ main()