author     Anton Samokhvalov <[email protected]>   2022-02-10 16:45:15 +0300
committer  Daniil Cherednik <[email protected]>        2022-02-10 16:45:15 +0300
commit     72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch)
tree       da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /contrib/tools/cython/Cython/Utils.py
parent     778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff)
Restoring authorship annotation for Anton Samokhvalov <[email protected]>. Commit 1 of 2.
Diffstat (limited to 'contrib/tools/cython/Cython/Utils.py')
-rw-r--r--  contrib/tools/cython/Cython/Utils.py  532
1 file changed, 266 insertions, 266 deletions
diff --git a/contrib/tools/cython/Cython/Utils.py b/contrib/tools/cython/Cython/Utils.py
index d59d67d78b1..5a3a5bb0b68 100644
--- a/contrib/tools/cython/Cython/Utils.py
+++ b/contrib/tools/cython/Cython/Utils.py

@@ -1,8 +1,8 @@
#
# Cython -- Things that don't belong
# anywhere else in particular
#

from __future__ import absolute_import

try:

@@ -15,87 +15,87 @@ try:
except NameError:
    FileNotFoundError = OSError

import os
import sys
import re
import io
import codecs
import shutil
import tempfile
from contextlib import contextmanager

modification_time = os.path.getmtime

_function_caches = []
def clear_function_caches():
    for cache in _function_caches:
        cache.clear()

def cached_function(f):
    cache = {}
    _function_caches.append(cache)
    uncomputed = object()
    def wrapper(*args):
        res = cache.get(args, uncomputed)
        if res is uncomputed:
            res = cache[args] = f(*args)
        return res
    wrapper.uncached = f
    return wrapper

def cached_method(f):
    cache_name = '__%s_cache' % f.__name__
    def wrapper(self, *args):
        cache = getattr(self, cache_name, None)
        if cache is None:
            cache = {}
            setattr(self, cache_name, cache)
        if args in cache:
            return cache[args]
        res = cache[args] = f(self, *args)
        return res
    return wrapper

def replace_suffix(path, newsuf):
    base, _ = os.path.splitext(path)
    return base + newsuf


def open_new_file(path):
    if os.path.exists(path):
        # Make sure to create a new file here so we can
        # safely hard link the output files.
        os.unlink(path)

    # we use the ISO-8859-1 encoding here because we only write pure
    # ASCII strings or (e.g. for file names) byte encoded strings as
    # Unicode, so we need a direct mapping from the first 256 Unicode
    # characters to a byte sequence, which ISO-8859-1 provides

    # note: can't use io.open() in Py2 as we may be writing str objects
    return codecs.open(path, "w", encoding="ISO-8859-1")


def castrate_file(path, st):
    # Remove junk contents from an output file after a
    # failed compilation.
    # Also sets access and modification times back to
    # those specified by st (a stat struct).
    try:
        f = open_new_file(path)
    except EnvironmentError:
        pass
    else:
        f.write(
            "#error Do not use this file, it is the result of a failed Cython compilation.\n")
        f.close()
        if st:
            os.utime(path, (st.st_atime, st.st_mtime-1))

def file_newer_than(path, time):
    ftime = modification_time(path)
    return ftime > time


def safe_makedirs(path):
    try:

@@ -124,61 +124,61 @@ def copy_file_to_dir_if_newer(sourcefile, destdir):
    shutil.copy2(sourcefile, destfile)


@cached_function
def find_root_package_dir(file_path):
    dir = os.path.dirname(file_path)
    if file_path == dir:
        return dir
    elif is_package_dir(dir):
        return find_root_package_dir(dir)
    else:
        return dir

@cached_function
def check_package_dir(dir, package_names):
    for dirname in package_names:
        dir = os.path.join(dir, dirname)
        if not is_package_dir(dir):
            return None
    return dir

@cached_function
def is_package_dir(dir_path):
    for filename in ("__init__.py",
                     "__init__.pyc",
                     "__init__.pyx",
                     "__init__.pxd"):
        path = os.path.join(dir_path, filename)
        if path_exists(path):
            return 1

@cached_function
def path_exists(path):
    # try on the filesystem first
    if os.path.exists(path):
        return True
    # figure out if a PEP 302 loader is around
    try:
        loader = __loader__
        # XXX the code below assumes a 'zipimport.zipimporter' instance
        # XXX should be easy to generalize, but too lazy right now to write it
        archive_path = getattr(loader, 'archive', None)
        if archive_path:
            normpath = os.path.normpath(path)
            if normpath.startswith(archive_path):
                arcname = normpath[len(archive_path)+1:]
                try:
                    loader.get_data(arcname)
                    return True
                except IOError:
                    return False
    except NameError:
        pass
    return False

# file name encodings

def decode_filename(filename):
    if isinstance(filename, bytes):
        try:
            filename_encoding = sys.getfilesystemencoding()

@@ -187,17 +187,17 @@
            filename = filename.decode(filename_encoding)
        except UnicodeDecodeError:
            pass
    return filename

# support for source file encoding detection

_match_file_encoding = re.compile(br"(\w*coding)[:=]\s*([-\w.]+)").search


def detect_opened_file_encoding(f):
    # PEPs 263 and 3120
    # Most of the time the first two lines fall in the first couple of hundred chars,
    # and this bulk read/split is much faster.
    lines = ()
    start = b''
    while len(lines) < 3:

@@ -211,21 +211,21 @@
        return m.group(2).decode('iso8859-1')
    elif len(lines) > 1:
        m = _match_file_encoding(lines[1])
        if m:
            return m.group(2).decode('iso8859-1')
    return "UTF-8"


def skip_bom(f):
    """
    Read past a BOM at the beginning of a source file.
    This could be added to the scanner, but it's *substantially* easier
    to keep it at this level.
    """
    if f.read(1) != u'\uFEFF':
        f.seek(0)


def open_source_file(source_filename, encoding=None, error_handling=None):
    stream = None
    try:

@@ -233,157 +233,157 @@
            # Most of the time the encoding is not specified, so try hard to open the file only once.
            f = io.open(source_filename, 'rb')
            encoding = detect_opened_file_encoding(f)
            f.seek(0)
            stream = io.TextIOWrapper(f, encoding=encoding, errors=error_handling)
        else:
            stream = io.open(source_filename, encoding=encoding, errors=error_handling)

    except OSError:
        if os.path.exists(source_filename):
            raise  # File is there, but something went wrong reading from it.

    # Allow source files to be in zip files etc.
    try:
        loader = __loader__
        if source_filename.startswith(loader.archive):
            stream = open_source_from_loader(
                loader, source_filename,
                encoding, error_handling)
    except (NameError, AttributeError):
        pass

    if stream is None:
        raise FileNotFoundError(source_filename)

    skip_bom(stream)
    return stream


def open_source_from_loader(loader,
                            source_filename,
                            encoding=None, error_handling=None):
    nrmpath = os.path.normpath(source_filename)
    arcname = nrmpath[len(loader.archive)+1:]
    data = loader.get_data(arcname)
    return io.TextIOWrapper(io.BytesIO(data),
                            encoding=encoding,
                            errors=error_handling)


def str_to_number(value):
    # note: this expects a string as input that was accepted by the
    # parser already, with an optional "-" sign in front
    is_neg = False
    if value[:1] == '-':
        is_neg = True
        value = value[1:]
    if len(value) < 2:
        value = int(value, 0)
    elif value[0] == '0':
        literal_type = value[1]  # 0'o' - 0'b' - 0'x'
        if literal_type in 'xX':
            # hex notation ('0x1AF')
            value = int(value[2:], 16)
        elif literal_type in 'oO':
            # Py3 octal notation ('0o136')
            value = int(value[2:], 8)
        elif literal_type in 'bB':
            # Py3 binary notation ('0b101')
            value = int(value[2:], 2)
        else:
            # Py2 octal notation ('0136')
            value = int(value, 8)
    else:
        value = int(value, 0)
    return -value if is_neg else value


def long_literal(value):
    if isinstance(value, basestring):
        value = str_to_number(value)
    return not -2**31 <= value < 2**31


@cached_function
def get_cython_cache_dir():
    r"""
    Return the base directory containing Cython's caches.

    Priority:

    1. CYTHON_CACHE_DIR
    2. (OS X): ~/Library/Caches/Cython
       (posix not OS X): XDG_CACHE_HOME/cython if XDG_CACHE_HOME defined
    3. ~/.cython

    """
    if 'CYTHON_CACHE_DIR' in os.environ:
        return os.environ['CYTHON_CACHE_DIR']

    parent = None
    if os.name == 'posix':
        if sys.platform == 'darwin':
            parent = os.path.expanduser('~/Library/Caches')
        else:
            # this could fallback on ~/.cache
            parent = os.environ.get('XDG_CACHE_HOME')

    if parent and os.path.isdir(parent):
        return os.path.join(parent, 'cython')

    # last fallback: ~/.cython
    return os.path.expanduser(os.path.join('~', '.cython'))


@contextmanager
def captured_fd(stream=2, encoding=None):
    orig_stream = os.dup(stream)  # keep copy of original stream
    try:
        with tempfile.TemporaryFile(mode="a+b") as temp_file:
            def read_output(_output=[b'']):
                if not temp_file.closed:
                    temp_file.seek(0)
                    _output[0] = temp_file.read()
                return _output[0]

            os.dup2(temp_file.fileno(), stream)  # replace stream by copy of pipe
            try:
                def get_output():
                    result = read_output()
                    return result.decode(encoding) if encoding else result

                yield get_output
            finally:
                os.dup2(orig_stream, stream)  # restore original stream
                read_output()  # keep the output in case it's used after closing the context manager
    finally:
        os.close(orig_stream)


def print_bytes(s, header_text=None, end=b'\n', file=sys.stdout, flush=True):
    if header_text:
        file.write(header_text)  # note: text! => file.write() instead of out.write()
    file.flush()
    try:
        out = file.buffer  # Py3
    except AttributeError:
        out = file         # Py2
    out.write(s)
    if end:
        out.write(end)
    if flush:
        out.flush()

class LazyStr:
    def __init__(self, callback):
        self.callback = callback
    def __str__(self):
        return self.callback()
    def __repr__(self):
        return self.callback()
    def __add__(self, right):
        return self.callback() + right
    def __radd__(self, left):
        return left + self.callback()

class OrderedSet(object):
  def __init__(self, elements=()):
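
The helpers touched by this change are used throughout the Cython build pipeline; the short sketches below illustrate a few of them and are not part of the commit. The caching helpers (cached_function, cached_method, clear_function_caches) memoize results keyed by the positional arguments. A minimal usage sketch, assuming Cython is importable; normalize and Resolver are made-up names for illustration only:

    from Cython.Utils import cached_function, cached_method, clear_function_caches

    @cached_function
    def normalize(path):               # hypothetical helper, cached per distinct 'path'
        return path.replace('\\', '/').lower()

    class Resolver(object):            # hypothetical class using the per-instance cache
        @cached_method
        def lookup(self, name):
            return name.upper()        # stored on the instance in '__lookup_cache', keyed by (name,)

    normalize("A\\B")                  # computed
    normalize("A\\B")                  # returned from the module-level cache
    clear_function_caches()            # drops every @cached_function cache at once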
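
detect_opened_file_encoding implements the PEP 263 coding-cookie scan over the first two lines of a source file and falls back to UTF-8. A small sketch of the expected behaviour, assuming Cython is importable; the in-memory buffers stand in for real files:

    import io
    from Cython.Utils import detect_opened_file_encoding

    src = io.BytesIO(b"# -*- coding: iso8859-1 -*-\nprint('x')\n")
    print(detect_opened_file_encoding(src))    # expected: iso8859-1

    plain = io.BytesIO(b"print('no cookie here')\n")
    print(detect_opened_file_encoding(plain))  # expected: UTF-8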
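
str_to_number converts integer literals that the parser has already validated, covering hex, Py2/Py3 octal and binary notation, and long_literal reports whether a value fits in a signed 32-bit integer. Illustrative calls, assuming Cython is importable:

    from Cython.Utils import str_to_number, long_literal

    print(str_to_number('0x1AF'))    # 431  (hex)
    print(str_to_number('0o136'))    # 94   (Py3 octal)
    print(str_to_number('0136'))     # 94   (Py2 octal)
    print(str_to_number('-0b101'))   # -5   (binary, with leading sign)
    print(long_literal('0x1AF'))     # False -- fits in a signed 32-bit int
    print(long_literal(2**40))       # True  -- does not fit in 32 bits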
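
captured_fd temporarily redirects an OS-level file descriptor (stderr by default) into a temporary file and yields a callable that returns everything written while the redirection was active; the last read is cached so the result stays available after the block exits. A minimal sketch, assuming Cython is importable:

    import os
    from Cython.Utils import captured_fd

    with captured_fd(stream=2, encoding='utf8') as get_stderr:
        os.write(2, b"warning: emitted on fd 2\n")   # goes to the temporary file, not the terminal

    print("captured:", get_stderr())                 # expected: "warning: emitted on fd 2\n"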