summaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/conch/ssh
diff options
context:
space:
mode:
authorrobot-piglet <[email protected]>2025-06-22 18:50:56 +0300
committerrobot-piglet <[email protected]>2025-06-22 19:04:42 +0300
commitc7cbc6d480c5488ff6e921c709680fd2c1340a10 (patch)
tree10843f44b67c0fb5717ad555556064095f701d8c /contrib/python/Twisted/py3/twisted/conch/ssh
parent26d391cdb94d2ce5efc8d0cc5cea7607dc363c0b (diff)
Intermediate changes
commit_hash:28750b74281710ec1ab5bdc2403c8ab24bdd164b
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/conch/ssh')
-rw-r--r--contrib/python/Twisted/py3/twisted/conch/ssh/_kex.py44
-rw-r--r--contrib/python/Twisted/py3/twisted/conch/ssh/common.py23
-rw-r--r--contrib/python/Twisted/py3/twisted/conch/ssh/keys.py21
-rw-r--r--contrib/python/Twisted/py3/twisted/conch/ssh/service.py8
-rw-r--r--contrib/python/Twisted/py3/twisted/conch/ssh/transport.py121
-rw-r--r--contrib/python/Twisted/py3/twisted/conch/ssh/userauth.py175
6 files changed, 205 insertions, 187 deletions
diff --git a/contrib/python/Twisted/py3/twisted/conch/ssh/_kex.py b/contrib/python/Twisted/py3/twisted/conch/ssh/_kex.py
index c23acec219c..0b04f1f5f6b 100644
--- a/contrib/python/Twisted/py3/twisted/conch/ssh/_kex.py
+++ b/contrib/python/Twisted/py3/twisted/conch/ssh/_kex.py
@@ -6,26 +6,37 @@
SSH key exchange handling.
"""
+from __future__ import annotations
from hashlib import sha1, sha256, sha384, sha512
+from typing import TYPE_CHECKING, Protocol
from zope.interface import Attribute, Interface, implementer
from twisted.conch import error
+if TYPE_CHECKING:
+ # NB: Not a real attribute at runtime.
+ from hashlib import _Hash
+
+
+class _HashFactory(Protocol):
+ def __call__(self, data: bytes = ...) -> _Hash:
+ ...
+
class _IKexAlgorithm(Interface):
"""
An L{_IKexAlgorithm} describes a key exchange algorithm.
"""
- preference = Attribute(
+ preference: int = Attribute(
"An L{int} giving the preference of the algorithm when negotiating "
"key exchange. Algorithms with lower precedence values are more "
"preferred."
)
- hashProcessor = Attribute(
+ hashProcessor: _HashFactory = Attribute(
"A callable hash algorithm constructor (e.g. C{hashlib.sha256}) "
"suitable for use with this key exchange algorithm."
)
@@ -175,7 +186,7 @@ class _DHGroup14SHA1:
# Which ECDH hash function to use is dependent on the size.
-_kexAlgorithms = {
+_kexAlgorithms: dict[bytes, _IKexAlgorithm] = {
b"curve25519-sha256": _Curve25519SHA256(),
b"[email protected]": _Curve25519SHA256LibSSH(),
b"diffie-hellman-group-exchange-sha256": _DHGroupExchangeSHA256(),
@@ -187,7 +198,7 @@ _kexAlgorithms = {
}
-def getKex(kexAlgorithm):
+def getKex(kexAlgorithm: bytes) -> _IKexAlgorithm:
"""
Get a description of a named key exchange algorithm.
@@ -201,53 +212,47 @@ def getKex(kexAlgorithm):
@raises ConchError: if the key exchange algorithm is not found.
"""
if kexAlgorithm not in _kexAlgorithms:
- raise error.ConchError(f"Unsupported key exchange algorithm: {kexAlgorithm}")
+ raise error.ConchError(f"Unsupported key exchange algorithm: {kexAlgorithm!r}")
return _kexAlgorithms[kexAlgorithm]
-def isEllipticCurve(kexAlgorithm):
+def isEllipticCurve(kexAlgorithm: bytes) -> bool:
"""
Returns C{True} if C{kexAlgorithm} is an elliptic curve.
@param kexAlgorithm: The key exchange algorithm name.
- @type kexAlgorithm: C{str}
- @return: C{True} if C{kexAlgorithm} is an elliptic curve,
- otherwise C{False}.
- @rtype: C{bool}
+ @return: C{True} if C{kexAlgorithm} is an elliptic curve, otherwise
+ C{False}.
"""
return _IEllipticCurveExchangeKexAlgorithm.providedBy(getKex(kexAlgorithm))
-def isFixedGroup(kexAlgorithm):
+def isFixedGroup(kexAlgorithm: bytes) -> bool:
"""
Returns C{True} if C{kexAlgorithm} has a fixed prime / generator group.
@param kexAlgorithm: The key exchange algorithm name.
- @type kexAlgorithm: L{bytes}
@return: C{True} if C{kexAlgorithm} has a fixed prime / generator group,
otherwise C{False}.
- @rtype: L{bool}
"""
return _IFixedGroupKexAlgorithm.providedBy(getKex(kexAlgorithm))
-def getHashProcessor(kexAlgorithm):
+def getHashProcessor(kexAlgorithm: bytes) -> _HashFactory:
"""
Get the hash algorithm callable to use in key exchange.
@param kexAlgorithm: The key exchange algorithm name.
- @type kexAlgorithm: L{bytes}
@return: A callable hash algorithm constructor (e.g. C{hashlib.sha256}).
- @rtype: C{callable}
"""
kex = getKex(kexAlgorithm)
return kex.hashProcessor
-def getDHGeneratorAndPrime(kexAlgorithm):
+def getDHGeneratorAndPrime(kexAlgorithm: bytes) -> tuple[int, int]:
"""
Get the generator and the prime to use in key exchange.
@@ -257,17 +262,16 @@ def getDHGeneratorAndPrime(kexAlgorithm):
@return: A L{tuple} containing L{int} generator and L{int} prime.
@rtype: L{tuple}
"""
- kex = getKex(kexAlgorithm)
+ kex = _IFixedGroupKexAlgorithm(getKex(kexAlgorithm))
return kex.generator, kex.prime
-def getSupportedKeyExchanges():
+def getSupportedKeyExchanges() -> list[bytes]:
"""
Get a list of supported key exchange algorithm names in order of
preference.
@return: A C{list} of supported key exchange algorithm names.
- @rtype: C{list} of L{bytes}
"""
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import ec
diff --git a/contrib/python/Twisted/py3/twisted/conch/ssh/common.py b/contrib/python/Twisted/py3/twisted/conch/ssh/common.py
index 8bb6a286c3b..8d01ab14e50 100644
--- a/contrib/python/Twisted/py3/twisted/conch/ssh/common.py
+++ b/contrib/python/Twisted/py3/twisted/conch/ssh/common.py
@@ -4,12 +4,11 @@
"""
Common functions for the SSH classes.
-
-Maintainer: Paul Swartz
"""
-
+from __future__ import annotations
import struct
+from typing import Sequence, overload
from cryptography.utils import int_to_bytes
@@ -19,7 +18,7 @@ from twisted.python.versions import Version
__all__ = ["NS", "getNS", "MP", "getMP", "ffs"]
-def NS(t):
+def NS(t: bytes | str) -> bytes:
"""
net string
"""
@@ -28,7 +27,7 @@ def NS(t):
return struct.pack("!L", len(t)) + t
-def getNS(s, count=1):
+def getNS(s: bytes, count: int = 1) -> Sequence[bytes]:
"""
get net string
"""
@@ -41,7 +40,7 @@ def getNS(s, count=1):
return tuple(ns) + (s[c:],)
-def MP(number):
+def MP(number: int) -> bytes:
if number == 0:
return b"\000" * 4
assert number > 0
@@ -51,7 +50,17 @@ def MP(number):
return struct.pack(">L", len(bn)) + bn
-def getMP(data, count=1):
+@overload
+def getMP(data: bytes) -> tuple[int, bytes]:
+ ...
+
+
+@overload
+def getMP(data: bytes, count: int) -> Sequence[int | bytes]:
+ ...
+
+
+def getMP(data: bytes, count: int = 1) -> Sequence[int | bytes]:
"""
Get multiple precision integer out of the string. A multiple precision
integer is stored as a 4-byte length followed by length bytes of the
diff --git a/contrib/python/Twisted/py3/twisted/conch/ssh/keys.py b/contrib/python/Twisted/py3/twisted/conch/ssh/keys.py
index e0e4a4b2c54..e52608df9aa 100644
--- a/contrib/python/Twisted/py3/twisted/conch/ssh/keys.py
+++ b/contrib/python/Twisted/py3/twisted/conch/ssh/keys.py
@@ -1675,20 +1675,17 @@ class Key:
values = (data["p"], data["q"], data["g"], data["y"], data["x"])
return common.NS(self.sshType()) + b"".join(map(common.MP, values))
- def sign(self, data, signatureType=None):
+ def sign(self, data: bytes, signatureType: bytes | None = None) -> bytes:
"""
Sign some data with this key.
SECSH-TRANS RFC 4253 Section 6.6.
- @type data: L{bytes}
@param data: The data to sign.
- @type signatureType: L{bytes}
@param signatureType: The SSH public key algorithm name to sign this
- data with, or L{None} to use a reasonable default for the key.
+ data with, or L{None} to use a reasonable default for the key.
- @rtype: L{bytes}
@return: A signature for the given data.
"""
keyType = self.type()
@@ -1702,7 +1699,7 @@ class Key:
hashAlgorithm = self._getHashAlgorithm(signatureType)
if hashAlgorithm is None:
raise BadSignatureAlgorithmError(
- f"public key signature algorithm {signatureType} is not "
+ f"public key signature algorithm {signatureType!r} is not "
f"defined for {keyType} keys"
)
@@ -1726,21 +1723,13 @@ class Key:
rb = int_to_bytes(r)
sb = int_to_bytes(s)
- # Int_to_bytes returns rb[0] as a str in python2
- # and an as int in python3
- if type(rb[0]) is str:
- rcomp = ord(rb[0])
- else:
- rcomp = rb[0]
+ rcomp = rb[0]
# If the MSB is set, prepend a null byte for correct formatting.
if rcomp & 0x80:
rb = b"\x00" + rb
- if type(sb[0]) is str:
- scomp = ord(sb[0])
- else:
- scomp = sb[0]
+ scomp = sb[0]
if scomp & 0x80:
sb = b"\x00" + sb
diff --git a/contrib/python/Twisted/py3/twisted/conch/ssh/service.py b/contrib/python/Twisted/py3/twisted/conch/ssh/service.py
index 7d0d41c4aed..acfd40ee6a7 100644
--- a/contrib/python/Twisted/py3/twisted/conch/ssh/service.py
+++ b/contrib/python/Twisted/py3/twisted/conch/ssh/service.py
@@ -7,18 +7,22 @@ are ssh-userauth and ssh-connection.
Maintainer: Paul Swartz
"""
+from __future__ import annotations
-from typing import Dict
+from typing import TYPE_CHECKING, Dict
from twisted.logger import Logger
+if TYPE_CHECKING:
+ from twisted.conch.ssh.transport import SSHTransportBase
+
class SSHService:
# this is the ssh name for the service:
name: bytes = None # type:ignore[assignment]
protocolMessages: Dict[int, str] = {} # map #'s -> protocol names
- transport = None # gets set later
+ transport: SSHTransportBase | None = None # gets set later
_log = Logger()
diff --git a/contrib/python/Twisted/py3/twisted/conch/ssh/transport.py b/contrib/python/Twisted/py3/twisted/conch/ssh/transport.py
index 545c010f76e..323236f4720 100644
--- a/contrib/python/Twisted/py3/twisted/conch/ssh/transport.py
+++ b/contrib/python/Twisted/py3/twisted/conch/ssh/transport.py
@@ -17,7 +17,7 @@ import struct
import types
import zlib
from hashlib import md5, sha1, sha256, sha384, sha512
-from typing import Any, Callable, Dict, Tuple, Union
+from typing import TYPE_CHECKING, Any, Callable, Dict, Tuple, Union
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.backends import default_backend
@@ -29,14 +29,19 @@ from typing_extensions import Literal
from twisted import __version__ as twisted_version
from twisted.conch.ssh import _kex, address, keys
from twisted.conch.ssh.common import MP, NS, ffs, getMP, getNS
+from twisted.conch.ssh.service import SSHService
from twisted.internet import defer, protocol
from twisted.logger import Logger
from twisted.python import randbytes
from twisted.python.compat import iterbytes, networkString
+from twisted.python.failure import Failure
# This import is needed if SHA256 hashing is used.
# from twisted.python.compat import nativeString
+if TYPE_CHECKING:
+ from twisted.conch.ssh.factory import SSHFactory
+
def _mpFromBytes(data):
"""Make an SSH multiple-precision integer from big-endian L{bytes}.
@@ -311,8 +316,8 @@ def _getSupportedCiphers():
class SSHTransportBase(protocol.Protocol):
"""
- Protocol supporting basic SSH functionality: sending/receiving packets
- and message dispatch. To connect to or run a server, you must use
+ Protocol supporting basic SSH functionality: sending/receiving packets and
+ message dispatch. To connect to or run a server, you must use
SSHClientTransport or SSHServerTransport.
@ivar protocolVersion: A string representing the version of the SSH
@@ -321,23 +326,22 @@ class SSHTransportBase(protocol.Protocol):
@ivar version: A string representing the version of the server or client.
Currently defaults to 'Twisted'.
- @ivar comment: An optional string giving more information about the
- server or client.
+ @ivar comment: An optional string giving more information about the server
+ or client.
@ivar supportedCiphers: A list of strings representing the encryption
algorithms supported, in order from most-preferred to least.
@ivar supportedMACs: A list of strings representing the message
authentication codes (hashes) supported, in order from most-preferred
- to least. Both this and supportedCiphers can include 'none' to use
- no encryption or authentication, but that must be done manually,
+ to least. Both this and supportedCiphers can include 'none' to use no
+ encryption or authentication, but that must be done manually,
- @ivar supportedKeyExchanges: A list of strings representing the
- key exchanges supported, in order from most-preferred to least.
+ @ivar supportedKeyExchanges: A list of strings representing the key
+ exchanges supported, in order from most-preferred to least.
- @ivar supportedPublicKeys: A list of strings representing the
- public key algorithms supported, in order from most-preferred to
- least.
+ @ivar supportedPublicKeys: A list of strings representing the public key
+ algorithms supported, in order from most-preferred to least.
@ivar supportedCompressions: A list of strings representing compression
types supported, from most-preferred to least.
@@ -350,16 +354,16 @@ class SSHTransportBase(protocol.Protocol):
@ivar isClient: A boolean indicating whether this is a client or server.
- @ivar gotVersion: A boolean indicating whether we have received the
- version string from the other side.
+ @ivar gotVersion: A boolean indicating whether we have received the version
+ string from the other side.
@ivar buf: Data we've received but hasn't been parsed into a packet.
@ivar outgoingPacketSequence: the sequence number of the next packet we
will send.
- @ivar incomingPacketSequence: the sequence number of the next packet we
- are expecting from the other side.
+ @ivar incomingPacketSequence: the sequence number of the next packet we are
+ expecting from the other side.
@ivar outgoingCompression: an object supporting the .compress(str) and
.flush() methods, or None if there is no outgoing compression. Used to
@@ -391,8 +395,8 @@ class SSHTransportBase(protocol.Protocol):
part of the key exchange, sessionID is used to generate the various
encryption and authentication keys.
- @ivar service: an SSHService instance, or None. If it's set to an object,
- it's the currently running service.
+ @ivar service: an L{SSHService} instance, or None. If it's set to an
+ object, it's the currently running service.
@ivar kexAlg: the agreed-upon key exchange algorithm.
@@ -476,8 +480,8 @@ class SSHTransportBase(protocol.Protocol):
incomingPacketSequence = 0
outgoingCompression = None
incomingCompression = None
- sessionID = None
- service = None
+ sessionID: bytes | None = None
+ service: SSHService | None = None
# There is no key exchange activity in progress.
_KEY_EXCHANGE_NONE = "_KEY_EXCHANGE_NONE"
@@ -507,7 +511,13 @@ class SSHTransportBase(protocol.Protocol):
_peerSupportsExtensions = False
peerExtensions: Dict[bytes, bytes] = {}
- def connectionLost(self, reason):
+ factory: SSHFactory
+
+ # Set by twisted.conch.ssh.userauth.SSHUserAuthServer._cbFinishedAuth
+ avatar: object
+ logoutFunction: Callable[[], None]
+
+ def connectionLost(self, reason: Failure | None = None) -> None:
"""
When the underlying connection is closed, stop the running service (if
any), and log out the avatar (if any).
@@ -1171,41 +1181,35 @@ class SSHTransportBase(protocol.Protocol):
prefix = struct.pack(">L", len(secret))
return prefix + secret
- def _getKey(self, c, sharedSecret, exchangeHash):
+ def _getKey(self, c: bytes, sharedSecret: bytes, exchangeHash: bytes) -> bytes:
"""
Get one of the keys for authentication/encryption.
- @type c: L{bytes}
@param c: The letter identifying which key this is.
- @type sharedSecret: L{bytes}
@param sharedSecret: The shared secret K.
- @type exchangeHash: L{bytes}
@param exchangeHash: The hash H from key exchange.
- @rtype: L{bytes}
@return: The derived key.
"""
hashProcessor = _kex.getHashProcessor(self.kexAlg)
- k1 = hashProcessor(sharedSecret + exchangeHash + c + self.sessionID)
- k1 = k1.digest()
+ assert self.sessionID is not None, "session ID must already have been assigned"
+ k1 = hashProcessor(sharedSecret + exchangeHash + c + self.sessionID).digest()
k2 = hashProcessor(sharedSecret + exchangeHash + k1).digest()
k3 = hashProcessor(sharedSecret + exchangeHash + k1 + k2).digest()
k4 = hashProcessor(sharedSecret + exchangeHash + k1 + k2 + k3).digest()
return k1 + k2 + k3 + k4
- def _keySetup(self, sharedSecret, exchangeHash):
+ def _keySetup(self, sharedSecret: bytes, exchangeHash: bytes) -> None:
"""
- Set up the keys for the connection and sends MSG_NEWKEYS when
- finished,
+ Set up the keys for the connection and sends MSG_NEWKEYS when finished.
@param sharedSecret: a secret string agreed upon using a Diffie-
- Hellman exchange, so it is only shared between
- the server and the client.
- @type sharedSecret: L{str}
+ Hellman exchange, so it is only shared between the server and the
+ client.
+
@param exchangeHash: A hash of various data known by both sides.
- @type exchangeHash: L{str}
"""
if not self.sessionID:
self.sessionID = exchangeHash
@@ -1442,7 +1446,7 @@ class SSHServerTransport(SSHTransportBase):
isClient = False
ignoreNextPacket = 0
- def _getHostKeys(self, keyAlg):
+ def _getHostKeys(self, keyAlg: bytes) -> tuple[keys.Key, keys.Key]:
"""
Get the public and private host keys corresponding to the given
public key signature algorithm.
@@ -1467,7 +1471,7 @@ class SSHServerTransport(SSHTransportBase):
keyFormat = keyAlg
return self.factory.publicKeys[keyFormat], self.factory.privateKeys[keyFormat]
- def ssh_KEXINIT(self, packet):
+ def ssh_KEXINIT(self, packet: bytes) -> None:
"""
Called when we receive a MSG_KEXINIT message. For a description
of the packet, see SSHTransportBase.ssh_KEXINIT(). Additionally,
@@ -1487,29 +1491,26 @@ class SSHServerTransport(SSHTransportBase):
):
self.ignoreNextPacket = True # Guess was wrong
- def _ssh_KEX_ECDH_INIT(self, packet):
+ def _ssh_KEX_ECDH_INIT(self, packet: bytes) -> None:
"""
- Called from L{ssh_KEX_DH_GEX_REQUEST_OLD} to handle
- elliptic curve key exchanges.
+ Called from L{ssh_KEX_DH_GEX_REQUEST_OLD} to handle elliptic curve key
+ exchanges.
Payload::
string client Elliptic Curve Diffie-Hellman public key
Just like L{_ssh_KEXDH_INIT} this message type is also not dispatched
- directly. Extra check to determine if this is really KEX_ECDH_INIT
- is required.
+ directly. Extra check to determine if this is really KEX_ECDH_INIT is
+ required.
- First we load the host's public/private keys.
- Then we generate the ECDH public/private keypair for the given curve.
- With that we generate the shared secret key.
- Then we compute the hash to sign and send back to the client
- Along with the server's public key and the ECDH public key.
+ First we load the host's public/private keys. Then we generate the
+ ECDH public/private keypair for the given curve. With that we generate
+ the shared secret key. Then we compute the hash to sign and send back
+ to the client Along with the server's public key and the ECDH public
+ key.
- @type packet: L{bytes}
@param packet: The message data.
-
- @return: None.
"""
# Get the raw client public key.
pktPub, packet = getNS(packet)
@@ -1547,7 +1548,7 @@ class SSHServerTransport(SSHTransportBase):
)
self._keySetup(sharedSecret, exchangeHash)
- def _ssh_KEXDH_INIT(self, packet):
+ def _ssh_KEXDH_INIT(self, packet: bytes) -> None:
"""
Called to handle the beginning of a non-group key exchange.
@@ -1588,7 +1589,7 @@ class SSHServerTransport(SSHTransportBase):
)
self._keySetup(sharedSecret, exchangeHash)
- def ssh_KEX_DH_GEX_REQUEST_OLD(self, packet):
+ def ssh_KEX_DH_GEX_REQUEST_OLD(self, packet: bytes) -> None:
"""
This represents different key exchange methods that share the same
integer value. If the message is determined to be a KEXDH_INIT,
@@ -1625,7 +1626,7 @@ class SSHServerTransport(SSHTransportBase):
self._startEphemeralDH()
self.sendPacket(MSG_KEX_DH_GEX_GROUP, MP(self.p) + MP(self.g))
- def ssh_KEX_DH_GEX_REQUEST(self, packet):
+ def ssh_KEX_DH_GEX_REQUEST(self, packet: bytes) -> None:
"""
Called when we receive a MSG_KEX_DH_GEX_REQUEST message. Payload::
integer minimum
@@ -1651,7 +1652,7 @@ class SSHServerTransport(SSHTransportBase):
self._startEphemeralDH()
self.sendPacket(MSG_KEX_DH_GEX_GROUP, MP(self.p) + MP(self.g))
- def ssh_KEX_DH_GEX_INIT(self, packet):
+ def ssh_KEX_DH_GEX_INIT(self, packet: bytes) -> None:
"""
Called when we get a MSG_KEX_DH_GEX_INIT message. Payload::
integer e (client DH public key)
@@ -1693,7 +1694,7 @@ class SSHServerTransport(SSHTransportBase):
)
self._keySetup(sharedSecret, exchangeHash)
- def _keySetup(self, sharedSecret, exchangeHash):
+ def _keySetup(self, sharedSecret: bytes, exchangeHash: bytes) -> None:
"""
See SSHTransportBase._keySetup().
"""
@@ -1709,13 +1710,12 @@ class SSHServerTransport(SSHTransportBase):
[(b"server-sig-algs", b",".join(self.supportedPublicKeys))]
)
- def ssh_NEWKEYS(self, packet):
+ def ssh_NEWKEYS(self, packet: bytes) -> None:
"""
Called when we get a MSG_NEWKEYS message. No payload.
When we get this, the keys have been set on both sides, and we
start using them to encrypt and authenticate the connection.
- @type packet: L{bytes}
@param packet: The message data.
"""
if packet != b"":
@@ -1723,7 +1723,7 @@ class SSHServerTransport(SSHTransportBase):
return
self._newKeys()
- def ssh_SERVICE_REQUEST(self, packet):
+ def ssh_SERVICE_REQUEST(self, packet: bytes) -> None:
"""
Called when we get a MSG_SERVICE_REQUEST message. Payload::
string serviceName
@@ -1906,7 +1906,8 @@ class SSHClientTransport(SSHTransportBase):
d.addCallback(_continue_KEX_ECDH_REPLY, hostKey, pubKey, signature)
d.addErrback(
lambda unused: self.sendDisconnect(
- DISCONNECT_HOST_KEY_NOT_VERIFIABLE, b"bad host key"
+ DISCONNECT_HOST_KEY_NOT_VERIFIABLE,
+ f"bad host key [ecdh] {unused}".encode("utf-8"),
)
)
return d
@@ -2122,7 +2123,7 @@ class SSHClientTransport(SSHTransportBase):
)
self.setService(self.instance)
- def requestService(self, instance):
+ def requestService(self, instance: SSHService) -> None:
"""
Request that a service be run over this transport.
diff --git a/contrib/python/Twisted/py3/twisted/conch/ssh/userauth.py b/contrib/python/Twisted/py3/twisted/conch/ssh/userauth.py
index 310f5f09f2e..0d24df00f92 100644
--- a/contrib/python/Twisted/py3/twisted/conch/ssh/userauth.py
+++ b/contrib/python/Twisted/py3/twisted/conch/ssh/userauth.py
@@ -8,20 +8,28 @@ Currently implemented authentication types are public-key and password.
Maintainer: Paul Swartz
"""
-
+from __future__ import annotations
import struct
+from typing import Callable, Tuple, Type
from twisted.conch import error, interfaces
from twisted.conch.ssh import keys, service, transport
from twisted.conch.ssh.common import NS, getNS
+from twisted.conch.ssh.keys import Key
from twisted.cred import credentials
from twisted.cred.error import UnauthorizedLogin
from twisted.internet import defer, reactor
+from twisted.internet.defer import Deferred
+from twisted.internet.interfaces import IReactorTime
from twisted.logger import Logger
from twisted.python import failure
from twisted.python.compat import nativeString
+_ConchPortalTuple = Tuple[
+ Type[interfaces.IConchUser], interfaces.IConchUser, Callable[[], None]
+]
+
class SSHUserAuthServer(service.SSHService):
"""
@@ -72,7 +80,7 @@ class SSHUserAuthServer(service.SSHService):
attemptsBeforeDisconnect = 20
# 20 login attempts before a disconnect
passwordDelay = 1 # number of seconds to delay on a failed password
- clock = reactor
+ clock: IReactorTime = IReactorTime(reactor)
interfaceToMethod = {
credentials.ISSHPrivateKey: b"publickey",
credentials.IUsernamePassword: b"password",
@@ -124,37 +132,40 @@ class SSHUserAuthServer(service.SSHService):
transport.DISCONNECT_NO_MORE_AUTH_METHODS_AVAILABLE, b"you took too long"
)
- def tryAuth(self, kind, user, data):
+ def tryAuth(
+ self, kind: bytes, user: bytes, data: bytes
+ ) -> Deferred[_ConchPortalTuple]:
"""
Try to authenticate the user with the given method. Dispatches to a
auth_* method.
@param kind: the authentication method to try.
- @type kind: L{bytes}
+
@param user: the username the client is authenticating with.
- @type user: L{bytes}
+
@param data: authentication specific data sent by the client.
- @type data: L{bytes}
+
@return: A Deferred called back if the method succeeded, or erred back
if it failed.
- @rtype: C{defer.Deferred}
"""
self._log.debug("{user!r} trying auth {kind!r}", user=user, kind=kind)
if kind not in self.supportedAuthentications:
return defer.fail(error.ConchError("unsupported authentication, failing"))
- kind = nativeString(kind.replace(b"-", b"_"))
- f = getattr(self, f"auth_{kind}", None)
- if f:
+ strkind = kind.replace(b"-", b"_").decode("ascii")
+ f: Callable[[bytes], Deferred[_ConchPortalTuple] | None] | None = getattr(
+ self, f"auth_{strkind}", None
+ )
+ if f is not None:
ret = f(data)
- if not ret:
+ if ret is None:
return defer.fail(
- error.ConchError(f"{kind} return None instead of a Deferred")
+ error.ConchError(f"{strkind} return None instead of a Deferred")
)
else:
return ret
- return defer.fail(error.ConchError(f"bad auth type: {kind}"))
+ return defer.fail(error.ConchError(f"bad auth type: {strkind}"))
- def ssh_USERAUTH_REQUEST(self, packet):
+ def ssh_USERAUTH_REQUEST(self, packet: bytes) -> Deferred[_ConchPortalTuple] | None:
"""
The client has requested authentication. Payload::
string user
@@ -173,19 +184,21 @@ class SSHUserAuthServer(service.SSHService):
d = self.tryAuth(method, user, rest)
if not d:
self._ebBadAuth(failure.Failure(error.ConchError("auth returned none")))
- return
- d.addCallback(self._cbFinishedAuth)
- d.addErrback(self._ebMaybeBadAuth)
- d.addErrback(self._ebBadAuth)
- return d
+ return None
+ return (
+ d.addCallback(self._cbFinishedAuth)
+ .addErrback(self._ebMaybeBadAuth)
+ .addErrback(self._ebBadAuth)
+ )
- def _cbFinishedAuth(self, result):
+ def _cbFinishedAuth(self, result: _ConchPortalTuple) -> None:
"""
The callback when user has successfully been authenticated. For a
description of the arguments, see L{twisted.cred.portal.Portal.login}.
We start the service requested by the user.
"""
(interface, avatar, logout) = result
+ assert self.transport is not None
self.transport.avatar = avatar
self.transport.logoutFunction = logout
service = self.transport.factory.getService(self.transport, self.nextService)
@@ -249,7 +262,7 @@ class SSHUserAuthServer(service.SSHService):
MSG_USERAUTH_FAILURE, NS(b",".join(self.supportedAuthentications)) + b"\x00"
)
- def auth_publickey(self, packet):
+ def auth_publickey(self, packet: bytes) -> Deferred[_ConchPortalTuple]:
"""
Public key authentication. Payload::
byte has signature
@@ -262,6 +275,7 @@ class SSHUserAuthServer(service.SSHService):
hasSig = ord(packet[0:1])
algName, blob, rest = getNS(packet[1:], 2)
+ result: Deferred[_ConchPortalTuple]
try:
keys.Key.fromString(blob)
except keys.BadKeyError:
@@ -271,6 +285,8 @@ class SSHUserAuthServer(service.SSHService):
signature = hasSig and getNS(rest)[0] or None
if hasSig:
+ assert self.transport is not None, "must have transport for auth"
+ assert self.transport.sessionID is not None, "must have session for auth"
b = (
NS(self.transport.sessionID)
+ bytes((MSG_USERAUTH_REQUEST,))
@@ -282,19 +298,21 @@ class SSHUserAuthServer(service.SSHService):
+ NS(blob)
)
c = credentials.SSHPrivateKey(self.user, algName, blob, b, signature)
- return self.portal.login(c, None, interfaces.IConchUser)
+ result = self.portal.login(c, None, interfaces.IConchUser)
else:
c = credentials.SSHPrivateKey(self.user, algName, blob, None, None)
- return self.portal.login(c, None, interfaces.IConchUser).addErrback(
+ result = self.portal.login(c, None, interfaces.IConchUser).addErrback(
self._ebCheckKey, packet[1:]
)
+ return result
- def _ebCheckKey(self, reason, packet):
+ def _ebCheckKey(self, reason: failure.Failure, packet: bytes) -> failure.Failure:
"""
Called back if the user did not sent a signature. If reason is
error.ValidPublicKey then this key is valid for the user to
authenticate with. Send MSG_USERAUTH_PK_OK.
"""
+ assert self.transport is not None
reason.trap(error.ValidPublicKey)
# if we make it here, it means that the publickey is valid
self.transport.sendPacket(MSG_USERAUTH_PK_OK, packet)
@@ -331,64 +349,67 @@ class SSHUserAuthClient(service.SSHService):
making callbacks for more information when necessary.
@ivar name: the name of this service: 'ssh-userauth'
- @type name: L{str}
+
@ivar preferredOrder: a list of authentication methods that should be used
first, in order of preference, if supported by the server
- @type preferredOrder: L{list}
+
@ivar user: the name of the user to authenticate as
- @type user: L{bytes}
+
@ivar instance: the service to start after authentication has finished
- @type instance: L{service.SSHService}
- @ivar authenticatedWith: a list of strings of authentication methods we've tried
- @type authenticatedWith: L{list} of L{bytes}
+
+ @ivar authenticatedWith: a list of strings of authentication methods we've
+ tried
+
@ivar triedPublicKeys: a list of public key objects that we've tried to
authenticate with
- @type triedPublicKeys: L{list} of L{Key}
+
@ivar lastPublicKey: the last public key object we've tried to authenticate
with
- @type lastPublicKey: L{Key}
"""
- name = b"ssh-userauth"
- preferredOrder = [b"publickey", b"password", b"keyboard-interactive"]
+ name: bytes = b"ssh-userauth"
+ preferredOrder: list[bytes] = [
+ b"publickey",
+ b"password",
+ b"keyboard-interactive",
+ ]
- def __init__(self, user, instance):
+ def __init__(self, user: bytes, instance: service.SSHService):
self.user = user
self.instance = instance
- def serviceStarted(self):
- self.authenticatedWith = []
- self.triedPublicKeys = []
- self.lastPublicKey = None
+ def serviceStarted(self) -> None:
+ self.authenticatedWith: list[bytes] = []
+ self.triedPublicKeys: list[Key] = []
+ self.lastPublicKey: Key | None = None
self.askForAuth(b"none", b"")
- def askForAuth(self, kind, extraData):
+ def askForAuth(self, kind: bytes, extraData: bytes) -> None:
"""
- Send a MSG_USERAUTH_REQUEST.
+ Send a C{MSG_USERAUTH_REQUEST}.
@param kind: the authentication method to try.
- @type kind: L{bytes}
+
@param extraData: method-specific data to go in the packet
- @type extraData: L{bytes}
"""
+ assert self.transport is not None
self.lastAuth = kind
self.transport.sendPacket(
MSG_USERAUTH_REQUEST,
NS(self.user) + NS(self.instance.name) + NS(kind) + extraData,
)
- def tryAuth(self, kind):
+ def tryAuth(self, kind: bytes) -> None | Deferred[bool]:
"""
Dispatch to an authentication method.
@param kind: the authentication method
@type kind: L{bytes}
"""
- kind = nativeString(kind.replace(b"-", b"_"))
+ strkind = kind.replace(b"-", b"_").decode("ascii")
self._log.debug("trying to auth with {kind}", kind=kind)
- f = getattr(self, "auth_" + kind, None)
- if f:
- return f()
+ f: Callable[[], Deferred[bool]] | None = getattr(self, "auth_" + strkind, None)
+ return f() if f is not None else None
def _ebAuth(self, ignored, *args):
"""
@@ -597,19 +618,15 @@ class SSHUserAuthClient(service.SSHService):
data += NS(r.encode("UTF8"))
self.transport.sendPacket(MSG_USERAUTH_INFO_RESPONSE, data)
- def auth_publickey(self):
+ def auth_publickey(self) -> Deferred[bool]:
"""
Try to authenticate with a public key. Ask the user for a public key;
if the user has one, send the request to the server and return True.
Otherwise, return False.
-
- @rtype: L{bool}
"""
- d = defer.maybeDeferred(self.getPublicKey)
- d.addBoth(self._cbGetPublicKey)
- return d
+ return defer.maybeDeferred(self.getPublicKey).addBoth(self._cbGetPublicKey)
- def _cbGetPublicKey(self, publicKey):
+ def _cbGetPublicKey(self, publicKey: Key | failure.Failure | None) -> bool:
if not isinstance(publicKey, keys.Key): # failure or None
publicKey = None
if publicKey is not None:
@@ -623,13 +640,15 @@ class SSHUserAuthClient(service.SSHService):
else:
return False
- def auth_password(self):
+ # Section defining C{auth_}-prefixed methods begins here: they must each be
+ # defined with the signature (() -> bool), as described by
+ # L{SSHUserAuthClient.tryAuth}.
+
+ def auth_password(self) -> bool:
"""
Try to authenticate with a password. Ask the user for a password.
If the user will return a password, return True. Otherwise, return
False.
-
- @rtype: L{bool}
"""
d = self.getPassword()
if d:
@@ -638,83 +657,75 @@ class SSHUserAuthClient(service.SSHService):
else: # returned None, don't do password auth
return False
- def auth_keyboard_interactive(self):
+ def auth_keyboard_interactive(self) -> bool:
"""
Try to authenticate with keyboard-interactive authentication. Send
the request to the server and return True.
-
- @rtype: L{bool}
"""
self._log.debug("authing with keyboard-interactive")
self.askForAuth(b"keyboard-interactive", NS(b"") + NS(b""))
return True
- def _cbPassword(self, password):
+ # Section defining C{auth_}-prefixed methods ends here.
+
+ def _cbPassword(self, password: bytes) -> None:
"""
Called back when the user gives a password. Send the request to the
server.
@param password: the password the user entered
- @type password: L{bytes}
"""
self.askForAuth(b"password", b"\x00" + NS(password))
- def signData(self, publicKey, signData):
+ def signData(self, publicKey: keys.Key, signData: bytes) -> Deferred[bytes] | None:
"""
Sign the given data with the given public key.
- By default, this will call getPrivateKey to get the private key,
- then sign the data using Key.sign().
+ By default, this will call getPrivateKey to get the private key, then
+ sign the data using Key.sign().
This method is factored out so that it can be overridden to use
alternate methods, such as a key agent.
@param publicKey: The public key object returned from L{getPublicKey}
- @type publicKey: L{keys.Key}
@param signData: the data to be signed by the private key.
- @type signData: L{bytes}
+
@return: a Deferred that's called back with the signature
- @rtype: L{defer.Deferred}
"""
key = self.getPrivateKey()
if not key:
- return
+ return None
return key.addCallback(self._cbSignData, signData)
- def _cbSignData(self, privateKey, signData):
+ def _cbSignData(self, privateKey: keys.Key, signData: bytes) -> bytes:
"""
- Called back when the private key is returned. Sign the data and
- return the signature.
+ Called back when the private key is returned. Sign the data and return
+ the signature.
@param privateKey: the private key object
- @type privateKey: L{keys.Key}
+
@param signData: the data to be signed by the private key.
- @type signData: L{bytes}
+
@return: the signature
- @rtype: L{bytes}
"""
return privateKey.sign(signData)
- def getPublicKey(self):
+ def getPublicKey(self) -> Key | None:
"""
Return a public key for the user. If no more public keys are
available, return L{None}.
This implementation always returns L{None}. Override it in a
subclass to actually find and return a public key object.
-
- @rtype: L{Key} or L{None}
"""
return None
- def getPrivateKey(self):
+ def getPrivateKey(self) -> Deferred[Key]:
"""
Return a L{Deferred} that will be called back with the private key
object corresponding to the last public key from getPublicKey().
If the private key is not available, errback on the Deferred.
-
- @rtype: L{Deferred} called back with L{Key}
"""
return defer.fail(NotImplementedError())