summaryrefslogtreecommitdiffstats
path: root/library/python/compress
diff options
context:
space:
mode:
authorthegeorg <[email protected]>2023-10-03 11:19:48 +0300
committerthegeorg <[email protected]>2023-10-03 11:43:28 +0300
commitcda0c13f23f6b169fb0a49dc504b40a0aaecea09 (patch)
tree26476e92e5af2c856e017afb1df8f8dff42495bf /library/python/compress
parent4854116da9c5e3c95bb8440f2ea997c54b6e1a61 (diff)
Move contrib/tools/jdk to build/platform/java/jdk/testing
Diffstat (limited to 'library/python/compress')
-rw-r--r--library/python/compress/__init__.py147
-rw-r--r--library/python/compress/ya.make16
2 files changed, 163 insertions, 0 deletions
diff --git a/library/python/compress/__init__.py b/library/python/compress/__init__.py
new file mode 100644
index 00000000000..380ec47dca8
--- /dev/null
+++ b/library/python/compress/__init__.py
@@ -0,0 +1,147 @@
+from io import open
+
+import struct
+import json
+import os
+import logging
+
+import library.python.par_apply as lpp
+import library.python.codecs as lpc
+
+
+logger = logging.getLogger('compress')
+
+
+def list_all_codecs():
+ return sorted(frozenset(lpc.list_all_codecs()))
+
+
+def find_codec(ext):
+ def ext_compress(x):
+ return lpc.dumps(ext, x)
+
+ def ext_decompress(x):
+ return lpc.loads(ext, x)
+
+ ext_decompress(ext_compress(b''))
+
+ return {'c': ext_compress, 'd': ext_decompress, 'n': ext}
+
+
+def codec_for(path):
+ for ext in reversed(path.split('.')):
+ try:
+ return find_codec(ext)
+ except Exception as e:
+ logger.debug('in codec_for(): %s', e)
+
+ raise Exception('unsupported file %s' % path)
+
+
+def compress(fr, to, codec=None, fopen=open, threads=1):
+ if codec:
+ codec = find_codec(codec)
+ else:
+ codec = codec_for(to)
+
+ func = codec['c']
+
+ def iter_blocks():
+ with fopen(fr, 'rb') as f:
+ while True:
+ chunk = f.read(16 * 1024 * 1024)
+
+ if chunk:
+ yield chunk
+ else:
+ yield b''
+
+ return
+
+ def iter_results():
+ info = {
+ 'codec': codec['n'],
+ }
+
+ if fr:
+ info['size'] = os.path.getsize(fr)
+
+ yield json.dumps(info, sort_keys=True) + '\n'
+
+ for c in lpp.par_apply(iter_blocks(), func, threads):
+ yield c
+
+ with fopen(to, 'wb') as f:
+ for c in iter_results():
+ logger.debug('complete %s', len(c))
+ f.write(struct.pack('<I', len(c)))
+
+ try:
+ f.write(c)
+ except TypeError:
+ f.write(c.encode('utf-8'))
+
+
+def decompress(fr, to, codec=None, fopen=open, threads=1):
+ def iter_chunks():
+ with fopen(fr, 'rb') as f:
+ cnt = 0
+
+ while True:
+ ll = f.read(4)
+
+ if ll:
+ ll = struct.unpack('<I', ll)[0]
+
+ if ll:
+ if ll > 100000000:
+ raise Exception('broken stream')
+
+ yield f.read(ll)
+
+ cnt += ll
+ else:
+ if not cnt:
+ raise Exception('empty stream')
+
+ return
+
+ it = iter_chunks()
+ extra = []
+
+ for chunk in it:
+ hdr = {}
+
+ try:
+ hdr = json.loads(chunk)
+ except Exception as e:
+ logger.info('can not parse header, suspect old format: %s', e)
+ extra.append(chunk)
+
+ break
+
+ def resolve_codec():
+ if 'codec' in hdr:
+ return find_codec(hdr['codec'])
+
+ if codec:
+ return find_codec(codec)
+
+ return codec_for(fr)
+
+ dc = resolve_codec()['d']
+
+ def iter_all_chunks():
+ for x in extra:
+ yield x
+
+ for x in it:
+ yield x
+
+ with fopen(to, 'wb') as f:
+ for c in lpp.par_apply(iter_all_chunks(), dc, threads):
+ if c:
+ logger.debug('complete %s', len(c))
+ f.write(c)
+ else:
+ break
diff --git a/library/python/compress/ya.make b/library/python/compress/ya.make
new file mode 100644
index 00000000000..bbf2a784e29
--- /dev/null
+++ b/library/python/compress/ya.make
@@ -0,0 +1,16 @@
+PY23_LIBRARY()
+
+PEERDIR(
+ library/python/codecs
+ library/python/par_apply
+)
+
+PY_SRCS(
+ __init__.py
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+ tests
+)