diff options
author | vitalyisaev <vitalyisaev@ydb.tech> | 2023-12-01 12:48:42 +0300 |
---|---|---|
committer | vitalyisaev <vitalyisaev@ydb.tech> | 2023-12-01 14:55:59 +0300 |
commit | 84229e665d1613a00cc3faa7bb9b471f69dbe7d9 (patch) | |
tree | 2a88eeb81cc447fb7a574602b67f2f1fdc66f910 | |
parent | c592f379889405b544d359c62b0a48404f7c83d6 (diff) | |
download | ydb-84229e665d1613a00cc3faa7bb9b471f69dbe7d9.tar.gz |
YQ Connector: integration test for tables exceeding the size of a single protocol message
6 files changed, 128 insertions, 23 deletions
diff --git a/ydb/library/yql/providers/generic/connector/recipe/__main__.py b/ydb/library/yql/providers/generic/connector/recipe/__main__.py index d873a7cbe5..3fe692664a 100644 --- a/ydb/library/yql/providers/generic/connector/recipe/__main__.py +++ b/ydb/library/yql/providers/generic/connector/recipe/__main__.py @@ -7,7 +7,7 @@ import os import logging import socket from pathlib import Path -from typing import Final +from typing import Dict, Final import tempfile import jinja2 @@ -26,24 +26,30 @@ def start(argv): connector = yat.build_path("ydb/library/yql/providers/generic/connector/app/yq-connector") - grpc_host = "0.0.0.0" - grpc_port = find_free_ports(1)[0] + options = { + "grpc_host": "0.0.0.0", + "grpc_port": find_free_ports(1)[0], + "paging_bytes_per_page": 1024, + "paging_prefetch_queue_capacity": 2, + } - config_path = _render_config(grpc_host=grpc_host, grpc_port=grpc_port) + config_path = _render_config(options) logger.info('Starting connector server...') start_daemon( command=[connector, 'server', f'--config={config_path}'], pid_file_name=yat.work_path(CONNECTOR_PID_FILE), - is_alive_check=lambda: _is_alive_check(grpc_host, grpc_port), - environment=_update_environment(grpc_host=grpc_host, grpc_port=grpc_port), + is_alive_check=lambda: _is_alive_check(host=options["grpc_host"], port=options["grpc_port"]), + environment=_update_environment(options), ) logger.info('Connector server started') -def _render_config(grpc_host: str, grpc_port: int) -> Path: +def _render_config( + options: Dict, +) -> Path: template_ = ''' connector_server { endpoint { @@ -58,13 +64,13 @@ def _render_config(grpc_host: str, grpc_port: int) -> Path: } paging { - bytes_per_page: 1024 - prefetch_queue_capacity: 2 + bytes_per_page: {{paging_bytes_per_page}} + prefetch_queue_capacity: {{paging_prefetch_queue_capacity}} } ''' template = jinja2.Environment(loader=jinja2.BaseLoader).from_string(template_) - content = template.render(grpc_host=grpc_host, grpc_port=grpc_port) + content = 
template.render(**options) tmp = tempfile.NamedTemporaryFile(delete=False) with open(tmp.name, 'w') as f: f.write(content) @@ -72,10 +78,12 @@ def _render_config(grpc_host: str, grpc_port: int) -> Path: return tmp.name -def _update_environment(grpc_host: str, grpc_port: int): +def _update_environment(options: Dict): variables = { - 'YQL_RECIPE_CONNECTOR_GRPC_HOST': grpc_host, - 'YQL_RECIPE_CONNECTOR_GRPC_PORT': str(grpc_port), + 'YDB_CONNECTOR_RECIPE_GRPC_HOST': options['grpc_host'], + 'YDB_CONNECTOR_RECIPE_GRPC_PORT': str(options['grpc_port']), + 'YDB_CONNECTOR_RECIPE_GRPC_PAGING_BYTES_PER_PAGE': str(options['paging_bytes_per_page']), + 'YDB_CONNECTOR_RECIPE_GRPC_PAGING_PREFETCH_QUEUE_CAPACITY': str(options['paging_prefetch_queue_capacity']), } for variable in variables.items(): diff --git a/ydb/library/yql/providers/generic/connector/tests/test_cases/collection.py b/ydb/library/yql/providers/generic/connector/tests/test_cases/collection.py index cc1752914e..7b9b278c8f 100644 --- a/ydb/library/yql/providers/generic/connector/tests/test_cases/collection.py +++ b/ydb/library/yql/providers/generic/connector/tests/test_cases/collection.py @@ -21,9 +21,9 @@ class Collection(object): 'select_missing_database': select_missing_database.Factory().make_test_cases(), 'select_missing_table': select_missing_table.Factory().make_test_cases(), 'select_positive_postgresql': select_positive_postgresql.Factory().make_test_cases() - + select_positive_common.Factory().make_test_cases(EDataSourceKind.POSTGRESQL), + + select_positive_common.Factory(ss).make_test_cases(EDataSourceKind.POSTGRESQL), 'select_positive_clickhouse': select_positive_clickhouse.Factory().make_test_cases() - + select_positive_common.Factory().make_test_cases(EDataSourceKind.CLICKHOUSE), + + select_positive_common.Factory(ss).make_test_cases(EDataSourceKind.CLICKHOUSE), 'select_datetime': select_datetime.Factory().make_test_cases(), 'select_pg_schema': select_pg_schema.Factory().make_test_cases(), } diff --git 
a/ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive_common.py b/ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive_common.py index 6c0a8a910c..b52608d88c 100644 --- a/ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive_common.py +++ b/ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive_common.py @@ -5,6 +5,8 @@ from typing import Sequence, Optional from ydb.library.yql.providers.generic.connector.api.common.data_source_pb2 import EDataSourceKind, EProtocol from ydb.public.api.protos.ydb_value_pb2 import Type +from ydb.library.yql.providers.generic.connector.tests.utils.settings import Settings +from ydb.library.yql.providers.generic.connector.tests.utils.generate import generate_table_data import ydb.library.yql.providers.generic.connector.tests.utils.clickhouse as clickhouse import ydb.library.yql.providers.generic.connector.tests.utils.postgresql as postgresql from ydb.library.yql.providers.generic.connector.tests.utils.database import Database @@ -20,8 +22,6 @@ from ydb.library.yql.providers.generic.connector.tests.utils.schema import ( from ydb.library.yql.providers.generic.connector.tests.test_cases.base import BaseTestCase from ydb.library.yql.providers.generic.connector.tests.utils.settings import GenericSettings -# TODO: Canonize test data in YQL way https://st.yandex-team.ru/YQ-2108 - @dataclass class TestCase(BaseTestCase): @@ -46,6 +46,11 @@ class TestCase(BaseTestCase): class Factory: + ss: Settings + + def __init__(self, ss: Settings): + self.ss = ss + def _column_selection(self) -> Sequence[TestCase]: ''' In these test case set we check SELECT from a small table with various SELECT parameters, @@ -198,6 +203,57 @@ class Factory: return test_cases + def _large_table(self) -> Sequence[TestCase]: + ''' + In this test the dataset is obviously large than a single page + (a single message of ReadSplits protocol), so it will take at least several protocol 
messages + to transfer the table from Connector to the engine (dqrun, kqprun). + + Therefore, we will check: + 1. Connector's data prefetching logic + 2. Engine backpressure logic + + # TODO: assert connector stats when it will be accessible + ''' + + table_size = 2.5 * self.ss.connector.paging_bytes_per_page * self.ss.connector.paging_prefetch_queue_capacity + + schema = Schema( + columns=ColumnList( + Column( + name='col_int64', + ydb_type=Type.INT64, + data_source_type=DataSourceType(ch=clickhouse.Int32(), pg=postgresql.Int8()), + ), + Column( + name='col_string', + ydb_type=Type.UTF8, + data_source_type=DataSourceType(ch=clickhouse.String(), pg=postgresql.Text()), + ), + ) + ) + + data_in = generate_table_data(schema=schema, bytes_soft_limit=table_size) + data_source_kinds = [EDataSourceKind.CLICKHOUSE, EDataSourceKind.POSTGRESQL] + + test_cases = [] + for data_source_kind in data_source_kinds: + tc = TestCase( + name=f'large_table_{data_source_kind}', + data_source_kind=data_source_kind, + data_in=data_in, + data_out_=data_in, + select_what=SelectWhat.asterisk(schema.columns), + select_where=None, + schema=schema, + database=Database.make_for_data_source_kind(data_source_kind), + pragmas=dict(), + ) + + test_cases.append(tc) + + return test_cases + def make_test_cases(self, data_source_kind: EDataSourceKind) -> Sequence[TestCase]: protocols = { EDataSourceKind.CLICKHOUSE: [EProtocol.NATIVE, EProtocol.HTTP], @@ -207,16 +263,18 @@ class Factory: base_test_cases = list( itertools.chain( self._column_selection(), + self._large_table(), ) ) test_cases = [] for base_tc in base_test_cases: - if base_tc != data_source_kind: + if base_tc.data_source_kind != data_source_kind: continue for protocol in protocols[base_tc.data_source_kind]: tc = replace(base_tc) tc.name += f'_{protocol}' tc.protocol = protocol test_cases.append(tc) + return test_cases diff --git a/ydb/library/yql/providers/generic/connector/tests/utils/generate.py 
b/ydb/library/yql/providers/generic/connector/tests/utils/generate.py new file mode 100644 index 0000000000..106f0aa3ac --- /dev/null +++ b/ydb/library/yql/providers/generic/connector/tests/utils/generate.py @@ -0,0 +1,34 @@ +import hashlib + +from typing import Sequence + +from utils.schema import Schema +from ydb.public.api.protos.ydb_value_pb2 import Type + + +def generate_table_data(schema: Schema, bytes_soft_limit: int) -> Sequence[Sequence]: + rows = [] + + ix = 0 + actual_size = 0 + + while actual_size < bytes_soft_limit: + row = [] + + for col in schema.columns: + match col.ydb_type: + case Type.INT64: + row.append(ix) + actual_size += 8 + case Type.UTF8: + value = hashlib.md5(str(2).encode('utf-8')).hexdigest() + row.append(value) + actual_size += len(value) + case _: + raise ValueError(f'unexpected type {col.ydb_type}') + + rows.append(row) + + ix += 1 + + return rows diff --git a/ydb/library/yql/providers/generic/connector/tests/utils/settings.py b/ydb/library/yql/providers/generic/connector/tests/utils/settings.py index 75d532b82b..8cb049c2e7 100644 --- a/ydb/library/yql/providers/generic/connector/tests/utils/settings.py +++ b/ydb/library/yql/providers/generic/connector/tests/utils/settings.py @@ -1,6 +1,6 @@ from os import environ from dataclasses import dataclass -from typing import Optional +from typing import Optional, Sequence from ydb.library.yql.providers.generic.connector.api.common.data_source_pb2 import EDataSourceKind, EProtocol from ydb.library.yql.providers.generic.connector.api.service.protos.connector_pb2 import EDateTimeFormat @@ -12,6 +12,8 @@ class Settings: class Connector: grpc_host: str grpc_port: int + paging_bytes_per_page: int + paging_prefetch_queue_capacity: int connector: Connector @@ -42,8 +44,10 @@ class Settings: def from_env(cls) -> 'Settings': return cls( connector=cls.Connector( - grpc_host=environ['YQL_RECIPE_CONNECTOR_GRPC_HOST'], - grpc_port=int(environ['YQL_RECIPE_CONNECTOR_GRPC_PORT']), + 
grpc_host=environ['YDB_CONNECTOR_RECIPE_GRPC_HOST'], + grpc_port=int(environ['YDB_CONNECTOR_RECIPE_GRPC_PORT']), + paging_bytes_per_page=int(environ['YDB_CONNECTOR_RECIPE_GRPC_PAGING_BYTES_PER_PAGE']), + paging_prefetch_queue_capacity=int(environ['YDB_CONNECTOR_RECIPE_GRPC_PAGING_PREFETCH_QUEUE_CAPACITY']), ), clickhouse=cls.ClickHouse( cluster_name='clickhouse_integration_test', @@ -84,7 +88,7 @@ class GenericSettings: database: str protocol: EProtocol - clickhouse_clusters: list[ClickHouseCluster] + clickhouse_clusters: Sequence[ClickHouseCluster] @dataclass class PostgreSQLCluster: @@ -94,6 +98,6 @@ class GenericSettings: database: str schema: str - postgresql_clusters: list[PostgreSQLCluster] + postgresql_clusters: Sequence[PostgreSQLCluster] date_time_format: EDateTimeFormat diff --git a/ydb/library/yql/providers/generic/connector/tests/utils/ya.make b/ydb/library/yql/providers/generic/connector/tests/utils/ya.make index bcbd9f2fea..5e0640ecfa 100644 --- a/ydb/library/yql/providers/generic/connector/tests/utils/ya.make +++ b/ydb/library/yql/providers/generic/connector/tests/utils/ya.make @@ -7,6 +7,7 @@ PY_SRCS( comparator.py database.py dqrun.py + generate.py kqprun.py log.py postgresql.py |