diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/python/fs | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/python/fs')
-rw-r--r-- | library/python/fs/__init__.py | 501 | ||||
-rw-r--r-- | library/python/fs/clonefile.pyx | 18 | ||||
-rw-r--r-- | library/python/fs/test/test_fs.py | 1037 | ||||
-rw-r--r-- | library/python/fs/test/ya.make | 14 | ||||
-rw-r--r-- | library/python/fs/ya.make | 23 |
5 files changed, 1593 insertions, 0 deletions
diff --git a/library/python/fs/__init__.py b/library/python/fs/__init__.py new file mode 100644 index 0000000000..b1b7cde079 --- /dev/null +++ b/library/python/fs/__init__.py @@ -0,0 +1,501 @@ +# coding: utf-8 + +import codecs +import errno +import logging +import os +import random +import shutil +import six +import stat +import sys + +import library.python.func +import library.python.strings +import library.python.windows + +logger = logging.getLogger(__name__) + + +try: + WindowsError +except NameError: + WindowsError = None + + +_diehard_win_tries = 10 +errorfix_win = library.python.windows.errorfix + + +class CustomFsError(OSError): + def __init__(self, errno, message='', filename=None): + super(CustomFsError, self).__init__(message) + self.errno = errno + self.strerror = os.strerror(errno) + self.filename = filename + + +# Directories creation +# If dst is already exists and is a directory - does nothing +# Throws OSError +@errorfix_win +def ensure_dir(path): + try: + os.makedirs(path) + except OSError as e: + if e.errno != errno.EEXIST or not os.path.isdir(path): + raise + + +# Directories creation +# If dst is already exists and is a directory - does nothing +# Returns path +# Throws OSError +@errorfix_win +def create_dirs(path): + ensure_dir(path) + return path + + +# Atomic file/directory move (rename) +# Doesn't guarantee dst replacement +# Atomic if no device boundaries are crossed +# Depends on ctypes on Windows +# Throws OSError +# On Unix, if dst exists: +# if dst is file or empty dir - replaces it +# if src is dir and dst is not dir - throws OSError (errno ENOTDIR) +# if src is dir and dst is non-empty dir - throws OSError (errno ENOTEMPTY) +# if src is file and dst is dir - throws OSError (errno EISDIR) +# On Windows, if dst exists - throws OSError (errno EEXIST) +@errorfix_win +@library.python.windows.diehard(library.python.windows.RETRIABLE_FILE_ERRORS, tries=_diehard_win_tries) +def move(src, dst): + os.rename(src, dst) + + +# Atomic replacing file move (rename) +# Replaces dst if exists and not a dir +# Doesn't guarantee dst dir replacement +# Atomic if no device boundaries are crossed +# Depends on ctypes on Windows +# Throws OSError +# On Unix, if dst exists: +# if dst is file - replaces it +# if dst is dir - throws OSError (errno EISDIR) +# On Windows, if dst exists: +# if dst is file - replaces it +# if dst is dir - throws OSError (errno EACCES) +@errorfix_win +@library.python.windows.diehard(library.python.windows.RETRIABLE_FILE_ERRORS, tries=_diehard_win_tries) +def replace_file(src, dst): + if library.python.windows.on_win(): + library.python.windows.replace_file(src, dst) + else: + os.rename(src, dst) + + +# File/directory replacing move (rename) +# Removes dst if exists +# Non-atomic +# Depends on ctypes on Windows +# Throws OSError +@errorfix_win +def replace(src, dst): + try: + move(src, dst) + except OSError as e: + if e.errno not in (errno.EEXIST, errno.EISDIR, errno.ENOTDIR, errno.ENOTEMPTY): + raise + remove_tree(dst) + move(src, dst) + + +# Atomic file remove +# Throws OSError +@errorfix_win +@library.python.windows.diehard(library.python.windows.RETRIABLE_FILE_ERRORS, tries=_diehard_win_tries) +def remove_file(path): + os.remove(path) + + +# Atomic empty directory remove +# Throws OSError +@errorfix_win +@library.python.windows.diehard(library.python.windows.RETRIABLE_DIR_ERRORS, tries=_diehard_win_tries) +def remove_dir(path): + os.rmdir(path) + + +def fix_path_encoding(path): + return library.python.strings.to_str(path, library.python.strings.fs_encoding()) + + +# File/directory remove +# Non-atomic +# Throws OSError, AssertionError +@errorfix_win +def remove_tree(path): + @library.python.windows.diehard(library.python.windows.RETRIABLE_DIR_ERRORS, tries=_diehard_win_tries) + def rmtree(path): + if library.python.windows.on_win(): + library.python.windows.rmtree(path) + else: + shutil.rmtree(fix_path_encoding(path)) + + st = os.lstat(path) + if stat.S_ISLNK(st.st_mode) or stat.S_ISREG(st.st_mode): + remove_file(path) + elif stat.S_ISDIR(st.st_mode): + rmtree(path) + else: + assert False + + +# File/directory remove ignoring errors +# Non-atomic +@errorfix_win +def remove_tree_safe(path): + try: + st = os.lstat(path) + if stat.S_ISLNK(st.st_mode) or stat.S_ISREG(st.st_mode): + os.remove(path) + elif stat.S_ISDIR(st.st_mode): + shutil.rmtree(fix_path_encoding(path), ignore_errors=True) + # XXX + except UnicodeDecodeError as e: + logging.exception(u'remove_tree_safe with argument %s raise exception: %s', path, e) + raise + except OSError: + pass + + +# File/directory remove +# If path doesn't exist - does nothing +# Non-atomic +# Throws OSError, AssertionError +@errorfix_win +def ensure_removed(path): + try: + remove_tree(path) + except OSError as e: + if e.errno != errno.ENOENT: + raise + + +# Atomic file hardlink +# Dst must not exist +# Depends on ctypes on Windows +# Throws OSError +# If dst exists - throws OSError (errno EEXIST) +@errorfix_win +def hardlink(src, lnk): + if library.python.windows.on_win(): + library.python.windows.hardlink(src, lnk) + else: + os.link(src, lnk) + + +@errorfix_win +def hardlink_or_copy(src, lnk): + def should_fallback_to_copy(exc): + if WindowsError is not None and isinstance(exc, WindowsError) and exc.winerror == 1142: # too many hardlinks + return True + # cross-device hardlink or too many hardlinks, or some known WSL error + if isinstance(exc, OSError) and exc.errno in ( + errno.EXDEV, + errno.EMLINK, + errno.EINVAL, + errno.EACCES, + errno.EPERM, + ): + return True + return False + + try: + hardlink(src, lnk) + except Exception as e: + logger.debug('Failed to hardlink %s to %s with error %s, will copy it', src, lnk, repr(e)) + if should_fallback_to_copy(e): + copy2(src, lnk, follow_symlinks=False) + else: + raise + + +# Atomic file/directory symlink (Unix only) +# Dst must not exist +# Throws OSError +# If dst exists - throws OSError (errno EEXIST) +@errorfix_win +def symlink(src, lnk): + if library.python.windows.on_win(): + library.python.windows.run_disabled(src, lnk) + else: + os.symlink(src, lnk) + + +# shutil.copy2 with follow_symlinks=False parameter (Unix only) +def copy2(src, lnk, follow_symlinks=True): + if six.PY3: + shutil.copy2(src, lnk, follow_symlinks=follow_symlinks) + return + + if follow_symlinks or not os.path.islink(src): + shutil.copy2(src, lnk) + return + + symlink(os.readlink(src), lnk) + + +# Recursively hardlink directory +# Uses plain hardlink for files +# Dst must not exist +# Non-atomic +# Throws OSError +@errorfix_win +def hardlink_tree(src, dst): + if not os.path.exists(src): + raise CustomFsError(errno.ENOENT, filename=src) + if os.path.isfile(src): + hardlink(src, dst) + return + for dirpath, _, filenames in walk_relative(src): + src_dirpath = os.path.join(src, dirpath) if dirpath != '.' else src + dst_dirpath = os.path.join(dst, dirpath) if dirpath != '.' else dst + os.mkdir(dst_dirpath) + for filename in filenames: + hardlink(os.path.join(src_dirpath, filename), os.path.join(dst_dirpath, filename)) + + +# File copy +# throws EnvironmentError (OSError, IOError) +@errorfix_win +def copy_file(src, dst, copy_function=shutil.copy2): + if os.path.isdir(dst): + raise CustomFsError(errno.EISDIR, filename=dst) + copy_function(src, dst) + + +# File/directory copy +# throws EnvironmentError (OSError, IOError, shutil.Error) +@errorfix_win +def copy_tree(src, dst, copy_function=shutil.copy2): + if os.path.isfile(src): + copy_file(src, dst, copy_function=copy_function) + return + copytree3(src, dst, copy_function=copy_function) + + +# File read +# Throws OSError +@errorfix_win +def read_file(path, binary=True): + with open(path, 'r' + ('b' if binary else '')) as f: + return f.read() + + +# Decoding file read +# Throws OSError +@errorfix_win +def read_file_unicode(path, binary=True, enc='utf-8'): + if not binary: + if six.PY2: + with open(path, 'r') as f: + return library.python.strings.to_unicode(f.read(), enc) + else: + with open(path, 'r', encoding=enc) as f: + return f.read() + # codecs.open is always binary + with codecs.open(path, 'r', encoding=enc, errors=library.python.strings.ENCODING_ERRORS_POLICY) as f: + return f.read() + + +@errorfix_win +def open_file(*args, **kwargs): + return ( + library.python.windows.open_file(*args, **kwargs) if library.python.windows.on_win() else open(*args, **kwargs) + ) + + +# Atomic file write +# Throws OSError +@errorfix_win +def write_file(path, data, binary=True): + dir_path = os.path.dirname(path) + if dir_path: + ensure_dir(dir_path) + tmp_path = path + '.tmp.' + str(random.random()) + with open_file(tmp_path, 'w' + ('b' if binary else '')) as f: + if not isinstance(data, bytes) and binary: + data = data.encode('UTF-8') + f.write(data) + replace_file(tmp_path, path) + + +# File size +# Throws OSError +@errorfix_win +def get_file_size(path): + return os.path.getsize(path) + + +# File/directory size +# Non-recursive mode for directory counts size for immediates +# While raise_all_errors is set to False, file size fallbacks to zero in case of getsize errors +# Throws OSError +@errorfix_win +def get_tree_size(path, recursive=False, raise_all_errors=False): + if os.path.isfile(path): + return get_file_size(path) + total_size = 0 + for dir_path, _, files in os.walk(path): + for f in files: + fp = os.path.join(dir_path, f) + try: + total_size += get_file_size(fp) + except OSError as e: + if raise_all_errors: + raise + logger.debug("Cannot calculate file size: %s", e) + if not recursive: + break + return total_size + + +# Directory copy ported from Python 3 +def copytree3( + src, + dst, + symlinks=False, + ignore=None, + copy_function=shutil.copy2, + ignore_dangling_symlinks=False, + dirs_exist_ok=False, +): + """Recursively copy a directory tree. + + The copytree3 is a port of shutil.copytree function from python-3.2. + It has additional useful parameters and may be helpful while we are + on python-2.x. It has to be removed as soon as we have moved to + python-3.2 or higher. + + The destination directory must not already exist. + If exception(s) occur, an Error is raised with a list of reasons. + + If the optional symlinks flag is true, symbolic links in the + source tree result in symbolic links in the destination tree; if + it is false, the contents of the files pointed to by symbolic + links are copied. If the file pointed by the symlink doesn't + exist, an exception will be added in the list of errors raised in + an Error exception at the end of the copy process. + + You can set the optional ignore_dangling_symlinks flag to true if you + want to silence this exception. Notice that this has no effect on + platforms that don't support os.symlink. + + The optional ignore argument is a callable. If given, it + is called with the `src` parameter, which is the directory + being visited by copytree3(), and `names` which is the list of + `src` contents, as returned by os.listdir(): + + callable(src, names) -> ignored_names + + Since copytree3() is called recursively, the callable will be + called once for each directory that is copied. It returns a + list of names relative to the `src` directory that should + not be copied. + + The optional copy_function argument is a callable that will be used + to copy each file. It will be called with the source path and the + destination path as arguments. By default, copy2() is used, but any + function that supports the same signature (like copy()) can be used. + + """ + names = os.listdir(src) + if ignore is not None: + ignored_names = ignore(src, names) + else: + ignored_names = set() + + if not (dirs_exist_ok and os.path.isdir(dst)): + os.makedirs(dst) + + errors = [] + for name in names: + if name in ignored_names: + continue + srcname = os.path.join(src, name) + dstname = os.path.join(dst, name) + try: + if os.path.islink(srcname): + linkto = os.readlink(srcname) + if symlinks: + # We can't just leave it to `copy_function` because legacy + # code with a custom `copy_function` may rely on copytree3 + # doing the right thing. + os.symlink(linkto, dstname) + else: + # ignore dangling symlink if the flag is on + if not os.path.exists(linkto) and ignore_dangling_symlinks: + continue + # otherwise let the copy occurs. copy2 will raise an error + copy_function(srcname, dstname) + elif os.path.isdir(srcname): + copytree3(srcname, dstname, symlinks, ignore, copy_function, dirs_exist_ok=dirs_exist_ok) + else: + # Will raise a SpecialFileError for unsupported file types + copy_function(srcname, dstname) + # catch the Error from the recursive copytree3 so that we can + # continue with other files + except shutil.Error as err: + errors.extend(err.args[0]) + except EnvironmentError as why: + errors.append((srcname, dstname, str(why))) + try: + shutil.copystat(src, dst) + except OSError as why: + if WindowsError is not None and isinstance(why, WindowsError): + # Copying file access times may fail on Windows + pass + else: + errors.extend((src, dst, str(why))) + if errors: + raise shutil.Error(errors) + + +def walk_relative(path, topdown=True, onerror=None, followlinks=False): + for dirpath, dirnames, filenames in os.walk(path, topdown=topdown, onerror=onerror, followlinks=followlinks): + yield os.path.relpath(dirpath, path), dirnames, filenames + + +def supports_clone(): + if 'darwin' in sys.platform: + import platform + + return list(map(int, platform.mac_ver()[0].split('.'))) >= [10, 13] + return False + + +def commonpath(paths): + assert paths + if len(paths) == 1: + return next(iter(paths)) + + split_paths = [path.split(os.sep) for path in paths] + smin = min(split_paths) + smax = max(split_paths) + + common = smin + for i, c in enumerate(smin): + if c != smax[i]: + common = smin[:i] + break + + return os.path.sep.join(common) + + +def set_execute_bits(filename): + stm = os.stat(filename).st_mode + exe = stm | 0o111 + if stm != exe: + os.chmod(filename, exe) diff --git a/library/python/fs/clonefile.pyx b/library/python/fs/clonefile.pyx new file mode 100644 index 0000000000..830bb894f2 --- /dev/null +++ b/library/python/fs/clonefile.pyx @@ -0,0 +1,18 @@ +import six + +cdef extern from "sys/clonefile.h" nogil: + int clonefile(const char * src, const char * dst, int flags) + +cdef extern from "Python.h": + ctypedef struct PyObject + cdef PyObject *PyExc_OSError + PyObject *PyErr_SetFromErrno(PyObject *) + +cdef int _macos_clone_file(const char* src, const char* dst) except? 0: + if clonefile(src, dst, 0) == -1: + PyErr_SetFromErrno(PyExc_OSError) + return 0 + return 1 + +def macos_clone_file(src, dst): + return _macos_clone_file(six.ensure_binary(src), six.ensure_binary(dst)) != 0 diff --git a/library/python/fs/test/test_fs.py b/library/python/fs/test/test_fs.py new file mode 100644 index 0000000000..9e2c70c069 --- /dev/null +++ b/library/python/fs/test/test_fs.py @@ -0,0 +1,1037 @@ +# coding=utf-8 + +import errno +import os +import pytest +import shutil +import six + +import library.python.fs +import library.python.strings +import library.python.tmp +import library.python.windows + +import yatest.common + + +def in_env(case): + def wrapped_case(*args, **kwargs): + with library.python.tmp.temp_dir() as temp_dir: + case(lambda path: os.path.join(temp_dir, path)) + + return wrapped_case + + +def mkfile(path, data=''): + with open(path, 'wb') as f: + if data: + f.write(data) if isinstance(data, six.binary_type) else f.write( + data.encode(library.python.strings.fs_encoding()) + ) + + +def mktree_example(path, name): + os.mkdir(path(name)) + mkfile(path(name + '/file1'), 'FILE1') + os.mkdir(path(name + '/dir1')) + os.mkdir(path(name + '/dir2')) + mkfile(path(name + '/dir2/file2'), 'FILE2') + mkfile(path(name + '/dir2/file3'), 'FILE3') + + +def file_data(path): + with open(path, 'rb') as f: + return f.read().decode('utf-8') + + +def serialize_tree(path): + if os.path.isfile(path): + return file_data(path) + data = {'dirs': set(), 'files': {}} + for dirpath, dirnames, filenames in os.walk(path): + dirpath_rel = os.path.relpath(dirpath, path) + if dirpath_rel == '.': + dirpath_rel = '' + data['dirs'].update(set(os.path.join(dirpath_rel, x) for x in dirnames)) + data['files'].update({os.path.join(dirpath_rel, x): file_data(os.path.join(dirpath, x)) for x in filenames}) + return data + + +def trees_equal(dir1, dir2): + return serialize_tree(dir1) == serialize_tree(dir2) + + +def inodes_unsupported(): + return library.python.windows.on_win() + + +def inodes_equal(path1, path2): + return os.stat(path1).st_ino == os.stat(path2).st_ino + + +def gen_error_access_denied(): + if library.python.windows.on_win(): + err = WindowsError() + err.errno = errno.EACCES + err.strerror = '' + err.winerror = library.python.windows.ERRORS['ACCESS_DENIED'] + else: + err = OSError() + err.errno = errno.EACCES + err.strerror = os.strerror(err.errno) + err.filename = 'unknown/file' + raise err + + +def test_errorfix_win(): + @library.python.fs.errorfix_win + def erroneous_func(): + gen_error_access_denied() + + with pytest.raises(OSError) as errinfo: + erroneous_func() + assert errinfo.value.errno == errno.EACCES + assert errinfo.value.filename == 'unknown/file' + # See transcode_error, which encodes strerror, in library/python/windows/__init__.py + assert isinstance(errinfo.value.strerror, (six.binary_type, six.text_type)) + assert errinfo.value.strerror + + +def test_custom_fs_error(): + with pytest.raises(OSError) as errinfo: + raise library.python.fs.CustomFsError(errno.EACCES, filename='some/file') + assert errinfo.value.errno == errno.EACCES + # See transcode_error, which encodes strerror, in library/python/windows/__init__.py + assert isinstance(errinfo.value.strerror, (six.binary_type, six.text_type)) + assert errinfo.value.filename == 'some/file' + + +@in_env +def test_ensure_dir(path): + library.python.fs.ensure_dir(path('dir/subdir')) + assert os.path.isdir(path('dir')) + assert os.path.isdir(path('dir/subdir')) + + +@in_env +def test_ensure_dir_exists(path): + os.makedirs(path('dir/subdir')) + library.python.fs.ensure_dir(path('dir/subdir')) + assert os.path.isdir(path('dir')) + assert os.path.isdir(path('dir/subdir')) + + +@in_env +def test_ensure_dir_exists_partly(path): + os.mkdir(path('dir')) + library.python.fs.ensure_dir(path('dir/subdir')) + assert os.path.isdir(path('dir')) + assert os.path.isdir(path('dir/subdir')) + + +@in_env +def test_ensure_dir_exists_file(path): + mkfile(path('dir')) + with pytest.raises(OSError) as errinfo: + library.python.fs.ensure_dir(path('dir/subdir')) + # ENOENT on Windows! + assert errinfo.value.errno in (errno.ENOTDIR, errno.ENOENT) + assert os.path.isfile(path('dir')) + + +@in_env +def test_create_dirs(path): + assert library.python.fs.create_dirs(path('dir/subdir')) == path('dir/subdir') + assert os.path.isdir(path('dir')) + assert os.path.isdir(path('dir/subdir')) + + +@in_env +def test_move_file(path): + mkfile(path('src'), 'SRC') + library.python.fs.move(path('src'), path('dst')) + assert not os.path.isfile(path('src')) + assert os.path.isfile(path('dst')) + assert file_data(path('dst')) == 'SRC' + + +@in_env +def test_move_file_no_src(path): + with pytest.raises(OSError) as errinfo: + library.python.fs.move(path('src'), path('dst')) + assert errinfo.value.errno == errno.ENOENT + + +@in_env +def test_move_file_exists(path): + mkfile(path('src'), 'SRC') + mkfile(path('dst'), 'DST') + if library.python.windows.on_win(): + # move is platform-dependent, use replace_file for dst replacement on all platforms + with pytest.raises(OSError) as errinfo: + library.python.fs.move(path('src'), path('dst')) + assert errinfo.value.errno == errno.EEXIST + assert os.path.isfile(path('src')) + assert os.path.isfile(path('dst')) + assert file_data(path('dst')) == 'DST' + else: + library.python.fs.move(path('src'), path('dst')) + assert not os.path.isfile(path('src')) + assert os.path.isfile(path('dst')) + assert file_data(path('dst')) == 'SRC' + + +@in_env +def test_move_file_exists_dir_empty(path): + mkfile(path('src'), 'SRC') + os.mkdir(path('dst')) + with pytest.raises(OSError) as errinfo: + library.python.fs.move(path('src'), path('dst')) + assert errinfo.value.errno in (errno.EEXIST, errno.EISDIR) + assert os.path.isfile(path('src')) + assert os.path.isdir(path('dst')) + assert not os.path.isfile(path('dst/src')) + + +@in_env +def test_move_file_exists_dir_nonempty(path): + mkfile(path('src'), 'SRC') + os.mkdir(path('dst')) + mkfile(path('dst/dst_file')) + with pytest.raises(OSError) as errinfo: + library.python.fs.move(path('src'), path('dst')) + assert errinfo.value.errno in (errno.EEXIST, errno.EISDIR) + assert os.path.isfile(path('src')) + assert os.path.isdir(path('dst')) + assert os.path.isfile(path('dst/dst_file')) + assert not os.path.isfile(path('dst/src')) + + +@in_env +def test_move_dir(path): + os.mkdir(path('src')) + mkfile(path('src/src_file')) + library.python.fs.move(path('src'), path('dst')) + assert not os.path.isdir(path('src')) + assert os.path.isdir(path('dst')) + assert os.path.isfile(path('dst/src_file')) + + +@in_env +def test_move_dir_exists_empty(path): + os.mkdir(path('src')) + mkfile(path('src/src_file')) + os.mkdir(path('dst')) + if library.python.windows.on_win(): + # move is platform-dependent, use non-atomic replace for directory replacement + with pytest.raises(OSError) as errinfo: + library.python.fs.move(path('src'), path('dst')) + assert errinfo.value.errno == errno.EEXIST + assert os.path.isdir(path('src')) + assert os.path.isdir(path('dst')) + assert not os.path.isfile(path('dst/src_file')) + else: + library.python.fs.move(path('src'), path('dst')) + assert not os.path.isdir(path('src')) + assert os.path.isdir(path('dst')) + assert os.path.isfile(path('dst/src_file')) + + +@in_env +def test_move_dir_exists_nonempty(path): + os.mkdir(path('src')) + mkfile(path('src/src_file')) + os.mkdir(path('dst')) + mkfile(path('dst/dst_file')) + with pytest.raises(OSError) as errinfo: + library.python.fs.move(path('src'), path('dst')) + assert errinfo.value.errno in (errno.EEXIST, errno.ENOTEMPTY) + assert os.path.isdir(path('src')) + assert os.path.isfile(path('src/src_file')) + assert os.path.isdir(path('dst')) + assert not os.path.isfile(path('dst/src_file')) + assert os.path.isfile(path('dst/dst_file')) + + +@in_env +def test_move_dir_exists_file(path): + os.mkdir(path('src')) + mkfile(path('src/src_file')) + mkfile(path('dst'), 'DST') + with pytest.raises(OSError) as errinfo: + library.python.fs.move(path('src'), path('dst')) + assert errinfo.value.errno in (errno.EEXIST, errno.ENOTDIR) + assert os.path.isdir(path('src')) + assert os.path.isfile(path('dst')) + assert file_data(path('dst')) == 'DST' + + +@in_env +def test_replace_file(path): + mkfile(path('src'), 'SRC') + library.python.fs.replace_file(path('src'), path('dst')) + assert not os.path.isfile(path('src')) + assert os.path.isfile(path('dst')) + assert file_data(path('dst')) == 'SRC' + + mkfile(path('src'), 'SRC') + library.python.fs.replace(path('src'), path('dst2')) + assert not os.path.isfile(path('src')) + assert os.path.isfile(path('dst2')) + assert file_data(path('dst2')) == 'SRC' + + +@in_env +def test_replace_file_no_src(path): + with pytest.raises(OSError) as errinfo: + library.python.fs.replace_file(path('src'), path('dst')) + assert errinfo.value.errno == errno.ENOENT + + with pytest.raises(OSError) as errinfo2: + library.python.fs.replace(path('src'), path('dst2')) + assert errinfo2.value.errno == errno.ENOENT + + +@in_env +def test_replace_file_exists(path): + mkfile(path('src'), 'SRC') + mkfile(path('dst'), 'DST') + library.python.fs.replace_file(path('src'), path('dst')) + assert not os.path.isfile(path('src')) + assert os.path.isfile(path('dst')) + assert file_data(path('dst')) == 'SRC' + + mkfile(path('src'), 'SRC') + mkfile(path('dst2'), 'DST') + library.python.fs.replace(path('src'), path('dst2')) + assert not os.path.isfile(path('src')) + assert os.path.isfile(path('dst2')) + assert file_data(path('dst2')) == 'SRC' + + +@in_env +def test_replace_file_exists_dir_empty(path): + mkfile(path('src'), 'SRC') + os.mkdir(path('dst')) + with pytest.raises(OSError) as errinfo: + library.python.fs.replace_file(path('src'), path('dst')) + assert errinfo.value.errno in (errno.EISDIR, errno.EACCES) + assert os.path.isfile(path('src')) + assert os.path.isdir(path('dst')) + assert not os.path.isfile(path('dst/src')) + + +@in_env +def test_replace_file_exists_dir_empty_overwrite(path): + mkfile(path('src'), 'SRC') + os.mkdir(path('dst')) + library.python.fs.replace(path('src'), path('dst')) + assert not os.path.isfile(path('src')) + assert os.path.isfile(path('dst')) + assert file_data(path('dst')) == 'SRC' + + +@in_env +def test_replace_file_exists_dir_nonempty(path): + mkfile(path('src'), 'SRC') + os.mkdir(path('dst')) + mkfile(path('dst/dst_file')) + with pytest.raises(OSError) as errinfo: + library.python.fs.replace_file(path('src'), path('dst')) + assert errinfo.value.errno in (errno.EISDIR, errno.EACCES) + assert os.path.isfile(path('src')) + assert os.path.isdir(path('dst')) + assert os.path.isfile(path('dst/dst_file')) + assert not os.path.isfile(path('dst/src')) + + +@in_env +def test_replace_file_exists_dir_nonempty_overwrite(path): + mkfile(path('src'), 'SRC') + os.mkdir(path('dst')) + mkfile(path('dst/dst_file')) + library.python.fs.replace(path('src'), path('dst')) + assert not os.path.isfile(path('src')) + assert os.path.isfile(path('dst')) + assert file_data(path('dst')) == 'SRC' + + +@in_env +def test_replace_dir(path): + os.mkdir(path('src')) + mkfile(path('src/src_file')) + library.python.fs.replace(path('src'), path('dst')) + assert not os.path.isdir(path('src')) + assert os.path.isdir(path('dst')) + assert os.path.isfile(path('dst/src_file')) + + +@in_env +def test_replace_dir_exists_empty(path): + os.mkdir(path('src')) + mkfile(path('src/src_file')) + os.mkdir(path('dst')) + library.python.fs.replace(path('src'), path('dst')) + assert not os.path.isdir(path('src')) + assert os.path.isdir(path('dst')) + assert os.path.isfile(path('dst/src_file')) + + +@in_env +def test_replace_dir_exists_nonempty(path): + os.mkdir(path('src')) + mkfile(path('src/src_file')) + os.mkdir(path('dst')) + mkfile(path('dst/dst_file')) + library.python.fs.replace(path('src'), path('dst')) + assert not os.path.isdir(path('src')) + assert os.path.isdir(path('dst')) + assert os.path.isfile(path('dst/src_file')) + assert not os.path.isfile(path('dst/dst_file')) + + +@in_env +def test_replace_dir_exists_file(path): + os.mkdir(path('src')) + mkfile(path('src/src_file')) + mkfile(path('dst'), 'DST') + library.python.fs.replace(path('src'), path('dst')) + assert not os.path.isdir(path('src')) + assert os.path.isdir(path('dst')) + assert os.path.isfile(path('dst/src_file')) + + +@in_env +def test_remove_file(path): + mkfile(path('path')) + library.python.fs.remove_file(path('path')) + assert not os.path.exists(path('path')) + + +@in_env +def test_remove_file_no(path): + with pytest.raises(OSError) as errinfo: + library.python.fs.remove_file(path('path')) + assert errinfo.value.errno == errno.ENOENT + + +@in_env +def test_remove_file_exists_dir(path): + os.mkdir(path('path')) + with pytest.raises(OSError) as errinfo: + library.python.fs.remove_file(path('path')) + assert errinfo.value.errno in (errno.EISDIR, errno.EACCES) + assert os.path.isdir(path('path')) + + +@in_env +def test_remove_dir(path): + os.mkdir(path('path')) + library.python.fs.remove_dir(path('path')) + assert not os.path.exists(path('path')) + + +@in_env +def test_remove_dir_no(path): + with pytest.raises(OSError) as errinfo: + library.python.fs.remove_dir(path('path')) + assert errinfo.value.errno == errno.ENOENT + + +@in_env +def test_remove_dir_exists_file(path): + mkfile(path('path')) + with pytest.raises(OSError) as errinfo: + library.python.fs.remove_dir(path('path')) + assert errinfo.value.errno in (errno.ENOTDIR, errno.EINVAL) + assert os.path.isfile(path('path')) + + +@in_env +def test_remove_tree(path): + mktree_example(path, 'path') + library.python.fs.remove_tree(path('path')) + assert not os.path.exists(path('path')) + + +@in_env +def test_remove_tree_empty(path): + os.mkdir(path('path')) + library.python.fs.remove_tree(path('path')) + assert not os.path.exists(path('path')) + + +@in_env +def test_remove_tree_file(path): + mkfile(path('path')) + library.python.fs.remove_tree(path('path')) + assert not os.path.exists(path('path')) + + +@in_env +def test_remove_tree_no(path): + with pytest.raises(OSError) as errinfo: + library.python.fs.remove_tree(path('path')) + assert errinfo.value.errno == errno.ENOENT + + +@in_env +def test_remove_tree_safe(path): + library.python.fs.remove_tree_safe(path('path')) + + +@in_env +def test_ensure_removed(path): + library.python.fs.ensure_removed(path('path')) + + +@in_env +def test_ensure_removed_exists(path): + os.makedirs(path('dir/subdir')) + library.python.fs.ensure_removed(path('dir')) + assert not os.path.exists(path('dir')) + + +@in_env +def test_ensure_removed_exists_precise(path): + os.makedirs(path('dir/subdir')) + library.python.fs.ensure_removed(path('dir/subdir')) + assert os.path.exists(path('dir')) + assert not os.path.exists(path('dir/subdir')) + + +@in_env +def test_hardlink_file(path): + mkfile(path('src'), 'SRC') + library.python.fs.hardlink(path('src'), path('dst')) + assert os.path.isfile(path('src')) + assert os.path.isfile(path('dst')) + assert file_data(path('dst')) == 'SRC' + assert inodes_unsupported() or inodes_equal(path('src'), path('dst')) + + +@in_env +def test_hardlink_file_no_src(path): + with pytest.raises(OSError) as errinfo: + library.python.fs.hardlink(path('src'), path('dst')) + assert errinfo.value.errno == errno.ENOENT + + +@in_env +def test_hardlink_file_exists(path): + mkfile(path('src'), 'SRC') + mkfile(path('dst'), 'DST') + with pytest.raises(OSError) as errinfo: + library.python.fs.hardlink(path('src'), path('dst')) + assert errinfo.value.errno == errno.EEXIST + assert os.path.isfile(path('src')) + assert os.path.isfile(path('dst')) + assert file_data(path('dst')) == 'DST' + assert inodes_unsupported() or not inodes_equal(path('src'), path('dst')) + + +@in_env +def test_hardlink_file_exists_dir(path): + mkfile(path('src'), 'SRC') + os.mkdir(path('dst')) + with pytest.raises(OSError) as errinfo: + library.python.fs.hardlink(path('src'), path('dst')) + assert errinfo.value.errno == errno.EEXIST + assert os.path.isfile(path('src')) + assert os.path.isdir(path('dst')) + assert not os.path.isfile(path('dst/src')) + + +@in_env +def test_hardlink_dir(path): + os.mkdir(path('src')) + mkfile(path('src/src_file')) + with pytest.raises(OSError) as errinfo: + library.python.fs.hardlink(path('src'), path('dst')) + assert errinfo.value.errno in (errno.EPERM, errno.EACCES) + assert os.path.isdir(path('src')) + assert not os.path.isdir(path('dst')) + + +@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows') +@in_env +def test_symlink_file(path): + mkfile(path('src'), 'SRC') + library.python.fs.symlink(path('src'), path('dst')) + assert os.path.isfile(path('src')) + assert os.path.isfile(path('dst')) + assert os.path.islink(path('dst')) + assert file_data(path('dst')) == 'SRC' + + +@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows') +@in_env +def test_symlink_file_no_src(path): + library.python.fs.symlink(path('src'), path('dst')) + assert not os.path.isfile(path('src')) + assert not os.path.isfile(path('dst')) + assert os.path.islink(path('dst')) + + +@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows') +@in_env +def test_symlink_file_exists(path): + mkfile(path('src'), 'SRC') + mkfile(path('dst'), 'DST') + with pytest.raises(OSError) as errinfo: + library.python.fs.symlink(path('src'), path('dst')) + assert errinfo.value.errno == errno.EEXIST + assert os.path.isfile(path('src')) + assert os.path.isfile(path('dst')) + assert not os.path.islink(path('dst')) + assert file_data(path('dst')) == 'DST' + + +@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows') +@in_env +def test_symlink_file_exists_dir(path): + mkfile(path('src'), 'SRC') + os.mkdir(path('dst')) + with pytest.raises(OSError) as errinfo: + library.python.fs.symlink(path('src'), path('dst')) + assert errinfo.value.errno == errno.EEXIST + assert os.path.isfile(path('src')) + assert os.path.isdir(path('dst')) + assert not os.path.islink(path('dst')) + assert not os.path.isfile(path('dst/src')) + + +@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows') +@in_env +def test_symlink_dir(path): + os.mkdir(path('src')) + mkfile(path('src/src_file')) + library.python.fs.symlink(path('src'), path('dst')) + assert os.path.isdir(path('src')) + assert os.path.isdir(path('dst')) + assert os.path.islink(path('dst')) + assert os.path.isfile(path('dst/src_file')) + + +@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows') +@in_env +def test_symlink_dir_no_src(path): + library.python.fs.symlink(path('src'), path('dst')) + assert not os.path.isdir(path('src')) + assert not os.path.isdir(path('dst')) + assert os.path.islink(path('dst')) + + +@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows') +@in_env +def test_symlink_dir_exists(path): + os.mkdir(path('src')) + mkfile(path('src/src_file')) + os.mkdir(path('dst')) + with pytest.raises(OSError) as errinfo: + library.python.fs.symlink(path('src'), path('dst')) + assert errinfo.value.errno == errno.EEXIST + assert os.path.isdir(path('src')) + assert os.path.isdir(path('dst')) + assert not os.path.islink(path('dst')) + assert not os.path.isfile(path('dst/src_file')) + + +@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows') +@in_env +def test_symlink_dir_exists_file(path): + os.mkdir(path('src')) + mkfile(path('src/src_file')) + mkfile(path('dst'), 'DST') + with pytest.raises(OSError) as errinfo: + library.python.fs.symlink(path('src'), path('dst')) + assert errinfo.value.errno == errno.EEXIST + assert os.path.isdir(path('src')) + assert os.path.isfile(path('dst')) + assert not os.path.islink(path('dst')) + + +@in_env +def test_hardlink_tree(path): + mktree_example(path, 'src') + library.python.fs.hardlink_tree(path('src'), path('dst')) + assert trees_equal(path('src'), path('dst')) + + +@in_env +def test_hardlink_tree_empty(path): + os.mkdir(path('src')) + library.python.fs.hardlink_tree(path('src'), path('dst')) + assert trees_equal(path('src'), path('dst')) + + +@in_env +def test_hardlink_tree_file(path): + mkfile(path('src'), 'SRC') + library.python.fs.hardlink_tree(path('src'), path('dst')) + assert trees_equal(path('src'), path('dst')) + + +@in_env +def test_hardlink_tree_no_src(path): + with pytest.raises(OSError) as errinfo: + library.python.fs.hardlink_tree(path('src'), path('dst')) + assert errinfo.value.errno == errno.ENOENT + + +@in_env +def test_hardlink_tree_exists(path): + mktree_example(path, 'src') + os.mkdir(path('dst_dir')) + with pytest.raises(OSError) as errinfo: + library.python.fs.hardlink_tree(path('src'), path('dst_dir')) + assert errinfo.value.errno == errno.EEXIST + mkfile(path('dst_file'), 'DST') + with pytest.raises(OSError) as errinfo: + library.python.fs.hardlink_tree(path('src'), path('dst_file')) + assert errinfo.value.errno == errno.EEXIST + + +@in_env +def test_hardlink_tree_file_exists(path): + mkfile(path('src'), 'SRC') + os.mkdir(path('dst_dir')) + with pytest.raises(OSError) as errinfo: + library.python.fs.hardlink_tree(path('src'), path('dst_dir')) + assert errinfo.value.errno == errno.EEXIST + mkfile(path('dst_file'), 'DST') + with pytest.raises(OSError) as errinfo: + library.python.fs.hardlink_tree(path('src'), path('dst_file')) + assert errinfo.value.errno == errno.EEXIST + + +@in_env +def test_copy_file(path): + mkfile(path('src'), 'SRC') + library.python.fs.copy_file(path('src'), path('dst')) + assert os.path.isfile(path('src')) + assert os.path.isfile(path('dst')) + assert file_data(path('dst')) == 'SRC' + + +@in_env +def test_copy_file_no_src(path): + with pytest.raises(EnvironmentError): + library.python.fs.copy_file(path('src'), path('dst')) + + +@in_env +def test_copy_file_exists(path): + mkfile(path('src'), 'SRC') + mkfile(path('dst'), 'DST') + library.python.fs.copy_file(path('src'), path('dst')) + assert os.path.isfile(path('src')) + assert os.path.isfile(path('dst')) + assert file_data(path('dst')) == 'SRC' + + +@in_env +def test_copy_file_exists_dir_empty(path): + mkfile(path('src'), 'SRC') + os.mkdir(path('dst')) + with pytest.raises(EnvironmentError): + library.python.fs.copy_file(path('src'), path('dst')) + assert os.path.isfile(path('src')) + assert os.path.isdir(path('dst')) + assert not os.path.isfile(path('dst/src')) + + +@in_env +def test_copy_file_exists_dir_nonempty(path): + mkfile(path('src'), 'SRC') + os.mkdir(path('dst')) + mkfile(path('dst/dst_file')) + with pytest.raises(EnvironmentError): + library.python.fs.copy_file(path('src'), path('dst')) + assert os.path.isfile(path('src')) + assert os.path.isdir(path('dst')) + assert os.path.isfile(path('dst/dst_file')) + assert not os.path.isfile(path('dst/src')) + + +@in_env +def test_copy_tree(path): + mktree_example(path, 'src') + library.python.fs.copy_tree(path('src'), path('dst')) + assert trees_equal(path('src'), path('dst')) + + +@in_env +def test_copy_tree_empty(path): + os.mkdir(path('src')) + library.python.fs.copy_tree(path('src'), path('dst')) + assert trees_equal(path('src'), path('dst')) + + +@in_env +def test_copy_tree_file(path): + mkfile(path('src'), 'SRC') + library.python.fs.copy_tree(path('src'), path('dst')) + assert trees_equal(path('src'), path('dst')) + + +@in_env +def test_copy_tree_no_src(path): + with pytest.raises(EnvironmentError): + library.python.fs.copy_tree(path('src'), path('dst')) + + +@in_env +def test_copy_tree_exists(path): + mktree_example(path, 'src') + os.mkdir(path('dst_dir')) + with pytest.raises(EnvironmentError): + library.python.fs.copy_tree(path('src'), path('dst_dir')) + mkfile(path('dst_file'), 'DST') + with pytest.raises(EnvironmentError): + library.python.fs.copy_tree(path('src'), path('dst_file')) + + +@in_env +def test_copy_tree_file_exists(path): + mkfile(path('src'), 'SRC') + os.mkdir(path('dst_dir')) + with pytest.raises(EnvironmentError): + library.python.fs.copy_tree(path('src'), path('dst_dir')) + mkfile(path('dst_file'), 'DST') + library.python.fs.copy_tree(path('src'), path('dst_file')) + assert trees_equal(path('src'), path('dst_file')) + + +@in_env +def test_read_file(path): + mkfile(path('src'), 'SRC') + assert library.python.fs.read_file(path('src')).decode(library.python.strings.fs_encoding()) == 'SRC' + assert library.python.fs.read_file(path('src'), binary=False) == 'SRC' + + +@in_env +def test_read_file_empty(path): + mkfile(path('src')) + assert library.python.fs.read_file(path('src')).decode(library.python.strings.fs_encoding()) == '' + assert library.python.fs.read_file(path('src'), binary=False) == '' + + +@in_env +def test_read_file_multiline(path): + mkfile(path('src'), 'SRC line 1\nSRC line 2\n') + assert ( + library.python.fs.read_file(path('src')).decode(library.python.strings.fs_encoding()) + == 'SRC line 1\nSRC line 2\n' + ) + assert library.python.fs.read_file(path('src'), binary=False) == 'SRC line 1\nSRC line 2\n' + + +@in_env +def test_read_file_multiline_crlf(path): + mkfile(path('src'), 'SRC line 1\r\nSRC line 2\r\n') + assert ( + library.python.fs.read_file(path('src')).decode(library.python.strings.fs_encoding()) + == 'SRC line 1\r\nSRC line 2\r\n' + ) + if library.python.windows.on_win() or six.PY3: # universal newlines are by default in text mode in python3 + assert library.python.fs.read_file(path('src'), binary=False) == 'SRC line 1\nSRC line 2\n' + else: + assert library.python.fs.read_file(path('src'), binary=False) == 'SRC line 1\r\nSRC line 2\r\n' + + +@in_env +def test_read_file_unicode(path): + s = u'АБВ' + mkfile(path('src'), s.encode('utf-8')) + mkfile(path('src_cp1251'), s.encode('cp1251')) + assert library.python.fs.read_file_unicode(path('src')) == s + assert library.python.fs.read_file_unicode(path('src_cp1251'), enc='cp1251') == s + assert library.python.fs.read_file_unicode(path('src'), binary=False) == s + assert library.python.fs.read_file_unicode(path('src_cp1251'), binary=False, enc='cp1251') == s + + +@in_env +def test_read_file_unicode_empty(path): + mkfile(path('src')) + mkfile(path('src_cp1251')) + assert library.python.fs.read_file_unicode(path('src')) == '' + assert library.python.fs.read_file_unicode(path('src_cp1251'), enc='cp1251') == '' + assert library.python.fs.read_file_unicode(path('src'), binary=False) == '' + assert library.python.fs.read_file_unicode(path('src_cp1251'), binary=False, enc='cp1251') == '' + + +@in_env +def test_read_file_unicode_multiline(path): + s = u'АБВ\nИ еще\n' + mkfile(path('src'), s.encode('utf-8')) + mkfile(path('src_cp1251'), s.encode('cp1251')) + assert library.python.fs.read_file_unicode(path('src')) == s + assert library.python.fs.read_file_unicode(path('src_cp1251'), enc='cp1251') == s + assert library.python.fs.read_file_unicode(path('src'), binary=False) == s + assert library.python.fs.read_file_unicode(path('src_cp1251'), binary=False, enc='cp1251') == s + + +@in_env +def test_read_file_unicode_multiline_crlf(path): + s = u'АБВ\r\nИ еще\r\n' + mkfile(path('src'), s.encode('utf-8')) + mkfile(path('src_cp1251'), s.encode('cp1251')) + assert library.python.fs.read_file_unicode(path('src')) == s + assert library.python.fs.read_file_unicode(path('src_cp1251'), enc='cp1251') == s + if library.python.windows.on_win() or six.PY3: # universal newlines are by default in text mode in python3 + assert library.python.fs.read_file_unicode(path('src'), binary=False) == u'АБВ\nИ еще\n' + assert library.python.fs.read_file_unicode(path('src_cp1251'), binary=False, enc='cp1251') == u'АБВ\nИ еще\n' + else: + assert library.python.fs.read_file_unicode(path('src'), binary=False) == s + assert library.python.fs.read_file_unicode(path('src_cp1251'), binary=False, enc='cp1251') == s + + +@in_env +def test_write_file(path): + library.python.fs.write_file(path('src'), 'SRC') + assert file_data(path('src')) == 'SRC' + library.python.fs.write_file(path('src2'), 'SRC', binary=False) + assert file_data(path('src2')) == 'SRC' + + +@in_env +def test_write_file_empty(path): + library.python.fs.write_file(path('src'), '') + assert file_data(path('src')) == '' + library.python.fs.write_file(path('src2'), '', binary=False) + assert file_data(path('src2')) == '' + + +@in_env +def test_write_file_multiline(path): + library.python.fs.write_file(path('src'), 'SRC line 1\nSRC line 2\n') + assert file_data(path('src')) == 'SRC line 1\nSRC line 2\n' + library.python.fs.write_file(path('src2'), 'SRC line 1\nSRC line 2\n', binary=False) + if library.python.windows.on_win(): + assert file_data(path('src2')) == 'SRC line 1\r\nSRC line 2\r\n' + else: + assert file_data(path('src2')) == 'SRC line 1\nSRC line 2\n' + + +@in_env +def test_write_file_multiline_crlf(path): + library.python.fs.write_file(path('src'), 'SRC line 1\r\nSRC line 2\r\n') + assert file_data(path('src')) == 'SRC line 1\r\nSRC line 2\r\n' + library.python.fs.write_file(path('src2'), 'SRC line 1\r\nSRC line 2\r\n', binary=False) + if library.python.windows.on_win(): + assert file_data(path('src2')) == 'SRC line 1\r\r\nSRC line 2\r\r\n' + else: + assert file_data(path('src2')) == 'SRC line 1\r\nSRC line 2\r\n' + + +@in_env +def test_get_file_size(path): + mkfile(path('one.txt'), '22') + assert library.python.fs.get_file_size(path('one.txt')) == 2 + + +@in_env +def test_get_file_size_empty(path): + mkfile(path('one.txt')) + assert library.python.fs.get_file_size(path('one.txt')) == 0 + + +@in_env +def test_get_tree_size(path): + os.makedirs(path('deeper')) + mkfile(path('one.txt'), '1') + mkfile(path('deeper/two.txt'), '22') + assert library.python.fs.get_tree_size(path('one.txt')) == 1 + assert library.python.fs.get_tree_size(path('')) == 1 + assert library.python.fs.get_tree_size(path(''), recursive=True) == 3 + + +@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows') +@in_env +def test_get_tree_size_dangling_symlink(path): + os.makedirs(path('deeper')) + mkfile(path('one.txt'), '1') + mkfile(path('deeper/two.txt'), '22') + os.symlink(path('deeper/two.txt'), path("deeper/link.txt")) + os.remove(path('deeper/two.txt')) + # does not fail + assert library.python.fs.get_tree_size(path(''), recursive=True) == 1 + + +@pytest.mark.skipif(not library.python.windows.on_win(), reason='Test hardlinks on windows') +def test_hardlink_or_copy(): + max_allowed_hard_links = 1023 + + def run(hardlink_function, dir): + src = r"test.txt" + with open(src, "w") as f: + f.write("test") + for i in range(max_allowed_hard_links + 1): + hardlink_function(src, os.path.join(dir, "{}.txt".format(i))) + + dir1 = library.python.fs.create_dirs("one") + with pytest.raises(WindowsError) as e: + run(library.python.fs.hardlink, dir1) + assert e.value.winerror == 1142 + assert len(os.listdir(dir1)) == max_allowed_hard_links + + dir2 = library.python.fs.create_dirs("two") + run(library.python.fs.hardlink_or_copy, dir2) + assert len(os.listdir(dir2)) == max_allowed_hard_links + 1 + + +def test_remove_tree_unicode(): + path = u"test_remove_tree_unicode/русский".encode("utf-8") + os.makedirs(path) + library.python.fs.remove_tree(six.text_type("test_remove_tree_unicode")) + assert not os.path.exists("test_remove_tree_unicode") + + +def test_remove_tree_safe_unicode(): + path = u"test_remove_tree_safe_unicode/русский".encode("utf-8") + os.makedirs(path) + library.python.fs.remove_tree_safe(six.text_type("test_remove_tree_safe_unicode")) + assert not os.path.exists("test_remove_tree_safe_unicode") + + +def test_copy_tree_custom_copy_function(): + library.python.fs.create_dirs("test_copy_tree_src/deepper/inner") + library.python.fs.write_file("test_copy_tree_src/deepper/deepper.txt", "deepper.txt") + library.python.fs.write_file("test_copy_tree_src/deepper/inner/inner.txt", "inner.txt") + copied = [] + + def copy_function(src, dst): + shutil.copy2(src, dst) + copied.append(dst) + + library.python.fs.copy_tree( + "test_copy_tree_src", yatest.common.work_path("test_copy_tree_dst"), copy_function=copy_function + ) + assert len(copied) == 2 + assert yatest.common.work_path("test_copy_tree_dst/deepper/deepper.txt") in copied + assert yatest.common.work_path("test_copy_tree_dst/deepper/inner/inner.txt") in copied + + +def test_copy2(): + library.python.fs.symlink("non-existent", "link") + library.python.fs.copy2("link", "link2", follow_symlinks=False) + + assert os.path.islink("link2") + assert os.readlink("link2") == "non-existent" + + +def test_commonpath(): + pj = os.path.join + pja = lambda *x: os.path.abspath(pj(*x)) + + assert library.python.fs.commonpath(['a', 'b']) == '' + assert library.python.fs.commonpath([pj('t', '1')]) == pj('t', '1') + assert library.python.fs.commonpath([pj('t', '1'), pj('t', '2')]) == pj('t') + assert library.python.fs.commonpath([pj('t', '1', '2'), pj('t', '1', '2')]) == pj('t', '1', '2') + assert library.python.fs.commonpath([pj('t', '1', '1'), pj('t', '1', '2')]) == pj('t', '1') + assert library.python.fs.commonpath([pj('t', '1', '1'), pj('t', '1', '2'), pj('t', '1', '3')]) == pj('t', '1') + + assert library.python.fs.commonpath([pja('t', '1', '1'), pja('t', '1', '2')]) == pja('t', '1') + + assert library.python.fs.commonpath({pj('t', '1'), pj('t', '2')}) == pj('t') diff --git a/library/python/fs/test/ya.make b/library/python/fs/test/ya.make new file mode 100644 index 0000000000..33e3f5b4ff --- /dev/null +++ b/library/python/fs/test/ya.make @@ -0,0 +1,14 @@ +OWNER(g:yatool) + +PY23_TEST() + +TEST_SRCS( + test_fs.py +) + +PEERDIR( + library/python/fs + library/python/tmp +) + +END() diff --git a/library/python/fs/ya.make b/library/python/fs/ya.make new file mode 100644 index 0000000000..b3c5092c71 --- /dev/null +++ b/library/python/fs/ya.make @@ -0,0 +1,23 @@ +OWNER(g:yatool) + +PY23_LIBRARY() + +PY_SRCS( + __init__.py +) + +IF (OS_DARWIN) + PY_SRCS( + clonefile.pyx + ) +ENDIF() + +PEERDIR( + library/python/func + library/python/strings + library/python/windows +) + +END() + +RECURSE_FOR_TESTS(test) |