diff options
| author | YDBot <[email protected]> | 2026-07-01 16:23:57 +0000 |
|---|---|---|
| committer | YDBot <[email protected]> | 2026-07-01 16:23:57 +0000 |
| commit | e9b445a7f111ee5ba5c58672d767833df25ba262 (patch) | |
| tree | 1b8bdb84d3fd994ff3ac60e36c30f93fbfce1243 /contrib/python/pyOpenSSL/py3/OpenSSL/crypto.py | |
| parent | 1cfcce4de55cd075cfba845abf02c1554f141e01 (diff) | |
| parent | c98b5dbfe575ba628d7c3427854e7bd26030eb2c (diff) | |
Merge pull request #44637 from ydb-platform/merge-rightlib-260626-1120
Diffstat (limited to 'contrib/python/pyOpenSSL/py3/OpenSSL/crypto.py')
| -rw-r--r-- | contrib/python/pyOpenSSL/py3/OpenSSL/crypto.py | 777 |
1 files changed, 425 insertions, 352 deletions
diff --git a/contrib/python/pyOpenSSL/py3/OpenSSL/crypto.py b/contrib/python/pyOpenSSL/py3/OpenSSL/crypto.py index eda4af6f9d9..a9b673c53f5 100644 --- a/contrib/python/pyOpenSSL/py3/OpenSSL/crypto.py +++ b/contrib/python/pyOpenSSL/py3/OpenSSL/crypto.py @@ -1,31 +1,44 @@ import calendar import datetime - +import functools from base64 import b16encode from functools import partial -from operator import __eq__, __ne__, __lt__, __le__, __gt__, __ge__ - -from six import ( - integer_types as _integer_types, - text_type as _text_type, - PY2 as _PY2, +from os import PathLike +from typing import ( + Any, + Callable, + Iterable, + List, + NoReturn, + Optional, + Sequence, + Set, + Tuple, + Type, + Union, ) from cryptography import utils, x509 -from cryptography.hazmat.primitives.asymmetric import dsa, rsa +from cryptography.hazmat.primitives.asymmetric import ( + dsa, + ec, + ed25519, + ed448, + rsa, +) from OpenSSL._util import ( + UNSPECIFIED as _UNSPECIFIED, + byte_string as _byte_string, + exception_from_error_queue as _exception_from_error_queue, ffi as _ffi, lib as _lib, - exception_from_error_queue as _exception_from_error_queue, - byte_string as _byte_string, - native as _native, - path_string as _path_string, - UNSPECIFIED as _UNSPECIFIED, - text_to_bytes_and_warn as _text_to_bytes_and_warn, make_assert as _make_assert, + path_bytes as _path_bytes, + text_to_bytes_and_warn as _text_to_bytes_and_warn, ) + __all__ = [ "FILETYPE_PEM", "FILETYPE_ASN1", @@ -65,16 +78,24 @@ __all__ = [ "load_pkcs12", ] -FILETYPE_PEM = _lib.SSL_FILETYPE_PEM -FILETYPE_ASN1 = _lib.SSL_FILETYPE_ASN1 + +_Key = Union[ + dsa.DSAPrivateKey, dsa.DSAPublicKey, rsa.RSAPrivateKey, rsa.RSAPublicKey +] +StrOrBytesPath = Union[str, bytes, PathLike] +PassphraseCallableT = Union[bytes, Callable[..., bytes]] + + +FILETYPE_PEM: int = _lib.SSL_FILETYPE_PEM +FILETYPE_ASN1: int = _lib.SSL_FILETYPE_ASN1 # TODO This was an API mistake. OpenSSL has no such constant. -FILETYPE_TEXT = 2 ** 16 - 1 +FILETYPE_TEXT = 2**16 - 1 -TYPE_RSA = _lib.EVP_PKEY_RSA -TYPE_DSA = _lib.EVP_PKEY_DSA -TYPE_DH = _lib.EVP_PKEY_DH -TYPE_EC = _lib.EVP_PKEY_EC +TYPE_RSA: int = _lib.EVP_PKEY_RSA +TYPE_DSA: int = _lib.EVP_PKEY_DSA +TYPE_DH: int = _lib.EVP_PKEY_DH +TYPE_EC: int = _lib.EVP_PKEY_EC class Error(Exception): @@ -87,20 +108,7 @@ _raise_current_error = partial(_exception_from_error_queue, Error) _openssl_assert = _make_assert(Error) -def _get_backend(): - """ - Importing the backend from cryptography has the side effect of activating - the osrandom engine. This mutates the global state of OpenSSL in the - process and causes issues for various programs that use subinterpreters or - embed Python. By putting the import in this function we can avoid - triggering this side effect unless _get_backend is called. - """ - from cryptography.hazmat.backends.openssl.backend import backend - - return backend - - -def _untested_error(where): +def _untested_error(where: str) -> NoReturn: """ An OpenSSL API failed somehow. Additionally, the failure which was encountered isn't one that's exercised by the test suite so future behavior @@ -109,7 +117,7 @@ def _untested_error(where): raise RuntimeError("Unknown %s failure" % (where,)) -def _new_mem_buf(buffer=None): +def _new_mem_buf(buffer: Optional[bytes] = None) -> Any: """ Allocate a new OpenSSL memory BIO. @@ -126,7 +134,7 @@ def _new_mem_buf(buffer=None): bio = _lib.BIO_new_mem_buf(data, len(buffer)) # Keep the memory alive as long as the bio is alive! - def free(bio, ref=data): + def free(bio: Any, ref: Any = data) -> Any: return _lib.BIO_free(bio) _openssl_assert(bio != _ffi.NULL) @@ -135,7 +143,7 @@ def _new_mem_buf(buffer=None): return bio -def _bio_to_string(bio): +def _bio_to_string(bio: Any) -> bytes: """ Copy the contents of an OpenSSL BIO object into a Python byte string. """ @@ -144,7 +152,7 @@ def _bio_to_string(bio): return _ffi.buffer(result_buffer[0], buffer_length)[:] -def _set_asn1_time(boundary, when): +def _set_asn1_time(boundary: Any, when: bytes) -> None: """ The the time value of an ASN1 time object. @@ -160,13 +168,35 @@ def _set_asn1_time(boundary, when): """ if not isinstance(when, bytes): raise TypeError("when must be a byte string") + # ASN1_TIME_set_string validates the string without writing anything + # when the destination is NULL. + _openssl_assert(boundary != _ffi.NULL) set_result = _lib.ASN1_TIME_set_string(boundary, when) if set_result == 0: raise ValueError("Invalid string") -def _get_asn1_time(timestamp): +def _new_asn1_time(when: bytes) -> Any: + """ + Behaves like _set_asn1_time but returns a new ASN1_TIME object. + + @param when: A string representation of the desired time value. + + @raise TypeError: If C{when} is not a L{bytes} string. + @raise ValueError: If C{when} does not represent a time in the required + format. + @raise RuntimeError: If the time value cannot be set for some other + (unspecified) reason. + """ + ret = _lib.ASN1_TIME_new() + _openssl_assert(ret != _ffi.NULL) + ret = _ffi.gc(ret, _lib.ASN1_TIME_free) + _set_asn1_time(ret, when) + return ret + + +def _get_asn1_time(timestamp: Any) -> Optional[bytes]: """ Retrieve the time value of an ASN1 time object. @@ -182,7 +212,7 @@ def _get_asn1_time(timestamp): elif ( _lib.ASN1_STRING_type(string_timestamp) == _lib.V_ASN1_GENERALIZEDTIME ): - return _ffi.string(_lib.ASN1_STRING_data(string_timestamp)) + return _ffi.string(_lib.ASN1_STRING_get0_data(string_timestamp)) else: generalized_timestamp = _ffi.new("ASN1_GENERALIZEDTIME**") _lib.ASN1_TIME_to_generalizedtime(timestamp, generalized_timestamp) @@ -201,26 +231,26 @@ def _get_asn1_time(timestamp): string_timestamp = _ffi.cast( "ASN1_STRING*", generalized_timestamp[0] ) - string_data = _lib.ASN1_STRING_data(string_timestamp) + string_data = _lib.ASN1_STRING_get0_data(string_timestamp) string_result = _ffi.string(string_data) _lib.ASN1_GENERALIZEDTIME_free(generalized_timestamp[0]) return string_result -class _X509NameInvalidator(object): - def __init__(self): - self._names = [] +class _X509NameInvalidator: + def __init__(self) -> None: + self._names: List[X509Name] = [] - def add(self, name): + def add(self, name: "X509Name") -> None: self._names.append(name) - def clear(self): + def clear(self) -> None: for name in self._names: # Breaks the object, but also prevents UAF! del name._name -class PKey(object): +class PKey: """ A class representing an DSA or RSA public key or key pair. """ @@ -228,12 +258,12 @@ class PKey(object): _only_public = False _initialized = True - def __init__(self): + def __init__(self) -> None: pkey = _lib.EVP_PKEY_new() self._pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free) self._initialized = False - def to_cryptography_key(self): + def to_cryptography_key(self) -> _Key: """ Export as a ``cryptography`` key. @@ -249,16 +279,15 @@ class PKey(object): load_der_public_key, ) - backend = _get_backend() if self._only_public: der = dump_publickey(FILETYPE_ASN1, self) - return load_der_public_key(der, backend) + return load_der_public_key(der) else: der = dump_privatekey(FILETYPE_ASN1, self) - return load_der_private_key(der, None, backend) + return load_der_private_key(der, None) @classmethod - def from_cryptography_key(cls, crypto_key): + def from_cryptography_key(cls, crypto_key: _Key) -> "PKey": """ Construct based on a ``cryptography`` *crypto_key*. @@ -276,6 +305,9 @@ class PKey(object): rsa.RSAPrivateKey, dsa.DSAPublicKey, dsa.DSAPrivateKey, + ec.EllipticCurvePrivateKey, + ed25519.Ed25519PrivateKey, + ed448.Ed448PrivateKey, ), ): raise TypeError("Unsupported key type") @@ -300,7 +332,7 @@ class PKey(object): ) return load_privatekey(FILETYPE_ASN1, der) - def generate_key(self, type, bits): + def generate_key(self, type: int, bits: int) -> None: """ Generate a key pair of the given type, with the given number of bits. @@ -356,7 +388,7 @@ class PKey(object): self._initialized = True - def check(self): + def check(self) -> bool: """ Check the consistency of an RSA private key. @@ -373,7 +405,7 @@ class PKey(object): raise TypeError("public key only") if _lib.EVP_PKEY_type(self.type()) != _lib.EVP_PKEY_RSA: - raise TypeError("key type unsupported") + raise TypeError("Only RSA keys can currently be checked.") rsa = _lib.EVP_PKEY_get1_RSA(self._pkey) rsa = _ffi.gc(rsa, _lib.RSA_free) @@ -382,7 +414,7 @@ class PKey(object): return True _raise_current_error() - def type(self): + def type(self) -> int: """ Returns the type of the key @@ -390,7 +422,7 @@ class PKey(object): """ return _lib.EVP_PKEY_id(self._pkey) - def bits(self): + def bits(self) -> int: """ Returns the number of bits of the key @@ -399,7 +431,7 @@ class PKey(object): return _lib.EVP_PKEY_bits(self._pkey) -class _EllipticCurve(object): +class _EllipticCurve: """ A representation of a supported elliptic curve. @@ -411,21 +443,19 @@ class _EllipticCurve(object): _curves = None - if not _PY2: - # This only necessary on Python 3. Moreover, it is broken on Python 2. - def __ne__(self, other): - """ - Implement cooperation with the right-hand side argument of ``!=``. + def __ne__(self, other: Any) -> bool: + """ + Implement cooperation with the right-hand side argument of ``!=``. - Python 3 seems to have dropped this cooperation in this very narrow - circumstance. - """ - if isinstance(other, _EllipticCurve): - return super(_EllipticCurve, self).__ne__(other) - return NotImplemented + Python 3 seems to have dropped this cooperation in this very narrow + circumstance. + """ + if isinstance(other, _EllipticCurve): + return super(_EllipticCurve, self).__ne__(other) + return NotImplemented @classmethod - def _load_elliptic_curves(cls, lib): + def _load_elliptic_curves(cls, lib: Any) -> Set["_EllipticCurve"]: """ Get the curves supported by OpenSSL. @@ -443,7 +473,7 @@ class _EllipticCurve(object): return set(cls.from_nid(lib, c.nid) for c in builtin_curves) @classmethod - def _get_elliptic_curves(cls, lib): + def _get_elliptic_curves(cls, lib: Any) -> Set["_EllipticCurve"]: """ Get, cache, and return the curves supported by OpenSSL. @@ -457,7 +487,7 @@ class _EllipticCurve(object): return cls._curves @classmethod - def from_nid(cls, lib, nid): + def from_nid(cls, lib: Any, nid: int) -> "_EllipticCurve": """ Instantiate a new :py:class:`_EllipticCurve` associated with the given OpenSSL NID. @@ -473,7 +503,7 @@ class _EllipticCurve(object): """ return cls(lib, nid, _ffi.string(lib.OBJ_nid2sn(nid)).decode("ascii")) - def __init__(self, lib, nid, name): + def __init__(self, lib: Any, nid: int, name: str) -> None: """ :param _lib: The :py:mod:`cryptography` binding instance used to interface with OpenSSL. @@ -490,10 +520,10 @@ class _EllipticCurve(object): self._nid = nid self.name = name - def __repr__(self): + def __repr__(self) -> str: return "<Curve %r>" % (self.name,) - def _to_EC_KEY(self): + def _to_EC_KEY(self) -> Any: """ Create a new OpenSSL EC_KEY structure initialized to use this curve. @@ -504,7 +534,7 @@ class _EllipticCurve(object): return _ffi.gc(key, _lib.EC_KEY_free) -def get_elliptic_curves(): +def get_elliptic_curves() -> Set["_EllipticCurve"]: """ Return a set of objects representing the elliptic curves supported in the OpenSSL build in use. @@ -519,7 +549,7 @@ def get_elliptic_curves(): return _EllipticCurve._get_elliptic_curves(_lib) -def get_elliptic_curve(name): +def get_elliptic_curve(name: str) -> _EllipticCurve: """ Return a single curve object selected by name. @@ -537,7 +567,8 @@ def get_elliptic_curve(name): raise ValueError("unknown curve name", name) -class X509Name(object): [email protected]_ordering +class X509Name: """ An X.509 Distinguished Name. @@ -562,7 +593,7 @@ class X509Name(object): :ivar emailAddress: The e-mail address of the entity. """ - def __init__(self, name): + def __init__(self, name: "X509Name") -> None: """ Create a new X509Name, copying the given X509Name instance. @@ -570,9 +601,9 @@ class X509Name(object): :type name: :py:class:`X509Name` """ name = _lib.X509_NAME_dup(name._name) - self._name = _ffi.gc(name, _lib.X509_NAME_free) + self._name: Any = _ffi.gc(name, _lib.X509_NAME_free) - def __setattr__(self, name, value): + def __setattr__(self, name: str, value: Any) -> None: if name.startswith("_"): return super(X509Name, self).__setattr__(name, value) @@ -602,7 +633,7 @@ class X509Name(object): _lib.X509_NAME_ENTRY_free(ent) break - if isinstance(value, _text_type): + if isinstance(value, str): value = value.encode("utf-8") add_result = _lib.X509_NAME_add_entry_by_NID( @@ -611,7 +642,7 @@ class X509Name(object): if not add_result: _raise_current_error() - def __getattr__(self, name): + def __getattr__(self, name: str) -> Optional[str]: """ Find attribute. An X509Name object has the following attributes: countryName (alias C), stateOrProvince (alias ST), locality (alias L), @@ -629,7 +660,7 @@ class X509Name(object): _raise_current_error() except Error: pass - return super(X509Name, self).__getattr__(name) + raise AttributeError("No such attribute") entry_index = _lib.X509_NAME_get_index_by_NID(self._name, nid, -1) if entry_index == -1: @@ -651,25 +682,19 @@ class X509Name(object): _lib.OPENSSL_free(result_buffer[0]) return result - def _cmp(op): - def f(self, other): - if not isinstance(other, X509Name): - return NotImplemented - result = _lib.X509_NAME_cmp(self._name, other._name) - return op(result, 0) - - return f + def __eq__(self, other: Any) -> bool: + if not isinstance(other, X509Name): + return NotImplemented - __eq__ = _cmp(__eq__) - __ne__ = _cmp(__ne__) + return _lib.X509_NAME_cmp(self._name, other._name) == 0 - __lt__ = _cmp(__lt__) - __le__ = _cmp(__le__) + def __lt__(self, other: Any) -> bool: + if not isinstance(other, X509Name): + return NotImplemented - __gt__ = _cmp(__gt__) - __ge__ = _cmp(__ge__) + return _lib.X509_NAME_cmp(self._name, other._name) < 0 - def __repr__(self): + def __repr__(self) -> str: """ String representation of an X509Name """ @@ -680,10 +705,10 @@ class X509Name(object): _openssl_assert(format_result != _ffi.NULL) return "<X509Name object '%s'>" % ( - _native(_ffi.string(result_buffer)), + _ffi.string(result_buffer).decode("utf-8"), ) - def hash(self): + def hash(self) -> int: """ Return an integer representation of the first four bytes of the MD5 digest of the DER representation of the name. @@ -695,7 +720,7 @@ class X509Name(object): """ return _lib.X509_NAME_hash(self._name) - def der(self): + def der(self) -> bytes: """ Return the DER encoding of this name. @@ -710,7 +735,7 @@ class X509Name(object): _lib.OPENSSL_free(result_buffer[0]) return string_result - def get_components(self): + def get_components(self) -> List[Tuple[bytes, bytes]]: """ Returns the components of this name, as a sequence of 2-tuples. @@ -730,19 +755,26 @@ class X509Name(object): # ffi.string does not handle strings containing NULL bytes # (which may have been generated by old, broken software) value = _ffi.buffer( - _lib.ASN1_STRING_data(fval), _lib.ASN1_STRING_length(fval) + _lib.ASN1_STRING_get0_data(fval), _lib.ASN1_STRING_length(fval) )[:] result.append((_ffi.string(name), value)) return result -class X509Extension(object): +class X509Extension: """ An X.509 v3 certificate extension. """ - def __init__(self, type_name, critical, value, subject=None, issuer=None): + def __init__( + self, + type_name: bytes, + critical: bool, + value: bytes, + subject: Optional["X509"] = None, + issuer: Optional["X509"] = None, + ) -> None: """ Initializes an X509 extension. @@ -752,7 +784,8 @@ class X509Extension(object): :param bool critical: A flag indicating whether this is a critical extension. - :param value: The value of the extension. + :param value: The OpenSSL textual representation of the extension's + value. :type value: :py:data:`bytes` :param subject: Optional X509 certificate to use as subject. @@ -804,7 +837,7 @@ class X509Extension(object): self._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free) @property - def _nid(self): + def _nid(self) -> Any: return _lib.OBJ_obj2nid( _lib.X509_EXTENSION_get_object(self._extension) ) @@ -815,7 +848,7 @@ class X509Extension(object): _lib.GEN_URI: "URI", } - def _subjectAltNameString(self): + def _subjectAltNameString(self) -> str: names = _ffi.cast( "GENERAL_NAMES*", _lib.X509V3_EXT_d2i(self._extension) ) @@ -829,15 +862,15 @@ class X509Extension(object): except KeyError: bio = _new_mem_buf() _lib.GENERAL_NAME_print(bio, name) - parts.append(_native(_bio_to_string(bio))) + parts.append(_bio_to_string(bio).decode("utf-8")) else: - value = _native( - _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[:] - ) + value = _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[ + : + ].decode("utf-8") parts.append(label + ":" + value) return ", ".join(parts) - def __str__(self): + def __str__(self) -> str: """ :return: a nice text representation of the extension """ @@ -848,9 +881,9 @@ class X509Extension(object): print_result = _lib.X509V3_EXT_print(bio, self._extension, 0, 0) _openssl_assert(print_result != 0) - return _native(_bio_to_string(bio)) + return _bio_to_string(bio).decode("utf-8") - def get_critical(self): + def get_critical(self) -> bool: """ Returns the critical field of this X.509 extension. @@ -858,7 +891,7 @@ class X509Extension(object): """ return _lib.X509_EXTENSION_get_critical(self._extension) - def get_short_name(self): + def get_short_name(self) -> bytes: """ Returns the short type name of this X.509 extension. @@ -873,7 +906,7 @@ class X509Extension(object): nid = _lib.OBJ_obj2nid(obj) return _ffi.string(_lib.OBJ_nid2sn(nid)) - def get_data(self): + def get_data(self) -> bytes: """ Returns the data of the X509 extension, encoded as ASN.1. @@ -884,23 +917,23 @@ class X509Extension(object): """ octet_result = _lib.X509_EXTENSION_get_data(self._extension) string_result = _ffi.cast("ASN1_STRING*", octet_result) - char_result = _lib.ASN1_STRING_data(string_result) + char_result = _lib.ASN1_STRING_get0_data(string_result) result_length = _lib.ASN1_STRING_length(string_result) return _ffi.buffer(char_result, result_length)[:] -class X509Req(object): +class X509Req: """ An X.509 certificate signing requests. """ - def __init__(self): + def __init__(self) -> None: req = _lib.X509_REQ_new() self._req = _ffi.gc(req, _lib.X509_REQ_free) # Default to version 0. self.set_version(0) - def to_cryptography(self): + def to_cryptography(self) -> x509.CertificateSigningRequest: """ Export as a ``cryptography`` certificate signing request. @@ -912,11 +945,12 @@ class X509Req(object): der = dump_certificate_request(FILETYPE_ASN1, self) - backend = _get_backend() - return load_der_x509_csr(der, backend) + return load_der_x509_csr(der) @classmethod - def from_cryptography(cls, crypto_req): + def from_cryptography( + cls, crypto_req: x509.CertificateSigningRequest + ) -> "X509Req": """ Construct based on a ``cryptography`` *crypto_req*. @@ -935,7 +969,7 @@ class X509Req(object): der = crypto_req.public_bytes(Encoding.DER) return load_certificate_request(FILETYPE_ASN1, der) - def set_pubkey(self, pkey): + def set_pubkey(self, pkey: PKey) -> None: """ Set the public key of the certificate signing request. @@ -947,7 +981,7 @@ class X509Req(object): set_result = _lib.X509_REQ_set_pubkey(self._req, pkey._pkey) _openssl_assert(set_result == 1) - def get_pubkey(self): + def get_pubkey(self) -> PKey: """ Get the public key of the certificate signing request. @@ -961,9 +995,9 @@ class X509Req(object): pkey._only_public = True return pkey - def set_version(self, version): + def set_version(self, version: int) -> None: """ - Set the version subfield (RFC 2459, section 4.1.2.1) of the certificate + Set the version subfield (RFC 2986, section 4.1) of the certificate request. :param int version: The version number. @@ -972,7 +1006,7 @@ class X509Req(object): set_result = _lib.X509_REQ_set_version(self._req, version) _openssl_assert(set_result == 1) - def get_version(self): + def get_version(self) -> int: """ Get the version subfield (RFC 2459, section 4.1.2.1) of the certificate request. @@ -982,7 +1016,7 @@ class X509Req(object): """ return _lib.X509_REQ_get_version(self._req) - def get_subject(self): + def get_subject(self) -> X509Name: """ Return the subject of this certificate signing request. @@ -1004,7 +1038,7 @@ class X509Req(object): return name - def add_extensions(self, extensions): + def add_extensions(self, extensions: Iterable[X509Extension]) -> None: """ Add extensions to the certificate signing request. @@ -1027,7 +1061,7 @@ class X509Req(object): add_result = _lib.X509_REQ_add_extensions(self._req, stack) _openssl_assert(add_result == 1) - def get_extensions(self): + def get_extensions(self) -> List[X509Extension]: """ Get X.509 extensions in the certificate signing request. @@ -1055,15 +1089,15 @@ class X509Req(object): exts.append(ext) return exts - def sign(self, pkey, digest): + def sign(self, pkey: PKey, digest: str) -> None: """ Sign the certificate signing request with this key and digest type. :param pkey: The key pair to sign with. :type pkey: :py:class:`PKey` :param digest: The name of the message digest to use for the signature, - e.g. :py:data:`b"sha256"`. - :type digest: :py:class:`bytes` + e.g. :py:data:`"sha256"`. + :type digest: :py:class:`str` :return: ``None`` """ if pkey._only_public: @@ -1079,7 +1113,7 @@ class X509Req(object): sign_result = _lib.X509_REQ_sign(self._req, pkey._pkey, digest_obj) _openssl_assert(sign_result > 0) - def verify(self, pkey): + def verify(self, pkey: PKey) -> bool: """ Verifies the signature on this certificate signing request. @@ -1101,12 +1135,12 @@ class X509Req(object): return result -class X509(object): +class X509: """ An X.509 certificate. """ - def __init__(self): + def __init__(self) -> None: x509 = _lib.X509_new() _openssl_assert(x509 != _ffi.NULL) self._x509 = _ffi.gc(x509, _lib.X509_free) @@ -1115,14 +1149,14 @@ class X509(object): self._subject_invalidator = _X509NameInvalidator() @classmethod - def _from_raw_x509_ptr(cls, x509): + def _from_raw_x509_ptr(cls, x509: Any) -> "X509": cert = cls.__new__(cls) cert._x509 = _ffi.gc(x509, _lib.X509_free) cert._issuer_invalidator = _X509NameInvalidator() cert._subject_invalidator = _X509NameInvalidator() return cert - def to_cryptography(self): + def to_cryptography(self) -> x509.Certificate: """ Export as a ``cryptography`` certificate. @@ -1133,11 +1167,10 @@ class X509(object): from cryptography.x509 import load_der_x509_certificate der = dump_certificate(FILETYPE_ASN1, self) - backend = _get_backend() - return load_der_x509_certificate(der, backend) + return load_der_x509_certificate(der) @classmethod - def from_cryptography(cls, crypto_cert): + def from_cryptography(cls, crypto_cert: x509.Certificate) -> "X509": """ Construct based on a ``cryptography`` *crypto_cert*. @@ -1156,7 +1189,7 @@ class X509(object): der = crypto_cert.public_bytes(Encoding.DER) return load_certificate(FILETYPE_ASN1, der) - def set_version(self, version): + def set_version(self, version: int) -> None: """ Set the version number of the certificate. Note that the version value is zero-based, eg. a value of 0 is V1. @@ -1169,9 +1202,9 @@ class X509(object): if not isinstance(version, int): raise TypeError("version must be an integer") - _lib.X509_set_version(self._x509, version) + _openssl_assert(_lib.X509_set_version(self._x509, version) == 1) - def get_version(self): + def get_version(self) -> int: """ Return the version number of the certificate. @@ -1180,7 +1213,7 @@ class X509(object): """ return _lib.X509_get_version(self._x509) - def get_pubkey(self): + def get_pubkey(self) -> PKey: """ Get the public key of the certificate. @@ -1195,7 +1228,7 @@ class X509(object): pkey._only_public = True return pkey - def set_pubkey(self, pkey): + def set_pubkey(self, pkey: PKey) -> None: """ Set the public key of the certificate. @@ -1210,7 +1243,7 @@ class X509(object): set_result = _lib.X509_set_pubkey(self._x509, pkey._pkey) _openssl_assert(set_result == 1) - def sign(self, pkey, digest): + def sign(self, pkey: PKey, digest: str) -> None: """ Sign the certificate with this key and digest type. @@ -1218,7 +1251,7 @@ class X509(object): :type pkey: :py:class:`PKey` :param digest: The name of the message digest to use. - :type digest: :py:class:`bytes` + :type digest: :py:class:`str` :return: :py:data:`None` """ @@ -1238,7 +1271,7 @@ class X509(object): sign_result = _lib.X509_sign(self._x509, pkey._pkey, evp_md) _openssl_assert(sign_result > 0) - def get_signature_algorithm(self): + def get_signature_algorithm(self) -> bytes: """ Return the signature algorithm used in the certificate. @@ -1255,12 +1288,12 @@ class X509(object): raise ValueError("Undefined signature algorithm") return _ffi.string(_lib.OBJ_nid2ln(nid)) - def digest(self, digest_name): + def digest(self, digest_name: str) -> bytes: """ Return the digest of the X509 object. :param digest_name: The name of the digest algorithm to use. - :type digest_name: :py:class:`bytes` + :type digest_name: :py:class:`str` :return: The digest of the object, formatted as :py:const:`b":"`-delimited hex pairs. @@ -1286,7 +1319,7 @@ class X509(object): ] ) - def subject_name_hash(self): + def subject_name_hash(self) -> bytes: """ Return the hash of the X509 subject. @@ -1295,7 +1328,7 @@ class X509(object): """ return _lib.X509_subject_name_hash(self._x509) - def set_serial_number(self, serial): + def set_serial_number(self, serial: int) -> None: """ Set the serial number of the certificate. @@ -1304,19 +1337,18 @@ class X509(object): :return: :py:data`None` """ - if not isinstance(serial, _integer_types): + if not isinstance(serial, int): raise TypeError("serial must be an integer") hex_serial = hex(serial)[2:] - if not isinstance(hex_serial, bytes): - hex_serial = hex_serial.encode("ascii") + hex_serial_bytes = hex_serial.encode("ascii") bignum_serial = _ffi.new("BIGNUM**") # BN_hex2bn stores the result in &bignum. Unless it doesn't feel like # it. If bignum is still NULL after this call, then the return value # is actually the result. I hope. -exarkun - small_serial = _lib.BN_hex2bn(bignum_serial, hex_serial) + small_serial = _lib.BN_hex2bn(bignum_serial, hex_serial_bytes) if bignum_serial[0] == _ffi.NULL: set_result = _lib.ASN1_INTEGER_set( @@ -1335,7 +1367,7 @@ class X509(object): set_result = _lib.X509_set_serialNumber(self._x509, asn1_serial) _openssl_assert(set_result == 1) - def get_serial_number(self): + def get_serial_number(self) -> int: """ Return the serial number of this certificate. @@ -1355,7 +1387,7 @@ class X509(object): finally: _lib.BN_free(bignum_serial) - def gmtime_adj_notAfter(self, amount): + def gmtime_adj_notAfter(self, amount: int) -> None: """ Adjust the time stamp on which the certificate stops being valid. @@ -1369,7 +1401,7 @@ class X509(object): notAfter = _lib.X509_getm_notAfter(self._x509) _lib.X509_gmtime_adj(notAfter, amount) - def gmtime_adj_notBefore(self, amount): + def gmtime_adj_notBefore(self, amount: int) -> None: """ Adjust the timestamp on which the certificate starts being valid. @@ -1382,22 +1414,25 @@ class X509(object): notBefore = _lib.X509_getm_notBefore(self._x509) _lib.X509_gmtime_adj(notBefore, amount) - def has_expired(self): + def has_expired(self) -> bool: """ Check whether the certificate has expired. :return: ``True`` if the certificate has expired, ``False`` otherwise. :rtype: bool """ - time_string = _native(self.get_notAfter()) + time_bytes = self.get_notAfter() + if time_bytes is None: + raise ValueError("Unable to determine notAfter") + time_string = time_bytes.decode("utf-8") not_after = datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ") return not_after < datetime.datetime.utcnow() - def _get_boundary_time(self, which): + def _get_boundary_time(self, which: Any) -> Optional[bytes]: return _get_asn1_time(which(self._x509)) - def get_notBefore(self): + def get_notBefore(self) -> Optional[bytes]: """ Get the timestamp at which the certificate starts being valid. @@ -1410,10 +1445,12 @@ class X509(object): """ return self._get_boundary_time(_lib.X509_getm_notBefore) - def _set_boundary_time(self, which, when): + def _set_boundary_time( + self, which: Callable[..., Any], when: bytes + ) -> None: return _set_asn1_time(which(self._x509), when) - def set_notBefore(self, when): + def set_notBefore(self, when: bytes) -> None: """ Set the timestamp at which the certificate starts being valid. @@ -1426,7 +1463,7 @@ class X509(object): """ return self._set_boundary_time(_lib.X509_getm_notBefore, when) - def get_notAfter(self): + def get_notAfter(self) -> Optional[bytes]: """ Get the timestamp at which the certificate stops being valid. @@ -1439,7 +1476,7 @@ class X509(object): """ return self._get_boundary_time(_lib.X509_getm_notAfter) - def set_notAfter(self, when): + def set_notAfter(self, when: bytes) -> None: """ Set the timestamp at which the certificate stops being valid. @@ -1452,7 +1489,7 @@ class X509(object): """ return self._set_boundary_time(_lib.X509_getm_notAfter, when) - def _get_name(self, which): + def _get_name(self, which: Any) -> X509Name: name = X509Name.__new__(X509Name) name._name = which(self._x509) _openssl_assert(name._name != _ffi.NULL) @@ -1463,13 +1500,13 @@ class X509(object): return name - def _set_name(self, which, name): + def _set_name(self, which: Any, name: X509Name) -> None: if not isinstance(name, X509Name): raise TypeError("name must be an X509Name") set_result = which(self._x509, name._name) _openssl_assert(set_result == 1) - def get_issuer(self): + def get_issuer(self) -> X509Name: """ Return the issuer of this certificate. @@ -1485,7 +1522,7 @@ class X509(object): self._issuer_invalidator.add(name) return name - def set_issuer(self, issuer): + def set_issuer(self, issuer: X509Name) -> None: """ Set the issuer of this certificate. @@ -1497,7 +1534,7 @@ class X509(object): self._set_name(_lib.X509_set_issuer_name, issuer) self._issuer_invalidator.clear() - def get_subject(self): + def get_subject(self) -> X509Name: """ Return the subject of this certificate. @@ -1513,7 +1550,7 @@ class X509(object): self._subject_invalidator.add(name) return name - def set_subject(self, subject): + def set_subject(self, subject: X509Name) -> None: """ Set the subject of this certificate. @@ -1525,7 +1562,7 @@ class X509(object): self._set_name(_lib.X509_set_subject_name, subject) self._subject_invalidator.clear() - def get_extension_count(self): + def get_extension_count(self) -> int: """ Get the number of extensions on this certificate. @@ -1536,7 +1573,7 @@ class X509(object): """ return _lib.X509_get_ext_count(self._x509) - def add_extensions(self, extensions): + def add_extensions(self, extensions: Iterable[X509Extension]) -> None: """ Add extensions to the certificate. @@ -1552,7 +1589,7 @@ class X509(object): if not add_result: _raise_current_error() - def get_extension(self, index): + def get_extension(self, index: int) -> X509Extension: """ Get a specific extension of the certificate by index. @@ -1576,7 +1613,7 @@ class X509(object): return ext -class X509StoreFlags(object): +class X509StoreFlags: """ Flags for X509 verification, used to change the behavior of :class:`X509Store`. @@ -1587,19 +1624,20 @@ class X509StoreFlags(object): https://www.openssl.org/docs/manmaster/man3/X509_VERIFY_PARAM_set_flags.html """ - CRL_CHECK = _lib.X509_V_FLAG_CRL_CHECK - CRL_CHECK_ALL = _lib.X509_V_FLAG_CRL_CHECK_ALL - IGNORE_CRITICAL = _lib.X509_V_FLAG_IGNORE_CRITICAL - X509_STRICT = _lib.X509_V_FLAG_X509_STRICT - ALLOW_PROXY_CERTS = _lib.X509_V_FLAG_ALLOW_PROXY_CERTS - POLICY_CHECK = _lib.X509_V_FLAG_POLICY_CHECK - EXPLICIT_POLICY = _lib.X509_V_FLAG_EXPLICIT_POLICY - INHIBIT_MAP = _lib.X509_V_FLAG_INHIBIT_MAP - NOTIFY_POLICY = _lib.X509_V_FLAG_NOTIFY_POLICY - CHECK_SS_SIGNATURE = _lib.X509_V_FLAG_CHECK_SS_SIGNATURE + CRL_CHECK: int = _lib.X509_V_FLAG_CRL_CHECK + CRL_CHECK_ALL: int = _lib.X509_V_FLAG_CRL_CHECK_ALL + IGNORE_CRITICAL: int = _lib.X509_V_FLAG_IGNORE_CRITICAL + X509_STRICT: int = _lib.X509_V_FLAG_X509_STRICT + ALLOW_PROXY_CERTS: int = _lib.X509_V_FLAG_ALLOW_PROXY_CERTS + POLICY_CHECK: int = _lib.X509_V_FLAG_POLICY_CHECK + EXPLICIT_POLICY: int = _lib.X509_V_FLAG_EXPLICIT_POLICY + INHIBIT_MAP: int = _lib.X509_V_FLAG_INHIBIT_MAP + NOTIFY_POLICY: int = _lib.X509_V_FLAG_NOTIFY_POLICY + CHECK_SS_SIGNATURE: int = _lib.X509_V_FLAG_CHECK_SS_SIGNATURE + PARTIAL_CHAIN: int = _lib.X509_V_FLAG_PARTIAL_CHAIN -class X509Store(object): +class X509Store: """ An X.509 store. @@ -1613,11 +1651,11 @@ class X509Store(object): :class:`X509StoreContext`. """ - def __init__(self): + def __init__(self) -> None: store = _lib.X509_STORE_new() self._store = _ffi.gc(store, _lib.X509_STORE_free) - def add_cert(self, cert): + def add_cert(self, cert: X509) -> None: """ Adds a trusted certificate to this store. @@ -1639,7 +1677,7 @@ class X509Store(object): res = _lib.X509_STORE_add_cert(self._store, cert._x509) _openssl_assert(res == 1) - def add_crl(self, crl): + def add_crl(self, crl: "CRL") -> None: """ Add a certificate revocation list to this store. @@ -1655,7 +1693,7 @@ class X509Store(object): """ _openssl_assert(_lib.X509_STORE_add_crl(self._store, crl._crl) != 0) - def set_flags(self, flags): + def set_flags(self, flags: int) -> None: """ Set verification flags to this store. @@ -1679,7 +1717,7 @@ class X509Store(object): """ _openssl_assert(_lib.X509_STORE_set_flags(self._store, flags) != 0) - def set_time(self, vfy_time): + def set_time(self, vfy_time: datetime.datetime) -> None: """ Set the time against which the certificates are verified. @@ -1703,7 +1741,9 @@ class X509Store(object): ) _openssl_assert(_lib.X509_STORE_set1_param(self._store, param) != 0) - def load_locations(self, cafile, capath=None): + def load_locations( + self, cafile: StrOrBytesPath, capath: Optional[StrOrBytesPath] = None + ) -> None: """ Let X509Store know where we can find trusted certificates for the certificate chain. Note that the certificates have to be in PEM @@ -1737,12 +1777,12 @@ class X509Store(object): if cafile is None: cafile = _ffi.NULL else: - cafile = _path_string(cafile) + cafile = _path_bytes(cafile) if capath is None: capath = _ffi.NULL else: - capath = _path_string(capath) + capath = _path_bytes(capath) load_result = _lib.X509_STORE_load_locations( self._store, cafile, capath @@ -1760,12 +1800,15 @@ class X509StoreContextError(Exception): :type certificate: :class:`X509` """ - def __init__(self, message, certificate): + def __init__( + self, message: str, errors: List[Any], certificate: X509 + ) -> None: super(X509StoreContextError, self).__init__(message) + self.errors = errors self.certificate = certificate -class X509StoreContext(object): +class X509StoreContext: """ An X.509 store context. @@ -1787,7 +1830,12 @@ class X509StoreContext(object): :type chain: :class:`list` of :class:`X509` """ - def __init__(self, store, certificate, chain=None): + def __init__( + self, + store: X509Store, + certificate: X509, + chain: Optional[Sequence[X509]] = None, + ) -> None: store_ctx = _lib.X509_STORE_CTX_new() self._store_ctx = _ffi.gc(store_ctx, _lib.X509_STORE_CTX_free) self._store = store @@ -1799,8 +1847,10 @@ class X509StoreContext(object): self._init() @staticmethod - def _build_certificate_stack(certificates): - def cleanup(s): + def _build_certificate_stack( + certificates: Optional[Sequence[X509]], + ) -> None: + def cleanup(s: Any) -> None: # Equivalent to sk_X509_pop_free, but we don't # currently have a CFFI binding for that available for i in range(_lib.sk_X509_num(s)): @@ -1826,7 +1876,7 @@ class X509StoreContext(object): return stack - def _init(self): + def _init(self) -> None: """ Set up the store context for a subsequent verification operation. @@ -1839,7 +1889,7 @@ class X509StoreContext(object): if ret <= 0: _raise_current_error() - def _cleanup(self): + def _cleanup(self) -> None: """ Internally cleans up the store context. @@ -1847,7 +1897,7 @@ class X509StoreContext(object): """ _lib.X509_STORE_CTX_cleanup(self._store_ctx) - def _exception_from_context(self): + def _exception_from_context(self) -> X509StoreContextError: """ Convert an OpenSSL native context error failure into a Python exception. @@ -1855,25 +1905,24 @@ class X509StoreContext(object): When a call to native OpenSSL X509_verify_cert fails, additional information about the failure can be obtained from the store context. """ + message = _ffi.string( + _lib.X509_verify_cert_error_string( + _lib.X509_STORE_CTX_get_error(self._store_ctx) + ) + ).decode("utf-8") errors = [ _lib.X509_STORE_CTX_get_error(self._store_ctx), _lib.X509_STORE_CTX_get_error_depth(self._store_ctx), - _native( - _ffi.string( - _lib.X509_verify_cert_error_string( - _lib.X509_STORE_CTX_get_error(self._store_ctx) - ) - ) - ), + message, ] # A context error should always be associated with a certificate, so we # expect this call to never return :class:`None`. _x509 = _lib.X509_STORE_CTX_get_current_cert(self._store_ctx) _cert = _lib.X509_dup(_x509) pycert = X509._from_raw_x509_ptr(_cert) - return X509StoreContextError(errors, pycert) + return X509StoreContextError(message, errors, pycert) - def set_store(self, store): + def set_store(self, store: X509Store) -> None: """ Set the context's X.509 store. @@ -1884,7 +1933,7 @@ class X509StoreContext(object): """ self._store = store - def verify_certificate(self): + def verify_certificate(self) -> None: """ Verify a certificate in a context. @@ -1906,7 +1955,7 @@ class X509StoreContext(object): if ret <= 0: raise self._exception_from_context() - def get_verified_chain(self): + def get_verified_chain(self) -> List[X509]: """ Verify a certificate in a context and return the complete validated chain. @@ -1946,7 +1995,7 @@ class X509StoreContext(object): return result -def load_certificate(type, buffer): +def load_certificate(type: int, buffer: bytes) -> X509: """ Load a certificate (X509) from the string *buffer* encoded with the type *type*. @@ -1957,7 +2006,7 @@ def load_certificate(type, buffer): :return: The X509 object """ - if isinstance(buffer, _text_type): + if isinstance(buffer, str): buffer = buffer.encode("ascii") bio = _new_mem_buf(buffer) @@ -1975,7 +2024,7 @@ def load_certificate(type, buffer): return X509._from_raw_x509_ptr(x509) -def dump_certificate(type, cert): +def dump_certificate(type: int, cert: X509) -> bytes: """ Dump the certificate *cert* into a buffer string encoded with the type *type*. @@ -2003,7 +2052,7 @@ def dump_certificate(type, cert): return _bio_to_string(bio) -def dump_publickey(type, pkey): +def dump_publickey(type: int, pkey: PKey) -> bytes: """ Dump a public key to a buffer. @@ -2028,7 +2077,12 @@ def dump_publickey(type, pkey): return _bio_to_string(bio) -def dump_privatekey(type, pkey, cipher=None, passphrase=None): +def dump_privatekey( + type: int, + pkey: PKey, + cipher: Optional[str] = None, + passphrase: Optional[PassphraseCallableT] = None, +) -> bytes: """ Dump the private key *pkey* into a buffer string encoded with the type *type*. Optionally (if *type* is :const:`FILETYPE_PEM`) encrypting it @@ -2092,7 +2146,7 @@ def dump_privatekey(type, pkey, cipher=None, passphrase=None): return _bio_to_string(bio) -class Revoked(object): +class Revoked: """ A certificate revocation. """ @@ -2112,11 +2166,11 @@ class Revoked(object): # b"removeFromCRL", ] - def __init__(self): + def __init__(self) -> None: revoked = _lib.X509_REVOKED_new() self._revoked = _ffi.gc(revoked, _lib.X509_REVOKED_free) - def set_serial(self, hex_str): + def set_serial(self, hex_str: bytes) -> None: """ Set the serial number. @@ -2140,7 +2194,7 @@ class Revoked(object): ) _lib.X509_REVOKED_set_serialNumber(self._revoked, asn1_serial) - def get_serial(self): + def get_serial(self) -> bytes: """ Get the serial number. @@ -2158,7 +2212,7 @@ class Revoked(object): _openssl_assert(result >= 0) return _bio_to_string(bio) - def _delete_reason(self): + def _delete_reason(self) -> None: for i in range(_lib.X509_REVOKED_get_ext_count(self._revoked)): ext = _lib.X509_REVOKED_get_ext(self._revoked, i) obj = _lib.X509_EXTENSION_get_object(ext) @@ -2167,7 +2221,7 @@ class Revoked(object): _lib.X509_REVOKED_delete_ext(self._revoked, i) break - def set_reason(self, reason): + def set_reason(self, reason: Optional[bytes]) -> None: """ Set the reason of this revocation. @@ -2204,7 +2258,7 @@ class Revoked(object): ) _openssl_assert(add_result == 1) - def get_reason(self): + def get_reason(self) -> Optional[bytes]: """ Get the reason of this revocation. @@ -2230,8 +2284,9 @@ class Revoked(object): _openssl_assert(print_result != 0) return _bio_to_string(bio) + return None - def all_reasons(self): + def all_reasons(self) -> List[bytes]: """ Return a list of all the supported reason strings. @@ -2243,7 +2298,7 @@ class Revoked(object): """ return self._crl_reasons[:] - def set_rev_date(self, when): + def set_rev_date(self, when: bytes) -> None: """ Set the revocation timestamp. @@ -2251,10 +2306,13 @@ class Revoked(object): as ASN.1 TIME. :return: ``None`` """ - dt = _lib.X509_REVOKED_get0_revocationDate(self._revoked) - return _set_asn1_time(dt, when) + revocationDate = _new_asn1_time(when) + ret = _lib.X509_REVOKED_set_revocationDate( + self._revoked, revocationDate + ) + _openssl_assert(ret == 1) - def get_rev_date(self): + def get_rev_date(self) -> Optional[bytes]: """ Get the revocation timestamp. @@ -2265,16 +2323,16 @@ class Revoked(object): return _get_asn1_time(dt) -class CRL(object): +class CRL: """ A certificate revocation list. """ - def __init__(self): + def __init__(self) -> None: crl = _lib.X509_CRL_new() self._crl = _ffi.gc(crl, _lib.X509_CRL_free) - def to_cryptography(self): + def to_cryptography(self) -> x509.CertificateRevocationList: """ Export as a ``cryptography`` CRL. @@ -2285,12 +2343,12 @@ class CRL(object): from cryptography.x509 import load_der_x509_crl der = dump_crl(FILETYPE_ASN1, self) - - backend = _get_backend() - return load_der_x509_crl(der, backend) + return load_der_x509_crl(der) @classmethod - def from_cryptography(cls, crypto_crl): + def from_cryptography( + cls, crypto_crl: x509.CertificateRevocationList + ) -> "CRL": """ Construct based on a ``cryptography`` *crypto_crl*. @@ -2309,7 +2367,7 @@ class CRL(object): der = crypto_crl.public_bytes(Encoding.DER) return load_crl(FILETYPE_ASN1, der) - def get_revoked(self): + def get_revoked(self) -> Optional[Tuple[Revoked, ...]]: """ Return the revocations in this certificate revocation list. @@ -2323,14 +2381,15 @@ class CRL(object): revoked_stack = _lib.X509_CRL_get_REVOKED(self._crl) for i in range(_lib.sk_X509_REVOKED_num(revoked_stack)): revoked = _lib.sk_X509_REVOKED_value(revoked_stack, i) - revoked_copy = _lib.Cryptography_X509_REVOKED_dup(revoked) + revoked_copy = _lib.X509_REVOKED_dup(revoked) pyrev = Revoked.__new__(Revoked) pyrev._revoked = _ffi.gc(revoked_copy, _lib.X509_REVOKED_free) results.append(pyrev) if results: return tuple(results) + return None - def add_revoked(self, revoked): + def add_revoked(self, revoked: Revoked) -> None: """ Add a revoked (by value not reference) to the CRL structure @@ -2341,13 +2400,13 @@ class CRL(object): :param Revoked revoked: The new revocation. :return: ``None`` """ - copy = _lib.Cryptography_X509_REVOKED_dup(revoked._revoked) + copy = _lib.X509_REVOKED_dup(revoked._revoked) _openssl_assert(copy != _ffi.NULL) add_result = _lib.X509_CRL_add0_revoked(self._crl, copy) _openssl_assert(add_result != 0) - def get_issuer(self): + def get_issuer(self) -> X509Name: """ Get the CRL's issuer. @@ -2362,7 +2421,7 @@ class CRL(object): issuer._name = _issuer return issuer - def set_version(self, version): + def set_version(self, version: int) -> None: """ Set the CRL version. @@ -2373,10 +2432,7 @@ class CRL(object): """ _openssl_assert(_lib.X509_CRL_set_version(self._crl, version) != 0) - def _set_boundary_time(self, which, when): - return _set_asn1_time(which(self._crl), when) - - def set_lastUpdate(self, when): + def set_lastUpdate(self, when: bytes) -> None: """ Set when the CRL was last updated. @@ -2389,9 +2445,11 @@ class CRL(object): :param bytes when: A timestamp string. :return: ``None`` """ - return self._set_boundary_time(_lib.X509_CRL_get_lastUpdate, when) + lastUpdate = _new_asn1_time(when) + ret = _lib.X509_CRL_set1_lastUpdate(self._crl, lastUpdate) + _openssl_assert(ret == 1) - def set_nextUpdate(self, when): + def set_nextUpdate(self, when: bytes) -> None: """ Set when the CRL will next be updated. @@ -2404,9 +2462,11 @@ class CRL(object): :param bytes when: A timestamp string. :return: ``None`` """ - return self._set_boundary_time(_lib.X509_CRL_get_nextUpdate, when) + nextUpdate = _new_asn1_time(when) + ret = _lib.X509_CRL_set1_nextUpdate(self._crl, nextUpdate) + _openssl_assert(ret == 1) - def sign(self, issuer_cert, issuer_key, digest): + def sign(self, issuer_cert: X509, issuer_key: PKey, digest: bytes) -> None: """ Sign the CRL. @@ -2433,8 +2493,13 @@ class CRL(object): _openssl_assert(result != 0) def export( - self, cert, key, type=FILETYPE_PEM, days=100, digest=_UNSPECIFIED - ): + self, + cert: X509, + key: PKey, + type: int = FILETYPE_PEM, + days: int = 100, + digest: bytes = _UNSPECIFIED, # type: ignore + ) -> bytes: """ Export the CRL as a string. @@ -2462,23 +2527,26 @@ class CRL(object): if digest_obj == _ffi.NULL: raise ValueError("No such digest method") - bio = _lib.BIO_new(_lib.BIO_s_mem()) - _openssl_assert(bio != _ffi.NULL) - # A scratch time object to give different values to different CRL # fields sometime = _lib.ASN1_TIME_new() _openssl_assert(sometime != _ffi.NULL) + sometime = _ffi.gc(sometime, _lib.ASN1_TIME_free) - _lib.X509_gmtime_adj(sometime, 0) - _lib.X509_CRL_set_lastUpdate(self._crl, sometime) + ret = _lib.X509_gmtime_adj(sometime, 0) + _openssl_assert(ret != _ffi.NULL) + ret = _lib.X509_CRL_set1_lastUpdate(self._crl, sometime) + _openssl_assert(ret == 1) - _lib.X509_gmtime_adj(sometime, days * 24 * 60 * 60) - _lib.X509_CRL_set_nextUpdate(self._crl, sometime) + ret = _lib.X509_gmtime_adj(sometime, days * 24 * 60 * 60) + _openssl_assert(ret != _ffi.NULL) + ret = _lib.X509_CRL_set1_nextUpdate(self._crl, sometime) + _openssl_assert(ret == 1) - _lib.X509_CRL_set_issuer_name( + ret = _lib.X509_CRL_set_issuer_name( self._crl, _lib.X509_get_subject_name(cert._x509) ) + _openssl_assert(ret == 1) sign_result = _lib.X509_CRL_sign(self._crl, key._pkey, digest_obj) if not sign_result: @@ -2487,8 +2555,11 @@ class CRL(object): return dump_crl(type, self) -class PKCS7(object): - def type_is_signed(self): +class PKCS7: + + _pkcs7: Any + + def type_is_signed(self) -> bool: """ Check if this NID_pkcs7_signed object @@ -2496,7 +2567,7 @@ class PKCS7(object): """ return bool(_lib.PKCS7_type_is_signed(self._pkcs7)) - def type_is_enveloped(self): + def type_is_enveloped(self) -> bool: """ Check if this NID_pkcs7_enveloped object @@ -2504,7 +2575,7 @@ class PKCS7(object): """ return bool(_lib.PKCS7_type_is_enveloped(self._pkcs7)) - def type_is_signedAndEnveloped(self): + def type_is_signedAndEnveloped(self) -> bool: """ Check if this NID_pkcs7_signedAndEnveloped object @@ -2512,7 +2583,7 @@ class PKCS7(object): """ return bool(_lib.PKCS7_type_is_signedAndEnveloped(self._pkcs7)) - def type_is_data(self): + def type_is_data(self) -> bool: """ Check if this NID_pkcs7_data object @@ -2520,7 +2591,7 @@ class PKCS7(object): """ return bool(_lib.PKCS7_type_is_data(self._pkcs7)) - def get_type_name(self): + def get_type_name(self) -> str: """ Returns the type name of the PKCS7 structure @@ -2531,18 +2602,18 @@ class PKCS7(object): return _ffi.string(string_type) -class PKCS12(object): +class PKCS12: """ A PKCS #12 archive. """ - def __init__(self): - self._pkey = None - self._cert = None - self._cacerts = None - self._friendlyname = None + def __init__(self) -> None: + self._pkey: Optional[PKey] = None + self._cert: Optional[X509] = None + self._cacerts: Optional[List[X509]] = None + self._friendlyname: Optional[bytes] = None - def get_certificate(self): + def get_certificate(self) -> Optional[X509]: """ Get the certificate in the PKCS #12 structure. @@ -2551,7 +2622,7 @@ class PKCS12(object): """ return self._cert - def set_certificate(self, cert): + def set_certificate(self, cert: X509) -> None: """ Set the certificate in the PKCS #12 structure. @@ -2564,7 +2635,7 @@ class PKCS12(object): raise TypeError("cert must be an X509 instance") self._cert = cert - def get_privatekey(self): + def get_privatekey(self) -> Optional[PKey]: """ Get the private key in the PKCS #12 structure. @@ -2573,7 +2644,7 @@ class PKCS12(object): """ return self._pkey - def set_privatekey(self, pkey): + def set_privatekey(self, pkey: PKey) -> None: """ Set the certificate portion of the PKCS #12 structure. @@ -2586,7 +2657,7 @@ class PKCS12(object): raise TypeError("pkey must be a PKey instance") self._pkey = pkey - def get_ca_certificates(self): + def get_ca_certificates(self) -> Optional[Tuple[X509, ...]]: """ Get the CA certificates in the PKCS #12 structure. @@ -2596,8 +2667,9 @@ class PKCS12(object): """ if self._cacerts is not None: return tuple(self._cacerts) + return None - def set_ca_certificates(self, cacerts): + def set_ca_certificates(self, cacerts: Optional[Iterable[X509]]) -> None: """ Replace or set the CA certificates within the PKCS12 object. @@ -2618,7 +2690,7 @@ class PKCS12(object): ) self._cacerts = cacerts - def set_friendlyname(self, name): + def set_friendlyname(self, name: Optional[bytes]) -> None: """ Set the friendly name in the PKCS #12 structure. @@ -2635,7 +2707,7 @@ class PKCS12(object): ) self._friendlyname = name - def get_friendlyname(self): + def get_friendlyname(self) -> Optional[bytes]: """ Get the friendly name in the PKCS# 12 structure. @@ -2644,7 +2716,12 @@ class PKCS12(object): """ return self._friendlyname - def export(self, passphrase=None, iter=2048, maciter=1): + def export( + self, + passphrase: Optional[bytes] = None, + iter: int = 2048, + maciter: int = 1, + ) -> bytes: """ Dump a PKCS12 object as a string. @@ -2712,16 +2789,16 @@ class PKCS12(object): return _bio_to_string(bio) -class NetscapeSPKI(object): +class NetscapeSPKI: """ A Netscape SPKI object. """ - def __init__(self): + def __init__(self) -> None: spki = _lib.NETSCAPE_SPKI_new() self._spki = _ffi.gc(spki, _lib.NETSCAPE_SPKI_free) - def sign(self, pkey, digest): + def sign(self, pkey: PKey, digest: str) -> None: """ Sign the certificate request with this key and digest type. @@ -2729,7 +2806,7 @@ class NetscapeSPKI(object): :type pkey: :py:class:`PKey` :param digest: The message digest to use. - :type digest: :py:class:`bytes` + :type digest: :py:class:`str` :return: ``None`` """ @@ -2748,7 +2825,7 @@ class NetscapeSPKI(object): ) _openssl_assert(sign_result > 0) - def verify(self, key): + def verify(self, key: PKey) -> bool: """ Verifies a signature on a certificate request. @@ -2765,7 +2842,7 @@ class NetscapeSPKI(object): _raise_current_error() return True - def b64_encode(self): + def b64_encode(self) -> bytes: """ Generate a base64 encoded representation of this SPKI object. @@ -2777,7 +2854,7 @@ class NetscapeSPKI(object): _lib.OPENSSL_free(encoded) return result - def get_pubkey(self): + def get_pubkey(self) -> PKey: """ Get the public key of this certificate. @@ -2791,7 +2868,7 @@ class NetscapeSPKI(object): pkey._only_public = True return pkey - def set_pubkey(self, pkey): + def set_pubkey(self, pkey: PKey) -> None: """ Set the public key of the certificate @@ -2802,8 +2879,14 @@ class NetscapeSPKI(object): _openssl_assert(set_result == 1) -class _PassphraseHelper(object): - def __init__(self, type, passphrase, more_args=False, truncate=False): +class _PassphraseHelper: + def __init__( + self, + type: int, + passphrase: Optional[PassphraseCallableT], + more_args: bool = False, + truncate: bool = False, + ) -> None: if type != FILETYPE_PEM and passphrase is not None: raise ValueError( "only FILETYPE_PEM key format supports encryption" @@ -2811,10 +2894,10 @@ class _PassphraseHelper(object): self._passphrase = passphrase self._more_args = more_args self._truncate = truncate - self._problems = [] + self._problems: List[Exception] = [] @property - def callback(self): + def callback(self) -> Any: if self._passphrase is None: return _ffi.NULL elif isinstance(self._passphrase, bytes) or callable(self._passphrase): @@ -2825,7 +2908,7 @@ class _PassphraseHelper(object): ) @property - def callback_args(self): + def callback_args(self) -> Any: if self._passphrase is None: return _ffi.NULL elif isinstance(self._passphrase, bytes) or callable(self._passphrase): @@ -2835,7 +2918,7 @@ class _PassphraseHelper(object): "Last argument must be a byte string or a callable." ) - def raise_if_problem(self, exceptionType=Error): + def raise_if_problem(self, exceptionType: Type[Exception] = Error) -> None: if self._problems: # Flush the OpenSSL error queue @@ -2846,7 +2929,9 @@ class _PassphraseHelper(object): raise self._problems.pop(0) - def _read_passphrase(self, buf, size, rwflag, userdata): + def _read_passphrase( + self, buf: Any, size: int, rwflag: Any, userdata: Any + ) -> int: try: if callable(self._passphrase): if self._more_args: @@ -2854,6 +2939,7 @@ class _PassphraseHelper(object): else: result = self._passphrase(rwflag) else: + assert self._passphrase is not None result = self._passphrase if not isinstance(result, bytes): raise ValueError("Bytes expected") @@ -2872,7 +2958,7 @@ class _PassphraseHelper(object): return 0 -def load_publickey(type, buffer): +def load_publickey(type: int, buffer: Union[str, bytes]) -> PKey: """ Load a public key from a buffer. @@ -2883,7 +2969,7 @@ def load_publickey(type, buffer): :return: The PKey object. :rtype: :class:`PKey` """ - if isinstance(buffer, _text_type): + if isinstance(buffer, str): buffer = buffer.encode("ascii") bio = _new_mem_buf(buffer) @@ -2906,7 +2992,11 @@ def load_publickey(type, buffer): return pkey -def load_privatekey(type, buffer, passphrase=None): +def load_privatekey( + type: int, + buffer: Union[str, bytes], + passphrase: Optional[PassphraseCallableT] = None, +) -> PKey: """ Load a private key (PKey) from the string *buffer* encoded with the type *type*. @@ -2919,7 +3009,7 @@ def load_privatekey(type, buffer, passphrase=None): :return: The PKey object """ - if isinstance(buffer, _text_type): + if isinstance(buffer, str): buffer = buffer.encode("ascii") bio = _new_mem_buf(buffer) @@ -2943,7 +3033,7 @@ def load_privatekey(type, buffer, passphrase=None): return pkey -def dump_certificate_request(type, req): +def dump_certificate_request(type: int, req: X509Req) -> bytes: """ Dump the certificate request *req* into a buffer string encoded with the type *type*. @@ -2971,7 +3061,7 @@ def dump_certificate_request(type, req): return _bio_to_string(bio) -def load_certificate_request(type, buffer): +def load_certificate_request(type: int, buffer: bytes) -> X509Req: """ Load a certificate request (X509Req) from the string *buffer* encoded with the type *type*. @@ -2980,7 +3070,7 @@ def load_certificate_request(type, buffer): :param buffer: The buffer the certificate request is stored in :return: The X509Req object """ - if isinstance(buffer, _text_type): + if isinstance(buffer, str): buffer = buffer.encode("ascii") bio = _new_mem_buf(buffer) @@ -2999,7 +3089,7 @@ def load_certificate_request(type, buffer): return x509req -def sign(pkey, data, digest): +def sign(pkey: PKey, data: Union[str, bytes], digest: str) -> bytes: """ Sign a data string using the given key and message digest. @@ -3016,8 +3106,8 @@ def sign(pkey, data, digest): if digest_obj == _ffi.NULL: raise ValueError("No such digest method") - md_ctx = _lib.Cryptography_EVP_MD_CTX_new() - md_ctx = _ffi.gc(md_ctx, _lib.Cryptography_EVP_MD_CTX_free) + md_ctx = _lib.EVP_MD_CTX_new() + md_ctx = _ffi.gc(md_ctx, _lib.EVP_MD_CTX_free) _lib.EVP_SignInit(md_ctx, digest_obj) _lib.EVP_SignUpdate(md_ctx, data, len(data)) @@ -3034,7 +3124,9 @@ def sign(pkey, data, digest): return _ffi.buffer(signature_buffer, signature_length[0])[:] -def verify(cert, signature, data, digest): +def verify( + cert: X509, signature: bytes, data: Union[str, bytes], digest: str +) -> None: """ Verify the signature for a data string. @@ -3057,8 +3149,8 @@ def verify(cert, signature, data, digest): _openssl_assert(pkey != _ffi.NULL) pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free) - md_ctx = _lib.Cryptography_EVP_MD_CTX_new() - md_ctx = _ffi.gc(md_ctx, _lib.Cryptography_EVP_MD_CTX_free) + md_ctx = _lib.EVP_MD_CTX_new() + md_ctx = _ffi.gc(md_ctx, _lib.EVP_MD_CTX_free) _lib.EVP_VerifyInit(md_ctx, digest_obj) _lib.EVP_VerifyUpdate(md_ctx, data, len(data)) @@ -3070,7 +3162,7 @@ def verify(cert, signature, data, digest): _raise_current_error() -def dump_crl(type, crl): +def dump_crl(type: int, crl: CRL) -> bytes: """ Dump a certificate revocation list to a buffer. @@ -3099,7 +3191,7 @@ def dump_crl(type, crl): return _bio_to_string(bio) -def load_crl(type, buffer): +def load_crl(type: int, buffer: Union[str, bytes]) -> CRL: """ Load Certificate Revocation List (CRL) data from a string *buffer*. *buffer* encoded with the type *type*. @@ -3107,9 +3199,9 @@ def load_crl(type, buffer): :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1) :param buffer: The buffer the CRL is stored in - :return: The PKey object + :return: The CRL object """ - if isinstance(buffer, _text_type): + if isinstance(buffer, str): buffer = buffer.encode("ascii") bio = _new_mem_buf(buffer) @@ -3129,7 +3221,7 @@ def load_crl(type, buffer): return result -def load_pkcs7_data(type, buffer): +def load_pkcs7_data(type: int, buffer: Union[str, bytes]) -> PKCS7: """ Load pkcs7 data from the string *buffer* encoded with the type *type*. @@ -3138,7 +3230,7 @@ def load_pkcs7_data(type, buffer): :param buffer: The buffer with the pkcs7 data. :return: The PKCS7 object """ - if isinstance(buffer, _text_type): + if isinstance(buffer, str): buffer = buffer.encode("ascii") bio = _new_mem_buf(buffer) @@ -3158,7 +3250,7 @@ def load_pkcs7_data(type, buffer): return pypkcs7 -load_pkcs7_data = utils.deprecated( +utils.deprecated( load_pkcs7_data, __name__, ( @@ -3166,10 +3258,13 @@ load_pkcs7_data = utils.deprecated( "in cryptography." ), DeprecationWarning, + name="load_pkcs7_data", ) -def load_pkcs12(buffer, passphrase=None): +def load_pkcs12( + buffer: Union[str, bytes], passphrase: Optional[bytes] = None +) -> PKCS12: """ Load pkcs12 data from the string *buffer*. If the pkcs12 structure is encrypted, a *passphrase* must be included. The MAC is always @@ -3183,7 +3278,7 @@ def load_pkcs12(buffer, passphrase=None): """ passphrase = _text_to_bytes_and_warn("passphrase", passphrase) - if isinstance(buffer, _text_type): + if isinstance(buffer, str): buffer = buffer.encode("ascii") bio = _new_mem_buf(buffer) @@ -3245,18 +3340,16 @@ def load_pkcs12(buffer, passphrase=None): x509 = _lib.sk_X509_value(cacerts, i) pycacert = X509._from_raw_x509_ptr(x509) pycacerts.append(pycacert) - if not pycacerts: - pycacerts = None pkcs12 = PKCS12.__new__(PKCS12) pkcs12._pkey = pykey pkcs12._cert = pycert - pkcs12._cacerts = pycacerts + pkcs12._cacerts = pycacerts if pycacerts else None pkcs12._friendlyname = friendlyname return pkcs12 -load_pkcs12 = utils.deprecated( +utils.deprecated( load_pkcs12, __name__, ( @@ -3264,25 +3357,5 @@ load_pkcs12 = utils.deprecated( "in cryptography." ), DeprecationWarning, + name="load_pkcs12", ) - - -# There are no direct unit tests for this initialization. It is tested -# indirectly since it is necessary for functions like dump_privatekey when -# using encryption. -# -# Thus OpenSSL.test.test_crypto.FunctionTests.test_dump_privatekey_passphrase -# and some other similar tests may fail without this (though they may not if -# the Python runtime has already done some initialization of the underlying -# OpenSSL library (and is linked against the same one that cryptography is -# using)). -_lib.OpenSSL_add_all_algorithms() - -# This is similar but exercised mainly by exception_from_error_queue. It calls -# both ERR_load_crypto_strings() and ERR_load_SSL_strings(). -_lib.SSL_load_error_strings() - - -# Set the default string mask to match OpenSSL upstream (since 2005) and -# RFC5280 recommendations. -_lib.ASN1_STRING_set_default_mask_asc(b"utf8only") |
