diff options
author | maxim-yurchuk <[email protected]> | 2024-10-09 12:29:46 +0300 |
---|---|---|
committer | maxim-yurchuk <[email protected]> | 2024-10-09 13:14:22 +0300 |
commit | 9731d8a4bb7ee2cc8554eaf133bb85498a4c7d80 (patch) | |
tree | a8fb3181d5947c0d78cf402aa56e686130179049 /contrib/python/pytest-localserver/py3 | |
parent | a44b779cd359f06c3ebbef4ec98c6b38609d9d85 (diff) |
publishFullContrib: true for ydb
<HIDDEN_URL>
commit_hash:c82a80ac4594723cebf2c7387dec9c60217f603e
Diffstat (limited to 'contrib/python/pytest-localserver/py3')
7 files changed, 506 insertions, 0 deletions
diff --git a/contrib/python/pytest-localserver/py3/patches/01-arcadia.patch b/contrib/python/pytest-localserver/py3/patches/01-arcadia.patch new file mode 100644 index 00000000000..a89510c6ef0 --- /dev/null +++ b/contrib/python/pytest-localserver/py3/patches/01-arcadia.patch @@ -0,0 +1,38 @@ +--- contrib/python/pytest-localserver/py3/pytest_localserver/https.py (index) ++++ contrib/python/pytest-localserver/py3/pytest_localserver/https.py (working tree) +@@ -10,1 +10,1 @@ import os.path +-DEFAULT_CERTIFICATE = os.path.join(os.path.abspath(os.path.dirname(__file__)), "server.pem") ++DEFAULT_CERTIFICATE = os.path.join(os.getcwd(), "server.pem") +--- contrib/python/pytest-localserver/py3/pytest_localserver/plugin.py (index) ++++ contrib/python/pytest-localserver/py3/pytest_localserver/plugin.py (working tree) +@@ -4,6 +4,9 @@ + # + # This program is release under the MIT license. You can find the full text of + # the license in the LICENSE file. ++import os ++import pkgutil ++ + import pytest + + +@@ -62,11 +65,15 @@ def httpsserver(request): + SSL encryption. + """ + from pytest_localserver import https +- +- server = https.SecureContentServer() +- server.start() +- request.addfinalizer(server.stop) +- return server ++ try: ++ with open(https.DEFAULT_CERTIFICATE, 'wb') as f: ++ f.write(pkgutil.get_data('pytest_localserver', 'server.pem')) ++ server = https.SecureContentServer() ++ server.start() ++ request.addfinalizer(server.stop) ++ yield server ++ finally: ++ os.remove(https.DEFAULT_CERTIFICATE) + + + @pytest.fixture diff --git a/contrib/python/pytest-localserver/py3/tests/__init__.py b/contrib/python/pytest-localserver/py3/tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/contrib/python/pytest-localserver/py3/tests/__init__.py diff --git a/contrib/python/pytest-localserver/py3/tests/test_http.py b/contrib/python/pytest-localserver/py3/tests/test_http.py new file mode 100644 index 00000000000..64859ffd255 --- /dev/null +++ b/contrib/python/pytest-localserver/py3/tests/test_http.py @@ -0,0 +1,297 @@ +import itertools +import subprocess +import sys +import textwrap + +import pytest +import requests + +from pytest_localserver import http +from pytest_localserver import plugin + + +# define test fixture here again in order to run tests without having to +# install the plugin anew every single time +httpserver = plugin.httpserver + + +transfer_encoded = pytest.mark.parametrize( + "transfer_encoding_header", ["Transfer-encoding", "Transfer-Encoding", "transfer-encoding", "TRANSFER-ENCODING"] +) + + +def test_httpserver_funcarg(httpserver): + assert isinstance(httpserver, http.ContentServer) + assert httpserver.is_alive() + assert httpserver.server_address + + +def test_server_does_not_serve_file_at_startup(httpserver): + assert httpserver.code == 204 + assert httpserver.content == "" + + +def test_some_content_retrieval(httpserver): + httpserver.serve_content("TEST!") + resp = requests.get(httpserver.url) + assert resp.text == "TEST!" + assert resp.status_code == 200 + + +def test_request_is_stored(httpserver): + httpserver.serve_content("TEST!") + assert len(httpserver.requests) == 0 + requests.get(httpserver.url) + assert len(httpserver.requests) == 1 + + +def test_GET_request(httpserver): + httpserver.serve_content("TEST!", headers={"Content-type": "text/plain"}) + resp = requests.get(httpserver.url, headers={"User-Agent": "Test method"}) + assert resp.text == "TEST!" + assert resp.status_code == 200 + assert "text/plain" in resp.headers["Content-type"] + + +# FIXME get compression working! +# def test_gzipped_GET_request(httpserver): +# httpserver.serve_content('TEST!', headers={'Content-type': 'text/plain'}) +# httpserver.compress = 'gzip' +# resp = requests.get(httpserver.url, headers={ +# 'User-Agent': 'Test method', +# 'Accept-encoding': 'gzip' +# }) +# assert resp.text == 'TEST!' +# assert resp.status_code == 200 +# assert resp.content_encoding == 'gzip' +# assert resp.headers['Content-type'] == 'text/plain' +# assert resp.headers['content-encoding'] == 'gzip' + + +def test_HEAD_request(httpserver): + httpserver.serve_content("TEST!", headers={"Content-type": "text/plain"}) + resp = requests.head(httpserver.url) + assert resp.status_code == 200 + assert resp.headers["Content-type"] == "text/plain" + + +# def test_POST_request(httpserver): +# headers = {'Content-type': 'application/x-www-form-urlencoded', +# 'set-cookie': 'some _cookie_content'} +# +# httpserver.serve_content('TEST!', headers=headers) +# resp = requests.post(httpserver.url, data={'data': 'value'}, headers=headers) +# assert resp.text == 'TEST!' +# assert resp.status_code == 200 +# +# httpserver.serve_content('TEST!', headers=headers, show_post_vars=True) +# resp = requests.post(httpserver.url, data={'data': 'value'}, headers=headers) +# assert resp.json() == {'data': 'value'} +# assert resp.status_code == 200 + + [email protected]("chunked_flag", [http.Chunked.YES, http.Chunked.AUTO, http.Chunked.NO]) +def test_chunked_attribute_without_header(httpserver, chunked_flag): + """ + Test that passing the chunked attribute to serve_content() properly sets + the chunked property of the server. + """ + httpserver.serve_content(("TEST!", "test"), headers={"Content-type": "text/plain"}, chunked=chunked_flag) + assert httpserver.chunked == chunked_flag + + [email protected]("chunked_flag", [http.Chunked.YES, http.Chunked.AUTO, http.Chunked.NO]) +def test_chunked_attribute_with_header(httpserver, chunked_flag): + """ + Test that passing the chunked attribute to serve_content() properly sets + the chunked property of the server even when the transfer-encoding header is + also set. + """ + httpserver.serve_content( + ("TEST!", "test"), headers={"Content-type": "text/plain", "Transfer-encoding": "chunked"}, chunked=chunked_flag + ) + assert httpserver.chunked == chunked_flag + + +@transfer_encoded [email protected]("chunked_flag", [http.Chunked.YES, http.Chunked.AUTO]) +def test_GET_request_chunked_parameter(httpserver, transfer_encoding_header, chunked_flag): + """ + Test that passing YES or AUTO as the chunked parameter to serve_content() + causes the response to be sent using chunking when the Transfer-encoding + header is also set. + """ + httpserver.serve_content( + ("TEST!", "test"), + headers={"Content-type": "text/plain", transfer_encoding_header: "chunked"}, + chunked=chunked_flag, + ) + resp = requests.get(httpserver.url, headers={"User-Agent": "Test method"}) + assert resp.text == "TEST!test" + assert resp.status_code == 200 + assert "text/plain" in resp.headers["Content-type"] + assert "chunked" in resp.headers["Transfer-encoding"] + + +@transfer_encoded [email protected]("chunked_flag", [http.Chunked.YES, http.Chunked.AUTO]) +def test_GET_request_chunked_attribute(httpserver, transfer_encoding_header, chunked_flag): + """ + Test that setting the chunked attribute of httpserver to YES or AUTO + causes the response to be sent using chunking when the Transfer-encoding + header is also set. + """ + httpserver.serve_content( + ("TEST!", "test"), headers={"Content-type": "text/plain", transfer_encoding_header: "chunked"} + ) + httpserver.chunked = chunked_flag + resp = requests.get(httpserver.url, headers={"User-Agent": "Test method"}) + assert resp.text == "TEST!test" + assert resp.status_code == 200 + assert "text/plain" in resp.headers["Content-type"] + assert "chunked" in resp.headers["Transfer-encoding"] + + +@transfer_encoded +def test_GET_request_not_chunked(httpserver, transfer_encoding_header): + """ + Test that setting the chunked attribute of httpserver to NO causes + the response not to be sent using chunking even if the Transfer-encoding + header is set. + """ + httpserver.serve_content( + ("TEST!", "test"), + headers={"Content-type": "text/plain", transfer_encoding_header: "chunked"}, + chunked=http.Chunked.NO, + ) + with pytest.raises(requests.exceptions.ChunkedEncodingError): + requests.get(httpserver.url, headers={"User-Agent": "Test method"}) + + [email protected]("chunked_flag", [http.Chunked.NO, http.Chunked.AUTO]) +def test_GET_request_chunked_parameter_no_header(httpserver, chunked_flag): + """ + Test that passing NO or AUTO as the chunked parameter to serve_content() + causes the response not to be sent using chunking when the Transfer-encoding + header is not set. + """ + httpserver.serve_content(("TEST!", "test"), headers={"Content-type": "text/plain"}, chunked=chunked_flag) + resp = requests.get(httpserver.url, headers={"User-Agent": "Test method"}) + assert resp.text == "TEST!test" + assert resp.status_code == 200 + assert "text/plain" in resp.headers["Content-type"] + assert "Transfer-encoding" not in resp.headers + + [email protected]("chunked_flag", [http.Chunked.NO, http.Chunked.AUTO]) +def test_GET_request_chunked_attribute_no_header(httpserver, chunked_flag): + """ + Test that setting the chunked attribute of httpserver to NO or AUTO + causes the response not to be sent using chunking when the Transfer-encoding + header is not set. + """ + httpserver.serve_content(("TEST!", "test"), headers={"Content-type": "text/plain"}) + httpserver.chunked = chunked_flag + resp = requests.get(httpserver.url, headers={"User-Agent": "Test method"}) + assert resp.text == "TEST!test" + assert resp.status_code == 200 + assert "text/plain" in resp.headers["Content-type"] + assert "Transfer-encoding" not in resp.headers + + +def test_GET_request_chunked_no_header(httpserver): + """ + Test that setting the chunked attribute of httpserver to YES causes + the response to be sent using chunking even if the Transfer-encoding + header is not set. + """ + httpserver.serve_content(("TEST!", "test"), headers={"Content-type": "text/plain"}, chunked=http.Chunked.YES) + resp = requests.get(httpserver.url, headers={"User-Agent": "Test method"}) + # Without the Transfer-encoding header set, requests does not undo the chunk + # encoding so it comes through as "raw" chunks + assert resp.text == "5\r\nTEST!\r\n4\r\ntest\r\n0\r\n\r\n" + + +def _format_chunk(chunk): + r = repr(chunk) + if len(r) <= 40: + return r + else: + return r[:13] + "..." + r[-14:] + " (length {})".format(len(chunk)) + + +def _compare_chunks(expected, actual): + __tracebackhide__ = True + if expected != actual: + message = [_format_chunk(expected) + " != " + _format_chunk(actual)] + if type(expected) == type(actual): + for i, (e, a) in enumerate(itertools.zip_longest(expected, actual, fillvalue="<end>")): + if e != a: + message += [ + " Chunks differ at index {}:".format(i), + " Expected: " + (repr(expected[i : i + 5]) + "..." if e != "<end>" else "<end>"), + " Found: " + (repr(actual[i : i + 5]) + "..." if a != "<end>" else "<end>"), + ] + break + pytest.fail("\n".join(message)) + + [email protected]("chunk_size", [400, 499, 500, 512, 750, 1024, 4096, 8192]) +def test_GET_request_large_chunks(httpserver, chunk_size): + """ + Test that a response with large chunks comes through correctly + """ + body = b"0123456789abcdef" * 1024 # 16 kb total + # Split body into fixed-size chunks, from https://stackoverflow.com/a/18854817/56541 + chunks = [body[0 + i : chunk_size + i] for i in range(0, len(body), chunk_size)] + httpserver.serve_content( + chunks, headers={"Content-type": "text/plain", "Transfer-encoding": "chunked"}, chunked=http.Chunked.YES + ) + resp = requests.get(httpserver.url, headers={"User-Agent": "Test method"}, stream=True) + assert resp.status_code == 200 + text = b"" + for original_chunk, received_chunk in itertools.zip_longest(chunks, resp.iter_content(chunk_size=None)): + _compare_chunks(original_chunk, received_chunk) + text += received_chunk + assert text == body + assert "chunked" in resp.headers["Transfer-encoding"] + + [email protected]("chunked_flag", [http.Chunked.YES, http.Chunked.AUTO]) +def test_GET_request_chunked_no_content_length(httpserver, chunked_flag): + """ + Test that a chunked response does not include a Content-length header + """ + httpserver.serve_content( + ("TEST!", "test"), headers={"Content-type": "text/plain", "Transfer-encoding": "chunked"}, chunked=chunked_flag + ) + resp = requests.get(httpserver.url, headers={"User-Agent": "Test method"}) + assert resp.status_code == 200 + assert "Transfer-encoding" in resp.headers + assert "Content-length" not in resp.headers + + +def test_httpserver_init_failure_no_stderr_during_cleanup(tmp_path): + """ + Test that, when the server encounters an error during __init__, its cleanup + does not raise an AttributeError in its __del__ method, which would emit a + warning onto stderr. + """ + + script_path = tmp_path.joinpath("script.py") + + script_path.write_text(textwrap.dedent(""" + from pytest_localserver import http + from unittest.mock import patch + + with patch("pytest_localserver.http.make_server", side_effect=RuntimeError("init failure")): + server = http.ContentServer() + """)) + + result = subprocess.run([sys.executable, str(script_path)], stderr=subprocess.PIPE) + + # We ensure that no warning about an AttributeError is printed on stderr + # due to an error in the server's __del__ method. This AttributeError is + # raised during cleanup and doesn't affect the exit code of the script, so + # we can't use the exit code to tell whether the script did its job. + assert b"AttributeError" not in result.stderr diff --git a/contrib/python/pytest-localserver/py3/tests/test_https.py b/contrib/python/pytest-localserver/py3/tests/test_https.py new file mode 100644 index 00000000000..4c676d17533 --- /dev/null +++ b/contrib/python/pytest-localserver/py3/tests/test_https.py @@ -0,0 +1,43 @@ +import requests + +from pytest_localserver import https +from pytest_localserver import plugin + + +# define test fixture here again in order to run tests without having to +# install the plugin anew every single time +httpsserver = plugin.httpsserver + + +def test_httpsserver_funcarg(httpsserver): + assert isinstance(httpsserver, https.SecureContentServer) + assert httpsserver.is_alive() + assert httpsserver.server_address + + +def test_server_does_not_serve_file_at_startup(httpsserver): + assert httpsserver.code == 204 + assert httpsserver.content == "" + + +def test_some_content_retrieval(httpsserver): + httpsserver.serve_content("TEST!") + resp = requests.get(httpsserver.url, verify=False) + assert resp.text == "TEST!" + assert resp.status_code == 200 + + +def test_GET_request(httpsserver): + httpsserver.serve_content("TEST!", headers={"Content-type": "text/plain"}) + resp = requests.get(httpsserver.url, headers={"User-Agent": "Test method"}, verify=False) + assert resp.text == "TEST!" + assert resp.status_code == 200 + assert "text/plain" in resp.headers["Content-type"] + + +def test_HEAD_request(httpsserver): + httpsserver.serve_content("TEST!", headers={"Content-type": "text/plain"}) + print(httpsserver.url) + resp = requests.head(httpsserver.url, verify=False) + assert resp.status_code == 200 + assert resp.headers["Content-type"] == "text/plain" diff --git a/contrib/python/pytest-localserver/py3/tests/test_smtp.py b/contrib/python/pytest-localserver/py3/tests/test_smtp.py new file mode 100644 index 00000000000..7fc3882819d --- /dev/null +++ b/contrib/python/pytest-localserver/py3/tests/test_smtp.py @@ -0,0 +1,102 @@ +import smtplib + +import pytest + +try: # python 3 + from email.mime.text import MIMEText +except ImportError: # python 2? + from email.MIMEText import MIMEText + +from pytest_localserver import plugin + + +smtp = pytest.importorskip("pytest_localserver.smtp") + + +def send_plain_email(to, from_, subject, txt, server=("localhost", 25)): + """ + Sends a simple plain text message via SMTP. + """ + if type(to) in (tuple, list): + to = ", ".join(to) + + # Create a text/plain message + msg = MIMEText(txt) + msg["Subject"] = subject + msg["From"] = from_ + msg["To"] = to + + host, port = server[:2] + server = smtplib.SMTP(host, port) + server.set_debuglevel(1) + server.sendmail(from_, to, msg.as_string()) + server.quit() + + +# define test fixture here again in order to run tests without having to +# install the plugin anew every single time +smtpserver = plugin.smtpserver + + +def test_smtpserver_funcarg(smtpserver): + assert isinstance(smtpserver, smtp.Server) + assert smtpserver.is_alive() + assert smtpserver.accepting and smtpserver.addr + + +def test_smtpserver_addr(smtpserver): + host, port = smtpserver.addr + assert isinstance(host, str) + assert isinstance(port, int) + assert port > 0 + + +def test_server_is_killed(smtpserver): + assert smtpserver.is_alive() + smtpserver.stop() + assert not smtpserver.is_alive() + + +def test_server_is_deleted(smtpserver): + assert smtpserver.is_alive() + smtpserver.__del__() # need to call magic method here! + assert not smtpserver.is_alive() + + +def test_smtpserver_has_empty_outbox_at_startup(smtpserver): + assert len(smtpserver.outbox) == 0 + + +def test_send_email(smtpserver): + # send one e-mail + send_plain_email( + "[email protected]", + "[email protected]", + "Your e-mail is getting there", + "Seems like this test actually works!", + smtpserver.addr, + ) + msg = smtpserver.outbox[-1] + assert msg["To"] == "[email protected]" + assert msg["From"] == "[email protected]" + assert msg["Subject"] == "Your e-mail is getting there" + assert msg.details.rcpttos == ["[email protected]"] + assert msg.details.peer + assert msg.details.mailfrom + + # send another e-mail + send_plain_email( + "[email protected]", + "[email protected]", + "His e-mail too", + "Seems like this test actually works!", + smtpserver.addr, + ) + + msg = smtpserver.outbox[-1] + assert msg["To"] == "[email protected]" + assert msg["From"] == "[email protected]" + assert msg["Subject"] == "His e-mail too" + + # two mails are now in outbox + assert len(smtpserver.outbox) == 2 diff --git a/contrib/python/pytest-localserver/py3/tests/test_version.py b/contrib/python/pytest-localserver/py3/tests/test_version.py new file mode 100644 index 00000000000..cc7246fb528 --- /dev/null +++ b/contrib/python/pytest-localserver/py3/tests/test_version.py @@ -0,0 +1,6 @@ +import pytest_localserver + + +def test_version(): + assert hasattr(pytest_localserver, "VERSION") + assert isinstance(pytest_localserver.VERSION, str) diff --git a/contrib/python/pytest-localserver/py3/tests/ya.make b/contrib/python/pytest-localserver/py3/tests/ya.make new file mode 100644 index 00000000000..58784ecb8e5 --- /dev/null +++ b/contrib/python/pytest-localserver/py3/tests/ya.make @@ -0,0 +1,20 @@ +PY3TEST() + +SUBSCRIBER(g:python-contrib) + +PEERDIR( + contrib/python/pytest-localserver + contrib/python/requests + contrib/python/aiosmtpd +) + +TEST_SRCS( + test_http.py + test_https.py + test_smtp.py + test_version.py +) + +NO_LINT() + +END() |