diff options
author | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-13 10:20:39 +0300 |
---|---|---|
committer | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-13 10:38:14 +0300 |
commit | 5315659cfcfc83a2997433487f8373a69f5b38a9 (patch) | |
tree | 59e4a53e9b3c4a6e0bc9fca191b51c9d64c48cb9 | |
parent | b7579a2e21c2048763703213e852f6b76ad31cfa (diff) | |
download | ydb-5315659cfcfc83a2997433487f8373a69f5b38a9.tar.gz |
YQ Connector: fix deadlock in ReadSplits
1. Поправлен механизм перекачки данных из соединения с СУБД в GRPC-стрим (устранён дедлок, асинхронная логика упростилась)
2. Упрощён overengineered код (перемудрил раньше с Go-дженериками)
3. Написаны сквозные юнит-тесты, покрывающие всю логику перекладки данных из базы в GRPC-стрим.
4. Выявлено нарушение соглашения о типах (бинарные строки мапились не в YQL String, а в YQL UTF8)
32 files changed, 1071 insertions, 490 deletions
diff --git a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/connection_manager.go b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/connection_manager.go index 0b9d9e95af..4aa52b99fa 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/connection_manager.go +++ b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/connection_manager.go @@ -59,7 +59,7 @@ func (c Connection) Query(ctx context.Context, query string, args ...any) (utils return rows{Rows: out}, nil } -var _ utils.ConnectionManager[*Connection] = (*connectionManager)(nil) +var _ utils.ConnectionManager = (*connectionManager)(nil) type connectionManager struct { utils.ConnectionManagerBase @@ -70,7 +70,7 @@ func (c *connectionManager) Make( ctx context.Context, logger log.Logger, dsi *api_common.TDataSourceInstance, -) (*Connection, error) { +) (utils.Connection, error) { if dsi.GetCredentials().GetBasic() == nil { return nil, fmt.Errorf("currently only basic auth is supported") } @@ -129,10 +129,10 @@ func (c *connectionManager) Make( return &Connection{DB: conn, logger: queryLogger}, nil } -func (c *connectionManager) Release(logger log.Logger, conn *Connection) { +func (c *connectionManager) Release(logger log.Logger, conn utils.Connection) { utils.LogCloserError(logger, conn, "close clickhouse connection") } -func NewConnectionManager(cfg utils.ConnectionManagerBase) utils.ConnectionManager[*Connection] { +func NewConnectionManager(cfg utils.ConnectionManagerBase) utils.ConnectionManager { return &connectionManager{ConnectionManagerBase: cfg} } diff --git a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_executor.go b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_executor.go index 2f9071bc65..a3b469d58c 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_executor.go +++ 
b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_executor.go @@ -11,7 +11,7 @@ import ( type queryExecutor struct { } -func (qm queryExecutor) DescribeTable(ctx context.Context, conn *Connection, request *api_service_protos.TDescribeTableRequest) (utils.Rows, error) { +func (qm queryExecutor) DescribeTable(ctx context.Context, conn utils.Connection, request *api_service_protos.TDescribeTableRequest) (utils.Rows, error) { out, err := conn.Query( ctx, "SELECT name, type FROM system.columns WHERE table = ? and database = ?", @@ -30,6 +30,6 @@ func (qm queryExecutor) DescribeTable(ctx context.Context, conn *Connection, req return out, nil } -func NewQueryExecutor() utils.QueryExecutor[*Connection] { +func NewQueryExecutor() utils.QueryExecutor { return queryExecutor{} } diff --git a/ydb/library/yql/providers/generic/connector/app/server/config.go b/ydb/library/yql/providers/generic/connector/app/server/config.go index 15c7998b98..8a3eca0696 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/config.go +++ b/ydb/library/yql/providers/generic/connector/app/server/config.go @@ -2,7 +2,6 @@ package server import ( "fmt" - "io/ioutil" "math" "os" @@ -123,14 +122,19 @@ func fileMustExist(path string) error { } func newConfigFromPath(configPath string) (*config.TServerConfig, error) { - data, err := ioutil.ReadFile(configPath) + data, err := os.ReadFile(configPath) if err != nil { return nil, fmt.Errorf("read file %v: %w", configPath, err) } var cfg config.TServerConfig - if err := prototext.Unmarshal(data, &cfg); err != nil { + unmarshaller := prototext.UnmarshalOptions{ + // Do not emit an error if config contains outdated or too fresh fields + DiscardUnknown: true, + } + + if err := unmarshaller.Unmarshal(data, &cfg); err != nil { return nil, fmt.Errorf("prototext unmarshal `%v`: %w", string(data), err) } diff --git a/ydb/library/yql/providers/generic/connector/app/server/paging/columnar_buffer_arrow_ipc_streaming.go 
b/ydb/library/yql/providers/generic/connector/app/server/paging/columnar_buffer_arrow_ipc_streaming.go index e0106a014d..76ec036ab3 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/paging/columnar_buffer_arrow_ipc_streaming.go +++ b/ydb/library/yql/providers/generic/connector/app/server/paging/columnar_buffer_arrow_ipc_streaming.go @@ -86,13 +86,15 @@ func (cb *columnarBufferArrowIPCStreaming) Release() { } } +func (cb *columnarBufferArrowIPCStreaming) TotalRows() int { return cb.builders[0].Len() } + // special implementation for buffer that writes schema with empty columns set type columnarBufferArrowIPCStreamingEmptyColumns struct { arrowAllocator memory.Allocator readLimiter ReadLimiter schema *arrow.Schema typeMapper utils.TypeMapper - rowsAdded int64 + rowsAdded int } // AddRow saves a row obtained from the datasource into the buffer @@ -114,7 +116,7 @@ func (cb *columnarBufferArrowIPCStreamingEmptyColumns) AddRow(acceptors []any) e func (cb *columnarBufferArrowIPCStreamingEmptyColumns) ToResponse() (*api_service_protos.TReadSplitsResponse, error) { columns := make([]arrow.Array, 0) - record := array.NewRecord(cb.schema, columns, cb.rowsAdded) + record := array.NewRecord(cb.schema, columns, int64(cb.rowsAdded)) // prepare arrow writer var buf bytes.Buffer @@ -138,6 +140,8 @@ func (cb *columnarBufferArrowIPCStreamingEmptyColumns) ToResponse() (*api_servic return out, nil } +func (cb *columnarBufferArrowIPCStreamingEmptyColumns) TotalRows() int { return int(cb.rowsAdded) } + // Frees resources if buffer is no longer used func (cb *columnarBufferArrowIPCStreamingEmptyColumns) Release() { } diff --git a/ydb/library/yql/providers/generic/connector/app/server/paging/interface.go b/ydb/library/yql/providers/generic/connector/app/server/paging/interface.go index 20762478a3..0ae70c3b60 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/paging/interface.go +++ 
b/ydb/library/yql/providers/generic/connector/app/server/paging/interface.go @@ -7,29 +7,45 @@ import ( api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos" ) -type Writer interface { - AddRow(acceptors []any) error - Finish() error - BufferQueue() <-chan ColumnarBuffer -} - -type WriterFactory interface { - MakeWriter( - ctx context.Context, - logger log.Logger, - pagination *api_service_protos.TPagination, - ) (Writer, error) -} - type ColumnarBuffer interface { // AddRow saves a row obtained from the datasource into the buffer AddRow(acceptors []any) error // ToResponse returns all the accumulated data and clears buffer ToResponse() (*api_service_protos.TReadSplitsResponse, error) - // Frees resources if buffer is no longer used + // Release frees resources if buffer is no longer used Release() + // TotalRows return the number of rows accumulated + TotalRows() int } type ColumnarBufferFactory interface { MakeBuffer() (ColumnarBuffer, error) } + +// ReadResult is an algebraic data type containing: +// 1. a buffer (e. g. page) packed with data +// 2. result of read operation (potentially with error) +type ReadResult struct { + ColumnarBuffer ColumnarBuffer + Error error +} + +// Sink is a destination for a data stream that is read out of an external data source. +type Sink interface { + // AddRow saves the row obtained from a stream incoming from an external data source. + AddRow(acceptors []any) error + // AddError propagates an error occured during the reading from the external data source. + AddError(err error) + // Finish reports the successful completion of reading the data stream. 
+ Finish() + // ResultQueue returns a channel with results + ResultQueue() <-chan *ReadResult +} + +type SinkFactory interface { + MakeSink( + ctx context.Context, + logger log.Logger, + pagination *api_service_protos.TPagination, + ) (Sink, error) +} diff --git a/ydb/library/yql/providers/generic/connector/app/server/paging/mock.go b/ydb/library/yql/providers/generic/connector/app/server/paging/mock.go index c9642611f4..95325f4f53 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/paging/mock.go +++ b/ydb/library/yql/providers/generic/connector/app/server/paging/mock.go @@ -8,43 +8,44 @@ import ( api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos" ) -var _ Writer = (*WriterMock)(nil) +var _ Sink = (*SinkMock)(nil) -type WriterMock struct { - ColumnarBufferChan chan ColumnarBuffer +type SinkMock struct { mock.Mock } -func (m *WriterMock) AddRow(acceptors []any) error { - panic("not implemented") // TODO: Implement -} +func (m *SinkMock) AddRow(acceptors []any) error { + args := m.Called(acceptors...) 
-func (m *WriterMock) Finish() error { - args := m.Called() + return args.Error(0) +} - close(m.ColumnarBufferChan) +func (m *SinkMock) AddError(err error) { + m.Called(err) +} - return args.Error(0) +func (m *SinkMock) Finish() { + m.Called() } -func (m *WriterMock) BufferQueue() <-chan ColumnarBuffer { - return m.ColumnarBufferChan +func (m *SinkMock) ResultQueue() <-chan *ReadResult { + return m.Called().Get(0).(chan *ReadResult) } -var _ WriterFactory = (*WriterFactoryMock)(nil) +var _ SinkFactory = (*SinkFactoryMock)(nil) -type WriterFactoryMock struct { +type SinkFactoryMock struct { mock.Mock } -func (m *WriterFactoryMock) MakeWriter( +func (m *SinkFactoryMock) MakeSink( ctx context.Context, logger log.Logger, pagination *api_service_protos.TPagination, -) (Writer, error) { +) (Sink, error) { args := m.Called(pagination) - return args.Get(0).(Writer), args.Error(1) + return args.Get(0).(Sink), args.Error(1) } var _ ColumnarBuffer = (*ColumnarBufferMock)(nil) @@ -66,3 +67,7 @@ func (m *ColumnarBufferMock) ToResponse() (*api_service_protos.TReadSplitsRespon func (m *ColumnarBufferMock) Release() { m.Called() } + +func (m *ColumnarBufferMock) TotalRows() int { + panic("not implemented") // TODO: Implement +} diff --git a/ydb/library/yql/providers/generic/connector/app/server/paging/sink.go b/ydb/library/yql/providers/generic/connector/app/server/paging/sink.go new file mode 100644 index 0000000000..d616b30088 --- /dev/null +++ b/ydb/library/yql/providers/generic/connector/app/server/paging/sink.go @@ -0,0 +1,126 @@ +//go:generate stringer -type=sinkState -output=sink_string.go +package paging + +import ( + "context" + "fmt" + + "github.com/ydb-platform/ydb/library/go/core/log" + "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils" + api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos" +) + +type sinkState int8 + +const ( + operational sinkState = iota + 1 + failed 
+ finished +) + +var _ Sink = (*sinkImpl)(nil) + +type sinkImpl struct { + currBuffer ColumnarBuffer // accumulates incoming rows + resultQueue chan *ReadResult // outgoing buffer queue + bufferFactory ColumnarBufferFactory // creates new buffer + pagination *api_service_protos.TPagination // settings + rowsReceived uint64 // simple stats + rowsPerBuffer uint64 // TODO: use cfg + logger log.Logger // annotated logger + state sinkState // flag showing if it's ready to return data + ctx context.Context // client context +} + +func (s *sinkImpl) AddRow(acceptors []any) error { + if s.state != operational { + panic(s.unexpectedState(operational)) + } + + if err := s.currBuffer.AddRow(acceptors); err != nil { + return fmt.Errorf("acceptors to row set: %w", err) + } + + s.rowsReceived++ + + if s.isEnough() { + if err := s.flush(true); err != nil { + return fmt.Errorf("flush: %w", err) + } + } + + return nil +} + +func (s *sinkImpl) AddError(err error) { + if s.state != operational { + panic(s.unexpectedState(operational)) + } + + s.respondWith(nil, err) + + s.state = failed +} + +func (s *sinkImpl) isEnough() bool { + // TODO: implement pagination logic, check limits provided by client or config + return s.rowsReceived%s.rowsPerBuffer == 0 +} + +func (s *sinkImpl) flush(makeNewBuffer bool) error { + if s.currBuffer.TotalRows() == 0 { + return nil + } + + s.respondWith(s.currBuffer, nil) + + s.currBuffer = nil + + if makeNewBuffer { + var err error + + s.currBuffer, err = s.bufferFactory.MakeBuffer() + if err != nil { + return fmt.Errorf("make buffer: %w", err) + } + } + + return nil +} + +func (s *sinkImpl) Finish() { + if s.state != operational && s.state != failed { + panic(s.unexpectedState(operational, failed)) + } + + // if there is some data left, send it to the channel + if s.state == operational { + err := s.flush(false) + if err != nil { + s.respondWith(nil, fmt.Errorf("flush: %w", err)) + s.state = failed + } else { + s.state = finished + } + } + + // notify 
reader about the end of data + close(s.resultQueue) +} + +func (s *sinkImpl) ResultQueue() <-chan *ReadResult { + return s.resultQueue +} + +func (s *sinkImpl) respondWith(buf ColumnarBuffer, err error) { + select { + case s.resultQueue <- &ReadResult{ColumnarBuffer: buf, Error: err}: + case <-s.ctx.Done(): + } +} + +func (s *sinkImpl) unexpectedState(expected ...sinkState) error { + return fmt.Errorf( + "unexpected state '%v' (expected are '%v'): %w", + s.state, expected, utils.ErrInvariantViolation) +} diff --git a/ydb/library/yql/providers/generic/connector/app/server/paging/sink_factory.go b/ydb/library/yql/providers/generic/connector/app/server/paging/sink_factory.go new file mode 100644 index 0000000000..c2c482d820 --- /dev/null +++ b/ydb/library/yql/providers/generic/connector/app/server/paging/sink_factory.go @@ -0,0 +1,53 @@ +package paging + +import ( + "context" + "fmt" + + "github.com/ydb-platform/ydb/library/go/core/log" + api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos" +) + +type sinkFactoryImpl struct { + columnarBufferFactory ColumnarBufferFactory + resultQueueCapacity int // TODO: take from config + rowsPerBuffer int // TODO: take from config +} + +func (sf *sinkFactoryImpl) MakeSink( + ctx context.Context, + logger log.Logger, + pagination *api_service_protos.TPagination, +) (Sink, error) { + if pagination != nil { + return nil, fmt.Errorf("pagination settings are not supported yet") + } + + buffer, err := sf.columnarBufferFactory.MakeBuffer() + if err != nil { + return nil, fmt.Errorf("wrap buffer: %w", err) + } + + return &sinkImpl{ + bufferFactory: sf.columnarBufferFactory, + resultQueue: make(chan *ReadResult, sf.resultQueueCapacity), + rowsPerBuffer: uint64(sf.rowsPerBuffer), + currBuffer: buffer, + logger: logger, + pagination: pagination, + state: operational, + ctx: ctx, + }, nil +} + +func NewSinkFactory( + cbf ColumnarBufferFactory, + resultQueueCapacity int, + 
rowsPerBuffer int, +) SinkFactory { + return &sinkFactoryImpl{ + columnarBufferFactory: cbf, + resultQueueCapacity: resultQueueCapacity, + rowsPerBuffer: rowsPerBuffer, + } +} diff --git a/ydb/library/yql/providers/generic/connector/app/server/paging/sink_string.go b/ydb/library/yql/providers/generic/connector/app/server/paging/sink_string.go new file mode 100644 index 0000000000..bea6becd67 --- /dev/null +++ b/ydb/library/yql/providers/generic/connector/app/server/paging/sink_string.go @@ -0,0 +1,26 @@ +// Code generated by "stringer -type=sinkState -output=sink_string.go"; DO NOT EDIT. + +package paging + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[operational-1] + _ = x[failed-2] + _ = x[finished-3] +} + +const _sinkState_name = "operationalfailedfinished" + +var _sinkState_index = [...]uint8{0, 11, 17, 25} + +func (i sinkState) String() string { + i -= 1 + if i < 0 || i >= sinkState(len(_sinkState_index)-1) { + return "sinkState(" + strconv.FormatInt(int64(i+1), 10) + ")" + } + return _sinkState_name[_sinkState_index[i]:_sinkState_index[i+1]] +} diff --git a/ydb/library/yql/providers/generic/connector/app/server/paging/writer.go b/ydb/library/yql/providers/generic/connector/app/server/paging/writer.go deleted file mode 100644 index ced84deabe..0000000000 --- a/ydb/library/yql/providers/generic/connector/app/server/paging/writer.go +++ /dev/null @@ -1,83 +0,0 @@ -package paging - -import ( - "context" - "fmt" - - "github.com/ydb-platform/ydb/library/go/core/log" - api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos" -) - -var _ Writer = (*writerImpl)(nil) - -type writerImpl struct { - buffer ColumnarBuffer // accumulates data from rows - bufferQueue chan ColumnarBuffer // outgoing buffer queue - bufferFactory ColumnarBufferFactory // 
creates new buffer - pagination *api_service_protos.TPagination // settings - rowsReceived uint64 // simple stats - logger log.Logger // annotated logger - operational bool // flag showing if it's ready to return data - ctx context.Context // client context -} - -func (pw *writerImpl) AddRow(acceptors []any) error { - if !pw.operational { - return fmt.Errorf("paging writer is not operational") - } - - if err := pw.buffer.AddRow(acceptors); err != nil { - return fmt.Errorf("acceptors to row set: %w", err) - } - - pw.rowsReceived++ - - if pw.isEnough() { - if err := pw.flush(true); err != nil { - return fmt.Errorf("flush: %w", err) - } - } - - return nil -} - -func (pw *writerImpl) isEnough() bool { - // TODO: implement pagination logic, check limits provided by client - return pw.rowsReceived%10000 == 0 -} - -func (pw *writerImpl) flush(makeNewBuffer bool) error { - select { - case pw.bufferQueue <- pw.buffer: - case <-pw.ctx.Done(): - return pw.ctx.Err() - } - - var err error - - if makeNewBuffer { - pw.buffer, err = pw.bufferFactory.MakeBuffer() - if err != nil { - return fmt.Errorf("make buffer: %w", err) - } - } - - return nil -} - -func (pw *writerImpl) Finish() error { - if err := pw.flush(false); err != nil { - return fmt.Errorf("flush: %w", err) - } - - pw.operational = false - - // notify reader about end of the stream - close(pw.bufferQueue) - - return nil -} - -func (pw *writerImpl) BufferQueue() <-chan ColumnarBuffer { - return pw.bufferQueue -} diff --git a/ydb/library/yql/providers/generic/connector/app/server/paging/writer_factory.go b/ydb/library/yql/providers/generic/connector/app/server/paging/writer_factory.go deleted file mode 100644 index 3cded585f8..0000000000 --- a/ydb/library/yql/providers/generic/connector/app/server/paging/writer_factory.go +++ /dev/null @@ -1,44 +0,0 @@ -package paging - -import ( - "context" - "fmt" - - "github.com/ydb-platform/ydb/library/go/core/log" - api_service_protos 
"github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos" -) - -type writerFactoryImpl struct { - columnarBufferFactory ColumnarBufferFactory -} - -func (wf *writerFactoryImpl) MakeWriter( - ctx context.Context, - logger log.Logger, - pagination *api_service_protos.TPagination, -) (Writer, error) { - if pagination != nil { - return nil, fmt.Errorf("pagination settings are not supported yet") - } - - buffer, err := wf.columnarBufferFactory.MakeBuffer() - if err != nil { - return nil, fmt.Errorf("wrap buffer: %w", err) - } - - return &writerImpl{ - bufferFactory: wf.columnarBufferFactory, - bufferQueue: make(chan ColumnarBuffer, 10), // TODO: use config - buffer: buffer, - logger: logger, - pagination: pagination, - operational: true, - ctx: ctx, - }, nil -} - -func NewWriterFactory(cbf ColumnarBufferFactory) WriterFactory { - return &writerFactoryImpl{ - columnarBufferFactory: cbf, - } -} diff --git a/ydb/library/yql/providers/generic/connector/app/server/paging/ya.make b/ydb/library/yql/providers/generic/connector/app/server/paging/ya.make index 080042a0cc..6479e5b3f5 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/paging/ya.make +++ b/ydb/library/yql/providers/generic/connector/app/server/paging/ya.make @@ -7,12 +7,9 @@ SRCS( interface.go mock.go read_limiter.go - writer.go - writer_factory.go -) - -GO_TEST_SRCS( - time_test.go + sink.go + sink_factory.go + sink_string.go ) END() diff --git a/ydb/library/yql/providers/generic/connector/app/server/postgresql/connection_manager.go b/ydb/library/yql/providers/generic/connector/app/server/postgresql/connection_manager.go index cd80d35cef..874f7cc758 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/postgresql/connection_manager.go +++ b/ydb/library/yql/providers/generic/connector/app/server/postgresql/connection_manager.go @@ -57,7 +57,7 @@ func (c Connection) Query(ctx context.Context, query string, args ...any) (utils return rows{Rows: out}, 
err } -var _ utils.ConnectionManager[*Connection] = (*connectionManager)(nil) +var _ utils.ConnectionManager = (*connectionManager)(nil) type connectionManager struct { utils.ConnectionManagerBase @@ -68,7 +68,7 @@ func (c *connectionManager) Make( ctx context.Context, logger log.Logger, dsi *api_common.TDataSourceInstance, -) (*Connection, error) { +) (utils.Connection, error) { if dsi.GetCredentials().GetBasic() == nil { return nil, fmt.Errorf("currently only basic auth is supported") } @@ -118,10 +118,10 @@ func (c *connectionManager) Make( return &Connection{conn, queryLogger}, nil } -func (c *connectionManager) Release(logger log.Logger, conn *Connection) { +func (c *connectionManager) Release(logger log.Logger, conn utils.Connection) { utils.LogCloserError(logger, conn, "close posgresql connection") } -func NewConnectionManager(cfg utils.ConnectionManagerBase) utils.ConnectionManager[*Connection] { +func NewConnectionManager(cfg utils.ConnectionManagerBase) utils.ConnectionManager { return &connectionManager{ConnectionManagerBase: cfg} } diff --git a/ydb/library/yql/providers/generic/connector/app/server/postgresql/query_executor.go b/ydb/library/yql/providers/generic/connector/app/server/postgresql/query_executor.go index b27578bfa9..e894bfb3c7 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/postgresql/query_executor.go +++ b/ydb/library/yql/providers/generic/connector/app/server/postgresql/query_executor.go @@ -11,7 +11,7 @@ import ( type queryExecutor struct { } -func (qm queryExecutor) DescribeTable(ctx context.Context, conn *Connection, request *api_service_protos.TDescribeTableRequest) (utils.Rows, error) { +func (qm queryExecutor) DescribeTable(ctx context.Context, conn utils.Connection, request *api_service_protos.TDescribeTableRequest) (utils.Rows, error) { schema := request.GetDataSourceInstance().GetPgOptions().GetSchema() out, err := conn.Query( ctx, @@ -27,6 +27,6 @@ func (qm queryExecutor) DescribeTable(ctx context.Context, 
conn *Connection, req return out, nil } -func NewQueryExecutor() utils.QueryExecutor[*Connection] { +func NewQueryExecutor() utils.QueryExecutor { return queryExecutor{} } diff --git a/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go b/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go index e24ef109b7..66c8268591 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go +++ b/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go @@ -6,20 +6,19 @@ import ( "strings" "github.com/ydb-platform/ydb/library/go/core/log" - api_common "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/api/common" "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/paging" "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils" api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos" ) -type handlerImpl[CONN utils.Connection] struct { +type handlerImpl struct { typeMapper utils.TypeMapper - queryBuilder utils.QueryExecutor[CONN] - connectionManager utils.ConnectionManager[CONN] + queryBuilder utils.QueryExecutor + connectionManager utils.ConnectionManager logger log.Logger } -func (h *handlerImpl[CONN]) DescribeTable( +func (h *handlerImpl) DescribeTable( ctx context.Context, logger log.Logger, request *api_service_protos.TDescribeTableRequest, @@ -67,22 +66,11 @@ func (h *handlerImpl[CONN]) DescribeTable( return &api_service_protos.TDescribeTableResponse{Schema: schema}, nil } -func (h *handlerImpl[CONN]) ReadSplit( - ctx context.Context, +func (h *handlerImpl) makeReadSplitQuery( logger log.Logger, - dataSourceInstance *api_common.TDataSourceInstance, split *api_service_protos.TSplit, - pagingWriter paging.Writer, -) error { - conn, err := h.connectionManager.Make(ctx, logger, dataSourceInstance) - if err != nil { - return fmt.Errorf("make connection: %w", 
err) - } - - defer h.connectionManager.Release(logger, conn) - +) (string, error) { // SELECT $columns - // interpolate request var sb strings.Builder @@ -90,7 +78,7 @@ func (h *handlerImpl[CONN]) ReadSplit( columns, err := utils.SelectWhatToYDBColumns(split.Select.What) if err != nil { - return fmt.Errorf("convert Select.What.Items to Ydb.Columns: %w", err) + return "", fmt.Errorf("convert Select.What.Items to Ydb.Columns: %w", err) } // for the case of empty column set select some constant for constructing a valid sql statement @@ -109,7 +97,7 @@ func (h *handlerImpl[CONN]) ReadSplit( // SELECT $columns FROM $from tableName := split.GetSelect().GetFrom().GetTable() if tableName == "" { - return fmt.Errorf("empty table name") + return "", fmt.Errorf("empty table name") } sb.WriteString(" FROM ") @@ -127,7 +115,26 @@ func (h *handlerImpl[CONN]) ReadSplit( // execute query - query := sb.String() + return sb.String(), nil +} + +func (h *handlerImpl) doReadSplit( + ctx context.Context, + logger log.Logger, + split *api_service_protos.TSplit, + sink paging.Sink, +) error { + query, err := h.makeReadSplitQuery(logger, split) + if err != nil { + return fmt.Errorf("make read split query: %w", err) + } + + conn, err := h.connectionManager.Make(ctx, logger, split.Select.DataSourceInstance) + if err != nil { + return fmt.Errorf("make connection: %w", err) + } + + defer h.connectionManager.Release(logger, conn) rows, err := conn.Query(ctx, query) if err != nil { @@ -146,7 +153,7 @@ func (h *handlerImpl[CONN]) ReadSplit( return fmt.Errorf("rows scan error: %w", err) } - if err := pagingWriter.AddRow(acceptors); err != nil { + if err := sink.AddRow(acceptors); err != nil { return fmt.Errorf("add row to paging writer: %w", err) } } @@ -158,13 +165,27 @@ func (h *handlerImpl[CONN]) ReadSplit( return nil } -func (h *handlerImpl[CONN]) TypeMapper() utils.TypeMapper { return h.typeMapper } +func (h *handlerImpl) ReadSplit( + ctx context.Context, + logger log.Logger, + split 
*api_service_protos.TSplit, + sink paging.Sink, +) { + err := h.doReadSplit(ctx, logger, split, sink) + if err != nil { + sink.AddError(err) + } + + sink.Finish() +} + +func (h *handlerImpl) TypeMapper() utils.TypeMapper { return h.typeMapper } -func newHandler[CONN utils.Connection]( +func newHandler( logger log.Logger, - preset *preset[CONN], + preset *handlerPreset, ) Handler { - return &handlerImpl[CONN]{ + return &handlerImpl{ logger: logger, queryBuilder: preset.queryExecutor, connectionManager: preset.connectionManager, diff --git a/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler_factory.go b/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler_factory.go index 6adca6351e..2b4aa105ba 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler_factory.go +++ b/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler_factory.go @@ -10,43 +10,43 @@ import ( "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils" ) -type preset[CONN utils.Connection] struct { - queryExecutor utils.QueryExecutor[CONN] - connectionManager utils.ConnectionManager[CONN] +type handlerPreset struct { + queryExecutor utils.QueryExecutor + connectionManager utils.ConnectionManager typeMapper utils.TypeMapper } -type HandlerFactory struct { - clickhouse preset[*clickhouse.Connection] - postgresql preset[*postgresql.Connection] +type handlerFactoryImpl struct { + clickhouse handlerPreset + postgresql handlerPreset } -func (hf *HandlerFactory) Make( +func (hf *handlerFactoryImpl) Make( logger log.Logger, dataSourceType api_common.EDataSourceKind, ) (Handler, error) { switch dataSourceType { case api_common.EDataSourceKind_CLICKHOUSE: - return newHandler[*clickhouse.Connection](logger, &hf.clickhouse), nil + return newHandler(logger, &hf.clickhouse), nil case api_common.EDataSourceKind_POSTGRESQL: - return newHandler[*postgresql.Connection](logger, &hf.postgresql), nil + return 
newHandler(logger, &hf.postgresql), nil default: return nil, fmt.Errorf("pick handler for data source type '%v': %w", dataSourceType, utils.ErrDataSourceNotSupported) } } -func NewHandlerFactory(qlf utils.QueryLoggerFactory) *HandlerFactory { +func NewHandlerFactory(qlf utils.QueryLoggerFactory) HandlerFactory { connManagerCfg := utils.ConnectionManagerBase{ QueryLoggerFactory: qlf, } - return &HandlerFactory{ - clickhouse: preset[*clickhouse.Connection]{ + return &handlerFactoryImpl{ + clickhouse: handlerPreset{ queryExecutor: clickhouse.NewQueryExecutor(), connectionManager: clickhouse.NewConnectionManager(connManagerCfg), typeMapper: clickhouse.NewTypeMapper(), }, - postgresql: preset[*postgresql.Connection]{ + postgresql: handlerPreset{ queryExecutor: postgresql.NewQueryExecutor(), connectionManager: postgresql.NewConnectionManager(connManagerCfg), typeMapper: postgresql.NewTypeMapper(), diff --git a/ydb/library/yql/providers/generic/connector/app/server/rdbms/interface.go b/ydb/library/yql/providers/generic/connector/app/server/rdbms/interface.go index b9177d1867..dd79021b81 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/rdbms/interface.go +++ b/ydb/library/yql/providers/generic/connector/app/server/rdbms/interface.go @@ -20,10 +20,16 @@ type Handler interface { ReadSplit( ctx context.Context, logger log.Logger, - dataSourceInstance *api_common.TDataSourceInstance, split *api_service_protos.TSplit, - pagingWriter paging.Writer, - ) error + sink paging.Sink, + ) TypeMapper() utils.TypeMapper } + +type HandlerFactory interface { + Make( + logger log.Logger, + dataSourceType api_common.EDataSourceKind, + ) (Handler, error) +} diff --git a/ydb/library/yql/providers/generic/connector/app/server/rdbms/mock.go b/ydb/library/yql/providers/generic/connector/app/server/rdbms/mock.go index 369db5db95..28dbe58fe6 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/rdbms/mock.go +++ 
b/ydb/library/yql/providers/generic/connector/app/server/rdbms/mock.go @@ -15,7 +15,6 @@ var _ Handler = (*HandlerMock)(nil) type HandlerMock struct { mock.Mock - ReadFinished chan struct{} // close this channel to allow } func (m *HandlerMock) DescribeTable( @@ -29,17 +28,33 @@ func (m *HandlerMock) DescribeTable( func (m *HandlerMock) ReadSplit( ctx context.Context, logger log.Logger, - dataSourceInstance *api_common.TDataSourceInstance, split *api_service_protos.TSplit, - pagingWriter paging.Writer, -) error { - <-m.ReadFinished - - args := m.Called(dataSourceInstance, split, pagingWriter) - - return args.Error(0) + pagingWriter paging.Sink, +) { + m.Called(split, pagingWriter) } func (m *HandlerMock) TypeMapper() utils.TypeMapper { panic("not implemented") // TODO: Implement } + +var _ HandlerFactory = (*HandlerFactoryMock)(nil) + +type HandlerFactoryMock struct { + QueryExecutor utils.QueryExecutor + ConnectionManager utils.ConnectionManager + TypeMapper utils.TypeMapper +} + +func (m *HandlerFactoryMock) Make(logger log.Logger, dataSourceType api_common.EDataSourceKind) (Handler, error) { + handler := newHandler( + logger, + &handlerPreset{ + queryExecutor: m.QueryExecutor, + connectionManager: m.ConnectionManager, + typeMapper: m.TypeMapper, + }, + ) + + return handler, nil +} diff --git a/ydb/library/yql/providers/generic/connector/app/server/service_connector.go b/ydb/library/yql/providers/generic/connector/app/server/service_connector.go index 29d1fe1726..54ac7a1aa4 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/service_connector.go +++ b/ydb/library/yql/providers/generic/connector/app/server/service_connector.go @@ -19,11 +19,12 @@ import ( api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/reflection" ) type serviceConnector struct { api_service.UnimplementedConnectorServer - 
handlerFactory *rdbms.HandlerFactory + handlerFactory rdbms.HandlerFactory memoryAllocator memory.Allocator readLimiterFactory *paging.ReadLimiterFactory cfg *config.TServerConfig @@ -183,14 +184,30 @@ func (s *serviceConnector) doReadSplits( for i, split := range request.Splits { columnarBufferFactory, err := paging.NewColumnarBufferFactory( - logger, s.memoryAllocator, s.readLimiterFactory, request.Format, split.Select.What, handler.TypeMapper()) + logger, + s.memoryAllocator, + s.readLimiterFactory, + request.Format, + split.Select.What, + handler.TypeMapper()) if err != nil { return 0, fmt.Errorf("new columnar buffer factory: %w", err) } - pagingWriterFactory := paging.NewWriterFactory(columnarBufferFactory) + // TODO: use configs + const ( + resultQueueCapacity = 10 + rowsPerBuffer = 10000 + ) - bytesInSplit, err := s.readSplit(logger, stream, request, split, pagingWriterFactory, handler) + sinkFactory := paging.NewSinkFactory(columnarBufferFactory, resultQueueCapacity, rowsPerBuffer) + + sink, err := sinkFactory.MakeSink(stream.Context(), logger, request.Pagination) + if err != nil { + return 0, fmt.Errorf("new sink: %w", err) + } + + bytesInSplit, err := s.readSplit(logger, stream, request, split, sink, handler) if err != nil { return 0, fmt.Errorf("read split %d: %w", i, err) } @@ -206,22 +223,19 @@ func (s *serviceConnector) readSplit( stream api_service.Connector_ReadSplitsServer, request *api_service_protos.TReadSplitsRequest, split *api_service_protos.TSplit, - pagingWriterFactory paging.WriterFactory, + sink paging.Sink, handler rdbms.Handler, ) (uint64, error) { logger.Debug("split reading started") - streamer, err := streaming.NewStreamer( + streamer := streaming.NewStreamer( logger, stream, request, split, - pagingWriterFactory, + sink, handler, ) - if err != nil { - return 0, fmt.Errorf("new streamer: %w", err) - } totalBytesSent, err := streamer.Run() if err != nil { @@ -318,6 +332,7 @@ func newServiceConnector( } grpcServer := 
grpc.NewServer(options...) + reflection.Register(grpcServer) s := &serviceConnector{ handlerFactory: rdbms.NewHandlerFactory(queryLoggerFactory), diff --git a/ydb/library/yql/providers/generic/connector/app/server/streaming/streamer.go b/ydb/library/yql/providers/generic/connector/app/server/streaming/streamer.go index c3d4db278c..5842dcf892 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/streaming/streamer.go +++ b/ydb/library/yql/providers/generic/connector/app/server/streaming/streamer.go @@ -1,6 +1,7 @@ package streaming import ( + "context" "fmt" "sync" @@ -17,33 +18,32 @@ type Streamer struct { request *api_service_protos.TReadSplitsRequest handler rdbms.Handler split *api_service_protos.TSplit - pagingWriter paging.Writer + sink paging.Sink totalBytesSent uint64 // TODO: replace with stats accumulator logger log.Logger + ctx context.Context // clone of a stream context + cancel context.CancelFunc } -func (s *Streamer) writeDataToStream(readErrChan <-chan error) error { +func (s *Streamer) writeDataToStream() error { + // exit from this function will cause publisher's goroutine termination as well + defer s.cancel() + for { select { - case buffer, ok := <-s.pagingWriter.BufferQueue(): + case result, ok := <-s.sink.ResultQueue(): if !ok { // correct termination return nil } - // handle next data block - if err := s.sendBufferToStream(buffer); err != nil { - return fmt.Errorf("send buffer to stream: %w", err) - } - case err := <-readErrChan: - // terminate loop in case of read error - if err != nil { - return fmt.Errorf("read error: %w", err) + if result.Error != nil { + return fmt.Errorf("read result: %w", result.Error) } - // otherwise drain the last rows left in writer into a channel - if err := s.pagingWriter.Finish(); err != nil { - return fmt.Errorf("finish paging writer: %w", err) + // handle next data block + if err := s.sendBufferToStream(result.ColumnarBuffer); err != nil { + return fmt.Errorf("send buffer to stream: %w", err) } 
case <-s.stream.Context().Done(): // handle request termination @@ -73,44 +73,22 @@ func (s *Streamer) sendBufferToStream(buffer paging.ColumnarBuffer) error { return nil } -func (s *Streamer) readDataFromSource() error { - // run blocking read - err := s.handler.ReadSplit( - s.stream.Context(), - s.logger, - s.request.GetDataSourceInstance(), - s.split, - s.pagingWriter, - ) - - if err != nil { - return fmt.Errorf("read split: %w", err) - } - - return nil -} - func (s *Streamer) Run() (uint64, error) { - readErrChan := make(chan error) - wg := &sync.WaitGroup{} wg.Add(1) defer wg.Wait() // launch read from the data source; - // reading goroutine controls writing goroutine lifetime + // subscriber goroutine controls publisher goroutine lifetime go func() { defer wg.Done() - select { - case readErrChan <- s.readDataFromSource(): - case <-s.stream.Context().Done(): - } + s.handler.ReadSplit(s.ctx, s.logger, s.split, s.sink) }() // pass received blocks into the GRPC channel - if err := s.writeDataToStream(readErrChan); err != nil { + if err := s.writeDataToStream(); err != nil { return s.totalBytesSent, fmt.Errorf("write data to stream: %w", err) } @@ -122,24 +100,19 @@ func NewStreamer( stream api_service.Connector_ReadSplitsServer, request *api_service_protos.TReadSplitsRequest, split *api_service_protos.TSplit, - pagingWriterFactory paging.WriterFactory, + sink paging.Sink, handler rdbms.Handler, -) (*Streamer, error) { - pagingWriter, err := pagingWriterFactory.MakeWriter( - stream.Context(), - logger, - request.GetPagination(), - ) - if err != nil { - return nil, fmt.Errorf("new paging writer: %w", err) - } +) *Streamer { + ctx, cancel := context.WithCancel(stream.Context()) return &Streamer{ - logger: logger, - stream: stream, - split: split, - request: request, - handler: handler, - pagingWriter: pagingWriter, - }, nil + logger: logger, + stream: stream, + split: split, + request: request, + handler: handler, + sink: sink, + ctx: ctx, + cancel: cancel, + } } 
diff --git a/ydb/library/yql/providers/generic/connector/app/server/streaming/streamer_test.go b/ydb/library/yql/providers/generic/connector/app/server/streaming/streamer_test.go index e1852190c9..51152171bd 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/streaming/streamer_test.go +++ b/ydb/library/yql/providers/generic/connector/app/server/streaming/streamer_test.go @@ -1,13 +1,21 @@ package streaming import ( + "bytes" "context" "errors" "fmt" "testing" + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v13/arrow/ipc" + "github.com/apache/arrow/go/v13/arrow/memory" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/ydb-platform/ydb/library/go/core/log" + api_common "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/api/common" + "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/clickhouse" "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/paging" "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/rdbms" "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils" @@ -20,7 +28,7 @@ var _ api_service.Connector_ReadSplitsServer = (*streamMock)(nil) type streamMock struct { mock.Mock api_service.Connector_ReadSplitsServer - sendTriggeredChan chan struct{} + logger log.Logger } func (m *streamMock) Context() context.Context { @@ -32,228 +40,268 @@ func (m *streamMock) Context() context.Context { func (m *streamMock) Send(response *api_service_protos.TReadSplitsResponse) error { args := m.Called(response) - if m.sendTriggeredChan != nil { - close(m.sendTriggeredChan) - } - return args.Error(0) } -func TestStreaming(t *testing.T) { - logger := utils.NewTestLogger(t) +func (m *streamMock) makeSendMatcher( + t *testing.T, + split *api_service_protos.TSplit, + expectedColumnarBlock [][]any, + 
expectedRowCount int, +) func(response *api_service_protos.TReadSplitsResponse) bool { + return func(response *api_service_protos.TReadSplitsResponse) bool { + buf := bytes.NewBuffer(response.GetArrowIpcStreaming()) - request := &api_service_protos.TReadSplitsRequest{} - split := &api_service_protos.TSplit{} + reader, err := ipc.NewReader(buf) + require.NoError(t, err) - t.Run("positive", func(t *testing.T) { - ctx := context.Background() - stream := &streamMock{} - stream.On("Context").Return(ctx) + for reader.Next() { + record := reader.Record() - writer := &paging.WriterMock{ - ColumnarBufferChan: make(chan paging.ColumnarBuffer), - } + require.Equal(t, len(split.Select.What.Items), len(record.Columns())) - writerFactory := &paging.WriterFactoryMock{} - writerFactory.On("MakeWriter", request.GetPagination()).Return(writer, nil) + if record.NumRows() != int64(expectedRowCount) { + return false + } - handler := &rdbms.HandlerMock{ - ReadFinished: make(chan struct{}), - } + col0 := record.Column(0).(*array.Int32) + require.Equal(t, &arrow.Int32Type{}, col0.DataType()) - // populate channel with predefined data - const ( - pageSize = 1 << 10 - totalPages = 3 - ) + for i := 0; i < len(expectedColumnarBlock[0]); i++ { + if expectedColumnarBlock[0][i] != col0.Value(i) { + return false + } + } - preparedColumnarBuffers := []paging.ColumnarBuffer{} + // FIXME: YQ-2590: String -> Binary + col1 := record.Column(1).(*array.String) + require.Equal(t, &arrow.StringType{}, col1.DataType()) - for i := 0; i < totalPages; i++ { - cb := &paging.ColumnarBufferMock{} - response := &api_service_protos.TReadSplitsResponse{ - Payload: &api_service_protos.TReadSplitsResponse_ArrowIpcStreaming{ - ArrowIpcStreaming: make([]byte, pageSize), - }, + for i := 0; i < len(expectedColumnarBlock[1]); i++ { + if expectedColumnarBlock[1][i].(string) != col1.Value(i) { + return false + } } - cb.On("ToResponse").Return(response, nil).Once() - stream.On("Send", response).Return(nil).Once() - 
cb.On("Release").Return().Once() - - preparedColumnarBuffers = append(preparedColumnarBuffers, cb) } - go func() { - // inject buffers into queue - for _, cb := range preparedColumnarBuffers { - writer.ColumnarBufferChan <- cb - } + reader.Release() - // let handler return - close(handler.ReadFinished) - }() + return true + } +} - // read is succesfull - readFinished := handler. - On("ReadSplit", request.GetDataSourceInstance(), split, writer). - Return(nil).Once() +type testCaseStreaming struct { + src [][]any + rowsPerBlock int + bufferQueueCapacity int + scanErr error + sendErr error +} - writer.On("Finish").Return(nil).NotBefore(readFinished).Once() +func (tc testCaseStreaming) name() string { + return fmt.Sprintf( + "totalRows_%d_rowsPerBlock_%d_bufferQueueCapacity_%d", + len(tc.src), tc.rowsPerBlock, tc.bufferQueueCapacity) +} - streamer, err := NewStreamer( - logger, - stream, - request, - split, - writerFactory, - handler, - ) +func (tc testCaseStreaming) messageParams() (sentMessages, rowsInLastMessage int) { + modulo := len(tc.src) % tc.rowsPerBlock - require.NoError(t, err) - require.NotNil(t, streamer) + if modulo == 0 { + sentMessages = len(tc.src) / tc.rowsPerBlock + rowsInLastMessage = tc.rowsPerBlock + } else { + sentMessages = len(tc.src)/tc.rowsPerBlock + 1 + rowsInLastMessage = modulo + } - totalBytes, err := streamer.Run() - require.NoError(t, err) - require.Equal(t, totalBytes, uint64(pageSize*totalPages)) + if tc.scanErr != nil { + sentMessages-- + rowsInLastMessage = tc.rowsPerBlock + } - mocks := []interface{}{stream, writer, writerFactory, handler} - for _, cb := range preparedColumnarBuffers { - mocks = append(mocks, cb) - } + return +} - mock.AssertExpectationsForObjects(t, mocks...) 
- }) +func (tc testCaseStreaming) execute(t *testing.T) { + logger := utils.NewTestLogger(t) + request := &api_service_protos.TReadSplitsRequest{} + split := utils.MakeTestSplit() - t.Run("handler read splits error", func(t *testing.T) { - ctx := context.Background() - stream := &streamMock{} - stream.On("Context").Return(ctx) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - writer := &paging.WriterMock{ - ColumnarBufferChan: make(chan paging.ColumnarBuffer), - } + stream := &streamMock{logger: logger} - writerFactory := &paging.WriterFactoryMock{} - writerFactory.On("MakeWriter", request.GetPagination()).Return(writer, nil) + stream.On("Context").Return(ctx) - handler := &rdbms.HandlerMock{ - ReadFinished: make(chan struct{}), - } + connection := &utils.ConnectionMock{} + + connectionManager := &utils.ConnectionManagerMock{} + connectionManager.On("Make", split.Select.DataSourceInstance).Return(connection, nil).Once() + connectionManager.On("Release", connection).Return().Once() - // populate channel with predefined data - const pageSize = 1 << 10 + rows := &utils.RowsMock{ + PredefinedData: tc.src, + } + connection.On("Query", "SELECT col0, col1 FROM example_1").Return(rows, nil).Once() + + // FIXME: YQ-2590: string -> []byte + col0Acceptor := new(*int32) + *col0Acceptor = new(int32) + col1Acceptor := new(*string) + *col1Acceptor = new(string) + + acceptors := []any{col0Acceptor, col1Acceptor} + + if tc.scanErr == nil { + rows.On("MakeAcceptors").Return(acceptors, nil).Once() + rows.On("Next").Return(true).Times(len(rows.PredefinedData)) + rows.On("Next").Return(false).Once() + rows.On("Scan", acceptors...).Return(nil).Times(len(rows.PredefinedData)) + rows.On("Err").Return(nil).Once() + rows.On("Close").Return(nil).Once() + } else { + rows.On("MakeAcceptors").Return(acceptors, nil).Once() + rows.On("Next").Return(true).Times(len(rows.PredefinedData) + 1) + rows.On("Scan", acceptors...).Return(nil).Times(len(rows.PredefinedData) - 1) + 
// instead of the last message, an error occurs + rows.On("Scan", acceptors...).Return(tc.scanErr).Once() + rows.On("Err").Return(nil).Once() + rows.On("Close").Return(nil).Once() + } + + totalMessages, rowsInLastMessage := tc.messageParams() + + expectedColumnarBlocks := utils.DataConverter{}.RowsToColumnBlocks(rows.PredefinedData, tc.rowsPerBlock) + + if tc.sendErr == nil { + for sendCallID := 0; sendCallID < totalMessages; sendCallID++ { + expectedColumnarBlock := expectedColumnarBlocks[sendCallID] + + rowsInMessage := tc.rowsPerBlock + if sendCallID == totalMessages-1 { + rowsInMessage = rowsInLastMessage + } + + matcher := stream.makeSendMatcher(t, split, expectedColumnarBlock, rowsInMessage) - cb := &paging.ColumnarBufferMock{} - response1 := &api_service_protos.TReadSplitsResponse{ - Payload: &api_service_protos.TReadSplitsResponse_ArrowIpcStreaming{ - ArrowIpcStreaming: make([]byte, pageSize), - }, + stream.On("Send", mock.MatchedBy(matcher)).Return(nil).Once() } + } else { + // the first attempt to send response is failed + stream.On("Send", mock.MatchedBy(func(response *api_service_protos.TReadSplitsResponse) bool { + cancel() // emulate real behavior of GRPC - cb.On("ToResponse").Return(response1, nil).Once() - stream.On("Send", response1).Return(nil).Once() - cb.On("Release").Return().NotBefore().Once() + return true + })).Return(tc.sendErr).Once() + } + + typeMapper := clickhouse.NewTypeMapper() + + handlerFactory := &rdbms.HandlerFactoryMock{ + ConnectionManager: connectionManager, + TypeMapper: typeMapper, + } - go func() { - // after first received block an error occurs - writer.ColumnarBufferChan <- cb + handler, err := handlerFactory.Make(logger, api_common.EDataSourceKind_CLICKHOUSE) + require.NoError(t, err) - close(handler.ReadFinished) - }() + cbf, err := paging.NewColumnarBufferFactory( + logger, + memory.NewGoAllocator(), + paging.NewReadLimiterFactory(nil), + api_service_protos.TReadSplitsRequest_ARROW_IPC_STREAMING, + split.Select.What, + 
typeMapper) + require.NoError(t, err) - // reading from data source returned an error - readErr := fmt.Errorf("failed to read from data source") - handler. - On("ReadSplit", request.GetDataSourceInstance(), split, writer). - Return(readErr).Once() + sink, err := paging.NewSinkFactory(cbf, tc.bufferQueueCapacity, tc.rowsPerBlock).MakeSink(ctx, logger, nil) + require.NoError(t, err) - streamer, err := NewStreamer( - logger, - stream, - request, - split, - writerFactory, - handler, - ) + streamer := NewStreamer(logger, stream, request, split, sink, handler) + _, err = streamer.Run() + + switch { + case tc.scanErr != nil: + require.True(t, errors.Is(err, tc.scanErr)) + case tc.sendErr != nil: + require.True(t, errors.Is(err, tc.sendErr)) + default: require.NoError(t, err) - require.NotNil(t, streamer) + } - totalBytes, err := streamer.Run() - require.Error(t, err) - require.True(t, errors.Is(err, readErr)) - require.Equal(t, totalBytes, uint64(pageSize)) + mocks := []interface{}{stream, connectionManager, connection} - mock.AssertExpectationsForObjects(t, stream, writer, writerFactory, handler, cb) - }) + mock.AssertExpectationsForObjects(t, mocks...) 
+} - t.Run("grpc stream send error", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - stream := &streamMock{ - sendTriggeredChan: make(chan struct{}), - } - stream.On("Context").Return(ctx) +func TestStreaming(t *testing.T) { + srcValues := [][][]any{ + { + {int32(1), "a"}, + {int32(2), "b"}, + {int32(3), "c"}, + {int32(4), "d"}, + }, + { + {int32(1), "a"}, + {int32(2), "b"}, + {int32(3), "c"}, + {int32(4), "d"}, + {int32(5), "e"}, + }, + } + rowsPerBlockValues := []int{2} + bufferQueueCapacityValues := []int{0, 1, 10} - writer := &paging.WriterMock{ - ColumnarBufferChan: make(chan paging.ColumnarBuffer), - } + var testCases []testCaseStreaming - writerFactory := &paging.WriterFactoryMock{} - writerFactory.On("MakeWriter", request.GetPagination()).Return(writer, nil) + for _, src := range srcValues { + for _, rowsPerBlock := range rowsPerBlockValues { + for _, bufferQueueCapacity := range bufferQueueCapacityValues { + tc := testCaseStreaming{ + src: src, + rowsPerBlock: rowsPerBlock, + bufferQueueCapacity: bufferQueueCapacity, + } - handler := &rdbms.HandlerMock{ - ReadFinished: stream.sendTriggeredChan, + testCases = append(testCases, tc) + } } + } - // populate channel with predefined data - const pageSize = 1 << 10 - - cb := &paging.ColumnarBufferMock{} - response1 := &api_service_protos.TReadSplitsResponse{ - Payload: &api_service_protos.TReadSplitsResponse_ArrowIpcStreaming{ - ArrowIpcStreaming: make([]byte, pageSize), - }, + t.Run("positive", func(t *testing.T) { + for _, tc := range testCases { + tc := tc + t.Run(tc.name(), func(t *testing.T) { + tc.execute(t) + }) } + }) - cb.On("ToResponse").Return(response1, nil).Once() - - // network error occures when trying to send first page to the stream - sendErr := fmt.Errorf("GRPC error") - send1 := stream.On("Send", response1).Return(sendErr).Once() - cb.On("Release").Return().Once() - - go func() { - // populate incoming queue with data - writer.ColumnarBufferChan <- cb - }() - - 
go func() { - // trigger context cancellation after send error occurs - <-stream.sendTriggeredChan - cancel() - }() - - handler. - On("ReadSplit", request.GetDataSourceInstance(), split, writer). - Return(nil).NotBefore(send1).Once() - - streamer, err := NewStreamer( - logger, - stream, - request, - split, - writerFactory, - handler, - ) + t.Run("scan error", func(t *testing.T) { + scanErr := fmt.Errorf("scan error") - require.NoError(t, err) - require.NotNil(t, streamer) + for _, tc := range testCases { + tc := tc + tc.scanErr = scanErr + t.Run(tc.name(), func(t *testing.T) { + tc.execute(t) + }) + } + }) - totalBytes, err := streamer.Run() - require.Error(t, err) - require.True(t, errors.Is(err, sendErr)) - require.Equal(t, totalBytes, uint64(0)) + t.Run("send error", func(t *testing.T) { + sendErr := fmt.Errorf("stream send error") - mock.AssertExpectationsForObjects(t, stream, writer, writerFactory, handler, cb) + for _, tc := range testCases { + tc := tc + tc.sendErr = sendErr + t.Run(tc.name(), func(t *testing.T) { + tc.execute(t) + }) + } }) } diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/arrow_helpers.go b/ydb/library/yql/providers/generic/connector/app/server/utils/arrow_helpers.go index 63a42cd401..ddda5f60c1 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/utils/arrow_helpers.go +++ b/ydb/library/yql/providers/generic/connector/app/server/utils/arrow_helpers.go @@ -115,6 +115,10 @@ func ydbTypeToArrowBuilder(typeID Ydb.Type_PrimitiveTypeId, arrowAllocator memor case Ydb.Type_DOUBLE: builder = array.NewFloat64Builder(arrowAllocator) case Ydb.Type_STRING: + // FIXME: YQ-2590: map it to arrow.BinaryTypes.Binary + // TODO: what about LargeBinary? 
+ // https://arrow.apache.org/docs/cpp/api/datatype.html#_CPPv4N5arrow4Type4type12LARGE_BINARYE + // builder = array.NewBinaryBuilder(arrowAllocator, arrow.BinaryTypes.Binary) builder = array.NewStringBuilder(arrowAllocator) case Ydb.Type_UTF8: // TODO: what about LargeString? @@ -162,6 +166,10 @@ func ydbTypeToArrowField(typeID Ydb.Type_PrimitiveTypeId, column *Ydb.Column) (a case Ydb.Type_DOUBLE: field = arrow.Field{Name: column.Name, Type: arrow.PrimitiveTypes.Float64} case Ydb.Type_STRING: + // FIXME: YQ-2590: map it to arrow.BinaryTypes.Binary + // TODO: what about LargeBinary? + // https://arrow.apache.org/docs/cpp/api/datatype.html#_CPPv4N5arrow4Type4type12LARGE_BINARYE + // field = arrow.Field{Name: column.Name, Type: arrow.BinaryTypes.Binary} field = arrow.Field{Name: column.Name, Type: arrow.BinaryTypes.String} case Ydb.Type_UTF8: // TODO: what about LargeString? diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/errors.go b/ydb/library/yql/providers/generic/connector/app/server/utils/errors.go index 08d114ebaf..a7517a193e 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/utils/errors.go +++ b/ydb/library/yql/providers/generic/connector/app/server/utils/errors.go @@ -21,6 +21,7 @@ var ( ErrUnimplementedExpression = fmt.Errorf("unimplemented expression") ErrUnimplementedOperation = fmt.Errorf("unimplemented operation") ErrUnimplementedPredicateType = fmt.Errorf("unimplemented predicate type") + ErrInvariantViolation = fmt.Errorf("implementation error (invariant violation)") ) func NewSuccess() *api_service_protos.TError { diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/query_builder.go b/ydb/library/yql/providers/generic/connector/app/server/utils/query_builder.go deleted file mode 100644 index 5e8fe2a0be..0000000000 --- a/ydb/library/yql/providers/generic/connector/app/server/utils/query_builder.go +++ /dev/null @@ -1,11 +0,0 @@ -package utils - -import ( - "context" - - api_service_protos 
"github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos" -) - -type QueryExecutor[CONN Connection] interface { - DescribeTable(ctx context.Context, conn CONN, request *api_service_protos.TDescribeTableRequest) (Rows, error) -} diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/connection_manager.go b/ydb/library/yql/providers/generic/connector/app/server/utils/sql.go index 624fe6984d..578602a650 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/utils/connection_manager.go +++ b/ydb/library/yql/providers/generic/connector/app/server/utils/sql.go @@ -5,6 +5,7 @@ import ( "github.com/ydb-platform/ydb/library/go/core/log" api_common "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/api/common" + api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos" ) type Connection interface { @@ -20,11 +21,15 @@ type Rows interface { MakeAcceptors() ([]any, error) } -type ConnectionManager[CONN any] interface { - Make(ctx context.Context, logger log.Logger, dataSourceInstance *api_common.TDataSourceInstance) (CONN, error) - Release(logger log.Logger, conn CONN) +type ConnectionManager interface { + Make(ctx context.Context, logger log.Logger, dataSourceInstance *api_common.TDataSourceInstance) (Connection, error) + Release(logger log.Logger, connection Connection) } type ConnectionManagerBase struct { QueryLoggerFactory QueryLoggerFactory } + +type QueryExecutor interface { + DescribeTable(ctx context.Context, conn Connection, request *api_service_protos.TDescribeTableRequest) (Rows, error) +} diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/sql_mock.go b/ydb/library/yql/providers/generic/connector/app/server/utils/sql_mock.go new file mode 100644 index 0000000000..7d9fd27800 --- /dev/null +++ b/ydb/library/yql/providers/generic/connector/app/server/utils/sql_mock.go @@ -0,0 +1,90 @@ +package utils + 
+import ( + "context" + + "github.com/stretchr/testify/mock" + "github.com/ydb-platform/ydb/library/go/core/log" + api_common "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/api/common" +) + +var _ Connection = (*ConnectionMock)(nil) + +type ConnectionMock struct { + mock.Mock +} + +func (m *ConnectionMock) Query(ctx context.Context, query string, params ...any) (Rows, error) { + called := []any{query} + called = append(called, params...) + args := m.Called(called...) + + return args.Get(0).(Rows), args.Error(1) +} + +func (m *ConnectionMock) Close() error { + return m.Called().Error(0) +} + +type ConnectionManagerMock struct { + mock.Mock +} + +func (m *ConnectionManagerMock) Make( + ctx context.Context, + logger log.Logger, + dataSourceInstance *api_common.TDataSourceInstance) (Connection, error) { + args := m.Called(dataSourceInstance) + + return args.Get(0).(Connection), args.Error(1) +} + +func (m *ConnectionManagerMock) Release(logger log.Logger, conn Connection) { + m.Called(conn) +} + +var _ Rows = (*RowsMock)(nil) + +type RowsMock struct { + mock.Mock + PredefinedData [][]any + scanCalls int +} + +func (m *RowsMock) Close() error { + return m.Called().Error(0) +} + +func (m *RowsMock) Err() error { + return m.Called().Error(0) +} + +func (m *RowsMock) Next() bool { + return m.Called().Bool(0) +} + +func (m *RowsMock) Scan(dest ...any) error { + args := m.Called(dest...) 
+ + // mutate acceptors by reference + row := m.PredefinedData[m.scanCalls] + + for i, d := range dest { + switch t := d.(type) { + case **int32: + **t = row[i].(int32) + case **string: + **t = row[i].(string) + } + } + + m.scanCalls++ + + return args.Error(0) +} + +func (m *RowsMock) MakeAcceptors() ([]any, error) { + args := m.Called() + + return args.Get(0).([]any), args.Error(1) +} diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/time_test.go b/ydb/library/yql/providers/generic/connector/app/server/utils/time_test.go new file mode 100644 index 0000000000..aac6cb8c9f --- /dev/null +++ b/ydb/library/yql/providers/generic/connector/app/server/utils/time_test.go @@ -0,0 +1,147 @@ +package utils + +import ( + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestTimeToYDBDate(t *testing.T) { + type testCase struct { + input time.Time + output uint16 + err error + } + + tcs := []testCase{ + { + input: time.Date(1970, 01, 01, 00, 00, 00, 00, time.UTC), + output: 0, + err: nil, + }, + { + input: time.Date(1970, 01, 02, 00, 00, 00, 00, time.UTC), + output: 1, + err: nil, + }, + { + input: time.Date(1969, 12, 31, 23, 59, 00, 00, time.UTC), + output: 0, + err: ErrValueOutOfTypeBounds, + }, + { + input: time.Date(9999, 01, 01, 00, 00, 00, 00, time.UTC), + output: 0, + err: ErrValueOutOfTypeBounds, + }, + } + + for _, tc := range tcs { + tc := tc + + t.Run(tc.input.String(), func(t *testing.T) { + output, err := TimeToYDBDate(&tc.input) + require.Equal(t, tc.output, output) + + if tc.err != nil { + require.True(t, errors.Is(tc.err, ErrValueOutOfTypeBounds)) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestTimeToYDBDatetime(t *testing.T) { + type testCase struct { + input time.Time + output uint32 + err error + } + + tcs := []testCase{ + { + input: time.Date(1970, 01, 01, 00, 00, 00, 00, time.UTC), + output: 0, + err: nil, + }, + { + input: time.Date(1970, 01, 02, 00, 00, 00, 00, time.UTC), + output: 
86400, + err: nil, + }, + { + input: time.Date(1969, 12, 31, 23, 59, 00, 00, time.UTC), + output: 0, + err: ErrValueOutOfTypeBounds, + }, + { + input: time.Date(9999, 01, 01, 00, 00, 00, 00, time.UTC), + output: 0, + err: ErrValueOutOfTypeBounds, + }, + } + + for _, tc := range tcs { + tc := tc + + t.Run(tc.input.String(), func(t *testing.T) { + output, err := TimeToYDBDatetime(&tc.input) + require.Equal(t, tc.output, output) + + if tc.err != nil { + require.True(t, errors.Is(tc.err, ErrValueOutOfTypeBounds)) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestTimeToYDBTimestamp(t *testing.T) { + type testCase struct { + input time.Time + output uint64 + err error + } + + tcs := []testCase{ + { + input: time.Date(1970, 01, 01, 00, 00, 00, 00, time.UTC), + output: 0, + err: nil, + }, + { + input: time.Date(1970, 01, 02, 00, 00, 00, 00, time.UTC), + output: 86400000000, + err: nil, + }, + { + input: time.Date(1969, 12, 31, 23, 59, 00, 00, time.UTC), + output: 0, + err: ErrValueOutOfTypeBounds, + }, + { + input: time.Date(29427, 01, 01, 00, 00, 00, 00, time.UTC), + output: 0, + err: ErrValueOutOfTypeBounds, + }, + } + + for _, tc := range tcs { + tc := tc + + t.Run(tc.input.String(), func(t *testing.T) { + output, err := TimeToYDBTimestamp(&tc.input) + require.Equal(t, tc.output, output) + + if tc.err != nil { + require.True(t, errors.Is(tc.err, ErrValueOutOfTypeBounds)) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/type_mapper.go b/ydb/library/yql/providers/generic/connector/app/server/utils/type_mapper.go index 72b963b2d6..fd9cd88942 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/utils/type_mapper.go +++ b/ydb/library/yql/providers/generic/connector/app/server/utils/type_mapper.go @@ -8,6 +8,7 @@ import ( type TypeMapper interface { SQLTypeToYDBColumn(columnName, typeName string, rules *api_service_protos.TTypeMappingSettings) (*Ydb.Column, error) + 
AddRowToArrowIPCStreaming( ydbTypes []*Ydb.Type, acceptors []any, diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/unit_test_helpers.go b/ydb/library/yql/providers/generic/connector/app/server/utils/unit_test_helpers.go new file mode 100644 index 0000000000..ef47473088 --- /dev/null +++ b/ydb/library/yql/providers/generic/connector/app/server/utils/unit_test_helpers.go @@ -0,0 +1,75 @@ +package utils + +import ( + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + api_common "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/api/common" + api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos" +) + +func MakeTestSplit() *api_service_protos.TSplit { + return &api_service_protos.TSplit{ + Select: &api_service_protos.TSelect{ + DataSourceInstance: &api_common.TDataSourceInstance{}, + What: &api_service_protos.TSelect_TWhat{ + Items: []*api_service_protos.TSelect_TWhat_TItem{ + { + Payload: &api_service_protos.TSelect_TWhat_TItem_Column{ + Column: &Ydb.Column{ + Name: "col0", + Type: &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_INT32}}, + }, + }, + }, + { + Payload: &api_service_protos.TSelect_TWhat_TItem_Column{ + Column: &Ydb.Column{ + Name: "col1", + Type: &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_STRING}}, + }, + }, + }, + }, + }, + From: &api_service_protos.TSelect_TFrom{ + Table: "example_1", + }, + }, + } +} + +// DataConverter should be used only from unit tests +type DataConverter struct{} + +func (dc DataConverter) RowsToColumnBlocks(input [][]any, rowsPerBlock int) [][][]any { + var ( + totalColumns = len(input[0]) + results [][][]any + ) + + for i := 0; i < len(input); i += rowsPerBlock { + start := i + + end := start + rowsPerBlock + if end > len(input) { + end = len(input) + } + + result := dc.rowGroupToColumnBlock(input, totalColumns, start, end) + + results = append(results, result) + } + + return results +} + +func (dc 
DataConverter) rowGroupToColumnBlock(input [][]any, totalColumns, start, end int) [][]any { + columnarData := make([][]any, totalColumns) + + for columnID := range columnarData { + for rowID := range input[start:end] { + columnarData[columnID] = append(columnarData[columnID], input[rowID+start][columnID]) + } + } + + return columnarData +} diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/unit_test_helpers_test.go b/ydb/library/yql/providers/generic/connector/app/server/utils/unit_test_helpers_test.go new file mode 100644 index 0000000000..f08aaff5fd --- /dev/null +++ b/ydb/library/yql/providers/generic/connector/app/server/utils/unit_test_helpers_test.go @@ -0,0 +1,72 @@ +package utils + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestDataConverter(t *testing.T) { + dc := DataConverter{} + + type testCase struct { + src [][]any + dst [][][]any + rowsPerBlock int + } + + testCases := []testCase{ + { + src: [][]any{ + {int32(1), "a"}, + {int32(2), "b"}, + {int32(3), "c"}, + {int32(4), "d"}, + }, + dst: [][][]any{ + { + {int32(1), int32(2)}, + {"a", "b"}, + }, + { + {int32(3), int32(4)}, + {"c", "d"}, + }, + }, + rowsPerBlock: 2, + }, + { + src: [][]any{ + {int32(1), "a"}, + {int32(2), "b"}, + {int32(3), "c"}, + {int32(4), "d"}, + {int32(5), "e"}, + }, + dst: [][][]any{ + { + {int32(1), int32(2)}, + {"a", "b"}, + }, + { + {int32(3), int32(4)}, + {"c", "d"}, + }, + { + {int32(5)}, + {"e"}, + }, + }, + rowsPerBlock: 2, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(fmt.Sprintf("rowsPerRecord_%d", tc.rowsPerBlock), func(t *testing.T) { + actual := dc.RowsToColumnBlocks(tc.src, tc.rowsPerBlock) + require.Equal(t, tc.dst, actual) + }) + } +} diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/ut/ya.make b/ydb/library/yql/providers/generic/connector/app/server/utils/ut/ya.make new file mode 100644 index 0000000000..227f12138e --- /dev/null +++ 
b/ydb/library/yql/providers/generic/connector/app/server/utils/ut/ya.make @@ -0,0 +1,5 @@ +GO_TEST_FOR(ydb/library/yql/providers/generic/connector/app/server/utils) + +SIZE(SMALL) + +END() diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/ya.make b/ydb/library/yql/providers/generic/connector/app/server/utils/ya.make index 5205e5a2d0..ccf51cffe0 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/utils/ya.make +++ b/ydb/library/yql/providers/generic/connector/app/server/utils/ya.make @@ -2,21 +2,27 @@ GO_LIBRARY() SRCS( arrow_helpers.go - connection_manager.go converters.go doc.go endpoint.go errors.go logger.go protobuf.go - query_builder.go select_helpers.go + sql.go + sql_mock.go time.go type_mapper.go + unit_test_helpers.go ) GO_TEST_SRCS( time_test.go + unit_test_helpers_test.go ) END() + +RECURSE_FOR_TESTS( + ut +) |