aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/scripts/trial.py
blob: 531fe46ce178b36ab29cbf084d6f3b56c3e276f4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
# -*- test-case-name: twisted.trial.test.test_script -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.


import gc
import inspect
import os
import pdb
import random
import sys
import time
import trace
import warnings
from typing import NoReturn, Optional, Type

from twisted import plugin
from twisted.application import app
from twisted.internet import defer
from twisted.python import failure, reflect, usage
from twisted.python.filepath import FilePath
from twisted.python.reflect import namedModule
from twisted.trial import itrial, runner
from twisted.trial._dist.disttrial import DistTrialRunner
from twisted.trial.unittest import TestSuite

# Yea, this is stupid.  Leave it for command-line compatibility for a
# while, though.
TBFORMAT_MAP = {
    "plain": "default",
    "default": "default",
    "emacs": "brief",
    "brief": "brief",
    "cgitb": "verbose",
    "verbose": "verbose",
}


def _parseLocalVariables(line):
    """
    Accepts a single line in Emacs local variable declaration format and
    returns a dict of all the variables {name: value}.
    Raises ValueError if 'line' is in the wrong format.

    See http://www.gnu.org/software/emacs/manual/html_node/File-Variables.html
    """
    paren = "-*-"
    start = line.find(paren) + len(paren)
    end = line.rfind(paren)
    if start == -1 or end == -1:
        raise ValueError(f"{line!r} not a valid local variable declaration")
    items = line[start:end].split(";")
    localVars = {}
    for item in items:
        if len(item.strip()) == 0:
            continue
        split = item.split(":")
        if len(split) != 2:
            raise ValueError(f"{line!r} contains invalid declaration {item!r}")
        localVars[split[0].strip()] = split[1].strip()
    return localVars


def loadLocalVariables(filename):
    """
    Accepts a filename and attempts to load the Emacs variable declarations
    from that file, simulating what Emacs does.

    See http://www.gnu.org/software/emacs/manual/html_node/File-Variables.html
    """
    with open(filename) as f:
        lines = [f.readline(), f.readline()]
    for line in lines:
        try:
            return _parseLocalVariables(line)
        except ValueError:
            pass
    return {}


def getTestModules(filename):
    testCaseVar = loadLocalVariables(filename).get("test-case-name", None)
    if testCaseVar is None:
        return []
    return testCaseVar.split(",")


def isTestFile(filename):
    """
    Returns true if 'filename' looks like a file containing unit tests.
    False otherwise.  Doesn't care whether filename exists.
    """
    basename = os.path.basename(filename)
    return basename.startswith("test_") and os.path.splitext(basename)[1] == (".py")


def _reporterAction():
    return usage.CompleteList([p.longOpt for p in plugin.getPlugins(itrial.IReporter)])


def _maybeFindSourceLine(testThing):
    """
    Try to find the source line of the given test thing.

    @param testThing: the test item to attempt to inspect
    @type testThing: an L{TestCase}, test method, or module, though only the
        former two have a chance to succeed
    @rtype: int
    @return: the starting source line, or -1 if one couldn't be found
    """

    # an instance of L{TestCase} -- locate the test it will run
    method = getattr(testThing, "_testMethodName", None)
    if method is not None:
        testThing = getattr(testThing, method)

    # If it's a function, we can get the line number even if the source file no
    # longer exists
    code = getattr(testThing, "__code__", None)
    if code is not None:
        return code.co_firstlineno

    try:
        return inspect.getsourcelines(testThing)[1]
    except (OSError, TypeError):
        # either testThing is a module, which raised a TypeError, or the file
        # couldn't be read
        return -1


# orders which can be passed to trial --order
_runOrders = {
    "alphabetical": (
        "alphabetical order for test methods, arbitrary order for test cases",
        runner.name,
    ),
    "toptobottom": (
        "attempt to run test cases and methods in the order they were defined",
        _maybeFindSourceLine,
    ),
}


def _checkKnownRunOrder(order):
    """
    Check that the given order is a known test running order.

    Does nothing else, since looking up the appropriate callable to sort the
    tests should be done when it actually will be used, as the default argument
    will not be coerced by this function.

    @param order: one of the known orders in C{_runOrders}
    @return: the order unmodified
    """
    if order not in _runOrders:
        raise usage.UsageError(
            "--order must be one of: %s. See --help-orders for details"
            % (", ".join(repr(order) for order in _runOrders),)
        )
    return order


