diff options
author | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:16:14 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:43:30 +0300 |
commit | b8cf9e88f4c5c64d9406af533d8948deb050d695 (patch) | |
tree | 218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py3/twisted/internet/iocpreactor | |
parent | 523f645a83a0ec97a0332dbc3863bb354c92a328 (diff) | |
download | ydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz |
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/internet/iocpreactor')
9 files changed, 1836 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py3/twisted/internet/iocpreactor/__init__.py b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/__init__.py new file mode 100644 index 0000000000..d1881d4fe3 --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/__init__.py @@ -0,0 +1,10 @@ +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +I/O Completion Ports reactor +""" + +from twisted.internet.iocpreactor.reactor import install + +__all__ = ["install"] diff --git a/contrib/python/Twisted/py3/twisted/internet/iocpreactor/abstract.py b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/abstract.py new file mode 100644 index 0000000000..818c86068d --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/abstract.py @@ -0,0 +1,387 @@ +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Abstract file handle class +""" + +import errno + +from zope.interface import implementer + +from twisted.internet import error, interfaces, main +from twisted.internet.abstract import _ConsumerMixin, _dataMustBeBytes, _LogOwner +from twisted.internet.iocpreactor import iocpsupport as _iocp +from twisted.internet.iocpreactor.const import ERROR_HANDLE_EOF, ERROR_IO_PENDING +from twisted.python import failure + + +@implementer( + interfaces.IPushProducer, + interfaces.IConsumer, + interfaces.ITransport, + interfaces.IHalfCloseableDescriptor, +) +class FileHandle(_ConsumerMixin, _LogOwner): + """ + File handle that can read and write asynchronously + """ + + # read stuff + maxReadBuffers = 16 + readBufferSize = 4096 + reading = False + dynamicReadBuffers = True # set this to false if subclass doesn't do iovecs + _readNextBuffer = 0 + _readSize = 0 # how much data we have in the read buffer + _readScheduled = None + _readScheduledInOS = False + + def startReading(self): + self.reactor.addActiveHandle(self) + if not self._readScheduled and not self.reading: + self.reading = True + self._readScheduled = self.reactor.callLater(0, self._resumeReading) + + def stopReading(self): + if self._readScheduled: + self._readScheduled.cancel() + self._readScheduled = None + self.reading = False + + def _resumeReading(self): + self._readScheduled = None + if self._dispatchData() and not self._readScheduledInOS: + self.doRead() + + def _dispatchData(self): + """ + Dispatch previously read data. Return True if self.reading and we don't + have any more data + """ + if not self._readSize: + return self.reading + size = self._readSize + full_buffers = size // self.readBufferSize + while self._readNextBuffer < full_buffers: + self.dataReceived(self._readBuffers[self._readNextBuffer]) + self._readNextBuffer += 1 + if not self.reading: + return False + remainder = size % self.readBufferSize + if remainder: + self.dataReceived(self._readBuffers[full_buffers][0:remainder]) + if self.dynamicReadBuffers: + total_buffer_size = self.readBufferSize * len(self._readBuffers) + # we have one buffer too many + if size < total_buffer_size - self.readBufferSize: + del self._readBuffers[-1] + # we filled all buffers, so allocate one more + elif ( + size == total_buffer_size + and len(self._readBuffers) < self.maxReadBuffers + ): + self._readBuffers.append(bytearray(self.readBufferSize)) + self._readNextBuffer = 0 + self._readSize = 0 + return self.reading + + def _cbRead(self, rc, data, evt): + self._readScheduledInOS = False + if self._handleRead(rc, data, evt): + self.doRead() + + def _handleRead(self, rc, data, evt): + """ + Returns False if we should stop reading for now + """ + if self.disconnected: + return False + # graceful disconnection + if (not (rc or data)) or rc in (errno.WSAEDISCON, ERROR_HANDLE_EOF): + self.reactor.removeActiveHandle(self) + self.readConnectionLost(failure.Failure(main.CONNECTION_DONE)) + return False + # XXX: not handling WSAEWOULDBLOCK + # ("too many outstanding overlapped I/O requests") + elif rc: + self.connectionLost( + failure.Failure( + error.ConnectionLost( + "read error -- %s (%s)" + % (errno.errorcode.get(rc, "unknown"), rc) + ) + ) + ) + return False + else: + assert self._readSize == 0 + assert self._readNextBuffer == 0 + self._readSize = data + return self._dispatchData() + + def doRead(self): + evt = _iocp.Event(self._cbRead, self) + + evt.buff = buff = self._readBuffers + rc, numBytesRead = self.readFromHandle(buff, evt) + + if not rc or rc == ERROR_IO_PENDING: + self._readScheduledInOS = True + else: + self._handleRead(rc, numBytesRead, evt) + + def readFromHandle(self, bufflist, evt): + raise NotImplementedError() # TODO: this should default to ReadFile + + def dataReceived(self, data): + raise NotImplementedError + + def readConnectionLost(self, reason): + self.connectionLost(reason) + + # write stuff + dataBuffer = b"" + offset = 0 + writing = False + _writeScheduled = None + _writeDisconnecting = False + _writeDisconnected = False + writeBufferSize = 2**2**2**2 + + def loseWriteConnection(self): + self._writeDisconnecting = True + self.startWriting() + + def _closeWriteConnection(self): + # override in subclasses + pass + + def writeConnectionLost(self, reason): + # in current code should never be called + self.connectionLost(reason) + + def startWriting(self): + self.reactor.addActiveHandle(self) + + if not self._writeScheduled and not self.writing: + self.writing = True + self._writeScheduled = self.reactor.callLater(0, self._resumeWriting) + + def stopWriting(self): + if self._writeScheduled: + self._writeScheduled.cancel() + self._writeScheduled = None + self.writing = False + + def _resumeWriting(self): + self._writeScheduled = None + self.doWrite() + + def _cbWrite(self, rc, numBytesWritten, evt): + if self._handleWrite(rc, numBytesWritten, evt): + self.doWrite() + + def _handleWrite(self, rc, numBytesWritten, evt): + """ + Returns false if we should stop writing for now + """ + if self.disconnected or self._writeDisconnected: + return False + # XXX: not handling WSAEWOULDBLOCK + # ("too many outstanding overlapped I/O requests") + if rc: + self.connectionLost( + failure.Failure( + error.ConnectionLost( + "write error -- %s (%s)" + % (errno.errorcode.get(rc, "unknown"), rc) + ) + ) + ) + return False + else: + self.offset += numBytesWritten + # If there is nothing left to send, + if self.offset == len(self.dataBuffer) and not self._tempDataLen: + self.dataBuffer = b"" + self.offset = 0 + # stop writing + self.stopWriting() + # If I've got a producer who is supposed to supply me with data + if self.producer is not None and ( + (not self.streamingProducer) or self.producerPaused + ): + # tell them to supply some more. + self.producerPaused = True + self.producer.resumeProducing() + elif self.disconnecting: + # But if I was previously asked to let the connection die, + # do so. + self.connectionLost(failure.Failure(main.CONNECTION_DONE)) + elif self._writeDisconnecting: + # I was previously asked to half-close the connection. + self._writeDisconnected = True + self._closeWriteConnection() + return False + else: + return True + + def doWrite(self): + if len(self.dataBuffer) - self.offset < self.SEND_LIMIT: + # If there is currently less than SEND_LIMIT bytes left to send + # in the string, extend it with the array data. + self.dataBuffer = self.dataBuffer[self.offset :] + b"".join( + self._tempDataBuffer + ) + self.offset = 0 + self._tempDataBuffer = [] + self._tempDataLen = 0 + + evt = _iocp.Event(self._cbWrite, self) + + # Send as much data as you can. + if self.offset: + sendView = memoryview(self.dataBuffer) + evt.buff = buff = sendView[self.offset :] + else: + evt.buff = buff = self.dataBuffer + rc, data = self.writeToHandle(buff, evt) + if rc and rc != ERROR_IO_PENDING: + self._handleWrite(rc, data, evt) + + def writeToHandle(self, buff, evt): + raise NotImplementedError() # TODO: this should default to WriteFile + + def write(self, data): + """Reliably write some data. + + The data is buffered until his file descriptor is ready for writing. + """ + _dataMustBeBytes(data) + if not self.connected or self._writeDisconnected: + return + if data: + self._tempDataBuffer.append(data) + self._tempDataLen += len(data) + if self.producer is not None and self.streamingProducer: + if len(self.dataBuffer) + self._tempDataLen > self.writeBufferSize: + self.producerPaused = True + self.producer.pauseProducing() + self.startWriting() + + def writeSequence(self, iovec): + for i in iovec: + _dataMustBeBytes(i) + if not self.connected or not iovec or self._writeDisconnected: + return + self._tempDataBuffer.extend(iovec) + for i in iovec: + self._tempDataLen += len(i) + if self.producer is not None and self.streamingProducer: + if len(self.dataBuffer) + self._tempDataLen > self.writeBufferSize: + self.producerPaused = True + self.producer.pauseProducing() + self.startWriting() + + # general stuff + connected = False + disconnected = False + disconnecting = False + logstr = "Uninitialized" + + SEND_LIMIT = 128 * 1024 + + def __init__(self, reactor=None): + if not reactor: + from twisted.internet import reactor + self.reactor = reactor + self._tempDataBuffer = [] # will be added to dataBuffer in doWrite + self._tempDataLen = 0 + self._readBuffers = [bytearray(self.readBufferSize)] + + def connectionLost(self, reason): + """ + The connection was lost. + + This is called when the connection on a selectable object has been + lost. It will be called whether the connection was closed explicitly, + an exception occurred in an event handler, or the other end of the + connection closed it first. + + Clean up state here, but make sure to call back up to FileDescriptor. + """ + + self.disconnected = True + self.connected = False + if self.producer is not None: + self.producer.stopProducing() + self.producer = None + self.stopReading() + self.stopWriting() + self.reactor.removeActiveHandle(self) + + def getFileHandle(self): + return -1 + + def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)): + """ + Close the connection at the next available opportunity. + + Call this to cause this FileDescriptor to lose its connection. It will + first write any data that it has buffered. + + If there is data buffered yet to be written, this method will cause the + transport to lose its connection as soon as it's done flushing its + write buffer. If you have a producer registered, the connection won't + be closed until the producer is finished. Therefore, make sure you + unregister your producer when it's finished, or the connection will + never close. + """ + + if self.connected and not self.disconnecting: + if self._writeDisconnected: + # doWrite won't trigger the connection close anymore + self.stopReading() + self.stopWriting + self.connectionLost(_connDone) + else: + self.stopReading() + self.startWriting() + self.disconnecting = 1 + + # Producer/consumer implementation + + def stopConsuming(self): + """ + Stop consuming data. + + This is called when a producer has lost its connection, to tell the + consumer to go lose its connection (and break potential circular + references). + """ + self.unregisterProducer() + self.loseConnection() + + # producer interface implementation + + def resumeProducing(self): + if self.connected and not self.disconnecting: + self.startReading() + + def pauseProducing(self): + self.stopReading() + + def stopProducing(self): + self.loseConnection() + + def getHost(self): + # ITransport.getHost + raise NotImplementedError() + + def getPeer(self): + # ITransport.getPeer + raise NotImplementedError() + + +__all__ = ["FileHandle"] diff --git a/contrib/python/Twisted/py3/twisted/internet/iocpreactor/const.py b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/const.py new file mode 100644 index 0000000000..4814425af9 --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/const.py @@ -0,0 +1,25 @@ +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + + +""" +Windows constants for IOCP +""" + + +# this stuff should really be gotten from Windows headers via pyrex, but it +# probably is not going to change + +ERROR_PORT_UNREACHABLE = 1234 +ERROR_NETWORK_UNREACHABLE = 1231 +ERROR_CONNECTION_REFUSED = 1225 +ERROR_IO_PENDING = 997 +ERROR_OPERATION_ABORTED = 995 +WAIT_TIMEOUT = 258 +ERROR_NETNAME_DELETED = 64 +ERROR_HANDLE_EOF = 38 + +INFINITE = -1 + +SO_UPDATE_CONNECT_CONTEXT = 0x7010 +SO_UPDATE_ACCEPT_CONTEXT = 0x700B diff --git a/contrib/python/Twisted/py3/twisted/internet/iocpreactor/interfaces.py b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/interfaces.py new file mode 100644 index 0000000000..b161341efa --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/interfaces.py @@ -0,0 +1,42 @@ +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + + +""" +Interfaces for iocpreactor +""" + + +from zope.interface import Interface + + +class IReadHandle(Interface): + def readFromHandle(bufflist, evt): + """ + Read into the given buffers from this handle. + + @param bufflist: the buffers to read into + @type bufflist: list of objects implementing the read/write buffer protocol + + @param evt: an IOCP Event object + + @return: tuple (return code, number of bytes read) + """ + + +class IWriteHandle(Interface): + def writeToHandle(buff, evt): + """ + Write the given buffer to this handle. + + @param buff: the buffer to write + @type buff: any object implementing the buffer protocol + + @param evt: an IOCP Event object + + @return: tuple (return code, number of bytes written) + """ + + +class IReadWriteHandle(IReadHandle, IWriteHandle): + pass diff --git a/contrib/python/Twisted/py3/twisted/internet/iocpreactor/iocpsupport.py b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/iocpsupport.py new file mode 100644 index 0000000000..826c976487 --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/iocpsupport.py @@ -0,0 +1,27 @@ +__all__ = [ + "CompletionPort", + "Event", + "accept", + "connect", + "get_accept_addrs", + "have_connectex", + "makesockaddr", + "maxAddrLen", + "recv", + "recvfrom", + "send", +] + +from twisted_iocpsupport.iocpsupport import ( # type: ignore[import] + CompletionPort, + Event, + accept, + connect, + get_accept_addrs, + have_connectex, + makesockaddr, + maxAddrLen, + recv, + recvfrom, + send, +) diff --git a/contrib/python/Twisted/py3/twisted/internet/iocpreactor/notes.txt b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/notes.txt new file mode 100644 index 0000000000..4caffb882f --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/notes.txt @@ -0,0 +1,24 @@ +test specifically: +failed accept error message -- similar to test_tcp_internals +immediate success on accept/connect/recv, including Event.ignore +parametrize iocpsupport somehow -- via reactor? + +do: +break handling -- WaitForSingleObject on the IOCP handle? +iovecs for write buffer +do not wait for a mainloop iteration if resumeProducing (in _handleWrite) does startWriting +don't addActiveHandle in every call to startWriting/startReading +iocpified process support + win32er-in-a-thread (or run GQCS in a thread -- it can't receive SIGBREAK) +blocking in sendto() -- I think Windows can do that, especially with local UDP + +buildbot: +run in vmware +start from a persistent snapshot + +use a stub inside the vm to svnup/run tests/collect stdio +lift logs through SMB? or ship them via tcp beams to the VM host + +have a timeout on the test run +if we time out, take a screenshot, save it, kill the VM + diff --git a/contrib/python/Twisted/py3/twisted/internet/iocpreactor/reactor.py b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/reactor.py new file mode 100644 index 0000000000..e9c3716219 --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/reactor.py @@ -0,0 +1,285 @@ +# -*- test-case-name: twisted.internet.test.test_iocp -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Reactor that uses IO completion ports +""" + + +import socket +import sys +import warnings +from typing import Tuple, Type + +from zope.interface import implementer + +from twisted.internet import base, error, interfaces, main +from twisted.internet._dumbwin32proc import Process +from twisted.internet.iocpreactor import iocpsupport as _iocp, tcp, udp +from twisted.internet.iocpreactor.const import WAIT_TIMEOUT +from twisted.internet.win32eventreactor import _ThreadedWin32EventsMixin +from twisted.python import failure, log + +try: + from twisted.protocols.tls import TLSMemoryBIOFactory as _TLSMemoryBIOFactory +except ImportError: + TLSMemoryBIOFactory = None + # Either pyOpenSSL isn't installed, or it is too old for this code to work. + # The reactor won't provide IReactorSSL. + _extraInterfaces: Tuple[Type[interfaces.IReactorSSL], ...] = () + warnings.warn( + "pyOpenSSL 0.10 or newer is required for SSL support in iocpreactor. " + "It is missing, so the reactor will not support SSL APIs." + ) +else: + TLSMemoryBIOFactory = _TLSMemoryBIOFactory + _extraInterfaces = (interfaces.IReactorSSL,) + +MAX_TIMEOUT = 2000 # 2 seconds, see doIteration for explanation + +EVENTS_PER_LOOP = 1000 # XXX: what's a good value here? + +# keys to associate with normal and waker events +KEY_NORMAL, KEY_WAKEUP = range(2) + +_NO_GETHANDLE = error.ConnectionFdescWentAway("Handler has no getFileHandle method") +_NO_FILEDESC = error.ConnectionFdescWentAway("Filedescriptor went away") + + +@implementer( + interfaces.IReactorTCP, + interfaces.IReactorUDP, + interfaces.IReactorMulticast, + interfaces.IReactorProcess, + *_extraInterfaces, +) +class IOCPReactor(base.ReactorBase, _ThreadedWin32EventsMixin): + port = None + + def __init__(self): + base.ReactorBase.__init__(self) + self.port = _iocp.CompletionPort() + self.handles = set() + + def addActiveHandle(self, handle): + self.handles.add(handle) + + def removeActiveHandle(self, handle): + self.handles.discard(handle) + + def doIteration(self, timeout): + """ + Poll the IO completion port for new events. + """ + # This function sits and waits for an IO completion event. + # + # There are two requirements: process IO events as soon as they arrive + # and process ctrl-break from the user in a reasonable amount of time. + # + # There are three kinds of waiting. + # 1) GetQueuedCompletionStatus (self.port.getEvent) to wait for IO + # events only. + # 2) Msg* family of wait functions that can stop waiting when + # ctrl-break is detected (then, I think, Python converts it into a + # KeyboardInterrupt) + # 3) *Ex family of wait functions that put the thread into an + # "alertable" wait state which is supposedly triggered by IO completion + # + # 2) and 3) can be combined. Trouble is, my IO completion is not + # causing 3) to trigger, possibly because I do not use an IO completion + # callback. Windows is weird. + # There are two ways to handle this. I could use MsgWaitForSingleObject + # here and GetQueuedCompletionStatus in a thread. Or I could poll with + # a reasonable interval. Guess what! Threads are hard. + + processed_events = 0 + if timeout is None: + timeout = MAX_TIMEOUT + else: + timeout = min(MAX_TIMEOUT, int(1000 * timeout)) + rc, numBytes, key, evt = self.port.getEvent(timeout) + while 1: + if rc == WAIT_TIMEOUT: + break + if key != KEY_WAKEUP: + assert key == KEY_NORMAL + log.callWithLogger( + evt.owner, self._callEventCallback, rc, numBytes, evt + ) + processed_events += 1 + if processed_events >= EVENTS_PER_LOOP: + break + rc, numBytes, key, evt = self.port.getEvent(0) + + def _callEventCallback(self, rc, numBytes, evt): + owner = evt.owner + why = None + try: + evt.callback(rc, numBytes, evt) + handfn = getattr(owner, "getFileHandle", None) + if not handfn: + why = _NO_GETHANDLE + elif handfn() == -1: + why = _NO_FILEDESC + if why: + return # ignore handles that were closed + except BaseException: + why = sys.exc_info()[1] + log.err() + if why: + owner.loseConnection(failure.Failure(why)) + + def installWaker(self): + pass + + def wakeUp(self): + self.port.postEvent(0, KEY_WAKEUP, None) + + def registerHandle(self, handle): + self.port.addHandle(handle, KEY_NORMAL) + + def createSocket(self, af, stype): + skt = socket.socket(af, stype) + self.registerHandle(skt.fileno()) + return skt + + def listenTCP(self, port, factory, backlog=50, interface=""): + """ + @see: twisted.internet.interfaces.IReactorTCP.listenTCP + """ + p = tcp.Port(port, factory, backlog, interface, self) + p.startListening() + return p + + def connectTCP(self, host, port, factory, timeout=30, bindAddress=None): + """ + @see: twisted.internet.interfaces.IReactorTCP.connectTCP + """ + c = tcp.Connector(host, port, factory, timeout, bindAddress, self) + c.connect() + return c + + if TLSMemoryBIOFactory is not None: + + def listenSSL(self, port, factory, contextFactory, backlog=50, interface=""): + """ + @see: twisted.internet.interfaces.IReactorSSL.listenSSL + """ + port = self.listenTCP( + port, + TLSMemoryBIOFactory(contextFactory, False, factory), + backlog, + interface, + ) + port._type = "TLS" + return port + + def connectSSL( + self, host, port, factory, contextFactory, timeout=30, bindAddress=None + ): + """ + @see: twisted.internet.interfaces.IReactorSSL.connectSSL + """ + return self.connectTCP( + host, + port, + TLSMemoryBIOFactory(contextFactory, True, factory), + timeout, + bindAddress, + ) + + else: + + def listenSSL(self, port, factory, contextFactory, backlog=50, interface=""): + """ + Non-implementation of L{IReactorSSL.listenSSL}. Some dependency + is not satisfied. This implementation always raises + L{NotImplementedError}. + """ + raise NotImplementedError( + "pyOpenSSL 0.10 or newer is required for SSL support in " + "iocpreactor. It is missing, so the reactor does not support " + "SSL APIs." + ) + + def connectSSL( + self, host, port, factory, contextFactory, timeout=30, bindAddress=None + ): + """ + Non-implementation of L{IReactorSSL.connectSSL}. Some dependency + is not satisfied. This implementation always raises + L{NotImplementedError}. + """ + raise NotImplementedError( + "pyOpenSSL 0.10 or newer is required for SSL support in " + "iocpreactor. It is missing, so the reactor does not support " + "SSL APIs." + ) + + def listenUDP(self, port, protocol, interface="", maxPacketSize=8192): + """ + Connects a given L{DatagramProtocol} to the given numeric UDP port. + + @returns: object conforming to L{IListeningPort}. + """ + p = udp.Port(port, protocol, interface, maxPacketSize, self) + p.startListening() + return p + + def listenMulticast( + self, port, protocol, interface="", maxPacketSize=8192, listenMultiple=False + ): + """ + Connects a given DatagramProtocol to the given numeric UDP port. + + EXPERIMENTAL. + + @returns: object conforming to IListeningPort. + """ + p = udp.MulticastPort( + port, protocol, interface, maxPacketSize, self, listenMultiple + ) + p.startListening() + return p + + def spawnProcess( + self, + processProtocol, + executable, + args=(), + env={}, + path=None, + uid=None, + gid=None, + usePTY=0, + childFDs=None, + ): + """ + Spawn a process. + """ + if uid is not None: + raise ValueError("Setting UID is unsupported on this platform.") + if gid is not None: + raise ValueError("Setting GID is unsupported on this platform.") + if usePTY: + raise ValueError("PTYs are unsupported on this platform.") + if childFDs is not None: + raise ValueError( + "Custom child file descriptor mappings are unsupported on " + "this platform." + ) + return Process(self, processProtocol, executable, args, env, path) + + def removeAll(self): + res = list(self.handles) + self.handles.clear() + return res + + +def install(): + r = IOCPReactor() + main.installReactor(r) + + +__all__ = ["IOCPReactor", "install"] diff --git a/contrib/python/Twisted/py3/twisted/internet/iocpreactor/tcp.py b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/tcp.py new file mode 100644 index 0000000000..aadd685269 --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/tcp.py @@ -0,0 +1,608 @@ +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +TCP support for IOCP reactor +""" + +import errno +import socket +import struct +from typing import Optional + +from zope.interface import classImplements, implementer + +from twisted.internet import address, defer, error, interfaces, main +from twisted.internet.abstract import _LogOwner, isIPv6Address +from twisted.internet.iocpreactor import abstract, iocpsupport as _iocp +from twisted.internet.iocpreactor.const import ( + ERROR_CONNECTION_REFUSED, + ERROR_IO_PENDING, + ERROR_NETWORK_UNREACHABLE, + SO_UPDATE_ACCEPT_CONTEXT, + SO_UPDATE_CONNECT_CONTEXT, +) +from twisted.internet.iocpreactor.interfaces import IReadWriteHandle +from twisted.internet.protocol import Protocol +from twisted.internet.tcp import ( + Connector as TCPConnector, + _AbortingMixin, + _BaseBaseClient, + _BaseTCPClient, + _getsockname, + _resolveIPv6, + _SocketCloser, +) +from twisted.python import failure, log, reflect + +try: + from twisted.internet._newtls import startTLS as __startTLS +except ImportError: + _startTLS = None +else: + _startTLS = __startTLS + + +# ConnectEx returns these. XXX: find out what it does for timeout +connectExErrors = { + ERROR_CONNECTION_REFUSED: errno.WSAECONNREFUSED, # type: ignore[attr-defined] + ERROR_NETWORK_UNREACHABLE: errno.WSAENETUNREACH, # type: ignore[attr-defined] +} + + +@implementer(IReadWriteHandle, interfaces.ITCPTransport, interfaces.ISystemHandle) +class Connection(abstract.FileHandle, _SocketCloser, _AbortingMixin): + """ + @ivar TLS: C{False} to indicate the connection is in normal TCP mode, + C{True} to indicate that TLS has been started and that operations must + be routed through the L{TLSMemoryBIOProtocol} instance. + """ + + TLS = False + + def __init__(self, sock, proto, reactor=None): + abstract.FileHandle.__init__(self, reactor) + self.socket = sock + self.getFileHandle = sock.fileno + self.protocol = proto + + def getHandle(self): + return self.socket + + def dataReceived(self, rbuffer): + """ + @param rbuffer: Data received. + @type rbuffer: L{bytes} or L{bytearray} + """ + if isinstance(rbuffer, bytes): + pass + elif isinstance(rbuffer, bytearray): + # XXX: some day, we'll have protocols that can handle raw buffers + rbuffer = bytes(rbuffer) + else: + raise TypeError("data must be bytes or bytearray, not " + type(rbuffer)) + + self.protocol.dataReceived(rbuffer) + + def readFromHandle(self, bufflist, evt): + return _iocp.recv(self.getFileHandle(), bufflist, evt) + + def writeToHandle(self, buff, evt): + """ + Send C{buff} to current file handle using C{_iocp.send}. The buffer + sent is limited to a size of C{self.SEND_LIMIT}. + """ + writeView = memoryview(buff) + return _iocp.send( + self.getFileHandle(), writeView[0 : self.SEND_LIMIT].tobytes(), evt + ) + + def _closeWriteConnection(self): + try: + self.socket.shutdown(1) + except OSError: + pass + p = interfaces.IHalfCloseableProtocol(self.protocol, None) + if p: + try: + p.writeConnectionLost() + except BaseException: + f = failure.Failure() + log.err() + self.connectionLost(f) + + def readConnectionLost(self, reason): + p = interfaces.IHalfCloseableProtocol(self.protocol, None) + if p: + try: + p.readConnectionLost() + except BaseException: + log.err() + self.connectionLost(failure.Failure()) + else: + self.connectionLost(reason) + + def connectionLost(self, reason): + if self.disconnected: + return + abstract.FileHandle.connectionLost(self, reason) + isClean = reason is None or not reason.check(error.ConnectionAborted) + self._closeSocket(isClean) + protocol = self.protocol + del self.protocol + del self.socket + del self.getFileHandle + protocol.connectionLost(reason) + + def logPrefix(self): + """ + Return the prefix to log with when I own the logging thread. + """ + return self.logstr + + def getTcpNoDelay(self): + return bool(self.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)) + + def setTcpNoDelay(self, enabled): + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, enabled) + + def getTcpKeepAlive(self): + return bool(self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE)) + + def setTcpKeepAlive(self, enabled): + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, enabled) + + if _startTLS is not None: + + def startTLS(self, contextFactory, normal=True): + """ + @see: L{ITLSTransport.startTLS} + """ + _startTLS(self, contextFactory, normal, abstract.FileHandle) + + def write(self, data): + """ + Write some data, either directly to the underlying handle or, if TLS + has been started, to the L{TLSMemoryBIOProtocol} for it to encrypt and + send. + + @see: L{twisted.internet.interfaces.ITransport.write} + """ + if self.disconnected: + return + if self.TLS: + self.protocol.write(data) + else: + abstract.FileHandle.write(self, data) + + def writeSequence(self, iovec): + """ + Write some data, either directly to the underlying handle or, if TLS + has been started, to the L{TLSMemoryBIOProtocol} for it to encrypt and + send. + + @see: L{twisted.internet.interfaces.ITransport.writeSequence} + """ + if self.disconnected: + return + if self.TLS: + self.protocol.writeSequence(iovec) + else: + abstract.FileHandle.writeSequence(self, iovec) + + def loseConnection(self, reason=None): + """ + Close the underlying handle or, if TLS has been started, first shut it + down. + + @see: L{twisted.internet.interfaces.ITransport.loseConnection} + """ + if self.TLS: + if self.connected and not self.disconnecting: + self.protocol.loseConnection() + else: + abstract.FileHandle.loseConnection(self, reason) + + def registerProducer(self, producer, streaming): + """ + Register a producer. + + If TLS is enabled, the TLS connection handles this. + """ + if self.TLS: + # Registering a producer before we're connected shouldn't be a + # problem. If we end up with a write(), that's already handled in + # the write() code above, and there are no other potential + # side-effects. + self.protocol.registerProducer(producer, streaming) + else: + abstract.FileHandle.registerProducer(self, producer, streaming) + + def unregisterProducer(self): + """ + Unregister a producer. + + If TLS is enabled, the TLS connection handles this. + """ + if self.TLS: + self.protocol.unregisterProducer() + else: + abstract.FileHandle.unregisterProducer(self) + + def getHost(self): + # ITCPTransport.getHost + pass + + def getPeer(self): + # ITCPTransport.getPeer + pass + + +if _startTLS is not None: + classImplements(Connection, interfaces.ITLSTransport) + + +class Client(_BaseBaseClient, _BaseTCPClient, Connection): + """ + @ivar _tlsClientDefault: Always C{True}, indicating that this is a client + connection, and by default when TLS is negotiated this class will act as + a TLS client. + """ + + addressFamily = socket.AF_INET + socketType = socket.SOCK_STREAM + + _tlsClientDefault = True + _commonConnection = Connection + + def __init__(self, host, port, bindAddress, connector, reactor): + # ConnectEx documentation says socket _has_ to be bound + if bindAddress is None: + bindAddress = ("", 0) + self.reactor = reactor # createInternetSocket needs this + _BaseTCPClient.__init__(self, host, port, bindAddress, connector, reactor) + + def createInternetSocket(self): + """ + Create a socket registered with the IOCP reactor. + + @see: L{_BaseTCPClient} + """ + return self.reactor.createSocket(self.addressFamily, self.socketType) + + def _collectSocketDetails(self): + """ + Clean up potentially circular references to the socket and to its + C{getFileHandle} method. + + @see: L{_BaseBaseClient} + """ + del self.socket, self.getFileHandle + + def _stopReadingAndWriting(self): + """ + Remove the active handle from the reactor. + + @see: L{_BaseBaseClient} + """ + self.reactor.removeActiveHandle(self) + + def cbConnect(self, rc, data, evt): + if rc: + rc = connectExErrors.get(rc, rc) + self.failIfNotConnected( + error.getConnectError((rc, errno.errorcode.get(rc, "Unknown error"))) + ) + else: + self.socket.setsockopt( + socket.SOL_SOCKET, + SO_UPDATE_CONNECT_CONTEXT, + struct.pack("P", self.socket.fileno()), + ) + self.protocol = self.connector.buildProtocol(self.getPeer()) + self.connected = True + logPrefix = self._getLogPrefix(self.protocol) + self.logstr = logPrefix + ",client" + if self.protocol is None: + # Factory.buildProtocol is allowed to return None. In that + # case, make up a protocol to satisfy the rest of the + # implementation; connectionLost is going to be called on + # something, for example. This is easier than adding special + # case support for a None protocol throughout the rest of the + # transport implementation. + self.protocol = Protocol() + # But dispose of the connection quickly. + self.loseConnection() + else: + self.protocol.makeConnection(self) + self.startReading() + + def doConnect(self): + if not hasattr(self, "connector"): + # this happens if we connector.stopConnecting in + # factory.startedConnecting + return + assert _iocp.have_connectex + self.reactor.addActiveHandle(self) + evt = _iocp.Event(self.cbConnect, self) + + rc = _iocp.connect(self.socket.fileno(), self.realAddress, evt) + if rc and rc != ERROR_IO_PENDING: + self.cbConnect(rc, 0, evt) + + +class Server(Connection): + """ + Serverside socket-stream connection class. + + I am a serverside network connection transport; a socket which came from an + accept() on a server. + + @ivar _tlsClientDefault: Always C{False}, indicating that this is a server + connection, and by default when TLS is negotiated this class will act as + a TLS server. + """ + + _tlsClientDefault = False + + def __init__(self, sock, protocol, clientAddr, serverAddr, sessionno, reactor): + """ + Server(sock, protocol, client, server, sessionno) + + Initialize me with a socket, a protocol, a descriptor for my peer (a + tuple of host, port describing the other end of the connection), an + instance of Port, and a session number. + """ + Connection.__init__(self, sock, protocol, reactor) + self.serverAddr = serverAddr + self.clientAddr = clientAddr + self.sessionno = sessionno + logPrefix = self._getLogPrefix(self.protocol) + self.logstr = f"{logPrefix},{sessionno},{self.clientAddr.host}" + self.repstr: str = "<{} #{} on {}>".format( + self.protocol.__class__.__name__, + self.sessionno, + self.serverAddr.port, + ) + self.connected = True + self.startReading() + + def __repr__(self) -> str: + """ + A string representation of this connection. + """ + return self.repstr + + def getHost(self): + """ + Returns an IPv4Address. + + This indicates the server's address. + """ + return self.serverAddr + + def getPeer(self): + """ + Returns an IPv4Address. + + This indicates the client's address. + """ + return self.clientAddr + + +class Connector(TCPConnector): + def _makeTransport(self): + return Client(self.host, self.port, self.bindAddress, self, self.reactor) + + +@implementer(interfaces.IListeningPort) +class Port(_SocketCloser, _LogOwner): + connected = False + disconnected = False + disconnecting = False + addressFamily = socket.AF_INET + socketType = socket.SOCK_STREAM + _addressType = address.IPv4Address + sessionno = 0 + + # Actual port number being listened on, only set to a non-None + # value when we are actually listening. + _realPortNumber: Optional[int] = None + + # A string describing the connections which will be created by this port. + # Normally this is C{"TCP"}, since this is a TCP port, but when the TLS + # implementation re-uses this class it overrides the value with C{"TLS"}. + # Only used for logging. + _type = "TCP" + + def __init__(self, port, factory, backlog=50, interface="", reactor=None): + self.port = port + self.factory = factory + self.backlog = backlog + self.interface = interface + self.reactor = reactor + if isIPv6Address(interface): + self.addressFamily = socket.AF_INET6 + self._addressType = address.IPv6Address + + def __repr__(self) -> str: + if self._realPortNumber is not None: + return "<{} of {} on {}>".format( + self.__class__, + self.factory.__class__, + self._realPortNumber, + ) + else: + return "<{} of {} (not listening)>".format( + self.__class__, + self.factory.__class__, + ) + + def startListening(self): + try: + skt = self.reactor.createSocket(self.addressFamily, self.socketType) + # TODO: resolve self.interface if necessary + if self.addressFamily == socket.AF_INET6: + addr = _resolveIPv6(self.interface, self.port) + else: + addr = (self.interface, self.port) + skt.bind(addr) + except OSError as le: + raise error.CannotListenError(self.interface, self.port, le) + + self.addrLen = _iocp.maxAddrLen(skt.fileno()) + + # Make sure that if we listened on port 0, we update that to + # reflect what the OS actually assigned us. + self._realPortNumber = skt.getsockname()[1] + + log.msg( + "%s starting on %s" + % (self._getLogPrefix(self.factory), self._realPortNumber) + ) + + self.factory.doStart() + skt.listen(self.backlog) + self.connected = True + self.disconnected = False + self.reactor.addActiveHandle(self) + self.socket = skt + self.getFileHandle = self.socket.fileno + self.doAccept() + + def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE)): + """ + Stop accepting connections on this port. + + This will shut down my socket and call self.connectionLost(). + It returns a deferred which will fire successfully when the + port is actually closed. + """ + self.disconnecting = True + if self.connected: + self.deferred = defer.Deferred() + self.reactor.callLater(0, self.connectionLost, connDone) + return self.deferred + + stopListening = loseConnection + + def _logConnectionLostMsg(self): + """ + Log message for closing port + """ + log.msg(f"({self._type} Port {self._realPortNumber} Closed)") + + def connectionLost(self, reason): + """ + Cleans up the socket. + """ + self._logConnectionLostMsg() + self._realPortNumber = None + d = None + if hasattr(self, "deferred"): + d = self.deferred + del self.deferred + + self.disconnected = True + self.reactor.removeActiveHandle(self) + self.connected = False + self._closeSocket(True) + del self.socket + del self.getFileHandle + + try: + self.factory.doStop() + except BaseException: + self.disconnecting = False + if d is not None: + d.errback(failure.Failure()) + else: + raise + else: + self.disconnecting = False + if d is not None: + d.callback(None) + + def logPrefix(self): + """ + Returns the name of my class, to prefix log entries with. + """ + return reflect.qual(self.factory.__class__) + + def getHost(self): + """ + Returns an IPv4Address or IPv6Address. + + This indicates the server's address. + """ + return self._addressType("TCP", *_getsockname(self.socket)) + + def cbAccept(self, rc, data, evt): + self.handleAccept(rc, evt) + if not (self.disconnecting or self.disconnected): + self.doAccept() + + def handleAccept(self, rc, evt): + if self.disconnecting or self.disconnected: + return False + + # possible errors: + # (WSAEMFILE, WSAENOBUFS, WSAENFILE, WSAENOMEM, WSAECONNABORTED) + if rc: + log.msg( + "Could not accept new connection -- %s (%s)" + % (errno.errorcode.get(rc, "unknown error"), rc) + ) + return False + else: + # Inherit the properties from the listening port socket as + # documented in the `Remarks` section of AcceptEx. + # https://docs.microsoft.com/en-us/windows/win32/api/mswsock/nf-mswsock-acceptex + # In this way we can call getsockname and getpeername on the + # accepted socket. + evt.newskt.setsockopt( + socket.SOL_SOCKET, + SO_UPDATE_ACCEPT_CONTEXT, + struct.pack("P", self.socket.fileno()), + ) + family, lAddr, rAddr = _iocp.get_accept_addrs(evt.newskt.fileno(), evt.buff) + assert family == self.addressFamily + + # Build an IPv6 address that includes the scopeID, if necessary + if "%" in lAddr[0]: + scope = int(lAddr[0].split("%")[1]) + lAddr = (lAddr[0], lAddr[1], 0, scope) + if "%" in rAddr[0]: + scope = int(rAddr[0].split("%")[1]) + rAddr = (rAddr[0], rAddr[1], 0, scope) + + protocol = self.factory.buildProtocol(self._addressType("TCP", *rAddr)) + if protocol is None: + evt.newskt.close() + else: + s = self.sessionno + self.sessionno = s + 1 + transport = Server( + evt.newskt, + protocol, + self._addressType("TCP", *rAddr), + self._addressType("TCP", *lAddr), + s, + self.reactor, + ) + protocol.makeConnection(transport) + return True + + def doAccept(self): + evt = _iocp.Event(self.cbAccept, self) + + # see AcceptEx documentation + evt.buff = buff = bytearray(2 * (self.addrLen + 16)) + + evt.newskt = newskt = self.reactor.createSocket( + self.addressFamily, self.socketType + ) + rc = _iocp.accept(self.socket.fileno(), newskt.fileno(), buff, evt) + + if rc and rc != ERROR_IO_PENDING: + self.handleAccept(rc, evt) diff --git a/contrib/python/Twisted/py3/twisted/internet/iocpreactor/udp.py b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/udp.py new file mode 100644 index 0000000000..59c5fefb4b --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/udp.py @@ -0,0 +1,428 @@ +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +UDP support for IOCP reactor +""" + +import errno +import socket +import struct +import warnings +from typing import Optional + +from zope.interface import implementer + +from twisted.internet import address, defer, error, interfaces +from twisted.internet.abstract import isIPAddress, isIPv6Address +from twisted.internet.iocpreactor import abstract, iocpsupport as _iocp +from twisted.internet.iocpreactor.const import ( + ERROR_CONNECTION_REFUSED, + ERROR_IO_PENDING, + ERROR_PORT_UNREACHABLE, +) +from twisted.internet.iocpreactor.interfaces import IReadWriteHandle +from twisted.python import failure, log + + +@implementer( + IReadWriteHandle, + interfaces.IListeningPort, + interfaces.IUDPTransport, + interfaces.ISystemHandle, +) +class Port(abstract.FileHandle): + """ + UDP port, listening for packets. + + @ivar addressFamily: L{socket.AF_INET} or L{socket.AF_INET6}, depending on + whether this port is listening on an IPv4 address or an IPv6 address. + """ + + addressFamily = socket.AF_INET + socketType = socket.SOCK_DGRAM + dynamicReadBuffers = False + + # Actual port number being listened on, only set to a non-None + # value when we are actually listening. + _realPortNumber: Optional[int] = None + + def __init__(self, port, proto, interface="", maxPacketSize=8192, reactor=None): + """ + Initialize with a numeric port to listen on. + """ + self.port = port + self.protocol = proto + self.readBufferSize = maxPacketSize + self.interface = interface + self.setLogStr() + self._connectedAddr = None + self._setAddressFamily() + + abstract.FileHandle.__init__(self, reactor) + + skt = socket.socket(self.addressFamily, self.socketType) + addrLen = _iocp.maxAddrLen(skt.fileno()) + self.addressBuffer = bytearray(addrLen) + # WSARecvFrom takes an int + self.addressLengthBuffer = bytearray(struct.calcsize("i")) + + def _setAddressFamily(self): + """ + Resolve address family for the socket. + """ + if isIPv6Address(self.interface): + self.addressFamily = socket.AF_INET6 + elif isIPAddress(self.interface): + self.addressFamily = socket.AF_INET + elif self.interface: + raise error.InvalidAddressError( + self.interface, "not an IPv4 or IPv6 address" + ) + + def __repr__(self) -> str: + if self._realPortNumber is not None: + return f"<{self.protocol.__class__} on {self._realPortNumber}>" + else: + return f"<{self.protocol.__class__} not connected>" + + def getHandle(self): + """ + Return a socket object. + """ + return self.socket + + def startListening(self): + """ + Create and bind my socket, and begin listening on it. + + This is called on unserialization, and must be called after creating a + server to begin listening on the specified port. + """ + self._bindSocket() + self._connectToProtocol() + + def createSocket(self): + return self.reactor.createSocket(self.addressFamily, self.socketType) + + def _bindSocket(self): + try: + skt = self.createSocket() + skt.bind((self.interface, self.port)) + except OSError as le: + raise error.CannotListenError(self.interface, self.port, le) + + # Make sure that if we listened on port 0, we update that to + # reflect what the OS actually assigned us. + self._realPortNumber = skt.getsockname()[1] + + log.msg( + "%s starting on %s" + % (self._getLogPrefix(self.protocol), self._realPortNumber) + ) + + self.connected = True + self.socket = skt + self.getFileHandle = self.socket.fileno + + def _connectToProtocol(self): + self.protocol.makeConnection(self) + self.startReading() + self.reactor.addActiveHandle(self) + + def cbRead(self, rc, data, evt): + if self.reading: + self.handleRead(rc, data, evt) + self.doRead() + + def handleRead(self, rc, data, evt): + if rc in ( + errno.WSAECONNREFUSED, + errno.WSAECONNRESET, + ERROR_CONNECTION_REFUSED, + ERROR_PORT_UNREACHABLE, + ): + if self._connectedAddr: + self.protocol.connectionRefused() + elif rc: + log.msg( + "error in recvfrom -- %s (%s)" + % (errno.errorcode.get(rc, "unknown error"), rc) + ) + else: + try: + self.protocol.datagramReceived( + bytes(evt.buff[:data]), _iocp.makesockaddr(evt.addr_buff) + ) + except BaseException: + log.err() + + def doRead(self): + evt = _iocp.Event(self.cbRead, self) + + evt.buff = buff = self._readBuffers[0] + evt.addr_buff = addr_buff = self.addressBuffer + evt.addr_len_buff = addr_len_buff = self.addressLengthBuffer + rc, data = _iocp.recvfrom( + self.getFileHandle(), buff, addr_buff, addr_len_buff, evt + ) + + if rc and rc != ERROR_IO_PENDING: + # If the error was not 0 or IO_PENDING then that means recvfrom() hit a + # failure condition. In this situation recvfrom() gives us our response + # right away and we don't need to wait for Windows to call the callback + # on our event. In fact, windows will not call it for us so we must call it + # ourselves manually + self.reactor.callLater(0, self.cbRead, rc, data, evt) + + def write(self, datagram, addr=None): + """ + Write a datagram. + + @param addr: should be a tuple (ip, port), can be None in connected + mode. + """ + if self._connectedAddr: + assert addr in (None, self._connectedAddr) + try: + return self.socket.send(datagram) + except OSError as se: + no = se.args[0] + if no == errno.WSAEINTR: + return self.write(datagram) + elif no == errno.WSAEMSGSIZE: + raise error.MessageLengthError("message too long") + elif no in ( + errno.WSAECONNREFUSED, + errno.WSAECONNRESET, + ERROR_CONNECTION_REFUSED, + ERROR_PORT_UNREACHABLE, + ): + self.protocol.connectionRefused() + else: + raise + else: + assert addr != None + if ( + not isIPAddress(addr[0]) + and not isIPv6Address(addr[0]) + and addr[0] != "<broadcast>" + ): + raise error.InvalidAddressError( + addr[0], "write() only accepts IP addresses, not hostnames" + ) + if isIPAddress(addr[0]) and self.addressFamily == socket.AF_INET6: + raise error.InvalidAddressError( + addr[0], "IPv6 port write() called with IPv4 address" + ) + if isIPv6Address(addr[0]) and self.addressFamily == socket.AF_INET: + raise error.InvalidAddressError( + addr[0], "IPv4 port write() called with IPv6 address" + ) + try: + return self.socket.sendto(datagram, addr) + except OSError as se: + no = se.args[0] + if no == errno.WSAEINTR: + return self.write(datagram, addr) + elif no == errno.WSAEMSGSIZE: + raise error.MessageLengthError("message too long") + elif no in ( + errno.WSAECONNREFUSED, + errno.WSAECONNRESET, + ERROR_CONNECTION_REFUSED, + ERROR_PORT_UNREACHABLE, + ): + # in non-connected UDP ECONNREFUSED is platform dependent, + # I think and the info is not necessarily useful. + # Nevertheless maybe we should call connectionRefused? XXX + return + else: + raise + + def writeSequence(self, seq, addr): + self.write(b"".join(seq), addr) + + def connect(self, host, port): + """ + 'Connect' to remote server. + """ + if self._connectedAddr: + raise RuntimeError( + "already connected, reconnecting is not currently supported " + "(talk to itamar if you want this)" + ) + if not isIPAddress(host) and not isIPv6Address(host): + raise error.InvalidAddressError(host, "not an IPv4 or IPv6 address.") + self._connectedAddr = (host, port) + self.socket.connect((host, port)) + + def _loseConnection(self): + self.stopReading() + self.reactor.removeActiveHandle(self) + if self.connected: # actually means if we are *listening* + self.reactor.callLater(0, self.connectionLost) + + def stopListening(self): + if self.connected: + result = self.d = defer.Deferred() + else: + result = None + self._loseConnection() + return result + + def loseConnection(self): + warnings.warn( + "Please use stopListening() to disconnect port", + DeprecationWarning, + stacklevel=2, + ) + self.stopListening() + + def connectionLost(self, reason=None): + """ + Cleans up my socket. + """ + log.msg("(UDP Port %s Closed)" % self._realPortNumber) + self._realPortNumber = None + abstract.FileHandle.connectionLost(self, reason) + self.protocol.doStop() + self.socket.close() + del self.socket + del self.getFileHandle + if hasattr(self, "d"): + self.d.callback(None) + del self.d + + def setLogStr(self): + """ + Initialize the C{logstr} attribute to be used by C{logPrefix}. + """ + logPrefix = self._getLogPrefix(self.protocol) + self.logstr = "%s (UDP)" % logPrefix + + def logPrefix(self): + """ + Returns the name of my class, to prefix log entries with. + """ + return self.logstr + + def getHost(self): + """ + Return the local address of the UDP connection + + @returns: the local address of the UDP connection + @rtype: L{IPv4Address} or L{IPv6Address} + """ + addr = self.socket.getsockname() + if self.addressFamily == socket.AF_INET: + return address.IPv4Address("UDP", *addr) + elif self.addressFamily == socket.AF_INET6: + return address.IPv6Address("UDP", *(addr[:2])) + + def setBroadcastAllowed(self, enabled): + """ + Set whether this port may broadcast. This is disabled by default. + + @param enabled: Whether the port may broadcast. + @type enabled: L{bool} + """ + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, enabled) + + def getBroadcastAllowed(self): + """ + Checks if broadcast is currently allowed on this port. + + @return: Whether this port may broadcast. + @rtype: L{bool} + """ + return bool(self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST)) + + +class MulticastMixin: + """ + Implement multicast functionality. + """ + + def getOutgoingInterface(self): + i = self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF) + return socket.inet_ntoa(struct.pack("@i", i)) + + def setOutgoingInterface(self, addr): + """ + Returns Deferred of success. + """ + return self.reactor.resolve(addr).addCallback(self._setInterface) + + def _setInterface(self, addr): + i = socket.inet_aton(addr) + self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, i) + return 1 + + def getLoopbackMode(self): + return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP) + + def setLoopbackMode(self, mode): + mode = struct.pack("b", bool(mode)) + self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, mode) + + def getTTL(self): + return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL) + + def setTTL(self, ttl): + ttl = struct.pack("B", ttl) + self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl) + + def joinGroup(self, addr, interface=""): + """ + Join a multicast group. Returns Deferred of success. + """ + return self.reactor.resolve(addr).addCallback(self._joinAddr1, interface, 1) + + def _joinAddr1(self, addr, interface, join): + return self.reactor.resolve(interface).addCallback(self._joinAddr2, addr, join) + + def _joinAddr2(self, interface, addr, join): + addr = socket.inet_aton(addr) + interface = socket.inet_aton(interface) + if join: + cmd = socket.IP_ADD_MEMBERSHIP + else: + cmd = socket.IP_DROP_MEMBERSHIP + try: + self.socket.setsockopt(socket.IPPROTO_IP, cmd, addr + interface) + except OSError as e: + return failure.Failure(error.MulticastJoinError(addr, interface, *e.args)) + + def leaveGroup(self, addr, interface=""): + """ + Leave multicast group, return Deferred of success. + """ + return self.reactor.resolve(addr).addCallback(self._joinAddr1, interface, 0) + + +@implementer(interfaces.IMulticastTransport) +class MulticastPort(MulticastMixin, Port): + """ + UDP Port that supports multicasting. + """ + + def __init__( + self, + port, + proto, + interface="", + maxPacketSize=8192, + reactor=None, + listenMultiple=False, + ): + Port.__init__(self, port, proto, interface, maxPacketSize, reactor) + self.listenMultiple = listenMultiple + + def createSocket(self): + skt = Port.createSocket(self) + if self.listenMultiple: + skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + if hasattr(socket, "SO_REUSEPORT"): + skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + return skt |