diff options
Diffstat (limited to 'library/python')
-rw-r--r-- | library/python/archive/__init__.py | 212 | ||||
-rw-r--r-- | library/python/cityhash/cityhash.pyx | 75 | ||||
-rw-r--r-- | library/python/cityhash/hash.cpp | 32 | ||||
-rw-r--r-- | library/python/cityhash/hash.h | 6 | ||||
-rw-r--r-- | library/python/codecs/__codecs.pyx | 61 | ||||
-rw-r--r-- | library/python/codecs/__init__.py | 1 | ||||
-rw-r--r-- | library/python/compress/__init__.py | 147 | ||||
-rw-r--r-- | library/python/json/__init__.py | 44 | ||||
-rw-r--r-- | library/python/json/loads.cpp | 246 | ||||
-rw-r--r-- | library/python/json/loads.h | 5 | ||||
-rw-r--r-- | library/python/json/loads.pyx | 14 | ||||
-rw-r--r-- | library/python/par_apply/__init__.py | 114 | ||||
-rw-r--r-- | library/python/retry/__init__.py | 239 | ||||
-rw-r--r-- | library/python/testing/custom_linter_util/linter_params.py | 81 | ||||
-rw-r--r-- | library/python/testing/custom_linter_util/reporter.py | 39 |
15 files changed, 0 insertions, 1316 deletions
diff --git a/library/python/archive/__init__.py b/library/python/archive/__init__.py deleted file mode 100644 index f5814559a6..0000000000 --- a/library/python/archive/__init__.py +++ /dev/null @@ -1,212 +0,0 @@ -import errno -import logging -import os -import random -import shutil -import stat -import string -import sys - -import six - -import libarchive -import libarchive._libarchive as _libarchive - -logger = logging.getLogger(__name__) - -GZIP = 'gzip' -ZSTD = 'zstd' - -ENCODING = 'utf-8' - - -class ConfigureError(Exception): - pass - - -class Level(object): - def __init__(self, level): - self.level = level - - -class Compression(object): - Fast = Level(1) - Default = Level(2) - Best = Level(3) - - -def get_compression_level(filter_name, l): - if l is None or not filter_name: - return None - elif isinstance(l, Level): - l = { - GZIP: { - Compression.Fast: 1, - Compression.Default: 6, - Compression.Best: 9, - }, - ZSTD: { - Compression.Fast: 1, - Compression.Default: 3, - Compression.Best: 22, - } - }[filter_name][l] - return l - - -def encode(value, encoding): - return value.encode(encoding) - - -def extract_tar(tar_file_path, output_dir): - output_dir = encode(output_dir, ENCODING) - _make_dirs(output_dir) - with libarchive.Archive(tar_file_path, mode="rb") as tarfile: - for e in tarfile: - p = e.pathname - dest = os.path.join(output_dir, encode(p, ENCODING)) - if p.endswith("/"): - _make_dirs(dest) - continue - - _make_dirs(os.path.dirname(dest)) - - if e.ishardlink(): - _hardlink(os.path.join(output_dir, e.hardlink), dest) - continue - if e.issym(): - _symlink(e.linkname, dest) - continue - - with open(dest, 'wb') as f: - if hasattr(os, 'fchmod'): - os.fchmod(f.fileno(), e.mode & 0o7777) - libarchive.call_and_check(_libarchive.archive_read_data_into_fd, tarfile._a, tarfile._a, f.fileno()) - - -def tar(paths, output, compression_filter=None, compression_level=None, fixed_mtime=None, onerror=None, postprocess=None, dereference=False): - if isinstance(paths, six.string_types): - paths = [paths] - - if isinstance(output, six.string_types): - temp_tar_path, stream = output + "." + "".join(random.sample(string.ascii_lowercase, 8)), None - else: - temp_tar_path, stream = None, output - - compression_level = get_compression_level(compression_filter, compression_level) - - try: - if compression_filter: - filter_name = compression_filter - if compression_level is not None: - filter_opts = {"compression-level": str(compression_level)} - else: - filter_opts = {} - # force gzip don't store mtime of the original file being compressed (http://www.gzip.org/zlib/rfc-gzip.html#file-format) - if fixed_mtime is not None and compression_filter == GZIP: - filter_opts["timestamp"] = "" - else: - filter_name = filter_opts = None - - with libarchive.Archive(stream or temp_tar_path, mode="wb", format="gnu", filter=filter_name, filter_opts=filter_opts, fixed_mtime=fixed_mtime) as tarfile: - # determine order if fixed_mtime is specified to produce stable archive - paths = paths if fixed_mtime is None else sorted(paths) - - for p in paths: - if type(p) == tuple: - path, arcname = p - else: - path, arcname = p, os.path.basename(p) - - if os.path.isdir(path): - for root, dirs, files in os.walk(path, followlinks=dereference): - if fixed_mtime is None: - entries = dirs + files - else: - entries = sorted(dirs) + sorted(files) - - reldir = os.path.relpath(root, path) - for f in entries: - _writepath(tarfile, os.path.join(root, f), os.path.normpath(os.path.join(arcname, reldir, f)), onerror, postprocess, dereference) - else: - if not os.path.exists(path): - raise OSError("Specified path doesn't exist: {}".format(path)) - _writepath(tarfile, path, arcname, onerror, postprocess, dereference) - - if temp_tar_path: - os.rename(temp_tar_path, output) - except Exception: - if temp_tar_path and os.path.exists(temp_tar_path): - os.remove(temp_tar_path) - raise - - -def _writepath(tarfile, src, dst, onerror, postprocess, dereference): - def tar_writepath(src, dst): - st = os.lstat(src) - if stat.S_ISREG(st.st_mode) or stat.S_ISDIR(st.st_mode) or stat.S_ISLNK(st.st_mode): - if dereference and stat.S_ISLNK(st.st_mode): - src = os.path.realpath(src) - - tarfile.writepath(src, dst) - - if postprocess: - postprocess(src, dst, st.st_mode) - else: - logger.debug("Skipping non-regular file '%s' (stat: %s)", src, st) - - try: - return tar_writepath(src, dst) - except Exception as e: - if isinstance(e, OSError) and e.errno == errno.ENOENT: - logger.debug("Skipping missing file '%s' - looks like directory content has changed during archiving", src) - return - - if onerror: - if onerror(src, dst, sys.exc_info()): - return tar_writepath(src, dst) - else: - raise - - -def check_tar(tar_file_path): - if os.path.isfile(tar_file_path) or os.path.islink(tar_file_path): - return libarchive.is_archive(tar_file_path) - return False - - -def _make_dirs(path): - try: - os.makedirs(path) - except OSError as e: - if e.errno != errno.EEXIST or not os.path.isdir(path): - raise - - -def _hardlink(src, dst): - if hasattr(os, 'link'): - os.link(src, dst) - else: - shutil.copyfile(src, dst) - - -def _symlink(src, dst): - if hasattr(os, 'symlink'): - os.symlink(src, dst) - else: - # Windows specific case - we cannot copy file right now, - # because it doesn't exist yet (and would be met later in the archive) or symlink is broken. - # Act like tar and tarfile - skip such symlinks - if os.path.exists(src): - shutil.copytree(src, dst) - - -def get_archive_filter_name(filename): - filters = libarchive.get_archive_filter_names(filename) - # https://a.yandex-team.ru/arc/trunk/arcadia/contrib/libs/libarchive/libarchive/archive_read.c?rev=5800047#L522 - assert filters[-1] == 'none', filters - if len(filters) == 1: - return None - if len(filters) == 2: - return filters[0] - raise Exception("Archive has chain of filter: {}".format(filters)) diff --git a/library/python/cityhash/cityhash.pyx b/library/python/cityhash/cityhash.pyx deleted file mode 100644 index c1868300cc..0000000000 --- a/library/python/cityhash/cityhash.pyx +++ /dev/null @@ -1,75 +0,0 @@ -from libcpp.pair cimport pair - -cdef extern from "util/system/types.h": - ctypedef unsigned long ui64 - - -cdef extern from "util/digest/city.h": - ui64 CityHash64(const char* buf, size_t len) nogil - pair[ui64, ui64] CityHash128(const char* buf, size_t len) nogil - ui64 CityHash64WithSeed(const char* buf, size_t len, ui64 seed) nogil - - -cdef extern from "library/python/cityhash/hash.h": - ui64 FileCityHash128WithSeedHigh64(const char* fpath) nogil - ui64 FileCityHash64(const char* fpath) nogil - - -def hash64(content): - cdef const char* s = content - cdef size_t size = len(content) - cdef ui64 res = 0 - - if size > 128: - with nogil: - res = CityHash64(s, size) - else: - res = CityHash64(s, size) - - return res - -def hash128(content): - cdef const char* s = content - cdef size_t size = len(content) - cdef pair[ui64, ui64] res = pair[ui64, ui64](0, 0) - - if size > 128: - with nogil: - res = CityHash128(s, size) - else: - res = CityHash128(s, size) - return res - - -def hash64seed(content, seed): - cdef const char* s = content - cdef size_t size = len(content) - cdef ui64 _seed = seed; - - if size > 128: - with nogil: - res = CityHash64WithSeed(s, size, _seed) - else: - res = CityHash64WithSeed(s, size, _seed) - - return res - - -def filehash64(path): - cdef const char* p = path - cdef ui64 res = 0 - - with nogil: - res = FileCityHash64(p) - - return res - - -def filehash128high64(path): - cdef const char* p = path - cdef ui64 res = 0 - - with nogil: - res = FileCityHash128WithSeedHigh64(p) - - return res diff --git a/library/python/cityhash/hash.cpp b/library/python/cityhash/hash.cpp deleted file mode 100644 index 17bd3a75f3..0000000000 --- a/library/python/cityhash/hash.cpp +++ /dev/null @@ -1,32 +0,0 @@ -#include "hash.h" - -#include <util/digest/city.h> -#include <util/generic/string.h> -#include <util/memory/blob.h> -#include <util/system/file.h> -#include <util/system/fstat.h> - -void ReadFile(const char* fpath, TBlob& blob) { - TFile f(TString{fpath}, RdOnly | Seq); - const TFileStat fs(f); - auto size = fs.Size; - - if (size < (64 << 10)) { - blob = TBlob::FromFileContent(f, 0, size); - } else { - blob = TBlob::FromFile(f); - } -} - -ui64 FileCityHash128WithSeedHigh64(const char* fpath) { - TBlob blob; - ReadFile(fpath, blob); - const uint128 hash = CityHash128WithSeed((const char*)blob.Data(), blob.Size(), uint128(0, blob.Size())); - return Uint128High64(hash); -} - -ui64 FileCityHash64(const char* fpath) { - TBlob blob; - ReadFile(fpath, blob); - return CityHash64(static_cast<const char*>(blob.Data()), blob.Size()); -} diff --git a/library/python/cityhash/hash.h b/library/python/cityhash/hash.h deleted file mode 100644 index 64b22ba74b..0000000000 --- a/library/python/cityhash/hash.h +++ /dev/null @@ -1,6 +0,0 @@ -#pragma once - -#include <util/system/defaults.h> - -ui64 FileCityHash128WithSeedHigh64(const char* fpath); -ui64 FileCityHash64(const char* fpath); diff --git a/library/python/codecs/__codecs.pyx b/library/python/codecs/__codecs.pyx deleted file mode 100644 index 42ec37fe88..0000000000 --- a/library/python/codecs/__codecs.pyx +++ /dev/null @@ -1,61 +0,0 @@ -import six - -from libcpp cimport bool - -from util.generic.string cimport TString, TStringBuf - - -def to_bytes(s): - try: - return s.encode('utf-8') - except AttributeError: - pass - - return s - - -def from_bytes(s): - if six.PY3: - return s.decode('utf-8') - - return s - - -cdef extern from "library/cpp/blockcodecs/codecs.h" namespace "NBlockCodecs": - cdef cppclass ICodec: - void Encode(TStringBuf data, TString& res) nogil - void Decode(TStringBuf data, TString& res) nogil - - cdef const ICodec* Codec(const TStringBuf& name) except + - cdef TString ListAllCodecsAsString() except + - - -def dumps(name, data): - name = to_bytes(name) - - cdef const ICodec* codec = Codec(TStringBuf(name, len(name))) - cdef TString res - cdef TStringBuf cdata = TStringBuf(data, len(data)) - - with nogil: - codec.Encode(cdata, res) - - return res.c_str()[:res.length()] - - -def loads(name, data): - name = to_bytes(name) - - cdef const ICodec* codec = Codec(TStringBuf(name, len(name))) - cdef TString res - cdef TStringBuf cdata = TStringBuf(data, len(data)) - - with nogil: - codec.Decode(cdata, res) - - return res.c_str()[:res.length()] - -def list_all_codecs(): - cdef TString res = ListAllCodecsAsString() - - return from_bytes(res.c_str()[:res.length()]).split(',') diff --git a/library/python/codecs/__init__.py b/library/python/codecs/__init__.py deleted file mode 100644 index b9fb00deb0..0000000000 --- a/library/python/codecs/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from __codecs import loads, dumps, list_all_codecs # noqa diff --git a/library/python/compress/__init__.py b/library/python/compress/__init__.py deleted file mode 100644 index 380ec47dca..0000000000 --- a/library/python/compress/__init__.py +++ /dev/null @@ -1,147 +0,0 @@ -from io import open - -import struct -import json -import os -import logging - -import library.python.par_apply as lpp -import library.python.codecs as lpc - - -logger = logging.getLogger('compress') - - -def list_all_codecs(): - return sorted(frozenset(lpc.list_all_codecs())) - - -def find_codec(ext): - def ext_compress(x): - return lpc.dumps(ext, x) - - def ext_decompress(x): - return lpc.loads(ext, x) - - ext_decompress(ext_compress(b'')) - - return {'c': ext_compress, 'd': ext_decompress, 'n': ext} - - -def codec_for(path): - for ext in reversed(path.split('.')): - try: - return find_codec(ext) - except Exception as e: - logger.debug('in codec_for(): %s', e) - - raise Exception('unsupported file %s' % path) - - -def compress(fr, to, codec=None, fopen=open, threads=1): - if codec: - codec = find_codec(codec) - else: - codec = codec_for(to) - - func = codec['c'] - - def iter_blocks(): - with fopen(fr, 'rb') as f: - while True: - chunk = f.read(16 * 1024 * 1024) - - if chunk: - yield chunk - else: - yield b'' - - return - - def iter_results(): - info = { - 'codec': codec['n'], - } - - if fr: - info['size'] = os.path.getsize(fr) - - yield json.dumps(info, sort_keys=True) + '\n' - - for c in lpp.par_apply(iter_blocks(), func, threads): - yield c - - with fopen(to, 'wb') as f: - for c in iter_results(): - logger.debug('complete %s', len(c)) - f.write(struct.pack('<I', len(c))) - - try: - f.write(c) - except TypeError: - f.write(c.encode('utf-8')) - - -def decompress(fr, to, codec=None, fopen=open, threads=1): - def iter_chunks(): - with fopen(fr, 'rb') as f: - cnt = 0 - - while True: - ll = f.read(4) - - if ll: - ll = struct.unpack('<I', ll)[0] - - if ll: - if ll > 100000000: - raise Exception('broken stream') - - yield f.read(ll) - - cnt += ll - else: - if not cnt: - raise Exception('empty stream') - - return - - it = iter_chunks() - extra = [] - - for chunk in it: - hdr = {} - - try: - hdr = json.loads(chunk) - except Exception as e: - logger.info('can not parse header, suspect old format: %s', e) - extra.append(chunk) - - break - - def resolve_codec(): - if 'codec' in hdr: - return find_codec(hdr['codec']) - - if codec: - return find_codec(codec) - - return codec_for(fr) - - dc = resolve_codec()['d'] - - def iter_all_chunks(): - for x in extra: - yield x - - for x in it: - yield x - - with fopen(to, 'wb') as f: - for c in lpp.par_apply(iter_all_chunks(), dc, threads): - if c: - logger.debug('complete %s', len(c)) - f.write(c) - else: - break diff --git a/library/python/json/__init__.py b/library/python/json/__init__.py deleted file mode 100644 index c6420d5e6d..0000000000 --- a/library/python/json/__init__.py +++ /dev/null @@ -1,44 +0,0 @@ -from library.python.json.loads import loads as _loads -from simplejson import loads as _sj_loads - - -def loads(*args, **kwargs): - try: - return _loads(*args, **kwargs) - except Exception as e: - if 'invalid syntax at token' in str(e): - kwargs.pop('intern_keys', None) - kwargs.pop('intern_vals', None) - kwargs.pop('may_unicode', None) - return _sj_loads(*args, **kwargs) - - raise - - -from simplejson import load, dump, dumps # noqa - - -def read_file(file_name, **kwargs): - """ - Read file and return its parsed json contents. - - All kwargs will be proxied to `json.load` method as is. - - :param file_name: file with json contents - :return: parsed json contents - """ - with open(file_name) as f: - return load(f, **kwargs) - - -def write_file(file_name, contents, **kwargs): - """ - Dump json data to file. - - All kwargs will be proxied to `json.dump` method as is. - - :param file_name: file to dump to - :param contents: JSON-serializable object - """ - with open(file_name, "w") as f: - dump(contents, f, **kwargs) diff --git a/library/python/json/loads.cpp b/library/python/json/loads.cpp deleted file mode 100644 index 19cdb096ae..0000000000 --- a/library/python/json/loads.cpp +++ /dev/null @@ -1,246 +0,0 @@ -#include "loads.h" - -#include <Python.h> - -#include <library/cpp/json/fast_sax/parser.h> - -#include <util/generic/algorithm.h> -#include <util/generic/stack.h> -#include <util/generic/vector.h> -#include <util/generic/ylimits.h> -#include <util/string/ascii.h> - -using namespace NJson; - -namespace { - enum EKind { - Undefined, - Array, - Dict, - Value, - Key, - }; - - static inline TStringBuf ToStr(EKind kind) noexcept { - switch (kind) { - case Undefined: - return TStringBuf("Undefined"); - - case Array: - return TStringBuf("Array"); - - case Dict: - return TStringBuf("Dict"); - - case Value: - return TStringBuf("Value"); - - case Key: - return TStringBuf("Key"); - } - - Y_UNREACHABLE(); - } - - struct TUnref { - static inline void Destroy(PyObject* o) noexcept { - Py_XDECREF(o); - } - }; - - using TObjectPtr = TAutoPtr<PyObject, TUnref>; - - static inline TObjectPtr BuildBool(bool val) noexcept { - if (val) { - Py_RETURN_TRUE; - } - - Py_RETURN_FALSE; - } - - // Translate python exceptions from object-creating functions into c++ exceptions - // Such errors are reported by returning nullptr - // When a python error is set and C++ exception is caught by Cython wrapper, - // Python exception is propagated, while C++ exception is discarded. - PyObject* CheckNewObject(PyObject* obj) { - Y_ENSURE(obj != nullptr, "got python exception"); - return obj; - } - - void CheckRetcode(int retcode) { - Y_ENSURE(retcode == 0, "got python exception"); - } - - static inline TObjectPtr BuildSmall(long val) { -#if PY_VERSION_HEX >= 0x03000000 - return CheckNewObject(PyLong_FromLong(val)); -#else - return CheckNewObject(PyInt_FromLong(val)); -#endif - } - - PyObject* CreatePyString(TStringBuf str, bool intern, bool mayUnicode) { -#if PY_VERSION_HEX >= 0x03000000 - Y_UNUSED(mayUnicode); - PyObject* pyStr = PyUnicode_FromStringAndSize(str.data(), str.size()); - if (intern) { - PyUnicode_InternInPlace(&pyStr); - } -#else - const bool needUnicode = mayUnicode && !AllOf(str, IsAscii); - PyObject* pyStr = needUnicode ? PyUnicode_FromStringAndSize(str.data(), str.size()) - : PyString_FromStringAndSize(str.data(), str.size()); - if (intern && !needUnicode) { - PyString_InternInPlace(&pyStr); - } -#endif - return pyStr; - } - - struct TVal { - EKind Kind = Undefined; - TObjectPtr Val; - - inline TVal() noexcept - : Kind(Undefined) - { - } - - inline TVal(EKind kind, TObjectPtr val) noexcept - : Kind(kind) - , Val(val) - { - } - }; - - static inline TObjectPtr NoneRef() noexcept { - Py_RETURN_NONE; - } - - struct TContext: public TJsonCallbacks { - const bool InternKeys; - const bool InternVals; - const bool MayUnicode; - TStack<TVal, TVector<TVal>> S; - - inline TContext(bool internKeys, bool internVals, bool mayUnicode) - : TJsonCallbacks(true) - , InternKeys(internKeys) - , InternVals(internVals) - , MayUnicode(mayUnicode) - { - S.emplace(); - } - - inline bool Consume(TObjectPtr o) { - auto& t = S.top(); - - if (t.Kind == Array) { - CheckRetcode(PyList_Append(t.Val.Get(), o.Get())); - } else if (t.Kind == Key) { - auto key = S.top().Val; - - S.pop(); - - CheckRetcode(PyDict_SetItem(S.top().Val.Get(), key.Get(), o.Get())); - } else { - t = TVal(Value, o); - } - - return true; - } - - inline TObjectPtr Pop(EKind expect) { - auto res = S.top(); - - S.pop(); - - if (res.Kind != expect) { - ythrow yexception() << "unexpected kind(expect " << ToStr(expect) << ", got " << ToStr(res.Kind) << ")"; - } - - return res.Val; - } - - inline void Push(EKind kind, TObjectPtr object) { - S.push(TVal(kind, object)); - } - - virtual bool OnNull() { - return Consume(NoneRef()); - } - - virtual bool OnBoolean(bool v) { - return Consume(BuildBool(v)); - } - - virtual bool OnInteger(long long v) { - if (v >= (long long)Min<long>()) { - return Consume(BuildSmall((long)v)); - } - - return Consume(CheckNewObject(PyLong_FromLongLong(v))); - } - - virtual bool OnUInteger(unsigned long long v) { - if (v <= (unsigned long long)Max<long>()) { - return Consume(BuildSmall((long)v)); - } - - return Consume(CheckNewObject(PyLong_FromUnsignedLongLong(v))); - } - - virtual bool OnDouble(double v) { - return Consume(CheckNewObject(PyFloat_FromDouble(v))); - } - - virtual bool OnString(const TStringBuf& v) { - return Consume(CheckNewObject(CreatePyString(v, InternVals, MayUnicode))); - } - - virtual bool OnOpenMap() { - Push(Dict, CheckNewObject(PyDict_New())); - - return true; - } - - virtual bool OnCloseMap() { - return Consume(Pop(Dict)); - } - - virtual bool OnMapKey(const TStringBuf& k) { - Push(Key, CheckNewObject(CreatePyString(k, InternKeys, MayUnicode))); - return true; - } - - virtual bool OnOpenArray() { - Push(Array, CheckNewObject(PyList_New(0))); - - return true; - } - - virtual bool OnCloseArray() { - return Consume(Pop(Array)); - } - }; -} - -PyObject* LoadJsonFromString(const char* data, size_t len, bool internKeys, bool internVals, bool mayUnicode) { - TContext ctx(internKeys, internVals, mayUnicode); - - if (!len) { - ythrow yexception() << "parse error: zero length input string"; - } - - if (!NJson::ReadJsonFast(TStringBuf(data, len), &ctx)) { - ythrow yexception() << "parse error"; - } - - auto& s = ctx.S; - - if (!s || s.top().Kind != Value) { - ythrow yexception() << "shit happen"; - } - - return s.top().Val.Release(); -} diff --git a/library/python/json/loads.h b/library/python/json/loads.h deleted file mode 100644 index 62dcdf6f21..0000000000 --- a/library/python/json/loads.h +++ /dev/null @@ -1,5 +0,0 @@ -#pragma once - -#include <Python.h> - -PyObject* LoadJsonFromString(const char* data, size_t len, bool internKeys = false, bool internVals = false, bool mayUnicode = false); diff --git a/library/python/json/loads.pyx b/library/python/json/loads.pyx deleted file mode 100644 index 82e5c6dce7..0000000000 --- a/library/python/json/loads.pyx +++ /dev/null @@ -1,14 +0,0 @@ -from libcpp cimport bool - -cdef extern from "library/python/json/loads.h": - object LoadJsonFromString(const char*, size_t, bool internKeys, bool internVals, bool mayUnicode) except + - - -def loads(s, intern_keys = False, intern_vals = False, may_unicode = False): - if isinstance(s, unicode): - s = s.encode('utf-8') - - try: - return LoadJsonFromString(s, len(s), intern_keys, intern_vals, may_unicode) - except Exception as e: - raise ValueError(str(e)) diff --git a/library/python/par_apply/__init__.py b/library/python/par_apply/__init__.py deleted file mode 100644 index 19b89ae843..0000000000 --- a/library/python/par_apply/__init__.py +++ /dev/null @@ -1,114 +0,0 @@ -import sys -import threading -import six - -from six.moves import queue - - -def par_apply(seq, func, thr_num, join_polling=None): - if thr_num < 2: - for x in seq: - yield func(x) - - return - - in_q = queue.Queue() - out_q = queue.Queue() - - def enumerate_blocks(): - n = 0 - - for b in seq: - yield n, [b] - n += 1 - - yield n, None - - def iter_out(): - n = 0 - d = {} - - while True: - if n in d: - r = d[n] - del d[n] - n += 1 - - yield r - else: - res = out_q.get() - - d[res[0]] = res - - out_iter = iter_out() - - def wait_block(): - for x in out_iter: - return x - - def iter_compressed(): - p = 0 - - for n, b in enumerate_blocks(): - in_q.put((n, b)) - - while n > p + (thr_num * 2): - p, b, c = wait_block() - - if not b: - return - - yield p, c - - while True: - p, b, c = wait_block() - - if not b: - return - - yield p, c - - def proc(): - while True: - data = in_q.get() - - if data is None: - return - - n, b = data - - if b: - try: - res = (func(b[0]), None) - except Exception: - res = (None, sys.exc_info()) - else: - res = (None, None) - - out_q.put((n, b, res)) - - thrs = [threading.Thread(target=proc) for i in range(0, thr_num)] - - for t in thrs: - t.start() - - try: - for p, c in iter_compressed(): - res, err = c - - if err: - six.reraise(*err) - - yield res - finally: - for t in thrs: - in_q.put(None) - - for t in thrs: - if join_polling is not None: - while True: - t.join(join_polling) - if not t.is_alive(): - break - else: - t.join() diff --git a/library/python/retry/__init__.py b/library/python/retry/__init__.py deleted file mode 100644 index 139c7dcbde..0000000000 --- a/library/python/retry/__init__.py +++ /dev/null @@ -1,239 +0,0 @@ -import copy -import datetime -import functools -import itertools -import logging -import random -import time - - -""" -Retry library provides an ability to retry function calls in a configurable way. - -To retry a certain function call use `retry_call` function. To make function auto-retriable use `retry` -or `retry_intrusive` decorators. Both `retry_call` and `retry` optionally accept retry configuration object -or its fields as kwargs. The `retry_intrusive` is designed for methods and uses intrusive configuration object. - ->>> retry_call(foo) ->>> retry_call(foo, foo_args, foo_kwargs) ->>> retry_call(foo, foo_args, foo_kwargs, conf=conf) - ->>> @retry() ->>> def foo(...): ->>> ... - ->>> @retry(conf) ->>> def foo(...): ->>> ... - ->>> class Obj(object): ->>> def __init__(self): ->>> self.retry_conf = conf ->>> ->>> @retry_intrusive ->>> def foo(self, ...): ->>> ... - -This library differs from its alternatives: - * `retry` contrib library lacks time-based limits, reusable configuration objects and is generally less flexible - * `retrying` contrib library is somewhat more complex, but also lacks reusable configuration objects -""" - - -DEFAULT_SLEEP_FUNC = time.sleep -LOGGER = logging.getLogger(__name__) - - -class RetryConf(object): - """ - Configuration object defines retry behaviour and is composed of these fields: - * `retriable` - function that decides if an exception should trigger retrying - * `get_delay` - function that returns a number of seconds retrier must wait before doing the next attempt - * `max_time` - maximum `datetime.timedelta` that can pass after the first call for any retry attempt to be done - * `max_times` - maximum number of retry attempts (note retries, not tries/calls) - * `handle_error` - function that is called for each failed call attempt - * `logger` - logger object to record retry warnings with - * `sleep` - custom sleep function to use for waiting - - >>> RetryConf(max_time=datetime.timedelta(seconds=30), max_times=10) - - Empty configuration retries indefinitely on any exceptions raised. - - By default `DEFAULT_CONF` if used, which retries indefinitely, waiting 1 sec with 1.2 backoff between attempts, and - also logging with built-in logger object. - - Configuration must be cloned before modification to create separate configuration: - - >>> DEFAULT_CONF.clone() - - There are various methods that provide convenient clone-and-modify shortcuts and "retry recipes". - """ - - _PROPS = { - 'retriable': lambda e: True, - 'get_delay': lambda n, raised_after, last: 0, - 'max_time': None, - 'max_times': None, - 'handle_error': None, - 'logger': None, - 'sleep': DEFAULT_SLEEP_FUNC, - } - - def __init__(self, **kwargs): - for prop, default_value in self._PROPS.items(): - setattr(self, prop, default_value) - self._set(**kwargs) - - def __repr__(self): - return repr(self.__dict__) - - def clone(self, **kwargs): - """ - Clone configuration. - """ - - obj = copy.copy(self) - obj._set(**kwargs) - return obj - - def on(self, *errors): - """ - Clone and retry on specific exception types (retriable shortcut): - - >>> conf = conf.on(MyException, MyOtherException) - """ - - obj = self.clone() - obj.retriable = lambda e: isinstance(e, errors) - return obj - - def waiting(self, delay=0, backoff=1.0, jitter=0, limit=None): - """ - Clone and wait between attempts with backoff, jitter and limit (get_delay shortcut): - - >>> conf = conf.waiting(delay) - >>> conf = conf.waiting(delay, backoff=2.0) # initial delay with backoff x2 on each attempt - >>> conf = conf.waiting(delay, jitter=3) # jitter from 0 to 3 seconds - >>> conf = conf.waiting(delay, backoff=2.0, limit=60) # delay with backoff, but not greater than a minute - - All these options can be combined together, of course. - """ - - def get_delay(n, raised_after, last): - if n == 1: - return delay - - s = last * backoff - s += random.uniform(0, jitter) - if limit is not None: - s = min(s, limit) - return s - - obj = self.clone() - obj.get_delay = get_delay - return obj - - def upto(self, seconds=0, **other_timedelta_kwargs): - """ - Clone and do retry attempts only for some time (max_time shortcut): - - >>> conf = conf.upto(30) # retrying for 30 seconds - >>> conf = conf.upto(hours=1, minutes=20) # retrying for 1:20 - - Any `datetime.timedelta` kwargs can be used here. - """ - - obj = self.clone() - obj.max_time = datetime.timedelta(seconds=seconds, **other_timedelta_kwargs) - return obj - - def upto_retries(self, retries=0): - """ - Set limit for retry attempts number (max_times shortcut): - - >>> conf = conf.upto_retries(10) - """ - - obj = self.clone() - obj.max_times = retries - return obj - - def _set(self, **kwargs): - for prop, value in kwargs.items(): - if prop not in self._PROPS: - continue - setattr(self, prop, value) - - -DEFAULT_CONF = RetryConf(logger=LOGGER).waiting(1, backoff=1.2) - - -def retry_call(f, f_args=(), f_kwargs={}, conf=DEFAULT_CONF, **kwargs): - """ - Retry function call. - - :param f: function to be retried - :param f_args: target function args - :param f_kwargs: target function kwargs - :param conf: configuration - """ - - if kwargs: - conf = conf.clone(**kwargs) - return _retry(conf, functools.partial(f, *f_args, **f_kwargs)) - - -def retry(conf=DEFAULT_CONF, **kwargs): - """ - Retrying decorator. - - :param conf: configuration - """ - - if kwargs: - conf = conf.clone(**kwargs) - - def decorator(f): - @functools.wraps(f) - def wrapped(*f_args, **f_kwargs): - return _retry(conf, functools.partial(f, *f_args, **f_kwargs)) - return wrapped - return decorator - - -def retry_intrusive(f): - """ - Retrying method decorator that uses an intrusive conf (obj.retry_conf). - """ - - @functools.wraps(f) - def wrapped(obj, *f_args, **f_kwargs): - assert hasattr(obj, 'retry_conf'), 'Object must have retry_conf attribute for decorator to run' - return _retry(obj.retry_conf, functools.partial(f, obj, *f_args, **f_kwargs)) - return wrapped - - -def _retry(conf, f): - start = datetime.datetime.now() - delay = 0 - for n in itertools.count(1): - try: - return f() - except Exception as error: - raised_after = datetime.datetime.now() - start - if conf.handle_error: - conf.handle_error(error, n, raised_after) - delay = conf.get_delay(n, raised_after, delay) - retry_after = raised_after + datetime.timedelta(seconds=delay) - retrying = conf.retriable(error) \ - and (conf.max_times is None or n <= conf.max_times) \ - and (conf.max_time is None or retry_after <= conf.max_time) - if not retrying: - raise - if delay: - conf.sleep(delay) - if conf.logger: - conf.logger.warning('Retrying (try %d) after %s (%s + %s sec) on %s: %s', - n, retry_after, raised_after, delay, - error.__class__.__name__, error, - exc_info=True) diff --git a/library/python/testing/custom_linter_util/linter_params.py b/library/python/testing/custom_linter_util/linter_params.py deleted file mode 100644 index 522e6da9ed..0000000000 --- a/library/python/testing/custom_linter_util/linter_params.py +++ /dev/null @@ -1,81 +0,0 @@ -import argparse -import json -from dataclasses import dataclass -from typing import Optional - - -@dataclass -class LinterArgs: - source_root: str - project_path: str - output_path: str - lint_name: str - depends: dict[str, str] - global_resources: dict[str, str] - configs: list[str] - extra_params: dict[str, str] - report_file: str - files: list[str] - - -def get_params(raw_args: Optional[list[str]] = None) -> LinterArgs: - parser = argparse.ArgumentParser() - parser.add_argument("--params") - parser.add_argument("--source-root") - parser.add_argument("--project-path") - parser.add_argument("--output-path") - parser.add_argument("--lint-name", default="") - parser.add_argument("--depends", action="append") - parser.add_argument("--global-resource", action="append", dest="global_resources") - parser.add_argument("--config", action="append", dest="configs") - parser.add_argument("--extra-param", action="append", dest="extra_params") - parser.add_argument("--report-file", default="-") - parser.add_argument("files", nargs="*") - args = parser.parse_args(raw_args) - - if args.params: - with open(args.params) as f: - params = json.load(f) - source_root = params["source_root"] - project_path = params["project_path"] - output_path = params["output_path"] - lint_name = params.get("lint_name", "") - depends = params.get("depends", {}) - global_resources = params.get("global_resources", {}) - configs = params.get("configs", []) - extra_params = params.get("extra_params", {}) - report_file = params["report_file"] - files = params["files"] - else: - source_root = args.source_root - project_path = args.project_path - output_path = args.output_path - lint_name = args.lint_name - depends = _parse_kv_arg(args.depends, ":") - global_resources = _parse_kv_arg(args.global_resources, ":") - configs = args.configs if args.configs else [] - extra_params = _parse_kv_arg(args.extra_params, "=") - report_file = args.report_file - files = args.files - - return LinterArgs( - source_root=source_root, - project_path=project_path, - output_path=output_path, - lint_name=lint_name, - depends=depends, - global_resources=global_resources, - configs=configs, - extra_params=extra_params, - report_file=report_file, - files=files, - ) - - -def _parse_kv_arg(arg, sep): - result = {} - if arg: - for item in arg: - var, val = item.split(sep, 1) - result[var] = val - return result diff --git a/library/python/testing/custom_linter_util/reporter.py b/library/python/testing/custom_linter_util/reporter.py deleted file mode 100644 index 8ae559bca0..0000000000 --- a/library/python/testing/custom_linter_util/reporter.py +++ /dev/null @@ -1,39 +0,0 @@ -import json -import sys -from enum import Enum -from typing import Optional - - -class LintStatus(Enum): - GOOD = "GOOD" - FAIL = "FAIL" - SKIPPED = "SKIPPED" - - -class LintReport(): - def __init__(self): - self._report = {} - - def add(self, file_name: str, status: LintStatus, message: str = "", elapsed: float = 0.0): - self._report[file_name] = { - "status": status.value, - "message": message, - "elapsed": elapsed, - } - - def dump(self, report_file, pretty: Optional[bool] = None): - data = { - "report": self._report, - } - if report_file == "-": - if pretty is None: - pretty = True - self._do_dump(sys.stdout, data, pretty) - else: - with open(report_file, "w") as f: - self._do_dump(f, data, pretty) - - @staticmethod - def _do_dump(dest, data, pretty): - indent = 4 if pretty else None - json.dump(data, dest, indent=indent) |