summaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/conch/endpoints.py
diff options
context:
space:
mode:
authorrobot-piglet <[email protected]>2025-06-22 18:50:56 +0300
committerrobot-piglet <[email protected]>2025-06-22 19:04:42 +0300
commitc7cbc6d480c5488ff6e921c709680fd2c1340a10 (patch)
tree10843f44b67c0fb5717ad555556064095f701d8c /contrib/python/Twisted/py3/twisted/conch/endpoints.py
parent26d391cdb94d2ce5efc8d0cc5cea7607dc363c0b (diff)
Intermediate changes
commit_hash:28750b74281710ec1ab5bdc2403c8ab24bdd164b
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/conch/endpoints.py')
-rw-r--r--contrib/python/Twisted/py3/twisted/conch/endpoints.py138
1 files changed, 75 insertions, 63 deletions
diff --git a/contrib/python/Twisted/py3/twisted/conch/endpoints.py b/contrib/python/Twisted/py3/twisted/conch/endpoints.py
index 3269532acd1..966669edec7 100644
--- a/contrib/python/Twisted/py3/twisted/conch/endpoints.py
+++ b/contrib/python/Twisted/py3/twisted/conch/endpoints.py
@@ -31,13 +31,18 @@ from twisted.conch.ssh.connection import SSHConnection
from twisted.conch.ssh.keys import Key
from twisted.conch.ssh.transport import SSHClientTransport
from twisted.conch.ssh.userauth import SSHUserAuthClient
+from twisted.internet.address import IPv4Address, IPv6Address
from twisted.internet.defer import CancelledError, Deferred, succeed
from twisted.internet.endpoints import TCP4ClientEndpoint, connectProtocol
from twisted.internet.error import ConnectionDone, ProcessTerminated
-from twisted.internet.interfaces import IStreamClientEndpoint
+from twisted.internet.interfaces import (
+ IReactorTCP,
+ IStreamClientEndpoint,
+ ITCPTransport,
+)
from twisted.internet.protocol import Factory
from twisted.logger import Logger
-from twisted.python.compat import nativeString, networkString
+from twisted.python.compat import nativeString
from twisted.python.failure import Failure
from twisted.python.filepath import FilePath
@@ -297,8 +302,8 @@ class _UserAuth(SSHUserAuthClient):
authentication, and delegating authentication to an agent.
"""
- password = None
- keys = None
+ password: bytes | None = None
+ keys: list[Key] | None = None
agent = None
def getPublicKey(self):
@@ -414,16 +419,23 @@ class _CommandTransport(SSHClientTransport):
_hostKeyFailure = None
- _userauth = None
+ _userauth: _UserAuth | None = None
- def __init__(self, creator):
+ def __init__(self, creator: _NewConnectionHelper) -> None:
"""
@param creator: The L{_NewConnectionHelper} that created this
connection.
-
- @type creator: L{_NewConnectionHelper}.
"""
- self.connectionReady = Deferred(lambda d: self.transport.abortConnection())
+
+ def cancelReady(d: Deferred[None]) -> None:
+ transport = ITCPTransport(self.transport, None)
+ # adaptation is papering over an annoying type-punning issue here,
+ # we more or less have to run over an abortable transport, so not
+ # testing the negative branch.
+ if transport is not None: # pragma: no branch
+ transport.abortConnection()
+
+ self.connectionReady: Deferred[None] = Deferred(cancelReady)
# Clear the reference to that deferred to help the garbage collector
# and to signal to other parts of this implementation (in particular
# connectionLost) that it has already been fired and does not need to
@@ -436,7 +448,7 @@ class _CommandTransport(SSHClientTransport):
self.connectionReady.addBoth(readyFired)
self.creator = creator
- def verifyHostKey(self, hostKey, fingerprint):
+ def verifyHostKey(self, hostKey: Key, fingerprint: bytes) -> Deferred[bool]:
"""
Ask the L{KnownHostsFile} provider available on the factory which
created this protocol this protocol to verify the given host key.
@@ -445,30 +457,29 @@ class _CommandTransport(SSHClientTransport):
L{KnownHostsFile.verifyHostKey}.
"""
hostname = self.creator.hostname
- ip = networkString(self.transport.getPeer().host)
-
+ transport = self.transport
+ assert transport is not None
+ peer = transport.getPeer()
+ assert isinstance(peer, (IPv4Address, IPv6Address))
+ ip = peer.host.encode("ascii")
self._state = b"SECURING"
- d = self.creator.knownHosts.verifyHostKey(
+ return self.creator.knownHosts.verifyHostKey(
self.creator.ui, hostname, ip, Key.fromString(hostKey)
- )
- d.addErrback(self._saveHostKeyFailure)
- return d
+ ).addErrback(self._saveHostKeyFailure)
- def _saveHostKeyFailure(self, reason):
+ def _saveHostKeyFailure(self, reason: Failure) -> Failure:
"""
When host key verification fails, record the reason for the failure in
order to fire a L{Deferred} with it later.
@param reason: The cause of the host key verification failure.
- @type reason: L{Failure}
@return: C{reason}
- @rtype: L{Failure}
"""
self._hostKeyFailure = reason
return reason
- def connectionSecure(self):
+ def connectionSecure(self) -> None:
"""
When the connection is secure, start the authentication process.
"""
@@ -481,17 +492,18 @@ class _CommandTransport(SSHClientTransport):
if self.creator.keys:
self._userauth.keys = list(self.creator.keys)
- if self.creator.agentEndpoint is not None:
- d = self._userauth.connectToAgent(self.creator.agentEndpoint)
- else:
- d = succeed(None)
+ d = (
+ succeed(None)
+ if self.creator.agentEndpoint is None
+ else self._userauth.connectToAgent(self.creator.agentEndpoint)
+ )
def maybeGotAgent(ignored):
self.requestService(self._userauth)
d.addBoth(maybeGotAgent)
- def connectionLost(self, reason):
+ def connectionLost(self, reason: Failure | None = None) -> None:
"""
When the underlying connection to the SSH server is lost, if there were
any connection setup errors, propagate them. Also, clean up the
@@ -529,7 +541,7 @@ class SSHCommandClientEndpoint:
command invocations over a single SSH connection.
"""
- def __init__(self, creator, command):
+ def __init__(self, creator: _ISSHConnectionCreator, command: bytes) -> None:
"""
@param creator: An L{_ISSHConnectionCreator} provider which will be
used to set up the SSH connection which will be used to run a
@@ -550,17 +562,17 @@ class SSHCommandClientEndpoint:
@classmethod
def newConnection(
cls,
- reactor,
- command,
- username,
- hostname,
- port=None,
- keys=None,
- password=None,
- agentEndpoint=None,
- knownHosts=None,
- ui=None,
- ):
+ reactor: IReactorTCP,
+ command: bytes,
+ username: bytes,
+ hostname: bytes,
+ port: int | None = None,
+ keys: list[Key] | None = None,
+ password: bytes | None = None,
+ agentEndpoint: IStreamClientEndpoint | None = None,
+ knownHosts: str | None = None,
+ ui: ConsoleUI | None = None,
+ ) -> SSHCommandClientEndpoint:
"""
Create and return a new endpoint which will try to create a new
connection to an SSH server and run a command over it. It will also
@@ -569,44 +581,36 @@ class SSHCommandClientEndpoint:
L{Deferred} is cancelled.
@param reactor: The reactor to use to establish the connection.
- @type reactor: L{IReactorTCP} provider
@param command: See L{__init__}'s C{command} argument.
@param username: The username with which to authenticate to the SSH
server.
- @type username: L{bytes}
@param hostname: The hostname of the SSH server.
- @type hostname: L{bytes}
@param port: The port number of the SSH server. By default, the
standard SSH port number is used.
- @type port: L{int}
@param keys: Private keys with which to authenticate to the SSH server,
if key authentication is to be attempted (otherwise L{None}).
- @type keys: L{list} of L{Key}
@param password: The password with which to authenticate to the SSH
server, if password authentication is to be attempted (otherwise
L{None}).
- @type password: L{bytes} or L{None}
@param agentEndpoint: An L{IStreamClientEndpoint} provider which may be
used to connect to an SSH agent, if one is to be used to help with
authentication.
- @type agentEndpoint: L{IStreamClientEndpoint} provider
- @param knownHosts: The currently known host keys, used to check the
- host key presented by the server we actually connect to.
- @type knownHosts: L{KnownHostsFile}
+ @param knownHosts: The path to the currently known host keys file, used
+ to check the host key presented by the server we actually connect
+ to.
@param ui: An object for interacting with users to make decisions about
whether to accept the server host keys. If L{None}, a L{ConsoleUI}
connected to /dev/tty will be used; if /dev/tty is unavailable, an
object which answers C{b"no"} to all prompts will be used.
- @type ui: L{None} or L{ConsoleUI}
@return: A new instance of C{cls} (probably
L{SSHCommandClientEndpoint}).
@@ -707,17 +711,19 @@ class _NewConnectionHelper:
_KNOWN_HOSTS = _KNOWN_HOSTS
port = 22
+ knownHosts: KnownHostsFile
+
def __init__(
self,
reactor: Any,
- hostname: str,
- port: int,
- command: str,
- username: str,
- keys: str,
- password: str,
- agentEndpoint: str,
- knownHosts: str | None,
+ hostname: bytes,
+ port: int | None,
+ command: bytes,
+ username: bytes,
+ keys: list[Key] | None,
+ password: bytes | None,
+ agentEndpoint: IStreamClientEndpoint | None,
+ knownHosts: str | None | KnownHostsFile,
ui: ConsoleUI | None,
tty: FilePath[bytes] | FilePath[str] = FilePath(b"/dev/tty"),
):
@@ -733,12 +739,13 @@ class _NewConnectionHelper:
self.port = port
self.command = command
self.username = username
- self.keys = keys
+ self.keys = [] if keys is None else keys
self.password = password
self.agentEndpoint = agentEndpoint
- if knownHosts is None:
- knownHosts = self._knownHosts()
- self.knownHosts = knownHosts
+ if isinstance(knownHosts, KnownHostsFile):
+ self.knownHosts = knownHosts
+ else:
+ self.knownHosts = self._knownHosts(knownHosts)
if ui is None:
ui = ConsoleUI(self._opener)
@@ -760,14 +767,19 @@ class _NewConnectionHelper:
return BytesIO(b"no")
@classmethod
- def _knownHosts(cls):
+ def _knownHosts(cls, path: str | None = None) -> KnownHostsFile:
"""
-
@return: A L{KnownHostsFile} instance pointed at the user's personal
I{known hosts} file.
@rtype: L{KnownHostsFile}
"""
- return KnownHostsFile.fromPath(FilePath(expanduser(cls._KNOWN_HOSTS)))
+ if path is None: # pragma: no branch
+ # negative branch untested because this fallback path requires user
+ # configuration that tests shouldn't be messing with
+ # directly. (This should be factored out for better testability in
+ # terms of coverage.)
+ path = expanduser(cls._KNOWN_HOSTS)
+ return KnownHostsFile.fromPath(FilePath(path))
def secureConnection(self):
"""