aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/scripts/trial.py
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2023-11-26 18:16:14 +0300
committershmel1k <shmel1k@ydb.tech>2023-11-26 18:43:30 +0300
commitb8cf9e88f4c5c64d9406af533d8948deb050d695 (patch)
tree218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py3/twisted/scripts/trial.py
parent523f645a83a0ec97a0332dbc3863bb354c92a328 (diff)
downloadydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/scripts/trial.py')
-rw-r--r--contrib/python/Twisted/py3/twisted/scripts/trial.py654
1 files changed, 654 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py3/twisted/scripts/trial.py b/contrib/python/Twisted/py3/twisted/scripts/trial.py
new file mode 100644
index 0000000000..531fe46ce1
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/scripts/trial.py
@@ -0,0 +1,654 @@
+# -*- test-case-name: twisted.trial.test.test_script -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+
+import gc
+import inspect
+import os
+import pdb
+import random
+import sys
+import time
+import trace
+import warnings
+from typing import NoReturn, Optional, Type
+
+from twisted import plugin
+from twisted.application import app
+from twisted.internet import defer
+from twisted.python import failure, reflect, usage
+from twisted.python.filepath import FilePath
+from twisted.python.reflect import namedModule
+from twisted.trial import itrial, runner
+from twisted.trial._dist.disttrial import DistTrialRunner
+from twisted.trial.unittest import TestSuite
+
+# Yea, this is stupid. Leave it for command-line compatibility for a
+# while, though.
+TBFORMAT_MAP = {
+ "plain": "default",
+ "default": "default",
+ "emacs": "brief",
+ "brief": "brief",
+ "cgitb": "verbose",
+ "verbose": "verbose",
+}
+
+
+def _parseLocalVariables(line):
+ """
+ Accepts a single line in Emacs local variable declaration format and
+ returns a dict of all the variables {name: value}.
+ Raises ValueError if 'line' is in the wrong format.
+
+ See http://www.gnu.org/software/emacs/manual/html_node/File-Variables.html
+ """
+ paren = "-*-"
+ start = line.find(paren) + len(paren)
+ end = line.rfind(paren)
+ if start == -1 or end == -1:
+ raise ValueError(f"{line!r} not a valid local variable declaration")
+ items = line[start:end].split(";")
+ localVars = {}
+ for item in items:
+ if len(item.strip()) == 0:
+ continue
+ split = item.split(":")
+ if len(split) != 2:
+ raise ValueError(f"{line!r} contains invalid declaration {item!r}")
+ localVars[split[0].strip()] = split[1].strip()
+ return localVars
+
+
+def loadLocalVariables(filename):
+ """
+ Accepts a filename and attempts to load the Emacs variable declarations
+ from that file, simulating what Emacs does.
+
+ See http://www.gnu.org/software/emacs/manual/html_node/File-Variables.html
+ """
+ with open(filename) as f:
+ lines = [f.readline(), f.readline()]
+ for line in lines:
+ try:
+ return _parseLocalVariables(line)
+ except ValueError:
+ pass
+ return {}
+
+
+def getTestModules(filename):
+ testCaseVar = loadLocalVariables(filename).get("test-case-name", None)
+ if testCaseVar is None:
+ return []
+ return testCaseVar.split(",")
+
+
+def isTestFile(filename):
+ """
+ Returns true if 'filename' looks like a file containing unit tests.
+ False otherwise. Doesn't care whether filename exists.
+ """
+ basename = os.path.basename(filename)
+ return basename.startswith("test_") and os.path.splitext(basename)[1] == (".py")
+
+
+def _reporterAction():
+ return usage.CompleteList([p.longOpt for p in plugin.getPlugins(itrial.IReporter)])
+
+
+def _maybeFindSourceLine(testThing):
+ """
+ Try to find the source line of the given test thing.
+
+ @param testThing: the test item to attempt to inspect
+ @type testThing: an L{TestCase}, test method, or module, though only the
+ former two have a chance to succeed
+ @rtype: int
+ @return: the starting source line, or -1 if one couldn't be found
+ """
+
+ # an instance of L{TestCase} -- locate the test it will run
+ method = getattr(testThing, "_testMethodName", None)
+ if method is not None:
+ testThing = getattr(testThing, method)
+
+ # If it's a function, we can get the line number even if the source file no
+ # longer exists
+ code = getattr(testThing, "__code__", None)
+ if code is not None:
+ return code.co_firstlineno
+
+ try:
+ return inspect.getsourcelines(testThing)[1]
+ except (OSError, TypeError):
+ # either testThing is a module, which raised a TypeError, or the file
+ # couldn't be read
+ return -1
+
+
+# orders which can be passed to trial --order
+_runOrders = {
+ "alphabetical": (
+ "alphabetical order for test methods, arbitrary order for test cases",
+ runner.name,
+ ),
+ "toptobottom": (
+ "attempt to run test cases and methods in the order they were defined",
+ _maybeFindSourceLine,
+ ),
+}
+
+
+def _checkKnownRunOrder(order):
+ """
+ Check that the given order is a known test running order.
+
+ Does nothing else, since looking up the appropriate callable to sort the
+ tests should be done when it actually will be used, as the default argument
+ will not be coerced by this function.
+
+ @param order: one of the known orders in C{_runOrders}
+ @return: the order unmodified
+ """
+ if order not in _runOrders:
+ raise usage.UsageError(
+ "--order must be one of: %s. See --help-orders for details"
+ % (", ".join(repr(order) for order in _runOrders),)
+ )
+ return order
+
+
+class _BasicOptions:
+ """
+ Basic options shared between trial and its local workers.
+ """
+
+ longdesc = (
+ "trial loads and executes a suite of unit tests, obtained "
+ "from modules, packages and files listed on the command line."
+ )
+
+ optFlags = [
+ ["help", "h"],
+ ["no-recurse", "N", "Don't recurse into packages"],
+ ["help-orders", None, "Help on available test running orders"],
+ ["help-reporters", None, "Help on available output plugins (reporters)"],
+ [
+ "rterrors",
+ "e",
+ "realtime errors, print out tracebacks as " "soon as they occur",
+ ],
+ ["unclean-warnings", None, "Turn dirty reactor errors into warnings"],
+ [
+ "force-gc",
+ None,
+ "Have Trial run gc.collect() before and " "after each test case.",
+ ],
+ [
+ "exitfirst",
+ "x",
+ "Exit after the first non-successful result (cannot be "
+ "specified along with --jobs).",
+ ],
+ ]
+
+ optParameters = [
+ [
+ "order",
+ "o",
+ None,
+ "Specify what order to run test cases and methods. "
+ "See --help-orders for more info.",
+ _checkKnownRunOrder,
+ ],
+ ["random", "z", None, "Run tests in random order using the specified seed"],
+ [
+ "temp-directory",
+ None,
+ "_trial_temp",
+ "Path to use as working directory for tests.",
+ ],
+ [
+ "reporter",
+ None,
+ "verbose",
+ "The reporter to use for this test run. See --help-reporters for "
+ "more info.",
+ ],
+ ]
+
+ compData = usage.Completions(
+ optActions={
+ "order": usage.CompleteList(_runOrders),
+ "reporter": _reporterAction,
+ "logfile": usage.CompleteFiles(descr="log file name"),
+ "random": usage.Completer(descr="random seed"),
+ },
+ extraActions=[
+ usage.CompleteFiles(
+ "*.py",
+ descr="file | module | package | TestCase | testMethod",
+ repeat=True,
+ )
+ ],
+ )
+
+ tracer: Optional[trace.Trace] = None
+
+ def __init__(self):
+ self["tests"] = []
+ usage.Options.__init__(self)
+
+ def getSynopsis(self):
+ executableName = reflect.filenameToModuleName(sys.argv[0])
+
+ if executableName.endswith(".__main__"):
+ executableName = "{} -m {}".format(
+ os.path.basename(sys.executable),
+ executableName.replace(".__main__", ""),
+ )
+
+ return """{} [options] [[file|package|module|TestCase|testmethod]...]
+ """.format(
+ executableName,
+ )
+
+ def coverdir(self):
+ """
+ Return a L{FilePath} representing the directory into which coverage
+ results should be written.
+ """
+ coverdir = "coverage"
+ result = FilePath(self["temp-directory"]).child(coverdir)
+ print(f"Setting coverage directory to {result.path}.")
+ return result
+
+ # TODO: Some of the opt_* methods on this class have docstrings and some do
+ # not. This is mostly because usage.Options's currently will replace
+ # any intended output in optFlags and optParameters with the
+ # docstring. See #6427. When that is fixed, all methods should be
+ # given docstrings (and it should be verified that those with
+ # docstrings already have content suitable for printing as usage
+ # information).
+
+ def opt_coverage(self):
+ """
+ Generate coverage information in the coverage file in the
+ directory specified by the temp-directory option.
+ """
+ self.tracer = trace.Trace(count=1, trace=0)
+ sys.settrace(self.tracer.globaltrace)
+ self["coverage"] = True
+
+ def opt_testmodule(self, filename):
+ """
+ Filename to grep for test cases (-*- test-case-name).
+ """
+ # If the filename passed to this parameter looks like a test module
+ # we just add that to the test suite.
+ #
+ # If not, we inspect it for an Emacs buffer local variable called
+ # 'test-case-name'. If that variable is declared, we try to add its
+ # value to the test suite as a module.
+ #
+ # This parameter allows automated processes (like Buildbot) to pass
+ # a list of files to Trial with the general expectation of "these files,
+ # whatever they are, will get tested"
+ if not os.path.isfile(filename):
+ sys.stderr.write(f"File {filename!r} doesn't exist\n")
+ return
+ filename = os.path.abspath(filename)
+ if isTestFile(filename):
+ self["tests"].append(filename)
+ else:
+ self["tests"].extend(getTestModules(filename))
+
+ def opt_spew(self):
+ """
+ Print an insanely verbose log of everything that happens. Useful
+ when debugging freezes or locks in complex code.
+ """
+ from twisted.python.util import spewer
+
+ sys.settrace(spewer)
+
+ def opt_help_orders(self):
+ synopsis = (
+ "Trial can attempt to run test cases and their methods in "
+ "a few different orders. You can select any of the "
+ "following options using --order=<foo>.\n"
+ )
+
+ print(synopsis)
+ for name, (description, _) in sorted(_runOrders.items()):
+ print(" ", name, "\t", description)
+ sys.exit(0)
+
+ def opt_help_reporters(self):
+ synopsis = (
+ "Trial's output can be customized using plugins called "
+ "Reporters. You can\nselect any of the following "
+ "reporters using --reporter=<foo>\n"
+ )
+ print(synopsis)
+ for p in plugin.getPlugins(itrial.IReporter):
+ print(" ", p.longOpt, "\t", p.description)
+ sys.exit(0)
+
+ def opt_disablegc(self):
+ """
+ Disable the garbage collector
+ """
+ self["disablegc"] = True
+ gc.disable()
+
+ def opt_tbformat(self, opt):
+ """
+ Specify the format to display tracebacks with. Valid formats are
+ 'plain', 'emacs', and 'cgitb' which uses the nicely verbose stdlib
+ cgitb.text function
+ """
+ try:
+ self["tbformat"] = TBFORMAT_MAP[opt]
+ except KeyError:
+ raise usage.UsageError("tbformat must be 'plain', 'emacs', or 'cgitb'.")
+
+ def opt_recursionlimit(self, arg):
+ """
+ see sys.setrecursionlimit()
+ """
+ try:
+ sys.setrecursionlimit(int(arg))
+ except (TypeError, ValueError):
+ raise usage.UsageError("argument to recursionlimit must be an integer")
+ else:
+ self["recursionlimit"] = int(arg)
+
+ def opt_random(self, option):
+ try:
+ self["random"] = int(option)
+ except ValueError:
+ raise usage.UsageError("Argument to --random must be a positive integer")
+ else:
+ if self["random"] < 0:
+ raise usage.UsageError(
+ "Argument to --random must be a positive integer"
+ )
+ elif self["random"] == 0:
+ self["random"] = int(time.time() * 100)
+
+ def opt_without_module(self, option):
+ """
+ Fake the lack of the specified modules, separated with commas.
+ """
+ self["without-module"] = option
+ for module in option.split(","):
+ if module in sys.modules:
+ warnings.warn(
+ "Module '%s' already imported, " "disabling anyway." % (module,),
+ category=RuntimeWarning,
+ )
+ sys.modules[module] = None
+
+ def parseArgs(self, *args):
+ self["tests"].extend(args)
+
+ def _loadReporterByName(self, name):
+ for p in plugin.getPlugins(itrial.IReporter):
+ qual = f"{p.module}.{p.klass}"
+ if p.longOpt == name:
+ return reflect.namedAny(qual)
+ raise usage.UsageError(
+ "Only pass names of Reporter plugins to "
+ "--reporter. See --help-reporters for "
+ "more info."
+ )
+
+ def postOptions(self):
+ # Only load reporters now, as opposed to any earlier, to avoid letting
+ # application-defined plugins muck up reactor selecting by importing
+ # t.i.reactor and causing the default to be installed.
+ self["reporter"] = self._loadReporterByName(self["reporter"])
+ if "tbformat" not in self:
+ self["tbformat"] = "default"
+ if self["order"] is not None and self["random"] is not None:
+ raise usage.UsageError("You can't specify --random when using --order")
+
+
+class Options(_BasicOptions, usage.Options, app.ReactorSelectionMixin):
+ """
+ Options to the trial command line tool.
+
+ @ivar _workerFlags: List of flags which are accepted by trial distributed
+ workers. This is used by C{_getWorkerArguments} to build the command
+ line arguments.
+ @type _workerFlags: C{list}
+
+ @ivar _workerParameters: List of parameter which are accepted by trial
+ distributed workers. This is used by C{_getWorkerArguments} to build
+ the command line arguments.
+ @type _workerParameters: C{list}
+ """
+
+ optFlags = [
+ [
+ "debug",
+ "b",
+ "Run tests in a debugger. If that debugger is "
+ "pdb, will load '.pdbrc' from current directory if it exists.",
+ ],
+ [
+ "debug-stacktraces",
+ "B",
+ "Report Deferred creation and " "callback stack traces",
+ ],
+ [
+ "nopm",
+ None,
+ "don't automatically jump into debugger for " "postmorteming of exceptions",
+ ],
+ ["dry-run", "n", "do everything but run the tests"],
+ ["profile", None, "Run tests under the Python profiler"],
+ ["until-failure", "u", "Repeat test until it fails"],
+ ]
+
+ optParameters = [
+ [
+ "debugger",
+ None,
+ "pdb",
+ "the fully qualified name of a debugger to " "use if --debug is passed",
+ ],
+ ["logfile", "l", "test.log", "log file name"],
+ ["jobs", "j", None, "Number of local workers to run"],
+ ]
+
+ compData = usage.Completions(
+ optActions={
+ "tbformat": usage.CompleteList(["plain", "emacs", "cgitb"]),
+ "reporter": _reporterAction,
+ },
+ )
+
+ _workerFlags = ["disablegc", "force-gc", "coverage"]
+ _workerParameters = ["recursionlimit", "reactor", "without-module"]
+
+ def opt_jobs(self, number):
+ """
+ Number of local workers to run, a strictly positive integer.
+ """
+ try:
+ number = int(number)
+ except ValueError:
+ raise usage.UsageError(
+ "Expecting integer argument to jobs, got '%s'" % number
+ )
+ if number <= 0:
+ raise usage.UsageError(
+ "Argument to jobs must be a strictly positive integer"
+ )
+ self["jobs"] = number
+
+ def _getWorkerArguments(self):
+ """
+ Return a list of options to pass to distributed workers.
+ """
+ args = []
+ for option in self._workerFlags:
+ if self.get(option) is not None:
+ if self[option]:
+ args.append(f"--{option}")
+ for option in self._workerParameters:
+ if self.get(option) is not None:
+ args.extend([f"--{option}", str(self[option])])
+ return args
+
+ def postOptions(self):
+ _BasicOptions.postOptions(self)
+ if self["jobs"]:
+ conflicts = ["debug", "profile", "debug-stacktraces"]
+ for option in conflicts:
+ if self[option]:
+ raise usage.UsageError(
+ "You can't specify --%s when using --jobs" % option
+ )
+ if self["nopm"]:
+ if not self["debug"]:
+ raise usage.UsageError("You must specify --debug when using " "--nopm ")
+ failure.DO_POST_MORTEM = False
+
+
+def _initialDebugSetup(config: Options) -> None:
+ # do this part of debug setup first for easy debugging of import failures
+ if config["debug"]:
+ failure.startDebugMode()
+ if config["debug"] or config["debug-stacktraces"]:
+ defer.setDebugging(True)
+
+
+def _getSuite(config: Options) -> TestSuite:
+ loader = _getLoader(config)
+ recurse = not config["no-recurse"]
+ return loader.loadByNames(config["tests"], recurse=recurse)
+
+
+def _getLoader(config: Options) -> runner.TestLoader:
+ loader = runner.TestLoader()
+ if config["random"]:
+ randomer = random.Random()
+ randomer.seed(config["random"])
+ loader.sorter = lambda x: randomer.random()
+ print("Running tests shuffled with seed %d\n" % config["random"])
+ elif config["order"]:
+ _, sorter = _runOrders[config["order"]]
+ loader.sorter = sorter
+ if not config["until-failure"]:
+ loader.suiteFactory = runner.DestructiveTestSuite
+ return loader
+
+
+def _wrappedPdb():
+ """
+ Wrap an instance of C{pdb.Pdb} with readline support and load any .rcs.
+
+ """
+
+ dbg = pdb.Pdb()
+ try:
+ namedModule("readline")
+ except ImportError:
+ print("readline module not available")
+ for path in (".pdbrc", "pdbrc"):
+ if os.path.exists(path):
+ try:
+ rcFile = open(path)
+ except OSError:
+ pass
+ else:
+ with rcFile:
+ dbg.rcLines.extend(rcFile.readlines())
+ return dbg
+
+
+class _DebuggerNotFound(Exception):
+ """
+ A debugger import failed.
+
+ Used to allow translating these errors into usage error messages.
+
+ """
+
+
+def _makeRunner(config: Options) -> runner._Runner:
+ """
+ Return a trial runner class set up with the parameters extracted from
+ C{config}.
+
+ @return: A trial runner instance.
+ """
+ cls: Type[runner._Runner] = runner.TrialRunner
+ args = {
+ "reporterFactory": config["reporter"],
+ "tracebackFormat": config["tbformat"],
+ "realTimeErrors": config["rterrors"],
+ "uncleanWarnings": config["unclean-warnings"],
+ "logfile": config["logfile"],
+ "workingDirectory": config["temp-directory"],
+ "exitFirst": config["exitfirst"],
+ }
+ if config["dry-run"]:
+ args["mode"] = runner.TrialRunner.DRY_RUN
+ elif config["jobs"]:
+ cls = DistTrialRunner
+ args["maxWorkers"] = config["jobs"]
+ args["workerArguments"] = config._getWorkerArguments()
+ else:
+ if config["debug"]:
+ args["mode"] = runner.TrialRunner.DEBUG
+ debugger = config["debugger"]
+
+ if debugger != "pdb":
+ try:
+ args["debugger"] = reflect.namedAny(debugger)
+ except reflect.ModuleNotFound:
+ raise _DebuggerNotFound(
+ f"{debugger!r} debugger could not be found."
+ )
+ else:
+ args["debugger"] = _wrappedPdb()
+
+ args["profile"] = config["profile"]
+ args["forceGarbageCollection"] = config["force-gc"]
+
+ return cls(**args)
+
+
+def run() -> NoReturn:
+ if len(sys.argv) == 1:
+ sys.argv.append("--help")
+ config = Options()
+ try:
+ config.parseOptions()
+ except usage.error as ue:
+ raise SystemExit(f"{sys.argv[0]}: {ue}")
+ _initialDebugSetup(config)
+
+ try:
+ trialRunner = _makeRunner(config)
+ except _DebuggerNotFound as e:
+ raise SystemExit(f"{sys.argv[0]}: {str(e)}")
+
+ suite = _getSuite(config)
+ if config["until-failure"]:
+ testResult = trialRunner.runUntilFailure(suite)
+ else:
+ testResult = trialRunner.run(suite)
+ if config.tracer:
+ sys.settrace(None)
+ results = config.tracer.results()
+ results.write_results(
+ show_missing=True, summary=False, coverdir=config.coverdir().path
+ )
+ sys.exit(not testResult.wasSuccessful())