aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortsmax2004 <tsmax2004@yandex-team.com>2023-11-07 12:49:30 +0300
committertsmax2004 <tsmax2004@yandex-team.com>2023-11-07 13:13:26 +0300
commit1996767eac487ceba44a68e57800ff88d8ed31bd (patch)
treecb872fd128f42739952f9bc73da939f7cf6ff89b
parent5739154a120fc5dea45dc20fbeec062b1ef79ac7 (diff)
downloadydb-1996767eac487ceba44a68e57800ff88d8ed31bd.tar.gz
YQ Connector:native protocol for clickhouse
YQ Connector:native protocol for clickhouse
-rw-r--r--ydb/core/fq/libs/actors/clusters_from_connections.cpp2
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp2
-rw-r--r--ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.cpp5
-rw-r--r--ydb/core/fq/libs/db_id_async_resolver_impl/ut/mdb_endpoint_generator_ut.cpp8
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/clickhouse/connection_manager.go32
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_executor.go17
6 files changed, 24 insertions, 42 deletions
diff --git a/ydb/core/fq/libs/actors/clusters_from_connections.cpp b/ydb/core/fq/libs/actors/clusters_from_connections.cpp
index 069239989c..a060b172ac 100644
--- a/ydb/core/fq/libs/actors/clusters_from_connections.cpp
+++ b/ydb/core/fq/libs/actors/clusters_from_connections.cpp
@@ -115,7 +115,7 @@ void FillGenericClusterConfigBase(
// In YQv2 protocol can be configured via `CREATE EXTERNAL DATA SOURCE` params.
switch (dataSourceKind) {
case NYql::NConnector::NApi::CLICKHOUSE:
- clusterCfg.SetProtocol(NYql::NConnector::NApi::EProtocol::HTTP);
+ clusterCfg.SetProtocol(NYql::NConnector::NApi::EProtocol::NATIVE);
break;
case NYql::NConnector::NApi::POSTGRESQL:
clusterCfg.SetProtocol(NYql::NConnector::NApi::EProtocol::NATIVE);
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
index 8c2cad260c..53dc998552 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
@@ -184,7 +184,7 @@ TString MakeCreateExternalDataSourceQuery(
SOURCE_TYPE="ClickHouse",
MDB_CLUSTER_ID={mdb_cluster_id},
DATABASE_NAME={database_name},
- PROTOCOL="HTTP",
+ PROTOCOL="NATIVE",
USE_TLS="true"
)",
"mdb_cluster_id"_a = EncloseAndEscapeString(connectionContent.setting().clickhouse_cluster().database_id(), '"'),
diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.cpp b/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.cpp
index c1c741f357..22f51caef5 100644
--- a/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.cpp
+++ b/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.cpp
@@ -8,9 +8,8 @@ namespace NFq {
using TEndpoint = NYql::IMdbEndpointGenerator::TEndpoint;
- // Currently we're going to use only HTTP protocol for ClickHouse
- constexpr ui32 CLICKHOUSE_SECURE_PORT = 8443;
- constexpr ui32 CLICKHOUSE_INSECURE_PORT = 8123;
+ constexpr ui32 CLICKHOUSE_SECURE_PORT = 9440;
+ constexpr ui32 CLICKHOUSE_INSECURE_PORT = 9000;
// Managed PostgreSQL provides the only port both for secure and insecure connections
constexpr ui32 POSTGRESQL_PORT = 6432;
diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/ut/mdb_endpoint_generator_ut.cpp b/ydb/core/fq/libs/db_id_async_resolver_impl/ut/mdb_endpoint_generator_ut.cpp
index 3bd862a7e5..0985c6ee22 100644
--- a/ydb/core/fq/libs/db_id_async_resolver_impl/ut/mdb_endpoint_generator_ut.cpp
+++ b/ydb/core/fq/libs/db_id_async_resolver_impl/ut/mdb_endpoint_generator_ut.cpp
@@ -9,11 +9,11 @@ Y_UNIT_TEST_SUITE(MdbEndpoingGenerator) {
UNIT_ASSERT_VALUES_EQUAL(
transformer->ToEndpoint(NYql::EDatabaseType::ClickHouse, "rc1c-p5waby2y5y1kb5ue.db.yandex.net", true),
- TEndpoint("rc1c-p5waby2y5y1kb5ue.db.yandex.net", 8443));
+ TEndpoint("rc1c-p5waby2y5y1kb5ue.db.yandex.net", 9440));
UNIT_ASSERT_VALUES_EQUAL(
transformer->ToEndpoint(NYql::EDatabaseType::ClickHouse, "ya.ru", false),
- TEndpoint("ya.db.yandex.net", 8123));
+ TEndpoint("ya.db.yandex.net", 9000));
}
Y_UNIT_TEST(Generic_NoTransformHost) {
@@ -21,7 +21,7 @@ Y_UNIT_TEST_SUITE(MdbEndpoingGenerator) {
UNIT_ASSERT_VALUES_EQUAL(
transformer->ToEndpoint(NYql::EDatabaseType::ClickHouse, "rc1a-d6dv17lv47v5mcop.mdb.yandexcloud.net", true),
- TEndpoint("rc1a-d6dv17lv47v5mcop.mdb.yandexcloud.net", 8443));
+ TEndpoint("rc1a-d6dv17lv47v5mcop.mdb.yandexcloud.net", 9440));
UNIT_ASSERT_VALUES_EQUAL(
transformer->ToEndpoint(NYql::EDatabaseType::PostgreSQL, "rc1b-eyt6dtobu96rwydq.mdb.yandexcloud.net", false),
@@ -33,7 +33,7 @@ Y_UNIT_TEST_SUITE(MdbEndpoingGenerator) {
UNIT_ASSERT_VALUES_EQUAL(
transformer->ToEndpoint(NYql::EDatabaseType::ClickHouse, "rc1a-d6dv17lv47v5mcop.mdb.yandexcloud.net", false),
- TEndpoint("rc1a-d6dv17lv47v5mcop.db.yandex.net", 8123));
+ TEndpoint("rc1a-d6dv17lv47v5mcop.db.yandex.net", 9000));
UNIT_ASSERT_VALUES_EQUAL(
transformer->ToEndpoint(NYql::EDatabaseType::PostgreSQL, "rc1b-eyt6dtobu96rwydq.mdb.yandexcloud.net", true),
diff --git a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/connection_manager.go b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/connection_manager.go
index b61cb0378f..0b9d9e95af 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/connection_manager.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/connection_manager.go
@@ -5,7 +5,6 @@ import (
"crypto/tls"
"database/sql"
"fmt"
- "net"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
@@ -76,8 +75,14 @@ func (c *connectionManager) Make(
return nil, fmt.Errorf("currently only basic auth is supported")
}
- if dsi.Protocol != api_common.EProtocol_HTTP {
- // FIXME: fix NATIVE protocol in https://st.yandex-team.ru/YQ-2286
+ var protocol clickhouse.Protocol
+
+ switch dsi.Protocol {
+ case api_common.EProtocol_NATIVE:
+ protocol = clickhouse.Native
+ case api_common.EProtocol_HTTP:
+ protocol = clickhouse.HTTP
+ default:
return nil, fmt.Errorf("can not run ClickHouse connection with protocol '%v'", dsi.Protocol)
}
@@ -88,11 +93,6 @@ func (c *connectionManager) Make(
Username: dsi.Credentials.GetBasic().Username,
Password: dsi.Credentials.GetBasic().Password,
},
- DialContext: func(ctx context.Context, addr string) (net.Conn, error) {
- var d net.Dialer
-
- return d.DialContext(ctx, "tcp", addr)
- },
Debug: true,
Debugf: func(format string, v ...any) {
logger.Debugf(format, v...)
@@ -100,7 +100,7 @@ func (c *connectionManager) Make(
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
},
- Protocol: clickhouse.HTTP,
+ Protocol: protocol,
}
if dsi.UseTls {
@@ -109,26 +109,20 @@ func (c *connectionManager) Make(
}
}
- // FIXME: uncomment after YQ-2286
- // conn, err := clickhouse.Open(opts)
- // if err != nil {
- // return nil, fmt.Errorf("open connection: %w", err)
- // }
-
conn := clickhouse.OpenDB(opts)
-
if err := conn.Ping(); err != nil {
return nil, fmt.Errorf("conn ping: %w", err)
}
const (
- maxIdleConns = 5
- maxOpenConns = 10
+ maxIdleConns = 5
+ maxOpenConns = 10
+ connMaxLifetime = time.Hour
)
conn.SetMaxIdleConns(maxIdleConns)
conn.SetMaxOpenConns(maxOpenConns)
- conn.SetConnMaxLifetime(time.Hour)
+ conn.SetConnMaxLifetime(connMaxLifetime)
queryLogger := c.QueryLoggerFactory.Make(logger)
diff --git a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_executor.go b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_executor.go
index 3bc595bdd8..2f9071bc65 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_executor.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/query_executor.go
@@ -4,7 +4,6 @@ import (
"context"
"fmt"
- "github.com/ydb-platform/ydb/library/go/core/log"
"github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils"
api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
)
@@ -12,12 +11,8 @@ import (
type queryExecutor struct {
}
-func (qm queryExecutor) DescribeTable(
- ctx context.Context,
- conn *Connection,
- request *api_service_protos.TDescribeTableRequest,
-) (utils.Rows, error) {
- out, err := conn.QueryContext(
+func (qm queryExecutor) DescribeTable(ctx context.Context, conn *Connection, request *api_service_protos.TDescribeTableRequest) (utils.Rows, error) {
+ out, err := conn.Query(
ctx,
"SELECT name, type FROM system.columns WHERE table = ? and database = ?",
request.Table,
@@ -29,16 +24,10 @@ func (qm queryExecutor) DescribeTable(
}
if err := out.Err(); err != nil {
- defer func() {
- if err := out.Close(); err != nil {
- conn.logger.Error("close rows", log.Error(err))
- }
- }()
-
return nil, fmt.Errorf("rows err: %w", err)
}
- return rows{Rows: out}, nil
+ return out, nil
}
func NewQueryExecutor() utils.QueryExecutor[*Connection] {