diff options
author | tsmax2004 <tsmax2004@yandex-team.com> | 2023-11-07 12:49:30 +0300 |
---|---|---|
committer | tsmax2004 <tsmax2004@yandex-team.com> | 2023-11-07 13:13:26 +0300 |
commit | 1996767eac487ceba44a68e57800ff88d8ed31bd (patch) | |
tree | cb872fd128f42739952f9bc73da939f7cf6ff89b | |
parent | 5739154a120fc5dea45dc20fbeec062b1ef79ac7 (diff) | |
download | ydb-1996767eac487ceba44a68e57800ff88d8ed31bd.tar.gz |
YQ Connector:native protocol for clickhouse
YQ Connector:native protocol for clickhouse
6 files changed, 24 insertions, 42 deletions
diff --git a/ydb/core/fq/libs/actors/clusters_from_connections.cpp b/ydb/core/fq/libs/actors/clusters_from_connections.cpp index 069239989c..a060b172ac 100644 --- a/ydb/core/fq/libs/actors/clusters_from_connections.cpp +++ b/ydb/core/fq/libs/actors/clusters_from_connections.cpp @@ -115,7 +115,7 @@ void FillGenericClusterConfigBase( // In YQv2 protocol can be configured via `CREATE EXTERNAL DATA SOURCE` params. switch (dataSourceKind) { case NYql::NConnector::NApi::CLICKHOUSE: - clusterCfg.SetProtocol(NYql::NConnector::NApi::EProtocol::HTTP); + clusterCfg.SetProtocol(NYql::NConnector::NApi::EProtocol::NATIVE); break; case NYql::NConnector::NApi::POSTGRESQL: clusterCfg.SetProtocol(NYql::NConnector::NApi::EProtocol::NATIVE); diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp index 8c2cad260c..53dc998552 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp @@ -184,7 +184,7 @@ TString MakeCreateExternalDataSourceQuery( SOURCE_TYPE="ClickHouse", MDB_CLUSTER_ID={mdb_cluster_id}, DATABASE_NAME={database_name}, - PROTOCOL="HTTP", + PROTOCOL="NATIVE", USE_TLS="true" )", "mdb_cluster_id"_a = EncloseAndEscapeString(connectionContent.setting().clickhouse_cluster().database_id(), '"'), diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.cpp b/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.cpp index c1c741f357..22f51caef5 100644 --- a/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.cpp +++ b/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.cpp @@ -8,9 +8,8 @@ namespace NFq { using TEndpoint = NYql::IMdbEndpointGenerator::TEndpoint; - // Currently we're going to use only HTTP protocol for ClickHouse - constexpr ui32 CLICKHOUSE_SECURE_PORT = 8443; - constexpr ui32 CLICKHOUSE_INSECURE_PORT = 8123; + constexpr ui32 CLICKHOUSE_SECURE_PORT = 9440; + constexpr ui32 CLICKHOUSE_INSECURE_PORT = 9000; // Managed PostgreSQL provides the only port both for secure and insecure connections constexpr ui32 POSTGRESQL_PORT = 6432; diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/ut/mdb_endpoint_generator_ut.cpp b/ydb/core/fq/libs/db_id_async_resolver_impl/ut/mdb_endpoint_generator_ut.cpp index 3bd862a7e5..0985c6ee22 100644 --- a/ydb/core/fq/libs/db_id_async_resolver_impl/ut/mdb_endpoint_generator_ut.cpp +++ b/ydb/core/fq/libs/db_id_async_resolver_impl/ut/mdb_endpoint_generator_ut.cpp @@ -9,11 +9,11 @@ Y_UNIT_TEST_SUITE(MdbEndpoingGenerator) { UNIT_ASSERT_VALUES_EQUAL( transformer->ToEndpoint(NYql::EDatabaseType::ClickHouse, "rc1c-p5waby2y5y1kb5ue.db.yandex.net", true), - TEndpoint("rc1c-p5waby2y5y1kb5ue.db.yandex.net", 8443)); + TEndpoint("rc1c-p5waby2y5y1kb5ue.db.yandex.net", 9440)); UNIT_ASSERT_VALUES_EQUAL( transformer->ToEndpoint(NYql::EDatabaseType::ClickHouse, "ya.ru", false), - TEndpoint("ya.db.yandex.net", 8123)); + TEndpoint("ya.db.yandex.net", 9000)); } Y_UNIT_TEST(Generic_NoTransformHost) { @@ -21,7 +21,7 @@ Y_UNIT_TEST_SUITE(MdbEndpoingGenerator) { UNIT_ASSERT_VALUES_EQUAL( transformer->ToEndpoint(NYql::EDatabaseType::ClickHouse, "rc1a-d6dv17lv47v5mcop.mdb.yandexcloud.net", true), - TEndpoint("rc1a-d6dv17lv47v5mcop.mdb.yandexcloud.net", 8443)); + TEndpoint("rc1a-d6dv17lv47v5mcop.mdb.yandexcloud.net", 9440)); UNIT_ASSERT_VALUES_EQUAL( transformer->ToEndpoint(NYql::EDatabaseType::PostgreSQL, "rc1b-eyt6dtobu96rwydq.mdb.yandexcloud.net", false), @@ -33,7 +33,7 @@ Y_UNIT_TEST_SUITE(MdbEndpoingGenerator) { UNIT_ASSERT_VALUES_EQUAL( transformer->ToEndpoint(NYql::EDatabaseType::ClickHouse, "rc1a-d6dv17lv47v5mcop.mdb.yandexcloud.net", false), - TEndpoint("rc1a-d6dv17lv47v5mcop.db.yandex.net", 8123)); + TEndpoint("rc1a-d6dv17lv47v5mcop.db.yandex.net", 9000)); UNIT_ASSERT_VALUES_EQUAL( transformer->ToEndpoint(NYql::EDatabaseType::PostgreSQL, "rc1b-eyt6dtobu96rwydq.mdb.yandexcloud.net", true), diff --git a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/connection_manager.go b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/connection_manager.go index b61cb0378f..0b9d9e95af 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/connection_manager.go +++ b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/connection_manager.go @@ -5,7 +5,6 @@ import ( "crypto/tls" "database/sql" "fmt" - "net" "time" "github.com/ClickHouse/clickhouse-go/v2" @@ -76,8 +75,14 @@ func (c *connectionManager) Make( return nil, fmt.Errorf("currently only basic auth is supported") } - if dsi.Protocol != api_common.EProtocol_HTTP { - // FIXME: fix NATIVE protocol in https://st.yandex-team.ru/YQ-2286 + var protocol clickhouse.Protocol + + switch dsi.Protocol { + case api_common.EProtocol_NATIVE: + protocol = clickhouse.Native + case api_common.EProtocol_HTTP: + protocol = clickhouse.HTTP + default: return nil, fmt.Errorf("can not run ClickHouse connection with protocol '%v'", dsi.Protocol) } @@ -88,11 +93,6 @@ func (c *connectionManager) Make( Username: dsi.Credentials.GetBasic().Username, Password: dsi.Credentials.GetBasic().Password, }, - DialContext: func(ctx context.Context, addr string) (net.Conn, error) { - var d net.Dialer - - return d.DialContext(ctx, "tcp", addr) - }, Debug: true, Debugf: func(format string, v ...any) { logger.Debugf(format, v...) @@ -100,7 +100,7 @@ func (c *connectionManager) Make( Compression: &clickhouse.Compression{ Method: clickhouse.CompressionLZ4, }, - Protocol: clickhouse.HTTP, + Protocol: protocol, } if dsi.UseTls { @@ -109,26 +109,20 @@ func (c *connectionManager) Make( } } - // FIXME: uncomment after YQ-2286 - // conn, err := clickhouse.Open(opts) - // if err != nil { - // return nil, fmt.Errorf("open connection: %w", err) - // } - conn := clickhouse.OpenDB(opts) - if err := conn.Ping(); err != nil { return nil, fmt.Errorf("conn ping: %w", err) } const ( - maxIdleConns = 5 - maxOpenConns = 10 + maxIdleConns = 5 + maxOpenConns = 10 + connMaxLifetime = time.Hour ) conn.SetMaxIdleConns(maxIdleConns) conn.SetMaxOpenConns(maxOpenConns) - conn.SetConnMaxLifetime(time.Hour) + conn.SetConnMaxLifetime(connMaxLifetime) queryLogger := c.QueryLoggerFactory.Make(logger) diff --git a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_executor.go b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_executor.go index 3bc595bdd8..2f9071bc65 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_executor.go +++ b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_executor.go @@ -4,7 +4,6 @@ import ( "context" "fmt" - "github.com/ydb-platform/ydb/library/go/core/log" "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils" api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos" ) @@ -12,12 +11,8 @@ import ( type queryExecutor struct { } -func (qm queryExecutor) DescribeTable( - ctx context.Context, - conn *Connection, - request *api_service_protos.TDescribeTableRequest, -) (utils.Rows, error) { - out, err := conn.QueryContext( +func (qm queryExecutor) DescribeTable(ctx context.Context, conn *Connection, request *api_service_protos.TDescribeTableRequest) (utils.Rows, error) { + out, err := conn.Query( ctx, "SELECT name, type FROM system.columns WHERE table = ? and database = ?", request.Table, @@ -29,16 +24,10 @@ func (qm queryExecutor) DescribeTable( } if err := out.Err(); err != nil { - defer func() { - if err := out.Close(); err != nil { - conn.logger.Error("close rows", log.Error(err)) - } - }() - return nil, fmt.Errorf("rows err: %w", err) } - return rows{Rows: out}, nil + return out, nil } func NewQueryExecutor() utils.QueryExecutor[*Connection] { |