1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
import pytest
import sys
import lz4.stream
import psutil
import os
# This test requires allocating a big lump of memory. In order to
# avoid a massive memory allocation during byte compilation, we have
# to declare a variable for the size of the buffer we're going to
# create outside the scope of the function below. See:
# https://bugs.python.org/issue21074
_4GB = 0xffffffff # actually 4GB - 1B, the maximum size on 4 bytes.
# This test will be killed on Travis due to the 3GB memory limit
# there. Unfortunately psutil reports the host memory, not the memory
# available to the container, and so can't be used to detect available
# memory, so instead, as an ugly hack for detecting we're on Travis we
# check for the TRAVIS environment variable being set. This is quite
# fragile.
if os.environ.get('TRAVIS') is not None or sys.maxsize < _4GB or \
psutil.virtual_memory().available < _4GB:
huge = None
else:
try:
huge = b'\0' * _4GB
except (MemoryError, OverflowError):
huge = None
@pytest.mark.skipif(
os.environ.get('TRAVIS') is not None,
reason='Skipping test on Travis due to insufficient memory'
)
@pytest.mark.skipif(
sys.maxsize < _4GB,
reason='Py_ssize_t too small for this test'
)
@pytest.mark.skipif(
psutil.virtual_memory().available < _4GB or huge is None,
reason='Insufficient system memory for this test'
)
def test_huge_1():
data = b''
kwargs = {
'strategy': "double_buffer",
'buffer_size': lz4.stream.LZ4_MAX_INPUT_SIZE,
'store_comp_size': 4,
'dictionary': huge,
}
if psutil.virtual_memory().available < 3 * kwargs['buffer_size']:
# The internal LZ4 context will request at least 3 times buffer_size
# as memory (2 buffer_size for the double-buffer, and 1.x buffer_size
# for the output buffer)
pytest.skip('Insufficient system memory for this test')
# Triggering overflow error
message = r'^Dictionary too large for LZ4 API$'
with pytest.raises(OverflowError, match=message):
with lz4.stream.LZ4StreamCompressor(**kwargs) as proc:
proc.compress(data)
with pytest.raises(OverflowError, match=message):
with lz4.stream.LZ4StreamDecompressor(**kwargs) as proc:
proc.decompress(data)
@pytest.mark.skipif(
os.environ.get('TRAVIS') is not None,
reason='Skipping test on Travis due to insufficient memory'
)
@pytest.mark.skipif(
sys.maxsize < 0xffffffff,
reason='Py_ssize_t too small for this test'
)
@pytest.mark.skipif(
psutil.virtual_memory().available < _4GB or huge is None,
reason='Insufficient system memory for this test'
)
def test_huge_2():
data = huge
kwargs = {
'strategy': "double_buffer",
'buffer_size': lz4.stream.LZ4_MAX_INPUT_SIZE,
'store_comp_size': 4,
'dictionary': b'',
}
if psutil.virtual_memory().available < 3 * kwargs['buffer_size']:
# The internal LZ4 context will request at least 3 times buffer_size
# as memory (2 buffer_size for the double-buffer, and 1.x buffer_size
# for the output buffer)
pytest.skip('Insufficient system memory for this test')
# Raising overflow error
message = r'^Input too large for LZ4 API$'
with pytest.raises(OverflowError, match=message):
with lz4.stream.LZ4StreamCompressor(**kwargs) as proc:
proc.compress(data)
# On decompression, too large input will raise LZ4StreamError
with pytest.raises(lz4.stream.LZ4StreamError):
with lz4.stream.LZ4StreamDecompressor(**kwargs) as proc:
proc.decompress(data)
@pytest.mark.skipif(
os.environ.get('TRAVIS') is not None,
reason='Skipping test on Travis due to insufficient memory'
)
@pytest.mark.skipif(
sys.maxsize < 0xffffffff,
reason='Py_ssize_t too small for this test'
)
@pytest.mark.skipif(
psutil.virtual_memory().available < _4GB or huge is None,
reason='Insufficient system memory for this test'
)
def test_huge_3():
data = huge
kwargs = {
'strategy': "double_buffer",
'buffer_size': lz4.stream.LZ4_MAX_INPUT_SIZE,
'store_comp_size': 4,
'dictionary': huge,
}
if psutil.virtual_memory().available < 3 * kwargs['buffer_size']:
# The internal LZ4 context will request at least 3 times buffer_size
# as memory (2 buffer_size for the double-buffer, and 1.x buffer_size
# for the output buffer)
pytest.skip('Insufficient system memory for this test')
# Raising overflow error (during initialization because of the dictionary parameter)
message = r'^Dictionary too large for LZ4 API$'
with pytest.raises(OverflowError, match=message):
with lz4.stream.LZ4StreamCompressor(**kwargs) as proc:
proc.compress(data)
with pytest.raises(OverflowError, match=message):
with lz4.stream.LZ4StreamDecompressor(**kwargs) as proc:
proc.decompress(data)
def test_dummy():
pass
|