1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
import lz4.frame as lz4frame
import os
import sys
import pytest
from .helpers import get_frame_info_check
# Payloads fed to every round-trip test below: empty and 8 KiB inputs as
# bytes and bytearray, one highly repetitive 8 KiB buffer, and larger random
# buffers of 128, 256 and 512 KiB.
test_data = [
    b'',
    os.urandom(8 * 1024),
    b'0' * 8 * 1024,
    bytearray(b''),
    bytearray(os.urandom(8 * 1024)),
    os.urandom(128 * 1024),
    os.urandom(256 * 1024),
    os.urandom(512 * 1024),
]

# memoryview inputs are only added on interpreters newer than the (2, 7)
# version tuple.
if sys.version_info > (2, 7):
    test_data.extend([
        memoryview(b''),
        memoryview(os.urandom(8 * 1024)),
    ])
@pytest.fixture(
    params=test_data,
    # One id per payload: 'data0', 'data1', ... in list order.
    ids=['data{}'.format(position) for position, _ in enumerate(test_data)],
)
def data(request):
    """Yield each entry of ``test_data`` in turn as the ``data`` fixture."""
    payload = request.param
    return payload
def test_roundtrip_1(
        data,
        block_size,
        block_linked,
        content_checksum,
        block_checksum,
        compression_level,
        store_size):
    """Round-trip ``data`` through the one-shot ``lz4frame.compress`` call.

    The frame is built with a single ``compress`` call, its frame info is
    checked with ``get_frame_info_check``, and it is then decompressed. The
    test passes when decompression consumed the whole frame and returned the
    original payload.

    All arguments other than ``data`` are presumably pytest fixtures defined
    outside this file (e.g. in a conftest) -- confirm against the test suite.
    """
    options = {
        'store_size': store_size,
        'compression_level': compression_level,
        'block_size': block_size,
        'block_linked': block_linked,
        'content_checksum': content_checksum,
        'block_checksum': block_checksum,
    }
    frame = lz4frame.compress(data, **options)

    # Check the frame info against the settings used to build the frame.
    get_frame_info_check(
        frame,
        len(data),
        store_size,
        block_size,
        block_linked,
        content_checksum,
        block_checksum,
    )

    result = lz4frame.decompress(frame, return_bytes_read=True)
    decompressed, bytes_read = result

    # The whole frame must be consumed and the payload must survive intact.
    assert bytes_read == len(frame)
    assert decompressed == data
def test_roundtrip_2(data,
                     block_size,
                     block_linked,
                     content_checksum,
                     block_checksum,
                     compression_level,
                     auto_flush,
                     store_size):
    """Round-trip ``data`` through the begin / chunk / flush API.

    The frame is assembled from the outputs of ``compress_begin``,
    ``compress_chunk`` and ``compress_flush`` on one compression context,
    its frame info is checked with ``get_frame_info_check``, and it is then
    decompressed. The test passes when decompression consumed the whole
    frame and returned the original payload.

    All arguments other than ``data`` are presumably pytest fixtures defined
    outside this file (e.g. in a conftest) -- confirm against the test suite.
    """
    context = lz4frame.create_compression_context()

    begin_options = {
        'compression_level': compression_level,
        'block_size': block_size,
        'block_linked': block_linked,
        'content_checksum': content_checksum,
        'block_checksum': block_checksum,
        'auto_flush': auto_flush,
    }
    # The source size is only passed when store_size is exactly True.
    if store_size is True:
        begin_options['source_size'] = len(data)

    # Call order matters: begin, then the single chunk, then flush.
    begin_bytes = lz4frame.compress_begin(context, **begin_options)
    chunk_bytes = lz4frame.compress_chunk(context, data)
    flush_bytes = lz4frame.compress_flush(context)
    frame = begin_bytes + chunk_bytes + flush_bytes

    # Check the frame info against the settings used to build the frame.
    get_frame_info_check(
        frame,
        len(data),
        store_size,
        block_size,
        block_linked,
        content_checksum,
        block_checksum,
    )

    result = lz4frame.decompress(frame, return_bytes_read=True)
    decompressed, bytes_read = result

    # The whole frame must be consumed and the payload must survive intact.
    assert bytes_read == len(frame)
    assert decompressed == data
|