aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvitalyisaev <vitalyisaev@ydb.tech>2023-09-19 18:04:38 +0300
committervitalyisaev <vitalyisaev@ydb.tech>2023-09-19 18:25:46 +0300
commitbcefefa702db0dacd409df4b94f3374d3ea13d56 (patch)
treefdd5ad2acba5ec56bfe3ecdc7e26b2b93318e76a
parentac07955c16a54731f7e64ae893d2306a6e6b11c9 (diff)
downloadydb-bcefefa702db0dacd409df4b94f3374d3ea13d56.tar.gz
YQ Connector: fix potential SQL injection in DescribeTable
1. Отрефакторен код создания PG conn string (патч от @axlodin). 2. Используем prepared statements при запросе метаданных таблицы. Для этого пришлось слегка поменять внутренние интерфейсы и добавить дженериков.
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/clickhouse/connection_manager.go17
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_builder.go22
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_executor.go31
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/clickhouse/type_mapper.go2
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/clickhouse/ya.make2
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/postgresql/connection_manager.go47
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/postgresql/query_builder.go22
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/postgresql/query_executor.go30
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/postgresql/ya.make2
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go51
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/rdbms/handler_factory.go52
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/rdbms/query_builder.go9
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/rdbms/ya.make3
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/server.go24
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/utils/connection_manager.go (renamed from ydb/library/yql/providers/generic/connector/app/server/rdbms/connection_manager.go)8
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/utils/query_builder.go11
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/utils/ya.make4
17 files changed, 211 insertions, 126 deletions
diff --git a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/connection_manager.go b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/connection_manager.go
index df47895d501..5cdbf69db7b 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/connection_manager.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/connection_manager.go
@@ -11,21 +11,20 @@ import (
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ydb-platform/ydb/library/go/core/log"
api_common "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/api/common"
- "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/rdbms"
"github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils"
)
-var _ rdbms.Connection = (*connection)(nil)
+var _ utils.Connection = (*Connection)(nil)
-type connection struct {
+type Connection struct {
*sql.DB
}
-func (c connection) Query(ctx context.Context, query string, args ...any) (rdbms.Rows, error) {
+func (c Connection) Query(ctx context.Context, query string, args ...any) (utils.Rows, error) {
return c.DB.QueryContext(ctx, query, args...)
}
-var _ rdbms.ConnectionManager = (*connectionManager)(nil)
+var _ utils.ConnectionManager[*Connection] = (*connectionManager)(nil)
type connectionManager struct {
// TODO: cache of connections, remove unused connections with TTL
@@ -35,7 +34,7 @@ func (c *connectionManager) Make(
ctx context.Context,
logger log.Logger,
dsi *api_common.TDataSourceInstance,
-) (rdbms.Connection, error) {
+) (*Connection, error) {
if dsi.GetCredentials().GetBasic() == nil {
return nil, fmt.Errorf("currently only basic auth is supported")
}
@@ -93,13 +92,13 @@ func (c *connectionManager) Make(
conn.SetMaxOpenConns(maxOpenConns)
conn.SetConnMaxLifetime(time.Hour)
- return &connection{DB: conn}, nil
+ return &Connection{DB: conn}, nil
}
-func (c *connectionManager) Release(logger log.Logger, conn rdbms.Connection) {
+func (c *connectionManager) Release(logger log.Logger, conn *Connection) {
utils.LogCloserError(logger, conn, "close clickhouse connection")
}
-func NewConnectionManager() rdbms.ConnectionManager {
+func NewConnectionManager() utils.ConnectionManager[*Connection] {
return &connectionManager{}
}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_builder.go b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_builder.go
deleted file mode 100644
index c55d7058360..00000000000
--- a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_builder.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package clickhouse
-
-import (
- "fmt"
-
- "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/rdbms"
- api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
-)
-
-var _ rdbms.QueryBuilder = (*queryBuilder)(nil)
-
-type queryBuilder struct {
-}
-
-func (qb queryBuilder) DescribeTable(request *api_service_protos.TDescribeTableRequest) string {
- return fmt.Sprintf(
- "SELECT name, type FROM system.columns WHERE table = '%s' and database ='%s'",
- request.Table,
- request.DataSourceInstance.Database)
-}
-
-func NewQueryBuilder() rdbms.QueryBuilder { return queryBuilder{} }
diff --git a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_executor.go b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_executor.go
new file mode 100644
index 00000000000..f15f8176f39
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_executor.go
@@ -0,0 +1,31 @@
+package clickhouse
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils"
+ api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
+)
+
+type queryExecutor struct {
+}
+
+func (qm queryExecutor) DescribeTable(ctx context.Context, conn *Connection, request *api_service_protos.TDescribeTableRequest) (utils.Rows, error) {
+ out, err := conn.QueryContext(
+ ctx,
+ "SELECT name, type FROM system.columns WHERE table = ? and database = ?",
+ request.Table,
+ request.DataSourceInstance.Database,
+ )
+
+ if err != nil {
+ return nil, fmt.Errorf("query: %w", err)
+ }
+
+ return out, nil
+}
+
+func NewQueryExecutor() utils.QueryExecutor[*Connection] {
+ return queryExecutor{}
+}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/type_mapper.go b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/type_mapper.go
index 93988edbd7f..42fa693d65c 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/type_mapper.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/type_mapper.go
@@ -262,7 +262,7 @@ func (typeMapper) appendValueToBuilder(
func NewTypeMapper() utils.TypeMapper {
return typeMapper{
isFixedString: regexp.MustCompile(`FixedString\([0-9]+\)`),
- isDateTime64: regexp.MustCompile(`DateTime64\(\d\)`),
+ isDateTime64: regexp.MustCompile(`DateTime64\(\d{1}\)`),
isNullable: regexp.MustCompile(`Nullable\((?P<Internal>\w+)\)`),
}
}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/ya.make b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/ya.make
index 314fa142ddb..a04c18a0aae 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/ya.make
+++ b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/ya.make
@@ -2,7 +2,7 @@ GO_LIBRARY()
SRCS(
connection_manager.go
- query_builder.go
+ query_executor.go
type_mapper.go
)
diff --git a/ydb/library/yql/providers/generic/connector/app/server/postgresql/connection_manager.go b/ydb/library/yql/providers/generic/connector/app/server/postgresql/connection_manager.go
index 411b4af24da..2e18f4aebb6 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/postgresql/connection_manager.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/postgresql/connection_manager.go
@@ -5,14 +5,14 @@ import (
"fmt"
"github.com/jackc/pgx/v5"
+ "github.com/jackc/pgx/v5/pgconn"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/ydb-platform/ydb/library/go/core/log"
api_common "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/api/common"
- "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/rdbms"
"github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils"
)
-var _ rdbms.Connection = (*connection)(nil)
+var _ utils.Connection = (*Connection)(nil)
type rows struct {
pgx.Rows
@@ -23,20 +23,20 @@ func (r rows) Close() error {
return nil
}
-type connection struct {
+type Connection struct {
*pgx.Conn
}
-func (c connection) Close() error {
+func (c Connection) Close() error {
return c.Conn.Close(context.TODO())
}
-func (c connection) Query(ctx context.Context, query string, args ...any) (rdbms.Rows, error) {
+func (c Connection) Query(ctx context.Context, query string, args ...any) (utils.Rows, error) {
out, err := c.Conn.Query(ctx, query, args...)
return rows{Rows: out}, err
}
-var _ rdbms.ConnectionManager = (*connectionManager)(nil)
+var _ utils.ConnectionManager[*Connection] = (*connectionManager)(nil)
type connectionManager struct {
// TODO: cache of connections, remove unused connections with TTL
@@ -46,41 +46,48 @@ func (c *connectionManager) Make(
ctx context.Context,
_ log.Logger,
dsi *api_common.TDataSourceInstance,
-) (rdbms.Connection, error) {
+) (*Connection, error) {
if dsi.GetCredentials().GetBasic() == nil {
return nil, fmt.Errorf("currently only basic auth is supported")
}
if dsi.Protocol != api_common.EProtocol_NATIVE {
- return nil, fmt.Errorf("can not run PostgreSQL connection with protocol '%v'", dsi.Protocol)
+ return nil, fmt.Errorf("can not create PostgreSQL connection with protocol '%v'", dsi.Protocol)
}
- connStr := fmt.Sprintf("dbname=%s user=%s password=%s host=%s port=%d",
- dsi.Database,
- dsi.Credentials.GetBasic().Username,
- dsi.Credentials.GetBasic().Password,
- dsi.GetEndpoint().GetHost(),
- dsi.GetEndpoint().GetPort(),
- )
+ if socketType, _ := pgconn.NetworkAddress(dsi.GetEndpoint().GetHost(), uint16(dsi.GetEndpoint().GetPort())); socketType != "tcp" {
+ return nil, fmt.Errorf("can not create PostgreSQL connection with socket type '%s'", socketType)
+ }
+ connStr := "dbname=DBNAME user=USER password=PASSWORD host=HOST port=5432"
if dsi.UseTls {
connStr += " sslmode=verify-full"
} else {
connStr += " sslmode=disable"
}
- conn, err := pgx.Connect(ctx, connStr)
+ connCfg, err := pgx.ParseConfig(connStr)
+ if err != nil {
+ return nil, fmt.Errorf("parse connection config template: %w", err)
+ }
+ connCfg.Database = dsi.Database
+ connCfg.Host = dsi.GetEndpoint().GetHost()
+ connCfg.Port = uint16(dsi.GetEndpoint().GetPort())
+ connCfg.User = dsi.Credentials.GetBasic().GetUsername()
+ connCfg.Password = dsi.Credentials.GetBasic().GetPassword()
+
+ conn, err := pgx.ConnectConfig(ctx, connCfg)
if err != nil {
return nil, fmt.Errorf("open connection: %w", err)
}
- return &connection{Conn: conn}, nil
+ return &Connection{conn}, nil
}
-func (c *connectionManager) Release(logger log.Logger, conn rdbms.Connection) {
- utils.LogCloserError(logger, conn, "close connection to PostgreSQL")
+func (c *connectionManager) Release(logger log.Logger, conn *Connection) {
+ utils.LogCloserError(logger, conn, "close posgresql connection")
}
-func NewConnectionManager() rdbms.ConnectionManager {
+func NewConnectionManager() utils.ConnectionManager[*Connection] {
return &connectionManager{}
}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/postgresql/query_builder.go b/ydb/library/yql/providers/generic/connector/app/server/postgresql/query_builder.go
deleted file mode 100644
index c7dbdfdcdab..00000000000
--- a/ydb/library/yql/providers/generic/connector/app/server/postgresql/query_builder.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package postgresql
-
-import (
- "fmt"
-
- "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/rdbms"
- api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
-)
-
-var _ rdbms.QueryBuilder = (*queryBuilder)(nil)
-
-type queryBuilder struct {
-}
-
-func (qb queryBuilder) DescribeTable(request *api_service_protos.TDescribeTableRequest) string {
- return fmt.Sprintf(
- // TODO: is hardconing schema correct?
- "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '%s' AND table_schema ='public'",
- request.Table)
-}
-
-func NewQueryBuilder() rdbms.QueryBuilder { return queryBuilder{} }
diff --git a/ydb/library/yql/providers/generic/connector/app/server/postgresql/query_executor.go b/ydb/library/yql/providers/generic/connector/app/server/postgresql/query_executor.go
new file mode 100644
index 00000000000..c61292f25dd
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/app/server/postgresql/query_executor.go
@@ -0,0 +1,30 @@
+package postgresql
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils"
+ api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
+)
+
+type queryExecutor struct {
+}
+
+func (qm queryExecutor) DescribeTable(ctx context.Context, conn *Connection, request *api_service_protos.TDescribeTableRequest) (utils.Rows, error) {
+ out, err := conn.Query(
+ ctx,
+ "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = $1 AND table_schema ='public'",
+ request.Table,
+ )
+
+ if err != nil {
+ return nil, fmt.Errorf("connection query: %w", err)
+ }
+
+ return out, nil
+}
+
+func NewQueryExecutor() utils.QueryExecutor[*Connection] {
+ return queryExecutor{}
+}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/postgresql/ya.make b/ydb/library/yql/providers/generic/connector/app/server/postgresql/ya.make
index 314fa142ddb..a04c18a0aae 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/postgresql/ya.make
+++ b/ydb/library/yql/providers/generic/connector/app/server/postgresql/ya.make
@@ -2,7 +2,7 @@ GO_LIBRARY()
SRCS(
connection_manager.go
- query_builder.go
+ query_executor.go
type_mapper.go
)
diff --git a/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go b/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go
index 1792cfbe855..4e571031051 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go
@@ -11,20 +11,36 @@ import (
api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
)
-type Handler struct {
+type Handler interface {
+ DescribeTable(
+ ctx context.Context,
+ logger log.Logger,
+ request *api_service_protos.TDescribeTableRequest,
+ ) (*api_service_protos.TDescribeTableResponse, error)
+
+ ReadSplit(
+ ctx context.Context,
+ logger log.Logger,
+ dataSourceInstance *api_common.TDataSourceInstance,
+ split *api_service_protos.TSplit,
+ pagingWriter *utils.PagingWriter,
+ ) error
+
+ TypeMapper() utils.TypeMapper
+}
+
+type handlerImpl[CONN utils.Connection] struct {
typeMapper utils.TypeMapper
- queryBuilder QueryBuilder
- connectionManager ConnectionManager
+ queryBuilder utils.QueryExecutor[CONN]
+ connectionManager utils.ConnectionManager[CONN]
logger log.Logger
}
-func (h *Handler) DescribeTable(
+func (h *handlerImpl[CONN]) DescribeTable(
ctx context.Context,
logger log.Logger,
request *api_service_protos.TDescribeTableRequest,
) (*api_service_protos.TDescribeTableResponse, error) {
- query := h.queryBuilder.DescribeTable(request)
-
conn, err := h.connectionManager.Make(ctx, logger, request.DataSourceInstance)
if err != nil {
return nil, fmt.Errorf("make connection: %w", err)
@@ -32,13 +48,12 @@ func (h *Handler) DescribeTable(
defer h.connectionManager.Release(logger, conn)
- logger.Debug("execute query", log.String("query", query))
-
- rows, err := conn.Query(ctx, query)
+ rows, err := h.queryBuilder.DescribeTable(ctx, conn, request)
if err != nil {
- return nil, fmt.Errorf("query '%s' error: %w", query, err)
+ return nil, fmt.Errorf("query builder error: %w", err)
}
+ // logger.Debug("execute query", log.String("query", query))
defer func() { utils.LogCloserError(logger, rows, "close rows") }()
var (
@@ -71,7 +86,7 @@ func (h *Handler) DescribeTable(
return &api_service_protos.TDescribeTableResponse{Schema: &schema}, nil
}
-func (h *Handler) ReadSplit(
+func (h *handlerImpl[CONN]) ReadSplit(
ctx context.Context,
logger log.Logger,
dataSourceInstance *api_common.TDataSourceInstance,
@@ -156,8 +171,16 @@ func (h *Handler) ReadSplit(
return nil
}
-func (h *Handler) TypeMapper() utils.TypeMapper { return h.typeMapper }
+func (h *handlerImpl[CONN]) TypeMapper() utils.TypeMapper { return h.typeMapper }
-func NewHandler(logger log.Logger, queryBuilder QueryBuilder, connectionManager ConnectionManager, typeMapper utils.TypeMapper) *Handler {
- return &Handler{logger: logger, queryBuilder: queryBuilder, connectionManager: connectionManager, typeMapper: typeMapper}
+func newHandler[CONN utils.Connection](
+ logger log.Logger,
+ preset *preset[CONN],
+) Handler {
+ return &handlerImpl[CONN]{
+ logger: logger,
+ queryBuilder: preset.queryExecutor,
+ connectionManager: preset.connectionManager,
+ typeMapper: preset.typeMapper,
+ }
}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler_factory.go b/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler_factory.go
new file mode 100644
index 00000000000..e7d03426b15
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler_factory.go
@@ -0,0 +1,52 @@
+package rdbms
+
+import (
+ "fmt"
+
+ "github.com/ydb-platform/ydb/library/go/core/log"
+ api_common "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/api/common"
+ "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/clickhouse"
+ "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/postgresql"
+ "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils"
+)
+
+type preset[CONN utils.Connection] struct {
+ queryExecutor utils.QueryExecutor[CONN]
+ connectionManager utils.ConnectionManager[CONN]
+ typeMapper utils.TypeMapper
+}
+
+type HandlerFactory struct {
+ clickhouse preset[*clickhouse.Connection]
+ postgresql preset[*postgresql.Connection]
+}
+
+func (hf *HandlerFactory) Make(
+ logger log.Logger,
+ dataSourceType api_common.EDataSourceKind,
+) (Handler, error) {
+
+ switch dataSourceType {
+ case api_common.EDataSourceKind_CLICKHOUSE:
+ return newHandler[*clickhouse.Connection](logger, &hf.clickhouse), nil
+ case api_common.EDataSourceKind_POSTGRESQL:
+ return newHandler[*postgresql.Connection](logger, &hf.postgresql), nil
+ default:
+ return nil, fmt.Errorf("pick handler for data source type '%v': %w", dataSourceType, utils.ErrDataSourceNotSupported)
+ }
+}
+
+func NewHandlerFactory() *HandlerFactory {
+ return &HandlerFactory{
+ clickhouse: preset[*clickhouse.Connection]{
+ queryExecutor: clickhouse.NewQueryExecutor(),
+ connectionManager: clickhouse.NewConnectionManager(),
+ typeMapper: clickhouse.NewTypeMapper(),
+ },
+ postgresql: preset[*postgresql.Connection]{
+ queryExecutor: postgresql.NewQueryExecutor(),
+ connectionManager: postgresql.NewConnectionManager(),
+ typeMapper: postgresql.NewTypeMapper(),
+ },
+ }
+}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/rdbms/query_builder.go b/ydb/library/yql/providers/generic/connector/app/server/rdbms/query_builder.go
deleted file mode 100644
index 6aa6e7991b9..00000000000
--- a/ydb/library/yql/providers/generic/connector/app/server/rdbms/query_builder.go
+++ /dev/null
@@ -1,9 +0,0 @@
-package rdbms
-
-import (
- api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
-)
-
-type QueryBuilder interface {
- DescribeTable(request *api_service_protos.TDescribeTableRequest) string
-}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/rdbms/ya.make b/ydb/library/yql/providers/generic/connector/app/server/rdbms/ya.make
index f73218d66d5..b89c13e8570 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/rdbms/ya.make
+++ b/ydb/library/yql/providers/generic/connector/app/server/rdbms/ya.make
@@ -1,9 +1,8 @@
GO_LIBRARY()
SRCS(
- connection_manager.go
handler.go
- query_builder.go
+ handler_factory.go
)
END()
diff --git a/ydb/library/yql/providers/generic/connector/app/server/server.go b/ydb/library/yql/providers/generic/connector/app/server/server.go
index 85f88108b9b..30deb25ea07 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/server.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/server.go
@@ -8,10 +8,7 @@ import (
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/spf13/cobra"
"github.com/ydb-platform/ydb/library/go/core/log"
- api_common "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/api/common"
"github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/config"
- "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/clickhouse"
- "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/postgresql"
"github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/rdbms"
"github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils"
api_service "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service"
@@ -22,7 +19,7 @@ import (
type Server struct {
api_service.UnimplementedConnectorServer
- handlers map[api_common.EDataSourceKind]*rdbms.Handler
+ handlerFactory *rdbms.HandlerFactory
columnarBufferFactory *utils.ColumnarBufferFactory
cfg *config.ServerConfig
logger log.Logger
@@ -47,7 +44,7 @@ func (s *Server) DescribeTable(
}, nil
}
- handler, err := s.getHandler(request.DataSourceInstance.Kind)
+ handler, err := s.handlerFactory.Make(logger, request.DataSourceInstance.Kind)
if err != nil {
logger.Error("request handling failed", log.Error(err))
@@ -181,7 +178,7 @@ func (s *Server) readSplit(
) error {
logger.Debug("reading split", log.String("split", split.String()))
- handler, err := s.getHandler(request.DataSourceInstance.Kind)
+ handler, err := s.handlerFactory.Make(logger, request.DataSourceInstance.Kind)
if err != nil {
return fmt.Errorf("get handler: %w", err)
}
@@ -261,25 +258,12 @@ func (s *Server) makeOptions() ([]grpc.ServerOption, error) {
return opts, nil
}
-func (s *Server) getHandler(dataSourceType api_common.EDataSourceKind) (*rdbms.Handler, error) {
- if h, ok := s.handlers[dataSourceType]; ok {
- return h, nil
- }
-
- return nil, fmt.Errorf("pick handler for data source type '%v': %w", dataSourceType, utils.ErrDataSourceNotSupported)
-}
-
func newServer(
logger log.Logger,
cfg *config.ServerConfig,
) (*Server, error) {
return &Server{
- handlers: map[api_common.EDataSourceKind]*rdbms.Handler{
- api_common.EDataSourceKind_CLICKHOUSE: rdbms.NewHandler(
- logger, clickhouse.NewQueryBuilder(), clickhouse.NewConnectionManager(), clickhouse.NewTypeMapper()),
- api_common.EDataSourceKind_POSTGRESQL: rdbms.NewHandler(
- logger, postgresql.NewQueryBuilder(), postgresql.NewConnectionManager(), postgresql.NewTypeMapper()),
- },
+ handlerFactory: rdbms.NewHandlerFactory(),
columnarBufferFactory: utils.NewColumnarBufferFactory(
memory.DefaultAllocator,
utils.NewReadLimiterFactory(cfg.ReadLimit),
diff --git a/ydb/library/yql/providers/generic/connector/app/server/rdbms/connection_manager.go b/ydb/library/yql/providers/generic/connector/app/server/utils/connection_manager.go
index 7327322f0f6..46738960b8b 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/rdbms/connection_manager.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/utils/connection_manager.go
@@ -1,4 +1,4 @@
-package rdbms
+package utils
import (
"context"
@@ -19,7 +19,7 @@ type Rows interface {
Scan(dest ...any) error
}
-type ConnectionManager interface {
- Make(ctx context.Context, logger log.Logger, dataSourceInstance *api_common.TDataSourceInstance) (Connection, error)
- Release(logger log.Logger, conn Connection)
+type ConnectionManager[CONN any] interface {
+ Make(ctx context.Context, logger log.Logger, dataSourceInstance *api_common.TDataSourceInstance) (CONN, error)
+ Release(logger log.Logger, conn CONN)
}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/query_builder.go b/ydb/library/yql/providers/generic/connector/app/server/utils/query_builder.go
new file mode 100644
index 00000000000..5e8fe2a0be7
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/app/server/utils/query_builder.go
@@ -0,0 +1,11 @@
+package utils
+
+import (
+ "context"
+
+ api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
+)
+
+type QueryExecutor[CONN Connection] interface {
+ DescribeTable(ctx context.Context, conn CONN, request *api_service_protos.TDescribeTableRequest) (Rows, error)
+}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/ya.make b/ydb/library/yql/providers/generic/connector/app/server/utils/ya.make
index 1b9a99f962f..0e57385587d 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/utils/ya.make
+++ b/ydb/library/yql/providers/generic/connector/app/server/utils/ya.make
@@ -4,12 +4,14 @@ SRCS(
arrow_helpers.go
columnar_buffer_arrow_ipc_streaming.go
columnar_buffer_factory.go
+ connection_manager.go
converters.go
- errors.go
endpoint.go
+ errors.go
logger.go
paging_writer.go
protobuf.go
+ query_builder.go
read_limiter.go
select_helpers.go
time.go