diff options
author | vitalyisaev <vitalyisaev@ydb.tech> | 2023-12-12 21:55:07 +0300 |
---|---|---|
committer | vitalyisaev <vitalyisaev@ydb.tech> | 2023-12-12 22:25:10 +0300 |
commit | 4967f99474a4040ba150eb04995de06342252718 (patch) | |
tree | c9c118836513a8fab6e9fcfb25be5d404338bca7 /vendor/github.com/aws/smithy-go/io | |
parent | 2ce9cccb9b0bdd4cd7a3491dc5cbf8687cda51de (diff) | |
download | ydb-4967f99474a4040ba150eb04995de06342252718.tar.gz |
YQ Connector: prepare code base for S3 integration
1. Кодовая база Коннектора переписана с помощью Go дженериков так, чтобы добавление нового источника данных (в частности S3 + csv) максимально переиспользовало имеющийся код (чтобы сохранялась логика нарезания на блоки данных, учёт трафика и пр.)
2. API Connector расширено для работы с S3, но ещё пока не протестировано.
Diffstat (limited to 'vendor/github.com/aws/smithy-go/io')
-rw-r--r-- | vendor/github.com/aws/smithy-go/io/byte.go | 12 | ||||
-rw-r--r-- | vendor/github.com/aws/smithy-go/io/doc.go | 2 | ||||
-rw-r--r-- | vendor/github.com/aws/smithy-go/io/gotest/ya.make | 5 | ||||
-rw-r--r-- | vendor/github.com/aws/smithy-go/io/reader.go | 16 | ||||
-rw-r--r-- | vendor/github.com/aws/smithy-go/io/ringbuffer.go | 94 | ||||
-rw-r--r-- | vendor/github.com/aws/smithy-go/io/ringbuffer_test.go | 465 | ||||
-rw-r--r-- | vendor/github.com/aws/smithy-go/io/ya.make | 18 |
7 files changed, 612 insertions, 0 deletions
diff --git a/vendor/github.com/aws/smithy-go/io/byte.go b/vendor/github.com/aws/smithy-go/io/byte.go new file mode 100644 index 0000000000..f8417c15b8 --- /dev/null +++ b/vendor/github.com/aws/smithy-go/io/byte.go @@ -0,0 +1,12 @@ +package io + +const ( + // Byte is 8 bits + Byte int64 = 1 + // KibiByte (KiB) is 1024 Bytes + KibiByte = Byte * 1024 + // MebiByte (MiB) is 1024 KiB + MebiByte = KibiByte * 1024 + // GibiByte (GiB) is 1024 MiB + GibiByte = MebiByte * 1024 +) diff --git a/vendor/github.com/aws/smithy-go/io/doc.go b/vendor/github.com/aws/smithy-go/io/doc.go new file mode 100644 index 0000000000..a6a33eaf56 --- /dev/null +++ b/vendor/github.com/aws/smithy-go/io/doc.go @@ -0,0 +1,2 @@ +// Package io provides utilities for Smithy generated API clients. +package io diff --git a/vendor/github.com/aws/smithy-go/io/gotest/ya.make b/vendor/github.com/aws/smithy-go/io/gotest/ya.make new file mode 100644 index 0000000000..498726ec13 --- /dev/null +++ b/vendor/github.com/aws/smithy-go/io/gotest/ya.make @@ -0,0 +1,5 @@ +GO_TEST_FOR(vendor/github.com/aws/smithy-go/io) + +LICENSE(Apache-2.0) + +END() diff --git a/vendor/github.com/aws/smithy-go/io/reader.go b/vendor/github.com/aws/smithy-go/io/reader.go new file mode 100644 index 0000000000..07063f2960 --- /dev/null +++ b/vendor/github.com/aws/smithy-go/io/reader.go @@ -0,0 +1,16 @@ +package io + +import ( + "io" +) + +// ReadSeekNopCloser wraps an io.ReadSeeker with an additional Close method +// that does nothing. +type ReadSeekNopCloser struct { + io.ReadSeeker +} + +// Close does nothing. +func (ReadSeekNopCloser) Close() error { + return nil +} diff --git a/vendor/github.com/aws/smithy-go/io/ringbuffer.go b/vendor/github.com/aws/smithy-go/io/ringbuffer.go new file mode 100644 index 0000000000..06b476add8 --- /dev/null +++ b/vendor/github.com/aws/smithy-go/io/ringbuffer.go @@ -0,0 +1,94 @@ +package io + +import ( + "bytes" + "io" +) + +// RingBuffer struct satisfies io.ReadWrite interface. +// +// ReadBuffer is a revolving buffer data structure, which can be used to store snapshots of data in a +// revolving window. +type RingBuffer struct { + slice []byte + start int + end int + size int +} + +// NewRingBuffer method takes in a byte slice as an input and returns a RingBuffer. +func NewRingBuffer(slice []byte) *RingBuffer { + ringBuf := RingBuffer{ + slice: slice, + } + return &ringBuf +} + +// Write method inserts the elements in a byte slice, and returns the number of bytes written along with any error. +func (r *RingBuffer) Write(p []byte) (int, error) { + for _, b := range p { + // check if end points to invalid index, we need to circle back + if r.end == len(r.slice) { + r.end = 0 + } + // check if start points to invalid index, we need to circle back + if r.start == len(r.slice) { + r.start = 0 + } + // if ring buffer is filled, increment the start index + if r.size == len(r.slice) { + r.size-- + r.start++ + } + + r.slice[r.end] = b + r.end++ + r.size++ + } + return len(p), nil +} + +// Read copies the data on the ring buffer into the byte slice provided to the method. +// Returns the read count along with any error encountered while reading. +func (r *RingBuffer) Read(p []byte) (int, error) { + // readCount keeps track of the number of bytes read + var readCount int + for j := 0; j < len(p); j++ { + // if ring buffer is empty or completely read + // return EOF error. + if r.size == 0 { + return readCount, io.EOF + } + + if r.start == len(r.slice) { + r.start = 0 + } + + p[j] = r.slice[r.start] + readCount++ + // increment the start pointer for ring buffer + r.start++ + // decrement the size of ring buffer + r.size-- + } + return readCount, nil +} + +// Len returns the number of unread bytes in the buffer. +func (r *RingBuffer) Len() int { + return r.size +} + +// Bytes returns a copy of the RingBuffer's bytes. +func (r RingBuffer) Bytes() []byte { + var b bytes.Buffer + io.Copy(&b, &r) + return b.Bytes() +} + +// Reset resets the ring buffer. +func (r *RingBuffer) Reset() { + *r = RingBuffer{ + slice: r.slice, + } +} diff --git a/vendor/github.com/aws/smithy-go/io/ringbuffer_test.go b/vendor/github.com/aws/smithy-go/io/ringbuffer_test.go new file mode 100644 index 0000000000..70978cc2a0 --- /dev/null +++ b/vendor/github.com/aws/smithy-go/io/ringbuffer_test.go @@ -0,0 +1,465 @@ +package io + +import ( + "bytes" + "github.com/google/go-cmp/cmp" + "io" + "io/ioutil" + "strconv" + "strings" + "testing" +) + +func TestRingBuffer_Write(t *testing.T) { + cases := map[string]struct { + sliceCapacity int + input []byte + expectedStart int + expectedEnd int + expectedSize int + expectedWrittenBuffer []byte + }{ + "RingBuffer capacity matches Bytes written": { + sliceCapacity: 11, + input: []byte("hello world"), + expectedStart: 0, + expectedEnd: 11, + expectedSize: 11, + expectedWrittenBuffer: []byte("hello world"), + }, + "RingBuffer capacity is lower than Bytes written": { + sliceCapacity: 10, + input: []byte("hello world"), + expectedStart: 1, + expectedEnd: 1, + expectedSize: 10, + expectedWrittenBuffer: []byte("dello worl"), + }, + "RingBuffer capacity is more than Bytes written": { + sliceCapacity: 12, + input: []byte("hello world"), + expectedStart: 0, + expectedEnd: 11, + expectedSize: 11, + expectedWrittenBuffer: []byte("hello world"), + }, + "No Bytes written": { + sliceCapacity: 10, + input: []byte(""), + expectedStart: 0, + expectedEnd: 0, + expectedSize: 0, + expectedWrittenBuffer: []byte(""), + }, + } + for name, c := range cases { + t.Run(name, func(t *testing.T) { + byteSlice := make([]byte, c.sliceCapacity) + ringBuffer := NewRingBuffer(byteSlice) + ringBuffer.Write(c.input) + if e, a := c.expectedSize, ringBuffer.size; e != a { + t.Errorf("expect default size to be %v , got %v", e, a) + } + if e, a := c.expectedStart, ringBuffer.start; e != a { + t.Errorf("expect deafult start to point to %v , got %v", e, a) + } + if e, a := c.expectedEnd, ringBuffer.end; e != a { + t.Errorf("expect default end to point to %v , got %v", e, a) + } + if e, a := c.expectedWrittenBuffer, ringBuffer.slice; !bytes.Contains(a, e) { + t.Errorf("expect written bytes to be %v , got %v", string(e), string(a)) + } + }) + } + +} + +func TestRingBuffer_Read(t *testing.T) { + cases := map[string]struct { + input []byte + numberOfBytesToRead int + expectedStartAfterRead int + expectedEndAfterRead int + expectedSizeOfBufferAfterRead int + expectedReadSlice []byte + expectedErrorAfterRead error + }{ + "Read capacity matches Bytes written": { + input: []byte("Hello world"), + numberOfBytesToRead: 11, + expectedStartAfterRead: 11, + expectedEndAfterRead: 11, + expectedSizeOfBufferAfterRead: 0, + expectedReadSlice: []byte("Hello world"), + expectedErrorAfterRead: nil, + }, + "Read capacity is lower than Bytes written": { + input: []byte("hello world"), + numberOfBytesToRead: 5, + expectedStartAfterRead: 5, + expectedEndAfterRead: 11, + expectedSizeOfBufferAfterRead: 6, + expectedReadSlice: []byte("hello"), + expectedErrorAfterRead: nil, + }, + "Read capacity is more than Bytes written": { + input: []byte("hello world"), + numberOfBytesToRead: 15, + expectedStartAfterRead: 11, + expectedEndAfterRead: 11, + expectedSizeOfBufferAfterRead: 0, + expectedReadSlice: []byte("hello world"), + expectedErrorAfterRead: io.EOF, + }, + "No Bytes are read": { + input: []byte("hello world"), + numberOfBytesToRead: 0, + expectedStartAfterRead: 0, + expectedEndAfterRead: 11, + expectedSizeOfBufferAfterRead: 11, + expectedReadSlice: []byte(""), + expectedErrorAfterRead: nil, + }, + "No Bytes written": { + input: []byte(""), + numberOfBytesToRead: 11, + expectedStartAfterRead: 0, + expectedEndAfterRead: 0, + expectedSizeOfBufferAfterRead: 0, + expectedReadSlice: []byte(""), + expectedErrorAfterRead: io.EOF, + }, + "RingBuffer capacity is more than Bytes Written": { + input: []byte("h"), + numberOfBytesToRead: 11, + expectedStartAfterRead: 1, + expectedEndAfterRead: 1, + expectedSizeOfBufferAfterRead: 0, + expectedReadSlice: []byte("h"), + expectedErrorAfterRead: io.EOF, + }, + } + for name, c := range cases { + byteSlice := make([]byte, 11) + t.Run(name, func(t *testing.T) { + ringBuffer := NewRingBuffer(byteSlice) + readSlice := make([]byte, c.numberOfBytesToRead) + + ringBuffer.Write(c.input) + _, err := ringBuffer.Read(readSlice) + + if e, a := c.expectedErrorAfterRead, err; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := c.expectedReadSlice, readSlice; !bytes.Contains(a, e) { + t.Errorf("expect read buffer to be %v, got %v", string(e), string(a)) + } + if e, a := c.expectedSizeOfBufferAfterRead, ringBuffer.size; e != a { + t.Errorf("expect default size to be %v , got %v", e, a) + } + if e, a := c.expectedStartAfterRead, ringBuffer.start; e != a { + t.Errorf("expect default start to point to %v , got %v", e, a) + } + if e, a := c.expectedEndAfterRead, ringBuffer.end; e != a { + t.Errorf("expect default end to point to %v , got %v", e, a) + } + }) + } +} + +func TestRingBuffer_forConsecutiveReadWrites(t *testing.T) { + cases := map[string]struct { + input []string + sliceCapacity int + numberOfBytesToRead []int + expectedStartAfterRead []int + expectedEnd []int + expectedSizeOfBufferAfterRead []int + expectedReadSlice []string + expectedWrittenBuffer []string + expectedErrorAfterRead []error + }{ + "RingBuffer capacity matches Bytes written": { + input: []string{"Hello World", "Hello Earth", "Mars,/"}, + sliceCapacity: 11, + numberOfBytesToRead: []int{5, 11}, + expectedStartAfterRead: []int{5, 6}, + expectedEnd: []int{11, 6}, + expectedSizeOfBufferAfterRead: []int{6, 0}, + expectedReadSlice: []string{"Hello", "EarthMars,/"}, + expectedWrittenBuffer: []string{"Hello World", "Hello Earth", "Mars,/Earth"}, + expectedErrorAfterRead: []error{nil, nil}, + }, + "RingBuffer capacity is lower than Bytes written": { + input: []string{"Hello World", "Hello Earth", "Mars,/"}, + sliceCapacity: 5, + numberOfBytesToRead: []int{5, 5}, + expectedStartAfterRead: []int{1, 3}, + expectedEnd: []int{1, 3}, + expectedSizeOfBufferAfterRead: []int{0, 0}, + expectedReadSlice: []string{"World", "ars,/"}, + expectedWrittenBuffer: []string{"dWorl", "thEar", "s,/ar"}, + expectedErrorAfterRead: []error{nil, nil}, + }, + "RingBuffer capacity is more than Bytes written": { + input: []string{"Hello World", "Hello Earth", "Mars,/"}, + sliceCapacity: 15, + numberOfBytesToRead: []int{5, 8}, + expectedStartAfterRead: []int{5, 6}, + expectedEnd: []int{11, 13}, + expectedSizeOfBufferAfterRead: []int{6, 7}, + expectedReadSlice: []string{"Hello", "llo Eart"}, + expectedWrittenBuffer: []string{"Hello World", "o EarthorldHell", "o EarthMars,/ll"}, + expectedErrorAfterRead: []error{nil, nil}, + }, + "No Bytes written": { + input: []string{"", "", ""}, + sliceCapacity: 11, + numberOfBytesToRead: []int{5, 8}, + expectedStartAfterRead: []int{0, 0}, + expectedEnd: []int{0, 0}, + expectedSizeOfBufferAfterRead: []int{0, 0}, + expectedReadSlice: []string{"", ""}, + expectedWrittenBuffer: []string{"", "", ""}, + expectedErrorAfterRead: []error{io.EOF, io.EOF}, + }, + } + for name, c := range cases { + writeSlice := make([]byte, c.sliceCapacity) + ringBuffer := NewRingBuffer(writeSlice) + + t.Run(name, func(t *testing.T) { + ringBuffer.Write([]byte(c.input[0])) + if e, a := c.expectedWrittenBuffer[0], string(ringBuffer.slice); !strings.Contains(a, e) { + t.Errorf("Expected %v, got %v", e, a) + } + + readSlice := make([]byte, c.numberOfBytesToRead[0]) + readCount, err := ringBuffer.Read(readSlice) + + if e, a := c.expectedErrorAfterRead[0], err; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := len(c.expectedReadSlice[0]), readCount; e != a { + t.Errorf("Expected to read %v bytes, read only %v", e, a) + } + if e, a := c.expectedReadSlice[0], string(readSlice); !strings.Contains(a, e) { + t.Errorf("expect read buffer to be %v, got %v", e, a) + } + if e, a := c.expectedSizeOfBufferAfterRead[0], ringBuffer.size; e != a { + t.Errorf("expect buffer size to be %v , got %v", e, a) + } + if e, a := c.expectedStartAfterRead[0], ringBuffer.start; e != a { + t.Errorf("expect default start to point to %v , got %v", e, a) + } + if e, a := c.expectedEnd[0], ringBuffer.end; e != a { + t.Errorf("expect default end tp point to %v , got %v", e, a) + } + + /* + Next cycle of read writes. + */ + ringBuffer.Write([]byte(c.input[1])) + if e, a := c.expectedWrittenBuffer[1], string(ringBuffer.slice); !strings.Contains(a, e) { + t.Errorf("Expected %v, got %v", e, a) + } + + ringBuffer.Write([]byte(c.input[2])) + if e, a := c.expectedWrittenBuffer[2], string(ringBuffer.slice); !strings.Contains(a, e) { + t.Errorf("Expected %v, got %v", e, a) + } + + readSlice = make([]byte, c.numberOfBytesToRead[1]) + readCount, err = ringBuffer.Read(readSlice) + if e, a := c.expectedErrorAfterRead[1], err; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := len(c.expectedReadSlice[1]), readCount; e != a { + t.Errorf("Expected to read %v bytes, read only %v", e, a) + } + if e, a := c.expectedReadSlice[1], string(readSlice); !strings.Contains(a, e) { + t.Errorf("expect read buffer to be %v, got %v", e, a) + } + if e, a := c.expectedSizeOfBufferAfterRead[1], ringBuffer.size; e != a { + t.Errorf("expect buffer size to be %v , got %v", e, a) + } + if e, a := c.expectedStartAfterRead[1], ringBuffer.start; e != a { + t.Errorf("expect default start to point to %v , got %v", e, a) + } + if e, a := c.expectedEnd[1], ringBuffer.end; e != a { + t.Errorf("expect default end to point to %v , got %v", e, a) + } + }) + } +} + +func TestRingBuffer_ExhaustiveRead(t *testing.T) { + slice := make([]byte, 5) + buf := NewRingBuffer(slice) + buf.Write([]byte("Hello")) + + readSlice := make([]byte, 5) + readCount, err := buf.Read(readSlice) + if e, a := error(nil), err; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := 5, readCount; e != a { + t.Errorf("Expected to read %v bytes, read only %v", e, a) + } + if e, a := "Hello", string(readSlice); e != a { + t.Errorf("Expected %v to be read, got %v", e, a) + } + + readCount, err = buf.Read(readSlice) + if e, a := io.EOF, err; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := 0, readCount; e != a { + t.Errorf("Expected to read %v bytes, read only %v", e, a) + } + if e, a := 0, buf.size; e != a { + t.Errorf("Expected ring buffer size to be %v, got %v", e, a) + } +} + +func TestRingBuffer_Reset(t *testing.T) { + byteSlice := make([]byte, 10) + ringBuffer := NewRingBuffer(byteSlice) + + ringBuffer.Write([]byte("Hello-world")) + if ringBuffer.size == 0 { + t.Errorf("expected ringBuffer to not be empty") + } + + readBuffer := make([]byte, 5) + ringBuffer.Read(readBuffer) + if ringBuffer.size == 0 { + t.Errorf("expected ringBuffer to not be empty") + } + if e, a := "ello-", string(readBuffer); !strings.EqualFold(e, a) { + t.Errorf("expected read string to be %s, got %s", e, a) + } + + // reset the buffer + ringBuffer.Reset() + if e, a := 0, ringBuffer.size; e != a { + t.Errorf("expect default size to be %v , got %v", e, a) + } + if e, a := 0, ringBuffer.start; e != a { + t.Errorf("expect deafult start to point to %v , got %v", e, a) + } + if e, a := 0, ringBuffer.end; e != a { + t.Errorf("expect default end to point to %v , got %v", e, a) + } + if e, a := 10, len(ringBuffer.slice); e != a { + t.Errorf("expect ringBuffer capacity to be %v, got %v", e, a) + } + + ringBuffer.Write([]byte("someThing new")) + if ringBuffer.size == 0 { + t.Errorf("expected ringBuffer to not be empty") + } + + ringBuffer.Read(readBuffer) + if ringBuffer.size == 0 { + t.Errorf("expected ringBuffer to not be empty") + } + + // Here the ringBuffer length is 10; while written string is "someThing new"; + // The initial characters are thus overwritten by the ringbuffer. + // Thus the ring Buffer if completely read will have "eThing new". + // Here readBuffer size is 5; thus first 5 character "eThin" is read. + if e, a := "eThin", string(readBuffer); !strings.EqualFold(e, a) { + t.Errorf("expected read string to be %s, got %s", e, a) + } + + // reset the buffer + ringBuffer.Reset() + if e, a := 0, ringBuffer.size; e != a { + t.Errorf("expect default size to be %v , got %v", e, a) + } + if e, a := 0, ringBuffer.start; e != a { + t.Errorf("expect deafult start to point to %v , got %v", e, a) + } + if e, a := 0, ringBuffer.end; e != a { + t.Errorf("expect default end to point to %v , got %v", e, a) + } + if e, a := 10, len(ringBuffer.slice); e != a { + t.Errorf("expect ringBuffer capacity to be %v, got %v", e, a) + } + + // reading reset ring buffer + readCount, _ := ringBuffer.Read(readBuffer) + if ringBuffer.size != 0 { + t.Errorf("expected ringBuffer to be empty") + } + if e, a := 0, readCount; e != a { + t.Errorf("expected read string to be of length %v, got %v", e, a) + } +} + +func TestRingBufferWriteRead(t *testing.T) { + cases := []struct { + Input []byte + BufferSize int + Expected []byte + }{ + { + Input: func() []byte { + return []byte(`hello world!`) + }(), + BufferSize: 6, + Expected: []byte(`world!`), + }, + { + Input: func() []byte { + return []byte(`hello world!`) + }(), + BufferSize: 12, + Expected: []byte(`hello world!`), + }, + { + Input: func() []byte { + return []byte(`hello`) + }(), + BufferSize: 6, + Expected: []byte(`hello`), + }, + { + Input: func() []byte { + return []byte(`hello!!`) + }(), + BufferSize: 6, + Expected: []byte(`ello!!`), + }, + } + + for i, tt := range cases { + t.Run(strconv.Itoa(i), func(t *testing.T) { + dataReader := bytes.NewReader(tt.Input) + + ringBuffer := NewRingBuffer(make([]byte, tt.BufferSize)) + + n, err := io.Copy(ringBuffer, dataReader) + if err != nil { + t.Errorf("unexpected error, %v", err) + return + } + + if e, a := int64(len(tt.Input)), n; e != a { + t.Errorf("expect %v, got %v", e, a) + } + + actual, err := ioutil.ReadAll(ringBuffer) + if err != nil { + t.Errorf("unexpected error, %v", err) + return + } + + if diff := cmp.Diff(tt.Expected, actual); len(diff) > 0 { + t.Error(diff) + return + } + }) + } +} diff --git a/vendor/github.com/aws/smithy-go/io/ya.make b/vendor/github.com/aws/smithy-go/io/ya.make new file mode 100644 index 0000000000..bb55b2c8c0 --- /dev/null +++ b/vendor/github.com/aws/smithy-go/io/ya.make @@ -0,0 +1,18 @@ +GO_LIBRARY() + +LICENSE(Apache-2.0) + +SRCS( + byte.go + doc.go + reader.go + ringbuffer.go +) + +GO_TEST_SRCS(ringbuffer_test.go) + +END() + +RECURSE( + gotest +) |