# coding: utf-8
import os
import stat
import sys
import shutil
import logging
from six import reraise
import library.python.func
import library.python.strings
logger = logging.getLogger(__name__)
ERRORS = {
'SUCCESS': 0,
'PATH_NOT_FOUND': 3,
'ACCESS_DENIED': 5,
'SHARING_VIOLATION': 32,
'INSUFFICIENT_BUFFER': 122,
'DIR_NOT_EMPTY': 145,
}
RETRIABLE_FILE_ERRORS = (ERRORS['ACCESS_DENIED'], ERRORS['SHARING_VIOLATION'])
RETRIABLE_DIR_ERRORS = (ERRORS['ACCESS_DENIED'], ERRORS['DIR_NOT_EMPTY'], ERRORS['SHARING_VIOLATION'])
@library.python.func.lazy
def on_win():
"""Check if code run on Windows"""
return os.name == 'nt'
class NotOnWindowsError(RuntimeError):
def __init__(self, message):
super(NotOnWindowsError, self).__init__(message)
class DisabledOnWindowsError(RuntimeError):
def __init__(self, message):
super(DisabledOnWindowsError, self).__init__(message)
class NoCTypesError(RuntimeError):
def __init__(self, message):
super(NoCTypesError, self).__init__(message)
def win_only(f):
"""Decorator for Windows-only functions"""
def f_wrapped(*args, **kwargs):
if not on_win():
raise NotOnWindowsError('Windows-only function is called, but platform is not Windows')
return f(*args, **kwargs)
return f_wrapped
def win_disabled(f):
"""Decorator for functions disabled on Windows"""
def f_wrapped(*args, **kwargs):
if on_win():
run_disabled()
return f(*args, **kwargs)
return f_wrapped
def errorfix(f):
if not on_win():
return f
def f_wrapped(*args, **kwargs):
try:
return f(*args, **kwargs)
except WindowsError:
tp, value, tb = sys.exc_info()
fix_error(value)
reraise(tp, value, tb)
return f_wrapped
def diehard(winerrors, tries=100, delay=1):
"""
Decorator for diehard wrapper
On Windows platform retries to run function while specific WindowsError is thrown
On non-Windows platforms fallbacks to function itself
"""
def wrap(f):
if not on_win():
return f
return lambda *args, **kwargs: run_diehard(f, winerrors, tries, delay, *args, **kwargs)
return wrap
def win_path_fix(path):
"""Fix slashes in paths on windows"""
return path if sys.platform != 'win32' else path.replace('\\', '/')
if on_win():
import msvcrt
import time
import library.python.strings
_has_ctypes = True
try:
import ctypes
from ctypes import wintypes
except ImportError:
_has_ctypes = False
_INVALID_HANDLE_VALUE = -1
_MOVEFILE_REPLACE_EXISTING = 0x1
_MOVEFILE_WRITE_THROUGH = 0x8
_SEM_FAILCRITICALERRORS = 0x1
_SEM_NOGPFAULTERRORBOX = 0x2
_SEM_NOALIGNMENTFAULTEXCEPT = 0x4
_SEM_NOOPENFILEERRORBOX = 0x8
_SYMBOLIC_LINK_FLAG_DIRECTORY = 0x1
_CREATE_NO_WINDOW = 0x8000000
_ATOMIC_RENAME_FILE_TRANSACTION_DEFAULT_TIMEOUT = 1000
_HANDLE_FLAG_INHERIT = 0x1
@win_only
def require_ctypes(f):
def f_wrapped(*args, **kwargs):
if not _has_ctypes:
raise NoCTypesError('No ctypes found')
return f(*args, **kwargs)
return f_wrapped
# Run function in diehard mode (see diehard decorator commentary)
@win_only
def run_diehard(f, winerrors, tries, delay, *args, **kwargs):
if isinstance(winerrors, int):
winerrors = (winerrors,)
ei = None
for t in range(tries):
if t:
logger.debug('Diehard [errs %s]: try #%d in %s', ','.join(str(x) for x in winerrors), t, f)
try:
return f(*args, **kwargs)
except WindowsError as e:
if e.winerror not in winerrors:
raise
ei = sys.exc_info()
time.sleep(delay)
reraise(ei[0], ei[1], ei[2])
@win_only
def run_disabled(*args, **kwargs):
"""Placeholder for disabled functions"""
raise DisabledOnWindowsError('Function called is disabled on Windows')
class CustomWinError(WindowsError):
def __init__(self, winerror, message='', filename=None):
super(CustomWinError, self).__init__(winerror, message)
self.message = message
self.strerror = self.message if self.message else format_error(self.windows_error)
self.filename = filename
self.utf8 = True
@win_only
def unicode_path(path):
return library.python.strings.to_unicode(path, library.python.strings.fs_encoding())
@win_only
@require_ctypes
def format_error(error):
if isinstance(error, WindowsError):
error = error.winerror
if not isinstance(error, int):
return 'Unknown'
return ctypes.FormatError(error)
@win_only
def fix_error(windows_error):
if not windows_error.strerror:
windows_error.strerror = format_error(windows_error)
transcode_error(windows_error)
@win_only
def transcode_error(windows_error, to_enc='utf-8'):
from_enc = 'utf-8' if getattr(windows_error, 'utf8', False) else library.python.strings.guess_default_encoding()
if from_enc != to_enc:
windows_error.strerror = library.python.strings.to_str(
windows_error.strerror, to_enc=to_enc, from_enc=from_enc
)
setattr(windows_error, 'utf8', to_enc == 'utf-8')
class Transaction(object):
def __init__(self, timeout=None, description=''):
self.timeout = timeout
self.description = description
@require_ctypes
def __enter__(self):
self._handle = ctypes.windll.ktmw32.CreateTransaction(None, 0, 0, 0, 0, self.timeout, self.description)
if self._handle == _INVALID_HANDLE_VALUE:
raise ctypes.WinError()
return self._handle
@require_ctypes
def __exit__(self, t, v, tb):
try:
if not ctypes.windll.ktmw32.CommitTransaction(self._handle):
raise ctypes.WinError()
finally:
ctypes.windll.kernel32.CloseHandle(self._handle)
@win_only
def file_handle(f):
return msvcrt.get_osfhandle(f.fileno())
# https://www.python.org/dev/peps/pep-0446/
# http://mihalop.blogspot.ru/2014/05/python-subprocess-and-file-descriptors.html
@require_ctypes
@win_only
def open_file(*args, **kwargs):
f = open(*args, **kwargs)
ctypes.windll.kernel32.SetHandleInformation(file_handle(f), _HANDLE_FLAG_INHERIT, 0)
return f
@win_only
@require_ctypes
def replace_file(src, dst):
if not ctypes.windll.kernel32.MoveFileExW(
unicode_path(src), unicode_path(dst), _MOVEFILE_REPLACE_EXISTING | _MOVEFILE_WRITE_THROUGH
):
raise ctypes.WinError()
@win_only
@require_ctypes
def replace_file_across_devices(src, dst):
with Transaction(
timeout=_ATOMIC_RENAME_FILE_TRANSACTION_DEFAULT_TIMEOUT,
description='ya library.python.windows replace_file_across_devices',
) as transaction:
if not ctypes.windll.kernel32.MoveFileTransactedW(
unicode_path(src),
unicode_path(dst),
None,
None,
_MOVEFILE_REPLACE_EXISTING | _MOVEFILE_WRITE_THROUGH,
transaction,
):
raise ctypes.WinError()
@win_only
@require_ctypes
def hardlink(src, lnk):
if not ctypes.windll.kernel32.CreateHardLinkW(unicode_path(lnk), unicode_path(src), None):
raise ctypes.WinError()
# Requires SE_CREATE_SYMBOLIC_LINK_NAME privilege
@win_only
@win_disabled
@require_ctypes
def symlink_file(src, lnk):
if not ctypes.windll.kernel32.CreateSymbolicLinkW(unicode_path(lnk), unicode_path(src), 0):
raise ctypes.WinError()
# Requires SE_CREATE_SYMBOLIC_LINK_NAME privilege
@win_only
@win_disabled
@require_ctypes
def symlink_dir(src, lnk):
if not ctypes.windll.kernel32.CreateSymbolicLinkW(
unicode_path(lnk), unicode_path(src), _SYMBOLIC_LINK_FLAG_DIRECTORY
):
raise ctypes.WinError()
@win_only
@require_ctypes
def lock_file(f, offset, length, raises=True):
locked = ctypes.windll.kernel32.LockFile(
file_handle(f), _low_dword(offset), _high_dword(offset), _low_dword(length), _high_dword(length)
)
if not raises:
return bool(locked)
if not locked:
raise ctypes.WinError()
@win_only
@require_ctypes
def unlock_file(f, offset, length, raises=True):
unlocked = ctypes.windll.kernel32.UnlockFile(
file_handle(f), _low_dword(offset), _high_dword(offset), _low_dword(length), _high_dword(length)
)
if not raises:
return bool(unlocked)
if not unlocked:
raise ctypes.WinError()
@win_only
@require_ctypes
def set_error_mode(mode):
return ctypes.windll.kernel32.SetErrorMode(mode)
@win_only
def rmtree(path):
def error_handler(func, handling_path, execinfo):
e = execinfo[1]
if e.winerror == ERRORS['PATH_NOT_FOUND']:
handling_path = "\\\\?\\" + handling_path # handle path over 256 symbols
if os.path.exists(path):
return func(handling_path)
if e.winerror == ERRORS['ACCESS_DENIED']:
try:
# removing of r/w directory with read-only files in it yields ACCESS_DENIED
# which is not an insuperable obstacle https://bugs.python.org/issue19643
os.chmod(handling_path, stat.S_IWRITE)
except OSError:
pass
else:
# propagate true last error if this attempt fails
return func(handling_path)
raise e
shutil.rmtree(path, onerror=error_handler)
# Don't display the Windows GPF dialog if the invoked program dies.
# http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
@win_only
def disable_error_dialogs():
set_error_mode(_SEM_NOGPFAULTERRORBOX | _SEM_FAILCRITICALERRORS)
@win_only
def default_process_creation_flags():
return 0
@require_ctypes
def _low_dword(x):
return ctypes.c_ulong(x & ((1 << 32) - 1))
@require_ctypes
def _high_dword(x):
return ctypes.c_ulong((x >> 32) & ((1 << 32) - 1))
@win_only
@require_ctypes
def get_current_process():
handle = ctypes.windll.kernel32.GetCurrentProcess()
if not handle:
raise ctypes.WinError()
return wintypes.HANDLE(handle)
@win_only
@require_ctypes
def get_process_handle_count(proc_handle):
assert isinstance(proc_handle, wintypes.HANDLE)
GetProcessHandleCount = ctypes.WINFUNCTYPE(wintypes.BOOL, wintypes.HANDLE, wintypes.POINTER(wintypes.DWORD))(
("GetProcessHandleCount", ctypes.windll.kernel32)
)
hndcnt = wintypes.DWORD()
if not GetProcessHandleCount(proc_handle, ctypes.byref(hndcnt)):
raise ctypes.WinError()
return hndcnt.value
@win_only
@require_ctypes
def set_handle_information(file, inherit=None, protect_from_close=None):
for flag, value in [(inherit, 1), (protect_from_close, 2)]:
if flag is not None:
assert isinstance(flag, bool)
if not ctypes.windll.kernel32.SetHandleInformation(
file_handle(file), _low_dword(value), _low_dword(int(flag))
):
raise ctypes.WinError()
@win_only
@require_ctypes
def get_windows_directory():
buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
size = ctypes.windll.kernel32.GetWindowsDirectoryW(buf, ctypes.wintypes.MAX_PATH)
if not size:
raise ctypes.WinError()
if size > ctypes.wintypes.MAX_PATH - 1:
raise CustomWinError(ERRORS['INSUFFICIENT_BUFFER'])
return ctypes.wstring_at(buf, size)