aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/internet
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-12-17 12:07:28 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-12-17 12:18:43 +0300
commit48bd5f88777f4dc94fd41a7dd22808ed639b985d (patch)
tree6a899d7cc8bd632073408198260a93d76f99ef32 /contrib/python/Twisted/py3/twisted/internet
parent3e05dc5f5c47aa8d220db7b5508cfbd4a0d8919f (diff)
downloadydb-48bd5f88777f4dc94fd41a7dd22808ed639b985d.tar.gz
Intermediate changes
commit_hash:3786c4fc65af12274eea45a3ef9de6050e262ac0
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/internet')
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/_multicast.py161
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/abstract.py34
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/endpoints.py255
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/interfaces.py89
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/iocpreactor/reactor.py4
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/iocpreactor/udp.py101
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/posixbase.py12
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/protocol.py13
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/tcp.py33
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/udp.py88
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/unix.py30
11 files changed, 524 insertions, 296 deletions
diff --git a/contrib/python/Twisted/py3/twisted/internet/_multicast.py b/contrib/python/Twisted/py3/twisted/internet/_multicast.py
new file mode 100644
index 0000000000..99b1c937f2
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/internet/_multicast.py
@@ -0,0 +1,161 @@
+# -*- test-case-name: twisted.test.test_udp -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+from __future__ import annotations
+
+import socket
+import struct
+from typing import Any
+
+from twisted.internet.abstract import isIPAddress, isIPv6Address
+from twisted.internet.defer import Deferred, succeed
+from twisted.internet.error import MulticastJoinError
+from twisted.internet.interfaces import IReactorCore
+
+
+def _maybeResolve(reactor: IReactorCore, addr: str) -> Deferred[str]:
+ if isIPv6Address(addr) or isIPAddress(addr):
+ return succeed(addr)
+ return reactor.resolve(addr)
+
+
+class MulticastMixin:
+ """
+ Implement multicast functionality.
+ """
+
+ addressFamily: socket.AddressFamily
+ reactor: Any
+ socket: socket.socket
+
+ def _addrpack(self, addr: str) -> bytes:
+ """
+ Pack an IP address literal into bytes, according to the address family
+ of this transport.
+ """
+ try:
+ return socket.inet_pton(self.addressFamily, addr)
+ except OSError:
+ raise MulticastJoinError(
+ f"invalid address literal for {socket.AddressFamily(self.addressFamily).name}: {addr!r}"
+ )
+
+ @property
+ def _ipproto(self) -> int:
+ return (
+ socket.IPPROTO_IP
+ if self.addressFamily == socket.AF_INET
+ else socket.IPPROTO_IPV6
+ )
+
+ @property
+ def _multiloop(self) -> int:
+ return (
+ socket.IP_MULTICAST_LOOP
+ if self.addressFamily == socket.AF_INET
+ else socket.IPV6_MULTICAST_LOOP
+ )
+
+ @property
+ def _multiif(self) -> int:
+ return (
+ socket.IP_MULTICAST_IF
+ if self.addressFamily == socket.AF_INET
+ else socket.IPV6_MULTICAST_IF
+ )
+
+ @property
+ def _joingroup(self) -> int:
+ return (
+ socket.IP_ADD_MEMBERSHIP
+ if self.addressFamily == socket.AF_INET
+ else socket.IPV6_JOIN_GROUP
+ )
+
+ @property
+ def _leavegroup(self) -> int:
+ return (
+ socket.IP_DROP_MEMBERSHIP
+ if self.addressFamily == socket.AF_INET
+ else socket.IPV6_LEAVE_GROUP
+ )
+
+ def getOutgoingInterface(self) -> str | int:
+ blen = 0x4 if self.addressFamily == socket.AF_INET else 0x10
+ ipproto = self._ipproto
+ multiif = self._multiif
+ i = self.socket.getsockopt(ipproto, multiif, blen)
+ from sys import byteorder
+
+ if self.addressFamily == socket.AF_INET6:
+ return int.from_bytes(i, byteorder)
+ return socket.inet_ntop(self.addressFamily, i)
+
+ def setOutgoingInterface(self, addr: str | int) -> Deferred[int]:
+ """
+ @see: L{IMulticastTransport.setOutgoingInterface}
+ """
+
+ async def asynchronously() -> int:
+ i: bytes | int
+ if self.addressFamily == socket.AF_INET:
+ assert isinstance(
+ addr, str
+ ), "IPv4 interfaces are specified as addresses"
+ i = self._addrpack(await _maybeResolve(self.reactor, addr))
+ else:
+ assert isinstance(
+ addr, int
+ ), "IPv6 interfaces are specified as integers"
+ i = addr
+ self.socket.setsockopt(self._ipproto, self._multiif, i)
+ return 1
+
+ return Deferred.fromCoroutine(asynchronously())
+
+ def getLoopbackMode(self) -> bool:
+ return bool(self.socket.getsockopt(self._ipproto, self._multiloop))
+
+ def setLoopbackMode(self, mode: int) -> None:
+ # mode = struct.pack("b", bool(mode))
+ a = self._ipproto
+ b = self._multiloop
+ self.socket.setsockopt(a, b, int(bool(mode)))
+
+ def getTTL(self) -> int:
+ return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL)
+
+ def setTTL(self, ttl: int) -> None:
+ bttl = struct.pack("B", ttl)
+ self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, bttl)
+
+ def _joinleave(self, addr: str, interface: str, join: bool) -> Deferred[None]:
+ cmd = self._joingroup if join else self._leavegroup
+ if not interface:
+ interface = "0.0.0.0" if self.addressFamily == socket.AF_INET else "::"
+
+ async def impl() -> None:
+ resaddr = await _maybeResolve(self.reactor, addr)
+ resif = await _maybeResolve(self.reactor, interface)
+
+ packaddr = self._addrpack(resaddr)
+ packif = self._addrpack(resif)
+ try:
+ self.socket.setsockopt(self._ipproto, cmd, packaddr + packif)
+ except OSError as e:
+ raise MulticastJoinError(addr, interface, *e.args) from e
+
+ return Deferred.fromCoroutine(impl())
+
+ def joinGroup(self, addr: str, interface: str = "") -> Deferred[None]:
+ """
+ @see: L{IMulticastTransport.joinGroup}
+ """
+ return self._joinleave(addr, interface, True)
+
+ def leaveGroup(self, addr: str, interface: str = "") -> Deferred[None]:
+ """
+ @see: L{IMulticastTransport.leaveGroup}
+ """
+ return self._joinleave(addr, interface, False)
diff --git a/contrib/python/Twisted/py3/twisted/internet/abstract.py b/contrib/python/Twisted/py3/twisted/internet/abstract.py
index 45a80383c2..09bd751199 100644
--- a/contrib/python/Twisted/py3/twisted/internet/abstract.py
+++ b/contrib/python/Twisted/py3/twisted/internet/abstract.py
@@ -6,9 +6,10 @@
Support for generic select()able objects.
"""
+from __future__ import annotations
from socket import AF_INET, AF_INET6, inet_pton
-from typing import Iterable, List, Optional
+from typing import Iterable, List, Optional, Union
from zope.interface import implementer
@@ -165,6 +166,12 @@ class FileDescriptor(_ConsumerMixin, _LogOwner):
valid to be passed to select(2).
"""
+ # We have two buffers: a list (_tempDataBuffer), and a byte string
+ # (self.dataBuffer). A given write may not be able to write everything, and
+ # we also limit to sending at most SEND_LIMIT bytes at a time. Thus, we
+ # also have self.offset tracks where in self.dataBuffer we are in the
+ # writing process, to reduce unnecessary copying if we failed to write all
+ # the data.
connected = 0
disconnected = 0
disconnecting = 0
@@ -208,7 +215,7 @@ class FileDescriptor(_ConsumerMixin, _LogOwner):
self.stopReading()
self.stopWriting()
- def writeSomeData(self, data: bytes) -> None:
+ def writeSomeData(self, data: bytes) -> Union[int, BaseException]:
"""
Write as much as possible of the given data, immediately.
@@ -242,12 +249,22 @@ class FileDescriptor(_ConsumerMixin, _LogOwner):
@see: L{twisted.internet.interfaces.IWriteDescriptor.doWrite}.
"""
- if len(self.dataBuffer) - self.offset < self.SEND_LIMIT:
- # If there is currently less than SEND_LIMIT bytes left to send
- # in the string, extend it with the array data.
- self.dataBuffer = _concatenate(
- self.dataBuffer, self.offset, self._tempDataBuffer
- )
+ # We only send at most SEND_LIMIT bytes at a time. If the amount of
+ # bytes in our send-immediately buffer is smaller than that limit,
+ # probably a good time to add the bytes from our secondary, list-based
+ # buffer (self._tempDataBuffer.)
+ remaining = len(self.dataBuffer) - self.offset
+ if remaining < self.SEND_LIMIT:
+ if remaining > 0:
+ # There is currently some data to write, extend it with the
+ # list data.
+ self.dataBuffer = _concatenate(
+ self.dataBuffer, self.offset, self._tempDataBuffer
+ )
+ else:
+ # self.dataBuffer has nothing left to write, so just convert
+ # the list buffer to bytes buffer in a cheaper way:
+ self.dataBuffer = b"".join(self._tempDataBuffer)
self.offset = 0
self._tempDataBuffer = []
self._tempDataLen = 0
@@ -256,6 +273,7 @@ class FileDescriptor(_ConsumerMixin, _LogOwner):
if self.offset:
l = self.writeSomeData(lazyByteSlice(self.dataBuffer, self.offset))
else:
+ # Optimization: skip lazyByteSlice() when it's unnecessary.
l = self.writeSomeData(self.dataBuffer)
# There is no writeSomeData implementation in Twisted which returns
diff --git a/contrib/python/Twisted/py3/twisted/internet/endpoints.py b/contrib/python/Twisted/py3/twisted/internet/endpoints.py
index a98fd2ba43..dfa0cc43ce 100644
--- a/contrib/python/Twisted/py3/twisted/internet/endpoints.py
+++ b/contrib/python/Twisted/py3/twisted/internet/endpoints.py
@@ -12,21 +12,22 @@ parsed by the L{clientFromString} and L{serverFromString} functions.
@since: 10.1
"""
+from __future__ import annotations
import os
import re
import socket
import warnings
-from typing import Optional, Sequence, Type
+from typing import Any, Iterable, Optional, Sequence, Type
from unicodedata import normalize
-from zope.interface import directlyProvides, implementer, provider
+from zope.interface import directlyProvides, implementer
from constantly import NamedConstant, Names
from incremental import Version
from twisted.internet import defer, error, fdesc, interfaces, threads
-from twisted.internet.abstract import isIPAddress, isIPv6Address
+from twisted.internet.abstract import isIPv6Address
from twisted.internet.address import (
HostnameAddress,
IPv4Address,
@@ -37,9 +38,13 @@ from twisted.internet.interfaces import (
IAddress,
IHostnameResolver,
IHostResolution,
+ IOpenSSLClientConnectionCreator,
+ IProtocol,
+ IProtocolFactory,
IReactorPluggableNameResolver,
IReactorSocket,
IResolutionReceiver,
+ IStreamClientEndpoint,
IStreamClientEndpointStringParserWithReactor,
IStreamServerEndpointStringParser,
)
@@ -201,14 +206,16 @@ class _WrappingFactory(ClientFactory):
# Type is wrong. See https://twistedmatrix.com/trac/ticket/10005#ticket
protocol = _WrappingProtocol # type: ignore[assignment]
- def __init__(self, wrappedFactory):
+ def __init__(self, wrappedFactory: IProtocolFactory) -> None:
"""
@param wrappedFactory: A provider of I{IProtocolFactory} whose
buildProtocol method will be called and whose resulting protocol
will be wrapped.
"""
self._wrappedFactory = wrappedFactory
- self._onConnection = defer.Deferred(canceller=self._canceller)
+ self._onConnection: defer.Deferred[IProtocol] = defer.Deferred(
+ canceller=self._canceller
+ )
def startedConnecting(self, connector):
"""
@@ -567,7 +574,14 @@ class TCP4ClientEndpoint:
TCP client endpoint with an IPv4 configuration.
"""
- def __init__(self, reactor, host, port, timeout=30, bindAddress=None):
+ def __init__(
+ self,
+ reactor: Any,
+ host: str,
+ port: int,
+ timeout: float = 30,
+ bindAddress: str | tuple[bytes | str, int] | None = None,
+ ) -> None:
"""
@param reactor: An L{IReactorTCP} provider
@@ -591,7 +605,7 @@ class TCP4ClientEndpoint:
self._timeout = timeout
self._bindAddress = bindAddress
- def connect(self, protocolFactory):
+ def connect(self, protocolFactory: IProtocolFactory) -> Deferred[IProtocol]:
"""
Implement L{IStreamClientEndpoint.connect} to connect via TCP.
"""
@@ -665,7 +679,9 @@ class TCP6ClientEndpoint:
"""
return self._deferToThread(self._getaddrinfo, host, 0, socket.AF_INET6)
- def _resolvedHostConnect(self, resolvedHost, protocolFactory):
+ def _resolvedHostConnect(
+ self, resolvedHost: str, protocolFactory: IProtocolFactory
+ ) -> Deferred[IProtocol]:
"""
Connect to the server using the resolved hostname.
"""
@@ -774,10 +790,6 @@ class HostnameEndpoint:
associated with this endpoint.
@type _hostBytes: L{bytes}
- @ivar _hostStr: the native-string representation of the hostname passed to
- the constructor, used for exception construction
- @type _hostStr: native L{str}
-
@ivar _badHostname: a flag - hopefully false! - indicating that an invalid
hostname was passed to the constructor. This might be a textual
hostname that isn't valid IDNA, or non-ASCII bytes.
@@ -789,8 +801,14 @@ class HostnameEndpoint:
_DEFAULT_ATTEMPT_DELAY = 0.3
def __init__(
- self, reactor, host, port, timeout=30, bindAddress=None, attemptDelay=None
- ):
+ self,
+ reactor: Any,
+ host: str | bytes,
+ port: int,
+ timeout: float = 30,
+ bindAddress: bytes | str | tuple[bytes | str, int] | None = None,
+ attemptDelay: float | None = None,
+ ) -> None:
"""
Create a L{HostnameEndpoint}.
@@ -799,7 +817,7 @@ class HostnameEndpoint:
L{IReactorPluggableNameResolver} or L{IReactorPluggableResolver}.
@param host: A hostname to connect to.
- @type host: L{bytes} or L{unicode}
+ @type host: L{bytes} or L{str}
@param port: The port number to connect to.
@type port: L{int}
@@ -833,9 +851,9 @@ class HostnameEndpoint:
[self._badHostname, self._hostBytes, self._hostText] = self._hostAsBytesAndText(
host
)
- self._hostStr = self._hostBytes if bytes is str else self._hostText
self._port = port
self._timeout = timeout
+
if bindAddress is not None:
if isinstance(bindAddress, (bytes, str)):
bindAddress = (bindAddress, 0)
@@ -852,21 +870,25 @@ class HostnameEndpoint:
@return: A L{str}
"""
- if self._badHostname:
- # Use the backslash-encoded version of the string passed to the
- # constructor, which is already a native string.
- host = self._hostStr
- elif isIPv6Address(self._hostStr):
- host = f"[{self._hostStr}]"
- else:
- # Convert the bytes representation to a native string to ensure
- # that we display the punycoded version of the hostname, which is
- # more useful than any IDN version as it can be easily copy-pasted
- # into debugging tools.
- host = nativeString(self._hostBytes)
- return "".join(["<HostnameEndpoint ", host, ":", str(self._port), ">"])
+ host = (
+ # It the hostname is bad, use the backslash-encoded version of the
+ # string passed to the constructor, which is already a string.
+ self._hostText
+ if self._badHostname
+ else (
+ # Add some square brackets if it's an IPv6 address.
+ f"[{self._hostText}]"
+ if isIPv6Address(self._hostText)
+ # Convert the bytes representation to a native string to ensure
+ # that we display the punycoded version of the hostname, which is
+ # more useful than any IDN version as it can be easily copy-pasted
+ # into debugging tools.
+ else self._hostBytes.decode("ascii")
+ )
+ )
+ return f"<HostnameEndpoint {host}:{self._port}>"
- def _getNameResolverAndMaybeWarn(self, reactor):
+ def _getNameResolverAndMaybeWarn(self, reactor: object) -> IHostnameResolver:
"""
Retrieve a C{nameResolver} callable and warn the caller's
caller that using a reactor which doesn't provide
@@ -894,7 +916,7 @@ class HostnameEndpoint:
return reactor.nameResolver
@staticmethod
- def _hostAsBytesAndText(host):
+ def _hostAsBytesAndText(host: bytes | str) -> tuple[bool, bytes, str]:
"""
For various reasons (documented in the C{@ivar}'s in the class
docstring) we need both a textual and a binary representation of the
@@ -906,39 +928,36 @@ class HostnameEndpoint:
this up in the future and just operate in terms of text internally.
@param host: A hostname to convert.
- @type host: L{bytes} or C{str}
@return: a 3-tuple of C{(invalid, bytes, text)} where C{invalid} is a
boolean indicating the validity of the hostname, C{bytes} is a
binary representation of C{host}, and C{text} is a textual
representation of C{host}.
"""
+ invalid = False
if isinstance(host, bytes):
- if isIPAddress(host) or isIPv6Address(host):
- return False, host, host.decode("ascii")
- else:
- try:
- return False, host, _idnaText(host)
- except UnicodeError:
- # Convert the host to _some_ kind of text, to handle below.
- host = host.decode("charmap")
+ hostBytes = host
+ try:
+ hostText = _idnaText(hostBytes)
+ except UnicodeError:
+ hostText = hostBytes.decode("charmap")
+ if not isIPv6Address(hostText):
+ invalid = True
else:
- host = normalize("NFC", host)
- if isIPAddress(host) or isIPv6Address(host):
- return False, host.encode("ascii"), host
+ hostText = normalize("NFC", host)
+ if isIPv6Address(hostText):
+ hostBytes = hostText.encode("ascii")
else:
try:
- return False, _idnaBytes(host), host
+ hostBytes = _idnaBytes(hostText)
except UnicodeError:
- pass
- # `host` has been converted to text by this point either way; it's
- # invalid as a hostname, and so may contain unprintable characters and
- # such. escape it with backslashes so the user can get _some_ guess as
- # to what went wrong.
- asciibytes = host.encode("ascii", "backslashreplace")
- return True, asciibytes, asciibytes.decode("ascii")
+ invalid = True
+ if invalid:
+ hostBytes = hostText.encode("ascii", "backslashreplace")
+ hostText = hostBytes.decode("ascii")
+ return invalid, hostBytes, hostText
- def connect(self, protocolFactory):
+ def connect(self, protocolFactory: IProtocolFactory) -> Deferred[IProtocol]:
"""
Attempts a connection to each resolved address, and returns a
connection which is established first.
@@ -952,37 +971,38 @@ class HostnameEndpoint:
or fails a connection-related error.
"""
if self._badHostname:
- return defer.fail(ValueError(f"invalid hostname: {self._hostStr}"))
+ return defer.fail(ValueError(f"invalid hostname: {self._hostText}"))
- d = Deferred()
- addresses = []
+ resolved: Deferred[list[IAddress]] = Deferred()
+ addresses: list[IAddress] = []
- @provider(IResolutionReceiver)
+ @implementer(IResolutionReceiver)
class EndpointReceiver:
@staticmethod
- def resolutionBegan(resolutionInProgress):
+ def resolutionBegan(resolutionInProgress: IHostResolution) -> None:
pass
@staticmethod
- def addressResolved(address):
+ def addressResolved(address: IAddress) -> None:
addresses.append(address)
@staticmethod
- def resolutionComplete():
- d.callback(addresses)
+ def resolutionComplete() -> None:
+ resolved.callback(addresses)
self._nameResolver.resolveHostName(
- EndpointReceiver, self._hostText, portNumber=self._port
+ EndpointReceiver(), self._hostText, portNumber=self._port
)
- d.addErrback(
+ resolved.addErrback(
lambda ignored: defer.fail(
- error.DNSLookupError(f"Couldn't find the hostname '{self._hostStr}'")
+ error.DNSLookupError(f"Couldn't find the hostname '{self._hostText}'")
)
)
- @d.addCallback
- def resolvedAddressesToEndpoints(addresses):
+ def resolvedAddressesToEndpoints(
+ addresses: Iterable[IAddress],
+ ) -> Iterable[TCP6ClientEndpoint | TCP4ClientEndpoint]:
# Yield an endpoint for every address resolved from the name.
for eachAddress in addresses:
if isinstance(eachAddress, IPv6Address):
@@ -1002,22 +1022,24 @@ class HostnameEndpoint:
self._bindAddress,
)
- d.addCallback(list)
+ iterd = resolved.addCallback(resolvedAddressesToEndpoints)
+ listd = iterd.addCallback(list)
- def _canceller(d):
+ def _canceller(cancelled: Deferred[IProtocol]) -> None:
# This canceller must remain defined outside of
# `startConnectionAttempts`, because Deferred should not
# participate in cycles with their cancellers; that would create a
# potentially problematic circular reference and possibly
# gc.garbage.
- d.errback(
+ cancelled.errback(
error.ConnectingCancelledError(
HostnameAddress(self._hostBytes, self._port)
)
)
- @d.addCallback
- def startConnectionAttempts(endpoints):
+ def startConnectionAttempts(
+ endpoints: list[TCP6ClientEndpoint | TCP4ClientEndpoint],
+ ) -> Deferred[IProtocol]:
"""
Given a sequence of endpoints obtained via name resolution, start
connecting to a new one every C{self._attemptDelay} seconds until
@@ -1037,62 +1059,68 @@ class HostnameEndpoint:
"""
if not endpoints:
raise error.DNSLookupError(
- f"no results for hostname lookup: {self._hostStr}"
+ f"no results for hostname lookup: {self._hostText}"
)
iterEndpoints = iter(endpoints)
- pending = []
- failures = []
- winner = defer.Deferred(canceller=_canceller)
+ pending: list[defer.Deferred[IProtocol]] = []
+ failures: list[Failure] = []
+ winner: defer.Deferred[IProtocol] = defer.Deferred(canceller=_canceller)
- def checkDone():
- if pending or checkDone.completed or checkDone.endpointsLeft:
+ checkDoneCompleted = False
+ checkDoneEndpointsLeft = True
+
+ def checkDone() -> None:
+ if pending or checkDoneCompleted or checkDoneEndpointsLeft:
return
winner.errback(failures.pop())
- checkDone.completed = False
- checkDone.endpointsLeft = True
-
@LoopingCall
- def iterateEndpoint():
+ def iterateEndpoint() -> None:
+ nonlocal checkDoneEndpointsLeft
endpoint = next(iterEndpoints, None)
if endpoint is None:
# The list of endpoints ends.
- checkDone.endpointsLeft = False
+ checkDoneEndpointsLeft = False
checkDone()
return
eachAttempt = endpoint.connect(protocolFactory)
pending.append(eachAttempt)
- @eachAttempt.addBoth
- def noLongerPending(result):
+ def noLongerPending(result: IProtocol | Failure) -> IProtocol | Failure:
pending.remove(eachAttempt)
return result
- @eachAttempt.addCallback
- def succeeded(result):
+ successState = eachAttempt.addBoth(noLongerPending)
+
+ def succeeded(result: IProtocol) -> None:
winner.callback(result)
- @eachAttempt.addErrback
+ successState.addCallback(succeeded)
+
def failed(reason):
failures.append(reason)
checkDone()
+ successState.addErrback(failed)
+
iterateEndpoint.clock = self._reactor
iterateEndpoint.start(self._attemptDelay)
- @winner.addBoth
- def cancelRemainingPending(result):
- checkDone.completed = True
+ def cancelRemainingPending(
+ result: IProtocol | Failure,
+ ) -> IProtocol | Failure:
+ nonlocal checkDoneCompleted
+ checkDoneCompleted = True
for remaining in pending[:]:
remaining.cancel()
if iterateEndpoint.running:
iterateEndpoint.stop()
return result
- return winner
+ return winner.addBoth(cancelRemainingPending)
- return d
+ return listd.addCallback(startConnectionAttempts)
def _fallbackNameResolution(self, host, port):
"""
@@ -2218,7 +2246,10 @@ class _WrapperServerEndpoint:
return self._wrappedEndpoint.listen(self._wrapperFactory(protocolFactory))
-def wrapClientTLS(connectionCreator, wrappedEndpoint):
+def wrapClientTLS(
+ connectionCreator: IOpenSSLClientConnectionCreator,
+ wrappedEndpoint: IStreamClientEndpoint,
+) -> _WrapperEndpoint:
"""
Wrap an endpoint which upgrades to TLS as soon as the connection is
established.
@@ -2250,17 +2281,17 @@ def wrapClientTLS(connectionCreator, wrappedEndpoint):
def _parseClientTLS(
- reactor,
- host,
- port,
- timeout=b"30",
- bindAddress=None,
- certificate=None,
- privateKey=None,
- trustRoots=None,
- endpoint=None,
- **kwargs,
-):
+ reactor: Any,
+ host: bytes | str,
+ port: bytes | str,
+ timeout: bytes | str = b"30",
+ bindAddress: bytes | str | None = None,
+ certificate: bytes | str | None = None,
+ privateKey: bytes | str | None = None,
+ trustRoots: bytes | str | None = None,
+ endpoint: bytes | str | None = None,
+ **kwargs: object,
+) -> IStreamClientEndpoint:
"""
Internal method to construct an endpoint from string parameters.
@@ -2303,18 +2334,24 @@ def _parseClientTLS(
if isinstance(bindAddress, str) or bindAddress is None
else bindAddress.decode("utf-8")
)
- port = int(port)
- timeout = int(timeout)
+ portint = int(port)
+ timeoutint = int(timeout)
return wrapClientTLS(
optionsForClientTLS(
host,
trustRoot=_parseTrustRootPath(trustRoots),
clientCertificate=_privateCertFromPaths(certificate, privateKey),
),
- clientFromString(reactor, endpoint)
- if endpoint is not None
- else HostnameEndpoint(
- reactor, _idnaBytes(host), port, timeout, (bindAddress, 0)
+ (
+ clientFromString(reactor, endpoint)
+ if endpoint is not None
+ else HostnameEndpoint(
+ reactor,
+ _idnaBytes(host),
+ portint,
+ timeoutint,
+ None if bindAddress is None else (bindAddress, 0),
+ )
),
)
diff --git a/contrib/python/Twisted/py3/twisted/internet/interfaces.py b/contrib/python/Twisted/py3/twisted/internet/interfaces.py
index 78380ccc39..28ab9bffbd 100644
--- a/contrib/python/Twisted/py3/twisted/internet/interfaces.py
+++ b/contrib/python/Twisted/py3/twisted/internet/interfaces.py
@@ -688,7 +688,10 @@ class IResolver(IResolverSimple):
class IReactorTCP(Interface):
def listenTCP(
- port: int, factory: "ServerFactory", backlog: int, interface: str
+ port: int,
+ factory: "ServerFactory",
+ backlog: int = 50,
+ interface: str = "",
) -> "IListeningPort":
"""
Connects a given protocol factory to the given numeric TCP/IP port.
@@ -712,8 +715,8 @@ class IReactorTCP(Interface):
host: str,
port: int,
factory: "ClientFactory",
- timeout: float,
- bindAddress: Optional[Tuple[str, int]],
+ timeout: float = 30.0,
+ bindAddress: Optional[Tuple[str, int]] = None,
) -> IConnector:
"""
Connect a TCP client.
@@ -784,7 +787,10 @@ class IReactorUNIX(Interface):
"""
def connectUNIX(
- address: str, factory: "ClientFactory", timeout: float, checkPID: bool
+ address: str,
+ factory: "ClientFactory",
+ timeout: float = 30,
+ checkPID: bool = False,
) -> IConnector:
"""
Connect a client protocol to a UNIX socket.
@@ -801,7 +807,11 @@ class IReactorUNIX(Interface):
"""
def listenUNIX(
- address: str, factory: "Factory", backlog: int, mode: int, wantPID: bool
+ address: str,
+ factory: "Factory",
+ backlog: int = 50,
+ mode: int = 0o666,
+ wantPID: bool = False,
) -> "IListeningPort":
"""
Listen on a UNIX socket.
@@ -923,21 +933,35 @@ class IReactorMulticast(Interface):
def listenMulticast(
port: int,
protocol: "DatagramProtocol",
- interface: str,
- maxPacketSize: int,
- listenMultiple: bool,
- ) -> "IListeningPort":
+ interface: str = "",
+ maxPacketSize: int = 8192,
+ listenMultiple: bool = False,
+ ) -> IMulticastTransport:
"""
Connects a given
L{DatagramProtocol<twisted.internet.protocol.DatagramProtocol>} to the
given numeric UDP port.
- @param listenMultiple: If set to True, allows multiple sockets to
- bind to the same address and port number at the same time.
+ @param port: The port number to bind to.
- @returns: An object which provides L{IListeningPort}.
+ @param protocol: the datagram receiver that will receive multicast
+ packets sent to the given interface and port.
+
+ @param interface: The IP address literal of the network interface to
+ bind to. By default, this will be C{"0.0.0.0"}, i.e. all IPv4
+ interfaces. Note that the format of this literal determines the
+ address family of the resulting multicast transport: passing an
+ IPv6 literal, such as C{"::"}, will result in an IPv6 multicast
+ transport.
+
+ @param maxPacketSize: The maximum packet size to accept.
+
+ @param listenMultiple: If set to True, allows multiple sockets to bind
+ to the same address and port number at the same time.
+
+ @returns: An L{IMulticastTransport} that can send multicast traffic to
+ C{interface}.
- @see: L{twisted.internet.interfaces.IMulticastTransport}
@see: U{http://twistedmatrix.com/documents/current/core/howto/udp.html}
"""
@@ -1330,10 +1354,25 @@ class IReactorCore(Interface):
"I{during shutdown} and C{False} the rest of the time."
)
- def resolve(name: str, timeout: Sequence[int]) -> "Deferred[str]":
+ def resolve(name: str, timeout: Sequence[int] = (1, 3, 11, 45)) -> "Deferred[str]":
"""
- Return a L{twisted.internet.defer.Deferred} that will resolve
- a hostname.
+ Asynchronously resolve a hostname to a single IPv4 address.
+
+ @note: Rather than calling this API directly, you probably want to use
+ L{twisted.internet.endpoints.HostnameEndpoint} to connect to a
+ hostname. If you do want to resolve a hostname without connecting
+ to it, see L{IReactorPluggableNameResolver} and
+ L{IHostnameResolver} so that you can receive multiple results and
+ IPv6 addresses.
+
+ @param name: The hostname to resolve.
+
+ @param timeout: A sequence of timeouts, meant to mirror the sequence of
+ timeouts used for each hop in recursive queries. Note that
+ different implementations of the resolver backend may not honor
+ this timeout as such, or at all; if the underlying platform API
+ supports it, implementations make a best-effort attempt to cancel
+ the underlying resolution if the sum of these timeouts elapses.
"""
def run() -> None:
@@ -1477,7 +1516,7 @@ class IReactorPluggableNameResolver(Interface):
set to a user-supplied object.
"""
- nameResolver = Attribute(
+ nameResolver: IHostnameResolver = Attribute(
"""
Read-only attribute; the resolver installed with L{installResolver}.
An L{IHostnameResolver}.
@@ -2569,21 +2608,25 @@ class IUNIXDatagramConnectedTransport(Interface):
"""
-class IMulticastTransport(Interface):
+class IMulticastTransport(IUDPTransport):
"""
Additional functionality for multicast UDP.
"""
- def getOutgoingInterface() -> str:
+ def getOutgoingInterface() -> str | int:
"""
Return interface of outgoing multicast packets.
"""
- def setOutgoingInterface(addr: str) -> None:
+ def setOutgoingInterface(addr: str | int) -> Deferred[int]:
"""
Set interface for outgoing multicast packets.
- Returns Deferred of success.
+ @note: For IPv4 multicast sockets, the address must be a hostname or IP
+ address. For IPv6 multicast sockets, the address must be an
+ interface index, as described in L{socket.if_nameindex}.
+
+ @returns: Deferred of (1: success, 0: failure).
"""
def getLoopbackMode() -> bool:
@@ -2606,7 +2649,7 @@ class IMulticastTransport(Interface):
Set time to live on multicast packets.
"""
- def joinGroup(addr: str, interface: str) -> "Deferred[None]":
+ def joinGroup(addr: str, interface: str = "") -> "Deferred[None]":
"""
Join a multicast group. Returns L{Deferred} of success or failure.
@@ -2614,7 +2657,7 @@ class IMulticastTransport(Interface):
L{error.MulticastJoinError}.
"""
- def leaveGroup(addr: str, interface: str) -> "Deferred[None]":
+ def leaveGroup(addr: str, interface: str = "") -> "Deferred[None]":
"""
Leave multicast group, return L{Deferred} of success.
"""
diff --git a/contrib/python/Twisted/py3/twisted/internet/iocpreactor/reactor.py b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/reactor.py
index e9c3716219..976d1096e7 100644
--- a/contrib/python/Twisted/py3/twisted/internet/iocpreactor/reactor.py
+++ b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/reactor.py
@@ -139,7 +139,9 @@ class IOCPReactor(base.ReactorBase, _ThreadedWin32EventsMixin):
def registerHandle(self, handle):
self.port.addHandle(handle, KEY_NORMAL)
- def createSocket(self, af, stype):
+ def createSocket(
+ self, af: socket.AddressFamily, stype: socket.SocketKind
+ ) -> socket.socket:
skt = socket.socket(af, stype)
self.registerHandle(skt.fileno())
return skt
diff --git a/contrib/python/Twisted/py3/twisted/internet/iocpreactor/udp.py b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/udp.py
index 59c5fefb4b..744efa03e1 100644
--- a/contrib/python/Twisted/py3/twisted/internet/iocpreactor/udp.py
+++ b/contrib/python/Twisted/py3/twisted/internet/iocpreactor/udp.py
@@ -5,15 +5,18 @@
UDP support for IOCP reactor
"""
+from __future__ import annotations
+
import errno
import socket
import struct
import warnings
-from typing import Optional
+from typing import TYPE_CHECKING, Optional
from zope.interface import implementer
from twisted.internet import address, defer, error, interfaces
+from twisted.internet._multicast import MulticastMixin
from twisted.internet.abstract import isIPAddress, isIPv6Address
from twisted.internet.iocpreactor import abstract, iocpsupport as _iocp
from twisted.internet.iocpreactor.const import (
@@ -22,7 +25,11 @@ from twisted.internet.iocpreactor.const import (
ERROR_PORT_UNREACHABLE,
)
from twisted.internet.iocpreactor.interfaces import IReadWriteHandle
-from twisted.python import failure, log
+from twisted.internet.protocol import AbstractDatagramProtocol
+from twisted.python import log
+
+if TYPE_CHECKING:
+ from twisted.internet.iocpreactor.reactor import IOCPReactor
@implementer(
@@ -39,6 +46,7 @@ class Port(abstract.FileHandle):
whether this port is listening on an IPv4 address or an IPv6 address.
"""
+ reactor: IOCPReactor
addressFamily = socket.AF_INET
socketType = socket.SOCK_DGRAM
dynamicReadBuffers = False
@@ -47,7 +55,14 @@ class Port(abstract.FileHandle):
# value when we are actually listening.
_realPortNumber: Optional[int] = None
- def __init__(self, port, proto, interface="", maxPacketSize=8192, reactor=None):
+ def __init__(
+ self,
+ port: int,
+ proto: AbstractDatagramProtocol,
+ interface: str = "",
+ maxPacketSize: int = 8192,
+ reactor: IOCPReactor | None = None,
+ ) -> None:
"""
Initialize with a numeric port to listen on.
"""
@@ -102,7 +117,7 @@ class Port(abstract.FileHandle):
self._bindSocket()
self._connectToProtocol()
- def createSocket(self):
+ def createSocket(self) -> socket.socket:
return self.reactor.createSocket(self.addressFamily, self.socketType)
def _bindSocket(self):
@@ -339,68 +354,6 @@ class Port(abstract.FileHandle):
return bool(self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST))
-class MulticastMixin:
- """
- Implement multicast functionality.
- """
-
- def getOutgoingInterface(self):
- i = self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF)
- return socket.inet_ntoa(struct.pack("@i", i))
-
- def setOutgoingInterface(self, addr):
- """
- Returns Deferred of success.
- """
- return self.reactor.resolve(addr).addCallback(self._setInterface)
-
- def _setInterface(self, addr):
- i = socket.inet_aton(addr)
- self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, i)
- return 1
-
- def getLoopbackMode(self):
- return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP)
-
- def setLoopbackMode(self, mode):
- mode = struct.pack("b", bool(mode))
- self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, mode)
-
- def getTTL(self):
- return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL)
-
- def setTTL(self, ttl):
- ttl = struct.pack("B", ttl)
- self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)
-
- def joinGroup(self, addr, interface=""):
- """
- Join a multicast group. Returns Deferred of success.
- """
- return self.reactor.resolve(addr).addCallback(self._joinAddr1, interface, 1)
-
- def _joinAddr1(self, addr, interface, join):
- return self.reactor.resolve(interface).addCallback(self._joinAddr2, addr, join)
-
- def _joinAddr2(self, interface, addr, join):
- addr = socket.inet_aton(addr)
- interface = socket.inet_aton(interface)
- if join:
- cmd = socket.IP_ADD_MEMBERSHIP
- else:
- cmd = socket.IP_DROP_MEMBERSHIP
- try:
- self.socket.setsockopt(socket.IPPROTO_IP, cmd, addr + interface)
- except OSError as e:
- return failure.Failure(error.MulticastJoinError(addr, interface, *e.args))
-
- def leaveGroup(self, addr, interface=""):
- """
- Leave multicast group, return Deferred of success.
- """
- return self.reactor.resolve(addr).addCallback(self._joinAddr1, interface, 0)
-
-
@implementer(interfaces.IMulticastTransport)
class MulticastPort(MulticastMixin, Port):
"""
@@ -409,17 +362,17 @@ class MulticastPort(MulticastMixin, Port):
def __init__(
self,
- port,
- proto,
- interface="",
- maxPacketSize=8192,
- reactor=None,
- listenMultiple=False,
- ):
+ port: int,
+ proto: AbstractDatagramProtocol,
+ interface: str = "",
+ maxPacketSize: int = 8192,
+ reactor: IOCPReactor | None = None,
+ listenMultiple: bool = False,
+ ) -> None:
Port.__init__(self, port, proto, interface, maxPacketSize, reactor)
self.listenMultiple = listenMultiple
- def createSocket(self):
+ def createSocket(self) -> socket.socket:
skt = Port.createSocket(self)
if self.listenMultiple:
skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
diff --git a/contrib/python/Twisted/py3/twisted/internet/posixbase.py b/contrib/python/Twisted/py3/twisted/internet/posixbase.py
index 3be65752a2..d9fa8a9a55 100644
--- a/contrib/python/Twisted/py3/twisted/internet/posixbase.py
+++ b/contrib/python/Twisted/py3/twisted/internet/posixbase.py
@@ -6,6 +6,7 @@
Posix reactor base class
"""
+from __future__ import annotations
import socket
import sys
@@ -16,6 +17,7 @@ from zope.interface import classImplements, implementer
from twisted.internet import error, tcp, udp
from twisted.internet.base import ReactorBase
from twisted.internet.interfaces import (
+ IConnector,
IHalfCloseableDescriptor,
IReactorFDSet,
IReactorMulticast,
@@ -28,6 +30,7 @@ from twisted.internet.interfaces import (
IReactorUNIXDatagram,
)
from twisted.internet.main import CONNECTION_DONE, CONNECTION_LOST
+from twisted.internet.protocol import ClientFactory
from twisted.python import failure, log
from twisted.python.runtime import platform, platformType
from ._signals import (
@@ -363,7 +366,14 @@ class PosixReactorBase(_DisconnectSelectableMixin, ReactorBase):
p.startListening()
return p
- def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
+ def connectTCP(
+ self,
+ host: str,
+ port: int,
+ factory: "ClientFactory",
+ timeout: float = 30.0,
+ bindAddress: tuple[str, int] | None = None,
+ ) -> IConnector:
c = tcp.Connector(host, port, factory, timeout, bindAddress, self)
c.connect()
return c
diff --git a/contrib/python/Twisted/py3/twisted/internet/protocol.py b/contrib/python/Twisted/py3/twisted/internet/protocol.py
index 4fcf0e1038..bd1e647302 100644
--- a/contrib/python/Twisted/py3/twisted/internet/protocol.py
+++ b/contrib/python/Twisted/py3/twisted/internet/protocol.py
@@ -8,7 +8,7 @@ Standard implementations of Twisted protocol-related interfaces.
Start here if you are looking to write a new protocol implementation for
Twisted. The Protocol class contains some introductory material.
"""
-
+from __future__ import annotations
import random
from typing import Any, Callable, Optional
@@ -16,7 +16,12 @@ from typing import Any, Callable, Optional
from zope.interface import implementer
from twisted.internet import defer, error, interfaces
-from twisted.internet.interfaces import IAddress, ITransport
+from twisted.internet.interfaces import (
+ IAddress,
+ IMulticastTransport,
+ ITransport,
+ IUDPTransport,
+)
from twisted.logger import _loggerFor
from twisted.python import components, failure, log
@@ -686,7 +691,7 @@ class AbstractDatagramProtocol:
UDP.
"""
- transport = None
+ transport: IUDPTransport | IMulticastTransport | None = None
numPorts = 0
noisy = True
@@ -735,7 +740,7 @@ class AbstractDatagramProtocol:
Will only be called once, after all ports are disconnected.
"""
- def makeConnection(self, transport):
+ def makeConnection(self, transport: IUDPTransport) -> None:
"""
Make a connection to a transport and a server.
diff --git a/contrib/python/Twisted/py3/twisted/internet/tcp.py b/contrib/python/Twisted/py3/twisted/internet/tcp.py
index 018d1912d2..7bc4d1eaac 100644
--- a/contrib/python/Twisted/py3/twisted/internet/tcp.py
+++ b/contrib/python/Twisted/py3/twisted/internet/tcp.py
@@ -14,7 +14,7 @@ import os
import socket
import struct
import sys
-from typing import Callable, ClassVar, List, Optional, Union
+from typing import Any, Callable, ClassVar, List, Optional, Union
from zope.interface import Interface, implementer
@@ -29,9 +29,9 @@ from twisted.internet.interfaces import (
ISystemHandle,
ITCPTransport,
)
+from twisted.internet.protocol import ClientFactory
from twisted.logger import ILogObserver, LogEvent, Logger
from twisted.python import deprecate, versions
-from twisted.python.compat import lazyByteSlice
from twisted.python.runtime import platformType
try:
@@ -275,7 +275,7 @@ class Connection(
"""
# Limit length of buffer to try to send, because some OSes are too
# stupid to do so themselves (ahem windows)
- limitedData = lazyByteSlice(data, 0, self.SEND_LIMIT)
+ limitedData = memoryview(data)[: self.SEND_LIMIT]
try:
return untilConcludes(self.socket.send, limitedData)
@@ -337,7 +337,18 @@ class Connection(
return bool(self.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY))
def setTcpNoDelay(self, enabled):
- self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, enabled)
+ try:
+ # There are bug reports about failures when setting TCP_NODELAY under certain conditions
+ # on macOS: https://github.com/thespianpy/Thespian/issues/70,
+ # https://github.com/envoyproxy/envoy/issues/1446.
+ #
+ # It is reasonable to simply eat errors coming from setting TCP_NODELAY because
+ # TCP_NODELAY is relatively small performance optimization. In almost all cases the
+ # caller will not be able to do anything to remedy the situation and will simply
+ # continue.
+ self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, enabled)
+ except OSError as e: # pragma: no cover
+ log.err(e, "got error when setting TCP_NODELAY on TCP socket")
def getTcpKeepAlive(self):
return bool(self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE))
@@ -1506,9 +1517,17 @@ class Connector(base.BaseConnector):
@type _addressType: C{type}
"""
- _addressType = address.IPv4Address
+ _addressType: type[address.IPv4Address | address.IPv6Address] = address.IPv4Address
- def __init__(self, host, port, factory, timeout, bindAddress, reactor=None):
+ def __init__(
+ self,
+ host: str,
+ port: int | str,
+ factory: ClientFactory,
+ timeout: float,
+ bindAddress: str | tuple[str, int] | None,
+ reactor: Any = None,
+ ) -> None:
if isinstance(port, str):
try:
port = socket.getservbyname(port, "tcp")
@@ -1520,7 +1539,7 @@ class Connector(base.BaseConnector):
self.bindAddress = bindAddress
base.BaseConnector.__init__(self, factory, timeout, reactor)
- def _makeTransport(self):
+ def _makeTransport(self) -> Client:
"""
Create a L{Client} bound to this L{Connector}.
diff --git a/contrib/python/Twisted/py3/twisted/internet/udp.py b/contrib/python/Twisted/py3/twisted/internet/udp.py
index 7601f2dc84..6ef67f323c 100644
--- a/contrib/python/Twisted/py3/twisted/internet/udp.py
+++ b/contrib/python/Twisted/py3/twisted/internet/udp.py
@@ -14,16 +14,18 @@ Please do not use this module directly.
@var _sockErrReadRefuse: list of symbolic error constants (from the C{errno}
module) representing socket errors that indicate connection refused.
"""
-
+from __future__ import annotations
# System Imports
import socket
-import struct
import warnings
from typing import Optional
from zope.interface import implementer
+from twisted.internet._multicast import MulticastMixin
+from twisted.internet.interfaces import IReactorMulticast
+from twisted.internet.protocol import AbstractDatagramProtocol
from twisted.python.runtime import platformType
if platformType == "win32":
@@ -56,7 +58,7 @@ else:
# Twisted Imports
from twisted.internet import abstract, address, base, defer, error, interfaces
-from twisted.python import failure, log
+from twisted.python import log
@implementer(
@@ -81,8 +83,8 @@ class Port(base.BasePort):
L{Port}).
"""
- addressFamily = socket.AF_INET
- socketType = socket.SOCK_DGRAM
+ addressFamily: socket.AddressFamily = socket.AF_INET
+ socketType: socket.SocketKind = socket.SOCK_DGRAM
maxThroughput = 256 * 1024
_realPortNumber: Optional[int] = None
@@ -440,62 +442,6 @@ class Port(base.BasePort):
return bool(self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST))
-class MulticastMixin:
- """
- Implement multicast functionality.
- """
-
- def getOutgoingInterface(self):
- i = self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF)
- return socket.inet_ntoa(struct.pack("@i", i))
-
- def setOutgoingInterface(self, addr):
- """Returns Deferred of success."""
- return self.reactor.resolve(addr).addCallback(self._setInterface)
-
- def _setInterface(self, addr):
- i = socket.inet_aton(addr)
- self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, i)
- return 1
-
- def getLoopbackMode(self):
- return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP)
-
- def setLoopbackMode(self, mode):
- mode = struct.pack("b", bool(mode))
- self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, mode)
-
- def getTTL(self):
- return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL)
-
- def setTTL(self, ttl):
- ttl = struct.pack("B", ttl)
- self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)
-
- def joinGroup(self, addr, interface=""):
- """Join a multicast group. Returns Deferred of success."""
- return self.reactor.resolve(addr).addCallback(self._joinAddr1, interface, 1)
-
- def _joinAddr1(self, addr, interface, join):
- return self.reactor.resolve(interface).addCallback(self._joinAddr2, addr, join)
-
- def _joinAddr2(self, interface, addr, join):
- addr = socket.inet_aton(addr)
- interface = socket.inet_aton(interface)
- if join:
- cmd = socket.IP_ADD_MEMBERSHIP
- else:
- cmd = socket.IP_DROP_MEMBERSHIP
- try:
- self.socket.setsockopt(socket.IPPROTO_IP, cmd, addr + interface)
- except OSError as e:
- return failure.Failure(error.MulticastJoinError(addr, interface, *e.args))
-
- def leaveGroup(self, addr, interface=""):
- """Leave multicast group, return Deferred of success."""
- return self.reactor.resolve(addr).addCallback(self._joinAddr1, interface, 0)
-
-
@implementer(interfaces.IMulticastTransport)
class MulticastPort(MulticastMixin, Port):
"""
@@ -504,20 +450,24 @@ class MulticastPort(MulticastMixin, Port):
def __init__(
self,
- port,
- proto,
- interface="",
- maxPacketSize=8192,
- reactor=None,
- listenMultiple=False,
- ):
+ port: int,
+ proto: AbstractDatagramProtocol,
+ interface: str = "",
+ maxPacketSize: int = 8192,
+ reactor: IReactorMulticast | None = None,
+ listenMultiple: bool = False,
+ ) -> None:
"""
@see: L{twisted.internet.interfaces.IReactorMulticast.listenMulticast}
"""
Port.__init__(self, port, proto, interface, maxPacketSize, reactor)
self.listenMultiple = listenMultiple
- def createInternetSocket(self):
+ def createInternetSocket(self) -> socket.socket:
+ """
+ Override L{Port.createInternetSocket} to configure the socket to honor
+ the C{listenMultiple} argument to L{IReactorMulticast.listenMultiple}.
+ """
skt = Port.createInternetSocket(self)
if self.listenMultiple:
skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
diff --git a/contrib/python/Twisted/py3/twisted/internet/unix.py b/contrib/python/Twisted/py3/twisted/internet/unix.py
index c3fe62b22d..b33ef29974 100644
--- a/contrib/python/Twisted/py3/twisted/internet/unix.py
+++ b/contrib/python/Twisted/py3/twisted/internet/unix.py
@@ -294,6 +294,21 @@ class Server(_SendmsgMixin, tcp.Server):
def getPeer(self):
return address.UNIXAddress(self.hostname or None)
+ def getTcpNoDelay(self):
+ """
+ FIXME:https://github.com/twisted/twisted/issues/12369
+
+ L{twisted.internet.unix.Server} inherits from L{twisted.internet.tcp.Server} which has
+ this method implemented for TCP. For Unix socket, this is just a NOOP to avoid
+ errors for the code that calls TCP specicific methods thinking that the Unix transport
+ is a TCP transport.
+ """
+ return False
+
+ def setTcpNoDelay(self, enabled):
+ # This is not supported on UNIX sockets and therefore silently ignored.
+ pass
+
def _inFilesystemNamespace(path):
"""
@@ -467,6 +482,21 @@ class Client(_SendmsgMixin, tcp.BaseClient):
def getHost(self):
return address.UNIXAddress(None)
+ def getTcpNoDelay(self):
+ """
+ FIXME:https://github.com/twisted/twisted/issues/12369
+
+ L{twisted.internet.unix.Client} inherits from L{twisted.internet.tcp.Client} which has
+ this method implemented for TCP. For Unix socket, this is just a NOOP to avoid
+ errors for the code that calls TCP specicific methods thinking that the Unix transport
+ is a TCP transport.
+ """
+ return False
+
+ def setTcpNoDelay(self, enabled):
+ # This is not supported on UNIX sockets and therefore silently ignored.
+ pass
+
class Connector(base.BaseConnector):
def __init__(self, address, factory, timeout, reactor, checkPID):