author    vitalyisaev <vitalyisaev@ydb.tech>  2023-11-13 10:20:39 +0300
committer vitalyisaev <vitalyisaev@ydb.tech>  2023-11-13 10:38:14 +0300
commit    5315659cfcfc83a2997433487f8373a69f5b38a9 (patch)
tree      59e4a53e9b3c4a6e0bc9fca191b51c9d64c48cb9
parent    b7579a2e21c2048763703213e852f6b76ad31cfa (diff)
download  ydb-5315659cfcfc83a2997433487f8373a69f5b38a9.tar.gz
YQ Connector: fix deadlock in ReadSplits
1. Fixed the mechanism that pumps data from the DBMS connection into the GRPC stream: the deadlock is eliminated and the asynchronous logic became simpler.
2. Simplified the over-engineered code (Go generics were overused previously).
3. Added end-to-end unit tests covering the whole path of moving data from the database into the GRPC stream.
4. Uncovered a violation of the type-mapping convention: binary strings were mapped not to YQL String but to YQL Utf8.
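The essence of the deadlock fix (see paging/sink.go and streaming/streamer.go below): the reading goroutine (publisher) and the GRPC-writing goroutine (subscriber) now exchange data through a single result queue, every blocking send on that queue is guarded by the stream context, and the subscriber cancels that context whenever it exits, so the publisher can never get stuck on a full channel. Below is a minimal self-contained sketch of this pattern; it uses simplified stand-in types rather than the actual connector API.

package main

import (
	"context"
	"fmt"
)

// readResult is a simplified stand-in for paging.ReadResult:
// either a buffer of rows or an error from the data source.
type readResult struct {
	rows int
	err  error
}

// publisher plays the role of rdbms.Handler.ReadSplit feeding a Sink:
// every send into the queue is guarded by ctx.Done(), so it cannot block
// forever if the consumer has already gone away.
func publisher(ctx context.Context, queue chan<- *readResult) {
	defer close(queue) // analogue of Sink.Finish(): signals end of data

	for i := 1; i <= 3; i++ {
		select {
		case queue <- &readResult{rows: i}:
		case <-ctx.Done():
			return
		}
	}
}

// consumer plays the role of Streamer.writeDataToStream: it drains the queue
// and cancels the shared context on exit, which unblocks the publisher.
func consumer(ctx context.Context, cancel context.CancelFunc, queue <-chan *readResult) error {
	defer cancel()

	for {
		select {
		case result, ok := <-queue:
			if !ok {
				return nil // queue closed: correct termination
			}
			if result.err != nil {
				return fmt.Errorf("read result: %w", result.err)
			}
			fmt.Println("send buffer with rows:", result.rows) // stands for stream.Send(...)
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	queue := make(chan *readResult, 10)

	go publisher(ctx, queue)

	if err := consumer(ctx, cancel, queue); err != nil {
		fmt.Println("error:", err)
	}
}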
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/clickhouse/connection_manager.go | 8
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_executor.go | 4
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/config.go | 10
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/paging/columnar_buffer_arrow_ipc_streaming.go | 8
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/paging/interface.go | 46
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/paging/mock.go | 39
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/paging/sink.go | 126
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/paging/sink_factory.go | 53
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/paging/sink_string.go | 26
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/paging/writer.go | 83
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/paging/writer_factory.go | 44
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/paging/ya.make | 9
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/postgresql/connection_manager.go | 8
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/postgresql/query_executor.go | 4
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go | 73
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/rdbms/handler_factory.go | 26
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/rdbms/interface.go | 12
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/rdbms/mock.go | 33
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/service_connector.go | 35
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/streaming/streamer.go | 85
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/streaming/streamer_test.go | 398
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/utils/arrow_helpers.go | 8
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/utils/errors.go | 1
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/utils/query_builder.go | 11
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/utils/sql.go (renamed from ydb/library/yql/providers/generic/connector/app/server/utils/connection_manager.go) | 11
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/utils/sql_mock.go | 90
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/utils/time_test.go | 147
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/utils/type_mapper.go | 1
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/utils/unit_test_helpers.go | 75
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/utils/unit_test_helpers_test.go | 72
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/utils/ut/ya.make | 5
-rw-r--r--  ydb/library/yql/providers/generic/connector/app/server/utils/ya.make | 10
32 files changed, 1071 insertions, 490 deletions
diff --git a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/connection_manager.go b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/connection_manager.go
index 0b9d9e95af..4aa52b99fa 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/connection_manager.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/connection_manager.go
@@ -59,7 +59,7 @@ func (c Connection) Query(ctx context.Context, query string, args ...any) (utils
return rows{Rows: out}, nil
}
-var _ utils.ConnectionManager[*Connection] = (*connectionManager)(nil)
+var _ utils.ConnectionManager = (*connectionManager)(nil)
type connectionManager struct {
utils.ConnectionManagerBase
@@ -70,7 +70,7 @@ func (c *connectionManager) Make(
ctx context.Context,
logger log.Logger,
dsi *api_common.TDataSourceInstance,
-) (*Connection, error) {
+) (utils.Connection, error) {
if dsi.GetCredentials().GetBasic() == nil {
return nil, fmt.Errorf("currently only basic auth is supported")
}
@@ -129,10 +129,10 @@ func (c *connectionManager) Make(
return &Connection{DB: conn, logger: queryLogger}, nil
}
-func (c *connectionManager) Release(logger log.Logger, conn *Connection) {
+func (c *connectionManager) Release(logger log.Logger, conn utils.Connection) {
utils.LogCloserError(logger, conn, "close clickhouse connection")
}
-func NewConnectionManager(cfg utils.ConnectionManagerBase) utils.ConnectionManager[*Connection] {
+func NewConnectionManager(cfg utils.ConnectionManagerBase) utils.ConnectionManager {
return &connectionManager{ConnectionManagerBase: cfg}
}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_executor.go b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_executor.go
index 2f9071bc65..a3b469d58c 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_executor.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_executor.go
@@ -11,7 +11,7 @@ import (
type queryExecutor struct {
}
-func (qm queryExecutor) DescribeTable(ctx context.Context, conn *Connection, request *api_service_protos.TDescribeTableRequest) (utils.Rows, error) {
+func (qm queryExecutor) DescribeTable(ctx context.Context, conn utils.Connection, request *api_service_protos.TDescribeTableRequest) (utils.Rows, error) {
out, err := conn.Query(
ctx,
"SELECT name, type FROM system.columns WHERE table = ? and database = ?",
@@ -30,6 +30,6 @@ func (qm queryExecutor) DescribeTable(ctx context.Context, conn *Connection, req
return out, nil
}
-func NewQueryExecutor() utils.QueryExecutor[*Connection] {
+func NewQueryExecutor() utils.QueryExecutor {
return queryExecutor{}
}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/config.go b/ydb/library/yql/providers/generic/connector/app/server/config.go
index 15c7998b98..8a3eca0696 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/config.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/config.go
@@ -2,7 +2,6 @@ package server
import (
"fmt"
- "io/ioutil"
"math"
"os"
@@ -123,14 +122,19 @@ func fileMustExist(path string) error {
}
func newConfigFromPath(configPath string) (*config.TServerConfig, error) {
- data, err := ioutil.ReadFile(configPath)
+ data, err := os.ReadFile(configPath)
if err != nil {
return nil, fmt.Errorf("read file %v: %w", configPath, err)
}
var cfg config.TServerConfig
- if err := prototext.Unmarshal(data, &cfg); err != nil {
+ unmarshaller := prototext.UnmarshalOptions{
+ // Do not emit an error if the config contains fields that are outdated or not yet known to this server
+ DiscardUnknown: true,
+ }
+
+ if err := unmarshaller.Unmarshal(data, &cfg); err != nil {
return nil, fmt.Errorf("prototext unmarshal `%v`: %w", string(data), err)
}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/paging/columnar_buffer_arrow_ipc_streaming.go b/ydb/library/yql/providers/generic/connector/app/server/paging/columnar_buffer_arrow_ipc_streaming.go
index e0106a014d..76ec036ab3 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/paging/columnar_buffer_arrow_ipc_streaming.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/paging/columnar_buffer_arrow_ipc_streaming.go
@@ -86,13 +86,15 @@ func (cb *columnarBufferArrowIPCStreaming) Release() {
}
}
+func (cb *columnarBufferArrowIPCStreaming) TotalRows() int { return cb.builders[0].Len() }
+
// special implementation for buffer that writes schema with empty columns set
type columnarBufferArrowIPCStreamingEmptyColumns struct {
arrowAllocator memory.Allocator
readLimiter ReadLimiter
schema *arrow.Schema
typeMapper utils.TypeMapper
- rowsAdded int64
+ rowsAdded int
}
// AddRow saves a row obtained from the datasource into the buffer
@@ -114,7 +116,7 @@ func (cb *columnarBufferArrowIPCStreamingEmptyColumns) AddRow(acceptors []any) e
func (cb *columnarBufferArrowIPCStreamingEmptyColumns) ToResponse() (*api_service_protos.TReadSplitsResponse, error) {
columns := make([]arrow.Array, 0)
- record := array.NewRecord(cb.schema, columns, cb.rowsAdded)
+ record := array.NewRecord(cb.schema, columns, int64(cb.rowsAdded))
// prepare arrow writer
var buf bytes.Buffer
@@ -138,6 +140,8 @@ func (cb *columnarBufferArrowIPCStreamingEmptyColumns) ToResponse() (*api_servic
return out, nil
}
+func (cb *columnarBufferArrowIPCStreamingEmptyColumns) TotalRows() int { return int(cb.rowsAdded) }
+
// Frees resources if buffer is no longer used
func (cb *columnarBufferArrowIPCStreamingEmptyColumns) Release() {
}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/paging/interface.go b/ydb/library/yql/providers/generic/connector/app/server/paging/interface.go
index 20762478a3..0ae70c3b60 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/paging/interface.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/paging/interface.go
@@ -7,29 +7,45 @@ import (
api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
)
-type Writer interface {
- AddRow(acceptors []any) error
- Finish() error
- BufferQueue() <-chan ColumnarBuffer
-}
-
-type WriterFactory interface {
- MakeWriter(
- ctx context.Context,
- logger log.Logger,
- pagination *api_service_protos.TPagination,
- ) (Writer, error)
-}
-
type ColumnarBuffer interface {
// AddRow saves a row obtained from the datasource into the buffer
AddRow(acceptors []any) error
// ToResponse returns all the accumulated data and clears buffer
ToResponse() (*api_service_protos.TReadSplitsResponse, error)
- // Frees resources if buffer is no longer used
+ // Release frees resources if buffer is no longer used
Release()
+ // TotalRows returns the number of accumulated rows
+ TotalRows() int
}
type ColumnarBufferFactory interface {
MakeBuffer() (ColumnarBuffer, error)
}
+
+// ReadResult is an algebraic data type containing:
+// 1. a buffer (e. g. page) packed with data
+// 2. result of read operation (potentially with error)
+type ReadResult struct {
+ ColumnarBuffer ColumnarBuffer
+ Error error
+}
+
+// Sink is a destination for a data stream that is read out of an external data source.
+type Sink interface {
+ // AddRow saves the row obtained from a stream incoming from an external data source.
+ AddRow(acceptors []any) error
+ // AddError propagates an error that occurred while reading from the external data source.
+ AddError(err error)
+ // Finish reports the successful completion of reading the data stream.
+ Finish()
+ // ResultQueue returns a channel with results
+ ResultQueue() <-chan *ReadResult
+}
+
+type SinkFactory interface {
+ MakeSink(
+ ctx context.Context,
+ logger log.Logger,
+ pagination *api_service_protos.TPagination,
+ ) (Sink, error)
+}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/paging/mock.go b/ydb/library/yql/providers/generic/connector/app/server/paging/mock.go
index c9642611f4..95325f4f53 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/paging/mock.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/paging/mock.go
@@ -8,43 +8,44 @@ import (
api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
)
-var _ Writer = (*WriterMock)(nil)
+var _ Sink = (*SinkMock)(nil)
-type WriterMock struct {
- ColumnarBufferChan chan ColumnarBuffer
+type SinkMock struct {
mock.Mock
}
-func (m *WriterMock) AddRow(acceptors []any) error {
- panic("not implemented") // TODO: Implement
-}
+func (m *SinkMock) AddRow(acceptors []any) error {
+ args := m.Called(acceptors...)
-func (m *WriterMock) Finish() error {
- args := m.Called()
+ return args.Error(0)
+}
- close(m.ColumnarBufferChan)
+func (m *SinkMock) AddError(err error) {
+ m.Called(err)
+}
- return args.Error(0)
+func (m *SinkMock) Finish() {
+ m.Called()
}
-func (m *WriterMock) BufferQueue() <-chan ColumnarBuffer {
- return m.ColumnarBufferChan
+func (m *SinkMock) ResultQueue() <-chan *ReadResult {
+ return m.Called().Get(0).(chan *ReadResult)
}
-var _ WriterFactory = (*WriterFactoryMock)(nil)
+var _ SinkFactory = (*SinkFactoryMock)(nil)
-type WriterFactoryMock struct {
+type SinkFactoryMock struct {
mock.Mock
}
-func (m *WriterFactoryMock) MakeWriter(
+func (m *SinkFactoryMock) MakeSink(
ctx context.Context,
logger log.Logger,
pagination *api_service_protos.TPagination,
-) (Writer, error) {
+) (Sink, error) {
args := m.Called(pagination)
- return args.Get(0).(Writer), args.Error(1)
+ return args.Get(0).(Sink), args.Error(1)
}
var _ ColumnarBuffer = (*ColumnarBufferMock)(nil)
@@ -66,3 +67,7 @@ func (m *ColumnarBufferMock) ToResponse() (*api_service_protos.TReadSplitsRespon
func (m *ColumnarBufferMock) Release() {
m.Called()
}
+
+func (m *ColumnarBufferMock) TotalRows() int {
+ panic("not implemented") // TODO: Implement
+}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/paging/sink.go b/ydb/library/yql/providers/generic/connector/app/server/paging/sink.go
new file mode 100644
index 0000000000..d616b30088
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/app/server/paging/sink.go
@@ -0,0 +1,126 @@
+//go:generate stringer -type=sinkState -output=sink_string.go
+package paging
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/ydb-platform/ydb/library/go/core/log"
+ "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils"
+ api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
+)
+
+type sinkState int8
+
+const (
+ operational sinkState = iota + 1
+ failed
+ finished
+)
+
+var _ Sink = (*sinkImpl)(nil)
+
+type sinkImpl struct {
+ currBuffer ColumnarBuffer // accumulates incoming rows
+ resultQueue chan *ReadResult // outgoing buffer queue
+ bufferFactory ColumnarBufferFactory // creates new buffer
+ pagination *api_service_protos.TPagination // settings
+ rowsReceived uint64 // simple stats
+ rowsPerBuffer uint64 // TODO: use cfg
+ logger log.Logger // annotated logger
+ state sinkState // flag showing if it's ready to return data
+ ctx context.Context // client context
+}
+
+func (s *sinkImpl) AddRow(acceptors []any) error {
+ if s.state != operational {
+ panic(s.unexpectedState(operational))
+ }
+
+ if err := s.currBuffer.AddRow(acceptors); err != nil {
+ return fmt.Errorf("acceptors to row set: %w", err)
+ }
+
+ s.rowsReceived++
+
+ if s.isEnough() {
+ if err := s.flush(true); err != nil {
+ return fmt.Errorf("flush: %w", err)
+ }
+ }
+
+ return nil
+}
+
+func (s *sinkImpl) AddError(err error) {
+ if s.state != operational {
+ panic(s.unexpectedState(operational))
+ }
+
+ s.respondWith(nil, err)
+
+ s.state = failed
+}
+
+func (s *sinkImpl) isEnough() bool {
+ // TODO: implement pagination logic, check limits provided by client or config
+ return s.rowsReceived%s.rowsPerBuffer == 0
+}
+
+func (s *sinkImpl) flush(makeNewBuffer bool) error {
+ if s.currBuffer.TotalRows() == 0 {
+ return nil
+ }
+
+ s.respondWith(s.currBuffer, nil)
+
+ s.currBuffer = nil
+
+ if makeNewBuffer {
+ var err error
+
+ s.currBuffer, err = s.bufferFactory.MakeBuffer()
+ if err != nil {
+ return fmt.Errorf("make buffer: %w", err)
+ }
+ }
+
+ return nil
+}
+
+func (s *sinkImpl) Finish() {
+ if s.state != operational && s.state != failed {
+ panic(s.unexpectedState(operational, failed))
+ }
+
+ // if there is some data left, send it to the channel
+ if s.state == operational {
+ err := s.flush(false)
+ if err != nil {
+ s.respondWith(nil, fmt.Errorf("flush: %w", err))
+ s.state = failed
+ } else {
+ s.state = finished
+ }
+ }
+
+ // notify reader about the end of data
+ close(s.resultQueue)
+}
+
+func (s *sinkImpl) ResultQueue() <-chan *ReadResult {
+ return s.resultQueue
+}
+
+func (s *sinkImpl) respondWith(buf ColumnarBuffer, err error) {
+ select {
+ case s.resultQueue <- &ReadResult{ColumnarBuffer: buf, Error: err}:
+ case <-s.ctx.Done():
+ }
+}
+
+func (s *sinkImpl) unexpectedState(expected ...sinkState) error {
+ return fmt.Errorf(
+ "unexpected state '%v' (expected are '%v'): %w",
+ s.state, expected, utils.ErrInvariantViolation)
+}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/paging/sink_factory.go b/ydb/library/yql/providers/generic/connector/app/server/paging/sink_factory.go
new file mode 100644
index 0000000000..c2c482d820
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/app/server/paging/sink_factory.go
@@ -0,0 +1,53 @@
+package paging
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/ydb-platform/ydb/library/go/core/log"
+ api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
+)
+
+type sinkFactoryImpl struct {
+ columnarBufferFactory ColumnarBufferFactory
+ resultQueueCapacity int // TODO: take from config
+ rowsPerBuffer int // TODO: take from config
+}
+
+func (sf *sinkFactoryImpl) MakeSink(
+ ctx context.Context,
+ logger log.Logger,
+ pagination *api_service_protos.TPagination,
+) (Sink, error) {
+ if pagination != nil {
+ return nil, fmt.Errorf("pagination settings are not supported yet")
+ }
+
+ buffer, err := sf.columnarBufferFactory.MakeBuffer()
+ if err != nil {
+ return nil, fmt.Errorf("wrap buffer: %w", err)
+ }
+
+ return &sinkImpl{
+ bufferFactory: sf.columnarBufferFactory,
+ resultQueue: make(chan *ReadResult, sf.resultQueueCapacity),
+ rowsPerBuffer: uint64(sf.rowsPerBuffer),
+ currBuffer: buffer,
+ logger: logger,
+ pagination: pagination,
+ state: operational,
+ ctx: ctx,
+ }, nil
+}
+
+func NewSinkFactory(
+ cbf ColumnarBufferFactory,
+ resultQueueCapacity int,
+ rowsPerBuffer int,
+) SinkFactory {
+ return &sinkFactoryImpl{
+ columnarBufferFactory: cbf,
+ resultQueueCapacity: resultQueueCapacity,
+ rowsPerBuffer: rowsPerBuffer,
+ }
+}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/paging/sink_string.go b/ydb/library/yql/providers/generic/connector/app/server/paging/sink_string.go
new file mode 100644
index 0000000000..bea6becd67
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/app/server/paging/sink_string.go
@@ -0,0 +1,26 @@
+// Code generated by "stringer -type=sinkState -output=sink_string.go"; DO NOT EDIT.
+
+package paging
+
+import "strconv"
+
+func _() {
+ // An "invalid array index" compiler error signifies that the constant values have changed.
+ // Re-run the stringer command to generate them again.
+ var x [1]struct{}
+ _ = x[operational-1]
+ _ = x[failed-2]
+ _ = x[finished-3]
+}
+
+const _sinkState_name = "operationalfailedfinished"
+
+var _sinkState_index = [...]uint8{0, 11, 17, 25}
+
+func (i sinkState) String() string {
+ i -= 1
+ if i < 0 || i >= sinkState(len(_sinkState_index)-1) {
+ return "sinkState(" + strconv.FormatInt(int64(i+1), 10) + ")"
+ }
+ return _sinkState_name[_sinkState_index[i]:_sinkState_index[i+1]]
+}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/paging/writer.go b/ydb/library/yql/providers/generic/connector/app/server/paging/writer.go
deleted file mode 100644
index ced84deabe..0000000000
--- a/ydb/library/yql/providers/generic/connector/app/server/paging/writer.go
+++ /dev/null
@@ -1,83 +0,0 @@
-package paging
-
-import (
- "context"
- "fmt"
-
- "github.com/ydb-platform/ydb/library/go/core/log"
- api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
-)
-
-var _ Writer = (*writerImpl)(nil)
-
-type writerImpl struct {
- buffer ColumnarBuffer // accumulates data from rows
- bufferQueue chan ColumnarBuffer // outgoing buffer queue
- bufferFactory ColumnarBufferFactory // creates new buffer
- pagination *api_service_protos.TPagination // settings
- rowsReceived uint64 // simple stats
- logger log.Logger // annotated logger
- operational bool // flag showing if it's ready to return data
- ctx context.Context // client context
-}
-
-func (pw *writerImpl) AddRow(acceptors []any) error {
- if !pw.operational {
- return fmt.Errorf("paging writer is not operational")
- }
-
- if err := pw.buffer.AddRow(acceptors); err != nil {
- return fmt.Errorf("acceptors to row set: %w", err)
- }
-
- pw.rowsReceived++
-
- if pw.isEnough() {
- if err := pw.flush(true); err != nil {
- return fmt.Errorf("flush: %w", err)
- }
- }
-
- return nil
-}
-
-func (pw *writerImpl) isEnough() bool {
- // TODO: implement pagination logic, check limits provided by client
- return pw.rowsReceived%10000 == 0
-}
-
-func (pw *writerImpl) flush(makeNewBuffer bool) error {
- select {
- case pw.bufferQueue <- pw.buffer:
- case <-pw.ctx.Done():
- return pw.ctx.Err()
- }
-
- var err error
-
- if makeNewBuffer {
- pw.buffer, err = pw.bufferFactory.MakeBuffer()
- if err != nil {
- return fmt.Errorf("make buffer: %w", err)
- }
- }
-
- return nil
-}
-
-func (pw *writerImpl) Finish() error {
- if err := pw.flush(false); err != nil {
- return fmt.Errorf("flush: %w", err)
- }
-
- pw.operational = false
-
- // notify reader about end of the stream
- close(pw.bufferQueue)
-
- return nil
-}
-
-func (pw *writerImpl) BufferQueue() <-chan ColumnarBuffer {
- return pw.bufferQueue
-}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/paging/writer_factory.go b/ydb/library/yql/providers/generic/connector/app/server/paging/writer_factory.go
deleted file mode 100644
index 3cded585f8..0000000000
--- a/ydb/library/yql/providers/generic/connector/app/server/paging/writer_factory.go
+++ /dev/null
@@ -1,44 +0,0 @@
-package paging
-
-import (
- "context"
- "fmt"
-
- "github.com/ydb-platform/ydb/library/go/core/log"
- api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
-)
-
-type writerFactoryImpl struct {
- columnarBufferFactory ColumnarBufferFactory
-}
-
-func (wf *writerFactoryImpl) MakeWriter(
- ctx context.Context,
- logger log.Logger,
- pagination *api_service_protos.TPagination,
-) (Writer, error) {
- if pagination != nil {
- return nil, fmt.Errorf("pagination settings are not supported yet")
- }
-
- buffer, err := wf.columnarBufferFactory.MakeBuffer()
- if err != nil {
- return nil, fmt.Errorf("wrap buffer: %w", err)
- }
-
- return &writerImpl{
- bufferFactory: wf.columnarBufferFactory,
- bufferQueue: make(chan ColumnarBuffer, 10), // TODO: use config
- buffer: buffer,
- logger: logger,
- pagination: pagination,
- operational: true,
- ctx: ctx,
- }, nil
-}
-
-func NewWriterFactory(cbf ColumnarBufferFactory) WriterFactory {
- return &writerFactoryImpl{
- columnarBufferFactory: cbf,
- }
-}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/paging/ya.make b/ydb/library/yql/providers/generic/connector/app/server/paging/ya.make
index 080042a0cc..6479e5b3f5 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/paging/ya.make
+++ b/ydb/library/yql/providers/generic/connector/app/server/paging/ya.make
@@ -7,12 +7,9 @@ SRCS(
interface.go
mock.go
read_limiter.go
- writer.go
- writer_factory.go
-)
-
-GO_TEST_SRCS(
- time_test.go
+ sink.go
+ sink_factory.go
+ sink_string.go
)
END()
diff --git a/ydb/library/yql/providers/generic/connector/app/server/postgresql/connection_manager.go b/ydb/library/yql/providers/generic/connector/app/server/postgresql/connection_manager.go
index cd80d35cef..874f7cc758 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/postgresql/connection_manager.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/postgresql/connection_manager.go
@@ -57,7 +57,7 @@ func (c Connection) Query(ctx context.Context, query string, args ...any) (utils
return rows{Rows: out}, err
}
-var _ utils.ConnectionManager[*Connection] = (*connectionManager)(nil)
+var _ utils.ConnectionManager = (*connectionManager)(nil)
type connectionManager struct {
utils.ConnectionManagerBase
@@ -68,7 +68,7 @@ func (c *connectionManager) Make(
ctx context.Context,
logger log.Logger,
dsi *api_common.TDataSourceInstance,
-) (*Connection, error) {
+) (utils.Connection, error) {
if dsi.GetCredentials().GetBasic() == nil {
return nil, fmt.Errorf("currently only basic auth is supported")
}
@@ -118,10 +118,10 @@ func (c *connectionManager) Make(
return &Connection{conn, queryLogger}, nil
}
-func (c *connectionManager) Release(logger log.Logger, conn *Connection) {
+func (c *connectionManager) Release(logger log.Logger, conn utils.Connection) {
utils.LogCloserError(logger, conn, "close posgresql connection")
}
-func NewConnectionManager(cfg utils.ConnectionManagerBase) utils.ConnectionManager[*Connection] {
+func NewConnectionManager(cfg utils.ConnectionManagerBase) utils.ConnectionManager {
return &connectionManager{ConnectionManagerBase: cfg}
}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/postgresql/query_executor.go b/ydb/library/yql/providers/generic/connector/app/server/postgresql/query_executor.go
index b27578bfa9..e894bfb3c7 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/postgresql/query_executor.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/postgresql/query_executor.go
@@ -11,7 +11,7 @@ import (
type queryExecutor struct {
}
-func (qm queryExecutor) DescribeTable(ctx context.Context, conn *Connection, request *api_service_protos.TDescribeTableRequest) (utils.Rows, error) {
+func (qm queryExecutor) DescribeTable(ctx context.Context, conn utils.Connection, request *api_service_protos.TDescribeTableRequest) (utils.Rows, error) {
schema := request.GetDataSourceInstance().GetPgOptions().GetSchema()
out, err := conn.Query(
ctx,
@@ -27,6 +27,6 @@ func (qm queryExecutor) DescribeTable(ctx context.Context, conn *Connection, req
return out, nil
}
-func NewQueryExecutor() utils.QueryExecutor[*Connection] {
+func NewQueryExecutor() utils.QueryExecutor {
return queryExecutor{}
}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go b/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go
index e24ef109b7..66c8268591 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go
@@ -6,20 +6,19 @@ import (
"strings"
"github.com/ydb-platform/ydb/library/go/core/log"
- api_common "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/api/common"
"github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/paging"
"github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils"
api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
)
-type handlerImpl[CONN utils.Connection] struct {
+type handlerImpl struct {
typeMapper utils.TypeMapper
- queryBuilder utils.QueryExecutor[CONN]
- connectionManager utils.ConnectionManager[CONN]
+ queryBuilder utils.QueryExecutor
+ connectionManager utils.ConnectionManager
logger log.Logger
}
-func (h *handlerImpl[CONN]) DescribeTable(
+func (h *handlerImpl) DescribeTable(
ctx context.Context,
logger log.Logger,
request *api_service_protos.TDescribeTableRequest,
@@ -67,22 +66,11 @@ func (h *handlerImpl[CONN]) DescribeTable(
return &api_service_protos.TDescribeTableResponse{Schema: schema}, nil
}
-func (h *handlerImpl[CONN]) ReadSplit(
- ctx context.Context,
+func (h *handlerImpl) makeReadSplitQuery(
logger log.Logger,
- dataSourceInstance *api_common.TDataSourceInstance,
split *api_service_protos.TSplit,
- pagingWriter paging.Writer,
-) error {
- conn, err := h.connectionManager.Make(ctx, logger, dataSourceInstance)
- if err != nil {
- return fmt.Errorf("make connection: %w", err)
- }
-
- defer h.connectionManager.Release(logger, conn)
-
+) (string, error) {
// SELECT $columns
-
// interpolate request
var sb strings.Builder
@@ -90,7 +78,7 @@ func (h *handlerImpl[CONN]) ReadSplit(
columns, err := utils.SelectWhatToYDBColumns(split.Select.What)
if err != nil {
- return fmt.Errorf("convert Select.What.Items to Ydb.Columns: %w", err)
+ return "", fmt.Errorf("convert Select.What.Items to Ydb.Columns: %w", err)
}
// for the case of empty column set select some constant for constructing a valid sql statement
@@ -109,7 +97,7 @@ func (h *handlerImpl[CONN]) ReadSplit(
// SELECT $columns FROM $from
tableName := split.GetSelect().GetFrom().GetTable()
if tableName == "" {
- return fmt.Errorf("empty table name")
+ return "", fmt.Errorf("empty table name")
}
sb.WriteString(" FROM ")
@@ -127,7 +115,26 @@ func (h *handlerImpl[CONN]) ReadSplit(
// execute query
- query := sb.String()
+ return sb.String(), nil
+}
+
+func (h *handlerImpl) doReadSplit(
+ ctx context.Context,
+ logger log.Logger,
+ split *api_service_protos.TSplit,
+ sink paging.Sink,
+) error {
+ query, err := h.makeReadSplitQuery(logger, split)
+ if err != nil {
+ return fmt.Errorf("make read split query: %w", err)
+ }
+
+ conn, err := h.connectionManager.Make(ctx, logger, split.Select.DataSourceInstance)
+ if err != nil {
+ return fmt.Errorf("make connection: %w", err)
+ }
+
+ defer h.connectionManager.Release(logger, conn)
rows, err := conn.Query(ctx, query)
if err != nil {
@@ -146,7 +153,7 @@ func (h *handlerImpl[CONN]) ReadSplit(
return fmt.Errorf("rows scan error: %w", err)
}
- if err := pagingWriter.AddRow(acceptors); err != nil {
+ if err := sink.AddRow(acceptors); err != nil {
return fmt.Errorf("add row to paging writer: %w", err)
}
}
@@ -158,13 +165,27 @@ func (h *handlerImpl[CONN]) ReadSplit(
return nil
}
-func (h *handlerImpl[CONN]) TypeMapper() utils.TypeMapper { return h.typeMapper }
+func (h *handlerImpl) ReadSplit(
+ ctx context.Context,
+ logger log.Logger,
+ split *api_service_protos.TSplit,
+ sink paging.Sink,
+) {
+ err := h.doReadSplit(ctx, logger, split, sink)
+ if err != nil {
+ sink.AddError(err)
+ }
+
+ sink.Finish()
+}
+
+func (h *handlerImpl) TypeMapper() utils.TypeMapper { return h.typeMapper }
-func newHandler[CONN utils.Connection](
+func newHandler(
logger log.Logger,
- preset *preset[CONN],
+ preset *handlerPreset,
) Handler {
- return &handlerImpl[CONN]{
+ return &handlerImpl{
logger: logger,
queryBuilder: preset.queryExecutor,
connectionManager: preset.connectionManager,
diff --git a/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler_factory.go b/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler_factory.go
index 6adca6351e..2b4aa105ba 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler_factory.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler_factory.go
@@ -10,43 +10,43 @@ import (
"github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils"
)
-type preset[CONN utils.Connection] struct {
- queryExecutor utils.QueryExecutor[CONN]
- connectionManager utils.ConnectionManager[CONN]
+type handlerPreset struct {
+ queryExecutor utils.QueryExecutor
+ connectionManager utils.ConnectionManager
typeMapper utils.TypeMapper
}
-type HandlerFactory struct {
- clickhouse preset[*clickhouse.Connection]
- postgresql preset[*postgresql.Connection]
+type handlerFactoryImpl struct {
+ clickhouse handlerPreset
+ postgresql handlerPreset
}
-func (hf *HandlerFactory) Make(
+func (hf *handlerFactoryImpl) Make(
logger log.Logger,
dataSourceType api_common.EDataSourceKind,
) (Handler, error) {
switch dataSourceType {
case api_common.EDataSourceKind_CLICKHOUSE:
- return newHandler[*clickhouse.Connection](logger, &hf.clickhouse), nil
+ return newHandler(logger, &hf.clickhouse), nil
case api_common.EDataSourceKind_POSTGRESQL:
- return newHandler[*postgresql.Connection](logger, &hf.postgresql), nil
+ return newHandler(logger, &hf.postgresql), nil
default:
return nil, fmt.Errorf("pick handler for data source type '%v': %w", dataSourceType, utils.ErrDataSourceNotSupported)
}
}
-func NewHandlerFactory(qlf utils.QueryLoggerFactory) *HandlerFactory {
+func NewHandlerFactory(qlf utils.QueryLoggerFactory) HandlerFactory {
connManagerCfg := utils.ConnectionManagerBase{
QueryLoggerFactory: qlf,
}
- return &HandlerFactory{
- clickhouse: preset[*clickhouse.Connection]{
+ return &handlerFactoryImpl{
+ clickhouse: handlerPreset{
queryExecutor: clickhouse.NewQueryExecutor(),
connectionManager: clickhouse.NewConnectionManager(connManagerCfg),
typeMapper: clickhouse.NewTypeMapper(),
},
- postgresql: preset[*postgresql.Connection]{
+ postgresql: handlerPreset{
queryExecutor: postgresql.NewQueryExecutor(),
connectionManager: postgresql.NewConnectionManager(connManagerCfg),
typeMapper: postgresql.NewTypeMapper(),
diff --git a/ydb/library/yql/providers/generic/connector/app/server/rdbms/interface.go b/ydb/library/yql/providers/generic/connector/app/server/rdbms/interface.go
index b9177d1867..dd79021b81 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/rdbms/interface.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/rdbms/interface.go
@@ -20,10 +20,16 @@ type Handler interface {
ReadSplit(
ctx context.Context,
logger log.Logger,
- dataSourceInstance *api_common.TDataSourceInstance,
split *api_service_protos.TSplit,
- pagingWriter paging.Writer,
- ) error
+ sink paging.Sink,
+ )
TypeMapper() utils.TypeMapper
}
+
+type HandlerFactory interface {
+ Make(
+ logger log.Logger,
+ dataSourceType api_common.EDataSourceKind,
+ ) (Handler, error)
+}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/rdbms/mock.go b/ydb/library/yql/providers/generic/connector/app/server/rdbms/mock.go
index 369db5db95..28dbe58fe6 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/rdbms/mock.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/rdbms/mock.go
@@ -15,7 +15,6 @@ var _ Handler = (*HandlerMock)(nil)
type HandlerMock struct {
mock.Mock
- ReadFinished chan struct{} // close this channel to allow
}
func (m *HandlerMock) DescribeTable(
@@ -29,17 +28,33 @@ func (m *HandlerMock) DescribeTable(
func (m *HandlerMock) ReadSplit(
ctx context.Context,
logger log.Logger,
- dataSourceInstance *api_common.TDataSourceInstance,
split *api_service_protos.TSplit,
- pagingWriter paging.Writer,
-) error {
- <-m.ReadFinished
-
- args := m.Called(dataSourceInstance, split, pagingWriter)
-
- return args.Error(0)
+ pagingWriter paging.Sink,
+) {
+ m.Called(split, pagingWriter)
}
func (m *HandlerMock) TypeMapper() utils.TypeMapper {
panic("not implemented") // TODO: Implement
}
+
+var _ HandlerFactory = (*HandlerFactoryMock)(nil)
+
+type HandlerFactoryMock struct {
+ QueryExecutor utils.QueryExecutor
+ ConnectionManager utils.ConnectionManager
+ TypeMapper utils.TypeMapper
+}
+
+func (m *HandlerFactoryMock) Make(logger log.Logger, dataSourceType api_common.EDataSourceKind) (Handler, error) {
+ handler := newHandler(
+ logger,
+ &handlerPreset{
+ queryExecutor: m.QueryExecutor,
+ connectionManager: m.ConnectionManager,
+ typeMapper: m.TypeMapper,
+ },
+ )
+
+ return handler, nil
+}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/service_connector.go b/ydb/library/yql/providers/generic/connector/app/server/service_connector.go
index 29d1fe1726..54ac7a1aa4 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/service_connector.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/service_connector.go
@@ -19,11 +19,12 @@ import (
api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/reflection"
)
type serviceConnector struct {
api_service.UnimplementedConnectorServer
- handlerFactory *rdbms.HandlerFactory
+ handlerFactory rdbms.HandlerFactory
memoryAllocator memory.Allocator
readLimiterFactory *paging.ReadLimiterFactory
cfg *config.TServerConfig
@@ -183,14 +184,30 @@ func (s *serviceConnector) doReadSplits(
for i, split := range request.Splits {
columnarBufferFactory, err := paging.NewColumnarBufferFactory(
- logger, s.memoryAllocator, s.readLimiterFactory, request.Format, split.Select.What, handler.TypeMapper())
+ logger,
+ s.memoryAllocator,
+ s.readLimiterFactory,
+ request.Format,
+ split.Select.What,
+ handler.TypeMapper())
if err != nil {
return 0, fmt.Errorf("new columnar buffer factory: %w", err)
}
- pagingWriterFactory := paging.NewWriterFactory(columnarBufferFactory)
+ // TODO: use configs
+ const (
+ resultQueueCapacity = 10
+ rowsPerBuffer = 10000
+ )
- bytesInSplit, err := s.readSplit(logger, stream, request, split, pagingWriterFactory, handler)
+ sinkFactory := paging.NewSinkFactory(columnarBufferFactory, resultQueueCapacity, rowsPerBuffer)
+
+ sink, err := sinkFactory.MakeSink(stream.Context(), logger, request.Pagination)
+ if err != nil {
+ return 0, fmt.Errorf("new sink: %w", err)
+ }
+
+ bytesInSplit, err := s.readSplit(logger, stream, request, split, sink, handler)
if err != nil {
return 0, fmt.Errorf("read split %d: %w", i, err)
}
@@ -206,22 +223,19 @@ func (s *serviceConnector) readSplit(
stream api_service.Connector_ReadSplitsServer,
request *api_service_protos.TReadSplitsRequest,
split *api_service_protos.TSplit,
- pagingWriterFactory paging.WriterFactory,
+ sink paging.Sink,
handler rdbms.Handler,
) (uint64, error) {
logger.Debug("split reading started")
- streamer, err := streaming.NewStreamer(
+ streamer := streaming.NewStreamer(
logger,
stream,
request,
split,
- pagingWriterFactory,
+ sink,
handler,
)
- if err != nil {
- return 0, fmt.Errorf("new streamer: %w", err)
- }
totalBytesSent, err := streamer.Run()
if err != nil {
@@ -318,6 +332,7 @@ func newServiceConnector(
}
grpcServer := grpc.NewServer(options...)
+ reflection.Register(grpcServer)
s := &serviceConnector{
handlerFactory: rdbms.NewHandlerFactory(queryLoggerFactory),
diff --git a/ydb/library/yql/providers/generic/connector/app/server/streaming/streamer.go b/ydb/library/yql/providers/generic/connector/app/server/streaming/streamer.go
index c3d4db278c..5842dcf892 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/streaming/streamer.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/streaming/streamer.go
@@ -1,6 +1,7 @@
package streaming
import (
+ "context"
"fmt"
"sync"
@@ -17,33 +18,32 @@ type Streamer struct {
request *api_service_protos.TReadSplitsRequest
handler rdbms.Handler
split *api_service_protos.TSplit
- pagingWriter paging.Writer
+ sink paging.Sink
totalBytesSent uint64 // TODO: replace with stats accumulator
logger log.Logger
+ ctx context.Context // clone of a stream context
+ cancel context.CancelFunc
}
-func (s *Streamer) writeDataToStream(readErrChan <-chan error) error {
+func (s *Streamer) writeDataToStream() error {
+ // exiting this function terminates the publisher's goroutine as well
+ defer s.cancel()
+
for {
select {
- case buffer, ok := <-s.pagingWriter.BufferQueue():
+ case result, ok := <-s.sink.ResultQueue():
if !ok {
// correct termination
return nil
}
- // handle next data block
- if err := s.sendBufferToStream(buffer); err != nil {
- return fmt.Errorf("send buffer to stream: %w", err)
- }
- case err := <-readErrChan:
- // terminate loop in case of read error
- if err != nil {
- return fmt.Errorf("read error: %w", err)
+ if result.Error != nil {
+ return fmt.Errorf("read result: %w", result.Error)
}
- // otherwise drain the last rows left in writer into a channel
- if err := s.pagingWriter.Finish(); err != nil {
- return fmt.Errorf("finish paging writer: %w", err)
+ // handle next data block
+ if err := s.sendBufferToStream(result.ColumnarBuffer); err != nil {
+ return fmt.Errorf("send buffer to stream: %w", err)
}
case <-s.stream.Context().Done():
// handle request termination
@@ -73,44 +73,22 @@ func (s *Streamer) sendBufferToStream(buffer paging.ColumnarBuffer) error {
return nil
}
-func (s *Streamer) readDataFromSource() error {
- // run blocking read
- err := s.handler.ReadSplit(
- s.stream.Context(),
- s.logger,
- s.request.GetDataSourceInstance(),
- s.split,
- s.pagingWriter,
- )
-
- if err != nil {
- return fmt.Errorf("read split: %w", err)
- }
-
- return nil
-}
-
func (s *Streamer) Run() (uint64, error) {
- readErrChan := make(chan error)
-
wg := &sync.WaitGroup{}
wg.Add(1)
defer wg.Wait()
// launch read from the data source;
- // reading goroutine controls writing goroutine lifetime
+ // subscriber goroutine controls the publisher goroutine's lifetime
go func() {
defer wg.Done()
- select {
- case readErrChan <- s.readDataFromSource():
- case <-s.stream.Context().Done():
- }
+ s.handler.ReadSplit(s.ctx, s.logger, s.split, s.sink)
}()
// pass received blocks into the GRPC channel
- if err := s.writeDataToStream(readErrChan); err != nil {
+ if err := s.writeDataToStream(); err != nil {
return s.totalBytesSent, fmt.Errorf("write data to stream: %w", err)
}
@@ -122,24 +100,19 @@ func NewStreamer(
stream api_service.Connector_ReadSplitsServer,
request *api_service_protos.TReadSplitsRequest,
split *api_service_protos.TSplit,
- pagingWriterFactory paging.WriterFactory,
+ sink paging.Sink,
handler rdbms.Handler,
-) (*Streamer, error) {
- pagingWriter, err := pagingWriterFactory.MakeWriter(
- stream.Context(),
- logger,
- request.GetPagination(),
- )
- if err != nil {
- return nil, fmt.Errorf("new paging writer: %w", err)
- }
+) *Streamer {
+ ctx, cancel := context.WithCancel(stream.Context())
return &Streamer{
- logger: logger,
- stream: stream,
- split: split,
- request: request,
- handler: handler,
- pagingWriter: pagingWriter,
- }, nil
+ logger: logger,
+ stream: stream,
+ split: split,
+ request: request,
+ handler: handler,
+ sink: sink,
+ ctx: ctx,
+ cancel: cancel,
+ }
}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/streaming/streamer_test.go b/ydb/library/yql/providers/generic/connector/app/server/streaming/streamer_test.go
index e1852190c9..51152171bd 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/streaming/streamer_test.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/streaming/streamer_test.go
@@ -1,13 +1,21 @@
package streaming
import (
+ "bytes"
"context"
"errors"
"fmt"
"testing"
+ "github.com/apache/arrow/go/v13/arrow"
+ "github.com/apache/arrow/go/v13/arrow/array"
+ "github.com/apache/arrow/go/v13/arrow/ipc"
+ "github.com/apache/arrow/go/v13/arrow/memory"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
+ "github.com/ydb-platform/ydb/library/go/core/log"
+ api_common "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/api/common"
+ "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/clickhouse"
"github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/paging"
"github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/rdbms"
"github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils"
@@ -20,7 +28,7 @@ var _ api_service.Connector_ReadSplitsServer = (*streamMock)(nil)
type streamMock struct {
mock.Mock
api_service.Connector_ReadSplitsServer
- sendTriggeredChan chan struct{}
+ logger log.Logger
}
func (m *streamMock) Context() context.Context {
@@ -32,228 +40,268 @@ func (m *streamMock) Context() context.Context {
func (m *streamMock) Send(response *api_service_protos.TReadSplitsResponse) error {
args := m.Called(response)
- if m.sendTriggeredChan != nil {
- close(m.sendTriggeredChan)
- }
-
return args.Error(0)
}
-func TestStreaming(t *testing.T) {
- logger := utils.NewTestLogger(t)
+func (m *streamMock) makeSendMatcher(
+ t *testing.T,
+ split *api_service_protos.TSplit,
+ expectedColumnarBlock [][]any,
+ expectedRowCount int,
+) func(response *api_service_protos.TReadSplitsResponse) bool {
+ return func(response *api_service_protos.TReadSplitsResponse) bool {
+ buf := bytes.NewBuffer(response.GetArrowIpcStreaming())
- request := &api_service_protos.TReadSplitsRequest{}
- split := &api_service_protos.TSplit{}
+ reader, err := ipc.NewReader(buf)
+ require.NoError(t, err)
- t.Run("positive", func(t *testing.T) {
- ctx := context.Background()
- stream := &streamMock{}
- stream.On("Context").Return(ctx)
+ for reader.Next() {
+ record := reader.Record()
- writer := &paging.WriterMock{
- ColumnarBufferChan: make(chan paging.ColumnarBuffer),
- }
+ require.Equal(t, len(split.Select.What.Items), len(record.Columns()))
- writerFactory := &paging.WriterFactoryMock{}
- writerFactory.On("MakeWriter", request.GetPagination()).Return(writer, nil)
+ if record.NumRows() != int64(expectedRowCount) {
+ return false
+ }
- handler := &rdbms.HandlerMock{
- ReadFinished: make(chan struct{}),
- }
+ col0 := record.Column(0).(*array.Int32)
+ require.Equal(t, &arrow.Int32Type{}, col0.DataType())
- // populate channel with predefined data
- const (
- pageSize = 1 << 10
- totalPages = 3
- )
+ for i := 0; i < len(expectedColumnarBlock[0]); i++ {
+ if expectedColumnarBlock[0][i] != col0.Value(i) {
+ return false
+ }
+ }
- preparedColumnarBuffers := []paging.ColumnarBuffer{}
+ // FIXME: YQ-2590: String -> Binary
+ col1 := record.Column(1).(*array.String)
+ require.Equal(t, &arrow.StringType{}, col1.DataType())
- for i := 0; i < totalPages; i++ {
- cb := &paging.ColumnarBufferMock{}
- response := &api_service_protos.TReadSplitsResponse{
- Payload: &api_service_protos.TReadSplitsResponse_ArrowIpcStreaming{
- ArrowIpcStreaming: make([]byte, pageSize),
- },
+ for i := 0; i < len(expectedColumnarBlock[1]); i++ {
+ if expectedColumnarBlock[1][i].(string) != col1.Value(i) {
+ return false
+ }
}
- cb.On("ToResponse").Return(response, nil).Once()
- stream.On("Send", response).Return(nil).Once()
- cb.On("Release").Return().Once()
-
- preparedColumnarBuffers = append(preparedColumnarBuffers, cb)
}
- go func() {
- // inject buffers into queue
- for _, cb := range preparedColumnarBuffers {
- writer.ColumnarBufferChan <- cb
- }
+ reader.Release()
- // let handler return
- close(handler.ReadFinished)
- }()
+ return true
+ }
+}
- // read is succesfull
- readFinished := handler.
- On("ReadSplit", request.GetDataSourceInstance(), split, writer).
- Return(nil).Once()
+type testCaseStreaming struct {
+ src [][]any
+ rowsPerBlock int
+ bufferQueueCapacity int
+ scanErr error
+ sendErr error
+}
- writer.On("Finish").Return(nil).NotBefore(readFinished).Once()
+func (tc testCaseStreaming) name() string {
+ return fmt.Sprintf(
+ "totalRows_%d_rowsPerBlock_%d_bufferQueueCapacity_%d",
+ len(tc.src), tc.rowsPerBlock, tc.bufferQueueCapacity)
+}
- streamer, err := NewStreamer(
- logger,
- stream,
- request,
- split,
- writerFactory,
- handler,
- )
+func (tc testCaseStreaming) messageParams() (sentMessages, rowsInLastMessage int) {
+ modulo := len(tc.src) % tc.rowsPerBlock
- require.NoError(t, err)
- require.NotNil(t, streamer)
+ if modulo == 0 {
+ sentMessages = len(tc.src) / tc.rowsPerBlock
+ rowsInLastMessage = tc.rowsPerBlock
+ } else {
+ sentMessages = len(tc.src)/tc.rowsPerBlock + 1
+ rowsInLastMessage = modulo
+ }
- totalBytes, err := streamer.Run()
- require.NoError(t, err)
- require.Equal(t, totalBytes, uint64(pageSize*totalPages))
+ if tc.scanErr != nil {
+ sentMessages--
+ rowsInLastMessage = tc.rowsPerBlock
+ }
- mocks := []interface{}{stream, writer, writerFactory, handler}
- for _, cb := range preparedColumnarBuffers {
- mocks = append(mocks, cb)
- }
+ return
+}
- mock.AssertExpectationsForObjects(t, mocks...)
- })
+func (tc testCaseStreaming) execute(t *testing.T) {
+ logger := utils.NewTestLogger(t)
+ request := &api_service_protos.TReadSplitsRequest{}
+ split := utils.MakeTestSplit()
- t.Run("handler read splits error", func(t *testing.T) {
- ctx := context.Background()
- stream := &streamMock{}
- stream.On("Context").Return(ctx)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
- writer := &paging.WriterMock{
- ColumnarBufferChan: make(chan paging.ColumnarBuffer),
- }
+ stream := &streamMock{logger: logger}
- writerFactory := &paging.WriterFactoryMock{}
- writerFactory.On("MakeWriter", request.GetPagination()).Return(writer, nil)
+ stream.On("Context").Return(ctx)
- handler := &rdbms.HandlerMock{
- ReadFinished: make(chan struct{}),
- }
+ connection := &utils.ConnectionMock{}
+
+ connectionManager := &utils.ConnectionManagerMock{}
+ connectionManager.On("Make", split.Select.DataSourceInstance).Return(connection, nil).Once()
+ connectionManager.On("Release", connection).Return().Once()
- // populate channel with predefined data
- const pageSize = 1 << 10
+ rows := &utils.RowsMock{
+ PredefinedData: tc.src,
+ }
+ connection.On("Query", "SELECT col0, col1 FROM example_1").Return(rows, nil).Once()
+
+ // FIXME: YQ-2590: string -> []byte
+ col0Acceptor := new(*int32)
+ *col0Acceptor = new(int32)
+ col1Acceptor := new(*string)
+ *col1Acceptor = new(string)
+
+ acceptors := []any{col0Acceptor, col1Acceptor}
+
+ if tc.scanErr == nil {
+ rows.On("MakeAcceptors").Return(acceptors, nil).Once()
+ rows.On("Next").Return(true).Times(len(rows.PredefinedData))
+ rows.On("Next").Return(false).Once()
+ rows.On("Scan", acceptors...).Return(nil).Times(len(rows.PredefinedData))
+ rows.On("Err").Return(nil).Once()
+ rows.On("Close").Return(nil).Once()
+ } else {
+ rows.On("MakeAcceptors").Return(acceptors, nil).Once()
+ rows.On("Next").Return(true).Times(len(rows.PredefinedData) + 1)
+ rows.On("Scan", acceptors...).Return(nil).Times(len(rows.PredefinedData) - 1)
+ // instead of the last message, an error occurs
+ rows.On("Scan", acceptors...).Return(tc.scanErr).Once()
+ rows.On("Err").Return(nil).Once()
+ rows.On("Close").Return(nil).Once()
+ }
+
+ totalMessages, rowsInLastMessage := tc.messageParams()
+
+ expectedColumnarBlocks := utils.DataConverter{}.RowsToColumnBlocks(rows.PredefinedData, tc.rowsPerBlock)
+
+ if tc.sendErr == nil {
+ for sendCallID := 0; sendCallID < totalMessages; sendCallID++ {
+ expectedColumnarBlock := expectedColumnarBlocks[sendCallID]
+
+ rowsInMessage := tc.rowsPerBlock
+ if sendCallID == totalMessages-1 {
+ rowsInMessage = rowsInLastMessage
+ }
+
+ matcher := stream.makeSendMatcher(t, split, expectedColumnarBlock, rowsInMessage)
- cb := &paging.ColumnarBufferMock{}
- response1 := &api_service_protos.TReadSplitsResponse{
- Payload: &api_service_protos.TReadSplitsResponse_ArrowIpcStreaming{
- ArrowIpcStreaming: make([]byte, pageSize),
- },
+ stream.On("Send", mock.MatchedBy(matcher)).Return(nil).Once()
}
+ } else {
+ // the first attempt to send a response fails
+ stream.On("Send", mock.MatchedBy(func(response *api_service_protos.TReadSplitsResponse) bool {
+ cancel() // emulate real behavior of GRPC
- cb.On("ToResponse").Return(response1, nil).Once()
- stream.On("Send", response1).Return(nil).Once()
- cb.On("Release").Return().NotBefore().Once()
+ return true
+ })).Return(tc.sendErr).Once()
+ }
+
+ typeMapper := clickhouse.NewTypeMapper()
+
+ handlerFactory := &rdbms.HandlerFactoryMock{
+ ConnectionManager: connectionManager,
+ TypeMapper: typeMapper,
+ }
- go func() {
- // after first received block an error occurs
- writer.ColumnarBufferChan <- cb
+ handler, err := handlerFactory.Make(logger, api_common.EDataSourceKind_CLICKHOUSE)
+ require.NoError(t, err)
- close(handler.ReadFinished)
- }()
+ cbf, err := paging.NewColumnarBufferFactory(
+ logger,
+ memory.NewGoAllocator(),
+ paging.NewReadLimiterFactory(nil),
+ api_service_protos.TReadSplitsRequest_ARROW_IPC_STREAMING,
+ split.Select.What,
+ typeMapper)
+ require.NoError(t, err)
- // reading from data source returned an error
- readErr := fmt.Errorf("failed to read from data source")
- handler.
- On("ReadSplit", request.GetDataSourceInstance(), split, writer).
- Return(readErr).Once()
+ sink, err := paging.NewSinkFactory(cbf, tc.bufferQueueCapacity, tc.rowsPerBlock).MakeSink(ctx, logger, nil)
+ require.NoError(t, err)
- streamer, err := NewStreamer(
- logger,
- stream,
- request,
- split,
- writerFactory,
- handler,
- )
+ streamer := NewStreamer(logger, stream, request, split, sink, handler)
+ _, err = streamer.Run()
+
+ switch {
+ case tc.scanErr != nil:
+ require.True(t, errors.Is(err, tc.scanErr))
+ case tc.sendErr != nil:
+ require.True(t, errors.Is(err, tc.sendErr))
+ default:
require.NoError(t, err)
- require.NotNil(t, streamer)
+ }
- totalBytes, err := streamer.Run()
- require.Error(t, err)
- require.True(t, errors.Is(err, readErr))
- require.Equal(t, totalBytes, uint64(pageSize))
+ mocks := []interface{}{stream, connectionManager, connection}
- mock.AssertExpectationsForObjects(t, stream, writer, writerFactory, handler, cb)
- })
+ mock.AssertExpectationsForObjects(t, mocks...)
+}
- t.Run("grpc stream send error", func(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- stream := &streamMock{
- sendTriggeredChan: make(chan struct{}),
- }
- stream.On("Context").Return(ctx)
+func TestStreaming(t *testing.T) {
+ srcValues := [][][]any{
+ {
+ {int32(1), "a"},
+ {int32(2), "b"},
+ {int32(3), "c"},
+ {int32(4), "d"},
+ },
+ {
+ {int32(1), "a"},
+ {int32(2), "b"},
+ {int32(3), "c"},
+ {int32(4), "d"},
+ {int32(5), "e"},
+ },
+ }
+ rowsPerBlockValues := []int{2}
+ bufferQueueCapacityValues := []int{0, 1, 10}
- writer := &paging.WriterMock{
- ColumnarBufferChan: make(chan paging.ColumnarBuffer),
- }
+ var testCases []testCaseStreaming
- writerFactory := &paging.WriterFactoryMock{}
- writerFactory.On("MakeWriter", request.GetPagination()).Return(writer, nil)
+ for _, src := range srcValues {
+ for _, rowsPerBlock := range rowsPerBlockValues {
+ for _, bufferQueueCapacity := range bufferQueueCapacityValues {
+ tc := testCaseStreaming{
+ src: src,
+ rowsPerBlock: rowsPerBlock,
+ bufferQueueCapacity: bufferQueueCapacity,
+ }
- handler := &rdbms.HandlerMock{
- ReadFinished: stream.sendTriggeredChan,
+ testCases = append(testCases, tc)
+ }
}
+ }
- // populate channel with predefined data
- const pageSize = 1 << 10
-
- cb := &paging.ColumnarBufferMock{}
- response1 := &api_service_protos.TReadSplitsResponse{
- Payload: &api_service_protos.TReadSplitsResponse_ArrowIpcStreaming{
- ArrowIpcStreaming: make([]byte, pageSize),
- },
+ t.Run("positive", func(t *testing.T) {
+ for _, tc := range testCases {
+ tc := tc
+ t.Run(tc.name(), func(t *testing.T) {
+ tc.execute(t)
+ })
}
+ })
- cb.On("ToResponse").Return(response1, nil).Once()
-
- // network error occures when trying to send first page to the stream
- sendErr := fmt.Errorf("GRPC error")
- send1 := stream.On("Send", response1).Return(sendErr).Once()
- cb.On("Release").Return().Once()
-
- go func() {
- // populate incoming queue with data
- writer.ColumnarBufferChan <- cb
- }()
-
- go func() {
- // trigger context cancellation after send error occurs
- <-stream.sendTriggeredChan
- cancel()
- }()
-
- handler.
- On("ReadSplit", request.GetDataSourceInstance(), split, writer).
- Return(nil).NotBefore(send1).Once()
-
- streamer, err := NewStreamer(
- logger,
- stream,
- request,
- split,
- writerFactory,
- handler,
- )
+ t.Run("scan error", func(t *testing.T) {
+ scanErr := fmt.Errorf("scan error")
- require.NoError(t, err)
- require.NotNil(t, streamer)
+ for _, tc := range testCases {
+ tc := tc
+ tc.scanErr = scanErr
+ t.Run(tc.name(), func(t *testing.T) {
+ tc.execute(t)
+ })
+ }
+ })
- totalBytes, err := streamer.Run()
- require.Error(t, err)
- require.True(t, errors.Is(err, sendErr))
- require.Equal(t, totalBytes, uint64(0))
+ t.Run("send error", func(t *testing.T) {
+ sendErr := fmt.Errorf("stream send error")
- mock.AssertExpectationsForObjects(t, stream, writer, writerFactory, handler, cb)
+ for _, tc := range testCases {
+ tc := tc
+ tc.sendErr = sendErr
+ t.Run(tc.name(), func(t *testing.T) {
+ tc.execute(t)
+ })
+ }
})
}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/arrow_helpers.go b/ydb/library/yql/providers/generic/connector/app/server/utils/arrow_helpers.go
index 63a42cd401..ddda5f60c1 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/utils/arrow_helpers.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/utils/arrow_helpers.go
@@ -115,6 +115,10 @@ func ydbTypeToArrowBuilder(typeID Ydb.Type_PrimitiveTypeId, arrowAllocator memor
case Ydb.Type_DOUBLE:
builder = array.NewFloat64Builder(arrowAllocator)
case Ydb.Type_STRING:
+ // FIXME: YQ-2590: map it to arrow.BinaryTypes.Binary
+ // TODO: what about LargeBinary?
+ // https://arrow.apache.org/docs/cpp/api/datatype.html#_CPPv4N5arrow4Type4type12LARGE_BINARYE
+ // builder = array.NewBinaryBuilder(arrowAllocator, arrow.BinaryTypes.Binary)
builder = array.NewStringBuilder(arrowAllocator)
case Ydb.Type_UTF8:
// TODO: what about LargeString?
@@ -162,6 +166,10 @@ func ydbTypeToArrowField(typeID Ydb.Type_PrimitiveTypeId, column *Ydb.Column) (a
case Ydb.Type_DOUBLE:
field = arrow.Field{Name: column.Name, Type: arrow.PrimitiveTypes.Float64}
case Ydb.Type_STRING:
+ // FIXME: YQ-2590: map it to arrow.BinaryTypes.Binary
+ // TODO: what about LargeBinary?
+ // https://arrow.apache.org/docs/cpp/api/datatype.html#_CPPv4N5arrow4Type4type12LARGE_BINARYE
+ // field = arrow.Field{Name: column.Name, Type: arrow.BinaryTypes.Binary}
field = arrow.Field{Name: column.Name, Type: arrow.BinaryTypes.String}
case Ydb.Type_UTF8:
// TODO: what about LargeString?
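A minimal sketch of the change the FIXME (YQ-2590) points at: YQL String carries arbitrary bytes, so it would be backed by Arrow Binary rather than Arrow String. The Apache Arrow Go import paths and version below are assumptions; this is not part of the patch.

package sketch

import (
	"github.com/apache/arrow/go/v13/arrow"
	"github.com/apache/arrow/go/v13/arrow/array"
	"github.com/apache/arrow/go/v13/arrow/memory"
)

// builderForYqlString allocates a builder for raw binary data (no UTF-8 validation),
// matching YQL String semantics, as the commented-out lines above suggest.
func builderForYqlString(pool memory.Allocator) array.Builder {
	return array.NewBinaryBuilder(pool, arrow.BinaryTypes.Binary)
}

// fieldForYqlString declares the corresponding Arrow schema field.
func fieldForYqlString(name string) arrow.Field {
	return arrow.Field{Name: name, Type: arrow.BinaryTypes.Binary}
}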
diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/errors.go b/ydb/library/yql/providers/generic/connector/app/server/utils/errors.go
index 08d114ebaf..a7517a193e 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/utils/errors.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/utils/errors.go
@@ -21,6 +21,7 @@ var (
ErrUnimplementedExpression = fmt.Errorf("unimplemented expression")
ErrUnimplementedOperation = fmt.Errorf("unimplemented operation")
ErrUnimplementedPredicateType = fmt.Errorf("unimplemented predicate type")
+ ErrInvariantViolation = fmt.Errorf("implementation error (invariant violation)")
)
func NewSuccess() *api_service_protos.TError {
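A short usage sketch for the new sentinel error (the helpers below are illustrative, not part of the patch): it is wrapped with %w so callers can detect it via errors.Is, like the other sentinel errors in this file.

package utils

import (
	"errors"
	"fmt"
)

// checkRowWidth reports a broken internal invariant; illustrative only.
func checkRowWidth(got, want int) error {
	if got != want {
		return fmt.Errorf("row has %d columns, expected %d: %w", got, want, ErrInvariantViolation)
	}

	return nil
}

// isInvariantViolation shows the caller-side check.
func isInvariantViolation(err error) bool {
	return errors.Is(err, ErrInvariantViolation)
}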
diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/query_builder.go b/ydb/library/yql/providers/generic/connector/app/server/utils/query_builder.go
deleted file mode 100644
index 5e8fe2a0be..0000000000
--- a/ydb/library/yql/providers/generic/connector/app/server/utils/query_builder.go
+++ /dev/null
@@ -1,11 +0,0 @@
-package utils
-
-import (
- "context"
-
- api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
-)
-
-type QueryExecutor[CONN Connection] interface {
- DescribeTable(ctx context.Context, conn CONN, request *api_service_protos.TDescribeTableRequest) (Rows, error)
-}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/connection_manager.go b/ydb/library/yql/providers/generic/connector/app/server/utils/sql.go
index 624fe6984d..578602a650 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/utils/connection_manager.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/utils/sql.go
@@ -5,6 +5,7 @@ import (
"github.com/ydb-platform/ydb/library/go/core/log"
api_common "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/api/common"
+ api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
)
type Connection interface {
@@ -20,11 +21,15 @@ type Rows interface {
MakeAcceptors() ([]any, error)
}
-type ConnectionManager[CONN any] interface {
- Make(ctx context.Context, logger log.Logger, dataSourceInstance *api_common.TDataSourceInstance) (CONN, error)
- Release(logger log.Logger, conn CONN)
+type ConnectionManager interface {
+ Make(ctx context.Context, logger log.Logger, dataSourceInstance *api_common.TDataSourceInstance) (Connection, error)
+ Release(logger log.Logger, connection Connection)
}
type ConnectionManagerBase struct {
QueryLoggerFactory QueryLoggerFactory
}
+
+type QueryExecutor interface {
+ DescribeTable(ctx context.Context, conn Connection, request *api_service_protos.TDescribeTableRequest) (Rows, error)
+}
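A minimal sketch of how the de-genericized ConnectionManager and QueryExecutor compose, assuming a caller that already holds a TDataSourceInstance and a TDescribeTableRequest; the helper name is illustrative and not part of the patch.

package utils

import (
	"context"
	"fmt"

	"github.com/ydb-platform/ydb/library/go/core/log"
	api_common "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/api/common"
	api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
)

// describeTableSketch acquires a connection, runs DescribeTable through the
// QueryExecutor, drains the resulting Rows, and releases the connection.
func describeTableSketch(
	ctx context.Context,
	logger log.Logger,
	cm ConnectionManager,
	qe QueryExecutor,
	dsi *api_common.TDataSourceInstance,
	request *api_service_protos.TDescribeTableRequest,
) error {
	conn, err := cm.Make(ctx, logger, dsi)
	if err != nil {
		return fmt.Errorf("make connection: %w", err)
	}
	defer cm.Release(logger, conn)

	rows, err := qe.DescribeTable(ctx, conn, request)
	if err != nil {
		return fmt.Errorf("describe table: %w", err)
	}
	defer rows.Close()

	for rows.Next() {
		// consume column metadata here
	}

	return rows.Err()
}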
diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/sql_mock.go b/ydb/library/yql/providers/generic/connector/app/server/utils/sql_mock.go
new file mode 100644
index 0000000000..7d9fd27800
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/app/server/utils/sql_mock.go
@@ -0,0 +1,90 @@
+package utils
+
+import (
+ "context"
+
+ "github.com/stretchr/testify/mock"
+ "github.com/ydb-platform/ydb/library/go/core/log"
+ api_common "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/api/common"
+)
+
+var _ Connection = (*ConnectionMock)(nil)
+
+type ConnectionMock struct {
+ mock.Mock
+}
+
+func (m *ConnectionMock) Query(ctx context.Context, query string, params ...any) (Rows, error) {
+ called := []any{query}
+ called = append(called, params...)
+ args := m.Called(called...)
+
+ return args.Get(0).(Rows), args.Error(1)
+}
+
+func (m *ConnectionMock) Close() error {
+ return m.Called().Error(0)
+}
+
+type ConnectionManagerMock struct {
+ mock.Mock
+}
+
+func (m *ConnectionManagerMock) Make(
+ ctx context.Context,
+ logger log.Logger,
+ dataSourceInstance *api_common.TDataSourceInstance) (Connection, error) {
+ args := m.Called(dataSourceInstance)
+
+ return args.Get(0).(Connection), args.Error(1)
+}
+
+func (m *ConnectionManagerMock) Release(logger log.Logger, conn Connection) {
+ m.Called(conn)
+}
+
+var _ Rows = (*RowsMock)(nil)
+
+type RowsMock struct {
+ mock.Mock
+ PredefinedData [][]any
+ scanCalls int
+}
+
+func (m *RowsMock) Close() error {
+ return m.Called().Error(0)
+}
+
+func (m *RowsMock) Err() error {
+ return m.Called().Error(0)
+}
+
+func (m *RowsMock) Next() bool {
+ return m.Called().Bool(0)
+}
+
+func (m *RowsMock) Scan(dest ...any) error {
+ args := m.Called(dest...)
+
+ // mutate acceptors by reference
+ row := m.PredefinedData[m.scanCalls]
+
+ for i, d := range dest {
+ switch t := d.(type) {
+ case **int32:
+ **t = row[i].(int32)
+ case **string:
+ **t = row[i].(string)
+ }
+ }
+
+ m.scanCalls++
+
+ return args.Error(0)
+}
+
+func (m *RowsMock) MakeAcceptors() ([]any, error) {
+ args := m.Called()
+
+ return args.Get(0).([]any), args.Error(1)
+}
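A brief usage sketch of the mocks above in a unit test: RowsMock feeds PredefinedData through Scan into double-pointer acceptors, which mirrors how the paging code consumes Rows. Expectation counts below are illustrative.

package utils

import (
	"testing"

	"github.com/stretchr/testify/mock"
)

// exampleRowsMockUsage wires RowsMock to return two predefined rows; illustrative only.
func exampleRowsMockUsage(t *testing.T) {
	rows := &RowsMock{
		PredefinedData: [][]any{
			{int32(1), "a"},
			{int32(2), "b"},
		},
	}

	// Acceptors are pointers to pointers, matching the **int32 / **string cases in Scan.
	col0, col1 := new(int32), new(string)
	acceptors := []any{&col0, &col1}

	rows.On("MakeAcceptors").Return(acceptors, nil).Once()
	rows.On("Next").Return(true).Times(2)
	rows.On("Next").Return(false).Once()
	rows.On("Scan", mock.Anything, mock.Anything).Return(nil).Times(2)
	rows.On("Err").Return(nil).Once()
	rows.On("Close").Return(nil).Once()

	// ... pass rows to the code under test, then verify:
	rows.AssertExpectations(t)
}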
diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/time_test.go b/ydb/library/yql/providers/generic/connector/app/server/utils/time_test.go
new file mode 100644
index 0000000000..aac6cb8c9f
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/app/server/utils/time_test.go
@@ -0,0 +1,147 @@
+package utils
+
+import (
+ "errors"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestTimeToYDBDate(t *testing.T) {
+ type testCase struct {
+ input time.Time
+ output uint16
+ err error
+ }
+
+ tcs := []testCase{
+ {
+ input: time.Date(1970, 01, 01, 00, 00, 00, 00, time.UTC),
+ output: 0,
+ err: nil,
+ },
+ {
+ input: time.Date(1970, 01, 02, 00, 00, 00, 00, time.UTC),
+ output: 1,
+ err: nil,
+ },
+ {
+ input: time.Date(1969, 12, 31, 23, 59, 00, 00, time.UTC),
+ output: 0,
+ err: ErrValueOutOfTypeBounds,
+ },
+ {
+ input: time.Date(9999, 01, 01, 00, 00, 00, 00, time.UTC),
+ output: 0,
+ err: ErrValueOutOfTypeBounds,
+ },
+ }
+
+ for _, tc := range tcs {
+ tc := tc
+
+ t.Run(tc.input.String(), func(t *testing.T) {
+ output, err := TimeToYDBDate(&tc.input)
+ require.Equal(t, tc.output, output)
+
+ if tc.err != nil {
+ require.True(t, errors.Is(err, tc.err))
+ } else {
+ require.NoError(t, err)
+ }
+ })
+ }
+}
+
+func TestTimeToYDBDatetime(t *testing.T) {
+ type testCase struct {
+ input time.Time
+ output uint32
+ err error
+ }
+
+ tcs := []testCase{
+ {
+ input: time.Date(1970, 01, 01, 00, 00, 00, 00, time.UTC),
+ output: 0,
+ err: nil,
+ },
+ {
+ input: time.Date(1970, 01, 02, 00, 00, 00, 00, time.UTC),
+ output: 86400,
+ err: nil,
+ },
+ {
+ input: time.Date(1969, 12, 31, 23, 59, 00, 00, time.UTC),
+ output: 0,
+ err: ErrValueOutOfTypeBounds,
+ },
+ {
+ input: time.Date(9999, 01, 01, 00, 00, 00, 00, time.UTC),
+ output: 0,
+ err: ErrValueOutOfTypeBounds,
+ },
+ }
+
+ for _, tc := range tcs {
+ tc := tc
+
+ t.Run(tc.input.String(), func(t *testing.T) {
+ output, err := TimeToYDBDatetime(&tc.input)
+ require.Equal(t, tc.output, output)
+
+ if tc.err != nil {
+ require.True(t, errors.Is(err, tc.err))
+ } else {
+ require.NoError(t, err)
+ }
+ })
+ }
+}
+
+func TestTimeToYDBTimestamp(t *testing.T) {
+ type testCase struct {
+ input time.Time
+ output uint64
+ err error
+ }
+
+ tcs := []testCase{
+ {
+ input: time.Date(1970, 01, 01, 00, 00, 00, 00, time.UTC),
+ output: 0,
+ err: nil,
+ },
+ {
+ input: time.Date(1970, 01, 02, 00, 00, 00, 00, time.UTC),
+ output: 86400000000,
+ err: nil,
+ },
+ {
+ input: time.Date(1969, 12, 31, 23, 59, 00, 00, time.UTC),
+ output: 0,
+ err: ErrValueOutOfTypeBounds,
+ },
+ {
+ input: time.Date(29427, 01, 01, 00, 00, 00, 00, time.UTC),
+ output: 0,
+ err: ErrValueOutOfTypeBounds,
+ },
+ }
+
+ for _, tc := range tcs {
+ tc := tc
+
+ t.Run(tc.input.String(), func(t *testing.T) {
+ output, err := TimeToYDBTimestamp(&tc.input)
+ require.Equal(t, tc.output, output)
+
+ if tc.err != nil {
+ require.True(t, errors.Is(err, tc.err))
+ } else {
+ require.NoError(t, err)
+ }
+ })
+ }
+}
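For reference, the arithmetic these tests pin down, as a hedged sketch: Date counts whole days since the Unix epoch (1970-01-02 maps to 1), Datetime counts seconds (86400), Timestamp counts microseconds (86400000000). The real implementations and exact upper bounds live in time.go; the MaxUint16 limit below is only a placeholder.

package utils

import (
	"fmt"
	"math"
	"time"
)

// timeToYDBDateSketch converts a time to days since the Unix epoch; illustrative only.
func timeToYDBDateSketch(t *time.Time) (uint16, error) {
	secs := t.Unix()
	if secs < 0 {
		return 0, fmt.Errorf("convert time to YDB Date: %w", ErrValueOutOfTypeBounds)
	}

	days := secs / 86400
	if days > math.MaxUint16 { // placeholder upper bound, not YDB's actual Date range
		return 0, fmt.Errorf("convert time to YDB Date: %w", ErrValueOutOfTypeBounds)
	}

	return uint16(days), nil
}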
diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/type_mapper.go b/ydb/library/yql/providers/generic/connector/app/server/utils/type_mapper.go
index 72b963b2d6..fd9cd88942 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/utils/type_mapper.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/utils/type_mapper.go
@@ -8,6 +8,7 @@ import (
type TypeMapper interface {
SQLTypeToYDBColumn(columnName, typeName string, rules *api_service_protos.TTypeMappingSettings) (*Ydb.Column, error)
+
AddRowToArrowIPCStreaming(
ydbTypes []*Ydb.Type,
acceptors []any,
diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/unit_test_helpers.go b/ydb/library/yql/providers/generic/connector/app/server/utils/unit_test_helpers.go
new file mode 100644
index 0000000000..ef47473088
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/app/server/utils/unit_test_helpers.go
@@ -0,0 +1,75 @@
+package utils
+
+import (
+ "github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
+ api_common "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/api/common"
+ api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
+)
+
+func MakeTestSplit() *api_service_protos.TSplit {
+ return &api_service_protos.TSplit{
+ Select: &api_service_protos.TSelect{
+ DataSourceInstance: &api_common.TDataSourceInstance{},
+ What: &api_service_protos.TSelect_TWhat{
+ Items: []*api_service_protos.TSelect_TWhat_TItem{
+ {
+ Payload: &api_service_protos.TSelect_TWhat_TItem_Column{
+ Column: &Ydb.Column{
+ Name: "col0",
+ Type: &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_INT32}},
+ },
+ },
+ },
+ {
+ Payload: &api_service_protos.TSelect_TWhat_TItem_Column{
+ Column: &Ydb.Column{
+ Name: "col1",
+ Type: &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_STRING}},
+ },
+ },
+ },
+ },
+ },
+ From: &api_service_protos.TSelect_TFrom{
+ Table: "example_1",
+ },
+ },
+ }
+}
+
+// DataConverter should be used only from unit tests
+type DataConverter struct{}
+
+func (dc DataConverter) RowsToColumnBlocks(input [][]any, rowsPerBlock int) [][][]any {
+ var (
+ totalColumns = len(input[0])
+ results [][][]any
+ )
+
+ for i := 0; i < len(input); i += rowsPerBlock {
+ start := i
+
+ end := start + rowsPerBlock
+ if end > len(input) {
+ end = len(input)
+ }
+
+ result := dc.rowGroupToColumnBlock(input, totalColumns, start, end)
+
+ results = append(results, result)
+ }
+
+ return results
+}
+
+func (dc DataConverter) rowGroupToColumnBlock(input [][]any, totalColumns, start, end int) [][]any {
+ columnarData := make([][]any, totalColumns)
+
+ for columnID := range columnarData {
+ for rowID := range input[start:end] {
+ columnarData[columnID] = append(columnarData[columnID], input[rowID+start][columnID])
+ }
+ }
+
+ return columnarData
+}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/unit_test_helpers_test.go b/ydb/library/yql/providers/generic/connector/app/server/utils/unit_test_helpers_test.go
new file mode 100644
index 0000000000..f08aaff5fd
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/app/server/utils/unit_test_helpers_test.go
@@ -0,0 +1,72 @@
+package utils
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestDataConverter(t *testing.T) {
+ dc := DataConverter{}
+
+ type testCase struct {
+ src [][]any
+ dst [][][]any
+ rowsPerBlock int
+ }
+
+ testCases := []testCase{
+ {
+ src: [][]any{
+ {int32(1), "a"},
+ {int32(2), "b"},
+ {int32(3), "c"},
+ {int32(4), "d"},
+ },
+ dst: [][][]any{
+ {
+ {int32(1), int32(2)},
+ {"a", "b"},
+ },
+ {
+ {int32(3), int32(4)},
+ {"c", "d"},
+ },
+ },
+ rowsPerBlock: 2,
+ },
+ {
+ src: [][]any{
+ {int32(1), "a"},
+ {int32(2), "b"},
+ {int32(3), "c"},
+ {int32(4), "d"},
+ {int32(5), "e"},
+ },
+ dst: [][][]any{
+ {
+ {int32(1), int32(2)},
+ {"a", "b"},
+ },
+ {
+ {int32(3), int32(4)},
+ {"c", "d"},
+ },
+ {
+ {int32(5)},
+ {"e"},
+ },
+ },
+ rowsPerBlock: 2,
+ },
+ }
+
+ for _, tc := range testCases {
+ tc := tc
+ t.Run(fmt.Sprintf("rowsPerRecord_%d", tc.rowsPerBlock), func(t *testing.T) {
+ actual := dc.RowsToColumnBlocks(tc.src, tc.rowsPerBlock)
+ require.Equal(t, tc.dst, actual)
+ })
+ }
+}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/ut/ya.make b/ydb/library/yql/providers/generic/connector/app/server/utils/ut/ya.make
new file mode 100644
index 0000000000..227f12138e
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/app/server/utils/ut/ya.make
@@ -0,0 +1,5 @@
+GO_TEST_FOR(ydb/library/yql/providers/generic/connector/app/server/utils)
+
+SIZE(SMALL)
+
+END()
diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/ya.make b/ydb/library/yql/providers/generic/connector/app/server/utils/ya.make
index 5205e5a2d0..ccf51cffe0 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/utils/ya.make
+++ b/ydb/library/yql/providers/generic/connector/app/server/utils/ya.make
@@ -2,21 +2,27 @@ GO_LIBRARY()
SRCS(
arrow_helpers.go
- connection_manager.go
converters.go
doc.go
endpoint.go
errors.go
logger.go
protobuf.go
- query_builder.go
select_helpers.go
+ sql.go
+ sql_mock.go
time.go
type_mapper.go
+ unit_test_helpers.go
)
GO_TEST_SRCS(
time_test.go
+ unit_test_helpers_test.go
)
END()
+
+RECURSE_FOR_TESTS(
+ ut
+)