diff options
author | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:16:14 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:43:30 +0300 |
commit | b8cf9e88f4c5c64d9406af533d8948deb050d695 (patch) | |
tree | 218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py2/twisted/scripts/trial.py | |
parent | 523f645a83a0ec97a0332dbc3863bb354c92a328 (diff) | |
download | ydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz |
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py2/twisted/scripts/trial.py')
-rw-r--r-- | contrib/python/Twisted/py2/twisted/scripts/trial.py | 627 |
1 files changed, 627 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py2/twisted/scripts/trial.py b/contrib/python/Twisted/py2/twisted/scripts/trial.py new file mode 100644 index 0000000000..6c6e80aa16 --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/scripts/trial.py @@ -0,0 +1,627 @@ +# -*- test-case-name: twisted.trial.test.test_script -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +from __future__ import absolute_import, division, print_function + +import gc +import inspect +import os +import pdb +import random +import sys +import time +import warnings + +from twisted.internet import defer +from twisted.application import app +from twisted.python import usage, reflect, failure +from twisted.python.filepath import FilePath +from twisted.python.reflect import namedModule +from twisted.python.compat import long +from twisted import plugin +from twisted.trial import runner, itrial, reporter + + +# Yea, this is stupid. Leave it for command-line compatibility for a +# while, though. +TBFORMAT_MAP = { + 'plain': 'default', + 'default': 'default', + 'emacs': 'brief', + 'brief': 'brief', + 'cgitb': 'verbose', + 'verbose': 'verbose' + } + + +def _parseLocalVariables(line): + """ + Accepts a single line in Emacs local variable declaration format and + returns a dict of all the variables {name: value}. + Raises ValueError if 'line' is in the wrong format. + + See http://www.gnu.org/software/emacs/manual/html_node/File-Variables.html + """ + paren = '-*-' + start = line.find(paren) + len(paren) + end = line.rfind(paren) + if start == -1 or end == -1: + raise ValueError("%r not a valid local variable declaration" % (line,)) + items = line[start:end].split(';') + localVars = {} + for item in items: + if len(item.strip()) == 0: + continue + split = item.split(':') + if len(split) != 2: + raise ValueError("%r contains invalid declaration %r" + % (line, item)) + localVars[split[0].strip()] = split[1].strip() + return localVars + + +def loadLocalVariables(filename): + """ + Accepts a filename and attempts to load the Emacs variable declarations + from that file, simulating what Emacs does. + + See http://www.gnu.org/software/emacs/manual/html_node/File-Variables.html + """ + with open(filename, "r") as f: + lines = [f.readline(), f.readline()] + for line in lines: + try: + return _parseLocalVariables(line) + except ValueError: + pass + return {} + + +def getTestModules(filename): + testCaseVar = loadLocalVariables(filename).get('test-case-name', None) + if testCaseVar is None: + return [] + return testCaseVar.split(',') + + +def isTestFile(filename): + """ + Returns true if 'filename' looks like a file containing unit tests. + False otherwise. Doesn't care whether filename exists. + """ + basename = os.path.basename(filename) + return (basename.startswith('test_') + and os.path.splitext(basename)[1] == ('.py')) + + +def _reporterAction(): + return usage.CompleteList([p.longOpt for p in + plugin.getPlugins(itrial.IReporter)]) + + +def _maybeFindSourceLine(testThing): + """ + Try to find the source line of the given test thing. + + @param testThing: the test item to attempt to inspect + @type testThing: an L{TestCase}, test method, or module, though only the + former two have a chance to succeed + @rtype: int + @return: the starting source line, or -1 if one couldn't be found + """ + + # an instance of L{TestCase} -- locate the test it will run + method = getattr(testThing, "_testMethodName", None) + if method is not None: + testThing = getattr(testThing, method) + + # If it's a function, we can get the line number even if the source file no + # longer exists + code = getattr(testThing, "__code__", None) + if code is not None: + return code.co_firstlineno + + try: + return inspect.getsourcelines(testThing)[1] + except (IOError, TypeError): + # either testThing is a module, which raised a TypeError, or the file + # couldn't be read + return -1 + + +# orders which can be passed to trial --order +_runOrders = { + "alphabetical" : ( + "alphabetical order for test methods, arbitrary order for test cases", + runner.name), + "toptobottom" : ( + "attempt to run test cases and methods in the order they were defined", + _maybeFindSourceLine), +} + + +def _checkKnownRunOrder(order): + """ + Check that the given order is a known test running order. + + Does nothing else, since looking up the appropriate callable to sort the + tests should be done when it actually will be used, as the default argument + will not be coerced by this function. + + @param order: one of the known orders in C{_runOrders} + @return: the order unmodified + """ + if order not in _runOrders: + raise usage.UsageError( + "--order must be one of: %s. See --help-orders for details" % + (", ".join(repr(order) for order in _runOrders),)) + return order + + + +class _BasicOptions(object): + """ + Basic options shared between trial and its local workers. + """ + longdesc = ("trial loads and executes a suite of unit tests, obtained " + "from modules, packages and files listed on the command line.") + + optFlags = [["help", "h"], + ["no-recurse", "N", "Don't recurse into packages"], + ['help-orders', None, "Help on available test running orders"], + ['help-reporters', None, + "Help on available output plugins (reporters)"], + ["rterrors", "e", "realtime errors, print out tracebacks as " + "soon as they occur"], + ["unclean-warnings", None, + "Turn dirty reactor errors into warnings"], + ["force-gc", None, "Have Trial run gc.collect() before and " + "after each test case."], + ["exitfirst", "x", + "Exit after the first non-successful result (cannot be " + "specified along with --jobs)."], + ] + + optParameters = [ + ["order", "o", None, + "Specify what order to run test cases and methods. " + "See --help-orders for more info.", _checkKnownRunOrder], + ["random", "z", None, + "Run tests in random order using the specified seed"], + ['temp-directory', None, '_trial_temp', + 'Path to use as working directory for tests.'], + ['reporter', None, 'verbose', + 'The reporter to use for this test run. See --help-reporters for ' + 'more info.']] + + compData = usage.Completions( + optActions={"order": usage.CompleteList(_runOrders), + "reporter": _reporterAction, + "logfile": usage.CompleteFiles(descr="log file name"), + "random": usage.Completer(descr="random seed")}, + extraActions=[usage.CompleteFiles( + "*.py", descr="file | module | package | TestCase | testMethod", + repeat=True)], + ) + + fallbackReporter = reporter.TreeReporter + tracer = None + + def __init__(self): + self['tests'] = [] + usage.Options.__init__(self) + + def getSynopsis(self): + executableName = reflect.filenameToModuleName(sys.argv[0]) + + if executableName.endswith('.__main__'): + executableName = '{} -m {}'.format(os.path.basename(sys.executable), + executableName.replace('.__main__', '')) + + return """%s [options] [[file|package|module|TestCase|testmethod]...] + """ % (executableName,) + + def coverdir(self): + """ + Return a L{FilePath} representing the directory into which coverage + results should be written. + """ + coverdir = 'coverage' + result = FilePath(self['temp-directory']).child(coverdir) + print("Setting coverage directory to %s." % (result.path,)) + return result + + + # TODO: Some of the opt_* methods on this class have docstrings and some do + # not. This is mostly because usage.Options's currently will replace + # any intended output in optFlags and optParameters with the + # docstring. See #6427. When that is fixed, all methods should be + # given docstrings (and it should be verified that those with + # docstrings already have content suitable for printing as usage + # information). + + def opt_coverage(self): + """ + Generate coverage information in the coverage file in the + directory specified by the temp-directory option. + """ + import trace + self.tracer = trace.Trace(count=1, trace=0) + sys.settrace(self.tracer.globaltrace) + self['coverage'] = True + + + def opt_testmodule(self, filename): + """ + Filename to grep for test cases (-*- test-case-name). + """ + # If the filename passed to this parameter looks like a test module + # we just add that to the test suite. + # + # If not, we inspect it for an Emacs buffer local variable called + # 'test-case-name'. If that variable is declared, we try to add its + # value to the test suite as a module. + # + # This parameter allows automated processes (like Buildbot) to pass + # a list of files to Trial with the general expectation of "these files, + # whatever they are, will get tested" + if not os.path.isfile(filename): + sys.stderr.write("File %r doesn't exist\n" % (filename,)) + return + filename = os.path.abspath(filename) + if isTestFile(filename): + self['tests'].append(filename) + else: + self['tests'].extend(getTestModules(filename)) + + + def opt_spew(self): + """ + Print an insanely verbose log of everything that happens. Useful + when debugging freezes or locks in complex code. + """ + from twisted.python.util import spewer + sys.settrace(spewer) + + + def opt_help_orders(self): + synopsis = ("Trial can attempt to run test cases and their methods in " + "a few different orders. You can select any of the " + "following options using --order=<foo>.\n") + + print(synopsis) + for name, (description, _) in sorted(_runOrders.items()): + print(' ', name, '\t', description) + sys.exit(0) + + + def opt_help_reporters(self): + synopsis = ("Trial's output can be customized using plugins called " + "Reporters. You can\nselect any of the following " + "reporters using --reporter=<foo>\n") + print(synopsis) + for p in plugin.getPlugins(itrial.IReporter): + print(' ', p.longOpt, '\t', p.description) + sys.exit(0) + + + def opt_disablegc(self): + """ + Disable the garbage collector + """ + self["disablegc"] = True + gc.disable() + + + def opt_tbformat(self, opt): + """ + Specify the format to display tracebacks with. Valid formats are + 'plain', 'emacs', and 'cgitb' which uses the nicely verbose stdlib + cgitb.text function + """ + try: + self['tbformat'] = TBFORMAT_MAP[opt] + except KeyError: + raise usage.UsageError( + "tbformat must be 'plain', 'emacs', or 'cgitb'.") + + + def opt_recursionlimit(self, arg): + """ + see sys.setrecursionlimit() + """ + try: + sys.setrecursionlimit(int(arg)) + except (TypeError, ValueError): + raise usage.UsageError( + "argument to recursionlimit must be an integer") + else: + self["recursionlimit"] = int(arg) + + + def opt_random(self, option): + try: + self['random'] = long(option) + except ValueError: + raise usage.UsageError( + "Argument to --random must be a positive integer") + else: + if self['random'] < 0: + raise usage.UsageError( + "Argument to --random must be a positive integer") + elif self['random'] == 0: + self['random'] = long(time.time() * 100) + + + def opt_without_module(self, option): + """ + Fake the lack of the specified modules, separated with commas. + """ + self["without-module"] = option + for module in option.split(","): + if module in sys.modules: + warnings.warn("Module '%s' already imported, " + "disabling anyway." % (module,), + category=RuntimeWarning) + sys.modules[module] = None + + + def parseArgs(self, *args): + self['tests'].extend(args) + + + def _loadReporterByName(self, name): + for p in plugin.getPlugins(itrial.IReporter): + qual = "%s.%s" % (p.module, p.klass) + if p.longOpt == name: + return reflect.namedAny(qual) + raise usage.UsageError("Only pass names of Reporter plugins to " + "--reporter. See --help-reporters for " + "more info.") + + + def postOptions(self): + # Only load reporters now, as opposed to any earlier, to avoid letting + # application-defined plugins muck up reactor selecting by importing + # t.i.reactor and causing the default to be installed. + self['reporter'] = self._loadReporterByName(self['reporter']) + if 'tbformat' not in self: + self['tbformat'] = 'default' + if self['order'] is not None and self['random'] is not None: + raise usage.UsageError( + "You can't specify --random when using --order") + + + +class Options(_BasicOptions, usage.Options, app.ReactorSelectionMixin): + """ + Options to the trial command line tool. + + @ivar _workerFlags: List of flags which are accepted by trial distributed + workers. This is used by C{_getWorkerArguments} to build the command + line arguments. + @type _workerFlags: C{list} + + @ivar _workerParameters: List of parameter which are accepted by trial + distributed workers. This is used by C{_getWorkerArguments} to build + the command line arguments. + @type _workerParameters: C{list} + """ + + optFlags = [ + ["debug", "b", "Run tests in a debugger. If that debugger is " + "pdb, will load '.pdbrc' from current directory if it exists." + ], + ["debug-stacktraces", "B", "Report Deferred creation and " + "callback stack traces"], + ["nopm", None, "don't automatically jump into debugger for " + "postmorteming of exceptions"], + ["dry-run", 'n', "do everything but run the tests"], + ["profile", None, "Run tests under the Python profiler"], + ["until-failure", "u", "Repeat test until it fails"], + ] + + optParameters = [ + ["debugger", None, "pdb", "the fully qualified name of a debugger to " + "use if --debug is passed"], + ["logfile", "l", "test.log", "log file name"], + ["jobs", "j", None, "Number of local workers to run"] + ] + + compData = usage.Completions( + optActions = { + "tbformat": usage.CompleteList(["plain", "emacs", "cgitb"]), + "reporter": _reporterAction, + }, + ) + + _workerFlags = ["disablegc", "force-gc", "coverage"] + _workerParameters = ["recursionlimit", "reactor", "without-module"] + + fallbackReporter = reporter.TreeReporter + extra = None + tracer = None + + + def opt_jobs(self, number): + """ + Number of local workers to run, a strictly positive integer. + """ + try: + number = int(number) + except ValueError: + raise usage.UsageError( + "Expecting integer argument to jobs, got '%s'" % number) + if number <= 0: + raise usage.UsageError( + "Argument to jobs must be a strictly positive integer") + self["jobs"] = number + + + def _getWorkerArguments(self): + """ + Return a list of options to pass to distributed workers. + """ + args = [] + for option in self._workerFlags: + if self.get(option) is not None: + if self[option]: + args.append("--%s" % (option,)) + for option in self._workerParameters: + if self.get(option) is not None: + args.extend(["--%s" % (option,), str(self[option])]) + return args + + + def postOptions(self): + _BasicOptions.postOptions(self) + if self['jobs']: + conflicts = ['debug', 'profile', 'debug-stacktraces', 'exitfirst'] + for option in conflicts: + if self[option]: + raise usage.UsageError( + "You can't specify --%s when using --jobs" % option) + if self['nopm']: + if not self['debug']: + raise usage.UsageError("You must specify --debug when using " + "--nopm ") + failure.DO_POST_MORTEM = False + + + +def _initialDebugSetup(config): + # do this part of debug setup first for easy debugging of import failures + if config['debug']: + failure.startDebugMode() + if config['debug'] or config['debug-stacktraces']: + defer.setDebugging(True) + + + +def _getSuite(config): + loader = _getLoader(config) + recurse = not config['no-recurse'] + return loader.loadByNames(config['tests'], recurse=recurse) + + + +def _getLoader(config): + loader = runner.TestLoader() + if config['random']: + randomer = random.Random() + randomer.seed(config['random']) + loader.sorter = lambda x : randomer.random() + print('Running tests shuffled with seed %d\n' % config['random']) + elif config['order']: + _, sorter = _runOrders[config['order']] + loader.sorter = sorter + if not config['until-failure']: + loader.suiteFactory = runner.DestructiveTestSuite + return loader + + +def _wrappedPdb(): + """ + Wrap an instance of C{pdb.Pdb} with readline support and load any .rcs. + + """ + + dbg = pdb.Pdb() + try: + namedModule('readline') + except ImportError: + print("readline module not available") + for path in ('.pdbrc', 'pdbrc'): + if os.path.exists(path): + try: + rcFile = open(path, 'r') + except IOError: + pass + else: + with rcFile: + dbg.rcLines.extend(rcFile.readlines()) + return dbg + + +class _DebuggerNotFound(Exception): + """ + A debugger import failed. + + Used to allow translating these errors into usage error messages. + + """ + + + +def _makeRunner(config): + """ + Return a trial runner class set up with the parameters extracted from + C{config}. + + @return: A trial runner instance. + @rtype: L{runner.TrialRunner} or C{DistTrialRunner} depending on the + configuration. + """ + cls = runner.TrialRunner + args = {'reporterFactory': config['reporter'], + 'tracebackFormat': config['tbformat'], + 'realTimeErrors': config['rterrors'], + 'uncleanWarnings': config['unclean-warnings'], + 'logfile': config['logfile'], + 'workingDirectory': config['temp-directory']} + if config['dry-run']: + args['mode'] = runner.TrialRunner.DRY_RUN + elif config['jobs']: + from twisted.trial._dist.disttrial import DistTrialRunner + cls = DistTrialRunner + args['workerNumber'] = config['jobs'] + args['workerArguments'] = config._getWorkerArguments() + else: + if config['debug']: + args['mode'] = runner.TrialRunner.DEBUG + debugger = config['debugger'] + + if debugger != 'pdb': + try: + args['debugger'] = reflect.namedAny(debugger) + except reflect.ModuleNotFound: + raise _DebuggerNotFound( + '%r debugger could not be found.' % (debugger,)) + else: + args['debugger'] = _wrappedPdb() + + args['exitFirst'] = config['exitfirst'] + args['profile'] = config['profile'] + args['forceGarbageCollection'] = config['force-gc'] + + return cls(**args) + + + +def run(): + if len(sys.argv) == 1: + sys.argv.append("--help") + config = Options() + try: + config.parseOptions() + except usage.error as ue: + raise SystemExit("%s: %s" % (sys.argv[0], ue)) + _initialDebugSetup(config) + + try: + trialRunner = _makeRunner(config) + except _DebuggerNotFound as e: + raise SystemExit('%s: %s' % (sys.argv[0], str(e))) + + suite = _getSuite(config) + if config['until-failure']: + test_result = trialRunner.runUntilFailure(suite) + else: + test_result = trialRunner.run(suite) + if config.tracer: + sys.settrace(None) + results = config.tracer.results() + results.write_results(show_missing=1, summary=False, + coverdir=config.coverdir().path) + sys.exit(not test_result.wasSuccessful()) |