summaryrefslogtreecommitdiffstats
path: root/contrib/python/pyOpenSSL/py3/tests
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/python/pyOpenSSL/py3/tests')
-rw-r--r--contrib/python/pyOpenSSL/py3/tests/memdbg.py1
-rw-r--r--contrib/python/pyOpenSSL/py3/tests/test_crypto.py363
-rw-r--r--contrib/python/pyOpenSSL/py3/tests/test_debug.py2
-rw-r--r--contrib/python/pyOpenSSL/py3/tests/test_rand.py2
-rw-r--r--contrib/python/pyOpenSSL/py3/tests/test_ssl.py709
-rw-r--r--contrib/python/pyOpenSSL/py3/tests/test_util.py2
-rw-r--r--contrib/python/pyOpenSSL/py3/tests/util.py13
7 files changed, 701 insertions, 391 deletions
diff --git a/contrib/python/pyOpenSSL/py3/tests/memdbg.py b/contrib/python/pyOpenSSL/py3/tests/memdbg.py
index 590b72d0682..0dd8c318e4e 100644
--- a/contrib/python/pyOpenSSL/py3/tests/memdbg.py
+++ b/contrib/python/pyOpenSSL/py3/tests/memdbg.py
@@ -1,5 +1,4 @@
import sys
-
import traceback
from cffi import api as _api
diff --git a/contrib/python/pyOpenSSL/py3/tests/test_crypto.py b/contrib/python/pyOpenSSL/py3/tests/test_crypto.py
index ef3429d17fd..44bbd0f2aba 100644
--- a/contrib/python/pyOpenSSL/py3/tests/test_crypto.py
+++ b/contrib/python/pyOpenSSL/py3/tests/test_crypto.py
@@ -4,56 +4,65 @@
"""
Unit tests for :py:mod:`OpenSSL.crypto`.
"""
-
-from warnings import simplefilter
-
import base64
-from subprocess import PIPE, Popen
-from datetime import datetime, timedelta
import sys
-
-import pytest
+from datetime import datetime, timedelta
+from subprocess import PIPE, Popen
+from warnings import simplefilter
from cryptography import x509
-from cryptography.hazmat.backends.openssl.backend import backend
from cryptography.hazmat.primitives import serialization
-from cryptography.hazmat.primitives.asymmetric import rsa
+from cryptography.hazmat.primitives.asymmetric import ec, ed25519, ed448, rsa
import flaky
-from OpenSSL.crypto import TYPE_RSA, TYPE_DSA, Error, PKey
-from OpenSSL.crypto import X509, X509Name
+import pytest
+
+from OpenSSL._util import ffi as _ffi, lib as _lib
from OpenSSL.crypto import (
+ CRL,
+ Error,
+ FILETYPE_ASN1,
+ FILETYPE_PEM,
+ FILETYPE_TEXT,
+ NetscapeSPKI,
+ PKCS12,
+ PKCS7,
+ PKey,
+ Revoked,
+ TYPE_DSA,
+ TYPE_RSA,
+ X509,
+ X509Extension,
+ X509Name,
+ X509Req,
X509Store,
- X509StoreFlags,
X509StoreContext,
X509StoreContextError,
-)
-from OpenSSL.crypto import X509Req
-from OpenSSL.crypto import X509Extension
-from OpenSSL.crypto import load_certificate, load_privatekey
-from OpenSSL.crypto import load_publickey, dump_publickey
-from OpenSSL.crypto import FILETYPE_PEM, FILETYPE_ASN1, FILETYPE_TEXT
-from OpenSSL.crypto import dump_certificate, load_certificate_request
-from OpenSSL.crypto import dump_certificate_request, dump_privatekey
-from OpenSSL.crypto import PKCS7, load_pkcs7_data
-from OpenSSL.crypto import PKCS12, load_pkcs12
-from OpenSSL.crypto import CRL, Revoked, dump_crl, load_crl
-from OpenSSL.crypto import NetscapeSPKI
-from OpenSSL.crypto import (
- sign,
- verify,
+ X509StoreFlags,
+ dump_certificate,
+ dump_certificate_request,
+ dump_crl,
+ dump_privatekey,
+ dump_publickey,
get_elliptic_curve,
get_elliptic_curves,
+ load_certificate,
+ load_certificate_request,
+ load_crl,
+ load_pkcs12,
+ load_pkcs7_data,
+ load_privatekey,
+ load_publickey,
+ sign,
+ verify,
)
-from OpenSSL._util import ffi as _ffi, lib as _lib
-
from .util import (
EqualityTestsMixin,
- is_consistent_type,
- WARNING_TYPE_EXPECTED,
NON_ASCII,
+ WARNING_TYPE_EXPECTED,
+ is_consistent_type,
)
@@ -64,7 +73,7 @@ def normalize_privatekey_pem(pem):
GOOD_CIPHER = "blowfish"
BAD_CIPHER = "zippers"
-GOOD_DIGEST = "SHA1"
+GOOD_DIGEST = "SHA256"
BAD_DIGEST = "monkeys"
old_root_cert_pem = b"""-----BEGIN CERTIFICATE-----
@@ -609,9 +618,10 @@ vrzEeLDRiiPl92dyyWmu
-----END X509 CRL-----
"""
+# The signature on this CRL is invalid.
crlDataUnsupportedExtension = b"""\
-----BEGIN X509 CRL-----
-MIIGRzCCBS8CAQIwDQYJKoZIhvcNAQELBQAwJzELMAkGA1UEBhMCVVMxGDAWBgNV
+MIIGRzCCBS8CAQEwDQYJKoZIhvcNAQELBQAwJzELMAkGA1UEBhMCVVMxGDAWBgNV
BAMMD2NyeXB0b2dyYXBoeS5pbxgPMjAxNTAxMDEwMDAwMDBaGA8yMDE2MDEwMTAw
MDAwMFowggTOMBQCAQAYDzIwMTUwMTAxMDAwMDAwWjByAgEBGA8yMDE1MDEwMTAw
MDAwMFowXDAYBgNVHRgEERgPMjAxNTAxMDEwMDAwMDBaMDQGA1UdHQQtMCukKTAn
@@ -783,6 +793,21 @@ MBsCAQACAS0CAQcCAQACAQ8CAQMCAQACAQACAQA=
-----END RSA PRIVATE KEY-----
"""
+ed25519_private_key_pem = b"""-----BEGIN PRIVATE KEY-----
+MC4CAQAwBQYDK2VwBCIEIKlxBbhVsSURoLTmsu9uTqYH6oF7zpxmp1ZQCAPhDmI2
+-----END PRIVATE KEY-----
+"""
+
+ed448_private_key_pem = b"""-----BEGIN PRIVATE KEY-----
+MEcCAQAwBQYDK2VxBDsEOcqZ7a3k6JwrJbYO8CNTPT/d7dlWCo5vCf0EYDj79ZvA\nhD8u9EPHlYJw5Y8ZQdH4WmVEfpKA23xkdQ==
+-----END PRIVATE KEY-----
+"""
+
+x25519_private_key_pem = b"""-----BEGIN PRIVATE KEY-----
+MC4CAQAwBQYDK2VuBCIEIPAjVfPNTm25VxtBRg+JjjFx9tA3M8aaBdVhjb92iBts
+-----END PRIVATE KEY-----
+"""
+
@pytest.fixture
def x509_data():
@@ -809,7 +834,7 @@ def x509_data():
yield pkey, x509
-class TestX509Ext(object):
+class TestX509Ext:
"""
Tests for `OpenSSL.crypto.X509Extension`.
"""
@@ -914,7 +939,7 @@ class TestX509Ext(object):
b"basicConstraints", False, b"CA:TRUE", subject=x509
)
x509.add_extensions([ext1])
- x509.sign(pkey, "sha1")
+ x509.sign(pkey, "sha256")
# This is a little lame. Can we think of a better way?
text = dump_certificate(FILETYPE_TEXT, x509)
assert b"X509v3 Basic Constraints:" in text
@@ -930,7 +955,7 @@ class TestX509Ext(object):
b"subjectKeyIdentifier", False, b"hash", subject=x509
)
x509.add_extensions([ext3])
- x509.sign(pkey, "sha1")
+ x509.sign(pkey, "sha256")
text = dump_certificate(FILETYPE_TEXT, x509)
assert b"X509v3 Subject Key Identifier:" in text
@@ -963,7 +988,7 @@ class TestX509Ext(object):
b"basicConstraints", False, b"CA:TRUE", issuer=x509
)
x509.add_extensions([ext1])
- x509.sign(pkey, "sha1")
+ x509.sign(pkey, "sha256")
text = dump_certificate(FILETYPE_TEXT, x509)
assert b"X509v3 Basic Constraints:" in text
assert b"CA:TRUE" in text
@@ -978,7 +1003,7 @@ class TestX509Ext(object):
b"authorityKeyIdentifier", False, b"issuer:always", issuer=x509
)
x509.add_extensions([ext2])
- x509.sign(pkey, "sha1")
+ x509.sign(pkey, "sha256")
text = dump_certificate(FILETYPE_TEXT, x509)
assert b"X509v3 Authority Key Identifier:" in text
assert b"DirName:/CN=Yoda root CA" in text
@@ -1008,22 +1033,40 @@ class TestX509Ext(object):
)
-class TestPKey(object):
+class TestPKey:
"""
Tests for `OpenSSL.crypto.PKey`.
"""
- def test_convert_from_cryptography_private_key(self):
+ @pytest.mark.parametrize(
+ ("key_string", "key_type"),
+ [
+ (intermediate_key_pem, rsa.RSAPrivateKey),
+ (ec_private_key_pem, ec.EllipticCurvePrivateKey),
+ (ed25519_private_key_pem, ed25519.Ed25519PrivateKey),
+ (ed448_private_key_pem, ed448.Ed448PrivateKey),
+ ],
+ )
+ def test_convert_roundtrip_cryptography_private_key(
+ self, key_string, key_type
+ ):
"""
PKey.from_cryptography_key creates a proper private PKey.
+ PKey.to_cryptography_key creates a proper cryptography private key.
"""
- key = serialization.load_pem_private_key(
- intermediate_key_pem, None, backend
- )
+ key = serialization.load_pem_private_key(key_string, None)
pkey = PKey.from_cryptography_key(key)
assert isinstance(pkey, PKey)
- assert pkey.bits() == key.key_size
+ parsed_key = pkey.to_cryptography_key()
+ assert isinstance(parsed_key, key_type)
+ assert parsed_key.public_key().public_bytes(
+ serialization.Encoding.PEM,
+ serialization.PublicFormat.SubjectPublicKeyInfo,
+ ) == key.public_key().public_bytes(
+ serialization.Encoding.PEM,
+ serialization.PublicFormat.SubjectPublicKeyInfo,
+ )
assert pkey._only_public is False
assert pkey._initialized is True
@@ -1031,7 +1074,7 @@ class TestPKey(object):
"""
PKey.from_cryptography_key creates a proper public PKey.
"""
- key = serialization.load_pem_public_key(cleartextPublicKeyPEM, backend)
+ key = serialization.load_pem_public_key(cleartextPublicKeyPEM)
pkey = PKey.from_cryptography_key(key)
assert isinstance(pkey, PKey)
@@ -1043,9 +1086,7 @@ class TestPKey(object):
"""
PKey.from_cryptography_key raises TypeError with an unsupported type.
"""
- key = serialization.load_pem_private_key(
- ec_private_key_pem, None, backend
- )
+ key = serialization.load_pem_private_key(x25519_private_key_pem, None)
with pytest.raises(TypeError):
PKey.from_cryptography_key(key)
@@ -1059,16 +1100,6 @@ class TestPKey(object):
assert isinstance(key, rsa.RSAPublicKey)
assert pkey.bits() == key.key_size
- def test_convert_private_pkey_to_cryptography_key(self):
- """
- PKey.to_cryptography_key creates a proper cryptography private key.
- """
- pkey = load_privatekey(FILETYPE_PEM, root_key_pem)
- key = pkey.to_cryptography_key()
-
- assert isinstance(key, rsa.RSAPrivateKey)
- assert pkey.bits() == key.key_size
-
def test_type(self):
"""
`PKey` can be used to create instances of that type.
@@ -1175,10 +1206,11 @@ class TestPKey(object):
def test_inconsistent_key(self):
"""
- `PKey.check` returns `Error` if the key is not consistent.
+ Either `load_privatekey` or `PKey.check` returns `Error` if the key is
+ not consistent.
"""
- key = load_privatekey(FILETYPE_PEM, inconsistentPrivateKeyPEM)
with pytest.raises(Error):
+ key = load_privatekey(FILETYPE_PEM, inconsistentPrivateKeyPEM)
key.check()
def test_check_public_key(self):
@@ -1197,10 +1229,11 @@ class TestPKey(object):
def test_check_pr_897(self):
"""
- `PKey.check` raises `OpenSSL.crypto.Error` if provided with broken key
+ Either `load_privatekey` or `PKey.check` raises `OpenSSL.crypto.Error`
+ if provided with broken key
"""
- pkey = load_privatekey(FILETYPE_PEM, rsa_p_not_prime_pem)
with pytest.raises(Error):
+ pkey = load_privatekey(FILETYPE_PEM, rsa_p_not_prime_pem)
pkey.check()
@@ -1222,7 +1255,7 @@ def x509_name(**attrs):
return name
-class TestX509Name(object):
+class TestX509Name:
"""
Unit tests for `OpenSSL.crypto.X509Name`.
"""
@@ -1398,6 +1431,19 @@ class TestX509Name(object):
# other X509Name.
assert_greater_than(x509_name(CN="def"), x509_name(CN="abc"))
+ def assert_raises(a, b):
+ with pytest.raises(TypeError):
+ a < b
+ with pytest.raises(TypeError):
+ a <= b
+ with pytest.raises(TypeError):
+ a > b
+ with pytest.raises(TypeError):
+ a >= b
+
+ # Only X509Name objects can be compared with lesser than / greater than
+ assert_raises(x509_name(), object())
+
def test_hash(self):
"""
`X509Name.hash` returns an integer hash based on the value of the name.
@@ -1555,14 +1601,20 @@ class TestX509Req(_PKeyInteractionTestsMixin):
"""
`X509Req.set_version` sets the X.509 version of the certificate
request. `X509Req.get_version` returns the X.509 version of the
- certificate request. The initial value of the version is 0.
+ certificate request. The only defined version is 0. Others may or
+ may not be supported depending on backend.
"""
request = X509Req()
assert request.get_version() == 0
- request.set_version(1)
- assert request.get_version() == 1
- request.set_version(3)
- assert request.get_version() == 3
+ request.set_version(0)
+ assert request.get_version() == 0
+ try:
+ request.set_version(1)
+ assert request.get_version() == 1
+ request.set_version(3)
+ assert request.get_version() == 3
+ except Error:
+ pass
def test_version_wrong_args(self):
"""
@@ -1686,9 +1738,7 @@ class TestX509Req(_PKeyInteractionTestsMixin):
assert request.verify(pkey)
def test_convert_from_cryptography(self):
- crypto_req = x509.load_pem_x509_csr(
- cleartextCertificateRequestPEM, backend
- )
+ crypto_req = x509.load_pem_x509_csr(cleartextCertificateRequestPEM)
req = X509Req.from_cryptography(crypto_req)
assert isinstance(req, X509Req)
@@ -1752,8 +1802,8 @@ class TestX509(_PKeyInteractionTestsMixin):
`X509.get_version` retrieves it.
"""
cert = X509()
- cert.set_version(1234)
- assert cert.get_version() == 1234
+ cert.set_version(2)
+ assert cert.get_version() == 2
def test_serial_number(self):
"""
@@ -1767,12 +1817,12 @@ class TestX509(_PKeyInteractionTestsMixin):
assert certificate.get_serial_number() == 0
certificate.set_serial_number(1)
assert certificate.get_serial_number() == 1
- certificate.set_serial_number(2 ** 32 + 1)
- assert certificate.get_serial_number() == 2 ** 32 + 1
- certificate.set_serial_number(2 ** 64 + 1)
- assert certificate.get_serial_number() == 2 ** 64 + 1
- certificate.set_serial_number(2 ** 128 + 1)
- assert certificate.get_serial_number() == 2 ** 128 + 1
+ certificate.set_serial_number(2**32 + 1)
+ assert certificate.get_serial_number() == 2**32 + 1
+ certificate.set_serial_number(2**64 + 1)
+ assert certificate.get_serial_number() == 2**64 + 1
+ certificate.set_serial_number(2**128 + 1)
+ assert certificate.get_serial_number() == 2**128 + 1
def _setBoundTest(self, which):
"""
@@ -1920,6 +1970,14 @@ class TestX509(_PKeyInteractionTestsMixin):
cert.gmtime_adj_notAfter(2)
assert not cert.has_expired()
+ def test_has_expired_exception(self):
+ """
+ `X509.has_expired` throws ValueError if not-after time is not set
+ """
+ cert = X509()
+ with pytest.raises(ValueError):
+ cert.has_expired()
+
def test_root_has_not_expired(self):
"""
`X509.has_expired` returns `False` if the certificate's not-after time
@@ -1935,13 +1993,13 @@ class TestX509(_PKeyInteractionTestsMixin):
"""
cert = load_certificate(FILETYPE_PEM, old_root_cert_pem)
assert (
- # This is MD5 instead of GOOD_DIGEST because the digest algorithm
- # actually matters to the assertion (ie, another arbitrary, good
- # digest will not product the same digest).
# Digest verified with the command:
- # openssl x509 -in root_cert.pem -noout -fingerprint -md5
- cert.digest("MD5")
- == b"19:B3:05:26:2B:F8:F2:FF:0B:8F:21:07:A8:28:B8:75"
+ # openssl x509 -in root_cert.pem -noout -fingerprint -sha256
+ cert.digest("SHA256")
+ == (
+ b"3E:0F:16:39:6B:B1:3E:4F:08:85:C6:5F:10:0D:CB:2C:"
+ b"25:C2:91:4E:D0:4A:C2:29:06:BD:55:E3:A7:B3:B7:06"
+ )
)
def _extcert(self, pkey, extensions):
@@ -1957,7 +2015,7 @@ class TestX509(_PKeyInteractionTestsMixin):
cert.set_notAfter(when)
cert.add_extensions(extensions)
- cert.sign(pkey, "sha1")
+ cert.sign(pkey, "sha256")
return load_certificate(
FILETYPE_PEM, dump_certificate(FILETYPE_PEM, cert)
)
@@ -2036,8 +2094,8 @@ class TestX509(_PKeyInteractionTestsMixin):
b"DNS:altnull.python.org\x00example.com, "
b"URI:http://null.python.org\x00http://example.org, "
- b"IP Address:192.0.2.1, IP Address:2001:DB8:0:0:0:0:0:1\n"
- == str(ext).encode("ascii")
+ b"IP Address:192.0.2.1, IP Address:2001:DB8:0:0:0:0:0:1"
+ == str(ext).encode("ascii").strip()
)
def test_invalid_digest_algorithm(self):
@@ -2204,9 +2262,7 @@ tgI5
cert.sign(object(), b"sha256")
def test_convert_from_cryptography(self):
- crypto_cert = x509.load_pem_x509_certificate(
- intermediate_cert_pem, backend
- )
+ crypto_cert = x509.load_pem_x509_certificate(intermediate_cert_pem)
cert = X509.from_cryptography(crypto_cert)
assert isinstance(cert, X509)
@@ -2224,7 +2280,7 @@ tgI5
assert crypto_cert.version.value == cert.get_version()
-class TestX509Store(object):
+class TestX509Store:
"""
Test for `OpenSSL.crypto.X509Store`.
"""
@@ -2295,7 +2351,7 @@ class TestX509Store(object):
def test_load_locations_parameters(
self, cafile, capath, call_cafile, call_capath, monkeypatch
):
- class LibMock(object):
+ class LibMock:
def load_locations(self, store, cafile, capath):
self.cafile = cafile
self.capath = capath
@@ -2326,7 +2382,7 @@ class TestX509Store(object):
store.load_locations(cafile=str(invalid_ca_file))
-class TestPKCS12(object):
+class TestPKCS12:
"""
Test for `OpenSSL.crypto.PKCS12` and `OpenSSL.crypto.load_pkcs12`.
"""
@@ -2467,7 +2523,7 @@ class TestPKCS12(object):
b"-nodes",
b"-passin",
b"pass:" + passwd,
- *extra
+ *extra,
)
assert recovered_key[-len(key) :] == key
if cert:
@@ -2479,7 +2535,7 @@ class TestPKCS12(object):
b"-passin",
b"pass:" + passwd,
b"-nokeys",
- *extra
+ *extra,
)
assert recovered_cert[-len(cert) :] == cert
if ca:
@@ -2491,7 +2547,7 @@ class TestPKCS12(object):
b"-passin",
b"pass:" + passwd,
b"-nokeys",
- *extra
+ *extra,
)
assert recovered_cert[-len(ca) :] == ca
@@ -2802,7 +2858,7 @@ def _runopenssl(pem, *args):
return output
-class TestLoadPublicKey(object):
+class TestLoadPublicKey:
"""
Tests for :func:`load_publickey`.
"""
@@ -2842,7 +2898,7 @@ class TestLoadPublicKey(object):
assert dumped_pem == cleartextPublicKeyPEM
-class TestFunction(object):
+class TestFunction:
"""
Tests for free-functions in the `OpenSSL.crypto` module.
"""
@@ -3263,7 +3319,7 @@ class TestFunction(object):
load_pkcs7_data(object(), b"foo")
-class TestLoadCertificate(object):
+class TestLoadCertificate:
"""
Tests for `load_certificate_request`.
"""
@@ -3287,7 +3343,7 @@ class TestLoadCertificate(object):
load_certificate(FILETYPE_ASN1, b"lol")
-class TestPKCS7(object):
+class TestPKCS7:
"""
Tests for `PKCS7`.
"""
@@ -3387,7 +3443,7 @@ class TestNetscapeSPKI(_PKeyInteractionTestsMixin):
assert isinstance(blob, bytes)
-class TestRevoked(object):
+class TestRevoked:
"""
Tests for `OpenSSL.crypto.Revoked`.
"""
@@ -3477,7 +3533,7 @@ class TestRevoked(object):
revoked.set_reason(None)
assert revoked.get_reason() is None
- @pytest.mark.parametrize("reason", [object(), 1.0, u"foo"])
+ @pytest.mark.parametrize("reason", [object(), 1.0, "foo"])
def test_set_reason_wrong_args(self, reason):
"""
`Revoked.set_reason` raises `TypeError` if called with an argument
@@ -3497,7 +3553,7 @@ class TestRevoked(object):
revoked.set_reason(b"blue")
-class TestCRL(object):
+class TestCRL:
"""
Tests for `OpenSSL.crypto.CRL`.
"""
@@ -3548,17 +3604,17 @@ class TestCRL(object):
dumped_crl = self._get_crl().export(
self.cert, self.pkey, days=20, digest=b"sha256"
)
- crl = x509.load_pem_x509_crl(dumped_crl, backend)
+ crl = x509.load_pem_x509_crl(dumped_crl)
revoked = crl.get_revoked_certificate_by_serial_number(0x03AB)
assert revoked is not None
assert crl.issuer == x509.Name(
[
- x509.NameAttribute(x509.NameOID.COUNTRY_NAME, u"US"),
- x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, u"IL"),
- x509.NameAttribute(x509.NameOID.LOCALITY_NAME, u"Chicago"),
- x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, u"Testing"),
+ x509.NameAttribute(x509.NameOID.COUNTRY_NAME, "US"),
+ x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, "IL"),
+ x509.NameAttribute(x509.NameOID.LOCALITY_NAME, "Chicago"),
+ x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, "Testing"),
x509.NameAttribute(
- x509.NameOID.COMMON_NAME, u"Testing Root CA"
+ x509.NameOID.COMMON_NAME, "Testing Root CA"
),
]
)
@@ -3573,19 +3629,19 @@ class TestCRL(object):
# DER format
dumped_crl = self._get_crl().export(
- self.cert, self.pkey, FILETYPE_ASN1, digest=b"md5"
+ self.cert, self.pkey, FILETYPE_ASN1, digest=b"sha256"
)
- crl = x509.load_der_x509_crl(dumped_crl, backend)
+ crl = x509.load_der_x509_crl(dumped_crl)
revoked = crl.get_revoked_certificate_by_serial_number(0x03AB)
assert revoked is not None
assert crl.issuer == x509.Name(
[
- x509.NameAttribute(x509.NameOID.COUNTRY_NAME, u"US"),
- x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, u"IL"),
- x509.NameAttribute(x509.NameOID.LOCALITY_NAME, u"Chicago"),
- x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, u"Testing"),
+ x509.NameAttribute(x509.NameOID.COUNTRY_NAME, "US"),
+ x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, "IL"),
+ x509.NameAttribute(x509.NameOID.LOCALITY_NAME, "Chicago"),
+ x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, "Testing"),
x509.NameAttribute(
- x509.NameOID.COMMON_NAME, u"Testing Root CA"
+ x509.NameOID.COMMON_NAME, "Testing Root CA"
),
]
)
@@ -3600,7 +3656,7 @@ class TestCRL(object):
# text format
dumped_text = crl.export(
- self.cert, self.pkey, type=FILETYPE_TEXT, digest=b"md5"
+ self.cert, self.pkey, type=FILETYPE_TEXT, digest=b"sha256"
)
assert len(dumped_text) > 500
@@ -3610,9 +3666,9 @@ class TestCRL(object):
signature algorithm based on that digest function.
"""
crl = self._get_crl()
- dumped_crl = crl.export(self.cert, self.pkey, digest=b"sha1")
+ dumped_crl = crl.export(self.cert, self.pkey, digest=b"sha384")
text = _runopenssl(dumped_crl, b"crl", b"-noout", b"-text")
- text.index(b"Signature Algorithm: sha1")
+ text.index(b"Signature Algorithm: sha384")
def test_export_md5_digest(self):
"""
@@ -3794,7 +3850,9 @@ class TestCRL(object):
crl.add_revoked(revoked)
crl.set_version(1)
crl.set_lastUpdate(b"20140601000000Z")
- crl.set_nextUpdate(b"20180601000000Z")
+ # The year 5000 is far into the future so that this CRL isn't
+ # considered to have expired.
+ crl.set_nextUpdate(b"50000601000000Z")
crl.sign(issuer_cert, issuer_key, digest=b"sha512")
return crl
@@ -3820,7 +3878,7 @@ class TestCRL(object):
store_ctx = X509StoreContext(store, self.intermediate_server_cert)
with pytest.raises(X509StoreContextError) as err:
store_ctx.verify_certificate()
- assert err.value.args[0][2] == "certificate revoked"
+ assert str(err.value) == "certificate revoked"
def test_verify_with_missing_crl(self):
"""
@@ -3840,11 +3898,11 @@ class TestCRL(object):
store_ctx = X509StoreContext(store, self.intermediate_server_cert)
with pytest.raises(X509StoreContextError) as err:
store_ctx.verify_certificate()
- assert err.value.args[0][2] == "unable to get certificate CRL"
+ assert str(err.value) == "unable to get certificate CRL"
assert err.value.certificate.get_subject().CN == "intermediate-service"
def test_convert_from_cryptography(self):
- crypto_crl = x509.load_pem_x509_crl(crlData, backend)
+ crypto_crl = x509.load_pem_x509_crl(crlData)
crl = CRL.from_cryptography(crypto_crl)
assert isinstance(crl, CRL)
@@ -3858,7 +3916,7 @@ class TestCRL(object):
assert isinstance(crypto_crl, x509.CertificateRevocationList)
-class TestX509StoreContext(object):
+class TestX509StoreContext:
"""
Tests for `OpenSSL.crypto.X509StoreContext`.
"""
@@ -4051,7 +4109,11 @@ class TestX509StoreContext(object):
with pytest.raises(X509StoreContextError) as exc:
store_ctx.verify_certificate()
- assert exc.value.args[0][2] == "self signed certificate"
+ # OpenSSL 1.1.x and 3.0.x have different error messages
+ assert str(exc.value) in [
+ "self signed certificate",
+ "self-signed certificate",
+ ]
assert exc.value.certificate.get_subject().CN == "Testing Root CA"
def test_invalid_chain_no_root(self):
@@ -4066,7 +4128,7 @@ class TestX509StoreContext(object):
with pytest.raises(X509StoreContextError) as exc:
store_ctx.verify_certificate()
- assert exc.value.args[0][2] == "unable to get issuer certificate"
+ assert str(exc.value) == "unable to get issuer certificate"
assert exc.value.certificate.get_subject().CN == "intermediate"
def test_invalid_chain_no_intermediate(self):
@@ -4081,7 +4143,7 @@ class TestX509StoreContext(object):
with pytest.raises(X509StoreContextError) as exc:
store_ctx.verify_certificate()
- assert exc.value.args[0][2] == "unable to get local issuer certificate"
+ assert str(exc.value) == "unable to get local issuer certificate"
assert exc.value.certificate.get_subject().CN == "intermediate-service"
def test_modification_pre_verify(self):
@@ -4099,7 +4161,7 @@ class TestX509StoreContext(object):
with pytest.raises(X509StoreContextError) as exc:
store_ctx.verify_certificate()
- assert exc.value.args[0][2] == "unable to get issuer certificate"
+ assert str(exc.value) == "unable to get issuer certificate"
assert exc.value.certificate.get_subject().CN == "intermediate"
store_ctx.set_store(store_good)
@@ -4124,7 +4186,7 @@ class TestX509StoreContext(object):
with pytest.raises(X509StoreContextError) as exc:
store_ctx.verify_certificate()
- assert exc.value.args[0][2] == "certificate has expired"
+ assert str(exc.value) == "certificate has expired"
def test_get_verified_chain(self):
"""
@@ -4158,7 +4220,7 @@ class TestX509StoreContext(object):
with pytest.raises(X509StoreContextError) as exc:
store_ctx.get_verified_chain()
- assert exc.value.args[0][2] == "unable to get issuer certificate"
+ assert str(exc.value) == "unable to get issuer certificate"
assert exc.value.certificate.get_subject().CN == "intermediate"
@pytest.fixture
@@ -4223,10 +4285,23 @@ class TestX509StoreContext(object):
with pytest.raises(X509StoreContextError) as exc:
store_ctx.verify_certificate()
- assert exc.value.args[0][2] == "unable to get local issuer certificate"
+ assert str(exc.value) == "unable to get local issuer certificate"
+
+ def test_verify_with_partial_chain(self):
+ store = X509Store()
+ store.add_cert(self.intermediate_cert)
+
+ store_ctx = X509StoreContext(store, self.intermediate_server_cert)
+ with pytest.raises(X509StoreContextError):
+ store_ctx.verify_certificate()
+
+ # Now set the partial verification flag for verification.
+ store.set_flags(X509StoreFlags.PARTIAL_CHAIN)
+ store_ctx = X509StoreContext(store, self.intermediate_server_cert)
+ assert store_ctx.verify_certificate() is None
-class TestSignVerify(object):
+class TestSignVerify:
"""
Tests for `OpenSSL.crypto.sign` and `OpenSSL.crypto.verify`.
"""
@@ -4250,7 +4325,7 @@ class TestSignVerify(object):
# certificate unrelated to priv_key, used to trigger an error
bad_cert = load_certificate(FILETYPE_PEM, server_cert_pem)
- for digest in ["md5", "sha1"]:
+ for digest in ["md5", "sha1", "sha256"]:
sig = sign(priv_key, content, digest)
# Verify the signature of content, will throw an exception if
@@ -4289,7 +4364,7 @@ class TestSignVerify(object):
priv_key = load_privatekey(FILETYPE_PEM, root_key_pem)
cert = load_certificate(FILETYPE_PEM, root_cert_pem)
- for digest in ["md5", "sha1"]:
+ for digest in ["md5", "sha1", "sha256"]:
with pytest.warns(DeprecationWarning) as w:
simplefilter("always")
sig = sign(priv_key, content, digest)
@@ -4319,8 +4394,8 @@ class TestSignVerify(object):
)
priv_key = load_privatekey(FILETYPE_PEM, ec_root_key_pem)
cert = load_certificate(FILETYPE_PEM, ec_root_cert_pem)
- sig = sign(priv_key, content, "sha1")
- verify(cert, sig, content, "sha1")
+ sig = sign(priv_key, content, "sha256")
+ verify(cert, sig, content, "sha256")
def test_sign_nulls(self):
"""
@@ -4329,8 +4404,8 @@ class TestSignVerify(object):
content = b"Watch out! \0 Did you see it?"
priv_key = load_privatekey(FILETYPE_PEM, root_key_pem)
good_cert = load_certificate(FILETYPE_PEM, root_cert_pem)
- sig = sign(priv_key, content, "sha1")
- verify(good_cert, sig, content, "sha1")
+ sig = sign(priv_key, content, "sha256")
+ verify(good_cert, sig, content, "sha256")
def test_sign_with_large_key(self):
"""
@@ -4345,10 +4420,10 @@ class TestSignVerify(object):
)
priv_key = load_privatekey(FILETYPE_PEM, large_key_pem)
- sign(priv_key, content, "sha1")
+ sign(priv_key, content, "sha256")
-class TestEllipticCurve(object):
+class TestEllipticCurve:
"""
Tests for `_EllipticCurve`, `get_elliptic_curve`, and
`get_elliptic_curves`.
@@ -4375,7 +4450,7 @@ class TestEllipticCurve(object):
does not identify a supported curve.
"""
with pytest.raises(ValueError):
- get_elliptic_curve(u"this curve was just invented")
+ get_elliptic_curve("this curve was just invented")
def test_repr(self):
"""
@@ -4399,7 +4474,7 @@ class TestEllipticCurve(object):
curve._to_EC_KEY()
-class EllipticCurveFactory(object):
+class EllipticCurveFactory:
"""
A helper to get the names of two curves.
"""
@@ -4434,7 +4509,7 @@ class TestEllipticCurveEquality(EqualityTestsMixin):
return get_elliptic_curve(self.curve_factory.another_curve_name)
-class TestEllipticCurveHash(object):
+class TestEllipticCurveHash:
"""
Tests for `_EllipticCurve`'s implementation of hashing (thus use as
an item in a `dict` or `set`).
diff --git a/contrib/python/pyOpenSSL/py3/tests/test_debug.py b/contrib/python/pyOpenSSL/py3/tests/test_debug.py
index 2d62a3a56a1..4eeb3d00b31 100644
--- a/contrib/python/pyOpenSSL/py3/tests/test_debug.py
+++ b/contrib/python/pyOpenSSL/py3/tests/test_debug.py
@@ -1,5 +1,5 @@
-from OpenSSL.debug import _env_info
from OpenSSL import version
+from OpenSSL.debug import _env_info
def test_debug_info():
diff --git a/contrib/python/pyOpenSSL/py3/tests/test_rand.py b/contrib/python/pyOpenSSL/py3/tests/test_rand.py
index 763d71124ca..24f30939545 100644
--- a/contrib/python/pyOpenSSL/py3/tests/test_rand.py
+++ b/contrib/python/pyOpenSSL/py3/tests/test_rand.py
@@ -10,7 +10,7 @@ import pytest
from OpenSSL import rand
-class TestRand(object):
+class TestRand:
@pytest.mark.parametrize("args", [(b"foo", None), (None, 3)])
def test_add_wrong_args(self, args):
"""
diff --git a/contrib/python/pyOpenSSL/py3/tests/test_ssl.py b/contrib/python/pyOpenSSL/py3/tests/test_ssl.py
index 8bb5a035a77..fdedd7133a0 100644
--- a/contrib/python/pyOpenSSL/py3/tests/test_ssl.py
+++ b/contrib/python/pyOpenSSL/py3/tests/test_ssl.py
@@ -7,124 +7,125 @@ Unit tests for :mod:`OpenSSL.SSL`.
import datetime
import gc
+import select
import sys
import uuid
-
-from gc import collect, get_referrers
from errno import (
EAFNOSUPPORT,
ECONNREFUSED,
EINPROGRESS,
- EWOULDBLOCK,
EPIPE,
ESHUTDOWN,
+ EWOULDBLOCK,
)
-from sys import platform, getfilesystemencoding
-from socket import AF_INET, AF_INET6, MSG_PEEK, SHUT_RDWR, error, socket
+from gc import collect, get_referrers
from os import makedirs
from os.path import join
-from weakref import ref
+from socket import (
+ AF_INET,
+ AF_INET6,
+ MSG_PEEK,
+ SHUT_RDWR,
+ error,
+ socket,
+)
+from sys import getfilesystemencoding, platform
+from typing import Union
from warnings import simplefilter
-
-import flaky
-
-import pytest
-
-from pretend import raiser
-
-from six import PY2, text_type
+from weakref import ref
from cryptography import x509
-from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
+import flaky
-from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM
-from OpenSSL.crypto import PKey, X509, X509Extension, X509Store
-from OpenSSL.crypto import dump_privatekey, load_privatekey
-from OpenSSL.crypto import dump_certificate, load_certificate
-from OpenSSL.crypto import get_elliptic_curves
+from pretend import raiser
-from OpenSSL.SSL import (
- OPENSSL_VERSION_NUMBER,
- SSLEAY_VERSION,
- SSLEAY_CFLAGS,
- TLS_METHOD,
- TLS1_3_VERSION,
- TLS1_2_VERSION,
- TLS1_1_VERSION,
-)
-from OpenSSL.SSL import SSLEAY_PLATFORM, SSLEAY_DIR, SSLEAY_BUILT_ON
-from OpenSSL.SSL import SENT_SHUTDOWN, RECEIVED_SHUTDOWN
-from OpenSSL.SSL import (
- SSLv2_METHOD,
- SSLv3_METHOD,
- SSLv23_METHOD,
- TLSv1_METHOD,
- TLSv1_1_METHOD,
- TLSv1_2_METHOD,
-)
-from OpenSSL.SSL import OP_SINGLE_DH_USE, OP_NO_SSLv2, OP_NO_SSLv3
-from OpenSSL.SSL import (
- VERIFY_PEER,
- VERIFY_FAIL_IF_NO_PEER_CERT,
- VERIFY_CLIENT_ONCE,
- VERIFY_NONE,
-)
+import pytest
from OpenSSL import SSL
from OpenSSL.SSL import (
- SESS_CACHE_OFF,
- SESS_CACHE_CLIENT,
- SESS_CACHE_SERVER,
+ Connection,
+ Context,
+ DTLS_METHOD,
+ Error,
+ MODE_RELEASE_BUFFERS,
+ NO_OVERLAPPING_PROTOCOLS,
+ OPENSSL_VERSION_NUMBER,
+ OP_COOKIE_EXCHANGE,
+ OP_NO_COMPRESSION,
+ OP_NO_QUERY_MTU,
+ OP_NO_SSLv2,
+ OP_NO_SSLv3,
+ OP_NO_TICKET,
+ OP_SINGLE_DH_USE,
+ RECEIVED_SHUTDOWN,
+ SENT_SHUTDOWN,
SESS_CACHE_BOTH,
+ SESS_CACHE_CLIENT,
SESS_CACHE_NO_AUTO_CLEAR,
+ SESS_CACHE_NO_INTERNAL,
SESS_CACHE_NO_INTERNAL_LOOKUP,
SESS_CACHE_NO_INTERNAL_STORE,
- SESS_CACHE_NO_INTERNAL,
-)
-
-from OpenSSL.SSL import (
- Error,
+ SESS_CACHE_OFF,
+ SESS_CACHE_SERVER,
+ SSLEAY_BUILT_ON,
+ SSLEAY_CFLAGS,
+ SSLEAY_DIR,
+ SSLEAY_PLATFORM,
+ SSLEAY_VERSION,
+ SSL_CB_ACCEPT_EXIT,
+ SSL_CB_ACCEPT_LOOP,
+ SSL_CB_ALERT,
+ SSL_CB_CONNECT_EXIT,
+ SSL_CB_CONNECT_LOOP,
+ SSL_CB_EXIT,
+ SSL_CB_HANDSHAKE_DONE,
+ SSL_CB_HANDSHAKE_START,
+ SSL_CB_LOOP,
+ SSL_CB_READ,
+ SSL_CB_READ_ALERT,
+ SSL_CB_WRITE,
+ SSL_CB_WRITE_ALERT,
+ SSL_ST_ACCEPT,
+ SSL_ST_CONNECT,
+ SSL_ST_MASK,
+ SSLeay_version,
+ SSLv23_METHOD,
+ Session,
SysCallError,
+ TLS1_1_VERSION,
+ TLS1_2_VERSION,
+ TLS1_3_VERSION,
+ TLS_METHOD,
+ TLSv1_1_METHOD,
+ TLSv1_2_METHOD,
+ TLSv1_METHOD,
+ VERIFY_CLIENT_ONCE,
+ VERIFY_FAIL_IF_NO_PEER_CERT,
+ VERIFY_NONE,
+ VERIFY_PEER,
WantReadError,
WantWriteError,
ZeroReturnError,
+ _make_requires,
)
-from OpenSSL.SSL import Context, Session, Connection, SSLeay_version
-from OpenSSL.SSL import _make_requires
-
from OpenSSL._util import ffi as _ffi, lib as _lib
-
-from OpenSSL.SSL import (
- OP_NO_QUERY_MTU,
- OP_COOKIE_EXCHANGE,
- OP_NO_TICKET,
- OP_NO_COMPRESSION,
- MODE_RELEASE_BUFFERS,
- NO_OVERLAPPING_PROTOCOLS,
-)
-
-from OpenSSL.SSL import (
- SSL_ST_CONNECT,
- SSL_ST_ACCEPT,
- SSL_ST_MASK,
- SSL_CB_LOOP,
- SSL_CB_EXIT,
- SSL_CB_READ,
- SSL_CB_WRITE,
- SSL_CB_ALERT,
- SSL_CB_READ_ALERT,
- SSL_CB_WRITE_ALERT,
- SSL_CB_ACCEPT_LOOP,
- SSL_CB_ACCEPT_EXIT,
- SSL_CB_CONNECT_LOOP,
- SSL_CB_CONNECT_EXIT,
- SSL_CB_HANDSHAKE_START,
- SSL_CB_HANDSHAKE_DONE,
+from OpenSSL.crypto import (
+ FILETYPE_PEM,
+ PKey,
+ TYPE_RSA,
+ X509,
+ X509Extension,
+ X509Store,
+ dump_certificate,
+ dump_privatekey,
+ get_elliptic_curves,
+ load_certificate,
+ load_privatekey,
)
try:
@@ -142,15 +143,15 @@ try:
except ImportError:
OP_NO_TLSv1_3 = None
-from .util import WARNING_TYPE_EXPECTED, NON_ASCII, is_consistent_type
from .test_crypto import (
client_cert_pem,
client_key_pem,
- server_cert_pem,
- server_key_pem,
root_cert_pem,
root_key_pem,
+ server_cert_pem,
+ server_key_pem,
)
+from .util import NON_ASCII, WARNING_TYPE_EXPECTED, is_consistent_type
# openssl dhparam 2048 -out dh-2048.pem
@@ -166,9 +167,6 @@ i5s5yYK7a/0eWxxRr2qraYaUj8RwDpH9CwIBAg==
"""
-skip_if_py3 = pytest.mark.skipif(not PY2, reason="Python 2 only")
-
-
def socket_any_family():
try:
return socket(AF_INET)
@@ -197,7 +195,7 @@ def join_bytes_or_unicode(prefix, suffix):
return join(prefix, suffix)
# Otherwise, coerce suffix to the type of prefix.
- if isinstance(prefix, text_type):
+ if isinstance(prefix, str):
return join(prefix, suffix.decode(getfilesystemencoding()))
else:
return join(prefix, suffix.encode(getfilesystemencoding()))
@@ -221,6 +219,8 @@ def socket_pair():
client.setblocking(True)
server = port.accept()[0]
+ port.close()
+
# Let's pass some unencrypted data to make sure our socket connection is
# fine. Just one byte, so we don't have to worry about buffers getting
# filled up or fragmentation.
@@ -366,7 +366,7 @@ def interact_in_memory(client_conn, server_conn):
# Give the side a chance to generate some more bytes, or succeed.
try:
- data = read.recv(2 ** 16)
+ data = read.recv(2**16)
except WantReadError:
# It didn't succeed, so we'll hope it generated some output.
pass
@@ -407,7 +407,7 @@ def handshake_in_memory(client_conn, server_conn):
interact_in_memory(client_conn, server_conn)
-class TestVersion(object):
+class TestVersion:
"""
Tests for version information exposed by `OpenSSL.SSL.SSLeay_version` and
`OpenSSL.SSL.OPENSSL_VERSION_NUMBER`.
@@ -444,17 +444,15 @@ def ca_file(tmpdir):
"""
Create a valid PEM file with CA certificates and return the path.
"""
- key = rsa.generate_private_key(
- public_exponent=65537, key_size=2048, backend=default_backend()
- )
+ key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key = key.public_key()
builder = x509.CertificateBuilder()
builder = builder.subject_name(
- x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, u"pyopenssl.org")])
+ x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "pyopenssl.org")])
)
builder = builder.issuer_name(
- x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, u"pyopenssl.org")])
+ x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "pyopenssl.org")])
)
one_day = datetime.timedelta(1, 0, 0)
builder = builder.not_valid_before(datetime.datetime.today() - one_day)
@@ -466,9 +464,7 @@ def ca_file(tmpdir):
critical=True,
)
- certificate = builder.sign(
- private_key=key, algorithm=hashes.SHA256(), backend=default_backend()
- )
+ certificate = builder.sign(private_key=key, algorithm=hashes.SHA256())
ca_file = tmpdir.join("test.pem")
ca_file.write_binary(
@@ -488,14 +484,14 @@ def context():
return Context(SSLv23_METHOD)
-class TestContext(object):
+class TestContext:
"""
Unit tests for `OpenSSL.SSL.Context`.
"""
@pytest.mark.parametrize(
"cipher_string",
- [b"hello world:AES128-SHA", u"hello world:AES128-SHA"],
+ [b"hello world:AES128-SHA", "hello world:AES128-SHA"],
)
def test_set_cipher_list(self, context, cipher_string):
"""
@@ -525,15 +521,20 @@ class TestContext(object):
"""
with pytest.raises(Error) as excinfo:
context.set_cipher_list(b"imaginary-cipher")
- assert excinfo.value.args == (
- [
- (
- "SSL routines",
- "SSL_CTX_set_cipher_list",
- "no cipher match",
- )
- ],
- )
+ assert excinfo.value.args[0][0] in [
+ # 1.1.x
+ (
+ "SSL routines",
+ "SSL_CTX_set_cipher_list",
+ "no cipher match",
+ ),
+ # 3.0.x
+ (
+ "SSL routines",
+ "",
+ "no cipher match",
+ ),
+ ]
def test_load_client_ca(self, context, ca_file):
"""
@@ -572,20 +573,27 @@ class TestContext(object):
with pytest.raises(Error) as e:
context.set_session_id(b"abc" * 1000)
- assert [
+ assert e.value.args[0][0] in [
+ # 1.1.x
(
"SSL routines",
"SSL_CTX_set_session_id_context",
"ssl session id context too long",
- )
- ] == e.value.args[0]
+ ),
+ # 3.0.x
+ (
+ "SSL routines",
+ "",
+ "ssl session id context too long",
+ ),
+ ]
def test_set_session_id_unicode(self, context):
"""
`Context.set_session_id` raises a warning if a unicode string is
passed.
"""
- pytest.deprecated_call(context.set_session_id, u"abc")
+ pytest.deprecated_call(context.set_session_id, "abc")
def test_method(self):
"""
@@ -597,7 +605,7 @@ class TestContext(object):
for meth in methods:
Context(meth)
- maybe = [SSLv2_METHOD, SSLv3_METHOD, TLSv1_1_METHOD, TLSv1_2_METHOD]
+ maybe = [TLSv1_1_METHOD, TLSv1_2_METHOD]
for meth in maybe:
try:
Context(meth)
@@ -609,7 +617,7 @@ class TestContext(object):
with pytest.raises(TypeError):
Context("")
with pytest.raises(ValueError):
- Context(10)
+ Context(13)
def test_type(self):
"""
@@ -617,17 +625,6 @@ class TestContext(object):
"""
assert is_consistent_type(Context, "Context", TLSv1_METHOD)
- def test_use_privatekey(self):
- """
- `Context.use_privatekey` takes an `OpenSSL.crypto.PKey` instance.
- """
- key = PKey()
- key.generate_key(TYPE_RSA, 1024)
- ctx = Context(SSLv23_METHOD)
- ctx.use_privatekey(key)
- with pytest.raises(TypeError):
- ctx.use_privatekey("")
-
def test_use_privatekey_file_missing(self, tmpfile):
"""
`Context.use_privatekey_file` raises `OpenSSL.SSL.Error` when passed
@@ -681,37 +678,6 @@ class TestContext(object):
FILETYPE_PEM,
)
- def test_use_certificate_wrong_args(self):
- """
- `Context.use_certificate_wrong_args` raises `TypeError` when not passed
- exactly one `OpenSSL.crypto.X509` instance as an argument.
- """
- ctx = Context(SSLv23_METHOD)
- with pytest.raises(TypeError):
- ctx.use_certificate("hello, world")
-
- def test_use_certificate_uninitialized(self):
- """
- `Context.use_certificate` raises `OpenSSL.SSL.Error` when passed a
- `OpenSSL.crypto.X509` instance which has not been initialized
- (ie, which does not actually have any certificate data).
- """
- ctx = Context(SSLv23_METHOD)
- with pytest.raises(Error):
- ctx.use_certificate(X509())
-
- def test_use_certificate(self):
- """
- `Context.use_certificate` sets the certificate which will be
- used to identify connections created using the context.
- """
- # TODO
- # Hard to assert anything. But we could set a privatekey then ask
- # OpenSSL if the cert and key agree using check_privatekey. Then as
- # long as check_privatekey works right we're good...
- ctx = Context(SSLv23_METHOD)
- ctx.use_certificate(load_certificate(FILETYPE_PEM, root_cert_pem))
-
def test_use_certificate_file_wrong_args(self):
"""
`Context.use_certificate_file` raises `TypeError` if the first
@@ -1211,8 +1177,8 @@ class TestContext(object):
def test_fallback_default_verify_paths(self, monkeypatch):
"""
Test that we load certificates successfully on linux from the fallback
- path. To do this we set the _CRYPTOGRAPHY_MANYLINUX1_CA_FILE and
- _CRYPTOGRAPHY_MANYLINUX1_CA_DIR vars to be equal to whatever the
+ path. To do this we set the _CRYPTOGRAPHY_MANYLINUX_CA_FILE and
+ _CRYPTOGRAPHY_MANYLINUX_CA_DIR vars to be equal to whatever the
current OpenSSL default is and we disable
SSL_CTX_SET_default_verify_paths so that it can't find certs unless
it loads via fallback.
@@ -1223,12 +1189,12 @@ class TestContext(object):
)
monkeypatch.setattr(
SSL,
- "_CRYPTOGRAPHY_MANYLINUX1_CA_FILE",
+ "_CRYPTOGRAPHY_MANYLINUX_CA_FILE",
_ffi.string(_lib.X509_get_default_cert_file()),
)
monkeypatch.setattr(
SSL,
- "_CRYPTOGRAPHY_MANYLINUX1_CA_DIR",
+ "_CRYPTOGRAPHY_MANYLINUX_CA_DIR",
_ffi.string(_lib.X509_get_default_cert_dir()),
)
context.set_default_verify_paths()
@@ -1332,20 +1298,20 @@ class TestContext(object):
"""
serverSocket, clientSocket = socket_pair()
- server = Connection(serverContext, serverSocket)
- server.set_accept_state()
+ with serverSocket, clientSocket:
+ server = Connection(serverContext, serverSocket)
+ server.set_accept_state()
- client = Connection(clientContext, clientSocket)
- client.set_connect_state()
+ client = Connection(clientContext, clientSocket)
+ client.set_connect_state()
- # Make them talk to each other.
- # interact_in_memory(client, server)
- for _ in range(3):
- for s in [client, server]:
- try:
- s.do_handshake()
- except WantReadError:
- pass
+ # Make them talk to each other.
+ for _ in range(3):
+ for s in [client, server]:
+ try:
+ s.do_handshake()
+ except WantReadError:
+ select.select([client, server], [], [])
def test_set_verify_callback_connection_argument(self):
"""
@@ -1361,7 +1327,7 @@ class TestContext(object):
)
serverConnection = Connection(serverContext, None)
- class VerifyCallback(object):
+ class VerifyCallback:
def callback(self, connection, *args):
self.connection = connection
return 1
@@ -1705,7 +1671,7 @@ class TestContext(object):
"""
context = Context(SSLv23_METHOD)
for curve in get_elliptic_curves():
- if curve.name.startswith(u"Oakley-"):
+ if curve.name.startswith("Oakley-"):
# Setting Oakley-EC2N-4 and Oakley-EC2N-3 adds
# ('bignum routines', 'BN_mod_inverse', 'no inverse') to the
# error queue on OpenSSL 1.0.2.
@@ -1751,7 +1717,7 @@ class TestContext(object):
"""
context = Context(SSLv23_METHOD)
with pytest.raises(TypeError):
- context.set_tlsext_use_srtp(text_type("SRTP_AES128_CM_SHA1_80"))
+ context.set_tlsext_use_srtp(str("SRTP_AES128_CM_SHA1_80"))
def test_set_tlsext_use_srtp_invalid_profile(self):
"""
@@ -1773,7 +1739,7 @@ class TestContext(object):
assert context.set_tlsext_use_srtp(b"SRTP_AES128_CM_SHA1_80") is None
-class TestServerNameCallback(object):
+class TestServerNameCallback:
"""
Tests for `Context.set_tlsext_servername_callback` and its
interaction with `Connection`.
@@ -1882,7 +1848,7 @@ class TestServerNameCallback(object):
assert args == [(server, b"foo1.example.com")]
-class TestApplicationLayerProtoNegotiation(object):
+class TestApplicationLayerProtoNegotiation:
"""
Tests for ALPN in PyOpenSSL.
"""
@@ -1927,14 +1893,13 @@ class TestApplicationLayerProtoNegotiation(object):
assert server.get_alpn_proto_negotiated() == b"spdy/2"
assert client.get_alpn_proto_negotiated() == b"spdy/2"
- @pytest.mark.xfail(reason='https://github.com/pyca/pyopenssl/issues/1043')
def test_alpn_call_failure(self):
"""
SSL_CTX_set_alpn_protos does not like to be called with an empty
protocols list. Ensure that we produce a user-visible error.
"""
context = Context(SSLv23_METHOD)
- with pytest.raises(Error):
+ with pytest.raises(ValueError):
context.set_alpn_protos([])
def test_alpn_set_on_connection(self):
@@ -2066,7 +2031,7 @@ class TestApplicationLayerProtoNegotiation(object):
def invalid_cb(conn, options):
invalid_cb_args.append((conn, options))
- return u"can't return unicode"
+ return "can't return unicode"
client_context = Context(SSLv23_METHOD)
client_context.set_alpn_protos([b"http/1.1", b"spdy/2"])
@@ -2163,7 +2128,7 @@ class TestApplicationLayerProtoNegotiation(object):
assert select_args == [(server, [b"http/1.1", b"spdy/2"])]
-class TestSession(object):
+class TestSession:
"""
Unit tests for :py:obj:`OpenSSL.SSL.Session`.
"""
@@ -2177,7 +2142,77 @@ class TestSession(object):
assert isinstance(new_session, Session)
-class TestConnection(object):
[email protected](params=["context", "connection"])
+def ctx_or_conn(request) -> Union[Context, Connection]:
+ ctx = Context(SSLv23_METHOD)
+ if request.param == "context":
+ return ctx
+ else:
+ return Connection(ctx, None)
+
+
+class TestContextConnection:
+ """
+ Unit test for methods that are exposed both by Connection and Context
+ objects.
+ """
+
+ def test_use_privatekey(self, ctx_or_conn):
+ """
+ `use_privatekey` takes an `OpenSSL.crypto.PKey` instance.
+ """
+ key = PKey()
+ key.generate_key(TYPE_RSA, 1024)
+
+ ctx_or_conn.use_privatekey(key)
+ with pytest.raises(TypeError):
+ ctx_or_conn.use_privatekey("")
+
+ def test_use_privatekey_wrong_key(self, ctx_or_conn):
+ """
+ `use_privatekey` raises `OpenSSL.SSL.Error` when passed a
+ `OpenSSL.crypto.PKey` instance which has not been initialized.
+ """
+ key = PKey()
+ key.generate_key(TYPE_RSA, 1024)
+ ctx_or_conn.use_certificate(
+ load_certificate(FILETYPE_PEM, root_cert_pem)
+ )
+ with pytest.raises(Error):
+ ctx_or_conn.use_privatekey(key)
+
+ def test_use_certificate(self, ctx_or_conn):
+ """
+ `use_certificate` sets the certificate which will be
+ used to identify connections created using the context.
+ """
+ # TODO
+ # Hard to assert anything. But we could set a privatekey then ask
+ # OpenSSL if the cert and key agree using check_privatekey. Then as
+ # long as check_privatekey works right we're good...
+ ctx_or_conn.use_certificate(
+ load_certificate(FILETYPE_PEM, root_cert_pem)
+ )
+
+ def test_use_certificate_wrong_args(self, ctx_or_conn):
+ """
+ `use_certificate_wrong_args` raises `TypeError` when not passed
+ exactly one `OpenSSL.crypto.X509` instance as an argument.
+ """
+ with pytest.raises(TypeError):
+ ctx_or_conn.use_certificate("hello, world")
+
+ def test_use_certificate_uninitialized(self, ctx_or_conn):
+ """
+ `use_certificate` raises `OpenSSL.SSL.Error` when passed a
+ `OpenSSL.crypto.X509` instance which has not been initialized
+ (ie, which does not actually have any certificate data).
+ """
+ with pytest.raises(Error):
+ ctx_or_conn.use_certificate(X509())
+
+
+class TestConnection:
"""
Unit tests for `OpenSSL.SSL.Connection`.
"""
@@ -2233,7 +2268,7 @@ class TestConnection(object):
connection.bio_write(b"xy")
connection.bio_write(bytearray(b"za"))
with pytest.warns(DeprecationWarning):
- connection.bio_write(u"deprecated")
+ connection.bio_write("deprecated")
def test_get_context(self):
"""
@@ -2286,10 +2321,8 @@ class TestConnection(object):
with pytest.raises(TypeError):
conn.set_tlsext_host_name(b"with\0null")
- if not PY2:
- # On Python 3.x, don't accidentally implicitly convert from text.
- with pytest.raises(TypeError):
- conn.set_tlsext_host_name(b"example.com".decode("ascii"))
+ with pytest.raises(TypeError):
+ conn.set_tlsext_host_name(b"example.com".decode("ascii"))
def test_pending(self):
"""
@@ -2629,6 +2662,52 @@ class TestConnection(object):
server = Connection(ctx, None)
assert None is server.get_verified_chain()
+ def test_set_verify_overrides_context(self):
+ context = Context(SSLv23_METHOD)
+ context.set_verify(VERIFY_PEER)
+ conn = Connection(context, None)
+ conn.set_verify(VERIFY_NONE)
+
+ assert context.get_verify_mode() == VERIFY_PEER
+ assert conn.get_verify_mode() == VERIFY_NONE
+
+ with pytest.raises(TypeError):
+ conn.set_verify(None)
+
+ with pytest.raises(TypeError):
+ conn.set_verify(VERIFY_PEER, "not a callable")
+
+ def test_set_verify_callback_reference(self):
+ """
+ The callback for certificate verification should only be forgotten if
+ the context and all connections created by it do not use it anymore.
+ """
+
+ def callback(conn, cert, errnum, depth, ok): # pragma: no cover
+ return ok
+
+ tracker = ref(callback)
+
+ context = Context(SSLv23_METHOD)
+ context.set_verify(VERIFY_PEER, callback)
+ del callback
+
+ conn = Connection(context, None)
+ context.set_verify(VERIFY_NONE)
+
+ collect()
+ collect()
+ assert tracker()
+
+ conn.set_verify(VERIFY_PEER, lambda conn, cert, errnum, depth, ok: ok)
+ collect()
+ collect()
+ callback = tracker()
+ if callback is not None: # pragma: nocover
+ referrers = get_referrers(callback)
+ if len(referrers) > 1:
+ pytest.fail("Some references remain: %r" % (referrers,))
+
def test_get_session_unconnected(self):
"""
`Connection.get_session` returns `None` when used with an object
@@ -2859,8 +2938,8 @@ class TestConnection(object):
client.get_cipher_name(),
)
- assert isinstance(server_cipher_name, text_type)
- assert isinstance(client_cipher_name, text_type)
+ assert isinstance(server_cipher_name, str)
+ assert isinstance(client_cipher_name, str)
assert server_cipher_name == client_cipher_name
@@ -2884,8 +2963,8 @@ class TestConnection(object):
client.get_cipher_version(),
)
- assert isinstance(server_cipher_version, text_type)
- assert isinstance(client_cipher_version, text_type)
+ assert isinstance(server_cipher_version, str)
+ assert isinstance(client_cipher_version, str)
assert server_cipher_version == client_cipher_version
@@ -2923,8 +3002,8 @@ class TestConnection(object):
client_protocol_version_name = client.get_protocol_version_name()
server_protocol_version_name = server.get_protocol_version_name()
- assert isinstance(server_protocol_version_name, text_type)
- assert isinstance(client_protocol_version_name, text_type)
+ assert isinstance(server_protocol_version_name, str)
+ assert isinstance(client_protocol_version_name, str)
assert server_protocol_version_name == client_protocol_version_name
@@ -2979,7 +3058,7 @@ class TestConnection(object):
assert 2 == len(data)
-class TestConnectionGetCipherList(object):
+class TestConnectionGetCipherList:
"""
Tests for `Connection.get_cipher_list`.
"""
@@ -3002,10 +3081,10 @@ class VeryLarge(bytes):
"""
def __len__(self):
- return 2 ** 31
+ return 2**31
-class TestConnectionSend(object):
+class TestConnectionSend:
"""
Tests for `Connection.send`.
"""
@@ -3067,20 +3146,8 @@ class TestConnectionSend(object):
assert count == 2
assert client.recv(2) == b"xy"
- @skip_if_py3
- def test_short_buffer(self):
- """
- When passed a buffer containing a small number of bytes,
- `Connection.send` transmits all of them and returns the number
- of bytes sent.
- """
- server, client = loopback()
- count = server.send(buffer(b"xy")) # noqa: F821
- assert count == 2
- assert client.recv(2) == b"xy"
-
@pytest.mark.skipif(
- sys.maxsize < 2 ** 31,
+ sys.maxsize < 2**31,
reason="sys.maxsize < 2**31 - test requires 64 bit",
)
def test_buf_too_large(self):
@@ -3103,7 +3170,7 @@ def _make_memoryview(size):
return memoryview(bytearray(size))
-class TestConnectionRecvInto(object):
+class TestConnectionRecvInto:
"""
Tests for `Connection.recv_into`.
"""
@@ -3226,7 +3293,7 @@ class TestConnectionRecvInto(object):
self._doesnt_overfill_test(_make_memoryview)
-class TestConnectionSendall(object):
+class TestConnectionSendall:
"""
Tests for `Connection.sendall`.
"""
@@ -3274,17 +3341,6 @@ class TestConnectionSendall(object):
server.sendall(memoryview(b"x"))
assert client.recv(1) == b"x"
- @skip_if_py3
- def test_short_buffers(self):
- """
- When passed a buffer containing a small number of bytes,
- `Connection.sendall` transmits all of them.
- """
- server, client = loopback()
- count = server.sendall(buffer(b"xy")) # noqa: F821
- assert count == 2
- assert client.recv(2) == b"xy"
-
def test_long(self):
"""
`Connection.sendall` transmits all the bytes in the string passed to it
@@ -3319,7 +3375,7 @@ class TestConnectionSendall(object):
assert err.value.args[0] == EPIPE
-class TestConnectionRenegotiate(object):
+class TestConnectionRenegotiate:
"""
Tests for SSL renegotiation APIs.
"""
@@ -3363,7 +3419,7 @@ class TestConnectionRenegotiate(object):
pass
-class TestError(object):
+class TestError:
"""
Unit tests for `OpenSSL.SSL.Error`.
"""
@@ -3376,7 +3432,7 @@ class TestError(object):
assert Error.__name__ == "Error"
-class TestConstants(object):
+class TestConstants:
"""
Tests for the values of constants exposed in `OpenSSL.SSL`.
@@ -3493,7 +3549,7 @@ class TestConstants(object):
assert 0x300 == SESS_CACHE_NO_INTERNAL
-class TestMemoryBIO(object):
+class TestMemoryBIO:
"""
Tests for `OpenSSL.SSL.Connection` using a memory BIO.
"""
@@ -3666,7 +3722,7 @@ class TestMemoryBIO(object):
interact_in_memory(client, server)
- size = 2 ** 15
+ size = 2**15
sent = client.send(b"x" * size)
# Sanity check. We're trying to test what happens when the entire
# input can't be sent. If the entire input was sent, this test is
@@ -3917,7 +3973,7 @@ class TestMemoryBIO(object):
self._check_client_ca_list(set_replaces_add_ca)
-class TestInfoConstants(object):
+class TestInfoConstants:
"""
Tests for assorted constants exposed for use in info callbacks.
"""
@@ -3960,7 +4016,7 @@ class TestInfoConstants(object):
assert const is None or isinstance(const, int)
-class TestRequires(object):
+class TestRequires:
"""
Tests for the decorator factory used to conditionally raise
NotImplementedError when older OpenSSLs are used.
@@ -3999,7 +4055,7 @@ class TestRequires(object):
assert "Error text" in str(e.value)
-class TestOCSP(object):
+class TestOCSP:
"""
Tests for PyOpenSSL's OCSP stapling support.
"""
@@ -4243,3 +4299,188 @@ class TestOCSP(object):
with pytest.raises(TypeError):
handshake_in_memory(client, server)
+
+
+class TestDTLS:
+ # The way you would expect DTLSv1_listen to work is:
+ #
+ # - it reads packets in a loop
+ # - when it finds a valid ClientHello, it returns
+ # - now the handshake can proceed
+ #
+ # However, on older versions of OpenSSL, it did something "cleverer". The
+ # way it worked is:
+ #
+ # - it "peeks" into the BIO to see the next packet without consuming it
+ # - if *not* a valid ClientHello, then it reads the packet to consume it
+ # and loops around
+ # - if it *is* a valid ClientHello, it *leaves the packet in the BIO*, and
+ # returns
+ # - then the handshake finds the ClientHello in the BIO and reads it a
+ # second time.
+ #
+ # I'm not sure exactly when this switched over. The OpenSSL v1.1.1 in
+ # Ubuntu 18.04 has the old behavior. The OpenSSL v1.1.1 in Ubuntu 20.04 has
+ # the new behavior. There doesn't seem to be any mention of this change in
+ # the OpenSSL v1.1.1 changelog, but presumably it changed in some point
+ # release or another. Presumably in 2025 or so there will be only new
+ # OpenSSLs around we can delete this whole comment and the weird
+ # workaround. If anyone is still using this library by then, which seems
+ # both depressing and inevitable.
+ #
+ # Anyway, why do we care? The reason is that the old strategy has a
+ # problem: the "peek" operation is only defined on "DGRAM BIOs", which are
+ # a special type of object that is different from the more familiar "socket
+ # BIOs" and "memory BIOs". If you *don't* have a DGRAM BIO, and you try to
+ # peek into the BIO... then it silently degrades to a full-fledged "read"
+ # operation that consumes the packet. Which is a problem if your algorithm
+ # depends on leaving the packet in the BIO to be read again later.
+ #
+ # So on old OpenSSL, we have a problem:
+ #
+ # - we can't use a DGRAM BIO, because cryptography/pyopenssl don't wrap the
+ # relevant APIs, nor should they.
+ #
+ # - if we use a socket BIO, then the first time DTLSv1_listen sees an
+ # invalid packet (like for example... the challenge packet that *every
+ # DTLS handshake starts with before the real ClientHello!*), it tries to
+ # first "peek" it, and then "read" it. But since the first "peek"
+ # consumes the packet, the second "read" ends up hanging or consuming
+ # some unrelated packet, which is undesirable. So you can't even get to
+ # the handshake stage successfully.
+ #
+ # - if we use a memory BIO, then DTLSv1_listen works OK on invalid packets
+ # -- first the "peek" consumes them, and then it tries to "read" again to
+ # consume them, which fails immediately, and OpenSSL ignores the failure.
+ # So it works by accident. BUT, when we get a valid ClientHello, we have
+ # a problem: DTLSv1_listen tries to "peek" it and then leave it in the
+ # read BIO for do_handshake to consume. But instead "peek" consumes the
+ # packet, so it's not there where do_handshake is expecting it, and the
+ # handshake fails.
+ #
+ # Fortunately (if that's the word), we can work around the memory BIO
+ # problem. (Which is good, because in real life probably all our users will
+ # be using memory BIOs.) All we have to do is to save the valid ClientHello
+ # before calling DTLSv1_listen, and then after it returns we push *a second
+ # copy of it* of the packet memory BIO before calling do_handshake. This
+ # fakes out OpenSSL and makes it think the "peek" operation worked
+ # correctly, and we can go on with our lives.
+ #
+ # In fact, we push the second copy of the ClientHello unconditionally. On
+ # new versions of OpenSSL, this is unnecessary, but harmless, because the
+ # DTLS state machine treats it like a network hiccup that duplicated a
+ # packet, which DTLS is robust against.
+ def test_it_works_at_all(self):
+ # arbitrary number larger than any conceivable handshake volley
+ LARGE_BUFFER = 65536
+
+ s_ctx = Context(DTLS_METHOD)
+
+ def generate_cookie(ssl):
+ return b"xyzzy"
+
+ def verify_cookie(ssl, cookie):
+ return cookie == b"xyzzy"
+
+ s_ctx.set_cookie_generate_callback(generate_cookie)
+ s_ctx.set_cookie_verify_callback(verify_cookie)
+ s_ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
+ s_ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
+ s_ctx.set_options(OP_NO_QUERY_MTU)
+ s = Connection(s_ctx)
+ s.set_accept_state()
+
+ c_ctx = Context(DTLS_METHOD)
+ c_ctx.set_options(OP_NO_QUERY_MTU)
+ c = Connection(c_ctx)
+ c.set_connect_state()
+
+ # These are mandatory, because openssl can't guess the MTU for a memory
+ # bio and will produce a mysterious error if you make it try.
+ c.set_ciphertext_mtu(1500)
+ s.set_ciphertext_mtu(1500)
+
+ latest_client_hello = None
+
+ def pump_membio(label, source, sink):
+ try:
+ chunk = source.bio_read(LARGE_BUFFER)
+ except WantReadError:
+ return False
+ # I'm not sure this check is needed, but I'm not sure it's *not*
+ # needed either:
+ if not chunk: # pragma: no cover
+ return False
+ # Gross hack: if this is a ClientHello, save it so we can find it
+ # later. See giant comment above.
+ try:
+ # if ContentType == handshake and HandshakeType ==
+ # client_hello:
+ if chunk[0] == 22 and chunk[13] == 1:
+ nonlocal latest_client_hello
+ latest_client_hello = chunk
+ except IndexError: # pragma: no cover
+ pass
+ print(f"{label}: {chunk.hex()}")
+ sink.bio_write(chunk)
+ return True
+
+ def pump():
+ # Raises if there was no data to pump, to avoid infinite loops if
+ # we aren't making progress.
+ assert pump_membio("s -> c", s, c) or pump_membio("c -> s", c, s)
+
+ c_handshaking = True
+ s_listening = True
+ s_handshaking = False
+ first = True
+ while c_handshaking or s_listening or s_handshaking:
+ if not first:
+ pump()
+ first = False
+
+ if c_handshaking:
+ try:
+ c.do_handshake()
+ except WantReadError:
+ pass
+ else:
+ c_handshaking = False
+
+ if s_listening:
+ try:
+ s.DTLSv1_listen()
+ except WantReadError:
+ pass
+ else:
+ s_listening = False
+ s_handshaking = True
+ # Write the duplicate ClientHello. See giant comment above.
+ s.bio_write(latest_client_hello)
+
+ if s_handshaking:
+ try:
+ s.do_handshake()
+ except WantReadError:
+ pass
+ else:
+ s_handshaking = False
+
+ s.write(b"hello")
+ pump()
+ assert c.read(100) == b"hello"
+ c.write(b"goodbye")
+ pump()
+ assert s.read(100) == b"goodbye"
+
+ # Check that the MTU set/query functions are doing *something*
+ c.set_ciphertext_mtu(1000)
+ try:
+ assert 500 < c.get_cleartext_mtu() < 1000
+ except NotImplementedError: # OpenSSL 1.1.0 and earlier
+ pass
+ c.set_ciphertext_mtu(500)
+ try:
+ assert 0 < c.get_cleartext_mtu() < 500
+ except NotImplementedError: # OpenSSL 1.1.0 and earlier
+ pass
diff --git a/contrib/python/pyOpenSSL/py3/tests/test_util.py b/contrib/python/pyOpenSSL/py3/tests/test_util.py
index 6224448d433..e029d71a719 100644
--- a/contrib/python/pyOpenSSL/py3/tests/test_util.py
+++ b/contrib/python/pyOpenSSL/py3/tests/test_util.py
@@ -3,7 +3,7 @@ import pytest
from OpenSSL._util import exception_from_error_queue, lib
-class TestErrors(object):
+class TestErrors:
"""
Tests for handling of certain OpenSSL error cases.
"""
diff --git a/contrib/python/pyOpenSSL/py3/tests/util.py b/contrib/python/pyOpenSSL/py3/tests/util.py
index 75d2c8de80e..b4649e33992 100644
--- a/contrib/python/pyOpenSSL/py3/tests/util.py
+++ b/contrib/python/pyOpenSSL/py3/tests/util.py
@@ -6,8 +6,6 @@ Helpers for the OpenSSL test suite, largely copied from
U{Twisted<http://twistedmatrix.com/>}.
"""
-from six import PY2
-
# This is the UTF-8 encoding of the SNOWMAN unicode code point.
NON_ASCII = b"\xe2\x98\x83".decode("utf-8")
@@ -32,7 +30,7 @@ def is_consistent_type(theType, name, *constructionArgs):
return True
-class EqualityTestsMixin(object):
+class EqualityTestsMixin:
"""
A mixin defining tests for the standard implementation of C{==} and C{!=}.
"""
@@ -128,7 +126,7 @@ class EqualityTestsMixin(object):
operand if it is of an unrelated type.
"""
- class Delegate(object):
+ class Delegate:
def __eq__(self, other):
# Do something crazy and obvious.
return [self]
@@ -143,7 +141,7 @@ class EqualityTestsMixin(object):
operand if it is of an unrelated type.
"""
- class Delegate(object):
+ class Delegate:
def __ne__(self, other):
# Do something crazy and obvious.
return [self]
@@ -154,7 +152,4 @@ class EqualityTestsMixin(object):
# The type name expected in warnings about using the wrong string type.
-if PY2:
- WARNING_TYPE_EXPECTED = "unicode"
-else:
- WARNING_TYPE_EXPECTED = "str"
+WARNING_TYPE_EXPECTED = "str"