diff options
author | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:16:14 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:43:30 +0300 |
commit | b8cf9e88f4c5c64d9406af533d8948deb050d695 (patch) | |
tree | 218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py3/twisted/trial/util.py | |
parent | 523f645a83a0ec97a0332dbc3863bb354c92a328 (diff) | |
download | ydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz |
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/trial/util.py')
-rw-r--r-- | contrib/python/Twisted/py3/twisted/trial/util.py | 407 |
1 files changed, 407 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py3/twisted/trial/util.py b/contrib/python/Twisted/py3/twisted/trial/util.py new file mode 100644 index 0000000000..a8345ccc6c --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/trial/util.py @@ -0,0 +1,407 @@ +# -*- test-case-name: twisted.trial.test.test_util -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. +# + +""" +A collection of utility functions and classes, used internally by Trial. + +This code is for Trial's internal use. Do NOT use this code if you are writing +tests. It is subject to change at the Trial maintainer's whim. There is +nothing here in this module for you to use unless you are maintaining Trial. + +Any non-Trial Twisted code that uses this module will be shot. + +Maintainer: Jonathan Lange + +@var DEFAULT_TIMEOUT_DURATION: The default timeout which will be applied to + asynchronous (ie, Deferred-returning) test methods, in seconds. +""" +from __future__ import annotations + +from random import randrange +from typing import Any, Callable, TextIO, TypeVar + +from typing_extensions import ParamSpec + +from twisted.internet import interfaces, utils +from twisted.python.failure import Failure +from twisted.python.filepath import FilePath +from twisted.python.lockfile import FilesystemLock + +__all__ = [ + "DEFAULT_TIMEOUT_DURATION", + "excInfoOrFailureToExcInfo", + "suppress", + "acquireAttribute", +] + +DEFAULT_TIMEOUT = object() +DEFAULT_TIMEOUT_DURATION = 120.0 + + +class DirtyReactorAggregateError(Exception): + """ + Passed to L{twisted.trial.itrial.IReporter.addError} when the reactor is + left in an unclean state after a test. + + @ivar delayedCalls: The L{DelayedCall<twisted.internet.base.DelayedCall>} + objects which weren't cleaned up. + @ivar selectables: The selectables which weren't cleaned up. + """ + + def __init__(self, delayedCalls, selectables=None): + self.delayedCalls = delayedCalls + self.selectables = selectables + + def __str__(self) -> str: + """ + Return a multi-line message describing all of the unclean state. + """ + msg = "Reactor was unclean." + if self.delayedCalls: + msg += ( + "\nDelayedCalls: (set " + "twisted.internet.base.DelayedCall.debug = True to " + "debug)\n" + ) + msg += "\n".join(map(str, self.delayedCalls)) + if self.selectables: + msg += "\nSelectables:\n" + msg += "\n".join(map(str, self.selectables)) + return msg + + +class _Janitor: + """ + The guy that cleans up after you. + + @ivar test: The L{TestCase} to report errors about. + @ivar result: The L{IReporter} to report errors to. + @ivar reactor: The reactor to use. If None, the global reactor + will be used. + """ + + def __init__(self, test, result, reactor=None): + """ + @param test: See L{_Janitor.test}. + @param result: See L{_Janitor.result}. + @param reactor: See L{_Janitor.reactor}. + """ + self.test = test + self.result = result + self.reactor = reactor + + def postCaseCleanup(self): + """ + Called by L{unittest.TestCase} after a test to catch any logged errors + or pending L{DelayedCall<twisted.internet.base.DelayedCall>}s. + """ + calls = self._cleanPending() + if calls: + aggregate = DirtyReactorAggregateError(calls) + self.result.addError(self.test, Failure(aggregate)) + return False + return True + + def postClassCleanup(self): + """ + Called by L{unittest.TestCase} after the last test in a C{TestCase} + subclass. Ensures the reactor is clean by murdering the threadpool, + catching any pending + L{DelayedCall<twisted.internet.base.DelayedCall>}s, open sockets etc. + """ + selectables = self._cleanReactor() + calls = self._cleanPending() + if selectables or calls: + aggregate = DirtyReactorAggregateError(calls, selectables) + self.result.addError(self.test, Failure(aggregate)) + self._cleanThreads() + + def _getReactor(self): + """ + Get either the passed-in reactor or the global reactor. + """ + if self.reactor is not None: + reactor = self.reactor + else: + from twisted.internet import reactor + return reactor + + def _cleanPending(self): + """ + Cancel all pending calls and return their string representations. + """ + reactor = self._getReactor() + + # flush short-range timers + reactor.iterate(0) + reactor.iterate(0) + + delayedCallStrings = [] + for p in reactor.getDelayedCalls(): + if p.active(): + delayedString = str(p) + p.cancel() + else: + print("WEIRDNESS! pending timed call not active!") + delayedCallStrings.append(delayedString) + return delayedCallStrings + + _cleanPending = utils.suppressWarnings( + _cleanPending, + ( + ("ignore",), + { + "category": DeprecationWarning, + "message": r"reactor\.iterate cannot be used.*", + }, + ), + ) + + def _cleanThreads(self): + reactor = self._getReactor() + if interfaces.IReactorThreads.providedBy(reactor): + if reactor.threadpool is not None: + # Stop the threadpool now so that a new one is created. + # This improves test isolation somewhat (although this is a + # post class cleanup hook, so it's only isolating classes + # from each other, not methods from each other). + reactor._stopThreadPool() + + def _cleanReactor(self): + """ + Remove all selectables from the reactor, kill any of them that were + processes, and return their string representation. + """ + reactor = self._getReactor() + selectableStrings = [] + for sel in reactor.removeAll(): + if interfaces.IProcessTransport.providedBy(sel): + sel.signalProcess("KILL") + selectableStrings.append(repr(sel)) + return selectableStrings + + +_DEFAULT = object() + + +def acquireAttribute(objects, attr, default=_DEFAULT): + """ + Go through the list 'objects' sequentially until we find one which has + attribute 'attr', then return the value of that attribute. If not found, + return 'default' if set, otherwise, raise AttributeError. + """ + for obj in objects: + if hasattr(obj, attr): + return getattr(obj, attr) + if default is not _DEFAULT: + return default + raise AttributeError(f"attribute {attr!r} not found in {objects!r}") + + +def excInfoOrFailureToExcInfo(err): + """ + Coerce a Failure to an _exc_info, if err is a Failure. + + @param err: Either a tuple such as returned by L{sys.exc_info} or a + L{Failure} object. + @return: A tuple like the one returned by L{sys.exc_info}. e.g. + C{exception_type, exception_object, traceback_object}. + """ + if isinstance(err, Failure): + # Unwrap the Failure into an exc_info tuple. + err = (err.type, err.value, err.getTracebackObject()) + return err + + +def suppress(action="ignore", **kwarg): + """ + Sets up the .suppress tuple properly, pass options to this method as you + would the stdlib warnings.filterwarnings() + + So, to use this with a .suppress magic attribute you would do the + following: + + >>> from twisted.trial import unittest, util + >>> import warnings + >>> + >>> class TestFoo(unittest.TestCase): + ... def testFooBar(self): + ... warnings.warn("i am deprecated", DeprecationWarning) + ... testFooBar.suppress = [util.suppress(message='i am deprecated')] + ... + >>> + + Note that as with the todo and timeout attributes: the module level + attribute acts as a default for the class attribute which acts as a default + for the method attribute. The suppress attribute can be overridden at any + level by specifying C{.suppress = []} + """ + return ((action,), kwarg) + + +# This should be deleted, and replaced with twisted.application's code; see +# https://github.com/twisted/twisted/issues/6016: +_P = ParamSpec("_P") +_T = TypeVar("_T") + + +def profiled(f: Callable[_P, _T], outputFile: str) -> Callable[_P, _T]: + def _(*args: _P.args, **kwargs: _P.kwargs) -> _T: + import profile + + prof = profile.Profile() + try: + result = prof.runcall(f, *args, **kwargs) + prof.dump_stats(outputFile) + except SystemExit: + pass + prof.print_stats() + return result + + return _ + + +class _NoTrialMarker(Exception): + """ + No trial marker file could be found. + + Raised when trial attempts to remove a trial temporary working directory + that does not contain a marker file. + """ + + +def _removeSafely(path): + """ + Safely remove a path, recursively. + + If C{path} does not contain a node named C{_trial_marker}, a + L{_NoTrialMarker} exception is raised and the path is not removed. + """ + if not path.child(b"_trial_marker").exists(): + raise _NoTrialMarker( + f"{path!r} is not a trial temporary path, refusing to remove it" + ) + try: + path.remove() + except OSError as e: + print( + "could not remove %r, caught OSError [Errno %s]: %s" + % (path, e.errno, e.strerror) + ) + try: + newPath = FilePath( + b"_trial_temp_old" + str(randrange(10000000)).encode("utf-8") + ) + path.moveTo(newPath) + except OSError as e: + print( + "could not rename path, caught OSError [Errno %s]: %s" + % (e.errno, e.strerror) + ) + raise + + +class _WorkingDirectoryBusy(Exception): + """ + A working directory was specified to the runner, but another test run is + currently using that directory. + """ + + +def _unusedTestDirectory(base): + """ + Find an unused directory named similarly to C{base}. + + Once a directory is found, it will be locked and a marker dropped into it + to identify it as a trial temporary directory. + + @param base: A template path for the discovery process. If this path + exactly cannot be used, a path which varies only in a suffix of the + basename will be used instead. + @type base: L{FilePath} + + @return: A two-tuple. The first element is a L{FilePath} representing the + directory which was found and created. The second element is a locked + L{FilesystemLock<twisted.python.lockfile.FilesystemLock>}. Another + call to C{_unusedTestDirectory} will not be able to reused the + same name until the lock is released, either explicitly or by this + process exiting. + """ + counter = 0 + while True: + if counter: + testdir = base.sibling("%s-%d" % (base.basename(), counter)) + else: + testdir = base + + testdir.parent().makedirs(ignoreExistingDirectory=True) + testDirLock = FilesystemLock(testdir.path + ".lock") + if testDirLock.lock(): + # It is not in use + if testdir.exists(): + # It exists though - delete it + _removeSafely(testdir) + + # Create it anew and mark it as ours so the next _removeSafely on + # it succeeds. + testdir.makedirs() + testdir.child(b"_trial_marker").setContent(b"") + return testdir, testDirLock + else: + # It is in use + if base.basename() == "_trial_temp": + counter += 1 + else: + raise _WorkingDirectoryBusy() + + +def _listToPhrase(things, finalDelimiter, delimiter=", "): + """ + Produce a string containing each thing in C{things}, + separated by a C{delimiter}, with the last couple being separated + by C{finalDelimiter} + + @param things: The elements of the resulting phrase + @type things: L{list} or L{tuple} + + @param finalDelimiter: What to put between the last two things + (typically 'and' or 'or') + @type finalDelimiter: L{str} + + @param delimiter: The separator to use between each thing, + not including the last two. Should typically include a trailing space. + @type delimiter: L{str} + + @return: The resulting phrase + @rtype: L{str} + """ + if not isinstance(things, (list, tuple)): + raise TypeError("Things must be a list or a tuple") + if not things: + return "" + if len(things) == 1: + return str(things[0]) + if len(things) == 2: + return f"{str(things[0])} {finalDelimiter} {str(things[1])}" + else: + strThings = [] + for thing in things: + strThings.append(str(thing)) + return "{}{}{} {}".format( + delimiter.join(strThings[:-1]), + delimiter, + finalDelimiter, + strThings[-1], + ) + + +def openTestLog(path: FilePath[Any]) -> TextIO: + """ + Open the given path such that test log messages can be written to it. + """ + path.parent().makedirs(ignoreExistingDirectory=True) + # Always use UTF-8 because, considering all platforms, the system default + # encoding can not reliably encode all code points. + return open(path.path, "a", encoding="utf-8", errors="strict") |