| author | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 09:58:56 +0300 |
|---|---|---|
| committer | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 10:20:20 +0300 |
| commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch) | |
| tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/python/lz4/py3/tests/stream | |
| parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff) | |
| download | ydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz | |
YQ Connector: move tests from yql to ydb (OSS)
Move the Connector test folder from the yql directory to the ydb directory (synchronized with github).
Diffstat (limited to 'contrib/python/lz4/py3/tests/stream')
| -rw-r--r-- | contrib/python/lz4/py3/tests/stream/conftest.py | 155 |
|---|---|---|
| -rw-r--r-- | contrib/python/lz4/py3/tests/stream/numpy_byte_array.bin | bin 0 -> 8552 bytes |
| -rw-r--r-- | contrib/python/lz4/py3/tests/stream/test_stream_0.py | 116 |
| -rw-r--r-- | contrib/python/lz4/py3/tests/stream/test_stream_1.py | 555 |
| -rw-r--r-- | contrib/python/lz4/py3/tests/stream/test_stream_2.py | 152 |
| -rw-r--r-- | contrib/python/lz4/py3/tests/stream/test_stream_3.py | 123 |
| -rw-r--r-- | contrib/python/lz4/py3/tests/stream/test_stream_4.py | 139 |

7 files changed, 1240 insertions, 0 deletions
diff --git a/contrib/python/lz4/py3/tests/stream/conftest.py b/contrib/python/lz4/py3/tests/stream/conftest.py
new file mode 100644
index 0000000000..b31ab14317
--- /dev/null
+++ b/contrib/python/lz4/py3/tests/stream/conftest.py
@@ -0,0 +1,155 @@
+import pytest
+import os
+import sys
+
+test_data = [
+    (b''),
+    (os.urandom(8 * 1024)),
+    # (b'0' * 8 * 1024),
+    # (bytearray(b'')),
+    # (bytearray(os.urandom(8 * 1024))),
+    # (bytearray(open(os.path.join(os.path.dirname(__file__), 'numpy_byte_array.bin'), 'rb').read()))
+]
+
+if sys.version_info > (2, 7):
+    test_data += [
+        (memoryview(b'')),
+        (memoryview(os.urandom(8 * 1024)))
+    ]
+
+
+@pytest.fixture(
+    params=test_data,
+    ids=[
+        'data' + str(i) for i in range(len(test_data))
+    ]
+)
+def data(request):
+    return request.param
+
+
+@pytest.fixture(
+    params=[
+        ("double_buffer"),
+        # ("ring_buffer"),  # not implemented
+    ]
+)
+def strategy(request):
+    return request.param
+
+
+test_buffer_size = sorted(
+    [1,
+     # 4,
+     # 8,
+     # 64,
+     # 256,
+     941,
+     # 1 * 1024,
+     # 4 * 1024,
+     # 8 * 1024,
+     # 16 * 1024,
+     # 32 * 1024,
+     64 * 1024,
+     # 128 * 1024
+     ]
+)
+
+
+@pytest.fixture(
+    params=test_buffer_size,
+    ids=[
+        'buffer_size' + str(i) for i in range(len(test_buffer_size))
+    ]
+)
+def buffer_size(request):
+    return request.param
+
+
+@pytest.fixture(
+    params=[
+        (
+            {
+                'store_comp_size': 1
+            }
+        ),
+        (
+            {
+                'store_comp_size': 2
+            }
+        ),
+        # (
+        #     {
+        #         'store_comp_size': 4
+        #     }
+        # ),
+    ]
+)
+def store_comp_size(request):
+    return request.param
+
+
+@pytest.fixture(
+    params=[
+        (
+            {
+                'return_bytearray': True
+            }
+        ),
+        (
+            {
+                'return_bytearray': False
+            }
+        ),
+    ]
+)
+def return_bytearray(request):
+    return request.param
+
+
+@pytest.fixture
+def c_return_bytearray(return_bytearray):
+    return return_bytearray
+
+
+@pytest.fixture
+def d_return_bytearray(return_bytearray):
+    return return_bytearray
+
+
+@pytest.fixture(
+    params=[
+        ('default', None)
+    ] + [
+        ('fast', None)
+    ] + [
+        ('fast', {'acceleration': 2 * s}) for s in range(5)
+    ] + [
+        ('high_compression', None)
+    ] + [
+        ('high_compression', {'compression_level': 2 * s}) for s in range(9)
+    ] + [
+        (None, None)
+    ]
+)
+def mode(request):
+    return request.param
+
+
+dictionary = [
+    None,
+    (0, 0),
+    (100, 200),
+    (0, 8 * 1024),
+    os.urandom(8 * 1024)
+]
+
+
+@pytest.fixture(
+    params=dictionary,
+    ids=[
+        'dictionary' + str(i) for i in range(len(dictionary))
+    ]
+)
+def dictionary(request):
+    return request.param
diff --git a/contrib/python/lz4/py3/tests/stream/numpy_byte_array.bin b/contrib/python/lz4/py3/tests/stream/numpy_byte_array.bin
new file mode 100644
index 0000000000..49537e2d90
--- /dev/null
+++ b/contrib/python/lz4/py3/tests/stream/numpy_byte_array.bin
Binary files differ
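The fixtures in conftest.py above combine multiplicatively: a test that requests several of them runs once per parameter combination, which is how these files fan out into thousands of cases. A minimal standalone sketch of that pytest mechanism (fixture and test names here are illustrative, not part of the commit):

```python
import pytest


@pytest.fixture(params=[b'', b'abc'])
def sample_data(request):
    return request.param


@pytest.fixture(params=[1, 941, 64 * 1024])
def sample_buffer_size(request):
    return request.param


def test_matrix(sample_data, sample_buffer_size):
    # pytest generates 2 * 3 = 6 test cases from the two param lists
    assert isinstance(sample_data, bytes) and sample_buffer_size > 0
```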
diff --git a/contrib/python/lz4/py3/tests/stream/test_stream_0.py b/contrib/python/lz4/py3/tests/stream/test_stream_0.py
new file mode 100644
index 0000000000..03b19f3f42
--- /dev/null
+++ b/contrib/python/lz4/py3/tests/stream/test_stream_0.py
@@ -0,0 +1,116 @@
+import lz4.stream
+import sys
+import pytest
+if sys.version_info <= (3, 2):
+    import struct
+
+
+def get_stored_size(buff, block_length_size):
+    if sys.version_info > (2, 7):
+        if isinstance(buff, memoryview):
+            b = buff.tobytes()
+        else:
+            b = bytes(buff)
+    else:
+        b = bytes(buff)
+
+    if len(b) < block_length_size:
+        return None
+
+    if sys.version_info > (3, 2):
+        return int.from_bytes(b[:block_length_size], 'little')
+    else:
+        # This would not work on a memoryview object, hence the
+        # buff.tobytes() call above
+        fmt = {1: 'B', 2: 'H', 4: 'I', }
+        return struct.unpack('<' + fmt[block_length_size], b[:block_length_size])[0]
+
+
+def roundtrip(x, c_kwargs, d_kwargs, dictionary):
+    if dictionary:
+        if isinstance(dictionary, tuple):
+            dict_ = x[dictionary[0]:dictionary[1]]
+        else:
+            dict_ = dictionary
+        c_kwargs['dictionary'] = dict_
+        d_kwargs['dictionary'] = dict_
+
+    c = bytes()
+    with lz4.stream.LZ4StreamCompressor(**c_kwargs) as proc:
+        for start in range(0, len(x), c_kwargs['buffer_size']):
+            chunk = x[start:start + c_kwargs['buffer_size']]
+            assert len(chunk) <= c_kwargs['buffer_size']
+            block = proc.compress(chunk)
+            if c_kwargs.get('return_bytearray'):
+                assert isinstance(block, bytearray)
+            if start == 0:
+                c = block
+            else:
+                c += block
+            assert get_stored_size(block, c_kwargs['store_comp_size']) == \
+                (len(block) - c_kwargs['store_comp_size'])
+
+    d = bytes()
+    with lz4.stream.LZ4StreamDecompressor(**d_kwargs) as proc:
+        start = 0
+        while start < len(c):
+            block = proc.get_block(c[start:])
+            chunk = proc.decompress(block)
+            if d_kwargs.get('return_bytearray'):
+                assert isinstance(chunk, bytearray)
+            if start == 0:
+                d = chunk
+            else:
+                d += chunk
+            start += d_kwargs['store_comp_size'] + len(block)
+
+    return d
+
+
+def setup_kwargs(strategy, mode, buffer_size, store_comp_size,
+                 c_return_bytearray=None, d_return_bytearray=None):
+    c_kwargs = {}
+
+    if mode[0] is not None:
+        c_kwargs['mode'] = mode[0]
+    if mode[1] is not None:
+        c_kwargs.update(mode[1])
+
+    c_kwargs['strategy'] = strategy
+    c_kwargs['buffer_size'] = buffer_size
+    c_kwargs.update(store_comp_size)
+
+    if c_return_bytearray:
+        c_kwargs.update(c_return_bytearray)
+
+    d_kwargs = {}
+
+    if d_return_bytearray:
+        d_kwargs.update(d_return_bytearray)
+
+    d_kwargs['strategy'] = strategy
+    d_kwargs['buffer_size'] = buffer_size
+    d_kwargs.update(store_comp_size)
+
+    return (c_kwargs, d_kwargs)
+
+
+# Test single threaded usage with all valid variations of input
+def test_1(data, strategy, mode, buffer_size, store_comp_size,
+           c_return_bytearray, d_return_bytearray, dictionary):
+    if buffer_size >= (1 << (8 * store_comp_size['store_comp_size'])):
+        pytest.skip("Invalid case: buffer_size too large for the block length area")
+
+    (c_kwargs, d_kwargs) = setup_kwargs(
+        strategy, mode, buffer_size, store_comp_size, c_return_bytearray, d_return_bytearray)
+
+    d = roundtrip(data, c_kwargs, d_kwargs, dictionary)
+
+    assert d == data
+
+
+# Test multi threaded:
+# Not relevant in the lz4.stream case (the process is highly sequential,
+# and re-uses/shares the same context from one input chunk to the next).
+def test_2(data, strategy, mode, buffer_size, store_comp_size, dictionary):  # noqa
+    pass
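For orientation, the round-trip pattern that test_1 exercises boils down to the following minimal sketch, assuming the experimental lz4.stream bindings are built and importable. Each compressed block is prefixed with its size in `store_comp_size` little-endian bytes; `get_block()` uses that prefix to slice one block out of the stream:

```python
import lz4.stream

data = b"hello world" * 100
kwargs = {'strategy': 'double_buffer', 'buffer_size': 256, 'store_comp_size': 4}

# Compress chunk by chunk; each returned block carries its size prefix.
compressed = b''
with lz4.stream.LZ4StreamCompressor(**kwargs) as comp:
    for start in range(0, len(data), kwargs['buffer_size']):
        compressed += comp.compress(data[start:start + kwargs['buffer_size']])

# Decompress: get_block() extracts one size-prefixed block at a time.
restored = b''
with lz4.stream.LZ4StreamDecompressor(**kwargs) as decomp:
    pos = 0
    while pos < len(compressed):
        block = decomp.get_block(compressed[pos:])
        restored += decomp.decompress(block)
        pos += kwargs['store_comp_size'] + len(block)

assert restored == data
```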
diff --git a/contrib/python/lz4/py3/tests/stream/test_stream_1.py b/contrib/python/lz4/py3/tests/stream/test_stream_1.py
new file mode 100644
index 0000000000..6b49267e26
--- /dev/null
+++ b/contrib/python/lz4/py3/tests/stream/test_stream_1.py
@@ -0,0 +1,555 @@
+import lz4.stream
+import pytest
+import sys
+import os
+
+
+if sys.version_info < (3, ):
+    from struct import pack, unpack
+
+    def _get_format(length, byteorder, signed):
+        _order = {'l': '<', 'b': '>'}
+        _fmt = {1: 'b', 2: 'h', 4: 'i', 8: 'q'}
+        _sign = {True: lambda x: x.lower(), False: lambda x: x.upper()}
+        return _sign[signed](_order[byteorder[0].lower()] + _fmt[length])
+
+    def int_to_bytes(value, length=4, byteorder='little', signed=False):
+        return bytearray(pack(_get_format(length, byteorder, signed), value))
+
+    def int_from_bytes(bytes, byteorder='little', signed=False):
+        return unpack(_get_format(len(bytes), byteorder, signed), bytes)[0]
+
+else:
+    def int_to_bytes(value, length=4, byteorder='little', signed=False):
+        return value.to_bytes(length, byteorder, signed=signed)
+
+    def int_from_bytes(bytes, byteorder='little', signed=False):
+        return int.from_bytes(bytes, byteorder, signed=signed)
+
+
+# This test requires allocating a big lump of memory. In order to
+# avoid a massive memory allocation during byte compilation, we have
+# to declare a variable for the size of the buffer we're going to
+# create outside the scope of the function below. See:
+# https://bugs.python.org/issue21074
+_4GB = 0x100000000  # 4GB
+
+
+def compress(x, c_kwargs, return_block_offset=False, check_block_type=False):
+    o = [0, ]
+    if c_kwargs.get('return_bytearray', False):
+        c = bytearray()
+    else:
+        c = bytes()
+    with lz4.stream.LZ4StreamCompressor(**c_kwargs) as proc:
+        for start in range(0, len(x), c_kwargs['buffer_size']):
+            chunk = x[start:start + c_kwargs['buffer_size']]
+            block = proc.compress(chunk)
+            c += block
+            if return_block_offset:
+                o.append(len(c))
+            if check_block_type:
+                assert isinstance(block, c.__class__)
+    if return_block_offset:
+        return c, o
+    else:
+        return c
+
+
+def decompress(x, d_kwargs, check_chunk_type=False):
+    if d_kwargs.get('return_bytearray', False):
+        d = bytearray()
+    else:
+        d = bytes()
+    with lz4.stream.LZ4StreamDecompressor(**d_kwargs) as proc:
+        start = 0
+        while start < len(x):
+            block = proc.get_block(x[start:])
+            chunk = proc.decompress(block)
+            d += chunk
+            start += d_kwargs['store_comp_size'] + len(block)
+            if check_chunk_type:
+                assert isinstance(chunk, d.__class__)
+    return d
+
+
+def test_invalid_config_c_1():
+    c_kwargs = {}
+    c_kwargs['strategy'] = "ring_buffer"
+    c_kwargs['buffer_size'] = 1024
+
+    with pytest.raises(NotImplementedError):
+        lz4.stream.LZ4StreamCompressor(**c_kwargs)
+
+
+def test_invalid_config_d_1():
+    d_kwargs = {}
+    d_kwargs['strategy'] = "ring_buffer"
+    d_kwargs['buffer_size'] = 1024
+
+    with pytest.raises(NotImplementedError):
+        lz4.stream.LZ4StreamDecompressor(**d_kwargs)
+
+
+def test_invalid_config_c_2():
+    c_kwargs = {}
+    c_kwargs['strategy'] = "foo"
+    c_kwargs['buffer_size'] = 1024
+
+    with pytest.raises(ValueError):
+        lz4.stream.LZ4StreamCompressor(**c_kwargs)
+
+
+def test_invalid_config_d_2():
+    d_kwargs = {}
+    d_kwargs['strategy'] = "foo"
+    d_kwargs['buffer_size'] = 1024
+
+    with pytest.raises(ValueError):
+        lz4.stream.LZ4StreamDecompressor(**d_kwargs)
+
+
+def test_invalid_config_c_3(store_comp_size):
+    c_kwargs = {}
+    c_kwargs['strategy'] = "double_buffer"
+    c_kwargs['buffer_size'] = 1024
+    c_kwargs['store_comp_size'] = store_comp_size['store_comp_size'] + 5
+
+    with pytest.raises(ValueError):
+        lz4.stream.LZ4StreamCompressor(**c_kwargs)
+
+
+def test_invalid_config_d_3(store_comp_size):
+    d_kwargs = {}
+    d_kwargs['strategy'] = "double_buffer"
+    d_kwargs['buffer_size'] = 1024
+    d_kwargs['store_comp_size'] = store_comp_size['store_comp_size'] + 5
+
+    with pytest.raises(ValueError):
+        lz4.stream.LZ4StreamDecompressor(**d_kwargs)
+
+
+def test_invalid_config_c_4(store_comp_size):
+    c_kwargs = {}
+    c_kwargs['strategy'] = "double_buffer"
+    c_kwargs['buffer_size'] = 1 << (8 * store_comp_size['store_comp_size'])
+    c_kwargs.update(store_comp_size)
+
+    if store_comp_size['store_comp_size'] >= 4:
+        # No need for skipping this test case, since the arguments check is
+        # expected to raise an error.
+
+        # Make sure the page size is larger than what the input bound will be,
+        # but still fits in 4 bytes
+        c_kwargs['buffer_size'] -= 1
+
+    if c_kwargs['buffer_size'] > lz4.stream.LZ4_MAX_INPUT_SIZE:
+        message = r"^Invalid buffer_size argument: \d+. Cannot define output buffer size. Must be lesser or equal to 2113929216$"  # noqa
+        err_class = ValueError
+    else:
+        message = r"^Inconsistent buffer_size/store_comp_size values. Maximal compressed length \(\d+\) cannot fit in a \d+ byte-long integer$"  # noqa
+        err_class = lz4.stream.LZ4StreamError
+
+    with pytest.raises(err_class, match=message):
+        lz4.stream.LZ4StreamCompressor(**c_kwargs)
+
+
+def test_invalid_config_d_4(store_comp_size):
+    d_kwargs = {}
+    d_kwargs['strategy'] = "double_buffer"
+    d_kwargs['buffer_size'] = 1 << (8 * store_comp_size['store_comp_size'])
+    d_kwargs.update(store_comp_size)
+
+    if store_comp_size['store_comp_size'] >= 4:
+
+        if sys.maxsize < 0xffffffff:
+            pytest.skip('Py_ssize_t too small for this test')
+
+        # Make sure the page size is larger than what the input bound will be,
+        # but still fits in 4 bytes
+        d_kwargs['buffer_size'] -= 1
+
+    # No failure expected during instantiation/initialization
+    lz4.stream.LZ4StreamDecompressor(**d_kwargs)
+
+
+def test_invalid_config_c_5():
+    c_kwargs = {}
+    c_kwargs['strategy'] = "double_buffer"
+    c_kwargs['buffer_size'] = lz4.stream.LZ4_MAX_INPUT_SIZE
+
+    if sys.maxsize < 0xffffffff:
+        pytest.skip('Py_ssize_t too small for this test')
+
+    # No failure expected
+    lz4.stream.LZ4StreamCompressor(**c_kwargs)
+
+    c_kwargs['buffer_size'] = lz4.stream.LZ4_MAX_INPUT_SIZE + 1
+    with pytest.raises(ValueError):
+        lz4.stream.LZ4StreamCompressor(**c_kwargs)
+
+    # Make sure the page size is larger than what the input bound will be,
+    # but still fits in 4 bytes
+    c_kwargs['buffer_size'] = _4GB - 1  # 4GB - 1 (to fit in 4 bytes)
+    with pytest.raises(ValueError):
+        lz4.stream.LZ4StreamCompressor(**c_kwargs)
+
+
+def test_invalid_config_d_5():
+    d_kwargs = {}
+    d_kwargs['strategy'] = "double_buffer"
+
+    # No failure expected during instantiation/initialization
+    d_kwargs['buffer_size'] = lz4.stream.LZ4_MAX_INPUT_SIZE
+
+    if sys.maxsize < 0xffffffff:
+        pytest.skip('Py_ssize_t too small for this test')
+
+    lz4.stream.LZ4StreamDecompressor(**d_kwargs)
+
+    # No failure expected during instantiation/initialization
+    d_kwargs['buffer_size'] = lz4.stream.LZ4_MAX_INPUT_SIZE + 1
+
+    if sys.maxsize < 0xffffffff:
+        pytest.skip('Py_ssize_t too small for this test')
+
+    lz4.stream.LZ4StreamDecompressor(**d_kwargs)
+
+    # No failure expected during instantiation/initialization
+    d_kwargs['buffer_size'] = _4GB - 1  # 4GB - 1 (to fit in 4 bytes)
+
+    if sys.maxsize < 0xffffffff:
+        pytest.skip('Py_ssize_t too small for this test')
+
+    lz4.stream.LZ4StreamDecompressor(**d_kwargs)
+
+
+def test_decompress_corrupted_input_1():
+    c_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+    d_kwargs = {}
+    d_kwargs.update(c_kwargs)
+
+    data = compress(b'A' * 512, c_kwargs)
+    decompress(data, d_kwargs)
+
+    message = r"^Requested input size \(\d+\) larger than source size \(\d+\)$"
+
+    with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+        decompress(data[4:], d_kwargs)
+
+
+def test_decompress_corrupted_input_2():
+    c_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+    d_kwargs = {}
+    d_kwargs.update(c_kwargs)
+
+    data = compress(b'A' * 512, c_kwargs)
+    decompress(data, d_kwargs)
+
+    message = r"^Decompression failed. error: \d+$"
+
+    # Block size corruption in the first block
+
+    # Block size longer than actual:
+    data = int_to_bytes(int_from_bytes(data[:4], 'little') + 1, 4, 'little') + data[4:]
+    with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+        decompress(data, d_kwargs)
+
+    # Block size shorter than actual:
+    data = int_to_bytes(int_from_bytes(data[:4], 'little') - 2, 4, 'little') + data[4:]
+    with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+        decompress(data, d_kwargs)
+
+
+def test_decompress_corrupted_input_3():
+    c_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+    d_kwargs = {}
+    d_kwargs.update(c_kwargs)
+
+    data = compress(b'A' * 512, c_kwargs)
+    decompress(data, d_kwargs)
+
+    message = r"^Decompression failed. error: \d+$"
+
+    # Block size corruption in a block in the middle of the stream
+    offset = 4 + int_from_bytes(data[:4], 'little')
+
+    # Block size longer than actual:
+    block_len = int_from_bytes(data[offset:offset + 4], 'little') + 1
+    data = data[:offset] + int_to_bytes(block_len, 4, 'little') + data[offset + 4:]
+
+    with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+        decompress(data, d_kwargs)
+
+    # Block size shorter than actual:
+    block_len = int_from_bytes(data[offset:offset + 4], 'little') - 2
+    data = data[:offset] + int_to_bytes(block_len, 4, 'little') + data[offset + 4:]
+
+    with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+        decompress(data, d_kwargs)
+
+
+def test_decompress_corrupted_input_4():
+    c_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+    d_kwargs = {}
+    d_kwargs.update(c_kwargs)
+
+    data = compress(b'A' * 256, c_kwargs)
+    decompress(data, d_kwargs)
+
+    # Block size corruption in the last block of the stream
+    offset = 4 + int_from_bytes(data[:4], 'little')
+
+    # Block size longer than actual:
+    block_len = int_from_bytes(data[offset:offset + 4], 'little') + 1
+    data = data[:offset] + int_to_bytes(block_len, 4, 'little') + data[offset + 4:]
+
+    message = r"^Requested input size \(\d+\) larger than source size \(\d+\)$"
+
+    with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+        decompress(data, d_kwargs)
+
+    # Block size shorter than actual:
+    block_len = int_from_bytes(data[offset:offset + 4], 'little') - 2
+    data = data[:offset] + int_to_bytes(block_len, 4, 'little') + data[offset + 4:]
+
+    message = r"^Decompression failed. error: \d+$"
+
+    with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+        decompress(data, d_kwargs)
+
+
+def test_decompress_truncated():
+    c_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+    d_kwargs = {}
+    d_kwargs.update(c_kwargs)
+
+    input_data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
+    compressed, block_offsets = compress(input_data, c_kwargs, return_block_offset=True)
+
+    last_block_offset = 0
+    for n in range(len(compressed)):
+        if n in block_offsets:
+            # end of input matches end of block, so decompression must succeed
+            last_block_offset = n
+            decompress(compressed[:n], d_kwargs)
+
+        else:
+            # end of input does not match end of block, so decompression
+            # failure is expected
+            if n - last_block_offset < c_kwargs['store_comp_size']:
+                message = "^Invalid source, too small for holding any block$"
+            else:
+                message = r"^Requested input size \(\d+\) larger than source size \(\d+\)$"
+
+            with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+                decompress(compressed[:n], d_kwargs)
+
+
+# This next test is probably redundant given test_decompress_truncated above
+# since the trailing bytes will be considered as the truncated last block, but
+# we will keep it for now
+
+
+def test_decompress_with_trailer():
+    c_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+    d_kwargs = {}
+    d_kwargs.update(c_kwargs)
+
+    data = b'A' * 64
+    comp = compress(data, c_kwargs)
+
+    message = "^Invalid source, too small for holding any block$"
+    with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+        decompress(comp + b'A', d_kwargs)
+
+    message = r"^Requested input size \(\d+\) larger than source size \(\d+\)$"
+    with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+        decompress(comp + b'A' * 10, d_kwargs)
+
+    for n in range(1, 10):
+        if n < d_kwargs['store_comp_size']:
+            message = "^Invalid source, too small for holding any block$"
+        else:
+            message = r"^Decompression failed. error: \d+$"
+        with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+            decompress(comp + b'\x00' * n, d_kwargs)
+
+
+def test_unicode():
+    if sys.version_info < (3,):
+        return  # skip
+
+    c_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+    d_kwargs = {}
+    d_kwargs.update(c_kwargs)
+
+    DATA = b'x'
+    with pytest.raises(TypeError):
+        compress(DATA.decode('latin1'), c_kwargs)
+        decompress(compress(DATA, c_kwargs).decode('latin1'), d_kwargs)
+
+
+# These next two are probably redundant given test_1 above but we'll keep them
+# for now
+
+
+def test_return_bytearray():
+    if sys.version_info < (3,):
+        return  # skip
+
+    c_kwargs_r = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+    c_kwargs = {'return_bytearray': True}
+    c_kwargs.update(c_kwargs_r)
+
+    d_kwargs = {}
+    d_kwargs.update(c_kwargs)
+
+    data = os.urandom(128 * 1024)  # Read 128kb
+    compressed = compress(data, c_kwargs_r, check_block_type=True)
+    b = compress(data, c_kwargs, check_block_type=True)
+    assert isinstance(b, bytearray)
+    assert bytes(b) == compressed
+    b = decompress(compressed, d_kwargs, check_chunk_type=True)
+    assert isinstance(b, bytearray)
+    assert bytes(b) == data
+
+
+def test_memoryview():
+    if sys.version_info < (2, 7):
+        return  # skip
+
+    c_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+    d_kwargs = {}
+    d_kwargs.update(c_kwargs)
+
+    data = os.urandom(128 * 1024)  # Read 128kb
+    compressed = compress(data, c_kwargs)
+    assert compress(memoryview(data), c_kwargs) == compressed
+    assert decompress(memoryview(compressed), d_kwargs) == data
+
+
+def test_with_dict_none():
+    kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+    input_data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
+    for mode in ['default', 'high_compression']:
+        c_kwargs = {'mode': mode, 'dictionary': None}
+        c_kwargs.update(kwargs)
+        d_kwargs = {}
+        d_kwargs.update(kwargs)
+        assert decompress(compress(input_data, c_kwargs), d_kwargs) == input_data
+
+        c_kwargs = {'mode': mode}
+        c_kwargs.update(kwargs)
+        d_kwargs = {'dictionary': None}
+        d_kwargs.update(kwargs)
+        assert decompress(compress(input_data, c_kwargs), d_kwargs) == input_data
+
+        c_kwargs = {'mode': mode, 'dictionary': b''}
+        c_kwargs.update(kwargs)
+        d_kwargs = {}
+        d_kwargs.update(kwargs)
+        assert decompress(compress(input_data, c_kwargs), d_kwargs) == input_data
+
+        c_kwargs = {'mode': mode}
+        c_kwargs.update(kwargs)
+        d_kwargs = {'dictionary': b''}
+        d_kwargs.update(kwargs)
+        assert decompress(compress(input_data, c_kwargs), d_kwargs) == input_data
+
+        c_kwargs = {'mode': mode, 'dictionary': ''}
+        c_kwargs.update(kwargs)
+        d_kwargs = {}
+        d_kwargs.update(kwargs)
+        assert decompress(compress(input_data, c_kwargs), d_kwargs) == input_data
+
+        c_kwargs = {'mode': mode}
+        c_kwargs.update(kwargs)
+        d_kwargs = {'dictionary': ''}
+        d_kwargs.update(kwargs)
+        assert decompress(compress(input_data, c_kwargs), d_kwargs) == input_data
+
+
+def test_with_dict():
+    kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+    input_data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
+    dict1 = input_data[10:30]
+    dict2 = input_data[20:40]
+    message = r"^Decompression failed. error: \d+$"
+
+    for mode in ['default', 'high_compression']:
+        c_kwargs = {'mode': mode, 'dictionary': dict1}
+        c_kwargs.update(kwargs)
+        compressed = compress(input_data, c_kwargs)
+
+        d_kwargs = {}
+        d_kwargs.update(kwargs)
+        with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+            decompress(compressed, d_kwargs)
+
+        d_kwargs = {'dictionary': dict1[:2]}
+        d_kwargs.update(kwargs)
+        with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+            decompress(compressed, d_kwargs)
+
+        d_kwargs = {'dictionary': dict2}
+        d_kwargs.update(kwargs)
+        assert decompress(compressed, d_kwargs) != input_data
+
+        d_kwargs = {'dictionary': dict1}
+        d_kwargs.update(kwargs)
+        assert decompress(compressed, d_kwargs) == input_data
+
+    c_kwargs = {}
+    c_kwargs.update(kwargs)
+    d_kwargs = {'dictionary': dict1}
+    d_kwargs.update(kwargs)
+    assert decompress(compress(input_data, c_kwargs), d_kwargs) == input_data
+
+
+def test_known_decompress_1():
+    d_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+    output = b''
+
+    input = b'\x00\x00\x00\x00'
+    message = "^Decompression failed. error: 1$"
+    with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+        decompress(input, d_kwargs)
+
+    input = b'\x01\x00\x00\x00\x00'
+    assert decompress(input, d_kwargs) == output
+
+
+def test_known_decompress_2():
+    d_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+    input = b'\x02\x00\x00\x00\x10 '
+    output = b' '
+    assert decompress(input, d_kwargs) == output
+
+
+def test_known_decompress_3():
+    d_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+    # uncompressed data size smaller than buffer_size
+    input = b'%\x00\x00\x00\xff\x0bLorem ipsum dolor sit amet\x1a\x006P amet'
+    output = b'Lorem ipsum dolor sit amet' * 4
+    assert decompress(input, d_kwargs) == output
+
+
+def test_known_decompress_4():
+    d_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
+
+    input = b'%\x00\x00\x00\xff\x0bLorem ipsum dolor sit amet\x1a\x00NPit am\n\x00\x00\x00\x0fh\x00hP sit \x05\x00\x00\x00@amet'
+    output = b'Lorem ipsum dolor sit amet' * 10
+    assert decompress(input, d_kwargs) == output
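The corruption tests above work by rewriting the little-endian length prefix that precedes each block, which desynchronizes the stream walk and surfaces as LZ4StreamError. A hedged sketch of that in-band framing, mirroring what get_block() does internally (the helper name is hypothetical, not part of the library):

```python
def split_blocks(stream, store_comp_size=4):
    """Yield the raw compressed blocks from a size-prefixed stream."""
    pos = 0
    while pos < len(stream):
        # Each block is preceded by its compressed length, little-endian
        n = int.from_bytes(stream[pos:pos + store_comp_size], 'little')
        pos += store_comp_size
        yield stream[pos:pos + n]
        pos += n
```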
diff --git a/contrib/python/lz4/py3/tests/stream/test_stream_2.py b/contrib/python/lz4/py3/tests/stream/test_stream_2.py
new file mode 100644
index 0000000000..5578f832c4
--- /dev/null
+++ b/contrib/python/lz4/py3/tests/stream/test_stream_2.py
@@ -0,0 +1,152 @@
+import pytest
+import sys
+import lz4.stream
+import psutil
+import os
+
+
+# This test requires allocating a big lump of memory. In order to
+# avoid a massive memory allocation during byte compilation, we have
+# to declare a variable for the size of the buffer we're going to
+# create outside the scope of the function below. See:
+# https://bugs.python.org/issue21074
+
+_4GB = 0xffffffff  # actually 4GB - 1B, the maximum size on 4 bytes.
+
+# This test will be killed on Travis due to the 3GB memory limit
+# there. Unfortunately psutil reports the host memory, not the memory
+# available to the container, and so can't be used to detect available
+# memory, so instead, as an ugly hack for detecting we're on Travis we
+# check for the TRAVIS environment variable being set. This is quite
+# fragile.
+
+if os.environ.get('TRAVIS') is not None or sys.maxsize < _4GB or \
+        psutil.virtual_memory().available < _4GB:
+    huge = None
+else:
+    try:
+        huge = b'\0' * _4GB
+    except (MemoryError, OverflowError):
+        huge = None
+
+
+@pytest.mark.skipif(
+    os.environ.get('TRAVIS') is not None,
+    reason='Skipping test on Travis due to insufficient memory'
+)
+@pytest.mark.skipif(
+    sys.maxsize < _4GB,
+    reason='Py_ssize_t too small for this test'
+)
+@pytest.mark.skipif(
+    psutil.virtual_memory().available < _4GB or huge is None,
+    reason='Insufficient system memory for this test'
+)
+def test_huge_1():
+    data = b''
+    kwargs = {
+        'strategy': "double_buffer",
+        'buffer_size': lz4.stream.LZ4_MAX_INPUT_SIZE,
+        'store_comp_size': 4,
+        'dictionary': huge,
+    }
+
+    if psutil.virtual_memory().available < 3 * kwargs['buffer_size']:
+        # The internal LZ4 context will request at least 3 times buffer_size
+        # as memory (2 buffer_size for the double-buffer, and 1.x buffer_size
+        # for the output buffer)
+        pytest.skip('Insufficient system memory for this test')
+
+    # Triggering overflow error
+    message = r'^Dictionary too large for LZ4 API$'
+
+    with pytest.raises(OverflowError, match=message):
+        with lz4.stream.LZ4StreamCompressor(**kwargs) as proc:
+            proc.compress(data)
+
+    with pytest.raises(OverflowError, match=message):
+        with lz4.stream.LZ4StreamDecompressor(**kwargs) as proc:
+            proc.decompress(data)
+
+
+@pytest.mark.skipif(
+    os.environ.get('TRAVIS') is not None,
+    reason='Skipping test on Travis due to insufficient memory'
+)
+@pytest.mark.skipif(
+    sys.maxsize < 0xffffffff,
+    reason='Py_ssize_t too small for this test'
+)
+@pytest.mark.skipif(
+    psutil.virtual_memory().available < _4GB or huge is None,
+    reason='Insufficient system memory for this test'
+)
+def test_huge_2():
+    data = huge
+    kwargs = {
+        'strategy': "double_buffer",
+        'buffer_size': lz4.stream.LZ4_MAX_INPUT_SIZE,
+        'store_comp_size': 4,
+        'dictionary': b'',
+    }
+
+    if psutil.virtual_memory().available < 3 * kwargs['buffer_size']:
+        # The internal LZ4 context will request at least 3 times buffer_size
+        # as memory (2 buffer_size for the double-buffer, and 1.x buffer_size
+        # for the output buffer)
+        pytest.skip('Insufficient system memory for this test')
+
+    # Raising overflow error
+    message = r'^Input too large for LZ4 API$'
+
+    with pytest.raises(OverflowError, match=message):
+        with lz4.stream.LZ4StreamCompressor(**kwargs) as proc:
+            proc.compress(data)
+
+    # On decompression, too large input will raise LZ4StreamError
+    with pytest.raises(lz4.stream.LZ4StreamError):
+        with lz4.stream.LZ4StreamDecompressor(**kwargs) as proc:
+            proc.decompress(data)
+
+
+@pytest.mark.skipif(
+    os.environ.get('TRAVIS') is not None,
+    reason='Skipping test on Travis due to insufficient memory'
+)
+@pytest.mark.skipif(
+    sys.maxsize < 0xffffffff,
+    reason='Py_ssize_t too small for this test'
+)
+@pytest.mark.skipif(
+    psutil.virtual_memory().available < _4GB or huge is None,
+    reason='Insufficient system memory for this test'
+)
+def test_huge_3():
+    data = huge
+    kwargs = {
+        'strategy': "double_buffer",
+        'buffer_size': lz4.stream.LZ4_MAX_INPUT_SIZE,
+        'store_comp_size': 4,
+        'dictionary': huge,
+    }
+
+    if psutil.virtual_memory().available < 3 * kwargs['buffer_size']:
+        # The internal LZ4 context will request at least 3 times buffer_size
+        # as memory (2 buffer_size for the double-buffer, and 1.x buffer_size
+        # for the output buffer)
+        pytest.skip('Insufficient system memory for this test')
+
+    # Raising overflow error (during initialization, because of the
+    # dictionary parameter)
+    message = r'^Dictionary too large for LZ4 API$'
+
+    with pytest.raises(OverflowError, match=message):
+        with lz4.stream.LZ4StreamCompressor(**kwargs) as proc:
+            proc.compress(data)
+
+    with pytest.raises(OverflowError, match=message):
+        with lz4.stream.LZ4StreamDecompressor(**kwargs) as proc:
+            proc.decompress(data)
+
+
+def test_dummy():
+    pass
diff --git a/contrib/python/lz4/py3/tests/stream/test_stream_3.py b/contrib/python/lz4/py3/tests/stream/test_stream_3.py
new file mode 100644
index 0000000000..2b52d6b549
--- /dev/null
+++ b/contrib/python/lz4/py3/tests/stream/test_stream_3.py
@@ -0,0 +1,123 @@
+import lz4.stream
+import pytest
+import sys
+
+
+_1KB = 1024
+_1MB = _1KB * 1024
+_1GB = _1MB * 1024
+
+
+def compress(x, c_kwargs):
+    c = []
+    with lz4.stream.LZ4StreamCompressor(**c_kwargs) as proc:
+        for start in range(0, len(x), c_kwargs['buffer_size']):
+            chunk = x[start:start + c_kwargs['buffer_size']]
+            block = proc.compress(chunk)
+            c.append(block)
+    if c_kwargs.get('return_bytearray', False):
+        return bytearray().join(c)
+    else:
+        return bytes().join(c)
+
+
+def decompress(x, d_kwargs):
+    d = []
+    with lz4.stream.LZ4StreamDecompressor(**d_kwargs) as proc:
+        start = 0
+        while start < len(x):
+            block = proc.get_block(x[start:])
+            chunk = proc.decompress(block)
+            d.append(chunk)
+            start += d_kwargs['store_comp_size'] + len(block)
+    if d_kwargs.get('return_bytearray', False):
+        return bytearray().join(d)
+    else:
+        return bytes().join(d)
+
+
+test_buffer_size = sorted(
+    [256,
+     1 * _1KB,
+     64 * _1KB,
+     1 * _1MB,
+     1 * _1GB,
+     lz4.stream.LZ4_MAX_INPUT_SIZE]
+)
+
+
+@pytest.fixture(
+    params=test_buffer_size,
+    ids=[
+        'buffer_size' + str(i) for i in range(len(test_buffer_size))
+    ]
+)
+def buffer_size(request):
+    return request.param
+
+
+test_data = [
+    (b'a' * _1MB),
+]
+
+
+@pytest.fixture(
+    params=test_data,
+    ids=[
+        'data' + str(i) for i in range(len(test_data))
+    ]
+)
+def data(request):
+    return request.param
+
+
+def test_block_decompress_mem_usage(data, buffer_size):
+    kwargs = {
+        'strategy': "double_buffer",
+        'buffer_size': buffer_size,
+        'store_comp_size': 4,
+    }
+
+    if sys.maxsize < 0xffffffff:
+        pytest.skip('Py_ssize_t too small for this test')
+
+    tracemalloc = pytest.importorskip('tracemalloc')
+
+    # Trace memory usage on compression
+    tracemalloc.start()
+    prev_snapshot = None
+
+    for i in range(1000):
+        compressed = compress(data, kwargs)
+
+        if i % 100 == 0:
+            snapshot = tracemalloc.take_snapshot()
+
+            if prev_snapshot:
+                # Filter on lz4.stream module's allocations
+                stats = [x for x in snapshot.compare_to(prev_snapshot, 'lineno')
+                         if lz4.stream.__file__ in x.traceback._frames[0][0]]
+                assert sum(map(lambda x: x.size_diff, stats)) < (1024 * 4)
+
+            prev_snapshot = snapshot
+
+    tracemalloc.stop()
+
+    tracemalloc.start()
+    prev_snapshot = None
+
+    for i in range(1000):
+        decompressed = decompress(compressed, kwargs)  # noqa: F841
+
+        if i % 100 == 0:
+            snapshot = tracemalloc.take_snapshot()
+
+            if prev_snapshot:
+                # Filter on lz4.stream module's allocations
+                stats = [x for x in snapshot.compare_to(prev_snapshot, 'lineno')
+                         if lz4.stream.__file__ in x.traceback._frames[0][0]]
+                assert sum(map(lambda x: x.size_diff, stats)) < (1024 * 4)
+
+            prev_snapshot = snapshot
+
+    tracemalloc.stop()
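test_stream_3 detects leaks by comparing tracemalloc snapshots across many iterations of the same workload. The general pattern, with a trivial stand-in workload (the function below is illustrative, not part of the commit), looks like this:

```python
import tracemalloc


def workload():
    return bytes(1024)  # stand-in for the compress()/decompress() loop


tracemalloc.start()
before = tracemalloc.take_snapshot()
for _ in range(1000):
    workload()
after = tracemalloc.take_snapshot()
# Tolerate a little allocator noise, as the test does (< 4 KiB drift)
growth = sum(stat.size_diff for stat in after.compare_to(before, 'lineno'))
assert growth < 4 * 1024
tracemalloc.stop()
```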
diff --git a/contrib/python/lz4/py3/tests/stream/test_stream_4.py b/contrib/python/lz4/py3/tests/stream/test_stream_4.py
new file mode 100644
index 0000000000..3d139a02ef
--- /dev/null
+++ b/contrib/python/lz4/py3/tests/stream/test_stream_4.py
@@ -0,0 +1,139 @@
+import lz4.stream
+import pytest
+import sys
+
+
+if sys.version_info < (3, ):
+    from struct import pack, unpack
+
+    def _get_format(length, byteorder, signed):
+        _order = {'l': '<', 'b': '>'}
+        _fmt = {1: 'b', 2: 'h', 4: 'i', 8: 'q'}
+        _sign = {True: lambda x: x.lower(), False: lambda x: x.upper()}
+        return _sign[signed](_order[byteorder[0].lower()] + _fmt[length])
+
+    def int_to_bytes(value, length=4, byteorder='little', signed=False):
+        return bytearray(pack(_get_format(length, byteorder, signed), value))
+
+    def int_from_bytes(bytes, byteorder='little', signed=False):
+        return unpack(_get_format(len(bytes), byteorder, signed), bytes)[0]
+
+else:
+    def int_to_bytes(value, length=4, byteorder='little', signed=False):
+        return value.to_bytes(length, byteorder, signed=signed)
+
+    def int_from_bytes(bytes, byteorder='little', signed=False):
+        return int.from_bytes(bytes, byteorder, signed=signed)
+
+
+# Out-of-band block size record tests
+
+
+def test_round_trip():
+    data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
+    kwargs = {'strategy': "double_buffer", 'buffer_size': 256, 'store_comp_size': 4}
+
+    oob_kwargs = {}
+    oob_kwargs.update(kwargs)
+    oob_kwargs['store_comp_size'] = 0
+
+    ib_cstream = bytearray()
+    oob_cstream = bytearray()
+    oob_sizes = []
+
+    with lz4.stream.LZ4StreamCompressor(**kwargs) as ib_proc, \
+            lz4.stream.LZ4StreamCompressor(**oob_kwargs) as oob_proc:
+        for start in range(0, len(data), kwargs['buffer_size']):
+            chunk = data[start:start + kwargs['buffer_size']]
+            ib_block = ib_proc.compress(chunk)
+            oob_block = oob_proc.compress(chunk)
+
+            assert (len(ib_block) == (len(oob_block) + kwargs['store_comp_size'])), \
+                "Blocks size mismatch: " \
+                "{}/{}".format(len(ib_block), len(oob_block) + kwargs['store_comp_size'])
+
+            assert (int_from_bytes(ib_block[:kwargs['store_comp_size']]) == len(oob_block)), \
+                "Blocks size record mismatch: got {}, expected {}".format(
+                    int_from_bytes(ib_block[:kwargs['store_comp_size']]),
+                    len(oob_block))
+
+            assert (ib_block[kwargs['store_comp_size']:] == oob_block), "Blocks data mismatch"
+
+            ib_cstream += ib_block
+            oob_cstream += oob_block
+            oob_sizes.append(len(oob_block))
+
+    ib_dstream = bytearray()
+    oob_dstream = bytearray()
+
+    with lz4.stream.LZ4StreamDecompressor(**kwargs) as ib_proc, \
+            lz4.stream.LZ4StreamDecompressor(**oob_kwargs) as oob_proc:
+        ib_offset = 0
+        oob_index = 0
+        oob_offset = 0
+        while ib_offset < len(ib_cstream) and oob_index < len(oob_sizes):
+            ib_block = ib_proc.get_block(ib_cstream[ib_offset:])
+            oob_block = oob_cstream[oob_offset:oob_offset + oob_sizes[oob_index]]
+
+            assert (len(ib_block) == len(oob_block)), \
+                "Blocks size mismatch: {}/{}".format(len(ib_block), len(oob_block))
+
+            assert (ib_block == oob_block), "Blocks data mismatch"
+
+            ib_chunk = ib_proc.decompress(ib_block)
+            oob_chunk = oob_proc.decompress(oob_block)
+
+            assert (len(ib_chunk) == len(oob_chunk)), \
+                "Chunks size mismatch: {}/{}".format(len(ib_chunk), len(oob_chunk))
+
+            assert (ib_chunk == oob_chunk), "Chunks data mismatch"
+
+            ib_dstream += ib_chunk
+            oob_dstream += oob_chunk
+
+            ib_offset += kwargs['store_comp_size'] + len(ib_block)
+            oob_offset += oob_sizes[oob_index]
+            oob_index += 1
+
+    assert (len(ib_dstream) == len(oob_dstream)), "Decompressed streams length mismatch"
+
+    assert (len(data) == len(ib_dstream)), "Decompressed streams length mismatch"
+
+    assert (len(data) == len(oob_dstream)), "Decompressed streams length mismatch"
+
+    assert (ib_dstream == oob_dstream), "Decompressed streams mismatch"
+
+    assert (data == ib_dstream), "Decompressed streams mismatch"
+
+    assert (data == oob_dstream), "Decompressed streams mismatch"
+
+
+def test_invalid_usage():
+    data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
+    kwargs = {'strategy': "double_buffer", 'buffer_size': 256, 'store_comp_size': 0}
+
+    cstream = bytearray()
+    oob_sizes = []
+
+    with lz4.stream.LZ4StreamCompressor(**kwargs) as proc:
+        for start in range(0, len(data), kwargs['buffer_size']):
+            chunk = data[start:start + kwargs['buffer_size']]
+            block = proc.compress(chunk)
+            cstream += block
+            oob_sizes.append(len(block))
+
+    message = r"^LZ4 context is configured for storing block size out-of-band$"
+
+    with pytest.raises(lz4.stream.LZ4StreamError, match=message):
+        dstream = bytearray()
+
+        with lz4.stream.LZ4StreamDecompressor(**kwargs) as proc:
+            offset = 0
+            index = 0
+            while offset < len(cstream):
+                block = proc.get_block(cstream[offset:])
+                chunk = proc.decompress(block)
+
+                dstream += chunk
+
+                offset += kwargs['store_comp_size'] + len(block)
+                index += 1