aboutsummaryrefslogtreecommitdiffstats
path: root/library/python/fs/test
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/python/fs/test
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/python/fs/test')
-rw-r--r--library/python/fs/test/test_fs.py1037
-rw-r--r--library/python/fs/test/ya.make14
2 files changed, 1051 insertions, 0 deletions
diff --git a/library/python/fs/test/test_fs.py b/library/python/fs/test/test_fs.py
new file mode 100644
index 00000000000..9e2c70c0690
--- /dev/null
+++ b/library/python/fs/test/test_fs.py
@@ -0,0 +1,1037 @@
+# coding=utf-8
+
+import errno
+import os
+import pytest
+import shutil
+import six
+
+import library.python.fs
+import library.python.strings
+import library.python.tmp
+import library.python.windows
+
+import yatest.common
+
+
+def in_env(case):
+ def wrapped_case(*args, **kwargs):
+ with library.python.tmp.temp_dir() as temp_dir:
+ case(lambda path: os.path.join(temp_dir, path))
+
+ return wrapped_case
+
+
+def mkfile(path, data=''):
+ with open(path, 'wb') as f:
+ if data:
+ f.write(data) if isinstance(data, six.binary_type) else f.write(
+ data.encode(library.python.strings.fs_encoding())
+ )
+
+
+def mktree_example(path, name):
+ os.mkdir(path(name))
+ mkfile(path(name + '/file1'), 'FILE1')
+ os.mkdir(path(name + '/dir1'))
+ os.mkdir(path(name + '/dir2'))
+ mkfile(path(name + '/dir2/file2'), 'FILE2')
+ mkfile(path(name + '/dir2/file3'), 'FILE3')
+
+
+def file_data(path):
+ with open(path, 'rb') as f:
+ return f.read().decode('utf-8')
+
+
+def serialize_tree(path):
+ if os.path.isfile(path):
+ return file_data(path)
+ data = {'dirs': set(), 'files': {}}
+ for dirpath, dirnames, filenames in os.walk(path):
+ dirpath_rel = os.path.relpath(dirpath, path)
+ if dirpath_rel == '.':
+ dirpath_rel = ''
+ data['dirs'].update(set(os.path.join(dirpath_rel, x) for x in dirnames))
+ data['files'].update({os.path.join(dirpath_rel, x): file_data(os.path.join(dirpath, x)) for x in filenames})
+ return data
+
+
+def trees_equal(dir1, dir2):
+ return serialize_tree(dir1) == serialize_tree(dir2)
+
+
+def inodes_unsupported():
+ return library.python.windows.on_win()
+
+
+def inodes_equal(path1, path2):
+ return os.stat(path1).st_ino == os.stat(path2).st_ino
+
+
+def gen_error_access_denied():
+ if library.python.windows.on_win():
+ err = WindowsError()
+ err.errno = errno.EACCES
+ err.strerror = ''
+ err.winerror = library.python.windows.ERRORS['ACCESS_DENIED']
+ else:
+ err = OSError()
+ err.errno = errno.EACCES
+ err.strerror = os.strerror(err.errno)
+ err.filename = 'unknown/file'
+ raise err
+
+
+def test_errorfix_win():
+ @library.python.fs.errorfix_win
+ def erroneous_func():
+ gen_error_access_denied()
+
+ with pytest.raises(OSError) as errinfo:
+ erroneous_func()
+ assert errinfo.value.errno == errno.EACCES
+ assert errinfo.value.filename == 'unknown/file'
+ # See transcode_error, which encodes strerror, in library/python/windows/__init__.py
+ assert isinstance(errinfo.value.strerror, (six.binary_type, six.text_type))
+ assert errinfo.value.strerror
+
+
+def test_custom_fs_error():
+ with pytest.raises(OSError) as errinfo:
+ raise library.python.fs.CustomFsError(errno.EACCES, filename='some/file')
+ assert errinfo.value.errno == errno.EACCES
+ # See transcode_error, which encodes strerror, in library/python/windows/__init__.py
+ assert isinstance(errinfo.value.strerror, (six.binary_type, six.text_type))
+ assert errinfo.value.filename == 'some/file'
+
+
+@in_env
+def test_ensure_dir(path):
+ library.python.fs.ensure_dir(path('dir/subdir'))
+ assert os.path.isdir(path('dir'))
+ assert os.path.isdir(path('dir/subdir'))
+
+
+@in_env
+def test_ensure_dir_exists(path):
+ os.makedirs(path('dir/subdir'))
+ library.python.fs.ensure_dir(path('dir/subdir'))
+ assert os.path.isdir(path('dir'))
+ assert os.path.isdir(path('dir/subdir'))
+
+
+@in_env
+def test_ensure_dir_exists_partly(path):
+ os.mkdir(path('dir'))
+ library.python.fs.ensure_dir(path('dir/subdir'))
+ assert os.path.isdir(path('dir'))
+ assert os.path.isdir(path('dir/subdir'))
+
+
+@in_env
+def test_ensure_dir_exists_file(path):
+ mkfile(path('dir'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.ensure_dir(path('dir/subdir'))
+ # ENOENT on Windows!
+ assert errinfo.value.errno in (errno.ENOTDIR, errno.ENOENT)
+ assert os.path.isfile(path('dir'))
+
+
+@in_env
+def test_create_dirs(path):
+ assert library.python.fs.create_dirs(path('dir/subdir')) == path('dir/subdir')
+ assert os.path.isdir(path('dir'))
+ assert os.path.isdir(path('dir/subdir'))
+
+
+@in_env
+def test_move_file(path):
+ mkfile(path('src'), 'SRC')
+ library.python.fs.move(path('src'), path('dst'))
+ assert not os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert file_data(path('dst')) == 'SRC'
+
+
+@in_env
+def test_move_file_no_src(path):
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.move(path('src'), path('dst'))
+ assert errinfo.value.errno == errno.ENOENT
+
+
+@in_env
+def test_move_file_exists(path):
+ mkfile(path('src'), 'SRC')
+ mkfile(path('dst'), 'DST')
+ if library.python.windows.on_win():
+ # move is platform-dependent, use replace_file for dst replacement on all platforms
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.move(path('src'), path('dst'))
+ assert errinfo.value.errno == errno.EEXIST
+ assert os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert file_data(path('dst')) == 'DST'
+ else:
+ library.python.fs.move(path('src'), path('dst'))
+ assert not os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert file_data(path('dst')) == 'SRC'
+
+
+@in_env
+def test_move_file_exists_dir_empty(path):
+ mkfile(path('src'), 'SRC')
+ os.mkdir(path('dst'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.move(path('src'), path('dst'))
+ assert errinfo.value.errno in (errno.EEXIST, errno.EISDIR)
+ assert os.path.isfile(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert not os.path.isfile(path('dst/src'))
+
+
+@in_env
+def test_move_file_exists_dir_nonempty(path):
+ mkfile(path('src'), 'SRC')
+ os.mkdir(path('dst'))
+ mkfile(path('dst/dst_file'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.move(path('src'), path('dst'))
+ assert errinfo.value.errno in (errno.EEXIST, errno.EISDIR)
+ assert os.path.isfile(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert os.path.isfile(path('dst/dst_file'))
+ assert not os.path.isfile(path('dst/src'))
+
+
+@in_env
+def test_move_dir(path):
+ os.mkdir(path('src'))
+ mkfile(path('src/src_file'))
+ library.python.fs.move(path('src'), path('dst'))
+ assert not os.path.isdir(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert os.path.isfile(path('dst/src_file'))
+
+
+@in_env
+def test_move_dir_exists_empty(path):
+ os.mkdir(path('src'))
+ mkfile(path('src/src_file'))
+ os.mkdir(path('dst'))
+ if library.python.windows.on_win():
+ # move is platform-dependent, use non-atomic replace for directory replacement
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.move(path('src'), path('dst'))
+ assert errinfo.value.errno == errno.EEXIST
+ assert os.path.isdir(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert not os.path.isfile(path('dst/src_file'))
+ else:
+ library.python.fs.move(path('src'), path('dst'))
+ assert not os.path.isdir(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert os.path.isfile(path('dst/src_file'))
+
+
+@in_env
+def test_move_dir_exists_nonempty(path):
+ os.mkdir(path('src'))
+ mkfile(path('src/src_file'))
+ os.mkdir(path('dst'))
+ mkfile(path('dst/dst_file'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.move(path('src'), path('dst'))
+ assert errinfo.value.errno in (errno.EEXIST, errno.ENOTEMPTY)
+ assert os.path.isdir(path('src'))
+ assert os.path.isfile(path('src/src_file'))
+ assert os.path.isdir(path('dst'))
+ assert not os.path.isfile(path('dst/src_file'))
+ assert os.path.isfile(path('dst/dst_file'))
+
+
+@in_env
+def test_move_dir_exists_file(path):
+ os.mkdir(path('src'))
+ mkfile(path('src/src_file'))
+ mkfile(path('dst'), 'DST')
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.move(path('src'), path('dst'))
+ assert errinfo.value.errno in (errno.EEXIST, errno.ENOTDIR)
+ assert os.path.isdir(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert file_data(path('dst')) == 'DST'
+
+
+@in_env
+def test_replace_file(path):
+ mkfile(path('src'), 'SRC')
+ library.python.fs.replace_file(path('src'), path('dst'))
+ assert not os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert file_data(path('dst')) == 'SRC'
+
+ mkfile(path('src'), 'SRC')
+ library.python.fs.replace(path('src'), path('dst2'))
+ assert not os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst2'))
+ assert file_data(path('dst2')) == 'SRC'
+
+
+@in_env
+def test_replace_file_no_src(path):
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.replace_file(path('src'), path('dst'))
+ assert errinfo.value.errno == errno.ENOENT
+
+ with pytest.raises(OSError) as errinfo2:
+ library.python.fs.replace(path('src'), path('dst2'))
+ assert errinfo2.value.errno == errno.ENOENT
+
+
+@in_env
+def test_replace_file_exists(path):
+ mkfile(path('src'), 'SRC')
+ mkfile(path('dst'), 'DST')
+ library.python.fs.replace_file(path('src'), path('dst'))
+ assert not os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert file_data(path('dst')) == 'SRC'
+
+ mkfile(path('src'), 'SRC')
+ mkfile(path('dst2'), 'DST')
+ library.python.fs.replace(path('src'), path('dst2'))
+ assert not os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst2'))
+ assert file_data(path('dst2')) == 'SRC'
+
+
+@in_env
+def test_replace_file_exists_dir_empty(path):
+ mkfile(path('src'), 'SRC')
+ os.mkdir(path('dst'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.replace_file(path('src'), path('dst'))
+ assert errinfo.value.errno in (errno.EISDIR, errno.EACCES)
+ assert os.path.isfile(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert not os.path.isfile(path('dst/src'))
+
+
+@in_env
+def test_replace_file_exists_dir_empty_overwrite(path):
+ mkfile(path('src'), 'SRC')
+ os.mkdir(path('dst'))
+ library.python.fs.replace(path('src'), path('dst'))
+ assert not os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert file_data(path('dst')) == 'SRC'
+
+
+@in_env
+def test_replace_file_exists_dir_nonempty(path):
+ mkfile(path('src'), 'SRC')
+ os.mkdir(path('dst'))
+ mkfile(path('dst/dst_file'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.replace_file(path('src'), path('dst'))
+ assert errinfo.value.errno in (errno.EISDIR, errno.EACCES)
+ assert os.path.isfile(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert os.path.isfile(path('dst/dst_file'))
+ assert not os.path.isfile(path('dst/src'))
+
+
+@in_env
+def test_replace_file_exists_dir_nonempty_overwrite(path):
+ mkfile(path('src'), 'SRC')
+ os.mkdir(path('dst'))
+ mkfile(path('dst/dst_file'))
+ library.python.fs.replace(path('src'), path('dst'))
+ assert not os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert file_data(path('dst')) == 'SRC'
+
+
+@in_env
+def test_replace_dir(path):
+ os.mkdir(path('src'))
+ mkfile(path('src/src_file'))
+ library.python.fs.replace(path('src'), path('dst'))
+ assert not os.path.isdir(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert os.path.isfile(path('dst/src_file'))
+
+
+@in_env
+def test_replace_dir_exists_empty(path):
+ os.mkdir(path('src'))
+ mkfile(path('src/src_file'))
+ os.mkdir(path('dst'))
+ library.python.fs.replace(path('src'), path('dst'))
+ assert not os.path.isdir(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert os.path.isfile(path('dst/src_file'))
+
+
+@in_env
+def test_replace_dir_exists_nonempty(path):
+ os.mkdir(path('src'))
+ mkfile(path('src/src_file'))
+ os.mkdir(path('dst'))
+ mkfile(path('dst/dst_file'))
+ library.python.fs.replace(path('src'), path('dst'))
+ assert not os.path.isdir(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert os.path.isfile(path('dst/src_file'))
+ assert not os.path.isfile(path('dst/dst_file'))
+
+
+@in_env
+def test_replace_dir_exists_file(path):
+ os.mkdir(path('src'))
+ mkfile(path('src/src_file'))
+ mkfile(path('dst'), 'DST')
+ library.python.fs.replace(path('src'), path('dst'))
+ assert not os.path.isdir(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert os.path.isfile(path('dst/src_file'))
+
+
+@in_env
+def test_remove_file(path):
+ mkfile(path('path'))
+ library.python.fs.remove_file(path('path'))
+ assert not os.path.exists(path('path'))
+
+
+@in_env
+def test_remove_file_no(path):
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.remove_file(path('path'))
+ assert errinfo.value.errno == errno.ENOENT
+
+
+@in_env
+def test_remove_file_exists_dir(path):
+ os.mkdir(path('path'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.remove_file(path('path'))
+ assert errinfo.value.errno in (errno.EISDIR, errno.EACCES)
+ assert os.path.isdir(path('path'))
+
+
+@in_env
+def test_remove_dir(path):
+ os.mkdir(path('path'))
+ library.python.fs.remove_dir(path('path'))
+ assert not os.path.exists(path('path'))
+
+
+@in_env
+def test_remove_dir_no(path):
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.remove_dir(path('path'))
+ assert errinfo.value.errno == errno.ENOENT
+
+
+@in_env
+def test_remove_dir_exists_file(path):
+ mkfile(path('path'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.remove_dir(path('path'))
+ assert errinfo.value.errno in (errno.ENOTDIR, errno.EINVAL)
+ assert os.path.isfile(path('path'))
+
+
+@in_env
+def test_remove_tree(path):
+ mktree_example(path, 'path')
+ library.python.fs.remove_tree(path('path'))
+ assert not os.path.exists(path('path'))
+
+
+@in_env
+def test_remove_tree_empty(path):
+ os.mkdir(path('path'))
+ library.python.fs.remove_tree(path('path'))
+ assert not os.path.exists(path('path'))
+
+
+@in_env
+def test_remove_tree_file(path):
+ mkfile(path('path'))
+ library.python.fs.remove_tree(path('path'))
+ assert not os.path.exists(path('path'))
+
+
+@in_env
+def test_remove_tree_no(path):
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.remove_tree(path('path'))
+ assert errinfo.value.errno == errno.ENOENT
+
+
+@in_env
+def test_remove_tree_safe(path):
+ library.python.fs.remove_tree_safe(path('path'))
+
+
+@in_env
+def test_ensure_removed(path):
+ library.python.fs.ensure_removed(path('path'))
+
+
+@in_env
+def test_ensure_removed_exists(path):
+ os.makedirs(path('dir/subdir'))
+ library.python.fs.ensure_removed(path('dir'))
+ assert not os.path.exists(path('dir'))
+
+
+@in_env
+def test_ensure_removed_exists_precise(path):
+ os.makedirs(path('dir/subdir'))
+ library.python.fs.ensure_removed(path('dir/subdir'))
+ assert os.path.exists(path('dir'))
+ assert not os.path.exists(path('dir/subdir'))
+
+
+@in_env
+def test_hardlink_file(path):
+ mkfile(path('src'), 'SRC')
+ library.python.fs.hardlink(path('src'), path('dst'))
+ assert os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert file_data(path('dst')) == 'SRC'
+ assert inodes_unsupported() or inodes_equal(path('src'), path('dst'))
+
+
+@in_env
+def test_hardlink_file_no_src(path):
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.hardlink(path('src'), path('dst'))
+ assert errinfo.value.errno == errno.ENOENT
+
+
+@in_env
+def test_hardlink_file_exists(path):
+ mkfile(path('src'), 'SRC')
+ mkfile(path('dst'), 'DST')
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.hardlink(path('src'), path('dst'))
+ assert errinfo.value.errno == errno.EEXIST
+ assert os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert file_data(path('dst')) == 'DST'
+ assert inodes_unsupported() or not inodes_equal(path('src'), path('dst'))
+
+
+@in_env
+def test_hardlink_file_exists_dir(path):
+ mkfile(path('src'), 'SRC')
+ os.mkdir(path('dst'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.hardlink(path('src'), path('dst'))
+ assert errinfo.value.errno == errno.EEXIST
+ assert os.path.isfile(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert not os.path.isfile(path('dst/src'))
+
+
+@in_env
+def test_hardlink_dir(path):
+ os.mkdir(path('src'))
+ mkfile(path('src/src_file'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.hardlink(path('src'), path('dst'))
+ assert errinfo.value.errno in (errno.EPERM, errno.EACCES)
+ assert os.path.isdir(path('src'))
+ assert not os.path.isdir(path('dst'))
+
+
+@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows')
+@in_env
+def test_symlink_file(path):
+ mkfile(path('src'), 'SRC')
+ library.python.fs.symlink(path('src'), path('dst'))
+ assert os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert os.path.islink(path('dst'))
+ assert file_data(path('dst')) == 'SRC'
+
+
+@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows')
+@in_env
+def test_symlink_file_no_src(path):
+ library.python.fs.symlink(path('src'), path('dst'))
+ assert not os.path.isfile(path('src'))
+ assert not os.path.isfile(path('dst'))
+ assert os.path.islink(path('dst'))
+
+
+@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows')
+@in_env
+def test_symlink_file_exists(path):
+ mkfile(path('src'), 'SRC')
+ mkfile(path('dst'), 'DST')
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.symlink(path('src'), path('dst'))
+ assert errinfo.value.errno == errno.EEXIST
+ assert os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert not os.path.islink(path('dst'))
+ assert file_data(path('dst')) == 'DST'
+
+
+@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows')
+@in_env
+def test_symlink_file_exists_dir(path):
+ mkfile(path('src'), 'SRC')
+ os.mkdir(path('dst'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.symlink(path('src'), path('dst'))
+ assert errinfo.value.errno == errno.EEXIST
+ assert os.path.isfile(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert not os.path.islink(path('dst'))
+ assert not os.path.isfile(path('dst/src'))
+
+
+@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows')
+@in_env
+def test_symlink_dir(path):
+ os.mkdir(path('src'))
+ mkfile(path('src/src_file'))
+ library.python.fs.symlink(path('src'), path('dst'))
+ assert os.path.isdir(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert os.path.islink(path('dst'))
+ assert os.path.isfile(path('dst/src_file'))
+
+
+@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows')
+@in_env
+def test_symlink_dir_no_src(path):
+ library.python.fs.symlink(path('src'), path('dst'))
+ assert not os.path.isdir(path('src'))
+ assert not os.path.isdir(path('dst'))
+ assert os.path.islink(path('dst'))
+
+
+@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows')
+@in_env
+def test_symlink_dir_exists(path):
+ os.mkdir(path('src'))
+ mkfile(path('src/src_file'))
+ os.mkdir(path('dst'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.symlink(path('src'), path('dst'))
+ assert errinfo.value.errno == errno.EEXIST
+ assert os.path.isdir(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert not os.path.islink(path('dst'))
+ assert not os.path.isfile(path('dst/src_file'))
+
+
+@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows')
+@in_env
+def test_symlink_dir_exists_file(path):
+ os.mkdir(path('src'))
+ mkfile(path('src/src_file'))
+ mkfile(path('dst'), 'DST')
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.symlink(path('src'), path('dst'))
+ assert errinfo.value.errno == errno.EEXIST
+ assert os.path.isdir(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert not os.path.islink(path('dst'))
+
+
+@in_env
+def test_hardlink_tree(path):
+ mktree_example(path, 'src')
+ library.python.fs.hardlink_tree(path('src'), path('dst'))
+ assert trees_equal(path('src'), path('dst'))
+
+
+@in_env
+def test_hardlink_tree_empty(path):
+ os.mkdir(path('src'))
+ library.python.fs.hardlink_tree(path('src'), path('dst'))
+ assert trees_equal(path('src'), path('dst'))
+
+
+@in_env
+def test_hardlink_tree_file(path):
+ mkfile(path('src'), 'SRC')
+ library.python.fs.hardlink_tree(path('src'), path('dst'))
+ assert trees_equal(path('src'), path('dst'))
+
+
+@in_env
+def test_hardlink_tree_no_src(path):
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.hardlink_tree(path('src'), path('dst'))
+ assert errinfo.value.errno == errno.ENOENT
+
+
+@in_env
+def test_hardlink_tree_exists(path):
+ mktree_example(path, 'src')
+ os.mkdir(path('dst_dir'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.hardlink_tree(path('src'), path('dst_dir'))
+ assert errinfo.value.errno == errno.EEXIST
+ mkfile(path('dst_file'), 'DST')
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.hardlink_tree(path('src'), path('dst_file'))
+ assert errinfo.value.errno == errno.EEXIST
+
+
+@in_env
+def test_hardlink_tree_file_exists(path):
+ mkfile(path('src'), 'SRC')
+ os.mkdir(path('dst_dir'))
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.hardlink_tree(path('src'), path('dst_dir'))
+ assert errinfo.value.errno == errno.EEXIST
+ mkfile(path('dst_file'), 'DST')
+ with pytest.raises(OSError) as errinfo:
+ library.python.fs.hardlink_tree(path('src'), path('dst_file'))
+ assert errinfo.value.errno == errno.EEXIST
+
+
+@in_env
+def test_copy_file(path):
+ mkfile(path('src'), 'SRC')
+ library.python.fs.copy_file(path('src'), path('dst'))
+ assert os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert file_data(path('dst')) == 'SRC'
+
+
+@in_env
+def test_copy_file_no_src(path):
+ with pytest.raises(EnvironmentError):
+ library.python.fs.copy_file(path('src'), path('dst'))
+
+
+@in_env
+def test_copy_file_exists(path):
+ mkfile(path('src'), 'SRC')
+ mkfile(path('dst'), 'DST')
+ library.python.fs.copy_file(path('src'), path('dst'))
+ assert os.path.isfile(path('src'))
+ assert os.path.isfile(path('dst'))
+ assert file_data(path('dst')) == 'SRC'
+
+
+@in_env
+def test_copy_file_exists_dir_empty(path):
+ mkfile(path('src'), 'SRC')
+ os.mkdir(path('dst'))
+ with pytest.raises(EnvironmentError):
+ library.python.fs.copy_file(path('src'), path('dst'))
+ assert os.path.isfile(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert not os.path.isfile(path('dst/src'))
+
+
+@in_env
+def test_copy_file_exists_dir_nonempty(path):
+ mkfile(path('src'), 'SRC')
+ os.mkdir(path('dst'))
+ mkfile(path('dst/dst_file'))
+ with pytest.raises(EnvironmentError):
+ library.python.fs.copy_file(path('src'), path('dst'))
+ assert os.path.isfile(path('src'))
+ assert os.path.isdir(path('dst'))
+ assert os.path.isfile(path('dst/dst_file'))
+ assert not os.path.isfile(path('dst/src'))
+
+
+@in_env
+def test_copy_tree(path):
+ mktree_example(path, 'src')
+ library.python.fs.copy_tree(path('src'), path('dst'))
+ assert trees_equal(path('src'), path('dst'))
+
+
+@in_env
+def test_copy_tree_empty(path):
+ os.mkdir(path('src'))
+ library.python.fs.copy_tree(path('src'), path('dst'))
+ assert trees_equal(path('src'), path('dst'))
+
+
+@in_env
+def test_copy_tree_file(path):
+ mkfile(path('src'), 'SRC')
+ library.python.fs.copy_tree(path('src'), path('dst'))
+ assert trees_equal(path('src'), path('dst'))
+
+
+@in_env
+def test_copy_tree_no_src(path):
+ with pytest.raises(EnvironmentError):
+ library.python.fs.copy_tree(path('src'), path('dst'))
+
+
+@in_env
+def test_copy_tree_exists(path):
+ mktree_example(path, 'src')
+ os.mkdir(path('dst_dir'))
+ with pytest.raises(EnvironmentError):
+ library.python.fs.copy_tree(path('src'), path('dst_dir'))
+ mkfile(path('dst_file'), 'DST')
+ with pytest.raises(EnvironmentError):
+ library.python.fs.copy_tree(path('src'), path('dst_file'))
+
+
+@in_env
+def test_copy_tree_file_exists(path):
+ mkfile(path('src'), 'SRC')
+ os.mkdir(path('dst_dir'))
+ with pytest.raises(EnvironmentError):
+ library.python.fs.copy_tree(path('src'), path('dst_dir'))
+ mkfile(path('dst_file'), 'DST')
+ library.python.fs.copy_tree(path('src'), path('dst_file'))
+ assert trees_equal(path('src'), path('dst_file'))
+
+
+@in_env
+def test_read_file(path):
+ mkfile(path('src'), 'SRC')
+ assert library.python.fs.read_file(path('src')).decode(library.python.strings.fs_encoding()) == 'SRC'
+ assert library.python.fs.read_file(path('src'), binary=False) == 'SRC'
+
+
+@in_env
+def test_read_file_empty(path):
+ mkfile(path('src'))
+ assert library.python.fs.read_file(path('src')).decode(library.python.strings.fs_encoding()) == ''
+ assert library.python.fs.read_file(path('src'), binary=False) == ''
+
+
+@in_env
+def test_read_file_multiline(path):
+ mkfile(path('src'), 'SRC line 1\nSRC line 2\n')
+ assert (
+ library.python.fs.read_file(path('src')).decode(library.python.strings.fs_encoding())
+ == 'SRC line 1\nSRC line 2\n'
+ )
+ assert library.python.fs.read_file(path('src'), binary=False) == 'SRC line 1\nSRC line 2\n'
+
+
+@in_env
+def test_read_file_multiline_crlf(path):
+ mkfile(path('src'), 'SRC line 1\r\nSRC line 2\r\n')
+ assert (
+ library.python.fs.read_file(path('src')).decode(library.python.strings.fs_encoding())
+ == 'SRC line 1\r\nSRC line 2\r\n'
+ )
+ if library.python.windows.on_win() or six.PY3: # universal newlines are by default in text mode in python3
+ assert library.python.fs.read_file(path('src'), binary=False) == 'SRC line 1\nSRC line 2\n'
+ else:
+ assert library.python.fs.read_file(path('src'), binary=False) == 'SRC line 1\r\nSRC line 2\r\n'
+
+
+@in_env
+def test_read_file_unicode(path):
+ s = u'АБВ'
+ mkfile(path('src'), s.encode('utf-8'))
+ mkfile(path('src_cp1251'), s.encode('cp1251'))
+ assert library.python.fs.read_file_unicode(path('src')) == s
+ assert library.python.fs.read_file_unicode(path('src_cp1251'), enc='cp1251') == s
+ assert library.python.fs.read_file_unicode(path('src'), binary=False) == s
+ assert library.python.fs.read_file_unicode(path('src_cp1251'), binary=False, enc='cp1251') == s
+
+
+@in_env
+def test_read_file_unicode_empty(path):
+ mkfile(path('src'))
+ mkfile(path('src_cp1251'))
+ assert library.python.fs.read_file_unicode(path('src')) == ''
+ assert library.python.fs.read_file_unicode(path('src_cp1251'), enc='cp1251') == ''
+ assert library.python.fs.read_file_unicode(path('src'), binary=False) == ''
+ assert library.python.fs.read_file_unicode(path('src_cp1251'), binary=False, enc='cp1251') == ''
+
+
+@in_env
+def test_read_file_unicode_multiline(path):
+ s = u'АБВ\nИ еще\n'
+ mkfile(path('src'), s.encode('utf-8'))
+ mkfile(path('src_cp1251'), s.encode('cp1251'))
+ assert library.python.fs.read_file_unicode(path('src')) == s
+ assert library.python.fs.read_file_unicode(path('src_cp1251'), enc='cp1251') == s
+ assert library.python.fs.read_file_unicode(path('src'), binary=False) == s
+ assert library.python.fs.read_file_unicode(path('src_cp1251'), binary=False, enc='cp1251') == s
+
+
+@in_env
+def test_read_file_unicode_multiline_crlf(path):
+ s = u'АБВ\r\nИ еще\r\n'
+ mkfile(path('src'), s.encode('utf-8'))
+ mkfile(path('src_cp1251'), s.encode('cp1251'))
+ assert library.python.fs.read_file_unicode(path('src')) == s
+ assert library.python.fs.read_file_unicode(path('src_cp1251'), enc='cp1251') == s
+ if library.python.windows.on_win() or six.PY3: # universal newlines are by default in text mode in python3
+ assert library.python.fs.read_file_unicode(path('src'), binary=False) == u'АБВ\nИ еще\n'
+ assert library.python.fs.read_file_unicode(path('src_cp1251'), binary=False, enc='cp1251') == u'АБВ\nИ еще\n'
+ else:
+ assert library.python.fs.read_file_unicode(path('src'), binary=False) == s
+ assert library.python.fs.read_file_unicode(path('src_cp1251'), binary=False, enc='cp1251') == s
+
+
+@in_env
+def test_write_file(path):
+ library.python.fs.write_file(path('src'), 'SRC')
+ assert file_data(path('src')) == 'SRC'
+ library.python.fs.write_file(path('src2'), 'SRC', binary=False)
+ assert file_data(path('src2')) == 'SRC'
+
+
+@in_env
+def test_write_file_empty(path):
+ library.python.fs.write_file(path('src'), '')
+ assert file_data(path('src')) == ''
+ library.python.fs.write_file(path('src2'), '', binary=False)
+ assert file_data(path('src2')) == ''
+
+
+@in_env
+def test_write_file_multiline(path):
+ library.python.fs.write_file(path('src'), 'SRC line 1\nSRC line 2\n')
+ assert file_data(path('src')) == 'SRC line 1\nSRC line 2\n'
+ library.python.fs.write_file(path('src2'), 'SRC line 1\nSRC line 2\n', binary=False)
+ if library.python.windows.on_win():
+ assert file_data(path('src2')) == 'SRC line 1\r\nSRC line 2\r\n'
+ else:
+ assert file_data(path('src2')) == 'SRC line 1\nSRC line 2\n'
+
+
+@in_env
+def test_write_file_multiline_crlf(path):
+ library.python.fs.write_file(path('src'), 'SRC line 1\r\nSRC line 2\r\n')
+ assert file_data(path('src')) == 'SRC line 1\r\nSRC line 2\r\n'
+ library.python.fs.write_file(path('src2'), 'SRC line 1\r\nSRC line 2\r\n', binary=False)
+ if library.python.windows.on_win():
+ assert file_data(path('src2')) == 'SRC line 1\r\r\nSRC line 2\r\r\n'
+ else:
+ assert file_data(path('src2')) == 'SRC line 1\r\nSRC line 2\r\n'
+
+
+@in_env
+def test_get_file_size(path):
+ mkfile(path('one.txt'), '22')
+ assert library.python.fs.get_file_size(path('one.txt')) == 2
+
+
+@in_env
+def test_get_file_size_empty(path):
+ mkfile(path('one.txt'))
+ assert library.python.fs.get_file_size(path('one.txt')) == 0
+
+
+@in_env
+def test_get_tree_size(path):
+ os.makedirs(path('deeper'))
+ mkfile(path('one.txt'), '1')
+ mkfile(path('deeper/two.txt'), '22')
+ assert library.python.fs.get_tree_size(path('one.txt')) == 1
+ assert library.python.fs.get_tree_size(path('')) == 1
+ assert library.python.fs.get_tree_size(path(''), recursive=True) == 3
+
+
+@pytest.mark.skipif(library.python.windows.on_win(), reason='Symlinks disabled on Windows')
+@in_env
+def test_get_tree_size_dangling_symlink(path):
+ os.makedirs(path('deeper'))
+ mkfile(path('one.txt'), '1')
+ mkfile(path('deeper/two.txt'), '22')
+ os.symlink(path('deeper/two.txt'), path("deeper/link.txt"))
+ os.remove(path('deeper/two.txt'))
+ # does not fail
+ assert library.python.fs.get_tree_size(path(''), recursive=True) == 1
+
+
+@pytest.mark.skipif(not library.python.windows.on_win(), reason='Test hardlinks on windows')
+def test_hardlink_or_copy():
+ max_allowed_hard_links = 1023
+
+ def run(hardlink_function, dir):
+ src = r"test.txt"
+ with open(src, "w") as f:
+ f.write("test")
+ for i in range(max_allowed_hard_links + 1):
+ hardlink_function(src, os.path.join(dir, "{}.txt".format(i)))
+
+ dir1 = library.python.fs.create_dirs("one")
+ with pytest.raises(WindowsError) as e:
+ run(library.python.fs.hardlink, dir1)
+ assert e.value.winerror == 1142
+ assert len(os.listdir(dir1)) == max_allowed_hard_links
+
+ dir2 = library.python.fs.create_dirs("two")
+ run(library.python.fs.hardlink_or_copy, dir2)
+ assert len(os.listdir(dir2)) == max_allowed_hard_links + 1
+
+
+def test_remove_tree_unicode():
+ path = u"test_remove_tree_unicode/русский".encode("utf-8")
+ os.makedirs(path)
+ library.python.fs.remove_tree(six.text_type("test_remove_tree_unicode"))
+ assert not os.path.exists("test_remove_tree_unicode")
+
+
+def test_remove_tree_safe_unicode():
+ path = u"test_remove_tree_safe_unicode/русский".encode("utf-8")
+ os.makedirs(path)
+ library.python.fs.remove_tree_safe(six.text_type("test_remove_tree_safe_unicode"))
+ assert not os.path.exists("test_remove_tree_safe_unicode")
+
+
+def test_copy_tree_custom_copy_function():
+ library.python.fs.create_dirs("test_copy_tree_src/deepper/inner")
+ library.python.fs.write_file("test_copy_tree_src/deepper/deepper.txt", "deepper.txt")
+ library.python.fs.write_file("test_copy_tree_src/deepper/inner/inner.txt", "inner.txt")
+ copied = []
+
+ def copy_function(src, dst):
+ shutil.copy2(src, dst)
+ copied.append(dst)
+
+ library.python.fs.copy_tree(
+ "test_copy_tree_src", yatest.common.work_path("test_copy_tree_dst"), copy_function=copy_function
+ )
+ assert len(copied) == 2
+ assert yatest.common.work_path("test_copy_tree_dst/deepper/deepper.txt") in copied
+ assert yatest.common.work_path("test_copy_tree_dst/deepper/inner/inner.txt") in copied
+
+
+def test_copy2():
+ library.python.fs.symlink("non-existent", "link")
+ library.python.fs.copy2("link", "link2", follow_symlinks=False)
+
+ assert os.path.islink("link2")
+ assert os.readlink("link2") == "non-existent"
+
+
+def test_commonpath():
+ pj = os.path.join
+ pja = lambda *x: os.path.abspath(pj(*x))
+
+ assert library.python.fs.commonpath(['a', 'b']) == ''
+ assert library.python.fs.commonpath([pj('t', '1')]) == pj('t', '1')
+ assert library.python.fs.commonpath([pj('t', '1'), pj('t', '2')]) == pj('t')
+ assert library.python.fs.commonpath([pj('t', '1', '2'), pj('t', '1', '2')]) == pj('t', '1', '2')
+ assert library.python.fs.commonpath([pj('t', '1', '1'), pj('t', '1', '2')]) == pj('t', '1')
+ assert library.python.fs.commonpath([pj('t', '1', '1'), pj('t', '1', '2'), pj('t', '1', '3')]) == pj('t', '1')
+
+ assert library.python.fs.commonpath([pja('t', '1', '1'), pja('t', '1', '2')]) == pja('t', '1')
+
+ assert library.python.fs.commonpath({pj('t', '1'), pj('t', '2')}) == pj('t')
diff --git a/library/python/fs/test/ya.make b/library/python/fs/test/ya.make
new file mode 100644
index 00000000000..33e3f5b4fff
--- /dev/null
+++ b/library/python/fs/test/ya.make
@@ -0,0 +1,14 @@
+OWNER(g:yatool)
+
+PY23_TEST()
+
+TEST_SRCS(
+ test_fs.py
+)
+
+PEERDIR(
+ library/python/fs
+ library/python/tmp
+)
+
+END()