summaryrefslogtreecommitdiffstats
path: root/library/go/blockcodecs/decoder.go
diff options
context:
space:
mode:
authorqrort <[email protected]>2022-12-02 11:31:25 +0300
committerqrort <[email protected]>2022-12-02 11:31:25 +0300
commitb1f4ffc9c8abff3ba58dc1ec9a9f92d2f0de6806 (patch)
tree2a23209faf0fea5586a6d4b9cee60d1b318d29fe /library/go/blockcodecs/decoder.go
parent559174a9144de40d6bb3997ea4073c82289b4974 (diff)
remove kikimr/driver DEPENDS
Diffstat (limited to 'library/go/blockcodecs/decoder.go')
-rw-r--r--library/go/blockcodecs/decoder.go155
1 files changed, 0 insertions, 155 deletions
diff --git a/library/go/blockcodecs/decoder.go b/library/go/blockcodecs/decoder.go
deleted file mode 100644
index bb38dcf844f..00000000000
--- a/library/go/blockcodecs/decoder.go
+++ /dev/null
@@ -1,155 +0,0 @@
-package blockcodecs
-
-import (
- "encoding/binary"
- "fmt"
- "io"
-)
-
-type Decoder struct {
- // optional
- codec Codec
-
- r io.Reader
- header [10]byte
- eof bool
- checkEOF bool
-
- pos int
- buffer []byte
-
- scratch []byte
-}
-
-func (d *Decoder) getCodec(id CodecID) (Codec, error) {
- if d.codec != nil {
- if id != d.codec.ID() {
- return nil, fmt.Errorf("blockcodecs: received block codec differs from provided: %d != %d", id, d.codec.ID())
- }
-
- return d.codec, nil
- }
-
- if codec := FindCodec(id); codec != nil {
- return codec, nil
- }
-
- return nil, fmt.Errorf("blockcodecs: received block with unsupported codec %d", id)
-}
-
-// SetCheckUnderlyingEOF changes EOF handling.
-//
-// Blockcodecs format contains end of stream separator. By default Decoder will stop right after
-// that separator, without trying to read following bytes from underlying reader.
-//
-// That allows reading sequence of blockcodecs streams from one underlying stream of bytes,
-// but messes up HTTP keep-alive, when using blockcodecs together with net/http connection pool.
-//
-// Setting CheckUnderlyingEOF to true, changes that. After encoutering end of stream block,
-// Decoder will perform one more Read from underlying reader and check for io.EOF.
-func (d *Decoder) SetCheckUnderlyingEOF(checkEOF bool) {
- d.checkEOF = checkEOF
-}
-
-func (d *Decoder) Read(p []byte) (int, error) {
- if d.eof {
- return 0, io.EOF
- }
-
- if d.pos == len(d.buffer) {
- if _, err := io.ReadFull(d.r, d.header[:]); err != nil {
- return 0, fmt.Errorf("blockcodecs: invalid header: %w", err)
- }
-
- codecID := CodecID(binary.LittleEndian.Uint16(d.header[:2]))
- size := int(binary.LittleEndian.Uint64(d.header[2:]))
-
- codec, err := d.getCodec(codecID)
- if err != nil {
- return 0, err
- }
-
- if limit := int(maxDecompressedBlockSize.Load()); size > limit {
- return 0, fmt.Errorf("blockcodecs: block size exceeds limit: %d > %d", size, limit)
- }
-
- if len(d.scratch) < size {
- d.scratch = append(d.scratch, make([]byte, size-len(d.scratch))...)
- }
- d.scratch = d.scratch[:size]
-
- if _, err := io.ReadFull(d.r, d.scratch[:]); err != nil {
- return 0, fmt.Errorf("blockcodecs: truncated block: %w", err)
- }
-
- decodedSize, err := codec.DecodedLen(d.scratch[:])
- if err != nil {
- return 0, fmt.Errorf("blockcodecs: corrupted block: %w", err)
- }
-
- if decodedSize == 0 {
- if d.checkEOF {
- var scratch [1]byte
- n, err := d.r.Read(scratch[:])
- if n != 0 {
- return 0, fmt.Errorf("blockcodecs: data after EOF block")
- }
- if err != nil && err != io.EOF {
- return 0, fmt.Errorf("blockcodecs: error after EOF block: %v", err)
- }
- }
-
- d.eof = true
- return 0, io.EOF
- }
-
- if limit := int(maxDecompressedBlockSize.Load()); decodedSize > limit {
- return 0, fmt.Errorf("blockcodecs: decoded block size exceeds limit: %d > %d", decodedSize, limit)
- }
-
- decodeInto := func(buf []byte) error {
- out, err := codec.Decode(buf, d.scratch)
- if err != nil {
- return fmt.Errorf("blockcodecs: corrupted block: %w", err)
- } else if len(out) != decodedSize {
- return fmt.Errorf("blockcodecs: incorrect block size: %d != %d", len(out), decodedSize)
- }
-
- return nil
- }
-
- if len(p) >= decodedSize {
- if err := decodeInto(p[:decodedSize]); err != nil {
- return 0, err
- }
-
- return decodedSize, nil
- }
-
- if len(d.buffer) < decodedSize {
- d.buffer = append(d.buffer, make([]byte, decodedSize-len(d.buffer))...)
- }
- d.buffer = d.buffer[:decodedSize]
- d.pos = decodedSize
-
- if err := decodeInto(d.buffer); err != nil {
- return 0, err
- }
-
- d.pos = 0
- }
-
- n := copy(p, d.buffer[d.pos:])
- d.pos += n
- return n, nil
-}
-
-// NewDecoder creates decoder that supports input in any of registered codecs.
-func NewDecoder(r io.Reader) *Decoder {
- return &Decoder{r: r}
-}
-
-// NewDecoderCodec creates decode that tries to decode input using provided codec.
-func NewDecoderCodec(r io.Reader, codec Codec) *Decoder {
- return &Decoder{r: r, codec: codec}
-}