class _BasicOptions:
    """
    Basic options shared between trial and its local workers.
    """

    longdesc = (
        "trial loads and executes a suite of unit tests, obtained "
        "from modules, packages and files listed on the command line."
    )

    optFlags = [
        ["help", "h"],
        ["no-recurse", "N", "Don't recurse into packages"],
        ["help-orders", None, "Help on available test running orders"],
        ["help-reporters", None, "Help on available output plugins (reporters)"],
        [
            "rterrors",
            "e",
            "realtime errors, print out tracebacks as " "soon as they occur",
        ],
        ["unclean-warnings", None, "Turn dirty reactor errors into warnings"],
        [
            "force-gc",
            None,
            "Have Trial run gc.collect() before and " "after each test case.",
        ],
        [
            "exitfirst",
            "x",
            "Exit after the first non-successful result (cannot be "
            "specified along with --jobs).",
        ],
    ]

    optParameters = [
        [
            "order",
            "o",
            None,
            "Specify what order to run test cases and methods. "
            "See --help-orders for more info.",
            _checkKnownRunOrder,
        ],
        ["random", "z", None, "Run tests in random order using the specified seed"],
        [
            "temp-directory",
            None,
            "_trial_temp",
            "Path to use as working directory for tests.",
        ],
        [
            "reporter",
            None,
            "verbose",
            "The reporter to use for this test run.  See --help-reporters for "
            "more info.",
        ],
    ]

    compData = usage.Completions(
        optActions={
            "order": usage.CompleteList(_runOrders),
            "reporter": _reporterAction,
            "logfile": usage.CompleteFiles(descr="log file name"),
            "random": usage.Completer(descr="random seed"),
        },
        extraActions=[
            usage.CompleteFiles(
                "*.py",
                descr="file | module | package | TestCase | testMethod",
                repeat=True,
            )
        ],
    )

    tracer: Optional[trace.Trace] = None

    def __init__(self):
        self["tests"] = []
        usage.Options.__init__(self)

    def getSynopsis(self):
        executableName = reflect.filenameToModuleName(sys.argv[0])

        if executableName.endswith(".__main__"):
            executableName = "{} -m {}".format(
                os.path.basename(sys.executable),
                executableName.replace(".__main__", ""),
            )

        return """{} [options] [[file|package|module|TestCase|testmethod]...]
        """.format(
            executableName,
        )

    def coverdir(self):
        """
        Return a L{FilePath} representing the directory into which coverage
        results should be written.
        """
        coverdir = "coverage"
        result = FilePath(self["temp-directory"]).child(coverdir)
        print(f"Setting coverage directory to {result.path}.")
        return result

    # TODO: Some of the opt_* methods on this class have docstrings and some do
    #       not. This is mostly because usage.Options's currently will replace
    #       any intended output in optFlags and optParameters with the
    #       docstring. See #6427. When that is fixed, all methods should be
    #       given docstrings (and it should be verified that those with
    #       docstrings already have content suitable for printing as usage
    #       information).

    def opt_coverage(self):
        """
        Generate coverage information in the coverage file in the
        directory specified by the temp-directory option.
        """
        self.tracer = trace.Trace(count=1, trace=0)
        sys.settrace(self.tracer.globaltrace)
        self["coverage"] = True

    def opt_testmodule(self, filename):
        """
        Filename to grep for test cases (-*- test-case-name).
        """
        # If the filename passed to this parameter looks like a test module
        # we just add that to the test suite.
        #
        # If not, we inspect it for an Emacs buffer local variable called
        # 'test-case-name'.  If that variable is declared, we try to add its
        # value to the test suite as a module.
        #
        # This parameter allows automated processes (like Buildbot) to pass
        # a list of files to Trial with the general expectation of "these files,
        # whatever they are, will get tested"
        if not os.path.isfile(filename):
            sys.stderr.write(f"File {filename!r} doesn't exist\n")
            return
        filename = os.path.abspath(filename)
        if isTestFile(filename):
            self["tests"].append(filename)
        else:
            self["tests"].extend(getTestModules(filename))

    def opt_spew(self):
        """
        Print an insanely verbose log of everything that happens.  Useful
        when debugging freezes or locks in complex code.
        """
        from twisted.python.util import spewer

        sys.settrace(spewer)

    def opt_help_orders(self):
        synopsis = (
            "Trial can attempt to run test cases and their methods in "
            "a few different orders. You can select any of the "
            "following options using --order=<foo>.\n"
        )

        print(synopsis)
        for name, (description, _) in sorted(_runOrders.items()):
            print("   ", name, "\t", description)
        sys.exit(0)

    def opt_help_reporters(self):
        synopsis = (
            "Trial's output can be customized using plugins called "
            "Reporters. You can\nselect any of the following "
            "reporters using --reporter=<foo>\n"
        )
        print(synopsis)
        for p in plugin.getPlugins(itrial.IReporter):
            print("   ", p.longOpt, "\t", p.description)
        sys.exit(0)

    def opt_disablegc(self):
        """
        Disable the garbage collector
        """
        self["disablegc"] = True
        gc.disable()

    def opt_tbformat(self, opt):
        """
        Specify the format to display tracebacks with. Valid formats are
        'plain', 'emacs', and 'cgitb' which uses the nicely verbose stdlib
        cgitb.text function
        """
        try:
            self["tbformat"] = TBFORMAT_MAP[opt]
        except KeyError:
            raise usage.UsageError("tbformat must be 'plain', 'emacs', or 'cgitb'.")

    def opt_recursionlimit(self, arg):
        """
        see sys.setrecursionlimit()
        """
        try:
            sys.setrecursionlimit(int(arg))
        except (TypeError, ValueError):
            raise usage.UsageError("argument to recursionlimit must be an integer")
        else:
            self["recursionlimit"] = int(arg)

    def opt_random(self, option):
        try:
            self["random"] = int(option)
        except ValueError:
            raise usage.UsageError("Argument to --random must be a positive integer")
        else:
            if self["random"] < 0:
                raise usage.UsageError(
                    "Argument to --random must be a positive integer"
                )
            elif self["random"] == 0:
                self["random"] = int(time.time() * 100)

    def opt_without_module(self, option):
        """
        Fake the lack of the specified modules, separated with commas.
        """
        self["without-module"] = option
        for module in option.split(","):
            if module in sys.modules:
                warnings.warn(
                    "Module '%s' already imported, " "disabling anyway." % (module,),
                    category=RuntimeWarning,
                )
            sys.modules[module] = None

    def parseArgs(self, *args):
        self["tests"].extend(args)

    def _loadReporterByName(self, name):
        for p in plugin.getPlugins(itrial.IReporter):
            qual = f"{p.module}.{p.klass}"
            if p.longOpt == name:
                return reflect.namedAny(qual)
        raise usage.UsageError(
            "Only pass names of Reporter plugins to "
            "--reporter. See --help-reporters for "
            "more info."
        )

    def postOptions(self):
        # Only load reporters now, as opposed to any earlier, to avoid letting
        # application-defined plugins muck up reactor selecting by importing
        # t.i.reactor and causing the default to be installed.
        self["reporter"] = self._loadReporterByName(self["reporter"])
        if "tbformat" not in self:
            self["tbformat"] = "default"
        if self["order"] is not None and self["random"] is not None:
            raise usage.UsageError("You can't specify --random when using --order")


