diff options
Diffstat (limited to 'contrib/python/pyOpenSSL')
19 files changed, 1536 insertions, 919 deletions
diff --git a/contrib/python/pyOpenSSL/py2/ya.make b/contrib/python/pyOpenSSL/py2/ya.make index 6c1e686c4f8..61ed4e98800 100644 --- a/contrib/python/pyOpenSSL/py2/ya.make +++ b/contrib/python/pyOpenSSL/py2/ya.make @@ -7,7 +7,7 @@ VERSION(21.0.0) LICENSE(Apache-2.0) PEERDIR( - contrib/python/cryptography + contrib/python/cryptography/py2 contrib/python/six ) diff --git a/contrib/python/pyOpenSSL/py3/.dist-info/METADATA b/contrib/python/pyOpenSSL/py3/.dist-info/METADATA index 43ea7f5813d..e4139673e38 100644 --- a/contrib/python/pyOpenSSL/py3/.dist-info/METADATA +++ b/contrib/python/pyOpenSSL/py3/.dist-info/METADATA @@ -1,35 +1,35 @@ Metadata-Version: 2.1 Name: pyOpenSSL -Version: 21.0.0 +Version: 23.0.0 Summary: Python wrapper module around the OpenSSL library Home-page: https://pyopenssl.org/ Author: The pyOpenSSL developers Author-email: [email protected] License: Apache License, Version 2.0 -Platform: UNKNOWN +Project-URL: Source, https://github.com/pyca/pyopenssl Classifier: Development Status :: 6 - Mature Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: Apache Software License Classifier: Operating System :: MacOS :: MacOS X Classifier: Operating System :: Microsoft :: Windows Classifier: Operating System :: POSIX -Classifier: Programming Language :: Python :: 2 -Classifier: Programming Language :: Python :: 2.7 Classifier: Programming Language :: Python :: 3 Classifier: Programming Language :: Python :: 3.6 Classifier: Programming Language :: Python :: 3.7 Classifier: Programming Language :: Python :: 3.8 Classifier: Programming Language :: Python :: 3.9 +Classifier: Programming Language :: Python :: 3.10 +Classifier: Programming Language :: Python :: 3.11 Classifier: Programming Language :: Python :: Implementation :: CPython Classifier: Programming Language :: Python :: Implementation :: PyPy Classifier: Topic :: Security :: Cryptography Classifier: Topic :: Software Development :: Libraries :: Python Modules Classifier: Topic :: System :: Networking -Requires-Python: >=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.* -Requires-Dist: cryptography (>=3.3) -Requires-Dist: six (>=1.5.2) +Requires-Python: >=3.6 +License-File: LICENSE +Requires-Dist: cryptography (<40,>=38.0.0) Provides-Extra: docs -Requires-Dist: sphinx ; extra == 'docs' +Requires-Dist: sphinx (!=5.2.0,!=5.2.0.post0) ; extra == 'docs' Requires-Dist: sphinx-rtd-theme ; extra == 'docs' Provides-Extra: test Requires-Dist: flaky ; extra == 'test' @@ -74,7 +74,7 @@ If you run into bugs, you can file them in our `issue tracker`_. We maintain a cryptography-dev_ mailing list for both user and development discussions. -You can also join ``#cryptography-dev`` on Freenode to ask questions or get involved. +You can also join ``#pyca`` on ``irc.libera.chat`` to ask questions or get involved. .. _documentation: https://pyopenssl.org/ @@ -87,7 +87,73 @@ You can also join ``#cryptography-dev`` on Freenode to ask questions or get invo Release Information =================== -21.0.0 (2020-09-28) +23.0.0 (2023-01-01) +------------------- + +Backward-incompatible changes: +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Deprecations: +^^^^^^^^^^^^^ + +Changes: +^^^^^^^^ + +- Add ``OpenSSL.SSL.X509StoreFlags.PARTIAL_CHAIN`` constant to allow for users + to perform certificate verification on partial certificate chains. + `#1166 <https://github.com/pyca/pyopenssl/pull/1166>`_ +- ``cryptography`` maximum version has been increased to 39.0.x. + +22.1.0 (2022-09-25) +------------------- + +Backward-incompatible changes: +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +- Remove support for SSLv2 and SSLv3. +- The minimum ``cryptography`` version is now 38.0.x (and we now pin releases + against ``cryptography`` major versions to prevent future breakage) +- The ``OpenSSL.crypto.X509StoreContextError`` exception has been refactored, + changing its internal attributes. + `#1133 <https://github.com/pyca/pyopenssl/pull/1133>`_ + +Deprecations: +^^^^^^^^^^^^^ + +- ``OpenSSL.SSL.SSLeay_version`` is deprecated in favor of + ``OpenSSL.SSL.OpenSSL_version``. The constants ``OpenSSL.SSL.SSLEAY_*`` are + deprecated in favor of ``OpenSSL.SSL.OPENSSL_*``. + +Changes: +^^^^^^^^ + +- Add ``OpenSSL.SSL.Connection.set_verify`` and ``OpenSSL.SSL.Connection.get_verify_mode`` + to override the context object's verification flags. + `#1073 <https://github.com/pyca/pyopenssl/pull/1073>`_ +- Add ``OpenSSL.SSL.Connection.use_certificate`` and ``OpenSSL.SSL.Connection.use_privatekey`` + to set a certificate per connection (and not just per context) `#1121 <https://github.com/pyca/pyopenssl/pull/1121>`_. + +22.0.0 (2022-01-29) +------------------- + +Backward-incompatible changes: +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +- Drop support for Python 2.7. + `#1047 <https://github.com/pyca/pyopenssl/pull/1047>`_ +- The minimum ``cryptography`` version is now 35.0. + +Deprecations: +^^^^^^^^^^^^^ + +Changes: +^^^^^^^^ + +- Expose wrappers for some `DTLS + <https://en.wikipedia.org/wiki/Datagram_Transport_Layer_Security>`_ + primitives. `#1026 <https://github.com/pyca/pyopenssl/pull/1026>`_ + +21.0.0 (2021-09-28) ------------------- Backward-incompatible changes: @@ -194,5 +260,3 @@ Changes: `Full changelog <https://pyopenssl.org/en/stable/changelog.html>`_. - - diff --git a/contrib/python/pyOpenSSL/py3/OpenSSL/SSL.py b/contrib/python/pyOpenSSL/py3/OpenSSL/SSL.py index e71b044cc02..dfbd1094e9d 100644 --- a/contrib/python/pyOpenSSL/py3/OpenSSL/SSL.py +++ b/contrib/python/pyOpenSSL/py3/OpenSSL/SSL.py @@ -1,12 +1,10 @@ import os import socket +from errno import errorcode +from functools import partial, wraps +from itertools import chain, count from sys import platform -from functools import wraps, partial -from itertools import count, chain from weakref import WeakValueDictionary -from errno import errorcode - -from six import integer_types, int2byte, indexbytes from OpenSSL._util import ( UNSPECIFIED as _UNSPECIFIED, @@ -14,19 +12,17 @@ from OpenSSL._util import ( ffi as _ffi, lib as _lib, make_assert as _make_assert, - native as _native, - path_string as _path_string, - text_to_bytes_and_warn as _text_to_bytes_and_warn, no_zero_allocator as _no_zero_allocator, + path_bytes as _path_bytes, + text_to_bytes_and_warn as _text_to_bytes_and_warn, ) - from OpenSSL.crypto import ( FILETYPE_PEM, - _PassphraseHelper, PKey, - X509Name, X509, + X509Name, X509Store, + _PassphraseHelper, ) __all__ = [ @@ -36,10 +32,13 @@ __all__ = [ "SSLEAY_PLATFORM", "SSLEAY_DIR", "SSLEAY_BUILT_ON", + "OPENSSL_VERSION", + "OPENSSL_CFLAGS", + "OPENSSL_PLATFORM", + "OPENSSL_DIR", + "OPENSSL_BUILT_ON", "SENT_SHUTDOWN", "RECEIVED_SHUTDOWN", - "SSLv2_METHOD", - "SSLv3_METHOD", "SSLv23_METHOD", "TLSv1_METHOD", "TLSv1_1_METHOD", @@ -47,6 +46,9 @@ __all__ = [ "TLS_METHOD", "TLS_SERVER_METHOD", "TLS_CLIENT_METHOD", + "DTLS_METHOD", + "DTLS_SERVER_METHOD", + "DTLS_CLIENT_METHOD", "SSL3_VERSION", "TLS1_VERSION", "TLS1_1_VERSION", @@ -57,7 +59,6 @@ __all__ = [ "OP_NO_TLSv1", "OP_NO_TLSv1_1", "OP_NO_TLSv1_2", - "OP_NO_TLSv1_3", "MODE_RELEASE_BUFFERS", "OP_SINGLE_DH_USE", "OP_SINGLE_ECDH_USE", @@ -124,26 +125,17 @@ __all__ = [ "Connection", ] -try: - _buffer = buffer -except NameError: - - class _buffer(object): - pass - OPENSSL_VERSION_NUMBER = _lib.OPENSSL_VERSION_NUMBER -SSLEAY_VERSION = _lib.SSLEAY_VERSION -SSLEAY_CFLAGS = _lib.SSLEAY_CFLAGS -SSLEAY_PLATFORM = _lib.SSLEAY_PLATFORM -SSLEAY_DIR = _lib.SSLEAY_DIR -SSLEAY_BUILT_ON = _lib.SSLEAY_BUILT_ON +OPENSSL_VERSION = SSLEAY_VERSION = _lib.OPENSSL_VERSION +OPENSSL_CFLAGS = SSLEAY_CFLAGS = _lib.OPENSSL_CFLAGS +OPENSSL_PLATFORM = SSLEAY_PLATFORM = _lib.OPENSSL_PLATFORM +OPENSSL_DIR = SSLEAY_DIR = _lib.OPENSSL_DIR +OPENSSL_BUILT_ON = SSLEAY_BUILT_ON = _lib.OPENSSL_BUILT_ON SENT_SHUTDOWN = _lib.SSL_SENT_SHUTDOWN RECEIVED_SHUTDOWN = _lib.SSL_RECEIVED_SHUTDOWN -SSLv2_METHOD = 1 -SSLv3_METHOD = 2 SSLv23_METHOD = 3 TLSv1_METHOD = 4 TLSv1_1_METHOD = 5 @@ -151,6 +143,9 @@ TLSv1_2_METHOD = 6 TLS_METHOD = 7 TLS_SERVER_METHOD = 8 TLS_CLIENT_METHOD = 9 +DTLS_METHOD = 10 +DTLS_SERVER_METHOD = 11 +DTLS_CLIENT_METHOD = 12 try: SSL3_VERSION = _lib.SSL3_VERSION @@ -174,6 +169,7 @@ OP_NO_TLSv1_1 = _lib.SSL_OP_NO_TLSv1_1 OP_NO_TLSv1_2 = _lib.SSL_OP_NO_TLSv1_2 try: OP_NO_TLSv1_3 = _lib.SSL_OP_NO_TLSv1_3 + __all__.append("OP_NO_TLSv1_3") except AttributeError: pass @@ -208,6 +204,18 @@ OP_NO_QUERY_MTU = _lib.SSL_OP_NO_QUERY_MTU OP_COOKIE_EXCHANGE = _lib.SSL_OP_COOKIE_EXCHANGE OP_NO_TICKET = _lib.SSL_OP_NO_TICKET +try: + OP_NO_RENEGOTIATION = _lib.SSL_OP_NO_RENEGOTIATION + __all__.append("OP_NO_RENEGOTIATION") +except AttributeError: + pass + +try: + OP_IGNORE_UNEXPECTED_EOF = _lib.SSL_OP_IGNORE_UNEXPECTED_EOF + __all__.append("OP_IGNORE_UNEXPECTED_EOF") +except AttributeError: + pass + OP_ALL = _lib.SSL_OP_ALL VERIFY_PEER = _lib.SSL_VERIFY_PEER @@ -257,8 +265,8 @@ _CERTIFICATE_PATH_LOCATIONS = [ # These values are compared to output from cffi's ffi.string so they must be # byte strings. -_CRYPTOGRAPHY_MANYLINUX1_CA_DIR = b"/opt/pyca/cryptography/openssl/certs" -_CRYPTOGRAPHY_MANYLINUX1_CA_FILE = b"/opt/pyca/cryptography/openssl/cert.pem" +_CRYPTOGRAPHY_MANYLINUX_CA_DIR = b"/opt/pyca/cryptography/openssl/certs" +_CRYPTOGRAPHY_MANYLINUX_CA_FILE = b"/opt/pyca/cryptography/openssl/cert.pem" class Error(Exception): @@ -291,7 +299,7 @@ class SysCallError(Error): pass -class _CallbackExceptionHelper(object): +class _CallbackExceptionHelper: """ A base class for wrapper classes that allow for intelligent exception handling in OpenSSL callbacks. @@ -381,7 +389,7 @@ class _ALPNSelectHelper(_CallbackExceptionHelper): instr = _ffi.buffer(in_, inlen)[:] protolist = [] while instr: - encoded_len = indexbytes(instr, 0) + encoded_len = instr[0] proto = instr[1 : encoded_len + 1] protolist.append(proto) instr = instr[encoded_len + 1 :] @@ -549,17 +557,59 @@ class _OCSPClientCallbackHelper(_CallbackExceptionHelper): self.callback = _ffi.callback("int (*)(SSL *, void *)", wrapper) +class _CookieGenerateCallbackHelper(_CallbackExceptionHelper): + def __init__(self, callback): + _CallbackExceptionHelper.__init__(self) + + @wraps(callback) + def wrapper(ssl, out, outlen): + try: + conn = Connection._reverse_mapping[ssl] + cookie = callback(conn) + out[0 : len(cookie)] = cookie + outlen[0] = len(cookie) + return 1 + except Exception as e: + self._problems.append(e) + # "a zero return value can be used to abort the handshake" + return 0 + + self.callback = _ffi.callback( + "int (*)(SSL *, unsigned char *, unsigned int *)", + wrapper, + ) + + +class _CookieVerifyCallbackHelper(_CallbackExceptionHelper): + def __init__(self, callback): + _CallbackExceptionHelper.__init__(self) + + @wraps(callback) + def wrapper(ssl, c_cookie, cookie_len): + try: + conn = Connection._reverse_mapping[ssl] + return callback(conn, bytes(c_cookie[0:cookie_len])) + except Exception as e: + self._problems.append(e) + return 0 + + self.callback = _ffi.callback( + "int (*)(SSL *, unsigned char *, unsigned int)", + wrapper, + ) + + def _asFileDescriptor(obj): fd = None - if not isinstance(obj, integer_types): + if not isinstance(obj, int): meth = getattr(obj, "fileno", None) if meth is not None: obj = meth() - if isinstance(obj, integer_types): + if isinstance(obj, int): fd = obj - if not isinstance(fd, integer_types): + if not isinstance(fd, int): raise TypeError("argument must be an int, or have a fileno() method.") elif fd < 0: raise ValueError( @@ -569,13 +619,16 @@ def _asFileDescriptor(obj): return fd -def SSLeay_version(type): +def OpenSSL_version(type): """ Return a string describing the version of OpenSSL in use. - :param type: One of the :const:`SSLEAY_` constants defined in this module. + :param type: One of the :const:`OPENSSL_` constants defined in this module. """ - return _ffi.string(_lib.SSLeay_version(type)) + return _ffi.string(_lib.OpenSSL_version(type)) + + +SSLeay_version = OpenSSL_version def _make_requires(flag, error): @@ -613,7 +666,7 @@ _requires_keylog = _make_requires( ) -class Session(object): +class Session: """ A class representing an SSL session. A session defines certain connection parameters which may be re-used to speed up the setup of subsequent @@ -625,39 +678,36 @@ class Session(object): pass -class Context(object): +class Context: """ :class:`OpenSSL.SSL.Context` instances define the parameters for setting up new SSL connections. - :param method: One of TLS_METHOD, TLS_CLIENT_METHOD, or TLS_SERVER_METHOD. + :param method: One of TLS_METHOD, TLS_CLIENT_METHOD, TLS_SERVER_METHOD, + DTLS_METHOD, DTLS_CLIENT_METHOD, or DTLS_SERVER_METHOD. SSLv23_METHOD, TLSv1_METHOD, etc. are deprecated and should not be used. """ _methods = { - SSLv2_METHOD: "SSLv2_method", - SSLv3_METHOD: "SSLv3_method", - SSLv23_METHOD: "SSLv23_method", - TLSv1_METHOD: "TLSv1_method", - TLSv1_1_METHOD: "TLSv1_1_method", - TLSv1_2_METHOD: "TLSv1_2_method", - TLS_METHOD: "TLS_method", - TLS_SERVER_METHOD: "TLS_server_method", - TLS_CLIENT_METHOD: "TLS_client_method", + SSLv23_METHOD: (_lib.TLS_method, None), + TLSv1_METHOD: (_lib.TLS_method, TLS1_VERSION), + TLSv1_1_METHOD: (_lib.TLS_method, TLS1_1_VERSION), + TLSv1_2_METHOD: (_lib.TLS_method, TLS1_2_VERSION), + TLS_METHOD: (_lib.TLS_method, None), + TLS_SERVER_METHOD: (_lib.TLS_server_method, None), + TLS_CLIENT_METHOD: (_lib.TLS_client_method, None), + DTLS_METHOD: (_lib.DTLS_method, None), + DTLS_SERVER_METHOD: (_lib.DTLS_server_method, None), + DTLS_CLIENT_METHOD: (_lib.DTLS_client_method, None), } - _methods = dict( - (identifier, getattr(_lib, name)) - for (identifier, name) in _methods.items() - if getattr(_lib, name, None) is not None - ) def __init__(self, method): - if not isinstance(method, integer_types): + if not isinstance(method, int): raise TypeError("method must be an integer") try: - method_func = self._methods[method] + method_func, version = self._methods[method] except KeyError: raise ValueError("No such protocol") @@ -668,12 +718,6 @@ class Context(object): _openssl_assert(context != _ffi.NULL) context = _ffi.gc(context, _lib.SSL_CTX_free) - # Set SSL_CTX_set_ecdh_auto so that the ECDH curve will be - # auto-selected. This function was added in 1.0.2 and made a noop in - # 1.1.0+ (where it is set automatically). - res = _lib.SSL_CTX_set_ecdh_auto(context, 1) - _openssl_assert(res == 1) - self._context = context self._passphrase_helper = None self._passphrase_callback = None @@ -689,8 +733,13 @@ class Context(object): self._ocsp_helper = None self._ocsp_callback = None self._ocsp_data = None + self._cookie_generate_helper = None + self._cookie_verify_helper = None self.set_mode(_lib.SSL_MODE_ENABLE_PARTIAL_WRITE) + if version is not None: + self.set_min_proto_version(version) + self.set_max_proto_version(version) def set_min_proto_version(self, version): """ @@ -737,12 +786,12 @@ class Context(object): if cafile is None: cafile = _ffi.NULL else: - cafile = _path_string(cafile) + cafile = _path_bytes(cafile) if capath is None: capath = _ffi.NULL else: - capath = _path_string(capath) + capath = _path_bytes(capath) load_result = _lib.SSL_CTX_load_verify_locations( self._context, cafile, capath @@ -828,8 +877,8 @@ class Context(object): # to the exact values we use in our manylinux1 builds. If they are # then we know to load the fallbacks if ( - default_dir == _CRYPTOGRAPHY_MANYLINUX1_CA_DIR - and default_file == _CRYPTOGRAPHY_MANYLINUX1_CA_FILE + default_dir == _CRYPTOGRAPHY_MANYLINUX_CA_DIR + and default_file == _CRYPTOGRAPHY_MANYLINUX_CA_FILE ): # This is manylinux1, let's load our fallback paths self._fallback_default_verify_paths( @@ -876,7 +925,7 @@ class Context(object): :return: None """ - certfile = _path_string(certfile) + certfile = _path_bytes(certfile) result = _lib.SSL_CTX_use_certificate_chain_file( self._context, certfile @@ -896,8 +945,8 @@ class Context(object): :return: None """ - certfile = _path_string(certfile) - if not isinstance(filetype, integer_types): + certfile = _path_bytes(certfile) + if not isinstance(filetype, int): raise TypeError("filetype must be an integer") use_result = _lib.SSL_CTX_use_certificate_file( @@ -913,6 +962,7 @@ class Context(object): :param cert: The X509 object :return: None """ + # Mirrored at Connection.use_certificate if not isinstance(cert, X509): raise TypeError("cert must be an X509 instance") @@ -954,11 +1004,11 @@ class Context(object): :return: None """ - keyfile = _path_string(keyfile) + keyfile = _path_bytes(keyfile) if filetype is _UNSPECIFIED: filetype = FILETYPE_PEM - elif not isinstance(filetype, integer_types): + elif not isinstance(filetype, int): raise TypeError("filetype must be an integer") use_result = _lib.SSL_CTX_use_PrivateKey_file( @@ -974,6 +1024,7 @@ class Context(object): :param pkey: The PKey object :return: None """ + # Mirrored at Connection.use_privatekey if not isinstance(pkey, PKey): raise TypeError("pkey must be a PKey instance") @@ -1035,7 +1086,7 @@ class Context(object): .. versionadded:: 0.14 """ - if not isinstance(mode, integer_types): + if not isinstance(mode, int): raise TypeError("mode must be an integer") return _lib.SSL_CTX_set_session_cache_mode(self._context, mode) @@ -1070,7 +1121,7 @@ class Context(object): See SSL_CTX_set_verify(3SSL) for further details. """ - if not isinstance(mode, integer_types): + if not isinstance(mode, int): raise TypeError("mode must be an integer") if callback is None: @@ -1093,7 +1144,7 @@ class Context(object): :param depth: An integer specifying the verify depth :return: None """ - if not isinstance(depth, integer_types): + if not isinstance(depth, int): raise TypeError("depth must be an integer") _lib.SSL_CTX_set_verify_depth(self._context, depth) @@ -1125,7 +1176,7 @@ class Context(object): :return: None """ - dhfile = _path_string(dhfile) + dhfile = _path_bytes(dhfile) bio = _lib.BIO_new_file(dhfile, b"r") if bio == _ffi.NULL: @@ -1253,7 +1304,7 @@ class Context(object): :param timeout: The timeout in (whole) seconds :return: The previous session timeout """ - if not isinstance(timeout, integer_types): + if not isinstance(timeout, int): raise TypeError("timeout must be an integer") return _lib.SSL_CTX_set_timeout(self._context, timeout) @@ -1356,7 +1407,7 @@ class Context(object): :param options: The options to add. :return: The new option bitmask. """ - if not isinstance(options, integer_types): + if not isinstance(options, int): raise TypeError("options must be an integer") return _lib.SSL_CTX_set_options(self._context, options) @@ -1369,7 +1420,7 @@ class Context(object): :param mode: The mode to add. :return: The new mode bitmask. """ - if not isinstance(mode, integer_types): + if not isinstance(mode, int): raise TypeError("mode must be an integer") return _lib.SSL_CTX_set_mode(self._context, mode) @@ -1423,10 +1474,16 @@ class Context(object): This list should be a Python list of bytestrings representing the protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``. """ + # Different versions of OpenSSL are inconsistent about how they handle + # empty proto lists (see #1043), so we avoid the problem entirely by + # rejecting them ourselves. + if not protos: + raise ValueError("at least one protocol must be specified") + # Take the list of protocols and join them together, prefixing them # with their lengths. protostr = b"".join( - chain.from_iterable((int2byte(len(p)), p) for p in protos) + chain.from_iterable((bytes((len(p),)), p) for p in protos) ) # Build a C string from the list. We don't need to save this off @@ -1523,8 +1580,22 @@ class Context(object): helper = _OCSPClientCallbackHelper(callback) self._set_ocsp_callback(helper, data) + def set_cookie_generate_callback(self, callback): + self._cookie_generate_helper = _CookieGenerateCallbackHelper(callback) + _lib.SSL_CTX_set_cookie_generate_cb( + self._context, + self._cookie_generate_helper.callback, + ) + + def set_cookie_verify_callback(self, callback): + self._cookie_verify_helper = _CookieVerifyCallbackHelper(callback) + _lib.SSL_CTX_set_cookie_verify_cb( + self._context, + self._cookie_verify_helper.callback, + ) + -class Connection(object): +class Connection: _reverse_mapping = WeakValueDictionary() def __init__(self, context, socket=None): @@ -1560,6 +1631,10 @@ class Connection(object): self._verify_helper = context._verify_helper self._verify_callback = context._verify_callback + # And likewise for the cookie callbacks + self._cookie_generate_helper = context._cookie_generate_helper + self._cookie_verify_helper = context._cookie_verify_helper + self._reverse_mapping[self._ssl] = self if socket is None: @@ -1626,6 +1701,24 @@ class Connection(object): else: # TODO: This is untested. _raise_current_error() + elif error == _lib.SSL_ERROR_SSL and _lib.ERR_peek_error() != 0: + # In 3.0.x an unexpected EOF no longer triggers syscall error + # but we want to maintain compatibility so we check here and + # raise syscall if it is an EOF. Since we're not actually sure + # what else could raise SSL_ERROR_SSL we check for the presence + # of the OpenSSL 3 constant SSL_R_UNEXPECTED_EOF_WHILE_READING + # and if it's not present we just raise an error, which matches + # the behavior before we added this elif section + peeked_error = _lib.ERR_peek_error() + reason = _lib.ERR_GET_REASON(peeked_error) + if _lib.Cryptography_HAS_UNEXPECTED_EOF_WHILE_READING: + _openssl_assert( + reason == _lib.SSL_R_UNEXPECTED_EOF_WHILE_READING + ) + _lib.ERR_clear_error() + raise SysCallError(-1, "Unexpected EOF") + else: + _raise_current_error() elif error == _lib.SSL_ERROR_NONE: pass else: @@ -1668,6 +1761,94 @@ class Connection(object): return _ffi.string(name) + def set_verify(self, mode, callback=None): + """ + Override the Context object's verification flags for this specific + connection. See :py:meth:`Context.set_verify` for details. + """ + if not isinstance(mode, int): + raise TypeError("mode must be an integer") + + if callback is None: + self._verify_helper = None + self._verify_callback = None + _lib.SSL_set_verify(self._ssl, mode, _ffi.NULL) + else: + if not callable(callback): + raise TypeError("callback must be callable") + + self._verify_helper = _VerifyHelper(callback) + self._verify_callback = self._verify_helper.callback + _lib.SSL_set_verify(self._ssl, mode, self._verify_callback) + + def get_verify_mode(self): + """ + Retrieve the Connection object's verify mode, as set by + :meth:`set_verify`. + + :return: The verify mode + """ + return _lib.SSL_get_verify_mode(self._ssl) + + def use_certificate(self, cert): + """ + Load a certificate from a X509 object + + :param cert: The X509 object + :return: None + """ + # Mirrored from Context.use_certificate + if not isinstance(cert, X509): + raise TypeError("cert must be an X509 instance") + + use_result = _lib.SSL_use_certificate(self._ssl, cert._x509) + if not use_result: + _raise_current_error() + + def use_privatekey(self, pkey): + """ + Load a private key from a PKey object + + :param pkey: The PKey object + :return: None + """ + # Mirrored from Context.use_privatekey + if not isinstance(pkey, PKey): + raise TypeError("pkey must be a PKey instance") + + use_result = _lib.SSL_use_PrivateKey(self._ssl, pkey._pkey) + if not use_result: + self._context._raise_passphrase_exception() + + def set_ciphertext_mtu(self, mtu): + """ + For DTLS, set the maximum UDP payload size (*not* including IP/UDP + overhead). + + Note that you might have to set :data:`OP_NO_QUERY_MTU` to prevent + OpenSSL from spontaneously clearing this. + + :param mtu: An integer giving the maximum transmission unit. + + .. versionadded:: 21.1 + """ + _lib.SSL_set_mtu(self._ssl, mtu) + + def get_cleartext_mtu(self): + """ + For DTLS, get the maximum size of unencrypted data you can pass to + :meth:`write` without exceeding the MTU (as passed to + :meth:`set_ciphertext_mtu`). + + :return: The effective MTU as an integer. + + .. versionadded:: 21.1 + """ + + if not hasattr(_lib, "DTLS_get_data_mtu"): + raise NotImplementedError("requires OpenSSL 1.1.1 or better") + return _lib.DTLS_get_data_mtu(self._ssl) + def set_tlsext_host_name(self, name): """ Set the value of the servername extension to send in the client hello. @@ -1839,7 +2020,7 @@ class Connection(object): if self._from_ssl is None: raise TypeError("Connection sock was not None") - if not isinstance(bufsiz, integer_types): + if not isinstance(bufsiz, int): raise TypeError("bufsiz must be an integer") buf = _no_zero_allocator("char[]", bufsiz) @@ -1953,6 +2134,32 @@ class Connection(object): conn.set_accept_state() return (conn, addr) + def DTLSv1_listen(self): + """ + Call the OpenSSL function DTLSv1_listen on this connection. See the + OpenSSL manual for more details. + + :return: None + """ + # Possible future extension: return the BIO_ADDR in some form. + bio_addr = _lib.BIO_ADDR_new() + try: + result = _lib.DTLSv1_listen(self._ssl, bio_addr) + finally: + _lib.BIO_ADDR_free(bio_addr) + # DTLSv1_listen is weird. A zero return value means 'didn't find a + # ClientHello with valid cookie, but keep trying'. So basically + # WantReadError. But it doesn't work correctly with _raise_ssl_error. + # So we raise it manually instead. + if self._cookie_generate_helper is not None: + self._cookie_generate_helper.raise_if_problem() + if self._cookie_verify_helper is not None: + self._cookie_verify_helper.raise_if_problem() + if result == 0: + raise WantReadError() + if result < 0: + self._raise_ssl_error(self._ssl, result) + def bio_shutdown(self): """ If the Connection was created with a memory BIO, this method can be @@ -1994,7 +2201,7 @@ class Connection(object): result = _lib.SSL_get_cipher_list(self._ssl, i) if result == _ffi.NULL: break - ciphers.append(_native(_ffi.string(result))) + ciphers.append(_ffi.string(result).decode("utf-8")) return ciphers def get_client_ca_list(self): @@ -2070,7 +2277,7 @@ class Connection(object): :param state: bitvector of SENT_SHUTDOWN, RECEIVED_SHUTDOWN. :return: None """ - if not isinstance(state, integer_types): + if not isinstance(state, int): raise TypeError("state must be an integer") _lib.SSL_set_shutdown(self._ssl, state) @@ -2451,10 +2658,16 @@ class Connection(object): This list should be a Python list of bytestrings representing the protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``. """ + # Different versions of OpenSSL are inconsistent about how they handle + # empty proto lists (see #1043), so we avoid the problem entirely by + # rejecting them ourselves. + if not protos: + raise ValueError("at least one protocol must be specified") + # Take the list of protocols and join them together, prefixing them # with their lengths. protostr = b"".join( - chain.from_iterable((int2byte(len(p)), p) for p in protos) + chain.from_iterable((bytes((len(p),)), p) for p in protos) ) # Build a C string from the list. We don't need to save this off @@ -2475,7 +2688,7 @@ class Connection(object): Get the protocol that was negotiated by ALPN. :returns: A bytestring of the protocol name. If no protocol has been - negotiated yet, returns an empty string. + negotiated yet, returns an empty bytestring. """ data = _ffi.new("unsigned char **") data_len = _ffi.new("unsigned int *") @@ -2498,8 +2711,3 @@ class Connection(object): self._ssl, _lib.TLSEXT_STATUSTYPE_ocsp ) _openssl_assert(rc == 1) - - -# This is similar to the initialization calls at the end of OpenSSL/crypto.py -# but is exercised mostly by the Context initializer. -_lib.SSL_library_init() diff --git a/contrib/python/pyOpenSSL/py3/OpenSSL/__init__.py b/contrib/python/pyOpenSSL/py3/OpenSSL/__init__.py index 11e896a4ea2..0af3acdb8fd 100644 --- a/contrib/python/pyOpenSSL/py3/OpenSSL/__init__.py +++ b/contrib/python/pyOpenSSL/py3/OpenSSL/__init__.py @@ -5,7 +5,7 @@ pyOpenSSL - A simple wrapper around the OpenSSL library """ -from OpenSSL import crypto, SSL +from OpenSSL import SSL, crypto from OpenSSL.version import ( __author__, __copyright__, diff --git a/contrib/python/pyOpenSSL/py3/OpenSSL/_util.py b/contrib/python/pyOpenSSL/py3/OpenSSL/_util.py index 53c0b9e573c..7a102e6c910 100644 --- a/contrib/python/pyOpenSSL/py3/OpenSSL/_util.py +++ b/contrib/python/pyOpenSSL/py3/OpenSSL/_util.py @@ -1,13 +1,13 @@ +import os import sys import warnings - -from six import PY2, text_type +from typing import Any, Callable, NoReturn, Type, Union from cryptography.hazmat.bindings.openssl.binding import Binding +StrOrBytesPath = Union[str, bytes, os.PathLike] binding = Binding() -binding.init_static_locks() ffi = binding.ffi lib = binding.lib @@ -18,7 +18,7 @@ lib = binding.lib no_zero_allocator = ffi.new_allocator(should_clear_after_alloc=False) -def text(charp): +def text(charp: Any) -> str: """ Get a native string type representing of the given CFFI ``char*`` object. @@ -28,10 +28,10 @@ def text(charp): """ if not charp: return "" - return native(ffi.string(charp)) + return ffi.string(charp).decode("utf-8") -def exception_from_error_queue(exception_type): +def exception_from_error_queue(exception_type: Type[Exception]) -> NoReturn: """ Convert an OpenSSL library failure into a Python exception. @@ -57,13 +57,13 @@ def exception_from_error_queue(exception_type): raise exception_type(errors) -def make_assert(error): +def make_assert(error: Type[Exception]) -> Callable[[bool], Any]: """ Create an assert function that uses :func:`exception_from_error_queue` to raise an exception wrapped by *error*. """ - def openssl_assert(ok): + def openssl_assert(ok: bool) -> None: """ If *ok* is not True, retrieve the error from OpenSSL and raise it. """ @@ -73,66 +73,35 @@ def make_assert(error): return openssl_assert -def native(s): +def path_bytes(s: StrOrBytesPath) -> bytes: """ - Convert :py:class:`bytes` or :py:class:`unicode` to the native - :py:class:`str` type, using UTF-8 encoding if conversion is necessary. - - :raise UnicodeError: The input string is not UTF-8 decodeable. + Convert a Python path to a :py:class:`bytes` for the path which can be + passed into an OpenSSL API accepting a filename. - :raise TypeError: The input is neither :py:class:`bytes` nor - :py:class:`unicode`. - """ - if not isinstance(s, (bytes, text_type)): - raise TypeError("%r is neither bytes nor unicode" % s) - if PY2: - if isinstance(s, text_type): - return s.encode("utf-8") - else: - if isinstance(s, bytes): - return s.decode("utf-8") - return s - - -def path_string(s): - """ - Convert a Python string to a :py:class:`bytes` string identifying the same - path and which can be passed into an OpenSSL API accepting a filename. - - :param s: An instance of :py:class:`bytes` or :py:class:`unicode`. + :param s: A path (valid for os.fspath). :return: An instance of :py:class:`bytes`. """ - if isinstance(s, bytes): - return s - elif isinstance(s, text_type): - return s.encode(sys.getfilesystemencoding()) - else: - raise TypeError("Path must be represented as bytes or unicode string") - - -if PY2: - - def byte_string(s): - return s + b = os.fspath(s) + if isinstance(b, str): + return b.encode(sys.getfilesystemencoding()) + else: + return b -else: - def byte_string(s): - return s.encode("charmap") +def byte_string(s: str) -> bytes: + return s.encode("charmap") # A marker object to observe whether some optional arguments are passed any # value or not. UNSPECIFIED = object() -_TEXT_WARNING = ( - text_type.__name__ + " for {0} is no longer accepted, use bytes" -) +_TEXT_WARNING = "str for {0} is no longer accepted, use bytes" -def text_to_bytes_and_warn(label, obj): +def text_to_bytes_and_warn(label: str, obj: Any) -> Any: """ If ``obj`` is text, emit a warning that it should be bytes instead and try to convert it to bytes automatically. @@ -145,7 +114,7 @@ def text_to_bytes_and_warn(label, obj): UTF-8 encoding of that text is returned. Otherwise, ``obj`` itself is returned. """ - if isinstance(obj, text_type): + if isinstance(obj, str): warnings.warn( _TEXT_WARNING.format(label), category=DeprecationWarning, diff --git a/contrib/python/pyOpenSSL/py3/OpenSSL/crypto.py b/contrib/python/pyOpenSSL/py3/OpenSSL/crypto.py index eda4af6f9d9..a9b673c53f5 100644 --- a/contrib/python/pyOpenSSL/py3/OpenSSL/crypto.py +++ b/contrib/python/pyOpenSSL/py3/OpenSSL/crypto.py @@ -1,31 +1,44 @@ import calendar import datetime - +import functools from base64 import b16encode from functools import partial -from operator import __eq__, __ne__, __lt__, __le__, __gt__, __ge__ - -from six import ( - integer_types as _integer_types, - text_type as _text_type, - PY2 as _PY2, +from os import PathLike +from typing import ( + Any, + Callable, + Iterable, + List, + NoReturn, + Optional, + Sequence, + Set, + Tuple, + Type, + Union, ) from cryptography import utils, x509 -from cryptography.hazmat.primitives.asymmetric import dsa, rsa +from cryptography.hazmat.primitives.asymmetric import ( + dsa, + ec, + ed25519, + ed448, + rsa, +) from OpenSSL._util import ( + UNSPECIFIED as _UNSPECIFIED, + byte_string as _byte_string, + exception_from_error_queue as _exception_from_error_queue, ffi as _ffi, lib as _lib, - exception_from_error_queue as _exception_from_error_queue, - byte_string as _byte_string, - native as _native, - path_string as _path_string, - UNSPECIFIED as _UNSPECIFIED, - text_to_bytes_and_warn as _text_to_bytes_and_warn, make_assert as _make_assert, + path_bytes as _path_bytes, + text_to_bytes_and_warn as _text_to_bytes_and_warn, ) + __all__ = [ "FILETYPE_PEM", "FILETYPE_ASN1", @@ -65,16 +78,24 @@ __all__ = [ "load_pkcs12", ] -FILETYPE_PEM = _lib.SSL_FILETYPE_PEM -FILETYPE_ASN1 = _lib.SSL_FILETYPE_ASN1 + +_Key = Union[ + dsa.DSAPrivateKey, dsa.DSAPublicKey, rsa.RSAPrivateKey, rsa.RSAPublicKey +] +StrOrBytesPath = Union[str, bytes, PathLike] +PassphraseCallableT = Union[bytes, Callable[..., bytes]] + + +FILETYPE_PEM: int = _lib.SSL_FILETYPE_PEM +FILETYPE_ASN1: int = _lib.SSL_FILETYPE_ASN1 # TODO This was an API mistake. OpenSSL has no such constant. -FILETYPE_TEXT = 2 ** 16 - 1 +FILETYPE_TEXT = 2**16 - 1 -TYPE_RSA = _lib.EVP_PKEY_RSA -TYPE_DSA = _lib.EVP_PKEY_DSA -TYPE_DH = _lib.EVP_PKEY_DH -TYPE_EC = _lib.EVP_PKEY_EC +TYPE_RSA: int = _lib.EVP_PKEY_RSA +TYPE_DSA: int = _lib.EVP_PKEY_DSA +TYPE_DH: int = _lib.EVP_PKEY_DH +TYPE_EC: int = _lib.EVP_PKEY_EC class Error(Exception): @@ -87,20 +108,7 @@ _raise_current_error = partial(_exception_from_error_queue, Error) _openssl_assert = _make_assert(Error) -def _get_backend(): - """ - Importing the backend from cryptography has the side effect of activating - the osrandom engine. This mutates the global state of OpenSSL in the - process and causes issues for various programs that use subinterpreters or - embed Python. By putting the import in this function we can avoid - triggering this side effect unless _get_backend is called. - """ - from cryptography.hazmat.backends.openssl.backend import backend - - return backend - - -def _untested_error(where): +def _untested_error(where: str) -> NoReturn: """ An OpenSSL API failed somehow. Additionally, the failure which was encountered isn't one that's exercised by the test suite so future behavior @@ -109,7 +117,7 @@ def _untested_error(where): raise RuntimeError("Unknown %s failure" % (where,)) -def _new_mem_buf(buffer=None): +def _new_mem_buf(buffer: Optional[bytes] = None) -> Any: """ Allocate a new OpenSSL memory BIO. @@ -126,7 +134,7 @@ def _new_mem_buf(buffer=None): bio = _lib.BIO_new_mem_buf(data, len(buffer)) # Keep the memory alive as long as the bio is alive! - def free(bio, ref=data): + def free(bio: Any, ref: Any = data) -> Any: return _lib.BIO_free(bio) _openssl_assert(bio != _ffi.NULL) @@ -135,7 +143,7 @@ def _new_mem_buf(buffer=None): return bio -def _bio_to_string(bio): +def _bio_to_string(bio: Any) -> bytes: """ Copy the contents of an OpenSSL BIO object into a Python byte string. """ @@ -144,7 +152,7 @@ def _bio_to_string(bio): return _ffi.buffer(result_buffer[0], buffer_length)[:] -def _set_asn1_time(boundary, when): +def _set_asn1_time(boundary: Any, when: bytes) -> None: """ The the time value of an ASN1 time object. @@ -160,13 +168,35 @@ def _set_asn1_time(boundary, when): """ if not isinstance(when, bytes): raise TypeError("when must be a byte string") + # ASN1_TIME_set_string validates the string without writing anything + # when the destination is NULL. + _openssl_assert(boundary != _ffi.NULL) set_result = _lib.ASN1_TIME_set_string(boundary, when) if set_result == 0: raise ValueError("Invalid string") -def _get_asn1_time(timestamp): +def _new_asn1_time(when: bytes) -> Any: + """ + Behaves like _set_asn1_time but returns a new ASN1_TIME object. + + @param when: A string representation of the desired time value. + + @raise TypeError: If C{when} is not a L{bytes} string. + @raise ValueError: If C{when} does not represent a time in the required + format. + @raise RuntimeError: If the time value cannot be set for some other + (unspecified) reason. + """ + ret = _lib.ASN1_TIME_new() + _openssl_assert(ret != _ffi.NULL) + ret = _ffi.gc(ret, _lib.ASN1_TIME_free) + _set_asn1_time(ret, when) + return ret + + +def _get_asn1_time(timestamp: Any) -> Optional[bytes]: """ Retrieve the time value of an ASN1 time object. @@ -182,7 +212,7 @@ def _get_asn1_time(timestamp): elif ( _lib.ASN1_STRING_type(string_timestamp) == _lib.V_ASN1_GENERALIZEDTIME ): - return _ffi.string(_lib.ASN1_STRING_data(string_timestamp)) + return _ffi.string(_lib.ASN1_STRING_get0_data(string_timestamp)) else: generalized_timestamp = _ffi.new("ASN1_GENERALIZEDTIME**") _lib.ASN1_TIME_to_generalizedtime(timestamp, generalized_timestamp) @@ -201,26 +231,26 @@ def _get_asn1_time(timestamp): string_timestamp = _ffi.cast( "ASN1_STRING*", generalized_timestamp[0] ) - string_data = _lib.ASN1_STRING_data(string_timestamp) + string_data = _lib.ASN1_STRING_get0_data(string_timestamp) string_result = _ffi.string(string_data) _lib.ASN1_GENERALIZEDTIME_free(generalized_timestamp[0]) return string_result -class _X509NameInvalidator(object): - def __init__(self): - self._names = [] +class _X509NameInvalidator: + def __init__(self) -> None: + self._names: List[X509Name] = [] - def add(self, name): + def add(self, name: "X509Name") -> None: self._names.append(name) - def clear(self): + def clear(self) -> None: for name in self._names: # Breaks the object, but also prevents UAF! del name._name -class PKey(object): +class PKey: """ A class representing an DSA or RSA public key or key pair. """ @@ -228,12 +258,12 @@ class PKey(object): _only_public = False _initialized = True - def __init__(self): + def __init__(self) -> None: pkey = _lib.EVP_PKEY_new() self._pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free) self._initialized = False - def to_cryptography_key(self): + def to_cryptography_key(self) -> _Key: """ Export as a ``cryptography`` key. @@ -249,16 +279,15 @@ class PKey(object): load_der_public_key, ) - backend = _get_backend() if self._only_public: der = dump_publickey(FILETYPE_ASN1, self) - return load_der_public_key(der, backend) + return load_der_public_key(der) else: der = dump_privatekey(FILETYPE_ASN1, self) - return load_der_private_key(der, None, backend) + return load_der_private_key(der, None) @classmethod - def from_cryptography_key(cls, crypto_key): + def from_cryptography_key(cls, crypto_key: _Key) -> "PKey": """ Construct based on a ``cryptography`` *crypto_key*. @@ -276,6 +305,9 @@ class PKey(object): rsa.RSAPrivateKey, dsa.DSAPublicKey, dsa.DSAPrivateKey, + ec.EllipticCurvePrivateKey, + ed25519.Ed25519PrivateKey, + ed448.Ed448PrivateKey, ), ): raise TypeError("Unsupported key type") @@ -300,7 +332,7 @@ class PKey(object): ) return load_privatekey(FILETYPE_ASN1, der) - def generate_key(self, type, bits): + def generate_key(self, type: int, bits: int) -> None: """ Generate a key pair of the given type, with the given number of bits. @@ -356,7 +388,7 @@ class PKey(object): self._initialized = True - def check(self): + def check(self) -> bool: """ Check the consistency of an RSA private key. @@ -373,7 +405,7 @@ class PKey(object): raise TypeError("public key only") if _lib.EVP_PKEY_type(self.type()) != _lib.EVP_PKEY_RSA: - raise TypeError("key type unsupported") + raise TypeError("Only RSA keys can currently be checked.") rsa = _lib.EVP_PKEY_get1_RSA(self._pkey) rsa = _ffi.gc(rsa, _lib.RSA_free) @@ -382,7 +414,7 @@ class PKey(object): return True _raise_current_error() - def type(self): + def type(self) -> int: """ Returns the type of the key @@ -390,7 +422,7 @@ class PKey(object): """ return _lib.EVP_PKEY_id(self._pkey) - def bits(self): + def bits(self) -> int: """ Returns the number of bits of the key @@ -399,7 +431,7 @@ class PKey(object): return _lib.EVP_PKEY_bits(self._pkey) -class _EllipticCurve(object): +class _EllipticCurve: """ A representation of a supported elliptic curve. @@ -411,21 +443,19 @@ class _EllipticCurve(object): _curves = None - if not _PY2: - # This only necessary on Python 3. Moreover, it is broken on Python 2. - def __ne__(self, other): - """ - Implement cooperation with the right-hand side argument of ``!=``. + def __ne__(self, other: Any) -> bool: + """ + Implement cooperation with the right-hand side argument of ``!=``. - Python 3 seems to have dropped this cooperation in this very narrow - circumstance. - """ - if isinstance(other, _EllipticCurve): - return super(_EllipticCurve, self).__ne__(other) - return NotImplemented + Python 3 seems to have dropped this cooperation in this very narrow + circumstance. + """ + if isinstance(other, _EllipticCurve): + return super(_EllipticCurve, self).__ne__(other) + return NotImplemented @classmethod - def _load_elliptic_curves(cls, lib): + def _load_elliptic_curves(cls, lib: Any) -> Set["_EllipticCurve"]: """ Get the curves supported by OpenSSL. @@ -443,7 +473,7 @@ class _EllipticCurve(object): return set(cls.from_nid(lib, c.nid) for c in builtin_curves) @classmethod - def _get_elliptic_curves(cls, lib): + def _get_elliptic_curves(cls, lib: Any) -> Set["_EllipticCurve"]: """ Get, cache, and return the curves supported by OpenSSL. @@ -457,7 +487,7 @@ class _EllipticCurve(object): return cls._curves @classmethod - def from_nid(cls, lib, nid): + def from_nid(cls, lib: Any, nid: int) -> "_EllipticCurve": """ Instantiate a new :py:class:`_EllipticCurve` associated with the given OpenSSL NID. @@ -473,7 +503,7 @@ class _EllipticCurve(object): """ return cls(lib, nid, _ffi.string(lib.OBJ_nid2sn(nid)).decode("ascii")) - def __init__(self, lib, nid, name): + def __init__(self, lib: Any, nid: int, name: str) -> None: """ :param _lib: The :py:mod:`cryptography` binding instance used to interface with OpenSSL. @@ -490,10 +520,10 @@ class _EllipticCurve(object): self._nid = nid self.name = name - def __repr__(self): + def __repr__(self) -> str: return "<Curve %r>" % (self.name,) - def _to_EC_KEY(self): + def _to_EC_KEY(self) -> Any: """ Create a new OpenSSL EC_KEY structure initialized to use this curve. @@ -504,7 +534,7 @@ class _EllipticCurve(object): return _ffi.gc(key, _lib.EC_KEY_free) -def get_elliptic_curves(): +def get_elliptic_curves() -> Set["_EllipticCurve"]: """ Return a set of objects representing the elliptic curves supported in the OpenSSL build in use. @@ -519,7 +549,7 @@ def get_elliptic_curves(): return _EllipticCurve._get_elliptic_curves(_lib) -def get_elliptic_curve(name): +def get_elliptic_curve(name: str) -> _EllipticCurve: """ Return a single curve object selected by name. @@ -537,7 +567,8 @@ def get_elliptic_curve(name): raise ValueError("unknown curve name", name) -class X509Name(object): [email protected]_ordering +class X509Name: """ An X.509 Distinguished Name. @@ -562,7 +593,7 @@ class X509Name(object): :ivar emailAddress: The e-mail address of the entity. """ - def __init__(self, name): + def __init__(self, name: "X509Name") -> None: """ Create a new X509Name, copying the given X509Name instance. @@ -570,9 +601,9 @@ class X509Name(object): :type name: :py:class:`X509Name` """ name = _lib.X509_NAME_dup(name._name) - self._name = _ffi.gc(name, _lib.X509_NAME_free) + self._name: Any = _ffi.gc(name, _lib.X509_NAME_free) - def __setattr__(self, name, value): + def __setattr__(self, name: str, value: Any) -> None: if name.startswith("_"): return super(X509Name, self).__setattr__(name, value) @@ -602,7 +633,7 @@ class X509Name(object): _lib.X509_NAME_ENTRY_free(ent) break - if isinstance(value, _text_type): + if isinstance(value, str): value = value.encode("utf-8") add_result = _lib.X509_NAME_add_entry_by_NID( @@ -611,7 +642,7 @@ class X509Name(object): if not add_result: _raise_current_error() - def __getattr__(self, name): + def __getattr__(self, name: str) -> Optional[str]: """ Find attribute. An X509Name object has the following attributes: countryName (alias C), stateOrProvince (alias ST), locality (alias L), @@ -629,7 +660,7 @@ class X509Name(object): _raise_current_error() except Error: pass - return super(X509Name, self).__getattr__(name) + raise AttributeError("No such attribute") entry_index = _lib.X509_NAME_get_index_by_NID(self._name, nid, -1) if entry_index == -1: @@ -651,25 +682,19 @@ class X509Name(object): _lib.OPENSSL_free(result_buffer[0]) return result - def _cmp(op): - def f(self, other): - if not isinstance(other, X509Name): - return NotImplemented - result = _lib.X509_NAME_cmp(self._name, other._name) - return op(result, 0) - - return f + def __eq__(self, other: Any) -> bool: + if not isinstance(other, X509Name): + return NotImplemented - __eq__ = _cmp(__eq__) - __ne__ = _cmp(__ne__) + return _lib.X509_NAME_cmp(self._name, other._name) == 0 - __lt__ = _cmp(__lt__) - __le__ = _cmp(__le__) + def __lt__(self, other: Any) -> bool: + if not isinstance(other, X509Name): + return NotImplemented - __gt__ = _cmp(__gt__) - __ge__ = _cmp(__ge__) + return _lib.X509_NAME_cmp(self._name, other._name) < 0 - def __repr__(self): + def __repr__(self) -> str: """ String representation of an X509Name """ @@ -680,10 +705,10 @@ class X509Name(object): _openssl_assert(format_result != _ffi.NULL) return "<X509Name object '%s'>" % ( - _native(_ffi.string(result_buffer)), + _ffi.string(result_buffer).decode("utf-8"), ) - def hash(self): + def hash(self) -> int: """ Return an integer representation of the first four bytes of the MD5 digest of the DER representation of the name. @@ -695,7 +720,7 @@ class X509Name(object): """ return _lib.X509_NAME_hash(self._name) - def der(self): + def der(self) -> bytes: """ Return the DER encoding of this name. @@ -710,7 +735,7 @@ class X509Name(object): _lib.OPENSSL_free(result_buffer[0]) return string_result - def get_components(self): + def get_components(self) -> List[Tuple[bytes, bytes]]: """ Returns the components of this name, as a sequence of 2-tuples. @@ -730,19 +755,26 @@ class X509Name(object): # ffi.string does not handle strings containing NULL bytes # (which may have been generated by old, broken software) value = _ffi.buffer( - _lib.ASN1_STRING_data(fval), _lib.ASN1_STRING_length(fval) + _lib.ASN1_STRING_get0_data(fval), _lib.ASN1_STRING_length(fval) )[:] result.append((_ffi.string(name), value)) return result -class X509Extension(object): +class X509Extension: """ An X.509 v3 certificate extension. """ - def __init__(self, type_name, critical, value, subject=None, issuer=None): + def __init__( + self, + type_name: bytes, + critical: bool, + value: bytes, + subject: Optional["X509"] = None, + issuer: Optional["X509"] = None, + ) -> None: """ Initializes an X509 extension. @@ -752,7 +784,8 @@ class X509Extension(object): :param bool critical: A flag indicating whether this is a critical extension. - :param value: The value of the extension. + :param value: The OpenSSL textual representation of the extension's + value. :type value: :py:data:`bytes` :param subject: Optional X509 certificate to use as subject. @@ -804,7 +837,7 @@ class X509Extension(object): self._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free) @property - def _nid(self): + def _nid(self) -> Any: return _lib.OBJ_obj2nid( _lib.X509_EXTENSION_get_object(self._extension) ) @@ -815,7 +848,7 @@ class X509Extension(object): _lib.GEN_URI: "URI", } - def _subjectAltNameString(self): + def _subjectAltNameString(self) -> str: names = _ffi.cast( "GENERAL_NAMES*", _lib.X509V3_EXT_d2i(self._extension) ) @@ -829,15 +862,15 @@ class X509Extension(object): except KeyError: bio = _new_mem_buf() _lib.GENERAL_NAME_print(bio, name) - parts.append(_native(_bio_to_string(bio))) + parts.append(_bio_to_string(bio).decode("utf-8")) else: - value = _native( - _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[:] - ) + value = _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[ + : + ].decode("utf-8") parts.append(label + ":" + value) return ", ".join(parts) - def __str__(self): + def __str__(self) -> str: """ :return: a nice text representation of the extension """ @@ -848,9 +881,9 @@ class X509Extension(object): print_result = _lib.X509V3_EXT_print(bio, self._extension, 0, 0) _openssl_assert(print_result != 0) - return _native(_bio_to_string(bio)) + return _bio_to_string(bio).decode("utf-8") - def get_critical(self): + def get_critical(self) -> bool: """ Returns the critical field of this X.509 extension. @@ -858,7 +891,7 @@ class X509Extension(object): """ return _lib.X509_EXTENSION_get_critical(self._extension) - def get_short_name(self): + def get_short_name(self) -> bytes: """ Returns the short type name of this X.509 extension. @@ -873,7 +906,7 @@ class X509Extension(object): nid = _lib.OBJ_obj2nid(obj) return _ffi.string(_lib.OBJ_nid2sn(nid)) - def get_data(self): + def get_data(self) -> bytes: """ Returns the data of the X509 extension, encoded as ASN.1. @@ -884,23 +917,23 @@ class X509Extension(object): """ octet_result = _lib.X509_EXTENSION_get_data(self._extension) string_result = _ffi.cast("ASN1_STRING*", octet_result) - char_result = _lib.ASN1_STRING_data(string_result) + char_result = _lib.ASN1_STRING_get0_data(string_result) result_length = _lib.ASN1_STRING_length(string_result) return _ffi.buffer(char_result, result_length)[:] -class X509Req(object): +class X509Req: """ An X.509 certificate signing requests. """ - def __init__(self): + def __init__(self) -> None: req = _lib.X509_REQ_new() self._req = _ffi.gc(req, _lib.X509_REQ_free) # Default to version 0. self.set_version(0) - def to_cryptography(self): + def to_cryptography(self) -> x509.CertificateSigningRequest: """ Export as a ``cryptography`` certificate signing request. @@ -912,11 +945,12 @@ class X509Req(object): der = dump_certificate_request(FILETYPE_ASN1, self) - backend = _get_backend() - return load_der_x509_csr(der, backend) + return load_der_x509_csr(der) @classmethod - def from_cryptography(cls, crypto_req): + def from_cryptography( + cls, crypto_req: x509.CertificateSigningRequest + ) -> "X509Req": """ Construct based on a ``cryptography`` *crypto_req*. @@ -935,7 +969,7 @@ class X509Req(object): der = crypto_req.public_bytes(Encoding.DER) return load_certificate_request(FILETYPE_ASN1, der) - def set_pubkey(self, pkey): + def set_pubkey(self, pkey: PKey) -> None: """ Set the public key of the certificate signing request. @@ -947,7 +981,7 @@ class X509Req(object): set_result = _lib.X509_REQ_set_pubkey(self._req, pkey._pkey) _openssl_assert(set_result == 1) - def get_pubkey(self): + def get_pubkey(self) -> PKey: """ Get the public key of the certificate signing request. @@ -961,9 +995,9 @@ class X509Req(object): pkey._only_public = True return pkey - def set_version(self, version): + def set_version(self, version: int) -> None: """ - Set the version subfield (RFC 2459, section 4.1.2.1) of the certificate + Set the version subfield (RFC 2986, section 4.1) of the certificate request. :param int version: The version number. @@ -972,7 +1006,7 @@ class X509Req(object): set_result = _lib.X509_REQ_set_version(self._req, version) _openssl_assert(set_result == 1) - def get_version(self): + def get_version(self) -> int: """ Get the version subfield (RFC 2459, section 4.1.2.1) of the certificate request. @@ -982,7 +1016,7 @@ class X509Req(object): """ return _lib.X509_REQ_get_version(self._req) - def get_subject(self): + def get_subject(self) -> X509Name: """ Return the subject of this certificate signing request. @@ -1004,7 +1038,7 @@ class X509Req(object): return name - def add_extensions(self, extensions): + def add_extensions(self, extensions: Iterable[X509Extension]) -> None: """ Add extensions to the certificate signing request. @@ -1027,7 +1061,7 @@ class X509Req(object): add_result = _lib.X509_REQ_add_extensions(self._req, stack) _openssl_assert(add_result == 1) - def get_extensions(self): + def get_extensions(self) -> List[X509Extension]: """ Get X.509 extensions in the certificate signing request. @@ -1055,15 +1089,15 @@ class X509Req(object): exts.append(ext) return exts - def sign(self, pkey, digest): + def sign(self, pkey: PKey, digest: str) -> None: """ Sign the certificate signing request with this key and digest type. :param pkey: The key pair to sign with. :type pkey: :py:class:`PKey` :param digest: The name of the message digest to use for the signature, - e.g. :py:data:`b"sha256"`. - :type digest: :py:class:`bytes` + e.g. :py:data:`"sha256"`. + :type digest: :py:class:`str` :return: ``None`` """ if pkey._only_public: @@ -1079,7 +1113,7 @@ class X509Req(object): sign_result = _lib.X509_REQ_sign(self._req, pkey._pkey, digest_obj) _openssl_assert(sign_result > 0) - def verify(self, pkey): + def verify(self, pkey: PKey) -> bool: """ Verifies the signature on this certificate signing request. @@ -1101,12 +1135,12 @@ class X509Req(object): return result -class X509(object): +class X509: """ An X.509 certificate. """ - def __init__(self): + def __init__(self) -> None: x509 = _lib.X509_new() _openssl_assert(x509 != _ffi.NULL) self._x509 = _ffi.gc(x509, _lib.X509_free) @@ -1115,14 +1149,14 @@ class X509(object): self._subject_invalidator = _X509NameInvalidator() @classmethod - def _from_raw_x509_ptr(cls, x509): + def _from_raw_x509_ptr(cls, x509: Any) -> "X509": cert = cls.__new__(cls) cert._x509 = _ffi.gc(x509, _lib.X509_free) cert._issuer_invalidator = _X509NameInvalidator() cert._subject_invalidator = _X509NameInvalidator() return cert - def to_cryptography(self): + def to_cryptography(self) -> x509.Certificate: """ Export as a ``cryptography`` certificate. @@ -1133,11 +1167,10 @@ class X509(object): from cryptography.x509 import load_der_x509_certificate der = dump_certificate(FILETYPE_ASN1, self) - backend = _get_backend() - return load_der_x509_certificate(der, backend) + return load_der_x509_certificate(der) @classmethod - def from_cryptography(cls, crypto_cert): + def from_cryptography(cls, crypto_cert: x509.Certificate) -> "X509": """ Construct based on a ``cryptography`` *crypto_cert*. @@ -1156,7 +1189,7 @@ class X509(object): der = crypto_cert.public_bytes(Encoding.DER) return load_certificate(FILETYPE_ASN1, der) - def set_version(self, version): + def set_version(self, version: int) -> None: """ Set the version number of the certificate. Note that the version value is zero-based, eg. a value of 0 is V1. @@ -1169,9 +1202,9 @@ class X509(object): if not isinstance(version, int): raise TypeError("version must be an integer") - _lib.X509_set_version(self._x509, version) + _openssl_assert(_lib.X509_set_version(self._x509, version) == 1) - def get_version(self): + def get_version(self) -> int: """ Return the version number of the certificate. @@ -1180,7 +1213,7 @@ class X509(object): """ return _lib.X509_get_version(self._x509) - def get_pubkey(self): + def get_pubkey(self) -> PKey: """ Get the public key of the certificate. @@ -1195,7 +1228,7 @@ class X509(object): pkey._only_public = True return pkey - def set_pubkey(self, pkey): + def set_pubkey(self, pkey: PKey) -> None: """ Set the public key of the certificate. @@ -1210,7 +1243,7 @@ class X509(object): set_result = _lib.X509_set_pubkey(self._x509, pkey._pkey) _openssl_assert(set_result == 1) - def sign(self, pkey, digest): + def sign(self, pkey: PKey, digest: str) -> None: """ Sign the certificate with this key and digest type. @@ -1218,7 +1251,7 @@ class X509(object): :type pkey: :py:class:`PKey` :param digest: The name of the message digest to use. - :type digest: :py:class:`bytes` + :type digest: :py:class:`str` :return: :py:data:`None` """ @@ -1238,7 +1271,7 @@ class X509(object): sign_result = _lib.X509_sign(self._x509, pkey._pkey, evp_md) _openssl_assert(sign_result > 0) - def get_signature_algorithm(self): + def get_signature_algorithm(self) -> bytes: """ Return the signature algorithm used in the certificate. @@ -1255,12 +1288,12 @@ class X509(object): raise ValueError("Undefined signature algorithm") return _ffi.string(_lib.OBJ_nid2ln(nid)) - def digest(self, digest_name): + def digest(self, digest_name: str) -> bytes: """ Return the digest of the X509 object. :param digest_name: The name of the digest algorithm to use. - :type digest_name: :py:class:`bytes` + :type digest_name: :py:class:`str` :return: The digest of the object, formatted as :py:const:`b":"`-delimited hex pairs. @@ -1286,7 +1319,7 @@ class X509(object): ] ) - def subject_name_hash(self): + def subject_name_hash(self) -> bytes: """ Return the hash of the X509 subject. @@ -1295,7 +1328,7 @@ class X509(object): """ return _lib.X509_subject_name_hash(self._x509) - def set_serial_number(self, serial): + def set_serial_number(self, serial: int) -> None: """ Set the serial number of the certificate. @@ -1304,19 +1337,18 @@ class X509(object): :return: :py:data`None` """ - if not isinstance(serial, _integer_types): + if not isinstance(serial, int): raise TypeError("serial must be an integer") hex_serial = hex(serial)[2:] - if not isinstance(hex_serial, bytes): - hex_serial = hex_serial.encode("ascii") + hex_serial_bytes = hex_serial.encode("ascii") bignum_serial = _ffi.new("BIGNUM**") # BN_hex2bn stores the result in &bignum. Unless it doesn't feel like # it. If bignum is still NULL after this call, then the return value # is actually the result. I hope. -exarkun - small_serial = _lib.BN_hex2bn(bignum_serial, hex_serial) + small_serial = _lib.BN_hex2bn(bignum_serial, hex_serial_bytes) if bignum_serial[0] == _ffi.NULL: set_result = _lib.ASN1_INTEGER_set( @@ -1335,7 +1367,7 @@ class X509(object): set_result = _lib.X509_set_serialNumber(self._x509, asn1_serial) _openssl_assert(set_result == 1) - def get_serial_number(self): + def get_serial_number(self) -> int: """ Return the serial number of this certificate. @@ -1355,7 +1387,7 @@ class X509(object): finally: _lib.BN_free(bignum_serial) - def gmtime_adj_notAfter(self, amount): + def gmtime_adj_notAfter(self, amount: int) -> None: """ Adjust the time stamp on which the certificate stops being valid. @@ -1369,7 +1401,7 @@ class X509(object): notAfter = _lib.X509_getm_notAfter(self._x509) _lib.X509_gmtime_adj(notAfter, amount) - def gmtime_adj_notBefore(self, amount): + def gmtime_adj_notBefore(self, amount: int) -> None: """ Adjust the timestamp on which the certificate starts being valid. @@ -1382,22 +1414,25 @@ class X509(object): notBefore = _lib.X509_getm_notBefore(self._x509) _lib.X509_gmtime_adj(notBefore, amount) - def has_expired(self): + def has_expired(self) -> bool: """ Check whether the certificate has expired. :return: ``True`` if the certificate has expired, ``False`` otherwise. :rtype: bool """ - time_string = _native(self.get_notAfter()) + time_bytes = self.get_notAfter() + if time_bytes is None: + raise ValueError("Unable to determine notAfter") + time_string = time_bytes.decode("utf-8") not_after = datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ") return not_after < datetime.datetime.utcnow() - def _get_boundary_time(self, which): + def _get_boundary_time(self, which: Any) -> Optional[bytes]: return _get_asn1_time(which(self._x509)) - def get_notBefore(self): + def get_notBefore(self) -> Optional[bytes]: """ Get the timestamp at which the certificate starts being valid. @@ -1410,10 +1445,12 @@ class X509(object): """ return self._get_boundary_time(_lib.X509_getm_notBefore) - def _set_boundary_time(self, which, when): + def _set_boundary_time( + self, which: Callable[..., Any], when: bytes + ) -> None: return _set_asn1_time(which(self._x509), when) - def set_notBefore(self, when): + def set_notBefore(self, when: bytes) -> None: """ Set the timestamp at which the certificate starts being valid. @@ -1426,7 +1463,7 @@ class X509(object): """ return self._set_boundary_time(_lib.X509_getm_notBefore, when) - def get_notAfter(self): + def get_notAfter(self) -> Optional[bytes]: """ Get the timestamp at which the certificate stops being valid. @@ -1439,7 +1476,7 @@ class X509(object): """ return self._get_boundary_time(_lib.X509_getm_notAfter) - def set_notAfter(self, when): + def set_notAfter(self, when: bytes) -> None: """ Set the timestamp at which the certificate stops being valid. @@ -1452,7 +1489,7 @@ class X509(object): """ return self._set_boundary_time(_lib.X509_getm_notAfter, when) - def _get_name(self, which): + def _get_name(self, which: Any) -> X509Name: name = X509Name.__new__(X509Name) name._name = which(self._x509) _openssl_assert(name._name != _ffi.NULL) @@ -1463,13 +1500,13 @@ class X509(object): return name - def _set_name(self, which, name): + def _set_name(self, which: Any, name: X509Name) -> None: if not isinstance(name, X509Name): raise TypeError("name must be an X509Name") set_result = which(self._x509, name._name) _openssl_assert(set_result == 1) - def get_issuer(self): + def get_issuer(self) -> X509Name: """ Return the issuer of this certificate. @@ -1485,7 +1522,7 @@ class X509(object): self._issuer_invalidator.add(name) return name - def set_issuer(self, issuer): + def set_issuer(self, issuer: X509Name) -> None: """ Set the issuer of this certificate. @@ -1497,7 +1534,7 @@ class X509(object): self._set_name(_lib.X509_set_issuer_name, issuer) self._issuer_invalidator.clear() - def get_subject(self): + def get_subject(self) -> X509Name: """ Return the subject of this certificate. @@ -1513,7 +1550,7 @@ class X509(object): self._subject_invalidator.add(name) return name - def set_subject(self, subject): + def set_subject(self, subject: X509Name) -> None: """ Set the subject of this certificate. @@ -1525,7 +1562,7 @@ class X509(object): self._set_name(_lib.X509_set_subject_name, subject) self._subject_invalidator.clear() - def get_extension_count(self): + def get_extension_count(self) -> int: """ Get the number of extensions on this certificate. @@ -1536,7 +1573,7 @@ class X509(object): """ return _lib.X509_get_ext_count(self._x509) - def add_extensions(self, extensions): + def add_extensions(self, extensions: Iterable[X509Extension]) -> None: """ Add extensions to the certificate. @@ -1552,7 +1589,7 @@ class X509(object): if not add_result: _raise_current_error() - def get_extension(self, index): + def get_extension(self, index: int) -> X509Extension: """ Get a specific extension of the certificate by index. @@ -1576,7 +1613,7 @@ class X509(object): return ext -class X509StoreFlags(object): +class X509StoreFlags: """ Flags for X509 verification, used to change the behavior of :class:`X509Store`. @@ -1587,19 +1624,20 @@ class X509StoreFlags(object): https://www.openssl.org/docs/manmaster/man3/X509_VERIFY_PARAM_set_flags.html """ - CRL_CHECK = _lib.X509_V_FLAG_CRL_CHECK - CRL_CHECK_ALL = _lib.X509_V_FLAG_CRL_CHECK_ALL - IGNORE_CRITICAL = _lib.X509_V_FLAG_IGNORE_CRITICAL - X509_STRICT = _lib.X509_V_FLAG_X509_STRICT - ALLOW_PROXY_CERTS = _lib.X509_V_FLAG_ALLOW_PROXY_CERTS - POLICY_CHECK = _lib.X509_V_FLAG_POLICY_CHECK - EXPLICIT_POLICY = _lib.X509_V_FLAG_EXPLICIT_POLICY - INHIBIT_MAP = _lib.X509_V_FLAG_INHIBIT_MAP - NOTIFY_POLICY = _lib.X509_V_FLAG_NOTIFY_POLICY - CHECK_SS_SIGNATURE = _lib.X509_V_FLAG_CHECK_SS_SIGNATURE + CRL_CHECK: int = _lib.X509_V_FLAG_CRL_CHECK + CRL_CHECK_ALL: int = _lib.X509_V_FLAG_CRL_CHECK_ALL + IGNORE_CRITICAL: int = _lib.X509_V_FLAG_IGNORE_CRITICAL + X509_STRICT: int = _lib.X509_V_FLAG_X509_STRICT + ALLOW_PROXY_CERTS: int = _lib.X509_V_FLAG_ALLOW_PROXY_CERTS + POLICY_CHECK: int = _lib.X509_V_FLAG_POLICY_CHECK + EXPLICIT_POLICY: int = _lib.X509_V_FLAG_EXPLICIT_POLICY + INHIBIT_MAP: int = _lib.X509_V_FLAG_INHIBIT_MAP + NOTIFY_POLICY: int = _lib.X509_V_FLAG_NOTIFY_POLICY + CHECK_SS_SIGNATURE: int = _lib.X509_V_FLAG_CHECK_SS_SIGNATURE + PARTIAL_CHAIN: int = _lib.X509_V_FLAG_PARTIAL_CHAIN -class X509Store(object): +class X509Store: """ An X.509 store. @@ -1613,11 +1651,11 @@ class X509Store(object): :class:`X509StoreContext`. """ - def __init__(self): + def __init__(self) -> None: store = _lib.X509_STORE_new() self._store = _ffi.gc(store, _lib.X509_STORE_free) - def add_cert(self, cert): + def add_cert(self, cert: X509) -> None: """ Adds a trusted certificate to this store. @@ -1639,7 +1677,7 @@ class X509Store(object): res = _lib.X509_STORE_add_cert(self._store, cert._x509) _openssl_assert(res == 1) - def add_crl(self, crl): + def add_crl(self, crl: "CRL") -> None: """ Add a certificate revocation list to this store. @@ -1655,7 +1693,7 @@ class X509Store(object): """ _openssl_assert(_lib.X509_STORE_add_crl(self._store, crl._crl) != 0) - def set_flags(self, flags): + def set_flags(self, flags: int) -> None: """ Set verification flags to this store. @@ -1679,7 +1717,7 @@ class X509Store(object): """ _openssl_assert(_lib.X509_STORE_set_flags(self._store, flags) != 0) - def set_time(self, vfy_time): + def set_time(self, vfy_time: datetime.datetime) -> None: """ Set the time against which the certificates are verified. @@ -1703,7 +1741,9 @@ class X509Store(object): ) _openssl_assert(_lib.X509_STORE_set1_param(self._store, param) != 0) - def load_locations(self, cafile, capath=None): + def load_locations( + self, cafile: StrOrBytesPath, capath: Optional[StrOrBytesPath] = None + ) -> None: """ Let X509Store know where we can find trusted certificates for the certificate chain. Note that the certificates have to be in PEM @@ -1737,12 +1777,12 @@ class X509Store(object): if cafile is None: cafile = _ffi.NULL else: - cafile = _path_string(cafile) + cafile = _path_bytes(cafile) if capath is None: capath = _ffi.NULL else: - capath = _path_string(capath) + capath = _path_bytes(capath) load_result = _lib.X509_STORE_load_locations( self._store, cafile, capath @@ -1760,12 +1800,15 @@ class X509StoreContextError(Exception): :type certificate: :class:`X509` """ - def __init__(self, message, certificate): + def __init__( + self, message: str, errors: List[Any], certificate: X509 + ) -> None: super(X509StoreContextError, self).__init__(message) + self.errors = errors self.certificate = certificate -class X509StoreContext(object): +class X509StoreContext: """ An X.509 store context. @@ -1787,7 +1830,12 @@ class X509StoreContext(object): :type chain: :class:`list` of :class:`X509` """ - def __init__(self, store, certificate, chain=None): + def __init__( + self, + store: X509Store, + certificate: X509, + chain: Optional[Sequence[X509]] = None, + ) -> None: store_ctx = _lib.X509_STORE_CTX_new() self._store_ctx = _ffi.gc(store_ctx, _lib.X509_STORE_CTX_free) self._store = store @@ -1799,8 +1847,10 @@ class X509StoreContext(object): self._init() @staticmethod - def _build_certificate_stack(certificates): - def cleanup(s): + def _build_certificate_stack( + certificates: Optional[Sequence[X509]], + ) -> None: + def cleanup(s: Any) -> None: # Equivalent to sk_X509_pop_free, but we don't # currently have a CFFI binding for that available for i in range(_lib.sk_X509_num(s)): @@ -1826,7 +1876,7 @@ class X509StoreContext(object): return stack - def _init(self): + def _init(self) -> None: """ Set up the store context for a subsequent verification operation. @@ -1839,7 +1889,7 @@ class X509StoreContext(object): if ret <= 0: _raise_current_error() - def _cleanup(self): + def _cleanup(self) -> None: """ Internally cleans up the store context. @@ -1847,7 +1897,7 @@ class X509StoreContext(object): """ _lib.X509_STORE_CTX_cleanup(self._store_ctx) - def _exception_from_context(self): + def _exception_from_context(self) -> X509StoreContextError: """ Convert an OpenSSL native context error failure into a Python exception. @@ -1855,25 +1905,24 @@ class X509StoreContext(object): When a call to native OpenSSL X509_verify_cert fails, additional information about the failure can be obtained from the store context. """ + message = _ffi.string( + _lib.X509_verify_cert_error_string( + _lib.X509_STORE_CTX_get_error(self._store_ctx) + ) + ).decode("utf-8") errors = [ _lib.X509_STORE_CTX_get_error(self._store_ctx), _lib.X509_STORE_CTX_get_error_depth(self._store_ctx), - _native( - _ffi.string( - _lib.X509_verify_cert_error_string( - _lib.X509_STORE_CTX_get_error(self._store_ctx) - ) - ) - ), + message, ] # A context error should always be associated with a certificate, so we # expect this call to never return :class:`None`. _x509 = _lib.X509_STORE_CTX_get_current_cert(self._store_ctx) _cert = _lib.X509_dup(_x509) pycert = X509._from_raw_x509_ptr(_cert) - return X509StoreContextError(errors, pycert) + return X509StoreContextError(message, errors, pycert) - def set_store(self, store): + def set_store(self, store: X509Store) -> None: """ Set the context's X.509 store. @@ -1884,7 +1933,7 @@ class X509StoreContext(object): """ self._store = store - def verify_certificate(self): + def verify_certificate(self) -> None: """ Verify a certificate in a context. @@ -1906,7 +1955,7 @@ class X509StoreContext(object): if ret <= 0: raise self._exception_from_context() - def get_verified_chain(self): + def get_verified_chain(self) -> List[X509]: """ Verify a certificate in a context and return the complete validated chain. @@ -1946,7 +1995,7 @@ class X509StoreContext(object): return result -def load_certificate(type, buffer): +def load_certificate(type: int, buffer: bytes) -> X509: """ Load a certificate (X509) from the string *buffer* encoded with the type *type*. @@ -1957,7 +2006,7 @@ def load_certificate(type, buffer): :return: The X509 object """ - if isinstance(buffer, _text_type): + if isinstance(buffer, str): buffer = buffer.encode("ascii") bio = _new_mem_buf(buffer) @@ -1975,7 +2024,7 @@ def load_certificate(type, buffer): return X509._from_raw_x509_ptr(x509) -def dump_certificate(type, cert): +def dump_certificate(type: int, cert: X509) -> bytes: """ Dump the certificate *cert* into a buffer string encoded with the type *type*. @@ -2003,7 +2052,7 @@ def dump_certificate(type, cert): return _bio_to_string(bio) -def dump_publickey(type, pkey): +def dump_publickey(type: int, pkey: PKey) -> bytes: """ Dump a public key to a buffer. @@ -2028,7 +2077,12 @@ def dump_publickey(type, pkey): return _bio_to_string(bio) -def dump_privatekey(type, pkey, cipher=None, passphrase=None): +def dump_privatekey( + type: int, + pkey: PKey, + cipher: Optional[str] = None, + passphrase: Optional[PassphraseCallableT] = None, +) -> bytes: """ Dump the private key *pkey* into a buffer string encoded with the type *type*. Optionally (if *type* is :const:`FILETYPE_PEM`) encrypting it @@ -2092,7 +2146,7 @@ def dump_privatekey(type, pkey, cipher=None, passphrase=None): return _bio_to_string(bio) -class Revoked(object): +class Revoked: """ A certificate revocation. """ @@ -2112,11 +2166,11 @@ class Revoked(object): # b"removeFromCRL", ] - def __init__(self): + def __init__(self) -> None: revoked = _lib.X509_REVOKED_new() self._revoked = _ffi.gc(revoked, _lib.X509_REVOKED_free) - def set_serial(self, hex_str): + def set_serial(self, hex_str: bytes) -> None: """ Set the serial number. @@ -2140,7 +2194,7 @@ class Revoked(object): ) _lib.X509_REVOKED_set_serialNumber(self._revoked, asn1_serial) - def get_serial(self): + def get_serial(self) -> bytes: """ Get the serial number. @@ -2158,7 +2212,7 @@ class Revoked(object): _openssl_assert(result >= 0) return _bio_to_string(bio) - def _delete_reason(self): + def _delete_reason(self) -> None: for i in range(_lib.X509_REVOKED_get_ext_count(self._revoked)): ext = _lib.X509_REVOKED_get_ext(self._revoked, i) obj = _lib.X509_EXTENSION_get_object(ext) @@ -2167,7 +2221,7 @@ class Revoked(object): _lib.X509_REVOKED_delete_ext(self._revoked, i) break - def set_reason(self, reason): + def set_reason(self, reason: Optional[bytes]) -> None: """ Set the reason of this revocation. @@ -2204,7 +2258,7 @@ class Revoked(object): ) _openssl_assert(add_result == 1) - def get_reason(self): + def get_reason(self) -> Optional[bytes]: """ Get the reason of this revocation. @@ -2230,8 +2284,9 @@ class Revoked(object): _openssl_assert(print_result != 0) return _bio_to_string(bio) + return None - def all_reasons(self): + def all_reasons(self) -> List[bytes]: """ Return a list of all the supported reason strings. @@ -2243,7 +2298,7 @@ class Revoked(object): """ return self._crl_reasons[:] - def set_rev_date(self, when): + def set_rev_date(self, when: bytes) -> None: """ Set the revocation timestamp. @@ -2251,10 +2306,13 @@ class Revoked(object): as ASN.1 TIME. :return: ``None`` """ - dt = _lib.X509_REVOKED_get0_revocationDate(self._revoked) - return _set_asn1_time(dt, when) + revocationDate = _new_asn1_time(when) + ret = _lib.X509_REVOKED_set_revocationDate( + self._revoked, revocationDate + ) + _openssl_assert(ret == 1) - def get_rev_date(self): + def get_rev_date(self) -> Optional[bytes]: """ Get the revocation timestamp. @@ -2265,16 +2323,16 @@ class Revoked(object): return _get_asn1_time(dt) -class CRL(object): +class CRL: """ A certificate revocation list. """ - def __init__(self): + def __init__(self) -> None: crl = _lib.X509_CRL_new() self._crl = _ffi.gc(crl, _lib.X509_CRL_free) - def to_cryptography(self): + def to_cryptography(self) -> x509.CertificateRevocationList: """ Export as a ``cryptography`` CRL. @@ -2285,12 +2343,12 @@ class CRL(object): from cryptography.x509 import load_der_x509_crl der = dump_crl(FILETYPE_ASN1, self) - - backend = _get_backend() - return load_der_x509_crl(der, backend) + return load_der_x509_crl(der) @classmethod - def from_cryptography(cls, crypto_crl): + def from_cryptography( + cls, crypto_crl: x509.CertificateRevocationList + ) -> "CRL": """ Construct based on a ``cryptography`` *crypto_crl*. @@ -2309,7 +2367,7 @@ class CRL(object): der = crypto_crl.public_bytes(Encoding.DER) return load_crl(FILETYPE_ASN1, der) - def get_revoked(self): + def get_revoked(self) -> Optional[Tuple[Revoked, ...]]: """ Return the revocations in this certificate revocation list. @@ -2323,14 +2381,15 @@ class CRL(object): revoked_stack = _lib.X509_CRL_get_REVOKED(self._crl) for i in range(_lib.sk_X509_REVOKED_num(revoked_stack)): revoked = _lib.sk_X509_REVOKED_value(revoked_stack, i) - revoked_copy = _lib.Cryptography_X509_REVOKED_dup(revoked) + revoked_copy = _lib.X509_REVOKED_dup(revoked) pyrev = Revoked.__new__(Revoked) pyrev._revoked = _ffi.gc(revoked_copy, _lib.X509_REVOKED_free) results.append(pyrev) if results: return tuple(results) + return None - def add_revoked(self, revoked): + def add_revoked(self, revoked: Revoked) -> None: """ Add a revoked (by value not reference) to the CRL structure @@ -2341,13 +2400,13 @@ class CRL(object): :param Revoked revoked: The new revocation. :return: ``None`` """ - copy = _lib.Cryptography_X509_REVOKED_dup(revoked._revoked) + copy = _lib.X509_REVOKED_dup(revoked._revoked) _openssl_assert(copy != _ffi.NULL) add_result = _lib.X509_CRL_add0_revoked(self._crl, copy) _openssl_assert(add_result != 0) - def get_issuer(self): + def get_issuer(self) -> X509Name: """ Get the CRL's issuer. @@ -2362,7 +2421,7 @@ class CRL(object): issuer._name = _issuer return issuer - def set_version(self, version): + def set_version(self, version: int) -> None: """ Set the CRL version. @@ -2373,10 +2432,7 @@ class CRL(object): """ _openssl_assert(_lib.X509_CRL_set_version(self._crl, version) != 0) - def _set_boundary_time(self, which, when): - return _set_asn1_time(which(self._crl), when) - - def set_lastUpdate(self, when): + def set_lastUpdate(self, when: bytes) -> None: """ Set when the CRL was last updated. @@ -2389,9 +2445,11 @@ class CRL(object): :param bytes when: A timestamp string. :return: ``None`` """ - return self._set_boundary_time(_lib.X509_CRL_get_lastUpdate, when) + lastUpdate = _new_asn1_time(when) + ret = _lib.X509_CRL_set1_lastUpdate(self._crl, lastUpdate) + _openssl_assert(ret == 1) - def set_nextUpdate(self, when): + def set_nextUpdate(self, when: bytes) -> None: """ Set when the CRL will next be updated. @@ -2404,9 +2462,11 @@ class CRL(object): :param bytes when: A timestamp string. :return: ``None`` """ - return self._set_boundary_time(_lib.X509_CRL_get_nextUpdate, when) + nextUpdate = _new_asn1_time(when) + ret = _lib.X509_CRL_set1_nextUpdate(self._crl, nextUpdate) + _openssl_assert(ret == 1) - def sign(self, issuer_cert, issuer_key, digest): + def sign(self, issuer_cert: X509, issuer_key: PKey, digest: bytes) -> None: """ Sign the CRL. @@ -2433,8 +2493,13 @@ class CRL(object): _openssl_assert(result != 0) def export( - self, cert, key, type=FILETYPE_PEM, days=100, digest=_UNSPECIFIED - ): + self, + cert: X509, + key: PKey, + type: int = FILETYPE_PEM, + days: int = 100, + digest: bytes = _UNSPECIFIED, # type: ignore + ) -> bytes: """ Export the CRL as a string. @@ -2462,23 +2527,26 @@ class CRL(object): if digest_obj == _ffi.NULL: raise ValueError("No such digest method") - bio = _lib.BIO_new(_lib.BIO_s_mem()) - _openssl_assert(bio != _ffi.NULL) - # A scratch time object to give different values to different CRL # fields sometime = _lib.ASN1_TIME_new() _openssl_assert(sometime != _ffi.NULL) + sometime = _ffi.gc(sometime, _lib.ASN1_TIME_free) - _lib.X509_gmtime_adj(sometime, 0) - _lib.X509_CRL_set_lastUpdate(self._crl, sometime) + ret = _lib.X509_gmtime_adj(sometime, 0) + _openssl_assert(ret != _ffi.NULL) + ret = _lib.X509_CRL_set1_lastUpdate(self._crl, sometime) + _openssl_assert(ret == 1) - _lib.X509_gmtime_adj(sometime, days * 24 * 60 * 60) - _lib.X509_CRL_set_nextUpdate(self._crl, sometime) + ret = _lib.X509_gmtime_adj(sometime, days * 24 * 60 * 60) + _openssl_assert(ret != _ffi.NULL) + ret = _lib.X509_CRL_set1_nextUpdate(self._crl, sometime) + _openssl_assert(ret == 1) - _lib.X509_CRL_set_issuer_name( + ret = _lib.X509_CRL_set_issuer_name( self._crl, _lib.X509_get_subject_name(cert._x509) ) + _openssl_assert(ret == 1) sign_result = _lib.X509_CRL_sign(self._crl, key._pkey, digest_obj) if not sign_result: @@ -2487,8 +2555,11 @@ class CRL(object): return dump_crl(type, self) -class PKCS7(object): - def type_is_signed(self): +class PKCS7: + + _pkcs7: Any + + def type_is_signed(self) -> bool: """ Check if this NID_pkcs7_signed object @@ -2496,7 +2567,7 @@ class PKCS7(object): """ return bool(_lib.PKCS7_type_is_signed(self._pkcs7)) - def type_is_enveloped(self): + def type_is_enveloped(self) -> bool: """ Check if this NID_pkcs7_enveloped object @@ -2504,7 +2575,7 @@ class PKCS7(object): """ return bool(_lib.PKCS7_type_is_enveloped(self._pkcs7)) - def type_is_signedAndEnveloped(self): + def type_is_signedAndEnveloped(self) -> bool: """ Check if this NID_pkcs7_signedAndEnveloped object @@ -2512,7 +2583,7 @@ class PKCS7(object): """ return bool(_lib.PKCS7_type_is_signedAndEnveloped(self._pkcs7)) - def type_is_data(self): + def type_is_data(self) -> bool: """ Check if this NID_pkcs7_data object @@ -2520,7 +2591,7 @@ class PKCS7(object): """ return bool(_lib.PKCS7_type_is_data(self._pkcs7)) - def get_type_name(self): + def get_type_name(self) -> str: """ Returns the type name of the PKCS7 structure @@ -2531,18 +2602,18 @@ class PKCS7(object): return _ffi.string(string_type) -class PKCS12(object): +class PKCS12: """ A PKCS #12 archive. """ - def __init__(self): - self._pkey = None - self._cert = None - self._cacerts = None - self._friendlyname = None + def __init__(self) -> None: + self._pkey: Optional[PKey] = None + self._cert: Optional[X509] = None + self._cacerts: Optional[List[X509]] = None + self._friendlyname: Optional[bytes] = None - def get_certificate(self): + def get_certificate(self) -> Optional[X509]: """ Get the certificate in the PKCS #12 structure. @@ -2551,7 +2622,7 @@ class PKCS12(object): """ return self._cert - def set_certificate(self, cert): + def set_certificate(self, cert: X509) -> None: """ Set the certificate in the PKCS #12 structure. @@ -2564,7 +2635,7 @@ class PKCS12(object): raise TypeError("cert must be an X509 instance") self._cert = cert - def get_privatekey(self): + def get_privatekey(self) -> Optional[PKey]: """ Get the private key in the PKCS #12 structure. @@ -2573,7 +2644,7 @@ class PKCS12(object): """ return self._pkey - def set_privatekey(self, pkey): + def set_privatekey(self, pkey: PKey) -> None: """ Set the certificate portion of the PKCS #12 structure. @@ -2586,7 +2657,7 @@ class PKCS12(object): raise TypeError("pkey must be a PKey instance") self._pkey = pkey - def get_ca_certificates(self): + def get_ca_certificates(self) -> Optional[Tuple[X509, ...]]: """ Get the CA certificates in the PKCS #12 structure. @@ -2596,8 +2667,9 @@ class PKCS12(object): """ if self._cacerts is not None: return tuple(self._cacerts) + return None - def set_ca_certificates(self, cacerts): + def set_ca_certificates(self, cacerts: Optional[Iterable[X509]]) -> None: """ Replace or set the CA certificates within the PKCS12 object. @@ -2618,7 +2690,7 @@ class PKCS12(object): ) self._cacerts = cacerts - def set_friendlyname(self, name): + def set_friendlyname(self, name: Optional[bytes]) -> None: """ Set the friendly name in the PKCS #12 structure. @@ -2635,7 +2707,7 @@ class PKCS12(object): ) self._friendlyname = name - def get_friendlyname(self): + def get_friendlyname(self) -> Optional[bytes]: """ Get the friendly name in the PKCS# 12 structure. @@ -2644,7 +2716,12 @@ class PKCS12(object): """ return self._friendlyname - def export(self, passphrase=None, iter=2048, maciter=1): + def export( + self, + passphrase: Optional[bytes] = None, + iter: int = 2048, + maciter: int = 1, + ) -> bytes: """ Dump a PKCS12 object as a string. @@ -2712,16 +2789,16 @@ class PKCS12(object): return _bio_to_string(bio) -class NetscapeSPKI(object): +class NetscapeSPKI: """ A Netscape SPKI object. """ - def __init__(self): + def __init__(self) -> None: spki = _lib.NETSCAPE_SPKI_new() self._spki = _ffi.gc(spki, _lib.NETSCAPE_SPKI_free) - def sign(self, pkey, digest): + def sign(self, pkey: PKey, digest: str) -> None: """ Sign the certificate request with this key and digest type. @@ -2729,7 +2806,7 @@ class NetscapeSPKI(object): :type pkey: :py:class:`PKey` :param digest: The message digest to use. - :type digest: :py:class:`bytes` + :type digest: :py:class:`str` :return: ``None`` """ @@ -2748,7 +2825,7 @@ class NetscapeSPKI(object): ) _openssl_assert(sign_result > 0) - def verify(self, key): + def verify(self, key: PKey) -> bool: """ Verifies a signature on a certificate request. @@ -2765,7 +2842,7 @@ class NetscapeSPKI(object): _raise_current_error() return True - def b64_encode(self): + def b64_encode(self) -> bytes: """ Generate a base64 encoded representation of this SPKI object. @@ -2777,7 +2854,7 @@ class NetscapeSPKI(object): _lib.OPENSSL_free(encoded) return result - def get_pubkey(self): + def get_pubkey(self) -> PKey: """ Get the public key of this certificate. @@ -2791,7 +2868,7 @@ class NetscapeSPKI(object): pkey._only_public = True return pkey - def set_pubkey(self, pkey): + def set_pubkey(self, pkey: PKey) -> None: """ Set the public key of the certificate @@ -2802,8 +2879,14 @@ class NetscapeSPKI(object): _openssl_assert(set_result == 1) -class _PassphraseHelper(object): - def __init__(self, type, passphrase, more_args=False, truncate=False): +class _PassphraseHelper: + def __init__( + self, + type: int, + passphrase: Optional[PassphraseCallableT], + more_args: bool = False, + truncate: bool = False, + ) -> None: if type != FILETYPE_PEM and passphrase is not None: raise ValueError( "only FILETYPE_PEM key format supports encryption" @@ -2811,10 +2894,10 @@ class _PassphraseHelper(object): self._passphrase = passphrase self._more_args = more_args self._truncate = truncate - self._problems = [] + self._problems: List[Exception] = [] @property - def callback(self): + def callback(self) -> Any: if self._passphrase is None: return _ffi.NULL elif isinstance(self._passphrase, bytes) or callable(self._passphrase): @@ -2825,7 +2908,7 @@ class _PassphraseHelper(object): ) @property - def callback_args(self): + def callback_args(self) -> Any: if self._passphrase is None: return _ffi.NULL elif isinstance(self._passphrase, bytes) or callable(self._passphrase): @@ -2835,7 +2918,7 @@ class _PassphraseHelper(object): "Last argument must be a byte string or a callable." ) - def raise_if_problem(self, exceptionType=Error): + def raise_if_problem(self, exceptionType: Type[Exception] = Error) -> None: if self._problems: # Flush the OpenSSL error queue @@ -2846,7 +2929,9 @@ class _PassphraseHelper(object): raise self._problems.pop(0) - def _read_passphrase(self, buf, size, rwflag, userdata): + def _read_passphrase( + self, buf: Any, size: int, rwflag: Any, userdata: Any + ) -> int: try: if callable(self._passphrase): if self._more_args: @@ -2854,6 +2939,7 @@ class _PassphraseHelper(object): else: result = self._passphrase(rwflag) else: + assert self._passphrase is not None result = self._passphrase if not isinstance(result, bytes): raise ValueError("Bytes expected") @@ -2872,7 +2958,7 @@ class _PassphraseHelper(object): return 0 -def load_publickey(type, buffer): +def load_publickey(type: int, buffer: Union[str, bytes]) -> PKey: """ Load a public key from a buffer. @@ -2883,7 +2969,7 @@ def load_publickey(type, buffer): :return: The PKey object. :rtype: :class:`PKey` """ - if isinstance(buffer, _text_type): + if isinstance(buffer, str): buffer = buffer.encode("ascii") bio = _new_mem_buf(buffer) @@ -2906,7 +2992,11 @@ def load_publickey(type, buffer): return pkey -def load_privatekey(type, buffer, passphrase=None): +def load_privatekey( + type: int, + buffer: Union[str, bytes], + passphrase: Optional[PassphraseCallableT] = None, +) -> PKey: """ Load a private key (PKey) from the string *buffer* encoded with the type *type*. @@ -2919,7 +3009,7 @@ def load_privatekey(type, buffer, passphrase=None): :return: The PKey object """ - if isinstance(buffer, _text_type): + if isinstance(buffer, str): buffer = buffer.encode("ascii") bio = _new_mem_buf(buffer) @@ -2943,7 +3033,7 @@ def load_privatekey(type, buffer, passphrase=None): return pkey -def dump_certificate_request(type, req): +def dump_certificate_request(type: int, req: X509Req) -> bytes: """ Dump the certificate request *req* into a buffer string encoded with the type *type*. @@ -2971,7 +3061,7 @@ def dump_certificate_request(type, req): return _bio_to_string(bio) -def load_certificate_request(type, buffer): +def load_certificate_request(type: int, buffer: bytes) -> X509Req: """ Load a certificate request (X509Req) from the string *buffer* encoded with the type *type*. @@ -2980,7 +3070,7 @@ def load_certificate_request(type, buffer): :param buffer: The buffer the certificate request is stored in :return: The X509Req object """ - if isinstance(buffer, _text_type): + if isinstance(buffer, str): buffer = buffer.encode("ascii") bio = _new_mem_buf(buffer) @@ -2999,7 +3089,7 @@ def load_certificate_request(type, buffer): return x509req -def sign(pkey, data, digest): +def sign(pkey: PKey, data: Union[str, bytes], digest: str) -> bytes: """ Sign a data string using the given key and message digest. @@ -3016,8 +3106,8 @@ def sign(pkey, data, digest): if digest_obj == _ffi.NULL: raise ValueError("No such digest method") - md_ctx = _lib.Cryptography_EVP_MD_CTX_new() - md_ctx = _ffi.gc(md_ctx, _lib.Cryptography_EVP_MD_CTX_free) + md_ctx = _lib.EVP_MD_CTX_new() + md_ctx = _ffi.gc(md_ctx, _lib.EVP_MD_CTX_free) _lib.EVP_SignInit(md_ctx, digest_obj) _lib.EVP_SignUpdate(md_ctx, data, len(data)) @@ -3034,7 +3124,9 @@ def sign(pkey, data, digest): return _ffi.buffer(signature_buffer, signature_length[0])[:] -def verify(cert, signature, data, digest): +def verify( + cert: X509, signature: bytes, data: Union[str, bytes], digest: str +) -> None: """ Verify the signature for a data string. @@ -3057,8 +3149,8 @@ def verify(cert, signature, data, digest): _openssl_assert(pkey != _ffi.NULL) pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free) - md_ctx = _lib.Cryptography_EVP_MD_CTX_new() - md_ctx = _ffi.gc(md_ctx, _lib.Cryptography_EVP_MD_CTX_free) + md_ctx = _lib.EVP_MD_CTX_new() + md_ctx = _ffi.gc(md_ctx, _lib.EVP_MD_CTX_free) _lib.EVP_VerifyInit(md_ctx, digest_obj) _lib.EVP_VerifyUpdate(md_ctx, data, len(data)) @@ -3070,7 +3162,7 @@ def verify(cert, signature, data, digest): _raise_current_error() -def dump_crl(type, crl): +def dump_crl(type: int, crl: CRL) -> bytes: """ Dump a certificate revocation list to a buffer. @@ -3099,7 +3191,7 @@ def dump_crl(type, crl): return _bio_to_string(bio) -def load_crl(type, buffer): +def load_crl(type: int, buffer: Union[str, bytes]) -> CRL: """ Load Certificate Revocation List (CRL) data from a string *buffer*. *buffer* encoded with the type *type*. @@ -3107,9 +3199,9 @@ def load_crl(type, buffer): :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1) :param buffer: The buffer the CRL is stored in - :return: The PKey object + :return: The CRL object """ - if isinstance(buffer, _text_type): + if isinstance(buffer, str): buffer = buffer.encode("ascii") bio = _new_mem_buf(buffer) @@ -3129,7 +3221,7 @@ def load_crl(type, buffer): return result -def load_pkcs7_data(type, buffer): +def load_pkcs7_data(type: int, buffer: Union[str, bytes]) -> PKCS7: """ Load pkcs7 data from the string *buffer* encoded with the type *type*. @@ -3138,7 +3230,7 @@ def load_pkcs7_data(type, buffer): :param buffer: The buffer with the pkcs7 data. :return: The PKCS7 object """ - if isinstance(buffer, _text_type): + if isinstance(buffer, str): buffer = buffer.encode("ascii") bio = _new_mem_buf(buffer) @@ -3158,7 +3250,7 @@ def load_pkcs7_data(type, buffer): return pypkcs7 -load_pkcs7_data = utils.deprecated( +utils.deprecated( load_pkcs7_data, __name__, ( @@ -3166,10 +3258,13 @@ load_pkcs7_data = utils.deprecated( "in cryptography." ), DeprecationWarning, + name="load_pkcs7_data", ) -def load_pkcs12(buffer, passphrase=None): +def load_pkcs12( + buffer: Union[str, bytes], passphrase: Optional[bytes] = None +) -> PKCS12: """ Load pkcs12 data from the string *buffer*. If the pkcs12 structure is encrypted, a *passphrase* must be included. The MAC is always @@ -3183,7 +3278,7 @@ def load_pkcs12(buffer, passphrase=None): """ passphrase = _text_to_bytes_and_warn("passphrase", passphrase) - if isinstance(buffer, _text_type): + if isinstance(buffer, str): buffer = buffer.encode("ascii") bio = _new_mem_buf(buffer) @@ -3245,18 +3340,16 @@ def load_pkcs12(buffer, passphrase=None): x509 = _lib.sk_X509_value(cacerts, i) pycacert = X509._from_raw_x509_ptr(x509) pycacerts.append(pycacert) - if not pycacerts: - pycacerts = None pkcs12 = PKCS12.__new__(PKCS12) pkcs12._pkey = pykey pkcs12._cert = pycert - pkcs12._cacerts = pycacerts + pkcs12._cacerts = pycacerts if pycacerts else None pkcs12._friendlyname = friendlyname return pkcs12 -load_pkcs12 = utils.deprecated( +utils.deprecated( load_pkcs12, __name__, ( @@ -3264,25 +3357,5 @@ load_pkcs12 = utils.deprecated( "in cryptography." ), DeprecationWarning, + name="load_pkcs12", ) - - -# There are no direct unit tests for this initialization. It is tested -# indirectly since it is necessary for functions like dump_privatekey when -# using encryption. -# -# Thus OpenSSL.test.test_crypto.FunctionTests.test_dump_privatekey_passphrase -# and some other similar tests may fail without this (though they may not if -# the Python runtime has already done some initialization of the underlying -# OpenSSL library (and is linked against the same one that cryptography is -# using)). -_lib.OpenSSL_add_all_algorithms() - -# This is similar but exercised mainly by exception_from_error_queue. It calls -# both ERR_load_crypto_strings() and ERR_load_SSL_strings(). -_lib.SSL_load_error_strings() - - -# Set the default string mask to match OpenSSL upstream (since 2005) and -# RFC5280 recommendations. -_lib.ASN1_STRING_set_default_mask_asc(b"utf8only") diff --git a/contrib/python/pyOpenSSL/py3/OpenSSL/debug.py b/contrib/python/pyOpenSSL/py3/OpenSSL/debug.py index 04521d59226..e39b128a7e2 100644 --- a/contrib/python/pyOpenSSL/py3/OpenSSL/debug.py +++ b/contrib/python/pyOpenSSL/py3/OpenSSL/debug.py @@ -3,14 +3,16 @@ from __future__ import print_function import ssl import sys -import OpenSSL.SSL import cffi + import cryptography +import OpenSSL.SSL + from . import version -_env_info = u"""\ +_env_info = """\ pyOpenSSL: {pyopenssl} cryptography: {cryptography} cffi: {cffi} diff --git a/contrib/python/pyOpenSSL/py3/OpenSSL/rand.py b/contrib/python/pyOpenSSL/py3/OpenSSL/rand.py index d2c17673e5e..a4aa7210dda 100644 --- a/contrib/python/pyOpenSSL/py3/OpenSSL/rand.py +++ b/contrib/python/pyOpenSSL/py3/OpenSSL/rand.py @@ -5,7 +5,7 @@ PRNG management routines, thin wrappers. from OpenSSL._util import lib as _lib -def add(buffer, entropy): +def add(buffer: bytes, entropy: int) -> None: """ Mix bytes from *string* into the PRNG state. @@ -31,7 +31,7 @@ def add(buffer, entropy): _lib.RAND_add(buffer, len(buffer), entropy) -def status(): +def status() -> int: """ Check whether the PRNG has been seeded with enough data. diff --git a/contrib/python/pyOpenSSL/py3/OpenSSL/version.py b/contrib/python/pyOpenSSL/py3/OpenSSL/version.py index c6fcecb0774..61ec17b2d8f 100644 --- a/contrib/python/pyOpenSSL/py3/OpenSSL/version.py +++ b/contrib/python/pyOpenSSL/py3/OpenSSL/version.py @@ -17,7 +17,7 @@ __all__ = [ "__version__", ] -__version__ = "21.0.0" +__version__ = "23.0.0" __title__ = "pyOpenSSL" __uri__ = "https://pyopenssl.org/" @@ -25,4 +25,4 @@ __summary__ = "Python wrapper module around the OpenSSL library" __author__ = "The pyOpenSSL developers" __email__ = "[email protected]" __license__ = "Apache License, Version 2.0" -__copyright__ = "Copyright 2001-2020 {0}".format(__author__) +__copyright__ = "Copyright 2001-2023 {0}".format(__author__) diff --git a/contrib/python/pyOpenSSL/py3/README.rst b/contrib/python/pyOpenSSL/py3/README.rst index a628c8aea9f..4fecb9cc147 100644 --- a/contrib/python/pyOpenSSL/py3/README.rst +++ b/contrib/python/pyOpenSSL/py3/README.rst @@ -36,7 +36,7 @@ If you run into bugs, you can file them in our `issue tracker`_. We maintain a cryptography-dev_ mailing list for both user and development discussions. -You can also join ``#cryptography-dev`` on Freenode to ask questions or get involved. +You can also join ``#pyca`` on ``irc.libera.chat`` to ask questions or get involved. .. _documentation: https://pyopenssl.org/ diff --git a/contrib/python/pyOpenSSL/py3/patches/01-fix-tests.patch b/contrib/python/pyOpenSSL/py3/patches/01-fix-tests.patch index ab5a02e23c6..2f82d67bebe 100644 --- a/contrib/python/pyOpenSSL/py3/patches/01-fix-tests.patch +++ b/contrib/python/pyOpenSSL/py3/patches/01-fix-tests.patch @@ -9,11 +9,3 @@ """ `Context.set_default_verify_paths` causes the platform-specific CA certificate locations to be used for verification purposes. -@@ -1927,6 +1927,7 @@ class TestApplicationLayerProtoNegotiation(object): - assert server.get_alpn_proto_negotiated() == b"spdy/2" - assert client.get_alpn_proto_negotiated() == b"spdy/2" - -+ @pytest.mark.xfail(reason='https://github.com/pyca/pyopenssl/issues/1043') - def test_alpn_call_failure(self): - """ - SSL_CTX_set_alpn_protos does not like to be called with an empty diff --git a/contrib/python/pyOpenSSL/py3/tests/memdbg.py b/contrib/python/pyOpenSSL/py3/tests/memdbg.py index 590b72d0682..0dd8c318e4e 100644 --- a/contrib/python/pyOpenSSL/py3/tests/memdbg.py +++ b/contrib/python/pyOpenSSL/py3/tests/memdbg.py @@ -1,5 +1,4 @@ import sys - import traceback from cffi import api as _api diff --git a/contrib/python/pyOpenSSL/py3/tests/test_crypto.py b/contrib/python/pyOpenSSL/py3/tests/test_crypto.py index ef3429d17fd..44bbd0f2aba 100644 --- a/contrib/python/pyOpenSSL/py3/tests/test_crypto.py +++ b/contrib/python/pyOpenSSL/py3/tests/test_crypto.py @@ -4,56 +4,65 @@ """ Unit tests for :py:mod:`OpenSSL.crypto`. """ - -from warnings import simplefilter - import base64 -from subprocess import PIPE, Popen -from datetime import datetime, timedelta import sys - -import pytest +from datetime import datetime, timedelta +from subprocess import PIPE, Popen +from warnings import simplefilter from cryptography import x509 -from cryptography.hazmat.backends.openssl.backend import backend from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives.asymmetric import ec, ed25519, ed448, rsa import flaky -from OpenSSL.crypto import TYPE_RSA, TYPE_DSA, Error, PKey -from OpenSSL.crypto import X509, X509Name +import pytest + +from OpenSSL._util import ffi as _ffi, lib as _lib from OpenSSL.crypto import ( + CRL, + Error, + FILETYPE_ASN1, + FILETYPE_PEM, + FILETYPE_TEXT, + NetscapeSPKI, + PKCS12, + PKCS7, + PKey, + Revoked, + TYPE_DSA, + TYPE_RSA, + X509, + X509Extension, + X509Name, + X509Req, X509Store, - X509StoreFlags, X509StoreContext, X509StoreContextError, -) -from OpenSSL.crypto import X509Req -from OpenSSL.crypto import X509Extension -from OpenSSL.crypto import load_certificate, load_privatekey -from OpenSSL.crypto import load_publickey, dump_publickey -from OpenSSL.crypto import FILETYPE_PEM, FILETYPE_ASN1, FILETYPE_TEXT -from OpenSSL.crypto import dump_certificate, load_certificate_request -from OpenSSL.crypto import dump_certificate_request, dump_privatekey -from OpenSSL.crypto import PKCS7, load_pkcs7_data -from OpenSSL.crypto import PKCS12, load_pkcs12 -from OpenSSL.crypto import CRL, Revoked, dump_crl, load_crl -from OpenSSL.crypto import NetscapeSPKI -from OpenSSL.crypto import ( - sign, - verify, + X509StoreFlags, + dump_certificate, + dump_certificate_request, + dump_crl, + dump_privatekey, + dump_publickey, get_elliptic_curve, get_elliptic_curves, + load_certificate, + load_certificate_request, + load_crl, + load_pkcs12, + load_pkcs7_data, + load_privatekey, + load_publickey, + sign, + verify, ) -from OpenSSL._util import ffi as _ffi, lib as _lib - from .util import ( EqualityTestsMixin, - is_consistent_type, - WARNING_TYPE_EXPECTED, NON_ASCII, + WARNING_TYPE_EXPECTED, + is_consistent_type, ) @@ -64,7 +73,7 @@ def normalize_privatekey_pem(pem): GOOD_CIPHER = "blowfish" BAD_CIPHER = "zippers" -GOOD_DIGEST = "SHA1" +GOOD_DIGEST = "SHA256" BAD_DIGEST = "monkeys" old_root_cert_pem = b"""-----BEGIN CERTIFICATE----- @@ -609,9 +618,10 @@ vrzEeLDRiiPl92dyyWmu -----END X509 CRL----- """ +# The signature on this CRL is invalid. crlDataUnsupportedExtension = b"""\ -----BEGIN X509 CRL----- -MIIGRzCCBS8CAQIwDQYJKoZIhvcNAQELBQAwJzELMAkGA1UEBhMCVVMxGDAWBgNV +MIIGRzCCBS8CAQEwDQYJKoZIhvcNAQELBQAwJzELMAkGA1UEBhMCVVMxGDAWBgNV BAMMD2NyeXB0b2dyYXBoeS5pbxgPMjAxNTAxMDEwMDAwMDBaGA8yMDE2MDEwMTAw MDAwMFowggTOMBQCAQAYDzIwMTUwMTAxMDAwMDAwWjByAgEBGA8yMDE1MDEwMTAw MDAwMFowXDAYBgNVHRgEERgPMjAxNTAxMDEwMDAwMDBaMDQGA1UdHQQtMCukKTAn @@ -783,6 +793,21 @@ MBsCAQACAS0CAQcCAQACAQ8CAQMCAQACAQACAQA= -----END RSA PRIVATE KEY----- """ +ed25519_private_key_pem = b"""-----BEGIN PRIVATE KEY----- +MC4CAQAwBQYDK2VwBCIEIKlxBbhVsSURoLTmsu9uTqYH6oF7zpxmp1ZQCAPhDmI2 +-----END PRIVATE KEY----- +""" + +ed448_private_key_pem = b"""-----BEGIN PRIVATE KEY----- +MEcCAQAwBQYDK2VxBDsEOcqZ7a3k6JwrJbYO8CNTPT/d7dlWCo5vCf0EYDj79ZvA\nhD8u9EPHlYJw5Y8ZQdH4WmVEfpKA23xkdQ== +-----END PRIVATE KEY----- +""" + +x25519_private_key_pem = b"""-----BEGIN PRIVATE KEY----- +MC4CAQAwBQYDK2VuBCIEIPAjVfPNTm25VxtBRg+JjjFx9tA3M8aaBdVhjb92iBts +-----END PRIVATE KEY----- +""" + @pytest.fixture def x509_data(): @@ -809,7 +834,7 @@ def x509_data(): yield pkey, x509 -class TestX509Ext(object): +class TestX509Ext: """ Tests for `OpenSSL.crypto.X509Extension`. """ @@ -914,7 +939,7 @@ class TestX509Ext(object): b"basicConstraints", False, b"CA:TRUE", subject=x509 ) x509.add_extensions([ext1]) - x509.sign(pkey, "sha1") + x509.sign(pkey, "sha256") # This is a little lame. Can we think of a better way? text = dump_certificate(FILETYPE_TEXT, x509) assert b"X509v3 Basic Constraints:" in text @@ -930,7 +955,7 @@ class TestX509Ext(object): b"subjectKeyIdentifier", False, b"hash", subject=x509 ) x509.add_extensions([ext3]) - x509.sign(pkey, "sha1") + x509.sign(pkey, "sha256") text = dump_certificate(FILETYPE_TEXT, x509) assert b"X509v3 Subject Key Identifier:" in text @@ -963,7 +988,7 @@ class TestX509Ext(object): b"basicConstraints", False, b"CA:TRUE", issuer=x509 ) x509.add_extensions([ext1]) - x509.sign(pkey, "sha1") + x509.sign(pkey, "sha256") text = dump_certificate(FILETYPE_TEXT, x509) assert b"X509v3 Basic Constraints:" in text assert b"CA:TRUE" in text @@ -978,7 +1003,7 @@ class TestX509Ext(object): b"authorityKeyIdentifier", False, b"issuer:always", issuer=x509 ) x509.add_extensions([ext2]) - x509.sign(pkey, "sha1") + x509.sign(pkey, "sha256") text = dump_certificate(FILETYPE_TEXT, x509) assert b"X509v3 Authority Key Identifier:" in text assert b"DirName:/CN=Yoda root CA" in text @@ -1008,22 +1033,40 @@ class TestX509Ext(object): ) -class TestPKey(object): +class TestPKey: """ Tests for `OpenSSL.crypto.PKey`. """ - def test_convert_from_cryptography_private_key(self): + @pytest.mark.parametrize( + ("key_string", "key_type"), + [ + (intermediate_key_pem, rsa.RSAPrivateKey), + (ec_private_key_pem, ec.EllipticCurvePrivateKey), + (ed25519_private_key_pem, ed25519.Ed25519PrivateKey), + (ed448_private_key_pem, ed448.Ed448PrivateKey), + ], + ) + def test_convert_roundtrip_cryptography_private_key( + self, key_string, key_type + ): """ PKey.from_cryptography_key creates a proper private PKey. + PKey.to_cryptography_key creates a proper cryptography private key. """ - key = serialization.load_pem_private_key( - intermediate_key_pem, None, backend - ) + key = serialization.load_pem_private_key(key_string, None) pkey = PKey.from_cryptography_key(key) assert isinstance(pkey, PKey) - assert pkey.bits() == key.key_size + parsed_key = pkey.to_cryptography_key() + assert isinstance(parsed_key, key_type) + assert parsed_key.public_key().public_bytes( + serialization.Encoding.PEM, + serialization.PublicFormat.SubjectPublicKeyInfo, + ) == key.public_key().public_bytes( + serialization.Encoding.PEM, + serialization.PublicFormat.SubjectPublicKeyInfo, + ) assert pkey._only_public is False assert pkey._initialized is True @@ -1031,7 +1074,7 @@ class TestPKey(object): """ PKey.from_cryptography_key creates a proper public PKey. """ - key = serialization.load_pem_public_key(cleartextPublicKeyPEM, backend) + key = serialization.load_pem_public_key(cleartextPublicKeyPEM) pkey = PKey.from_cryptography_key(key) assert isinstance(pkey, PKey) @@ -1043,9 +1086,7 @@ class TestPKey(object): """ PKey.from_cryptography_key raises TypeError with an unsupported type. """ - key = serialization.load_pem_private_key( - ec_private_key_pem, None, backend - ) + key = serialization.load_pem_private_key(x25519_private_key_pem, None) with pytest.raises(TypeError): PKey.from_cryptography_key(key) @@ -1059,16 +1100,6 @@ class TestPKey(object): assert isinstance(key, rsa.RSAPublicKey) assert pkey.bits() == key.key_size - def test_convert_private_pkey_to_cryptography_key(self): - """ - PKey.to_cryptography_key creates a proper cryptography private key. - """ - pkey = load_privatekey(FILETYPE_PEM, root_key_pem) - key = pkey.to_cryptography_key() - - assert isinstance(key, rsa.RSAPrivateKey) - assert pkey.bits() == key.key_size - def test_type(self): """ `PKey` can be used to create instances of that type. @@ -1175,10 +1206,11 @@ class TestPKey(object): def test_inconsistent_key(self): """ - `PKey.check` returns `Error` if the key is not consistent. + Either `load_privatekey` or `PKey.check` returns `Error` if the key is + not consistent. """ - key = load_privatekey(FILETYPE_PEM, inconsistentPrivateKeyPEM) with pytest.raises(Error): + key = load_privatekey(FILETYPE_PEM, inconsistentPrivateKeyPEM) key.check() def test_check_public_key(self): @@ -1197,10 +1229,11 @@ class TestPKey(object): def test_check_pr_897(self): """ - `PKey.check` raises `OpenSSL.crypto.Error` if provided with broken key + Either `load_privatekey` or `PKey.check` raises `OpenSSL.crypto.Error` + if provided with broken key """ - pkey = load_privatekey(FILETYPE_PEM, rsa_p_not_prime_pem) with pytest.raises(Error): + pkey = load_privatekey(FILETYPE_PEM, rsa_p_not_prime_pem) pkey.check() @@ -1222,7 +1255,7 @@ def x509_name(**attrs): return name -class TestX509Name(object): +class TestX509Name: """ Unit tests for `OpenSSL.crypto.X509Name`. """ @@ -1398,6 +1431,19 @@ class TestX509Name(object): # other X509Name. assert_greater_than(x509_name(CN="def"), x509_name(CN="abc")) + def assert_raises(a, b): + with pytest.raises(TypeError): + a < b + with pytest.raises(TypeError): + a <= b + with pytest.raises(TypeError): + a > b + with pytest.raises(TypeError): + a >= b + + # Only X509Name objects can be compared with lesser than / greater than + assert_raises(x509_name(), object()) + def test_hash(self): """ `X509Name.hash` returns an integer hash based on the value of the name. @@ -1555,14 +1601,20 @@ class TestX509Req(_PKeyInteractionTestsMixin): """ `X509Req.set_version` sets the X.509 version of the certificate request. `X509Req.get_version` returns the X.509 version of the - certificate request. The initial value of the version is 0. + certificate request. The only defined version is 0. Others may or + may not be supported depending on backend. """ request = X509Req() assert request.get_version() == 0 - request.set_version(1) - assert request.get_version() == 1 - request.set_version(3) - assert request.get_version() == 3 + request.set_version(0) + assert request.get_version() == 0 + try: + request.set_version(1) + assert request.get_version() == 1 + request.set_version(3) + assert request.get_version() == 3 + except Error: + pass def test_version_wrong_args(self): """ @@ -1686,9 +1738,7 @@ class TestX509Req(_PKeyInteractionTestsMixin): assert request.verify(pkey) def test_convert_from_cryptography(self): - crypto_req = x509.load_pem_x509_csr( - cleartextCertificateRequestPEM, backend - ) + crypto_req = x509.load_pem_x509_csr(cleartextCertificateRequestPEM) req = X509Req.from_cryptography(crypto_req) assert isinstance(req, X509Req) @@ -1752,8 +1802,8 @@ class TestX509(_PKeyInteractionTestsMixin): `X509.get_version` retrieves it. """ cert = X509() - cert.set_version(1234) - assert cert.get_version() == 1234 + cert.set_version(2) + assert cert.get_version() == 2 def test_serial_number(self): """ @@ -1767,12 +1817,12 @@ class TestX509(_PKeyInteractionTestsMixin): assert certificate.get_serial_number() == 0 certificate.set_serial_number(1) assert certificate.get_serial_number() == 1 - certificate.set_serial_number(2 ** 32 + 1) - assert certificate.get_serial_number() == 2 ** 32 + 1 - certificate.set_serial_number(2 ** 64 + 1) - assert certificate.get_serial_number() == 2 ** 64 + 1 - certificate.set_serial_number(2 ** 128 + 1) - assert certificate.get_serial_number() == 2 ** 128 + 1 + certificate.set_serial_number(2**32 + 1) + assert certificate.get_serial_number() == 2**32 + 1 + certificate.set_serial_number(2**64 + 1) + assert certificate.get_serial_number() == 2**64 + 1 + certificate.set_serial_number(2**128 + 1) + assert certificate.get_serial_number() == 2**128 + 1 def _setBoundTest(self, which): """ @@ -1920,6 +1970,14 @@ class TestX509(_PKeyInteractionTestsMixin): cert.gmtime_adj_notAfter(2) assert not cert.has_expired() + def test_has_expired_exception(self): + """ + `X509.has_expired` throws ValueError if not-after time is not set + """ + cert = X509() + with pytest.raises(ValueError): + cert.has_expired() + def test_root_has_not_expired(self): """ `X509.has_expired` returns `False` if the certificate's not-after time @@ -1935,13 +1993,13 @@ class TestX509(_PKeyInteractionTestsMixin): """ cert = load_certificate(FILETYPE_PEM, old_root_cert_pem) assert ( - # This is MD5 instead of GOOD_DIGEST because the digest algorithm - # actually matters to the assertion (ie, another arbitrary, good - # digest will not product the same digest). # Digest verified with the command: - # openssl x509 -in root_cert.pem -noout -fingerprint -md5 - cert.digest("MD5") - == b"19:B3:05:26:2B:F8:F2:FF:0B:8F:21:07:A8:28:B8:75" + # openssl x509 -in root_cert.pem -noout -fingerprint -sha256 + cert.digest("SHA256") + == ( + b"3E:0F:16:39:6B:B1:3E:4F:08:85:C6:5F:10:0D:CB:2C:" + b"25:C2:91:4E:D0:4A:C2:29:06:BD:55:E3:A7:B3:B7:06" + ) ) def _extcert(self, pkey, extensions): @@ -1957,7 +2015,7 @@ class TestX509(_PKeyInteractionTestsMixin): cert.set_notAfter(when) cert.add_extensions(extensions) - cert.sign(pkey, "sha1") + cert.sign(pkey, "sha256") return load_certificate( FILETYPE_PEM, dump_certificate(FILETYPE_PEM, cert) ) @@ -2036,8 +2094,8 @@ class TestX509(_PKeyInteractionTestsMixin): b"DNS:altnull.python.org\x00example.com, " b"email:[email protected]\[email protected], " b"URI:http://null.python.org\x00http://example.org, " - b"IP Address:192.0.2.1, IP Address:2001:DB8:0:0:0:0:0:1\n" - == str(ext).encode("ascii") + b"IP Address:192.0.2.1, IP Address:2001:DB8:0:0:0:0:0:1" + == str(ext).encode("ascii").strip() ) def test_invalid_digest_algorithm(self): @@ -2204,9 +2262,7 @@ tgI5 cert.sign(object(), b"sha256") def test_convert_from_cryptography(self): - crypto_cert = x509.load_pem_x509_certificate( - intermediate_cert_pem, backend - ) + crypto_cert = x509.load_pem_x509_certificate(intermediate_cert_pem) cert = X509.from_cryptography(crypto_cert) assert isinstance(cert, X509) @@ -2224,7 +2280,7 @@ tgI5 assert crypto_cert.version.value == cert.get_version() -class TestX509Store(object): +class TestX509Store: """ Test for `OpenSSL.crypto.X509Store`. """ @@ -2295,7 +2351,7 @@ class TestX509Store(object): def test_load_locations_parameters( self, cafile, capath, call_cafile, call_capath, monkeypatch ): - class LibMock(object): + class LibMock: def load_locations(self, store, cafile, capath): self.cafile = cafile self.capath = capath @@ -2326,7 +2382,7 @@ class TestX509Store(object): store.load_locations(cafile=str(invalid_ca_file)) -class TestPKCS12(object): +class TestPKCS12: """ Test for `OpenSSL.crypto.PKCS12` and `OpenSSL.crypto.load_pkcs12`. """ @@ -2467,7 +2523,7 @@ class TestPKCS12(object): b"-nodes", b"-passin", b"pass:" + passwd, - *extra + *extra, ) assert recovered_key[-len(key) :] == key if cert: @@ -2479,7 +2535,7 @@ class TestPKCS12(object): b"-passin", b"pass:" + passwd, b"-nokeys", - *extra + *extra, ) assert recovered_cert[-len(cert) :] == cert if ca: @@ -2491,7 +2547,7 @@ class TestPKCS12(object): b"-passin", b"pass:" + passwd, b"-nokeys", - *extra + *extra, ) assert recovered_cert[-len(ca) :] == ca @@ -2802,7 +2858,7 @@ def _runopenssl(pem, *args): return output -class TestLoadPublicKey(object): +class TestLoadPublicKey: """ Tests for :func:`load_publickey`. """ @@ -2842,7 +2898,7 @@ class TestLoadPublicKey(object): assert dumped_pem == cleartextPublicKeyPEM -class TestFunction(object): +class TestFunction: """ Tests for free-functions in the `OpenSSL.crypto` module. """ @@ -3263,7 +3319,7 @@ class TestFunction(object): load_pkcs7_data(object(), b"foo") -class TestLoadCertificate(object): +class TestLoadCertificate: """ Tests for `load_certificate_request`. """ @@ -3287,7 +3343,7 @@ class TestLoadCertificate(object): load_certificate(FILETYPE_ASN1, b"lol") -class TestPKCS7(object): +class TestPKCS7: """ Tests for `PKCS7`. """ @@ -3387,7 +3443,7 @@ class TestNetscapeSPKI(_PKeyInteractionTestsMixin): assert isinstance(blob, bytes) -class TestRevoked(object): +class TestRevoked: """ Tests for `OpenSSL.crypto.Revoked`. """ @@ -3477,7 +3533,7 @@ class TestRevoked(object): revoked.set_reason(None) assert revoked.get_reason() is None - @pytest.mark.parametrize("reason", [object(), 1.0, u"foo"]) + @pytest.mark.parametrize("reason", [object(), 1.0, "foo"]) def test_set_reason_wrong_args(self, reason): """ `Revoked.set_reason` raises `TypeError` if called with an argument @@ -3497,7 +3553,7 @@ class TestRevoked(object): revoked.set_reason(b"blue") -class TestCRL(object): +class TestCRL: """ Tests for `OpenSSL.crypto.CRL`. """ @@ -3548,17 +3604,17 @@ class TestCRL(object): dumped_crl = self._get_crl().export( self.cert, self.pkey, days=20, digest=b"sha256" ) - crl = x509.load_pem_x509_crl(dumped_crl, backend) + crl = x509.load_pem_x509_crl(dumped_crl) revoked = crl.get_revoked_certificate_by_serial_number(0x03AB) assert revoked is not None assert crl.issuer == x509.Name( [ - x509.NameAttribute(x509.NameOID.COUNTRY_NAME, u"US"), - x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, u"IL"), - x509.NameAttribute(x509.NameOID.LOCALITY_NAME, u"Chicago"), - x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, u"Testing"), + x509.NameAttribute(x509.NameOID.COUNTRY_NAME, "US"), + x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, "IL"), + x509.NameAttribute(x509.NameOID.LOCALITY_NAME, "Chicago"), + x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, "Testing"), x509.NameAttribute( - x509.NameOID.COMMON_NAME, u"Testing Root CA" + x509.NameOID.COMMON_NAME, "Testing Root CA" ), ] ) @@ -3573,19 +3629,19 @@ class TestCRL(object): # DER format dumped_crl = self._get_crl().export( - self.cert, self.pkey, FILETYPE_ASN1, digest=b"md5" + self.cert, self.pkey, FILETYPE_ASN1, digest=b"sha256" ) - crl = x509.load_der_x509_crl(dumped_crl, backend) + crl = x509.load_der_x509_crl(dumped_crl) revoked = crl.get_revoked_certificate_by_serial_number(0x03AB) assert revoked is not None assert crl.issuer == x509.Name( [ - x509.NameAttribute(x509.NameOID.COUNTRY_NAME, u"US"), - x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, u"IL"), - x509.NameAttribute(x509.NameOID.LOCALITY_NAME, u"Chicago"), - x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, u"Testing"), + x509.NameAttribute(x509.NameOID.COUNTRY_NAME, "US"), + x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, "IL"), + x509.NameAttribute(x509.NameOID.LOCALITY_NAME, "Chicago"), + x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, "Testing"), x509.NameAttribute( - x509.NameOID.COMMON_NAME, u"Testing Root CA" + x509.NameOID.COMMON_NAME, "Testing Root CA" ), ] ) @@ -3600,7 +3656,7 @@ class TestCRL(object): # text format dumped_text = crl.export( - self.cert, self.pkey, type=FILETYPE_TEXT, digest=b"md5" + self.cert, self.pkey, type=FILETYPE_TEXT, digest=b"sha256" ) assert len(dumped_text) > 500 @@ -3610,9 +3666,9 @@ class TestCRL(object): signature algorithm based on that digest function. """ crl = self._get_crl() - dumped_crl = crl.export(self.cert, self.pkey, digest=b"sha1") + dumped_crl = crl.export(self.cert, self.pkey, digest=b"sha384") text = _runopenssl(dumped_crl, b"crl", b"-noout", b"-text") - text.index(b"Signature Algorithm: sha1") + text.index(b"Signature Algorithm: sha384") def test_export_md5_digest(self): """ @@ -3794,7 +3850,9 @@ class TestCRL(object): crl.add_revoked(revoked) crl.set_version(1) crl.set_lastUpdate(b"20140601000000Z") - crl.set_nextUpdate(b"20180601000000Z") + # The year 5000 is far into the future so that this CRL isn't + # considered to have expired. + crl.set_nextUpdate(b"50000601000000Z") crl.sign(issuer_cert, issuer_key, digest=b"sha512") return crl @@ -3820,7 +3878,7 @@ class TestCRL(object): store_ctx = X509StoreContext(store, self.intermediate_server_cert) with pytest.raises(X509StoreContextError) as err: store_ctx.verify_certificate() - assert err.value.args[0][2] == "certificate revoked" + assert str(err.value) == "certificate revoked" def test_verify_with_missing_crl(self): """ @@ -3840,11 +3898,11 @@ class TestCRL(object): store_ctx = X509StoreContext(store, self.intermediate_server_cert) with pytest.raises(X509StoreContextError) as err: store_ctx.verify_certificate() - assert err.value.args[0][2] == "unable to get certificate CRL" + assert str(err.value) == "unable to get certificate CRL" assert err.value.certificate.get_subject().CN == "intermediate-service" def test_convert_from_cryptography(self): - crypto_crl = x509.load_pem_x509_crl(crlData, backend) + crypto_crl = x509.load_pem_x509_crl(crlData) crl = CRL.from_cryptography(crypto_crl) assert isinstance(crl, CRL) @@ -3858,7 +3916,7 @@ class TestCRL(object): assert isinstance(crypto_crl, x509.CertificateRevocationList) -class TestX509StoreContext(object): +class TestX509StoreContext: """ Tests for `OpenSSL.crypto.X509StoreContext`. """ @@ -4051,7 +4109,11 @@ class TestX509StoreContext(object): with pytest.raises(X509StoreContextError) as exc: store_ctx.verify_certificate() - assert exc.value.args[0][2] == "self signed certificate" + # OpenSSL 1.1.x and 3.0.x have different error messages + assert str(exc.value) in [ + "self signed certificate", + "self-signed certificate", + ] assert exc.value.certificate.get_subject().CN == "Testing Root CA" def test_invalid_chain_no_root(self): @@ -4066,7 +4128,7 @@ class TestX509StoreContext(object): with pytest.raises(X509StoreContextError) as exc: store_ctx.verify_certificate() - assert exc.value.args[0][2] == "unable to get issuer certificate" + assert str(exc.value) == "unable to get issuer certificate" assert exc.value.certificate.get_subject().CN == "intermediate" def test_invalid_chain_no_intermediate(self): @@ -4081,7 +4143,7 @@ class TestX509StoreContext(object): with pytest.raises(X509StoreContextError) as exc: store_ctx.verify_certificate() - assert exc.value.args[0][2] == "unable to get local issuer certificate" + assert str(exc.value) == "unable to get local issuer certificate" assert exc.value.certificate.get_subject().CN == "intermediate-service" def test_modification_pre_verify(self): @@ -4099,7 +4161,7 @@ class TestX509StoreContext(object): with pytest.raises(X509StoreContextError) as exc: store_ctx.verify_certificate() - assert exc.value.args[0][2] == "unable to get issuer certificate" + assert str(exc.value) == "unable to get issuer certificate" assert exc.value.certificate.get_subject().CN == "intermediate" store_ctx.set_store(store_good) @@ -4124,7 +4186,7 @@ class TestX509StoreContext(object): with pytest.raises(X509StoreContextError) as exc: store_ctx.verify_certificate() - assert exc.value.args[0][2] == "certificate has expired" + assert str(exc.value) == "certificate has expired" def test_get_verified_chain(self): """ @@ -4158,7 +4220,7 @@ class TestX509StoreContext(object): with pytest.raises(X509StoreContextError) as exc: store_ctx.get_verified_chain() - assert exc.value.args[0][2] == "unable to get issuer certificate" + assert str(exc.value) == "unable to get issuer certificate" assert exc.value.certificate.get_subject().CN == "intermediate" @pytest.fixture @@ -4223,10 +4285,23 @@ class TestX509StoreContext(object): with pytest.raises(X509StoreContextError) as exc: store_ctx.verify_certificate() - assert exc.value.args[0][2] == "unable to get local issuer certificate" + assert str(exc.value) == "unable to get local issuer certificate" + + def test_verify_with_partial_chain(self): + store = X509Store() + store.add_cert(self.intermediate_cert) + + store_ctx = X509StoreContext(store, self.intermediate_server_cert) + with pytest.raises(X509StoreContextError): + store_ctx.verify_certificate() + + # Now set the partial verification flag for verification. + store.set_flags(X509StoreFlags.PARTIAL_CHAIN) + store_ctx = X509StoreContext(store, self.intermediate_server_cert) + assert store_ctx.verify_certificate() is None -class TestSignVerify(object): +class TestSignVerify: """ Tests for `OpenSSL.crypto.sign` and `OpenSSL.crypto.verify`. """ @@ -4250,7 +4325,7 @@ class TestSignVerify(object): # certificate unrelated to priv_key, used to trigger an error bad_cert = load_certificate(FILETYPE_PEM, server_cert_pem) - for digest in ["md5", "sha1"]: + for digest in ["md5", "sha1", "sha256"]: sig = sign(priv_key, content, digest) # Verify the signature of content, will throw an exception if @@ -4289,7 +4364,7 @@ class TestSignVerify(object): priv_key = load_privatekey(FILETYPE_PEM, root_key_pem) cert = load_certificate(FILETYPE_PEM, root_cert_pem) - for digest in ["md5", "sha1"]: + for digest in ["md5", "sha1", "sha256"]: with pytest.warns(DeprecationWarning) as w: simplefilter("always") sig = sign(priv_key, content, digest) @@ -4319,8 +4394,8 @@ class TestSignVerify(object): ) priv_key = load_privatekey(FILETYPE_PEM, ec_root_key_pem) cert = load_certificate(FILETYPE_PEM, ec_root_cert_pem) - sig = sign(priv_key, content, "sha1") - verify(cert, sig, content, "sha1") + sig = sign(priv_key, content, "sha256") + verify(cert, sig, content, "sha256") def test_sign_nulls(self): """ @@ -4329,8 +4404,8 @@ class TestSignVerify(object): content = b"Watch out! \0 Did you see it?" priv_key = load_privatekey(FILETYPE_PEM, root_key_pem) good_cert = load_certificate(FILETYPE_PEM, root_cert_pem) - sig = sign(priv_key, content, "sha1") - verify(good_cert, sig, content, "sha1") + sig = sign(priv_key, content, "sha256") + verify(good_cert, sig, content, "sha256") def test_sign_with_large_key(self): """ @@ -4345,10 +4420,10 @@ class TestSignVerify(object): ) priv_key = load_privatekey(FILETYPE_PEM, large_key_pem) - sign(priv_key, content, "sha1") + sign(priv_key, content, "sha256") -class TestEllipticCurve(object): +class TestEllipticCurve: """ Tests for `_EllipticCurve`, `get_elliptic_curve`, and `get_elliptic_curves`. @@ -4375,7 +4450,7 @@ class TestEllipticCurve(object): does not identify a supported curve. """ with pytest.raises(ValueError): - get_elliptic_curve(u"this curve was just invented") + get_elliptic_curve("this curve was just invented") def test_repr(self): """ @@ -4399,7 +4474,7 @@ class TestEllipticCurve(object): curve._to_EC_KEY() -class EllipticCurveFactory(object): +class EllipticCurveFactory: """ A helper to get the names of two curves. """ @@ -4434,7 +4509,7 @@ class TestEllipticCurveEquality(EqualityTestsMixin): return get_elliptic_curve(self.curve_factory.another_curve_name) -class TestEllipticCurveHash(object): +class TestEllipticCurveHash: """ Tests for `_EllipticCurve`'s implementation of hashing (thus use as an item in a `dict` or `set`). diff --git a/contrib/python/pyOpenSSL/py3/tests/test_debug.py b/contrib/python/pyOpenSSL/py3/tests/test_debug.py index 2d62a3a56a1..4eeb3d00b31 100644 --- a/contrib/python/pyOpenSSL/py3/tests/test_debug.py +++ b/contrib/python/pyOpenSSL/py3/tests/test_debug.py @@ -1,5 +1,5 @@ -from OpenSSL.debug import _env_info from OpenSSL import version +from OpenSSL.debug import _env_info def test_debug_info(): diff --git a/contrib/python/pyOpenSSL/py3/tests/test_rand.py b/contrib/python/pyOpenSSL/py3/tests/test_rand.py index 763d71124ca..24f30939545 100644 --- a/contrib/python/pyOpenSSL/py3/tests/test_rand.py +++ b/contrib/python/pyOpenSSL/py3/tests/test_rand.py @@ -10,7 +10,7 @@ import pytest from OpenSSL import rand -class TestRand(object): +class TestRand: @pytest.mark.parametrize("args", [(b"foo", None), (None, 3)]) def test_add_wrong_args(self, args): """ diff --git a/contrib/python/pyOpenSSL/py3/tests/test_ssl.py b/contrib/python/pyOpenSSL/py3/tests/test_ssl.py index 8bb5a035a77..fdedd7133a0 100644 --- a/contrib/python/pyOpenSSL/py3/tests/test_ssl.py +++ b/contrib/python/pyOpenSSL/py3/tests/test_ssl.py @@ -7,124 +7,125 @@ Unit tests for :mod:`OpenSSL.SSL`. import datetime import gc +import select import sys import uuid - -from gc import collect, get_referrers from errno import ( EAFNOSUPPORT, ECONNREFUSED, EINPROGRESS, - EWOULDBLOCK, EPIPE, ESHUTDOWN, + EWOULDBLOCK, ) -from sys import platform, getfilesystemencoding -from socket import AF_INET, AF_INET6, MSG_PEEK, SHUT_RDWR, error, socket +from gc import collect, get_referrers from os import makedirs from os.path import join -from weakref import ref +from socket import ( + AF_INET, + AF_INET6, + MSG_PEEK, + SHUT_RDWR, + error, + socket, +) +from sys import getfilesystemencoding, platform +from typing import Union from warnings import simplefilter - -import flaky - -import pytest - -from pretend import raiser - -from six import PY2, text_type +from weakref import ref from cryptography import x509 -from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.x509.oid import NameOID +import flaky -from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM -from OpenSSL.crypto import PKey, X509, X509Extension, X509Store -from OpenSSL.crypto import dump_privatekey, load_privatekey -from OpenSSL.crypto import dump_certificate, load_certificate -from OpenSSL.crypto import get_elliptic_curves +from pretend import raiser -from OpenSSL.SSL import ( - OPENSSL_VERSION_NUMBER, - SSLEAY_VERSION, - SSLEAY_CFLAGS, - TLS_METHOD, - TLS1_3_VERSION, - TLS1_2_VERSION, - TLS1_1_VERSION, -) -from OpenSSL.SSL import SSLEAY_PLATFORM, SSLEAY_DIR, SSLEAY_BUILT_ON -from OpenSSL.SSL import SENT_SHUTDOWN, RECEIVED_SHUTDOWN -from OpenSSL.SSL import ( - SSLv2_METHOD, - SSLv3_METHOD, - SSLv23_METHOD, - TLSv1_METHOD, - TLSv1_1_METHOD, - TLSv1_2_METHOD, -) -from OpenSSL.SSL import OP_SINGLE_DH_USE, OP_NO_SSLv2, OP_NO_SSLv3 -from OpenSSL.SSL import ( - VERIFY_PEER, - VERIFY_FAIL_IF_NO_PEER_CERT, - VERIFY_CLIENT_ONCE, - VERIFY_NONE, -) +import pytest from OpenSSL import SSL from OpenSSL.SSL import ( - SESS_CACHE_OFF, - SESS_CACHE_CLIENT, - SESS_CACHE_SERVER, + Connection, + Context, + DTLS_METHOD, + Error, + MODE_RELEASE_BUFFERS, + NO_OVERLAPPING_PROTOCOLS, + OPENSSL_VERSION_NUMBER, + OP_COOKIE_EXCHANGE, + OP_NO_COMPRESSION, + OP_NO_QUERY_MTU, + OP_NO_SSLv2, + OP_NO_SSLv3, + OP_NO_TICKET, + OP_SINGLE_DH_USE, + RECEIVED_SHUTDOWN, + SENT_SHUTDOWN, SESS_CACHE_BOTH, + SESS_CACHE_CLIENT, SESS_CACHE_NO_AUTO_CLEAR, + SESS_CACHE_NO_INTERNAL, SESS_CACHE_NO_INTERNAL_LOOKUP, SESS_CACHE_NO_INTERNAL_STORE, - SESS_CACHE_NO_INTERNAL, -) - -from OpenSSL.SSL import ( - Error, + SESS_CACHE_OFF, + SESS_CACHE_SERVER, + SSLEAY_BUILT_ON, + SSLEAY_CFLAGS, + SSLEAY_DIR, + SSLEAY_PLATFORM, + SSLEAY_VERSION, + SSL_CB_ACCEPT_EXIT, + SSL_CB_ACCEPT_LOOP, + SSL_CB_ALERT, + SSL_CB_CONNECT_EXIT, + SSL_CB_CONNECT_LOOP, + SSL_CB_EXIT, + SSL_CB_HANDSHAKE_DONE, + SSL_CB_HANDSHAKE_START, + SSL_CB_LOOP, + SSL_CB_READ, + SSL_CB_READ_ALERT, + SSL_CB_WRITE, + SSL_CB_WRITE_ALERT, + SSL_ST_ACCEPT, + SSL_ST_CONNECT, + SSL_ST_MASK, + SSLeay_version, + SSLv23_METHOD, + Session, SysCallError, + TLS1_1_VERSION, + TLS1_2_VERSION, + TLS1_3_VERSION, + TLS_METHOD, + TLSv1_1_METHOD, + TLSv1_2_METHOD, + TLSv1_METHOD, + VERIFY_CLIENT_ONCE, + VERIFY_FAIL_IF_NO_PEER_CERT, + VERIFY_NONE, + VERIFY_PEER, WantReadError, WantWriteError, ZeroReturnError, + _make_requires, ) -from OpenSSL.SSL import Context, Session, Connection, SSLeay_version -from OpenSSL.SSL import _make_requires - from OpenSSL._util import ffi as _ffi, lib as _lib - -from OpenSSL.SSL import ( - OP_NO_QUERY_MTU, - OP_COOKIE_EXCHANGE, - OP_NO_TICKET, - OP_NO_COMPRESSION, - MODE_RELEASE_BUFFERS, - NO_OVERLAPPING_PROTOCOLS, -) - -from OpenSSL.SSL import ( - SSL_ST_CONNECT, - SSL_ST_ACCEPT, - SSL_ST_MASK, - SSL_CB_LOOP, - SSL_CB_EXIT, - SSL_CB_READ, - SSL_CB_WRITE, - SSL_CB_ALERT, - SSL_CB_READ_ALERT, - SSL_CB_WRITE_ALERT, - SSL_CB_ACCEPT_LOOP, - SSL_CB_ACCEPT_EXIT, - SSL_CB_CONNECT_LOOP, - SSL_CB_CONNECT_EXIT, - SSL_CB_HANDSHAKE_START, - SSL_CB_HANDSHAKE_DONE, +from OpenSSL.crypto import ( + FILETYPE_PEM, + PKey, + TYPE_RSA, + X509, + X509Extension, + X509Store, + dump_certificate, + dump_privatekey, + get_elliptic_curves, + load_certificate, + load_privatekey, ) try: @@ -142,15 +143,15 @@ try: except ImportError: OP_NO_TLSv1_3 = None -from .util import WARNING_TYPE_EXPECTED, NON_ASCII, is_consistent_type from .test_crypto import ( client_cert_pem, client_key_pem, - server_cert_pem, - server_key_pem, root_cert_pem, root_key_pem, + server_cert_pem, + server_key_pem, ) +from .util import NON_ASCII, WARNING_TYPE_EXPECTED, is_consistent_type # openssl dhparam 2048 -out dh-2048.pem @@ -166,9 +167,6 @@ i5s5yYK7a/0eWxxRr2qraYaUj8RwDpH9CwIBAg== """ -skip_if_py3 = pytest.mark.skipif(not PY2, reason="Python 2 only") - - def socket_any_family(): try: return socket(AF_INET) @@ -197,7 +195,7 @@ def join_bytes_or_unicode(prefix, suffix): return join(prefix, suffix) # Otherwise, coerce suffix to the type of prefix. - if isinstance(prefix, text_type): + if isinstance(prefix, str): return join(prefix, suffix.decode(getfilesystemencoding())) else: return join(prefix, suffix.encode(getfilesystemencoding())) @@ -221,6 +219,8 @@ def socket_pair(): client.setblocking(True) server = port.accept()[0] + port.close() + # Let's pass some unencrypted data to make sure our socket connection is # fine. Just one byte, so we don't have to worry about buffers getting # filled up or fragmentation. @@ -366,7 +366,7 @@ def interact_in_memory(client_conn, server_conn): # Give the side a chance to generate some more bytes, or succeed. try: - data = read.recv(2 ** 16) + data = read.recv(2**16) except WantReadError: # It didn't succeed, so we'll hope it generated some output. pass @@ -407,7 +407,7 @@ def handshake_in_memory(client_conn, server_conn): interact_in_memory(client_conn, server_conn) -class TestVersion(object): +class TestVersion: """ Tests for version information exposed by `OpenSSL.SSL.SSLeay_version` and `OpenSSL.SSL.OPENSSL_VERSION_NUMBER`. @@ -444,17 +444,15 @@ def ca_file(tmpdir): """ Create a valid PEM file with CA certificates and return the path. """ - key = rsa.generate_private_key( - public_exponent=65537, key_size=2048, backend=default_backend() - ) + key = rsa.generate_private_key(public_exponent=65537, key_size=2048) public_key = key.public_key() builder = x509.CertificateBuilder() builder = builder.subject_name( - x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, u"pyopenssl.org")]) + x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "pyopenssl.org")]) ) builder = builder.issuer_name( - x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, u"pyopenssl.org")]) + x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "pyopenssl.org")]) ) one_day = datetime.timedelta(1, 0, 0) builder = builder.not_valid_before(datetime.datetime.today() - one_day) @@ -466,9 +464,7 @@ def ca_file(tmpdir): critical=True, ) - certificate = builder.sign( - private_key=key, algorithm=hashes.SHA256(), backend=default_backend() - ) + certificate = builder.sign(private_key=key, algorithm=hashes.SHA256()) ca_file = tmpdir.join("test.pem") ca_file.write_binary( @@ -488,14 +484,14 @@ def context(): return Context(SSLv23_METHOD) -class TestContext(object): +class TestContext: """ Unit tests for `OpenSSL.SSL.Context`. """ @pytest.mark.parametrize( "cipher_string", - [b"hello world:AES128-SHA", u"hello world:AES128-SHA"], + [b"hello world:AES128-SHA", "hello world:AES128-SHA"], ) def test_set_cipher_list(self, context, cipher_string): """ @@ -525,15 +521,20 @@ class TestContext(object): """ with pytest.raises(Error) as excinfo: context.set_cipher_list(b"imaginary-cipher") - assert excinfo.value.args == ( - [ - ( - "SSL routines", - "SSL_CTX_set_cipher_list", - "no cipher match", - ) - ], - ) + assert excinfo.value.args[0][0] in [ + # 1.1.x + ( + "SSL routines", + "SSL_CTX_set_cipher_list", + "no cipher match", + ), + # 3.0.x + ( + "SSL routines", + "", + "no cipher match", + ), + ] def test_load_client_ca(self, context, ca_file): """ @@ -572,20 +573,27 @@ class TestContext(object): with pytest.raises(Error) as e: context.set_session_id(b"abc" * 1000) - assert [ + assert e.value.args[0][0] in [ + # 1.1.x ( "SSL routines", "SSL_CTX_set_session_id_context", "ssl session id context too long", - ) - ] == e.value.args[0] + ), + # 3.0.x + ( + "SSL routines", + "", + "ssl session id context too long", + ), + ] def test_set_session_id_unicode(self, context): """ `Context.set_session_id` raises a warning if a unicode string is passed. """ - pytest.deprecated_call(context.set_session_id, u"abc") + pytest.deprecated_call(context.set_session_id, "abc") def test_method(self): """ @@ -597,7 +605,7 @@ class TestContext(object): for meth in methods: Context(meth) - maybe = [SSLv2_METHOD, SSLv3_METHOD, TLSv1_1_METHOD, TLSv1_2_METHOD] + maybe = [TLSv1_1_METHOD, TLSv1_2_METHOD] for meth in maybe: try: Context(meth) @@ -609,7 +617,7 @@ class TestContext(object): with pytest.raises(TypeError): Context("") with pytest.raises(ValueError): - Context(10) + Context(13) def test_type(self): """ @@ -617,17 +625,6 @@ class TestContext(object): """ assert is_consistent_type(Context, "Context", TLSv1_METHOD) - def test_use_privatekey(self): - """ - `Context.use_privatekey` takes an `OpenSSL.crypto.PKey` instance. - """ - key = PKey() - key.generate_key(TYPE_RSA, 1024) - ctx = Context(SSLv23_METHOD) - ctx.use_privatekey(key) - with pytest.raises(TypeError): - ctx.use_privatekey("") - def test_use_privatekey_file_missing(self, tmpfile): """ `Context.use_privatekey_file` raises `OpenSSL.SSL.Error` when passed @@ -681,37 +678,6 @@ class TestContext(object): FILETYPE_PEM, ) - def test_use_certificate_wrong_args(self): - """ - `Context.use_certificate_wrong_args` raises `TypeError` when not passed - exactly one `OpenSSL.crypto.X509` instance as an argument. - """ - ctx = Context(SSLv23_METHOD) - with pytest.raises(TypeError): - ctx.use_certificate("hello, world") - - def test_use_certificate_uninitialized(self): - """ - `Context.use_certificate` raises `OpenSSL.SSL.Error` when passed a - `OpenSSL.crypto.X509` instance which has not been initialized - (ie, which does not actually have any certificate data). - """ - ctx = Context(SSLv23_METHOD) - with pytest.raises(Error): - ctx.use_certificate(X509()) - - def test_use_certificate(self): - """ - `Context.use_certificate` sets the certificate which will be - used to identify connections created using the context. - """ - # TODO - # Hard to assert anything. But we could set a privatekey then ask - # OpenSSL if the cert and key agree using check_privatekey. Then as - # long as check_privatekey works right we're good... - ctx = Context(SSLv23_METHOD) - ctx.use_certificate(load_certificate(FILETYPE_PEM, root_cert_pem)) - def test_use_certificate_file_wrong_args(self): """ `Context.use_certificate_file` raises `TypeError` if the first @@ -1211,8 +1177,8 @@ class TestContext(object): def test_fallback_default_verify_paths(self, monkeypatch): """ Test that we load certificates successfully on linux from the fallback - path. To do this we set the _CRYPTOGRAPHY_MANYLINUX1_CA_FILE and - _CRYPTOGRAPHY_MANYLINUX1_CA_DIR vars to be equal to whatever the + path. To do this we set the _CRYPTOGRAPHY_MANYLINUX_CA_FILE and + _CRYPTOGRAPHY_MANYLINUX_CA_DIR vars to be equal to whatever the current OpenSSL default is and we disable SSL_CTX_SET_default_verify_paths so that it can't find certs unless it loads via fallback. @@ -1223,12 +1189,12 @@ class TestContext(object): ) monkeypatch.setattr( SSL, - "_CRYPTOGRAPHY_MANYLINUX1_CA_FILE", + "_CRYPTOGRAPHY_MANYLINUX_CA_FILE", _ffi.string(_lib.X509_get_default_cert_file()), ) monkeypatch.setattr( SSL, - "_CRYPTOGRAPHY_MANYLINUX1_CA_DIR", + "_CRYPTOGRAPHY_MANYLINUX_CA_DIR", _ffi.string(_lib.X509_get_default_cert_dir()), ) context.set_default_verify_paths() @@ -1332,20 +1298,20 @@ class TestContext(object): """ serverSocket, clientSocket = socket_pair() - server = Connection(serverContext, serverSocket) - server.set_accept_state() + with serverSocket, clientSocket: + server = Connection(serverContext, serverSocket) + server.set_accept_state() - client = Connection(clientContext, clientSocket) - client.set_connect_state() + client = Connection(clientContext, clientSocket) + client.set_connect_state() - # Make them talk to each other. - # interact_in_memory(client, server) - for _ in range(3): - for s in [client, server]: - try: - s.do_handshake() - except WantReadError: - pass + # Make them talk to each other. + for _ in range(3): + for s in [client, server]: + try: + s.do_handshake() + except WantReadError: + select.select([client, server], [], []) def test_set_verify_callback_connection_argument(self): """ @@ -1361,7 +1327,7 @@ class TestContext(object): ) serverConnection = Connection(serverContext, None) - class VerifyCallback(object): + class VerifyCallback: def callback(self, connection, *args): self.connection = connection return 1 @@ -1705,7 +1671,7 @@ class TestContext(object): """ context = Context(SSLv23_METHOD) for curve in get_elliptic_curves(): - if curve.name.startswith(u"Oakley-"): + if curve.name.startswith("Oakley-"): # Setting Oakley-EC2N-4 and Oakley-EC2N-3 adds # ('bignum routines', 'BN_mod_inverse', 'no inverse') to the # error queue on OpenSSL 1.0.2. @@ -1751,7 +1717,7 @@ class TestContext(object): """ context = Context(SSLv23_METHOD) with pytest.raises(TypeError): - context.set_tlsext_use_srtp(text_type("SRTP_AES128_CM_SHA1_80")) + context.set_tlsext_use_srtp(str("SRTP_AES128_CM_SHA1_80")) def test_set_tlsext_use_srtp_invalid_profile(self): """ @@ -1773,7 +1739,7 @@ class TestContext(object): assert context.set_tlsext_use_srtp(b"SRTP_AES128_CM_SHA1_80") is None -class TestServerNameCallback(object): +class TestServerNameCallback: """ Tests for `Context.set_tlsext_servername_callback` and its interaction with `Connection`. @@ -1882,7 +1848,7 @@ class TestServerNameCallback(object): assert args == [(server, b"foo1.example.com")] -class TestApplicationLayerProtoNegotiation(object): +class TestApplicationLayerProtoNegotiation: """ Tests for ALPN in PyOpenSSL. """ @@ -1927,14 +1893,13 @@ class TestApplicationLayerProtoNegotiation(object): assert server.get_alpn_proto_negotiated() == b"spdy/2" assert client.get_alpn_proto_negotiated() == b"spdy/2" - @pytest.mark.xfail(reason='https://github.com/pyca/pyopenssl/issues/1043') def test_alpn_call_failure(self): """ SSL_CTX_set_alpn_protos does not like to be called with an empty protocols list. Ensure that we produce a user-visible error. """ context = Context(SSLv23_METHOD) - with pytest.raises(Error): + with pytest.raises(ValueError): context.set_alpn_protos([]) def test_alpn_set_on_connection(self): @@ -2066,7 +2031,7 @@ class TestApplicationLayerProtoNegotiation(object): def invalid_cb(conn, options): invalid_cb_args.append((conn, options)) - return u"can't return unicode" + return "can't return unicode" client_context = Context(SSLv23_METHOD) client_context.set_alpn_protos([b"http/1.1", b"spdy/2"]) @@ -2163,7 +2128,7 @@ class TestApplicationLayerProtoNegotiation(object): assert select_args == [(server, [b"http/1.1", b"spdy/2"])] -class TestSession(object): +class TestSession: """ Unit tests for :py:obj:`OpenSSL.SSL.Session`. """ @@ -2177,7 +2142,77 @@ class TestSession(object): assert isinstance(new_session, Session) -class TestConnection(object): [email protected](params=["context", "connection"]) +def ctx_or_conn(request) -> Union[Context, Connection]: + ctx = Context(SSLv23_METHOD) + if request.param == "context": + return ctx + else: + return Connection(ctx, None) + + +class TestContextConnection: + """ + Unit test for methods that are exposed both by Connection and Context + objects. + """ + + def test_use_privatekey(self, ctx_or_conn): + """ + `use_privatekey` takes an `OpenSSL.crypto.PKey` instance. + """ + key = PKey() + key.generate_key(TYPE_RSA, 1024) + + ctx_or_conn.use_privatekey(key) + with pytest.raises(TypeError): + ctx_or_conn.use_privatekey("") + + def test_use_privatekey_wrong_key(self, ctx_or_conn): + """ + `use_privatekey` raises `OpenSSL.SSL.Error` when passed a + `OpenSSL.crypto.PKey` instance which has not been initialized. + """ + key = PKey() + key.generate_key(TYPE_RSA, 1024) + ctx_or_conn.use_certificate( + load_certificate(FILETYPE_PEM, root_cert_pem) + ) + with pytest.raises(Error): + ctx_or_conn.use_privatekey(key) + + def test_use_certificate(self, ctx_or_conn): + """ + `use_certificate` sets the certificate which will be + used to identify connections created using the context. + """ + # TODO + # Hard to assert anything. But we could set a privatekey then ask + # OpenSSL if the cert and key agree using check_privatekey. Then as + # long as check_privatekey works right we're good... + ctx_or_conn.use_certificate( + load_certificate(FILETYPE_PEM, root_cert_pem) + ) + + def test_use_certificate_wrong_args(self, ctx_or_conn): + """ + `use_certificate_wrong_args` raises `TypeError` when not passed + exactly one `OpenSSL.crypto.X509` instance as an argument. + """ + with pytest.raises(TypeError): + ctx_or_conn.use_certificate("hello, world") + + def test_use_certificate_uninitialized(self, ctx_or_conn): + """ + `use_certificate` raises `OpenSSL.SSL.Error` when passed a + `OpenSSL.crypto.X509` instance which has not been initialized + (ie, which does not actually have any certificate data). + """ + with pytest.raises(Error): + ctx_or_conn.use_certificate(X509()) + + +class TestConnection: """ Unit tests for `OpenSSL.SSL.Connection`. """ @@ -2233,7 +2268,7 @@ class TestConnection(object): connection.bio_write(b"xy") connection.bio_write(bytearray(b"za")) with pytest.warns(DeprecationWarning): - connection.bio_write(u"deprecated") + connection.bio_write("deprecated") def test_get_context(self): """ @@ -2286,10 +2321,8 @@ class TestConnection(object): with pytest.raises(TypeError): conn.set_tlsext_host_name(b"with\0null") - if not PY2: - # On Python 3.x, don't accidentally implicitly convert from text. - with pytest.raises(TypeError): - conn.set_tlsext_host_name(b"example.com".decode("ascii")) + with pytest.raises(TypeError): + conn.set_tlsext_host_name(b"example.com".decode("ascii")) def test_pending(self): """ @@ -2629,6 +2662,52 @@ class TestConnection(object): server = Connection(ctx, None) assert None is server.get_verified_chain() + def test_set_verify_overrides_context(self): + context = Context(SSLv23_METHOD) + context.set_verify(VERIFY_PEER) + conn = Connection(context, None) + conn.set_verify(VERIFY_NONE) + + assert context.get_verify_mode() == VERIFY_PEER + assert conn.get_verify_mode() == VERIFY_NONE + + with pytest.raises(TypeError): + conn.set_verify(None) + + with pytest.raises(TypeError): + conn.set_verify(VERIFY_PEER, "not a callable") + + def test_set_verify_callback_reference(self): + """ + The callback for certificate verification should only be forgotten if + the context and all connections created by it do not use it anymore. + """ + + def callback(conn, cert, errnum, depth, ok): # pragma: no cover + return ok + + tracker = ref(callback) + + context = Context(SSLv23_METHOD) + context.set_verify(VERIFY_PEER, callback) + del callback + + conn = Connection(context, None) + context.set_verify(VERIFY_NONE) + + collect() + collect() + assert tracker() + + conn.set_verify(VERIFY_PEER, lambda conn, cert, errnum, depth, ok: ok) + collect() + collect() + callback = tracker() + if callback is not None: # pragma: nocover + referrers = get_referrers(callback) + if len(referrers) > 1: + pytest.fail("Some references remain: %r" % (referrers,)) + def test_get_session_unconnected(self): """ `Connection.get_session` returns `None` when used with an object @@ -2859,8 +2938,8 @@ class TestConnection(object): client.get_cipher_name(), ) - assert isinstance(server_cipher_name, text_type) - assert isinstance(client_cipher_name, text_type) + assert isinstance(server_cipher_name, str) + assert isinstance(client_cipher_name, str) assert server_cipher_name == client_cipher_name @@ -2884,8 +2963,8 @@ class TestConnection(object): client.get_cipher_version(), ) - assert isinstance(server_cipher_version, text_type) - assert isinstance(client_cipher_version, text_type) + assert isinstance(server_cipher_version, str) + assert isinstance(client_cipher_version, str) assert server_cipher_version == client_cipher_version @@ -2923,8 +3002,8 @@ class TestConnection(object): client_protocol_version_name = client.get_protocol_version_name() server_protocol_version_name = server.get_protocol_version_name() - assert isinstance(server_protocol_version_name, text_type) - assert isinstance(client_protocol_version_name, text_type) + assert isinstance(server_protocol_version_name, str) + assert isinstance(client_protocol_version_name, str) assert server_protocol_version_name == client_protocol_version_name @@ -2979,7 +3058,7 @@ class TestConnection(object): assert 2 == len(data) -class TestConnectionGetCipherList(object): +class TestConnectionGetCipherList: """ Tests for `Connection.get_cipher_list`. """ @@ -3002,10 +3081,10 @@ class VeryLarge(bytes): """ def __len__(self): - return 2 ** 31 + return 2**31 -class TestConnectionSend(object): +class TestConnectionSend: """ Tests for `Connection.send`. """ @@ -3067,20 +3146,8 @@ class TestConnectionSend(object): assert count == 2 assert client.recv(2) == b"xy" - @skip_if_py3 - def test_short_buffer(self): - """ - When passed a buffer containing a small number of bytes, - `Connection.send` transmits all of them and returns the number - of bytes sent. - """ - server, client = loopback() - count = server.send(buffer(b"xy")) # noqa: F821 - assert count == 2 - assert client.recv(2) == b"xy" - @pytest.mark.skipif( - sys.maxsize < 2 ** 31, + sys.maxsize < 2**31, reason="sys.maxsize < 2**31 - test requires 64 bit", ) def test_buf_too_large(self): @@ -3103,7 +3170,7 @@ def _make_memoryview(size): return memoryview(bytearray(size)) -class TestConnectionRecvInto(object): +class TestConnectionRecvInto: """ Tests for `Connection.recv_into`. """ @@ -3226,7 +3293,7 @@ class TestConnectionRecvInto(object): self._doesnt_overfill_test(_make_memoryview) -class TestConnectionSendall(object): +class TestConnectionSendall: """ Tests for `Connection.sendall`. """ @@ -3274,17 +3341,6 @@ class TestConnectionSendall(object): server.sendall(memoryview(b"x")) assert client.recv(1) == b"x" - @skip_if_py3 - def test_short_buffers(self): - """ - When passed a buffer containing a small number of bytes, - `Connection.sendall` transmits all of them. - """ - server, client = loopback() - count = server.sendall(buffer(b"xy")) # noqa: F821 - assert count == 2 - assert client.recv(2) == b"xy" - def test_long(self): """ `Connection.sendall` transmits all the bytes in the string passed to it @@ -3319,7 +3375,7 @@ class TestConnectionSendall(object): assert err.value.args[0] == EPIPE -class TestConnectionRenegotiate(object): +class TestConnectionRenegotiate: """ Tests for SSL renegotiation APIs. """ @@ -3363,7 +3419,7 @@ class TestConnectionRenegotiate(object): pass -class TestError(object): +class TestError: """ Unit tests for `OpenSSL.SSL.Error`. """ @@ -3376,7 +3432,7 @@ class TestError(object): assert Error.__name__ == "Error" -class TestConstants(object): +class TestConstants: """ Tests for the values of constants exposed in `OpenSSL.SSL`. @@ -3493,7 +3549,7 @@ class TestConstants(object): assert 0x300 == SESS_CACHE_NO_INTERNAL -class TestMemoryBIO(object): +class TestMemoryBIO: """ Tests for `OpenSSL.SSL.Connection` using a memory BIO. """ @@ -3666,7 +3722,7 @@ class TestMemoryBIO(object): interact_in_memory(client, server) - size = 2 ** 15 + size = 2**15 sent = client.send(b"x" * size) # Sanity check. We're trying to test what happens when the entire # input can't be sent. If the entire input was sent, this test is @@ -3917,7 +3973,7 @@ class TestMemoryBIO(object): self._check_client_ca_list(set_replaces_add_ca) -class TestInfoConstants(object): +class TestInfoConstants: """ Tests for assorted constants exposed for use in info callbacks. """ @@ -3960,7 +4016,7 @@ class TestInfoConstants(object): assert const is None or isinstance(const, int) -class TestRequires(object): +class TestRequires: """ Tests for the decorator factory used to conditionally raise NotImplementedError when older OpenSSLs are used. @@ -3999,7 +4055,7 @@ class TestRequires(object): assert "Error text" in str(e.value) -class TestOCSP(object): +class TestOCSP: """ Tests for PyOpenSSL's OCSP stapling support. """ @@ -4243,3 +4299,188 @@ class TestOCSP(object): with pytest.raises(TypeError): handshake_in_memory(client, server) + + +class TestDTLS: + # The way you would expect DTLSv1_listen to work is: + # + # - it reads packets in a loop + # - when it finds a valid ClientHello, it returns + # - now the handshake can proceed + # + # However, on older versions of OpenSSL, it did something "cleverer". The + # way it worked is: + # + # - it "peeks" into the BIO to see the next packet without consuming it + # - if *not* a valid ClientHello, then it reads the packet to consume it + # and loops around + # - if it *is* a valid ClientHello, it *leaves the packet in the BIO*, and + # returns + # - then the handshake finds the ClientHello in the BIO and reads it a + # second time. + # + # I'm not sure exactly when this switched over. The OpenSSL v1.1.1 in + # Ubuntu 18.04 has the old behavior. The OpenSSL v1.1.1 in Ubuntu 20.04 has + # the new behavior. There doesn't seem to be any mention of this change in + # the OpenSSL v1.1.1 changelog, but presumably it changed in some point + # release or another. Presumably in 2025 or so there will be only new + # OpenSSLs around we can delete this whole comment and the weird + # workaround. If anyone is still using this library by then, which seems + # both depressing and inevitable. + # + # Anyway, why do we care? The reason is that the old strategy has a + # problem: the "peek" operation is only defined on "DGRAM BIOs", which are + # a special type of object that is different from the more familiar "socket + # BIOs" and "memory BIOs". If you *don't* have a DGRAM BIO, and you try to + # peek into the BIO... then it silently degrades to a full-fledged "read" + # operation that consumes the packet. Which is a problem if your algorithm + # depends on leaving the packet in the BIO to be read again later. + # + # So on old OpenSSL, we have a problem: + # + # - we can't use a DGRAM BIO, because cryptography/pyopenssl don't wrap the + # relevant APIs, nor should they. + # + # - if we use a socket BIO, then the first time DTLSv1_listen sees an + # invalid packet (like for example... the challenge packet that *every + # DTLS handshake starts with before the real ClientHello!*), it tries to + # first "peek" it, and then "read" it. But since the first "peek" + # consumes the packet, the second "read" ends up hanging or consuming + # some unrelated packet, which is undesirable. So you can't even get to + # the handshake stage successfully. + # + # - if we use a memory BIO, then DTLSv1_listen works OK on invalid packets + # -- first the "peek" consumes them, and then it tries to "read" again to + # consume them, which fails immediately, and OpenSSL ignores the failure. + # So it works by accident. BUT, when we get a valid ClientHello, we have + # a problem: DTLSv1_listen tries to "peek" it and then leave it in the + # read BIO for do_handshake to consume. But instead "peek" consumes the + # packet, so it's not there where do_handshake is expecting it, and the + # handshake fails. + # + # Fortunately (if that's the word), we can work around the memory BIO + # problem. (Which is good, because in real life probably all our users will + # be using memory BIOs.) All we have to do is to save the valid ClientHello + # before calling DTLSv1_listen, and then after it returns we push *a second + # copy of it* of the packet memory BIO before calling do_handshake. This + # fakes out OpenSSL and makes it think the "peek" operation worked + # correctly, and we can go on with our lives. + # + # In fact, we push the second copy of the ClientHello unconditionally. On + # new versions of OpenSSL, this is unnecessary, but harmless, because the + # DTLS state machine treats it like a network hiccup that duplicated a + # packet, which DTLS is robust against. + def test_it_works_at_all(self): + # arbitrary number larger than any conceivable handshake volley + LARGE_BUFFER = 65536 + + s_ctx = Context(DTLS_METHOD) + + def generate_cookie(ssl): + return b"xyzzy" + + def verify_cookie(ssl, cookie): + return cookie == b"xyzzy" + + s_ctx.set_cookie_generate_callback(generate_cookie) + s_ctx.set_cookie_verify_callback(verify_cookie) + s_ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem)) + s_ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem)) + s_ctx.set_options(OP_NO_QUERY_MTU) + s = Connection(s_ctx) + s.set_accept_state() + + c_ctx = Context(DTLS_METHOD) + c_ctx.set_options(OP_NO_QUERY_MTU) + c = Connection(c_ctx) + c.set_connect_state() + + # These are mandatory, because openssl can't guess the MTU for a memory + # bio and will produce a mysterious error if you make it try. + c.set_ciphertext_mtu(1500) + s.set_ciphertext_mtu(1500) + + latest_client_hello = None + + def pump_membio(label, source, sink): + try: + chunk = source.bio_read(LARGE_BUFFER) + except WantReadError: + return False + # I'm not sure this check is needed, but I'm not sure it's *not* + # needed either: + if not chunk: # pragma: no cover + return False + # Gross hack: if this is a ClientHello, save it so we can find it + # later. See giant comment above. + try: + # if ContentType == handshake and HandshakeType == + # client_hello: + if chunk[0] == 22 and chunk[13] == 1: + nonlocal latest_client_hello + latest_client_hello = chunk + except IndexError: # pragma: no cover + pass + print(f"{label}: {chunk.hex()}") + sink.bio_write(chunk) + return True + + def pump(): + # Raises if there was no data to pump, to avoid infinite loops if + # we aren't making progress. + assert pump_membio("s -> c", s, c) or pump_membio("c -> s", c, s) + + c_handshaking = True + s_listening = True + s_handshaking = False + first = True + while c_handshaking or s_listening or s_handshaking: + if not first: + pump() + first = False + + if c_handshaking: + try: + c.do_handshake() + except WantReadError: + pass + else: + c_handshaking = False + + if s_listening: + try: + s.DTLSv1_listen() + except WantReadError: + pass + else: + s_listening = False + s_handshaking = True + # Write the duplicate ClientHello. See giant comment above. + s.bio_write(latest_client_hello) + + if s_handshaking: + try: + s.do_handshake() + except WantReadError: + pass + else: + s_handshaking = False + + s.write(b"hello") + pump() + assert c.read(100) == b"hello" + c.write(b"goodbye") + pump() + assert s.read(100) == b"goodbye" + + # Check that the MTU set/query functions are doing *something* + c.set_ciphertext_mtu(1000) + try: + assert 500 < c.get_cleartext_mtu() < 1000 + except NotImplementedError: # OpenSSL 1.1.0 and earlier + pass + c.set_ciphertext_mtu(500) + try: + assert 0 < c.get_cleartext_mtu() < 500 + except NotImplementedError: # OpenSSL 1.1.0 and earlier + pass diff --git a/contrib/python/pyOpenSSL/py3/tests/test_util.py b/contrib/python/pyOpenSSL/py3/tests/test_util.py index 6224448d433..e029d71a719 100644 --- a/contrib/python/pyOpenSSL/py3/tests/test_util.py +++ b/contrib/python/pyOpenSSL/py3/tests/test_util.py @@ -3,7 +3,7 @@ import pytest from OpenSSL._util import exception_from_error_queue, lib -class TestErrors(object): +class TestErrors: """ Tests for handling of certain OpenSSL error cases. """ diff --git a/contrib/python/pyOpenSSL/py3/tests/util.py b/contrib/python/pyOpenSSL/py3/tests/util.py index 75d2c8de80e..b4649e33992 100644 --- a/contrib/python/pyOpenSSL/py3/tests/util.py +++ b/contrib/python/pyOpenSSL/py3/tests/util.py @@ -6,8 +6,6 @@ Helpers for the OpenSSL test suite, largely copied from U{Twisted<http://twistedmatrix.com/>}. """ -from six import PY2 - # This is the UTF-8 encoding of the SNOWMAN unicode code point. NON_ASCII = b"\xe2\x98\x83".decode("utf-8") @@ -32,7 +30,7 @@ def is_consistent_type(theType, name, *constructionArgs): return True -class EqualityTestsMixin(object): +class EqualityTestsMixin: """ A mixin defining tests for the standard implementation of C{==} and C{!=}. """ @@ -128,7 +126,7 @@ class EqualityTestsMixin(object): operand if it is of an unrelated type. """ - class Delegate(object): + class Delegate: def __eq__(self, other): # Do something crazy and obvious. return [self] @@ -143,7 +141,7 @@ class EqualityTestsMixin(object): operand if it is of an unrelated type. """ - class Delegate(object): + class Delegate: def __ne__(self, other): # Do something crazy and obvious. return [self] @@ -154,7 +152,4 @@ class EqualityTestsMixin(object): # The type name expected in warnings about using the wrong string type. -if PY2: - WARNING_TYPE_EXPECTED = "unicode" -else: - WARNING_TYPE_EXPECTED = "str" +WARNING_TYPE_EXPECTED = "str" diff --git a/contrib/python/pyOpenSSL/py3/ya.make b/contrib/python/pyOpenSSL/py3/ya.make index ff8a0f627a6..eeab889714e 100644 --- a/contrib/python/pyOpenSSL/py3/ya.make +++ b/contrib/python/pyOpenSSL/py3/ya.make @@ -4,13 +4,12 @@ PY3_LIBRARY() SUBSCRIBER(g:python-contrib) -VERSION(21.0.0) +VERSION(23.0.0) LICENSE(Apache-2.0) PEERDIR( - contrib/python/cryptography - contrib/python/six + contrib/python/cryptography/py3 ) NO_LINT() |
