1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
import lz4.stream
import pytest
import sys
if sys.version_info < (3, ):
    # Python 2: int has no to_bytes()/from_bytes(), so both helpers are
    # emulated with the struct module.
    from struct import pack, unpack

    def _get_format(length, byteorder, signed):
        """Return the struct format string for an integer.

        :param length: integer width in bytes; only 1, 2, 4 and 8 map to a
            struct type code (any other value raises KeyError).
        :param byteorder: 'little' or 'big'; only the first letter is
            looked at, case-insensitively.
        :param signed: True for a signed integer, False for unsigned.
        :return: a two-character format such as '<I' or '>q'.
        """
        _order = {'l': '<', 'b': '>'}
        _fmt = {1: 'b', 2: 'h', 4: 'i', 8: 'q'}
        # struct uses lower-case type codes for signed integers and
        # upper-case for unsigned ones; changing the case of the whole
        # string leaves the '<' / '>' prefix untouched.
        _sign = {True: lambda x: x.lower(), False: lambda x: x.upper()}
        return _sign[signed](_order[byteorder[0].lower()] + _fmt[length])

    def int_to_bytes(value, length=4, byteorder='little', signed=False):
        """Serialize ``value`` into ``length`` bytes.

        Note that this branch returns a ``bytearray``, whereas the
        Python 3 branch returns ``bytes``.
        """
        return bytearray(pack(_get_format(length, byteorder, signed), value))

    def int_from_bytes(bytes, byteorder='little', signed=False):
        """Decode the integer stored in ``bytes``.

        The integer width is taken from ``len(bytes)``. The parameter name
        shadows the builtin; it is kept so that keyword callers keep working.
        """
        return unpack(_get_format(len(bytes), byteorder, signed), bytes)[0]
else:
    # Python 3: thin wrappers over the native int methods, exposing the same
    # signatures (and defaults) as the Python 2 emulation above.
    def int_to_bytes(value, length=4, byteorder='little', signed=False):
        """Serialize ``value`` into ``length`` bytes (returns ``bytes``)."""
        return value.to_bytes(length, byteorder, signed=signed)

    def int_from_bytes(bytes, byteorder='little', signed=False):
        """Decode the integer stored in ``bytes``.

        The parameter name shadows the builtin; it is kept so that keyword
        callers keep working.
        """
        return int.from_bytes(bytes, byteorder, signed=signed)
