| author | thegeorg <thegeorg@yandex-team.com> | 2023-10-03 11:19:48 +0300 |
|---|---|---|
| committer | thegeorg <thegeorg@yandex-team.com> | 2023-10-03 11:43:28 +0300 |
| commit | cda0c13f23f6b169fb0a49dc504b40a0aaecea09 (patch) | |
| tree | 26476e92e5af2c856e017afb1df8f8dff42495bf /library/python | |
| parent | 4854116da9c5e3c95bb8440f2ea997c54b6e1a61 (diff) | |
| download | ydb-cda0c13f23f6b169fb0a49dc504b40a0aaecea09.tar.gz | |
Move contrib/tools/jdk to build/platform/java/jdk/testing
Diffstat (limited to 'library/python')
24 files changed, 1406 insertions(+), 1 deletion(-)
diff --git a/library/python/archive/__init__.py b/library/python/archive/__init__.py
new file mode 100644
index 0000000000..a6e032ff4c
--- /dev/null
+++ b/library/python/archive/__init__.py
@@ -0,0 +1,266 @@
+import errno
+import logging
+import os
+import random
+import shutil
+import stat
+import string
+import sys
+
+import six
+
+import libarchive
+import libarchive._libarchive as _libarchive
+
+from pathlib2 import PurePath
+
+logger = logging.getLogger(__name__)
+
+GZIP = "gzip"
+ZSTD = "zstd"
+
+ENCODING = "utf-8"
+
+
+class ConfigureError(Exception):
+    pass
+
+
+class Level(object):
+    def __init__(self, level):
+        self.level = level
+
+
+class Compression(object):
+    Fast = Level(1)
+    Default = Level(2)
+    Best = Level(3)
+
+
+def get_compression_level(filter_name, level):
+    if level is None or not filter_name:
+        return None
+    elif isinstance(level, Level):
+        level = {
+            GZIP: {
+                Compression.Fast: 1,
+                Compression.Default: 6,
+                Compression.Best: 9,
+            },
+            ZSTD: {
+                Compression.Fast: 1,
+                Compression.Default: 3,
+                Compression.Best: 22,
+            },
+        }[filter_name][level]
+    return level
+
+
+def encode(value, encoding):
+    return value.encode(encoding)
+
+
+def extract_tar(tar_file_path, output_dir, strip_components=None, fail_on_duplicates=True):
+    output_dir = encode(output_dir, ENCODING)
+    _make_dirs(output_dir)
+    with libarchive.Archive(tar_file_path, mode="rb") as tarfile:
+        for e in tarfile:
+            p = _strip_prefix(e.pathname, strip_components)
+            if not p:
+                continue
+            dest = os.path.join(output_dir, encode(p, ENCODING))
+            if e.pathname.endswith("/"):
+                _make_dirs(dest)
+                continue
+
+            if strip_components and fail_on_duplicates:
+                if os.path.exists(dest):
+                    raise Exception(
+                        "The file {} is duplicated because of strip_components={}".format(dest, strip_components)
+                    )
+
+            _make_dirs(os.path.dirname(dest))
+
+            if e.ishardlink():
+                src = os.path.join(output_dir, _strip_prefix(e.hardlink, strip_components))
+                _hardlink(src, dest)
+                continue
+            if e.issym():
+                src = _strip_prefix(e.linkname, strip_components)
+                _symlink(src, dest)
+                continue
+
+            with open(dest, "wb") as f:
+                if hasattr(os, "fchmod"):
+                    os.fchmod(f.fileno(), e.mode & 0o7777)
+                libarchive.call_and_check(
+                    _libarchive.archive_read_data_into_fd,
+                    tarfile._a,
+                    tarfile._a,
+                    f.fileno(),
+                )
+
+
+def _strip_prefix(path, strip_components):
+    if not strip_components:
+        return path
+    p = PurePath(path)
+    stripped = str(p.relative_to(*p.parts[:strip_components]))
+    return '' if stripped == '.' else stripped
+
+
+def tar(
+    paths,
+    output,
+    compression_filter=None,
+    compression_level=None,
+    fixed_mtime=None,
+    onerror=None,
+    postprocess=None,
+    dereference=False,
+):
+    if isinstance(paths, six.string_types):
+        paths = [paths]
+
+    if isinstance(output, six.string_types):
+        temp_tar_path, stream = (
+            output + "." + "".join(random.sample(string.ascii_lowercase, 8)),
+            None,
+        )
+    else:
+        temp_tar_path, stream = None, output
+
+    compression_level = get_compression_level(compression_filter, compression_level)
+
+    try:
+        if compression_filter:
+            filter_name = compression_filter
+            if compression_level is not None:
+                filter_opts = {"compression-level": str(compression_level)}
+            else:
+                filter_opts = {}
+            # Force gzip not to store the mtime of the original file being compressed
+            # (http://www.gzip.org/zlib/rfc-gzip.html#file-format)
+            if fixed_mtime is not None and compression_filter == GZIP:
+                filter_opts["timestamp"] = ""
+        else:
+            filter_name = filter_opts = None
+
+        with libarchive.Archive(
+            stream or temp_tar_path,
+            mode="wb",
+            format="gnu",
+            filter=filter_name,
+            filter_opts=filter_opts,
+            fixed_mtime=fixed_mtime,
+        ) as tarfile:
+            # Fix the traversal order when fixed_mtime is specified to produce a stable archive
+            paths = paths if fixed_mtime is None else sorted(paths)
+
+            for p in paths:
+                if type(p) == tuple:
+                    path, arcname = p
+                else:
+                    path, arcname = p, os.path.basename(p)
+
+                if os.path.isdir(path):
+                    for root, dirs, files in os.walk(path, followlinks=dereference):
+                        if fixed_mtime is None:
+                            entries = dirs + files
+                        else:
+                            entries = sorted(dirs) + sorted(files)
+
+                        reldir = os.path.relpath(root, path)
+                        for f in entries:
+                            _writepath(
+                                tarfile,
+                                os.path.join(root, f),
+                                os.path.normpath(os.path.join(arcname, reldir, f)),
+                                onerror,
+                                postprocess,
+                                dereference,
+                            )
+                else:
+                    if not os.path.exists(path):
+                        raise OSError("Specified path doesn't exist: {}".format(path))
+                    _writepath(tarfile, path, arcname, onerror, postprocess, dereference)
+
+        if temp_tar_path:
+            os.rename(temp_tar_path, output)
+    except Exception:
+        if temp_tar_path and os.path.exists(temp_tar_path):
+            os.remove(temp_tar_path)
+        raise
+
+
+def _writepath(tarfile, src, dst, onerror, postprocess, dereference):
+    def tar_writepath(src, dst):
+        st = os.lstat(src)
+        if stat.S_ISREG(st.st_mode) or stat.S_ISDIR(st.st_mode) or stat.S_ISLNK(st.st_mode):
+            if dereference and stat.S_ISLNK(st.st_mode):
+                src = os.path.realpath(src)
+
+            tarfile.writepath(src, dst)
+
+            if postprocess:
+                postprocess(src, dst, st.st_mode)
+        else:
+            logger.debug("Skipping non-regular file '%s' (stat: %s)", src, st)
+
+    try:
+        return tar_writepath(src, dst)
+    except Exception as e:
+        if isinstance(e, OSError) and e.errno == errno.ENOENT:
+            logger.debug(
+                "Skipping missing file '%s' - looks like directory content has changed during archiving",
+                src,
+            )
+            return
+
+        if onerror:
+            if onerror(src, dst, sys.exc_info()):
+                return tar_writepath(src, dst)
+        else:
+            raise
+
+
+def check_tar(tar_file_path):
+    if os.path.isfile(tar_file_path) or os.path.islink(tar_file_path):
+        return libarchive.is_archive(tar_file_path)
+    return False
+
+
+def _make_dirs(path):
+    try:
+        os.makedirs(path)
+    except OSError as e:
+        if e.errno != errno.EEXIST or not os.path.isdir(path):
+            raise
+
+
+def _hardlink(src, dst):
+    if hasattr(os, "link"):
+        os.link(src, dst)
+    else:
+        shutil.copyfile(src, dst)
+
+
+def _symlink(src, dst):
+    if hasattr(os, "symlink"):
+        os.symlink(src, dst)
+    else:
+        # Windows-specific case: we cannot copy the file right now, because it
+        # doesn't exist yet (it comes later in the archive) or the symlink is broken.
+        # Act like tar and tarfile - skip such symlinks
+        if os.path.exists(src):
+            shutil.copytree(src, dst)
+
+
+def get_archive_filter_name(filename):
+    filters = libarchive.get_archive_filter_names(filename)
+    # https://a.yandex-team.ru/arc/trunk/arcadia/contrib/libs/libarchive/libarchive/archive_read.c?rev=5800047#L522
+    assert filters[-1] == "none", filters
+    if len(filters) == 1:
+        return None
+    if len(filters) == 2:
+        return filters[0]
+    raise Exception("Archive has a chain of filters: {}".format(filters))
diff --git a/library/python/archive/ya.make b/library/python/archive/ya.make
new file mode 100644
index 0000000000..5b86a45a42
--- /dev/null
+++ b/library/python/archive/ya.make
@@ -0,0 +1,19 @@
+PY23_LIBRARY()
+
+STYLE_PYTHON()
+
+PY_SRCS(
+    __init__.py
+)
+
+PEERDIR(
+    contrib/python/pathlib2
+    contrib/python/python-libarchive
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+    benchmark
+    test
+)
diff --git a/library/python/cityhash/cityhash.pyx b/library/python/cityhash/cityhash.pyx
new file mode 100644
index 0000000000..6f0046f0d7
--- /dev/null
+++ b/library/python/cityhash/cityhash.pyx
@@ -0,0 +1,75 @@
+from libcpp.pair cimport pair
+
+cdef extern from "util/system/types.h":
+    ctypedef unsigned long ui64
+
+
+cdef extern from "util/digest/city.h":
+    ui64 CityHash64(const char* buf, size_t len) nogil
+    pair[ui64, ui64] CityHash128(const char* buf, size_t len) nogil
+    ui64 CityHash64WithSeed(const char* buf, size_t len, ui64 seed) nogil
+
+
+cdef extern from "library/python/cityhash/hash.h":
+    ui64 FileCityHash128WithSeedHigh64(const char* fpath) nogil except+
+    ui64 FileCityHash64(const char* fpath) nogil except+
+
+
+def hash64(content):
+    cdef const char* s = content
+    cdef size_t size = len(content)
+    cdef ui64 res = 0
+
+    if size > 128:
+        with nogil:
+            res = CityHash64(s, size)
+    else:
+        res = CityHash64(s, size)
+
+    return res
+
+def hash128(content):
+    cdef const char* s = content
+    cdef size_t size = len(content)
+    cdef pair[ui64, ui64] res = pair[ui64, ui64](0, 0)
+
+    if size > 128:
+        with nogil:
+            res = CityHash128(s, size)
+    else:
+        res = CityHash128(s, size)
+    return res
+
+
+def hash64seed(content, seed):
+    cdef const char* s = content
+    cdef size_t size = len(content)
+    cdef ui64 _seed = seed
+
+    if size > 128:
+        with nogil:
+            res = CityHash64WithSeed(s, size, _seed)
+    else:
+        res = CityHash64WithSeed(s, size, _seed)
+
+    return res
+
+
+def filehash64(path):
+    cdef const char* p = path
+    cdef ui64 res = 0
+
+    with nogil:
+        res = FileCityHash64(p)
+
+    return res
+
+
+def filehash128high64(path):
+    cdef const char* p = path
+    cdef ui64 res = 0
+
+    with nogil:
+        res = FileCityHash128WithSeedHigh64(p)
+
+    return res
diff --git a/library/python/cityhash/hash.cpp b/library/python/cityhash/hash.cpp
new file mode 100644
index 0000000000..17bd3a75f3
--- /dev/null
+++ b/library/python/cityhash/hash.cpp
@@ -0,0 +1,32 @@
+#include "hash.h"
+
+#include <util/digest/city.h>
+#include <util/generic/string.h>
+#include <util/memory/blob.h>
+#include <util/system/file.h>
+#include <util/system/fstat.h>
+
+void ReadFile(const char* fpath, TBlob& blob) {
+    TFile f(TString{fpath}, RdOnly | Seq);
+    const TFileStat fs(f);
+    auto size = fs.Size;
+
+    if (size < (64 << 10)) {
+        blob = TBlob::FromFileContent(f, 0, size);
+    } else {
+        blob = TBlob::FromFile(f);
+    }
+}
+
+ui64 FileCityHash128WithSeedHigh64(const char* fpath) {
+    TBlob blob;
+    ReadFile(fpath, blob);
+    const uint128 hash = CityHash128WithSeed((const char*)blob.Data(), blob.Size(), uint128(0, blob.Size()));
+    return Uint128High64(hash);
+}
+
+ui64 FileCityHash64(const char* fpath) {
+    TBlob blob;
+    ReadFile(fpath, blob);
+    return CityHash64(static_cast<const char*>(blob.Data()), blob.Size());
+}
diff --git a/library/python/cityhash/hash.h b/library/python/cityhash/hash.h
new file mode 100644
index 0000000000..64b22ba74b
--- /dev/null
+++ b/library/python/cityhash/hash.h
@@ -0,0 +1,6 @@
+#pragma once
+
+#include <util/system/defaults.h>
+
+ui64 FileCityHash128WithSeedHigh64(const char* fpath);
+ui64 FileCityHash64(const char* fpath);
diff --git a/library/python/cityhash/ya.make b/library/python/cityhash/ya.make
new file mode 100644
index 0000000000..7948e19389
--- /dev/null
+++ b/library/python/cityhash/ya.make
@@ -0,0 +1,16 @@
+PY23_LIBRARY()
+
+SRCS(
+    hash.cpp
+)
+
+PY_SRCS(
+    TOP_LEVEL
+    cityhash.pyx
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+    test
+)
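The resulting extension module exposes CityHash over byte strings and files. A short sketch, assuming the module is built and importable as top-level `cityhash` (values are illustrative; inputs must be bytes, since the wrappers take `const char*`):

```python
import cityhash

digest = cityhash.hash64(b"payload")                 # 64-bit CityHash
seeded = cityhash.hash64seed(b"payload", 42)         # seeded variant
low, high = cityhash.hash128(b"payload")             # 128-bit hash as a (low64, high64) pair
file_digest = cityhash.filehash64(b"/path/to/file")  # file is read and hashed on the C++ side
```

The `size > 128` branches above release the GIL only for larger inputs, where the cost of dropping and re-acquiring it is worth amortizing.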
diff --git a/library/python/codecs/__codecs.pyx b/library/python/codecs/__codecs.pyx
new file mode 100644
index 0000000000..42ec37fe88
--- /dev/null
+++ b/library/python/codecs/__codecs.pyx
@@ -0,0 +1,61 @@
+import six
+
+from libcpp cimport bool
+
+from util.generic.string cimport TString, TStringBuf
+
+
+def to_bytes(s):
+    try:
+        return s.encode('utf-8')
+    except AttributeError:
+        pass
+
+    return s
+
+
+def from_bytes(s):
+    if six.PY3:
+        return s.decode('utf-8')
+
+    return s
+
+
+cdef extern from "library/cpp/blockcodecs/codecs.h" namespace "NBlockCodecs":
+    cdef cppclass ICodec:
+        void Encode(TStringBuf data, TString& res) nogil
+        void Decode(TStringBuf data, TString& res) nogil
+
+    cdef const ICodec* Codec(const TStringBuf& name) except +
+    cdef TString ListAllCodecsAsString() except +
+
+
+def dumps(name, data):
+    name = to_bytes(name)
+
+    cdef const ICodec* codec = Codec(TStringBuf(name, len(name)))
+    cdef TString res
+    cdef TStringBuf cdata = TStringBuf(data, len(data))
+
+    with nogil:
+        codec.Encode(cdata, res)
+
+    return res.c_str()[:res.length()]
+
+
+def loads(name, data):
+    name = to_bytes(name)
+
+    cdef const ICodec* codec = Codec(TStringBuf(name, len(name)))
+    cdef TString res
+    cdef TStringBuf cdata = TStringBuf(data, len(data))
+
+    with nogil:
+        codec.Decode(cdata, res)
+
+    return res.c_str()[:res.length()]
+
+def list_all_codecs():
+    cdef TString res = ListAllCodecsAsString()
+
+    return from_bytes(res.c_str()[:res.length()]).split(',')
diff --git a/library/python/codecs/__init__.py b/library/python/codecs/__init__.py
new file mode 100644
index 0000000000..b9fb00deb0
--- /dev/null
+++ b/library/python/codecs/__init__.py
@@ -0,0 +1 @@
+from __codecs import loads, dumps, list_all_codecs  # noqa
diff --git a/library/python/codecs/ya.make b/library/python/codecs/ya.make
new file mode 100644
index 0000000000..f42d115d5d
--- /dev/null
+++ b/library/python/codecs/ya.make
@@ -0,0 +1,16 @@
+PY23_LIBRARY()
+
+PEERDIR(
+    library/cpp/blockcodecs
+    contrib/python/six
+)
+
+PY_SRCS(
+    __init__.py
+)
+
+BUILDWITH_CYTHON_CPP(__codecs.pyx)
+
+PY_REGISTER(__codecs)
+
+END()
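A round-trip sketch of the resulting API, assuming the package is importable as `library.python.codecs` (the codec name here is illustrative; valid names come from `list_all_codecs()`):

```python
import library.python.codecs as codecs

payload = b"x" * 10000
packed = codecs.dumps("snappy", payload)   # encode with a named blockcodec
assert codecs.loads("snappy", packed) == payload

print(codecs.list_all_codecs())  # names accepted by dumps()/loads()
```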
diff --git a/library/python/compress/__init__.py b/library/python/compress/__init__.py
new file mode 100644
index 0000000000..380ec47dca
--- /dev/null
+++ b/library/python/compress/__init__.py
@@ -0,0 +1,147 @@
+from io import open
+
+import struct
+import json
+import os
+import logging
+
+import library.python.par_apply as lpp
+import library.python.codecs as lpc
+
+
+logger = logging.getLogger('compress')
+
+
+def list_all_codecs():
+    return sorted(frozenset(lpc.list_all_codecs()))
+
+
+def find_codec(ext):
+    def ext_compress(x):
+        return lpc.dumps(ext, x)
+
+    def ext_decompress(x):
+        return lpc.loads(ext, x)
+
+    ext_decompress(ext_compress(b''))
+
+    return {'c': ext_compress, 'd': ext_decompress, 'n': ext}
+
+
+def codec_for(path):
+    for ext in reversed(path.split('.')):
+        try:
+            return find_codec(ext)
+        except Exception as e:
+            logger.debug('in codec_for(): %s', e)
+
+    raise Exception('unsupported file %s' % path)
+
+
+def compress(fr, to, codec=None, fopen=open, threads=1):
+    if codec:
+        codec = find_codec(codec)
+    else:
+        codec = codec_for(to)
+
+    func = codec['c']
+
+    def iter_blocks():
+        with fopen(fr, 'rb') as f:
+            while True:
+                chunk = f.read(16 * 1024 * 1024)
+
+                if chunk:
+                    yield chunk
+                else:
+                    yield b''
+
+                    return
+
+    def iter_results():
+        info = {
+            'codec': codec['n'],
+        }
+
+        if fr:
+            info['size'] = os.path.getsize(fr)
+
+        yield json.dumps(info, sort_keys=True) + '\n'
+
+        for c in lpp.par_apply(iter_blocks(), func, threads):
+            yield c
+
+    with fopen(to, 'wb') as f:
+        for c in iter_results():
+            logger.debug('complete %s', len(c))
+            f.write(struct.pack('<I', len(c)))
+
+            try:
+                f.write(c)
+            except TypeError:
+                f.write(c.encode('utf-8'))
+
+
+def decompress(fr, to, codec=None, fopen=open, threads=1):
+    def iter_chunks():
+        with fopen(fr, 'rb') as f:
+            cnt = 0
+
+            while True:
+                ll = f.read(4)
+
+                if ll:
+                    ll = struct.unpack('<I', ll)[0]
+
+                    if ll:
+                        if ll > 100000000:
+                            raise Exception('broken stream')
+
+                        yield f.read(ll)
+
+                        cnt += ll
+                else:
+                    if not cnt:
+                        raise Exception('empty stream')
+
+                    return
+
+    it = iter_chunks()
+    extra = []
+
+    for chunk in it:
+        hdr = {}
+
+        try:
+            hdr = json.loads(chunk)
+        except Exception as e:
+            logger.info('cannot parse header, suspect old format: %s', e)
+
+            extra.append(chunk)
+
+        break
+
+    def resolve_codec():
+        if 'codec' in hdr:
+            return find_codec(hdr['codec'])
+
+        if codec:
+            return find_codec(codec)
+
+        return codec_for(fr)
+
+    dc = resolve_codec()['d']
+
+    def iter_all_chunks():
+        for x in extra:
+            yield x
+
+        for x in it:
+            yield x
+
+    with fopen(to, 'wb') as f:
+        for c in lpp.par_apply(iter_all_chunks(), dc, threads):
+            if c:
+                logger.debug('complete %s', len(c))
+                f.write(c)
+            else:
+                break
diff --git a/library/python/compress/ya.make b/library/python/compress/ya.make
new file mode 100644
index 0000000000..bbf2a784e2
--- /dev/null
+++ b/library/python/compress/ya.make
@@ -0,0 +1,16 @@
+PY23_LIBRARY()
+
+PEERDIR(
+    library/python/codecs
+    library/python/par_apply
+)
+
+PY_SRCS(
+    __init__.py
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+    tests
+)
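A sketch of the streaming layer built on the two libraries above (the file names are hypothetical, and selecting the codec via the output extension is just one option; `codec=` can be passed explicitly):

```python
import library.python.compress as compress

# The codec is taken from the output extension; data is framed in
# length-prefixed blocks of up to 16 MiB, compressed by 4 worker threads.
compress.compress("dump.bin", "dump.bin.snappy", threads=4)

# The codec name is read back from the JSON header chunk.
compress.decompress("dump.bin.snappy", "dump.bin.restored", threads=4)
```

The leading JSON chunk records the codec and the original size, which is why `decompress` can usually ignore both the `codec` argument and the file name.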
diff --git a/library/python/json/__init__.py b/library/python/json/__init__.py
new file mode 100644
index 0000000000..c6420d5e6d
--- /dev/null
+++ b/library/python/json/__init__.py
@@ -0,0 +1,44 @@
+from library.python.json.loads import loads as _loads
+from simplejson import loads as _sj_loads
+
+
+def loads(*args, **kwargs):
+    try:
+        return _loads(*args, **kwargs)
+    except Exception as e:
+        if 'invalid syntax at token' in str(e):
+            kwargs.pop('intern_keys', None)
+            kwargs.pop('intern_vals', None)
+            kwargs.pop('may_unicode', None)
+            return _sj_loads(*args, **kwargs)
+
+        raise
+
+
+from simplejson import load, dump, dumps  # noqa
+
+
+def read_file(file_name, **kwargs):
+    """
+    Read a file and return its parsed json contents.
+
+    All kwargs will be proxied to the `json.load` method as is.
+
+    :param file_name: file with json contents
+    :return: parsed json contents
+    """
+    with open(file_name) as f:
+        return load(f, **kwargs)
+
+
+def write_file(file_name, contents, **kwargs):
+    """
+    Dump json data to a file.
+
+    All kwargs will be proxied to the `json.dump` method as is.
+
+    :param file_name: file to dump to
+    :param contents: JSON-serializable object
+    """
+    with open(file_name, "w") as f:
+        dump(contents, f, **kwargs)
diff --git a/library/python/json/loads.cpp b/library/python/json/loads.cpp
new file mode 100644
index 0000000000..19cdb096ae
--- /dev/null
+++ b/library/python/json/loads.cpp
@@ -0,0 +1,246 @@
+#include "loads.h"
+
+#include <Python.h>
+
+#include <library/cpp/json/fast_sax/parser.h>
+
+#include <util/generic/algorithm.h>
+#include <util/generic/stack.h>
+#include <util/generic/vector.h>
+#include <util/generic/ylimits.h>
+#include <util/string/ascii.h>
+
+using namespace NJson;
+
+namespace {
+    enum EKind {
+        Undefined,
+        Array,
+        Dict,
+        Value,
+        Key,
+    };
+
+    static inline TStringBuf ToStr(EKind kind) noexcept {
+        switch (kind) {
+            case Undefined:
+                return TStringBuf("Undefined");
+
+            case Array:
+                return TStringBuf("Array");
+
+            case Dict:
+                return TStringBuf("Dict");
+
+            case Value:
+                return TStringBuf("Value");
+
+            case Key:
+                return TStringBuf("Key");
+        }
+
+        Y_UNREACHABLE();
+    }
+
+    struct TUnref {
+        static inline void Destroy(PyObject* o) noexcept {
+            Py_XDECREF(o);
+        }
+    };
+
+    using TObjectPtr = TAutoPtr<PyObject, TUnref>;
+
+    static inline TObjectPtr BuildBool(bool val) noexcept {
+        if (val) {
+            Py_RETURN_TRUE;
+        }
+
+        Py_RETURN_FALSE;
+    }
+
+    // Translate python exceptions from object-creating functions into C++ exceptions.
+    // Such errors are reported by returning nullptr.
+    // When a python error is set and the C++ exception is caught by the Cython wrapper,
+    // the Python exception is propagated, while the C++ exception is discarded.
+    PyObject* CheckNewObject(PyObject* obj) {
+        Y_ENSURE(obj != nullptr, "got python exception");
+        return obj;
+    }
+
+    void CheckRetcode(int retcode) {
+        Y_ENSURE(retcode == 0, "got python exception");
+    }
+
+    static inline TObjectPtr BuildSmall(long val) {
+#if PY_VERSION_HEX >= 0x03000000
+        return CheckNewObject(PyLong_FromLong(val));
+#else
+        return CheckNewObject(PyInt_FromLong(val));
+#endif
+    }
+
+    PyObject* CreatePyString(TStringBuf str, bool intern, bool mayUnicode) {
+#if PY_VERSION_HEX >= 0x03000000
+        Y_UNUSED(mayUnicode);
+        PyObject* pyStr = PyUnicode_FromStringAndSize(str.data(), str.size());
+        if (intern) {
+            PyUnicode_InternInPlace(&pyStr);
+        }
+#else
+        const bool needUnicode = mayUnicode && !AllOf(str, IsAscii);
+        PyObject* pyStr = needUnicode ? PyUnicode_FromStringAndSize(str.data(), str.size())
+                                      : PyString_FromStringAndSize(str.data(), str.size());
+        if (intern && !needUnicode) {
+            PyString_InternInPlace(&pyStr);
+        }
+#endif
+        return pyStr;
+    }
+
+    struct TVal {
+        EKind Kind = Undefined;
+        TObjectPtr Val;
+
+        inline TVal() noexcept
+            : Kind(Undefined)
+        {
+        }
+
+        inline TVal(EKind kind, TObjectPtr val) noexcept
+            : Kind(kind)
+            , Val(val)
+        {
+        }
+    };
+
+    static inline TObjectPtr NoneRef() noexcept {
+        Py_RETURN_NONE;
+    }
+
+    struct TContext: public TJsonCallbacks {
+        const bool InternKeys;
+        const bool InternVals;
+        const bool MayUnicode;
+        TStack<TVal, TVector<TVal>> S;
+
+        inline TContext(bool internKeys, bool internVals, bool mayUnicode)
+            : TJsonCallbacks(true)
+            , InternKeys(internKeys)
+            , InternVals(internVals)
+            , MayUnicode(mayUnicode)
+        {
+            S.emplace();
+        }
+
+        inline bool Consume(TObjectPtr o) {
+            auto& t = S.top();
+
+            if (t.Kind == Array) {
+                CheckRetcode(PyList_Append(t.Val.Get(), o.Get()));
+            } else if (t.Kind == Key) {
+                auto key = S.top().Val;
+
+                S.pop();
+
+                CheckRetcode(PyDict_SetItem(S.top().Val.Get(), key.Get(), o.Get()));
+            } else {
+                t = TVal(Value, o);
+            }
+
+            return true;
+        }
+
+        inline TObjectPtr Pop(EKind expect) {
+            auto res = S.top();
+
+            S.pop();
+
+            if (res.Kind != expect) {
+                ythrow yexception() << "unexpected kind(expect " << ToStr(expect) << ", got " << ToStr(res.Kind) << ")";
+            }
+
+            return res.Val;
+        }
+
+        inline void Push(EKind kind, TObjectPtr object) {
+            S.push(TVal(kind, object));
+        }
+
+        virtual bool OnNull() {
+            return Consume(NoneRef());
+        }
+
+        virtual bool OnBoolean(bool v) {
+            return Consume(BuildBool(v));
+        }
+
+        virtual bool OnInteger(long long v) {
+            if (v >= (long long)Min<long>()) {
+                return Consume(BuildSmall((long)v));
+            }
+
+            return Consume(CheckNewObject(PyLong_FromLongLong(v)));
+        }
+
+        virtual bool OnUInteger(unsigned long long v) {
+            if (v <= (unsigned long long)Max<long>()) {
+                return Consume(BuildSmall((long)v));
+            }
+
+            return Consume(CheckNewObject(PyLong_FromUnsignedLongLong(v)));
+        }
+
+        virtual bool OnDouble(double v) {
+            return Consume(CheckNewObject(PyFloat_FromDouble(v)));
+        }
+
+        virtual bool OnString(const TStringBuf& v) {
+            return Consume(CheckNewObject(CreatePyString(v, InternVals, MayUnicode)));
+        }
+
+        virtual bool OnOpenMap() {
+            Push(Dict, CheckNewObject(PyDict_New()));
+
+            return true;
+        }
+
+        virtual bool OnCloseMap() {
+            return Consume(Pop(Dict));
+        }
+
+        virtual bool OnMapKey(const TStringBuf& k) {
+            Push(Key, CheckNewObject(CreatePyString(k, InternKeys, MayUnicode)));
+            return true;
+        }
+
+        virtual bool OnOpenArray() {
+            Push(Array, CheckNewObject(PyList_New(0)));
+
+            return true;
+        }
+
+        virtual bool OnCloseArray() {
+            return Consume(Pop(Array));
+        }
+    };
+}
+
+PyObject* LoadJsonFromString(const char* data, size_t len, bool internKeys, bool internVals, bool mayUnicode) {
+    TContext ctx(internKeys, internVals, mayUnicode);
+
+    if (!len) {
+        ythrow yexception() << "parse error: zero length input string";
+    }
+
+    if (!NJson::ReadJsonFast(TStringBuf(data, len), &ctx)) {
+        ythrow yexception() << "parse error";
+    }
+
+    auto& s = ctx.S;
+
+    if (!s || s.top().Kind != Value) {
+        ythrow yexception() << "unexpected parser state";
+    }
+
+    return s.top().Val.Release();
+}
diff --git a/library/python/json/loads.h b/library/python/json/loads.h
new file mode 100644
index 0000000000..62dcdf6f21
--- /dev/null
+++ b/library/python/json/loads.h
@@ -0,0 +1,5 @@
+#pragma once
+
+#include <Python.h>
+
+PyObject* LoadJsonFromString(const char* data, size_t len, bool internKeys = false, bool internVals = false, bool mayUnicode = false);
diff --git a/library/python/json/loads.pyx b/library/python/json/loads.pyx
new file mode 100644
index 0000000000..82e5c6dce7
--- /dev/null
+++ b/library/python/json/loads.pyx
@@ -0,0 +1,14 @@
+from libcpp cimport bool
+
+cdef extern from "library/python/json/loads.h":
+    object LoadJsonFromString(const char*, size_t, bool internKeys, bool internVals, bool mayUnicode) except +
+
+
+def loads(s, intern_keys=False, intern_vals=False, may_unicode=False):
+    if isinstance(s, unicode):
+        s = s.encode('utf-8')
+
+    try:
+        return LoadJsonFromString(s, len(s), intern_keys, intern_vals, may_unicode)
+    except Exception as e:
+        raise ValueError(str(e))
diff --git a/library/python/json/ya.make b/library/python/json/ya.make
new file mode 100644
index 0000000000..74a82de9d8
--- /dev/null
+++ b/library/python/json/ya.make
@@ -0,0 +1,17 @@
+PY23_LIBRARY()
+
+PEERDIR(
+    contrib/python/simplejson
+    library/cpp/json/fast_sax
+)
+
+PY_SRCS(
+    __init__.py
+    loads.pyx
+)
+
+SRCS(
+    loads.cpp
+)
+
+END()
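A sketch of the fast parser's entry point (values are illustrative). `intern_keys`/`intern_vals` intern repeated strings to cut memory on large homogeneous documents; when the fast SAX parser rejects the input with an invalid-syntax error, `loads` falls back to `simplejson`:

```python
import library.python.json as lpj

doc = lpj.loads('{"a": 1, "b": [1, 2, 3]}', intern_keys=True)
assert doc == {"a": 1, "b": [1, 2, 3]}
```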
diff --git a/library/python/par_apply/__init__.py b/library/python/par_apply/__init__.py
new file mode 100644
index 0000000000..19b89ae843
--- /dev/null
+++ b/library/python/par_apply/__init__.py
@@ -0,0 +1,114 @@
+import sys
+import threading
+import six
+
+from six.moves import queue
+
+
+def par_apply(seq, func, thr_num, join_polling=None):
+    if thr_num < 2:
+        for x in seq:
+            yield func(x)
+
+        return
+
+    in_q = queue.Queue()
+    out_q = queue.Queue()
+
+    def enumerate_blocks():
+        n = 0
+
+        for b in seq:
+            yield n, [b]
+            n += 1
+
+        yield n, None
+
+    def iter_out():
+        n = 0
+        d = {}
+
+        while True:
+            if n in d:
+                r = d[n]
+                del d[n]
+                n += 1
+
+                yield r
+            else:
+                res = out_q.get()
+
+                d[res[0]] = res
+
+    out_iter = iter_out()
+
+    def wait_block():
+        for x in out_iter:
+            return x
+
+    def iter_compressed():
+        p = 0
+
+        for n, b in enumerate_blocks():
+            in_q.put((n, b))
+
+            while n > p + (thr_num * 2):
+                p, b, c = wait_block()
+
+                if not b:
+                    return
+
+                yield p, c
+
+        while True:
+            p, b, c = wait_block()
+
+            if not b:
+                return
+
+            yield p, c
+
+    def proc():
+        while True:
+            data = in_q.get()
+
+            if data is None:
+                return
+
+            n, b = data
+
+            if b:
+                try:
+                    res = (func(b[0]), None)
+                except Exception:
+                    res = (None, sys.exc_info())
+            else:
+                res = (None, None)
+
+            out_q.put((n, b, res))
+
+    thrs = [threading.Thread(target=proc) for i in range(0, thr_num)]
+
+    for t in thrs:
+        t.start()
+
+    try:
+        for p, c in iter_compressed():
+            res, err = c
+
+            if err:
+                six.reraise(*err)
+
+            yield res
+    finally:
+        for t in thrs:
+            in_q.put(None)
+
+        for t in thrs:
+            if join_polling is not None:
+                while True:
+                    t.join(join_polling)
+                    if not t.is_alive():
+                        break
+            else:
+                t.join()
diff --git a/library/python/par_apply/ya.make b/library/python/par_apply/ya.make
new file mode 100644
index 0000000000..b14592ab79
--- /dev/null
+++ b/library/python/par_apply/ya.make
@@ -0,0 +1,11 @@
+PY23_LIBRARY()
+
+PEERDIR(
+    contrib/python/six
+)
+
+PY_SRCS(
+    __init__.py
+)
+
+END()
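A sketch of the helper's contract (the worker function is a toy example): results are yielded strictly in input order even though up to `thr_num` worker threads run concurrently, and a worker exception is re-raised in the consumer via `six.reraise`:

```python
from library.python.par_apply import par_apply

def transform(block):
    return block * 2  # stand-in for compression, hashing, etc.

for result in par_apply([1, 2, 3, 4], transform, thr_num=4):
    print(result)  # prints 2, 4, 6, 8, in order
```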
diff --git a/library/python/retry/__init__.py b/library/python/retry/__init__.py
new file mode 100644
index 0000000000..5520a70332
--- /dev/null
+++ b/library/python/retry/__init__.py
@@ -0,0 +1,250 @@
+import copy
+import datetime
+import functools
+import itertools
+import logging
+import random
+import time
+
+
+"""
+The retry library provides an ability to retry function calls in a configurable way.
+
+To retry a certain function call use the `retry_call` function. To make a function auto-retriable use the `retry`
+or `retry_intrusive` decorators. Both `retry_call` and `retry` optionally accept a retry configuration object
+or its fields as kwargs. The `retry_intrusive` decorator is designed for methods and uses an intrusive configuration object.
+
+>>> retry_call(foo)
+>>> retry_call(foo, foo_args, foo_kwargs)
+>>> retry_call(foo, foo_args, foo_kwargs, conf=conf)
+
+>>> @retry()
+>>> def foo(...):
+>>>     ...
+
+>>> @retry(conf)
+>>> def foo(...):
+>>>     ...
+
+>>> class Obj(object):
+>>>     def __init__(self):
+>>>         self.retry_conf = conf
+>>>
+>>>     @retry_intrusive
+>>>     def foo(self, ...):
+>>>         ...
+
+This library differs from its alternatives:
+ * the `retry` contrib library lacks time-based limits and reusable configuration objects, and is generally less flexible
+ * the `retrying` contrib library is somewhat more complex, but also lacks reusable configuration objects
+"""
+
+
+DEFAULT_SLEEP_FUNC = time.sleep
+LOGGER = logging.getLogger(__name__)
+
+
+class RetryConf(object):
+    """
+    Configuration object defines retry behaviour and is composed of these fields:
+     * `retriable` - function that decides if an exception should trigger retrying
+     * `get_delay` - function that returns the number of seconds the retrier must wait before the next attempt
+     * `max_time` - maximum `datetime.timedelta` that can pass after the first call for any retry attempt to be done
+     * `max_times` - maximum number of retry attempts (note retries, not tries/calls)
+     * `handle_error` - function that is called for each failed call attempt
+     * `logger` - logger object to record retry warnings with
+     * `sleep` - custom sleep function to use for waiting
+
+    >>> RetryConf(max_time=datetime.timedelta(seconds=30), max_times=10)
+
+    An empty configuration retries indefinitely on any exception raised.
+
+    By default `DEFAULT_CONF` is used, which retries indefinitely, waiting 1 sec with 1.2 backoff between attempts, and
+    also logging with the built-in logger object.
+
+    A configuration must be cloned before modification to create a separate configuration:
+
+    >>> DEFAULT_CONF.clone()
+
+    There are various methods that provide convenient clone-and-modify shortcuts and "retry recipes".
+    """
+
+    _PROPS = {
+        "retriable": lambda e: True,
+        "get_delay": lambda n, raised_after, last: 0,
+        "max_time": None,
+        "max_times": None,
+        "handle_error": None,
+        "logger": None,
+        "sleep": DEFAULT_SLEEP_FUNC,
+    }
+
+    def __init__(self, **kwargs):
+        for prop, default_value in self._PROPS.items():
+            setattr(self, prop, default_value)
+        self._set(**kwargs)
+
+    def __repr__(self):
+        return repr(self.__dict__)
+
+    def clone(self, **kwargs):
+        """
+        Clone configuration.
+        """
+
+        obj = copy.copy(self)
+        obj._set(**kwargs)
+        return obj
+
+    def on(self, *errors):
+        """
+        Clone and retry on specific exception types (retriable shortcut):
+
+        >>> conf = conf.on(MyException, MyOtherException)
+        """
+
+        obj = self.clone()
+        obj.retriable = lambda e: isinstance(e, errors)
+        return obj
+
+    def waiting(self, delay=0, backoff=1.0, jitter=0, limit=None):
+        """
+        Clone and wait between attempts with backoff, jitter and limit (get_delay shortcut):
+
+        >>> conf = conf.waiting(delay)
+        >>> conf = conf.waiting(delay, backoff=2.0)  # initial delay with backoff x2 on each attempt
+        >>> conf = conf.waiting(delay, jitter=3)  # jitter from 0 to 3 seconds
+        >>> conf = conf.waiting(delay, backoff=2.0, limit=60)  # delay with backoff, but not greater than a minute
+
+        All these options can be combined together, of course.
+        """
+
+        def get_delay(n, raised_after, last):
+            if n == 1:
+                return delay
+
+            s = last * backoff
+            s += random.uniform(0, jitter)
+            if limit is not None:
+                s = min(s, limit)
+            return s
+
+        obj = self.clone()
+        obj.get_delay = get_delay
+        return obj
+
+    def upto(self, seconds=0, **other_timedelta_kwargs):
+        """
+        Clone and do retry attempts only for some time (max_time shortcut):
+
+        >>> conf = conf.upto(30)  # retrying for 30 seconds
+        >>> conf = conf.upto(hours=1, minutes=20)  # retrying for 1:20
+
+        Any `datetime.timedelta` kwargs can be used here.
+        """
+
+        obj = self.clone()
+        obj.max_time = datetime.timedelta(seconds=seconds, **other_timedelta_kwargs)
+        return obj
+
+    def upto_retries(self, retries=0):
+        """
+        Clone and set the limit for the number of retry attempts (max_times shortcut):
+
+        >>> conf = conf.upto_retries(10)
+        """
+
+        obj = self.clone()
+        obj.max_times = retries
+        return obj
+
+    def _set(self, **kwargs):
+        for prop, value in kwargs.items():
+            if prop not in self._PROPS:
+                continue
+            setattr(self, prop, value)
+
+
+DEFAULT_CONF = RetryConf(logger=LOGGER).waiting(1, backoff=1.2)
+
+
+def retry_call(f, f_args=(), f_kwargs={}, conf=DEFAULT_CONF, **kwargs):
+    """
+    Retry a function call.
+
+    :param f: function to be retried
+    :param f_args: target function args
+    :param f_kwargs: target function kwargs
+    :param conf: configuration
+    """
+
+    if kwargs:
+        conf = conf.clone(**kwargs)
+    return _retry(conf, functools.partial(f, *f_args, **f_kwargs))
+
+
+def retry(conf=DEFAULT_CONF, **kwargs):
+    """
+    Retrying decorator.
+
+    :param conf: configuration
+    """
+
+    if kwargs:
+        conf = conf.clone(**kwargs)
+
+    def decorator(f):
+        @functools.wraps(f)
+        def wrapped(*f_args, **f_kwargs):
+            return _retry(conf, functools.partial(f, *f_args, **f_kwargs))
+
+        return wrapped
+
+    return decorator
+
+
+def retry_intrusive(f):
+    """
+    Retrying method decorator that uses an intrusive conf (obj.retry_conf).
+    """
+
+    @functools.wraps(f)
+    def wrapped(obj, *f_args, **f_kwargs):
+        assert hasattr(obj, "retry_conf"), "Object must have retry_conf attribute for decorator to run"
+        return _retry(obj.retry_conf, functools.partial(f, obj, *f_args, **f_kwargs))
+
+    return wrapped
+
+
+def _retry(conf, f):
+    start = datetime.datetime.now()
+    delay = 0
+    for n in itertools.count(1):
+        try:
+            return f()
+        except Exception as error:
+            raised_after = datetime.datetime.now() - start
+            if conf.handle_error:
+                conf.handle_error(error, n, raised_after)
+            delay = conf.get_delay(n, raised_after, delay)
+            retry_after = raised_after + datetime.timedelta(seconds=delay)
+            retrying = (
+                conf.retriable(error)
+                and (conf.max_times is None or n <= conf.max_times)
+                and (conf.max_time is None or retry_after <= conf.max_time)
+            )
+            if not retrying:
+                raise
+            if delay:
+                conf.sleep(delay)
+            if conf.logger:
+                conf.logger.warning(
+                    "Retrying (try %d) after %s (%s + %s sec) on %s: %s",
+                    n,
+                    retry_after,
+                    raised_after,
+                    delay,
+                    error.__class__.__name__,
+                    error,
+                    exc_info=True,
+                )
diff --git a/library/python/retry/ya.make b/library/python/retry/ya.make
new file mode 100644
index 0000000000..dcbf5567cd
--- /dev/null
+++ b/library/python/retry/ya.make
@@ -0,0 +1,11 @@
+PY23_LIBRARY()
+
+STYLE_PYTHON()
+
+PY_SRCS(__init__.py)
+
+END()
+
+RECURSE_FOR_TESTS(
+    tests
+)
+ """ + + @functools.wraps(f) + def wrapped(obj, *f_args, **f_kwargs): + assert hasattr(obj, "retry_conf"), "Object must have retry_conf attribute for decorator to run" + return _retry(obj.retry_conf, functools.partial(f, obj, *f_args, **f_kwargs)) + + return wrapped + + +def _retry(conf, f): + start = datetime.datetime.now() + delay = 0 + for n in itertools.count(1): + try: + return f() + except Exception as error: + raised_after = datetime.datetime.now() - start + if conf.handle_error: + conf.handle_error(error, n, raised_after) + delay = conf.get_delay(n, raised_after, delay) + retry_after = raised_after + datetime.timedelta(seconds=delay) + retrying = ( + conf.retriable(error) + and (conf.max_times is None or n <= conf.max_times) + and (conf.max_time is None or retry_after <= conf.max_time) + ) + if not retrying: + raise + if delay: + conf.sleep(delay) + if conf.logger: + conf.logger.warning( + "Retrying (try %d) after %s (%s + %s sec) on %s: %s", + n, + retry_after, + raised_after, + delay, + error.__class__.__name__, + error, + exc_info=True, + ) diff --git a/library/python/retry/ya.make b/library/python/retry/ya.make new file mode 100644 index 0000000000..dcbf5567cd --- /dev/null +++ b/library/python/retry/ya.make @@ -0,0 +1,11 @@ +PY23_LIBRARY() + +STYLE_PYTHON() + +PY_SRCS(__init__.py) + +END() + +RECURSE_FOR_TESTS( + tests +) diff --git a/library/python/testing/yatest_common/ut/test.py b/library/python/testing/yatest_common/ut/test.py new file mode 100644 index 0000000000..bffdf1e353 --- /dev/null +++ b/library/python/testing/yatest_common/ut/test.py @@ -0,0 +1,17 @@ +import os +import tarfile + +import yatest.common + +import yalibrary.tools + + +def test_jdk_from_package_equals_jdk_tool_from_yaconf_json(): + jdk_path = yatest.common.binary_path(os.path.join('build', 'platform', 'java', 'jdk', 'testing')) + os.makedirs("extracted") + with tarfile.open(os.path.join(jdk_path, "jdk.tar")) as tf: + tf.extractall("extracted") + jdk_tool_path = yalibrary.tools.toolchain_root('java', None, None) + with open(os.path.join("extracted", "release")) as jdk_path_release: + with open(os.path.join(jdk_tool_path, "release")) as jdk_tool_path_release: + assert jdk_path_release.read() == jdk_tool_path_release.read() diff --git a/library/python/testing/yatest_common/ut/ya.make b/library/python/testing/yatest_common/ut/ya.make new file mode 100644 index 0000000000..cc56fa1b6c --- /dev/null +++ b/library/python/testing/yatest_common/ut/ya.make @@ -0,0 +1,17 @@ +PY2TEST() + +TEST_SRCS(test.py) + +PEERDIR( + devtools/ya/yalibrary/tools +) + +DEPENDS( + build/platform/java/jdk/testing +) + +REQUIREMENTS( + network:full +) + +END() diff --git a/library/python/testing/yatest_common/ya.make b/library/python/testing/yatest_common/ya.make index 2f5aa3bf39..9798bb40bb 100644 --- a/library/python/testing/yatest_common/ya.make +++ b/library/python/testing/yatest_common/ya.make @@ -36,3 +36,7 @@ IF (NOT CATBOOST_OPENSOURCE) ENDIF() END() + +RECURSE( + ut +) diff --git a/library/python/testing/yatest_common/yatest/common/runtime.py b/library/python/testing/yatest_common/yatest/common/runtime.py index b43666b596..7f6f9df440 100644 --- a/library/python/testing/yatest_common/yatest/common/runtime.py +++ b/library/python/testing/yatest_common/yatest/common/runtime.py @@ -158,7 +158,7 @@ def java_path(): """ from . 
import runtime_java - return runtime_java.get_java_path(binary_path(os.path.join('contrib', 'tools', 'jdk'))) + return runtime_java.get_java_path(binary_path(os.path.join('build', 'platform', 'java', 'jdk', 'testing'))) def java_home(): |