aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/internet
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2025-06-22 18:50:56 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2025-06-22 19:04:42 +0300
commitc7cbc6d480c5488ff6e921c709680fd2c1340a10 (patch)
tree10843f44b67c0fb5717ad555556064095f701d8c /contrib/python/Twisted/py3/twisted/internet
parent26d391cdb94d2ce5efc8d0cc5cea7607dc363c0b (diff)
downloadydb-c7cbc6d480c5488ff6e921c709680fd2c1340a10.tar.gz
Intermediate changes
commit_hash:28750b74281710ec1ab5bdc2403c8ab24bdd164b
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/internet')
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/defer.py192
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/endpoints.py40
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/testing.py125
3 files changed, 159 insertions, 198 deletions
diff --git a/contrib/python/Twisted/py3/twisted/internet/defer.py b/contrib/python/Twisted/py3/twisted/internet/defer.py
index 951ca87d6b5..1892eb483ee 100644
--- a/contrib/python/Twisted/py3/twisted/internet/defer.py
+++ b/contrib/python/Twisted/py3/twisted/internet/defer.py
@@ -49,7 +49,7 @@ from twisted.internet.interfaces import IDelayedCall, IReactorTime
from twisted.logger import Logger
from twisted.python import lockfile
from twisted.python.compat import _PYPY, cmp, comparable
-from twisted.python.deprecate import deprecated, warnAboutFunction
+from twisted.python.deprecate import deprecated, deprecatedProperty, warnAboutFunction
from twisted.python.failure import Failure, _extraneous
log = Logger()
@@ -469,12 +469,16 @@ class Deferred(Awaitable[_SelfResultT]):
@type canceller: a 1-argument callable which takes a L{Deferred}. The
return result is ignored.
"""
- self.callbacks: List[_CallbackChain] = []
+ self._callbacks: List[_CallbackChain] = []
self._canceller = canceller
if self.debug:
self._debugInfo = DebugInfo()
self._debugInfo.creator = traceback.format_stack()[:-1]
+ @deprecatedProperty(Version("Twisted", 25, 5, 0))
+ def callbacks(self) -> List[_CallbackChain]:
+ return self._callbacks
+
def addCallbacks(
self,
callback: Union[
@@ -528,7 +532,7 @@ class Deferred(Awaitable[_SelfResultT]):
# Note that this logic is duplicated in addCallbac/addErrback/addBoth
# for performance reasons.
- self.callbacks.append(
+ self._callbacks.append(
(
(callback, callbackArgs, callbackKeywords),
(errback, errbackArgs, errbackKeywords),
@@ -622,7 +626,7 @@ class Deferred(Awaitable[_SelfResultT]):
"""
# This could be implemented as a call to addCallbacks, but doing it
# directly is faster.
- self.callbacks.append(((callback, args, kwargs), (_failthru, (), {})))
+ self._callbacks.append(((callback, args, kwargs), (_failthru, (), {})))
if self.called:
self._runCallbacks()
@@ -664,7 +668,7 @@ class Deferred(Awaitable[_SelfResultT]):
"""
# This could be implemented as a call to addCallbacks, but doing it
# directly is faster.
- self.callbacks.append(((passthru, (), {}), (errback, args, kwargs)))
+ self._callbacks.append(((passthru, (), {}), (errback, args, kwargs)))
if self.called:
self._runCallbacks()
@@ -754,7 +758,7 @@ class Deferred(Awaitable[_SelfResultT]):
# This could be implemented as a call to addCallbacks, but doing it
# directly is faster.
call = (callback, args, kwargs)
- self.callbacks.append((call, call))
+ self._callbacks.append((call, call))
if self.called:
self._runCallbacks()
@@ -1048,8 +1052,8 @@ class Deferred(Awaitable[_SelfResultT]):
finished = True
current._chainedTo = None
- while current.callbacks:
- item = current.callbacks.pop(0)
+ while current._callbacks:
+ item = current._callbacks.pop(0)
if not isinstance(current.result, Failure):
callback, args, kwargs = item[0]
else:
@@ -1123,7 +1127,7 @@ class Deferred(Awaitable[_SelfResultT]):
# running its callbacks right now. Therefore we can
# append to the callbacks list directly instead of
# using addCallbacks.
- currentResult.callbacks.append(current._continuation())
+ currentResult._callbacks.append(current._continuation())
break
else:
# Yep, it did. Steal it.
@@ -1729,170 +1733,6 @@ SUCCESS = True
FAILURE = False
-## deferredGenerator
-class waitForDeferred:
- """
- See L{deferredGenerator}.
- """
-
- result: Any = _NO_RESULT
-
- def __init__(self, d: Deferred[object]) -> None:
- warnings.warn(
- "twisted.internet.defer.waitForDeferred was deprecated in "
- "Twisted 15.0.0; please use twisted.internet.defer.inlineCallbacks "
- "instead",
- DeprecationWarning,
- stacklevel=2,
- )
-
- if not isinstance(d, Deferred):
- raise TypeError(
- f"You must give waitForDeferred a Deferred. You gave it {d!r}."
- )
- self.d = d
-
- def getResult(self) -> Any:
- if isinstance(self.result, Failure):
- self.result.raiseException()
- self.result is not _NO_RESULT
- return self.result
-
-
-_DeferableGenerator = Generator[object, None, None]
-
-
-def _deferGenerator(
- g: _DeferableGenerator, deferred: Deferred[object]
-) -> Deferred[Any]:
- """
- See L{deferredGenerator}.
- """
-
- result = None
-
- # This function is complicated by the need to prevent unbounded recursion
- # arising from repeatedly yielding immediately ready deferreds. This while
- # loop and the waiting variable solve that by manually unfolding the
- # recursion.
-
- # defgen is waiting for result? # result
- # type note: List[Any] because you can't annotate List items by index.
- # …better fix would be to create a class, but we need to jettison
- # deferredGenerator anyway.
- waiting: List[Any] = [True, None]
-
- while 1:
- try:
- result = next(g)
- except StopIteration:
- deferred.callback(result)
- return deferred
- except BaseException:
- deferred.errback()
- return deferred
-
- # Deferred.callback(Deferred) raises an error; we catch this case
- # early here and give a nicer error message to the user in case
- # they yield a Deferred.
- if isinstance(result, Deferred):
- return fail(TypeError("Yield waitForDeferred(d), not d!"))
-
- if isinstance(result, waitForDeferred):
- # a waitForDeferred was yielded, get the result.
- # Pass result in so it don't get changed going around the loop
- # This isn't a problem for waiting, as it's only reused if
- # gotResult has already been executed.
- def gotResult(
- r: object, result: waitForDeferred = cast(waitForDeferred, result)
- ) -> None:
- result.result = r
- if waiting[0]:
- waiting[0] = False
- waiting[1] = r
- else:
- _deferGenerator(g, deferred)
-
- result.d.addBoth(gotResult)
- if waiting[0]:
- # Haven't called back yet, set flag so that we get reinvoked
- # and return from the loop
- waiting[0] = False
- return deferred
- # Reset waiting to initial values for next loop
- waiting[0] = True
- waiting[1] = None
-
- result = None
-
-
-@deprecated(Version("Twisted", 15, 0, 0), "twisted.internet.defer.inlineCallbacks")
-def deferredGenerator(
- f: Callable[..., _DeferableGenerator]
-) -> Callable[..., Deferred[object]]:
- """
- L{deferredGenerator} and L{waitForDeferred} help you write
- L{Deferred}-using code that looks like a regular sequential function.
- Consider the use of L{inlineCallbacks} instead, which can accomplish
- the same thing in a more concise manner.
-
- There are two important functions involved: L{waitForDeferred}, and
- L{deferredGenerator}. They are used together, like this::
-
- @deferredGenerator
- def thingummy():
- thing = waitForDeferred(makeSomeRequestResultingInDeferred())
- yield thing
- thing = thing.getResult()
- print(thing) #the result! hoorj!
-
- L{waitForDeferred} returns something that you should immediately yield; when
- your generator is resumed, calling C{thing.getResult()} will either give you
- the result of the L{Deferred} if it was a success, or raise an exception if it
- was a failure. Calling C{getResult} is B{absolutely mandatory}. If you do
- not call it, I{your program will not work}.
-
- L{deferredGenerator} takes one of these waitForDeferred-using generator
- functions and converts it into a function that returns a L{Deferred}. The
- result of the L{Deferred} will be the last value that your generator yielded
- unless the last value is a L{waitForDeferred} instance, in which case the
- result will be L{None}. If the function raises an unhandled exception, the
- L{Deferred} will errback instead. Remember that C{return result} won't work;
- use C{yield result; return} in place of that.
-
- Note that not yielding anything from your generator will make the L{Deferred}
- result in L{None}. Yielding a L{Deferred} from your generator is also an error
- condition; always yield C{waitForDeferred(d)} instead.
-
- The L{Deferred} returned from your deferred generator may also errback if your
- generator raised an exception. For example::
-
- @deferredGenerator
- def thingummy():
- thing = waitForDeferred(makeSomeRequestResultingInDeferred())
- yield thing
- thing = thing.getResult()
- if thing == 'I love Twisted':
- # will become the result of the Deferred
- yield 'TWISTED IS GREAT!'
- return
- else:
- # will trigger an errback
- raise Exception('DESTROY ALL LIFE')
-
- Put succinctly, these functions connect deferred-using code with this 'fake
- blocking' style in both directions: L{waitForDeferred} converts from a
- L{Deferred} to the 'blocking' style, and L{deferredGenerator} converts from the
- 'blocking' style to a L{Deferred}.
- """
-
- @wraps(f)
- def unwindGenerator(*args: object, **kwargs: object) -> Deferred[object]:
- return _deferGenerator(f(*args, **kwargs), Deferred())
-
- return unwindGenerator
-
-
## inlineCallbacks
@@ -2154,9 +1994,9 @@ def _addCancelCallbackToDeferred(
@param it: The L{Deferred} to add the errback to.
@param status: a L{_CancellationStatus} tracking the current status of C{gen}
"""
- it.callbacks, tmp = [], it.callbacks
+ it._callbacks, tmp = [], it._callbacks
it = it.addErrback(_handleCancelInlineCallbacks, status)
- it.callbacks.extend(tmp)
+ it._callbacks.extend(tmp)
it.errback(_InternalInlineCallbacksCancelledError())
@@ -2724,8 +2564,6 @@ __all__ = [
"gatherResults",
"maybeDeferred",
"ensureDeferred",
- "waitForDeferred",
- "deferredGenerator",
"inlineCallbacks",
"returnValue",
"DeferredLock",
diff --git a/contrib/python/Twisted/py3/twisted/internet/endpoints.py b/contrib/python/Twisted/py3/twisted/internet/endpoints.py
index dfa0cc43ce8..b73b2acc100 100644
--- a/contrib/python/Twisted/py3/twisted/internet/endpoints.py
+++ b/contrib/python/Twisted/py3/twisted/internet/endpoints.py
@@ -1,4 +1,4 @@
-# -*- test-case-name: twisted.internet.test.test_endpoints -*-
+# -*- test-case-name: twisted.internet.test.test_endpoints.HostnameEndpointMemoryIPv4ReactorTests.test_errorsLogged -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
@@ -18,7 +18,7 @@ import os
import re
import socket
import warnings
-from typing import Any, Iterable, Optional, Sequence, Type
+from typing import Any, Callable, Iterable, List, Optional, Sequence, Tuple, Type, Union
from unicodedata import normalize
from zope.interface import directlyProvides, implementer
@@ -699,6 +699,23 @@ class TCP6ClientEndpoint:
return defer.fail()
+_gairesult = List[
+ Tuple[
+ socket.AddressFamily,
+ socket.SocketKind,
+ int,
+ str,
+ Union[
+ Tuple[str, int],
+ Tuple[str, int, int, int],
+ ],
+ ]
+]
+"""
+Alias for the result type of L{socket.getaddrinfo}C{()}
+"""
+
+
@implementer(IHostnameResolver)
class _SimpleHostnameResolver:
"""
@@ -714,7 +731,9 @@ class _SimpleHostnameResolver:
_log = Logger()
- def __init__(self, nameResolution):
+ def __init__(
+ self, nameResolution: Callable[[str, int], Deferred[_gairesult]]
+ ) -> None:
"""
Create a L{_SimpleHostnameResolver} instance.
"""
@@ -847,7 +866,17 @@ class HostnameEndpoint:
"""
self._reactor = reactor
- self._nameResolver = self._getNameResolverAndMaybeWarn(reactor)
+
+ # We retrieve the actual name resolver to use from the reactor at
+ # C{connect()} time, in case the reactor modifies its name-resolution
+ # configuration after this HostnameEndpoint has been constructed.
+ # However, in order to make any warnings a bit more legible in the much
+ # more common case that the reactor's name resolution is configured
+ # before any endpoints are constructed, this eagerly validates the name
+ # resolver's configuration during endpoint construction but discards
+ # the actual resolver retrieved.
+ self._getNameResolverAndMaybeWarn(reactor)
+
[self._badHostname, self._hostBytes, self._hostText] = self._hostAsBytesAndText(
host
)
@@ -990,7 +1019,8 @@ class HostnameEndpoint:
def resolutionComplete() -> None:
resolved.callback(addresses)
- self._nameResolver.resolveHostName(
+ nameResolver = self._getNameResolverAndMaybeWarn(self._reactor)
+ nameResolver.resolveHostName(
EndpointReceiver(), self._hostText, portNumber=self._port
)
diff --git a/contrib/python/Twisted/py3/twisted/internet/testing.py b/contrib/python/Twisted/py3/twisted/internet/testing.py
index 6563184edf9..a7e1de67286 100644
--- a/contrib/python/Twisted/py3/twisted/internet/testing.py
+++ b/contrib/python/Twisted/py3/twisted/internet/testing.py
@@ -7,6 +7,8 @@ Assorted functionality which is commonly useful when writing unit tests.
"""
from __future__ import annotations
+import typing
+from dataclasses import dataclass
from io import BytesIO
from socket import AF_INET, AF_INET6
from time import time
@@ -33,17 +35,22 @@ from twisted.internet.address import IPv4Address, IPv6Address, UNIXAddress
from twisted.internet.defer import Deferred, ensureDeferred, succeed
from twisted.internet.error import UnsupportedAddressFamily
from twisted.internet.interfaces import (
+ IAddress,
IConnector,
IConsumer,
+ IHostnameResolver,
+ IHostResolution,
IListeningPort,
IProtocol,
IPushProducer,
IReactorCore,
IReactorFDSet,
+ IReactorPluggableNameResolver,
IReactorSocket,
IReactorSSL,
IReactorTCP,
IReactorUNIX,
+ IResolutionReceiver,
ITransport,
)
from twisted.internet.task import Clock
@@ -72,6 +79,14 @@ __all__ = [
_P = ParamSpec("_P")
+class _ProtocolConnectionMadeHaver(typing.Protocol):
+ """
+ Explicit stipulation of the implicit requirement of L{AccumulatingProtocol}'s factory.
+ """
+
+ protocolConnectionMade: Deferred[AccumulatingProtocol] | None
+
+
class AccumulatingProtocol(protocol.Protocol):
"""
L{AccumulatingProtocol} is an L{IProtocol} implementation which collects
@@ -87,26 +102,29 @@ class AccumulatingProtocol(protocol.Protocol):
C{connectionLost} is called.
"""
+ made: int
+ closed: int
made = closed = 0
- closedReason = None
-
- closedDeferred = None
+ closedReason: failure.Failure | None = None
+ closedDeferred: Deferred[None] | None = None
+ data: bytes = b""
- data = b""
+ factory: protocol.Factory | None = None
- factory = None
-
- def connectionMade(self):
+ def connectionMade(self) -> None:
self.made = 1
- if self.factory is not None and self.factory.protocolConnectionMade is not None:
- d = self.factory.protocolConnectionMade
- self.factory.protocolConnectionMade = None
+ factory: _ProtocolConnectionMadeHaver | None = (
+ self.factory # type:ignore[assignment]
+ )
+ if factory is not None and factory.protocolConnectionMade is not None:
+ d = factory.protocolConnectionMade
+ factory.protocolConnectionMade = None
d.callback(self)
- def dataReceived(self, data):
+ def dataReceived(self, data: bytes) -> None:
self.data += data
- def connectionLost(self, reason):
+ def connectionLost(self, reason: failure.Failure | None = None) -> None:
self.closed = 1
self.closedReason = reason
if self.closedDeferred is not None:
@@ -414,8 +432,54 @@ class _FakeConnector:
return self._address
+@implementer(IHostResolution)
+@dataclass
+class _SynchronousResolution:
+ name: str
+
+ def cancel(self) -> None:
+ """
+ Provided just for interface compliance; it should be impossible to
+ reach here, since it's resolved synchronously.
+ """
+ raise Exception("already resolved") # pragma: no cover
+
+
+@implementer(IHostnameResolver)
+class SynchronousResolver:
+ """
+ A very simple L{IHostnameResolver} that immediately, synchronously resolves
+ all host names to a single static address (TCPv4, 127.0.0.1) while
+ preserving any requested port number.
+ """
+
+ def resolveHostName(
+ self,
+ resolutionReceiver: IResolutionReceiver,
+ hostName: str,
+ portNumber: int = 0,
+ addressTypes: Sequence[type[IAddress]] | None = None,
+ transportSemantics: str = "TCP",
+ ) -> IHostResolution:
+ """
+ Implement L{IHostnameResolver.resolveHostName} to synchronously resolve
+ the name and complete resolution before returning.
+ """
+ resolution = _SynchronousResolution(hostName)
+ resolutionReceiver.resolutionBegan(resolution)
+ resolutionReceiver.addressResolved(IPv4Address("TCP", "127.0.0.1", portNumber))
+ resolutionReceiver.resolutionComplete()
+ return resolution
+
+
@implementer(
- IReactorCore, IReactorTCP, IReactorSSL, IReactorUNIX, IReactorSocket, IReactorFDSet
+ IReactorCore,
+ IReactorTCP,
+ IReactorSSL,
+ IReactorUNIX,
+ IReactorSocket,
+ IReactorFDSet,
+ IReactorPluggableNameResolver,
)
class MemoryReactor:
"""
@@ -474,6 +538,8 @@ class MemoryReactor:
connections added using C{adoptStreamConnection}.
"""
+ nameResolver: IHostnameResolver
+
def __init__(self):
"""
Initialize the tracking lists.
@@ -500,6 +566,15 @@ class MemoryReactor:
self.readers = set()
self.writers = set()
+ self.nameResolver = SynchronousResolver()
+
+ def installNameResolver(self, resolver: IHostnameResolver) -> IHostnameResolver:
+ """
+ Implement L{IReactorPluggableNameResolver}.
+ """
+ oldResolver = self.nameResolver
+ self.nameResolver = resolver
+ return oldResolver
def install(self):
"""
@@ -761,7 +836,13 @@ class MemoryReactorClock(MemoryReactor, Clock):
Clock.__init__(self)
-@implementer(IReactorTCP, IReactorSSL, IReactorUNIX, IReactorSocket)
+@implementer(
+ IReactorTCP,
+ IReactorSSL,
+ IReactorUNIX,
+ IReactorSocket,
+ IReactorPluggableNameResolver,
+)
class RaisingMemoryReactor:
"""
A fake reactor to be used in tests. It accepts TCP connection setup
@@ -771,7 +852,11 @@ class RaisingMemoryReactor:
@ivar _connectException: An instance of an L{Exception}
"""
- def __init__(self, listenException=None, connectException=None):
+ def __init__(
+ self,
+ listenException: Exception | None = None,
+ connectException: Exception | None = None,
+ ) -> None:
"""
@param listenException: An instance of an L{Exception} to raise
when any C{listen} method is called.
@@ -781,6 +866,14 @@ class RaisingMemoryReactor:
"""
self._listenException = listenException
self._connectException = connectException
+ self.nameResolver: IHostnameResolver = SynchronousResolver()
+
+ def installNameResolver(self, nameResolver: IHostnameResolver) -> IHostnameResolver:
+ """
+ Implement L{IReactorPluggableNameResolver}.
+ """
+ previous, self.nameResolver = self.nameResolver, nameResolver
+ return previous
def adoptStreamPort(self, fileno, addressFamily, factory):
"""
@@ -991,7 +1084,7 @@ def _benchmarkWithReactor(
Generator[Deferred[Any], Any, _T],
Deferred[_T],
],
- ]
+ ],
) -> Callable[[Any], None]: # pragma: no cover
"""
Decorator for running a benchmark tests that loops the reactor.