summaryrefslogtreecommitdiffstats
path: root/contrib/python/pyOpenSSL
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/python/pyOpenSSL')
-rw-r--r--contrib/python/pyOpenSSL/py2/ya.make2
-rw-r--r--contrib/python/pyOpenSSL/py3/.dist-info/METADATA88
-rw-r--r--contrib/python/pyOpenSSL/py3/OpenSSL/SSL.py390
-rw-r--r--contrib/python/pyOpenSSL/py3/OpenSSL/__init__.py2
-rw-r--r--contrib/python/pyOpenSSL/py3/OpenSSL/_util.py75
-rw-r--r--contrib/python/pyOpenSSL/py3/OpenSSL/crypto.py777
-rw-r--r--contrib/python/pyOpenSSL/py3/OpenSSL/debug.py6
-rw-r--r--contrib/python/pyOpenSSL/py3/OpenSSL/rand.py4
-rw-r--r--contrib/python/pyOpenSSL/py3/OpenSSL/version.py4
-rw-r--r--contrib/python/pyOpenSSL/py3/README.rst2
-rw-r--r--contrib/python/pyOpenSSL/py3/patches/01-fix-tests.patch8
-rw-r--r--contrib/python/pyOpenSSL/py3/tests/memdbg.py1
-rw-r--r--contrib/python/pyOpenSSL/py3/tests/test_crypto.py363
-rw-r--r--contrib/python/pyOpenSSL/py3/tests/test_debug.py2
-rw-r--r--contrib/python/pyOpenSSL/py3/tests/test_rand.py2
-rw-r--r--contrib/python/pyOpenSSL/py3/tests/test_ssl.py709
-rw-r--r--contrib/python/pyOpenSSL/py3/tests/test_util.py2
-rw-r--r--contrib/python/pyOpenSSL/py3/tests/util.py13
-rw-r--r--contrib/python/pyOpenSSL/py3/ya.make5
19 files changed, 1536 insertions, 919 deletions
diff --git a/contrib/python/pyOpenSSL/py2/ya.make b/contrib/python/pyOpenSSL/py2/ya.make
index 6c1e686c4f8..61ed4e98800 100644
--- a/contrib/python/pyOpenSSL/py2/ya.make
+++ b/contrib/python/pyOpenSSL/py2/ya.make
@@ -7,7 +7,7 @@ VERSION(21.0.0)
LICENSE(Apache-2.0)
PEERDIR(
- contrib/python/cryptography
+ contrib/python/cryptography/py2
contrib/python/six
)
diff --git a/contrib/python/pyOpenSSL/py3/.dist-info/METADATA b/contrib/python/pyOpenSSL/py3/.dist-info/METADATA
index 43ea7f5813d..e4139673e38 100644
--- a/contrib/python/pyOpenSSL/py3/.dist-info/METADATA
+++ b/contrib/python/pyOpenSSL/py3/.dist-info/METADATA
@@ -1,35 +1,35 @@
Metadata-Version: 2.1
Name: pyOpenSSL
-Version: 21.0.0
+Version: 23.0.0
Summary: Python wrapper module around the OpenSSL library
Home-page: https://pyopenssl.org/
Author: The pyOpenSSL developers
Author-email: [email protected]
License: Apache License, Version 2.0
-Platform: UNKNOWN
+Project-URL: Source, https://github.com/pyca/pyopenssl
Classifier: Development Status :: 6 - Mature
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Operating System :: MacOS :: MacOS X
Classifier: Operating System :: Microsoft :: Windows
Classifier: Operating System :: POSIX
-Classifier: Programming Language :: Python :: 2
-Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
+Classifier: Programming Language :: Python :: 3.10
+Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Classifier: Topic :: Security :: Cryptography
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Networking
-Requires-Python: >=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*
-Requires-Dist: cryptography (>=3.3)
-Requires-Dist: six (>=1.5.2)
+Requires-Python: >=3.6
+License-File: LICENSE
+Requires-Dist: cryptography (<40,>=38.0.0)
Provides-Extra: docs
-Requires-Dist: sphinx ; extra == 'docs'
+Requires-Dist: sphinx (!=5.2.0,!=5.2.0.post0) ; extra == 'docs'
Requires-Dist: sphinx-rtd-theme ; extra == 'docs'
Provides-Extra: test
Requires-Dist: flaky ; extra == 'test'
@@ -74,7 +74,7 @@ If you run into bugs, you can file them in our `issue tracker`_.
We maintain a cryptography-dev_ mailing list for both user and development discussions.
-You can also join ``#cryptography-dev`` on Freenode to ask questions or get involved.
+You can also join ``#pyca`` on ``irc.libera.chat`` to ask questions or get involved.
.. _documentation: https://pyopenssl.org/
@@ -87,7 +87,73 @@ You can also join ``#cryptography-dev`` on Freenode to ask questions or get invo
Release Information
===================
-21.0.0 (2020-09-28)
+23.0.0 (2023-01-01)
+-------------------
+
+Backward-incompatible changes:
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Deprecations:
+^^^^^^^^^^^^^
+
+Changes:
+^^^^^^^^
+
+- Add ``OpenSSL.SSL.X509StoreFlags.PARTIAL_CHAIN`` constant to allow for users
+ to perform certificate verification on partial certificate chains.
+ `#1166 <https://github.com/pyca/pyopenssl/pull/1166>`_
+- ``cryptography`` maximum version has been increased to 39.0.x.
+
+22.1.0 (2022-09-25)
+-------------------
+
+Backward-incompatible changes:
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+- Remove support for SSLv2 and SSLv3.
+- The minimum ``cryptography`` version is now 38.0.x (and we now pin releases
+ against ``cryptography`` major versions to prevent future breakage)
+- The ``OpenSSL.crypto.X509StoreContextError`` exception has been refactored,
+ changing its internal attributes.
+ `#1133 <https://github.com/pyca/pyopenssl/pull/1133>`_
+
+Deprecations:
+^^^^^^^^^^^^^
+
+- ``OpenSSL.SSL.SSLeay_version`` is deprecated in favor of
+ ``OpenSSL.SSL.OpenSSL_version``. The constants ``OpenSSL.SSL.SSLEAY_*`` are
+ deprecated in favor of ``OpenSSL.SSL.OPENSSL_*``.
+
+Changes:
+^^^^^^^^
+
+- Add ``OpenSSL.SSL.Connection.set_verify`` and ``OpenSSL.SSL.Connection.get_verify_mode``
+ to override the context object's verification flags.
+ `#1073 <https://github.com/pyca/pyopenssl/pull/1073>`_
+- Add ``OpenSSL.SSL.Connection.use_certificate`` and ``OpenSSL.SSL.Connection.use_privatekey``
+ to set a certificate per connection (and not just per context) `#1121 <https://github.com/pyca/pyopenssl/pull/1121>`_.
+
+22.0.0 (2022-01-29)
+-------------------
+
+Backward-incompatible changes:
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+- Drop support for Python 2.7.
+ `#1047 <https://github.com/pyca/pyopenssl/pull/1047>`_
+- The minimum ``cryptography`` version is now 35.0.
+
+Deprecations:
+^^^^^^^^^^^^^
+
+Changes:
+^^^^^^^^
+
+- Expose wrappers for some `DTLS
+ <https://en.wikipedia.org/wiki/Datagram_Transport_Layer_Security>`_
+ primitives. `#1026 <https://github.com/pyca/pyopenssl/pull/1026>`_
+
+21.0.0 (2021-09-28)
-------------------
Backward-incompatible changes:
@@ -194,5 +260,3 @@ Changes:
`Full changelog <https://pyopenssl.org/en/stable/changelog.html>`_.
-
-
diff --git a/contrib/python/pyOpenSSL/py3/OpenSSL/SSL.py b/contrib/python/pyOpenSSL/py3/OpenSSL/SSL.py
index e71b044cc02..dfbd1094e9d 100644
--- a/contrib/python/pyOpenSSL/py3/OpenSSL/SSL.py
+++ b/contrib/python/pyOpenSSL/py3/OpenSSL/SSL.py
@@ -1,12 +1,10 @@
import os
import socket
+from errno import errorcode
+from functools import partial, wraps
+from itertools import chain, count
from sys import platform
-from functools import wraps, partial
-from itertools import count, chain
from weakref import WeakValueDictionary
-from errno import errorcode
-
-from six import integer_types, int2byte, indexbytes
from OpenSSL._util import (
UNSPECIFIED as _UNSPECIFIED,
@@ -14,19 +12,17 @@ from OpenSSL._util import (
ffi as _ffi,
lib as _lib,
make_assert as _make_assert,
- native as _native,
- path_string as _path_string,
- text_to_bytes_and_warn as _text_to_bytes_and_warn,
no_zero_allocator as _no_zero_allocator,
+ path_bytes as _path_bytes,
+ text_to_bytes_and_warn as _text_to_bytes_and_warn,
)
-
from OpenSSL.crypto import (
FILETYPE_PEM,
- _PassphraseHelper,
PKey,
- X509Name,
X509,
+ X509Name,
X509Store,
+ _PassphraseHelper,
)
__all__ = [
@@ -36,10 +32,13 @@ __all__ = [
"SSLEAY_PLATFORM",
"SSLEAY_DIR",
"SSLEAY_BUILT_ON",
+ "OPENSSL_VERSION",
+ "OPENSSL_CFLAGS",
+ "OPENSSL_PLATFORM",
+ "OPENSSL_DIR",
+ "OPENSSL_BUILT_ON",
"SENT_SHUTDOWN",
"RECEIVED_SHUTDOWN",
- "SSLv2_METHOD",
- "SSLv3_METHOD",
"SSLv23_METHOD",
"TLSv1_METHOD",
"TLSv1_1_METHOD",
@@ -47,6 +46,9 @@ __all__ = [
"TLS_METHOD",
"TLS_SERVER_METHOD",
"TLS_CLIENT_METHOD",
+ "DTLS_METHOD",
+ "DTLS_SERVER_METHOD",
+ "DTLS_CLIENT_METHOD",
"SSL3_VERSION",
"TLS1_VERSION",
"TLS1_1_VERSION",
@@ -57,7 +59,6 @@ __all__ = [
"OP_NO_TLSv1",
"OP_NO_TLSv1_1",
"OP_NO_TLSv1_2",
- "OP_NO_TLSv1_3",
"MODE_RELEASE_BUFFERS",
"OP_SINGLE_DH_USE",
"OP_SINGLE_ECDH_USE",
@@ -124,26 +125,17 @@ __all__ = [
"Connection",
]
-try:
- _buffer = buffer
-except NameError:
-
- class _buffer(object):
- pass
-
OPENSSL_VERSION_NUMBER = _lib.OPENSSL_VERSION_NUMBER
-SSLEAY_VERSION = _lib.SSLEAY_VERSION
-SSLEAY_CFLAGS = _lib.SSLEAY_CFLAGS
-SSLEAY_PLATFORM = _lib.SSLEAY_PLATFORM
-SSLEAY_DIR = _lib.SSLEAY_DIR
-SSLEAY_BUILT_ON = _lib.SSLEAY_BUILT_ON
+OPENSSL_VERSION = SSLEAY_VERSION = _lib.OPENSSL_VERSION
+OPENSSL_CFLAGS = SSLEAY_CFLAGS = _lib.OPENSSL_CFLAGS
+OPENSSL_PLATFORM = SSLEAY_PLATFORM = _lib.OPENSSL_PLATFORM
+OPENSSL_DIR = SSLEAY_DIR = _lib.OPENSSL_DIR
+OPENSSL_BUILT_ON = SSLEAY_BUILT_ON = _lib.OPENSSL_BUILT_ON
SENT_SHUTDOWN = _lib.SSL_SENT_SHUTDOWN
RECEIVED_SHUTDOWN = _lib.SSL_RECEIVED_SHUTDOWN
-SSLv2_METHOD = 1
-SSLv3_METHOD = 2
SSLv23_METHOD = 3
TLSv1_METHOD = 4
TLSv1_1_METHOD = 5
@@ -151,6 +143,9 @@ TLSv1_2_METHOD = 6
TLS_METHOD = 7
TLS_SERVER_METHOD = 8
TLS_CLIENT_METHOD = 9
+DTLS_METHOD = 10
+DTLS_SERVER_METHOD = 11
+DTLS_CLIENT_METHOD = 12
try:
SSL3_VERSION = _lib.SSL3_VERSION
@@ -174,6 +169,7 @@ OP_NO_TLSv1_1 = _lib.SSL_OP_NO_TLSv1_1
OP_NO_TLSv1_2 = _lib.SSL_OP_NO_TLSv1_2
try:
OP_NO_TLSv1_3 = _lib.SSL_OP_NO_TLSv1_3
+ __all__.append("OP_NO_TLSv1_3")
except AttributeError:
pass
@@ -208,6 +204,18 @@ OP_NO_QUERY_MTU = _lib.SSL_OP_NO_QUERY_MTU
OP_COOKIE_EXCHANGE = _lib.SSL_OP_COOKIE_EXCHANGE
OP_NO_TICKET = _lib.SSL_OP_NO_TICKET
+try:
+ OP_NO_RENEGOTIATION = _lib.SSL_OP_NO_RENEGOTIATION
+ __all__.append("OP_NO_RENEGOTIATION")
+except AttributeError:
+ pass
+
+try:
+ OP_IGNORE_UNEXPECTED_EOF = _lib.SSL_OP_IGNORE_UNEXPECTED_EOF
+ __all__.append("OP_IGNORE_UNEXPECTED_EOF")
+except AttributeError:
+ pass
+
OP_ALL = _lib.SSL_OP_ALL
VERIFY_PEER = _lib.SSL_VERIFY_PEER
@@ -257,8 +265,8 @@ _CERTIFICATE_PATH_LOCATIONS = [
# These values are compared to output from cffi's ffi.string so they must be
# byte strings.
-_CRYPTOGRAPHY_MANYLINUX1_CA_DIR = b"/opt/pyca/cryptography/openssl/certs"
-_CRYPTOGRAPHY_MANYLINUX1_CA_FILE = b"/opt/pyca/cryptography/openssl/cert.pem"
+_CRYPTOGRAPHY_MANYLINUX_CA_DIR = b"/opt/pyca/cryptography/openssl/certs"
+_CRYPTOGRAPHY_MANYLINUX_CA_FILE = b"/opt/pyca/cryptography/openssl/cert.pem"
class Error(Exception):
@@ -291,7 +299,7 @@ class SysCallError(Error):
pass
-class _CallbackExceptionHelper(object):
+class _CallbackExceptionHelper:
"""
A base class for wrapper classes that allow for intelligent exception
handling in OpenSSL callbacks.
@@ -381,7 +389,7 @@ class _ALPNSelectHelper(_CallbackExceptionHelper):
instr = _ffi.buffer(in_, inlen)[:]
protolist = []
while instr:
- encoded_len = indexbytes(instr, 0)
+ encoded_len = instr[0]
proto = instr[1 : encoded_len + 1]
protolist.append(proto)
instr = instr[encoded_len + 1 :]
@@ -549,17 +557,59 @@ class _OCSPClientCallbackHelper(_CallbackExceptionHelper):
self.callback = _ffi.callback("int (*)(SSL *, void *)", wrapper)
+class _CookieGenerateCallbackHelper(_CallbackExceptionHelper):
+ def __init__(self, callback):
+ _CallbackExceptionHelper.__init__(self)
+
+ @wraps(callback)
+ def wrapper(ssl, out, outlen):
+ try:
+ conn = Connection._reverse_mapping[ssl]
+ cookie = callback(conn)
+ out[0 : len(cookie)] = cookie
+ outlen[0] = len(cookie)
+ return 1
+ except Exception as e:
+ self._problems.append(e)
+ # "a zero return value can be used to abort the handshake"
+ return 0
+
+ self.callback = _ffi.callback(
+ "int (*)(SSL *, unsigned char *, unsigned int *)",
+ wrapper,
+ )
+
+
+class _CookieVerifyCallbackHelper(_CallbackExceptionHelper):
+ def __init__(self, callback):
+ _CallbackExceptionHelper.__init__(self)
+
+ @wraps(callback)
+ def wrapper(ssl, c_cookie, cookie_len):
+ try:
+ conn = Connection._reverse_mapping[ssl]
+ return callback(conn, bytes(c_cookie[0:cookie_len]))
+ except Exception as e:
+ self._problems.append(e)
+ return 0
+
+ self.callback = _ffi.callback(
+ "int (*)(SSL *, unsigned char *, unsigned int)",
+ wrapper,
+ )
+
+
def _asFileDescriptor(obj):
fd = None
- if not isinstance(obj, integer_types):
+ if not isinstance(obj, int):
meth = getattr(obj, "fileno", None)
if meth is not None:
obj = meth()
- if isinstance(obj, integer_types):
+ if isinstance(obj, int):
fd = obj
- if not isinstance(fd, integer_types):
+ if not isinstance(fd, int):
raise TypeError("argument must be an int, or have a fileno() method.")
elif fd < 0:
raise ValueError(
@@ -569,13 +619,16 @@ def _asFileDescriptor(obj):
return fd
-def SSLeay_version(type):
+def OpenSSL_version(type):
"""
Return a string describing the version of OpenSSL in use.
- :param type: One of the :const:`SSLEAY_` constants defined in this module.
+ :param type: One of the :const:`OPENSSL_` constants defined in this module.
"""
- return _ffi.string(_lib.SSLeay_version(type))
+ return _ffi.string(_lib.OpenSSL_version(type))
+
+
+SSLeay_version = OpenSSL_version
def _make_requires(flag, error):
@@ -613,7 +666,7 @@ _requires_keylog = _make_requires(
)
-class Session(object):
+class Session:
"""
A class representing an SSL session. A session defines certain connection
parameters which may be re-used to speed up the setup of subsequent
@@ -625,39 +678,36 @@ class Session(object):
pass
-class Context(object):
+class Context:
"""
:class:`OpenSSL.SSL.Context` instances define the parameters for setting
up new SSL connections.
- :param method: One of TLS_METHOD, TLS_CLIENT_METHOD, or TLS_SERVER_METHOD.
+ :param method: One of TLS_METHOD, TLS_CLIENT_METHOD, TLS_SERVER_METHOD,
+ DTLS_METHOD, DTLS_CLIENT_METHOD, or DTLS_SERVER_METHOD.
SSLv23_METHOD, TLSv1_METHOD, etc. are deprecated and should
not be used.
"""
_methods = {
- SSLv2_METHOD: "SSLv2_method",
- SSLv3_METHOD: "SSLv3_method",
- SSLv23_METHOD: "SSLv23_method",
- TLSv1_METHOD: "TLSv1_method",
- TLSv1_1_METHOD: "TLSv1_1_method",
- TLSv1_2_METHOD: "TLSv1_2_method",
- TLS_METHOD: "TLS_method",
- TLS_SERVER_METHOD: "TLS_server_method",
- TLS_CLIENT_METHOD: "TLS_client_method",
+ SSLv23_METHOD: (_lib.TLS_method, None),
+ TLSv1_METHOD: (_lib.TLS_method, TLS1_VERSION),
+ TLSv1_1_METHOD: (_lib.TLS_method, TLS1_1_VERSION),
+ TLSv1_2_METHOD: (_lib.TLS_method, TLS1_2_VERSION),
+ TLS_METHOD: (_lib.TLS_method, None),
+ TLS_SERVER_METHOD: (_lib.TLS_server_method, None),
+ TLS_CLIENT_METHOD: (_lib.TLS_client_method, None),
+ DTLS_METHOD: (_lib.DTLS_method, None),
+ DTLS_SERVER_METHOD: (_lib.DTLS_server_method, None),
+ DTLS_CLIENT_METHOD: (_lib.DTLS_client_method, None),
}
- _methods = dict(
- (identifier, getattr(_lib, name))
- for (identifier, name) in _methods.items()
- if getattr(_lib, name, None) is not None
- )
def __init__(self, method):
- if not isinstance(method, integer_types):
+ if not isinstance(method, int):
raise TypeError("method must be an integer")
try:
- method_func = self._methods[method]
+ method_func, version = self._methods[method]
except KeyError:
raise ValueError("No such protocol")
@@ -668,12 +718,6 @@ class Context(object):
_openssl_assert(context != _ffi.NULL)
context = _ffi.gc(context, _lib.SSL_CTX_free)
- # Set SSL_CTX_set_ecdh_auto so that the ECDH curve will be
- # auto-selected. This function was added in 1.0.2 and made a noop in
- # 1.1.0+ (where it is set automatically).
- res = _lib.SSL_CTX_set_ecdh_auto(context, 1)
- _openssl_assert(res == 1)
-
self._context = context
self._passphrase_helper = None
self._passphrase_callback = None
@@ -689,8 +733,13 @@ class Context(object):
self._ocsp_helper = None
self._ocsp_callback = None
self._ocsp_data = None
+ self._cookie_generate_helper = None
+ self._cookie_verify_helper = None
self.set_mode(_lib.SSL_MODE_ENABLE_PARTIAL_WRITE)
+ if version is not None:
+ self.set_min_proto_version(version)
+ self.set_max_proto_version(version)
def set_min_proto_version(self, version):
"""
@@ -737,12 +786,12 @@ class Context(object):
if cafile is None:
cafile = _ffi.NULL
else:
- cafile = _path_string(cafile)
+ cafile = _path_bytes(cafile)
if capath is None:
capath = _ffi.NULL
else:
- capath = _path_string(capath)
+ capath = _path_bytes(capath)
load_result = _lib.SSL_CTX_load_verify_locations(
self._context, cafile, capath
@@ -828,8 +877,8 @@ class Context(object):
# to the exact values we use in our manylinux1 builds. If they are
# then we know to load the fallbacks
if (
- default_dir == _CRYPTOGRAPHY_MANYLINUX1_CA_DIR
- and default_file == _CRYPTOGRAPHY_MANYLINUX1_CA_FILE
+ default_dir == _CRYPTOGRAPHY_MANYLINUX_CA_DIR
+ and default_file == _CRYPTOGRAPHY_MANYLINUX_CA_FILE
):
# This is manylinux1, let's load our fallback paths
self._fallback_default_verify_paths(
@@ -876,7 +925,7 @@ class Context(object):
:return: None
"""
- certfile = _path_string(certfile)
+ certfile = _path_bytes(certfile)
result = _lib.SSL_CTX_use_certificate_chain_file(
self._context, certfile
@@ -896,8 +945,8 @@ class Context(object):
:return: None
"""
- certfile = _path_string(certfile)
- if not isinstance(filetype, integer_types):
+ certfile = _path_bytes(certfile)
+ if not isinstance(filetype, int):
raise TypeError("filetype must be an integer")
use_result = _lib.SSL_CTX_use_certificate_file(
@@ -913,6 +962,7 @@ class Context(object):
:param cert: The X509 object
:return: None
"""
+ # Mirrored at Connection.use_certificate
if not isinstance(cert, X509):
raise TypeError("cert must be an X509 instance")
@@ -954,11 +1004,11 @@ class Context(object):
:return: None
"""
- keyfile = _path_string(keyfile)
+ keyfile = _path_bytes(keyfile)
if filetype is _UNSPECIFIED:
filetype = FILETYPE_PEM
- elif not isinstance(filetype, integer_types):
+ elif not isinstance(filetype, int):
raise TypeError("filetype must be an integer")
use_result = _lib.SSL_CTX_use_PrivateKey_file(
@@ -974,6 +1024,7 @@ class Context(object):
:param pkey: The PKey object
:return: None
"""
+ # Mirrored at Connection.use_privatekey
if not isinstance(pkey, PKey):
raise TypeError("pkey must be a PKey instance")
@@ -1035,7 +1086,7 @@ class Context(object):
.. versionadded:: 0.14
"""
- if not isinstance(mode, integer_types):
+ if not isinstance(mode, int):
raise TypeError("mode must be an integer")
return _lib.SSL_CTX_set_session_cache_mode(self._context, mode)
@@ -1070,7 +1121,7 @@ class Context(object):
See SSL_CTX_set_verify(3SSL) for further details.
"""
- if not isinstance(mode, integer_types):
+ if not isinstance(mode, int):
raise TypeError("mode must be an integer")
if callback is None:
@@ -1093,7 +1144,7 @@ class Context(object):
:param depth: An integer specifying the verify depth
:return: None
"""
- if not isinstance(depth, integer_types):
+ if not isinstance(depth, int):
raise TypeError("depth must be an integer")
_lib.SSL_CTX_set_verify_depth(self._context, depth)
@@ -1125,7 +1176,7 @@ class Context(object):
:return: None
"""
- dhfile = _path_string(dhfile)
+ dhfile = _path_bytes(dhfile)
bio = _lib.BIO_new_file(dhfile, b"r")
if bio == _ffi.NULL:
@@ -1253,7 +1304,7 @@ class Context(object):
:param timeout: The timeout in (whole) seconds
:return: The previous session timeout
"""
- if not isinstance(timeout, integer_types):
+ if not isinstance(timeout, int):
raise TypeError("timeout must be an integer")
return _lib.SSL_CTX_set_timeout(self._context, timeout)
@@ -1356,7 +1407,7 @@ class Context(object):
:param options: The options to add.
:return: The new option bitmask.
"""
- if not isinstance(options, integer_types):
+ if not isinstance(options, int):
raise TypeError("options must be an integer")
return _lib.SSL_CTX_set_options(self._context, options)
@@ -1369,7 +1420,7 @@ class Context(object):
:param mode: The mode to add.
:return: The new mode bitmask.
"""
- if not isinstance(mode, integer_types):
+ if not isinstance(mode, int):
raise TypeError("mode must be an integer")
return _lib.SSL_CTX_set_mode(self._context, mode)
@@ -1423,10 +1474,16 @@ class Context(object):
This list should be a Python list of bytestrings representing the
protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``.
"""
+ # Different versions of OpenSSL are inconsistent about how they handle
+ # empty proto lists (see #1043), so we avoid the problem entirely by
+ # rejecting them ourselves.
+ if not protos:
+ raise ValueError("at least one protocol must be specified")
+
# Take the list of protocols and join them together, prefixing them
# with their lengths.
protostr = b"".join(
- chain.from_iterable((int2byte(len(p)), p) for p in protos)
+ chain.from_iterable((bytes((len(p),)), p) for p in protos)
)
# Build a C string from the list. We don't need to save this off
@@ -1523,8 +1580,22 @@ class Context(object):
helper = _OCSPClientCallbackHelper(callback)
self._set_ocsp_callback(helper, data)
+ def set_cookie_generate_callback(self, callback):
+ self._cookie_generate_helper = _CookieGenerateCallbackHelper(callback)
+ _lib.SSL_CTX_set_cookie_generate_cb(
+ self._context,
+ self._cookie_generate_helper.callback,
+ )
+
+ def set_cookie_verify_callback(self, callback):
+ self._cookie_verify_helper = _CookieVerifyCallbackHelper(callback)
+ _lib.SSL_CTX_set_cookie_verify_cb(
+ self._context,
+ self._cookie_verify_helper.callback,
+ )
+
-class Connection(object):
+class Connection:
_reverse_mapping = WeakValueDictionary()
def __init__(self, context, socket=None):
@@ -1560,6 +1631,10 @@ class Connection(object):
self._verify_helper = context._verify_helper
self._verify_callback = context._verify_callback
+ # And likewise for the cookie callbacks
+ self._cookie_generate_helper = context._cookie_generate_helper
+ self._cookie_verify_helper = context._cookie_verify_helper
+
self._reverse_mapping[self._ssl] = self
if socket is None:
@@ -1626,6 +1701,24 @@ class Connection(object):
else:
# TODO: This is untested.
_raise_current_error()
+ elif error == _lib.SSL_ERROR_SSL and _lib.ERR_peek_error() != 0:
+ # In 3.0.x an unexpected EOF no longer triggers syscall error
+ # but we want to maintain compatibility so we check here and
+ # raise syscall if it is an EOF. Since we're not actually sure
+ # what else could raise SSL_ERROR_SSL we check for the presence
+ # of the OpenSSL 3 constant SSL_R_UNEXPECTED_EOF_WHILE_READING
+ # and if it's not present we just raise an error, which matches
+ # the behavior before we added this elif section
+ peeked_error = _lib.ERR_peek_error()
+ reason = _lib.ERR_GET_REASON(peeked_error)
+ if _lib.Cryptography_HAS_UNEXPECTED_EOF_WHILE_READING:
+ _openssl_assert(
+ reason == _lib.SSL_R_UNEXPECTED_EOF_WHILE_READING
+ )
+ _lib.ERR_clear_error()
+ raise SysCallError(-1, "Unexpected EOF")
+ else:
+ _raise_current_error()
elif error == _lib.SSL_ERROR_NONE:
pass
else:
@@ -1668,6 +1761,94 @@ class Connection(object):
return _ffi.string(name)
+ def set_verify(self, mode, callback=None):
+ """
+ Override the Context object's verification flags for this specific
+ connection. See :py:meth:`Context.set_verify` for details.
+ """
+ if not isinstance(mode, int):
+ raise TypeError("mode must be an integer")
+
+ if callback is None:
+ self._verify_helper = None
+ self._verify_callback = None
+ _lib.SSL_set_verify(self._ssl, mode, _ffi.NULL)
+ else:
+ if not callable(callback):
+ raise TypeError("callback must be callable")
+
+ self._verify_helper = _VerifyHelper(callback)
+ self._verify_callback = self._verify_helper.callback
+ _lib.SSL_set_verify(self._ssl, mode, self._verify_callback)
+
+ def get_verify_mode(self):
+ """
+ Retrieve the Connection object's verify mode, as set by
+ :meth:`set_verify`.
+
+ :return: The verify mode
+ """
+ return _lib.SSL_get_verify_mode(self._ssl)
+
+ def use_certificate(self, cert):
+ """
+ Load a certificate from a X509 object
+
+ :param cert: The X509 object
+ :return: None
+ """
+ # Mirrored from Context.use_certificate
+ if not isinstance(cert, X509):
+ raise TypeError("cert must be an X509 instance")
+
+ use_result = _lib.SSL_use_certificate(self._ssl, cert._x509)
+ if not use_result:
+ _raise_current_error()
+
+ def use_privatekey(self, pkey):
+ """
+ Load a private key from a PKey object
+
+ :param pkey: The PKey object
+ :return: None
+ """
+ # Mirrored from Context.use_privatekey
+ if not isinstance(pkey, PKey):
+ raise TypeError("pkey must be a PKey instance")
+
+ use_result = _lib.SSL_use_PrivateKey(self._ssl, pkey._pkey)
+ if not use_result:
+ self._context._raise_passphrase_exception()
+
+ def set_ciphertext_mtu(self, mtu):
+ """
+ For DTLS, set the maximum UDP payload size (*not* including IP/UDP
+ overhead).
+
+ Note that you might have to set :data:`OP_NO_QUERY_MTU` to prevent
+ OpenSSL from spontaneously clearing this.
+
+ :param mtu: An integer giving the maximum transmission unit.
+
+ .. versionadded:: 21.1
+ """
+ _lib.SSL_set_mtu(self._ssl, mtu)
+
+ def get_cleartext_mtu(self):
+ """
+ For DTLS, get the maximum size of unencrypted data you can pass to
+ :meth:`write` without exceeding the MTU (as passed to
+ :meth:`set_ciphertext_mtu`).
+
+ :return: The effective MTU as an integer.
+
+ .. versionadded:: 21.1
+ """
+
+ if not hasattr(_lib, "DTLS_get_data_mtu"):
+ raise NotImplementedError("requires OpenSSL 1.1.1 or better")
+ return _lib.DTLS_get_data_mtu(self._ssl)
+
def set_tlsext_host_name(self, name):
"""
Set the value of the servername extension to send in the client hello.
@@ -1839,7 +2020,7 @@ class Connection(object):
if self._from_ssl is None:
raise TypeError("Connection sock was not None")
- if not isinstance(bufsiz, integer_types):
+ if not isinstance(bufsiz, int):
raise TypeError("bufsiz must be an integer")
buf = _no_zero_allocator("char[]", bufsiz)
@@ -1953,6 +2134,32 @@ class Connection(object):
conn.set_accept_state()
return (conn, addr)
+ def DTLSv1_listen(self):
+ """
+ Call the OpenSSL function DTLSv1_listen on this connection. See the
+ OpenSSL manual for more details.
+
+ :return: None
+ """
+ # Possible future extension: return the BIO_ADDR in some form.
+ bio_addr = _lib.BIO_ADDR_new()
+ try:
+ result = _lib.DTLSv1_listen(self._ssl, bio_addr)
+ finally:
+ _lib.BIO_ADDR_free(bio_addr)
+ # DTLSv1_listen is weird. A zero return value means 'didn't find a
+ # ClientHello with valid cookie, but keep trying'. So basically
+ # WantReadError. But it doesn't work correctly with _raise_ssl_error.
+ # So we raise it manually instead.
+ if self._cookie_generate_helper is not None:
+ self._cookie_generate_helper.raise_if_problem()
+ if self._cookie_verify_helper is not None:
+ self._cookie_verify_helper.raise_if_problem()
+ if result == 0:
+ raise WantReadError()
+ if result < 0:
+ self._raise_ssl_error(self._ssl, result)
+
def bio_shutdown(self):
"""
If the Connection was created with a memory BIO, this method can be
@@ -1994,7 +2201,7 @@ class Connection(object):
result = _lib.SSL_get_cipher_list(self._ssl, i)
if result == _ffi.NULL:
break
- ciphers.append(_native(_ffi.string(result)))
+ ciphers.append(_ffi.string(result).decode("utf-8"))
return ciphers
def get_client_ca_list(self):
@@ -2070,7 +2277,7 @@ class Connection(object):
:param state: bitvector of SENT_SHUTDOWN, RECEIVED_SHUTDOWN.
:return: None
"""
- if not isinstance(state, integer_types):
+ if not isinstance(state, int):
raise TypeError("state must be an integer")
_lib.SSL_set_shutdown(self._ssl, state)
@@ -2451,10 +2658,16 @@ class Connection(object):
This list should be a Python list of bytestrings representing the
protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``.
"""
+ # Different versions of OpenSSL are inconsistent about how they handle
+ # empty proto lists (see #1043), so we avoid the problem entirely by
+ # rejecting them ourselves.
+ if not protos:
+ raise ValueError("at least one protocol must be specified")
+
# Take the list of protocols and join them together, prefixing them
# with their lengths.
protostr = b"".join(
- chain.from_iterable((int2byte(len(p)), p) for p in protos)
+ chain.from_iterable((bytes((len(p),)), p) for p in protos)
)
# Build a C string from the list. We don't need to save this off
@@ -2475,7 +2688,7 @@ class Connection(object):
Get the protocol that was negotiated by ALPN.
:returns: A bytestring of the protocol name. If no protocol has been
- negotiated yet, returns an empty string.
+ negotiated yet, returns an empty bytestring.
"""
data = _ffi.new("unsigned char **")
data_len = _ffi.new("unsigned int *")
@@ -2498,8 +2711,3 @@ class Connection(object):
self._ssl, _lib.TLSEXT_STATUSTYPE_ocsp
)
_openssl_assert(rc == 1)
-
-
-# This is similar to the initialization calls at the end of OpenSSL/crypto.py
-# but is exercised mostly by the Context initializer.
-_lib.SSL_library_init()
diff --git a/contrib/python/pyOpenSSL/py3/OpenSSL/__init__.py b/contrib/python/pyOpenSSL/py3/OpenSSL/__init__.py
index 11e896a4ea2..0af3acdb8fd 100644
--- a/contrib/python/pyOpenSSL/py3/OpenSSL/__init__.py
+++ b/contrib/python/pyOpenSSL/py3/OpenSSL/__init__.py
@@ -5,7 +5,7 @@
pyOpenSSL - A simple wrapper around the OpenSSL library
"""
-from OpenSSL import crypto, SSL
+from OpenSSL import SSL, crypto
from OpenSSL.version import (
__author__,
__copyright__,
diff --git a/contrib/python/pyOpenSSL/py3/OpenSSL/_util.py b/contrib/python/pyOpenSSL/py3/OpenSSL/_util.py
index 53c0b9e573c..7a102e6c910 100644
--- a/contrib/python/pyOpenSSL/py3/OpenSSL/_util.py
+++ b/contrib/python/pyOpenSSL/py3/OpenSSL/_util.py
@@ -1,13 +1,13 @@
+import os
import sys
import warnings
-
-from six import PY2, text_type
+from typing import Any, Callable, NoReturn, Type, Union
from cryptography.hazmat.bindings.openssl.binding import Binding
+StrOrBytesPath = Union[str, bytes, os.PathLike]
binding = Binding()
-binding.init_static_locks()
ffi = binding.ffi
lib = binding.lib
@@ -18,7 +18,7 @@ lib = binding.lib
no_zero_allocator = ffi.new_allocator(should_clear_after_alloc=False)
-def text(charp):
+def text(charp: Any) -> str:
"""
Get a native string type representing of the given CFFI ``char*`` object.
@@ -28,10 +28,10 @@ def text(charp):
"""
if not charp:
return ""
- return native(ffi.string(charp))
+ return ffi.string(charp).decode("utf-8")
-def exception_from_error_queue(exception_type):
+def exception_from_error_queue(exception_type: Type[Exception]) -> NoReturn:
"""
Convert an OpenSSL library failure into a Python exception.
@@ -57,13 +57,13 @@ def exception_from_error_queue(exception_type):
raise exception_type(errors)
-def make_assert(error):
+def make_assert(error: Type[Exception]) -> Callable[[bool], Any]:
"""
Create an assert function that uses :func:`exception_from_error_queue` to
raise an exception wrapped by *error*.
"""
- def openssl_assert(ok):
+ def openssl_assert(ok: bool) -> None:
"""
If *ok* is not True, retrieve the error from OpenSSL and raise it.
"""
@@ -73,66 +73,35 @@ def make_assert(error):
return openssl_assert
-def native(s):
+def path_bytes(s: StrOrBytesPath) -> bytes:
"""
- Convert :py:class:`bytes` or :py:class:`unicode` to the native
- :py:class:`str` type, using UTF-8 encoding if conversion is necessary.
-
- :raise UnicodeError: The input string is not UTF-8 decodeable.
+ Convert a Python path to a :py:class:`bytes` for the path which can be
+ passed into an OpenSSL API accepting a filename.
- :raise TypeError: The input is neither :py:class:`bytes` nor
- :py:class:`unicode`.
- """
- if not isinstance(s, (bytes, text_type)):
- raise TypeError("%r is neither bytes nor unicode" % s)
- if PY2:
- if isinstance(s, text_type):
- return s.encode("utf-8")
- else:
- if isinstance(s, bytes):
- return s.decode("utf-8")
- return s
-
-
-def path_string(s):
- """
- Convert a Python string to a :py:class:`bytes` string identifying the same
- path and which can be passed into an OpenSSL API accepting a filename.
-
- :param s: An instance of :py:class:`bytes` or :py:class:`unicode`.
+ :param s: A path (valid for os.fspath).
:return: An instance of :py:class:`bytes`.
"""
- if isinstance(s, bytes):
- return s
- elif isinstance(s, text_type):
- return s.encode(sys.getfilesystemencoding())
- else:
- raise TypeError("Path must be represented as bytes or unicode string")
-
-
-if PY2:
-
- def byte_string(s):
- return s
+ b = os.fspath(s)
+ if isinstance(b, str):
+ return b.encode(sys.getfilesystemencoding())
+ else:
+ return b
-else:
- def byte_string(s):
- return s.encode("charmap")
+def byte_string(s: str) -> bytes:
+ return s.encode("charmap")
# A marker object to observe whether some optional arguments are passed any
# value or not.
UNSPECIFIED = object()
-_TEXT_WARNING = (
- text_type.__name__ + " for {0} is no longer accepted, use bytes"
-)
+_TEXT_WARNING = "str for {0} is no longer accepted, use bytes"
-def text_to_bytes_and_warn(label, obj):
+def text_to_bytes_and_warn(label: str, obj: Any) -> Any:
"""
If ``obj`` is text, emit a warning that it should be bytes instead and try
to convert it to bytes automatically.
@@ -145,7 +114,7 @@ def text_to_bytes_and_warn(label, obj):
UTF-8 encoding of that text is returned. Otherwise, ``obj`` itself is
returned.
"""
- if isinstance(obj, text_type):
+ if isinstance(obj, str):
warnings.warn(
_TEXT_WARNING.format(label),
category=DeprecationWarning,
diff --git a/contrib/python/pyOpenSSL/py3/OpenSSL/crypto.py b/contrib/python/pyOpenSSL/py3/OpenSSL/crypto.py
index eda4af6f9d9..a9b673c53f5 100644
--- a/contrib/python/pyOpenSSL/py3/OpenSSL/crypto.py
+++ b/contrib/python/pyOpenSSL/py3/OpenSSL/crypto.py
@@ -1,31 +1,44 @@
import calendar
import datetime
-
+import functools
from base64 import b16encode
from functools import partial
-from operator import __eq__, __ne__, __lt__, __le__, __gt__, __ge__
-
-from six import (
- integer_types as _integer_types,
- text_type as _text_type,
- PY2 as _PY2,
+from os import PathLike
+from typing import (
+ Any,
+ Callable,
+ Iterable,
+ List,
+ NoReturn,
+ Optional,
+ Sequence,
+ Set,
+ Tuple,
+ Type,
+ Union,
)
from cryptography import utils, x509
-from cryptography.hazmat.primitives.asymmetric import dsa, rsa
+from cryptography.hazmat.primitives.asymmetric import (
+ dsa,
+ ec,
+ ed25519,
+ ed448,
+ rsa,
+)
from OpenSSL._util import (
+ UNSPECIFIED as _UNSPECIFIED,
+ byte_string as _byte_string,
+ exception_from_error_queue as _exception_from_error_queue,
ffi as _ffi,
lib as _lib,
- exception_from_error_queue as _exception_from_error_queue,
- byte_string as _byte_string,
- native as _native,
- path_string as _path_string,
- UNSPECIFIED as _UNSPECIFIED,
- text_to_bytes_and_warn as _text_to_bytes_and_warn,
make_assert as _make_assert,
+ path_bytes as _path_bytes,
+ text_to_bytes_and_warn as _text_to_bytes_and_warn,
)
+
__all__ = [
"FILETYPE_PEM",
"FILETYPE_ASN1",
@@ -65,16 +78,24 @@ __all__ = [
"load_pkcs12",
]
-FILETYPE_PEM = _lib.SSL_FILETYPE_PEM
-FILETYPE_ASN1 = _lib.SSL_FILETYPE_ASN1
+
+_Key = Union[
+ dsa.DSAPrivateKey, dsa.DSAPublicKey, rsa.RSAPrivateKey, rsa.RSAPublicKey
+]
+StrOrBytesPath = Union[str, bytes, PathLike]
+PassphraseCallableT = Union[bytes, Callable[..., bytes]]
+
+
+FILETYPE_PEM: int = _lib.SSL_FILETYPE_PEM
+FILETYPE_ASN1: int = _lib.SSL_FILETYPE_ASN1
# TODO This was an API mistake. OpenSSL has no such constant.
-FILETYPE_TEXT = 2 ** 16 - 1
+FILETYPE_TEXT = 2**16 - 1
-TYPE_RSA = _lib.EVP_PKEY_RSA
-TYPE_DSA = _lib.EVP_PKEY_DSA
-TYPE_DH = _lib.EVP_PKEY_DH
-TYPE_EC = _lib.EVP_PKEY_EC
+TYPE_RSA: int = _lib.EVP_PKEY_RSA
+TYPE_DSA: int = _lib.EVP_PKEY_DSA
+TYPE_DH: int = _lib.EVP_PKEY_DH
+TYPE_EC: int = _lib.EVP_PKEY_EC
class Error(Exception):
@@ -87,20 +108,7 @@ _raise_current_error = partial(_exception_from_error_queue, Error)
_openssl_assert = _make_assert(Error)
-def _get_backend():
- """
- Importing the backend from cryptography has the side effect of activating
- the osrandom engine. This mutates the global state of OpenSSL in the
- process and causes issues for various programs that use subinterpreters or
- embed Python. By putting the import in this function we can avoid
- triggering this side effect unless _get_backend is called.
- """
- from cryptography.hazmat.backends.openssl.backend import backend
-
- return backend
-
-
-def _untested_error(where):
+def _untested_error(where: str) -> NoReturn:
"""
An OpenSSL API failed somehow. Additionally, the failure which was
encountered isn't one that's exercised by the test suite so future behavior
@@ -109,7 +117,7 @@ def _untested_error(where):
raise RuntimeError("Unknown %s failure" % (where,))
-def _new_mem_buf(buffer=None):
+def _new_mem_buf(buffer: Optional[bytes] = None) -> Any:
"""
Allocate a new OpenSSL memory BIO.
@@ -126,7 +134,7 @@ def _new_mem_buf(buffer=None):
bio = _lib.BIO_new_mem_buf(data, len(buffer))
# Keep the memory alive as long as the bio is alive!
- def free(bio, ref=data):
+ def free(bio: Any, ref: Any = data) -> Any:
return _lib.BIO_free(bio)
_openssl_assert(bio != _ffi.NULL)
@@ -135,7 +143,7 @@ def _new_mem_buf(buffer=None):
return bio
-def _bio_to_string(bio):
+def _bio_to_string(bio: Any) -> bytes:
"""
Copy the contents of an OpenSSL BIO object into a Python byte string.
"""
@@ -144,7 +152,7 @@ def _bio_to_string(bio):
return _ffi.buffer(result_buffer[0], buffer_length)[:]
-def _set_asn1_time(boundary, when):
+def _set_asn1_time(boundary: Any, when: bytes) -> None:
"""
The the time value of an ASN1 time object.
@@ -160,13 +168,35 @@ def _set_asn1_time(boundary, when):
"""
if not isinstance(when, bytes):
raise TypeError("when must be a byte string")
+ # ASN1_TIME_set_string validates the string without writing anything
+ # when the destination is NULL.
+ _openssl_assert(boundary != _ffi.NULL)
set_result = _lib.ASN1_TIME_set_string(boundary, when)
if set_result == 0:
raise ValueError("Invalid string")
-def _get_asn1_time(timestamp):
+def _new_asn1_time(when: bytes) -> Any:
+ """
+ Behaves like _set_asn1_time but returns a new ASN1_TIME object.
+
+ @param when: A string representation of the desired time value.
+
+ @raise TypeError: If C{when} is not a L{bytes} string.
+ @raise ValueError: If C{when} does not represent a time in the required
+ format.
+ @raise RuntimeError: If the time value cannot be set for some other
+ (unspecified) reason.
+ """
+ ret = _lib.ASN1_TIME_new()
+ _openssl_assert(ret != _ffi.NULL)
+ ret = _ffi.gc(ret, _lib.ASN1_TIME_free)
+ _set_asn1_time(ret, when)
+ return ret
+
+
+def _get_asn1_time(timestamp: Any) -> Optional[bytes]:
"""
Retrieve the time value of an ASN1 time object.
@@ -182,7 +212,7 @@ def _get_asn1_time(timestamp):
elif (
_lib.ASN1_STRING_type(string_timestamp) == _lib.V_ASN1_GENERALIZEDTIME
):
- return _ffi.string(_lib.ASN1_STRING_data(string_timestamp))
+ return _ffi.string(_lib.ASN1_STRING_get0_data(string_timestamp))
else:
generalized_timestamp = _ffi.new("ASN1_GENERALIZEDTIME**")
_lib.ASN1_TIME_to_generalizedtime(timestamp, generalized_timestamp)
@@ -201,26 +231,26 @@ def _get_asn1_time(timestamp):
string_timestamp = _ffi.cast(
"ASN1_STRING*", generalized_timestamp[0]
)
- string_data = _lib.ASN1_STRING_data(string_timestamp)
+ string_data = _lib.ASN1_STRING_get0_data(string_timestamp)
string_result = _ffi.string(string_data)
_lib.ASN1_GENERALIZEDTIME_free(generalized_timestamp[0])
return string_result
-class _X509NameInvalidator(object):
- def __init__(self):
- self._names = []
+class _X509NameInvalidator:
+ def __init__(self) -> None:
+ self._names: List[X509Name] = []
- def add(self, name):
+ def add(self, name: "X509Name") -> None:
self._names.append(name)
- def clear(self):
+ def clear(self) -> None:
for name in self._names:
# Breaks the object, but also prevents UAF!
del name._name
-class PKey(object):
+class PKey:
"""
A class representing an DSA or RSA public key or key pair.
"""
@@ -228,12 +258,12 @@ class PKey(object):
_only_public = False
_initialized = True
- def __init__(self):
+ def __init__(self) -> None:
pkey = _lib.EVP_PKEY_new()
self._pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free)
self._initialized = False
- def to_cryptography_key(self):
+ def to_cryptography_key(self) -> _Key:
"""
Export as a ``cryptography`` key.
@@ -249,16 +279,15 @@ class PKey(object):
load_der_public_key,
)
- backend = _get_backend()
if self._only_public:
der = dump_publickey(FILETYPE_ASN1, self)
- return load_der_public_key(der, backend)
+ return load_der_public_key(der)
else:
der = dump_privatekey(FILETYPE_ASN1, self)
- return load_der_private_key(der, None, backend)
+ return load_der_private_key(der, None)
@classmethod
- def from_cryptography_key(cls, crypto_key):
+ def from_cryptography_key(cls, crypto_key: _Key) -> "PKey":
"""
Construct based on a ``cryptography`` *crypto_key*.
@@ -276,6 +305,9 @@ class PKey(object):
rsa.RSAPrivateKey,
dsa.DSAPublicKey,
dsa.DSAPrivateKey,
+ ec.EllipticCurvePrivateKey,
+ ed25519.Ed25519PrivateKey,
+ ed448.Ed448PrivateKey,
),
):
raise TypeError("Unsupported key type")
@@ -300,7 +332,7 @@ class PKey(object):
)
return load_privatekey(FILETYPE_ASN1, der)
- def generate_key(self, type, bits):
+ def generate_key(self, type: int, bits: int) -> None:
"""
Generate a key pair of the given type, with the given number of bits.
@@ -356,7 +388,7 @@ class PKey(object):
self._initialized = True
- def check(self):
+ def check(self) -> bool:
"""
Check the consistency of an RSA private key.
@@ -373,7 +405,7 @@ class PKey(object):
raise TypeError("public key only")
if _lib.EVP_PKEY_type(self.type()) != _lib.EVP_PKEY_RSA:
- raise TypeError("key type unsupported")
+ raise TypeError("Only RSA keys can currently be checked.")
rsa = _lib.EVP_PKEY_get1_RSA(self._pkey)
rsa = _ffi.gc(rsa, _lib.RSA_free)
@@ -382,7 +414,7 @@ class PKey(object):
return True
_raise_current_error()
- def type(self):
+ def type(self) -> int:
"""
Returns the type of the key
@@ -390,7 +422,7 @@ class PKey(object):
"""
return _lib.EVP_PKEY_id(self._pkey)
- def bits(self):
+ def bits(self) -> int:
"""
Returns the number of bits of the key
@@ -399,7 +431,7 @@ class PKey(object):
return _lib.EVP_PKEY_bits(self._pkey)
-class _EllipticCurve(object):
+class _EllipticCurve:
"""
A representation of a supported elliptic curve.
@@ -411,21 +443,19 @@ class _EllipticCurve(object):
_curves = None
- if not _PY2:
- # This only necessary on Python 3. Moreover, it is broken on Python 2.
- def __ne__(self, other):
- """
- Implement cooperation with the right-hand side argument of ``!=``.
+ def __ne__(self, other: Any) -> bool:
+ """
+ Implement cooperation with the right-hand side argument of ``!=``.
- Python 3 seems to have dropped this cooperation in this very narrow
- circumstance.
- """
- if isinstance(other, _EllipticCurve):
- return super(_EllipticCurve, self).__ne__(other)
- return NotImplemented
+ Python 3 seems to have dropped this cooperation in this very narrow
+ circumstance.
+ """
+ if isinstance(other, _EllipticCurve):
+ return super(_EllipticCurve, self).__ne__(other)
+ return NotImplemented
@classmethod
- def _load_elliptic_curves(cls, lib):
+ def _load_elliptic_curves(cls, lib: Any) -> Set["_EllipticCurve"]:
"""
Get the curves supported by OpenSSL.
@@ -443,7 +473,7 @@ class _EllipticCurve(object):
return set(cls.from_nid(lib, c.nid) for c in builtin_curves)
@classmethod
- def _get_elliptic_curves(cls, lib):
+ def _get_elliptic_curves(cls, lib: Any) -> Set["_EllipticCurve"]:
"""
Get, cache, and return the curves supported by OpenSSL.
@@ -457,7 +487,7 @@ class _EllipticCurve(object):
return cls._curves
@classmethod
- def from_nid(cls, lib, nid):
+ def from_nid(cls, lib: Any, nid: int) -> "_EllipticCurve":
"""
Instantiate a new :py:class:`_EllipticCurve` associated with the given
OpenSSL NID.
@@ -473,7 +503,7 @@ class _EllipticCurve(object):
"""
return cls(lib, nid, _ffi.string(lib.OBJ_nid2sn(nid)).decode("ascii"))
- def __init__(self, lib, nid, name):
+ def __init__(self, lib: Any, nid: int, name: str) -> None:
"""
:param _lib: The :py:mod:`cryptography` binding instance used to
interface with OpenSSL.
@@ -490,10 +520,10 @@ class _EllipticCurve(object):
self._nid = nid
self.name = name
- def __repr__(self):
+ def __repr__(self) -> str:
return "<Curve %r>" % (self.name,)
- def _to_EC_KEY(self):
+ def _to_EC_KEY(self) -> Any:
"""
Create a new OpenSSL EC_KEY structure initialized to use this curve.
@@ -504,7 +534,7 @@ class _EllipticCurve(object):
return _ffi.gc(key, _lib.EC_KEY_free)
-def get_elliptic_curves():
+def get_elliptic_curves() -> Set["_EllipticCurve"]:
"""
Return a set of objects representing the elliptic curves supported in the
OpenSSL build in use.
@@ -519,7 +549,7 @@ def get_elliptic_curves():
return _EllipticCurve._get_elliptic_curves(_lib)
-def get_elliptic_curve(name):
+def get_elliptic_curve(name: str) -> _EllipticCurve:
"""
Return a single curve object selected by name.
@@ -537,7 +567,8 @@ def get_elliptic_curve(name):
raise ValueError("unknown curve name", name)
-class X509Name(object):
+class X509Name:
"""
An X.509 Distinguished Name.
@@ -562,7 +593,7 @@ class X509Name(object):
:ivar emailAddress: The e-mail address of the entity.
"""
- def __init__(self, name):
+ def __init__(self, name: "X509Name") -> None:
"""
Create a new X509Name, copying the given X509Name instance.
@@ -570,9 +601,9 @@ class X509Name(object):
:type name: :py:class:`X509Name`
"""
name = _lib.X509_NAME_dup(name._name)
- self._name = _ffi.gc(name, _lib.X509_NAME_free)
+ self._name: Any = _ffi.gc(name, _lib.X509_NAME_free)
- def __setattr__(self, name, value):
+ def __setattr__(self, name: str, value: Any) -> None:
if name.startswith("_"):
return super(X509Name, self).__setattr__(name, value)
@@ -602,7 +633,7 @@ class X509Name(object):
_lib.X509_NAME_ENTRY_free(ent)
break
- if isinstance(value, _text_type):
+ if isinstance(value, str):
value = value.encode("utf-8")
add_result = _lib.X509_NAME_add_entry_by_NID(
@@ -611,7 +642,7 @@ class X509Name(object):
if not add_result:
_raise_current_error()
- def __getattr__(self, name):
+ def __getattr__(self, name: str) -> Optional[str]:
"""
Find attribute. An X509Name object has the following attributes:
countryName (alias C), stateOrProvince (alias ST), locality (alias L),
@@ -629,7 +660,7 @@ class X509Name(object):
_raise_current_error()
except Error:
pass
- return super(X509Name, self).__getattr__(name)
+ raise AttributeError("No such attribute")
entry_index = _lib.X509_NAME_get_index_by_NID(self._name, nid, -1)
if entry_index == -1:
@@ -651,25 +682,19 @@ class X509Name(object):
_lib.OPENSSL_free(result_buffer[0])
return result
- def _cmp(op):
- def f(self, other):
- if not isinstance(other, X509Name):
- return NotImplemented
- result = _lib.X509_NAME_cmp(self._name, other._name)
- return op(result, 0)
-
- return f
+ def __eq__(self, other: Any) -> bool:
+ if not isinstance(other, X509Name):
+ return NotImplemented
- __eq__ = _cmp(__eq__)
- __ne__ = _cmp(__ne__)
+ return _lib.X509_NAME_cmp(self._name, other._name) == 0
- __lt__ = _cmp(__lt__)
- __le__ = _cmp(__le__)
+ def __lt__(self, other: Any) -> bool:
+ if not isinstance(other, X509Name):
+ return NotImplemented
- __gt__ = _cmp(__gt__)
- __ge__ = _cmp(__ge__)
+ return _lib.X509_NAME_cmp(self._name, other._name) < 0
- def __repr__(self):
+ def __repr__(self) -> str:
"""
String representation of an X509Name
"""
@@ -680,10 +705,10 @@ class X509Name(object):
_openssl_assert(format_result != _ffi.NULL)
return "<X509Name object '%s'>" % (
- _native(_ffi.string(result_buffer)),
+ _ffi.string(result_buffer).decode("utf-8"),
)
- def hash(self):
+ def hash(self) -> int:
"""
Return an integer representation of the first four bytes of the
MD5 digest of the DER representation of the name.
@@ -695,7 +720,7 @@ class X509Name(object):
"""
return _lib.X509_NAME_hash(self._name)
- def der(self):
+ def der(self) -> bytes:
"""
Return the DER encoding of this name.
@@ -710,7 +735,7 @@ class X509Name(object):
_lib.OPENSSL_free(result_buffer[0])
return string_result
- def get_components(self):
+ def get_components(self) -> List[Tuple[bytes, bytes]]:
"""
Returns the components of this name, as a sequence of 2-tuples.
@@ -730,19 +755,26 @@ class X509Name(object):
# ffi.string does not handle strings containing NULL bytes
# (which may have been generated by old, broken software)
value = _ffi.buffer(
- _lib.ASN1_STRING_data(fval), _lib.ASN1_STRING_length(fval)
+ _lib.ASN1_STRING_get0_data(fval), _lib.ASN1_STRING_length(fval)
)[:]
result.append((_ffi.string(name), value))
return result
-class X509Extension(object):
+class X509Extension:
"""
An X.509 v3 certificate extension.
"""
- def __init__(self, type_name, critical, value, subject=None, issuer=None):
+ def __init__(
+ self,
+ type_name: bytes,
+ critical: bool,
+ value: bytes,
+ subject: Optional["X509"] = None,
+ issuer: Optional["X509"] = None,
+ ) -> None:
"""
Initializes an X509 extension.
@@ -752,7 +784,8 @@ class X509Extension(object):
:param bool critical: A flag indicating whether this is a critical
extension.
- :param value: The value of the extension.
+ :param value: The OpenSSL textual representation of the extension's
+ value.
:type value: :py:data:`bytes`
:param subject: Optional X509 certificate to use as subject.
@@ -804,7 +837,7 @@ class X509Extension(object):
self._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free)
@property
- def _nid(self):
+ def _nid(self) -> Any:
return _lib.OBJ_obj2nid(
_lib.X509_EXTENSION_get_object(self._extension)
)
@@ -815,7 +848,7 @@ class X509Extension(object):
_lib.GEN_URI: "URI",
}
- def _subjectAltNameString(self):
+ def _subjectAltNameString(self) -> str:
names = _ffi.cast(
"GENERAL_NAMES*", _lib.X509V3_EXT_d2i(self._extension)
)
@@ -829,15 +862,15 @@ class X509Extension(object):
except KeyError:
bio = _new_mem_buf()
_lib.GENERAL_NAME_print(bio, name)
- parts.append(_native(_bio_to_string(bio)))
+ parts.append(_bio_to_string(bio).decode("utf-8"))
else:
- value = _native(
- _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[:]
- )
+ value = _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[
+ :
+ ].decode("utf-8")
parts.append(label + ":" + value)
return ", ".join(parts)
- def __str__(self):
+ def __str__(self) -> str:
"""
:return: a nice text representation of the extension
"""
@@ -848,9 +881,9 @@ class X509Extension(object):
print_result = _lib.X509V3_EXT_print(bio, self._extension, 0, 0)
_openssl_assert(print_result != 0)
- return _native(_bio_to_string(bio))
+ return _bio_to_string(bio).decode("utf-8")
- def get_critical(self):
+ def get_critical(self) -> bool:
"""
Returns the critical field of this X.509 extension.
@@ -858,7 +891,7 @@ class X509Extension(object):
"""
return _lib.X509_EXTENSION_get_critical(self._extension)
- def get_short_name(self):
+ def get_short_name(self) -> bytes:
"""
Returns the short type name of this X.509 extension.
@@ -873,7 +906,7 @@ class X509Extension(object):
nid = _lib.OBJ_obj2nid(obj)
return _ffi.string(_lib.OBJ_nid2sn(nid))
- def get_data(self):
+ def get_data(self) -> bytes:
"""
Returns the data of the X509 extension, encoded as ASN.1.
@@ -884,23 +917,23 @@ class X509Extension(object):
"""
octet_result = _lib.X509_EXTENSION_get_data(self._extension)
string_result = _ffi.cast("ASN1_STRING*", octet_result)
- char_result = _lib.ASN1_STRING_data(string_result)
+ char_result = _lib.ASN1_STRING_get0_data(string_result)
result_length = _lib.ASN1_STRING_length(string_result)
return _ffi.buffer(char_result, result_length)[:]
-class X509Req(object):
+class X509Req:
"""
An X.509 certificate signing requests.
"""
- def __init__(self):
+ def __init__(self) -> None:
req = _lib.X509_REQ_new()
self._req = _ffi.gc(req, _lib.X509_REQ_free)
# Default to version 0.
self.set_version(0)
- def to_cryptography(self):
+ def to_cryptography(self) -> x509.CertificateSigningRequest:
"""
Export as a ``cryptography`` certificate signing request.
@@ -912,11 +945,12 @@ class X509Req(object):
der = dump_certificate_request(FILETYPE_ASN1, self)
- backend = _get_backend()
- return load_der_x509_csr(der, backend)
+ return load_der_x509_csr(der)
@classmethod
- def from_cryptography(cls, crypto_req):
+ def from_cryptography(
+ cls, crypto_req: x509.CertificateSigningRequest
+ ) -> "X509Req":
"""
Construct based on a ``cryptography`` *crypto_req*.
@@ -935,7 +969,7 @@ class X509Req(object):
der = crypto_req.public_bytes(Encoding.DER)
return load_certificate_request(FILETYPE_ASN1, der)
- def set_pubkey(self, pkey):
+ def set_pubkey(self, pkey: PKey) -> None:
"""
Set the public key of the certificate signing request.
@@ -947,7 +981,7 @@ class X509Req(object):
set_result = _lib.X509_REQ_set_pubkey(self._req, pkey._pkey)
_openssl_assert(set_result == 1)
- def get_pubkey(self):
+ def get_pubkey(self) -> PKey:
"""
Get the public key of the certificate signing request.
@@ -961,9 +995,9 @@ class X509Req(object):
pkey._only_public = True
return pkey
- def set_version(self, version):
+ def set_version(self, version: int) -> None:
"""
- Set the version subfield (RFC 2459, section 4.1.2.1) of the certificate
+ Set the version subfield (RFC 2986, section 4.1) of the certificate
request.
:param int version: The version number.
@@ -972,7 +1006,7 @@ class X509Req(object):
set_result = _lib.X509_REQ_set_version(self._req, version)
_openssl_assert(set_result == 1)
- def get_version(self):
+ def get_version(self) -> int:
"""
Get the version subfield (RFC 2459, section 4.1.2.1) of the certificate
request.
@@ -982,7 +1016,7 @@ class X509Req(object):
"""
return _lib.X509_REQ_get_version(self._req)
- def get_subject(self):
+ def get_subject(self) -> X509Name:
"""
Return the subject of this certificate signing request.
@@ -1004,7 +1038,7 @@ class X509Req(object):
return name
- def add_extensions(self, extensions):
+ def add_extensions(self, extensions: Iterable[X509Extension]) -> None:
"""
Add extensions to the certificate signing request.
@@ -1027,7 +1061,7 @@ class X509Req(object):
add_result = _lib.X509_REQ_add_extensions(self._req, stack)
_openssl_assert(add_result == 1)
- def get_extensions(self):
+ def get_extensions(self) -> List[X509Extension]:
"""
Get X.509 extensions in the certificate signing request.
@@ -1055,15 +1089,15 @@ class X509Req(object):
exts.append(ext)
return exts
- def sign(self, pkey, digest):
+ def sign(self, pkey: PKey, digest: str) -> None:
"""
Sign the certificate signing request with this key and digest type.
:param pkey: The key pair to sign with.
:type pkey: :py:class:`PKey`
:param digest: The name of the message digest to use for the signature,
- e.g. :py:data:`b"sha256"`.
- :type digest: :py:class:`bytes`
+ e.g. :py:data:`"sha256"`.
+ :type digest: :py:class:`str`
:return: ``None``
"""
if pkey._only_public:
@@ -1079,7 +1113,7 @@ class X509Req(object):
sign_result = _lib.X509_REQ_sign(self._req, pkey._pkey, digest_obj)
_openssl_assert(sign_result > 0)
- def verify(self, pkey):
+ def verify(self, pkey: PKey) -> bool:
"""
Verifies the signature on this certificate signing request.
@@ -1101,12 +1135,12 @@ class X509Req(object):
return result
-class X509(object):
+class X509:
"""
An X.509 certificate.
"""
- def __init__(self):
+ def __init__(self) -> None:
x509 = _lib.X509_new()
_openssl_assert(x509 != _ffi.NULL)
self._x509 = _ffi.gc(x509, _lib.X509_free)
@@ -1115,14 +1149,14 @@ class X509(object):
self._subject_invalidator = _X509NameInvalidator()
@classmethod
- def _from_raw_x509_ptr(cls, x509):
+ def _from_raw_x509_ptr(cls, x509: Any) -> "X509":
cert = cls.__new__(cls)
cert._x509 = _ffi.gc(x509, _lib.X509_free)
cert._issuer_invalidator = _X509NameInvalidator()
cert._subject_invalidator = _X509NameInvalidator()
return cert
- def to_cryptography(self):
+ def to_cryptography(self) -> x509.Certificate:
"""
Export as a ``cryptography`` certificate.
@@ -1133,11 +1167,10 @@ class X509(object):
from cryptography.x509 import load_der_x509_certificate
der = dump_certificate(FILETYPE_ASN1, self)
- backend = _get_backend()
- return load_der_x509_certificate(der, backend)
+ return load_der_x509_certificate(der)
@classmethod
- def from_cryptography(cls, crypto_cert):
+ def from_cryptography(cls, crypto_cert: x509.Certificate) -> "X509":
"""
Construct based on a ``cryptography`` *crypto_cert*.
@@ -1156,7 +1189,7 @@ class X509(object):
der = crypto_cert.public_bytes(Encoding.DER)
return load_certificate(FILETYPE_ASN1, der)
- def set_version(self, version):
+ def set_version(self, version: int) -> None:
"""
Set the version number of the certificate. Note that the
version value is zero-based, eg. a value of 0 is V1.
@@ -1169,9 +1202,9 @@ class X509(object):
if not isinstance(version, int):
raise TypeError("version must be an integer")
- _lib.X509_set_version(self._x509, version)
+ _openssl_assert(_lib.X509_set_version(self._x509, version) == 1)
- def get_version(self):
+ def get_version(self) -> int:
"""
Return the version number of the certificate.
@@ -1180,7 +1213,7 @@ class X509(object):
"""
return _lib.X509_get_version(self._x509)
- def get_pubkey(self):
+ def get_pubkey(self) -> PKey:
"""
Get the public key of the certificate.
@@ -1195,7 +1228,7 @@ class X509(object):
pkey._only_public = True
return pkey
- def set_pubkey(self, pkey):
+ def set_pubkey(self, pkey: PKey) -> None:
"""
Set the public key of the certificate.
@@ -1210,7 +1243,7 @@ class X509(object):
set_result = _lib.X509_set_pubkey(self._x509, pkey._pkey)
_openssl_assert(set_result == 1)
- def sign(self, pkey, digest):
+ def sign(self, pkey: PKey, digest: str) -> None:
"""
Sign the certificate with this key and digest type.
@@ -1218,7 +1251,7 @@ class X509(object):
:type pkey: :py:class:`PKey`
:param digest: The name of the message digest to use.
- :type digest: :py:class:`bytes`
+ :type digest: :py:class:`str`
:return: :py:data:`None`
"""
@@ -1238,7 +1271,7 @@ class X509(object):
sign_result = _lib.X509_sign(self._x509, pkey._pkey, evp_md)
_openssl_assert(sign_result > 0)
- def get_signature_algorithm(self):
+ def get_signature_algorithm(self) -> bytes:
"""
Return the signature algorithm used in the certificate.
@@ -1255,12 +1288,12 @@ class X509(object):
raise ValueError("Undefined signature algorithm")
return _ffi.string(_lib.OBJ_nid2ln(nid))
- def digest(self, digest_name):
+ def digest(self, digest_name: str) -> bytes:
"""
Return the digest of the X509 object.
:param digest_name: The name of the digest algorithm to use.
- :type digest_name: :py:class:`bytes`
+ :type digest_name: :py:class:`str`
:return: The digest of the object, formatted as
:py:const:`b":"`-delimited hex pairs.
@@ -1286,7 +1319,7 @@ class X509(object):
]
)
- def subject_name_hash(self):
+ def subject_name_hash(self) -> bytes:
"""
Return the hash of the X509 subject.
@@ -1295,7 +1328,7 @@ class X509(object):
"""
return _lib.X509_subject_name_hash(self._x509)
- def set_serial_number(self, serial):
+ def set_serial_number(self, serial: int) -> None:
"""
Set the serial number of the certificate.
@@ -1304,19 +1337,18 @@ class X509(object):
:return: :py:data`None`
"""
- if not isinstance(serial, _integer_types):
+ if not isinstance(serial, int):
raise TypeError("serial must be an integer")
hex_serial = hex(serial)[2:]
- if not isinstance(hex_serial, bytes):
- hex_serial = hex_serial.encode("ascii")
+ hex_serial_bytes = hex_serial.encode("ascii")
bignum_serial = _ffi.new("BIGNUM**")
# BN_hex2bn stores the result in &bignum. Unless it doesn't feel like
# it. If bignum is still NULL after this call, then the return value
# is actually the result. I hope. -exarkun
- small_serial = _lib.BN_hex2bn(bignum_serial, hex_serial)
+ small_serial = _lib.BN_hex2bn(bignum_serial, hex_serial_bytes)
if bignum_serial[0] == _ffi.NULL:
set_result = _lib.ASN1_INTEGER_set(
@@ -1335,7 +1367,7 @@ class X509(object):
set_result = _lib.X509_set_serialNumber(self._x509, asn1_serial)
_openssl_assert(set_result == 1)
- def get_serial_number(self):
+ def get_serial_number(self) -> int:
"""
Return the serial number of this certificate.
@@ -1355,7 +1387,7 @@ class X509(object):
finally:
_lib.BN_free(bignum_serial)
- def gmtime_adj_notAfter(self, amount):
+ def gmtime_adj_notAfter(self, amount: int) -> None:
"""
Adjust the time stamp on which the certificate stops being valid.
@@ -1369,7 +1401,7 @@ class X509(object):
notAfter = _lib.X509_getm_notAfter(self._x509)
_lib.X509_gmtime_adj(notAfter, amount)
- def gmtime_adj_notBefore(self, amount):
+ def gmtime_adj_notBefore(self, amount: int) -> None:
"""
Adjust the timestamp on which the certificate starts being valid.
@@ -1382,22 +1414,25 @@ class X509(object):
notBefore = _lib.X509_getm_notBefore(self._x509)
_lib.X509_gmtime_adj(notBefore, amount)
- def has_expired(self):
+ def has_expired(self) -> bool:
"""
Check whether the certificate has expired.
:return: ``True`` if the certificate has expired, ``False`` otherwise.
:rtype: bool
"""
- time_string = _native(self.get_notAfter())
+ time_bytes = self.get_notAfter()
+ if time_bytes is None:
+ raise ValueError("Unable to determine notAfter")
+ time_string = time_bytes.decode("utf-8")
not_after = datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ")
return not_after < datetime.datetime.utcnow()
- def _get_boundary_time(self, which):
+ def _get_boundary_time(self, which: Any) -> Optional[bytes]:
return _get_asn1_time(which(self._x509))
- def get_notBefore(self):
+ def get_notBefore(self) -> Optional[bytes]:
"""
Get the timestamp at which the certificate starts being valid.
@@ -1410,10 +1445,12 @@ class X509(object):
"""
return self._get_boundary_time(_lib.X509_getm_notBefore)
- def _set_boundary_time(self, which, when):
+ def _set_boundary_time(
+ self, which: Callable[..., Any], when: bytes
+ ) -> None:
return _set_asn1_time(which(self._x509), when)
- def set_notBefore(self, when):
+ def set_notBefore(self, when: bytes) -> None:
"""
Set the timestamp at which the certificate starts being valid.
@@ -1426,7 +1463,7 @@ class X509(object):
"""
return self._set_boundary_time(_lib.X509_getm_notBefore, when)
- def get_notAfter(self):
+ def get_notAfter(self) -> Optional[bytes]:
"""
Get the timestamp at which the certificate stops being valid.
@@ -1439,7 +1476,7 @@ class X509(object):
"""
return self._get_boundary_time(_lib.X509_getm_notAfter)
- def set_notAfter(self, when):
+ def set_notAfter(self, when: bytes) -> None:
"""
Set the timestamp at which the certificate stops being valid.
@@ -1452,7 +1489,7 @@ class X509(object):
"""
return self._set_boundary_time(_lib.X509_getm_notAfter, when)
- def _get_name(self, which):
+ def _get_name(self, which: Any) -> X509Name:
name = X509Name.__new__(X509Name)
name._name = which(self._x509)
_openssl_assert(name._name != _ffi.NULL)
@@ -1463,13 +1500,13 @@ class X509(object):
return name
- def _set_name(self, which, name):
+ def _set_name(self, which: Any, name: X509Name) -> None:
if not isinstance(name, X509Name):
raise TypeError("name must be an X509Name")
set_result = which(self._x509, name._name)
_openssl_assert(set_result == 1)
- def get_issuer(self):
+ def get_issuer(self) -> X509Name:
"""
Return the issuer of this certificate.
@@ -1485,7 +1522,7 @@ class X509(object):
self._issuer_invalidator.add(name)
return name
- def set_issuer(self, issuer):
+ def set_issuer(self, issuer: X509Name) -> None:
"""
Set the issuer of this certificate.
@@ -1497,7 +1534,7 @@ class X509(object):
self._set_name(_lib.X509_set_issuer_name, issuer)
self._issuer_invalidator.clear()
- def get_subject(self):
+ def get_subject(self) -> X509Name:
"""
Return the subject of this certificate.
@@ -1513,7 +1550,7 @@ class X509(object):
self._subject_invalidator.add(name)
return name
- def set_subject(self, subject):
+ def set_subject(self, subject: X509Name) -> None:
"""
Set the subject of this certificate.
@@ -1525,7 +1562,7 @@ class X509(object):
self._set_name(_lib.X509_set_subject_name, subject)
self._subject_invalidator.clear()
- def get_extension_count(self):
+ def get_extension_count(self) -> int:
"""
Get the number of extensions on this certificate.
@@ -1536,7 +1573,7 @@ class X509(object):
"""
return _lib.X509_get_ext_count(self._x509)
- def add_extensions(self, extensions):
+ def add_extensions(self, extensions: Iterable[X509Extension]) -> None:
"""
Add extensions to the certificate.
@@ -1552,7 +1589,7 @@ class X509(object):
if not add_result:
_raise_current_error()
- def get_extension(self, index):
+ def get_extension(self, index: int) -> X509Extension:
"""
Get a specific extension of the certificate by index.
@@ -1576,7 +1613,7 @@ class X509(object):
return ext
-class X509StoreFlags(object):
+class X509StoreFlags:
"""
Flags for X509 verification, used to change the behavior of
:class:`X509Store`.
@@ -1587,19 +1624,20 @@ class X509StoreFlags(object):
https://www.openssl.org/docs/manmaster/man3/X509_VERIFY_PARAM_set_flags.html
"""
- CRL_CHECK = _lib.X509_V_FLAG_CRL_CHECK
- CRL_CHECK_ALL = _lib.X509_V_FLAG_CRL_CHECK_ALL
- IGNORE_CRITICAL = _lib.X509_V_FLAG_IGNORE_CRITICAL
- X509_STRICT = _lib.X509_V_FLAG_X509_STRICT
- ALLOW_PROXY_CERTS = _lib.X509_V_FLAG_ALLOW_PROXY_CERTS
- POLICY_CHECK = _lib.X509_V_FLAG_POLICY_CHECK
- EXPLICIT_POLICY = _lib.X509_V_FLAG_EXPLICIT_POLICY
- INHIBIT_MAP = _lib.X509_V_FLAG_INHIBIT_MAP
- NOTIFY_POLICY = _lib.X509_V_FLAG_NOTIFY_POLICY
- CHECK_SS_SIGNATURE = _lib.X509_V_FLAG_CHECK_SS_SIGNATURE
+ CRL_CHECK: int = _lib.X509_V_FLAG_CRL_CHECK
+ CRL_CHECK_ALL: int = _lib.X509_V_FLAG_CRL_CHECK_ALL
+ IGNORE_CRITICAL: int = _lib.X509_V_FLAG_IGNORE_CRITICAL
+ X509_STRICT: int = _lib.X509_V_FLAG_X509_STRICT
+ ALLOW_PROXY_CERTS: int = _lib.X509_V_FLAG_ALLOW_PROXY_CERTS
+ POLICY_CHECK: int = _lib.X509_V_FLAG_POLICY_CHECK
+ EXPLICIT_POLICY: int = _lib.X509_V_FLAG_EXPLICIT_POLICY
+ INHIBIT_MAP: int = _lib.X509_V_FLAG_INHIBIT_MAP
+ NOTIFY_POLICY: int = _lib.X509_V_FLAG_NOTIFY_POLICY
+ CHECK_SS_SIGNATURE: int = _lib.X509_V_FLAG_CHECK_SS_SIGNATURE
+ PARTIAL_CHAIN: int = _lib.X509_V_FLAG_PARTIAL_CHAIN
-class X509Store(object):
+class X509Store:
"""
An X.509 store.
@@ -1613,11 +1651,11 @@ class X509Store(object):
:class:`X509StoreContext`.
"""
- def __init__(self):
+ def __init__(self) -> None:
store = _lib.X509_STORE_new()
self._store = _ffi.gc(store, _lib.X509_STORE_free)
- def add_cert(self, cert):
+ def add_cert(self, cert: X509) -> None:
"""
Adds a trusted certificate to this store.
@@ -1639,7 +1677,7 @@ class X509Store(object):
res = _lib.X509_STORE_add_cert(self._store, cert._x509)
_openssl_assert(res == 1)
- def add_crl(self, crl):
+ def add_crl(self, crl: "CRL") -> None:
"""
Add a certificate revocation list to this store.
@@ -1655,7 +1693,7 @@ class X509Store(object):
"""
_openssl_assert(_lib.X509_STORE_add_crl(self._store, crl._crl) != 0)
- def set_flags(self, flags):
+ def set_flags(self, flags: int) -> None:
"""
Set verification flags to this store.
@@ -1679,7 +1717,7 @@ class X509Store(object):
"""
_openssl_assert(_lib.X509_STORE_set_flags(self._store, flags) != 0)
- def set_time(self, vfy_time):
+ def set_time(self, vfy_time: datetime.datetime) -> None:
"""
Set the time against which the certificates are verified.
@@ -1703,7 +1741,9 @@ class X509Store(object):
)
_openssl_assert(_lib.X509_STORE_set1_param(self._store, param) != 0)
- def load_locations(self, cafile, capath=None):
+ def load_locations(
+ self, cafile: StrOrBytesPath, capath: Optional[StrOrBytesPath] = None
+ ) -> None:
"""
Let X509Store know where we can find trusted certificates for the
certificate chain. Note that the certificates have to be in PEM
@@ -1737,12 +1777,12 @@ class X509Store(object):
if cafile is None:
cafile = _ffi.NULL
else:
- cafile = _path_string(cafile)
+ cafile = _path_bytes(cafile)
if capath is None:
capath = _ffi.NULL
else:
- capath = _path_string(capath)
+ capath = _path_bytes(capath)
load_result = _lib.X509_STORE_load_locations(
self._store, cafile, capath
@@ -1760,12 +1800,15 @@ class X509StoreContextError(Exception):
:type certificate: :class:`X509`
"""
- def __init__(self, message, certificate):
+ def __init__(
+ self, message: str, errors: List[Any], certificate: X509
+ ) -> None:
super(X509StoreContextError, self).__init__(message)
+ self.errors = errors
self.certificate = certificate
-class X509StoreContext(object):
+class X509StoreContext:
"""
An X.509 store context.
@@ -1787,7 +1830,12 @@ class X509StoreContext(object):
:type chain: :class:`list` of :class:`X509`
"""
- def __init__(self, store, certificate, chain=None):
+ def __init__(
+ self,
+ store: X509Store,
+ certificate: X509,
+ chain: Optional[Sequence[X509]] = None,
+ ) -> None:
store_ctx = _lib.X509_STORE_CTX_new()
self._store_ctx = _ffi.gc(store_ctx, _lib.X509_STORE_CTX_free)
self._store = store
@@ -1799,8 +1847,10 @@ class X509StoreContext(object):
self._init()
@staticmethod
- def _build_certificate_stack(certificates):
- def cleanup(s):
+ def _build_certificate_stack(
+ certificates: Optional[Sequence[X509]],
+ ) -> None:
+ def cleanup(s: Any) -> None:
# Equivalent to sk_X509_pop_free, but we don't
# currently have a CFFI binding for that available
for i in range(_lib.sk_X509_num(s)):
@@ -1826,7 +1876,7 @@ class X509StoreContext(object):
return stack
- def _init(self):
+ def _init(self) -> None:
"""
Set up the store context for a subsequent verification operation.
@@ -1839,7 +1889,7 @@ class X509StoreContext(object):
if ret <= 0:
_raise_current_error()
- def _cleanup(self):
+ def _cleanup(self) -> None:
"""
Internally cleans up the store context.
@@ -1847,7 +1897,7 @@ class X509StoreContext(object):
"""
_lib.X509_STORE_CTX_cleanup(self._store_ctx)
- def _exception_from_context(self):
+ def _exception_from_context(self) -> X509StoreContextError:
"""
Convert an OpenSSL native context error failure into a Python
exception.
@@ -1855,25 +1905,24 @@ class X509StoreContext(object):
When a call to native OpenSSL X509_verify_cert fails, additional
information about the failure can be obtained from the store context.
"""
+ message = _ffi.string(
+ _lib.X509_verify_cert_error_string(
+ _lib.X509_STORE_CTX_get_error(self._store_ctx)
+ )
+ ).decode("utf-8")
errors = [
_lib.X509_STORE_CTX_get_error(self._store_ctx),
_lib.X509_STORE_CTX_get_error_depth(self._store_ctx),
- _native(
- _ffi.string(
- _lib.X509_verify_cert_error_string(
- _lib.X509_STORE_CTX_get_error(self._store_ctx)
- )
- )
- ),
+ message,
]
# A context error should always be associated with a certificate, so we
# expect this call to never return :class:`None`.
_x509 = _lib.X509_STORE_CTX_get_current_cert(self._store_ctx)
_cert = _lib.X509_dup(_x509)
pycert = X509._from_raw_x509_ptr(_cert)
- return X509StoreContextError(errors, pycert)
+ return X509StoreContextError(message, errors, pycert)
- def set_store(self, store):
+ def set_store(self, store: X509Store) -> None:
"""
Set the context's X.509 store.
@@ -1884,7 +1933,7 @@ class X509StoreContext(object):
"""
self._store = store
- def verify_certificate(self):
+ def verify_certificate(self) -> None:
"""
Verify a certificate in a context.
@@ -1906,7 +1955,7 @@ class X509StoreContext(object):
if ret <= 0:
raise self._exception_from_context()
- def get_verified_chain(self):
+ def get_verified_chain(self) -> List[X509]:
"""
Verify a certificate in a context and return the complete validated
chain.
@@ -1946,7 +1995,7 @@ class X509StoreContext(object):
return result
-def load_certificate(type, buffer):
+def load_certificate(type: int, buffer: bytes) -> X509:
"""
Load a certificate (X509) from the string *buffer* encoded with the
type *type*.
@@ -1957,7 +2006,7 @@ def load_certificate(type, buffer):
:return: The X509 object
"""
- if isinstance(buffer, _text_type):
+ if isinstance(buffer, str):
buffer = buffer.encode("ascii")
bio = _new_mem_buf(buffer)
@@ -1975,7 +2024,7 @@ def load_certificate(type, buffer):
return X509._from_raw_x509_ptr(x509)
-def dump_certificate(type, cert):
+def dump_certificate(type: int, cert: X509) -> bytes:
"""
Dump the certificate *cert* into a buffer string encoded with the type
*type*.
@@ -2003,7 +2052,7 @@ def dump_certificate(type, cert):
return _bio_to_string(bio)
-def dump_publickey(type, pkey):
+def dump_publickey(type: int, pkey: PKey) -> bytes:
"""
Dump a public key to a buffer.
@@ -2028,7 +2077,12 @@ def dump_publickey(type, pkey):
return _bio_to_string(bio)
-def dump_privatekey(type, pkey, cipher=None, passphrase=None):
+def dump_privatekey(
+ type: int,
+ pkey: PKey,
+ cipher: Optional[str] = None,
+ passphrase: Optional[PassphraseCallableT] = None,
+) -> bytes:
"""
Dump the private key *pkey* into a buffer string encoded with the type
*type*. Optionally (if *type* is :const:`FILETYPE_PEM`) encrypting it
@@ -2092,7 +2146,7 @@ def dump_privatekey(type, pkey, cipher=None, passphrase=None):
return _bio_to_string(bio)
-class Revoked(object):
+class Revoked:
"""
A certificate revocation.
"""
@@ -2112,11 +2166,11 @@ class Revoked(object):
# b"removeFromCRL",
]
- def __init__(self):
+ def __init__(self) -> None:
revoked = _lib.X509_REVOKED_new()
self._revoked = _ffi.gc(revoked, _lib.X509_REVOKED_free)
- def set_serial(self, hex_str):
+ def set_serial(self, hex_str: bytes) -> None:
"""
Set the serial number.
@@ -2140,7 +2194,7 @@ class Revoked(object):
)
_lib.X509_REVOKED_set_serialNumber(self._revoked, asn1_serial)
- def get_serial(self):
+ def get_serial(self) -> bytes:
"""
Get the serial number.
@@ -2158,7 +2212,7 @@ class Revoked(object):
_openssl_assert(result >= 0)
return _bio_to_string(bio)
- def _delete_reason(self):
+ def _delete_reason(self) -> None:
for i in range(_lib.X509_REVOKED_get_ext_count(self._revoked)):
ext = _lib.X509_REVOKED_get_ext(self._revoked, i)
obj = _lib.X509_EXTENSION_get_object(ext)
@@ -2167,7 +2221,7 @@ class Revoked(object):
_lib.X509_REVOKED_delete_ext(self._revoked, i)
break
- def set_reason(self, reason):
+ def set_reason(self, reason: Optional[bytes]) -> None:
"""
Set the reason of this revocation.
@@ -2204,7 +2258,7 @@ class Revoked(object):
)
_openssl_assert(add_result == 1)
- def get_reason(self):
+ def get_reason(self) -> Optional[bytes]:
"""
Get the reason of this revocation.
@@ -2230,8 +2284,9 @@ class Revoked(object):
_openssl_assert(print_result != 0)
return _bio_to_string(bio)
+ return None
- def all_reasons(self):
+ def all_reasons(self) -> List[bytes]:
"""
Return a list of all the supported reason strings.
@@ -2243,7 +2298,7 @@ class Revoked(object):
"""
return self._crl_reasons[:]
- def set_rev_date(self, when):
+ def set_rev_date(self, when: bytes) -> None:
"""
Set the revocation timestamp.
@@ -2251,10 +2306,13 @@ class Revoked(object):
as ASN.1 TIME.
:return: ``None``
"""
- dt = _lib.X509_REVOKED_get0_revocationDate(self._revoked)
- return _set_asn1_time(dt, when)
+ revocationDate = _new_asn1_time(when)
+ ret = _lib.X509_REVOKED_set_revocationDate(
+ self._revoked, revocationDate
+ )
+ _openssl_assert(ret == 1)
- def get_rev_date(self):
+ def get_rev_date(self) -> Optional[bytes]:
"""
Get the revocation timestamp.
@@ -2265,16 +2323,16 @@ class Revoked(object):
return _get_asn1_time(dt)
-class CRL(object):
+class CRL:
"""
A certificate revocation list.
"""
- def __init__(self):
+ def __init__(self) -> None:
crl = _lib.X509_CRL_new()
self._crl = _ffi.gc(crl, _lib.X509_CRL_free)
- def to_cryptography(self):
+ def to_cryptography(self) -> x509.CertificateRevocationList:
"""
Export as a ``cryptography`` CRL.
@@ -2285,12 +2343,12 @@ class CRL(object):
from cryptography.x509 import load_der_x509_crl
der = dump_crl(FILETYPE_ASN1, self)
-
- backend = _get_backend()
- return load_der_x509_crl(der, backend)
+ return load_der_x509_crl(der)
@classmethod
- def from_cryptography(cls, crypto_crl):
+ def from_cryptography(
+ cls, crypto_crl: x509.CertificateRevocationList
+ ) -> "CRL":
"""
Construct based on a ``cryptography`` *crypto_crl*.
@@ -2309,7 +2367,7 @@ class CRL(object):
der = crypto_crl.public_bytes(Encoding.DER)
return load_crl(FILETYPE_ASN1, der)
- def get_revoked(self):
+ def get_revoked(self) -> Optional[Tuple[Revoked, ...]]:
"""
Return the revocations in this certificate revocation list.
@@ -2323,14 +2381,15 @@ class CRL(object):
revoked_stack = _lib.X509_CRL_get_REVOKED(self._crl)
for i in range(_lib.sk_X509_REVOKED_num(revoked_stack)):
revoked = _lib.sk_X509_REVOKED_value(revoked_stack, i)
- revoked_copy = _lib.Cryptography_X509_REVOKED_dup(revoked)
+ revoked_copy = _lib.X509_REVOKED_dup(revoked)
pyrev = Revoked.__new__(Revoked)
pyrev._revoked = _ffi.gc(revoked_copy, _lib.X509_REVOKED_free)
results.append(pyrev)
if results:
return tuple(results)
+ return None
- def add_revoked(self, revoked):
+ def add_revoked(self, revoked: Revoked) -> None:
"""
Add a revoked (by value not reference) to the CRL structure
@@ -2341,13 +2400,13 @@ class CRL(object):
:param Revoked revoked: The new revocation.
:return: ``None``
"""
- copy = _lib.Cryptography_X509_REVOKED_dup(revoked._revoked)
+ copy = _lib.X509_REVOKED_dup(revoked._revoked)
_openssl_assert(copy != _ffi.NULL)
add_result = _lib.X509_CRL_add0_revoked(self._crl, copy)
_openssl_assert(add_result != 0)
- def get_issuer(self):
+ def get_issuer(self) -> X509Name:
"""
Get the CRL's issuer.
@@ -2362,7 +2421,7 @@ class CRL(object):
issuer._name = _issuer
return issuer
- def set_version(self, version):
+ def set_version(self, version: int) -> None:
"""
Set the CRL version.
@@ -2373,10 +2432,7 @@ class CRL(object):
"""
_openssl_assert(_lib.X509_CRL_set_version(self._crl, version) != 0)
- def _set_boundary_time(self, which, when):
- return _set_asn1_time(which(self._crl), when)
-
- def set_lastUpdate(self, when):
+ def set_lastUpdate(self, when: bytes) -> None:
"""
Set when the CRL was last updated.
@@ -2389,9 +2445,11 @@ class CRL(object):
:param bytes when: A timestamp string.
:return: ``None``
"""
- return self._set_boundary_time(_lib.X509_CRL_get_lastUpdate, when)
+ lastUpdate = _new_asn1_time(when)
+ ret = _lib.X509_CRL_set1_lastUpdate(self._crl, lastUpdate)
+ _openssl_assert(ret == 1)
- def set_nextUpdate(self, when):
+ def set_nextUpdate(self, when: bytes) -> None:
"""
Set when the CRL will next be updated.
@@ -2404,9 +2462,11 @@ class CRL(object):
:param bytes when: A timestamp string.
:return: ``None``
"""
- return self._set_boundary_time(_lib.X509_CRL_get_nextUpdate, when)
+ nextUpdate = _new_asn1_time(when)
+ ret = _lib.X509_CRL_set1_nextUpdate(self._crl, nextUpdate)
+ _openssl_assert(ret == 1)
- def sign(self, issuer_cert, issuer_key, digest):
+ def sign(self, issuer_cert: X509, issuer_key: PKey, digest: bytes) -> None:
"""
Sign the CRL.
@@ -2433,8 +2493,13 @@ class CRL(object):
_openssl_assert(result != 0)
def export(
- self, cert, key, type=FILETYPE_PEM, days=100, digest=_UNSPECIFIED
- ):
+ self,
+ cert: X509,
+ key: PKey,
+ type: int = FILETYPE_PEM,
+ days: int = 100,
+ digest: bytes = _UNSPECIFIED, # type: ignore
+ ) -> bytes:
"""
Export the CRL as a string.
@@ -2462,23 +2527,26 @@ class CRL(object):
if digest_obj == _ffi.NULL:
raise ValueError("No such digest method")
- bio = _lib.BIO_new(_lib.BIO_s_mem())
- _openssl_assert(bio != _ffi.NULL)
-
# A scratch time object to give different values to different CRL
# fields
sometime = _lib.ASN1_TIME_new()
_openssl_assert(sometime != _ffi.NULL)
+ sometime = _ffi.gc(sometime, _lib.ASN1_TIME_free)
- _lib.X509_gmtime_adj(sometime, 0)
- _lib.X509_CRL_set_lastUpdate(self._crl, sometime)
+ ret = _lib.X509_gmtime_adj(sometime, 0)
+ _openssl_assert(ret != _ffi.NULL)
+ ret = _lib.X509_CRL_set1_lastUpdate(self._crl, sometime)
+ _openssl_assert(ret == 1)
- _lib.X509_gmtime_adj(sometime, days * 24 * 60 * 60)
- _lib.X509_CRL_set_nextUpdate(self._crl, sometime)
+ ret = _lib.X509_gmtime_adj(sometime, days * 24 * 60 * 60)
+ _openssl_assert(ret != _ffi.NULL)
+ ret = _lib.X509_CRL_set1_nextUpdate(self._crl, sometime)
+ _openssl_assert(ret == 1)
- _lib.X509_CRL_set_issuer_name(
+ ret = _lib.X509_CRL_set_issuer_name(
self._crl, _lib.X509_get_subject_name(cert._x509)
)
+ _openssl_assert(ret == 1)
sign_result = _lib.X509_CRL_sign(self._crl, key._pkey, digest_obj)
if not sign_result:
@@ -2487,8 +2555,11 @@ class CRL(object):
return dump_crl(type, self)
-class PKCS7(object):
- def type_is_signed(self):
+class PKCS7:
+
+ _pkcs7: Any
+
+ def type_is_signed(self) -> bool:
"""
Check if this NID_pkcs7_signed object
@@ -2496,7 +2567,7 @@ class PKCS7(object):
"""
return bool(_lib.PKCS7_type_is_signed(self._pkcs7))
- def type_is_enveloped(self):
+ def type_is_enveloped(self) -> bool:
"""
Check if this NID_pkcs7_enveloped object
@@ -2504,7 +2575,7 @@ class PKCS7(object):
"""
return bool(_lib.PKCS7_type_is_enveloped(self._pkcs7))
- def type_is_signedAndEnveloped(self):
+ def type_is_signedAndEnveloped(self) -> bool:
"""
Check if this NID_pkcs7_signedAndEnveloped object
@@ -2512,7 +2583,7 @@ class PKCS7(object):
"""
return bool(_lib.PKCS7_type_is_signedAndEnveloped(self._pkcs7))
- def type_is_data(self):
+ def type_is_data(self) -> bool:
"""
Check if this NID_pkcs7_data object
@@ -2520,7 +2591,7 @@ class PKCS7(object):
"""
return bool(_lib.PKCS7_type_is_data(self._pkcs7))
- def get_type_name(self):
+ def get_type_name(self) -> str:
"""
Returns the type name of the PKCS7 structure
@@ -2531,18 +2602,18 @@ class PKCS7(object):
return _ffi.string(string_type)
-class PKCS12(object):
+class PKCS12:
"""
A PKCS #12 archive.
"""
- def __init__(self):
- self._pkey = None
- self._cert = None
- self._cacerts = None
- self._friendlyname = None
+ def __init__(self) -> None:
+ self._pkey: Optional[PKey] = None
+ self._cert: Optional[X509] = None
+ self._cacerts: Optional[List[X509]] = None
+ self._friendlyname: Optional[bytes] = None
- def get_certificate(self):
+ def get_certificate(self) -> Optional[X509]:
"""
Get the certificate in the PKCS #12 structure.
@@ -2551,7 +2622,7 @@ class PKCS12(object):
"""
return self._cert
- def set_certificate(self, cert):
+ def set_certificate(self, cert: X509) -> None:
"""
Set the certificate in the PKCS #12 structure.
@@ -2564,7 +2635,7 @@ class PKCS12(object):
raise TypeError("cert must be an X509 instance")
self._cert = cert
- def get_privatekey(self):
+ def get_privatekey(self) -> Optional[PKey]:
"""
Get the private key in the PKCS #12 structure.
@@ -2573,7 +2644,7 @@ class PKCS12(object):
"""
return self._pkey
- def set_privatekey(self, pkey):
+ def set_privatekey(self, pkey: PKey) -> None:
"""
Set the certificate portion of the PKCS #12 structure.
@@ -2586,7 +2657,7 @@ class PKCS12(object):
raise TypeError("pkey must be a PKey instance")
self._pkey = pkey
- def get_ca_certificates(self):
+ def get_ca_certificates(self) -> Optional[Tuple[X509, ...]]:
"""
Get the CA certificates in the PKCS #12 structure.
@@ -2596,8 +2667,9 @@ class PKCS12(object):
"""
if self._cacerts is not None:
return tuple(self._cacerts)
+ return None
- def set_ca_certificates(self, cacerts):
+ def set_ca_certificates(self, cacerts: Optional[Iterable[X509]]) -> None:
"""
Replace or set the CA certificates within the PKCS12 object.
@@ -2618,7 +2690,7 @@ class PKCS12(object):
)
self._cacerts = cacerts
- def set_friendlyname(self, name):
+ def set_friendlyname(self, name: Optional[bytes]) -> None:
"""
Set the friendly name in the PKCS #12 structure.
@@ -2635,7 +2707,7 @@ class PKCS12(object):
)
self._friendlyname = name
- def get_friendlyname(self):
+ def get_friendlyname(self) -> Optional[bytes]:
"""
Get the friendly name in the PKCS# 12 structure.
@@ -2644,7 +2716,12 @@ class PKCS12(object):
"""
return self._friendlyname
- def export(self, passphrase=None, iter=2048, maciter=1):
+ def export(
+ self,
+ passphrase: Optional[bytes] = None,
+ iter: int = 2048,
+ maciter: int = 1,
+ ) -> bytes:
"""
Dump a PKCS12 object as a string.
@@ -2712,16 +2789,16 @@ class PKCS12(object):
return _bio_to_string(bio)
-class NetscapeSPKI(object):
+class NetscapeSPKI:
"""
A Netscape SPKI object.
"""
- def __init__(self):
+ def __init__(self) -> None:
spki = _lib.NETSCAPE_SPKI_new()
self._spki = _ffi.gc(spki, _lib.NETSCAPE_SPKI_free)
- def sign(self, pkey, digest):
+ def sign(self, pkey: PKey, digest: str) -> None:
"""
Sign the certificate request with this key and digest type.
@@ -2729,7 +2806,7 @@ class NetscapeSPKI(object):
:type pkey: :py:class:`PKey`
:param digest: The message digest to use.
- :type digest: :py:class:`bytes`
+ :type digest: :py:class:`str`
:return: ``None``
"""
@@ -2748,7 +2825,7 @@ class NetscapeSPKI(object):
)
_openssl_assert(sign_result > 0)
- def verify(self, key):
+ def verify(self, key: PKey) -> bool:
"""
Verifies a signature on a certificate request.
@@ -2765,7 +2842,7 @@ class NetscapeSPKI(object):
_raise_current_error()
return True
- def b64_encode(self):
+ def b64_encode(self) -> bytes:
"""
Generate a base64 encoded representation of this SPKI object.
@@ -2777,7 +2854,7 @@ class NetscapeSPKI(object):
_lib.OPENSSL_free(encoded)
return result
- def get_pubkey(self):
+ def get_pubkey(self) -> PKey:
"""
Get the public key of this certificate.
@@ -2791,7 +2868,7 @@ class NetscapeSPKI(object):
pkey._only_public = True
return pkey
- def set_pubkey(self, pkey):
+ def set_pubkey(self, pkey: PKey) -> None:
"""
Set the public key of the certificate
@@ -2802,8 +2879,14 @@ class NetscapeSPKI(object):
_openssl_assert(set_result == 1)
-class _PassphraseHelper(object):
- def __init__(self, type, passphrase, more_args=False, truncate=False):
+class _PassphraseHelper:
+ def __init__(
+ self,
+ type: int,
+ passphrase: Optional[PassphraseCallableT],
+ more_args: bool = False,
+ truncate: bool = False,
+ ) -> None:
if type != FILETYPE_PEM and passphrase is not None:
raise ValueError(
"only FILETYPE_PEM key format supports encryption"
@@ -2811,10 +2894,10 @@ class _PassphraseHelper(object):
self._passphrase = passphrase
self._more_args = more_args
self._truncate = truncate
- self._problems = []
+ self._problems: List[Exception] = []
@property
- def callback(self):
+ def callback(self) -> Any:
if self._passphrase is None:
return _ffi.NULL
elif isinstance(self._passphrase, bytes) or callable(self._passphrase):
@@ -2825,7 +2908,7 @@ class _PassphraseHelper(object):
)
@property
- def callback_args(self):
+ def callback_args(self) -> Any:
if self._passphrase is None:
return _ffi.NULL
elif isinstance(self._passphrase, bytes) or callable(self._passphrase):
@@ -2835,7 +2918,7 @@ class _PassphraseHelper(object):
"Last argument must be a byte string or a callable."
)
- def raise_if_problem(self, exceptionType=Error):
+ def raise_if_problem(self, exceptionType: Type[Exception] = Error) -> None:
if self._problems:
# Flush the OpenSSL error queue
@@ -2846,7 +2929,9 @@ class _PassphraseHelper(object):
raise self._problems.pop(0)
- def _read_passphrase(self, buf, size, rwflag, userdata):
+ def _read_passphrase(
+ self, buf: Any, size: int, rwflag: Any, userdata: Any
+ ) -> int:
try:
if callable(self._passphrase):
if self._more_args:
@@ -2854,6 +2939,7 @@ class _PassphraseHelper(object):
else:
result = self._passphrase(rwflag)
else:
+ assert self._passphrase is not None
result = self._passphrase
if not isinstance(result, bytes):
raise ValueError("Bytes expected")
@@ -2872,7 +2958,7 @@ class _PassphraseHelper(object):
return 0
-def load_publickey(type, buffer):
+def load_publickey(type: int, buffer: Union[str, bytes]) -> PKey:
"""
Load a public key from a buffer.
@@ -2883,7 +2969,7 @@ def load_publickey(type, buffer):
:return: The PKey object.
:rtype: :class:`PKey`
"""
- if isinstance(buffer, _text_type):
+ if isinstance(buffer, str):
buffer = buffer.encode("ascii")
bio = _new_mem_buf(buffer)
@@ -2906,7 +2992,11 @@ def load_publickey(type, buffer):
return pkey
-def load_privatekey(type, buffer, passphrase=None):
+def load_privatekey(
+ type: int,
+ buffer: Union[str, bytes],
+ passphrase: Optional[PassphraseCallableT] = None,
+) -> PKey:
"""
Load a private key (PKey) from the string *buffer* encoded with the type
*type*.
@@ -2919,7 +3009,7 @@ def load_privatekey(type, buffer, passphrase=None):
:return: The PKey object
"""
- if isinstance(buffer, _text_type):
+ if isinstance(buffer, str):
buffer = buffer.encode("ascii")
bio = _new_mem_buf(buffer)
@@ -2943,7 +3033,7 @@ def load_privatekey(type, buffer, passphrase=None):
return pkey
-def dump_certificate_request(type, req):
+def dump_certificate_request(type: int, req: X509Req) -> bytes:
"""
Dump the certificate request *req* into a buffer string encoded with the
type *type*.
@@ -2971,7 +3061,7 @@ def dump_certificate_request(type, req):
return _bio_to_string(bio)
-def load_certificate_request(type, buffer):
+def load_certificate_request(type: int, buffer: bytes) -> X509Req:
"""
Load a certificate request (X509Req) from the string *buffer* encoded with
the type *type*.
@@ -2980,7 +3070,7 @@ def load_certificate_request(type, buffer):
:param buffer: The buffer the certificate request is stored in
:return: The X509Req object
"""
- if isinstance(buffer, _text_type):
+ if isinstance(buffer, str):
buffer = buffer.encode("ascii")
bio = _new_mem_buf(buffer)
@@ -2999,7 +3089,7 @@ def load_certificate_request(type, buffer):
return x509req
-def sign(pkey, data, digest):
+def sign(pkey: PKey, data: Union[str, bytes], digest: str) -> bytes:
"""
Sign a data string using the given key and message digest.
@@ -3016,8 +3106,8 @@ def sign(pkey, data, digest):
if digest_obj == _ffi.NULL:
raise ValueError("No such digest method")
- md_ctx = _lib.Cryptography_EVP_MD_CTX_new()
- md_ctx = _ffi.gc(md_ctx, _lib.Cryptography_EVP_MD_CTX_free)
+ md_ctx = _lib.EVP_MD_CTX_new()
+ md_ctx = _ffi.gc(md_ctx, _lib.EVP_MD_CTX_free)
_lib.EVP_SignInit(md_ctx, digest_obj)
_lib.EVP_SignUpdate(md_ctx, data, len(data))
@@ -3034,7 +3124,9 @@ def sign(pkey, data, digest):
return _ffi.buffer(signature_buffer, signature_length[0])[:]
-def verify(cert, signature, data, digest):
+def verify(
+ cert: X509, signature: bytes, data: Union[str, bytes], digest: str
+) -> None:
"""
Verify the signature for a data string.
@@ -3057,8 +3149,8 @@ def verify(cert, signature, data, digest):
_openssl_assert(pkey != _ffi.NULL)
pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free)
- md_ctx = _lib.Cryptography_EVP_MD_CTX_new()
- md_ctx = _ffi.gc(md_ctx, _lib.Cryptography_EVP_MD_CTX_free)
+ md_ctx = _lib.EVP_MD_CTX_new()
+ md_ctx = _ffi.gc(md_ctx, _lib.EVP_MD_CTX_free)
_lib.EVP_VerifyInit(md_ctx, digest_obj)
_lib.EVP_VerifyUpdate(md_ctx, data, len(data))
@@ -3070,7 +3162,7 @@ def verify(cert, signature, data, digest):
_raise_current_error()
-def dump_crl(type, crl):
+def dump_crl(type: int, crl: CRL) -> bytes:
"""
Dump a certificate revocation list to a buffer.
@@ -3099,7 +3191,7 @@ def dump_crl(type, crl):
return _bio_to_string(bio)
-def load_crl(type, buffer):
+def load_crl(type: int, buffer: Union[str, bytes]) -> CRL:
"""
Load Certificate Revocation List (CRL) data from a string *buffer*.
*buffer* encoded with the type *type*.
@@ -3107,9 +3199,9 @@ def load_crl(type, buffer):
:param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
:param buffer: The buffer the CRL is stored in
- :return: The PKey object
+ :return: The CRL object
"""
- if isinstance(buffer, _text_type):
+ if isinstance(buffer, str):
buffer = buffer.encode("ascii")
bio = _new_mem_buf(buffer)
@@ -3129,7 +3221,7 @@ def load_crl(type, buffer):
return result
-def load_pkcs7_data(type, buffer):
+def load_pkcs7_data(type: int, buffer: Union[str, bytes]) -> PKCS7:
"""
Load pkcs7 data from the string *buffer* encoded with the type
*type*.
@@ -3138,7 +3230,7 @@ def load_pkcs7_data(type, buffer):
:param buffer: The buffer with the pkcs7 data.
:return: The PKCS7 object
"""
- if isinstance(buffer, _text_type):
+ if isinstance(buffer, str):
buffer = buffer.encode("ascii")
bio = _new_mem_buf(buffer)
@@ -3158,7 +3250,7 @@ def load_pkcs7_data(type, buffer):
return pypkcs7
-load_pkcs7_data = utils.deprecated(
+utils.deprecated(
load_pkcs7_data,
__name__,
(
@@ -3166,10 +3258,13 @@ load_pkcs7_data = utils.deprecated(
"in cryptography."
),
DeprecationWarning,
+ name="load_pkcs7_data",
)
-def load_pkcs12(buffer, passphrase=None):
+def load_pkcs12(
+ buffer: Union[str, bytes], passphrase: Optional[bytes] = None
+) -> PKCS12:
"""
Load pkcs12 data from the string *buffer*. If the pkcs12 structure is
encrypted, a *passphrase* must be included. The MAC is always
@@ -3183,7 +3278,7 @@ def load_pkcs12(buffer, passphrase=None):
"""
passphrase = _text_to_bytes_and_warn("passphrase", passphrase)
- if isinstance(buffer, _text_type):
+ if isinstance(buffer, str):
buffer = buffer.encode("ascii")
bio = _new_mem_buf(buffer)
@@ -3245,18 +3340,16 @@ def load_pkcs12(buffer, passphrase=None):
x509 = _lib.sk_X509_value(cacerts, i)
pycacert = X509._from_raw_x509_ptr(x509)
pycacerts.append(pycacert)
- if not pycacerts:
- pycacerts = None
pkcs12 = PKCS12.__new__(PKCS12)
pkcs12._pkey = pykey
pkcs12._cert = pycert
- pkcs12._cacerts = pycacerts
+ pkcs12._cacerts = pycacerts if pycacerts else None
pkcs12._friendlyname = friendlyname
return pkcs12
-load_pkcs12 = utils.deprecated(
+utils.deprecated(
load_pkcs12,
__name__,
(
@@ -3264,25 +3357,5 @@ load_pkcs12 = utils.deprecated(
"in cryptography."
),
DeprecationWarning,
+ name="load_pkcs12",
)
-
-
-# There are no direct unit tests for this initialization. It is tested
-# indirectly since it is necessary for functions like dump_privatekey when
-# using encryption.
-#
-# Thus OpenSSL.test.test_crypto.FunctionTests.test_dump_privatekey_passphrase
-# and some other similar tests may fail without this (though they may not if
-# the Python runtime has already done some initialization of the underlying
-# OpenSSL library (and is linked against the same one that cryptography is
-# using)).
-_lib.OpenSSL_add_all_algorithms()
-
-# This is similar but exercised mainly by exception_from_error_queue. It calls
-# both ERR_load_crypto_strings() and ERR_load_SSL_strings().
-_lib.SSL_load_error_strings()
-
-
-# Set the default string mask to match OpenSSL upstream (since 2005) and
-# RFC5280 recommendations.
-_lib.ASN1_STRING_set_default_mask_asc(b"utf8only")
diff --git a/contrib/python/pyOpenSSL/py3/OpenSSL/debug.py b/contrib/python/pyOpenSSL/py3/OpenSSL/debug.py
index 04521d59226..e39b128a7e2 100644
--- a/contrib/python/pyOpenSSL/py3/OpenSSL/debug.py
+++ b/contrib/python/pyOpenSSL/py3/OpenSSL/debug.py
@@ -3,14 +3,16 @@ from __future__ import print_function
import ssl
import sys
-import OpenSSL.SSL
import cffi
+
import cryptography
+import OpenSSL.SSL
+
from . import version
-_env_info = u"""\
+_env_info = """\
pyOpenSSL: {pyopenssl}
cryptography: {cryptography}
cffi: {cffi}
diff --git a/contrib/python/pyOpenSSL/py3/OpenSSL/rand.py b/contrib/python/pyOpenSSL/py3/OpenSSL/rand.py
index d2c17673e5e..a4aa7210dda 100644
--- a/contrib/python/pyOpenSSL/py3/OpenSSL/rand.py
+++ b/contrib/python/pyOpenSSL/py3/OpenSSL/rand.py
@@ -5,7 +5,7 @@ PRNG management routines, thin wrappers.
from OpenSSL._util import lib as _lib
-def add(buffer, entropy):
+def add(buffer: bytes, entropy: int) -> None:
"""
Mix bytes from *string* into the PRNG state.
@@ -31,7 +31,7 @@ def add(buffer, entropy):
_lib.RAND_add(buffer, len(buffer), entropy)
-def status():
+def status() -> int:
"""
Check whether the PRNG has been seeded with enough data.
diff --git a/contrib/python/pyOpenSSL/py3/OpenSSL/version.py b/contrib/python/pyOpenSSL/py3/OpenSSL/version.py
index c6fcecb0774..61ec17b2d8f 100644
--- a/contrib/python/pyOpenSSL/py3/OpenSSL/version.py
+++ b/contrib/python/pyOpenSSL/py3/OpenSSL/version.py
@@ -17,7 +17,7 @@ __all__ = [
"__version__",
]
-__version__ = "21.0.0"
+__version__ = "23.0.0"
__title__ = "pyOpenSSL"
__uri__ = "https://pyopenssl.org/"
@@ -25,4 +25,4 @@ __summary__ = "Python wrapper module around the OpenSSL library"
__author__ = "The pyOpenSSL developers"
__email__ = "[email protected]"
__license__ = "Apache License, Version 2.0"
-__copyright__ = "Copyright 2001-2020 {0}".format(__author__)
+__copyright__ = "Copyright 2001-2023 {0}".format(__author__)
diff --git a/contrib/python/pyOpenSSL/py3/README.rst b/contrib/python/pyOpenSSL/py3/README.rst
index a628c8aea9f..4fecb9cc147 100644
--- a/contrib/python/pyOpenSSL/py3/README.rst
+++ b/contrib/python/pyOpenSSL/py3/README.rst
@@ -36,7 +36,7 @@ If you run into bugs, you can file them in our `issue tracker`_.
We maintain a cryptography-dev_ mailing list for both user and development discussions.
-You can also join ``#cryptography-dev`` on Freenode to ask questions or get involved.
+You can also join ``#pyca`` on ``irc.libera.chat`` to ask questions or get involved.
.. _documentation: https://pyopenssl.org/
diff --git a/contrib/python/pyOpenSSL/py3/patches/01-fix-tests.patch b/contrib/python/pyOpenSSL/py3/patches/01-fix-tests.patch
index ab5a02e23c6..2f82d67bebe 100644
--- a/contrib/python/pyOpenSSL/py3/patches/01-fix-tests.patch
+++ b/contrib/python/pyOpenSSL/py3/patches/01-fix-tests.patch
@@ -9,11 +9,3 @@
"""
`Context.set_default_verify_paths` causes the platform-specific CA
certificate locations to be used for verification purposes.
-@@ -1927,6 +1927,7 @@ class TestApplicationLayerProtoNegotiation(object):
- assert server.get_alpn_proto_negotiated() == b"spdy/2"
- assert client.get_alpn_proto_negotiated() == b"spdy/2"
-
-+ @pytest.mark.xfail(reason='https://github.com/pyca/pyopenssl/issues/1043')
- def test_alpn_call_failure(self):
- """
- SSL_CTX_set_alpn_protos does not like to be called with an empty
diff --git a/contrib/python/pyOpenSSL/py3/tests/memdbg.py b/contrib/python/pyOpenSSL/py3/tests/memdbg.py
index 590b72d0682..0dd8c318e4e 100644
--- a/contrib/python/pyOpenSSL/py3/tests/memdbg.py
+++ b/contrib/python/pyOpenSSL/py3/tests/memdbg.py
@@ -1,5 +1,4 @@
import sys
-
import traceback
from cffi import api as _api
diff --git a/contrib/python/pyOpenSSL/py3/tests/test_crypto.py b/contrib/python/pyOpenSSL/py3/tests/test_crypto.py
index ef3429d17fd..44bbd0f2aba 100644
--- a/contrib/python/pyOpenSSL/py3/tests/test_crypto.py
+++ b/contrib/python/pyOpenSSL/py3/tests/test_crypto.py
@@ -4,56 +4,65 @@
"""
Unit tests for :py:mod:`OpenSSL.crypto`.
"""
-
-from warnings import simplefilter
-
import base64
-from subprocess import PIPE, Popen
-from datetime import datetime, timedelta
import sys
-
-import pytest
+from datetime import datetime, timedelta
+from subprocess import PIPE, Popen
+from warnings import simplefilter
from cryptography import x509
-from cryptography.hazmat.backends.openssl.backend import backend
from cryptography.hazmat.primitives import serialization
-from cryptography.hazmat.primitives.asymmetric import rsa
+from cryptography.hazmat.primitives.asymmetric import ec, ed25519, ed448, rsa
import flaky
-from OpenSSL.crypto import TYPE_RSA, TYPE_DSA, Error, PKey
-from OpenSSL.crypto import X509, X509Name
+import pytest
+
+from OpenSSL._util import ffi as _ffi, lib as _lib
from OpenSSL.crypto import (
+ CRL,
+ Error,
+ FILETYPE_ASN1,
+ FILETYPE_PEM,
+ FILETYPE_TEXT,
+ NetscapeSPKI,
+ PKCS12,
+ PKCS7,
+ PKey,
+ Revoked,
+ TYPE_DSA,
+ TYPE_RSA,
+ X509,
+ X509Extension,
+ X509Name,
+ X509Req,
X509Store,
- X509StoreFlags,
X509StoreContext,
X509StoreContextError,
-)
-from OpenSSL.crypto import X509Req
-from OpenSSL.crypto import X509Extension
-from OpenSSL.crypto import load_certificate, load_privatekey
-from OpenSSL.crypto import load_publickey, dump_publickey
-from OpenSSL.crypto import FILETYPE_PEM, FILETYPE_ASN1, FILETYPE_TEXT
-from OpenSSL.crypto import dump_certificate, load_certificate_request
-from OpenSSL.crypto import dump_certificate_request, dump_privatekey
-from OpenSSL.crypto import PKCS7, load_pkcs7_data
-from OpenSSL.crypto import PKCS12, load_pkcs12
-from OpenSSL.crypto import CRL, Revoked, dump_crl, load_crl
-from OpenSSL.crypto import NetscapeSPKI
-from OpenSSL.crypto import (
- sign,
- verify,
+ X509StoreFlags,
+ dump_certificate,
+ dump_certificate_request,
+ dump_crl,
+ dump_privatekey,
+ dump_publickey,
get_elliptic_curve,
get_elliptic_curves,
+ load_certificate,
+ load_certificate_request,
+ load_crl,
+ load_pkcs12,
+ load_pkcs7_data,
+ load_privatekey,
+ load_publickey,
+ sign,
+ verify,
)
-from OpenSSL._util import ffi as _ffi, lib as _lib
-
from .util import (
EqualityTestsMixin,
- is_consistent_type,
- WARNING_TYPE_EXPECTED,
NON_ASCII,
+ WARNING_TYPE_EXPECTED,
+ is_consistent_type,
)
@@ -64,7 +73,7 @@ def normalize_privatekey_pem(pem):
GOOD_CIPHER = "blowfish"
BAD_CIPHER = "zippers"
-GOOD_DIGEST = "SHA1"
+GOOD_DIGEST = "SHA256"
BAD_DIGEST = "monkeys"
old_root_cert_pem = b"""-----BEGIN CERTIFICATE-----
@@ -609,9 +618,10 @@ vrzEeLDRiiPl92dyyWmu
-----END X509 CRL-----
"""
+# The signature on this CRL is invalid.
crlDataUnsupportedExtension = b"""\
-----BEGIN X509 CRL-----
-MIIGRzCCBS8CAQIwDQYJKoZIhvcNAQELBQAwJzELMAkGA1UEBhMCVVMxGDAWBgNV
+MIIGRzCCBS8CAQEwDQYJKoZIhvcNAQELBQAwJzELMAkGA1UEBhMCVVMxGDAWBgNV
BAMMD2NyeXB0b2dyYXBoeS5pbxgPMjAxNTAxMDEwMDAwMDBaGA8yMDE2MDEwMTAw
MDAwMFowggTOMBQCAQAYDzIwMTUwMTAxMDAwMDAwWjByAgEBGA8yMDE1MDEwMTAw
MDAwMFowXDAYBgNVHRgEERgPMjAxNTAxMDEwMDAwMDBaMDQGA1UdHQQtMCukKTAn
@@ -783,6 +793,21 @@ MBsCAQACAS0CAQcCAQACAQ8CAQMCAQACAQACAQA=
-----END RSA PRIVATE KEY-----
"""
+ed25519_private_key_pem = b"""-----BEGIN PRIVATE KEY-----
+MC4CAQAwBQYDK2VwBCIEIKlxBbhVsSURoLTmsu9uTqYH6oF7zpxmp1ZQCAPhDmI2
+-----END PRIVATE KEY-----
+"""
+
+ed448_private_key_pem = b"""-----BEGIN PRIVATE KEY-----
+MEcCAQAwBQYDK2VxBDsEOcqZ7a3k6JwrJbYO8CNTPT/d7dlWCo5vCf0EYDj79ZvA\nhD8u9EPHlYJw5Y8ZQdH4WmVEfpKA23xkdQ==
+-----END PRIVATE KEY-----
+"""
+
+x25519_private_key_pem = b"""-----BEGIN PRIVATE KEY-----
+MC4CAQAwBQYDK2VuBCIEIPAjVfPNTm25VxtBRg+JjjFx9tA3M8aaBdVhjb92iBts
+-----END PRIVATE KEY-----
+"""
+
@pytest.fixture
def x509_data():
@@ -809,7 +834,7 @@ def x509_data():
yield pkey, x509
-class TestX509Ext(object):
+class TestX509Ext:
"""
Tests for `OpenSSL.crypto.X509Extension`.
"""
@@ -914,7 +939,7 @@ class TestX509Ext(object):
b"basicConstraints", False, b"CA:TRUE", subject=x509
)
x509.add_extensions([ext1])
- x509.sign(pkey, "sha1")
+ x509.sign(pkey, "sha256")
# This is a little lame. Can we think of a better way?
text = dump_certificate(FILETYPE_TEXT, x509)
assert b"X509v3 Basic Constraints:" in text
@@ -930,7 +955,7 @@ class TestX509Ext(object):
b"subjectKeyIdentifier", False, b"hash", subject=x509
)
x509.add_extensions([ext3])
- x509.sign(pkey, "sha1")
+ x509.sign(pkey, "sha256")
text = dump_certificate(FILETYPE_TEXT, x509)
assert b"X509v3 Subject Key Identifier:" in text
@@ -963,7 +988,7 @@ class TestX509Ext(object):
b"basicConstraints", False, b"CA:TRUE", issuer=x509
)
x509.add_extensions([ext1])
- x509.sign(pkey, "sha1")
+ x509.sign(pkey, "sha256")
text = dump_certificate(FILETYPE_TEXT, x509)
assert b"X509v3 Basic Constraints:" in text
assert b"CA:TRUE" in text
@@ -978,7 +1003,7 @@ class TestX509Ext(object):
b"authorityKeyIdentifier", False, b"issuer:always", issuer=x509
)
x509.add_extensions([ext2])
- x509.sign(pkey, "sha1")
+ x509.sign(pkey, "sha256")
text = dump_certificate(FILETYPE_TEXT, x509)
assert b"X509v3 Authority Key Identifier:" in text
assert b"DirName:/CN=Yoda root CA" in text
@@ -1008,22 +1033,40 @@ class TestX509Ext(object):
)
-class TestPKey(object):
+class TestPKey:
"""
Tests for `OpenSSL.crypto.PKey`.
"""
- def test_convert_from_cryptography_private_key(self):
+ @pytest.mark.parametrize(
+ ("key_string", "key_type"),
+ [
+ (intermediate_key_pem, rsa.RSAPrivateKey),
+ (ec_private_key_pem, ec.EllipticCurvePrivateKey),
+ (ed25519_private_key_pem, ed25519.Ed25519PrivateKey),
+ (ed448_private_key_pem, ed448.Ed448PrivateKey),
+ ],
+ )
+ def test_convert_roundtrip_cryptography_private_key(
+ self, key_string, key_type
+ ):
"""
PKey.from_cryptography_key creates a proper private PKey.
+ PKey.to_cryptography_key creates a proper cryptography private key.
"""
- key = serialization.load_pem_private_key(
- intermediate_key_pem, None, backend
- )
+ key = serialization.load_pem_private_key(key_string, None)
pkey = PKey.from_cryptography_key(key)
assert isinstance(pkey, PKey)
- assert pkey.bits() == key.key_size
+ parsed_key = pkey.to_cryptography_key()
+ assert isinstance(parsed_key, key_type)
+ assert parsed_key.public_key().public_bytes(
+ serialization.Encoding.PEM,
+ serialization.PublicFormat.SubjectPublicKeyInfo,
+ ) == key.public_key().public_bytes(
+ serialization.Encoding.PEM,
+ serialization.PublicFormat.SubjectPublicKeyInfo,
+ )
assert pkey._only_public is False
assert pkey._initialized is True
@@ -1031,7 +1074,7 @@ class TestPKey(object):
"""
PKey.from_cryptography_key creates a proper public PKey.
"""
- key = serialization.load_pem_public_key(cleartextPublicKeyPEM, backend)
+ key = serialization.load_pem_public_key(cleartextPublicKeyPEM)
pkey = PKey.from_cryptography_key(key)
assert isinstance(pkey, PKey)
@@ -1043,9 +1086,7 @@ class TestPKey(object):
"""
PKey.from_cryptography_key raises TypeError with an unsupported type.
"""
- key = serialization.load_pem_private_key(
- ec_private_key_pem, None, backend
- )
+ key = serialization.load_pem_private_key(x25519_private_key_pem, None)
with pytest.raises(TypeError):
PKey.from_cryptography_key(key)
@@ -1059,16 +1100,6 @@ class TestPKey(object):
assert isinstance(key, rsa.RSAPublicKey)
assert pkey.bits() == key.key_size
- def test_convert_private_pkey_to_cryptography_key(self):
- """
- PKey.to_cryptography_key creates a proper cryptography private key.
- """
- pkey = load_privatekey(FILETYPE_PEM, root_key_pem)
- key = pkey.to_cryptography_key()
-
- assert isinstance(key, rsa.RSAPrivateKey)
- assert pkey.bits() == key.key_size
-
def test_type(self):
"""
`PKey` can be used to create instances of that type.
@@ -1175,10 +1206,11 @@ class TestPKey(object):
def test_inconsistent_key(self):
"""
- `PKey.check` returns `Error` if the key is not consistent.
+ Either `load_privatekey` or `PKey.check` returns `Error` if the key is
+ not consistent.
"""
- key = load_privatekey(FILETYPE_PEM, inconsistentPrivateKeyPEM)
with pytest.raises(Error):
+ key = load_privatekey(FILETYPE_PEM, inconsistentPrivateKeyPEM)
key.check()
def test_check_public_key(self):
@@ -1197,10 +1229,11 @@ class TestPKey(object):
def test_check_pr_897(self):
"""
- `PKey.check` raises `OpenSSL.crypto.Error` if provided with broken key
+ Either `load_privatekey` or `PKey.check` raises `OpenSSL.crypto.Error`
+ if provided with broken key
"""
- pkey = load_privatekey(FILETYPE_PEM, rsa_p_not_prime_pem)
with pytest.raises(Error):
+ pkey = load_privatekey(FILETYPE_PEM, rsa_p_not_prime_pem)
pkey.check()
@@ -1222,7 +1255,7 @@ def x509_name(**attrs):
return name
-class TestX509Name(object):
+class TestX509Name:
"""
Unit tests for `OpenSSL.crypto.X509Name`.
"""
@@ -1398,6 +1431,19 @@ class TestX509Name(object):
# other X509Name.
assert_greater_than(x509_name(CN="def"), x509_name(CN="abc"))
+ def assert_raises(a, b):
+ with pytest.raises(TypeError):
+ a < b
+ with pytest.raises(TypeError):
+ a <= b
+ with pytest.raises(TypeError):
+ a > b
+ with pytest.raises(TypeError):
+ a >= b
+
+ # Only X509Name objects can be compared with lesser than / greater than
+ assert_raises(x509_name(), object())
+
def test_hash(self):
"""
`X509Name.hash` returns an integer hash based on the value of the name.
@@ -1555,14 +1601,20 @@ class TestX509Req(_PKeyInteractionTestsMixin):
"""
`X509Req.set_version` sets the X.509 version of the certificate
request. `X509Req.get_version` returns the X.509 version of the
- certificate request. The initial value of the version is 0.
+ certificate request. The only defined version is 0. Others may or
+ may not be supported depending on backend.
"""
request = X509Req()
assert request.get_version() == 0
- request.set_version(1)
- assert request.get_version() == 1
- request.set_version(3)
- assert request.get_version() == 3
+ request.set_version(0)
+ assert request.get_version() == 0
+ try:
+ request.set_version(1)
+ assert request.get_version() == 1
+ request.set_version(3)
+ assert request.get_version() == 3
+ except Error:
+ pass
def test_version_wrong_args(self):
"""
@@ -1686,9 +1738,7 @@ class TestX509Req(_PKeyInteractionTestsMixin):
assert request.verify(pkey)
def test_convert_from_cryptography(self):
- crypto_req = x509.load_pem_x509_csr(
- cleartextCertificateRequestPEM, backend
- )
+ crypto_req = x509.load_pem_x509_csr(cleartextCertificateRequestPEM)
req = X509Req.from_cryptography(crypto_req)
assert isinstance(req, X509Req)
@@ -1752,8 +1802,8 @@ class TestX509(_PKeyInteractionTestsMixin):
`X509.get_version` retrieves it.
"""
cert = X509()
- cert.set_version(1234)
- assert cert.get_version() == 1234
+ cert.set_version(2)
+ assert cert.get_version() == 2
def test_serial_number(self):
"""
@@ -1767,12 +1817,12 @@ class TestX509(_PKeyInteractionTestsMixin):
assert certificate.get_serial_number() == 0
certificate.set_serial_number(1)
assert certificate.get_serial_number() == 1
- certificate.set_serial_number(2 ** 32 + 1)
- assert certificate.get_serial_number() == 2 ** 32 + 1
- certificate.set_serial_number(2 ** 64 + 1)
- assert certificate.get_serial_number() == 2 ** 64 + 1
- certificate.set_serial_number(2 ** 128 + 1)
- assert certificate.get_serial_number() == 2 ** 128 + 1
+ certificate.set_serial_number(2**32 + 1)
+ assert certificate.get_serial_number() == 2**32 + 1
+ certificate.set_serial_number(2**64 + 1)
+ assert certificate.get_serial_number() == 2**64 + 1
+ certificate.set_serial_number(2**128 + 1)
+ assert certificate.get_serial_number() == 2**128 + 1
def _setBoundTest(self, which):
"""
@@ -1920,6 +1970,14 @@ class TestX509(_PKeyInteractionTestsMixin):
cert.gmtime_adj_notAfter(2)
assert not cert.has_expired()
+ def test_has_expired_exception(self):
+ """
+ `X509.has_expired` throws ValueError if not-after time is not set
+ """
+ cert = X509()
+ with pytest.raises(ValueError):
+ cert.has_expired()
+
def test_root_has_not_expired(self):
"""
`X509.has_expired` returns `False` if the certificate's not-after time
@@ -1935,13 +1993,13 @@ class TestX509(_PKeyInteractionTestsMixin):
"""
cert = load_certificate(FILETYPE_PEM, old_root_cert_pem)
assert (
- # This is MD5 instead of GOOD_DIGEST because the digest algorithm
- # actually matters to the assertion (ie, another arbitrary, good
- # digest will not product the same digest).
# Digest verified with the command:
- # openssl x509 -in root_cert.pem -noout -fingerprint -md5
- cert.digest("MD5")
- == b"19:B3:05:26:2B:F8:F2:FF:0B:8F:21:07:A8:28:B8:75"
+ # openssl x509 -in root_cert.pem -noout -fingerprint -sha256
+ cert.digest("SHA256")
+ == (
+ b"3E:0F:16:39:6B:B1:3E:4F:08:85:C6:5F:10:0D:CB:2C:"
+ b"25:C2:91:4E:D0:4A:C2:29:06:BD:55:E3:A7:B3:B7:06"
+ )
)
def _extcert(self, pkey, extensions):
@@ -1957,7 +2015,7 @@ class TestX509(_PKeyInteractionTestsMixin):
cert.set_notAfter(when)
cert.add_extensions(extensions)
- cert.sign(pkey, "sha1")
+ cert.sign(pkey, "sha256")
return load_certificate(
FILETYPE_PEM, dump_certificate(FILETYPE_PEM, cert)
)
@@ -2036,8 +2094,8 @@ class TestX509(_PKeyInteractionTestsMixin):
b"DNS:altnull.python.org\x00example.com, "
b"URI:http://null.python.org\x00http://example.org, "
- b"IP Address:192.0.2.1, IP Address:2001:DB8:0:0:0:0:0:1\n"
- == str(ext).encode("ascii")
+ b"IP Address:192.0.2.1, IP Address:2001:DB8:0:0:0:0:0:1"
+ == str(ext).encode("ascii").strip()
)
def test_invalid_digest_algorithm(self):
@@ -2204,9 +2262,7 @@ tgI5
cert.sign(object(), b"sha256")
def test_convert_from_cryptography(self):
- crypto_cert = x509.load_pem_x509_certificate(
- intermediate_cert_pem, backend
- )
+ crypto_cert = x509.load_pem_x509_certificate(intermediate_cert_pem)
cert = X509.from_cryptography(crypto_cert)
assert isinstance(cert, X509)
@@ -2224,7 +2280,7 @@ tgI5
assert crypto_cert.version.value == cert.get_version()
-class TestX509Store(object):
+class TestX509Store:
"""
Test for `OpenSSL.crypto.X509Store`.
"""
@@ -2295,7 +2351,7 @@ class TestX509Store(object):
def test_load_locations_parameters(
self, cafile, capath, call_cafile, call_capath, monkeypatch
):
- class LibMock(object):
+ class LibMock:
def load_locations(self, store, cafile, capath):
self.cafile = cafile
self.capath = capath
@@ -2326,7 +2382,7 @@ class TestX509Store(object):
store.load_locations(cafile=str(invalid_ca_file))
-class TestPKCS12(object):
+class TestPKCS12:
"""
Test for `OpenSSL.crypto.PKCS12` and `OpenSSL.crypto.load_pkcs12`.
"""
@@ -2467,7 +2523,7 @@ class TestPKCS12(object):
b"-nodes",
b"-passin",
b"pass:" + passwd,
- *extra
+ *extra,
)
assert recovered_key[-len(key) :] == key
if cert:
@@ -2479,7 +2535,7 @@ class TestPKCS12(object):
b"-passin",
b"pass:" + passwd,
b"-nokeys",
- *extra
+ *extra,
)
assert recovered_cert[-len(cert) :] == cert
if ca:
@@ -2491,7 +2547,7 @@ class TestPKCS12(object):
b"-passin",
b"pass:" + passwd,
b"-nokeys",
- *extra
+ *extra,
)
assert recovered_cert[-len(ca) :] == ca
@@ -2802,7 +2858,7 @@ def _runopenssl(pem, *args):
return output
-class TestLoadPublicKey(object):
+class TestLoadPublicKey:
"""
Tests for :func:`load_publickey`.
"""
@@ -2842,7 +2898,7 @@ class TestLoadPublicKey(object):
assert dumped_pem == cleartextPublicKeyPEM
-class TestFunction(object):
+class TestFunction:
"""
Tests for free-functions in the `OpenSSL.crypto` module.
"""
@@ -3263,7 +3319,7 @@ class TestFunction(object):
load_pkcs7_data(object(), b"foo")
-class TestLoadCertificate(object):
+class TestLoadCertificate:
"""
Tests for `load_certificate_request`.
"""
@@ -3287,7 +3343,7 @@ class TestLoadCertificate(object):
load_certificate(FILETYPE_ASN1, b"lol")
-class TestPKCS7(object):
+class TestPKCS7:
"""
Tests for `PKCS7`.
"""
@@ -3387,7 +3443,7 @@ class TestNetscapeSPKI(_PKeyInteractionTestsMixin):
assert isinstance(blob, bytes)
-class TestRevoked(object):
+class TestRevoked:
"""
Tests for `OpenSSL.crypto.Revoked`.
"""
@@ -3477,7 +3533,7 @@ class TestRevoked(object):
revoked.set_reason(None)
assert revoked.get_reason() is None
- @pytest.mark.parametrize("reason", [object(), 1.0, u"foo"])
+ @pytest.mark.parametrize("reason", [object(), 1.0, "foo"])
def test_set_reason_wrong_args(self, reason):
"""
`Revoked.set_reason` raises `TypeError` if called with an argument
@@ -3497,7 +3553,7 @@ class TestRevoked(object):
revoked.set_reason(b"blue")
-class TestCRL(object):
+class TestCRL:
"""
Tests for `OpenSSL.crypto.CRL`.
"""
@@ -3548,17 +3604,17 @@ class TestCRL(object):
dumped_crl = self._get_crl().export(
self.cert, self.pkey, days=20, digest=b"sha256"
)
- crl = x509.load_pem_x509_crl(dumped_crl, backend)
+ crl = x509.load_pem_x509_crl(dumped_crl)
revoked = crl.get_revoked_certificate_by_serial_number(0x03AB)
assert revoked is not None
assert crl.issuer == x509.Name(
[
- x509.NameAttribute(x509.NameOID.COUNTRY_NAME, u"US"),
- x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, u"IL"),
- x509.NameAttribute(x509.NameOID.LOCALITY_NAME, u"Chicago"),
- x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, u"Testing"),
+ x509.NameAttribute(x509.NameOID.COUNTRY_NAME, "US"),
+ x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, "IL"),
+ x509.NameAttribute(x509.NameOID.LOCALITY_NAME, "Chicago"),
+ x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, "Testing"),
x509.NameAttribute(
- x509.NameOID.COMMON_NAME, u"Testing Root CA"
+ x509.NameOID.COMMON_NAME, "Testing Root CA"
),
]
)
@@ -3573,19 +3629,19 @@ class TestCRL(object):
# DER format
dumped_crl = self._get_crl().export(
- self.cert, self.pkey, FILETYPE_ASN1, digest=b"md5"
+ self.cert, self.pkey, FILETYPE_ASN1, digest=b"sha256"
)
- crl = x509.load_der_x509_crl(dumped_crl, backend)
+ crl = x509.load_der_x509_crl(dumped_crl)
revoked = crl.get_revoked_certificate_by_serial_number(0x03AB)
assert revoked is not None
assert crl.issuer == x509.Name(
[
- x509.NameAttribute(x509.NameOID.COUNTRY_NAME, u"US"),
- x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, u"IL"),
- x509.NameAttribute(x509.NameOID.LOCALITY_NAME, u"Chicago"),
- x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, u"Testing"),
+ x509.NameAttribute(x509.NameOID.COUNTRY_NAME, "US"),
+ x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, "IL"),
+ x509.NameAttribute(x509.NameOID.LOCALITY_NAME, "Chicago"),
+ x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, "Testing"),
x509.NameAttribute(
- x509.NameOID.COMMON_NAME, u"Testing Root CA"
+ x509.NameOID.COMMON_NAME, "Testing Root CA"
),
]
)
@@ -3600,7 +3656,7 @@ class TestCRL(object):
# text format
dumped_text = crl.export(
- self.cert, self.pkey, type=FILETYPE_TEXT, digest=b"md5"
+ self.cert, self.pkey, type=FILETYPE_TEXT, digest=b"sha256"
)
assert len(dumped_text) > 500
@@ -3610,9 +3666,9 @@ class TestCRL(object):
signature algorithm based on that digest function.
"""
crl = self._get_crl()
- dumped_crl = crl.export(self.cert, self.pkey, digest=b"sha1")
+ dumped_crl = crl.export(self.cert, self.pkey, digest=b"sha384")
text = _runopenssl(dumped_crl, b"crl", b"-noout", b"-text")
- text.index(b"Signature Algorithm: sha1")
+ text.index(b"Signature Algorithm: sha384")
def test_export_md5_digest(self):
"""
@@ -3794,7 +3850,9 @@ class TestCRL(object):
crl.add_revoked(revoked)
crl.set_version(1)
crl.set_lastUpdate(b"20140601000000Z")
- crl.set_nextUpdate(b"20180601000000Z")
+ # The year 5000 is far into the future so that this CRL isn't
+ # considered to have expired.
+ crl.set_nextUpdate(b"50000601000000Z")
crl.sign(issuer_cert, issuer_key, digest=b"sha512")
return crl
@@ -3820,7 +3878,7 @@ class TestCRL(object):
store_ctx = X509StoreContext(store, self.intermediate_server_cert)
with pytest.raises(X509StoreContextError) as err:
store_ctx.verify_certificate()
- assert err.value.args[0][2] == "certificate revoked"
+ assert str(err.value) == "certificate revoked"
def test_verify_with_missing_crl(self):
"""
@@ -3840,11 +3898,11 @@ class TestCRL(object):
store_ctx = X509StoreContext(store, self.intermediate_server_cert)
with pytest.raises(X509StoreContextError) as err:
store_ctx.verify_certificate()
- assert err.value.args[0][2] == "unable to get certificate CRL"
+ assert str(err.value) == "unable to get certificate CRL"
assert err.value.certificate.get_subject().CN == "intermediate-service"
def test_convert_from_cryptography(self):
- crypto_crl = x509.load_pem_x509_crl(crlData, backend)
+ crypto_crl = x509.load_pem_x509_crl(crlData)
crl = CRL.from_cryptography(crypto_crl)
assert isinstance(crl, CRL)
@@ -3858,7 +3916,7 @@ class TestCRL(object):
assert isinstance(crypto_crl, x509.CertificateRevocationList)
-class TestX509StoreContext(object):
+class TestX509StoreContext:
"""
Tests for `OpenSSL.crypto.X509StoreContext`.
"""
@@ -4051,7 +4109,11 @@ class TestX509StoreContext(object):
with pytest.raises(X509StoreContextError) as exc:
store_ctx.verify_certificate()
- assert exc.value.args[0][2] == "self signed certificate"
+ # OpenSSL 1.1.x and 3.0.x have different error messages
+ assert str(exc.value) in [
+ "self signed certificate",
+ "self-signed certificate",
+ ]
assert exc.value.certificate.get_subject().CN == "Testing Root CA"
def test_invalid_chain_no_root(self):
@@ -4066,7 +4128,7 @@ class TestX509StoreContext(object):
with pytest.raises(X509StoreContextError) as exc:
store_ctx.verify_certificate()
- assert exc.value.args[0][2] == "unable to get issuer certificate"
+ assert str(exc.value) == "unable to get issuer certificate"
assert exc.value.certificate.get_subject().CN == "intermediate"
def test_invalid_chain_no_intermediate(self):
@@ -4081,7 +4143,7 @@ class TestX509StoreContext(object):
with pytest.raises(X509StoreContextError) as exc:
store_ctx.verify_certificate()
- assert exc.value.args[0][2] == "unable to get local issuer certificate"
+ assert str(exc.value) == "unable to get local issuer certificate"
assert exc.value.certificate.get_subject().CN == "intermediate-service"
def test_modification_pre_verify(self):
@@ -4099,7 +4161,7 @@ class TestX509StoreContext(object):
with pytest.raises(X509StoreContextError) as exc:
store_ctx.verify_certificate()
- assert exc.value.args[0][2] == "unable to get issuer certificate"
+ assert str(exc.value) == "unable to get issuer certificate"
assert exc.value.certificate.get_subject().CN == "intermediate"
store_ctx.set_store(store_good)
@@ -4124,7 +4186,7 @@ class TestX509StoreContext(object):
with pytest.raises(X509StoreContextError) as exc:
store_ctx.verify_certificate()
- assert exc.value.args[0][2] == "certificate has expired"
+ assert str(exc.value) == "certificate has expired"
def test_get_verified_chain(self):
"""
@@ -4158,7 +4220,7 @@ class TestX509StoreContext(object):
with pytest.raises(X509StoreContextError) as exc:
store_ctx.get_verified_chain()
- assert exc.value.args[0][2] == "unable to get issuer certificate"
+ assert str(exc.value) == "unable to get issuer certificate"
assert exc.value.certificate.get_subject().CN == "intermediate"
@pytest.fixture
@@ -4223,10 +4285,23 @@ class TestX509StoreContext(object):
with pytest.raises(X509StoreContextError) as exc:
store_ctx.verify_certificate()
- assert exc.value.args[0][2] == "unable to get local issuer certificate"
+ assert str(exc.value) == "unable to get local issuer certificate"
+
+ def test_verify_with_partial_chain(self):
+ store = X509Store()
+ store.add_cert(self.intermediate_cert)
+
+ store_ctx = X509StoreContext(store, self.intermediate_server_cert)
+ with pytest.raises(X509StoreContextError):
+ store_ctx.verify_certificate()
+
+ # Now set the partial verification flag for verification.
+ store.set_flags(X509StoreFlags.PARTIAL_CHAIN)
+ store_ctx = X509StoreContext(store, self.intermediate_server_cert)
+ assert store_ctx.verify_certificate() is None
-class TestSignVerify(object):
+class TestSignVerify:
"""
Tests for `OpenSSL.crypto.sign` and `OpenSSL.crypto.verify`.
"""
@@ -4250,7 +4325,7 @@ class TestSignVerify(object):
# certificate unrelated to priv_key, used to trigger an error
bad_cert = load_certificate(FILETYPE_PEM, server_cert_pem)
- for digest in ["md5", "sha1"]:
+ for digest in ["md5", "sha1", "sha256"]:
sig = sign(priv_key, content, digest)
# Verify the signature of content, will throw an exception if
@@ -4289,7 +4364,7 @@ class TestSignVerify(object):
priv_key = load_privatekey(FILETYPE_PEM, root_key_pem)
cert = load_certificate(FILETYPE_PEM, root_cert_pem)
- for digest in ["md5", "sha1"]:
+ for digest in ["md5", "sha1", "sha256"]:
with pytest.warns(DeprecationWarning) as w:
simplefilter("always")
sig = sign(priv_key, content, digest)
@@ -4319,8 +4394,8 @@ class TestSignVerify(object):
)
priv_key = load_privatekey(FILETYPE_PEM, ec_root_key_pem)
cert = load_certificate(FILETYPE_PEM, ec_root_cert_pem)
- sig = sign(priv_key, content, "sha1")
- verify(cert, sig, content, "sha1")
+ sig = sign(priv_key, content, "sha256")
+ verify(cert, sig, content, "sha256")
def test_sign_nulls(self):
"""
@@ -4329,8 +4404,8 @@ class TestSignVerify(object):
content = b"Watch out! \0 Did you see it?"
priv_key = load_privatekey(FILETYPE_PEM, root_key_pem)
good_cert = load_certificate(FILETYPE_PEM, root_cert_pem)
- sig = sign(priv_key, content, "sha1")
- verify(good_cert, sig, content, "sha1")
+ sig = sign(priv_key, content, "sha256")
+ verify(good_cert, sig, content, "sha256")
def test_sign_with_large_key(self):
"""
@@ -4345,10 +4420,10 @@ class TestSignVerify(object):
)
priv_key = load_privatekey(FILETYPE_PEM, large_key_pem)
- sign(priv_key, content, "sha1")
+ sign(priv_key, content, "sha256")
-class TestEllipticCurve(object):
+class TestEllipticCurve:
"""
Tests for `_EllipticCurve`, `get_elliptic_curve`, and
`get_elliptic_curves`.
@@ -4375,7 +4450,7 @@ class TestEllipticCurve(object):
does not identify a supported curve.
"""
with pytest.raises(ValueError):
- get_elliptic_curve(u"this curve was just invented")
+ get_elliptic_curve("this curve was just invented")
def test_repr(self):
"""
@@ -4399,7 +4474,7 @@ class TestEllipticCurve(object):
curve._to_EC_KEY()
-class EllipticCurveFactory(object):
+class EllipticCurveFactory:
"""
A helper to get the names of two curves.
"""
@@ -4434,7 +4509,7 @@ class TestEllipticCurveEquality(EqualityTestsMixin):
return get_elliptic_curve(self.curve_factory.another_curve_name)
-class TestEllipticCurveHash(object):
+class TestEllipticCurveHash:
"""
Tests for `_EllipticCurve`'s implementation of hashing (thus use as
an item in a `dict` or `set`).
diff --git a/contrib/python/pyOpenSSL/py3/tests/test_debug.py b/contrib/python/pyOpenSSL/py3/tests/test_debug.py
index 2d62a3a56a1..4eeb3d00b31 100644
--- a/contrib/python/pyOpenSSL/py3/tests/test_debug.py
+++ b/contrib/python/pyOpenSSL/py3/tests/test_debug.py
@@ -1,5 +1,5 @@
-from OpenSSL.debug import _env_info
from OpenSSL import version
+from OpenSSL.debug import _env_info
def test_debug_info():
diff --git a/contrib/python/pyOpenSSL/py3/tests/test_rand.py b/contrib/python/pyOpenSSL/py3/tests/test_rand.py
index 763d71124ca..24f30939545 100644
--- a/contrib/python/pyOpenSSL/py3/tests/test_rand.py
+++ b/contrib/python/pyOpenSSL/py3/tests/test_rand.py
@@ -10,7 +10,7 @@ import pytest
from OpenSSL import rand
-class TestRand(object):
+class TestRand:
@pytest.mark.parametrize("args", [(b"foo", None), (None, 3)])
def test_add_wrong_args(self, args):
"""
diff --git a/contrib/python/pyOpenSSL/py3/tests/test_ssl.py b/contrib/python/pyOpenSSL/py3/tests/test_ssl.py
index 8bb5a035a77..fdedd7133a0 100644
--- a/contrib/python/pyOpenSSL/py3/tests/test_ssl.py
+++ b/contrib/python/pyOpenSSL/py3/tests/test_ssl.py
@@ -7,124 +7,125 @@ Unit tests for :mod:`OpenSSL.SSL`.
import datetime
import gc
+import select
import sys
import uuid
-
-from gc import collect, get_referrers
from errno import (
EAFNOSUPPORT,
ECONNREFUSED,
EINPROGRESS,
- EWOULDBLOCK,
EPIPE,
ESHUTDOWN,
+ EWOULDBLOCK,
)
-from sys import platform, getfilesystemencoding
-from socket import AF_INET, AF_INET6, MSG_PEEK, SHUT_RDWR, error, socket
+from gc import collect, get_referrers
from os import makedirs
from os.path import join
-from weakref import ref
+from socket import (
+ AF_INET,
+ AF_INET6,
+ MSG_PEEK,
+ SHUT_RDWR,
+ error,
+ socket,
+)
+from sys import getfilesystemencoding, platform
+from typing import Union
from warnings import simplefilter
-
-import flaky
-
-import pytest
-
-from pretend import raiser
-
-from six import PY2, text_type
+from weakref import ref
from cryptography import x509
-from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
+import flaky
-from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM
-from OpenSSL.crypto import PKey, X509, X509Extension, X509Store
-from OpenSSL.crypto import dump_privatekey, load_privatekey
-from OpenSSL.crypto import dump_certificate, load_certificate
-from OpenSSL.crypto import get_elliptic_curves
+from pretend import raiser
-from OpenSSL.SSL import (
- OPENSSL_VERSION_NUMBER,
- SSLEAY_VERSION,
- SSLEAY_CFLAGS,
- TLS_METHOD,
- TLS1_3_VERSION,
- TLS1_2_VERSION,
- TLS1_1_VERSION,
-)
-from OpenSSL.SSL import SSLEAY_PLATFORM, SSLEAY_DIR, SSLEAY_BUILT_ON
-from OpenSSL.SSL import SENT_SHUTDOWN, RECEIVED_SHUTDOWN
-from OpenSSL.SSL import (
- SSLv2_METHOD,
- SSLv3_METHOD,
- SSLv23_METHOD,
- TLSv1_METHOD,
- TLSv1_1_METHOD,
- TLSv1_2_METHOD,
-)
-from OpenSSL.SSL import OP_SINGLE_DH_USE, OP_NO_SSLv2, OP_NO_SSLv3
-from OpenSSL.SSL import (
- VERIFY_PEER,
- VERIFY_FAIL_IF_NO_PEER_CERT,
- VERIFY_CLIENT_ONCE,
- VERIFY_NONE,
-)
+import pytest
from OpenSSL import SSL
from OpenSSL.SSL import (
- SESS_CACHE_OFF,
- SESS_CACHE_CLIENT,
- SESS_CACHE_SERVER,
+ Connection,
+ Context,
+ DTLS_METHOD,
+ Error,
+ MODE_RELEASE_BUFFERS,
+ NO_OVERLAPPING_PROTOCOLS,
+ OPENSSL_VERSION_NUMBER,
+ OP_COOKIE_EXCHANGE,
+ OP_NO_COMPRESSION,
+ OP_NO_QUERY_MTU,
+ OP_NO_SSLv2,
+ OP_NO_SSLv3,
+ OP_NO_TICKET,
+ OP_SINGLE_DH_USE,
+ RECEIVED_SHUTDOWN,
+ SENT_SHUTDOWN,
SESS_CACHE_BOTH,
+ SESS_CACHE_CLIENT,
SESS_CACHE_NO_AUTO_CLEAR,
+ SESS_CACHE_NO_INTERNAL,
SESS_CACHE_NO_INTERNAL_LOOKUP,
SESS_CACHE_NO_INTERNAL_STORE,
- SESS_CACHE_NO_INTERNAL,
-)
-
-from OpenSSL.SSL import (
- Error,
+ SESS_CACHE_OFF,
+ SESS_CACHE_SERVER,
+ SSLEAY_BUILT_ON,
+ SSLEAY_CFLAGS,
+ SSLEAY_DIR,
+ SSLEAY_PLATFORM,
+ SSLEAY_VERSION,
+ SSL_CB_ACCEPT_EXIT,
+ SSL_CB_ACCEPT_LOOP,
+ SSL_CB_ALERT,
+ SSL_CB_CONNECT_EXIT,
+ SSL_CB_CONNECT_LOOP,
+ SSL_CB_EXIT,
+ SSL_CB_HANDSHAKE_DONE,
+ SSL_CB_HANDSHAKE_START,
+ SSL_CB_LOOP,
+ SSL_CB_READ,
+ SSL_CB_READ_ALERT,
+ SSL_CB_WRITE,
+ SSL_CB_WRITE_ALERT,
+ SSL_ST_ACCEPT,
+ SSL_ST_CONNECT,
+ SSL_ST_MASK,
+ SSLeay_version,
+ SSLv23_METHOD,
+ Session,
SysCallError,
+ TLS1_1_VERSION,
+ TLS1_2_VERSION,
+ TLS1_3_VERSION,
+ TLS_METHOD,
+ TLSv1_1_METHOD,
+ TLSv1_2_METHOD,
+ TLSv1_METHOD,
+ VERIFY_CLIENT_ONCE,
+ VERIFY_FAIL_IF_NO_PEER_CERT,
+ VERIFY_NONE,
+ VERIFY_PEER,
WantReadError,
WantWriteError,
ZeroReturnError,
+ _make_requires,
)
-from OpenSSL.SSL import Context, Session, Connection, SSLeay_version
-from OpenSSL.SSL import _make_requires
-
from OpenSSL._util import ffi as _ffi, lib as _lib
-
-from OpenSSL.SSL import (
- OP_NO_QUERY_MTU,
- OP_COOKIE_EXCHANGE,
- OP_NO_TICKET,
- OP_NO_COMPRESSION,
- MODE_RELEASE_BUFFERS,
- NO_OVERLAPPING_PROTOCOLS,
-)
-
-from OpenSSL.SSL import (
- SSL_ST_CONNECT,
- SSL_ST_ACCEPT,
- SSL_ST_MASK,
- SSL_CB_LOOP,
- SSL_CB_EXIT,
- SSL_CB_READ,
- SSL_CB_WRITE,
- SSL_CB_ALERT,
- SSL_CB_READ_ALERT,
- SSL_CB_WRITE_ALERT,
- SSL_CB_ACCEPT_LOOP,
- SSL_CB_ACCEPT_EXIT,
- SSL_CB_CONNECT_LOOP,
- SSL_CB_CONNECT_EXIT,
- SSL_CB_HANDSHAKE_START,
- SSL_CB_HANDSHAKE_DONE,
+from OpenSSL.crypto import (
+ FILETYPE_PEM,
+ PKey,
+ TYPE_RSA,
+ X509,
+ X509Extension,
+ X509Store,
+ dump_certificate,
+ dump_privatekey,
+ get_elliptic_curves,
+ load_certificate,
+ load_privatekey,
)
try:
@@ -142,15 +143,15 @@ try:
except ImportError:
OP_NO_TLSv1_3 = None
-from .util import WARNING_TYPE_EXPECTED, NON_ASCII, is_consistent_type
from .test_crypto import (
client_cert_pem,
client_key_pem,
- server_cert_pem,
- server_key_pem,
root_cert_pem,
root_key_pem,
+ server_cert_pem,
+ server_key_pem,
)
+from .util import NON_ASCII, WARNING_TYPE_EXPECTED, is_consistent_type
# openssl dhparam 2048 -out dh-2048.pem
@@ -166,9 +167,6 @@ i5s5yYK7a/0eWxxRr2qraYaUj8RwDpH9CwIBAg==
"""
-skip_if_py3 = pytest.mark.skipif(not PY2, reason="Python 2 only")
-
-
def socket_any_family():
try:
return socket(AF_INET)
@@ -197,7 +195,7 @@ def join_bytes_or_unicode(prefix, suffix):
return join(prefix, suffix)
# Otherwise, coerce suffix to the type of prefix.
- if isinstance(prefix, text_type):
+ if isinstance(prefix, str):
return join(prefix, suffix.decode(getfilesystemencoding()))
else:
return join(prefix, suffix.encode(getfilesystemencoding()))
@@ -221,6 +219,8 @@ def socket_pair():
client.setblocking(True)
server = port.accept()[0]
+ port.close()
+
# Let's pass some unencrypted data to make sure our socket connection is
# fine. Just one byte, so we don't have to worry about buffers getting
# filled up or fragmentation.
@@ -366,7 +366,7 @@ def interact_in_memory(client_conn, server_conn):
# Give the side a chance to generate some more bytes, or succeed.
try:
- data = read.recv(2 ** 16)
+ data = read.recv(2**16)
except WantReadError:
# It didn't succeed, so we'll hope it generated some output.
pass
@@ -407,7 +407,7 @@ def handshake_in_memory(client_conn, server_conn):
interact_in_memory(client_conn, server_conn)
-class TestVersion(object):
+class TestVersion:
"""
Tests for version information exposed by `OpenSSL.SSL.SSLeay_version` and
`OpenSSL.SSL.OPENSSL_VERSION_NUMBER`.
@@ -444,17 +444,15 @@ def ca_file(tmpdir):
"""
Create a valid PEM file with CA certificates and return the path.
"""
- key = rsa.generate_private_key(
- public_exponent=65537, key_size=2048, backend=default_backend()
- )
+ key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key = key.public_key()
builder = x509.CertificateBuilder()
builder = builder.subject_name(
- x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, u"pyopenssl.org")])
+ x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "pyopenssl.org")])
)
builder = builder.issuer_name(
- x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, u"pyopenssl.org")])
+ x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "pyopenssl.org")])
)
one_day = datetime.timedelta(1, 0, 0)
builder = builder.not_valid_before(datetime.datetime.today() - one_day)
@@ -466,9 +464,7 @@ def ca_file(tmpdir):
critical=True,
)
- certificate = builder.sign(
- private_key=key, algorithm=hashes.SHA256(), backend=default_backend()
- )
+ certificate = builder.sign(private_key=key, algorithm=hashes.SHA256())
ca_file = tmpdir.join("test.pem")
ca_file.write_binary(
@@ -488,14 +484,14 @@ def context():
return Context(SSLv23_METHOD)
-class TestContext(object):
+class TestContext:
"""
Unit tests for `OpenSSL.SSL.Context`.
"""
@pytest.mark.parametrize(
"cipher_string",
- [b"hello world:AES128-SHA", u"hello world:AES128-SHA"],
+ [b"hello world:AES128-SHA", "hello world:AES128-SHA"],
)
def test_set_cipher_list(self, context, cipher_string):
"""
@@ -525,15 +521,20 @@ class TestContext(object):
"""
with pytest.raises(Error) as excinfo:
context.set_cipher_list(b"imaginary-cipher")
- assert excinfo.value.args == (
- [
- (
- "SSL routines",
- "SSL_CTX_set_cipher_list",
- "no cipher match",
- )
- ],
- )
+ assert excinfo.value.args[0][0] in [
+ # 1.1.x
+ (
+ "SSL routines",
+ "SSL_CTX_set_cipher_list",
+ "no cipher match",
+ ),
+ # 3.0.x
+ (
+ "SSL routines",
+ "",
+ "no cipher match",
+ ),
+ ]
def test_load_client_ca(self, context, ca_file):
"""
@@ -572,20 +573,27 @@ class TestContext(object):
with pytest.raises(Error) as e:
context.set_session_id(b"abc" * 1000)
- assert [
+ assert e.value.args[0][0] in [
+ # 1.1.x
(
"SSL routines",
"SSL_CTX_set_session_id_context",
"ssl session id context too long",
- )
- ] == e.value.args[0]
+ ),
+ # 3.0.x
+ (
+ "SSL routines",
+ "",
+ "ssl session id context too long",
+ ),
+ ]
def test_set_session_id_unicode(self, context):
"""
`Context.set_session_id` raises a warning if a unicode string is
passed.
"""
- pytest.deprecated_call(context.set_session_id, u"abc")
+ pytest.deprecated_call(context.set_session_id, "abc")
def test_method(self):
"""
@@ -597,7 +605,7 @@ class TestContext(object):
for meth in methods:
Context(meth)
- maybe = [SSLv2_METHOD, SSLv3_METHOD, TLSv1_1_METHOD, TLSv1_2_METHOD]
+ maybe = [TLSv1_1_METHOD, TLSv1_2_METHOD]
for meth in maybe:
try:
Context(meth)
@@ -609,7 +617,7 @@ class TestContext(object):
with pytest.raises(TypeError):
Context("")
with pytest.raises(ValueError):
- Context(10)
+ Context(13)
def test_type(self):
"""
@@ -617,17 +625,6 @@ class TestContext(object):
"""
assert is_consistent_type(Context, "Context", TLSv1_METHOD)
- def test_use_privatekey(self):
- """
- `Context.use_privatekey` takes an `OpenSSL.crypto.PKey` instance.
- """
- key = PKey()
- key.generate_key(TYPE_RSA, 1024)
- ctx = Context(SSLv23_METHOD)
- ctx.use_privatekey(key)
- with pytest.raises(TypeError):
- ctx.use_privatekey("")
-
def test_use_privatekey_file_missing(self, tmpfile):
"""
`Context.use_privatekey_file` raises `OpenSSL.SSL.Error` when passed
@@ -681,37 +678,6 @@ class TestContext(object):
FILETYPE_PEM,
)
- def test_use_certificate_wrong_args(self):
- """
- `Context.use_certificate_wrong_args` raises `TypeError` when not passed
- exactly one `OpenSSL.crypto.X509` instance as an argument.
- """
- ctx = Context(SSLv23_METHOD)
- with pytest.raises(TypeError):
- ctx.use_certificate("hello, world")
-
- def test_use_certificate_uninitialized(self):
- """
- `Context.use_certificate` raises `OpenSSL.SSL.Error` when passed a
- `OpenSSL.crypto.X509` instance which has not been initialized
- (ie, which does not actually have any certificate data).
- """
- ctx = Context(SSLv23_METHOD)
- with pytest.raises(Error):
- ctx.use_certificate(X509())
-
- def test_use_certificate(self):
- """
- `Context.use_certificate` sets the certificate which will be
- used to identify connections created using the context.
- """
- # TODO
- # Hard to assert anything. But we could set a privatekey then ask
- # OpenSSL if the cert and key agree using check_privatekey. Then as
- # long as check_privatekey works right we're good...
- ctx = Context(SSLv23_METHOD)
- ctx.use_certificate(load_certificate(FILETYPE_PEM, root_cert_pem))
-
def test_use_certificate_file_wrong_args(self):
"""
`Context.use_certificate_file` raises `TypeError` if the first
@@ -1211,8 +1177,8 @@ class TestContext(object):
def test_fallback_default_verify_paths(self, monkeypatch):
"""
Test that we load certificates successfully on linux from the fallback
- path. To do this we set the _CRYPTOGRAPHY_MANYLINUX1_CA_FILE and
- _CRYPTOGRAPHY_MANYLINUX1_CA_DIR vars to be equal to whatever the
+ path. To do this we set the _CRYPTOGRAPHY_MANYLINUX_CA_FILE and
+ _CRYPTOGRAPHY_MANYLINUX_CA_DIR vars to be equal to whatever the
current OpenSSL default is and we disable
SSL_CTX_SET_default_verify_paths so that it can't find certs unless
it loads via fallback.
@@ -1223,12 +1189,12 @@ class TestContext(object):
)
monkeypatch.setattr(
SSL,
- "_CRYPTOGRAPHY_MANYLINUX1_CA_FILE",
+ "_CRYPTOGRAPHY_MANYLINUX_CA_FILE",
_ffi.string(_lib.X509_get_default_cert_file()),
)
monkeypatch.setattr(
SSL,
- "_CRYPTOGRAPHY_MANYLINUX1_CA_DIR",
+ "_CRYPTOGRAPHY_MANYLINUX_CA_DIR",
_ffi.string(_lib.X509_get_default_cert_dir()),
)
context.set_default_verify_paths()
@@ -1332,20 +1298,20 @@ class TestContext(object):
"""
serverSocket, clientSocket = socket_pair()
- server = Connection(serverContext, serverSocket)
- server.set_accept_state()
+ with serverSocket, clientSocket:
+ server = Connection(serverContext, serverSocket)
+ server.set_accept_state()
- client = Connection(clientContext, clientSocket)
- client.set_connect_state()
+ client = Connection(clientContext, clientSocket)
+ client.set_connect_state()
- # Make them talk to each other.
- # interact_in_memory(client, server)
- for _ in range(3):
- for s in [client, server]:
- try:
- s.do_handshake()
- except WantReadError:
- pass
+ # Make them talk to each other.
+ for _ in range(3):
+ for s in [client, server]:
+ try:
+ s.do_handshake()
+ except WantReadError:
+ select.select([client, server], [], [])
def test_set_verify_callback_connection_argument(self):
"""
@@ -1361,7 +1327,7 @@ class TestContext(object):
)
serverConnection = Connection(serverContext, None)
- class VerifyCallback(object):
+ class VerifyCallback:
def callback(self, connection, *args):
self.connection = connection
return 1
@@ -1705,7 +1671,7 @@ class TestContext(object):
"""
context = Context(SSLv23_METHOD)
for curve in get_elliptic_curves():
- if curve.name.startswith(u"Oakley-"):
+ if curve.name.startswith("Oakley-"):
# Setting Oakley-EC2N-4 and Oakley-EC2N-3 adds
# ('bignum routines', 'BN_mod_inverse', 'no inverse') to the
# error queue on OpenSSL 1.0.2.
@@ -1751,7 +1717,7 @@ class TestContext(object):
"""
context = Context(SSLv23_METHOD)
with pytest.raises(TypeError):
- context.set_tlsext_use_srtp(text_type("SRTP_AES128_CM_SHA1_80"))
+ context.set_tlsext_use_srtp(str("SRTP_AES128_CM_SHA1_80"))
def test_set_tlsext_use_srtp_invalid_profile(self):
"""
@@ -1773,7 +1739,7 @@ class TestContext(object):
assert context.set_tlsext_use_srtp(b"SRTP_AES128_CM_SHA1_80") is None
-class TestServerNameCallback(object):
+class TestServerNameCallback:
"""
Tests for `Context.set_tlsext_servername_callback` and its
interaction with `Connection`.
@@ -1882,7 +1848,7 @@ class TestServerNameCallback(object):
assert args == [(server, b"foo1.example.com")]
-class TestApplicationLayerProtoNegotiation(object):
+class TestApplicationLayerProtoNegotiation:
"""
Tests for ALPN in PyOpenSSL.
"""
@@ -1927,14 +1893,13 @@ class TestApplicationLayerProtoNegotiation(object):
assert server.get_alpn_proto_negotiated() == b"spdy/2"
assert client.get_alpn_proto_negotiated() == b"spdy/2"
- @pytest.mark.xfail(reason='https://github.com/pyca/pyopenssl/issues/1043')
def test_alpn_call_failure(self):
"""
SSL_CTX_set_alpn_protos does not like to be called with an empty
protocols list. Ensure that we produce a user-visible error.
"""
context = Context(SSLv23_METHOD)
- with pytest.raises(Error):
+ with pytest.raises(ValueError):
context.set_alpn_protos([])
def test_alpn_set_on_connection(self):
@@ -2066,7 +2031,7 @@ class TestApplicationLayerProtoNegotiation(object):
def invalid_cb(conn, options):
invalid_cb_args.append((conn, options))
- return u"can't return unicode"
+ return "can't return unicode"
client_context = Context(SSLv23_METHOD)
client_context.set_alpn_protos([b"http/1.1", b"spdy/2"])
@@ -2163,7 +2128,7 @@ class TestApplicationLayerProtoNegotiation(object):
assert select_args == [(server, [b"http/1.1", b"spdy/2"])]
-class TestSession(object):
+class TestSession:
"""
Unit tests for :py:obj:`OpenSSL.SSL.Session`.
"""
@@ -2177,7 +2142,77 @@ class TestSession(object):
assert isinstance(new_session, Session)
-class TestConnection(object):
[email protected](params=["context", "connection"])
+def ctx_or_conn(request) -> Union[Context, Connection]:
+ ctx = Context(SSLv23_METHOD)
+ if request.param == "context":
+ return ctx
+ else:
+ return Connection(ctx, None)
+
+
+class TestContextConnection:
+ """
+ Unit test for methods that are exposed both by Connection and Context
+ objects.
+ """
+
+ def test_use_privatekey(self, ctx_or_conn):
+ """
+ `use_privatekey` takes an `OpenSSL.crypto.PKey` instance.
+ """
+ key = PKey()
+ key.generate_key(TYPE_RSA, 1024)
+
+ ctx_or_conn.use_privatekey(key)
+ with pytest.raises(TypeError):
+ ctx_or_conn.use_privatekey("")
+
+ def test_use_privatekey_wrong_key(self, ctx_or_conn):
+ """
+ `use_privatekey` raises `OpenSSL.SSL.Error` when passed a
+ `OpenSSL.crypto.PKey` instance which has not been initialized.
+ """
+ key = PKey()
+ key.generate_key(TYPE_RSA, 1024)
+ ctx_or_conn.use_certificate(
+ load_certificate(FILETYPE_PEM, root_cert_pem)
+ )
+ with pytest.raises(Error):
+ ctx_or_conn.use_privatekey(key)
+
+ def test_use_certificate(self, ctx_or_conn):
+ """
+ `use_certificate` sets the certificate which will be
+ used to identify connections created using the context.
+ """
+ # TODO
+ # Hard to assert anything. But we could set a privatekey then ask
+ # OpenSSL if the cert and key agree using check_privatekey. Then as
+ # long as check_privatekey works right we're good...
+ ctx_or_conn.use_certificate(
+ load_certificate(FILETYPE_PEM, root_cert_pem)
+ )
+
+ def test_use_certificate_wrong_args(self, ctx_or_conn):
+ """
+ `use_certificate_wrong_args` raises `TypeError` when not passed
+ exactly one `OpenSSL.crypto.X509` instance as an argument.
+ """
+ with pytest.raises(TypeError):
+ ctx_or_conn.use_certificate("hello, world")
+
+ def test_use_certificate_uninitialized(self, ctx_or_conn):
+ """
+ `use_certificate` raises `OpenSSL.SSL.Error` when passed a
+ `OpenSSL.crypto.X509` instance which has not been initialized
+ (ie, which does not actually have any certificate data).
+ """
+ with pytest.raises(Error):
+ ctx_or_conn.use_certificate(X509())
+
+
+class TestConnection:
"""
Unit tests for `OpenSSL.SSL.Connection`.
"""
@@ -2233,7 +2268,7 @@ class TestConnection(object):
connection.bio_write(b"xy")
connection.bio_write(bytearray(b"za"))
with pytest.warns(DeprecationWarning):
- connection.bio_write(u"deprecated")
+ connection.bio_write("deprecated")
def test_get_context(self):
"""
@@ -2286,10 +2321,8 @@ class TestConnection(object):
with pytest.raises(TypeError):
conn.set_tlsext_host_name(b"with\0null")
- if not PY2:
- # On Python 3.x, don't accidentally implicitly convert from text.
- with pytest.raises(TypeError):
- conn.set_tlsext_host_name(b"example.com".decode("ascii"))
+ with pytest.raises(TypeError):
+ conn.set_tlsext_host_name(b"example.com".decode("ascii"))
def test_pending(self):
"""
@@ -2629,6 +2662,52 @@ class TestConnection(object):
server = Connection(ctx, None)
assert None is server.get_verified_chain()
+ def test_set_verify_overrides_context(self):
+ context = Context(SSLv23_METHOD)
+ context.set_verify(VERIFY_PEER)
+ conn = Connection(context, None)
+ conn.set_verify(VERIFY_NONE)
+
+ assert context.get_verify_mode() == VERIFY_PEER
+ assert conn.get_verify_mode() == VERIFY_NONE
+
+ with pytest.raises(TypeError):
+ conn.set_verify(None)
+
+ with pytest.raises(TypeError):
+ conn.set_verify(VERIFY_PEER, "not a callable")
+
+ def test_set_verify_callback_reference(self):
+ """
+ The callback for certificate verification should only be forgotten if
+ the context and all connections created by it do not use it anymore.
+ """
+
+ def callback(conn, cert, errnum, depth, ok): # pragma: no cover
+ return ok
+
+ tracker = ref(callback)
+
+ context = Context(SSLv23_METHOD)
+ context.set_verify(VERIFY_PEER, callback)
+ del callback
+
+ conn = Connection(context, None)
+ context.set_verify(VERIFY_NONE)
+
+ collect()
+ collect()
+ assert tracker()
+
+ conn.set_verify(VERIFY_PEER, lambda conn, cert, errnum, depth, ok: ok)
+ collect()
+ collect()
+ callback = tracker()
+ if callback is not None: # pragma: nocover
+ referrers = get_referrers(callback)
+ if len(referrers) > 1:
+ pytest.fail("Some references remain: %r" % (referrers,))
+
def test_get_session_unconnected(self):
"""
`Connection.get_session` returns `None` when used with an object
@@ -2859,8 +2938,8 @@ class TestConnection(object):
client.get_cipher_name(),
)
- assert isinstance(server_cipher_name, text_type)
- assert isinstance(client_cipher_name, text_type)
+ assert isinstance(server_cipher_name, str)
+ assert isinstance(client_cipher_name, str)
assert server_cipher_name == client_cipher_name
@@ -2884,8 +2963,8 @@ class TestConnection(object):
client.get_cipher_version(),
)
- assert isinstance(server_cipher_version, text_type)
- assert isinstance(client_cipher_version, text_type)
+ assert isinstance(server_cipher_version, str)
+ assert isinstance(client_cipher_version, str)
assert server_cipher_version == client_cipher_version
@@ -2923,8 +3002,8 @@ class TestConnection(object):
client_protocol_version_name = client.get_protocol_version_name()
server_protocol_version_name = server.get_protocol_version_name()
- assert isinstance(server_protocol_version_name, text_type)
- assert isinstance(client_protocol_version_name, text_type)
+ assert isinstance(server_protocol_version_name, str)
+ assert isinstance(client_protocol_version_name, str)
assert server_protocol_version_name == client_protocol_version_name
@@ -2979,7 +3058,7 @@ class TestConnection(object):
assert 2 == len(data)
-class TestConnectionGetCipherList(object):
+class TestConnectionGetCipherList:
"""
Tests for `Connection.get_cipher_list`.
"""
@@ -3002,10 +3081,10 @@ class VeryLarge(bytes):
"""
def __len__(self):
- return 2 ** 31
+ return 2**31
-class TestConnectionSend(object):
+class TestConnectionSend:
"""
Tests for `Connection.send`.
"""
@@ -3067,20 +3146,8 @@ class TestConnectionSend(object):
assert count == 2
assert client.recv(2) == b"xy"
- @skip_if_py3
- def test_short_buffer(self):
- """
- When passed a buffer containing a small number of bytes,
- `Connection.send` transmits all of them and returns the number
- of bytes sent.
- """
- server, client = loopback()
- count = server.send(buffer(b"xy")) # noqa: F821
- assert count == 2
- assert client.recv(2) == b"xy"
-
@pytest.mark.skipif(
- sys.maxsize < 2 ** 31,
+ sys.maxsize < 2**31,
reason="sys.maxsize < 2**31 - test requires 64 bit",
)
def test_buf_too_large(self):
@@ -3103,7 +3170,7 @@ def _make_memoryview(size):
return memoryview(bytearray(size))
-class TestConnectionRecvInto(object):
+class TestConnectionRecvInto:
"""
Tests for `Connection.recv_into`.
"""
@@ -3226,7 +3293,7 @@ class TestConnectionRecvInto(object):
self._doesnt_overfill_test(_make_memoryview)
-class TestConnectionSendall(object):
+class TestConnectionSendall:
"""
Tests for `Connection.sendall`.
"""
@@ -3274,17 +3341,6 @@ class TestConnectionSendall(object):
server.sendall(memoryview(b"x"))
assert client.recv(1) == b"x"
- @skip_if_py3
- def test_short_buffers(self):
- """
- When passed a buffer containing a small number of bytes,
- `Connection.sendall` transmits all of them.
- """
- server, client = loopback()
- count = server.sendall(buffer(b"xy")) # noqa: F821
- assert count == 2
- assert client.recv(2) == b"xy"
-
def test_long(self):
"""
`Connection.sendall` transmits all the bytes in the string passed to it
@@ -3319,7 +3375,7 @@ class TestConnectionSendall(object):
assert err.value.args[0] == EPIPE
-class TestConnectionRenegotiate(object):
+class TestConnectionRenegotiate:
"""
Tests for SSL renegotiation APIs.
"""
@@ -3363,7 +3419,7 @@ class TestConnectionRenegotiate(object):
pass
-class TestError(object):
+class TestError:
"""
Unit tests for `OpenSSL.SSL.Error`.
"""
@@ -3376,7 +3432,7 @@ class TestError(object):
assert Error.__name__ == "Error"
-class TestConstants(object):
+class TestConstants:
"""
Tests for the values of constants exposed in `OpenSSL.SSL`.
@@ -3493,7 +3549,7 @@ class TestConstants(object):
assert 0x300 == SESS_CACHE_NO_INTERNAL
-class TestMemoryBIO(object):
+class TestMemoryBIO:
"""
Tests for `OpenSSL.SSL.Connection` using a memory BIO.
"""
@@ -3666,7 +3722,7 @@ class TestMemoryBIO(object):
interact_in_memory(client, server)
- size = 2 ** 15
+ size = 2**15
sent = client.send(b"x" * size)
# Sanity check. We're trying to test what happens when the entire
# input can't be sent. If the entire input was sent, this test is
@@ -3917,7 +3973,7 @@ class TestMemoryBIO(object):
self._check_client_ca_list(set_replaces_add_ca)
-class TestInfoConstants(object):
+class TestInfoConstants:
"""
Tests for assorted constants exposed for use in info callbacks.
"""
@@ -3960,7 +4016,7 @@ class TestInfoConstants(object):
assert const is None or isinstance(const, int)
-class TestRequires(object):
+class TestRequires:
"""
Tests for the decorator factory used to conditionally raise
NotImplementedError when older OpenSSLs are used.
@@ -3999,7 +4055,7 @@ class TestRequires(object):
assert "Error text" in str(e.value)
-class TestOCSP(object):
+class TestOCSP:
"""
Tests for PyOpenSSL's OCSP stapling support.
"""
@@ -4243,3 +4299,188 @@ class TestOCSP(object):
with pytest.raises(TypeError):
handshake_in_memory(client, server)
+
+
+class TestDTLS:
+ # The way you would expect DTLSv1_listen to work is:
+ #
+ # - it reads packets in a loop
+ # - when it finds a valid ClientHello, it returns
+ # - now the handshake can proceed
+ #
+ # However, on older versions of OpenSSL, it did something "cleverer". The
+ # way it worked is:
+ #
+ # - it "peeks" into the BIO to see the next packet without consuming it
+ # - if *not* a valid ClientHello, then it reads the packet to consume it
+ # and loops around
+ # - if it *is* a valid ClientHello, it *leaves the packet in the BIO*, and
+ # returns
+ # - then the handshake finds the ClientHello in the BIO and reads it a
+ # second time.
+ #
+ # I'm not sure exactly when this switched over. The OpenSSL v1.1.1 in
+ # Ubuntu 18.04 has the old behavior. The OpenSSL v1.1.1 in Ubuntu 20.04 has
+ # the new behavior. There doesn't seem to be any mention of this change in
+ # the OpenSSL v1.1.1 changelog, but presumably it changed in some point
+ # release or another. Presumably in 2025 or so there will be only new
+ # OpenSSLs around we can delete this whole comment and the weird
+ # workaround. If anyone is still using this library by then, which seems
+ # both depressing and inevitable.
+ #
+ # Anyway, why do we care? The reason is that the old strategy has a
+ # problem: the "peek" operation is only defined on "DGRAM BIOs", which are
+ # a special type of object that is different from the more familiar "socket
+ # BIOs" and "memory BIOs". If you *don't* have a DGRAM BIO, and you try to
+ # peek into the BIO... then it silently degrades to a full-fledged "read"
+ # operation that consumes the packet. Which is a problem if your algorithm
+ # depends on leaving the packet in the BIO to be read again later.
+ #
+ # So on old OpenSSL, we have a problem:
+ #
+ # - we can't use a DGRAM BIO, because cryptography/pyopenssl don't wrap the
+ # relevant APIs, nor should they.
+ #
+ # - if we use a socket BIO, then the first time DTLSv1_listen sees an
+ # invalid packet (like for example... the challenge packet that *every
+ # DTLS handshake starts with before the real ClientHello!*), it tries to
+ # first "peek" it, and then "read" it. But since the first "peek"
+ # consumes the packet, the second "read" ends up hanging or consuming
+ # some unrelated packet, which is undesirable. So you can't even get to
+ # the handshake stage successfully.
+ #
+ # - if we use a memory BIO, then DTLSv1_listen works OK on invalid packets
+ # -- first the "peek" consumes them, and then it tries to "read" again to
+ # consume them, which fails immediately, and OpenSSL ignores the failure.
+ # So it works by accident. BUT, when we get a valid ClientHello, we have
+ # a problem: DTLSv1_listen tries to "peek" it and then leave it in the
+ # read BIO for do_handshake to consume. But instead "peek" consumes the
+ # packet, so it's not there where do_handshake is expecting it, and the
+ # handshake fails.
+ #
+ # Fortunately (if that's the word), we can work around the memory BIO
+ # problem. (Which is good, because in real life probably all our users will
+ # be using memory BIOs.) All we have to do is to save the valid ClientHello
+ # before calling DTLSv1_listen, and then after it returns we push *a second
+ # copy of it* of the packet memory BIO before calling do_handshake. This
+ # fakes out OpenSSL and makes it think the "peek" operation worked
+ # correctly, and we can go on with our lives.
+ #
+ # In fact, we push the second copy of the ClientHello unconditionally. On
+ # new versions of OpenSSL, this is unnecessary, but harmless, because the
+ # DTLS state machine treats it like a network hiccup that duplicated a
+ # packet, which DTLS is robust against.
+ def test_it_works_at_all(self):
+ # arbitrary number larger than any conceivable handshake volley
+ LARGE_BUFFER = 65536
+
+ s_ctx = Context(DTLS_METHOD)
+
+ def generate_cookie(ssl):
+ return b"xyzzy"
+
+ def verify_cookie(ssl, cookie):
+ return cookie == b"xyzzy"
+
+ s_ctx.set_cookie_generate_callback(generate_cookie)
+ s_ctx.set_cookie_verify_callback(verify_cookie)
+ s_ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
+ s_ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
+ s_ctx.set_options(OP_NO_QUERY_MTU)
+ s = Connection(s_ctx)
+ s.set_accept_state()
+
+ c_ctx = Context(DTLS_METHOD)
+ c_ctx.set_options(OP_NO_QUERY_MTU)
+ c = Connection(c_ctx)
+ c.set_connect_state()
+
+ # These are mandatory, because openssl can't guess the MTU for a memory
+ # bio and will produce a mysterious error if you make it try.
+ c.set_ciphertext_mtu(1500)
+ s.set_ciphertext_mtu(1500)
+
+ latest_client_hello = None
+
+ def pump_membio(label, source, sink):
+ try:
+ chunk = source.bio_read(LARGE_BUFFER)
+ except WantReadError:
+ return False
+ # I'm not sure this check is needed, but I'm not sure it's *not*
+ # needed either:
+ if not chunk: # pragma: no cover
+ return False
+ # Gross hack: if this is a ClientHello, save it so we can find it
+ # later. See giant comment above.
+ try:
+ # if ContentType == handshake and HandshakeType ==
+ # client_hello:
+ if chunk[0] == 22 and chunk[13] == 1:
+ nonlocal latest_client_hello
+ latest_client_hello = chunk
+ except IndexError: # pragma: no cover
+ pass
+ print(f"{label}: {chunk.hex()}")
+ sink.bio_write(chunk)
+ return True
+
+ def pump():
+ # Raises if there was no data to pump, to avoid infinite loops if
+ # we aren't making progress.
+ assert pump_membio("s -> c", s, c) or pump_membio("c -> s", c, s)
+
+ c_handshaking = True
+ s_listening = True
+ s_handshaking = False
+ first = True
+ while c_handshaking or s_listening or s_handshaking:
+ if not first:
+ pump()
+ first = False
+
+ if c_handshaking:
+ try:
+ c.do_handshake()
+ except WantReadError:
+ pass
+ else:
+ c_handshaking = False
+
+ if s_listening:
+ try:
+ s.DTLSv1_listen()
+ except WantReadError:
+ pass
+ else:
+ s_listening = False
+ s_handshaking = True
+ # Write the duplicate ClientHello. See giant comment above.
+ s.bio_write(latest_client_hello)
+
+ if s_handshaking:
+ try:
+ s.do_handshake()
+ except WantReadError:
+ pass
+ else:
+ s_handshaking = False
+
+ s.write(b"hello")
+ pump()
+ assert c.read(100) == b"hello"
+ c.write(b"goodbye")
+ pump()
+ assert s.read(100) == b"goodbye"
+
+ # Check that the MTU set/query functions are doing *something*
+ c.set_ciphertext_mtu(1000)
+ try:
+ assert 500 < c.get_cleartext_mtu() < 1000
+ except NotImplementedError: # OpenSSL 1.1.0 and earlier
+ pass
+ c.set_ciphertext_mtu(500)
+ try:
+ assert 0 < c.get_cleartext_mtu() < 500
+ except NotImplementedError: # OpenSSL 1.1.0 and earlier
+ pass
diff --git a/contrib/python/pyOpenSSL/py3/tests/test_util.py b/contrib/python/pyOpenSSL/py3/tests/test_util.py
index 6224448d433..e029d71a719 100644
--- a/contrib/python/pyOpenSSL/py3/tests/test_util.py
+++ b/contrib/python/pyOpenSSL/py3/tests/test_util.py
@@ -3,7 +3,7 @@ import pytest
from OpenSSL._util import exception_from_error_queue, lib
-class TestErrors(object):
+class TestErrors:
"""
Tests for handling of certain OpenSSL error cases.
"""
diff --git a/contrib/python/pyOpenSSL/py3/tests/util.py b/contrib/python/pyOpenSSL/py3/tests/util.py
index 75d2c8de80e..b4649e33992 100644
--- a/contrib/python/pyOpenSSL/py3/tests/util.py
+++ b/contrib/python/pyOpenSSL/py3/tests/util.py
@@ -6,8 +6,6 @@ Helpers for the OpenSSL test suite, largely copied from
U{Twisted<http://twistedmatrix.com/>}.
"""
-from six import PY2
-
# This is the UTF-8 encoding of the SNOWMAN unicode code point.
NON_ASCII = b"\xe2\x98\x83".decode("utf-8")
@@ -32,7 +30,7 @@ def is_consistent_type(theType, name, *constructionArgs):
return True
-class EqualityTestsMixin(object):
+class EqualityTestsMixin:
"""
A mixin defining tests for the standard implementation of C{==} and C{!=}.
"""
@@ -128,7 +126,7 @@ class EqualityTestsMixin(object):
operand if it is of an unrelated type.
"""
- class Delegate(object):
+ class Delegate:
def __eq__(self, other):
# Do something crazy and obvious.
return [self]
@@ -143,7 +141,7 @@ class EqualityTestsMixin(object):
operand if it is of an unrelated type.
"""
- class Delegate(object):
+ class Delegate:
def __ne__(self, other):
# Do something crazy and obvious.
return [self]
@@ -154,7 +152,4 @@ class EqualityTestsMixin(object):
# The type name expected in warnings about using the wrong string type.
-if PY2:
- WARNING_TYPE_EXPECTED = "unicode"
-else:
- WARNING_TYPE_EXPECTED = "str"
+WARNING_TYPE_EXPECTED = "str"
diff --git a/contrib/python/pyOpenSSL/py3/ya.make b/contrib/python/pyOpenSSL/py3/ya.make
index ff8a0f627a6..eeab889714e 100644
--- a/contrib/python/pyOpenSSL/py3/ya.make
+++ b/contrib/python/pyOpenSSL/py3/ya.make
@@ -4,13 +4,12 @@ PY3_LIBRARY()
SUBSCRIBER(g:python-contrib)
-VERSION(21.0.0)
+VERSION(23.0.0)
LICENSE(Apache-2.0)
PEERDIR(
- contrib/python/cryptography
- contrib/python/six
+ contrib/python/cryptography/py3
)
NO_LINT()