aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/internet/iocpreactor
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2023-11-26 18:16:14 +0300
committershmel1k <shmel1k@ydb.tech>2023-11-26 18:43:30 +0300
commitb8cf9e88f4c5c64d9406af533d8948deb050d695 (patch)
tree218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py3/twisted/internet/iocpreactor
parent523f645a83a0ec97a0332dbc3863bb354c92a328 (diff)
downloadydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/internet/iocpreactor')
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/iocpreactor/__init__.py10
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/iocpreactor/abstract.py387
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/iocpreactor/const.py25
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/iocpreactor/interfaces.py42
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/iocpreactor/iocpsupport.py27
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/iocpreactor/notes.txt24
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/iocpreactor/reactor.py285
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/iocpreactor/tcp.py608
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/iocpreactor/udp.py428
9 files changed, 1836 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py3/twisted/internet/iocpreactor/__init__.py b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/__init__.py
new file mode 100644
index 0000000000..d1881d4fe3
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/__init__.py
@@ -0,0 +1,10 @@
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+I/O Completion Ports reactor
+"""
+
+from twisted.internet.iocpreactor.reactor import install
+
+__all__ = ["install"]
diff --git a/contrib/python/Twisted/py3/twisted/internet/iocpreactor/abstract.py b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/abstract.py
new file mode 100644
index 0000000000..818c86068d
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/abstract.py
@@ -0,0 +1,387 @@
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Abstract file handle class
+"""
+
+import errno
+
+from zope.interface import implementer
+
+from twisted.internet import error, interfaces, main
+from twisted.internet.abstract import _ConsumerMixin, _dataMustBeBytes, _LogOwner
+from twisted.internet.iocpreactor import iocpsupport as _iocp
+from twisted.internet.iocpreactor.const import ERROR_HANDLE_EOF, ERROR_IO_PENDING
+from twisted.python import failure
+
+
+@implementer(
+ interfaces.IPushProducer,
+ interfaces.IConsumer,
+ interfaces.ITransport,
+ interfaces.IHalfCloseableDescriptor,
+)
+class FileHandle(_ConsumerMixin, _LogOwner):
+ """
+ File handle that can read and write asynchronously
+ """
+
+ # read stuff
+ maxReadBuffers = 16
+ readBufferSize = 4096
+ reading = False
+ dynamicReadBuffers = True # set this to false if subclass doesn't do iovecs
+ _readNextBuffer = 0
+ _readSize = 0 # how much data we have in the read buffer
+ _readScheduled = None
+ _readScheduledInOS = False
+
+ def startReading(self):
+ self.reactor.addActiveHandle(self)
+ if not self._readScheduled and not self.reading:
+ self.reading = True
+ self._readScheduled = self.reactor.callLater(0, self._resumeReading)
+
+ def stopReading(self):
+ if self._readScheduled:
+ self._readScheduled.cancel()
+ self._readScheduled = None
+ self.reading = False
+
+ def _resumeReading(self):
+ self._readScheduled = None
+ if self._dispatchData() and not self._readScheduledInOS:
+ self.doRead()
+
+ def _dispatchData(self):
+ """
+ Dispatch previously read data. Return True if self.reading and we don't
+ have any more data
+ """
+ if not self._readSize:
+ return self.reading
+ size = self._readSize
+ full_buffers = size // self.readBufferSize
+ while self._readNextBuffer < full_buffers:
+ self.dataReceived(self._readBuffers[self._readNextBuffer])
+ self._readNextBuffer += 1
+ if not self.reading:
+ return False
+ remainder = size % self.readBufferSize
+ if remainder:
+ self.dataReceived(self._readBuffers[full_buffers][0:remainder])
+ if self.dynamicReadBuffers:
+ total_buffer_size = self.readBufferSize * len(self._readBuffers)
+ # we have one buffer too many
+ if size < total_buffer_size - self.readBufferSize:
+ del self._readBuffers[-1]
+ # we filled all buffers, so allocate one more
+ elif (
+ size == total_buffer_size
+ and len(self._readBuffers) < self.maxReadBuffers
+ ):
+ self._readBuffers.append(bytearray(self.readBufferSize))
+ self._readNextBuffer = 0
+ self._readSize = 0
+ return self.reading
+
+ def _cbRead(self, rc, data, evt):
+ self._readScheduledInOS = False
+ if self._handleRead(rc, data, evt):
+ self.doRead()
+
+ def _handleRead(self, rc, data, evt):
+ """
+ Returns False if we should stop reading for now
+ """
+ if self.disconnected:
+ return False
+ # graceful disconnection
+ if (not (rc or data)) or rc in (errno.WSAEDISCON, ERROR_HANDLE_EOF):
+ self.reactor.removeActiveHandle(self)
+ self.readConnectionLost(failure.Failure(main.CONNECTION_DONE))
+ return False
+ # XXX: not handling WSAEWOULDBLOCK
+ # ("too many outstanding overlapped I/O requests")
+ elif rc:
+ self.connectionLost(
+ failure.Failure(
+ error.ConnectionLost(
+ "read error -- %s (%s)"
+ % (errno.errorcode.get(rc, "unknown"), rc)
+ )
+ )
+ )
+ return False
+ else:
+ assert self._readSize == 0
+ assert self._readNextBuffer == 0
+ self._readSize = data
+ return self._dispatchData()
+
+ def doRead(self):
+ evt = _iocp.Event(self._cbRead, self)
+
+ evt.buff = buff = self._readBuffers
+ rc, numBytesRead = self.readFromHandle(buff, evt)
+
+ if not rc or rc == ERROR_IO_PENDING:
+ self._readScheduledInOS = True
+ else:
+ self._handleRead(rc, numBytesRead, evt)
+
+ def readFromHandle(self, bufflist, evt):
+ raise NotImplementedError() # TODO: this should default to ReadFile
+
+ def dataReceived(self, data):
+ raise NotImplementedError
+
+ def readConnectionLost(self, reason):
+ self.connectionLost(reason)
+
+ # write stuff
+ dataBuffer = b""
+ offset = 0
+ writing = False
+ _writeScheduled = None
+ _writeDisconnecting = False
+ _writeDisconnected = False
+ writeBufferSize = 2**2**2**2
+
+ def loseWriteConnection(self):
+ self._writeDisconnecting = True
+ self.startWriting()
+
+ def _closeWriteConnection(self):
+ # override in subclasses
+ pass
+
+ def writeConnectionLost(self, reason):
+ # in current code should never be called
+ self.connectionLost(reason)
+
+ def startWriting(self):
+ self.reactor.addActiveHandle(self)
+
+ if not self._writeScheduled and not self.writing:
+ self.writing = True
+ self._writeScheduled = self.reactor.callLater(0, self._resumeWriting)
+
+ def stopWriting(self):
+ if self._writeScheduled:
+ self._writeScheduled.cancel()
+ self._writeScheduled = None
+ self.writing = False
+
+ def _resumeWriting(self):
+ self._writeScheduled = None
+ self.doWrite()
+
+ def _cbWrite(self, rc, numBytesWritten, evt):
+ if self._handleWrite(rc, numBytesWritten, evt):
+ self.doWrite()
+
+ def _handleWrite(self, rc, numBytesWritten, evt):
+ """
+ Returns false if we should stop writing for now
+ """
+ if self.disconnected or self._writeDisconnected:
+ return False
+ # XXX: not handling WSAEWOULDBLOCK
+ # ("too many outstanding overlapped I/O requests")
+ if rc:
+ self.connectionLost(
+ failure.Failure(
+ error.ConnectionLost(
+ "write error -- %s (%s)"
+ % (errno.errorcode.get(rc, "unknown"), rc)
+ )
+ )
+ )
+ return False
+ else:
+ self.offset += numBytesWritten
+ # If there is nothing left to send,
+ if self.offset == len(self.dataBuffer) and not self._tempDataLen:
+ self.dataBuffer = b""
+ self.offset = 0
+ # stop writing
+ self.stopWriting()
+ # If I've got a producer who is supposed to supply me with data
+ if self.producer is not None and (
+ (not self.streamingProducer) or self.producerPaused
+ ):
+ # tell them to supply some more.
+ self.producerPaused = True
+ self.producer.resumeProducing()
+ elif self.disconnecting:
+ # But if I was previously asked to let the connection die,
+ # do so.
+ self.connectionLost(failure.Failure(main.CONNECTION_DONE))
+ elif self._writeDisconnecting:
+ # I was previously asked to half-close the connection.
+ self._writeDisconnected = True
+ self._closeWriteConnection()
+ return False
+ else:
+ return True
+
+ def doWrite(self):
+ if len(self.dataBuffer) - self.offset < self.SEND_LIMIT:
+ # If there is currently less than SEND_LIMIT bytes left to send
+ # in the string, extend it with the array data.
+ self.dataBuffer = self.dataBuffer[self.offset :] + b"".join(
+ self._tempDataBuffer
+ )
+ self.offset = 0
+ self._tempDataBuffer = []
+ self._tempDataLen = 0
+
+ evt = _iocp.Event(self._cbWrite, self)
+
+ # Send as much data as you can.
+ if self.offset:
+ sendView = memoryview(self.dataBuffer)
+ evt.buff = buff = sendView[self.offset :]
+ else:
+ evt.buff = buff = self.dataBuffer
+ rc, data = self.writeToHandle(buff, evt)
+ if rc and rc != ERROR_IO_PENDING:
+ self._handleWrite(rc, data, evt)
+
+ def writeToHandle(self, buff, evt):
+ raise NotImplementedError() # TODO: this should default to WriteFile
+
+ def write(self, data):
+ """Reliably write some data.
+
+ The data is buffered until his file descriptor is ready for writing.
+ """
+ _dataMustBeBytes(data)
+ if not self.connected or self._writeDisconnected:
+ return
+ if data:
+ self._tempDataBuffer.append(data)
+ self._tempDataLen += len(data)
+ if self.producer is not None and self.streamingProducer:
+ if len(self.dataBuffer) + self._tempDataLen > self.writeBufferSize:
+ self.producerPaused = True
+ self.producer.pauseProducing()
+ self.startWriting()
+
+ def writeSequence(self, iovec):
+ for i in iovec:
+ _dataMustBeBytes(i)
+ if not self.connected or not iovec or self._writeDisconnected:
+ return
+ self._tempDataBuffer.extend(iovec)
+ for i in iovec:
+ self._tempDataLen += len(i)
+ if self.producer is not None and self.streamingProducer:
+ if len(self.dataBuffer) + self._tempDataLen > self.writeBufferSize:
+ self.producerPaused = True
+ self.producer.pauseProducing()
+ self.startWriting()
+
+ # general stuff
+ connected = False
+ disconnected = False
+ disconnecting = False
+ logstr = "Uninitialized"
+
+ SEND_LIMIT = 128 * 1024
+
+ def __init__(self, reactor=None):
+ if not reactor:
+ from twisted.internet import reactor
+ self.reactor = reactor
+ self._tempDataBuffer = [] # will be added to dataBuffer in doWrite
+ self._tempDataLen = 0
+ self._readBuffers = [bytearray(self.readBufferSize)]
+
+ def connectionLost(self, reason):
+ """
+ The connection was lost.
+
+ This is called when the connection on a selectable object has been
+ lost. It will be called whether the connection was closed explicitly,
+ an exception occurred in an event handler, or the other end of the
+ connection closed it first.
+
+ Clean up state here, but make sure to call back up to FileDescriptor.
+ """
+
+ self.disconnected = True
+ self.connected = False
+ if self.producer is not None:
+ self.producer.stopProducing()
+ self.producer = None
+ self.stopReading()
+ self.stopWriting()
+ self.reactor.removeActiveHandle(self)
+
+ def getFileHandle(self):
+ return -1
+
+ def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)):
+ """
+ Close the connection at the next available opportunity.
+
+ Call this to cause this FileDescriptor to lose its connection. It will
+ first write any data that it has buffered.
+
+ If there is data buffered yet to be written, this method will cause the
+ transport to lose its connection as soon as it's done flushing its
+ write buffer. If you have a producer registered, the connection won't
+ be closed until the producer is finished. Therefore, make sure you
+ unregister your producer when it's finished, or the connection will
+ never close.
+ """
+
+ if self.connected and not self.disconnecting:
+ if self._writeDisconnected:
+ # doWrite won't trigger the connection close anymore
+ self.stopReading()
+ self.stopWriting
+ self.connectionLost(_connDone)
+ else:
+ self.stopReading()
+ self.startWriting()
+ self.disconnecting = 1
+
+ # Producer/consumer implementation
+
+ def stopConsuming(self):
+ """
+ Stop consuming data.
+
+ This is called when a producer has lost its connection, to tell the
+ consumer to go lose its connection (and break potential circular
+ references).
+ """
+ self.unregisterProducer()
+ self.loseConnection()
+
+ # producer interface implementation
+
+ def resumeProducing(self):
+ if self.connected and not self.disconnecting:
+ self.startReading()
+
+ def pauseProducing(self):
+ self.stopReading()
+
+ def stopProducing(self):
+ self.loseConnection()
+
+ def getHost(self):
+ # ITransport.getHost
+ raise NotImplementedError()
+
+ def getPeer(self):
+ # ITransport.getPeer
+ raise NotImplementedError()
+
+
+__all__ = ["FileHandle"]
diff --git a/contrib/python/Twisted/py3/twisted/internet/iocpreactor/const.py b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/const.py
new file mode 100644
index 0000000000..4814425af9
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/const.py
@@ -0,0 +1,25 @@
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+
+"""
+Windows constants for IOCP
+"""
+
+
+# this stuff should really be gotten from Windows headers via pyrex, but it
+# probably is not going to change
+
+ERROR_PORT_UNREACHABLE = 1234
+ERROR_NETWORK_UNREACHABLE = 1231
+ERROR_CONNECTION_REFUSED = 1225
+ERROR_IO_PENDING = 997
+ERROR_OPERATION_ABORTED = 995
+WAIT_TIMEOUT = 258
+ERROR_NETNAME_DELETED = 64
+ERROR_HANDLE_EOF = 38
+
+INFINITE = -1
+
+SO_UPDATE_CONNECT_CONTEXT = 0x7010
+SO_UPDATE_ACCEPT_CONTEXT = 0x700B
diff --git a/contrib/python/Twisted/py3/twisted/internet/iocpreactor/interfaces.py b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/interfaces.py
new file mode 100644
index 0000000000..b161341efa
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/interfaces.py
@@ -0,0 +1,42 @@
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+
+"""
+Interfaces for iocpreactor
+"""
+
+
+from zope.interface import Interface
+
+
+class IReadHandle(Interface):
+ def readFromHandle(bufflist, evt):
+ """
+ Read into the given buffers from this handle.
+
+ @param bufflist: the buffers to read into
+ @type bufflist: list of objects implementing the read/write buffer protocol
+
+ @param evt: an IOCP Event object
+
+ @return: tuple (return code, number of bytes read)
+ """
+
+
+class IWriteHandle(Interface):
+ def writeToHandle(buff, evt):
+ """
+ Write the given buffer to this handle.
+
+ @param buff: the buffer to write
+ @type buff: any object implementing the buffer protocol
+
+ @param evt: an IOCP Event object
+
+ @return: tuple (return code, number of bytes written)
+ """
+
+
+class IReadWriteHandle(IReadHandle, IWriteHandle):
+ pass
diff --git a/contrib/python/Twisted/py3/twisted/internet/iocpreactor/iocpsupport.py b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/iocpsupport.py
new file mode 100644
index 0000000000..826c976487
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/iocpsupport.py
@@ -0,0 +1,27 @@
+__all__ = [
+ "CompletionPort",
+ "Event",
+ "accept",
+ "connect",
+ "get_accept_addrs",
+ "have_connectex",
+ "makesockaddr",
+ "maxAddrLen",
+ "recv",
+ "recvfrom",
+ "send",
+]
+
+from twisted_iocpsupport.iocpsupport import ( # type: ignore[import]
+ CompletionPort,
+ Event,
+ accept,
+ connect,
+ get_accept_addrs,
+ have_connectex,
+ makesockaddr,
+ maxAddrLen,
+ recv,
+ recvfrom,
+ send,
+)
diff --git a/contrib/python/Twisted/py3/twisted/internet/iocpreactor/notes.txt b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/notes.txt
new file mode 100644
index 0000000000..4caffb882f
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/notes.txt
@@ -0,0 +1,24 @@
+test specifically:
+failed accept error message -- similar to test_tcp_internals
+immediate success on accept/connect/recv, including Event.ignore
+parametrize iocpsupport somehow -- via reactor?
+
+do:
+break handling -- WaitForSingleObject on the IOCP handle?
+iovecs for write buffer
+do not wait for a mainloop iteration if resumeProducing (in _handleWrite) does startWriting
+don't addActiveHandle in every call to startWriting/startReading
+iocpified process support
+ win32er-in-a-thread (or run GQCS in a thread -- it can't receive SIGBREAK)
+blocking in sendto() -- I think Windows can do that, especially with local UDP
+
+buildbot:
+run in vmware
+start from a persistent snapshot
+
+use a stub inside the vm to svnup/run tests/collect stdio
+lift logs through SMB? or ship them via tcp beams to the VM host
+
+have a timeout on the test run
+if we time out, take a screenshot, save it, kill the VM
+
diff --git a/contrib/python/Twisted/py3/twisted/internet/iocpreactor/reactor.py b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/reactor.py
new file mode 100644
index 0000000000..e9c3716219
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/reactor.py
@@ -0,0 +1,285 @@
+# -*- test-case-name: twisted.internet.test.test_iocp -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Reactor that uses IO completion ports
+"""
+
+
+import socket
+import sys
+import warnings
+from typing import Tuple, Type
+
+from zope.interface import implementer
+
+from twisted.internet import base, error, interfaces, main
+from twisted.internet._dumbwin32proc import Process
+from twisted.internet.iocpreactor import iocpsupport as _iocp, tcp, udp
+from twisted.internet.iocpreactor.const import WAIT_TIMEOUT
+from twisted.internet.win32eventreactor import _ThreadedWin32EventsMixin
+from twisted.python import failure, log
+
+try:
+ from twisted.protocols.tls import TLSMemoryBIOFactory as _TLSMemoryBIOFactory
+except ImportError:
+ TLSMemoryBIOFactory = None
+ # Either pyOpenSSL isn't installed, or it is too old for this code to work.
+ # The reactor won't provide IReactorSSL.
+ _extraInterfaces: Tuple[Type[interfaces.IReactorSSL], ...] = ()
+ warnings.warn(
+ "pyOpenSSL 0.10 or newer is required for SSL support in iocpreactor. "
+ "It is missing, so the reactor will not support SSL APIs."
+ )
+else:
+ TLSMemoryBIOFactory = _TLSMemoryBIOFactory
+ _extraInterfaces = (interfaces.IReactorSSL,)
+
+MAX_TIMEOUT = 2000 # 2 seconds, see doIteration for explanation
+
+EVENTS_PER_LOOP = 1000 # XXX: what's a good value here?
+
+# keys to associate with normal and waker events
+KEY_NORMAL, KEY_WAKEUP = range(2)
+
+_NO_GETHANDLE = error.ConnectionFdescWentAway("Handler has no getFileHandle method")
+_NO_FILEDESC = error.ConnectionFdescWentAway("Filedescriptor went away")
+
+
+@implementer(
+ interfaces.IReactorTCP,
+ interfaces.IReactorUDP,
+ interfaces.IReactorMulticast,
+ interfaces.IReactorProcess,
+ *_extraInterfaces,
+)
+class IOCPReactor(base.ReactorBase, _ThreadedWin32EventsMixin):
+ port = None
+
+ def __init__(self):
+ base.ReactorBase.__init__(self)
+ self.port = _iocp.CompletionPort()
+ self.handles = set()
+
+ def addActiveHandle(self, handle):
+ self.handles.add(handle)
+
+ def removeActiveHandle(self, handle):
+ self.handles.discard(handle)
+
+ def doIteration(self, timeout):
+ """
+ Poll the IO completion port for new events.
+ """
+ # This function sits and waits for an IO completion event.
+ #
+ # There are two requirements: process IO events as soon as they arrive
+ # and process ctrl-break from the user in a reasonable amount of time.
+ #
+ # There are three kinds of waiting.
+ # 1) GetQueuedCompletionStatus (self.port.getEvent) to wait for IO
+ # events only.
+ # 2) Msg* family of wait functions that can stop waiting when
+ # ctrl-break is detected (then, I think, Python converts it into a
+ # KeyboardInterrupt)
+ # 3) *Ex family of wait functions that put the thread into an
+ # "alertable" wait state which is supposedly triggered by IO completion
+ #
+ # 2) and 3) can be combined. Trouble is, my IO completion is not
+ # causing 3) to trigger, possibly because I do not use an IO completion
+ # callback. Windows is weird.
+ # There are two ways to handle this. I could use MsgWaitForSingleObject
+ # here and GetQueuedCompletionStatus in a thread. Or I could poll with
+ # a reasonable interval. Guess what! Threads are hard.
+
+ processed_events = 0
+ if timeout is None:
+ timeout = MAX_TIMEOUT
+ else:
+ timeout = min(MAX_TIMEOUT, int(1000 * timeout))
+ rc, numBytes, key, evt = self.port.getEvent(timeout)
+ while 1:
+ if rc == WAIT_TIMEOUT:
+ break
+ if key != KEY_WAKEUP:
+ assert key == KEY_NORMAL
+ log.callWithLogger(
+ evt.owner, self._callEventCallback, rc, numBytes, evt
+ )
+ processed_events += 1
+ if processed_events >= EVENTS_PER_LOOP:
+ break
+ rc, numBytes, key, evt = self.port.getEvent(0)
+
+ def _callEventCallback(self, rc, numBytes, evt):
+ owner = evt.owner
+ why = None
+ try:
+ evt.callback(rc, numBytes, evt)
+ handfn = getattr(owner, "getFileHandle", None)
+ if not handfn:
+ why = _NO_GETHANDLE
+ elif handfn() == -1:
+ why = _NO_FILEDESC
+ if why:
+ return # ignore handles that were closed
+ except BaseException:
+ why = sys.exc_info()[1]
+ log.err()
+ if why:
+ owner.loseConnection(failure.Failure(why))
+
+ def installWaker(self):
+ pass
+
+ def wakeUp(self):
+ self.port.postEvent(0, KEY_WAKEUP, None)
+
+ def registerHandle(self, handle):
+ self.port.addHandle(handle, KEY_NORMAL)
+
+ def createSocket(self, af, stype):
+ skt = socket.socket(af, stype)
+ self.registerHandle(skt.fileno())
+ return skt
+
+ def listenTCP(self, port, factory, backlog=50, interface=""):
+ """
+ @see: twisted.internet.interfaces.IReactorTCP.listenTCP
+ """
+ p = tcp.Port(port, factory, backlog, interface, self)
+ p.startListening()
+ return p
+
+ def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
+ """
+ @see: twisted.internet.interfaces.IReactorTCP.connectTCP
+ """
+ c = tcp.Connector(host, port, factory, timeout, bindAddress, self)
+ c.connect()
+ return c
+
+ if TLSMemoryBIOFactory is not None:
+
+ def listenSSL(self, port, factory, contextFactory, backlog=50, interface=""):
+ """
+ @see: twisted.internet.interfaces.IReactorSSL.listenSSL
+ """
+ port = self.listenTCP(
+ port,
+ TLSMemoryBIOFactory(contextFactory, False, factory),
+ backlog,
+ interface,
+ )
+ port._type = "TLS"
+ return port
+
+ def connectSSL(
+ self, host, port, factory, contextFactory, timeout=30, bindAddress=None
+ ):
+ """
+ @see: twisted.internet.interfaces.IReactorSSL.connectSSL
+ """
+ return self.connectTCP(
+ host,
+ port,
+ TLSMemoryBIOFactory(contextFactory, True, factory),
+ timeout,
+ bindAddress,
+ )
+
+ else:
+
+ def listenSSL(self, port, factory, contextFactory, backlog=50, interface=""):
+ """
+ Non-implementation of L{IReactorSSL.listenSSL}. Some dependency
+ is not satisfied. This implementation always raises
+ L{NotImplementedError}.
+ """
+ raise NotImplementedError(
+ "pyOpenSSL 0.10 or newer is required for SSL support in "
+ "iocpreactor. It is missing, so the reactor does not support "
+ "SSL APIs."
+ )
+
+ def connectSSL(
+ self, host, port, factory, contextFactory, timeout=30, bindAddress=None
+ ):
+ """
+ Non-implementation of L{IReactorSSL.connectSSL}. Some dependency
+ is not satisfied. This implementation always raises
+ L{NotImplementedError}.
+ """
+ raise NotImplementedError(
+ "pyOpenSSL 0.10 or newer is required for SSL support in "
+ "iocpreactor. It is missing, so the reactor does not support "
+ "SSL APIs."
+ )
+
+ def listenUDP(self, port, protocol, interface="", maxPacketSize=8192):
+ """
+ Connects a given L{DatagramProtocol} to the given numeric UDP port.
+
+ @returns: object conforming to L{IListeningPort}.
+ """
+ p = udp.Port(port, protocol, interface, maxPacketSize, self)
+ p.startListening()
+ return p
+
+ def listenMulticast(
+ self, port, protocol, interface="", maxPacketSize=8192, listenMultiple=False
+ ):
+ """
+ Connects a given DatagramProtocol to the given numeric UDP port.
+
+ EXPERIMENTAL.
+
+ @returns: object conforming to IListeningPort.
+ """
+ p = udp.MulticastPort(
+ port, protocol, interface, maxPacketSize, self, listenMultiple
+ )
+ p.startListening()
+ return p
+
+ def spawnProcess(
+ self,
+ processProtocol,
+ executable,
+ args=(),
+ env={},
+ path=None,
+ uid=None,
+ gid=None,
+ usePTY=0,
+ childFDs=None,
+ ):
+ """
+ Spawn a process.
+ """
+ if uid is not None:
+ raise ValueError("Setting UID is unsupported on this platform.")
+ if gid is not None:
+ raise ValueError("Setting GID is unsupported on this platform.")
+ if usePTY:
+ raise ValueError("PTYs are unsupported on this platform.")
+ if childFDs is not None:
+ raise ValueError(
+ "Custom child file descriptor mappings are unsupported on "
+ "this platform."
+ )
+ return Process(self, processProtocol, executable, args, env, path)
+
+ def removeAll(self):
+ res = list(self.handles)
+ self.handles.clear()
+ return res
+
+
+def install():
+ r = IOCPReactor()
+ main.installReactor(r)
+
+
+__all__ = ["IOCPReactor", "install"]
diff --git a/contrib/python/Twisted/py3/twisted/internet/iocpreactor/tcp.py b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/tcp.py
new file mode 100644
index 0000000000..aadd685269
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/tcp.py
@@ -0,0 +1,608 @@
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+TCP support for IOCP reactor
+"""
+
+import errno
+import socket
+import struct
+from typing import Optional
+
+from zope.interface import classImplements, implementer
+
+from twisted.internet import address, defer, error, interfaces, main
+from twisted.internet.abstract import _LogOwner, isIPv6Address
+from twisted.internet.iocpreactor import abstract, iocpsupport as _iocp
+from twisted.internet.iocpreactor.const import (
+ ERROR_CONNECTION_REFUSED,
+ ERROR_IO_PENDING,
+ ERROR_NETWORK_UNREACHABLE,
+ SO_UPDATE_ACCEPT_CONTEXT,
+ SO_UPDATE_CONNECT_CONTEXT,
+)
+from twisted.internet.iocpreactor.interfaces import IReadWriteHandle
+from twisted.internet.protocol import Protocol
+from twisted.internet.tcp import (
+ Connector as TCPConnector,
+ _AbortingMixin,
+ _BaseBaseClient,
+ _BaseTCPClient,
+ _getsockname,
+ _resolveIPv6,
+ _SocketCloser,
+)
+from twisted.python import failure, log, reflect
+
+try:
+ from twisted.internet._newtls import startTLS as __startTLS
+except ImportError:
+ _startTLS = None
+else:
+ _startTLS = __startTLS
+
+
+# ConnectEx returns these. XXX: find out what it does for timeout
+connectExErrors = {
+ ERROR_CONNECTION_REFUSED: errno.WSAECONNREFUSED, # type: ignore[attr-defined]
+ ERROR_NETWORK_UNREACHABLE: errno.WSAENETUNREACH, # type: ignore[attr-defined]
+}
+
+
+@implementer(IReadWriteHandle, interfaces.ITCPTransport, interfaces.ISystemHandle)
+class Connection(abstract.FileHandle, _SocketCloser, _AbortingMixin):
+ """
+ @ivar TLS: C{False} to indicate the connection is in normal TCP mode,
+ C{True} to indicate that TLS has been started and that operations must
+ be routed through the L{TLSMemoryBIOProtocol} instance.
+ """
+
+ TLS = False
+
+ def __init__(self, sock, proto, reactor=None):
+ abstract.FileHandle.__init__(self, reactor)
+ self.socket = sock
+ self.getFileHandle = sock.fileno
+ self.protocol = proto
+
+ def getHandle(self):
+ return self.socket
+
+ def dataReceived(self, rbuffer):
+ """
+ @param rbuffer: Data received.
+ @type rbuffer: L{bytes} or L{bytearray}
+ """
+ if isinstance(rbuffer, bytes):
+ pass
+ elif isinstance(rbuffer, bytearray):
+ # XXX: some day, we'll have protocols that can handle raw buffers
+ rbuffer = bytes(rbuffer)
+ else:
+ raise TypeError("data must be bytes or bytearray, not " + type(rbuffer))
+
+ self.protocol.dataReceived(rbuffer)
+
+ def readFromHandle(self, bufflist, evt):
+ return _iocp.recv(self.getFileHandle(), bufflist, evt)
+
+ def writeToHandle(self, buff, evt):
+ """
+ Send C{buff} to current file handle using C{_iocp.send}. The buffer
+ sent is limited to a size of C{self.SEND_LIMIT}.
+ """
+ writeView = memoryview(buff)
+ return _iocp.send(
+ self.getFileHandle(), writeView[0 : self.SEND_LIMIT].tobytes(), evt
+ )
+
+ def _closeWriteConnection(self):
+ try:
+ self.socket.shutdown(1)
+ except OSError:
+ pass
+ p = interfaces.IHalfCloseableProtocol(self.protocol, None)
+ if p:
+ try:
+ p.writeConnectionLost()
+ except BaseException:
+ f = failure.Failure()
+ log.err()
+ self.connectionLost(f)
+
+ def readConnectionLost(self, reason):
+ p = interfaces.IHalfCloseableProtocol(self.protocol, None)
+ if p:
+ try:
+ p.readConnectionLost()
+ except BaseException:
+ log.err()
+ self.connectionLost(failure.Failure())
+ else:
+ self.connectionLost(reason)
+
+ def connectionLost(self, reason):
+ if self.disconnected:
+ return
+ abstract.FileHandle.connectionLost(self, reason)
+ isClean = reason is None or not reason.check(error.ConnectionAborted)
+ self._closeSocket(isClean)
+ protocol = self.protocol
+ del self.protocol
+ del self.socket
+ del self.getFileHandle
+ protocol.connectionLost(reason)
+
+ def logPrefix(self):
+ """
+ Return the prefix to log with when I own the logging thread.
+ """
+ return self.logstr
+
+ def getTcpNoDelay(self):
+ return bool(self.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY))
+
+ def setTcpNoDelay(self, enabled):
+ self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, enabled)
+
+ def getTcpKeepAlive(self):
+ return bool(self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE))
+
+ def setTcpKeepAlive(self, enabled):
+ self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, enabled)
+
+ if _startTLS is not None:
+
+ def startTLS(self, contextFactory, normal=True):
+ """
+ @see: L{ITLSTransport.startTLS}
+ """
+ _startTLS(self, contextFactory, normal, abstract.FileHandle)
+
+ def write(self, data):
+ """
+ Write some data, either directly to the underlying handle or, if TLS
+ has been started, to the L{TLSMemoryBIOProtocol} for it to encrypt and
+ send.
+
+ @see: L{twisted.internet.interfaces.ITransport.write}
+ """
+ if self.disconnected:
+ return
+ if self.TLS:
+ self.protocol.write(data)
+ else:
+ abstract.FileHandle.write(self, data)
+
+ def writeSequence(self, iovec):
+ """
+ Write some data, either directly to the underlying handle or, if TLS
+ has been started, to the L{TLSMemoryBIOProtocol} for it to encrypt and
+ send.
+
+ @see: L{twisted.internet.interfaces.ITransport.writeSequence}
+ """
+ if self.disconnected:
+ return
+ if self.TLS:
+ self.protocol.writeSequence(iovec)
+ else:
+ abstract.FileHandle.writeSequence(self, iovec)
+
+ def loseConnection(self, reason=None):
+ """
+ Close the underlying handle or, if TLS has been started, first shut it
+ down.
+
+ @see: L{twisted.internet.interfaces.ITransport.loseConnection}
+ """
+ if self.TLS:
+ if self.connected and not self.disconnecting:
+ self.protocol.loseConnection()
+ else:
+ abstract.FileHandle.loseConnection(self, reason)
+
+ def registerProducer(self, producer, streaming):
+ """
+ Register a producer.
+
+ If TLS is enabled, the TLS connection handles this.
+ """
+ if self.TLS:
+ # Registering a producer before we're connected shouldn't be a
+ # problem. If we end up with a write(), that's already handled in
+ # the write() code above, and there are no other potential
+ # side-effects.
+ self.protocol.registerProducer(producer, streaming)
+ else:
+ abstract.FileHandle.registerProducer(self, producer, streaming)
+
+ def unregisterProducer(self):
+ """
+ Unregister a producer.
+
+ If TLS is enabled, the TLS connection handles this.
+ """
+ if self.TLS:
+ self.protocol.unregisterProducer()
+ else:
+ abstract.FileHandle.unregisterProducer(self)
+
+ def getHost(self):
+ # ITCPTransport.getHost
+ pass
+
+ def getPeer(self):
+ # ITCPTransport.getPeer
+ pass
+
+
+if _startTLS is not None:
+ classImplements(Connection, interfaces.ITLSTransport)
+
+
+class Client(_BaseBaseClient, _BaseTCPClient, Connection):
+ """
+ @ivar _tlsClientDefault: Always C{True}, indicating that this is a client
+ connection, and by default when TLS is negotiated this class will act as
+ a TLS client.
+ """
+
+ addressFamily = socket.AF_INET
+ socketType = socket.SOCK_STREAM
+
+ _tlsClientDefault = True
+ _commonConnection = Connection
+
+ def __init__(self, host, port, bindAddress, connector, reactor):
+ # ConnectEx documentation says socket _has_ to be bound
+ if bindAddress is None:
+ bindAddress = ("", 0)
+ self.reactor = reactor # createInternetSocket needs this
+ _BaseTCPClient.__init__(self, host, port, bindAddress, connector, reactor)
+
+ def createInternetSocket(self):
+ """
+ Create a socket registered with the IOCP reactor.
+
+ @see: L{_BaseTCPClient}
+ """
+ return self.reactor.createSocket(self.addressFamily, self.socketType)
+
+ def _collectSocketDetails(self):
+ """
+ Clean up potentially circular references to the socket and to its
+ C{getFileHandle} method.
+
+ @see: L{_BaseBaseClient}
+ """
+ del self.socket, self.getFileHandle
+
+ def _stopReadingAndWriting(self):
+ """
+ Remove the active handle from the reactor.
+
+ @see: L{_BaseBaseClient}
+ """
+ self.reactor.removeActiveHandle(self)
+
+ def cbConnect(self, rc, data, evt):
+ if rc:
+ rc = connectExErrors.get(rc, rc)
+ self.failIfNotConnected(
+ error.getConnectError((rc, errno.errorcode.get(rc, "Unknown error")))
+ )
+ else:
+ self.socket.setsockopt(
+ socket.SOL_SOCKET,
+ SO_UPDATE_CONNECT_CONTEXT,
+ struct.pack("P", self.socket.fileno()),
+ )
+ self.protocol = self.connector.buildProtocol(self.getPeer())
+ self.connected = True
+ logPrefix = self._getLogPrefix(self.protocol)
+ self.logstr = logPrefix + ",client"
+ if self.protocol is None:
+ # Factory.buildProtocol is allowed to return None. In that
+ # case, make up a protocol to satisfy the rest of the
+ # implementation; connectionLost is going to be called on
+ # something, for example. This is easier than adding special
+ # case support for a None protocol throughout the rest of the
+ # transport implementation.
+ self.protocol = Protocol()
+ # But dispose of the connection quickly.
+ self.loseConnection()
+ else:
+ self.protocol.makeConnection(self)
+ self.startReading()
+
+ def doConnect(self):
+ if not hasattr(self, "connector"):
+ # this happens if we connector.stopConnecting in
+ # factory.startedConnecting
+ return
+ assert _iocp.have_connectex
+ self.reactor.addActiveHandle(self)
+ evt = _iocp.Event(self.cbConnect, self)
+
+ rc = _iocp.connect(self.socket.fileno(), self.realAddress, evt)
+ if rc and rc != ERROR_IO_PENDING:
+ self.cbConnect(rc, 0, evt)
+
+
+class Server(Connection):
+ """
+ Serverside socket-stream connection class.
+
+ I am a serverside network connection transport; a socket which came from an
+ accept() on a server.
+
+ @ivar _tlsClientDefault: Always C{False}, indicating that this is a server
+ connection, and by default when TLS is negotiated this class will act as
+ a TLS server.
+ """
+
+ _tlsClientDefault = False
+
+ def __init__(self, sock, protocol, clientAddr, serverAddr, sessionno, reactor):
+ """
+ Server(sock, protocol, client, server, sessionno)
+
+ Initialize me with a socket, a protocol, a descriptor for my peer (a
+ tuple of host, port describing the other end of the connection), an
+ instance of Port, and a session number.
+ """
+ Connection.__init__(self, sock, protocol, reactor)
+ self.serverAddr = serverAddr
+ self.clientAddr = clientAddr
+ self.sessionno = sessionno
+ logPrefix = self._getLogPrefix(self.protocol)
+ self.logstr = f"{logPrefix},{sessionno},{self.clientAddr.host}"
+ self.repstr: str = "<{} #{} on {}>".format(
+ self.protocol.__class__.__name__,
+ self.sessionno,
+ self.serverAddr.port,
+ )
+ self.connected = True
+ self.startReading()
+
+ def __repr__(self) -> str:
+ """
+ A string representation of this connection.
+ """
+ return self.repstr
+
+ def getHost(self):
+ """
+ Returns an IPv4Address.
+
+ This indicates the server's address.
+ """
+ return self.serverAddr
+
+ def getPeer(self):
+ """
+ Returns an IPv4Address.
+
+ This indicates the client's address.
+ """
+ return self.clientAddr
+
+
+class Connector(TCPConnector):
+ def _makeTransport(self):
+ return Client(self.host, self.port, self.bindAddress, self, self.reactor)
+
+
+@implementer(interfaces.IListeningPort)
+class Port(_SocketCloser, _LogOwner):
+ connected = False
+ disconnected = False
+ disconnecting = False
+ addressFamily = socket.AF_INET
+ socketType = socket.SOCK_STREAM
+ _addressType = address.IPv4Address
+ sessionno = 0
+
+ # Actual port number being listened on, only set to a non-None
+ # value when we are actually listening.
+ _realPortNumber: Optional[int] = None
+
+ # A string describing the connections which will be created by this port.
+ # Normally this is C{"TCP"}, since this is a TCP port, but when the TLS
+ # implementation re-uses this class it overrides the value with C{"TLS"}.
+ # Only used for logging.
+ _type = "TCP"
+
+ def __init__(self, port, factory, backlog=50, interface="", reactor=None):
+ self.port = port
+ self.factory = factory
+ self.backlog = backlog
+ self.interface = interface
+ self.reactor = reactor
+ if isIPv6Address(interface):
+ self.addressFamily = socket.AF_INET6
+ self._addressType = address.IPv6Address
+
+ def __repr__(self) -> str:
+ if self._realPortNumber is not None:
+ return "<{} of {} on {}>".format(
+ self.__class__,
+ self.factory.__class__,
+ self._realPortNumber,
+ )
+ else:
+ return "<{} of {} (not listening)>".format(
+ self.__class__,
+ self.factory.__class__,
+ )
+
+ def startListening(self):
+ try:
+ skt = self.reactor.createSocket(self.addressFamily, self.socketType)
+ # TODO: resolve self.interface if necessary
+ if self.addressFamily == socket.AF_INET6:
+ addr = _resolveIPv6(self.interface, self.port)
+ else:
+ addr = (self.interface, self.port)
+ skt.bind(addr)
+ except OSError as le:
+ raise error.CannotListenError(self.interface, self.port, le)
+
+ self.addrLen = _iocp.maxAddrLen(skt.fileno())
+
+ # Make sure that if we listened on port 0, we update that to
+ # reflect what the OS actually assigned us.
+ self._realPortNumber = skt.getsockname()[1]
+
+ log.msg(
+ "%s starting on %s"
+ % (self._getLogPrefix(self.factory), self._realPortNumber)
+ )
+
+ self.factory.doStart()
+ skt.listen(self.backlog)
+ self.connected = True
+ self.disconnected = False
+ self.reactor.addActiveHandle(self)
+ self.socket = skt
+ self.getFileHandle = self.socket.fileno
+ self.doAccept()
+
+ def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE)):
+ """
+ Stop accepting connections on this port.
+
+ This will shut down my socket and call self.connectionLost().
+ It returns a deferred which will fire successfully when the
+ port is actually closed.
+ """
+ self.disconnecting = True
+ if self.connected:
+ self.deferred = defer.Deferred()
+ self.reactor.callLater(0, self.connectionLost, connDone)
+ return self.deferred
+
+ stopListening = loseConnection
+
+ def _logConnectionLostMsg(self):
+ """
+ Log message for closing port
+ """
+ log.msg(f"({self._type} Port {self._realPortNumber} Closed)")
+
+ def connectionLost(self, reason):
+ """
+ Cleans up the socket.
+ """
+ self._logConnectionLostMsg()
+ self._realPortNumber = None
+ d = None
+ if hasattr(self, "deferred"):
+ d = self.deferred
+ del self.deferred
+
+ self.disconnected = True
+ self.reactor.removeActiveHandle(self)
+ self.connected = False
+ self._closeSocket(True)
+ del self.socket
+ del self.getFileHandle
+
+ try:
+ self.factory.doStop()
+ except BaseException:
+ self.disconnecting = False
+ if d is not None:
+ d.errback(failure.Failure())
+ else:
+ raise
+ else:
+ self.disconnecting = False
+ if d is not None:
+ d.callback(None)
+
+ def logPrefix(self):
+ """
+ Returns the name of my class, to prefix log entries with.
+ """
+ return reflect.qual(self.factory.__class__)
+
+ def getHost(self):
+ """
+ Returns an IPv4Address or IPv6Address.
+
+ This indicates the server's address.
+ """
+ return self._addressType("TCP", *_getsockname(self.socket))
+
+ def cbAccept(self, rc, data, evt):
+ self.handleAccept(rc, evt)
+ if not (self.disconnecting or self.disconnected):
+ self.doAccept()
+
+ def handleAccept(self, rc, evt):
+ if self.disconnecting or self.disconnected:
+ return False
+
+ # possible errors:
+ # (WSAEMFILE, WSAENOBUFS, WSAENFILE, WSAENOMEM, WSAECONNABORTED)
+ if rc:
+ log.msg(
+ "Could not accept new connection -- %s (%s)"
+ % (errno.errorcode.get(rc, "unknown error"), rc)
+ )
+ return False
+ else:
+ # Inherit the properties from the listening port socket as
+ # documented in the `Remarks` section of AcceptEx.
+ # https://docs.microsoft.com/en-us/windows/win32/api/mswsock/nf-mswsock-acceptex
+ # In this way we can call getsockname and getpeername on the
+ # accepted socket.
+ evt.newskt.setsockopt(
+ socket.SOL_SOCKET,
+ SO_UPDATE_ACCEPT_CONTEXT,
+ struct.pack("P", self.socket.fileno()),
+ )
+ family, lAddr, rAddr = _iocp.get_accept_addrs(evt.newskt.fileno(), evt.buff)
+ assert family == self.addressFamily
+
+ # Build an IPv6 address that includes the scopeID, if necessary
+ if "%" in lAddr[0]:
+ scope = int(lAddr[0].split("%")[1])
+ lAddr = (lAddr[0], lAddr[1], 0, scope)
+ if "%" in rAddr[0]:
+ scope = int(rAddr[0].split("%")[1])
+ rAddr = (rAddr[0], rAddr[1], 0, scope)
+
+ protocol = self.factory.buildProtocol(self._addressType("TCP", *rAddr))
+ if protocol is None:
+ evt.newskt.close()
+ else:
+ s = self.sessionno
+ self.sessionno = s + 1
+ transport = Server(
+ evt.newskt,
+ protocol,
+ self._addressType("TCP", *rAddr),
+ self._addressType("TCP", *lAddr),
+ s,
+ self.reactor,
+ )
+ protocol.makeConnection(transport)
+ return True
+
+ def doAccept(self):
+ evt = _iocp.Event(self.cbAccept, self)
+
+ # see AcceptEx documentation
+ evt.buff = buff = bytearray(2 * (self.addrLen + 16))
+
+ evt.newskt = newskt = self.reactor.createSocket(
+ self.addressFamily, self.socketType
+ )
+ rc = _iocp.accept(self.socket.fileno(), newskt.fileno(), buff, evt)
+
+ if rc and rc != ERROR_IO_PENDING:
+ self.handleAccept(rc, evt)
diff --git a/contrib/python/Twisted/py3/twisted/internet/iocpreactor/udp.py b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/udp.py
new file mode 100644
index 0000000000..59c5fefb4b
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/udp.py
@@ -0,0 +1,428 @@
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+UDP support for IOCP reactor
+"""
+
+import errno
+import socket
+import struct
+import warnings
+from typing import Optional
+
+from zope.interface import implementer
+
+from twisted.internet import address, defer, error, interfaces
+from twisted.internet.abstract import isIPAddress, isIPv6Address
+from twisted.internet.iocpreactor import abstract, iocpsupport as _iocp
+from twisted.internet.iocpreactor.const import (
+ ERROR_CONNECTION_REFUSED,
+ ERROR_IO_PENDING,
+ ERROR_PORT_UNREACHABLE,
+)
+from twisted.internet.iocpreactor.interfaces import IReadWriteHandle
+from twisted.python import failure, log
+
+
+@implementer(
+ IReadWriteHandle,
+ interfaces.IListeningPort,
+ interfaces.IUDPTransport,
+ interfaces.ISystemHandle,
+)
+class Port(abstract.FileHandle):
+ """
+ UDP port, listening for packets.
+
+ @ivar addressFamily: L{socket.AF_INET} or L{socket.AF_INET6}, depending on
+ whether this port is listening on an IPv4 address or an IPv6 address.
+ """
+
+ addressFamily = socket.AF_INET
+ socketType = socket.SOCK_DGRAM
+ dynamicReadBuffers = False
+
+ # Actual port number being listened on, only set to a non-None
+ # value when we are actually listening.
+ _realPortNumber: Optional[int] = None
+
+ def __init__(self, port, proto, interface="", maxPacketSize=8192, reactor=None):
+ """
+ Initialize with a numeric port to listen on.
+ """
+ self.port = port
+ self.protocol = proto
+ self.readBufferSize = maxPacketSize
+ self.interface = interface
+ self.setLogStr()
+ self._connectedAddr = None
+ self._setAddressFamily()
+
+ abstract.FileHandle.__init__(self, reactor)
+
+ skt = socket.socket(self.addressFamily, self.socketType)
+ addrLen = _iocp.maxAddrLen(skt.fileno())
+ self.addressBuffer = bytearray(addrLen)
+ # WSARecvFrom takes an int
+ self.addressLengthBuffer = bytearray(struct.calcsize("i"))
+
+ def _setAddressFamily(self):
+ """
+ Resolve address family for the socket.
+ """
+ if isIPv6Address(self.interface):
+ self.addressFamily = socket.AF_INET6
+ elif isIPAddress(self.interface):
+ self.addressFamily = socket.AF_INET
+ elif self.interface:
+ raise error.InvalidAddressError(
+ self.interface, "not an IPv4 or IPv6 address"
+ )
+
+ def __repr__(self) -> str:
+ if self._realPortNumber is not None:
+ return f"<{self.protocol.__class__} on {self._realPortNumber}>"
+ else:
+ return f"<{self.protocol.__class__} not connected>"
+
+ def getHandle(self):
+ """
+ Return a socket object.
+ """
+ return self.socket
+
+ def startListening(self):
+ """
+ Create and bind my socket, and begin listening on it.
+
+ This is called on unserialization, and must be called after creating a
+ server to begin listening on the specified port.
+ """
+ self._bindSocket()
+ self._connectToProtocol()
+
+ def createSocket(self):
+ return self.reactor.createSocket(self.addressFamily, self.socketType)
+
+ def _bindSocket(self):
+ try:
+ skt = self.createSocket()
+ skt.bind((self.interface, self.port))
+ except OSError as le:
+ raise error.CannotListenError(self.interface, self.port, le)
+
+ # Make sure that if we listened on port 0, we update that to
+ # reflect what the OS actually assigned us.
+ self._realPortNumber = skt.getsockname()[1]
+
+ log.msg(
+ "%s starting on %s"
+ % (self._getLogPrefix(self.protocol), self._realPortNumber)
+ )
+
+ self.connected = True
+ self.socket = skt
+ self.getFileHandle = self.socket.fileno
+
+ def _connectToProtocol(self):
+ self.protocol.makeConnection(self)
+ self.startReading()
+ self.reactor.addActiveHandle(self)
+
+ def cbRead(self, rc, data, evt):
+ if self.reading:
+ self.handleRead(rc, data, evt)
+ self.doRead()
+
+ def handleRead(self, rc, data, evt):
+ if rc in (
+ errno.WSAECONNREFUSED,
+ errno.WSAECONNRESET,
+ ERROR_CONNECTION_REFUSED,
+ ERROR_PORT_UNREACHABLE,
+ ):
+ if self._connectedAddr:
+ self.protocol.connectionRefused()
+ elif rc:
+ log.msg(
+ "error in recvfrom -- %s (%s)"
+ % (errno.errorcode.get(rc, "unknown error"), rc)
+ )
+ else:
+ try:
+ self.protocol.datagramReceived(
+ bytes(evt.buff[:data]), _iocp.makesockaddr(evt.addr_buff)
+ )
+ except BaseException:
+ log.err()
+
+ def doRead(self):
+ evt = _iocp.Event(self.cbRead, self)
+
+ evt.buff = buff = self._readBuffers[0]
+ evt.addr_buff = addr_buff = self.addressBuffer
+ evt.addr_len_buff = addr_len_buff = self.addressLengthBuffer
+ rc, data = _iocp.recvfrom(
+ self.getFileHandle(), buff, addr_buff, addr_len_buff, evt
+ )
+
+ if rc and rc != ERROR_IO_PENDING:
+ # If the error was not 0 or IO_PENDING then that means recvfrom() hit a
+ # failure condition. In this situation recvfrom() gives us our response
+ # right away and we don't need to wait for Windows to call the callback
+ # on our event. In fact, windows will not call it for us so we must call it
+ # ourselves manually
+ self.reactor.callLater(0, self.cbRead, rc, data, evt)
+
+ def write(self, datagram, addr=None):
+ """
+ Write a datagram.
+
+ @param addr: should be a tuple (ip, port), can be None in connected
+ mode.
+ """
+ if self._connectedAddr:
+ assert addr in (None, self._connectedAddr)
+ try:
+ return self.socket.send(datagram)
+ except OSError as se:
+ no = se.args[0]
+ if no == errno.WSAEINTR:
+ return self.write(datagram)
+ elif no == errno.WSAEMSGSIZE:
+ raise error.MessageLengthError("message too long")
+ elif no in (
+ errno.WSAECONNREFUSED,
+ errno.WSAECONNRESET,
+ ERROR_CONNECTION_REFUSED,
+ ERROR_PORT_UNREACHABLE,
+ ):
+ self.protocol.connectionRefused()
+ else:
+ raise
+ else:
+ assert addr != None
+ if (
+ not isIPAddress(addr[0])
+ and not isIPv6Address(addr[0])
+ and addr[0] != "<broadcast>"
+ ):
+ raise error.InvalidAddressError(
+ addr[0], "write() only accepts IP addresses, not hostnames"
+ )
+ if isIPAddress(addr[0]) and self.addressFamily == socket.AF_INET6:
+ raise error.InvalidAddressError(
+ addr[0], "IPv6 port write() called with IPv4 address"
+ )
+ if isIPv6Address(addr[0]) and self.addressFamily == socket.AF_INET:
+ raise error.InvalidAddressError(
+ addr[0], "IPv4 port write() called with IPv6 address"
+ )
+ try:
+ return self.socket.sendto(datagram, addr)
+ except OSError as se:
+ no = se.args[0]
+ if no == errno.WSAEINTR:
+ return self.write(datagram, addr)
+ elif no == errno.WSAEMSGSIZE:
+ raise error.MessageLengthError("message too long")
+ elif no in (
+ errno.WSAECONNREFUSED,
+ errno.WSAECONNRESET,
+ ERROR_CONNECTION_REFUSED,
+ ERROR_PORT_UNREACHABLE,
+ ):
+ # in non-connected UDP ECONNREFUSED is platform dependent,
+ # I think and the info is not necessarily useful.
+ # Nevertheless maybe we should call connectionRefused? XXX
+ return
+ else:
+ raise
+
+ def writeSequence(self, seq, addr):
+ self.write(b"".join(seq), addr)
+
+ def connect(self, host, port):
+ """
+ 'Connect' to remote server.
+ """
+ if self._connectedAddr:
+ raise RuntimeError(
+ "already connected, reconnecting is not currently supported "
+ "(talk to itamar if you want this)"
+ )
+ if not isIPAddress(host) and not isIPv6Address(host):
+ raise error.InvalidAddressError(host, "not an IPv4 or IPv6 address.")
+ self._connectedAddr = (host, port)
+ self.socket.connect((host, port))
+
+ def _loseConnection(self):
+ self.stopReading()
+ self.reactor.removeActiveHandle(self)
+ if self.connected: # actually means if we are *listening*
+ self.reactor.callLater(0, self.connectionLost)
+
+ def stopListening(self):
+ if self.connected:
+ result = self.d = defer.Deferred()
+ else:
+ result = None
+ self._loseConnection()
+ return result
+
+ def loseConnection(self):
+ warnings.warn(
+ "Please use stopListening() to disconnect port",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ self.stopListening()
+
+ def connectionLost(self, reason=None):
+ """
+ Cleans up my socket.
+ """
+ log.msg("(UDP Port %s Closed)" % self._realPortNumber)
+ self._realPortNumber = None
+ abstract.FileHandle.connectionLost(self, reason)
+ self.protocol.doStop()
+ self.socket.close()
+ del self.socket
+ del self.getFileHandle
+ if hasattr(self, "d"):
+ self.d.callback(None)
+ del self.d
+
+ def setLogStr(self):
+ """
+ Initialize the C{logstr} attribute to be used by C{logPrefix}.
+ """
+ logPrefix = self._getLogPrefix(self.protocol)
+ self.logstr = "%s (UDP)" % logPrefix
+
+ def logPrefix(self):
+ """
+ Returns the name of my class, to prefix log entries with.
+ """
+ return self.logstr
+
+ def getHost(self):
+ """
+ Return the local address of the UDP connection
+
+ @returns: the local address of the UDP connection
+ @rtype: L{IPv4Address} or L{IPv6Address}
+ """
+ addr = self.socket.getsockname()
+ if self.addressFamily == socket.AF_INET:
+ return address.IPv4Address("UDP", *addr)
+ elif self.addressFamily == socket.AF_INET6:
+ return address.IPv6Address("UDP", *(addr[:2]))
+
+ def setBroadcastAllowed(self, enabled):
+ """
+ Set whether this port may broadcast. This is disabled by default.
+
+ @param enabled: Whether the port may broadcast.
+ @type enabled: L{bool}
+ """
+ self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, enabled)
+
+ def getBroadcastAllowed(self):
+ """
+ Checks if broadcast is currently allowed on this port.
+
+ @return: Whether this port may broadcast.
+ @rtype: L{bool}
+ """
+ return bool(self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST))
+
+
+class MulticastMixin:
+ """
+ Implement multicast functionality.
+ """
+
+ def getOutgoingInterface(self):
+ i = self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF)
+ return socket.inet_ntoa(struct.pack("@i", i))
+
+ def setOutgoingInterface(self, addr):
+ """
+ Returns Deferred of success.
+ """
+ return self.reactor.resolve(addr).addCallback(self._setInterface)
+
+ def _setInterface(self, addr):
+ i = socket.inet_aton(addr)
+ self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, i)
+ return 1
+
+ def getLoopbackMode(self):
+ return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP)
+
+ def setLoopbackMode(self, mode):
+ mode = struct.pack("b", bool(mode))
+ self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, mode)
+
+ def getTTL(self):
+ return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL)
+
+ def setTTL(self, ttl):
+ ttl = struct.pack("B", ttl)
+ self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)
+
+ def joinGroup(self, addr, interface=""):
+ """
+ Join a multicast group. Returns Deferred of success.
+ """
+ return self.reactor.resolve(addr).addCallback(self._joinAddr1, interface, 1)
+
+ def _joinAddr1(self, addr, interface, join):
+ return self.reactor.resolve(interface).addCallback(self._joinAddr2, addr, join)
+
+ def _joinAddr2(self, interface, addr, join):
+ addr = socket.inet_aton(addr)
+ interface = socket.inet_aton(interface)
+ if join:
+ cmd = socket.IP_ADD_MEMBERSHIP
+ else:
+ cmd = socket.IP_DROP_MEMBERSHIP
+ try:
+ self.socket.setsockopt(socket.IPPROTO_IP, cmd, addr + interface)
+ except OSError as e:
+ return failure.Failure(error.MulticastJoinError(addr, interface, *e.args))
+
+ def leaveGroup(self, addr, interface=""):
+ """
+ Leave multicast group, return Deferred of success.
+ """
+ return self.reactor.resolve(addr).addCallback(self._joinAddr1, interface, 0)
+
+
+@implementer(interfaces.IMulticastTransport)
+class MulticastPort(MulticastMixin, Port):
+ """
+ UDP Port that supports multicasting.
+ """
+
+ def __init__(
+ self,
+ port,
+ proto,
+ interface="",
+ maxPacketSize=8192,
+ reactor=None,
+ listenMultiple=False,
+ ):
+ Port.__init__(self, port, proto, interface, maxPacketSize, reactor)
+ self.listenMultiple = listenMultiple
+
+ def createSocket(self):
+ skt = Port.createSocket(self)
+ if self.listenMultiple:
+ skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ if hasattr(socket, "SO_REUSEPORT"):
+ skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+ return skt