aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/lz4/py3/tests/block/test_block_1.py
blob: 4392bb332c5e4effbeea86a46ddb28db47c7ae0d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import lz4.block
import pytest
import sys
import os


def test_decompress_ui32_overflow():
    data = lz4.block.compress(b'A' * 64)
    with pytest.raises(OverflowError):
        lz4.block.decompress(data[4:], uncompressed_size=((1 << 32) + 64))


def test_decompress_without_leak():
    # Verify that hand-crafted packet does not leak uninitialized(?) memory.
    data = lz4.block.compress(b'A' * 64)
    message = r'^Decompressor wrote 64 bytes, but 79 bytes expected from header$'
    with pytest.raises(lz4.block.LZ4BlockError, match=message):
        lz4.block.decompress(b'\x4f' + data[1:])


def test_decompress_with_small_buffer():
    data = lz4.block.compress(b'A' * 64, store_size=False)
    message = r'^Decompression failed: corrupt input or insufficient space in destination buffer. Error code: \d+$'
    with pytest.raises(lz4.block.LZ4BlockError, match=message):
        lz4.block.decompress(data[4:], uncompressed_size=64)
    with pytest.raises(lz4.block.LZ4BlockError, match=message):
        lz4.block.decompress(data, uncompressed_size=60)


def test_decompress_truncated():
    input_data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
    compressed = lz4.block.compress(input_data)
    # for i in range(len(compressed)):
    #     try:
    #         lz4.block.decompress(compressed[:i])
    #     except:
    #         print(i, sys.exc_info()[0], sys.exc_info()[1])
    with pytest.raises(ValueError, match='Input source data size too small'):
        lz4.block.decompress(compressed[:0])
    for n in [0, 1]:
        with pytest.raises(ValueError, match='Input source data size too small'):
            lz4.block.decompress(compressed[:n])
    for n in [24, 25, -2, 27, 67, 85]:
        with pytest.raises(lz4.block.LZ4BlockError):
            lz4.block.decompress(compressed[:n])


def test_decompress_with_trailer():
    data = b'A' * 64
    comp = lz4.block.compress(data)
    message = r'^Decompression failed: corrupt input or insufficient space in destination buffer. Error code: \d+$'
    with pytest.raises(lz4.block.LZ4BlockError, match=message):
        lz4.block.decompress(comp + b'A')
    with pytest.raises(lz4.block.LZ4BlockError, match=message):
        lz4.block.decompress(comp + comp)
    with pytest.raises(lz4.block.LZ4BlockError, match=message):
        lz4.block.decompress(comp + comp[4:])


def test_unicode():
    if sys.version_info < (3,):
        return  # skip
    DATA = b'x'
    with pytest.raises(TypeError):
        lz4.block.compress(DATA.decode('latin1'))
        lz4.block.decompress(lz4.block.compress(DATA).decode('latin1'))

# These next two are probably redundant given test_1 above but we'll keep them
# for now


def test_return_bytearray():
    if sys.version_info < (3,):
        return  # skip
    data = os.urandom(128 * 1024)  # Read 128kb
    compressed = lz4.block.compress(data)
    b = lz4.block.compress(data, return_bytearray=True)
    assert isinstance(b, bytearray)
    assert bytes(b) == compressed
    b = lz4.block.decompress(compressed, return_bytearray=True)
    assert isinstance(b, bytearray)
    assert bytes(b) == data


def test_memoryview():
    if sys.version_info < (2, 7):
        return  # skip
    data = os.urandom(128 * 1024)  # Read 128kb
    compressed = lz4.block.compress(data)
    assert lz4.block.compress(memoryview(data)) == compressed
    assert lz4.block.decompress(memoryview(compressed)) == data


def test_with_dict_none():
    input_data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
    for mode in ['default', 'high_compression']:
        assert lz4.block.decompress(lz4.block.compress(
            input_data, mode=mode, dict=None)) == input_data
        assert lz4.block.decompress(lz4.block.compress(
            input_data, mode=mode), dict=None) == input_data
        assert lz4.block.decompress(lz4.block.compress(
            input_data, mode=mode, dict=b'')) == input_data
        assert lz4.block.decompress(lz4.block.compress(
            input_data, mode=mode), dict=b'') == input_data
        assert lz4.block.decompress(lz4.block.compress(
            input_data, mode=mode, dict='')) == input_data
        assert lz4.block.decompress(lz4.block.compress(
            input_data, mode=mode), dict='') == input_data


def test_with_dict():
    input_data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
    dict1 = input_data[10:30]
    dict2 = input_data[20:40]
    message = r'^Decompression failed: corrupt input or insufficient space in destination buffer. Error code: \d+$'
    for mode in ['default', 'high_compression']:
        compressed = lz4.block.compress(input_data, mode=mode, dict=dict1)
        with pytest.raises(lz4.block.LZ4BlockError, match=message):
            lz4.block.decompress(compressed)
        with pytest.raises(lz4.block.LZ4BlockError, match=message):
            lz4.block.decompress(compressed, dict=dict1[:2])
        assert lz4.block.decompress(compressed, dict=dict2) != input_data
        assert lz4.block.decompress(compressed, dict=dict1) == input_data
    assert lz4.block.decompress(lz4.block.compress(
        input_data), dict=dict1) == input_data


def test_known_decompress_1():
    input = b'\x00\x00\x00\x00\x00'
    output = b''
    assert lz4.block.decompress(input) == output


def test_known_decompress_2():
    input = b'\x01\x00\x00\x00\x10 '
    output = b' '
    assert lz4.block.decompress(input) == output


def test_known_decompress_3():
    input = b'h\x00\x00\x00\xff\x0bLorem ipsum dolor sit amet\x1a\x006P amet'
    output = b'Lorem ipsum dolor sit amet' * 4
    assert lz4.block.decompress(input) == output


def test_known_decompress_4():
    input = b'\xb0\xb3\x00\x00\xff\x1fExcepteur sint occaecat cupidatat non proident.\x00' + (b'\xff' * 180) + b'\x1ePident'
    output = b'Excepteur sint occaecat cupidatat non proident' * 1000
    assert lz4.block.decompress(input) == output