aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/aws/aws-sdk-go-v2/service/s3/eventstream.go
diff options
context:
space:
mode:
authorvitalyisaev <vitalyisaev@ydb.tech>2023-12-12 21:55:07 +0300
committervitalyisaev <vitalyisaev@ydb.tech>2023-12-12 22:25:10 +0300
commit4967f99474a4040ba150eb04995de06342252718 (patch)
treec9c118836513a8fab6e9fcfb25be5d404338bca7 /vendor/github.com/aws/aws-sdk-go-v2/service/s3/eventstream.go
parent2ce9cccb9b0bdd4cd7a3491dc5cbf8687cda51de (diff)
downloadydb-4967f99474a4040ba150eb04995de06342252718.tar.gz
YQ Connector: prepare code base for S3 integration
1. Кодовая база Коннектора переписана с помощью Go дженериков так, чтобы добавление нового источника данных (в частности S3 + csv) максимально переиспользовало имеющийся код (чтобы сохранялась логика нарезания на блоки данных, учёт трафика и пр.) 2. API Connector расширено для работы с S3, но ещё пока не протестировано.
Diffstat (limited to 'vendor/github.com/aws/aws-sdk-go-v2/service/s3/eventstream.go')
-rw-r--r--vendor/github.com/aws/aws-sdk-go-v2/service/s3/eventstream.go285
1 files changed, 285 insertions, 0 deletions
diff --git a/vendor/github.com/aws/aws-sdk-go-v2/service/s3/eventstream.go b/vendor/github.com/aws/aws-sdk-go-v2/service/s3/eventstream.go
new file mode 100644
index 0000000000..d6cdb53372
--- /dev/null
+++ b/vendor/github.com/aws/aws-sdk-go-v2/service/s3/eventstream.go
@@ -0,0 +1,285 @@
+// Code generated by smithy-go-codegen DO NOT EDIT.
+
+package s3
+
+import (
+ "context"
+ "fmt"
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream"
+ "github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream/eventstreamapi"
+ "github.com/aws/aws-sdk-go-v2/service/s3/types"
+ smithy "github.com/aws/smithy-go"
+ "github.com/aws/smithy-go/middleware"
+ smithysync "github.com/aws/smithy-go/sync"
+ smithyhttp "github.com/aws/smithy-go/transport/http"
+ "io"
+ "io/ioutil"
+ "sync"
+)
+
+// SelectObjectContentEventStreamReader provides the interface for reading events
+// from a stream.
+//
+// The writer's Close method must allow multiple concurrent calls.
+type SelectObjectContentEventStreamReader interface {
+ Events() <-chan types.SelectObjectContentEventStream
+ Close() error
+ Err() error
+}
+
+type selectObjectContentEventStreamReader struct {
+ stream chan types.SelectObjectContentEventStream
+ decoder *eventstream.Decoder
+ eventStream io.ReadCloser
+ err *smithysync.OnceErr
+ payloadBuf []byte
+ done chan struct{}
+ closeOnce sync.Once
+}
+
+func newSelectObjectContentEventStreamReader(readCloser io.ReadCloser, decoder *eventstream.Decoder) *selectObjectContentEventStreamReader {
+ w := &selectObjectContentEventStreamReader{
+ stream: make(chan types.SelectObjectContentEventStream),
+ decoder: decoder,
+ eventStream: readCloser,
+ err: smithysync.NewOnceErr(),
+ done: make(chan struct{}),
+ payloadBuf: make([]byte, 10*1024),
+ }
+
+ go w.readEventStream()
+
+ return w
+}
+
+func (r *selectObjectContentEventStreamReader) Events() <-chan types.SelectObjectContentEventStream {
+ return r.stream
+}
+
+func (r *selectObjectContentEventStreamReader) readEventStream() {
+ defer r.Close()
+ defer close(r.stream)
+
+ for {
+ r.payloadBuf = r.payloadBuf[0:0]
+ decodedMessage, err := r.decoder.Decode(r.eventStream, r.payloadBuf)
+ if err != nil {
+ if err == io.EOF {
+ return
+ }
+ select {
+ case <-r.done:
+ return
+ default:
+ r.err.SetError(err)
+ return
+ }
+ }
+
+ event, err := r.deserializeEventMessage(&decodedMessage)
+ if err != nil {
+ r.err.SetError(err)
+ return
+ }
+
+ select {
+ case r.stream <- event:
+ case <-r.done:
+ return
+ }
+
+ }
+}
+
+func (r *selectObjectContentEventStreamReader) deserializeEventMessage(msg *eventstream.Message) (types.SelectObjectContentEventStream, error) {
+ messageType := msg.Headers.Get(eventstreamapi.MessageTypeHeader)
+ if messageType == nil {
+ return nil, fmt.Errorf("%s event header not present", eventstreamapi.MessageTypeHeader)
+ }
+
+ switch messageType.String() {
+ case eventstreamapi.EventMessageType:
+ var v types.SelectObjectContentEventStream
+ if err := awsRestxml_deserializeEventStreamSelectObjectContentEventStream(&v, msg); err != nil {
+ return nil, err
+ }
+ return v, nil
+
+ case eventstreamapi.ExceptionMessageType:
+ return nil, awsRestxml_deserializeEventStreamExceptionSelectObjectContentEventStream(msg)
+
+ case eventstreamapi.ErrorMessageType:
+ errorCode := "UnknownError"
+ errorMessage := errorCode
+ if header := msg.Headers.Get(eventstreamapi.ErrorCodeHeader); header != nil {
+ errorCode = header.String()
+ }
+ if header := msg.Headers.Get(eventstreamapi.ErrorMessageHeader); header != nil {
+ errorMessage = header.String()
+ }
+ return nil, &smithy.GenericAPIError{
+ Code: errorCode,
+ Message: errorMessage,
+ }
+
+ default:
+ mc := msg.Clone()
+ return nil, &UnknownEventMessageError{
+ Type: messageType.String(),
+ Message: &mc,
+ }
+
+ }
+}
+
+func (r *selectObjectContentEventStreamReader) ErrorSet() <-chan struct{} {
+ return r.err.ErrorSet()
+}
+
+func (r *selectObjectContentEventStreamReader) Close() error {
+ r.closeOnce.Do(r.safeClose)
+ return r.Err()
+}
+
+func (r *selectObjectContentEventStreamReader) safeClose() {
+ close(r.done)
+ r.eventStream.Close()
+
+}
+
+func (r *selectObjectContentEventStreamReader) Err() error {
+ return r.err.Err()
+}
+
+func (r *selectObjectContentEventStreamReader) Closed() <-chan struct{} {
+ return r.done
+}
+
+type awsRestxml_deserializeOpEventStreamSelectObjectContent struct {
+ LogEventStreamWrites bool
+ LogEventStreamReads bool
+}
+
+func (*awsRestxml_deserializeOpEventStreamSelectObjectContent) ID() string {
+ return "OperationEventStreamDeserializer"
+}
+
+func (m *awsRestxml_deserializeOpEventStreamSelectObjectContent) HandleDeserialize(ctx context.Context, in middleware.DeserializeInput, next middleware.DeserializeHandler) (
+ out middleware.DeserializeOutput, metadata middleware.Metadata, err error,
+) {
+ defer func() {
+ if err == nil {
+ return
+ }
+ m.closeResponseBody(out)
+ }()
+
+ logger := middleware.GetLogger(ctx)
+
+ request, ok := in.Request.(*smithyhttp.Request)
+ if !ok {
+ return out, metadata, fmt.Errorf("unknown transport type: %T", in.Request)
+ }
+ _ = request
+
+ out, metadata, err = next.HandleDeserialize(ctx, in)
+ if err != nil {
+ return out, metadata, err
+ }
+
+ deserializeOutput, ok := out.RawResponse.(*smithyhttp.Response)
+ if !ok {
+ return out, metadata, fmt.Errorf("unknown transport type: %T", out.RawResponse)
+ }
+ _ = deserializeOutput
+
+ output, ok := out.Result.(*SelectObjectContentOutput)
+ if out.Result != nil && !ok {
+ return out, metadata, fmt.Errorf("unexpected output result type: %T", out.Result)
+ } else if out.Result == nil {
+ output = &SelectObjectContentOutput{}
+ out.Result = output
+ }
+
+ eventReader := newSelectObjectContentEventStreamReader(
+ deserializeOutput.Body,
+ eventstream.NewDecoder(func(options *eventstream.DecoderOptions) {
+ options.Logger = logger
+ options.LogMessages = m.LogEventStreamReads
+
+ }),
+ )
+ defer func() {
+ if err == nil {
+ return
+ }
+ _ = eventReader.Close()
+ }()
+
+ output.eventStream = NewSelectObjectContentEventStream(func(stream *SelectObjectContentEventStream) {
+ stream.Reader = eventReader
+ })
+
+ go output.eventStream.waitStreamClose()
+
+ return out, metadata, nil
+}
+
+func (*awsRestxml_deserializeOpEventStreamSelectObjectContent) closeResponseBody(out middleware.DeserializeOutput) {
+ if resp, ok := out.RawResponse.(*smithyhttp.Response); ok && resp != nil && resp.Body != nil {
+ _, _ = io.Copy(ioutil.Discard, resp.Body)
+ _ = resp.Body.Close()
+ }
+}
+
+func addEventStreamSelectObjectContentMiddleware(stack *middleware.Stack, options Options) error {
+ if err := stack.Deserialize.Insert(&awsRestxml_deserializeOpEventStreamSelectObjectContent{
+ LogEventStreamWrites: options.ClientLogMode.IsRequestEventMessage(),
+ LogEventStreamReads: options.ClientLogMode.IsResponseEventMessage(),
+ }, "OperationDeserializer", middleware.Before); err != nil {
+ return err
+ }
+ return nil
+
+}
+
+// UnknownEventMessageError provides an error when a message is received from the stream,
+// but the reader is unable to determine what kind of message it is.
+type UnknownEventMessageError struct {
+ Type string
+ Message *eventstream.Message
+}
+
+// Error retruns the error message string.
+func (e *UnknownEventMessageError) Error() string {
+ return "unknown event stream message type, " + e.Type
+}
+
+func setSafeEventStreamClientLogMode(o *Options, operation string) {
+ switch operation {
+ case "SelectObjectContent":
+ toggleEventStreamClientLogMode(o, false, true)
+ return
+
+ default:
+ return
+
+ }
+}
+func toggleEventStreamClientLogMode(o *Options, request, response bool) {
+ mode := o.ClientLogMode
+
+ if request && mode.IsRequestWithBody() {
+ mode.ClearRequestWithBody()
+ mode |= aws.LogRequest
+ }
+
+ if response && mode.IsResponseWithBody() {
+ mode.ClearResponseWithBody()
+ mode |= aws.LogResponse
+ }
+
+ o.ClientLogMode = mode
+
+}