diff options
author | hcpp <hcpp@ydb.tech> | 2023-11-08 12:09:41 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2023-11-08 12:56:14 +0300 |
commit | a361f5b98b98b44ea510d274f6769164640dd5e1 (patch) | |
tree | c47c80962c6e2e7b06798238752fd3da0191a3f6 /library/go/core/metrics/solomon/spack_compression.go | |
parent | 9478806fde1f4d40bd5a45e7cbe77237dab613e9 (diff) | |
download | ydb-a361f5b98b98b44ea510d274f6769164640dd5e1.tar.gz |
metrics have been added
Diffstat (limited to 'library/go/core/metrics/solomon/spack_compression.go')
-rw-r--r-- | library/go/core/metrics/solomon/spack_compression.go | 162 |
1 files changed, 162 insertions, 0 deletions
diff --git a/library/go/core/metrics/solomon/spack_compression.go b/library/go/core/metrics/solomon/spack_compression.go new file mode 100644 index 0000000000..004fe0150d --- /dev/null +++ b/library/go/core/metrics/solomon/spack_compression.go @@ -0,0 +1,162 @@ +package solomon + +import ( + "encoding/binary" + "io" + + "github.com/OneOfOne/xxhash" + "github.com/pierrec/lz4" +) + +type CompressionType uint8 + +const ( + CompressionNone CompressionType = 0x0 + CompressionZlib CompressionType = 0x1 + CompressionZstd CompressionType = 0x2 + CompressionLz4 CompressionType = 0x3 +) + +const ( + compressionFrameLength = 512 * 1024 + hashTableSize = 64 * 1024 +) + +type noCompressionWriteCloser struct { + underlying io.Writer + written int +} + +func (w *noCompressionWriteCloser) Write(p []byte) (int, error) { + n, err := w.underlying.Write(p) + w.written += n + return n, err +} + +func (w *noCompressionWriteCloser) Close() error { + return nil +} + +type lz4CompressionWriteCloser struct { + underlying io.Writer + buffer []byte + table []int + written int +} + +func (w *lz4CompressionWriteCloser) flushFrame() (written int, err error) { + src := w.buffer + dst := make([]byte, lz4.CompressBlockBound(len(src))) + + sz, err := lz4.CompressBlock(src, dst, w.table) + if err != nil { + return written, err + } + + if sz == 0 { + dst = src + } else { + dst = dst[:sz] + } + + err = binary.Write(w.underlying, binary.LittleEndian, uint32(len(dst))) + if err != nil { + return written, err + } + w.written += 4 + + err = binary.Write(w.underlying, binary.LittleEndian, uint32(len(src))) + if err != nil { + return written, err + } + w.written += 4 + + n, err := w.underlying.Write(dst) + if err != nil { + return written, err + } + w.written += n + + checksum := xxhash.Checksum32S(dst, 0x1337c0de) + err = binary.Write(w.underlying, binary.LittleEndian, checksum) + if err != nil { + return written, err + } + w.written += 4 + + w.buffer = w.buffer[:0] + + return written, nil +} + +func (w *lz4CompressionWriteCloser) Write(p []byte) (written int, err error) { + q := p[:] + for len(q) > 0 { + space := compressionFrameLength - len(w.buffer) + if space == 0 { + n, err := w.flushFrame() + if err != nil { + return written, err + } + w.written += n + space = compressionFrameLength + } + length := len(q) + if length > space { + length = space + } + w.buffer = append(w.buffer, q[:length]...) + q = q[length:] + } + return written, nil +} + +func (w *lz4CompressionWriteCloser) Close() error { + var err error + if len(w.buffer) > 0 { + n, err := w.flushFrame() + if err != nil { + return err + } + w.written += n + } + err = binary.Write(w.underlying, binary.LittleEndian, uint32(0)) + if err != nil { + return nil + } + w.written += 4 + + err = binary.Write(w.underlying, binary.LittleEndian, uint32(0)) + if err != nil { + return nil + } + w.written += 4 + + err = binary.Write(w.underlying, binary.LittleEndian, uint32(0)) + if err != nil { + return nil + } + w.written += 4 + + return nil +} + +func newCompressedWriter(w io.Writer, compression CompressionType) io.WriteCloser { + switch compression { + case CompressionNone: + return &noCompressionWriteCloser{w, 0} + case CompressionZlib: + panic("zlib compression not supported") + case CompressionZstd: + panic("zstd compression not supported") + case CompressionLz4: + return &lz4CompressionWriteCloser{ + w, + make([]byte, 0, compressionFrameLength), + make([]int, hashTableSize), + 0, + } + default: + panic("unsupported compression algorithm") + } +} |