diff options
author | nkozlovskiy <nmk@ydb.tech> | 2023-09-29 12:24:06 +0300 |
---|---|---|
committer | nkozlovskiy <nmk@ydb.tech> | 2023-09-29 12:41:34 +0300 |
commit | e0e3e1717e3d33762ce61950504f9637a6e669ed (patch) | |
tree | bca3ff6939b10ed60c3d5c12439963a1146b9711 /contrib/tools/python/src/Lib/test | |
parent | 38f2c5852db84c7b4d83adfcb009eb61541d1ccd (diff) | |
download | ydb-e0e3e1717e3d33762ce61950504f9637a6e669ed.tar.gz |
add ydb deps
Diffstat (limited to 'contrib/tools/python/src/Lib/test')
-rw-r--r-- | contrib/tools/python/src/Lib/test/__init__.py | 1 | ||||
-rw-r--r-- | contrib/tools/python/src/Lib/test/support/__init__.py | 2173 | ||||
-rw-r--r-- | contrib/tools/python/src/Lib/test/support/script_helper.py | 170 | ||||
-rw-r--r-- | contrib/tools/python/src/Lib/test/test_support.py | 3 |
4 files changed, 2347 insertions, 0 deletions
diff --git a/contrib/tools/python/src/Lib/test/__init__.py b/contrib/tools/python/src/Lib/test/__init__.py new file mode 100644 index 0000000000..b93054b3ec --- /dev/null +++ b/contrib/tools/python/src/Lib/test/__init__.py @@ -0,0 +1 @@ +# Dummy file to make this directory a package. diff --git a/contrib/tools/python/src/Lib/test/support/__init__.py b/contrib/tools/python/src/Lib/test/support/__init__.py new file mode 100644 index 0000000000..9effdddd27 --- /dev/null +++ b/contrib/tools/python/src/Lib/test/support/__init__.py @@ -0,0 +1,2173 @@ +"""Supporting definitions for the Python regression tests.""" + +if __name__ != 'test.support': + raise ImportError('test.support must be imported from the test package') + +import contextlib +import errno +import fnmatch +import functools +import gc +import socket +import stat +import sys +import os +import platform +import shutil +import warnings +import unittest +import importlib +import UserDict +import re +import time +import struct +import sysconfig +import types + +try: + import thread +except ImportError: + thread = None + +__all__ = ["Error", "TestFailed", "TestDidNotRun", "ResourceDenied", "import_module", + "verbose", "use_resources", "max_memuse", "record_original_stdout", + "get_original_stdout", "unload", "unlink", "rmtree", "forget", + "is_resource_enabled", "requires", "requires_mac_ver", + "find_unused_port", "bind_port", + "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ", + "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error", + "open_urlresource", "check_warnings", "check_py3k_warnings", + "CleanImport", "EnvironmentVarGuard", "captured_output", + "captured_stdout", "TransientResource", "transient_internet", + "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest", + "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup", + "threading_cleanup", "reap_threads", "start_threads", "cpython_only", + "check_impl_detail", "get_attribute", "py3k_bytes", + "import_fresh_module", "threading_cleanup", "reap_children", + "strip_python_stderr", "IPV6_ENABLED", "run_with_tz", + "SuppressCrashReport"] + +class Error(Exception): + """Base class for regression test exceptions.""" + +class TestFailed(Error): + """Test failed.""" + +class TestDidNotRun(Error): + """Test did not run any subtests.""" + +class ResourceDenied(unittest.SkipTest): + """Test skipped because it requested a disallowed resource. + + This is raised when a test calls requires() for a resource that + has not been enabled. It is used to distinguish between expected + and unexpected skips. + """ + +@contextlib.contextmanager +def _ignore_deprecated_imports(ignore=True): + """Context manager to suppress package and module deprecation + warnings when importing them. + + If ignore is False, this context manager has no effect.""" + if ignore: + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", ".+ (module|package)", + DeprecationWarning) + yield + else: + yield + + +def import_module(name, deprecated=False): + """Import and return the module to be tested, raising SkipTest if + it is not available. + + If deprecated is True, any module or package deprecation messages + will be suppressed.""" + with _ignore_deprecated_imports(deprecated): + try: + return importlib.import_module(name) + except ImportError, msg: + raise unittest.SkipTest(str(msg)) + + +def _save_and_remove_module(name, orig_modules): + """Helper function to save and remove a module from sys.modules + + Raise ImportError if the module can't be imported.""" + # try to import the module and raise an error if it can't be imported + if name not in sys.modules: + __import__(name) + del sys.modules[name] + for modname in list(sys.modules): + if modname == name or modname.startswith(name + '.'): + orig_modules[modname] = sys.modules[modname] + del sys.modules[modname] + +def _save_and_block_module(name, orig_modules): + """Helper function to save and block a module in sys.modules + + Return True if the module was in sys.modules, False otherwise.""" + saved = True + try: + orig_modules[name] = sys.modules[name] + except KeyError: + saved = False + sys.modules[name] = None + return saved + + +def import_fresh_module(name, fresh=(), blocked=(), deprecated=False): + """Imports and returns a module, deliberately bypassing the sys.modules cache + and importing a fresh copy of the module. Once the import is complete, + the sys.modules cache is restored to its original state. + + Modules named in fresh are also imported anew if needed by the import. + If one of these modules can't be imported, None is returned. + + Importing of modules named in blocked is prevented while the fresh import + takes place. + + If deprecated is True, any module or package deprecation messages + will be suppressed.""" + # NOTE: test_heapq, test_json, and test_warnings include extra sanity + # checks to make sure that this utility function is working as expected + with _ignore_deprecated_imports(deprecated): + # Keep track of modules saved for later restoration as well + # as those which just need a blocking entry removed + orig_modules = {} + names_to_remove = [] + _save_and_remove_module(name, orig_modules) + try: + for fresh_name in fresh: + _save_and_remove_module(fresh_name, orig_modules) + for blocked_name in blocked: + if not _save_and_block_module(blocked_name, orig_modules): + names_to_remove.append(blocked_name) + fresh_module = importlib.import_module(name) + except ImportError: + fresh_module = None + finally: + for orig_name, module in orig_modules.items(): + sys.modules[orig_name] = module + for name_to_remove in names_to_remove: + del sys.modules[name_to_remove] + return fresh_module + + +def get_attribute(obj, name): + """Get an attribute, raising SkipTest if AttributeError is raised.""" + try: + attribute = getattr(obj, name) + except AttributeError: + if isinstance(obj, types.ModuleType): + msg = "module %r has no attribute %r" % (obj.__name__, name) + elif isinstance(obj, types.ClassType): + msg = "class %s has no attribute %r" % (obj.__name__, name) + elif isinstance(obj, types.InstanceType): + msg = "%s instance has no attribute %r" % (obj.__class__.__name__, name) + elif isinstance(obj, type): + msg = "type object %r has no attribute %r" % (obj.__name__, name) + else: + msg = "%r object has no attribute %r" % (type(obj).__name__, name) + raise unittest.SkipTest(msg) + else: + return attribute + + +verbose = 1 # Flag set to 0 by regrtest.py +use_resources = None # Flag set to [] by regrtest.py +max_memuse = 0 # Disable bigmem tests (they will still be run with + # small sizes, to make sure they work.) +real_max_memuse = 0 +failfast = False + +# _original_stdout is meant to hold stdout at the time regrtest began. +# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. +# The point is to have some flavor of stdout the user can actually see. +_original_stdout = None +def record_original_stdout(stdout): + global _original_stdout + _original_stdout = stdout + +def get_original_stdout(): + return _original_stdout or sys.stdout + +def unload(name): + try: + del sys.modules[name] + except KeyError: + pass + +def _force_run(path, func, *args): + try: + return func(*args) + except EnvironmentError as err: + if verbose >= 2: + print('%s: %s' % (err.__class__.__name__, err)) + print('re-run %s%r' % (func.__name__, args)) + os.chmod(path, stat.S_IRWXU) + return func(*args) + +if sys.platform.startswith("win"): + def _waitfor(func, pathname, waitall=False): + # Perform the operation + func(pathname) + # Now setup the wait loop + if waitall: + dirname = pathname + else: + dirname, name = os.path.split(pathname) + dirname = dirname or '.' + # Check for `pathname` to be removed from the filesystem. + # The exponential backoff of the timeout amounts to a total + # of ~1 second after which the deletion is probably an error + # anyway. + # Testing on an i7@4.3GHz shows that usually only 1 iteration is + # required when contention occurs. + timeout = 0.001 + while timeout < 1.0: + # Note we are only testing for the existence of the file(s) in + # the contents of the directory regardless of any security or + # access rights. If we have made it this far, we have sufficient + # permissions to do that much using Python's equivalent of the + # Windows API FindFirstFile. + # Other Windows APIs can fail or give incorrect results when + # dealing with files that are pending deletion. + L = os.listdir(dirname) + if not (L if waitall else name in L): + return + # Increase the timeout and try again + time.sleep(timeout) + timeout *= 2 + warnings.warn('tests may fail, delete still pending for ' + pathname, + RuntimeWarning, stacklevel=4) + + def _unlink(filename): + _waitfor(os.unlink, filename) + + def _rmdir(dirname): + _waitfor(os.rmdir, dirname) + + def _rmtree(path): + def _rmtree_inner(path): + for name in _force_run(path, os.listdir, path): + fullname = os.path.join(path, name) + if os.path.isdir(fullname): + _waitfor(_rmtree_inner, fullname, waitall=True) + _force_run(fullname, os.rmdir, fullname) + else: + _force_run(fullname, os.unlink, fullname) + _waitfor(_rmtree_inner, path, waitall=True) + _waitfor(lambda p: _force_run(p, os.rmdir, p), path) +else: + _unlink = os.unlink + _rmdir = os.rmdir + + def _rmtree(path): + try: + shutil.rmtree(path) + return + except EnvironmentError: + pass + + def _rmtree_inner(path): + for name in _force_run(path, os.listdir, path): + fullname = os.path.join(path, name) + try: + mode = os.lstat(fullname).st_mode + except EnvironmentError: + mode = 0 + if stat.S_ISDIR(mode): + _rmtree_inner(fullname) + _force_run(path, os.rmdir, fullname) + else: + _force_run(path, os.unlink, fullname) + _rmtree_inner(path) + os.rmdir(path) + +def unlink(filename): + try: + _unlink(filename) + except OSError as exc: + if exc.errno not in (errno.ENOENT, errno.ENOTDIR): + raise + +def rmdir(dirname): + try: + _rmdir(dirname) + except OSError as error: + # The directory need not exist. + if error.errno != errno.ENOENT: + raise + +def rmtree(path): + try: + _rmtree(path) + except OSError, e: + # Unix returns ENOENT, Windows returns ESRCH. + if e.errno not in (errno.ENOENT, errno.ESRCH): + raise + +def forget(modname): + '''"Forget" a module was ever imported by removing it from sys.modules and + deleting any .pyc and .pyo files.''' + unload(modname) + for dirname in sys.path: + unlink(os.path.join(dirname, modname + os.extsep + 'pyc')) + # Deleting the .pyo file cannot be within the 'try' for the .pyc since + # the chance exists that there is no .pyc (and thus the 'try' statement + # is exited) but there is a .pyo file. + unlink(os.path.join(dirname, modname + os.extsep + 'pyo')) + +# Check whether a gui is actually available +def _is_gui_available(): + if hasattr(_is_gui_available, 'result'): + return _is_gui_available.result + reason = None + if sys.platform.startswith('win'): + # if Python is running as a service (such as the buildbot service), + # gui interaction may be disallowed + import ctypes + import ctypes.wintypes + UOI_FLAGS = 1 + WSF_VISIBLE = 0x0001 + class USEROBJECTFLAGS(ctypes.Structure): + _fields_ = [("fInherit", ctypes.wintypes.BOOL), + ("fReserved", ctypes.wintypes.BOOL), + ("dwFlags", ctypes.wintypes.DWORD)] + dll = ctypes.windll.user32 + h = dll.GetProcessWindowStation() + if not h: + raise ctypes.WinError() + uof = USEROBJECTFLAGS() + needed = ctypes.wintypes.DWORD() + res = dll.GetUserObjectInformationW(h, + UOI_FLAGS, + ctypes.byref(uof), + ctypes.sizeof(uof), + ctypes.byref(needed)) + if not res: + raise ctypes.WinError() + if not bool(uof.dwFlags & WSF_VISIBLE): + reason = "gui not available (WSF_VISIBLE flag not set)" + elif sys.platform == 'darwin': + # The Aqua Tk implementations on OS X can abort the process if + # being called in an environment where a window server connection + # cannot be made, for instance when invoked by a buildbot or ssh + # process not running under the same user id as the current console + # user. To avoid that, raise an exception if the window manager + # connection is not available. + from ctypes import cdll, c_int, pointer, Structure + from ctypes.util import find_library + + app_services = cdll.LoadLibrary(find_library("ApplicationServices")) + + if app_services.CGMainDisplayID() == 0: + reason = "gui tests cannot run without OS X window manager" + else: + class ProcessSerialNumber(Structure): + _fields_ = [("highLongOfPSN", c_int), + ("lowLongOfPSN", c_int)] + psn = ProcessSerialNumber() + psn_p = pointer(psn) + if ( (app_services.GetCurrentProcess(psn_p) < 0) or + (app_services.SetFrontProcess(psn_p) < 0) ): + reason = "cannot run without OS X gui process" + + # check on every platform whether tkinter can actually do anything + if not reason: + try: + from Tkinter import Tk + root = Tk() + root.withdraw() + root.update() + root.destroy() + except Exception as e: + err_string = str(e) + if len(err_string) > 50: + err_string = err_string[:50] + ' [...]' + reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__, + err_string) + + _is_gui_available.reason = reason + _is_gui_available.result = not reason + + return _is_gui_available.result + +def is_resource_enabled(resource): + """Test whether a resource is enabled. + + Known resources are set by regrtest.py. If not running under regrtest.py, + all resources are assumed enabled unless use_resources has been set. + """ + return use_resources is None or resource in use_resources + +def requires(resource, msg=None): + """Raise ResourceDenied if the specified resource is not available.""" + if not is_resource_enabled(resource): + if msg is None: + msg = "Use of the `%s' resource not enabled" % resource + raise ResourceDenied(msg) + if resource == 'gui' and not _is_gui_available(): + raise ResourceDenied(_is_gui_available.reason) + +def requires_mac_ver(*min_version): + """Decorator raising SkipTest if the OS is Mac OS X and the OS X + version if less than min_version. + + For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version + is lesser than 10.5. + """ + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kw): + if sys.platform == 'darwin': + version_txt = platform.mac_ver()[0] + try: + version = tuple(map(int, version_txt.split('.'))) + except ValueError: + pass + else: + if version < min_version: + min_version_txt = '.'.join(map(str, min_version)) + raise unittest.SkipTest( + "Mac OS X %s or higher required, not %s" + % (min_version_txt, version_txt)) + return func(*args, **kw) + wrapper.min_version = min_version + return wrapper + return decorator + + +# Don't use "localhost", since resolving it uses the DNS under recent +# Windows versions (see issue #18792). +HOST = "127.0.0.1" +HOSTv6 = "::1" + + +def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): + """Returns an unused port that should be suitable for binding. This is + achieved by creating a temporary socket with the same family and type as + the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to + the specified host address (defaults to 0.0.0.0) with the port set to 0, + eliciting an unused ephemeral port from the OS. The temporary socket is + then closed and deleted, and the ephemeral port is returned. + + Either this method or bind_port() should be used for any tests where a + server socket needs to be bound to a particular port for the duration of + the test. Which one to use depends on whether the calling code is creating + a python socket, or if an unused port needs to be provided in a constructor + or passed to an external program (i.e. the -accept argument to openssl's + s_server mode). Always prefer bind_port() over find_unused_port() where + possible. Hard coded ports should *NEVER* be used. As soon as a server + socket is bound to a hard coded port, the ability to run multiple instances + of the test simultaneously on the same host is compromised, which makes the + test a ticking time bomb in a buildbot environment. On Unix buildbots, this + may simply manifest as a failed test, which can be recovered from without + intervention in most cases, but on Windows, the entire python process can + completely and utterly wedge, requiring someone to log in to the buildbot + and manually kill the affected process. + + (This is easy to reproduce on Windows, unfortunately, and can be traced to + the SO_REUSEADDR socket option having different semantics on Windows versus + Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind, + listen and then accept connections on identical host/ports. An EADDRINUSE + socket.error will be raised at some point (depending on the platform and + the order bind and listen were called on each socket). + + However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE + will ever be raised when attempting to bind two identical host/ports. When + accept() is called on each socket, the second caller's process will steal + the port from the first caller, leaving them both in an awkwardly wedged + state where they'll no longer respond to any signals or graceful kills, and + must be forcibly killed via OpenProcess()/TerminateProcess(). + + The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option + instead of SO_REUSEADDR, which effectively affords the same semantics as + SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open + Source world compared to Windows ones, this is a common mistake. A quick + look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when + openssl.exe is called with the 's_server' option, for example. See + http://bugs.python.org/issue2550 for more info. The following site also + has a very thorough description about the implications of both REUSEADDR + and EXCLUSIVEADDRUSE on Windows: + http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx) + + XXX: although this approach is a vast improvement on previous attempts to + elicit unused ports, it rests heavily on the assumption that the ephemeral + port returned to us by the OS won't immediately be dished back out to some + other process when we close and delete our temporary socket but before our + calling code has a chance to bind the returned port. We can deal with this + issue if/when we come across it.""" + tempsock = socket.socket(family, socktype) + port = bind_port(tempsock) + tempsock.close() + del tempsock + return port + +def bind_port(sock, host=HOST): + """Bind the socket to a free port and return the port number. Relies on + ephemeral ports in order to ensure we are using an unbound port. This is + important as many tests may be running simultaneously, especially in a + buildbot environment. This method raises an exception if the sock.family + is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR + or SO_REUSEPORT set on it. Tests should *never* set these socket options + for TCP/IP sockets. The only case for setting these options is testing + multicasting via multiple UDP sockets. + + Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e. + on Windows), it will be set on the socket. This will prevent anyone else + from bind()'ing to our host/port for the duration of the test. + """ + if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM: + if hasattr(socket, 'SO_REUSEADDR'): + if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1: + raise TestFailed("tests should never set the SO_REUSEADDR " \ + "socket option on TCP/IP sockets!") + if hasattr(socket, 'SO_REUSEPORT'): + try: + if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1: + raise TestFailed("tests should never set the SO_REUSEPORT " \ + "socket option on TCP/IP sockets!") + except EnvironmentError: + # Python's socket module was compiled using modern headers + # thus defining SO_REUSEPORT but this process is running + # under an older kernel that does not support SO_REUSEPORT. + pass + if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'): + sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) + + sock.bind((host, 0)) + port = sock.getsockname()[1] + return port + +def _is_ipv6_enabled(): + """Check whether IPv6 is enabled on this host.""" + if socket.has_ipv6: + sock = None + try: + sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + sock.bind((HOSTv6, 0)) + return True + except socket.error: + pass + finally: + if sock: + sock.close() + return False + +IPV6_ENABLED = _is_ipv6_enabled() + +def system_must_validate_cert(f): + """Skip the test on TLS certificate validation failures.""" + @functools.wraps(f) + def dec(*args, **kwargs): + try: + f(*args, **kwargs) + except IOError as e: + if "CERTIFICATE_VERIFY_FAILED" in str(e): + raise unittest.SkipTest("system does not contain " + "necessary certificates") + raise + return dec + +FUZZ = 1e-6 + +def fcmp(x, y): # fuzzy comparison function + if isinstance(x, float) or isinstance(y, float): + try: + fuzz = (abs(x) + abs(y)) * FUZZ + if abs(x-y) <= fuzz: + return 0 + except: + pass + elif type(x) == type(y) and isinstance(x, (tuple, list)): + for i in range(min(len(x), len(y))): + outcome = fcmp(x[i], y[i]) + if outcome != 0: + return outcome + return (len(x) > len(y)) - (len(x) < len(y)) + return (x > y) - (x < y) + + +# A constant likely larger than the underlying OS pipe buffer size, to +# make writes blocking. +# Windows limit seems to be around 512 B, and many Unix kernels have a +# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure. +# (see issue #17835 for a discussion of this number). +PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1 + +# A constant likely larger than the underlying OS socket buffer size, to make +# writes blocking. +# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl +# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643 +# for a discussion of this number). +SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1 + +is_jython = sys.platform.startswith('java') + +try: + unicode + have_unicode = True +except NameError: + have_unicode = False + +requires_unicode = unittest.skipUnless(have_unicode, 'no unicode support') + +def u(s): + return unicode(s, 'unicode-escape') + +# FS_NONASCII: non-ASCII Unicode character encodable by +# sys.getfilesystemencoding(), or None if there is no such character. +FS_NONASCII = None +if have_unicode: + for character in ( + # First try printable and common characters to have a readable filename. + # For each character, the encoding list are just example of encodings able + # to encode the character (the list is not exhaustive). + + # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1 + unichr(0x00E6), + # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3 + unichr(0x0130), + # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257 + unichr(0x0141), + # U+03C6 (Greek Small Letter Phi): cp1253 + unichr(0x03C6), + # U+041A (Cyrillic Capital Letter Ka): cp1251 + unichr(0x041A), + # U+05D0 (Hebrew Letter Alef): Encodable to cp424 + unichr(0x05D0), + # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic + unichr(0x060C), + # U+062A (Arabic Letter Teh): cp720 + unichr(0x062A), + # U+0E01 (Thai Character Ko Kai): cp874 + unichr(0x0E01), + + # Then try more "special" characters. "special" because they may be + # interpreted or displayed differently depending on the exact locale + # encoding and the font. + + # U+00A0 (No-Break Space) + unichr(0x00A0), + # U+20AC (Euro Sign) + unichr(0x20AC), + ): + try: + # In Windows, 'mbcs' is used, and encode() returns '?' + # for characters missing in the ANSI codepage + if character.encode(sys.getfilesystemencoding())\ + .decode(sys.getfilesystemencoding())\ + != character: + raise UnicodeError + except UnicodeError: + pass + else: + FS_NONASCII = character + break + +# Filename used for testing +if os.name == 'java': + # Jython disallows @ in module names + TESTFN = '$test' +elif os.name == 'riscos': + TESTFN = 'testfile' +else: + TESTFN = '@test' + # Unicode name only used if TEST_FN_ENCODING exists for the platform. + if have_unicode: + # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding() + # TESTFN_UNICODE is a filename that can be encoded using the + # file system encoding, but *not* with the default (ascii) encoding + if isinstance('', unicode): + # python -U + # XXX perhaps unicode() should accept Unicode strings? + TESTFN_UNICODE = "@test-\xe0\xf2" + else: + # 2 latin characters. + TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1") + TESTFN_ENCODING = sys.getfilesystemencoding() + # TESTFN_UNENCODABLE is a filename that should *not* be + # able to be encoded by *either* the default or filesystem encoding. + # This test really only makes sense on Windows NT platforms + # which have special Unicode support in posixmodule. + if (not hasattr(sys, "getwindowsversion") or + sys.getwindowsversion()[3] < 2): # 0=win32s or 1=9x/ME + TESTFN_UNENCODABLE = None + else: + # Japanese characters (I think - from bug 846133) + TESTFN_UNENCODABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"') + try: + # XXX - Note - should be using TESTFN_ENCODING here - but for + # Windows, "mbcs" currently always operates as if in + # errors=ignore' mode - hence we get '?' characters rather than + # the exception. 'Latin1' operates as we expect - ie, fails. + # See [ 850997 ] mbcs encoding ignores errors + TESTFN_UNENCODABLE.encode("Latin1") + except UnicodeEncodeError: + pass + else: + print \ + 'WARNING: The filename %r CAN be encoded by the filesystem. ' \ + 'Unicode filename tests may not be effective' \ + % TESTFN_UNENCODABLE + + +# Disambiguate TESTFN for parallel testing, while letting it remain a valid +# module name. +TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid()) + +# Save the initial cwd +SAVEDCWD = os.getcwd() + +@contextlib.contextmanager +def temp_dir(path=None, quiet=False): + """Return a context manager that creates a temporary directory. + + Arguments: + + path: the directory to create temporarily. If omitted or None, + defaults to creating a temporary directory using tempfile.mkdtemp. + + quiet: if False (the default), the context manager raises an exception + on error. Otherwise, if the path is specified and cannot be + created, only a warning is issued. + + """ + dir_created = False + if path is None: + import tempfile + path = tempfile.mkdtemp() + dir_created = True + path = os.path.realpath(path) + else: + if (have_unicode and isinstance(path, unicode) and + not os.path.supports_unicode_filenames): + try: + path = path.encode(sys.getfilesystemencoding() or 'ascii') + except UnicodeEncodeError: + if not quiet: + raise unittest.SkipTest('unable to encode the cwd name with ' + 'the filesystem encoding.') + try: + os.mkdir(path) + dir_created = True + except OSError: + if not quiet: + raise + warnings.warn('tests may fail, unable to create temp dir: ' + path, + RuntimeWarning, stacklevel=3) + if dir_created: + pid = os.getpid() + try: + yield path + finally: + # In case the process forks, let only the parent remove the + # directory. The child has a diffent process id. (bpo-30028) + if dir_created and pid == os.getpid(): + rmtree(path) + +@contextlib.contextmanager +def change_cwd(path, quiet=False): + """Return a context manager that changes the current working directory. + + Arguments: + + path: the directory to use as the temporary current working directory. + + quiet: if False (the default), the context manager raises an exception + on error. Otherwise, it issues only a warning and keeps the current + working directory the same. + + """ + saved_dir = os.getcwd() + try: + os.chdir(path) + except OSError: + if not quiet: + raise + warnings.warn('tests may fail, unable to change CWD to: ' + path, + RuntimeWarning, stacklevel=3) + try: + yield os.getcwd() + finally: + os.chdir(saved_dir) + + +@contextlib.contextmanager +def temp_cwd(name='tempcwd', quiet=False): + """ + Context manager that temporarily creates and changes the CWD. + + The function temporarily changes the current working directory + after creating a temporary directory in the current directory with + name *name*. If *name* is None, the temporary directory is + created using tempfile.mkdtemp. + + If *quiet* is False (default) and it is not possible to + create or change the CWD, an error is raised. If *quiet* is True, + only a warning is raised and the original CWD is used. + + """ + with temp_dir(path=name, quiet=quiet) as temp_path: + with change_cwd(temp_path, quiet=quiet) as cwd_dir: + yield cwd_dir + +# TEST_HOME_DIR refers to the top level directory of the "test" package +# that contains Python's regression test suite +TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__)) +TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR) + +# TEST_DATA_DIR is used as a target download location for remote resources +TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data") + +def findfile(file, subdir=None): + """Try to find a file on sys.path and the working directory. If it is not + found the argument passed to the function is returned (this does not + necessarily signal failure; could still be the legitimate path).""" + if os.path.isabs(file): + return file + if subdir is not None: + file = os.path.join(subdir, file) + path = [TEST_HOME_DIR] + sys.path + for dn in path: + fn = os.path.join(dn, file) + if os.path.exists(fn): return fn + return file + +def sortdict(dict): + "Like repr(dict), but in sorted order." + items = dict.items() + items.sort() + reprpairs = ["%r: %r" % pair for pair in items] + withcommas = ", ".join(reprpairs) + return "{%s}" % withcommas + +def make_bad_fd(): + """ + Create an invalid file descriptor by opening and closing a file and return + its fd. + """ + file = open(TESTFN, "wb") + try: + return file.fileno() + finally: + file.close() + unlink(TESTFN) + +def check_syntax_error(testcase, statement, errtext='', lineno=None, offset=None): + with testcase.assertRaisesRegexp(SyntaxError, errtext) as cm: + compile(statement, '<test string>', 'exec') + err = cm.exception + if lineno is not None: + testcase.assertEqual(err.lineno, lineno) + if offset is not None: + testcase.assertEqual(err.offset, offset) + +def open_urlresource(url, check=None): + import urlparse, urllib2 + + filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL! + + fn = os.path.join(TEST_DATA_DIR, filename) + + def check_valid_file(fn): + f = open(fn) + if check is None: + return f + elif check(f): + f.seek(0) + return f + f.close() + + if os.path.exists(fn): + f = check_valid_file(fn) + if f is not None: + return f + unlink(fn) + + # Verify the requirement before downloading the file + requires('urlfetch') + + print >> get_original_stdout(), '\tfetching %s ...' % url + f = urllib2.urlopen(url, timeout=15) + try: + with open(fn, "wb") as out: + s = f.read() + while s: + out.write(s) + s = f.read() + finally: + f.close() + + f = check_valid_file(fn) + if f is not None: + return f + raise TestFailed('invalid resource "%s"' % fn) + + +class WarningsRecorder(object): + """Convenience wrapper for the warnings list returned on + entry to the warnings.catch_warnings() context manager. + """ + def __init__(self, warnings_list): + self._warnings = warnings_list + self._last = 0 + + def __getattr__(self, attr): + if len(self._warnings) > self._last: + return getattr(self._warnings[-1], attr) + elif attr in warnings.WarningMessage._WARNING_DETAILS: + return None + raise AttributeError("%r has no attribute %r" % (self, attr)) + + @property + def warnings(self): + return self._warnings[self._last:] + + def reset(self): + self._last = len(self._warnings) + + +def _filterwarnings(filters, quiet=False): + """Catch the warnings, then check if all the expected + warnings have been raised and re-raise unexpected warnings. + If 'quiet' is True, only re-raise the unexpected warnings. + """ + # Clear the warning registry of the calling module + # in order to re-raise the warnings. + frame = sys._getframe(2) + registry = frame.f_globals.get('__warningregistry__') + if registry: + registry.clear() + with warnings.catch_warnings(record=True) as w: + # Set filter "always" to record all warnings. Because + # test_warnings swap the module, we need to look up in + # the sys.modules dictionary. + sys.modules['warnings'].simplefilter("always") + yield WarningsRecorder(w) + # Filter the recorded warnings + reraise = [warning.message for warning in w] + missing = [] + for msg, cat in filters: + seen = False + for exc in reraise[:]: + message = str(exc) + # Filter out the matching messages + if (re.match(msg, message, re.I) and + issubclass(exc.__class__, cat)): + seen = True + reraise.remove(exc) + if not seen and not quiet: + # This filter caught nothing + missing.append((msg, cat.__name__)) + if reraise: + raise AssertionError("unhandled warning %r" % reraise[0]) + if missing: + raise AssertionError("filter (%r, %s) did not catch any warning" % + missing[0]) + + +@contextlib.contextmanager +def check_warnings(*filters, **kwargs): + """Context manager to silence warnings. + + Accept 2-tuples as positional arguments: + ("message regexp", WarningCategory) + + Optional argument: + - if 'quiet' is True, it does not fail if a filter catches nothing + (default True without argument, + default False if some filters are defined) + + Without argument, it defaults to: + check_warnings(("", Warning), quiet=True) + """ + quiet = kwargs.get('quiet') + if not filters: + filters = (("", Warning),) + # Preserve backward compatibility + if quiet is None: + quiet = True + return _filterwarnings(filters, quiet) + + +@contextlib.contextmanager +def check_py3k_warnings(*filters, **kwargs): + """Context manager to silence py3k warnings. + + Accept 2-tuples as positional arguments: + ("message regexp", WarningCategory) + + Optional argument: + - if 'quiet' is True, it does not fail if a filter catches nothing + (default False) + + Without argument, it defaults to: + check_py3k_warnings(("", DeprecationWarning), quiet=False) + """ + if sys.py3kwarning: + if not filters: + filters = (("", DeprecationWarning),) + else: + # It should not raise any py3k warning + filters = () + return _filterwarnings(filters, kwargs.get('quiet')) + + +class CleanImport(object): + """Context manager to force import to return a new module reference. + + This is useful for testing module-level behaviours, such as + the emission of a DeprecationWarning on import. + + Use like this: + + with CleanImport("foo"): + importlib.import_module("foo") # new reference + """ + + def __init__(self, *module_names): + self.original_modules = sys.modules.copy() + for module_name in module_names: + if module_name in sys.modules: + module = sys.modules[module_name] + # It is possible that module_name is just an alias for + # another module (e.g. stub for modules renamed in 3.x). + # In that case, we also need delete the real module to clear + # the import cache. + if module.__name__ != module_name: + del sys.modules[module.__name__] + del sys.modules[module_name] + + def __enter__(self): + return self + + def __exit__(self, *ignore_exc): + sys.modules.update(self.original_modules) + + +class EnvironmentVarGuard(UserDict.DictMixin): + + """Class to help protect the environment variable properly. Can be used as + a context manager.""" + + def __init__(self): + self._environ = os.environ + self._changed = {} + + def __getitem__(self, envvar): + return self._environ[envvar] + + def __setitem__(self, envvar, value): + # Remember the initial value on the first access + if envvar not in self._changed: + self._changed[envvar] = self._environ.get(envvar) + self._environ[envvar] = value + + def __delitem__(self, envvar): + # Remember the initial value on the first access + if envvar not in self._changed: + self._changed[envvar] = self._environ.get(envvar) + if envvar in self._environ: + del self._environ[envvar] + + def keys(self): + return self._environ.keys() + + def set(self, envvar, value): + self[envvar] = value + + def unset(self, envvar): + del self[envvar] + + def __enter__(self): + return self + + def __exit__(self, *ignore_exc): + for (k, v) in self._changed.items(): + if v is None: + if k in self._environ: + del self._environ[k] + else: + self._environ[k] = v + os.environ = self._environ + + +class DirsOnSysPath(object): + """Context manager to temporarily add directories to sys.path. + + This makes a copy of sys.path, appends any directories given + as positional arguments, then reverts sys.path to the copied + settings when the context ends. + + Note that *all* sys.path modifications in the body of the + context manager, including replacement of the object, + will be reverted at the end of the block. + """ + + def __init__(self, *paths): + self.original_value = sys.path[:] + self.original_object = sys.path + sys.path.extend(paths) + + def __enter__(self): + return self + + def __exit__(self, *ignore_exc): + sys.path = self.original_object + sys.path[:] = self.original_value + + +class TransientResource(object): + + """Raise ResourceDenied if an exception is raised while the context manager + is in effect that matches the specified exception and attributes.""" + + def __init__(self, exc, **kwargs): + self.exc = exc + self.attrs = kwargs + + def __enter__(self): + return self + + def __exit__(self, type_=None, value=None, traceback=None): + """If type_ is a subclass of self.exc and value has attributes matching + self.attrs, raise ResourceDenied. Otherwise let the exception + propagate (if any).""" + if type_ is not None and issubclass(self.exc, type_): + for attr, attr_value in self.attrs.iteritems(): + if not hasattr(value, attr): + break + if getattr(value, attr) != attr_value: + break + else: + raise ResourceDenied("an optional resource is not available") + + +@contextlib.contextmanager +def transient_internet(resource_name, timeout=30.0, errnos=()): + """Return a context manager that raises ResourceDenied when various issues + with the Internet connection manifest themselves as exceptions.""" + default_errnos = [ + ('ECONNREFUSED', 111), + ('ECONNRESET', 104), + ('EHOSTUNREACH', 113), + ('ENETUNREACH', 101), + ('ETIMEDOUT', 110), + # socket.create_connection() fails randomly with + # EADDRNOTAVAIL on Travis CI. + ('EADDRNOTAVAIL', 99), + ] + default_gai_errnos = [ + ('EAI_AGAIN', -3), + ('EAI_FAIL', -4), + ('EAI_NONAME', -2), + ('EAI_NODATA', -5), + # Windows defines EAI_NODATA as 11001 but idiotic getaddrinfo() + # implementation actually returns WSANO_DATA i.e. 11004. + ('WSANO_DATA', 11004), + ] + + denied = ResourceDenied("Resource '%s' is not available" % resource_name) + captured_errnos = errnos + gai_errnos = [] + if not captured_errnos: + captured_errnos = [getattr(errno, name, num) + for (name, num) in default_errnos] + gai_errnos = [getattr(socket, name, num) + for (name, num) in default_gai_errnos] + + def filter_error(err): + n = getattr(err, 'errno', None) + if (isinstance(err, socket.timeout) or + (isinstance(err, socket.gaierror) and n in gai_errnos) or + n in captured_errnos): + if not verbose: + sys.stderr.write(denied.args[0] + "\n") + raise denied + + old_timeout = socket.getdefaulttimeout() + try: + if timeout is not None: + socket.setdefaulttimeout(timeout) + yield + except IOError as err: + # urllib can wrap original socket errors multiple times (!), we must + # unwrap to get at the original error. + while True: + a = err.args + if len(a) >= 1 and isinstance(a[0], IOError): + err = a[0] + # The error can also be wrapped as args[1]: + # except socket.error as msg: + # raise IOError('socket error', msg).with_traceback(sys.exc_info()[2]) + elif len(a) >= 2 and isinstance(a[1], IOError): + err = a[1] + else: + break + filter_error(err) + raise + # XXX should we catch generic exceptions and look for their + # __cause__ or __context__? + finally: + socket.setdefaulttimeout(old_timeout) + + +@contextlib.contextmanager +def captured_output(stream_name): + """Return a context manager used by captured_stdout and captured_stdin + that temporarily replaces the sys stream *stream_name* with a StringIO.""" + import StringIO + orig_stdout = getattr(sys, stream_name) + setattr(sys, stream_name, StringIO.StringIO()) + try: + yield getattr(sys, stream_name) + finally: + setattr(sys, stream_name, orig_stdout) + +def captured_stdout(): + """Capture the output of sys.stdout: + + with captured_stdout() as s: + print "hello" + self.assertEqual(s.getvalue(), "hello") + """ + return captured_output("stdout") + +def captured_stderr(): + return captured_output("stderr") + +def captured_stdin(): + return captured_output("stdin") + +def gc_collect(): + """Force as many objects as possible to be collected. + + In non-CPython implementations of Python, this is needed because timely + deallocation is not guaranteed by the garbage collector. (Even in CPython + this can be the case in case of reference cycles.) This means that __del__ + methods may be called later than expected and weakrefs may remain alive for + longer than expected. This function tries its best to force all garbage + objects to disappear. + """ + gc.collect() + if is_jython: + time.sleep(0.1) + gc.collect() + gc.collect() + + +_header = '2P' +if hasattr(sys, "gettotalrefcount"): + _header = '2P' + _header +_vheader = _header + 'P' + +def calcobjsize(fmt): + return struct.calcsize(_header + fmt + '0P') + +def calcvobjsize(fmt): + return struct.calcsize(_vheader + fmt + '0P') + + +_TPFLAGS_HAVE_GC = 1<<14 +_TPFLAGS_HEAPTYPE = 1<<9 + +def check_sizeof(test, o, size): + import _testcapi + result = sys.getsizeof(o) + # add GC header size + if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\ + ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))): + size += _testcapi.SIZEOF_PYGC_HEAD + msg = 'wrong size for %s: got %d, expected %d' \ + % (type(o), result, size) + test.assertEqual(result, size, msg) + + +#======================================================================= +# Decorator for running a function in a different locale, correctly resetting +# it afterwards. + +def run_with_locale(catstr, *locales): + def decorator(func): + def inner(*args, **kwds): + try: + import locale + category = getattr(locale, catstr) + orig_locale = locale.setlocale(category) + except AttributeError: + # if the test author gives us an invalid category string + raise + except: + # cannot retrieve original locale, so do nothing + locale = orig_locale = None + else: + for loc in locales: + try: + locale.setlocale(category, loc) + break + except: + pass + + # now run the function, resetting the locale on exceptions + try: + return func(*args, **kwds) + finally: + if locale and orig_locale: + locale.setlocale(category, orig_locale) + inner.func_name = func.func_name + inner.__doc__ = func.__doc__ + return inner + return decorator + +#======================================================================= +# Decorator for running a function in a specific timezone, correctly +# resetting it afterwards. + +def run_with_tz(tz): + def decorator(func): + def inner(*args, **kwds): + try: + tzset = time.tzset + except AttributeError: + raise unittest.SkipTest("tzset required") + if 'TZ' in os.environ: + orig_tz = os.environ['TZ'] + else: + orig_tz = None + os.environ['TZ'] = tz + tzset() + + # now run the function, resetting the tz on exceptions + try: + return func(*args, **kwds) + finally: + if orig_tz is None: + del os.environ['TZ'] + else: + os.environ['TZ'] = orig_tz + time.tzset() + + inner.__name__ = func.__name__ + inner.__doc__ = func.__doc__ + return inner + return decorator + +#======================================================================= +# Big-memory-test support. Separate from 'resources' because memory use should be configurable. + +# Some handy shorthands. Note that these are used for byte-limits as well +# as size-limits, in the various bigmem tests +_1M = 1024*1024 +_1G = 1024 * _1M +_2G = 2 * _1G +_4G = 4 * _1G + +MAX_Py_ssize_t = sys.maxsize + +def set_memlimit(limit): + global max_memuse + global real_max_memuse + sizes = { + 'k': 1024, + 'm': _1M, + 'g': _1G, + 't': 1024*_1G, + } + m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit, + re.IGNORECASE | re.VERBOSE) + if m is None: + raise ValueError('Invalid memory limit %r' % (limit,)) + memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) + real_max_memuse = memlimit + if memlimit > MAX_Py_ssize_t: + memlimit = MAX_Py_ssize_t + if memlimit < _2G - 1: + raise ValueError('Memory limit %r too low to be useful' % (limit,)) + max_memuse = memlimit + +def bigmemtest(minsize, memuse, overhead=5*_1M): + """Decorator for bigmem tests. + + 'minsize' is the minimum useful size for the test (in arbitrary, + test-interpreted units.) 'memuse' is the number of 'bytes per size' for + the test, or a good estimate of it. 'overhead' specifies fixed overhead, + independent of the testsize, and defaults to 5Mb. + + The decorator tries to guess a good value for 'size' and passes it to + the decorated test function. If minsize * memuse is more than the + allowed memory use (as defined by max_memuse), the test is skipped. + Otherwise, minsize is adjusted upward to use up to max_memuse. + """ + def decorator(f): + def wrapper(self): + if not max_memuse: + # If max_memuse is 0 (the default), + # we still want to run the tests with size set to a few kb, + # to make sure they work. We still want to avoid using + # too much memory, though, but we do that noisily. + maxsize = 5147 + self.assertFalse(maxsize * memuse + overhead > 20 * _1M) + else: + maxsize = int((max_memuse - overhead) / memuse) + if maxsize < minsize: + # Really ought to print 'test skipped' or something + if verbose: + sys.stderr.write("Skipping %s because of memory " + "constraint\n" % (f.__name__,)) + return + # Try to keep some breathing room in memory use + maxsize = max(maxsize - 50 * _1M, minsize) + return f(self, maxsize) + wrapper.minsize = minsize + wrapper.memuse = memuse + wrapper.overhead = overhead + return wrapper + return decorator + +def precisionbigmemtest(size, memuse, overhead=5*_1M, dry_run=True): + def decorator(f): + def wrapper(self): + if not real_max_memuse: + maxsize = 5147 + else: + maxsize = size + + if ((real_max_memuse or not dry_run) + and real_max_memuse < maxsize * memuse): + if verbose: + sys.stderr.write("Skipping %s because of memory " + "constraint\n" % (f.__name__,)) + return + + return f(self, maxsize) + wrapper.size = size + wrapper.memuse = memuse + wrapper.overhead = overhead + return wrapper + return decorator + +def bigaddrspacetest(f): + """Decorator for tests that fill the address space.""" + def wrapper(self): + if max_memuse < MAX_Py_ssize_t: + if verbose: + sys.stderr.write("Skipping %s because of memory " + "constraint\n" % (f.__name__,)) + else: + return f(self) + return wrapper + +#======================================================================= +# unittest integration. + +class BasicTestRunner: + def run(self, test): + result = unittest.TestResult() + test(result) + return result + +def _id(obj): + return obj + +def requires_resource(resource): + if resource == 'gui' and not _is_gui_available(): + return unittest.skip(_is_gui_available.reason) + if is_resource_enabled(resource): + return _id + else: + return unittest.skip("resource {0!r} is not enabled".format(resource)) + +def cpython_only(test): + """ + Decorator for tests only applicable on CPython. + """ + return impl_detail(cpython=True)(test) + +def impl_detail(msg=None, **guards): + if check_impl_detail(**guards): + return _id + if msg is None: + guardnames, default = _parse_guards(guards) + if default: + msg = "implementation detail not available on {0}" + else: + msg = "implementation detail specific to {0}" + guardnames = sorted(guardnames.keys()) + msg = msg.format(' or '.join(guardnames)) + return unittest.skip(msg) + +def _parse_guards(guards): + # Returns a tuple ({platform_name: run_me}, default_value) + if not guards: + return ({'cpython': True}, False) + is_true = guards.values()[0] + assert guards.values() == [is_true] * len(guards) # all True or all False + return (guards, not is_true) + +# Use the following check to guard CPython's implementation-specific tests -- +# or to run them only on the implementation(s) guarded by the arguments. +def check_impl_detail(**guards): + """This function returns True or False depending on the host platform. + Examples: + if check_impl_detail(): # only on CPython (default) + if check_impl_detail(jython=True): # only on Jython + if check_impl_detail(cpython=False): # everywhere except on CPython + """ + guards, default = _parse_guards(guards) + return guards.get(platform.python_implementation().lower(), default) + + +def _filter_suite(suite, pred): + """Recursively filter test cases in a suite based on a predicate.""" + newtests = [] + for test in suite._tests: + if isinstance(test, unittest.TestSuite): + _filter_suite(test, pred) + newtests.append(test) + else: + if pred(test): + newtests.append(test) + suite._tests = newtests + +def _run_suite(suite): + """Run tests from a unittest.TestSuite-derived class.""" + if verbose: + runner = unittest.TextTestRunner(sys.stdout, verbosity=2, + failfast=failfast) + else: + runner = BasicTestRunner() + + result = runner.run(suite) + if not result.testsRun and not result.skipped: + raise TestDidNotRun + if not result.wasSuccessful(): + if len(result.errors) == 1 and not result.failures: + err = result.errors[0][1] + elif len(result.failures) == 1 and not result.errors: + err = result.failures[0][1] + else: + err = "multiple errors occurred" + if not verbose: + err += "; run in verbose mode for details" + raise TestFailed(err) + + +# By default, don't filter tests +_match_test_func = None +_match_test_patterns = None + + +def match_test(test): + # Function used by support.run_unittest() and regrtest --list-cases + if _match_test_func is None: + return True + else: + return _match_test_func(test.id()) + + +def _is_full_match_test(pattern): + # If a pattern contains at least one dot, it's considered + # as a full test identifier. + # Example: 'test.test_os.FileTests.test_access'. + # + # Reject patterns which contain fnmatch patterns: '*', '?', '[...]' + # or '[!...]'. For example, reject 'test_access*'. + return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern)) + + +def set_match_tests(patterns): + global _match_test_func, _match_test_patterns + + if patterns == _match_test_patterns: + # No change: no need to recompile patterns. + return + + if not patterns: + func = None + # set_match_tests(None) behaves as set_match_tests(()) + patterns = () + elif all(map(_is_full_match_test, patterns)): + # Simple case: all patterns are full test identifier. + # The test.bisect utility only uses such full test identifiers. + func = set(patterns).__contains__ + else: + regex = '|'.join(map(fnmatch.translate, patterns)) + # The search *is* case sensitive on purpose: + # don't use flags=re.IGNORECASE + regex_match = re.compile(regex).match + + def match_test_regex(test_id): + if regex_match(test_id): + # The regex matchs the whole identifier like + # 'test.test_os.FileTests.test_access' + return True + else: + # Try to match parts of the test identifier. + # For example, split 'test.test_os.FileTests.test_access' + # into: 'test', 'test_os', 'FileTests' and 'test_access'. + return any(map(regex_match, test_id.split("."))) + + func = match_test_regex + + # Create a copy since patterns can be mutable and so modified later + _match_test_patterns = tuple(patterns) + _match_test_func = func + + + +def run_unittest(*classes): + """Run tests from unittest.TestCase-derived classes.""" + valid_types = (unittest.TestSuite, unittest.TestCase) + suite = unittest.TestSuite() + for cls in classes: + if isinstance(cls, str): + if cls in sys.modules: + suite.addTest(unittest.findTestCases(sys.modules[cls])) + else: + raise ValueError("str arguments must be keys in sys.modules") + elif isinstance(cls, valid_types): + suite.addTest(cls) + else: + suite.addTest(unittest.makeSuite(cls)) + _filter_suite(suite, match_test) + _run_suite(suite) + +#======================================================================= +# Check for the presence of docstrings. + +HAVE_DOCSTRINGS = (check_impl_detail(cpython=False) or + sys.platform == 'win32' or + sysconfig.get_config_var('WITH_DOC_STRINGS')) + +requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS, + "test requires docstrings") + + +#======================================================================= +# doctest driver. + +def run_doctest(module, verbosity=None): + """Run doctest on the given module. Return (#failures, #tests). + + If optional argument verbosity is not specified (or is None), pass + test.support's belief about verbosity on to doctest. Else doctest's + usual behavior is used (it searches sys.argv for -v). + """ + + import doctest + + if verbosity is None: + verbosity = verbose + else: + verbosity = None + + # Direct doctest output (normally just errors) to real stdout; doctest + # output shouldn't be compared by regrtest. + save_stdout = sys.stdout + sys.stdout = get_original_stdout() + try: + f, t = doctest.testmod(module, verbose=verbosity) + if f: + raise TestFailed("%d of %d doctests failed" % (f, t)) + finally: + sys.stdout = save_stdout + if verbose: + print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t) + return f, t + +#======================================================================= +# Threading support to prevent reporting refleaks when running regrtest.py -R + +# Flag used by saved_test_environment of test.libregrtest.save_env, +# to check if a test modified the environment. The flag should be set to False +# before running a new test. +# +# For example, threading_cleanup() sets the flag is the function fails +# to cleanup threads. +environment_altered = False + +# NOTE: we use thread._count() rather than threading.enumerate() (or the +# moral equivalent thereof) because a threading.Thread object is still alive +# until its __bootstrap() method has returned, even after it has been +# unregistered from the threading module. +# thread._count(), on the other hand, only gets decremented *after* the +# __bootstrap() method has returned, which gives us reliable reference counts +# at the end of a test run. + +def threading_setup(): + if thread: + return thread._count(), + else: + return 1, + +def threading_cleanup(nb_threads): + if not thread: + return + + _MAX_COUNT = 10 + for count in range(_MAX_COUNT): + n = thread._count() + if n == nb_threads: + break + time.sleep(0.1) + # XXX print a warning in case of failure? + +def reap_threads(func): + """Use this function when threads are being used. This will + ensure that the threads are cleaned up even when the test fails. + If threading is unavailable this function does nothing. + """ + if not thread: + return func + + @functools.wraps(func) + def decorator(*args): + key = threading_setup() + try: + return func(*args) + finally: + threading_cleanup(*key) + return decorator + + +@contextlib.contextmanager +def wait_threads_exit(timeout=60.0): + """ + bpo-31234: Context manager to wait until all threads created in the with + statement exit. + + Use thread.count() to check if threads exited. Indirectly, wait until + threads exit the internal t_bootstrap() C function of the thread module. + + threading_setup() and threading_cleanup() are designed to emit a warning + if a test leaves running threads in the background. This context manager + is designed to cleanup threads started by the thread.start_new_thread() + which doesn't allow to wait for thread exit, whereas thread.Thread has a + join() method. + """ + old_count = thread._count() + try: + yield + finally: + start_time = time.time() + deadline = start_time + timeout + while True: + count = thread._count() + if count <= old_count: + break + if time.time() > deadline: + dt = time.time() - start_time + msg = ("wait_threads() failed to cleanup %s " + "threads after %.1f seconds " + "(count: %s, old count: %s)" + % (count - old_count, dt, count, old_count)) + raise AssertionError(msg) + time.sleep(0.010) + gc_collect() + + +def reap_children(): + """Use this function at the end of test_main() whenever sub-processes + are started. This will help ensure that no extra children (zombies) + stick around to hog resources and create problems when looking + for refleaks. + """ + + # Reap all our dead child processes so we don't leave zombies around. + # These hog resources and might be causing some of the buildbots to die. + if hasattr(os, 'waitpid'): + any_process = -1 + while True: + try: + # This will raise an exception on Windows. That's ok. + pid, status = os.waitpid(any_process, os.WNOHANG) + if pid == 0: + break + except: + break + +@contextlib.contextmanager +def start_threads(threads, unlock=None): + threads = list(threads) + started = [] + try: + try: + for t in threads: + t.start() + started.append(t) + except: + if verbose: + print("Can't start %d threads, only %d threads started" % + (len(threads), len(started))) + raise + yield + finally: + if unlock: + unlock() + endtime = starttime = time.time() + for timeout in range(1, 16): + endtime += 60 + for t in started: + t.join(max(endtime - time.time(), 0.01)) + started = [t for t in started if t.isAlive()] + if not started: + break + if verbose: + print('Unable to join %d threads during a period of ' + '%d minutes' % (len(started), timeout)) + started = [t for t in started if t.isAlive()] + if started: + raise AssertionError('Unable to join %d threads' % len(started)) + +@contextlib.contextmanager +def swap_attr(obj, attr, new_val): + """Temporary swap out an attribute with a new object. + + Usage: + with swap_attr(obj, "attr", 5): + ... + + This will set obj.attr to 5 for the duration of the with: block, + restoring the old value at the end of the block. If `attr` doesn't + exist on `obj`, it will be created and then deleted at the end of the + block. + + The old value (or None if it doesn't exist) will be assigned to the + target of the "as" clause, if there is one. + """ + if hasattr(obj, attr): + real_val = getattr(obj, attr) + setattr(obj, attr, new_val) + try: + yield real_val + finally: + setattr(obj, attr, real_val) + else: + setattr(obj, attr, new_val) + try: + yield + finally: + if hasattr(obj, attr): + delattr(obj, attr) + +@contextlib.contextmanager +def swap_item(obj, item, new_val): + """Temporary swap out an item with a new object. + + Usage: + with swap_item(obj, "item", 5): + ... + + This will set obj["item"] to 5 for the duration of the with: block, + restoring the old value at the end of the block. If `item` doesn't + exist on `obj`, it will be created and then deleted at the end of the + block. + + The old value (or None if it doesn't exist) will be assigned to the + target of the "as" clause, if there is one. + """ + if item in obj: + real_val = obj[item] + obj[item] = new_val + try: + yield real_val + finally: + obj[item] = real_val + else: + obj[item] = new_val + try: + yield + finally: + if item in obj: + del obj[item] + +def py3k_bytes(b): + """Emulate the py3k bytes() constructor. + + NOTE: This is only a best effort function. + """ + try: + # memoryview? + return b.tobytes() + except AttributeError: + try: + # iterable of ints? + return b"".join(chr(x) for x in b) + except TypeError: + return bytes(b) + +requires_type_collecting = unittest.skipIf(hasattr(sys, 'getcounts'), + 'types are immortal if COUNT_ALLOCS is defined') + +def args_from_interpreter_flags(): + """Return a list of command-line arguments reproducing the current + settings in sys.flags.""" + import subprocess + return subprocess._args_from_interpreter_flags() + +def strip_python_stderr(stderr): + """Strip the stderr of a Python process from potential debug output + emitted by the interpreter. + + This will typically be run on the result of the communicate() method + of a subprocess.Popen object. + """ + stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip() + return stderr + + +def check_free_after_iterating(test, iter, cls, args=()): + class A(cls): + def __del__(self): + done[0] = True + try: + next(it) + except StopIteration: + pass + + done = [False] + it = iter(A(*args)) + # Issue 26494: Shouldn't crash + test.assertRaises(StopIteration, next, it) + # The sequence should be deallocated just after the end of iterating + gc_collect() + test.assertTrue(done[0]) + +@contextlib.contextmanager +def disable_gc(): + have_gc = gc.isenabled() + gc.disable() + try: + yield + finally: + if have_gc: + gc.enable() + + +def python_is_optimized(): + """Find if Python was built with optimizations.""" + cflags = sysconfig.get_config_var('PY_CFLAGS') or '' + final_opt = "" + for opt in cflags.split(): + if opt.startswith('-O'): + final_opt = opt + return final_opt not in ('', '-O0', '-Og') + + +class SuppressCrashReport: + """Try to prevent a crash report from popping up. + + On Windows, don't display the Windows Error Reporting dialog. On UNIX, + disable the creation of coredump file. + """ + old_value = None + old_modes = None + + def __enter__(self): + """On Windows, disable Windows Error Reporting dialogs using + SetErrorMode. + + On UNIX, try to save the previous core file size limit, then set + soft limit to 0. + """ + if sys.platform.startswith('win'): + # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx + # GetErrorMode is not available on Windows XP and Windows Server 2003, + # but SetErrorMode returns the previous value, so we can use that + import ctypes + self._k32 = ctypes.windll.kernel32 + SEM_NOGPFAULTERRORBOX = 0x02 + self.old_value = self._k32.SetErrorMode(SEM_NOGPFAULTERRORBOX) + self._k32.SetErrorMode(self.old_value | SEM_NOGPFAULTERRORBOX) + + # Suppress assert dialogs in debug builds + # (see http://bugs.python.org/issue23314) + try: + import _testcapi + _testcapi.CrtSetReportMode + except (AttributeError, ImportError): + # no _testcapi or a release build + pass + else: + self.old_modes = {} + for report_type in [_testcapi.CRT_WARN, + _testcapi.CRT_ERROR, + _testcapi.CRT_ASSERT]: + old_mode = _testcapi.CrtSetReportMode(report_type, + _testcapi.CRTDBG_MODE_FILE) + old_file = _testcapi.CrtSetReportFile(report_type, + _testcapi.CRTDBG_FILE_STDERR) + self.old_modes[report_type] = old_mode, old_file + + else: + try: + import resource + except ImportError: + resource = None + + if resource is not None: + try: + self.old_value = resource.getrlimit(resource.RLIMIT_CORE) + resource.setrlimit(resource.RLIMIT_CORE, + (0, self.old_value[1])) + except (ValueError, OSError): + pass + + if sys.platform == 'darwin': + # Check if the 'Crash Reporter' on OSX was configured + # in 'Developer' mode and warn that it will get triggered + # when it is. + # + # This assumes that this context manager is used in tests + # that might trigger the next manager. + import subprocess + cmd = ['/usr/bin/defaults', 'read', + 'com.apple.CrashReporter', 'DialogType'] + proc = subprocess.Popen(cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout = proc.communicate()[0] + if stdout.strip() == b'developer': + sys.stdout.write("this test triggers the Crash Reporter, " + "that is intentional") + sys.stdout.flush() + + return self + + def __exit__(self, *ignore_exc): + """Restore Windows ErrorMode or core file behavior to initial value.""" + if self.old_value is None: + return + + if sys.platform.startswith('win'): + self._k32.SetErrorMode(self.old_value) + + if self.old_modes: + import _testcapi + for report_type, (old_mode, old_file) in self.old_modes.items(): + _testcapi.CrtSetReportMode(report_type, old_mode) + _testcapi.CrtSetReportFile(report_type, old_file) + else: + import resource + try: + resource.setrlimit(resource.RLIMIT_CORE, self.old_value) + except (ValueError, OSError): + pass + + +def _crash_python(): + """Deliberate crash of Python. + + Python can be killed by a segmentation fault (SIGSEGV), a bus error + (SIGBUS), or a different error depending on the platform. + + Use SuppressCrashReport() to prevent a crash report from popping up. + """ + + import _testcapi + with SuppressCrashReport(): + _testcapi._read_null() + + +def fd_count(): + """Count the number of open file descriptors. + """ + if sys.platform.startswith(('linux', 'freebsd')): + try: + names = os.listdir("/proc/self/fd") + # Substract one because listdir() opens internally a file + # descriptor to list the content of the /proc/self/fd/ directory. + return len(names) - 1 + except OSError as exc: + if exc.errno != errno.ENOENT: + raise + + MAXFD = 256 + if hasattr(os, 'sysconf'): + try: + MAXFD = os.sysconf("SC_OPEN_MAX") + except OSError: + pass + + old_modes = None + if sys.platform == 'win32': + # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process + # on invalid file descriptor if Python is compiled in debug mode + try: + import msvcrt + msvcrt.CrtSetReportMode + except (AttributeError, ImportError): + # no msvcrt or a release build + pass + else: + old_modes = {} + for report_type in (msvcrt.CRT_WARN, + msvcrt.CRT_ERROR, + msvcrt.CRT_ASSERT): + old_modes[report_type] = msvcrt.CrtSetReportMode(report_type, 0) + + try: + count = 0 + for fd in range(MAXFD): + try: + # Prefer dup() over fstat(). fstat() can require input/output + # whereas dup() doesn't. + fd2 = os.dup(fd) + except OSError as e: + if e.errno != errno.EBADF: + raise + else: + os.close(fd2) + count += 1 + finally: + if old_modes is not None: + for report_type in (msvcrt.CRT_WARN, + msvcrt.CRT_ERROR, + msvcrt.CRT_ASSERT): + msvcrt.CrtSetReportMode(report_type, old_modes[report_type]) + + return count + + +class SaveSignals: + """ + Save an restore signal handlers. + + This class is only able to save/restore signal handlers registered + by the Python signal module: see bpo-13285 for "external" signal + handlers. + """ + + def __init__(self): + import signal + self.signal = signal + self.signals = list(range(1, signal.NSIG)) + # SIGKILL and SIGSTOP signals cannot be ignored nor catched + for signame in ('SIGKILL', 'SIGSTOP'): + try: + signum = getattr(signal, signame) + except AttributeError: + continue + self.signals.remove(signum) + self.handlers = {} + + def save(self): + for signum in self.signals: + handler = self.signal.getsignal(signum) + if handler is None: + # getsignal() returns None if a signal handler was not + # registered by the Python signal module, + # and the handler is not SIG_DFL nor SIG_IGN. + # + # Ignore the signal: we cannot restore the handler. + continue + self.handlers[signum] = handler + + def restore(self): + for signum, handler in self.handlers.items(): + self.signal.signal(signum, handler) diff --git a/contrib/tools/python/src/Lib/test/support/script_helper.py b/contrib/tools/python/src/Lib/test/support/script_helper.py new file mode 100644 index 0000000000..e06cdc32d1 --- /dev/null +++ b/contrib/tools/python/src/Lib/test/support/script_helper.py @@ -0,0 +1,170 @@ +# Common utility functions used by various script execution tests +# e.g. test_cmd_line, test_cmd_line_script and test_runpy + +import sys +import os +import re +import os.path +import tempfile +import subprocess +import py_compile +import contextlib +import shutil +try: + import zipfile +except ImportError: + # If Python is build without Unicode support, importing _io will + # fail, which, in turn, means that zipfile cannot be imported + # Most of this module can then still be used. + pass + +from test.support import strip_python_stderr + +# Executing the interpreter in a subprocess +def _assert_python(expected_success, *args, **env_vars): + cmd_line = [sys.executable] + if not env_vars: + cmd_line.append('-E') + cmd_line.extend(args) + # Need to preserve the original environment, for in-place testing of + # shared library builds. + env = os.environ.copy() + env.update(env_vars) + p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + env=env) + try: + out, err = p.communicate() + finally: + subprocess._cleanup() + p.stdout.close() + p.stderr.close() + rc = p.returncode + err = strip_python_stderr(err) + if (rc and expected_success) or (not rc and not expected_success): + raise AssertionError( + "Process return code is %d, " + "stderr follows:\n%s" % (rc, err.decode('ascii', 'ignore'))) + return rc, out, err + +def assert_python_ok(*args, **env_vars): + """ + Assert that running the interpreter with `args` and optional environment + variables `env_vars` is ok and return a (return code, stdout, stderr) tuple. + """ + return _assert_python(True, *args, **env_vars) + +def assert_python_failure(*args, **env_vars): + """ + Assert that running the interpreter with `args` and optional environment + variables `env_vars` fails and return a (return code, stdout, stderr) tuple. + """ + return _assert_python(False, *args, **env_vars) + +def python_exit_code(*args): + cmd_line = [sys.executable, '-E'] + cmd_line.extend(args) + with open(os.devnull, 'w') as devnull: + return subprocess.call(cmd_line, stdout=devnull, + stderr=subprocess.STDOUT) + +def spawn_python(*args, **kwargs): + cmd_line = [sys.executable, '-E'] + cmd_line.extend(args) + return subprocess.Popen(cmd_line, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + **kwargs) + +def kill_python(p): + p.stdin.close() + data = p.stdout.read() + p.stdout.close() + # try to cleanup the child so we don't appear to leak when running + # with regrtest -R. + p.wait() + subprocess._cleanup() + return data + +def run_python(*args, **kwargs): + if __debug__: + p = spawn_python(*args, **kwargs) + else: + p = spawn_python('-O', *args, **kwargs) + stdout_data = kill_python(p) + return p.wait(), stdout_data + +# Script creation utilities +@contextlib.contextmanager +def temp_dir(): + dirname = tempfile.mkdtemp() + dirname = os.path.realpath(dirname) + try: + yield dirname + finally: + shutil.rmtree(dirname) + +def make_script(script_dir, script_basename, source): + script_filename = script_basename+os.extsep+'py' + script_name = os.path.join(script_dir, script_filename) + script_file = open(script_name, 'w') + script_file.write(source) + script_file.close() + return script_name + +def compile_script(script_name): + py_compile.compile(script_name, doraise=True) + if __debug__: + compiled_name = script_name + 'c' + else: + compiled_name = script_name + 'o' + return compiled_name + +def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None): + zip_filename = zip_basename+os.extsep+'zip' + zip_name = os.path.join(zip_dir, zip_filename) + zip_file = zipfile.ZipFile(zip_name, 'w') + if name_in_zip is None: + name_in_zip = os.path.basename(script_name) + zip_file.write(script_name, name_in_zip) + zip_file.close() + #if test.test_support.verbose: + # zip_file = zipfile.ZipFile(zip_name, 'r') + # print 'Contents of %r:' % zip_name + # zip_file.printdir() + # zip_file.close() + return zip_name, os.path.join(zip_name, name_in_zip) + +def make_pkg(pkg_dir, init_source=''): + os.mkdir(pkg_dir) + make_script(pkg_dir, '__init__', init_source) + +def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename, + source, depth=1, compiled=False): + unlink = [] + init_name = make_script(zip_dir, '__init__', '') + unlink.append(init_name) + init_basename = os.path.basename(init_name) + script_name = make_script(zip_dir, script_basename, source) + unlink.append(script_name) + if compiled: + init_name = compile_script(init_name) + script_name = compile_script(script_name) + unlink.extend((init_name, script_name)) + pkg_names = [os.sep.join([pkg_name]*i) for i in range(1, depth+1)] + script_name_in_zip = os.path.join(pkg_names[-1], os.path.basename(script_name)) + zip_filename = zip_basename+os.extsep+'zip' + zip_name = os.path.join(zip_dir, zip_filename) + zip_file = zipfile.ZipFile(zip_name, 'w') + for name in pkg_names: + init_name_in_zip = os.path.join(name, init_basename) + zip_file.write(init_name, init_name_in_zip) + zip_file.write(script_name, script_name_in_zip) + zip_file.close() + for name in unlink: + os.unlink(name) + #if test.test_support.verbose: + # zip_file = zipfile.ZipFile(zip_name, 'r') + # print 'Contents of %r:' % zip_name + # zip_file.printdir() + # zip_file.close() + return zip_name, os.path.join(zip_name, script_name_in_zip) diff --git a/contrib/tools/python/src/Lib/test/test_support.py b/contrib/tools/python/src/Lib/test/test_support.py new file mode 100644 index 0000000000..3c894afcef --- /dev/null +++ b/contrib/tools/python/src/Lib/test/test_support.py @@ -0,0 +1,3 @@ +import sys +import test.support +sys.modules['test.test_support'] = test.support |