diff options
| author | maxim-yurchuk <[email protected]> | 2024-10-09 12:29:46 +0300 |
|---|---|---|
| committer | maxim-yurchuk <[email protected]> | 2024-10-09 13:14:22 +0300 |
| commit | 9731d8a4bb7ee2cc8554eaf133bb85498a4c7d80 (patch) | |
| tree | a8fb3181d5947c0d78cf402aa56e686130179049 /contrib/python/zstandard | |
| parent | a44b779cd359f06c3ebbef4ec98c6b38609d9d85 (diff) | |
publishFullContrib: true for ydb
<HIDDEN_URL>
commit_hash:c82a80ac4594723cebf2c7387dec9c60217f603e
Diffstat (limited to 'contrib/python/zstandard')
| -rw-r--r-- | contrib/python/zstandard/py2/.yandex_meta/yamaker.yaml | 9 | ||||
| -rw-r--r-- | contrib/python/zstandard/py2/patches/01-zstd.patch | 15 | ||||
| -rw-r--r-- | contrib/python/zstandard/py2/zstandard/cffi.py | 2769 | ||||
| -rw-r--r-- | contrib/python/zstandard/py3/.yandex_meta/yamaker.yaml | 8 | ||||
| -rw-r--r-- | contrib/python/zstandard/py3/zstandard/backend_cffi.py | 4479 |
5 files changed, 7280 insertions, 0 deletions
diff --git a/contrib/python/zstandard/py2/.yandex_meta/yamaker.yaml b/contrib/python/zstandard/py2/.yandex_meta/yamaker.yaml new file mode 100644 index 00000000000..de83fb30a2c --- /dev/null +++ b/contrib/python/zstandard/py2/.yandex_meta/yamaker.yaml @@ -0,0 +1,9 @@ +requirements: + - contrib/libs/zstd +copy: + - c-ext/* + - zstd.c +exclude: + - zstd/* +exclude_from_macros: + - zstandard/cffi.py diff --git a/contrib/python/zstandard/py2/patches/01-zstd.patch b/contrib/python/zstandard/py2/patches/01-zstd.patch new file mode 100644 index 00000000000..7fd70117781 --- /dev/null +++ b/contrib/python/zstandard/py2/patches/01-zstd.patch @@ -0,0 +1,15 @@ +--- contrib/python/zstandard/py2/c-ext/constants.c (index) ++++ contrib/python/zstandard/py2/c-ext/constants.c (working tree) +@@ -77,7 +77,6 @@ void constants_module_init(PyObject* mod) { + PyModule_AddIntConstant(mod, "CHAINLOG_MAX", ZSTD_CHAINLOG_MAX); + PyModule_AddIntConstant(mod, "HASHLOG_MIN", ZSTD_HASHLOG_MIN); + PyModule_AddIntConstant(mod, "HASHLOG_MAX", ZSTD_HASHLOG_MAX); +- PyModule_AddIntConstant(mod, "HASHLOG3_MAX", ZSTD_HASHLOG3_MAX); + PyModule_AddIntConstant(mod, "SEARCHLOG_MIN", ZSTD_SEARCHLOG_MIN); + PyModule_AddIntConstant(mod, "SEARCHLOG_MAX", ZSTD_SEARCHLOG_MAX); + PyModule_AddIntConstant(mod, "MINMATCH_MIN", ZSTD_MINMATCH_MIN); +--- contrib/python/zstandard/py2/zstd.c (index) ++++ contrib/python/zstandard/py2/zstd.c (working tree) +@@ -213,1 +213,1 @@ void zstd_module_init(PyObject* m) { +- if (ZSTD_VERSION_NUMBER != 10405 || ZSTD_versionNumber() != 10405) { ++ if (ZSTD_VERSION_NUMBER != 10506 || ZSTD_versionNumber() != 10506) { diff --git a/contrib/python/zstandard/py2/zstandard/cffi.py b/contrib/python/zstandard/py2/zstandard/cffi.py new file mode 100644 index 00000000000..d4b1757db19 --- /dev/null +++ b/contrib/python/zstandard/py2/zstandard/cffi.py @@ -0,0 +1,2769 @@ +# Copyright (c) 2016-present, Gregory Szorc +# All rights reserved. +# +# This software may be modified and distributed under the terms +# of the BSD license. See the LICENSE file for details. + +"""Python interface to the Zstandard (zstd) compression library.""" + +from __future__ import absolute_import, unicode_literals + +# This should match what the C extension exports. +__all__ = [ + #'BufferSegment', + #'BufferSegments', + #'BufferWithSegments', + #'BufferWithSegmentsCollection', + "CompressionParameters", + "ZstdCompressionDict", + "ZstdCompressionParameters", + "ZstdCompressor", + "ZstdError", + "ZstdDecompressor", + "FrameParameters", + "estimate_decompression_context_size", + "frame_content_size", + "frame_header_size", + "get_frame_parameters", + "train_dictionary", + # Constants. + "FLUSH_BLOCK", + "FLUSH_FRAME", + "COMPRESSOBJ_FLUSH_FINISH", + "COMPRESSOBJ_FLUSH_BLOCK", + "ZSTD_VERSION", + "FRAME_HEADER", + "CONTENTSIZE_UNKNOWN", + "CONTENTSIZE_ERROR", + "MAX_COMPRESSION_LEVEL", + "COMPRESSION_RECOMMENDED_INPUT_SIZE", + "COMPRESSION_RECOMMENDED_OUTPUT_SIZE", + "DECOMPRESSION_RECOMMENDED_INPUT_SIZE", + "DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE", + "MAGIC_NUMBER", + "BLOCKSIZELOG_MAX", + "BLOCKSIZE_MAX", + "WINDOWLOG_MIN", + "WINDOWLOG_MAX", + "CHAINLOG_MIN", + "CHAINLOG_MAX", + "HASHLOG_MIN", + "HASHLOG_MAX", + "HASHLOG3_MAX", + "MINMATCH_MIN", + "MINMATCH_MAX", + "SEARCHLOG_MIN", + "SEARCHLOG_MAX", + "SEARCHLENGTH_MIN", + "SEARCHLENGTH_MAX", + "TARGETLENGTH_MIN", + "TARGETLENGTH_MAX", + "LDM_MINMATCH_MIN", + "LDM_MINMATCH_MAX", + "LDM_BUCKETSIZELOG_MAX", + "STRATEGY_FAST", + "STRATEGY_DFAST", + "STRATEGY_GREEDY", + "STRATEGY_LAZY", + "STRATEGY_LAZY2", + "STRATEGY_BTLAZY2", + "STRATEGY_BTOPT", + "STRATEGY_BTULTRA", + "STRATEGY_BTULTRA2", + "DICT_TYPE_AUTO", + "DICT_TYPE_RAWCONTENT", + "DICT_TYPE_FULLDICT", + "FORMAT_ZSTD1", + "FORMAT_ZSTD1_MAGICLESS", +] + +import io +import os +import sys + +from _zstd_cffi import ( + ffi, + lib, +) + +if sys.version_info[0] == 2: + bytes_type = str + int_type = long +else: + bytes_type = bytes + int_type = int + + +COMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_CStreamInSize() +COMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_CStreamOutSize() +DECOMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_DStreamInSize() +DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_DStreamOutSize() + +new_nonzero = ffi.new_allocator(should_clear_after_alloc=False) + + +MAX_COMPRESSION_LEVEL = lib.ZSTD_maxCLevel() +MAGIC_NUMBER = lib.ZSTD_MAGICNUMBER +FRAME_HEADER = b"\x28\xb5\x2f\xfd" +CONTENTSIZE_UNKNOWN = lib.ZSTD_CONTENTSIZE_UNKNOWN +CONTENTSIZE_ERROR = lib.ZSTD_CONTENTSIZE_ERROR +ZSTD_VERSION = ( + lib.ZSTD_VERSION_MAJOR, + lib.ZSTD_VERSION_MINOR, + lib.ZSTD_VERSION_RELEASE, +) + +BLOCKSIZELOG_MAX = lib.ZSTD_BLOCKSIZELOG_MAX +BLOCKSIZE_MAX = lib.ZSTD_BLOCKSIZE_MAX +WINDOWLOG_MIN = lib.ZSTD_WINDOWLOG_MIN +WINDOWLOG_MAX = lib.ZSTD_WINDOWLOG_MAX +CHAINLOG_MIN = lib.ZSTD_CHAINLOG_MIN +CHAINLOG_MAX = lib.ZSTD_CHAINLOG_MAX +HASHLOG_MIN = lib.ZSTD_HASHLOG_MIN +HASHLOG_MAX = lib.ZSTD_HASHLOG_MAX +HASHLOG3_MAX = lib.ZSTD_HASHLOG3_MAX +MINMATCH_MIN = lib.ZSTD_MINMATCH_MIN +MINMATCH_MAX = lib.ZSTD_MINMATCH_MAX +SEARCHLOG_MIN = lib.ZSTD_SEARCHLOG_MIN +SEARCHLOG_MAX = lib.ZSTD_SEARCHLOG_MAX +SEARCHLENGTH_MIN = lib.ZSTD_MINMATCH_MIN +SEARCHLENGTH_MAX = lib.ZSTD_MINMATCH_MAX +TARGETLENGTH_MIN = lib.ZSTD_TARGETLENGTH_MIN +TARGETLENGTH_MAX = lib.ZSTD_TARGETLENGTH_MAX +LDM_MINMATCH_MIN = lib.ZSTD_LDM_MINMATCH_MIN +LDM_MINMATCH_MAX = lib.ZSTD_LDM_MINMATCH_MAX +LDM_BUCKETSIZELOG_MAX = lib.ZSTD_LDM_BUCKETSIZELOG_MAX + +STRATEGY_FAST = lib.ZSTD_fast +STRATEGY_DFAST = lib.ZSTD_dfast +STRATEGY_GREEDY = lib.ZSTD_greedy +STRATEGY_LAZY = lib.ZSTD_lazy +STRATEGY_LAZY2 = lib.ZSTD_lazy2 +STRATEGY_BTLAZY2 = lib.ZSTD_btlazy2 +STRATEGY_BTOPT = lib.ZSTD_btopt +STRATEGY_BTULTRA = lib.ZSTD_btultra +STRATEGY_BTULTRA2 = lib.ZSTD_btultra2 + +DICT_TYPE_AUTO = lib.ZSTD_dct_auto +DICT_TYPE_RAWCONTENT = lib.ZSTD_dct_rawContent +DICT_TYPE_FULLDICT = lib.ZSTD_dct_fullDict + +FORMAT_ZSTD1 = lib.ZSTD_f_zstd1 +FORMAT_ZSTD1_MAGICLESS = lib.ZSTD_f_zstd1_magicless + +FLUSH_BLOCK = 0 +FLUSH_FRAME = 1 + +COMPRESSOBJ_FLUSH_FINISH = 0 +COMPRESSOBJ_FLUSH_BLOCK = 1 + + +def _cpu_count(): + # os.cpu_count() was introducd in Python 3.4. + try: + return os.cpu_count() or 0 + except AttributeError: + pass + + # Linux. + try: + if sys.version_info[0] == 2: + return os.sysconf(b"SC_NPROCESSORS_ONLN") + else: + return os.sysconf("SC_NPROCESSORS_ONLN") + except (AttributeError, ValueError): + pass + + # TODO implement on other platforms. + return 0 + + +class ZstdError(Exception): + pass + + +def _zstd_error(zresult): + # Resolves to bytes on Python 2 and 3. We use the string for formatting + # into error messages, which will be literal unicode. So convert it to + # unicode. + return ffi.string(lib.ZSTD_getErrorName(zresult)).decode("utf-8") + + +def _make_cctx_params(params): + res = lib.ZSTD_createCCtxParams() + if res == ffi.NULL: + raise MemoryError() + + res = ffi.gc(res, lib.ZSTD_freeCCtxParams) + + attrs = [ + (lib.ZSTD_c_format, params.format), + (lib.ZSTD_c_compressionLevel, params.compression_level), + (lib.ZSTD_c_windowLog, params.window_log), + (lib.ZSTD_c_hashLog, params.hash_log), + (lib.ZSTD_c_chainLog, params.chain_log), + (lib.ZSTD_c_searchLog, params.search_log), + (lib.ZSTD_c_minMatch, params.min_match), + (lib.ZSTD_c_targetLength, params.target_length), + (lib.ZSTD_c_strategy, params.compression_strategy), + (lib.ZSTD_c_contentSizeFlag, params.write_content_size), + (lib.ZSTD_c_checksumFlag, params.write_checksum), + (lib.ZSTD_c_dictIDFlag, params.write_dict_id), + (lib.ZSTD_c_nbWorkers, params.threads), + (lib.ZSTD_c_jobSize, params.job_size), + (lib.ZSTD_c_overlapLog, params.overlap_log), + (lib.ZSTD_c_forceMaxWindow, params.force_max_window), + (lib.ZSTD_c_enableLongDistanceMatching, params.enable_ldm), + (lib.ZSTD_c_ldmHashLog, params.ldm_hash_log), + (lib.ZSTD_c_ldmMinMatch, params.ldm_min_match), + (lib.ZSTD_c_ldmBucketSizeLog, params.ldm_bucket_size_log), + (lib.ZSTD_c_ldmHashRateLog, params.ldm_hash_rate_log), + ] + + for param, value in attrs: + _set_compression_parameter(res, param, value) + + return res + + +class ZstdCompressionParameters(object): + @staticmethod + def from_level(level, source_size=0, dict_size=0, **kwargs): + params = lib.ZSTD_getCParams(level, source_size, dict_size) + + args = { + "window_log": "windowLog", + "chain_log": "chainLog", + "hash_log": "hashLog", + "search_log": "searchLog", + "min_match": "minMatch", + "target_length": "targetLength", + "compression_strategy": "strategy", + } + + for arg, attr in args.items(): + if arg not in kwargs: + kwargs[arg] = getattr(params, attr) + + return ZstdCompressionParameters(**kwargs) + + def __init__( + self, + format=0, + compression_level=0, + window_log=0, + hash_log=0, + chain_log=0, + search_log=0, + min_match=0, + target_length=0, + strategy=-1, + compression_strategy=-1, + write_content_size=1, + write_checksum=0, + write_dict_id=0, + job_size=0, + overlap_log=-1, + overlap_size_log=-1, + force_max_window=0, + enable_ldm=0, + ldm_hash_log=0, + ldm_min_match=0, + ldm_bucket_size_log=0, + ldm_hash_rate_log=-1, + ldm_hash_every_log=-1, + threads=0, + ): + + params = lib.ZSTD_createCCtxParams() + if params == ffi.NULL: + raise MemoryError() + + params = ffi.gc(params, lib.ZSTD_freeCCtxParams) + + self._params = params + + if threads < 0: + threads = _cpu_count() + + # We need to set ZSTD_c_nbWorkers before ZSTD_c_jobSize and ZSTD_c_overlapLog + # because setting ZSTD_c_nbWorkers resets the other parameters. + _set_compression_parameter(params, lib.ZSTD_c_nbWorkers, threads) + + _set_compression_parameter(params, lib.ZSTD_c_format, format) + _set_compression_parameter( + params, lib.ZSTD_c_compressionLevel, compression_level + ) + _set_compression_parameter(params, lib.ZSTD_c_windowLog, window_log) + _set_compression_parameter(params, lib.ZSTD_c_hashLog, hash_log) + _set_compression_parameter(params, lib.ZSTD_c_chainLog, chain_log) + _set_compression_parameter(params, lib.ZSTD_c_searchLog, search_log) + _set_compression_parameter(params, lib.ZSTD_c_minMatch, min_match) + _set_compression_parameter( + params, lib.ZSTD_c_targetLength, target_length + ) + + if strategy != -1 and compression_strategy != -1: + raise ValueError( + "cannot specify both compression_strategy and strategy" + ) + + if compression_strategy != -1: + strategy = compression_strategy + elif strategy == -1: + strategy = 0 + + _set_compression_parameter(params, lib.ZSTD_c_strategy, strategy) + _set_compression_parameter( + params, lib.ZSTD_c_contentSizeFlag, write_content_size + ) + _set_compression_parameter( + params, lib.ZSTD_c_checksumFlag, write_checksum + ) + _set_compression_parameter(params, lib.ZSTD_c_dictIDFlag, write_dict_id) + _set_compression_parameter(params, lib.ZSTD_c_jobSize, job_size) + + if overlap_log != -1 and overlap_size_log != -1: + raise ValueError( + "cannot specify both overlap_log and overlap_size_log" + ) + + if overlap_size_log != -1: + overlap_log = overlap_size_log + elif overlap_log == -1: + overlap_log = 0 + + _set_compression_parameter(params, lib.ZSTD_c_overlapLog, overlap_log) + _set_compression_parameter( + params, lib.ZSTD_c_forceMaxWindow, force_max_window + ) + _set_compression_parameter( + params, lib.ZSTD_c_enableLongDistanceMatching, enable_ldm + ) + _set_compression_parameter(params, lib.ZSTD_c_ldmHashLog, ldm_hash_log) + _set_compression_parameter( + params, lib.ZSTD_c_ldmMinMatch, ldm_min_match + ) + _set_compression_parameter( + params, lib.ZSTD_c_ldmBucketSizeLog, ldm_bucket_size_log + ) + + if ldm_hash_rate_log != -1 and ldm_hash_every_log != -1: + raise ValueError( + "cannot specify both ldm_hash_rate_log and ldm_hash_every_log" + ) + + if ldm_hash_every_log != -1: + ldm_hash_rate_log = ldm_hash_every_log + elif ldm_hash_rate_log == -1: + ldm_hash_rate_log = 0 + + _set_compression_parameter( + params, lib.ZSTD_c_ldmHashRateLog, ldm_hash_rate_log + ) + + @property + def format(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_format) + + @property + def compression_level(self): + return _get_compression_parameter( + self._params, lib.ZSTD_c_compressionLevel + ) + + @property + def window_log(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_windowLog) + + @property + def hash_log(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_hashLog) + + @property + def chain_log(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_chainLog) + + @property + def search_log(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_searchLog) + + @property + def min_match(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_minMatch) + + @property + def target_length(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_targetLength) + + @property + def compression_strategy(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_strategy) + + @property + def write_content_size(self): + return _get_compression_parameter( + self._params, lib.ZSTD_c_contentSizeFlag + ) + + @property + def write_checksum(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_checksumFlag) + + @property + def write_dict_id(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_dictIDFlag) + + @property + def job_size(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_jobSize) + + @property + def overlap_log(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_overlapLog) + + @property + def overlap_size_log(self): + return self.overlap_log + + @property + def force_max_window(self): + return _get_compression_parameter( + self._params, lib.ZSTD_c_forceMaxWindow + ) + + @property + def enable_ldm(self): + return _get_compression_parameter( + self._params, lib.ZSTD_c_enableLongDistanceMatching + ) + + @property + def ldm_hash_log(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_ldmHashLog) + + @property + def ldm_min_match(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_ldmMinMatch) + + @property + def ldm_bucket_size_log(self): + return _get_compression_parameter( + self._params, lib.ZSTD_c_ldmBucketSizeLog + ) + + @property + def ldm_hash_rate_log(self): + return _get_compression_parameter( + self._params, lib.ZSTD_c_ldmHashRateLog + ) + + @property + def ldm_hash_every_log(self): + return self.ldm_hash_rate_log + + @property + def threads(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_nbWorkers) + + def estimated_compression_context_size(self): + return lib.ZSTD_estimateCCtxSize_usingCCtxParams(self._params) + + +CompressionParameters = ZstdCompressionParameters + + +def estimate_decompression_context_size(): + return lib.ZSTD_estimateDCtxSize() + + +def _set_compression_parameter(params, param, value): + zresult = lib.ZSTD_CCtxParams_setParameter(params, param, value) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "unable to set compression context parameter: %s" + % _zstd_error(zresult) + ) + + +def _get_compression_parameter(params, param): + result = ffi.new("int *") + + zresult = lib.ZSTD_CCtxParams_getParameter(params, param, result) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "unable to get compression context parameter: %s" + % _zstd_error(zresult) + ) + + return result[0] + + +class ZstdCompressionWriter(object): + def __init__( + self, compressor, writer, source_size, write_size, write_return_read + ): + self._compressor = compressor + self._writer = writer + self._write_size = write_size + self._write_return_read = bool(write_return_read) + self._entered = False + self._closed = False + self._bytes_compressed = 0 + + self._dst_buffer = ffi.new("char[]", write_size) + self._out_buffer = ffi.new("ZSTD_outBuffer *") + self._out_buffer.dst = self._dst_buffer + self._out_buffer.size = len(self._dst_buffer) + self._out_buffer.pos = 0 + + zresult = lib.ZSTD_CCtx_setPledgedSrcSize(compressor._cctx, source_size) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error setting source size: %s" % _zstd_error(zresult) + ) + + def __enter__(self): + if self._closed: + raise ValueError("stream is closed") + + if self._entered: + raise ZstdError("cannot __enter__ multiple times") + + self._entered = True + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self._entered = False + + if not exc_type and not exc_value and not exc_tb: + self.close() + + self._compressor = None + + return False + + def memory_size(self): + return lib.ZSTD_sizeof_CCtx(self._compressor._cctx) + + def fileno(self): + f = getattr(self._writer, "fileno", None) + if f: + return f() + else: + raise OSError("fileno not available on underlying writer") + + def close(self): + if self._closed: + return + + try: + self.flush(FLUSH_FRAME) + finally: + self._closed = True + + # Call close() on underlying stream as well. + f = getattr(self._writer, "close", None) + if f: + f() + + @property + def closed(self): + return self._closed + + def isatty(self): + return False + + def readable(self): + return False + + def readline(self, size=-1): + raise io.UnsupportedOperation() + + def readlines(self, hint=-1): + raise io.UnsupportedOperation() + + def seek(self, offset, whence=None): + raise io.UnsupportedOperation() + + def seekable(self): + return False + + def truncate(self, size=None): + raise io.UnsupportedOperation() + + def writable(self): + return True + + def writelines(self, lines): + raise NotImplementedError("writelines() is not yet implemented") + + def read(self, size=-1): + raise io.UnsupportedOperation() + + def readall(self): + raise io.UnsupportedOperation() + + def readinto(self, b): + raise io.UnsupportedOperation() + + def write(self, data): + if self._closed: + raise ValueError("stream is closed") + + total_write = 0 + + data_buffer = ffi.from_buffer(data) + + in_buffer = ffi.new("ZSTD_inBuffer *") + in_buffer.src = data_buffer + in_buffer.size = len(data_buffer) + in_buffer.pos = 0 + + out_buffer = self._out_buffer + out_buffer.pos = 0 + + while in_buffer.pos < in_buffer.size: + zresult = lib.ZSTD_compressStream2( + self._compressor._cctx, + out_buffer, + in_buffer, + lib.ZSTD_e_continue, + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "zstd compress error: %s" % _zstd_error(zresult) + ) + + if out_buffer.pos: + self._writer.write( + ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + ) + total_write += out_buffer.pos + self._bytes_compressed += out_buffer.pos + out_buffer.pos = 0 + + if self._write_return_read: + return in_buffer.pos + else: + return total_write + + def flush(self, flush_mode=FLUSH_BLOCK): + if flush_mode == FLUSH_BLOCK: + flush = lib.ZSTD_e_flush + elif flush_mode == FLUSH_FRAME: + flush = lib.ZSTD_e_end + else: + raise ValueError("unknown flush_mode: %r" % flush_mode) + + if self._closed: + raise ValueError("stream is closed") + + total_write = 0 + + out_buffer = self._out_buffer + out_buffer.pos = 0 + + in_buffer = ffi.new("ZSTD_inBuffer *") + in_buffer.src = ffi.NULL + in_buffer.size = 0 + in_buffer.pos = 0 + + while True: + zresult = lib.ZSTD_compressStream2( + self._compressor._cctx, out_buffer, in_buffer, flush + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "zstd compress error: %s" % _zstd_error(zresult) + ) + + if out_buffer.pos: + self._writer.write( + ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + ) + total_write += out_buffer.pos + self._bytes_compressed += out_buffer.pos + out_buffer.pos = 0 + + if not zresult: + break + + return total_write + + def tell(self): + return self._bytes_compressed + + +class ZstdCompressionObj(object): + def compress(self, data): + if self._finished: + raise ZstdError("cannot call compress() after compressor finished") + + data_buffer = ffi.from_buffer(data) + source = ffi.new("ZSTD_inBuffer *") + source.src = data_buffer + source.size = len(data_buffer) + source.pos = 0 + + chunks = [] + + while source.pos < len(data): + zresult = lib.ZSTD_compressStream2( + self._compressor._cctx, self._out, source, lib.ZSTD_e_continue + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "zstd compress error: %s" % _zstd_error(zresult) + ) + + if self._out.pos: + chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:]) + self._out.pos = 0 + + return b"".join(chunks) + + def flush(self, flush_mode=COMPRESSOBJ_FLUSH_FINISH): + if flush_mode not in ( + COMPRESSOBJ_FLUSH_FINISH, + COMPRESSOBJ_FLUSH_BLOCK, + ): + raise ValueError("flush mode not recognized") + + if self._finished: + raise ZstdError("compressor object already finished") + + if flush_mode == COMPRESSOBJ_FLUSH_BLOCK: + z_flush_mode = lib.ZSTD_e_flush + elif flush_mode == COMPRESSOBJ_FLUSH_FINISH: + z_flush_mode = lib.ZSTD_e_end + self._finished = True + else: + raise ZstdError("unhandled flush mode") + + assert self._out.pos == 0 + + in_buffer = ffi.new("ZSTD_inBuffer *") + in_buffer.src = ffi.NULL + in_buffer.size = 0 + in_buffer.pos = 0 + + chunks = [] + + while True: + zresult = lib.ZSTD_compressStream2( + self._compressor._cctx, self._out, in_buffer, z_flush_mode + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error ending compression stream: %s" % _zstd_error(zresult) + ) + + if self._out.pos: + chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:]) + self._out.pos = 0 + + if not zresult: + break + + return b"".join(chunks) + + +class ZstdCompressionChunker(object): + def __init__(self, compressor, chunk_size): + self._compressor = compressor + self._out = ffi.new("ZSTD_outBuffer *") + self._dst_buffer = ffi.new("char[]", chunk_size) + self._out.dst = self._dst_buffer + self._out.size = chunk_size + self._out.pos = 0 + + self._in = ffi.new("ZSTD_inBuffer *") + self._in.src = ffi.NULL + self._in.size = 0 + self._in.pos = 0 + self._finished = False + + def compress(self, data): + if self._finished: + raise ZstdError("cannot call compress() after compression finished") + + if self._in.src != ffi.NULL: + raise ZstdError( + "cannot perform operation before consuming output " + "from previous operation" + ) + + data_buffer = ffi.from_buffer(data) + + if not len(data_buffer): + return + + self._in.src = data_buffer + self._in.size = len(data_buffer) + self._in.pos = 0 + + while self._in.pos < self._in.size: + zresult = lib.ZSTD_compressStream2( + self._compressor._cctx, self._out, self._in, lib.ZSTD_e_continue + ) + + if self._in.pos == self._in.size: + self._in.src = ffi.NULL + self._in.size = 0 + self._in.pos = 0 + + if lib.ZSTD_isError(zresult): + raise ZstdError( + "zstd compress error: %s" % _zstd_error(zresult) + ) + + if self._out.pos == self._out.size: + yield ffi.buffer(self._out.dst, self._out.pos)[:] + self._out.pos = 0 + + def flush(self): + if self._finished: + raise ZstdError("cannot call flush() after compression finished") + + if self._in.src != ffi.NULL: + raise ZstdError( + "cannot call flush() before consuming output from " + "previous operation" + ) + + while True: + zresult = lib.ZSTD_compressStream2( + self._compressor._cctx, self._out, self._in, lib.ZSTD_e_flush + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "zstd compress error: %s" % _zstd_error(zresult) + ) + + if self._out.pos: + yield ffi.buffer(self._out.dst, self._out.pos)[:] + self._out.pos = 0 + + if not zresult: + return + + def finish(self): + if self._finished: + raise ZstdError("cannot call finish() after compression finished") + + if self._in.src != ffi.NULL: + raise ZstdError( + "cannot call finish() before consuming output from " + "previous operation" + ) + + while True: + zresult = lib.ZSTD_compressStream2( + self._compressor._cctx, self._out, self._in, lib.ZSTD_e_end + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "zstd compress error: %s" % _zstd_error(zresult) + ) + + if self._out.pos: + yield ffi.buffer(self._out.dst, self._out.pos)[:] + self._out.pos = 0 + + if not zresult: + self._finished = True + return + + +class ZstdCompressionReader(object): + def __init__(self, compressor, source, read_size): + self._compressor = compressor + self._source = source + self._read_size = read_size + self._entered = False + self._closed = False + self._bytes_compressed = 0 + self._finished_input = False + self._finished_output = False + + self._in_buffer = ffi.new("ZSTD_inBuffer *") + # Holds a ref so backing bytes in self._in_buffer stay alive. + self._source_buffer = None + + def __enter__(self): + if self._entered: + raise ValueError("cannot __enter__ multiple times") + + self._entered = True + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self._entered = False + self._closed = True + self._source = None + self._compressor = None + + return False + + def readable(self): + return True + + def writable(self): + return False + + def seekable(self): + return False + + def readline(self): + raise io.UnsupportedOperation() + + def readlines(self): + raise io.UnsupportedOperation() + + def write(self, data): + raise OSError("stream is not writable") + + def writelines(self, ignored): + raise OSError("stream is not writable") + + def isatty(self): + return False + + def flush(self): + return None + + def close(self): + self._closed = True + return None + + @property + def closed(self): + return self._closed + + def tell(self): + return self._bytes_compressed + + def readall(self): + chunks = [] + + while True: + chunk = self.read(1048576) + if not chunk: + break + + chunks.append(chunk) + + return b"".join(chunks) + + def __iter__(self): + raise io.UnsupportedOperation() + + def __next__(self): + raise io.UnsupportedOperation() + + next = __next__ + + def _read_input(self): + if self._finished_input: + return + + if hasattr(self._source, "read"): + data = self._source.read(self._read_size) + + if not data: + self._finished_input = True + return + + self._source_buffer = ffi.from_buffer(data) + self._in_buffer.src = self._source_buffer + self._in_buffer.size = len(self._source_buffer) + self._in_buffer.pos = 0 + else: + self._source_buffer = ffi.from_buffer(self._source) + self._in_buffer.src = self._source_buffer + self._in_buffer.size = len(self._source_buffer) + self._in_buffer.pos = 0 + + def _compress_into_buffer(self, out_buffer): + if self._in_buffer.pos >= self._in_buffer.size: + return + + old_pos = out_buffer.pos + + zresult = lib.ZSTD_compressStream2( + self._compressor._cctx, + out_buffer, + self._in_buffer, + lib.ZSTD_e_continue, + ) + + self._bytes_compressed += out_buffer.pos - old_pos + + if self._in_buffer.pos == self._in_buffer.size: + self._in_buffer.src = ffi.NULL + self._in_buffer.pos = 0 + self._in_buffer.size = 0 + self._source_buffer = None + + if not hasattr(self._source, "read"): + self._finished_input = True + + if lib.ZSTD_isError(zresult): + raise ZstdError("zstd compress error: %s", _zstd_error(zresult)) + + return out_buffer.pos and out_buffer.pos == out_buffer.size + + def read(self, size=-1): + if self._closed: + raise ValueError("stream is closed") + + if size < -1: + raise ValueError("cannot read negative amounts less than -1") + + if size == -1: + return self.readall() + + if self._finished_output or size == 0: + return b"" + + # Need a dedicated ref to dest buffer otherwise it gets collected. + dst_buffer = ffi.new("char[]", size) + out_buffer = ffi.new("ZSTD_outBuffer *") + out_buffer.dst = dst_buffer + out_buffer.size = size + out_buffer.pos = 0 + + if self._compress_into_buffer(out_buffer): + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + while not self._finished_input: + self._read_input() + + if self._compress_into_buffer(out_buffer): + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + # EOF + old_pos = out_buffer.pos + + zresult = lib.ZSTD_compressStream2( + self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_end + ) + + self._bytes_compressed += out_buffer.pos - old_pos + + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error ending compression stream: %s", _zstd_error(zresult) + ) + + if zresult == 0: + self._finished_output = True + + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + def read1(self, size=-1): + if self._closed: + raise ValueError("stream is closed") + + if size < -1: + raise ValueError("cannot read negative amounts less than -1") + + if self._finished_output or size == 0: + return b"" + + # -1 returns arbitrary number of bytes. + if size == -1: + size = COMPRESSION_RECOMMENDED_OUTPUT_SIZE + + dst_buffer = ffi.new("char[]", size) + out_buffer = ffi.new("ZSTD_outBuffer *") + out_buffer.dst = dst_buffer + out_buffer.size = size + out_buffer.pos = 0 + + # read1() dictates that we can perform at most 1 call to the + # underlying stream to get input. However, we can't satisfy this + # restriction with compression because not all input generates output. + # It is possible to perform a block flush in order to ensure output. + # But this may not be desirable behavior. So we allow multiple read() + # to the underlying stream. But unlike read(), we stop once we have + # any output. + + self._compress_into_buffer(out_buffer) + if out_buffer.pos: + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + while not self._finished_input: + self._read_input() + + # If we've filled the output buffer, return immediately. + if self._compress_into_buffer(out_buffer): + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + # If we've populated the output buffer and we're not at EOF, + # also return, as we've satisfied the read1() limits. + if out_buffer.pos and not self._finished_input: + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + # Else if we're at EOS and we have room left in the buffer, + # fall through to below and try to add more data to the output. + + # EOF. + old_pos = out_buffer.pos + + zresult = lib.ZSTD_compressStream2( + self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_end + ) + + self._bytes_compressed += out_buffer.pos - old_pos + + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error ending compression stream: %s" % _zstd_error(zresult) + ) + + if zresult == 0: + self._finished_output = True + + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + def readinto(self, b): + if self._closed: + raise ValueError("stream is closed") + + if self._finished_output: + return 0 + + # TODO use writable=True once we require CFFI >= 1.12. + dest_buffer = ffi.from_buffer(b) + ffi.memmove(b, b"", 0) + out_buffer = ffi.new("ZSTD_outBuffer *") + out_buffer.dst = dest_buffer + out_buffer.size = len(dest_buffer) + out_buffer.pos = 0 + + if self._compress_into_buffer(out_buffer): + return out_buffer.pos + + while not self._finished_input: + self._read_input() + if self._compress_into_buffer(out_buffer): + return out_buffer.pos + + # EOF. + old_pos = out_buffer.pos + zresult = lib.ZSTD_compressStream2( + self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_end + ) + + self._bytes_compressed += out_buffer.pos - old_pos + + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error ending compression stream: %s", _zstd_error(zresult) + ) + + if zresult == 0: + self._finished_output = True + + return out_buffer.pos + + def readinto1(self, b): + if self._closed: + raise ValueError("stream is closed") + + if self._finished_output: + return 0 + + # TODO use writable=True once we require CFFI >= 1.12. + dest_buffer = ffi.from_buffer(b) + ffi.memmove(b, b"", 0) + + out_buffer = ffi.new("ZSTD_outBuffer *") + out_buffer.dst = dest_buffer + out_buffer.size = len(dest_buffer) + out_buffer.pos = 0 + + self._compress_into_buffer(out_buffer) + if out_buffer.pos: + return out_buffer.pos + + while not self._finished_input: + self._read_input() + + if self._compress_into_buffer(out_buffer): + return out_buffer.pos + + if out_buffer.pos and not self._finished_input: + return out_buffer.pos + + # EOF. + old_pos = out_buffer.pos + + zresult = lib.ZSTD_compressStream2( + self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_end + ) + + self._bytes_compressed += out_buffer.pos - old_pos + + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error ending compression stream: %s" % _zstd_error(zresult) + ) + + if zresult == 0: + self._finished_output = True + + return out_buffer.pos + + +class ZstdCompressor(object): + def __init__( + self, + level=3, + dict_data=None, + compression_params=None, + write_checksum=None, + write_content_size=None, + write_dict_id=None, + threads=0, + ): + if level > lib.ZSTD_maxCLevel(): + raise ValueError( + "level must be less than %d" % lib.ZSTD_maxCLevel() + ) + + if threads < 0: + threads = _cpu_count() + + if compression_params and write_checksum is not None: + raise ValueError( + "cannot define compression_params and " "write_checksum" + ) + + if compression_params and write_content_size is not None: + raise ValueError( + "cannot define compression_params and " "write_content_size" + ) + + if compression_params and write_dict_id is not None: + raise ValueError( + "cannot define compression_params and " "write_dict_id" + ) + + if compression_params and threads: + raise ValueError("cannot define compression_params and threads") + + if compression_params: + self._params = _make_cctx_params(compression_params) + else: + if write_dict_id is None: + write_dict_id = True + + params = lib.ZSTD_createCCtxParams() + if params == ffi.NULL: + raise MemoryError() + + self._params = ffi.gc(params, lib.ZSTD_freeCCtxParams) + + _set_compression_parameter( + self._params, lib.ZSTD_c_compressionLevel, level + ) + + _set_compression_parameter( + self._params, + lib.ZSTD_c_contentSizeFlag, + write_content_size if write_content_size is not None else 1, + ) + + _set_compression_parameter( + self._params, + lib.ZSTD_c_checksumFlag, + 1 if write_checksum else 0, + ) + + _set_compression_parameter( + self._params, lib.ZSTD_c_dictIDFlag, 1 if write_dict_id else 0 + ) + + if threads: + _set_compression_parameter( + self._params, lib.ZSTD_c_nbWorkers, threads + ) + + cctx = lib.ZSTD_createCCtx() + if cctx == ffi.NULL: + raise MemoryError() + + self._cctx = cctx + self._dict_data = dict_data + + # We defer setting up garbage collection until after calling + # _setup_cctx() to ensure the memory size estimate is more accurate. + try: + self._setup_cctx() + finally: + self._cctx = ffi.gc( + cctx, lib.ZSTD_freeCCtx, size=lib.ZSTD_sizeof_CCtx(cctx) + ) + + def _setup_cctx(self): + zresult = lib.ZSTD_CCtx_setParametersUsingCCtxParams( + self._cctx, self._params + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "could not set compression parameters: %s" + % _zstd_error(zresult) + ) + + dict_data = self._dict_data + + if dict_data: + if dict_data._cdict: + zresult = lib.ZSTD_CCtx_refCDict(self._cctx, dict_data._cdict) + else: + zresult = lib.ZSTD_CCtx_loadDictionary_advanced( + self._cctx, + dict_data.as_bytes(), + len(dict_data), + lib.ZSTD_dlm_byRef, + dict_data._dict_type, + ) + + if lib.ZSTD_isError(zresult): + raise ZstdError( + "could not load compression dictionary: %s" + % _zstd_error(zresult) + ) + + def memory_size(self): + return lib.ZSTD_sizeof_CCtx(self._cctx) + + def compress(self, data): + lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) + + data_buffer = ffi.from_buffer(data) + + dest_size = lib.ZSTD_compressBound(len(data_buffer)) + out = new_nonzero("char[]", dest_size) + + zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, len(data_buffer)) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error setting source size: %s" % _zstd_error(zresult) + ) + + out_buffer = ffi.new("ZSTD_outBuffer *") + in_buffer = ffi.new("ZSTD_inBuffer *") + + out_buffer.dst = out + out_buffer.size = dest_size + out_buffer.pos = 0 + + in_buffer.src = data_buffer + in_buffer.size = len(data_buffer) + in_buffer.pos = 0 + + zresult = lib.ZSTD_compressStream2( + self._cctx, out_buffer, in_buffer, lib.ZSTD_e_end + ) + + if lib.ZSTD_isError(zresult): + raise ZstdError("cannot compress: %s" % _zstd_error(zresult)) + elif zresult: + raise ZstdError("unexpected partial frame flush") + + return ffi.buffer(out, out_buffer.pos)[:] + + def compressobj(self, size=-1): + lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) + + if size < 0: + size = lib.ZSTD_CONTENTSIZE_UNKNOWN + + zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error setting source size: %s" % _zstd_error(zresult) + ) + + cobj = ZstdCompressionObj() + cobj._out = ffi.new("ZSTD_outBuffer *") + cobj._dst_buffer = ffi.new( + "char[]", COMPRESSION_RECOMMENDED_OUTPUT_SIZE + ) + cobj._out.dst = cobj._dst_buffer + cobj._out.size = COMPRESSION_RECOMMENDED_OUTPUT_SIZE + cobj._out.pos = 0 + cobj._compressor = self + cobj._finished = False + + return cobj + + def chunker(self, size=-1, chunk_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE): + lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) + + if size < 0: + size = lib.ZSTD_CONTENTSIZE_UNKNOWN + + zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error setting source size: %s" % _zstd_error(zresult) + ) + + return ZstdCompressionChunker(self, chunk_size=chunk_size) + + def copy_stream( + self, + ifh, + ofh, + size=-1, + read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE, + write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE, + ): + + if not hasattr(ifh, "read"): + raise ValueError("first argument must have a read() method") + if not hasattr(ofh, "write"): + raise ValueError("second argument must have a write() method") + + lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) + + if size < 0: + size = lib.ZSTD_CONTENTSIZE_UNKNOWN + + zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error setting source size: %s" % _zstd_error(zresult) + ) + + in_buffer = ffi.new("ZSTD_inBuffer *") + out_buffer = ffi.new("ZSTD_outBuffer *") + + dst_buffer = ffi.new("char[]", write_size) + out_buffer.dst = dst_buffer + out_buffer.size = write_size + out_buffer.pos = 0 + + total_read, total_write = 0, 0 + + while True: + data = ifh.read(read_size) + if not data: + break + + data_buffer = ffi.from_buffer(data) + total_read += len(data_buffer) + in_buffer.src = data_buffer + in_buffer.size = len(data_buffer) + in_buffer.pos = 0 + + while in_buffer.pos < in_buffer.size: + zresult = lib.ZSTD_compressStream2( + self._cctx, out_buffer, in_buffer, lib.ZSTD_e_continue + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "zstd compress error: %s" % _zstd_error(zresult) + ) + + if out_buffer.pos: + ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) + total_write += out_buffer.pos + out_buffer.pos = 0 + + # We've finished reading. Flush the compressor. + while True: + zresult = lib.ZSTD_compressStream2( + self._cctx, out_buffer, in_buffer, lib.ZSTD_e_end + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error ending compression stream: %s" % _zstd_error(zresult) + ) + + if out_buffer.pos: + ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) + total_write += out_buffer.pos + out_buffer.pos = 0 + + if zresult == 0: + break + + return total_read, total_write + + def stream_reader( + self, source, size=-1, read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE + ): + lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) + + try: + size = len(source) + except Exception: + pass + + if size < 0: + size = lib.ZSTD_CONTENTSIZE_UNKNOWN + + zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error setting source size: %s" % _zstd_error(zresult) + ) + + return ZstdCompressionReader(self, source, read_size) + + def stream_writer( + self, + writer, + size=-1, + write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE, + write_return_read=False, + ): + + if not hasattr(writer, "write"): + raise ValueError("must pass an object with a write() method") + + lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) + + if size < 0: + size = lib.ZSTD_CONTENTSIZE_UNKNOWN + + return ZstdCompressionWriter( + self, writer, size, write_size, write_return_read + ) + + write_to = stream_writer + + def read_to_iter( + self, + reader, + size=-1, + read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE, + write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE, + ): + if hasattr(reader, "read"): + have_read = True + elif hasattr(reader, "__getitem__"): + have_read = False + buffer_offset = 0 + size = len(reader) + else: + raise ValueError( + "must pass an object with a read() method or " + "conforms to buffer protocol" + ) + + lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) + + if size < 0: + size = lib.ZSTD_CONTENTSIZE_UNKNOWN + + zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error setting source size: %s" % _zstd_error(zresult) + ) + + in_buffer = ffi.new("ZSTD_inBuffer *") + out_buffer = ffi.new("ZSTD_outBuffer *") + + in_buffer.src = ffi.NULL + in_buffer.size = 0 + in_buffer.pos = 0 + + dst_buffer = ffi.new("char[]", write_size) + out_buffer.dst = dst_buffer + out_buffer.size = write_size + out_buffer.pos = 0 + + while True: + # We should never have output data sitting around after a previous + # iteration. + assert out_buffer.pos == 0 + + # Collect input data. + if have_read: + read_result = reader.read(read_size) + else: + remaining = len(reader) - buffer_offset + slice_size = min(remaining, read_size) + read_result = reader[buffer_offset : buffer_offset + slice_size] + buffer_offset += slice_size + + # No new input data. Break out of the read loop. + if not read_result: + break + + # Feed all read data into the compressor and emit output until + # exhausted. + read_buffer = ffi.from_buffer(read_result) + in_buffer.src = read_buffer + in_buffer.size = len(read_buffer) + in_buffer.pos = 0 + + while in_buffer.pos < in_buffer.size: + zresult = lib.ZSTD_compressStream2( + self._cctx, out_buffer, in_buffer, lib.ZSTD_e_continue + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "zstd compress error: %s" % _zstd_error(zresult) + ) + + if out_buffer.pos: + data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + out_buffer.pos = 0 + yield data + + assert out_buffer.pos == 0 + + # And repeat the loop to collect more data. + continue + + # If we get here, input is exhausted. End the stream and emit what + # remains. + while True: + assert out_buffer.pos == 0 + zresult = lib.ZSTD_compressStream2( + self._cctx, out_buffer, in_buffer, lib.ZSTD_e_end + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error ending compression stream: %s" % _zstd_error(zresult) + ) + + if out_buffer.pos: + data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + out_buffer.pos = 0 + yield data + + if zresult == 0: + break + + read_from = read_to_iter + + def frame_progression(self): + progression = lib.ZSTD_getFrameProgression(self._cctx) + + return progression.ingested, progression.consumed, progression.produced + + +class FrameParameters(object): + def __init__(self, fparams): + self.content_size = fparams.frameContentSize + self.window_size = fparams.windowSize + self.dict_id = fparams.dictID + self.has_checksum = bool(fparams.checksumFlag) + + +def frame_content_size(data): + data_buffer = ffi.from_buffer(data) + + size = lib.ZSTD_getFrameContentSize(data_buffer, len(data_buffer)) + + if size == lib.ZSTD_CONTENTSIZE_ERROR: + raise ZstdError("error when determining content size") + elif size == lib.ZSTD_CONTENTSIZE_UNKNOWN: + return -1 + else: + return size + + +def frame_header_size(data): + data_buffer = ffi.from_buffer(data) + + zresult = lib.ZSTD_frameHeaderSize(data_buffer, len(data_buffer)) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "could not determine frame header size: %s" % _zstd_error(zresult) + ) + + return zresult + + +def get_frame_parameters(data): + params = ffi.new("ZSTD_frameHeader *") + + data_buffer = ffi.from_buffer(data) + zresult = lib.ZSTD_getFrameHeader(params, data_buffer, len(data_buffer)) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "cannot get frame parameters: %s" % _zstd_error(zresult) + ) + + if zresult: + raise ZstdError( + "not enough data for frame parameters; need %d bytes" % zresult + ) + + return FrameParameters(params[0]) + + +class ZstdCompressionDict(object): + def __init__(self, data, dict_type=DICT_TYPE_AUTO, k=0, d=0): + assert isinstance(data, bytes_type) + self._data = data + self.k = k + self.d = d + + if dict_type not in ( + DICT_TYPE_AUTO, + DICT_TYPE_RAWCONTENT, + DICT_TYPE_FULLDICT, + ): + raise ValueError( + "invalid dictionary load mode: %d; must use " + "DICT_TYPE_* constants" + ) + + self._dict_type = dict_type + self._cdict = None + + def __len__(self): + return len(self._data) + + def dict_id(self): + return int_type(lib.ZDICT_getDictID(self._data, len(self._data))) + + def as_bytes(self): + return self._data + + def precompute_compress(self, level=0, compression_params=None): + if level and compression_params: + raise ValueError( + "must only specify one of level or " "compression_params" + ) + + if not level and not compression_params: + raise ValueError("must specify one of level or compression_params") + + if level: + cparams = lib.ZSTD_getCParams(level, 0, len(self._data)) + else: + cparams = ffi.new("ZSTD_compressionParameters") + cparams.chainLog = compression_params.chain_log + cparams.hashLog = compression_params.hash_log + cparams.minMatch = compression_params.min_match + cparams.searchLog = compression_params.search_log + cparams.strategy = compression_params.compression_strategy + cparams.targetLength = compression_params.target_length + cparams.windowLog = compression_params.window_log + + cdict = lib.ZSTD_createCDict_advanced( + self._data, + len(self._data), + lib.ZSTD_dlm_byRef, + self._dict_type, + cparams, + lib.ZSTD_defaultCMem, + ) + if cdict == ffi.NULL: + raise ZstdError("unable to precompute dictionary") + + self._cdict = ffi.gc( + cdict, lib.ZSTD_freeCDict, size=lib.ZSTD_sizeof_CDict(cdict) + ) + + @property + def _ddict(self): + ddict = lib.ZSTD_createDDict_advanced( + self._data, + len(self._data), + lib.ZSTD_dlm_byRef, + self._dict_type, + lib.ZSTD_defaultCMem, + ) + + if ddict == ffi.NULL: + raise ZstdError("could not create decompression dict") + + ddict = ffi.gc( + ddict, lib.ZSTD_freeDDict, size=lib.ZSTD_sizeof_DDict(ddict) + ) + self.__dict__["_ddict"] = ddict + + return ddict + + +def train_dictionary( + dict_size, + samples, + k=0, + d=0, + notifications=0, + dict_id=0, + level=0, + steps=0, + threads=0, +): + if not isinstance(samples, list): + raise TypeError("samples must be a list") + + if threads < 0: + threads = _cpu_count() + + total_size = sum(map(len, samples)) + + samples_buffer = new_nonzero("char[]", total_size) + sample_sizes = new_nonzero("size_t[]", len(samples)) + + offset = 0 + for i, sample in enumerate(samples): + if not isinstance(sample, bytes_type): + raise ValueError("samples must be bytes") + + l = len(sample) + ffi.memmove(samples_buffer + offset, sample, l) + offset += l + sample_sizes[i] = l + + dict_data = new_nonzero("char[]", dict_size) + + dparams = ffi.new("ZDICT_cover_params_t *")[0] + dparams.k = k + dparams.d = d + dparams.steps = steps + dparams.nbThreads = threads + dparams.zParams.notificationLevel = notifications + dparams.zParams.dictID = dict_id + dparams.zParams.compressionLevel = level + + if ( + not dparams.k + and not dparams.d + and not dparams.steps + and not dparams.nbThreads + and not dparams.zParams.notificationLevel + and not dparams.zParams.dictID + and not dparams.zParams.compressionLevel + ): + zresult = lib.ZDICT_trainFromBuffer( + ffi.addressof(dict_data), + dict_size, + ffi.addressof(samples_buffer), + ffi.addressof(sample_sizes, 0), + len(samples), + ) + elif dparams.steps or dparams.nbThreads: + zresult = lib.ZDICT_optimizeTrainFromBuffer_cover( + ffi.addressof(dict_data), + dict_size, + ffi.addressof(samples_buffer), + ffi.addressof(sample_sizes, 0), + len(samples), + ffi.addressof(dparams), + ) + else: + zresult = lib.ZDICT_trainFromBuffer_cover( + ffi.addressof(dict_data), + dict_size, + ffi.addressof(samples_buffer), + ffi.addressof(sample_sizes, 0), + len(samples), + dparams, + ) + + if lib.ZDICT_isError(zresult): + msg = ffi.string(lib.ZDICT_getErrorName(zresult)).decode("utf-8") + raise ZstdError("cannot train dict: %s" % msg) + + return ZstdCompressionDict( + ffi.buffer(dict_data, zresult)[:], + dict_type=DICT_TYPE_FULLDICT, + k=dparams.k, + d=dparams.d, + ) + + +class ZstdDecompressionObj(object): + def __init__(self, decompressor, write_size): + self._decompressor = decompressor + self._write_size = write_size + self._finished = False + + def decompress(self, data): + if self._finished: + raise ZstdError("cannot use a decompressobj multiple times") + + in_buffer = ffi.new("ZSTD_inBuffer *") + out_buffer = ffi.new("ZSTD_outBuffer *") + + data_buffer = ffi.from_buffer(data) + + if len(data_buffer) == 0: + return b"" + + in_buffer.src = data_buffer + in_buffer.size = len(data_buffer) + in_buffer.pos = 0 + + dst_buffer = ffi.new("char[]", self._write_size) + out_buffer.dst = dst_buffer + out_buffer.size = len(dst_buffer) + out_buffer.pos = 0 + + chunks = [] + + while True: + zresult = lib.ZSTD_decompressStream( + self._decompressor._dctx, out_buffer, in_buffer + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "zstd decompressor error: %s" % _zstd_error(zresult) + ) + + if zresult == 0: + self._finished = True + self._decompressor = None + + if out_buffer.pos: + chunks.append(ffi.buffer(out_buffer.dst, out_buffer.pos)[:]) + + if zresult == 0 or ( + in_buffer.pos == in_buffer.size and out_buffer.pos == 0 + ): + break + + out_buffer.pos = 0 + + return b"".join(chunks) + + def flush(self, length=0): + pass + + +class ZstdDecompressionReader(object): + def __init__(self, decompressor, source, read_size, read_across_frames): + self._decompressor = decompressor + self._source = source + self._read_size = read_size + self._read_across_frames = bool(read_across_frames) + self._entered = False + self._closed = False + self._bytes_decompressed = 0 + self._finished_input = False + self._finished_output = False + self._in_buffer = ffi.new("ZSTD_inBuffer *") + # Holds a ref to self._in_buffer.src. + self._source_buffer = None + + def __enter__(self): + if self._entered: + raise ValueError("cannot __enter__ multiple times") + + self._entered = True + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self._entered = False + self._closed = True + self._source = None + self._decompressor = None + + return False + + def readable(self): + return True + + def writable(self): + return False + + def seekable(self): + return True + + def readline(self): + raise io.UnsupportedOperation() + + def readlines(self): + raise io.UnsupportedOperation() + + def write(self, data): + raise io.UnsupportedOperation() + + def writelines(self, lines): + raise io.UnsupportedOperation() + + def isatty(self): + return False + + def flush(self): + return None + + def close(self): + self._closed = True + return None + + @property + def closed(self): + return self._closed + + def tell(self): + return self._bytes_decompressed + + def readall(self): + chunks = [] + + while True: + chunk = self.read(1048576) + if not chunk: + break + + chunks.append(chunk) + + return b"".join(chunks) + + def __iter__(self): + raise io.UnsupportedOperation() + + def __next__(self): + raise io.UnsupportedOperation() + + next = __next__ + + def _read_input(self): + # We have data left over in the input buffer. Use it. + if self._in_buffer.pos < self._in_buffer.size: + return + + # All input data exhausted. Nothing to do. + if self._finished_input: + return + + # Else populate the input buffer from our source. + if hasattr(self._source, "read"): + data = self._source.read(self._read_size) + + if not data: + self._finished_input = True + return + + self._source_buffer = ffi.from_buffer(data) + self._in_buffer.src = self._source_buffer + self._in_buffer.size = len(self._source_buffer) + self._in_buffer.pos = 0 + else: + self._source_buffer = ffi.from_buffer(self._source) + self._in_buffer.src = self._source_buffer + self._in_buffer.size = len(self._source_buffer) + self._in_buffer.pos = 0 + + def _decompress_into_buffer(self, out_buffer): + """Decompress available input into an output buffer. + + Returns True if data in output buffer should be emitted. + """ + zresult = lib.ZSTD_decompressStream( + self._decompressor._dctx, out_buffer, self._in_buffer + ) + + if self._in_buffer.pos == self._in_buffer.size: + self._in_buffer.src = ffi.NULL + self._in_buffer.pos = 0 + self._in_buffer.size = 0 + self._source_buffer = None + + if not hasattr(self._source, "read"): + self._finished_input = True + + if lib.ZSTD_isError(zresult): + raise ZstdError("zstd decompress error: %s" % _zstd_error(zresult)) + + # Emit data if there is data AND either: + # a) output buffer is full (read amount is satisfied) + # b) we're at end of a frame and not in frame spanning mode + return out_buffer.pos and ( + out_buffer.pos == out_buffer.size + or zresult == 0 + and not self._read_across_frames + ) + + def read(self, size=-1): + if self._closed: + raise ValueError("stream is closed") + + if size < -1: + raise ValueError("cannot read negative amounts less than -1") + + if size == -1: + # This is recursive. But it gets the job done. + return self.readall() + + if self._finished_output or size == 0: + return b"" + + # We /could/ call into readinto() here. But that introduces more + # overhead. + dst_buffer = ffi.new("char[]", size) + out_buffer = ffi.new("ZSTD_outBuffer *") + out_buffer.dst = dst_buffer + out_buffer.size = size + out_buffer.pos = 0 + + self._read_input() + if self._decompress_into_buffer(out_buffer): + self._bytes_decompressed += out_buffer.pos + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + while not self._finished_input: + self._read_input() + if self._decompress_into_buffer(out_buffer): + self._bytes_decompressed += out_buffer.pos + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + self._bytes_decompressed += out_buffer.pos + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + def readinto(self, b): + if self._closed: + raise ValueError("stream is closed") + + if self._finished_output: + return 0 + + # TODO use writable=True once we require CFFI >= 1.12. + dest_buffer = ffi.from_buffer(b) + ffi.memmove(b, b"", 0) + out_buffer = ffi.new("ZSTD_outBuffer *") + out_buffer.dst = dest_buffer + out_buffer.size = len(dest_buffer) + out_buffer.pos = 0 + + self._read_input() + if self._decompress_into_buffer(out_buffer): + self._bytes_decompressed += out_buffer.pos + return out_buffer.pos + + while not self._finished_input: + self._read_input() + if self._decompress_into_buffer(out_buffer): + self._bytes_decompressed += out_buffer.pos + return out_buffer.pos + + self._bytes_decompressed += out_buffer.pos + return out_buffer.pos + + def read1(self, size=-1): + if self._closed: + raise ValueError("stream is closed") + + if size < -1: + raise ValueError("cannot read negative amounts less than -1") + + if self._finished_output or size == 0: + return b"" + + # -1 returns arbitrary number of bytes. + if size == -1: + size = DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE + + dst_buffer = ffi.new("char[]", size) + out_buffer = ffi.new("ZSTD_outBuffer *") + out_buffer.dst = dst_buffer + out_buffer.size = size + out_buffer.pos = 0 + + # read1() dictates that we can perform at most 1 call to underlying + # stream to get input. However, we can't satisfy this restriction with + # decompression because not all input generates output. So we allow + # multiple read(). But unlike read(), we stop once we have any output. + while not self._finished_input: + self._read_input() + self._decompress_into_buffer(out_buffer) + + if out_buffer.pos: + break + + self._bytes_decompressed += out_buffer.pos + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + def readinto1(self, b): + if self._closed: + raise ValueError("stream is closed") + + if self._finished_output: + return 0 + + # TODO use writable=True once we require CFFI >= 1.12. + dest_buffer = ffi.from_buffer(b) + ffi.memmove(b, b"", 0) + + out_buffer = ffi.new("ZSTD_outBuffer *") + out_buffer.dst = dest_buffer + out_buffer.size = len(dest_buffer) + out_buffer.pos = 0 + + while not self._finished_input and not self._finished_output: + self._read_input() + self._decompress_into_buffer(out_buffer) + + if out_buffer.pos: + break + + self._bytes_decompressed += out_buffer.pos + return out_buffer.pos + + def seek(self, pos, whence=os.SEEK_SET): + if self._closed: + raise ValueError("stream is closed") + + read_amount = 0 + + if whence == os.SEEK_SET: + if pos < 0: + raise ValueError( + "cannot seek to negative position with SEEK_SET" + ) + + if pos < self._bytes_decompressed: + raise ValueError( + "cannot seek zstd decompression stream " "backwards" + ) + + read_amount = pos - self._bytes_decompressed + + elif whence == os.SEEK_CUR: + if pos < 0: + raise ValueError( + "cannot seek zstd decompression stream " "backwards" + ) + + read_amount = pos + elif whence == os.SEEK_END: + raise ValueError( + "zstd decompression streams cannot be seeked " "with SEEK_END" + ) + + while read_amount: + result = self.read( + min(read_amount, DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE) + ) + + if not result: + break + + read_amount -= len(result) + + return self._bytes_decompressed + + +class ZstdDecompressionWriter(object): + def __init__(self, decompressor, writer, write_size, write_return_read): + decompressor._ensure_dctx() + + self._decompressor = decompressor + self._writer = writer + self._write_size = write_size + self._write_return_read = bool(write_return_read) + self._entered = False + self._closed = False + + def __enter__(self): + if self._closed: + raise ValueError("stream is closed") + + if self._entered: + raise ZstdError("cannot __enter__ multiple times") + + self._entered = True + + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self._entered = False + self.close() + + def memory_size(self): + return lib.ZSTD_sizeof_DCtx(self._decompressor._dctx) + + def close(self): + if self._closed: + return + + try: + self.flush() + finally: + self._closed = True + + f = getattr(self._writer, "close", None) + if f: + f() + + @property + def closed(self): + return self._closed + + def fileno(self): + f = getattr(self._writer, "fileno", None) + if f: + return f() + else: + raise OSError("fileno not available on underlying writer") + + def flush(self): + if self._closed: + raise ValueError("stream is closed") + + f = getattr(self._writer, "flush", None) + if f: + return f() + + def isatty(self): + return False + + def readable(self): + return False + + def readline(self, size=-1): + raise io.UnsupportedOperation() + + def readlines(self, hint=-1): + raise io.UnsupportedOperation() + + def seek(self, offset, whence=None): + raise io.UnsupportedOperation() + + def seekable(self): + return False + + def tell(self): + raise io.UnsupportedOperation() + + def truncate(self, size=None): + raise io.UnsupportedOperation() + + def writable(self): + return True + + def writelines(self, lines): + raise io.UnsupportedOperation() + + def read(self, size=-1): + raise io.UnsupportedOperation() + + def readall(self): + raise io.UnsupportedOperation() + + def readinto(self, b): + raise io.UnsupportedOperation() + + def write(self, data): + if self._closed: + raise ValueError("stream is closed") + + total_write = 0 + + in_buffer = ffi.new("ZSTD_inBuffer *") + out_buffer = ffi.new("ZSTD_outBuffer *") + + data_buffer = ffi.from_buffer(data) + in_buffer.src = data_buffer + in_buffer.size = len(data_buffer) + in_buffer.pos = 0 + + dst_buffer = ffi.new("char[]", self._write_size) + out_buffer.dst = dst_buffer + out_buffer.size = len(dst_buffer) + out_buffer.pos = 0 + + dctx = self._decompressor._dctx + + while in_buffer.pos < in_buffer.size: + zresult = lib.ZSTD_decompressStream(dctx, out_buffer, in_buffer) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "zstd decompress error: %s" % _zstd_error(zresult) + ) + + if out_buffer.pos: + self._writer.write( + ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + ) + total_write += out_buffer.pos + out_buffer.pos = 0 + + if self._write_return_read: + return in_buffer.pos + else: + return total_write + + +class ZstdDecompressor(object): + def __init__(self, dict_data=None, max_window_size=0, format=FORMAT_ZSTD1): + self._dict_data = dict_data + self._max_window_size = max_window_size + self._format = format + + dctx = lib.ZSTD_createDCtx() + if dctx == ffi.NULL: + raise MemoryError() + + self._dctx = dctx + + # Defer setting up garbage collection until full state is loaded so + # the memory size is more accurate. + try: + self._ensure_dctx() + finally: + self._dctx = ffi.gc( + dctx, lib.ZSTD_freeDCtx, size=lib.ZSTD_sizeof_DCtx(dctx) + ) + + def memory_size(self): + return lib.ZSTD_sizeof_DCtx(self._dctx) + + def decompress(self, data, max_output_size=0): + self._ensure_dctx() + + data_buffer = ffi.from_buffer(data) + + output_size = lib.ZSTD_getFrameContentSize( + data_buffer, len(data_buffer) + ) + + if output_size == lib.ZSTD_CONTENTSIZE_ERROR: + raise ZstdError("error determining content size from frame header") + elif output_size == 0: + return b"" + elif output_size == lib.ZSTD_CONTENTSIZE_UNKNOWN: + if not max_output_size: + raise ZstdError( + "could not determine content size in frame header" + ) + + result_buffer = ffi.new("char[]", max_output_size) + result_size = max_output_size + output_size = 0 + else: + result_buffer = ffi.new("char[]", output_size) + result_size = output_size + + out_buffer = ffi.new("ZSTD_outBuffer *") + out_buffer.dst = result_buffer + out_buffer.size = result_size + out_buffer.pos = 0 + + in_buffer = ffi.new("ZSTD_inBuffer *") + in_buffer.src = data_buffer + in_buffer.size = len(data_buffer) + in_buffer.pos = 0 + + zresult = lib.ZSTD_decompressStream(self._dctx, out_buffer, in_buffer) + if lib.ZSTD_isError(zresult): + raise ZstdError("decompression error: %s" % _zstd_error(zresult)) + elif zresult: + raise ZstdError( + "decompression error: did not decompress full frame" + ) + elif output_size and out_buffer.pos != output_size: + raise ZstdError( + "decompression error: decompressed %d bytes; expected %d" + % (zresult, output_size) + ) + + return ffi.buffer(result_buffer, out_buffer.pos)[:] + + def stream_reader( + self, + source, + read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE, + read_across_frames=False, + ): + self._ensure_dctx() + return ZstdDecompressionReader( + self, source, read_size, read_across_frames + ) + + def decompressobj(self, write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE): + if write_size < 1: + raise ValueError("write_size must be positive") + + self._ensure_dctx() + return ZstdDecompressionObj(self, write_size=write_size) + + def read_to_iter( + self, + reader, + read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE, + write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE, + skip_bytes=0, + ): + if skip_bytes >= read_size: + raise ValueError("skip_bytes must be smaller than read_size") + + if hasattr(reader, "read"): + have_read = True + elif hasattr(reader, "__getitem__"): + have_read = False + buffer_offset = 0 + size = len(reader) + else: + raise ValueError( + "must pass an object with a read() method or " + "conforms to buffer protocol" + ) + + if skip_bytes: + if have_read: + reader.read(skip_bytes) + else: + if skip_bytes > size: + raise ValueError("skip_bytes larger than first input chunk") + + buffer_offset = skip_bytes + + self._ensure_dctx() + + in_buffer = ffi.new("ZSTD_inBuffer *") + out_buffer = ffi.new("ZSTD_outBuffer *") + + dst_buffer = ffi.new("char[]", write_size) + out_buffer.dst = dst_buffer + out_buffer.size = len(dst_buffer) + out_buffer.pos = 0 + + while True: + assert out_buffer.pos == 0 + + if have_read: + read_result = reader.read(read_size) + else: + remaining = size - buffer_offset + slice_size = min(remaining, read_size) + read_result = reader[buffer_offset : buffer_offset + slice_size] + buffer_offset += slice_size + + # No new input. Break out of read loop. + if not read_result: + break + + # Feed all read data into decompressor and emit output until + # exhausted. + read_buffer = ffi.from_buffer(read_result) + in_buffer.src = read_buffer + in_buffer.size = len(read_buffer) + in_buffer.pos = 0 + + while in_buffer.pos < in_buffer.size: + assert out_buffer.pos == 0 + + zresult = lib.ZSTD_decompressStream( + self._dctx, out_buffer, in_buffer + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "zstd decompress error: %s" % _zstd_error(zresult) + ) + + if out_buffer.pos: + data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + out_buffer.pos = 0 + yield data + + if zresult == 0: + return + + # Repeat loop to collect more input data. + continue + + # If we get here, input is exhausted. + + read_from = read_to_iter + + def stream_writer( + self, + writer, + write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE, + write_return_read=False, + ): + if not hasattr(writer, "write"): + raise ValueError("must pass an object with a write() method") + + return ZstdDecompressionWriter( + self, writer, write_size, write_return_read + ) + + write_to = stream_writer + + def copy_stream( + self, + ifh, + ofh, + read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE, + write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE, + ): + if not hasattr(ifh, "read"): + raise ValueError("first argument must have a read() method") + if not hasattr(ofh, "write"): + raise ValueError("second argument must have a write() method") + + self._ensure_dctx() + + in_buffer = ffi.new("ZSTD_inBuffer *") + out_buffer = ffi.new("ZSTD_outBuffer *") + + dst_buffer = ffi.new("char[]", write_size) + out_buffer.dst = dst_buffer + out_buffer.size = write_size + out_buffer.pos = 0 + + total_read, total_write = 0, 0 + + # Read all available input. + while True: + data = ifh.read(read_size) + if not data: + break + + data_buffer = ffi.from_buffer(data) + total_read += len(data_buffer) + in_buffer.src = data_buffer + in_buffer.size = len(data_buffer) + in_buffer.pos = 0 + + # Flush all read data to output. + while in_buffer.pos < in_buffer.size: + zresult = lib.ZSTD_decompressStream( + self._dctx, out_buffer, in_buffer + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "zstd decompressor error: %s" % _zstd_error(zresult) + ) + + if out_buffer.pos: + ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) + total_write += out_buffer.pos + out_buffer.pos = 0 + + # Continue loop to keep reading. + + return total_read, total_write + + def decompress_content_dict_chain(self, frames): + if not isinstance(frames, list): + raise TypeError("argument must be a list") + + if not frames: + raise ValueError("empty input chain") + + # First chunk should not be using a dictionary. We handle it specially. + chunk = frames[0] + if not isinstance(chunk, bytes_type): + raise ValueError("chunk 0 must be bytes") + + # All chunks should be zstd frames and should have content size set. + chunk_buffer = ffi.from_buffer(chunk) + params = ffi.new("ZSTD_frameHeader *") + zresult = lib.ZSTD_getFrameHeader( + params, chunk_buffer, len(chunk_buffer) + ) + if lib.ZSTD_isError(zresult): + raise ValueError("chunk 0 is not a valid zstd frame") + elif zresult: + raise ValueError("chunk 0 is too small to contain a zstd frame") + + if params.frameContentSize == lib.ZSTD_CONTENTSIZE_UNKNOWN: + raise ValueError("chunk 0 missing content size in frame") + + self._ensure_dctx(load_dict=False) + + last_buffer = ffi.new("char[]", params.frameContentSize) + + out_buffer = ffi.new("ZSTD_outBuffer *") + out_buffer.dst = last_buffer + out_buffer.size = len(last_buffer) + out_buffer.pos = 0 + + in_buffer = ffi.new("ZSTD_inBuffer *") + in_buffer.src = chunk_buffer + in_buffer.size = len(chunk_buffer) + in_buffer.pos = 0 + + zresult = lib.ZSTD_decompressStream(self._dctx, out_buffer, in_buffer) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "could not decompress chunk 0: %s" % _zstd_error(zresult) + ) + elif zresult: + raise ZstdError("chunk 0 did not decompress full frame") + + # Special case of chain length of 1 + if len(frames) == 1: + return ffi.buffer(last_buffer, len(last_buffer))[:] + + i = 1 + while i < len(frames): + chunk = frames[i] + if not isinstance(chunk, bytes_type): + raise ValueError("chunk %d must be bytes" % i) + + chunk_buffer = ffi.from_buffer(chunk) + zresult = lib.ZSTD_getFrameHeader( + params, chunk_buffer, len(chunk_buffer) + ) + if lib.ZSTD_isError(zresult): + raise ValueError("chunk %d is not a valid zstd frame" % i) + elif zresult: + raise ValueError( + "chunk %d is too small to contain a zstd frame" % i + ) + + if params.frameContentSize == lib.ZSTD_CONTENTSIZE_UNKNOWN: + raise ValueError("chunk %d missing content size in frame" % i) + + dest_buffer = ffi.new("char[]", params.frameContentSize) + + out_buffer.dst = dest_buffer + out_buffer.size = len(dest_buffer) + out_buffer.pos = 0 + + in_buffer.src = chunk_buffer + in_buffer.size = len(chunk_buffer) + in_buffer.pos = 0 + + zresult = lib.ZSTD_decompressStream( + self._dctx, out_buffer, in_buffer + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "could not decompress chunk %d: %s" % _zstd_error(zresult) + ) + elif zresult: + raise ZstdError("chunk %d did not decompress full frame" % i) + + last_buffer = dest_buffer + i += 1 + + return ffi.buffer(last_buffer, len(last_buffer))[:] + + def _ensure_dctx(self, load_dict=True): + lib.ZSTD_DCtx_reset(self._dctx, lib.ZSTD_reset_session_only) + + if self._max_window_size: + zresult = lib.ZSTD_DCtx_setMaxWindowSize( + self._dctx, self._max_window_size + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "unable to set max window size: %s" % _zstd_error(zresult) + ) + + zresult = lib.ZSTD_DCtx_setFormat(self._dctx, self._format) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "unable to set decoding format: %s" % _zstd_error(zresult) + ) + + if self._dict_data and load_dict: + zresult = lib.ZSTD_DCtx_refDDict(self._dctx, self._dict_data._ddict) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "unable to reference prepared dictionary: %s" + % _zstd_error(zresult) + ) diff --git a/contrib/python/zstandard/py3/.yandex_meta/yamaker.yaml b/contrib/python/zstandard/py3/.yandex_meta/yamaker.yaml new file mode 100644 index 00000000000..cda8f312978 --- /dev/null +++ b/contrib/python/zstandard/py3/.yandex_meta/yamaker.yaml @@ -0,0 +1,8 @@ +requirements: + - contrib/libs/zstd +disable_includes: + - zstd.c +exclude_from_macros: + - zstandard/backend_cffi.py +copy: + - c-ext/* diff --git a/contrib/python/zstandard/py3/zstandard/backend_cffi.py b/contrib/python/zstandard/py3/zstandard/backend_cffi.py new file mode 100644 index 00000000000..80d6fc8b012 --- /dev/null +++ b/contrib/python/zstandard/py3/zstandard/backend_cffi.py @@ -0,0 +1,4479 @@ +# Copyright (c) 2016-present, Gregory Szorc +# All rights reserved. +# +# This software may be modified and distributed under the terms +# of the BSD license. See the LICENSE file for details. + +"""Python interface to the Zstandard (zstd) compression library.""" + +from __future__ import absolute_import, unicode_literals + +# This should match what the C extension exports. +__all__ = [ + "BufferSegment", + "BufferSegments", + "BufferWithSegments", + "BufferWithSegmentsCollection", + "ZstdCompressionChunker", + "ZstdCompressionDict", + "ZstdCompressionObj", + "ZstdCompressionParameters", + "ZstdCompressionReader", + "ZstdCompressionWriter", + "ZstdCompressor", + "ZstdDecompressionObj", + "ZstdDecompressionReader", + "ZstdDecompressionWriter", + "ZstdDecompressor", + "ZstdError", + "FrameParameters", + "backend_features", + "estimate_decompression_context_size", + "frame_content_size", + "frame_header_size", + "get_frame_parameters", + "train_dictionary", + # Constants. + "FLUSH_BLOCK", + "FLUSH_FRAME", + "COMPRESSOBJ_FLUSH_FINISH", + "COMPRESSOBJ_FLUSH_BLOCK", + "ZSTD_VERSION", + "FRAME_HEADER", + "CONTENTSIZE_UNKNOWN", + "CONTENTSIZE_ERROR", + "MAX_COMPRESSION_LEVEL", + "COMPRESSION_RECOMMENDED_INPUT_SIZE", + "COMPRESSION_RECOMMENDED_OUTPUT_SIZE", + "DECOMPRESSION_RECOMMENDED_INPUT_SIZE", + "DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE", + "MAGIC_NUMBER", + "BLOCKSIZELOG_MAX", + "BLOCKSIZE_MAX", + "WINDOWLOG_MIN", + "WINDOWLOG_MAX", + "CHAINLOG_MIN", + "CHAINLOG_MAX", + "HASHLOG_MIN", + "HASHLOG_MAX", + "MINMATCH_MIN", + "MINMATCH_MAX", + "SEARCHLOG_MIN", + "SEARCHLOG_MAX", + "SEARCHLENGTH_MIN", + "SEARCHLENGTH_MAX", + "TARGETLENGTH_MIN", + "TARGETLENGTH_MAX", + "LDM_MINMATCH_MIN", + "LDM_MINMATCH_MAX", + "LDM_BUCKETSIZELOG_MAX", + "STRATEGY_FAST", + "STRATEGY_DFAST", + "STRATEGY_GREEDY", + "STRATEGY_LAZY", + "STRATEGY_LAZY2", + "STRATEGY_BTLAZY2", + "STRATEGY_BTOPT", + "STRATEGY_BTULTRA", + "STRATEGY_BTULTRA2", + "DICT_TYPE_AUTO", + "DICT_TYPE_RAWCONTENT", + "DICT_TYPE_FULLDICT", + "FORMAT_ZSTD1", + "FORMAT_ZSTD1_MAGICLESS", +] + +import io +import os + +from ._cffi import ( # type: ignore + ffi, + lib, +) + + +backend_features = set() # type: ignore + +COMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_CStreamInSize() +COMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_CStreamOutSize() +DECOMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_DStreamInSize() +DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_DStreamOutSize() + +new_nonzero = ffi.new_allocator(should_clear_after_alloc=False) + + +MAX_COMPRESSION_LEVEL = lib.ZSTD_maxCLevel() +MAGIC_NUMBER = lib.ZSTD_MAGICNUMBER +FRAME_HEADER = b"\x28\xb5\x2f\xfd" +CONTENTSIZE_UNKNOWN = lib.ZSTD_CONTENTSIZE_UNKNOWN +CONTENTSIZE_ERROR = lib.ZSTD_CONTENTSIZE_ERROR +ZSTD_VERSION = ( + lib.ZSTD_VERSION_MAJOR, + lib.ZSTD_VERSION_MINOR, + lib.ZSTD_VERSION_RELEASE, +) + +BLOCKSIZELOG_MAX = lib.ZSTD_BLOCKSIZELOG_MAX +BLOCKSIZE_MAX = lib.ZSTD_BLOCKSIZE_MAX +WINDOWLOG_MIN = lib.ZSTD_WINDOWLOG_MIN +WINDOWLOG_MAX = lib.ZSTD_WINDOWLOG_MAX +CHAINLOG_MIN = lib.ZSTD_CHAINLOG_MIN +CHAINLOG_MAX = lib.ZSTD_CHAINLOG_MAX +HASHLOG_MIN = lib.ZSTD_HASHLOG_MIN +HASHLOG_MAX = lib.ZSTD_HASHLOG_MAX +MINMATCH_MIN = lib.ZSTD_MINMATCH_MIN +MINMATCH_MAX = lib.ZSTD_MINMATCH_MAX +SEARCHLOG_MIN = lib.ZSTD_SEARCHLOG_MIN +SEARCHLOG_MAX = lib.ZSTD_SEARCHLOG_MAX +SEARCHLENGTH_MIN = lib.ZSTD_MINMATCH_MIN +SEARCHLENGTH_MAX = lib.ZSTD_MINMATCH_MAX +TARGETLENGTH_MIN = lib.ZSTD_TARGETLENGTH_MIN +TARGETLENGTH_MAX = lib.ZSTD_TARGETLENGTH_MAX +LDM_MINMATCH_MIN = lib.ZSTD_LDM_MINMATCH_MIN +LDM_MINMATCH_MAX = lib.ZSTD_LDM_MINMATCH_MAX +LDM_BUCKETSIZELOG_MAX = lib.ZSTD_LDM_BUCKETSIZELOG_MAX + +STRATEGY_FAST = lib.ZSTD_fast +STRATEGY_DFAST = lib.ZSTD_dfast +STRATEGY_GREEDY = lib.ZSTD_greedy +STRATEGY_LAZY = lib.ZSTD_lazy +STRATEGY_LAZY2 = lib.ZSTD_lazy2 +STRATEGY_BTLAZY2 = lib.ZSTD_btlazy2 +STRATEGY_BTOPT = lib.ZSTD_btopt +STRATEGY_BTULTRA = lib.ZSTD_btultra +STRATEGY_BTULTRA2 = lib.ZSTD_btultra2 + +DICT_TYPE_AUTO = lib.ZSTD_dct_auto +DICT_TYPE_RAWCONTENT = lib.ZSTD_dct_rawContent +DICT_TYPE_FULLDICT = lib.ZSTD_dct_fullDict + +FORMAT_ZSTD1 = lib.ZSTD_f_zstd1 +FORMAT_ZSTD1_MAGICLESS = lib.ZSTD_f_zstd1_magicless + +FLUSH_BLOCK = 0 +FLUSH_FRAME = 1 + +COMPRESSOBJ_FLUSH_FINISH = 0 +COMPRESSOBJ_FLUSH_BLOCK = 1 + + +def _cpu_count(): + # os.cpu_count() was introducd in Python 3.4. + try: + return os.cpu_count() or 0 + except AttributeError: + pass + + # Linux. + try: + return os.sysconf("SC_NPROCESSORS_ONLN") + except (AttributeError, ValueError): + pass + + # TODO implement on other platforms. + return 0 + + +class BufferSegment: + """Represents a segment within a ``BufferWithSegments``. + + This type is essentially a reference to N bytes within a + ``BufferWithSegments``. + + The object conforms to the buffer protocol. + """ + + @property + def offset(self): + """The byte offset of this segment within its parent buffer.""" + raise NotImplementedError() + + def __len__(self): + """Obtain the length of the segment, in bytes.""" + raise NotImplementedError() + + def tobytes(self): + """Obtain bytes copy of this segment.""" + raise NotImplementedError() + + +class BufferSegments: + """Represents an array of ``(offset, length)`` integers. + + This type is effectively an index used by :py:class:`BufferWithSegments`. + + The array members are 64-bit unsigned integers using host/native bit order. + + Instances conform to the buffer protocol. + """ + + +class BufferWithSegments: + """A memory buffer containing N discrete items of known lengths. + + This type is essentially a fixed size memory address and an array + of 2-tuples of ``(offset, length)`` 64-bit unsigned native-endian + integers defining the byte offset and length of each segment within + the buffer. + + Instances behave like containers. + + Instances also conform to the buffer protocol. So a reference to the + backing bytes can be obtained via ``memoryview(o)``. A *copy* of the + backing bytes can be obtained via ``.tobytes()``. + + This type exists to facilitate operations against N>1 items without + the overhead of Python object creation and management. Used with + APIs like :py:meth:`ZstdDecompressor.multi_decompress_to_buffer`, it + is possible to decompress many objects in parallel without the GIL + held, leading to even better performance. + """ + + @property + def size(self): + """Total sizein bytes of the backing buffer.""" + raise NotImplementedError() + + def __len__(self): + raise NotImplementedError() + + def __getitem__(self, i): + """Obtains a segment within the buffer. + + The returned object references memory within this buffer. + + :param i: + Integer index of segment to retrieve. + :return: + :py:class:`BufferSegment` + """ + raise NotImplementedError() + + def segments(self): + """Obtain the array of ``(offset, length)`` segments in the buffer. + + :return: + :py:class:`BufferSegments` + """ + raise NotImplementedError() + + def tobytes(self): + """Obtain bytes copy of this instance.""" + raise NotImplementedError() + + +class BufferWithSegmentsCollection: + """A virtual spanning view over multiple BufferWithSegments. + + Instances are constructed from 1 or more :py:class:`BufferWithSegments` + instances. The resulting object behaves like an ordered sequence whose + members are the segments within each ``BufferWithSegments``. + + If the object is composed of 2 ``BufferWithSegments`` instances with the + first having 2 segments and the second have 3 segments, then ``b[0]`` + and ``b[1]`` access segments in the first object and ``b[2]``, ``b[3]``, + and ``b[4]`` access segments from the second. + """ + + def __len__(self): + """The number of segments within all ``BufferWithSegments``.""" + raise NotImplementedError() + + def __getitem__(self, i): + """Obtain the ``BufferSegment`` at an offset.""" + raise NotImplementedError() + + +class ZstdError(Exception): + pass + + +def _zstd_error(zresult): + # Resolves to bytes on Python 2 and 3. We use the string for formatting + # into error messages, which will be literal unicode. So convert it to + # unicode. + return ffi.string(lib.ZSTD_getErrorName(zresult)).decode("utf-8") + + +def _make_cctx_params(params): + res = lib.ZSTD_createCCtxParams() + if res == ffi.NULL: + raise MemoryError() + + res = ffi.gc(res, lib.ZSTD_freeCCtxParams) + + attrs = [ + (lib.ZSTD_c_format, params.format), + (lib.ZSTD_c_compressionLevel, params.compression_level), + (lib.ZSTD_c_windowLog, params.window_log), + (lib.ZSTD_c_hashLog, params.hash_log), + (lib.ZSTD_c_chainLog, params.chain_log), + (lib.ZSTD_c_searchLog, params.search_log), + (lib.ZSTD_c_minMatch, params.min_match), + (lib.ZSTD_c_targetLength, params.target_length), + (lib.ZSTD_c_strategy, params.strategy), + (lib.ZSTD_c_contentSizeFlag, params.write_content_size), + (lib.ZSTD_c_checksumFlag, params.write_checksum), + (lib.ZSTD_c_dictIDFlag, params.write_dict_id), + (lib.ZSTD_c_nbWorkers, params.threads), + (lib.ZSTD_c_jobSize, params.job_size), + (lib.ZSTD_c_overlapLog, params.overlap_log), + (lib.ZSTD_c_forceMaxWindow, params.force_max_window), + (lib.ZSTD_c_enableLongDistanceMatching, params.enable_ldm), + (lib.ZSTD_c_ldmHashLog, params.ldm_hash_log), + (lib.ZSTD_c_ldmMinMatch, params.ldm_min_match), + (lib.ZSTD_c_ldmBucketSizeLog, params.ldm_bucket_size_log), + (lib.ZSTD_c_ldmHashRateLog, params.ldm_hash_rate_log), + ] + + for param, value in attrs: + _set_compression_parameter(res, param, value) + + return res + + +class ZstdCompressionParameters(object): + """Low-level zstd compression parameters. + + This type represents a collection of parameters to control how zstd + compression is performed. + + Instances can be constructed from raw parameters or derived from a + base set of defaults specified from a compression level (recommended) + via :py:meth:`ZstdCompressionParameters.from_level`. + + >>> # Derive compression settings for compression level 7. + >>> params = zstandard.ZstdCompressionParameters.from_level(7) + + >>> # With an input size of 1MB + >>> params = zstandard.ZstdCompressionParameters.from_level(7, source_size=1048576) + + Using ``from_level()``, it is also possible to override individual compression + parameters or to define additional settings that aren't automatically derived. + e.g.: + + >>> params = zstandard.ZstdCompressionParameters.from_level(4, window_log=10) + >>> params = zstandard.ZstdCompressionParameters.from_level(5, threads=4) + + Or you can define low-level compression settings directly: + + >>> params = zstandard.ZstdCompressionParameters(window_log=12, enable_ldm=True) + + Once a ``ZstdCompressionParameters`` instance is obtained, it can be used to + configure a compressor: + + >>> cctx = zstandard.ZstdCompressor(compression_params=params) + + Some of these are very low-level settings. It may help to consult the official + zstandard documentation for their behavior. Look for the ``ZSTD_p_*`` constants + in ``zstd.h`` (https://github.com/facebook/zstd/blob/dev/lib/zstd.h). + """ + + @staticmethod + def from_level(level, source_size=0, dict_size=0, **kwargs): + """Create compression parameters from a compression level. + + :param level: + Integer compression level. + :param source_size: + Integer size in bytes of source to be compressed. + :param dict_size: + Integer size in bytes of compression dictionary to use. + :return: + :py:class:`ZstdCompressionParameters` + """ + params = lib.ZSTD_getCParams(level, source_size, dict_size) + + args = { + "window_log": "windowLog", + "chain_log": "chainLog", + "hash_log": "hashLog", + "search_log": "searchLog", + "min_match": "minMatch", + "target_length": "targetLength", + "strategy": "strategy", + } + + for arg, attr in args.items(): + if arg not in kwargs: + kwargs[arg] = getattr(params, attr) + + return ZstdCompressionParameters(**kwargs) + + def __init__( + self, + format=0, + compression_level=0, + window_log=0, + hash_log=0, + chain_log=0, + search_log=0, + min_match=0, + target_length=0, + strategy=-1, + write_content_size=1, + write_checksum=0, + write_dict_id=0, + job_size=0, + overlap_log=-1, + force_max_window=0, + enable_ldm=0, + ldm_hash_log=0, + ldm_min_match=0, + ldm_bucket_size_log=0, + ldm_hash_rate_log=-1, + threads=0, + ): + params = lib.ZSTD_createCCtxParams() + if params == ffi.NULL: + raise MemoryError() + + params = ffi.gc(params, lib.ZSTD_freeCCtxParams) + + self._params = params + + if threads < 0: + threads = _cpu_count() + + # We need to set ZSTD_c_nbWorkers before ZSTD_c_jobSize and ZSTD_c_overlapLog + # because setting ZSTD_c_nbWorkers resets the other parameters. + _set_compression_parameter(params, lib.ZSTD_c_nbWorkers, threads) + + _set_compression_parameter(params, lib.ZSTD_c_format, format) + _set_compression_parameter( + params, lib.ZSTD_c_compressionLevel, compression_level + ) + _set_compression_parameter(params, lib.ZSTD_c_windowLog, window_log) + _set_compression_parameter(params, lib.ZSTD_c_hashLog, hash_log) + _set_compression_parameter(params, lib.ZSTD_c_chainLog, chain_log) + _set_compression_parameter(params, lib.ZSTD_c_searchLog, search_log) + _set_compression_parameter(params, lib.ZSTD_c_minMatch, min_match) + _set_compression_parameter( + params, lib.ZSTD_c_targetLength, target_length + ) + + if strategy == -1: + strategy = 0 + + _set_compression_parameter(params, lib.ZSTD_c_strategy, strategy) + _set_compression_parameter( + params, lib.ZSTD_c_contentSizeFlag, write_content_size + ) + _set_compression_parameter( + params, lib.ZSTD_c_checksumFlag, write_checksum + ) + _set_compression_parameter(params, lib.ZSTD_c_dictIDFlag, write_dict_id) + _set_compression_parameter(params, lib.ZSTD_c_jobSize, job_size) + + if overlap_log == -1: + overlap_log = 0 + + _set_compression_parameter(params, lib.ZSTD_c_overlapLog, overlap_log) + _set_compression_parameter( + params, lib.ZSTD_c_forceMaxWindow, force_max_window + ) + _set_compression_parameter( + params, lib.ZSTD_c_enableLongDistanceMatching, enable_ldm + ) + _set_compression_parameter(params, lib.ZSTD_c_ldmHashLog, ldm_hash_log) + _set_compression_parameter( + params, lib.ZSTD_c_ldmMinMatch, ldm_min_match + ) + _set_compression_parameter( + params, lib.ZSTD_c_ldmBucketSizeLog, ldm_bucket_size_log + ) + + if ldm_hash_rate_log == -1: + ldm_hash_rate_log = 0 + + _set_compression_parameter( + params, lib.ZSTD_c_ldmHashRateLog, ldm_hash_rate_log + ) + + @property + def format(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_format) + + @property + def compression_level(self): + return _get_compression_parameter( + self._params, lib.ZSTD_c_compressionLevel + ) + + @property + def window_log(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_windowLog) + + @property + def hash_log(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_hashLog) + + @property + def chain_log(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_chainLog) + + @property + def search_log(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_searchLog) + + @property + def min_match(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_minMatch) + + @property + def target_length(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_targetLength) + + @property + def strategy(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_strategy) + + @property + def write_content_size(self): + return _get_compression_parameter( + self._params, lib.ZSTD_c_contentSizeFlag + ) + + @property + def write_checksum(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_checksumFlag) + + @property + def write_dict_id(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_dictIDFlag) + + @property + def job_size(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_jobSize) + + @property + def overlap_log(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_overlapLog) + + @property + def force_max_window(self): + return _get_compression_parameter( + self._params, lib.ZSTD_c_forceMaxWindow + ) + + @property + def enable_ldm(self): + return _get_compression_parameter( + self._params, lib.ZSTD_c_enableLongDistanceMatching + ) + + @property + def ldm_hash_log(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_ldmHashLog) + + @property + def ldm_min_match(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_ldmMinMatch) + + @property + def ldm_bucket_size_log(self): + return _get_compression_parameter( + self._params, lib.ZSTD_c_ldmBucketSizeLog + ) + + @property + def ldm_hash_rate_log(self): + return _get_compression_parameter( + self._params, lib.ZSTD_c_ldmHashRateLog + ) + + @property + def threads(self): + return _get_compression_parameter(self._params, lib.ZSTD_c_nbWorkers) + + def estimated_compression_context_size(self): + """Estimated size in bytes needed to compress with these parameters.""" + return lib.ZSTD_estimateCCtxSize_usingCCtxParams(self._params) + + +def estimate_decompression_context_size(): + """Estimate the memory size requirements for a decompressor instance. + + :return: + Integer number of bytes. + """ + return lib.ZSTD_estimateDCtxSize() + + +def _set_compression_parameter(params, param, value): + zresult = lib.ZSTD_CCtxParams_setParameter(params, param, value) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "unable to set compression context parameter: %s" + % _zstd_error(zresult) + ) + + +def _get_compression_parameter(params, param): + result = ffi.new("int *") + + zresult = lib.ZSTD_CCtxParams_getParameter(params, param, result) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "unable to get compression context parameter: %s" + % _zstd_error(zresult) + ) + + return result[0] + + +class ZstdCompressionWriter(object): + """Writable compressing stream wrapper. + + ``ZstdCompressionWriter`` is a write-only stream interface for writing + compressed data to another stream. + + This type conforms to the ``io.RawIOBase`` interface and should be usable + by any type that operates against a *file-object* (``typing.BinaryIO`` + in Python type hinting speak). Only methods that involve writing will do + useful things. + + As data is written to this stream (e.g. via ``write()``), that data + is sent to the compressor. As compressed data becomes available from + the compressor, it is sent to the underlying stream by calling its + ``write()`` method. + + Both ``write()`` and ``flush()`` return the number of bytes written to the + object's ``write()``. In many cases, small inputs do not accumulate enough + data to cause a write and ``write()`` will return ``0``. + + Calling ``close()`` will mark the stream as closed and subsequent I/O + operations will raise ``ValueError`` (per the documented behavior of + ``io.RawIOBase``). ``close()`` will also call ``close()`` on the underlying + stream if such a method exists and the instance was constructed with + ``closefd=True`` + + Instances are obtained by calling :py:meth:`ZstdCompressor.stream_writer`. + + Typically usage is as follows: + + >>> cctx = zstandard.ZstdCompressor(level=10) + >>> compressor = cctx.stream_writer(fh) + >>> compressor.write(b"chunk 0\\n") + >>> compressor.write(b"chunk 1\\n") + >>> compressor.flush() + >>> # Receiver will be able to decode ``chunk 0\\nchunk 1\\n`` at this point. + >>> # Receiver is also expecting more data in the zstd *frame*. + >>> + >>> compressor.write(b"chunk 2\\n") + >>> compressor.flush(zstandard.FLUSH_FRAME) + >>> # Receiver will be able to decode ``chunk 0\\nchunk 1\\nchunk 2``. + >>> # Receiver is expecting no more data, as the zstd frame is closed. + >>> # Any future calls to ``write()`` at this point will construct a new + >>> # zstd frame. + + Instances can be used as context managers. Exiting the context manager is + the equivalent of calling ``close()``, which is equivalent to calling + ``flush(zstandard.FLUSH_FRAME)``: + + >>> cctx = zstandard.ZstdCompressor(level=10) + >>> with cctx.stream_writer(fh) as compressor: + ... compressor.write(b'chunk 0') + ... compressor.write(b'chunk 1') + ... ... + + .. important:: + + If ``flush(FLUSH_FRAME)`` is not called, emitted data doesn't + constitute a full zstd *frame* and consumers of this data may complain + about malformed input. It is recommended to use instances as a context + manager to ensure *frames* are properly finished. + + If the size of the data being fed to this streaming compressor is known, + you can declare it before compression begins: + + >>> cctx = zstandard.ZstdCompressor() + >>> with cctx.stream_writer(fh, size=data_len) as compressor: + ... compressor.write(chunk0) + ... compressor.write(chunk1) + ... ... + + Declaring the size of the source data allows compression parameters to + be tuned. And if ``write_content_size`` is used, it also results in the + content size being written into the frame header of the output data. + + The size of chunks being ``write()`` to the destination can be specified: + + >>> cctx = zstandard.ZstdCompressor() + >>> with cctx.stream_writer(fh, write_size=32768) as compressor: + ... ... + + To see how much memory is being used by the streaming compressor: + + >>> cctx = zstandard.ZstdCompressor() + >>> with cctx.stream_writer(fh) as compressor: + ... ... + ... byte_size = compressor.memory_size() + + Thte total number of bytes written so far are exposed via ``tell()``: + + >>> cctx = zstandard.ZstdCompressor() + >>> with cctx.stream_writer(fh) as compressor: + ... ... + ... total_written = compressor.tell() + + ``stream_writer()`` accepts a ``write_return_read`` boolean argument to + control the return value of ``write()``. When ``False`` (the default), + ``write()`` returns the number of bytes that were ``write()``'en to the + underlying object. When ``True``, ``write()`` returns the number of bytes + read from the input that were subsequently written to the compressor. + ``True`` is the *proper* behavior for ``write()`` as specified by the + ``io.RawIOBase`` interface and will become the default value in a future + release. + """ + + def __init__( + self, + compressor, + writer, + source_size, + write_size, + write_return_read, + closefd=True, + ): + self._compressor = compressor + self._writer = writer + self._write_size = write_size + self._write_return_read = bool(write_return_read) + self._closefd = bool(closefd) + self._entered = False + self._closing = False + self._closed = False + self._bytes_compressed = 0 + + self._dst_buffer = ffi.new("char[]", write_size) + self._out_buffer = ffi.new("ZSTD_outBuffer *") + self._out_buffer.dst = self._dst_buffer + self._out_buffer.size = len(self._dst_buffer) + self._out_buffer.pos = 0 + + zresult = lib.ZSTD_CCtx_setPledgedSrcSize(compressor._cctx, source_size) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error setting source size: %s" % _zstd_error(zresult) + ) + + def __enter__(self): + if self._closed: + raise ValueError("stream is closed") + + if self._entered: + raise ZstdError("cannot __enter__ multiple times") + + self._entered = True + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self._entered = False + self.close() + self._compressor = None + + return False + + def __iter__(self): + raise io.UnsupportedOperation() + + def __next__(self): + raise io.UnsupportedOperation() + + def memory_size(self): + return lib.ZSTD_sizeof_CCtx(self._compressor._cctx) + + def fileno(self): + f = getattr(self._writer, "fileno", None) + if f: + return f() + else: + raise OSError("fileno not available on underlying writer") + + def close(self): + if self._closed: + return + + try: + self._closing = True + self.flush(FLUSH_FRAME) + finally: + self._closing = False + self._closed = True + + # Call close() on underlying stream as well. + f = getattr(self._writer, "close", None) + if self._closefd and f: + f() + + @property + def closed(self): + return self._closed + + def isatty(self): + return False + + def readable(self): + return False + + def readline(self, size=-1): + raise io.UnsupportedOperation() + + def readlines(self, hint=-1): + raise io.UnsupportedOperation() + + def seek(self, offset, whence=None): + raise io.UnsupportedOperation() + + def seekable(self): + return False + + def truncate(self, size=None): + raise io.UnsupportedOperation() + + def writable(self): + return True + + def writelines(self, lines): + raise NotImplementedError("writelines() is not yet implemented") + + def read(self, size=-1): + raise io.UnsupportedOperation() + + def readall(self): + raise io.UnsupportedOperation() + + def readinto(self, b): + raise io.UnsupportedOperation() + + def write(self, data): + """Send data to the compressor and possibly to the inner stream.""" + if self._closed: + raise ValueError("stream is closed") + + total_write = 0 + + data_buffer = ffi.from_buffer(data) + + in_buffer = ffi.new("ZSTD_inBuffer *") + in_buffer.src = data_buffer + in_buffer.size = len(data_buffer) + in_buffer.pos = 0 + + out_buffer = self._out_buffer + out_buffer.pos = 0 + + while in_buffer.pos < in_buffer.size: + zresult = lib.ZSTD_compressStream2( + self._compressor._cctx, + out_buffer, + in_buffer, + lib.ZSTD_e_continue, + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "zstd compress error: %s" % _zstd_error(zresult) + ) + + if out_buffer.pos: + self._writer.write( + ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + ) + total_write += out_buffer.pos + self._bytes_compressed += out_buffer.pos + out_buffer.pos = 0 + + if self._write_return_read: + return in_buffer.pos + else: + return total_write + + def flush(self, flush_mode=FLUSH_BLOCK): + """Evict data from compressor's internal state and write it to inner stream. + + Calling this method may result in 0 or more ``write()`` calls to the + inner stream. + + This method will also call ``flush()`` on the inner stream, if such a + method exists. + + :param flush_mode: + How to flush the zstd compressor. + + ``zstandard.FLUSH_BLOCK`` will flush data already sent to the + compressor but not emitted to the inner stream. The stream is still + writable after calling this. This is the default behavior. + + See documentation for other ``zstandard.FLUSH_*`` constants for more + flushing options. + :return: + Integer number of bytes written to the inner stream. + """ + + if flush_mode == FLUSH_BLOCK: + flush = lib.ZSTD_e_flush + elif flush_mode == FLUSH_FRAME: + flush = lib.ZSTD_e_end + else: + raise ValueError("unknown flush_mode: %r" % flush_mode) + + if self._closed: + raise ValueError("stream is closed") + + total_write = 0 + + out_buffer = self._out_buffer + out_buffer.pos = 0 + + in_buffer = ffi.new("ZSTD_inBuffer *") + in_buffer.src = ffi.NULL + in_buffer.size = 0 + in_buffer.pos = 0 + + while True: + zresult = lib.ZSTD_compressStream2( + self._compressor._cctx, out_buffer, in_buffer, flush + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "zstd compress error: %s" % _zstd_error(zresult) + ) + + if out_buffer.pos: + self._writer.write( + ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + ) + total_write += out_buffer.pos + self._bytes_compressed += out_buffer.pos + out_buffer.pos = 0 + + if not zresult: + break + + f = getattr(self._writer, "flush", None) + if f and not self._closing: + f() + + return total_write + + def tell(self): + return self._bytes_compressed + + +class ZstdCompressionObj(object): + """A compressor conforming to the API in Python's standard library. + + This type implements an API similar to compression types in Python's + standard library such as ``zlib.compressobj`` and ``bz2.BZ2Compressor``. + This enables existing code targeting the standard library API to swap + in this type to achieve zstd compression. + + .. important:: + + The design of this API is not ideal for optimal performance. + + The reason performance is not optimal is because the API is limited to + returning a single buffer holding compressed data. When compressing + data, we don't know how much data will be emitted. So in order to + capture all this data in a single buffer, we need to perform buffer + reallocations and/or extra memory copies. This can add significant + overhead depending on the size or nature of the compressed data how + much your application calls this type. + + If performance is critical, consider an API like + :py:meth:`ZstdCompressor.stream_reader`, + :py:meth:`ZstdCompressor.stream_writer`, + :py:meth:`ZstdCompressor.chunker`, or + :py:meth:`ZstdCompressor.read_to_iter`, which result in less overhead + managing buffers. + + Instances are obtained by calling :py:meth:`ZstdCompressor.compressobj`. + + Here is how this API should be used: + + >>> cctx = zstandard.ZstdCompressor() + >>> cobj = cctx.compressobj() + >>> data = cobj.compress(b"raw input 0") + >>> data = cobj.compress(b"raw input 1") + >>> data = cobj.flush() + + Or to flush blocks: + + >>> cctx.zstandard.ZstdCompressor() + >>> cobj = cctx.compressobj() + >>> data = cobj.compress(b"chunk in first block") + >>> data = cobj.flush(zstandard.COMPRESSOBJ_FLUSH_BLOCK) + >>> data = cobj.compress(b"chunk in second block") + >>> data = cobj.flush() + + For best performance results, keep input chunks under 256KB. This avoids + extra allocations for a large output object. + + It is possible to declare the input size of the data that will be fed + into the compressor: + + >>> cctx = zstandard.ZstdCompressor() + >>> cobj = cctx.compressobj(size=6) + >>> data = cobj.compress(b"foobar") + >>> data = cobj.flush() + """ + + def __init__( + self, compressor, write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE + ): + self._compressor = compressor + self._out = ffi.new("ZSTD_outBuffer *") + self._dst_buffer = ffi.new("char[]", write_size) + self._out.dst = self._dst_buffer + self._out.size = write_size + self._out.pos = 0 + self._finished = False + + def compress(self, data): + """Send data to the compressor. + + This method receives bytes to feed to the compressor and returns + bytes constituting zstd compressed data. + + The zstd compressor accumulates bytes and the returned bytes may be + substantially smaller or larger than the size of the input data on + any given call. The returned value may be the empty byte string + (``b""``). + + :param data: + Data to write to the compressor. + :return: + Compressed data. + """ + if self._finished: + raise ZstdError("cannot call compress() after compressor finished") + + data_buffer = ffi.from_buffer(data) + source = ffi.new("ZSTD_inBuffer *") + source.src = data_buffer + source.size = len(data_buffer) + source.pos = 0 + + chunks = [] + + while source.pos < len(data): + zresult = lib.ZSTD_compressStream2( + self._compressor._cctx, self._out, source, lib.ZSTD_e_continue + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "zstd compress error: %s" % _zstd_error(zresult) + ) + + if self._out.pos: + chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:]) + self._out.pos = 0 + + return b"".join(chunks) + + def flush(self, flush_mode=COMPRESSOBJ_FLUSH_FINISH): + """Emit data accumulated in the compressor that hasn't been outputted yet. + + The ``flush_mode`` argument controls how to end the stream. + + ``zstandard.COMPRESSOBJ_FLUSH_FINISH`` (the default) ends the + compression stream and finishes a zstd frame. Once this type of flush + is performed, ``compress()`` and ``flush()`` can no longer be called. + This type of flush **must** be called to end the compression context. If + not called, the emitted data may be incomplete and may not be readable + by a decompressor. + + ``zstandard.COMPRESSOBJ_FLUSH_BLOCK`` will flush a zstd block. This + ensures that all data fed to this instance will have been omitted and + can be decoded by a decompressor. Flushes of this type can be performed + multiple times. The next call to ``compress()`` will begin a new zstd + block. + + :param flush_mode: + How to flush the zstd compressor. + :return: + Compressed data. + """ + if flush_mode not in ( + COMPRESSOBJ_FLUSH_FINISH, + COMPRESSOBJ_FLUSH_BLOCK, + ): + raise ValueError("flush mode not recognized") + + if self._finished: + raise ZstdError("compressor object already finished") + + if flush_mode == COMPRESSOBJ_FLUSH_BLOCK: + z_flush_mode = lib.ZSTD_e_flush + elif flush_mode == COMPRESSOBJ_FLUSH_FINISH: + z_flush_mode = lib.ZSTD_e_end + self._finished = True + else: + raise ZstdError("unhandled flush mode") + + assert self._out.pos == 0 + + in_buffer = ffi.new("ZSTD_inBuffer *") + in_buffer.src = ffi.NULL + in_buffer.size = 0 + in_buffer.pos = 0 + + chunks = [] + + while True: + zresult = lib.ZSTD_compressStream2( + self._compressor._cctx, self._out, in_buffer, z_flush_mode + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error ending compression stream: %s" % _zstd_error(zresult) + ) + + if self._out.pos: + chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:]) + self._out.pos = 0 + + if not zresult: + break + + return b"".join(chunks) + + +class ZstdCompressionChunker(object): + """Compress data to uniformly sized chunks. + + This type allows you to iteratively feed chunks of data into a compressor + and produce output chunks of uniform size. + + ``compress()``, ``flush()``, and ``finish()`` all return an iterator of + ``bytes`` instances holding compressed data. The iterator may be empty. + Callers MUST iterate through all elements of the returned iterator before + performing another operation on the object or else the compressor's + internal state may become confused. This can result in an exception being + raised or malformed data being emitted. + + All chunks emitted by ``compress()`` will have a length of the configured + chunk size. + + ``flush()`` and ``finish()`` may return a final chunk smaller than + the configured chunk size. + + Instances are obtained by calling :py:meth:`ZstdCompressor.chunker`. + + Here is how the API should be used: + + >>> cctx = zstandard.ZstdCompressor() + >>> chunker = cctx.chunker(chunk_size=32768) + >>> + >>> with open(path, 'rb') as fh: + ... while True: + ... in_chunk = fh.read(32768) + ... if not in_chunk: + ... break + ... + ... for out_chunk in chunker.compress(in_chunk): + ... # Do something with output chunk of size 32768. + ... + ... for out_chunk in chunker.finish(): + ... # Do something with output chunks that finalize the zstd frame. + + This compressor type is often a better alternative to + :py:class:`ZstdCompressor.compressobj` because it has better performance + properties. + + ``compressobj()`` will emit output data as it is available. This results + in a *stream* of output chunks of varying sizes. The consistency of the + output chunk size with ``chunker()`` is more appropriate for many usages, + such as sending compressed data to a socket. + + ``compressobj()`` may also perform extra memory reallocations in order + to dynamically adjust the sizes of the output chunks. Since ``chunker()`` + output chunks are all the same size (except for flushed or final chunks), + there is less memory allocation/copying overhead. + """ + + def __init__(self, compressor, chunk_size): + self._compressor = compressor + self._out = ffi.new("ZSTD_outBuffer *") + self._dst_buffer = ffi.new("char[]", chunk_size) + self._out.dst = self._dst_buffer + self._out.size = chunk_size + self._out.pos = 0 + + self._in = ffi.new("ZSTD_inBuffer *") + self._in.src = ffi.NULL + self._in.size = 0 + self._in.pos = 0 + self._finished = False + + def compress(self, data): + """Feed new input data into the compressor. + + :param data: + Data to feed to compressor. + :return: + Iterator of ``bytes`` representing chunks of compressed data. + """ + if self._finished: + raise ZstdError("cannot call compress() after compression finished") + + if self._in.src != ffi.NULL: + raise ZstdError( + "cannot perform operation before consuming output " + "from previous operation" + ) + + data_buffer = ffi.from_buffer(data) + + if not len(data_buffer): + return + + self._in.src = data_buffer + self._in.size = len(data_buffer) + self._in.pos = 0 + + while self._in.pos < self._in.size: + zresult = lib.ZSTD_compressStream2( + self._compressor._cctx, self._out, self._in, lib.ZSTD_e_continue + ) + + if self._in.pos == self._in.size: + self._in.src = ffi.NULL + self._in.size = 0 + self._in.pos = 0 + + if lib.ZSTD_isError(zresult): + raise ZstdError( + "zstd compress error: %s" % _zstd_error(zresult) + ) + + if self._out.pos == self._out.size: + yield ffi.buffer(self._out.dst, self._out.pos)[:] + self._out.pos = 0 + + def flush(self): + """Flushes all data currently in the compressor. + + :return: + Iterator of ``bytes`` of compressed data. + """ + if self._finished: + raise ZstdError("cannot call flush() after compression finished") + + if self._in.src != ffi.NULL: + raise ZstdError( + "cannot call flush() before consuming output from " + "previous operation" + ) + + while True: + zresult = lib.ZSTD_compressStream2( + self._compressor._cctx, self._out, self._in, lib.ZSTD_e_flush + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "zstd compress error: %s" % _zstd_error(zresult) + ) + + if self._out.pos: + yield ffi.buffer(self._out.dst, self._out.pos)[:] + self._out.pos = 0 + + if not zresult: + return + + def finish(self): + """Signals the end of input data. + + No new data can be compressed after this method is called. + + This method will flush buffered data and finish the zstd frame. + + :return: + Iterator of ``bytes`` of compressed data. + """ + if self._finished: + raise ZstdError("cannot call finish() after compression finished") + + if self._in.src != ffi.NULL: + raise ZstdError( + "cannot call finish() before consuming output from " + "previous operation" + ) + + while True: + zresult = lib.ZSTD_compressStream2( + self._compressor._cctx, self._out, self._in, lib.ZSTD_e_end + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "zstd compress error: %s" % _zstd_error(zresult) + ) + + if self._out.pos: + yield ffi.buffer(self._out.dst, self._out.pos)[:] + self._out.pos = 0 + + if not zresult: + self._finished = True + return + + +class ZstdCompressionReader(object): + """Readable compressing stream wrapper. + + ``ZstdCompressionReader`` is a read-only stream interface for obtaining + compressed data from a source. + + This type conforms to the ``io.RawIOBase`` interface and should be usable + by any type that operates against a *file-object* (``typing.BinaryIO`` + in Python type hinting speak). + + Instances are neither writable nor seekable (even if the underlying + source is seekable). ``readline()`` and ``readlines()`` are not implemented + because they don't make sense for compressed data. ``tell()`` returns the + number of compressed bytes emitted so far. + + Instances are obtained by calling :py:meth:`ZstdCompressor.stream_reader`. + + In this example, we open a file for reading and then wrap that file + handle with a stream from which compressed data can be ``read()``. + + >>> with open(path, 'rb') as fh: + ... cctx = zstandard.ZstdCompressor() + ... reader = cctx.stream_reader(fh) + ... while True: + ... chunk = reader.read(16384) + ... if not chunk: + ... break + ... + ... # Do something with compressed chunk. + + Instances can also be used as context managers: + + >>> with open(path, 'rb') as fh: + ... cctx = zstandard.ZstdCompressor() + ... with cctx.stream_reader(fh) as reader: + ... while True: + ... chunk = reader.read(16384) + ... if not chunk: + ... break + ... + ... # Do something with compressed chunk. + + When the context manager exits or ``close()`` is called, the stream is + closed, underlying resources are released, and future operations against + the compression stream will fail. + + ``stream_reader()`` accepts a ``size`` argument specifying how large the + input stream is. This is used to adjust compression parameters so they are + tailored to the source size. e.g. + + >>> with open(path, 'rb') as fh: + ... cctx = zstandard.ZstdCompressor() + ... with cctx.stream_reader(fh, size=os.stat(path).st_size) as reader: + ... ... + + If the ``source`` is a stream, you can specify how large ``read()`` + requests to that stream should be via the ``read_size`` argument. + It defaults to ``zstandard.COMPRESSION_RECOMMENDED_INPUT_SIZE``. e.g. + + >>> with open(path, 'rb') as fh: + ... cctx = zstandard.ZstdCompressor() + ... # Will perform fh.read(8192) when obtaining data to feed into the + ... # compressor. + ... with cctx.stream_reader(fh, read_size=8192) as reader: + ... ... + """ + + def __init__(self, compressor, source, read_size, closefd=True): + self._compressor = compressor + self._source = source + self._read_size = read_size + self._closefd = closefd + self._entered = False + self._closed = False + self._bytes_compressed = 0 + self._finished_input = False + self._finished_output = False + + self._in_buffer = ffi.new("ZSTD_inBuffer *") + # Holds a ref so backing bytes in self._in_buffer stay alive. + self._source_buffer = None + + def __enter__(self): + if self._entered: + raise ValueError("cannot __enter__ multiple times") + + if self._closed: + raise ValueError("stream is closed") + + self._entered = True + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self._entered = False + self._compressor = None + self.close() + self._source = None + + return False + + def readable(self): + return True + + def writable(self): + return False + + def seekable(self): + return False + + def readline(self): + raise io.UnsupportedOperation() + + def readlines(self): + raise io.UnsupportedOperation() + + def write(self, data): + raise OSError("stream is not writable") + + def writelines(self, ignored): + raise OSError("stream is not writable") + + def isatty(self): + return False + + def flush(self): + return None + + def close(self): + if self._closed: + return + + self._closed = True + + f = getattr(self._source, "close", None) + if self._closefd and f: + f() + + @property + def closed(self): + return self._closed + + def tell(self): + return self._bytes_compressed + + def readall(self): + chunks = [] + + while True: + chunk = self.read(1048576) + if not chunk: + break + + chunks.append(chunk) + + return b"".join(chunks) + + def __iter__(self): + raise io.UnsupportedOperation() + + def __next__(self): + raise io.UnsupportedOperation() + + next = __next__ + + def _read_input(self): + if self._finished_input: + return + + if hasattr(self._source, "read"): + data = self._source.read(self._read_size) + + if not data: + self._finished_input = True + return + + self._source_buffer = ffi.from_buffer(data) + self._in_buffer.src = self._source_buffer + self._in_buffer.size = len(self._source_buffer) + self._in_buffer.pos = 0 + else: + self._source_buffer = ffi.from_buffer(self._source) + self._in_buffer.src = self._source_buffer + self._in_buffer.size = len(self._source_buffer) + self._in_buffer.pos = 0 + + def _compress_into_buffer(self, out_buffer): + if self._in_buffer.pos >= self._in_buffer.size: + return + + old_pos = out_buffer.pos + + zresult = lib.ZSTD_compressStream2( + self._compressor._cctx, + out_buffer, + self._in_buffer, + lib.ZSTD_e_continue, + ) + + self._bytes_compressed += out_buffer.pos - old_pos + + if self._in_buffer.pos == self._in_buffer.size: + self._in_buffer.src = ffi.NULL + self._in_buffer.pos = 0 + self._in_buffer.size = 0 + self._source_buffer = None + + if not hasattr(self._source, "read"): + self._finished_input = True + + if lib.ZSTD_isError(zresult): + raise ZstdError("zstd compress error: %s", _zstd_error(zresult)) + + return out_buffer.pos and out_buffer.pos == out_buffer.size + + def read(self, size=-1): + if self._closed: + raise ValueError("stream is closed") + + if size < -1: + raise ValueError("cannot read negative amounts less than -1") + + if size == -1: + return self.readall() + + if self._finished_output or size == 0: + return b"" + + # Need a dedicated ref to dest buffer otherwise it gets collected. + dst_buffer = ffi.new("char[]", size) + out_buffer = ffi.new("ZSTD_outBuffer *") + out_buffer.dst = dst_buffer + out_buffer.size = size + out_buffer.pos = 0 + + if self._compress_into_buffer(out_buffer): + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + while not self._finished_input: + self._read_input() + + if self._compress_into_buffer(out_buffer): + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + # EOF + old_pos = out_buffer.pos + + zresult = lib.ZSTD_compressStream2( + self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_end + ) + + self._bytes_compressed += out_buffer.pos - old_pos + + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error ending compression stream: %s", _zstd_error(zresult) + ) + + if zresult == 0: + self._finished_output = True + + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + def read1(self, size=-1): + if self._closed: + raise ValueError("stream is closed") + + if size < -1: + raise ValueError("cannot read negative amounts less than -1") + + if self._finished_output or size == 0: + return b"" + + # -1 returns arbitrary number of bytes. + if size == -1: + size = COMPRESSION_RECOMMENDED_OUTPUT_SIZE + + dst_buffer = ffi.new("char[]", size) + out_buffer = ffi.new("ZSTD_outBuffer *") + out_buffer.dst = dst_buffer + out_buffer.size = size + out_buffer.pos = 0 + + # read1() dictates that we can perform at most 1 call to the + # underlying stream to get input. However, we can't satisfy this + # restriction with compression because not all input generates output. + # It is possible to perform a block flush in order to ensure output. + # But this may not be desirable behavior. So we allow multiple read() + # to the underlying stream. But unlike read(), we stop once we have + # any output. + + self._compress_into_buffer(out_buffer) + if out_buffer.pos: + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + while not self._finished_input: + self._read_input() + + # If we've filled the output buffer, return immediately. + if self._compress_into_buffer(out_buffer): + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + # If we've populated the output buffer and we're not at EOF, + # also return, as we've satisfied the read1() limits. + if out_buffer.pos and not self._finished_input: + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + # Else if we're at EOS and we have room left in the buffer, + # fall through to below and try to add more data to the output. + + # EOF. + old_pos = out_buffer.pos + + zresult = lib.ZSTD_compressStream2( + self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_end + ) + + self._bytes_compressed += out_buffer.pos - old_pos + + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error ending compression stream: %s" % _zstd_error(zresult) + ) + + if zresult == 0: + self._finished_output = True + + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + def readinto(self, b): + if self._closed: + raise ValueError("stream is closed") + + if self._finished_output: + return 0 + + # TODO use writable=True once we require CFFI >= 1.12. + dest_buffer = ffi.from_buffer(b) + ffi.memmove(b, b"", 0) + out_buffer = ffi.new("ZSTD_outBuffer *") + out_buffer.dst = dest_buffer + out_buffer.size = len(dest_buffer) + out_buffer.pos = 0 + + if self._compress_into_buffer(out_buffer): + return out_buffer.pos + + while not self._finished_input: + self._read_input() + if self._compress_into_buffer(out_buffer): + return out_buffer.pos + + # EOF. + old_pos = out_buffer.pos + zresult = lib.ZSTD_compressStream2( + self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_end + ) + + self._bytes_compressed += out_buffer.pos - old_pos + + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error ending compression stream: %s", _zstd_error(zresult) + ) + + if zresult == 0: + self._finished_output = True + + return out_buffer.pos + + def readinto1(self, b): + if self._closed: + raise ValueError("stream is closed") + + if self._finished_output: + return 0 + + # TODO use writable=True once we require CFFI >= 1.12. + dest_buffer = ffi.from_buffer(b) + ffi.memmove(b, b"", 0) + + out_buffer = ffi.new("ZSTD_outBuffer *") + out_buffer.dst = dest_buffer + out_buffer.size = len(dest_buffer) + out_buffer.pos = 0 + + self._compress_into_buffer(out_buffer) + if out_buffer.pos: + return out_buffer.pos + + while not self._finished_input: + self._read_input() + + if self._compress_into_buffer(out_buffer): + return out_buffer.pos + + if out_buffer.pos and not self._finished_input: + return out_buffer.pos + + # EOF. + old_pos = out_buffer.pos + + zresult = lib.ZSTD_compressStream2( + self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_end + ) + + self._bytes_compressed += out_buffer.pos - old_pos + + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error ending compression stream: %s" % _zstd_error(zresult) + ) + + if zresult == 0: + self._finished_output = True + + return out_buffer.pos + + +class ZstdCompressor(object): + """ + Create an object used to perform Zstandard compression. + + Each instance is essentially a wrapper around a ``ZSTD_CCtx`` from + zstd's C API. + + An instance can compress data various ways. Instances can be used + multiple times. Each compression operation will use the compression + parameters defined at construction time. + + .. note: + + When using a compression dictionary and multiple compression + operations are performed, the ``ZstdCompressionParameters`` derived + from an integer compression ``level`` and the first compressed data's + size will be reused for all subsequent operations. This may not be + desirable if source data sizes vary significantly. + + ``compression_params`` is mutually exclusive with ``level``, + ``write_checksum``, ``write_content_size``, ``write_dict_id``, and + ``threads``. + + Assume that each ``ZstdCompressor`` instance can only handle a single + logical compression operation at the same time. i.e. if you call a method + like ``stream_reader()`` to obtain multiple objects derived from the same + ``ZstdCompressor`` instance and attempt to use them simultaneously, errors + will likely occur. + + If you need to perform multiple logical compression operations and you + can't guarantee those operations are temporally non-overlapping, you need + to obtain multiple ``ZstdCompressor`` instances. + + Unless specified otherwise, assume that no two methods of + ``ZstdCompressor`` instances can be called from multiple Python + threads simultaneously. In other words, assume instances are not thread safe + unless stated otherwise. + + :param level: + Integer compression level. Valid values are all negative integers + through 22. Lower values generally yield faster operations with lower + compression ratios. Higher values are generally slower but compress + better. The default is 3, which is what the ``zstd`` CLI uses. Negative + levels effectively engage ``--fast`` mode from the ``zstd`` CLI. + :param dict_data: + A ``ZstdCompressionDict`` to be used to compress with dictionary + data. + :param compression_params: + A ``ZstdCompressionParameters`` instance defining low-level compression + parameters. If defined, this will overwrite the ``level`` argument. + :param write_checksum: + If True, a 4 byte content checksum will be written with the compressed + data, allowing the decompressor to perform content verification. + :param write_content_size: + If True (the default), the decompressed content size will be included + in the header of the compressed data. This data will only be written if + the compressor knows the size of the input data. + :param write_dict_id: + Determines whether the dictionary ID will be written into the compressed + data. Defaults to True. Only adds content to the compressed data if + a dictionary is being used. + :param threads: + Number of threads to use to compress data concurrently. When set, + compression operations are performed on multiple threads. The default + value (0) disables multi-threaded compression. A value of ``-1`` means + to set the number of threads to the number of detected logical CPUs. + """ + + def __init__( + self, + level=3, + dict_data=None, + compression_params=None, + write_checksum=None, + write_content_size=None, + write_dict_id=None, + threads=0, + ): + if level > lib.ZSTD_maxCLevel(): + raise ValueError( + "level must be less than %d" % lib.ZSTD_maxCLevel() + ) + + if threads < 0: + threads = _cpu_count() + + if compression_params and write_checksum is not None: + raise ValueError( + "cannot define compression_params and " "write_checksum" + ) + + if compression_params and write_content_size is not None: + raise ValueError( + "cannot define compression_params and " "write_content_size" + ) + + if compression_params and write_dict_id is not None: + raise ValueError( + "cannot define compression_params and " "write_dict_id" + ) + + if compression_params and threads: + raise ValueError("cannot define compression_params and threads") + + if compression_params: + self._params = _make_cctx_params(compression_params) + else: + if write_dict_id is None: + write_dict_id = True + + params = lib.ZSTD_createCCtxParams() + if params == ffi.NULL: + raise MemoryError() + + self._params = ffi.gc(params, lib.ZSTD_freeCCtxParams) + + _set_compression_parameter( + self._params, lib.ZSTD_c_compressionLevel, level + ) + + _set_compression_parameter( + self._params, + lib.ZSTD_c_contentSizeFlag, + write_content_size if write_content_size is not None else 1, + ) + + _set_compression_parameter( + self._params, + lib.ZSTD_c_checksumFlag, + 1 if write_checksum else 0, + ) + + _set_compression_parameter( + self._params, lib.ZSTD_c_dictIDFlag, 1 if write_dict_id else 0 + ) + + if threads: + _set_compression_parameter( + self._params, lib.ZSTD_c_nbWorkers, threads + ) + + cctx = lib.ZSTD_createCCtx() + if cctx == ffi.NULL: + raise MemoryError() + + self._cctx = cctx + self._dict_data = dict_data + + # We defer setting up garbage collection until after calling + # _setup_cctx() to ensure the memory size estimate is more accurate. + try: + self._setup_cctx() + finally: + self._cctx = ffi.gc( + cctx, lib.ZSTD_freeCCtx, size=lib.ZSTD_sizeof_CCtx(cctx) + ) + + def _setup_cctx(self): + zresult = lib.ZSTD_CCtx_setParametersUsingCCtxParams( + self._cctx, self._params + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "could not set compression parameters: %s" + % _zstd_error(zresult) + ) + + dict_data = self._dict_data + + if dict_data: + if dict_data._cdict: + zresult = lib.ZSTD_CCtx_refCDict(self._cctx, dict_data._cdict) + else: + zresult = lib.ZSTD_CCtx_loadDictionary_advanced( + self._cctx, + dict_data.as_bytes(), + len(dict_data), + lib.ZSTD_dlm_byRef, + dict_data._dict_type, + ) + + if lib.ZSTD_isError(zresult): + raise ZstdError( + "could not load compression dictionary: %s" + % _zstd_error(zresult) + ) + + def memory_size(self): + """Obtain the memory usage of this compressor, in bytes. + + >>> cctx = zstandard.ZstdCompressor() + >>> memory = cctx.memory_size() + """ + return lib.ZSTD_sizeof_CCtx(self._cctx) + + def compress(self, data): + """ + Compress data in a single operation. + + This is the simplest mechanism to perform compression: simply pass in a + value and get a compressed value back. It is almost the most prone to + abuse. + + The input and output values must fit in memory, so passing in very large + values can result in excessive memory usage. For this reason, one of the + streaming based APIs is preferred for larger values. + + :param data: + Source data to compress + :return: + Compressed data + + >>> cctx = zstandard.ZstdCompressor() + >>> compressed = cctx.compress(b"data to compress") + """ + lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) + + data_buffer = ffi.from_buffer(data) + + dest_size = lib.ZSTD_compressBound(len(data_buffer)) + out = new_nonzero("char[]", dest_size) + + zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, len(data_buffer)) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error setting source size: %s" % _zstd_error(zresult) + ) + + out_buffer = ffi.new("ZSTD_outBuffer *") + in_buffer = ffi.new("ZSTD_inBuffer *") + + out_buffer.dst = out + out_buffer.size = dest_size + out_buffer.pos = 0 + + in_buffer.src = data_buffer + in_buffer.size = len(data_buffer) + in_buffer.pos = 0 + + zresult = lib.ZSTD_compressStream2( + self._cctx, out_buffer, in_buffer, lib.ZSTD_e_end + ) + + if lib.ZSTD_isError(zresult): + raise ZstdError("cannot compress: %s" % _zstd_error(zresult)) + elif zresult: + raise ZstdError("unexpected partial frame flush") + + return ffi.buffer(out, out_buffer.pos)[:] + + def compressobj(self, size=-1): + """ + Obtain a compressor exposing the Python standard library compression API. + + See :py:class:`ZstdCompressionObj` for the full documentation. + + :param size: + Size in bytes of data that will be compressed. + :return: + :py:class:`ZstdCompressionObj` + """ + lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) + + if size < 0: + size = lib.ZSTD_CONTENTSIZE_UNKNOWN + + zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error setting source size: %s" % _zstd_error(zresult) + ) + + cobj = ZstdCompressionObj(self, COMPRESSION_RECOMMENDED_OUTPUT_SIZE) + + return cobj + + def chunker(self, size=-1, chunk_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE): + """ + Create an object for iterative compressing to same-sized chunks. + + This API is similar to :py:meth:`ZstdCompressor.compressobj` but has + better performance properties. + + :param size: + Size in bytes of data that will be compressed. + :param chunk_size: + Size of compressed chunks. + :return: + :py:class:`ZstdCompressionChunker` + """ + lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) + + if size < 0: + size = lib.ZSTD_CONTENTSIZE_UNKNOWN + + zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error setting source size: %s" % _zstd_error(zresult) + ) + + return ZstdCompressionChunker(self, chunk_size=chunk_size) + + def copy_stream( + self, + ifh, + ofh, + size=-1, + read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE, + write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE, + ): + """ + Copy data between 2 streams while compressing it. + + Data will be read from ``ifh``, compressed, and written to ``ofh``. + ``ifh`` must have a ``read(size)`` method. ``ofh`` must have a + ``write(data)`` + method. + + >>> cctx = zstandard.ZstdCompressor() + >>> with open(input_path, "rb") as ifh, open(output_path, "wb") as ofh: + ... cctx.copy_stream(ifh, ofh) + + It is also possible to declare the size of the source stream: + + >>> cctx = zstandard.ZstdCompressor() + >>> cctx.copy_stream(ifh, ofh, size=len_of_input) + + You can also specify how large the chunks that are ``read()`` + and ``write()`` from and to the streams: + + >>> cctx = zstandard.ZstdCompressor() + >>> cctx.copy_stream(ifh, ofh, read_size=32768, write_size=16384) + + The stream copier returns a 2-tuple of bytes read and written: + + >>> cctx = zstandard.ZstdCompressor() + >>> read_count, write_count = cctx.copy_stream(ifh, ofh) + + :param ifh: + Source stream to read from + :param ofh: + Destination stream to write to + :param size: + Size in bytes of the source stream. If defined, compression + parameters will be tuned for this size. + :param read_size: + Chunk sizes that source stream should be ``read()`` from. + :param write_size: + Chunk sizes that destination stream should be ``write()`` to. + :return: + 2-tuple of ints of bytes read and written, respectively. + """ + + if not hasattr(ifh, "read"): + raise ValueError("first argument must have a read() method") + if not hasattr(ofh, "write"): + raise ValueError("second argument must have a write() method") + + lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) + + if size < 0: + size = lib.ZSTD_CONTENTSIZE_UNKNOWN + + zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error setting source size: %s" % _zstd_error(zresult) + ) + + in_buffer = ffi.new("ZSTD_inBuffer *") + out_buffer = ffi.new("ZSTD_outBuffer *") + + dst_buffer = ffi.new("char[]", write_size) + out_buffer.dst = dst_buffer + out_buffer.size = write_size + out_buffer.pos = 0 + + total_read, total_write = 0, 0 + + while True: + data = ifh.read(read_size) + if not data: + break + + data_buffer = ffi.from_buffer(data) + total_read += len(data_buffer) + in_buffer.src = data_buffer + in_buffer.size = len(data_buffer) + in_buffer.pos = 0 + + while in_buffer.pos < in_buffer.size: + zresult = lib.ZSTD_compressStream2( + self._cctx, out_buffer, in_buffer, lib.ZSTD_e_continue + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "zstd compress error: %s" % _zstd_error(zresult) + ) + + if out_buffer.pos: + ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) + total_write += out_buffer.pos + out_buffer.pos = 0 + + # We've finished reading. Flush the compressor. + while True: + zresult = lib.ZSTD_compressStream2( + self._cctx, out_buffer, in_buffer, lib.ZSTD_e_end + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error ending compression stream: %s" % _zstd_error(zresult) + ) + + if out_buffer.pos: + ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) + total_write += out_buffer.pos + out_buffer.pos = 0 + + if zresult == 0: + break + + return total_read, total_write + + def stream_reader( + self, + source, + size=-1, + read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE, + closefd=True, + ): + """ + Wrap a readable source with a stream that can read compressed data. + + This will produce an object conforming to the ``io.RawIOBase`` + interface which can be ``read()`` from to retrieve compressed data + from a source. + + The source object can be any object with a ``read(size)`` method + or an object that conforms to the buffer protocol. + + See :py:class:`ZstdCompressionReader` for type documentation and usage + examples. + + :param source: + Object to read source data from + :param size: + Size in bytes of source object. + :param read_size: + How many bytes to request when ``read()``'ing from the source. + :param closefd: + Whether to close the source stream when the returned stream is + closed. + :return: + :py:class:`ZstdCompressionReader` + """ + lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) + + try: + size = len(source) + except Exception: + pass + + if size < 0: + size = lib.ZSTD_CONTENTSIZE_UNKNOWN + + zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error setting source size: %s" % _zstd_error(zresult) + ) + + return ZstdCompressionReader(self, source, read_size, closefd=closefd) + + def stream_writer( + self, + writer, + size=-1, + write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE, + write_return_read=True, + closefd=True, + ): + """ + Create a stream that will write compressed data into another stream. + + The argument to ``stream_writer()`` must have a ``write(data)`` method. + As compressed data is available, ``write()`` will be called with the + compressed data as its argument. Many common Python types implement + ``write()``, including open file handles and ``io.BytesIO``. + + See :py:class:`ZstdCompressionWriter` for more documentation, including + usage examples. + + :param writer: + Stream to write compressed data to. + :param size: + Size in bytes of data to be compressed. If set, it will be used + to influence compression parameter tuning and could result in the + size being written into the header of the compressed data. + :param write_size: + How much data to ``write()`` to ``writer`` at a time. + :param write_return_read: + Whether ``write()`` should return the number of bytes that were + consumed from the input. + :param closefd: + Whether to ``close`` the ``writer`` when this stream is closed. + :return: + :py:class:`ZstdCompressionWriter` + """ + if not hasattr(writer, "write"): + raise ValueError("must pass an object with a write() method") + + lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) + + if size < 0: + size = lib.ZSTD_CONTENTSIZE_UNKNOWN + + return ZstdCompressionWriter( + self, writer, size, write_size, write_return_read, closefd=closefd + ) + + def read_to_iter( + self, + reader, + size=-1, + read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE, + write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE, + ): + """ + Read uncompressed data from a reader and return an iterator + + Returns an iterator of compressed data produced from reading from + ``reader``. + + This method provides a mechanism to stream compressed data out of a + source as an iterator of data chunks. + + Uncompressed data will be obtained from ``reader`` by calling the + ``read(size)`` method of it or by reading a slice (if ``reader`` + conforms to the *buffer protocol*). The source data will be streamed + into a compressor. As compressed data is available, it will be exposed + to the iterator. + + Data is read from the source in chunks of ``read_size``. Compressed + chunks are at most ``write_size`` bytes. Both values default to the + zstd input and and output defaults, respectively. + + If reading from the source via ``read()``, ``read()`` will be called + until it raises or returns an empty bytes (``b""``). It is perfectly + valid for the source to deliver fewer bytes than were what requested + by ``read(size)``. + + The caller is partially in control of how fast data is fed into the + compressor by how it consumes the returned iterator. The compressor + will not consume from the reader unless the caller consumes from the + iterator. + + >>> cctx = zstandard.ZstdCompressor() + >>> for chunk in cctx.read_to_iter(fh): + ... # Do something with emitted data. + + ``read_to_iter()`` accepts a ``size`` argument declaring the size of + the input stream: + + >>> cctx = zstandard.ZstdCompressor() + >>> for chunk in cctx.read_to_iter(fh, size=some_int): + >>> pass + + You can also control the size that data is ``read()`` from the source + and the ideal size of output chunks: + + >>> cctx = zstandard.ZstdCompressor() + >>> for chunk in cctx.read_to_iter(fh, read_size=16384, write_size=8192): + >>> pass + + ``read_to_iter()`` does not give direct control over the sizes of chunks + fed into the compressor. Instead, chunk sizes will be whatever the object + being read from delivers. These will often be of a uniform size. + + :param reader: + Stream providing data to be compressed. + :param size: + Size in bytes of input data. + :param read_size: + Controls how many bytes are ``read()`` from the source. + :param write_size: + Controls the output size of emitted chunks. + :return: + Iterator of ``bytes``. + """ + + if hasattr(reader, "read"): + have_read = True + elif hasattr(reader, "__getitem__"): + have_read = False + buffer_offset = 0 + size = len(reader) + else: + raise ValueError( + "must pass an object with a read() method or " + "conforms to buffer protocol" + ) + + lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) + + if size < 0: + size = lib.ZSTD_CONTENTSIZE_UNKNOWN + + zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error setting source size: %s" % _zstd_error(zresult) + ) + + in_buffer = ffi.new("ZSTD_inBuffer *") + out_buffer = ffi.new("ZSTD_outBuffer *") + + in_buffer.src = ffi.NULL + in_buffer.size = 0 + in_buffer.pos = 0 + + dst_buffer = ffi.new("char[]", write_size) + out_buffer.dst = dst_buffer + out_buffer.size = write_size + out_buffer.pos = 0 + + while True: + # We should never have output data sitting around after a previous + # iteration. + assert out_buffer.pos == 0 + + # Collect input data. + if have_read: + read_result = reader.read(read_size) + else: + remaining = len(reader) - buffer_offset + slice_size = min(remaining, read_size) + read_result = reader[buffer_offset : buffer_offset + slice_size] + buffer_offset += slice_size + + # No new input data. Break out of the read loop. + if not read_result: + break + + # Feed all read data into the compressor and emit output until + # exhausted. + read_buffer = ffi.from_buffer(read_result) + in_buffer.src = read_buffer + in_buffer.size = len(read_buffer) + in_buffer.pos = 0 + + while in_buffer.pos < in_buffer.size: + zresult = lib.ZSTD_compressStream2( + self._cctx, out_buffer, in_buffer, lib.ZSTD_e_continue + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "zstd compress error: %s" % _zstd_error(zresult) + ) + + if out_buffer.pos: + data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + out_buffer.pos = 0 + yield data + + assert out_buffer.pos == 0 + + # And repeat the loop to collect more data. + continue + + # If we get here, input is exhausted. End the stream and emit what + # remains. + while True: + assert out_buffer.pos == 0 + zresult = lib.ZSTD_compressStream2( + self._cctx, out_buffer, in_buffer, lib.ZSTD_e_end + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "error ending compression stream: %s" % _zstd_error(zresult) + ) + + if out_buffer.pos: + data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + out_buffer.pos = 0 + yield data + + if zresult == 0: + break + + def multi_compress_to_buffer(self, data, threads=-1): + """ + Compress multiple pieces of data as a single function call. + + (Experimental. Not yet supported by CFFI backend.) + + This function is optimized to perform multiple compression operations + as as possible with as little overhead as possible. + + Data to be compressed can be passed as a ``BufferWithSegmentsCollection``, + a ``BufferWithSegments``, or a list containing byte like objects. Each + element of the container will be compressed individually using the + configured parameters on the ``ZstdCompressor`` instance. + + The ``threads`` argument controls how many threads to use for + compression. The default is ``0`` which means to use a single thread. + Negative values use the number of logical CPUs in the machine. + + The function returns a ``BufferWithSegmentsCollection``. This type + represents N discrete memory allocations, each holding 1 or more + compressed frames. + + Output data is written to shared memory buffers. This means that unlike + regular Python objects, a reference to *any* object within the collection + keeps the shared buffer and therefore memory backing it alive. This can + have undesirable effects on process memory usage. + + The API and behavior of this function is experimental and will likely + change. Known deficiencies include: + + * If asked to use multiple threads, it will always spawn that many + threads, even if the input is too small to use them. It should + automatically lower the thread count when the extra threads would + just add overhead. + * The buffer allocation strategy is fixed. There is room to make it + dynamic, perhaps even to allow one output buffer per input, + facilitating a variation of the API to return a list without the + adverse effects of shared memory buffers. + + :param data: + Source to read discrete pieces of data to compress. + + Can be a ``BufferWithSegmentsCollection``, a ``BufferWithSegments``, + or a ``list[bytes]``. + :return: + BufferWithSegmentsCollection holding compressed data. + """ + raise NotImplementedError() + + def frame_progression(self): + """ + Return information on how much work the compressor has done. + + Returns a 3-tuple of (ingested, consumed, produced). + + >>> cctx = zstandard.ZstdCompressor() + >>> (ingested, consumed, produced) = cctx.frame_progression() + """ + progression = lib.ZSTD_getFrameProgression(self._cctx) + + return progression.ingested, progression.consumed, progression.produced + + +class FrameParameters(object): + """Information about a zstd frame. + + Instances have the following attributes: + + ``content_size`` + Integer size of original, uncompressed content. This will be ``0`` if the + original content size isn't written to the frame (controlled with the + ``write_content_size`` argument to ``ZstdCompressor``) or if the input + content size was ``0``. + + ``window_size`` + Integer size of maximum back-reference distance in compressed data. + + ``dict_id`` + Integer of dictionary ID used for compression. ``0`` if no dictionary + ID was used or if the dictionary ID was ``0``. + + ``has_checksum`` + Bool indicating whether a 4 byte content checksum is stored at the end + of the frame. + """ + + def __init__(self, fparams): + self.content_size = fparams.frameContentSize + self.window_size = fparams.windowSize + self.dict_id = fparams.dictID + self.has_checksum = bool(fparams.checksumFlag) + + +def frame_content_size(data): + """Obtain the decompressed size of a frame. + + The returned value is usually accurate. But strictly speaking it should + not be trusted. + + :return: + ``-1`` if size unknown and a non-negative integer otherwise. + """ + data_buffer = ffi.from_buffer(data) + + size = lib.ZSTD_getFrameContentSize(data_buffer, len(data_buffer)) + + if size == lib.ZSTD_CONTENTSIZE_ERROR: + raise ZstdError("error when determining content size") + elif size == lib.ZSTD_CONTENTSIZE_UNKNOWN: + return -1 + else: + return size + + +def frame_header_size(data): + """Obtain the size of a frame header. + + :return: + Integer size in bytes. + """ + data_buffer = ffi.from_buffer(data) + + zresult = lib.ZSTD_frameHeaderSize(data_buffer, len(data_buffer)) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "could not determine frame header size: %s" % _zstd_error(zresult) + ) + + return zresult + + +def get_frame_parameters(data): + """ + Parse a zstd frame header into frame parameters. + + Depending on which fields are present in the frame and their values, the + length of the frame parameters varies. If insufficient bytes are passed + in to fully parse the frame parameters, ``ZstdError`` is raised. To ensure + frame parameters can be parsed, pass in at least 18 bytes. + + :param data: + Data from which to read frame parameters. + :return: + :py:class:`FrameParameters` + """ + params = ffi.new("ZSTD_frameHeader *") + + data_buffer = ffi.from_buffer(data) + zresult = lib.ZSTD_getFrameHeader(params, data_buffer, len(data_buffer)) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "cannot get frame parameters: %s" % _zstd_error(zresult) + ) + + if zresult: + raise ZstdError( + "not enough data for frame parameters; need %d bytes" % zresult + ) + + return FrameParameters(params[0]) + + +class ZstdCompressionDict(object): + """Represents a computed compression dictionary. + + Instances are obtained by calling :py:func:`train_dictionary` or by + passing bytes obtained from another source into the constructor. + + Instances can be constructed from bytes: + + >>> dict_data = zstandard.ZstdCompressionDict(data) + + It is possible to construct a dictionary from *any* data. If the data + doesn't begin with a magic header, it will be treated as a *prefix* + dictionary. *Prefix* dictionaries allow compression operations to + reference raw data within the dictionary. + + It is possible to force the use of *prefix* dictionaries or to require + a dictionary header: + + >>> dict_data = zstandard.ZstdCompressionDict(data, dict_type=zstandard.DICT_TYPE_RAWCONTENT) + >>> dict_data = zstandard.ZstdCompressionDict(data, dict_type=zstandard.DICT_TYPE_FULLDICT) + + You can see how many bytes are in the dictionary by calling ``len()``: + + >>> dict_data = zstandard.train_dictionary(size, samples) + >>> dict_size = len(dict_data) # will not be larger than ``size`` + + Once you have a dictionary, you can pass it to the objects performing + compression and decompression: + + >>> dict_data = zstandard.train_dictionary(131072, samples) + >>> cctx = zstandard.ZstdCompressor(dict_data=dict_data) + >>> for source_data in input_data: + ... compressed = cctx.compress(source_data) + ... # Do something with compressed data. + ... + >>> dctx = zstandard.ZstdDecompressor(dict_data=dict_data) + >>> for compressed_data in input_data: + ... buffer = io.BytesIO() + ... with dctx.stream_writer(buffer) as decompressor: + ... decompressor.write(compressed_data) + ... # Do something with raw data in ``buffer``. + + Dictionaries have unique integer IDs. You can retrieve this ID via: + + >>> dict_id = zstandard.dictionary_id(dict_data) + + You can obtain the raw data in the dict (useful for persisting and constructing + a ``ZstdCompressionDict`` later) via ``as_bytes()``: + + >>> dict_data = zstandard.train_dictionary(size, samples) + >>> raw_data = dict_data.as_bytes() + + By default, when a ``ZstdCompressionDict`` is *attached* to a + ``ZstdCompressor``, each ``ZstdCompressor`` performs work to prepare the + dictionary for use. This is fine if only 1 compression operation is being + performed or if the ``ZstdCompressor`` is being reused for multiple operations. + But if multiple ``ZstdCompressor`` instances are being used with the dictionary, + this can add overhead. + + It is possible to *precompute* the dictionary so it can readily be consumed + by multiple ``ZstdCompressor`` instances: + + >>> d = zstandard.ZstdCompressionDict(data) + >>> # Precompute for compression level 3. + >>> d.precompute_compress(level=3) + >>> # Precompute with specific compression parameters. + >>> params = zstandard.ZstdCompressionParameters(...) + >>> d.precompute_compress(compression_params=params) + + .. note:: + + When a dictionary is precomputed, the compression parameters used to + precompute the dictionary overwrite some of the compression parameters + specified to ``ZstdCompressor``. + + :param data: + Dictionary data. + :param dict_type: + Type of dictionary. One of the ``DICT_TYPE_*`` constants. + """ + + def __init__(self, data, dict_type=DICT_TYPE_AUTO, k=0, d=0): + assert isinstance(data, bytes) + self._data = data + self.k = k + self.d = d + + if dict_type not in ( + DICT_TYPE_AUTO, + DICT_TYPE_RAWCONTENT, + DICT_TYPE_FULLDICT, + ): + raise ValueError( + "invalid dictionary load mode: %d; must use " + "DICT_TYPE_* constants" + ) + + self._dict_type = dict_type + self._cdict = None + + def __len__(self): + return len(self._data) + + def dict_id(self): + """Obtain the integer ID of the dictionary.""" + return int(lib.ZDICT_getDictID(self._data, len(self._data))) + + def as_bytes(self): + """Obtain the ``bytes`` representation of the dictionary.""" + return self._data + + def precompute_compress(self, level=0, compression_params=None): + """Precompute a dictionary os it can be used by multiple compressors. + + Calling this method on an instance that will be used by multiple + :py:class:`ZstdCompressor` instances will improve performance. + """ + if level and compression_params: + raise ValueError( + "must only specify one of level or " "compression_params" + ) + + if not level and not compression_params: + raise ValueError("must specify one of level or compression_params") + + if level: + cparams = lib.ZSTD_getCParams(level, 0, len(self._data)) + else: + cparams = ffi.new("ZSTD_compressionParameters") + cparams.chainLog = compression_params.chain_log + cparams.hashLog = compression_params.hash_log + cparams.minMatch = compression_params.min_match + cparams.searchLog = compression_params.search_log + cparams.strategy = compression_params.strategy + cparams.targetLength = compression_params.target_length + cparams.windowLog = compression_params.window_log + + cdict = lib.ZSTD_createCDict_advanced( + self._data, + len(self._data), + lib.ZSTD_dlm_byRef, + self._dict_type, + cparams, + lib.ZSTD_defaultCMem, + ) + if cdict == ffi.NULL: + raise ZstdError("unable to precompute dictionary") + + self._cdict = ffi.gc( + cdict, lib.ZSTD_freeCDict, size=lib.ZSTD_sizeof_CDict(cdict) + ) + + @property + def _ddict(self): + ddict = lib.ZSTD_createDDict_advanced( + self._data, + len(self._data), + lib.ZSTD_dlm_byRef, + self._dict_type, + lib.ZSTD_defaultCMem, + ) + + if ddict == ffi.NULL: + raise ZstdError("could not create decompression dict") + + ddict = ffi.gc( + ddict, lib.ZSTD_freeDDict, size=lib.ZSTD_sizeof_DDict(ddict) + ) + self.__dict__["_ddict"] = ddict + + return ddict + + +def train_dictionary( + dict_size, + samples, + k=0, + d=0, + f=0, + split_point=0.0, + accel=0, + notifications=0, + dict_id=0, + level=0, + steps=0, + threads=0, +): + """Train a dictionary from sample data using the COVER algorithm. + + A compression dictionary of size ``dict_size`` will be created from the + iterable of ``samples``. The raw dictionary bytes will be returned. + + The dictionary training mechanism is known as *cover*. More details about it + are available in the paper *Effective Construction of Relative Lempel-Ziv + Dictionaries* (authors: Liao, Petri, Moffat, Wirth). + + The cover algorithm takes parameters ``k`` and ``d``. These are the + *segment size* and *dmer size*, respectively. The returned dictionary + instance created by this function has ``k`` and ``d`` attributes + containing the values for these parameters. If a ``ZstdCompressionDict`` + is constructed from raw bytes data (a content-only dictionary), the + ``k`` and ``d`` attributes will be ``0``. + + The segment and dmer size parameters to the cover algorithm can either be + specified manually or ``train_dictionary()`` can try multiple values + and pick the best one, where *best* means the smallest compressed data size. + This later mode is called *optimization* mode. + + Under the hood, this function always calls + ``ZDICT_optimizeTrainFromBuffer_fastCover()``. See the corresponding C library + documentation for more. + + If neither ``steps`` nor ``threads`` is defined, defaults for ``d``, ``steps``, + and ``level`` will be used that are equivalent with what + ``ZDICT_trainFromBuffer()`` would use. + + + :param dict_size: + Target size in bytes of the dictionary to generate. + :param samples: + A list of bytes holding samples the dictionary will be trained from. + :param k: + Segment size : constraint: 0 < k : Reasonable range [16, 2048+] + :param d: + dmer size : constraint: 0 < d <= k : Reasonable range [6, 16] + :param f: + log of size of frequency array : constraint: 0 < f <= 31 : 1 means + default(20) + :param split_point: + Percentage of samples used for training: Only used for optimization. + The first # samples * ``split_point`` samples will be used to training. + The last # samples * (1 - split_point) samples will be used for testing. + 0 means default (0.75), 1.0 when all samples are used for both training + and testing. + :param accel: + Acceleration level: constraint: 0 < accel <= 10. Higher means faster + and less accurate, 0 means default(1). + :param dict_id: + Integer dictionary ID for the produced dictionary. Default is 0, which uses + a random value. + :param steps: + Number of steps through ``k`` values to perform when trying parameter + variations. + :param threads: + Number of threads to use when trying parameter variations. Default is 0, + which means to use a single thread. A negative value can be specified to + use as many threads as there are detected logical CPUs. + :param level: + Integer target compression level when trying parameter variations. + :param notifications: + Controls writing of informational messages to ``stderr``. ``0`` (the + default) means to write nothing. ``1`` writes errors. ``2`` writes + progression info. ``3`` writes more details. And ``4`` writes all info. + """ + + if not isinstance(samples, list): + raise TypeError("samples must be a list") + + if threads < 0: + threads = _cpu_count() + + if not steps and not threads: + d = d or 8 + steps = steps or 4 + level = level or 3 + + total_size = sum(map(len, samples)) + + samples_buffer = new_nonzero("char[]", total_size) + sample_sizes = new_nonzero("size_t[]", len(samples)) + + offset = 0 + for i, sample in enumerate(samples): + if not isinstance(sample, bytes): + raise ValueError("samples must be bytes") + + l = len(sample) + ffi.memmove(samples_buffer + offset, sample, l) + offset += l + sample_sizes[i] = l + + dict_data = new_nonzero("char[]", dict_size) + + dparams = ffi.new("ZDICT_fastCover_params_t *")[0] + dparams.k = k + dparams.d = d + dparams.f = f + dparams.steps = steps + dparams.nbThreads = threads + dparams.splitPoint = split_point + dparams.accel = accel + dparams.zParams.notificationLevel = notifications + dparams.zParams.dictID = dict_id + dparams.zParams.compressionLevel = level + + zresult = lib.ZDICT_optimizeTrainFromBuffer_fastCover( + ffi.addressof(dict_data), + dict_size, + ffi.addressof(samples_buffer), + ffi.addressof(sample_sizes, 0), + len(samples), + ffi.addressof(dparams), + ) + + if lib.ZDICT_isError(zresult): + msg = ffi.string(lib.ZDICT_getErrorName(zresult)).decode("utf-8") + raise ZstdError("cannot train dict: %s" % msg) + + return ZstdCompressionDict( + ffi.buffer(dict_data, zresult)[:], + dict_type=DICT_TYPE_FULLDICT, + k=dparams.k, + d=dparams.d, + ) + + +class ZstdDecompressionObj(object): + """A standard library API compatible decompressor. + + This type implements a compressor that conforms to the API by other + decompressors in Python's standard library. e.g. ``zlib.decompressobj`` + or ``bz2.BZ2Decompressor``. This allows callers to use zstd compression + while conforming to a similar API. + + Compressed data chunks are fed into ``decompress(data)`` and + uncompressed output (or an empty bytes) is returned. Output from + subsequent calls needs to be concatenated to reassemble the full + decompressed byte sequence. + + If ``read_across_frames=False``, each instance is single use: once an + input frame is decoded, ``decompress()`` will raise an exception. If + ``read_across_frames=True``, instances can decode multiple frames. + + >>> dctx = zstandard.ZstdDecompressor() + >>> dobj = dctx.decompressobj() + >>> data = dobj.decompress(compressed_chunk_0) + >>> data = dobj.decompress(compressed_chunk_1) + + By default, calls to ``decompress()`` write output data in chunks of size + ``DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE``. These chunks are concatenated + before being returned to the caller. It is possible to define the size of + these temporary chunks by passing ``write_size`` to ``decompressobj()``: + + >>> dctx = zstandard.ZstdDecompressor() + >>> dobj = dctx.decompressobj(write_size=1048576) + + .. note:: + + Because calls to ``decompress()`` may need to perform multiple + memory (re)allocations, this streaming decompression API isn't as + efficient as other APIs. + """ + + def __init__(self, decompressor, write_size, read_across_frames): + self._decompressor = decompressor + self._write_size = write_size + self._finished = False + self._read_across_frames = read_across_frames + self._unused_input = b"" + + def decompress(self, data): + """Send compressed data to the decompressor and obtain decompressed data. + + :param data: + Data to feed into the decompressor. + :return: + Decompressed bytes. + """ + if self._finished: + raise ZstdError("cannot use a decompressobj multiple times") + + in_buffer = ffi.new("ZSTD_inBuffer *") + out_buffer = ffi.new("ZSTD_outBuffer *") + + data_buffer = ffi.from_buffer(data) + + if len(data_buffer) == 0: + return b"" + + in_buffer.src = data_buffer + in_buffer.size = len(data_buffer) + in_buffer.pos = 0 + + dst_buffer = ffi.new("char[]", self._write_size) + out_buffer.dst = dst_buffer + out_buffer.size = len(dst_buffer) + out_buffer.pos = 0 + + chunks = [] + + while True: + zresult = lib.ZSTD_decompressStream( + self._decompressor._dctx, out_buffer, in_buffer + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "zstd decompressor error: %s" % _zstd_error(zresult) + ) + + # Always record any output from decompressor. + if out_buffer.pos: + chunks.append(ffi.buffer(out_buffer.dst, out_buffer.pos)[:]) + + # 0 is only seen when a frame is fully decoded *and* fully flushed. + # Behavior depends on whether we're in single or multiple frame + # mode. + if zresult == 0 and not self._read_across_frames: + # Mark the instance as done and make any unconsumed input available + # for retrieval. + self._finished = True + self._decompressor = None + self._unused_input = data[in_buffer.pos : in_buffer.size] + break + elif zresult == 0 and self._read_across_frames: + # We're at the end of a fully flushed frame and we can read more. + # Try to read more if there's any more input. + if in_buffer.pos == in_buffer.size: + break + else: + out_buffer.pos = 0 + + # We're not at the end of the frame *or* we're not fully flushed. + + # The decompressor will write out all the bytes it can to the output + # buffer. So if the output buffer is partially filled and the input + # is exhausted, there's nothing more to write. So we've done all we + # can. + elif ( + in_buffer.pos == in_buffer.size + and out_buffer.pos < out_buffer.size + ): + break + else: + out_buffer.pos = 0 + + return b"".join(chunks) + + def flush(self, length=0): + """Effectively a no-op. + + Implemented for compatibility with the standard library APIs. + + Safe to call at any time. + + :return: + Empty bytes. + """ + return b"" + + @property + def unused_data(self): + """Bytes past the end of compressed data. + + If ``decompress()`` is fed additional data beyond the end of a zstd + frame, this value will be non-empty once ``decompress()`` fully decodes + the input frame. + """ + return self._unused_input + + @property + def unconsumed_tail(self): + """Data that has not yet been fed into the decompressor.""" + return b"" + + @property + def eof(self): + """Whether the end of the compressed data stream has been reached.""" + return self._finished + + +class ZstdDecompressionReader(object): + """Read only decompressor that pull uncompressed data from another stream. + + This type provides a read-only stream interface for performing transparent + decompression from another stream or data source. It conforms to the + ``io.RawIOBase`` interface. Only methods relevant to reading are + implemented. + + >>> with open(path, 'rb') as fh: + >>> dctx = zstandard.ZstdDecompressor() + >>> reader = dctx.stream_reader(fh) + >>> while True: + ... chunk = reader.read(16384) + ... if not chunk: + ... break + ... # Do something with decompressed chunk. + + The stream can also be used as a context manager: + + >>> with open(path, 'rb') as fh: + ... dctx = zstandard.ZstdDecompressor() + ... with dctx.stream_reader(fh) as reader: + ... ... + + When used as a context manager, the stream is closed and the underlying + resources are released when the context manager exits. Future operations + against the stream will fail. + + The ``source`` argument to ``stream_reader()`` can be any object with a + ``read(size)`` method or any object implementing the *buffer protocol*. + + If the ``source`` is a stream, you can specify how large ``read()`` requests + to that stream should be via the ``read_size`` argument. It defaults to + ``zstandard.DECOMPRESSION_RECOMMENDED_INPUT_SIZE``.: + + >>> with open(path, 'rb') as fh: + ... dctx = zstandard.ZstdDecompressor() + ... # Will perform fh.read(8192) when obtaining data for the decompressor. + ... with dctx.stream_reader(fh, read_size=8192) as reader: + ... ... + + Instances are *partially* seekable. Absolute and relative positions + (``SEEK_SET`` and ``SEEK_CUR``) forward of the current position are + allowed. Offsets behind the current read position and offsets relative + to the end of stream are not allowed and will raise ``ValueError`` + if attempted. + + ``tell()`` returns the number of decompressed bytes read so far. + + Not all I/O methods are implemented. Notably missing is support for + ``readline()``, ``readlines()``, and linewise iteration support. This is + because streams operate on binary data - not text data. If you want to + convert decompressed output to text, you can chain an ``io.TextIOWrapper`` + to the stream: + + >>> with open(path, 'rb') as fh: + ... dctx = zstandard.ZstdDecompressor() + ... stream_reader = dctx.stream_reader(fh) + ... text_stream = io.TextIOWrapper(stream_reader, encoding='utf-8') + ... for line in text_stream: + ... ... + """ + + def __init__( + self, + decompressor, + source, + read_size, + read_across_frames, + closefd=True, + ): + self._decompressor = decompressor + self._source = source + self._read_size = read_size + self._read_across_frames = bool(read_across_frames) + self._closefd = bool(closefd) + self._entered = False + self._closed = False + self._bytes_decompressed = 0 + self._finished_input = False + self._finished_output = False + self._in_buffer = ffi.new("ZSTD_inBuffer *") + # Holds a ref to self._in_buffer.src. + self._source_buffer = None + + def __enter__(self): + if self._entered: + raise ValueError("cannot __enter__ multiple times") + + if self._closed: + raise ValueError("stream is closed") + + self._entered = True + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self._entered = False + self._decompressor = None + self.close() + self._source = None + + return False + + def readable(self): + return True + + def writable(self): + return False + + def seekable(self): + return False + + def readline(self, size=-1): + raise io.UnsupportedOperation() + + def readlines(self, hint=-1): + raise io.UnsupportedOperation() + + def write(self, data): + raise io.UnsupportedOperation() + + def writelines(self, lines): + raise io.UnsupportedOperation() + + def isatty(self): + return False + + def flush(self): + return None + + def close(self): + if self._closed: + return None + + self._closed = True + + f = getattr(self._source, "close", None) + if self._closefd and f: + f() + + @property + def closed(self): + return self._closed + + def tell(self): + return self._bytes_decompressed + + def readall(self): + chunks = [] + + while True: + chunk = self.read(1048576) + if not chunk: + break + + chunks.append(chunk) + + return b"".join(chunks) + + def __iter__(self): + raise io.UnsupportedOperation() + + def __next__(self): + raise io.UnsupportedOperation() + + next = __next__ + + def _read_input(self): + # We have data left over in the input buffer. Use it. + if self._in_buffer.pos < self._in_buffer.size: + return + + # All input data exhausted. Nothing to do. + if self._finished_input: + return + + # Else populate the input buffer from our source. + if hasattr(self._source, "read"): + data = self._source.read(self._read_size) + + if not data: + self._finished_input = True + return + + self._source_buffer = ffi.from_buffer(data) + self._in_buffer.src = self._source_buffer + self._in_buffer.size = len(self._source_buffer) + self._in_buffer.pos = 0 + else: + self._source_buffer = ffi.from_buffer(self._source) + self._in_buffer.src = self._source_buffer + self._in_buffer.size = len(self._source_buffer) + self._in_buffer.pos = 0 + + def _decompress_into_buffer(self, out_buffer): + """Decompress available input into an output buffer. + + Returns True if data in output buffer should be emitted. + """ + zresult = lib.ZSTD_decompressStream( + self._decompressor._dctx, out_buffer, self._in_buffer + ) + + if self._in_buffer.pos == self._in_buffer.size: + self._in_buffer.src = ffi.NULL + self._in_buffer.pos = 0 + self._in_buffer.size = 0 + self._source_buffer = None + + if not hasattr(self._source, "read"): + self._finished_input = True + + if lib.ZSTD_isError(zresult): + raise ZstdError("zstd decompress error: %s" % _zstd_error(zresult)) + + # Emit data if there is data AND either: + # a) output buffer is full (read amount is satisfied) + # b) we're at end of a frame and not in frame spanning mode + return out_buffer.pos and ( + out_buffer.pos == out_buffer.size + or zresult == 0 + and not self._read_across_frames + ) + + def read(self, size=-1): + if self._closed: + raise ValueError("stream is closed") + + if size < -1: + raise ValueError("cannot read negative amounts less than -1") + + if size == -1: + # This is recursive. But it gets the job done. + return self.readall() + + if self._finished_output or size == 0: + return b"" + + # We /could/ call into readinto() here. But that introduces more + # overhead. + dst_buffer = ffi.new("char[]", size) + out_buffer = ffi.new("ZSTD_outBuffer *") + out_buffer.dst = dst_buffer + out_buffer.size = size + out_buffer.pos = 0 + + self._read_input() + if self._decompress_into_buffer(out_buffer): + self._bytes_decompressed += out_buffer.pos + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + while not self._finished_input: + self._read_input() + if self._decompress_into_buffer(out_buffer): + self._bytes_decompressed += out_buffer.pos + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + self._bytes_decompressed += out_buffer.pos + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + def readinto(self, b): + if self._closed: + raise ValueError("stream is closed") + + if self._finished_output: + return 0 + + # TODO use writable=True once we require CFFI >= 1.12. + dest_buffer = ffi.from_buffer(b) + ffi.memmove(b, b"", 0) + out_buffer = ffi.new("ZSTD_outBuffer *") + out_buffer.dst = dest_buffer + out_buffer.size = len(dest_buffer) + out_buffer.pos = 0 + + self._read_input() + if self._decompress_into_buffer(out_buffer): + self._bytes_decompressed += out_buffer.pos + return out_buffer.pos + + while not self._finished_input: + self._read_input() + if self._decompress_into_buffer(out_buffer): + self._bytes_decompressed += out_buffer.pos + return out_buffer.pos + + self._bytes_decompressed += out_buffer.pos + return out_buffer.pos + + def read1(self, size=-1): + if self._closed: + raise ValueError("stream is closed") + + if size < -1: + raise ValueError("cannot read negative amounts less than -1") + + if self._finished_output or size == 0: + return b"" + + # -1 returns arbitrary number of bytes. + if size == -1: + size = DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE + + dst_buffer = ffi.new("char[]", size) + out_buffer = ffi.new("ZSTD_outBuffer *") + out_buffer.dst = dst_buffer + out_buffer.size = size + out_buffer.pos = 0 + + # read1() dictates that we can perform at most 1 call to underlying + # stream to get input. However, we can't satisfy this restriction with + # decompression because not all input generates output. So we allow + # multiple read(). But unlike read(), we stop once we have any output. + while not self._finished_input: + self._read_input() + self._decompress_into_buffer(out_buffer) + + if out_buffer.pos: + break + + self._bytes_decompressed += out_buffer.pos + return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + + def readinto1(self, b): + if self._closed: + raise ValueError("stream is closed") + + if self._finished_output: + return 0 + + # TODO use writable=True once we require CFFI >= 1.12. + dest_buffer = ffi.from_buffer(b) + ffi.memmove(b, b"", 0) + + out_buffer = ffi.new("ZSTD_outBuffer *") + out_buffer.dst = dest_buffer + out_buffer.size = len(dest_buffer) + out_buffer.pos = 0 + + while not self._finished_input and not self._finished_output: + self._read_input() + self._decompress_into_buffer(out_buffer) + + if out_buffer.pos: + break + + self._bytes_decompressed += out_buffer.pos + return out_buffer.pos + + def seek(self, pos, whence=os.SEEK_SET): + if self._closed: + raise ValueError("stream is closed") + + read_amount = 0 + + if whence == os.SEEK_SET: + if pos < 0: + raise OSError("cannot seek to negative position with SEEK_SET") + + if pos < self._bytes_decompressed: + raise OSError( + "cannot seek zstd decompression stream " "backwards" + ) + + read_amount = pos - self._bytes_decompressed + + elif whence == os.SEEK_CUR: + if pos < 0: + raise OSError( + "cannot seek zstd decompression stream " "backwards" + ) + + read_amount = pos + elif whence == os.SEEK_END: + raise OSError( + "zstd decompression streams cannot be seeked " "with SEEK_END" + ) + + while read_amount: + result = self.read( + min(read_amount, DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE) + ) + + if not result: + break + + read_amount -= len(result) + + return self._bytes_decompressed + + +class ZstdDecompressionWriter(object): + """ + Write-only stream wrapper that performs decompression. + + This type provides a writable stream that performs decompression and writes + decompressed data to another stream. + + This type implements the ``io.RawIOBase`` interface. Only methods that + involve writing will do useful things. + + Behavior is similar to :py:meth:`ZstdCompressor.stream_writer`: compressed + data is sent to the decompressor by calling ``write(data)`` and decompressed + output is written to the inner stream by calling its ``write(data)`` + method: + + >>> dctx = zstandard.ZstdDecompressor() + >>> decompressor = dctx.stream_writer(fh) + >>> # Will call fh.write() with uncompressed data. + >>> decompressor.write(compressed_data) + + Instances can be used as context managers. However, context managers add no + extra special behavior other than automatically calling ``close()`` when + they exit. + + Calling ``close()`` will mark the stream as closed and subsequent I/O + operations will raise ``ValueError`` (per the documented behavior of + ``io.RawIOBase``). ``close()`` will also call ``close()`` on the + underlying stream if such a method exists and the instance was created with + ``closefd=True``. + + The size of chunks to ``write()`` to the destination can be specified: + + >>> dctx = zstandard.ZstdDecompressor() + >>> with dctx.stream_writer(fh, write_size=16384) as decompressor: + >>> pass + + You can see how much memory is being used by the decompressor: + + >>> dctx = zstandard.ZstdDecompressor() + >>> with dctx.stream_writer(fh) as decompressor: + >>> byte_size = decompressor.memory_size() + + ``stream_writer()`` accepts a ``write_return_read`` boolean argument to control + the return value of ``write()``. When ``True`` (the default)``, ``write()`` + returns the number of bytes that were read from the input. When ``False``, + ``write()`` returns the number of bytes that were ``write()`` to the inner + stream. + """ + + def __init__( + self, + decompressor, + writer, + write_size, + write_return_read, + closefd=True, + ): + decompressor._ensure_dctx() + + self._decompressor = decompressor + self._writer = writer + self._write_size = write_size + self._write_return_read = bool(write_return_read) + self._closefd = bool(closefd) + self._entered = False + self._closing = False + self._closed = False + + def __enter__(self): + if self._closed: + raise ValueError("stream is closed") + + if self._entered: + raise ZstdError("cannot __enter__ multiple times") + + self._entered = True + + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self._entered = False + self.close() + + return False + + def __iter__(self): + raise io.UnsupportedOperation() + + def __next__(self): + raise io.UnsupportedOperation() + + def memory_size(self): + return lib.ZSTD_sizeof_DCtx(self._decompressor._dctx) + + def close(self): + if self._closed: + return + + try: + self._closing = True + self.flush() + finally: + self._closing = False + self._closed = True + + f = getattr(self._writer, "close", None) + if self._closefd and f: + f() + + @property + def closed(self): + return self._closed + + def fileno(self): + f = getattr(self._writer, "fileno", None) + if f: + return f() + else: + raise OSError("fileno not available on underlying writer") + + def flush(self): + if self._closed: + raise ValueError("stream is closed") + + f = getattr(self._writer, "flush", None) + if f and not self._closing: + return f() + + def isatty(self): + return False + + def readable(self): + return False + + def readline(self, size=-1): + raise io.UnsupportedOperation() + + def readlines(self, hint=-1): + raise io.UnsupportedOperation() + + def seek(self, offset, whence=None): + raise io.UnsupportedOperation() + + def seekable(self): + return False + + def tell(self): + raise io.UnsupportedOperation() + + def truncate(self, size=None): + raise io.UnsupportedOperation() + + def writable(self): + return True + + def writelines(self, lines): + raise io.UnsupportedOperation() + + def read(self, size=-1): + raise io.UnsupportedOperation() + + def readall(self): + raise io.UnsupportedOperation() + + def readinto(self, b): + raise io.UnsupportedOperation() + + def write(self, data): + if self._closed: + raise ValueError("stream is closed") + + total_write = 0 + + in_buffer = ffi.new("ZSTD_inBuffer *") + out_buffer = ffi.new("ZSTD_outBuffer *") + + data_buffer = ffi.from_buffer(data) + in_buffer.src = data_buffer + in_buffer.size = len(data_buffer) + in_buffer.pos = 0 + + dst_buffer = ffi.new("char[]", self._write_size) + out_buffer.dst = dst_buffer + out_buffer.size = len(dst_buffer) + out_buffer.pos = 0 + + dctx = self._decompressor._dctx + + while in_buffer.pos < in_buffer.size: + zresult = lib.ZSTD_decompressStream(dctx, out_buffer, in_buffer) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "zstd decompress error: %s" % _zstd_error(zresult) + ) + + if out_buffer.pos: + self._writer.write( + ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + ) + total_write += out_buffer.pos + out_buffer.pos = 0 + + if self._write_return_read: + return in_buffer.pos + else: + return total_write + + +class ZstdDecompressor(object): + """ + Context for performing zstandard decompression. + + Each instance is essentially a wrapper around a ``ZSTD_DCtx`` from zstd's + C API. + + An instance can compress data various ways. Instances can be used multiple + times. + + The interface of this class is very similar to + :py:class:`zstandard.ZstdCompressor` (by design). + + Assume that each ``ZstdDecompressor`` instance can only handle a single + logical compression operation at the same time. i.e. if you call a method + like ``decompressobj()`` to obtain multiple objects derived from the same + ``ZstdDecompressor`` instance and attempt to use them simultaneously, errors + will likely occur. + + If you need to perform multiple logical decompression operations and you + can't guarantee those operations are temporally non-overlapping, you need + to obtain multiple ``ZstdDecompressor`` instances. + + Unless specified otherwise, assume that no two methods of + ``ZstdDecompressor`` instances can be called from multiple Python + threads simultaneously. In other words, assume instances are not thread safe + unless stated otherwise. + + :param dict_data: + Compression dictionary to use. + :param max_window_size: + Sets an upper limit on the window size for decompression operations in + kibibytes. This setting can be used to prevent large memory allocations + for inputs using large compression windows. + :param format: + Set the format of data for the decoder. + + By default this is ``zstandard.FORMAT_ZSTD1``. It can be set to + ``zstandard.FORMAT_ZSTD1_MAGICLESS`` to allow decoding frames without + the 4 byte magic header. Not all decompression APIs support this mode. + """ + + def __init__(self, dict_data=None, max_window_size=0, format=FORMAT_ZSTD1): + self._dict_data = dict_data + self._max_window_size = max_window_size + self._format = format + + dctx = lib.ZSTD_createDCtx() + if dctx == ffi.NULL: + raise MemoryError() + + self._dctx = dctx + + # Defer setting up garbage collection until full state is loaded so + # the memory size is more accurate. + try: + self._ensure_dctx() + finally: + self._dctx = ffi.gc( + dctx, lib.ZSTD_freeDCtx, size=lib.ZSTD_sizeof_DCtx(dctx) + ) + + def memory_size(self): + """Size of decompression context, in bytes. + + >>> dctx = zstandard.ZstdDecompressor() + >>> size = dctx.memory_size() + """ + return lib.ZSTD_sizeof_DCtx(self._dctx) + + def decompress( + self, + data, + max_output_size=0, + read_across_frames=False, + allow_extra_data=True, + ): + """ + Decompress data in a single operation. + + This method will decompress the input data in a single operation and + return the decompressed data. + + The input bytes are expected to contain at least 1 full Zstandard frame + (something compressed with :py:meth:`ZstdCompressor.compress` or + similar). If the input does not contain a full frame, an exception will + be raised. + + ``read_across_frames`` controls whether to read multiple zstandard + frames in the input. When False, decompression stops after reading the + first frame. This feature is not yet implemented but the argument is + provided for forward API compatibility when the default is changed to + True in a future release. For now, if you need to decompress multiple + frames, use an API like :py:meth:`ZstdCompressor.stream_reader` with + ``read_across_frames=True``. + + ``allow_extra_data`` controls how to handle extra input data after a + fully decoded frame. If False, any extra data (which could be a valid + zstd frame) will result in ``ZstdError`` being raised. If True, extra + data is silently ignored. The default will likely change to False in a + future release when ``read_across_frames`` defaults to True. + + If the input contains extra data after a full frame, that extra input + data is silently ignored. This behavior is undesirable in many scenarios + and will likely be changed or controllable in a future release (see + #181). + + If the frame header of the compressed data does not contain the content + size, ``max_output_size`` must be specified or ``ZstdError`` will be + raised. An allocation of size ``max_output_size`` will be performed and an + attempt will be made to perform decompression into that buffer. If the + buffer is too small or cannot be allocated, ``ZstdError`` will be + raised. The buffer will be resized if it is too large. + + Uncompressed data could be much larger than compressed data. As a result, + calling this function could result in a very large memory allocation + being performed to hold the uncompressed data. This could potentially + result in ``MemoryError`` or system memory swapping. If you don't need + the full output data in a single contiguous array in memory, consider + using streaming decompression for more resilient memory behavior. + + Usage: + + >>> dctx = zstandard.ZstdDecompressor() + >>> decompressed = dctx.decompress(data) + + If the compressed data doesn't have its content size embedded within it, + decompression can be attempted by specifying the ``max_output_size`` + argument: + + >>> dctx = zstandard.ZstdDecompressor() + >>> uncompressed = dctx.decompress(data, max_output_size=1048576) + + Ideally, ``max_output_size`` will be identical to the decompressed + output size. + + .. important:: + + If the exact size of decompressed data is unknown (not passed in + explicitly and not stored in the zstd frame), for performance + reasons it is encouraged to use a streaming API. + + :param data: + Compressed data to decompress. + :param max_output_size: + Integer max size of response. + + If ``0``, there is no limit and we can attempt to allocate an output + buffer of infinite size. + :return: + ``bytes`` representing decompressed output. + """ + + if read_across_frames: + raise ZstdError( + "ZstdDecompressor.read_across_frames=True is not yet implemented" + ) + + self._ensure_dctx() + + data_buffer = ffi.from_buffer(data) + + output_size = lib.ZSTD_getFrameContentSize( + data_buffer, len(data_buffer) + ) + + if output_size == lib.ZSTD_CONTENTSIZE_ERROR: + raise ZstdError("error determining content size from frame header") + elif output_size == 0: + return b"" + elif output_size == lib.ZSTD_CONTENTSIZE_UNKNOWN: + if not max_output_size: + raise ZstdError( + "could not determine content size in frame header" + ) + + result_buffer = ffi.new("char[]", max_output_size) + result_size = max_output_size + output_size = 0 + else: + result_buffer = ffi.new("char[]", output_size) + result_size = output_size + + out_buffer = ffi.new("ZSTD_outBuffer *") + out_buffer.dst = result_buffer + out_buffer.size = result_size + out_buffer.pos = 0 + + in_buffer = ffi.new("ZSTD_inBuffer *") + in_buffer.src = data_buffer + in_buffer.size = len(data_buffer) + in_buffer.pos = 0 + + zresult = lib.ZSTD_decompressStream(self._dctx, out_buffer, in_buffer) + if lib.ZSTD_isError(zresult): + raise ZstdError("decompression error: %s" % _zstd_error(zresult)) + elif zresult: + raise ZstdError( + "decompression error: did not decompress full frame" + ) + elif output_size and out_buffer.pos != output_size: + raise ZstdError( + "decompression error: decompressed %d bytes; expected %d" + % (zresult, output_size) + ) + elif not allow_extra_data and in_buffer.pos < in_buffer.size: + count = in_buffer.size - in_buffer.pos + + raise ZstdError( + "compressed input contains %d bytes of unused data, which is disallowed" + % count + ) + + return ffi.buffer(result_buffer, out_buffer.pos)[:] + + def stream_reader( + self, + source, + read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE, + read_across_frames=False, + closefd=True, + ): + """ + Read-only stream wrapper that performs decompression. + + This method obtains an object that conforms to the ``io.RawIOBase`` + interface and performs transparent decompression via ``read()`` + operations. Source data is obtained by calling ``read()`` on a + source stream or object implementing the buffer protocol. + + See :py:class:`zstandard.ZstdDecompressionReader` for more documentation + and usage examples. + + :param source: + Source of compressed data to decompress. Can be any object + with a ``read(size)`` method or that conforms to the buffer protocol. + :param read_size: + Integer number of bytes to read from the source and feed into the + compressor at a time. + :param read_across_frames: + Whether to read data across multiple zstd frames. If False, + decompression is stopped at frame boundaries. + :param closefd: + Whether to close the source stream when this instance is closed. + :return: + :py:class:`zstandard.ZstdDecompressionReader`. + """ + self._ensure_dctx() + return ZstdDecompressionReader( + self, source, read_size, read_across_frames, closefd=closefd + ) + + def decompressobj( + self, + write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE, + read_across_frames=False, + ): + """Obtain a standard library compatible incremental decompressor. + + See :py:class:`ZstdDecompressionObj` for more documentation + and usage examples. + + :param write_size: size of internal output buffer to collect decompressed + chunks in. + :param read_across_frames: whether to read across multiple zstd frames. + If False, reading stops after 1 frame and subsequent decompress + attempts will raise an exception. + :return: + :py:class:`zstandard.ZstdDecompressionObj` + """ + if write_size < 1: + raise ValueError("write_size must be positive") + + self._ensure_dctx() + return ZstdDecompressionObj( + self, write_size=write_size, read_across_frames=read_across_frames + ) + + def read_to_iter( + self, + reader, + read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE, + write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE, + skip_bytes=0, + ): + """Read compressed data to an iterator of uncompressed chunks. + + This method will read data from ``reader``, feed it to a decompressor, + and emit ``bytes`` chunks representing the decompressed result. + + >>> dctx = zstandard.ZstdDecompressor() + >>> for chunk in dctx.read_to_iter(fh): + ... # Do something with original data. + + ``read_to_iter()`` accepts an object with a ``read(size)`` method that + will return compressed bytes or an object conforming to the buffer + protocol. + + ``read_to_iter()`` returns an iterator whose elements are chunks of the + decompressed data. + + The size of requested ``read()`` from the source can be specified: + + >>> dctx = zstandard.ZstdDecompressor() + >>> for chunk in dctx.read_to_iter(fh, read_size=16384): + ... pass + + It is also possible to skip leading bytes in the input data: + + >>> dctx = zstandard.ZstdDecompressor() + >>> for chunk in dctx.read_to_iter(fh, skip_bytes=1): + ... pass + + .. tip:: + + Skipping leading bytes is useful if the source data contains extra + *header* data. Traditionally, you would need to create a slice or + ``memoryview`` of the data you want to decompress. This would create + overhead. It is more efficient to pass the offset into this API. + + Similarly to :py:meth:`ZstdCompressor.read_to_iter`, the consumer of the + iterator controls when data is decompressed. If the iterator isn't consumed, + decompression is put on hold. + + When ``read_to_iter()`` is passed an object conforming to the buffer protocol, + the behavior may seem similar to what occurs when the simple decompression + API is used. However, this API works when the decompressed size is unknown. + Furthermore, if feeding large inputs, the decompressor will work in chunks + instead of performing a single operation. + + :param reader: + Source of compressed data. Can be any object with a + ``read(size)`` method or any object conforming to the buffer + protocol. + :param read_size: + Integer size of data chunks to read from ``reader`` and feed into + the decompressor. + :param write_size: + Integer size of data chunks to emit from iterator. + :param skip_bytes: + Integer number of bytes to skip over before sending data into + the decompressor. + :return: + Iterator of ``bytes`` representing uncompressed data. + """ + + if skip_bytes >= read_size: + raise ValueError("skip_bytes must be smaller than read_size") + + if hasattr(reader, "read"): + have_read = True + elif hasattr(reader, "__getitem__"): + have_read = False + buffer_offset = 0 + size = len(reader) + else: + raise ValueError( + "must pass an object with a read() method or " + "conforms to buffer protocol" + ) + + if skip_bytes: + if have_read: + reader.read(skip_bytes) + else: + if skip_bytes > size: + raise ValueError("skip_bytes larger than first input chunk") + + buffer_offset = skip_bytes + + self._ensure_dctx() + + in_buffer = ffi.new("ZSTD_inBuffer *") + out_buffer = ffi.new("ZSTD_outBuffer *") + + dst_buffer = ffi.new("char[]", write_size) + out_buffer.dst = dst_buffer + out_buffer.size = len(dst_buffer) + out_buffer.pos = 0 + + while True: + assert out_buffer.pos == 0 + + if have_read: + read_result = reader.read(read_size) + else: + remaining = size - buffer_offset + slice_size = min(remaining, read_size) + read_result = reader[buffer_offset : buffer_offset + slice_size] + buffer_offset += slice_size + + # No new input. Break out of read loop. + if not read_result: + break + + # Feed all read data into decompressor and emit output until + # exhausted. + read_buffer = ffi.from_buffer(read_result) + in_buffer.src = read_buffer + in_buffer.size = len(read_buffer) + in_buffer.pos = 0 + + while in_buffer.pos < in_buffer.size: + assert out_buffer.pos == 0 + + zresult = lib.ZSTD_decompressStream( + self._dctx, out_buffer, in_buffer + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "zstd decompress error: %s" % _zstd_error(zresult) + ) + + if out_buffer.pos: + data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:] + out_buffer.pos = 0 + yield data + + if zresult == 0: + return + + # Repeat loop to collect more input data. + continue + + # If we get here, input is exhausted. + + def stream_writer( + self, + writer, + write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE, + write_return_read=True, + closefd=True, + ): + """ + Push-based stream wrapper that performs decompression. + + This method constructs a stream wrapper that conforms to the + ``io.RawIOBase`` interface and performs transparent decompression + when writing to a wrapper stream. + + See :py:class:`zstandard.ZstdDecompressionWriter` for more documentation + and usage examples. + + :param writer: + Destination for decompressed output. Can be any object with a + ``write(data)``. + :param write_size: + Integer size of chunks to ``write()`` to ``writer``. + :param write_return_read: + Whether ``write()`` should return the number of bytes of input + consumed. If False, ``write()`` returns the number of bytes sent + to the inner stream. + :param closefd: + Whether to ``close()`` the inner stream when this stream is closed. + :return: + :py:class:`zstandard.ZstdDecompressionWriter` + """ + if not hasattr(writer, "write"): + raise ValueError("must pass an object with a write() method") + + return ZstdDecompressionWriter( + self, + writer, + write_size, + write_return_read, + closefd=closefd, + ) + + def copy_stream( + self, + ifh, + ofh, + read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE, + write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE, + ): + """ + Copy data between streams, decompressing in the process. + + Compressed data will be read from ``ifh``, decompressed, and written + to ``ofh``. + + >>> dctx = zstandard.ZstdDecompressor() + >>> dctx.copy_stream(ifh, ofh) + + e.g. to decompress a file to another file: + + >>> dctx = zstandard.ZstdDecompressor() + >>> with open(input_path, 'rb') as ifh, open(output_path, 'wb') as ofh: + ... dctx.copy_stream(ifh, ofh) + + The size of chunks being ``read()`` and ``write()`` from and to the + streams can be specified: + + >>> dctx = zstandard.ZstdDecompressor() + >>> dctx.copy_stream(ifh, ofh, read_size=8192, write_size=16384) + + :param ifh: + Source stream to read compressed data from. + + Must have a ``read()`` method. + :param ofh: + Destination stream to write uncompressed data to. + + Must have a ``write()`` method. + :param read_size: + The number of bytes to ``read()`` from the source in a single + operation. + :param write_size: + The number of bytes to ``write()`` to the destination in a single + operation. + :return: + 2-tuple of integers representing the number of bytes read and + written, respectively. + """ + + if not hasattr(ifh, "read"): + raise ValueError("first argument must have a read() method") + if not hasattr(ofh, "write"): + raise ValueError("second argument must have a write() method") + + self._ensure_dctx() + + in_buffer = ffi.new("ZSTD_inBuffer *") + out_buffer = ffi.new("ZSTD_outBuffer *") + + dst_buffer = ffi.new("char[]", write_size) + out_buffer.dst = dst_buffer + out_buffer.size = write_size + out_buffer.pos = 0 + + total_read, total_write = 0, 0 + + # Read all available input. + while True: + data = ifh.read(read_size) + if not data: + break + + data_buffer = ffi.from_buffer(data) + total_read += len(data_buffer) + in_buffer.src = data_buffer + in_buffer.size = len(data_buffer) + in_buffer.pos = 0 + + # Flush all read data to output. + while in_buffer.pos < in_buffer.size: + zresult = lib.ZSTD_decompressStream( + self._dctx, out_buffer, in_buffer + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "zstd decompressor error: %s" % _zstd_error(zresult) + ) + + if out_buffer.pos: + ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) + total_write += out_buffer.pos + out_buffer.pos = 0 + + # Continue loop to keep reading. + + return total_read, total_write + + def decompress_content_dict_chain(self, frames): + """ + Decompress a series of frames using the content dictionary chaining technique. + + Such a list of frames is produced by compressing discrete inputs where + each non-initial input is compressed with a *prefix* dictionary consisting + of the content of the previous input. + + For example, say you have the following inputs: + + >>> inputs = [b"input 1", b"input 2", b"input 3"] + + The zstd frame chain consists of: + + 1. ``b"input 1"`` compressed in standalone/discrete mode + 2. ``b"input 2"`` compressed using ``b"input 1"`` as a *prefix* dictionary + 3. ``b"input 3"`` compressed using ``b"input 2"`` as a *prefix* dictionary + + Each zstd frame **must** have the content size written. + + The following Python code can be used to produce a *prefix dictionary chain*: + + >>> def make_chain(inputs): + ... frames = [] + ... + ... # First frame is compressed in standalone/discrete mode. + ... zctx = zstandard.ZstdCompressor() + ... frames.append(zctx.compress(inputs[0])) + ... + ... # Subsequent frames use the previous fulltext as a prefix dictionary + ... for i, raw in enumerate(inputs[1:]): + ... dict_data = zstandard.ZstdCompressionDict( + ... inputs[i], dict_type=zstandard.DICT_TYPE_RAWCONTENT) + ... zctx = zstandard.ZstdCompressor(dict_data=dict_data) + ... frames.append(zctx.compress(raw)) + ... + ... return frames + + ``decompress_content_dict_chain()`` returns the uncompressed data of the last + element in the input chain. + + .. note:: + + It is possible to implement *prefix dictionary chain* decompression + on top of other APIs. However, this function will likely be faster - + especially for long input chains - as it avoids the overhead of + instantiating and passing around intermediate objects between + multiple functions. + + :param frames: + List of ``bytes`` holding compressed zstd frames. + :return: + """ + if not isinstance(frames, list): + raise TypeError("argument must be a list") + + if not frames: + raise ValueError("empty input chain") + + # First chunk should not be using a dictionary. We handle it specially. + chunk = frames[0] + if not isinstance(chunk, bytes): + raise ValueError("chunk 0 must be bytes") + + # All chunks should be zstd frames and should have content size set. + chunk_buffer = ffi.from_buffer(chunk) + params = ffi.new("ZSTD_frameHeader *") + zresult = lib.ZSTD_getFrameHeader( + params, chunk_buffer, len(chunk_buffer) + ) + if lib.ZSTD_isError(zresult): + raise ValueError("chunk 0 is not a valid zstd frame") + elif zresult: + raise ValueError("chunk 0 is too small to contain a zstd frame") + + if params.frameContentSize == lib.ZSTD_CONTENTSIZE_UNKNOWN: + raise ValueError("chunk 0 missing content size in frame") + + self._ensure_dctx(load_dict=False) + + last_buffer = ffi.new("char[]", params.frameContentSize) + + out_buffer = ffi.new("ZSTD_outBuffer *") + out_buffer.dst = last_buffer + out_buffer.size = len(last_buffer) + out_buffer.pos = 0 + + in_buffer = ffi.new("ZSTD_inBuffer *") + in_buffer.src = chunk_buffer + in_buffer.size = len(chunk_buffer) + in_buffer.pos = 0 + + zresult = lib.ZSTD_decompressStream(self._dctx, out_buffer, in_buffer) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "could not decompress chunk 0: %s" % _zstd_error(zresult) + ) + elif zresult: + raise ZstdError("chunk 0 did not decompress full frame") + + # Special case of chain length of 1 + if len(frames) == 1: + return ffi.buffer(last_buffer, len(last_buffer))[:] + + i = 1 + while i < len(frames): + chunk = frames[i] + if not isinstance(chunk, bytes): + raise ValueError("chunk %d must be bytes" % i) + + chunk_buffer = ffi.from_buffer(chunk) + zresult = lib.ZSTD_getFrameHeader( + params, chunk_buffer, len(chunk_buffer) + ) + if lib.ZSTD_isError(zresult): + raise ValueError("chunk %d is not a valid zstd frame" % i) + elif zresult: + raise ValueError( + "chunk %d is too small to contain a zstd frame" % i + ) + + if params.frameContentSize == lib.ZSTD_CONTENTSIZE_UNKNOWN: + raise ValueError("chunk %d missing content size in frame" % i) + + dest_buffer = ffi.new("char[]", params.frameContentSize) + + out_buffer.dst = dest_buffer + out_buffer.size = len(dest_buffer) + out_buffer.pos = 0 + + in_buffer.src = chunk_buffer + in_buffer.size = len(chunk_buffer) + in_buffer.pos = 0 + + zresult = lib.ZSTD_decompressStream( + self._dctx, out_buffer, in_buffer + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "could not decompress chunk %d: %s" % _zstd_error(zresult) + ) + elif zresult: + raise ZstdError("chunk %d did not decompress full frame" % i) + + last_buffer = dest_buffer + i += 1 + + return ffi.buffer(last_buffer, len(last_buffer))[:] + + def multi_decompress_to_buffer( + self, frames, decompressed_sizes=None, threads=0 + ): + """ + Decompress multiple zstd frames to output buffers as a single operation. + + (Experimental. Not available in CFFI backend.) + + Compressed frames can be passed to the function as a + ``BufferWithSegments``, a ``BufferWithSegmentsCollection``, or as a + list containing objects that conform to the buffer protocol. For best + performance, pass a ``BufferWithSegmentsCollection`` or a + ``BufferWithSegments``, as minimal input validation will be done for + that type. If calling from Python (as opposed to C), constructing one + of these instances may add overhead cancelling out the performance + overhead of validation for list inputs. + + Returns a ``BufferWithSegmentsCollection`` containing the decompressed + data. All decompressed data is allocated in a single memory buffer. The + ``BufferWithSegments`` instance tracks which objects are at which offsets + and their respective lengths. + + >>> dctx = zstandard.ZstdDecompressor() + >>> results = dctx.multi_decompress_to_buffer([b'...', b'...']) + + The decompressed size of each frame MUST be discoverable. It can either be + embedded within the zstd frame or passed in via the ``decompressed_sizes`` + argument. + + The ``decompressed_sizes`` argument is an object conforming to the buffer + protocol which holds an array of 64-bit unsigned integers in the machine's + native format defining the decompressed sizes of each frame. If this argument + is passed, it avoids having to scan each frame for its decompressed size. + This frame scanning can add noticeable overhead in some scenarios. + + >>> frames = [...] + >>> sizes = struct.pack('=QQQQ', len0, len1, len2, len3) + >>> + >>> dctx = zstandard.ZstdDecompressor() + >>> results = dctx.multi_decompress_to_buffer(frames, decompressed_sizes=sizes) + + .. note:: + + It is possible to pass a ``mmap.mmap()`` instance into this function by + wrapping it with a ``BufferWithSegments`` instance (which will define the + offsets of frames within the memory mapped region). + + This function is logically equivalent to performing + :py:meth:`ZstdCompressor.decompress` on each input frame and returning the + result. + + This function exists to perform decompression on multiple frames as fast + as possible by having as little overhead as possible. Since decompression is + performed as a single operation and since the decompressed output is stored in + a single buffer, extra memory allocations, Python objects, and Python function + calls are avoided. This is ideal for scenarios where callers know up front that + they need to access data for multiple frames, such as when *delta chains* are + being used. + + Currently, the implementation always spawns multiple threads when requested, + even if the amount of work to do is small. In the future, it will be smarter + about avoiding threads and their associated overhead when the amount of + work to do is small. + + :param frames: + Source defining zstd frames to decompress. + :param decompressed_sizes: + Array of integers representing sizes of decompressed zstd frames. + :param threads: + How many threads to use for decompression operations. + + Negative values will use the same number of threads as logical CPUs + on the machine. Values ``0`` or ``1`` use a single thread. + :return: + ``BufferWithSegmentsCollection`` + """ + raise NotImplementedError() + + def _ensure_dctx(self, load_dict=True): + lib.ZSTD_DCtx_reset(self._dctx, lib.ZSTD_reset_session_only) + + if self._max_window_size: + zresult = lib.ZSTD_DCtx_setMaxWindowSize( + self._dctx, self._max_window_size + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "unable to set max window size: %s" % _zstd_error(zresult) + ) + + zresult = lib.ZSTD_DCtx_setParameter( + self._dctx, lib.ZSTD_d_format, self._format + ) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "unable to set decoding format: %s" % _zstd_error(zresult) + ) + + if self._dict_data and load_dict: + zresult = lib.ZSTD_DCtx_refDDict(self._dctx, self._dict_data._ddict) + if lib.ZSTD_isError(zresult): + raise ZstdError( + "unable to reference prepared dictionary: %s" + % _zstd_error(zresult) + ) |
