diff options
Diffstat (limited to 'contrib/python/pyOpenSSL/py3/tests')
| -rw-r--r-- | contrib/python/pyOpenSSL/py3/tests/memdbg.py | 1 | ||||
| -rw-r--r-- | contrib/python/pyOpenSSL/py3/tests/test_crypto.py | 363 | ||||
| -rw-r--r-- | contrib/python/pyOpenSSL/py3/tests/test_debug.py | 2 | ||||
| -rw-r--r-- | contrib/python/pyOpenSSL/py3/tests/test_rand.py | 2 | ||||
| -rw-r--r-- | contrib/python/pyOpenSSL/py3/tests/test_ssl.py | 709 | ||||
| -rw-r--r-- | contrib/python/pyOpenSSL/py3/tests/test_util.py | 2 | ||||
| -rw-r--r-- | contrib/python/pyOpenSSL/py3/tests/util.py | 13 |
7 files changed, 701 insertions, 391 deletions
diff --git a/contrib/python/pyOpenSSL/py3/tests/memdbg.py b/contrib/python/pyOpenSSL/py3/tests/memdbg.py index 590b72d0682..0dd8c318e4e 100644 --- a/contrib/python/pyOpenSSL/py3/tests/memdbg.py +++ b/contrib/python/pyOpenSSL/py3/tests/memdbg.py @@ -1,5 +1,4 @@ import sys - import traceback from cffi import api as _api diff --git a/contrib/python/pyOpenSSL/py3/tests/test_crypto.py b/contrib/python/pyOpenSSL/py3/tests/test_crypto.py index ef3429d17fd..44bbd0f2aba 100644 --- a/contrib/python/pyOpenSSL/py3/tests/test_crypto.py +++ b/contrib/python/pyOpenSSL/py3/tests/test_crypto.py @@ -4,56 +4,65 @@ """ Unit tests for :py:mod:`OpenSSL.crypto`. """ - -from warnings import simplefilter - import base64 -from subprocess import PIPE, Popen -from datetime import datetime, timedelta import sys - -import pytest +from datetime import datetime, timedelta +from subprocess import PIPE, Popen +from warnings import simplefilter from cryptography import x509 -from cryptography.hazmat.backends.openssl.backend import backend from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives.asymmetric import ec, ed25519, ed448, rsa import flaky -from OpenSSL.crypto import TYPE_RSA, TYPE_DSA, Error, PKey -from OpenSSL.crypto import X509, X509Name +import pytest + +from OpenSSL._util import ffi as _ffi, lib as _lib from OpenSSL.crypto import ( + CRL, + Error, + FILETYPE_ASN1, + FILETYPE_PEM, + FILETYPE_TEXT, + NetscapeSPKI, + PKCS12, + PKCS7, + PKey, + Revoked, + TYPE_DSA, + TYPE_RSA, + X509, + X509Extension, + X509Name, + X509Req, X509Store, - X509StoreFlags, X509StoreContext, X509StoreContextError, -) -from OpenSSL.crypto import X509Req -from OpenSSL.crypto import X509Extension -from OpenSSL.crypto import load_certificate, load_privatekey -from OpenSSL.crypto import load_publickey, dump_publickey -from OpenSSL.crypto import FILETYPE_PEM, FILETYPE_ASN1, FILETYPE_TEXT -from OpenSSL.crypto import dump_certificate, load_certificate_request -from OpenSSL.crypto import dump_certificate_request, dump_privatekey -from OpenSSL.crypto import PKCS7, load_pkcs7_data -from OpenSSL.crypto import PKCS12, load_pkcs12 -from OpenSSL.crypto import CRL, Revoked, dump_crl, load_crl -from OpenSSL.crypto import NetscapeSPKI -from OpenSSL.crypto import ( - sign, - verify, + X509StoreFlags, + dump_certificate, + dump_certificate_request, + dump_crl, + dump_privatekey, + dump_publickey, get_elliptic_curve, get_elliptic_curves, + load_certificate, + load_certificate_request, + load_crl, + load_pkcs12, + load_pkcs7_data, + load_privatekey, + load_publickey, + sign, + verify, ) -from OpenSSL._util import ffi as _ffi, lib as _lib - from .util import ( EqualityTestsMixin, - is_consistent_type, - WARNING_TYPE_EXPECTED, NON_ASCII, + WARNING_TYPE_EXPECTED, + is_consistent_type, ) @@ -64,7 +73,7 @@ def normalize_privatekey_pem(pem): GOOD_CIPHER = "blowfish" BAD_CIPHER = "zippers" -GOOD_DIGEST = "SHA1" +GOOD_DIGEST = "SHA256" BAD_DIGEST = "monkeys" old_root_cert_pem = b"""-----BEGIN CERTIFICATE----- @@ -609,9 +618,10 @@ vrzEeLDRiiPl92dyyWmu -----END X509 CRL----- """ +# The signature on this CRL is invalid. crlDataUnsupportedExtension = b"""\ -----BEGIN X509 CRL----- -MIIGRzCCBS8CAQIwDQYJKoZIhvcNAQELBQAwJzELMAkGA1UEBhMCVVMxGDAWBgNV +MIIGRzCCBS8CAQEwDQYJKoZIhvcNAQELBQAwJzELMAkGA1UEBhMCVVMxGDAWBgNV BAMMD2NyeXB0b2dyYXBoeS5pbxgPMjAxNTAxMDEwMDAwMDBaGA8yMDE2MDEwMTAw MDAwMFowggTOMBQCAQAYDzIwMTUwMTAxMDAwMDAwWjByAgEBGA8yMDE1MDEwMTAw MDAwMFowXDAYBgNVHRgEERgPMjAxNTAxMDEwMDAwMDBaMDQGA1UdHQQtMCukKTAn @@ -783,6 +793,21 @@ MBsCAQACAS0CAQcCAQACAQ8CAQMCAQACAQACAQA= -----END RSA PRIVATE KEY----- """ +ed25519_private_key_pem = b"""-----BEGIN PRIVATE KEY----- +MC4CAQAwBQYDK2VwBCIEIKlxBbhVsSURoLTmsu9uTqYH6oF7zpxmp1ZQCAPhDmI2 +-----END PRIVATE KEY----- +""" + +ed448_private_key_pem = b"""-----BEGIN PRIVATE KEY----- +MEcCAQAwBQYDK2VxBDsEOcqZ7a3k6JwrJbYO8CNTPT/d7dlWCo5vCf0EYDj79ZvA\nhD8u9EPHlYJw5Y8ZQdH4WmVEfpKA23xkdQ== +-----END PRIVATE KEY----- +""" + +x25519_private_key_pem = b"""-----BEGIN PRIVATE KEY----- +MC4CAQAwBQYDK2VuBCIEIPAjVfPNTm25VxtBRg+JjjFx9tA3M8aaBdVhjb92iBts +-----END PRIVATE KEY----- +""" + @pytest.fixture def x509_data(): @@ -809,7 +834,7 @@ def x509_data(): yield pkey, x509 -class TestX509Ext(object): +class TestX509Ext: """ Tests for `OpenSSL.crypto.X509Extension`. """ @@ -914,7 +939,7 @@ class TestX509Ext(object): b"basicConstraints", False, b"CA:TRUE", subject=x509 ) x509.add_extensions([ext1]) - x509.sign(pkey, "sha1") + x509.sign(pkey, "sha256") # This is a little lame. Can we think of a better way? text = dump_certificate(FILETYPE_TEXT, x509) assert b"X509v3 Basic Constraints:" in text @@ -930,7 +955,7 @@ class TestX509Ext(object): b"subjectKeyIdentifier", False, b"hash", subject=x509 ) x509.add_extensions([ext3]) - x509.sign(pkey, "sha1") + x509.sign(pkey, "sha256") text = dump_certificate(FILETYPE_TEXT, x509) assert b"X509v3 Subject Key Identifier:" in text @@ -963,7 +988,7 @@ class TestX509Ext(object): b"basicConstraints", False, b"CA:TRUE", issuer=x509 ) x509.add_extensions([ext1]) - x509.sign(pkey, "sha1") + x509.sign(pkey, "sha256") text = dump_certificate(FILETYPE_TEXT, x509) assert b"X509v3 Basic Constraints:" in text assert b"CA:TRUE" in text @@ -978,7 +1003,7 @@ class TestX509Ext(object): b"authorityKeyIdentifier", False, b"issuer:always", issuer=x509 ) x509.add_extensions([ext2]) - x509.sign(pkey, "sha1") + x509.sign(pkey, "sha256") text = dump_certificate(FILETYPE_TEXT, x509) assert b"X509v3 Authority Key Identifier:" in text assert b"DirName:/CN=Yoda root CA" in text @@ -1008,22 +1033,40 @@ class TestX509Ext(object): ) -class TestPKey(object): +class TestPKey: """ Tests for `OpenSSL.crypto.PKey`. """ - def test_convert_from_cryptography_private_key(self): + @pytest.mark.parametrize( + ("key_string", "key_type"), + [ + (intermediate_key_pem, rsa.RSAPrivateKey), + (ec_private_key_pem, ec.EllipticCurvePrivateKey), + (ed25519_private_key_pem, ed25519.Ed25519PrivateKey), + (ed448_private_key_pem, ed448.Ed448PrivateKey), + ], + ) + def test_convert_roundtrip_cryptography_private_key( + self, key_string, key_type + ): """ PKey.from_cryptography_key creates a proper private PKey. + PKey.to_cryptography_key creates a proper cryptography private key. """ - key = serialization.load_pem_private_key( - intermediate_key_pem, None, backend - ) + key = serialization.load_pem_private_key(key_string, None) pkey = PKey.from_cryptography_key(key) assert isinstance(pkey, PKey) - assert pkey.bits() == key.key_size + parsed_key = pkey.to_cryptography_key() + assert isinstance(parsed_key, key_type) + assert parsed_key.public_key().public_bytes( + serialization.Encoding.PEM, + serialization.PublicFormat.SubjectPublicKeyInfo, + ) == key.public_key().public_bytes( + serialization.Encoding.PEM, + serialization.PublicFormat.SubjectPublicKeyInfo, + ) assert pkey._only_public is False assert pkey._initialized is True @@ -1031,7 +1074,7 @@ class TestPKey(object): """ PKey.from_cryptography_key creates a proper public PKey. """ - key = serialization.load_pem_public_key(cleartextPublicKeyPEM, backend) + key = serialization.load_pem_public_key(cleartextPublicKeyPEM) pkey = PKey.from_cryptography_key(key) assert isinstance(pkey, PKey) @@ -1043,9 +1086,7 @@ class TestPKey(object): """ PKey.from_cryptography_key raises TypeError with an unsupported type. """ - key = serialization.load_pem_private_key( - ec_private_key_pem, None, backend - ) + key = serialization.load_pem_private_key(x25519_private_key_pem, None) with pytest.raises(TypeError): PKey.from_cryptography_key(key) @@ -1059,16 +1100,6 @@ class TestPKey(object): assert isinstance(key, rsa.RSAPublicKey) assert pkey.bits() == key.key_size - def test_convert_private_pkey_to_cryptography_key(self): - """ - PKey.to_cryptography_key creates a proper cryptography private key. - """ - pkey = load_privatekey(FILETYPE_PEM, root_key_pem) - key = pkey.to_cryptography_key() - - assert isinstance(key, rsa.RSAPrivateKey) - assert pkey.bits() == key.key_size - def test_type(self): """ `PKey` can be used to create instances of that type. @@ -1175,10 +1206,11 @@ class TestPKey(object): def test_inconsistent_key(self): """ - `PKey.check` returns `Error` if the key is not consistent. + Either `load_privatekey` or `PKey.check` returns `Error` if the key is + not consistent. """ - key = load_privatekey(FILETYPE_PEM, inconsistentPrivateKeyPEM) with pytest.raises(Error): + key = load_privatekey(FILETYPE_PEM, inconsistentPrivateKeyPEM) key.check() def test_check_public_key(self): @@ -1197,10 +1229,11 @@ class TestPKey(object): def test_check_pr_897(self): """ - `PKey.check` raises `OpenSSL.crypto.Error` if provided with broken key + Either `load_privatekey` or `PKey.check` raises `OpenSSL.crypto.Error` + if provided with broken key """ - pkey = load_privatekey(FILETYPE_PEM, rsa_p_not_prime_pem) with pytest.raises(Error): + pkey = load_privatekey(FILETYPE_PEM, rsa_p_not_prime_pem) pkey.check() @@ -1222,7 +1255,7 @@ def x509_name(**attrs): return name -class TestX509Name(object): +class TestX509Name: """ Unit tests for `OpenSSL.crypto.X509Name`. """ @@ -1398,6 +1431,19 @@ class TestX509Name(object): # other X509Name. assert_greater_than(x509_name(CN="def"), x509_name(CN="abc")) + def assert_raises(a, b): + with pytest.raises(TypeError): + a < b + with pytest.raises(TypeError): + a <= b + with pytest.raises(TypeError): + a > b + with pytest.raises(TypeError): + a >= b + + # Only X509Name objects can be compared with lesser than / greater than + assert_raises(x509_name(), object()) + def test_hash(self): """ `X509Name.hash` returns an integer hash based on the value of the name. @@ -1555,14 +1601,20 @@ class TestX509Req(_PKeyInteractionTestsMixin): """ `X509Req.set_version` sets the X.509 version of the certificate request. `X509Req.get_version` returns the X.509 version of the - certificate request. The initial value of the version is 0. + certificate request. The only defined version is 0. Others may or + may not be supported depending on backend. """ request = X509Req() assert request.get_version() == 0 - request.set_version(1) - assert request.get_version() == 1 - request.set_version(3) - assert request.get_version() == 3 + request.set_version(0) + assert request.get_version() == 0 + try: + request.set_version(1) + assert request.get_version() == 1 + request.set_version(3) + assert request.get_version() == 3 + except Error: + pass def test_version_wrong_args(self): """ @@ -1686,9 +1738,7 @@ class TestX509Req(_PKeyInteractionTestsMixin): assert request.verify(pkey) def test_convert_from_cryptography(self): - crypto_req = x509.load_pem_x509_csr( - cleartextCertificateRequestPEM, backend - ) + crypto_req = x509.load_pem_x509_csr(cleartextCertificateRequestPEM) req = X509Req.from_cryptography(crypto_req) assert isinstance(req, X509Req) @@ -1752,8 +1802,8 @@ class TestX509(_PKeyInteractionTestsMixin): `X509.get_version` retrieves it. """ cert = X509() - cert.set_version(1234) - assert cert.get_version() == 1234 + cert.set_version(2) + assert cert.get_version() == 2 def test_serial_number(self): """ @@ -1767,12 +1817,12 @@ class TestX509(_PKeyInteractionTestsMixin): assert certificate.get_serial_number() == 0 certificate.set_serial_number(1) assert certificate.get_serial_number() == 1 - certificate.set_serial_number(2 ** 32 + 1) - assert certificate.get_serial_number() == 2 ** 32 + 1 - certificate.set_serial_number(2 ** 64 + 1) - assert certificate.get_serial_number() == 2 ** 64 + 1 - certificate.set_serial_number(2 ** 128 + 1) - assert certificate.get_serial_number() == 2 ** 128 + 1 + certificate.set_serial_number(2**32 + 1) + assert certificate.get_serial_number() == 2**32 + 1 + certificate.set_serial_number(2**64 + 1) + assert certificate.get_serial_number() == 2**64 + 1 + certificate.set_serial_number(2**128 + 1) + assert certificate.get_serial_number() == 2**128 + 1 def _setBoundTest(self, which): """ @@ -1920,6 +1970,14 @@ class TestX509(_PKeyInteractionTestsMixin): cert.gmtime_adj_notAfter(2) assert not cert.has_expired() + def test_has_expired_exception(self): + """ + `X509.has_expired` throws ValueError if not-after time is not set + """ + cert = X509() + with pytest.raises(ValueError): + cert.has_expired() + def test_root_has_not_expired(self): """ `X509.has_expired` returns `False` if the certificate's not-after time @@ -1935,13 +1993,13 @@ class TestX509(_PKeyInteractionTestsMixin): """ cert = load_certificate(FILETYPE_PEM, old_root_cert_pem) assert ( - # This is MD5 instead of GOOD_DIGEST because the digest algorithm - # actually matters to the assertion (ie, another arbitrary, good - # digest will not product the same digest). # Digest verified with the command: - # openssl x509 -in root_cert.pem -noout -fingerprint -md5 - cert.digest("MD5") - == b"19:B3:05:26:2B:F8:F2:FF:0B:8F:21:07:A8:28:B8:75" + # openssl x509 -in root_cert.pem -noout -fingerprint -sha256 + cert.digest("SHA256") + == ( + b"3E:0F:16:39:6B:B1:3E:4F:08:85:C6:5F:10:0D:CB:2C:" + b"25:C2:91:4E:D0:4A:C2:29:06:BD:55:E3:A7:B3:B7:06" + ) ) def _extcert(self, pkey, extensions): @@ -1957,7 +2015,7 @@ class TestX509(_PKeyInteractionTestsMixin): cert.set_notAfter(when) cert.add_extensions(extensions) - cert.sign(pkey, "sha1") + cert.sign(pkey, "sha256") return load_certificate( FILETYPE_PEM, dump_certificate(FILETYPE_PEM, cert) ) @@ -2036,8 +2094,8 @@ class TestX509(_PKeyInteractionTestsMixin): b"DNS:altnull.python.org\x00example.com, " b"email:[email protected]\[email protected], " b"URI:http://null.python.org\x00http://example.org, " - b"IP Address:192.0.2.1, IP Address:2001:DB8:0:0:0:0:0:1\n" - == str(ext).encode("ascii") + b"IP Address:192.0.2.1, IP Address:2001:DB8:0:0:0:0:0:1" + == str(ext).encode("ascii").strip() ) def test_invalid_digest_algorithm(self): @@ -2204,9 +2262,7 @@ tgI5 cert.sign(object(), b"sha256") def test_convert_from_cryptography(self): - crypto_cert = x509.load_pem_x509_certificate( - intermediate_cert_pem, backend - ) + crypto_cert = x509.load_pem_x509_certificate(intermediate_cert_pem) cert = X509.from_cryptography(crypto_cert) assert isinstance(cert, X509) @@ -2224,7 +2280,7 @@ tgI5 assert crypto_cert.version.value == cert.get_version() -class TestX509Store(object): +class TestX509Store: """ Test for `OpenSSL.crypto.X509Store`. """ @@ -2295,7 +2351,7 @@ class TestX509Store(object): def test_load_locations_parameters( self, cafile, capath, call_cafile, call_capath, monkeypatch ): - class LibMock(object): + class LibMock: def load_locations(self, store, cafile, capath): self.cafile = cafile self.capath = capath @@ -2326,7 +2382,7 @@ class TestX509Store(object): store.load_locations(cafile=str(invalid_ca_file)) -class TestPKCS12(object): +class TestPKCS12: """ Test for `OpenSSL.crypto.PKCS12` and `OpenSSL.crypto.load_pkcs12`. """ @@ -2467,7 +2523,7 @@ class TestPKCS12(object): b"-nodes", b"-passin", b"pass:" + passwd, - *extra + *extra, ) assert recovered_key[-len(key) :] == key if cert: @@ -2479,7 +2535,7 @@ class TestPKCS12(object): b"-passin", b"pass:" + passwd, b"-nokeys", - *extra + *extra, ) assert recovered_cert[-len(cert) :] == cert if ca: @@ -2491,7 +2547,7 @@ class TestPKCS12(object): b"-passin", b"pass:" + passwd, b"-nokeys", - *extra + *extra, ) assert recovered_cert[-len(ca) :] == ca @@ -2802,7 +2858,7 @@ def _runopenssl(pem, *args): return output -class TestLoadPublicKey(object): +class TestLoadPublicKey: """ Tests for :func:`load_publickey`. """ @@ -2842,7 +2898,7 @@ class TestLoadPublicKey(object): assert dumped_pem == cleartextPublicKeyPEM -class TestFunction(object): +class TestFunction: """ Tests for free-functions in the `OpenSSL.crypto` module. """ @@ -3263,7 +3319,7 @@ class TestFunction(object): load_pkcs7_data(object(), b"foo") -class TestLoadCertificate(object): +class TestLoadCertificate: """ Tests for `load_certificate_request`. """ @@ -3287,7 +3343,7 @@ class TestLoadCertificate(object): load_certificate(FILETYPE_ASN1, b"lol") -class TestPKCS7(object): +class TestPKCS7: """ Tests for `PKCS7`. """ @@ -3387,7 +3443,7 @@ class TestNetscapeSPKI(_PKeyInteractionTestsMixin): assert isinstance(blob, bytes) -class TestRevoked(object): +class TestRevoked: """ Tests for `OpenSSL.crypto.Revoked`. """ @@ -3477,7 +3533,7 @@ class TestRevoked(object): revoked.set_reason(None) assert revoked.get_reason() is None - @pytest.mark.parametrize("reason", [object(), 1.0, u"foo"]) + @pytest.mark.parametrize("reason", [object(), 1.0, "foo"]) def test_set_reason_wrong_args(self, reason): """ `Revoked.set_reason` raises `TypeError` if called with an argument @@ -3497,7 +3553,7 @@ class TestRevoked(object): revoked.set_reason(b"blue") -class TestCRL(object): +class TestCRL: """ Tests for `OpenSSL.crypto.CRL`. """ @@ -3548,17 +3604,17 @@ class TestCRL(object): dumped_crl = self._get_crl().export( self.cert, self.pkey, days=20, digest=b"sha256" ) - crl = x509.load_pem_x509_crl(dumped_crl, backend) + crl = x509.load_pem_x509_crl(dumped_crl) revoked = crl.get_revoked_certificate_by_serial_number(0x03AB) assert revoked is not None assert crl.issuer == x509.Name( [ - x509.NameAttribute(x509.NameOID.COUNTRY_NAME, u"US"), - x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, u"IL"), - x509.NameAttribute(x509.NameOID.LOCALITY_NAME, u"Chicago"), - x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, u"Testing"), + x509.NameAttribute(x509.NameOID.COUNTRY_NAME, "US"), + x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, "IL"), + x509.NameAttribute(x509.NameOID.LOCALITY_NAME, "Chicago"), + x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, "Testing"), x509.NameAttribute( - x509.NameOID.COMMON_NAME, u"Testing Root CA" + x509.NameOID.COMMON_NAME, "Testing Root CA" ), ] ) @@ -3573,19 +3629,19 @@ class TestCRL(object): # DER format dumped_crl = self._get_crl().export( - self.cert, self.pkey, FILETYPE_ASN1, digest=b"md5" + self.cert, self.pkey, FILETYPE_ASN1, digest=b"sha256" ) - crl = x509.load_der_x509_crl(dumped_crl, backend) + crl = x509.load_der_x509_crl(dumped_crl) revoked = crl.get_revoked_certificate_by_serial_number(0x03AB) assert revoked is not None assert crl.issuer == x509.Name( [ - x509.NameAttribute(x509.NameOID.COUNTRY_NAME, u"US"), - x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, u"IL"), - x509.NameAttribute(x509.NameOID.LOCALITY_NAME, u"Chicago"), - x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, u"Testing"), + x509.NameAttribute(x509.NameOID.COUNTRY_NAME, "US"), + x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, "IL"), + x509.NameAttribute(x509.NameOID.LOCALITY_NAME, "Chicago"), + x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, "Testing"), x509.NameAttribute( - x509.NameOID.COMMON_NAME, u"Testing Root CA" + x509.NameOID.COMMON_NAME, "Testing Root CA" ), ] ) @@ -3600,7 +3656,7 @@ class TestCRL(object): # text format dumped_text = crl.export( - self.cert, self.pkey, type=FILETYPE_TEXT, digest=b"md5" + self.cert, self.pkey, type=FILETYPE_TEXT, digest=b"sha256" ) assert len(dumped_text) > 500 @@ -3610,9 +3666,9 @@ class TestCRL(object): signature algorithm based on that digest function. """ crl = self._get_crl() - dumped_crl = crl.export(self.cert, self.pkey, digest=b"sha1") + dumped_crl = crl.export(self.cert, self.pkey, digest=b"sha384") text = _runopenssl(dumped_crl, b"crl", b"-noout", b"-text") - text.index(b"Signature Algorithm: sha1") + text.index(b"Signature Algorithm: sha384") def test_export_md5_digest(self): """ @@ -3794,7 +3850,9 @@ class TestCRL(object): crl.add_revoked(revoked) crl.set_version(1) crl.set_lastUpdate(b"20140601000000Z") - crl.set_nextUpdate(b"20180601000000Z") + # The year 5000 is far into the future so that this CRL isn't + # considered to have expired. + crl.set_nextUpdate(b"50000601000000Z") crl.sign(issuer_cert, issuer_key, digest=b"sha512") return crl @@ -3820,7 +3878,7 @@ class TestCRL(object): store_ctx = X509StoreContext(store, self.intermediate_server_cert) with pytest.raises(X509StoreContextError) as err: store_ctx.verify_certificate() - assert err.value.args[0][2] == "certificate revoked" + assert str(err.value) == "certificate revoked" def test_verify_with_missing_crl(self): """ @@ -3840,11 +3898,11 @@ class TestCRL(object): store_ctx = X509StoreContext(store, self.intermediate_server_cert) with pytest.raises(X509StoreContextError) as err: store_ctx.verify_certificate() - assert err.value.args[0][2] == "unable to get certificate CRL" + assert str(err.value) == "unable to get certificate CRL" assert err.value.certificate.get_subject().CN == "intermediate-service" def test_convert_from_cryptography(self): - crypto_crl = x509.load_pem_x509_crl(crlData, backend) + crypto_crl = x509.load_pem_x509_crl(crlData) crl = CRL.from_cryptography(crypto_crl) assert isinstance(crl, CRL) @@ -3858,7 +3916,7 @@ class TestCRL(object): assert isinstance(crypto_crl, x509.CertificateRevocationList) -class TestX509StoreContext(object): +class TestX509StoreContext: """ Tests for `OpenSSL.crypto.X509StoreContext`. """ @@ -4051,7 +4109,11 @@ class TestX509StoreContext(object): with pytest.raises(X509StoreContextError) as exc: store_ctx.verify_certificate() - assert exc.value.args[0][2] == "self signed certificate" + # OpenSSL 1.1.x and 3.0.x have different error messages + assert str(exc.value) in [ + "self signed certificate", + "self-signed certificate", + ] assert exc.value.certificate.get_subject().CN == "Testing Root CA" def test_invalid_chain_no_root(self): @@ -4066,7 +4128,7 @@ class TestX509StoreContext(object): with pytest.raises(X509StoreContextError) as exc: store_ctx.verify_certificate() - assert exc.value.args[0][2] == "unable to get issuer certificate" + assert str(exc.value) == "unable to get issuer certificate" assert exc.value.certificate.get_subject().CN == "intermediate" def test_invalid_chain_no_intermediate(self): @@ -4081,7 +4143,7 @@ class TestX509StoreContext(object): with pytest.raises(X509StoreContextError) as exc: store_ctx.verify_certificate() - assert exc.value.args[0][2] == "unable to get local issuer certificate" + assert str(exc.value) == "unable to get local issuer certificate" assert exc.value.certificate.get_subject().CN == "intermediate-service" def test_modification_pre_verify(self): @@ -4099,7 +4161,7 @@ class TestX509StoreContext(object): with pytest.raises(X509StoreContextError) as exc: store_ctx.verify_certificate() - assert exc.value.args[0][2] == "unable to get issuer certificate" + assert str(exc.value) == "unable to get issuer certificate" assert exc.value.certificate.get_subject().CN == "intermediate" store_ctx.set_store(store_good) @@ -4124,7 +4186,7 @@ class TestX509StoreContext(object): with pytest.raises(X509StoreContextError) as exc: store_ctx.verify_certificate() - assert exc.value.args[0][2] == "certificate has expired" + assert str(exc.value) == "certificate has expired" def test_get_verified_chain(self): """ @@ -4158,7 +4220,7 @@ class TestX509StoreContext(object): with pytest.raises(X509StoreContextError) as exc: store_ctx.get_verified_chain() - assert exc.value.args[0][2] == "unable to get issuer certificate" + assert str(exc.value) == "unable to get issuer certificate" assert exc.value.certificate.get_subject().CN == "intermediate" @pytest.fixture @@ -4223,10 +4285,23 @@ class TestX509StoreContext(object): with pytest.raises(X509StoreContextError) as exc: store_ctx.verify_certificate() - assert exc.value.args[0][2] == "unable to get local issuer certificate" + assert str(exc.value) == "unable to get local issuer certificate" + + def test_verify_with_partial_chain(self): + store = X509Store() + store.add_cert(self.intermediate_cert) + + store_ctx = X509StoreContext(store, self.intermediate_server_cert) + with pytest.raises(X509StoreContextError): + store_ctx.verify_certificate() + + # Now set the partial verification flag for verification. + store.set_flags(X509StoreFlags.PARTIAL_CHAIN) + store_ctx = X509StoreContext(store, self.intermediate_server_cert) + assert store_ctx.verify_certificate() is None -class TestSignVerify(object): +class TestSignVerify: """ Tests for `OpenSSL.crypto.sign` and `OpenSSL.crypto.verify`. """ @@ -4250,7 +4325,7 @@ class TestSignVerify(object): # certificate unrelated to priv_key, used to trigger an error bad_cert = load_certificate(FILETYPE_PEM, server_cert_pem) - for digest in ["md5", "sha1"]: + for digest in ["md5", "sha1", "sha256"]: sig = sign(priv_key, content, digest) # Verify the signature of content, will throw an exception if @@ -4289,7 +4364,7 @@ class TestSignVerify(object): priv_key = load_privatekey(FILETYPE_PEM, root_key_pem) cert = load_certificate(FILETYPE_PEM, root_cert_pem) - for digest in ["md5", "sha1"]: + for digest in ["md5", "sha1", "sha256"]: with pytest.warns(DeprecationWarning) as w: simplefilter("always") sig = sign(priv_key, content, digest) @@ -4319,8 +4394,8 @@ class TestSignVerify(object): ) priv_key = load_privatekey(FILETYPE_PEM, ec_root_key_pem) cert = load_certificate(FILETYPE_PEM, ec_root_cert_pem) - sig = sign(priv_key, content, "sha1") - verify(cert, sig, content, "sha1") + sig = sign(priv_key, content, "sha256") + verify(cert, sig, content, "sha256") def test_sign_nulls(self): """ @@ -4329,8 +4404,8 @@ class TestSignVerify(object): content = b"Watch out! \0 Did you see it?" priv_key = load_privatekey(FILETYPE_PEM, root_key_pem) good_cert = load_certificate(FILETYPE_PEM, root_cert_pem) - sig = sign(priv_key, content, "sha1") - verify(good_cert, sig, content, "sha1") + sig = sign(priv_key, content, "sha256") + verify(good_cert, sig, content, "sha256") def test_sign_with_large_key(self): """ @@ -4345,10 +4420,10 @@ class TestSignVerify(object): ) priv_key = load_privatekey(FILETYPE_PEM, large_key_pem) - sign(priv_key, content, "sha1") + sign(priv_key, content, "sha256") -class TestEllipticCurve(object): +class TestEllipticCurve: """ Tests for `_EllipticCurve`, `get_elliptic_curve`, and `get_elliptic_curves`. @@ -4375,7 +4450,7 @@ class TestEllipticCurve(object): does not identify a supported curve. """ with pytest.raises(ValueError): - get_elliptic_curve(u"this curve was just invented") + get_elliptic_curve("this curve was just invented") def test_repr(self): """ @@ -4399,7 +4474,7 @@ class TestEllipticCurve(object): curve._to_EC_KEY() -class EllipticCurveFactory(object): +class EllipticCurveFactory: """ A helper to get the names of two curves. """ @@ -4434,7 +4509,7 @@ class TestEllipticCurveEquality(EqualityTestsMixin): return get_elliptic_curve(self.curve_factory.another_curve_name) -class TestEllipticCurveHash(object): +class TestEllipticCurveHash: """ Tests for `_EllipticCurve`'s implementation of hashing (thus use as an item in a `dict` or `set`). diff --git a/contrib/python/pyOpenSSL/py3/tests/test_debug.py b/contrib/python/pyOpenSSL/py3/tests/test_debug.py index 2d62a3a56a1..4eeb3d00b31 100644 --- a/contrib/python/pyOpenSSL/py3/tests/test_debug.py +++ b/contrib/python/pyOpenSSL/py3/tests/test_debug.py @@ -1,5 +1,5 @@ -from OpenSSL.debug import _env_info from OpenSSL import version +from OpenSSL.debug import _env_info def test_debug_info(): diff --git a/contrib/python/pyOpenSSL/py3/tests/test_rand.py b/contrib/python/pyOpenSSL/py3/tests/test_rand.py index 763d71124ca..24f30939545 100644 --- a/contrib/python/pyOpenSSL/py3/tests/test_rand.py +++ b/contrib/python/pyOpenSSL/py3/tests/test_rand.py @@ -10,7 +10,7 @@ import pytest from OpenSSL import rand -class TestRand(object): +class TestRand: @pytest.mark.parametrize("args", [(b"foo", None), (None, 3)]) def test_add_wrong_args(self, args): """ diff --git a/contrib/python/pyOpenSSL/py3/tests/test_ssl.py b/contrib/python/pyOpenSSL/py3/tests/test_ssl.py index 8bb5a035a77..fdedd7133a0 100644 --- a/contrib/python/pyOpenSSL/py3/tests/test_ssl.py +++ b/contrib/python/pyOpenSSL/py3/tests/test_ssl.py @@ -7,124 +7,125 @@ Unit tests for :mod:`OpenSSL.SSL`. import datetime import gc +import select import sys import uuid - -from gc import collect, get_referrers from errno import ( EAFNOSUPPORT, ECONNREFUSED, EINPROGRESS, - EWOULDBLOCK, EPIPE, ESHUTDOWN, + EWOULDBLOCK, ) -from sys import platform, getfilesystemencoding -from socket import AF_INET, AF_INET6, MSG_PEEK, SHUT_RDWR, error, socket +from gc import collect, get_referrers from os import makedirs from os.path import join -from weakref import ref +from socket import ( + AF_INET, + AF_INET6, + MSG_PEEK, + SHUT_RDWR, + error, + socket, +) +from sys import getfilesystemencoding, platform +from typing import Union from warnings import simplefilter - -import flaky - -import pytest - -from pretend import raiser - -from six import PY2, text_type +from weakref import ref from cryptography import x509 -from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.x509.oid import NameOID +import flaky -from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM -from OpenSSL.crypto import PKey, X509, X509Extension, X509Store -from OpenSSL.crypto import dump_privatekey, load_privatekey -from OpenSSL.crypto import dump_certificate, load_certificate -from OpenSSL.crypto import get_elliptic_curves +from pretend import raiser -from OpenSSL.SSL import ( - OPENSSL_VERSION_NUMBER, - SSLEAY_VERSION, - SSLEAY_CFLAGS, - TLS_METHOD, - TLS1_3_VERSION, - TLS1_2_VERSION, - TLS1_1_VERSION, -) -from OpenSSL.SSL import SSLEAY_PLATFORM, SSLEAY_DIR, SSLEAY_BUILT_ON -from OpenSSL.SSL import SENT_SHUTDOWN, RECEIVED_SHUTDOWN -from OpenSSL.SSL import ( - SSLv2_METHOD, - SSLv3_METHOD, - SSLv23_METHOD, - TLSv1_METHOD, - TLSv1_1_METHOD, - TLSv1_2_METHOD, -) -from OpenSSL.SSL import OP_SINGLE_DH_USE, OP_NO_SSLv2, OP_NO_SSLv3 -from OpenSSL.SSL import ( - VERIFY_PEER, - VERIFY_FAIL_IF_NO_PEER_CERT, - VERIFY_CLIENT_ONCE, - VERIFY_NONE, -) +import pytest from OpenSSL import SSL from OpenSSL.SSL import ( - SESS_CACHE_OFF, - SESS_CACHE_CLIENT, - SESS_CACHE_SERVER, + Connection, + Context, + DTLS_METHOD, + Error, + MODE_RELEASE_BUFFERS, + NO_OVERLAPPING_PROTOCOLS, + OPENSSL_VERSION_NUMBER, + OP_COOKIE_EXCHANGE, + OP_NO_COMPRESSION, + OP_NO_QUERY_MTU, + OP_NO_SSLv2, + OP_NO_SSLv3, + OP_NO_TICKET, + OP_SINGLE_DH_USE, + RECEIVED_SHUTDOWN, + SENT_SHUTDOWN, SESS_CACHE_BOTH, + SESS_CACHE_CLIENT, SESS_CACHE_NO_AUTO_CLEAR, + SESS_CACHE_NO_INTERNAL, SESS_CACHE_NO_INTERNAL_LOOKUP, SESS_CACHE_NO_INTERNAL_STORE, - SESS_CACHE_NO_INTERNAL, -) - -from OpenSSL.SSL import ( - Error, + SESS_CACHE_OFF, + SESS_CACHE_SERVER, + SSLEAY_BUILT_ON, + SSLEAY_CFLAGS, + SSLEAY_DIR, + SSLEAY_PLATFORM, + SSLEAY_VERSION, + SSL_CB_ACCEPT_EXIT, + SSL_CB_ACCEPT_LOOP, + SSL_CB_ALERT, + SSL_CB_CONNECT_EXIT, + SSL_CB_CONNECT_LOOP, + SSL_CB_EXIT, + SSL_CB_HANDSHAKE_DONE, + SSL_CB_HANDSHAKE_START, + SSL_CB_LOOP, + SSL_CB_READ, + SSL_CB_READ_ALERT, + SSL_CB_WRITE, + SSL_CB_WRITE_ALERT, + SSL_ST_ACCEPT, + SSL_ST_CONNECT, + SSL_ST_MASK, + SSLeay_version, + SSLv23_METHOD, + Session, SysCallError, + TLS1_1_VERSION, + TLS1_2_VERSION, + TLS1_3_VERSION, + TLS_METHOD, + TLSv1_1_METHOD, + TLSv1_2_METHOD, + TLSv1_METHOD, + VERIFY_CLIENT_ONCE, + VERIFY_FAIL_IF_NO_PEER_CERT, + VERIFY_NONE, + VERIFY_PEER, WantReadError, WantWriteError, ZeroReturnError, + _make_requires, ) -from OpenSSL.SSL import Context, Session, Connection, SSLeay_version -from OpenSSL.SSL import _make_requires - from OpenSSL._util import ffi as _ffi, lib as _lib - -from OpenSSL.SSL import ( - OP_NO_QUERY_MTU, - OP_COOKIE_EXCHANGE, - OP_NO_TICKET, - OP_NO_COMPRESSION, - MODE_RELEASE_BUFFERS, - NO_OVERLAPPING_PROTOCOLS, -) - -from OpenSSL.SSL import ( - SSL_ST_CONNECT, - SSL_ST_ACCEPT, - SSL_ST_MASK, - SSL_CB_LOOP, - SSL_CB_EXIT, - SSL_CB_READ, - SSL_CB_WRITE, - SSL_CB_ALERT, - SSL_CB_READ_ALERT, - SSL_CB_WRITE_ALERT, - SSL_CB_ACCEPT_LOOP, - SSL_CB_ACCEPT_EXIT, - SSL_CB_CONNECT_LOOP, - SSL_CB_CONNECT_EXIT, - SSL_CB_HANDSHAKE_START, - SSL_CB_HANDSHAKE_DONE, +from OpenSSL.crypto import ( + FILETYPE_PEM, + PKey, + TYPE_RSA, + X509, + X509Extension, + X509Store, + dump_certificate, + dump_privatekey, + get_elliptic_curves, + load_certificate, + load_privatekey, ) try: @@ -142,15 +143,15 @@ try: except ImportError: OP_NO_TLSv1_3 = None -from .util import WARNING_TYPE_EXPECTED, NON_ASCII, is_consistent_type from .test_crypto import ( client_cert_pem, client_key_pem, - server_cert_pem, - server_key_pem, root_cert_pem, root_key_pem, + server_cert_pem, + server_key_pem, ) +from .util import NON_ASCII, WARNING_TYPE_EXPECTED, is_consistent_type # openssl dhparam 2048 -out dh-2048.pem @@ -166,9 +167,6 @@ i5s5yYK7a/0eWxxRr2qraYaUj8RwDpH9CwIBAg== """ -skip_if_py3 = pytest.mark.skipif(not PY2, reason="Python 2 only") - - def socket_any_family(): try: return socket(AF_INET) @@ -197,7 +195,7 @@ def join_bytes_or_unicode(prefix, suffix): return join(prefix, suffix) # Otherwise, coerce suffix to the type of prefix. - if isinstance(prefix, text_type): + if isinstance(prefix, str): return join(prefix, suffix.decode(getfilesystemencoding())) else: return join(prefix, suffix.encode(getfilesystemencoding())) @@ -221,6 +219,8 @@ def socket_pair(): client.setblocking(True) server = port.accept()[0] + port.close() + # Let's pass some unencrypted data to make sure our socket connection is # fine. Just one byte, so we don't have to worry about buffers getting # filled up or fragmentation. @@ -366,7 +366,7 @@ def interact_in_memory(client_conn, server_conn): # Give the side a chance to generate some more bytes, or succeed. try: - data = read.recv(2 ** 16) + data = read.recv(2**16) except WantReadError: # It didn't succeed, so we'll hope it generated some output. pass @@ -407,7 +407,7 @@ def handshake_in_memory(client_conn, server_conn): interact_in_memory(client_conn, server_conn) -class TestVersion(object): +class TestVersion: """ Tests for version information exposed by `OpenSSL.SSL.SSLeay_version` and `OpenSSL.SSL.OPENSSL_VERSION_NUMBER`. @@ -444,17 +444,15 @@ def ca_file(tmpdir): """ Create a valid PEM file with CA certificates and return the path. """ - key = rsa.generate_private_key( - public_exponent=65537, key_size=2048, backend=default_backend() - ) + key = rsa.generate_private_key(public_exponent=65537, key_size=2048) public_key = key.public_key() builder = x509.CertificateBuilder() builder = builder.subject_name( - x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, u"pyopenssl.org")]) + x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "pyopenssl.org")]) ) builder = builder.issuer_name( - x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, u"pyopenssl.org")]) + x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "pyopenssl.org")]) ) one_day = datetime.timedelta(1, 0, 0) builder = builder.not_valid_before(datetime.datetime.today() - one_day) @@ -466,9 +464,7 @@ def ca_file(tmpdir): critical=True, ) - certificate = builder.sign( - private_key=key, algorithm=hashes.SHA256(), backend=default_backend() - ) + certificate = builder.sign(private_key=key, algorithm=hashes.SHA256()) ca_file = tmpdir.join("test.pem") ca_file.write_binary( @@ -488,14 +484,14 @@ def context(): return Context(SSLv23_METHOD) -class TestContext(object): +class TestContext: """ Unit tests for `OpenSSL.SSL.Context`. """ @pytest.mark.parametrize( "cipher_string", - [b"hello world:AES128-SHA", u"hello world:AES128-SHA"], + [b"hello world:AES128-SHA", "hello world:AES128-SHA"], ) def test_set_cipher_list(self, context, cipher_string): """ @@ -525,15 +521,20 @@ class TestContext(object): """ with pytest.raises(Error) as excinfo: context.set_cipher_list(b"imaginary-cipher") - assert excinfo.value.args == ( - [ - ( - "SSL routines", - "SSL_CTX_set_cipher_list", - "no cipher match", - ) - ], - ) + assert excinfo.value.args[0][0] in [ + # 1.1.x + ( + "SSL routines", + "SSL_CTX_set_cipher_list", + "no cipher match", + ), + # 3.0.x + ( + "SSL routines", + "", + "no cipher match", + ), + ] def test_load_client_ca(self, context, ca_file): """ @@ -572,20 +573,27 @@ class TestContext(object): with pytest.raises(Error) as e: context.set_session_id(b"abc" * 1000) - assert [ + assert e.value.args[0][0] in [ + # 1.1.x ( "SSL routines", "SSL_CTX_set_session_id_context", "ssl session id context too long", - ) - ] == e.value.args[0] + ), + # 3.0.x + ( + "SSL routines", + "", + "ssl session id context too long", + ), + ] def test_set_session_id_unicode(self, context): """ `Context.set_session_id` raises a warning if a unicode string is passed. """ - pytest.deprecated_call(context.set_session_id, u"abc") + pytest.deprecated_call(context.set_session_id, "abc") def test_method(self): """ @@ -597,7 +605,7 @@ class TestContext(object): for meth in methods: Context(meth) - maybe = [SSLv2_METHOD, SSLv3_METHOD, TLSv1_1_METHOD, TLSv1_2_METHOD] + maybe = [TLSv1_1_METHOD, TLSv1_2_METHOD] for meth in maybe: try: Context(meth) @@ -609,7 +617,7 @@ class TestContext(object): with pytest.raises(TypeError): Context("") with pytest.raises(ValueError): - Context(10) + Context(13) def test_type(self): """ @@ -617,17 +625,6 @@ class TestContext(object): """ assert is_consistent_type(Context, "Context", TLSv1_METHOD) - def test_use_privatekey(self): - """ - `Context.use_privatekey` takes an `OpenSSL.crypto.PKey` instance. - """ - key = PKey() - key.generate_key(TYPE_RSA, 1024) - ctx = Context(SSLv23_METHOD) - ctx.use_privatekey(key) - with pytest.raises(TypeError): - ctx.use_privatekey("") - def test_use_privatekey_file_missing(self, tmpfile): """ `Context.use_privatekey_file` raises `OpenSSL.SSL.Error` when passed @@ -681,37 +678,6 @@ class TestContext(object): FILETYPE_PEM, ) - def test_use_certificate_wrong_args(self): - """ - `Context.use_certificate_wrong_args` raises `TypeError` when not passed - exactly one `OpenSSL.crypto.X509` instance as an argument. - """ - ctx = Context(SSLv23_METHOD) - with pytest.raises(TypeError): - ctx.use_certificate("hello, world") - - def test_use_certificate_uninitialized(self): - """ - `Context.use_certificate` raises `OpenSSL.SSL.Error` when passed a - `OpenSSL.crypto.X509` instance which has not been initialized - (ie, which does not actually have any certificate data). - """ - ctx = Context(SSLv23_METHOD) - with pytest.raises(Error): - ctx.use_certificate(X509()) - - def test_use_certificate(self): - """ - `Context.use_certificate` sets the certificate which will be - used to identify connections created using the context. - """ - # TODO - # Hard to assert anything. But we could set a privatekey then ask - # OpenSSL if the cert and key agree using check_privatekey. Then as - # long as check_privatekey works right we're good... - ctx = Context(SSLv23_METHOD) - ctx.use_certificate(load_certificate(FILETYPE_PEM, root_cert_pem)) - def test_use_certificate_file_wrong_args(self): """ `Context.use_certificate_file` raises `TypeError` if the first @@ -1211,8 +1177,8 @@ class TestContext(object): def test_fallback_default_verify_paths(self, monkeypatch): """ Test that we load certificates successfully on linux from the fallback - path. To do this we set the _CRYPTOGRAPHY_MANYLINUX1_CA_FILE and - _CRYPTOGRAPHY_MANYLINUX1_CA_DIR vars to be equal to whatever the + path. To do this we set the _CRYPTOGRAPHY_MANYLINUX_CA_FILE and + _CRYPTOGRAPHY_MANYLINUX_CA_DIR vars to be equal to whatever the current OpenSSL default is and we disable SSL_CTX_SET_default_verify_paths so that it can't find certs unless it loads via fallback. @@ -1223,12 +1189,12 @@ class TestContext(object): ) monkeypatch.setattr( SSL, - "_CRYPTOGRAPHY_MANYLINUX1_CA_FILE", + "_CRYPTOGRAPHY_MANYLINUX_CA_FILE", _ffi.string(_lib.X509_get_default_cert_file()), ) monkeypatch.setattr( SSL, - "_CRYPTOGRAPHY_MANYLINUX1_CA_DIR", + "_CRYPTOGRAPHY_MANYLINUX_CA_DIR", _ffi.string(_lib.X509_get_default_cert_dir()), ) context.set_default_verify_paths() @@ -1332,20 +1298,20 @@ class TestContext(object): """ serverSocket, clientSocket = socket_pair() - server = Connection(serverContext, serverSocket) - server.set_accept_state() + with serverSocket, clientSocket: + server = Connection(serverContext, serverSocket) + server.set_accept_state() - client = Connection(clientContext, clientSocket) - client.set_connect_state() + client = Connection(clientContext, clientSocket) + client.set_connect_state() - # Make them talk to each other. - # interact_in_memory(client, server) - for _ in range(3): - for s in [client, server]: - try: - s.do_handshake() - except WantReadError: - pass + # Make them talk to each other. + for _ in range(3): + for s in [client, server]: + try: + s.do_handshake() + except WantReadError: + select.select([client, server], [], []) def test_set_verify_callback_connection_argument(self): """ @@ -1361,7 +1327,7 @@ class TestContext(object): ) serverConnection = Connection(serverContext, None) - class VerifyCallback(object): + class VerifyCallback: def callback(self, connection, *args): self.connection = connection return 1 @@ -1705,7 +1671,7 @@ class TestContext(object): """ context = Context(SSLv23_METHOD) for curve in get_elliptic_curves(): - if curve.name.startswith(u"Oakley-"): + if curve.name.startswith("Oakley-"): # Setting Oakley-EC2N-4 and Oakley-EC2N-3 adds # ('bignum routines', 'BN_mod_inverse', 'no inverse') to the # error queue on OpenSSL 1.0.2. @@ -1751,7 +1717,7 @@ class TestContext(object): """ context = Context(SSLv23_METHOD) with pytest.raises(TypeError): - context.set_tlsext_use_srtp(text_type("SRTP_AES128_CM_SHA1_80")) + context.set_tlsext_use_srtp(str("SRTP_AES128_CM_SHA1_80")) def test_set_tlsext_use_srtp_invalid_profile(self): """ @@ -1773,7 +1739,7 @@ class TestContext(object): assert context.set_tlsext_use_srtp(b"SRTP_AES128_CM_SHA1_80") is None -class TestServerNameCallback(object): +class TestServerNameCallback: """ Tests for `Context.set_tlsext_servername_callback` and its interaction with `Connection`. @@ -1882,7 +1848,7 @@ class TestServerNameCallback(object): assert args == [(server, b"foo1.example.com")] -class TestApplicationLayerProtoNegotiation(object): +class TestApplicationLayerProtoNegotiation: """ Tests for ALPN in PyOpenSSL. """ @@ -1927,14 +1893,13 @@ class TestApplicationLayerProtoNegotiation(object): assert server.get_alpn_proto_negotiated() == b"spdy/2" assert client.get_alpn_proto_negotiated() == b"spdy/2" - @pytest.mark.xfail(reason='https://github.com/pyca/pyopenssl/issues/1043') def test_alpn_call_failure(self): """ SSL_CTX_set_alpn_protos does not like to be called with an empty protocols list. Ensure that we produce a user-visible error. """ context = Context(SSLv23_METHOD) - with pytest.raises(Error): + with pytest.raises(ValueError): context.set_alpn_protos([]) def test_alpn_set_on_connection(self): @@ -2066,7 +2031,7 @@ class TestApplicationLayerProtoNegotiation(object): def invalid_cb(conn, options): invalid_cb_args.append((conn, options)) - return u"can't return unicode" + return "can't return unicode" client_context = Context(SSLv23_METHOD) client_context.set_alpn_protos([b"http/1.1", b"spdy/2"]) @@ -2163,7 +2128,7 @@ class TestApplicationLayerProtoNegotiation(object): assert select_args == [(server, [b"http/1.1", b"spdy/2"])] -class TestSession(object): +class TestSession: """ Unit tests for :py:obj:`OpenSSL.SSL.Session`. """ @@ -2177,7 +2142,77 @@ class TestSession(object): assert isinstance(new_session, Session) -class TestConnection(object): [email protected](params=["context", "connection"]) +def ctx_or_conn(request) -> Union[Context, Connection]: + ctx = Context(SSLv23_METHOD) + if request.param == "context": + return ctx + else: + return Connection(ctx, None) + + +class TestContextConnection: + """ + Unit test for methods that are exposed both by Connection and Context + objects. + """ + + def test_use_privatekey(self, ctx_or_conn): + """ + `use_privatekey` takes an `OpenSSL.crypto.PKey` instance. + """ + key = PKey() + key.generate_key(TYPE_RSA, 1024) + + ctx_or_conn.use_privatekey(key) + with pytest.raises(TypeError): + ctx_or_conn.use_privatekey("") + + def test_use_privatekey_wrong_key(self, ctx_or_conn): + """ + `use_privatekey` raises `OpenSSL.SSL.Error` when passed a + `OpenSSL.crypto.PKey` instance which has not been initialized. + """ + key = PKey() + key.generate_key(TYPE_RSA, 1024) + ctx_or_conn.use_certificate( + load_certificate(FILETYPE_PEM, root_cert_pem) + ) + with pytest.raises(Error): + ctx_or_conn.use_privatekey(key) + + def test_use_certificate(self, ctx_or_conn): + """ + `use_certificate` sets the certificate which will be + used to identify connections created using the context. + """ + # TODO + # Hard to assert anything. But we could set a privatekey then ask + # OpenSSL if the cert and key agree using check_privatekey. Then as + # long as check_privatekey works right we're good... + ctx_or_conn.use_certificate( + load_certificate(FILETYPE_PEM, root_cert_pem) + ) + + def test_use_certificate_wrong_args(self, ctx_or_conn): + """ + `use_certificate_wrong_args` raises `TypeError` when not passed + exactly one `OpenSSL.crypto.X509` instance as an argument. + """ + with pytest.raises(TypeError): + ctx_or_conn.use_certificate("hello, world") + + def test_use_certificate_uninitialized(self, ctx_or_conn): + """ + `use_certificate` raises `OpenSSL.SSL.Error` when passed a + `OpenSSL.crypto.X509` instance which has not been initialized + (ie, which does not actually have any certificate data). + """ + with pytest.raises(Error): + ctx_or_conn.use_certificate(X509()) + + +class TestConnection: """ Unit tests for `OpenSSL.SSL.Connection`. """ @@ -2233,7 +2268,7 @@ class TestConnection(object): connection.bio_write(b"xy") connection.bio_write(bytearray(b"za")) with pytest.warns(DeprecationWarning): - connection.bio_write(u"deprecated") + connection.bio_write("deprecated") def test_get_context(self): """ @@ -2286,10 +2321,8 @@ class TestConnection(object): with pytest.raises(TypeError): conn.set_tlsext_host_name(b"with\0null") - if not PY2: - # On Python 3.x, don't accidentally implicitly convert from text. - with pytest.raises(TypeError): - conn.set_tlsext_host_name(b"example.com".decode("ascii")) + with pytest.raises(TypeError): + conn.set_tlsext_host_name(b"example.com".decode("ascii")) def test_pending(self): """ @@ -2629,6 +2662,52 @@ class TestConnection(object): server = Connection(ctx, None) assert None is server.get_verified_chain() + def test_set_verify_overrides_context(self): + context = Context(SSLv23_METHOD) + context.set_verify(VERIFY_PEER) + conn = Connection(context, None) + conn.set_verify(VERIFY_NONE) + + assert context.get_verify_mode() == VERIFY_PEER + assert conn.get_verify_mode() == VERIFY_NONE + + with pytest.raises(TypeError): + conn.set_verify(None) + + with pytest.raises(TypeError): + conn.set_verify(VERIFY_PEER, "not a callable") + + def test_set_verify_callback_reference(self): + """ + The callback for certificate verification should only be forgotten if + the context and all connections created by it do not use it anymore. + """ + + def callback(conn, cert, errnum, depth, ok): # pragma: no cover + return ok + + tracker = ref(callback) + + context = Context(SSLv23_METHOD) + context.set_verify(VERIFY_PEER, callback) + del callback + + conn = Connection(context, None) + context.set_verify(VERIFY_NONE) + + collect() + collect() + assert tracker() + + conn.set_verify(VERIFY_PEER, lambda conn, cert, errnum, depth, ok: ok) + collect() + collect() + callback = tracker() + if callback is not None: # pragma: nocover + referrers = get_referrers(callback) + if len(referrers) > 1: + pytest.fail("Some references remain: %r" % (referrers,)) + def test_get_session_unconnected(self): """ `Connection.get_session` returns `None` when used with an object @@ -2859,8 +2938,8 @@ class TestConnection(object): client.get_cipher_name(), ) - assert isinstance(server_cipher_name, text_type) - assert isinstance(client_cipher_name, text_type) + assert isinstance(server_cipher_name, str) + assert isinstance(client_cipher_name, str) assert server_cipher_name == client_cipher_name @@ -2884,8 +2963,8 @@ class TestConnection(object): client.get_cipher_version(), ) - assert isinstance(server_cipher_version, text_type) - assert isinstance(client_cipher_version, text_type) + assert isinstance(server_cipher_version, str) + assert isinstance(client_cipher_version, str) assert server_cipher_version == client_cipher_version @@ -2923,8 +3002,8 @@ class TestConnection(object): client_protocol_version_name = client.get_protocol_version_name() server_protocol_version_name = server.get_protocol_version_name() - assert isinstance(server_protocol_version_name, text_type) - assert isinstance(client_protocol_version_name, text_type) + assert isinstance(server_protocol_version_name, str) + assert isinstance(client_protocol_version_name, str) assert server_protocol_version_name == client_protocol_version_name @@ -2979,7 +3058,7 @@ class TestConnection(object): assert 2 == len(data) -class TestConnectionGetCipherList(object): +class TestConnectionGetCipherList: """ Tests for `Connection.get_cipher_list`. """ @@ -3002,10 +3081,10 @@ class VeryLarge(bytes): """ def __len__(self): - return 2 ** 31 + return 2**31 -class TestConnectionSend(object): +class TestConnectionSend: """ Tests for `Connection.send`. """ @@ -3067,20 +3146,8 @@ class TestConnectionSend(object): assert count == 2 assert client.recv(2) == b"xy" - @skip_if_py3 - def test_short_buffer(self): - """ - When passed a buffer containing a small number of bytes, - `Connection.send` transmits all of them and returns the number - of bytes sent. - """ - server, client = loopback() - count = server.send(buffer(b"xy")) # noqa: F821 - assert count == 2 - assert client.recv(2) == b"xy" - @pytest.mark.skipif( - sys.maxsize < 2 ** 31, + sys.maxsize < 2**31, reason="sys.maxsize < 2**31 - test requires 64 bit", ) def test_buf_too_large(self): @@ -3103,7 +3170,7 @@ def _make_memoryview(size): return memoryview(bytearray(size)) -class TestConnectionRecvInto(object): +class TestConnectionRecvInto: """ Tests for `Connection.recv_into`. """ @@ -3226,7 +3293,7 @@ class TestConnectionRecvInto(object): self._doesnt_overfill_test(_make_memoryview) -class TestConnectionSendall(object): +class TestConnectionSendall: """ Tests for `Connection.sendall`. """ @@ -3274,17 +3341,6 @@ class TestConnectionSendall(object): server.sendall(memoryview(b"x")) assert client.recv(1) == b"x" - @skip_if_py3 - def test_short_buffers(self): - """ - When passed a buffer containing a small number of bytes, - `Connection.sendall` transmits all of them. - """ - server, client = loopback() - count = server.sendall(buffer(b"xy")) # noqa: F821 - assert count == 2 - assert client.recv(2) == b"xy" - def test_long(self): """ `Connection.sendall` transmits all the bytes in the string passed to it @@ -3319,7 +3375,7 @@ class TestConnectionSendall(object): assert err.value.args[0] == EPIPE -class TestConnectionRenegotiate(object): +class TestConnectionRenegotiate: """ Tests for SSL renegotiation APIs. """ @@ -3363,7 +3419,7 @@ class TestConnectionRenegotiate(object): pass -class TestError(object): +class TestError: """ Unit tests for `OpenSSL.SSL.Error`. """ @@ -3376,7 +3432,7 @@ class TestError(object): assert Error.__name__ == "Error" -class TestConstants(object): +class TestConstants: """ Tests for the values of constants exposed in `OpenSSL.SSL`. @@ -3493,7 +3549,7 @@ class TestConstants(object): assert 0x300 == SESS_CACHE_NO_INTERNAL -class TestMemoryBIO(object): +class TestMemoryBIO: """ Tests for `OpenSSL.SSL.Connection` using a memory BIO. """ @@ -3666,7 +3722,7 @@ class TestMemoryBIO(object): interact_in_memory(client, server) - size = 2 ** 15 + size = 2**15 sent = client.send(b"x" * size) # Sanity check. We're trying to test what happens when the entire # input can't be sent. If the entire input was sent, this test is @@ -3917,7 +3973,7 @@ class TestMemoryBIO(object): self._check_client_ca_list(set_replaces_add_ca) -class TestInfoConstants(object): +class TestInfoConstants: """ Tests for assorted constants exposed for use in info callbacks. """ @@ -3960,7 +4016,7 @@ class TestInfoConstants(object): assert const is None or isinstance(const, int) -class TestRequires(object): +class TestRequires: """ Tests for the decorator factory used to conditionally raise NotImplementedError when older OpenSSLs are used. @@ -3999,7 +4055,7 @@ class TestRequires(object): assert "Error text" in str(e.value) -class TestOCSP(object): +class TestOCSP: """ Tests for PyOpenSSL's OCSP stapling support. """ @@ -4243,3 +4299,188 @@ class TestOCSP(object): with pytest.raises(TypeError): handshake_in_memory(client, server) + + +class TestDTLS: + # The way you would expect DTLSv1_listen to work is: + # + # - it reads packets in a loop + # - when it finds a valid ClientHello, it returns + # - now the handshake can proceed + # + # However, on older versions of OpenSSL, it did something "cleverer". The + # way it worked is: + # + # - it "peeks" into the BIO to see the next packet without consuming it + # - if *not* a valid ClientHello, then it reads the packet to consume it + # and loops around + # - if it *is* a valid ClientHello, it *leaves the packet in the BIO*, and + # returns + # - then the handshake finds the ClientHello in the BIO and reads it a + # second time. + # + # I'm not sure exactly when this switched over. The OpenSSL v1.1.1 in + # Ubuntu 18.04 has the old behavior. The OpenSSL v1.1.1 in Ubuntu 20.04 has + # the new behavior. There doesn't seem to be any mention of this change in + # the OpenSSL v1.1.1 changelog, but presumably it changed in some point + # release or another. Presumably in 2025 or so there will be only new + # OpenSSLs around we can delete this whole comment and the weird + # workaround. If anyone is still using this library by then, which seems + # both depressing and inevitable. + # + # Anyway, why do we care? The reason is that the old strategy has a + # problem: the "peek" operation is only defined on "DGRAM BIOs", which are + # a special type of object that is different from the more familiar "socket + # BIOs" and "memory BIOs". If you *don't* have a DGRAM BIO, and you try to + # peek into the BIO... then it silently degrades to a full-fledged "read" + # operation that consumes the packet. Which is a problem if your algorithm + # depends on leaving the packet in the BIO to be read again later. + # + # So on old OpenSSL, we have a problem: + # + # - we can't use a DGRAM BIO, because cryptography/pyopenssl don't wrap the + # relevant APIs, nor should they. + # + # - if we use a socket BIO, then the first time DTLSv1_listen sees an + # invalid packet (like for example... the challenge packet that *every + # DTLS handshake starts with before the real ClientHello!*), it tries to + # first "peek" it, and then "read" it. But since the first "peek" + # consumes the packet, the second "read" ends up hanging or consuming + # some unrelated packet, which is undesirable. So you can't even get to + # the handshake stage successfully. + # + # - if we use a memory BIO, then DTLSv1_listen works OK on invalid packets + # -- first the "peek" consumes them, and then it tries to "read" again to + # consume them, which fails immediately, and OpenSSL ignores the failure. + # So it works by accident. BUT, when we get a valid ClientHello, we have + # a problem: DTLSv1_listen tries to "peek" it and then leave it in the + # read BIO for do_handshake to consume. But instead "peek" consumes the + # packet, so it's not there where do_handshake is expecting it, and the + # handshake fails. + # + # Fortunately (if that's the word), we can work around the memory BIO + # problem. (Which is good, because in real life probably all our users will + # be using memory BIOs.) All we have to do is to save the valid ClientHello + # before calling DTLSv1_listen, and then after it returns we push *a second + # copy of it* of the packet memory BIO before calling do_handshake. This + # fakes out OpenSSL and makes it think the "peek" operation worked + # correctly, and we can go on with our lives. + # + # In fact, we push the second copy of the ClientHello unconditionally. On + # new versions of OpenSSL, this is unnecessary, but harmless, because the + # DTLS state machine treats it like a network hiccup that duplicated a + # packet, which DTLS is robust against. + def test_it_works_at_all(self): + # arbitrary number larger than any conceivable handshake volley + LARGE_BUFFER = 65536 + + s_ctx = Context(DTLS_METHOD) + + def generate_cookie(ssl): + return b"xyzzy" + + def verify_cookie(ssl, cookie): + return cookie == b"xyzzy" + + s_ctx.set_cookie_generate_callback(generate_cookie) + s_ctx.set_cookie_verify_callback(verify_cookie) + s_ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem)) + s_ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem)) + s_ctx.set_options(OP_NO_QUERY_MTU) + s = Connection(s_ctx) + s.set_accept_state() + + c_ctx = Context(DTLS_METHOD) + c_ctx.set_options(OP_NO_QUERY_MTU) + c = Connection(c_ctx) + c.set_connect_state() + + # These are mandatory, because openssl can't guess the MTU for a memory + # bio and will produce a mysterious error if you make it try. + c.set_ciphertext_mtu(1500) + s.set_ciphertext_mtu(1500) + + latest_client_hello = None + + def pump_membio(label, source, sink): + try: + chunk = source.bio_read(LARGE_BUFFER) + except WantReadError: + return False + # I'm not sure this check is needed, but I'm not sure it's *not* + # needed either: + if not chunk: # pragma: no cover + return False + # Gross hack: if this is a ClientHello, save it so we can find it + # later. See giant comment above. + try: + # if ContentType == handshake and HandshakeType == + # client_hello: + if chunk[0] == 22 and chunk[13] == 1: + nonlocal latest_client_hello + latest_client_hello = chunk + except IndexError: # pragma: no cover + pass + print(f"{label}: {chunk.hex()}") + sink.bio_write(chunk) + return True + + def pump(): + # Raises if there was no data to pump, to avoid infinite loops if + # we aren't making progress. + assert pump_membio("s -> c", s, c) or pump_membio("c -> s", c, s) + + c_handshaking = True + s_listening = True + s_handshaking = False + first = True + while c_handshaking or s_listening or s_handshaking: + if not first: + pump() + first = False + + if c_handshaking: + try: + c.do_handshake() + except WantReadError: + pass + else: + c_handshaking = False + + if s_listening: + try: + s.DTLSv1_listen() + except WantReadError: + pass + else: + s_listening = False + s_handshaking = True + # Write the duplicate ClientHello. See giant comment above. + s.bio_write(latest_client_hello) + + if s_handshaking: + try: + s.do_handshake() + except WantReadError: + pass + else: + s_handshaking = False + + s.write(b"hello") + pump() + assert c.read(100) == b"hello" + c.write(b"goodbye") + pump() + assert s.read(100) == b"goodbye" + + # Check that the MTU set/query functions are doing *something* + c.set_ciphertext_mtu(1000) + try: + assert 500 < c.get_cleartext_mtu() < 1000 + except NotImplementedError: # OpenSSL 1.1.0 and earlier + pass + c.set_ciphertext_mtu(500) + try: + assert 0 < c.get_cleartext_mtu() < 500 + except NotImplementedError: # OpenSSL 1.1.0 and earlier + pass diff --git a/contrib/python/pyOpenSSL/py3/tests/test_util.py b/contrib/python/pyOpenSSL/py3/tests/test_util.py index 6224448d433..e029d71a719 100644 --- a/contrib/python/pyOpenSSL/py3/tests/test_util.py +++ b/contrib/python/pyOpenSSL/py3/tests/test_util.py @@ -3,7 +3,7 @@ import pytest from OpenSSL._util import exception_from_error_queue, lib -class TestErrors(object): +class TestErrors: """ Tests for handling of certain OpenSSL error cases. """ diff --git a/contrib/python/pyOpenSSL/py3/tests/util.py b/contrib/python/pyOpenSSL/py3/tests/util.py index 75d2c8de80e..b4649e33992 100644 --- a/contrib/python/pyOpenSSL/py3/tests/util.py +++ b/contrib/python/pyOpenSSL/py3/tests/util.py @@ -6,8 +6,6 @@ Helpers for the OpenSSL test suite, largely copied from U{Twisted<http://twistedmatrix.com/>}. """ -from six import PY2 - # This is the UTF-8 encoding of the SNOWMAN unicode code point. NON_ASCII = b"\xe2\x98\x83".decode("utf-8") @@ -32,7 +30,7 @@ def is_consistent_type(theType, name, *constructionArgs): return True -class EqualityTestsMixin(object): +class EqualityTestsMixin: """ A mixin defining tests for the standard implementation of C{==} and C{!=}. """ @@ -128,7 +126,7 @@ class EqualityTestsMixin(object): operand if it is of an unrelated type. """ - class Delegate(object): + class Delegate: def __eq__(self, other): # Do something crazy and obvious. return [self] @@ -143,7 +141,7 @@ class EqualityTestsMixin(object): operand if it is of an unrelated type. """ - class Delegate(object): + class Delegate: def __ne__(self, other): # Do something crazy and obvious. return [self] @@ -154,7 +152,4 @@ class EqualityTestsMixin(object): # The type name expected in warnings about using the wrong string type. -if PY2: - WARNING_TYPE_EXPECTED = "unicode" -else: - WARNING_TYPE_EXPECTED = "str" +WARNING_TYPE_EXPECTED = "str" |
