aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/clickhouse-connect/clickhouse_connect/driver/compression.py
blob: db69ae3f040d424c9ee4811372b52b0a310a7575 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import zlib
from abc import abstractmethod
from typing import Union

import lz4
import lz4.frame
import zstandard

try:
    import brotli
except ImportError:  # brotli is an optional dependency
    brotli = None


# Content encodings this module advertises, in a fixed order: 'br' is slotted
# in after 'zstd' only when the optional brotli package imported successfully.
available_compression = ['lz4', 'zstd', *(['br'] if brotli else []), 'gzip', 'deflate']

# Registry of compressors keyed by content-encoding tag. It is populated as a
# side effect of defining Compressor subclasses (see __init_subclass__ below).
comp_map = {}


class Compressor:
    """Base class and registration hook for block compressors.

    Subclasses are declared with a ``tag`` class keyword, which registers them
    in ``comp_map`` under that tag:

    * ``thread_safe=True`` (the default): one shared instance is registered.
    * ``thread_safe=False``: the class itself is registered, so each caller can
      build its own instance with independent state.

    The base class is concrete on purpose: ``compress_block`` returns its input
    unchanged and ``flush`` returns ``None``, so a plain ``Compressor()`` works
    as a pass-through ("no compression") compressor.
    """

    def __init_subclass__(cls, tag: str, thread_safe: bool = True, **kwargs):
        # Cooperate with any other __init_subclass__ in the MRO.
        super().__init_subclass__(**kwargs)
        comp_map[tag] = cls() if thread_safe else cls

    # Deliberately not decorated with @abstractmethod: this class does not use
    # ABCMeta (so the decorator would not be enforced), and the base class must
    # stay instantiable to serve as the pass-through compressor.
    def compress_block(self, block) -> Union[bytes, bytearray]:
        """Return the compressed form of ``block``; the base implementation is the identity."""
        return block

    def flush(self) -> Union[bytes, bytearray, None]:
        """Return any trailing compressed bytes; the base implementation has none (``None``)."""
        pass


class GzipCompressor(Compressor, tag='gzip', thread_safe=False):
    """Streaming gzip compressor built on a ``zlib`` compression object.

    Registered with ``thread_safe=False``: the zlib object carries stream state
    across ``compress_block`` calls, so each user needs its own instance.
    """

    def __init__(self, level: int = 6, wbits: int = 31):
        # wbits 31 == 16 + 15: maximum 32 KiB window with a gzip header/trailer.
        self.zlib_obj = zlib.compressobj(level=level, wbits=wbits)

    def compress_block(self, block):
        """Feed ``block`` into the stream; zlib may buffer input and return b''."""
        return self.zlib_obj.compress(block)

    def flush(self):
        """Finish the stream, returning the remaining compressed bytes and trailer."""
        return self.zlib_obj.flush()


class DeflateCompressor(GzipCompressor, tag='deflate', thread_safe=False):
    """Streaming compressor for the HTTP ``deflate`` content coding.

    'deflate' is listed in ``available_compression``, so it needs a registered
    compressor; without this class ``get_compressor('deflate')`` raised KeyError.
    HTTP "deflate" is the zlib format (RFC 1950 wrapper around RFC 1951 data),
    which zlib produces for wbits 9..15 -- hence the default of 15 here.
    """

    def __init__(self, level: int = 6, wbits: int = 15):
        super().__init__(level=level, wbits=wbits)


class Lz4Compressor(Compressor, tag='lz4', thread_safe=False):
    """LZ4 frame compressor that emits one complete frame per block.

    Registered with ``thread_safe=False`` so every user gets a private
    ``LZ4FrameCompressor`` instead of sharing one.
    """

    def __init__(self):
        self.comp = lz4.frame.LZ4FrameCompressor()

    def compress_block(self, block):
        frame = self.comp
        # begin() starts a new frame (given the block length as source size),
        # compress() emits the payload and flush() closes the frame.
        return b''.join((frame.begin(len(block)), frame.compress(block), frame.flush()))


class ZstdCompressor(Compressor, tag='zstd'):
    """Zstandard compressor that compresses every block independently.

    It holds no per-instance state, so the default ``thread_safe=True``
    registration shares a single instance among all callers.
    """

    def compress_block(self, block):
        compressed = zstandard.compress(block)
        return compressed


class BrotliCompressor(Compressor, tag='br'):
    """Brotli compressor that compresses every block independently.

    The class (and therefore the 'br' registry entry) exists even when the
    optional ``brotli`` import failed and the module name is ``None``; in that
    case 'br' is not listed in ``available_compression``. Fail with a clear
    error instead of an opaque AttributeError on ``None``.
    """

    def compress_block(self, block):
        if brotli is None:
            raise ImportError("the optional 'brotli' package is required for 'br' compression")
        return brotli.compress(block)


# Pass-through compressor returned when no compression is requested: the base
# Compressor returns each block unchanged and flushes nothing.
null_compressor = Compressor()


def get_compressor(compression: str) -> Compressor:
    """Return a compressor for the given content-encoding tag.

    :param compression: a tag registered in ``comp_map`` (e.g. 'gzip', 'lz4');
        any falsy value selects the pass-through ``null_compressor``.
    :return: a fresh instance for compressors registered as classes
        (``thread_safe=False``), otherwise the shared registered instance.
    :raises KeyError: if ``compression`` is not a registered tag.
    """
    if not compression:
        return null_compressor
    comp = comp_map[compression]
    # Non-thread-safe compressors are registered as classes and must be
    # instantiated per caller. Checking the type explicitly (rather than
    # catching TypeError from a call) keeps a TypeError raised inside a
    # compressor's __init__ from being mistaken for "already an instance".
    if isinstance(comp, type):
        return comp()
    return comp