aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py2/twisted/internet/_sslverify.py
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2023-11-26 18:16:14 +0300
committershmel1k <shmel1k@ydb.tech>2023-11-26 18:43:30 +0300
commitb8cf9e88f4c5c64d9406af533d8948deb050d695 (patch)
tree218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py2/twisted/internet/_sslverify.py
parent523f645a83a0ec97a0332dbc3863bb354c92a328 (diff)
downloadydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py2/twisted/internet/_sslverify.py')
-rw-r--r--contrib/python/Twisted/py2/twisted/internet/_sslverify.py2058
1 files changed, 2058 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py2/twisted/internet/_sslverify.py b/contrib/python/Twisted/py2/twisted/internet/_sslverify.py
new file mode 100644
index 0000000000..c44ec14324
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/internet/_sslverify.py
@@ -0,0 +1,2058 @@
+# -*- test-case-name: twisted.test.test_sslverify -*-
+# Copyright (c) 2005 Divmod, Inc.
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+from __future__ import division, absolute_import
+
+import warnings
+
+from constantly import Names, NamedConstant
+from hashlib import md5
+
+from OpenSSL import SSL, crypto
+from OpenSSL._util import lib as pyOpenSSLlib
+
+from twisted.internet.abstract import isIPAddress, isIPv6Address
+from twisted.python import log
+from twisted.python.randbytes import secureRandom
+from twisted.python._oldstyle import _oldStyle
+from ._idna import _idnaBytes
+
+from zope.interface import Interface, implementer
+from constantly import Flags, FlagConstant
+from incremental import Version
+
+from twisted.internet.defer import Deferred
+from twisted.internet.error import VerifyError, CertificateError
+from twisted.internet.interfaces import (
+ IAcceptableCiphers, ICipher, IOpenSSLClientConnectionCreator,
+ IOpenSSLContextFactory
+)
+
+from twisted.python import util
+from twisted.python.deprecate import _mutuallyExclusiveArguments
+from twisted.python.compat import nativeString, unicode
+from twisted.python.failure import Failure
+from twisted.python.util import FancyEqMixin
+
+from twisted.python.deprecate import deprecated
+
+
+
+class TLSVersion(Names):
+ """
+ TLS versions that we can negotiate with the client/server.
+ """
+ SSLv3 = NamedConstant()
+ TLSv1_0 = NamedConstant()
+ TLSv1_1 = NamedConstant()
+ TLSv1_2 = NamedConstant()
+ TLSv1_3 = NamedConstant()
+
+
+
+_tlsDisableFlags = {
+ TLSVersion.SSLv3: SSL.OP_NO_SSLv3,
+ TLSVersion.TLSv1_0: SSL.OP_NO_TLSv1,
+ TLSVersion.TLSv1_1: SSL.OP_NO_TLSv1_1,
+ TLSVersion.TLSv1_2: SSL.OP_NO_TLSv1_2,
+
+ # If we don't have TLS v1.3 yet, we can't disable it -- this is just so
+ # when it makes it into OpenSSL, connections knowingly bracketed to v1.2
+ # don't end up going to v1.3
+ TLSVersion.TLSv1_3: getattr(SSL, "OP_NO_TLSv1_3", 0x00),
+}
+
+
+
+def _getExcludedTLSProtocols(oldest, newest):
+ """
+ Given a pair of L{TLSVersion} constants, figure out what versions we want
+ to disable (as OpenSSL is an exclusion based API).
+
+ @param oldest: The oldest L{TLSVersion} we want to allow.
+ @type oldest: L{TLSVersion} constant
+
+ @param newest: The newest L{TLSVersion} we want to allow, or L{None} for no
+ upper limit.
+ @type newest: L{TLSVersion} constant or L{None}
+
+ @return: The versions we want to disable.
+ @rtype: L{list} of L{TLSVersion} constants.
+ """
+ versions = list(TLSVersion.iterconstants())
+ excludedVersions = [x for x in versions[:versions.index(oldest)]]
+
+ if newest:
+ excludedVersions.extend([x for x in versions[versions.index(newest):]])
+
+ return excludedVersions
+
+
+
+
+class SimpleVerificationError(Exception):
+ """
+ Not a very useful verification error.
+ """
+
+
+
+def simpleVerifyHostname(connection, hostname):
+ """
+ Check only the common name in the certificate presented by the peer and
+ only for an exact match.
+
+ This is to provide I{something} in the way of hostname verification to
+ users who haven't installed C{service_identity}. This check is overly
+ strict, relies on a deprecated TLS feature (you're supposed to ignore the
+ commonName if the subjectAlternativeName extensions are present, I
+ believe), and lots of valid certificates will fail.
+
+ @param connection: the OpenSSL connection to verify.
+ @type connection: L{OpenSSL.SSL.Connection}
+
+ @param hostname: The hostname expected by the user.
+ @type hostname: L{unicode}
+
+ @raise twisted.internet.ssl.VerificationError: if the common name and
+ hostname don't match.
+ """
+ commonName = connection.get_peer_certificate().get_subject().commonName
+ if commonName != hostname:
+ raise SimpleVerificationError(repr(commonName) + "!=" +
+ repr(hostname))
+
+
+
+def simpleVerifyIPAddress(connection, hostname):
+ """
+ Always fails validation of IP addresses
+
+ @param connection: the OpenSSL connection to verify.
+ @type connection: L{OpenSSL.SSL.Connection}
+
+ @param hostname: The hostname expected by the user.
+ @type hostname: L{unicode}
+
+ @raise twisted.internet.ssl.VerificationError: Always raised
+ """
+ raise SimpleVerificationError("Cannot verify certificate IP addresses")
+
+
+
+def _usablePyOpenSSL(version):
+ """
+ Check pyOpenSSL version string whether we can use it for host verification.
+
+ @param version: A pyOpenSSL version string.
+ @type version: L{str}
+
+ @rtype: L{bool}
+ """
+ major, minor = (int(part) for part in version.split(".")[:2])
+ return (major, minor) >= (0, 12)
+
+
+
+def _selectVerifyImplementation():
+ """
+ Determine if C{service_identity} is installed. If so, use it. If not, use
+ simplistic and incorrect checking as implemented in
+ L{simpleVerifyHostname}.
+
+ @return: 2-tuple of (C{verify_hostname}, C{VerificationError})
+ @rtype: L{tuple}
+ """
+
+ whatsWrong = (
+ "Without the service_identity module, Twisted can perform only "
+ "rudimentary TLS client hostname verification. Many valid "
+ "certificate/hostname mappings may be rejected."
+ )
+
+ try:
+ from service_identity import VerificationError
+ from service_identity.pyopenssl import (
+ verify_hostname, verify_ip_address
+ )
+ return verify_hostname, verify_ip_address, VerificationError
+ except ImportError as e:
+ warnings.warn_explicit(
+ "You do not have a working installation of the "
+ "service_identity module: '" + str(e) + "'. "
+ "Please install it from "
+ "<https://pypi.python.org/pypi/service_identity> and make "
+ "sure all of its dependencies are satisfied. "
+ + whatsWrong,
+ # Unfortunately the lineno is required.
+ category=UserWarning, filename="", lineno=0)
+
+ return simpleVerifyHostname, simpleVerifyIPAddress, SimpleVerificationError
+
+
+
+verifyHostname, verifyIPAddress, VerificationError = \
+ _selectVerifyImplementation()
+
+
+
+class ProtocolNegotiationSupport(Flags):
+ """
+ L{ProtocolNegotiationSupport} defines flags which are used to indicate the
+ level of NPN/ALPN support provided by the TLS backend.
+
+ @cvar NOSUPPORT: There is no support for NPN or ALPN. This is exclusive
+ with both L{NPN} and L{ALPN}.
+ @cvar NPN: The implementation supports Next Protocol Negotiation.
+ @cvar ALPN: The implementation supports Application Layer Protocol
+ Negotiation.
+ """
+ NPN = FlagConstant(0x0001)
+ ALPN = FlagConstant(0x0002)
+
+# FIXME: https://twistedmatrix.com/trac/ticket/8074
+# Currently flags with literal zero values behave incorrectly. However,
+# creating a flag by NOTing a flag with itself appears to work totally fine, so
+# do that instead.
+ProtocolNegotiationSupport.NOSUPPORT = (
+ ProtocolNegotiationSupport.NPN ^ ProtocolNegotiationSupport.NPN
+)
+
+
+def protocolNegotiationMechanisms():
+ """
+ Checks whether your versions of PyOpenSSL and OpenSSL are recent enough to
+ support protocol negotiation, and if they are, what kind of protocol
+ negotiation is supported.
+
+ @return: A combination of flags from L{ProtocolNegotiationSupport} that
+ indicate which mechanisms for protocol negotiation are supported.
+ @rtype: L{constantly.FlagConstant}
+ """
+ support = ProtocolNegotiationSupport.NOSUPPORT
+ ctx = SSL.Context(SSL.SSLv23_METHOD)
+
+ try:
+ ctx.set_npn_advertise_callback(lambda c: None)
+ except (AttributeError, NotImplementedError):
+ pass
+ else:
+ support |= ProtocolNegotiationSupport.NPN
+
+ try:
+ ctx.set_alpn_select_callback(lambda c: None)
+ except (AttributeError, NotImplementedError):
+ pass
+ else:
+ support |= ProtocolNegotiationSupport.ALPN
+
+ return support
+
+
+
+_x509names = {
+ 'CN': 'commonName',
+ 'commonName': 'commonName',
+
+ 'O': 'organizationName',
+ 'organizationName': 'organizationName',
+
+ 'OU': 'organizationalUnitName',
+ 'organizationalUnitName': 'organizationalUnitName',
+
+ 'L': 'localityName',
+ 'localityName': 'localityName',
+
+ 'ST': 'stateOrProvinceName',
+ 'stateOrProvinceName': 'stateOrProvinceName',
+
+ 'C': 'countryName',
+ 'countryName': 'countryName',
+
+ 'emailAddress': 'emailAddress'}
+
+
+
+class DistinguishedName(dict):
+ """
+ Identify and describe an entity.
+
+ Distinguished names are used to provide a minimal amount of identifying
+ information about a certificate issuer or subject. They are commonly
+ created with one or more of the following fields::
+
+ commonName (CN)
+ organizationName (O)
+ organizationalUnitName (OU)
+ localityName (L)
+ stateOrProvinceName (ST)
+ countryName (C)
+ emailAddress
+
+ A L{DistinguishedName} should be constructed using keyword arguments whose
+ keys can be any of the field names above (as a native string), and the
+ values are either Unicode text which is encodable to ASCII, or L{bytes}
+ limited to the ASCII subset. Any fields passed to the constructor will be
+ set as attributes, accessible using both their extended name and their
+ shortened acronym. The attribute values will be the ASCII-encoded
+ bytes. For example::
+
+ >>> dn = DistinguishedName(commonName=b'www.example.com',
+ ... C='US')
+ >>> dn.C
+ b'US'
+ >>> dn.countryName
+ b'US'
+ >>> hasattr(dn, "organizationName")
+ False
+
+ L{DistinguishedName} instances can also be used as dictionaries; the keys
+ are extended name of the fields::
+
+ >>> dn.keys()
+ ['countryName', 'commonName']
+ >>> dn['countryName']
+ b'US'
+
+ """
+ __slots__ = ()
+
+ def __init__(self, **kw):
+ for k, v in kw.items():
+ setattr(self, k, v)
+
+
+ def _copyFrom(self, x509name):
+ for name in _x509names:
+ value = getattr(x509name, name, None)
+ if value is not None:
+ setattr(self, name, value)
+
+
+ def _copyInto(self, x509name):
+ for k, v in self.items():
+ setattr(x509name, k, nativeString(v))
+
+
+ def __repr__(self):
+ return '<DN %s>' % (dict.__repr__(self)[1:-1])
+
+
+ def __getattr__(self, attr):
+ try:
+ return self[_x509names[attr]]
+ except KeyError:
+ raise AttributeError(attr)
+
+
+ def __setattr__(self, attr, value):
+ if attr not in _x509names:
+ raise AttributeError("%s is not a valid OpenSSL X509 name field" % (attr,))
+ realAttr = _x509names[attr]
+ if not isinstance(value, bytes):
+ value = value.encode("ascii")
+ self[realAttr] = value
+
+
+ def inspect(self):
+ """
+ Return a multi-line, human-readable representation of this DN.
+
+ @rtype: L{str}
+ """
+ l = []
+ lablen = 0
+ def uniqueValues(mapping):
+ return set(mapping.values())
+ for k in sorted(uniqueValues(_x509names)):
+ label = util.nameToLabel(k)
+ lablen = max(len(label), lablen)
+ v = getattr(self, k, None)
+ if v is not None:
+ l.append((label, nativeString(v)))
+ lablen += 2
+ for n, (label, attr) in enumerate(l):
+ l[n] = (label.rjust(lablen)+': '+ attr)
+ return '\n'.join(l)
+
+DN = DistinguishedName
+
+
+
+@_oldStyle
+class CertBase:
+ """
+ Base class for public (certificate only) and private (certificate + key
+ pair) certificates.
+
+ @ivar original: The underlying OpenSSL certificate object.
+ @type original: L{OpenSSL.crypto.X509}
+ """
+
+ def __init__(self, original):
+ self.original = original
+
+
+ def _copyName(self, suffix):
+ dn = DistinguishedName()
+ dn._copyFrom(getattr(self.original, 'get_'+suffix)())
+ return dn
+
+
+ def getSubject(self):
+ """
+ Retrieve the subject of this certificate.
+
+ @return: A copy of the subject of this certificate.
+ @rtype: L{DistinguishedName}
+ """
+ return self._copyName('subject')
+
+
+ def __conform__(self, interface):
+ """
+ Convert this L{CertBase} into a provider of the given interface.
+
+ @param interface: The interface to conform to.
+ @type interface: L{zope.interface.interfaces.IInterface}
+
+ @return: an L{IOpenSSLTrustRoot} provider or L{NotImplemented}
+ @rtype: L{IOpenSSLTrustRoot} or L{NotImplemented}
+ """
+ if interface is IOpenSSLTrustRoot:
+ return OpenSSLCertificateAuthorities([self.original])
+ return NotImplemented
+
+
+
+def _handleattrhelper(Class, transport, methodName):
+ """
+ (private) Helper for L{Certificate.peerFromTransport} and
+ L{Certificate.hostFromTransport} which checks for incompatible handle types
+ and null certificates and raises the appropriate exception or returns the
+ appropriate certificate object.
+ """
+ method = getattr(transport.getHandle(),
+ "get_%s_certificate" % (methodName,), None)
+ if method is None:
+ raise CertificateError(
+ "non-TLS transport %r did not have %s certificate" % (transport, methodName))
+ cert = method()
+ if cert is None:
+ raise CertificateError(
+ "TLS transport %r did not have %s certificate" % (transport, methodName))
+ return Class(cert)
+
+
+
+class Certificate(CertBase):
+ """
+ An x509 certificate.
+ """
+ def __repr__(self):
+ return '<%s Subject=%s Issuer=%s>' % (self.__class__.__name__,
+ self.getSubject().commonName,
+ self.getIssuer().commonName)
+
+
+ def __eq__(self, other):
+ if isinstance(other, Certificate):
+ return self.dump() == other.dump()
+ return False
+
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+
+ @classmethod
+ def load(Class, requestData, format=crypto.FILETYPE_ASN1, args=()):
+ """
+ Load a certificate from an ASN.1- or PEM-format string.
+
+ @rtype: C{Class}
+ """
+ return Class(crypto.load_certificate(format, requestData), *args)
+
+ # We can't use super() because it is old style still, so we have to hack
+ # around things wanting to call the parent function
+ _load = load
+
+
+ def dumpPEM(self):
+ """
+ Dump this certificate to a PEM-format data string.
+
+ @rtype: L{str}
+ """
+ return self.dump(crypto.FILETYPE_PEM)
+
+
+ @classmethod
+ def loadPEM(Class, data):
+ """
+ Load a certificate from a PEM-format data string.
+
+ @rtype: C{Class}
+ """
+ return Class.load(data, crypto.FILETYPE_PEM)
+
+
+ @classmethod
+ def peerFromTransport(Class, transport):
+ """
+ Get the certificate for the remote end of the given transport.
+
+ @param transport: an L{ISystemHandle} provider
+
+ @rtype: C{Class}
+
+ @raise: L{CertificateError}, if the given transport does not have a peer
+ certificate.
+ """
+ return _handleattrhelper(Class, transport, 'peer')
+
+
+ @classmethod
+ def hostFromTransport(Class, transport):
+ """
+ Get the certificate for the local end of the given transport.
+
+ @param transport: an L{ISystemHandle} provider; the transport we will
+
+ @rtype: C{Class}
+
+ @raise: L{CertificateError}, if the given transport does not have a host
+ certificate.
+ """
+ return _handleattrhelper(Class, transport, 'host')
+
+
+ def getPublicKey(self):
+ """
+ Get the public key for this certificate.
+
+ @rtype: L{PublicKey}
+ """
+ return PublicKey(self.original.get_pubkey())
+
+
+ def dump(self, format=crypto.FILETYPE_ASN1):
+ return crypto.dump_certificate(format, self.original)
+
+
+ def serialNumber(self):
+ """
+ Retrieve the serial number of this certificate.
+
+ @rtype: L{int}
+ """
+ return self.original.get_serial_number()
+
+
+ def digest(self, method='md5'):
+ """
+ Return a digest hash of this certificate using the specified hash
+ algorithm.
+
+ @param method: One of C{'md5'} or C{'sha'}.
+
+ @return: The digest of the object, formatted as b":"-delimited hex
+ pairs
+ @rtype: L{bytes}
+ """
+ return self.original.digest(method)
+
+
+ def _inspect(self):
+ return '\n'.join(['Certificate For Subject:',
+ self.getSubject().inspect(),
+ '\nIssuer:',
+ self.getIssuer().inspect(),
+ '\nSerial Number: %d' % self.serialNumber(),
+ 'Digest: %s' % nativeString(self.digest())])
+
+
+ def inspect(self):
+ """
+ Return a multi-line, human-readable representation of this
+ Certificate, including information about the subject, issuer, and
+ public key.
+ """
+ return '\n'.join((self._inspect(), self.getPublicKey().inspect()))
+
+
+ def getIssuer(self):
+ """
+ Retrieve the issuer of this certificate.
+
+ @rtype: L{DistinguishedName}
+ @return: A copy of the issuer of this certificate.
+ """
+ return self._copyName('issuer')
+
+
+ def options(self, *authorities):
+ raise NotImplementedError('Possible, but doubtful we need this yet')
+
+
+
+class CertificateRequest(CertBase):
+ """
+ An x509 certificate request.
+
+ Certificate requests are given to certificate authorities to be signed and
+ returned resulting in an actual certificate.
+ """
+ @classmethod
+ def load(Class, requestData, requestFormat=crypto.FILETYPE_ASN1):
+ req = crypto.load_certificate_request(requestFormat, requestData)
+ dn = DistinguishedName()
+ dn._copyFrom(req.get_subject())
+ if not req.verify(req.get_pubkey()):
+ raise VerifyError("Can't verify that request for %r is self-signed." % (dn,))
+ return Class(req)
+
+
+ def dump(self, format=crypto.FILETYPE_ASN1):
+ return crypto.dump_certificate_request(format, self.original)
+
+
+
+class PrivateCertificate(Certificate):
+ """
+ An x509 certificate and private key.
+ """
+ def __repr__(self):
+ return Certificate.__repr__(self) + ' with ' + repr(self.privateKey)
+
+
+ def _setPrivateKey(self, privateKey):
+ if not privateKey.matches(self.getPublicKey()):
+ raise VerifyError(
+ "Certificate public and private keys do not match.")
+ self.privateKey = privateKey
+ return self
+
+
+ def newCertificate(self, newCertData, format=crypto.FILETYPE_ASN1):
+ """
+ Create a new L{PrivateCertificate} from the given certificate data and
+ this instance's private key.
+ """
+ return self.load(newCertData, self.privateKey, format)
+
+
+ @classmethod
+ def load(Class, data, privateKey, format=crypto.FILETYPE_ASN1):
+ return Class._load(data, format)._setPrivateKey(privateKey)
+
+
+ def inspect(self):
+ return '\n'.join([Certificate._inspect(self),
+ self.privateKey.inspect()])
+
+
+ def dumpPEM(self):
+ """
+ Dump both public and private parts of a private certificate to
+ PEM-format data.
+ """
+ return self.dump(crypto.FILETYPE_PEM) + self.privateKey.dump(crypto.FILETYPE_PEM)
+
+
+ @classmethod
+ def loadPEM(Class, data):
+ """
+ Load both private and public parts of a private certificate from a
+ chunk of PEM-format data.
+ """
+ return Class.load(data, KeyPair.load(data, crypto.FILETYPE_PEM),
+ crypto.FILETYPE_PEM)
+
+
+ @classmethod
+ def fromCertificateAndKeyPair(Class, certificateInstance, privateKey):
+ privcert = Class(certificateInstance.original)
+ return privcert._setPrivateKey(privateKey)
+
+
+ def options(self, *authorities):
+ """
+ Create a context factory using this L{PrivateCertificate}'s certificate
+ and private key.
+
+ @param authorities: A list of L{Certificate} object
+
+ @return: A context factory.
+ @rtype: L{CertificateOptions <twisted.internet.ssl.CertificateOptions>}
+ """
+ options = dict(privateKey=self.privateKey.original,
+ certificate=self.original)
+ if authorities:
+ options.update(dict(trustRoot=OpenSSLCertificateAuthorities(
+ [auth.original for auth in authorities]
+ )))
+ return OpenSSLCertificateOptions(**options)
+
+
+ def certificateRequest(self, format=crypto.FILETYPE_ASN1,
+ digestAlgorithm='sha256'):
+ return self.privateKey.certificateRequest(
+ self.getSubject(),
+ format,
+ digestAlgorithm)
+
+
+ def signCertificateRequest(self,
+ requestData,
+ verifyDNCallback,
+ serialNumber,
+ requestFormat=crypto.FILETYPE_ASN1,
+ certificateFormat=crypto.FILETYPE_ASN1):
+ issuer = self.getSubject()
+ return self.privateKey.signCertificateRequest(
+ issuer,
+ requestData,
+ verifyDNCallback,
+ serialNumber,
+ requestFormat,
+ certificateFormat)
+
+
+ def signRequestObject(self, certificateRequest, serialNumber,
+ secondsToExpiry=60 * 60 * 24 * 365, # One year
+ digestAlgorithm='sha256'):
+ return self.privateKey.signRequestObject(self.getSubject(),
+ certificateRequest,
+ serialNumber,
+ secondsToExpiry,
+ digestAlgorithm)
+
+
+
+@_oldStyle
+class PublicKey:
+ """
+ A L{PublicKey} is a representation of the public part of a key pair.
+
+ You can't do a whole lot with it aside from comparing it to other
+ L{PublicKey} objects.
+
+ @note: If constructing a L{PublicKey} manually, be sure to pass only a
+ L{OpenSSL.crypto.PKey} that does not contain a private key!
+
+ @ivar original: The original private key.
+ """
+
+ def __init__(self, osslpkey):
+ """
+ @param osslpkey: The underlying pyOpenSSL key object.
+ @type osslpkey: L{OpenSSL.crypto.PKey}
+ """
+ self.original = osslpkey
+
+
+ def matches(self, otherKey):
+ """
+ Does this L{PublicKey} contain the same value as another L{PublicKey}?
+
+ @param otherKey: The key to compare C{self} to.
+ @type otherKey: L{PublicKey}
+
+ @return: L{True} if these keys match, L{False} if not.
+ @rtype: L{bool}
+ """
+ return self.keyHash() == otherKey.keyHash()
+
+
+ def __repr__(self):
+ return '<%s %s>' % (self.__class__.__name__, self.keyHash())
+
+
+ def keyHash(self):
+ """
+ Compute a hash of the underlying PKey object.
+
+ The purpose of this method is to allow you to determine if two
+ certificates share the same public key; it is not really useful for
+ anything else.
+
+ In versions of Twisted prior to 15.0, C{keyHash} used a technique
+ involving certificate requests for computing the hash that was not
+ stable in the face of changes to the underlying OpenSSL library.
+
+ @return: Return a 32-character hexadecimal string uniquely identifying
+ this public key, I{for this version of Twisted}.
+ @rtype: native L{str}
+ """
+ raw = crypto.dump_publickey(crypto.FILETYPE_ASN1, self.original)
+ h = md5()
+ h.update(raw)
+ return h.hexdigest()
+
+
+ def inspect(self):
+ return 'Public Key with Hash: %s' % (self.keyHash(),)
+
+
+
+class KeyPair(PublicKey):
+
+ @classmethod
+ def load(Class, data, format=crypto.FILETYPE_ASN1):
+ return Class(crypto.load_privatekey(format, data))
+
+
+ def dump(self, format=crypto.FILETYPE_ASN1):
+ return crypto.dump_privatekey(format, self.original)
+
+
+ def __getstate__(self):
+ return self.dump()
+
+
+ def __setstate__(self, state):
+ self.__init__(crypto.load_privatekey(crypto.FILETYPE_ASN1, state))
+
+
+ def inspect(self):
+ t = self.original.type()
+ if t == crypto.TYPE_RSA:
+ ts = 'RSA'
+ elif t == crypto.TYPE_DSA:
+ ts = 'DSA'
+ else:
+ ts = '(Unknown Type!)'
+ L = (self.original.bits(), ts, self.keyHash())
+ return '%s-bit %s Key Pair with Hash: %s' % L
+
+
+ @classmethod
+ def generate(Class, kind=crypto.TYPE_RSA, size=2048):
+ pkey = crypto.PKey()
+ pkey.generate_key(kind, size)
+ return Class(pkey)
+
+
+ def newCertificate(self, newCertData, format=crypto.FILETYPE_ASN1):
+ return PrivateCertificate.load(newCertData, self, format)
+
+
+ def requestObject(self, distinguishedName, digestAlgorithm='sha256'):
+ req = crypto.X509Req()
+ req.set_pubkey(self.original)
+ distinguishedName._copyInto(req.get_subject())
+ req.sign(self.original, digestAlgorithm)
+ return CertificateRequest(req)
+
+
+ def certificateRequest(self, distinguishedName,
+ format=crypto.FILETYPE_ASN1,
+ digestAlgorithm='sha256'):
+ """
+ Create a certificate request signed with this key.
+
+ @return: a string, formatted according to the 'format' argument.
+ """
+ return self.requestObject(distinguishedName, digestAlgorithm).dump(format)
+
+
+ def signCertificateRequest(self,
+ issuerDistinguishedName,
+ requestData,
+ verifyDNCallback,
+ serialNumber,
+ requestFormat=crypto.FILETYPE_ASN1,
+ certificateFormat=crypto.FILETYPE_ASN1,
+ secondsToExpiry=60 * 60 * 24 * 365, # One year
+ digestAlgorithm='sha256'):
+ """
+ Given a blob of certificate request data and a certificate authority's
+ DistinguishedName, return a blob of signed certificate data.
+
+ If verifyDNCallback returns a Deferred, I will return a Deferred which
+ fires the data when that Deferred has completed.
+ """
+ hlreq = CertificateRequest.load(requestData, requestFormat)
+
+ dn = hlreq.getSubject()
+ vval = verifyDNCallback(dn)
+
+ def verified(value):
+ if not value:
+ raise VerifyError("DN callback %r rejected request DN %r" % (verifyDNCallback, dn))
+ return self.signRequestObject(issuerDistinguishedName, hlreq,
+ serialNumber, secondsToExpiry, digestAlgorithm).dump(certificateFormat)
+
+ if isinstance(vval, Deferred):
+ return vval.addCallback(verified)
+ else:
+ return verified(vval)
+
+
+ def signRequestObject(self,
+ issuerDistinguishedName,
+ requestObject,
+ serialNumber,
+ secondsToExpiry=60 * 60 * 24 * 365, # One year
+ digestAlgorithm='sha256'):
+ """
+ Sign a CertificateRequest instance, returning a Certificate instance.
+ """
+ req = requestObject.original
+ cert = crypto.X509()
+ issuerDistinguishedName._copyInto(cert.get_issuer())
+ cert.set_subject(req.get_subject())
+ cert.set_pubkey(req.get_pubkey())
+ cert.gmtime_adj_notBefore(0)
+ cert.gmtime_adj_notAfter(secondsToExpiry)
+ cert.set_serial_number(serialNumber)
+ cert.sign(self.original, digestAlgorithm)
+ return Certificate(cert)
+
+
+ def selfSignedCert(self, serialNumber, **kw):
+ dn = DN(**kw)
+ return PrivateCertificate.fromCertificateAndKeyPair(
+ self.signRequestObject(dn, self.requestObject(dn), serialNumber),
+ self)
+
+KeyPair.__getstate__ = deprecated(Version("Twisted", 15, 0, 0),
+ "a real persistence system")(KeyPair.__getstate__)
+KeyPair.__setstate__ = deprecated(Version("Twisted", 15, 0, 0),
+ "a real persistence system")(KeyPair.__setstate__)
+
+
+
+class IOpenSSLTrustRoot(Interface):
+ """
+ Trust settings for an OpenSSL context.
+
+ Note that this interface's methods are private, so things outside of
+ Twisted shouldn't implement it.
+ """
+
+ def _addCACertsToContext(context):
+ """
+ Add certificate-authority certificates to an SSL context whose
+ connections should trust those authorities.
+
+ @param context: An SSL context for a connection which should be
+ verified by some certificate authority.
+ @type context: L{OpenSSL.SSL.Context}
+
+ @return: L{None}
+ """
+
+
+
+@implementer(IOpenSSLTrustRoot)
+class OpenSSLCertificateAuthorities(object):
+ """
+ Trust an explicitly specified set of certificates, represented by a list of
+ L{OpenSSL.crypto.X509} objects.
+ """
+
+ def __init__(self, caCerts):
+ """
+ @param caCerts: The certificate authorities to trust when using this
+ object as a C{trustRoot} for L{OpenSSLCertificateOptions}.
+ @type caCerts: L{list} of L{OpenSSL.crypto.X509}
+ """
+ self._caCerts = caCerts
+
+
+ def _addCACertsToContext(self, context):
+ store = context.get_cert_store()
+ for cert in self._caCerts:
+ store.add_cert(cert)
+
+
+
+def trustRootFromCertificates(certificates):
+ """
+ Builds an object that trusts multiple root L{Certificate}s.
+
+ When passed to L{optionsForClientTLS}, connections using those options will
+ reject any server certificate not signed by at least one of the
+ certificates in the `certificates` list.
+
+ @since: 16.0
+
+ @param certificates: All certificates which will be trusted.
+ @type certificates: C{iterable} of L{CertBase}
+
+ @rtype: L{IOpenSSLTrustRoot}
+ @return: an object suitable for use as the trustRoot= keyword argument to
+ L{optionsForClientTLS}
+ """
+
+ certs = []
+ for cert in certificates:
+ # PrivateCertificate or Certificate are both okay
+ if isinstance(cert, CertBase):
+ cert = cert.original
+ else:
+ raise TypeError(
+ "certificates items must be twisted.internet.ssl.CertBase"
+ " instances"
+ )
+ certs.append(cert)
+ return OpenSSLCertificateAuthorities(certs)
+
+
+
+@implementer(IOpenSSLTrustRoot)
+class OpenSSLDefaultPaths(object):
+ """
+ Trust the set of default verify paths that OpenSSL was built with, as
+ specified by U{SSL_CTX_set_default_verify_paths
+ <https://www.openssl.org/docs/ssl/SSL_CTX_load_verify_locations.html>}.
+ """
+
+ def _addCACertsToContext(self, context):
+ context.set_default_verify_paths()
+
+
+
+def platformTrust():
+ """
+ Attempt to discover a set of trusted certificate authority certificates
+ (or, in other words: trust roots, or root certificates) whose trust is
+ managed and updated by tools outside of Twisted.
+
+ If you are writing any client-side TLS code with Twisted, you should use
+ this as the C{trustRoot} argument to L{CertificateOptions
+ <twisted.internet.ssl.CertificateOptions>}.
+
+ The result of this function should be like the up-to-date list of
+ certificates in a web browser. When developing code that uses
+ C{platformTrust}, you can think of it that way. However, the choice of
+ which certificate authorities to trust is never Twisted's responsibility.
+ Unless you're writing a very unusual application or library, it's not your
+ code's responsibility either. The user may use platform-specific tools for
+ defining which server certificates should be trusted by programs using TLS.
+ The purpose of using this API is to respect that decision as much as
+ possible.
+
+ This should be a set of trust settings most appropriate for I{client} TLS
+ connections; i.e. those which need to verify a server's authenticity. You
+ should probably use this by default for any client TLS connection that you
+ create. For servers, however, client certificates are typically not
+ verified; or, if they are, their verification will depend on a custom,
+ application-specific certificate authority.
+
+ @since: 14.0
+
+ @note: Currently, L{platformTrust} depends entirely upon your OpenSSL build
+ supporting a set of "L{default verify paths <OpenSSLDefaultPaths>}"
+ which correspond to certificate authority trust roots. Unfortunately,
+ whether this is true of your system is both outside of Twisted's
+ control and difficult (if not impossible) for Twisted to detect
+ automatically.
+
+ Nevertheless, this ought to work as desired by default on:
+
+ - Ubuntu Linux machines with the U{ca-certificates
+ <https://launchpad.net/ubuntu/+source/ca-certificates>} package
+ installed,
+
+ - macOS when using the system-installed version of OpenSSL (i.e.
+ I{not} one installed via MacPorts or Homebrew),
+
+ - any build of OpenSSL which has had certificate authority
+ certificates installed into its default verify paths (by default,
+ C{/usr/local/ssl/certs} if you've built your own OpenSSL), or
+
+ - any process where the C{SSL_CERT_FILE} environment variable is
+ set to the path of a file containing your desired CA certificates
+ bundle.
+
+ Hopefully soon, this API will be updated to use more sophisticated
+ trust-root discovery mechanisms. Until then, you can follow tickets in
+ the Twisted tracker for progress on this implementation on U{Microsoft
+ Windows <https://twistedmatrix.com/trac/ticket/6371>}, U{macOS
+ <https://twistedmatrix.com/trac/ticket/6372>}, and U{a fallback for
+ other platforms which do not have native trust management tools
+ <https://twistedmatrix.com/trac/ticket/6934>}.
+
+ @return: an appropriate trust settings object for your platform.
+ @rtype: L{IOpenSSLTrustRoot}
+
+ @raise NotImplementedError: if this platform is not yet supported by
+ Twisted. At present, only OpenSSL is supported.
+ """
+ return OpenSSLDefaultPaths()
+
+
+
+def _tolerateErrors(wrapped):
+ """
+ Wrap up an C{info_callback} for pyOpenSSL so that if something goes wrong
+ the error is immediately logged and the connection is dropped if possible.
+
+ This wrapper exists because some versions of pyOpenSSL don't handle errors
+ from callbacks at I{all}, and those which do write tracebacks directly to
+ stderr rather than to a supplied logging system. This reports unexpected
+ errors to the Twisted logging system.
+
+ Also, this terminates the connection immediately if possible because if
+ you've got bugs in your verification logic it's much safer to just give up.
+
+ @param wrapped: A valid C{info_callback} for pyOpenSSL.
+ @type wrapped: L{callable}
+
+ @return: A valid C{info_callback} for pyOpenSSL that handles any errors in
+ C{wrapped}.
+ @rtype: L{callable}
+ """
+ def infoCallback(connection, where, ret):
+ try:
+ return wrapped(connection, where, ret)
+ except:
+ f = Failure()
+ log.err(f, "Error during info_callback")
+ connection.get_app_data().failVerification(f)
+ return infoCallback
+
+
+
+@implementer(IOpenSSLClientConnectionCreator)
+class ClientTLSOptions(object):
+ """
+ Client creator for TLS.
+
+ Private implementation type (not exposed to applications) for public
+ L{optionsForClientTLS} API.
+
+ @ivar _ctx: The context to use for new connections.
+ @type _ctx: L{OpenSSL.SSL.Context}
+
+ @ivar _hostname: The hostname to verify, as specified by the application,
+ as some human-readable text.
+ @type _hostname: L{unicode}
+
+ @ivar _hostnameBytes: The hostname to verify, decoded into IDNA-encoded
+ bytes. This is passed to APIs which think that hostnames are bytes,
+ such as OpenSSL's SNI implementation.
+ @type _hostnameBytes: L{bytes}
+
+ @ivar _hostnameASCII: The hostname, as transcoded into IDNA ASCII-range
+ unicode code points. This is pre-transcoded because the
+ C{service_identity} package is rather strict about requiring the
+ C{idna} package from PyPI for internationalized domain names, rather
+ than working with Python's built-in (but sometimes broken) IDNA
+ encoding. ASCII values, however, will always work.
+ @type _hostnameASCII: L{unicode}
+
+ @ivar _hostnameIsDnsName: Whether or not the C{_hostname} is a DNSName.
+ Will be L{False} if C{_hostname} is an IP address or L{True} if
+ C{_hostname} is a DNSName
+ @type _hostnameIsDnsName: L{bool}
+ """
+
+ def __init__(self, hostname, ctx):
+ """
+ Initialize L{ClientTLSOptions}.
+
+ @param hostname: The hostname to verify as input by a human.
+ @type hostname: L{unicode}
+
+ @param ctx: an L{OpenSSL.SSL.Context} to use for new connections.
+ @type ctx: L{OpenSSL.SSL.Context}.
+ """
+ self._ctx = ctx
+ self._hostname = hostname
+
+ if isIPAddress(hostname) or isIPv6Address(hostname):
+ self._hostnameBytes = hostname.encode('ascii')
+ self._hostnameIsDnsName = False
+ else:
+ self._hostnameBytes = _idnaBytes(hostname)
+ self._hostnameIsDnsName = True
+
+ self._hostnameASCII = self._hostnameBytes.decode("ascii")
+ ctx.set_info_callback(
+ _tolerateErrors(self._identityVerifyingInfoCallback)
+ )
+
+
+ def clientConnectionForTLS(self, tlsProtocol):
+ """
+ Create a TLS connection for a client.
+
+ @note: This will call C{set_app_data} on its connection. If you're
+ delegating to this implementation of this method, don't ever call
+ C{set_app_data} or C{set_info_callback} on the returned connection,
+ or you'll break the implementation of various features of this
+ class.
+
+ @param tlsProtocol: the TLS protocol initiating the connection.
+ @type tlsProtocol: L{twisted.protocols.tls.TLSMemoryBIOProtocol}
+
+ @return: the configured client connection.
+ @rtype: L{OpenSSL.SSL.Connection}
+ """
+ context = self._ctx
+ connection = SSL.Connection(context, None)
+ connection.set_app_data(tlsProtocol)
+ return connection
+
+
+ def _identityVerifyingInfoCallback(self, connection, where, ret):
+ """
+ U{info_callback
+ <http://pythonhosted.org/pyOpenSSL/api/ssl.html#OpenSSL.SSL.Context.set_info_callback>
+ } for pyOpenSSL that verifies the hostname in the presented certificate
+ matches the one passed to this L{ClientTLSOptions}.
+
+ @param connection: the connection which is handshaking.
+ @type connection: L{OpenSSL.SSL.Connection}
+
+ @param where: flags indicating progress through a TLS handshake.
+ @type where: L{int}
+
+ @param ret: ignored
+ @type ret: ignored
+ """
+ # Literal IPv4 and IPv6 addresses are not permitted
+ # as host names according to the RFCs
+ if where & SSL.SSL_CB_HANDSHAKE_START and self._hostnameIsDnsName:
+ connection.set_tlsext_host_name(self._hostnameBytes)
+ elif where & SSL.SSL_CB_HANDSHAKE_DONE:
+ try:
+ if self._hostnameIsDnsName:
+ verifyHostname(connection, self._hostnameASCII)
+ else:
+ verifyIPAddress(connection, self._hostnameASCII)
+ except VerificationError:
+ f = Failure()
+ transport = connection.get_app_data()
+ transport.failVerification(f)
+
+
+
+def optionsForClientTLS(hostname, trustRoot=None, clientCertificate=None,
+ acceptableProtocols=None, **kw):
+ """
+ Create a L{client connection creator <IOpenSSLClientConnectionCreator>} for
+ use with APIs such as L{SSL4ClientEndpoint
+ <twisted.internet.endpoints.SSL4ClientEndpoint>}, L{connectSSL
+ <twisted.internet.interfaces.IReactorSSL.connectSSL>}, and L{startTLS
+ <twisted.internet.interfaces.ITLSTransport.startTLS>}.
+
+ @since: 14.0
+
+ @param hostname: The expected name of the remote host. This serves two
+ purposes: first, and most importantly, it verifies that the certificate
+ received from the server correctly identifies the specified hostname.
+ The second purpose is to use the U{Server Name Indication extension
+ <https://en.wikipedia.org/wiki/Server_Name_Indication>} to indicate to
+ the server which certificate should be used.
+ @type hostname: L{unicode}
+
+ @param trustRoot: Specification of trust requirements of peers. This may be
+ a L{Certificate} or the result of L{platformTrust}. By default it is
+ L{platformTrust} and you probably shouldn't adjust it unless you really
+ know what you're doing. Be aware that clients using this interface
+ I{must} verify the server; you cannot explicitly pass L{None} since
+ that just means to use L{platformTrust}.
+ @type trustRoot: L{IOpenSSLTrustRoot}
+
+ @param clientCertificate: The certificate and private key that the client
+ will use to authenticate to the server. If unspecified, the client will
+ not authenticate.
+ @type clientCertificate: L{PrivateCertificate}
+
+ @param acceptableProtocols: The protocols this peer is willing to speak
+ after the TLS negotiation has completed, advertised over both ALPN and
+ NPN. If this argument is specified, and no overlap can be found with
+ the other peer, the connection will fail to be established. If the
+ remote peer does not offer NPN or ALPN, the connection will be
+ established, but no protocol wil be negotiated. Protocols earlier in
+ the list are preferred over those later in the list.
+ @type acceptableProtocols: L{list} of L{bytes}
+
+ @param extraCertificateOptions: keyword-only argument; this is a dictionary
+ of additional keyword arguments to be presented to
+ L{CertificateOptions}. Please avoid using this unless you absolutely
+ need to; any time you need to pass an option here that is a bug in this
+ interface.
+ @type extraCertificateOptions: L{dict}
+
+ @param kw: (Backwards compatibility hack to allow keyword-only arguments on
+ Python 2. Please ignore; arbitrary keyword arguments will be errors.)
+ @type kw: L{dict}
+
+ @return: A client connection creator.
+ @rtype: L{IOpenSSLClientConnectionCreator}
+ """
+ extraCertificateOptions = kw.pop('extraCertificateOptions', None) or {}
+ if trustRoot is None:
+ trustRoot = platformTrust()
+ if kw:
+ raise TypeError(
+ "optionsForClientTLS() got an unexpected keyword argument"
+ " '{arg}'".format(
+ arg=kw.popitem()[0]
+ )
+ )
+ if not isinstance(hostname, unicode):
+ raise TypeError(
+ "optionsForClientTLS requires text for host names, not "
+ + hostname.__class__.__name__
+ )
+ if clientCertificate:
+ extraCertificateOptions.update(
+ privateKey=clientCertificate.privateKey.original,
+ certificate=clientCertificate.original
+ )
+ certificateOptions = OpenSSLCertificateOptions(
+ trustRoot=trustRoot,
+ acceptableProtocols=acceptableProtocols,
+ **extraCertificateOptions
+ )
+ return ClientTLSOptions(hostname, certificateOptions.getContext())
+
+
+
+@implementer(IOpenSSLContextFactory)
+class OpenSSLCertificateOptions(object):
+ """
+ A L{CertificateOptions <twisted.internet.ssl.CertificateOptions>} specifies
+ the security properties for a client or server TLS connection used with
+ OpenSSL.
+
+ @ivar _options: Any option flags to set on the L{OpenSSL.SSL.Context}
+ object that will be created.
+ @type _options: L{int}
+
+ @ivar _cipherString: An OpenSSL-specific cipher string.
+ @type _cipherString: L{unicode}
+
+ @ivar _defaultMinimumTLSVersion: The default TLS version that will be
+ negotiated. This should be a "safe default", with wide client and
+ server support, vs an optimally secure one that excludes a large number
+ of users. As of late 2016, TLSv1.0 is that safe default.
+ @type _defaultMinimumTLSVersion: L{TLSVersion} constant
+ """
+
+ # Factory for creating contexts. Configurable for testability.
+ _contextFactory = SSL.Context
+ _context = None
+
+ _OP_NO_TLSv1_3 = _tlsDisableFlags[TLSVersion.TLSv1_3]
+
+ _defaultMinimumTLSVersion = TLSVersion.TLSv1_0
+
+ @_mutuallyExclusiveArguments([
+ ['trustRoot', 'requireCertificate'],
+ ['trustRoot', 'verify'],
+ ['trustRoot', 'caCerts'],
+ ['method', 'insecurelyLowerMinimumTo'],
+ ['method', 'raiseMinimumTo'],
+ ['raiseMinimumTo', 'insecurelyLowerMinimumTo'],
+ ['method', 'lowerMaximumSecurityTo'],
+ ])
+ def __init__(self,
+ privateKey=None,
+ certificate=None,
+ method=None,
+ verify=False,
+ caCerts=None,
+ verifyDepth=9,
+ requireCertificate=True,
+ verifyOnce=True,
+ enableSingleUseKeys=True,
+ enableSessions=True,
+ fixBrokenPeers=False,
+ enableSessionTickets=False,
+ extraCertChain=None,
+ acceptableCiphers=None,
+ dhParameters=None,
+ trustRoot=None,
+ acceptableProtocols=None,
+ raiseMinimumTo=None,
+ insecurelyLowerMinimumTo=None,
+ lowerMaximumSecurityTo=None,
+ ):
+ """
+ Create an OpenSSL context SSL connection context factory.
+
+ @param privateKey: A PKey object holding the private key.
+
+ @param certificate: An X509 object holding the certificate.
+
+ @param method: Deprecated, use a combination of
+ C{insecurelyLowerMinimumTo}, C{raiseMinimumTo}, or
+ C{lowerMaximumSecurityTo} instead. The SSL protocol to use, one of
+ C{SSLv23_METHOD}, C{SSLv2_METHOD}, C{SSLv3_METHOD}, C{TLSv1_METHOD}
+ (or any other method constants provided by pyOpenSSL). By default,
+ a setting will be used which allows TLSv1.0, TLSv1.1, and TLSv1.2.
+ Can not be used with C{insecurelyLowerMinimumTo},
+ C{raiseMinimumTo}, or C{lowerMaximumSecurityTo}
+
+ @param verify: Please use a C{trustRoot} keyword argument instead,
+ since it provides the same functionality in a less error-prone way.
+ By default this is L{False}.
+
+ If L{True}, verify certificates received from the peer and fail the
+ handshake if verification fails. Otherwise, allow anonymous
+ sessions and sessions with certificates which fail validation.
+
+ @param caCerts: Please use a C{trustRoot} keyword argument instead,
+ since it provides the same functionality in a less error-prone way.
+
+ List of certificate authority certificate objects to use to verify
+ the peer's certificate. Only used if verify is L{True} and will be
+ ignored otherwise. Since verify is L{False} by default, this is
+ L{None} by default.
+
+ @type caCerts: L{list} of L{OpenSSL.crypto.X509}
+
+ @param verifyDepth: Depth in certificate chain down to which to verify.
+ If unspecified, use the underlying default (9).
+
+ @param requireCertificate: Please use a C{trustRoot} keyword argument
+ instead, since it provides the same functionality in a less
+ error-prone way.
+
+ If L{True}, do not allow anonymous sessions; defaults to L{True}.
+
+ @param verifyOnce: If True, do not re-verify the certificate on session
+ resumption.
+
+ @param enableSingleUseKeys: If L{True}, generate a new key whenever
+ ephemeral DH and ECDH parameters are used to prevent small subgroup
+ attacks and to ensure perfect forward secrecy.
+
+ @param enableSessions: If True, set a session ID on each context. This
+ allows a shortened handshake to be used when a known client
+ reconnects.
+
+ @param fixBrokenPeers: If True, enable various non-spec protocol fixes
+ for broken SSL implementations. This should be entirely safe,
+ according to the OpenSSL documentation, but YMMV. This option is
+ now off by default, because it causes problems with connections
+ between peers using OpenSSL 0.9.8a.
+
+ @param enableSessionTickets: If L{True}, enable session ticket
+ extension for session resumption per RFC 5077. Note there is no
+ support for controlling session tickets. This option is off by
+ default, as some server implementations don't correctly process
+ incoming empty session ticket extensions in the hello.
+
+ @param extraCertChain: List of certificates that I{complete} your
+ verification chain if the certificate authority that signed your
+ C{certificate} isn't widely supported. Do I{not} add
+ C{certificate} to it.
+ @type extraCertChain: C{list} of L{OpenSSL.crypto.X509}
+
+ @param acceptableCiphers: Ciphers that are acceptable for connections.
+ Uses a secure default if left L{None}.
+ @type acceptableCiphers: L{IAcceptableCiphers}
+
+ @param dhParameters: Key generation parameters that are required for
+ Diffie-Hellman key exchange. If this argument is left L{None},
+ C{EDH} ciphers are I{disabled} regardless of C{acceptableCiphers}.
+ @type dhParameters: L{DiffieHellmanParameters
+ <twisted.internet.ssl.DiffieHellmanParameters>}
+
+ @param trustRoot: Specification of trust requirements of peers. If
+ this argument is specified, the peer is verified. It requires a
+ certificate, and that certificate must be signed by one of the
+ certificate authorities specified by this object.
+
+ Note that since this option specifies the same information as
+ C{caCerts}, C{verify}, and C{requireCertificate}, specifying any of
+ those options in combination with this one will raise a
+ L{TypeError}.
+
+ @type trustRoot: L{IOpenSSLTrustRoot}
+
+ @param acceptableProtocols: The protocols this peer is willing to speak
+ after the TLS negotiation has completed, advertised over both ALPN
+ and NPN. If this argument is specified, and no overlap can be
+ found with the other peer, the connection will fail to be
+ established. If the remote peer does not offer NPN or ALPN, the
+ connection will be established, but no protocol wil be negotiated.
+ Protocols earlier in the list are preferred over those later in the
+ list.
+ @type acceptableProtocols: L{list} of L{bytes}
+
+ @param raiseMinimumTo: The minimum TLS version that you want to use, or
+ Twisted's default if it is higher. Use this if you want to make
+ your client/server more secure than Twisted's default, but will
+ accept Twisted's default instead if it moves higher than this
+ value. You probably want to use this over
+ C{insecurelyLowerMinimumTo}.
+ @type raiseMinimumTo: L{TLSVersion} constant
+
+ @param insecurelyLowerMinimumTo: The minimum TLS version to use,
+ possibly lower than Twisted's default. If not specified, it is a
+ generally considered safe default (TLSv1.0). If you want to raise
+ your minimum TLS version to above that of this default, use
+ C{raiseMinimumTo}. DO NOT use this argument unless you are
+ absolutely sure this is what you want.
+ @type insecurelyLowerMinimumTo: L{TLSVersion} constant
+
+ @param lowerMaximumSecurityTo: The maximum TLS version to use. If not
+ specified, it is the most recent your OpenSSL supports. You only
+ want to set this if the peer that you are communicating with has
+ problems with more recent TLS versions, it lowers your security
+ when communicating with newer peers. DO NOT use this argument
+ unless you are absolutely sure this is what you want.
+ @type lowerMaximumSecurityTo: L{TLSVersion} constant
+
+ @raise ValueError: when C{privateKey} or C{certificate} are set without
+ setting the respective other.
+ @raise ValueError: when C{verify} is L{True} but C{caCerts} doesn't
+ specify any CA certificates.
+ @raise ValueError: when C{extraCertChain} is passed without specifying
+ C{privateKey} or C{certificate}.
+ @raise ValueError: when C{acceptableCiphers} doesn't yield any usable
+ ciphers for the current platform.
+
+ @raise TypeError: if C{trustRoot} is passed in combination with
+ C{caCert}, C{verify}, or C{requireCertificate}. Please prefer
+ C{trustRoot} in new code, as its semantics are less tricky.
+ @raise TypeError: if C{method} is passed in combination with
+ C{tlsProtocols}. Please prefer the more explicit C{tlsProtocols}
+ in new code.
+
+ @raises NotImplementedError: If acceptableProtocols were provided but
+ no negotiation mechanism is available.
+ """
+
+ if (privateKey is None) != (certificate is None):
+ raise ValueError(
+ "Specify neither or both of privateKey and certificate")
+ self.privateKey = privateKey
+ self.certificate = certificate
+
+ # Set basic security options: disallow insecure SSLv2, disallow TLS
+ # compression to avoid CRIME attack, make the server choose the
+ # ciphers.
+ self._options = (
+ SSL.OP_NO_SSLv2 | SSL.OP_NO_COMPRESSION |
+ SSL.OP_CIPHER_SERVER_PREFERENCE
+ )
+
+ # Set the mode to Release Buffers, which demallocs send/recv buffers on
+ # idle TLS connections to save memory
+ self._mode = SSL.MODE_RELEASE_BUFFERS
+
+ if method is None:
+ self.method = SSL.SSLv23_METHOD
+
+ if raiseMinimumTo:
+ if (lowerMaximumSecurityTo and
+ raiseMinimumTo > lowerMaximumSecurityTo):
+ raise ValueError(
+ ("raiseMinimumTo needs to be lower than "
+ "lowerMaximumSecurityTo"))
+
+ if raiseMinimumTo > self._defaultMinimumTLSVersion:
+ insecurelyLowerMinimumTo = raiseMinimumTo
+
+ if insecurelyLowerMinimumTo is None:
+ insecurelyLowerMinimumTo = self._defaultMinimumTLSVersion
+
+ # If you set the max lower than the default, but don't set the
+ # minimum, pull it down to that
+ if (lowerMaximumSecurityTo and
+ insecurelyLowerMinimumTo > lowerMaximumSecurityTo):
+ insecurelyLowerMinimumTo = lowerMaximumSecurityTo
+
+ if (lowerMaximumSecurityTo and
+ insecurelyLowerMinimumTo > lowerMaximumSecurityTo):
+ raise ValueError(
+ ("insecurelyLowerMinimumTo needs to be lower than "
+ "lowerMaximumSecurityTo"))
+
+ excludedVersions = _getExcludedTLSProtocols(
+ insecurelyLowerMinimumTo, lowerMaximumSecurityTo)
+
+ for version in excludedVersions:
+ self._options |= _tlsDisableFlags[version]
+ else:
+ warnings.warn(
+ ("Passing method to twisted.internet.ssl.CertificateOptions "
+ "was deprecated in Twisted 17.1.0. Please use a combination "
+ "of insecurelyLowerMinimumTo, raiseMinimumTo, and "
+ "lowerMaximumSecurityTo instead, as Twisted will correctly "
+ "configure the method."),
+ DeprecationWarning, stacklevel=3)
+
+ # Otherwise respect the application decision.
+ self.method = method
+
+ if verify and not caCerts:
+ raise ValueError("Specify client CA certificate information if and"
+ " only if enabling certificate verification")
+ self.verify = verify
+ if extraCertChain is not None and None in (privateKey, certificate):
+ raise ValueError("A private key and a certificate are required "
+ "when adding a supplemental certificate chain.")
+ if extraCertChain is not None:
+ self.extraCertChain = extraCertChain
+ else:
+ self.extraCertChain = []
+
+ self.caCerts = caCerts
+ self.verifyDepth = verifyDepth
+ self.requireCertificate = requireCertificate
+ self.verifyOnce = verifyOnce
+ self.enableSingleUseKeys = enableSingleUseKeys
+ if enableSingleUseKeys:
+ self._options |= SSL.OP_SINGLE_DH_USE | SSL.OP_SINGLE_ECDH_USE
+ self.enableSessions = enableSessions
+ self.fixBrokenPeers = fixBrokenPeers
+ if fixBrokenPeers:
+ self._options |= SSL.OP_ALL
+ self.enableSessionTickets = enableSessionTickets
+
+ if not enableSessionTickets:
+ self._options |= SSL.OP_NO_TICKET
+ self.dhParameters = dhParameters
+
+ self._ecChooser = _ChooseDiffieHellmanEllipticCurve(
+ SSL.OPENSSL_VERSION_NUMBER,
+ openSSLlib=pyOpenSSLlib,
+ openSSLcrypto=crypto,
+ )
+
+ if acceptableCiphers is None:
+ acceptableCiphers = defaultCiphers
+ # This needs to run when method and _options are finalized.
+ self._cipherString = u':'.join(
+ c.fullName
+ for c in acceptableCiphers.selectCiphers(
+ _expandCipherString(u'ALL', self.method, self._options)
+ )
+ )
+ if self._cipherString == u'':
+ raise ValueError(
+ 'Supplied IAcceptableCiphers yielded no usable ciphers '
+ 'on this platform.'
+ )
+
+ if trustRoot is None:
+ if self.verify:
+ trustRoot = OpenSSLCertificateAuthorities(caCerts)
+ else:
+ self.verify = True
+ self.requireCertificate = True
+ trustRoot = IOpenSSLTrustRoot(trustRoot)
+ self.trustRoot = trustRoot
+
+ if acceptableProtocols is not None and not protocolNegotiationMechanisms():
+ raise NotImplementedError(
+ "No support for protocol negotiation on this platform."
+ )
+
+ self._acceptableProtocols = acceptableProtocols
+
+
+ def __getstate__(self):
+ d = self.__dict__.copy()
+ try:
+ del d['_context']
+ except KeyError:
+ pass
+ return d
+
+
+ def __setstate__(self, state):
+ self.__dict__ = state
+
+
+ def getContext(self):
+ """
+ Return an L{OpenSSL.SSL.Context} object.
+ """
+ if self._context is None:
+ self._context = self._makeContext()
+ return self._context
+
+
+ def _makeContext(self):
+ ctx = self._contextFactory(self.method)
+ ctx.set_options(self._options)
+ ctx.set_mode(self._mode)
+
+ if self.certificate is not None and self.privateKey is not None:
+ ctx.use_certificate(self.certificate)
+ ctx.use_privatekey(self.privateKey)
+ for extraCert in self.extraCertChain:
+ ctx.add_extra_chain_cert(extraCert)
+ # Sanity check
+ ctx.check_privatekey()
+
+ verifyFlags = SSL.VERIFY_NONE
+ if self.verify:
+ verifyFlags = SSL.VERIFY_PEER
+ if self.requireCertificate:
+ verifyFlags |= SSL.VERIFY_FAIL_IF_NO_PEER_CERT
+ if self.verifyOnce:
+ verifyFlags |= SSL.VERIFY_CLIENT_ONCE
+ self.trustRoot._addCACertsToContext(ctx)
+
+ # It'd be nice if pyOpenSSL let us pass None here for this behavior (as
+ # the underlying OpenSSL API call allows NULL to be passed). It
+ # doesn't, so we'll supply a function which does the same thing.
+ def _verifyCallback(conn, cert, errno, depth, preverify_ok):
+ return preverify_ok
+ ctx.set_verify(verifyFlags, _verifyCallback)
+ if self.verifyDepth is not None:
+ ctx.set_verify_depth(self.verifyDepth)
+
+ if self.enableSessions:
+ # 32 bytes is the maximum length supported
+ # Unfortunately pyOpenSSL doesn't provide SSL_MAX_SESSION_ID_LENGTH
+ sessionName = secureRandom(32)
+ ctx.set_session_id(sessionName)
+
+ if self.dhParameters:
+ ctx.load_tmp_dh(self.dhParameters._dhFile.path)
+ ctx.set_cipher_list(self._cipherString.encode('ascii'))
+
+ self._ecChooser.configureECDHCurve(ctx)
+
+ if self._acceptableProtocols:
+ # Try to set NPN and ALPN. _acceptableProtocols cannot be set by
+ # the constructor unless at least one mechanism is supported.
+ _setAcceptableProtocols(ctx, self._acceptableProtocols)
+
+ return ctx
+
+
+OpenSSLCertificateOptions.__getstate__ = deprecated(
+ Version("Twisted", 15, 0, 0),
+ "a real persistence system")(OpenSSLCertificateOptions.__getstate__)
+OpenSSLCertificateOptions.__setstate__ = deprecated(
+ Version("Twisted", 15, 0, 0),
+ "a real persistence system")(OpenSSLCertificateOptions.__setstate__)
+
+
+
+@implementer(ICipher)
+class OpenSSLCipher(FancyEqMixin, object):
+ """
+ A representation of an OpenSSL cipher.
+ """
+ compareAttributes = ('fullName',)
+
+ def __init__(self, fullName):
+ """
+ @param fullName: The full name of the cipher. For example
+ C{u"ECDHE-RSA-AES256-GCM-SHA384"}.
+ @type fullName: L{unicode}
+ """
+ self.fullName = fullName
+
+
+ def __repr__(self):
+ """
+ A runnable representation of the cipher.
+ """
+ return 'OpenSSLCipher({0!r})'.format(self.fullName)
+
+
+
+def _expandCipherString(cipherString, method, options):
+ """
+ Expand C{cipherString} according to C{method} and C{options} to a list
+ of explicit ciphers that are supported by the current platform.
+
+ @param cipherString: An OpenSSL cipher string to expand.
+ @type cipherString: L{unicode}
+
+ @param method: An OpenSSL method like C{SSL.TLSv1_METHOD} used for
+ determining the effective ciphers.
+
+ @param options: OpenSSL options like C{SSL.OP_NO_SSLv3} ORed together.
+ @type options: L{int}
+
+ @return: The effective list of explicit ciphers that results from the
+ arguments on the current platform.
+ @rtype: L{list} of L{ICipher}
+ """
+ ctx = SSL.Context(method)
+ ctx.set_options(options)
+ try:
+ ctx.set_cipher_list(cipherString.encode('ascii'))
+ except SSL.Error as e:
+ # OpenSSL 1.1.1 turns an invalid cipher list into TLS 1.3
+ # ciphers, so pyOpenSSL >= 19.0.0 raises an artificial Error
+ # that lacks a corresponding OpenSSL error if the cipher list
+ # consists only of these after a call to set_cipher_list.
+ if not e.args[0]:
+ return []
+ if e.args[0][0][2] == 'no cipher match':
+ return []
+ else:
+ raise
+ conn = SSL.Connection(ctx, None)
+ ciphers = conn.get_cipher_list()
+ if isinstance(ciphers[0], unicode):
+ return [OpenSSLCipher(cipher) for cipher in ciphers]
+ else:
+ return [OpenSSLCipher(cipher.decode('ascii')) for cipher in ciphers]
+
+
+
+@implementer(IAcceptableCiphers)
+class OpenSSLAcceptableCiphers(object):
+ """
+ A representation of ciphers that are acceptable for TLS connections.
+ """
+ def __init__(self, ciphers):
+ self._ciphers = ciphers
+
+
+ def selectCiphers(self, availableCiphers):
+ return [cipher
+ for cipher in self._ciphers
+ if cipher in availableCiphers]
+
+
+ @classmethod
+ def fromOpenSSLCipherString(cls, cipherString):
+ """
+ Create a new instance using an OpenSSL cipher string.
+
+ @param cipherString: An OpenSSL cipher string that describes what
+ cipher suites are acceptable.
+ See the documentation of U{OpenSSL
+ <http://www.openssl.org/docs/apps/ciphers.html#CIPHER_STRINGS>} or
+ U{Apache
+ <http://httpd.apache.org/docs/2.4/mod/mod_ssl.html#sslciphersuite>}
+ for details.
+ @type cipherString: L{unicode}
+
+ @return: Instance representing C{cipherString}.
+ @rtype: L{twisted.internet.ssl.AcceptableCiphers}
+ """
+ return cls(_expandCipherString(
+ nativeString(cipherString),
+ SSL.SSLv23_METHOD, SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
+ )
+
+
+# A secure default.
+# Sources for more information on TLS ciphers:
+#
+# - https://wiki.mozilla.org/Security/Server_Side_TLS
+# - https://www.ssllabs.com/projects/best-practices/index.html
+# - https://hynek.me/articles/hardening-your-web-servers-ssl-ciphers/
+#
+# The general intent is:
+# - Prefer cipher suites that offer perfect forward secrecy (DHE/ECDHE),
+# - prefer ECDHE over DHE for better performance,
+# - prefer any AES-GCM and ChaCha20 over any AES-CBC for better performance and
+# security,
+# - prefer AES-GCM to ChaCha20 because AES hardware support is common,
+# - disable NULL authentication, MD5 MACs and DSS for security reasons.
+#
+defaultCiphers = OpenSSLAcceptableCiphers.fromOpenSSLCipherString(
+ "TLS13-AES-256-GCM-SHA384:TLS13-CHACHA20-POLY1305-SHA256:"
+ "TLS13-AES-128-GCM-SHA256:"
+ "ECDH+AESGCM:ECDH+CHACHA20:DH+AESGCM:DH+CHACHA20:ECDH+AES256:DH+AES256:"
+ "ECDH+AES128:DH+AES:RSA+AESGCM:RSA+AES:"
+ "!aNULL:!MD5:!DSS"
+)
+_defaultCurveName = u"prime256v1"
+
+
+
+class _ChooseDiffieHellmanEllipticCurve(object):
+ """
+ Chooses the best elliptic curve for Elliptic Curve Diffie-Hellman
+ key exchange, and provides a C{configureECDHCurve} method to set
+ the curve, when appropriate, on a new L{OpenSSL.SSL.Context}.
+
+ The C{configureECDHCurve} method will be set to one of the
+ following based on the provided OpenSSL version and configuration:
+
+ - L{_configureOpenSSL110}
+
+ - L{_configureOpenSSL102}
+
+ - L{_configureOpenSSL101}
+
+ - L{_configureOpenSSL101NoCurves}.
+
+ @param openSSLVersion: The OpenSSL version number.
+ @type openSSLVersion: L{int}
+
+ @see: L{OpenSSL.SSL.OPENSSL_VERSION_NUMBER}
+
+ @param openSSLlib: The OpenSSL C{cffi} library module.
+ @param openSSLlib: The OpenSSL L{crypto} module.
+
+ @see: L{crypto}
+ """
+
+ def __init__(self, openSSLVersion, openSSLlib, openSSLcrypto):
+ self._openSSLlib = openSSLlib
+ self._openSSLcrypto = openSSLcrypto
+ if openSSLVersion >= 0x10100000:
+ self.configureECDHCurve = self._configureOpenSSL110
+ elif openSSLVersion >= 0x10002000:
+ self.configureECDHCurve = self._configureOpenSSL102
+ else:
+ try:
+ self._ecCurve = openSSLcrypto.get_elliptic_curve(
+ _defaultCurveName)
+ except ValueError:
+ # The get_elliptic_curve method raises a ValueError
+ # when the curve does not exist.
+ self.configureECDHCurve = self._configureOpenSSL101NoCurves
+ else:
+ self.configureECDHCurve = self._configureOpenSSL101
+
+
+ def _configureOpenSSL110(self, ctx):
+ """
+ OpenSSL 1.1.0 Contexts are preconfigured with an optimal set
+ of ECDH curves. This method does nothing.
+
+ @param ctx: L{OpenSSL.SSL.Context}
+ """
+
+
+ def _configureOpenSSL102(self, ctx):
+ """
+ Have the context automatically choose elliptic curves for
+ ECDH. Run on OpenSSL 1.0.2 and OpenSSL 1.1.0+, but only has
+ an effect on OpenSSL 1.0.2.
+
+ @param ctx: The context which .
+ @type ctx: L{OpenSSL.SSL.Context}
+ """
+ ctxPtr = ctx._context
+ try:
+ self._openSSLlib.SSL_CTX_set_ecdh_auto(ctxPtr, True)
+ except:
+ pass
+
+
+ def _configureOpenSSL101(self, ctx):
+ """
+ Set the default elliptic curve for ECDH on the context. Only
+ run on OpenSSL 1.0.1.
+
+ @param ctx: The context on which to set the ECDH curve.
+ @type ctx: L{OpenSSL.SSL.Context}
+ """
+ try:
+ ctx.set_tmp_ecdh(self._ecCurve)
+ except:
+ pass
+
+
+ def _configureOpenSSL101NoCurves(self, ctx):
+ """
+ No elliptic curves are available on OpenSSL 1.0.1. We can't
+ set anything, so do nothing.
+
+ @param ctx: The context on which to set the ECDH curve.
+ @type ctx: L{OpenSSL.SSL.Context}
+ """
+
+
+
+class OpenSSLDiffieHellmanParameters(object):
+ """
+ A representation of key generation parameters that are required for
+ Diffie-Hellman key exchange.
+ """
+ def __init__(self, parameters):
+ self._dhFile = parameters
+
+
+ @classmethod
+ def fromFile(cls, filePath):
+ """
+ Load parameters from a file.
+
+ Such a file can be generated using the C{openssl} command line tool as
+ following:
+
+ C{openssl dhparam -out dh_param_2048.pem -2 2048}
+
+ Please refer to U{OpenSSL's C{dhparam} documentation
+ <http://www.openssl.org/docs/apps/dhparam.html>} for further details.
+
+ @param filePath: A file containing parameters for Diffie-Hellman key
+ exchange.
+ @type filePath: L{FilePath <twisted.python.filepath.FilePath>}
+
+ @return: An instance that loads its parameters from C{filePath}.
+ @rtype: L{DiffieHellmanParameters
+ <twisted.internet.ssl.DiffieHellmanParameters>}
+ """
+ return cls(filePath)
+
+
+
+def _setAcceptableProtocols(context, acceptableProtocols):
+ """
+ Called to set up the L{OpenSSL.SSL.Context} for doing NPN and/or ALPN
+ negotiation.
+
+ @param context: The context which is set up.
+ @type context: L{OpenSSL.SSL.Context}
+
+ @param acceptableProtocols: The protocols this peer is willing to speak
+ after the TLS negotiation has completed, advertised over both ALPN and
+ NPN. If this argument is specified, and no overlap can be found with
+ the other peer, the connection will fail to be established. If the
+ remote peer does not offer NPN or ALPN, the connection will be
+ established, but no protocol wil be negotiated. Protocols earlier in
+ the list are preferred over those later in the list.
+ @type acceptableProtocols: L{list} of L{bytes}
+ """
+ def protoSelectCallback(conn, protocols):
+ """
+ NPN client-side and ALPN server-side callback used to select
+ the next protocol. Prefers protocols found earlier in
+ C{_acceptableProtocols}.
+
+ @param conn: The context which is set up.
+ @type conn: L{OpenSSL.SSL.Connection}
+
+ @param conn: Protocols advertised by the other side.
+ @type conn: L{list} of L{bytes}
+ """
+ overlap = set(protocols) & set(acceptableProtocols)
+
+ for p in acceptableProtocols:
+ if p in overlap:
+ return p
+ else:
+ return b''
+
+ # If we don't actually have protocols to negotiate, don't set anything up.
+ # Depending on OpenSSL version, failing some of the selection callbacks can
+ # cause the handshake to fail, which is presumably not what was intended
+ # here.
+ if not acceptableProtocols:
+ return
+
+ supported = protocolNegotiationMechanisms()
+
+ if supported & ProtocolNegotiationSupport.NPN:
+ def npnAdvertiseCallback(conn):
+ return acceptableProtocols
+
+ context.set_npn_advertise_callback(npnAdvertiseCallback)
+ context.set_npn_select_callback(protoSelectCallback)
+
+ if supported & ProtocolNegotiationSupport.ALPN:
+ context.set_alpn_select_callback(protoSelectCallback)
+ context.set_alpn_protos(acceptableProtocols)