aboutsummaryrefslogtreecommitdiffstats
path: root/library/python/fs
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/python/fs
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/python/fs')
-rw-r--r--library/python/fs/__init__.py501
-rw-r--r--library/python/fs/clonefile.pyx18
-rw-r--r--library/python/fs/test/test_fs.py1037
-rw-r--r--library/python/fs/test/ya.make14
-rw-r--r--library/python/fs/ya.make23
5 files changed, 1593 insertions, 0 deletions
diff --git a/library/python/fs/__init__.py b/library/python/fs/__init__.py
new file mode 100644
index 0000000000..b1b7cde079
--- /dev/null
+++ b/library/python/fs/__init__.py
@@ -0,0 +1,501 @@
+# coding: utf-8
+
+import codecs
+import errno
+import logging
+import os
+import random
+import shutil
+import six
+import stat
+import sys
+
+import library.python.func
+import library.python.strings
+import library.python.windows
+
+logger = logging.getLogger(__name__)
+
+
+try:
+ WindowsError
+except NameError:
+ WindowsError = None
+
+
+_diehard_win_tries = 10
+errorfix_win = library.python.windows.errorfix
+
+
+class CustomFsError(OSError):
+ def __init__(self, errno, message='', filename=None):
+ super(CustomFsError, self).__init__(message)
+ self.errno = errno
+ self.strerror = os.strerror(errno)
+ self.filename = filename
+
+
+# Directories creation
+# If dst is already exists and is a directory - does nothing
+# Throws OSError
+@errorfix_win
+def ensure_dir(path):
+ try:
+ os.makedirs(path)
+ except OSError as e:
+ if e.errno != errno.EEXIST or not os.path.isdir(path):
+ raise
+
+
+# Directories creation
+# If dst is already exists and is a directory - does nothing
+# Returns path
+# Throws OSError
+@errorfix_win
+def create_dirs(path):
+ ensure_dir(path)
+ return path
+
+
+# Atomic file/directory move (rename)
+# Doesn't guarantee dst replacement
+# Atomic if no device boundaries are crossed
+# Depends on ctypes on Windows
+# Throws OSError
+# On Unix, if dst exists:
+# if dst is file or empty dir - replaces it
+# if src is dir and dst is not dir - throws OSError (errno ENOTDIR)
+# if src is dir and dst is non-empty dir - throws OSError (errno ENOTEMPTY)
+# if src is file and dst is dir - throws OSError (errno EISDIR)
+# On Windows, if dst exists - throws OSError (errno EEXIST)
+@errorfix_win
+@library.python.windows.diehard(library.python.windows.RETRIABLE_FILE_ERRORS, tries=_diehard_win_tries)
+def move(src, dst):
+ os.rename(src, dst)
+
+
+# Atomic replacing file move (rename)
+# Replaces dst if exists and not a dir
+# Doesn't guarantee dst dir replacement
+# Atomic if no device boundaries are crossed
+# Depends on ctypes on Windows
+# Throws OSError
+# On Unix, if dst exists:
+# if dst is file - replaces it
+# if dst is dir - throws OSError (errno EISDIR)
+# On Windows, if dst exists:
+# if dst is file - replaces it
+# if dst is dir - throws OSError (errno EACCES)
+@errorfix_win
+@library.python.windows.diehard(library.python.windows.RETRIABLE_FILE_ERRORS, tries=_diehard_win_tries)
+def replace_file(src, dst):
+ if library.python.windows.on_win():
+ library.python.windows.replace_file(src, dst)
+ else:
+ os.rename(src, dst)
+
+
+# File/directory replacing move (rename)
+# Removes dst if exists
+# Non-atomic
+# Depends on ctypes on Windows
+# Throws OSError
+@errorfix_win
+def replace(src, dst):
+ try:
+ move(src, dst)
+ except OSError as e:
+ if e.errno not in (errno.EEXIST, errno.EISDIR, errno.ENOTDIR, errno.ENOTEMPTY):
+ raise
+ remove_tree(dst)
+ move(src, dst)
+
+
+# Atomic file remove
+# Throws OSError
+@errorfix_win
+@library.python.windows.diehard(library.python.windows.RETRIABLE_FILE_ERRORS, tries=_diehard_win_tries)
+def remove_file(path):
+ os.remove(path)
+
+
+# Atomic empty directory remove
+# Throws OSError
+@errorfix_win
+@library.python.windows.diehard(library.python.windows.RETRIABLE_DIR_ERRORS, tries=_diehard_win_tries)
+def remove_dir(path):
+ os.rmdir(path)
+
+
+def fix_path_encoding(path):
+ return library.python.strings.to_str(path, library.python.strings.fs_encoding())
+
+
+# File/directory remove
+# Non-atomic
+# Throws OSError, AssertionError
+@errorfix_win
+def remove_tree(path):
+ @library.python.windows.diehard(library.python.windows.RETRIABLE_DIR_ERRORS, tries=_diehard_win_tries)
+ def rmtree(path):
+ if library.python.windows.on_win():
+ library.python.windows.rmtree(path)
+ else:
+ shutil.rmtree(fix_path_encoding(path))
+
+ st = os.lstat(path)
+ if stat.S_ISLNK(st.st_mode) or stat.S_ISREG(st.st_mode):
+ remove_file(path)
+ elif stat.S_ISDIR(st.st_mode):
+ rmtree(path)
+ else:
+ assert False
+
+
+# File/directory remove ignoring errors
+# Non-atomic
+@errorfix_win
+def remove_tree_safe(path):
+ try:
+ st = os.lstat(path)
+ if stat.S_ISLNK(st.st_mode) or stat.S_ISREG(st.st_mode):
+ os.remove(path)
+ elif stat.S_ISDIR(st.st_mode):
+ shutil.rmtree(fix_path_encoding(path), ignore_errors=True)
+ # XXX
+ except UnicodeDecodeError as e:
+ logging.exception(u'remove_tree_safe with argument %s raise exception: %s', path, e)
+ raise
+ except OSError:
+ pass
+
+
+# File/directory remove
+# If path doesn't exist - does nothing
+# Non-atomic
+# Throws OSError, AssertionError
+@errorfix_win
+def ensure_removed(path):
+ try:
+ remove_tree(path)
+ except OSError as e:
+ if e.errno != errno.ENOENT:
+ raise
+
+
+# Atomic file hardlink
+# Dst must not exist
+# Depends on ctypes on Windows
+# Throws OSError
+# If dst exists - throws OSError (errno EEXIST)
+@errorfix_win
+def hardlink(src, lnk):
+ if library.python.windows.on_win():
+ library.python.windows.hardlink(src, lnk)
+ else:
+ os.link(src, lnk)
+
+
+@errorfix_win
+def hardlink_or_copy(src, lnk):
+ def should_fallback_to_copy(exc):
+ if WindowsError is not None and isinstance(exc, WindowsError) and exc.winerror == 1142: # too many hardlinks
+ return True
+ # cross-device hardlink or too many hardlinks, or some known WSL error
+ if isinstance(exc, OSError) and exc.errno in (
+ errno.EXDEV,
+ errno.EMLINK,
+ errno.EINVAL,
+ errno.EACCES,
+ errno.EPERM,
+ ):
+ return True
+ return False
+
+ try:
+ hardlink(src, lnk)
+ except Exception as e:
+ logger.debug('Failed to hardlink %s to %s with error %s, will copy it', src, lnk, repr(e))
+ if should_fallback_to_copy(e):
+ copy2(src, lnk, follow_symlinks=False)
+ else:
+ raise
+
+
+# Atomic file/directory symlink (Unix only)
+# Dst must not exist
+# Throws OSError
+# If dst exists - throws OSError (errno EEXIST)
+@errorfix_win
+def symlink(src, lnk):
+ if library.python.windows.on_win():
+ library.python.windows.run_disabled(src, lnk)
+ else:
+ os.symlink(src, lnk)
+
+
+# shutil.copy2 with follow_symlinks=False parameter (Unix only)
+def copy2(src, lnk, follow_symlinks=True):
+ if six.PY3:
+ shutil.copy2(src, lnk, follow_symlinks=follow_symlinks)
+ return
+
+ if follow_symlinks or not os.path.islink(src):
+ shutil.copy2(src, lnk)
+ return
+
+ symlink(os.readlink(src), lnk)
+
+
+# Recursively hardlink directory
+# Uses plain hardlink for files
+# Dst must not exist
+# Non-atomic
+# Throws OSError
+@errorfix_win
+def hardlink_tree(src, dst):
+ if not os.path.exists(src):
+ raise CustomFsError(errno.ENOENT, filename=src)
+ if os.path.isfile(src):
+ hardlink(src, dst)
+ return
+ for dirpath, _, filenames in walk_relative(src):
+ src_dirpath = os.path.join(src, dirpath) if dirpath != '.' else src
+ dst_dirpath = os.path.join(dst, dirpath) if dirpath != '.' else dst
+ os.mkdir(dst_dirpath)
+ for filename in filenames:
+ hardlink(os.path.join(src_dirpath, filename), os.path.join(dst_dirpath, filename))
+
+
+# File copy
+# throws EnvironmentError (OSError, IOError)
+@errorfix_win
+def copy_file(src, dst, copy_function=shutil.copy2):
+ if os.path.isdir(dst):
+ raise CustomFsError(errno.EISDIR, filename=dst)
+ copy_function(src, dst)
+
+
+# File/directory copy
+# throws EnvironmentError (OSError, IOError, shutil.Error)
+@errorfix_win
+def copy_tree(src, dst, copy_function=shutil.copy2):
+ if os.path.isfile(src):
+ copy_file(src, dst, copy_function=copy_function)
+ return
+ copytree3(src, dst, copy_function=copy_function)
+
+
+# File read
+# Throws OSError
+@errorfix_win
+def read_file(path, binary=True):
+ with open(path, 'r' + ('b' if binary else '')) as f:
+ return f.read()
+
+
+# Decoding file read
+# Throws OSError
+@errorfix_win
+def read_file_unicode(path, binary=True, enc='utf-8'):
+ if not binary:
+ if six.PY2:
+ with open(path, 'r') as f:
+ return library.python.strings.to_unicode(f.read(), enc)
+ else:
+ with open(path, 'r', encoding=enc) as f:
+ return f.read()
+ # codecs.open is always binary
+ with codecs.open(path, 'r', encoding=enc, errors=library.python.strings.ENCODING_ERRORS_POLICY) as f:
+ return f.read()
+
+
+@errorfix_win
+def open_file(*args, **kwargs):
+ return (
+ library.python.windows.open_file(*args, **kwargs) if library.python.windows.on_win() else open(*args, **kwargs)
+ )
+
+
+# Atomic file write
+# Throws OSError
+@errorfix_win
+def write_file(path, data, binary=True):
+ dir_path = os.path.dirname(path)
+ if dir_path:
+ ensure_dir(dir_path)
+ tmp_path = path + '.tmp.' + str(random.random())
+ with open_file(tmp_path, 'w' + ('b' if binary else '')) as f:
+ if not isinstance(data, bytes) and binary:
+ data = data.encode('UTF-8')
+ f.write(data)
+ replace_file(tmp_path, path)
+
+
+# File size
+# Throws OSError
+@errorfix_win
+def get_file_size(path):
+ return os.path.getsize(path)
+
+
+# File/directory size
+# Non-recursive mode for directory counts size for immediates
+# While raise_all_errors is set to False, file size fallbacks to zero in case of getsize errors
+# Throws OSError
+@errorfix_win
+def get_tree_size(path, recursive=False, raise_all_errors=False):
+ if os.path.isfile(path):
+ return get_file_size(path)
+ total_size = 0
+ for dir_path, _, files in os.walk(path):
+ for f in files:
+ fp = os.path.join(dir_path, f)
+ try:
+ total_size += get_file_size(fp)
+ except OSError as e:
+ if raise_all_errors:
+ raise
+ logger.debug("Cannot calculate file size: %s", e)
+ if not recursive:
+ break
+ return total_size
+
+
+# Directory copy ported from Python 3
+def copytree3(
+ src,
+ dst,
+ symlinks=False,
+ ignore=None,
+ copy_function=shutil.copy2,
+ ignore_dangling_symlinks=False,
+ dirs_exist_ok=False,
+):
+ """Recursively copy a directory tree.
+
+ The copytree3 is a port of shutil.copytree function from python-3.2.
+ It has additional useful parameters and may be helpful while we are
+ on python-2.x. It has to be removed as soon as we have moved to
+ python-3.2 or higher.
+
+ The destination directory must not already exist.
+ If exception(s) occur, an Error is raised with a list of reasons.
+
+ If the optional symlinks flag is true, symbolic links in the
+ source tree result in symbolic links in the destination tree; if
+ it is false, the contents of the files pointed to by symbolic
+ links are copied. If the file pointed by the symlink doesn't
+ exist, an exception will be added in the list of errors raised in
+ an Error exception at the end of the copy process.
+
+ You can set the optional ignore_dangling_symlinks flag to true if you
+ want to silence this exception. Notice that this has no effect on
+ platforms that don't support os.symlink.
+
+ The optional ignore argument is a callable. If given, it
+ is called with the `src` parameter, which is the directory
+ being visited by copytree3(), and `names` which is the list of
+ `src` contents, as returned by os.listdir():
+
+ callable(src, names) -> ignored_names
+
+ Since copytree3() is called recursively, the callable will be
+ called once for each directory that is copied. It returns a
+ list of names relative to the `src` directory that should
+ not be copied.
+
+ The optional copy_function argument is a callable that will be used
+ to copy each file. It will be called with the source path and the
+ destination path as arguments. By default, copy2() is used, but any
+ function that supports the same signature (like copy()) can be used.
+
+ """
+ names = os.listdir(src)
+ if ignore is not None:
+ ignored_names = ignore(src, names)
+ else:
+ ignored_names = set()
+
+ if not (dirs_exist_ok and os.path.isdir(dst)):
+ os.makedirs(dst)
+
+ errors = []
+ for name in names:
+ if name in ignored_names:
+ continue
+ srcname = os.path.join(src, name)
+ dstname = os.path.join(dst, name)
+ try:
+ if os.path.islink(srcname):
+ linkto = os.readlink(srcname)
+ if symlinks:
+ # We can't just leave it to `copy_function` because legacy
+ # code with a custom `copy_function` may rely on copytree3
+ # doing the right thing.
+ os.symlink(linkto, dstname)
+ else:
+ # ignore dangling symlink if the flag is on
+ if not os.path.exists(linkto) and ignore_dangling_symlinks:
+ continue
+ # otherwise let the copy occurs. copy2 will raise an error
+ copy_function(srcname, dstname)
+ elif os.path.isdir(srcname):
+ copytree3(srcname, dstname, symlinks, ignore, copy_function, dirs_exist_ok=dirs_exist_ok)
+ else:
+ # Will raise a SpecialFileError for unsupported file types
+ copy_function(srcname, dstname)
+ # catch the Error from the recursive copytree3 so that we can
+ # continue with other files
+ except shutil.Error as err:
+ errors.extend(err.args[0])
+ except EnvironmentError as why:
+ errors.append((srcname, dstname, str(why)))
+ try:
+ shutil.copystat(src, dst)
+ except OSError as why:
+ if WindowsError is not None and isinstance(why, WindowsError):
+ # Copying file access times may fail on Windows
+ pass
+ else:
+ errors.extend((src, dst, str(why)))
+ if errors:
+ raise shutil.Error(errors)
+
+
+def walk_relative(path, topdown=True, onerror=None, followlinks=False):
+ for dirpath, dirnames, filenames in os.walk(path, topdown=topdown, onerror=onerror, followlinks=followlinks):
+ yield os.path.relpath(dirpath, path), dirnames, filenames
+
+
+def supports_clone():
+ if 'darwin' in sys.platform:
+ import platform
+
+ return list(map(int, platform.mac_ver()[0].split('.'))) >= [10, 13]
+ return False
+
+
+def commonpath(paths):
+ assert paths
+ if len(paths) == 1:
+ return next(iter(paths))
+
+ split_paths = [path.split(os.sep) for path in paths]
+ smin = min(split_paths)
+ smax = max(split_paths)
+
+ common = smin
+ for i, c in enumerate(smin):
+ if c != smax[i]:
+ common = smin[:i]
+ break
+
+ return os.path.sep.join(common)
+
+
+def set_execute_bits(filename):
+ stm = os.stat(filename).st_mode
+ exe = stm | 0o111
+ if stm != exe:
+ os.chmod(filename, exe)
diff --git a/library/python/fs/clonefile.pyx b/library/python/fs/clonefile.pyx
new file mode 100644
index 0000000000..830bb894f2
--- /dev/null
+++ b/library/python/fs/clonefile.pyx
@@ -0,0 +1,18 @@
+import six
+
+cdef extern from "sys/clonefile.h" nogil:
+ int clonefile(const char * src, const char * dst, int flags)
+
+cdef extern from "Python.h":
+ ctypedef struct PyObject
+ cdef PyObject *PyExc_OSError
+ PyObject *PyErr_SetFromErrno(PyObject *)
+
+cdef int _macos_clone_file(const char* src, const char* dst) except? 0:
+ if clonefile(src, dst, 0) == -1:
+ PyErr_SetFromErrno(PyExc_OSError)
+ return 0
+ return 1
+
+def macos_clone_file(src, dst):
+ return _macos_clone_file(six.ensure_binary(src), six.ensure_binary(dst)) != 0
diff --git a/library/python/fs/test/test_fs.py b/library/python/fs/test/test_fs.py
new file mode 100644
index 0000000000..9e2c70c069
--- /dev/null
+++ b/library/python/fs/test/test_fs.py
@@ -0,0 +1,1037 @@
+# coding=utf-8
+
+import errno
+import os
+import pytest
+import shutil
+import six
+
+import library.python.fs
+import library.python.strings
+import library.python.tmp
+import library.python.windows
+
+import yatest.common
+
+
+def in_env(case):
+ def wrapped_case(*args, **kwargs):
+ with library.python.tmp.temp_dir() as temp_dir:
+ case(lambda path: os.path.join(temp_dir, path))
+
+ return wrapped_case
+
+
+def mkfile(path, data=''):
+ with open(path, 'wb') as f:
+ if data:
+ f.write(data) if isinstance(data, six.binary_type) else f.write(
+ data.encode(library.python.strings.fs_encoding())
+ )
+
+
+def mktree_example(path, name):
+ os.mkdir(path(name))
+ mkfile(path(name + '/file1'), 'FILE1')
+ os.mkdir(path(name + '/dir1'))
+ os.mkdir(path(name + '/dir2'))
+ mkfile(path(name + '/dir2/file2'), 'FILE2')
+ mkfile(path(name + '/dir2/file3'), 'FILE3')
+
+
+def file_data(path):
+ with open(path, 'rb') as f:
+ return f.read().decode('utf-8')
+
+
+def serialize_tree(path):
+ if os.path.isfile(path):
+ return file_data(path)
+ data = {'dirs': set(), 'files': {}}
+ for dirpath, dirnames, filenames in os.walk(path):
+ dirpath_rel = os.path.relpath(dirpath, path)
+ if dirpath_rel == '.':
+ dirpath_rel = ''
+ data['dirs'].update(set(os.path.join(dirpath_rel, x) for x in dirnames))
+ data['files'].update({os.path.join(dirpath_rel, x): file_data(os.path.join(dirpath, x)) for x in filenames})
+ return data
+
+
+def trees_equal(dir1, dir2):
+ return serialize_tree(dir1) == serialize_tree(dir2)
+
+
+def inodes_unsupported():
+ return library.python.windows.on_win()
+
+
+def inodes_equal(path1, path2):
+ return os.stat(path1).st_ino == os.stat(path2).st_ino
+
+
+def gen_error_access_denied():
+ if library.python.windows.on_win():
+ err = WindowsError()
+ err.errno = errno.EACCES
+ err.strerror = ''
+ err.winerror = library.python.windows.ERRORS['ACCESS_DENIED']
+ else:
+ err = OSError()
+ err.errno = errno.EACCES
+ err.strerror = os.strerror(err.errno)
+ err.filename = 'unknown/file'
+ raise err
+
+
+def test_errorfix_win():
+ @library.python.fs.errorfix_win
+ def erroneous_func():
+ gen_error_access_denied()
+
+ with pytest.raises(OSError) as errinfo:
+ erroneous_func()
+ assert errinfo.value.errno == errno.EACCES
+ assert errinfo.value.filename == 'unknown/file'
+ # See transcode_error, which encodes strerror, in library/python/windows/__init__.py
+ assert isinstance(errinfo.value.strerror, (six.binary_type, six.text_type))
+ assert errinfo.value.strerror
+
+
+def test_custom_fs_error():
+ with pytest.raises(OSError) as errinfo:
+ raise library.python.fs.CustomFsError(errno.EACCES, filename='some/file')
+ assert errinfo.value.errno == errno.EACCES
+ # See transcode_error, which encodes strerror, in library/python/windows/__init__.py
+ assert isinstance(errinfo.value.strerror, (six.binary_type, six.text_type))
+ assert errinfo.value.filename == 'some/file'
+
+
+@in_env
+def test_ensure_dir(path):
+ library.python.fs.ensure_dir(path('dir/subdir'))
+ assert os.path.isdir(path('dir'))
+ assert os.path.isdir(path('dir/subdir'))
+
+
+@in_env
+def test_ensure_dir_exists(path):
+ os.makedirs(path('dir/subdir'))
+ library.python.fs.ensure_dir(path('dir/subdir'))
+ assert os.path.isdir(path('dir'))
+ assert os.path.isdir(path('dir/subdir'))
+
+
+@in_env
+def test_ensure_dir_exists_partly(path):
+ os.mkdir(path('dir'))
+ library.python.fs.ensure_dir(path('dir/subdir'))
+ assert os.path.isdir(path('dir'))
+ assert os.path.isdir(path('dir/subdir'))
+
+
+@in_env
+def test_ensure_dir_exists_file(path):
+ mkfile(path('dir'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.ensure_dir(path('dir/subdir'))
+ # ENOENT on Windows!
+ assert errinfo.value.errno in (errno.ENOTDIR, errno.ENOENT)
+ assert os.path.isfile(path('dir'))
+
+
+@in_env
+def test_create_dirs(path):
+ assert library.python.fs.create_dirs(path('dir/subdir')) == path('dir/subdir')
+ assert os.path.isdir(path('dir'))
+ assert os.path.isdir(path('dir/subdir'))
+
+
+@in_env
+def test_move_file(path):
+ mkfile(path('src'), 'SRC')
+ library.python.fs.move(path('src'), path('dst'))
+ assert not os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert file_data(path('dst')) == 'SRC'
+
+
+@in_env
+def test_move_file_no_src(path):
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.move(path('src'), path('dst'))
+ assert errinfo.value.errno == errno.ENOENT
+
+
+@in_env
+def test_move_file_exists(path):
+ mkfile(path('src'), 'SRC')
+ mkfile(path('dst'), 'DST')
+ if library.python.windows.on_win():
+ # move is platform-dependent, use replace_file for dst replacement on all platforms
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.move(path('src'), path('dst'))
+ assert errinfo.value.errno == errno.EEXIST
+ assert os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert file_data(path('dst')) == 'DST'
+ else:
+ library.python.fs.move(path('src'), path('dst'))
+ assert not os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert file_data(path('dst')) == 'SRC'
+
+
+@in_env
+def test_move_file_exists_dir_empty(path):
+ mkfile(path('src'), 'SRC')
+ os.mkdir(path('dst'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.move(path('src'), path('dst'))
+ assert errinfo.value.errno in (errno.EEXIST, errno.EISDIR)
+ assert os.path.isfile(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert not os.path.isfile(path('dst/src'))
+
+
+@in_env
+def test_move_file_exists_dir_nonempty(path):
+ mkfile(path('src'), 'SRC')
+ os.mkdir(path('dst'))
+ mkfile(path('dst/dst_file'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.move(path('src'), path('dst'))
+ assert errinfo.value.errno in (errno.EEXIST, errno.EISDIR)
+ assert os.path.isfile(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert os.path.isfile(path('dst/dst_file'))
+ assert not os.path.isfile(path('dst/src'))
+
+
+@in_env
+def test_move_dir(path):
+ os.mkdir(path('src'))
+ mkfile(path('src/src_file'))
+ library.python.fs.move(path('src'), path('dst'))
+ assert not os.path.isdir(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert os.path.isfile(path('dst/src_file'))
+
+
+@in_env
+def test_move_dir_exists_empty(path):
+ os.mkdir(path('src'))
+ mkfile(path('src/src_file'))
+ os.mkdir(path('dst'))
+ if library.python.windows.on_win():
+ # move is platform-dependent, use non-atomic replace for directory replacement
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.move(path('src'), path('dst'))
+ assert errinfo.value.errno == errno.EEXIST
+ assert os.path.isdir(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert not os.path.isfile(path('dst/src_file'))
+ else:
+ library.python.fs.move(path('src'), path('dst'))
+ assert not os.path.isdir(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert os.path.isfile(path('dst/src_file'))
+
+
+@in_env
+def test_move_dir_exists_nonempty(path):
+ os.mkdir(path('src'))
+ mkfile(path('src/src_file'))
+ os.mkdir(path('dst'))
+ mkfile(path('dst/dst_file'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.move(path('src'), path('dst'))
+ assert errinfo.value.errno in (errno.EEXIST, errno.ENOTEMPTY)
+ assert os.path.isdir(path('src'))
+ assert os.path.isfile(path('src/src_file'))
+ assert os.path.isdir(path('dst'))
+ assert not os.path.isfile(path('dst/src_file'))
+ assert os.path.isfile(path('dst/dst_file'))
+
+
+@in_env
+def test_move_dir_exists_file(path):
+ os.mkdir(path('src'))
+ mkfile(path('src/src_file'))
+ mkfile(path('dst'), 'DST')
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.move(path('src'), path('dst'))
+ assert errinfo.value.errno in (errno.EEXIST, errno.ENOTDIR)
+ assert os.path.isdir(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert file_data(path('dst')) == 'DST'
+
+
+@in_env
+def test_replace_file(path):
+ mkfile(path('src'), 'SRC')
+ library.python.fs.replace_file(path('src'), path('dst'))
+ assert not os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert file_data(path('dst')) == 'SRC'
+
+ mkfile(path('src'), 'SRC')
+ library.python.fs.replace(path('src'), path('dst2'))
+ assert not os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst2'))
+ assert file_data(path('dst2')) == 'SRC'
+
+
+@in_env
+def test_replace_file_no_src(path):
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.replace_file(path('src'), path('dst'))
+ assert errinfo.value.errno == errno.ENOENT
+
+ with pytest.raises(OSError) as errinfo2:
+ library.python.fs.replace(path('src'), path('dst2'))
+ assert errinfo2.value.errno == errno.ENOENT
+
+
+@in_env
+def test_replace_file_exists(path):
+ mkfile(path('src'), 'SRC')
+ mkfile(path('dst'), 'DST')
+ library.python.fs.replace_file(path('src'), path('dst'))
+ assert not os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert file_data(path('dst')) == 'SRC'
+
+ mkfile(path('src'), 'SRC')
+ mkfile(path('dst2'), 'DST')
+ library.python.fs.replace(path('src'), path('dst2'))
+ assert not os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst2'))
+ assert file_data(path('dst2')) == 'SRC'
+
+
+@in_env
+def test_replace_file_exists_dir_empty(path):
+ mkfile(path('src'), 'SRC')
+ os.mkdir(path('dst'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.replace_file(path('src'), path('dst'))
+ assert errinfo.value.errno in (errno.EISDIR, errno.EACCES)
+ assert os.path.isfile(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert not os.path.isfile(path('dst/src'))
+
+
+@in_env
+def test_replace_file_exists_dir_empty_overwrite(path):
+ mkfile(path('src'), 'SRC')
+ os.mkdir(path('dst'))
+ library.python.fs.replace(path('src'), path('dst'))
+ assert not os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert file_data(path('dst')) == 'SRC'
+
+
+@in_env
+def test_replace_file_exists_dir_nonempty(path):
+ mkfile(path('src'), 'SRC')
+ os.mkdir(path('dst'))
+ mkfile(path('dst/dst_file'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.replace_file(path('src'), path('dst'))
+ assert errinfo.value.errno in (errno.EISDIR, errno.EACCES)
+ assert os.path.isfile(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert os.path.isfile(path('dst/dst_file'))
+ assert not os.path.isfile(path('dst/src'))
+
+
+@in_env
+def test_replace_file_exists_dir_nonempty_overwrite(path):
+ mkfile(path('src'), 'SRC')
+ os.mkdir(path('dst'))
+ mkfile(path('dst/dst_file'))
+ library.python.fs.replace(path('src'), path('dst'))
+ assert not os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert file_data(path('dst')) == 'SRC'
+
+
+@in_env
+def test_replace_dir(path):
+ os.mkdir(path('src'))
+ mkfile(path('src/src_file'))
+ library.python.fs.replace(path('src'), path('dst'))
+ assert not os.path.isdir(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert os.path.isfile(path('dst/src_file'))
+
+
+@in_env
+def test_replace_dir_exists_empty(path):
+ os.mkdir(path('src'))
+ mkfile(path('src/src_file'))
+ os.mkdir(path('dst'))
+ library.python.fs.replace(path('src'), path('dst'))
+ assert not os.path.isdir(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert os.path.isfile(path('dst/src_file'))
+
+
+@in_env
+def test_replace_dir_exists_nonempty(path):
+ os.mkdir(path('src'))
+ mkfile(path('src/src_file'))
+ os.mkdir(path('dst'))
+ mkfile(path('dst/dst_file'))
+ library.python.fs.replace(path('src'), path('dst'))
+ assert not os.path.isdir(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert os.path.isfile(path('dst/src_file'))
+ assert not os.path.isfile(path('dst/dst_file'))
+
+
+@in_env
+def test_replace_dir_exists_file(path):
+ os.mkdir(path('src'))
+ mkfile(path('src/src_file'))
+ mkfile(path('dst'), 'DST')
+ library.python.fs.replace(path('src'), path('dst'))
+ assert not os.path.isdir(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert os.path.isfile(path('dst/src_file'))
+
+
+@in_env
+def test_remove_file(path):
+ mkfile(path('path'))
+ library.python.fs.remove_file(path('path'))
+ assert not os.path.exists(path('path'))
+
+
+@in_env
+def test_remove_file_no(path):
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.remove_file(path('path'))
+ assert errinfo.value.errno == errno.ENOENT
+
+
+@in_env
+def test_remove_file_exists_dir(path):
+ os.mkdir(path('path'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.remove_file(path('path'))
+ assert errinfo.value.errno in (errno.EISDIR, errno.EACCES)
+ assert os.path.isdir(path('path'))
+
+
+@in_env
+def test_remove_dir(path):
+ os.mkdir(path('path'))
+ library.python.fs.remove_dir(path('path'))
+ assert not os.path.exists(path('path'))
+
+
+@in_env
+def test_remove_dir_no(path):
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.remove_dir(path('path'))
+ assert errinfo.value.errno == errno.ENOENT
+
+
+@in_env
+def test_remove_dir_exists_file(path):
+ mkfile(path('path'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.remove_dir(path('path'))
+ assert errinfo.value.errno in (errno.ENOTDIR, errno.EINVAL)
+ assert os.path.isfile(path('path'))
+
+
+@in_env
+def test_remove_tree(path):
+ mktree_example(path, 'path')
+ library.python.fs.remove_tree(path('path'))
+ assert not os.path.exists(path('path'))
+
+
+@in_env
+def test_remove_tree_empty(path):
+ os.mkdir(path('path'))
+ library.python.fs.remove_tree(path('path'))
+ assert not os.path.exists(path('path'))
+
+
+@in_env
+def test_remove_tree_file(path):
+ mkfile(path('path'))
+ library.python.fs.remove_tree(path('path'))
+ assert not os.path.exists(path('path'))
+
+
+@in_env
+def test_remove_tree_no(path):
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.remove_tree(path('path'))
+ assert errinfo.value.errno == errno.ENOENT
+
+
+@in_env
+def test_remove_tree_safe(path):
+ library.python.fs.remove_tree_safe(path('path'))
+
+
+@in_env
+def test_ensure_removed(path):
+ library.python.fs.ensure_removed(path('path'))
+
+
+@in_env
+def test_ensure_removed_exists(path):
+ os.makedirs(path('dir/subdir'))
+ library.python.fs.ensure_removed(path('dir'))
+ assert not os.path.exists(path('dir'))
+
+
+@in_env
+def test_ensure_removed_exists_precise(path):
+ os.makedirs(path('dir/subdir'))
+ library.python.fs.ensure_removed(path('dir/subdir'))
+ assert os.path.exists(path('dir'))
+ assert not os.path.exists(path('dir/subdir'))
+
+
+@in_env
+def test_hardlink_file(path):
+ mkfile(path('src'), 'SRC')
+ library.python.fs.hardlink(path('src'), path('dst'))
+ assert os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert file_data(path('dst')) == 'SRC'
+ assert inodes_unsupported() or inodes_equal(path('src'), path('dst'))
+
+
+@in_env
+def test_hardlink_file_no_src(path):
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.hardlink(path('src'), path('dst'))
+ assert errinfo.value.errno == errno.ENOENT
+
+
+@in_env
+def test_hardlink_file_exists(path):
+ mkfile(path('src'), 'SRC')
+ mkfile(path('dst'), 'DST')
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.hardlink(path('src'), path('dst'))
+ assert errinfo.value.errno == errno.EEXIST
+ assert os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert file_data(path('dst')) == 'DST'
+ assert inodes_unsupported() or not inodes_equal(path('src'), path('dst'))
+
+
+@in_env
+def test_hardlink_file_exists_dir(path):
+ mkfile(path('src'), 'SRC')
+ os.mkdir(path('dst'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.hardlink(path('src'), path('dst'))
+ assert errinfo.value.errno == errno.EEXIST
+ assert os.path.isfile(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert not os.path.isfile(path('dst/src'))
+
+
+@in_env
+def test_hardlink_dir(path):
+ os.mkdir(path('src'))
+ mkfile(path('src/src_file'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.hardlink(path('src'), path('dst'))
+ assert errinfo.value.errno in (errno.EPERM, errno.EACCES)
+ assert os.path.isdir(path('src'))
+ assert not os.path.isdir(path('dst'))
+
+
+@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows')
+@in_env
+def test_symlink_file(path):
+ mkfile(path('src'), 'SRC')
+ library.python.fs.symlink(path('src'), path('dst'))
+ assert os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert os.path.islink(path('dst'))
+ assert file_data(path('dst')) == 'SRC'
+
+
+@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows')
+@in_env
+def test_symlink_file_no_src(path):
+ library.python.fs.symlink(path('src'), path('dst'))
+ assert not os.path.isfile(path('src'))
+ assert not os.path.isfile(path('dst'))
+ assert os.path.islink(path('dst'))
+
+
+@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows')
+@in_env
+def test_symlink_file_exists(path):
+ mkfile(path('src'), 'SRC')
+ mkfile(path('dst'), 'DST')
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.symlink(path('src'), path('dst'))
+ assert errinfo.value.errno == errno.EEXIST
+ assert os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert not os.path.islink(path('dst'))
+ assert file_data(path('dst')) == 'DST'
+
+
+@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows')
+@in_env
+def test_symlink_file_exists_dir(path):
+ mkfile(path('src'), 'SRC')
+ os.mkdir(path('dst'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.symlink(path('src'), path('dst'))
+ assert errinfo.value.errno == errno.EEXIST
+ assert os.path.isfile(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert not os.path.islink(path('dst'))
+ assert not os.path.isfile(path('dst/src'))
+
+
+@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows')
+@in_env
+def test_symlink_dir(path):
+ os.mkdir(path('src'))
+ mkfile(path('src/src_file'))
+ library.python.fs.symlink(path('src'), path('dst'))
+ assert os.path.isdir(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert os.path.islink(path('dst'))
+ assert os.path.isfile(path('dst/src_file'))
+
+
+@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows')
+@in_env
+def test_symlink_dir_no_src(path):
+ library.python.fs.symlink(path('src'), path('dst'))
+ assert not os.path.isdir(path('src'))
+ assert not os.path.isdir(path('dst'))
+ assert os.path.islink(path('dst'))
+
+
+@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows')
+@in_env
+def test_symlink_dir_exists(path):
+ os.mkdir(path('src'))
+ mkfile(path('src/src_file'))
+ os.mkdir(path('dst'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.symlink(path('src'), path('dst'))
+ assert errinfo.value.errno == errno.EEXIST
+ assert os.path.isdir(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert not os.path.islink(path('dst'))
+ assert not os.path.isfile(path('dst/src_file'))
+
+
+@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows')
+@in_env
+def test_symlink_dir_exists_file(path):
+ os.mkdir(path('src'))
+ mkfile(path('src/src_file'))
+ mkfile(path('dst'), 'DST')
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.symlink(path('src'), path('dst'))
+ assert errinfo.value.errno == errno.EEXIST
+ assert os.path.isdir(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert not os.path.islink(path('dst'))
+
+
+@in_env
+def test_hardlink_tree(path):
+ mktree_example(path, 'src')
+ library.python.fs.hardlink_tree(path('src'), path('dst'))
+ assert trees_equal(path('src'), path('dst'))
+
+
+@in_env
+def test_hardlink_tree_empty(path):
+ os.mkdir(path('src'))
+ library.python.fs.hardlink_tree(path('src'), path('dst'))
+ assert trees_equal(path('src'), path('dst'))
+
+
+@in_env
+def test_hardlink_tree_file(path):
+ mkfile(path('src'), 'SRC')
+ library.python.fs.hardlink_tree(path('src'), path('dst'))
+ assert trees_equal(path('src'), path('dst'))
+
+
+@in_env
+def test_hardlink_tree_no_src(path):
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.hardlink_tree(path('src'), path('dst'))
+ assert errinfo.value.errno == errno.ENOENT
+
+
+@in_env
+def test_hardlink_tree_exists(path):
+ mktree_example(path, 'src')
+ os.mkdir(path('dst_dir'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.hardlink_tree(path('src'), path('dst_dir'))
+ assert errinfo.value.errno == errno.EEXIST
+ mkfile(path('dst_file'), 'DST')
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.hardlink_tree(path('src'), path('dst_file'))
+ assert errinfo.value.errno == errno.EEXIST
+
+
+@in_env
+def test_hardlink_tree_file_exists(path):
+ mkfile(path('src'), 'SRC')
+ os.mkdir(path('dst_dir'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.hardlink_tree(path('src'), path('dst_dir'))
+ assert errinfo.value.errno == errno.EEXIST
+ mkfile(path('dst_file'), 'DST')
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.hardlink_tree(path('src'), path('dst_file'))
+ assert errinfo.value.errno == errno.EEXIST
+
+
+@in_env
+def test_copy_file(path):
+ mkfile(path('src'), 'SRC')
+ library.python.fs.copy_file(path('src'), path('dst'))
+ assert os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert file_data(path('dst')) == 'SRC'
+
+
+@in_env
+def test_copy_file_no_src(path):
+ with pytest.raises(EnvironmentError):
+ library.python.fs.copy_file(path('src'), path('dst'))
+
+
+@in_env
+def test_copy_file_exists(path):
+ mkfile(path('src'), 'SRC')
+ mkfile(path('dst'), 'DST')
+ library.python.fs.copy_file(path('src'), path('dst'))
+ assert os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert file_data(path('dst')) == 'SRC'
+
+
+@in_env
+def test_copy_file_exists_dir_empty(path):
+ mkfile(path('src'), 'SRC')
+ os.mkdir(path('dst'))
+ with pytest.raises(EnvironmentError):
+ library.python.fs.copy_file(path('src'), path('dst'))
+ assert os.path.isfile(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert not os.path.isfile(path('dst/src'))
+
+
+@in_env
+def test_copy_file_exists_dir_nonempty(path):
+ mkfile(path('src'), 'SRC')
+ os.mkdir(path('dst'))
+ mkfile(path('dst/dst_file'))
+ with pytest.raises(EnvironmentError):
+ library.python.fs.copy_file(path('src'), path('dst'))
+ assert os.path.isfile(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert os.path.isfile(path('dst/dst_file'))
+ assert not os.path.isfile(path('dst/src'))
+
+
+@in_env
+def test_copy_tree(path):
+ mktree_example(path, 'src')
+ library.python.fs.copy_tree(path('src'), path('dst'))
+ assert trees_equal(path('src'), path('dst'))
+
+
+@in_env
+def test_copy_tree_empty(path):
+ os.mkdir(path('src'))
+ library.python.fs.copy_tree(path('src'), path('dst'))
+ assert trees_equal(path('src'), path('dst'))
+
+
+@in_env
+def test_copy_tree_file(path):
+ mkfile(path('src'), 'SRC')
+ library.python.fs.copy_tree(path('src'), path('dst'))
+ assert trees_equal(path('src'), path('dst'))
+
+
+@in_env
+def test_copy_tree_no_src(path):
+ with pytest.raises(EnvironmentError):
+ library.python.fs.copy_tree(path('src'), path('dst'))
+
+
+@in_env
+def test_copy_tree_exists(path):
+ mktree_example(path, 'src')
+ os.mkdir(path('dst_dir'))
+ with pytest.raises(EnvironmentError):
+ library.python.fs.copy_tree(path('src'), path('dst_dir'))
+ mkfile(path('dst_file'), 'DST')
+ with pytest.raises(EnvironmentError):
+ library.python.fs.copy_tree(path('src'), path('dst_file'))
+
+
+@in_env
+def test_copy_tree_file_exists(path):
+ mkfile(path('src'), 'SRC')
+ os.mkdir(path('dst_dir'))
+ with pytest.raises(EnvironmentError):
+ library.python.fs.copy_tree(path('src'), path('dst_dir'))
+ mkfile(path('dst_file'), 'DST')
+ library.python.fs.copy_tree(path('src'), path('dst_file'))
+ assert trees_equal(path('src'), path('dst_file'))
+
+
+@in_env
+def test_read_file(path):
+ mkfile(path('src'), 'SRC')
+ assert library.python.fs.read_file(path('src')).decode(library.python.strings.fs_encoding()) == 'SRC'
+ assert library.python.fs.read_file(path('src'), binary=False) == 'SRC'
+
+
+@in_env
+def test_read_file_empty(path):
+ mkfile(path('src'))
+ assert library.python.fs.read_file(path('src')).decode(library.python.strings.fs_encoding()) == ''
+ assert library.python.fs.read_file(path('src'), binary=False) == ''
+
+
+@in_env
+def test_read_file_multiline(path):
+ mkfile(path('src'), 'SRC line 1\nSRC line 2\n')
+ assert (
+ library.python.fs.read_file(path('src')).decode(library.python.strings.fs_encoding())
+ == 'SRC line 1\nSRC line 2\n'
+ )
+ assert library.python.fs.read_file(path('src'), binary=False) == 'SRC line 1\nSRC line 2\n'
+
+
+@in_env
+def test_read_file_multiline_crlf(path):
+ mkfile(path('src'), 'SRC line 1\r\nSRC line 2\r\n')
+ assert (
+ library.python.fs.read_file(path('src')).decode(library.python.strings.fs_encoding())
+ == 'SRC line 1\r\nSRC line 2\r\n'
+ )
+ if library.python.windows.on_win() or six.PY3: # universal newlines are by default in text mode in python3
+ assert library.python.fs.read_file(path('src'), binary=False) == 'SRC line 1\nSRC line 2\n'
+ else:
+ assert library.python.fs.read_file(path('src'), binary=False) == 'SRC line 1\r\nSRC line 2\r\n'
+
+
+@in_env
+def test_read_file_unicode(path):
+ s = u'АБВ'
+ mkfile(path('src'), s.encode('utf-8'))
+ mkfile(path('src_cp1251'), s.encode('cp1251'))
+ assert library.python.fs.read_file_unicode(path('src')) == s
+ assert library.python.fs.read_file_unicode(path('src_cp1251'), enc='cp1251') == s
+ assert library.python.fs.read_file_unicode(path('src'), binary=False) == s
+ assert library.python.fs.read_file_unicode(path('src_cp1251'), binary=False, enc='cp1251') == s
+
+
+@in_env
+def test_read_file_unicode_empty(path):
+ mkfile(path('src'))
+ mkfile(path('src_cp1251'))
+ assert library.python.fs.read_file_unicode(path('src')) == ''
+ assert library.python.fs.read_file_unicode(path('src_cp1251'), enc='cp1251') == ''
+ assert library.python.fs.read_file_unicode(path('src'), binary=False) == ''
+ assert library.python.fs.read_file_unicode(path('src_cp1251'), binary=False, enc='cp1251') == ''
+
+
+@in_env
+def test_read_file_unicode_multiline(path):
+ s = u'АБВ\nИ еще\n'
+ mkfile(path('src'), s.encode('utf-8'))
+ mkfile(path('src_cp1251'), s.encode('cp1251'))
+ assert library.python.fs.read_file_unicode(path('src')) == s
+ assert library.python.fs.read_file_unicode(path('src_cp1251'), enc='cp1251') == s
+ assert library.python.fs.read_file_unicode(path('src'), binary=False) == s
+ assert library.python.fs.read_file_unicode(path('src_cp1251'), binary=False, enc='cp1251') == s
+
+
+@in_env
+def test_read_file_unicode_multiline_crlf(path):
+ s = u'АБВ\r\nИ еще\r\n'
+ mkfile(path('src'), s.encode('utf-8'))
+ mkfile(path('src_cp1251'), s.encode('cp1251'))
+ assert library.python.fs.read_file_unicode(path('src')) == s
+ assert library.python.fs.read_file_unicode(path('src_cp1251'), enc='cp1251') == s
+ if library.python.windows.on_win() or six.PY3: # universal newlines are by default in text mode in python3
+ assert library.python.fs.read_file_unicode(path('src'), binary=False) == u'АБВ\nИ еще\n'
+ assert library.python.fs.read_file_unicode(path('src_cp1251'), binary=False, enc='cp1251') == u'АБВ\nИ еще\n'
+ else:
+ assert library.python.fs.read_file_unicode(path('src'), binary=False) == s
+ assert library.python.fs.read_file_unicode(path('src_cp1251'), binary=False, enc='cp1251') == s
+
+
+@in_env
+def test_write_file(path):
+ library.python.fs.write_file(path('src'), 'SRC')
+ assert file_data(path('src')) == 'SRC'
+ library.python.fs.write_file(path('src2'), 'SRC', binary=False)
+ assert file_data(path('src2')) == 'SRC'
+
+
+@in_env
+def test_write_file_empty(path):
+ library.python.fs.write_file(path('src'), '')
+ assert file_data(path('src')) == ''
+ library.python.fs.write_file(path('src2'), '', binary=False)
+ assert file_data(path('src2')) == ''
+
+
+@in_env
+def test_write_file_multiline(path):
+ library.python.fs.write_file(path('src'), 'SRC line 1\nSRC line 2\n')
+ assert file_data(path('src')) == 'SRC line 1\nSRC line 2\n'
+ library.python.fs.write_file(path('src2'), 'SRC line 1\nSRC line 2\n', binary=False)
+ if library.python.windows.on_win():
+ assert file_data(path('src2')) == 'SRC line 1\r\nSRC line 2\r\n'
+ else:
+ assert file_data(path('src2')) == 'SRC line 1\nSRC line 2\n'
+
+
+@in_env
+def test_write_file_multiline_crlf(path):
+ library.python.fs.write_file(path('src'), 'SRC line 1\r\nSRC line 2\r\n')
+ assert file_data(path('src')) == 'SRC line 1\r\nSRC line 2\r\n'
+ library.python.fs.write_file(path('src2'), 'SRC line 1\r\nSRC line 2\r\n', binary=False)
+ if library.python.windows.on_win():
+ assert file_data(path('src2')) == 'SRC line 1\r\r\nSRC line 2\r\r\n'
+ else:
+ assert file_data(path('src2')) == 'SRC line 1\r\nSRC line 2\r\n'
+
+
+@in_env
+def test_get_file_size(path):
+ mkfile(path('one.txt'), '22')
+ assert library.python.fs.get_file_size(path('one.txt')) == 2
+
+
+@in_env
+def test_get_file_size_empty(path):
+ mkfile(path('one.txt'))
+ assert library.python.fs.get_file_size(path('one.txt')) == 0
+
+
+@in_env
+def test_get_tree_size(path):
+ os.makedirs(path('deeper'))
+ mkfile(path('one.txt'), '1')
+ mkfile(path('deeper/two.txt'), '22')
+ assert library.python.fs.get_tree_size(path('one.txt')) == 1
+ assert library.python.fs.get_tree_size(path('')) == 1
+ assert library.python.fs.get_tree_size(path(''), recursive=True) == 3
+
+
+@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows')
+@in_env
+def test_get_tree_size_dangling_symlink(path):
+ os.makedirs(path('deeper'))
+ mkfile(path('one.txt'), '1')
+ mkfile(path('deeper/two.txt'), '22')
+ os.symlink(path('deeper/two.txt'), path("deeper/link.txt"))
+ os.remove(path('deeper/two.txt'))
+ # does not fail
+ assert library.python.fs.get_tree_size(path(''), recursive=True) == 1
+
+
+@pytest.mark.skipif(not library.python.windows.on_win(), reason='Test hardlinks on windows')
+def test_hardlink_or_copy():
+ max_allowed_hard_links = 1023
+
+ def run(hardlink_function, dir):
+ src = r"test.txt"
+ with open(src, "w") as f:
+ f.write("test")
+ for i in range(max_allowed_hard_links + 1):
+ hardlink_function(src, os.path.join(dir, "{}.txt".format(i)))
+
+ dir1 = library.python.fs.create_dirs("one")
+ with pytest.raises(WindowsError) as e:
+ run(library.python.fs.hardlink, dir1)
+ assert e.value.winerror == 1142
+ assert len(os.listdir(dir1)) == max_allowed_hard_links
+
+ dir2 = library.python.fs.create_dirs("two")
+ run(library.python.fs.hardlink_or_copy, dir2)
+ assert len(os.listdir(dir2)) == max_allowed_hard_links + 1
+
+
+def test_remove_tree_unicode():
+ path = u"test_remove_tree_unicode/русский".encode("utf-8")
+ os.makedirs(path)
+ library.python.fs.remove_tree(six.text_type("test_remove_tree_unicode"))
+ assert not os.path.exists("test_remove_tree_unicode")
+
+
+def test_remove_tree_safe_unicode():
+ path = u"test_remove_tree_safe_unicode/русский".encode("utf-8")
+ os.makedirs(path)
+ library.python.fs.remove_tree_safe(six.text_type("test_remove_tree_safe_unicode"))
+ assert not os.path.exists("test_remove_tree_safe_unicode")
+
+
+def test_copy_tree_custom_copy_function():
+ library.python.fs.create_dirs("test_copy_tree_src/deepper/inner")
+ library.python.fs.write_file("test_copy_tree_src/deepper/deepper.txt", "deepper.txt")
+ library.python.fs.write_file("test_copy_tree_src/deepper/inner/inner.txt", "inner.txt")
+ copied = []
+
+ def copy_function(src, dst):
+ shutil.copy2(src, dst)
+ copied.append(dst)
+
+ library.python.fs.copy_tree(
+ "test_copy_tree_src", yatest.common.work_path("test_copy_tree_dst"), copy_function=copy_function
+ )
+ assert len(copied) == 2
+ assert yatest.common.work_path("test_copy_tree_dst/deepper/deepper.txt") in copied
+ assert yatest.common.work_path("test_copy_tree_dst/deepper/inner/inner.txt") in copied
+
+
+def test_copy2():
+ library.python.fs.symlink("non-existent", "link")
+ library.python.fs.copy2("link", "link2", follow_symlinks=False)
+
+ assert os.path.islink("link2")
+ assert os.readlink("link2") == "non-existent"
+
+
+def test_commonpath():
+ pj = os.path.join
+ pja = lambda *x: os.path.abspath(pj(*x))
+
+ assert library.python.fs.commonpath(['a', 'b']) == ''
+ assert library.python.fs.commonpath([pj('t', '1')]) == pj('t', '1')
+ assert library.python.fs.commonpath([pj('t', '1'), pj('t', '2')]) == pj('t')
+ assert library.python.fs.commonpath([pj('t', '1', '2'), pj('t', '1', '2')]) == pj('t', '1', '2')
+ assert library.python.fs.commonpath([pj('t', '1', '1'), pj('t', '1', '2')]) == pj('t', '1')
+ assert library.python.fs.commonpath([pj('t', '1', '1'), pj('t', '1', '2'), pj('t', '1', '3')]) == pj('t', '1')
+
+ assert library.python.fs.commonpath([pja('t', '1', '1'), pja('t', '1', '2')]) == pja('t', '1')
+
+ assert library.python.fs.commonpath({pj('t', '1'), pj('t', '2')}) == pj('t')
diff --git a/library/python/fs/test/ya.make b/library/python/fs/test/ya.make
new file mode 100644
index 0000000000..33e3f5b4ff
--- /dev/null
+++ b/library/python/fs/test/ya.make
@@ -0,0 +1,14 @@
+OWNER(g:yatool)
+
+PY23_TEST()
+
+TEST_SRCS(
+ test_fs.py
+)
+
+PEERDIR(
+ library/python/fs
+ library/python/tmp
+)
+
+END()
diff --git a/library/python/fs/ya.make b/library/python/fs/ya.make
new file mode 100644
index 0000000000..b3c5092c71
--- /dev/null
+++ b/library/python/fs/ya.make
@@ -0,0 +1,23 @@
+OWNER(g:yatool)
+
+PY23_LIBRARY()
+
+PY_SRCS(
+ __init__.py
+)
+
+IF (OS_DARWIN)
+ PY_SRCS(
+ clonefile.pyx
+ )
+ENDIF()
+
+PEERDIR(
+ library/python/func
+ library/python/strings
+ library/python/windows
+)
+
+END()
+
+RECURSE_FOR_TESTS(test)