diff options
| author | nkozlovskiy <[email protected]> | 2023-09-29 12:24:06 +0300 |
|---|---|---|
| committer | nkozlovskiy <[email protected]> | 2023-09-29 12:41:34 +0300 |
| commit | e0e3e1717e3d33762ce61950504f9637a6e669ed (patch) | |
| tree | bca3ff6939b10ed60c3d5c12439963a1146b9711 /contrib/python/pytest-localserver/py2/pytest_localserver | |
| parent | 38f2c5852db84c7b4d83adfcb009eb61541d1ccd (diff) | |
add ydb deps
Diffstat (limited to 'contrib/python/pytest-localserver/py2/pytest_localserver')
6 files changed, 584 insertions, 0 deletions
diff --git a/contrib/python/pytest-localserver/py2/pytest_localserver/__init__.py b/contrib/python/pytest-localserver/py2/pytest_localserver/__init__.py new file mode 100644 index 00000000000..efc6d7a0e58 --- /dev/null +++ b/contrib/python/pytest-localserver/py2/pytest_localserver/__init__.py @@ -0,0 +1 @@ +VERSION = '0.5.1.post0' diff --git a/contrib/python/pytest-localserver/py2/pytest_localserver/http.py b/contrib/python/pytest-localserver/py2/pytest_localserver/http.py new file mode 100644 index 00000000000..1ce71562ebd --- /dev/null +++ b/contrib/python/pytest-localserver/py2/pytest_localserver/http.py @@ -0,0 +1,133 @@ +# Copyright (C) 2010-2013 Sebastian Rahlf and others (see AUTHORS). +# +# This program is release under the MIT license. You can find the full text of +# the license in the LICENSE file. + +import json +import sys +import threading + +from werkzeug.serving import make_server +from werkzeug.wrappers import Response, Request + + +class WSGIServer(threading.Thread): + + """ + HTTP server running a WSGI application in its own thread. + """ + + def __init__(self, host='127.0.0.1', port=0, application=None, **kwargs): + self.app = application + self._server = make_server(host, port, self.app, **kwargs) + self.server_address = self._server.server_address + + super(WSGIServer, self).__init__( + name=self.__class__, + target=self._server.serve_forever) + + def __del__(self): + self.stop() + + def stop(self): + self._server.shutdown() + + @property + def url(self): + host, port = self.server_address + proto = 'http' if self._server.ssl_context is None else 'https' + return '%s://%s:%i' % (proto, host, port) + + +class ContentServer(WSGIServer): + + """ + Small test server which can be taught which content (i.e. string) to serve + with which response code. Try the following snippet for testing API calls:: + + server = ContentServer(port=8080) + server.start() + print 'Test server running at http://%s:%i' % server.server_address + + # any request to http://localhost:8080 will get a 503 response. + server.content = 'Hello World!' + server.code = 503 + + # ... + + # we're done + server.stop() + + """ + + def __init__(self, host='127.0.0.1', port=0, ssl_context=None): + super(ContentServer, self).__init__(host, port, self, ssl_context=ssl_context) + self.content, self.code = ('', 204) # HTTP 204: No Content + self.headers = {} + self.show_post_vars = False + self.compress = None + self.requests = [] + + def __call__(self, environ, start_response): + """ + This is the WSGI application. + """ + request = Request(environ) + self.requests.append(request) + if (request.content_type == 'application/x-www-form-urlencoded' + and request.method == 'POST' and self.show_post_vars): + content = json.dumps(request.form) + else: + content = self.content + + response = Response(status=self.code) + response.headers.clear() + response.headers.extend(self.headers) + + # FIXME get compression working! + # if self.compress == 'gzip': + # content = gzip.compress(content.encode('utf-8')) + # response.content_encoding = 'gzip' + + response.data = content + return response(environ, start_response) + + def serve_content(self, content, code=200, headers=None): + """ + Serves string content (with specified HTTP error code) as response to + all subsequent request. + + :param content: content to be displayed + :param code: HTTP status code + :param headers: HTTP headers to be returned + """ + self.content, self.code = (content, code) + if headers: + self.headers = headers + + +if __name__ == '__main__': # pragma: no cover + import os.path + import time + + app = ContentServer() + server = WSGIServer(application=app) + server.start() + + print('HTTP server is running at %s' % server.url) + print('Type <Ctrl-C> to stop') + + try: + path = sys.argv[1] + except IndexError: + path = os.path.join( + os.path.dirname(os.path.abspath(__file__)), '..', 'README.rst') + + app.serve_content(open(path).read(), 302) + + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print('\rstopping...') + server.stop() diff --git a/contrib/python/pytest-localserver/py2/pytest_localserver/https.py b/contrib/python/pytest-localserver/py2/pytest_localserver/https.py new file mode 100644 index 00000000000..9f292876984 --- /dev/null +++ b/contrib/python/pytest-localserver/py2/pytest_localserver/https.py @@ -0,0 +1,153 @@ +# Copyright (C) 2010-2013 Sebastian Rahlf and others (see AUTHORS). +# +# This program is release under the MIT license. You can find the full text of +# the license in the LICENSE file. + +import os.path + +from pytest_localserver.http import ContentServer + +#: default server certificate + +DEFAULT_CERTIFICATE = os.path.join(os.getcwd(), 'server.pem') + + +class SecureContentServer (ContentServer): + + """ + Small test server which works just like :class:`http.Server` over HTTP:: + + server = SecureContentServer( + port=8080, key='/srv/my.key', cert='my.certificate') + server.start() + print 'Test server running at %s' % server.url + server.serve_content(open('/path/to/some.file').read()) + # any call to https://localhost:8080 will get the contents of + # /path/to/some.file as a response. + + To avoid *ssl handshake failures* you can import the `pytest-localserver + CA`_ into your browser of choice. + + How to create a self-signed certificate + --------------------------------------- + + If you want to create your own server certificate, you need `OpenSSL`_ + installed on your machine. A self-signed certificate consists of a + certificate and a private key for your server. It can be created with + a command like this, using OpenSSL 1.1.1:: + + openssl req \ + -x509 \ + -newkey rsa:4096 \ + -sha256 \ + -days 3650 \ + -nodes \ + -keyout server.pem \ + -out server.pem \ + -subj "/CN=127.0.0.1/O=pytest-localserver/OU=Testing Dept." \ + -addext "subjectAltName=DNS:localhost" + + Note that both key and certificate are in a single file now named + ``server.pem``. + + How to create your own Certificate Authority + -------------------------------------------- + + Generate a server key and request for signing (csr). Make sure that the + common name (CN) is your IP address/domain name (e.g. ``localhost``). :: + + openssl genpkey \ + -algorithm RSA \ + -pkeyopt rsa_keygen_bits:4096 \ + -out server.key + openssl req \ + -new \ + -addext "subjectAltName=DNS:localhost" \ + -key server.key \ + -out server.csr + + Generate your own CA. Make sure that this time the CN is *not* your IP + address/domain name (e.g. ``localhost CA``). :: + + openssl genpkey \ + -algorithm RSA \ + -pkeyopt rsa_keygen_bits:4096 \ + -aes256 \ + -out ca.key + openssl req \ + -new \ + -x509 \ + -key ca.key \ + -out ca.crt + + Sign the certificate signing request (csr) with the self-created CA that + you made earlier. Note that OpenSSL does not copy the subjectAltName field + from the request (csr), so you have to provide it again as a file. If you + issue subsequent certificates and your browser already knows about previous + ones simply increment the serial number. :: + + echo "subjectAltName=DNS:localhost" >server-extensions.txt + openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key \ + -set_serial 01 -extfile server-extensions.txt -out server.crt + + Create a single file for both key and certificate:: + + cat server.key server.crt > server.pem + + Now you only need to import ``ca.crt`` as a CA in your browser. + + Want to know more? + ------------------ + + This information was compiled from the following sources, which you might + find helpful if you want to dig deeper into `pyOpenSSH`_, certificates and + CAs: + + - http://code.activestate.com/recipes/442473/ + - http://www.tc.umn.edu/~brams006/selfsign.html + - + + A more advanced tutorial can be found `here`_. + + .. _pytest-localserver CA: https://raw.githubusercontent.com/pytest-dev/pytest-localserver/master/pytest_localserver/ca.crt + .. _pyOpenSSH: https://launchpad.net/pyopenssl + """ + + def __init__(self, host='localhost', port=0, + key=DEFAULT_CERTIFICATE, cert=DEFAULT_CERTIFICATE): + """ + :param key: location of file containing the server private key. + :param cert: location of file containing server certificate. + """ + + super(SecureContentServer, self).__init__(host, port, ssl_context=(key, cert)) + + +if __name__ == '__main__': # pragma: no cover + + import sys + import time + + print('Using certificate %s.' % DEFAULT_CERTIFICATE) + + server = SecureContentServer() + server.start() + server.logging = True + + print('HTTPS server is running at %s' % server.url) + print('Type <Ctrl-C> to stop') + + try: + path = sys.argv[1] + except IndexError: + path = os.path.join( + os.path.dirname(os.path.abspath(__file__)), '..', 'README.rst') + + server.serve_content(open(path).read(), 302) + + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print('\rstopping...') + server.stop() diff --git a/contrib/python/pytest-localserver/py2/pytest_localserver/plugin.py b/contrib/python/pytest-localserver/py2/pytest_localserver/plugin.py new file mode 100644 index 00000000000..9a9739a50b9 --- /dev/null +++ b/contrib/python/pytest-localserver/py2/pytest_localserver/plugin.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python +# -*- coding: utf8 -*- +# +# Copyright (C) 2011 Sebastian Rahlf <basti at redtoad dot de> +# +# This program is release under the MIT license. You can find the full text of +# the license in the LICENSE file. +import os +import pkgutil + +import pytest + + +def httpserver(request): + """The returned ``httpserver`` provides a threaded HTTP server instance + running on a randomly assigned port on localhost. It can be taught which + content (i.e. string) to serve with which response code and comes with + following attributes: + + * ``code`` - HTTP response code (int) + * ``content`` - content of next response (str) + * ``headers`` - response headers (dict) + + Once these attribute are set, all subsequent requests will be answered with + these values until they are changed or the server is stopped. A more + convenient way to change these is :: + + httpserver.serve_content( + content='My content', code=200, + headers={'content-type': 'text/plain'}) + + The server address can be found in property + + * ``url`` + + which is the string representation of tuple ``server_address`` (host as + str, port as int). + + Example:: + + import requests + def scrape(url): + html = requests.get(url).text + # some parsing happens here + # ... + return result + + def test_retrieve_some_content(httpserver): + httpserver.serve_content(open('cached-content.html').read()) + assert scrape(httpserver.url) == 'Found it!' + + """ + from pytest_localserver import http + server = http.ContentServer() + server.start() + request.addfinalizer(server.stop) + return server + + +def httpsserver(request): + """The returned ``httpsserver`` (note the additional S!) provides a + threaded HTTP server instance similar to funcarg ``httpserver`` but with + SSL encryption. + """ + from pytest_localserver import https + try: + with open(https.DEFAULT_CERTIFICATE, 'wb') as f: + f.write(pkgutil.get_data('pytest_localserver', 'server.pem')) + server = https.SecureContentServer() + server.start() + request.addfinalizer(server.stop) + yield server + finally: + os.remove(https.DEFAULT_CERTIFICATE) + + +def smtpserver(request): + """The returned ``smtpserver`` provides a threaded instance of + ``smtpd.SMTPServer`` running on localhost. It has the following + attributes: + + * ``addr`` - server address as tuple (host as str, port as int) + """ + from pytest_localserver import smtp + server = smtp.Server() + server.start() + request.addfinalizer(server.stop) + return server diff --git a/contrib/python/pytest-localserver/py2/pytest_localserver/server.pem b/contrib/python/pytest-localserver/py2/pytest_localserver/server.pem new file mode 100644 index 00000000000..4f7f1ed3227 --- /dev/null +++ b/contrib/python/pytest-localserver/py2/pytest_localserver/server.pem @@ -0,0 +1,84 @@ +-----BEGIN PRIVATE KEY----- +MIIJRAIBADANBgkqhkiG9w0BAQEFAASCCS4wggkqAgEAAoICAQC31RlRDMN6eGpQ +zZaqtgk7/c5h98PoPNRpFFoUuuWGdf5PlHv4fMym7Zmz2ljx1DqutKhIIUKqS1vh +xd5zMFpheOUTVPVfQc5evgTIm1GF0rMSSaQSFPVOX3nNXGmF1/Jq9YWTc/rb2ns/ +s+Ip1zSKBqDsdRbrkvpSa7cyCkxYcuYtYo5jRa930Fbn4cNj+aA3dxGXd4bLLfnR +BpRA0V5SzBv93MtOK9kngQwQhjBJC/L/acHPO5dzQISBhM9NTSCAH4zm0SlTiExK +DhBdExSbdjAjnJ3k82hNFLUqY1JAm4yVlvwD3vNY4hkf/gWzuQeJIhzK8kE4A+dD +8BZzdHroK9xnnpmSlS7/P0raQd3VZPc8swEDyw9MrdA5UU95b07sUVs0LM0vWhi+ +rwNAJHfiQ77twc0bP7niyy/Kg+UYf7m0i/nyvJFKq75rHOfZvmsPNs+gOdJff+yy +4vv9pmImj2nulgOgrGrzc4ICnx3GpoKmGFDq/p+hqk99P92dFHmwd7c2bYHQNpC9 +BJh8VzrVuyndX2mL5P+/LfmEi8tI06Imykzqtk/UODLJks7ZIrJfYlYmm7aVdrvO +1U2s10AfloCX/ZVO7u3k4lH7Stj+/C8Ap+5Cm4Q46sZGO0Z5b808p4ETcoAI/AAl +OwpHAMi9ueLqJ7J0ykCDl/LrTyNqrQIDAQABAoICACObXRn71Okl5cHc8HAEbml2 +UcFcElAraCEqVgBp6wdOV4HmitSop6M1pm3VvyCoMO2iBG5kMtt1WUiz4NCC7x6u +IgDKlfRrdKOZPqf0nafEFfdW2DbAZHtXtun2GmJYX5YkFElpT4/CE9lU6FueWYja +m9TxIQ1kHKRWRNemcv820iq8SkQkPUaBzjN/4S6+LTBRGdEyz6MPNrIsCg87/n8f +FdToLWDo0Vj7f/C7bSLY86pRO77+Fem293N23AhnBgKLGemjXdPWNKCrdLPyfC1Y +iR58uYCdPPihKC4bqtTkzCg1ZH8DcjMnKCKwOz6CelkviFAu+D73UpYwLMkUKLH3 +p3meFBwa0oEzUUof+W9J5HPnVX6nGR2V4fXkejcJoOBHUaSsuRFiPS4XJMj++DI7 +uiMOt7QljqCKirmCp8tVQ5raT9zwFgNCsR3+gemD1KC3zlXixGs1DyI4x2YwTgKU +c16vnh9fGS9zq/drxqbeMvVbyVZF98LjJfgPxcmyEAXVH46Rs3/KSr6ve6MpRk9G +3vLd7BVfEXoGA1Sha7PRg9OaKBgODfkDRsyZJqqkqHurE4P+8NQZ3mhzdGa4Prj9 +er5BrE3gmvagtQUJf0n+E6HRHGCFoq4i+jOeBw8qiwxgWV0ITfhneQDJF8JvBrzJ +IByC9fVUYB4R4wESRoOBAoIBAQDgJz1etpf/47PwM5A5bdqJEEMaT6d0ycI7QtFQ +1L3PgdRmu1ag2pCYogPQx1zrkMUe4qX0K1h372toRUz0RvoQFYF6qVLJNK/hOd+O +GQ/Rw3XuCvCCs+6QbeQNqUjhLeYf3+TH4IEbIvukIACnDlHvAhtu+IQR7sVwCXda +Slu1zW0ya7Pa/pPnEOQpA9D758/GjcZpe44hCBq+EnrV40Q3jHXEVtsDexq1ubzz +BZEVLr4iwVrEjELZo4pbT+wQx2waRFqTVej5RaQadnSMCdRC0LCTa+t7hfuzN+KN +DBoSUeOlcQ88TyEGvcZXo0jAyDBdN5HC38ujZlkqHHCZVEGxAoIBAQDR81WeHMYW +/vtUhrP3BaJMj3RL/Vmpujac/i9IjdxrP2bi9mweunkZBH9UHNPcJp2b/+uAdSJO +aQRzghCM+DmuOIuu4rB9FU6qpXGhcag126iu328eSYS1sJg5CVGs9ZhaxKk5xbro +1cV0uUS6Gxl2z1Kpsb2dy/zhPTSwf6nrKXYwfrM65+EURz0fniKGfgxs6+p+uTVS +kkLMe2nusJ1KLGrXqfJfa25sQKo1zaRFHLDd0/pgchijvVkhXDY3A7913i+xbQZu +KIfbGp0pH4XFUJn1AR4XqPpE+wmHiLeqEmFJ5xcDl4q3j2dGnO3mHUYYNFOxh1nt +1MCDCCbKJVu9AoIBAQCOUtv4o19nrqC1x0ev7zxvAtBYiHL/CIw3LHnTJQFQHFNM +125tu9lL0LMzgSJSwB0pOye8HTmTDYXZMwdloxtr0vvfcluKPdXe3+w+QVN2EPF0 +L6X+l1jGg7/lnLMVpxsS6gpNjxLqtA+ralZ/u+vyIhhhIZJaAI2EUb5iqgwJJ2JK +PXB5gGNQt7zm/fFXwRyAKcztdPINryOrw/gSjrblvl2YSL3PO/79m+2JMOOp24AG +eVa0rYpUvi4/REPTc4wEMZqBKm8+tyU3WDcwI52Ovwsez8s5Jx1l8fn7LM/xCeXN +SjguRt/lc+HYC2lKXtG2nm4Cmi6mlXnP7zbfZExBAoIBAQC1xAkU+W5ajGjFllWK +gJsx02TpQS92bVxI8Ru4ofD5/QszZgrXU7Px/93I0ahuShRb8eZO8ZpA7lTHOAzi +Lymo9xWf1Gzd7iuMO+4zyrXJ4yGYPKL0QswdjQVNJA9NQdekhezIsrKOUD1CQAAL +a9jQ7s9vUQ2L5wZJbvcF85EFooDLnXXIguZv6vk1PXBApjJVvq3nBqvuj+g7JoHg +/5E9nVTm4CCRke4o1JdIO4CDwUIy2wpCo6VHZXAcHLxnRtxkzHbYEj7l8jska1cz +OjJTUOPppQ0LiOUcAYcPi0MPgBgwplxbZMDZCNNt5AFnH2MHI45t/XPTH0WIa+9B +RbS1AoIBAQCqZri7tm9ngZfvKNNvdVTgBcKukDFek4f7ar0bOkISALtNrn1xXIID +1ggELNy9afTmzPlttqMVQIxSTL3p7LIkZzTsuK0uthbsyzXxLHw2m+oHgaYut7he +j2v7qTmaw7rgpTiORTDg00+5HDtdMmp3Km4aurNasPA80i8Z2ElI20i50LlQ4K5Q +lIqpHR4fwrBr4SLStzvBo9UK1YYQ94FyKd7xou3uXLLTlY3G8rD6jjKJE2Gg8Ga/ +gGzbRCZWH6AOk1iO/CmOPH6AdFn5axXTx+uAML1Lr2VQ+azrYZCtIhKmW/kuQPQg +apeiobcSY1vsX7eM8mQkM8TxrDLyNjtl +-----END PRIVATE KEY----- +-----BEGIN CERTIFICATE----- +MIIFiTCCA3GgAwIBAgIUUpWEFJm0PzrYTkhLe05yIBhBMuowDQYJKoZIhvcNAQEL +BQAwSTESMBAGA1UEAwwJMTI3LjAuMC4xMRswGQYDVQQKDBJweXRlc3QtbG9jYWxz +ZXJ2ZXIxFjAUBgNVBAsMDVRlc3RpbmcgRGVwdC4wHhcNMjEwOTE0MDU1NzAxWhcN +MzEwOTEyMDU1NzAxWjBJMRIwEAYDVQQDDAkxMjcuMC4wLjExGzAZBgNVBAoMEnB5 +dGVzdC1sb2NhbHNlcnZlcjEWMBQGA1UECwwNVGVzdGluZyBEZXB0LjCCAiIwDQYJ +KoZIhvcNAQEBBQADggIPADCCAgoCggIBALfVGVEMw3p4alDNlqq2CTv9zmH3w+g8 +1GkUWhS65YZ1/k+Ue/h8zKbtmbPaWPHUOq60qEghQqpLW+HF3nMwWmF45RNU9V9B +zl6+BMibUYXSsxJJpBIU9U5fec1caYXX8mr1hZNz+tvaez+z4inXNIoGoOx1FuuS ++lJrtzIKTFhy5i1ijmNFr3fQVufhw2P5oDd3EZd3hsst+dEGlEDRXlLMG/3cy04r +2SeBDBCGMEkL8v9pwc87l3NAhIGEz01NIIAfjObRKVOITEoOEF0TFJt2MCOcneTz +aE0UtSpjUkCbjJWW/APe81jiGR/+BbO5B4kiHMryQTgD50PwFnN0eugr3GeemZKV +Lv8/StpB3dVk9zyzAQPLD0yt0DlRT3lvTuxRWzQszS9aGL6vA0Akd+JDvu3BzRs/ +ueLLL8qD5Rh/ubSL+fK8kUqrvmsc59m+aw82z6A50l9/7LLi+/2mYiaPae6WA6Cs +avNzggKfHcamgqYYUOr+n6GqT30/3Z0UebB3tzZtgdA2kL0EmHxXOtW7Kd1faYvk +/78t+YSLy0jToibKTOq2T9Q4MsmSztkisl9iViabtpV2u87VTazXQB+WgJf9lU7u +7eTiUftK2P78LwCn7kKbhDjqxkY7RnlvzTyngRNygAj8ACU7CkcAyL254uonsnTK +QIOX8utPI2qtAgMBAAGjaTBnMB0GA1UdDgQWBBRzl1iPBK4XZwChdNhdPHfjAb/z +BzAfBgNVHSMEGDAWgBRzl1iPBK4XZwChdNhdPHfjAb/zBzAPBgNVHRMBAf8EBTAD +AQH/MBQGA1UdEQQNMAuCCWxvY2FsaG9zdDANBgkqhkiG9w0BAQsFAAOCAgEATk+Q +t6psMrtGeFcZKYdmSFqW3SZUba4l76PzvHRf8nMcB1eFuZ4mCdiv0NgcQkE8c9T+ +i/J4wEmJ+mf1033MP1vQmrGqnaYBsVHNBTaTsP+gLg6Z7AGPvPaL2fwmWWNwTT0O +1352bdz9ORacKSXW3Pq0Vi1pTMho0kAya3VQpl2paqz8qSUG7ijyGQ46VXjgqNZ1 +P5lv+6CWa3AwEQo6Edv1x+HLesRWVqVAkxxhlaGOPQm1cDlpnI4rxuYIMlsb5cNZ +XTAIxw6Es1eqlPcZ96EoGXyIrG7Ej6Yb9447PrC1ulMnIu74cWLY25eu+oVr7Nvk +Gjp2I7qbVjz9Ful0o0M9Wps4RzCgrpO4WeirCK/jFIUpmXJdn7V4mX0h2ako+dal +vczg+bAd4ZedJWHTiqJs9lVMh4/YD7Ck6n+iAZ8Jusq6OhyTY43/Nyp2zQbwQmYv +y3V6JVX+vY4Cq8pR1i8x5FBHnOCMPoT4sbOjKuoFWVi9wH1d65Q1JOo6/0eYzfwJ +nuGUJza7+aCxYNlqxtqX0ItM670ClxB7fuWUpKh5WHrHD2dqBhYwtXOl9yBHrFOJ +O8toKk3PmtlMqVZ8QXmgSqEy7wkfxhjJLgi2AQsqeA6nDrCLtr2pWdqDWoUfxY8r +r5rc71nFLay/H2CbOYELI+20VFMp8GF3kOZbkRA= +-----END CERTIFICATE----- diff --git a/contrib/python/pytest-localserver/py2/pytest_localserver/smtp.py b/contrib/python/pytest-localserver/py2/pytest_localserver/smtp.py new file mode 100644 index 00000000000..d7756ff756d --- /dev/null +++ b/contrib/python/pytest-localserver/py2/pytest_localserver/smtp.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python +# -*- coding: utf8 -*- +# +# Copyright (C) 2011 Sebastian Rahlf <basti at redtoad dot de> +# with some ideas from http://code.activestate.com/recipes/440690/ +# SmtpMailsink Copyright 2005 Aviarc Corporation +# Written by Adam Feuer, Matt Branthwaite, and Troy Frever +# which is Licensed under the PSF License + +import asyncore +import email +import smtpd +import sys +import threading + + +PY35_OR_NEWER = sys.version_info[:2] >= (3, 5) + +class Server (smtpd.SMTPServer, threading.Thread): + + """ + Small SMTP test server. Try the following snippet for sending mail:: + + server = Server(port=8080) + server.start() + print 'SMTP server is running on %s:%i' % server.addr + + # any e-mail sent to localhost:8080 will end up in server.outbox + # ... + + server.stop() + + """ + + WAIT_BETWEEN_CHECKS = 0.001 + + def __init__(self, host='localhost', port=0): + # Workaround for deprecated signature in Python 3.6 + if PY35_OR_NEWER: + smtpd.SMTPServer.__init__(self, (host, port), None, decode_data=True) + else: + smtpd.SMTPServer.__init__(self, (host, port), None) + + if self._localaddr[1] == 0: + self.addr = self.socket.getsockname() + + self.outbox = [] + + # initialise thread + self._stopevent = threading.Event() + self.threadName = self.__class__.__name__ + threading.Thread.__init__(self, name=self.threadName) + + def process_message(self, peer, mailfrom, rcpttos, data, **kwargs): + """ + Adds message to outbox. + """ + try: + message = email.message_from_bytes(data) + except AttributeError: + message = email.message_from_string(data) + # on the message, also set the envelope details + + class Bunch: + def __init__(self, **kwds): + vars(self).update(kwds) + + message.details = Bunch( + peer=peer, + mailfrom=mailfrom, + rcpttos=rcpttos, + **kwargs + ) + self.outbox.append(message) + + def run(self): + """ + Threads run method. + """ + while not self._stopevent.is_set(): + asyncore.loop(timeout=self.WAIT_BETWEEN_CHECKS, count=1) + + def stop(self, timeout=None): + """ + Stops test server. + + :param timeout: When the timeout argument is present and not None, it + should be a floating point number specifying a timeout for the + operation in seconds (or fractions thereof). + """ + self._stopevent.set() + threading.Thread.join(self, timeout) + self.close() + + def __del__(self): + self.stop() + + def __repr__(self): # pragma: no cover + return '<smtp.Server %s:%s>' % self.addr + +if __name__ == "__main__": # pragma: no cover + import time + + server = Server() + server.start() + + print('SMTP server is running on %s:%i' % server.addr) + print('Type <Ctrl-C> to stop') + + try: + + try: + while True: + time.sleep(1) + finally: + print('\rstopping...') + server.stop() + + except KeyboardInterrupt: + # support for Python 2.4 dictates that try ... finally is not used + # together with any except statements + pass |
