diff options
author | alexv-smirnov <alex@ydb.tech> | 2023-12-01 12:02:50 +0300 |
---|---|---|
committer | alexv-smirnov <alex@ydb.tech> | 2023-12-01 13:28:10 +0300 |
commit | 0e578a4c44d4abd539d9838347b9ebafaca41dfb (patch) | |
tree | a0c1969c37f818c830ebeff9c077eacf30be6ef8 /contrib/python/pytest-localserver/py3/pytest_localserver/smtp.py | |
parent | 84f2d3d4cc985e63217cff149bd2e6d67ae6fe22 (diff) | |
download | ydb-0e578a4c44d4abd539d9838347b9ebafaca41dfb.tar.gz |
Change "ya.make"
Diffstat (limited to 'contrib/python/pytest-localserver/py3/pytest_localserver/smtp.py')
-rw-r--r-- | contrib/python/pytest-localserver/py3/pytest_localserver/smtp.py | 177 |
1 files changed, 177 insertions, 0 deletions
diff --git a/contrib/python/pytest-localserver/py3/pytest_localserver/smtp.py b/contrib/python/pytest-localserver/py3/pytest_localserver/smtp.py new file mode 100644 index 0000000000..82dbc394b9 --- /dev/null +++ b/contrib/python/pytest-localserver/py3/pytest_localserver/smtp.py @@ -0,0 +1,177 @@ +# Copyright (C) 2011 Sebastian Rahlf <basti at redtoad dot de> +# with some ideas from http://code.activestate.com/recipes/440690/ +# SmtpMailsink Copyright 2005 Aviarc Corporation +# Written by Adam Feuer, Matt Branthwaite, and Troy Frever +# which is Licensed under the PSF License +import email + +import aiosmtpd.controller + + +class MessageDetails: + def __init__(self, peer, mailfrom, rcpttos, *, mail_options=None, rcpt_options=None): + self.peer = peer + self.mailfrom = mailfrom + self.rcpttos = rcpttos + if mail_options: + self.mail_options = mail_options + if rcpt_options: + self.rcpt_options = rcpt_options + + +class Handler: + def __init__(self): + self.outbox = [] + + async def handle_DATA(self, server, session, envelope): + message = email.message_from_bytes(envelope.content) + message.details = MessageDetails(session.peer, envelope.mail_from, envelope.rcpt_tos) + self.outbox.append(message) + return "250 OK" + + +class Server(aiosmtpd.controller.Controller): + + """ + Small SMTP test server. + + This is little more than a wrapper around aiosmtpd.controller.Controller + which offers a slightly different interface for backward compatibility with + earlier versions of pytest-localserver. You can just as well use a standard + Controller and pass it a Handler instance. + + Here is how to use this class for sending an email, if you really need to:: + + server = Server(port=8080) + server.start() + print 'SMTP server is running on %s:%i' % server.addr + + # any e-mail sent to localhost:8080 will end up in server.outbox + # ... + + server.stop() + + """ + + def __init__(self, host="localhost", port=0): + try: + super().__init__(Handler(), hostname=host, port=port, server_hostname=host) + except TypeError: + # for aiosmtpd <1.3 + super().__init__(Handler(), hostname=host, port=port) + + @property + def outbox(self): + return self.handler.outbox + + def _set_server_socket_attributes(self): + """ + Set the addr and port attributes on this Server instance, if they're not + already set. + """ + + # I split this out into its own method to allow running this code in + # aiosmtpd <1.4, which doesn't have the _trigger_server() method on + # the Controller class. If I put it directly in _trigger_server(), it + # would fail when calling super()._trigger_server(). In the future, when + # we can safely require aiosmtpd >=1.4, this method can be inlined + # directly into _trigger_server(). + if hasattr(self, "addr"): + assert hasattr(self, "port") + return + + self.addr = self.server.sockets[0].getsockname()[:2] + + # Work around a bug/missing feature in aiosmtpd (https://github.com/aio-libs/aiosmtpd/issues/276) + if self.port == 0: + self.port = self.addr[1] + assert self.port != 0 + + def _trigger_server(self): + self._set_server_socket_attributes() + super()._trigger_server() + + def is_alive(self): + return self._thread is not None and self._thread.is_alive() + + @property + def accepting(self): + try: + return self.server.is_serving() + except AttributeError: + # asyncio.base_events.Server.is_serving() only exists in Python 3.6 + # and up. For Python 3.5, asyncio.base_events.BaseEventLoop.is_running() + # is a close approximation; it should mostly return the same value + # except for brief periods when the server is starting up or shutting + # down. Once we drop support for Python 3.5, this branch becomes + # unnecessary. + return self.loop.is_running() + + # for aiosmtpd <1.4 + if not hasattr(aiosmtpd.controller.Controller, "_trigger_server"): + + def start(self): + super().start() + self._set_server_socket_attributes() + + def stop(self, timeout=None): + """ + Stops test server. + :param timeout: When the timeout argument is present and not None, it + should be a floating point number specifying a timeout for the + operation in seconds (or fractions thereof). + """ + + # This mostly copies the implementation from Controller.stop(), with two + # differences: + # - It removes the assertion that the thread exists, allowing stop() to + # be called more than once safely + # - It passes the timeout argument to Thread.join() + if self.loop.is_running(): + try: + self.loop.call_soon_threadsafe(self.cancel_tasks) + except AttributeError: + # for aiosmtpd < 1.4.3 + self.loop.call_soon_threadsafe(self._stop) + if self._thread is not None: + self._thread.join(timeout) + self._thread = None + self._thread_exception = None + self._factory_invoked = None + self.server_coro = None + self.server = None + self.smtpd = None + + def __del__(self): + # This is just for backward compatibility, to preserve the behavior that + # the server is stopped when this object is finalized. But it seems + # sketchy to rely on this to stop the server. Typically, the server + # should be stopped "manually", before it gets deleted. + if self.is_alive(): + self.stop() + + def __repr__(self): # pragma: no cover + return "<smtp.Server %s:%s>" % self.addr + + +def main(): + import time + + server = Server() + server.start() + + print("SMTP server is running on %s:%i" % server.addr) + print("Type <Ctrl-C> to stop") + + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + pass + finally: + print("\rstopping...") + server.stop() + + +if __name__ == "__main__": # pragma: no cover + main() |