# Out-of-band block size record tests
def test_round_trip():
    """Compare in-band and out-of-band block size records end to end.

    The same data is compressed chunk by chunk with two compressors that
    differ only in ``store_comp_size``: 4 (names prefixed ``ib_``) and 0
    (names prefixed ``oob_``). For every chunk the test asserts that the
    ``ib`` block is the ``oob`` block preceded by a ``store_comp_size``-byte
    record holding the ``oob`` block length. Both streams are then
    decompressed block by block and must reproduce the input exactly.
    """
    data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24

    kwargs = {'strategy': "double_buffer", 'buffer_size': 256, 'store_comp_size': 4}
    # Identical configuration, except that no size record is stored in the
    # compressed blocks.
    oob_kwargs = dict(kwargs, store_comp_size=0)

    header_size = kwargs['store_comp_size']
    buffer_size = kwargs['buffer_size']

    ib_cstream = bytearray()
    oob_cstream = bytearray()
    # Block lengths of the oob stream; they have to be tracked here because
    # that stream carries no size records of its own.
    oob_sizes = []

    with lz4.stream.LZ4StreamCompressor(**kwargs) as ib_proc, \
            lz4.stream.LZ4StreamCompressor(**oob_kwargs) as oob_proc:
        for start in range(0, len(data), buffer_size):
            chunk = data[start:start + buffer_size]
            ib_block = ib_proc.compress(chunk)
            oob_block = oob_proc.compress(chunk)

            assert (len(ib_block) == (len(oob_block) + header_size)), \
                "Blocks size mismatch: {}/{}".format(
                    len(ib_block), len(oob_block) + header_size)

            # The leading record is decoded with int_from_bytes' default
            # byte order (little-endian).
            recorded_size = int_from_bytes(ib_block[:header_size])
            assert (recorded_size == len(oob_block)), \
                "Blocks size record mismatch: got {}, expected {}".format(
                    recorded_size, len(oob_block))

            assert (ib_block[header_size:] == oob_block), "Blocks data mismatch"

            ib_cstream += ib_block
            oob_cstream += oob_block
            oob_sizes.append(len(oob_block))

    ib_dstream = bytearray()
    oob_dstream = bytearray()

    with lz4.stream.LZ4StreamDecompressor(**kwargs) as ib_proc, \
            lz4.stream.LZ4StreamDecompressor(**oob_kwargs) as oob_proc:
        ib_offset = 0
        oob_index = 0
        oob_offset = 0
        while ib_offset < len(ib_cstream) and oob_index < len(oob_sizes):
            # ib side: get_block() extracts the next block from the stream;
            # oob side: the block is sliced out using the recorded length.
            ib_block = ib_proc.get_block(ib_cstream[ib_offset:])
            oob_block = oob_cstream[oob_offset:oob_offset + oob_sizes[oob_index]]

            assert (len(ib_block) == len(oob_block)), \
                "Blocks size mismatch: {}/{}".format(len(ib_block), len(oob_block))
            assert (ib_block == oob_block), "Blocks data mismatch"

            ib_chunk = ib_proc.decompress(ib_block)
            oob_chunk = oob_proc.decompress(oob_block)

            assert (len(ib_chunk) == len(oob_chunk)), \
                "Chunks size mismatch: {}/{}".format(len(ib_chunk), len(oob_chunk))
            assert (ib_chunk == oob_chunk), "Chunks data mismatch"

            ib_dstream += ib_chunk
            oob_dstream += oob_chunk

            # Skip the size record plus the block payload on the ib side.
            ib_offset += header_size + len(ib_block)
            oob_offset += oob_sizes[oob_index]
            oob_index += 1

    assert (len(ib_dstream) == len(oob_dstream)), "Decompressed streams length mismatch"
    assert (len(data) == len(ib_dstream)), "Decompressed streams length mismatch"
    assert (len(data) == len(oob_dstream)), "Decompressed streams length mismatch"
    assert (ib_dstream == oob_dstream), "Decompressed streams mismatch"
    assert (data == ib_dstream), "Decompressed streams mismatch"
    assert (data == oob_dstream), "Decompressed streams mismatch"
def test_invalid_usage():
    """Check that block extraction is rejected without stored block sizes.

    The data is compressed with ``store_comp_size`` set to 0, then a
    decompressor with the same configuration tries to walk the stream via
    ``get_block()``. The walk must raise ``LZ4StreamError`` with the
    "out-of-band" message.
    """
    data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
    kwargs = {'strategy': "double_buffer", 'buffer_size': 256, 'store_comp_size': 0}

    cstream = bytearray()
    with lz4.stream.LZ4StreamCompressor(**kwargs) as proc:
        for start in range(0, len(data), kwargs['buffer_size']):
            chunk = data[start:start + kwargs['buffer_size']]
            cstream += proc.compress(chunk)

    message = r"^LZ4 context is configured for storing block size out-of-band$"

    # NOTE(review): the error is presumably raised by get_block(); the whole
    # decompression loop stays inside pytest.raises so the test does not
    # depend on which call raises it.
    with pytest.raises(lz4.stream.LZ4StreamError, match=message):
        with lz4.stream.LZ4StreamDecompressor(**kwargs) as proc:
            offset = 0
            while offset < len(cstream):
                block = proc.get_block(cstream[offset:])
                proc.decompress(block)
                offset += kwargs['store_comp_size'] + len(block)
|