class Options(_BasicOptions, usage.Options, app.ReactorSelectionMixin):
    """
    Options to the trial command line tool.

    @ivar _workerFlags: List of flags which are accepted by trial distributed
        workers. This is used by C{_getWorkerArguments} to build the command
        line arguments.
    @type _workerFlags: C{list}

    @ivar _workerParameters: List of parameter which are accepted by trial
        distributed workers. This is used by C{_getWorkerArguments} to build
        the command line arguments.
    @type _workerParameters: C{list}
    """

    optFlags = [
        [
            "debug",
            "b",
            "Run tests in a debugger. If that debugger is "
            "pdb, will load '.pdbrc' from current directory if it exists.",
        ],
        [
            "debug-stacktraces",
            "B",
            "Report Deferred creation and " "callback stack traces",
        ],
        [
            "nopm",
            None,
            "don't automatically jump into debugger for " "postmorteming of exceptions",
        ],
        ["dry-run", "n", "do everything but run the tests"],
        ["profile", None, "Run tests under the Python profiler"],
        ["until-failure", "u", "Repeat test until it fails"],
    ]

    optParameters = [
        [
            "debugger",
            None,
            "pdb",
            "the fully qualified name of a debugger to " "use if --debug is passed",
        ],
        ["logfile", "l", "test.log", "log file name"],
        ["jobs", "j", None, "Number of local workers to run"],
    ]

    compData = usage.Completions(
        optActions={
            "tbformat": usage.CompleteList(["plain", "emacs", "cgitb"]),
            "reporter": _reporterAction,
        },
    )

    _workerFlags = ["disablegc", "force-gc", "coverage"]
    _workerParameters = ["recursionlimit", "reactor", "without-module"]

    def opt_jobs(self, number):
        """
        Number of local workers to run, a strictly positive integer.
        """
        try:
            number = int(number)
        except ValueError:
            raise usage.UsageError(
                "Expecting integer argument to jobs, got '%s'" % number
            )
        if number <= 0:
            raise usage.UsageError(
                "Argument to jobs must be a strictly positive integer"
            )
        self["jobs"] = number

    def _getWorkerArguments(self):
        """
        Return a list of options to pass to distributed workers.
        """
        args = []
        for option in self._workerFlags:
            if self.get(option) is not None:
                if self[option]:
                    args.append(f"--{option}")
        for option in self._workerParameters:
            if self.get(option) is not None:
                args.extend([f"--{option}", str(self[option])])
        return args

    def postOptions(self):
        _BasicOptions.postOptions(self)
        if self["jobs"]:
            conflicts = ["debug", "profile", "debug-stacktraces"]
            for option in conflicts:
                if self[option]:
                    raise usage.UsageError(
                        "You can't specify --%s when using --jobs" % option
                    )
        if self["nopm"]:
            if not self["debug"]:
                raise usage.UsageError("You must specify --debug when using " "--nopm ")
            failure.DO_POST_MORTEM = False


