1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
import os
import shutil
import tempfile

import pytest
import lz4.frame as lz4frame
# Payloads fed to the parametrised tests in this module.
test_data = [
    # Empty input.
    b'',
    # 32 random bytes repeated 32 * 128 times (128 KiB in total).
    os.urandom(32) * 32 * 128,
    # 1024 random bytes repeated 128 * 5 times (640 KiB in total).
    os.urandom(1024) * 128 * 5,
]
@pytest.fixture(
    params=test_data,
    # Test ids are 'data0', 'data1', ... in the order of ``test_data``.
    ids=['data{}'.format(index) for index in range(len(test_data))],
)
def data(request):
    """Provide each payload from ``test_data`` as a test parameter."""
    payload = request.param
    return payload
# Compression levels used to parametrise the ``compression_level`` fixture.
# Only the minimum level is active; the MINHC and MAX entries are left
# commented out (presumably to keep the test matrix small -- TODO confirm).
compression_levels = [
    lz4frame.COMPRESSIONLEVEL_MIN,
    # lz4frame.COMPRESSIONLEVEL_MINHC,
    # lz4frame.COMPRESSIONLEVEL_MAX,
]
@pytest.fixture(params=compression_levels)
def compression_level(request):
    """Provide each entry of ``compression_levels`` as a test parameter."""
    level = request.param
    return level
def test_lz4frame_open_write(data):
    """Write ``data`` to an LZ4 frame file opened in binary write mode.

    This is a smoke test: it passes when opening, writing and closing the
    file raise no exception.

    The file is created inside a private temporary directory that is removed
    afterwards, so the test neither leaves a file behind in the working
    directory nor races with anything else using a relative ``testfile`` path.
    """
    workdir = tempfile.mkdtemp()
    try:
        target = os.path.join(workdir, 'testfile')
        with lz4frame.open(target, mode='wb') as fp:
            fp.write(data)
    finally:
        # Remove the scratch directory even when the write fails.
        shutil.rmtree(workdir)
def test_lz4frame_open_write_read_defaults(data):
    """Round-trip ``data`` through a file using default frame settings.

    The payload is written with ``mode='wb'``, read back with ``mode='r'``
    and compared with the original bytes.

    The file is created inside a private temporary directory that is removed
    afterwards, so the test neither leaves a file behind in the working
    directory nor races with anything else using a relative ``testfile`` path.
    """
    workdir = tempfile.mkdtemp()
    try:
        target = os.path.join(workdir, 'testfile')
        with lz4frame.open(target, mode='wb') as fp:
            fp.write(data)
        with lz4frame.open(target, mode='r') as fp:
            data_out = fp.read()
    finally:
        # Remove the scratch directory even when the round trip fails.
        shutil.rmtree(workdir)
    assert data_out == data
def test_lz4frame_open_write_read_text():
    """Round-trip a unicode string through text-mode files.

    The string is written with ``mode='wt'``, read back in one call with
    ``mode='rt'`` and compared with the original.

    The file is created inside a private temporary directory that is removed
    afterwards, so the test neither leaves a file behind in the working
    directory nor races with anything else using a relative ``testfile`` path.
    """
    data = u'This is a test string'
    workdir = tempfile.mkdtemp()
    try:
        target = os.path.join(workdir, 'testfile')
        with lz4frame.open(target, mode='wt') as fp:
            fp.write(data)
        with lz4frame.open(target, mode='rt') as fp:
            data_out = fp.read()
    finally:
        # Remove the scratch directory even when the round trip fails.
        shutil.rmtree(workdir)
    assert data_out == data
def test_lz4frame_open_write_read_text_iter():
    """Round-trip a unicode string, reading it back by iterating the file.

    The string is written with ``mode='wt'``; the file is then reopened with
    ``mode='rt'`` and consumed through iteration rather than ``read()``. The
    pieces yielded by the iterator are concatenated and compared with the
    original string.

    The file is created inside a private temporary directory that is removed
    afterwards, so the test neither leaves a file behind in the working
    directory nor races with anything else using a relative ``testfile`` path.
    """
    data = u'This is a test string'
    workdir = tempfile.mkdtemp()
    try:
        target = os.path.join(workdir, 'testfile')
        with lz4frame.open(target, mode='wt') as fp:
            fp.write(data)
        with lz4frame.open(target, mode='rt') as fp:
            # ``join`` drives the file object's iterator, exactly as a
            # ``for line in fp`` loop would, without repeated ``+=``.
            data_out = ''.join(fp)
    finally:
        # Remove the scratch directory even when the round trip fails.
        shutil.rmtree(workdir)
    assert data_out == data
def test_lz4frame_open_write_read(
        data,
        compression_level,
        block_linked,
        block_checksum,
        block_size,
        content_checksum,
        auto_flush,
        store_size,
        return_bytearray):
    """Round-trip ``data`` through a file written with explicit frame options.

    Every argument is a pytest fixture. ``data`` and ``compression_level``
    are defined in this module; the remaining ones are not visible here and
    are presumably supplied by a shared ``conftest.py`` (TODO confirm).

    Each option is forwarded to ``lz4frame.open`` under the keyword of the
    same name. ``source_size=len(data)`` is passed only when ``store_size``
    is ``True``. The file is read back with ``mode='r'`` and default options,
    and the result is compared with ``data``.

    The file is created inside a private temporary directory that is removed
    afterwards, so the test neither leaves a file behind in the working
    directory nor races with anything else using a relative ``testfile`` path.
    """
    kwargs = {
        'mode': 'wb',
        'compression_level': compression_level,
        'block_size': block_size,
        'block_linked': block_linked,
        'content_checksum': content_checksum,
        'block_checksum': block_checksum,
        'auto_flush': auto_flush,
        'return_bytearray': return_bytearray,
    }
    # Only pass the content size when the fixture explicitly asks for it.
    if store_size is True:
        kwargs['source_size'] = len(data)

    workdir = tempfile.mkdtemp()
    try:
        target = os.path.join(workdir, 'testfile')
        with lz4frame.open(target, **kwargs) as fp:
            fp.write(data)
        with lz4frame.open(target, mode='r') as fp:
            data_out = fp.read()
    finally:
        # Remove the scratch directory even when the round trip fails.
        shutil.rmtree(workdir)
    assert data_out == data
|