1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
import lz4.frame as lz4frame
import pytest
import os
import sys
from . helpers import (
get_chunked,
get_frame_info_check,
)
# Each entry is (payload, c_chunks, d_chunks): the payload to round-trip, the
# number of pieces it is split into for compression, and the number of pieces
# the compressed frame is split into for decompression.
test_data = [
    (b'', 1, 1),
    # Random payloads: chunked compression, then chunked decompression.
    (os.urandom(8 * 1024), 8, 1),
    (os.urandom(8 * 1024), 1, 8),
    # Repetitive payloads: same two chunking patterns as the random ones.
    # (The second entry used to repeat (8, 1), so chunked decompression of
    # this payload was never exercised.)
    (b'0' * 8 * 1024, 8, 1),
    (b'0' * 8 * 1024, 1, 8),
    # Non-bytes buffer types.
    (bytearray(b''), 1, 1),
    (bytearray(os.urandom(8 * 1024)), 8, 1),
]

# memoryview payloads are only added when the interpreter version compares
# greater than (2, 7).
if sys.version_info > (2, 7):
    test_data += [
        (memoryview(b''), 1, 1),
        (memoryview(os.urandom(8 * 1024)), 8, 1),
    ]
@pytest.fixture(
    params=test_data,
    ids=['data%d' % index for index, _ in enumerate(test_data)],
)
def data(request):
    """Yield one ``test_data`` entry per parametrized run.

    The ids are ``data0``, ``data1``, ... following the order of
    ``test_data``.
    """
    return request.param
def test_roundtrip_chunked(data, block_size, block_linked,
                           content_checksum, block_checksum,
                           compression_level,
                           auto_flush, store_size):
    """Round-trip a payload through the chunked lz4 frame API.

    ``data`` is a ``(payload, c_chunks, d_chunks)`` tuple: the payload is
    compressed in ``c_chunks`` pieces and the resulting frame is
    decompressed in ``d_chunks`` pieces.  The remaining arguments are passed
    to ``compress_begin`` as frame options; ``store_size`` controls whether
    the payload length is passed as ``source_size``.
    NOTE(review): these arguments are presumably fixtures defined outside
    this module -- confirm against conftest.
    """
    data, c_chunks, d_chunks = data

    kwargs = {
        'compression_level': compression_level,
        'block_size': block_size,
        'block_linked': block_linked,
        'content_checksum': content_checksum,
        'block_checksum': block_checksum,
        'auto_flush': auto_flush,
    }
    if store_size is True:
        kwargs['source_size'] = len(data)

    # Compress: frame header, then every chunk, then the final flush.
    c_context = lz4frame.create_compression_context()
    frame_parts = [lz4frame.compress_begin(c_context, **kwargs)]
    for chunk in get_chunked(data, c_chunks):
        frame_parts.append(lz4frame.compress_chunk(c_context, chunk))
    frame_parts.append(lz4frame.compress_flush(c_context))
    compressed = b''.join(frame_parts)

    get_frame_info_check(
        compressed,
        len(data),
        store_size,
        block_size,
        block_linked,
        content_checksum,
        block_checksum,
    )

    # Decompress chunk by chunk, recording what each call reports.
    d_context = lz4frame.create_decompression_context()
    output_parts = []
    bytes_read = 0
    eofs = []
    for chunk in get_chunked(compressed, d_chunks):
        d, b, e = lz4frame.decompress_chunk(d_context, chunk)
        output_parts.append(d)
        bytes_read += b
        eofs.append(e)
    decompressed = b''.join(output_parts)

    assert bytes_read == len(compressed)
    assert decompressed == data
    # End-of-frame must be reported on the final chunk and on no earlier
    # chunk (the previous [:-2] slice left the penultimate chunk unchecked).
    assert eofs[-1] is True
    assert True not in eofs[:-1]
|