blob: 583f3fbb0577d5b3437b38995123ea8422168576 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
import lz4.frame as lz4frame
import pytest
import os
test_data = [
(os.urandom(32) * 256),
]
@pytest.fixture(
params=test_data,
ids=[
'data' + str(i) for i in range(len(test_data))
]
)
def data(request):
return request.param
def test_roundtrip_multiframe_1(data):
nframes = 4
compressed = b''
for _ in range(nframes):
compressed += lz4frame.compress(data)
decompressed = b''
for _ in range(nframes):
decompressed += lz4frame.decompress(compressed)
assert len(decompressed) == nframes * len(data)
assert data * nframes == decompressed
def test_roundtrip_multiframe_2(data):
nframes = 4
compressed = b''
ctx = lz4frame.create_compression_context()
for _ in range(nframes):
compressed += lz4frame.compress_begin(ctx)
compressed += lz4frame.compress_chunk(ctx, data)
compressed += lz4frame.compress_flush(ctx)
decompressed = b''
for _ in range(nframes):
decompressed += lz4frame.decompress(compressed)
assert len(decompressed) == nframes * len(data)
assert data * nframes == decompressed
def test_roundtrip_multiframe_3(data):
nframes = 4
compressed = b''
ctx = lz4frame.create_compression_context()
for _ in range(nframes):
compressed += lz4frame.compress_begin(ctx)
compressed += lz4frame.compress_chunk(ctx, data)
compressed += lz4frame.compress_flush(ctx)
decompressed = b''
ctx = lz4frame.create_decompression_context()
for _ in range(nframes):
d, bytes_read, eof = lz4frame.decompress_chunk(ctx, compressed)
decompressed += d
assert eof is True
assert bytes_read == len(compressed) // nframes
assert len(decompressed) == nframes * len(data)
assert data * nframes == decompressed
def test_roundtrip_multiframe_4(data):
nframes = 4
compressed = b''
with lz4frame.LZ4FrameCompressor() as compressor:
for _ in range(nframes):
compressed += compressor.begin()
compressed += compressor.compress(data)
compressed += compressor.flush()
decompressed = b''
with lz4frame.LZ4FrameDecompressor() as decompressor:
for i in range(nframes):
if i == 0:
d = compressed
else:
d = decompressor.unused_data
decompressed += decompressor.decompress(d)
assert decompressor.eof is True
assert decompressor.needs_input is True
if i == nframes - 1:
assert decompressor.unused_data is None
else:
assert len(decompressor.unused_data) == len(
compressed) * (nframes - i - 1) / nframes
assert len(decompressed) == nframes * len(data)
assert data * nframes == decompressed
|