| author | robot-piglet <[email protected]> | 2025-08-28 14:27:58 +0300 |
|---|---|---|
| committer | robot-piglet <[email protected]> | 2025-08-28 14:57:06 +0300 |
| commit | 81d828c32c8d5477cb2f0ce5da06a1a8d9392ca3 (patch) | |
| tree | 3081d566f0d5158d76e9093261344f6406fd09f7 | /library/python |
| parent | 77ea11423f959e51795cc3ef36a48d808b4ffb98 (diff) | |
Intermediate changes
commit_hash:d5b1af16dbe9030537a04c27eb410c88c2f496cd
Diffstat (limited to 'library/python')
| mode | file | lines added |
|---|---|---|
| -rw-r--r-- | library/python/archive/__init__.py | 320 |
| -rw-r--r-- | library/python/archive/ya.make | 19 |
| -rw-r--r-- | library/python/cityhash/cityhash.pyx | 75 |
| -rw-r--r-- | library/python/cityhash/hash.cpp | 34 |
| -rw-r--r-- | library/python/cityhash/hash.h | 6 |
| -rw-r--r-- | library/python/cityhash/ya.make | 16 |
| -rw-r--r-- | library/python/codecs/__codecs.pyx | 99 |
| -rw-r--r-- | library/python/codecs/__init__.py | 1 |
| -rw-r--r-- | library/python/codecs/ya.make | 16 |
| -rw-r--r-- | library/python/compress/__init__.py | 147 |
| -rw-r--r-- | library/python/compress/ya.make | 16 |
| -rw-r--r-- | library/python/json/__init__.py | 44 |
| -rw-r--r-- | library/python/json/loads.cpp | 246 |
| -rw-r--r-- | library/python/json/loads.h | 5 |
| -rw-r--r-- | library/python/json/loads.pyx | 14 |
| -rw-r--r-- | library/python/json/ya.make | 17 |
| -rw-r--r-- | library/python/par_apply/__init__.py | 114 |
| -rw-r--r-- | library/python/par_apply/ya.make | 11 |
18 files changed, 1200 insertions(+), 0 deletions(-)
diff --git a/library/python/archive/__init__.py b/library/python/archive/__init__.py new file mode 100644 index 00000000000..f907f15d0a6 --- /dev/null +++ b/library/python/archive/__init__.py @@ -0,0 +1,320 @@ +import errno +import logging +import os +import random +import shutil +import stat +import string +import sys + +import six + +import libarchive +import libarchive._libarchive as _libarchive + +from pathlib2 import PurePath + +logger = logging.getLogger(__name__) + +GZIP = "gzip" +ZSTD = "zstd" + +ENCODING = "utf-8" + + +class ConfigureError(Exception): + pass + + +class Level(object): + def __init__(self, level): + self.level = level + + +class Compression(object): + Fast = Level(1) + Default = Level(2) + Best = Level(3) + + +def get_compression_level(filter_name, level): + if level is None or not filter_name: + return None + elif isinstance(level, Level): + level = { + GZIP: { + Compression.Fast: 1, + Compression.Default: 6, + Compression.Best: 9, + }, + ZSTD: { + Compression.Fast: 1, + Compression.Default: 3, + Compression.Best: 22, + }, + }[filter_name][level] + return level + + +def encode(value, encoding): + return value.encode(encoding) + + +def extract_tar( + tar_file_path, output_dir, strip_components=None, fail_on_duplicates=True, apply_mtime=True, entry_filter=None +): + """ + entry_filter: function that takes a libarchive.Entry and returns True if the entry should be extracted + """ + output_dir = encode(output_dir, ENCODING) + _make_dirs(output_dir) + mtime2fix = [] + + with libarchive.Archive(tar_file_path, mode="rb") as tarfile: + for e in tarfile: + if entry_filter and not entry_filter(e): + continue + p = _strip_prefix(e.pathname, strip_components) + if not p: + continue + + dest = os.path.join(output_dir, encode(p, ENCODING)) + if e.pathname.endswith("/") or e.isdir(): + _make_dirs(dest) + if apply_mtime: + mtime2fix.append((dest, e.mtime)) + continue + + if strip_components and fail_on_duplicates: + if os.path.exists(dest): + raise Exception( + "The file {} is duplicated because of strip_components={}".format(dest, strip_components) + ) + + _make_dirs(os.path.dirname(dest)) + if apply_mtime: + mtime2fix.append((dest, e.mtime)) + + if e.ishardlink(): + src = os.path.join(output_dir, _strip_prefix(e.hardlink, strip_components)) + _hardlink(src, dest) + if apply_mtime: + mtime2fix.append((dest, e.mtime)) + continue + + if e.issym(): + src = _strip_prefix(e.linkname, strip_components) + _symlink(src, dest) + if apply_mtime: + if six.PY2: + raise AssertionError( + "You have requested apply_mtime, but it cannot be done with os.utime in python2, it does not support follow_symlinks parameter" + ) + mtime2fix.append((dest, e.mtime)) + continue + + with open(dest, "wb") as f: + if hasattr(os, "fchmod"): + os.fchmod(f.fileno(), e.mode & 0o7777) + libarchive.call_and_check( + _libarchive.archive_read_data_into_fd, + tarfile._a, + tarfile._a, + f.fileno(), + ) + if apply_mtime: + mtime2fix.append((dest, e.mtime)) + + if mtime2fix: + seen = set() + sep = encode(os.sep, ENCODING) + for path, mtime in mtime2fix: + rel = path[len(output_dir) + 1 :] + path = output_dir + for x in rel.split(sep): + path = os.path.join(path, x) + if path in seen: + continue + if six.PY3 and not sys.platform.startswith('win'): + os.utime(path, (mtime, mtime), follow_symlinks=False) + else: + os.utime(path, (mtime, mtime)) + seen.add(path) + + +def _strip_prefix(path, strip_components): + if not strip_components: + return path + p = PurePath(path) + stripped = 
str(p.relative_to(*p.parts[:strip_components])) + return '' if stripped == '.' else stripped + + +def is_empty(tar_file_path): + with libarchive.Archive(tar_file_path, mode="rb") as tarfile: + for _ in tarfile: + return False + else: + return True + + +def tar( + paths, + output, + compression_filter=None, + compression_level=None, + fixed_mtime=None, + onerror=None, + postprocess=None, + dereference=False, +): + if isinstance(paths, six.string_types): + paths = [paths] + + if isinstance(output, six.string_types): + temp_tar_path, stream = ( + output + "." + "".join(random.sample(string.ascii_lowercase, 8)), + None, + ) + else: + temp_tar_path, stream = None, output + + compression_level = get_compression_level(compression_filter, compression_level) + + try: + if compression_filter: + filter_name = compression_filter + if compression_level is not None: + filter_opts = {"compression-level": str(compression_level)} + else: + filter_opts = {} + # force gzip don't store mtime of the original file being compressed (http://www.gzip.org/zlib/rfc-gzip.html#file-format) + if fixed_mtime is not None and compression_filter == GZIP: + filter_opts["timestamp"] = "" + else: + filter_name = filter_opts = None + + with libarchive.Archive( + stream or temp_tar_path, + mode="wb", + format="gnu", + filter=filter_name, + filter_opts=filter_opts, + fixed_mtime=fixed_mtime, + ) as tarfile: + # determine order if fixed_mtime is specified to produce stable archive + paths = paths if fixed_mtime is None else sorted(paths) + + for p in paths: + if isinstance(p, tuple): + path, arcname = p + else: + path, arcname = p, os.path.basename(p) + + if os.path.isdir(path): + for root, dirs, files in os.walk(path, followlinks=dereference): + if fixed_mtime is None: + entries = dirs + files + else: + entries = sorted(dirs) + sorted(files) + + reldir = os.path.relpath(root, path) + for f in entries: + _writepath( + tarfile, + os.path.join(root, f), + os.path.normpath(os.path.join(arcname, reldir, f)), + onerror, + postprocess, + dereference, + ) + else: + if not os.path.exists(path): + raise OSError("Specified path doesn't exist: {}".format(path)) + _writepath(tarfile, path, arcname, onerror, postprocess, dereference) + + if temp_tar_path: + os.rename(temp_tar_path, output) + except Exception: + if temp_tar_path and os.path.exists(temp_tar_path): + os.remove(temp_tar_path) + raise + + +def _writepath(tarfile, src, dst, onerror, postprocess, dereference): + def tar_writepath(src, dst): + st = os.lstat(src) + if stat.S_ISREG(st.st_mode) or stat.S_ISDIR(st.st_mode) or stat.S_ISLNK(st.st_mode): + if dereference and stat.S_ISLNK(st.st_mode): + src = os.path.realpath(src) + + tarfile.writepath(src, dst) + + if postprocess: + postprocess(src, dst, st.st_mode) + else: + logger.debug("Skipping non-regular file '%s' (stat: %s)", src, st) + + try: + return tar_writepath(src, dst) + except Exception as e: + if isinstance(e, OSError) and e.errno == errno.ENOENT: + logger.debug( + "Skipping missing file '%s' - looks like directory content has changed during archiving", + src, + ) + return + + if onerror: + if onerror(src, dst, sys.exc_info()): + return tar_writepath(src, dst) + else: + raise + + +def check_tar(tar_file_path): + if os.path.isfile(tar_file_path) or os.path.islink(tar_file_path): + return libarchive.is_archive(tar_file_path) + return False + + +def _make_dirs(path): + try: + os.makedirs(path) + except OSError as e: + if e.errno != errno.EEXIST or not os.path.isdir(path): + raise + + +def _hardlink(src, dst): + if hasattr(os, 
"link"): + os.link(src, dst) + else: + shutil.copyfile(src, dst) + + +def _symlink(src, dst): + if hasattr(os, "symlink"): + os.symlink(src, dst) + else: + # Windows specific case - we cannot copy file right now, + # because it doesn't exist yet (and would be met later in the archive) or symlink is broken. + # Act like tar and tarfile - skip such symlinks + if os.path.exists(src): + shutil.copytree(src, dst) + + +def get_archive_filter_name(filename): + filters = libarchive.get_archive_filter_names(filename) + # https://a.yandex-team.ru/arc/trunk/arcadia/contrib/libs/libarchive/libarchive/archive_read.c?rev=5800047#L522 + assert filters[-1] == "none", filters + if len(filters) == 1: + return None + if len(filters) == 2: + return filters[0] + raise Exception("Archive has chain of filter: {}".format(filters)) + + +def get_archive_filenames(filename): + with libarchive.Archive(filename) as archive: + return [entry.pathname for entry in archive] diff --git a/library/python/archive/ya.make b/library/python/archive/ya.make new file mode 100644 index 00000000000..5b86a45a422 --- /dev/null +++ b/library/python/archive/ya.make @@ -0,0 +1,19 @@ +PY23_LIBRARY() + +STYLE_PYTHON() + +PY_SRCS( + __init__.py +) + +PEERDIR( + contrib/python/pathlib2 + contrib/python/python-libarchive +) + +END() + +RECURSE_FOR_TESTS( + benchmark + test +) diff --git a/library/python/cityhash/cityhash.pyx b/library/python/cityhash/cityhash.pyx new file mode 100644 index 00000000000..6f0046f0d77 --- /dev/null +++ b/library/python/cityhash/cityhash.pyx @@ -0,0 +1,75 @@ +from libcpp.pair cimport pair + +cdef extern from "util/system/types.h": + ctypedef unsigned long ui64 + + +cdef extern from "util/digest/city.h": + ui64 CityHash64(const char* buf, size_t len) nogil + pair[ui64, ui64] CityHash128(const char* buf, size_t len) nogil + ui64 CityHash64WithSeed(const char* buf, size_t len, ui64 seed) nogil + + +cdef extern from "library/python/cityhash/hash.h": + ui64 FileCityHash128WithSeedHigh64(const char* fpath) nogil except+ + ui64 FileCityHash64(const char* fpath) nogil except+ + + +def hash64(content): + cdef const char* s = content + cdef size_t size = len(content) + cdef ui64 res = 0 + + if size > 128: + with nogil: + res = CityHash64(s, size) + else: + res = CityHash64(s, size) + + return res + +def hash128(content): + cdef const char* s = content + cdef size_t size = len(content) + cdef pair[ui64, ui64] res = pair[ui64, ui64](0, 0) + + if size > 128: + with nogil: + res = CityHash128(s, size) + else: + res = CityHash128(s, size) + return res + + +def hash64seed(content, seed): + cdef const char* s = content + cdef size_t size = len(content) + cdef ui64 _seed = seed; + + if size > 128: + with nogil: + res = CityHash64WithSeed(s, size, _seed) + else: + res = CityHash64WithSeed(s, size, _seed) + + return res + + +def filehash64(path): + cdef const char* p = path + cdef ui64 res = 0 + + with nogil: + res = FileCityHash64(p) + + return res + + +def filehash128high64(path): + cdef const char* p = path + cdef ui64 res = 0 + + with nogil: + res = FileCityHash128WithSeedHigh64(p) + + return res diff --git a/library/python/cityhash/hash.cpp b/library/python/cityhash/hash.cpp new file mode 100644 index 00000000000..a060a130289 --- /dev/null +++ b/library/python/cityhash/hash.cpp @@ -0,0 +1,34 @@ +#include "hash.h" + +#include <util/digest/city.h> +#include <util/generic/string.h> +#include <util/memory/blob.h> +#include <util/system/file.h> +#include <util/system/fstat.h> + +void ReadFile(const char* fpath, TBlob& blob) { + TFile 
f(TString{fpath}, RdOnly | Seq); + const TFileStat fs(f); + auto size = fs.Size; + + if (size < (64 << 10)) { + blob = TBlob::FromFileContent(f, 0, size); + } else { + // Read 1 byte before mapping to detect access problems for encrypted and banned files in arc + TBlob::FromFileContentSingleThreaded(f, 0, 1); + blob = TBlob::FromFile(f); + } +} + +ui64 FileCityHash128WithSeedHigh64(const char* fpath) { + TBlob blob; + ReadFile(fpath, blob); + const uint128 hash = CityHash128WithSeed((const char*)blob.Data(), blob.Size(), uint128(0, blob.Size())); + return Uint128High64(hash); +} + +ui64 FileCityHash64(const char* fpath) { + TBlob blob; + ReadFile(fpath, blob); + return CityHash64(static_cast<const char*>(blob.Data()), blob.Size()); +} diff --git a/library/python/cityhash/hash.h b/library/python/cityhash/hash.h new file mode 100644 index 00000000000..64b22ba74b0 --- /dev/null +++ b/library/python/cityhash/hash.h @@ -0,0 +1,6 @@ +#pragma once + +#include <util/system/defaults.h> + +ui64 FileCityHash128WithSeedHigh64(const char* fpath); +ui64 FileCityHash64(const char* fpath); diff --git a/library/python/cityhash/ya.make b/library/python/cityhash/ya.make new file mode 100644 index 00000000000..7948e19389b --- /dev/null +++ b/library/python/cityhash/ya.make @@ -0,0 +1,16 @@ +PY23_LIBRARY() + +SRCS( + hash.cpp +) + +PY_SRCS( + TOP_LEVEL + cityhash.pyx +) + +END() + +RECURSE_FOR_TESTS( + test +) diff --git a/library/python/codecs/__codecs.pyx b/library/python/codecs/__codecs.pyx new file mode 100644 index 00000000000..de8741e4e84 --- /dev/null +++ b/library/python/codecs/__codecs.pyx @@ -0,0 +1,99 @@ +import six + +from libcpp cimport bool + +from util.generic.string cimport TString, TStringBuf + + +def to_bytes(s): + try: + return s.encode('utf-8') + except AttributeError: + pass + + return s + + +def from_bytes(s): + if six.PY3: + return s.decode('utf-8') + + return s + + +cdef extern from "library/cpp/blockcodecs/codecs.h" namespace "NBlockCodecs": + cdef cppclass ICodec: + void Encode(TStringBuf data, TString& res) nogil except + + void Decode(TStringBuf data, TString& res) nogil except + + + cdef const ICodec* Codec(const TStringBuf& name) except + + cdef TString ListAllCodecsAsString() except + + + +def dumps(name, data): + name = to_bytes(name) + + cdef const ICodec* codec = Codec(TStringBuf(name, len(name))) + cdef TString res + cdef TStringBuf cdata = TStringBuf(data, len(data)) + + with nogil: + codec.Encode(cdata, res) + + return res.c_str()[:res.length()] + + +def loads(name, data): + name = to_bytes(name) + + cdef const ICodec* codec = Codec(TStringBuf(name, len(name))) + cdef TString res + cdef TStringBuf cdata = TStringBuf(data, len(data)) + + with nogil: + codec.Decode(cdata, res) + + return res.c_str()[:res.length()] + + +def get_codec_id(name): + if name == "lz4": + return 6051 + elif name == "snappy": + return 50986 + elif name == "std08_1": + return 55019 + elif name == "std08_3": + return 23308 + elif name == "std08_7": + return 33533 + elif name == "brotli_1": + return 48947 + elif name == "brotli_10": + return 43475 + elif name == "brotli_11": + return 7241 + elif name == "brotli_2": + return 63895 + elif name == "brotli_3": + return 11408 + elif name == "brotli_4": + return 47136 + elif name == "brotli_5": + return 45284 + elif name == "brotli_6": + return 63219 + elif name == "brotli_7": + return 59675 + elif name == "brotli_8": + return 40233 + elif name == "brotli_9": + return 10380 + else: + raise RuntimeError("Unknown code name: " + name) + + +def list_all_codecs(): 
+ cdef TString res = ListAllCodecsAsString() + + return from_bytes(res.c_str()[:res.length()]).split(',') diff --git a/library/python/codecs/__init__.py b/library/python/codecs/__init__.py new file mode 100644 index 00000000000..420885d0551 --- /dev/null +++ b/library/python/codecs/__init__.py @@ -0,0 +1 @@ +from __codecs import loads, dumps, list_all_codecs, get_codec_id # noqa diff --git a/library/python/codecs/ya.make b/library/python/codecs/ya.make new file mode 100644 index 00000000000..f42d115d5d1 --- /dev/null +++ b/library/python/codecs/ya.make @@ -0,0 +1,16 @@ +PY23_LIBRARY() + +PEERDIR( + library/cpp/blockcodecs + contrib/python/six +) + +PY_SRCS( + __init__.py +) + +BUILDWITH_CYTHON_CPP(__codecs.pyx) + +PY_REGISTER(__codecs) + +END() diff --git a/library/python/compress/__init__.py b/library/python/compress/__init__.py new file mode 100644 index 00000000000..380ec47dca8 --- /dev/null +++ b/library/python/compress/__init__.py @@ -0,0 +1,147 @@ +from io import open + +import struct +import json +import os +import logging + +import library.python.par_apply as lpp +import library.python.codecs as lpc + + +logger = logging.getLogger('compress') + + +def list_all_codecs(): + return sorted(frozenset(lpc.list_all_codecs())) + + +def find_codec(ext): + def ext_compress(x): + return lpc.dumps(ext, x) + + def ext_decompress(x): + return lpc.loads(ext, x) + + ext_decompress(ext_compress(b'')) + + return {'c': ext_compress, 'd': ext_decompress, 'n': ext} + + +def codec_for(path): + for ext in reversed(path.split('.')): + try: + return find_codec(ext) + except Exception as e: + logger.debug('in codec_for(): %s', e) + + raise Exception('unsupported file %s' % path) + + +def compress(fr, to, codec=None, fopen=open, threads=1): + if codec: + codec = find_codec(codec) + else: + codec = codec_for(to) + + func = codec['c'] + + def iter_blocks(): + with fopen(fr, 'rb') as f: + while True: + chunk = f.read(16 * 1024 * 1024) + + if chunk: + yield chunk + else: + yield b'' + + return + + def iter_results(): + info = { + 'codec': codec['n'], + } + + if fr: + info['size'] = os.path.getsize(fr) + + yield json.dumps(info, sort_keys=True) + '\n' + + for c in lpp.par_apply(iter_blocks(), func, threads): + yield c + + with fopen(to, 'wb') as f: + for c in iter_results(): + logger.debug('complete %s', len(c)) + f.write(struct.pack('<I', len(c))) + + try: + f.write(c) + except TypeError: + f.write(c.encode('utf-8')) + + +def decompress(fr, to, codec=None, fopen=open, threads=1): + def iter_chunks(): + with fopen(fr, 'rb') as f: + cnt = 0 + + while True: + ll = f.read(4) + + if ll: + ll = struct.unpack('<I', ll)[0] + + if ll: + if ll > 100000000: + raise Exception('broken stream') + + yield f.read(ll) + + cnt += ll + else: + if not cnt: + raise Exception('empty stream') + + return + + it = iter_chunks() + extra = [] + + for chunk in it: + hdr = {} + + try: + hdr = json.loads(chunk) + except Exception as e: + logger.info('can not parse header, suspect old format: %s', e) + extra.append(chunk) + + break + + def resolve_codec(): + if 'codec' in hdr: + return find_codec(hdr['codec']) + + if codec: + return find_codec(codec) + + return codec_for(fr) + + dc = resolve_codec()['d'] + + def iter_all_chunks(): + for x in extra: + yield x + + for x in it: + yield x + + with fopen(to, 'wb') as f: + for c in lpp.par_apply(iter_all_chunks(), dc, threads): + if c: + logger.debug('complete %s', len(c)) + f.write(c) + else: + break diff --git a/library/python/compress/ya.make b/library/python/compress/ya.make new file mode 
100644 index 00000000000..bbf2a784e29 --- /dev/null +++ b/library/python/compress/ya.make @@ -0,0 +1,16 @@ +PY23_LIBRARY() + +PEERDIR( + library/python/codecs + library/python/par_apply +) + +PY_SRCS( + __init__.py +) + +END() + +RECURSE_FOR_TESTS( + tests +) diff --git a/library/python/json/__init__.py b/library/python/json/__init__.py new file mode 100644 index 00000000000..c6420d5e6d2 --- /dev/null +++ b/library/python/json/__init__.py @@ -0,0 +1,44 @@ +from library.python.json.loads import loads as _loads +from simplejson import loads as _sj_loads + + +def loads(*args, **kwargs): + try: + return _loads(*args, **kwargs) + except Exception as e: + if 'invalid syntax at token' in str(e): + kwargs.pop('intern_keys', None) + kwargs.pop('intern_vals', None) + kwargs.pop('may_unicode', None) + return _sj_loads(*args, **kwargs) + + raise + + +from simplejson import load, dump, dumps # noqa + + +def read_file(file_name, **kwargs): + """ + Read file and return its parsed json contents. + + All kwargs will be proxied to `json.load` method as is. + + :param file_name: file with json contents + :return: parsed json contents + """ + with open(file_name) as f: + return load(f, **kwargs) + + +def write_file(file_name, contents, **kwargs): + """ + Dump json data to file. + + All kwargs will be proxied to `json.dump` method as is. + + :param file_name: file to dump to + :param contents: JSON-serializable object + """ + with open(file_name, "w") as f: + dump(contents, f, **kwargs) diff --git a/library/python/json/loads.cpp b/library/python/json/loads.cpp new file mode 100644 index 00000000000..19cdb096aef --- /dev/null +++ b/library/python/json/loads.cpp @@ -0,0 +1,246 @@ +#include "loads.h" + +#include <Python.h> + +#include <library/cpp/json/fast_sax/parser.h> + +#include <util/generic/algorithm.h> +#include <util/generic/stack.h> +#include <util/generic/vector.h> +#include <util/generic/ylimits.h> +#include <util/string/ascii.h> + +using namespace NJson; + +namespace { + enum EKind { + Undefined, + Array, + Dict, + Value, + Key, + }; + + static inline TStringBuf ToStr(EKind kind) noexcept { + switch (kind) { + case Undefined: + return TStringBuf("Undefined"); + + case Array: + return TStringBuf("Array"); + + case Dict: + return TStringBuf("Dict"); + + case Value: + return TStringBuf("Value"); + + case Key: + return TStringBuf("Key"); + } + + Y_UNREACHABLE(); + } + + struct TUnref { + static inline void Destroy(PyObject* o) noexcept { + Py_XDECREF(o); + } + }; + + using TObjectPtr = TAutoPtr<PyObject, TUnref>; + + static inline TObjectPtr BuildBool(bool val) noexcept { + if (val) { + Py_RETURN_TRUE; + } + + Py_RETURN_FALSE; + } + + // Translate python exceptions from object-creating functions into c++ exceptions + // Such errors are reported by returning nullptr + // When a python error is set and C++ exception is caught by Cython wrapper, + // Python exception is propagated, while C++ exception is discarded. 
+ PyObject* CheckNewObject(PyObject* obj) { + Y_ENSURE(obj != nullptr, "got python exception"); + return obj; + } + + void CheckRetcode(int retcode) { + Y_ENSURE(retcode == 0, "got python exception"); + } + + static inline TObjectPtr BuildSmall(long val) { +#if PY_VERSION_HEX >= 0x03000000 + return CheckNewObject(PyLong_FromLong(val)); +#else + return CheckNewObject(PyInt_FromLong(val)); +#endif + } + + PyObject* CreatePyString(TStringBuf str, bool intern, bool mayUnicode) { +#if PY_VERSION_HEX >= 0x03000000 + Y_UNUSED(mayUnicode); + PyObject* pyStr = PyUnicode_FromStringAndSize(str.data(), str.size()); + if (intern) { + PyUnicode_InternInPlace(&pyStr); + } +#else + const bool needUnicode = mayUnicode && !AllOf(str, IsAscii); + PyObject* pyStr = needUnicode ? PyUnicode_FromStringAndSize(str.data(), str.size()) + : PyString_FromStringAndSize(str.data(), str.size()); + if (intern && !needUnicode) { + PyString_InternInPlace(&pyStr); + } +#endif + return pyStr; + } + + struct TVal { + EKind Kind = Undefined; + TObjectPtr Val; + + inline TVal() noexcept + : Kind(Undefined) + { + } + + inline TVal(EKind kind, TObjectPtr val) noexcept + : Kind(kind) + , Val(val) + { + } + }; + + static inline TObjectPtr NoneRef() noexcept { + Py_RETURN_NONE; + } + + struct TContext: public TJsonCallbacks { + const bool InternKeys; + const bool InternVals; + const bool MayUnicode; + TStack<TVal, TVector<TVal>> S; + + inline TContext(bool internKeys, bool internVals, bool mayUnicode) + : TJsonCallbacks(true) + , InternKeys(internKeys) + , InternVals(internVals) + , MayUnicode(mayUnicode) + { + S.emplace(); + } + + inline bool Consume(TObjectPtr o) { + auto& t = S.top(); + + if (t.Kind == Array) { + CheckRetcode(PyList_Append(t.Val.Get(), o.Get())); + } else if (t.Kind == Key) { + auto key = S.top().Val; + + S.pop(); + + CheckRetcode(PyDict_SetItem(S.top().Val.Get(), key.Get(), o.Get())); + } else { + t = TVal(Value, o); + } + + return true; + } + + inline TObjectPtr Pop(EKind expect) { + auto res = S.top(); + + S.pop(); + + if (res.Kind != expect) { + ythrow yexception() << "unexpected kind(expect " << ToStr(expect) << ", got " << ToStr(res.Kind) << ")"; + } + + return res.Val; + } + + inline void Push(EKind kind, TObjectPtr object) { + S.push(TVal(kind, object)); + } + + virtual bool OnNull() { + return Consume(NoneRef()); + } + + virtual bool OnBoolean(bool v) { + return Consume(BuildBool(v)); + } + + virtual bool OnInteger(long long v) { + if (v >= (long long)Min<long>()) { + return Consume(BuildSmall((long)v)); + } + + return Consume(CheckNewObject(PyLong_FromLongLong(v))); + } + + virtual bool OnUInteger(unsigned long long v) { + if (v <= (unsigned long long)Max<long>()) { + return Consume(BuildSmall((long)v)); + } + + return Consume(CheckNewObject(PyLong_FromUnsignedLongLong(v))); + } + + virtual bool OnDouble(double v) { + return Consume(CheckNewObject(PyFloat_FromDouble(v))); + } + + virtual bool OnString(const TStringBuf& v) { + return Consume(CheckNewObject(CreatePyString(v, InternVals, MayUnicode))); + } + + virtual bool OnOpenMap() { + Push(Dict, CheckNewObject(PyDict_New())); + + return true; + } + + virtual bool OnCloseMap() { + return Consume(Pop(Dict)); + } + + virtual bool OnMapKey(const TStringBuf& k) { + Push(Key, CheckNewObject(CreatePyString(k, InternKeys, MayUnicode))); + return true; + } + + virtual bool OnOpenArray() { + Push(Array, CheckNewObject(PyList_New(0))); + + return true; + } + + virtual bool OnCloseArray() { + return Consume(Pop(Array)); + } + }; +} + +PyObject* 
LoadJsonFromString(const char* data, size_t len, bool internKeys, bool internVals, bool mayUnicode) { + TContext ctx(internKeys, internVals, mayUnicode); + + if (!len) { + ythrow yexception() << "parse error: zero length input string"; + } + + if (!NJson::ReadJsonFast(TStringBuf(data, len), &ctx)) { + ythrow yexception() << "parse error"; + } + + auto& s = ctx.S; + + if (!s || s.top().Kind != Value) { + ythrow yexception() << "shit happen"; + } + + return s.top().Val.Release(); +} diff --git a/library/python/json/loads.h b/library/python/json/loads.h new file mode 100644 index 00000000000..62dcdf6f213 --- /dev/null +++ b/library/python/json/loads.h @@ -0,0 +1,5 @@ +#pragma once + +#include <Python.h> + +PyObject* LoadJsonFromString(const char* data, size_t len, bool internKeys = false, bool internVals = false, bool mayUnicode = false); diff --git a/library/python/json/loads.pyx b/library/python/json/loads.pyx new file mode 100644 index 00000000000..82e5c6dce7b --- /dev/null +++ b/library/python/json/loads.pyx @@ -0,0 +1,14 @@ +from libcpp cimport bool + +cdef extern from "library/python/json/loads.h": + object LoadJsonFromString(const char*, size_t, bool internKeys, bool internVals, bool mayUnicode) except + + + +def loads(s, intern_keys = False, intern_vals = False, may_unicode = False): + if isinstance(s, unicode): + s = s.encode('utf-8') + + try: + return LoadJsonFromString(s, len(s), intern_keys, intern_vals, may_unicode) + except Exception as e: + raise ValueError(str(e)) diff --git a/library/python/json/ya.make b/library/python/json/ya.make new file mode 100644 index 00000000000..74a82de9d8f --- /dev/null +++ b/library/python/json/ya.make @@ -0,0 +1,17 @@ +PY23_LIBRARY() + +PEERDIR( + contrib/python/simplejson + library/cpp/json/fast_sax +) + +PY_SRCS( + __init__.py + loads.pyx +) + +SRCS( + loads.cpp +) + +END() diff --git a/library/python/par_apply/__init__.py b/library/python/par_apply/__init__.py new file mode 100644 index 00000000000..19b89ae8431 --- /dev/null +++ b/library/python/par_apply/__init__.py @@ -0,0 +1,114 @@ +import sys +import threading +import six + +from six.moves import queue + + +def par_apply(seq, func, thr_num, join_polling=None): + if thr_num < 2: + for x in seq: + yield func(x) + + return + + in_q = queue.Queue() + out_q = queue.Queue() + + def enumerate_blocks(): + n = 0 + + for b in seq: + yield n, [b] + n += 1 + + yield n, None + + def iter_out(): + n = 0 + d = {} + + while True: + if n in d: + r = d[n] + del d[n] + n += 1 + + yield r + else: + res = out_q.get() + + d[res[0]] = res + + out_iter = iter_out() + + def wait_block(): + for x in out_iter: + return x + + def iter_compressed(): + p = 0 + + for n, b in enumerate_blocks(): + in_q.put((n, b)) + + while n > p + (thr_num * 2): + p, b, c = wait_block() + + if not b: + return + + yield p, c + + while True: + p, b, c = wait_block() + + if not b: + return + + yield p, c + + def proc(): + while True: + data = in_q.get() + + if data is None: + return + + n, b = data + + if b: + try: + res = (func(b[0]), None) + except Exception: + res = (None, sys.exc_info()) + else: + res = (None, None) + + out_q.put((n, b, res)) + + thrs = [threading.Thread(target=proc) for i in range(0, thr_num)] + + for t in thrs: + t.start() + + try: + for p, c in iter_compressed(): + res, err = c + + if err: + six.reraise(*err) + + yield res + finally: + for t in thrs: + in_q.put(None) + + for t in thrs: + if join_polling is not None: + while True: + t.join(join_polling) + if not t.is_alive(): + break + else: + t.join() diff --git 
a/library/python/par_apply/ya.make b/library/python/par_apply/ya.make new file mode 100644 index 00000000000..b14592ab799 --- /dev/null +++ b/library/python/par_apply/ya.make @@ -0,0 +1,11 @@ +PY23_LIBRARY() + +PEERDIR( + contrib/python/six +) + +PY_SRCS( + __init__.py +) + +END() |
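
The new `library/python/archive` module added above wraps python-libarchive for packing and unpacking tarballs. The following is a minimal usage sketch, not part of the commit; it assumes a target that PEERDIRs this library, and the paths are illustrative only:

```python
import library.python.archive as archive

# Pack a directory into a zstd-compressed GNU tar. fixed_mtime=0 makes the
# archive reproducible: entries are walked in sorted order and timestamps
# are pinned to the given value.
archive.tar(
    ["input_dir"],                                   # hypothetical path
    "out.tar.zst",
    compression_filter=archive.ZSTD,
    compression_level=archive.Compression.Default,
    fixed_mtime=0,
)

# Unpack it elsewhere, dropping the leading path component of every entry.
archive.extract_tar("out.tar.zst", "unpacked", strip_components=1)
```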
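`library/python/codecs` exposes the blockcodecs encode/decode round trip, and `library/python/compress` builds chunked file compression on top of it. A sketch under the assumption that "snappy" is among the names returned by `list_all_codecs()`; file names are made up:

```python
import library.python.codecs as codecs
import library.python.compress as compress

print(codecs.list_all_codecs())              # codec names accepted by dumps()/loads()

packed = codecs.dumps("snappy", b"payload")  # assumed codec name
assert codecs.loads("snappy", packed) == b"payload"

# compress() writes length-prefixed 16 MiB blocks preceded by a JSON header
# that records the codec, so decompress() needs no codec argument.
compress.compress("data.bin", "data.bin.cz", codec="snappy", threads=4)
compress.decompress("data.bin.cz", "data_restored.bin", threads=4)
```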
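`library/python/json` is a fast SAX-based `loads` (loads.cpp above) with a simplejson fallback and optional string interning for repeated keys. A minimal sketch; the file name is illustrative:

```python
import library.python.json as lpj

# Fast path uses the C++ parser; on input it cannot handle, the wrapper
# retries with simplejson.loads.
doc = lpj.loads('{"status": "ok", "ids": [1, 2, 3]}', intern_keys=True)
assert doc["ids"] == [1, 2, 3]

# read_file()/write_file() are thin helpers around simplejson load/dump.
lpj.write_file("doc.json", doc, indent=2)
assert lpj.read_file("doc.json") == doc
```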
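`par_apply` is the order-preserving parallel map used by compress, and `cityhash` is a thin Cython binding over util/digest/city built as a top-level module. Sketch only; thread count and inputs are arbitrary, and the byte-string arguments are an assumption since the bindings declare `const char*` parameters:

```python
import library.python.par_apply as lpa
from cityhash import hash64, filehash64

# Results come back in input order even though four worker threads process
# items concurrently; with thr_num < 2 it degrades to a plain sequential map.
squares = list(lpa.par_apply(range(8), lambda x: x * x, 4))
assert squares == [x * x for x in range(8)]

print(hash64(b"some bytes"))       # CityHash64 of a byte string
print(filehash64(b"data.bin"))     # hashes file contents, mapping large files
```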