blob: a476ca98b79f9be0f58e5c34f99d306cc5604cf3 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
# -*- test-case-name: twisted.test.test_application -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Plugin-based system for enumerating available reactors and installing one of
them.
"""
from typing import Iterable, cast
from zope.interface import Attribute, Interface, implementer
from twisted.internet.interfaces import IReactorCore
from twisted.plugin import IPlugin, getPlugins
from twisted.python.reflect import namedAny
class IReactorInstaller(Interface):
"""
Definition of a reactor which can probably be installed.
"""
shortName = Attribute(
"""
A brief string giving the user-facing name of this reactor.
"""
)
description = Attribute(
"""
A longer string giving a user-facing description of this reactor.
"""
)
def install() -> None:
"""
Install this reactor.
"""
# TODO - A method which provides a best-guess as to whether this reactor
# can actually be used in the execution environment.
class NoSuchReactor(KeyError):
"""
Raised when an attempt is made to install a reactor which cannot be found.
"""
@implementer(IPlugin, IReactorInstaller)
class Reactor:
"""
@ivar moduleName: The fully-qualified Python name of the module of which
the install callable is an attribute.
"""
def __init__(self, shortName: str, moduleName: str, description: str):
self.shortName = shortName
self.moduleName = moduleName
self.description = description
def install(self) -> None:
namedAny(self.moduleName).install()
def getReactorTypes() -> Iterable[IReactorInstaller]:
"""
Return an iterator of L{IReactorInstaller} plugins.
"""
return getPlugins(IReactorInstaller)
def installReactor(shortName: str) -> IReactorCore:
"""
Install the reactor with the given C{shortName} attribute.
@raise NoSuchReactor: If no reactor is found with a matching C{shortName}.
@raise Exception: Anything that the specified reactor can raise when installed.
"""
for installer in getReactorTypes():
if installer.shortName == shortName:
installer.install()
from twisted.internet import reactor
return cast(IReactorCore, reactor)
raise NoSuchReactor(shortName)
|