aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvitalyisaev <vitalyisaev@ydb.tech>2023-12-01 12:48:42 +0300
committervitalyisaev <vitalyisaev@ydb.tech>2023-12-01 14:55:59 +0300
commit84229e665d1613a00cc3faa7bb9b471f69dbe7d9 (patch)
tree2a88eeb81cc447fb7a574602b67f2f1fdc66f910
parentc592f379889405b544d359c62b0a48404f7c83d6 (diff)
downloadydb-84229e665d1613a00cc3faa7bb9b471f69dbe7d9.tar.gz
YQ Connector:integration test for tables exceeding the size of a single protocol message
-rw-r--r--ydb/library/yql/providers/generic/connector/recipe/__main__.py34
-rw-r--r--ydb/library/yql/providers/generic/connector/tests/test_cases/collection.py4
-rw-r--r--ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive_common.py64
-rw-r--r--ydb/library/yql/providers/generic/connector/tests/utils/generate.py34
-rw-r--r--ydb/library/yql/providers/generic/connector/tests/utils/settings.py14
-rw-r--r--ydb/library/yql/providers/generic/connector/tests/utils/ya.make1
6 files changed, 128 insertions, 23 deletions
diff --git a/ydb/library/yql/providers/generic/connector/recipe/__main__.py b/ydb/library/yql/providers/generic/connector/recipe/__main__.py
index d873a7cbe5..3fe692664a 100644
--- a/ydb/library/yql/providers/generic/connector/recipe/__main__.py
+++ b/ydb/library/yql/providers/generic/connector/recipe/__main__.py
@@ -7,7 +7,7 @@ import os
import logging
import socket
from pathlib import Path
-from typing import Final
+from typing import Dict, Final
import tempfile
import jinja2
@@ -26,24 +26,30 @@ def start(argv):
connector = yat.build_path("ydb/library/yql/providers/generic/connector/app/yq-connector")
- grpc_host = "0.0.0.0"
- grpc_port = find_free_ports(1)[0]
+ options = {
+ "grpc_host": "0.0.0.0",
+ "grpc_port": find_free_ports(1)[0],
+ "paging_bytes_per_page": 1024,
+ "paging_prefetch_queue_capacity": 2,
+ }
- config_path = _render_config(grpc_host=grpc_host, grpc_port=grpc_port)
+ config_path = _render_config(options)
logger.info('Starting connector server...')
start_daemon(
command=[connector, 'server', f'--config={config_path}'],
pid_file_name=yat.work_path(CONNECTOR_PID_FILE),
- is_alive_check=lambda: _is_alive_check(grpc_host, grpc_port),
- environment=_update_environment(grpc_host=grpc_host, grpc_port=grpc_port),
+ is_alive_check=lambda: _is_alive_check(host=options["grpc_host"], port=options["grpc_port"]),
+ environment=_update_environment(options),
)
logger.info('Connector server started')
-def _render_config(grpc_host: str, grpc_port: int) -> Path:
+def _render_config(
+ options: Dict,
+) -> Path:
template_ = '''
connector_server {
endpoint {
@@ -58,13 +64,13 @@ def _render_config(grpc_host: str, grpc_port: int) -> Path:
}
paging {
- bytes_per_page: 1024
- prefetch_queue_capacity: 2
+ bytes_per_page: {{paging_bytes_per_page}}
+ prefetch_queue_capacity: {{paging_prefetch_queue_capacity}}
}
'''
template = jinja2.Environment(loader=jinja2.BaseLoader).from_string(template_)
- content = template.render(grpc_host=grpc_host, grpc_port=grpc_port)
+ content = template.render(**options)
tmp = tempfile.NamedTemporaryFile(delete=False)
with open(tmp.name, 'w') as f:
f.write(content)
@@ -72,10 +78,12 @@ def _render_config(grpc_host: str, grpc_port: int) -> Path:
return tmp.name
-def _update_environment(grpc_host: str, grpc_port: int):
+def _update_environment(options: Dict):
variables = {
- 'YQL_RECIPE_CONNECTOR_GRPC_HOST': grpc_host,
- 'YQL_RECIPE_CONNECTOR_GRPC_PORT': str(grpc_port),
+ 'YDB_CONNECTOR_RECIPE_GRPC_HOST': options['grpc_host'],
+ 'YDB_CONNECTOR_RECIPE_GRPC_PORT': str(options['grpc_port']),
+ 'YDB_CONNECTOR_RECIPE_GRPC_PAGING_BYTES_PER_PAGE': str(options['paging_bytes_per_page']),
+ 'YDB_CONNECTOR_RECIPE_GRPC_PAGING_PREFETCH_QUEUE_CAPACITY': str(options['paging_prefetch_queue_capacity']),
}
for variable in variables.items():
diff --git a/ydb/library/yql/providers/generic/connector/tests/test_cases/collection.py b/ydb/library/yql/providers/generic/connector/tests/test_cases/collection.py
index cc1752914e..7b9b278c8f 100644
--- a/ydb/library/yql/providers/generic/connector/tests/test_cases/collection.py
+++ b/ydb/library/yql/providers/generic/connector/tests/test_cases/collection.py
@@ -21,9 +21,9 @@ class Collection(object):
'select_missing_database': select_missing_database.Factory().make_test_cases(),
'select_missing_table': select_missing_table.Factory().make_test_cases(),
'select_positive_postgresql': select_positive_postgresql.Factory().make_test_cases()
- + select_positive_common.Factory().make_test_cases(EDataSourceKind.POSTGRESQL),
+ + select_positive_common.Factory(ss).make_test_cases(EDataSourceKind.POSTGRESQL),
'select_positive_clickhouse': select_positive_clickhouse.Factory().make_test_cases()
- + select_positive_common.Factory().make_test_cases(EDataSourceKind.CLICKHOUSE),
+ + select_positive_common.Factory(ss).make_test_cases(EDataSourceKind.CLICKHOUSE),
'select_datetime': select_datetime.Factory().make_test_cases(),
'select_pg_schema': select_pg_schema.Factory().make_test_cases(),
}
diff --git a/ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive_common.py b/ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive_common.py
index 6c0a8a910c..b52608d88c 100644
--- a/ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive_common.py
+++ b/ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive_common.py
@@ -5,6 +5,8 @@ from typing import Sequence, Optional
from ydb.library.yql.providers.generic.connector.api.common.data_source_pb2 import EDataSourceKind, EProtocol
from ydb.public.api.protos.ydb_value_pb2 import Type
+from ydb.library.yql.providers.generic.connector.tests.utils.settings import Settings
+from ydb.library.yql.providers.generic.connector.tests.utils.generate import generate_table_data
import ydb.library.yql.providers.generic.connector.tests.utils.clickhouse as clickhouse
import ydb.library.yql.providers.generic.connector.tests.utils.postgresql as postgresql
from ydb.library.yql.providers.generic.connector.tests.utils.database import Database
@@ -20,8 +22,6 @@ from ydb.library.yql.providers.generic.connector.tests.utils.schema import (
from ydb.library.yql.providers.generic.connector.tests.test_cases.base import BaseTestCase
from ydb.library.yql.providers.generic.connector.tests.utils.settings import GenericSettings
-# TODO: Canonize test data in YQL way https://st.yandex-team.ru/YQ-2108
-
@dataclass
class TestCase(BaseTestCase):
@@ -46,6 +46,11 @@ class TestCase(BaseTestCase):
class Factory:
+ ss: Settings
+
+ def __init__(self, ss: Settings):
+ self.ss = ss  # kept for _large_table, which reads the connector paging settings from it
+
def _column_selection(self) -> Sequence[TestCase]:
'''
In these test case set we check SELECT from a small table with various SELECT parameters,
@@ -198,6 +203,57 @@ class Factory:
return test_cases
+ def _large_table(self) -> Sequence[TestCase]:
+     '''
+     In this test the dataset is obviously larger than a single page
+     (a single message of ReadSplits protocol), so it will take at least several protocol messages
+     to transfer the table from Connector to the engine (dqrun, kqprun).
+
+     Therefore, we will check:
+     1. Connector's data prefetching logic
+     2. Engine backpressure logic
+     Returns one test case per data source kind (ClickHouse and PostgreSQL).
+     # TODO: assert connector stats when it will be accessible
+     '''
+     # 2.5 x (bytes per page) x (prefetch queue capacity); int() because generate_table_data takes an int limit
+     table_size = int(2.5 * self.ss.connector.paging_bytes_per_page * self.ss.connector.paging_prefetch_queue_capacity)
+
+     schema = Schema(
+         columns=ColumnList(
+             Column(
+                 name='col_int64',
+                 ydb_type=Type.INT64,
+                 data_source_type=DataSourceType(ch=clickhouse.Int64(), pg=postgresql.Int8()),
+             ),
+             Column(
+                 name='col_string',
+                 ydb_type=Type.UTF8,
+                 data_source_type=DataSourceType(ch=clickhouse.String(), pg=postgresql.Text()),
+             ),
+         )
+     )
+     # The expected output equals the input: every column is selected and no filter is applied
+     data_in = generate_table_data(schema=schema, bytes_soft_limit=table_size)
+     data_source_kinds = [EDataSourceKind.CLICKHOUSE, EDataSourceKind.POSTGRESQL]
+
+     test_cases = []
+     for data_source_kind in data_source_kinds:
+         tc = TestCase(
+             name=f'large_table_{data_source_kind}',
+             data_source_kind=data_source_kind,
+             data_in=data_in,
+             data_out_=data_in,
+             select_what=SelectWhat.asterisk(schema.columns),
+             select_where=None,
+             schema=schema,
+             database=Database.make_for_data_source_kind(data_source_kind),
+             pragmas=dict(),
+         )
+
+         test_cases.append(tc)
+
+     return test_cases
+
def make_test_cases(self, data_source_kind: EDataSourceKind) -> Sequence[TestCase]:
protocols = {
EDataSourceKind.CLICKHOUSE: [EProtocol.NATIVE, EProtocol.HTTP],
@@ -207,16 +263,18 @@ class Factory:
base_test_cases = list(
itertools.chain(
self._column_selection(),
+ self._large_table(),
)
)
test_cases = []
for base_tc in base_test_cases:
- if base_tc != data_source_kind:
+ if base_tc.data_source_kind != data_source_kind:
continue
for protocol in protocols[base_tc.data_source_kind]:
tc = replace(base_tc)
tc.name += f'_{protocol}'
tc.protocol = protocol
test_cases.append(tc)
+
return test_cases
diff --git a/ydb/library/yql/providers/generic/connector/tests/utils/generate.py b/ydb/library/yql/providers/generic/connector/tests/utils/generate.py
new file mode 100644
index 0000000000..106f0aa3ac
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/tests/utils/generate.py
@@ -0,0 +1,34 @@
+import hashlib
+
+from typing import Sequence
+
+from utils.schema import Schema
+from ydb.public.api.protos.ydb_value_pb2 import Type
+
+
+def generate_table_data(schema: Schema, bytes_soft_limit: int) -> Sequence[Sequence]:
+    '''
+    Generate rows for `schema` until their estimated size reaches `bytes_soft_limit` (the row crossing it is kept).
+    INT64 columns get the row number, UTF8 ones its MD5 hex digest; other types raise ValueError.
+    '''
+    rows = []
+    ix = 0  # row number
+    actual_size = 0  # estimate: 8 bytes per INT64 value, len(value) per UTF8 value
+
+    while actual_size < bytes_soft_limit:
+        row = []
+        for col in schema.columns:
+            match col.ydb_type:
+                case Type.INT64:
+                    row.append(ix)
+                    actual_size += 8
+                case Type.UTF8:
+                    # Hash the row number (not a constant) so that every row carries a distinct string
+                    value = hashlib.md5(str(ix).encode('utf-8')).hexdigest()
+                    row.append(value)
+                    actual_size += len(value)
+                case _:
+                    raise ValueError(f'unexpected type {col.ydb_type}')
+        rows.append(row)
+        ix += 1
+
+    return rows
diff --git a/ydb/library/yql/providers/generic/connector/tests/utils/settings.py b/ydb/library/yql/providers/generic/connector/tests/utils/settings.py
index 75d532b82b..8cb049c2e7 100644
--- a/ydb/library/yql/providers/generic/connector/tests/utils/settings.py
+++ b/ydb/library/yql/providers/generic/connector/tests/utils/settings.py
@@ -1,6 +1,6 @@
from os import environ
from dataclasses import dataclass
-from typing import Optional
+from typing import Optional, Sequence
from ydb.library.yql.providers.generic.connector.api.common.data_source_pb2 import EDataSourceKind, EProtocol
from ydb.library.yql.providers.generic.connector.api.service.protos.connector_pb2 import EDateTimeFormat
@@ -12,6 +12,8 @@ class Settings:
class Connector:
grpc_host: str
grpc_port: int
+ paging_bytes_per_page: int
+ paging_prefetch_queue_capacity: int
connector: Connector
@@ -42,8 +44,10 @@ class Settings:
def from_env(cls) -> 'Settings':
return cls(
connector=cls.Connector(
- grpc_host=environ['YQL_RECIPE_CONNECTOR_GRPC_HOST'],
- grpc_port=int(environ['YQL_RECIPE_CONNECTOR_GRPC_PORT']),
+ grpc_host=environ['YDB_CONNECTOR_RECIPE_GRPC_HOST'],
+ grpc_port=int(environ['YDB_CONNECTOR_RECIPE_GRPC_PORT']),
+ paging_bytes_per_page=int(environ['YDB_CONNECTOR_RECIPE_GRPC_PAGING_BYTES_PER_PAGE']),
+ paging_prefetch_queue_capacity=int(environ['YDB_CONNECTOR_RECIPE_GRPC_PAGING_PREFETCH_QUEUE_CAPACITY']),
),
clickhouse=cls.ClickHouse(
cluster_name='clickhouse_integration_test',
@@ -84,7 +88,7 @@ class GenericSettings:
database: str
protocol: EProtocol
- clickhouse_clusters: list[ClickHouseCluster]
+ clickhouse_clusters: Sequence[ClickHouseCluster]
@dataclass
class PostgreSQLCluster:
@@ -94,6 +98,6 @@ class GenericSettings:
database: str
schema: str
- postgresql_clusters: list[PostgreSQLCluster]
+ postgresql_clusters: Sequence[PostgreSQLCluster]
date_time_format: EDateTimeFormat
diff --git a/ydb/library/yql/providers/generic/connector/tests/utils/ya.make b/ydb/library/yql/providers/generic/connector/tests/utils/ya.make
index bcbd9f2fea..5e0640ecfa 100644
--- a/ydb/library/yql/providers/generic/connector/tests/utils/ya.make
+++ b/ydb/library/yql/providers/generic/connector/tests/utils/ya.make
@@ -7,6 +7,7 @@ PY_SRCS(
comparator.py
database.py
dqrun.py
+ generate.py
kqprun.py
log.py
postgresql.py