aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/pytest-localserver/py3/pytest_localserver/smtp.py
blob: 82dbc394b98d4a60e465cc99c756717bc353a9f7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# Copyright (C) 2011 Sebastian Rahlf <basti at redtoad dot de>
# with some ideas from http://code.activestate.com/recipes/440690/
# SmtpMailsink Copyright 2005 Aviarc Corporation
# Written by Adam Feuer, Matt Branthwaite, and Troy Frever
# which is Licensed under the PSF License
import email

import aiosmtpd.controller


class MessageDetails:
    """
    Plain container for the peer, sender and recipients of one message.

    ``mail_options`` and ``rcpt_options`` only become attributes when a
    truthy value is supplied; otherwise the attribute is absent entirely.
    """

    def __init__(self, peer, mailfrom, rcpttos, *, mail_options=None, rcpt_options=None):
        self.peer, self.mailfrom, self.rcpttos = peer, mailfrom, rcpttos

        # Optional values are attached only when truthy, so callers can use
        # hasattr() to find out whether any options were given.
        optional_values = (
            ("mail_options", mail_options),
            ("rcpt_options", rcpt_options),
        )
        for attr_name, attr_value in optional_values:
            if attr_value:
                setattr(self, attr_name, attr_value)


class Handler:
    """Message handler that keeps every received message in ``outbox``."""

    def __init__(self):
        # Received messages, in the order handle_DATA() was called.
        self.outbox = []

    async def handle_DATA(self, server, session, envelope):
        """
        Parse the envelope content, tag it with its details and store it.

        The parsed message gets a ``details`` attribute built from the
        session peer and the envelope sender/recipients, and is appended to
        ``outbox``. Always returns the reply string ``"250 OK"``.
        """
        parsed = email.message_from_bytes(envelope.content)
        sender, recipients = envelope.mail_from, envelope.rcpt_tos
        parsed.details = MessageDetails(session.peer, sender, recipients)
        self.outbox.append(parsed)
        return "250 OK"


class Server(aiosmtpd.controller.Controller):

    """
    Small SMTP test server.

    This is little more than a wrapper around aiosmtpd.controller.Controller
    which offers a slightly different interface for backward compatibility with
    earlier versions of pytest-localserver. You can just as well use a standard
    Controller and pass it a Handler instance.

    Here is how to use this class for sending an email, if you really need to::

        server = Server(port=8080)
        server.start()
        print('SMTP server is running on %s:%i' % server.addr)

        # any e-mail sent to localhost:8080 will end up in server.outbox
        # ...

        server.stop()

    """

    def __init__(self, host="localhost", port=0):
        """
        Set up the controller with a fresh Handler instance.

        :param host: passed to the Controller as ``hostname`` (and also as
            ``server_hostname`` when the installed aiosmtpd accepts it).
        :param port: port to listen on. If 0, ``self.port`` is replaced by
            the port of the bound socket once the server has been started.
        """
        try:
            super().__init__(Handler(), hostname=host, port=port, server_hostname=host)
        except TypeError:
            # for aiosmtpd <1.3
            super().__init__(Handler(), hostname=host, port=port)

    @property
    def outbox(self):
        """List of messages collected by the handler."""
        return self.handler.outbox

    def _set_server_socket_attributes(self):
        """
        Set the addr and port attributes on this Server instance, if they're not
        already set.
        """

        # I split this out into its own method to allow running this code in
        # aiosmtpd <1.4, which doesn't have the _trigger_server() method on
        # the Controller class. If I put it directly in _trigger_server(), it
        # would fail when calling super()._trigger_server(). In the future, when
        # we can safely require aiosmtpd >=1.4, this method can be inlined
        # directly into _trigger_server().
        if hasattr(self, "addr"):
            assert hasattr(self, "port")
            return

        # (host, port) of the first listening socket; the slice drops any
        # extra elements of the socket address tuple.
        self.addr = self.server.sockets[0].getsockname()[:2]

        # Work around a bug/missing feature in aiosmtpd (https://github.com/aio-libs/aiosmtpd/issues/276)
        if self.port == 0:
            self.port = self.addr[1]
            assert self.port != 0

    def _trigger_server(self):
        """Record addr/port first, then defer to the Controller implementation."""
        self._set_server_socket_attributes()
        super()._trigger_server()

    def is_alive(self):
        """Return True if the server thread exists and is still running."""
        # getattr() keeps this safe on an instance whose __init__ did not
        # finish (so _thread was never set); __del__ calls this method.
        thread = getattr(self, "_thread", None)
        return thread is not None and thread.is_alive()

    @property
    def accepting(self):
        """Whether the server is currently serving (see fallback note below)."""
        try:
            return self.server.is_serving()
        except AttributeError:
            # Two situations end up here:
            # - self.server is None (stop() resets it to None), so there is
            #   no is_serving() to call;
            # - the Python version in use is too old to provide
            #   asyncio's Server.is_serving() (the asyncio documentation
            #   lists it as new in Python 3.7).
            # In both cases BaseEventLoop.is_running() is a close
            # approximation; it should mostly return the same value except
            # for brief periods when the server is starting up or shutting
            # down.
            return self.loop.is_running()

    # for aiosmtpd <1.4
    if not hasattr(aiosmtpd.controller.Controller, "_trigger_server"):

        def start(self):
            """Start the server, then record addr/port (no _trigger_server hook here)."""
            super().start()
            self._set_server_socket_attributes()

    def stop(self, timeout=None):
        """
        Stops test server.
        :param timeout: When the timeout argument is present and not None, it
        should be a floating point number specifying a timeout for the
        operation in seconds (or fractions thereof).
        """

        # This mostly copies the implementation from Controller.stop(), with two
        # differences:
        # - It removes the assertion that the thread exists, allowing stop() to
        #   be called more than once safely
        # - It passes the timeout argument to Thread.join()
        if self.loop.is_running():
            try:
                self.loop.call_soon_threadsafe(self.cancel_tasks)
            except AttributeError:
                # for aiosmtpd < 1.4.3
                self.loop.call_soon_threadsafe(self._stop)
        if self._thread is not None:
            # NOTE: the thread reference is dropped after join() returns,
            # whether or not the join timed out.
            self._thread.join(timeout)
            self._thread = None
        self._thread_exception = None
        self._factory_invoked = None
        self.server_coro = None
        self.server = None
        self.smtpd = None

    def __del__(self):
        # This is just for backward compatibility, to preserve the behavior that
        # the server is stopped when this object is finalized. But it seems
        # sketchy to rely on this to stop the server. Typically, the server
        # should be stopped "manually", before it gets deleted.
        if self.is_alive():
            self.stop()

    def __repr__(self):  # pragma: no cover
        # self.addr only exists once the server has been started; fall back
        # to the configured host/port so repr() never raises beforehand.
        addr = getattr(self, "addr", None)
        if addr is None:
            addr = (getattr(self, "hostname", None), getattr(self, "port", None))
        return "<smtp.Server %s:%s>" % tuple(addr)


def main():
    """Run a standalone SMTP server until interrupted with Ctrl-C."""
    from time import sleep

    smtp_server = Server()
    smtp_server.start()

    banner = "SMTP server is running on %s:%i" % smtp_server.addr
    print(banner)
    print("Type <Ctrl-C> to stop")

    try:
        # Idle in the foreground; the server runs on its own.
        while True:
            sleep(1)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way out of the loop.
        pass
    finally:
        print("\rstopping...")
        smtp_server.stop()


# Only start the server when this module is executed as a script.
if __name__ == "__main__":  # pragma: no cover
    main()