diff options
author | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:16:14 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:43:30 +0300 |
commit | b8cf9e88f4c5c64d9406af533d8948deb050d695 (patch) | |
tree | 218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py3/twisted/internet/udp.py | |
parent | 523f645a83a0ec97a0332dbc3863bb354c92a328 (diff) | |
download | ydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz |
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/internet/udp.py')
-rw-r--r-- | contrib/python/Twisted/py3/twisted/internet/udp.py | 533 |
1 files changed, 533 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py3/twisted/internet/udp.py b/contrib/python/Twisted/py3/twisted/internet/udp.py new file mode 100644 index 0000000000..7601f2dc84 --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/internet/udp.py @@ -0,0 +1,533 @@ +# -*- test-case-name: twisted.test.test_udp -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Various asynchronous UDP classes. + +Please do not use this module directly. + +@var _sockErrReadIgnore: list of symbolic error constants (from the C{errno} + module) representing socket errors where the error is temporary and can be + ignored. + +@var _sockErrReadRefuse: list of symbolic error constants (from the C{errno} + module) representing socket errors that indicate connection refused. +""" + + +# System Imports +import socket +import struct +import warnings +from typing import Optional + +from zope.interface import implementer + +from twisted.python.runtime import platformType + +if platformType == "win32": + from errno import WSAEINPROGRESS # type: ignore[attr-defined] + from errno import WSAEWOULDBLOCK # type: ignore[attr-defined] + from errno import ( # type: ignore[attr-defined] + WSAECONNREFUSED, + WSAECONNRESET, + WSAEINTR, + WSAEMSGSIZE, + WSAENETRESET, + WSAENOPROTOOPT as ENOPROTOOPT, + WSAETIMEDOUT, + ) + + # Classify read and write errors + _sockErrReadIgnore = [WSAEINTR, WSAEWOULDBLOCK, WSAEMSGSIZE, WSAEINPROGRESS] + _sockErrReadRefuse = [WSAECONNREFUSED, WSAECONNRESET, WSAENETRESET, WSAETIMEDOUT] + + # POSIX-compatible write errors + EMSGSIZE = WSAEMSGSIZE + ECONNREFUSED = WSAECONNREFUSED + EAGAIN = WSAEWOULDBLOCK + EINTR = WSAEINTR +else: + from errno import EAGAIN, ECONNREFUSED, EINTR, EMSGSIZE, ENOPROTOOPT, EWOULDBLOCK + + _sockErrReadIgnore = [EAGAIN, EINTR, EWOULDBLOCK] + _sockErrReadRefuse = [ECONNREFUSED] + +# Twisted Imports +from twisted.internet import abstract, address, base, defer, error, interfaces +from twisted.python import failure, log + + +@implementer( + interfaces.IListeningPort, interfaces.IUDPTransport, interfaces.ISystemHandle +) +class Port(base.BasePort): + """ + UDP port, listening for packets. + + @ivar maxThroughput: Maximum number of bytes read in one event + loop iteration. + + @ivar addressFamily: L{socket.AF_INET} or L{socket.AF_INET6}, depending on + whether this port is listening on an IPv4 address or an IPv6 address. + + @ivar _realPortNumber: Actual port number being listened on. The + value will be L{None} until this L{Port} is listening. + + @ivar _preexistingSocket: If not L{None}, a L{socket.socket} instance which + was created and initialized outside of the reactor and will be used to + listen for connections (instead of a new socket being created by this + L{Port}). + """ + + addressFamily = socket.AF_INET + socketType = socket.SOCK_DGRAM + maxThroughput = 256 * 1024 + + _realPortNumber: Optional[int] = None + _preexistingSocket = None + + def __init__(self, port, proto, interface="", maxPacketSize=8192, reactor=None): + """ + @param port: A port number on which to listen. + @type port: L{int} + + @param proto: A C{DatagramProtocol} instance which will be + connected to the given C{port}. + @type proto: L{twisted.internet.protocol.DatagramProtocol} + + @param interface: The local IPv4 or IPv6 address to which to bind; + defaults to '', ie all IPv4 addresses. + @type interface: L{str} + + @param maxPacketSize: The maximum packet size to accept. + @type maxPacketSize: L{int} + + @param reactor: A reactor which will notify this C{Port} when + its socket is ready for reading or writing. Defaults to + L{None}, ie the default global reactor. + @type reactor: L{interfaces.IReactorFDSet} + """ + base.BasePort.__init__(self, reactor) + self.port = port + self.protocol = proto + self.maxPacketSize = maxPacketSize + self.interface = interface + self.setLogStr() + self._connectedAddr = None + self._setAddressFamily() + + @classmethod + def _fromListeningDescriptor( + cls, reactor, fd, addressFamily, protocol, maxPacketSize + ): + """ + Create a new L{Port} based on an existing listening + I{SOCK_DGRAM} socket. + + @param reactor: A reactor which will notify this L{Port} when + its socket is ready for reading or writing. Defaults to + L{None}, ie the default global reactor. + @type reactor: L{interfaces.IReactorFDSet} + + @param fd: An integer file descriptor associated with a listening + socket. The socket must be in non-blocking mode. Any additional + attributes desired, such as I{FD_CLOEXEC}, must also be set already. + @type fd: L{int} + + @param addressFamily: The address family (sometimes called I{domain}) of + the existing socket. For example, L{socket.AF_INET}. + @type addressFamily: L{int} + + @param protocol: A C{DatagramProtocol} instance which will be + connected to the C{port}. + @type protocol: L{twisted.internet.protocol.DatagramProtocol} + + @param maxPacketSize: The maximum packet size to accept. + @type maxPacketSize: L{int} + + @return: A new instance of C{cls} wrapping the socket given by C{fd}. + @rtype: L{Port} + """ + port = socket.fromfd(fd, addressFamily, cls.socketType) + interface = port.getsockname()[0] + self = cls( + None, + protocol, + interface=interface, + reactor=reactor, + maxPacketSize=maxPacketSize, + ) + self._preexistingSocket = port + return self + + def __repr__(self) -> str: + if self._realPortNumber is not None: + return f"<{self.protocol.__class__} on {self._realPortNumber}>" + else: + return f"<{self.protocol.__class__} not connected>" + + def getHandle(self): + """ + Return a socket object. + """ + return self.socket + + def startListening(self): + """ + Create and bind my socket, and begin listening on it. + + This is called on unserialization, and must be called after creating a + server to begin listening on the specified port. + """ + self._bindSocket() + self._connectToProtocol() + + def _bindSocket(self): + """ + Prepare and assign a L{socket.socket} instance to + C{self.socket}. + + Either creates a new SOCK_DGRAM L{socket.socket} bound to + C{self.interface} and C{self.port} or takes an existing + L{socket.socket} provided via the + L{interfaces.IReactorSocket.adoptDatagramPort} interface. + """ + if self._preexistingSocket is None: + # Create a new socket and make it listen + try: + skt = self.createInternetSocket() + skt.bind((self.interface, self.port)) + except OSError as le: + raise error.CannotListenError(self.interface, self.port, le) + else: + # Re-use the externally specified socket + skt = self._preexistingSocket + self._preexistingSocket = None + + # Make sure that if we listened on port 0, we update that to + # reflect what the OS actually assigned us. + self._realPortNumber = skt.getsockname()[1] + + log.msg( + "%s starting on %s" + % (self._getLogPrefix(self.protocol), self._realPortNumber) + ) + + self.connected = 1 + self.socket = skt + self.fileno = self.socket.fileno + + def _connectToProtocol(self): + self.protocol.makeConnection(self) + self.startReading() + + def doRead(self): + """ + Called when my socket is ready for reading. + """ + read = 0 + while read < self.maxThroughput: + try: + data, addr = self.socket.recvfrom(self.maxPacketSize) + except OSError as se: + no = se.args[0] + if no in _sockErrReadIgnore: + return + if no in _sockErrReadRefuse: + if self._connectedAddr: + self.protocol.connectionRefused() + return + raise + else: + read += len(data) + if self.addressFamily == socket.AF_INET6: + # Remove the flow and scope ID from the address tuple, + # reducing it to a tuple of just (host, port). + # + # TODO: This should be amended to return an object that can + # unpack to (host, port) but also includes the flow info + # and scope ID. See http://tm.tl/6826 + addr = addr[:2] + try: + self.protocol.datagramReceived(data, addr) + except BaseException: + log.err() + + def write(self, datagram, addr=None): + """ + Write a datagram. + + @type datagram: L{bytes} + @param datagram: The datagram to be sent. + + @type addr: L{tuple} containing L{str} as first element and L{int} as + second element, or L{None} + @param addr: A tuple of (I{stringified IPv4 or IPv6 address}, + I{integer port number}); can be L{None} in connected mode. + """ + if self._connectedAddr: + assert addr in (None, self._connectedAddr) + try: + return self.socket.send(datagram) + except OSError as se: + no = se.args[0] + if no == EINTR: + return self.write(datagram) + elif no == EMSGSIZE: + raise error.MessageLengthError("message too long") + elif no == ECONNREFUSED: + self.protocol.connectionRefused() + else: + raise + else: + assert addr != None + if ( + not abstract.isIPAddress(addr[0]) + and not abstract.isIPv6Address(addr[0]) + and addr[0] != "<broadcast>" + ): + raise error.InvalidAddressError( + addr[0], "write() only accepts IP addresses, not hostnames" + ) + if ( + abstract.isIPAddress(addr[0]) or addr[0] == "<broadcast>" + ) and self.addressFamily == socket.AF_INET6: + raise error.InvalidAddressError( + addr[0], "IPv6 port write() called with IPv4 or broadcast address" + ) + if abstract.isIPv6Address(addr[0]) and self.addressFamily == socket.AF_INET: + raise error.InvalidAddressError( + addr[0], "IPv4 port write() called with IPv6 address" + ) + try: + return self.socket.sendto(datagram, addr) + except OSError as se: + no = se.args[0] + if no == EINTR: + return self.write(datagram, addr) + elif no == EMSGSIZE: + raise error.MessageLengthError("message too long") + elif no == ECONNREFUSED: + # in non-connected UDP ECONNREFUSED is platform dependent, I + # think and the info is not necessarily useful. Nevertheless + # maybe we should call connectionRefused? XXX + return + else: + raise + + def writeSequence(self, seq, addr): + """ + Write a datagram constructed from an iterable of L{bytes}. + + @param seq: The data that will make up the complete datagram to be + written. + @type seq: an iterable of L{bytes} + + @type addr: L{tuple} containing L{str} as first element and L{int} as + second element, or L{None} + @param addr: A tuple of (I{stringified IPv4 or IPv6 address}, + I{integer port number}); can be L{None} in connected mode. + """ + self.write(b"".join(seq), addr) + + def connect(self, host, port): + """ + 'Connect' to remote server. + """ + if self._connectedAddr: + raise RuntimeError( + "already connected, reconnecting is not currently supported" + ) + if not abstract.isIPAddress(host) and not abstract.isIPv6Address(host): + raise error.InvalidAddressError(host, "not an IPv4 or IPv6 address.") + self._connectedAddr = (host, port) + self.socket.connect((host, port)) + + def _loseConnection(self): + self.stopReading() + if self.connected: # actually means if we are *listening* + self.reactor.callLater(0, self.connectionLost) + + def stopListening(self): + if self.connected: + result = self.d = defer.Deferred() + else: + result = None + self._loseConnection() + return result + + def loseConnection(self): + warnings.warn( + "Please use stopListening() to disconnect port", + DeprecationWarning, + stacklevel=2, + ) + self.stopListening() + + def connectionLost(self, reason=None): + """ + Cleans up my socket. + """ + log.msg("(UDP Port %s Closed)" % self._realPortNumber) + self._realPortNumber = None + self.maxThroughput = -1 + base.BasePort.connectionLost(self, reason) + self.protocol.doStop() + self.socket.close() + del self.socket + del self.fileno + if hasattr(self, "d"): + self.d.callback(None) + del self.d + + def setLogStr(self): + """ + Initialize the C{logstr} attribute to be used by C{logPrefix}. + """ + logPrefix = self._getLogPrefix(self.protocol) + self.logstr = "%s (UDP)" % logPrefix + + def _setAddressFamily(self): + """ + Resolve address family for the socket. + """ + if abstract.isIPv6Address(self.interface): + self.addressFamily = socket.AF_INET6 + elif abstract.isIPAddress(self.interface): + self.addressFamily = socket.AF_INET + elif self.interface: + raise error.InvalidAddressError( + self.interface, "not an IPv4 or IPv6 address." + ) + + def logPrefix(self): + """ + Return the prefix to log with. + """ + return self.logstr + + def getHost(self): + """ + Return the local address of the UDP connection + + @returns: the local address of the UDP connection + @rtype: L{IPv4Address} or L{IPv6Address} + """ + addr = self.socket.getsockname() + if self.addressFamily == socket.AF_INET: + return address.IPv4Address("UDP", *addr) + elif self.addressFamily == socket.AF_INET6: + return address.IPv6Address("UDP", *(addr[:2])) + + def setBroadcastAllowed(self, enabled): + """ + Set whether this port may broadcast. This is disabled by default. + + @param enabled: Whether the port may broadcast. + @type enabled: L{bool} + """ + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, enabled) + + def getBroadcastAllowed(self): + """ + Checks if broadcast is currently allowed on this port. + + @return: Whether this port may broadcast. + @rtype: L{bool} + """ + return bool(self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST)) + + +class MulticastMixin: + """ + Implement multicast functionality. + """ + + def getOutgoingInterface(self): + i = self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF) + return socket.inet_ntoa(struct.pack("@i", i)) + + def setOutgoingInterface(self, addr): + """Returns Deferred of success.""" + return self.reactor.resolve(addr).addCallback(self._setInterface) + + def _setInterface(self, addr): + i = socket.inet_aton(addr) + self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, i) + return 1 + + def getLoopbackMode(self): + return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP) + + def setLoopbackMode(self, mode): + mode = struct.pack("b", bool(mode)) + self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, mode) + + def getTTL(self): + return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL) + + def setTTL(self, ttl): + ttl = struct.pack("B", ttl) + self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl) + + def joinGroup(self, addr, interface=""): + """Join a multicast group. Returns Deferred of success.""" + return self.reactor.resolve(addr).addCallback(self._joinAddr1, interface, 1) + + def _joinAddr1(self, addr, interface, join): + return self.reactor.resolve(interface).addCallback(self._joinAddr2, addr, join) + + def _joinAddr2(self, interface, addr, join): + addr = socket.inet_aton(addr) + interface = socket.inet_aton(interface) + if join: + cmd = socket.IP_ADD_MEMBERSHIP + else: + cmd = socket.IP_DROP_MEMBERSHIP + try: + self.socket.setsockopt(socket.IPPROTO_IP, cmd, addr + interface) + except OSError as e: + return failure.Failure(error.MulticastJoinError(addr, interface, *e.args)) + + def leaveGroup(self, addr, interface=""): + """Leave multicast group, return Deferred of success.""" + return self.reactor.resolve(addr).addCallback(self._joinAddr1, interface, 0) + + +@implementer(interfaces.IMulticastTransport) +class MulticastPort(MulticastMixin, Port): + """ + UDP Port that supports multicasting. + """ + + def __init__( + self, + port, + proto, + interface="", + maxPacketSize=8192, + reactor=None, + listenMultiple=False, + ): + """ + @see: L{twisted.internet.interfaces.IReactorMulticast.listenMulticast} + """ + Port.__init__(self, port, proto, interface, maxPacketSize, reactor) + self.listenMultiple = listenMultiple + + def createInternetSocket(self): + skt = Port.createInternetSocket(self) + if self.listenMultiple: + skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + if hasattr(socket, "SO_REUSEPORT"): + try: + skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + except OSError as le: + # RHEL6 defines SO_REUSEPORT but it doesn't work + if le.errno == ENOPROTOOPT: + pass + else: + raise + return skt |