aboutsummaryrefslogtreecommitdiffstats
path: root/library/python/windows
diff options
context:
space:
mode:
authorNikita Slyusarev <nslus@yandex-team.com>2022-02-10 16:46:52 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:52 +0300
commitcd77cecfc03a3eaf87816af28a33067c4f0cdb59 (patch)
tree1308e0bae862d52e0020d881fe758080437fe389 /library/python/windows
parentcdae02d225fb5b3afbb28990e79a7ac6c9125327 (diff)
downloadydb-cd77cecfc03a3eaf87816af28a33067c4f0cdb59.tar.gz
Restoring authorship annotation for Nikita Slyusarev <nslus@yandex-team.com>. Commit 1 of 2.
Diffstat (limited to 'library/python/windows')
-rw-r--r--library/python/windows/__init__.py538
-rw-r--r--library/python/windows/ut/test_windows.py162
-rw-r--r--library/python/windows/ut/ya.make2
3 files changed, 351 insertions, 351 deletions
diff --git a/library/python/windows/__init__.py b/library/python/windows/__init__.py
index 62861b3309..2e426f2bd8 100644
--- a/library/python/windows/__init__.py
+++ b/library/python/windows/__init__.py
@@ -1,290 +1,290 @@
# coding: utf-8
-import os
+import os
import stat
import sys
import shutil
import logging
-
+
from six import reraise
import library.python.func
import library.python.strings
-
+
logger = logging.getLogger(__name__)
+
-
-ERRORS = {
- 'SUCCESS': 0,
+ERRORS = {
+ 'SUCCESS': 0,
'PATH_NOT_FOUND': 3,
'ACCESS_DENIED': 5,
'SHARING_VIOLATION': 32,
- 'INSUFFICIENT_BUFFER': 122,
- 'DIR_NOT_EMPTY': 145,
-}
-
+ 'INSUFFICIENT_BUFFER': 122,
+ 'DIR_NOT_EMPTY': 145,
+}
+
RETRIABLE_FILE_ERRORS = (ERRORS['ACCESS_DENIED'], ERRORS['SHARING_VIOLATION'])
RETRIABLE_DIR_ERRORS = (ERRORS['ACCESS_DENIED'], ERRORS['DIR_NOT_EMPTY'], ERRORS['SHARING_VIOLATION'])
+
-
-# Check if on Windows
+# Check if on Windows
@library.python.func.lazy
-def on_win():
- return os.name == 'nt'
-
-
-class NotOnWindowsError(RuntimeError):
- def __init__(self, message):
- super(NotOnWindowsError, self).__init__(message)
-
-
-class DisabledOnWindowsError(RuntimeError):
- def __init__(self, message):
- super(DisabledOnWindowsError, self).__init__(message)
-
-
-class NoCTypesError(RuntimeError):
- def __init__(self, message):
- super(NoCTypesError, self).__init__(message)
-
-
-# Decorator for Windows-only functions
-def win_only(f):
- def f_wrapped(*args, **kwargs):
- if not on_win():
- raise NotOnWindowsError('Windows-only function is called, but platform is not Windows')
- return f(*args, **kwargs)
-
- return f_wrapped
-
-
-# Decorator for functions disabled on Windows
-def win_disabled(f):
- def f_wrapped(*args, **kwargs):
- if on_win():
- run_disabled()
- return f(*args, **kwargs)
-
- return f_wrapped
-
-
-def errorfix(f):
- if not on_win():
- return f
-
- def f_wrapped(*args, **kwargs):
- try:
- return f(*args, **kwargs)
+def on_win():
+ return os.name == 'nt'
+
+
+class NotOnWindowsError(RuntimeError):
+ def __init__(self, message):
+ super(NotOnWindowsError, self).__init__(message)
+
+
+class DisabledOnWindowsError(RuntimeError):
+ def __init__(self, message):
+ super(DisabledOnWindowsError, self).__init__(message)
+
+
+class NoCTypesError(RuntimeError):
+ def __init__(self, message):
+ super(NoCTypesError, self).__init__(message)
+
+
+# Decorator for Windows-only functions
+def win_only(f):
+ def f_wrapped(*args, **kwargs):
+ if not on_win():
+ raise NotOnWindowsError('Windows-only function is called, but platform is not Windows')
+ return f(*args, **kwargs)
+
+ return f_wrapped
+
+
+# Decorator for functions disabled on Windows
+def win_disabled(f):
+ def f_wrapped(*args, **kwargs):
+ if on_win():
+ run_disabled()
+ return f(*args, **kwargs)
+
+ return f_wrapped
+
+
+def errorfix(f):
+ if not on_win():
+ return f
+
+ def f_wrapped(*args, **kwargs):
+ try:
+ return f(*args, **kwargs)
except WindowsError:
tp, value, tb = sys.exc_info()
fix_error(value)
reraise(tp, value, tb)
-
- return f_wrapped
-
-
-# Decorator for diehard wrapper
-# On Windows platform retries to run function while specific WindowsError is thrown
-# On non-Windows platforms fallbacks to function itself
-def diehard(winerrors, tries=100, delay=1):
- def wrap(f):
- if not on_win():
- return f
-
- return lambda *args, **kwargs: run_diehard(f, winerrors, tries, delay, *args, **kwargs)
-
- return wrap
-
-
-if on_win():
- import msvcrt
- import time
-
+
+ return f_wrapped
+
+
+# Decorator for diehard wrapper
+# On Windows platform retries to run function while specific WindowsError is thrown
+# On non-Windows platforms fallbacks to function itself
+def diehard(winerrors, tries=100, delay=1):
+ def wrap(f):
+ if not on_win():
+ return f
+
+ return lambda *args, **kwargs: run_diehard(f, winerrors, tries, delay, *args, **kwargs)
+
+ return wrap
+
+
+if on_win():
+ import msvcrt
+ import time
+
import library.python.strings
-
- _has_ctypes = True
- try:
- import ctypes
+
+ _has_ctypes = True
+ try:
+ import ctypes
from ctypes import wintypes
- except ImportError:
- _has_ctypes = False
-
- _INVALID_HANDLE_VALUE = -1
-
- _MOVEFILE_REPLACE_EXISTING = 0x1
- _MOVEFILE_WRITE_THROUGH = 0x8
-
- _SEM_FAILCRITICALERRORS = 0x1
- _SEM_NOGPFAULTERRORBOX = 0x2
- _SEM_NOALIGNMENTFAULTEXCEPT = 0x4
- _SEM_NOOPENFILEERRORBOX = 0x8
-
- _SYMBOLIC_LINK_FLAG_DIRECTORY = 0x1
-
- _CREATE_NO_WINDOW = 0x8000000
-
- _ATOMIC_RENAME_FILE_TRANSACTION_DEFAULT_TIMEOUT = 1000
-
+ except ImportError:
+ _has_ctypes = False
+
+ _INVALID_HANDLE_VALUE = -1
+
+ _MOVEFILE_REPLACE_EXISTING = 0x1
+ _MOVEFILE_WRITE_THROUGH = 0x8
+
+ _SEM_FAILCRITICALERRORS = 0x1
+ _SEM_NOGPFAULTERRORBOX = 0x2
+ _SEM_NOALIGNMENTFAULTEXCEPT = 0x4
+ _SEM_NOOPENFILEERRORBOX = 0x8
+
+ _SYMBOLIC_LINK_FLAG_DIRECTORY = 0x1
+
+ _CREATE_NO_WINDOW = 0x8000000
+
+ _ATOMIC_RENAME_FILE_TRANSACTION_DEFAULT_TIMEOUT = 1000
+
_HANDLE_FLAG_INHERIT = 0x1
- @win_only
- def require_ctypes(f):
- def f_wrapped(*args, **kwargs):
- if not _has_ctypes:
- raise NoCTypesError('No ctypes found')
- return f(*args, **kwargs)
-
- return f_wrapped
-
- # Run function in diehard mode (see diehard decorator commentary)
- @win_only
- def run_diehard(f, winerrors, tries, delay, *args, **kwargs):
- if isinstance(winerrors, int):
- winerrors = (winerrors,)
+ @win_only
+ def require_ctypes(f):
+ def f_wrapped(*args, **kwargs):
+ if not _has_ctypes:
+ raise NoCTypesError('No ctypes found')
+ return f(*args, **kwargs)
+
+ return f_wrapped
+
+ # Run function in diehard mode (see diehard decorator commentary)
+ @win_only
+ def run_diehard(f, winerrors, tries, delay, *args, **kwargs):
+ if isinstance(winerrors, int):
+ winerrors = (winerrors,)
ei = None
- for t in xrange(tries):
- if t:
+ for t in xrange(tries):
+ if t:
logger.debug('Diehard [errs %s]: try #%d in %s', ','.join(str(x) for x in winerrors), t, f)
- try:
- return f(*args, **kwargs)
- except WindowsError as e:
- if e.winerror not in winerrors:
- raise
+ try:
+ return f(*args, **kwargs)
+ except WindowsError as e:
+ if e.winerror not in winerrors:
+ raise
ei = sys.exc_info()
- time.sleep(delay)
+ time.sleep(delay)
reraise(ei[0], ei[1], ei[2])
-
- # Placeholder for disabled functions
- @win_only
- def run_disabled(*args, **kwargs):
- raise DisabledOnWindowsError('Function called is disabled on Windows')
-
- class CustomWinError(WindowsError):
- def __init__(self, winerror, message='', filename=None):
- super(CustomWinError, self).__init__(winerror, message)
- self.message = message
- self.strerror = self.message if self.message else format_error(self.windows_error)
- self.filename = filename
- self.utf8 = True
-
- @win_only
- def unicode_path(path):
+
+ # Placeholder for disabled functions
+ @win_only
+ def run_disabled(*args, **kwargs):
+ raise DisabledOnWindowsError('Function called is disabled on Windows')
+
+ class CustomWinError(WindowsError):
+ def __init__(self, winerror, message='', filename=None):
+ super(CustomWinError, self).__init__(winerror, message)
+ self.message = message
+ self.strerror = self.message if self.message else format_error(self.windows_error)
+ self.filename = filename
+ self.utf8 = True
+
+ @win_only
+ def unicode_path(path):
return library.python.strings.to_unicode(path, library.python.strings.fs_encoding())
-
- @win_only
- @require_ctypes
- def format_error(error):
- if isinstance(error, WindowsError):
- error = error.winerror
- if not isinstance(error, int):
- return 'Unknown'
- return ctypes.FormatError(error)
-
- @win_only
- def fix_error(windows_error):
- if not windows_error.strerror:
- windows_error.strerror = format_error(windows_error)
- transcode_error(windows_error)
-
- @win_only
- def transcode_error(windows_error, to_enc='utf-8'):
+
+ @win_only
+ @require_ctypes
+ def format_error(error):
+ if isinstance(error, WindowsError):
+ error = error.winerror
+ if not isinstance(error, int):
+ return 'Unknown'
+ return ctypes.FormatError(error)
+
+ @win_only
+ def fix_error(windows_error):
+ if not windows_error.strerror:
+ windows_error.strerror = format_error(windows_error)
+ transcode_error(windows_error)
+
+ @win_only
+ def transcode_error(windows_error, to_enc='utf-8'):
from_enc = 'utf-8' if getattr(windows_error, 'utf8', False) else library.python.strings.guess_default_encoding()
- if from_enc != to_enc:
+ if from_enc != to_enc:
windows_error.strerror = library.python.strings.to_str(windows_error.strerror, to_enc=to_enc, from_enc=from_enc)
- setattr(windows_error, 'utf8', to_enc == 'utf-8')
-
- class Transaction(object):
- def __init__(self, timeout=None, description=''):
- self.timeout = timeout
- self.description = description
-
- @require_ctypes
- def __enter__(self):
- self._handle = ctypes.windll.ktmw32.CreateTransaction(None, 0, 0, 0, 0, self.timeout, self.description)
- if self._handle == _INVALID_HANDLE_VALUE:
- raise ctypes.WinError()
- return self._handle
-
- @require_ctypes
- def __exit__(self, t, v, tb):
- try:
- if not ctypes.windll.ktmw32.CommitTransaction(self._handle):
- raise ctypes.WinError()
- finally:
- ctypes.windll.kernel32.CloseHandle(self._handle)
-
- @win_only
- def file_handle(f):
- return msvcrt.get_osfhandle(f.fileno())
-
+ setattr(windows_error, 'utf8', to_enc == 'utf-8')
+
+ class Transaction(object):
+ def __init__(self, timeout=None, description=''):
+ self.timeout = timeout
+ self.description = description
+
+ @require_ctypes
+ def __enter__(self):
+ self._handle = ctypes.windll.ktmw32.CreateTransaction(None, 0, 0, 0, 0, self.timeout, self.description)
+ if self._handle == _INVALID_HANDLE_VALUE:
+ raise ctypes.WinError()
+ return self._handle
+
+ @require_ctypes
+ def __exit__(self, t, v, tb):
+ try:
+ if not ctypes.windll.ktmw32.CommitTransaction(self._handle):
+ raise ctypes.WinError()
+ finally:
+ ctypes.windll.kernel32.CloseHandle(self._handle)
+
+ @win_only
+ def file_handle(f):
+ return msvcrt.get_osfhandle(f.fileno())
+
# https://www.python.org/dev/peps/pep-0446/
# http://mihalop.blogspot.ru/2014/05/python-subprocess-and-file-descriptors.html
@require_ctypes
- @win_only
+ @win_only
def open_file(*args, **kwargs):
f = open(*args, **kwargs)
ctypes.windll.kernel32.SetHandleInformation(file_handle(f), _HANDLE_FLAG_INHERIT, 0)
return f
@win_only
- @require_ctypes
- def replace_file(src, dst):
- if not ctypes.windll.kernel32.MoveFileExW(unicode_path(src), unicode_path(dst), _MOVEFILE_REPLACE_EXISTING | _MOVEFILE_WRITE_THROUGH):
- raise ctypes.WinError()
-
- @win_only
- @require_ctypes
- def replace_file_across_devices(src, dst):
+ @require_ctypes
+ def replace_file(src, dst):
+ if not ctypes.windll.kernel32.MoveFileExW(unicode_path(src), unicode_path(dst), _MOVEFILE_REPLACE_EXISTING | _MOVEFILE_WRITE_THROUGH):
+ raise ctypes.WinError()
+
+ @win_only
+ @require_ctypes
+ def replace_file_across_devices(src, dst):
with Transaction(timeout=_ATOMIC_RENAME_FILE_TRANSACTION_DEFAULT_TIMEOUT, description='ya library.python.windows replace_file_across_devices') as transaction:
- if not ctypes.windll.kernel32.MoveFileTransactedW(unicode_path(src), unicode_path(dst), None, None, _MOVEFILE_REPLACE_EXISTING | _MOVEFILE_WRITE_THROUGH, transaction):
- raise ctypes.WinError()
-
- @win_only
- @require_ctypes
- def hardlink(src, lnk):
- if not ctypes.windll.kernel32.CreateHardLinkW(unicode_path(lnk), unicode_path(src), None):
- raise ctypes.WinError()
-
- # Requires SE_CREATE_SYMBOLIC_LINK_NAME privilege
- @win_only
- @win_disabled
- @require_ctypes
- def symlink_file(src, lnk):
- if not ctypes.windll.kernel32.CreateSymbolicLinkW(unicode_path(lnk), unicode_path(src), 0):
- raise ctypes.WinError()
-
- # Requires SE_CREATE_SYMBOLIC_LINK_NAME privilege
- @win_only
- @win_disabled
- @require_ctypes
- def symlink_dir(src, lnk):
- if not ctypes.windll.kernel32.CreateSymbolicLinkW(unicode_path(lnk), unicode_path(src), _SYMBOLIC_LINK_FLAG_DIRECTORY):
- raise ctypes.WinError()
-
- @win_only
- @require_ctypes
- def lock_file(f, offset, length, raises=True):
- locked = ctypes.windll.kernel32.LockFile(file_handle(f), _low_dword(offset), _high_dword(offset), _low_dword(length), _high_dword(length))
- if not raises:
- return bool(locked)
- if not locked:
- raise ctypes.WinError()
-
- @win_only
- @require_ctypes
- def unlock_file(f, offset, length, raises=True):
- unlocked = ctypes.windll.kernel32.UnlockFile(file_handle(f), _low_dword(offset), _high_dword(offset), _low_dword(length), _high_dword(length))
- if not raises:
- return bool(unlocked)
- if not unlocked:
- raise ctypes.WinError()
-
- @win_only
- @require_ctypes
- def set_error_mode(mode):
- return ctypes.windll.kernel32.SetErrorMode(mode)
-
+ if not ctypes.windll.kernel32.MoveFileTransactedW(unicode_path(src), unicode_path(dst), None, None, _MOVEFILE_REPLACE_EXISTING | _MOVEFILE_WRITE_THROUGH, transaction):
+ raise ctypes.WinError()
+
+ @win_only
+ @require_ctypes
+ def hardlink(src, lnk):
+ if not ctypes.windll.kernel32.CreateHardLinkW(unicode_path(lnk), unicode_path(src), None):
+ raise ctypes.WinError()
+
+ # Requires SE_CREATE_SYMBOLIC_LINK_NAME privilege
+ @win_only
+ @win_disabled
+ @require_ctypes
+ def symlink_file(src, lnk):
+ if not ctypes.windll.kernel32.CreateSymbolicLinkW(unicode_path(lnk), unicode_path(src), 0):
+ raise ctypes.WinError()
+
+ # Requires SE_CREATE_SYMBOLIC_LINK_NAME privilege
+ @win_only
+ @win_disabled
+ @require_ctypes
+ def symlink_dir(src, lnk):
+ if not ctypes.windll.kernel32.CreateSymbolicLinkW(unicode_path(lnk), unicode_path(src), _SYMBOLIC_LINK_FLAG_DIRECTORY):
+ raise ctypes.WinError()
+
+ @win_only
+ @require_ctypes
+ def lock_file(f, offset, length, raises=True):
+ locked = ctypes.windll.kernel32.LockFile(file_handle(f), _low_dword(offset), _high_dword(offset), _low_dword(length), _high_dword(length))
+ if not raises:
+ return bool(locked)
+ if not locked:
+ raise ctypes.WinError()
+
+ @win_only
+ @require_ctypes
+ def unlock_file(f, offset, length, raises=True):
+ unlocked = ctypes.windll.kernel32.UnlockFile(file_handle(f), _low_dword(offset), _high_dword(offset), _low_dword(length), _high_dword(length))
+ if not raises:
+ return bool(unlocked)
+ if not unlocked:
+ raise ctypes.WinError()
+
+ @win_only
+ @require_ctypes
+ def set_error_mode(mode):
+ return ctypes.windll.kernel32.SetErrorMode(mode)
+
@win_only
def rmtree(path):
def error_handler(func, handling_path, execinfo):
@@ -306,23 +306,23 @@ if on_win():
raise e
shutil.rmtree(path, onerror=error_handler)
- # Don't display the Windows GPF dialog if the invoked program dies.
- # http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
- @win_only
- def disable_error_dialogs():
+ # Don't display the Windows GPF dialog if the invoked program dies.
+ # http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
+ @win_only
+ def disable_error_dialogs():
set_error_mode(_SEM_NOGPFAULTERRORBOX | _SEM_FAILCRITICALERRORS)
-
- @win_only
- def default_process_creation_flags():
- return 0
-
- @require_ctypes
- def _low_dword(x):
- return ctypes.c_ulong(x & ((1 << 32) - 1))
-
- @require_ctypes
- def _high_dword(x):
- return ctypes.c_ulong((x >> 32) & ((1 << 32) - 1))
+
+ @win_only
+ def default_process_creation_flags():
+ return 0
+
+ @require_ctypes
+ def _low_dword(x):
+ return ctypes.c_ulong(x & ((1 << 32) - 1))
+
+ @require_ctypes
+ def _high_dword(x):
+ return ctypes.c_ulong((x >> 32) & ((1 << 32) - 1))
@win_only
@require_ctypes
@@ -351,14 +351,14 @@ if on_win():
assert isinstance(flag, bool)
if not ctypes.windll.kernel32.SetHandleInformation(file_handle(file), _low_dword(value), _low_dword(int(flag))):
raise ctypes.WinError()
-
- @win_only
- @require_ctypes
- def get_windows_directory():
- buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
- size = ctypes.windll.kernel32.GetWindowsDirectoryW(buf, ctypes.wintypes.MAX_PATH)
- if not size:
- raise ctypes.WinError()
- if size > ctypes.wintypes.MAX_PATH - 1:
- raise CustomWinError(ERRORS['INSUFFICIENT_BUFFER'])
- return ctypes.wstring_at(buf, size)
+
+ @win_only
+ @require_ctypes
+ def get_windows_directory():
+ buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
+ size = ctypes.windll.kernel32.GetWindowsDirectoryW(buf, ctypes.wintypes.MAX_PATH)
+ if not size:
+ raise ctypes.WinError()
+ if size > ctypes.wintypes.MAX_PATH - 1:
+ raise CustomWinError(ERRORS['INSUFFICIENT_BUFFER'])
+ return ctypes.wstring_at(buf, size)
diff --git a/library/python/windows/ut/test_windows.py b/library/python/windows/ut/test_windows.py
index bef3ec2dc5..516dfa4bb4 100644
--- a/library/python/windows/ut/test_windows.py
+++ b/library/python/windows/ut/test_windows.py
@@ -1,96 +1,96 @@
-# coding=utf-8
-
-import errno
-import os
-import pytest
-
+# coding=utf-8
+
+import errno
+import os
+import pytest
+
import library.python.strings
import library.python.windows
-
-
-def gen_error_access_denied():
+
+
+def gen_error_access_denied():
if library.python.windows.on_win():
- err = WindowsError()
- err.errno = errno.EACCES
- err.strerror = ''
+ err = WindowsError()
+ err.errno = errno.EACCES
+ err.strerror = ''
err.winerror = library.python.windows.ERRORS['ACCESS_DENIED']
- else:
- err = OSError()
- err.errno = errno.EACCES
- err.strerror = os.strerror(err.errno)
- err.filename = 'unknown/file'
- raise err
-
-
-def test_errorfix_buggy():
+ else:
+ err = OSError()
+ err.errno = errno.EACCES
+ err.strerror = os.strerror(err.errno)
+ err.filename = 'unknown/file'
+ raise err
+
+
+def test_errorfix_buggy():
@library.python.windows.errorfix
- def erroneous_func():
- gen_error_access_denied()
-
- with pytest.raises(OSError) as errinfo:
- erroneous_func()
- assert errinfo.value.errno == errno.EACCES
- assert errinfo.value.filename == 'unknown/file'
- assert isinstance(errinfo.value.strerror, basestring)
- assert errinfo.value.strerror
-
-
-def test_errorfix_explicit():
+ def erroneous_func():
+ gen_error_access_denied()
+
+ with pytest.raises(OSError) as errinfo:
+ erroneous_func()
+ assert errinfo.value.errno == errno.EACCES
+ assert errinfo.value.filename == 'unknown/file'
+ assert isinstance(errinfo.value.strerror, basestring)
+ assert errinfo.value.strerror
+
+
+def test_errorfix_explicit():
@library.python.windows.errorfix
- def erroneous_func():
+ def erroneous_func():
if library.python.windows.on_win():
- err = WindowsError()
+ err = WindowsError()
err.winerror = library.python.windows.ERRORS['ACCESS_DENIED']
- else:
- err = OSError()
- err.errno = errno.EACCES
- err.strerror = 'Some error description'
- err.filename = 'unknown/file'
- raise err
-
- with pytest.raises(OSError) as errinfo:
- erroneous_func()
- assert errinfo.value.errno == errno.EACCES
- assert errinfo.value.filename == 'unknown/file'
- assert errinfo.value.strerror == 'Some error description'
-
-
-def test_errorfix_decoding_cp1251():
+ else:
+ err = OSError()
+ err.errno = errno.EACCES
+ err.strerror = 'Some error description'
+ err.filename = 'unknown/file'
+ raise err
+
+ with pytest.raises(OSError) as errinfo:
+ erroneous_func()
+ assert errinfo.value.errno == errno.EACCES
+ assert errinfo.value.filename == 'unknown/file'
+ assert errinfo.value.strerror == 'Some error description'
+
+
+def test_errorfix_decoding_cp1251():
@library.python.windows.errorfix
- def erroneous_func():
- model_msg = u'Какое-то описание ошибки'
+ def erroneous_func():
+ model_msg = u'Какое-то описание ошибки'
if library.python.windows.on_win():
- err = WindowsError()
+ err = WindowsError()
err.strerror = library.python.strings.to_str(model_msg, 'cp1251')
- else:
- err = OSError()
+ else:
+ err = OSError()
err.strerror = library.python.strings.to_str(model_msg)
- raise err
-
- with pytest.raises(OSError) as errinfo:
- erroneous_func()
- error_msg = errinfo.value.strerror
- if not isinstance(errinfo.value.strerror, unicode):
+ raise err
+
+ with pytest.raises(OSError) as errinfo:
+ erroneous_func()
+ error_msg = errinfo.value.strerror
+ if not isinstance(errinfo.value.strerror, unicode):
error_msg = library.python.strings.to_unicode(error_msg)
- assert error_msg == u'Какое-то описание ошибки'
-
-
-def test_diehard():
+ assert error_msg == u'Какое-то описание ошибки'
+
+
+def test_diehard():
@library.python.windows.diehard(library.python.windows.ERRORS['ACCESS_DENIED'], tries=5)
- def erroneous_func(errors):
- try:
- gen_error_access_denied()
- except Exception as e:
- errors.append(e)
- raise
-
- raised_errors = []
- with pytest.raises(OSError) as errinfo:
- erroneous_func(raised_errors)
- assert errinfo.value.errno == errno.EACCES
- assert any(e.errno == errno.EACCES for e in raised_errors)
- assert raised_errors and errinfo.value == raised_errors[-1]
+ def erroneous_func(errors):
+ try:
+ gen_error_access_denied()
+ except Exception as e:
+ errors.append(e)
+ raise
+
+ raised_errors = []
+ with pytest.raises(OSError) as errinfo:
+ erroneous_func(raised_errors)
+ assert errinfo.value.errno == errno.EACCES
+ assert any(e.errno == errno.EACCES for e in raised_errors)
+ assert raised_errors and errinfo.value == raised_errors[-1]
if library.python.windows.on_win():
- assert len(raised_errors) == 5
- else:
- assert len(raised_errors) == 1
+ assert len(raised_errors) == 5
+ else:
+ assert len(raised_errors) == 1
diff --git a/library/python/windows/ut/ya.make b/library/python/windows/ut/ya.make
index c39f1797b8..992afe0259 100644
--- a/library/python/windows/ut/ya.make
+++ b/library/python/windows/ut/ya.make
@@ -2,7 +2,7 @@ OWNER(g:yatool)
PY2TEST()
-TEST_SRCS(test_windows.py)
+TEST_SRCS(test_windows.py)
PEERDIR(
library/python/windows