aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/internet/testing.py
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-11-12 07:54:50 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-11-12 08:05:59 +0300
commit55cec9f6b0618fb3570fc8ef66aad151f4932591 (patch)
tree9198c2ca0b0305269062c3674ce79f19c4990e65 /contrib/python/Twisted/py3/twisted/internet/testing.py
parentb77b1fbf262ea4f40e33a60ce32c4db4e5e49015 (diff)
downloadydb-55cec9f6b0618fb3570fc8ef66aad151f4932591.tar.gz
Intermediate changes
commit_hash:c229701a8b4f4d9ee57ce1ed763099d862d53fa6
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/internet/testing.py')
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/testing.py107
1 files changed, 105 insertions, 2 deletions
diff --git a/contrib/python/Twisted/py3/twisted/internet/testing.py b/contrib/python/Twisted/py3/twisted/internet/testing.py
index 2c372495844..6563184edf9 100644
--- a/contrib/python/Twisted/py3/twisted/internet/testing.py
+++ b/contrib/python/Twisted/py3/twisted/internet/testing.py
@@ -9,7 +9,18 @@ from __future__ import annotations
from io import BytesIO
from socket import AF_INET, AF_INET6
-from typing import Callable, Iterator, Sequence, overload
+from time import time
+from typing import (
+ Any,
+ Callable,
+ Coroutine,
+ Generator,
+ Iterator,
+ Sequence,
+ TypeVar,
+ Union,
+ overload,
+)
from zope.interface import implementedBy, implementer
from zope.interface.verify import verifyClass
@@ -19,7 +30,7 @@ from typing_extensions import ParamSpec, Self
from twisted.internet import address, error, protocol, task
from twisted.internet.abstract import _dataMustBeBytes, isIPv6Address
from twisted.internet.address import IPv4Address, IPv6Address, UNIXAddress
-from twisted.internet.defer import Deferred
+from twisted.internet.defer import Deferred, ensureDeferred, succeed
from twisted.internet.error import UnsupportedAddressFamily
from twisted.internet.interfaces import (
IConnector,
@@ -967,3 +978,95 @@ class EventLoggingObserver(Sequence[LogEvent]):
publisher.addObserver(obs)
testInstance.addCleanup(lambda: publisher.removeObserver(obs))
return obs
+
+
+_T = TypeVar("_T")
+
+
+def _benchmarkWithReactor(
+ test_target: Callable[
+ [],
+ Union[
+ Coroutine[Deferred[Any], Any, _T],
+ Generator[Deferred[Any], Any, _T],
+ Deferred[_T],
+ ],
+ ]
+) -> Callable[[Any], None]: # pragma: no cover
+ """
+ Decorator for running a benchmark tests that loops the reactor.
+
+ This is designed to decorate test method executed using pytest and
+ pytest-benchmark.
+ """
+
+ def deferredWrapper():
+ return ensureDeferred(test_target())
+
+ def benchmark_test(benchmark: Any) -> None:
+ # Spinning up and spinning down the reactor adds quite a lot of
+ # overhead to the benchmarked function. So, make sure that the overhead
+ # isn't making the benchmark meaningless before we bother with any real
+ # benchmarking.
+ start = time()
+ _runReactor(lambda: succeed(None))
+ justReactorElapsed = time() - start
+
+ start = time()
+ _runReactor(deferredWrapper)
+ benchmarkElapsed = time() - start
+
+ if benchmarkElapsed / justReactorElapsed < 5:
+ raise RuntimeError( # pragma: no cover
+ "The function you are benchmarking is fast enough that its "
+ "run time is being swamped by the startup/shutdown of the "
+ "reactor. Consider adding a for loop to the benchmark "
+ "function so it does the work a number of times."
+ )
+
+ benchmark(_runReactor, deferredWrapper)
+
+ return benchmark_test
+
+
+def _runReactor(callback: Callable[[], Deferred[_T]]) -> None: # pragma: no cover
+ """
+ (re)Start a reactor that might have been previously started.
+ """
+ # Delay to import to prevent side-effect in normal tests that are
+ # expecting to import twisted.internet.testing while no reactor is
+ # installed.
+ from twisted.internet import reactor
+
+ errors: list[failure.Failure] = []
+
+ deferred = callback()
+ deferred.addErrback(errors.append)
+ deferred.addBoth(lambda _: reactor.callLater(0, _stopReactor, reactor)) # type: ignore[attr-defined]
+ reactor.run(installSignalHandlers=False) # type: ignore[attr-defined]
+
+ if errors: # pragma: no cover
+ # Make sure the test fails in a visible way:
+ errors[0].raiseException()
+
+
+def _stopReactor(reactor): # pragma: no cover
+ """
+ Stop the reactor and allow it to be re-started later.
+ """
+ reactor.stop()
+ # Allow for on shutdown hooks to execute.
+ reactor.iterate()
+ # Since we're going to be poking the reactor's guts, let's make sure what
+ # we're doing is vaguely reasonable:
+ assert hasattr(reactor, "_startedBefore")
+ assert hasattr(reactor, "_started")
+ assert hasattr(reactor, "_justStopped")
+ assert hasattr(reactor, "running")
+ reactor._startedBefore = False
+ reactor._started = False
+ reactor._justStopped = False
+ reactor.running = False
+ # Start running has consumed the startup events, so we need
+ # to restore them.
+ reactor.addSystemEventTrigger("during", "startup", reactor._reallyStartRunning)