aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/pytest-localserver/py3/pytest_localserver/smtp.py
diff options
context:
space:
mode:
authoralexv-smirnov <alex@ydb.tech>2023-12-01 12:02:50 +0300
committeralexv-smirnov <alex@ydb.tech>2023-12-01 13:28:10 +0300
commit0e578a4c44d4abd539d9838347b9ebafaca41dfb (patch)
treea0c1969c37f818c830ebeff9c077eacf30be6ef8 /contrib/python/pytest-localserver/py3/pytest_localserver/smtp.py
parent84f2d3d4cc985e63217cff149bd2e6d67ae6fe22 (diff)
downloadydb-0e578a4c44d4abd539d9838347b9ebafaca41dfb.tar.gz
Change "ya.make"
Diffstat (limited to 'contrib/python/pytest-localserver/py3/pytest_localserver/smtp.py')
-rw-r--r--contrib/python/pytest-localserver/py3/pytest_localserver/smtp.py177
1 files changed, 177 insertions, 0 deletions
diff --git a/contrib/python/pytest-localserver/py3/pytest_localserver/smtp.py b/contrib/python/pytest-localserver/py3/pytest_localserver/smtp.py
new file mode 100644
index 0000000000..82dbc394b9
--- /dev/null
+++ b/contrib/python/pytest-localserver/py3/pytest_localserver/smtp.py
@@ -0,0 +1,177 @@
+# Copyright (C) 2011 Sebastian Rahlf <basti at redtoad dot de>
+# with some ideas from http://code.activestate.com/recipes/440690/
+# SmtpMailsink Copyright 2005 Aviarc Corporation
+# Written by Adam Feuer, Matt Branthwaite, and Troy Frever
+# which is Licensed under the PSF License
+import email
+
+import aiosmtpd.controller
+
+
+class MessageDetails:
+ def __init__(self, peer, mailfrom, rcpttos, *, mail_options=None, rcpt_options=None):
+ self.peer = peer
+ self.mailfrom = mailfrom
+ self.rcpttos = rcpttos
+ if mail_options:
+ self.mail_options = mail_options
+ if rcpt_options:
+ self.rcpt_options = rcpt_options
+
+
+class Handler:
+ def __init__(self):
+ self.outbox = []
+
+ async def handle_DATA(self, server, session, envelope):
+ message = email.message_from_bytes(envelope.content)
+ message.details = MessageDetails(session.peer, envelope.mail_from, envelope.rcpt_tos)
+ self.outbox.append(message)
+ return "250 OK"
+
+
+class Server(aiosmtpd.controller.Controller):
+
+ """
+ Small SMTP test server.
+
+ This is little more than a wrapper around aiosmtpd.controller.Controller
+ which offers a slightly different interface for backward compatibility with
+ earlier versions of pytest-localserver. You can just as well use a standard
+ Controller and pass it a Handler instance.
+
+ Here is how to use this class for sending an email, if you really need to::
+
+ server = Server(port=8080)
+ server.start()
+ print 'SMTP server is running on %s:%i' % server.addr
+
+ # any e-mail sent to localhost:8080 will end up in server.outbox
+ # ...
+
+ server.stop()
+
+ """
+
+ def __init__(self, host="localhost", port=0):
+ try:
+ super().__init__(Handler(), hostname=host, port=port, server_hostname=host)
+ except TypeError:
+ # for aiosmtpd <1.3
+ super().__init__(Handler(), hostname=host, port=port)
+
+ @property
+ def outbox(self):
+ return self.handler.outbox
+
+ def _set_server_socket_attributes(self):
+ """
+ Set the addr and port attributes on this Server instance, if they're not
+ already set.
+ """
+
+ # I split this out into its own method to allow running this code in
+ # aiosmtpd <1.4, which doesn't have the _trigger_server() method on
+ # the Controller class. If I put it directly in _trigger_server(), it
+ # would fail when calling super()._trigger_server(). In the future, when
+ # we can safely require aiosmtpd >=1.4, this method can be inlined
+ # directly into _trigger_server().
+ if hasattr(self, "addr"):
+ assert hasattr(self, "port")
+ return
+
+ self.addr = self.server.sockets[0].getsockname()[:2]
+
+ # Work around a bug/missing feature in aiosmtpd (https://github.com/aio-libs/aiosmtpd/issues/276)
+ if self.port == 0:
+ self.port = self.addr[1]
+ assert self.port != 0
+
+ def _trigger_server(self):
+ self._set_server_socket_attributes()
+ super()._trigger_server()
+
+ def is_alive(self):
+ return self._thread is not None and self._thread.is_alive()
+
+ @property
+ def accepting(self):
+ try:
+ return self.server.is_serving()
+ except AttributeError:
+ # asyncio.base_events.Server.is_serving() only exists in Python 3.6
+ # and up. For Python 3.5, asyncio.base_events.BaseEventLoop.is_running()
+ # is a close approximation; it should mostly return the same value
+ # except for brief periods when the server is starting up or shutting
+ # down. Once we drop support for Python 3.5, this branch becomes
+ # unnecessary.
+ return self.loop.is_running()
+
+ # for aiosmtpd <1.4
+ if not hasattr(aiosmtpd.controller.Controller, "_trigger_server"):
+
+ def start(self):
+ super().start()
+ self._set_server_socket_attributes()
+
+ def stop(self, timeout=None):
+ """
+ Stops test server.
+ :param timeout: When the timeout argument is present and not None, it
+ should be a floating point number specifying a timeout for the
+ operation in seconds (or fractions thereof).
+ """
+
+ # This mostly copies the implementation from Controller.stop(), with two
+ # differences:
+ # - It removes the assertion that the thread exists, allowing stop() to
+ # be called more than once safely
+ # - It passes the timeout argument to Thread.join()
+ if self.loop.is_running():
+ try:
+ self.loop.call_soon_threadsafe(self.cancel_tasks)
+ except AttributeError:
+ # for aiosmtpd < 1.4.3
+ self.loop.call_soon_threadsafe(self._stop)
+ if self._thread is not None:
+ self._thread.join(timeout)
+ self._thread = None
+ self._thread_exception = None
+ self._factory_invoked = None
+ self.server_coro = None
+ self.server = None
+ self.smtpd = None
+
+ def __del__(self):
+ # This is just for backward compatibility, to preserve the behavior that
+ # the server is stopped when this object is finalized. But it seems
+ # sketchy to rely on this to stop the server. Typically, the server
+ # should be stopped "manually", before it gets deleted.
+ if self.is_alive():
+ self.stop()
+
+ def __repr__(self): # pragma: no cover
+ return "<smtp.Server %s:%s>" % self.addr
+
+
+def main():
+ import time
+
+ server = Server()
+ server.start()
+
+ print("SMTP server is running on %s:%i" % server.addr)
+ print("Type <Ctrl-C> to stop")
+
+ try:
+ while True:
+ time.sleep(1)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ print("\rstopping...")
+ server.stop()
+
+
+if __name__ == "__main__": # pragma: no cover
+ main()