aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/application/twist/_options.py
blob: 2bcd207bf4c8d236d5c5cb1961e1596b5b49bab9 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# -*- test-case-name: twisted.application.twist.test.test_options -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Command line options for C{twist}.
"""

import typing
from sys import stderr, stdout
from textwrap import dedent
from typing import Callable, Iterable, Mapping, Optional, Sequence, Tuple, cast

from twisted.copyright import version
from twisted.internet.interfaces import IReactorCore
from twisted.logger import (
    InvalidLogLevelError,
    LogLevel,
    jsonFileLogObserver,
    textFileLogObserver,
)
from twisted.plugin import getPlugins
from twisted.python.usage import Options, UsageError
from ..reactors import NoSuchReactor, getReactorTypes, installReactor
from ..runner._exit import ExitStatus, exit
from ..service import IServiceMaker

# Module-level alias for the builtin C{open}.  L{TwistOptions.opt_log_file}
# opens log files through this name instead of calling C{open} directly
# (presumably so that it can be substituted in tests -- TODO confirm).
openFile = open


def _update_doc(opt: Callable[["TwistOptions", str], None], **kwargs: str) -> None:
    """
    Update the docstring of a method that implements an option.
    The string is dedented and the given keyword arguments are substituted.
    """
    opt.__doc__ = dedent(opt.__doc__ or "").format(**kwargs)


class TwistOptions(Options):
    """
    Command line options for C{twist}.

    Parsed values are stored as items on the instance; the methods below set
    C{reactorName}, C{reactor}, C{logLevel}, C{logFile}, C{logFormat},
    C{fileLogObserverFactory} and C{plugins}.

    @cvar defaultReactorName: Reactor name stored in C{reactorName} until
        L{opt_reactor} replaces it.
    @cvar defaultLogLevel: Log level stored in C{logLevel} until
        L{opt_log_level} replaces it.
    """

    defaultReactorName = "default"
    defaultLogLevel = LogLevel.info

    def __init__(self) -> None:
        """
        Initialize the options with their default values: the default reactor
        name, the default log level, and standard output as the log file.
        """
        Options.__init__(self)

        self["reactorName"] = self.defaultReactorName
        self["logLevel"] = self.defaultLogLevel
        self["logFile"] = stdout
        # An empty long description is explicitly set here as otherwise
        # when executing from distributed trial twisted.python.usage will
        # pull the description from `__main__` which is another entry point.
        self.longdesc = ""

    def getSynopsis(self) -> str:
        """
        Return the base synopsis followed by the plugin placeholder.

        @return: The synopsis from L{Options.getSynopsis} with
            C{" plugin [plugin_options]"} appended.
        """
        return f"{Options.getSynopsis(self)} plugin [plugin_options]"

    def opt_version(self) -> "typing.NoReturn":
        """
        Print version and exit.
        """
        exit(ExitStatus.EX_OK, f"{version}")

    def opt_reactor(self, name: str) -> None:
        """
        The name of the reactor to use.
        (options: {options})
        """
        # Install the reactor right now, before any other code (for example,
        # a sub-command plugin) runs and accidentally imports and installs
        # the default reactor.
        try:
            self["reactor"] = self.installReactor(name)
        except NoSuchReactor:
            raise UsageError(f"Unknown reactor: {name}")
        else:
            # Only record the name once installation has succeeded.
            self["reactorName"] = name

    # Fill the "{options}" placeholder of the docstring above with the short
    # names of the available reactor types.
    _update_doc(
        opt_reactor,
        options=", ".join(f'"{rt.shortName}"' for rt in getReactorTypes()),
    )

    def installReactor(self, name: str) -> IReactorCore:
        """
        Install the reactor.

        @param name: The name of the reactor to install.

        @return: For L{defaultReactorName}, the object obtained by importing
            C{twisted.internet.reactor}; for any other name, the result of
            the module-level C{installReactor} function.
        """
        if name == self.defaultReactorName:
            from twisted.internet import reactor

            return cast(IReactorCore, reactor)
        else:
            # This is the module-level function imported from ..reactors, not
            # a recursive call to this method.
            return installReactor(name)

    def opt_log_level(self, levelName: str) -> None:
        """
        Set default log level.
        (options: {options}; default: "{default}")
        """
        try:
            self["logLevel"] = LogLevel.levelWithName(levelName)
        except InvalidLogLevelError:
            raise UsageError(f"Invalid log level: {levelName}")

    # Fill the "{options}" and "{default}" placeholders of the docstring
    # above from the LogLevel constants and the class default.
    _update_doc(
        opt_log_level,
        options=", ".join(
            f'"{constant.name}"' for constant in LogLevel.iterconstants()
        ),
        default=defaultLogLevel.name,
    )

    def opt_log_file(self, fileName: str) -> None:
        """
        Log to file. ("-" for stdout, "+" for stderr; default: "-")
        """
        if fileName == "-":
            self["logFile"] = stdout
            return

        if fileName == "+":
            self["logFile"] = stderr
            return

        # Any other value is a path, opened for appending.  A failure to open
        # it is reported through exit() with an I/O error status.
        try:
            self["logFile"] = openFile(fileName, "a")
        except OSError as e:
            exit(
                ExitStatus.EX_IOERR,
                f"Unable to open log file {fileName!r}: {e}",
            )

    def opt_log_format(self, format: str) -> None:
        """
        Log file format.
        (options: "text", "json"; default: "text" if the log file is a tty,
        otherwise "json")
        """
        # The format name is matched case-insensitively.
        format = format.lower()

        if format == "text":
            self["fileLogObserverFactory"] = textFileLogObserver
        elif format == "json":
            self["fileLogObserverFactory"] = jsonFileLogObserver
        else:
            raise UsageError(f"Invalid log format: {format}")
        self["logFormat"] = format

    # No substitutions are needed here; this only dedents the docstring.
    _update_doc(opt_log_format)

    def selectDefaultLogObserver(self) -> None:
        """
        Set C{fileLogObserverFactory} to the default appropriate for the
        chosen C{logFile}.

        Nothing is changed when C{fileLogObserverFactory} has already been
        set (for instance by L{opt_log_format}).  Otherwise the text format is
        used when the log file reports that it is a tty, and the JSON format
        is used in every other case.
        """
        if "fileLogObserverFactory" not in self:
            logFile = self["logFile"]

            if hasattr(logFile, "isatty") and logFile.isatty():
                self["fileLogObserverFactory"] = textFileLogObserver
                self["logFormat"] = "text"
            else:
                self["fileLogObserverFactory"] = jsonFileLogObserver
                self["logFormat"] = "json"

    def parseOptions(self, options: Optional[Sequence[str]] = None) -> None:
        """
        Parse the command line, then fill in the defaults that depend on it.

        @param options: The arguments to parse, passed through unchanged to
            L{Options.parseOptions}.
        """
        try:
            Options.parseOptions(self, options=options)
        finally:
            # The default log format depends on the log file, so it can only
            # be chosen once opt_log_file has had a chance to run.  An
            # explicitly requested format is preserved, because
            # selectDefaultLogObserver does nothing when a factory is already
            # set.  Doing this in a "finally" keeps the factory populated even
            # when parsing raises.
            self.selectDefaultLogObserver()

        # opt_reactor installs the reactor eagerly; if it was never invoked,
        # install the reactor named by "reactorName" now.
        if "reactor" not in self:
            self["reactor"] = self.installReactor(self["reactorName"])

    @property
    def plugins(self) -> Mapping[str, IServiceMaker]:
        """
        The available L{IServiceMaker} plugins, keyed by their C{tapname}.

        The mapping is built from L{getPlugins} on first access and cached
        under the C{"plugins"} key.
        """
        if "plugins" not in self:
            self["plugins"] = {
                plugin.tapname: plugin for plugin in getPlugins(IServiceMaker)
            }

        return cast(Mapping[str, IServiceMaker], self["plugins"])

    @property
    def subCommands(
        self,
    ) -> Iterable[Tuple[str, None, Callable[[IServiceMaker], Options], str]]:
        """
        Yield one sub-command entry per plugin, ordered by plugin name.

        Each entry is a tuple of the plugin's C{tapname}, C{None}, a callable
        returning the plugin's options, and the plugin's C{description}.
        """
        plugins = self.plugins
        for name in sorted(plugins):
            plugin = plugins[name]

            # Don't pass plugin.options along in order to avoid resolving the
            # options attribute right away, in case it's a property with a
            # non-trivial getter (eg, one which imports modules).
            # The default argument binds the current plugin to this function,
            # so each yielded callable refers to its own plugin.
            def options(plugin: IServiceMaker = plugin) -> Options:
                return cast(Options, plugin.options())

            yield (plugin.tapname, None, options, plugin.description)

    def postOptions(self) -> None:
        """
        Run the base class post-processing and require that a plugin was
        named.

        @raise UsageError: If no sub-command (plugin) was specified.
        """
        Options.postOptions(self)

        if self.subCommand is None:
            raise UsageError("No plugin specified.")