author     thegeorg <thegeorg@yandex-team.com>  2023-10-03 11:19:48 +0300
committer  thegeorg <thegeorg@yandex-team.com>  2023-10-03 11:43:28 +0300
commit     cda0c13f23f6b169fb0a49dc504b40a0aaecea09 (patch)
tree       26476e92e5af2c856e017afb1df8f8dff42495bf /library/python
parent     4854116da9c5e3c95bb8440f2ea997c54b6e1a61 (diff)
download   ydb-cda0c13f23f6b169fb0a49dc504b40a0aaecea09.tar.gz
Move contrib/tools/jdk to build/platform/java/jdk/testing
Diffstat (limited to 'library/python')
-rw-r--r--  library/python/archive/__init__.py                              266
-rw-r--r--  library/python/archive/ya.make                                   19
-rw-r--r--  library/python/cityhash/cityhash.pyx                             75
-rw-r--r--  library/python/cityhash/hash.cpp                                 32
-rw-r--r--  library/python/cityhash/hash.h                                    6
-rw-r--r--  library/python/cityhash/ya.make                                  16
-rw-r--r--  library/python/codecs/__codecs.pyx                               61
-rw-r--r--  library/python/codecs/__init__.py                                 1
-rw-r--r--  library/python/codecs/ya.make                                    16
-rw-r--r--  library/python/compress/__init__.py                             147
-rw-r--r--  library/python/compress/ya.make                                  16
-rw-r--r--  library/python/json/__init__.py                                  44
-rw-r--r--  library/python/json/loads.cpp                                   246
-rw-r--r--  library/python/json/loads.h                                       5
-rw-r--r--  library/python/json/loads.pyx                                    14
-rw-r--r--  library/python/json/ya.make                                      17
-rw-r--r--  library/python/par_apply/__init__.py                            114
-rw-r--r--  library/python/par_apply/ya.make                                 11
-rw-r--r--  library/python/retry/__init__.py                                250
-rw-r--r--  library/python/retry/ya.make                                     11
-rw-r--r--  library/python/testing/yatest_common/ut/test.py                  17
-rw-r--r--  library/python/testing/yatest_common/ut/ya.make                  17
-rw-r--r--  library/python/testing/yatest_common/ya.make                      4
-rw-r--r--  library/python/testing/yatest_common/yatest/common/runtime.py    2
24 files changed, 1406 insertions(+), 1 deletion(-)
diff --git a/library/python/archive/__init__.py b/library/python/archive/__init__.py
new file mode 100644
index 0000000000..a6e032ff4c
--- /dev/null
+++ b/library/python/archive/__init__.py
@@ -0,0 +1,266 @@
+import errno
+import logging
+import os
+import random
+import shutil
+import stat
+import string
+import sys
+
+import six
+
+import libarchive
+import libarchive._libarchive as _libarchive
+
+from pathlib2 import PurePath
+
+logger = logging.getLogger(__name__)
+
+GZIP = "gzip"
+ZSTD = "zstd"
+
+ENCODING = "utf-8"
+
+
+class ConfigureError(Exception):
+    pass
+
+
+class Level(object):
+    def __init__(self, level):
+        self.level = level
+
+
+class Compression(object):
+    Fast = Level(1)
+    Default = Level(2)
+    Best = Level(3)
+
+
+def get_compression_level(filter_name, level):
+    if level is None or not filter_name:
+        return None
+    elif isinstance(level, Level):
+        level = {
+            GZIP: {
+                Compression.Fast: 1,
+                Compression.Default: 6,
+                Compression.Best: 9,
+            },
+            ZSTD: {
+                Compression.Fast: 1,
+                Compression.Default: 3,
+                Compression.Best: 22,
+            },
+        }[filter_name][level]
+    return level
+
+
+def encode(value, encoding):
+    return value.encode(encoding)
+
+
+def extract_tar(tar_file_path, output_dir, strip_components=None, fail_on_duplicates=True):
+    output_dir = encode(output_dir, ENCODING)
+    _make_dirs(output_dir)
+    with libarchive.Archive(tar_file_path, mode="rb") as tarfile:
+        for e in tarfile:
+            p = _strip_prefix(e.pathname, strip_components)
+            if not p:
+                continue
+            dest = os.path.join(output_dir, encode(p, ENCODING))
+            if e.pathname.endswith("/"):
+                _make_dirs(dest)
+                continue
+
+            if strip_components and fail_on_duplicates:
+                if os.path.exists(dest):
+                    raise Exception(
+                        "The file {} is duplicated because of strip_components={}".format(dest, strip_components)
+                    )
+
+            _make_dirs(os.path.dirname(dest))
+
+            if e.ishardlink():
+                src = os.path.join(output_dir, _strip_prefix(e.hardlink, strip_components))
+                _hardlink(src, dest)
+                continue
+            if e.issym():
+                src = _strip_prefix(e.linkname, strip_components)
+                _symlink(src, dest)
+                continue
+
+            with open(dest, "wb") as f:
+                if hasattr(os, "fchmod"):
+                    os.fchmod(f.fileno(), e.mode & 0o7777)
+                libarchive.call_and_check(
+                    _libarchive.archive_read_data_into_fd,
+                    tarfile._a,
+                    tarfile._a,
+                    f.fileno(),
+                )
+
+
+def _strip_prefix(path, strip_components):
+    if not strip_components:
+        return path
+    p = PurePath(path)
+    stripped = str(p.relative_to(*p.parts[:strip_components]))
+    return '' if stripped == '.' else stripped
+
+
+def tar(
+    paths,
+    output,
+    compression_filter=None,
+    compression_level=None,
+    fixed_mtime=None,
+    onerror=None,
+    postprocess=None,
+    dereference=False,
+):
+    if isinstance(paths, six.string_types):
+        paths = [paths]
+
+    if isinstance(output, six.string_types):
+        temp_tar_path, stream = (
+            output + "." + "".join(random.sample(string.ascii_lowercase, 8)),
+            None,
+        )
+    else:
+        temp_tar_path, stream = None, output
+
+    compression_level = get_compression_level(compression_filter, compression_level)
+
+    try:
+        if compression_filter:
+            filter_name = compression_filter
+            if compression_level is not None:
+                filter_opts = {"compression-level": str(compression_level)}
+            else:
+                filter_opts = {}
+            # force gzip not to store the mtime of the original file being compressed (http://www.gzip.org/zlib/rfc-gzip.html#file-format)
+            if fixed_mtime is not None and compression_filter == GZIP:
+                filter_opts["timestamp"] = ""
+        else:
+            filter_name = filter_opts = None
+
+        with libarchive.Archive(
+            stream or temp_tar_path,
+            mode="wb",
+            format="gnu",
+            filter=filter_name,
+            filter_opts=filter_opts,
+            fixed_mtime=fixed_mtime,
+        ) as tarfile:
+            # fix the traversal order when fixed_mtime is specified to produce a stable archive
+            paths = paths if fixed_mtime is None else sorted(paths)
+
+            for p in paths:
+                if type(p) == tuple:
+                    path, arcname = p
+                else:
+                    path, arcname = p, os.path.basename(p)
+
+                if os.path.isdir(path):
+                    for root, dirs, files in os.walk(path, followlinks=dereference):
+                        if fixed_mtime is None:
+                            entries = dirs + files
+                        else:
+                            entries = sorted(dirs) + sorted(files)
+
+                        reldir = os.path.relpath(root, path)
+                        for f in entries:
+                            _writepath(
+                                tarfile,
+                                os.path.join(root, f),
+                                os.path.normpath(os.path.join(arcname, reldir, f)),
+                                onerror,
+                                postprocess,
+                                dereference,
+                            )
+                else:
+                    if not os.path.exists(path):
+                        raise OSError("Specified path doesn't exist: {}".format(path))
+                    _writepath(tarfile, path, arcname, onerror, postprocess, dereference)
+
+        if temp_tar_path:
+            os.rename(temp_tar_path, output)
+    except Exception:
+        if temp_tar_path and os.path.exists(temp_tar_path):
+            os.remove(temp_tar_path)
+        raise
+
+
+def _writepath(tarfile, src, dst, onerror, postprocess, dereference):
+    def tar_writepath(src, dst):
+        st = os.lstat(src)
+        if stat.S_ISREG(st.st_mode) or stat.S_ISDIR(st.st_mode) or stat.S_ISLNK(st.st_mode):
+            if dereference and stat.S_ISLNK(st.st_mode):
+                src = os.path.realpath(src)
+
+            tarfile.writepath(src, dst)
+
+            if postprocess:
+                postprocess(src, dst, st.st_mode)
+        else:
+            logger.debug("Skipping non-regular file '%s' (stat: %s)", src, st)
+
+    try:
+        return tar_writepath(src, dst)
+    except Exception as e:
+        if isinstance(e, OSError) and e.errno == errno.ENOENT:
+            logger.debug(
+                "Skipping missing file '%s' - looks like directory content has changed during archiving",
+                src,
+            )
+            return
+
+        if onerror:
+            if onerror(src, dst, sys.exc_info()):
+                return tar_writepath(src, dst)
+        else:
+            raise
+
+
+def check_tar(tar_file_path):
+    if os.path.isfile(tar_file_path) or os.path.islink(tar_file_path):
+        return libarchive.is_archive(tar_file_path)
+    return False
+
+
+def _make_dirs(path):
+    try:
+        os.makedirs(path)
+    except OSError as e:
+        if e.errno != errno.EEXIST or not os.path.isdir(path):
+            raise
+
+
+def _hardlink(src, dst):
+    if hasattr(os, "link"):
+        os.link(src, dst)
+    else:
+        shutil.copyfile(src, dst)
+
+
+def _symlink(src, dst):
+    if hasattr(os, "symlink"):
+        os.symlink(src, dst)
+    else:
+        # Windows-specific case: we cannot copy the file right now, because it either
+        # doesn't exist yet (it will be met later in the archive) or the symlink is broken.
+        # Act like tar and tarfile do and skip such symlinks.
+        if os.path.exists(src):
+            shutil.copytree(src, dst)
+
+
+def get_archive_filter_name(filename):
+    filters = libarchive.get_archive_filter_names(filename)
+    # https://a.yandex-team.ru/arc/trunk/arcadia/contrib/libs/libarchive/libarchive/archive_read.c?rev=5800047#L522
+    assert filters[-1] == "none", filters
+    if len(filters) == 1:
+        return None
+    if len(filters) == 2:
+        return filters[0]
+    raise Exception("Archive has a chain of filters: {}".format(filters))
diff --git a/library/python/archive/ya.make b/library/python/archive/ya.make
new file mode 100644
index 0000000000..5b86a45a42
--- /dev/null
+++ b/library/python/archive/ya.make
@@ -0,0 +1,19 @@
+PY23_LIBRARY()
+
+STYLE_PYTHON()
+
+PY_SRCS(
+    __init__.py
+)
+
+PEERDIR(
+    contrib/python/pathlib2
+    contrib/python/python-libarchive
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+    benchmark
+    test
+)
diff --git a/library/python/cityhash/cityhash.pyx b/library/python/cityhash/cityhash.pyx
new file mode 100644
index 0000000000..6f0046f0d7
--- /dev/null
+++ b/library/python/cityhash/cityhash.pyx
@@ -0,0 +1,75 @@
+from libcpp.pair cimport pair
+
+cdef extern from "util/system/types.h":
+    ctypedef unsigned long ui64
+
+
+cdef extern from "util/digest/city.h":
+    ui64 CityHash64(const char* buf, size_t len) nogil
+    pair[ui64, ui64] CityHash128(const char* buf, size_t len) nogil
+    ui64 CityHash64WithSeed(const char* buf, size_t len, ui64 seed) nogil
+
+
+cdef extern from "library/python/cityhash/hash.h":
+    ui64 FileCityHash128WithSeedHigh64(const char* fpath) nogil except+
+    ui64 FileCityHash64(const char* fpath) nogil except+
+
+
+def hash64(content):
+    cdef const char* s = content
+    cdef size_t size = len(content)
+    cdef ui64 res = 0
+
+    if size > 128:
+        with nogil:
+            res = CityHash64(s, size)
+    else:
+        res = CityHash64(s, size)
+
+    return res
+
+def hash128(content):
+    cdef const char* s = content
+    cdef size_t size = len(content)
+    cdef pair[ui64, ui64] res = pair[ui64, ui64](0, 0)
+
+    if size > 128:
+        with nogil:
+            res = CityHash128(s, size)
+    else:
+        res = CityHash128(s, size)
+    return res
+
+
+def hash64seed(content, seed):
+    cdef const char* s = content
+    cdef size_t size = len(content)
+    cdef ui64 _seed = seed
+
+    if size > 128:
+        with nogil:
+            res = CityHash64WithSeed(s, size, _seed)
+    else:
+        res = CityHash64WithSeed(s, size, _seed)
+
+    return res
+
+
+def filehash64(path):
+    cdef const char* p = path
+    cdef ui64 res = 0
+
+    with nogil:
+        res = FileCityHash64(p)
+
+    return res
+
+
+def filehash128high64(path):
+    cdef const char* p = path
+    cdef ui64 res = 0
+
+    with nogil:
+        res = FileCityHash128WithSeedHigh64(p)
+
+    return res
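
A short sketch of the resulting Python API (the pyx is built TOP_LEVEL in the ya.make below, so it should be importable as cityhash; arguments must be byte strings, since they are coerced to const char*):

    import cityhash

    data = b"some bytes to hash"

    h64 = cityhash.hash64(data)           # 64-bit hash as a Python int
    lo, hi = cityhash.hash128(data)       # std::pair comes back as a 2-tuple
    h64s = cityhash.hash64seed(data, 42)  # seeded 64-bit variant

    # File-based variants hash the file contents with the GIL released.
    fh = cityhash.filehash64(b"/etc/hosts")
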
diff --git a/library/python/cityhash/hash.cpp b/library/python/cityhash/hash.cpp
new file mode 100644
index 0000000000..17bd3a75f3
--- /dev/null
+++ b/library/python/cityhash/hash.cpp
@@ -0,0 +1,32 @@
+#include "hash.h"
+
+#include <util/digest/city.h>
+#include <util/generic/string.h>
+#include <util/memory/blob.h>
+#include <util/system/file.h>
+#include <util/system/fstat.h>
+
+void ReadFile(const char* fpath, TBlob& blob) {
+ TFile f(TString{fpath}, RdOnly | Seq);
+ const TFileStat fs(f);
+ auto size = fs.Size;
+
+ if (size < (64 << 10)) {
+ blob = TBlob::FromFileContent(f, 0, size);
+ } else {
+ blob = TBlob::FromFile(f);
+ }
+}
+
+ui64 FileCityHash128WithSeedHigh64(const char* fpath) {
+ TBlob blob;
+ ReadFile(fpath, blob);
+ const uint128 hash = CityHash128WithSeed((const char*)blob.Data(), blob.Size(), uint128(0, blob.Size()));
+ return Uint128High64(hash);
+}
+
+ui64 FileCityHash64(const char* fpath) {
+ TBlob blob;
+ ReadFile(fpath, blob);
+ return CityHash64(static_cast<const char*>(blob.Data()), blob.Size());
+}
diff --git a/library/python/cityhash/hash.h b/library/python/cityhash/hash.h
new file mode 100644
index 0000000000..64b22ba74b
--- /dev/null
+++ b/library/python/cityhash/hash.h
@@ -0,0 +1,6 @@
+#pragma once
+
+#include <util/system/defaults.h>
+
+ui64 FileCityHash128WithSeedHigh64(const char* fpath);
+ui64 FileCityHash64(const char* fpath);
diff --git a/library/python/cityhash/ya.make b/library/python/cityhash/ya.make
new file mode 100644
index 0000000000..7948e19389
--- /dev/null
+++ b/library/python/cityhash/ya.make
@@ -0,0 +1,16 @@
+PY23_LIBRARY()
+
+SRCS(
+    hash.cpp
+)
+
+PY_SRCS(
+    TOP_LEVEL
+    cityhash.pyx
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+    test
+)
diff --git a/library/python/codecs/__codecs.pyx b/library/python/codecs/__codecs.pyx
new file mode 100644
index 0000000000..42ec37fe88
--- /dev/null
+++ b/library/python/codecs/__codecs.pyx
@@ -0,0 +1,61 @@
+import six
+
+from libcpp cimport bool
+
+from util.generic.string cimport TString, TStringBuf
+
+
+def to_bytes(s):
+    try:
+        return s.encode('utf-8')
+    except AttributeError:
+        pass
+
+    return s
+
+
+def from_bytes(s):
+    if six.PY3:
+        return s.decode('utf-8')
+
+    return s
+
+
+cdef extern from "library/cpp/blockcodecs/codecs.h" namespace "NBlockCodecs":
+    cdef cppclass ICodec:
+        void Encode(TStringBuf data, TString& res) nogil
+        void Decode(TStringBuf data, TString& res) nogil
+
+    cdef const ICodec* Codec(const TStringBuf& name) except +
+    cdef TString ListAllCodecsAsString() except +
+
+
+def dumps(name, data):
+    name = to_bytes(name)
+
+    cdef const ICodec* codec = Codec(TStringBuf(name, len(name)))
+    cdef TString res
+    cdef TStringBuf cdata = TStringBuf(data, len(data))
+
+    with nogil:
+        codec.Encode(cdata, res)
+
+    return res.c_str()[:res.length()]
+
+
+def loads(name, data):
+    name = to_bytes(name)
+
+    cdef const ICodec* codec = Codec(TStringBuf(name, len(name)))
+    cdef TString res
+    cdef TStringBuf cdata = TStringBuf(data, len(data))
+
+    with nogil:
+        codec.Decode(cdata, res)
+
+    return res.c_str()[:res.length()]
+
+def list_all_codecs():
+    cdef TString res = ListAllCodecsAsString()
+
+    return from_bytes(res.c_str()[:res.length()]).split(',')
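
Usage sketch for the wrapper above (hedged: assumes the library is importable as library.python.codecs; the "snappy" codec name is illustrative - the authoritative set of names comes from list_all_codecs()):

    import library.python.codecs as codecs

    print(codecs.list_all_codecs())  # names understood by NBlockCodecs

    packed = codecs.dumps("snappy", b"payload bytes")
    assert codecs.loads("snappy", packed) == b"payload bytes"
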
diff --git a/library/python/codecs/__init__.py b/library/python/codecs/__init__.py
new file mode 100644
index 0000000000..b9fb00deb0
--- /dev/null
+++ b/library/python/codecs/__init__.py
@@ -0,0 +1 @@
+from __codecs import loads, dumps, list_all_codecs # noqa
diff --git a/library/python/codecs/ya.make b/library/python/codecs/ya.make
new file mode 100644
index 0000000000..f42d115d5d
--- /dev/null
+++ b/library/python/codecs/ya.make
@@ -0,0 +1,16 @@
+PY23_LIBRARY()
+
+PEERDIR(
+    library/cpp/blockcodecs
+    contrib/python/six
+)
+
+PY_SRCS(
+    __init__.py
+)
+
+BUILDWITH_CYTHON_CPP(__codecs.pyx)
+
+PY_REGISTER(__codecs)
+
+END()
diff --git a/library/python/compress/__init__.py b/library/python/compress/__init__.py
new file mode 100644
index 0000000000..380ec47dca
--- /dev/null
+++ b/library/python/compress/__init__.py
@@ -0,0 +1,147 @@
+from io import open
+
+import struct
+import json
+import os
+import logging
+
+import library.python.par_apply as lpp
+import library.python.codecs as lpc
+
+
+logger = logging.getLogger('compress')
+
+
+def list_all_codecs():
+    return sorted(frozenset(lpc.list_all_codecs()))
+
+
+def find_codec(ext):
+    def ext_compress(x):
+        return lpc.dumps(ext, x)
+
+    def ext_decompress(x):
+        return lpc.loads(ext, x)
+
+    ext_decompress(ext_compress(b''))
+
+    return {'c': ext_compress, 'd': ext_decompress, 'n': ext}
+
+
+def codec_for(path):
+    for ext in reversed(path.split('.')):
+        try:
+            return find_codec(ext)
+        except Exception as e:
+            logger.debug('in codec_for(): %s', e)
+
+    raise Exception('unsupported file %s' % path)
+
+
+def compress(fr, to, codec=None, fopen=open, threads=1):
+    if codec:
+        codec = find_codec(codec)
+    else:
+        codec = codec_for(to)
+
+    func = codec['c']
+
+    def iter_blocks():
+        with fopen(fr, 'rb') as f:
+            while True:
+                chunk = f.read(16 * 1024 * 1024)
+
+                if chunk:
+                    yield chunk
+                else:
+                    yield b''
+
+                    return
+
+    def iter_results():
+        info = {
+            'codec': codec['n'],
+        }
+
+        if fr:
+            info['size'] = os.path.getsize(fr)
+
+        yield json.dumps(info, sort_keys=True) + '\n'
+
+        for c in lpp.par_apply(iter_blocks(), func, threads):
+            yield c
+
+    with fopen(to, 'wb') as f:
+        for c in iter_results():
+            logger.debug('complete %s', len(c))
+            f.write(struct.pack('<I', len(c)))
+
+            try:
+                f.write(c)
+            except TypeError:
+                f.write(c.encode('utf-8'))
+
+
+def decompress(fr, to, codec=None, fopen=open, threads=1):
+    def iter_chunks():
+        with fopen(fr, 'rb') as f:
+            cnt = 0
+
+            while True:
+                ll = f.read(4)
+
+                if ll:
+                    ll = struct.unpack('<I', ll)[0]
+
+                    if ll:
+                        if ll > 100000000:
+                            raise Exception('broken stream')
+
+                        yield f.read(ll)
+
+                        cnt += ll
+                else:
+                    if not cnt:
+                        raise Exception('empty stream')
+
+                    return
+
+    it = iter_chunks()
+    extra = []
+
+    for chunk in it:
+        hdr = {}
+
+        try:
+            hdr = json.loads(chunk)
+        except Exception as e:
+            logger.info('cannot parse header, suspect old format: %s', e)
+            extra.append(chunk)
+
+        break
+
+    def resolve_codec():
+        if 'codec' in hdr:
+            return find_codec(hdr['codec'])
+
+        if codec:
+            return find_codec(codec)
+
+        return codec_for(fr)
+
+    dc = resolve_codec()['d']
+
+    def iter_all_chunks():
+        for x in extra:
+            yield x
+
+        for x in it:
+            yield x
+
+    with fopen(to, 'wb') as f:
+        for c in lpp.par_apply(iter_all_chunks(), dc, threads):
+            if c:
+                logger.debug('complete %s', len(c))
+                f.write(c)
+            else:
+                break
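
A sketch of the stream format and API above: compress() writes a JSON header line recording the codec and source size, then length-prefixed compressed blocks of up to 16 MiB, so decompress() can pick the codec from the header (file names and the snappy extension here are illustrative):

    import library.python.compress as compress

    # Codec may be given explicitly or inferred from the target extension.
    compress.compress("big.log", "big.log.snappy", threads=4)

    # Falls back to the extension when the header is missing (old format).
    compress.decompress("big.log.snappy", "big.log.restored", threads=4)
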
diff --git a/library/python/compress/ya.make b/library/python/compress/ya.make
new file mode 100644
index 0000000000..bbf2a784e2
--- /dev/null
+++ b/library/python/compress/ya.make
@@ -0,0 +1,16 @@
+PY23_LIBRARY()
+
+PEERDIR(
+    library/python/codecs
+    library/python/par_apply
+)
+
+PY_SRCS(
+    __init__.py
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+    tests
+)
diff --git a/library/python/json/__init__.py b/library/python/json/__init__.py
new file mode 100644
index 0000000000..c6420d5e6d
--- /dev/null
+++ b/library/python/json/__init__.py
@@ -0,0 +1,44 @@
+from library.python.json.loads import loads as _loads
+from simplejson import loads as _sj_loads
+
+
+def loads(*args, **kwargs):
+    try:
+        return _loads(*args, **kwargs)
+    except Exception as e:
+        if 'invalid syntax at token' in str(e):
+            kwargs.pop('intern_keys', None)
+            kwargs.pop('intern_vals', None)
+            kwargs.pop('may_unicode', None)
+            return _sj_loads(*args, **kwargs)
+
+        raise
+
+
+from simplejson import load, dump, dumps  # noqa
+
+
+def read_file(file_name, **kwargs):
+    """
+    Read a file and return its parsed json contents.
+
+    All kwargs will be proxied to the `json.load` method as is.
+
+    :param file_name: file with json contents
+    :return: parsed json contents
+    """
+    with open(file_name) as f:
+        return load(f, **kwargs)
+
+
+def write_file(file_name, contents, **kwargs):
+    """
+    Dump json data to a file.
+
+    All kwargs will be proxied to the `json.dump` method as is.
+
+    :param file_name: file to dump to
+    :param contents: JSON-serializable object
+    """
+    with open(file_name, "w") as f:
+        dump(contents, f, **kwargs)
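
A sketch of how this loader is used (the fallback to simplejson fires on the C parser's "invalid syntax at token" errors, as the code above shows):

    import library.python.json as lpj

    # Fast C++ SAX parse; intern_keys deduplicates repeated dict keys,
    # which saves memory on large homogeneous documents.
    obj = lpj.loads('{"a": 1, "b": [null, true]}', intern_keys=True)

    # File helpers proxy to simplejson's load/dump.
    lpj.write_file("obj.json", obj, indent=2)
    assert lpj.read_file("obj.json") == obj
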
diff --git a/library/python/json/loads.cpp b/library/python/json/loads.cpp
new file mode 100644
index 0000000000..19cdb096ae
--- /dev/null
+++ b/library/python/json/loads.cpp
@@ -0,0 +1,246 @@
+#include "loads.h"
+
+#include <Python.h>
+
+#include <library/cpp/json/fast_sax/parser.h>
+
+#include <util/generic/algorithm.h>
+#include <util/generic/stack.h>
+#include <util/generic/vector.h>
+#include <util/generic/ylimits.h>
+#include <util/string/ascii.h>
+
+using namespace NJson;
+
+namespace {
+ enum EKind {
+ Undefined,
+ Array,
+ Dict,
+ Value,
+ Key,
+ };
+
+ static inline TStringBuf ToStr(EKind kind) noexcept {
+ switch (kind) {
+ case Undefined:
+ return TStringBuf("Undefined");
+
+ case Array:
+ return TStringBuf("Array");
+
+ case Dict:
+ return TStringBuf("Dict");
+
+ case Value:
+ return TStringBuf("Value");
+
+ case Key:
+ return TStringBuf("Key");
+ }
+
+ Y_UNREACHABLE();
+ }
+
+ struct TUnref {
+ static inline void Destroy(PyObject* o) noexcept {
+ Py_XDECREF(o);
+ }
+ };
+
+ using TObjectPtr = TAutoPtr<PyObject, TUnref>;
+
+ static inline TObjectPtr BuildBool(bool val) noexcept {
+ if (val) {
+ Py_RETURN_TRUE;
+ }
+
+ Py_RETURN_FALSE;
+ }
+
+ // Translate python exceptions from object-creating functions into c++ exceptions
+ // Such errors are reported by returning nullptr
+ // When a python error is set and C++ exception is caught by Cython wrapper,
+ // Python exception is propagated, while C++ exception is discarded.
+ PyObject* CheckNewObject(PyObject* obj) {
+ Y_ENSURE(obj != nullptr, "got python exception");
+ return obj;
+ }
+
+ void CheckRetcode(int retcode) {
+ Y_ENSURE(retcode == 0, "got python exception");
+ }
+
+ static inline TObjectPtr BuildSmall(long val) {
+#if PY_VERSION_HEX >= 0x03000000
+ return CheckNewObject(PyLong_FromLong(val));
+#else
+ return CheckNewObject(PyInt_FromLong(val));
+#endif
+ }
+
+ PyObject* CreatePyString(TStringBuf str, bool intern, bool mayUnicode) {
+#if PY_VERSION_HEX >= 0x03000000
+ Y_UNUSED(mayUnicode);
+ PyObject* pyStr = PyUnicode_FromStringAndSize(str.data(), str.size());
+ if (intern) {
+ PyUnicode_InternInPlace(&pyStr);
+ }
+#else
+ const bool needUnicode = mayUnicode && !AllOf(str, IsAscii);
+ PyObject* pyStr = needUnicode ? PyUnicode_FromStringAndSize(str.data(), str.size())
+ : PyString_FromStringAndSize(str.data(), str.size());
+ if (intern && !needUnicode) {
+ PyString_InternInPlace(&pyStr);
+ }
+#endif
+ return pyStr;
+ }
+
+ struct TVal {
+ EKind Kind = Undefined;
+ TObjectPtr Val;
+
+ inline TVal() noexcept
+ : Kind(Undefined)
+ {
+ }
+
+ inline TVal(EKind kind, TObjectPtr val) noexcept
+ : Kind(kind)
+ , Val(val)
+ {
+ }
+ };
+
+ static inline TObjectPtr NoneRef() noexcept {
+ Py_RETURN_NONE;
+ }
+
+ struct TContext: public TJsonCallbacks {
+ const bool InternKeys;
+ const bool InternVals;
+ const bool MayUnicode;
+ TStack<TVal, TVector<TVal>> S;
+
+ inline TContext(bool internKeys, bool internVals, bool mayUnicode)
+ : TJsonCallbacks(true)
+ , InternKeys(internKeys)
+ , InternVals(internVals)
+ , MayUnicode(mayUnicode)
+ {
+ S.emplace();
+ }
+
+ inline bool Consume(TObjectPtr o) {
+ auto& t = S.top();
+
+ if (t.Kind == Array) {
+ CheckRetcode(PyList_Append(t.Val.Get(), o.Get()));
+ } else if (t.Kind == Key) {
+ auto key = S.top().Val;
+
+ S.pop();
+
+ CheckRetcode(PyDict_SetItem(S.top().Val.Get(), key.Get(), o.Get()));
+ } else {
+ t = TVal(Value, o);
+ }
+
+ return true;
+ }
+
+ inline TObjectPtr Pop(EKind expect) {
+ auto res = S.top();
+
+ S.pop();
+
+ if (res.Kind != expect) {
+ ythrow yexception() << "unexpected kind(expect " << ToStr(expect) << ", got " << ToStr(res.Kind) << ")";
+ }
+
+ return res.Val;
+ }
+
+ inline void Push(EKind kind, TObjectPtr object) {
+ S.push(TVal(kind, object));
+ }
+
+ virtual bool OnNull() {
+ return Consume(NoneRef());
+ }
+
+ virtual bool OnBoolean(bool v) {
+ return Consume(BuildBool(v));
+ }
+
+ virtual bool OnInteger(long long v) {
+ if (v >= (long long)Min<long>()) {
+ return Consume(BuildSmall((long)v));
+ }
+
+ return Consume(CheckNewObject(PyLong_FromLongLong(v)));
+ }
+
+ virtual bool OnUInteger(unsigned long long v) {
+ if (v <= (unsigned long long)Max<long>()) {
+ return Consume(BuildSmall((long)v));
+ }
+
+ return Consume(CheckNewObject(PyLong_FromUnsignedLongLong(v)));
+ }
+
+ virtual bool OnDouble(double v) {
+ return Consume(CheckNewObject(PyFloat_FromDouble(v)));
+ }
+
+ virtual bool OnString(const TStringBuf& v) {
+ return Consume(CheckNewObject(CreatePyString(v, InternVals, MayUnicode)));
+ }
+
+ virtual bool OnOpenMap() {
+ Push(Dict, CheckNewObject(PyDict_New()));
+
+ return true;
+ }
+
+ virtual bool OnCloseMap() {
+ return Consume(Pop(Dict));
+ }
+
+ virtual bool OnMapKey(const TStringBuf& k) {
+ Push(Key, CheckNewObject(CreatePyString(k, InternKeys, MayUnicode)));
+ return true;
+ }
+
+ virtual bool OnOpenArray() {
+ Push(Array, CheckNewObject(PyList_New(0)));
+
+ return true;
+ }
+
+ virtual bool OnCloseArray() {
+ return Consume(Pop(Array));
+ }
+ };
+}
+
+PyObject* LoadJsonFromString(const char* data, size_t len, bool internKeys, bool internVals, bool mayUnicode) {
+ TContext ctx(internKeys, internVals, mayUnicode);
+
+ if (!len) {
+ ythrow yexception() << "parse error: zero length input string";
+ }
+
+ if (!NJson::ReadJsonFast(TStringBuf(data, len), &ctx)) {
+ ythrow yexception() << "parse error";
+ }
+
+ auto& s = ctx.S;
+
+ if (!s || s.top().Kind != Value) {
+ ythrow yexception() << "shit happen";
+ }
+
+ return s.top().Val.Release();
+}
diff --git a/library/python/json/loads.h b/library/python/json/loads.h
new file mode 100644
index 0000000000..62dcdf6f21
--- /dev/null
+++ b/library/python/json/loads.h
@@ -0,0 +1,5 @@
+#pragma once
+
+#include <Python.h>
+
+PyObject* LoadJsonFromString(const char* data, size_t len, bool internKeys = false, bool internVals = false, bool mayUnicode = false);
diff --git a/library/python/json/loads.pyx b/library/python/json/loads.pyx
new file mode 100644
index 0000000000..82e5c6dce7
--- /dev/null
+++ b/library/python/json/loads.pyx
@@ -0,0 +1,14 @@
+from libcpp cimport bool
+
+cdef extern from "library/python/json/loads.h":
+    object LoadJsonFromString(const char*, size_t, bool internKeys, bool internVals, bool mayUnicode) except +
+
+
+def loads(s, intern_keys=False, intern_vals=False, may_unicode=False):
+    if isinstance(s, unicode):
+        s = s.encode('utf-8')
+
+    try:
+        return LoadJsonFromString(s, len(s), intern_keys, intern_vals, may_unicode)
+    except Exception as e:
+        raise ValueError(str(e))
diff --git a/library/python/json/ya.make b/library/python/json/ya.make
new file mode 100644
index 0000000000..74a82de9d8
--- /dev/null
+++ b/library/python/json/ya.make
@@ -0,0 +1,17 @@
+PY23_LIBRARY()
+
+PEERDIR(
+    contrib/python/simplejson
+    library/cpp/json/fast_sax
+)
+
+PY_SRCS(
+    __init__.py
+    loads.pyx
+)
+
+SRCS(
+    loads.cpp
+)
+
+END()
diff --git a/library/python/par_apply/__init__.py b/library/python/par_apply/__init__.py
new file mode 100644
index 0000000000..19b89ae843
--- /dev/null
+++ b/library/python/par_apply/__init__.py
@@ -0,0 +1,114 @@
+import sys
+import threading
+import six
+
+from six.moves import queue
+
+
+def par_apply(seq, func, thr_num, join_polling=None):
+    if thr_num < 2:
+        for x in seq:
+            yield func(x)
+
+        return
+
+    in_q = queue.Queue()
+    out_q = queue.Queue()
+
+    def enumerate_blocks():
+        n = 0
+
+        for b in seq:
+            yield n, [b]
+            n += 1
+
+        yield n, None
+
+    def iter_out():
+        n = 0
+        d = {}
+
+        while True:
+            if n in d:
+                r = d[n]
+                del d[n]
+                n += 1
+
+                yield r
+            else:
+                res = out_q.get()
+
+                d[res[0]] = res
+
+    out_iter = iter_out()
+
+    def wait_block():
+        for x in out_iter:
+            return x
+
+    def iter_compressed():
+        p = 0
+
+        for n, b in enumerate_blocks():
+            in_q.put((n, b))
+
+            while n > p + (thr_num * 2):
+                p, b, c = wait_block()
+
+                if not b:
+                    return
+
+                yield p, c
+
+        while True:
+            p, b, c = wait_block()
+
+            if not b:
+                return
+
+            yield p, c
+
+    def proc():
+        while True:
+            data = in_q.get()
+
+            if data is None:
+                return
+
+            n, b = data
+
+            if b:
+                try:
+                    res = (func(b[0]), None)
+                except Exception:
+                    res = (None, sys.exc_info())
+            else:
+                res = (None, None)
+
+            out_q.put((n, b, res))
+
+    thrs = [threading.Thread(target=proc) for i in range(0, thr_num)]
+
+    for t in thrs:
+        t.start()
+
+    try:
+        for p, c in iter_compressed():
+            res, err = c
+
+            if err:
+                six.reraise(*err)
+
+            yield res
+    finally:
+        for t in thrs:
+            in_q.put(None)
+
+        for t in thrs:
+            if join_polling is not None:
+                while True:
+                    t.join(join_polling)
+                    if not t.is_alive():
+                        break
+            else:
+                t.join()
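
Sketch of the contract implemented above: par_apply() maps func over seq with thr_num worker threads, yields results in input order even when workers finish out of order, and re-raises a worker's exception in the consuming thread:

    from library.python.par_apply import par_apply

    def slow_square(x):
        return x * x

    assert list(par_apply(range(10), slow_square, 4)) == [x * x for x in range(10)]
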
diff --git a/library/python/par_apply/ya.make b/library/python/par_apply/ya.make
new file mode 100644
index 0000000000..b14592ab79
--- /dev/null
+++ b/library/python/par_apply/ya.make
@@ -0,0 +1,11 @@
+PY23_LIBRARY()
+
+PEERDIR(
+    contrib/python/six
+)
+
+PY_SRCS(
+    __init__.py
+)
+
+END()
diff --git a/library/python/retry/__init__.py b/library/python/retry/__init__.py
new file mode 100644
index 0000000000..5520a70332
--- /dev/null
+++ b/library/python/retry/__init__.py
@@ -0,0 +1,250 @@
+import copy
+import datetime
+import functools
+import itertools
+import logging
+import random
+import time
+
+
+"""
+Retry library provides an ability to retry function calls in a configurable way.
+
+To retry a certain function call use the `retry_call` function. To make a function auto-retriable use the `retry`
+or `retry_intrusive` decorators. Both `retry_call` and `retry` optionally accept a retry configuration object
+or its fields as kwargs. The `retry_intrusive` decorator is designed for methods and uses an intrusive configuration object.
+
+>>> retry_call(foo)
+>>> retry_call(foo, foo_args, foo_kwargs)
+>>> retry_call(foo, foo_args, foo_kwargs, conf=conf)
+
+>>> @retry()
+>>> def foo(...):
+>>>     ...
+
+>>> @retry(conf)
+>>> def foo(...):
+>>>     ...
+
+>>> class Obj(object):
+>>>     def __init__(self):
+>>>         self.retry_conf = conf
+>>>
+>>>     @retry_intrusive
+>>>     def foo(self, ...):
+>>>         ...
+
+This library differs from its alternatives:
+    * `retry` contrib library lacks time-based limits, reusable configuration objects and is generally less flexible
+    * `retrying` contrib library is somewhat more complex, but also lacks reusable configuration objects
+"""
+
+
+DEFAULT_SLEEP_FUNC = time.sleep
+LOGGER = logging.getLogger(__name__)
+
+
+class RetryConf(object):
+    """
+    Configuration object defines retry behaviour and is composed of these fields:
+        * `retriable` - function that decides if an exception should trigger retrying
+        * `get_delay` - function that returns a number of seconds the retrier must wait before doing the next attempt
+        * `max_time` - maximum `datetime.timedelta` that can pass after the first call for any retry attempt to be done
+        * `max_times` - maximum number of retry attempts (note retries, not tries/calls)
+        * `handle_error` - function that is called for each failed call attempt
+        * `logger` - logger object to record retry warnings with
+        * `sleep` - custom sleep function to use for waiting
+
+    >>> RetryConf(max_time=datetime.timedelta(seconds=30), max_times=10)
+
+    An empty configuration retries indefinitely on any exception raised.
+
+    By default `DEFAULT_CONF` is used, which retries indefinitely, waiting 1 sec with 1.2 backoff between attempts, and
+    also logging with the built-in logger object.
+
+    Configuration must be cloned before modification to create a separate configuration:
+
+    >>> DEFAULT_CONF.clone()
+
+    There are various methods that provide convenient clone-and-modify shortcuts and "retry recipes".
+    """
+
+    _PROPS = {
+        "retriable": lambda e: True,
+        "get_delay": lambda n, raised_after, last: 0,
+        "max_time": None,
+        "max_times": None,
+        "handle_error": None,
+        "logger": None,
+        "sleep": DEFAULT_SLEEP_FUNC,
+    }
+
+    def __init__(self, **kwargs):
+        for prop, default_value in self._PROPS.items():
+            setattr(self, prop, default_value)
+        self._set(**kwargs)
+
+    def __repr__(self):
+        return repr(self.__dict__)
+
+    def clone(self, **kwargs):
+        """
+        Clone configuration.
+        """
+
+        obj = copy.copy(self)
+        obj._set(**kwargs)
+        return obj
+
+    def on(self, *errors):
+        """
+        Clone and retry on specific exception types (retriable shortcut):
+
+        >>> conf = conf.on(MyException, MyOtherException)
+        """
+
+        obj = self.clone()
+        obj.retriable = lambda e: isinstance(e, errors)
+        return obj
+
+    def waiting(self, delay=0, backoff=1.0, jitter=0, limit=None):
+        """
+        Clone and wait between attempts with backoff, jitter and limit (get_delay shortcut):
+
+        >>> conf = conf.waiting(delay)
+        >>> conf = conf.waiting(delay, backoff=2.0)  # initial delay with backoff x2 on each attempt
+        >>> conf = conf.waiting(delay, jitter=3)  # jitter from 0 to 3 seconds
+        >>> conf = conf.waiting(delay, backoff=2.0, limit=60)  # delay with backoff, but not greater than a minute
+
+        All these options can be combined together, of course.
+        """
+
+        def get_delay(n, raised_after, last):
+            if n == 1:
+                return delay
+
+            s = last * backoff
+            s += random.uniform(0, jitter)
+            if limit is not None:
+                s = min(s, limit)
+            return s
+
+        obj = self.clone()
+        obj.get_delay = get_delay
+        return obj
+
+    def upto(self, seconds=0, **other_timedelta_kwargs):
+        """
+        Clone and do retry attempts only for some time (max_time shortcut):
+
+        >>> conf = conf.upto(30)  # retrying for 30 seconds
+        >>> conf = conf.upto(hours=1, minutes=20)  # retrying for 1:20
+
+        Any `datetime.timedelta` kwargs can be used here.
+        """
+
+        obj = self.clone()
+        obj.max_time = datetime.timedelta(seconds=seconds, **other_timedelta_kwargs)
+        return obj
+
+    def upto_retries(self, retries=0):
+        """
+        Set the limit for the number of retry attempts (max_times shortcut):
+
+        >>> conf = conf.upto_retries(10)
+        """
+
+        obj = self.clone()
+        obj.max_times = retries
+        return obj
+
+    def _set(self, **kwargs):
+        for prop, value in kwargs.items():
+            if prop not in self._PROPS:
+                continue
+            setattr(self, prop, value)
+
+
+DEFAULT_CONF = RetryConf(logger=LOGGER).waiting(1, backoff=1.2)
+
+
+def retry_call(f, f_args=(), f_kwargs={}, conf=DEFAULT_CONF, **kwargs):
+    """
+    Retry function call.
+
+    :param f: function to be retried
+    :param f_args: target function args
+    :param f_kwargs: target function kwargs
+    :param conf: configuration
+    """
+
+    if kwargs:
+        conf = conf.clone(**kwargs)
+    return _retry(conf, functools.partial(f, *f_args, **f_kwargs))
+
+
+def retry(conf=DEFAULT_CONF, **kwargs):
+    """
+    Retrying decorator.
+
+    :param conf: configuration
+    """
+
+    if kwargs:
+        conf = conf.clone(**kwargs)
+
+    def decorator(f):
+        @functools.wraps(f)
+        def wrapped(*f_args, **f_kwargs):
+            return _retry(conf, functools.partial(f, *f_args, **f_kwargs))
+
+        return wrapped
+
+    return decorator
+
+
+def retry_intrusive(f):
+    """
+    Retrying method decorator that uses an intrusive conf (obj.retry_conf).
+    """
+
+    @functools.wraps(f)
+    def wrapped(obj, *f_args, **f_kwargs):
+        assert hasattr(obj, "retry_conf"), "Object must have a retry_conf attribute for the decorator to run"
+        return _retry(obj.retry_conf, functools.partial(f, obj, *f_args, **f_kwargs))
+
+    return wrapped
+
+
+def _retry(conf, f):
+    start = datetime.datetime.now()
+    delay = 0
+    for n in itertools.count(1):
+        try:
+            return f()
+        except Exception as error:
+            raised_after = datetime.datetime.now() - start
+            if conf.handle_error:
+                conf.handle_error(error, n, raised_after)
+            delay = conf.get_delay(n, raised_after, delay)
+            retry_after = raised_after + datetime.timedelta(seconds=delay)
+            retrying = (
+                conf.retriable(error)
+                and (conf.max_times is None or n <= conf.max_times)
+                and (conf.max_time is None or retry_after <= conf.max_time)
+            )
+            if not retrying:
+                raise
+            if delay:
+                conf.sleep(delay)
+            if conf.logger:
+                conf.logger.warning(
+                    "Retrying (try %d) after %s (%s + %s sec) on %s: %s",
+                    n,
+                    retry_after,
+                    raised_after,
+                    delay,
+                    error.__class__.__name__,
+                    error,
+                    exc_info=True,
+                )
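
A small sketch combining the clone-and-modify recipes documented above (the flaky() function is a stand-in for any callable):

    import library.python.retry as retry

    conf = (
        retry.DEFAULT_CONF
        .on(IOError, OSError)               # retry only these exception types
        .waiting(1, backoff=2.0, limit=30)  # 1s, 2s, 4s, ... capped at 30s
        .upto(minutes=5)                    # stop retrying 5 minutes after the first call
    )

    def flaky():
        ...

    retry.retry_call(flaky, conf=conf)
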
diff --git a/library/python/retry/ya.make b/library/python/retry/ya.make
new file mode 100644
index 0000000000..dcbf5567cd
--- /dev/null
+++ b/library/python/retry/ya.make
@@ -0,0 +1,11 @@
+PY23_LIBRARY()
+
+STYLE_PYTHON()
+
+PY_SRCS(__init__.py)
+
+END()
+
+RECURSE_FOR_TESTS(
+    tests
+)
diff --git a/library/python/testing/yatest_common/ut/test.py b/library/python/testing/yatest_common/ut/test.py
new file mode 100644
index 0000000000..bffdf1e353
--- /dev/null
+++ b/library/python/testing/yatest_common/ut/test.py
@@ -0,0 +1,17 @@
+import os
+import tarfile
+
+import yatest.common
+
+import yalibrary.tools
+
+
+def test_jdk_from_package_equals_jdk_tool_from_yaconf_json():
+    jdk_path = yatest.common.binary_path(os.path.join('build', 'platform', 'java', 'jdk', 'testing'))
+    os.makedirs("extracted")
+    with tarfile.open(os.path.join(jdk_path, "jdk.tar")) as tf:
+        tf.extractall("extracted")
+    jdk_tool_path = yalibrary.tools.toolchain_root('java', None, None)
+    with open(os.path.join("extracted", "release")) as jdk_path_release:
+        with open(os.path.join(jdk_tool_path, "release")) as jdk_tool_path_release:
+            assert jdk_path_release.read() == jdk_tool_path_release.read()
diff --git a/library/python/testing/yatest_common/ut/ya.make b/library/python/testing/yatest_common/ut/ya.make
new file mode 100644
index 0000000000..cc56fa1b6c
--- /dev/null
+++ b/library/python/testing/yatest_common/ut/ya.make
@@ -0,0 +1,17 @@
+PY2TEST()
+
+TEST_SRCS(test.py)
+
+PEERDIR(
+    devtools/ya/yalibrary/tools
+)
+
+DEPENDS(
+    build/platform/java/jdk/testing
+)
+
+REQUIREMENTS(
+    network:full
+)
+
+END()
diff --git a/library/python/testing/yatest_common/ya.make b/library/python/testing/yatest_common/ya.make
index 2f5aa3bf39..9798bb40bb 100644
--- a/library/python/testing/yatest_common/ya.make
+++ b/library/python/testing/yatest_common/ya.make
@@ -36,3 +36,7 @@ IF (NOT CATBOOST_OPENSOURCE)
 ENDIF()
 
 END()
+
+RECURSE(
+    ut
+)
diff --git a/library/python/testing/yatest_common/yatest/common/runtime.py b/library/python/testing/yatest_common/yatest/common/runtime.py
index b43666b596..7f6f9df440 100644
--- a/library/python/testing/yatest_common/yatest/common/runtime.py
+++ b/library/python/testing/yatest_common/yatest/common/runtime.py
@@ -158,7 +158,7 @@ def java_path():
     """
    from . import runtime_java
 
-    return runtime_java.get_java_path(binary_path(os.path.join('contrib', 'tools', 'jdk')))
+    return runtime_java.get_java_path(binary_path(os.path.join('build', 'platform', 'java', 'jdk', 'testing')))
 
 
 def java_home():