def _initialDebugSetup(config: Options) -> None:
    # do this part of debug setup first for easy debugging of import failures
    if config["debug"]:
        failure.startDebugMode()
    if config["debug"] or config["debug-stacktraces"]:
        defer.setDebugging(True)


def _getSuite(config: Options) -> TestSuite:
    loader = _getLoader(config)
    recurse = not config["no-recurse"]
    return loader.loadByNames(config["tests"], recurse=recurse)


def _getLoader(config: Options) -> runner.TestLoader:
    loader = runner.TestLoader()
    if config["random"]:
        randomer = random.Random()
        randomer.seed(config["random"])
        loader.sorter = lambda x: randomer.random()
        print("Running tests shuffled with seed %d\n" % config["random"])
    elif config["order"]:
        _, sorter = _runOrders[config["order"]]
        loader.sorter = sorter
    if not config["until-failure"]:
        loader.suiteFactory = runner.DestructiveTestSuite
    return loader


def _wrappedPdb():
    """
    Wrap an instance of C{pdb.Pdb} with readline support and load any .rcs.

    """

    dbg = pdb.Pdb()
    try:
        namedModule("readline")
    except ImportError:
        print("readline module not available")
    for path in (".pdbrc", "pdbrc"):
        if os.path.exists(path):
            try:
                rcFile = open(path)
            except OSError:
                pass
            else:
                with rcFile:
                    dbg.rcLines.extend(rcFile.readlines())
    return dbg


class _DebuggerNotFound(Exception):
    """
    A debugger import failed.

    Used to allow translating these errors into usage error messages.

    """


def _makeRunner(config: Options) -> runner._Runner:
    """
    Return a trial runner class set up with the parameters extracted from
    C{config}.

    @return: A trial runner instance.
    """
    cls: Type[runner._Runner] = runner.TrialRunner
    args = {
        "reporterFactory": config["reporter"],
        "tracebackFormat": config["tbformat"],
        "realTimeErrors": config["rterrors"],
        "uncleanWarnings": config["unclean-warnings"],
        "logfile": config["logfile"],
        "workingDirectory": config["temp-directory"],
        "exitFirst": config["exitfirst"],
    }
    if config["dry-run"]:
        args["mode"] = runner.TrialRunner.DRY_RUN
    elif config["jobs"]:
        cls = DistTrialRunner
        args["maxWorkers"] = config["jobs"]
        args["workerArguments"] = config._getWorkerArguments()
    else:
        if config["debug"]:
            args["mode"] = runner.TrialRunner.DEBUG
            debugger = config["debugger"]

            if debugger != "pdb":
                try:
                    args["debugger"] = reflect.namedAny(debugger)
                except reflect.ModuleNotFound:
                    raise _DebuggerNotFound(
                        f"{debugger!r} debugger could not be found."
                    )
            else:
                args["debugger"] = _wrappedPdb()

        args["profile"] = config["profile"]
        args["forceGarbageCollection"] = config["force-gc"]

    return cls(**args)


def run() -> NoReturn:
    if len(sys.argv) == 1:
        sys.argv.append("--help")
    config = Options()
    try:
        config.parseOptions()
    except usage.error as ue:
        raise SystemExit(f"{sys.argv[0]}: {ue}")
    _initialDebugSetup(config)

    try:
        trialRunner = _makeRunner(config)
    except _DebuggerNotFound as e:
        raise SystemExit(f"{sys.argv[0]}: {str(e)}")

    suite = _getSuite(config)
    if config["until-failure"]:
        testResult = trialRunner.runUntilFailure(suite)
    else:
        testResult = trialRunner.run(suite)
    if config.tracer:
        sys.settrace(None)
        results = config.tracer.results()
        results.write_results(
            show_missing=True, summary=False, coverdir=config.coverdir().path
        )
    sys.exit(not testResult.wasSuccessful())