1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
// +build !clz4
package binary
import (
"encoding/binary"
"fmt"
"io"
"github.com/ClickHouse/clickhouse-go/lib/lz4"
)
// compressReader is an io.Reader-style wrapper that pulls compressed blocks
// from an underlying reader, decodes them, and serves the decoded bytes.
type compressReader struct {
	// reader is the underlying source of block headers and compressed payloads.
	reader io.Reader

	// data holds the bytes decoded from the most recently read block.
	data []byte

	// pos is the index in data of the next byte to hand out; when it equals
	// len(data) the current block is exhausted.
	pos int

	// zdata is the scratch buffer that receives a block's compressed payload
	// before it is decoded into data.
	zdata []byte

	// header is the scratch buffer for a block's fixed-size header, which
	// carries the compression method and the compressed/decompressed sizes.
	header []byte
}
// NewCompressReader wraps r in a compressReader.
//
// The returned reader owns three pre-allocated buffers: a header buffer of
// HeaderSize bytes, a decoded-data buffer of BlockMaxSize bytes, and a
// compressed-data buffer of lz4.CompressBound(BlockMaxSize)+HeaderSize bytes.
// Nothing is read from r until the first call to Read.
func NewCompressReader(r io.Reader) *compressReader {
	cr := &compressReader{
		reader: r,
		header: make([]byte, HeaderSize),
		data:   make([]byte, BlockMaxSize),
		zdata:  make([]byte, lz4.CompressBound(BlockMaxSize)+HeaderSize),
	}
	// Park the position at the end of the (still empty) data buffer so that
	// Read sees no leftover bytes and loads a block before copying anything.
	cr.pos = len(cr.data)
	return cr
}
// Read fills buf completely with decoded bytes, loading as many blocks as
// needed via readCompressedData.
//
// It first drains whatever is left of the current decoded block and then
// keeps loading blocks until buf is full. On success it returns len(buf) and
// a nil error. If loading a block fails, it returns the number of bytes
// copied into buf so far together with that error.
func (cr *compressReader) Read(buf []byte) (n int, err error) {
	want := len(buf)
	filled := 0

	// Hand out the unread tail of the current block, if there is one.
	if unread := len(cr.data) - cr.pos; unread > 0 {
		k := copy(buf, cr.data[cr.pos:])
		filled = k
		cr.pos += k
	}

	for filled < want {
		if err = cr.readCompressedData(); err != nil {
			return filled, err
		}
		// A freshly loaded block is consumed from its start, so the number of
		// bytes copied is also the new read position within it.
		cr.pos = copy(buf[filled:], cr.data)
		filled += cr.pos
	}
	return want, nil
}
// readCompressedData loads the next block from the underlying reader and
// decodes it into cr.data, leaving cr.pos at 0 on success.
//
// It reads a header of len(cr.header) bytes, in which byte 16 is the
// compression method, bytes 17-20 are a little-endian size from which 9 is
// subtracted to obtain the payload length (presumably the method byte plus
// the two 4-byte size fields; confirm against the protocol), and bytes 21-24
// are the little-endian decompressed size. cr.zdata and cr.data are grown if
// the block does not fit, the payload is read, and lz4.Decode fills cr.data.
//
// Errors from the underlying reader are returned unchanged, including io.EOF
// when the stream ends before any byte of the header or payload is read. A
// stream that ends part-way through the header or payload, an unknown
// compression method, and a header with impossible sizes are reported as
// errors. On any error cr.pos is set to len(cr.data) so that no stale or
// partially written bytes can be served afterwards.
func (cr *compressReader) readCompressedData() (err error) {
	cr.pos = 0
	defer func() {
		if err != nil {
			// Mark the buffer as fully consumed: after a failed refill its
			// contents are old or incomplete and must not look unread.
			cr.pos = len(cr.data)
		}
	}()

	// io.Reader may return fewer bytes than requested without an error, so
	// keep reading until the whole header has arrived.
	if _, err = io.ReadFull(cr.reader, cr.header); err != nil {
		if err == io.ErrUnexpectedEOF {
			return fmt.Errorf("Lz4 decompression header EOF")
		}
		return err
	}

	compressedSize := int(binary.LittleEndian.Uint32(cr.header[17:])) - 9
	decompressedSize := int(binary.LittleEndian.Uint32(cr.header[21:]))
	// A stored size below 9 (or a value overflowing int on 32-bit platforms)
	// would otherwise panic when used as a slice bound below.
	if compressedSize < 0 || decompressedSize < 0 {
		return fmt.Errorf("Lz4 decompression header has invalid sizes: compressed %d, decompressed %d", compressedSize, decompressedSize)
	}

	if compressedSize > cap(cr.zdata) {
		cr.zdata = make([]byte, compressedSize)
	}
	if decompressedSize > cap(cr.data) {
		cr.data = make([]byte, decompressedSize)
	}
	cr.zdata = cr.zdata[:compressedSize]
	cr.data = cr.data[:decompressedSize]

	// @TODO checksum
	if cr.header[16] != LZ4 {
		return fmt.Errorf("Unknown compression method: 0x%02x ", cr.header[16])
	}

	// Same short-read concern as for the header.
	if _, err = io.ReadFull(cr.reader, cr.zdata); err != nil {
		if err == io.ErrUnexpectedEOF {
			return fmt.Errorf("Decompress read size not match")
		}
		return err
	}

	if _, err = lz4.Decode(cr.data, cr.zdata); err != nil {
		return err
	}
	return nil
}
|