1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
import zlib
from abc import abstractmethod
from typing import Union
import lz4
import lz4.frame
import zstandard
# brotli is an optional dependency: when it cannot be imported the name is
# bound to None so later code can test for it with a plain truthiness check.
try:
    import brotli
except ImportError:
    brotli = None

# Names of the compression methods this module advertises. 'br' is listed
# only when the brotli import above succeeded.
# NOTE(review): 'deflate' is listed although no compressor class visible in
# this part of the file registers that tag -- confirm where it is handled.
available_compression = [
    'lz4',
    'zstd',
    *(['br'] if brotli else []),
    'gzip',
    'deflate',
]

# Registry filled in by Compressor.__init_subclass__: maps a tag string to
# either a shared compressor instance or a compressor class.
comp_map = {}
class Compressor:
    """Pass-through compressor that also acts as the registration base class.

    Every subclass must supply a ``tag`` class keyword; the subclass is then
    recorded in the module-level ``comp_map`` under that tag.  The base class
    itself performs no compression.
    """

    def __init_subclass__(cls, tag: str, thread_safe: bool = True):
        """Register the new subclass in ``comp_map``.

        :param tag: key under which the subclass is stored in ``comp_map``
        :param thread_safe: when true, a single instance is created now and
            stored; otherwise the class itself is stored so that an instance
            can be created later
        """
        if thread_safe:
            comp_map[tag] = cls()
        else:
            comp_map[tag] = cls

    # The decorator only marks the method: this class does not use ABCMeta,
    # so Compressor can still be instantiated directly.
    @abstractmethod
    def compress_block(self, block) -> Union[bytes, bytearray]:
        """Return ``block`` unchanged (the base implementation is a no-op)."""
        return block

    def flush(self):
        """Do nothing and return None; the base class buffers no data."""
        return None
class GzipCompressor(Compressor, tag='gzip', thread_safe=False):
    """Streaming compressor built on a ``zlib`` compression object.

    Registered under the ``'gzip'`` tag with ``thread_safe=False``, so the
    class itself (not an instance) is what gets stored in the registry.
    """

    def __init__(self, level: int = 6, wbits: int = 31):
        """Create the underlying zlib compression object.

        :param level: compression level handed to ``zlib.compressobj``
        :param wbits: window-bits value handed to ``zlib.compressobj``; per
            the zlib documentation, values from 25 to 31 produce a gzip
            header and trailer
        """
        self.zlib_obj = zlib.compressobj(level, zlib.DEFLATED, wbits)

    def compress_block(self, block):
        """Feed ``block`` to the zlib object and return whatever it emits."""
        emitted = self.zlib_obj.compress(block)
        return emitted

    def flush(self):
        """Flush the zlib object and return the remaining compressed bytes."""
        remainder = self.zlib_obj.flush()
        return remainder
class Lz4Compressor(Compressor, tag='lz4', thread_safe=False):
    """Compressor that wraps an ``lz4.frame.LZ4FrameCompressor``.

    Registered under the ``'lz4'`` tag with ``thread_safe=False``, so the
    class itself (not an instance) is what gets stored in the registry.
    """

    def __init__(self):
        """Create the frame compressor with its default settings."""
        self.comp = lz4.frame.LZ4FrameCompressor()

    def compress_block(self, block):
        """Compress ``block`` with one begin/compress/flush sequence.

        :param block: data to compress; its length is passed to ``begin``
        :return: the outputs of ``begin``, ``compress`` and ``flush``
            concatenated in that order
        """
        header = self.comp.begin(len(block))
        body = self.comp.compress(block)
        trailer = self.comp.flush()
        return header + body + trailer
class ZstdCompressor(Compressor, tag='zstd'):
    """Compressor registered under ``'zstd'``.

    ``thread_safe`` is left at its default, so one shared instance is
    created and registered when the class is defined.
    """

    def compress_block(self, block):
        """Return ``block`` compressed by ``zstandard.compress``."""
        compressed = zstandard.compress(block)
        return compressed
class BrotliCompressor(Compressor, tag='br'):
    """Compressor registered under ``'br'``.

    ``thread_safe`` is left at its default, so one shared instance is
    created and registered when the class is defined.  The class is
    registered even when the optional ``brotli`` import failed and the
    module-level name ``brotli`` is None.
    """

    def compress_block(self, block):
        """Return ``block`` compressed by ``brotli.compress``.

        Raises AttributeError if ``brotli`` is None (import unavailable).
        """
        compressed = brotli.compress(block)
        return compressed
# Shared pass-through compressor returned when no compression is requested.
null_compressor = Compressor()


def get_compressor(compression: str) -> Compressor:
    """Return a compressor for the requested compression method.

    Registry entries are either ready-made instances, which are returned
    as-is and shared between callers, or classes, which are instantiated so
    that each caller gets its own object.

    :param compression: registry tag such as ``'gzip'``; any falsy value
        (empty string, None) selects the pass-through ``null_compressor``
    :return: a compressor object exposing ``compress_block`` and ``flush``
    :raises KeyError: if ``compression`` is not a registered tag
    """
    if not compression:
        return null_compressor
    comp = comp_map[compression]
    # Distinguish classes from instances explicitly.  Probing with a call and
    # catching TypeError would also swallow a TypeError raised inside a
    # compressor's constructor and hand back the class instead of an instance.
    if isinstance(comp, type):
        return comp()
    return comp
|