aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/lz4/py3/tests/stream/test_stream_3.py
blob: 2b52d6b54941529b1ea0bbacb98c8b1d4cb5a981 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import lz4.stream
import pytest
import sys


_1KB = 1024
_1MB = _1KB * 1024
_1GB = _1MB * 1024


def compress(x, c_kwargs):
    """Compress ``x`` chunk by chunk with a stream compressor.

    ``x`` is split into ``c_kwargs['buffer_size']``-sized chunks and each
    chunk is fed to an ``LZ4StreamCompressor``.  The compressed blocks are
    concatenated and returned as ``bytearray`` when
    ``c_kwargs['return_bytearray']`` is truthy, as ``bytes`` otherwise.
    """
    step = c_kwargs['buffer_size']
    blocks = []
    with lz4.stream.LZ4StreamCompressor(**c_kwargs) as proc:
        for offset in range(0, len(x), step):
            blocks.append(proc.compress(x[offset:offset + step]))
    # Pick the join target so the result type matches the caller's request.
    joiner = bytearray() if c_kwargs.get('return_bytearray', False) else bytes()
    return joiner.join(blocks)


def decompress(x, d_kwargs):
    """Decompress a stream of length-prefixed blocks produced by compress().

    Walks ``x`` block by block: ``get_block`` extracts the next compressed
    block (its size is stored in a ``d_kwargs['store_comp_size']``-byte
    prefix), which is then decompressed.  The decompressed chunks are
    concatenated and returned as ``bytearray`` when
    ``d_kwargs['return_bytearray']`` is truthy, as ``bytes`` otherwise.
    """
    prefix_len = d_kwargs['store_comp_size']
    chunks = []
    with lz4.stream.LZ4StreamDecompressor(**d_kwargs) as proc:
        offset = 0
        while offset < len(x):
            block = proc.get_block(x[offset:])
            chunks.append(proc.decompress(block))
            # Advance past the size prefix plus the block just consumed.
            offset += prefix_len + len(block)
    joiner = bytearray() if d_kwargs.get('return_bytearray', False) else bytes()
    return joiner.join(chunks)


# Candidate buffer sizes for parametrization, kept in ascending order.
test_buffer_size = sorted([
    256,
    _1KB,
    64 * _1KB,
    _1MB,
    _1GB,
    lz4.stream.LZ4_MAX_INPUT_SIZE,
])


@pytest.fixture(
    params=test_buffer_size,
    ids=['buffer_size{0}'.format(i) for i, _ in enumerate(test_buffer_size)],
)
def buffer_size(request):
    """Parametrized fixture yielding each candidate buffer size in turn."""
    return request.param


# Test payloads: a single highly compressible 1 MiB run of one byte.
test_data = [
    b'a' * _1MB,
]


@pytest.fixture(
    params=test_data,
    ids=['data{0}'.format(i) for i, _ in enumerate(test_data)],
)
def data(request):
    """Parametrized fixture yielding each test payload in turn."""
    return request.param


def test_block_decompress_mem_usage(data, buffer_size):
    """Check that repeated stream compression/decompression does not leak.

    Runs compress() and then decompress() 1000 times each under
    tracemalloc, comparing snapshots every 100 iterations and asserting
    that allocations attributed to the lz4.stream module grow by less
    than 4 KiB between snapshots.
    """
    kwargs = {
        'strategy': "double_buffer",
        'buffer_size': buffer_size,
        'store_comp_size': 4,
    }

    if sys.maxsize < 0xffffffff:
        pytest.skip('Py_ssize_t too small for this test')

    tracemalloc = pytest.importorskip('tracemalloc')

    def _run_and_check_growth(func, payload):
        # Repeatedly run func(payload, kwargs), snapshotting every 100
        # iterations and asserting lz4.stream allocations stay bounded.
        # Returns the result of the final call.
        tracemalloc.start()
        prev_snapshot = None
        result = None
        try:
            for i in range(1000):
                result = func(payload, kwargs)

                if i % 100 == 0:
                    snapshot = tracemalloc.take_snapshot()

                    if prev_snapshot:
                        # Filter on lz4.stream module's allocations
                        stats = [
                            x for x in snapshot.compare_to(prev_snapshot, 'lineno')
                            if lz4.stream.__file__ in x.traceback._frames[0][0]
                        ]
                        assert sum(s.size_diff for s in stats) < (1024 * 4)

                    prev_snapshot = snapshot
        finally:
            # Always stop tracing, even when an assertion fires mid-loop,
            # so a failure here cannot contaminate later tests.
            tracemalloc.stop()
        return result

    # Trace memory usage on compression, then on decompression of the
    # compressed output — the two phases previously duplicated this loop.
    compressed = _run_and_check_growth(compress, data)
    _run_and_check_growth(decompress, compressed)