author    vitalyisaev <vitalyisaev@ydb.tech> 2023-11-14 09:58:56 +0300
committer vitalyisaev <vitalyisaev@ydb.tech> 2023-11-14 10:20:20 +0300
commit    c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch)
tree      cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/python/lz4/py3/tests/stream
parent    d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff)
download  ydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz
YQ Connector: move tests from yql to ydb (OSS)
Move the Connector test folder from yql to ydb (kept in sync with github).
Diffstat (limited to 'contrib/python/lz4/py3/tests/stream')
-rw-r--r-- contrib/python/lz4/py3/tests/stream/conftest.py          155
-rw-r--r-- contrib/python/lz4/py3/tests/stream/numpy_byte_array.bin bin 0 -> 8552 bytes
-rw-r--r-- contrib/python/lz4/py3/tests/stream/test_stream_0.py     116
-rw-r--r-- contrib/python/lz4/py3/tests/stream/test_stream_1.py     555
-rw-r--r-- contrib/python/lz4/py3/tests/stream/test_stream_2.py     152
-rw-r--r-- contrib/python/lz4/py3/tests/stream/test_stream_3.py     123
-rw-r--r-- contrib/python/lz4/py3/tests/stream/test_stream_4.py     139
7 files changed, 1240 insertions, 0 deletions
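
The files in this commit exercise python-lz4's experimental lz4.stream module. As orientation for the diffs below, here is a minimal sketch of the round-trip pattern the tests are built around (double_buffer strategy, in-band block-size records, exactly as used in the test files; the sizes are illustrative):

import lz4.stream

kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
data = b"some input worth streaming" * 64

# Compress chunk by chunk; each block is prefixed with its compressed length.
compressed = b''
with lz4.stream.LZ4StreamCompressor(**kwargs) as proc:
    for start in range(0, len(data), kwargs['buffer_size']):
        compressed += proc.compress(data[start:start + kwargs['buffer_size']])

# Decompress one length-prefixed block at a time.
restored = b''
with lz4.stream.LZ4StreamDecompressor(**kwargs) as proc:
    offset = 0
    while offset < len(compressed):
        block = proc.get_block(compressed[offset:])
        restored += proc.decompress(block)
        offset += kwargs['store_comp_size'] + len(block)

assert restored == data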
diff --git a/contrib/python/lz4/py3/tests/stream/conftest.py b/contrib/python/lz4/py3/tests/stream/conftest.py
new file mode 100644
index 0000000000..b31ab14317
--- /dev/null
+++ b/contrib/python/lz4/py3/tests/stream/conftest.py
@@ -0,0 +1,155 @@
+import pytest
+import os
+import sys
+
+test_data = [
+ (b''),
+ (os.urandom(8 * 1024)),
+ # (b'0' * 8 * 1024),
+ # (bytearray(b'')),
+ # (bytearray(os.urandom(8 * 1024))),
+ #(bytearray(open(os.path.join(os.path.dirname(__file__), 'numpy_byte_array.bin'), 'rb').read()))
+]
+
+if sys.version_info > (2, 7):
+ test_data += [
+ (memoryview(b'')),
+ (memoryview(os.urandom(8 * 1024)))
+ ]
+
+
+@pytest.fixture(
+ params=test_data,
+ ids=[
+ 'data' + str(i) for i in range(len(test_data))
+ ]
+)
+def data(request):
+ return request.param
+
+
+@pytest.fixture(
+ params=[
+ ("double_buffer"),
+ # ("ring_buffer"), # not implemented
+ ]
+)
+def strategy(request):
+ return request.param
+
+
+test_buffer_size = sorted(
+ [1,
+ # 4,
+ # 8,
+ # 64,
+ # 256,
+ 941,
+ # 1 * 1024,
+ # 4 * 1024,
+ # 8 * 1024,
+ # 16 * 1024,
+ # 32 * 1024,
+ 64 * 1024,
+ # 128 * 1024
+ ]
+)
+
+
+@pytest.fixture(
+ params=test_buffer_size,
+ ids=[
+ 'buffer_size' + str(i) for i in range(len(test_buffer_size))
+ ]
+)
+def buffer_size(request):
+ return request.param
+
+
+@pytest.fixture(
+ params=[
+ (
+ {
+ 'store_comp_size': 1
+ }
+ ),
+ (
+ {
+ 'store_comp_size': 2
+ }
+ ),
+ # (
+ # {
+ # 'store_comp_size': 4
+ # }
+ # ),
+ ]
+)
+def store_comp_size(request):
+ return request.param
+
+
+@pytest.fixture(
+ params=[
+ (
+ {
+ 'return_bytearray': True
+ }
+ ),
+ (
+ {
+ 'return_bytearray': False
+ }
+ ),
+ ]
+)
+def return_bytearray(request):
+ return request.param
+
+
+@pytest.fixture
+def c_return_bytearray(return_bytearray):
+ return return_bytearray
+
+
+@pytest.fixture
+def d_return_bytearray(return_bytearray):
+ return return_bytearray
+
+
+@pytest.fixture(
+ params=[
+ ('default', None)
+ ] + [
+ ('fast', None)
+ ] + [
+ ('fast', {'acceleration': 2 * s}) for s in range(5)
+ ] + [
+ ('high_compression', None)
+ ] + [
+ ('high_compression', {'compression_level': 2 * s}) for s in range(9)
+ ] + [
+ (None, None)
+ ]
+)
+def mode(request):
+ return request.param
+
+
+dictionary = [
+ None,
+ (0, 0),
+ (100, 200),
+ (0, 8 * 1024),
+ os.urandom(8 * 1024)
+]
+
+
+@pytest.fixture(
+ params=dictionary,
+ ids=[
+ 'dictionary' + str(i) for i in range(len(dictionary))
+ ]
+)
+def dictionary(request):
+ return request.param
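
The conftest fixtures above fan out through params: a test that requests several of them runs once per combination of parameter values. A stripped-down illustration of the mechanism (standard pytest, names hypothetical):

import pytest

@pytest.fixture(params=[1, 941])
def buf(request):
    return request.param

@pytest.fixture(params=['double_buffer'])
def strat(request):
    return request.param

def test_combo(buf, strat):
    # pytest generates one test per (buf, strat) pair: 2 x 1 = 2 runs here.
    assert strat == 'double_buffer' and buf in (1, 941)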
diff --git a/contrib/python/lz4/py3/tests/stream/numpy_byte_array.bin b/contrib/python/lz4/py3/tests/stream/numpy_byte_array.bin
new file mode 100644
index 0000000000..49537e2d90
--- /dev/null
+++ b/contrib/python/lz4/py3/tests/stream/numpy_byte_array.bin
Binary files differ
diff --git a/contrib/python/lz4/py3/tests/stream/test_stream_0.py b/contrib/python/lz4/py3/tests/stream/test_stream_0.py
new file mode 100644
index 0000000000..03b19f3f42
--- /dev/null
+++ b/contrib/python/lz4/py3/tests/stream/test_stream_0.py
@@ -0,0 +1,116 @@
+import lz4.stream
+import sys
+import pytest
+if sys.version_info <= (3, 2):
+ import struct
+
+
+def get_stored_size(buff, block_length_size):
+ if sys.version_info > (2, 7):
+ if isinstance(buff, memoryview):
+ b = buff.tobytes()
+ else:
+ b = bytes(buff)
+ else:
+ b = bytes(buff)
+
+ if len(b) < block_length_size:
+ return None
+
+ if sys.version_info > (3, 2):
+ return int.from_bytes(b[:block_length_size], 'little')
+ else:
+ # This would not work on a memoryview object, hence the buff.tobytes()
+ # call above
+ fmt = {1: 'B', 2: 'H', 4: 'I', }
+ return struct.unpack('<' + fmt[block_length_size], b[:block_length_size])[0]
+
+
+def roundtrip(x, c_kwargs, d_kwargs, dictionary):
+ if dictionary:
+ if isinstance(dictionary, tuple):
+ dict_ = x[dictionary[0]:dictionary[1]]
+ else:
+ dict_ = dictionary
+ c_kwargs['dictionary'] = dict_
+ d_kwargs['dictionary'] = dict_
+
+ c = bytes()
+ with lz4.stream.LZ4StreamCompressor(**c_kwargs) as proc:
+ for start in range(0, len(x), c_kwargs['buffer_size']):
+ chunk = x[start:start + c_kwargs['buffer_size']]
+ assert len(chunk) <= c_kwargs['buffer_size']
+ block = proc.compress(chunk)
+ if c_kwargs.get('return_bytearray'):
+ assert isinstance(block, bytearray)
+ if start == 0:
+ c = block
+ else:
+ c += block
+ assert get_stored_size(block, c_kwargs['store_comp_size']) == \
+ (len(block) - c_kwargs['store_comp_size'])
+
+ d = bytes()
+ with lz4.stream.LZ4StreamDecompressor(**d_kwargs) as proc:
+ start = 0
+ while start < len(c):
+ block = proc.get_block(c[start:])
+ chunk = proc.decompress(block)
+ if d_kwargs.get('return_bytearray'):
+ assert isinstance(chunk, bytearray)
+ if start == 0:
+ d = chunk
+ else:
+ d += chunk
+ start += d_kwargs['store_comp_size'] + len(block)
+
+ return d
+
+
+def setup_kwargs(strategy, mode, buffer_size, store_comp_size,
+ c_return_bytearray=None, d_return_bytearray=None):
+ c_kwargs = {}
+
+ if mode[0] is not None:
+ c_kwargs['mode'] = mode[0]
+ if mode[1] is not None:
+ c_kwargs.update(mode[1])
+
+ c_kwargs['strategy'] = strategy
+ c_kwargs['buffer_size'] = buffer_size
+ c_kwargs.update(store_comp_size)
+
+ if c_return_bytearray:
+ c_kwargs.update(c_return_bytearray)
+
+ d_kwargs = {}
+
+ if d_return_bytearray:
+ d_kwargs.update(d_return_bytearray)
+
+ d_kwargs['strategy'] = strategy
+ d_kwargs['buffer_size'] = buffer_size
+ d_kwargs.update(store_comp_size)
+
+ return (c_kwargs, d_kwargs)
+
+
+# Test single threaded usage with all valid variations of input
+def test_1(data, strategy, mode, buffer_size, store_comp_size,
+ c_return_bytearray, d_return_bytearray, dictionary):
+ if buffer_size >= (1 << (8 * store_comp_size['store_comp_size'])):
+ pytest.skip("Invalid case: buffer_size too large for the block length area")
+
+ (c_kwargs, d_kwargs) = setup_kwargs(
+ strategy, mode, buffer_size, store_comp_size, c_return_bytearray, d_return_bytearray)
+
+ d = roundtrip(data, c_kwargs, d_kwargs, dictionary)
+
+ assert d == data
+
+
+# Test multi-threaded:
+# Not relevant in the lz4.stream case (the process is highly sequential,
+# and reuses/shares the same context from one input chunk to the next).
+def test_2(data, strategy, mode, buffer_size, store_comp_size, dictionary): # noqa
+ pass
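
The framing that get_stored_size() above decodes is just a little-endian length prefix in front of each compressed block; a minimal sketch, assuming Python 3 and a 2-byte prefix:

import lz4.stream

kwargs = {'strategy': "double_buffer", 'buffer_size': 64, 'store_comp_size': 2}
with lz4.stream.LZ4StreamCompressor(**kwargs) as proc:
    block = proc.compress(b'x' * 64)

# The first store_comp_size bytes hold the compressed payload length.
assert int.from_bytes(block[:2], 'little') == len(block) - 2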
diff --git a/contrib/python/lz4/py3/tests/stream/test_stream_1.py b/contrib/python/lz4/py3/tests/stream/test_stream_1.py
new file mode 100644
index 0000000000..6b49267e26
--- /dev/null
+++ b/contrib/python/lz4/py3/tests/stream/test_stream_1.py
@@ -0,0 +1,555 @@
+import lz4.stream
+import pytest
+import sys
+import os
+
+
+if sys.version_info < (3, ):
+ from struct import pack, unpack
+
+ def _get_format(length, byteorder, signed):
+ _order = {'l': '<', 'b': '>'}
+ _fmt = {1: 'b', 2: 'h', 4: 'i', 8: 'q'}
+ _sign = {True: lambda x: x.lower(), False: lambda x: x.upper()}
+ return _sign[signed](_order[byteorder[0].lower()] + _fmt[length])
+
+ def int_to_bytes(value, length=4, byteorder='little', signed=False):
+ return bytearray(pack(_get_format(length, byteorder, signed), value))
+
+ def int_from_bytes(bytes, byteorder='little', signed=False):
+ return unpack(_get_format(len(bytes), byteorder, signed), bytes)[0]
+
+else:
+ def int_to_bytes(value, length=4, byteorder='little', signed=False):
+ return value.to_bytes(length, byteorder, signed=signed)
+
+ def int_from_bytes(bytes, byteorder='little', signed=False):
+ return int.from_bytes(bytes, byteorder, signed=signed)
+
+
+# This test requires allocating a big lump of memory. In order to
+# avoid a massive memory allocation during byte compilation, we have
+# to declare a variable for the size of the buffer we're going to
+# create outside the scope of the function below. See:
+# https://bugs.python.org/issue21074
+_4GB = 0x100000000 # 4GB
+
+
+def compress(x, c_kwargs, return_block_offset=False, check_block_type=False):
+ o = [0, ]
+ if c_kwargs.get('return_bytearray', False):
+ c = bytearray()
+ else:
+ c = bytes()
+ with lz4.stream.LZ4StreamCompressor(**c_kwargs) as proc:
+ for start in range(0, len(x), c_kwargs['buffer_size']):
+ chunk = x[start:start + c_kwargs['buffer_size']]
+ block = proc.compress(chunk)
+ c += block
+ if return_block_offset:
+ o.append(len(c))
+ if check_block_type:
+ assert isinstance(block, c.__class__)
+ if return_block_offset:
+ return c, o
+ else:
+ return c
+
+
+def decompress(x, d_kwargs, check_chunk_type=False):
+ if d_kwargs.get('return_bytearray', False):
+ d = bytearray()
+ else:
+ d = bytes()
+ with lz4.stream.LZ4StreamDecompressor(**d_kwargs) as proc:
+ start = 0
+ while start < len(x):
+ block = proc.get_block(x[start:])
+ chunk = proc.decompress(block)
+ d += chunk
+ start += d_kwargs['store_comp_size'] + len(block)
+ if check_chunk_type:
+ assert isinstance(chunk, d.__class__)
+ return d
+
+
+def test_invalid_config_c_1():
+ c_kwargs = {}
+ c_kwargs['strategy'] = "ring_buffer"
+ c_kwargs['buffer_size'] = 1024
+
+ with pytest.raises(NotImplementedError):
+ lz4.stream.LZ4StreamCompressor(**c_kwargs)
+
+
+def test_invalid_config_d_1():
+ d_kwargs = {}
+ d_kwargs['strategy'] = "ring_buffer"
+ d_kwargs['buffer_size'] = 1024
+
+ with pytest.raises(NotImplementedError):
+ lz4.stream.LZ4StreamDecompressor(**d_kwargs)
+
+
+def test_invalid_config_c_2():
+ c_kwargs = {}
+ c_kwargs['strategy'] = "foo"
+ c_kwargs['buffer_size'] = 1024
+
+ with pytest.raises(ValueError):
+ lz4.stream.LZ4StreamCompressor(**c_kwargs)
+
+
+def test_invalid_config_d_2():
+ d_kwargs = {}
+ d_kwargs['strategy'] = "foo"
+ d_kwargs['buffer_size'] = 1024
+
+ with pytest.raises(ValueError):
+ lz4.stream.LZ4StreamDecompressor(**d_kwargs)
+
+
+def test_invalid_config_c_3(store_comp_size):
+ c_kwargs = {}
+ c_kwargs['strategy'] = "double_buffer"
+ c_kwargs['buffer_size'] = 1024
+ c_kwargs['store_comp_size'] = store_comp_size['store_comp_size'] + 5
+
+ with pytest.raises(ValueError):
+ lz4.stream.LZ4StreamCompressor(**c_kwargs)
+
+
+def test_invalid_config_d_3(store_comp_size):
+ d_kwargs = {}
+ d_kwargs['strategy'] = "double_buffer"
+ d_kwargs['buffer_size'] = 1024
+ d_kwargs['store_comp_size'] = store_comp_size['store_comp_size'] + 5
+
+ with pytest.raises(ValueError):
+ lz4.stream.LZ4StreamDecompressor(**d_kwargs)
+
+
+def test_invalid_config_c_4(store_comp_size):
+ c_kwargs = {}
+ c_kwargs['strategy'] = "double_buffer"
+ c_kwargs['buffer_size'] = 1 << (8 * store_comp_size['store_comp_size'])
+ c_kwargs.update(store_comp_size)
+
+ if store_comp_size['store_comp_size'] >= 4:
+ # No need to skip this test case, since the argument check is
+ # expected to raise an error.
+
+ # Make sure the page size is larger than what the input bound will be,
+ # but still fit in 4 bytes
+ c_kwargs['buffer_size'] -= 1
+
+ if c_kwargs['buffer_size'] > lz4.stream.LZ4_MAX_INPUT_SIZE:
+ message = r"^Invalid buffer_size argument: \d+. Cannot define output buffer size. Must be lesser or equal to 2113929216$" # noqa
+ err_class = ValueError
+ else:
+ message = r"^Inconsistent buffer_size/store_comp_size values. Maximal compressed length \(\d+\) cannot fit in a \d+ byte-long integer$" # noqa
+ err_class = lz4.stream.LZ4StreamError
+
+ with pytest.raises(err_class, match=message):
+ lz4.stream.LZ4StreamCompressor(**c_kwargs)
+
+
+def test_invalid_config_d_4(store_comp_size):
+ d_kwargs = {}
+ d_kwargs['strategy'] = "double_buffer"
+ d_kwargs['buffer_size'] = 1 << (8 * store_comp_size['store_comp_size'])
+ d_kwargs.update(store_comp_size)
+
+ if store_comp_size['store_comp_size'] >= 4:
+
+ if sys.maxsize < 0xffffffff:
+ pytest.skip('Py_ssize_t too small for this test')
+
+ # Make sure the page size is larger than what the input bound will be,
+ # but still fit in 4 bytes
+ d_kwargs['buffer_size'] -= 1
+
+ # No failure expected during instantiation/initialization
+ lz4.stream.LZ4StreamDecompressor(**d_kwargs)
+
+
+def test_invalid_config_c_5():
+ c_kwargs = {}
+ c_kwargs['strategy'] = "double_buffer"
+ c_kwargs['buffer_size'] = lz4.stream.LZ4_MAX_INPUT_SIZE
+
+ if sys.maxsize < 0xffffffff:
+ pytest.skip('Py_ssize_t too small for this test')
+
+ # No failure expected
+ lz4.stream.LZ4StreamCompressor(**c_kwargs)
+
+ c_kwargs['buffer_size'] = lz4.stream.LZ4_MAX_INPUT_SIZE + 1
+ with pytest.raises(ValueError):
+ lz4.stream.LZ4StreamCompressor(**c_kwargs)
+
+ # Make sure the page size is larger than what the input bound will be,
+ # but still fit in 4 bytes
+ c_kwargs['buffer_size'] = _4GB - 1 # 4GB - 1 (to fit in 4 bytes)
+ with pytest.raises(ValueError):
+ lz4.stream.LZ4StreamCompressor(**c_kwargs)
+
+
+def test_invalid_config_d_5():
+ d_kwargs = {}
+ d_kwargs['strategy'] = "double_buffer"
+
+ # No failure expected during instantiation/initialization
+ d_kwargs['buffer_size'] = lz4.stream.LZ4_MAX_INPUT_SIZE
+
+ if sys.maxsize < 0xffffffff:
+ pytest.skip('Py_ssize_t too small for this test')
+
+ lz4.stream.LZ4StreamDecompressor(**d_kwargs)
+
+ # No failure expected during instantiation/initialization
+ d_kwargs['buffer_size'] = lz4.stream.LZ4_MAX_INPUT_SIZE + 1
+
+ if sys.maxsize < 0xffffffff:
+ pytest.skip('Py_ssize_t too small for this test')
+
+ lz4.stream.LZ4StreamDecompressor(**d_kwargs)
+
+ # No failure expected during instantiation/initialization
+ d_kwargs['buffer_size'] = _4GB - 1 # 4GB - 1 (to fit in 4 bytes)
+
+ if sys.maxsize < 0xffffffff:
+ pytest.skip('Py_ssize_t too small for this test')
+
+ lz4.stream.LZ4StreamDecompressor(**d_kwargs)
+
+
+def test_decompress_corrupted_input_1():
+ c_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+ d_kwargs = {}
+ d_kwargs.update(c_kwargs)
+
+ data = compress(b'A' * 512, c_kwargs)
+ decompress(data, d_kwargs)
+
+ message = r"^Requested input size \(\d+\) larger than source size \(\d+\)$"
+
+ with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+ decompress(data[4:], d_kwargs)
+
+
+def test_decompress_corrupted_input_2():
+ c_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+ d_kwargs = {}
+ d_kwargs.update(c_kwargs)
+
+ data = compress(b'A' * 512, c_kwargs)
+ decompress(data, d_kwargs)
+
+ message = r"^Decompression failed. error: \d+$"
+
+ # Block size corruption in the first block
+
+ # Block size longer than actual:
+ data = int_to_bytes(int_from_bytes(data[:4], 'little') + 1, 4, 'little') + data[4:]
+ with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+ decompress(data, d_kwargs)
+
+ # Block size shorter than actual:
+ data = int_to_bytes(int_from_bytes(data[:4], 'little') - 2, 4, 'little') + data[4:]
+ with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+ decompress(data, d_kwargs)
+
+
+def test_decompress_corrupted_input_3():
+ c_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+ d_kwargs = {}
+ d_kwargs.update(c_kwargs)
+
+ data = compress(b'A' * 512, c_kwargs)
+ decompress(data, d_kwargs)
+
+ message = r"^Decompression failed. error: \d+$"
+
+ # Block size corruption in a block in the middle of the stream
+ offset = 4 + int_from_bytes(data[:4], 'little')
+
+ # Block size longer than actual:
+ block_len = int_from_bytes(data[offset:offset + 4], 'little') + 1
+ data = data[:offset] + int_to_bytes(block_len, 4, 'little') + data[offset + 4:]
+
+ with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+ decompress(data, d_kwargs)
+
+ # Block size shorter than actual:
+ block_len = int_from_bytes(data[offset:offset + 4], 'little') - 2
+ data = data[:offset] + int_to_bytes(block_len, 4, 'little') + data[offset + 4:]
+
+ with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+ decompress(data, d_kwargs)
+
+
+def test_decompress_corrupted_input_4():
+ c_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+ d_kwargs = {}
+ d_kwargs.update(c_kwargs)
+
+ data = compress(b'A' * 256, c_kwargs)
+ decompress(data, d_kwargs)
+
+ # Block size corruption in the last block of the stream
+ offset = 4 + int_from_bytes(data[:4], 'little')
+
+ # Block size longer than actual:
+ block_len = int_from_bytes(data[offset:offset + 4], 'little') + 1
+ data = data[:offset] + int_to_bytes(block_len, 4, 'little') + data[offset + 4:]
+
+ message = r"^Requested input size \(\d+\) larger than source size \(\d+\)$"
+
+ with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+ decompress(data, d_kwargs)
+
+ # Block size shorter than actual:
+ block_len = int_from_bytes(data[offset:offset + 4], 'little') - 2
+ data = data[:offset] + int_to_bytes(block_len, 4, 'little') + data[offset + 4:]
+
+ message = r"^Decompression failed. error: \d+$"
+
+ with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+ decompress(data, d_kwargs)
+
+
+def test_decompress_truncated():
+ c_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+ d_kwargs = {}
+ d_kwargs.update(c_kwargs)
+
+ input_data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
+ compressed, block_offsets = compress(input_data, c_kwargs, return_block_offset=True)
+
+ last_block_offset = 0
+ for n in range(len(compressed)):
+ if n in block_offsets:
+ # end of input matches end of block, so decompression must succeed
+ last_block_offset = n
+ decompress(compressed[:n], d_kwargs)
+
+ else:
+ # end of input does not match end of block, so decompression failure is expected
+ if n - last_block_offset < c_kwargs['store_comp_size']:
+ message = "^Invalid source, too small for holding any block$"
+ else:
+ message = r"^Requested input size \(\d+\) larger than source size \(\d+\)$"
+
+ with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+ decompress(compressed[:n], d_kwargs)
+
+
+# This next test is probably redundant given test_decompress_truncated above
+# since the trailing bytes will be considered as the truncated last block, but
+# we will keep them for now
+
+
+def test_decompress_with_trailer():
+ c_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+ d_kwargs = {}
+ d_kwargs.update(c_kwargs)
+
+ data = b'A' * 64
+ comp = compress(data, c_kwargs)
+
+ message = "^Invalid source, too small for holding any block$"
+ with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+ decompress(comp + b'A', d_kwargs)
+
+ message = r"^Requested input size \(\d+\) larger than source size \(\d+\)$"
+ with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+ decompress(comp + b'A' * 10, d_kwargs)
+
+ for n in range(1, 10):
+ if n < d_kwargs['store_comp_size']:
+ message = "^Invalid source, too small for holding any block$"
+ else:
+ message = r"^Decompression failed. error: \d+$"
+ with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+ decompress(comp + b'\x00' * n, d_kwargs)
+
+
+def test_unicode():
+ if sys.version_info < (3,):
+ return # skip
+
+ c_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+ d_kwargs = {}
+ d_kwargs.update(c_kwargs)
+
+ DATA = b'x'
+ with pytest.raises(TypeError):
+ compress(DATA.decode('latin1'), c_kwargs)
+ decompress(compress(DATA, c_kwargs).decode('latin1'), d_kwargs)
+
+
+# These next two are probably redundant given test_1 above but we'll keep them
+# for now
+
+
+def test_return_bytearray():
+ if sys.version_info < (3,):
+ return # skip
+
+ c_kwargs_r = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+ c_kwargs = {'return_bytearray': True}
+ c_kwargs.update(c_kwargs_r)
+
+ d_kwargs = {}
+ d_kwargs.update(c_kwargs)
+
+ data = os.urandom(128 * 1024) # 128 KiB of random data
+ compressed = compress(data, c_kwargs_r, check_block_type=True)
+ b = compress(data, c_kwargs, check_block_type=True)
+ assert isinstance(b, bytearray)
+ assert bytes(b) == compressed
+ b = decompress(compressed, d_kwargs, check_chunk_type=True)
+ assert isinstance(b, bytearray)
+ assert bytes(b) == data
+
+
+def test_memoryview():
+ if sys.version_info < (2, 7):
+ return # skip
+
+ c_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+ d_kwargs = {}
+ d_kwargs.update(c_kwargs)
+
+ data = os.urandom(128 * 1024) # 128 KiB of random data
+ compressed = compress(data, c_kwargs)
+ assert compress(memoryview(data), c_kwargs) == compressed
+ assert decompress(memoryview(compressed), d_kwargs) == data
+
+
+def test_with_dict_none():
+ kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+ input_data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
+ for mode in ['default', 'high_compression']:
+ c_kwargs = {'mode': mode, 'dictionary': None}
+ c_kwargs.update(kwargs)
+ d_kwargs = {}
+ d_kwargs.update(kwargs)
+ assert decompress(compress(input_data, c_kwargs), d_kwargs) == input_data
+
+ c_kwargs = {'mode': mode}
+ c_kwargs.update(kwargs)
+ d_kwargs = {'dictionary': None}
+ d_kwargs.update(kwargs)
+ assert decompress(compress(input_data, c_kwargs), d_kwargs) == input_data
+
+ c_kwargs = {'mode': mode, 'dictionary': b''}
+ c_kwargs.update(kwargs)
+ d_kwargs = {}
+ d_kwargs.update(kwargs)
+ assert decompress(compress(input_data, c_kwargs), d_kwargs) == input_data
+
+ c_kwargs = {'mode': mode}
+ c_kwargs.update(kwargs)
+ d_kwargs = {'dictionary': b''}
+ d_kwargs.update(kwargs)
+ assert decompress(compress(input_data, c_kwargs), d_kwargs) == input_data
+
+ c_kwargs = {'mode': mode, 'dictionary': ''}
+ c_kwargs.update(kwargs)
+ d_kwargs = {}
+ d_kwargs.update(kwargs)
+ assert decompress(compress(input_data, c_kwargs), d_kwargs) == input_data
+
+ c_kwargs = {'mode': mode}
+ c_kwargs.update(kwargs)
+ d_kwargs = {'dictionary': ''}
+ d_kwargs.update(kwargs)
+ assert decompress(compress(input_data, c_kwargs), d_kwargs) == input_data
+
+
+def test_with_dict():
+ kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+ input_data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
+ dict1 = input_data[10:30]
+ dict2 = input_data[20:40]
+ message = r"^Decompression failed. error: \d+$"
+
+ for mode in ['default', 'high_compression']:
+ c_kwargs = {'mode': mode, 'dictionary': dict1}
+ c_kwargs.update(kwargs)
+ compressed = compress(input_data, c_kwargs)
+
+ d_kwargs = {}
+ d_kwargs.update(kwargs)
+ with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+ decompress(compressed, d_kwargs)
+
+ d_kwargs = {'dictionary': dict1[:2]}
+ d_kwargs.update(kwargs)
+ with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+ decompress(compressed, d_kwargs)
+
+ d_kwargs = {'dictionary': dict2}
+ d_kwargs.update(kwargs)
+ assert decompress(compressed, d_kwargs) != input_data
+
+ d_kwargs = {'dictionary': dict1}
+ d_kwargs.update(kwargs)
+ assert decompress(compressed, d_kwargs) == input_data
+
+ c_kwargs = {}
+ c_kwargs.update(kwargs)
+ d_kwargs = {'dictionary': dict1}
+ d_kwargs.update(kwargs)
+ assert decompress(compress(input_data, c_kwargs), d_kwargs) == input_data
+
+
+def test_known_decompress_1():
+ d_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+ output = b''
+
+ input = b'\x00\x00\x00\x00'
+ message = "^Decompression failed. error: 1$"
+ with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+ decompress(input, d_kwargs)
+
+ input = b'\x01\x00\x00\x00\x00'
+ assert decompress(input, d_kwargs) == output
+
+
+def test_known_decompress_2():
+ d_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+ input = b'\x02\x00\x00\x00\x10 '
+ output = b' '
+ assert decompress(input, d_kwargs) == output
+
+
+def test_known_decompress_3():
+ d_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+ # uncompressed data size smaller than buffer_size
+ input = b'%\x00\x00\x00\xff\x0bLorem ipsum dolor sit amet\x1a\x006P amet'
+ output = b'Lorem ipsum dolor sit amet' * 4
+ assert decompress(input, d_kwargs) == output
+
+
+def test_known_decompress_4():
+ d_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+ input = b'%\x00\x00\x00\xff\x0bLorem ipsum dolor sit amet\x1a\x00NPit am\n\x00\x00\x00\x0fh\x00hP sit \x05\x00\x00\x00@amet'
+ output = b'Lorem ipsum dolor sit amet' * 10
+ assert decompress(input, d_kwargs) == output
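
As test_with_dict above demonstrates, a compression dictionary only has to be shared between the two ends, not transmitted in the stream; a minimal sketch of matched dictionary use, assuming the same API as in these tests:

import lz4.stream

kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
data = b"Lorem ipsum dolor sit amet" * 4
dict_ = data[10:30]  # both sides must use the same bytes

with lz4.stream.LZ4StreamCompressor(dictionary=dict_, **kwargs) as proc:
    comp = proc.compress(data[:128])

with lz4.stream.LZ4StreamDecompressor(dictionary=dict_, **kwargs) as proc:
    assert proc.decompress(proc.get_block(comp)) == data[:128]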
diff --git a/contrib/python/lz4/py3/tests/stream/test_stream_2.py b/contrib/python/lz4/py3/tests/stream/test_stream_2.py
new file mode 100644
index 0000000000..5578f832c4
--- /dev/null
+++ b/contrib/python/lz4/py3/tests/stream/test_stream_2.py
@@ -0,0 +1,152 @@
+import pytest
+import sys
+import lz4.stream
+import psutil
+import os
+
+
+# This test requires allocating a big lump of memory. In order to
+# avoid a massive memory allocation during byte compilation, we have
+# to declare a variable for the size of the buffer we're going to
+# create outside the scope of the function below. See:
+# https://bugs.python.org/issue21074
+
+_4GB = 0xffffffff # actually 4GB - 1B, the largest value representable in 4 bytes.
+
+# This test will be killed on Travis due to the 3GB memory limit
+# there. Unfortunately psutil reports the host memory, not the memory
+# available to the container, and so can't be used to detect available
+# memory, so instead, as an ugly hack to detect that we're on Travis, we
+# check for the TRAVIS environment variable being set. This is quite
+# fragile.
+
+if os.environ.get('TRAVIS') is not None or sys.maxsize < _4GB or \
+ psutil.virtual_memory().available < _4GB:
+ huge = None
+else:
+ try:
+ huge = b'\0' * _4GB
+ except (MemoryError, OverflowError):
+ huge = None
+
+
+@pytest.mark.skipif(
+ os.environ.get('TRAVIS') is not None,
+ reason='Skipping test on Travis due to insufficient memory'
+)
+@pytest.mark.skipif(
+ sys.maxsize < _4GB,
+ reason='Py_ssize_t too small for this test'
+)
+@pytest.mark.skipif(
+ psutil.virtual_memory().available < _4GB or huge is None,
+ reason='Insufficient system memory for this test'
+)
+def test_huge_1():
+ data = b''
+ kwargs = {
+ 'strategy': "double_buffer",
+ 'buffer_size': lz4.stream.LZ4_MAX_INPUT_SIZE,
+ 'store_comp_size': 4,
+ 'dictionary': huge,
+ }
+
+ if psutil.virtual_memory().available < 3 * kwargs['buffer_size']:
+ # The internal LZ4 context will request at least 3 times buffer_size
+ # as memory (2 buffer_size for the double-buffer, and 1.x buffer_size
+ # for the output buffer)
+ pytest.skip('Insufficient system memory for this test')
+
+ # Triggering overflow error
+ message = r'^Dictionary too large for LZ4 API$'
+
+ with pytest.raises(OverflowError, match=message):
+ with lz4.stream.LZ4StreamCompressor(**kwargs) as proc:
+ proc.compress(data)
+
+ with pytest.raises(OverflowError, match=message):
+ with lz4.stream.LZ4StreamDecompressor(**kwargs) as proc:
+ proc.decompress(data)
+
+
+@pytest.mark.skipif(
+ os.environ.get('TRAVIS') is not None,
+ reason='Skipping test on Travis due to insufficient memory'
+)
+@pytest.mark.skipif(
+ sys.maxsize < 0xffffffff,
+ reason='Py_ssize_t too small for this test'
+)
+@pytest.mark.skipif(
+ psutil.virtual_memory().available < _4GB or huge is None,
+ reason='Insufficient system memory for this test'
+)
+def test_huge_2():
+ data = huge
+ kwargs = {
+ 'strategy': "double_buffer",
+ 'buffer_size': lz4.stream.LZ4_MAX_INPUT_SIZE,
+ 'store_comp_size': 4,
+ 'dictionary': b'',
+ }
+
+ if psutil.virtual_memory().available < 3 * kwargs['buffer_size']:
+ # The internal LZ4 context will request at least 3 times buffer_size
+ # as memory (2 buffer_size for the double-buffer, and 1.x buffer_size
+ # for the output buffer)
+ pytest.skip('Insufficient system memory for this test')
+
+ # Raising overflow error
+ message = r'^Input too large for LZ4 API$'
+
+ with pytest.raises(OverflowError, match=message):
+ with lz4.stream.LZ4StreamCompressor(**kwargs) as proc:
+ proc.compress(data)
+
+ # On decompression, too large input will raise LZ4StreamError
+ with pytest.raises(lz4.stream.LZ4StreamError):
+ with lz4.stream.LZ4StreamDecompressor(**kwargs) as proc:
+ proc.decompress(data)
+
+
+@pytest.mark.skipif(
+ os.environ.get('TRAVIS') is not None,
+ reason='Skipping test on Travis due to insufficient memory'
+)
+@pytest.mark.skipif(
+ sys.maxsize < 0xffffffff,
+ reason='Py_ssize_t too small for this test'
+)
+@pytest.mark.skipif(
+ psutil.virtual_memory().available < _4GB or huge is None,
+ reason='Insufficient system memory for this test'
+)
+def test_huge_3():
+ data = huge
+ kwargs = {
+ 'strategy': "double_buffer",
+ 'buffer_size': lz4.stream.LZ4_MAX_INPUT_SIZE,
+ 'store_comp_size': 4,
+ 'dictionary': huge,
+ }
+
+ if psutil.virtual_memory().available < 3 * kwargs['buffer_size']:
+ # The internal LZ4 context will request at least 3 times buffer_size
+ # as memory (2 buffer_size for the double-buffer, and 1.x buffer_size
+ # for the output buffer)
+ pytest.skip('Insufficient system memory for this test')
+
+ # Raising overflow error (during initialization because of the dictionary parameter)
+ message = r'^Dictionary too large for LZ4 API$'
+
+ with pytest.raises(OverflowError, match=message):
+ with lz4.stream.LZ4StreamCompressor(**kwargs) as proc:
+ proc.compress(data)
+
+ with pytest.raises(OverflowError, match=message):
+ with lz4.stream.LZ4StreamDecompressor(**kwargs) as proc:
+ proc.decompress(data)
+
+
+def test_dummy():
+ pass
diff --git a/contrib/python/lz4/py3/tests/stream/test_stream_3.py b/contrib/python/lz4/py3/tests/stream/test_stream_3.py
new file mode 100644
index 0000000000..2b52d6b549
--- /dev/null
+++ b/contrib/python/lz4/py3/tests/stream/test_stream_3.py
@@ -0,0 +1,123 @@
+import lz4.stream
+import pytest
+import sys
+
+
+_1KB = 1024
+_1MB = _1KB * 1024
+_1GB = _1MB * 1024
+
+
+def compress(x, c_kwargs):
+ c = []
+ with lz4.stream.LZ4StreamCompressor(**c_kwargs) as proc:
+ for start in range(0, len(x), c_kwargs['buffer_size']):
+ chunk = x[start:start + c_kwargs['buffer_size']]
+ block = proc.compress(chunk)
+ c.append(block)
+ if c_kwargs.get('return_bytearray', False):
+ return bytearray().join(c)
+ else:
+ return bytes().join(c)
+
+
+def decompress(x, d_kwargs):
+ d = []
+ with lz4.stream.LZ4StreamDecompressor(**d_kwargs) as proc:
+ start = 0
+ while start < len(x):
+ block = proc.get_block(x[start:])
+ chunk = proc.decompress(block)
+ d.append(chunk)
+ start += d_kwargs['store_comp_size'] + len(block)
+ if d_kwargs.get('return_bytearray', False):
+ return bytearray().join(d)
+ else:
+ return bytes().join(d)
+
+
+test_buffer_size = sorted(
+ [256,
+ 1 * _1KB,
+ 64 * _1KB,
+ 1 * _1MB,
+ 1 * _1GB,
+ lz4.stream.LZ4_MAX_INPUT_SIZE]
+)
+
+
+@pytest.fixture(
+ params=test_buffer_size,
+ ids=[
+ 'buffer_size' + str(i) for i in range(len(test_buffer_size))
+ ]
+)
+def buffer_size(request):
+ return request.param
+
+
+test_data = [
+ (b'a' * _1MB),
+]
+
+
+@pytest.fixture(
+ params=test_data,
+ ids=[
+ 'data' + str(i) for i in range(len(test_data))
+ ]
+)
+def data(request):
+ return request.param
+
+
+def test_block_decompress_mem_usage(data, buffer_size):
+ kwargs = {
+ 'strategy': "double_buffer",
+ 'buffer_size': buffer_size,
+ 'store_comp_size': 4,
+ }
+
+ if sys.maxsize < 0xffffffff:
+ pytest.skip('Py_ssize_t too small for this test')
+
+ tracemalloc = pytest.importorskip('tracemalloc')
+
+ # Trace memory usage on compression
+ tracemalloc.start()
+ prev_snapshot = None
+
+ for i in range(1000):
+ compressed = compress(data, kwargs)
+
+ if i % 100 == 0:
+ snapshot = tracemalloc.take_snapshot()
+
+ if prev_snapshot:
+ # Filter on lz4.stream module's allocations
+ stats = [x for x in snapshot.compare_to(prev_snapshot, 'lineno')
+ if lz4.stream.__file__ in x.traceback._frames[0][0]]
+ assert sum(map(lambda x: x.size_diff, stats)) < (1024 * 4)
+
+ prev_snapshot = snapshot
+
+ tracemalloc.stop()
+
+ tracemalloc.start()
+ prev_snapshot = None
+
+ for i in range(1000):
+ decompressed = decompress(compressed, kwargs) # noqa: F841
+
+ if i % 100 == 0:
+ snapshot = tracemalloc.take_snapshot()
+
+ if prev_snapshot:
+ # Filter on lz4.stream module's allocations
+ stats = [x for x in snapshot.compare_to(prev_snapshot, 'lineno')
+ if lz4.stream.__file__ in x.traceback._frames[0][0]]
+ assert sum(map(lambda x: x.size_diff, stats)) < (1024 * 4)
+
+ prev_snapshot = snapshot
+
+ tracemalloc.stop()
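
The memory check above follows a generic tracemalloc pattern: snapshot periodically, diff against the previous snapshot, and bound the growth. The same skeleton in isolation (work() is a hypothetical stand-in for the compress/decompress loop):

import tracemalloc

def work():
    return bytes(1024)  # hypothetical stand-in for compress()/decompress()

tracemalloc.start()
prev_snapshot = None
for i in range(1000):
    work()
    if i % 100 == 0:
        snapshot = tracemalloc.take_snapshot()
        if prev_snapshot:
            # The real test first filters the stats down to allocations made
            # in lz4.stream's own file before bounding the growth.
            stats = snapshot.compare_to(prev_snapshot, 'lineno')
            assert sum(s.size_diff for s in stats) < 4 * 1024
        prev_snapshot = snapshot
tracemalloc.stop()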
diff --git a/contrib/python/lz4/py3/tests/stream/test_stream_4.py b/contrib/python/lz4/py3/tests/stream/test_stream_4.py
new file mode 100644
index 0000000000..3d139a02ef
--- /dev/null
+++ b/contrib/python/lz4/py3/tests/stream/test_stream_4.py
@@ -0,0 +1,139 @@
+import lz4.stream
+import pytest
+import sys
+
+
+if sys.version_info < (3, ):
+ from struct import pack, unpack
+
+ def _get_format(length, byteorder, signed):
+ _order = {'l': '<', 'b': '>'}
+ _fmt = {1: 'b', 2: 'h', 4: 'i', 8: 'q'}
+ _sign = {True: lambda x: x.lower(), False: lambda x: x.upper()}
+ return _sign[signed](_order[byteorder[0].lower()] + _fmt[length])
+
+ def int_to_bytes(value, length=4, byteorder='little', signed=False):
+ return bytearray(pack(_get_format(length, byteorder, signed), value))
+
+ def int_from_bytes(bytes, byteorder='little', signed=False):
+ return unpack(_get_format(len(bytes), byteorder, signed), bytes)[0]
+
+else:
+ def int_to_bytes(value, length=4, byteorder='little', signed=False):
+ return value.to_bytes(length, byteorder, signed=signed)
+
+ def int_from_bytes(bytes, byteorder='little', signed=False):
+ return int.from_bytes(bytes, byteorder, signed=signed)
+
+# Out-of-band block size record tests
+
+
+def test_round_trip():
+ data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
+ kwargs = {'strategy': "double_buffer", 'buffer_size': 256, 'store_comp_size': 4}
+
+ oob_kwargs = {}
+ oob_kwargs.update(kwargs)
+ oob_kwargs['store_comp_size'] = 0
+
+ ib_cstream = bytearray()
+ oob_cstream = bytearray()
+ oob_sizes = []
+
+ with lz4.stream.LZ4StreamCompressor(**kwargs) as ib_proc, \
+ lz4.stream.LZ4StreamCompressor(**oob_kwargs) as oob_proc:
+ for start in range(0, len(data), kwargs['buffer_size']):
+ chunk = data[start:start + kwargs['buffer_size']]
+ ib_block = ib_proc.compress(chunk)
+ oob_block = oob_proc.compress(chunk)
+
+ assert (len(ib_block) == (len(oob_block) + kwargs['store_comp_size'])), \
+ "Blocks size mismatch: " \
+ "{}/{}".format(len(ib_block), len(oob_block) + kwargs['store_comp_size'])
+
+ assert (int_from_bytes(ib_block[:kwargs['store_comp_size']]) == len(oob_block)), \
+ "Blocks size record mismatch: got {}, expected {}".format(
+ int_from_bytes(ib_block[:kwargs['store_comp_size']]),
+ len(oob_block))
+
+ assert (ib_block[kwargs['store_comp_size']:] == oob_block), "Blocks data mismatch"
+
+ ib_cstream += ib_block
+ oob_cstream += oob_block
+ oob_sizes.append(len(oob_block))
+
+ ib_dstream = bytearray()
+ oob_dstream = bytearray()
+
+ with lz4.stream.LZ4StreamDecompressor(**kwargs) as ib_proc, \
+ lz4.stream.LZ4StreamDecompressor(**oob_kwargs) as oob_proc:
+ ib_offset = 0
+ oob_index = 0
+ oob_offset = 0
+ while ib_offset < len(ib_cstream) and oob_index < len(oob_sizes):
+ ib_block = ib_proc.get_block(ib_cstream[ib_offset:])
+ oob_block = oob_cstream[oob_offset:oob_offset + oob_sizes[oob_index]]
+
+ assert (len(ib_block) == len(oob_block)), \
+ "Blocks size mismatch: {}/{}".format(len(ib_block), len(oob_block))
+
+ assert (ib_block == oob_block), "Blocks data mismatch"
+
+ ib_chunk = ib_proc.decompress(ib_block)
+ oob_chunk = oob_proc.decompress(oob_block)
+
+ assert (len(ib_chunk) == len(oob_chunk)), \
+ "Chunks size mismatch: {}/{}".format(len(ib_chunk), len(oob_chunk))
+
+ assert (ib_chunk == oob_chunk), "Chunks data mismatch"
+
+ ib_dstream += ib_chunk
+ oob_dstream += oob_chunk
+
+ ib_offset += kwargs['store_comp_size'] + len(ib_block)
+ oob_offset += oob_sizes[oob_index]
+ oob_index += 1
+
+ assert (len(ib_dstream) == len(oob_dstream)), "Decompressed streams length mismatch"
+
+ assert (len(data) == len(ib_dstream)), "Decompressed streams length mismatch"
+
+ assert (len(data) == len(oob_dstream)), "Decompressed streams length mismatch"
+
+ assert (ib_dstream == oob_dstream), "Decompressed streams mismatch"
+
+ assert (data == ib_dstream), "Decompressed streams mismatch"
+
+ assert (data == oob_dstream), "Decompressed streams mismatch"
+
+
+def test_invalid_usage():
+ data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
+ kwargs = {'strategy': "double_buffer", 'buffer_size': 256, 'store_comp_size': 0}
+
+ cstream = bytearray()
+ oob_sizes = []
+
+ with lz4.stream.LZ4StreamCompressor(**kwargs) as proc:
+ for start in range(0, len(data), kwargs['buffer_size']):
+ chunk = data[start:start + kwargs['buffer_size']]
+ block = proc.compress(chunk)
+ cstream += block
+ oob_sizes.append(len(block))
+
+ message = r"^LZ4 context is configured for storing block size out-of-band$"
+
+ with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+ dstream = bytearray()
+
+ with lz4.stream.LZ4StreamDecompressor(**kwargs) as proc:
+ offset = 0
+ index = 0
+ while offset < len(cstream):
+ block = proc.get_block(cstream[offset:])
+ chunk = proc.decompress(block)
+
+ dstream += chunk
+
+ offset += kwargs['store_comp_size'] + len(block)
+ index += 1
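
For completeness, the out-of-band variant exercised by test_round_trip above stores no length prefix (store_comp_size=0), so the caller must carry the block sizes separately; a minimal sketch using the same API:

import lz4.stream

kwargs = {'strategy': "double_buffer", 'buffer_size': 256, 'store_comp_size': 0}
data = b"0123456789" * 100

stream, sizes = bytearray(), []
with lz4.stream.LZ4StreamCompressor(**kwargs) as proc:
    for start in range(0, len(data), kwargs['buffer_size']):
        block = proc.compress(data[start:start + kwargs['buffer_size']])
        stream += block
        sizes.append(len(block))  # out-of-band block size record

restored = bytearray()
with lz4.stream.LZ4StreamDecompressor(**kwargs) as proc:
    offset = 0
    for size in sizes:
        # No get_block() here: block boundaries come from the sizes list.
        restored += proc.decompress(stream[offset:offset + size])
        offset += size

assert bytes(restored) == data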