diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-11-12 07:54:50 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-11-12 08:05:59 +0300 |
commit | 55cec9f6b0618fb3570fc8ef66aad151f4932591 (patch) | |
tree | 9198c2ca0b0305269062c3674ce79f19c4990e65 /contrib/python/Twisted/py3/twisted/internet/testing.py | |
parent | b77b1fbf262ea4f40e33a60ce32c4db4e5e49015 (diff) | |
download | ydb-55cec9f6b0618fb3570fc8ef66aad151f4932591.tar.gz |
Intermediate changes
commit_hash:c229701a8b4f4d9ee57ce1ed763099d862d53fa6
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/internet/testing.py')
-rw-r--r-- | contrib/python/Twisted/py3/twisted/internet/testing.py | 107 |
1 files changed, 105 insertions, 2 deletions
diff --git a/contrib/python/Twisted/py3/twisted/internet/testing.py b/contrib/python/Twisted/py3/twisted/internet/testing.py index 2c372495844..6563184edf9 100644 --- a/contrib/python/Twisted/py3/twisted/internet/testing.py +++ b/contrib/python/Twisted/py3/twisted/internet/testing.py @@ -9,7 +9,18 @@ from __future__ import annotations from io import BytesIO from socket import AF_INET, AF_INET6 -from typing import Callable, Iterator, Sequence, overload +from time import time +from typing import ( + Any, + Callable, + Coroutine, + Generator, + Iterator, + Sequence, + TypeVar, + Union, + overload, +) from zope.interface import implementedBy, implementer from zope.interface.verify import verifyClass @@ -19,7 +30,7 @@ from typing_extensions import ParamSpec, Self from twisted.internet import address, error, protocol, task from twisted.internet.abstract import _dataMustBeBytes, isIPv6Address from twisted.internet.address import IPv4Address, IPv6Address, UNIXAddress -from twisted.internet.defer import Deferred +from twisted.internet.defer import Deferred, ensureDeferred, succeed from twisted.internet.error import UnsupportedAddressFamily from twisted.internet.interfaces import ( IConnector, @@ -967,3 +978,95 @@ class EventLoggingObserver(Sequence[LogEvent]): publisher.addObserver(obs) testInstance.addCleanup(lambda: publisher.removeObserver(obs)) return obs + + +_T = TypeVar("_T") + + +def _benchmarkWithReactor( + test_target: Callable[ + [], + Union[ + Coroutine[Deferred[Any], Any, _T], + Generator[Deferred[Any], Any, _T], + Deferred[_T], + ], + ] +) -> Callable[[Any], None]: # pragma: no cover + """ + Decorator for running a benchmark tests that loops the reactor. + + This is designed to decorate test method executed using pytest and + pytest-benchmark. + """ + + def deferredWrapper(): + return ensureDeferred(test_target()) + + def benchmark_test(benchmark: Any) -> None: + # Spinning up and spinning down the reactor adds quite a lot of + # overhead to the benchmarked function. So, make sure that the overhead + # isn't making the benchmark meaningless before we bother with any real + # benchmarking. + start = time() + _runReactor(lambda: succeed(None)) + justReactorElapsed = time() - start + + start = time() + _runReactor(deferredWrapper) + benchmarkElapsed = time() - start + + if benchmarkElapsed / justReactorElapsed < 5: + raise RuntimeError( # pragma: no cover + "The function you are benchmarking is fast enough that its " + "run time is being swamped by the startup/shutdown of the " + "reactor. Consider adding a for loop to the benchmark " + "function so it does the work a number of times." + ) + + benchmark(_runReactor, deferredWrapper) + + return benchmark_test + + +def _runReactor(callback: Callable[[], Deferred[_T]]) -> None: # pragma: no cover + """ + (re)Start a reactor that might have been previously started. + """ + # Delay to import to prevent side-effect in normal tests that are + # expecting to import twisted.internet.testing while no reactor is + # installed. + from twisted.internet import reactor + + errors: list[failure.Failure] = [] + + deferred = callback() + deferred.addErrback(errors.append) + deferred.addBoth(lambda _: reactor.callLater(0, _stopReactor, reactor)) # type: ignore[attr-defined] + reactor.run(installSignalHandlers=False) # type: ignore[attr-defined] + + if errors: # pragma: no cover + # Make sure the test fails in a visible way: + errors[0].raiseException() + + +def _stopReactor(reactor): # pragma: no cover + """ + Stop the reactor and allow it to be re-started later. + """ + reactor.stop() + # Allow for on shutdown hooks to execute. + reactor.iterate() + # Since we're going to be poking the reactor's guts, let's make sure what + # we're doing is vaguely reasonable: + assert hasattr(reactor, "_startedBefore") + assert hasattr(reactor, "_started") + assert hasattr(reactor, "_justStopped") + assert hasattr(reactor, "running") + reactor._startedBefore = False + reactor._started = False + reactor._justStopped = False + reactor.running = False + # Start running has consumed the startup events, so we need + # to restore them. + reactor.addSystemEventTrigger("during", "startup", reactor._reallyStartRunning) |