aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/internet/tcp.py
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2023-11-26 18:16:14 +0300
committershmel1k <shmel1k@ydb.tech>2023-11-26 18:43:30 +0300
commitb8cf9e88f4c5c64d9406af533d8948deb050d695 (patch)
tree218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py3/twisted/internet/tcp.py
parent523f645a83a0ec97a0332dbc3863bb354c92a328 (diff)
downloadydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/internet/tcp.py')
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/tcp.py1523
1 files changed, 1523 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py3/twisted/internet/tcp.py b/contrib/python/Twisted/py3/twisted/internet/tcp.py
new file mode 100644
index 0000000000..c87b5b7333
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/internet/tcp.py
@@ -0,0 +1,1523 @@
+# -*- test-case-name: twisted.test.test_tcp -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Various asynchronous TCP/IP classes.
+
+End users shouldn't use this module directly - use the reactor APIs instead.
+"""
+
+import os
+
+# System Imports
+import socket
+import struct
+import sys
+from typing import Callable, ClassVar, List, Optional
+
+from zope.interface import Interface, implementer
+
+import attr
+import typing_extensions
+
+from twisted.internet.interfaces import (
+ IHalfCloseableProtocol,
+ IListeningPort,
+ ISystemHandle,
+ ITCPTransport,
+)
+from twisted.logger import ILogObserver, LogEvent, Logger
+from twisted.python import deprecate, versions
+from twisted.python.compat import lazyByteSlice
+from twisted.python.runtime import platformType
+
+try:
+ # Try to get the memory BIO based startTLS implementation, available since
+ # pyOpenSSL 0.10
+ from twisted.internet._newtls import (
+ ClientMixin as _TLSClientMixin,
+ ConnectionMixin as _TLSConnectionMixin,
+ ServerMixin as _TLSServerMixin,
+ )
+ from twisted.internet.interfaces import ITLSTransport
+except ImportError:
+ # There is no version of startTLS available
+ ITLSTransport = Interface # type: ignore[misc,assignment]
+
+ class _TLSConnectionMixin: # type: ignore[no-redef]
+ TLS = False
+
+ class _TLSClientMixin: # type: ignore[no-redef]
+ pass
+
+ class _TLSServerMixin: # type: ignore[no-redef]
+ pass
+
+
+if platformType == "win32":
+ # no such thing as WSAEPERM or error code 10001
+ # according to winsock.h or MSDN
+ EPERM = object()
+ from errno import ( # type: ignore[attr-defined]
+ WSAEALREADY as EALREADY,
+ WSAEINPROGRESS as EINPROGRESS,
+ WSAEINVAL as EINVAL,
+ WSAEISCONN as EISCONN,
+ WSAEMFILE as EMFILE,
+ WSAENOBUFS as ENOBUFS,
+ WSAEWOULDBLOCK as EWOULDBLOCK,
+ )
+
+ # No such thing as WSAENFILE, either.
+ ENFILE = object()
+ # Nor ENOMEM
+ ENOMEM = object()
+ EAGAIN = EWOULDBLOCK
+ from errno import WSAECONNRESET as ECONNABORTED # type: ignore[attr-defined]
+
+ from twisted.python.win32 import formatError as strerror
+else:
+ from errno import EPERM
+ from errno import EINVAL
+ from errno import EWOULDBLOCK
+ from errno import EINPROGRESS
+ from errno import EALREADY
+ from errno import EISCONN
+ from errno import ENOBUFS
+ from errno import EMFILE
+ from errno import ENFILE
+ from errno import ENOMEM
+ from errno import EAGAIN
+ from errno import ECONNABORTED
+
+ from os import strerror
+
+from errno import errorcode
+
+# Twisted Imports
+from twisted.internet import abstract, address, base, error, fdesc, main
+from twisted.internet.error import CannotListenError
+from twisted.internet.protocol import Protocol
+from twisted.internet.task import deferLater
+from twisted.python import failure, log, reflect
+from twisted.python.util import untilConcludes
+
+# Not all platforms have, or support, this flag.
+_AI_NUMERICSERV = getattr(socket, "AI_NUMERICSERV", 0)
+
+
+def _getrealname(addr):
+ """
+ Return a 2-tuple of socket IP and port for IPv4 and a 4-tuple of
+ socket IP, port, flowInfo, and scopeID for IPv6. For IPv6, it
+ returns the interface portion (the part after the %) as a part of
+ the IPv6 address, which Python 3.7+ does not include.
+
+ @param addr: A 2-tuple for IPv4 information or a 4-tuple for IPv6
+ information.
+ """
+ if len(addr) == 4:
+ # IPv6
+ host = socket.getnameinfo(addr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)[
+ 0
+ ]
+ return tuple([host] + list(addr[1:]))
+ else:
+ return addr[:2]
+
+
+def _getpeername(skt):
+ """
+ See L{_getrealname}.
+ """
+ return _getrealname(skt.getpeername())
+
+
+def _getsockname(skt):
+ """
+ See L{_getrealname}.
+ """
+ return _getrealname(skt.getsockname())
+
+
+class _SocketCloser:
+ """
+ @ivar _shouldShutdown: Set to C{True} if C{shutdown} should be called
+ before calling C{close} on the underlying socket.
+ @type _shouldShutdown: C{bool}
+ """
+
+ _shouldShutdown = True
+
+ def _closeSocket(self, orderly):
+ # The call to shutdown() before close() isn't really necessary, because
+ # we set FD_CLOEXEC now, which will ensure this is the only process
+ # holding the FD, thus ensuring close() really will shutdown the TCP
+ # socket. However, do it anyways, just to be safe.
+ skt = self.socket
+ try:
+ if orderly:
+ if self._shouldShutdown:
+ skt.shutdown(2)
+ else:
+ # Set SO_LINGER to 1,0 which, by convention, causes a
+ # connection reset to be sent when close is called,
+ # instead of the standard FIN shutdown sequence.
+ self.socket.setsockopt(
+ socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 1, 0)
+ )
+
+ except OSError:
+ pass
+ try:
+ skt.close()
+ except OSError:
+ pass
+
+
+class _AbortingMixin:
+ """
+ Common implementation of C{abortConnection}.
+
+ @ivar _aborting: Set to C{True} when C{abortConnection} is called.
+ @type _aborting: C{bool}
+ """
+
+ _aborting = False
+
+ def abortConnection(self):
+ """
+ Aborts the connection immediately, dropping any buffered data.
+
+ @since: 11.1
+ """
+ if self.disconnected or self._aborting:
+ return
+ self._aborting = True
+ self.stopReading()
+ self.stopWriting()
+ self.doRead = lambda *args, **kwargs: None
+ self.doWrite = lambda *args, **kwargs: None
+ self.reactor.callLater(
+ 0, self.connectionLost, failure.Failure(error.ConnectionAborted())
+ )
+
+
+@implementer(ITLSTransport, ITCPTransport, ISystemHandle)
+class Connection(
+ _TLSConnectionMixin, abstract.FileDescriptor, _SocketCloser, _AbortingMixin
+):
+ """
+ Superclass of all socket-based FileDescriptors.
+
+ This is an abstract superclass of all objects which represent a TCP/IP
+ connection based socket.
+
+ @ivar logstr: prefix used when logging events related to this connection.
+ @type logstr: C{str}
+ """
+
+ def __init__(self, skt, protocol, reactor=None):
+ abstract.FileDescriptor.__init__(self, reactor=reactor)
+ self.socket = skt
+ self.socket.setblocking(0)
+ self.fileno = skt.fileno
+ self.protocol = protocol
+
+ def getHandle(self):
+ """Return the socket for this connection."""
+ return self.socket
+
+ def doRead(self):
+ """Calls self.protocol.dataReceived with all available data.
+
+ This reads up to self.bufferSize bytes of data from its socket, then
+ calls self.dataReceived(data) to process it. If the connection is not
+ lost through an error in the physical recv(), this function will return
+ the result of the dataReceived call.
+ """
+ try:
+ data = self.socket.recv(self.bufferSize)
+ except OSError as se:
+ if se.args[0] == EWOULDBLOCK:
+ return
+ else:
+ return main.CONNECTION_LOST
+
+ return self._dataReceived(data)
+
+ def _dataReceived(self, data):
+ if not data:
+ return main.CONNECTION_DONE
+ rval = self.protocol.dataReceived(data)
+ if rval is not None:
+ offender = self.protocol.dataReceived
+ warningFormat = (
+ "Returning a value other than None from %(fqpn)s is "
+ "deprecated since %(version)s."
+ )
+ warningString = deprecate.getDeprecationWarningString(
+ offender, versions.Version("Twisted", 11, 0, 0), format=warningFormat
+ )
+ deprecate.warnAboutFunction(offender, warningString)
+ return rval
+
+ def writeSomeData(self, data):
+ """
+ Write as much as possible of the given data to this TCP connection.
+
+ This sends up to C{self.SEND_LIMIT} bytes from C{data}. If the
+ connection is lost, an exception is returned. Otherwise, the number
+ of bytes successfully written is returned.
+ """
+ # Limit length of buffer to try to send, because some OSes are too
+ # stupid to do so themselves (ahem windows)
+ limitedData = lazyByteSlice(data, 0, self.SEND_LIMIT)
+
+ try:
+ return untilConcludes(self.socket.send, limitedData)
+ except OSError as se:
+ if se.args[0] in (EWOULDBLOCK, ENOBUFS):
+ return 0
+ else:
+ return main.CONNECTION_LOST
+
+ def _closeWriteConnection(self):
+ try:
+ self.socket.shutdown(1)
+ except OSError:
+ pass
+ p = IHalfCloseableProtocol(self.protocol, None)
+ if p:
+ try:
+ p.writeConnectionLost()
+ except BaseException:
+ f = failure.Failure()
+ log.err()
+ self.connectionLost(f)
+
+ def readConnectionLost(self, reason):
+ p = IHalfCloseableProtocol(self.protocol, None)
+ if p:
+ try:
+ p.readConnectionLost()
+ except BaseException:
+ log.err()
+ self.connectionLost(failure.Failure())
+ else:
+ self.connectionLost(reason)
+
+ def connectionLost(self, reason):
+ """See abstract.FileDescriptor.connectionLost()."""
+ # Make sure we're not called twice, which can happen e.g. if
+ # abortConnection() is called from protocol's dataReceived and then
+ # code immediately after throws an exception that reaches the
+ # reactor. We can't rely on "disconnected" attribute for this check
+ # since twisted.internet._oldtls does evil things to it:
+ if not hasattr(self, "socket"):
+ return
+ abstract.FileDescriptor.connectionLost(self, reason)
+ self._closeSocket(not reason.check(error.ConnectionAborted))
+ protocol = self.protocol
+ del self.protocol
+ del self.socket
+ del self.fileno
+ protocol.connectionLost(reason)
+
+ logstr = "Uninitialized"
+
+ def logPrefix(self):
+ """Return the prefix to log with when I own the logging thread."""
+ return self.logstr
+
+ def getTcpNoDelay(self):
+ return bool(self.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY))
+
+ def setTcpNoDelay(self, enabled):
+ self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, enabled)
+
+ def getTcpKeepAlive(self):
+ return bool(self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE))
+
+ def setTcpKeepAlive(self, enabled):
+ self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, enabled)
+
+
+class _BaseBaseClient:
+ """
+ Code shared with other (non-POSIX) reactors for management of general
+ outgoing connections.
+
+ Requirements upon subclasses are documented as instance variables rather
+ than abstract methods, in order to avoid MRO confusion, since this base is
+ mixed in to unfortunately weird and distinctive multiple-inheritance
+ hierarchies and many of these attributes are provided by peer classes
+ rather than descendant classes in those hierarchies.
+
+ @ivar addressFamily: The address family constant (C{socket.AF_INET},
+ C{socket.AF_INET6}, C{socket.AF_UNIX}) of the underlying socket of this
+ client connection.
+ @type addressFamily: C{int}
+
+ @ivar socketType: The socket type constant (C{socket.SOCK_STREAM} or
+ C{socket.SOCK_DGRAM}) of the underlying socket.
+ @type socketType: C{int}
+
+ @ivar _requiresResolution: A flag indicating whether the address of this
+ client will require name resolution. C{True} if the hostname of said
+ address indicates a name that must be resolved by hostname lookup,
+ C{False} if it indicates an IP address literal.
+ @type _requiresResolution: C{bool}
+
+ @cvar _commonConnection: Subclasses must provide this attribute, which
+ indicates the L{Connection}-alike class to invoke C{__init__} and
+ C{connectionLost} on.
+ @type _commonConnection: C{type}
+
+ @ivar _stopReadingAndWriting: Subclasses must implement in order to remove
+ this transport from its reactor's notifications in response to a
+ terminated connection attempt.
+ @type _stopReadingAndWriting: 0-argument callable returning L{None}
+
+ @ivar _closeSocket: Subclasses must implement in order to close the socket
+ in response to a terminated connection attempt.
+ @type _closeSocket: 1-argument callable; see L{_SocketCloser._closeSocket}
+
+ @ivar _collectSocketDetails: Clean up references to the attached socket in
+ its underlying OS resource (such as a file descriptor or file handle),
+ as part of post connection-failure cleanup.
+ @type _collectSocketDetails: 0-argument callable returning L{None}.
+
+ @ivar reactor: The class pointed to by C{_commonConnection} should set this
+ attribute in its constructor.
+ @type reactor: L{twisted.internet.interfaces.IReactorTime},
+ L{twisted.internet.interfaces.IReactorCore},
+ L{twisted.internet.interfaces.IReactorFDSet}
+ """
+
+ addressFamily = socket.AF_INET
+ socketType = socket.SOCK_STREAM
+
+ def _finishInit(self, whenDone, skt, error, reactor):
+ """
+ Called by subclasses to continue to the stage of initialization where
+ the socket connect attempt is made.
+
+ @param whenDone: A 0-argument callable to invoke once the connection is
+ set up. This is L{None} if the connection could not be prepared
+ due to a previous error.
+
+ @param skt: The socket object to use to perform the connection.
+ @type skt: C{socket._socketobject}
+
+ @param error: The error to fail the connection with.
+
+ @param reactor: The reactor to use for this client.
+ @type reactor: L{twisted.internet.interfaces.IReactorTime}
+ """
+ if whenDone:
+ self._commonConnection.__init__(self, skt, None, reactor)
+ reactor.callLater(0, whenDone)
+ else:
+ reactor.callLater(0, self.failIfNotConnected, error)
+
+ def resolveAddress(self):
+ """
+ Resolve the name that was passed to this L{_BaseBaseClient}, if
+ necessary, and then move on to attempting the connection once an
+ address has been determined. (The connection will be attempted
+ immediately within this function if either name resolution can be
+ synchronous or the address was an IP address literal.)
+
+ @note: You don't want to call this method from outside, as it won't do
+ anything useful; it's just part of the connection bootstrapping
+ process. Also, although this method is on L{_BaseBaseClient} for
+ historical reasons, it's not used anywhere except for L{Client}
+ itself.
+
+ @return: L{None}
+ """
+ if self._requiresResolution:
+ d = self.reactor.resolve(self.addr[0])
+ d.addCallback(lambda n: (n,) + self.addr[1:])
+ d.addCallbacks(self._setRealAddress, self.failIfNotConnected)
+ else:
+ self._setRealAddress(self.addr)
+
+ def _setRealAddress(self, address):
+ """
+ Set the resolved address of this L{_BaseBaseClient} and initiate the
+ connection attempt.
+
+ @param address: Depending on whether this is an IPv4 or IPv6 connection
+ attempt, a 2-tuple of C{(host, port)} or a 4-tuple of C{(host,
+ port, flow, scope)}. At this point it is a fully resolved address,
+ and the 'host' portion will always be an IP address, not a DNS
+ name.
+ """
+ if len(address) == 4:
+ # IPv6, make sure we have the scopeID associated
+ hostname = socket.getnameinfo(
+ address, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV
+ )[0]
+ self.realAddress = tuple([hostname] + list(address[1:]))
+ else:
+ self.realAddress = address
+ self.doConnect()
+
+ def failIfNotConnected(self, err):
+ """
+ Generic method called when the attempts to connect failed. It basically
+ cleans everything it can: call connectionFailed, stop read and write,
+ delete socket related members.
+ """
+ if self.connected or self.disconnected or not hasattr(self, "connector"):
+ return
+
+ self._stopReadingAndWriting()
+ try:
+ self._closeSocket(True)
+ except AttributeError:
+ pass
+ else:
+ self._collectSocketDetails()
+ self.connector.connectionFailed(failure.Failure(err))
+ del self.connector
+
+ def stopConnecting(self):
+ """
+ If a connection attempt is still outstanding (i.e. no connection is
+ yet established), immediately stop attempting to connect.
+ """
+ self.failIfNotConnected(error.UserError())
+
+ def connectionLost(self, reason):
+ """
+ Invoked by lower-level logic when it's time to clean the socket up.
+ Depending on the state of the connection, either inform the attached
+ L{Connector} that the connection attempt has failed, or inform the
+ connected L{IProtocol} that the established connection has been lost.
+
+ @param reason: the reason that the connection was terminated
+ @type reason: L{Failure}
+ """
+ if not self.connected:
+ self.failIfNotConnected(error.ConnectError(string=reason))
+ else:
+ self._commonConnection.connectionLost(self, reason)
+ self.connector.connectionLost(reason)
+
+
+class BaseClient(_BaseBaseClient, _TLSClientMixin, Connection):
+ """
+ A base class for client TCP (and similar) sockets.
+
+ @ivar realAddress: The address object that will be used for socket.connect;
+ this address is an address tuple (the number of elements dependent upon
+ the address family) which does not contain any names which need to be
+ resolved.
+ @type realAddress: C{tuple}
+
+ @ivar _base: L{Connection}, which is the base class of this class which has
+ all of the useful file descriptor methods. This is used by
+ L{_TLSServerMixin} to call the right methods to directly manipulate the
+ transport, as is necessary for writing TLS-encrypted bytes (whereas
+ those methods on L{Server} will go through another layer of TLS if it
+ has been enabled).
+ """
+
+ _base = Connection
+ _commonConnection = Connection
+
+ def _stopReadingAndWriting(self):
+ """
+ Implement the POSIX-ish (i.e.
+ L{twisted.internet.interfaces.IReactorFDSet}) method of detaching this
+ socket from the reactor for L{_BaseBaseClient}.
+ """
+ if hasattr(self, "reactor"):
+ # this doesn't happen if we failed in __init__
+ self.stopReading()
+ self.stopWriting()
+
+ def _collectSocketDetails(self):
+ """
+ Clean up references to the socket and its file descriptor.
+
+ @see: L{_BaseBaseClient}
+ """
+ del self.socket, self.fileno
+
+ def createInternetSocket(self):
+ """(internal) Create a non-blocking socket using
+ self.addressFamily, self.socketType.
+ """
+ s = socket.socket(self.addressFamily, self.socketType)
+ s.setblocking(0)
+ fdesc._setCloseOnExec(s.fileno())
+ return s
+
+ def doConnect(self):
+ """
+ Initiate the outgoing connection attempt.
+
+ @note: Applications do not need to call this method; it will be invoked
+ internally as part of L{IReactorTCP.connectTCP}.
+ """
+ self.doWrite = self.doConnect
+ self.doRead = self.doConnect
+ if not hasattr(self, "connector"):
+ # this happens when connection failed but doConnect
+ # was scheduled via a callLater in self._finishInit
+ return
+
+ err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
+ if err:
+ self.failIfNotConnected(error.getConnectError((err, strerror(err))))
+ return
+
+ # doConnect gets called twice. The first time we actually need to
+ # start the connection attempt. The second time we don't really
+ # want to (SO_ERROR above will have taken care of any errors, and if
+ # it reported none, the mere fact that doConnect was called again is
+ # sufficient to indicate that the connection has succeeded), but it
+ # is not /particularly/ detrimental to do so. This should get
+ # cleaned up some day, though.
+ try:
+ connectResult = self.socket.connect_ex(self.realAddress)
+ except OSError as se:
+ connectResult = se.args[0]
+ if connectResult:
+ if connectResult == EISCONN:
+ pass
+ # on Windows EINVAL means sometimes that we should keep trying:
+ # http://msdn.microsoft.com/library/default.asp?url=/library/en-us/winsock/winsock/connect_2.asp
+ elif (connectResult in (EWOULDBLOCK, EINPROGRESS, EALREADY)) or (
+ connectResult == EINVAL and platformType == "win32"
+ ):
+ self.startReading()
+ self.startWriting()
+ return
+ else:
+ self.failIfNotConnected(
+ error.getConnectError((connectResult, strerror(connectResult)))
+ )
+ return
+
+ # If I have reached this point without raising or returning, that means
+ # that the socket is connected.
+ del self.doWrite
+ del self.doRead
+ # we first stop and then start, to reset any references to the old doRead
+ self.stopReading()
+ self.stopWriting()
+ self._connectDone()
+
+ def _connectDone(self):
+ """
+ This is a hook for when a connection attempt has succeeded.
+
+ Here, we build the protocol from the
+ L{twisted.internet.protocol.ClientFactory} that was passed in, compute
+ a log string, begin reading so as to send traffic to the newly built
+ protocol, and finally hook up the protocol itself.
+
+ This hook is overridden by L{ssl.Client} to initiate the TLS protocol.
+ """
+ self.protocol = self.connector.buildProtocol(self.getPeer())
+ self.connected = 1
+ logPrefix = self._getLogPrefix(self.protocol)
+ self.logstr = "%s,client" % logPrefix
+ if self.protocol is None:
+ # Factory.buildProtocol is allowed to return None. In that case,
+ # make up a protocol to satisfy the rest of the implementation;
+ # connectionLost is going to be called on something, for example.
+ # This is easier than adding special case support for a None
+ # protocol throughout the rest of the transport implementation.
+ self.protocol = Protocol()
+ # But dispose of the connection quickly.
+ self.loseConnection()
+ else:
+ self.startReading()
+ self.protocol.makeConnection(self)
+
+
+_NUMERIC_ONLY = socket.AI_NUMERICHOST | _AI_NUMERICSERV
+
+
+def _resolveIPv6(ip, port):
+ """
+ Resolve an IPv6 literal into an IPv6 address.
+
+ This is necessary to resolve any embedded scope identifiers to the relevant
+ C{sin6_scope_id} for use with C{socket.connect()}, C{socket.listen()}, or
+ C{socket.bind()}; see U{RFC 3493 <https://tools.ietf.org/html/rfc3493>} for
+ more information.
+
+ @param ip: An IPv6 address literal.
+ @type ip: C{str}
+
+ @param port: A port number.
+ @type port: C{int}
+
+ @return: a 4-tuple of C{(host, port, flow, scope)}, suitable for use as an
+ IPv6 address.
+
+ @raise socket.gaierror: if either the IP or port is not numeric as it
+ should be.
+ """
+ return socket.getaddrinfo(ip, port, 0, 0, 0, _NUMERIC_ONLY)[0][4]
+
+
+class _BaseTCPClient:
+ """
+ Code shared with other (non-POSIX) reactors for management of outgoing TCP
+ connections (both TCPv4 and TCPv6).
+
+ @note: In order to be functional, this class must be mixed into the same
+ hierarchy as L{_BaseBaseClient}. It would subclass L{_BaseBaseClient}
+ directly, but the class hierarchy here is divided in strange ways out
+ of the need to share code along multiple axes; specifically, with the
+ IOCP reactor and also with UNIX clients in other reactors.
+
+ @ivar _addressType: The Twisted _IPAddress implementation for this client
+ @type _addressType: L{IPv4Address} or L{IPv6Address}
+
+ @ivar connector: The L{Connector} which is driving this L{_BaseTCPClient}'s
+ connection attempt.
+
+ @ivar addr: The address that this socket will be connecting to.
+ @type addr: If IPv4, a 2-C{tuple} of C{(str host, int port)}. If IPv6, a
+ 4-C{tuple} of (C{str host, int port, int ignored, int scope}).
+
+ @ivar createInternetSocket: Subclasses must implement this as a method to
+ create a python socket object of the appropriate address family and
+ socket type.
+ @type createInternetSocket: 0-argument callable returning
+ C{socket._socketobject}.
+ """
+
+ _addressType = address.IPv4Address
+
+ def __init__(self, host, port, bindAddress, connector, reactor=None):
+ # BaseClient.__init__ is invoked later
+ self.connector = connector
+ self.addr = (host, port)
+
+ whenDone = self.resolveAddress
+ err = None
+ skt = None
+
+ if abstract.isIPAddress(host):
+ self._requiresResolution = False
+ elif abstract.isIPv6Address(host):
+ self._requiresResolution = False
+ self.addr = _resolveIPv6(host, port)
+ self.addressFamily = socket.AF_INET6
+ self._addressType = address.IPv6Address
+ else:
+ self._requiresResolution = True
+ try:
+ skt = self.createInternetSocket()
+ except OSError as se:
+ err = error.ConnectBindError(se.args[0], se.args[1])
+ whenDone = None
+ if whenDone and bindAddress is not None:
+ try:
+ if abstract.isIPv6Address(bindAddress[0]):
+ bindinfo = _resolveIPv6(*bindAddress)
+ else:
+ bindinfo = bindAddress
+ skt.bind(bindinfo)
+ except OSError as se:
+ err = error.ConnectBindError(se.args[0], se.args[1])
+ whenDone = None
+ self._finishInit(whenDone, skt, err, reactor)
+
+ def getHost(self):
+ """
+ Returns an L{IPv4Address} or L{IPv6Address}.
+
+ This indicates the address from which I am connecting.
+ """
+ return self._addressType("TCP", *_getsockname(self.socket))
+
+ def getPeer(self):
+ """
+ Returns an L{IPv4Address} or L{IPv6Address}.
+
+ This indicates the address that I am connected to.
+ """
+ return self._addressType("TCP", *self.realAddress)
+
+ def __repr__(self) -> str:
+ s = f"<{self.__class__} to {self.addr} at {id(self):x}>"
+ return s
+
+
+class Client(_BaseTCPClient, BaseClient):
+ """
+ A transport for a TCP protocol; either TCPv4 or TCPv6.
+
+ Do not create these directly; use L{IReactorTCP.connectTCP}.
+ """
+
+
+class Server(_TLSServerMixin, Connection):
+ """
+ Serverside socket-stream connection class.
+
+ This is a serverside network connection transport; a socket which came from
+ an accept() on a server.
+
+ @ivar _base: L{Connection}, which is the base class of this class which has
+ all of the useful file descriptor methods. This is used by
+ L{_TLSServerMixin} to call the right methods to directly manipulate the
+ transport, as is necessary for writing TLS-encrypted bytes (whereas
+ those methods on L{Server} will go through another layer of TLS if it
+ has been enabled).
+ """
+
+ _base = Connection
+
+ _addressType = address.IPv4Address
+
+ def __init__(self, sock, protocol, client, server, sessionno, reactor):
+ """
+ Server(sock, protocol, client, server, sessionno)
+
+ Initialize it with a socket, a protocol, a descriptor for my peer (a
+ tuple of host, port describing the other end of the connection), an
+ instance of Port, and a session number.
+ """
+ Connection.__init__(self, sock, protocol, reactor)
+ if len(client) != 2:
+ self._addressType = address.IPv6Address
+ self.server = server
+ self.client = client
+ self.sessionno = sessionno
+ self.hostname = client[0]
+
+ logPrefix = self._getLogPrefix(self.protocol)
+ self.logstr = f"{logPrefix},{sessionno},{self.hostname}"
+ if self.server is not None:
+ self.repstr: str = "<{} #{} on {}>".format(
+ self.protocol.__class__.__name__,
+ self.sessionno,
+ self.server._realPortNumber,
+ )
+ self.startReading()
+ self.connected = 1
+
+ def __repr__(self) -> str:
+ """
+ A string representation of this connection.
+ """
+ return self.repstr
+
+ @classmethod
+ def _fromConnectedSocket(cls, fileDescriptor, addressFamily, factory, reactor):
+ """
+ Create a new L{Server} based on an existing connected I{SOCK_STREAM}
+ socket.
+
+ Arguments are the same as to L{Server.__init__}, except where noted.
+
+ @param fileDescriptor: An integer file descriptor associated with a
+ connected socket. The socket must be in non-blocking mode. Any
+ additional attributes desired, such as I{FD_CLOEXEC}, must also be
+ set already.
+
+ @param addressFamily: The address family (sometimes called I{domain})
+ of the existing socket. For example, L{socket.AF_INET}.
+
+ @return: A new instance of C{cls} wrapping the socket given by
+ C{fileDescriptor}.
+ """
+ addressType = address.IPv4Address
+ if addressFamily == socket.AF_INET6:
+ addressType = address.IPv6Address
+ skt = socket.fromfd(fileDescriptor, addressFamily, socket.SOCK_STREAM)
+ addr = _getpeername(skt)
+ protocolAddr = addressType("TCP", *addr)
+ localPort = skt.getsockname()[1]
+
+ protocol = factory.buildProtocol(protocolAddr)
+ if protocol is None:
+ skt.close()
+ return
+
+ self = cls(skt, protocol, addr, None, addr[1], reactor)
+ self.repstr = "<{} #{} on {}>".format(
+ self.protocol.__class__.__name__,
+ self.sessionno,
+ localPort,
+ )
+ protocol.makeConnection(self)
+ return self
+
+ def getHost(self):
+ """
+ Returns an L{IPv4Address} or L{IPv6Address}.
+
+ This indicates the server's address.
+ """
+ addr = _getsockname(self.socket)
+ return self._addressType("TCP", *addr)
+
+ def getPeer(self):
+ """
+ Returns an L{IPv4Address} or L{IPv6Address}.
+
+ This indicates the client's address.
+ """
+ return self._addressType("TCP", *self.client)
+
+
+class _IFileDescriptorReservation(Interface):
+ """
+ An open file that represents an emergency reservation in the
+ process' file descriptor table. If L{Port} encounters C{EMFILE}
+ on C{accept(2)}, it can close this file descriptor, retry the
+ C{accept} so that the incoming connection occupies this file
+ descriptor's space, and then close that connection and reopen this
+ one.
+
+ Calling L{_IFileDescriptorReservation.reserve} attempts to open
+ the reserve file descriptor if it is not already open.
+ L{_IFileDescriptorReservation.available} returns L{True} if the
+ underlying file is open and its descriptor claimed.
+
+ L{_IFileDescriptorReservation} instances are context managers;
+ entering them releases the underlying file descriptor, while
+ exiting them attempts to reacquire it. The block can take
+ advantage of the free slot in the process' file descriptor table
+ accept and close a client connection.
+
+ Because another thread might open a file descriptor between the
+ time the context manager is entered and the time C{accept} is
+ called, opening the reserve descriptor is best-effort only.
+ """
+
+ def available():
+ """
+ Is the reservation available?
+
+ @return: L{True} if the reserved file descriptor is open and
+ can thus be closed to allow a new file to be opened in its
+ place; L{False} if it is not open.
+ """
+
+ def reserve():
+ """
+ Attempt to open the reserved file descriptor; if this fails
+ because of C{EMFILE}, internal state is reset so that another
+ reservation attempt can be made.
+
+ @raises Exception: Any exception except an L{OSError} whose
+ errno is L{EMFILE}.
+ """
+
+ def __enter__():
+ """
+ Release the underlying file descriptor so that code within the
+ context manager can open a new file.
+ """
+
+ def __exit__(excType, excValue, traceback):
+ """
+ Attempt to re-open the reserved file descriptor. See
+ L{reserve} for caveats.
+
+ @param excType: See L{object.__exit__}
+ @param excValue: See L{object.__exit__}
+ @param traceback: See L{object.__exit__}
+ """
+
+
+class _HasClose(typing_extensions.Protocol):
+ def close(self) -> object:
+ ...
+
+
+@implementer(_IFileDescriptorReservation)
+@attr.s(auto_attribs=True)
+class _FileDescriptorReservation:
+ """
+ L{_IFileDescriptorReservation} implementation.
+
+ @ivar fileFactory: A factory that will be called to reserve a
+ file descriptor.
+ @type fileFactory: A L{callable} that accepts no arguments and
+ returns an object with a C{close} method.
+ """
+
+ _log: ClassVar[Logger] = Logger()
+
+ _fileFactory: Callable[[], _HasClose]
+ _fileDescriptor: Optional[_HasClose] = attr.ib(init=False, default=None)
+
+ def available(self):
+ """
+ See L{_IFileDescriptorReservation.available}.
+
+ @return: L{True} if the reserved file descriptor is open and
+ can thus be closed to allow a new file to be opened in its
+ place; L{False} if it is not open.
+ """
+ return self._fileDescriptor is not None
+
+ def reserve(self):
+ """
+ See L{_IFileDescriptorReservation.reserve}.
+ """
+ if self._fileDescriptor is None:
+ try:
+ fileDescriptor = self._fileFactory()
+ except OSError as e:
+ if e.errno == EMFILE:
+ self._log.failure(
+ "Could not reserve EMFILE recovery file descriptor."
+ )
+ else:
+ raise
+ else:
+ self._fileDescriptor = fileDescriptor
+
+ def __enter__(self):
+ """
+ See L{_IFileDescriptorReservation.__enter__}.
+ """
+ if self._fileDescriptor is None:
+ raise RuntimeError("No file reserved. Have you called my reserve method?")
+ self._fileDescriptor.close()
+ self._fileDescriptor = None
+
+ def __exit__(self, excType, excValue, traceback):
+ """
+ See L{_IFileDescriptorReservation.__exit__}.
+ """
+ try:
+ self.reserve()
+ except Exception:
+ self._log.failure("Could not re-reserve EMFILE recovery file descriptor.")
+
+
+@implementer(_IFileDescriptorReservation)
+class _NullFileDescriptorReservation:
+ """
+ A null implementation of L{_IFileDescriptorReservation}.
+ """
+
+ def available(self):
+ """
+ The reserved file is never available. See
+ L{_IFileDescriptorReservation.available}.
+
+ @return: L{False}
+ """
+ return False
+
+ def reserve(self):
+ """
+ Do nothing. See L{_IFileDescriptorReservation.reserve}.
+ """
+
+ def __enter__(self):
+ """
+ Do nothing. See L{_IFileDescriptorReservation.__enter__}
+
+ @return: L{False}
+ """
+
+ def __exit__(self, excType, excValue, traceback):
+ """
+ Do nothing. See L{_IFileDescriptorReservation.__exit__}.
+
+ @param excType: See L{object.__exit__}
+ @param excValue: See L{object.__exit__}
+ @param traceback: See L{object.__exit__}
+ """
+
+
+# Don't keep a reserve file descriptor for coping with file descriptor
+# exhaustion on Windows.
+
+# WSAEMFILE occurs when a process has run out of memory, not when a
+# specific limit has been reached. Windows sockets are handles, which
+# differ from UNIX's file descriptors in that they can refer to any
+# "named kernel object", including user interface resources like menu
+# and icons. The generality of handles results in a much higher limit
+# than UNIX imposes on file descriptors: a single Windows process can
+# allocate up to 16,777,216 handles. Because they're indexes into a
+# three level table whose upper two layers are allocated from
+# swappable pages, handles compete for heap space with other kernel
+# objects, not with each other. Closing a given socket handle may not
+# release enough memory to allow the process to make progress.
+#
+# This fundamental difference between file descriptors and handles
+# makes a reserve file descriptor useless on Windows. Note that other
+# event loops, such as libuv and libevent, also do not special case
+# WSAEMFILE.
+#
+# For an explanation of handles, see the "Object Manager"
+# (pp. 140-175) section of
+#
+# Windows Internals, Part 1: Covering Windows Server 2008 R2 and
+# Windows 7 (6th ed.)
+# Mark E. Russinovich, David A. Solomon, and Alex
+# Ionescu. 2012. Microsoft Press.
+if platformType == "win32":
+ _reservedFD = _NullFileDescriptorReservation()
+else:
+ _reservedFD = _FileDescriptorReservation(lambda: open(os.devnull)) # type: ignore[assignment]
+
+
+# Linux and other UNIX-like operating systems return EMFILE when a
+# process has reached its soft limit of file descriptors. *BSD and
+# Win32 raise (WSA)ENOBUFS when socket limits are reached. Linux can
+# give ENFILE if the system is out of inodes, or ENOMEM if there is
+# insufficient memory to allocate a new dentry. ECONNABORTED is
+# documented as possible on all relevant platforms (Linux, Windows,
+# macOS, and the BSDs) but occurs only on the BSDs. It occurs when a
+# client sends a FIN or RST after the server sends a SYN|ACK but
+# before application code calls accept(2). On Linux, calling
+# accept(2) on such a listener returns a connection that fails as
+# though the it were terminated after being fully established. This
+# appears to be an implementation choice (see inet_accept in
+# inet/ipv4/af_inet.c). On macOS, such a listener is not considered
+# readable, so accept(2) will never be called. Calling accept(2) on
+# such a listener, however, does not return at all.
+_ACCEPT_ERRORS = (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED)
+
+
+@attr.s(auto_attribs=True)
+class _BuffersLogs:
+ """
+ A context manager that buffers any log events until after its
+ block exits.
+
+ @ivar _namespace: The namespace of the buffered events.
+ @type _namespace: L{str}.
+
+ @ivar _observer: The observer to which buffered log events will be
+ written
+ @type _observer: L{twisted.logger.ILogObserver}.
+ """
+
+ _namespace: str
+ _observer: ILogObserver
+ _logs: List[LogEvent] = attr.ib(default=attr.Factory(list))
+
+ def __enter__(self):
+ """
+ Enter a log buffering context.
+
+ @return: A logger that buffers log events.
+ @rtype: L{Logger}.
+ """
+ return Logger(namespace=self._namespace, observer=self._logs.append)
+
+ def __exit__(self, excValue, excType, traceback):
+ """
+ Exit a log buffering context and log all buffered events to
+ the provided observer.
+
+ @param excType: See L{object.__exit__}
+ @param excValue: See L{object.__exit__}
+ @param traceback: See L{object.__exit__}
+ """
+ for event in self._logs:
+ self._observer(event)
+
+
+def _accept(logger, accepts, listener, reservedFD):
+ """
+ Return a generator that yields client sockets from the provided
+ listening socket until there are none left or an unrecoverable
+ error occurs.
+
+ @param logger: A logger to which C{accept}-related events will be
+ logged. This should not log to arbitrary observers that might
+ open a file descriptor to avoid claiming the C{EMFILE} file
+ descriptor on UNIX-like systems.
+ @type logger: L{Logger}
+
+ @param accepts: An iterable iterated over to limit the number
+ consecutive C{accept}s.
+ @type accepts: An iterable.
+
+ @param listener: The listening socket.
+ @type listener: L{socket.socket}
+
+ @param reservedFD: A reserved file descriptor that can be used to
+ recover from C{EMFILE} on UNIX-like systems.
+ @type reservedFD: L{_IFileDescriptorReservation}
+
+ @return: A generator that yields C{(socket, addr)} tuples from
+ L{socket.socket.accept}
+ """
+ for _ in accepts:
+ try:
+ client, address = listener.accept()
+ except OSError as e:
+ if e.args[0] in (EWOULDBLOCK, EAGAIN):
+ # No more clients.
+ return
+ elif e.args[0] == EPERM:
+ # Netfilter on Linux may have rejected the
+ # connection, but we get told to try to accept()
+ # anyway.
+ continue
+ elif e.args[0] == EMFILE and reservedFD.available():
+ # Linux and other UNIX-like operating systems return
+ # EMFILE when a process has reached its soft limit of
+ # file descriptors. The reserved file descriptor is
+ # available, so it can be released to free up a
+ # descriptor for use by listener.accept()'s clients.
+ # Each client socket will be closed until the listener
+ # returns EAGAIN.
+ logger.info(
+ "EMFILE encountered;" " releasing reserved file descriptor."
+ )
+ # The following block should not run arbitrary code
+ # that might acquire its own file descriptor.
+ with reservedFD:
+ clientsToClose = _accept(logger, accepts, listener, reservedFD)
+ for clientToClose, closedAddress in clientsToClose:
+ clientToClose.close()
+ logger.info(
+ "EMFILE recovery:" " Closed socket from {address}",
+ address=closedAddress,
+ )
+ logger.info("Re-reserving EMFILE recovery file descriptor.")
+ return
+ elif e.args[0] in _ACCEPT_ERRORS:
+ logger.info(
+ "Could not accept new connection ({acceptError})",
+ acceptError=errorcode[e.args[0]],
+ )
+ return
+ else:
+ raise
+ else:
+ yield client, address
+
+
+@implementer(IListeningPort)
+class Port(base.BasePort, _SocketCloser):
+ """
+ A TCP server port, listening for connections.
+
+ When a connection is accepted, this will call a factory's buildProtocol
+ with the incoming address as an argument, according to the specification
+ described in L{twisted.internet.interfaces.IProtocolFactory}.
+
+ If you wish to change the sort of transport that will be used, the
+ C{transport} attribute will be called with the signature expected for
+ C{Server.__init__}, so it can be replaced.
+
+ @ivar deferred: a deferred created when L{stopListening} is called, and
+ that will fire when connection is lost. This is not to be used it
+ directly: prefer the deferred returned by L{stopListening} instead.
+ @type deferred: L{defer.Deferred}
+
+ @ivar disconnecting: flag indicating that the L{stopListening} method has
+ been called and that no connections should be accepted anymore.
+ @type disconnecting: C{bool}
+
+ @ivar connected: flag set once the listen has successfully been called on
+ the socket.
+ @type connected: C{bool}
+
+ @ivar _type: A string describing the connections which will be created by
+ this port. Normally this is C{"TCP"}, since this is a TCP port, but
+ when the TLS implementation re-uses this class it overrides the value
+ with C{"TLS"}. Only used for logging.
+
+ @ivar _preexistingSocket: If not L{None}, a L{socket.socket} instance which
+ was created and initialized outside of the reactor and will be used to
+ listen for connections (instead of a new socket being created by this
+ L{Port}).
+ """
+
+ socketType = socket.SOCK_STREAM
+
+ transport = Server
+ sessionno = 0
+ interface = ""
+ backlog = 50
+
+ _type = "TCP"
+
+ # Actual port number being listened on, only set to a non-None
+ # value when we are actually listening.
+ _realPortNumber: Optional[int] = None
+
+ # An externally initialized socket that we will use, rather than creating
+ # our own.
+ _preexistingSocket = None
+
+ addressFamily = socket.AF_INET
+ _addressType = address.IPv4Address
+ _logger = Logger()
+
+ def __init__(self, port, factory, backlog=50, interface="", reactor=None):
+ """Initialize with a numeric port to listen on."""
+ base.BasePort.__init__(self, reactor=reactor)
+ self.port = port
+ self.factory = factory
+ self.backlog = backlog
+ if abstract.isIPv6Address(interface):
+ self.addressFamily = socket.AF_INET6
+ self._addressType = address.IPv6Address
+ self.interface = interface
+
+ @classmethod
+ def _fromListeningDescriptor(cls, reactor, fd, addressFamily, factory):
+ """
+ Create a new L{Port} based on an existing listening I{SOCK_STREAM}
+ socket.
+
+ Arguments are the same as to L{Port.__init__}, except where noted.
+
+ @param fd: An integer file descriptor associated with a listening
+ socket. The socket must be in non-blocking mode. Any additional
+ attributes desired, such as I{FD_CLOEXEC}, must also be set already.
+
+ @param addressFamily: The address family (sometimes called I{domain}) of
+ the existing socket. For example, L{socket.AF_INET}.
+
+ @return: A new instance of C{cls} wrapping the socket given by C{fd}.
+ """
+ port = socket.fromfd(fd, addressFamily, cls.socketType)
+ interface = _getsockname(port)[0]
+ self = cls(None, factory, None, interface, reactor)
+ self._preexistingSocket = port
+ return self
+
+ def __repr__(self) -> str:
+ if self._realPortNumber is not None:
+ return "<{} of {} on {}>".format(
+ self.__class__,
+ self.factory.__class__,
+ self._realPortNumber,
+ )
+ else:
+ return "<{} of {} (not listening)>".format(
+ self.__class__,
+ self.factory.__class__,
+ )
+
+ def createInternetSocket(self):
+ s = base.BasePort.createInternetSocket(self)
+ if platformType == "posix" and sys.platform != "cygwin":
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ return s
+
+ def startListening(self):
+ """Create and bind my socket, and begin listening on it.
+
+ This is called on unserialization, and must be called after creating a
+ server to begin listening on the specified port.
+ """
+ _reservedFD.reserve()
+ if self._preexistingSocket is None:
+ # Create a new socket and make it listen
+ try:
+ skt = self.createInternetSocket()
+ if self.addressFamily == socket.AF_INET6:
+ addr = _resolveIPv6(self.interface, self.port)
+ else:
+ addr = (self.interface, self.port)
+ skt.bind(addr)
+ except OSError as le:
+ raise CannotListenError(self.interface, self.port, le)
+ skt.listen(self.backlog)
+ else:
+ # Re-use the externally specified socket
+ skt = self._preexistingSocket
+ self._preexistingSocket = None
+ # Avoid shutting it down at the end.
+ self._shouldShutdown = False
+
+ # Make sure that if we listened on port 0, we update that to
+ # reflect what the OS actually assigned us.
+ self._realPortNumber = skt.getsockname()[1]
+
+ log.msg(
+ "%s starting on %s"
+ % (self._getLogPrefix(self.factory), self._realPortNumber)
+ )
+
+ # The order of the next 5 lines is kind of bizarre. If no one
+ # can explain it, perhaps we should re-arrange them.
+ self.factory.doStart()
+ self.connected = True
+ self.socket = skt
+ self.fileno = self.socket.fileno
+ self.numberAccepts = 100
+
+ self.startReading()
+
+ def _buildAddr(self, address):
+ return self._addressType("TCP", *address)
+
+ def doRead(self):
+ """
+ Called when my socket is ready for reading.
+
+ This accepts a connection and calls self.protocol() to handle the
+ wire-level protocol.
+ """
+ try:
+ if platformType == "posix":
+ numAccepts = self.numberAccepts
+ else:
+ # win32 event loop breaks if we do more than one accept()
+ # in an iteration of the event loop.
+ numAccepts = 1
+
+ with _BuffersLogs(
+ self._logger.namespace, self._logger.observer
+ ) as bufferingLogger:
+ accepted = 0
+ clients = _accept(
+ bufferingLogger, range(numAccepts), self.socket, _reservedFD
+ )
+
+ for accepted, (skt, addr) in enumerate(clients, 1):
+ fdesc._setCloseOnExec(skt.fileno())
+
+ if len(addr) == 4:
+ # IPv6, make sure we get the scopeID if it
+ # exists
+ host = socket.getnameinfo(
+ addr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV
+ )
+ addr = tuple([host[0]] + list(addr[1:]))
+
+ protocol = self.factory.buildProtocol(self._buildAddr(addr))
+ if protocol is None:
+ skt.close()
+ continue
+ s = self.sessionno
+ self.sessionno = s + 1
+ transport = self.transport(
+ skt, protocol, addr, self, s, self.reactor
+ )
+ protocol.makeConnection(transport)
+
+ # Scale our synchronous accept loop according to traffic
+ # Reaching our limit on consecutive accept calls indicates
+ # there might be still more clients to serve the next time
+ # the reactor calls us. Prepare to accept some more.
+ if accepted == self.numberAccepts:
+ self.numberAccepts += 20
+ # Otherwise, don't attempt to accept any more clients than
+ # we just accepted or any less than 1.
+ else:
+ self.numberAccepts = max(1, accepted)
+ except BaseException:
+ # Note that in TLS mode, this will possibly catch SSL.Errors
+ # raised by self.socket.accept()
+ #
+ # There is no "except SSL.Error:" above because SSL may be
+ # None if there is no SSL support. In any case, all the
+ # "except SSL.Error:" suite would probably do is log.deferr()
+ # and return, so handling it here works just as well.
+ log.deferr()
+
+ def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE)):
+ """
+ Stop accepting connections on this port.
+
+ This will shut down the socket and call self.connectionLost(). It
+ returns a deferred which will fire successfully when the port is
+ actually closed, or with a failure if an error occurs shutting down.
+ """
+ self.disconnecting = True
+ self.stopReading()
+ if self.connected:
+ self.deferred = deferLater(self.reactor, 0, self.connectionLost, connDone)
+ return self.deferred
+
+ stopListening = loseConnection
+
+ def _logConnectionLostMsg(self):
+ """
+ Log message for closing port
+ """
+ log.msg(f"({self._type} Port {self._realPortNumber} Closed)")
+
+ def connectionLost(self, reason):
+ """
+ Cleans up the socket.
+ """
+ self._logConnectionLostMsg()
+ self._realPortNumber = None
+
+ base.BasePort.connectionLost(self, reason)
+ self.connected = False
+ self._closeSocket(True)
+ del self.socket
+ del self.fileno
+
+ try:
+ self.factory.doStop()
+ finally:
+ self.disconnecting = False
+
+ def logPrefix(self):
+ """Returns the name of my class, to prefix log entries with."""
+ return reflect.qual(self.factory.__class__)
+
+ def getHost(self):
+ """
+ Return an L{IPv4Address} or L{IPv6Address} indicating the listening
+ address of this port.
+ """
+ addr = _getsockname(self.socket)
+ return self._addressType("TCP", *addr)
+
+
+class Connector(base.BaseConnector):
+ """
+ A L{Connector} provides of L{twisted.internet.interfaces.IConnector} for
+ all POSIX-style reactors.
+
+ @ivar _addressType: the type returned by L{Connector.getDestination}.
+ Either L{IPv4Address} or L{IPv6Address}, depending on the type of
+ address.
+ @type _addressType: C{type}
+ """
+
+ _addressType = address.IPv4Address
+
+ def __init__(self, host, port, factory, timeout, bindAddress, reactor=None):
+ if isinstance(port, str):
+ try:
+ port = socket.getservbyname(port, "tcp")
+ except OSError as e:
+ raise error.ServiceNameUnknownError(string=f"{e} ({port!r})")
+ self.host, self.port = host, port
+ if abstract.isIPv6Address(host):
+ self._addressType = address.IPv6Address
+ self.bindAddress = bindAddress
+ base.BaseConnector.__init__(self, factory, timeout, reactor)
+
+ def _makeTransport(self):
+ """
+ Create a L{Client} bound to this L{Connector}.
+
+ @return: a new L{Client}
+ @rtype: L{Client}
+ """
+ return Client(self.host, self.port, self.bindAddress, self, self.reactor)
+
+ def getDestination(self):
+ """
+ @see: L{twisted.internet.interfaces.IConnector.getDestination}.
+ """
+ return self._addressType("TCP", self.host, self.port)